streaming actually works 🤨

This commit is contained in:
ForeverPyrite
2024-10-15 14:26:59 -04:00
parent 3ea5681f86
commit 05b7c247a7
4 changed files with 97 additions and 65 deletions

52
app.py
View File

@@ -1,30 +1,28 @@
from flask import Flask, render_template, Response, request from flask import Flask, render_template, Response, request
from main import get_auto_transcript, get_video_id, create_and_stream, output_buffer, output_lock from main import get_auto_transcript, get_video_id, create_and_stream, output_stream, fake_stream, awaiter
from asyncio import sleep
from datetime import datetime from datetime import datetime
import threading import threading
import pytz import pytz
import time import time
app = Flask(__name__, static_folder="website/static", template_folder="website") app = Flask(__name__, static_folder="website/static", template_folder="website")
@app.route('/') @app.route('/')
def home(): def home():
return render_template('index.html') return render_template('index.html')
@app.route('/streamtest', methods=['POST'])
def streaming():
def generate():
for i in range(10):
yield f"Data chunk {i}\n"
time.sleep(1) # Simulating a delay in data generation
return Response(generate(), content_type='text/plain')
@app.route('/process_url', methods=['POST']) @app.route('/process_url', methods=['POST'])
def process_url(): def process_url():
# Opens a file to log the video ID and the assistant's response to see if I can further improve the instructions: # Opens a file to log the video ID and the assistant's response to see if I can further improve the instructions:
#log = open("log.txt", "at", 1) #log = open("log.txt", "at", 1)
url = request.form['url'] url = request.form['url']
if url == "test":
global thread
thread = threading.Thread(name="test_thread", target=fake_stream)
return Response("teehee", status=200)
# Extract the video ID from the URL # Extract the video ID from the URL
video_id = get_video_id(url) # Modify this function to accept the URL video_id = get_video_id(url) # Modify this function to accept the URL
@@ -36,31 +34,31 @@ def process_url():
if (not transcript): if (not transcript):
return "Successfully parsed video ID from URL, however the ID was either invalid, the transcript was disabled by the video owner, or some other error was raised because of YouTube." return "Successfully parsed video ID from URL, however the ID was either invalid, the transcript was disabled by the video owner, or some other error was raised because of YouTube."
thread = threading.Thread(name="create_stream", target=create_and_stream, args=(transcript,)) # The trailing comma is intentional: it makes args a one-element tuple, so the transcript is passed as a single argument instead of the string being iterated character by character.
# Start processing in a separate thread
threading.Thread(target=create_and_stream, args=(transcript,)).start()
# Process the transcript and stream the result.
# response = create_and_stream(transcript)
# log.write(f"\n\n\n### New Entry at {datetime.now(pytz.timezone('America/New_York')).strftime('%Y-%m-%d %H:%M:%S')}\n\n URL: {url}\n Video ID: {video_id}\n\nAssistant Response: \n{response}")
# Return a response
return Response("Processing started. Check /stream_output for updates.", status=200) # Add more detailed output if needed return Response("Processing started. Check /stream_output for updates.", status=200) # Add more detailed output if needed
@app.route('/stream_output') @app.route('/stream_output')
def stream_output(): def stream_output():
def yoink(): def yoink():
while True: print("Starting stream thread.")
with output_lock: thread.start()
if output_buffer: # Start streaming output from output_stream
message = output_buffer.pop(0) print("Starting to stream output...")
yield f"{message}" most_recent = ""
while not output_stream.done:
time.sleep(0.005) # Adjust as necessary for your application if output_stream.buffer != []:
delta = output_stream.buffer.pop(0)
return Response(yoink(), content_type='text/plain') yield bytes(delta, encoding="utf-8")
else:
awaiter(sleep(0.05))
output_stream.reset()
thread.join()
return
return Response(yoink(), content_type='text/plain', status=200)
if __name__ == '__main__': # Change this line to properly check for main if __name__ == '__main__':
app.run(debug=True) app.run(debug=True)

94
main.py
View File

@@ -1,4 +1,3 @@
# To parse video ids # To parse video ids
import re import re
@@ -10,14 +9,45 @@ from youtube_transcript_api.formatters import TextFormatter
# OpenAI API stuff import # OpenAI API stuff import
from openai import AssistantEventHandler from openai import AssistantEventHandler
from openai import OpenAI from openai import OpenAI
# For streaming
### For streaming
from typing_extensions import override from typing_extensions import override
import asyncio
awaiter = asyncio.run
import threading # The StreamOutput class to handle streaming
class StreamOutput:
# Output buffer and thread lock
output_buffer = [] def __init__(self):
output_lock = threading.Lock() self.delta: str = ""
self.response: str = ""
self.done: bool = False
self.buffer: list = []
def reset(self):
self.delta = ""
self.response = ""
self.done = False
self.buffer: list = []
print("Reset stream output obj")
async def send_delta(self, delta):
self.delta = delta
self.response += delta
def get_index(list):
if len(list) == 0:
return 0
else:
return len(list)-1
if self.buffer != []:
try:
if self.delta != self.buffer[get_index(self.buffer)]:
self.buffer.append(delta)
except IndexError as index_error:
print(index_error)
self.buffer.append(delta)
else: self.buffer.append(delta)
# To get the env var # To get the env var
from dotenv import load_dotenv from dotenv import load_dotenv
@@ -25,6 +55,7 @@ import os
load_dotenv() load_dotenv()
### OpenAI Config ### OpenAI Config
# Setting up OpenAI Client with API Key # Setting up OpenAI Client with API Key
@@ -42,28 +73,37 @@ asst_screw_bardo_id = "asst_JGFaX6uOIotqy5mIJnu3Yyp7"
class EventHandler(AssistantEventHandler): class EventHandler(AssistantEventHandler):
@override @override
def on_text_created(self, text) -> None: def on_text_created(self, text) -> None:
with output_lock: awaiter(output_stream.send_delta("Response Recieved:\nScrew-Bardo: "))
output_buffer.append(f"\nassistant > {text}")
@override @override
def on_text_delta(self, delta, snapshot): def on_text_delta(self, delta, snapshot):
with output_lock: awaiter(output_stream.send_delta(delta.value))
output_buffer.append(delta.value)
def on_tool_call_created(self, tool_call): def on_tool_call_created(self, tool_call):
with output_lock: raise Exception("Assistant shouldn't be calling tools.")
output_buffer.append(f"\nassistant > {tool_call.type}\n")
def create_and_stream(transcript): def create_and_stream(transcript):
with client.beta.threads.create_and_run_stream( with client.beta.threads.create_and_run_stream(
assistant_id=asst_screw_bardo_id, assistant_id=asst_screw_bardo_id,
thread={ thread={
"messages": [{"role": "user", "content": transcript}] "messages": [{"role": "user", "content": transcript}]
}, },
event_handler=EventHandler() event_handler=EventHandler()
) as stream: ) as stream:
stream.until_done() stream.until_done()
output_stream.done = True
def fake_stream():
i = 0
STREAM = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 18]
print("Starting fake stream.")
while i <= len(STREAM)-1:
awaiter(asyncio.sleep(0.05))
awaiter(output_stream.send_delta(str(STREAM[i])))
i += 1
output_stream.done = True
return
def get_video_id(url): def get_video_id(url):
youtu_be = r'(?<=youtu.be/)([A-Za-z0-9_-]{11})' youtu_be = r'(?<=youtu.be/)([A-Za-z0-9_-]{11})'
youtube_com = r'(?<=youtube\.com\/watch\?v=)([A-Za-z0-9_-]{11})' youtube_com = r'(?<=youtube\.com\/watch\?v=)([A-Za-z0-9_-]{11})'
@@ -91,12 +131,4 @@ def get_auto_transcript(video_id):
txt_transcript = formatter.format_transcript(transcript) txt_transcript = formatter.format_transcript(transcript)
return txt_transcript return txt_transcript
output_stream = StreamOutput()
# Stores the video ID entered by the user
"""
video_id = get_video_id()
transcript = get_auto_transcript(video_id)
create_and_stream(transcript)
"""

View File

@@ -1,6 +1,7 @@
document.addEventListener("DOMContentLoaded", (event) => { document.addEventListener("DOMContentLoaded", (event) => {
const response_area = document.getElementById('response-area'); const response_area = document.getElementById('response-area');
document.getElementById('submit').addEventListener('click', function() { const submit_button = document.getElementById('submit')
submit_button.addEventListener('click', function() {
var url = document.getElementById('url_box').value; var url = document.getElementById('url_box').value;
if (!url) { if (!url) {
@@ -21,11 +22,13 @@ document.addEventListener("DOMContentLoaded", (event) => {
throw new Error('Network response was not ok'); throw new Error('Network response was not ok');
} }
// Start streaming once processing is started // Start streaming once processing is started
submit_button.style.display = "none";
streamOutput(response_area); streamOutput(response_area);
}) })
.catch(error => { .catch(error => {
console.error('Error processing URL:', error); console.error('Error processing URL:', error);
response_area.innerText = 'Error processing URL: ' + error.message; response_area.innerText = 'Error processing URL: ' + error.message;
submit_button.style.display = "flex";
}); });
}); });
}); });
@@ -42,11 +45,10 @@ function streamOutput(response_area) {
function readStream() { function readStream() {
reader.read().then(({ done, value }) => { reader.read().then(({ done, value }) => {
if (done) { if(done) {
console.log("Stream finished."); document.getElementById('submit').style.display = "flex";
return; return
} }
// Decode and process the chunk // Decode and process the chunk
const chunk = decoder.decode(value, { stream: true }); const chunk = decoder.decode(value, { stream: true });
response_area.innerHTML += chunk; response_area.innerHTML += chunk;

View File

@@ -37,10 +37,10 @@ body .content {
} }
#response-area { #response-area {
display: block; display: flex;
height: 90%; height: 90%;
min-height: 90vh; min-height: 90vh;
overflow: auto; text-wrap: wrap;
flex-wrap: wrap; flex-wrap: wrap;
align-content: flex-end; align-content: flex-end;
} }