Cruise Control

Strimzi는 LinkedIn에서 개발한 Cruise Control을 지원합니다. Cruise Control은 Kafka 클러스터의 성능 모니터링, 로드 밸런싱, 스케일링 및 장애 복구 등을 자동화하는 도구로, Kafka 클러스터의 운영을 크게 단순화하고 최적화합니다. Strimzi를 사용하면 Cruise Control을 Kafka 클러스터와 쉽게 통합하여 다음과 같은 기능을 활용할 수 있습니다.

로드 밸런싱
Cruise Control은 클러스터 전체의 워크로드 분포를 지속적으로 모니터링하고, 필요에 따라 파티션을 다시 분배하여 로드 밸런싱을 수행합니다. 이를 통해 리소스 사용률을 최적화하고, 클러스터의 처리량과 성능을 극대화할 수 있습니다.

Broker 추가 및 제거
클러스터의 워크로드 요구사항이 변경될 때, Cruise Control을 사용하면 Kafka 브로커를 자동으로 추가하거나 제거할 수 있습니다.

장애 복구
브로커 장애가 발생할 경우, Cruise Control은 자동으로 장애 복구 프로세스를 수행하여, 영향을 받은 파티션을 다른 브로커로 재할당합니다. 이는 클러스터의 가용성과 내구성을 유지하는 데 중요합니다.

 

위와 같이 Cruise Control에서는 여러가지 기능을 제공하지만 이번 문서 실습에서는 Broker의 추가 및 제거에 중점으로 실습을 진행하였습니다.

Cruise Control Goal

Cruise Control은 Kafka 클러스터의 최적화와 관리를 위해 다양한 목표(Goals)를 사용합니다. 목표는 goals, default.goals, hard.goals로 구분되어 있습니다. goals에서는 default.goals에서 사용할 목표를 정의합니다. default.goals은 Cruise Control의 작업 중에 기본적으로 사용되는 목표들의 집합을 정의하며 반드시 goals에 정의된 목표만 사용할 수 있습니다. hard.goals 설정은 반드시 만족해야 하는 목표들의 집합을 정의합니다. 이 목표들은 Cruise Control이 어떤 상황에서도 위반해서는 안 되는 필수 조건으로 작용합니다.

goals, default.goals, hard.goals 기본값

# goal 기본값
com.linkedin.kafka.cruisecontrol.analyzer.goals.RackAwareGoal
com.linkedin.kafka.cruisecontrol.analyzer.goals.RackAwareDistributionGoal
com.linkedin.kafka.cruisecontrol.analyzer.goals.MinTopicLeadersPerBrokerGoal
com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaCapacityGoal
com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskCapacityGoal
com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundCapacityGoal
com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundCapacityGoal
com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuCapacityGoal
com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaDistributionGoal
com.linkedin.kafka.cruisecontrol.analyzer.goals.PotentialNwOutGoal
com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskUsageDistributionGoal
com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundUsageDistributionGoal
com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundUsageDistributionGoal
com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuUsageDistributionGoal
com.linkedin.kafka.cruisecontrol.analyzer.goals.TopicReplicaDistributionGoal
com.linkedin.kafka.cruisecontrol.analyzer.goals.LeaderReplicaDistributionGoal
com.linkedin.kafka.cruisecontrol.analyzer.goals.LeaderBytesInDistributionGoal
com.linkedin.kafka.cruisecontrol.analyzer.kafkaassigner.KafkaAssignerDiskUsageDistributionGoal
com.linkedin.kafka.cruisecontrol.analyzer.kafkaassigner.KafkaAssignerEvenRackAwareGoal
com.linkedin.kafka.cruisecontrol.analyzer.goals.PreferredLeaderElectionGoal

# default.goal 기본값
com.linkedin.kafka.cruisecontrol.analyzer.goals.RackAwareGoal
com.linkedin.kafka.cruisecontrol.analyzer.goals.MinTopicLeadersPerBrokerGoal
com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaCapacityGoal
com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskCapacityGoal
com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundCapacityGoal
com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundCapacityGoal
com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuCapacityGoal
com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaDistributionGoal
com.linkedin.kafka.cruisecontrol.analyzer.goals.PotentialNwOutGoal
com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskUsageDistributionGoal
com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundUsageDistributionGoal
com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundUsageDistributionGoal
com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuUsageDistributionGoal
com.linkedin.kafka.cruisecontrol.analyzer.goals.TopicReplicaDistributionGoal
com.linkedin.kafka.cruisecontrol.analyzer.goals.LeaderReplicaDistributionGoal
com.linkedin.kafka.cruisecontrol.analyzer.goals.LeaderBytesInDistributionGoal

# hard.goal 기본값
com.linkedin.kafka.cruisecontrol.analyzer.goals.RackAwareGoal
com.linkedin.kafka.cruisecontrol.analyzer.goals.MinTopicLeadersPerBrokerGoal
com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaCapacityGoal
com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskCapacityGoal
com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundCapacityGoal
com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundCapacityGoal
com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuCapacityGoal

goal 목표의 의미

# 클러스터의 복제본이 랙 고장에 대해 내성을 가지도록 보장합니다.
com.linkedin.kafka.cruisecontrol.analyzer.goals.RackAwareGoal

# 랙을 고려하여 파티션의 복제본이 분산되도록 합니다.
com.linkedin.kafka.cruisecontrol.analyzer.goals.RackAwareDistributionGoal

# 브로커 당 최소한의 토픽 리더를 유지하여 리더 분포의 균형을 맞춥니다.
com.linkedin.kafka.cruisecontrol.analyzer.goals.MinTopicLeadersPerBrokerGoal

# 모든 브로커가 설정된 복제본 용량을 초과하지 않도록 합니다.
com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaCapacityGoal

# 모든 브로커의 디스크 사용량이 설정된 임계값을 넘지 않도록 합니다.
com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskCapacityGoal

# 네트워크 입력 용량의 임계값을 넘지 않도록 합니다.
com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundCapacityGoal

# 네트워크 출력 용량의 임계값을 넘지 않도록 합니다.
com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundCapacityGoal

# CPU 사용량이 설정된 임계값을 넘지 않도록 합니다.
com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuCapacityGoal

# 전체 클러스터에 걸쳐 복제본이 고르게 분포되도록 합니다.
com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaDistributionGoal

# 네트워크 출력 잠재력을 기준으로 최적화를 수행합니다.
com.linkedin.kafka.cruisecontrol.analyzer.goals.PotentialNwOutGoal

# 디스크 사용량이 브로커 간에 균일하게 분포되도록 합니다.
com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskUsageDistributionGoal

# 네트워크 입력 사용량이 브로커 간에 균일하게 분포되도록 합니다.
com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundUsageDistributionGoal

# 네트워크 출력 사용량이 브로커 간에 균일하게 분포되도록 합니다.
com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundUsageDistributionGoal

# CPU 사용량이 브로커 간에 균일하게 분포되도록 합니다.
com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuUsageDistributionGoal

# 특정 토픽의 복제본이 클러스터에 걸쳐 고르게 분포되도록 합니다.
com.linkedin.kafka.cruisecontrol.analyzer.goals.TopicReplicaDistributionGoal

# 리더 복제본이 브로커 간에 고르게 분포되도록 합니다.
com.linkedin.kafka.cruisecontrol.analyzer.goals.LeaderReplicaDistributionGoal

# 리더 복제본의 입력 데이터가 브로커 간에 고르게 분포되도록 합니다.
com.linkedin.kafka.cruisecontrol.analyzer.goals.LeaderBytesInDistributionGoal

# Kafka Assigner 도구와 유사한 방식으로 디스크 사용량을 최적화합니다.
com.linkedin.kafka.cruisecontrol.analyzer.kafkaassigner.KafkaAssignerDiskUsageDistributionGoal

# Kafka Assigner 도구와 유사한 방식으로 랙 인식 복제본 배치를 최적화합니다.
com.linkedin.kafka.cruisecontrol.analyzer.kafkaassigner.KafkaAssignerEvenRackAwareGoal

# 리더 복제본의 선출을 최적화하여 특정 조건에서 리더의 선출을 선호합니다.
com.linkedin.kafka.cruisecontrol.analyzer.goals.PreferredLeaderElectionGoal

Strimzi를 이용한 Cruise Control 배포

Strimzi에서는 Cruise Control를 배포하는 방법으로는 별도의 리소스를 정의하는 것이 아닌 kafkas.kafka.strimzi.io 리소스에 cruiseControl: {}를 정의하면 카프카 클러스터와 함께 Deployment 형식으로 배포됩니다.

---
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
  namespace: kafka
  annotations:
    strimzi.io/node-pools: enabled
spec:
  kafka:
    version: 3.6.1
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: false
    readinessProbe:
      initialDelaySeconds: 15
      timeoutSeconds: 5
    livenessProbe:
      initialDelaySeconds: 15
      timeoutSeconds: 5
    config:
      default.replication.factor: 2
      min.insync.replicas: 1
      inter.broker.protocol.version: "3.6"
      num.partitions: 3
    resources:
      limits:
        cpu: 1
        memory: 1Gi
      requests:
        cpu: 500m
        memory: 1Gi
    logging:
      type: inline
      loggers:
        kafka.root.logger.level: INFO
        kafka.request.logger.level: INFO
  zookeeper:
    replicas: 3
    storage:
      type: persistent-claim
      size: 1Gi
      deleteClaim: true
  entityOperator:
    topicOperator: {}
    userOperator: {}
  cruiseControl:
    config:
      default.goals: >
        com.linkedin.kafka.cruisecontrol.analyzer.goals.MinTopicLeadersPerBrokerGoal,
        com.linkedin.kafka.cruisecontrol.analyzer.goals.TopicReplicaDistributionGoal,
        com.linkedin.kafka.cruisecontrol.analyzer.goals.LeaderReplicaDistributionGoal,
        com.linkedin.kafka.cruisecontrol.analyzer.goals.LeaderBytesInDistributionGoal
      hard.goals: >
        com.linkedin.kafka.cruisecontrol.analyzer.goals.MinTopicLeadersPerBrokerGoal

Kafka 리소스를 생성하게 되면 zookeeper, kafka와는 다르게 Cruise Control은 Deployment 리소스로 배포됩니다.

KafkaRebalance

Strimzi에서 최적화 작업을 제안하기 위해서는 KafkaRebalance라는 리소스를 생성하는 것으로 제안을 할 수 있습니다. kafkaRebalance는 Cruise Control에 의해 제안을 확인하고 해당 제안을 적용하게 됩니다. KafkaRebalance로 제안할 수 있는 종류는 다음과 같습니다.

full 기본값

Cruise Control에 정의된 goal을 기반으로 클러스터의 모든 브로커에 rebalance를 수행합니다. KafkaRebalance의 기본값으로 KafkaRebalance에 아무런 정보를 기입하지 않으면 full 모드로 동작하지만 사용자는 KafkaRebalance 리소스를 생성할 때 mode 필드를 통해 리밸런싱의 모드를 직접 지정할 수 있습니다. full 모드를 명시적으로 지정하면, Cruise Control은 클러스터 전체에 대한 종합적인 리밸런싱 작업을 수행할 수 있습니다.

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaRebalance
metadata:
  name: my-rebalance
  namespace: kafka
  labels:
    strimzi.io/cluster: my-cluster
spec:
  mode: full

 

add-brokers

add-brokers 옵션은 Kafka 클러스터에 브로커를 추가할 때 사용되며, Cruise Control을 통해 이러한 작업을 자동화하고 최적화할 수 있습니다. Kafka 클러스터의 확장 과정에서 단순히 새로운 브로커를 추가하는 것만으로는 기존에 생성된 토픽의 파티션 복제본이 자동으로 새 브로커로 재할당되지 않습니다. 이 경우, 새 브로커는 앞으로 생성될 토픽의 파티션에만 사용될 수 있습니다.

add-brokers 옵션을 사용하여 Kafka 클러스터에 새로 추가된 브로커들에게 기존 토픽의 파티션 복제본을 자동으로 할당할 수 있습니다. 이는 클러스터의 리소스 사용을 최적화하고, 전체적인 부하 분산을 개선하는 데 도움이 됩니다. add-brokers 작업을 통해 새 브로커는 기존 토픽의 파티션 복제본을 할당받게 되며, 이는 클러스터의 부하를 보다 균등하게 분산시키고, 특정 브로커에 과부하가 집중되는 것을 방지합니다. 

파티션 복제본의 재할당 과정은 클러스터의 크기와 상태, 네트워크 속도 등에 따라 시간이 다소 소요될 수 있습니다.
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaRebalance
metadata:
  name: my-rebalance
  namespace: kafka
  labels:
    strimzi.io/cluster: my-cluster
spec:
  mode: add-brokers
  brokers: [2, 3]

remove-brokers

Kafka Cluster를 축소학 위해 Broker를 제거할 때 사용됩니다. KafkaRebalance를 사용하지 않고 브로커를 제거할 경우 파티션의 메시지가 정상적으로 Replication 되지 않고 Broker가 드랍되어 메시지 손실이 발생할 수 있습니다. 이러한 문제를 방지하기 위해 remove-broker를 사용하여 파티션을 이동하고 안전하게 Broker를 제거할 수 있습니다. remove-brokers 명령을 실행할 때, 제거하려는 브로커의 ID를 지정합니다. Cruise Control은 이 정보를 바탕으로 해당 브로커에 할당된 파티션 복제본을 다른 브로커로 이동시키는 최적의 계획을 생성하고 실행합니다.

브로커를 제거하는 과정에서 파티션의 리밸런싱이 발생합니다. 이 과정은 네트워크 트래픽과 I/O 부하를 증가시킬 수 있으므로, 클러스터의 성능에 일시적인 영향을 줄 수 있습니다.
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaRebalance
metadata:
  name: my-rebalance
  namespace: kafka
  labels:
    strimzi.io/cluster: my-cluster
spec:
  mode: remove-brokers
  brokers: [2, 3]

KafkaRebalance 수락하기

KafkaRebalance 리소스를 통해 Cruise Control을 사용하여 Kafka 클러스터의 리밸런싱을 관리할 때, 작업의 상태는 여러 단계를 거칩니다. 이 과정은 사용자가 제안된 리밸런싱 계획을 검토하고 사용자가 승인하는 방식으로 리밸런싱을 수행하는 방식으로 클러스터의 변경 사항을 보다 안전하게 관리할 수 있습니다.

# 리소스 상태 확인
> kubectl get kafkarebalances.kafka.strimzi.io -n kafka
NAME           CLUSTER    PENDINGPROPOSAL  PROPOSALREADY   REBALANCING   READY   NOTREADY
my-rebalance   my-cluster                                  true

 

KafkaRebalance 리소스 상태

PENDINGPROPOSAL
KafkaRebalance 리소스가 생성된 직후, Cruise Control이 리밸런싱 작업의 계획을 작성하고 있는 단계를 나타냅니다.

PROPOSALREADY
작업 계획이 완료되었고 사용자의 검토를 기다리고 있는 상태입니다.

REBALANCING
사용자가 리밸런싱 계획을 승인한 후, Cruise Control이 실제로 리밸런싱 작업을 수행하고 있는 상태입니다.

READY
리밸런싱 작업이 성공적으로 완료되어, KafkaRebalance 리소스가 최종 상태에 도달한 것입니다.

NOTREADY
리밸런싱 작업에 문제가 발생하여 계획이 성공적으로 수행되지 않았음을 나타내는 상태입니다. 이 경우, 문제의 원인을 분석하고 해결한 후 리밸런싱을 다시 시도해야 할 수 있습니다.

 

KafkaRebalance 리소스가 정상적으로 생성되었다면 describe를 이용하여 작업의 계획을 확인할 수 있습니다.

# 작업 계획 확인
> kubectl describe kafkarebalances.kafka.strimzi.io -n kafka my-rebalance
Name:         my-rebalance
Namespace:    kafka
Labels:       strimzi.io/cluster=my-cluster
Annotations:  <none>
API Version:  kafka.strimzi.io/v1beta2
Kind:         KafkaRebalance
Metadata:
  Creation Timestamp:  2024-03-17T06:48:41Z
  Generation:          3
  Resource Version:    179034
  UID:                 00ff25be-fb27-4f99-a9bb-2722560dfddd
Spec:
  Goals:
    LeaderReplicaDistributionGoal
    TopicReplicaDistributionGoal
  Skip Hard Goal Check:  true
Status:
  Conditions:
    Last Transition Time:  2024-03-17T07:01:35.511905341Z
    Status:                True
    Type:                  ProposalReady
  Observed Generation:     3
  Optimization Result:
    After Before Load Config Map:  my-rebalance
    Data To Move MB:               0
    Excluded Brokers For Leadership:
    Excluded Brokers For Replica Move:
    Excluded Topics:
    Intra Broker Data To Move MB:         0
    Monitored Partitions Percentage:      100
    Num Intra Broker Replica Movements:   0
    Num Leader Movements:                 0
    Num Replica Movements:                0
    On Demand Balancedness Score After:   100
    On Demand Balancedness Score Before:  100
    Provision Recommendation:
    Provision Status:                     UNDECIDED
    Recent Windows:                       1
  Session Id:                             b31d2057-273f-4666-9565-51ce304e95db
Events:                                   <none>

 

작업의 계획을 확인하고 annotate 주석을 이용하여 작업을 승인합니다.

# 작업 승인
> kubectl annotate kafkarebalance -n kafka my-rebalance strimzi.io/rebalance="approve"

# 작업을 중지할 때 stop 수행
# kubectl annotate kafkarebalance -n kafka --overwrite my-rebalance strimzi.io/rebalance="stop"

# 완료된 작업을 나중에 재사용할 때 refresh 수행 후 approve
# kubectl annotate kafkarebalance -n kafka --overwrite my-rebalance strimzi.io/rebalance="refresh"
# kubectl annotate kafkarebalance -n kafka --overwrite my-rebalance strimzi.io/rebalance="approve"

 

 


Cruise Control과 KafkaRebalance 실습

지금까지 Cruise Control과 KafkaRebalance에 대해서 알아봤습니다. 이제는 Broker 추가 및 제거하는 방법으로 직접 실습을 진행하겠습니다.

 

Cluster 구성

실습 진행을 위해 테스트용 클러스터를 구축합니다. 이번 실습에서는 별다른 설정 없이 cruiseControl 기본 값 만으로 테스트를 진행하였습니다.

---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
  name: pool-a
  namespace: kafka
  labels:
    strimzi.io/cluster: my-cluster
spec:
  replicas: 2
  roles:
    - broker
  storage:
    type: jbod
    volumes:
      - id: 0
        type: persistent-claim
        size: 5Gi
        deleteClaim: true
        
---
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
  namespace: kafka
  annotations:
    strimzi.io/node-pools: enabled
spec:
  kafka:
    version: 3.6.1
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: false
    readinessProbe:
      initialDelaySeconds: 15
      timeoutSeconds: 5
    livenessProbe:
      initialDelaySeconds: 15
      timeoutSeconds: 5
    config:
      default.replication.factor: 2
      min.insync.replicas: 1
      inter.broker.protocol.version: "3.6"
      num.partitions: 3
    resources:
      limits:
        cpu: 1
        memory: 1Gi
      requests:
        cpu: 500m
        memory: 1Gi
    logging:
      type: inline
      loggers:
        kafka.root.logger.level: INFO
        kafka.request.logger.level: INFO
  zookeeper:
    replicas: 3
    storage:
      type: persistent-claim
      size: 1Gi
      deleteClaim: true
  entityOperator:
    topicOperator: {}
    userOperator: {}
  cruiseControl:
    {}

 

Topic 생성

토픽 a-topic, b-topic을 생성하고 나서 토픽 파티션의 할당된 브로커를 확인하면 0, 1번 브로커에 배포된 것을 확인할 수 있습니다.

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: a-topic
  namespace: kafka
  labels:
    strimzi.io/cluster: my-cluster
spec:
  partitions: 4
  replicas: 2
  config:
    retention.ms: 7200000
    segment.bytes: 1073741824
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: b-topic
  namespace: kafka
  labels:
    strimzi.io/cluster: my-cluster
spec:
  partitions: 4
  replicas: 2
  config:
    retention.ms: 7200000
    segment.bytes: 1073741824

 

kcat 파드를 이용하여 토픽 생성 결과 확인

> ❯ kubectl exec -it -n kafka kcat -- kcat -L -b my-cluster-kafka-bootstrap:9092
Metadata for all topics (from broker -1: my-cluster-kafka-bootstrap:9092/bootstrap):
 2 brokers:
  broker 0 at my-cluster-pool-a-0.my-cluster-kafka-brokers.kafka.svc:9092
  broker 1 at my-cluster-pool-a-1.my-cluster-kafka-brokers.kafka.svc:9092 (controller)
 5 topics:
  ( ... 생략 ... )
  topic "a-topic" with 4 partitions:
    partition 0, leader 0, replicas: 0,1, isrs: 0,1
    partition 1, leader 1, replicas: 1,0, isrs: 1,0
    partition 2, leader 0, replicas: 0,1, isrs: 0,1
    partition 3, leader 1, replicas: 1,0, isrs: 1,0
  topic "b-topic" with 4 partitions:
    partition 0, leader 1, replicas: 1,0, isrs: 1,0
    partition 1, leader 0, replicas: 0,1, isrs: 0,1
    partition 2, leader 1, replicas: 1,0, isrs: 1,0
    partition 3, leader 0, replicas: 0,1, isrs: 0,1

 

Broker 확장

KafkaNodePool의 개수를 증가 시켜 브로커를 추가합니다.

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
  name: pool-a
  namespace: kafka
  labels:
    strimzi.io/cluster: my-cluster
spec:
  replicas: 4
  roles:
    - broker
  storage:
    type: jbod
    volumes:
      - id: 0
        type: persistent-claim
        size: 5Gi
        deleteClaim: true

 

브로커를 추가하여 파드가 생성되었지만 카프카 클러스터를 확인하면 추가된 브로커는 클러스터에 합류하지 못한 것을 알 수 있습니다.

 

kcat을 이용하여 확인하면 클러스터에 변화가 없는 것을 알 수 있습니다.

> kubectl exec -it -n kafka kcat -- kcat -L -b my-cluster-kafka-bootstrap:9092
Metadata for all topics (from broker -1: my-cluster-kafka-bootstrap:9092/bootstrap):
 2 brokers:
  broker 0 at my-cluster-pool-a-0.my-cluster-kafka-brokers.kafka.svc:9092
  broker 1 at my-cluster-pool-a-1.my-cluster-kafka-brokers.kafka.svc:9092 (controller)
 5 topics:
  ( ... 생략 ... )
  topic "a-topic" with 4 partitions:
    partition 0, leader 0, replicas: 0,1, isrs: 0,1
    partition 1, leader 1, replicas: 1,0, isrs: 1,0
    partition 2, leader 0, replicas: 0,1, isrs: 0,1
    partition 3, leader 1, replicas: 1,0, isrs: 1,0
  topic "b-topic" with 4 partitions:
    partition 0, leader 1, replicas: 1,0, isrs: 1,0
    partition 1, leader 0, replicas: 0,1, isrs: 0,1
    partition 2, leader 1, replicas: 1,0, isrs: 1,0
    partition 3, leader 0, replicas: 0,1, isrs: 0,1

 

KafkaRebalance add-brokers

2, 3번 브로커를 추가합니다.

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaRebalance
metadata:
  name: rebalance-add
  namespace: kafka
  labels:
    strimzi.io/cluster: my-cluster
spec:
  mode: add-brokers
  brokers: [2, 3]

 

KafkaRebalance 리소스 상태 확인

 

작업 계획 확인

 

작업 승인

 

결과 확인

> kubectl exec -it -n kafka kcat -- kcat -L -b my-cluster-kafka-bootstrap:9092
Metadata for all topics (from broker -1: my-cluster-kafka-bootstrap:9092/bootstrap):
 4 brokers:
  broker 0 at my-cluster-pool-a-0.my-cluster-kafka-brokers.kafka.svc:9092
  broker 2 at my-cluster-pool-a-2.my-cluster-kafka-brokers.kafka.svc:9092
  broker 3 at my-cluster-pool-a-3.my-cluster-kafka-brokers.kafka.svc:9092
  broker 1 at my-cluster-pool-a-1.my-cluster-kafka-brokers.kafka.svc:9092 (controller)
 5 topics:
 (... 생략 ...)
  topic "a-topic" with 4 partitions:
    partition 0, leader 3, replicas: 3,2, isrs: 2,3
    partition 1, leader 3, replicas: 3,2, isrs: 2,3
    partition 2, leader 3, replicas: 3,2, isrs: 2,3
    partition 3, leader 2, replicas: 2,3, isrs: 3,2
  topic "b-topic" with 4 partitions:
    partition 0, leader 1, replicas: 1,0, isrs: 1,0
    partition 1, leader 3, replicas: 3,2, isrs: 2,3
    partition 2, leader 3, replicas: 3,2, isrs: 2,3
    partition 3, leader 0, replicas: 0,1, isrs: 0,1

결과를 확인하면 기존 토픽의 파티션이 새롭게 추가된 브로커에도 할당된 것을 확인할 수 있습니다.

 

KafkaRebalance remove-brokers 생성

이제는 반대로 추가한 브로커를 제거하는 방법을 수행합니다.

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaRebalance
metadata:
  name: rebalance-remove
  namespace: kafka
  labels:
    strimzi.io/cluster: my-cluster
spec:
  mode: remove-brokers
  brokers: [2, 3]

 

remove-brokers 작업 수행

 

KafkaNodePool의 개수를 줄여 자원 반납

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
  name: pool-a
  namespace: kafka
  labels:
    strimzi.io/cluster: my-cluster
spec:
  replicas: 2
  roles:
    - broker
  storage:
    type: jbod
    volumes:
      - id: 0
        type: persistent-claim
        size: 5Gi
        deleteClaim: true

 

참고문서

https://strimzi.io/docs/operators/latest/deploying#cruise-control-concepts-str

KafkaTopic

Strimzi를 사용하면 Kubernetes 환경에서 Kafka Topic을 쉽고 효율적으로 관리할 수 있습니다. Strimzi의 Topic Operator를 통해 Kafka Topic을 Kubernetes의 Custom Resource로 선언적으로 생성, 수정, 삭제할 수 있으며, 이는 접근 권한 문제와 같은 다양한 문제를 해결하는 데 도움이 됩니다.

KafkaTopic 생성하기

먼저 Strimzi를 이용하여 KafkaTopic을 생성하기 위해서는 Kafka Cluster가 Kubernetes 환경에 Strimzi Cluster Operator로 이미 배포되어 있어야 합니다. Strimzi를 이용한 Kafka Cluster 구축은 아래 링크를 통해 진행할 수 있습니다.

 

Strimzi를 이용한 Kafka Cluster 배포

https://stdhsw.tistory.com/entry/Strimzi-2-Cluster-Operator%EB%A5%BC-%EC%9D%B4%EC%9A%A9%ED%95%9C-Kafka-Cluster-%EB%B0%B0%ED%8F%AC

 

my-topic.yml 파일 작성

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: my-topic
  namespace: kafka
  labels:
    strimzi.io/cluster: my-cluster
spec:
  partitions: 3
  replicas: 2
  config:
    retention.ms: 7200000
    segment.bytes: 1073741824

strimzi.io/cluster 라벨을 이용하여 my-cluster 이름의 카프카 클러스터에 my-topic을 생성합니다. spec에는 토픽의 세부 사항을 정의할 수 있습니다. 여기에는 파티션의 수(partitions), 각 파티션의 복제본 수(replicas)를 설정하였으며, 토픽에 적용할 추가적인 설정은 config에 기록하여 정의할 수 있습니다.

Strimzi에서는 Topic을 생성할 때 63글자 미만이며, 소문자와 hyphen을 이용하여 토픽명을 정의하는 것을 권장하고 있습니다. (특수문자, 공백, 기호는 사용하지 마세요)

 

 

Topic 생성

kubectl apply -f my-topic.yml

kafkaTopic 리소스를 생성하게 되면 my-cluster의 카프카 클러스터로 생성된 Entity Operator의 Topic Operator가 my-cluster 카프카에 토픽을 생성합니다.

만약 KafkaTopic 리소스가 아닌 다른 방법으로 토픽을 생성되었다면 ?

Kafka Streams 또는 auto.create.topics.enable 옵션으로 생성된 토픽은 기존 토픽명에 해시값이 붙어 "my-topic---c55e57fe2546a33f9e603caf57165db4072e827e"와 같은 이름으로 KafkaTopic 리소스가 생성됩니다. 이렇게 생성된 KafkaTopic리소스로도 해당 토픽을 관리할 수 있습니다.

 

kcat으로 Topic에 메시지 전송(Producer) 및 수신(Consumer)

Kafka 토픽이 성공적으로 생성된 후, 그 토픽이 정상적으로 동작하는지 확인하기 위해 kcat (이전 명칭: kafkacat)을 사용하여 메시지를 전송하고 수신하여 정상 동작을 확인합니다.

 

kafka-kcat.yml

apiVersion: v1
kind: Pod
metadata:
  name: kcat
  namespace: kafka
spec:
  containers:
    - name: kcat
      image: edenhill/kcat:1.7.1
      command: ["tail"]
      args: ["-f", "/dev/null"]
  terminationGracePeriodSeconds: 0

 

kcat 파드 생성

kubectl apply -f kafka-kcat.yml

 

kcat을 이용한 메시지 전송

# kcat으로 카프카 클러스터 확인
> kubectl exec -it -n kafka kcat -- kcat -L -b my-cluster-kafka-bootstrap:9092
Metadata for all topics (from broker -1: my-cluster-kafka-bootstrap:9092/bootstrap):
 4 brokers:
  broker 0 at my-cluster-pool-a-0.my-cluster-kafka-brokers.kafka.svc:9092
  broker 2 at my-cluster-pool-b-2.my-cluster-kafka-brokers.kafka.svc:9092
  broker 3 at my-cluster-pool-b-3.my-cluster-kafka-brokers.kafka.svc:9092
  broker 1 at my-cluster-pool-a-1.my-cluster-kafka-brokers.kafka.svc:9092 (controller)
 1 topics:
  topic "my-topic" with 3 partitions:
    partition 0, leader 1, replicas: 1,0, isrs: 1,0
    partition 1, leader 0, replicas: 0,2, isrs: 0,2
    partition 2, leader 2, replicas: 2,3, isrs: 2,3

# kcat으로 메시지 전송 메시지를 전부 입력하였다면 ctrl+d로 나옵니다.
> kubectl exec -it -n kafka kcat -- kcat -P -b my-cluster-kafka-bootstrap:9092 -t my-topic -K:
> 1:a
> 2:b
> 3:c
> 1:aa
> 2:bb
> 3:cc

 

kcat을 이용한 메시지 수신

# kcat을 이용하여 메시지는 수신합니다.
> kubectl exec -it -n kafka kcat -- kcat -C -b my-cluster-kafka-bootstrap:9092 -t my-topic
b
c
bb
cc
a
aa
% Reached end of topic my-topic [1] at offset 4
% Reached end of topic my-topic [2] at offset 2
% Reached end of topic my-topic [0] at offset 0

 

참고문서

https://strimzi.io/docs/operators/latest/deploying

 

Cluster Operator를 이용한 Kafka Cluster 배포

Strimzi를 사용하여 Kafka 클러스터를 배포하는 것은 Kubernetes 환경에서 매우 간단합니다. Strimzi의 Custom Resource Definitions (CRD)인 Kafka 리소스를 활용하여 Kafka Broker와 Zookeeper 앙상블을 포함한 Kafka 클러스터를 구성할 수 있습니다.

Strimzi 설치를 진행하지 않으셨다면 아래 링크를 통해 Strimzi 설치를 먼저 진행하여 주시기 바랍니다.

https://stdhsw.tistory.com/entry/Strimzi-1-Strimzi-Cluster-Operator-%EC%84%A4%EC%B9%98-in-Kubernetes

 

Zookeeper 모드 Kafka Cluster 배포

Strimzi의 Custom Resource인 Kafka 리소스를 사용하여 간단하게 Kafka 클러스터를 배포하는 방법은 다음과 같습니다.

 

my-cluster.yml 파일 작성

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
  name: pool-a
  namespace: kafka
  labels:
    strimzi.io/cluster: my-cluster
spec:
  replicas: 2
  roles:
    - broker
  storage:
    type: jbod
    volumes:
      - id: 0
        type: persistent-claim
        size: 5Gi
        deleteClaim: true
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
  name: pool-b
  namespace: kafka
  labels:
    strimzi.io/cluster: my-cluster
spec:
  replicas: 2
  roles:
    - broker
  storage:
    type: jbod
    volumes:
      - id: 0
        type: persistent-claim
        size: 5Gi
        deleteClaim: true

---
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
  namespace: kafka
  annotations:
    strimzi.io/node-pools: enabled
spec:
  kafka:
    version: 3.6.1
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: false
    readinessProbe:
      initialDelaySeconds: 15
      timeoutSeconds: 5
    livenessProbe:
      initialDelaySeconds: 15
      timeoutSeconds: 5
    config:
      default.replication.factor: 2
      min.insync.replicas: 1
      inter.broker.protocol.version: "3.6"
      num.partitions: 3
    resources:
      limits:
        cpu: 1
        memory: 1Gi
      requests:
        cpu: 500m
        memory: 1Gi
    logging:
      type: inline
      loggers:
        kafka.root.logger.level: INFO
        kafka.request.logger.level: INFO
  zookeeper:
    replicas: 3
    storage:
      type: persistent-claim
      size: 1Gi
      deleteClaim: true
  entityOperator:
    topicOperator: {}
    userOperator: {}

KafkaNodePool은 카프카 브로커의 특성을 정의하고 해당 특성을 카프카 클러스터에 브로커로 등록해 주는 역할을 수행합니다. KafkaNodePool을 이용하여 브로커 노드 관리를 효율적으로 수행할 수 있습니다. 카프카 브로커는 KafkaNodePool 마다 2개씩 구성하여 총 4대의 브로커로 구성되었으며, Zookeeper는 3대로 구성하였습니다.

 

Kafka 리소스 생성

kubectl apply -f my-cluster.yml

 

kafka 리소스를 생성하였다면 StrimziPodSet리소스가 생성됩니다. 기존에는 StatefulSet을 통하여 파드가 관리되었지만 현재는 Strimzi에서 정의한 StrimziPodSet으로 파드가 관리됩니다. 위와 같이 카프카 클러스터를 구축하였다면 다음과 같이 StrimziPodSet이 생성된 것을 확인할 수 있습니다.

 

그리고 my-cluster 카프카 설정으로 Deployment 워크로드로 EntityOperator가 생성됩니다. EntityOperator는 TopicOperator와 UserOperator로 구성되며 Topic과 User를 생성 및 관리하는 역할을 수행합니다. 하나의 카프카 클러스터에 최대 하나의 EntityOperator가 생성될 수 있습니다.

StrimziPodSet과 EntityOperator가 모두 정상적으로 배포가 되었고 이를 통해 다음과 같이 파드가 생성된 것을 확인할 수 있습니다.

KRAFT 모드는 현재

이글을 작성하고 다음날... Strimzi에서 Kraft모드 사용 정식 버전이 나왔습니다. Strimzi 버전 0.40.0으로 Upgrade를 진행하시면 정식 Kraft 모드로 카프카 클러스터 배포를 진행하실 수 있습니다.

 

kraft-cluster.yml

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
  name: dual-role
  namespace: kafka
  labels:
    strimzi.io/cluster: kraft-cluster
spec:
  replicas: 4
  roles:
    - controller
    - broker
  storage:
    type: jbod
    volumes:
      - id: 0
        type: persistent-claim
        size: 10Gi
        deleteClaim: false
---

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: kraft-cluster
  namespace: kafka
  annotations:
    strimzi.io/node-pools: enabled
    strimzi.io/kraft: enabled
spec:
  kafka:
    version: 3.6.1
    metadataVersion: 3.6-IV2
    replicas: 3
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: false
    config:
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      default.replication.factor: 3
      min.insync.replicas: 2
    resources:
      limits:
        cpu: 1
        memory: 1Gi
      requests:
        cpu: 500m
        memory: 1Gi
    storage:
      type: jbod
      volumes:
        - id: 0
          type: persistent-claim
          size: 5Gi
          deleteClaim: false
  zookeeper:
    replicas: 3
    storage:
      type: persistent-claim
      size: 1Gi
      deleteClaim: false
  entityOperator:
    userOperator: {}

 

KafkaNodePool은 Strimzi Kafka Operator에서 제공하는 리소스 중 하나로, Kafka 브로커의 구성 및 성질을 정의하는 리소스입니다. KRaft 모드(즉, Zookeeper 없이 Kafka를 운영하는 모드)에서 KafkaNodePool을 사용하면 브로커의 역할을 보다 세밀하게 설정할 수 있습니다. KRaft 모드에서 브로커는 다음과 같은 역할을 가질 수 있습니다.

  • Controller: Kafka 클러스터 내에서 메타데이터를 관리하는 중앙 제어 노드의 역할을 합니다. 컨트롤러는 클러스터의 전반적인 상태를 관리하고, 토픽 생성, 삭제, 파티션 리밸런싱과 같은 작업을 총괄합니다. KRaft 모드에서는 단일 컨트롤러 또는 컨트롤러 세트가 이러한 역할을 담당합니다.
  • Broker: 실제 메시지를 저장하고 클라이언트 요청을 처리하는 노드의 역할을 합니다. 브로커는 프로듀서로부터 메시지를 받아 저장하고, 컨슈머의 요청에 따라 메시지를 제공합니다.
  • Controller와 Broker 모두: 하나의 노드가 컨트롤러 역할과 브로커 역할을 동시에 수행할 수도 있습니다. 이 경우, 노드는 메타데이터 관리와 메시지 저장 및 처리의 책임을 모두 갖게 됩니다.

KafkaNodePool 리소스를 사용하여 각 브로커 노드의 역할을 구성함으로써, Kafka 클러스터의 성능과 안정성을 최적화하고, 특정 운영 요구사항에 맞춰 클러스터를 조정할 수 있습니다.

 

YAML파일을 보시면 zookeeper가 선언된 것을 확인할 수 있는데 아직까지 Strimzi 0.40.0 버전에서는 Kafka CRD에 zookeeper가 반듯이 선언되도록 정의가 되어 있어 zookeeper를 선언해야 되지만 실제로 zookeeper가 배포되지는 않습니다.

 

참고문서

https://strimzi.io/docs/operators/latest/deploying

 

Strimzi 란

Strimzi는 Apache Kafka 및 기타 Apache Kafka 생태계 프로젝트를 Kubernetes 및 Red Hat OpenShift와 같은 컨테이너 오케스트레이션 플랫폼에서 쉽게 배포, 관리 및 운영할 수 있도록 도와주는 오픈 소스 프로젝트입니다. Strimzi를 통하여 Kafka를 Kubernetes에 배포하면 Kafka 클러스터를 쉽게 확장하고 Kafka 클러스터를 쉽게 백업하고 Kafka 클러스터를 쉽게 업그레이드할 수 있습니다. 또한 Strimzi는 Prometheus, Grafana 등과 같은 모니터링 도구와 통합되어 Kafka 클러스터의 상태와 성능을 지속적으로 모니터링할 수 있도록 하며, Kafka 클러스터를 쉽게 재구성하고 Kafka 클러스터를 쉽게 보안할 수 있습니다.

또한, Strimzi는 Zookeeper, Kafka Connect, Kafka MirrorMaker, Kafka Bridge, Kafka Exporter, Cruise Control, Strimzi UI와 같은 Kafka 생태계 프로젝트를 Kubernetes에 통합 배포할 수 있도록 지원합니다. 이는 Kafka 기반 애플리케이션과 시스템을 효율적으로 구축, 확장 및 관리하는 데 큰 도움이 됩니다. 이번 실습을 통하여 Kubernetes 환경에서 Strimzi Cluster Operator 배포와 각 리소스의 기능에 대하여 알아보겠습니다.

 

Strimzi 동작 방식

이미지 출처 : https://strimzi.io/docs/operators/latest/overview#kafka-components_str

 

Strimzi 설치

Kubernetes 환경에 Strimzi를 설치하는 방법은 대표적으로 3가지 방법이 있습니다. YAML을 통한 설치 방법, OperatorHub을 이용한 설치 방법, Helm을 이용한 설치방법이 있습니다. 이번 문서에서는 YAML 방식과, Helm 방식에 대하여 정리하였습니다. 둘 중 하나의 방법을 선택하여 진행하여 주시기 바랍니다.

YAML 설치 방법

Strimzi 클러스터를 Kubernetes 환경에 배포하는 과정에서는 기본적인 Kubernetes 명령어와 개념에 대한 이해가 필요합니다. YAML을 이용하여 배포를 진행할 때는 Kubernets 명령어 습득 만으로도 Strimzi 클러스터를 배포할 수 있는 이점이 있지만 다수의 설정이나 복잡한 구성을 변경하고자 할 때 번거롭고 오류가 발생할 수 있는 불편함이 있습니다.

 

GitHub에서 양식 가져오기

`https://github.com/strimzi/strimzi-kafka-operator` GitHub주소를 가져와 해당 디렉토리로 이동하여 다음 명령어를 통하여 간편하게 Strimzi Cluster Operator를 배포할 수 있습니다.

kubectl apply -f install/cluster-operator -n 네임스페이스

 

Helm을 이용한 설치 방법

Helm을 사용하여 Strimzi Kafka 클러스터를 Kubernetes 환경에 배포하는 방법은 Kubernetes 명령어뿐만 아니라 Helm의 기본 사용법에 대한 이해도 필요하게 합니다. 이러한 접근 방식은 초기 학습해야 되는 단점이 있을 수 있지만, Helm의 기본 사용법을 숙지하면 Helm은 Kubernetes 애플리케이션의 패키지 관리자 역할을 수행하여 다양한 설정을 values.yaml 파일을 통해 간단하게 수정할 수 있으며, 업그레이드, 롤백, 삭제 등의 작업을 간단한 명령어로 수행할 수 있습니다. 뿐만 아니라 values.yaml 파일을 공유함으로써 협업하는 팀원들에게 현재의 설정 값을 간편하게 공유할 수 있습니다.

 

values.yml 설정 변경

Kubernets Cluster내에 모든 Namespace에 접근할 수 있도록 설정을 변경합니다.

watchAnyNamespace: true

 

Helm을 이용한 Strimzi 설치

# helm repo 추가
helm repo add strimzi https://strimzi.io/charts

# helm으로 Strimzi 설치 진행
helm upgrade --install -n kafka --create-namespace strimzi-kafka-operator strimzi/strimzi-kafka-operator --version 0.40.0 -f values.yml

 

정상 동작 확인

위와 같이 둘중 하나의 방식으로 Strimzi를 정상적으로 설치되었다면 Kubernetes 클러스터에 CRD(CustomResourceDefinitions)가 정의되고 Strimzi Cluster Operator가 배포된 것을 확인할 수 있습니다.

 

Strimzi CRD 확인

❯ kubectl api-resources  | grep strimzi
strimzipodsets             sps        core.strimzi.io/v1beta2         true       StrimziPodSet
kafkabridges               kb         kafka.strimzi.io/v1beta2        true       KafkaBridge
kafkaconnectors            kctr       kafka.strimzi.io/v1beta2        true       KafkaConnector
kafkaconnects              kc         kafka.strimzi.io/v1beta2        true       KafkaConnect
kafkamirrormaker2s         kmm2       kafka.strimzi.io/v1beta2        true       KafkaMirrorMaker2
kafkamirrormakers          kmm        kafka.strimzi.io/v1beta2        true       KafkaMirrorMaker
kafkanodepools             knp        kafka.strimzi.io/v1beta2        true       KafkaNodePool
kafkarebalances            kr         kafka.strimzi.io/v1beta2        true       KafkaRebalance
kafkas                     k          kafka.strimzi.io/v1beta2        true       Kafka
kafkatopics                kt         kafka.strimzi.io/v1beta2        true       KafkaTopic
kafkausers                 ku         kafka.strimzi.io/v1beta2        true       KafkaUser

 

Strimzi Operator 확인

❯ k get po -n kafka | grep strimzi
strimzi-cluster-operator-95d88f6b5-mmc7q    1/1     Running   1 (30m ago)   70m

 

Strimzi Cluster Operator

Strimzi Cluster Operator는 Kubernetes 환경에서 Strimzi CRD(CustomResourceDefinitions) 관련 리소스를 관리하는 역할을 수행합니다. 주요 기능으로는 Kafka Cluster 배포 및 관리, 구성 관리, 스케일링, 업데이트 및 업그레이드, 모니터링 및 로깅, 보안 관리, 복구, 토픽 생성 관리 및 사용자 생성 관리, 그리고 커넥터 생성 관리를  포함됩니다. 이를 통해 Kafka 클러스터의 성능, 보안, 효율성을 향상하며 Kubernetes 환경에서의 관리를 용이하게 합니다.

Strimzi 공식 문서에서는 Kubernetes 클러스터에 하나의 Cluster Operator만 배포하는 것을 권장하는데, 이는 여러 Operator가 동일한 리소스를 관리하려고 할 때 발생할 수 있는 충돌과 관련된 문제를 방지하기 위함입니다. Cluster Operator는 Strimzi에서 생성된 Custom Resource Definitions(CRD)에 대한 감시로 리소스 관리 및 조정 작업을 수행합니다. Cluster Operator가 동일한 CRD 인스턴스를 관리하려고 하면 예상치 못한 동작이나 데이터 불일치가 발생할 수 있습니다.

 

Strimzi Cluster 구조

Kubernetes Cluster 내에서 단 하나의 Strimzi Cluster Operator만 존재하며 이 Operator를 통해 Kubernetes Cluster 내에서 여러 개의 Kafka Cluster를 구축하고 관리할 수 있습니다.

 

Strimzi CustomResource

Strimzi를 설치하면, Kafka 클러스터 및 관련 리소스를 Kubernetes에서 관리할 수 있도록 여러 CRD(CustomResourceDefinitions) 를 생성합니다. 각 CRD는 Kafka 생태계의 특정 구성 요소를 나타내며, Strimzi Cluster Operator를 통해 관리됩니다. 주요 CRD와 그 기능은 다음과 같습니다

  • kafka : Kafka 클러스터의 생성, 구성, 관리를 위한 리소스를 정의합니다. 사용자는 이 리소스를 사용하여 Kafka 브로커, ZooKeeper 앙상블, Entity Operator를 포함한 전체 Kafka 클러스터를 정의하고 배포할 수 있습니다.
  • KafkaTopic : Kafka 클러스터 내의 개별 토픽을 관리하기 위해 사용됩니다. 이를 통해 토픽의 생성, 삭제, 구성 변경 등을 Kubernetes 네이티브 방식으로 수행할 수 있습니다.
  • KafkaUser : Kafka 클러스터의 사용자와 인증 정보를 관리합니다. 이 리소스를 사용하여 사용자 인증서 및 액세스 권한을 설정하고 관리할 수 있습니다.
  • KafkaConnect : Kafka Connect 클러스터를 배포하고 관리하는 CRD입니다. 데이터 소스와 싱크를 Kafka 토픽과 연결하는 커넥터를 배포 및 관리할 수 있습니다.
  • KafkaConnectors : Kafka Connect 클러스터 내에서 실행되는 개별 커넥터를 정의하고 관리합니다.
  • KafkaBridge : Kafka 클러스터에 접근할 수 있는 REST 인터페이스를 제공하는 Kafka Bridge 인스턴스를 배포 및 관리합니다. 이는 외부 애플리케이션이 Kafka 클러스터와 통신할 수 있게 해 줍니다.
  • KafkaMirrorMaker : 토픽의 데이터를 미러링 하는 데 사용되지만 현재는 사용하지 않습니다.
  • KafkaMirrorMaker2 : KafkaMirrorMaker의 더 발전된 버전으로, 더 복잡한 미러링 및 리패키징 요구사항을 지원합니다. 이를 통해 다양한 클러스터 간에 토픽, 메시지, 파티션 등을 더 효율적으로 미러링 할 수 있습니다.
  • KafkaRebalance : Kafka 클러스터 내에서 파티션의 리밸런싱을 관리하는 CRD입니다. 클러스터의 부하 분산 및 성능 최적화를 위한 자동 리밸런싱 작업을 지원합니다.
  • StrimziPodSet : 기존에는 StatefulSet을 이용하여 Kafka Broker가 관리되었지만 현재는 StatefulSet 대신 StrimziPodSet을 이용하여 관리됩니다. Kafka리소스로 생성된 Kafka Broker, Zookeeper 모두 StrimziPodSet으로 관리됩니다.
  • KafkaNodePool : 각 특성에 맞는 Kafka Broker의 특성과 Storage를 관리합니다.

참고문서

https://strimzi.io/docs/operators/latest/deploying

 

Deploying and Upgrading (0.40.0)

Configure and manage a Strimzi deployment to your precise needs using Strimzi custom resources. Strimzi provides example custom resources with each release, allowing you to configure and create instances of supported Kafka components. Fine-tune your deploy

strimzi.io

 

현재 제가 근무하는 회사는 모든 서비스를 마이크로서비스 아키텍처(MSA)로 동작하기 위해 Kubernetes 환경에서 운영하고 있습니다. 이러한 환경에서는 Horizontal Pod Autoscaler(HPA)를 사용하여, Consumer 애플리케이션이 일정 리소스를 초과하여 사용할 경우, 이를 감지하고 자동으로 스케일아웃(Scale Out)을 진행하여 최대 파티션 개수만큼 늘어날 수 있도록 설정하였으며 반대로 리소스 사용량이 감소하면, 스케일인(Scale In)을 통해 리소스 소비를 최소화하도록 조정하였습니다. 그러나, HPA로 인하여 Kafka 컨슈머의 스케일아웃(Scale Out) 또는 스케일인(Scale In)이 빈번히 발생하면, 파티션 할당(Partition Assignment) 과정에서 부하가 발생하여 Kafka의 성능 저하로 이어질 수 있습니다. 이는 파티션 할당 과정에서의 잦은 변동이 시스템에 부담을 주기 때문입니다. 이러한 문제를 해결하기 위해, 파티션 할당 전략(Partition Assignment Strategy)을 조정하는 설정 변경을 통해 이러한 부하를 최소화할 수 있습니다. 이번 문서에서는 Kafka 컨슈머 그룹에서 파티션을 효율적으로 할당하고 관리함으로써, 성능 저하를 최소화하여 최적화하는 방법에 대하여 정리하였습니다.

 

실습 방법

이번 실습에서는 kcat을 이용하여 카프카로부터 데이터를 consume하는 방법으로 진행하였습니다. kcat을 이용해서 partition.assignment.strategy를 설정하여 컨슈머 모드로 동작하는 방법은 다음과 같습니다.

(kcat 버전 1.7.0 이상부터 사용 가능한 옵션입니다.)

kcat -C -G <컨슈머그룹> -b <브로커주소> -X partition.assignment.strategy=<할당전략> 토픽명

 

docker compose 카프카 클러스터 구축방법
https://stdhsw.tistory.com/entry/Docker-compose%EB%A5%BC-%EC%9D%B4%EC%9A%A9%ED%95%9C-Kafka-Cluster-%EA%B5%AC%EC%B6%95-KRAFT-Mode

 

kcat 기본 사용법

https://stdhsw.tistory.com/entry/kcatkafkacat-%EC%82%AC%EC%9A%A9%EB%B2%95

 

partition.assignment.strategy 설정

partition.assignment.strategy 설정은 Kafka Consumer에서 파티션 할당 알고리즘을 결정하는 설정입니다. 이 설정을 통해 Consumer Group 내의 각 Consumer에게 파티션을 할당하는 방식을 정의할 수 있으며, Kafka 클러스터의 전반적인 성능과 가용성에 영향을 미칠 수 있습니다. 다음은 partition.assignment.strategy에 적용될 수 있는 전략과 각각의 특성에 대하여 알아보겠습니다.

Range Assignment Strategy

Range Assignment Strategy는 partition.assignment.strategy 설정의 기본값으로, Kafka Consumer Group에서 파티션 할당을 관리하는 데 사용되는 전략입니다. 이 전략의 핵심은 각 토픽의 파티션들을 Consumer Group 내의 Consumer 리스트에 순차적으로 할당하는 것입니다. 이 할당 방식은 특히 토픽별로 파티션을 균등하게 분배하고자 할 때 유리합니다. 각 토픽에 대해 Consumer들은 대략적으로 균등한 부하를 받게 되어, 부하 분산에 효과적입니다. 그러나 Range Assignment Strategy는 몇 가지 단점도 가지고 있습니다. 특히, Consumer Group 내에서 Consumer의 수가 변할 때 (즉, Scale out이나 Scale in이 발생할 때) 전체 파티션 할당 과정이 재실행됩니다. 이는 Consumer Group에 속한 모든 Consumer의 연결을 잠시 끊고, 파티션 할당을 새롭게 시작한다는 의미입니다. 이러한 과정은 Consumer 수에 자주 변화가 있는 환경에서 비효율적이며, 메시지 처리 지연이나 리밸런싱 동안의 처리량 감소와 같은 문제를 초래할 수 있습니다.

장점 단점
 토픽별로 파티션에 데이터가 골고루 분포되었을 경우, Consumer Group 내의 각 Consumer가 균등하게 부하를 받게 됩니다.

균등한 부하를 받기 때문에 각 컨슈머의 부하량을 예상하기 쉽습니다.
 각 토픽에 대해 파티션을 균등하게 할당하려고 시도하지만, Consumer가 여러 토픽에서 데이터를 가져오는 경우에는 특정 파티션에 부하가 집중될 위험이 있습니다.

Consumer Group에 새로운 Consumer가 추가되거나 기존 Consumer가 제거되는 등의 Scale out/in이 발생하면, Range Assignment Strategy에 따라 모든 파티션이 재할당됩니다. 이 과정에서 모든 Consumer의 연결이 잠시 끊기게 되며, 이는 리밸런싱 과정을 거치게 되어 성능 저하가 발생할 수 있습니다.

 

Range Assignment Strategy의 동작은 아래 이미지처럼 Consumer Group 내의 Consumer 수가 줄어들 경우 모든 파티션 할당은 revoked로 연결이 끊어지고 다시 Topic을 기준으로 남아있는 Consumer에 재할당을 수행합니다.

Round Robin Assignment Strategy 

Round Robin Assignment Strategy는 Kafka에서 사용할 수 있는 또 다른 파티션 할당 전략으로, Range Assignment Strategy와 달리 토픽을 기준으로 파티션을 할당하지 않습니다. 대신, 이 전략은 사용 가능한 모든 파티션을 Consumer Group 내의 모든 Consumer에게 순서대로 할당합니다, 토픽 간 경계를 무시하고 전체 파티션 목록을 기준으로 합니다.

장점 단점
Round Robin 전략은 각 Consumer에게 전체 파티션 목록을 걸쳐 균등하게 할당함으로써, 모든 Consumer가 대략적으로 동일한 양의 작업을 처리하도록 합니다.  특정 토픽이 다른 토픽보다 훨씬 더 높은 메시지 유입률을 가지고 있을 경우, Round Robin 방식은 이러한 불균형을 고려하지 않기 때문에, 특정 토픽의 파티션에서 처리 지연이 발생할 수 있습니다.

Range Assignment Strategy와 마찬가지로 Consumer Group에 새로운 Consumer가 추가되거나 기존 Consumer가 제거되는 등의 Scale out/in이 발생하면 모든 파티션이 재할당됩니다.

 

Round Robin Assignment Strategy의 동작은 아래 이미지처럼 Consumer Group 내의 Consumer 수가 줄어들 경우 모든 파티션 할당은 revoked로 연결이 끊어지고 모든 파티션을 기준으로 Consumer에 재할당을 수행합니다.

 

Sticky Assignment Strategy

Sticky Assignment Strategy는 파티션 재할당을 최소화하는 것을 목표로 하는 전략입니다. Consumer Group의 Consumer 개수의 변화가 생겼을 때 기존의 파티션 할당을 가능한 유지 하여 Consumer 리밸런싱이 발생할 때 파티션의 재할당을 최소화하여 성능을 향상할 수 있는 전략입니다.

장점 단점
Sticky Assignment Strategy는 Consumer에 한 번 할당된 파티션을 가능한 한 그대로 유지하여 Consumer 개수의 변화가 발생하여도 할당된 파티션은 재할당을 수행하지 않습니다.

이미 할당된 파티션에 대해서는 Consumer 리밸런싱을 수행하지 않아 자주 Scale In/Out이 발생하는 환경에서 사용하기 적합합니다.
Sticky Assignment Strategy는 기존의 파티션 할당을 가능한 한 유지하려고 합니다. 이로 인해 Consumer Group 내에서 파티션을 새로운 Consumer에게 더 효율적으로 재분배하는 것이 어려워질 수 있습니다.

 

Sticky Assignment Strategy의 동작은 아래 이미지처럼 Consumer Group 내의 Consumer 수가 줄어들 경우 현재 동작중인 Consumer에 할당된 파티션은 유지하고 끊어진 Consumer에 대하여 재할당을 수행합니다. Sticky Assignment Strategy는 Consumer의 개수가 줄어드는 상황에서는 큰 문제가 발생하지 않으나 반대로 Consumer의 개수가 늘어나거나 잠시 중단되었던 Consumer가 restart를 하여 참여할 경우 해당 컨슈머에는 파티션할당이 불균형할 수 있습니다.

Cooperative Sticky Assignment Strategy 

Cooperative Sticky Assignment Strategy는 기존의 Sticky Assignment Strategy를 기반으로 하면서 몇 가지 중요한 개선 사항을 도입한 것입니다. 이 전략은 기존 Sticky 전략의 단점을 보완하며, 특히 Consumer Group 내의 Consumer 구성이 변할 때 발생하는 리밸런싱 프로세스의 효율성을 개선합니다. Cooperative Sticky 전략은 파티션 할당 변경이 필요할 때 전체 Consumer Group에 대한 리밸런싱을 수행하는 대신, 필요한 최소한의 변경만을 수행합니다. 이는 리밸런싱 프로세스가 더 빠르고 효율적으로 완료될 수 있도록 하여 전체 시스템의 성능에 미치는 영향을 최소화합니다.

장점 단점
Sticky 전략은 재할당의 불균형이 발생할 수 있는 반면에 Cooperative Sticky Assignment Strategy는 이러한 단점을 보완하여 Consumer 리밸런싱을 최소화하면서 균형있는 재할당이 이루어집니다.

Consumer의 Scale in/out이 자주 발생하는 환경에서 사용하기 적합니다.
Cooperative Sticky는 Consumer assignment 작업을 효율적으로 수행하지만 리밸런싱 작업이 다른 Strategy보다 오래 걸리는 단점이 있습니다.

 

Kubernetes 환경에서 Consumer Pod를 Horizontal Pod Autoscaler (HPA)와 함께 구성하여 Pod의 Scale in/out이 발생하는 환경에서는 Cooperative Sticky Assignment Strategy을 선택하는 것이 현명한 선택입니다. Cooperative Sticky는 티션 할당을 최소화하면서 점진적인 리밸런싱을 지원하기 때문에, Pod의 확장 및 축소가 발생해도 Kafka Consumer Group의 전체적인 성능 저하를 줄일 수 있습니다.

마지막으로

Kafka를 사용하는 경험이 쌓이면서 느끼는 것은 모든 상황을 만족하는 설정 값은 없다는 것을 많이 느끼게 되는데요. 현재 자신이 사용하고 있는 데이터의 특성과 Producer, Consumer의 상황을 잘 파악하고 가장 적절한 설정 값을 적용하는 것이 가장 중요하다고 생각합니다.

 

kcat(이전 명칭: kafkacat)은 Kafka를 테스트하고 디버깅하는 데 매우 유용한 명령줄 도구입니다. Kafka 클러스터, 토픽, 파티션 등에 대한 메타데이터 조회 기능을 제공하며, Producer 및 Consumer 모드에서 각각 메시지를 전송하고 수신하는 기능을 가지고 있습니다. 

Producer 모드에서 kcat은 표준 입력 또는 파일로부터 메시지를 읽어 지정된 Kafka 클러스터에 토픽 파티션으로 전송할 수 있습니다. Consumer 모드에서는 Kafka 토픽 파티션으로부터 메시지를 읽어 표준 출력으로 내보내며, 이를 통해 메시지 스트림의 내용을 실시간으로 모니터링하고 분석할 수 있습니다.

 

kcat은 kafka-console-consumer.sh와 kafka-console-producer.sh와 같은 Kafka의 기본 제공 명령줄 도구들과 유사해 보일 수 있지만, 더 다양하고 고급 기능을 제공합니다. 이 문서에서는 kcat의 설치 방법부터 기본적인 사용법에 대하여 다루었습니다.

kcat 설치 방법

kcat을 사용하기 전에, 시스템에 설치되어 있지 않다면 먼저 설치 과정을 거쳐야 합니다. 여기서는 macOS와 Ubuntu 시스템에서 kcat을 설치하는 방법에 대하여 작성하였습니다.

 

macOS 설치

macOS에서 kcat을 설치하는 가장 간단한 방법은 Homebrew를 사용합니다. Homebrew는 macOS 용 패키지 관리자입니다. Homebrew의 명령어를 이용하여 터미널에서 kcat을 설치할 수 있습니다

> brew update
> brew install kcat

 

Ubuntu 설치

Ubuntu에서 kcat을 설치하는 과정은 apt 패키지 관리자를 통해 간단하게 수행할 수 있습니다.

> sudo apt-get update
> sudo apt-get -y install kafkacat

 

설치가 완료되었다면 다음과 같이 명령어를 통하여 정상적으로 잘 설치가 되었는지 확인할 수 있습니다.

> kcat -V

Create Topic

Kcat은 Kafka 클러스터와 상호작용하는 데 매우 유용한 도구이지만, 아쉽게도 직접적으로 토픽을 생성하는 기능을 제공하지 않습니다. Kafka 토픽 생성은 일반적으로 Kafka의 내장 명령줄 도구인 kafka-topics.sh를 사용하여 수행됩니다. 토픽을 생성할 때는 토픽의 이름, 파티션 수, 복제 계수(replication factor)와 같은 중요한 설정을 지정해야 합니다. 예를 들어, my-topic이라는 이름의 토픽을 파티션 3개와 복제 계수 2로 설정하여 생성하기 위해서는 다음과 같은 명령어를 사용할 수 있습니다.

 

토픽생성 명령어

# kafka-topics.sh --create --bootstrap-server <브로커 주소> --replication-factor <복제본개수> --partitions <파티션개수> --topic <토픽명>
> kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 2 --partitions 3 --topic my-topic

이 명령어는 Kafka 클러스터의 브로커 주소를 --bootstrap-server 옵션을 통해 지정하고, --replication-factor와 --partitions 옵션으로 각각 복제본 수와 파티션 수를 설정하여 my-topic 토픽을 생성합니다.

메타데이터 출력

Kcat을 활용하여 Kafka 클러스터의 메타데이터를 조회가 가능하며 사용 가능한 토픽 목록과 같은 정보를 얻을 수 있습니다.

  • '-L' 옵션을 사용하여 Kafka 클러스터의 메타데이터를 출력할 수 있습니다. 이 옵션은 클러스터에 대한 정보, 예를 들어 사용 가능한 토픽, 각 토픽의 파티션 정보, 리플리케이션 상태 등을 포함한 전반적인 상태 보고서를 제공합니다.
  • Kafka 클러스터와의 연결을 설정하기 위해서는 '-b' 옵션을 통해 브로커 주소를 입력합니다.
# kcat -L -b 브로커주소
> kcat -L -b localhost:10000

Producer 사용법

kcat을 이용한 Kafka Producer의 기본 사용법에 대해 설명하겠습니다. kcat의 Producer 모드를 사용하면, 사용자는 Kafka 토픽으로 메시지를 쉽게 전송할 수 있습니다. 이 모드를 활성화하려면, 다음 단계를 따르면 됩니다.

 

기본 방식의 메시지 전송

먼저 '-P' 옵션을 사용하여 kcat을 Producer 모드로 설정합니다. 이 모드는 Kafka 토픽에 메시지를 전송하기 위한 것입니다. '-b' 옵션으로 Kafka 브로커의 주소를 지정합니다. 브로커는 Kafka 시스템에서 메시지를 받아 처리하는 서버를 말합니다. '-t' 옵션을 사용해 데이터를 전송할 토픽을 지정합니다.

데이터 입력이 완료되면, Ctrl+D를 눌러 데이터 전송을 완료하고 kcat 명령을 종료합니다. 이는 표준 입력의 끝을 나타내며, kcat이 메시지 전송을 마무리하고 종료하도록 합니다.

kcat -P -b localhost:10000 -t my-topic
aaa
bbb
ccc

 

Key 값을 이용한 Partitioner 메시지 전송

추가적으로 kcat의 '-K:' 옵션을 사용하면 제공된 키를 사용하여 메시지를 특정 파티션으로 라우팅합니다. 이 기능은 Kafka의 키 기반 메시지 분할(partitioning) 메커니즘을 활용하는 것으로, 같은 키를 가진 메시지들이 항상 같은 파티션에 할당되도록 보장합니다. 이는 메시지의 순서를 유지하거나, 관련 메시지를 동일한 파티션에 그룹화하는 데 유용합니다.

'-K:' 옵션을 사용할 때는 메시지를 "키:값" 형식으로 작성하여 전송해야 합니다. 이때, 키와 값 사이에는 콜론(':')을 사용하여 구분합니다. 예를 들어, '1:Hello World'와 같은 형식으로 메시지를 작성하면, '1'이 메시지의 키가 되고, 'Hello World'가 메시지의 값이 됩니다. 메시지를 전송할 때 해당 키가 항상 동일한 파티션에 할당됩니다. 이는 데이터의 일관성과 순서를 유지할 수 있습니다.

kcat -P -b localhost:10000 -t my-topic -K:

 

파일 형식의 메시지 전송

위와 같은 방법으로도 Kafka에 메시지를 전송하는 과정은 매우 간단하고 직관적이지만, 수동으로 반복하여 메시지를 전송하거나 대량의 데이터를 한 번에 전송하는 경우 불편함을 겪을 수 있습니다. 이러한 작업을 효율적으로 처리하기 위해 kcat은 파일을 통해 미리 입력된 데이터를 전송하는 기능을 제공합니다.

kcat -P -b localhost:10000 -t my-topic -l data.txt

Consumer 사용법

기본 방식의 메시지 수신

kcat에서 '-C' 옵션은 Consumer 모드를 활성화하여 지정된 Kafka 토픽의 데이터를 가져오고, 이를 표준출력으로 출력하는 기능을 제공합니다. 이 옵션을 사용하면, Kafka 토픽에 저장된 메시지를 실시간으로 읽어 볼 수 있습니다. 여기서 -b 옵션은 Kafka 브로커의 주소를 지정하며, -t 옵션은 데이터를 가져올 토픽의 이름을 지정합니다. 이 명령을 실행하면, kcat은 my-topic 토픽으로부터 메시지를 읽어 표준 출력으로 내보냅니다. 이 과정은 Ctrl+C를 눌러 중단할 수 있습니다.

kcat -C -b localhost:10000 -t my-topic

 

Consumer Group 사용법

kcat에서 '-G' 옵션은 Consumer Group 모드를 활성화하여, 여러 개의 Consumer가 속한 Group을 지정하고, Kafka 토픽의 데이터를 여러 Consumer에 분산해서 처리할 수 있게 합니다. 이 기능은 Kafka의 Consumer Group 메커니즘을 활용하는 것으로, 메시지의 부하를 여러 Consumer 사이에 균등하게 분산시켜 처리할 수 있게 해주며, 데이터 처리의 확장성과 안정성을 높일 수 있습니다.

'-G'옵션을 적용했다면 위와는 다르게 '-t' 옵션을 제거해야 합니다.

kcat -C -G my-group -b localhost:10000 my-topic

Kafka에서 같은 Consumer Group에 속한 여러 Consumer를 사용할 경우, Consumer 리밸런싱이라는 과정을 통해 각 Consumer에게 특정 파티션들이 자동으로 할당되어 지정된 파티션으로 부터 데이터를 수신할 수 있습니다.

 

Consumer 출력 포맷 설정

kcat을 사용할 때, Consumer로 출력되는 데이터 포맷을 사용자가 지정할 수 있어, 출력 데이터를 더욱 보기 쉽고, 분석하기 용이하게 만들 수 있습니다. 이러한 출력 포맷은 kcat의 다양한 옵션을 통해 커스터마이징될 수 있으며, 메시지의 키, 값, 타임스탬프, 파티션 정보 등 다양한 메타데이터를 포함할 수 있습니다. 출력 포맷 설정에 대한 자세한 옵션은 kcat -h 명령어를 실행함으로써 확인할 수 있습니다.

kcat -C  -b localhost:10000 -t my-topic -f 'Topic[%t] Partition[%p] Offset[%o] %k:%s\n'

 

참조

https://docs.confluent.io/platform/current/tools/kafkacat-usage.html

 

Use kcat (formerly kafkacat) to test and debug Apache Kafka deployments | Confluent Documentation

kcat (formerly kafkacat) Utility for Confluent Platform kcat (formerly kafkacat) is a command-line utility that you can use to test and debug Apache Kafka® deployments. You can use kcat to produce, consume, and list topic and partition information for Kaf

docs.confluent.io

 

개요

현재 회사는 Strimzi를 활용하여 Kubernetes 환경에서 Kafka Cluster를 성공적으로 구축하여 상용 서버 및 개발 서버에서 운영하고 있습니다. 그러나 팀원 다 같이 사용하는 실제 운영 환경에서 카프카 설정을 변경하는 테스트나 다양한 실험을 진행하기에는 제약이 따르게 됩니다. 공유된 리소스 상에서의 실험은 다른 팀원들에게 영향을 줄 수 있으므로, 실제 작업 환경에서의 실험은 신중히 이루어져야 합니다. 이러한 상황에서는 개인 Kafka Cluster를 구축하여 필요한 테스트나 실험을 자유롭게 진행할 수 있는 환경이 필요하다는 것을 느끼게 되는데요. 이번 문서에서는 바로 그러한 필요성으로 인하여 Docker Compose를 사용하여 개인 Kafka Cluster를 구축하는 방법에 대해서 간단하게 정리하였습니다.
컨테이너 기술을 활용하여 Kafka Cluster를 구축한 경우 관리 및 유지보수의 효율성이  향상됩니다. 특히 컨테이너를 사용하면 Kafka Cluster의 삭제 및 재생성 작업을 매우 간단하고 빠르게 수행할 수 있으며, 컨테이너화된 환경이 제공하는 격리와 독립성 덕분에 컨테이너에서 발생하는 변경사항이 다른 컨테이너에 영향을 주지 않습니다. 따라서, Kafka Cluster를 구성하는 개별 컨테이너들을 쉽게 종료하고, 필요에 따라 새로운 설정으로 다시 시작할 수 있습니다. 이러한 유연성은 개발과 테스트 과정에서 특히 유용하며, 다양한 설정과 구성을 실험하고 최적화하는데 아주 편리합니다. (이번 실습에서는 Kafka Cluster를 구축하는 것을 목표로 하고 있기 때문에 docker-compose 및 docker 사용법에 대해서는 자세히 다루지 않습니다.)

 

이번 실습 목표

이번 실습의 목표는 개인 컴퓨터에 docker-compose를 이용하여 개인 전용 개발환경 kraft mode인 Kafka Cluster를 구축하는 것이기 때문에 최소한의 자원으로 Kafka Cluster를 구축하는 것을 목표로 하고 있습니다. (kraft mode는 zookeeper를 제거한 kafka cluster 입니다.)

  • kafka broker : 3대
  • kafka ui : 1대

docker-compose.yml 파일 작성

작업하는 위치에 다음과 같이 docker-compose.yml 파일을 생성합니다.

networks:
  kafka_network:

volumes:
  Kafka00:
  Kafka01:
  Kafka02:

services:
  ### Kafka00
  kafka00:
    image: bitnami/kafka:3.7.0
    restart: unless-stopped
    container_name: kafka00
    ports:
      - '10000:9094'
    environment:
      - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
      # KRaft settings
      - KAFKA_CFG_BROKER_ID=0
      - KAFKA_CFG_NODE_ID=0
      - KAFKA_KRAFT_CLUSTER_ID=HsDBs9l6UUmQq7Y5E6bNlw
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka00:9093,1@kafka01:9093,2@kafka02:9093
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      # Listeners
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka00:9092,EXTERNAL://127.0.0.1:10000
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
      # Clustering
      - KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3
      - KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=3
      - KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=2
    networks:
      - kafka_network
    volumes:
      - "Kafka00:/bitnami/kafka"
  ### Kafka01
  kafka01:
    image: bitnami/kafka:3.7.0
    restart: unless-stopped
    container_name: kafka01
    ports:
      - '10001:9094'
    environment:
      - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
      # KRaft settings
      - KAFKA_CFG_BROKER_ID=1
      - KAFKA_CFG_NODE_ID=1
      - KAFKA_KRAFT_CLUSTER_ID=HsDBs9l6UUmQq7Y5E6bNlw
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka00:9093,1@kafka01:9093,2@kafka02:9093
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      # Listeners
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka01:9092,EXTERNAL://127.0.0.1:10001
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
      # Clustering
      - KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3
      - KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=3
      - KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=2
    networks:
      - kafka_network
    volumes:
      - "Kafka01:/bitnami/kafka"
  ## Kafka02
  kafka02:
    image: bitnami/kafka:3.7.0
    restart: unless-stopped
    container_name: kafka02
    ports:
      - '10002:9094'
    environment:
      - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
      # KRaft settings
      - KAFKA_CFG_BROKER_ID=2
      - KAFKA_CFG_NODE_ID=2
      - KAFKA_KRAFT_CLUSTER_ID=HsDBs9l6UUmQq7Y5E6bNlw
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka00:9093,1@kafka01:9093,2@kafka02:9093
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      # Listeners
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka02:9092,EXTERNAL://127.0.0.1:10002
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
      # Clustering
      - KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3
      - KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=3
      - KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=2
    networks:
      - kafka_network
    volumes:
      - "Kafka02:/bitnami/kafka"

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    restart: unless-stopped
    container_name: kafka-ui
    ports:
      - '8080:8080'
    environment:
      - KAFKA_CLUSTERS_0_NAME=Local-Kraft-Cluster
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka00:9092,kafka01:9092,kafka02:9092
      - DYNAMIC_CONFIG_ENABLED=true
      - KAFKA_CLUSTERS_0_AUDIT_TOPICAUDITENABLED=true
      - KAFKA_CLUSTERS_0_AUDIT_CONSOLEAUDITENABLED=true
      #- KAFKA_CLUSTERS_0_METRICS_PORT=9999
    depends_on:
      - kafka00
      - kafka01
      - kafka02
    networks:
      - kafka_network

컨테이너 실행하기

docker-compose.yml 파일을 잘 작성하였다면 해당 디렉토리에서 다음 명령어를 통하여 컨테이너를 실행합니다.

docker-compose up -d

정상 동작 확인

이번 실습에서는 클러스터 및 토픽 현황을 파악하기 쉽게 하기 위해 kafka-ui 컨테이너를 같이 동작하게 되어있습니다. kafka-ui를 통해 현재 카프카 클러스터가 정상적으로 동작하는지 브라우저에서 확인할 수 있습니다.
http://localhost:8080/ui/clusters/local/brokers

 

참조

https://github.com/ArminShoeibi/KafkaDockerCompose/blob/main/docker-compose-cluster.yml

 

'Kafka > kafka 기본' 카테고리의 다른 글

kcat(kafkacat) 사용법  (2) 2024.03.05
Docker compose를 이용한 Kafka Cluster 구축 (with Zookeeper)  (0) 2024.03.03

개요

현재 회사는 Strimzi를 활용하여 Kubernetes 환경에서 Kafka Cluster를 성공적으로 구축하여 상용 서버 및 개발 서버에서 운영하고 있습니다. 그러나 팀원 다 같이 사용하는 실제 운영 환경에서 카프카 설정을 변경하는 테스트나 다양한 실험을 진행하기에는 제약이 따르게 됩니다. 공유된 리소스 상에서의 실험은 다른 팀원들에게 영향을 줄 수 있으므로, 실제 작업 환경에서의 실험은 신중히 이루어져야 합니다. 이러한 상황에서는 개인 Kafka Cluster를 구축하여 필요한 테스트나 실험을 자유롭게 진행할 수 있는 환경이 필요하다는 것을 느끼게 되는데요. 이번 문서에서는 바로 그러한 필요성으로 인하여 Docker Compose를 사용하여 개인 Kafka Cluster를 구축하는 방법에 대해서 간단하게 정리하였습니다.

컨테이너 기술을 활용하여 Kafka Cluster를 구축한 경우 관리 및 유지보수의 효율성이  향상됩니다. 특히 컨테이너를 사용하면 Kafka Cluster의 삭제 및 재생성 작업을 매우 간단하고 빠르게 수행할 수 있으며, 컨테이너화된 환경이 제공하는 격리와 독립성 덕분에 컨테이너에서 발생하는 변경사항이 다른 컨테이너에 영향을 주지 않습니다. 따라서, Kafka Cluster를 구성하는 개별 컨테이너들을 쉽게 종료하고, 필요에 따라 새로운 설정으로 다시 시작할 수 있습니다. 이러한 유연성은 개발과 테스트 과정에서 특히 유용하며, 다양한 설정과 구성을 실험하고 최적화하는데 아주 편리합니다. (이번 실습에서는 Kafka Cluster를 구축하는 것을 목표로 하고 있기 때문에 docker-compose 및 docker 사용법에 대해서는 자세히 다루지 않습니다.)

이번 실습 목표

이번 실습의 목표는 개인 컴퓨터에 docker-compose를 이용하여 Kafka Cluster를 구축하는 것이기 때문에 최소한의 자원으로 Kafka Cluster를 구축하는 것을 목표로 하고 있습니다. 추가적인 설정은 zookeeper documentation, kafka documentation을 통해 설정하는 것을 권장합니다.

  • zookeeper : 1대
  • kafka broker : 3대
  • kafka ui : 1대

docker-compose.yml 파일 작성

작업하는 위치에 다음과 같이 docker-compose.yml 파일을 생성합니다.

version: '3'

services:
  zookeeper1:
    image: "bitnami/zookeeper:3.7.2"
    container_name: zookeeper1
    ports:
      - "2181:2181"
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
    volumes:
      - "~/zookeeper/1:/bitnami/zookeeper"

  kafka1:
    image: "bitnami/kafka:3.6.0"
    container_name: kafka1
    ports:
      - "9092:9092"
    environment:
      - KAFKA_BROKER_ID=1
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper1:2181
    depends_on:
      - zookeeper1
    volumes:
      - ~/kafka/1:/bitnami/kafka

  kafka2:
    image: "bitnami/kafka:3.6.0"
    container_name: kafka2
    ports:
      - "9093:9092"
    environment:
      - KAFKA_BROKER_ID=2
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper1:2181
    depends_on:
      - zookeeper1
    volumes:
      - ~/kafka/2:/bitnami/kafka

  kafka3:
    image: "bitnami/kafka:3.6.0"
    container_name: kafka3
    ports:
      - "9094:9092"
    environment:
      - KAFKA_BROKER_ID=3
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper1:2181
    depends_on:
      - zookeeper1
    volumes:
      - ~/kafka/3:/bitnami/kafka

  kafka-ui:
    image: 'provectuslabs/kafka-ui:latest'
    container_name: kafka-ui
    ports:
      - "8080:8080"
    environment:
      - KAFKA_CLUSTERS_0_NAME=local
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka1:9092,kafka2:9093,kafka3:9094
      - KAFKA_CLUSTERS_0_ZOOKEEPER=zookeeper1:2181
    depends_on:
      - zookeeper1
      - kafka1
      - kafka2
      - kafka3

이번 실습은 개인 개발 환경 카프카 클러스터를 구축하는 것이 목적이기 때문에 volume을 로컬 호스트 경로를 마운트하여 사용하였습니다. 이러한 로컬 마운트를 통해 쉽게 카프카 데이터의 변경 및 로그에 대한 내용을 로컬에서 직접 확인하기 위한 설정입니다. 만약 개인 테스트 용도가 아니라면 별도의 volume을 구성하여 사용하는 것을 권장합니다.

 

zookeeper 환경변수

  • ALLOW_ANONYMOUS_LOGIN=yes 익명 사용자의 로그인을 허용합니다.

kafka 환경변수

  • KAFKA_BROKER_ID 각 브로커의 아이디를 부여합니다.
  • ALLOW_PLAINTEXT_LISTENER=yes 개발 환경에서 사용하도록 보안 리스너 설정이 없이 동작합니다.
  • KAFKA_ZOOKEEPER_CONNECT 카프카가 주키퍼에 연결할 수 있도록 주키퍼 연결 정보를 입력합니다.

컨테이너 실행하기

docker-compose.yml 파일을 잘 작성하였다면 해당 디렉토리에서 다음 명령어를 통하여 컨테이너를 실행합니다.

docker-compose up -d

정상 동작 확인

이번 실습에서는 클러스터 및 토픽 현황을 파악하기 쉽게 하기 위해 kafka-ui 컨테이너를 같이 동작하게 되어있습니다. kafka-ui를 통해 현재 카프카 클러스터가 정상적으로 동작하는지 브라우저에서 확인할 수 있습니다.

http://localhost:8080/ui/clusters/local/brokers

컨테이너 종료

docker-compose down -v

'Kafka > kafka 기본' 카테고리의 다른 글

kcat(kafkacat) 사용법  (2) 2024.03.05
Docker compose를 이용한 Kafka Cluster 구축 (KRAFT Mode)  (0) 2024.03.05

해당 문서는 2023년 7월에 작성된 문서입니다.

kafka.common.InconsistentClusterIdException은 Apache Kafka에서 발생하는 클러스터 ID 불일치해서 발생한 오류입니다. 이 오류는 일반적으로 Kafka 브로커에 설정된 클러스터 ID와 실제 클러스터 ID가 일치하지 않을 때 발생합니다. 클러스터 ID는 Kafka 클러스터의 고유 식별자이며, 모든 브로커가 동일한 클러스터 ID를 가지고 있어야 합니다. 여기서 대표적인 2가지 해결 방법에 대해서 정리하였습니다.

Error Message

[2023-06-28 01:42:38,833] ERROR Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
kafka.common.InconsistentClusterIdException: The Cluster ID Ap0xMkY_TIyx9Ntyq565Pg doesn't match stored clusterId Some(LwpqAcZKQXCPnInmC0w2sw) in meta.properties. The broker is trying to join the wrong cluster. Configured zookeeper.connect may be wrong

 

해결방법 1 : Cluster ID 일치 시키기

먼저 Kafka 클러스터에 속한 모든 브로커의 클러스터 ID를 확인합니다. 브로커의 설정 파일 (server.properties)을 열고 cluster.id 항목을 찾아 현재 클러스터 ID를 확인합니다. 모든 브로커가 동일한 클러스터 ID를 가져야 합니다. 각각의 브로커의 meta.properties 파일에 cluster id를 통일시킨 후 카프카를 재시작한다.

해결방법 2 : meta.properties 파일 삭제

1번 방법으로 해결이 안 될 경우 각각의 브로커에서 meta.properties 파일을 삭제하고 재시작합니다.

rm -f (log.dirs명시한 경로)/meta.properties

마지막으로

위 2가지 해결 방안은 일시적인 해결책입니다. 즉 해당 Exception이 또 발생할 가능성이 있습니다. 좀 더 카프카에 대해 배워서 해당 문제가 또 발생하지 않는 방법을 찾아야 될 것 같습니다. 보다 더 좋은 해결책을 아신다면 공유 부탁드립니다.

 

 

 

 

+ Recent posts