123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284 |
- from flask import Flask, request
- import pandas as pd
- from openai import OpenAI
- import requests
- import time
- import re
- from wakeonlan import send_magic_packet
- from ai_pic import story_start, story_start_p2p, story_start_p2p_sese
- from msg_send_save import send_private_message_word,save_group_message,send_group_message_pic,send_group_message_word,send_group_poke,del_group_message
- from chat_model import AI_chat,AI_get_picprompt,AI_lora_getpic_prompt,AI_sendphoto_ornot
- from special_date import get_week,get_v50,get_holidays
- from config import config_get
- import random
- import os
- app = Flask(__name__)
- # 读取王石列表,存为stones数据df文件
- stones = pd.read_csv('other/stone.csv')
- if 'Unnamed: 0' in stones.columns:
- stones = stones.drop('Unnamed: 0', axis=1)
- for i in range(len(stones)):
- final_stat = "\n"
- abilities = stones.loc[i, 'stats'].split("&")
- for j in range(len(abilities)):
- ability = abilities[j].split("#")
- if ability[2] != "Nan":
- final_stat += ability[2]+":"
- if ability[0].endswith("%"):
- final_stat += ability[0].split("%")[0] + ability[1] +"% "
- else:
- final_stat += ability[0] + ability[1] + " "
- stones.loc[i,"stat"] = final_stat
- # QQ机器人基础板块,包括人格,绘图风格和名称自我意识
- my_name = config_get("my_name")
- prompt_max = config_get("prompt_max")
- prompt_less = config_get("prompt_less")
- # 一个比较没用的变量,用来限制绘画cd的,现在默认是0就是无cd
- last_request_time = 0
- def on_my_message(group_id, user_id, user_name, raw_message, message):
- """
- 每次触发猫猫关键词,收到@使用这个方法来进行回应
- """
- global last_request_time
- # 先对解码后消息进行判断,如果第一个解码消息为@,则触发命令模式,遍历完成若没有命令触发则再触发人格对话
- if message[0]['type'] == "at" and message[1]['type'] == "text":
- command = message[1]['data']['text'].strip(" ")
- if str(command).startswith("/"):
- on_command(group_id, user_id, command)
- elif command=="v你50":
- print(send_group_message_pic(group_id, user_id, 'v50.png'))
- else:
- AI_ans = AI_chat(group_id, user_name+":"+raw_message, prompt_less)
- send_group_message_word(group_id, AI_ans)
- save_group_message(group_id, my_name, AI_ans)
- # 根据条件触发AI绘画
- if AI_sendphoto_ornot("猫猫:"+AI_ans) == "是":
- current_time = time.time()
- # 判断距离上次请求图片的时间间隔是否小于60秒(1分钟)
- if current_time-last_request_time >= 0:
- last_request_time = current_time
- send_group_message_pic(group_id, user_id, story_start(AI_get_picprompt(group_id)))
- else:
- AI_ans = AI_chat(group_id, user_name + ":" + raw_message, prompt_less)
- send_group_message_word(group_id, AI_ans)
- save_group_message(group_id, my_name, AI_ans)
- # 根据条件触发AI绘画
- if AI_sendphoto_ornot(AI_ans) == "是":
- current_time = time.time()
- # 判断距离上次请求图片的时间间隔是否小于60秒(1分钟)
- if current_time - last_request_time >= 0:
- last_request_time = current_time
- send_group_message_pic(group_id, user_id, story_start(AI_get_picprompt(group_id)))
- def on_command(group_id, user_id,command):
- """
- 每当接到命令符号时,开启这个功能进行回应
- """
- global last_request_time
- if command == "/菜单":
- home = """欢迎使用命令功能:
- /菜单:显示功能介绍
- /王石:查询王石效果
- /猫猫画画:调用自定义AI绘画功能
- /整点瑟瑟:不用我多说了吧
- 所有命令均需要@猫猫触发"""
- send_group_message_word(group_id, home)
- save_group_message(group_id, my_name, home)
- elif str(command).startswith("/王石"):
- stone_stat = ch_stone(command)
- send_group_message_word(group_id, stone_stat)
- save_group_message(group_id, my_name, stone_stat)
- elif str(command).startswith("/猫猫画画"):
- pic_word = str(command)[len("/猫猫画画"):]
- current_time = time.time()
- # 判断距离上次请求图片的时间间隔是否小于60秒(1分钟)
- if current_time - last_request_time >= 0:
- send_group_message_word(group_id, "图片正在生成中...")
- last_request_time = current_time
- send_group_message_pic(group_id, user_id, story_start(AI_lora_getpic_prompt(pic_word)))
- elif str(command).startswith("/整点瑟瑟"):
- pic_word = str(command)[len("/猫猫瑟瑟"):]
- current_time = time.time()
- # 判断距离上次请求图片的时间间隔是否小于60秒(1分钟)
- if group_id == 637612718:
- send_group_message_word(group_id, "本群该功能已被禁用!")
- else:
- if current_time - last_request_time >= 0:
- send_group_message_word(group_id, "图片正在生成中...")
- last_request_time = current_time
- message_id = send_group_message_pic(group_id, user_id, story_start("nsfw,"+AI_lora_getpic_prompt(pic_word)))['data']['message_id']
- time.sleep(8)
- del_group_message(message_id)
- else:
- error = "抱歉没有找到你所输入的命令!如有关于命令的问题请@猫猫输入/菜单进行查询\n示例:\n@猫猫 /菜单"
- send_group_message_word(group_id, error)
- save_group_message(group_id, my_name, error)
- def ch_stone(name):
- """
- 根据关键词搜索王石,返回具体介绍
- """
- word = name.lstrip("/王石")
- for i in range(len(stones)):
- if stones.loc[i, 'name'] == word:
- stone_up_on = stones.loc[i, 'up_on']
- if stone_up_on == "Nan":
- stone_up_on = ""
- else:
- stone_up_on = "\n可强化:" + stone_up_on
- return stones.loc[i, 'type'] +" " + stones.loc[i, 'name'] + stone_up_on + stones.loc[i, 'stat']
- return "这里似乎没有你想找的东西喵~\n如果要查找王石,请使用/王石查询正确的王石名称,示例:\n@猫猫 /王石科隆老大\n如果不清楚王石全名,可以在[掌上波多姆]中进行模糊搜索喵"
- def change2setu(raw_message,group_id,user_id):
- """
- 进入变瑟图命令
- """
- message = str(raw_message)
- # 提取 QQ 号
- pattern = r'\[CQ:at,qq=(\d+),name=[^]]+\]'
- match = re.search(pattern, message)
- if match:
- qq = match.group(1)
- try:
- img_url = f'https://q1.qlogo.cn/g?b=qq&nk={qq}&s=640'
- # 创建 user 文件夹
- on_file_path = os.path.join('user', qq)
- if not os.path.exists(on_file_path):
- os.makedirs(on_file_path)
- # 下载图片
- img_response = requests.get(img_url)
- img_response.raise_for_status()
- file_path = os.path.join('user', qq, f'{qq}.jpg')
- with open(file_path, 'wb') as file:
- file.write(img_response.content)
- print(f'图片已保存到 {file_path}')
- send_group_message_word(group_id, "开始变身!")
- message_id = send_group_message_pic(group_id, user_id, story_start_p2p_sese(qq))
- print(message_id)
- time.sleep(8)
- del_group_message(message_id['data']['message_id'])
- except Exception as e:
- print(f'发生未知错误: {e}')
- send_group_message_word(group_id, "变身失败!")
- else:
- print('未找到 QQ 号')
- def change2girl(raw_message,group_id,user_id):
- """
- 进入变美少女命令
- """
- message = str(raw_message)
- # 提取 QQ 号
- pattern = r'\[CQ:at,qq=(\d+),name=[^]]+\]'
- match = re.search(pattern, message)
- if match:
- qq = match.group(1)
- try:
- img_url = f'https://q1.qlogo.cn/g?b=qq&nk={qq}&s=640'
- # 创建 user 文件夹
- on_file_path = os.path.join('user', qq)
- if not os.path.exists(on_file_path):
- os.makedirs(on_file_path)
- # 下载图片
- img_response = requests.get(img_url)
- img_response.raise_for_status()
- file_path = os.path.join('user', qq, f'{qq}.jpg')
- with open(file_path, 'wb') as file:
- file.write(img_response.content)
- print(f'图片已保存到 {file_path}')
- send_group_message_word(group_id, "开始变身!")
- send_group_message_pic(group_id, user_id, story_start_p2p(qq))
- except Exception as e:
- print(f'发生未知错误: {e}')
- send_group_message_word(group_id, "变身失败!")
- else:
- print('未找到 QQ 号')
- @app.route('/cat', methods=['POST'])
- def chat():
- data = request.json
- data_type = data['post_type']
- # 管理员模式,目前只有开启主机功能
- if 'message_type' in data:
- if data['message_type'] == 'private':
- # 进入管理员模式
- if data['raw_message'] == "原神启动" and data['user_id'] == 3494084267:
- send_magic_packet("22-43-4D-05-34-B2")
- send_private_message_word(data['user_id'], "主机3B0IJ2U已启动!")
- print(data)
- # 进入群聊模式
- if data_type == "message" and data['message_type'] == "group":
- group_id = data['group_id']
- user_id = data['user_id']
- message = data['message']
- raw_message = data['raw_message']
- time_now = data['time']
- user_name = data['sender']['nickname']
- # 首先判断消息类型,如果为图片类型则存入url,否则存入csv文件中
- if message[0].get('type')=='image':
- image_data = message[0].get('data', {})
- url = image_data.get('url')
- save_group_message(group_id, user_name, f"图片内容:{url}")
- else:
- save_group_message(group_id, user_name, raw_message)
- # 然后根据命令优先级进入AI对话模式,在这里写入AI模块
- if str(raw_message).startswith("变美少女[CQ:at"):
- change2girl(raw_message, group_id, user_id)
- elif str(raw_message).startswith("变瑟图[CQ:at"):
- if group_id == 637612718:
- send_group_message_word(group_id,"本群该功能已被禁用!")
- else:
- change2setu(raw_message, group_id, user_id)
- elif my_name in raw_message:
- on_my_message(group_id, user_id, user_name, raw_message, message)
- print(
- f"收到message!群号:{group_id},发送人:{user_id},时间{time_now}\n"
- f"消息内容:{message}\n原始消息内容:{raw_message}")
- else:
- print(f"群聊{group_id}已记录新消息!")
- # 进入提示消息模式
- elif data_type == "notice":
- notice_type = data['notice_type']
- group_id = data['group_id']
- if notice_type == "group_increase":
- user_id = data['user_id']
- print(f"群成员增加!群号:{group_id},加入者:{user_id}")
- requests.post('http://127.0.0.1:3000/send_group_msg', json={
- 'group_id': group_id,
- 'message': [{
- "type": "at",
- "data": {
- "qq": f"{user_id}"
- }
- }, {
- 'type': 'text',
- 'data': {
- 'text': "欢迎新成员加入群聊喵~"
- }
- }]
- })
- elif notice_type == "notify" and data['target_id'] == data['self_id']:
- user_id = data['user_id']
- print(f"我被{user_id}戳了一下!")
- send_group_poke(group_id,user_id)
- if get_week()=="星期四":
- # 触发疯狂星期四文案
- # 生成一个0到99之间的随机整数,控制特殊文案概率为40%
- rand_num = random.randint(0, 99)
- print(rand_num)
- send_group_message_word(group_id,get_v50()) if rand_num < 30 else send_group_message_word(group_id,"₍^ >ヮ<^₎")
- else:
- send_group_message_word(group_id,"₍^ >ヮ<^₎")
- return ""
- if __name__ == '__main__':
- app.run(host='0.0.0.0', port=3100)
|