Elasticsearchでindexを作成してからbulk insert, aliasの切り替えまで

Elasticsearchでindexを作成してデータをbulk insertで投入することと、新しくインデックスを作成してエイリアスを切り替えることでアトミックにESのデータを入れ替える作業をするときの手順。

AWSのElasticsearch Serviceを使った。

ESへのリクエストはmacOSからcurlコマンドを使った。

aws cliのバージョン

$ aws --version
aws-cli/1.11.13 Python/2.7.10 Darwin/18.6.0 botocore/1.4.70

Elasticsearch Serviceでドメインを作成する

EBS Optionやインスタンスタイプを指定する必要がある。

$ aws es create-elasticsearch-domain --domain-name nabewata07-es-test --elasticsearch-cluster-config InstanceType=t2.small.elasticsearch --elasticsearch-version 6.2 --ebs-options EBSEnabled=true,VolumeType=standard,VolumeSize=10
{
    "DomainStatus": {
        "ElasticsearchClusterConfig": {
            "DedicatedMasterEnabled": false,
            "InstanceCount": 1,
            "ZoneAwarenessEnabled": false,
            "InstanceType": "t2.small.elasticsearch"
        },
        "DomainId": "xxxxxxxxxxxx/nabewata07-es-test",
        "Created": true,
        "Deleted": false,
        "EBSOptions": {
            "VolumeSize": 10,
            "VolumeType": "standard",
            "EBSEnabled": true
        },
        "Processing": true,
        "DomainName": "nabewata07-es-test",
        "SnapshotOptions": {
            "AutomatedSnapshotStartHour": 0
        },
        "ElasticsearchVersion": "6.2",
        "AccessPolicies": "",
        "AdvancedOptions": {
            "rest.action.multi.allow_explicit_index": "true"
        },
        "ARN": "arn:aws:es:ap-northeast-1:xxxxxxxxxxxx:domain/nabewata07-es-test"
    }
}

アクセスポリシーで権限を設定する

Createしたときに設定を忘れていてaws es update-elasticsearch-domain-configで設定した。

$ aws es update-elasticsearch-domain-config --domain-name nabewata07-es-test --access-policies '
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": "*"
      },
      "Action": [
        "es:*"
      ],
      "Condition": {
        "IpAddress": {
          "aws:SourceIp": [
            "xxx.xxx.xxx.xxx"
          ]
        }
      },
      "Resource": "arn:aws:es:ap-northeast-1:xxxxxxxxxxxx:domain/nabewata07-es-test/*"
    },
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": [
          "arn:aws:iam::xxxxxxxxxxxx:user/nabewata07"
        ]
      },
      "Action": [
        "es:*"
      ],
      "Resource": "arn:aws:es:ap-northeast-1:xxxxxxxxxxxx:domain/nabewata07-es-test/*"
    }
  ]
}
'

作業するために各環境変数を設定する

$ ES_ENDPOINT=$(aws es describe-elasticsearch-domain --domain-name nabewata07-es-test --query "DomainStatus.Endpoint" --output text) && echo $ES_ENDPOINT
search-nabewata07-es-test-xxxxxxxxxxxxxxxxxxxxxxxxxx.ap-northeast-1.es.amazonaws.com
$ export INDEX='sample_index'

具体的なESの操作はここから。

indexを作成する

https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-create-index.html

$ curl -X PUT "${ES_ENDPOINT}/${INDEX}"
{"acknowledged":true,"shards_acknowledged":true,"index":"sample_index"}

mappingを定義する

mappingとは、ドキュメントにどのようなフィールドがありそれらのフィールドがどのような型なのかを定義するもの。

https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping.html

例えば下記のmappingを定義してmapping.jsonという名前で保存する。

{
  "properties": {
    "title":    { "type": "text" },
    "name":     { "type": "text" },
    "age":      { "type": "integer" },
    "created":  {
      "type":   "date",
      "format": "strict_date_optional_time||epoch_millis"
    }
  }
}

dateのフォーマットは

format | Elasticsearch Reference [7.3] | Elastic

を参照。

mappingを反映する

先に作成したmappingのファイルを使って反映する。

$ curl -XPUT "${ES_ENDPOINT}/${INDEX}/_mapping/_doc" -H 'Content-Type: application/json' --data-binary @mapping.json
{"acknowledged":true}

データを投入する

Bulk APIを使用する。

Bulk API | Elasticsearch Reference [7.3] | Elastic

ドキュメントをまとめて登録することができる。

投入するデータを準備する

下記のようなJSONファイルを作成する

{"index": {"_index":"sample_index", "_type":"_doc", "_id":"1"}}
{"title": "title0", "name":"name0", "age":10, "created":"2019-08-01"}
{"index": {"_index":"sample_index", "_type":"_doc", "_id":"2"}}
{"title": "title1", "name":"name1", "age":11, "created":"2019-08-01"}

bulk_insert.jsonという名前で保存する。

Bulk APIでデータを投入する

$ curl -s -H "Content-Type: application/x-ndjson" -XPOST "${ES_ENDPOINT}/_bulk" --data-binary @bulk_insert.json
{"took":160,"errors":false,"items":[{"index":{"_index":"sample_index","_type":"_doc","_id":"1","_version":1,"result":"created","_shards":{"total":2,"successful":1,"failed":0},"_seq_no":0,"_primary_term":1,"status":201}},{"index":{"_index":"sample_index","_type":"_doc","_id":"2","_version":1,"result":"created","_shards":{"total":2,"successful":1,"failed":0},"_seq_no":0,"_primary_term":1,"status":201}}]}

結果を確認する

インデックスに入っているドキュメントの数を数えて確認する

$ curl -X GET "${ES_ENDPOINT}/${INDEX}/_count"
{"count":2,"_shards":{"total":5,"successful":5,"skipped":0,"failed":0}}%

aliasを設定する

POSTでaliasを作成、反映する。

$ curl -XPOST "${ES_ENDPOINT}/${INDEX}/_alias/sample_alias"
{"acknowledged":true}

aliasを確認する

$ curl -X GET "${ES_ENDPOINT}/_cat/aliases?v"
alias        index        filter routing.index routing.search
sample_alias sample_index -      -             -

aliasを切り替える

indexをもう一つ作成する

$ export NEW_INDEX=sample_index01
$ curl -X PUT "${ES_ENDPOINT}/${NEW_INDEX}"
{"acknowledged":true}

新しいインデックスにおなじmappingを反映する

先に作成したmappingのファイルを使って反映する。

$ curl -XPUT "${ES_ENDPOINT}/${NEW_INDEX}/_mapping/_doc" -H 'Content-Type: application/json' --data-binary @mapping.json
{"acknowledged":true}

新しいインデックスに投入するデータを準備する

下記のようなJSONファイルを作成する。

ドキュメントの数を3つにした。

{"index": {"_index":"sample_index01", "_type":"_doc", "_id":"1"}}
{"title": "title2", "name":"name2", "age":12, "created":"2019-08-01"}
{"index": {"_index":"sample_index01", "_type":"_doc", "_id":"2"}}
{"title": "title3", "name":"name1", "age":13, "created":"2019-08-01"}
{"index": {"_index":"sample_index01", "_type":"_doc", "_id":"3"}}
{"title": "title4", "name":"name1", "age":14, "created":"2019-08-01"}

bulk_insert01.jsonという名前で保存する。

Bulk APIでデータを投入する

$ curl -s -H "Content-Type: application/x-ndjson" -XPOST "${ES_ENDPOINT}/_bulk" --data-binary @bulk_insert01.json
{"took":14,"errors":false,"items":[{"index":{"_index":"sample_index01","_type":"_doc","_id":"1","_version":4,"result":"created","_shards":{"total":2,"successful":1,"failed":0},"_seq_no":3,"_primary_term":1,"status":201}},{"index":{"_index":"sample_index01","_type":"_doc","_id":"2","_version":5,"result":"created","_shards":{"total":2,"successful":1,"failed":0},"_seq_no":4,"_primary_term":1,"status":201}},{"index":{"_index":"sample_index01","_type":"_doc","_id":"3","_version":3,"result":"created","_shards":{"total":2,"successful":1,"failed":0},"_seq_no":2,"_primary_term":1,"status":201}}]}

aliasを切り替える

$ curl -X POST "${ES_ENDPOINT}/_aliases?pretty" -H 'Content-Type: application/json' -d"
{
    \"actions\" : [
        { \"remove\" : { \"index\" : \"${INDEX}\", \"alias\" : \"sample_alias\" } },
        { \"add\" : { \"index\" : \"${NEW_INDEX}\", \"alias\" : \"sample_alias\" } }
    ]
}
"
{
  "acknowledged" : true
}

aliasを確認する

$ curl -X GET "${ES_ENDPOINT}/_cat/aliases?v"
alias        index          filter routing.index routing.search
sample_alias sample_index01 -      -             -

sample_aliasの向き先がsample_index01に変更されている。

INSERT結果を確認する

インデックスに入っているドキュメントの数を数えて確認する

$ curl -X GET "${ES_ENDPOINT}/sample_alias/_count"
{"count":3,"_shards":{"total":5,"successful":5,"skipped":0,"failed":0}}

ドキュメントIDを指定して取得したいとき

$ curl -X GET "${ES_ENDPOINT}/sample_alias/_doc/1"
{"_index":"sample_index01","_type":"_doc","_id":"1","_version":4,"found":true,"_source":{"title": "title2", "name":"name2", "age":12, "created":"2019-08-01"}}

削除するときはメソッドをDELETEにする。

その他の操作

上記の作業以外にも実施しそうなコマンドを記録しておく。

インデックスを削除する

$ curl -X DELETE "${ES_ENDPOINT}/${INDEX}"

mappingを確認する

$ curl -X GET "${ES_ENDPOINT}/${INDEX}/_mapping"
{"sample_index":{"mappings":{"_doc":{"properties":{"age":{"type":"integer"},"created":{"type":"date"},"name":{"type":"text"},"title":{"type":"text"}}}}}}

すべてのindexを列挙する

curl -X GET "${ES_ENDPOINT}/_cat/indices?v"

そんなに高頻度でする作業ではないので手順をまとめたけどすべてcurlで行っているので、次はスクリプトで実現したい。