RabbitMQClient.cs 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Text;
  4. using RabbitMQ.Client;
  5. using RabbitMQ.Client.Events;
  6. using Library;
  namespace MySystem
  {
      /// <summary>
      /// Singleton wrapper around the RabbitMQ .NET client that publishes messages to,
      /// and starts consumers on, queues and fanout exchanges.
      /// Credentials and broker endpoints are read from the application settings
      /// <c>MqUserName</c>, <c>MqPassword</c> and <c>MqHostName</c>.
      /// Every public method opens its own connection through <see cref="CreateConnection"/>.
      /// </summary>
      public class RabbitMQClient
      {
          /// <summary>The single shared instance of this client.</summary>
          public static readonly RabbitMQClient Instance = new RabbitMQClient();

          private readonly string _userName;
          private readonly string _password;

          // Comma-separated list of broker endpoints, each written as "host:port".
          private readonly string _hostName;

          /// <summary>
          /// Reads the broker credentials and endpoint list from the application settings.
          /// </summary>
          private RabbitMQClient()
          {
              _userName = ConfigurationManager.AppSettings["MqUserName"].ToString();
              _password = ConfigurationManager.AppSettings["MqPassword"].ToString();
              _hostName = ConfigurationManager.AppSettings["MqHostName"].ToString();
          }

          #region Connection helpers

          /// <summary>
          /// Builds a connection factory from the configured credentials and opens a
          /// connection to one of the configured endpoints.
          /// </summary>
          /// <returns>An open connection; the caller owns it and must dispose it.</returns>
          /// <exception cref="InvalidOperationException">
          /// The <c>MqHostName</c> setting contains no usable endpoint.
          /// </exception>
          private IConnection CreateConnection()
          {
              var factory = new ConnectionFactory()
              {
                  UserName = _userName,
                  Password = _password,
                  AutomaticRecoveryEnabled = true, // reconnect automatically if the connection drops
                  TopologyRecoveryEnabled = true   // re-declare exchanges, queues, etc. after a reconnect
              };

              var endpoints = new List<AmqpTcpEndpoint>();
              foreach (string entry in _hostName.Split(new[] { ',' }, StringSplitOptions.RemoveEmptyEntries))
              {
                  string[] hostAndPort = entry.Trim().Split(':');
                  string host = hostAndPort[0].Trim();
                  if (host.Length == 0)
                  {
                      // Tolerate blank entries such as a trailing comma followed by spaces.
                      continue;
                  }

                  bool hasPort = hostAndPort.Length > 1 && hostAndPort[1].Trim().Length > 0;
                  if (hasPort)
                  {
                      endpoints.Add(new AmqpTcpEndpoint(host, int.Parse(hostAndPort[1])));
                  }
                  else
                  {
                      // No port given: let the client library pick its default port.
                      endpoints.Add(new AmqpTcpEndpoint(host));
                  }
              }

              if (endpoints.Count == 0)
              {
                  throw new InvalidOperationException("The MqHostName setting does not contain any broker endpoint.");
              }

              return factory.CreateConnection(endpoints);
          }

          /// <summary>
          /// Deserializes a JSON message body into a <c>JobMqMsg</c>.
          /// </summary>
          /// <param name="json">The message body as text.</param>
          /// <returns>The deserialized job message.</returns>
          private static JobMqMsg ParseJob(string json)
          {
              return Newtonsoft.Json.JsonConvert.DeserializeObject<JobMqMsg>(json);
          }

          #endregion

          #region Point-to-point send

          /// <summary>
          /// Declares the queue (durable, non-exclusive, not auto-deleted) and publishes
          /// one message to it through the default exchange.
          /// </summary>
          /// <param name="content">Message text; encoded with <see cref="Encoding.Default"/>.</param>
          /// <param name="QueueName">Name of the target queue, also used as the routing key.</param>
          public void SendMsg(string content, string QueueName = "")
          {
              // "using" guarantees the channel and the connection are released even when
              // declaring the queue or publishing throws.
              using (var conn = CreateConnection())
              using (var channel = conn.CreateModel())
              {
                  channel.QueueDeclare(QueueName, true, false, false);
                  // NOTE(review): Encoding.Default is platform dependent; kept unchanged so
                  // payloads stay byte-compatible with existing producers and consumers.
                  channel.BasicPublish("", QueueName, null, Encoding.Default.GetBytes(content));
              }
          }

          #endregion

          #region Point-to-point receive

          /// <summary>
          /// Declares the queue and starts a consumer on it with manual acknowledgement.
          /// Each message is dispatched to a service chosen by <paramref name="QueueName"/>
          /// and then acknowledged. Messages on queues with no matching branch are
          /// acknowledged without being processed.
          /// The connection and channel are intentionally left open so that the consumer
          /// keeps receiving after this method returns.
          /// </summary>
          /// <param name="QueueName">Name of the queue to consume from.</param>
          public void StartReceive(string QueueName)
          {
              var conn = CreateConnection();
              try
              {
                  var channel = conn.CreateModel();
                  channel.QueueDeclare(QueueName, true, false, false);

                  var consumer = new EventingBasicConsumer(channel);
                  consumer.Received += (sender, e) =>
                  {
                      string msgContent = Encoding.Default.GetString(e.Body.ToArray());
                      switch (QueueName)
                      {
                          case "PublicMainServer":
                              ReceiveTaskService.Instance.Start(ParseJob(msgContent));
                              break;

                          case "SycnTableData":
                              JobMqMsg syncJob = ParseJob(msgContent);
                              if (syncJob.BrandInfo.DataType == 1)
                              {
                                  // Synchronize activations.
                                  SycnActiveRewardService.Instance.Start(syncJob);
                              }
                              else if (syncJob.BrandInfo.DataType == 2)
                              {
                                  // Synchronize trade records.
                                  SycnTradeRecordService.Instance.Start(syncJob);
                              }
                              else if (syncJob.BrandInfo.DataType == 8)
                              {
                                  // Synchronize merchants.
                                  SycnMerchantInfoService.Instance.Start(syncJob);
                              }
                              break;

                          case "ProfitForEverMonth":
                              ProfitService.Instance.Start(ParseJob(msgContent));
                              break;

                          case "FluxPrize":
                              FluxService.Instance.Start(ParseJob(msgContent));
                              break;
                      }

                      // Once acknowledged, RabbitMQ removes the message from the queue.
                      // NOTE(review): multiple=true also acknowledges every earlier unacked
                      // delivery on this channel, including ones whose handler threw before
                      // reaching this line - confirm that dropping those is intended.
                      channel.BasicAck(e.DeliveryTag, true);
                  };

                  // autoAck is false: messages are acknowledged explicitly in the handler above.
                  channel.BasicConsume(QueueName, false, consumer);
              }
              catch
              {
                  // Setup failed, so no consumer is running: release the connection
                  // (and with it the channel) instead of leaking it.
                  conn.Dispose();
                  throw;
              }
          }

          #endregion

          #region One-to-many send

          /// <summary>
          /// Declares a durable fanout exchange (not auto-deleted) and publishes one
          /// message to it with an empty routing key.
          /// </summary>
          /// <param name="content">Message text; encoded with <see cref="Encoding.Default"/>.</param>
          /// <param name="Exchange">Name of the fanout exchange to publish to.</param>
          public void SendMsgToExchange(string content, string Exchange = "")
          {
              // "using" guarantees the channel and the connection are released even when
              // declaring the exchange or publishing throws.
              using (var conn = CreateConnection())
              using (var channel = conn.CreateModel())
              {
                  channel.ExchangeDeclare(Exchange, ExchangeType.Fanout, true, false);
                  channel.BasicPublish(Exchange, "", null, Encoding.Default.GetBytes(content));
              }
          }

          #endregion

          #region One-to-many receive

          /// <summary>
          /// Declares the queue and the fanout exchange, binds the queue to the exchange,
          /// and starts a consumer that logs every received message and acknowledges it.
          /// The connection and channel are intentionally left open so that the consumer
          /// keeps receiving after this method returns.
          /// </summary>
          /// <param name="QueueName">Name of the queue to declare, bind and consume from.</param>
          /// <param name="Exchange">Name of the fanout exchange the queue is bound to.</param>
          public void StartReceiveFromExchange(string QueueName = "", string Exchange = "")
          {
              var conn = CreateConnection();
              try
              {
                  var channel = conn.CreateModel();

                  // Declare the queue.
                  channel.QueueDeclare(QueueName, true, false, false);
                  // Declare the exchange.
                  channel.ExchangeDeclare(Exchange, ExchangeType.Fanout, true, false);
                  // Bind the queue to the exchange.
                  channel.QueueBind(QueueName, Exchange, "");

                  var consumer = new EventingBasicConsumer(channel);
                  consumer.Received += (sender, e) =>
                  {
                      Library.function.WriteLog(Encoding.Default.GetString(e.Body.ToArray()), "接收到的MQ消息");
                      // Once acknowledged, RabbitMQ removes the message from the queue.
                      // NOTE(review): multiple=true also acknowledges every earlier unacked
                      // delivery on this channel - confirm that this is intended.
                      channel.BasicAck(e.DeliveryTag, true);
                  };

                  // autoAck is false: messages are acknowledged explicitly in the handler above.
                  channel.BasicConsume(QueueName, false, consumer);
              }
              catch
              {
                  // Setup failed, so no consumer is running: release the connection
                  // (and with it the channel) instead of leaking it.
                  conn.Dispose();
                  throw;
              }
          }

          #endregion
      }
  }