浏览代码

添加机具数据推送队列

lcl 1 年之前
父节点
当前提交
fffeef0023
共有 3 个文件被更改,包括 109 次插入0 次删除
  1. 30 0
      AppStart/RabbitMQClient.cs
  2. 78 0
      AppStart/Service/KxsPosPushService.cs
  3. 1 0
      Startup.cs

+ 30 - 0
AppStart/RabbitMQClient.cs

@@ -70,6 +70,11 @@ namespace MySystem
             _channel_kxs_send.BasicPublish("", QueueName, null, Encoding.Default.GetBytes(content));
             _channel_kxs_send.BasicPublish("", QueueName, null, Encoding.Default.GetBytes(content));
             function.WriteLog(DateTime.Now.ToString() + "\n" + QueueName + "\n" + content + "\n\n\n", "SendMsg3");
             function.WriteLog(DateTime.Now.ToString() + "\n" + QueueName + "\n" + content + "\n\n\n", "SendMsg3");
         }
         }
+        public void PushPosData(string content, string QueueName)
+        {
+            _channel_kxs_push.BasicPublish("", QueueName, null, Encoding.Default.GetBytes(content));
+            function.WriteLog(DateTime.Now.ToString() + "\n" + QueueName + "\n" + content + "\n\n\n", "PushPosData");
+        }
         #endregion
         #endregion
 
 
         #region 单对单发送
         #region 单对单发送
@@ -184,6 +189,31 @@ namespace MySystem
             _channel_kxs_send.QueueBind(QueueName, "kxs_direct_ranch", QueueName);
             _channel_kxs_send.QueueBind(QueueName, "kxs_direct_ranch", QueueName);
         }
         }
 
 
+        public static IModel _channel_kxs_push;
+        public void CreatePushConn(string QueueName)
+        { 
+            var factory = new ConnectionFactory()
+            {
+                UserName = UserName,
+                Password = Password,
+                AutomaticRecoveryEnabled = true,  //如果connection挂掉是否重新连接
+                TopologyRecoveryEnabled = true,  //连接恢复后,连接的交换机,队列等是否一同恢复
+                VirtualHost = VirtualHostName,
+            };
+            List<AmqpTcpEndpoint> p = new List<AmqpTcpEndpoint>();
+            string[] HostNames = HostName.Split(',');
+            foreach (string subHostName in HostNames)
+            {
+                string[] subHostNameData = subHostName.Split(':');
+                p.Add(new AmqpTcpEndpoint(subHostNameData[0], int.Parse(subHostNameData[1])));
+            }
+            var conn = factory.CreateConnection(p);
+            _channel_kxs_push = conn.CreateModel();
+            _channel_kxs_push.ExchangeDeclare("kxs_direct_ranch", "direct", true);
+            _channel_kxs_push.QueueDeclare(QueueName, true, false, false);
+            _channel_kxs_push.QueueBind(QueueName, "kxs_direct_ranch", QueueName);
+        }
+
         public static IConnection _connection;
         public static IConnection _connection;
         public void CreateConn()
         public void CreateConn()
         { 
         { 

+ 78 - 0
AppStart/Service/KxsPosPushService.cs

@@ -0,0 +1,78 @@
+using System;
+using System.Collections.Generic;
+using Library;
+using System.Linq;
+using System.Threading;
+using RabbitMQ.Client;
+using System.Text;
+
+namespace MySystem
+{
+    public class KxsPosPushService
+    {
+        public readonly static KxsPosPushService Instance = new KxsPosPushService();
+        private KxsPosPushService()
+        { }
+
+        public void Start()
+        {
+            Thread th = new Thread(dosomething);
+            th.IsBackground = true;
+            th.Start();
+        }
+
+        public void dosomething()
+        {
+            while (true)
+            {
+                string content = RedisDbconn.Instance.RPop<string>("KxsPosDataQueue");
+                if (!string.IsNullOrEmpty(content))
+                {
+                    try
+                    {
+                        RabbitMQClient.Instance.PushPosData(content, "");
+                    }
+                    catch (Exception ex)
+                    {                        
+                        function.WriteLog(DateTime.Now.ToString() + "\r\n" + ex.ToString(), "推送客小爽机具数据异常");
+                    }
+                }
+                else
+                {
+                    Thread.Sleep(500);
+                }
+            }
+        }
+
+
+        public void StartMp()
+        {
+            Thread th = new Thread(dosomethingMp);
+            th.IsBackground = true;
+            th.Start();
+        }
+
+        public void dosomethingMp()
+        {
+            while (true)
+            {
+                string content = MpRedisDbconn.Instance.RPop<string>("KxsActQueue");
+                if (!string.IsNullOrEmpty(content))
+                {
+                    try
+                    {
+                        RabbitMQClient.Instance.SendMsg3(content, "QUEUE_KXS_ACT_DIVISION");
+                    }
+                    catch (Exception ex)
+                    {                        
+                        function.WriteLog(DateTime.Now.ToString() + "\r\n" + ex.ToString(), "推送客小爽码牌激活数据异常");
+                    }
+                }
+                else
+                {
+                    Thread.Sleep(500);
+                }
+            }
+        }
+    }
+}

+ 1 - 0
Startup.cs

@@ -137,6 +137,7 @@ namespace MySystem
             RabbitMQClient.Instance.StartReceive("QUEUE_GD_ACT_DIVISION");
             RabbitMQClient.Instance.StartReceive("QUEUE_GD_ACT_DIVISION");
             RabbitMQClient.Instance.CreateConn2("QUEUE_LKB_TRADE_DIVISION");
             RabbitMQClient.Instance.CreateConn2("QUEUE_LKB_TRADE_DIVISION");
             RabbitMQClient.Instance.CreateConn3("QUEUE_KXS_ACT_DIVISION");
             RabbitMQClient.Instance.CreateConn3("QUEUE_KXS_ACT_DIVISION");
+            // RabbitMQClient.Instance.CreatePushConn("");
             MpOrderService.Instance.Start();
             MpOrderService.Instance.Start();
             MpMerchantActService.Instance.Start();
             MpMerchantActService.Instance.Start();
             KxsOrderService.Instance.Start();
             KxsOrderService.Instance.Start();