python操作rabbitmq

RabbitMQ是一个在AMQP基础上实现的完整的、可复用的企业消息系统。MQ全称为Message Queue,消息队列(MQ)是一种应用程序对应用程序的通信方法。安装模块:pip install pika。下面先基于Queue实现生产者消费者模型:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import Queue
import threading
# Shared bounded buffer: holds at most 10 items, so put() blocks while it is full.
message = Queue.Queue(10)


def producer(i):
    """Put the value ``i`` on the shared ``message`` queue forever.

    Args:
        i: Value enqueued on every iteration (the launching loop passes the
            thread's index).

    Never returns; ``message.put`` blocks whenever the queue is full.
    """
    while True:
        message.put(i)

def consumer(i):
    """Take items off the shared ``message`` queue forever.

    Args:
        i: Index of the consumer thread; not used by the loop itself.

    Never returns; ``message.get`` blocks whenever the queue is empty.
    """
    while True:
        # The item is fetched and then dropped; real processing would go here.
        msg = message.get()

# Start 12 producer threads; each one is handed its own index.
for i in range(12):
    t = threading.Thread(args=(i,), target=producer)
    t.start()

# Start 10 consumer threads; each one is handed its own index.
for i in range(10):
    t = threading.Thread(args=(i,), target=consumer)
    t.start()

对于RabbitMQ来说,生产和消费不再针对内存里的一个Queue对象,而是某台服务器上的RabbitMQ Server实现的消息队列

#!/usr/bin/env python
import pika

# ######################### Producer #########################
# Open a blocking connection to the RabbitMQ server on this machine.
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# Declare the queue before publishing to it.
channel.queue_declare(queue='hello')

# Publish through the default exchange (''), which routes by queue name.
channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!')
print(" [x] Sent 'Hello World!'")

connection.close()
#!/usr/bin/env python

import pika
# ########################## Consumer ##########################
# Open a blocking connection to the local broker, then declare the queue
# this script reads from.
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost')
)
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
    """Print the body of a received message.

    Args:
        ch: Not used by this handler.
        method: Not used by this handler.
        properties: Not used by this handler.
        body: Message payload; printed with ``%r``.
    """
    # Wrap body in a 1-tuple so a tuple-valued body cannot break the formatting.
    print(" [x] Received %r" % (body,))
# Register `callback` as the handler for the 'hello' queue.
# NOTE(review): no_ack=True presumably turns off message acknowledgements;
# confirm against the installed pika version.
channel.basic_consume(callback, queue='hello', no_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

发布订阅:发布者

#!/usr/bin/env python
import pika
import sys

# Publisher: send one message to the 'logs' fanout exchange.
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# NOTE(review): newer pika releases appear to name this argument
# `exchange_type`; confirm against the installed version.
channel.exchange_declare(exchange='logs',
                         type='fanout')

# The message text is taken from the command line, with a fixed fallback.
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)
print(" [x] Sent %r" % (message,))
connection.close()

订阅者

#!/usr/bin/env python
import pika

# Subscriber: connect, declare the 'logs' fanout exchange, and bind a fresh
# exclusive queue to it.
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost')
)
channel = connection.channel()

channel.exchange_declare(exchange='logs', type='fanout')

# No queue name is passed; the name is read back from the declare result.
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange='logs', queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    """Print the body of a message received from the bound queue.

    Args:
        ch: Not used by this handler.
        method: Not used by this handler.
        properties: Not used by this handler.
        body: Message payload; printed with ``%r``.
    """
    # Wrap body in a 1-tuple so a tuple-valued body cannot break the formatting.
    print(" [x] %r" % (body,))

# Register `callback` as the handler for the bound queue, then start consuming.
# NOTE(review): no_ack=True presumably turns off message acknowledgements;
# confirm against the installed pika version.
channel.basic_consume(callback, queue=queue_name, no_ack=True)

channel.start_consuming()

除了在发送消息时明确指定某个队列并向其中发送消息之外,RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到exchange,exchange根据关键字判定应该将数据发送至哪个队列。

#!/usr/bin/env python
import pika
import sys

# Consumer: bind an exclusive queue to 'direct_logs' for each severity given
# on the command line.
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# NOTE(review): newer pika releases appear to name this argument
# `exchange_type`; confirm against the installed version.
channel.exchange_declare(exchange='direct_logs',
                         type='direct')

# No queue name is passed; the name is read back from the declare result.
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

# At least one severity is required; otherwise print usage and exit with 1.
severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

# One binding per severity, using the severity as the routing key.
for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    """Print the routing key and body of a received message.

    Args:
        ch: Not used by this handler.
        method: Object whose ``routing_key`` attribute is printed.
        properties: Not used by this handler.
        body: Message payload; printed with ``%r``.
    """
    print(" [x] %r:%r" % (method.routing_key, body))

# Register `callback` as the handler for the bound queue, then start consuming.
# NOTE(review): no_ack=True presumably turns off message acknowledgements;
# confirm against the installed pika version.
channel.basic_consume(callback, queue=queue_name, no_ack=True)

channel.start_consuming()
以上代码为消费者。
#!/usr/bin/env python
import pika
import sys

# Producer: publish one message to 'direct_logs' under a severity routing key.
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# NOTE(review): newer pika releases appear to name this argument
# `exchange_type`; confirm against the installed version.
channel.exchange_declare(exchange='direct_logs',
                         type='direct')

# First argument is the severity (default 'info'); the rest form the message.
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()

以上代码为生产者。

在topic类型下,可以让队列绑定几个模糊的关键字,之后发送者将数据发送到exchange,exchange将传入的“路由值”和“关键字”进行匹配,匹配成功,则将数据发送到指定队列。
- `#` 表示可以匹配 0 个或多个单词
- `*` 表示只能匹配一个单词

#!/usr/bin/env python
import pika
import sys

# Consumer: bind an exclusive queue to 'topic_logs' for each binding key given
# on the command line.
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# NOTE(review): newer pika releases appear to name this argument
# `exchange_type`; confirm against the installed version.
channel.exchange_declare(exchange='topic_logs',
                         type='topic')

# No queue name is passed; the name is read back from the declare result.
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

# At least one binding key is required; otherwise print usage and exit with 1.
binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
    sys.exit(1)

# One binding per key, using the key as the routing key of the binding.
for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs',
                       queue=queue_name,
                       routing_key=binding_key)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    """Print the routing key and body of a received message.

    Args:
        ch: Not used by this handler.
        method: Object whose ``routing_key`` attribute is printed.
        properties: Not used by this handler.
        body: Message payload; printed with ``%r``.
    """
    print(" [x] %r:%r" % (method.routing_key, body))

# Register `callback` as the handler for the bound queue, then start consuming.
# NOTE(review): no_ack=True presumably turns off message acknowledgements;
# confirm against the installed pika version.
channel.basic_consume(callback, queue=queue_name, no_ack=True)

channel.start_consuming()

以上代码为消费者。
#!/usr/bin/env python
import pika
import sys

# Producer: publish one message to 'topic_logs' under the given routing key.
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# NOTE(review): newer pika releases appear to name this argument
# `exchange_type`; confirm against the installed version.
channel.exchange_declare(exchange='topic_logs',
                         type='topic')

# First argument is the routing key (default 'anonymous.info'); the rest form
# the message.
routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
                      routing_key=routing_key,
                      body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()

以上代码为生产者。

设置密码

# `username` and `password` are placeholders; supply the account's values.
credentials = pika.PlainCredentials(username, password)

connection = pika.BlockingConnection(
    pika.ConnectionParameters('192.168.14.47', credentials=credentials))
# Timeout: after 5 seconds, stop consuming on `channel` (a channel opened on
# this connection, as in the earlier examples).
# NOTE(review): the original called this on an undefined name `conn`; the
# connection object is bound as `connection`. Confirm that `add_timeout`
# exists in the installed pika version.
connection.add_timeout(5, lambda: channel.stop_consuming())

python操作rabbitmq
http://www.jcwit.com/article/160/
作者
Carlos
发布于
2019年3月25日
许可协议