# kafka搭建以及使用
# kafka 基础概念
producer 生产者
消息的发送者叫 Producer
consume 消费者
接受者是 Consumer
broker 经理人 Kafka 集群中有很多台 Server,其中每一台 Server 都可以存储消息,将每一台 Server 称为一个 kafka 实例,也叫做 broker。
topic 主题 一个 topic 里保存的是同一类消息,相当于对消息的分类
partition 分区 每个 topic 都可以分成多个 partition,每个 partition 在存储层面是 append log 文件
为什么要进行分区呢?最根本的原因就是:kafka基于文件进行存储,当文件内容大到一定程度时,很容易达到单个磁盘的上限,因此,采用分区的办法,一个分区对应一个文件,这样就可以将数据分别存储到不同的server上去,另外这样做也可以负载均衡,容纳更多的消费者。
offset 偏移量
一个分区对应一个磁盘上的文件,而消息在文件中的位置就称为 offset(偏移量),offset 为一个 long 型数字,它可以唯一标记一条消息。由于kafka 并没有提供其他额外的索引机制来存储 offset,文件只能顺序的读写,所以在kafka中几乎不允许对消息进行“随机读写”
# 总结一下 Kafka 的几个要点:
- kafka 是一个基于发布-订阅的分布式消息系统(消息队列)
- Kafka 面向大数据,消息保存在主题中,而每个 topic 有分为多个分区
- kafka 的消息数据保存在磁盘,每个 partition 对应磁盘上的一个文件,消息写入就是简单的文件追加,文件可以在集群内复制备份以防丢失 即使消息被消费,kafka 也不会立即删除该消息,可以通过配置使得过一段时间后自动删除以释放磁盘空间
- kafka依赖分布式协调服务Zookeeper,适合离线/在线信息的消费,与 storm 和 spark 等实时流式数据分析常常结合使用
一个 topic 对应的多个 partition 分散存储到集群中的多个 broker 上,存储方式是一个 partition 对应一个文件,每个 broker 负责存储在自己机器上的 partition 中的消息读写。
订阅 Topic 是以一个消费组来订阅的,发送到 Topic 的消息,只会被订阅此 Topic 的每个 group 中的一个 consumer 消费。
# docker compose安装
创建一个docker-compose.yml然后docker-compose up -d启动容器
version: '3'
services:
zookeeper:
image: wurstmeister/zookeeper
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka
depends_on: [ zookeeper ]
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: 192.168.1.9
KAFKA_CREATE_TOPICS: "test:1:1"
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
kafka-manager:
image: sheepkiller/kafka-manager ## 镜像:开源的web管理kafka集群的界面
environment:
ZK_HOSTS: 192.168.1.9 ## 修改:宿主机IP
ports:
- "9000:9000" ## 暴露端口
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# 使用
kfkafkafakademo_kafka_1
kafakademo_zookeeper_1
# 创建topic
$KAFKA_HOME/bin/kafka-topics.sh --create --topic test --partitions 4 --zookeeper kafakademo_zookeeper_1:2181 --replication-factor 1
# 查看刚刚创建的topic
$KAFKA_HOME/bin/kafka-topics.sh --zookeeper kafakademo_zookeeper_1:2181 --describe --topic test
# 发布消息
$KAFKA_HOME/bin/kafka-console-producer.sh --topic=test --broker-list localhost:9092
# 订阅消息
$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic test
log-wrap
2
3
# 操作
--bootstrap-server(String: server toconnect to) 连接kafka broker 主机名称和端口号
--topic<String: topic>操作的topic名称
--create 创建主题
--delete 删除主题
--alter 修改主题
--list 查看所有主题
--describe 查看主题详细描述
--partitions 设置分区数
--replication-factor 设置分区副本
--config 更新系统默认的配置
2
3
4
5
6
7
8
9
10
进入kfaka容器
docker exec -it 65b7225a79c6ee3438be6cb884eb287fba7238429977c294a16b3fe8785f2a18 bash
2
# 集成SpringBoot
# 导入kafka包
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.11.0.2</version>
</dependency>
2
3
4
5
6
7
8
9
10
# 生产者Producer
public class CustomProducer {
public static void main(String[] args) {
//0配置
Properties properties = new Properties();
//连接集群bootstrap.servers
properties.put("bootstrap.servers", "192.168.1.9:9092");
// ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
//指定key和value的序列化类型key.serializer
// ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG
// ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// properties.put();
// properties.put()
//1.创建kafka生产者对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
//2.发送数据
for (int i = 0; i < 5; i++) {
kafkaProducer.send(new ProducerRecord<>("test1", "warren"));
}
kafkaProducer.close();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
带回调
public class CustomProducerCallBack {
public static void main(String[] args) {
//0配置
Properties properties = new Properties();
//连接集群bootstrap.servers
properties.put("bootstrap.servers", "192.168.1.9:9092");
// ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
//指定key和value的序列化类型key.serializer
// ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG
// ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// properties.put();
// properties.put()
//1.创建kafka生产者对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
//2.发送数据
for (int i = 0; i < 5; i++) {
kafkaProducer.send(new ProducerRecord<>("test1", "warren" + i), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e == null) {
System.out.println("主题: " + recordMetadata.topic() + "分区:" + recordMetadata.partition());
}
}
});
}
kafkaProducer.close();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
同步方法kafkaProducer.send().get()
public class CustomProducerSync {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//0配置
Properties properties = new Properties();
//连接集群bootstrap.servers
properties.put("bootstrap.servers", "192.168.1.9:9092");
// ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
//指定key和value的序列化类型key.serializer
// ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG
// ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// properties.put();
// properties.put()
//1.创建kafka生产者对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
//2.发送数据
for (int i = 0; i < 5; i++) {
kafkaProducer.send(new ProducerRecord<>("test1", "warren" + i)).get();
}
kafkaProducer.close();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
同步在同一个分区中partition,需要加属性key,一般是表名
kafkaProducer.send(new ProducerRecord<>("test1", "b","warren" + i), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e == null) {
System.out.println("主题: " + recordMetadata.topic() + "分区:" + recordMetadata.partition());
}
}
});
2
3
4
5
6
7
8
# 自定义分区
public class MyPartitionner implements Partitioner {
@Override
public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
//获取数据
String msgValues = o1.toString();
int partition;
if (msgValues.contains("warren")) {
partition = 0;
} else {
partition = 1;
}
return partition;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> map) {
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
使用
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.warren.blog.testkafaka.MyPartitionner");
# kafka生产环境提高吞吐量
//0 配置
Properties properties = new Properties();
//连接kafka集群
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.9:9092");
//序列化
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//设置缓冲区大小
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
//批次大小
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
//linger.ms
properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
//压缩
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# kafka数据可靠性
// acks
properties.put(ProducerConfig.ACKS_CONFIG, "1");
// 重试三次
properties.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 3);
2
3
4
# kafka幂等性和至少一次
<PID, Partition, SeqNumber>
PID:kafka每次重启都会分配一个新的
Partition:表示分区号
Sequence Number是单调自增的
所以幂等性只能保证的是在单分区单会话內不重复
enable.idempotence默认true
2
3
4
5
6
7
# kafka事物
public class CustomProducerTranactions {
public static void main(String[] args) {
//0配置
Properties properties = new Properties();
//连接集群bootstrap.servers
properties.put("bootstrap.servers", "192.168.1.9:9092");
// ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
//指定key和value的序列化类型key.serializer
// ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG
// ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//必须指定事物id
properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tranaction_id_01");
//1.创建kafka生产者对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
//2.发送数据
kafkaProducer.initTransactions();
kafkaProducer.beginTransaction();
try {
for (int i = 0; i < 5; i++) {
kafkaProducer.send(new ProducerRecord<>("test1", "test tranactions" + i));
}
kafkaProducer.commitTransaction();
} catch (Exception e) {
kafkaProducer.close();
}
kafkaProducer.close();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# 相关知识点
1.为了方便扩展,并提高吞吐量,一个topic分为多个partition
# 数据有序
单分区内,有序
多分区,分区与分区间无序
2