SourceDataToDb.cs 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using System.Security.Cryptography;
  5. using System.Threading;
  6. using Library;
  7. using LitJson;
  8. using MySystem;
  9. using MySystem.Models;
  10. public class SourceDataToDb
  11. {
  12. public readonly static SourceDataToDb Instance = new SourceDataToDb();
  13. private SourceDataToDb()
  14. { }
  15. public void Start()
  16. {
  17. Thread th = new Thread(DoWorks);
  18. th.IsBackground = true;
  19. th.Start();
  20. }
  21. public void DoWorks()
  22. {
  23. while (true)
  24. {
  25. string content = RedisDbconn.Instance.RPop<string>("DataToDbQueue");
  26. if (!string.IsNullOrEmpty(content))
  27. {
  28. try
  29. {
  30. function.WriteLog(DateTime.Now.ToString() + "\n" + content, "补原始数据");
  31. string[] data = content.Split('|');
  32. string jsonString = data[2];
  33. int BrandId = int.Parse(data[1]);
  34. string Kind = data[0];
  35. WebCMSEntities db = new WebCMSEntities();
  36. string key = "kxs_" + Kind + "_list_" + BrandId;
  37. function.WriteLog(key, "补原始数据");
  38. JobMqMsg obj = RedisDbconn.Instance.GetList<JobMqMsg>("GetSpData", 1, 1000).FirstOrDefault(m => m.OrderString.Contains(key));
  39. if(obj != null)
  40. {
  41. function.WriteLog("找到job", "补原始数据");
  42. PublicImportDataService.Instance.InsertData(key, jsonString, obj, BrandId);
  43. function.WriteLog("执行完毕", "补原始数据");
  44. }
  45. db.Dispose();
  46. }
  47. catch(Exception ex)
  48. {
  49. function.WriteLog(DateTime.Now.ToString() + "\n" + ex.ToString(), "补原始数据异常");
  50. }
  51. }
  52. else
  53. {
  54. Thread.Sleep(5000);
  55. }
  56. }
  57. }
  58. }