本文共 7270 字,大约阅读时间需要 24 分钟。
Kafka是一种高效的分布式消息队列系统,近年来在大数据处理和实时数据流处理领域得到了广泛应用。本文将从基础到实践,详细介绍Kafka的核心概念、安装配置以及实际应用案例。
Kafka集群由多个服务器组成,每个服务器称为Broker。这些Broker通过 replication机制确保数据的高可用性和可靠性。默认情况下,Kafka Broker服务器会监听特定端口(如9092),接收来自生产者的消息请求。
Topic是Kafka中组织和管理消息的核心概念。每条发布到Kafka集群的消息都有一个类别(Topic),逻辑上不同Topic的消息分开存储,物理上可能分布在多个Broker上。通过指定Topic,生产者和消费者可以只关注自己感兴趣的消息。
Partition是物理上的概念,每个Topic可以包含多个Partition。Kafka根据均衡策略将消息分配到不同的Partition中。每个Partition中的消息按顺序存储,最新的消息会最后被消费。
生产者是向Kafka Broker发送消息的程序。生产者负责将数据发布到指定的Topic中,可以选择指定目标Partition或让Kafka自动均衡消息分布。
消费者是向Kafka Broker读取消息的程序。消费者可以通过订阅特定的Topic和Partition,拉取最新的消息进行处理。Kafka支持多个消费者同时消费同一Topic的消息,每个消费者属于一个特定的Consumer Group。
Consumer Group是一个逻辑概念,每个消费者都属于一个特定的Consumer Group。通过为消费者指定Group ID,可以实现多个消费者协同工作,共同消费消息流。
Kafka主要提供三种核心功能:
发布与订阅消息流
Kafka允许生产者发布消息流,消费者订阅特定Topic的消息流,这是消息队列的典型功能。存储消息流的容错机制
Kafka以文件的方式存储消息流,支持多次复制(replication),确保数据的高可用性和持久性。实时数据流处理
Kafka支持在消息发布时就进行处理,这使得它成为构建实时数据流管道的理想工具。构建实时数据流管道
Kafka常用于将系统或应用程序之间的数据通过实时流管道传输,确保数据的高效传输和可靠性。构建实时数据处理程序
Kafka可以用于构建处理实时数据流的程序,例如数据分析、转换或触发事件处理。支持大规模的发布-订阅场景
Kafka的分区机制使其能够处理非常大的消息量,适用于分布式系统中的数据同步和异步通信。Kafka的消息传输过程如下:
Kafka的存储策略基于分区(Partition)实现:
从Kafka官网下载最新版本的安装包。选择二进制版本(tgz文件),例如版本0.11.0.1。
/home
)。cd /path/to/kafkabin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
public static void main(String[] args) throws InterruptedException { Properties props = new Properties(); props.put("bootstrap.servers", "192.168.180.128:9092"); AdminClient adminClient = AdminClient.create(props); ArrayListtopics = new ArrayList<>(); NewTopic newTopic = new NewTopic("topic-test", 1, (short) 1); topics.add(newTopic); CreateTopicsResult result = adminClient.createTopics(topics); result.all().get();}
public static void main(String[] args) throws ExecutionException, InterruptedException { Properties props = new Properties(); props.put("bootstrap.servers", "192.168.180.128:9092"); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producerproducer = new KafkaProducer<>(props); for (int i = 0; i < 100; i++) { producer.send(new ProducerRecord<>("topic-test", Integer.toString(i), Integer.toString(i))); } producer.close();}
public static void main(String[] args) throws InterruptedException { Properties props = new Properties(); props.put("bootstrap.servers", "192.168.180.128:9092"); props.put("group.id", "test"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumerconsumer = new KafkaConsumer<>(props); consumer.subscribe(Collections.singletonList("topic-test"), new ConsumerRebalanceListener() { @Override public void onPartitionsRevoked(Collection collection) { } @Override public void onPartitionsAssigned(Collection collection) { consumer.seekToBeginning(collection); } }); while (true) { ConsumerRecords records = consumer.poll(100); for (ConsumerRecord record : records) { System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } }}
在Spring配置文件中添加以下依赖:
org.apache.kafka kafka-clients 0.11.0.1 org.apache.kafka kafka-streams 0.11.0.1 org.springframework.kafka spring-kafka 1.3.0.RELEASE
@Configuration@EnableKafkapublic class KafkaConfig { @Bean public KafkaAdmin admin() { Mapconfigs = new HashMap<>(); configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.180.128:9092"); return new KafkaAdmin(configs); } @Bean public NewTopic topic1() { return new NewTopic("foo", 10, (short) 2); } @Bean public ProducerFactory producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs()); } @Bean public Map producerConfigs() { Map props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.180.128:9092"); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); return props; } @Bean public KafkaTemplate kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); }}
@Configurationpublic class KafkaConfig { @Bean public SimpleConsumerListener simpleConsumerListener() { return new SimpleConsumerListener(); }}@Componentpublic class SimpleConsumerListener { private static final Logger logger = LoggerFactory.getLogger(SimpleConsumerListener.class); private final CountDownLatch latch1 = new CountDownLatch(1); @KafkaListener(id = "foo", topics = "topic-test") public void listen(byte[] records) { logger.info("接收到消息:" + Arrays.toString(records)); this.latch1.countDown(); }}
本文详细介绍了Kafka的核心概念、安装配置以及实际应用案例,涵盖了从基础到实践的全过程。如果需要更深入的学习,可以参考Kafka的官方文档或相关技术博客。
转载地址:http://tyqfk.baihongyu.com/