// RabbitMQClient.cs (4.2 KB)
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Text;
  4. using RabbitMQ.Client;
  5. using RabbitMQ.Client.Events;
  6. using System.Threading;
  7. using Infrastructure;
  8. using Infrastructure.Model;
  9. namespace Common
  10. {
  /// <summary>
  /// Singleton helper around a RabbitMQ connection. It keeps one channel per queue name
  /// and offers simple publish (<see cref="Push"/>) and consume (<see cref="StartReceive"/>)
  /// operations for one-to-one queues.
  /// </summary>
  /// <remarks>
  /// NOTE(review): neither the channel cache nor the channels themselves are synchronized
  /// in this class; confirm that callers serialize access when several threads use it.
  /// </remarks>
  public class RabbitMQClient
  {
    /// <summary>The single shared instance; built from <c>App.OptionsSetting</c>.</summary>
    public static readonly RabbitMQClient Instance = new RabbitMQClient();

    /// <summary>Connection shared by every channel this class creates.</summary>
    public static IConnection _connection;

    /// <summary>Durable direct exchange that every queue is bound to in <see cref="Conn"/>.</summary>
    private const string ExchangeName = "kxs_direct_ranch";

    /// <summary>Channels created by <see cref="Conn"/>, keyed by queue name.</summary>
    public Dictionary<string, IModel> _channel = new Dictionary<string, IModel>();

    // Connection settings copied from configuration once, in the constructor.
    // HostName is a comma-separated list of "host" or "host:port" entries.
    private readonly string UserName, Password, HostName, VirtualHostName;

    /// <summary>
    /// Reads the RabbitMQ settings from <c>App.OptionsSetting.RabbitMqConfigs</c>.
    /// Private so that <see cref="Instance"/> is the only instance.
    /// </summary>
    private RabbitMQClient()
    {
      RabbitMqConfigs config = App.OptionsSetting.RabbitMqConfigs;
      UserName = config.UserName;
      Password = config.Password;
      HostName = config.HostName;
      VirtualHostName = config.VirtualHostName;
    }

    /// <summary>
    /// Opens a new connection to the configured endpoints and stores it in
    /// <see cref="_connection"/>.
    /// </summary>
    /// <exception cref="InvalidOperationException">No host is configured.</exception>
    /// <exception cref="FormatException">A host entry has a port that is not an integer.</exception>
    public void CreateConn()
    {
      var factory = new ConnectionFactory()
      {
        UserName = UserName,
        Password = Password,
        AutomaticRecoveryEnabled = true, // reconnect automatically when the connection drops
        TopologyRecoveryEnabled = true,  // re-declare exchanges, queues and bindings after recovery
        VirtualHost = VirtualHostName,
      };

      // NOTE(review): a previous connection, if any, is replaced without being closed.
      // Closing it here could cancel an automatic recovery that existing consumers rely on,
      // so that decision is left to the owner of this class.
      _connection = factory.CreateConnection(ParseEndpoints(HostName));
    }

    /// <summary>
    /// Turns a comma-separated list of "host" or "host:port" entries into endpoints.
    /// Entries without a port use the client library's default port; blank entries are skipped.
    /// </summary>
    private static List<AmqpTcpEndpoint> ParseEndpoints(string hostNames)
    {
      if (string.IsNullOrWhiteSpace(hostNames))
      {
        throw new InvalidOperationException("RabbitMQ HostName is not configured.");
      }

      var endpoints = new List<AmqpTcpEndpoint>();
      foreach (string rawEntry in hostNames.Split(','))
      {
        string entry = rawEntry.Trim();
        if (entry.Length == 0)
        {
          continue; // tolerate stray or trailing commas
        }

        string[] parts = entry.Split(':');
        string host = parts[0].Trim();
        string portText = parts.Length > 1 ? parts[1].Trim() : string.Empty;
        if (host.Length == 0)
        {
          throw new FormatException("RabbitMQ host entry '" + entry + "' has no host name.");
        }

        if (portText.Length == 0)
        {
          // No port given: let the client library pick its default port.
          endpoints.Add(new AmqpTcpEndpoint(host));
          continue;
        }

        int port;
        if (!int.TryParse(
              portText,
              System.Globalization.NumberStyles.Integer,
              System.Globalization.CultureInfo.InvariantCulture,
              out port))
        {
          throw new FormatException("RabbitMQ host entry '" + entry + "' has an invalid port.");
        }

        endpoints.Add(new AmqpTcpEndpoint(host, port));
      }

      if (endpoints.Count == 0)
      {
        throw new InvalidOperationException("RabbitMQ HostName contains no usable host entry.");
      }

      return endpoints;
    }

    /// <summary>Creates the shared connection when it is missing or not open.</summary>
    private void EnsureConnection()
    {
      if (_connection == null || !_connection.IsOpen)
      {
        CreateConn();
      }
    }

    /// <summary>
    /// Returns an open channel for the queue, calling <see cref="Conn"/> when none is cached
    /// or the cached one is closed.
    /// </summary>
    private IModel GetOpenChannel(string queueName)
    {
      IModel channel;
      if (_channel.TryGetValue(queueName, out channel) && channel != null && channel.IsOpen)
      {
        return channel;
      }

      Conn(queueName);
      return _channel[queueName];
    }

    #region One-to-one receive
    /// <summary>
    /// Starts consuming <paramref name="QueueName"/> with manual acknowledgements.
    /// Each message body is decoded as UTF-8 and handed to <paramref name="CallBack"/>.
    /// The message is acknowledged when the callback returns, and negatively acknowledged
    /// with requeue when it throws.
    /// </summary>
    /// <param name="QueueName">Queue to consume; it is declared via <see cref="Conn"/> if needed.</param>
    /// <param name="CallBack">Handler invoked with the decoded message text.</param>
    /// <exception cref="ArgumentNullException"><paramref name="CallBack"/> is null.</exception>
    public void StartReceive(string QueueName, Action<string> CallBack)
    {
      if (CallBack == null)
      {
        // A null handler would fail on every delivery and requeue the message forever.
        throw new ArgumentNullException("CallBack");
      }

      EnsureConnection();

      // Capture the channel once: delivery tags are only valid on the channel that
      // delivered the message, so acks must not go through a later dictionary lookup.
      IModel channel = GetOpenChannel(QueueName);

      var consumer = new EventingBasicConsumer(channel);
      consumer.Received += (model, ea) =>
      {
        // Decode the received payload.
        string message = Encoding.UTF8.GetString(ea.Body.ToArray());
        try
        {
          CallBack(message);
          // Manual acknowledgement after successful handling.
          channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
        }
        catch (Exception ex)
        {
          // Log first so the failure is recorded even if the nack below fails too.
          Utils.WriteLog(ex.ToString(), "MQ异常");
          try
          {
            // Handling failed: put the message back on the queue.
            channel.BasicNack(deliveryTag: ea.DeliveryTag, multiple: false, requeue: true);
          }
          catch (Exception nackEx)
          {
            // Typically the channel is already closed; nothing more can be done here.
            Utils.WriteLog(nackEx.ToString(), "MQ异常");
          }
        }
      };

      // autoAck = false: messages are acknowledged explicitly in the handler above.
      channel.BasicConsume(queue: QueueName, autoAck: false, consumer: consumer);
    }
    #endregion

    #region One-to-one send
    /// <summary>
    /// Ensures a channel for <paramref name="QueueName"/> exists in <see cref="_channel"/>.
    /// A new channel declares the durable direct exchange and the durable queue, and binds
    /// them with the queue name as routing key. An already cached, open channel is reused.
    /// </summary>
    /// <param name="QueueName">Queue name; also used as the binding routing key.</param>
    public void Conn(string QueueName)
    {
      EnsureConnection();

      IModel cached;
      if (_channel.TryGetValue(QueueName, out cached) && cached != null && cached.IsOpen)
      {
        // Already set up; creating another channel here would only leak it.
        return;
      }

      IModel channel = _connection.CreateModel();
      try
      {
        channel.ExchangeDeclare(ExchangeName, "direct", true);
        channel.QueueDeclare(QueueName, true, false, false);
        channel.QueueBind(QueueName, ExchangeName, QueueName);
      }
      catch
      {
        // Do not leave a half-configured channel open.
        channel.Dispose();
        throw;
      }

      // Replaces a missing or closed entry with the freshly configured channel.
      _channel[QueueName] = channel;
    }

    /// <summary>
    /// Publishes <paramref name="Content"/> as UTF-8 to the default exchange, using
    /// <paramref name="QueueName"/> as routing key. The channel is created via
    /// <see cref="Conn"/> when it is missing or closed.
    /// </summary>
    /// <param name="QueueName">Target queue name.</param>
    /// <param name="Content">Message text.</param>
    public void Push(string QueueName, string Content)
    {
      IModel channel = GetOpenChannel(QueueName);
      // UTF-8 matches the decoding used in StartReceive; Encoding.Default varies by platform.
      channel.BasicPublish("", QueueName, null, Encoding.UTF8.GetBytes(Content));
    }
    #endregion
  }
  111. }