微信扫一扫 分享朋友圈

已有 39 人浏览分享

开启左侧

Kafka如何配置用户名密码访问

[复制链接]
39 0
1 软件版本
kafka_2.12-2.4.0.tgz(带zookeeper)http://archive.apache.org/dist/kafka/2.4.0/kafka_2.12-2.4.0.tgz

2 kafka服务端部署
2.1 将安装包上传到服务器,并解压
  1. tar zxvf kafka_2.12-2.4.0.tgz -C /data

  2. mv kafka_2.12-2.4.0 kafka
复制代码
2.2 修改kafka配置文件 server.properties
vim /data/kafka/config/server.properties:
  1. ############################# Server Basics #############################

  2. # The id of the broker. This must be set to a unique integer for each broker.

  3. broker.id=0
  4. host.name=10.7.2.201

  5. listeners=SASL_PLAINTEXT://10.7.2.201:9092
  6. advertised.listeners=SASL_PLAINTEXT://10.7.2.201:9092

  7. security.inter.broker.protocol=SASL_PLAINTEXT
  8. sasl.enabled.mechanisms=PLAIN
  9. sasl.mechanism.inter.broker.protocol=PLAIN
  10. authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
  11. allow.everyone.if.no.acl.found=true

复制代码
2.3 添加SASL配置文件
在kafka安装目录下创建一个 kafka_server_jaas.conf 文件
vim /data/kafka/kafka_server_jaas.conf:
  1. KafkaServer {
  2.   org.apache.kafka.common.security.plain.PlainLoginModule required
  3.     username="admin"
  4.     password="adminpasswd"
  5.     user_admin="adminpasswd"
  6.     user_producer="producerpwd"
  7.     user_consumer="consumerpwd";
  8. };

复制代码
说明:该配置通过org.apache.org.apache.kafka.common.security.plain.PlainLoginModule由指定采用PLAIN机制,定义了用户。
    usemame和password指定该代理与集群其他代理初始化连接的用户名和密码
    "user_"为前缀后接用户名方式创建连接代理的用户名和密码,例如,user_producer=“producerpwd” 是指用户名为producer,密码为producerpwd
    username为admin的用户,和user为admin的用户,密码要保持一致,否则会认证失败
    上述配置中,创建了三个用户,分别为admin、producer和consumer(创建多少个用户,可根据业务需要配置,用户名和密码可自定义设置)

2.4 修改启动脚本
vim /data/kafka/bin/kafka-server-start.sh ,找到 export KAFKA_HEAP_OPTS , 添加jvm 参数为kafka_server_jaas.conf文件:
  1. -Djava.security.auth.login.config=/data/kafka/kafka_server_jaas.conf
复制代码
image.png
2.5 启动kafka服务
  1. # 先启动zookeeper
  2. /data/kafka/bin/zookeeper-server-start.sh -daemon /data/kafka/config/zookeeper.properties  

  3. # 启动kafka
  4. /data/kafka/bin/kafka-server-start.sh -daemon /data/kafka/config/server.properties
复制代码
3 kafka客户端配置登录认证
3.1 安装kafka客户端

在/root 路径下新拷贝一个kafka安装包,解压后重命名,作为kafka客户端,客户端路径 /root/kafka
3.2 创建客户端认证配置文件

在 /root/kafka 路径下创建一个 kafka_client_jaas.conf 文件:
vim /root/kafka/kafka_client_jaas.conf:
  1. KafkaClient {
  2.         org.apache.kafka.common.security.plain.PlainLoginModule required
  3.         username="producer"
  4.         password="producerpwd";
  5. };
复制代码
说明: 这里配置用户名和密码需要和服务端配置的账号密码保持一致,这里配置了producer这个用户
3.3 添加kafka-console-producer.sh认证文件路径,后面启动生产者测试时使用
vim /root/kafka/bin/kafka-console-producer.sh ,找到 “x$KAFKA_HEAP_OPTS”,添加以下参数:

  1. -Djava.security.auth.login.config=/root/kafka/kafka_client_jaas.conf
复制代码
image.png
3.4 添加kafka-console-consumer.sh认证文件路径,后面启动消费者测试时使用
vim /root/kafka/bin/kafka-console-consumer.sh ,找到 “x$KAFKA_HEAP_OPTS”,添加以下参数:

  1. -Djava.security.auth.login.config=/root/kafka/kafka_client_jaas.conf
复制代码
image.png
4 用客户端连接
  1. # 开启生产者:
  2. /root/kafka/bin/kafka-console-producer.sh --broker-list 10.7.2.201:9092 --topic test --producer-property security.protocol=SASL_PLAINTEXT --producer-property sasl.mechanism=PLAIN


  3. # 开启消费者:
  4. /root/kafka/bin/kafka-console-consumer.sh --bootstrap-server 10.7.2.201:9092 --topic test --from-beginning --consumer-property security.protocol=SASL_PLAINTEXT --consumer-property sasl.mechanism=PLAIN
复制代码
image.png
生产者可正常生产数据,消费者能消费到数据
5 python连接kafka(python2环境)
5.1 安装kafka-python模块包

下载模块包:kafka-python-2.0.2.tar.gz,使用手册:https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html
解压后,执行以下命令进行安装:

  1. python setup.py install
复制代码
5.2 向kafka中写入数据
  1. # -*- coding: utf-8 -*-
  2. from __future__ import absolute_import
  3. from __future__ import print_function
  4. from kafka import KafkaProducer
  5. import json
  6. import sys

  7. reload(sys)
  8. sys.setdefaultencoding('utf8')

  9. def produceLog(topic_name, log):
  10.     kafka_producer= KafkaProducer(
  11.                                   sasl_mechanism="PLAIN",
  12.                                   security_protocol='SASL_PLAINTEXT',
  13.                                   sasl_plain_username="producer",
  14.                                   sasl_plain_password="producerpwd",
  15.                                   bootstrap_servers='10.7.2.201:9092'
  16.                                   )                                            
  17.     kafka_producer.send(topic_name, log, partition=0)
  18.     kafka_producer.flush()
  19.     kafka_producer.close()
  20.    
  21. for i in range(10):
  22.     sendlog_dict = {"name":"测试"}
  23.     sendlog_dict['ID'] = i
  24.     sendlog_srt = json.dumps(sendlog_dict, ensure_ascii=False)
  25.     produceLog("pythontest", sendlog_srt)
复制代码
执行后,在kafka中查看 pythontest 主题中的数据,如下图:
image.png
数据写入成功

5.3 消费数据

  1. # -*- coding: utf-8 -*-
  2. from __future__ import absolute_import
  3. from __future__ import print_function
  4. from kafka import KafkaConsumer
  5. from kafka import TopicPartition
  6. import sys

  7. reload(sys)
  8. sys.setdefaultencoding('utf8')

  9. consumer= KafkaConsumer(
  10.                         'pythontest',  # 消息主题
  11.                         group_id="test",
  12.                         client_id="python",
  13.                         sasl_mechanism="PLAIN",
  14.                         security_protocol='SASL_PLAINTEXT',
  15.                         sasl_plain_username="producer",
  16.                         sasl_plain_password="producerpwd",
  17.                         bootstrap_servers='10.7.2.201:9092',
  18.                                     auto_offset_reset='earliest' # 从头开始消费
  19.                         )                                            

  20. print("主题的分区信息:{}".format(consumer.partitions_for_topic('pythontest')))
  21. print("主题列表:{}".format(consumer.topics()))
  22. print("当前消费者订阅的主题:{}".format(consumer.subscription()))
  23. print("当前消费者topic分区信息:{}".format(consumer.assignment()))
  24. print("当前消费者可消费的偏移量:{}".format(consumer.beginning_offsets(consumer.assignment())))

  25. for msg in consumer:
  26.     print(msg.value)
复制代码
image.png

免责声明:
1,海欣资源网所发布的资源由网友上传和分享,不保证信息的正确性和完整性,且不对因信息的不正确或遗漏导致的任何损失或损害承担责任。
2,海欣资源网的资源来源于网友分享,仅限用于学习交流和测试研究目的,不得将上述内容用于商业或者非法用途,否则,一切后果请用户自负。
3,海欣资源网所发布的资源由网友上传和分享,版权争议与本站无关,您必须在下载后的24个小时之内,从您的电脑中彻底删除上述内容。
4,如果您喜欢,请支持正版,购买正版,得到更好的正版服务,如有侵权,请联系我们删除并予以真诚的道歉,联系方式邮箱 haixinst@qq.com
海欣资源-企业信息化分享平台。
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

0

关注

0

粉丝

41

主题
热度排行
回复排行
最新贴子

Archiver|手机版|海欣资源 ( 湘ICP备2021008090号-1 )|网站地图

GMT+8, 2024-12-4 02:53 , Gzip On, MemCached On.

免责声明:本站所发布的资源和文章均来自网络,仅限用于学习交流和测试研究目的,不得将上述内容用于商业或者非法用途,否则,一切后果请用户自负。 本站信息来自网络,版权争议与本站无关,您必须在下载后的24个小时之内,从您的电脑中彻底删除上述内容。 如果您喜欢,请支持正版,购买正版,得到更好的正版服务,如有侵权,请联系我们删除并予以真诚的道歉。