博客
关于我
kafka实战
阅读量:796 次
发布时间:2023-03-21

本文共 7270 字,大约阅读时间需要 24 分钟。

Kafka详细介绍与实战操作

Kafka是一种高效的分布式消息队列系统,近年来在大数据处理和实时数据流处理领域得到了广泛应用。本文将从基础到实践,详细介绍Kafka的核心概念、安装配置以及实际应用案例。


Kafka的核心概念

1. Broker

Kafka集群由多个服务器组成,每个服务器称为Broker。这些Broker通过 replication机制确保数据的高可用性和可靠性。默认情况下,Kafka Broker服务器会监听特定端口(如9092),接收来自生产者的消息请求。

2. Topic

Topic是Kafka中组织和管理消息的核心概念。每条发布到Kafka集群的消息都有一个类别(Topic),逻辑上不同Topic的消息分开存储,物理上可能分布在多个Broker上。通过指定Topic,生产者和消费者可以只关注自己感兴趣的消息。

3. Partition

Partition是物理上的概念,每个Topic可以包含多个Partition。Kafka根据均衡策略将消息分配到不同的Partition中。每个Partition中的消息按顺序存储,最新的消息会最后被消费。

4. Producer(生产者)

生产者是向Kafka Broker发送消息的程序。生产者负责将数据发布到指定的Topic中,可以选择指定目标Partition或让Kafka自动均衡消息分布。

5. Consumer(消费者)

消费者是向Kafka Broker读取消息的程序。消费者可以通过订阅特定的Topic和Partition,拉取最新的消息进行处理。Kafka支持多个消费者同时消费同一Topic的消息,每个消费者属于一个特定的Consumer Group。

6. Consumer Group

Consumer Group是一个逻辑概念,每个消费者都属于一个特定的Consumer Group。通过为消费者指定Group ID,可以实现多个消费者协同工作,共同消费消息流。


Kafka的主要功能

Kafka主要提供三种核心功能:

  • 发布与订阅消息流

    Kafka允许生产者发布消息流,消费者订阅特定Topic的消息流,这是消息队列的典型功能。

  • 存储消息流的容错机制

    Kafka以文件的方式存储消息流,支持多次复制(replication),确保数据的高可用性和持久性。

  • 实时数据流处理

    Kafka支持在消息发布时就进行处理,这使得它成为构建实时数据流管道的理想工具。


  • Kafka的使用场景

  • 构建实时数据流管道

    Kafka常用于将系统或应用程序之间的数据通过实时流管道传输,确保数据的高效传输和可靠性。

  • 构建实时数据处理程序

    Kafka可以用于构建处理实时数据流的程序,例如数据分析、转换或触发事件处理。

  • 支持大规模的发布-订阅场景

    Kafka的分区机制使其能够处理非常大的消息量,适用于分布式系统中的数据同步和异步通信。


  • Kafka的详细介绍

    1.3.1 消息传输流程

    Kafka的消息传输过程如下:

    • 生产者:向Kafka Broker发送消息时,会根据Topic指定目标Partition或让Kafka自动均衡消息分布。
    • Broker:接收消息并存储到相应的Partition中。
    • 消费者:通过订阅Topic和Partition,拉取最新的消息进行处理。

    1.3.2 Kafka服务器的消息存储策略

    Kafka的存储策略基于分区(Partition)实现:

    • 每个Topic可以指定多个Partition,消息按顺序存储。
    • 最新消息会被存储在最后一个Partition中。
    • 消费者可以通过Partition的offset记录位置,实现按顺序消费。

    1.3.3 与生产者的交互

    • 生产者可以通过指定Partition直接将消息发送到特定分区,或让Kafka自动均衡。
    • 如果不指定Partition,Kafka会采用随机均衡策略。

    1.3.4 与消费者的交互

    • 消费者通过offset记录当前消费位置,Kafka会根据offset拉取最新的消息。
    • 每个Consumer Group中的消费者数量不应超过Partition数量,否则多余的消费者将无法接收消息。

    Kafka安装与使用

    2.1 下载

    从Kafka官网下载最新版本的安装包。选择二进制版本(tgz文件),例如版本0.11.0.1。

    2.2 安装

  • 确保安装了JDK(Kafka依赖Java运行环境)。
  • 解压下载的Kafka安装包到指定目录(例如/home)。
  • 启动Zookeeper和Kafka服务器。
  • 2.3 配置

    • consumer.properites:消费者配置,默认值适用。
    • producer.properites:生产者配置,默认值适用。
    • server.properties:Kafka服务器配置,需配置Broker ID、监听地址和Zookeeper地址。

    2.4 运行

  • 启动Zookeeper:
    cd /path/to/kafka
    bin/zookeeper-server-start.sh config/zookeeper.properties
  • 启动Kafka:
    bin/kafka-server-start.sh config/server.properties
  • 2.5 发送第一个消息

  • 创建Topic:
    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
  • 启动消息消费者:
    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
  • 发送消息:
    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

  • 使用Java程序

    3.1 创建Topic

    public static void main(String[] args) throws InterruptedException {
    Properties props = new Properties();
    props.put("bootstrap.servers", "192.168.180.128:9092");
    AdminClient adminClient = AdminClient.create(props);
    ArrayList
    topics = new ArrayList<>();
    NewTopic newTopic = new NewTopic("topic-test", 1, (short) 1);
    topics.add(newTopic);
    CreateTopicsResult result = adminClient.createTopics(topics);
    result.all().get();
    }

    3.2 生产者发送消息

    public static void main(String[] args) throws ExecutionException, InterruptedException {
    Properties props = new Properties();
    props.put("bootstrap.servers", "192.168.180.128:9092");
    props.put("acks", "all");
    props.put("retries", 0);
    props.put("batch.size", 16384);
    props.put("linger.ms", 1);
    props.put("buffer.memory", 33554432);
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    Producer
    producer = new KafkaProducer<>(props);
    for (int i = 0; i < 100; i++) {
    producer.send(new ProducerRecord<>("topic-test", Integer.toString(i), Integer.toString(i)));
    }
    producer.close();
    }

    3.3 消费者消费消息

    public static void main(String[] args) throws InterruptedException {
    Properties props = new Properties();
    props.put("bootstrap.servers", "192.168.180.128:9092");
    props.put("group.id", "test");
    props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", "1000");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer
    consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Collections.singletonList("topic-test"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection
    collection) { }
    @Override
    public void onPartitionsAssigned(Collection
    collection) {
    consumer.seekToBeginning(collection);
    }
    });
    while (true) {
    ConsumerRecords
    records = consumer.poll(100);
    for (ConsumerRecord
    record : records) {
    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }
    }
    }

    使用Spring-Kafka

    4.1 基本配置

    在Spring配置文件中添加以下依赖:

    org.apache.kafka
    kafka-clients
    0.11.0.1
    org.apache.kafka
    kafka-streams
    0.11.0.1
    org.springframework.kafka
    spring-kafka
    1.3.0.RELEASE

    4.2 生产者配置

    @Configuration
    @EnableKafka
    public class KafkaConfig {
    @Bean
    public KafkaAdmin admin() {
    Map
    configs = new HashMap<>();
    configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.180.128:9092");
    return new KafkaAdmin(configs);
    }
    @Bean
    public NewTopic topic1() {
    return new NewTopic("foo", 10, (short) 2);
    }
    @Bean
    public ProducerFactory
    producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs());
    }
    @Bean
    public Map
    producerConfigs() {
    Map
    props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.180.128:9092");
    props.put("acks", "all");
    props.put("retries", 0);
    props.put("batch.size", 16384);
    props.put("linger.ms", 1);
    props.put("buffer.memory", 33554432);
    props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    return props;
    }
    @Bean
    public KafkaTemplate
    kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
    }
    }

    4.3 消费者配置

    @Configuration
    public class KafkaConfig {
    @Bean
    public SimpleConsumerListener simpleConsumerListener() {
    return new SimpleConsumerListener();
    }
    }
    @Component
    public class SimpleConsumerListener {
    private static final Logger logger = LoggerFactory.getLogger(SimpleConsumerListener.class);
    private final CountDownLatch latch1 = new CountDownLatch(1);
    @KafkaListener(id = "foo", topics = "topic-test")
    public void listen(byte[] records) {
    logger.info("接收到消息:" + Arrays.toString(records));
    this.latch1.countDown();
    }
    }

    最后

    本文详细介绍了Kafka的核心概念、安装配置以及实际应用案例,涵盖了从基础到实践的全过程。如果需要更深入的学习,可以参考Kafka的官方文档或相关技术博客。

    转载地址:http://tyqfk.baihongyu.com/

    你可能感兴趣的文章
    mysql_real_connect 参数注意
    查看>>
    mysql_secure_installation初始化数据库报Access denied
    查看>>
    MySQL_西安11月销售昨日未上架的产品_20161212
    查看>>
    Mysql——深入浅出InnoDB底层原理
    查看>>
    MySQL“被动”性能优化汇总
    查看>>
    MySQL、HBase 和 Elasticsearch:特点与区别详解
    查看>>
    MySQL、Redis高频面试题汇总
    查看>>
    MYSQL、SQL Server、Oracle数据库排序空值null问题及其解决办法
    查看>>
    mysql一个字段为空时使用另一个字段排序
    查看>>
    MySQL一个表A中多个字段关联了表B的ID,如何关联查询?
    查看>>
    MYSQL一直显示正在启动
    查看>>
    MySQL一站到底!华为首发MySQL进阶宝典,基础+优化+源码+架构+实战五飞
    查看>>
    MySQL万字总结!超详细!
    查看>>
    Mysql下载以及安装(新手入门,超详细)
    查看>>
    MySQL不会性能调优?看看这份清华架构师编写的MySQL性能优化手册吧
    查看>>
    MySQL不同字符集及排序规则详解:业务场景下的最佳选
    查看>>
    Mysql不同官方版本对比
    查看>>
    MySQL与Informix数据库中的同义表创建:深入解析与比较
    查看>>
    mysql与mem_细说 MySQL 之 MEM_ROOT
    查看>>
    MySQL与Oracle的数据迁移注意事项,另附转换工具链接
    查看>>