浏览代码

通过MQ推送SP原始数据

lichunlei 4 月之前
父节点
当前提交
700cf33852

二进制
AppStart/.DS_Store


+ 3 - 3
AppStart/Helper/CheckWifiData.cs

@@ -119,7 +119,7 @@ public class CheckWifiData
                 }
                 catch(Exception ex)
                 {
-                    function.WriteLog(DateTime.Now.ToString() + "\n" + ex.ToString(), "接收云长WIFI设备绑定异常");
+                    Utils.WriteLog(DateTime.Now.ToString() + "\n" + ex.ToString(), "接收云长WIFI设备绑定异常");
                 }
             }
             else
@@ -246,7 +246,7 @@ public class CheckWifiData
                 }
                 catch(Exception ex)
                 {
-                    function.WriteLog(DateTime.Now.ToString() + "\n" + ex.ToString(), "接收云长WIFI设备交易异常");
+                    Utils.WriteLog(DateTime.Now.ToString() + "\n" + ex.ToString(), "接收云长WIFI设备交易异常");
                 }
             }
             else
@@ -352,7 +352,7 @@ public class CheckWifiData
                 }
                 catch(Exception ex)
                 {
-                    function.WriteLog(DateTime.Now.ToString() + "\n" + ex.ToString(), "接收来量吧WIFI设备交易异常");
+                    Utils.WriteLog(DateTime.Now.ToString() + "\n" + ex.ToString(), "接收来量吧WIFI设备交易异常");
                 }
             }
             else

+ 1 - 1
AppStart/Helper/LogHelper.cs

@@ -14,7 +14,7 @@ public class LogHelper
 
     public void WriteLog(string Content, string FileName, string BrandId = "0")
     {
-        // function.WriteLog(Content, FileName);
+        // Utils.WriteLog(Content, FileName);
         Dictionary<string, string> dic = new Dictionary<string, string>();
         dic.Add("Topic", FileName);
         dic.Add("Content", Content);

+ 30 - 1
AppStart/Helper/PublicImportDataService.cs

@@ -162,7 +162,7 @@ namespace MySystem
                         }
                         catch (Exception ex)
                         {
-                            function.WriteLog(DateTime.Now.ToString() + "\n" + data + "\n" + ex.ToString(), key + "队列异常");
+                            Utils.WriteLog(DateTime.Now.ToString() + "\n" + data + "\n" + ex.ToString(), key + "队列异常");
                             Start(jobInfo);
                             Thread.Sleep(60000);
                         }
@@ -495,6 +495,7 @@ namespace MySystem
                 }
                 string sql = "insert into " + brand.TargetTableName + " (" + fields + ") values (" + fieldvals + ");select @@IDENTITY";
                 dbconn.op(sql);
+                PushData(brand.TargetTableName, fields, fieldvals);
                 if(brand.TargetTableName == "BindRecord")
                 {
                     string[] fieldList = fields.Split(',');
@@ -558,6 +559,7 @@ namespace MySystem
                     }
                     sql = "insert into Merchants (" + mer_field + ") values (" + mer_val + ");";
                     dbconn.op(sql);
+                    PushData("Merchants", mer_field, mer_val);
                 }
                 DataTable result = dbconn.dtable("select Id from " + brand.TargetTableName + " order by Id desc limit 1");
                 if (result.Rows.Count > 0)
@@ -568,6 +570,33 @@ namespace MySystem
             return "0";
         }
 
+        private void PushData(string tableName, string fields, string fieldvals)
+        {
+            try
+            {
+                Dictionary<string, string> spData = new Dictionary<string, string>();
+                Dictionary<string, string> mainData = new Dictionary<string, string>();
+                string[] fieldList = fields.Split(',');
+                if(fieldList.Length > 0)
+                {
+                    string[] fieldValList = fieldvals.Split(',');
+                    for (int i = 0; i < fieldList.Length; i++)
+                    {
+                        string fieldItem = fieldList[i];
+                        string fieldValItem = fieldValList[i].Trim('\'');
+                        spData.Add(fieldItem.Substring(0, 1).ToLower() + fieldItem.Substring(1), fieldValItem);
+                        mainData.Add(fieldItem, fieldValItem);
+                    }
+                }
+                RabbitMQClient2.Instance.Push("SpDataQueue", tableName + "#cut#" + Newtonsoft.Json.JsonConvert.SerializeObject(spData));
+                RabbitMQClient2.Instance.Push("MainDataQueue", tableName + "#cut#" + Newtonsoft.Json.JsonConvert.SerializeObject(mainData));
+            }
+            catch(Exception ex)
+            {
+                Utils.WriteLog(ex.ToString(), "通过MQ发送SP原始数据异常");
+            }
+        }
+
 
 
 

+ 1 - 1
AppStart/Helper/SourceDataToDb.cs

@@ -45,7 +45,7 @@ public class SourceDataToDb
                 }
                 catch(Exception ex)
                 {
-                    function.WriteLog(DateTime.Now.ToString() + "\n" + ex.ToString(), "补原始数据异常");
+                    Utils.WriteLog(DateTime.Now.ToString() + "\n" + ex.ToString(), "补原始数据异常");
                 }
             }
             else

+ 100 - 0
AppStart/RabbitMQClient2.cs

@@ -0,0 +1,100 @@
+using System;
+using System.Collections.Generic;
+using System.Text;
+using RabbitMQ.Client;
+using RabbitMQ.Client.Events;
+using Library;
+using System.Threading;
+
+namespace MySystem
+{
+    public class RabbitMQClient2
+    {
+        public readonly static RabbitMQClient2 Instance = new RabbitMQClient2();
+        string UserName,Password,HostName,VirtualHostName;
+        private RabbitMQClient2()
+        {
+            UserName = ConfigurationManager.AppSettings["MqUserName"].ToString();
+            Password = ConfigurationManager.AppSettings["MqPassword"].ToString();
+            HostName = ConfigurationManager.AppSettings["MqHostName"].ToString();
+            VirtualHostName = ConfigurationManager.AppSettings["MqVirtualHostName"].ToString();
+        }
+
+        public static IConnection _connection;
+        public void CreateConn()
+        { 
+            var factory = new ConnectionFactory()
+            {
+                HostName = HostName,
+                UserName = UserName,
+                Password = Password,
+                VirtualHost = VirtualHostName
+            };
+            _connection = factory.CreateConnection();
+        }
+
+        #region 单对单接收
+
+        public void StartReceive(string QueueName, Action<string> CallBack)
+        {
+            if (_connection == null)
+            {
+                CreateConn();
+            }
+            else if (!_connection.IsOpen)
+            {
+                CreateConn();
+            }
+
+            var consumer = new EventingBasicConsumer(_channel[QueueName]);
+            consumer.Received += (model, ea) =>
+            {
+                var body = ea.Body.ToArray();
+                //获取接收的数据
+                var message = Encoding.UTF8.GetString(body);
+
+                // 模拟消息处理逻辑
+                try
+                {
+                    CallBack(message);
+                    // 手动确认消息
+                    _channel[QueueName].BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
+                }
+                catch (Exception ex)
+                {
+                    // 如果处理失败,可以选择拒绝消息或重新入队
+                    _channel[QueueName].BasicNack(deliveryTag: ea.DeliveryTag, multiple: false, requeue: true);
+                    Utils.WriteLog(ex.ToString(), "MQ异常");
+                }
+            };
+
+            // 设置 autoAck = false
+            _channel[QueueName].BasicConsume(queue: QueueName, autoAck: false, consumer: consumer);
+        }
+        #endregion
+
+        #region 单对单发送
+        public Dictionary<string, IModel> _channel = new Dictionary<string, IModel>();
+        public void Conn(string QueueName)
+        {
+            if (_connection == null)
+            {
+                CreateConn();
+            }
+            else if (!_connection.IsOpen)
+            {
+                CreateConn();
+            }
+            var channel = _connection.CreateModel();
+            channel.ExchangeDeclare("kxs_direct_ranch", "direct", true);
+            channel.QueueDeclare(QueueName, true, false, false);
+            channel.QueueBind(QueueName, "kxs_direct_ranch", QueueName);
+            if(!_channel.ContainsKey(QueueName)) _channel.Add(QueueName, channel);
+        }
+        public void Push(string QueueName, string Content)
+        {
+            _channel[QueueName].BasicPublish("", QueueName, null, Encoding.Default.GetBytes(Content));
+        }
+        #endregion
+    }
+}

+ 16 - 0
AppStart/Utils.cs

@@ -0,0 +1,16 @@
+using System;
+
+public class Utils
+{
+    
+    #region 打控制台日志
+
+    public static void WriteLog(string msg, string filename = "message")
+    {
+        Console.WriteLine(DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + "--" + filename + "--" + msg);
+    }
+
+    #endregion
+
+
+}

+ 2 - 2
Areas/Api/Controllers/v1/Kq3Controller.cs

@@ -101,7 +101,7 @@ namespace MySystem.Areas.Api.Controllers
             }
             string content = Newtonsoft.Json.JsonConvert.SerializeObject(Request.Headers) + "#cut#" + requestMes;
             // LogHelper.Instance.WriteLog(content, "接收WIFI数据-" + Path + "通知");
-            function.WriteLog(content, "接收WIFI数据-" + Path + "通知");
+            Utils.WriteLog(content, "接收WIFI数据-" + Path + "通知");
             RedisDbconn.Instance.AddList("wifi_data_list", Path + "#cut#" + requestMes);
             Dictionary<string, object> dic = new Dictionary<string, object>();
             dic.Add("code", 200);
@@ -124,7 +124,7 @@ namespace MySystem.Areas.Api.Controllers
             }
             string content = Newtonsoft.Json.JsonConvert.SerializeObject(Request.Headers) + "#cut#" + requestMes;
             LogHelper.Instance.WriteLog(DateTime.Now.ToString() + content, "联动掌中宝推送数据", "34");
-            // function.WriteLog(DateTime.Now.ToString() + content, "联动掌中宝推送数据");
+            // Utils.WriteLog(DateTime.Now.ToString() + content, "联动掌中宝推送数据");
             JsonData jsonObj = JsonMapper.ToObject(requestMes);
             string serviceType = jsonObj["serviceType"].ToString();
             if(serviceType == "MATERIAL_BIND_STATUS_NOTIFY") RedisDbconn.Instance.AddList("kxs_bind_list_34", content);

+ 3 - 0
Startup.cs

@@ -226,6 +226,9 @@ namespace MySystem
             LogHelper.Instance.Start();
             CheckWifiData.Instance.Start(); //解析wifi设备通知数据
             SourceDataToDb.Instance.Start(); //补原始数据
+            
+            RabbitMQClient2.Instance.Conn("SpDataQueue");
+            RabbitMQClient2.Instance.Conn("MainDataQueue");
             // 必须执行
         }
     }