using System;
using System.Collections.Generic;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using Library;
using System.Threading;

namespace MySystem
{
    /// <summary>
    /// Process-wide RabbitMQ helper exposed through <see cref="Instance"/>.
    /// Holds one shared connection and one channel per queue name, and offers
    /// simple point-to-point publish (<see cref="Push"/>) and consume
    /// (<see cref="StartReceive"/>) operations.
    /// </summary>
    /// <remarks>
    /// Connection settings are read from the application settings keys
    /// <c>MqUserName</c>, <c>MqPassword</c>, <c>MqHostName</c> and
    /// <c>MqVirtualHostName</c>. <c>MqHostName</c> is a comma-separated list of
    /// <c>host[:port]</c> entries.
    /// NOTE(review): a replaced connection is not disposed here, because with
    /// automatic recovery enabled it may still recover consumers registered on it.
    /// </remarks>
    public class RabbitMQClient2
    {
        /// <summary>The single shared instance of this client.</summary>
        public static readonly RabbitMQClient2 Instance = new RabbitMQClient2();

        /// <summary>Exchange that every queue is bound to, using the queue name as routing key.</summary>
        private const string ExchangeName = "kxs_direct_ranch";

        private readonly string UserName, Password, HostName, VirtualHostName;

        /// <summary>Guards creation of the connection and mutation of <see cref="_channel"/>.</summary>
        private readonly object _gate = new object();

        private RabbitMQClient2()
        {
            UserName = ConfigurationManager.AppSettings["MqUserName"].ToString();
            Password = ConfigurationManager.AppSettings["MqPassword"].ToString();
            HostName = ConfigurationManager.AppSettings["MqHostName"].ToString();
            VirtualHostName = ConfigurationManager.AppSettings["MqVirtualHostName"].ToString();
        }

        /// <summary>The shared broker connection; <c>null</c> until first use.</summary>
        public static IConnection _connection;

        /// <summary>Channels created by this client, keyed by queue name.</summary>
        public Dictionary<string, IModel> _channel = new Dictionary<string, IModel>();

        /// <summary>
        /// Opens a new connection to one of the configured endpoints and stores it
        /// in <see cref="_connection"/>, replacing any previous value.
        /// </summary>
        /// <exception cref="InvalidOperationException">The configured host list contains no usable entry.</exception>
        /// <exception cref="FormatException">A port in the host list is not a valid integer.</exception>
        public void CreateConn()
        {
            var factory = new ConnectionFactory()
            {
                UserName = UserName,
                Password = Password,
                AutomaticRecoveryEnabled = true, // reconnect automatically if the connection drops
                TopologyRecoveryEnabled = true,  // re-declare exchanges, queues, etc. after a recovery
                VirtualHost = VirtualHostName,
            };

            var endpoints = new List<AmqpTcpEndpoint>();
            foreach (string entry in HostName.Split(','))
            {
                string trimmed = entry.Trim();
                if (trimmed.Length == 0)
                {
                    // Tolerate stray commas such as "a:5672,,b:5672" or a trailing comma.
                    continue;
                }

                string[] parts = trimmed.Split(':');
                string host = parts[0].Trim();
                if (parts.Length > 1)
                {
                    int port = int.Parse(parts[1], System.Globalization.CultureInfo.InvariantCulture);
                    endpoints.Add(new AmqpTcpEndpoint(host, port));
                }
                else
                {
                    // No explicit port: let the client library pick its default.
                    endpoints.Add(new AmqpTcpEndpoint(host));
                }
            }

            if (endpoints.Count == 0)
            {
                throw new InvalidOperationException("MqHostName does not contain any RabbitMQ endpoint.");
            }

            _connection = factory.CreateConnection(endpoints);
        }

        #region Point-to-point receive

        /// <summary>
        /// Starts a consumer on <paramref name="QueueName"/> with manual
        /// acknowledgement. Each message body is decoded as UTF-8 and passed to
        /// <paramref name="CallBack"/>; the message is acked when the callback
        /// returns and nacked with requeue when it throws.
        /// </summary>
        /// <param name="QueueName">Queue to consume from; it is declared and bound if no open channel exists for it yet.</param>
        /// <param name="CallBack">Handler invoked with the decoded message text.</param>
        /// <exception cref="ArgumentNullException"><paramref name="CallBack"/> is <c>null</c>.</exception>
        public void StartReceive(string QueueName, Action<string> CallBack)
        {
            if (CallBack == null)
            {
                throw new ArgumentNullException(nameof(CallBack));
            }

            // Capture the channel once: delivery tags are only valid on the channel
            // that delivered the message, so ack/nack must not go through a later
            // dictionary lookup that could return a different channel.
            IModel channel = EnsureChannel(QueueName);

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                try
                {
                    CallBack(message);
                    // Manual acknowledgement after successful processing.
                    channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                }
                catch (Exception ex)
                {
                    // Log first so the original failure is never lost if the nack also fails.
                    Utils.WriteLog(ex.ToString(), "MQ异常");
                    try
                    {
                        // Processing failed: hand the message back to the queue.
                        channel.BasicNack(deliveryTag: ea.DeliveryTag, multiple: false, requeue: true);
                    }
                    catch (Exception nackEx)
                    {
                        Utils.WriteLog(nackEx.ToString(), "MQ异常");
                    }
                }
            };

            // autoAck = false: messages are acknowledged explicitly in the handler above.
            channel.BasicConsume(queue: QueueName, autoAck: false, consumer: consumer);
        }

        #endregion

        #region Point-to-point send

        /// <summary>
        /// Ensures an open channel exists for <paramref name="QueueName"/>, declaring
        /// the durable exchange and queue and binding them on first use.
        /// </summary>
        /// <param name="QueueName">Queue name, also used as the binding routing key.</param>
        public void Conn(string QueueName)
        {
            EnsureChannel(QueueName);
        }

        /// <summary>
        /// Publishes <paramref name="Content"/> as UTF-8 bytes to the default
        /// exchange with <paramref name="QueueName"/> as routing key.
        /// </summary>
        /// <param name="QueueName">Target queue name.</param>
        /// <param name="Content">Message text.</param>
        public void Push(string QueueName, string Content)
        {
            // UTF-8 matches the decoding used by StartReceive.
            byte[] payload = Encoding.UTF8.GetBytes(Content);
            EnsureChannel(QueueName).BasicPublish("", QueueName, null, payload);
        }

        #endregion

        /// <summary>Creates the shared connection when it is missing or not open.</summary>
        private void EnsureConnection()
        {
            if (_connection == null || !_connection.IsOpen)
            {
                CreateConn();
            }
        }

        /// <summary>
        /// Returns the open channel registered for <paramref name="QueueName"/>,
        /// creating and registering a new one when none exists or the registered
        /// one is closed.
        /// </summary>
        private IModel EnsureChannel(string QueueName)
        {
            lock (_gate)
            {
                EnsureConnection();

                IModel existing;
                if (_channel.TryGetValue(QueueName, out existing) && existing != null && existing.IsOpen)
                {
                    return existing;
                }

                var channel = _connection.CreateModel();
                channel.ExchangeDeclare(ExchangeName, "direct", true);
                channel.QueueDeclare(QueueName, true, false, false);
                channel.QueueBind(QueueName, ExchangeName, QueueName);

                // Replace (rather than skip) a stale entry so later calls do not keep
                // hitting a closed channel. The old channel is deliberately not
                // disposed: a consumer registered on it still holds its own reference.
                _channel[QueueName] = channel;
                return channel;
            }
        }
    }
}