From 05b7c247a7088636186ddc13cff82b0153f3b4cf Mon Sep 17 00:00:00 2001 From: ForeverPyrite Date: Tue, 15 Oct 2024 14:26:59 -0400 Subject: [PATCH] =?UTF-8?q?streaming=20actually=20works=20=F0=9F=A4=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app.py | 52 +++++++++++----------- main.py | 94 +++++++++++++++++++++++++++------------- website/static/script.js | 12 ++--- website/static/style.css | 4 +- 4 files changed, 97 insertions(+), 65 deletions(-) diff --git a/app.py b/app.py index 1553329..02ffb50 100644 --- a/app.py +++ b/app.py @@ -1,30 +1,28 @@ from flask import Flask, render_template, Response, request -from main import get_auto_transcript, get_video_id, create_and_stream, output_buffer, output_lock +from main import get_auto_transcript, get_video_id, create_and_stream, output_stream, fake_stream, awaiter +from asyncio import sleep from datetime import datetime import threading import pytz import time + + app = Flask(__name__, static_folder="website/static", template_folder="website") @app.route('/') def home(): return render_template('index.html') -@app.route('/streamtest', methods=['POST']) -def streaming(): - def generate(): - for i in range(10): - yield f"Data chunk {i}\n" - time.sleep(1) # Simulating a delay in data generation - - return Response(generate(), content_type='text/plain') - @app.route('/process_url', methods=['POST']) def process_url(): # Opens a file to log the video id and the assistants respone to see if I can further improve instructions: #log = open("log.txt", "at", 1) url = request.form['url'] + if url == "test": + global thread + thread = threading.Thread(name="test_thread", target=fake_stream) + return Response("teehee", status=200) # Extract the video ID from the URL video_id = get_video_id(url) # Modify this function to accept the URL @@ -36,31 +34,31 @@ def process_url(): if (not transcript): return "Successfully parsed video ID from URL, however the ID was either invalid, the transcript was disabled by the video owner, or some other error was raised because of YouTube." + thread = threading.Thread(name="create_stream", target=create_and_stream, args=(transcript,)) # The comma here is very intentional, it's so that it iterates it as a tuple rather than iterateing the string. - # Start processing in a separate thread - threading.Thread(target=create_and_stream, args=(transcript,)).start() - - # Process the transcript and stream the result. - # response = create_and_stream(transcript) - # log.write(f"\n\n\n### New Entry at {datetime.now(pytz.timezone('America/New_York')).strftime('%Y-%m-%d %H:%M:%S')}\n\n URL: {url}\n Video ID: {video_id}\n\nAssistant Response: \n{response}") - # Return a response return Response("Processing started. Check /stream_output for updates.", status=200) # Add more detailed output if needed @app.route('/stream_output') def stream_output(): def yoink(): - while True: - with output_lock: - if output_buffer: - message = output_buffer.pop(0) - yield f"{message}" - - time.sleep(0.005) # Adjust as necessary for your application - - return Response(yoink(), content_type='text/plain') + print("Starting stream thread.") + thread.start() + # Start streaming output from output_stream + print("Starting to stream output...") + most_recent = "" + while not output_stream.done: + if output_stream.buffer != []: + delta = output_stream.buffer.pop(0) + yield bytes(delta, encoding="utf-8") + else: + awaiter(sleep(0.05)) + output_stream.reset() + thread.join() + return + return Response(yoink(), content_type='text/plain', status=200) -if __name__ == '__main__': # Change this line to properly check for main +if __name__ == '__main__': app.run(debug=True) \ No newline at end of file diff --git a/main.py b/main.py index f97dd3c..6c7e477 100644 --- a/main.py +++ b/main.py @@ -1,4 +1,3 @@ - # To parse video ids import re @@ -10,14 +9,45 @@ from youtube_transcript_api.formatters import TextFormatter # OpenAI API stuff import from openai import AssistantEventHandler from openai import OpenAI -# For streaming + +### For streaming from typing_extensions import override +import asyncio +awaiter = asyncio.run -import threading - -# Output buffer and thread lock -output_buffer = [] -output_lock = threading.Lock() +# The StreamOutput class to handle streaming +class StreamOutput: + + def __init__(self): + self.delta: str = "" + self.response: str = "" + self.done: bool = False + self.buffer: list = [] + + def reset(self): + self.delta = "" + self.response = "" + self.done = False + self.buffer: list = [] + print("Reset stream output obj") + + async def send_delta(self, delta): + self.delta = delta + self.response += delta + def get_index(list): + if len(list) == 0: + return 0 + else: + return len(list)-1 + if self.buffer != []: + try: + if self.delta != self.buffer[get_index(self.buffer)]: + self.buffer.append(delta) + except IndexError as index_error: + print(index_error) + self.buffer.append(delta) + + else: self.buffer.append(delta) # To get the env var from dotenv import load_dotenv @@ -25,6 +55,7 @@ import os load_dotenv() + ### OpenAI Config # Setting up OpenAI Client with API Key @@ -42,28 +73,37 @@ asst_screw_bardo_id = "asst_JGFaX6uOIotqy5mIJnu3Yyp7" class EventHandler(AssistantEventHandler): @override def on_text_created(self, text) -> None: - with output_lock: - output_buffer.append(f"\nassistant > {text}") + awaiter(output_stream.send_delta("Response Recieved:\nScrew-Bardo: ")) @override def on_text_delta(self, delta, snapshot): - with output_lock: - output_buffer.append(delta.value) + awaiter(output_stream.send_delta(delta.value)) def on_tool_call_created(self, tool_call): - with output_lock: - output_buffer.append(f"\nassistant > {tool_call.type}\n") + raise Exception("Assistant shouldn't be calling tools.") def create_and_stream(transcript): - with client.beta.threads.create_and_run_stream( - assistant_id=asst_screw_bardo_id, - thread={ - "messages": [{"role": "user", "content": transcript}] - }, - event_handler=EventHandler() - ) as stream: - stream.until_done() - + with client.beta.threads.create_and_run_stream( + assistant_id=asst_screw_bardo_id, + thread={ + "messages": [{"role": "user", "content": transcript}] + }, + event_handler=EventHandler() + ) as stream: + stream.until_done() + output_stream.done = True + +def fake_stream(): + i = 0 + STREAM = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 18] + print("Starting fake stream.") + while i <= len(STREAM)-1: + awaiter(asyncio.sleep(0.05)) + awaiter(output_stream.send_delta(str(STREAM[i]))) + i += 1 + output_stream.done = True + return + def get_video_id(url): youtu_be = r'(?<=youtu.be/)([A-Za-z0-9_-]{11})' youtube_com = r'(?<=youtube\.com\/watch\?v=)([A-Za-z0-9_-]{11})' @@ -91,12 +131,4 @@ def get_auto_transcript(video_id): txt_transcript = formatter.format_transcript(transcript) return txt_transcript - -# Stores the video id imputted by the user -""" -video_id = get_video_id() - -transcript = get_auto_transcript(video_id) - -create_and_stream(transcript) -""" \ No newline at end of file +output_stream = StreamOutput() \ No newline at end of file diff --git a/website/static/script.js b/website/static/script.js index 633ae33..28a9479 100644 --- a/website/static/script.js +++ b/website/static/script.js @@ -1,6 +1,7 @@ document.addEventListener("DOMContentLoaded", (event) => { const response_area = document.getElementById('response-area'); - document.getElementById('submit').addEventListener('click', function() { + const submit_button = document.getElementById('submit') + submit_button.addEventListener('click', function() { var url = document.getElementById('url_box').value; if (!url) { @@ -21,11 +22,13 @@ document.addEventListener("DOMContentLoaded", (event) => { throw new Error('Network response was not ok'); } // Start streaming once processing is started + submit_button.style.display = "none"; streamOutput(response_area); }) .catch(error => { console.error('Error processing URL:', error); response_area.innerText = 'Error processing URL: ' + error.message; + submit_button.style.display = "flex"; }); }); }); @@ -42,11 +45,10 @@ function streamOutput(response_area) { function readStream() { reader.read().then(({ done, value }) => { - if (done) { - console.log("Stream finished."); - return; + if(done) { + document.getElementById('submit').style.display = "flex"; + return } - // Decode and process the chunk const chunk = decoder.decode(value, { stream: true }); response_area.innerHTML += chunk; diff --git a/website/static/style.css b/website/static/style.css index afc363b..04c2e74 100644 --- a/website/static/style.css +++ b/website/static/style.css @@ -37,10 +37,10 @@ body .content { } #response-area { - display: block; + display: flex; height: 90%; min-height: 90vh; - overflow: auto; + text-wrap: wrap; flex-wrap: wrap; align-content: flex-end; }