Преглед изворни кода

通过MQ接收SP原始数据

lcl пре 4 месеци
родитељ
комит
74c347db9f

+ 103 - 0
Common/RabbitMQClient.cs

@@ -0,0 +1,103 @@
+using System;
+using System.Collections.Generic;
+using System.Text;
+using RabbitMQ.Client;
+using RabbitMQ.Client.Events;
+using System.Threading;
+using Infrastructure;
+using Infrastructure.Model;
+
+namespace Common
+{
+    public class RabbitMQClient
+    {
+        public readonly static RabbitMQClient Instance = new RabbitMQClient();
+        string UserName,Password,HostName,VirtualHostName;
+        private RabbitMQClient()
+        {
+            var options = App.OptionsSetting;
+            RabbitMqConfigs RabbitMqConfigs = options.RabbitMqConfigs;
+            UserName = RabbitMqConfigs.UserName;
+            Password = RabbitMqConfigs.Password;
+            HostName = RabbitMqConfigs.HostName;
+            VirtualHostName = RabbitMqConfigs.VirtualHostName;
+        }
+
+        public static IConnection _connection;
+        public void CreateConn()
+        { 
+            var factory = new ConnectionFactory()
+            {
+                HostName = HostName,
+                UserName = UserName,
+                Password = Password,
+                VirtualHost = VirtualHostName
+            };
+            _connection = factory.CreateConnection();
+        }
+
+        #region 单对单接收
+
+        public void StartReceive(string QueueName, Action<string> CallBack)
+        {
+            if (_connection == null)
+            {
+                CreateConn();
+            }
+            else if (!_connection.IsOpen)
+            {
+                CreateConn();
+            }
+
+            var consumer = new EventingBasicConsumer(_channel[QueueName]);
+            consumer.Received += (model, ea) =>
+            {
+                var body = ea.Body.ToArray();
+                //获取接收的数据
+                var message = Encoding.UTF8.GetString(body);
+
+                // 模拟消息处理逻辑
+                try
+                {
+                    CallBack(message);
+                    // 手动确认消息
+                    _channel[QueueName].BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
+                }
+                catch (Exception ex)
+                {
+                    // 如果处理失败,可以选择拒绝消息或重新入队
+                    _channel[QueueName].BasicNack(deliveryTag: ea.DeliveryTag, multiple: false, requeue: true);
+                    Utils.WriteLog(ex.ToString(), "MQ异常");
+                }
+            };
+
+            // 设置 autoAck = false
+            _channel[QueueName].BasicConsume(queue: QueueName, autoAck: false, consumer: consumer);
+        }
+        #endregion
+
+        #region 单对单发送
+        public Dictionary<string, IModel> _channel = new Dictionary<string, IModel>();
+        public void Conn(string QueueName)
+        {
+            if (_connection == null)
+            {
+                CreateConn();
+            }
+            else if (!_connection.IsOpen)
+            {
+                CreateConn();
+            }
+            var channel = _connection.CreateModel();
+            channel.ExchangeDeclare("kxs_direct_ranch", "direct", true);
+            channel.QueueDeclare(QueueName, true, false, false);
+            channel.QueueBind(QueueName, "kxs_direct_ranch", QueueName);
+            if(!_channel.ContainsKey(QueueName)) _channel.Add(QueueName, channel);
+        }
+        public void Push(string QueueName, string Content)
+        {
+            _channel[QueueName].BasicPublish("", QueueName, null, Encoding.Default.GetBytes(Content));
+        }
+        #endregion
+    }
+}

+ 1 - 0
Extensions/AppServiceExtensions.cs

@@ -32,6 +32,7 @@ namespace Infrastructure
             services.AddTransient<IHdUnBindRecordService, HdUnBindRecordService>();
             services.AddTransient<IHdChangeRecordService, HdChangeRecordService>();
             services.AddTransient<IHdDepositRecordService, HdDepositRecordService>();
+            services.AddTransient<IKxsTradeRecordService, KxsTradeRecordService>();
         }
 
         private static void Register(IServiceCollection services, string item)

+ 9 - 0
Model/Base/OptionsSetting.cs

@@ -26,6 +26,7 @@ namespace Infrastructure.Model
         public CodeGen CodeGen { get; set; }
         public List<DbConfigs> DbConfigs { get; set; }
         public DbConfigs CodeGenDbConfig { get; set; }
+        public RabbitMqConfigs RabbitMqConfigs { get; set; }
     }
     /// <summary>
     /// 发送邮件数据配置
@@ -134,4 +135,12 @@ namespace Infrastructure.Model
         public string Secret { get; set; }
         public string BucketName { get; set; }
     }
+
+    public class RabbitMqConfigs
+    {
+        public string UserName { get; set; }
+        public string Password { get; set; }
+        public string HostName { get; set; }
+        public string VirtualHostName { get; set; }
+    }
 }

+ 290 - 0
Model/Database/KxsTradeRecord.cs

@@ -0,0 +1,290 @@
+using Mapster;
+
+
+namespace Model
+{
+    /// <summary>
+    /// 交易记录 kxs_trade_record
+    /// </summary>
+    [SplitTable(SplitType.Month)]
+    [SugarTable("kxs_trade_record_{year}{month}{day}", "交易记录")]
+    [Tenant("0")]
+    public class KxsTradeRecord
+    {
+        /// <summary>
+        /// 交易单号
+        /// </summary>
+        [SugarColumn(ColumnDescription = "交易单号", Length = 100, ColumnName = "trade_serial_no")]
+        public string? tradeSerialNo { get; set; }
+
+
+        /// <summary>
+        /// 备用字段3
+        /// </summary>
+        [SugarColumn(ColumnDescription = "备用字段3", Length = 50, ColumnName = "field3")]
+        public string? field3 { get; set; }
+
+
+        /// <summary>
+        /// 备用字段2
+        /// </summary>
+        [SugarColumn(ColumnDescription = "备用字段2", Length = 50, ColumnName = "field2")]
+        public string? field2 { get; set; }
+
+
+        /// <summary>
+        /// 备用字段1
+        /// </summary>
+        [SugarColumn(ColumnDescription = "备用字段1", Length = 50, ColumnName = "field1")]
+        public string? field1 { get; set; }
+
+
+        /// <summary>
+        /// 渠道流水号
+        /// </summary>
+        [SugarColumn(ColumnDescription = "渠道流水号", Length = 50, ColumnName = "channel_serial")]
+        public string? channelSerial { get; set; }
+
+
+        /// <summary>
+        /// 代理商编号
+        /// </summary>
+        [SugarColumn(ColumnDescription = "代理商编号", Length = 32, ColumnName = "agent_no")]
+        public string? agentNo { get; set; }
+
+
+        /// <summary>
+        /// 产品类型
+        /// </summary>
+        [SugarColumn(ColumnDescription = "产品类型", Length = 32, ColumnName = "product_type")]
+        public string? productType { get; set; }
+
+
+        /// <summary>
+        /// 备注
+        /// </summary>
+        [SugarColumn(ColumnDescription = "备注", Length = 64, ColumnName = "remark")]
+        public string? remark { get; set; }
+
+
+        /// <summary>
+        /// 出款方式
+        /// </summary>
+        [SugarColumn(ColumnDescription = "出款方式", Length = 16, ColumnName = "settle_method")]
+        public string? settleMethod { get; set; }
+
+
+        /// <summary>
+        /// 出款手续费
+        /// </summary>
+        [SugarColumn(ColumnDescription = "出款手续费", ColumnName = "settle_fee")]
+        public decimal settleFee { get; set; }
+
+
+        /// <summary>
+        /// 交易卡号
+        /// </summary>
+        [SugarColumn(ColumnDescription = "交易卡号", Length = 32, ColumnName = "bank_card_no")]
+        public string? bankCardNo { get; set; }
+
+
+        /// <summary>
+        /// 设备类型
+        /// </summary>
+        [SugarColumn(ColumnDescription = "设备类型", Length = 16, ColumnName = "receipt_type")]
+        public string? receiptType { get; set; }
+
+
+        /// <summary>
+        /// 店面收银交易标识
+        /// </summary>
+        [SugarColumn(ColumnDescription = "店面收银交易标识", Length = 16, ColumnName = "is_store_cashier")]
+        public string? isStoreCashier { get; set; }
+
+
+        /// <summary>
+        /// 挖款金额
+        /// </summary>
+        [SugarColumn(ColumnDescription = "挖款金额", ColumnName = "dig_amt")]
+        public decimal digAmt { get; set; }
+
+
+        /// <summary>
+        /// 挖款标识
+        /// </summary>
+        [SugarColumn(ColumnDescription = "挖款标识", Length = 4, ColumnName = "dig_amt_flag")]
+        public string? digAmtFlag { get; set; }
+
+
+        /// <summary>
+        /// 商户手机号
+        /// </summary>
+        [SugarColumn(ColumnDescription = "商户手机号", Length = 11, ColumnName = "mer_mobile")]
+        public string? merMobile { get; set; }
+
+
+        /// <summary>
+        /// 设备类型
+        /// </summary>
+        [SugarColumn(ColumnDescription = "设备类型", Length = 16, ColumnName = "device_type")]
+        public string? deviceType { get; set; }
+
+
+        /// <summary>
+        /// 交易SN号
+        /// </summary>
+        [SugarColumn(ColumnDescription = "交易SN号", Length = 50, ColumnName = "trade_sn_no")]
+        public string? tradeSnNo { get; set; }
+
+
+        /// <summary>
+        /// 商户编号
+        /// </summary>
+        [SugarColumn(ColumnDescription = "商户编号", Length = 50, ColumnName = "mer_no")]
+        public string? merNo { get; set; }
+
+
+        /// <summary>
+        /// 商户名称
+        /// </summary>
+        [SugarColumn(ColumnDescription = "商户名称", Length = 50, ColumnName = "mer_name")]
+        public string? merName { get; set; }
+
+
+        /// <summary>
+        /// 交易金额
+        /// </summary>
+        [SugarColumn(ColumnDescription = "交易金额", ColumnName = "trade_amount")]
+        public decimal tradeAmount { get; set; }
+
+
+        /// <summary>
+        /// 交易参考号
+        /// </summary>
+        [SugarColumn(ColumnDescription = "交易参考号", Length = 32, ColumnName = "trade_refer_no")]
+        public string? tradeReferNo { get; set; }
+
+
+        /// <summary>
+        /// 银行授权码
+        /// </summary>
+        [SugarColumn(ColumnDescription = "银行授权码", Length = 32, ColumnName = "bank_auth_code")]
+        public string? bankAuthCode { get; set; }
+
+
+        /// <summary>
+        /// 优惠费率标识
+        /// </summary>
+        [SugarColumn(ColumnDescription = "优惠费率标识", Length = 16, ColumnName = "discount_rate_flag")]
+        public string? discountRateFlag { get; set; }
+
+
+        /// <summary>
+        /// 备用字段4
+        /// </summary>
+        [SugarColumn(ColumnDescription = "备用字段4", Length = 50, ColumnName = "field4")]
+        public string? field4 { get; set; }
+
+
+        /// <summary>
+        /// 交易状态
+        /// </summary>
+        [SugarColumn(ColumnDescription = "交易状态", Length = 16, ColumnName = "trade_status")]
+        public string? tradeStatus { get; set; }
+
+
+        /// <summary>
+        /// 错误信息
+        /// </summary>
+        [SugarColumn(ColumnDescription = "错误信息", Length = 32, ColumnName = "error_msg")]
+        public string? errorMsg { get; set; }
+
+
+        /// <summary>
+        /// 交易日期
+        /// </summary>
+        [SugarColumn(ColumnDescription = "交易日期", Length = 8, ColumnName = "trade_date")]
+        public string? tradeDate { get; set; }
+
+
+        /// <summary>
+        /// 交易时间
+        /// </summary>
+        [SugarColumn(ColumnDescription = "交易时间", Length = 8, ColumnName = "trade_time")]
+        public string? tradeTime { get; set; }
+
+
+        /// <summary>
+        /// 交易类型
+        /// </summary>
+        [SugarColumn(ColumnDescription = "交易类型", Length = 32, ColumnName = "trade_type")]
+        public string? tradeType { get; set; }
+
+
+        /// <summary>
+        /// 输入模式
+        /// </summary>
+        [SugarColumn(ColumnDescription = "输入模式", Length = 32, ColumnName = "ser_entry_mode")]
+        public string? serEntryMode { get; set; }
+
+
+        /// <summary>
+        /// 银行卡类型
+        /// </summary>
+        [SugarColumn(ColumnDescription = "银行卡类型", Length = 16, ColumnName = "bank_card_type")]
+        public string? bankCardType { get; set; }
+
+
+        /// <summary>
+        /// 错误码
+        /// </summary>
+        [SugarColumn(ColumnDescription = "错误码", Length = 16, ColumnName = "error_code")]
+        public string? errorCode { get; set; }
+
+
+        /// <summary>
+        /// 备用字段5
+        /// </summary>
+        [SugarColumn(ColumnDescription = "备用字段5", Length = 50, ColumnName = "field5")]
+        public string? field5 { get; set; }
+
+
+        /// <summary>
+        /// 更新时间
+        /// </summary>
+        [SugarColumn(ColumnDescription = "更新时间", ColumnName = "update_time")]
+        public DateTime? updateTime { get; set; }
+
+
+        /// <summary>
+        /// 创建时间
+        /// </summary>
+        [SplitField]
+        [SugarColumn(ColumnDescription = "创建时间", ColumnName = "create_time")]
+        public DateTime? createTime { get; set; }
+
+
+        /// <summary>
+        /// 删除标记
+        /// </summary>
+        [SugarColumn(ColumnDescription = "删除标记", ColumnName = "del_flag")]
+        public int delFlag { get; set; }
+
+
+        /// <summary>
+        /// 版本号
+        /// </summary>
+        [SugarColumn(ColumnDescription = "版本号", ColumnName = "version")]
+        public int version { get; set; }
+
+
+        /// <summary>
+        /// ID
+        /// </summary>
+        [SugarColumn(ColumnDescription = "ID", IsPrimaryKey = true, ColumnName = "id")]
+        public long id { get; set; }
+
+
+
+    }
+}

+ 3 - 1
Program.cs

@@ -109,6 +109,8 @@ app.MapControllers();
 app.Urls.Add("http://*:5049");
 
 HaoDaHelper.Instance.Start(); 
-HaoDaHelper.Instance.StartDeposit(); 
+HaoDaHelper.Instance.StartDeposit();
+RabbitMQClient.Instance.Conn("SpDataQueue");
+SpDataHelper.Instance.Start("SpDataQueue");
 
 app.Run();

+ 17 - 0
Services/IService/IKxsTradeRecordService.cs

@@ -0,0 +1,17 @@
+using Model;
+using Model.Base;
+using Microsoft.AspNetCore.Mvc;
+
+
+namespace Services
+{
+    public interface IKxsTradeRecordService : IBaseService<KxsTradeRecord>
+    {
+        /// <summary>
+        /// 交易记录-添加
+        /// </summary>
+        /// <param name="param">参数请求体</param>
+        /// <returns>添加</returns>
+        long addKxsTradeRecord(KxsTradeRecord param);
+    }
+}

+ 27 - 0
Services/KxsTradeRecordService.cs

@@ -0,0 +1,27 @@
+using Attribute;
+using Model;
+using Model.Base;
+using Repository;
+using Service;
+using Microsoft.AspNetCore.Mvc;
+
+
+namespace Services
+{
+    /// <summary>
+    /// 交易记录Service业务层处理
+    /// </summary>
+    [AppService(ServiceType = typeof(IKxsTradeRecordService), ServiceLifetime = LifeTime.Transient)]
+    public class KxsTradeRecordService : BaseService<KxsTradeRecord>, IKxsTradeRecordService
+    {
+        /// <summary>
+        /// 交易记录-添加
+        /// </summary>
+        /// <param name="param">参数请求体</param>
+        /// <returns>添加</returns>
+        public long addKxsTradeRecord(KxsTradeRecord param)
+        {
+            return Insertable(param).SplitTable().ExecuteReturnSnowflakeId();
+        }
+    }
+}

+ 1 - 0
SpServer.csproj

@@ -34,6 +34,7 @@
     <PackageReference Include="Newtonsoft.Json" Version="13.0.3" />
     <PackageReference Include="NLog" Version="5.2.8" />
     <PackageReference Include="Npgsql" Version="8.0.3" />
+    <PackageReference Include="RabbitMQ.Client" Version="6.8.1" />
     <PackageReference Include="SqlSugar.IOC" Version="2.0.0" />
     <PackageReference Include="SqlSugarCoreNoDrive" Version="5.1.4.136" />
     <PackageReference Include="summerboot" Version="2.1.3" />

+ 1 - 0
SqlSugar/InitTable.cs

@@ -33,6 +33,7 @@ namespace SqlSugar
             db.CodeFirst.SplitTables().InitTables<HdBindRecord>();
             db.CodeFirst.SplitTables().InitTables<HdTradeRecord>();
             db.CodeFirst.SplitTables().InitTables<HdDepositRecord>();
+            db.CodeFirst.SplitTables().InitTables<KxsTradeRecord>();
         }
     }
 }

+ 49 - 0
Task/SpDataHelper.cs

@@ -0,0 +1,49 @@
+using System;
+using System.Collections.Generic;
+using System.IO;
+using System.Text;
+using System.Threading;
+using Common;
+using Infrastructure;
+using Services;
+using Model;
+using LitJson;
+
+//营业额日汇总统计
+public class SpDataHelper
+{
+    public readonly static SpDataHelper Instance = new SpDataHelper();
+    private SpDataHelper()
+    { }
+
+    public void Start(string QueueName)
+    {
+        RabbitMQClient.Instance.StartReceive(QueueName, content => DoWorks(content));
+    }
+
+    public void DoWorks(string content)
+    {
+        try
+        {
+            if(!string.IsNullOrEmpty(content))
+            {
+                string[] data = content.Split(new string[]{ "#cut#" }, StringSplitOptions.None);
+                string tableName = data[0];
+                string jsonString = data[1];
+                if(tableName == "TradeRecord") DoQueueTrade(jsonString);
+            }
+        }
+        catch (Exception ex)
+        {
+            Utils.WriteLog(DateTime.Now.ToString() + "\n" + ex, "SP原始数据入库异常");
+        }
+    }
+
+    public void DoQueueTrade(string jsonString)
+    { 
+        KxsTradeRecord data = Newtonsoft.Json.JsonConvert.DeserializeObject<KxsTradeRecord>(jsonString);
+        var tradeService = App.GetService<IKxsTradeRecordService>();
+        tradeService.addKxsTradeRecord(data);
+    }
+
+}

+ 14 - 0
Util/Utils.cs

@@ -0,0 +1,14 @@
+public class Utils
+{
+    
+    #region 打控制台日志
+
+    public static void WriteLog(string msg, string filename = "message")
+    {
+        Console.WriteLine(DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + "--" + filename + "--" + msg);
+    }
+
+    #endregion
+
+
+}

+ 6 - 0
appsettings.Development.json

@@ -40,6 +40,12 @@
     "RefreshTokenTime": 30, //分钟
     "TokenType": "Bearer"
   },
+  "RabbitMqConfigs": {
+    "UserName": "admin",
+    "Password": "admin",
+    "HostName": "",
+    "VirtualHostName": "/"
+  },
   "InitDb": false, //是否初始化db
   "ApiKey": "*ga34|^7" //webapi解密的key
 }