시작하기 앞서 Ingest Pipeline이란

Elasticsearch에서 Ingest Pipeline은 데이터를 색인하기 전에 전처리 및 변환 작업을 수행하는 개념입니다. Ingest Pipeline은 데이터를 가져와서 필요한 형식으로 변환하거나 데이터를 필터링하고 파싱 하는 등의 작업 및 데이터의 일관성을 유지하는데 매우 유용하여 효율적인 데이터 처리를 가능하게 하는 중요한 기능입니다.

그림에 보이는 것과 같이 데이터가 들어오면 Ingest pipeline을 통하여 순차적으로 프로세서가 실행되고 결과물은 해당 인덱스에 저장됩니다. Ingest Pipeline이 동작할 수 있도록 해당 노드에는 ingest role이 등록되어야 하지만 일반적으로 데이터 노드에 많이 사용합니다. 허나 프로세서가 많아 부하를 많이 받을 경우 노드를 별도로 분리하는 방법을 고려해 볼 필요성이 있습니다.

이번 문서에서는 여러 유용한 파이프라인 프로세서 중 json에 대해서 실습해봤습니다.

만약 JSON Pipeline을 사용하지 않았다면

만약 JSON Pipeline을 사용하지 않았다면 데이터가 Text형식으로 문자열 그대로 저장되어 통계를 내거나 보다 자세한 데이터 처리를 하는데 많은 제약이 있을 수 있습니다. 아래는 JSON Pipeline을 사용하지 않았을 때의 데이터 형식입니다.

### 데이터 검색
GET http://localhost:9200/my-manifests/_search

### 결과
{
  "took": 4,
  "timed_out": false,
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 1,
      "relation": "eq"
    },
    "max_score": 1.0,
    "hits": [
      {
        "_index": "my-manifests",
        "_id": "1",
        "_score": 1.0,
        "_source": {
          "cluster": "my-cluster",
          "manifest": "{\"cluster\":\"my-cluster\",\"namespace\":\"my-namespace\",\"name\":\"my-name\",\"version\":\"1.0.0\"}"
        }
      }
    ]
  }
}

 

JSON Pipeline

개발을 할 때 엔지니어가 json 데이터를 Elasticsearch 쿼리로 변경하여 데이터를 넣는 형식으로 개발을 진행한다면 JSON Pipeline은 굳이 필요 없는 기능으로 여길 수 있지만 개발을 하다 보면 앞으로 들어오는 json 데이터의 형식을 모르거나 json 문자열 데이터를 변경할 수 없는 환경에서 개발을 진행해야 되는 경우도 많습니다. 저 같은 경우 Kubernetes의 Resource Manifest 데이터를 가공 처리하는데 Manifest데이터는 yaml 또는 json 데이터로 수집되는데 어떠한 key와 value가 들어오는지 모든 것을 파악하기 어려운 상황을 겪었습니다. 이런 경우 JSON Pipeline을 사용하면 유용하게 데이터를 처리할 수 있을 것 같아 실습 내용을 정리하였습니다.

JSON Pipeline 생성

### 파이프라인 생성
PUT http://localhost:9200/_ingest/pipeline/index-json
Content-Type: application/json

{
  "description" : "ingest pipeline for json",
  "processors" : [
    {
      "json" : {
        "field" : "manifest",
        "target_field": "manifest_json"
      }
    }
  ]
}

파이프라인 생성 파라미터

  • json : 파이프라인 프로세서 중 JSON 프로세서를 사용합니다.
  • json.field : 문자열 형식으로 들어오는 json 필드를 명시합니다.
  • json.target_field : 문자열 필드 형식으로 들어온 데이터를 어떤 필드에 저장할지 지정합니다.

JSON Pipeline 테스트

파이프라인은 simulate라는 기능을 이용하여 내 생각대로 결과물이 나오는지 확인할 수 있습니다.

### 파이프라인 시뮬레이터
POST http://localhost:9200/_ingest/pipeline/_simulate
Content-Type: application/json

{
  "pipeline": {
    "description" : "ingest pipeline for json",
    "processors" : [
      {
        "json" : {
          "field" : "manifest",
          "target_field": "manifest_json"
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "cluster": "my-cluster",
        "manifest": "{\"cluster\":\"my-cluster\",\"namespace\":\"my-namespace\",\"name\":\"my-name\",\"version\":\"1.0.0\"}"
      }
    }
  ]
}

실제 데이터로 JSON Pipeline 적용되었는지 확인

먼저 테스트를 하기 위한 인덱스부터 정의하도록 하겠습니다. JSON Pipeline으로 생성되는 모든 필드들을 지정하기는 매우 번거롭기 때문에 dynamic_template을 통하여 JSON Pipeline이 생성하는 필드를 지정하였고 데이터를 넣을 때는 pipeline=index-json을 통해서 파이프라인을 지정해 줍니다.

### 인덱스 생성
PUT http://localhost:9200/my-manifests
Content-Type: application/json

{
  "mappings": {
    "dynamic_templates": [
      {
        "default_strings": {
          "match_mapping_type": "string",
          "mapping": {
            "type": "keyword"
          }
        }
      }
    ],
    "properties": {
      "cluster": {
        "type": "keyword"
      },
      "manifest": {
        "type": "text"
      }
    }
  }
}

### 인덱스 데이터 삽입1
POST http://localhost:9200/my-manifests/_doc/1?pipeline=index-json
Content-Type: application/json

{
  "cluster": "my-cluster",
  "manifest": "{\"cluster\":\"my-cluster\",\"namespace\":\"my-namespace\",\"name\":\"my-name\",\"version\":\"1.0.0\"}"
}

### 인덱스 데이터 삽입2
POST http://localhost:9200/my-manifests/_doc/2?pipeline=index-json
Content-Type: application/json

{
  "cluster": "my-cluster",
  "manifest": "{\"cluster\":\"my-cluster\",\"namespace\":\"my-namespace\",\"name\":\"test\",\"version\":\"2.0.0\"}"
}

결과 확인

manifest_json 필드를 통하여 json의 데이터가 dynamic_template으로 지정한 타입으로 필드가 생성된 것을 확인할 수 있습니다.  확실한 차이점을 보기 위해 위에서 정의한 "만약 JSON Pipeline을 사용하지 않았다면"의 결과와 비교해 보세요.

### 데이터 검색
GET http://localhost:9200/my-manifests/_search

### 결과 확인
{
  "took": 6,
  "timed_out": false,
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 2,
      "relation": "eq"
    },
    "max_score": 1.0,
    "hits": [
      {
        "_index": "my-manifests",
        "_id": "1",
        "_score": 1.0,
        "_source": {
          "cluster": "my-cluster",
          "manifest": "{\"cluster\":\"my-cluster\",\"namespace\":\"my-namespace\",\"name\":\"my-name\",\"version\":\"1.0.0\"}",
          "manifest_json": {
            "cluster": "my-cluster",
            "namespace": "my-namespace",
            "name": "my-name",
            "version": "1.0.0"
          }
        }
      },
      {
        "_index": "my-manifests",
        "_id": "2",
        "_score": 1.0,
        "_source": {
          "cluster": "my-cluster",
          "manifest": "{\"cluster\":\"my-cluster\",\"namespace\":\"my-namespace\",\"name\":\"test\",\"version\":\"2.0.0\"}",
          "manifest_json": {
            "cluster": "my-cluster",
            "namespace": "my-namespace",
            "name": "test",
            "version": "2.0.0"
          }
        }
      }
    ]
  }
}

시작하기 앞서 Ingest Pipeline이란

Elasticsearch에서 Ingest Pipeline은 데이터를 색인하기 전에 전처리 및 변환 작업을 수행하는 개념입니다. Ingest Pipeline은 데이터를 가져와서 필요한 형식으로 변환하거나 데이터를 필터링하고 파싱 하는 등의 작업 및 데이터의 일관성을 유지하는데 매우 유용하여 효율적인 데이터 처리를 가능하게 하는 중요한 기능입니다.

그림에 보이는 것과 같이 데이터가 들어오면 Ingest pipeline을 통하여 순차적으로 프로세서가 실행되고 결과물은 해당 인덱스에 저장됩니다. Ingest Pipeline이 동작할 수 있도록 해당 노드에는 ingest role이 등록되어야 하지만 일반적으로 데이터 노드에 많이 사용합니다. 허나 프로세서가 많아 부하를 많이 받을 경우 노드를 별도로 분리하는 방법을 고려해 볼 필요성이 있습니다.

이번 문서에서는 여러 유용한 파이프라인 프로세서 중 Split에 대해서 실습해봤습니다.

Split processor란

Elasticsearch Ingest Pipeline에서 Split Processor는 텍스트 필드를 분할하거나 분리하는 데 사용되는 프로세서입니다. Split Processor를 사용하면 텍스트 필드에서 구분자를 기반으로 하여 여러 하위 텍스트 필드로 분리하거나 특정 패턴을 기반으로 필드를 분할할 수 있습니다. 이는 로그 데이터 또는 다양한 유형의 텍스트 데이터를 구조화하고 필요한 정보를 추출하는 데 유용하게 사용될 수 있습니다.

파이프라인 Split processor 생성하기

### 파이프라인 생성
PUT http://localhost:9200/_ingest/pipeline/user-split
Content-Type: application/json

{
  "description" : "Split user field",
  "processors" : [
    {
      "split" : {
        "field" : "users",
        "separator": ","
      }
    }
  ]
}

user-split 파이프라인 파라미터

  • split : 파이프라인에는 여러 프로세서를 등록할 수 있는데 이번 실습에서는 split 프로세서만 선언하였습니다.
  • split.field : 데이터를 분할할 필드명을 정의합니다.
  • split.separator: 데이터를 분할하는 기준이 되는 문자 캐릭터를 정의합니다.

파이프라인 정상동작 테스트

Elasticsearch에서는 simulate라는 기능을 제공하여 해당 파이프라인의 동작을 엔지니어의 생각대로 동작하는지 테스트를 해볼 수 있습니다.

### 파이프라인 시뮬레이터
POST http://localhost:9200/_ingest/pipeline/_simulate
Content-Type: application/json

{
  "pipeline": {
    "description" : "Split user field",
    "processors" : [
      {
        "split" : {
          "field" : "users",
          "separator": ","
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "users": "user1,user2,user3"
      }
    }
  ]
}

실제 데이터 인덱싱을 통한 결과 확인

파이프라인 테스트가 생각대로 정상적이었다면 이제 실제 데이터를 인덱싱하여 파이프라인이 정상적으로 동작하는지 실습해 보겠습니다.

### 데이터 삽입1
POST http://localhost:30001/user-connected-log/_doc/1?pipeline=user-split
Content-Type: application/json

{
  "level": "info",
  "users": "user1,user2,user3",
  "message": "user connected"
}

### 데이터 삽입2
POST http://localhost:30001/user-connected-log/_doc/2?pipeline=user-split
Content-Type: application/json

{
  "level": "info",
  "users": "user3,user4,user5",
  "message": "user connected"
}

결과 확인

doc_id 1에서는 user1, user2, user3의 데이터가 인덱싱 되었고 doc_id 2에서는 user3, user4, user5의 데이터가 인덱싱 되었습니다. 여기서 users에 user1로만 검색하면 doc_id 1의 데이터만 결과로 추력 되는 것을 확인할 수 있습니다.

### 인덱스 검색
GET http://localhost:30001/user-connected-log/_search
Content-Type: application/json

{
  "query": {
    "bool": {
      "must": [
        {
          "match": {
            "users": "user1"
          }
        }
      ]
    }
  }
}


### 결과
{
  "took": 5,
  "timed_out": false,
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 1,
      "relation": "eq"
    },
    "max_score": 0.6931471,
    "hits": [
      {
        "_index": "user-connected-log",
        "_id": "1",
        "_score": 0.6931471,
        "_source": {
          "message": "user connected",
          "level": "info",
          "users": [
            "user1",
            "user2",
            "user3"
          ]
        }
      }
    ]
  }
}

 

'Elasticsearch > pipeline' 카테고리의 다른 글

Elasticsearch Ingest pipeline json  (0) 2023.10.20
Elasticsearch Ingest pipeline enrich processor  (0) 2023.10.18

시작하기 앞서 Ingest Pipeline이란

Elasticsearch에서 Ingest Pipeline은 데이터를 색인하기 전에 전처리 및 변환 작업을 수행하는 개념입니다. Ingest Pipeline은 데이터를 가져와서 필요한 형식으로 변환하거나 데이터를 필터링하고 파싱 하는 등의 작업 및 데이터의 일관성을 유지하는데 매우 유용하여 효율적인 데이터 처리를 가능하게 하는 중요한 기능입니다.

그림에 보이는 것과 같이 데이터가 들어오면 Ingest pipeline을 통하여 순차적으로 프로세서가 실행되고 결과물은 해당 인덱스에 저장됩니다. Ingest Pipeline이 동작할 수 있도록 해당 노드에는 ingest role이 등록되어야 하지만 일반적으로 데이터 노드에 많이 사용합니다. 허나 프로세서가 많아 부하를 많이 받을 경우 노드를 별도로 분리하는 방법을 고려해 볼 필요성이 있습니다.

이번 문서에서는 여러 유용한 파이프라인 프로세서 중 Enrichment에 대해서 실습해봤습니다.

Enrich Processor

RDB에서 간단한 기능인 JOIN이라는 기능을 Elasticsearch에서 역색인 구조를 가지고 있는 Elasticsearch에서는 구현하는데는 아주 많은 리소스와 페널티가 있습니다. 그렇기 때문에 JOIN을 수행하는 행위는 Elasticsearch에서 권장하지 않고 있습니다.

Elasticsearch의 Enrich Processor는 Ingest pipeline의 기능중 하나로 외부 데이터 소스를 활용하여 기존 인덱스 데이터를 이용하여 추가 정보를 추출하거나 결합할 수 있는 기능을 제공하여 RDB에 JOIN과 비슷한 기능을 가능하게 되었습니다.

이러한 Enrich processor를 이용하여 데이터를 추가하는 방법을 실습해봤습니다.

작업 내용

미리 저장된 my-service 인덱스의 데이터를 enrich.field를 기반으로 추출하여 my-log 인덱스가 인덱싱 될 때 같이 데이터가 기록되도록 구현하였습니다.

소스 인덱스 생성하기

먼저 Enrich policy를 생성하기 전에 데이터를 추가하기 위한 소스 인덱스를 생성하고 데이터를 미리 삽입하는 작업을 수행하겠습니다.

### 서비스 인덱스 생성
PUT http://localhost:9200/my-service
Content-Type: application/json

{
  "settings": {
    "index": {
      "number_of_shards": 4,
      "number_of_replicas": 1,
      "refresh_interval": "5s"
    }
  },
  "mappings": {
    "properties": {
      "service": { "type": "keyword" },
      "endpoint": { "type": "keyword" },
      "description": { "type": "text" }
    }
  }
}

### 서비스 데이터 삽입1
PUT http://localhost:9200/my-service/_doc/1?refresh=wait_for
Content-Type: application/json

{
  "service": "payment",
  "endpoint": "localhot:32000",
  "description": "This service is for selling products"
}

### 서비스 데이터 삽입2
PUT http://localhost:9200/my-service/_doc/2?refresh=wait_for
Content-Type: application/json

{
  "service": "login",
  "endpoint": "localhot:31000",
  "description": "This service is for users to login"
}

Enrich processor 생성 및 실행

### 엔리치 생성
PUT http://localhost:9200/_enrich/policy/my-enrich
Content-Type: application/json

{
  "match": {
    "indices": "my-service",
    "match_field": "service",
    "enrich_fields": ["endpoint", "description"]
  }
}

### 엔리치 실행
POST http://localhost:9200/_enrich/policy/my-enrich/_execute

enrich policy 생성 파라미터

  • match.indices : 데이터를 추출할 소스 인덱스를 지정합니다.
  • match.match_field : 소스 인덱스에 서로 매칭할 필드를 지정합니다.
  • match.enrich_fields : 매칭 후 추출될 데이터 필드를 지정합니다.

파이프라인 생성

파이프라인을 생성할 때는 processor를 리스트 형식으로 생성하게 되는데 리스트의 순서대로 processor가 실행됩니다. 

### 파이프라인 생성
PUT http://localhost:9200/_ingest/pipeline/service_join
Content-Type: application/json

{
  "description" : "Service information using enrich",
  "processors" : [
    {
      "enrich" : {
        "policy_name": "my-enrich",
        "field" : "service",
        "target_field": "service_info",
        "max_matches": "1"
      }
    }
  ]
}

파이프라인 생성 파라미터

  • enrich.policy_name : 위에서 생성한 enrich policy
  • enrich.field : my-log인덱스에서 매칭할 필드를 지정합니다.
  • enrich.target_field : enrich policy에서 추출된 (endpoint, description) 데이터를 넣을 필드를 설정합니다.
  • enrich.max_matches : 최대로 매칭할 수 있는 개수를 설정합니다. 간단한 테스트이므로 1로 설정하였습니다.

my-log 인덱스 생성 및 데이터 삽입

데이터가 추가될 때 pipeline을 명시하여 위에서 설정한 Ingest pipeline이 동작할 수 있도록 합니다.

PUT http://localhost:9200/my-log
Content-Type: application/json

{
  "settings": {
    "index": {
      "number_of_shards": 4,
      "number_of_replicas": 1,
      "refresh_interval": "5s"
    }
  },
  "mappings": {
    "properties": {
      "service": { "type": "keyword" },
      "level": { "type": "keyword" },
      "message": {"type": "text" }
    }
  }
}

### 로그 데이터 삽입1
PUT http://localhost:9200/my-log/_doc/1?pipeline=service_join
Content-Type: application/json

{
  "service": "login",
  "level": "debug",
  "message": "user A logged in successfully"
}

### 로그 데이터 삽입2
PUT http://localhost:9200/my-log/_doc/2?pipeline=service_join
Content-Type: application/json

{
"service": "login",
"level": "error",
"message": "user B login failed"
}

### 로그 데이터 삽입3
PUT http://localhost:9200/my-log/_doc/3?pipeline=service_join
Content-Type: application/json

{
  "service": "payment",
  "level": "debug",
  "message": "user a successfully paid for the product"
}

실제로 우리는 my-log 인덱스에 데이터를 넣을 때 service, level, message 데이터만 인덱싱하였습니다. 이제 결과물을 확인하면 enrich를 통하여 새롭게 정의된 service_info 필드를 통해 service의 데이터가 추가된 것을 확인할 수 있습니다.

GET http://localhost:9200/my-log/_search

### 결과
{
  "took": 8,
  "timed_out": false,
  "_shards": {
    "total": 4,
    "successful": 4,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 3,
      "relation": "eq"
    },
    "max_score": 1.0,
    "hits": [
      {
        "_index": "my-log",
        "_id": "1",
        "_score": 1.0,
        "_source": {
          "service_info": {
            "description": "This service is for users to login",
            "endpoint": "localhot:30000",
            "service": "login"
          },
          "message": "user A logged in successfully",
          "level": "debug",
          "service": "login"
        }
      },
      {
        "_index": "my-log",
        "_id": "2",
        "_score": 1.0,
        "_source": {
          "service_info": {
            "description": "This service is for users to login",
            "endpoint": "localhot:30000",
            "service": "login"
          },
          "message": "user B login failed",
          "level": "error",
          "service": "login"
        }
      },
      {
        "_index": "my-log",
        "_id": "3",
        "_score": 1.0,
        "_source": {
          "service_info": {
            "description": "This service is for selling products",
            "endpoint": "localhot:30001",
            "service": "payment"
          },
          "message": "user a successfully paid for the product",
          "level": "debug",
          "service": "payment"
        }
      }
    ]
  }
}

 

참조

https://www.elastic.co/guide/en/elasticsearch/reference/current/ingest.html

 

Ingest pipelines | Elasticsearch Guide [8.10] | Elastic

Ingest pipelines let you perform common transformations on your data before indexing. For example, you can use pipelines to remove fields, extract values from text, and enrich your data. A pipeline consists of a series of configurable tasks called processo

www.elastic.co

https://www.elastic.co/guide/en/elasticsearch/reference/current/enrich-setup.html

 

Set up an enrich processor | Elasticsearch Guide [8.10] | Elastic

The enrich processor performs several operations and may impact the speed of your ingest pipeline. We strongly recommend testing and benchmarking your enrich processors before deploying them in production. We do not recommend using the enrich processor to

www.elastic.co

https://www.elastic.co/guide/en/elasticsearch/reference/current/put-enrich-policy-api.html

 

Create enrich policy API | Elasticsearch Guide [8.10] | Elastic

Once created, you can’t update or change an enrich policy. Instead, you can: Create and execute a new enrich policy. Replace the previous enrich policy with the new enrich policy in any in-use enrich processors. Use the delete enrich policy API to delete

www.elastic.co

 

'Elasticsearch > pipeline' 카테고리의 다른 글

Elasticsearch Ingest pipeline json  (0) 2023.10.20
Elasticsearch Ingest pipeline split processor  (0) 2023.10.19

+ Recent posts