// RabbitMQClient2.cs (3.4 KB)
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Text;
  4. using RabbitMQ.Client;
  5. using RabbitMQ.Client.Events;
  6. using Library;
  7. using System.Threading;
  namespace MySystem
  {
      /// <summary>
      /// Singleton helper around a single RabbitMQ connection that keeps one
      /// channel per queue name and offers simple publish / consume operations.
      /// </summary>
      /// <remarks>
      /// This class performs no locking of its own; callers that use it from
      /// several threads must serialize access themselves.
      /// </remarks>
      public class RabbitMQClient2
      {
          /// <summary>Durable direct exchange that every queue is bound to.</summary>
          private const string ExchangeName = "kxs_direct_ranch";

          /// <summary>The single shared instance of the client.</summary>
          public readonly static RabbitMQClient2 Instance = new RabbitMQClient2();

          // Broker credentials and address, read once from the application settings.
          readonly string UserName, Password, HostName, VirtualHostName;

          /// <summary>
          /// Reads the broker settings (MqUserName, MqPassword, MqHostName,
          /// MqVirtualHostName) from the application configuration.
          /// </summary>
          /// <exception cref="InvalidOperationException">
          /// A required application setting is missing.
          /// </exception>
          private RabbitMQClient2()
          {
              UserName = ReadSetting("MqUserName");
              Password = ReadSetting("MqPassword");
              HostName = ReadSetting("MqHostName");
              VirtualHostName = ReadSetting("MqVirtualHostName");
          }

          /// <summary>
          /// Returns the application setting stored under <paramref name="key"/>,
          /// failing with a message that names the missing key instead of a bare
          /// null-reference error.
          /// </summary>
          private static string ReadSetting(string key)
          {
              var value = ConfigurationManager.AppSettings[key];
              if (value == null)
              {
                  throw new InvalidOperationException("Missing application setting: " + key);
              }
              return value.ToString();
          }

          /// <summary>Connection shared by every channel created by this client.</summary>
          public static IConnection _connection;

          /// <summary>
          /// Opens a new broker connection from the configured settings and stores
          /// it in <see cref="_connection"/>, replacing any previous reference.
          /// </summary>
          public void CreateConn()
          {
              var factory = new ConnectionFactory()
              {
                  HostName = HostName,
                  UserName = UserName,
                  Password = Password,
                  VirtualHost = VirtualHostName
              };
              _connection = factory.CreateConnection();
          }

          /// <summary>Creates the shared connection when it is missing or no longer open.</summary>
          private void EnsureConnection()
          {
              if (_connection == null || !_connection.IsOpen)
              {
                  CreateConn();
              }
          }

          /// <summary>
          /// Returns the open channel registered for <paramref name="QueueName"/>,
          /// declaring it through <see cref="Conn"/> first when necessary.
          /// </summary>
          private IModel GetChannel(string QueueName)
          {
              Conn(QueueName);
              return _channel[QueueName];
          }

          #region One-to-one receive
          /// <summary>
          /// Starts consuming <paramref name="QueueName"/> with manual
          /// acknowledgements. Each message body is decoded as UTF-8 and handed to
          /// <paramref name="CallBack"/>; the message is acked when the callback
          /// returns and nacked with requeue when it throws.
          /// </summary>
          /// <param name="QueueName">Queue to consume; it is declared and bound if needed.</param>
          /// <param name="CallBack">Handler invoked with the decoded message text.</param>
          /// <exception cref="ArgumentNullException"><paramref name="CallBack"/> is null.</exception>
          public void StartReceive(string QueueName, Action<string> CallBack)
          {
              if (CallBack == null)
              {
                  throw new ArgumentNullException("CallBack");
              }

              // Capture the channel once: delivery tags are only valid on the channel
              // that delivered the message, so acks must not go through a dictionary
              // lookup that could return a different channel later.
              var channel = GetChannel(QueueName);
              var consumer = new EventingBasicConsumer(channel);
              consumer.Received += (model, ea) =>
              {
                  // Decode the received payload.
                  var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                  try
                  {
                      CallBack(message);
                      // Manually acknowledge the message.
                      channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                  }
                  catch (Exception ex)
                  {
                      // Processing failed: put the message back on the queue. The log
                      // entry is written in a finally block so it is not lost when the
                      // nack itself fails (for example because the channel is closed).
                      try
                      {
                          channel.BasicNack(deliveryTag: ea.DeliveryTag, multiple: false, requeue: true);
                      }
                      finally
                      {
                          Utils.WriteLog(ex.ToString(), "MQ异常");
                      }
                  }
              };
              // autoAck is false so messages are only removed after an explicit ack.
              channel.BasicConsume(queue: QueueName, autoAck: false, consumer: consumer);
          }
          #endregion

          #region One-to-one send
          /// <summary>Channels created by <see cref="Conn"/>, keyed by queue name.</summary>
          public Dictionary<string, IModel> _channel = new Dictionary<string, IModel>();

          /// <summary>
          /// Ensures an open channel exists for <paramref name="QueueName"/>: declares
          /// the durable direct exchange and the durable queue, binds the queue using
          /// its own name as routing key, and registers the channel in
          /// <see cref="_channel"/>.
          /// </summary>
          /// <param name="QueueName">Queue name, also used as the routing key.</param>
          public void Conn(string QueueName)
          {
              IModel existing;
              if (_channel.TryGetValue(QueueName, out existing) && existing != null && existing.IsOpen)
              {
                  // An open channel is already registered; creating another one
                  // would only leak it.
                  return;
              }

              EnsureConnection();
              var channel = _connection.CreateModel();
              channel.ExchangeDeclare(ExchangeName, "direct", true);
              channel.QueueDeclare(QueueName, true, false, false);
              channel.QueueBind(QueueName, ExchangeName, QueueName);
              // The indexer also replaces a closed channel left over from an
              // earlier connection.
              _channel[QueueName] = channel;
          }

          /// <summary>
          /// Publishes <paramref name="Content"/> as UTF-8 to the default exchange
          /// with <paramref name="QueueName"/> as routing key.
          /// </summary>
          /// <param name="QueueName">Target queue; it is declared and bound if needed.</param>
          /// <param name="Content">Message text to send.</param>
          public void Push(string QueueName, string Content)
          {
              var channel = GetChannel(QueueName);
              // UTF-8 matches the decoding used by StartReceive; Encoding.Default may
              // be a machine-specific ANSI code page and would corrupt non-ASCII text.
              channel.BasicPublish("", QueueName, null, Encoding.UTF8.GetBytes(Content));
          }
          #endregion
      }
  }