首先是要安装rabbitmq啦。这个我相信会linux和windows的人应该都会安装吧!

具体安装方法可以百度。但是我在linux安装的时候,本来想源码安装的,但是好像安装文件有问题,不知道是不是自己没有C语言的编译包。现在不管他了,我最后是安装的RPM包。

 

 这个两个安装包,我已经分享出来,大家自己去下载吧。

erlang19.0.4-1  https://ftp.dgpm.co/s/zzc7sZbm8mxqe93

rabbitmq3.7.5   https://ftp.dgpm.co/s/NKx9HZmKfSKMys8

在安装rabbitmq的时候,有个小报错,只需要安装支持包就可以解决。就是这个socat

yum install socat一键搞定。

安装完成以后。效果是这样的

 

 里面有一些常用的命令。

执行:

/bin/systemctl stoprabbitmq-server.service #关闭

/bin/systemctl start  rabbitmq-server.service #启动

/bin/systemctl status rabbitmq-server.service #状态

1. 用户管理

用户管理包括增加用户,删除用户,查看用户列表,修改用户密码。

相应的命令

(1) 新增一个用户

rabbitmqctl  add_user  Username  Password

(1.1)赋予用户权限命令

 rabbitmqctl  set_permissions -p "/" User'.*' '.*' '.*'

(1.2)赋予用户角色命令

 rabbitmqctl set_user_tags User administrator

 

(2) 删除一个用户

rabbitmqctl  delete_user  Username

(3) 修改用户的密码

rabbitmqctl  change_password  Username  Newpassword

(4) 查看当前用户列表

rabbitmqctl  list_users

开启管理UI:

rabbitmq-plugins enable rabbitmq_management
firewall-cmd --zone=public --add-port=15672/tcp --permanent
firewall-cmd --reload
最后的的是防火墙开放端口。大家都懂得。

有一个用的比较多的命令,查看队列的。就是rabbitmqctl list_queues   rabbitmqctl list_exchanges
查看队列和消息数量。

 

 

--------------------------------好了,rabbitMQ的安装就介绍了这里了吧------------------------------------------

现在开始说在python里面生产者和消费者的代码,刚开始消费者一直有一个报错。我都迷糊,查了好多资料,都解决不了,最后才知道,是rabbitmq的参数位置变了,所以我的参数位置也要调整。

 

 

就是这个报错,我都快懵了。

 

 python生产者代码:

#_*_coding:utf-8_*_#author:david.zimport pikaimport randomcredentials=pika.PlainCredentials('david','123')  #这里做了一个用户身份认证识别connection=pika.BlockingConnection(    pika.ConnectionParameters('IP地址',5672,'/',credentials) #这里是连rabbitmq)channel=connection.channel()  #声明一个管道#声明queuechannel.queue_declare(queue='PMC2',durable=True)  #加True是为管道持久化#n RabbitMQ a message can never be sent directly to queue,it always# list=['我爱北京天安门','天安门上太阳升','鱼戏莲叶东,',' 鱼戏莲叶西,','鱼戏莲叶南,',' 鱼戏莲叶北。']# for i in list:# #     print (i)channel.basic_publish(exchange='',                      routing_key='PMC2',                      body="我爱北京天安门",                      properties=pika.BasicProperties(                          delivery_mode=2, #模式为2是为了消息持久化                      ))print ("[x] Sent 'Hello World!'")connection.close()

 python消费者代码:

#!/usr/bin/env python# -*- coding: utf-8 -*-# @Time    : 2018/2/22 15:57# @Author  :david# @File    : rabbitMQ_rescv_1.py# @Software: PyCharmimport pikacredentials = pika.PlainCredentials('david', '123')connection = pika.BlockingConnection(pika.ConnectionParameters('IP地址',5672,'/',credentials))channel = connection.channel()channel.queue_declare(queue='PMC2',durable=True)def callback(ch, method, properties, body):print(" [x] Received %r" % body.decode())channel.basic_qos(prefetch_count=1) #加上这句可以实现负载平均化channel.basic_consume('PMC2', #消费消息                      callback,                      True)print(' [*] Waiting for messages. To exit press CTRL+C')channel.start_consuming()

自此简单的生产者消费代码就出来了。

然后就是广播,用的exchange.这里一定要提醒一下,广播原理类似于收音机,他是会发广播。但是如果生产者如果下线,然后在上线,是不会收到,上线之前的广播的,因为广播是不会一直存的。过时就没了。

fanout_producer.py代码

#_*_coding:utf-8_*_#author:david.zimport pikaimport syscredentials=pika.PlainCredentials('david','123')connection =pika.BlockingConnection(pika.ConnectionParameters('ip地址', 5672, '/', credentials))channel = connection.channel()channel.exchange_declare(exchange='logs',                         exchange_type='fanout')# message = ' '.join(sys.argv[1:]) or "info: Hello World!"message ="info: Hello World!"channel.basic_publish(exchange='logs',                      routing_key='',                      body=message)print(" [x] Sent %r" % message)connection.close()

fanout_consumer.py代码

#_*_coding:utf-8_*_#author:david.zimport pikacredentials=pika.PlainCredentials('david','123')connection =pika.BlockingConnection(pika.ConnectionParameters('IP地址', 5672, '/', credentials))channel = connection.channel()channel.exchange_declare(exchange='logs',                         exchange_type='fanout')result = channel.queue_declare('',exclusive=True)  # exclusiv唯一的。不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除queue_name = result.method.queueprint ("random queuename",queue_name)channel.queue_bind(exchange='logs',                   queue=queue_name)print(' [*] Waiting for logs. To exit press CTRL+C')def callback(ch, method, properties, body):print(" [x] %r" % body)channel.basic_consume(                      queue_name,                      callback,                      True)channel.start_consuming()

 



©著作权归作者所有:来自51CTO博客作者天天打老虎的原创作品,如需转载,请注明出处,否则将追究法律责任

更多相关文章

  1. python连接redis存储
  2. Microsoft 365 解决方案:Teams 如何加强与外部用户协作体验
  3. Authing 客户故事|句子互动
  4. Centos7系统安装nextcloud13.0.6,详细教程
  5. Centos7系统安装nextcloud13.0.6,开始遇到坑
  6. 20210121 Linux系统及安装
  7. 全网最硬核PHP面试题来了 2021年学习面试跳槽必备(一)
  8. 阿凡提EGGNETWORK恒价通证+加密社交催生新玩法 EFTalk
  9. 0-10V转0-10V隔离器,信号转换器,信号分配模块

随机推荐

  1. 用虚拟机装的linux与宿主机的IP能互ping
  2. 计算所有字符,包括linux中的空格
  3. Linux服务列表(CentOS)
  4. 【Linux】CentOS7上解压zip需要安装uzip
  5. libpcap丢包原理分析及Fedora 9 内核2.6.
  6. linux下启动、关闭oracle服务
  7. 如何在虚拟机中安装kali linux
  8. CentOS 7下配置IP地址
  9. 心中的完美的E680I[文字]
  10. zynq PS侧DMA驱动