check_stat.py 2.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091
  1. import time
  2. import requests
  3. from msg_send_save import send_feishu
  4. # 服务状态检查间隔(秒)
  5. CHECK_INTERVAL = 120 # 2分钟
  6. # 心跳超时时间(秒)
  7. HEARTBEAT_TIMEOUT = 120 # 2分钟
  8. # 服务状态跟踪变量
  9. is_service_online = True # 初始假设服务在线
  10. last_status_change_time = time.time() # 上次状态变更时间
  11. def check_service_status():
  12. """
  13. 检查服务状态
  14. Returns:
  15. bool: True表示服务在线,False表示服务离线
  16. """
  17. try:
  18. response = requests.get('http://127.0.0.1:3100/stat', timeout=10)
  19. if response.status_code == 200:
  20. data = response.json()
  21. last_heartbeat = data.get('heartbeat', 0)
  22. # 检查最近一次心跳时间是否在超时范围内
  23. if time.time() - last_heartbeat <= HEARTBEAT_TIMEOUT:
  24. return True
  25. else:
  26. return False
  27. else:
  28. return False
  29. except Exception as e:
  30. print(f"检查服务状态时出错: {e}")
  31. return False
  32. def notify_status_change(online):
  33. """
  34. 通知服务状态变更
  35. Args:
  36. online (bool): True表示服务上线,False表示服务下线
  37. """
  38. if online:
  39. message = "【服务状态通知】QQ机器人服务已重新上线"
  40. else:
  41. message = "【服务状态通知】QQ机器人服务已下线,请及时处理"
  42. send_feishu(message)
  43. print(message)
  44. def main():
  45. """
  46. 主循环,定时检查服务状态
  47. """
  48. global is_service_online, last_status_change_time
  49. print("开始监控服务状态...")
  50. while True:
  51. try:
  52. # 检查当前服务状态
  53. current_status = check_service_status()
  54. # 如果状态发生变化
  55. if current_status != is_service_online:
  56. # 更新状态和变更时间
  57. is_service_online = current_status
  58. last_status_change_time = time.time()
  59. # 通知状态变更
  60. notify_status_change(current_status)
  61. else:
  62. # 状态未变化,打印当前状态
  63. status_text = "在线" if is_service_online else "离线"
  64. # 获取当前时间的时间元组
  65. current_time = time.localtime()
  66. # 格式化输出年月日时分秒
  67. formatted_time = time.strftime("%Y年%m月%d日 %H:%M:%S", current_time)
  68. print(f"{formatted_time} 服务状态: {status_text}")
  69. # 等待下次检查
  70. time.sleep(CHECK_INTERVAL)
  71. except KeyboardInterrupt:
  72. print("监控程序已退出")
  73. break
  74. except Exception as e:
  75. print(f"监控过程中发生错误: {e}")
  76. time.sleep(CHECK_INTERVAL)
  77. if __name__ == "__main__":
  78. main()