# NOTE(review): the lines below were Hugging Face Space page chrome
# ("Spaces / Running on CPU Upgrade") captured during extraction — they are
# not part of the program and have been converted to this comment so the
# file parses.
import asyncio
import os
import random
import re
import threading
from collections import deque

import torch
from transformers import AutoModelForCausalLM, AutoTokenizer

import discord
from discord.ext import commands
import gradio as gr
import gradio_client
from gradio_client import Client
# Bot credentials; None when the env var is unset (run_bot handles startup).
DISCORD_TOKEN = os.environ.get("DISCORD_TOKEN", None)

intents = discord.Intents.all()
bot = commands.Bot(command_prefix='!', intents=intents)

# Enhanced welcome system with AI
welcome_list = []  # mentions of members waiting to be welcomed (flushed in batches of 8)
recent_introductions = deque(maxlen=15)  # Store last 15 full introductions for rich context

# Populated by initialize_ai_model(); both stay None if loading fails, which
# switches the bot to the canned fallback messages.
model = None
tokenizer = None
# Initialize the AI model
def initialize_ai_model():
    """Load the BitNet chat model and tokenizer into the module globals.

    Intended to run in an executor thread (it blocks on download/load).
    On any failure the globals are left as None and the bot falls back to
    the canned welcome messages.
    """
    global model, tokenizer
    checkpoint = "microsoft/bitnet-b1.58-2B-4T"
    try:
        print("π€ Loading BitNet AI model for personalized welcomes...")
        tokenizer = AutoTokenizer.from_pretrained(checkpoint)
        model = AutoModelForCausalLM.from_pretrained(
            checkpoint, torch_dtype=torch.bfloat16
        )
        print("β AI model loaded successfully!")
    except Exception as e:
        print(f"β Failed to load AI model: {e}")
        print("π Falling back to traditional welcome messages...")
# Fallback welcome messages (original ones)
fallback_welcome_messages = [
    "Welcome to the community <:hugging_croissant:1103375763207622656> \n",
    "Good to have you with us! :hugging: Got any cool projects you feel like sharing? :eyes: \n",
    "Welcome aboard π¦ β΅ \n",
    "Hello friends! :wave: Where are you folks from? :globe_with_meridians: <:hugging_earth:968126923408564306> \n",
    "Glad you're here! Welcome! π \n",
    "Happy to have you with us! <:blobcatlove:1103376097841790986> How much have you played around with ML/AI? :computer: \n",
    "New faces, new friends! Welcome! ππ \n"
]
welcome_messages_counter = 0  # rotating index into fallback_welcome_messages
wait_messages_counter = 0  # NOTE(review): written here but never updated/read elsewhere in this file
channel_id = 900017973547388988  # 900017973547388988 = #introduce-yourself
def store_full_introduction(message_content, author_name):
    """Store the complete introduction message for rich AI context.

    Returns a dict with the author's display name, the full message text,
    and an ISO-8601 UTC timestamp.
    """
    record = {
        'author': author_name,
        'content': message_content,
        'timestamp': discord.utils.utcnow().isoformat(),
    }
    return record
async def generate_ai_welcome_message(new_members, recent_context):
    """Generate a personalized welcome message using BitNet AI with full introduction context.

    Args:
        new_members: mentions of the members being welcomed. Currently unused
            by the prompt itself — the caller appends the mentions separately.
        recent_context: iterable of intro dicts with 'author' and 'content' keys.

    Returns:
        The generated welcome text (capped at 400 chars), or None when the
        model is not loaded or generation fails.
    """
    if not model or not tokenizer:
        return None
    try:
        # Build rich context from the last 8 stored introductions, each
        # truncated to 300 characters so the prompt stays small.
        latest = list(recent_context)[-8:]
        if latest:
            rendered = []
            for intro in latest:
                body = intro['content']
                if len(body) > 300:
                    rendered.append(f"- {intro['author']}: {body[:300]}...")
                else:
                    rendered.append(f"- {intro['author']}: {body}")
            context_intros = "\n".join(rendered)
        else:
            context_intros = "No recent introductions available."

        # Create the AI prompt with full context
        system_prompt = """You are a friendly, encouraging Discord community welcomer for a tech/AI community.
You'll be given recent introductions from community members to understand the vibe and interests.
Generate a warm, personalized welcome message that:
- Is enthusiastic and welcoming but not overwhelming
- References themes or interests you notice from recent introductions
- Asks an engaging question that connects to what people are discussing
- Uses 1-2 relevant emojis
- Keeps it concise (2-3 sentences max)
- Feels natural and conversational
DO NOT mention the new members' @ tags in your message - they will be added separately."""
        user_prompt = f"""Here are recent introductions from the community:
{context_intros}
Based on these introductions, generate a welcoming message for new members joining the community. Make it feel connected to what current members are sharing and interested in."""

        chat = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ]
        prompt = tokenizer.apply_chat_template(
            chat, tokenize=False, add_generation_prompt=True
        )
        encoded = tokenizer(prompt, return_tensors="pt").to(model.device)

        # Sample with mild randomness so repeated welcomes differ.
        with torch.no_grad():
            generated = model.generate(
                **encoded,
                max_new_tokens=100,
                do_sample=True,
                temperature=0.8,
                top_p=0.9,
                pad_token_id=tokenizer.eos_token_id,
            )

        # Decode only the newly generated tokens, not the prompt echo.
        new_tokens = generated[0][encoded['input_ids'].shape[-1]:]
        reply = tokenizer.decode(new_tokens, skip_special_tokens=True).strip()

        # Collapse newlines and cap the length (but allow more room).
        reply = re.sub(r'\n+', ' ', reply)
        return reply[:400]
    except Exception as e:
        print(f"β AI generation failed: {e}")
        return None
@bot.event  # BUG FIX: without this decorator the handler is never registered
async def on_ready():
    """Log readiness, then load the heavy HF model off the event loop."""
    print(f'π€ {bot.user} has landed! Ready to create amazing welcomes!')
    # get_running_loop() is the supported call inside a coroutine
    # (get_event_loop() there is deprecated since Python 3.10).
    loop = asyncio.get_running_loop()
    # Model loading blocks for a long time; keep the gateway heartbeat alive.
    await loop.run_in_executor(None, initialize_ai_model)
@bot.event  # BUG FIX: without this decorator the handler is never registered
async def on_member_join(member):
    """Queue a joining member; once 8 are waiting, post one batched welcome.

    The welcome is only sent when the 3 most recent messages in the intro
    channel are from humans, so the bot does not post back-to-back.
    """
    global welcome_list
    global welcome_messages_counter
    welcome_list.append(member.mention)
    if len(welcome_list) < 8:
        print(f"welcome_list: {welcome_list}")
        return
    channel = bot.get_channel(channel_id)
    print(f"channel: {channel}")
    # Check if the channel has received at least 3 messages from other users since the last bot message
    count = 0
    print(f"count: {count}")
    async for message in channel.history(limit=3):
        if message.author.bot:
            print(f"This is a bot message! -> {message.content}")
        else:
            count = count + 1
            print(f"count: {count}")
    if count == 3:
        print(f"count: {count}")
        batch = welcome_list[:8]
        # Try to generate AI welcome message
        ai_message = await generate_ai_welcome_message(batch, list(recent_introductions))
        if ai_message:
            # Use AI-generated message
            message = f'{ai_message} {" ".join(batch)}'
            print(f"π€ Generated AI welcome: {message}")
        else:
            # Fallback to traditional messages, rotating through the canned list.
            message = f'{fallback_welcome_messages[welcome_messages_counter]} {" ".join(batch)}'
            welcome_messages_counter = (welcome_messages_counter + 1) % len(fallback_welcome_messages)
            print(f"π Using fallback welcome message")
        await channel.send(message)
        # BUG FIX: drop only the 8 members just welcomed. The old code reset
        # the whole list, silently discarding anyone queued beyond the batch.
        welcome_list = welcome_list[8:]
@bot.event  # BUG FIX: without this decorator the handler is never registered
async def on_message(message):
    """React to and record introductions, then dispatch prefix commands."""
    # React to introductions (use the channel_id constant instead of the
    # previously hard-coded duplicate of the same id).
    if message.channel.id == channel_id:
        await message.add_reaction('π€')
        # Store full introduction for rich context (if it's not from a bot and has substantial content)
        if not message.author.bot and len(message.content) > 20:
            full_intro = store_full_introduction(message.content, message.author.display_name)
            recent_introductions.append(full_intro)
            print(f"π Stored full introduction from {message.author.display_name}: {message.content[:100]}...")
    # Required when overriding on_message; otherwise ! commands never run.
    await bot.process_commands(message)
# New command to test AI welcome generation
@bot.command()  # BUG FIX: without this decorator the command is never registered
async def test_welcome(ctx):
    """Test the AI welcome message generation (admin only)."""
    if not ctx.author.guild_permissions.administrator:
        await ctx.send("β Only admins can test this feature!")
        return
    # Generate a test welcome using the caller as the sole "new member".
    test_members = [ctx.author.mention]
    ai_message = await generate_ai_welcome_message(test_members, list(recent_introductions))
    if ai_message:
        await ctx.send(f"π€ **AI Test Welcome:**\n{ai_message}")
    else:
        await ctx.send("β AI generation failed, check console for errors.")
@bot.command()  # BUG FIX: without this decorator the command is never registered
async def recent_intros(ctx):
    """Show recent introductions stored for AI context (admin only)."""
    if not ctx.author.guild_permissions.administrator:
        await ctx.send("β Only admins can view stored introductions!")
        return
    if not recent_introductions:
        await ctx.send("π No recent introductions stored yet.")
        return
    intro_list = []
    for i, intro in enumerate(list(recent_introductions)[-5:], 1):  # Show last 5
        preview = intro['content'][:150] + "..." if len(intro['content']) > 150 else intro['content']
        intro_list.append(f"**{i}. {intro['author']}:** {preview}")
    intros_text = "\n\n".join(intro_list)
    await ctx.send(f"π **Recent Introductions (Last 5):**\n\n{intros_text}")
@bot.command()  # BUG FIX: without this decorator the command is never registered
async def welcome_stats(ctx):
    """Show welcome system statistics (admin only)."""
    if not ctx.author.guild_permissions.administrator:
        await ctx.send("β Only admins can view stats!")
        return
    ai_status = "β Loaded" if model and tokenizer else "β Not loaded"
    intro_count = len(recent_introductions)
    waiting_count = len(welcome_list)
    stats_message = f"""π **Welcome System Stats**
π€ AI Model: {ai_status}
π Full Intros Stored: {intro_count}/15
β³ Members Waiting: {waiting_count}/8
π― Channel ID: {channel_id}"""
    await ctx.send(stats_message)
# Fun gradio interface
def greet(name):
    """Return a friendly status string for the Gradio health-check demo."""
    status = f"Hello {name}! π The AI-powered Discord bot is running!"
    return status
# NOTE(review): duplicate of the assignment at the top of the file; kept for
# compatibility, harmless because the value is identical.
DISCORD_TOKEN = os.environ.get("DISCORD_TOKEN", None)

def run_bot():
    """Run the Discord client (blocking); intended for a background thread."""
    # BUG FIX: bot.run(None) raises a confusing error deep inside discord.py
    # when the env var is missing — fail with a clear message instead.
    if DISCORD_TOKEN is None:
        print("β DISCORD_TOKEN is not set; Discord bot will not start.")
        return
    bot.run(DISCORD_TOKEN)

# Start bot in separate thread; daemon so the process can exit when the
# Gradio main thread stops.
threading.Thread(target=run_bot, daemon=True).start()
# Launch Gradio interface — keeps the Space's web process alive and doubles
# as a simple health check while the bot runs in its background thread.
demo = gr.Interface(
    fn=greet,
    inputs="text",
    outputs="text",
    title="π€ AI-Powered Discord Welcome Bot",
    description="Enhanced with BitNet b1.58 for personalized community welcomes!"
)
demo.launch()  # blocking call on the main thread