||
- using System;
- using System.Collections.Generic;
- using System.Text;
- using RabbitMQ.Client;
- using RabbitMQ.Client.Events;
- using Library;
- using System.Threading;
- namespace MySystem
- {
/// <summary>
/// Singleton wrapper around the RabbitMQ .NET client. It offers one-shot publishing,
/// publishing over long-lived shared channels, a Redis-fed background publisher, and
/// queue consumers that hand message bodies to project services.
/// </summary>
/// <remarks>
/// Broker credentials, the host list and the virtual host are read once, in the private
/// constructor, from the <c>MqUserName</c>, <c>MqPassword</c>, <c>MqHostName</c> and
/// <c>MqVirtualHostName</c> application settings.
/// NOTE(review): the RabbitMQ client documents <see cref="IModel"/> as unsafe for concurrent
/// use; the shared static channels below are not synchronized here - confirm that callers
/// serialize access.
/// </remarks>
public class RabbitMQClient
{
    /// <summary>Process-wide shared instance.</summary>
    public readonly static RabbitMQClient Instance = new RabbitMQClient();

    // Pause between Redis polls while the "MainServerMq" list is empty, so the
    // background publisher does not spin on Redis.
    private const int IdlePollDelayMs = 100;

    // Pause before the background publisher opens a new connection after a failure.
    private const int ReconnectDelayMs = 1000;

    // Exchange used by the shared publishing channels.
    private const string DirectExchange = "kxs_direct_ranch";

    // Exchange the consumers bind their queues to.
    private const string DeadExchange = "kxs_dead_ranch";

    string UserName, Password, HostName, VirtualHostName;

    private RabbitMQClient()
    {
        UserName = ConfigurationManager.AppSettings["MqUserName"].ToString();
        Password = ConfigurationManager.AppSettings["MqPassword"].ToString();
        HostName = ConfigurationManager.AppSettings["MqHostName"].ToString();
        VirtualHostName = ConfigurationManager.AppSettings["MqVirtualHostName"].ToString();
    }

    #region Shared connection helpers

    /// <summary>
    /// Builds a connection factory with the configured credentials and with automatic
    /// connection and topology recovery enabled.
    /// </summary>
    /// <param name="useVirtualHost">
    /// When true the configured virtual host is applied; otherwise the client's default is kept.
    /// </param>
    private ConnectionFactory CreateFactory(bool useVirtualHost)
    {
        var factory = new ConnectionFactory()
        {
            UserName = UserName,
            Password = Password,
            AutomaticRecoveryEnabled = true, // reconnect automatically if the connection drops
            TopologyRecoveryEnabled = true,  // re-declare exchanges/queues/bindings after recovery
        };
        if (useVirtualHost)
        {
            factory.VirtualHost = VirtualHostName;
        }
        return factory;
    }

    /// <summary>
    /// Parses the configured host list ("host:port,host:port,...") into endpoints.
    /// Entries are trimmed, empty entries are skipped, and an entry without a port
    /// falls back to the client's default port.
    /// </summary>
    /// <exception cref="InvalidOperationException">The host list contains no usable entry.</exception>
    private List<AmqpTcpEndpoint> ParseEndpoints()
    {
        var endpoints = new List<AmqpTcpEndpoint>();
        foreach (string rawEntry in HostName.Split(','))
        {
            string entry = rawEntry.Trim();
            if (entry.Length == 0)
            {
                continue;
            }
            string[] hostAndPort = entry.Split(':');
            int port = hostAndPort.Length > 1
                ? int.Parse(hostAndPort[1])
                : AmqpTcpEndpoint.UseDefaultPort;
            endpoints.Add(new AmqpTcpEndpoint(hostAndPort[0].Trim(), port));
        }
        if (endpoints.Count == 0)
        {
            throw new InvalidOperationException("MqHostName does not contain any host entries.");
        }
        return endpoints;
    }

    /// <summary>
    /// Opens a new connection on the configured virtual host, creates a channel, declares the
    /// durable direct exchange and the durable queue, and binds the queue to the exchange using
    /// its own name as routing key.
    /// </summary>
    /// <returns>The ready-to-use channel. Its connection stays open for the channel's lifetime.</returns>
    private IModel OpenDirectChannel(string QueueName)
    {
        var conn = CreateFactory(true).CreateConnection(ParseEndpoints());
        try
        {
            var channel = conn.CreateModel();
            channel.ExchangeDeclare(DirectExchange, "direct", true);
            channel.QueueDeclare(QueueName, true, false, false);
            channel.QueueBind(QueueName, DirectExchange, QueueName);
            return channel;
        }
        catch
        {
            // Do not leak the connection when the topology could not be declared.
            conn.Dispose();
            throw;
        }
    }

    /// <summary>
    /// Publishes <paramref name="content"/> through the default exchange to
    /// <paramref name="QueueName"/> on the given channel, then writes a log entry.
    /// </summary>
    private static void PublishAndLog(IModel channel, string content, string QueueName, string logName)
    {
        channel.BasicPublish("", QueueName, null, Encoding.Default.GetBytes(content));
        function.WriteLog(DateTime.Now.ToString() + "\n" + QueueName + "\n" + content + "\n\n\n", logName);
    }

    /// <summary>Creates the shared consumer connection if it is missing or not open.</summary>
    private void EnsureConnection()
    {
        if (_connection == null || !_connection.IsOpen)
        {
            CreateConn();
        }
    }

    #endregion

    #region Point-to-point send

    /// <summary>
    /// Opens a temporary connection, declares <paramref name="QueueName"/> with the client's
    /// default queue options, publishes <paramref name="content"/> to it through the default
    /// exchange and closes the channel and connection again.
    /// </summary>
    /// <param name="content">Message text; encoded with <see cref="Encoding.Default"/>.</param>
    /// <param name="QueueName">Target queue, used as routing key.</param>
    public void SendMsg(string content, string QueueName)
    {
        // using guarantees the channel and the connection are released even if
        // declaring or publishing throws.
        using (var conn = CreateFactory(false).CreateConnection(ParseEndpoints()))
        using (var channel = conn.CreateModel())
        {
            channel.QueueDeclare(QueueName);
            channel.BasicPublish("", QueueName, null, Encoding.Default.GetBytes(content));
        }
    }

    /// <summary>
    /// Publishes to <paramref name="QueueName"/> over the shared <see cref="_channel_send"/>
    /// channel and logs the message. <see cref="CreateConn2"/> must have been called first.
    /// </summary>
    public void SendMsg2(string content, string QueueName)
    {
        PublishAndLog(_channel_send, content, QueueName, "SendMsg2");
    }

    /// <summary>
    /// Publishes to <paramref name="QueueName"/> over the shared <see cref="_channel_kxs_send"/>
    /// channel and logs the message. <see cref="CreateConn3"/> must have been called first.
    /// </summary>
    public void SendMsg3(string content, string QueueName)
    {
        PublishAndLog(_channel_kxs_send, content, QueueName, "SendMsg3");
    }

    /// <summary>
    /// Publishes to <paramref name="QueueName"/> over the shared <see cref="_channel_kxs_push"/>
    /// channel and logs the message. <see cref="CreatePushConn"/> must have been called first.
    /// </summary>
    public void PushPosData(string content, string QueueName)
    {
        PublishAndLog(_channel_kxs_push, content, QueueName, "PushPosData");
    }

    #endregion

    #region Point-to-point send (Redis-fed background publisher)

    /// <summary>Starts <see cref="ListenSendMsgDo"/> on a background thread.</summary>
    public void ListenSendMsg()
    {
        Thread th = new Thread(ListenSendMsgDo);
        th.IsBackground = true;
        th.Start();
    }

    /// <summary>
    /// Runs forever: pops entries of the form <c>queueName#cut#body</c> from the Redis list
    /// "MainServerMq" and publishes each body to its durable queue. One channel per queue
    /// name is cached for the lifetime of the current connection. Any failure is logged,
    /// the connection is dropped and a new one is opened after a short delay.
    /// </summary>
    /// <remarks>
    /// NOTE(review): an entry whose publish fails has already been removed from Redis and is
    /// not re-queued here - confirm whether that loss is acceptable.
    /// </remarks>
    public void ListenSendMsgDo()
    {
        while (true)
        {
            IConnection conn;
            try
            {
                conn = CreateFactory(false).CreateConnection(ParseEndpoints());
            }
            catch (Exception ex)
            {
                // An unreachable broker must not terminate the publisher thread.
                function.WriteLog(DateTime.Now.ToString() + "\r\n" + ex.ToString(), "MQ消息队列单对单发送监听异常");
                Thread.Sleep(ReconnectDelayMs);
                continue;
            }

            // The channel cache is scoped to this connection: channels of a disposed
            // connection are unusable and must never be reused after a reconnect.
            var channelsByQueue = new Dictionary<string, IModel>();
            bool op = true;
            while (op)
            {
                try
                {
                    string data = RedisDbconn.Instance.RPop<string>("MainServerMq");
                    if (string.IsNullOrEmpty(data))
                    {
                        Thread.Sleep(IdlePollDelayMs);
                        continue;
                    }

                    string[] dataList = data.Split("#cut#");
                    if (dataList.Length < 2)
                    {
                        // A malformed entry is no reason to tear down a healthy connection.
                        function.WriteLog(DateTime.Now.ToString() + "\r\n" + "Malformed payload (missing #cut# separator): " + data, "MQ消息队列单对单发送监听异常");
                        continue;
                    }

                    string QueueName = dataList[0];
                    IModel channel;
                    if (!channelsByQueue.TryGetValue(QueueName, out channel))
                    {
                        channel = conn.CreateModel();
                        channelsByQueue.Add(QueueName, channel);
                    }
                    channel.QueueDeclare(QueueName, true, false, false);
                    channel.BasicPublish("", QueueName, null, Encoding.Default.GetBytes(dataList[1]));
                }
                catch (Exception ex)
                {
                    op = false;
                    function.WriteLog(DateTime.Now.ToString() + "\r\n" + ex.ToString(), "MQ消息队列单对单发送监听异常");
                }
            }

            // Disposing the connection also closes every channel created from it.
            conn.Dispose();
            function.WriteLog(DateTime.Now.ToString(), "MQ测试");
            Thread.Sleep(ReconnectDelayMs);
        }
    }

    #endregion

    #region Point-to-point receive

    /// <summary>Shared channel used by <see cref="SendMsg2"/>; set by <see cref="CreateConn2"/>.</summary>
    public static IModel _channel_send;

    /// <summary>
    /// Opens a new connection and channel, declares the direct exchange and the durable queue
    /// <paramref name="QueueName"/>, binds them, and stores the channel in
    /// <see cref="_channel_send"/>. A previously stored channel is not closed.
    /// </summary>
    public void CreateConn2(string QueueName)
    {
        _channel_send = OpenDirectChannel(QueueName);
    }

    /// <summary>Shared channel used by <see cref="SendMsg3"/>; set by <see cref="CreateConn3"/>.</summary>
    public static IModel _channel_kxs_send;

    /// <summary>
    /// Opens a new connection and channel, declares the direct exchange and the durable queue
    /// <paramref name="QueueName"/>, binds them, and stores the channel in
    /// <see cref="_channel_kxs_send"/>. A previously stored channel is not closed.
    /// </summary>
    public void CreateConn3(string QueueName)
    {
        _channel_kxs_send = OpenDirectChannel(QueueName);
    }

    /// <summary>Shared channel used by <see cref="PushPosData"/>; set by <see cref="CreatePushConn"/>.</summary>
    public static IModel _channel_kxs_push;

    /// <summary>
    /// Opens a new connection and channel, declares the direct exchange and the durable queue
    /// <paramref name="QueueName"/>, binds them, and stores the channel in
    /// <see cref="_channel_kxs_push"/>. A previously stored channel is not closed.
    /// </summary>
    public void CreatePushConn(string QueueName)
    {
        _channel_kxs_push = OpenDirectChannel(QueueName);
    }

    /// <summary>Connection shared by the consumers started through <see cref="StartReceive"/> and <see cref="StartReceiveApi"/>.</summary>
    public static IConnection _connection;

    /// <summary>
    /// Opens a connection on the configured virtual host with a one-minute requested
    /// heartbeat and stores it in <see cref="_connection"/>.
    /// </summary>
    public void CreateConn()
    {
        var factory = CreateFactory(true);
        factory.RequestedHeartbeat = TimeSpan.FromMinutes(1);
        _connection = factory.CreateConnection(ParseEndpoints());
    }

    /// <summary>
    /// Binds <paramref name="QueueName"/> to the "kxs_dead_ranch" exchange (routing key = queue
    /// name) and starts a manual-ack consumer on it. Bodies from
    /// "QUEUE_KXS_TO_LKB_USER_INFO_DIVISION" go to <c>SycnJavaUsersService</c>, all others to
    /// <c>SycnSpBindService</c>. A delivery is acknowledged only when its handler returns true.
    /// </summary>
    public void StartReceive(string QueueName)
    {
        EnsureConnection();
        var channel = _connection.CreateModel();
        channel.QueueBind(QueueName, DeadExchange, QueueName);
        EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
        consumer.Received += (a, e) =>
        {
            string MsgContent = Encoding.Default.GetString(e.Body.ToArray());
            function.WriteLog(MsgContent, "接收mq数据队列");
            bool handled = QueueName == "QUEUE_KXS_TO_LKB_USER_INFO_DIVISION"
                ? SycnJavaUsersService.Instance.Add(MsgContent)
                : SycnSpBindService.Instance.SimDo(MsgContent);
            if (handled)
            {
                // Acknowledge this delivery only (multiple: false). A cumulative ack would
                // also confirm earlier deliveries that were left unacked because their
                // handler reported failure.
                channel.BasicAck(e.DeliveryTag, false);
            }
        };
        channel.BasicConsume(QueueName, false, consumer);
    }

    /// <summary>
    /// Binds <paramref name="QueueName"/> to the "kxs_dead_ranch" exchange with routing key
    /// "DEAD_QUEUE_GD_ACT_DIVISION" and starts a manual-ack consumer. Each body is appended to
    /// the Redis list "JavaUrlDataQueue", logged, and passed to <c>ApiDataFromJavaService</c>;
    /// the delivery is acknowledged only when that call returns true.
    /// </summary>
    public void StartReceiveApi(string QueueName)
    {
        EnsureConnection();
        var channel = _connection.CreateModel();
        channel.QueueBind(QueueName, DeadExchange, "DEAD_QUEUE_GD_ACT_DIVISION");
        EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
        consumer.Received += (a, e) =>
        {
            string MsgContent = Encoding.Default.GetString(e.Body.ToArray());
            RedisDbconn.Instance.AddList("JavaUrlDataQueue", MsgContent);
            function.WriteLog(MsgContent, "接收mq接口数据队列");
            if (ApiDataFromJavaService.Instance.ApiDataDo(MsgContent))
            {
                // Single-delivery ack so that failed earlier deliveries stay unacknowledged.
                channel.BasicAck(e.DeliveryTag, false);
            }
        };
        channel.BasicConsume(QueueName, false, consumer);
    }

    #endregion

    #region Fan-out send

    /// <summary>
    /// Opens a temporary connection, declares <paramref name="Exchange"/> as a durable fanout
    /// exchange, publishes <paramref name="content"/> to it and closes the connection.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the default value "" names the broker's default exchange, which brokers
    /// normally refuse to (re)declare - confirm callers always pass an explicit exchange name.
    /// </remarks>
    public void SendMsgToExchange(string content, string Exchange = "")
    {
        using (var conn = CreateFactory(false).CreateConnection(ParseEndpoints()))
        using (var channel = conn.CreateModel())
        {
            channel.ExchangeDeclare(Exchange, ExchangeType.Fanout, true, false);
            channel.BasicPublish(Exchange, "", null, Encoding.Default.GetBytes(content));
        }
    }

    #endregion

    #region Fan-out receive

    /// <summary>
    /// Opens a dedicated connection, declares the durable queue and the durable fanout exchange,
    /// binds them, and starts a manual-ack consumer that logs every body and acknowledges it.
    /// The connection stays open so the consumer keeps running.
    /// </summary>
    public void StartReceiveFromExchange(string QueueName = "", string Exchange = "")
    {
        var conn = CreateFactory(false).CreateConnection(ParseEndpoints());
        try
        {
            var channel = conn.CreateModel();
            // Declare the queue.
            channel.QueueDeclare(QueueName, true, false, false);
            // Declare the exchange.
            channel.ExchangeDeclare(Exchange, ExchangeType.Fanout, true, false);
            // Bind the queue to the exchange.
            channel.QueueBind(QueueName, Exchange, "");
            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (a, e) =>
            {
                function.WriteLog(Encoding.Default.GetString(e.Body.ToArray()), "接收到的MQ消息");
                // Every delivery is acknowledged right after it has been logged.
                channel.BasicAck(e.DeliveryTag, false);
            };
            channel.BasicConsume(QueueName, false, consumer);
        }
        catch
        {
            // Setup failed: release the connection instead of leaking it.
            conn.Dispose();
            throw;
        }
    }

    #endregion
}
- }
|