elasticsearch-rubyでAWSのElasticsearch Serviceで構築したElasticsearchに接続したときにはまった

https://dev-nabewata07.hatenablog.com/entry/2019/08/11/183106

上記記事の「アクセスポリシーで権限を設定する」までの手順でドメインの作成、アクセスポリシーの設定を行ったあとにRubyelasticsearchfaraday_middleware/aws_sigv4というgemを使ってAWSのElasticsearch Serviceで構築したElasticsearchに接続したときにハマったので対応を記録する。

該当のGem

https://rubygems.org/gems/elasticsearch

https://rubygems.org/gems/faraday_middleware-aws-sigv4

環境

Rubyのバージョン

$ ruby -v
ruby 2.6.3p62 (2019-04-16 revision 67580) [x86_64-darwin18]

各Gemのバージョン

aws-sigv4 (1.1.0)
elasticsearch (7.3.0)
elasticsearch-api (7.3.0)
elasticsearch-transport (7.3.0)

結論から

接続先のポート番号が9200以外のときはElasticsearch::Client.newでクライアントを作るときは引数でportも指定する必要がある。

具体的な理由は今回のelasticsearch-rubyGemではデフォルトのポート番号が9200で設定されるがAWSのElasticsearch Serviceで作成したドメインにアクセスするときは80または443番ポートでアクセスする必要があるため。

スクリプトは下記のように記述した。

host = ENV['ES_HOST']
port = 443
client = Elasticsearch::Client.new(host: host, port: port) do |f|
  f.request :aws_sigv4,
    service: service,
    region: region,
    access_key_id: ENV['AWS_ACCESS_KEY_ID'],
    secret_access_key: ENV['AWS_SECRET_ACCESS_KEY']
  f.adapter Faraday.default_adapter
end

最初に記述したスクリプト

require 'elasticsearch'
require 'faraday_middleware/aws_sigv4'

require 'dotenv'
Dotenv.load

# e.g. https://my-domain.region.es.com
host = ENV['ES_HOST']

region = 'ap-northeast-1'
service = 'es'

client = Elasticsearch::Client.new(host: host) do |f|
  f.request :aws_sigv4,
    service: service,
    region: region,
    access_key_id: ENV['AWS_ACCESS_KEY_ID'],
    secret_access_key: ENV['AWS_SECRET_ACCESS_KEY']
  f.adapter Faraday.default_adapter
end

puts client.cat.indices

実行すると

$ bundle exec ruby sample.rb
...
/path/to/ruby/2.6.0/net/http.rb:947:in `initialize': execution expired (Net::OpenTimeout)
...
/path/to/ruby/2.6.0/net/http.rb:947:in `initialize': execution expired (Faraday::ConnectionFailed)

上記のように例外が発生して失敗した。

対応

調査したところelasticsearch-transportのドキュメントにDefult Portの項目があり、別のポート番号の場合は指定するように書いてあった。

https://github.com/elastic/elasticsearch-ruby/tree/master/elasticsearch-transport#default-port

The default port is 9200. Please specify a port for your host(s) if they differ from this default. Please see below for an exception to this when connecting using an Elastic Cloud ID.

もとにしているGemであるelasticsearch-rubyのドキュメントにも

https://github.com/elastic/elasticsearch-ruby

Usageの項目に

https://github.com/elastic/elasticsearch-ruby#usage

このGemはelasticsearch-transportelasticsearch-apiの2つのライブラリのラッパーである旨と

Both of these libraries are extensively documented. Please read the elasticsearch-transport and the elasticsearch-api documentation carefully.

のようにそちらのドキュメントを読むように記述してあった。

具体的には結論からで書いたようにportを明示的に指定した。

まとめ

当たり前だけどライブラリを使うときはドキュメントを注意深く読む。

心がけていたつもりだったけど時間がないときなどはおろそかにしがちなのでいい反省になった。

Elasticsearchでindexを作成してからbulk insert, aliasの切り替えまで

Elasticsearchでindexを作成してデータをbulk insertで投入することと、新しくインデックスを作成してエイリアスを切り替えることでアトミックにESのデータを入れ替える作業をするときの手順。

AWSのElasticsearch Serviceを使った。

ESへのリクエストはmacOSからcurlコマンドを使った。

aws cliのバージョン

$ aws --version
aws-cli/1.11.13 Python/2.7.10 Darwin/18.6.0 botocore/1.4.70

Elasticsearch Serviceでドメインを作成する

EBS Optionやインスタンスタイプを指定する必要がある。

$ aws es create-elasticsearch-domain --domain-name nabewata07-es-test --elasticsearch-cluster-config InstanceType=t2.small.elasticsearch --elasticsearch-version 6.2 --ebs-options EBSEnabled=true,VolumeType=standard,VolumeSize=10
{
    "DomainStatus": {
        "ElasticsearchClusterConfig": {
            "DedicatedMasterEnabled": false,
            "InstanceCount": 1,
            "ZoneAwarenessEnabled": false,
            "InstanceType": "t2.small.elasticsearch"
        },
        "DomainId": "xxxxxxxxxxxx/nabewata07-es-test",
        "Created": true,
        "Deleted": false,
        "EBSOptions": {
            "VolumeSize": 10,
            "VolumeType": "standard",
            "EBSEnabled": true
        },
        "Processing": true,
        "DomainName": "nabewata07-es-test",
        "SnapshotOptions": {
            "AutomatedSnapshotStartHour": 0
        },
        "ElasticsearchVersion": "6.2",
        "AccessPolicies": "",
        "AdvancedOptions": {
            "rest.action.multi.allow_explicit_index": "true"
        },
        "ARN": "arn:aws:es:ap-northeast-1:xxxxxxxxxxxx:domain/nabewata07-es-test"
    }
}

アクセスポリシーで権限を設定する

Createしたときに設定を忘れていてaws es update-elasticsearch-domain-configで設定した。

$ aws es update-elasticsearch-domain-config --domain-name nabewata07-es-test --access-policies '
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": "*"
      },
      "Action": [
        "es:*"
      ],
      "Condition": {
        "IpAddress": {
          "aws:SourceIp": [
            "xxx.xxx.xxx.xxx"
          ]
        }
      },
      "Resource": "arn:aws:es:ap-northeast-1:xxxxxxxxxxxx:domain/nabewata07-es-test/*"
    },
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": [
          "arn:aws:iam::xxxxxxxxxxxx:user/nabewata07"
        ]
      },
      "Action": [
        "es:*"
      ],
      "Resource": "arn:aws:es:ap-northeast-1:xxxxxxxxxxxx:domain/nabewata07-es-test/*"
    }
  ]
}
'

作業するために各環境変数を設定する

$ ES_ENDPOINT=$(aws es describe-elasticsearch-domain --domain-name nabewata07-es-test --query "DomainStatus.Endpoint" --output text) && echo $ES_ENDPOINT
search-nabewata07-es-test-xxxxxxxxxxxxxxxxxxxxxxxxxx.ap-northeast-1.es.amazonaws.com
$ export INDEX='sample_index'

具体的なESの操作はここから。

indexを作成する

https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-create-index.html

$ curl -X PUT "${ES_ENDPOINT}/${INDEX}"
{"acknowledged":true,"shards_acknowledged":true,"index":"sample_index"}

mappingを定義する

mappingとは、ドキュメントにどのようなフィールドがありそれらのフィールドがどのような型なのかを定義するもの。

https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping.html

例えば下記のmappingを定義してmapping.jsonという名前で保存する。

{
  "properties": {
    "title":    { "type": "text" },
    "name":     { "type": "text" },
    "age":      { "type": "integer" },
    "created":  {
      "type":   "date",
      "format": "strict_date_optional_time||epoch_millis"
    }
  }
}

dateのフォーマットは

format | Elasticsearch Reference [7.3] | Elastic

を参照。

mappingを反映する

先に作成したmappingのファイルを使って反映する。

$ curl -XPUT "${ES_ENDPOINT}/${INDEX}/_mapping/_doc" -H 'Content-Type: application/json' --data-binary @mapping.json
{"acknowledged":true}

データを投入する

Bulk APIを使用する。

Bulk API | Elasticsearch Reference [7.3] | Elastic

ドキュメントをまとめて登録することができる。

投入するデータを準備する

下記のようなJSONファイルを作成する

{"index": {"_index":"sample_index", "_type":"_doc", "_id":"1"}}
{"title": "title0", "name":"name0", "age":10, "created":"2019-08-01"}
{"index": {"_index":"sample_index", "_type":"_doc", "_id":"2"}}
{"title": "title1", "name":"name1", "age":11, "created":"2019-08-01"}

bulk_insert.jsonという名前で保存する。

Bulk APIでデータを投入する

$ curl -s -H "Content-Type: application/x-ndjson" -XPOST "${ES_ENDPOINT}/_bulk" --data-binary @bulk_insert.json
{"took":160,"errors":false,"items":[{"index":{"_index":"sample_index","_type":"_doc","_id":"1","_version":1,"result":"created","_shards":{"total":2,"successful":1,"failed":0},"_seq_no":0,"_primary_term":1,"status":201}},{"index":{"_index":"sample_index","_type":"_doc","_id":"2","_version":1,"result":"created","_shards":{"total":2,"successful":1,"failed":0},"_seq_no":0,"_primary_term":1,"status":201}}]}

結果を確認する

インデックスに入っているドキュメントの数を数えて確認する

$ curl -X GET "${ES_ENDPOINT}/${INDEX}/_count"
{"count":2,"_shards":{"total":5,"successful":5,"skipped":0,"failed":0}}%

aliasを設定する

POSTでaliasを作成、反映する。

$ curl -XPOST "${ES_ENDPOINT}/${INDEX}/_alias/sample_alias"
{"acknowledged":true}

aliasを確認する

$ curl -X GET "${ES_ENDPOINT}/_cat/aliases?v"
alias        index        filter routing.index routing.search
sample_alias sample_index -      -             -

aliasを切り替える

indexをもう一つ作成する

$ export NEW_INDEX=sample_index01
$ curl -X PUT "${ES_ENDPOINT}/${NEW_INDEX}"
{"acknowledged":true}

新しいインデックスにおなじmappingを反映する

先に作成したmappingのファイルを使って反映する。

$ curl -XPUT "${ES_ENDPOINT}/${NEW_INDEX}/_mapping/_doc" -H 'Content-Type: application/json' --data-binary @mapping.json
{"acknowledged":true}

新しいインデックスに投入するデータを準備する

下記のようなJSONファイルを作成する。

ドキュメントの数を3つにした。

{"index": {"_index":"sample_index01", "_type":"_doc", "_id":"1"}}
{"title": "title2", "name":"name2", "age":12, "created":"2019-08-01"}
{"index": {"_index":"sample_index01", "_type":"_doc", "_id":"2"}}
{"title": "title3", "name":"name1", "age":13, "created":"2019-08-01"}
{"index": {"_index":"sample_index01", "_type":"_doc", "_id":"3"}}
{"title": "title4", "name":"name1", "age":14, "created":"2019-08-01"}

bulk_insert01.jsonという名前で保存する。

Bulk APIでデータを投入する

$ curl -s -H "Content-Type: application/x-ndjson" -XPOST "${ES_ENDPOINT}/_bulk" --data-binary @bulk_insert01.json
{"took":14,"errors":false,"items":[{"index":{"_index":"sample_index01","_type":"_doc","_id":"1","_version":4,"result":"created","_shards":{"total":2,"successful":1,"failed":0},"_seq_no":3,"_primary_term":1,"status":201}},{"index":{"_index":"sample_index01","_type":"_doc","_id":"2","_version":5,"result":"created","_shards":{"total":2,"successful":1,"failed":0},"_seq_no":4,"_primary_term":1,"status":201}},{"index":{"_index":"sample_index01","_type":"_doc","_id":"3","_version":3,"result":"created","_shards":{"total":2,"successful":1,"failed":0},"_seq_no":2,"_primary_term":1,"status":201}}]}

aliasを切り替える

$ curl -X POST "${ES_ENDPOINT}/_aliases?pretty" -H 'Content-Type: application/json' -d"
{
    \"actions\" : [
        { \"remove\" : { \"index\" : \"${INDEX}\", \"alias\" : \"sample_alias\" } },
        { \"add\" : { \"index\" : \"${NEW_INDEX}\", \"alias\" : \"sample_alias\" } }
    ]
}
"
{
  "acknowledged" : true
}

aliasを確認する

$ curl -X GET "${ES_ENDPOINT}/_cat/aliases?v"
alias        index          filter routing.index routing.search
sample_alias sample_index01 -      -             -

sample_aliasの向き先がsample_index01に変更されている。

INSERT結果を確認する

インデックスに入っているドキュメントの数を数えて確認する

$ curl -X GET "${ES_ENDPOINT}/sample_alias/_count"
{"count":3,"_shards":{"total":5,"successful":5,"skipped":0,"failed":0}}

ドキュメントIDを指定して取得したいとき

$ curl -X GET "${ES_ENDPOINT}/sample_alias/_doc/1"
{"_index":"sample_index01","_type":"_doc","_id":"1","_version":4,"found":true,"_source":{"title": "title2", "name":"name2", "age":12, "created":"2019-08-01"}}

削除するときはメソッドをDELETEにする。

その他の操作

上記の作業以外にも実施しそうなコマンドを記録しておく。

インデックスを削除する

$ curl -X DELETE "${ES_ENDPOINT}/${INDEX}"

mappingを確認する

$ curl -X GET "${ES_ENDPOINT}/${INDEX}/_mapping"
{"sample_index":{"mappings":{"_doc":{"properties":{"age":{"type":"integer"},"created":{"type":"date"},"name":{"type":"text"},"title":{"type":"text"}}}}}}

すべてのindexを列挙する

curl -X GET "${ES_ENDPOINT}/_cat/indices?v"

そんなに高頻度でする作業ではないので手順をまとめたけどすべてcurlで行っているので、次はスクリプトで実現したい。

S3にオブジェクトを作成したイベントをトリガーにしてSNS経由でLambda関数を呼び出す

S3にオブジェクトを作成した際にPUTやPOSTのイベントが発生する。

それをトリガーにしてSNS経由でLambda関数を呼び出し、作成されたS3オブジェクトを取得して何らかの処理をするための各種設定とLambda関数の作成を行った。

作業した環境

Macからaws-cliで作業する。バージョンは

$ aws --version
aws-cli/1.11.13 Python/2.7.10 Darwin/18.6.0 botocore/1.4.70

以下、設定の作業内容。

必要なリソースの作成

S3バケットを作る

$ aws s3 mb s3://nabewata07-event-test00 --region ap-northeast-1
make_bucket: nabewata07-event-test00

SNS Topicを作成する

$ aws sns create-topic --name event-test-topic01 --region ap-northeast-1
{
    "TopicArn": "arn:aws:sns:ap-northeast-1:xxxxxxxxxxxxx:event-test-topic01"
}

SNSトピックをトリガーにしたLambda関数を作成する

イベントを発行したオブジェクトのContentTypeを表示する関数

console.log('Loading function');

const aws = require('aws-sdk');

const s3 = new aws.S3({ apiVersion: '2006-03-01' });

exports.handler = async (event, context) => {
    // Get the object from the event and show its content type
    const jsonStr = event.Records[0].Sns.Message;
    const obj = JSON.parse(jsonStr)
    const target = obj.Records[0].s3;
    const bucket = target.bucket.name;
    const key = decodeURIComponent(target.object.key.replace(/\+/g, ' '));
    const params = {
        Bucket: bucket,
        Key: key,
    };
    try {
        const { ContentType, Body } = await s3.getObject(params).promise();
        console.log('CONTENT TYPE:', ContentType);
        // do something with content body
        //console.log('Body:', Body.toString());
        return ContentType;
    } catch (err) {
        console.log(err);
        throw new Error(message);
    }
};

作成したコードをzip圧縮する

デプロイのため。

$ zip index.js.zip index.js
  adding: index.js (deflated 47%)

Lambda関数を作成する

$ aws lambda create-function --function-name s3GetObjectSample --runtime nodejs10.x --role arn:aws:iam::xxxxxxxxxxxx:role/service-role/LambdaS3FuncRole --handler index.handler --zip-file fileb://index.js.zip --region ap-northeast-1
{
    "CodeSha256": "78l2lot/Qie5DQX2R7j1PMXLbiNivMToAaVbny7OQpQ=",
    "FunctionName": "s3GetObjectSample",
    "CodeSize": 635,
    "MemorySize": 128,
    "FunctionArn": "arn:aws:lambda:ap-northeast-1:xxxxxxxxxx:function:s3GetObjectSample",
    "Version": "$LATEST",
    "Role": "arn:aws:iam::xxxxxxxxx:role/service-role/LambdaS3FuncRole",
    "Timeout": 3,
    "LastModified": "2019-07-28T23:34:32.759+0000",
    "Handler": "index.handler",
    "Runtime": "nodejs10.x",
    "Description": ""
}

Lambda関数のポリシーに必要な権限

LambdaS3FuncRoleには以下のポリシー設定が必要。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:GetObject"
            ],
            "Resource": "arn:aws:s3:::nabewata07-event-test00/upload/sampledata/*"
        },
        {
            "Effect": "Allow",
            "Action": "logs:CreateLogGroup",
            "Resource": "arn:aws:logs:ap-northeast-1:xxxxxxxxxxxxxx:*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": [
                "arn:aws:logs:ap-northeast-1:xxxxxxxxxxxxxx:log-group:/aws/lambda/s3GetObjectSample:*"
            ]
        }
    ]
}

SNS TopicからLambda関数を呼び出す許可を設定する

Lambda関数のFunction policyで設定する。

$ aws lambda add-permission --function-name s3GetObjectSample --statement-id invoke-from-sns-topic00 --action lambda:InvokeFunction --principal sns.amazonaws.com --source-arn arn:aws:sns:ap-northeast-1:xxxxxxxxxxxx:event-test-topic01
{
    "Statement": "{\"Sid\":\"invoke-from-sns-topic00\",\"Effect\":\"Allow\",\"Principal\":{\"Service\":\"sns.amazonaws.com\"},\"Action\":\"lambda:InvokeFunction\",\"Resource\":\"arn:aws:lambda:ap-northeast-1:325528992442:function:s3GetObjectSample\",\"Condition\":{\"ArnLike\":{\"AWS:SourceArn\":\"arn:aws:sns:ap-northeast-1:325528992442:event-test-topic01\"}}}"
}

LambdaがSNSトピックをサブスクライブする設定を行う

--topic-arnは先に作成したSNS TopicのARNを指定。

--notification-endpointには先に作成したLambda関数のARNを指定。

$ aws sns subscribe --protocol lambda \
--topic-arn arn:aws:sns:ap-northeast-1:325528992442:event-test-topic01 \
--notification-endpoint arn:aws:lambda:ap-northeast-1:325528992442:function:s3GetObjectSample \
--region ap-northeast-1

{
    "SubscriptionArn": "arn:aws:sns:ap-northeast-1:325528992442:event-test-topic01:af6d12be-b9ed-40f3-9041-49c59fa3360f"
}

SNS TopicがS3から呼び出される許可をする

下記のStatementをSNSのアクセスポリシーに追加する。

{
    "Sid": "Stmt1561879283049",
    "Effect": "Allow",
    "Principal": "*",
    "Action": "sns:Publish",
    "Resource": "arn:aws:sns:ap-northeast-1:xxxxxxxxxxxx:event-test-topic01",
    "Condition": {
        "ArnEquals": {
            "aws:SourceArn": "arn:aws:s3:::nabewata07-event-test00"
        }
    }
}

これはCLIから実行する方法がわからなかったのでマネジメントコンソールのGUIから行った。

S3バケットにイベントの設定を行う

設定用のJSONファイルを作る

対象のバケットupload/sampledataというプレフィックスかつ.csvというサフィックスが付いたオブジェクトがPUTまたはPOSTによって作成されたときに先ほど作成したSNSトピックに通知する設定にした。

{
    "TopicConfigurations": [
        {
            "Filter": {
                "Key": {
                    "FilterRules": [
                        {
                            "Name": "Prefix",
                            "Value": "upload/sampledata"
                        },
                        {
                            "Name": "Suffix",
                            "Value": ".csv"
                        }
                    ]
                }
            },
            "Id": "event-test-topic01",
            "TopicArn": "arn:aws:sns:ap-northeast-1:xxxxxxxxxxxxx:event-test-topic01",
            "Events": [
                "s3:ObjectCreated:Put",
                "s3:ObjectCreated:Post"
            ]
        }
    ]
}

これをs3_put_notification.jsonというファイル名で保存した。

S3バケットにイベントの設定を反映する

$ aws s3api put-bucket-notification-configuration --bucket nabewata07-event-test00 --notification-configuration file://s3_put_notification.json

Lambda関数を呼び出してみる

S3にオブジェクトをアップロードする

$ touch test.csv
$ aws s3 cp ./test.csv s3://nabewata07-event-test00/upload/sampledata/
upload: ./test.csv to s3://nabewata07-event-test00/upload/sampledata/test.csv

CloudWatchLogsでログが確認できればOK

Lambdaが/aws/lambda/s3GetObjectSampleというロググループを作成するため、そこで確認した。

INFO CONTENT TYPE: text/csv

所感

それぞれのリソースを作ってからそれらの連携の設定と、連携を許可する権限の設定があり少しややこしい。

連携の設定はしたけど許可の設定を忘れていて実行できないなどが発生しないためにもここに書いてある各設定を忘れないようにしたい。

rubyのfakerでテストデータ作成

名前、Eメールアドレス、適当な文字列などでテストデータを作成したいときにRubyのfakerというGemを使った。

PythonのライブラリやWeb上で公開されているツールなどもあるが、自分で柔軟に出力をカスタマイズしたいときはWeb上のツールでは不十分なのと自分が書きやすい言語ということでRubyのGemを採用した。

使ったGem

github.com

機能

テストデータの種類(名前とか住所とか)をひとまとまりにしたGeneratorという単位がある。 例えばDefault GeneratorにはAddress, Date, Job, Nameなどがあり、Japanese Mediaという漫画のキャラクター名を出力するものもある。

READMEにもある通り機能はかなり豊富でDefault Generatorだけでもたいていのテストデータは作れそうに見える。

今回データを作るときに使ったコードは下記のようなものになった。

氏名、ランダムな文字列、単語、日付 のサンプルデータでCSVファイルを作成した。

require 'faker'

Faker::Config.locale = 'ja'

File.open('./sample_data01.csv', 'w') do |f|
  100.times do |i|
    name = Faker::Name.name
    str = Faker::Lorem.characters(30)
    word = Faker::Lorem.word
    date = Faker::Date.backward

    str = "#{name},#{str},#{word},#{date}"
    f.puts str
  end
end

これで100行のCSVファイルが作成できる。

名前の部分は

https://github.com/stympy/faker/blob/be75ff9a2c208e57a95bf836c400513a13cfb67f/lib/locales/ja.yml#L68

のYmlファイルからとってきているみたいだった。

作成されたデータの一部。

森田 誠,x8e2cojfg4xlhibhggyfzpsdec2t0t,しょくん,2019-01-03
小島 颯,pg6wy8ntlsqi09wekjnj1kqn6twj17,勇気,2018-08-06
菊地 大地,km0ruz1urb86yju24pwozbcv11ilf0,明治,2018-12-20
...

ちょっと適当なデータで処理時間やメモリ消費の計測をやってみたいときとか、大きめのデータサイズのファイルを処理するときの軽い検証などに便利に使えそう。