From 3ea5681f86b061965a8e1f571197102fec3de4e8 Mon Sep 17 00:00:00 2001 From: ForeverPyrite Date: Sat, 5 Oct 2024 16:57:03 -0400 Subject: [PATCH] Streaming Update (NEEDS OPTIMIZATIONS AND FAILSAFES!!!!!!!!!!!!!!!!!!!!!!) --- .gitignore | 1 - app.py | 40 ++++++++++++--------- main.py | 77 ++++++++++++++++------------------------ website/index.html | 2 +- website/static/script.js | 38 +++++++++++++------- 5 files changed, 81 insertions(+), 77 deletions(-) diff --git a/.gitignore b/.gitignore index afe8238..87d114f 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,3 @@ -thread-killer.py log.txt .env diff --git a/app.py b/app.py index c3497d9..1553329 100644 --- a/app.py +++ b/app.py @@ -1,23 +1,12 @@ from flask import Flask, render_template, Response, request -from main import get_auto_transcript, get_video_id, create_and_stream, EventHandler +from main import get_auto_transcript, get_video_id, create_and_stream, output_buffer, output_lock from datetime import datetime -import sys -import io +import threading import pytz import time app = Flask(__name__, static_folder="website/static", template_folder="website") -class StreamToLogger(io.StringIO): - def __init__(self): - super().__init__() - - def write(self, message): - # I could probably log stuff here - print(message, end='') # Print to standard output (console) - # You could also log this message or handle it differently. - - @app.route('/') def home(): return render_template('index.html') @@ -33,9 +22,6 @@ def streaming(): @app.route('/process_url', methods=['POST']) def process_url(): - old_stdout = sys.stdout - new_stdout = StreamToLogger() - sys.stdout = new_stdout # Opens a file to log the video id and the assistants respone to see if I can further improve instructions: #log = open("log.txt", "at", 1) url = request.form['url'] @@ -50,11 +36,31 @@ def process_url(): if (not transcript): return "Successfully parsed video ID from URL, however the ID was either invalid, the transcript was disabled by the video owner, or some other error was raised because of YouTube." + + # Start processing in a separate thread + threading.Thread(target=create_and_stream, args=(transcript,)).start() + # Process the transcript and stream the result. # response = create_and_stream(transcript) # log.write(f"\n\n\n### New Entry at {datetime.now(pytz.timezone('America/New_York')).strftime('%Y-%m-%d %H:%M:%S')}\n\n URL: {url}\n Video ID: {video_id}\n\nAssistant Response: \n{response}") # Return a response - return Response(create_and_stream(transcript), content_type="text/plain", status=200, direct_passthrough=True) # Add more detailed output if needed + return Response("Processing started. Check /stream_output for updates.", status=200) # Add more detailed output if needed + +@app.route('/stream_output') +def stream_output(): + def yoink(): + while True: + with output_lock: + if output_buffer: + message = output_buffer.pop(0) + yield f"{message}" + + time.sleep(0.005) # Adjust as necessary for your application + + return Response(yoink(), content_type='text/plain') + + + if __name__ == '__main__': # Change this line to properly check for main app.run(debug=True) \ No newline at end of file diff --git a/main.py b/main.py index cd59220..f97dd3c 100644 --- a/main.py +++ b/main.py @@ -13,6 +13,12 @@ from openai import OpenAI # For streaming from typing_extensions import override +import threading + +# Output buffer and thread lock +output_buffer = [] +output_lock = threading.Lock() + # To get the env var from dotenv import load_dotenv import os @@ -21,36 +27,6 @@ load_dotenv() ### OpenAI Config -# This is copy and pasted straight up from the quickstart guide: -class EventHandler(AssistantEventHandler): - @override - def on_text_created(self, text) -> None: - print(f"\nassistant > ", end="", flush=True) - - - @override - def on_text_delta(self, delta, snapshot): - print(delta.value, end="", flush=True) - - - def on_tool_call_created(self, tool_call): - print(f"\nassistant > {tool_call.type}\n", flush=True) - - - def on_tool_call_delta(self, delta, snapshot): - if delta.type == 'code_interpreter': - if delta.code_interpreter.input: - print(delta.code_interpreter.input, end="", flush=True) - - if delta.code_interpreter.outputs: - print(f"\n\noutput >", flush=True) - - for output in delta.code_interpreter.outputs: - if output.type == "logs": - print(f"\n{output.logs}", flush=True) - - - # Setting up OpenAI Client with API Key api_key = os.getenv("OPENAI_API_KEY") client = OpenAI( @@ -62,23 +38,32 @@ client = OpenAI( # screw bardo assistant that is configured to make notes and 5Q&A based on any given YouTube Transcript asst_screw_bardo_id = "asst_JGFaX6uOIotqy5mIJnu3Yyp7" -# uhh no we need a new thread each time tf -# make sure to call the function after the transcript is confirmed to work, it would be very stupid to call the function and make a new thread this early -def create_and_stream(transcript): - with client.beta.threads.create_and_run_stream( - assistant_id=asst_screw_bardo_id, - thread={ - "messages" : [ - {"role": "user", - "content": transcript} - ] - }, - event_handler=EventHandler() - ) as stream: - stream.until_done() - messages = stream.get_final_messages() - return messages[0].content[0].text.value +# This is copy and pasted straight up from the quickstart guide, just appending to an output buffer instead of directly printing: +class EventHandler(AssistantEventHandler): + @override + def on_text_created(self, text) -> None: + with output_lock: + output_buffer.append(f"\nassistant > {text}") + + @override + def on_text_delta(self, delta, snapshot): + with output_lock: + output_buffer.append(delta.value) + def on_tool_call_created(self, tool_call): + with output_lock: + output_buffer.append(f"\nassistant > {tool_call.type}\n") + +def create_and_stream(transcript): + with client.beta.threads.create_and_run_stream( + assistant_id=asst_screw_bardo_id, + thread={ + "messages": [{"role": "user", "content": transcript}] + }, + event_handler=EventHandler() + ) as stream: + stream.until_done() + def get_video_id(url): youtu_be = r'(?<=youtu.be/)([A-Za-z0-9_-]{11})' youtube_com = r'(?<=youtube\.com\/watch\?v=)([A-Za-z0-9_-]{11})' diff --git a/website/index.html b/website/index.html index 24cc215..f283176 100644 --- a/website/index.html +++ b/website/index.html @@ -17,7 +17,7 @@
-

Response will appear here.

+
Response will appear here.        
diff --git a/website/static/script.js b/website/static/script.js index e35feb3..633ae33 100644 --- a/website/static/script.js +++ b/website/static/script.js @@ -1,13 +1,14 @@ - document.addEventListener("DOMContentLoaded", (event) => { + const response_area = document.getElementById('response-area'); document.getElementById('submit').addEventListener('click', function() { var url = document.getElementById('url_box').value; - const response_area = document.getElementById('response-area'); if (!url) { response_area.innerText = 'Please enter a URL.'; return; } + + // First, process the URL fetch('/process_url', { method: 'POST', headers: { @@ -15,6 +16,26 @@ document.addEventListener("DOMContentLoaded", (event) => { }, body: new URLSearchParams({ url: url }) }) + .then(response => { + if (!response.ok) { + throw new Error('Network response was not ok'); + } + // Start streaming once processing is started + streamOutput(response_area); + }) + .catch(error => { + console.error('Error processing URL:', error); + response_area.innerText = 'Error processing URL: ' + error.message; + }); + }); +}); + +function streamOutput(response_area) { + // Fetch the streaming output + const streamResponsePromise = fetch('/stream_output'); + response_area.innerHTML = "" + + streamResponsePromise .then(response => { const reader = response.body.getReader(); const decoder = new TextDecoder("utf-8"); @@ -28,14 +49,7 @@ document.addEventListener("DOMContentLoaded", (event) => { // Decode and process the chunk const chunk = decoder.decode(value, { stream: true }); - - // Split the received chunk by new line to handle multiple lines (if any) - chunk.split('\n').forEach(data => { - if (data.trim()) { // Avoid empty strings - // Update the inner HTML of the output div - response_area.innerHTML += `

${data}

`; - } - }); + response_area.innerHTML += chunk; // Continue reading readStream(); @@ -47,6 +61,6 @@ document.addEventListener("DOMContentLoaded", (event) => { }) .catch(error => { console.error('Error fetching stream:', error); + response_area.innerText = 'Error fetching stream: ' + error.message; }); - }); -}); +}