108 lines
3.3 KiB
Python
108 lines
3.3 KiB
Python
|
|
# To parse video ids
|
|
import re
|
|
|
|
# Youtube Transcript stuff import
|
|
from youtube_transcript_api import YouTubeTranscriptApi
|
|
from youtube_transcript_api.formatters import TextFormatter
|
|
|
|
# OpenAI API stuff import
|
|
from openai import AssistantEventHandler
|
|
from openai import OpenAI
|
|
# For streaming
|
|
from typing_extensions import override
|
|
|
|
# To get the env var
|
|
import os
|
|
|
|
|
|
### OpenAI Config
|
|
|
|
# This is copied straight from the quickstart guide:
|
|
class EventHandler(AssistantEventHandler):
    """Print assistant stream events to stdout as they arrive.

    Overrides the text and tool-call hooks of ``AssistantEventHandler``.
    Every print is flushed so partial output becomes visible immediately.
    """

    @override
    def on_text_created(self, text) -> None:
        """Print the ``assistant >`` prefix when a text message starts."""
        print("\nassistant > ", end="", flush=True)

    @override
    def on_text_delta(self, delta, snapshot):
        """Print the newly received piece of text without a trailing newline."""
        print(delta.value, end="", flush=True)

    @override
    def on_tool_call_created(self, tool_call):
        """Announce which kind of tool the assistant has started calling."""
        print(f"\nassistant > {tool_call.type}\n", flush=True)

    @override
    def on_tool_call_delta(self, delta, snapshot):
        """Echo code-interpreter input and any log output it produced.

        Deltas for other tool types are ignored.
        """
        if delta.type != "code_interpreter":
            return

        interpreter = delta.code_interpreter
        # NOTE(review): the SDK appears to type this field as optional, so a
        # delta without a payload is skipped instead of raising AttributeError.
        if interpreter is None:
            return

        if interpreter.input:
            print(interpreter.input, end="", flush=True)
        if interpreter.outputs:
            print("\n\noutput >", flush=True)
            for output in interpreter.outputs:
                # Only "logs" outputs are printed; other output types are skipped.
                if output.type == "logs":
                    print(f"\n{output.logs}", flush=True)
|
|
|
|
# Build the OpenAI client; the API key is read from the OPENAI_API_KEY env var
api_key = os.environ.get("OPENAI_API_KEY")
client = OpenAI(api_key=api_key, organization="org-7ANUFsqOVIXLLNju8Rvmxu3h")

# ID of the "screw bardo" assistant, which is configured to make notes and
# 5 Q&A based on any given YouTube transcript
asst_screw_bardo_id = "asst_KsEI7n5ZxMcHG6HTJmvbFFhe"
|
|
|
|
# A new thread is needed for every run.
# Only call this function once the transcript has been fetched successfully; creating a thread any earlier would waste it on a video that turns out to have no usable transcript.
|
|
def create_and_stream(transcript):
    """Start a new thread containing *transcript* and stream the assistant's run.

    The transcript is sent as the single user message of a freshly created
    thread, run against the assistant identified by ``asst_screw_bardo_id``.
    Stream events are handled by a new ``EventHandler``. The call blocks until
    the stream is done and returns ``None``.
    """
    first_message = {"role": "user", "content": transcript}
    run_stream = client.beta.threads.create_and_run_stream(
        assistant_id=asst_screw_bardo_id,
        thread={"messages": [first_message]},
        event_handler=EventHandler(),
    )
    with run_stream as stream:
        stream.until_done()
|
|
|
|
# Patterns that capture the 11-character video id, tried in this order.
_VIDEO_ID_PATTERNS = (
    # Short links: youtu.be/<id> (the dot is escaped so it only matches ".")
    re.compile(r"youtu\.be/([A-Za-z0-9_-]{11})"),
    # Watch links: youtube.com/watch?...v=<id>; v= may follow other query
    # parameters, but must come directly after "?" or "&"
    re.compile(r"youtube\.com/watch\?(?:[^#\s]*?&)?v=([A-Za-z0-9_-]{11})"),
)


def _extract_video_id(url):
    """Return the video id captured from *url*, or None if no pattern matches."""
    for pattern in _VIDEO_ID_PATTERNS:
        match = pattern.search(url)
        if match:
            return match.group(1)
    return None


def get_video_id():
    """Prompt for a YouTube URL until a video id can be parsed from it.

    Accepts ``youtu.be/<id>`` and ``youtube.com/watch?v=<id>`` URLs, including
    watch URLs where ``v=`` is not the first query parameter. After each
    unparseable entry an error message is printed and the prompt repeats.

    Returns:
        The 11-character video id as a string. It is also printed to stdout.

    Raises:
        EOFError: propagated from ``input()`` if stdin is exhausted.
    """
    while True:
        user_url = input("Enter the URL of the YouTube video: ")
        video_id = _extract_video_id(user_url)

        if video_id is None:
            print("Error: Couldn't parse video ID from URL, please make sure you are pasting a full \"youtube.com\" or \"youtu.be url\"\n")
            continue

        print(video_id)
        return video_id
|
|
|
|
# Fetches the English transcript of a video and returns it as plain text
def get_auto_transcript(video_id):
    """Fetch the English transcript for *video_id* and return it as plain text.

    The transcript is requested with ``languages=['en']`` and rendered with
    ``TextFormatter``. The formatted text is printed to stdout and returned;
    nothing is written to disk.

    Args:
        video_id: The YouTube video id to fetch the transcript for.

    Returns:
        The formatted transcript text.

    Raises:
        Any exception raised by ``YouTubeTranscriptApi.get_transcript`` is
        propagated unchanged; nothing is caught here.
    """
    # NOTE(review): newer releases of youtube-transcript-api appear to replace
    # the static get_transcript() with an instance-level fetch(); confirm
    # against the installed version before upgrading.
    transcript = YouTubeTranscriptApi.get_transcript(video_id, languages=["en"])

    # Use a real formatter instance rather than calling the method on the
    # class with a placeholder object passed as ``self``.
    txt_transcript = TextFormatter().format_transcript(transcript)

    print(txt_transcript)
    return txt_transcript
|
|
|
|
# Ask the user for a URL and store the video id parsed from it
video_id = get_video_id()

# Fetch the transcript text for that video
transcript = get_auto_transcript(video_id)

# Only now, with the transcript in hand, create the thread and stream the reply
create_and_stream(transcript)