QueuemonitorService.cs 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188
  1. using Cksoft.Data;
  2. using Cksoft.Data.Repository;
  3. using Cksoft.Unity;
  4. using Cksoft.Unity.Log4NetConfig;
  5. using DllEapDal.OFILM;
  6. using DllEapEntity;
  7. using DllEapEntity.OFILM;
  8. using Microsoft.Extensions.Configuration;
  9. using Newtonsoft.Json;
  10. using System;
  11. using System.Collections.Generic;
  12. using System.Linq;
  13. using System.Text;
  14. using System.Threading;
  15. using System.Threading.Tasks;
  16. namespace DllEapBll.Services
  17. {
  /// <summary>
  /// Monitoring service that checks RabbitMQ queue backlogs against configured
  /// thresholds and checks the replication state of the MySQL slave database,
  /// pushing an alarm through the LanXin messaging endpoint when a problem is found.
  /// </summary>
  public class QueuemonitorService
  {
      /// <summary>Fallback for the per-server queue-count limit when "MaxQueueCount" is missing or invalid.</summary>
      private const int DefaultMaxQueueCount = 100;

      /// <summary>Sentinel returned by <see cref="CheckMySql"/> when a stop/start of the slave should be attempted.</summary>
      private const string RestartReplicationState = "自动重启同步服务";

      /// <summary>State returned by <see cref="CheckMySql"/> (and alarm text) when replication is broken.</summary>
      private const string ReplicationErrorState = "Mysql从库同步异常";

      private readonly int maxQueueCount = DefaultMaxQueueCount;
      private readonly IConfiguration Configuration;

      /// <summary>
      /// Creates the service and reads the "MaxQueueCount" setting.
      /// </summary>
      /// <param name="configuration">Application configuration; also used for the "eapslave" connection string.</param>
      /// <exception cref="ArgumentNullException"><paramref name="configuration"/> is null.</exception>
      public QueuemonitorService(IConfiguration configuration)
      {
          if (configuration == null)
          {
              throw new ArgumentNullException(nameof(configuration));
          }

          this.Configuration = configuration;

          // A missing key used to be converted to 0 (so every server alarmed) and a
          // non-numeric value threw; keep the default unless a positive integer is configured.
          int configuredMax;
          if (int.TryParse(configuration["MaxQueueCount"], out configuredMax) && configuredMax > 0)
          {
              this.maxQueueCount = configuredMax;
          }
      }

      /// <summary>
      /// Loads the MQ server list and the queue alarm configuration from the database
      /// and runs <see cref="HandleAlarm"/> on them. Does nothing when either list is empty.
      /// </summary>
      /// <param name="db">Database used to read the MQ servers and alarm configuration.</param>
      public void Monitor(IDatabase db)
      {
          string errorinfo = string.Empty;

          var mqs = db.FindListForCondition<MQServer>(string.Empty, ref errorinfo);
          if (mqs == null || !mqs.Any())
          {
              return;
          }

          int total;
          var configDal = new QueueAlarmConfigDal(db);
          var configs = configDal.Get(1, 10000, "asc", "ID", string.Empty, ref errorinfo, out total);
          if (configs == null || !configs.Any())
          {
              return;
          }

          this.HandleAlarm(configs, mqs);

          // NOTE(review): this is a completion message written through LogError;
          // switch to an info-level call if LogHelper offers one.
          LogHelper<QueuemonitorService>.LogError($"检查队列信息完成", "队列报警", string.Empty);
      }

      /// <summary>
      /// Queries every MQ server for its queues, collects an alarm message for each server
      /// whose queue count exceeds the configured maximum and for each configured queue whose
      /// ready-message count exceeds its threshold, then pushes the messages on a background task.
      /// </summary>
      /// <param name="configs">Per-queue alarm thresholds, matched to queues by name (case-insensitive).</param>
      /// <param name="mqs">MQ servers to inspect.</param>
      public void HandleAlarm(IEnumerable<QueueAlarmConfig> configs, IEnumerable<MQServer> mqs)
      {
          if (configs == null || mqs == null)
          {
              return;
          }

          // Materialize once: the configuration is searched for every queue of every server.
          var configList = configs as IList<QueueAlarmConfig> ?? configs.ToList();
          var errorMsgs = new List<string>();
          string error = string.Empty;
          var monitorDal = new RabbitMQMonitorDal();

          foreach (var item in mqs)
          {
              var infos = monitorDal.GetQueueInfo(item.IpAddress, ref error);
              if (infos == null)
              {
                  // One unreachable/empty server must not abort the check of the others
                  // or drop the alarms collected so far.
                  continue;
              }

              int queueCount = infos.Count();
              if (queueCount <= 0)
              {
                  continue;
              }

              // Alarm when the server's total queue count exceeds the configured maximum.
              if (queueCount > maxQueueCount)
              {
                  errorMsgs.Add($"服务器[{item.FName}]的队列总数[{queueCount}]已超过设定的最大值[{maxQueueCount}],请及时处理");
                  continue;
              }

              // Check each queue's ready-message count against its configured threshold.
              foreach (var queue in infos)
              {
                  var config = configList.FirstOrDefault(
                      c => string.Equals(c.QueueName, queue.name, StringComparison.OrdinalIgnoreCase));
                  if (config == null)
                  {
                      continue;
                  }

                  if (config.Threshold < queue.messages_ready)
                  {
                      errorMsgs.Add($"服务器[{item.FName}]上的队列[{queue.name}]的未消费的消息数量[{queue.messages_ready}]" +
                          $"已超过设定的阈值[{config.Threshold}],请及时处理");
                  }
              }
          }

          if (errorMsgs.Count <= 0)
          {
              return;
          }

          // Push the collected messages.
          // NOTE(review): the separator is the two characters backslash + 'n' (verbatim string),
          // not a line break; kept as-is — confirm this is what the LanXin endpoint expects.
          var message = string.Join(@"\n", errorMsgs);
          Task.Run(() =>
          {
              // Use a dedicated error variable so a stale GetQueueInfo error is not
              // mistaken for a push failure.
              string noticeError = string.Empty;
              try
              {
                  this.Notice(message, ref noticeError);
              }
              catch (Exception ex)
              {
                  // Nobody awaits this task, so an exception would otherwise go unobserved.
                  noticeError = ex.ToString();
              }

              if (!string.IsNullOrEmpty(noticeError))
              {
                  LogHelper<QueuemonitorService>.LogError($"推送队列服务器异常信息[{message}]失败", "队列报警", noticeError);
              }
          });
      }

      /// <summary>
      /// Posts <paramref name="content"/> as JSON to the LanXin endpoint described by the
      /// "LanXinConfig" section of appsettings.json.
      /// </summary>
      /// <param name="content">Message text to push.</param>
      /// <param name="errorinfo">Set to a description of the failure when the section is missing
      /// or the response is null or has a code other than 1.</param>
      public void Notice(string content, ref string errorinfo)
      {
          var builder = new ConfigurationBuilder().AddJsonFile("appsettings.json");
          var configuration = builder.Build();
          var lanxinConfig = configuration.GetSection("LanXinConfig").Get<LanxinConfig>();
          if (lanxinConfig == null)
          {
              errorinfo = "配置文件中LanXin节点未找到或配置不正确";
              return;
          }

          var requestData = new LanxinRequestDto
          {
              Appid = lanxinConfig.Appid,
              Secret = lanxinConfig.Secret,
              Tousers = lanxinConfig.Tousers,
              Content = content
          };
          var json = JsonConvert.SerializeObject(requestData);
          var res = HttpRequestHelper<OfilmLanxinReponseDto>.Post(lanxinConfig.Url, json, ref errorinfo, "application/json");
          if (res == null || res.Code != 1)
          {
              // res may be null here; dereferencing it directly used to throw.
              var reason = res?.Message ?? "请求蓝信接口异常";
              errorinfo = $"推送机台[{content}]断线信息失败:{reason}";
          }
      }

      /// <summary>
      /// Checks the replication state of the "eapslave" MySQL database. When the check asks for
      /// a restart, waits one minute, issues "stop slave" / "start slave" and checks again.
      /// Pushes and logs an alarm when replication is still not healthy, or when the check itself fails.
      /// </summary>
      public void CheckMysqlSlaveState()
      {
          string errorinfo = string.Empty;
          try
          {
              var slaveStr = Configuration.GetConnectionString("eapslave");
              using (IDatabase db = DbFactory.Base(slaveStr, DatabaseType.MySql))
              {
                  var state = CheckMySql(db);
                  if (string.IsNullOrEmpty(state))
                  {
                      return;
                  }

                  if (state == RestartReplicationState)
                  {
                      Thread.Sleep(60 * 1000);
                      db.ExecuteBySql("stop slave;");
                      db.ExecuteBySql("start slave;");

                      state = CheckMySql(db);
                      if (string.IsNullOrEmpty(state))
                      {
                          // The restart brought replication back; nothing to report.
                          return;
                      }
                  }

                  this.Notice(ReplicationErrorState, ref errorinfo);
                  LogHelper<QueuemonitorService>.LogError(ReplicationErrorState, "从库监测", errorinfo);
              }
          }
          catch (Exception e)
          {
              // Log first so the original failure is recorded even if the push below fails too.
              LogHelper<QueuemonitorService>.LogError($"Mysql从库检查失败,原因:[{e.ToString()}]", "从库监测", string.Empty);
              try
              {
                  this.Notice($"Mysql从库检查失败,原因:[{e.Message}]", ref errorinfo);
              }
              catch (Exception noticeException)
              {
                  LogHelper<QueuemonitorService>.LogError($"Mysql从库检查失败,原因:[{noticeException.ToString()}]", "从库监测", string.Empty);
              }
          }
      }

      /// <summary>
      /// Runs "show slave status" and inspects the first row.
      /// </summary>
      /// <param name="db">Connection to the slave database.</param>
      /// <returns>
      /// An empty string when no row is returned or both Slave_IO_Running and Slave_SQL_Running are "yes";
      /// the restart sentinel when Last_SQL_Error reports the transaction-retry failure;
      /// otherwise the replication-error state text.
      /// </returns>
      private string CheckMySql(IDatabase db)
      {
          var dt = db.FindTable("show slave status ");

          // NOTE(review): an empty result is reported as healthy; presumably that means
          // replication is not configured on this server — confirm that is acceptable.
          if (dt == null || dt.Rows.Count <= 0)
          {
              return string.Empty;
          }

          var firstRow = dt.Rows[0];
          var ioRunning = firstRow["Slave_IO_Running"].ToString();
          var sqlRunning = firstRow["Slave_SQL_Running"].ToString();

          bool replicationRunning =
              string.Equals(ioRunning, "yes", StringComparison.OrdinalIgnoreCase) &&
              string.Equals(sqlRunning, "yes", StringComparison.OrdinalIgnoreCase);
          if (replicationRunning)
          {
              return string.Empty;
          }

          var rawSqlError = firstRow["Last_SQL_Error"];
          var lastSqlError = rawSqlError == null ? string.Empty : rawSqlError.ToString();
          if (lastSqlError.Contains("Slave SQL thread retried transaction 10 time(s) in vain"))
          {
              LogHelper<QueuemonitorService>.LogError($"Mysql从库检查失败,原因:[{lastSqlError}]", "从库监测", string.Empty);
              return RestartReplicationState;
          }

          return ReplicationErrorState;
      }
  }
  179. }