时间:2022-11-16来源:www.pcxitongcheng.com作者:电脑系统城
kafka的安装需要依赖于jdk,需要在服务器上提前安装好该环境,这里使用用jdk1.8。
官网地址:
较新的版本已自带Zookeeper,无需额外下载。这里使用3.2.0做演示。
注意要下载Binary downloads标签下的tgz包,Source download标签下的包为源码。无法直接运行,需要编译。
使用ssh连接工具将kafka_2.12-3.2.0.tgz这个包上传到云服务器上的一个目录。
打开命令行,进入到放有压缩包的目录,执行
1 | tar -zxvf kafka_2.12-3.2.0.tgz |
然后使用cd命令进入到/kafka_2.12-3.2.0/config/下,使用
1 | vi server.properties |
编辑配置文件。
删除listeners和advertised前方的#号,改成如下配置:
1 2 3 4 5 |
listeners=PLAINTEXT://云服务器内网ip:9092(本地访问用本地ip) # 如果要提供外网访问则必须配置此项 advertised.listeners=PLAINTEXT://云服务器公网ip:9092(若要远程访问需配置此项为云服务器的公网ip) # zookeeper连接地址,集群配置格式为ip:port,ip:port,ip:port zookeeper.connect=云服务器公网ip:2181 |
在云服务器控制台内进入安全组页面,添加两条新的入站规则,tcp/9092和tcp/2181
先查看使用的防火墙类型iptables/firewalld
iptables操作命令
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
1.打开/关闭/重启防火墙 开启防火墙(重启后永久生效):chkconfig iptables on 关闭防火墙(重启后永久生效):chkconfig iptables off 开启防火墙(即时生效,重启后失效):service iptables start 关闭防火墙(即时生效,重启后失效):service iptables stop 重启防火墙:service iptables restartd 2.查看打开的端口 /etc/init .d /iptables status 3.开启端口 iptables -A INPUT -p tcp --dport 8080 -j ACCEPT 4.保存并重启防火墙 /etc/rc .d /init .d /iptables save /etc/init .d /iptables restart |
Centos7默认安装了firewalld,如果没有安装的话,可以使用 yum install firewalld firewalld-config进行安装。
操作指令如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
1.启动防火墙 systemctl start firewalld 2.禁用防火墙 systemctl stop firewalld 3.设置开机启动 systemctl enable firewalld 4.停止并禁用开机启动 sytemctl disable firewalld 5.重启防火墙 firewall-cmd --reload 6.查看状态 systemctl status firewalld或者 firewall-cmd --state 7.在指定区域打开端口(记得重启防火墙) firewall-cmd --zone=public --add-port=80 /tcp (永久生效再加上 --permanent) |
打开tcp/9092和tcp/2181这两个端口后,重启防火墙,并查看开放的端口确实生效。
cd命令进入kafka_2.12-3.2.0目录下,执行
1 | bin /zookeeper-server-start .sh config /zookeeper .properties |
启动zookeeper,不加-daemon方便排除启动错误,新建一个shell窗口,进入该目录再执行
1 | bin /kafka-server-start .sh config /server .properties |
启动kafka,若打印日志未报错,若未出现error日志,说明启动成功。
1 2 3 4 5 6 7 8 9 |
查询kafka下所有的topic bin /kafka-topics .sh --list --zookeeper ip:port 因为kafka使用zookeeper作为配置中心,一些topic信息需要查询该kafka对应的zookeeper 创建topic bin /kafka-topics .sh --create --zookeeper ip:port --replication-factor 1 --partitions 1 --topic test 开启生产者 bin /kafka-console-producer .sh --broker-list cos100:9092 --topic test 开启消费者 bin /kafka-console-consumer .sh --bootstrap-server cos100:9092 --topic test |
在pom.xml文件中引入kafka依赖
1 2 3 4 5 6 7 8 9 10 |
< dependency > < groupId >org.springframework.kafka</ groupId > < artifactId >spring-kafka</ artifactId > < version >2.9.0</ version > </ dependency > < dependency > < groupId >org.apache.kafka</ groupId > < artifactId >kafka-clients</ artifactId > < version >3.2.0</ version > </ dependency > |
在application.yml配置文件中配置kafka
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
server: port: 8080 spring: kafka: bootstrap-servers: 云服务器外网ip地址:9092 producer: # 生产者 retries: 3 # 设置大于0的值,则客户端会将发送失败的记录重新发送 batch-size: 16384 buffer-memory: 33554432 acks: 1 # 指定消息key和消息体的编解码方式 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer consumer: group-id: default-group enable-auto-commit: false auto-offset-reset: earliest key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer listener: # 当每一条记录被消费者监听器(ListenerConsumer)处理之后提交 # RECORD # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交 # BATCH # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交 # TIME # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交 # COUNT # TIME | COUNT 有一个条件满足时提交 # COUNT_TIME # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgment.acknowledge()后提交 # MANUAL # 手动调用Acknowledgment.acknowledge()后立即提交,一般使用这种 # MANUAL_IMMEDIATE ack-mode: manual_immediate |
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 |
@RestController public class KafkaController { private final static String TOPIC_NAME = "test-topic" ; @Autowired private KafkaTemplate<String, String> kafkaTemplate; @RequestMapping ( "/send" ) public String send( @RequestParam ( "msg" ) String msg) { kafkaTemplate.send(TOPIC_NAME, "key" , msg); return String.format( "消息 %s 发送成功!" , msg); } } |
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
@Component public class DemoConsumer { /** * @param record record * @KafkaListener(groupId = "testGroup", topicPartitions = { * @TopicPartition(topic = "topic1", partitions = {"0", "1"}), * @TopicPartition(topic = "topic2", partitions = "0", * partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100")) * },concurrency = "6") * //concurrency就是同组下的消费者个数,就是并发消费数,必须小于等于分区总数 */ @KafkaListener (topics = "test-topic" , groupId = "testGroup1" ) public void listentestGroup(ConsumerRecord<String, String> record, Acknowledgment ack) { String value = record.value(); System.out.println( "testGroup1 message: " + value); System.out.println( "testGroup1 record: " + record); //手动提交offset,一般是提交一个banch,幂等性防止重复消息 // === 每条消费完确认性能不好! ack.acknowledge(); } //配置多个消费组 @KafkaListener (topics = "test--topic" , groupId = "testGroup2" ) public void listentestGroup2(ConsumerRecord<String, String> record, Acknowledgment ack) { String value = record.value(); System.out.println( "testGroup2 message: " + value); System.out.println( "testGroup2 record: " + record); //手动提交offset ack.acknowledge(); } } |
使用swagger测试发送消息
控制台打印消息
2024-07-07
myeclipse怎么导入tomcat教程2024-07-07
myeclipse如何启动tomcat2024-07-07
myeclipse如何绑定tomcat上线了一个小的预约程序,配置通过Nginx进行访问入口,默认的日志是没有请求时间的,因此需要配置一下,将每一次的请求的访问响应时间记录出来,备查与优化使用....
2023-03-17