Files
jacobs-bot/discord_logic.py
2025-08-12 11:50:18 -05:00

498 lines
21 KiB
Python
Executable File

# ----- Imports & Setup -----
import logging
import regex as re
from random import randint
from asyncio import run as asyncio_run, sleep
from datetime import timedelta
import discord
from discord import Bot, Message
from discord.abc import GuildChannel as Channel
from discord.ext import commands
from ai_logic import run, get_instructions
from quotes import select_quote, select_student
from students import *
logger = logging.getLogger(__name__)
SERVER_ID: int = 1339601046745645148
GENERAL_ID: int = 1339601047294840876
bot = Bot(intents=discord.Intents.all())
# ----- Discord Helper Functions -----
async def get_channel(channel_id: int) -> Channel:
"""
Tries to get a cached channel object, if it fails it will send an API request to retrieve a new one.
Args:
channel_id (int): The ID of the channel to retrieve.
Raises:
ValueError: If the channel cannot be retrieved.
Returns:
discord.abc.GuildChannel: The retrieved discord channel object.
"""
logger.debug(f"Attempting to get channel with ID {channel_id}")
channel = bot.get_channel(channel_id)
if not channel:
logger.debug(f"Channel with ID {channel_id} not found in cache, fetching from API.")
channel = await bot.fetch_channel(channel_id)
if not channel:
logger.critical(f"Could not fetch channel with channel_id {channel_id}, fetch_channel returned None.")
return None
logger.info(f"Successfully retrieved {channel_id}, #{channel.name}")
return channel
async def get_message(message: Message | int, channel_id: int = None, attempts: int = 3) -> Message:
"""
Retrieves a message object either from cache or by fetching it from the channel.
Args:
message (Message | int): The message object or message ID to retrieve.
channel_id (int, optional): The ID of the channel to fetch the message from if not in cache.
attempts (int, optional): The number of attempts to retrieve the message. Defaults to 3.
Returns:
Message: The retrieved message object.
"""
logger.debug(f"Attempting to get message with ID {message if isinstance(message, int) else message.id}")
for attempt in range(attempts):
if isinstance(message, int):
got_message: Message = bot.get_message(message)
if not got_message and channel_id:
logger.debug(f"Message with ID {message} not found in cache, fetching from channel ID {channel_id}.")
channel: discord.abc.GuildChannel = await get_channel(channel_id)
got_message: Message = await channel.fetch_message(message)
elif not got_message:
logger.error(f"Message with ID {message} not found in cache and no channel ID provided.")
elif isinstance(message, Message):
got_message: Message = bot.get_message(message.id)
if not got_message:
logger.debug(f"Message with ID {message.id} not found in cache, fetching from channel.")
got_message: Message = await message.channel.fetch_message(message.id)
else:
logger.error(f"Couldn't retrieve message:\nMessage: {message}\nChannel ID: {channel_id}")
return None
if got_message:
return got_message
if attempt < attempts - 1:
logger.warning(f"Attempt {attempt + 1} failed. Retrying in 2 seconds...")
await sleep(2)
logger.error(f"Failed to retrieve message after {attempts} attempts.")
return None
async def send_message(message: str, channel: int | Channel = GENERAL_ID) -> Message:
"""
Has the bot send a message to a specified channel. If no channel is specified, the bot sends it to general.
Args:
message (str): The message to send.
channel (int | Message, optional): Channel to send message to, Defaults to GENERAL_ID.
Returns:
Message: The sent message object.
"""
logger.debug(f"Attempting to send message: {message}")
if isinstance(channel, int):
channel = await get_channel(channel)
if isinstance(channel, Channel):
sent_message: Message = await channel.send(message)
if isinstance(sent_message, Message):
logger.info(f"Message '{message}' successfully sent to {sent_message.channel}")
return sent_message
else:
logger.error(f"Message likely wasn't successfully sent, as channel.send did not return a Message.")
elif isinstance(channel, None):
logger.error(f"Message couldn't be sent as a channel wasn't found/messagable.")
async def edit_message(message: Message | int, channel: int = None):
"""
Edits a message in a channel.
Args:
message (Message | int): The message to edit.
channel (int, optional): The channel ID of the message. Defaults to None.
"""
logger.debug(f"Attempting to edit message {message if isinstance(message, int) else message.id}")
if isinstance(message, int):
message = await get_message(message, channel)
if isinstance(message, Message):
await message.edit(content=message.content)
logger.info(f"Message {message.id} successfully edited.")
else:
logger.error(f"Message couldn't be edited as it was not found.")
# ----- Message Utility Functions -----
async def msg_is_reply(message: Message) -> tuple[bool, Message]:
"""
Checks if a message is a reply to another message.
Args:
message (Message): The message to check.
Returns:
tuple[bool, Message]: A tuple containing a boolean indicating if the message is a reply and the replied message.
"""
logger.debug(f"Checking if message ID {message.id} is a reply.")
if message.reference is not None:
replied_msg = message.reference.cached_message
if replied_msg is None:
logger.debug(f"Replied message not found in cache, fetching from channel ID {message.reference.channel_id}.")
channel = await get_channel(message.reference.channel_id)
replied_msg = await channel.fetch_message(message.reference.message_id)
if isinstance(replied_msg, Message):
return True, replied_msg
return False, None
async def get_reply_chain(msg: Message) -> list[Message]:
"""
Retrieves the chain of replies for a given message.
Args:
msg (Message): The message to get the reply chain for.
Returns:
list[Message]: A list of messages in the reply chain.
"""
logger.debug(f"Getting reply chain for message ID {msg.id}.")
i = 0
reply_chain = [msg]
while i < 10:
is_reply, msg = await msg_is_reply(msg)
if not is_reply:
break
reply_chain.insert(0, msg)
i += 1
return reply_chain
async def gen_message_list(discord_messages: list[Message]) -> list[dict]:
"""
Generates a list of message dictionaries from a list of Discord messages.
Args:
discord_messages (list[Message]): The list of Discord messages.
Returns:
list[dict]: A list of message dictionaries.
"""
logger.debug("Generating message list from Discord messages.")
messages = []
if not isinstance(discord_messages, list):
discord_messages = [discord_messages]
for message in discord_messages:
students_mentioned = None
if isinstance(message, Message):
message_copy = {
"id": message.id,
"content": message.content,
"author": message.author,
"mentions": message.mentions,
"created_at": message.created_at,
"jump_url": message.jump_url
}
role = "assistant" if message_copy["author"].id == bot.user.id else "user"
students_mentioned = None
if message_copy["mentions"] and role == "user":
students_mentioned = [student.id for student in message_copy["mentions"] if student.id in STUDENT_IDS]
mentions = re.findall(r'<@\d{17,19}>', message_copy["content"])
for mention in mentions:
mention_id = mention.replace("<@", "").replace(">", "")
student_info = ID_TO_NAME.get(int(mention_id))
if student_info:
message_copy["content"] = message_copy["content"].replace(mention, student_info.split("\n")[0])
message_copy["content"] += '\n{'
if students_mentioned:
message_copy["content"] += f"\n\nInfo on students mentioned (DO NOT REPEAT!): "
for student in students_mentioned:
message_copy["content"] += f"\n{STUDENTS.get(student)}"
message_copy["content"] += f"\n\nSent by: {STUDENTS.get(message_copy['author'].id) if message_copy['author'].id in STUDENT_IDS else message_copy['author'].name + ' (Author\'s details not found)'}" + '}'
if not message_copy["content"]:
debug = True
logger.error(f"No message content for message {message_copy['id']}.")
fetched_message = await get_message(message_copy["id"])
message_copy["content"] = fetched_message.content
thread_message = {
"role": role,
"content": message_copy["content"]
}
debug = True # Delete after fix
if debug:
debug_dump = "Messages Dump:\n"
for msg_copy in discord_messages:
debug_dump += f"""
Message: {msg_copy}
Message Time: {msg_copy.created_at}
Message ID: {msg_copy.id}
Message Author: {msg_copy.author}
Message Content: {msg_copy.content}
Link: {msg_copy.jump_url}
"""
logger.debug(debug_dump)
messages.append(thread_message)
else:
logger.warning(f"Argument {message} is not of type Message and will be skipped.")
return messages
# ----- AI Interaction Functions -----
async def send_quote(quote: str = None) -> None:
"""
Sends a quote to the general channel.
Args:
quote (str, optional): The quote to send. If not provided, a random quote is selected.
"""
if not quote:
quote = select_quote()
logger.info(f"Sending quote '{quote}' in #general...")
await send_message(quote)
async def after_class(student: int = None) -> None:
"""
Sends a message to a student to see the bot after class.
Args:
student (int, optional): The ID of the student. If not provided, a random student is selected.
"""
if not student:
student = select_student()
logger.info(f"Sending mention to see {student} after class to #general...")
await send_message(f"Come see me after class <@{student}>")
async def has_essay(message: Message) -> bool:
if message.author.id in ASSIGNED_ESSAY:
async with message.channel.typing():
message = await get_message(message) if not message else message
message_list = await gen_message_list(message)
response = await run(
message_list,
f"{get_instructions(message)}\n\nTHIS USER HAS AN ESSAY!!! Yell at them and tell them to finish their essay on {ASSIGNED_ESSAY.get(message.author.id)}!!! Unless...they sent you the essay, in that case, you can decide whether or not it is satisfactory (it needs to be in MLA format and 300-2000 characters long, and to be relevant of course). If it is satisfactory, you can tell them they are free to go, after you run the function \"clear_essay\".",
message.author.id
)
await message.reply(response)
return True
else:
return False
# ----- Discord Commands & Event Handlers -----
def setup_discord_bot(bot: Bot) -> None:
"""
Sets up the Discord bot with commands and event listeners.
Args:
bot (Bot): The Discord bot instance.
"""
@bot.slash_command(description="Talk to Mr. Jacobs!!!")
async def message(ctx: discord.ApplicationContext, text: str) -> None:
"""
Slash command to talk to Mr. Jacobs. The bot will respond to the user's message using AI.
Args:
ctx (discord.ApplicationContext): The context of the command.
text (str): The message text.
"""
logging.info(f"User {ctx.author.global_name} sent message {text}")
await ctx.defer()
instructions = get_instructions(ctx.channel)
thread_messages = [{"role": "user", "content": text}]
response = await run(thread_messages, instructions, ctx.author.id)
await ctx.respond(content=response)
@bot.slash_command(name="clear_essay", description="Clear the essay assigned to a student")
async def clear_essay_command(ctx: discord.ApplicationContext, student: str) -> None:
"""
Slash command to clear the essay assigned to a student.
Args:
ctx (discord.ApplicationContext): The context of the command.
student (str): The ID of the student whose essay is to be cleared.
"""
try:
student = int(student)
except Exception as e:
logging.error(f"Failed to convert student ID to int: {e}")
await ctx.respond("Invalid student ID format.", ephemeral=True)
return
if not ctx.author.id == 620319269233885226:
await ctx.respond("You don't have permission to use this command.")
return
if student not in STUDENT_IDS:
await ctx.respond("Invalid student ID.", ephemeral=True)
return
ASSIGNED_ESSAY.pop(student, None)
await ctx.respond(f"Cleared essay for <@{student}>")
@bot.slash_command(name="get_essay", description="Get the essay assigned to a student, or all students")
async def get_essay_command(ctx: discord.ApplicationContext, student: str = None) -> None:
if student:
try:
student = int(student)
except Exception as e:
logging.error(f"Failed to convert student ID to int: {e}")
await ctx.respond("Invalid student ID format.", ephemeral=True)
return
await ctx.respond(get_essay(student))
else:
await ctx.respond(get_essay(), ephemeral=True)
@bot.slash_command(name="assign_essay", description="Assign an essay to a student")
@commands.has_permissions(moderate_members=True)
async def assign_essay_command(
ctx: discord.ApplicationContext,
student: str,
timeout: int | None = 0,
topic: str | None = None
) -> None:
"""
Slash command to assign an essay to a student.
Args:
ctx (discord.ApplicationContext): The context of the command.
student (int): The ID of the student to assign the essay to.
timeout (int | None, optional): The timeout in seconds before the essay is cleared. Defaults to 0.
topic (str | None, optional): The topic of the essay. If not provided, a random topic is selected.
"""
try:
student = int(student)
except Exception as e:
logging.error(f"Failed to convert student ID to int: {e}")
await ctx.respond("Invalid student ID format.")
return
if not ctx.author.id == 620319269233885226:
await ctx.respond("You don't have permission to use this command.")
return
if timeout <= 0:
timeout = None
logging.info(f"Assigning essay to student {student} with timeout {timeout} and topic {topic}")
if student not in STUDENT_IDS:
await ctx.respond("Invalid student ID.", ephemeral=True)
return
assign_essay(student, topic)
await ctx.respond(f"Assigned essay to <@{student}>: {ASSIGNED_ESSAY[student]}")
if timeout and topic:
timeout_until = discord.utils.utcnow() + timedelta(seconds=timeout)
try:
member: discord.Member = await ctx.interaction.guild.fetch_member(student)
await member.timeout(until=timeout_until, reason=f"Assigned essay: {topic}")
await ctx.respond(f'{member.mention} has been timed out for {timeout}.', ephemeral=True)
except discord.Forbidden:
await ctx.respond(f'Failed to timeout {member.mention}. Missing permissions.', ephemeral=True)
except discord.HTTPException as e:
await ctx.respond(f'Failed to timeout {member.mention}. {e.text}', ephemeral=True)
return
@bot.slash_command(
name="rps",
description="Play \"Rock, Paper, Scissors, Essay\" with Mr. Jacobs!!!",
)
async def rps_essay(
ctx: discord.ApplicationContext,
choice: str = discord.Option(
description="Your selection for the game",
choices=[
discord.OptionChoice("Rock"),
discord.OptionChoice("Paper"),
discord.OptionChoice("Scissors")
]
)
) -> None:
"""
Play Rock Paper Scissors with Mr. Jacobs.
Args:
ctx: Application command context
choice: Your selection for the game
"""
logging.info(f"{ctx.author} chose {choice}")
outcomes = {"Rock": "Paper", "Paper": "Scissors", "Scissors": "Rock"}
await ctx.respond(
f"I choose {outcomes[choice]}, you owe me a {randint(1,15)} page essay "
f"on {assign_essay(ctx.author.id)}. MLA format, double spaced, "
"12pt Times New Roman!"
)
@bot.event
async def on_message(message: Message) -> None:
"""
Event listener for new messages. The bot will respond to messages that mention it or reply to its messages.
Bot disregards its own messages.
Args:
message (Message): The new message.
"""
logger.debug(f"New message received: {message.content}")
if message.author.id == bot.user.id or message.interaction_metadata:
return
if await handle_reply(message):
return
if await handle_mentions(message):
return
async def handle_reply(message: Message) -> bool:
"""
Handles replies to the bot's messages.
Args:
message (Message): The new message.
Returns:
bool: True if the message was handled, False otherwise.
"""
is_reply, first_reply = await msg_is_reply(message)
if is_reply and first_reply.author.id == bot.user.id:
async with message.channel.typing():
if await has_essay(message):
return True
reply_chain = await get_reply_chain(message)
thread_messages = await gen_message_list(reply_chain)
await process_thread_messages(thread_messages, message)
return True
return False
async def handle_mentions(message: Message) -> bool:
"""
Handles messages that mention the bot.
Args:
message (Message): The new message.
Returns:
bool: True if the message was handled, False otherwise.
"""
if message.mentions:
for user in message.mentions:
if user.id == bot.user.id:
async with message.channel.typing():
if await has_essay(message):
return True
thread_messages = await gen_message_list(message)
await process_thread_messages(thread_messages, message)
return True
return False
async def process_thread_messages(thread_messages: list, message: Message) -> None:
"""
Processes a list of thread messages and sends a response.
Args:
thread_messages (list): The list of thread messages.
message (Message): The original message.
"""
instructions = get_instructions(message)
response = await run(thread_messages, instructions, message.author.id)
await message.reply(response)