Kafka

什么是kafka

Kafka是由Apache软件基金会开发的 一个开源流处理平台,由Scala语言编写。该项目的目标是为处理实时数据提供一个统一、高吞吐、低延迟的平台。Kafka最初是由LinkedIn开发,2011年开源

kafka集群结构

屏幕截图 2023-02-19 162528

  • broker:节点,集群有多大,broker就有多少,要是整数型且唯一

  • topic:主题,kafka面向主题存储,分门别类的存储数据

  • partition:分区,可以应对更多的数据,数据量允许超过单节点kafka的大小

  • leader:负责日常的读写操作

  • follower:负责备份leader的数据,如果leader,会从follower中选出新的leader,继续工作

image-20230218153232511
消费者组:可以提高消费者性能,只要消费的组id是同一个,那么就是一个组的,一个组的消费者实例不允许多余分区的个数
屏幕截图 2023-02-19 163557

kafka的特征和优势

  • Kafka作为消息队列,他和其他同类产品相比,突出的特点就是性能强大

  • kafka将消息保存在硬盘中

  • kafka对硬盘的读取规则进行优化后,效率能够接近内存

  • 硬盘的优化规则主要依靠:顺序读写、零拷贝、日志压缩等技术

  • kafka的性能是恒定的,和数据的大小无关

  • kafka处理队列中的数据默认设置:

1
2
kafka默认队列中的信息保存7天,可以配置这个时间
kafka队列信息能够一直向硬盘中保存(理论上没有大小限制)

kafka的安装和配置

必须将我们的kafka软件的解压位置设置在一个根目录,文件夹名称尽量短,然后路径不要有中文和空格(新建文件夹kafka , 在kafka 中存放解压后的kafka , 在D:\kafka 文件夹中创建 data)

还需创建一个data文件夹:用于保存kafka启动后,在运行过程中产生的数据
屏幕截图 2023-02-19 165804
打开config目录下的server.properties文件,修改

1
log.dirs=D:/kafka/data

image
打开config目录下的zookeeper.properties文件,修改

1
dataDir=D:/kafka/data

启动kafka

image
首先进入D:\kafka\kafka_2.13-2.4.1\bin\windows目录

打开dos窗口

启动zookeeper

1
zookeeper-server-start.bat ..\..\config\zookeeper.properties

启动kafka

1
kafka-server-start.bat ..\..\config\server.properties

演示kafka

启动的kafka和zookeeper的窗口不要关

我们 csmall-cart-webapi中测试

添加依赖

1
2
3
4
5
6
7
8
9
10
<!--使用kafka依赖-->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!--能够将java对象和json字符串相互转换的依赖-->
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>

配置

1
2
3
4
5
6
spring:
kafka:
bootstrap-servers: localhost:9092 #配置kafka服务器的位置
# consumer.group-id是比不配置的信息,是组名
consumer:
group-id: csmall

在SpringBoot启动类中添加启动Kafka的注解

1
2
3
4
5
6
7
8
9
10
11
12
13
//启动支持kafka
@EnableKafka
//为了测试kafka发送和接受消息的效果,利用SpringBoot自带的任务调度工具,周期的发送消息到kafka
@EnableScheduling
@SpringBootApplication
@EnableDubbo
public class CsmallCartWebapiApplication {

public static void main(String[] args) {
SpringApplication.run(CsmallCartWebapiApplication.class, args);
}

}

下面我们就可以实现周期性的向kafka发送消息并接收的操作了

编写消息的发送

cart-webapi包下创建kafka包

包中创建Producer类来发送消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package cn.tedu.csmall.cart.webapi.kafka;

import cn.tedu.csmall.commons.pojo.cart.model.CartTbl;
import com.google.gson.Gson;
import org.apache.commons.lang.math.RandomUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

//周期性的向kafka发送消息
//Producer实例需要交给Spring容器管理
@Component
public class Producer {
//KafkaTemplate<[主题的类型],[发送消息的类型]>
@Autowired
private KafkaTemplate<String,String> kafkaTemplate;

int i=1;
//设置每个10s中发送一次消息
@Scheduled(fixedRate = 10000)
public void sendMessage(){
//实例化一个CartTbl对象,当做消息发送
CartTbl cart = new CartTbl();
cart.setId(i++);
cart.setCommodityCode("PC100");
cart.setPrice(RandomUtils.nextInt(300)+50);
cart.setCount(RandomUtils.nextInt(10)+1);
cart.setUserId("UU100");
//将cart对象转成json格式的字符串
Gson gson = new Gson();
String json = gson.toJson(cart);
System.out.println("本次发送的消息为:"+json);
//执行发送
kafkaTemplate.send("myCart",json);
}
}

创建一个叫Consumer的类来接收消息创建一个叫Consumer的类来接收消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Component
public class Consumer {
//SpringKafka接受消息依靠的是一个监听机制
//SpringKafka提供了一个监听线程,实时关注kafka的消息往来
//如果有消息发送到指定的主题,就会自动调用下面的方法
@KafkaListener(topics = "myCart")
public void received(ConsumerRecord<String,String> record){
//ConsumerRecord<[主题类型],[消息类型]>
//record是消息本身
String json = record.value();
//将json格式转成指定的java对象
Gson gson = new Gson();
CartTbl cart = gson.fromJson(json, CartTbl.class);
System.out.println("接受到消息:"+cart);

}
}