| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117 |
- using System;
- using System.Collections.Generic;
- using System.Text;
- using RabbitMQ.Client;
- using RabbitMQ.Client.Events;
- using Library;
- using System.Threading;
- namespace MySystem
- {
- public class RabbitMQClient2
- {
- public readonly static RabbitMQClient2 Instance = new RabbitMQClient2();
- string UserName,Password,HostName,VirtualHostName;
- private RabbitMQClient2()
- {
- UserName = ConfigurationManager.AppSettings["MqUserName"].ToString();
- Password = ConfigurationManager.AppSettings["MqPassword"].ToString();
- HostName = ConfigurationManager.AppSettings["MqHostName"].ToString();
- VirtualHostName = ConfigurationManager.AppSettings["MqVirtualHostName"].ToString();
- }
- public static IConnection _connection;
- public void CreateConn()
- {
- // var factory = new ConnectionFactory()
- // {
- // HostName = HostName,
- // UserName = UserName,
- // Password = Password,
- // // VirtualHost = VirtualHostName
- // };
- // _connection = factory.CreateConnection();
- var factory = new ConnectionFactory()
- {
- UserName = UserName,
- Password = Password,
- AutomaticRecoveryEnabled = true, //如果connection挂掉是否重新连接
- TopologyRecoveryEnabled = true, //连接恢复后,连接的交换机,队列等是否一同恢复
- VirtualHost = VirtualHostName,
- };
- List<AmqpTcpEndpoint> p = new List<AmqpTcpEndpoint>();
- string[] HostNames = HostName.Split(',');
- foreach (string subHostName in HostNames)
- {
- string[] subHostNameData = subHostName.Split(':');
- p.Add(new AmqpTcpEndpoint(subHostNameData[0], int.Parse(subHostNameData[1])));
- }
- _connection = factory.CreateConnection(p);
- }
- #region 单对单接收
- public void StartReceive(string QueueName, Action<string> CallBack)
- {
- if (_connection == null)
- {
- CreateConn();
- }
- else if (!_connection.IsOpen)
- {
- CreateConn();
- }
- var consumer = new EventingBasicConsumer(_channel[QueueName]);
- consumer.Received += (model, ea) =>
- {
- var body = ea.Body.ToArray();
- //获取接收的数据
- var message = Encoding.UTF8.GetString(body);
- // 模拟消息处理逻辑
- try
- {
- CallBack(message);
- // 手动确认消息
- _channel[QueueName].BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
- }
- catch (Exception ex)
- {
- // 如果处理失败,可以选择拒绝消息或重新入队
- _channel[QueueName].BasicNack(deliveryTag: ea.DeliveryTag, multiple: false, requeue: true);
- Utils.WriteLog(ex.ToString(), "MQ异常");
- }
- };
- // 设置 autoAck = false
- _channel[QueueName].BasicConsume(queue: QueueName, autoAck: false, consumer: consumer);
- }
- #endregion
- #region 单对单发送
- public Dictionary<string, IModel> _channel = new Dictionary<string, IModel>();
- public void Conn(string QueueName)
- {
- if (_connection == null)
- {
- CreateConn();
- }
- else if (!_connection.IsOpen)
- {
- CreateConn();
- }
- var channel = _connection.CreateModel();
- channel.ExchangeDeclare("kxs_direct_ranch", "direct", true);
- channel.QueueDeclare(QueueName, true, false, false);
- channel.QueueBind(QueueName, "kxs_direct_ranch", QueueName);
- if(!_channel.ContainsKey(QueueName)) _channel.Add(QueueName, channel);
- }
- public void Push(string QueueName, string Content)
- {
- _channel[QueueName].BasicPublish("", QueueName, null, Encoding.Default.GetBytes(Content));
- }
- #endregion
- }
- }
|