lets fucking go
.vscode/launch.json (vendored): 3 lines changed
@@ -8,9 +8,10 @@
             "name": "Python Debugger: Flask",
             "type": "debugpy",
             "request": "launch",
+            "cwd": "./app",
             "module": "flask",
             "env": {
-                "FLASK_APP": "./app/app.py",
+                "FLASK_APP": "./app.py",
                 "FLASK_DEBUG": "1"
             },
             "args": [
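Note on how these two settings interact: "cwd": "./app" makes the debugger launch python -m flask from the app directory, and Flask resolves FLASK_APP relative to that working directory, which is why the shorter "./app.py" path is the one that matches.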
app/app.py: 56 lines changed
@@ -1,40 +1,68 @@
 import logging
-from flask import Flask, render_template, Response, request
-from main import yoink, process
+import os
+from flask import Flask, render_template, Response, request, session
+from main import yoink, process, user_streams, stream_lock
+import uuid  # Import UUID

 app = Flask(__name__, static_folder="website/static", template_folder="website")
+app.secret_key = os.urandom(24)  # Necessary for using sessions

 # Configure logging
 logging.basicConfig(
     filename='./logs/app.log',
-    level=logging.INFO,
+    level=logging.DEBUG,
     format='%(asctime)s %(levelname)s: %(message)s',
     datefmt='%Y-%m-%d %H:%M:%S'
 )

+def create_session():
+    session_id = str(uuid.uuid4())
+    # This should never happen but I'm putting the logic there anyways
+    try:
+        if user_streams[session_id]:
+            session_id = create_session()
+    except KeyError:
+        pass
+    return session_id
+

 @app.route('/')
 def home():
-    logging.info("Home page accessed.")
-    return render_template('index.html')
+    session_id = create_session()
+    session['id'] = session_id
+    logging.info(f"Home page accessed. Assigned initial session ID: {session_id}")
+    return render_template('index.html', session_id=session_id)

 @app.route('/process_url', methods=['POST'])
 def process_url():
-    global most_recent_thread
+    session_id = session.get('id')
+    if not session_id:
+        session_id = create_session()
+        session['id'] = session_id
+        logging.info(f"No existing session. Created new session ID: {session_id}")

     url = request.form['url']
-    logging.info(f"Received URL for processing: {url}")
-    success, msg, status_code, most_recent_thread = process(url)
+    logging.info(f"Received URL for processing from session {session_id}: {url}")
+    success, msg, status_code = process(url, session_id)

     if success:
-        logging.info("Processing started successfully.")
+        logging.info(f"Processing started successfully for session {session_id}.")
         return Response("Processing started. Check /stream_output for updates.", content_type='text/plain', status=200)
     else:
-        logging.error(f"Processing failed: {msg}")
+        logging.error(f"Processing failed for session {session_id}: {msg}")
         return Response(msg, content_type='text/plain', status=status_code)

 @app.route('/stream_output')
 def stream_output():
-    logging.info("Streaming output requested.")
-    return Response(yoink(most_recent_thread), content_type='text/plain', status=200)
+    session_id = session.get('id')
+    if not session_id or session_id not in user_streams:
+        logging.warning(f"Stream requested without a valid session ID: {session_id}")
+        return Response("No active stream for this session.", content_type='text/plain', status=400)
+    logging.info(f"Streaming output requested for session {session_id}.")
+    return Response(yoink(session_id), content_type='text/plain', status=200)

 if __name__ == '__main__':
     logging.info("Starting Flask application.")
-    app.run(debug=True)
+    app.run(debug=True, threaded=True)  # Enable threaded to handle multiple requests
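A side note on app.secret_key = os.urandom(24): the key is regenerated on every start, so session cookies issued before a restart stop validating. A minimal sketch of a more stable setup, assuming a SECRET_KEY environment variable that this repository does not currently define:

import os
from flask import Flask

app = Flask(__name__, static_folder="website/static", template_folder="website")
# Prefer a key supplied via the environment; fall back to a random one for local dev.
app.secret_key = os.environ.get("SECRET_KEY") or os.urandom(24)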
app/main.py: 131 lines changed
@@ -7,6 +7,7 @@ from datetime import datetime
 import pytz
 import os
 import logging
+import uuid

 # Youtube Transcript imports
 import youtube_transcript_api._errors
@@ -21,16 +22,34 @@ from openai import OpenAI
 from dotenv import load_dotenv
 load_dotenv()

+# Initialize user stream dictionary
+user_streams = {}
+
+# Threading lock for thread safe stuff I think, idk it was used in the docs
+stream_lock = threading.Lock()
+
 # Handle async outside of async functions
 awaiter = asyncio.run

 # Configure logging
-logging.basicConfig(
-    filename='./logs/main.log',
-    level=logging.INFO,
-    format='%(asctime)s %(levelname)s: %(message)s',
-    datefmt='%Y-%m-%d %H:%M:%S'
-)
+try:
+    logging.basicConfig(
+        filename='./logs/main.log',
+        level=logging.INFO,
+        format='%(asctime)s %(levelname)s: %(message)s',
+        datefmt='%Y-%m-%d %H:%M:%S'
+    )
+except FileNotFoundError as e:
+    with open("./logs/main.log", "x"):
+        pass
+    logging.basicConfig(
+        filename='./logs/main.log',
+        level=logging.INFO,
+        format='%(asctime)s %(levelname)s: %(message)s',
+        datefmt='%Y-%m-%d %H:%M:%S'
+    )
+    logging.info(f"No main.log file was found ({e}), so one was created.")

 # The StreamOutput class to handle streaming
 class StreamOutput:
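One caveat on the try/except block above: logging.basicConfig only raises FileNotFoundError when the ./logs directory itself is missing, and in that case open("./logs/main.log", "x") in the except branch fails with the same error. Creating the directory up front covers both cases; a minimal sketch, assuming the same log location:

import logging
import os

# Ensure the log directory exists before the file handler tries to open it.
os.makedirs("./logs", exist_ok=True)
logging.basicConfig(
    filename='./logs/main.log',
    level=logging.INFO,
    format='%(asctime)s %(levelname)s: %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)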
@@ -84,14 +103,19 @@ asst_screw_bardo_id = "asst_JGFaX6uOIotqy5mIJnu3Yyp7"
 # Event Handler for OpenAI Assistant
 class EventHandler(AssistantEventHandler):

+    def __init__(self, output_stream: StreamOutput):
+        super().__init__()
+        self.output_stream = output_stream
+
     @override
     def on_text_created(self, text) -> None:
-        output_stream.send_delta("Response Received:\n\nScrew-Bardo:\n\n")
+        self.output_stream.send_delta("Response Received:\n\nScrew-Bardo:\n\n")
         logging.info("Text created event handled.")

     @override
     def on_text_delta(self, delta, snapshot):
-        output_stream.send_delta(delta.value)
+        self.output_stream.send_delta(delta.value)
         logging.debug(f"Text delta received: {delta.value}")

     def on_tool_call_created(self, tool_call):
@@ -99,67 +123,92 @@ class EventHandler(AssistantEventHandler):
         logging.error(error_msg)
         raise Exception(error_msg)

-def create_and_stream(transcript):
-    logging.info("Starting OpenAI stream thread.")
+def create_and_stream(transcript, session_id):
+    logging.info(f"Starting OpenAI stream thread for session {session_id}.")
+    event_handler = EventHandler(user_streams[session_id]['output_stream'])
     try:
         with client.beta.threads.create_and_run_stream(
             assistant_id=asst_screw_bardo_id,
             thread={
                 "messages": [{"role": "user", "content": transcript}]
             },
-            event_handler=EventHandler()
+            event_handler=event_handler
         ) as stream:
             stream.until_done()
-        output_stream.done = True
-        logging.info("OpenAI stream completed.")
+        with stream_lock:
+            user_streams[session_id]['output_stream'].done = True
+        logging.info(f"OpenAI stream completed for session {session_id}.")
     except Exception as e:
-        logging.exception("Exception occurred during create_and_stream.")
+        logging.exception(f"Exception occurred during create_and_stream for session {session_id}.")

-def yoink(thread: threading.Thread):
-    logging.info("Starting stream thread...")
-    thread.start()
-    logging.info("Stream thread started. Beginning to stream output.")
-
-    try:
-        while not output_stream.done:
-            try:
+def yoink(session_id):
+    logging.info(f"Starting stream for session {session_id}...")
+    with stream_lock:
+        user_data = user_streams.get(session_id)
+        if not user_data:
+            logging.critical(f"User data not found for session id {session_id}?")
+            return  # Session might have ended
+        output_stream: StreamOutput = user_data.get('output_stream')
+        thread: threading.Thread = user_data.get('thread')
+    thread.start()
+    while True:
+        try:
+            if not output_stream or not thread:
+                logging.error(f"No output stream/thread for session {session_id}.\nThread: {thread.name if thread else 'None'}")
+                break
+
+            if output_stream.done and not output_stream.buffer:
+                break
+
             if output_stream.buffer:
                 delta = output_stream.buffer.pop(0)
                 yield bytes(delta, encoding="utf-8")
             else:
                 asyncio.run(sleep(0.018))
-            except Exception as e:
-                logging.exception("Exception occurred during streaming output.")
-    finally:
-        logging.info("Stream completed successfully.")
-        logging.info(f"Completed Assistant Response:\n{output_stream.response}")
-        output_stream.reset()
-        thread.join()
-        logging.info("Stream thread joined. Task completed successfully without errors.")
+        except Exception as e:
+            logging.exception(f"Exception occurred during streaming for session {session_id}: {e}")
+            break
+
+    logging.info(f"Stream completed successfully for session {session_id}.")
+    logging.info(f"Completed Assistant Response for session {session_id}:\n{output_stream.response}")
+    with stream_lock:
+        thread.join()
+        del user_streams[session_id]
+    logging.info(f"Stream thread joined and resources cleaned up for session {session_id}.")

-def process(url):
+def process(url, session_id):
+    # Should initialize the key in the dictionary
     current_time = datetime.now(pytz.timezone('America/New_York')).strftime('%Y-%m-%d %H:%M:%S')
-    logging.info(f"New Entry at {current_time}")
+    logging.info(f"New Entry at {current_time} for session {session_id}")
     logging.info(f"URL: {url}")

     video_id = get_video_id(url)
     if not video_id:
         logging.warning(f"Could not parse video id from URL: {url}")
-        return (False, "Couldn't parse video ID from URL. (Are you sure you entered a valid YouTube.com or YouTu.be URL?)", 400, None)
+        return (False, "Couldn't parse video ID from URL. (Are you sure you entered a valid YouTube.com or YouTu.be URL?)", 400)
     logging.info(f"Parsed Video ID: {video_id}")

     # Get the transcript for that video ID
     transcript = get_auto_transcript(video_id)
     if not transcript:
-        logging.error("Error: could not retrieve transcript. Assistant won't be called.")
-        return (False, "Successfully parsed video ID from URL, however the ID was either invalid, the transcript was disabled by the video owner, or some other error was raised because of YouTube.", 200, None)
-
-    thread = threading.Thread(
-        name="create_stream",
-        target=create_and_stream,
-        args=(transcript,)
-    )
-    logging.info("Stream preparation complete, sending reply.")
-    return (True, None, None, thread)
+        logging.error(f"Error: could not retrieve transcript for session {session_id}. Assistant won't be called.")
+        return (False, "Successfully parsed video ID from URL, however the ID was either invalid, the transcript was disabled by the video owner, or some other error was raised because of YouTube.", 200)
+
+    user_streams[session_id] = {
+        'output_stream': None,  # Ensure output_stream is per user
+        'thread': None
+    }
+    # Create a new StreamOutput for the session
+    with stream_lock:
+        user_streams[session_id]['output_stream'] = StreamOutput()
+    thread = threading.Thread(
+        name=f"create_stream_{session_id}",
+        target=create_and_stream,
+        args=(transcript, session_id)
+    )
+    user_streams[session_id]['thread'] = thread
+    logging.info(f"Stream preparation complete for session {session_id}, sending reply.")
+    return (True, None, None)

 def get_video_id(url):
     youtu_be = r'(?<=youtu.be/)([A-Za-z0-9_-]{11})'
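For context on the streaming loop in yoink: it polls output_stream.buffer and parks for about 18 ms per empty pass (asyncio.run(sleep(0.018)) is effectively a blocking sleep). A blocking queue gives the same producer/consumer hand-off without polling; the sketch below uses made-up names (worker, stream, chunks) and is not part of the repository:

import queue
import threading

def worker(chunks):
    # Producer: push text deltas, then a None sentinel when finished.
    for part in ("Hello ", "from ", "the ", "assistant"):
        chunks.put(part)
    chunks.put(None)

def stream():
    # Consumer: a generator suitable for a streaming Flask Response.
    chunks = queue.Queue()
    threading.Thread(target=worker, args=(chunks,), daemon=True).start()
    while True:
        part = chunks.get()   # blocks until the producer sends something
        if part is None:      # sentinel: producer finished
            break
        yield part.encode("utf-8")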
requirements.txt (BIN)
Binary file not shown.