# kafka搭建以及使用

# kafka 基础概念

  • producer 生产者

    消息的发送者叫 Producer

  • consume 消费者

    接受者是 Consumer

  • broker 经理人 Kafka 集群中有很多台 Server,其中每一台 Server 都可以存储消息,将每一台 Server 称为一个 kafka 实例,也叫做 broker。

  • topic 主题 一个 topic 里保存的是同一类消息,相当于对消息的分类

  • partition 分区 每个 topic 都可以分成多个 partition,每个 partition 在存储层面是 append log 文件

    为什么要进行分区呢?最根本的原因就是:kafka基于文件进行存储,当文件内容大到一定程度时,很容易达到单个磁盘的上限,因此,采用分区的办法,一个分区对应一个文件,这样就可以将数据分别存储到不同的server上去,另外这样做也可以负载均衡,容纳更多的消费者。

  • offset 偏移量

    一个分区对应一个磁盘上的文件,而消息在文件中的位置就称为 offset(偏移量),offset 为一个 long 型数字,它可以唯一标记一条消息。由于kafka 并没有提供其他额外的索引机制来存储 offset,文件只能顺序的读写,所以在kafka中几乎不允许对消息进行“随机读写”

# 总结一下 Kafka 的几个要点:

  • kafka 是一个基于发布-订阅的分布式消息系统(消息队列)
  • Kafka 面向大数据,消息保存在主题中,而每个 topic 有分为多个分区
  • kafka 的消息数据保存在磁盘,每个 partition 对应磁盘上的一个文件,消息写入就是简单的文件追加,文件可以在集群内复制备份以防丢失 即使消息被消费,kafka 也不会立即删除该消息,可以通过配置使得过一段时间后自动删除以释放磁盘空间
  • kafka依赖分布式协调服务Zookeeper,适合离线/在线信息的消费,与 storm 和 spark 等实时流式数据分析常常结合使用

一个 topic 对应的多个 partition 分散存储到集群中的多个 broker 上,存储方式是一个 partition 对应一个文件,每个 broker 负责存储在自己机器上的 partition 中的消息读写。

订阅 Topic 是以一个消费组来订阅的,发送到 Topic 的消息,只会被订阅此 Topic 的每个 group 中的一个 consumer 消费。

1666852785183.png

1666852922116.png

# docker compose安装

创建一个docker-compose.yml然后docker-compose up -d启动容器

version: '3'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    depends_on: [ zookeeper ]
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 192.168.1.9
      KAFKA_CREATE_TOPICS: "test:1:1"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
  kafka-manager:  
    image: sheepkiller/kafka-manager                ## 镜像:开源的web管理kafka集群的界面
    environment:
        ZK_HOSTS: 192.168.1.9                    ## 修改:宿主机IP
    ports:  
      - "9000:9000"                                 ## 暴露端口
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

# 使用

kfkafkafakademo_kafka_1
kafakademo_zookeeper_1

# 创建topic

$KAFKA_HOME/bin/kafka-topics.sh --create --topic test --partitions 4 --zookeeper kafakademo_zookeeper_1:2181 --replication-factor 1 
1

# 查看刚刚创建的topic

$KAFKA_HOME/bin/kafka-topics.sh --zookeeper kafakademo_zookeeper_1:2181 --describe --topic test
1

# 发布消息

$KAFKA_HOME/bin/kafka-console-producer.sh --topic=test --broker-list localhost:9092
1

# 订阅消息

$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic test

log-wrap
1
2
3

# 操作

--bootstrap-server(String: server toconnect to) 连接kafka broker 主机名称和端口号
--topic<String: topic>操作的topic名称
--create 创建主题
--delete 删除主题
--alter 修改主题
--list 查看所有主题
--describe 查看主题详细描述
--partitions 设置分区数
--replication-factor 设置分区副本
--config 更新系统默认的配置
1
2
3
4
5
6
7
8
9
10
进入kfaka容器
docker exec -it 65b7225a79c6ee3438be6cb884eb287fba7238429977c294a16b3fe8785f2a18 bash
1
2

# 集成SpringBoot

# 导入kafka包

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.11.0.2</version>
        </dependency>
1
2
3
4
5
6
7
8
9
10

# 生产者Producer

public class CustomProducer {
    public static void main(String[] args) {
        //0配置
        Properties properties = new Properties();
        //连接集群bootstrap.servers
        properties.put("bootstrap.servers", "192.168.1.9:9092");
//        ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
        //指定key和value的序列化类型key.serializer
//        ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG
//        ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//        properties.put();
//        properties.put()

        //1.创建kafka生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        //2.发送数据
        for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<>("test1", "warren"));
        }
        kafkaProducer.close();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

带回调

public class CustomProducerCallBack {
    public static void main(String[] args) {
        //0配置
        Properties properties = new Properties();
        //连接集群bootstrap.servers
        properties.put("bootstrap.servers", "192.168.1.9:9092");
//        ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
        //指定key和value的序列化类型key.serializer
//        ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG
//        ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//        properties.put();
//        properties.put()

        //1.创建kafka生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        //2.发送数据
        for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<>("test1", "warren" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e == null) {
                        System.out.println("主题: " + recordMetadata.topic() + "分区:" + recordMetadata.partition());
                    }
                }
            });
        }
        kafkaProducer.close();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

同步方法kafkaProducer.send().get()

public class CustomProducerSync {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //0配置
        Properties properties = new Properties();
        //连接集群bootstrap.servers
        properties.put("bootstrap.servers", "192.168.1.9:9092");
//        ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
        //指定key和value的序列化类型key.serializer
//        ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG
//        ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//        properties.put();
//        properties.put()

        //1.创建kafka生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        //2.发送数据
        for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<>("test1", "warren" + i)).get();
        }
        kafkaProducer.close();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

同步在同一个分区中partition,需要加属性key,一般是表名

            kafkaProducer.send(new ProducerRecord<>("test1", "b","warren" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e == null) {
                        System.out.println("主题: " + recordMetadata.topic() + "分区:" + recordMetadata.partition());
                    }
                }
            });
1
2
3
4
5
6
7
8

# 自定义分区

public class MyPartitionner implements Partitioner {
    @Override
    public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
        //获取数据
        String msgValues = o1.toString();
        int partition;
        if (msgValues.contains("warren")) {
            partition = 0;
        }  else {
            partition = 1;
        }
        return partition;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

使用

        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.warren.blog.testkafaka.MyPartitionner");
1

# kafka生产环境提高吞吐量


  //0 配置
  Properties properties = new Properties();
  //连接kafka集群
  properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.9:9092");
  //序列化
  properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

  //设置缓冲区大小
  properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);

  //批次大小
  properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);

  //linger.ms

  properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
  //压缩
  properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

# kafka数据可靠性

        // acks
        properties.put(ProducerConfig.ACKS_CONFIG, "1");
        // 重试三次
        properties.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 3);
1
2
3
4

# kafka幂等性和至少一次

<PID, Partition, SeqNumber>
PID:kafka每次重启都会分配一个新的
Partition:表示分区号
Sequence Number是单调自增的

所以幂等性只能保证的是在单分区单会话內不重复
enable.idempotence默认true
1
2
3
4
5
6
7

# kafka事物

public class CustomProducerTranactions {
    public static void main(String[] args) {
        //0配置
        Properties properties = new Properties();
        //连接集群bootstrap.servers
        properties.put("bootstrap.servers", "192.168.1.9:9092");
//        ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
        //指定key和value的序列化类型key.serializer
//        ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG
//        ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        //必须指定事物id
        properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tranaction_id_01");

        //1.创建kafka生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        //2.发送数据

        kafkaProducer.initTransactions();
        kafkaProducer.beginTransaction();
        try {
            for (int i = 0; i < 5; i++) {
                kafkaProducer.send(new ProducerRecord<>("test1", "test tranactions" + i));
            }
            kafkaProducer.commitTransaction();
        } catch (Exception e) {
            kafkaProducer.close();
        }
        kafkaProducer.close();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

# 相关知识点

1.为了方便扩展,并提高吞吐量,一个topic分为多个partition
1

# 数据有序

单分区内,有序
多分区,分区与分区间无序
1
2

# 点对点模式

# 发布/订阅模式

# 相关资料

搭建 (opens new window)

全量分析

评 论:

上次更新时间: 1/12/2024, 4:11:56 PM