Ver Fonte

推送wifi数据每月交易额延时队列

lcl há 6 meses atrás
pai
commit
c411164cd6

+ 34 - 0
AppStart/Helper/SycnSpServer/SycnSpTradeWifiService.cs

@@ -142,6 +142,9 @@ namespace MySystem
                                     {
                                         StatTradeAfter(db, pos.BuyUserId, pos.BrandId, StartMonth.ToString("yyyyMM"), TradeAmt);
                                     }
+                                    //推送延时队列
+                                    // TimeSpan ts = DateTime.Now - StartMonth;
+                                    // RabbitMQClientV2.Instance.SendMsg("{\"pos_sn\":\"" + pos.PosSn + "\",\"amt\":\"" + TradeAmt + "\"}", "KXS_WIFI_TRADE_QUEUE", (uint)ts.TotalMilliseconds);
                                 }
                             }
 
@@ -334,5 +337,36 @@ namespace MySystem
                 }
             }
         }
+
+        //推送wifi每月交易到java
+        public void PushMsgToJava(string MsgContent)
+        {
+            PxcModels.WebCMSEntities db = new PxcModels.WebCMSEntities();
+            try
+            {
+                JsonData jsonObj = JsonMapper.ToObject(MsgContent);
+                string PosSn = jsonObj["pos_sn"].ToString();
+                decimal TradeAmount = decimal.Parse(jsonObj["amt"].ToString());
+                PxcModels.PosMachinesTwo pos = db.PosMachinesTwo.FirstOrDefault(m => m.PosSn == PosSn) ?? new PxcModels.PosMachinesTwo();
+                if(pos.Status > -1 && pos.BuyUserId > 0 && pos.ActivationState == 1)
+                {
+                    PxcModels.PosMerchantInfo mer = db.PosMerchantInfo.FirstOrDefault(m => m.Id == pos.BindMerchantId) ?? new PxcModels.PosMerchantInfo();
+                    PosPushDataNewHelper.Trade(new TradeRecord()
+                    {
+                        TradeSnNo = pos.PosSn,
+                        MerNo = mer.KqMerNo,
+                        TradeSerialNo = Guid.NewGuid().ToString(),
+                        TradeAmount = TradeAmount,
+                        CreateDate = DateTime.Now,
+                        ProductType = pos.BrandId.ToString(),
+                    });
+                }
+            }
+            catch(Exception ex)
+            {
+                function.WriteLog(DateTime.Now.ToString() + "\n" + ex.ToString(), "推送wifi每月交易到java异常");
+            }
+            db.Dispose();
+        }
     }
 }

+ 90 - 316
AppStart/RabbitMQClientV2.cs

@@ -1,347 +1,121 @@
-using Newtonsoft.Json;
+using System;
+using System.Collections.Generic;
+using System.Text;
 using RabbitMQ.Client;
 using RabbitMQ.Client.Events;
-using System;
-using System.Text;
+using Library;
+using System.Threading;
+using LitJson;
 
 namespace MySystem
 {
-    class ConfigModel
-    {
-    }
-
-    public enum ExchangeTypeEnum
-    {
-        /// <summary>
-        /// 不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。
-        /// 很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout交换机转发消息是最快的。
-        /// </summary>
-        fanout = 1,
-
-        /// <summary>
-        /// 处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配
-        /// 。这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 “dog”,
-        /// 则只有被标记为“dog”的消息才被转发,不会转发dog.puppy,也不会转发dog.guard,只会转发dog。
-        /// </summary>
-        direct = 2,
-
-        /// <summary>
-        /// 将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。
-        /// 符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。
-        /// 因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*” 只会匹配到“audit.irs”
-        /// </summary>
-        topic = 3,
-
-        header = 4
-    }
-
-
-    /// <summary>
-    /// 数据被执行后的处理方式
-    /// </summary>
-    public enum ProcessingResultsEnum
-    {
-        /// <summary>
-        /// 处理成功
-        /// </summary>
-        Accept,
-
-        /// <summary>
-        /// 可以重试的错误
-        /// </summary>
-        Retry,
-
-        /// <summary>
-        /// 无需重试的错误
-        /// </summary>
-        Reject,
-    }
-
-    /// <summary>
-    /// 消息队列的配置信息
-    /// </summary>
-    public class RabbitMqConfigModel
-    {
-        #region host
-        /// <summary>
-        /// 服务器IP地址
-        /// </summary>
-        public string IP { get; set; }
-
-        /// <summary>
-        /// 服务器端口,默认是 5672
-        /// </summary>
-        public int Port { get; set; }
-
-        /// <summary>
-        /// 登录用户名
-        /// </summary>
-        public string UserName { get; set; }
-
-        /// <summary>
-        /// 登录密码
-        /// </summary>
-        public string Password { get; set; }
-        /// <summary>
-        /// 虚拟主机名称
-        /// </summary>
-        public string VirtualHost { get; set; }
-        #endregion
-
-        #region Queue
-        /// <summary>
-        /// 队列名称
-        /// </summary>
-        public string QueueName { get; set; }
-
-        /// <summary>
-        /// 是否持久化该队列
-        /// </summary>
-        public bool DurableQueue { get; set; }
-        #endregion
-
-        #region exchange
-        /// <summary>
-        /// 路由名称
-        /// </summary>
-        public string ExchangeName { get; set; }
-
-        /// <summary>
-        /// 路由的类型枚举
-        /// </summary>
-        public ExchangeTypeEnum ExchangeType { get; set; }
-
-        /// <summary>
-        /// 路由的关键字
-        /// </summary>
-        public string RoutingKey { get; set; }
-
-        #endregion
-
-        #region message
-        /// <summary>
-        /// 是否持久化队列中的消息
-        /// </summary>
-        public bool DurableMessage { get; set; }
-        #endregion
-    }
-    /// <summary>
-    /// 基类
-    /// </summary>
-    public class BaseService
+    public class RabbitMQClientV2
     {
+        public readonly static RabbitMQClientV2 Instance = new RabbitMQClientV2();
+        string UserName,Password,HostName,VirtualHostName;
+        private RabbitMQClientV2()
+        {
+            UserName = ConfigurationManager.AppSettings["MqUserName"].ToString();
+            Password = ConfigurationManager.AppSettings["MqPassword"].ToString();
+            HostName = ConfigurationManager.AppSettings["MqHostName"].ToString();
+            VirtualHostName = ConfigurationManager.AppSettings["MqVirtualHostName"].ToString();
+        }
 
-        public static IConnection _connection;
-
-        /// <summary>
-        /// 服务器配置
-        /// </summary>
-        public RabbitMqConfigModel RabbitConfig { get; set; }
-
-
-        #region 构造函数
-        /// <summary>
-        /// 构造函数
-        /// </summary>
-        /// <param name="config"></param>
-        public BaseService(RabbitMqConfigModel config)
+        #region 单对单发送
+        public void SendMsg(string content, string QueueName, uint delayMilliseconds = 0)
         {
-            try
+            if(delayMilliseconds > 0)
             {
-                RabbitConfig = config;
-                CreateConn();
+                // 设置消息的延迟时间
+                var properties = _channel_send.CreateBasicProperties();
+                properties.Headers = new Dictionary<string, object>
+                {
+                    { "x-delay", delayMilliseconds } // 延迟5秒
+                };
+                _channel_send.BasicPublish("kxs_direct_ranch", QueueName, properties, Encoding.Default.GetBytes(content));
             }
-            catch (Exception)
+            else
             {
-                throw;
+                _channel_send.BasicPublish("", QueueName, null, Encoding.Default.GetBytes(content));
             }
+            function.WriteLog(DateTime.Now.ToString() + "\n" + QueueName + "\n" + content + "\n\n\n", "SendMsg2");
         }
         #endregion
 
-        #region 方法
-        #region 初始化
-        /// <summary>
-        /// 创建连接
-        /// </summary>
-        public void CreateConn()
-        {
-            ConnectionFactory cf = new ConnectionFactory();
-            cf.Port = RabbitConfig.Port; //服务器的端口
-            cf.Endpoint = new AmqpTcpEndpoint(new Uri("amqp://" + RabbitConfig.IP + "/")); //服务器ip
-            cf.UserName = RabbitConfig.UserName; //登录账户
-            cf.Password = RabbitConfig.Password; //登录账户
-            cf.VirtualHost = RabbitConfig.VirtualHost; //虚拟主机
-            cf.RequestedHeartbeat = TimeSpan.Parse("60"); //虚拟主机
-
-            _connection = cf.CreateConnection();
-        }
-        #endregion
-
-        #region 发送消息
-        /// <summary>
-        /// 发送消息,泛型
-        /// </summary>
-        /// <typeparam name="T"></typeparam>
-        /// <param name="message"></param>
-        /// <returns></returns>
-        public bool Send<T>(T messageInfo, ref string errMsg)
-        {
-            if (messageInfo == null)
+        #region 单对单接收
+        public static IModel _channel_send;
+        public void CreateConn(string QueueName)
+        { 
+            var factory = new ConnectionFactory()
             {
-                errMsg = "消息对象不能为空";
-                return false;
-            }
-            string value = JsonConvert.SerializeObject(messageInfo);
-            return Send(value, ref errMsg);
-        }
-        /// <summary>
-        /// 发送消息,string类型
-        /// </summary>
-        /// <param name="message"></param>
-        /// <param name="errMsg"></param>
-        /// <returns></returns>
-        public bool Send(string message)
-        {
-            if (string.IsNullOrEmpty(message))
+                UserName = UserName,
+                Password = Password,
+                AutomaticRecoveryEnabled = true,  //如果connection挂掉是否重新连接
+                TopologyRecoveryEnabled = true,  //连接恢复后,连接的交换机,队列等是否一同恢复
+                VirtualHost = VirtualHostName,
+            };
+            List<AmqpTcpEndpoint> p = new List<AmqpTcpEndpoint>();
+            string[] HostNames = HostName.Split(',');
+            foreach (string subHostName in HostNames)
             {
-                return false;
+                string[] subHostNameData = subHostName.Split(':');
+                p.Add(new AmqpTcpEndpoint(subHostNameData[0], int.Parse(subHostNameData[1])));
             }
-            try
+            var conn = factory.CreateConnection(p);
+            _channel_send = conn.CreateModel();
+            _channel_send.ExchangeDeclare("kxs_direct_ranch", "x-delayed-message", true);
+            _channel_send.QueueDeclare(QueueName, true, false, false);
+            string routingKey = "delayed_routing_key";
+            IDictionary<string, object> arguments = new Dictionary<string, object>
             {
-                if (!_connection.IsOpen)
-                {
-                    CreateConn();
-                }
-                using (var channel = _connection.CreateModel())
-                {
-                    //推送消息
-                    byte[] bytes = Encoding.UTF8.GetBytes(message);
-
-                    IBasicProperties properties = channel.CreateBasicProperties();
-                    properties.DeliveryMode = Convert.ToByte(RabbitConfig.DurableMessage ? 2 : 1); //支持可持久化数据
-
-                    if (string.IsNullOrEmpty(RabbitConfig.ExchangeName))
-                    {
-                        //使用自定义的路由
-                        channel.ExchangeDeclare(RabbitConfig.ExchangeName, RabbitConfig.ExchangeType.ToString(), RabbitConfig.DurableMessage, false, null);
-                        channel.BasicPublish("", RabbitConfig.QueueName, properties, bytes);
-                    }
-                    else
-                    {
-                        //申明消息队列,且为可持久化的,如果队列的名称不存在,系统会自动创建,有的话不会覆盖
-                        channel.QueueDeclare(RabbitConfig.QueueName, RabbitConfig.DurableQueue, false, false, null);
-                        channel.BasicPublish(RabbitConfig.ExchangeName, RabbitConfig.RoutingKey, properties, bytes);
-                    }
-                    return true;
-                }
+                { "x-delayed-type", "direct" } // 这里可以指定不同的交换机类型
+            };
+            _channel_send.QueueBind(QueueName, "kxs_direct_ranch", routingKey, arguments);
+        }
 
-            }
-            catch (Exception ex)
+        public static IConnection _connection;
+        public void CreateConn()
+        { 
+            var factory = new ConnectionFactory()
+            {
+                UserName = UserName,
+                Password = Password,
+                AutomaticRecoveryEnabled = true,  //如果connection挂掉是否重新连接
+                TopologyRecoveryEnabled = true,  //连接恢复后,连接的交换机,队列等是否一同恢复
+                RequestedHeartbeat = TimeSpan.FromMinutes(1),
+                VirtualHost = VirtualHostName,
+            };
+            List<AmqpTcpEndpoint> p = new List<AmqpTcpEndpoint>();
+            string[] HostNames = HostName.Split(',');
+            foreach (string subHostName in HostNames)
             {
-                Library.function.WriteLog(DateTime.Now.ToString() + "\r\n" + ex.ToString(), "发送MQ队列消息异常");
-                return false;
+                string[] subHostNameData = subHostName.Split(':');
+                p.Add(new AmqpTcpEndpoint(subHostNameData[0], int.Parse(subHostNameData[1])));
             }
+            _connection = factory.CreateConnection(p);
         }
-        #endregion
-    }
-    public class RabbitBasicService : BaseService
-    {
-
-        /// <summary>
-        /// 构造函数
-        /// </summary>
-        /// <param name="config"></param>
-        public RabbitBasicService(RabbitMqConfigModel config)
-        : base(config)
-        { }
-
-
-        /// <summary>
-        /// 接受消息,使用Action进行处理
-        /// </summary>
-        /// <typeparam name="T"></typeparam>
-        /// <param name="method"></param>
-        public void Receive()
+        public void StartReceive(string QueueName)
         {
-            try
+            if (_connection == null)
             {
-                using (var channel = _connection.CreateModel())
-                {
-                    //申明队列
-                    channel.QueueDeclare(RabbitConfig.QueueName, RabbitConfig.DurableQueue, false, false, null);
-                    //使用路由
-                    if (!string.IsNullOrEmpty(RabbitConfig.ExchangeName))
-                    {
-                        //申明路由
-                        channel.ExchangeDeclare(RabbitConfig.ExchangeName, RabbitConfig.ExchangeType.ToString(), RabbitConfig.DurableQueue);
-                        //队列和交换机绑定
-                        channel.QueueBind(RabbitConfig.QueueName, RabbitConfig.ExchangeName, RabbitConfig.RoutingKey);
-                    }
-
-                    //输入1,那如果接收一个消息,但是没有应答,则客户端不会收到下一个消息
-                    channel.BasicQos(0, 1, false);
-                    //在队列上定义一个消费者
-                    // var customer = new QueueingBasicConsumer(channel);
-                    var customer = new EventingBasicConsumer(channel);
-                    EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
-                    consumer.Received += (a, e) =>
-                    {
-                        string MsgContent = Encoding.Default.GetString(e.Body.ToArray());
-                        
-                        channel.BasicAck(e.DeliveryTag, true); //收到回复后,RabbitMQ会直接在队列中删除这条消息
-                    };
-                    //消费队列,并设置应答模式为程序主动应答
-                    channel.BasicConsume(RabbitConfig.QueueName, false, customer);
-
-                    // while (true)//timer
-                    // {
-                    //     //阻塞函数,获取队列中的消息
-                    //     ProcessingResultsEnum processingResult = ProcessingResultsEnum.Retry;
-                    //     ulong deliveryTag = 0;
-                    //     try
-                    //     {
-                    //         //Thread.Sleep(10);
-
-                    //         var ea = customer.Queue.Dequeue();
-                    //         deliveryTag = ea.DeliveryTag;
-                    //         byte[] bytes = ea.Body;
-                    //         string body = Encoding.UTF8.GetString(bytes);
-                    //         // T info = JsonConvert.DeserializeObject<T>(body);
-                    //         method(body);
-                    //         processingResult = ProcessingResultsEnum.Accept;
-                    //     }
-                    //     catch (Exception ex)
-                    //     {
-                    //         processingResult = ProcessingResultsEnum.Reject; //系统无法处理的错误
-                    //     }
-                    //     finally
-                    //     {
-                    //         switch (processingResult)
-                    //         {
-                    //             case ProcessingResultsEnum.Accept:
-                    //                 //回复确认处理成功
-                    //                 channel.BasicAck(deliveryTag,
-                    //                 false);//处理单挑信息
-                    //                 break;
-                    //             case ProcessingResultsEnum.Retry:
-                    //                 //发生错误了,但是还可以重新提交给队列重新分配
-                    //                 channel.BasicNack(deliveryTag, false, true);
-                    //                 break;
-                    //             case ProcessingResultsEnum.Reject:
-                    //                 //发生严重错误,无法继续进行,这种情况应该写日志或者是发送消息通知管理员
-                    //                 channel.BasicNack(deliveryTag, false, false);
-                    //                 //写日志
-                    //                 break;
-                    //         }
-                    //     }
-                    // }
-
-                }
+                CreateConn();
             }
-            catch (Exception ex)
+            else if (!_connection.IsOpen)
             {
+                CreateConn();
             }
+            var channel = _connection.CreateModel();
+            channel.QueueBind(QueueName, "kxs_direct_ranch", QueueName);
+            EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
+            consumer.Received += (a, e) =>
+            {
+                string MsgContent = Encoding.Default.GetString(e.Body.ToArray());
+                SycnSpTradeWifiService.Instance.PushMsgToJava(MsgContent);
+                channel.BasicAck(e.DeliveryTag, true); //收到回复后,RabbitMQ会直接在队列中删除这条消息
+            };
+            channel.BasicConsume(QueueName, false, consumer);
         }
+        #endregion
+
     }
-    #endregion
 }

+ 4 - 2
Startup.cs

@@ -232,7 +232,7 @@ namespace MySystem
                 InstallmentDeductionService.Instance.Start(); //分期扣款(每月20号执行)
                 LeaderAmountMonthChangeQueue.Instance.Start(); //每月一号记录上月盟主储蓄金和可提现余额
                 ResetPosFirstFlagAndIsExecuteService.Instance.Start(); //每月1号计算商户首台机具标记
-                OperateAmountRecordService.Instance.Start(); //运营中心额度变更 
+                OperateAmountRecordService.Instance.Start(); //运营中心额度变更
                 VirtualApplyHelper.Instance.Start(); //申请虚拟广电卡
 
                 ChangePosTimer.Instance.Start(); //售后换新执行机具数据转移
@@ -253,6 +253,9 @@ namespace MySystem
                 ToBigHelper.Instance.Start(); //机具通对资格
                 TradeFilterService.Instance.Start(); //营训奖励拦截
 
+                
+                // RabbitMQClientV2.Instance.CreateConn("KXS_WIFI_TRADE_QUEUE"); //推送wifi每月交易额
+
                 // PosCouponPrizeTest.Instance.Start();
             // }
 
@@ -281,7 +284,6 @@ namespace MySystem
 
             // 备用,暂时不放开的
             // StatStoreDataService.Instance.Start();
-            // RabbitMQClient.Instance.StartReceive("SycnTableData");
             // PayHelper.Instance.Start();
             // OrderHelper.Instance.Start();
             // OrderRefundHelper.Instance.Start();