SpDataHelper.cs 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105
  1. using System;
  2. using System.Collections.Concurrent;
  3. using System.Collections.Generic;
  4. using System.IO;
  5. using System.Text;
  6. using System.Threading;
  7. using LitJson;
  8. using MySystem;
  9. using MySystem.SpModels;
  10. //营业额日汇总统计
  11. public class SpDataHelper
  12. {
  13. public readonly static SpDataHelper Instance = new SpDataHelper();
  14. private SpDataHelper()
  15. { }
  16. public void Start(string QueueName)
  17. {
  18. RabbitMQClient2.Instance.StartReceive(QueueName, content => DoWorks(content));
  19. }
  20. public void DoWorks(string content)
  21. {
  22. try
  23. {
  24. if(!string.IsNullOrEmpty(content))
  25. {
  26. string[] data = content.Split(new string[]{ "#cut#" }, StringSplitOptions.None);
  27. string tableName = data[0];
  28. string jsonString = data[1];
  29. if(tableName == "TradeRecord") DoQueueTrade(jsonString);
  30. }
  31. }
  32. catch (Exception ex)
  33. {
  34. Utils.WriteLog(DateTime.Now.ToString() + "\n" + ex, "SP原始数据入库异常");
  35. }
  36. }
  37. public BlockingCollection<TradeRecord> list = new BlockingCollection<TradeRecord>();
  38. public void DoQueueTrade(string jsonString)
  39. {
  40. TradeRecord trade = Newtonsoft.Json.JsonConvert.DeserializeObject<TradeRecord>(jsonString);
  41. list.Add(trade);
  42. }
  43. public void StartData()
  44. {
  45. Thread th = new Thread(StartDataDo);
  46. th.IsBackground = true;
  47. th.Start();
  48. }
  49. public void StartDataDo()
  50. {
  51. List<string> BrandIds = new List<string>();
  52. BrandIds.Add("14");
  53. BrandIds.Add("17");
  54. BrandIds.Add("23");
  55. BrandIds.Add("24");
  56. BrandIds.Add("25");
  57. BrandIds.Add("26");
  58. BrandIds.Add("32");
  59. MySystem.PxcModels.WebCMSEntities db = new MySystem.PxcModels.WebCMSEntities();
  60. MySystem.JavaProductModels.WebCMSEntities pdb = new MySystem.JavaProductModels.WebCMSEntities();
  61. bool conn = true;
  62. while (true)
  63. {
  64. try
  65. {
  66. TradeRecord trade;
  67. if(list.TryTake(out trade))
  68. {
  69. if(!conn)
  70. {
  71. db = new MySystem.PxcModels.WebCMSEntities();
  72. pdb = new MySystem.JavaProductModels.WebCMSEntities();
  73. conn = true;
  74. }
  75. Utils.WriteLog(Newtonsoft.Json.JsonConvert.SerializeObject(trade), "SP原始数据队列日志");
  76. // if(!BrandIds.Contains(trade.ProductType)) SycnSpTradeService.Instance.DoQueueTrade(db, pdb, trade);
  77. // if(trade.ProductType == "23") SycnSpTradeWifiService.Instance.DoQueueTrade(db, trade);
  78. }
  79. else
  80. {
  81. if(conn)
  82. {
  83. db.Dispose();
  84. pdb.Dispose();
  85. conn = false;
  86. }
  87. Thread.Sleep(5000);
  88. }
  89. }
  90. catch(Exception ex)
  91. {
  92. Utils.WriteLog(ex.ToString(), "SP原始数据队列异常");
  93. }
  94. }
  95. }
  96. }