"""Fetch YouTube transcripts and stream an OpenAI Assistant reply into a shared buffer."""

import asyncio
import os
import re
import threading
from datetime import datetime
from typing import Optional

import pytz
from dotenv import load_dotenv
from openai import AssistantEventHandler, OpenAI
from typing_extensions import override
from youtube_transcript_api import YouTubeTranscriptApi, _errors
from youtube_transcript_api.formatters import TextFormatter

# Load environment variables (must run before the OpenAI client reads the API key).
load_dotenv()


# For logging
def log(message: str) -> None:
    """Append a timestamped line to ``logs/log.md``.

    The timestamp is rendered in the America/New_York timezone. The ``logs``
    directory is created on demand so that logging from an error handler
    cannot itself fail with ``FileNotFoundError``.

    Args:
        message: Text to record after the timestamp.
    """
    timestamp = datetime.now(pytz.timezone('America/New_York')).strftime('%Y-%m-%d %H:%M:%S')
    os.makedirs("logs", exist_ok=True)
    # Explicit encoding: messages may carry non-ASCII text from exceptions.
    with open("logs/log.md", "a", encoding="utf-8") as file:
        file.write(f"{timestamp} - {message}\n")


# StreamOutput class to handle streaming
class StreamOutput:
    """Lock-protected accumulator for streamed response text.

    Attributes are documented inline in ``__init__``.
    """

    def __init__(self):
        self.response = ""  # full text received so far
        self.done = False  # set to True once streaming has finished
        self.buffer = []  # individual deltas, in arrival order
        self.lock = threading.Lock()  # guards the three fields above

    def reset(self):
        """Clear the accumulated text, the delta buffer and the done flag."""
        with self.lock:
            self.response = ""
            self.done = False
            self.buffer = []

    def add_to_buffer(self, delta: str):
        """Append ``delta`` to both the full response and the delta buffer."""
        with self.lock:
            self.response += delta
            self.buffer.append(delta)


output_stream = StreamOutput()

# OpenAI Client Configuration
# NOTE(review): organization/project IDs are hard-coded here; consider moving
# them to environment variables alongside OPENAI_API_KEY.
client = OpenAI(
    organization='org-7ANUFsqOVIXLLNju8Rvmxu3h',
    project="proj_NGz8Kux8CSka7DRJucAlDCz6",
    api_key=os.getenv("OPENAI_API_KEY")
)

asst_screw_bardo_id = "asst_JGFaX6uOIotqy5mIJnu3Yyp7"


# Async helper
def awaiter(coro):
    """Run ``coro`` to completion via ``asyncio.run`` and discard its result.

    Kept for backward compatibility; the event handler below no longer uses it.
    """
    asyncio.run(coro)


# EventHandler for OpenAI Assistant
class EventHandler(AssistantEventHandler):
    """Forward assistant text events into the module-level ``output_stream``."""

    @override
    def on_text_created(self, text) -> None:
        # StreamOutput has no ``send_delta`` method; write through the
        # synchronous, lock-protected ``add_to_buffer`` instead.
        output_stream.add_to_buffer("Response Received:\n\nScrew-Bardo:\n\n")

    @override
    def on_text_delta(self, delta, snapshot) -> None:
        # Skip empty/None deltas: ``add_to_buffer`` concatenates onto a str.
        if delta.value:
            output_stream.add_to_buffer(delta.value)

    @override
    def on_tool_call_created(self, tool_call) -> None:
        # This assistant is expected to produce text only.
        raise RuntimeError("Assistant shouldn't be calling tools.")


def create_and_stream(transcript: str) -> None:
    """Send ``transcript`` to the assistant and stream its reply.

    A new thread containing one user message is created and run; text events
    are written to ``output_stream`` by ``EventHandler``. Any exception is
    logged rather than propagated. ``output_stream.done`` is always set to
    True on exit, even if logging the error fails.

    Args:
        transcript: Text sent as the content of the user message.
    """
    try:
        with client.beta.threads.create_and_run_stream(
            assistant_id=asst_screw_bardo_id,
            thread={
                "messages": [{"role": "user", "content": transcript}]
            },
            event_handler=EventHandler()
        ) as stream:
            stream.until_done()
    except Exception as e:
        log(f"Error in create_and_stream: {e}")
    finally:
        # Use the same lock as reset()/add_to_buffer() for a consistent view.
        with output_stream.lock:
            output_stream.done = True


# Precompiled URL patterns; the dots are escaped so they match literally.
_VIDEO_ID_PATTERNS = (
    re.compile(r'(?<=youtu\.be/)([A-Za-z0-9_-]{11})'),
    re.compile(r'(?<=youtube\.com/watch\?v=)([A-Za-z0-9_-]{11})'),
)


def get_video_id(url: str) -> Optional[str]:
    """Extract the 11-character video ID from a YouTube URL.

    Recognises ``youtu.be/<id>`` and ``youtube.com/watch?v=<id>`` forms, tried
    in that order.

    Args:
        url: The URL to inspect.

    Returns:
        The video ID, or None if ``url`` is empty or matches neither form.
    """
    if not url:
        return None
    for pattern in _VIDEO_ID_PATTERNS:
        match = pattern.search(url)
        if match:
            return match.group(1)
    return None


def get_auto_transcript(video_id: str) -> Optional[str]:
    """Fetch the English transcript for ``video_id`` as plain text.

    Args:
        video_id: ID passed to ``YouTubeTranscriptApi.get_transcript``.

    Returns:
        The transcript formatted by ``TextFormatter``, or None if fetching or
        formatting raised; the failure is logged in that case.
    """
    try:
        transcript = YouTubeTranscriptApi.get_transcript(video_id, languages=['en'])
        formatter = TextFormatter()
        return formatter.format_transcript(transcript)
    except _errors.TranscriptsDisabled as e:
        log(f'Exception while fetching transcript: {e}')
    except Exception as e:
        log(f'Unexpected error while fetching transcript: {e}')
    return None