【Kafka学习】

Kafka介绍

  • kafka背景
  • 一、 什么是kafka
    • 1.1 kafka基本术语
    • 1.2 kafka特性
    • 1.3 kafka使用场景
    • 1.4 kafka的topic为什么要分区?
  • 二、Kafka安装
    • 2.1 kafak启动
    • 2.2 kafka常见命令
  • 三、SpringBoot+Kafka
    • 3.1 依赖引入
    • 3.2 kafka配置
    • 3.3 生产者config
    • 3.4 消费者config
    • 3.5 注册topic并发送消息
    • 3.6 监听器
    • 3.7 KafkaTemplate发送消息及结果确认
    • 3.8 启动类

kafka背景

一、 什么是kafka

Kafka 是一个分布式的基于发布/订阅模式的消息队列,主要应用与大数据实时处理领域。

1.1 kafka基本术语

1、 生产者: 向主题发布消息的客户端应用程序称为生产者(Producer),生产者用于持续不断的向某个主题发送消息。
2、消费者:订阅主题消息的客户端程序称为消费者(Consumer),消费者用于处理生产者产生的消息。
3、主题:消息的种类称为 主题(Topic),可以说一个主题代表了一类消息。相当于是对消息进行分类。主题就像是数据库中的表。
4、分区:主题可以被分为若干个分区(partition),同一个主题中的分区可以不在一个机器上,有可能会部署在多个机器上,由此来实现 kafka 的伸缩性。为了实现扩展性,提高并发能力,一个非常大的 topic 可以分布到多个 broker (即服务器)上,一个 topic 可以分为多个 partition,每个 partition 是一个 有序的队列。
5、消息:Kafka 中的数据单元被称为消息,也被称为记录,可以把它看作数据库表中某一行的记录。
6、broker: 一个独立的 Kafka 服务器就被称为 broker,broker 接收来自生产者的消息,为消息设置偏移量,并提交消息到磁盘保存。
7、偏移量:偏移量(Consumer Offset)是一种元数据,它是一个不断递增的整数值,用来记录消费者发生重平衡时的位置,以便用来恢复数据。

1.2 kafka特性

1、高吞吐、低延迟:kakfa 最大的特点就是收发消息非常快,kafka 每秒可以处理几十万条消息,它的最低延迟只有几毫秒。/2、持久性、可靠性: Kafka 能够允许数据的持久化存储,消息被持久化到磁盘,并支持数据备份防止数据丢失,Kafka 底层的数据存储是基于 Zookeeper 存储的,Zookeeper 我们知道它的数据能够持久存储。
3、容错性: 允许集群中的节点失败,某个节点宕机,Kafka 集群能够正常工作
4、高并发: 支持数千个客户端同时读写

1.3 kafka使用场景

1、数据跟踪:Kafka 可以用来跟踪用户行为,比如我们经常京东、淘宝购物,打开APP时,你的登陆信息会作为消息传输到 Kafka ,当你浏览购物的时候,你的浏览信息,你的点击某一商品的点击率,你的购物爱好都会作为一个个消息传递给 Kafka ,然后生成报告,就可以根据你的喜好像你推荐展示你喜欢的内容。
2、限流削峰:Kafka 多用于互联网领域某一时刻请求特别多的情况下,可以把请求写入Kafka 中,避免直接请求后端程序导致服务崩溃。

1.4 kafka的topic为什么要分区?

如果一个topic消息只存于一个broker,那么这个broker就会有瓶颈,无法做到水平扩展。所以把topic内的数据分布到整个集群中。分区可以解决水平扩展的一个方案。

二、Kafka安装

2.1 kafak启动

kafka 是基于 Zookeeper 的消息管理系统,所以启动的时候是需要使用到 Zookeeper ,但其内置了Zookeeper ,所以只需要根据bin目录下的文件进行启动即可。
1、启动zookeeper

bin\windows\zookeeper-server-start.bat config\zookeeper.properties


2、启动kafka

bin\windows\kafka-server-start.bat config\server.properties

2.2 kafka常见命令

1、创建topic

kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

2、这样我们就创建了一个名为test的Topic。

我们也可以通过命令来查看我们已经创建的 Topic:

.\kafka-topics.bat --list --zookeeper 127.0.0.1:2181

删除topic:

.\kafka-topics.bat --delete --zookeeper 127.0.0.1:2181 --topic test

3、发送消息

.\kafka-console-producer.bat --broker-list 127.0.0.1:9092 --topic test1
>hello world
>beijing

4、消费消息

.\kafka-console-consumer.bat --zookeeper 127.0.0.1:2181 --topic test1

5、停止

bin/kafka-server-stop.sh
bin/zookeeper-server-stop.sh

三、SpringBoot+Kafka

3.1 依赖引入

<dependency>             
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>

3.2 kafka配置

#============== kafka ===================
kafka.consumer.servers=localhost:9092
kafka.consumer.enable.auto.commit=true
kafka.consumer.session.timeout=6000
kafka.consumer.auto.commit.interval=100
kafka.consumer.auto.offset.reset=earliest
kafka.consumer.topic=test
kafka.consumer.group.id=test
kafka.consumer.concurrency=10kafka.producer.servers=192.168.217.128:9092
kafka.producer.retries=0
kafka.producer.batch.size=4096
kafka.producer.linger=1
kafka.producer.buffer.memory=40960

3.3 生产者config

package com.example.kafka_demo.config;import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;import java.util.HashMap;
import java.util.Map;@Configuration
@EnableKafka
public class KafkaProducerConfig {@Value("${kafka.producer.servers}")private String servers;@Value("${kafka.producer.retries}")private int retries;@Value("${kafka.producer.batch.size}")private int batchSize;@Value("${kafka.producer.linger}")private int linger;@Value("${kafka.producer.buffer.memory}")private int bufferMemory;public Map<String, Object> producerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);props.put(ProducerConfig.RETRIES_CONFIG, retries);props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);props.put(ProducerConfig.LINGER_MS_CONFIG, linger);props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return props;}public ProducerFactory<String, String> producerFactory() {return new DefaultKafkaProducerFactory<>(producerConfigs());}public KafkaTemplate<String, String> kafkaTemplate() {return new KafkaTemplate<String, String>(producerFactory());}
}

3.4 消费者config

package com.example.kafka_demo.config;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;import java.util.HashMap;
import java.util.Map;@Configuration
@EnableKafka
public class KafkaConsumerConfig {@Value("${kafka.consumer.servers}")private String servers;@Value("${kafka.consumer.enable.auto.commit}")private boolean enableAutoCommit;@Value("${kafka.consumer.session.timeout}")private String sessionTimeout;@Value("${kafka.consumer.auto.commit.interval}")private String autoCommitInterval;@Value("${kafka.consumer.group.id}")private String groupId;@Value("${kafka.consumer.auto.offset.reset}")private String autoOffsetReset;@Value("${kafka.consumer.concurrency}")private int concurrency;@Beanpublic KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.setConcurrency(concurrency);factory.getContainerProperties().setPollTimeout(1500);return factory;}public ConsumerFactory<String, String> consumerFactory() {return new DefaultKafkaConsumerFactory<>(consumerConfigs());}public Map<String, Object> consumerConfigs() {Map<String, Object> propsMap = new HashMap<>();propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);return propsMap;}} 

3.5 注册topic并发送消息

@Autowiredprivate KafkaTemplate kafkaTemplate;@GetMapping("/flink")public void  test(){TestClass list = userMapper.getList();String string = JSON.toJSONString(list);kafkaTemplate.setProducerListener(kafkaSendResultHandler);kafkaTemplate.send("topic_1", "key1", string);}

3.6 监听器

负责接受订阅topic的信息

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;@Component
public class Listener {protected final Logger logger = LoggerFactory.getLogger(this.getClass());@KafkaListener(topics = {"topic_1"})public void listen(ConsumerRecord<?, ?> record) {System.out.println("kafka的record: " + record);System.out.println("kafka的key: " + record.key());System.out.println("kafka的value: " + record.value());}}

3.7 KafkaTemplate发送消息及结果确认

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.stereotype.Component;@Component
public class KafkaSendResultHandler implements ProducerListener {@Overridepublic void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) {System.out.println("消息发送成功");}@Overridepublic void onError(ProducerRecord producerRecord, Exception exception) {System.out.println("消息发送失败");}
}

3.8 启动类