using Cksoft.Data;
using Cksoft.Data.Repository;
using Cksoft.Unity;
using Cksoft.Unity.Log4NetConfig;
using DllEapDal.OFILM;
using DllEapEntity;
using DllEapEntity.OFILM;
using Microsoft.Extensions.Configuration;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace DllEapBll.Services
{
    /// <summary>
    /// Monitors RabbitMQ servers (total queue count and per-queue ready-message
    /// backlog) and the MySQL slave replication state, and pushes a notification
    /// through the LanXin HTTP interface when something is wrong.
    /// </summary>
    /// <remarks>
    /// NOTE(review): several generic type arguments appear to be missing in this file
    /// (e.g. on <c>IEnumerable</c>, <c>FindListForCondition</c>, <c>Get</c>, <c>Post</c>).
    /// They are reproduced exactly as found because the intended type names are not
    /// visible here — confirm against the original source.
    /// </remarks>
    public class QueuemonitorService
    {
        // State strings returned by CheckMySql and interpreted by CheckMysqlSlaveState.
        private const string RestartSlaveState = "自动重启同步服务";
        private const string SlaveSyncErrorState = "Mysql从库同步异常";

        // Maximum number of queues allowed on one server before an alarm is raised.
        private readonly int maxQueueCount = 100;
        private readonly IConfiguration Configuration;

        /// <summary>
        /// Creates the service and reads the optional "MaxQueueCount" setting.
        /// </summary>
        /// <param name="configuration">Application configuration.</param>
        public QueuemonitorService(IConfiguration configuration)
        {
            this.Configuration = configuration;

            // Keep the default (100) when the key is missing or not a valid integer.
            // Convert.ToInt32(null) would silently yield 0, which makes every server
            // that has at least one queue exceed the limit.
            int configuredMax;
            if (int.TryParse(configuration["MaxQueueCount"], out configuredMax))
            {
                this.maxQueueCount = configuredMax;
            }
        }

        /// <summary>
        /// Loads the monitored servers and the alarm configuration from the database
        /// and runs the alarm check. Does nothing when either list is null or empty.
        /// </summary>
        /// <param name="db">Database used to load servers and alarm configuration.</param>
        public void Monitor(IDatabase db)
        {
            string errorinfo = string.Empty;
            var mqs = db.FindListForCondition(string.Empty, ref errorinfo);
            if (mqs == null || mqs.Count() <= 0)
            {
                return;
            }

            int total;
            var configDal = new QueueAlarmConfigDal(db);
            var configs = configDal.Get(1, 10000, "asc", "ID", string.Empty, ref errorinfo, out total);
            if (configs == null || configs.Count() == 0)
            {
                return;
            }

            this.HandleAlarm(configs, mqs);

            // NOTE(review): this is a completion message written through LogError;
            // kept as-is because no other log level is visible in this file.
            LogHelper.LogError($"检查队列信息完成", "队列报警", string.Empty);
        }

        /// <summary>
        /// Checks every server in <paramref name="mqs"/> against the configured limits,
        /// collects alarm messages and pushes them in a single notification.
        /// </summary>
        /// <param name="configs">Per-queue alarm configuration (queue name and threshold).</param>
        /// <param name="mqs">Servers whose queues are inspected.</param>
        public void HandleAlarm(IEnumerable configs, IEnumerable mqs)
        {
            var errorMsgs = new List<string>();
            string error = string.Empty;
            var monitorDal = new RabbitMQMonitorDal();

            foreach (var item in mqs)
            {
                var infos = monitorDal.GetQueueInfo(item.IpAddress, ref error);
                if (infos == null || infos.Count() <= 0)
                {
                    // Skip only this server. Returning here would abort the whole scan
                    // and drop the alarms already collected for earlier servers.
                    continue;
                }

                // Raise an alarm when the total number of queues exceeds the configured maximum.
                var queueCount = infos.Count();
                if (queueCount > maxQueueCount)
                {
                    errorMsgs.Add($"服务器[{item.FName}]的队列总数[{queueCount}]已超过设定的最大值[{maxQueueCount}],请及时处理");
                    continue;
                }

                // Check whether the ready (unconsumed) message count of each configured
                // queue exceeds its threshold.
                foreach (var queue in infos)
                {
                    var config = configs.FirstOrDefault(
                        c => string.Equals(c.QueueName, queue.name, StringComparison.OrdinalIgnoreCase));
                    if (config == null)
                    {
                        continue;
                    }

                    if (config.Threshold < queue.messages_ready)
                    {
                        errorMsgs.Add($"服务器[{item.FName}]上的队列[{queue.name}]的未消费的消息数量[{queue.messages_ready}]" +
                            $"已超过设定的阈值[{config.Threshold}],请及时处理");
                    }
                }
            }

            if (errorMsgs.Count <= 0)
            {
                return;
            }

            // Push the notification.
            // NOTE(review): the separator is a verbatim string, i.e. a literal backslash
            // followed by 'n', not a newline — confirm this is what the receiver expects.
            var message = string.Join(@"\n", errorMsgs);
            Task.Run(() =>
            {
                // Use a fresh error variable: 'error' above may already hold a message
                // from GetQueueInfo, which would make a successful push look failed.
                string noticeError = string.Empty;
                this.Notice(message, ref noticeError);
                if (!string.IsNullOrEmpty(noticeError))
                {
                    LogHelper.LogError($"推送队列服务器异常信息[{message}]失败", "队列报警", string.Empty);
                }
            });
        }

        /// <summary>
        /// Posts <paramref name="content"/> to the LanXin interface configured in the
        /// "LanXinConfig" section of appsettings.json.
        /// </summary>
        /// <param name="content">Message text to push.</param>
        /// <param name="errorinfo">Set to a description of the failure when the push fails.</param>
        public void Notice(string content, ref string errorinfo)
        {
            var builder = new ConfigurationBuilder().AddJsonFile("appsettings.json");
            var configuration = builder.Build();
            var lanxinConfig = configuration.GetSection("LanXinConfig").Get();
            if (lanxinConfig == null)
            {
                errorinfo = "配置文件中LanXin节点未找到或配置不正确";
                return;
            }

            var requestData = new LanxinRequestDto
            {
                Appid = lanxinConfig.Appid,
                Secret = lanxinConfig.Secret,
                Tousers = lanxinConfig.Tousers,
                Content = content
            };
            var json = JsonConvert.SerializeObject(requestData);
            var res = HttpRequestHelper.Post(lanxinConfig.Url, json, ref errorinfo, "application/json");
            if (res == null || res.Code != 1)
            {
                // res may be null in this branch, so it must not be dereferenced directly.
                var reason = res?.Message ?? "请求蓝信接口异常";
                errorinfo = $"推送机台[{content}]断线信息失败:{reason}";
            }
        }

        /// <summary>
        /// Checks the replication state of the MySQL slave identified by the "eapslave"
        /// connection string. When the state asks for a restart, waits one minute,
        /// restarts the slave and checks again; remaining problems are pushed via
        /// <see cref="Notice"/> and logged.
        /// </summary>
        public void CheckMysqlSlaveState()
        {
            string errorinfo = string.Empty;
            try
            {
                var slaveStr = Configuration.GetConnectionString("eapslave");
                using (IDatabase db = DbFactory.Base(slaveStr, DatabaseType.MySql))
                {
                    var state = CheckMySql(db);
                    if (string.IsNullOrEmpty(state))
                    {
                        return;
                    }

                    if (state == RestartSlaveState)
                    {
                        // Wait one minute before restarting replication, then re-check.
                        Thread.Sleep(60 * 1000);
                        db.ExecuteBySql("stop slave;");
                        db.ExecuteBySql("start slave;");
                        state = CheckMySql(db);
                        if (!string.IsNullOrEmpty(state))
                        {
                            this.Notice($"Mysql从库同步异常", ref errorinfo);
                            LogHelper.LogError($"Mysql从库同步异常", "从库监测", string.Empty);
                        }
                    }
                    else
                    {
                        this.Notice($"Mysql从库同步异常", ref errorinfo);
                        LogHelper.LogError($"Mysql从库同步异常", "从库监测", errorinfo);
                    }
                }
            }
            catch (Exception e)
            {
                this.Notice($"Mysql从库检查失败,原因:[{e.Message}]", ref errorinfo);
                LogHelper.LogError($"Mysql从库检查失败,原因:[{e.ToString()}]", "从库监测", string.Empty);
            }
        }

        /// <summary>
        /// Runs "show slave status" and evaluates the first returned row.
        /// </summary>
        /// <param name="db">Connection to the slave database.</param>
        /// <returns>
        /// An empty string when both replication threads report "yes" (or no row is
        /// returned); the restart state when Last_SQL_Error reports the exhausted
        /// transaction retries; otherwise the generic sync-error state.
        /// </returns>
        private string CheckMySql(IDatabase db)
        {
            var sql = "show slave status ";
            var dt = db.FindTable(sql);
            if (dt == null || dt.Rows.Count <= 0)
            {
                return string.Empty;
            }

            var firstRow = dt.Rows[0];
            var ioRunning = firstRow["Slave_IO_Running"].ToString();
            var sqlRunning = firstRow["Slave_SQL_Running"].ToString();
            var bothRunning =
                string.Equals(ioRunning, "yes", StringComparison.OrdinalIgnoreCase) &&
                string.Equals(sqlRunning, "yes", StringComparison.OrdinalIgnoreCase);
            if (bothRunning)
            {
                return string.Empty;
            }

            var lastSqlError = firstRow["Last_SQL_Error"];
            if (lastSqlError != null &&
                lastSqlError.ToString().Contains("Slave SQL thread retried transaction 10 time(s) in vain"))
            {
                LogHelper.LogError($"Mysql从库检查失败,原因:[{lastSqlError.ToString()}]", "从库监测", string.Empty);
                return RestartSlaveState;
            }

            return SlaveSyncErrorState;
        }
    }
}