using System;
using System.Collections.Generic;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using Library;
using System.Threading;

namespace MySystem
{
    /// <summary>
    /// Singleton wrapper around the RabbitMQ client used to publish to and consume from
    /// queues and exchanges. Credentials, host list and virtual host are read once from
    /// the application settings (<c>MqUserName</c>, <c>MqPassword</c>, <c>MqHostName</c>,
    /// <c>MqVirtualHostName</c>).
    /// </summary>
    /// <remarks>
    /// NOTE(review): the long-lived static channels (<see cref="_channel_send"/>,
    /// <see cref="_channel_kxs_send"/>, <see cref="_channel_kxs_push"/>,
    /// <see cref="_channel_prize"/>) are published to without any locking in this class;
    /// confirm callers do not publish on the same channel from several threads at once.
    /// </remarks>
    public class RabbitMQClient
    {
        public readonly static RabbitMQClient Instance = new RabbitMQClient();

        // Compile-time constants only: Instance is constructed during static initialization,
        // so nothing the constructor touches may rely on later static field initializers.
        private const string DirectExchange = "kxs_direct_ranch";
        private const string DeadLetterExchange = "kxs_dead_ranch";
        private const string OutboxListKey = "MainServerMq";
        private const string OutboxSeparator = "#cut#";
        private const string ListenerErrorLog = "MQ消息队列单对单发送监听异常";
        private const int IdlePollDelayMs = 50;
        private const int ReconnectDelayMs = 1000;

        private readonly string UserName, Password, HostName, VirtualHostName;

        private RabbitMQClient()
        {
            UserName = ConfigurationManager.AppSettings["MqUserName"].ToString();
            Password = ConfigurationManager.AppSettings["MqPassword"].ToString();
            HostName = ConfigurationManager.AppSettings["MqHostName"].ToString();
            VirtualHostName = ConfigurationManager.AppSettings["MqVirtualHostName"].ToString();
        }

        #region Shared helpers

        /// <summary>
        /// Builds a connection factory with the configured credentials and with automatic
        /// connection and topology recovery enabled.
        /// </summary>
        /// <param name="withVirtualHost">
        /// When true the configured virtual host is applied; when false the factory keeps
        /// the client library's default virtual host.
        /// </param>
        private ConnectionFactory CreateFactory(bool withVirtualHost)
        {
            var factory = new ConnectionFactory()
            {
                UserName = UserName,
                Password = Password,
                AutomaticRecoveryEnabled = true, // reconnect if the connection drops
                TopologyRecoveryEnabled = true,  // re-declare exchanges/queues after recovery
            };
            if (withVirtualHost)
            {
                factory.VirtualHost = VirtualHostName;
            }
            return factory;
        }

        /// <summary>
        /// Parses the configured host list (comma separated <c>host:port</c> entries) into
        /// endpoints. Blank entries are skipped; an entry without a port uses the client's
        /// default port.
        /// </summary>
        private List<AmqpTcpEndpoint> ParseEndpoints()
        {
            var endpoints = new List<AmqpTcpEndpoint>();
            foreach (string entry in HostName.Split(','))
            {
                string hostAndPort = entry.Trim();
                if (hostAndPort.Length == 0)
                {
                    continue;
                }

                string[] parts = hostAndPort.Split(':');
                if (parts.Length > 1)
                {
                    int port = int.Parse(parts[1], System.Globalization.CultureInfo.InvariantCulture);
                    endpoints.Add(new AmqpTcpEndpoint(parts[0], port));
                }
                else
                {
                    endpoints.Add(new AmqpTcpEndpoint(parts[0]));
                }
            }
            return endpoints;
        }

        /// <summary>Opens a new connection to the configured endpoints.</summary>
        private IConnection OpenConnection(bool withVirtualHost)
        {
            return CreateFactory(withVirtualHost).CreateConnection(ParseEndpoints());
        }

        /// <summary>
        /// Opens a dedicated connection on the configured virtual host and returns a channel
        /// on which the durable direct exchange and the durable queue are declared and bound
        /// (routing key = queue name). The connection is disposed if the setup fails.
        /// </summary>
        private IModel OpenBoundChannel(string QueueName)
        {
            var conn = OpenConnection(true);
            try
            {
                var channel = conn.CreateModel();
                channel.ExchangeDeclare(DirectExchange, "direct", true);
                channel.QueueDeclare(QueueName, true, false, false);
                channel.QueueBind(QueueName, DirectExchange, QueueName);
                return channel;
            }
            catch
            {
                conn.Dispose();
                throw;
            }
        }

        /// <summary>Publishes <paramref name="content"/> to a queue through the default exchange.</summary>
        private static void PublishToQueue(IModel channel, string QueueName, string content)
        {
            channel.BasicPublish("", QueueName, null, Encoding.Default.GetBytes(content));
        }

        /// <summary>Writes the timestamp, queue name and payload of a sent message to the named log.</summary>
        private static void LogSend(string QueueName, string content, string logName)
        {
            Utils.WriteLog(DateTime.Now.ToString() + "\n" + QueueName + "\n" + content + "\n\n\n", logName);
        }

        /// <summary>Creates the shared consumer connection if it is missing or no longer open.</summary>
        private void EnsureConnection()
        {
            if (_connection == null || !_connection.IsOpen)
            {
                CreateConn();
            }
        }

        #endregion

        #region One-to-one send

        /// <summary>
        /// Opens a short-lived connection, declares the queue with the client's default
        /// declare options, publishes one message to it and closes everything again.
        /// </summary>
        /// <param name="content">Message text.</param>
        /// <param name="QueueName">Target queue (used as routing key on the default exchange).</param>
        public void SendMsg(string content, string QueueName)
        {
            // using guarantees channel and connection are released even when declare/publish throws.
            using (var conn = OpenConnection(false))
            using (var channel = conn.CreateModel())
            {
                channel.QueueDeclare(QueueName);
                PublishToQueue(channel, QueueName, content);
            }
        }

        /// <summary>
        /// Publishes on the shared <see cref="_channel_send"/> channel and logs the message.
        /// <see cref="CreateConn2"/> must have assigned the channel beforehand.
        /// </summary>
        public void SendMsg2(string content, string QueueName)
        {
            PublishToQueue(_channel_send, QueueName, content);
            LogSend(QueueName, content, "SendMsg2");
        }

        /// <summary>
        /// Publishes on the shared <see cref="_channel_kxs_send"/> channel and logs the message.
        /// <see cref="CreateConn3"/> must have assigned the channel beforehand.
        /// </summary>
        public void SendMsg3(string content, string QueueName)
        {
            PublishToQueue(_channel_kxs_send, QueueName, content);
            LogSend(QueueName, content, "SendMsg3");
        }

        /// <summary>
        /// Publishes on the shared <see cref="_channel_kxs_push"/> channel and logs the message.
        /// <see cref="CreatePushConn"/> must have assigned the channel beforehand.
        /// </summary>
        public void PushPosData(string content, string QueueName)
        {
            PublishToQueue(_channel_kxs_push, QueueName, content);
            LogSend(QueueName, content, "PushPosData");
        }

        #endregion

        #region One-to-one send (Redis-fed background listener)

        /// <summary>Starts <see cref="ListenSendMsgDo"/> on a background thread.</summary>
        public void ListenSendMsg()
        {
            Thread th = new Thread(ListenSendMsgDo);
            th.IsBackground = true;
            th.Start();
        }

        // One channel per queue name; only valid for the connection currently held by ListenSendMsgDo.
        private readonly Dictionary<string, IModel> channels = new Dictionary<string, IModel>();

        /// <summary>
        /// Endless loop that pops <c>queue#cut#payload</c> entries from the Redis list
        /// <c>MainServerMq</c> and publishes each payload to the named durable queue.
        /// On any failure the error is logged, the connection is dropped and a new one is
        /// opened after a short delay.
        /// </summary>
        public void ListenSendMsgDo()
        {
            while (true)
            {
                try
                {
                    using (var conn = OpenConnection(false))
                    {
                        // Channels cached from the previous connection were closed with it;
                        // reusing them would fail every publish for those queues.
                        channels.Clear();
                        ForwardOutbox(conn);
                    }
                }
                catch (Exception ex)
                {
                    Utils.WriteLog(DateTime.Now.ToString() + "\r\n" + ex.ToString(), ListenerErrorLog);
                }

                Utils.WriteLog(DateTime.Now.ToString(), "MQ测试");
                // Back off briefly so an unreachable broker does not cause a hot reconnect loop.
                Thread.Sleep(ReconnectDelayMs);
            }
        }

        /// <summary>
        /// Forwards Redis entries over <paramref name="conn"/> until an exception escapes;
        /// it never returns normally.
        /// </summary>
        private void ForwardOutbox(IConnection conn)
        {
            while (true)
            {
                string data = RedisDbconn.Instance.RPop(OutboxListKey);
                if (string.IsNullOrEmpty(data))
                {
                    // Nothing queued: yield instead of spinning on Redis.
                    Thread.Sleep(IdlePollDelayMs);
                    continue;
                }

                string[] dataList = data.Split(OutboxSeparator);
                if (dataList.Length < 2)
                {
                    // A malformed entry is not a connection problem; log it and keep the connection.
                    Utils.WriteLog(DateTime.Now.ToString() + "\r\n" + "Malformed entry (missing " + OutboxSeparator + "): " + data, ListenerErrorLog);
                    continue;
                }

                string QueueName = dataList[0];
                IModel channel;
                if (!channels.TryGetValue(QueueName, out channel))
                {
                    channel = conn.CreateModel();
                    channels.Add(QueueName, channel);
                }

                // NOTE(review): if declare/publish throws, the popped entry is not put back
                // on the Redis list and is therefore lost — confirm this is acceptable.
                channel.QueueDeclare(QueueName, true, false, false);
                PublishToQueue(channel, QueueName, dataList[1]);
            }
        }

        #endregion

        #region One-to-one receive

        public static IModel _channel_send;

        /// <summary>
        /// Opens a connection and stores in <see cref="_channel_send"/> a channel with the
        /// direct exchange and <paramref name="QueueName"/> declared and bound.
        /// </summary>
        public void CreateConn2(string QueueName)
        {
            _channel_send = OpenBoundChannel(QueueName);
        }

        public static IModel _channel_kxs_send;

        /// <summary>
        /// Opens a connection and stores in <see cref="_channel_kxs_send"/> a channel with the
        /// direct exchange and <paramref name="QueueName"/> declared and bound.
        /// </summary>
        public void CreateConn3(string QueueName)
        {
            _channel_kxs_send = OpenBoundChannel(QueueName);
        }

        public static IModel _channel_kxs_push;

        /// <summary>
        /// Opens a connection and stores in <see cref="_channel_kxs_push"/> a channel with the
        /// direct exchange and <paramref name="QueueName"/> declared and bound.
        /// </summary>
        public void CreatePushConn(string QueueName)
        {
            _channel_kxs_push = OpenBoundChannel(QueueName);
        }

        public static IModel _channel_prize;

        /// <summary>
        /// Opens a connection and stores in <see cref="_channel_prize"/> a channel with the
        /// direct exchange and <paramref name="QueueName"/> declared and bound.
        /// </summary>
        public void CreatePrizeConn(string QueueName)
        {
            _channel_prize = OpenBoundChannel(QueueName);
        }

        /// <summary>
        /// Publishes on the shared <see cref="_channel_prize"/> channel.
        /// <see cref="CreatePrizeConn"/> must have assigned the channel beforehand.
        /// </summary>
        public void SendPrize(string content, string QueueName)
        {
            PublishToQueue(_channel_prize, QueueName, content);
        }

        public static IConnection _connection;

        /// <summary>
        /// Opens the shared consumer connection (<see cref="_connection"/>) on the configured
        /// virtual host with a one-minute requested heartbeat.
        /// </summary>
        public void CreateConn()
        {
            var factory = CreateFactory(true);
            factory.RequestedHeartbeat = TimeSpan.FromMinutes(1);
            _connection = factory.CreateConnection(ParseEndpoints());
        }

        /// <summary>
        /// Binds <paramref name="QueueName"/> to the <c>kxs_dead_ranch</c> exchange and starts a
        /// manual-ack consumer. Messages from <c>QUEUE_KXS_TO_LKB_USER_INFO_DIVISION</c> go to
        /// <c>SycnJavaUsersService</c>, all others to <c>SycnSpBindService</c>; a message is
        /// acknowledged only when its handler returns true.
        /// </summary>
        public void StartReceive(string QueueName)
        {
            EnsureConnection();
            var channel = _connection.CreateModel();
            channel.QueueBind(QueueName, DeadLetterExchange, QueueName);

            EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
            consumer.Received += (a, e) =>
            {
                string MsgContent = Encoding.Default.GetString(e.Body.ToArray());
                Utils.WriteLog(MsgContent, "接收mq数据队列");

                bool handled = QueueName == "QUEUE_KXS_TO_LKB_USER_INFO_DIVISION"
                    ? SycnJavaUsersService.Instance.Add(MsgContent)
                    : SycnSpBindService.Instance.SimDo(MsgContent);
                if (handled)
                {
                    // multiple: false — ack only this delivery, so earlier deliveries whose
                    // handler failed are not acknowledged (and dropped) as a side effect.
                    channel.BasicAck(e.DeliveryTag, false);
                }
            };
            channel.BasicConsume(QueueName, false, consumer);
        }

        /// <summary>
        /// Binds <paramref name="QueueName"/> to the <c>kxs_dead_ranch</c> exchange with routing key
        /// <c>DEAD_QUEUE_GD_ACT_DIVISION</c> and starts a manual-ack consumer. Each message is
        /// appended to the Redis list <c>JavaUrlDataQueue</c>, logged, and acknowledged only when
        /// <c>ApiDataFromJavaService.ApiDataDo</c> returns true.
        /// </summary>
        public void StartReceiveApi(string QueueName)
        {
            EnsureConnection();
            var channel = _connection.CreateModel();
            channel.QueueBind(QueueName, DeadLetterExchange, "DEAD_QUEUE_GD_ACT_DIVISION");

            EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
            consumer.Received += (a, e) =>
            {
                string MsgContent = Encoding.Default.GetString(e.Body.ToArray());
                RedisDbconn.Instance.AddList("JavaUrlDataQueue", MsgContent);
                Utils.WriteLog(MsgContent, "接收mq接口数据队列");
                if (ApiDataFromJavaService.Instance.ApiDataDo(MsgContent))
                {
                    // multiple: false — see StartReceive; only this delivery is acknowledged.
                    channel.BasicAck(e.DeliveryTag, false);
                }
            };
            channel.BasicConsume(QueueName, false, consumer);
        }

        #endregion

        #region One-to-many send

        /// <summary>
        /// Opens a short-lived connection, declares <paramref name="Exchange"/> as a durable
        /// fanout exchange and publishes one message to it with an empty routing key.
        /// </summary>
        public void SendMsgToExchange(string content, string Exchange = "")
        {
            // using guarantees channel and connection are released even when declare/publish throws.
            using (var conn = OpenConnection(false))
            using (var channel = conn.CreateModel())
            {
                channel.ExchangeDeclare(Exchange, ExchangeType.Fanout, true, false);
                channel.BasicPublish(Exchange, "", null, Encoding.Default.GetBytes(content));
            }
        }

        #endregion

        #region One-to-many receive

        /// <summary>
        /// Opens a connection that stays open for the consumer, declares the durable queue and
        /// the durable fanout exchange, binds them, and starts a manual-ack consumer that logs
        /// and acknowledges every message.
        /// </summary>
        public void StartReceiveFromExchange(string QueueName = "", string Exchange = "")
        {
            var conn = OpenConnection(false);
            var channel = conn.CreateModel();

            // Declare the queue.
            channel.QueueDeclare(QueueName, true, false, false);
            // Declare the exchange.
            channel.ExchangeDeclare(Exchange, ExchangeType.Fanout, true, false);
            // Bind the queue to the exchange.
            channel.QueueBind(QueueName, Exchange, "");

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (a, e) =>
            {
                Utils.WriteLog(Encoding.Default.GetString(e.Body.ToArray()), "接收到的MQ消息");
                // Once acknowledged, RabbitMQ removes the message from the queue.
                channel.BasicAck(e.DeliveryTag, false);
            };
            channel.BasicConsume(QueueName, false, consumer);
        }

        #endregion
    }
}