This wasn't a crazy rewrite or anything, I just updated it to the new YouTube Transcript and OpenAI APIs, as well as greatly simplifying the code. On top of that, it now works single threaded, just using multiple gunicorn threads for concurrency. It's a lot simpler and cleaner, although not up to my current standards.
224 lines · 7.8 KiB · Python
"""
|
|
Main module that handles processing of YouTube transcripts and connecting to the AI service.
|
|
Each user session has its own output stream and thread to handle the asynchronous AI response.
|
|
"""
|
|
|
|
from http import HTTPStatus
|
|
import re
|
|
import threading
|
|
import asyncio
|
|
from datetime import datetime
|
|
from flask import Response
|
|
from collections.abc import Generator
|
|
import pytz
|
|
import os
|
|
import logging
|
|
|
|
# Youtube Transcript imports
|
|
import youtube_transcript_api._errors
|
|
from youtube_transcript_api import YouTubeTranscriptApi
|
|
from youtube_transcript_api.formatters import TextFormatter
|
|
|
|
# OpenAI API imports
|
|
from openai import OpenAI
|
|
|
|
from dotenv import load_dotenv
|
|
|
|
# Configure logging
|
|
try:
|
|
logging.basicConfig(
|
|
filename="./logs/main.log",
|
|
level=logging.INFO,
|
|
format="%(asctime)s %(levelname)s: %(message)s",
|
|
datefmt="%Y-%m-%d %H:%M:%S",
|
|
)
|
|
# FIX: Bruh what is this :joy:
|
|
except FileNotFoundError as e:
|
|
with open("./logs/main.log", "x"):
|
|
pass
|
|
logging.basicConfig(
|
|
filename="./logs/main.log",
|
|
level=logging.INFO,
|
|
format="%(asctime)s %(levelname)s: %(message)s",
|
|
datefmt="%Y-%m-%d %H:%M:%S",
|
|
)
|
|
logging.info(f"No main.log file was found ({e}), so one was created.")
|
|
|
|
if not load_dotenv():
|
|
logging.fatal(
|
|
"Error loading dotenv, your goose is very likely to be cooked (no OpenAI API Key </3)"
|
|
)
|
|
|
|
# Global dict for per-user session streams.
|
|
user_streams = {}
|
|
# Lock to ensure thread-safe operations on shared memory.
|
|
stream_lock = threading.Lock()
|
|
|
|
# For running async code in non-async functions.
|
|
awaiter = asyncio.run
|
|
|
|
|
|
# OpenAI Client configuration
|
|
client = OpenAI(
|
|
organization=os.getenv("OPENAI_ORG"),
|
|
project=os.getenv("OPENAI_PROJ"),
|
|
api_key=os.getenv("OPENAI_API_KEY"),
|
|
)
|
|
prompt_env = os.getenv("OPENAI_PROMPT")
|
|
# No .unwrap or .expect function </3
|
|
if prompt_env is None or not prompt_env.strip():
|
|
logging.fatal(
|
|
"No ENV var set for OPENAI_PROMPT, unable to submit instructions to AI."
|
|
)
|
|
raise ValueError("The OPENAI_PROMPT environment variable is not set")
|
|
|
|
prompt_id = prompt_env
|
|
|
|
|
|
def create_and_stream(transcript: str) -> Generator[str, None, str]:
|
|
stream = client.responses.create(
|
|
model="gpt-4.1-mini",
|
|
prompt={
|
|
"id": "pmpt_69097600a25c8190ba77a32457973dcd087a89928ce72d22",
|
|
"version": "1",
|
|
},
|
|
input=[{"role": "user", "content": transcript}],
|
|
stream=True,
|
|
)
|
|
for event in stream:
|
|
# behold, one of the most bloated match statements ever.
|
|
# because of my uneeded comments, of course
|
|
# wouldn't have it any other way
|
|
match event.type:
|
|
case "response.created":
|
|
# TODO: Should the user really see this?
|
|
# Logging is fine, but there has to be some kind of idenfitier for responses, right?
|
|
logging.info("Stream {stream_id} created for response.")
|
|
yield "Transcript submitted to AI.\n\n"
|
|
case "response.output_text.delta":
|
|
# This is where the cash money money cash is
|
|
# Could put a diobolical debug statement here
|
|
yield event.delta
|
|
case "response.output_text.done":
|
|
# TODO: Again, should the user really see this?
|
|
# Newsflash: they don't since it's not yeilded!
|
|
logging.info("Stream {stream_id} completed")
|
|
return "\nAI response end."
|
|
case "error":
|
|
# HACK: In order to abide by the type checking, since I don't know how it'll handle errors
|
|
# Since the loop is handled by Flask and not me idk what it'll do with that iterator
|
|
# No I'm not writing another Generator for it.
|
|
err_msg = event.message
|
|
logging.error(f"Error while streaming: {err_msg}")
|
|
return str(ValueError(err_msg))
|
|
# NOTE: For debug, really.
|
|
# There are many events that I likely don't care about and would bloat a log
|
|
case _:
|
|
logging.warning(
|
|
f"Unhandled event type: {event.type}\nEvent contents: {event}"
|
|
)
|
|
continue
|
|
# TODO: Decide the severity
|
|
logging.critical(
|
|
"Generator returned early, likely an error with the stream that wasn't reported"
|
|
)
|
|
# HACK: Same deal as the "error" case.
|
|
return str(
|
|
ValueError(
|
|
"OpenAI never reported response done, so response may be incomplete."
|
|
)
|
|
)
|
|
|
|
|
|
def process(url: str, session_id: str) -> Response:
|
|
"""
|
|
Process a YouTube URL: parse the video id, retrieve its transcript, and prepare the session for AI processing.
|
|
|
|
Args:
|
|
url (str): The YouTube URL provided by the user.
|
|
session_id (str): The unique session identifier.
|
|
|
|
Returns:
|
|
Response: The proper HTTP response based off what goes on in this here backend
|
|
"""
|
|
# Current time for logging I assume
|
|
current_time = datetime.now(pytz.timezone("America/New_York")).strftime(
|
|
"%Y-%m-%d %H:%M:%S"
|
|
)
|
|
# hey wadda ya know
|
|
logging.info(f"New Entry at {current_time} for session {session_id}")
|
|
logging.info(f"URL: {url}")
|
|
# Parse video id out of user submitted url
|
|
video_id = get_video_id(url)
|
|
# If there is no video id
|
|
if not video_id:
|
|
logging.warning(f"Could not parse video id from URL: {url}")
|
|
return Response(
|
|
"Couldn't parse video ID from URL. (Are you sure you entered a valid YouTube.com or YouTu.be URL?)",
|
|
HTTPStatus.BAD_REQUEST,
|
|
)
|
|
logging.info(f"Parsed Video ID: {video_id}")
|
|
transcript = get_auto_transcript(video_id)
|
|
if not transcript:
|
|
logging.error(
|
|
f"Error: could not retrieve transcript for session {session_id}. Assistant won't be called."
|
|
)
|
|
return Response(
|
|
"Successfully parsed video ID from URL, however the transcript was disabled by the video owner or invalid.",
|
|
HTTPStatus.INTERNAL_SERVER_ERROR,
|
|
)
|
|
|
|
# Hello
|
|
return Response(create_and_stream(transcript), HTTPStatus.OK)
|
|
|
|
|
|
def get_video_id(url: str):
|
|
"""
|
|
Extract the YouTube video ID from a URL.
|
|
|
|
Args:
|
|
url (str): The YouTube URL.
|
|
|
|
Returns:
|
|
str or None: The video ID if found, otherwise None.
|
|
"""
|
|
# I was going to add trimming but I think the JavaScript does it
|
|
# and I hate JavaScript too much to go look for it
|
|
youtu_be = r"(?<=youtu.be/)([A-Za-z0-9_-]{11})"
|
|
youtube_com = r"(?<=youtube\.com\/watch\?v=)([A-Za-z0-9_-]{11})"
|
|
id_match = re.search(youtu_be, url)
|
|
if not id_match:
|
|
id_match = re.search(youtube_com, url)
|
|
if not id_match:
|
|
logging.warning(f"Failed to parse video ID from URL: {url}")
|
|
return None
|
|
return id_match.group(1)
|
|
|
|
|
|
def get_auto_transcript(video_id: str):
|
|
"""
|
|
Retrieve and format the transcript from a YouTube video.
|
|
|
|
Args:
|
|
video_id (str): The YouTube video identifier.
|
|
|
|
Returns:
|
|
str or None: The formatted transcript if successful; otherwise None.
|
|
"""
|
|
trans_api_errors = youtube_transcript_api._errors
|
|
try:
|
|
ytt_api = YouTubeTranscriptApi()
|
|
transcript = ytt_api.fetch(video_id)
|
|
except trans_api_errors.TranscriptsDisabled as e:
|
|
logging.exception(f"Exception while fetching transcript: {e}")
|
|
return None
|
|
formatter = TextFormatter()
|
|
txt_transcript = formatter.format_transcript(transcript)
|
|
logging.info("Transcript successfully retrieved and formatted.")
|
|
return txt_transcript
|
|
|
|
|
|
logging.info(
|
|
f"Main initialized at {datetime.now(pytz.timezone('America/New_York')).strftime('%Y-%m-%d %H:%M:%S')}. Application starting."
|
|
)
|