from flask import Flask, request import pandas as pd from openai import OpenAI import requests import time import re from wakeonlan import send_magic_packet from ai_pic import story_start, story_start_p2p, story_start_p2p_sese from msg_send_save import send_private_message_word,save_group_message,send_group_message_pic,send_group_message_word,send_group_poke,del_group_message from chat_model import AI_chat,AI_get_picprompt,AI_lora_getpic_prompt,AI_sendphoto_ornot from special_date import get_week,get_v50,get_holidays from config import config_get import random import os app = Flask(__name__) # 读取王石列表,存为stones数据df文件 stones = pd.read_csv('other/stone.csv') if 'Unnamed: 0' in stones.columns: stones = stones.drop('Unnamed: 0', axis=1) for i in range(len(stones)): final_stat = "\n" abilities = stones.loc[i, 'stats'].split("&") for j in range(len(abilities)): ability = abilities[j].split("#") if ability[2] != "Nan": final_stat += ability[2]+":" if ability[0].endswith("%"): final_stat += ability[0].split("%")[0] + ability[1] +"% " else: final_stat += ability[0] + ability[1] + " " stones.loc[i,"stat"] = final_stat # QQ机器人基础板块,包括人格,绘图风格和名称自我意识 my_name = config_get("my_name") prompt_max = config_get("prompt_max") prompt_less = config_get("prompt_less") # 一个比较没用的变量,用来限制绘画cd的,现在默认是0就是无cd last_request_time = 0 # 最近一次的心跳上报时间,用来记录服务状态 last_heartbeat_time = time.time() def on_my_message(group_id, user_id, user_name, raw_message, message): """ 每次触发猫猫关键词,收到@使用这个方法来进行回应 """ global last_request_time # 先对解码后消息进行判断,如果第一个解码消息为@,则触发命令模式,遍历完成若没有命令触发则再触发人格对话 if message[0]['type'] == "at" and message[1]['type'] == "text": command = message[1]['data']['text'].strip(" ") if str(command).startswith("/"): on_command(group_id, user_id, command) elif command=="v你50": print(send_group_message_pic(group_id, user_id, 'v50.png')) else: AI_ans = AI_chat(group_id, user_name+":"+raw_message, prompt_less) send_group_message_word(group_id, AI_ans) save_group_message(group_id, my_name, AI_ans) # 根据条件触发AI绘画 if AI_sendphoto_ornot("猫猫:"+AI_ans) == "是": current_time = time.time() # 判断距离上次请求图片的时间间隔是否小于60秒(1分钟) if current_time-last_request_time >= 0: last_request_time = current_time send_group_message_pic(group_id, user_id, story_start(AI_get_picprompt(group_id))) else: AI_ans = AI_chat(group_id, user_name + ":" + raw_message, prompt_less) send_group_message_word(group_id, AI_ans) save_group_message(group_id, my_name, AI_ans) # 根据条件触发AI绘画 if AI_sendphoto_ornot(AI_ans) == "是": current_time = time.time() # 判断距离上次请求图片的时间间隔是否小于60秒(1分钟) if current_time - last_request_time >= 0: last_request_time = current_time send_group_message_pic(group_id, user_id, story_start(AI_get_picprompt(group_id))) def on_command(group_id, user_id,command): """ 每当接到命令符号时,开启这个功能进行回应 """ global last_request_time if command == "/菜单": home = """欢迎使用命令功能: /菜单:显示功能介绍 /王石:查询王石效果 /猫猫画画:调用自定义AI绘画功能 /整点瑟瑟:不用我多说了吧 所有命令均需要@猫猫触发""" send_group_message_word(group_id, home) save_group_message(group_id, my_name, home) elif str(command).startswith("/王石"): stone_stat = ch_stone(command) send_group_message_word(group_id, stone_stat) save_group_message(group_id, my_name, stone_stat) elif str(command).startswith("/猫猫画画"): pic_word = str(command)[len("/猫猫画画"):] current_time = time.time() # 判断距离上次请求图片的时间间隔是否小于60秒(1分钟) if current_time - last_request_time >= 0: send_group_message_word(group_id, "图片正在生成中...") last_request_time = current_time send_group_message_pic(group_id, user_id, story_start(AI_lora_getpic_prompt(pic_word))) elif str(command).startswith("/整点瑟瑟"): pic_word = str(command)[len("/猫猫瑟瑟"):] current_time = time.time() # 判断距离上次请求图片的时间间隔是否小于60秒(1分钟) if group_id == 637612718: send_group_message_word(group_id, "本群该功能已被禁用!") else: if current_time - last_request_time >= 0: send_group_message_word(group_id, "图片正在生成中...") last_request_time = current_time message_id = send_group_message_pic(group_id, user_id, story_start("nsfw,"+AI_lora_getpic_prompt(pic_word)))['data']['message_id'] time.sleep(8) del_group_message(message_id) else: error = "抱歉没有找到你所输入的命令!如有关于命令的问题请@猫猫输入/菜单进行查询\n示例:\n@猫猫 /菜单" send_group_message_word(group_id, error) save_group_message(group_id, my_name, error) def ch_stone(name): """ 根据关键词搜索王石,返回具体介绍 """ word = name.lstrip("/王石") for i in range(len(stones)): if stones.loc[i, 'name'] == word: stone_up_on = stones.loc[i, 'up_on'] if stone_up_on == "Nan": stone_up_on = "" else: stone_up_on = "\n可强化:" + stone_up_on return stones.loc[i, 'type'] +" " + stones.loc[i, 'name'] + stone_up_on + stones.loc[i, 'stat'] return "这里似乎没有你想找的东西喵~\n如果要查找王石,请使用/王石查询正确的王石名称,示例:\n@猫猫 /王石科隆老大\n如果不清楚王石全名,可以在[掌上波多姆]中进行模糊搜索喵" def change2setu(raw_message,group_id,user_id): """ 进入变瑟图命令 """ message = str(raw_message) # 提取 QQ 号 pattern = r'\[CQ:at,qq=(\d+),name=[^]]+\]' match = re.search(pattern, message) if match: qq = match.group(1) try: img_url = f'https://q1.qlogo.cn/g?b=qq&nk={qq}&s=640' # 创建 user 文件夹 on_file_path = os.path.join('user', qq) if not os.path.exists(on_file_path): os.makedirs(on_file_path) # 下载图片 img_response = requests.get(img_url) img_response.raise_for_status() file_path = os.path.join('user', qq, f'{qq}.jpg') with open(file_path, 'wb') as file: file.write(img_response.content) print(f'图片已保存到 {file_path}') send_group_message_word(group_id, "开始变身!") message_id = send_group_message_pic(group_id, user_id, story_start_p2p_sese(qq)) print(message_id) time.sleep(8) del_group_message(message_id['data']['message_id']) except Exception as e: print(f'发生未知错误: {e}') send_group_message_word(group_id, "变身失败!") else: print('未找到 QQ 号') def change2girl(raw_message,group_id,user_id): """ 进入变美少女命令 """ message = str(raw_message) # 提取 QQ 号 pattern = r'\[CQ:at,qq=(\d+),name=[^]]+\]' match = re.search(pattern, message) if match: qq = match.group(1) try: img_url = f'https://q1.qlogo.cn/g?b=qq&nk={qq}&s=640' # 创建 user 文件夹 on_file_path = os.path.join('user', qq) if not os.path.exists(on_file_path): os.makedirs(on_file_path) # 下载图片 img_response = requests.get(img_url) img_response.raise_for_status() file_path = os.path.join('user', qq, f'{qq}.jpg') with open(file_path, 'wb') as file: file.write(img_response.content) print(f'图片已保存到 {file_path}') send_group_message_word(group_id, "开始变身!") send_group_message_pic(group_id, user_id, story_start_p2p(qq)) except Exception as e: print(f'发生未知错误: {e}') send_group_message_word(group_id, "变身失败!") else: print('未找到 QQ 号') @app.route('/stat', methods=['GET']) def stat_get(): return {"heartbeat": last_heartbeat_time} @app.route('/cat', methods=['POST']) def chat(): global last_heartbeat_time data = request.json data_type = data['post_type'] # 管理员模式,目前只有开启主机功能 if 'message_type' in data: if data['message_type'] == 'private': # 进入管理员模式 if data['raw_message'] == "原神启动" and data['user_id'] == 3494084267: send_magic_packet("22-43-4D-05-34-B2") send_private_message_word(data['user_id'], "主机3B0IJ2U已启动!") print(data) # 进入群聊模式 if data_type == "message" and data['message_type'] == "group": group_id = data['group_id'] user_id = data['user_id'] message = data['message'] raw_message = data['raw_message'] time_now = data['time'] user_name = data['sender']['nickname'] # 首先判断消息类型,如果为图片类型则存入url,否则存入csv文件中 if message[0].get('type')=='image': image_data = message[0].get('data', {}) url = image_data.get('url') save_group_message(group_id, user_name, f"图片内容:{url}") else: save_group_message(group_id, user_name, raw_message) # 然后根据命令优先级进入AI对话模式,在这里写入AI模块 if str(raw_message).startswith("变美少女[CQ:at"): change2girl(raw_message, group_id, user_id) elif str(raw_message).startswith("变瑟图[CQ:at"): if group_id == 637612718: send_group_message_word(group_id,"本群该功能已被禁用!") else: change2setu(raw_message, group_id, user_id) elif my_name in raw_message: on_my_message(group_id, user_id, user_name, raw_message, message) print( f"收到message!群号:{group_id},发送人:{user_id},时间{time_now}\n" f"消息内容:{message}\n原始消息内容:{raw_message}") else: print(f"群聊{group_id}已记录新消息!") # 进入提示消息模式 elif data_type == "notice": notice_type = data['notice_type'] group_id = data['group_id'] if notice_type == "group_increase": user_id = data['user_id'] print(f"群成员增加!群号:{group_id},加入者:{user_id}") requests.post('http://127.0.0.1:3000/send_group_msg', json={ 'group_id': group_id, 'message': [{ "type": "at", "data": { "qq": f"{user_id}" } }, { 'type': 'text', 'data': { 'text': "欢迎新成员加入群聊喵~" } }] }) elif notice_type == "notify" and data['target_id'] == data['self_id']: user_id = data['user_id'] print(f"我被{user_id}戳了一下!") send_group_poke(group_id,user_id) if get_week()=="星期四": # 触发疯狂星期四文案 # 生成一个0到99之间的随机整数,控制特殊文案概率为40% rand_num = random.randint(0, 99) print(rand_num) send_group_message_word(group_id,get_v50()) if rand_num < 30 else send_group_message_word(group_id,"₍^ >ヮ<^₎") else: send_group_message_word(group_id,"₍^ >ヮ<^₎") elif data_type == "meta_event": if data["meta_event_type"] == "heartbeat": last_heartbeat_time = data['time'] return "" if __name__ == '__main__': app.run(host='0.0.0.0', port=3100)