"""Fetch a YouTube transcript and stream an OpenAI Assistant reply to it.

Flow: ``process(url)`` parses the video id, fetches the transcript and
prepares (but does not start) a worker thread running
``create_and_stream``.  The caller then iterates ``yoink(thread)``, which
starts the worker and yields the reply as UTF-8 byte chunks while the
worker pushes text deltas into the module-level ``output_stream``.
"""

import re
import threading
import asyncio
import time
from asyncio import sleep
from typing import Iterator, Optional, Tuple
from typing_extensions import override
from datetime import datetime
import pytz
import os
import logging

# Youtube Transcript imports
import youtube_transcript_api._errors
from youtube_transcript_api import YouTubeTranscriptApi
from youtube_transcript_api.formatters import TextFormatter

# OpenAI API imports
from openai import AssistantEventHandler
from openai import OpenAI

# Load environment variables
from dotenv import load_dotenv

load_dotenv()

# Handle async outside of async functions
awaiter = asyncio.run

# Configure logging
# basicConfig opens the log file immediately, so the directory must exist.
os.makedirs('./logs', exist_ok=True)
logging.basicConfig(
    filename='./logs/main.log',
    level=logging.INFO,
    format='%(asctime)s %(levelname)s: %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)


# The StreamOutput class to handle streaming
class StreamOutput:
    """Hand-off buffer between the OpenAI worker thread and ``yoink``.

    Attributes:
        delta: The most recent delta received.
        response: Concatenation of every delta received since the last reset.
        done: Set to True by the producer once no more deltas will arrive.
        buffer: Deltas received but not yet consumed, oldest first.
    """

    def __init__(self):
        self.delta: str = ""
        self.response: str = ""
        self.done: bool = False
        self.buffer: list = []

    def reset(self):
        """Return the object to its freshly-constructed state."""
        self.delta = ""
        self.response = ""
        self.done = False
        self.buffer = []

    def send_delta(self, delta):
        """Record ``delta`` synchronously (called from the worker thread).

        Recording never awaits anything, so it is done directly rather than
        by spinning up an event loop per delta.
        """
        self._record_delta(delta)

    async def process_delta(self, delta):
        """Coroutine form of ``send_delta``, kept for existing callers."""
        self._record_delta(delta)

    def _record_delta(self, delta):
        """Append ``delta`` to ``response`` and queue it for the consumer.

        NOTE(review): a delta equal to the newest *unconsumed* buffer entry
        is not queued (it is still added to ``response``).  This is the
        original behaviour, presumably guarding against duplicated events;
        it can also drop a legitimately repeated token - confirm intent.
        """
        self.delta = delta
        self.response += delta

        try:
            is_repeat = bool(self.buffer) and delta == self.buffer[-1]
        except IndexError as index_error:
            # The consumer can pop the last item between the emptiness
            # check and the index access.
            logging.error(f"Caught IndexError: {str(index_error)}")
            is_repeat = False

        if not is_repeat:
            self.buffer.append(delta)


# OpenAI Config

# Setting up OpenAI Client with API Key
client = OpenAI(
    organization='org-7ANUFsqOVIXLLNju8Rvmxu3h',
    project="proj_NGz8Kux8CSka7DRJucAlDCz6",
    api_key=os.getenv("OPENAI_API_KEY")
)

# Screw Bardo Assistant ID
asst_screw_bardo_id = "asst_JGFaX6uOIotqy5mIJnu3Yyp7"


# Event Handler for OpenAI Assistant
class EventHandler(AssistantEventHandler):
    """Forward assistant text events into the module-level ``output_stream``."""

    @override
    def on_text_created(self, text) -> None:
        """Emit the fixed response header when the assistant starts a text."""
        output_stream.send_delta("Response Received:\n\nScrew-Bardo:\n\n")
        logging.info("Text created event handled.")

    @override
    def on_text_delta(self, delta, snapshot):
        """Queue each incremental piece of assistant text."""
        output_stream.send_delta(delta.value)
        logging.debug(f"Text delta received: {delta.value}")

    @override
    def on_tool_call_created(self, tool_call):
        """Reject tool calls; this assistant is expected to produce text only.

        Raises:
            Exception: Always.
        """
        error_msg = "Assistant shouldn't be calling tools."
        logging.error(error_msg)
        raise Exception(error_msg)


def create_and_stream(transcript):
    """Run the assistant on ``transcript`` and stream its reply (thread target).

    Deltas are delivered to ``output_stream`` through ``EventHandler``.
    Exceptions are logged, not propagated.  ``output_stream.done`` is set
    on every exit path so the consumer in ``yoink`` can never wait forever.
    """
    logging.info("Starting OpenAI stream thread.")
    try:
        with client.beta.threads.create_and_run_stream(
            assistant_id=asst_screw_bardo_id,
            thread={
                "messages": [{"role": "user", "content": transcript}]
            },
            event_handler=EventHandler()
        ) as stream:
            stream.until_done()
        logging.info("OpenAI stream completed.")
    except Exception:
        logging.exception("Exception occurred during create_and_stream.")
    finally:
        # Must also happen on failure, otherwise yoink() polls indefinitely.
        output_stream.done = True


def yoink(thread: threading.Thread) -> Iterator[bytes]:
    """Start ``thread`` and yield its output as UTF-8 encoded chunks.

    Args:
        thread: An unstarted thread whose target feeds ``output_stream``
            (as returned by ``process``).

    Yields:
        Each queued delta, encoded as UTF-8 bytes, in arrival order.
    """
    logging.info("Starting stream thread...")
    thread.start()
    logging.info("Stream thread started. Beginning to stream output.")

    try:
        while True:
            # Read the flag BEFORE draining: the producer appends its last
            # deltas and only then sets done, so a drain that follows a
            # True reading is guaranteed to see everything.
            finished = output_stream.done
            while output_stream.buffer:
                delta = output_stream.buffer.pop(0)
                yield bytes(delta, encoding="utf-8")
            if finished:
                break
            # Plain blocking sleep; no event loop is needed for polling.
            time.sleep(0.018)
    except Exception:
        logging.exception("Exception occurred during streaming output.")
    finally:
        logging.info("Stream completed successfully.")
        # Join before touching shared state so a still-running worker
        # cannot write into an already-reset output_stream.
        thread.join()
        logging.info(f"Completed Assistant Response:\n{output_stream.response}")
        output_stream.reset()
        logging.info("Stream thread joined. Task completed successfully without errors.")


def process(url) -> Tuple[bool, Optional[str], Optional[int], Optional[threading.Thread]]:
    """Validate ``url``, fetch its transcript and prepare the stream thread.

    Args:
        url: A youtube.com/watch or youtu.be video URL.

    Returns:
        ``(True, None, None, thread)`` on success, where ``thread`` is an
        unstarted thread to pass to ``yoink``; otherwise
        ``(False, message, status_code, None)``.
    """
    current_time = datetime.now(pytz.timezone('America/New_York')).strftime('%Y-%m-%d %H:%M:%S')
    logging.info(f"New Entry at {current_time}")
    logging.info(f"URL: {url}")

    video_id = get_video_id(url)
    if not video_id:
        logging.warning(f"Could not parse video id from URL: {url}")
        return (False, "Couldn't parse video ID from URL. (Are you sure you entered a valid YouTube.com or YouTu.be URL?)", 400, None)
    logging.info(f"Parsed Video ID: {video_id}")

    # Get the transcript for that video ID
    transcript = get_auto_transcript(video_id)
    if not transcript:
        logging.error("Error: could not retrieve transcript. Assistant won't be called.")
        return (False, "Successfully parsed video ID from URL, however the ID was either invalid, the transcript was disabled by the video owner, or some other error was raised because of YouTube.", 200, None)

    thread = threading.Thread(
        name="create_stream",
        target=create_and_stream,
        args=(transcript,)
    )
    logging.info("Stream preparation complete, sending reply.")
    return (True, None, None, thread)


def get_video_id(url) -> Optional[str]:
    """Extract the 11-character video id from a YouTube URL.

    Recognises ``youtu.be/<id>`` and ``youtube.com/watch?v=<id>``.

    Returns:
        The video id, or None when neither form is found in ``url``.
    """
    # Dots are escaped so that only a literal "youtu.be" host matches.
    youtu_be = r'(?<=youtu\.be/)([A-Za-z0-9_-]{11})'
    youtube_com = r'(?<=youtube\.com\/watch\?v=)([A-Za-z0-9_-]{11})'

    id_match = re.search(youtu_be, url) or re.search(youtube_com, url)
    if not id_match:
        # Couldn't parse video ID from URL
        logging.warning(f"Failed to parse video ID from URL: {url}")
        return None

    return id_match.group(1)


def get_auto_transcript(video_id) -> Optional[str]:
    """Fetch the English transcript of ``video_id`` as plain text.

    Returns:
        The formatted transcript, or None when the transcript library
        reports that it could not be retrieved (disabled, missing,
        video unavailable, ...).
    """
    trans_api_errors = youtube_transcript_api._errors
    try:
        transcript = YouTubeTranscriptApi.get_transcript(video_id, languages=['en'], proxies=None, cookies=None, preserve_formatting=False)
    except trans_api_errors.CouldNotRetrieveTranscript as e:
        # Base class of TranscriptsDisabled, NoTranscriptFound,
        # VideoUnavailable, etc.; process() turns None into an error reply.
        logging.exception(f"Exception while fetching transcript: {e}")
        return None

    formatter = TextFormatter()  # Ensure that you create an instance of TextFormatter
    txt_transcript = formatter.format_transcript(transcript)
    logging.info("Transcript successfully retrieved and formatted.")
    return txt_transcript


# Initialize output stream
# NOTE(review): a single module-level instance is shared by every request;
# overlapping requests would interleave their output - confirm usage.
output_stream = StreamOutput()

logging.info(f"Main initialized at {datetime.now(pytz.timezone('America/New_York')).strftime('%Y-%m-%d %H:%M:%S')}. Presumably application starting.")