123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188 |
- using Cksoft.Data;
- using Cksoft.Data.Repository;
- using Cksoft.Unity;
- using Cksoft.Unity.Log4NetConfig;
- using DllEapDal.OFILM;
- using DllEapEntity;
- using DllEapEntity.OFILM;
- using Microsoft.Extensions.Configuration;
- using Newtonsoft.Json;
- using System;
- using System.Collections.Generic;
- using System.Linq;
- using System.Text;
- using System.Threading;
- using System.Threading.Tasks;
namespace DllEapBll.Services
{
    /// <summary>
    /// Checks RabbitMQ queue statistics for the configured MQ servers and the
    /// replication state of the MySQL replica, and pushes alarm text through the
    /// endpoint configured under the "LanXinConfig" section.
    /// </summary>
    public class QueuemonitorService
    {
        // Sentinel returned by CheckMySql when a stop/start of replication should be attempted.
        private const string RestartSlaveState = "自动重启同步服务";

        // Upper bound on the number of queues per server; overridden by the
        // "MaxQueueCount" setting when that setting holds a valid integer.
        private readonly int maxQueueCount = 100;

        private readonly IConfiguration Configuration;

        /// <summary>
        /// Creates the service and reads the optional "MaxQueueCount" setting.
        /// </summary>
        /// <param name="configuration">Application configuration; also used for the "eapslave" connection string.</param>
        public QueuemonitorService(IConfiguration configuration)
        {
            this.Configuration = configuration;

            // Convert.ToInt32 turns a missing (null) value into 0 and throws on
            // non-numeric text; either would defeat the default of 100. Only
            // override the default when the setting actually parses.
            int configuredMax;
            if (int.TryParse(configuration["MaxQueueCount"], out configuredMax))
            {
                this.maxQueueCount = configuredMax;
            }
        }

        /// <summary>
        /// Loads the MQ servers and the queue alarm configuration from
        /// <paramref name="db"/> and runs <see cref="HandleAlarm"/> over them.
        /// Returns without logging when either list is missing or empty.
        /// </summary>
        /// <param name="db">Database used to read MQ servers and alarm configuration.</param>
        public void Monitor(IDatabase db)
        {
            string errorinfo = string.Empty;

            var mqs = db.FindListForCondition<MQServer>(string.Empty, ref errorinfo);
            if (mqs == null || !mqs.Any())
            {
                return;
            }

            int total;
            var configDal = new QueueAlarmConfigDal(db);
            var configs = configDal.Get(1, 10000, "asc", "ID", string.Empty, ref errorinfo, out total);
            if (configs == null || !configs.Any())
            {
                return;
            }

            this.HandleAlarm(configs, mqs);

            // NOTE(review): this completion message is written through LogError;
            // an informational level may be more appropriate if LogHelper offers one.
            LogHelper<QueuemonitorService>.LogError($"检查队列信息完成", "队列报警", string.Empty);
        }

        /// <summary>
        /// Inspects the queues of every server in <paramref name="mqs"/> and pushes
        /// one combined alarm message when a server has more queues than
        /// <c>maxQueueCount</c>, or a queue's ready-message count exceeds the
        /// threshold configured for that queue name.
        /// </summary>
        /// <param name="configs">Per-queue thresholds, matched to queues by name (case-insensitive).</param>
        /// <param name="mqs">Servers whose queues are inspected.</param>
        public void HandleAlarm(IEnumerable<QueueAlarmConfig> configs, IEnumerable<MQServer> mqs)
        {
            var errorMsgs = new List<string>();
            string error = string.Empty;
            var monitorDal = new RabbitMQMonitorDal();

            foreach (var server in mqs)
            {
                var infos = monitorDal.GetQueueInfo(server.IpAddress, ref error);
                int queueCount = infos == null ? 0 : infos.Count();
                if (queueCount <= 0)
                {
                    // No data for this server: skip it, but keep checking the other
                    // servers and keep the alarms collected so far.
                    continue;
                }

                // Alarm when the total number of queues exceeds the configured maximum.
                if (queueCount > maxQueueCount)
                {
                    errorMsgs.Add($"服务器[{server.FName}]的队列总数[{queueCount}]已超过设定的最大值[{maxQueueCount}],请及时处理");
                    continue;
                }

                // Check each queue's pending (ready) message count against its threshold.
                foreach (var queue in infos)
                {
                    var config = configs.FirstOrDefault(
                        c => string.Equals(c.QueueName, queue.name, StringComparison.OrdinalIgnoreCase));
                    if (config == null)
                    {
                        continue;
                    }

                    if (config.Threshold < queue.messages_ready)
                    {
                        errorMsgs.Add($"服务器[{server.FName}]上的队列[{queue.name}]的未消费的消息数量[{queue.messages_ready}]" +
                            $"已超过设定的阈值[{config.Threshold}],请及时处理");
                    }
                }
            }

            if (errorMsgs.Count == 0)
            {
                return;
            }

            // NOTE(review): the verbatim literal joins with a backslash followed by 'n',
            // not a line break; presumably the receiving side expands it - confirm.
            var message = string.Join(@"\n", errorMsgs);

            // Push the message in the background.
            Task.Run(() =>
            {
                // Use a dedicated error string: the outer one may still hold text from
                // an earlier GetQueueInfo call and would report a false push failure.
                string noticeError = string.Empty;
                try
                {
                    this.Notice(message, ref noticeError);
                }
                catch (Exception e)
                {
                    // Otherwise the exception would be lost with the unobserved task.
                    noticeError = e.ToString();
                }

                if (!string.IsNullOrEmpty(noticeError))
                {
                    LogHelper<QueuemonitorService>.LogError($"推送队列服务器异常信息[{message}]失败", "队列报警", string.Empty);
                }
            });
        }

        /// <summary>
        /// Posts <paramref name="content"/> as JSON to the URL configured in the
        /// "LanXinConfig" section of appsettings.json.
        /// </summary>
        /// <param name="content">Message text to send.</param>
        /// <param name="errorinfo">
        /// Receives a description when the section cannot be bound or when the
        /// response is null or has a code other than 1.
        /// </param>
        public void Notice(string content, ref string errorinfo)
        {
            // NOTE(review): the file is re-read on every call rather than using the
            // injected configuration; kept as-is because the two may differ.
            var configuration = new ConfigurationBuilder().AddJsonFile("appsettings.json").Build();
            var lanxinConfig = configuration.GetSection("LanXinConfig").Get<LanxinConfig>();
            if (lanxinConfig == null)
            {
                errorinfo = "配置文件中LanXin节点未找到或配置不正确";
                return;
            }

            var requestData = new LanxinRequestDto
            {
                Appid = lanxinConfig.Appid,
                Secret = lanxinConfig.Secret,
                Tousers = lanxinConfig.Tousers,
                Content = content
            };
            var json = JsonConvert.SerializeObject(requestData);

            var res = HttpRequestHelper<OfilmLanxinReponseDto>.Post(lanxinConfig.Url, json, ref errorinfo, "application/json");
            if (res == null || res.Code != 1)
            {
                // res may be null in this branch, so it must not be dereferenced directly.
                string reason = res?.Message ?? "请求蓝信接口异常";
                errorinfo = $"推送机台[{content}]断线信息失败:{reason}";
            }
        }

        /// <summary>
        /// Checks replication on the database behind the "eapslave" connection
        /// string. For the one error recognised by <see cref="CheckMySql"/> it waits
        /// a minute, issues "stop slave" / "start slave" and checks again; any
        /// remaining problem is pushed via <see cref="Notice"/> and logged.
        /// </summary>
        public void CheckMysqlSlaveState()
        {
            string errorinfo = string.Empty;
            try
            {
                var slaveStr = Configuration.GetConnectionString("eapslave");
                using (IDatabase db = DbFactory.Base(slaveStr, DatabaseType.MySql))
                {
                    var state = CheckMySql(db);
                    if (string.IsNullOrEmpty(state))
                    {
                        return;
                    }

                    if (state == RestartSlaveState)
                    {
                        // Wait one minute, then restart replication and re-check.
                        Thread.Sleep(60 * 1000);
                        db.ExecuteBySql("stop slave;");
                        db.ExecuteBySql("start slave;");

                        state = CheckMySql(db);
                        if (!string.IsNullOrEmpty(state))
                        {
                            this.Notice($"Mysql从库同步异常", ref errorinfo);
                            LogHelper<QueuemonitorService>.LogError($"Mysql从库同步异常", "从库监测", string.Empty);
                        }
                    }
                    else
                    {
                        this.Notice($"Mysql从库同步异常", ref errorinfo);
                        LogHelper<QueuemonitorService>.LogError($"Mysql从库同步异常", "从库监测", errorinfo);
                    }
                }
            }
            catch (Exception e)
            {
                // Log before notifying so the diagnostic survives even if the push itself throws.
                LogHelper<QueuemonitorService>.LogError($"Mysql从库检查失败,原因:[{e.ToString()}]", "从库监测", string.Empty);
                this.Notice($"Mysql从库检查失败,原因:[{e.Message}]", ref errorinfo);
            }
        }

        /// <summary>
        /// Reads the first row of "show slave status" and classifies it.
        /// </summary>
        /// <param name="db">Connection to the replica.</param>
        /// <returns>
        /// An empty string when the query yields no rows or both Slave_IO_Running and
        /// Slave_SQL_Running are "yes" (case-insensitive); the restart sentinel when
        /// Last_SQL_Error reports the exhausted-retry error; otherwise the generic
        /// replication-error text.
        /// </returns>
        private string CheckMySql(IDatabase db)
        {
            var dt = db.FindTable("show slave status ");
            if (dt == null || dt.Rows.Count <= 0)
            {
                return string.Empty;
            }

            var firstRow = dt.Rows[0];
            var ioRunning = firstRow["Slave_IO_Running"].ToString();
            var sqlRunning = firstRow["Slave_SQL_Running"].ToString();

            bool bothRunning =
                string.Equals(ioRunning, "yes", StringComparison.OrdinalIgnoreCase) &&
                string.Equals(sqlRunning, "yes", StringComparison.OrdinalIgnoreCase);
            if (bothRunning)
            {
                return string.Empty;
            }

            var lastSqlError = firstRow["Last_SQL_Error"];
            if (lastSqlError != null
                && lastSqlError.ToString().Contains("Slave SQL thread retried transaction 10 time(s) in vain"))
            {
                LogHelper<QueuemonitorService>.LogError($"Mysql从库检查失败,原因:[{lastSqlError.ToString()}]", "从库监测", string.Empty);
                return RestartSlaveState;
            }

            return "Mysql从库同步异常";
        }
    }
}
|