using System;
using System.Collections.Generic;
using System.Data;
using System.Globalization;
using Library;
using LitJson;

namespace MySystem
{
    /// <summary>
    /// Job handlers that (a) fetch source data into Redis or sync a Redis queue into a
    /// target table (<see cref="StartGet"/>) and (b) reconcile target-table rows against
    /// the cached source data (<see cref="StartCheck"/>). Each handler reports its outcome
    /// by serialising the job message back onto a RabbitMQ queue.
    /// </summary>
    /// <remarks>
    /// Status codes written to the job message by this class:
    /// 1 = finished, 2 = check incomplete, 3 = nothing to do, -1 = exception (StartCheck only).
    /// </remarks>
    public class PublicGetService
    {
        /// <summary>The single shared instance; the constructor is private.</summary>
        public static readonly PublicGetService Instance = new PublicGetService();

        // Reply queues used by StartGet and StartCheck respectively.
        private const string GetBackQueue = "GetSourceDataBack";
        private const string CheckBackQueue = "CheckSourceDataBack";

        private PublicGetService()
        {
        }

        /// <summary>
        /// Runs a "get" job.
        /// When <c>BrandInfo.IsCheck</c> is true, source data is pulled from an HTTP API
        /// (<c>ReqType == "api"</c>) or an FTP server (<c>ReqType == "ftp"</c>) and cached in Redis.
        /// Otherwise, when <c>OrderString</c> has the form <c>Sp:&lt;queue&gt;:Sycn</c>, the named
        /// Redis list is drained and every message not yet present in the target table is inserted.
        /// </summary>
        /// <param name="jobInfo">The job message; its Status and Msg are updated before it is sent back.</param>
        /// <remarks>
        /// Exceptions are not propagated: they are appended to a Redis error list instead.
        /// NOTE(review): unlike <see cref="StartCheck"/>, no failure message is sent back over
        /// RabbitMQ when an exception occurs - confirm whether the dispatcher relies on that.
        /// </remarks>
        public void StartGet(JobMqMsg jobInfo)
        {
            string content = "";
            string CheckDate = DateTime.Now.ToString("yyMMdd");
            Brand brand = jobInfo.BrandInfo;
            try
            {
                if (brand.IsCheck)
                {
                    // Remove the cache entries written five days ago.
                    string clearDate = DateTime.Now.AddDays(-5).ToString("yyMMdd");
                    RedisDbconn.Instance.Clear(brand.ReqType + "_data_" + clearDate + "_*");

                    if (brand.ReqType == "api")
                    {
                        // Call the third-party endpoint to obtain the data.
                        string result = function.PostWebRequest(brand.Url, brand.ReqParam);
                        JsonData jsonObj = JsonMapper.ToObject(result);
                        if (jsonObj.Count > 0)
                        {
                            // NOTE(review): the key suffix is the field NAME in brand.CheckKey, not the
                            // record's value, so every element overwrites the previous one and only the
                            // last survives. StartCheck reads the same key shape, so it is left as is.
                            string cacheKey = brand.ReqType + "_data_" + CheckDate + "_" + brand.CheckKey;
                            for (int i = 0; i < jsonObj.Count; i++)
                            {
                                // Kept in 'content' so the catch block can log the failing payload.
                                content = jsonObj[i].ToJson();
                                RedisDbconn.Instance.Set(cacheKey, content);
                            }

                            jobInfo.Status = 1;
                            jobInfo.Msg = "抓取完成";
                            SendBack(jobInfo, GetBackQueue);
                        }
                        else
                        {
                            jobInfo.Status = 3;
                            jobInfo.Msg = "暂无数据";
                            SendBack(jobInfo, GetBackQueue);
                        }
                    }
                    else if (brand.ReqType == "ftp")
                    {
                        List<string> files = function.GetFtpFileList(brand.Url, brand.FtpUserName, brand.FtpPassword);
                        if (files.Count > 0)
                        {
                            // NOTE(review): same single-key overwrite as the api branch above.
                            string cacheKey = brand.ReqType + "_data_" + CheckDate + "_" + brand.CheckKey;
                            foreach (string filename in files)
                            {
                                string downloadFilePath = function.FtpDownload(brand.Url, filename, brand.FtpUserName, brand.FtpPassword);
                                if (!string.IsNullOrEmpty(downloadFilePath))
                                {
                                    content = function.ReadInstance(downloadFilePath);
                                    RedisDbconn.Instance.Set(cacheKey, content);
                                }
                            }

                            // Reported as finished even when individual downloads returned no path.
                            jobInfo.Status = 1;
                            jobInfo.Msg = "抓取完成";
                            SendBack(jobInfo, GetBackQueue);
                        }
                        else
                        {
                            jobInfo.Status = 3;
                            jobInfo.Msg = "暂无数据";
                            SendBack(jobInfo, GetBackQueue);
                        }
                    }
                }
                else
                {
                    string OrderString = jobInfo.OrderString;
                    // "Sycn" is the literal suffix used by the producer; do not "correct" it here.
                    if (OrderString.StartsWith("Sp:", StringComparison.Ordinal) && OrderString.EndsWith(":Sycn", StringComparison.Ordinal))
                    {
                        string queueName = OrderString.Split(':')[1];
                        int doCount = DrainQueueIntoTable(brand, queueName); // number of rows inserted

                        if (doCount > 0)
                        {
                            jobInfo.Status = 1;
                            jobInfo.Msg = "抓取完成";
                            SendBack(jobInfo, GetBackQueue);
                        }
                        else
                        {
                            jobInfo.Status = 3;
                            jobInfo.Msg = "暂无数据";
                            SendBack(jobInfo, GetBackQueue);
                        }
                    }
                }
            }
            catch (Exception ex)
            {
                // BrandInfo may itself be the reason we are here (null); dereferencing it again
                // would throw out of the handler, so fall back to an empty prefix.
                string reqType = brand != null ? brand.ReqType : "";
                string checkKeyName = brand != null ? brand.CheckKey : "";

                if (!string.IsNullOrEmpty(content))
                {
                    // A payload was being processed: log it together with the error.
                    Dictionary<string, object> data = new Dictionary<string, object>();
                    data.Add("ErrTime", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"));
                    data.Add("ErrMsg", ex.ToString());
                    data.Add("Content", content);
                    data.Add("CheckKey", checkKeyName);
                    data.Add("CheckDate", CheckDate);
                    RedisDbconn.Instance.AddList(reqType + "_get_err", data);
                }
                else
                {
                    RedisDbconn.Instance.AddList(reqType + "_get_service", DateTime.Now.ToString() + ":" + ex.ToString());
                }
            }
        }

        /// <summary>
        /// Reconciles rows of the target table (Id greater than <c>jobInfo.StartId</c>, Status = 0)
        /// against the source JSON cached in Redis by <see cref="StartGet"/>.
        /// Matching rows are set to Status = 1 and pushed onto the Redis list named after the
        /// target table; mismatching rows are set to Status = -1 and pushed onto
        /// <c>&lt;jobInfo.Id&gt;_check_fail</c>. The job result is sent to "CheckSourceDataBack".
        /// </summary>
        /// <param name="jobInfo">The job message; Status, Msg, Note and EndId are updated.</param>
        public void StartCheck(JobMqMsg jobInfo)
        {
            try
            {
                Brand brand = jobInfo.BrandInfo;
                DataTable dt = dbconn.dtable("select * from " + brand.TargetTableName + " where Id>" + jobInfo.StartId + " and Status=0 order by Id");
                if (dt.Rows.Count > 0)
                {
                    string note = ""; // description of the most recent mismatch
                    int checkedCount = 0;
                    foreach (DataRow dr in dt.Rows)
                    {
                        string checkDate = dr["CreateDate"].ToString();
                        // Same key shape that StartGet writes: <ReqType>_data_<yyMMdd>_<CheckKey>.
                        string jsonString = RedisDbconn.Instance.Get(brand.ReqType + "_data_" + DateTime.Parse(checkDate).ToString("yyMMdd") + "_" + brand.CheckKey);
                        JsonData jsonObj = JsonMapper.ToObject(jsonString);

                        string mismatch = DescribeFirstMismatch(dr, jsonObj, brand);
                        if (mismatch == null)
                        {
                            dbconn.op("update " + brand.TargetTableName + " set Status=1 where Id=" + dr["Id"].ToString());
                            RedisDbconn.Instance.AddList(brand.TargetTableName, RowToDictionary(dr));
                            jobInfo.EndId = int.Parse(dr["Id"].ToString());
                            checkedCount += 1;
                        }
                        else
                        {
                            note = mismatch;
                            dbconn.op("update " + brand.TargetTableName + " set Status=-1 where Id=" + dr["Id"].ToString());
                            RedisDbconn.Instance.AddList(jobInfo.Id + "_check_fail", dr);
                        }
                    }

                    if (checkedCount < dt.Rows.Count)
                    {
                        jobInfo.Status = 2;
                        jobInfo.Note = note;
                        jobInfo.Msg = "核对未完成";
                    }
                    else
                    {
                        jobInfo.Status = 1;
                        jobInfo.Msg = "核对完成";
                    }

                    SendBack(jobInfo, CheckBackQueue);
                }
                else
                {
                    jobInfo.Status = 3;
                    jobInfo.Msg = "暂无数据核对";
                    SendBack(jobInfo, CheckBackQueue);
                }
            }
            catch (Exception ex)
            {
                jobInfo.Status = -1;
                jobInfo.Msg = "执行异常,请排查";
                jobInfo.Note = ex.ToString();
                SendBack(jobInfo, CheckBackQueue);
            }
        }

        /// <summary>Serialises the job message and publishes it on the given RabbitMQ queue.</summary>
        private static void SendBack(JobMqMsg jobInfo, string queue)
        {
            RabbitMQClient.Instance.SendMsg(Newtonsoft.Json.JsonConvert.SerializeObject(jobInfo), queue);
        }

        /// <summary>
        /// Pops messages from the Redis list until it yields an empty value and inserts each
        /// one that is not yet in the target table. Returns the number of messages inserted.
        /// </summary>
        private static int DrainQueueIntoTable(Brand brand, string queueName)
        {
            int inserted = 0;
            while (true)
            {
                string requestMes = RedisDbconn.Instance.RPop(queueName);
                if (string.IsNullOrEmpty(requestMes))
                {
                    break;
                }

                if (InsertIfNew(brand, queueName, requestMes))
                {
                    inserted += 1;
                }
            }

            return inserted;
        }

        /// <summary>
        /// Inserts one queue message into the target table unless a row with the same
        /// check-key value already exists. Returns true when an insert was issued.
        /// </summary>
        private static bool InsertIfNew(Brand brand, string queueName, string requestMes)
        {
            JsonData jsonData = JsonMapper.ToObject(requestMes);
            string tableName = brand.TargetTableName;
            string checkKey = brand.CheckKey;

            List<string> columns = new List<string>();
            foreach (FieldRelationTable field in brand.FieldRelation)
            {
                columns.Add(field.TargetFieldName);
            }

            // NOTE(review): values are escaped/validated below, but table and column names are
            // still concatenated from job configuration. Switch to parameterised commands if the
            // dbconn helper offers them (its API is not visible from this file).
            DataTable check = dbconn.dtable("select * from " + tableName + " where " + checkKey + " = " + SqlQuote(jsonData[checkKey].ToString()));
            if (check.Rows.Count >= 1)
            {
                return false;
            }

            List<string> values = new List<string>();
            foreach (FieldRelationTable field in brand.FieldRelation)
            {
                string val = jsonData[field.SourceFieldName].ToString();
                // NOTE(review): StartCheck tests TargetFieldType against "int"/"decimal"/"datetime"
                // while this path uses "number"/"date" - confirm both vocabularies are intended.
                if (field.TargetFieldType == "number")
                {
                    values.Add(RequireNumeric(val, field.SourceFieldName));
                }
                else if (field.TargetFieldType == "date")
                {
                    // Invariant culture keeps the literal stable regardless of the server locale.
                    values.Add("'" + DateTime.Parse(val).ToString("yyyy-MM-dd HH:mm:ss", CultureInfo.InvariantCulture) + "'");
                }
                else
                {
                    values.Add(SqlQuote(val));
                }
            }

            string sql = "insert into " + tableName + " (" + string.Join(",", columns.ToArray())
                + ") values (" + string.Join(",", values.ToArray()) + ");select @@IDENTITY";

            DataTable dt = dbconn.dtable(sql);
            if (dt.Rows.Count > 0)
            {
                string Id = dt.Rows[0][0].ToString();
                dt = dbconn.dtable("select * from " + tableName + " where Id=" + Id);
                if (dt.Rows.Count > 0)
                {
                    // NOTE(review): the stored row is pushed onto the SAME list that is being drained,
                    // so it is popped again later and skipped by the duplicate check. StartCheck
                    // pushes to the list named after the target table; confirm the intended list.
                    RedisDbconn.Instance.AddList(queueName, Newtonsoft.Json.JsonConvert.SerializeObject(RowToDictionary(dt.Rows[0])));
                }
            }

            return true;
        }

        /// <summary>
        /// Compares every mapped field of a target row with the cached source object and
        /// returns the note for the first field that differs, or null when all fields match.
        /// </summary>
        private static string DescribeFirstMismatch(DataRow dr, JsonData jsonObj, Brand brand)
        {
            foreach (FieldRelationTable field in brand.FieldRelation)
            {
                string targetValue = dr[field.TargetFieldName].ToString();
                string sourceValue = jsonObj[field.SourceFieldName].ToString();
                if (!FieldMatches(field.TargetFieldType, targetValue, sourceValue))
                {
                    return "目标字段" + field.TargetFieldName + "和原始字段" + field.SourceFieldName + "不一致";
                }
            }

            return null;
        }

        /// <summary>
        /// Type-aware equality: "int", "decimal" and "datetime" are parsed before comparing,
        /// any other type is compared as plain text.
        /// </summary>
        private static bool FieldMatches(string fieldType, string targetValue, string sourceValue)
        {
            if (fieldType == "int")
            {
                return int.Parse(function.CheckInt(targetValue)) == int.Parse(function.CheckInt(sourceValue));
            }

            if (fieldType == "decimal")
            {
                return decimal.Parse(function.CheckNum(targetValue)) == decimal.Parse(function.CheckNum(sourceValue));
            }

            if (fieldType == "datetime")
            {
                return DateTime.Parse(targetValue) == DateTime.Parse(sourceValue);
            }

            return targetValue == sourceValue;
        }

        /// <summary>Copies every column of a row into a name-to-text dictionary.</summary>
        private static Dictionary<string, object> RowToDictionary(DataRow row)
        {
            Dictionary<string, object> item = new Dictionary<string, object>();
            foreach (DataColumn dc in row.Table.Columns)
            {
                item.Add(dc.ColumnName, row[dc.ColumnName].ToString());
            }

            return item;
        }

        /// <summary>
        /// Wraps a value in single quotes, doubling embedded single quotes so a value such as
        /// O'Brien neither breaks the statement nor terminates the literal early.
        /// </summary>
        private static string SqlQuote(string value)
        {
            return "'" + value.Replace("'", "''") + "'";
        }

        /// <summary>
        /// Returns the value unchanged when it is a plain decimal number; otherwise throws,
        /// because numeric values are emitted into the SQL text without quotes.
        /// </summary>
        /// <exception cref="FormatException">The value is not a valid number.</exception>
        private static string RequireNumeric(string value, string fieldName)
        {
            decimal parsed;
            if (!decimal.TryParse(value, NumberStyles.Float, CultureInfo.InvariantCulture, out parsed))
            {
                throw new FormatException("Field '" + fieldName + "' is not a valid number: " + value);
            }

            return value;
        }
    }
}