시작하기 앞서 Ingest Pipeline이란

Elasticsearch에서 Ingest Pipeline은 데이터를 색인하기 전에 전처리 및 변환 작업을 수행하는 개념입니다. Ingest Pipeline은 데이터를 가져와서 필요한 형식으로 변환하거나 데이터를 필터링하고 파싱 하는 등의 작업 및 데이터의 일관성을 유지하는데 매우 유용하여 효율적인 데이터 처리를 가능하게 하는 중요한 기능입니다.

그림에 보이는 것과 같이 데이터가 들어오면 Ingest pipeline을 통하여 순차적으로 프로세서가 실행되고 결과물은 해당 인덱스에 저장됩니다. Ingest Pipeline이 동작할 수 있도록 해당 노드에는 ingest role이 등록되어야 하지만 일반적으로 데이터 노드에 많이 사용합니다. 허나 프로세서가 많아 부하를 많이 받을 경우 노드를 별도로 분리하는 방법을 고려해 볼 필요성이 있습니다.

이번 문서에서는 여러 유용한 파이프라인 프로세서 중 Enrichment에 대해서 실습해봤습니다.

Enrich Processor

RDB에서 간단한 기능인 JOIN이라는 기능을 Elasticsearch에서 역색인 구조를 가지고 있는 Elasticsearch에서는 구현하는데는 아주 많은 리소스와 페널티가 있습니다. 그렇기 때문에 JOIN을 수행하는 행위는 Elasticsearch에서 권장하지 않고 있습니다.

Elasticsearch의 Enrich Processor는 Ingest pipeline의 기능중 하나로 외부 데이터 소스를 활용하여 기존 인덱스 데이터를 이용하여 추가 정보를 추출하거나 결합할 수 있는 기능을 제공하여 RDB에 JOIN과 비슷한 기능을 가능하게 되었습니다.

이러한 Enrich processor를 이용하여 데이터를 추가하는 방법을 실습해봤습니다.

작업 내용

미리 저장된 my-service 인덱스의 데이터를 enrich.field를 기반으로 추출하여 my-log 인덱스가 인덱싱 될 때 같이 데이터가 기록되도록 구현하였습니다.

소스 인덱스 생성하기

먼저 Enrich policy를 생성하기 전에 데이터를 추가하기 위한 소스 인덱스를 생성하고 데이터를 미리 삽입하는 작업을 수행하겠습니다.

### 서비스 인덱스 생성
PUT http://localhost:9200/my-service
Content-Type: application/json

{
  "settings": {
    "index": {
      "number_of_shards": 4,
      "number_of_replicas": 1,
      "refresh_interval": "5s"
    }
  },
  "mappings": {
    "properties": {
      "service": { "type": "keyword" },
      "endpoint": { "type": "keyword" },
      "description": { "type": "text" }
    }
  }
}

### 서비스 데이터 삽입1
PUT http://localhost:9200/my-service/_doc/1?refresh=wait_for
Content-Type: application/json

{
  "service": "payment",
  "endpoint": "localhot:32000",
  "description": "This service is for selling products"
}

### 서비스 데이터 삽입2
PUT http://localhost:9200/my-service/_doc/2?refresh=wait_for
Content-Type: application/json

{
  "service": "login",
  "endpoint": "localhot:31000",
  "description": "This service is for users to login"
}

Enrich processor 생성 및 실행

### 엔리치 생성
PUT http://localhost:9200/_enrich/policy/my-enrich
Content-Type: application/json

{
  "match": {
    "indices": "my-service",
    "match_field": "service",
    "enrich_fields": ["endpoint", "description"]
  }
}

### 엔리치 실행
POST http://localhost:9200/_enrich/policy/my-enrich/_execute

enrich policy 생성 파라미터

  • match.indices : 데이터를 추출할 소스 인덱스를 지정합니다.
  • match.match_field : 소스 인덱스에 서로 매칭할 필드를 지정합니다.
  • match.enrich_fields : 매칭 후 추출될 데이터 필드를 지정합니다.

파이프라인 생성

파이프라인을 생성할 때는 processor를 리스트 형식으로 생성하게 되는데 리스트의 순서대로 processor가 실행됩니다. 

### 파이프라인 생성
PUT http://localhost:9200/_ingest/pipeline/service_join
Content-Type: application/json

{
  "description" : "Service information using enrich",
  "processors" : [
    {
      "enrich" : {
        "policy_name": "my-enrich",
        "field" : "service",
        "target_field": "service_info",
        "max_matches": "1"
      }
    }
  ]
}

파이프라인 생성 파라미터

  • enrich.policy_name : 위에서 생성한 enrich policy
  • enrich.field : my-log인덱스에서 매칭할 필드를 지정합니다.
  • enrich.target_field : enrich policy에서 추출된 (endpoint, description) 데이터를 넣을 필드를 설정합니다.
  • enrich.max_matches : 최대로 매칭할 수 있는 개수를 설정합니다. 간단한 테스트이므로 1로 설정하였습니다.

my-log 인덱스 생성 및 데이터 삽입

데이터가 추가될 때 pipeline을 명시하여 위에서 설정한 Ingest pipeline이 동작할 수 있도록 합니다.

PUT http://localhost:9200/my-log
Content-Type: application/json

{
  "settings": {
    "index": {
      "number_of_shards": 4,
      "number_of_replicas": 1,
      "refresh_interval": "5s"
    }
  },
  "mappings": {
    "properties": {
      "service": { "type": "keyword" },
      "level": { "type": "keyword" },
      "message": {"type": "text" }
    }
  }
}

### 로그 데이터 삽입1
PUT http://localhost:9200/my-log/_doc/1?pipeline=service_join
Content-Type: application/json

{
  "service": "login",
  "level": "debug",
  "message": "user A logged in successfully"
}

### 로그 데이터 삽입2
PUT http://localhost:9200/my-log/_doc/2?pipeline=service_join
Content-Type: application/json

{
"service": "login",
"level": "error",
"message": "user B login failed"
}

### 로그 데이터 삽입3
PUT http://localhost:9200/my-log/_doc/3?pipeline=service_join
Content-Type: application/json

{
  "service": "payment",
  "level": "debug",
  "message": "user a successfully paid for the product"
}

실제로 우리는 my-log 인덱스에 데이터를 넣을 때 service, level, message 데이터만 인덱싱하였습니다. 이제 결과물을 확인하면 enrich를 통하여 새롭게 정의된 service_info 필드를 통해 service의 데이터가 추가된 것을 확인할 수 있습니다.

GET http://localhost:9200/my-log/_search

### 결과
{
  "took": 8,
  "timed_out": false,
  "_shards": {
    "total": 4,
    "successful": 4,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 3,
      "relation": "eq"
    },
    "max_score": 1.0,
    "hits": [
      {
        "_index": "my-log",
        "_id": "1",
        "_score": 1.0,
        "_source": {
          "service_info": {
            "description": "This service is for users to login",
            "endpoint": "localhot:30000",
            "service": "login"
          },
          "message": "user A logged in successfully",
          "level": "debug",
          "service": "login"
        }
      },
      {
        "_index": "my-log",
        "_id": "2",
        "_score": 1.0,
        "_source": {
          "service_info": {
            "description": "This service is for users to login",
            "endpoint": "localhot:30000",
            "service": "login"
          },
          "message": "user B login failed",
          "level": "error",
          "service": "login"
        }
      },
      {
        "_index": "my-log",
        "_id": "3",
        "_score": 1.0,
        "_source": {
          "service_info": {
            "description": "This service is for selling products",
            "endpoint": "localhot:30001",
            "service": "payment"
          },
          "message": "user a successfully paid for the product",
          "level": "debug",
          "service": "payment"
        }
      }
    ]
  }
}

 

참조

https://www.elastic.co/guide/en/elasticsearch/reference/current/ingest.html

 

Ingest pipelines | Elasticsearch Guide [8.10] | Elastic

Ingest pipelines let you perform common transformations on your data before indexing. For example, you can use pipelines to remove fields, extract values from text, and enrich your data. A pipeline consists of a series of configurable tasks called processo

www.elastic.co

https://www.elastic.co/guide/en/elasticsearch/reference/current/enrich-setup.html

 

Set up an enrich processor | Elasticsearch Guide [8.10] | Elastic

The enrich processor performs several operations and may impact the speed of your ingest pipeline. We strongly recommend testing and benchmarking your enrich processors before deploying them in production. We do not recommend using the enrich processor to

www.elastic.co

https://www.elastic.co/guide/en/elasticsearch/reference/current/put-enrich-policy-api.html

 

Create enrich policy API | Elasticsearch Guide [8.10] | Elastic

Once created, you can’t update or change an enrich policy. Instead, you can: Create and execute a new enrich policy. Replace the previous enrich policy with the new enrich policy in any in-use enrich processors. Use the delete enrich policy API to delete

www.elastic.co

 

'Elasticsearch > pipeline' 카테고리의 다른 글

Elasticsearch Ingest pipeline json  (0) 2023.10.20
Elasticsearch Ingest pipeline split processor  (0) 2023.10.19

+ Recent posts