RabbitMQClient.cs 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Text;
  4. using RabbitMQ.Client;
  5. using RabbitMQ.Client.Events;
  6. using Infrastructure;
  7. using Infrastructure.Model;
  8. namespace Common
  9. {
  /// <summary>
  /// Process-wide RabbitMQ helper exposed through <see cref="Instance"/>.
  /// It owns one shared connection and one channel per queue name, and offers
  /// point-to-point publish (<see cref="Push"/>) and consume
  /// (<see cref="StartReceive"/>) operations.
  /// </summary>
  /// <remarks>
  /// Connection and channel bookkeeping is guarded by a private lock. Publishing
  /// itself is not serialised; NOTE(review): the RabbitMQ client documentation
  /// advises against using one channel from several threads at once, so callers
  /// that publish to the same queue concurrently should confirm this is acceptable.
  /// </remarks>
  public class RabbitMQClient
  {
      /// <summary>Durable direct exchange every queue is bound to (routing key = queue name).</summary>
      private const string ExchangeName = "hbc_direct_ranch";

      /// <summary>The single shared instance; the constructor is private.</summary>
      public static readonly RabbitMQClient Instance = new RabbitMQClient();

      /// <summary>Guards <see cref="_connection"/> replacement and <see cref="_channel"/> mutation.</summary>
      private readonly object _sync = new object();

      private readonly string _userName;
      private readonly string _password;
      private readonly string _hostName;
      private readonly string _virtualHostName;

      /// <summary>Shared broker connection; null until the first connect.</summary>
      public static IConnection _connection;

      /// <summary>Channels keyed by queue name. Populated by <see cref="Conn"/>, <see cref="Push"/> and <see cref="StartReceive"/>.</summary>
      public Dictionary<string, IModel> _channel = new Dictionary<string, IModel>();

      /// <summary>Reads the broker credentials and address from <c>App.OptionsSetting.RabbitMqConfigs</c>.</summary>
      private RabbitMQClient()
      {
          RabbitMqConfigs config = App.OptionsSetting.RabbitMqConfigs;
          _userName = config.UserName;
          _password = config.Password;
          _hostName = config.HostName;
          _virtualHostName = config.VirtualHostName;
      }

      /// <summary>
      /// Opens a new broker connection and stores it in <see cref="_connection"/>.
      /// </summary>
      /// <remarks>
      /// A previous connection that is already closed is disposed. A previous
      /// connection that is still open is left alone, because channels created
      /// on it may still be in use by running consumers.
      /// </remarks>
      public void CreateConn()
      {
          var factory = new ConnectionFactory
          {
              HostName = _hostName,
              UserName = _userName,
              Password = _password,
              VirtualHost = _virtualHostName
          };

          lock (_sync)
          {
              IConnection previous = _connection;
              _connection = factory.CreateConnection();

              if (previous != null && !previous.IsOpen)
              {
                  DisposeQuietly(previous);
              }
          }
      }

      #region One-to-one receive

      /// <summary>
      /// Starts consuming <paramref name="QueueName"/> with manual acknowledgement.
      /// Each message body is decoded as UTF-8 and handed to <paramref name="CallBack"/>.
      /// </summary>
      /// <param name="QueueName">Queue to consume; its channel is created on demand.</param>
      /// <param name="CallBack">Handler invoked with the decoded message text.</param>
      /// <exception cref="ArgumentNullException"><paramref name="CallBack"/> is null.</exception>
      /// <exception cref="ArgumentException"><paramref name="QueueName"/> is null or empty.</exception>
      /// <remarks>
      /// When the handler (or the ack) throws, the exception is logged and the
      /// message is nacked with requeue, so a message that always fails will be
      /// redelivered repeatedly.
      /// </remarks>
      public void StartReceive(string QueueName, Action<string> CallBack)
      {
          if (CallBack == null)
          {
              throw new ArgumentNullException(nameof(CallBack));
          }

          // Capture the channel once: delivery tags are only meaningful on the
          // channel that delivered the message, so ack/nack must use this one
          // even if the dictionary entry is replaced later.
          IModel channel = EnsureChannel(QueueName);

          var consumer = new EventingBasicConsumer(channel);
          consumer.Received += (model, ea) =>
          {
              string message = Encoding.UTF8.GetString(ea.Body.ToArray());
              try
              {
                  CallBack(message);
                  // Manual acknowledgement after successful processing.
                  channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
              }
              catch (Exception ex)
              {
                  // Log first so the original failure is never lost if the nack fails too.
                  Function.WriteLog(ex.ToString(), "MQ异常");
                  try
                  {
                      // Processing failed: put the message back on the queue.
                      channel.BasicNack(deliveryTag: ea.DeliveryTag, multiple: false, requeue: true);
                  }
                  catch (Exception nackEx)
                  {
                      // E.g. the channel is already closed; do not let this escape the handler.
                      Function.WriteLog(nackEx.ToString(), "MQ异常");
                  }
              }
          };

          // autoAck = false: messages are acknowledged explicitly in the handler above.
          channel.BasicConsume(queue: QueueName, autoAck: false, consumer: consumer);
      }

      #endregion

      #region One-to-one send

      /// <summary>
      /// Ensures an open channel exists for <paramref name="QueueName"/>, declaring the
      /// durable exchange, the durable queue and their binding when a channel is created.
      /// Calling it again while the channel is open does nothing.
      /// </summary>
      /// <param name="QueueName">Queue name, also used as the binding routing key.</param>
      /// <exception cref="ArgumentException"><paramref name="QueueName"/> is null or empty.</exception>
      public void Conn(string QueueName)
      {
          EnsureChannel(QueueName);
      }

      /// <summary>
      /// Publishes <paramref name="Content"/> as UTF-8 bytes to <paramref name="QueueName"/>.
      /// </summary>
      /// <param name="QueueName">Target queue; its channel is created on demand.</param>
      /// <param name="Content">Message text.</param>
      /// <exception cref="ArgumentNullException"><paramref name="Content"/> is null.</exception>
      /// <exception cref="ArgumentException"><paramref name="QueueName"/> is null or empty.</exception>
      public void Push(string QueueName, string Content)
      {
          if (Content == null)
          {
              throw new ArgumentNullException(nameof(Content));
          }

          IModel channel = EnsureChannel(QueueName);

          // UTF-8 matches the decoding in StartReceive. Encoding.Default is
          // platform-dependent and can be an ANSI code page on .NET Framework.
          // The message goes through the default ("") exchange with the queue
          // name as routing key, exactly as before.
          channel.BasicPublish("", QueueName, null, Encoding.UTF8.GetBytes(Content));
      }

      #endregion

      /// <summary>
      /// Returns the open channel registered for <paramref name="queueName"/>, creating
      /// (and registering) a new one when none exists or the registered one is closed.
      /// </summary>
      private IModel EnsureChannel(string queueName)
      {
          if (string.IsNullOrEmpty(queueName))
          {
              throw new ArgumentException("Queue name must not be null or empty.", nameof(queueName));
          }

          lock (_sync)
          {
              IModel existing;
              if (_channel.TryGetValue(queueName, out existing) && existing != null && existing.IsOpen)
              {
                  return existing;
              }

              if (_connection == null || !_connection.IsOpen)
              {
                  CreateConn(); // Re-entrant: the same thread already holds _sync.
              }

              IModel channel = _connection.CreateModel();
              try
              {
                  channel.ExchangeDeclare(ExchangeName, "direct", true);   // durable exchange
                  channel.QueueDeclare(queueName, true, false, false);     // durable, non-exclusive, no auto-delete
                  channel.QueueBind(queueName, ExchangeName, queueName);
              }
              catch
              {
                  // Do not leak a half-configured channel.
                  DisposeQuietly(channel);
                  throw;
              }

              _channel[queueName] = channel;

              if (existing != null)
              {
                  // The stale (closed) channel has been replaced; release it.
                  DisposeQuietly(existing);
              }

              return channel;
          }
      }

      /// <summary>Disposes <paramref name="resource"/>, logging instead of propagating any failure.</summary>
      private static void DisposeQuietly(IDisposable resource)
      {
          try
          {
              resource.Dispose();
          }
          catch (Exception ex)
          {
              Function.WriteLog(ex.ToString(), "MQ异常");
          }
      }
  }
  94. }