Эх сурвалжийг харах

添加客小爽激活数据推送队列

lcl 1 жил өмнө
parent
commit
df8b183568

+ 29 - 0
AppStart/RabbitMQClient.cs

@@ -64,6 +64,10 @@ namespace MySystem
             // channel.Dispose();
             _channel_send.BasicPublish("", QueueName, null, Encoding.Default.GetBytes(content));
         }
+        public void SendMsg3(string content, string QueueName)
+        {
+            _channel_kxs_send.BasicPublish("", QueueName, null, Encoding.Default.GetBytes(content));
+        }
         #endregion
 
         #region 单对单发送
@@ -153,6 +157,31 @@ namespace MySystem
             _channel_send.QueueBind(QueueName, "kxs_direct_ranch", QueueName);
         }
 
+        public static IModel _channel_kxs_send;
+        public void CreateConn3(string QueueName)
+        { 
+            var factory = new ConnectionFactory()
+            {
+                UserName = UserName,
+                Password = Password,
+                AutomaticRecoveryEnabled = true,  //如果connection挂掉是否重新连接
+                TopologyRecoveryEnabled = true,  //连接恢复后,连接的交换机,队列等是否一同恢复
+                VirtualHost = VirtualHostName,
+            };
+            List<AmqpTcpEndpoint> p = new List<AmqpTcpEndpoint>();
+            string[] HostNames = HostName.Split(',');
+            foreach (string subHostName in HostNames)
+            {
+                string[] subHostNameData = subHostName.Split(':');
+                p.Add(new AmqpTcpEndpoint(subHostNameData[0], int.Parse(subHostNameData[1])));
+            }
+            var conn = factory.CreateConnection(p);
+            _channel_kxs_send = conn.CreateModel();
+            _channel_kxs_send.ExchangeDeclare("kxs_direct_ranch", "direct", true);
+            _channel_kxs_send.QueueDeclare(QueueName, true, false, false);
+            _channel_kxs_send.QueueBind(QueueName, "kxs_direct_ranch", QueueName);
+        }
+
         public static IConnection _connection;
         public void CreateConn()
         { 

+ 47 - 0
AppStart/Service/KxsActService.cs

@@ -0,0 +1,47 @@
+using System;
+using System.Collections.Generic;
+using Library;
+using System.Linq;
+using System.Threading;
+using RabbitMQ.Client;
+using System.Text;
+
+namespace MySystem
+{
+    public class KxsActService
+    {
+        public readonly static KxsActService Instance = new KxsActService();
+        private KxsActService()
+        { }
+
+        public void Start()
+        {
+            Thread th = new Thread(dosomething);
+            th.IsBackground = true;
+            th.Start();
+        }
+
+        public void dosomething()
+        {
+            while (true)
+            {
+                string content = MpRedisDbconn.Instance.RPop<string>("KxsActQueue");
+                if (!string.IsNullOrEmpty(content))
+                {
+                    try
+                    {
+                        RabbitMQClient.Instance.SendMsg3(content, "QUEUE_KXS_ACT_DIVISION");
+                    }
+                    catch (Exception ex)
+                    {                        
+                        function.WriteLog(DateTime.Now.ToString() + "\r\n" + ex.ToString(), "推送客小爽激活数据异常");
+                    }
+                }
+                else
+                {
+                    Thread.Sleep(500);
+                }
+            }
+        }
+    }
+}

+ 2 - 0
Startup.cs

@@ -136,9 +136,11 @@ namespace MySystem
             //必须打开的
             RabbitMQClient.Instance.StartReceive("QUEUE_GD_ACT_DIVISION");
             RabbitMQClient.Instance.CreateConn2("QUEUE_LKB_TRADE_DIVISION");
+            RabbitMQClient.Instance.CreateConn3("QUEUE_KXS_ACT_DIVISION");
             MpOrderService.Instance.Start();
             MpMerchantActService.Instance.Start();
             KxsOrderService.Instance.Start();
+            KxsActService.Instance.Start();
         }
     }
 }