66 lines
2.5 KiB
Python
66 lines
2.5 KiB
Python
from flask import Flask, render_template, Response, request
|
|
from main import get_auto_transcript, get_video_id, create_and_stream, output_buffer, output_lock
|
|
from datetime import datetime
|
|
import threading
|
|
import pytz
|
|
import time
|
|
|
|
app = Flask(__name__, static_folder="website/static", template_folder="website")
|
|
|
|
@app.route('/')
|
|
def home():
|
|
return render_template('index.html')
|
|
|
|
@app.route('/streamtest', methods=['POST'])
|
|
def streaming():
|
|
def generate():
|
|
for i in range(10):
|
|
yield f"Data chunk {i}\n"
|
|
time.sleep(1) # Simulating a delay in data generation
|
|
|
|
return Response(generate(), content_type='text/plain')
|
|
|
|
@app.route('/process_url', methods=['POST'])
|
|
def process_url():
|
|
# Opens a file to log the video id and the assistants respone to see if I can further improve instructions:
|
|
#log = open("log.txt", "at", 1)
|
|
url = request.form['url']
|
|
|
|
# Extract the video ID from the URL
|
|
video_id = get_video_id(url) # Modify this function to accept the URL
|
|
if not video_id:
|
|
return "Couldn't parse video ID from URL. (Are you sure you entered a valid YouTube.com or YouTu.be URL?)"
|
|
|
|
# Get the transcript for that video ID
|
|
transcript = get_auto_transcript(video_id)
|
|
if (not transcript):
|
|
return "Successfully parsed video ID from URL, however the ID was either invalid, the transcript was disabled by the video owner, or some other error was raised because of YouTube."
|
|
|
|
|
|
# Start processing in a separate thread
|
|
threading.Thread(target=create_and_stream, args=(transcript,)).start()
|
|
|
|
# Process the transcript and stream the result.
|
|
# response = create_and_stream(transcript)
|
|
# log.write(f"\n\n\n### New Entry at {datetime.now(pytz.timezone('America/New_York')).strftime('%Y-%m-%d %H:%M:%S')}\n\n URL: {url}\n Video ID: {video_id}\n\nAssistant Response: \n{response}")
|
|
# Return a response
|
|
return Response("Processing started. Check /stream_output for updates.", status=200) # Add more detailed output if needed
|
|
|
|
@app.route('/stream_output')
|
|
def stream_output():
|
|
def yoink():
|
|
while True:
|
|
with output_lock:
|
|
if output_buffer:
|
|
message = output_buffer.pop(0)
|
|
yield f"{message}"
|
|
|
|
time.sleep(0.005) # Adjust as necessary for your application
|
|
|
|
return Response(yoink(), content_type='text/plain')
|
|
|
|
|
|
|
|
|
|
if __name__ == '__main__': # Change this line to properly check for main
|
|
app.run(debug=True) |