Kafka and kafka-manager deployment
# Installing Kafka 3.1.0
# ZooKeeper cluster deployment
# Downloading and configuring Kafka
Runs on host: 192.168.14.21
- Environment setup
192.168.14.21 hdss14-21.host.com   # resolved through internal DNS in my setup (a hosts-file fallback is sketched below)
ZooKeeper runs as a cluster and one node is also deployed on this host, so localhost:2181 is used.
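If internal DNS is not available, a hosts entry on the broker works just as well. A minimal sketch (the hostname matches the listener configuration used further down):

```bash
# fallback when no internal DNS is in place (assumed single-broker host)
echo "192.168.14.21 hdss14-21.host.com" >> /etc/hosts
```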
- Download and install
```bash
wget -c https://archive.apache.org/dist/kafka/3.1.0/kafka_2.12-3.1.0.tgz
tar xf kafka_2.12-3.1.0.tgz -C /opt/
ln -s /opt/kafka_2.12-3.1.0/ /opt/kafka
mkdir /data/kafka/logs -p
cd /opt/kafka
```
- Edit the Kafka 3.1.0 configuration
vim config/server.properties
Make sure host-name resolution is in place first.
```
[root@hdss14-21 kafka]# grep -Ev "^$|#" config/server.properties
broker.id=0
listeners=PLAINTEXT://hdss14-21.host.com:9092
advertised.listeners=PLAINTEXT://hdss14-21.host.com:9092
num.network.threads=4
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka/logs
#dataDir=/data/kafka/data
num.partitions=4
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.flush.interval.messages=10000
log.flush.interval.ms=1000
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
max.request.size=33554432
message.max.bytes=33554432
[root@hdss14-21 kafka]# grep -Ev "^$|#" config/connect-distributed.properties   # bump max.request.size here
max.request.size=33554432
message.max.bytes=33554432
producer.max.request.size=33554432
```
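Once the broker is running, the stock kafka-configs.sh tool can dump its effective configuration, which is a handy way to confirm the settings above were picked up. A sketch; broker id 0 and the listener hostname come from the config shown here:

```bash
# describe all effective configs for broker 0 (run after the broker has started)
/opt/kafka/bin/kafka-configs.sh --bootstrap-server hdss14-21.host.com:9092 \
  --entity-type brokers --entity-name 0 --describe --all
```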
- Configure the JDK (Kafka needs a Java runtime on the PATH)
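The original notes do not spell this step out. A minimal sketch, assuming a JDK 8 unpacked to /usr/local/jdk1.8 (the same path referenced by the systemd unit below):

```bash
# assumption: JDK already unpacked to /usr/local/jdk1.8
cat >> /etc/profile <<'EOF'
export JAVA_HOME=/usr/local/jdk1.8
export PATH=$JAVA_HOME/bin:$PATH
EOF
source /etc/profile && java -version
```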
- Manual start
cd /opt/kafka && bin/kafka-server-start.sh -daemon config/server.properties
- Add a systemd unit
vi /usr/lib/systemd/system/kafka.service
```ini
[Unit]
Description=Kafka server (broker)
After=network.target zookeeper.service

[Service]
Type=simple
Environment="PATH=/usr/local/jdk1.8/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/root/bin"
User=root
Group=root
ExecStart=/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
ExecStop=/opt/kafka/bin/kafka-server-stop.sh
Restart=on-failure

[Install]
WantedBy=multi-user.target
```
- Start via systemd
systemctl enable kafka.service
systemctl start kafka.service
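A quick check that the broker came up cleanly (not part of the original notes):

```bash
systemctl status kafka.service --no-pager
ss -lntp | grep 9092                     # the PLAINTEXT listener should be bound
journalctl -u kafka.service -n 20 --no-pager
```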
# Installing kafka-manager 3.0.0.6
3.0.0.5 supports Kafka up to version 2.4.0
3.0.0.6 supports Kafka up to version 3.1.0
- Docker Hub image (the published image is fairly old)
Docker Hub image
# Building the image
scala-sbt base image
GitHub documentation for building the kafka-manager image
- Pull the build image
docker pull hseeberger/scala-sbt:11.0.14.1_1.6.2_2.12.15
Because of network restrictions, building on a server outside mainland China is recommended.
```dockerfile
FROM hseeberger/scala-sbt:11.0.14.1_1.6.2_2.12.15
ENV ZK_HOSTS=zookeeper.host.com:2181 \
    KM_VERSION=3.0.0.6
RUN cd /tmp && sleep 10 && \
    test -f ${KM_VERSION}.tar.gz || wget https://github.com/yahoo/CMAK/archive/refs/tags/${KM_VERSION}.tar.gz
RUN cd /tmp && \
    tar xf ${KM_VERSION}.tar.gz && \
    cd CMAK-${KM_VERSION} && \
    sbt clean dist && \
    unzip target/universal/cmak-${KM_VERSION}.zip -d / && \
    mv /cmak-${KM_VERSION} /kafka-manager-${KM_VERSION} && \
    rm -rf /tmp/CMAK-${KM_VERSION} /home/sbtuser/ /usr/share/{doc,sbt,scala}
WORKDIR /kafka-manager-${KM_VERSION}
EXPOSE 29000
ENTRYPOINT ["bin/cmak","-Dconfig.file=conf/application.conf","-Dhttp.port=29000"]
```
- Build the image
docker build -t kafka-manager:3.0.0.6 .
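The Deployment below pulls the image from a private Harbor registry, so tag and push the freshly built image first. A sketch; the registry path matches the image reference in the manifest:

```bash
docker tag kafka-manager:3.0.0.6 harbor.yfklife.cn/public/kafka-manager:3.0.0.6
docker login harbor.yfklife.cn
docker push harbor.yfklife.cn/public/kafka-manager:3.0.0.6
```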
# Deploying kafka-manager on K8s
- Create deployment.yml
mkdir /opt/application/kafka && cd /opt/application/kafka
vi kafka-manager-dp.yml
Note 💥💥: zookeeper.host.com must be resolvable (add a DNS record or hosts entry for it).
```yaml
kind: Deployment
apiVersion: extensions/v1beta1
metadata:
  name: kafka-manager
  namespace: devops
  labels:
    name: kafka-manager
spec:
  replicas: 1
  selector:
    matchLabels:
      name: kafka-manager
  template:
    metadata:
      labels:
        app: kafka-manager
        name: kafka-manager
    spec:
      containers:
      - name: kafka-manager
        image: harbor.yfklife.cn/public/kafka-manager:3.0.0.6
        ports:
        - containerPort: 29000
          protocol: TCP
        env:
        - name: ZK_HOSTS
          value: zookeeper.host.com:2181
        - name: APPLICATION_SECRET
          value: letmein
        imagePullPolicy: IfNotPresent
      imagePullSecrets:
      - name: harbor
      restartPolicy: Always
      terminationGracePeriodSeconds: 30
      securityContext:
        runAsUser: 0
      schedulerName: default-scheduler
  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxUnavailable: 1
      maxSurge: 1
  revisionHistoryLimit: 7
  progressDeadlineSeconds: 600
```
- Create service.yml
vi kafka-manager-svc.yml
```yaml
kind: Service
apiVersion: v1
metadata:
  name: kafka-manager
  namespace: devops
spec:
  ports:
  - protocol: TCP
    port: 29000
    targetPort: 29000
  selector:
    app: kafka-manager
```
- Create ingress.yml
vi kafka-manager-ingress.yml
```yaml
kind: Ingress
apiVersion: extensions/v1beta1
metadata:
  name: kafka-manager
  namespace: devops
spec:
  rules:
  - host: kafka-manager.yfklife.cn
    http:
      paths:
      - path: /
        backend:
          serviceName: kafka-manager
          servicePort: 29000
```
- Apply the manifests, add a DNS record, and access the UI
kubectl apply -f kafka-manager-dp.yml
kubectl apply -f kafka-manager-ingress.yml
kubectl apply -f kafka-manager-svc.yml
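A quick check that everything rolled out (not in the original notes):

```bash
kubectl -n devops get deploy,pod,svc,ingress | grep kafka-manager
kubectl -n devops logs deploy/kafka-manager --tail=20
```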
Access: http://kafka-manager.yfklife.cn
# Adding a Kafka cluster in kafka-manager
Make sure zookeeper.host.com resolves, adjust the default values as prompted, then Save.
# kafka-manager issue (KeeperErrorCode = Unimplemented for /kafka-manager/mutex)
GitHub issue
Workaround: log in to ZooKeeper with zkCli and create the missing znodes by hand.
```
# Versions:
#   zookeeper: 3.4.14
#   wurstmeister/kafka_2.12-3.1.0
#   kafkamanager/kafka-manager:3.0.0.6
[root@hdss14-21 kafka]# /opt/zookeeper/bin/zkCli.sh
................ output omitted ........................
WATCHER::
WatchedEvent state:SyncConnected type:None path:null
[zk: localhost:2181(CONNECTED) 0] ls /kafka-manager
[configs, deleteClusters, clusters]
[zk: localhost:2181(CONNECTED) 1] create /kafka-manager/mutex ""
Created /kafka-manager/mutex
[zk: localhost:2181(CONNECTED) 2] create /kafka-manager/mutex/locks ""
Created /kafka-manager/mutex/locks
[zk: localhost:2181(CONNECTED) 3] create /kafka-manager/mutex/leases ""
Created /kafka-manager/mutex/leases
```
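An optional follow-up to confirm the znodes exist (expected children: locks and leases):

```bash
# zkCli.sh can also run a single command non-interactively
/opt/zookeeper/bin/zkCli.sh -server localhost:2181 ls /kafka-manager/mutex
```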
- Check the cluster list
# Kafka study notes
# logstash
- Configure Logstash to ship log files into Kafka
vi www-to-kafka.conf
```
input {
  file {
    path => "/opt/logs/stat/deviveactive/stat*.log"
    type => "logs-deviveactive"
    start_position => "beginning"    # where to start reading the file
  }
  file {
    path => "/opt/logs/stat/gamerole/stat*.log"
    type => "logs-gamerole"
    start_position => "beginning"    # where to start reading the file
  }
}
filter {
  mutate {
    remove_field => "@version"
    remove_field => "@timestamp"
    remove_field => "host"
    remove_field => "path"
  }
}
output {
  if [type] == "logs-deviveactive" {
    kafka {
      bootstrap_servers => "172.16.255.17:9092"    # Kafka broker the producer writes to
      topic_id => "deviveactive"                   # Kafka topic to write to
      compression_type => "snappy"                 # compression: none (default), gzip or snappy
      codec => plain { format => "%{message}" }
    }
  }
  if [type] == "logs-gamerole" {
    kafka {
      bootstrap_servers => "172.16.255.17:9092"    # Kafka broker the producer writes to
      topic_id => "gamerole"                       # Kafka topic to write to
      compression_type => "snappy"                 # compression: none (default), gzip or snappy
      codec => plain { format => "%{message}" }
    }
  }
}
```
- Validate the Logstash config file
bin/logstash -f /usr/local/logstash/config/www-to-kafka.conf --config.test_and_exit
- Start Logstash in the background
bin/logstash -f /usr/local/logstash/config/www-to-kafka.conf &
# Kafka commands
- List topics
./bin/kafka-topics.sh --bootstrap-server 172.16.255.17:9092 --list
- Create topics
./bin/kafka-topics.sh --create --bootstrap-server 172.16.255.17:9092 --replication-factor 1 --partitions 4 --topic logs-deviveactive
./bin/kafka-topics.sh --create --bootstrap-server 172.16.255.17:9092 --replication-factor 1 --partitions 4 --topic logs-gamerole
- Verify data is being written
./bin/kafka-console-consumer.sh --bootstrap-server 172.16.255.17:9092 --topic logs-deviveactive --from-beginning
./bin/kafka-console-consumer.sh --bootstrap-server 172.16.255.17:9092 --topic logs-gamerole --from-beginning
- Check a topic's latest offsets
/opt/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --topic logs-deviveactive --time -1 --broker-list 172.16.255.17:9092
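Here --time -1 returns each partition's latest offset; -2 returns the earliest, which is useful for gauging how much data is retained:

```bash
/opt/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --topic logs-deviveactive --time -2 --broker-list 172.16.255.17:9092
```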
- List consumer groups
./kafka-consumer-groups.sh --bootstrap-server 172.16.255.17:9092 --list
- Describe a group's per-topic offsets
./kafka-consumer-groups.sh --bootstrap-server 172.16.255.17:9092 --group logs-deviveactive --describe
- Reset offsets to the earliest (or latest) position
./kafka-consumer-groups.sh --bootstrap-server 172.16.255.17:9092 --group gamerole --topic logs-deviveactive --reset-offsets --to-earliest --execute
./kafka-consumer-groups.sh --bootstrap-server 172.16.255.17:9092 --group gamerole --topic gamerole --reset-offsets --to-latest --execute
- Reset to an arbitrary offset, e.g. 300
./kafka-consumer-groups.sh --bootstrap-server 172.16.255.17:9092 --group gamerole --topic logs-deviveactive --reset-offsets --to-offset 300 --execute
- Show help
./bin/kafka-consumer-groups.sh
- Show topic configuration
./bin/kafka-configs.sh --bootstrap-server 172.16.255.17:9092 --topic logs-deviveactive --all --describe
- Modify topic configuration
./bin/kafka-configs.sh --bootstrap-server 172.16.255.17:9092 --topic logs-deviveactive --alter --add-config max.message.bytes=2048588
# Personal storage download links
3.0.0.6.tar.gz
docker_images:kafka-manager-3.0.0.6.tar.gz