123 lines
4.1 KiB
Python
123 lines
4.1 KiB
Python
import os
|
|
import logging
|
|
from dotenv import load_dotenv
|
|
|
|
import discord
|
|
from discord import Bot
|
|
from openai import OpenAI
|
|
from asyncio import run
|
|
|
|
from apscheduler.schedulers.background import BackgroundScheduler
|
|
from students import STUDENTS
|
|
from quotes import QUOTES, select_quote, select_student
|
|
|
|
# Logging.
|
|
logging.basicConfig(
|
|
filename="./logs/jacobs.log",
|
|
filemode="at+",
|
|
level=logging.DEBUG
|
|
)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
DISCORD_TOKEN = os.getenv('DISCORD_TOKEN')
|
|
OPENAI_KEY = os.getenv('OPENAI_KEY')
|
|
|
|
# Discord
|
|
bot = Bot(intents=discord.Intents.default())
|
|
SERVER_ID: int = 1339601046745645148
|
|
GENERAL_ID: int = 1339601047294840876
|
|
ch_general = None
|
|
|
|
async def get_general():
|
|
global ch_general
|
|
global bot
|
|
if not ch_general:
|
|
ch_general = await bot.fetch_channel(GENERAL_ID)
|
|
if not ch_general:
|
|
raise(ValueError("General channel object should *not* be none!"))
|
|
return ch_general
|
|
|
|
async def send_quote(quote: str = None) -> None:
|
|
await get_general()
|
|
if not quote:
|
|
quote = select_quote()
|
|
logger.info(f"Sending quote {quote} in #general...")
|
|
await ch_general.send(quote)
|
|
|
|
async def after_class(student: int = None) -> None:
|
|
global ch_general
|
|
if not student:
|
|
student = select_quote()
|
|
logger.info(f"Sending mention to see {student} after class to #general...")
|
|
await ch_general.send(f"Come see me after class <@{student}>")
|
|
|
|
|
|
# OpenAI & Assistant configuration
|
|
client = OpenAI(
|
|
api_key=OPENAI_KEY,
|
|
project="proj_vUQ7duhKKc1jxIqllRhy6DVJ",
|
|
)
|
|
|
|
mr_jacobs = client.beta.assistants.retrieve("asst_KdPdwqNAKijujfyCRrJCOgJN")
|
|
|
|
base_instructions = mr_jacobs.instructions
|
|
instructions = base_instructions + f"\n\nHere is a dictionary containing some of your most well known quotes, organized by their frequency: \n{QUOTES}\n\nStudent: "
|
|
|
|
def get_run_status(run):
|
|
status = client.beta.threads.runs.retrieve(
|
|
run.id,
|
|
thread_id=run.thread_id
|
|
).status
|
|
logger.info(f"Status of run {run.id} is {status}")
|
|
return status
|
|
|
|
def get_instructions(student: int | None = None) -> str:
|
|
logging.info(f"Looking for {student} in students...")
|
|
if student in STUDENTS:
|
|
return instructions + STUDENTS.get(student)
|
|
else:
|
|
logging.warning(f"Couldn't find {student}")
|
|
return instructions + "Unknown"
|
|
|
|
async def run_message(run):
|
|
# ew but Python doesn't have a do while and it's less ugly than the same statement twice.
|
|
while True:
|
|
complete = get_run_status(run) == "completed"
|
|
if complete:
|
|
break
|
|
thread_messages = client.beta.threads.messages.list(run.thread_id)
|
|
for msg_ob in thread_messages.data:
|
|
if msg_ob.id == thread_messages.first_id:
|
|
response = msg_ob.content[0].text.value
|
|
return response
|
|
raise Exception(f"bro what did you do dumbass, figure it out.:\nThread Messages List:\n{thread_messages}")
|
|
|
|
@bot.slash_command(description="Talk to Mr. Jacobs!!!")
|
|
async def message(ctx: discord.ApplicationContext, text: str):
|
|
logging.info(f"User {ctx.author.global_name} sent message {text}")
|
|
instructions = get_instructions(ctx.author.id)
|
|
bot_reply = await ctx.respond("*hmmmm*...let me see here...")
|
|
run = client.beta.threads.create_and_run(
|
|
assistant_id=mr_jacobs.id,
|
|
instructions=instructions,
|
|
thread={
|
|
"messages": [
|
|
{
|
|
"role": "user",
|
|
"content": text
|
|
}
|
|
]
|
|
}
|
|
)
|
|
response = await run_message(run)
|
|
await bot_reply.edit_original_response(content=response)
|
|
|
|
# After successful initilization, schedule tasks.
|
|
task_scheduler = BackgroundScheduler()
|
|
task_scheduler.add_job(lambda: bot.loop.create_task(send_quote()), 'cron', day_of_week='mon-fri', hour=8, minute=50)
|
|
task_scheduler.add_job(lambda: bot.loop.create_task(send_quote()), 'cron', day_of_week='mon-fri', hour='8-14', minute="*/20", jitter=180)
|
|
task_scheduler.add_job(lambda: bot.loop.create_task(after_class()), 'cron', day_of_week='mon-fri', hour=10, minute=55)
|
|
task_scheduler.start()
|
|
|
|
logger.info(f"Presumably successfully initilized, starting bot.")
|
|
bot.run(DISCORD_TOKEN) |