설정 동기화 작업

1. vscode 설정 동기화하기

code -> 기본 설정 -> 백업 및 동기화 설정... 을 선택합니다.

 

2. 로그인 및 백업할 리스트 선택

동기화할 리스트를 선택하고 Microsoft나 Github으로 로그인을 진행합니다.

위 2가지 작업만으로 간단하게 동기화 작업을 수행할 수 있습니다.

 


설정 불러오기

1. vscode 설정 동기화하기

vscode 설정 동기화 작업과 같이 새로운 컴퓨터에서 code -> 기본 설정 -> 백업 및 동기화 설정... 을 선택합니다.

만약 새로운 컴퓨터에서 동기화 하기 이전에 설정을 변경하였다면 변경된 설정을 병합할 것인지 안내창이 나옵니다. 본인의 설정에 맞게 선택하여 진행하시면 됩니다.

vscode를 사용하여 업무를 하다보면 여러가지 이유로 잠깐 사용하고 그 뒤로는 사용하지 않게 되는 extension과 설정들이 있습니다. 이런 일이 반복되면 사용하지 않는 extension과 설정들로 vscode를 깔끔하게 정리하고 싶은 순간들이 있는데요. 그래서 이번 문서에서는 macOS에서 vscode를 초기화 하는 방법에 대하여 정리하였습니다.

 

1. vscode 종료

먼저 vscode를 초기화하기 전에 vscode를 안전하게 종료합니다.

 

2. vscode 설정 폴더 삭제

VSCode의 설정 파일과 사용자 데이터는 ~/Library/Application Support/Code 디렉토리에 저장됩니다. 터미널을 열고 아래 명령어를 실행하여 해당 폴더를 삭제합니다.

rm -rf ~/Library/Application\ Support/Code

 

3. 캐시 및 설정 데이터 삭제

캐시 파일과 로컬 저장소 데이터는 ~/Library/Caches/com.microsoft.VSCode~/Library/Application Support/Code에 있습니다. 아래 명령어를 사용하여 이 폴더들도 삭제합니다.

rm -rf ~/Library/Caches/com.microsoft.VSCode
rm -rf ~/Library/Saved\ Application\ State/com.microsoft.VSCode.savedState

 

4. 확장 프로그램 삭제

설치된 확장 프로그램은 ~/.vscode/extensions 폴더에 저장됩니다. 이 폴더를 삭제하여 확장 프로그램도 제거합니다.

rm -rf ~/.vscode/extensions

 

이상 macOS에 설치된 vscode 초기화 방법이였습니다. 위와 같은 방법으로 vscode를 초기화하여 깔끔한 개발 환경을 유지해보세요~

저는 현재 메인으로 사용하고 있는 IDE는 JetBrains에서 서비스하고 있는 Goland를 사용하고 있습니다. 그러나 JetBrains를 사용하다 보면 코드 파일이 아닌 Markdown, Text 파일의 경우 가로로 무한으로 입력되어 파일의 내용을 확인하기 위해 가로 스크롤로 이동해야 하는 불편함이 기본값으로 설정되어 있습니다. 이번 문서에서는 특정 파일 확장자에서는 파일의 내용을 한눈에 보기 좋게 설정하는 방법에 대하여 간단하게 정리하였습니다.

 

기본 설정 화면

기본 설정 화면을 보시면 내용이 계속해서 가로로만 입력되어 가로 스크롤이 계속 늘어나는 것을 확인할 수 있습니다.

 

화면 설정 변경

설정 -> 에디터 -> 일반 -> 소프트 랩(Soft warp) 화면에서 "다음 파일을 소프트랩" 설정에 체크 박스를 클릭하시면 됩니다. 해당 설정을 활성하면 *.md, *.txt, *.rst, *.adoc 파일은 가로의 넓이가 고정되어 파일의 내용을 한눈에 보기 좋게 설정됩니다.

 

결과 확인

결과를 보면 첫 사진과 다르게 가로 넓이가 고정되어 가로 스크롤이 사라지고 파일의 내용을 한눈에 보기 좋게 설정된 것을 알 수 있습니다.

 

 

kubernetes max pods

Kubernetes 공식 문서에 따르면 기본 설정으로 노드 당 최대 110개의 파드를 생성할 수 있습니다. 이는 노드에서 파드의 IP를 할당하기 위해서입니다. IP 주소 자원은 한정되어 있기 때문에, 노드당 생성할 수 있는 파드 수를 제한하여 네트워크 충돌을 방지하고 안정적인 운영을 보장하고 있습니다.

 

아래 이미지는 Kubernetes 공식 문서에서 제시하는 기본값입니다.

  • 노드는 최대 110개의 파드
  • 클러스터는 최대 5000개의 노드
  • 클러스터는 최대 150000개의 파드
  • 클러스터는 최대 300000개의 컨테이너

 

kubernetes max pods 개수 수정

kubernetes 공식 문서에서 제공하는 기본 값을 사용하는 것을 권장하지만 그래도 상황에 따라 노드의 개수는 늘리지 못하지만 최대 파드의 개수가 더 필요한 경우가 있습니다. 이번 문서에서는 최대 파드의 개수를 수정하는 방법에 대해 알아보겠습니다.

 

현재 노드의 할당할 수 있는 파드의 개수 확인

$ kubectl get nodes -o=custom-columns=NODE:.metadata.name,MAXPODS:.status.capacity.pods
NODE      MAXPODS
master    110
worker1   110
worker2   110

 

해당 worker node kubelet 수정

먼저 worker node에 접속하여 sudo 권한을 부여하고 maxPods의 내용을 추가합니다.

vi /var/lib/kubelet/config.yaml
# 아래 내용 추가
maxPods: 250

 

worker node kubelet 재시작

sudo systemctl daemon-reload
sudo systemctl restart kubelet

 

결과 확인

$ kubectl get nodes -o=custom-columns=NODE:.metadata.name,MAXPODS:.status.capacity.pods
NODE      MAXPODS
master    110
worker1   250
worker2   250

 

 

참고 문서

https://kubernetes.io/docs/setup/best-practices/cluster-large/

https://github.com/kubernetes/kubernetes/issues/23349

 

오픈소스 프로젝트와 라이센스

오픈소스 프로젝트를 살펴보면, 각 코드 파일에 라이센스가 명시되어 있는 것을 볼 수 있습니다. 이 라이센스는 프로젝트에 사용된 코드의 사용 범위와 조건을 정의하여, 코드를 법적으로 보호하는 중요한 역할을 합니다. 라이센스를 각 코드 파일에 일일이 추가하는 작업은 번거로울 수 있습니다. 이러한 불편을 해소하기 위해 Google에서는 addlicense 도구를 제공하고 있으며, 이 도구를 사용하면 코드에 라이센스를 효율적으로 추가할 수 있습니다. 본 문서에서는 addlicense를 사용하여 Go 언어의 코드 파일에 라이센스를 추가하는 방법을 설명하겠습니다. 비록 예시는 Go 언어로 진행하지만, addlicense는 다양한 프로그래밍 언어에 적용 가능합니다.

 

이번 문서에서는 설치 방식의 사용법과 Docker Container 형식의 사용법에 대하여 정리하였습니다. 본인의 환경에 맞는 방법을 선택하여 진행하시면 됩니다.

 

addlicense 설치 방식의 사용법

Google에서 제공하는 addlicense 도구는 Go 언어로 개발되어 있습니다. 따라서 이 도구를 사용하기 전에는 Go 언어가 시스템에 설치되어 있어야 합니다. Go 언어가 이미 설치되어 있지 않다면, 사용하기 전에 반드시 Go 언어의 설치를 먼저 진행해야 합니다. Go 언어 설치는 공식 웹사이트에서 지원하는 안내에 따라 진행할 수 있습니다.

https://go.dev/doc/install

 

addlicense 설치

# addlicense 설치
go install github.com/google/addlicense@latest

 

addlicense 설치 확인

# 설치 확인
ls $(go env GOPATH)/bin
addlicense --help

 

addlicense 설치 확인 시 문제 발생한 경우

만약 command not found 발생 시 PATH를 추가합니다. (이미 추가되어있는 분은 수행하지 않습니다.)

# zsh: command not found: addlicense와 같은 문제가 발생할 경우 PATH에 go bin위치 추가
export PATH=$PATH:$(go env GOPATH)/bin

 

addlicense를 이용하여 라이센스 추가하기

프로젝트 디렉토리로 이동 후 실행

# addlicense 사용법 **로 모든 경로를 지정합니다.
addlicense -c '프로젝트명' -l apache ./**/*.go

 

addlicense 삭제

더 이상 addlicense가 필요 없을 경우 삭제하는 방법입니다.

# addlicense 삭제 방법
ls $(go env GOPATH)/bin
rm $(go env GOPATH)/bin/addlicense

 

Docker container 방식의 사용법

개발자로 근무하면서 다양한 도구와 라이브러리를 설치하고 관리하는 것은 종종 번거롭고 시간이 많이 소모되는 일입니다. 특히 설치한 프로그램을 나중에 제거할 때 발생하는 문제들로 인해 작업 효율이 떨어질 수 있습니다. 이러한 문제를 해결하기 위해 많은 개발자들이 설치와 설정, 실행을 간편화하는 방법으로 Container를 많이 사용하고 있습니다. Container를 사용하면 필요할 때만 특정 환경을 구성하고 작업이 끝나면 컨테이너를 간편하게 제거할 수 있습니다. 이번 섹션에서는 Docker 컨테이너를 사용하여 addlicense 도구를 실행하는 방법을 소개하겠습니다. 

 

도커 이미지 다운로드

docker pull ghcr.io/google/addlicense:latest

 

컨테이너로 실행

docker run -it -v ${PWD}:/src ghcr.io/google/addlicense -c "프로젝트명" **/*.go

 

결과  확인

아래 명령어로 수행한 결과 다음과 같이 라이센스가 추가되었습니다.

addlicense -c 'mytest' -l apache ./**/*.go

 

'Go언어' 카테고리의 다른 글

golang version upgrade (ubuntu)  (0) 2023.07.09
go work 사용해보기  (0) 2023.07.06
Ubuntu(Linux)에서 Go 재설치  (0) 2022.10.30
Go언어 interface reflect  (0) 2021.08.15
Go언어 Cron  (0) 2021.06.15

Cruise Control

Strimzi는 LinkedIn에서 개발한 Cruise Control을 지원합니다. Cruise Control은 Kafka 클러스터의 성능 모니터링, 로드 밸런싱, 스케일링 및 장애 복구 등을 자동화하는 도구로, Kafka 클러스터의 운영을 크게 단순화하고 최적화합니다. Strimzi를 사용하면 Cruise Control을 Kafka 클러스터와 쉽게 통합하여 다음과 같은 기능을 활용할 수 있습니다.

로드 밸런싱
Cruise Control은 클러스터 전체의 워크로드 분포를 지속적으로 모니터링하고, 필요에 따라 파티션을 다시 분배하여 로드 밸런싱을 수행합니다. 이를 통해 리소스 사용률을 최적화하고, 클러스터의 처리량과 성능을 극대화할 수 있습니다.

Broker 추가 및 제거
클러스터의 워크로드 요구사항이 변경될 때, Cruise Control을 사용하면 Kafka 브로커를 자동으로 추가하거나 제거할 수 있습니다.

장애 복구
브로커 장애가 발생할 경우, Cruise Control은 자동으로 장애 복구 프로세스를 수행하여, 영향을 받은 파티션을 다른 브로커로 재할당합니다. 이는 클러스터의 가용성과 내구성을 유지하는 데 중요합니다.

 

위와 같이 Cruise Control에서는 여러가지 기능을 제공하지만 이번 문서 실습에서는 Broker의 추가 및 제거에 중점으로 실습을 진행하였습니다.

Cruise Control Goal

Cruise Control은 Kafka 클러스터의 최적화와 관리를 위해 다양한 목표(Goals)를 사용합니다. 목표는 goals, default.goals, hard.goals로 구분되어 있습니다. goals에서는 default.goals에서 사용할 목표를 정의합니다. default.goals은 Cruise Control의 작업 중에 기본적으로 사용되는 목표들의 집합을 정의하며 반드시 goals에 정의된 목표만 사용할 수 있습니다. hard.goals 설정은 반드시 만족해야 하는 목표들의 집합을 정의합니다. 이 목표들은 Cruise Control이 어떤 상황에서도 위반해서는 안 되는 필수 조건으로 작용합니다.

goals, default.goals, hard.goals 기본값

# goal 기본값
com.linkedin.kafka.cruisecontrol.analyzer.goals.RackAwareGoal
com.linkedin.kafka.cruisecontrol.analyzer.goals.RackAwareDistributionGoal
com.linkedin.kafka.cruisecontrol.analyzer.goals.MinTopicLeadersPerBrokerGoal
com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaCapacityGoal
com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskCapacityGoal
com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundCapacityGoal
com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundCapacityGoal
com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuCapacityGoal
com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaDistributionGoal
com.linkedin.kafka.cruisecontrol.analyzer.goals.PotentialNwOutGoal
com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskUsageDistributionGoal
com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundUsageDistributionGoal
com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundUsageDistributionGoal
com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuUsageDistributionGoal
com.linkedin.kafka.cruisecontrol.analyzer.goals.TopicReplicaDistributionGoal
com.linkedin.kafka.cruisecontrol.analyzer.goals.LeaderReplicaDistributionGoal
com.linkedin.kafka.cruisecontrol.analyzer.goals.LeaderBytesInDistributionGoal
com.linkedin.kafka.cruisecontrol.analyzer.kafkaassigner.KafkaAssignerDiskUsageDistributionGoal
com.linkedin.kafka.cruisecontrol.analyzer.kafkaassigner.KafkaAssignerEvenRackAwareGoal
com.linkedin.kafka.cruisecontrol.analyzer.goals.PreferredLeaderElectionGoal

# default.goal 기본값
com.linkedin.kafka.cruisecontrol.analyzer.goals.RackAwareGoal
com.linkedin.kafka.cruisecontrol.analyzer.goals.MinTopicLeadersPerBrokerGoal
com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaCapacityGoal
com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskCapacityGoal
com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundCapacityGoal
com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundCapacityGoal
com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuCapacityGoal
com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaDistributionGoal
com.linkedin.kafka.cruisecontrol.analyzer.goals.PotentialNwOutGoal
com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskUsageDistributionGoal
com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundUsageDistributionGoal
com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundUsageDistributionGoal
com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuUsageDistributionGoal
com.linkedin.kafka.cruisecontrol.analyzer.goals.TopicReplicaDistributionGoal
com.linkedin.kafka.cruisecontrol.analyzer.goals.LeaderReplicaDistributionGoal
com.linkedin.kafka.cruisecontrol.analyzer.goals.LeaderBytesInDistributionGoal

# hard.goal 기본값
com.linkedin.kafka.cruisecontrol.analyzer.goals.RackAwareGoal
com.linkedin.kafka.cruisecontrol.analyzer.goals.MinTopicLeadersPerBrokerGoal
com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaCapacityGoal
com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskCapacityGoal
com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundCapacityGoal
com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundCapacityGoal
com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuCapacityGoal

goal 목표의 의미

# 클러스터의 복제본이 랙 고장에 대해 내성을 가지도록 보장합니다.
com.linkedin.kafka.cruisecontrol.analyzer.goals.RackAwareGoal

# 랙을 고려하여 파티션의 복제본이 분산되도록 합니다.
com.linkedin.kafka.cruisecontrol.analyzer.goals.RackAwareDistributionGoal

# 브로커 당 최소한의 토픽 리더를 유지하여 리더 분포의 균형을 맞춥니다.
com.linkedin.kafka.cruisecontrol.analyzer.goals.MinTopicLeadersPerBrokerGoal

# 모든 브로커가 설정된 복제본 용량을 초과하지 않도록 합니다.
com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaCapacityGoal

# 모든 브로커의 디스크 사용량이 설정된 임계값을 넘지 않도록 합니다.
com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskCapacityGoal

# 네트워크 입력 용량의 임계값을 넘지 않도록 합니다.
com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundCapacityGoal

# 네트워크 출력 용량의 임계값을 넘지 않도록 합니다.
com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundCapacityGoal

# CPU 사용량이 설정된 임계값을 넘지 않도록 합니다.
com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuCapacityGoal

# 전체 클러스터에 걸쳐 복제본이 고르게 분포되도록 합니다.
com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaDistributionGoal

# 네트워크 출력 잠재력을 기준으로 최적화를 수행합니다.
com.linkedin.kafka.cruisecontrol.analyzer.goals.PotentialNwOutGoal

# 디스크 사용량이 브로커 간에 균일하게 분포되도록 합니다.
com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskUsageDistributionGoal

# 네트워크 입력 사용량이 브로커 간에 균일하게 분포되도록 합니다.
com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundUsageDistributionGoal

# 네트워크 출력 사용량이 브로커 간에 균일하게 분포되도록 합니다.
com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundUsageDistributionGoal

# CPU 사용량이 브로커 간에 균일하게 분포되도록 합니다.
com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuUsageDistributionGoal

# 특정 토픽의 복제본이 클러스터에 걸쳐 고르게 분포되도록 합니다.
com.linkedin.kafka.cruisecontrol.analyzer.goals.TopicReplicaDistributionGoal

# 리더 복제본이 브로커 간에 고르게 분포되도록 합니다.
com.linkedin.kafka.cruisecontrol.analyzer.goals.LeaderReplicaDistributionGoal

# 리더 복제본의 입력 데이터가 브로커 간에 고르게 분포되도록 합니다.
com.linkedin.kafka.cruisecontrol.analyzer.goals.LeaderBytesInDistributionGoal

# Kafka Assigner 도구와 유사한 방식으로 디스크 사용량을 최적화합니다.
com.linkedin.kafka.cruisecontrol.analyzer.kafkaassigner.KafkaAssignerDiskUsageDistributionGoal

# Kafka Assigner 도구와 유사한 방식으로 랙 인식 복제본 배치를 최적화합니다.
com.linkedin.kafka.cruisecontrol.analyzer.kafkaassigner.KafkaAssignerEvenRackAwareGoal

# 리더 복제본의 선출을 최적화하여 특정 조건에서 리더의 선출을 선호합니다.
com.linkedin.kafka.cruisecontrol.analyzer.goals.PreferredLeaderElectionGoal

Strimzi를 이용한 Cruise Control 배포

Strimzi에서는 Cruise Control를 배포하는 방법으로는 별도의 리소스를 정의하는 것이 아닌 kafkas.kafka.strimzi.io 리소스에 cruiseControl: {}를 정의하면 카프카 클러스터와 함께 Deployment 형식으로 배포됩니다.

---
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
  namespace: kafka
  annotations:
    strimzi.io/node-pools: enabled
spec:
  kafka:
    version: 3.6.1
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: false
    readinessProbe:
      initialDelaySeconds: 15
      timeoutSeconds: 5
    livenessProbe:
      initialDelaySeconds: 15
      timeoutSeconds: 5
    config:
      default.replication.factor: 2
      min.insync.replicas: 1
      inter.broker.protocol.version: "3.6"
      num.partitions: 3
    resources:
      limits:
        cpu: 1
        memory: 1Gi
      requests:
        cpu: 500m
        memory: 1Gi
    logging:
      type: inline
      loggers:
        kafka.root.logger.level: INFO
        kafka.request.logger.level: INFO
  zookeeper:
    replicas: 3
    storage:
      type: persistent-claim
      size: 1Gi
      deleteClaim: true
  entityOperator:
    topicOperator: {}
    userOperator: {}
  cruiseControl:
    config:
      default.goals: >
        com.linkedin.kafka.cruisecontrol.analyzer.goals.MinTopicLeadersPerBrokerGoal,
        com.linkedin.kafka.cruisecontrol.analyzer.goals.TopicReplicaDistributionGoal,
        com.linkedin.kafka.cruisecontrol.analyzer.goals.LeaderReplicaDistributionGoal,
        com.linkedin.kafka.cruisecontrol.analyzer.goals.LeaderBytesInDistributionGoal
      hard.goals: >
        com.linkedin.kafka.cruisecontrol.analyzer.goals.MinTopicLeadersPerBrokerGoal

Kafka 리소스를 생성하게 되면 zookeeper, kafka와는 다르게 Cruise Control은 Deployment 리소스로 배포됩니다.

KafkaRebalance

Strimzi에서 최적화 작업을 제안하기 위해서는 KafkaRebalance라는 리소스를 생성하는 것으로 제안을 할 수 있습니다. kafkaRebalance는 Cruise Control에 의해 제안을 확인하고 해당 제안을 적용하게 됩니다. KafkaRebalance로 제안할 수 있는 종류는 다음과 같습니다.

full 기본값

Cruise Control에 정의된 goal을 기반으로 클러스터의 모든 브로커에 rebalance를 수행합니다. KafkaRebalance의 기본값으로 KafkaRebalance에 아무런 정보를 기입하지 않으면 full 모드로 동작하지만 사용자는 KafkaRebalance 리소스를 생성할 때 mode 필드를 통해 리밸런싱의 모드를 직접 지정할 수 있습니다. full 모드를 명시적으로 지정하면, Cruise Control은 클러스터 전체에 대한 종합적인 리밸런싱 작업을 수행할 수 있습니다.

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaRebalance
metadata:
  name: my-rebalance
  namespace: kafka
  labels:
    strimzi.io/cluster: my-cluster
spec:
  mode: full

 

add-brokers

add-brokers 옵션은 Kafka 클러스터에 브로커를 추가할 때 사용되며, Cruise Control을 통해 이러한 작업을 자동화하고 최적화할 수 있습니다. Kafka 클러스터의 확장 과정에서 단순히 새로운 브로커를 추가하는 것만으로는 기존에 생성된 토픽의 파티션 복제본이 자동으로 새 브로커로 재할당되지 않습니다. 이 경우, 새 브로커는 앞으로 생성될 토픽의 파티션에만 사용될 수 있습니다.

add-brokers 옵션을 사용하여 Kafka 클러스터에 새로 추가된 브로커들에게 기존 토픽의 파티션 복제본을 자동으로 할당할 수 있습니다. 이는 클러스터의 리소스 사용을 최적화하고, 전체적인 부하 분산을 개선하는 데 도움이 됩니다. add-brokers 작업을 통해 새 브로커는 기존 토픽의 파티션 복제본을 할당받게 되며, 이는 클러스터의 부하를 보다 균등하게 분산시키고, 특정 브로커에 과부하가 집중되는 것을 방지합니다. 

파티션 복제본의 재할당 과정은 클러스터의 크기와 상태, 네트워크 속도 등에 따라 시간이 다소 소요될 수 있습니다.
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaRebalance
metadata:
  name: my-rebalance
  namespace: kafka
  labels:
    strimzi.io/cluster: my-cluster
spec:
  mode: add-brokers
  brokers: [2, 3]

remove-brokers

Kafka Cluster를 축소학 위해 Broker를 제거할 때 사용됩니다. KafkaRebalance를 사용하지 않고 브로커를 제거할 경우 파티션의 메시지가 정상적으로 Replication 되지 않고 Broker가 드랍되어 메시지 손실이 발생할 수 있습니다. 이러한 문제를 방지하기 위해 remove-broker를 사용하여 파티션을 이동하고 안전하게 Broker를 제거할 수 있습니다. remove-brokers 명령을 실행할 때, 제거하려는 브로커의 ID를 지정합니다. Cruise Control은 이 정보를 바탕으로 해당 브로커에 할당된 파티션 복제본을 다른 브로커로 이동시키는 최적의 계획을 생성하고 실행합니다.

브로커를 제거하는 과정에서 파티션의 리밸런싱이 발생합니다. 이 과정은 네트워크 트래픽과 I/O 부하를 증가시킬 수 있으므로, 클러스터의 성능에 일시적인 영향을 줄 수 있습니다.
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaRebalance
metadata:
  name: my-rebalance
  namespace: kafka
  labels:
    strimzi.io/cluster: my-cluster
spec:
  mode: remove-brokers
  brokers: [2, 3]

KafkaRebalance 수락하기

KafkaRebalance 리소스를 통해 Cruise Control을 사용하여 Kafka 클러스터의 리밸런싱을 관리할 때, 작업의 상태는 여러 단계를 거칩니다. 이 과정은 사용자가 제안된 리밸런싱 계획을 검토하고 사용자가 승인하는 방식으로 리밸런싱을 수행하는 방식으로 클러스터의 변경 사항을 보다 안전하게 관리할 수 있습니다.

# 리소스 상태 확인
> kubectl get kafkarebalances.kafka.strimzi.io -n kafka
NAME           CLUSTER    PENDINGPROPOSAL  PROPOSALREADY   REBALANCING   READY   NOTREADY
my-rebalance   my-cluster                                  true

 

KafkaRebalance 리소스 상태

PENDINGPROPOSAL
KafkaRebalance 리소스가 생성된 직후, Cruise Control이 리밸런싱 작업의 계획을 작성하고 있는 단계를 나타냅니다.

PROPOSALREADY
작업 계획이 완료되었고 사용자의 검토를 기다리고 있는 상태입니다.

REBALANCING
사용자가 리밸런싱 계획을 승인한 후, Cruise Control이 실제로 리밸런싱 작업을 수행하고 있는 상태입니다.

READY
리밸런싱 작업이 성공적으로 완료되어, KafkaRebalance 리소스가 최종 상태에 도달한 것입니다.

NOTREADY
리밸런싱 작업에 문제가 발생하여 계획이 성공적으로 수행되지 않았음을 나타내는 상태입니다. 이 경우, 문제의 원인을 분석하고 해결한 후 리밸런싱을 다시 시도해야 할 수 있습니다.

 

KafkaRebalance 리소스가 정상적으로 생성되었다면 describe를 이용하여 작업의 계획을 확인할 수 있습니다.

# 작업 계획 확인
> kubectl describe kafkarebalances.kafka.strimzi.io -n kafka my-rebalance
Name:         my-rebalance
Namespace:    kafka
Labels:       strimzi.io/cluster=my-cluster
Annotations:  <none>
API Version:  kafka.strimzi.io/v1beta2
Kind:         KafkaRebalance
Metadata:
  Creation Timestamp:  2024-03-17T06:48:41Z
  Generation:          3
  Resource Version:    179034
  UID:                 00ff25be-fb27-4f99-a9bb-2722560dfddd
Spec:
  Goals:
    LeaderReplicaDistributionGoal
    TopicReplicaDistributionGoal
  Skip Hard Goal Check:  true
Status:
  Conditions:
    Last Transition Time:  2024-03-17T07:01:35.511905341Z
    Status:                True
    Type:                  ProposalReady
  Observed Generation:     3
  Optimization Result:
    After Before Load Config Map:  my-rebalance
    Data To Move MB:               0
    Excluded Brokers For Leadership:
    Excluded Brokers For Replica Move:
    Excluded Topics:
    Intra Broker Data To Move MB:         0
    Monitored Partitions Percentage:      100
    Num Intra Broker Replica Movements:   0
    Num Leader Movements:                 0
    Num Replica Movements:                0
    On Demand Balancedness Score After:   100
    On Demand Balancedness Score Before:  100
    Provision Recommendation:
    Provision Status:                     UNDECIDED
    Recent Windows:                       1
  Session Id:                             b31d2057-273f-4666-9565-51ce304e95db
Events:                                   <none>

 

작업의 계획을 확인하고 annotate 주석을 이용하여 작업을 승인합니다.

# 작업 승인
> kubectl annotate kafkarebalance -n kafka my-rebalance strimzi.io/rebalance="approve"

# 작업을 중지할 때 stop 수행
# kubectl annotate kafkarebalance -n kafka --overwrite my-rebalance strimzi.io/rebalance="stop"

# 완료된 작업을 나중에 재사용할 때 refresh 수행 후 approve
# kubectl annotate kafkarebalance -n kafka --overwrite my-rebalance strimzi.io/rebalance="refresh"
# kubectl annotate kafkarebalance -n kafka --overwrite my-rebalance strimzi.io/rebalance="approve"

 

 


Cruise Control과 KafkaRebalance 실습

지금까지 Cruise Control과 KafkaRebalance에 대해서 알아봤습니다. 이제는 Broker 추가 및 제거하는 방법으로 직접 실습을 진행하겠습니다.

 

Cluster 구성

실습 진행을 위해 테스트용 클러스터를 구축합니다. 이번 실습에서는 별다른 설정 없이 cruiseControl 기본 값 만으로 테스트를 진행하였습니다.

---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
  name: pool-a
  namespace: kafka
  labels:
    strimzi.io/cluster: my-cluster
spec:
  replicas: 2
  roles:
    - broker
  storage:
    type: jbod
    volumes:
      - id: 0
        type: persistent-claim
        size: 5Gi
        deleteClaim: true
        
---
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
  namespace: kafka
  annotations:
    strimzi.io/node-pools: enabled
spec:
  kafka:
    version: 3.6.1
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: false
    readinessProbe:
      initialDelaySeconds: 15
      timeoutSeconds: 5
    livenessProbe:
      initialDelaySeconds: 15
      timeoutSeconds: 5
    config:
      default.replication.factor: 2
      min.insync.replicas: 1
      inter.broker.protocol.version: "3.6"
      num.partitions: 3
    resources:
      limits:
        cpu: 1
        memory: 1Gi
      requests:
        cpu: 500m
        memory: 1Gi
    logging:
      type: inline
      loggers:
        kafka.root.logger.level: INFO
        kafka.request.logger.level: INFO
  zookeeper:
    replicas: 3
    storage:
      type: persistent-claim
      size: 1Gi
      deleteClaim: true
  entityOperator:
    topicOperator: {}
    userOperator: {}
  cruiseControl:
    {}

 

Topic 생성

토픽 a-topic, b-topic을 생성하고 나서 토픽 파티션의 할당된 브로커를 확인하면 0, 1번 브로커에 배포된 것을 확인할 수 있습니다.

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: a-topic
  namespace: kafka
  labels:
    strimzi.io/cluster: my-cluster
spec:
  partitions: 4
  replicas: 2
  config:
    retention.ms: 7200000
    segment.bytes: 1073741824
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: b-topic
  namespace: kafka
  labels:
    strimzi.io/cluster: my-cluster
spec:
  partitions: 4
  replicas: 2
  config:
    retention.ms: 7200000
    segment.bytes: 1073741824

 

kcat 파드를 이용하여 토픽 생성 결과 확인

> ❯ kubectl exec -it -n kafka kcat -- kcat -L -b my-cluster-kafka-bootstrap:9092
Metadata for all topics (from broker -1: my-cluster-kafka-bootstrap:9092/bootstrap):
 2 brokers:
  broker 0 at my-cluster-pool-a-0.my-cluster-kafka-brokers.kafka.svc:9092
  broker 1 at my-cluster-pool-a-1.my-cluster-kafka-brokers.kafka.svc:9092 (controller)
 5 topics:
  ( ... 생략 ... )
  topic "a-topic" with 4 partitions:
    partition 0, leader 0, replicas: 0,1, isrs: 0,1
    partition 1, leader 1, replicas: 1,0, isrs: 1,0
    partition 2, leader 0, replicas: 0,1, isrs: 0,1
    partition 3, leader 1, replicas: 1,0, isrs: 1,0
  topic "b-topic" with 4 partitions:
    partition 0, leader 1, replicas: 1,0, isrs: 1,0
    partition 1, leader 0, replicas: 0,1, isrs: 0,1
    partition 2, leader 1, replicas: 1,0, isrs: 1,0
    partition 3, leader 0, replicas: 0,1, isrs: 0,1

 

Broker 확장

KafkaNodePool의 개수를 증가 시켜 브로커를 추가합니다.

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
  name: pool-a
  namespace: kafka
  labels:
    strimzi.io/cluster: my-cluster
spec:
  replicas: 4
  roles:
    - broker
  storage:
    type: jbod
    volumes:
      - id: 0
        type: persistent-claim
        size: 5Gi
        deleteClaim: true

 

브로커를 추가하여 파드가 생성되었지만 카프카 클러스터를 확인하면 추가된 브로커는 클러스터에 합류하지 못한 것을 알 수 있습니다.

 

kcat을 이용하여 확인하면 클러스터에 변화가 없는 것을 알 수 있습니다.

> kubectl exec -it -n kafka kcat -- kcat -L -b my-cluster-kafka-bootstrap:9092
Metadata for all topics (from broker -1: my-cluster-kafka-bootstrap:9092/bootstrap):
 2 brokers:
  broker 0 at my-cluster-pool-a-0.my-cluster-kafka-brokers.kafka.svc:9092
  broker 1 at my-cluster-pool-a-1.my-cluster-kafka-brokers.kafka.svc:9092 (controller)
 5 topics:
  ( ... 생략 ... )
  topic "a-topic" with 4 partitions:
    partition 0, leader 0, replicas: 0,1, isrs: 0,1
    partition 1, leader 1, replicas: 1,0, isrs: 1,0
    partition 2, leader 0, replicas: 0,1, isrs: 0,1
    partition 3, leader 1, replicas: 1,0, isrs: 1,0
  topic "b-topic" with 4 partitions:
    partition 0, leader 1, replicas: 1,0, isrs: 1,0
    partition 1, leader 0, replicas: 0,1, isrs: 0,1
    partition 2, leader 1, replicas: 1,0, isrs: 1,0
    partition 3, leader 0, replicas: 0,1, isrs: 0,1

 

KafkaRebalance add-brokers

2, 3번 브로커를 추가합니다.

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaRebalance
metadata:
  name: rebalance-add
  namespace: kafka
  labels:
    strimzi.io/cluster: my-cluster
spec:
  mode: add-brokers
  brokers: [2, 3]

 

KafkaRebalance 리소스 상태 확인

 

작업 계획 확인

 

작업 승인

 

결과 확인

> kubectl exec -it -n kafka kcat -- kcat -L -b my-cluster-kafka-bootstrap:9092
Metadata for all topics (from broker -1: my-cluster-kafka-bootstrap:9092/bootstrap):
 4 brokers:
  broker 0 at my-cluster-pool-a-0.my-cluster-kafka-brokers.kafka.svc:9092
  broker 2 at my-cluster-pool-a-2.my-cluster-kafka-brokers.kafka.svc:9092
  broker 3 at my-cluster-pool-a-3.my-cluster-kafka-brokers.kafka.svc:9092
  broker 1 at my-cluster-pool-a-1.my-cluster-kafka-brokers.kafka.svc:9092 (controller)
 5 topics:
 (... 생략 ...)
  topic "a-topic" with 4 partitions:
    partition 0, leader 3, replicas: 3,2, isrs: 2,3
    partition 1, leader 3, replicas: 3,2, isrs: 2,3
    partition 2, leader 3, replicas: 3,2, isrs: 2,3
    partition 3, leader 2, replicas: 2,3, isrs: 3,2
  topic "b-topic" with 4 partitions:
    partition 0, leader 1, replicas: 1,0, isrs: 1,0
    partition 1, leader 3, replicas: 3,2, isrs: 2,3
    partition 2, leader 3, replicas: 3,2, isrs: 2,3
    partition 3, leader 0, replicas: 0,1, isrs: 0,1

결과를 확인하면 기존 토픽의 파티션이 새롭게 추가된 브로커에도 할당된 것을 확인할 수 있습니다.

 

KafkaRebalance remove-brokers 생성

이제는 반대로 추가한 브로커를 제거하는 방법을 수행합니다.

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaRebalance
metadata:
  name: rebalance-remove
  namespace: kafka
  labels:
    strimzi.io/cluster: my-cluster
spec:
  mode: remove-brokers
  brokers: [2, 3]

 

remove-brokers 작업 수행

 

KafkaNodePool의 개수를 줄여 자원 반납

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
  name: pool-a
  namespace: kafka
  labels:
    strimzi.io/cluster: my-cluster
spec:
  replicas: 2
  roles:
    - broker
  storage:
    type: jbod
    volumes:
      - id: 0
        type: persistent-claim
        size: 5Gi
        deleteClaim: true

 

참고문서

https://strimzi.io/docs/operators/latest/deploying#cruise-control-concepts-str

KafkaTopic

Strimzi를 사용하면 Kubernetes 환경에서 Kafka Topic을 쉽고 효율적으로 관리할 수 있습니다. Strimzi의 Topic Operator를 통해 Kafka Topic을 Kubernetes의 Custom Resource로 선언적으로 생성, 수정, 삭제할 수 있으며, 이는 접근 권한 문제와 같은 다양한 문제를 해결하는 데 도움이 됩니다.

KafkaTopic 생성하기

먼저 Strimzi를 이용하여 KafkaTopic을 생성하기 위해서는 Kafka Cluster가 Kubernetes 환경에 Strimzi Cluster Operator로 이미 배포되어 있어야 합니다. Strimzi를 이용한 Kafka Cluster 구축은 아래 링크를 통해 진행할 수 있습니다.

 

Strimzi를 이용한 Kafka Cluster 배포

https://stdhsw.tistory.com/entry/Strimzi-2-Cluster-Operator%EB%A5%BC-%EC%9D%B4%EC%9A%A9%ED%95%9C-Kafka-Cluster-%EB%B0%B0%ED%8F%AC

 

my-topic.yml 파일 작성

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: my-topic
  namespace: kafka
  labels:
    strimzi.io/cluster: my-cluster
spec:
  partitions: 3
  replicas: 2
  config:
    retention.ms: 7200000
    segment.bytes: 1073741824

strimzi.io/cluster 라벨을 이용하여 my-cluster 이름의 카프카 클러스터에 my-topic을 생성합니다. spec에는 토픽의 세부 사항을 정의할 수 있습니다. 여기에는 파티션의 수(partitions), 각 파티션의 복제본 수(replicas)를 설정하였으며, 토픽에 적용할 추가적인 설정은 config에 기록하여 정의할 수 있습니다.

Strimzi에서는 Topic을 생성할 때 63글자 미만이며, 소문자와 hyphen을 이용하여 토픽명을 정의하는 것을 권장하고 있습니다. (특수문자, 공백, 기호는 사용하지 마세요)

 

 

Topic 생성

kubectl apply -f my-topic.yml

kafkaTopic 리소스를 생성하게 되면 my-cluster의 카프카 클러스터로 생성된 Entity Operator의 Topic Operator가 my-cluster 카프카에 토픽을 생성합니다.

만약 KafkaTopic 리소스가 아닌 다른 방법으로 토픽을 생성되었다면 ?

Kafka Streams 또는 auto.create.topics.enable 옵션으로 생성된 토픽은 기존 토픽명에 해시값이 붙어 "my-topic---c55e57fe2546a33f9e603caf57165db4072e827e"와 같은 이름으로 KafkaTopic 리소스가 생성됩니다. 이렇게 생성된 KafkaTopic리소스로도 해당 토픽을 관리할 수 있습니다.

 

kcat으로 Topic에 메시지 전송(Producer) 및 수신(Consumer)

Kafka 토픽이 성공적으로 생성된 후, 그 토픽이 정상적으로 동작하는지 확인하기 위해 kcat (이전 명칭: kafkacat)을 사용하여 메시지를 전송하고 수신하여 정상 동작을 확인합니다.

 

kafka-kcat.yml

apiVersion: v1
kind: Pod
metadata:
  name: kcat
  namespace: kafka
spec:
  containers:
    - name: kcat
      image: edenhill/kcat:1.7.1
      command: ["tail"]
      args: ["-f", "/dev/null"]
  terminationGracePeriodSeconds: 0

 

kcat 파드 생성

kubectl apply -f kafka-kcat.yml

 

kcat을 이용한 메시지 전송

# kcat으로 카프카 클러스터 확인
> kubectl exec -it -n kafka kcat -- kcat -L -b my-cluster-kafka-bootstrap:9092
Metadata for all topics (from broker -1: my-cluster-kafka-bootstrap:9092/bootstrap):
 4 brokers:
  broker 0 at my-cluster-pool-a-0.my-cluster-kafka-brokers.kafka.svc:9092
  broker 2 at my-cluster-pool-b-2.my-cluster-kafka-brokers.kafka.svc:9092
  broker 3 at my-cluster-pool-b-3.my-cluster-kafka-brokers.kafka.svc:9092
  broker 1 at my-cluster-pool-a-1.my-cluster-kafka-brokers.kafka.svc:9092 (controller)
 1 topics:
  topic "my-topic" with 3 partitions:
    partition 0, leader 1, replicas: 1,0, isrs: 1,0
    partition 1, leader 0, replicas: 0,2, isrs: 0,2
    partition 2, leader 2, replicas: 2,3, isrs: 2,3

# kcat으로 메시지 전송 메시지를 전부 입력하였다면 ctrl+d로 나옵니다.
> kubectl exec -it -n kafka kcat -- kcat -P -b my-cluster-kafka-bootstrap:9092 -t my-topic -K:
> 1:a
> 2:b
> 3:c
> 1:aa
> 2:bb
> 3:cc

 

kcat을 이용한 메시지 수신

# kcat을 이용하여 메시지는 수신합니다.
> kubectl exec -it -n kafka kcat -- kcat -C -b my-cluster-kafka-bootstrap:9092 -t my-topic
b
c
bb
cc
a
aa
% Reached end of topic my-topic [1] at offset 4
% Reached end of topic my-topic [2] at offset 2
% Reached end of topic my-topic [0] at offset 0

 

참고문서

https://strimzi.io/docs/operators/latest/deploying

 

Cluster Operator를 이용한 Kafka Cluster 배포

Strimzi를 사용하여 Kafka 클러스터를 배포하는 것은 Kubernetes 환경에서 매우 간단합니다. Strimzi의 Custom Resource Definitions (CRD)인 Kafka 리소스를 활용하여 Kafka Broker와 Zookeeper 앙상블을 포함한 Kafka 클러스터를 구성할 수 있습니다.

Strimzi 설치를 진행하지 않으셨다면 아래 링크를 통해 Strimzi 설치를 먼저 진행하여 주시기 바랍니다.

https://stdhsw.tistory.com/entry/Strimzi-1-Strimzi-Cluster-Operator-%EC%84%A4%EC%B9%98-in-Kubernetes

 

Zookeeper 모드 Kafka Cluster 배포

Strimzi의 Custom Resource인 Kafka 리소스를 사용하여 간단하게 Kafka 클러스터를 배포하는 방법은 다음과 같습니다.

 

my-cluster.yml 파일 작성

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
  name: pool-a
  namespace: kafka
  labels:
    strimzi.io/cluster: my-cluster
spec:
  replicas: 2
  roles:
    - broker
  storage:
    type: jbod
    volumes:
      - id: 0
        type: persistent-claim
        size: 5Gi
        deleteClaim: true
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
  name: pool-b
  namespace: kafka
  labels:
    strimzi.io/cluster: my-cluster
spec:
  replicas: 2
  roles:
    - broker
  storage:
    type: jbod
    volumes:
      - id: 0
        type: persistent-claim
        size: 5Gi
        deleteClaim: true

---
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
  namespace: kafka
  annotations:
    strimzi.io/node-pools: enabled
spec:
  kafka:
    version: 3.6.1
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: false
    readinessProbe:
      initialDelaySeconds: 15
      timeoutSeconds: 5
    livenessProbe:
      initialDelaySeconds: 15
      timeoutSeconds: 5
    config:
      default.replication.factor: 2
      min.insync.replicas: 1
      inter.broker.protocol.version: "3.6"
      num.partitions: 3
    resources:
      limits:
        cpu: 1
        memory: 1Gi
      requests:
        cpu: 500m
        memory: 1Gi
    logging:
      type: inline
      loggers:
        kafka.root.logger.level: INFO
        kafka.request.logger.level: INFO
  zookeeper:
    replicas: 3
    storage:
      type: persistent-claim
      size: 1Gi
      deleteClaim: true
  entityOperator:
    topicOperator: {}
    userOperator: {}

KafkaNodePool은 카프카 브로커의 특성을 정의하고 해당 특성을 카프카 클러스터에 브로커로 등록해 주는 역할을 수행합니다. KafkaNodePool을 이용하여 브로커 노드 관리를 효율적으로 수행할 수 있습니다. 카프카 브로커는 KafkaNodePool 마다 2개씩 구성하여 총 4대의 브로커로 구성되었으며, Zookeeper는 3대로 구성하였습니다.

 

Kafka 리소스 생성

kubectl apply -f my-cluster.yml

 

kafka 리소스를 생성하였다면 StrimziPodSet리소스가 생성됩니다. 기존에는 StatefulSet을 통하여 파드가 관리되었지만 현재는 Strimzi에서 정의한 StrimziPodSet으로 파드가 관리됩니다. 위와 같이 카프카 클러스터를 구축하였다면 다음과 같이 StrimziPodSet이 생성된 것을 확인할 수 있습니다.

 

그리고 my-cluster 카프카 설정으로 Deployment 워크로드로 EntityOperator가 생성됩니다. EntityOperator는 TopicOperator와 UserOperator로 구성되며 Topic과 User를 생성 및 관리하는 역할을 수행합니다. 하나의 카프카 클러스터에 최대 하나의 EntityOperator가 생성될 수 있습니다.

StrimziPodSet과 EntityOperator가 모두 정상적으로 배포가 되었고 이를 통해 다음과 같이 파드가 생성된 것을 확인할 수 있습니다.

KRAFT 모드는 현재

이글을 작성하고 다음날... Strimzi에서 Kraft모드 사용 정식 버전이 나왔습니다. Strimzi 버전 0.40.0으로 Upgrade를 진행하시면 정식 Kraft 모드로 카프카 클러스터 배포를 진행하실 수 있습니다.

 

kraft-cluster.yml

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
  name: dual-role
  namespace: kafka
  labels:
    strimzi.io/cluster: kraft-cluster
spec:
  replicas: 4
  roles:
    - controller
    - broker
  storage:
    type: jbod
    volumes:
      - id: 0
        type: persistent-claim
        size: 10Gi
        deleteClaim: false
---

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: kraft-cluster
  namespace: kafka
  annotations:
    strimzi.io/node-pools: enabled
    strimzi.io/kraft: enabled
spec:
  kafka:
    version: 3.6.1
    metadataVersion: 3.6-IV2
    replicas: 3
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: false
    config:
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      default.replication.factor: 3
      min.insync.replicas: 2
    resources:
      limits:
        cpu: 1
        memory: 1Gi
      requests:
        cpu: 500m
        memory: 1Gi
    storage:
      type: jbod
      volumes:
        - id: 0
          type: persistent-claim
          size: 5Gi
          deleteClaim: false
  zookeeper:
    replicas: 3
    storage:
      type: persistent-claim
      size: 1Gi
      deleteClaim: false
  entityOperator:
    userOperator: {}

 

KafkaNodePool은 Strimzi Kafka Operator에서 제공하는 리소스 중 하나로, Kafka 브로커의 구성 및 성질을 정의하는 리소스입니다. KRaft 모드(즉, Zookeeper 없이 Kafka를 운영하는 모드)에서 KafkaNodePool을 사용하면 브로커의 역할을 보다 세밀하게 설정할 수 있습니다. KRaft 모드에서 브로커는 다음과 같은 역할을 가질 수 있습니다.

  • Controller: Kafka 클러스터 내에서 메타데이터를 관리하는 중앙 제어 노드의 역할을 합니다. 컨트롤러는 클러스터의 전반적인 상태를 관리하고, 토픽 생성, 삭제, 파티션 리밸런싱과 같은 작업을 총괄합니다. KRaft 모드에서는 단일 컨트롤러 또는 컨트롤러 세트가 이러한 역할을 담당합니다.
  • Broker: 실제 메시지를 저장하고 클라이언트 요청을 처리하는 노드의 역할을 합니다. 브로커는 프로듀서로부터 메시지를 받아 저장하고, 컨슈머의 요청에 따라 메시지를 제공합니다.
  • Controller와 Broker 모두: 하나의 노드가 컨트롤러 역할과 브로커 역할을 동시에 수행할 수도 있습니다. 이 경우, 노드는 메타데이터 관리와 메시지 저장 및 처리의 책임을 모두 갖게 됩니다.

KafkaNodePool 리소스를 사용하여 각 브로커 노드의 역할을 구성함으로써, Kafka 클러스터의 성능과 안정성을 최적화하고, 특정 운영 요구사항에 맞춰 클러스터를 조정할 수 있습니다.

 

YAML파일을 보시면 zookeeper가 선언된 것을 확인할 수 있는데 아직까지 Strimzi 0.40.0 버전에서는 Kafka CRD에 zookeeper가 반듯이 선언되도록 정의가 되어 있어 zookeeper를 선언해야 되지만 실제로 zookeeper가 배포되지는 않습니다.

 

참고문서

https://strimzi.io/docs/operators/latest/deploying

 

Strimzi 란

Strimzi는 Apache Kafka 및 기타 Apache Kafka 생태계 프로젝트를 Kubernetes 및 Red Hat OpenShift와 같은 컨테이너 오케스트레이션 플랫폼에서 쉽게 배포, 관리 및 운영할 수 있도록 도와주는 오픈 소스 프로젝트입니다. Strimzi를 통하여 Kafka를 Kubernetes에 배포하면 Kafka 클러스터를 쉽게 확장하고 Kafka 클러스터를 쉽게 백업하고 Kafka 클러스터를 쉽게 업그레이드할 수 있습니다. 또한 Strimzi는 Prometheus, Grafana 등과 같은 모니터링 도구와 통합되어 Kafka 클러스터의 상태와 성능을 지속적으로 모니터링할 수 있도록 하며, Kafka 클러스터를 쉽게 재구성하고 Kafka 클러스터를 쉽게 보안할 수 있습니다.

또한, Strimzi는 Zookeeper, Kafka Connect, Kafka MirrorMaker, Kafka Bridge, Kafka Exporter, Cruise Control, Strimzi UI와 같은 Kafka 생태계 프로젝트를 Kubernetes에 통합 배포할 수 있도록 지원합니다. 이는 Kafka 기반 애플리케이션과 시스템을 효율적으로 구축, 확장 및 관리하는 데 큰 도움이 됩니다. 이번 실습을 통하여 Kubernetes 환경에서 Strimzi Cluster Operator 배포와 각 리소스의 기능에 대하여 알아보겠습니다.

 

Strimzi 동작 방식

이미지 출처 : https://strimzi.io/docs/operators/latest/overview#kafka-components_str

 

Strimzi 설치

Kubernetes 환경에 Strimzi를 설치하는 방법은 대표적으로 3가지 방법이 있습니다. YAML을 통한 설치 방법, OperatorHub을 이용한 설치 방법, Helm을 이용한 설치방법이 있습니다. 이번 문서에서는 YAML 방식과, Helm 방식에 대하여 정리하였습니다. 둘 중 하나의 방법을 선택하여 진행하여 주시기 바랍니다.

YAML 설치 방법

Strimzi 클러스터를 Kubernetes 환경에 배포하는 과정에서는 기본적인 Kubernetes 명령어와 개념에 대한 이해가 필요합니다. YAML을 이용하여 배포를 진행할 때는 Kubernets 명령어 습득 만으로도 Strimzi 클러스터를 배포할 수 있는 이점이 있지만 다수의 설정이나 복잡한 구성을 변경하고자 할 때 번거롭고 오류가 발생할 수 있는 불편함이 있습니다.

 

GitHub에서 양식 가져오기

`https://github.com/strimzi/strimzi-kafka-operator` GitHub주소를 가져와 해당 디렉토리로 이동하여 다음 명령어를 통하여 간편하게 Strimzi Cluster Operator를 배포할 수 있습니다.

kubectl apply -f install/cluster-operator -n 네임스페이스

 

Helm을 이용한 설치 방법

Helm을 사용하여 Strimzi Kafka 클러스터를 Kubernetes 환경에 배포하는 방법은 Kubernetes 명령어뿐만 아니라 Helm의 기본 사용법에 대한 이해도 필요하게 합니다. 이러한 접근 방식은 초기 학습해야 되는 단점이 있을 수 있지만, Helm의 기본 사용법을 숙지하면 Helm은 Kubernetes 애플리케이션의 패키지 관리자 역할을 수행하여 다양한 설정을 values.yaml 파일을 통해 간단하게 수정할 수 있으며, 업그레이드, 롤백, 삭제 등의 작업을 간단한 명령어로 수행할 수 있습니다. 뿐만 아니라 values.yaml 파일을 공유함으로써 협업하는 팀원들에게 현재의 설정 값을 간편하게 공유할 수 있습니다.

 

values.yml 설정 변경

Kubernets Cluster내에 모든 Namespace에 접근할 수 있도록 설정을 변경합니다.

watchAnyNamespace: true

 

Helm을 이용한 Strimzi 설치

# helm repo 추가
helm repo add strimzi https://strimzi.io/charts

# helm으로 Strimzi 설치 진행
helm upgrade --install -n kafka --create-namespace strimzi-kafka-operator strimzi/strimzi-kafka-operator --version 0.40.0 -f values.yml

 

정상 동작 확인

위와 같이 둘중 하나의 방식으로 Strimzi를 정상적으로 설치되었다면 Kubernetes 클러스터에 CRD(CustomResourceDefinitions)가 정의되고 Strimzi Cluster Operator가 배포된 것을 확인할 수 있습니다.

 

Strimzi CRD 확인

❯ kubectl api-resources  | grep strimzi
strimzipodsets             sps        core.strimzi.io/v1beta2         true       StrimziPodSet
kafkabridges               kb         kafka.strimzi.io/v1beta2        true       KafkaBridge
kafkaconnectors            kctr       kafka.strimzi.io/v1beta2        true       KafkaConnector
kafkaconnects              kc         kafka.strimzi.io/v1beta2        true       KafkaConnect
kafkamirrormaker2s         kmm2       kafka.strimzi.io/v1beta2        true       KafkaMirrorMaker2
kafkamirrormakers          kmm        kafka.strimzi.io/v1beta2        true       KafkaMirrorMaker
kafkanodepools             knp        kafka.strimzi.io/v1beta2        true       KafkaNodePool
kafkarebalances            kr         kafka.strimzi.io/v1beta2        true       KafkaRebalance
kafkas                     k          kafka.strimzi.io/v1beta2        true       Kafka
kafkatopics                kt         kafka.strimzi.io/v1beta2        true       KafkaTopic
kafkausers                 ku         kafka.strimzi.io/v1beta2        true       KafkaUser

 

Strimzi Operator 확인

❯ k get po -n kafka | grep strimzi
strimzi-cluster-operator-95d88f6b5-mmc7q    1/1     Running   1 (30m ago)   70m

 

Strimzi Cluster Operator

Strimzi Cluster Operator는 Kubernetes 환경에서 Strimzi CRD(CustomResourceDefinitions) 관련 리소스를 관리하는 역할을 수행합니다. 주요 기능으로는 Kafka Cluster 배포 및 관리, 구성 관리, 스케일링, 업데이트 및 업그레이드, 모니터링 및 로깅, 보안 관리, 복구, 토픽 생성 관리 및 사용자 생성 관리, 그리고 커넥터 생성 관리를  포함됩니다. 이를 통해 Kafka 클러스터의 성능, 보안, 효율성을 향상하며 Kubernetes 환경에서의 관리를 용이하게 합니다.

Strimzi 공식 문서에서는 Kubernetes 클러스터에 하나의 Cluster Operator만 배포하는 것을 권장하는데, 이는 여러 Operator가 동일한 리소스를 관리하려고 할 때 발생할 수 있는 충돌과 관련된 문제를 방지하기 위함입니다. Cluster Operator는 Strimzi에서 생성된 Custom Resource Definitions(CRD)에 대한 감시로 리소스 관리 및 조정 작업을 수행합니다. Cluster Operator가 동일한 CRD 인스턴스를 관리하려고 하면 예상치 못한 동작이나 데이터 불일치가 발생할 수 있습니다.

 

Strimzi Cluster 구조

Kubernetes Cluster 내에서 단 하나의 Strimzi Cluster Operator만 존재하며 이 Operator를 통해 Kubernetes Cluster 내에서 여러 개의 Kafka Cluster를 구축하고 관리할 수 있습니다.

 

Strimzi CustomResource

Strimzi를 설치하면, Kafka 클러스터 및 관련 리소스를 Kubernetes에서 관리할 수 있도록 여러 CRD(CustomResourceDefinitions) 를 생성합니다. 각 CRD는 Kafka 생태계의 특정 구성 요소를 나타내며, Strimzi Cluster Operator를 통해 관리됩니다. 주요 CRD와 그 기능은 다음과 같습니다

  • kafka : Kafka 클러스터의 생성, 구성, 관리를 위한 리소스를 정의합니다. 사용자는 이 리소스를 사용하여 Kafka 브로커, ZooKeeper 앙상블, Entity Operator를 포함한 전체 Kafka 클러스터를 정의하고 배포할 수 있습니다.
  • KafkaTopic : Kafka 클러스터 내의 개별 토픽을 관리하기 위해 사용됩니다. 이를 통해 토픽의 생성, 삭제, 구성 변경 등을 Kubernetes 네이티브 방식으로 수행할 수 있습니다.
  • KafkaUser : Kafka 클러스터의 사용자와 인증 정보를 관리합니다. 이 리소스를 사용하여 사용자 인증서 및 액세스 권한을 설정하고 관리할 수 있습니다.
  • KafkaConnect : Kafka Connect 클러스터를 배포하고 관리하는 CRD입니다. 데이터 소스와 싱크를 Kafka 토픽과 연결하는 커넥터를 배포 및 관리할 수 있습니다.
  • KafkaConnectors : Kafka Connect 클러스터 내에서 실행되는 개별 커넥터를 정의하고 관리합니다.
  • KafkaBridge : Kafka 클러스터에 접근할 수 있는 REST 인터페이스를 제공하는 Kafka Bridge 인스턴스를 배포 및 관리합니다. 이는 외부 애플리케이션이 Kafka 클러스터와 통신할 수 있게 해 줍니다.
  • KafkaMirrorMaker : 토픽의 데이터를 미러링 하는 데 사용되지만 현재는 사용하지 않습니다.
  • KafkaMirrorMaker2 : KafkaMirrorMaker의 더 발전된 버전으로, 더 복잡한 미러링 및 리패키징 요구사항을 지원합니다. 이를 통해 다양한 클러스터 간에 토픽, 메시지, 파티션 등을 더 효율적으로 미러링 할 수 있습니다.
  • KafkaRebalance : Kafka 클러스터 내에서 파티션의 리밸런싱을 관리하는 CRD입니다. 클러스터의 부하 분산 및 성능 최적화를 위한 자동 리밸런싱 작업을 지원합니다.
  • StrimziPodSet : 기존에는 StatefulSet을 이용하여 Kafka Broker가 관리되었지만 현재는 StatefulSet 대신 StrimziPodSet을 이용하여 관리됩니다. Kafka리소스로 생성된 Kafka Broker, Zookeeper 모두 StrimziPodSet으로 관리됩니다.
  • KafkaNodePool : 각 특성에 맞는 Kafka Broker의 특성과 Storage를 관리합니다.

참고문서

https://strimzi.io/docs/operators/latest/deploying

 

Deploying and Upgrading (0.40.0)

Configure and manage a Strimzi deployment to your precise needs using Strimzi custom resources. Strimzi provides example custom resources with each release, allowing you to configure and create instances of supported Kafka components. Fine-tune your deploy

strimzi.io

 

현재 제가 근무하는 회사는 모든 서비스를 마이크로서비스 아키텍처(MSA)로 동작하기 위해 Kubernetes 환경에서 운영하고 있습니다. 이러한 환경에서는 Horizontal Pod Autoscaler(HPA)를 사용하여, Consumer 애플리케이션이 일정 리소스를 초과하여 사용할 경우, 이를 감지하고 자동으로 스케일아웃(Scale Out)을 진행하여 최대 파티션 개수만큼 늘어날 수 있도록 설정하였으며 반대로 리소스 사용량이 감소하면, 스케일인(Scale In)을 통해 리소스 소비를 최소화하도록 조정하였습니다. 그러나, HPA로 인하여 Kafka 컨슈머의 스케일아웃(Scale Out) 또는 스케일인(Scale In)이 빈번히 발생하면, 파티션 할당(Partition Assignment) 과정에서 부하가 발생하여 Kafka의 성능 저하로 이어질 수 있습니다. 이는 파티션 할당 과정에서의 잦은 변동이 시스템에 부담을 주기 때문입니다. 이러한 문제를 해결하기 위해, 파티션 할당 전략(Partition Assignment Strategy)을 조정하는 설정 변경을 통해 이러한 부하를 최소화할 수 있습니다. 이번 문서에서는 Kafka 컨슈머 그룹에서 파티션을 효율적으로 할당하고 관리함으로써, 성능 저하를 최소화하여 최적화하는 방법에 대하여 정리하였습니다.

 

실습 방법

이번 실습에서는 kcat을 이용하여 카프카로부터 데이터를 consume하는 방법으로 진행하였습니다. kcat을 이용해서 partition.assignment.strategy를 설정하여 컨슈머 모드로 동작하는 방법은 다음과 같습니다.

(kcat 버전 1.7.0 이상부터 사용 가능한 옵션입니다.)

kcat -C -G <컨슈머그룹> -b <브로커주소> -X partition.assignment.strategy=<할당전략> 토픽명

 

docker compose 카프카 클러스터 구축방법
https://stdhsw.tistory.com/entry/Docker-compose%EB%A5%BC-%EC%9D%B4%EC%9A%A9%ED%95%9C-Kafka-Cluster-%EA%B5%AC%EC%B6%95-KRAFT-Mode

 

kcat 기본 사용법

https://stdhsw.tistory.com/entry/kcatkafkacat-%EC%82%AC%EC%9A%A9%EB%B2%95

 

partition.assignment.strategy 설정

partition.assignment.strategy 설정은 Kafka Consumer에서 파티션 할당 알고리즘을 결정하는 설정입니다. 이 설정을 통해 Consumer Group 내의 각 Consumer에게 파티션을 할당하는 방식을 정의할 수 있으며, Kafka 클러스터의 전반적인 성능과 가용성에 영향을 미칠 수 있습니다. 다음은 partition.assignment.strategy에 적용될 수 있는 전략과 각각의 특성에 대하여 알아보겠습니다.

Range Assignment Strategy

Range Assignment Strategy는 partition.assignment.strategy 설정의 기본값으로, Kafka Consumer Group에서 파티션 할당을 관리하는 데 사용되는 전략입니다. 이 전략의 핵심은 각 토픽의 파티션들을 Consumer Group 내의 Consumer 리스트에 순차적으로 할당하는 것입니다. 이 할당 방식은 특히 토픽별로 파티션을 균등하게 분배하고자 할 때 유리합니다. 각 토픽에 대해 Consumer들은 대략적으로 균등한 부하를 받게 되어, 부하 분산에 효과적입니다. 그러나 Range Assignment Strategy는 몇 가지 단점도 가지고 있습니다. 특히, Consumer Group 내에서 Consumer의 수가 변할 때 (즉, Scale out이나 Scale in이 발생할 때) 전체 파티션 할당 과정이 재실행됩니다. 이는 Consumer Group에 속한 모든 Consumer의 연결을 잠시 끊고, 파티션 할당을 새롭게 시작한다는 의미입니다. 이러한 과정은 Consumer 수에 자주 변화가 있는 환경에서 비효율적이며, 메시지 처리 지연이나 리밸런싱 동안의 처리량 감소와 같은 문제를 초래할 수 있습니다.

장점 단점
 토픽별로 파티션에 데이터가 골고루 분포되었을 경우, Consumer Group 내의 각 Consumer가 균등하게 부하를 받게 됩니다.

균등한 부하를 받기 때문에 각 컨슈머의 부하량을 예상하기 쉽습니다.
 각 토픽에 대해 파티션을 균등하게 할당하려고 시도하지만, Consumer가 여러 토픽에서 데이터를 가져오는 경우에는 특정 파티션에 부하가 집중될 위험이 있습니다.

Consumer Group에 새로운 Consumer가 추가되거나 기존 Consumer가 제거되는 등의 Scale out/in이 발생하면, Range Assignment Strategy에 따라 모든 파티션이 재할당됩니다. 이 과정에서 모든 Consumer의 연결이 잠시 끊기게 되며, 이는 리밸런싱 과정을 거치게 되어 성능 저하가 발생할 수 있습니다.

 

Range Assignment Strategy의 동작은 아래 이미지처럼 Consumer Group 내의 Consumer 수가 줄어들 경우 모든 파티션 할당은 revoked로 연결이 끊어지고 다시 Topic을 기준으로 남아있는 Consumer에 재할당을 수행합니다.

Round Robin Assignment Strategy 

Round Robin Assignment Strategy는 Kafka에서 사용할 수 있는 또 다른 파티션 할당 전략으로, Range Assignment Strategy와 달리 토픽을 기준으로 파티션을 할당하지 않습니다. 대신, 이 전략은 사용 가능한 모든 파티션을 Consumer Group 내의 모든 Consumer에게 순서대로 할당합니다, 토픽 간 경계를 무시하고 전체 파티션 목록을 기준으로 합니다.

장점 단점
Round Robin 전략은 각 Consumer에게 전체 파티션 목록을 걸쳐 균등하게 할당함으로써, 모든 Consumer가 대략적으로 동일한 양의 작업을 처리하도록 합니다.  특정 토픽이 다른 토픽보다 훨씬 더 높은 메시지 유입률을 가지고 있을 경우, Round Robin 방식은 이러한 불균형을 고려하지 않기 때문에, 특정 토픽의 파티션에서 처리 지연이 발생할 수 있습니다.

Range Assignment Strategy와 마찬가지로 Consumer Group에 새로운 Consumer가 추가되거나 기존 Consumer가 제거되는 등의 Scale out/in이 발생하면 모든 파티션이 재할당됩니다.

 

Round Robin Assignment Strategy의 동작은 아래 이미지처럼 Consumer Group 내의 Consumer 수가 줄어들 경우 모든 파티션 할당은 revoked로 연결이 끊어지고 모든 파티션을 기준으로 Consumer에 재할당을 수행합니다.

 

Sticky Assignment Strategy

Sticky Assignment Strategy는 파티션 재할당을 최소화하는 것을 목표로 하는 전략입니다. Consumer Group의 Consumer 개수의 변화가 생겼을 때 기존의 파티션 할당을 가능한 유지 하여 Consumer 리밸런싱이 발생할 때 파티션의 재할당을 최소화하여 성능을 향상할 수 있는 전략입니다.

장점 단점
Sticky Assignment Strategy는 Consumer에 한 번 할당된 파티션을 가능한 한 그대로 유지하여 Consumer 개수의 변화가 발생하여도 할당된 파티션은 재할당을 수행하지 않습니다.

이미 할당된 파티션에 대해서는 Consumer 리밸런싱을 수행하지 않아 자주 Scale In/Out이 발생하는 환경에서 사용하기 적합합니다.
Sticky Assignment Strategy는 기존의 파티션 할당을 가능한 한 유지하려고 합니다. 이로 인해 Consumer Group 내에서 파티션을 새로운 Consumer에게 더 효율적으로 재분배하는 것이 어려워질 수 있습니다.

 

Sticky Assignment Strategy의 동작은 아래 이미지처럼 Consumer Group 내의 Consumer 수가 줄어들 경우 현재 동작중인 Consumer에 할당된 파티션은 유지하고 끊어진 Consumer에 대하여 재할당을 수행합니다. Sticky Assignment Strategy는 Consumer의 개수가 줄어드는 상황에서는 큰 문제가 발생하지 않으나 반대로 Consumer의 개수가 늘어나거나 잠시 중단되었던 Consumer가 restart를 하여 참여할 경우 해당 컨슈머에는 파티션할당이 불균형할 수 있습니다.

Cooperative Sticky Assignment Strategy 

Cooperative Sticky Assignment Strategy는 기존의 Sticky Assignment Strategy를 기반으로 하면서 몇 가지 중요한 개선 사항을 도입한 것입니다. 이 전략은 기존 Sticky 전략의 단점을 보완하며, 특히 Consumer Group 내의 Consumer 구성이 변할 때 발생하는 리밸런싱 프로세스의 효율성을 개선합니다. Cooperative Sticky 전략은 파티션 할당 변경이 필요할 때 전체 Consumer Group에 대한 리밸런싱을 수행하는 대신, 필요한 최소한의 변경만을 수행합니다. 이는 리밸런싱 프로세스가 더 빠르고 효율적으로 완료될 수 있도록 하여 전체 시스템의 성능에 미치는 영향을 최소화합니다.

장점 단점
Sticky 전략은 재할당의 불균형이 발생할 수 있는 반면에 Cooperative Sticky Assignment Strategy는 이러한 단점을 보완하여 Consumer 리밸런싱을 최소화하면서 균형있는 재할당이 이루어집니다.

Consumer의 Scale in/out이 자주 발생하는 환경에서 사용하기 적합니다.
Cooperative Sticky는 Consumer assignment 작업을 효율적으로 수행하지만 리밸런싱 작업이 다른 Strategy보다 오래 걸리는 단점이 있습니다.

 

Kubernetes 환경에서 Consumer Pod를 Horizontal Pod Autoscaler (HPA)와 함께 구성하여 Pod의 Scale in/out이 발생하는 환경에서는 Cooperative Sticky Assignment Strategy을 선택하는 것이 현명한 선택입니다. Cooperative Sticky는 티션 할당을 최소화하면서 점진적인 리밸런싱을 지원하기 때문에, Pod의 확장 및 축소가 발생해도 Kafka Consumer Group의 전체적인 성능 저하를 줄일 수 있습니다.

마지막으로

Kafka를 사용하는 경험이 쌓이면서 느끼는 것은 모든 상황을 만족하는 설정 값은 없다는 것을 많이 느끼게 되는데요. 현재 자신이 사용하고 있는 데이터의 특성과 Producer, Consumer의 상황을 잘 파악하고 가장 적절한 설정 값을 적용하는 것이 가장 중요하다고 생각합니다.

 

kcat(이전 명칭: kafkacat)은 Kafka를 테스트하고 디버깅하는 데 매우 유용한 명령줄 도구입니다. Kafka 클러스터, 토픽, 파티션 등에 대한 메타데이터 조회 기능을 제공하며, Producer 및 Consumer 모드에서 각각 메시지를 전송하고 수신하는 기능을 가지고 있습니다. 

Producer 모드에서 kcat은 표준 입력 또는 파일로부터 메시지를 읽어 지정된 Kafka 클러스터에 토픽 파티션으로 전송할 수 있습니다. Consumer 모드에서는 Kafka 토픽 파티션으로부터 메시지를 읽어 표준 출력으로 내보내며, 이를 통해 메시지 스트림의 내용을 실시간으로 모니터링하고 분석할 수 있습니다.

 

kcat은 kafka-console-consumer.sh와 kafka-console-producer.sh와 같은 Kafka의 기본 제공 명령줄 도구들과 유사해 보일 수 있지만, 더 다양하고 고급 기능을 제공합니다. 이 문서에서는 kcat의 설치 방법부터 기본적인 사용법에 대하여 다루었습니다.

kcat 설치 방법

kcat을 사용하기 전에, 시스템에 설치되어 있지 않다면 먼저 설치 과정을 거쳐야 합니다. 여기서는 macOS와 Ubuntu 시스템에서 kcat을 설치하는 방법에 대하여 작성하였습니다.

 

macOS 설치

macOS에서 kcat을 설치하는 가장 간단한 방법은 Homebrew를 사용합니다. Homebrew는 macOS 용 패키지 관리자입니다. Homebrew의 명령어를 이용하여 터미널에서 kcat을 설치할 수 있습니다

> brew update
> brew install kcat

 

Ubuntu 설치

Ubuntu에서 kcat을 설치하는 과정은 apt 패키지 관리자를 통해 간단하게 수행할 수 있습니다.

> sudo apt-get update
> sudo apt-get -y install kafkacat

 

설치가 완료되었다면 다음과 같이 명령어를 통하여 정상적으로 잘 설치가 되었는지 확인할 수 있습니다.

> kcat -V

Create Topic

Kcat은 Kafka 클러스터와 상호작용하는 데 매우 유용한 도구이지만, 아쉽게도 직접적으로 토픽을 생성하는 기능을 제공하지 않습니다. Kafka 토픽 생성은 일반적으로 Kafka의 내장 명령줄 도구인 kafka-topics.sh를 사용하여 수행됩니다. 토픽을 생성할 때는 토픽의 이름, 파티션 수, 복제 계수(replication factor)와 같은 중요한 설정을 지정해야 합니다. 예를 들어, my-topic이라는 이름의 토픽을 파티션 3개와 복제 계수 2로 설정하여 생성하기 위해서는 다음과 같은 명령어를 사용할 수 있습니다.

 

토픽생성 명령어

# kafka-topics.sh --create --bootstrap-server <브로커 주소> --replication-factor <복제본개수> --partitions <파티션개수> --topic <토픽명>
> kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 2 --partitions 3 --topic my-topic

이 명령어는 Kafka 클러스터의 브로커 주소를 --bootstrap-server 옵션을 통해 지정하고, --replication-factor와 --partitions 옵션으로 각각 복제본 수와 파티션 수를 설정하여 my-topic 토픽을 생성합니다.

메타데이터 출력

Kcat을 활용하여 Kafka 클러스터의 메타데이터를 조회가 가능하며 사용 가능한 토픽 목록과 같은 정보를 얻을 수 있습니다.

  • '-L' 옵션을 사용하여 Kafka 클러스터의 메타데이터를 출력할 수 있습니다. 이 옵션은 클러스터에 대한 정보, 예를 들어 사용 가능한 토픽, 각 토픽의 파티션 정보, 리플리케이션 상태 등을 포함한 전반적인 상태 보고서를 제공합니다.
  • Kafka 클러스터와의 연결을 설정하기 위해서는 '-b' 옵션을 통해 브로커 주소를 입력합니다.
# kcat -L -b 브로커주소
> kcat -L -b localhost:10000

Producer 사용법

kcat을 이용한 Kafka Producer의 기본 사용법에 대해 설명하겠습니다. kcat의 Producer 모드를 사용하면, 사용자는 Kafka 토픽으로 메시지를 쉽게 전송할 수 있습니다. 이 모드를 활성화하려면, 다음 단계를 따르면 됩니다.

 

기본 방식의 메시지 전송

먼저 '-P' 옵션을 사용하여 kcat을 Producer 모드로 설정합니다. 이 모드는 Kafka 토픽에 메시지를 전송하기 위한 것입니다. '-b' 옵션으로 Kafka 브로커의 주소를 지정합니다. 브로커는 Kafka 시스템에서 메시지를 받아 처리하는 서버를 말합니다. '-t' 옵션을 사용해 데이터를 전송할 토픽을 지정합니다.

데이터 입력이 완료되면, Ctrl+D를 눌러 데이터 전송을 완료하고 kcat 명령을 종료합니다. 이는 표준 입력의 끝을 나타내며, kcat이 메시지 전송을 마무리하고 종료하도록 합니다.

kcat -P -b localhost:10000 -t my-topic
aaa
bbb
ccc

 

Key 값을 이용한 Partitioner 메시지 전송

추가적으로 kcat의 '-K:' 옵션을 사용하면 제공된 키를 사용하여 메시지를 특정 파티션으로 라우팅합니다. 이 기능은 Kafka의 키 기반 메시지 분할(partitioning) 메커니즘을 활용하는 것으로, 같은 키를 가진 메시지들이 항상 같은 파티션에 할당되도록 보장합니다. 이는 메시지의 순서를 유지하거나, 관련 메시지를 동일한 파티션에 그룹화하는 데 유용합니다.

'-K:' 옵션을 사용할 때는 메시지를 "키:값" 형식으로 작성하여 전송해야 합니다. 이때, 키와 값 사이에는 콜론(':')을 사용하여 구분합니다. 예를 들어, '1:Hello World'와 같은 형식으로 메시지를 작성하면, '1'이 메시지의 키가 되고, 'Hello World'가 메시지의 값이 됩니다. 메시지를 전송할 때 해당 키가 항상 동일한 파티션에 할당됩니다. 이는 데이터의 일관성과 순서를 유지할 수 있습니다.

kcat -P -b localhost:10000 -t my-topic -K:

 

파일 형식의 메시지 전송

위와 같은 방법으로도 Kafka에 메시지를 전송하는 과정은 매우 간단하고 직관적이지만, 수동으로 반복하여 메시지를 전송하거나 대량의 데이터를 한 번에 전송하는 경우 불편함을 겪을 수 있습니다. 이러한 작업을 효율적으로 처리하기 위해 kcat은 파일을 통해 미리 입력된 데이터를 전송하는 기능을 제공합니다.

kcat -P -b localhost:10000 -t my-topic -l data.txt

Consumer 사용법

기본 방식의 메시지 수신

kcat에서 '-C' 옵션은 Consumer 모드를 활성화하여 지정된 Kafka 토픽의 데이터를 가져오고, 이를 표준출력으로 출력하는 기능을 제공합니다. 이 옵션을 사용하면, Kafka 토픽에 저장된 메시지를 실시간으로 읽어 볼 수 있습니다. 여기서 -b 옵션은 Kafka 브로커의 주소를 지정하며, -t 옵션은 데이터를 가져올 토픽의 이름을 지정합니다. 이 명령을 실행하면, kcat은 my-topic 토픽으로부터 메시지를 읽어 표준 출력으로 내보냅니다. 이 과정은 Ctrl+C를 눌러 중단할 수 있습니다.

kcat -C -b localhost:10000 -t my-topic

 

Consumer Group 사용법

kcat에서 '-G' 옵션은 Consumer Group 모드를 활성화하여, 여러 개의 Consumer가 속한 Group을 지정하고, Kafka 토픽의 데이터를 여러 Consumer에 분산해서 처리할 수 있게 합니다. 이 기능은 Kafka의 Consumer Group 메커니즘을 활용하는 것으로, 메시지의 부하를 여러 Consumer 사이에 균등하게 분산시켜 처리할 수 있게 해주며, 데이터 처리의 확장성과 안정성을 높일 수 있습니다.

'-G'옵션을 적용했다면 위와는 다르게 '-t' 옵션을 제거해야 합니다.

kcat -C -G my-group -b localhost:10000 my-topic

Kafka에서 같은 Consumer Group에 속한 여러 Consumer를 사용할 경우, Consumer 리밸런싱이라는 과정을 통해 각 Consumer에게 특정 파티션들이 자동으로 할당되어 지정된 파티션으로 부터 데이터를 수신할 수 있습니다.

 

Consumer 출력 포맷 설정

kcat을 사용할 때, Consumer로 출력되는 데이터 포맷을 사용자가 지정할 수 있어, 출력 데이터를 더욱 보기 쉽고, 분석하기 용이하게 만들 수 있습니다. 이러한 출력 포맷은 kcat의 다양한 옵션을 통해 커스터마이징될 수 있으며, 메시지의 키, 값, 타임스탬프, 파티션 정보 등 다양한 메타데이터를 포함할 수 있습니다. 출력 포맷 설정에 대한 자세한 옵션은 kcat -h 명령어를 실행함으로써 확인할 수 있습니다.

kcat -C  -b localhost:10000 -t my-topic -f 'Topic[%t] Partition[%p] Offset[%o] %k:%s\n'

 

참조

https://docs.confluent.io/platform/current/tools/kafkacat-usage.html

 

Use kcat (formerly kafkacat) to test and debug Apache Kafka deployments | Confluent Documentation

kcat (formerly kafkacat) Utility for Confluent Platform kcat (formerly kafkacat) is a command-line utility that you can use to test and debug Apache Kafka® deployments. You can use kcat to produce, consume, and list topic and partition information for Kaf

docs.confluent.io

 

개요

현재 회사는 Strimzi를 활용하여 Kubernetes 환경에서 Kafka Cluster를 성공적으로 구축하여 상용 서버 및 개발 서버에서 운영하고 있습니다. 그러나 팀원 다 같이 사용하는 실제 운영 환경에서 카프카 설정을 변경하는 테스트나 다양한 실험을 진행하기에는 제약이 따르게 됩니다. 공유된 리소스 상에서의 실험은 다른 팀원들에게 영향을 줄 수 있으므로, 실제 작업 환경에서의 실험은 신중히 이루어져야 합니다. 이러한 상황에서는 개인 Kafka Cluster를 구축하여 필요한 테스트나 실험을 자유롭게 진행할 수 있는 환경이 필요하다는 것을 느끼게 되는데요. 이번 문서에서는 바로 그러한 필요성으로 인하여 Docker Compose를 사용하여 개인 Kafka Cluster를 구축하는 방법에 대해서 간단하게 정리하였습니다.
컨테이너 기술을 활용하여 Kafka Cluster를 구축한 경우 관리 및 유지보수의 효율성이  향상됩니다. 특히 컨테이너를 사용하면 Kafka Cluster의 삭제 및 재생성 작업을 매우 간단하고 빠르게 수행할 수 있으며, 컨테이너화된 환경이 제공하는 격리와 독립성 덕분에 컨테이너에서 발생하는 변경사항이 다른 컨테이너에 영향을 주지 않습니다. 따라서, Kafka Cluster를 구성하는 개별 컨테이너들을 쉽게 종료하고, 필요에 따라 새로운 설정으로 다시 시작할 수 있습니다. 이러한 유연성은 개발과 테스트 과정에서 특히 유용하며, 다양한 설정과 구성을 실험하고 최적화하는데 아주 편리합니다. (이번 실습에서는 Kafka Cluster를 구축하는 것을 목표로 하고 있기 때문에 docker-compose 및 docker 사용법에 대해서는 자세히 다루지 않습니다.)

 

이번 실습 목표

이번 실습의 목표는 개인 컴퓨터에 docker-compose를 이용하여 개인 전용 개발환경 kraft mode인 Kafka Cluster를 구축하는 것이기 때문에 최소한의 자원으로 Kafka Cluster를 구축하는 것을 목표로 하고 있습니다. (kraft mode는 zookeeper를 제거한 kafka cluster 입니다.)

  • kafka broker : 3대
  • kafka ui : 1대

docker-compose.yml 파일 작성

작업하는 위치에 다음과 같이 docker-compose.yml 파일을 생성합니다.

networks:
  kafka_network:

volumes:
  Kafka00:
  Kafka01:
  Kafka02:

services:
  ### Kafka00
  kafka00:
    image: bitnami/kafka:3.7.0
    restart: unless-stopped
    container_name: kafka00
    ports:
      - '10000:9094'
    environment:
      - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
      # KRaft settings
      - KAFKA_CFG_BROKER_ID=0
      - KAFKA_CFG_NODE_ID=0
      - KAFKA_KRAFT_CLUSTER_ID=HsDBs9l6UUmQq7Y5E6bNlw
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka00:9093,1@kafka01:9093,2@kafka02:9093
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      # Listeners
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka00:9092,EXTERNAL://127.0.0.1:10000
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
      # Clustering
      - KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3
      - KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=3
      - KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=2
    networks:
      - kafka_network
    volumes:
      - "Kafka00:/bitnami/kafka"
  ### Kafka01
  kafka01:
    image: bitnami/kafka:3.7.0
    restart: unless-stopped
    container_name: kafka01
    ports:
      - '10001:9094'
    environment:
      - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
      # KRaft settings
      - KAFKA_CFG_BROKER_ID=1
      - KAFKA_CFG_NODE_ID=1
      - KAFKA_KRAFT_CLUSTER_ID=HsDBs9l6UUmQq7Y5E6bNlw
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka00:9093,1@kafka01:9093,2@kafka02:9093
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      # Listeners
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka01:9092,EXTERNAL://127.0.0.1:10001
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
      # Clustering
      - KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3
      - KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=3
      - KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=2
    networks:
      - kafka_network
    volumes:
      - "Kafka01:/bitnami/kafka"
  ## Kafka02
  kafka02:
    image: bitnami/kafka:3.7.0
    restart: unless-stopped
    container_name: kafka02
    ports:
      - '10002:9094'
    environment:
      - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
      # KRaft settings
      - KAFKA_CFG_BROKER_ID=2
      - KAFKA_CFG_NODE_ID=2
      - KAFKA_KRAFT_CLUSTER_ID=HsDBs9l6UUmQq7Y5E6bNlw
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka00:9093,1@kafka01:9093,2@kafka02:9093
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      # Listeners
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka02:9092,EXTERNAL://127.0.0.1:10002
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
      # Clustering
      - KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3
      - KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=3
      - KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=2
    networks:
      - kafka_network
    volumes:
      - "Kafka02:/bitnami/kafka"

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    restart: unless-stopped
    container_name: kafka-ui
    ports:
      - '8080:8080'
    environment:
      - KAFKA_CLUSTERS_0_NAME=Local-Kraft-Cluster
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka00:9092,kafka01:9092,kafka02:9092
      - DYNAMIC_CONFIG_ENABLED=true
      - KAFKA_CLUSTERS_0_AUDIT_TOPICAUDITENABLED=true
      - KAFKA_CLUSTERS_0_AUDIT_CONSOLEAUDITENABLED=true
      #- KAFKA_CLUSTERS_0_METRICS_PORT=9999
    depends_on:
      - kafka00
      - kafka01
      - kafka02
    networks:
      - kafka_network

컨테이너 실행하기

docker-compose.yml 파일을 잘 작성하였다면 해당 디렉토리에서 다음 명령어를 통하여 컨테이너를 실행합니다.

docker-compose up -d

정상 동작 확인

이번 실습에서는 클러스터 및 토픽 현황을 파악하기 쉽게 하기 위해 kafka-ui 컨테이너를 같이 동작하게 되어있습니다. kafka-ui를 통해 현재 카프카 클러스터가 정상적으로 동작하는지 브라우저에서 확인할 수 있습니다.
http://localhost:8080/ui/clusters/local/brokers

 

참조

https://github.com/ArminShoeibi/KafkaDockerCompose/blob/main/docker-compose-cluster.yml

 

'Kafka > kafka 기본' 카테고리의 다른 글

kcat(kafkacat) 사용법  (2) 2024.03.05
Docker compose를 이용한 Kafka Cluster 구축 (with Zookeeper)  (0) 2024.03.03

개요

현재 회사는 Strimzi를 활용하여 Kubernetes 환경에서 Kafka Cluster를 성공적으로 구축하여 상용 서버 및 개발 서버에서 운영하고 있습니다. 그러나 팀원 다 같이 사용하는 실제 운영 환경에서 카프카 설정을 변경하는 테스트나 다양한 실험을 진행하기에는 제약이 따르게 됩니다. 공유된 리소스 상에서의 실험은 다른 팀원들에게 영향을 줄 수 있으므로, 실제 작업 환경에서의 실험은 신중히 이루어져야 합니다. 이러한 상황에서는 개인 Kafka Cluster를 구축하여 필요한 테스트나 실험을 자유롭게 진행할 수 있는 환경이 필요하다는 것을 느끼게 되는데요. 이번 문서에서는 바로 그러한 필요성으로 인하여 Docker Compose를 사용하여 개인 Kafka Cluster를 구축하는 방법에 대해서 간단하게 정리하였습니다.

컨테이너 기술을 활용하여 Kafka Cluster를 구축한 경우 관리 및 유지보수의 효율성이  향상됩니다. 특히 컨테이너를 사용하면 Kafka Cluster의 삭제 및 재생성 작업을 매우 간단하고 빠르게 수행할 수 있으며, 컨테이너화된 환경이 제공하는 격리와 독립성 덕분에 컨테이너에서 발생하는 변경사항이 다른 컨테이너에 영향을 주지 않습니다. 따라서, Kafka Cluster를 구성하는 개별 컨테이너들을 쉽게 종료하고, 필요에 따라 새로운 설정으로 다시 시작할 수 있습니다. 이러한 유연성은 개발과 테스트 과정에서 특히 유용하며, 다양한 설정과 구성을 실험하고 최적화하는데 아주 편리합니다. (이번 실습에서는 Kafka Cluster를 구축하는 것을 목표로 하고 있기 때문에 docker-compose 및 docker 사용법에 대해서는 자세히 다루지 않습니다.)

이번 실습 목표

이번 실습의 목표는 개인 컴퓨터에 docker-compose를 이용하여 Kafka Cluster를 구축하는 것이기 때문에 최소한의 자원으로 Kafka Cluster를 구축하는 것을 목표로 하고 있습니다. 추가적인 설정은 zookeeper documentation, kafka documentation을 통해 설정하는 것을 권장합니다.

  • zookeeper : 1대
  • kafka broker : 3대
  • kafka ui : 1대

docker-compose.yml 파일 작성

작업하는 위치에 다음과 같이 docker-compose.yml 파일을 생성합니다.

version: '3'

services:
  zookeeper1:
    image: "bitnami/zookeeper:3.7.2"
    container_name: zookeeper1
    ports:
      - "2181:2181"
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
    volumes:
      - "~/zookeeper/1:/bitnami/zookeeper"

  kafka1:
    image: "bitnami/kafka:3.6.0"
    container_name: kafka1
    ports:
      - "9092:9092"
    environment:
      - KAFKA_BROKER_ID=1
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper1:2181
    depends_on:
      - zookeeper1
    volumes:
      - ~/kafka/1:/bitnami/kafka

  kafka2:
    image: "bitnami/kafka:3.6.0"
    container_name: kafka2
    ports:
      - "9093:9092"
    environment:
      - KAFKA_BROKER_ID=2
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper1:2181
    depends_on:
      - zookeeper1
    volumes:
      - ~/kafka/2:/bitnami/kafka

  kafka3:
    image: "bitnami/kafka:3.6.0"
    container_name: kafka3
    ports:
      - "9094:9092"
    environment:
      - KAFKA_BROKER_ID=3
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper1:2181
    depends_on:
      - zookeeper1
    volumes:
      - ~/kafka/3:/bitnami/kafka

  kafka-ui:
    image: 'provectuslabs/kafka-ui:latest'
    container_name: kafka-ui
    ports:
      - "8080:8080"
    environment:
      - KAFKA_CLUSTERS_0_NAME=local
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka1:9092,kafka2:9093,kafka3:9094
      - KAFKA_CLUSTERS_0_ZOOKEEPER=zookeeper1:2181
    depends_on:
      - zookeeper1
      - kafka1
      - kafka2
      - kafka3

이번 실습은 개인 개발 환경 카프카 클러스터를 구축하는 것이 목적이기 때문에 volume을 로컬 호스트 경로를 마운트하여 사용하였습니다. 이러한 로컬 마운트를 통해 쉽게 카프카 데이터의 변경 및 로그에 대한 내용을 로컬에서 직접 확인하기 위한 설정입니다. 만약 개인 테스트 용도가 아니라면 별도의 volume을 구성하여 사용하는 것을 권장합니다.

 

zookeeper 환경변수

  • ALLOW_ANONYMOUS_LOGIN=yes 익명 사용자의 로그인을 허용합니다.

kafka 환경변수

  • KAFKA_BROKER_ID 각 브로커의 아이디를 부여합니다.
  • ALLOW_PLAINTEXT_LISTENER=yes 개발 환경에서 사용하도록 보안 리스너 설정이 없이 동작합니다.
  • KAFKA_ZOOKEEPER_CONNECT 카프카가 주키퍼에 연결할 수 있도록 주키퍼 연결 정보를 입력합니다.

컨테이너 실행하기

docker-compose.yml 파일을 잘 작성하였다면 해당 디렉토리에서 다음 명령어를 통하여 컨테이너를 실행합니다.

docker-compose up -d

정상 동작 확인

이번 실습에서는 클러스터 및 토픽 현황을 파악하기 쉽게 하기 위해 kafka-ui 컨테이너를 같이 동작하게 되어있습니다. kafka-ui를 통해 현재 카프카 클러스터가 정상적으로 동작하는지 브라우저에서 확인할 수 있습니다.

http://localhost:8080/ui/clusters/local/brokers

컨테이너 종료

docker-compose down -v

'Kafka > kafka 기본' 카테고리의 다른 글

kcat(kafkacat) 사용법  (2) 2024.03.05
Docker compose를 이용한 Kafka Cluster 구축 (KRAFT Mode)  (0) 2024.03.05

Service Discovery

Service Discovery는 동적으로 변화하는 환경에서 수집 대상을 자동으로 찾아내고 관리하는 기술로, 사용자가 수동으로 각각의 서비스를 추적하고 업데이트하는 번거로움을 해소해 줍니다. 이 기술은 특히 클라우드 환경에서 매우 유용하며, AWS, Azure, GCP와 같은 주요 Public Cloud 제공 업체들과 Kubernetes, 호스트, HTTP 서비스 등 다양한 환경에서의 Service Discovery를 지원합니다. Prometheus는 이러한 동적 환경에 탁월하게 대응할 수 있도록 Service Discovery 기능을 제공하고 있습니다. 본 문서에서는 Prometheus의 구성 파일을 통해 Service Discovery를 어떻게 정의하는지, 그리고 Prometheus가 이 설정을 바탕으로 어떻게 자동으로 수집 대상을 식별하고 선정하는지에 대한 방법에 대하여 기록하였습니다. 이를 통해 사용자는 Prometheus를 사용하여 보다 효율적으로 자원을 모니터링 할 수 있는 방법을 이해하는데 많은 도움이 되었으면 합니다.

 

Kubernetes Service Discovery

동적인 환경에서 서비스들이 어떻게 변화하는지를 실시간으로 파악하고 관리하는 것은 매우 복잡하고 어려운 일입니다. 이러한 문제를 해결하기 위해 Prometheus에서는 다양한 Service Discovery 메커니즘을 제공하고 있으며, 그 중에서도 특히 Kubernetes 환경에 최적화된 Kubernetes Service Discovery를 예시로 들어 Prometheus의 Service Discovery 작동 원리를 상세하게 설명해 보고자 합니다. 이 예제를 통해 설명드리는 것은 Kubernetes 환경에서의 Service Discovery 방식이지만, 이와 유사한 원리로 다양한 환경에서 Prometheus가 어떻게 서비스를 자동으로 감지하고 모니터링 대상으로 추가하는지에 대한 이해를 돕기 위함입니다. 따라서 이 예제를 통해 Kubernetes Service Discovery의 구체적인 작동 방식을 파악하시면, Prometheus가 지원하는 다른 환경들에서의 Service Discovery 작동 방식도 보다 쉽게 이해하고 적용하실 수 있습니다. 이번 문서의 내용은 Prometheus의 설치 과정이나 초기 설정 방법 등 기초적인 내용보다는, Prometheus가 어떻게 동적인 서비스 환경에서 모니터링 대상을 자동으로 발견하고 관리하는지에 대한 동작 원리와 방식에 초점을 맞추어 정리하였습니다.

 

Service Discovery 설정 파일 작성

Prometheus에서 scrape_configs 설정을 통해 Kubernetes 상의 특정 노드에서 실행 중인 exporter의 메트릭을 수집하는 과정은 매우 세밀하고 구체적인 접근이 필요합니다. 이러한 설정에서 scrape_configs의 선언 순서는 타겟을 효과적으로 식별하고 분류하는 데 있어 핵심적인 역할을 합니다. Prometheus는 이 설정들을 선언된 순서대로 처리하게 되므로, 가능한 한 빠르게 대상을 좁혀나갈 수 있도록 최적화된 조건을 선두에 배치하는 것이 중요합니다. 이렇게 함으로써, Prometheus는 불필요한 탐색을 줄이고, 보다 효율적으로 메트릭을 수집할 수 있는 대상을 신속하게 파악할 수 있습니다. (참고 : Kubernetes Service Discovery는 node, pod, service, endpoints, endpointslice의 리소스를 통하여 수집 대상을 정의할 수 있습니다.)

 

이 예제는 Prometheus의 구성에서 특히 Kubernetes 환경 내의 특정 노드에서 실행되고 있는 nginx-exporter라는 이름의 워크로드로부터 메트릭을 수집하는 방법니다. 이 과정에서 kubernetes_sd_configs 설정에서 role을 pod로 지정함으로써, Prometheus가 Kubernetes의 파드 단위에서 동작하는 서비스를 대상으로 메트릭 수집을 수행하도록 설정하였습니다.

  1. 해당 파드가 worker1이라는 노드에 동작한다면 Keep 합니다.
  2. worker1에 동작하는 파드의 워크로드가 nginx-exporter라면 Keep 합니다.
  3. 파드의 9113이라는 Container port가 있으면 Keep 합니다.

이러한 단계별 접근 방식은 Prometheus가 Kubernetes 환경 내에서 메트릭을 수집할 때 필요한 정확성과 효율성을 보장합니다. 이 과정을 통해 Prometheus는 불필요한 데이터 수집을 최소화하고, 관련된 메트릭만을 효과적으로 수집하며, 시스템의 부하를 줄이는 동시에 모니터링의 정확성을 높일 수 있습니다.

scrape_configs:
  - job_name: 'my-metrics'
    kubernetes_sd_configs:
      - role: pod  # 파드 레벨에서 메트릭 수집

    relabel_configs:
      # 동작중인 노드이름이 'worker1'인 파드만 수집 설정
      - source_labels: [__meta_kubernetes_pod_node_name]
        action: keep
        regex: worker1

      # 워크로드 이름이 'nginx-exporter'인 파드만 수집 설정
      - source_labels: [__meta_kubernetes_pod_label_app]
        action: keep
        regex: nginx-exporter

      # 컨테이너 포트가 9113 파드만 수집
      - source_labels: [__meta_kubernetes_pod_container_port_number]
        action: keep
        regex: '(.*)9113'

 

Service Discovery 동작 방식

Prometheus의 Service Discovery 메커니즘은 크게 Discovery Manager와 Scrape Manager 두 가지 핵심 구성 요소의 동작에 의해 구현됩니다. 이 Manager는 Prometheus가 동적 환경에서 서비스를 발견하고, 해당 서비스로부터 메트릭을 수집하는 작업을 수행합니다.

 

Discovery Manager 동작

Discovery Manager는 사용자가 정의한 Service Discovery 설정에 따라 각각의 서비스 발견 메커니즘을 실행하고, 이러한 메커니즘을 통해 발견된 리소스의 변화를 지속적으로 감지합니다. 이렇게 발견된 리소스에 필요한 메타데이터를 Label로 할당하는 작업을 수행합니다. 이 메타데이터는 주로 리소스를 식별하고 분류하는 데 사용되는 라벨 형태로 제공되며, Prometheus가 메트릭을 수집할 때 리소스를 구분하고 필터링하는 데 필수적입니다. 예를 들어, Kubernetes 환경에서 Discovery Manager는 각 파드, 서비스, 노드 등의 Kubernetes 리소스에 대한 정보를 수집하고, 이러한 리소스에 적절한 라벨을 할당하여 Scrape Manager가 이들을 구분할 수 있도록 합니다. Discovery Manager의 이름만 봤을 때는 위에서 선언한 scrape_configs를 통하여 필터링까지 수행하는 것 같지만 실제로 Discovery Manager에서는 필터링을 수행하지 않고 변경된 리소스만 감지하는 역할만 수행합니다.

 

소스 코드 위치

https://github.com/prometheus/prometheus/tree/main/discovery

 

Scrape Manager 동작

Discovery Manager가 발견한 리소스 정보는 일종의 중간 저장소 또는 메시지 큐인 Channel을 통해 Scrape Manager에 전달됩니다. Scrape Manager는 scrape_configs에 정의된 설정을 기반으로 실제 메트릭 수집 대상을 선별하고 필터링하는 중요한 작업을 수행합니다. scrape_configs 설정에는 수집할 대상의 종류, 주소, 포트, 경로 등이 명시되어 있으며, Scrape Manager는 이 정보를 사용하여 Prometheus가 어떤 서비스로부터 메트릭을 수집할지를 결정합니다. 또한, 이 과정에서 특정 조건에 맞는 리소스에 대해서만 메트릭을 수집하도록 필터링하여 Prometheus가 설정된 간격에 따라 각 수집 대상으로부터 메트릭을 수집합니다.

 

소스 코드 위치

https://github.com/prometheus/prometheus/tree/main/scrape

대상을 필터링하는 소스코드 위치

https://github.com/prometheus/prometheus/blob/main/model/relabel/relabel.go

 

Service Discovery 동작 순서

위에서 선언한 scrape_configs를 기반으로 동작하는 방식은 다음과 같습니다.

Discovery Manager는 kubernetes_sd_configs 설정을 통해 Kubernetes 환경과의 통신을 위한 Kubernetes 클라이언트를 초기화합니다. 이 클라이언트는 Kubernetes API Server에 Request를 통하여, 설정된 role에 따라 특정 Kubernetes 리소스(예를 들어, Pod)의 변화를 실시간으로 감지할 수 있는 Watcher(Informer)를 생성합니다. 이러한 방식으로 Prometheus는 Kubernetes 클러스터 내의 Pod 리소스의 생성, 업데이트, 삭제 등의 이벤트를 지속적으로 모니터링할 수 있게 됩니다.

 

Discovery Manager가 Kubernetes API 서버로부터 받아온 Pod 정보에는 다양한 메타데이터가 포함되어 있으며, 이 중에서 특히 중요한 정보는 라벨 형태로 Prometheus에 전달됩니다. 예를 들어, __meta_kubernetes_namespace, __meta_kubernetes_pod_name, __meta_kubernetes_pod_container_name과 같은 라벨은 각 Pod의 메타데이터의 값을 Label에 추가합니다. 이러한 Label들은 메트릭 데이터에 포함되어 Prometheus 사용자가 보다 상세한 쿼리와 분석을 수행할 수 있도록 합니다.

 

이렇게 Discovery Manager에 의해 수집되고 라벨이 할당된 리소스 정보는 Channel을 통해 Scrape Manager로 전달됩니다. Scrape Manager는 이 정보를 받아 relabel_configs 설정에 정의된 규칙을 적용하여 실제 메트릭을 수집할 대상을 최종적으로 결정합니다. relabel_configs에서는 수집할 대상의 라벨을 재정의하거나, 특정 조건에 부합하는 리소스만을 필터링하여 수집 대상으로 선정하는 작업을 수행합니다. 이 과정을 통해 Prometheus는 사용자가 정의한 규칙에 따라 필요한 리소스로부터만 메트릭을 수집하게 되며, 이는 Prometheus의 수집 효율성을 극대화하고 불필요한 데이터 수집을 최소화하는 데 중요한 역할을 합니다.

 

마지막으로, Scrape Manager는 scrape_configs에 정의된 간격에 따라, 선별된 대상으로부터 메트릭을 수집합니다. 이때 각 메트릭에는 Discovery Manager로부터 할당받은 라벨 정보가 포함되어 있으며, relabel_configs에 정의된 순서대로 수집할 대상과 수집하지 않을 대상을 분리 또는 라벨 재정의하는 작업을 수행합니다. 이렇게 분리된 수집 대상에게 사용자가 정의한 일정 간격으로 메트릭을 수집합니다.

 

 

참조

https://prometheus.io/docs/prometheus/latest/configuration/configuration/

 

Configuration | Prometheus

An open-source monitoring system with a dimensional data model, flexible query language, efficient time series database and modern alerting approach.

prometheus.io

 

 

'Prometheus' 카테고리의 다른 글

Prometheus exporter의 구조와 개인 exporter 만들기  (0) 2023.11.04
Prometheus 윈도우10에 설치  (0) 2021.06.13

Elasticsearch Master node scale in

Elasticsearch를 관리 운영을 하다 보면 안정성에 너무 신경 쓴 나머지 실제 부하보다 너무 많은 마스터 노드를 구성하게 되어 나중에 마스터 노드를 불 필요하게 많이 설정을 했다는 것을 깨닫게 되는 순간이 올 수 있습니다. 물론 필요한 만큼 마스터 노드를 구축했지만 회사의 자금 사정으로 노드의 개수를 줄여야 되는 순간도 발생할 수 있고요 (현재 저의 상황입니다.) ...  이럴 때 아무 생각 없이 마스터 노드의 개수를 줄이면 다음과 같은 에러 master not discovered or elected yet, an election requires at least 3 nodes with ids from 를 출력하면서 Elasticsearch의 클러스터에 문제가 발생할 수 있는데요. 이번 문서에서는 뜻하지 않게 마스터 노드의 개수를 줄여야 될 때 해당 에러를 해결하여 마스터 노드의 개수를 줄이는 방법에 대하여 정리하였습니다.

 

이번 실습의 목표!!

이번 테스트의 목적은 극단적으로 테스트를 하기 위해 마스터 노드 5대로 구성된 클러스터를 마스터 노드 1대로 줄이는 것을 목표로 하였습니다.

 

이런 에러가 발생하는 이유는 무엇일까?

Elasticsearch 공식 문서를 참고하면 클러스터의 마스터 노드가 한 번에 절반이 넘는 개수의 마스터 노드가 드랍되면 Elasticsearch의 클러스터를 운영할 수 없게 됩니다. 예를 들어 7개인 경우 4대, 5대인 경우 3대, 3대인 경우 2대의 노드가 드랍되면 문제가 발생하게 됩니다. 절반 이상의 노드가 한번에 드랍되면 클러스터를 유지할 수 없기 때문에 해당 에러가 발생하게 됩니다.

When removing master-eligible nodes, it is important not to remove too many all at the same time. For instance, if there are currently seven master-eligible nodes and you wish to reduce this to three, it is not possible simply to stop four of the nodes at once: to do so would leave only three nodes remaining, which is less than half of the voting configuration, which means the cluster cannot take any further actions.
More precisely, if you shut down half or more of the master-eligible nodes all at the same time then the cluster will normally become unavailable. If this happens then you can bring the cluster back online by starting the removed nodes again.
As long as there are at least three master-eligible nodes in the cluster, as a general rule it is best to remove nodes one-at-a-time, allowing enough time for the cluster to automatically adjust the voting configuration and adapt the fault tolerance level to the new set of nodes.

 

실제 출력된 Error 메시지

아무런 설정을 하지 않고 마스터 노드의 개수를 줄이면(Scale in) 에러를 출력하면서 클러스터가 동작하지 않습니다. 먼저 에러의 메시지에서 지시한 데로 일단 3개 이상의 마스터 노드로 Scale out을 수행하여 일단 먼저 클러스터부터 다시 정상화해줍니다.저 같은 경우 원래대로 5대로 돌려놓았습니다.

{"@timestamp":"2023-11-26T05:37:28.578Z", "log.level": "WARN", "message":"master not discovered or elected yet, an election requires at least 3 nodes with ids from [ZppHhQQ9TBOt9BKVvqasng, 98xtbt4RRVyDEYUzidEqZw, aezJdnjdRc6XSlSCoY99CA, yTKgyqsqTc-kV7EyOkbRPQ, A86U5_pGQ9O_s2UP16sbOA], have only discovered non-quorum [{elasticsearch-master-0}{aezJdnjdRc6XSlSCoY99CA}{RdbsY5IJT-6DxLbE-tTXEA}{elasticsearch-master-0}{10.1.0.205}{10.1.0.205:9300}{m}]; discovery will continue using [10.1.0.208:9300, 10.1.0.204:9300, 10.1.0.207:9300, 10.1.0.206:9300] from hosts providers and [{elasticsearch-master-1}{yTKgyqsqTc-kV7EyOkbRPQ}{NUvkKaw7Ts-sKun6xLxVeQ}{elasticsearch-master-1}{10.1.0.207}{10.1.0.207:9300}{m}, {elasticsearch-master-3}{A86U5_pGQ9O_s2UP16sbOA}{iowYGyv2S6yV3o35KyV5ZQ}{elasticsearch-master-3}{10.1.0.208}{10.1.0.208:9300}{m}, {elasticsearch-master-2}{98xtbt4RRVyDEYUzidEqZw}{llyII7k3QsSjLrYuxaWe7g}{elasticsearch-master-2}{10.1.0.204}{10.1.0.204:9300}{m}, {elasticsearch-master-0}{aezJdnjdRc6XSlSCoY99CA}{RdbsY5IJT-6DxLbE-tTXEA}{elasticsearch-master-0}{10.1.0.205}{10.1.0.205:9300}{m}] from last-known cluster state; node term 2, last-accepted version 38 in term 2", "ecs.version": "1.2.0","service.name":"ES_ECS","event.dataset":"elasticsearch.server","process.thread.name":"elasticsearch[elasticsearch-master-0][cluster_coordination][T#1]","log.logger":"org.elasticsearch.cluster.coordination.ClusterFormationFailureHelper","elasticsearch.cluster.uuid":"MtfvN5pDQHmSCjaFEQpmPA","elasticsearch.node.id":"aezJdnjdRc6XSlSCoY99CA","elasticsearch.node.name":"elasticsearch-master-0","elasticsearch.cluster.name":"elasticsearch"}

 

마스터 노드 권한 제외 시키기

현재 마스터 노드의 권한을 가진 노드들이 elasticsearch-master-0, elasticsearch-master-1, elasticsearch-master-2, elasticsearch-master-3, elasticsearch-master-4로 구성되어 있습니다. 먼저 마스터 노드의 개수를 줄이기 위해 1개의 마스터 노드를 제외하고 다른 마스터 노드의 권한을 제외시켜 줍니다. 저는 elasticsearch-master-0만 마스터 노드의 권한을 유지하고 그 이외의 노드는 마스터 노드의 권한을 제외하였습니다. (솔직히 이번 실습에는 이게 전부입니다.)

### 마스터 투표권 제외
POST /_cluster/voting_config_exclusions?node_names=elasticsearch-master-1,elasticsearch-master-2,elasticsearch-master-3,elasticsearch-master-4

 

마스터 노드 줄이기

다른 노드들의 권한을 제외했다면 이제 모든 준비는 끝났습니다. 이제 그냥 줄여주면 됩니다. 한 번에 너무 많은 노드를 줄이는 것은 권장하지 않기 때문에 만약 서비스 중인 클러스터라면 안전하게 한 대씩 줄여주세요.

저 같은 경우 Elasticsearch를 Kubernetes에 구성하여 사용하기 때문에 아래 명령어로 마스터 노드의 개수를 줄였습니다.

$ kubectl scale sts elasticsearch-master --replicas=1

 

마지막으로

혹시나 나중에 다시 마스터 노드를 늘릴 수 있기 때문에 마스터 노드 권한을 제외한 목록을 제거해 줍니다. (제거해주지 않으면 나중에 스캐일 아웃해도 마스터의 권한을 가질 수 없습니다.)

DELETE /_cluster/voting_config_exclusions

 

저는 마스터 노드를 줄이는 업무를 진행할 때 이 방법을 몰라서 많이 당황을 했었는데요... 다행히 Elasticsearch 공식 문서에 잘 설명이 되어 있어 무사히 업무를 잘 마무리 할 수 있었습니다. 이런 경험 덕분에 보다 더 안정적인 클러스터를 운영할 수 있는 엔지니어가 된 거 같습니다.

 

참조

https://www.elastic.co/guide/en/elasticsearch/reference/current/add-elasticsearch-nodes.html

 

Add and remove nodes in your cluster | Elasticsearch Guide [8.11] | Elastic

Voting exclusions are only required when removing at least half of the master-eligible nodes from a cluster in a short time period. They are not required when removing master-ineligible nodes, nor are they required when removing fewer than half of the mast

www.elastic.co

 

'Elasticsearch > Error 처리' 카테고리의 다른 글

[Error] Error bulk 429 Too many requests  (0) 2023.06.30

맥북에 oh my zsh 설치하기

새로운 맥북을 지급받아 오랜만에 새롭게 맥북을 세팅하게 되었는데... 너무 오랜만에 하는 세팅이라 세팅하는 방법을 잊어버렸습니다. 그래서 이번에 새로운 맥북을 설정하는 김에 내용을 정리하였습니다. 제가 보려고 정리한 내용이지만 누군가에게 도움이 되었으면 좋겠습니다.

 

Homebrew 설치하기

Homebrew는 macOS 운영 체제에서 소프트웨어를 설치하고 관리하기 위한 패키지 관리자입니다. Homebrew를 통하여 소프트웨어 설치를 진행할 예정입니다.

 

homebrew 설치

brew -v 명령어를 통하여 homebrew가 설치 되었는지 확인합니다. 만약 homebrew가 설치되어 있지 않다면 아래 명령어를 통하여 설치를 진행해 주세요

# homebrew 설치 확인
brew -v

# homebrew 설치
/bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/HEAD/install.sh)"

homebrew : https://brew.sh/ko/

 

iterm2 설치하기

iterm2는 기본 터미널에서 기능이 확장된 터미널 프로그램입니다. 이런 iterm을 설치한 homebrew를 통하여 설치를 합니다.

brew install --cask iterm2

iterm2 : https://formulae.brew.sh/cask/iterm2

 

curl 설치하기

이제 iterm2로 들어와서 curl 또한 homebrew를 통하여 설치합니다.

brew install curl

 

oh-my-zsh 설치하기

위에서 받은 curl을 이용하여 oh-my-zsh를 설치합니다.

sh -c "$(curl -fsSL https://raw.githubusercontent.com/robbyrussell/oh-my-zsh/master/tools/install.sh)"

 

oh-my-zsh 디렉토리 구조 (넘어가도 무관)

oh-my-zsh를 설치하면 ~/.oh-my-zsh 디렉토리가 생기고 해당 디렉토리의 구조를 보면 다음과 같습니다.

❯ ls -al
total 144
drwxr-xr-x@  23 hsw  staff    736 11  3 08:37 .
drwxr-x---+  38 hsw  staff   1216 11 17 19:19 ..
-rw-r--r--@   1 hsw  staff    115  4  8  2023 .editorconfig
drwxr-xr-x@  14 hsw  staff    448 11 17 19:00 .git
drwxr-xr-x@   7 hsw  staff    224  5 14  2023 .github
-rw-r--r--@   1 hsw  staff    109  4  8  2023 .gitignore
-rw-r--r--@   1 hsw  staff    131  4  8  2023 .gitpod.Dockerfile
-rw-r--r--@   1 hsw  staff    259  4  8  2023 .gitpod.yml
-rw-r--r--@   1 hsw  staff     49  4  8  2023 .prettierrc
-rw-r--r--@   1 hsw  staff   3374  4  8  2023 CODE_OF_CONDUCT.md
-rw-r--r--@   1 hsw  staff   9211  4  8  2023 CONTRIBUTING.md
-rw-r--r--@   1 hsw  staff   1142  4  8  2023 LICENSE.txt
-rw-r--r--@   1 hsw  staff  18224 11  3 08:37 README.md
-rw-r--r--@   1 hsw  staff    953 11  3 08:37 SECURITY.md
drwxr-xr-x@   6 hsw  staff    192  4  8  2023 cache
drwxr-xr-x@   5 hsw  staff    160 11  3 08:37 custom
drwxr-xr-x@  22 hsw  staff    704 11  3 08:37 lib
drwxr-xr-x@   3 hsw  staff     96 11 17 19:00 log
-rw-r--r--@   1 hsw  staff   7470 11  3 08:37 oh-my-zsh.sh
drwxr-xr-x@ 334 hsw  staff  10688 11  3 08:37 plugins
drwxr-xr-x@   3 hsw  staff     96  4  8  2023 templates
drwxr-xr-x@ 145 hsw  staff   4640 11  3 08:37 themes
drwxr-xr-x@   9 hsw  staff    288 11  3 08:37 tools

여기서 우리가 중점적으로 봐야하는 디렉토리는 아래와 같습니다.

  • themes : oh-my-zsh의 테마들이 들어있습니다.
  • plugins : oh-my-zsh의 부가적인 기능을 수행할 수 있는 플러그인이 있습니다. (기본적으로 git plugin은 설치되어 있습니다.)
  • custom : 외부 설정인 themes, plugins을 저장하는 디렉토리입니다. 

 

기타 플러그인 설치하기

저는 자동완성 플러그인을 설치하였습니다. 본인의 취향에 맞게 플러그인을 설치해 주세요.

git clone https://github.com/zsh-users/zsh-autosuggestions ${ZSH_CUSTOM:-~/.oh-my-zsh/custom}/plugins/zsh-autosuggestions
git clone https://github.com/zsh-users/zsh-syntax-highlighting ${ZSH_CUSTOM:-~/.oh-my-zsh/custom}/plugins/zsh-syntax-highlighting

 

vim ~/.zshrc 설정파일에서 plugins을 찾아서 내용을 아래와 같이 수정합니다.

plugins=(git zsh-syntax-highlighting zsh-autosuggestions)

 

테마 설치하기

저는 개인적으로 powerlevel10k 테마를 좋아하기 때문에 powerlevel10k로 설치하는 것을 진행하였습니다.

git clone --depth=1 https://github.com/romkatv/powerlevel10k.git ${ZSH_CUSTOM:-$HOME/.oh-my-zsh/custom}/themes/powerlevel10k

 

vim ~/.zshrc 설정파일에서 ZSH_THEME을 찾아서 내용을 다음과 같이 수정합니다.

ZSH_THEME="powerlevel10k/powerlevel10k"

 

 

마지막으로 적용하고 powerlevel10k 테마의 원하는 설정값을 적용하세요.

source ~/.zshrc

 

powerlevel10k 테마 수정 방법

powerlevel10k 설치하다 보면 종종 설정값을 바꾸고 싶은 순간이 자주 오는데 아래 명령어를 통하여 다시 원하는 설정을 적용하세요

p10k configure

 

vscode와 iTerm2 연동하기

vscdoe와 iTerm2를 연동하기 위해 vscode를 실행하고 설정(cmd + ,)으로 들어가 "External: Osx Exec"를 검색하여 "iTerm.app"을 입력해주면 됩니다.

설정이 완료되었다면 맥에서는 단축키 cmd + shift + C를 이용하여 정상적으로 vscode에서 iTerm2를 외부 터미널로 생성하는지 확인합니다.

Prometheus exporter의 구조

Prometheus exporter의 구조는 하나 이상의 registry와 여러 개의 collector로 구성되어 있습니다. Prometheus가 "/metrics"로 request를 보내게 되면 "/metrics" endpoint에 등록된 handler에 의해 registry를 통하여 metric을 수집하게 됩니다. registry는 각각의 collector로부터 metric을 수집하여 metric을 정렬 수행한 후 http body 형식으로 변환하여 response를 전달하는 구조로 되어 있습니다. 

 

Collector란

collector는 각각의 특성에 맞는 데이터를 수집하여 해당 데이터를 metric으로 변환하여 registry에 전달하는 역할을 합니다. 예를 들어 processor collector는 processor에 관련된 데이터만 수집하여 processor metric을 생성하고 disk collector는 disk에 관련한 데이터를 수집하여 disk metric을 생성하도록 역할을 분리해 놓은 것입니다. 전체적인 Exporter의 구조는 아래 그림과 같습니다.

 

쉽고 간단하게 개인 Exporter 만들기

이번에 개인 Exporter를 만들기 위해서 제가 개발한 exporter_builder라는 Opensource를 이용하였습니다. exporter_builder는 명령어 2번만으로 Prometheus exporter의 기본적인 틀을 생성해 주는 Opensource입니다. 개인 Prometheus exporter를 초기에 개발 시 유용하게 사용할 수 있습니다.

https://github.com/k8shuginn/exporter_builder

 

GitHub - k8shuginn/exporter_builder: exporter_builder is a project that allows you to create your own prometheus exporter.

exporter_builder is a project that allows you to create your own prometheus exporter. - GitHub - k8shuginn/exporter_builder: exporter_builder is a project that allows you to create your own prometh...

github.com

 

1. 오픈소스 exporter_builder를 설치합니다.

먼저 go가 설치되어 있어야합니다. 그리고 아래 명령어를 실행하여 exporte_builder를 설치하여 줍니다. exporter_builder를 설치하면 이제 builder라는 명령어를 사용할 수 있습니다.

go install github.com/k8shuginn/exporter_builder/cmd/builder@latest

 

2. config.yaml 파일을 생성합니다.

name: my_exporter
module: github.com/myname/my_exporter
collectors:
  - sample1
  - sample2

name : 개인 exporter의 이름

module : go.mod init을 하기 위한 프로젝트명

collectors : 각 특성의 collector 이름

 

3. exporter_builder를 이용하여 exporter 생성

builder --config ./config.yaml

exporter가 정상적으로 생성되면 아래와 같이 my_exporter가 생성됩니다.

 

4. Collector 제작

sample2의 collector.go파일을 확인하면 아래 코드와 같습니다. Prometheus에서 "/metrics"로 request를 보내게 되면 regitsry의 gather를 통하여 Collect 메서드를 호출하게 됩니다. 그러면 데이터를 만들어 "chan<- prometheus.Metric" channel에 Metric을 전송해 주는 코드를 작성하시면 됩니다.

package sample2

import (
	"github.com/prometheus/client_golang/prometheus"
)

var (
	sampleDesc = prometheus.NewDesc(
		"sample_metric",
		"sample metric",
		[]string{"key1", "key2"}, nil)
)

func SetFlags() {

}

type Collector struct {
}

func NewCollector() *Collector {
	return &Collector{}
}

func (c *Collector) Describe(ch chan<- *prometheus.Desc) {
	ch <- sampleDesc
}

func (c *Collector) Collect(ch chan<- prometheus.Metric) {
	ch <- prometheus.MustNewConstMetric(sampleDesc, prometheus.GaugeValue, 1, "value1", "value2")
}

 

SetFlags는 flag 사용시 이용하시면 됩니다. 사용하지 않더라도 지우지 마세요. 이상 Exporter의 구조와 개인 Exporter를 만드는 방법에 대하여 알아봤습니다. 자신만의 Exporter로 안전한 모니터링 시스템을 구축하세요 ~

'Prometheus' 카테고리의 다른 글

Prometheus가 Kubernetes service discover하는 동작 방식  (0) 2024.02.23
Prometheus 윈도우10에 설치  (0) 2021.06.13

Elasticsearch rack 설정을 통한 샤드 배치

보다 더 안정적인 Elasticsearch cluster를 운영하기 위해서는 각각의 데이터 노드가 별도의 서버 Rack에서 동작하도록 설정하는 것을 권장하고 있습니다. 그 이유는 서버의 하드웨어의 이슈가 발생하거나 서버실에서의 화재 또는 네트워크의 문제로 인하여 해당 서버랙이 문제가 발생하게 되면 해당 서버랙에 가동 중인 모든 노드에 영향을 미칠 수 있습니다. 만약 문제가 발생한 서버랙에 여러 대의 데이터 노드가 동작하고 있었다면 치명적인 데이터 유실이 발생하게 됩니다. 그렇기 때문에 각각의 서버랙을 구분하여 인덱스의 Primary shard와 Replica shard를 동일한 서버랙에 배치하지 못하도록 설정을 한다면 서버랙에 문제가 발생하여도 데이터의 유실을 방지할 수 있습니다. 이번 문서에서는 Elasticsearch data node에 rack_id를 부여하여 Primary shard와 Replica shard를 rack_id로 구분하여 배치할 수 있는 방법에 대하여 정리하였습니다.

 

극단적인 예시로 모든 데이터 노드를 하나의 랙에 동작하고 있다면 해당 랙에 문제가 발생 시 모든 데이터를 유실하여 서비스 운영에 치명적인 문제가 될 수 있습니다. 그렇기 때문에 각각의 서버 랙에 노드를 균등하게 배치하고 각 서버랙이 설치된 데이터 노드에 rack_id를 부여하면 하나의 서버랙에 문제가 발생하여도 데이터는 Replica shard를 통하여 데이터를 보존할 수 있습니다.

 

데이터 노드 서버 rack_id 설정

"rack one"에 설치되는 데이터 노드의 이름을 elasticsearch-one-0, elasticsearch-one-1, elasticsearch-one-2라고 지정하고 "rack two"에 설치되는 데이터 노드의 이름을 elasticsearch-two-0, elasticsearch-two-1, elasticsearch-two-2라고 설정하도록 하겠습니다.

 

rack one에 설치 되는 노드 elasticsearch.yml 파일에 값 추가하고 실행하기

node.attr.rack_id: rack_one

 

rack two에 설치 되는 노드 elasticsearch.yml 파일에 값 추가하고 실행하기

node.attr.rack_id: rack_two

 

master node에 rack_id를 지정하여 rack_id를 통해 샤드를 할당할 때 replica shard가 primary shard와 rack_id가 같지 않도록 elasticsearch.yml 파일에 다음과 같이 값을 추가합니다.

cluster.routing.allocation.awareness.attributes: rack_id

 

인덱스에 데이터를 인덱싱하고 결과 확인

인덱스에 number_of_shards의 값을 4로 설정하고 number_of_replicas의 값을 1로 설정하여 데이터를 인덱싱 하고 테스트를 진행하였습니다.

위 결과로 확인해보면 rack one에 설치된 노드에 primary shard가 생성되면 해당 샤드의 replica shard는 반듯이 rack two에 배치가 된 것을 확인할 수 있습니다. 이렇게 샤드의 배치를 서버랙 기준으로 설정하여 보다 안정적인 서비스를 운영할 수 있습니다.

 

마지막으로

서버랙을 기준으로 샤드할당을 지정할 수 있지만 zone을 기준으로 샤드할당을 설정할 수도 있습니다. 그러나 AWS Zone을 기준으로 설정할 경우 primary shard와 replica shard가 서로 데이터 sync를 하는 과정에서 너무 많은 네트워크 비용이 발생할 수 있습니다. 회사에서 많은 비용을 투자할 수 있다면 AWS Zone까지 설정하는 것이 가장 최선이지만 저의 경우 네트워크 비용 발생이 부담스러워 비용 발생을 최소화하기 위해 rack_id를 기준으로 샤드를 배치하는 것까지만 설정하고 운영하는 것이 비용 대비 좋은 선택이라고 생각됩니다.

 

참조

https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-cluster.html

 

Cluster-level shard allocation and routing settings | Elasticsearch Guide [8.10] | Elastic

You cannot mix the usage of percentage/ratio values and byte values across the cluster.routing.allocation.disk.watermark.low, cluster.routing.allocation.disk.watermark.high, and cluster.routing.allocation.disk.watermark.flood_stage settings. Either all val

www.elastic.co

 

 

 

Opensearch를 사용하게 된 이유

Elasticsearch를 기본적으로 사용하는데 문제는 없지만 일부 기능들이 Enterprise license에서만 제공하는 기능들이 있습니다. 그런 이유로 회사에서 Elasticsearch에서 Opensearch로 데이터를 Migration 하는 작업을 진행하였습니다. 저희 같은 경우 비용 절감을 위해 AWS Managed service를 이용하고 있지 않다는 점을 알아주셨으면 좋겠습니다. 이번 문서에서는 데이터를 Migration 하는 방법에 대해서 정리해지만 해당 방법이 최선이라는 생각은 하지 않습니다. 더 좋은 방법을 알고 계신다면 댓글로 남겨주세요.

 

시작하기 전에 간단한 설명

먼저 저의 Elasticsearch의 클러스터 운영방법에 대해서 간단하게 설명 드리겠습니다. 저희 같은 경우 인덱스를 날짜 별로 관리를 하고 있기 때문에 이전의 날짜에는 데이터가 업데이트되지 않는 방식으로 운영하고 있습니다. 그리고 Snapshot의 경우 AWS S3에 저장하고 있습니다.

 

Opensearch 클러스터 구성

달리는 차의 바퀴를 갈이 끼우는 것은 매우 어려운 일이기 때문에 우리는 서비스를 중단하지 않고 데이터를 이주하기 위해서는 Elasticsearch의 동일한 스팩의 Opensearch 클러스터를 미리 구성해야 합니다. 

 

기존에 있던 Elasticsearch와 동일한 스팩의 Opensearch를 클러스터를 구성하고 Elasticsearch와 Opensearch의 스냅샷 Repository를 동일한 S3를 바라보도록 설정하였습니다. 이렇게 설정한 것을 그림으로 표현하면 다음과 같습니다.

Elasticsearch -> Opensearch 데이터 Migration

1. Elasticsearch 에서 Snapshot을 생성합니다.

Elasticsearch와 Opensearch 모두 동일하게 바라보는 Repository로 Snapshot을 생성합니다.

Elasticsearch 스냅샷 생성 방법 (https://stdhsw.tistory.com/entry/Elasticsearch-710-%EB%B2%84%EC%A0%84-AWS-S3-Snapshot-%EC%83%9D%EC%84%B1%ED%95%98%EA%B8%B0Kubernetes)

 

2. Indexing하는 Processor를 Elasticsearch와 Opensearch 둘 다 전송합니다.

데이터를 인덱싱하는 Processor의 타겟을 Elasticsearch, Opensearch 둘 다 지정합니다. 대부분 FluentBit, Vector 등 멀티 타겟을 지정하는 것을 지원하지만 만약 인덱싱하는 Processor에 멀티 타겟을 지정할 수 없다면 envoy proxy에 대해서 알아보는 것을 권장합니다.

이렇게 두 타켓을 지정하였다면 아래 그림과 같은 모습입니다.

 

3. 하루전날짜, 이틀전날짜 인덱스는 스냅샷으로 이주

1번에서 수행한 Elasticsearch의 스냅샷을 이용하여 하루전날짜, 이틀전날짜의 인덱스를 Opensearch에서 복원합니다.

Opensearch 스냅샷 복원방법 (https://stdhsw.tistory.com/entry/Opensearch-Snapshot-AWS-S3%EB%B0%B1%EC%97%85-%EB%B0%8F-%EB%B3%B5%EA%B5%AC)

 

4. Search processor의 타겟을 변경합니다.

Search processor의 타겟을 변경하고 안정적으로 서비스가 동작하는지 전체적으로 검토합니다. 그리고 서비스가 안정적이라고 확신하면 그때 Indexing Processor가 Elasticsearch에 인덱싱 하지 않도록 합니다.

 

5. Elasticsearch의 자원을 반납합니다.

이제 마지막으로 모든 것이 정상적으로 운영이 가능하다면 Elasticsearch의 자원을 반납해줍니다.

 

마지막으로

지금까지 Elasticsearch에서 Opensearch로 Migration하는 방법을 정리해 봤는데요 이 방법은 저의 머릿속에서의 최선의 방법이라 생각해서 수행한 방법입니다. 다행히 서비스 동작에 문제는 없었지만 해당 방법이 최선의 방법이라는 생각은 하지 않습니다. 만약 이 방법보다 더 좋은 방법을 알고 계신다면 댓글로 공유해 주시면 감사하겠습니다.

 

'Opensearch' 카테고리의 다른 글

Opensearch Searchable Snapshot  (0) 2023.07.17
Opensearch Snapshot (AWS S3)백업 및 복구  (0) 2023.07.17
Opensearch SM(Snapshot Management)  (0) 2023.07.17
Opensearch Index Template  (0) 2023.07.14
Opensearch ISM(Index State Management)  (0) 2023.07.13

Data streams

Data Stream은 시계열 데이터를 관리하기 위한 기능으로 하나 이상의 시계열 인덱스를 논리적으로 묶은 그룹으로, 여러 시계열 데이터 소스를 하나의 개체로 인지하여 관리하기 때문에 로그, 이벤트, 지표 및 기타 지속적으로 생성되는 데이터에 적합한 기능입니다. Elasticsearch의 경우 역색인 형태를 가지고 있기 때문에 Document를 삭제하거나 업데이트를 하는데 많이 비율적인 모습을 보이고 있습니다. 그렇기 때문에 데이터를 Document 단위로 관리를 하는 것보다는 인덱스 단위로 관리하는 것이 더 효율적입니다. 이번 문서에서는 시계열 데이터를 data stream을 통해 관리하는 방법에 대하여 정리하였습니다.

 

Data stream의 인덱싱과 검색 동작 방식

data stream을 통해 데이터를 인덱싱 할 경우 마지막으로 생성된 인덱스에 데이터를 인덱싱을 수행합니다.

그러나 검색은 인덱싱과 다르게 data stream으로 생성된 모든 인덱스에 검색 요청을 수행합니다.

사전 작업(클러스터 구성 및 설정)

먼저 이번 실습을 위해 저는 Master Node 3대, Hot Data 2대, Warm Data 2대로 클러스터를 구성하였으며, 빠른 테스트를 위해 클러스터가 인덱스를 롤오버 검사를 하는 주기를 5초로 설정하였습니다. 설정 방법은 아래와 같습니다.

### 인덱스 롤오버 체크 주기 변경
PUT http://localhost:9200/_cluster/settings
Content-Type: application/json

{
  "persistent": {
    "indices.lifecycle.poll_interval": "5s"
  }
}

 

ILM(Index Lifecycle Management) 설정

data stream은 ILM(Index Lifecycle Management)를 고려하여 추가된 기능으로 data stream을 사용할 때 ILM 기능과 같이 사용하는 것을 권장합니다. ILM 설정으로는 빠른 테스트를 위해 인덱스가 롤오버 되면 2분 뒤에 Warm 노드로 이동하고 10분 뒤에 Delete 되도록 설정하였습니다. 실제로는 이렇게 짧은 시간으로 설정하지는 않고 최고 일단위로 설정합니다.

### Create ILM policy
PUT http://localhost:9200/_ilm/policy/my-ilm-policy
Content-Type: application/json

{
  "policy": {
    "phases": {
      "hot": {
        "min_age": "0ms",
        "actions": {
          "rollover": {
            "max_size": "1gb",
            "max_docs": 10
          },
          "set_priority": {
            "priority": 100
          }
        }
      },
      "warm": {
        "min_age": "2m",
        "actions": {
          "forcemerge": {
            "max_num_segments": 1
          },
          "shrink": {
            "number_of_shards": 1
          },
          "allocate": {
            "number_of_replicas": 1
          },
          "set_priority": {
            "priority": 50
          }
        }
      },
      "delete": {
        "min_age": "10m",
        "actions": {
          "delete": {}
        }
      }
    }
  }
}

데이터가 Warm노드로 이동할 때 shrink 옵션을 통하여 샤드의 개수가 1개로 설정되도록 하였습니다.

 

Component template 생성

component template은 template의 기능을 여러개로 분리하여 관리하는 기능으로 한번 정의해 놓으면 다른 인덱스 template에서도 재사용할 수 있게 하는 기능입니다. 이번 실습에서는 settings에 관련된 template과 mapping에 관련된 template을 분리하여 생성하였습니다.

데이터 스트림에 인덱싱된 모든 문서에는 또는 필드 유형 @timestamp으로 매핑된 필드가 포함되어야 합니다. 인덱스 템플릿이 필드에 대한 매핑을 지정하지 않으면 Elasticsearch는 기본 옵션을 사용하여 필드로 매핑합니다

### create component template
PUT http://localhost:9200/_component_template/my-settings
Content-Type: application/json

{
  "template": {
    "settings": {
      "index.lifecycle.name": "my-ilm-policy",
      "number_of_shards": "2",
      "number_of_replicas": "1",
      "refresh_interval": "5s"
    }
  }
}

### create component template
PUT http://localhost:9200/_component_template/my-mappings
Content-Type: application/json

{
  "template": {
    "mappings": {
      "properties": {
        "@timestamp": {
          "type": "date",
          "format": "date_optional_time||epoch_millis"
        },
        "message": {
          "type": "text"
        },
        "level": {
          "type": "keyword"
        }
      }
    }
  }
}

 

Index template 생성

위에서 생성한 component template을 이용하여 index template을 구성하였습니다.

### create index template
PUT http://localhost:9200/_index_template/my-index-template
Content-Type: application/json

{
  "index_patterns": ["my-index*"],
  "priority": 500,
  "data_stream": {},
  "composed_of": ["my-mappings", "my-settings"]
}

 

Data stream 생성 및 데이터 테스트

위에서 설정한 것을 기반으로 data stream을 생성합니다.

### create data stream
PUT http://localhost:9200/_data_stream/my-index

### data stream 정보 가져오기
GET http://localhost:9200/_data_stream

 

이제 data stream에 실제로 데이터를 넣으면서 확인해보겠습니다.

curl -XPOST http://localhost:9200/my-index/_bulk -H 'Content-Type: application/json' -d '
{ "create": {} }
{ "@timestamp": "1698068232000", "level": "debug", "message": "test message" }
{ "create": {} }
{ "@timestamp": "1698068422000", "level": "debug", "message": "test message" }
{ "create": {} }
{ "@timestamp": "1698068232000", "level": "info", "message": "success test message" }
{ "create": {} }
{ "@timestamp": "1698068432000", "level": "info", "message": "success test message" }
{ "create": {} }
{ "@timestamp": "1698068242000", "level": "error", "message": "failed test message" }
{ "create": {} }
{ "@timestamp": "1698068452000", "level": "error", "message": "failed test message" }
{ "create": {} }
{ "@timestamp": "1698068232000", "level": "debug", "message": "test message" }
{ "create": {} }
{ "@timestamp": "1698068422000", "level": "debug", "message": "test message" }
{ "create": {} }
{ "@timestamp": "1698068232000", "level": "info", "message": "success test message" }
{ "create": {} }
{ "@timestamp": "1698068432000", "level": "info", "message": "success test message" }
{ "create": {} }
{ "@timestamp": "1698068242000", "level": "error", "message": "failed test message" }
{ "create": {} }
{ "@timestamp": "1698068452000", "level": "error", "message": "failed test message" }

'

 

데이터 확인

data stream으로 생성된 인덱스는. ds의 접두사가 붙어서 인덱스가 생성되었습니다. 이제 생성된 인덱스는 ILM에 설정한 것과 같이 2분 뒤에 Warm 노드로 이동하고 10분 뒤에 삭제됩니다.

2분뒤 .ds-my-index-2023.10.24-000001 인덱스가 shrink-f-pw-.ds-my-index-2023.10.24-000001 인덱스로 변하면서 샤드의 개수가 1개로 줄었습니다.

다시 한번 데이터 인덱싱

curl -XPOST http://localhost:9200/my-index/_bulk -H 'Content-Type: application/json' -d '
{ "create": {} }
{ "@timestamp": "1698068232000", "level": "debug", "message": "test message" }
{ "create": {} }
{ "@timestamp": "1698068422000", "level": "debug", "message": "test message" }
{ "create": {} }
{ "@timestamp": "1698068232000", "level": "info", "message": "success test message" }
{ "create": {} }
{ "@timestamp": "1698068432000", "level": "info", "message": "success test message" }
{ "create": {} }
{ "@timestamp": "1698068242000", "level": "error", "message": "failed test message" }
{ "create": {} }
{ "@timestamp": "1698068452000", "level": "error", "message": "failed test message" }
{ "create": {} }
{ "@timestamp": "1698068232000", "level": "debug", "message": "test message" }
{ "create": {} }
{ "@timestamp": "1698068422000", "level": "debug", "message": "test message" }
{ "create": {} }
{ "@timestamp": "1698068232000", "level": "info", "message": "success test message" }
{ "create": {} }
{ "@timestamp": "1698068432000", "level": "info", "message": "success test message" }
{ "create": {} }
{ "@timestamp": "1698068242000", "level": "error", "message": "failed test message" }
{ "create": {} }
{ "@timestamp": "1698068452000", "level": "error", "message": "failed test message" }

'

 

인덱스의 상태를 확인해 보면 shrink-f-pw-.ds-my-index-2023.10.24-000001에 document가 12개 있고 ds-my-index-2023.10.24-000002에 document가 12개 있는 것을 확인할 수 있습니다.

이제 data stream으로 생성될 수 있는 인덱스는 종류별로 생성되었습니다. data stream으로 다음과 같이 검색을 진행하면 data stream으로 생성된 모든 인덱스에서 검색이 된 것을 확인할 수 있습니다.

### search index
POST http://localhost:9200/my-index/_search
Content-Type: application/json

{
  "size": 1000
}

 

참고

https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-component-template.html

 

Create or update component template API | Elasticsearch Guide [8.10] | Elastic

Create or update component template APIedit Creates or updates a component template. Component templates are building blocks for constructing index templates that specify index mappings, settings, and aliases. response = client.cluster.put_component_templa

www.elastic.co

https://www.elastic.co/guide/en/elasticsearch/reference/current/data-streams.html

 

Data streams | Elasticsearch Guide [8.10] | Elastic

A data stream lets you store append-only time series data across multiple indices while giving you a single named resource for requests. Data streams are well-suited for logs, events, metrics, and other continuously generated data. You can submit indexing

www.elastic.co

 

Elasticsaerch 설정 변경 및 버전 업그레이드

https://stdhsw.tistory.com/entry/Elasticsearch-version-upgrade-rolling-%EB%B0%A9%EC%8B%9D-in-kubernetes

 

Elasticsearch version upgrade rolling 방식 (in Kubernetes)

Elasticsaerch 설정 변경 및 버전 업그레이드 Elasticsearch를 운영하다 보면 처음에 생각하지 못하거나 점점 Elasticsearch에 대한 지식이 늘면서 운영하는 Elasticsearch의 설정을 변경하고 싶은 순간이 있습

stdhsw.tistory.com

이전 블로그에서는 Rolling방식으로 Elasticsearch의 버전을 업그레이드하는 방법에 대해서 정리하였습니다. 이번에는 Swap 방식을 통하여 Elasticsearch의 Version을 Upgrade 하는 방법에 대해서 정리하였습니다.

클러스터 구성

이번 실습 Elasticsearch의 클러스터 구성으로는 Master node 3대, Data node 4대로 구성하여 진행하였습니다.

Elasticsearch의 버전은 8.5.1을 사용하였습니다.

 

Master node helm values (value-master.yaml)

---
createCert: false
clusterName: "elasticsearch"
nodeGroup: "master"
replicas: 3
roles:
  - master

image: "docker.elastic.co/elasticsearch/elasticsearch"
imageTag: "8.5.1"
imagePullPolicy: "IfNotPresent"

protocol: http
service:
  type: NodePort
  nodePort: "30000"

 

Data node helm values (value-data.yaml)

---
createCert: false
clusterName: "elasticsearch"
nodeGroup: "data"
replicas: 4

image: "docker.elastic.co/elasticsearch/elasticsearch"
imageTag: "8.5.1"
imagePullPolicy: "IfNotPresent"

roles:
  - data
  - data_content
  - data_hot
  - data_warm
  - data_cold
  - ingest
  - ml
  - remote_cluster_client
  - transform
  
protocol: http
service:
  type: NodePort
  nodePort: "30001"

 

Makefile 구성

PREFIX := elasticsearch
TIMEOUT := 1200s

install:
	helm upgrade --wait --timeout=$(TIMEOUT) --install --values value-master.yaml $(PREFIX)-master ./elasticsearch
	helm upgrade --wait --timeout=$(TIMEOUT) --install --values value-data.yaml $(PREFIX)-data ./elasticsearch

 

설치 및 결과 확인

# elasticsearch 설치
> make install

# 파드 확인
> kubectl get po                                                                                                                                               ─╯
NAME                     READY   STATUS    RESTARTS   AGE
elasticsearch-data-0     1/1     Running   0          25m
elasticsearch-data-1     1/1     Running   0          25m
elasticsearch-data-2     1/1     Running   0          25m
elasticsearch-data-3     1/1     Running   0          25m
elasticsearch-master-0   1/1     Running   0          26m
elasticsearch-master-1   1/1     Running   0          26m
elasticsearch-master-2   1/1     Running   0          26m

# 버전 확인
GET http://localhost:30001/_nodes/elasticsearch-data-0

{
  "_nodes": {
    "total": 1,
    "successful": 1,
    "failed": 0
  },
  "cluster_name": "elasticsearch",
  "nodes": {
    "4312Dy1YSS6tnBs28Pn4tg": {
      "name": "elasticsearch-data-0",
      "transport_address": "10.1.0.63:9300",
      "host": "10.1.0.63",
      "ip": "10.1.0.63",
      "version": "8.5.1",
      생략 ....

정상적으로 클러스터가 구성되었습니다. 이제 해당 클러스터의 버전을 업그레이드 작업을 진행하겠습니다.

새로운 노드 구성

rolling upgrade 방식과는 다르게 이번에는 새로운 노드 그룹을 생성합니다.

New Master node helm values (value-newmaster.yaml)

---
createCert: false
clusterName: "elasticsearch"
nodeGroup: "new-master"
replicas: 3
roles:
  - master

image: "docker.elastic.co/elasticsearch/elasticsearch"
imageTag: "8.6.0"
imagePullPolicy: "IfNotPresent"

protocol: http
service:
  type: NodePort
  nodePort: "30000"

 

New Data node helm values (value-newdata.yaml)

---
createCert: false
clusterName: "elasticsearch"
nodeGroup: "new-data"
replicas: 4

image: "docker.elastic.co/elasticsearch/elasticsearch"
imageTag: "8.6.0"
imagePullPolicy: "IfNotPresent"

roles:
  - data
  - data_content
  - data_hot
  - data_warm
  - data_cold
  - ingest
  - ml
  - remote_cluster_client
  - transform
  
protocol: http
service:
  type: NodePort
  nodePort: "30001"

 

Makefile 구성

PREFIX := elasticsearch
TIMEOUT := 1200s

install:
	helm upgrade --wait --timeout=$(TIMEOUT) --install --values value-newmaster.yaml $(PREFIX)-new-master ./elasticsearch
	helm upgrade --wait --timeout=$(TIMEOUT) --install --values value-newdata.yaml $(PREFIX)-new-data ./elasticsearch

 

설치

> make install

현재 클러스터 상태

위와 같이 구성하였다면 클러스터의 상태는 다음과 같을 것입니다.

기존 노드들 하나씩 Scale In

기존의 마스터, 데이터 노드의 Statfulset의 Scale을 하나씩 줄여 줍니다.

> kubectl scale sts elasticsearch-master --replicas=2
# 위 작업 완료 시 
> kubectl scale sts elasticsearch-master --replicas=1
# 위 작업 완료 시 
> kubectl scale sts elasticsearch-master --replicas=0

# 위 작업 완료 시 
> kubectl scale sts elasticsearch-data --replicas=3
# 위 작업 완료 시 
> kubectl scale sts elasticsearch-data --replicas=2
# 위 작업 완료 시 
> kubectl scale sts elasticsearch-data --replicas=1
# 위 작업 완료 시 
> kubectl scale sts elasticsearch-data --replicas=0

# 모든 스케일이 정상적으로 내려가면 기존 자원 반납
> helm delete elasticsearch-master
> helm delete elasticsearch-data

작업양은 많지만 보다 더 빠른 방법

Snapshot을 이용

일시적으로 오늘 이전의 데이터는 잠시 출력이 되지 않아도 된다면 사용하는 방법입니다.

  • 상태 복구를 위한 스냅샷 생성
  • 오늘 날짜의 데이터를 제외한 모든 인덱스를 삭제
  • 오늘 날짜의 인덱스 샤드를 Reroute를 이용하여 원하는 노드로 배치
  • 샤드들이 이동이 전부 정상적으로 이루어지면 기존 노드 Scale In
  • 오늘 이전의 날짜의 데이터를 스냅샷으로 복구

reroute 사용법은 아래 블로그에서 확인해 주세요

https://stdhsw.tistory.com/entry/Elasticsearch-reroute-%EC%83%A4%EB%93%9C%EB%A5%BC-%EB%8B%A4%EB%A5%B8-%EB%85%B8%EB%93%9C%EB%A1%9C-%EC%9D%B4%EB%8F%99%EC%8B%9C%ED%82%A4%EA%B8%B0

 

Elasticsearch reroute 샤드를 다른 노드로 이동시키기

reroute Elasticsearch에서는 기본적으로 각각의 노드에 샤드가 일정 비율로 균등하게 배치되도록 하고 있습니다. 그래도 뜻하지 않게 특정 노드에 샤드가 많이 배치될 수도 있고 어떤 인덱스는 샤드

stdhsw.tistory.com

 

Elasticsaerch 설정 변경 및 버전 업그레이드

Elasticsearch를 운영하다 보면 처음에 생각하지 못하거나 점점 Elasticsearch에 대한 지식이 늘면서 운영하는 Elasticsearch의 설정을 변경하고 싶은 순간이 있습니다. 뿐만 아니라 보안적인 이슈로 인하여 해당 이슈가 해결된 Version으로 업그레이드를 해야 하는 순간도 겪을 수 있습니다. Elasticsearch의 버전 및 설정을 바꾸는 방법이 여러 가지 있지만 이번 문서에서는 그중에 rolling 방식으로 변경하는 방법에 대해서 정리하였습니다.

주의사항

이번 실습 환경은 Kubernetes 환경에서 실습을 진행하였습니다.

  • Shard의 Replication이 존재하지 않으면 데이터가 손실될 수 있습니다.
  • Replication partition의 개수보다 작은 수의 노드가 한 번에 rolling update 되어야 한다는 것입니다. (이해하기 어려우면 그냥 노드 하나씩 업그레이드가 진행되면 됩니다.)
  • Kubernetes에서는 기본적으로 파드 순서의 역순으로 파드가 업데이트를 진행합니다. 그렇기 때문에 한 번에 드랍되는 문제는 크게 걱정하지 않아도 되지만 억지로 파드를 드랍하는 행위는 해서는 안됩니다.
  • 마스터 노드 또한 업그레이드를 진행해야 되기 때문에 마스터 Role을 가진 노드의 개수가 2대 이상 유지되어야 합니다.
  • elasticsearch의 helm으로 구성하였습니다. (https://artifacthub.io/packages/helm/elastic/elasticsearch)

클러스터 구성

이번 실습 Elasticsearch의 클러스터 구성으로는 Master node 3대, Data node 4대로 구성하여 진행하였습니다.

Elasticsearch의 버전은 8.5.1을 사용하였습니다.

 

Master node helm values (value-master.yaml)

---
createCert: false
clusterName: "elasticsearch"
nodeGroup: "master"
replicas: 3
roles:
  - master

image: "docker.elastic.co/elasticsearch/elasticsearch"
imageTag: "8.5.1"
imagePullPolicy: "IfNotPresent"

protocol: http
service:
  type: NodePort
  nodePort: "30000"

 

Data node helm values (value-data.yaml)

---
createCert: false
clusterName: "elasticsearch"
nodeGroup: "data"
replicas: 4

image: "docker.elastic.co/elasticsearch/elasticsearch"
imageTag: "8.5.1"
imagePullPolicy: "IfNotPresent"

roles:
  - data
  - data_content
  - data_hot
  - data_warm
  - data_cold
  - ingest
  - ml
  - remote_cluster_client
  - transform
  
protocol: http
service:
  type: NodePort
  nodePort: "30001"

 

Makefile 구성

PREFIX := elasticsearch
TIMEOUT := 1200s

install:
	helm upgrade --wait --timeout=$(TIMEOUT) --install --values value-master.yaml $(PREFIX)-master ./elasticsearch
	helm upgrade --wait --timeout=$(TIMEOUT) --install --values value-data.yaml $(PREFIX)-data ./elasticsearch

 

설치 및 결과 확인

# elasticsearch 설치
> make install

# 파드 확인
> kubectl get po                                                                                                                                               ─╯
NAME                     READY   STATUS    RESTARTS   AGE
elasticsearch-data-0     1/1     Running   0          25m
elasticsearch-data-1     1/1     Running   0          25m
elasticsearch-data-2     1/1     Running   0          25m
elasticsearch-data-3     1/1     Running   0          25m
elasticsearch-master-0   1/1     Running   0          26m
elasticsearch-master-1   1/1     Running   0          26m
elasticsearch-master-2   1/1     Running   0          26m

# 버전 확인
GET http://localhost:30001/_nodes/elasticsearch-data-0

{
  "_nodes": {
    "total": 1,
    "successful": 1,
    "failed": 0
  },
  "cluster_name": "elasticsearch",
  "nodes": {
    "4312Dy1YSS6tnBs28Pn4tg": {
      "name": "elasticsearch-data-0",
      "transport_address": "10.1.0.63:9300",
      "host": "10.1.0.63",
      "ip": "10.1.0.63",
      "version": "8.5.1",
      생략 ....

정상적으로 클러스터가 구성되었습니다. 이제 해당 클러스터의 버전을 업그레이드 작업을 진행하겠습니다.

Elasticsearch version upgrade

기존 8.5.1 버전을 8.6.0 버전으로 업그레이 작업을 진행하겠습니다.

 

Master node helm values (value-master.yaml)

---
createCert: false
clusterName: "elasticsearch"
nodeGroup: "master"
replicas: 3
roles:
  - master

image: "docker.elastic.co/elasticsearch/elasticsearch"
imageTag: "8.6.0"
imagePullPolicy: "IfNotPresent"

protocol: http
service:
  type: NodePort
  nodePort: "30000"

 

Data node helm values (value-data.yaml)

---
createCert: false
clusterName: "elasticsearch"
nodeGroup: "data"
replicas: 4

image: "docker.elastic.co/elasticsearch/elasticsearch"
imageTag: "8.6.0"
imagePullPolicy: "IfNotPresent"

roles:
  - data
  - data_content
  - data_hot
  - data_warm
  - data_cold
  - ingest
  - ml
  - remote_cluster_client
  - transform
  
protocol: http
service:
  type: NodePort
  nodePort: "30001"

 

업그레이드 및 결과 확인

helm 설치 방법을 upgrade 형식으로 Makefile을 만들었기 때문에 make install 명령어를 통하여 업그레이드가 가능합니다.

# 버전 업그레이드
> make install

> GET http://localhost:30001/_nodes/elasticsearch-data-0
{
  "_nodes": {
    "total": 1,
    "successful": 1,
    "failed": 0
  },
  "cluster_name": "elasticsearch",
  "nodes": {
    "4312Dy1YSS6tnBs28Pn4tg": {
      "name": "elasticsearch-data-0",
      "transport_address": "10.1.0.72:9300",
      "host": "10.1.0.72",
      "ip": "10.1.0.72",
      "version": "8.6.0",
      생략 ....

업그레이드 순서로는 Statefulset의 가장 뒷 번호부터 업그레이드가 진행되며 데이터가 많으면 많을 수록 오랜 시간이 걸리게 됩니다.

 

클러스터 업그레이 동작 방식

현재 클러스터의 구성으로는 마스터 노드는 데이터를 가지지 않습니다. 마스터 노드의 롤링 업데이트 진행 방법은 현재 마스터 역할을 하고 있는 노드가 드랍될 때 다른 마스터 권한을 가진 노드에게 마스터 권한을 넘겨주면서 롤링 업데이트가 진행됩니다.

데이터 노드들은 마스터 노드와 다르게 데이터를 가지고 있는데요 데이터 노드가 순차적으로 드랍될 때 드랍된 샤드의 데이터를 Replica shard를 통하여 다른 노드에 샤드를 새롭게 구성하면서 롤링 업데이트를 진행합니다.

파란색은 현재 Primary shard를 뜻하고 흰색은 Replica shard를 뜻합니다. 그리고 보라색은 복구된 Shard를 의미합니다.

시작하기 앞서 Ingest Pipeline이란

Elasticsearch에서 Ingest Pipeline은 데이터를 색인하기 전에 전처리 및 변환 작업을 수행하는 개념입니다. Ingest Pipeline은 데이터를 가져와서 필요한 형식으로 변환하거나 데이터를 필터링하고 파싱 하는 등의 작업 및 데이터의 일관성을 유지하는데 매우 유용하여 효율적인 데이터 처리를 가능하게 하는 중요한 기능입니다.

그림에 보이는 것과 같이 데이터가 들어오면 Ingest pipeline을 통하여 순차적으로 프로세서가 실행되고 결과물은 해당 인덱스에 저장됩니다. Ingest Pipeline이 동작할 수 있도록 해당 노드에는 ingest role이 등록되어야 하지만 일반적으로 데이터 노드에 많이 사용합니다. 허나 프로세서가 많아 부하를 많이 받을 경우 노드를 별도로 분리하는 방법을 고려해 볼 필요성이 있습니다.

이번 문서에서는 여러 유용한 파이프라인 프로세서 중 json에 대해서 실습해봤습니다.

만약 JSON Pipeline을 사용하지 않았다면

만약 JSON Pipeline을 사용하지 않았다면 데이터가 Text형식으로 문자열 그대로 저장되어 통계를 내거나 보다 자세한 데이터 처리를 하는데 많은 제약이 있을 수 있습니다. 아래는 JSON Pipeline을 사용하지 않았을 때의 데이터 형식입니다.

### 데이터 검색
GET http://localhost:9200/my-manifests/_search

### 결과
{
  "took": 4,
  "timed_out": false,
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 1,
      "relation": "eq"
    },
    "max_score": 1.0,
    "hits": [
      {
        "_index": "my-manifests",
        "_id": "1",
        "_score": 1.0,
        "_source": {
          "cluster": "my-cluster",
          "manifest": "{\"cluster\":\"my-cluster\",\"namespace\":\"my-namespace\",\"name\":\"my-name\",\"version\":\"1.0.0\"}"
        }
      }
    ]
  }
}

 

JSON Pipeline

개발을 할 때 엔지니어가 json 데이터를 Elasticsearch 쿼리로 변경하여 데이터를 넣는 형식으로 개발을 진행한다면 JSON Pipeline은 굳이 필요 없는 기능으로 여길 수 있지만 개발을 하다 보면 앞으로 들어오는 json 데이터의 형식을 모르거나 json 문자열 데이터를 변경할 수 없는 환경에서 개발을 진행해야 되는 경우도 많습니다. 저 같은 경우 Kubernetes의 Resource Manifest 데이터를 가공 처리하는데 Manifest데이터는 yaml 또는 json 데이터로 수집되는데 어떠한 key와 value가 들어오는지 모든 것을 파악하기 어려운 상황을 겪었습니다. 이런 경우 JSON Pipeline을 사용하면 유용하게 데이터를 처리할 수 있을 것 같아 실습 내용을 정리하였습니다.

JSON Pipeline 생성

### 파이프라인 생성
PUT http://localhost:9200/_ingest/pipeline/index-json
Content-Type: application/json

{
  "description" : "ingest pipeline for json",
  "processors" : [
    {
      "json" : {
        "field" : "manifest",
        "target_field": "manifest_json"
      }
    }
  ]
}

파이프라인 생성 파라미터

  • json : 파이프라인 프로세서 중 JSON 프로세서를 사용합니다.
  • json.field : 문자열 형식으로 들어오는 json 필드를 명시합니다.
  • json.target_field : 문자열 필드 형식으로 들어온 데이터를 어떤 필드에 저장할지 지정합니다.

JSON Pipeline 테스트

파이프라인은 simulate라는 기능을 이용하여 내 생각대로 결과물이 나오는지 확인할 수 있습니다.

### 파이프라인 시뮬레이터
POST http://localhost:9200/_ingest/pipeline/_simulate
Content-Type: application/json

{
  "pipeline": {
    "description" : "ingest pipeline for json",
    "processors" : [
      {
        "json" : {
          "field" : "manifest",
          "target_field": "manifest_json"
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "cluster": "my-cluster",
        "manifest": "{\"cluster\":\"my-cluster\",\"namespace\":\"my-namespace\",\"name\":\"my-name\",\"version\":\"1.0.0\"}"
      }
    }
  ]
}

실제 데이터로 JSON Pipeline 적용되었는지 확인

먼저 테스트를 하기 위한 인덱스부터 정의하도록 하겠습니다. JSON Pipeline으로 생성되는 모든 필드들을 지정하기는 매우 번거롭기 때문에 dynamic_template을 통하여 JSON Pipeline이 생성하는 필드를 지정하였고 데이터를 넣을 때는 pipeline=index-json을 통해서 파이프라인을 지정해 줍니다.

### 인덱스 생성
PUT http://localhost:9200/my-manifests
Content-Type: application/json

{
  "mappings": {
    "dynamic_templates": [
      {
        "default_strings": {
          "match_mapping_type": "string",
          "mapping": {
            "type": "keyword"
          }
        }
      }
    ],
    "properties": {
      "cluster": {
        "type": "keyword"
      },
      "manifest": {
        "type": "text"
      }
    }
  }
}

### 인덱스 데이터 삽입1
POST http://localhost:9200/my-manifests/_doc/1?pipeline=index-json
Content-Type: application/json

{
  "cluster": "my-cluster",
  "manifest": "{\"cluster\":\"my-cluster\",\"namespace\":\"my-namespace\",\"name\":\"my-name\",\"version\":\"1.0.0\"}"
}

### 인덱스 데이터 삽입2
POST http://localhost:9200/my-manifests/_doc/2?pipeline=index-json
Content-Type: application/json

{
  "cluster": "my-cluster",
  "manifest": "{\"cluster\":\"my-cluster\",\"namespace\":\"my-namespace\",\"name\":\"test\",\"version\":\"2.0.0\"}"
}

결과 확인

manifest_json 필드를 통하여 json의 데이터가 dynamic_template으로 지정한 타입으로 필드가 생성된 것을 확인할 수 있습니다.  확실한 차이점을 보기 위해 위에서 정의한 "만약 JSON Pipeline을 사용하지 않았다면"의 결과와 비교해 보세요.

### 데이터 검색
GET http://localhost:9200/my-manifests/_search

### 결과 확인
{
  "took": 6,
  "timed_out": false,
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 2,
      "relation": "eq"
    },
    "max_score": 1.0,
    "hits": [
      {
        "_index": "my-manifests",
        "_id": "1",
        "_score": 1.0,
        "_source": {
          "cluster": "my-cluster",
          "manifest": "{\"cluster\":\"my-cluster\",\"namespace\":\"my-namespace\",\"name\":\"my-name\",\"version\":\"1.0.0\"}",
          "manifest_json": {
            "cluster": "my-cluster",
            "namespace": "my-namespace",
            "name": "my-name",
            "version": "1.0.0"
          }
        }
      },
      {
        "_index": "my-manifests",
        "_id": "2",
        "_score": 1.0,
        "_source": {
          "cluster": "my-cluster",
          "manifest": "{\"cluster\":\"my-cluster\",\"namespace\":\"my-namespace\",\"name\":\"test\",\"version\":\"2.0.0\"}",
          "manifest_json": {
            "cluster": "my-cluster",
            "namespace": "my-namespace",
            "name": "test",
            "version": "2.0.0"
          }
        }
      }
    ]
  }
}

시작하기 앞서 Ingest Pipeline이란

Elasticsearch에서 Ingest Pipeline은 데이터를 색인하기 전에 전처리 및 변환 작업을 수행하는 개념입니다. Ingest Pipeline은 데이터를 가져와서 필요한 형식으로 변환하거나 데이터를 필터링하고 파싱 하는 등의 작업 및 데이터의 일관성을 유지하는데 매우 유용하여 효율적인 데이터 처리를 가능하게 하는 중요한 기능입니다.

그림에 보이는 것과 같이 데이터가 들어오면 Ingest pipeline을 통하여 순차적으로 프로세서가 실행되고 결과물은 해당 인덱스에 저장됩니다. Ingest Pipeline이 동작할 수 있도록 해당 노드에는 ingest role이 등록되어야 하지만 일반적으로 데이터 노드에 많이 사용합니다. 허나 프로세서가 많아 부하를 많이 받을 경우 노드를 별도로 분리하는 방법을 고려해 볼 필요성이 있습니다.

이번 문서에서는 여러 유용한 파이프라인 프로세서 중 Split에 대해서 실습해봤습니다.

Split processor란

Elasticsearch Ingest Pipeline에서 Split Processor는 텍스트 필드를 분할하거나 분리하는 데 사용되는 프로세서입니다. Split Processor를 사용하면 텍스트 필드에서 구분자를 기반으로 하여 여러 하위 텍스트 필드로 분리하거나 특정 패턴을 기반으로 필드를 분할할 수 있습니다. 이는 로그 데이터 또는 다양한 유형의 텍스트 데이터를 구조화하고 필요한 정보를 추출하는 데 유용하게 사용될 수 있습니다.

파이프라인 Split processor 생성하기

### 파이프라인 생성
PUT http://localhost:9200/_ingest/pipeline/user-split
Content-Type: application/json

{
  "description" : "Split user field",
  "processors" : [
    {
      "split" : {
        "field" : "users",
        "separator": ","
      }
    }
  ]
}

user-split 파이프라인 파라미터

  • split : 파이프라인에는 여러 프로세서를 등록할 수 있는데 이번 실습에서는 split 프로세서만 선언하였습니다.
  • split.field : 데이터를 분할할 필드명을 정의합니다.
  • split.separator: 데이터를 분할하는 기준이 되는 문자 캐릭터를 정의합니다.

파이프라인 정상동작 테스트

Elasticsearch에서는 simulate라는 기능을 제공하여 해당 파이프라인의 동작을 엔지니어의 생각대로 동작하는지 테스트를 해볼 수 있습니다.

### 파이프라인 시뮬레이터
POST http://localhost:9200/_ingest/pipeline/_simulate
Content-Type: application/json

{
  "pipeline": {
    "description" : "Split user field",
    "processors" : [
      {
        "split" : {
          "field" : "users",
          "separator": ","
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "users": "user1,user2,user3"
      }
    }
  ]
}

실제 데이터 인덱싱을 통한 결과 확인

파이프라인 테스트가 생각대로 정상적이었다면 이제 실제 데이터를 인덱싱하여 파이프라인이 정상적으로 동작하는지 실습해 보겠습니다.

### 데이터 삽입1
POST http://localhost:30001/user-connected-log/_doc/1?pipeline=user-split
Content-Type: application/json

{
  "level": "info",
  "users": "user1,user2,user3",
  "message": "user connected"
}

### 데이터 삽입2
POST http://localhost:30001/user-connected-log/_doc/2?pipeline=user-split
Content-Type: application/json

{
  "level": "info",
  "users": "user3,user4,user5",
  "message": "user connected"
}

결과 확인

doc_id 1에서는 user1, user2, user3의 데이터가 인덱싱 되었고 doc_id 2에서는 user3, user4, user5의 데이터가 인덱싱 되었습니다. 여기서 users에 user1로만 검색하면 doc_id 1의 데이터만 결과로 추력 되는 것을 확인할 수 있습니다.

### 인덱스 검색
GET http://localhost:30001/user-connected-log/_search
Content-Type: application/json

{
  "query": {
    "bool": {
      "must": [
        {
          "match": {
            "users": "user1"
          }
        }
      ]
    }
  }
}


### 결과
{
  "took": 5,
  "timed_out": false,
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 1,
      "relation": "eq"
    },
    "max_score": 0.6931471,
    "hits": [
      {
        "_index": "user-connected-log",
        "_id": "1",
        "_score": 0.6931471,
        "_source": {
          "message": "user connected",
          "level": "info",
          "users": [
            "user1",
            "user2",
            "user3"
          ]
        }
      }
    ]
  }
}

 

'Elasticsearch > pipeline' 카테고리의 다른 글

Elasticsearch Ingest pipeline json  (0) 2023.10.20
Elasticsearch Ingest pipeline enrich processor  (0) 2023.10.18

시작하기 앞서 Ingest Pipeline이란

Elasticsearch에서 Ingest Pipeline은 데이터를 색인하기 전에 전처리 및 변환 작업을 수행하는 개념입니다. Ingest Pipeline은 데이터를 가져와서 필요한 형식으로 변환하거나 데이터를 필터링하고 파싱 하는 등의 작업 및 데이터의 일관성을 유지하는데 매우 유용하여 효율적인 데이터 처리를 가능하게 하는 중요한 기능입니다.

그림에 보이는 것과 같이 데이터가 들어오면 Ingest pipeline을 통하여 순차적으로 프로세서가 실행되고 결과물은 해당 인덱스에 저장됩니다. Ingest Pipeline이 동작할 수 있도록 해당 노드에는 ingest role이 등록되어야 하지만 일반적으로 데이터 노드에 많이 사용합니다. 허나 프로세서가 많아 부하를 많이 받을 경우 노드를 별도로 분리하는 방법을 고려해 볼 필요성이 있습니다.

이번 문서에서는 여러 유용한 파이프라인 프로세서 중 Enrichment에 대해서 실습해봤습니다.

Enrich Processor

RDB에서 간단한 기능인 JOIN이라는 기능을 Elasticsearch에서 역색인 구조를 가지고 있는 Elasticsearch에서는 구현하는데는 아주 많은 리소스와 페널티가 있습니다. 그렇기 때문에 JOIN을 수행하는 행위는 Elasticsearch에서 권장하지 않고 있습니다.

Elasticsearch의 Enrich Processor는 Ingest pipeline의 기능중 하나로 외부 데이터 소스를 활용하여 기존 인덱스 데이터를 이용하여 추가 정보를 추출하거나 결합할 수 있는 기능을 제공하여 RDB에 JOIN과 비슷한 기능을 가능하게 되었습니다.

이러한 Enrich processor를 이용하여 데이터를 추가하는 방법을 실습해봤습니다.

작업 내용

미리 저장된 my-service 인덱스의 데이터를 enrich.field를 기반으로 추출하여 my-log 인덱스가 인덱싱 될 때 같이 데이터가 기록되도록 구현하였습니다.

소스 인덱스 생성하기

먼저 Enrich policy를 생성하기 전에 데이터를 추가하기 위한 소스 인덱스를 생성하고 데이터를 미리 삽입하는 작업을 수행하겠습니다.

### 서비스 인덱스 생성
PUT http://localhost:9200/my-service
Content-Type: application/json

{
  "settings": {
    "index": {
      "number_of_shards": 4,
      "number_of_replicas": 1,
      "refresh_interval": "5s"
    }
  },
  "mappings": {
    "properties": {
      "service": { "type": "keyword" },
      "endpoint": { "type": "keyword" },
      "description": { "type": "text" }
    }
  }
}

### 서비스 데이터 삽입1
PUT http://localhost:9200/my-service/_doc/1?refresh=wait_for
Content-Type: application/json

{
  "service": "payment",
  "endpoint": "localhot:32000",
  "description": "This service is for selling products"
}

### 서비스 데이터 삽입2
PUT http://localhost:9200/my-service/_doc/2?refresh=wait_for
Content-Type: application/json

{
  "service": "login",
  "endpoint": "localhot:31000",
  "description": "This service is for users to login"
}

Enrich processor 생성 및 실행

### 엔리치 생성
PUT http://localhost:9200/_enrich/policy/my-enrich
Content-Type: application/json

{
  "match": {
    "indices": "my-service",
    "match_field": "service",
    "enrich_fields": ["endpoint", "description"]
  }
}

### 엔리치 실행
POST http://localhost:9200/_enrich/policy/my-enrich/_execute

enrich policy 생성 파라미터

  • match.indices : 데이터를 추출할 소스 인덱스를 지정합니다.
  • match.match_field : 소스 인덱스에 서로 매칭할 필드를 지정합니다.
  • match.enrich_fields : 매칭 후 추출될 데이터 필드를 지정합니다.

파이프라인 생성

파이프라인을 생성할 때는 processor를 리스트 형식으로 생성하게 되는데 리스트의 순서대로 processor가 실행됩니다. 

### 파이프라인 생성
PUT http://localhost:9200/_ingest/pipeline/service_join
Content-Type: application/json

{
  "description" : "Service information using enrich",
  "processors" : [
    {
      "enrich" : {
        "policy_name": "my-enrich",
        "field" : "service",
        "target_field": "service_info",
        "max_matches": "1"
      }
    }
  ]
}

파이프라인 생성 파라미터

  • enrich.policy_name : 위에서 생성한 enrich policy
  • enrich.field : my-log인덱스에서 매칭할 필드를 지정합니다.
  • enrich.target_field : enrich policy에서 추출된 (endpoint, description) 데이터를 넣을 필드를 설정합니다.
  • enrich.max_matches : 최대로 매칭할 수 있는 개수를 설정합니다. 간단한 테스트이므로 1로 설정하였습니다.

my-log 인덱스 생성 및 데이터 삽입

데이터가 추가될 때 pipeline을 명시하여 위에서 설정한 Ingest pipeline이 동작할 수 있도록 합니다.

PUT http://localhost:9200/my-log
Content-Type: application/json

{
  "settings": {
    "index": {
      "number_of_shards": 4,
      "number_of_replicas": 1,
      "refresh_interval": "5s"
    }
  },
  "mappings": {
    "properties": {
      "service": { "type": "keyword" },
      "level": { "type": "keyword" },
      "message": {"type": "text" }
    }
  }
}

### 로그 데이터 삽입1
PUT http://localhost:9200/my-log/_doc/1?pipeline=service_join
Content-Type: application/json

{
  "service": "login",
  "level": "debug",
  "message": "user A logged in successfully"
}

### 로그 데이터 삽입2
PUT http://localhost:9200/my-log/_doc/2?pipeline=service_join
Content-Type: application/json

{
"service": "login",
"level": "error",
"message": "user B login failed"
}

### 로그 데이터 삽입3
PUT http://localhost:9200/my-log/_doc/3?pipeline=service_join
Content-Type: application/json

{
  "service": "payment",
  "level": "debug",
  "message": "user a successfully paid for the product"
}

실제로 우리는 my-log 인덱스에 데이터를 넣을 때 service, level, message 데이터만 인덱싱하였습니다. 이제 결과물을 확인하면 enrich를 통하여 새롭게 정의된 service_info 필드를 통해 service의 데이터가 추가된 것을 확인할 수 있습니다.

GET http://localhost:9200/my-log/_search

### 결과
{
  "took": 8,
  "timed_out": false,
  "_shards": {
    "total": 4,
    "successful": 4,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 3,
      "relation": "eq"
    },
    "max_score": 1.0,
    "hits": [
      {
        "_index": "my-log",
        "_id": "1",
        "_score": 1.0,
        "_source": {
          "service_info": {
            "description": "This service is for users to login",
            "endpoint": "localhot:30000",
            "service": "login"
          },
          "message": "user A logged in successfully",
          "level": "debug",
          "service": "login"
        }
      },
      {
        "_index": "my-log",
        "_id": "2",
        "_score": 1.0,
        "_source": {
          "service_info": {
            "description": "This service is for users to login",
            "endpoint": "localhot:30000",
            "service": "login"
          },
          "message": "user B login failed",
          "level": "error",
          "service": "login"
        }
      },
      {
        "_index": "my-log",
        "_id": "3",
        "_score": 1.0,
        "_source": {
          "service_info": {
            "description": "This service is for selling products",
            "endpoint": "localhot:30001",
            "service": "payment"
          },
          "message": "user a successfully paid for the product",
          "level": "debug",
          "service": "payment"
        }
      }
    ]
  }
}

 

참조

https://www.elastic.co/guide/en/elasticsearch/reference/current/ingest.html

 

Ingest pipelines | Elasticsearch Guide [8.10] | Elastic

Ingest pipelines let you perform common transformations on your data before indexing. For example, you can use pipelines to remove fields, extract values from text, and enrich your data. A pipeline consists of a series of configurable tasks called processo

www.elastic.co

https://www.elastic.co/guide/en/elasticsearch/reference/current/enrich-setup.html

 

Set up an enrich processor | Elasticsearch Guide [8.10] | Elastic

The enrich processor performs several operations and may impact the speed of your ingest pipeline. We strongly recommend testing and benchmarking your enrich processors before deploying them in production. We do not recommend using the enrich processor to

www.elastic.co

https://www.elastic.co/guide/en/elasticsearch/reference/current/put-enrich-policy-api.html

 

Create enrich policy API | Elasticsearch Guide [8.10] | Elastic

Once created, you can’t update or change an enrich policy. Instead, you can: Create and execute a new enrich policy. Replace the previous enrich policy with the new enrich policy in any in-use enrich processors. Use the delete enrich policy API to delete

www.elastic.co

 

'Elasticsearch > pipeline' 카테고리의 다른 글

Elasticsearch Ingest pipeline json  (0) 2023.10.20
Elasticsearch Ingest pipeline split processor  (0) 2023.10.19

ILM policy로 data phases 설정

data phases를 구성한다는 것은 성능을 올린다기보다는 자원을 효율적으로 사용하는 방법입니다. Elasticsearch 인덱스의 document를 수정하거나 삭제하는 것은 역색인 방식에서는 많은 자원을 소모하게 되기 때문에 데이터를 document 단위로 관리를 하는 것보다는 주로 인덱스 단위로 삭제하여 효율적으로 스토리지 사용량을 유지할 수 있습니다. 그래서 Elasticsearch에서는 ILM(Index Lifecycle Management)를 이용하여 인덱스의 생명주기를 자동으로 관리해 주는 기능을 제공하고 있습니다. 이번 문서에서는 ILM을 이용하여 자원을 효율적으로 관리하는 방법에 대하여 기록하였습니다.

 

Data phases 수행을 위한 노드 설정

많은 사용자가 로그를 분석하기 위해서 Elasticsearch를 사용합니다. 로그의 데이터를 효율적으로 관리하기 위해 많은 사용자들이 로그 인덱스를 날짜 별로 생성하여 사용하고 당일 로그를 분석하기 위해 해당 인덱스에서 많은 조회를 수행합니다. 그리고 다음날이 되면 어제 생성되었던 인덱스에서는 데이터가 들어가지 않고 오늘 날짜로 된 로그 인덱스가 새롭게 생성되어 오늘 날짜의 인덱스에 데이터가 들어가게 됩니다. 이러한 특징을 이용하여 Data phases 설정을 통하여 각각의 노드에 역할을 부여하고 해당 역할에 어울리는 자원을 할당하여 자원을 효율적으로 사용할 수 있습니다. 이번 실습을 위해 저는 Master 3대, Hot 2대, Warm 2대, Cold 2대를 구성하여 테스트를 진행하였습니다.

 

Hot node

hot 노드에는 데이터가 현재도 인덱싱 되며 자주 사용되는 데이터를 보관합니다. 데이터를 읽고 쓰는 작업이 빈번하게 이루어지기 때문에 SSD 스토리지를 사용하는 것을 권장합니다.

- roles : data, data_content, data_hot

 

Warm node

데이터가 거의 인덱싱 되지 않으며, 데이터를 읽는데 주로 사용하는 노드입니다.

- roles : data_warm

 

Cold node

데이터도 인덱싱 되지 않으며, 데이터를 읽는 쿼리 또한 거의 사용하지 않는 노드입니다. 주로 Searchable snapshot을 cold node를 통하여 사용하지만 Searchable snapshot은 Enterpise license에서 제공하기 때문에 이번 실습에서는 Searchable snapshot을 사용하지 못하였습니다.

- roles : data_cold

 

ILM Policy 생성

ILM Policy 생성

PUT http://localhost:9200/_ilm/policy/my_log_ilm
Content-Type: application/json

{
  "policy": {
    "phases": {
      "hot": {
        "min_age": "0ms",
        "actions": {
          "rollover": {
            "max_age": "1d",
            "max_size": "10gb",
            "max_docs": 10
          },
          "set_priority": {
            "priority": 100
          }
        }
      },
      "warm": {
        "min_age": "2m",
        "actions": {
          "forcemerge": {
            "max_num_segments": 1
          },
          "shrink": {
            "number_of_shards": 2
          },
          "allocate": {
            "number_of_replicas": 1
          },
          "set_priority": {
            "priority": 50
          }
        }
      },
      "cold": {
        "min_age": "4m",
        "actions": {
          "allocate": {
            "number_of_replicas": 0
          },
          "set_priority": {
            "priority": 10
          },
          "freeze": {}
        }
      },
      "delete": {
        "min_age": "6m",
        "actions": {
          "delete": {}
        }
      }
    }
  }
}

 

Hot tier 설정 파라미터 설명

  • min_age: "0ms" 인덱스가 생성되자마자 hot tier로 지정됩니다.
  • actions.rollover.max_age: rollover가 발생되는 조건으로 인덱스가 생성되고 하루(1d)가 지나면 인덱스가 rollover 됩니다.
  • actions.rollover.max_size: rollover가 발생되는 조건으로 인덱스의 크기가 10gb를 넘어가면 인덱스가 rollover 됩니다.
  • actions.rollover.max_docs: rollover가 발생되는 조건으로 인덱스의 document의 개수가 10개 넘어가면 rollover 됩니다. 이 설정은 테스트를 위해 아주 작은 값을 넣었습니다. 실제로는 입력하지 않다도 됩니다.
  • actions.set_priority.priority: 노드의 문제가 생길 경우 hot 데이터를 복구하는 것은 우선순위를 가장 높게 설정합니다.

Warm tier 설정 파라미터 설명

  • min_age: 인덱스가 rollover된지 2분이 지나면 해당 인덱스는 Warm tier로 설정됩니다.
  • actions.forcemerge.max_num_segments: 이 설정은 hot tier에서 여러 루씬 세그먼트를 사용한 것을 하나로 합치는 것을 의미합니다. 더 이상 데이터가 해당 인덱스에 써지지 않을 경우에 이 설정을 넣는 것을 권장합니다.
  • actions.shrink.number_of_shards: 여러 샤드로 나누어진 것을 2개로 합칩니다. 샤드의 수가 줄어 마스터 노드의 부하를 줄여줍니다.
  • actions.allocate.number_of_replicas: 복제 샤드를 1개로 설정합니다.
  • set_priority.priority: 인덱스 복구 우선순위를 hot tier보다 낮게 설정합니다.

Cold tier 설정 파라미터 설명

  • min_age: 인덱스가 rollover된지 4분이 지나면 해당 인덱스는 Cold tier로 설정합니다.
  • actions.allocate.number_of_replicas: 인덱스의 복제 샤드를 제거하여 디스크 사용량을 줄여줍니다.
  • set_priority.priority: 인덱스 복구 우선순위를 Warm tier보다 낮게 설정합니다.
  • freeze: 해당 인덱스를 읽기 전용으로 설정합니다.

Delete 설정 파라미터 설명

  • min_age: 인덱스가 rollover된지 6분이 지나면 해당 인덱스를 삭제합니다.

ILM Policy 설정을 통하여 데이터의 Tier가 변경되기 위해서는 먼저 Rollover가 발생하여야 합니다. 이번 테스트를 위해 doc의 개수가 10개 이상이면 rollover가 발생하도록 설정을 하였고 warm은 2분 뒤 cold는 4분 뒤 delete는 6분 뒤로 아주 짧게 시간을 설정하였습니다.

 

Index Tempalte 설정

ILM Policy 설정을 적용한 Index template을 구성합니다. 인덱스 template 설정에 대한 설명은 다음 링크를 통해 확인해 주세요

https://stdhsw.tistory.com/entry/Elasticsearch%EC%9D%98-Index-template-%EC%84%A4%EC%A0%95

PUT http://localhost:9200/_template/my_log_template
Content-Type: application/json

{
  "index_patterns": ["my_log-*"],
  "settings": {
    "index": {
      "number_of_shards": "4",
      "number_of_replicas": "1",
      "refresh_interval": "5s",
      "lifecycle": {
        "name": "my_log_ilm",
        "rollover_alias": "my_log"
      }
    }
  },
  "mappings": {
    "properties": {
      "app": {
        "type": "keyword"
      },
      "level": {
        "type": "keyword"
      },
      "message": {
        "type": "text"
      }
    }
  }
}

 

빠른 테스트를 위한 클러스터 설정(무시해도 됩니다.)

테스트를 진행하는데 너무 오랜 시간을 투자할 수 없기 때문에 Elasticsearch에서 Index를 롤오버를 검사하고 수행하는 주기를 10초로 줄여줍니다. 만약 이 설정을 추가하지 않을 경우 기본값으로 10분의 시간이 소요됩니다. 테스트를 위해 설정을 해주었지만 상용서버에서는 설정을 변경하지 않는 것을 권장드립니다.

PUT http://localhost:9200/_cluster/settings
Content-Type: application/json

{
  "persistent": {
    "indices.lifecycle.poll_interval": "10s"
  }
}

 

인덱스 생성하기

인덱스를 수동적으로 먼저 생성해 주는 이유는 is_write_index 옵션을 사용하기 위해서입니다.

PUT http://localhost:9200/my_log-000001
Content-Type: application/json

{
  "aliases": {
    "my_log": {
      "is_write_index": true
    }
  }
}

 

인덱스에 데이터 넣기

테스트를 위한 데이터를 벌크를 통하여 넣어줍니다. 한번에 rollover가 발생할 수 있도록 10개의 document를 넣습니다.

curl -XPOST http://localhost:9200/my_log/_bulk -H 'Content-Type: application/json' -d '
{ "index": {} }
{ "app": "my-app", "level": "debug", "message": "successes hello world" }
{ "index": {} }
{ "app": "my-app", "level": "info", "message": "hello world" }
{ "index": {} }
{ "app": "my-app", "level": "error", "message": "failed hello world" }
{ "index": {} }
{ "app": "my-app", "level": "warn", "message": "warning hello world" }
{ "index": {} }
{ "app": "my-app", "level": "debug", "message": "successes hello world" }
{ "index": {} }
{ "app": "my-app", "level": "info", "message": "hello world" }
{ "index": {} }
{ "app": "my-app", "level": "error", "message": "failed hello world" }
{ "index": {} }
{ "app": "my-app", "level": "warn", "message": "warning hello world" }
{ "index": {} }
{ "app": "my-app", "level": "error", "message": "failed hello world" }
{ "index": {} }
{ "app": "my-app", "level": "warn", "message": "warning hello world" }

'

인덱스 생성 결과

인덱스 생성된 결과를 보면 my_log-000001은 hot node에만 생성된 것을 확인할 수 있습니다.

 

Hot node에서 Warm node로 인덱스 이동

인덱스가 생성되고 롤오버 된 지 2분이 지나면 해당 데이터는 warm노드로 이동합니다. 이때 ILM Policy에서 값을 설정한 것처럼 인덱스의 샤드가 2개로 합쳐지면서 해당 인덱스의 이름 앞에 shrink이라는 단어가 추가되었습니다. 인덱스의 명칭이 변경되었지만 원래의 인덱스 이름으로 alias가 생성되었기 때문에 원래 인덱스 이름 그대로 검색을 하여도 전혀 문제가 발생하지 않습니다.

 

Warm node에서 Cold node로 인덱스 이동

인덱스가 롤오버된지 4분이 지나면 해당 인덱스는 Cold 노드로 이동하게 됩니다. 이것 또한 ILM Policy에 값을 설정한 것처럼 복제 샤드를 제거하였고 해당 인덱스는 읽기 모드로 전환된 상태입니다. 여전히 원래 인덱스 이름으로 alias를 가지고 있기 때문에 원래 인덱스명으로 검색이 가능합니다. 주로 Cold 데이터는 Searchable snapshot을 이용하지만 위에서 언급한 것처럼 Searchable snapshot은 Enterpise license에서만 제공하기 때문에 이번 실습에서는 그냥 warm노드처럼 사용하였습니다.

 

Cold node에서 인덱스 삭제까지

인덱스가 롤오버 된지 6분이 지나면 해당 인덱스는 삭제됩니다. 아래 이미지를 보면 my_log-000001 인덱스가 삭제된 것을 확인할 수 있습니다.

 

마지막으로

이번에 빠르게 실습을 하기 위해 data tier의 변경을 아주 짧게 2분 단위로 변경될 수 있도록 하여 테스트를 진행하였습니다. 그러나 실제로는 이렇게 사용하지 않고 일 단위로 tier가 변경되도록 설정을 합니다. 예시로 다음과 같이 설정할 경우 시간 흐름도에 따라 data tier가 변경되는 모습은 다음과 같습니다.

  • hot의 min_age: "0ms"
  • warm의 min_age: "1d"
  • cold의 min_age: "3d"
  • delete의 min_age: "7d"

 

참조

https://www.elastic.co/guide/en/elasticsearch/reference/current/data-tiers.html

 

Data tiers | Elasticsearch Guide [8.10] | Elastic

Elasticsearch generally expects nodes within a data tier to share the same hardware profile. Variations not following this recommendation should be carefully architected to avoid hot spotting.

www.elastic.co

https://www.elastic.co/guide/en/elasticsearch/reference/current/set-up-a-data-stream.html

 

Set up a data stream | Elasticsearch Guide [8.10] | Elastic

If you use Fleet or Elastic Agent, skip this tutorial. Fleet and Elastic Agent set up data streams for you. See Fleet’s data streams documentation.

www.elastic.co

https://www.elastic.co/guide/en/elasticsearch/reference/current/ilm-actions.html

 

Index lifecycle actions | Elasticsearch Guide [8.10] | Elastic

Index lifecycle actionsedit Allocate Move shards to nodes with different performance characteristics and reduce the number of replicas. Delete Permanently remove the index. Force merge Reduce the number of index segments and purge deleted documents. Migrat

www.elastic.co

 

 

JVM 설정

elasticsearch JVM이라고 검색을 해보면 많은 글들이 JVM은 전체 메모리에 50%로 설정하라고 설명하는 곳이 많습니다. 그렇지만 단순히 50%라고만 하기에는 사용자의 환경이 다르기 때문에 설명이 부족합니다. elasticsearch 공식 문서에 따르면 전체 메모리의 50% 이하로 설정하는 것을 권장하고 있으며, 환경에 따라 다르지만 대중적으로 26GB 이하로 설정해야 된다고 설명을 하고 있지만 사용자의 환경에 따라 30GB까지도 설정을 할 수 있다고 합니다. 그 이유는 Zero base comporessed OOP 때문인데요 Compressed OOP는 아래에서 다루도록 하겠습니다. 그리고 JVM 설정에 유의해야 되는 것은 전체 메모리의 50% 이하를 JVM에 줬다면 나머지 메모리는 시스템과 Lucene이 사용하게 됩니다. 그렇기 때문에 통계 및 분석이 많은 쿼리를 자주 사용한다면 JVM에 메모리를 높게 주고(50%에 가깝게) 검색만 주로 하는 쿼리를 자주 사용한다면 JVM메모리를 보다 적게 주어 Lucene이 더 많은 메모리를 사용할 수 있게 하는 것이 좋습니다.

 

Compressed OOP

포인터의 공간 낭비를 줄이고 좀 더 빠른 연산을 위해 포인터를 압축해서 표현하는 메모리 최적화 기법 중 하나입니다. 이 기법의 원리는 포인터가 메모리 주소를 가리키는게 아닌 Object Offset을 가리키도록 변형해서 동작시키는 것입니다. 만약 8비트 포인터를 사용한다면 256바이트의 물리적인 주소를 표현하는게 아니라 256개의 객체를 가리킬 수 있게 되는 것입니다. 만약 JVM의 크기가 32GB를 넘어가게 된다면 포인터 구조가 Compressed OOP가 아닌 일반 OOP 방식으로 전환되어 Compressed OOP의 이점을 살릴 수가 없습니다. 그러므로 메모리의 여유가 있다면 차라리 노드를 더 늘려 주는 것이 더 현명한 선택이 될 것 같습니다.

 

Compressed OOP 메모리 주소

  • 32비트 OOP : 2의 32승 까지의 주소를 가리킬 수 있다. 4GB
  • 64비트 OOP : 2의 64승 까지의 주소를 가리킬 수 있다. 18EB
  • 32비트 Compressed OOP : 2의 32승 오브젝트를 가리킬 수 있습니다.
  • 메모리 공간으로는 2의 32승 곱하기 8만큼 표현이 가능합니다.

 

Zero base Compressed OOP 확인

Zero base Compressed OOP는 JVM의 힙 메모리가 0번지부터 시작하게 하는 설정이다. JVM의 크기가 32GB로 설정하여 Zero base Compressed OOP가 깨진 것과 30GB 이하로 설정하여 Zero base Compressed OOP가 활성화된 것의 차이를 확인해 보겠습니다.

 

JVM 32GB

$ java -Xmx32768m -XX:+PrintFlagsFinal -XX:+UnlockDiagnosticVMOptions -XX:+PrintComressedOopsMode 2>/dev/null | grep Compressed | grep Oops

bool PrintCompressedOopsMode = true
bool UseCompressedOops = false

 

JVM 30GB

$ java -Xmx30720m -XX:+PrintFlagsFinal -XX:+UnlockDiagnosticVMOptions -XX:+PrintComressedOopsMode 2>/dev/null | grep Compressed | grep Oops

heep address : 0x0000000...
bool PrintCompressedOopsMode = true
bool UseCompressedOops = true

 

JVM 여러가지 옵션

Elasticsearch 공식 문서에 따르면 Xms, Xmx의 값은 동일해야 하며 전체 메모리의 50% 이하로 설정하며 root jvm.options 파일을 수정하지 않는 것을 권장합니다. 저는 Xms, Xmx만 수정하고 나머지는 그냥 이런 것들이 있다는 것만 알고 넘어갔습니다.

  • -Xms : 기본 heep 사이즈 (기본값 : 64MB)
  • -Xmx : 최대 heep 사이즈 (기본값 : 256MB)
  • -XX:PermSize : 기본 Perm 사이즈
  • -XX:MaxPermSize : 최대 Perm 사이즈
  • -XX:NewSize : 최소 New 영역 사이즈
  • -XX:MaxNewSize : 최대 New 영역 사이즈
  • -XX:SurvivorRatio : New 영역 / Survivor 영역 비율 설정
  • -XX:NewRatio : Young Gen, Old Gen의 비율 설정
  • -XX:+DisableExplicitGC : System.gc() 무시 설정
  • -XX:+CMSPermGenSweepingEnabled : Perm Gen 영역도 GC의 대상이 되도록 설정
  • -XX:+CMSClassUnloadingEnabled : Class 데이터도 GC의 대상이 되도록 설정

요약

  • JVM 설정은 전체 메모리의 50%이하로 설정
  • 사용자 환경에 따라 다르지만 최대 26GB이하로 설정하는 것을 권장합니다. 물론 환경에 따라 30GB까지 가능합니다.
  • 26GB 또는 30GB가 넘으면 Zero base compressed OOP가 깨짐
  • 통계 및 분석을 많이 사용하면 JVM을 50%에 가깝게 설정

 

참고

https://www.elastic.co/guide/en/elasticsearch/reference/current/advanced-configuration.html#set-jvm-options

 

Advanced configuration | Elasticsearch Guide [8.10] | Elastic

This functionality is in technical preview and may be changed or removed in a future release. Elastic will apply best effort to fix any issues, but features in technical preview are not subject to the support SLA of official GA features.

www.elastic.co


노드 역할 분리 및 노드 개수

소규모 프로젝트 및 개발 환경에서는 리소스를 절약하게 위해 단일 노드로 사용해도 무방합니다. 그러나 프로젝트의 규모가 거대해지고 데이터의 양이 많아지면 그때는 단일 노드로는 불가능한 시점이 오게 되고 Elasticsaerch의 클러스터가 깨지는 위험이 발생할 수 있습니다. 이럴 때 자원을 효율적(효율적이라는 표현은 자원을 적게 쓰는 것이 아닌 역할에 맞게 잘 활용한다는 의미)으로 사용하기 위해서는 노드의 역할을 분배하는 것을 권장하고 있습니다. 노드의 종류는 여러 가지가 있지만 대표적으로 마스터 노드와 데이터 노드를 분리하는 것만으로도 클러스터를 안정적으로 운영할 수 있습니다. 마스터 노드는 클러스터 운영관리 및 노드, 샤드를 관리하는 역할을 수행하고 데이터 노드는 데이터를 저장하고 검색하는 역할을 수행합니다. 마스터와 데이터의 역할을 분리하게 되면 마스터는 온전히 마스터의 역할만 수행하게 되어 마스터 노드에 문제가 발생할 문제가 줄어들게 됩니다. 마스터 노드를 설정할 때는 마스터 선출을 위한 Vorting이 이루어 지게 되므로 홀수개로 설정하는 것을 권장합니다. 마스터 노드를 여러 개로 설정할 경우 실제로 마스터의 역할을 수행하는 것은 1개이며 나머지는 현재 마스터 노드의 문제가 발생할 경우 마스터로 승격되어 마스터 노드의 역할을 수행할 수 있도록 Standby형태로 대기하고 있습니다.

 

Scale In시 주의사항 (in kubernetes)

인덱스의 데이터는 샤드로 분할하여 데이터가 저장됩니다. 이때 Replica Shard를 지정하지 않았다면 데이터 노드의 Scale In을 수행할 때 데이터의 손실이 발생할 수 있습니다. 물론 Replica Shard를 구성하더라도 무턱대로 Scale In을 수행하면 데이터의 손실일 발생할 수 있습니다. 아래 그림은 Scale In 수행시 데이터 손실이 발생하는 경우입니다.

그림에서 볼 수 있듯이 Replica Shard가 존재하더라도 Data Node 2번과 3번이 같이 드랍되면 Shard2번의 데이터는 손실되게 됩니다. 그렇기 때문에 Scale In을 수행할 때는 1개씩 줄여가면서 데이터의 복사가 완전히 이루어지면 다음 노드를 줄이는 것을 권장합니다.

 

Scale Up시 주의사항 (in kubernetes)

저 같은 경우 Elasticsearch를 Kubernetes 환경에서 사용하고 있습니다. 만약 마스터 노드가 1개밖에 없는 상황에서 마스터 노드를 스케일업을 수행하게 되면 클러스터가 깨질 수 있습니다. 그 이유는 해당 마스터 노드가 드랍될 때 마스터의 역할을 수행할 수 있는 노드가 없기 때문입니다. 그렇기 때문에 Kubernetes환경에서 마스터 노드가 1개 일 경우 스케일업을 수행할 때는 반듯이 먼저 스케일 아웃을 하여 해당 마스터 노드가 드랍되더라도 마스터의 역할을 대신할 수 있는 노드가 있기 때문에 문제없이 클러스터를 운영할 수 있습니다.

 

노드는 무조건 많으면 좋을까?

노드의 개수가 많으면 데이터를 분할 저장하고 부하를 나눠 받기 때문에 무조건 좋게 느껴질 수 있습니다. 그러나 각각의 노드들은 서로 통신을 하며 클러스터를 유지하기 때문에 노드가 너무 많게 되면 서로 통신하느라 클러스터의 문제가 발생할 수 있습니다. 그래서 저는 스토리지가 부족하면 스케일아웃을 수행하고 CPU, Memory 리소스가 부족하면 스케일업을 수행하고 있습니다. 무엇이 부족한지 측정하기 위해서는 Elasticsearch Exporter를 설치하여 Prometheus, Grafana를 통하여 측정할 수 있습니다.

vm.max_map_count

Elasticsearch는 파일을 메모리에 매핑하고 해당 Lucene MMapDirectory을 통하여 인덱스 샤드가 저장됩니다. 이러한 동작 과정에서 많은 데이터를 처리하기 위해 어느 정도 크기의 가상 주소 공간이 필요합니다. 그러나 기본적으로 대부분의 운영체제에서는 65,630 만큼의 mmap 개수로 설정되어 있는데 이 수치는 elasticsearch를 운영하는데 매우 작은 양입니다. 그래서 별도로 설정을 하지 않고 Elasticsearch를 실행시키면 bootstrap에 의해 해당 vm.map_map_count의 값이 매우 작다는 Error가 발생하면서 Elasticsearch가 종료합니다. 이러한 문제를 해결하기 위해 vm.max_map_count의 값을 262,144 값으로 수정하여 사용하시면 됩니다.

수정 방법

# 명령어로 변경하면 재부팅 시 유지되지 않는다.
$ sudo sysctl -w vm.max_map_count = 262144

# 파일 수정으로 영구적으로 값 유지 vm.max_map_count 수정
$ vim /etc/sysctl.conf

# max_map_count 값 확인
$ sysctl vm.max_map_count

Kubernetes 환경에서 설치된 Elasticsearch는?

kubernetes 환경에서 helm을 통하여 elasticsearch를 설치하셨다면 해당 설정을 굳이 하실 필요가 없는데요 그 이유는 Elasticsearch StatefulSet의 initcontainer의 "configure-sysctl"이라는 컨테이너가 Elasticsearch가 구동되기 전에 해당 설정을 적용하고 Elasticsearch가 동작하기 때문입니다.

 

elasticsearch/template/statefulset.yaml

      - name: configure-sysctl
        securityContext:
          runAsUser: 0
          privileged: true
        image: "{{ .Values.image }}:{{ .Values.imageTag }}"
        imagePullPolicy: "{{ .Values.imagePullPolicy }}"
        command: ["sysctl", "-w", "vm.max_map_count={{ .Values.sysctlVmMaxMapCount}}"]

참고

https://www.elastic.co/guide/en/elasticsearch/reference/current/index-modules-store.html

 

Store | Elasticsearch Guide [8.10] | Elastic

This is a low-level setting. Some store implementations have poor concurrency or disable optimizations for heap memory usage. We recommend sticking to the defaults.

www.elastic.co

https://www.elastic.co/guide/en/elasticsearch/reference/current/vm-max-map-count.html

 

Virtual memory | Elasticsearch Guide [8.10] | Elastic

Elasticsearch uses a mmapfs directory by default to store its indices. The default operating system limits on mmap counts is likely to be too low, which may result in out of memory exceptions. On Linux, you can increase the limits by running the following

www.elastic.co


메모리 스와핑 비활성화

대부분의 운영체제에서는 메모리 관리를 효율적으로 하기 위해서 메모리 스와핑이라는 기술을 사용합니다. 메모리 스와핑은 메모리가 부족하거나 현재 사용되지 않거나 우선순위가 낮은 프로세스의 데이터를 메모리 스와핑을 통하여 디스크에 이동시켜 메모리를 효율적으로 사용하는 방법인데 Elasticsearch에서는 이러한 메모리 스와핑이 치명적인 성능 저하의 원인이 될 수 있습니다.

Elasticsearch는 메모리를 많이 사용하는 특성을 가지고 있는데 메모리 스와핑 작업에 의해 가비지 컬렉션이 비정상적으로 느려지며 노드가 데이터를 디스크로 쓰이고 읽어지는 과정에서 데이터를 처리하는데 응답속도가 매우 느려지고 이러한 문제로 인하여 클러스터와의 연결이 끊어지는 문제가 발생하게 됩니다. 그래서 Elasticsearch 공식 문서에서는 메모리 스와핑을 비활성화하는 것을 권장하고 있습니다.

메모리 스왑핑 비활성

# 명령어로 일시적으로 스와핑 비활성화
$ sudo swapoff -a

# 파일 수정으로 영구적으로 비활성화
$ vi /etc/fstab


# 스왑 상태 확인 명령어
$ free
# 스왑 디렉토리 확인
$ cat /proc/swaps
# 스왑 통계 확인
$ sar -B 2 5

메모리 스와핑을 비활성화하기 때문에 Elasticsearch가 동작하는 노드에 다른 애플리케이션이 같이 동작하는 것은 권장하지 않습니다.

메모리 스와핑 비활성을 할 수 없는 상황이라면?

환경 특성상 메모리 스와핑을 비활성화할 수 없는 상황이라면 최대한 스와핑 주기를 낮춰 발생 빈도를 줄이는 방법이 있습니다.

# 명령어로 설정
$ sudo sysctl vm.swappiness=1

# 설정 확인
$ cat /proc/sys/vm/swappiness

memory_lock 설정

만약에 현제 root 권한이 없어 메모리 스와핑 설정이 불가능한 경우에는 bootstrap.memory_lock 설정을 통하여 메모리 스와핑을 최소화할 수 있습니다. 커널 수준에서 제공되는 함수 중에 mlockall()가 있는데 이 함수는 프로세스의 페이징을 금지하고 모든 메모리가 램에 상주하는 것을 보장해 줍니다. memory_lock을 이용하여 애플리케이션에서 메모리를 강제로 스와핑 하지 못하도록 설정합니다.

memory_lock 설정

# memory_lock 설정하기
$ vi ./elasticsearch.yml
bootstrap.memory_lock: true

# Request를 보내 memory_lock 설정 여부 확인하기
curl -XGET http://localhost:9200/_nodes?filter_path=**.mlockall

Kubernetes 환경에서 설치된 Elasticsearch는?

kubelet에서는 메모리 스왑이 기본적으로 off된 환경이라고 합니다. 그렇기 때문에 메모리 스왑핑에 대해서는 kubernetes 환경에서는 굳이 고민을 안 해도 될 것 같습니다. (https://github.com/elastic/helm-charts/issues/7)

참고

https://www.elastic.co/guide/en/elasticsearch/reference/current/setup-configuration-memory.html

 

Disable swapping | Elasticsearch Guide [8.10] | Elastic

mlockall might cause the JVM or shell session to exit if it tries to allocate more memory than is available!

www.elastic.co


Max open file 설정

Elasticsearch에서는 클러스터의 노드끼리 소켓을 사용해 통신을 하며, 루씬 인덱스에서 데이터를 색인 처리하는데 많은 수의 파일을 사용하고 있습니다. 그렇기 때문에 파일을 열람할 수 있는 크기가 매우 작으면 문제가 될 수 있기 때문에 설정을 통하여 max open file의 수를 지정해 주어야 합니다.

max open file 설정

# 명령어를 통하여 일시적인 적용
$ sudo su
$ ulimit -n 81920

# 파일 수정을 통하여 영구적인 적용
$ vim /etc/security/limits.conf
irterm  softnofile  81920
irterm  hardnofile  81920

max file 설정 값 확인

curl -XGET http://localhost:9200/_nodes/stats/process

위 Request를 통해 open_file_descriptors로는 현재 열려 있는 파일 디스크립트의 개수를 알 수 있으며, max_file_descriptors를 통하여 최대 사용 가능한 파일 디스크립터의 개수를 확인할 수 있습니다.

참고

https://www.elastic.co/guide/en/elasticsearch/reference/current/setting-system-settings.html

 

Configuring system settings | Elasticsearch Guide [8.10] | Elastic

Ubuntu and limits.conf Ubuntu ignores the limits.conf file for processes started by init.d. To enable the limits.conf file, edit /etc/pam.d/su and uncomment the following line: # session required pam_limits.so

www.elastic.co

 

reroute

Elasticsearch에서는 기본적으로 각각의 노드에 샤드가 일정 비율로 균등하게 배치되도록 하고 있습니다. 그래도 뜻하지 않게 특정 노드에 샤드가 많이 배치될 수도 있고 어떤 인덱스는 샤드의 크기가 매우 커서 노드의 스토리지 사용량이 불균형이 발생할 수도 있습니다. 이런 경우 Elasticsearch에서 reroute를 이용하여 해당 문제를 해결할 수 있습니다.

샤드 재배치

해당 인덱스의 특정 샤드를 reroute를 이용하여 "elasticsearch-data-1"에서 "elasticsearch-data-3"노드로 이동하도록 하겠습니다.

curl -X POST http://localhost:9200/_cluster/reroute -H 'Content-Type: application/json' -d '
{
  "commands": [
    {
      "move": {
        "index": "my-log",
        "shard": 0,
        "from_node": "elasticsearch-data-1",
        "to_node": "elasticsearch-data-3"
      }
    }
  ]
}
'

성공했을 때의 결과는 다음과 같이 출력됩니다.

{
  "acknowledged": true,
  "state": {
    "cluster_uuid": "kvqSEAWcRiKL1Riklcqi_w",
    "version": 93,
    "state_uuid": "HnEEoQ3YRVSEzzme0i_5hg",
    "master_node": "iWgI2xYvRYyhfhdV_TFKKQ",
    "blocks": {},
    "nodes": {...생략 ...},
    "routing_table": {...생략 ...},
    "routing_nodes": {...생략 ...},
    "health": {
      "disk": {
        "high_watermark": "90%",
        "high_max_headroom": "150gb",
        "flood_stage_watermark": "95%",
        "flood_stage_max_headroom": "100gb",
        "frozen_flood_stage_watermark": "95%",
        "frozen_flood_stage_max_headroom": "20gb"
      }
    }
  }
}

해당 request를 처리하는데 관여한 노드의 정보와 인덱스 샤드의 대한 정보가 출력되며 disk의 설정 값까지 출력됩니다.

 

request를 수행하기 전과 수행한 후의 샤드의 배치 모습을 아래와 같이 그림으로 그렸습니다. 여기서 중요한 것은 "Shard 0"의 Primary shard와 Replica shard가 고가용성(HA)을 유지하기 위해서 같은 노드에 존재하게 하면 안 됩니다.

 

마지막으로

위에서 설명한 것처럼 샤드는 인덱스 별로 크기가 모두 다르기 때문에 스토리지 사용량을 균일하게 분배할 때 용이하게 사용할 수 있을 것 같습니다. 또한 reroute는 샤드의 재배치뿐만 아니라 특정 노드가 드롭되었을 때 샤드를 복구하는 과정에서 해당 샤드를 어디에 배치하는지에 대한 설정도 가능합니다. 자세한 내용은 아래 elasticsearch 공식 문서를 참고해 주세요

참고

https://www.elastic.co/guide/en/elasticsearch/reference/current/cluster-reroute.html#cluster-reroute-api-prereqs

 

Cluster reroute API | Elasticsearch Guide [8.10] | Elastic

move Move a started shard from one node to another node. Accepts index and shard for index name and shard number, from_node for the node to move the shard from, and to_node for the node to move the shard to. cancel Cancel allocation of a shard (or recovery

www.elastic.co

 

copy_to란?

Elasitcsearch의 index에는 여러 필드들이 존재합니다. copy_to는 여러 필드의 값을 그룹으로 묶어 하나의 필드로 검색할 수 있는 기능을 제공하는 것입니다. Elasticsearch 공식 문서에서는 first name과 last name을 예시를 들어 두 필드를 하나의 full name이라는 필드로 copy_to 하여 full name 하나의 필드만으로 first name과 last name을 모두 검색이 가능하도록 하였습니다. 이렇게 copy_to를 이용하면 검색을 할 때 "피트"라는 사람이 "피트"가 first name인지 last name인지 명확하게 알지 못할 때 검색이 유용할 뿐만 아니라 여러 필드를 자주 검색할 때 쿼리에 여러 필드를 검색하는 것보다 하나의 copy_to 필드를 검색하는 것이 검색 속도 향상에도 많은 도움이 된다고 합니다.

 

copy_to 사용해보기

Elasticsearch 공식 문서에서는 first name, last name을 예시로 들었지만 저 같은 경우 Elasticsearch를 로그를 저장하고 분석하는데 많이 사용하기 때문에 App name, Service name을 예시로 사용해 보겠습니다.

 

my-log 인덱스 필드 타입

필드명 타입
app keyword
service keyword
app_service keyword
level keyword
message text

 

my-log 인덱스 생성

curl -X PUT http://localhost:9200/my-log -H 'Content-Type: application/json' -d '
{
  "mappings": {
    "properties": {
      "app": {
        "type": "keyword",
        "copy_to": "app_service"
      },
      "service": {
        "type": "keyword",
        "copy_to": "app_service"
      },
      "app_service": {
        "type": "keyword"
      },
      "level": {
        "type": "keyword"
      },
      "message": {
        "type": "text"
      }
    }
  }
}
'

 

데이터는 아래와 같이 넣어줬습니다.

{"app":"payment", "service":"api_server", "level": "debug", "message": "A payment was successful"}
{"app":"payment", "service":"api_server", "level": "debug", "message": "B payment was successful"}
{"app":"client", "service":"server", "level": "debug", "message": "client connected"}

 

이제 app_service라는 필드 하나만으로 payment를 검색하여 2개의 데이터가 출력되는지 확인합니다.

curl -X POST http://localhost:9200/my-log/_search -H 'Content-Type: application/json' -d '
{
  "query": {
    "match": {
      "app_service": {
        "query": "payment"
      }
    }
  }
}
'

 

출력결과

{
  "took": 43,
  "timed_out": false,
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 2,
      "relation": "eq"
    },
    "max_score": 0.5908618,
    "hits": [
      {
        "_index": "my-log",
        "_id": "1",
        "_score": 0.5908618,
        "_source": {
          "app": "payment",
          "service": "api_server",
          "level": "debug",
          "message": "A payment was successful"
        }
      },
      {
        "_index": "my-log",
        "_id": "2",
        "_score": 0.5908618,
        "_source": {
          "app": "payment",
          "service": "api_server",
          "level": "debug",
          "message": "B payment was successful"
        }
      }
    ]
  }
}

위에서 설명했던 것과 같이 여러 필드를 검색 조건에 전부 기입하는 것 보다 이렇게 copy_to를 이용하면 개발자 입장에서 보다 간편하고 검색 성능을 높을 수 있습니다. copy_to를 잘 활용만 한다면 상황에 따라 개발하는데 많은 도움이 될 것 같습니다.

 

참고

https://www.elastic.co/guide/en/elasticsearch/reference/current/copy-to.html

 

copy_to | Elasticsearch Guide [8.10] | Elastic

copy_to is not supported for field types where values take the form of objects, e.g. date_range

www.elastic.co

+ Recent posts