현재 제가 근무하는 회사는 모든 서비스를 마이크로서비스 아키텍처(MSA)로 동작하기 위해 Kubernetes 환경에서 운영하고 있습니다. 이러한 환경에서는 Horizontal Pod Autoscaler(HPA)를 사용하여, Consumer 애플리케이션이 일정 리소스를 초과하여 사용할 경우, 이를 감지하고 자동으로 스케일아웃(Scale Out)을 진행하여 최대 파티션 개수만큼 늘어날 수 있도록 설정하였으며 반대로 리소스 사용량이 감소하면, 스케일인(Scale In)을 통해 리소스 소비를 최소화하도록 조정하였습니다. 그러나, HPA로 인하여 Kafka 컨슈머의 스케일아웃(Scale Out) 또는 스케일인(Scale In)이 빈번히 발생하면, 파티션 할당(Partition Assignment) 과정에서 부하가 발생하여 Kafka의 성능 저하로 이어질 수 있습니다. 이는 파티션 할당 과정에서의 잦은 변동이 시스템에 부담을 주기 때문입니다. 이러한 문제를 해결하기 위해, 파티션 할당 전략(Partition Assignment Strategy)을 조정하는 설정 변경을 통해 이러한 부하를 최소화할 수 있습니다. 이번 문서에서는 Kafka 컨슈머 그룹에서 파티션을 효율적으로 할당하고 관리함으로써, 성능 저하를 최소화하여 최적화하는 방법에 대하여 정리하였습니다.

 

실습 방법

이번 실습에서는 kcat을 이용하여 카프카로부터 데이터를 consume하는 방법으로 진행하였습니다. kcat을 이용해서 partition.assignment.strategy를 설정하여 컨슈머 모드로 동작하는 방법은 다음과 같습니다.

(kcat 버전 1.7.0 이상부터 사용 가능한 옵션입니다.)

kcat -C -G <컨슈머그룹> -b <브로커주소> -X partition.assignment.strategy=<할당전략> 토픽명

 

docker compose 카프카 클러스터 구축방법
https://stdhsw.tistory.com/entry/Docker-compose%EB%A5%BC-%EC%9D%B4%EC%9A%A9%ED%95%9C-Kafka-Cluster-%EA%B5%AC%EC%B6%95-KRAFT-Mode

 

kcat 기본 사용법

https://stdhsw.tistory.com/entry/kcatkafkacat-%EC%82%AC%EC%9A%A9%EB%B2%95

 

partition.assignment.strategy 설정

partition.assignment.strategy 설정은 Kafka Consumer에서 파티션 할당 알고리즘을 결정하는 설정입니다. 이 설정을 통해 Consumer Group 내의 각 Consumer에게 파티션을 할당하는 방식을 정의할 수 있으며, Kafka 클러스터의 전반적인 성능과 가용성에 영향을 미칠 수 있습니다. 다음은 partition.assignment.strategy에 적용될 수 있는 전략과 각각의 특성에 대하여 알아보겠습니다.

Range Assignment Strategy

Range Assignment Strategy는 partition.assignment.strategy 설정의 기본값으로, Kafka Consumer Group에서 파티션 할당을 관리하는 데 사용되는 전략입니다. 이 전략의 핵심은 각 토픽의 파티션들을 Consumer Group 내의 Consumer 리스트에 순차적으로 할당하는 것입니다. 이 할당 방식은 특히 토픽별로 파티션을 균등하게 분배하고자 할 때 유리합니다. 각 토픽에 대해 Consumer들은 대략적으로 균등한 부하를 받게 되어, 부하 분산에 효과적입니다. 그러나 Range Assignment Strategy는 몇 가지 단점도 가지고 있습니다. 특히, Consumer Group 내에서 Consumer의 수가 변할 때 (즉, Scale out이나 Scale in이 발생할 때) 전체 파티션 할당 과정이 재실행됩니다. 이는 Consumer Group에 속한 모든 Consumer의 연결을 잠시 끊고, 파티션 할당을 새롭게 시작한다는 의미입니다. 이러한 과정은 Consumer 수에 자주 변화가 있는 환경에서 비효율적이며, 메시지 처리 지연이나 리밸런싱 동안의 처리량 감소와 같은 문제를 초래할 수 있습니다.

장점 단점
 토픽별로 파티션에 데이터가 골고루 분포되었을 경우, Consumer Group 내의 각 Consumer가 균등하게 부하를 받게 됩니다.

균등한 부하를 받기 때문에 각 컨슈머의 부하량을 예상하기 쉽습니다.
 각 토픽에 대해 파티션을 균등하게 할당하려고 시도하지만, Consumer가 여러 토픽에서 데이터를 가져오는 경우에는 특정 파티션에 부하가 집중될 위험이 있습니다.

Consumer Group에 새로운 Consumer가 추가되거나 기존 Consumer가 제거되는 등의 Scale out/in이 발생하면, Range Assignment Strategy에 따라 모든 파티션이 재할당됩니다. 이 과정에서 모든 Consumer의 연결이 잠시 끊기게 되며, 이는 리밸런싱 과정을 거치게 되어 성능 저하가 발생할 수 있습니다.

 

Range Assignment Strategy의 동작은 아래 이미지처럼 Consumer Group 내의 Consumer 수가 줄어들 경우 모든 파티션 할당은 revoked로 연결이 끊어지고 다시 Topic을 기준으로 남아있는 Consumer에 재할당을 수행합니다.

Round Robin Assignment Strategy 

Round Robin Assignment Strategy는 Kafka에서 사용할 수 있는 또 다른 파티션 할당 전략으로, Range Assignment Strategy와 달리 토픽을 기준으로 파티션을 할당하지 않습니다. 대신, 이 전략은 사용 가능한 모든 파티션을 Consumer Group 내의 모든 Consumer에게 순서대로 할당합니다, 토픽 간 경계를 무시하고 전체 파티션 목록을 기준으로 합니다.

장점 단점
Round Robin 전략은 각 Consumer에게 전체 파티션 목록을 걸쳐 균등하게 할당함으로써, 모든 Consumer가 대략적으로 동일한 양의 작업을 처리하도록 합니다.  특정 토픽이 다른 토픽보다 훨씬 더 높은 메시지 유입률을 가지고 있을 경우, Round Robin 방식은 이러한 불균형을 고려하지 않기 때문에, 특정 토픽의 파티션에서 처리 지연이 발생할 수 있습니다.

Range Assignment Strategy와 마찬가지로 Consumer Group에 새로운 Consumer가 추가되거나 기존 Consumer가 제거되는 등의 Scale out/in이 발생하면 모든 파티션이 재할당됩니다.

 

Round Robin Assignment Strategy의 동작은 아래 이미지처럼 Consumer Group 내의 Consumer 수가 줄어들 경우 모든 파티션 할당은 revoked로 연결이 끊어지고 모든 파티션을 기준으로 Consumer에 재할당을 수행합니다.

 

Sticky Assignment Strategy

Sticky Assignment Strategy는 파티션 재할당을 최소화하는 것을 목표로 하는 전략입니다. Consumer Group의 Consumer 개수의 변화가 생겼을 때 기존의 파티션 할당을 가능한 유지 하여 Consumer 리밸런싱이 발생할 때 파티션의 재할당을 최소화하여 성능을 향상할 수 있는 전략입니다.

장점 단점
Sticky Assignment Strategy는 Consumer에 한 번 할당된 파티션을 가능한 한 그대로 유지하여 Consumer 개수의 변화가 발생하여도 할당된 파티션은 재할당을 수행하지 않습니다.

이미 할당된 파티션에 대해서는 Consumer 리밸런싱을 수행하지 않아 자주 Scale In/Out이 발생하는 환경에서 사용하기 적합합니다.
Sticky Assignment Strategy는 기존의 파티션 할당을 가능한 한 유지하려고 합니다. 이로 인해 Consumer Group 내에서 파티션을 새로운 Consumer에게 더 효율적으로 재분배하는 것이 어려워질 수 있습니다.

 

Sticky Assignment Strategy의 동작은 아래 이미지처럼 Consumer Group 내의 Consumer 수가 줄어들 경우 현재 동작중인 Consumer에 할당된 파티션은 유지하고 끊어진 Consumer에 대하여 재할당을 수행합니다. Sticky Assignment Strategy는 Consumer의 개수가 줄어드는 상황에서는 큰 문제가 발생하지 않으나 반대로 Consumer의 개수가 늘어나거나 잠시 중단되었던 Consumer가 restart를 하여 참여할 경우 해당 컨슈머에는 파티션할당이 불균형할 수 있습니다.

Cooperative Sticky Assignment Strategy 

Cooperative Sticky Assignment Strategy는 기존의 Sticky Assignment Strategy를 기반으로 하면서 몇 가지 중요한 개선 사항을 도입한 것입니다. 이 전략은 기존 Sticky 전략의 단점을 보완하며, 특히 Consumer Group 내의 Consumer 구성이 변할 때 발생하는 리밸런싱 프로세스의 효율성을 개선합니다. Cooperative Sticky 전략은 파티션 할당 변경이 필요할 때 전체 Consumer Group에 대한 리밸런싱을 수행하는 대신, 필요한 최소한의 변경만을 수행합니다. 이는 리밸런싱 프로세스가 더 빠르고 효율적으로 완료될 수 있도록 하여 전체 시스템의 성능에 미치는 영향을 최소화합니다.

장점 단점
Sticky 전략은 재할당의 불균형이 발생할 수 있는 반면에 Cooperative Sticky Assignment Strategy는 이러한 단점을 보완하여 Consumer 리밸런싱을 최소화하면서 균형있는 재할당이 이루어집니다.

Consumer의 Scale in/out이 자주 발생하는 환경에서 사용하기 적합니다.
Cooperative Sticky는 Consumer assignment 작업을 효율적으로 수행하지만 리밸런싱 작업이 다른 Strategy보다 오래 걸리는 단점이 있습니다.

 

Kubernetes 환경에서 Consumer Pod를 Horizontal Pod Autoscaler (HPA)와 함께 구성하여 Pod의 Scale in/out이 발생하는 환경에서는 Cooperative Sticky Assignment Strategy을 선택하는 것이 현명한 선택입니다. Cooperative Sticky는 티션 할당을 최소화하면서 점진적인 리밸런싱을 지원하기 때문에, Pod의 확장 및 축소가 발생해도 Kafka Consumer Group의 전체적인 성능 저하를 줄일 수 있습니다.

마지막으로

Kafka를 사용하는 경험이 쌓이면서 느끼는 것은 모든 상황을 만족하는 설정 값은 없다는 것을 많이 느끼게 되는데요. 현재 자신이 사용하고 있는 데이터의 특성과 Producer, Consumer의 상황을 잘 파악하고 가장 적절한 설정 값을 적용하는 것이 가장 중요하다고 생각합니다.

 

Aggregation을 수행할 때 사용할 데이터

terms

terms는 데이터의 keyword별 종류 및 개수를 집계하는데 사용합니다.

 

모든 도큐먼트의 level의 종류와 개수 구하기

우리는 데이터를 가져오는 것이 목적이 아닌 Aggregation을 수행하는 것이 목적이기 때문에 "size"를 0으로 설정하였습니다.

level 필드의 데이터 종류와 각 종류별로 도큐먼트가 몇개 있는지 확인합니다.

api POST http://localhost:9200/test-log-index-2021-09-12/_search
header content-type: application/json
body {
    "size": 0,
    "query": {
        "match_all": {}
    },
    "aggs": {
        "byLevel": {
            "terms": {
                "field": "level"
            }
        }
    }
}

결과 보기

"byLevel" 내부의 buckets를 보면 "key"에는 level필드의 값이 "doc_count"에는 개수를 표현하고 있습니다.

즉 "info"는 9개가 있고, "warn"은 6개, "debug"는 5개가 있는 것을 확인할 수 있습니다.

 

range

값의 범위 별로 도큐먼트의 개수를 측정하는데 사용할 수 있습니다.

 

모든 도큐먼트의 duration의 범위별 개수 측정하기

우리는 데이터를 가져오는 것이 목적이 아닌 Aggregations을 수행하는 것이 목적이기 때문에 "size"를 0으로 설정하였습니다.

duration의 범위 100이하, 100부터 200까지, 200부터 300까지, 300부터 400까지, 400이상 으로 범위를 지정하여 검색하겠습니다.

api POST http://localhost:9200/test-log-index-2021-09-12/_search
header content-type: application/json
body {
    "size": 0,
    "query": {
        "match_all": {}
    },
    "aggs": {
        "byDuration": {
            "range": {
                "field": "duration",
                "ranges": [
                    {
                        "to": 100
                    },
                    {
                        "from": 100,
                        "to": 200
                    },
                    {
                        "from": 200,
                        "to": 300
                    },
                    {
                        "from": 300,
                        "to": 400
                    },
                    {
                        "from": 400
                    }
                ]
            }
        }
    }
}

결과 보기

 

histogram

range의 경우 from, to를 통하여 범위를 지정했다면 histogram은 range와 다르게 interval옵션을 통하여 interval단위 별로 필드의 개수를 측정하는 Aggregations입니다.

 

모든 도큐먼트의 duration을 100 단위로 개수 측정하기

우리는 데이터를 가져오는 것이 목적이 아닌 Aggregations을 수행하는 것이 목적이기 때문에 "size"를 0으로 설정하였습니다.

api POST http://localhost:9200/test-log-index-2021-09-12/_search
header content-type: application/json
body {
    "size": 0,
    "query": {
        "match_all": {}
    },
    "aggs": {
        "byDuration": {
            "histogram": {
                "field": "duration",
                "interval": 100
            }
        }
    }
}

결과 보기

100 단위 별로 도큐먼트의 개수를 확인할 수 있습니다.

 

date_range

range처럼 date필드 또한 범위를 지정하여 집계를 할 수 있습니다.

 

모든 도큐먼트의 start_time 필드를 범위 별로 측정하기

우리는 데이터를 가져오는 것이 목적이 아닌 Aggregations을 수행하는 것이 목적이기 때문에 "size"를 0으로 설정하였습니다.

"start_time"필드의 값을 범위 별로 측정하도록 하겠습니다.

api POST http://localhost:9200/test-log-index-2021-09-12/_search
header content-type: application/json
body {
    "size": 0,
    "query": {
        "match_all": {}
    },
    "aggs": {
        "date_range": {
            "date_range": {
                "field": "start_time",
                "ranges": [
                    {
                        "from": "2021-09-12 10:10:10",
                        "to": "2021-09-12 10:10:15"
                    },
                    {
                        "from": "2021-09-12 10:10:15",
                        "to": "2021-09-12 10:10:20"
                    },
                    {
                        "from": "2021-09-12 10:10:20"
                    }
                ]
            }
        }
    }
}

결과 보기

각 시간대별로 도큐먼트의 개수를 확인할 수 있습니다.

 

date_histogram

histogram과 같이 date_histogram도 interval 옵션을 넣어 각 interval별 집계를 측정할 수 있습니다.

interval옵션에 second, minute, hour, day, month, week 등을 넣을 수 있습니다.

 

모든 도큐먼트의 start_time 필드를 1분 별로 측정하기

우리는 데이터를 가져오는 것이 목적이 아닌 Aggregations을 수행하는 것이 목적이기 때문에 "size"를 0으로 설정하였습니다.

api POST http://localhost:9200/test-log-index-2021-09-12/_search
header content-type: application/json
body {
    "size": 0,
    "query": {
        "match_all": {}
    },
    "aggs": {
        "date_his": {
            "date_histogram": {
                "field": "start_time",
                "interval": "minute"
            }
        }
    }
}

결과 보기

 

 

 

 

삽입된 데이터는 이전 페이지를 참조해주세요

https://stdhsw.tistory.com/entry/Elasticsearch-%EA%B2%80%EC%83%89%ED%95%98%EA%B8%B01

 

match 쿼리

match는 term과 비슷해 보이지만 미묘한 차이가 있습니다. match의 경우 기본적인 검색을 수행하는데 사용을 하지만 저 같은 경우 text 필드에서 보다 상세한 검색을 수행할 경우 주로 사용됩니다.  즉 문서의 내용에서 특정 단어가 들어 갔는지 검색할때 많이 사용됩니다.

 

text 필드 검색하기

message 필드에 "aaa"라는 문자열이 포함된 모든 도큐먼트를 검색합니다.

api POST http://localhost:9200/my-log-index-2021-08-29/_search
header Content-type: application/json
body {
    "query": {
        "bool": {
            "must": [
                {
                    "match": {
                        "message": "aaa"
                    }
                }
            ]
        }
    }
}

 

term 쿼리

term의 경우 match와 비슷한 모습을 취하지만 term은 완전히 같은 데이터를 검색하는데 많이 사용됩니다. 저 같은 경우 주로 keyword타입에서 특정 단어를 검색하는데 많이 사용하고 있습니다.

 

keyword 타입 검색하기

level 필드의 값이 "info"인 값을 가진 도큐먼트를 검색합니다.

api POST http://localhost:9200/my-log-index-2021-08-29/_search
header Content-type: application/json
body {
    "query": {
        "bool": {
            "must": [
                {
                    "term": {
                        "level": "info"
                    }
                }
            ]
        }
    }
}

 

range 쿼리

range 쿼리는 값의 범위를 검색하는데 사용됩니다. 

 

range쿼리의 파라미터

파라미터명 설명
gte 크거나 같다
gt 크다
lte 작거나 같다
lt 작다

 

범위를 이용하여 검색하기

user_id가 2보다 크거나 같으면서 7보다 작은 도큐먼트 검색하기

api POST http://localhost:9200/my-log-index-2021-08-29/_search
header Content-type: application/json
body {
    "query": {
        "bool": {
            "must": [
                {
                    "range": {
                        "user_id": {
                            "gte": "2",
                            "lt": "7"
                        }
                    }
                }
            ]
        }
    }
}

 

 

+ Recent posts