I don't remember struggling so much with git...

This commit is contained in:
ForeverPyrite
2025-01-07 22:33:31 -05:00
parent 66fe18ec46
commit 1432d7aca1
3 changed files with 213 additions and 102 deletions

2
.gitignore vendored
View File

@@ -1,6 +1,6 @@
log.md
.env
.vscode
.vscode/
# Byte-compiled / optimized / DLL files
__pycache__/

3
.vscode/launch.json vendored
View File

@@ -8,9 +8,10 @@
"name": "Python Debugger: Flask",
"type": "debugpy",
"request": "launch",
"cwd": "./app",
"module": "flask",
"env": {
"FLASK_APP": "app.py",
"FLASK_APP": "./app.py",
"FLASK_DEBUG": "1"
},
"args": [

View File

@@ -1,134 +1,244 @@
# To parse video ids
import re
import threading
import asyncio
from asyncio import sleep
from typing_extensions import override
from datetime import datetime
import pytz
import os
import logging
import uuid
# Youtube Transcript stuff import
# Youtube Transcript imports
import youtube_transcript_api._errors
from youtube_transcript_api import YouTubeTranscriptApi
from youtube_transcript_api.formatters import TextFormatter
# OpenAI API stuff import
# OpenAI API imports
from openai import AssistantEventHandler
from openai import OpenAI
### For streaming
from typing_extensions import override
import asyncio
# Load environment variables
from dotenv import load_dotenv
load_dotenv()
# Initialize user stream dictionary
user_streams = {}
# Threading lock for thread safe stuff I think, idk it was used in the docs
stream_lock = threading.Lock()
# Handle async outside of async functions
awaiter = asyncio.run
# Configure logging
try:
logging.basicConfig(
filename='./logs/main.log',
level=logging.INFO,
format='%(asctime)s %(levelname)s: %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
except FileNotFoundError as e:
with open("./logs/main.log", "x"):
pass
logging.basicConfig(
filename='./logs/main.log',
level=logging.INFO,
format='%(asctime)s %(levelname)s: %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
logging.info(f"No main.log file was found ({e}), so one was created.")
# The StreamOutput class to handle streaming
class StreamOutput:
def __init__(self):
self.delta: str = ""
self.response: str = ""
self.done: bool = False
self.buffer: list = []
def reset(self):
self.delta = ""
self.response = ""
self.done = False
self.buffer: list = []
async def send_delta(self, delta):
self.delta = delta
self.response += delta
def get_index(list):
if len(list) == 0:
return 0
def __init__(self):
self.delta: str = ""
self.response: str = ""
self.done: bool = False
self.buffer: list = []
def reset(self):
self.delta = ""
self.response = ""
self.done = False
self.buffer = []
def send_delta(self, delta):
awaiter(self.process_delta(delta))
async def process_delta(self, delta):
self.delta = delta
self.response += delta
def get_index(lst):
if len(lst) == 0:
return 0
else:
return len(lst) - 1
if self.buffer:
try:
if self.delta != self.buffer[get_index(self.buffer)]:
self.buffer.append(delta)
except IndexError as index_error:
logging.error(f"Caught IndexError: {str(index_error)}")
self.buffer.append(delta)
else:
return len(list)-1
if self.buffer != []:
try:
if self.delta != self.buffer[get_index(self.buffer)]:
self.buffer.append(delta)
except IndexError as index_error:
log(f"\nCaught IndexError: {str(index_error)}")
self.buffer.append(delta)
else: self.buffer.append(delta)
return
# To get the env var
from dotenv import load_dotenv
import os
load_dotenv()
# For logging
import pytz
from datetime import datetime
def log(message):
try:
with open("logs/log.md", "a") as file:
file.write(message)
except FileNotFoundError:
with open("logs/log.md", "x+"):
log(message)
### OpenAI Config
# OpenAI Config
# Setting up OpenAI Client with API Key
client = OpenAI(
organization='org-7ANUFsqOVIXLLNju8Rvmxu3h',
project="proj_NGz8Kux8CSka7DRJucAlDCz6",
api_key=os.getenv("OPENAI_API_KEY")
organization='org-7ANUFsqOVIXLLNju8Rvmxu3h',
project="proj_NGz8Kux8CSka7DRJucAlDCz6",
api_key=os.getenv("OPENAI_API_KEY")
)
# screw bardo assistant that is configured to make notes and 5Q&A based on any given YouTube Transcript
# Screw Bardo Assistant ID
asst_screw_bardo_id = "asst_JGFaX6uOIotqy5mIJnu3Yyp7"
# This is copy and pasted straight up from the quickstart guide, just appending to an output buffer instead of directly printing:
class EventHandler(AssistantEventHandler):
@override
def on_text_created(self, text) -> None:
awaiter(output_stream.send_delta("Response Recieved:\n\nScrew-Bardo:\n\n"))
# Event Handler for OpenAI Assistant
class EventHandler(AssistantEventHandler):
def __init__(self, output_stream: StreamOutput):
super().__init__()
self.output_stream = output_stream
@override
def on_text_delta(self, delta, snapshot):
awaiter(output_stream.send_delta(delta.value))
@override
def on_text_created(self, text) -> None:
self.output_stream.send_delta("Response Received:\n\nScrew-Bardo:\n\n")
logging.info("Text created event handled.")
def on_tool_call_created(self, tool_call):
raise Exception("Assistant shouldn't be calling tools.")
@override
def on_text_delta(self, delta, snapshot):
self.output_stream.send_delta(delta.value)
logging.debug(f"Text delta received: {delta.value}")
def create_and_stream(transcript):
with client.beta.threads.create_and_run_stream(
assistant_id=asst_screw_bardo_id,
thread={
"messages": [{"role": "user", "content": transcript}]
},
event_handler=EventHandler()
) as stream:
stream.until_done()
output_stream.done = True
def on_tool_call_created(self, tool_call):
error_msg = "Assistant shouldn't be calling tools."
logging.error(error_msg)
raise Exception(error_msg)
def create_and_stream(transcript, session_id):
logging.info(f"Starting OpenAI stream thread for session {session_id}.")
event_handler = EventHandler(user_streams[session_id]['output_stream'])
try:
with client.beta.threads.create_and_run_stream(
assistant_id=asst_screw_bardo_id,
thread={
"messages": [{"role": "user", "content": transcript}]
},
event_handler=event_handler
) as stream:
stream.until_done()
with stream_lock:
user_streams[session_id]['output_stream'].done = True
logging.info(f"OpenAI stream completed for session {session_id}.")
except Exception as e:
logging.exception(f"Exception occurred during create_and_stream for session {session_id}.")
def yoink(session_id):
logging.info(f"Starting stream for session {session_id}...")
with stream_lock:
user_data = user_streams.get(session_id)
if not user_data:
logging.critical(f"User data not found for session id {session_id}?")
return # Session might have ended
output_stream: StreamOutput = user_data.get('output_stream')
thread: threading.Thread = user_data.get('thread')
thread.start()
while True:
if not output_stream or not thread:
logging.error(f"No output stream/thread for session {session_id}.\nThread: {thread.name if thread else "None"}")
break
if output_stream.done and not output_stream.buffer:
break
try:
if output_stream.buffer:
delta = output_stream.buffer.pop(0)
yield bytes(delta, encoding="utf-8")
else:
asyncio.run(sleep(0.018))
except Exception as e:
logging.exception(f"Exception occurred during streaming for session {session_id}: {e}")
break
logging.info(f"Stream completed successfully for session {session_id}.")
logging.info(f"Completed Assistant Response for session {session_id}:\n{output_stream.response}")
with stream_lock:
thread.join()
del user_streams[session_id]
logging.info(f"Stream thread joined and resources cleaned up for session {session_id}.")
def process(url, session_id):
# Should initialize the key in the dictionary
current_time = datetime.now(pytz.timezone('America/New_York')).strftime('%Y-%m-%d %H:%M:%S')
logging.info(f"New Entry at {current_time} for session {session_id}")
logging.info(f"URL: {url}")
video_id = get_video_id(url)
if not video_id:
logging.warning(f"Could not parse video id from URL: {url}")
return (False, "Couldn't parse video ID from URL. (Are you sure you entered a valid YouTube.com or YouTu.be URL?)", 400)
logging.info(f"Parsed Video ID: {video_id}")
# Get the transcript for that video ID
transcript = get_auto_transcript(video_id)
if not transcript:
logging.error(f"Error: could not retrieve transcript for session {session_id}. Assistant won't be called.")
return (False, "Successfully parsed video ID from URL, however the ID was either invalid, the transcript was disabled by the video owner, or some other error was raised because of YouTube.", 200)
user_streams[session_id] = {
'output_stream': None, # Ensure output_stream is per user
'thread': None
}
# Create a new StreamOutput for the session
with stream_lock:
user_streams[session_id]['output_stream'] = StreamOutput()
thread = threading.Thread(
name=f"create_stream_{session_id}",
target=create_and_stream,
args=(transcript, session_id)
)
user_streams[session_id]['thread'] = thread
logging.info(f"Stream preparation complete for session {session_id}, sending reply.")
return (True, None, None)
def get_video_id(url):
youtu_be = r'(?<=youtu.be/)([A-Za-z0-9_-]{11})'
youtube_com = r'(?<=youtube\.com\/watch\?v=)([A-Za-z0-9_-]{11})'
id = re.search(youtu_be, url)
if not id:
id = re.search(youtube_com, url)
if not id:
# Couldn't parse video ID from URL
return None
return id.group(1)
youtu_be = r'(?<=youtu.be/)([A-Za-z0-9_-]{11})'
youtube_com = r'(?<=youtube\.com\/watch\?v=)([A-Za-z0-9_-]{11})'
id_match = re.search(youtu_be, url)
if not id_match:
id_match = re.search(youtube_com, url)
if not id_match:
# Couldn't parse video ID from URL
logging.warning(f"Failed to parse video ID from URL: {url}")
return None
return id_match.group(1)
# Takes the transcript and formats it in basic text before writing it to auto-transcript.txt
def get_auto_transcript(video_id):
trans_api_errors = youtube_transcript_api._errors
try:
transcript = YouTubeTranscriptApi.get_transcript(video_id, languages=['en'], proxies=None, cookies=None, preserve_formatting=False)
except trans_api_errors.TranscriptsDisabled as e:
log(f'\n\n# Exception while fetching transcript:\n \n{e}\n')
return None
formatter = TextFormatter() # Ensure that you create an instance of TextFormatter
trans_api_errors = youtube_transcript_api._errors
try:
transcript = YouTubeTranscriptApi.get_transcript(video_id, languages=['en'], proxies=None, cookies=None, preserve_formatting=False)
except trans_api_errors.TranscriptsDisabled as e:
logging.exception(f"Exception while fetching transcript: {e}")
return None
txt_transcript = formatter.format_transcript(transcript)
return txt_transcript
formatter = TextFormatter() # Ensure that you create an instance of TextFormatter
txt_transcript = formatter.format_transcript(transcript)
logging.info("Transcript successfully retrieved and formatted.")
return txt_transcript
# Initialize output stream
output_stream = StreamOutput()
log(f"\n\n# Main initilized at {datetime.now(pytz.timezone('America/New_York')).strftime('%Y-%m-%d %H:%M:%S')}. Presumeably application starting.\n")
logging.info(f"Main initialized at {datetime.now(pytz.timezone('America/New_York')).strftime('%Y-%m-%d %H:%M:%S')}. Presumably application starting.")