KafkaTopic

Strimzi를 사용하면 Kubernetes 환경에서 Kafka Topic을 쉽고 효율적으로 관리할 수 있습니다. Strimzi의 Topic Operator를 통해 Kafka Topic을 Kubernetes의 Custom Resource로 선언적으로 생성, 수정, 삭제할 수 있으며, 이는 접근 권한 문제와 같은 다양한 문제를 해결하는 데 도움이 됩니다.

KafkaTopic 생성하기

먼저 Strimzi를 이용하여 KafkaTopic을 생성하기 위해서는 Kafka Cluster가 Kubernetes 환경에 Strimzi Cluster Operator로 이미 배포되어 있어야 합니다. Strimzi를 이용한 Kafka Cluster 구축은 아래 링크를 통해 진행할 수 있습니다.

 

Strimzi를 이용한 Kafka Cluster 배포

https://stdhsw.tistory.com/entry/Strimzi-2-Cluster-Operator%EB%A5%BC-%EC%9D%B4%EC%9A%A9%ED%95%9C-Kafka-Cluster-%EB%B0%B0%ED%8F%AC

 

my-topic.yml 파일 작성

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: my-topic
  namespace: kafka
  labels:
    strimzi.io/cluster: my-cluster
spec:
  partitions: 3
  replicas: 2
  config:
    retention.ms: 7200000
    segment.bytes: 1073741824

strimzi.io/cluster 라벨을 이용하여 my-cluster 이름의 카프카 클러스터에 my-topic을 생성합니다. spec에는 토픽의 세부 사항을 정의할 수 있습니다. 여기에는 파티션의 수(partitions), 각 파티션의 복제본 수(replicas)를 설정하였으며, 토픽에 적용할 추가적인 설정은 config에 기록하여 정의할 수 있습니다.

Strimzi에서는 Topic을 생성할 때 63글자 미만이며, 소문자와 hyphen을 이용하여 토픽명을 정의하는 것을 권장하고 있습니다. (특수문자, 공백, 기호는 사용하지 마세요)

 

 

Topic 생성

kubectl apply -f my-topic.yml

kafkaTopic 리소스를 생성하게 되면 my-cluster의 카프카 클러스터로 생성된 Entity Operator의 Topic Operator가 my-cluster 카프카에 토픽을 생성합니다.

만약 KafkaTopic 리소스가 아닌 다른 방법으로 토픽을 생성되었다면 ?

Kafka Streams 또는 auto.create.topics.enable 옵션으로 생성된 토픽은 기존 토픽명에 해시값이 붙어 "my-topic---c55e57fe2546a33f9e603caf57165db4072e827e"와 같은 이름으로 KafkaTopic 리소스가 생성됩니다. 이렇게 생성된 KafkaTopic리소스로도 해당 토픽을 관리할 수 있습니다.

 

kcat으로 Topic에 메시지 전송(Producer) 및 수신(Consumer)

Kafka 토픽이 성공적으로 생성된 후, 그 토픽이 정상적으로 동작하는지 확인하기 위해 kcat (이전 명칭: kafkacat)을 사용하여 메시지를 전송하고 수신하여 정상 동작을 확인합니다.

 

kafka-kcat.yml

apiVersion: v1
kind: Pod
metadata:
  name: kcat
  namespace: kafka
spec:
  containers:
    - name: kcat
      image: edenhill/kcat:1.7.1
      command: ["tail"]
      args: ["-f", "/dev/null"]
  terminationGracePeriodSeconds: 0

 

kcat 파드 생성

kubectl apply -f kafka-kcat.yml

 

kcat을 이용한 메시지 전송

# kcat으로 카프카 클러스터 확인
> kubectl exec -it -n kafka kcat -- kcat -L -b my-cluster-kafka-bootstrap:9092
Metadata for all topics (from broker -1: my-cluster-kafka-bootstrap:9092/bootstrap):
 4 brokers:
  broker 0 at my-cluster-pool-a-0.my-cluster-kafka-brokers.kafka.svc:9092
  broker 2 at my-cluster-pool-b-2.my-cluster-kafka-brokers.kafka.svc:9092
  broker 3 at my-cluster-pool-b-3.my-cluster-kafka-brokers.kafka.svc:9092
  broker 1 at my-cluster-pool-a-1.my-cluster-kafka-brokers.kafka.svc:9092 (controller)
 1 topics:
  topic "my-topic" with 3 partitions:
    partition 0, leader 1, replicas: 1,0, isrs: 1,0
    partition 1, leader 0, replicas: 0,2, isrs: 0,2
    partition 2, leader 2, replicas: 2,3, isrs: 2,3

# kcat으로 메시지 전송 메시지를 전부 입력하였다면 ctrl+d로 나옵니다.
> kubectl exec -it -n kafka kcat -- kcat -P -b my-cluster-kafka-bootstrap:9092 -t my-topic -K:
> 1:a
> 2:b
> 3:c
> 1:aa
> 2:bb
> 3:cc

 

kcat을 이용한 메시지 수신

# kcat을 이용하여 메시지는 수신합니다.
> kubectl exec -it -n kafka kcat -- kcat -C -b my-cluster-kafka-bootstrap:9092 -t my-topic
b
c
bb
cc
a
aa
% Reached end of topic my-topic [1] at offset 4
% Reached end of topic my-topic [2] at offset 2
% Reached end of topic my-topic [0] at offset 0

 

참고문서

https://strimzi.io/docs/operators/latest/deploying

+ Recent posts