using Cksoft.Data;
using Cksoft.Data.Repository;
using Cksoft.Unity;
using DllEapEntity;
using DllHsms;
using Microsoft.AspNetCore.SignalR;
using Microsoft.Extensions.Configuration;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;

namespace DllEapBll.SignalR
{
    /// <summary>
    /// Forwards machine log messages from RabbitMQ to individual SignalR clients.
    /// For each SignalR connection a dedicated RabbitMQ connection/channel is opened,
    /// bound to the "LogExchage" fanout exchange, and every received <c>HsmsLog</c>
    /// that belongs to the requested machine is pushed to that client as "ReceiveUpdate".
    /// </summary>
    /// <remarks>
    /// NOTE(review): the bookkeeping dictionaries are plain (non-concurrent) collections.
    /// If this handler is shared between concurrent hub invocations, access needs to be
    /// synchronized — confirm against how the type is registered.
    /// </remarks>
    public class LogHandler
    {
        /// <summary>
        /// Creates the handler.
        /// </summary>
        /// <param name="configuration">Application configuration (currently not read).</param>
        /// <param name="hubContext">SignalR hub context used to push log entries to clients.</param>
        // NOTE(review): SOURCE shows the non-generic IHubContext. If a generic type argument
        // (IHubContext<SomeHub>) was lost from the file, restore it here and on the field.
        public LogHandler(IConfiguration configuration, IHubContext hubContext)
        {
            this.hubContext = hubContext;
        }

        IHubContext hubContext;

        // Codes of the MQ servers for which a consumer has been started.
        private IList<string> MessageQueues = new List<string>();

        // macId -> { "ConnectionId", "MqServerCode" } of the most recent subscription for that machine.
        IDictionary<int, Dictionary<string, string>> macConnections = new Dictionary<int, Dictionary<string, string>>();

        // SignalR connection id -> RabbitMQ channel consuming on behalf of that client.
        IDictionary<string, IModel> channels = new Dictionary<string, IModel>();

        // SignalR connection id -> RabbitMQ connection owning the channel above.
        // Tracked so that Dispose can close it; otherwise every client would leak a
        // TCP connection (which, with automatic recovery enabled, keeps reconnecting).
        private readonly IDictionary<string, IConnection> connections = new Dictionary<string, IConnection>();

        /// <summary>
        /// Starts forwarding the log stream of one machine to one SignalR client.
        /// </summary>
        /// <param name="macId">Id of the machine whose log entries should be forwarded.</param>
        /// <param name="connectionId">SignalR connection id of the receiving client.</param>
        /// <param name="errorinfo">
        /// Kept for backward compatibility. It is passed by value, so nothing written to it
        /// can reach the caller; failures are reported through <see cref="System.Diagnostics.Trace"/>.
        /// </param>
        public void Startdll(int macId, string connectionId, string errorinfo)
        {
            try
            {
                var server = this.GetCurrentMqServer(macId);
                if (server == null)
                {
                    return;
                }

                // Indexer assignment instead of Add: a second subscription for the same
                // machine (e.g. after a client reconnects) must not throw.
                macConnections[macId] = new Dictionary<string, string>
                {
                    ["ConnectionId"] = connectionId,
                    ["MqServerCode"] = server.FCode
                };

                int result = StartRecQueue(server, macId, connectionId, errorinfo);
                if (result < 0)
                {
                    ReportError($"启动失败,错误信息为:StartRecQueue={result}, macId={macId}, connectionId={connectionId}");
                }
            }
            catch (Exception ex)
            {
                ReportError($"启动失败,错误信息为:{ex.Message}");
            }
        }

        /// <summary>
        /// Opens a channel to the given MQ server, binds a fresh queue to the "LogExchage"
        /// fanout exchange and starts consuming it for the given client.
        /// </summary>
        /// <param name="server">MQ server to connect to.</param>
        /// <param name="macId">Only log entries of this machine are forwarded.</param>
        /// <param name="connectionId">SignalR connection id of the receiving client.</param>
        /// <param name="errorinfo">Unused by-value parameter, kept for backward compatibility.</param>
        /// <returns>1 when the consumer was started, -1 on any failure.</returns>
        public int StartRecQueue(MQServer server, int macId, string connectionId, string errorinfo)
        {
            try
            {
                // Release whatever is still registered for this connection id so a repeated
                // start replaces the old subscription instead of failing on a duplicate key.
                Dispose(connectionId);

                IConnection connection;
                var channel = RegeditChannel(server, macId, errorinfo, out connection);
                if (channel == null)
                {
                    return -1;
                }

                // Register immediately so Dispose(connectionId) can clean up from here on.
                channels[connectionId] = channel;
                connections[connectionId] = connection;

                // The exchange must be declared with the same name and type as on the
                // publishing side, otherwise publisher and consumer do not match.
                channel.ExchangeDeclare(exchange: "LogExchage",
                    type: "fanout",
                    autoDelete: true);

                // Declare a queue with a server-generated name and bind it to the exchange.
                var queueName = channel.QueueDeclare().QueueName;
                channel.QueueBind(queue: queueName, exchange: "LogExchage", routingKey: "");

                // BasicQos is per channel and has to run before the consumer is registered.
                // Arguments: prefetch size, prefetch count, global flag.
                channel.BasicQos(0, 1, false);

                var consumer = new EventingBasicConsumer(channel);

                // Attach the handler BEFORE BasicConsume. With manual acks and a prefetch of 1,
                // a delivery that arrives while no handler is attached is never acknowledged
                // and blocks every later delivery on this channel.
                consumer.Received += (model, ea) =>
                {
                    string error = "";
                    WriteLog(macId, connectionId, ea.Body, error);
                    channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                };

                // autoAck: false -> the broker keeps each message until BasicAck is sent.
                channel.BasicConsume(queue: queueName,
                    autoAck: false,
                    consumer: consumer);

                if (!MessageQueues.Contains(server.FCode))
                {
                    MessageQueues.Add(server.FCode);
                }
                return 1;
            }
            catch (Exception ex)
            {
                ReportError(ex.Message);
                if (connectionId != null)
                {
                    // Do not leave a half-configured channel/connection behind.
                    Dispose(connectionId);
                }
                return -1;
            }
        }

        /// <summary>
        /// Opens a new RabbitMQ connection to <paramref name="server"/> and creates a channel on it.
        /// </summary>
        /// <param name="server">Supplies host address, user name and password.</param>
        /// <param name="macId">Not used by this method.</param>
        /// <param name="errorinfo">Unused by-value parameter, kept from the original signature.</param>
        /// <param name="connection">The connection that owns the returned channel; null on failure.</param>
        /// <returns>The new channel, or null when connecting failed.</returns>
        private IModel RegeditChannel(MQServer server, int macId, string errorinfo, out IConnection connection)
        {
            connection = null;
            try
            {
                var factory = new ConnectionFactory
                {
                    AutomaticRecoveryEnabled = true,
                    HostName = server.IpAddress,
                    UserName = server.FUser,
                    Password = server.FPasswd
                };

                connection = factory.CreateConnection();
                return connection.CreateModel();
            }
            catch (Exception ex)
            {
                ReportError("注册队列通道发生错误,错误信息为:" + ex.Message);
                if (connection != null)
                {
                    // The connection opened but the channel did not: release the connection.
                    connection.Dispose();
                    connection = null;
                }
                return null;
            }
        }

        /// <summary>
        /// Deserializes one queue message and, when it belongs to <paramref name="macId"/>,
        /// sends it to the client as "ReceiveUpdate" with the log text rendered as HTML.
        /// </summary>
        /// <remarks>
        /// Intentionally fire-and-forget (<c>async void</c>): the caller is a synchronous
        /// consumer callback. Everything that can throw is inside the try block.
        /// </remarks>
        /// <param name="macId">Machine id the client subscribed to.</param>
        /// <param name="connectionId">SignalR connection id of the receiving client.</param>
        /// <param name="datas">Raw message body.</param>
        /// <param name="errorinfo">Scratch string handed to <c>Block.GetBlockS</c>.</param>
        private async void WriteLog(int macId, string connectionId, byte[] datas, string errorinfo)
        {
            var response = new EapResponse() { Code = 1, Msg = string.Empty };
            try
            {
                // NOTE(review): reproduced as SOURCE shows it. If DeSerializeBytes is generic,
                // its type argument (<HsmsLog>) was lost from the file and must be restored.
                HsmsLog log = EntityHelper.DeSerializeBytes(datas);
                if (log == null || log.MacID != macId)
                {
                    return;
                }

                int mcaid = log.MacID;
                string logstr = log.Log;
                string fcode = log.MacCode;
                Block orgblock = log.OrgBlock; // communication block, may be absent

                if (orgblock != null && orgblock.GetBlockS(ref errorinfo) == 0)
                {
                    return;
                }

                DateTime occurtime = log.OccurTime;

                // NOTE(review): SOURCE has a raw line break before "信息:", which is not valid
                // inside a regular string literal. "\r\n" is used because the literal already
                // ends with it and it is turned into an HTML line break below — confirm.
                var logStr = $"发生时间:{occurtime.ToString("yyyy-MM-dd HH:mm:ss.fff")} 机台ID:{mcaid} 机台编号:{fcode} \r\n信息:{logstr}\r\n";
                if (orgblock != null)
                {
                    // Append the formatted instruction data of the block.
                    logStr += orgblock.GetLog(log.Log);
                }

                // Make the text safe and readable as HTML: escape angle brackets, turn line
                // breaks into <br/> and keep indentation with non-breaking spaces.
                // NOTE(review): in SOURCE these replacement targets appear entity-decoded
                // (each Replace was a no-op); the HTML forms are restored here — confirm.
                logStr = logStr.Replace("<", "&lt;")
                    .Replace(">", "&gt;")
                    .Replace("\r\n", "<br/>")
                    .Replace(" ", "&nbsp;");

                log.Log = logStr;
                response.Data = log;
                await hubContext.Clients.Client(connectionId).SendAsync("ReceiveUpdate", response);
            }
            catch (Exception ex)
            {
                ReportError(ex.Message);
            }
        }

        /// <summary>
        /// Builds the path of the per-day log file of a machine.
        /// </summary>
        /// <param name="mcaid">Machine id, used in the file name.</param>
        /// <param name="code">Machine code, used in the file name.</param>
        /// <param name="errorinfo">Receives the exception message when building the path fails.</param>
        /// <returns>The file path, or an empty string when the day directory does not exist or on error.</returns>
        private string GetLogFile(int mcaid, string code, ref string errorinfo)
        {
            try
            {
                string baseDir = ""; // the configured "FileDir" value is not wired up

                // One directory per day.
                string dayDir = $"{baseDir}{Path.DirectorySeparatorChar}log{DateTime.Now.ToString("yyyyMMdd")}";
                if (!Directory.Exists(dayDir))
                {
                    // The directory is deliberately not created here.
                    return "";
                }

                return dayDir + Path.DirectorySeparatorChar + $"{code}_{mcaid}.log";
            }
            catch (Exception ex)
            {
                errorinfo = ex.Message;
                return "";
            }
        }

        /// <summary>
        /// Reads the first line of a text file.
        /// </summary>
        /// <param name="url">Path of the file to read.</param>
        /// <returns>
        /// The first line (null when the file is empty), or an empty string when the file
        /// cannot be opened or read.
        /// </returns>
        public string getstrline(string url)
        {
            try
            {
                using (StreamReader reader = new StreamReader(url))
                {
                    return reader.ReadLine();
                }
            }
            catch (Exception)
            {
                return "";
            }
        }

        /// <summary>
        /// Looks up the message-queue server responsible for the given machine.
        /// </summary>
        /// <param name="macId">Machine id.</param>
        /// <returns>The first matching server row, or null when there is none.</returns>
        private MQServer GetCurrentMqServer(int macId)
        {
            using (IDatabase db = DbFactory.Base("eapslave"))
            {
                // macId is an int, so interpolating it cannot inject SQL text.
                var sql = $@"select * from mqserver where id = ( select mstid from mqserverdetail where APServerID=( select eapappserverid from eapappservermac where macid={macId}))";
                return db.FindList<MQServer>(sql).FirstOrDefault();
            }
        }

        /// <summary>
        /// Stops forwarding for a client and releases its RabbitMQ channel and connection.
        /// Does nothing when the connection id is unknown.
        /// </summary>
        /// <param name="connectionId">SignalR connection id of the client.</param>
        public void Dispose(string connectionId)
        {
            IModel channel;
            if (channels.TryGetValue(connectionId, out channel))
            {
                // Remove first so a failing Close cannot leave a stale entry behind.
                channels.Remove(connectionId);
                if (channel != null)
                {
                    try
                    {
                        channel.Close();
                    }
                    catch (Exception ex)
                    {
                        ReportError(ex.Message);
                    }
                    finally
                    {
                        channel.Dispose();
                    }
                }
            }

            IConnection connection;
            if (connections.TryGetValue(connectionId, out connection))
            {
                connections.Remove(connectionId);
                if (connection != null)
                {
                    try
                    {
                        connection.Close();
                    }
                    catch (Exception ex)
                    {
                        ReportError(ex.Message);
                    }
                    finally
                    {
                        connection.Dispose();
                    }
                }
            }
        }

        /// <summary>
        /// Single place where failures are surfaced. The by-value <c>errorinfo</c> parameters
        /// of the public methods cannot carry a message back to the caller.
        /// </summary>
        private static void ReportError(string message)
        {
            System.Diagnostics.Trace.TraceError("LogHandler: " + message);
        }
    }
}