412 lines
16 KiB
Python
412 lines
16 KiB
Python
import logging
|
|
import regex as re
|
|
from random import randint
|
|
from asyncio import run as asyncio_run, sleep
|
|
import discord
|
|
from discord import Bot, Message
|
|
from discord.abc import GuildChannel as Channel
|
|
|
|
from ai_logic import run, get_instructions
|
|
from quotes import select_quote, select_student
|
|
from students import *
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
SERVER_ID: int = 1339601046745645148
|
|
GENERAL_ID: int = 1339601047294840876
|
|
|
|
bot = Bot(intents=discord.Intents.all())
|
|
|
|
# ---------- Discord Helper Functions ----------
|
|
async def get_channel(channel_id: int) -> Channel:
|
|
"""
|
|
Tries to get a cached channel object, if it fails it will send an API request to retrieve a new one.
|
|
|
|
Args:
|
|
channel_id (int): The ID of the channel to retrieve.
|
|
|
|
Raises:
|
|
ValueError: If the channel cannot be retrieved.
|
|
|
|
Returns:
|
|
discord.abc.GuildChannel: The retrieved discord channel object.
|
|
"""
|
|
logger.debug(f"Attempting to get channel with ID {channel_id}")
|
|
channel = bot.get_channel(channel_id)
|
|
if not channel:
|
|
logger.debug(f"Channel with ID {channel_id} not found in cache, fetching from API.")
|
|
channel = await bot.fetch_channel(channel_id)
|
|
if not channel:
|
|
logger.critical(f"Could not fetch channel with channel_id {channel_id}, fetch_channel returned None.")
|
|
return None
|
|
logger.info(f"Successfully retrieved {channel_id}, #{channel.name}")
|
|
return channel
|
|
|
|
async def get_message(message: Message | int, channel_id: int = None, attempts: int = 3) -> Message:
|
|
"""
|
|
Retrieves a message object either from cache or by fetching it from the channel.
|
|
|
|
Args:
|
|
message (Message | int): The message object or message ID to retrieve.
|
|
channel_id (int, optional): The ID of the channel to fetch the message from if not in cache.
|
|
attempts (int, optional): The number of attempts to retrieve the message. Defaults to 3.
|
|
|
|
Returns:
|
|
Message: The retrieved message object.
|
|
"""
|
|
logger.debug(f"Attempting to get message with ID {message if isinstance(message, int) else message.id}")
|
|
for attempt in range(attempts):
|
|
if isinstance(message, int):
|
|
got_message: Message = bot.get_message(message)
|
|
if not got_message and channel_id:
|
|
logger.debug(f"Message with ID {message} not found in cache, fetching from channel ID {channel_id}.")
|
|
channel: discord.abc.GuildChannel = await get_channel(channel_id)
|
|
got_message: Message = await channel.fetch_message(message)
|
|
elif not got_message:
|
|
logger.error(f"Message with ID {message} not found in cache and no channel ID provided.")
|
|
elif isinstance(message, Message):
|
|
got_message: Message = bot.get_message(message.id)
|
|
if not got_message:
|
|
logger.debug(f"Message with ID {message.id} not found in cache, fetching from channel.")
|
|
got_message: Message = await message.channel.fetch_message(message.id)
|
|
else:
|
|
logger.error(f"Couldn't retrieve message:\nMessage: {message}\nChannel ID: {channel_id}")
|
|
return None
|
|
|
|
if got_message:
|
|
return got_message
|
|
|
|
if attempt < attempts - 1:
|
|
logger.warning(f"Attempt {attempt + 1} failed. Retrying in 2 seconds...")
|
|
await sleep(2)
|
|
|
|
logger.error(f"Failed to retrieve message after {attempts} attempts.")
|
|
return None
|
|
|
|
async def send_message(message: str, channel: int | Channel = GENERAL_ID) -> Message:
|
|
"""
|
|
Has the bot send a message to a specified channel. If no channel is specified, the bot sends it to general.
|
|
|
|
Args:
|
|
message (str): The message to send.
|
|
channel (int | Message, optional): Channel to send message to, Defaults to GENERAL_ID.
|
|
|
|
Returns:
|
|
Message: The sent message object.
|
|
"""
|
|
logger.debug(f"Attempting to send message: {message}")
|
|
if isinstance(channel, int):
|
|
channel = await get_channel(channel)
|
|
if isinstance(channel, Channel):
|
|
sent_message: Message = await channel.send(message)
|
|
if isinstance(sent_message, Message):
|
|
logger.info(f"Message '{message}' successfully sent to {sent_message.channel}")
|
|
return sent_message
|
|
else:
|
|
logger.error(f"Message likely wasn't successfully sent, as channel.send did not return a Message.")
|
|
elif isinstance(channel, None):
|
|
logger.error(f"Message couldn't be sent as a channel wasn't found/messagable.")
|
|
|
|
async def edit_message(message: Message | int, channel: int = None):
|
|
"""
|
|
Edits a message in a channel.
|
|
|
|
Args:
|
|
message (Message | int): The message to edit.
|
|
channel (int, optional): The channel ID of the message. Defaults to None.
|
|
"""
|
|
logger.debug(f"Attempting to edit message {message if isinstance(message, int) else message.id}")
|
|
if isinstance(message, int):
|
|
message = await get_message(message, channel)
|
|
if isinstance(message, Message):
|
|
await message.edit(content=message.content)
|
|
logger.info(f"Message {message.id} successfully edited.")
|
|
else:
|
|
logger.error(f"Message couldn't be edited as it was not found.")
|
|
|
|
# ---------- Message Utility Functions ----------
|
|
async def msg_is_reply(message: Message) -> tuple[bool, Message]:
|
|
"""
|
|
Checks if a message is a reply to another message.
|
|
|
|
Args:
|
|
message (Message): The message to check.
|
|
|
|
Returns:
|
|
tuple[bool, Message]: A tuple containing a boolean indicating if the message is a reply and the replied message.
|
|
"""
|
|
logger.debug(f"Checking if message ID {message.id} is a reply.")
|
|
if message.reference is not None:
|
|
replied_msg = message.reference.cached_message
|
|
|
|
if replied_msg is None:
|
|
logger.debug(f"Replied message not found in cache, fetching from channel ID {message.reference.channel_id}.")
|
|
channel = await get_channel(message.reference.channel_id)
|
|
replied_msg = await channel.fetch_message(message.reference.message_id)
|
|
|
|
if isinstance(replied_msg, Message):
|
|
return True, replied_msg
|
|
return False, None
|
|
|
|
async def get_reply_chain(msg: Message) -> list[Message]:
|
|
"""
|
|
Retrieves the chain of replies for a given message.
|
|
|
|
Args:
|
|
msg (Message): The message to get the reply chain for.
|
|
|
|
Returns:
|
|
list[Message]: A list of messages in the reply chain.
|
|
"""
|
|
logger.debug(f"Getting reply chain for message ID {msg.id}.")
|
|
i = 0
|
|
reply_chain = [msg]
|
|
while i < 10:
|
|
is_reply, msg = await msg_is_reply(msg)
|
|
if not is_reply:
|
|
break
|
|
reply_chain.insert(0, msg)
|
|
i += 1
|
|
return reply_chain
|
|
|
|
async def gen_message_list(discord_messages: list[Message]) -> list[dict]:
|
|
"""
|
|
Generates a list of message dictionaries from a list of Discord messages.
|
|
|
|
Args:
|
|
discord_messages (list[Message]): The list of Discord messages.
|
|
|
|
Returns:
|
|
list[dict]: A list of message dictionaries.
|
|
"""
|
|
logger.debug("Generating message list from Discord messages.")
|
|
messages = []
|
|
if not isinstance(discord_messages, list):
|
|
discord_messages = [discord_messages]
|
|
|
|
for message in discord_messages:
|
|
students_mentioned = None
|
|
if isinstance(message, Message):
|
|
message_copy = {
|
|
"id": message.id,
|
|
"content": message.content,
|
|
"author": message.author,
|
|
"mentions": message.mentions,
|
|
"created_at": message.created_at,
|
|
"jump_url": message.jump_url
|
|
}
|
|
role = "assistant" if message_copy["author"].id == bot.user.id else "user"
|
|
students_mentioned = None
|
|
|
|
if message_copy["mentions"] and role == "user":
|
|
students_mentioned = [student.id for student in message_copy["mentions"] if student.id in STUDENT_IDS]
|
|
mentions = re.findall(r'<@\d{17,19}>', message_copy["content"])
|
|
for mention in mentions:
|
|
mention_id = mention.replace("<@", "").replace(">", "")
|
|
student_info = ID_TO_NAME.get(int(mention_id))
|
|
if student_info:
|
|
message_copy["content"] = message_copy["content"].replace(mention, student_info.split("\n")[0])
|
|
message_copy["content"] += '\n{'
|
|
if students_mentioned:
|
|
message_copy["content"] += f"\n\nInfo on students mentioned (DO NOT REPEAT!): "
|
|
for student in students_mentioned:
|
|
message_copy["content"] += f"\n{STUDENTS.get(student)}"
|
|
message_copy["content"] += f"\n\nSent by: {STUDENTS.get(message_copy['author'].id) if message_copy['author'].id in STUDENT_IDS else message_copy['author'].name + ' (Author\'s details not found)'}" + '}'
|
|
|
|
if not message_copy["content"]:
|
|
debug = True
|
|
logger.error(f"No message content for message {message_copy['id']}.")
|
|
fetched_message = await get_message(message_copy["id"])
|
|
message_copy["content"] = fetched_message.content
|
|
|
|
thread_message = {
|
|
"role": role,
|
|
"content": message_copy["content"]
|
|
}
|
|
debug = True # Delete after fix
|
|
if debug:
|
|
debug_dump = "Messages Dump:\n"
|
|
for msg_copy in discord_messages:
|
|
debug_dump += f"""
|
|
Message: {msg_copy}
|
|
Message Time: {msg_copy.created_at}
|
|
Message ID: {msg_copy.id}
|
|
Message Author: {msg_copy.author}
|
|
Message Content: {msg_copy.content}
|
|
Link: {msg_copy.jump_url}
|
|
"""
|
|
logger.debug(debug_dump)
|
|
messages.append(thread_message)
|
|
else:
|
|
logger.warning(f"Argument {message} is not of type Message and will be skipped.")
|
|
|
|
return messages
|
|
|
|
# ---------- AI Interaction Functions ----------
|
|
async def send_quote(quote: str = None) -> None:
|
|
"""
|
|
Sends a quote to the general channel.
|
|
|
|
Args:
|
|
quote (str, optional): The quote to send. If not provided, a random quote is selected.
|
|
"""
|
|
if not quote:
|
|
quote = select_quote()
|
|
logger.info(f"Sending quote '{quote}' in #general...")
|
|
await send_message(quote)
|
|
|
|
async def after_class(student: int = None) -> None:
|
|
"""
|
|
Sends a message to a student to see the bot after class.
|
|
|
|
Args:
|
|
student (int, optional): The ID of the student. If not provided, a random student is selected.
|
|
"""
|
|
if not student:
|
|
student = select_student()
|
|
logger.info(f"Sending mention to see {student} after class to #general...")
|
|
await send_message(f"Come see me after class <@{student}>")
|
|
|
|
async def has_essay(message: Message) -> bool:
|
|
if message.author.id in ASSIGNED_ESSAY:
|
|
async with message.channel.typing():
|
|
message = await get_message(message) if not message else message
|
|
message_list = await gen_message_list(message)
|
|
response = await run(
|
|
message_list,
|
|
f"{get_instructions(message)}\n\nTHIS USER HAS AN ESSAY!!! Yell at them and tell them to finish their essay on {ASSIGNED_ESSAY.get(message.author.id)}!!! Unless...they sent you the essay, in that case, you can decide whether or not it is satisfactory (it needs to be in MLA format and 300-2000 characters long, and to be relevant of course). If it is satisfactory, you can tell them they are free to go, after you run the function \"clear_essay\".",
|
|
message.author.id
|
|
)
|
|
await message.reply(response)
|
|
return True
|
|
else:
|
|
return False
|
|
|
|
# ---------- Discord Commands & Event Handlers ----------
|
|
def setup_discord_bot(bot: Bot) -> None:
|
|
"""
|
|
Sets up the Discord bot with commands and event listeners.
|
|
|
|
Args:
|
|
bot (Bot): The Discord bot instance.
|
|
"""
|
|
@bot.slash_command(description="Talk to Mr. Jacobs!!!")
|
|
async def message(ctx: discord.ApplicationContext, text: str) -> None:
|
|
"""
|
|
Slash command to talk to Mr. Jacobs. The bot will respond to the user's message using AI.
|
|
|
|
Args:
|
|
ctx (discord.ApplicationContext): The context of the command.
|
|
text (str): The message text.
|
|
"""
|
|
logging.info(f"User {ctx.author.global_name} sent message {text}")
|
|
await ctx.defer()
|
|
instructions = get_instructions(ctx.channel)
|
|
thread_messages = [{"role": "user", "content": text}]
|
|
response = await run(thread_messages, instructions, ctx.author.id)
|
|
await ctx.respond(content=response)
|
|
|
|
@bot.slash_command(
|
|
name="rps",
|
|
description="Play \"Rock, Paper, Scissors, Essay\" with Mr. Jacobs!!!",
|
|
)
|
|
async def rps_essay(
|
|
ctx: discord.ApplicationContext,
|
|
choice: str = discord.Option(
|
|
description="Your selection for the game",
|
|
choices=[
|
|
discord.OptionChoice("Rock"),
|
|
discord.OptionChoice("Paper"),
|
|
discord.OptionChoice("Scissors")
|
|
]
|
|
)
|
|
) -> None:
|
|
"""
|
|
Play Rock Paper Scissors with Mr. Jacobs.
|
|
|
|
Args:
|
|
ctx: Application command context
|
|
choice: Your selection for the game
|
|
"""
|
|
logging.info(f"{ctx.author} chose {choice}")
|
|
outcomes = {"Rock": "Paper", "Paper": "Scissors", "Scissors": "Rock"}
|
|
await ctx.respond(
|
|
f"I choose {outcomes[choice]}, you owe me a {randint(1,15)} page essay "
|
|
f"on {assign_essay(ctx.author.id)}. MLA format, double spaced, "
|
|
"12pt Times New Roman!"
|
|
)
|
|
|
|
@bot.event
|
|
async def on_message(message: Message) -> None:
|
|
"""
|
|
Event listener for new messages. The bot will respond to messages that mention it or reply to its messages.
|
|
Bot disregards its own messages.
|
|
|
|
Args:
|
|
message (Message): The new message.
|
|
"""
|
|
logger.debug(f"New message received: {message.content}")
|
|
|
|
if message.author.id == bot.user.id or message.interaction_metadata:
|
|
return
|
|
|
|
if await handle_reply(message):
|
|
return
|
|
|
|
if await handle_mentions(message):
|
|
return
|
|
|
|
async def handle_reply(message: Message) -> bool:
|
|
"""
|
|
Handles replies to the bot's messages.
|
|
|
|
Args:
|
|
message (Message): The new message.
|
|
|
|
Returns:
|
|
bool: True if the message was handled, False otherwise.
|
|
"""
|
|
is_reply, first_reply = await msg_is_reply(message)
|
|
if is_reply and first_reply.author.id == bot.user.id:
|
|
async with message.channel.typing():
|
|
if await has_essay(message):
|
|
return True
|
|
reply_chain = await get_reply_chain(message)
|
|
thread_messages = await gen_message_list(reply_chain)
|
|
await process_thread_messages(thread_messages, message)
|
|
return True
|
|
return False
|
|
|
|
async def handle_mentions(message: Message) -> bool:
|
|
"""
|
|
Handles messages that mention the bot.
|
|
|
|
Args:
|
|
message (Message): The new message.
|
|
|
|
Returns:
|
|
bool: True if the message was handled, False otherwise.
|
|
"""
|
|
if message.mentions:
|
|
for user in message.mentions:
|
|
if user.id == bot.user.id:
|
|
async with message.channel.typing():
|
|
if await has_essay(message):
|
|
return True
|
|
thread_messages = await gen_message_list(message)
|
|
await process_thread_messages(thread_messages, message)
|
|
return True
|
|
return False
|
|
|
|
async def process_thread_messages(thread_messages: list, message: Message) -> None:
|
|
"""
|
|
Processes a list of thread messages and sends a response.
|
|
|
|
Args:
|
|
thread_messages (list): The list of thread messages.
|
|
message (Message): The original message.
|
|
"""
|
|
instructions = get_instructions(message)
|
|
response = await run(thread_messages, instructions, message.author.id)
|
|
await message.reply(response)
|