elasticsearch-rubyでAWSのElasticsearch Serviceで構築したElasticsearchに接続したときにはまった
https://dev-nabewata07.hatenablog.com/entry/2019/08/11/183106
上記記事の「アクセスポリシーで権限を設定する」までの手順でドメインの作成、アクセスポリシーの設定を行ったあとにRubyのelasticsearch
とfaraday_middleware/aws_sigv4
というgemを使ってAWSのElasticsearch Serviceで構築したElasticsearchに接続したときにハマったので対応を記録する。
該当のGem
https://rubygems.org/gems/elasticsearch
https://rubygems.org/gems/faraday_middleware-aws-sigv4
環境
Rubyのバージョン
$ ruby -v ruby 2.6.3p62 (2019-04-16 revision 67580) [x86_64-darwin18]
各Gemのバージョン
aws-sigv4 (1.1.0) elasticsearch (7.3.0) elasticsearch-api (7.3.0) elasticsearch-transport (7.3.0)
結論から
接続先のポート番号が9200
以外のときはElasticsearch::Client.new
でクライアントを作るときは引数でport
も指定する必要がある。
具体的な理由は今回のelasticsearch-ruby
Gemではデフォルトのポート番号が9200
で設定されるがAWSのElasticsearch Serviceで作成したドメインにアクセスするときは80または443番ポートでアクセスする必要があるため。
スクリプトは下記のように記述した。
host = ENV['ES_HOST'] port = 443 client = Elasticsearch::Client.new(host: host, port: port) do |f| f.request :aws_sigv4, service: service, region: region, access_key_id: ENV['AWS_ACCESS_KEY_ID'], secret_access_key: ENV['AWS_SECRET_ACCESS_KEY'] f.adapter Faraday.default_adapter end
最初に記述したスクリプト
require 'elasticsearch' require 'faraday_middleware/aws_sigv4' require 'dotenv' Dotenv.load # e.g. https://my-domain.region.es.com host = ENV['ES_HOST'] region = 'ap-northeast-1' service = 'es' client = Elasticsearch::Client.new(host: host) do |f| f.request :aws_sigv4, service: service, region: region, access_key_id: ENV['AWS_ACCESS_KEY_ID'], secret_access_key: ENV['AWS_SECRET_ACCESS_KEY'] f.adapter Faraday.default_adapter end puts client.cat.indices
実行すると
$ bundle exec ruby sample.rb ... /path/to/ruby/2.6.0/net/http.rb:947:in `initialize': execution expired (Net::OpenTimeout) ... /path/to/ruby/2.6.0/net/http.rb:947:in `initialize': execution expired (Faraday::ConnectionFailed)
上記のように例外が発生して失敗した。
対応
調査したところelasticsearch-transport
のドキュメントにDefult Portの項目があり、別のポート番号の場合は指定するように書いてあった。
https://github.com/elastic/elasticsearch-ruby/tree/master/elasticsearch-transport#default-port
The default port is 9200. Please specify a port for your host(s) if they differ from this default. Please see below for an exception to this when connecting using an Elastic Cloud ID.
もとにしているGemであるelasticsearch-ruby
のドキュメントにも
https://github.com/elastic/elasticsearch-ruby
Usageの項目に
https://github.com/elastic/elasticsearch-ruby#usage
このGemはelasticsearch-transport
とelasticsearch-api
の2つのライブラリのラッパーである旨と
Both of these libraries are extensively documented. Please read the elasticsearch-transport and the elasticsearch-api documentation carefully.
のようにそちらのドキュメントを読むように記述してあった。
具体的には結論から
で書いたようにportを明示的に指定した。
まとめ
当たり前だけどライブラリを使うときはドキュメントを注意深く読む。
心がけていたつもりだったけど時間がないときなどはおろそかにしがちなのでいい反省になった。
Elasticsearchでindexを作成してからbulk insert, aliasの切り替えまで
Elasticsearchでindexを作成してデータをbulk insertで投入することと、新しくインデックスを作成してエイリアスを切り替えることでアトミックにESのデータを入れ替える作業をするときの手順。
AWSのElasticsearch Serviceを使った。
ESへのリクエストはmacOSからcurlコマンドを使った。
aws cliのバージョン
$ aws --version aws-cli/1.11.13 Python/2.7.10 Darwin/18.6.0 botocore/1.4.70
Elasticsearch Serviceでドメインを作成する
EBS Optionやインスタンスタイプを指定する必要がある。
$ aws es create-elasticsearch-domain --domain-name nabewata07-es-test --elasticsearch-cluster-config InstanceType=t2.small.elasticsearch --elasticsearch-version 6.2 --ebs-options EBSEnabled=true,VolumeType=standard,VolumeSize=10 { "DomainStatus": { "ElasticsearchClusterConfig": { "DedicatedMasterEnabled": false, "InstanceCount": 1, "ZoneAwarenessEnabled": false, "InstanceType": "t2.small.elasticsearch" }, "DomainId": "xxxxxxxxxxxx/nabewata07-es-test", "Created": true, "Deleted": false, "EBSOptions": { "VolumeSize": 10, "VolumeType": "standard", "EBSEnabled": true }, "Processing": true, "DomainName": "nabewata07-es-test", "SnapshotOptions": { "AutomatedSnapshotStartHour": 0 }, "ElasticsearchVersion": "6.2", "AccessPolicies": "", "AdvancedOptions": { "rest.action.multi.allow_explicit_index": "true" }, "ARN": "arn:aws:es:ap-northeast-1:xxxxxxxxxxxx:domain/nabewata07-es-test" } }
アクセスポリシーで権限を設定する
Createしたときに設定を忘れていてaws es update-elasticsearch-domain-config
で設定した。
$ aws es update-elasticsearch-domain-config --domain-name nabewata07-es-test --access-policies ' { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "AWS": "*" }, "Action": [ "es:*" ], "Condition": { "IpAddress": { "aws:SourceIp": [ "xxx.xxx.xxx.xxx" ] } }, "Resource": "arn:aws:es:ap-northeast-1:xxxxxxxxxxxx:domain/nabewata07-es-test/*" }, { "Effect": "Allow", "Principal": { "AWS": [ "arn:aws:iam::xxxxxxxxxxxx:user/nabewata07" ] }, "Action": [ "es:*" ], "Resource": "arn:aws:es:ap-northeast-1:xxxxxxxxxxxx:domain/nabewata07-es-test/*" } ] } '
作業するために各環境変数を設定する
$ ES_ENDPOINT=$(aws es describe-elasticsearch-domain --domain-name nabewata07-es-test --query "DomainStatus.Endpoint" --output text) && echo $ES_ENDPOINT search-nabewata07-es-test-xxxxxxxxxxxxxxxxxxxxxxxxxx.ap-northeast-1.es.amazonaws.com $ export INDEX='sample_index'
具体的なESの操作はここから。
indexを作成する
https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-create-index.html
$ curl -X PUT "${ES_ENDPOINT}/${INDEX}" {"acknowledged":true,"shards_acknowledged":true,"index":"sample_index"}
mappingを定義する
mappingとは、ドキュメントにどのようなフィールドがありそれらのフィールドがどのような型なのかを定義するもの。
https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping.html
例えば下記のmappingを定義してmapping.jsonという名前で保存する。
{ "properties": { "title": { "type": "text" }, "name": { "type": "text" }, "age": { "type": "integer" }, "created": { "type": "date", "format": "strict_date_optional_time||epoch_millis" } } }
dateのフォーマットは
format | Elasticsearch Reference [7.3] | Elastic
を参照。
mappingを反映する
先に作成したmappingのファイルを使って反映する。
$ curl -XPUT "${ES_ENDPOINT}/${INDEX}/_mapping/_doc" -H 'Content-Type: application/json' --data-binary @mapping.json {"acknowledged":true}
データを投入する
Bulk APIを使用する。
Bulk API | Elasticsearch Reference [7.3] | Elastic
ドキュメントをまとめて登録することができる。
投入するデータを準備する
下記のようなJSONファイルを作成する
{"index": {"_index":"sample_index", "_type":"_doc", "_id":"1"}} {"title": "title0", "name":"name0", "age":10, "created":"2019-08-01"} {"index": {"_index":"sample_index", "_type":"_doc", "_id":"2"}} {"title": "title1", "name":"name1", "age":11, "created":"2019-08-01"}
bulk_insert.jsonという名前で保存する。
Bulk APIでデータを投入する
$ curl -s -H "Content-Type: application/x-ndjson" -XPOST "${ES_ENDPOINT}/_bulk" --data-binary @bulk_insert.json {"took":160,"errors":false,"items":[{"index":{"_index":"sample_index","_type":"_doc","_id":"1","_version":1,"result":"created","_shards":{"total":2,"successful":1,"failed":0},"_seq_no":0,"_primary_term":1,"status":201}},{"index":{"_index":"sample_index","_type":"_doc","_id":"2","_version":1,"result":"created","_shards":{"total":2,"successful":1,"failed":0},"_seq_no":0,"_primary_term":1,"status":201}}]}
結果を確認する
インデックスに入っているドキュメントの数を数えて確認する
$ curl -X GET "${ES_ENDPOINT}/${INDEX}/_count" {"count":2,"_shards":{"total":5,"successful":5,"skipped":0,"failed":0}}%
aliasを設定する
POSTでaliasを作成、反映する。
$ curl -XPOST "${ES_ENDPOINT}/${INDEX}/_alias/sample_alias" {"acknowledged":true}
aliasを確認する
$ curl -X GET "${ES_ENDPOINT}/_cat/aliases?v" alias index filter routing.index routing.search sample_alias sample_index - - -
aliasを切り替える
indexをもう一つ作成する
$ export NEW_INDEX=sample_index01 $ curl -X PUT "${ES_ENDPOINT}/${NEW_INDEX}" {"acknowledged":true}
新しいインデックスにおなじmappingを反映する
先に作成したmappingのファイルを使って反映する。
$ curl -XPUT "${ES_ENDPOINT}/${NEW_INDEX}/_mapping/_doc" -H 'Content-Type: application/json' --data-binary @mapping.json {"acknowledged":true}
新しいインデックスに投入するデータを準備する
下記のようなJSONファイルを作成する。
ドキュメントの数を3つにした。
{"index": {"_index":"sample_index01", "_type":"_doc", "_id":"1"}} {"title": "title2", "name":"name2", "age":12, "created":"2019-08-01"} {"index": {"_index":"sample_index01", "_type":"_doc", "_id":"2"}} {"title": "title3", "name":"name1", "age":13, "created":"2019-08-01"} {"index": {"_index":"sample_index01", "_type":"_doc", "_id":"3"}} {"title": "title4", "name":"name1", "age":14, "created":"2019-08-01"}
bulk_insert01.jsonという名前で保存する。
Bulk APIでデータを投入する
$ curl -s -H "Content-Type: application/x-ndjson" -XPOST "${ES_ENDPOINT}/_bulk" --data-binary @bulk_insert01.json {"took":14,"errors":false,"items":[{"index":{"_index":"sample_index01","_type":"_doc","_id":"1","_version":4,"result":"created","_shards":{"total":2,"successful":1,"failed":0},"_seq_no":3,"_primary_term":1,"status":201}},{"index":{"_index":"sample_index01","_type":"_doc","_id":"2","_version":5,"result":"created","_shards":{"total":2,"successful":1,"failed":0},"_seq_no":4,"_primary_term":1,"status":201}},{"index":{"_index":"sample_index01","_type":"_doc","_id":"3","_version":3,"result":"created","_shards":{"total":2,"successful":1,"failed":0},"_seq_no":2,"_primary_term":1,"status":201}}]}
aliasを切り替える
$ curl -X POST "${ES_ENDPOINT}/_aliases?pretty" -H 'Content-Type: application/json' -d" { \"actions\" : [ { \"remove\" : { \"index\" : \"${INDEX}\", \"alias\" : \"sample_alias\" } }, { \"add\" : { \"index\" : \"${NEW_INDEX}\", \"alias\" : \"sample_alias\" } } ] } " { "acknowledged" : true }
aliasを確認する
$ curl -X GET "${ES_ENDPOINT}/_cat/aliases?v" alias index filter routing.index routing.search sample_alias sample_index01 - - -
sample_aliasの向き先がsample_index01に変更されている。
INSERT結果を確認する
インデックスに入っているドキュメントの数を数えて確認する
$ curl -X GET "${ES_ENDPOINT}/sample_alias/_count" {"count":3,"_shards":{"total":5,"successful":5,"skipped":0,"failed":0}}
ドキュメントIDを指定して取得したいとき
$ curl -X GET "${ES_ENDPOINT}/sample_alias/_doc/1" {"_index":"sample_index01","_type":"_doc","_id":"1","_version":4,"found":true,"_source":{"title": "title2", "name":"name2", "age":12, "created":"2019-08-01"}}
削除するときはメソッドをDELETEにする。
その他の操作
上記の作業以外にも実施しそうなコマンドを記録しておく。
インデックスを削除する
$ curl -X DELETE "${ES_ENDPOINT}/${INDEX}"
mappingを確認する
$ curl -X GET "${ES_ENDPOINT}/${INDEX}/_mapping" {"sample_index":{"mappings":{"_doc":{"properties":{"age":{"type":"integer"},"created":{"type":"date"},"name":{"type":"text"},"title":{"type":"text"}}}}}}
すべてのindexを列挙する
curl -X GET "${ES_ENDPOINT}/_cat/indices?v"
S3にオブジェクトを作成したイベントをトリガーにしてSNS経由でLambda関数を呼び出す
S3にオブジェクトを作成した際にPUTやPOSTのイベントが発生する。
それをトリガーにしてSNS経由でLambda関数を呼び出し、作成されたS3オブジェクトを取得して何らかの処理をするための各種設定とLambda関数の作成を行った。
作業した環境
$ aws --version aws-cli/1.11.13 Python/2.7.10 Darwin/18.6.0 botocore/1.4.70
以下、設定の作業内容。
必要なリソースの作成
S3バケットを作る
$ aws s3 mb s3://nabewata07-event-test00 --region ap-northeast-1
make_bucket: nabewata07-event-test00
SNS Topicを作成する
$ aws sns create-topic --name event-test-topic01 --region ap-northeast-1 { "TopicArn": "arn:aws:sns:ap-northeast-1:xxxxxxxxxxxxx:event-test-topic01" }
SNSトピックをトリガーにしたLambda関数を作成する
イベントを発行したオブジェクトのContentTypeを表示する関数
console.log('Loading function'); const aws = require('aws-sdk'); const s3 = new aws.S3({ apiVersion: '2006-03-01' }); exports.handler = async (event, context) => { // Get the object from the event and show its content type const jsonStr = event.Records[0].Sns.Message; const obj = JSON.parse(jsonStr) const target = obj.Records[0].s3; const bucket = target.bucket.name; const key = decodeURIComponent(target.object.key.replace(/\+/g, ' ')); const params = { Bucket: bucket, Key: key, }; try { const { ContentType, Body } = await s3.getObject(params).promise(); console.log('CONTENT TYPE:', ContentType); // do something with content body //console.log('Body:', Body.toString()); return ContentType; } catch (err) { console.log(err); throw new Error(message); } };
作成したコードをzip圧縮する
デプロイのため。
$ zip index.js.zip index.js adding: index.js (deflated 47%)
Lambda関数を作成する
$ aws lambda create-function --function-name s3GetObjectSample --runtime nodejs10.x --role arn:aws:iam::xxxxxxxxxxxx:role/service-role/LambdaS3FuncRole --handler index.handler --zip-file fileb://index.js.zip --region ap-northeast-1 { "CodeSha256": "78l2lot/Qie5DQX2R7j1PMXLbiNivMToAaVbny7OQpQ=", "FunctionName": "s3GetObjectSample", "CodeSize": 635, "MemorySize": 128, "FunctionArn": "arn:aws:lambda:ap-northeast-1:xxxxxxxxxx:function:s3GetObjectSample", "Version": "$LATEST", "Role": "arn:aws:iam::xxxxxxxxx:role/service-role/LambdaS3FuncRole", "Timeout": 3, "LastModified": "2019-07-28T23:34:32.759+0000", "Handler": "index.handler", "Runtime": "nodejs10.x", "Description": "" }
Lambda関数のポリシーに必要な権限
LambdaS3FuncRoleには以下のポリシー設定が必要。
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "s3:GetObject" ], "Resource": "arn:aws:s3:::nabewata07-event-test00/upload/sampledata/*" }, { "Effect": "Allow", "Action": "logs:CreateLogGroup", "Resource": "arn:aws:logs:ap-northeast-1:xxxxxxxxxxxxxx:*" }, { "Effect": "Allow", "Action": [ "logs:CreateLogStream", "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:ap-northeast-1:xxxxxxxxxxxxxx:log-group:/aws/lambda/s3GetObjectSample:*" ] } ] }
SNS TopicからLambda関数を呼び出す許可を設定する
Lambda関数のFunction policyで設定する。
$ aws lambda add-permission --function-name s3GetObjectSample --statement-id invoke-from-sns-topic00 --action lambda:InvokeFunction --principal sns.amazonaws.com --source-arn arn:aws:sns:ap-northeast-1:xxxxxxxxxxxx:event-test-topic01 { "Statement": "{\"Sid\":\"invoke-from-sns-topic00\",\"Effect\":\"Allow\",\"Principal\":{\"Service\":\"sns.amazonaws.com\"},\"Action\":\"lambda:InvokeFunction\",\"Resource\":\"arn:aws:lambda:ap-northeast-1:325528992442:function:s3GetObjectSample\",\"Condition\":{\"ArnLike\":{\"AWS:SourceArn\":\"arn:aws:sns:ap-northeast-1:325528992442:event-test-topic01\"}}}" }
LambdaがSNSトピックをサブスクライブする設定を行う
--topic-arn
は先に作成したSNS TopicのARNを指定。
--notification-endpoint
には先に作成したLambda関数のARNを指定。
$ aws sns subscribe --protocol lambda \ --topic-arn arn:aws:sns:ap-northeast-1:325528992442:event-test-topic01 \ --notification-endpoint arn:aws:lambda:ap-northeast-1:325528992442:function:s3GetObjectSample \ --region ap-northeast-1 { "SubscriptionArn": "arn:aws:sns:ap-northeast-1:325528992442:event-test-topic01:af6d12be-b9ed-40f3-9041-49c59fa3360f" }
SNS TopicがS3から呼び出される許可をする
下記のStatementをSNSのアクセスポリシーに追加する。
{ "Sid": "Stmt1561879283049", "Effect": "Allow", "Principal": "*", "Action": "sns:Publish", "Resource": "arn:aws:sns:ap-northeast-1:xxxxxxxxxxxx:event-test-topic01", "Condition": { "ArnEquals": { "aws:SourceArn": "arn:aws:s3:::nabewata07-event-test00" } } }
これはCLIから実行する方法がわからなかったのでマネジメントコンソールのGUIから行った。
S3バケットにイベントの設定を行う
設定用のJSONファイルを作る
対象のバケットのupload/sampledata
というプレフィックスかつ.csv
というサフィックスが付いたオブジェクトがPUTまたはPOSTによって作成されたときに先ほど作成したSNSトピックに通知する設定にした。
{ "TopicConfigurations": [ { "Filter": { "Key": { "FilterRules": [ { "Name": "Prefix", "Value": "upload/sampledata" }, { "Name": "Suffix", "Value": ".csv" } ] } }, "Id": "event-test-topic01", "TopicArn": "arn:aws:sns:ap-northeast-1:xxxxxxxxxxxxx:event-test-topic01", "Events": [ "s3:ObjectCreated:Put", "s3:ObjectCreated:Post" ] } ] }
これをs3_put_notification.json
というファイル名で保存した。
S3バケットにイベントの設定を反映する
$ aws s3api put-bucket-notification-configuration --bucket nabewata07-event-test00 --notification-configuration file://s3_put_notification.json
Lambda関数を呼び出してみる
S3にオブジェクトをアップロードする
$ touch test.csv $ aws s3 cp ./test.csv s3://nabewata07-event-test00/upload/sampledata/ upload: ./test.csv to s3://nabewata07-event-test00/upload/sampledata/test.csv
CloudWatchLogsでログが確認できればOK
Lambdaが/aws/lambda/s3GetObjectSample
というロググループを作成するため、そこで確認した。
INFO CONTENT TYPE: text/csv
所感
それぞれのリソースを作ってからそれらの連携の設定と、連携を許可する権限の設定があり少しややこしい。
連携の設定はしたけど許可の設定を忘れていて実行できないなどが発生しないためにもここに書いてある各設定を忘れないようにしたい。
rubyのfakerでテストデータ作成
名前、Eメールアドレス、適当な文字列などでテストデータを作成したいときにRubyのfakerというGemを使った。
PythonのライブラリやWeb上で公開されているツールなどもあるが、自分で柔軟に出力をカスタマイズしたいときはWeb上のツールでは不十分なのと自分が書きやすい言語ということでRubyのGemを採用した。
使ったGem
機能
テストデータの種類(名前とか住所とか)をひとまとまりにしたGeneratorという単位がある。 例えばDefault GeneratorにはAddress, Date, Job, Nameなどがあり、Japanese Mediaという漫画のキャラクター名を出力するものもある。
READMEにもある通り機能はかなり豊富でDefault Generatorだけでもたいていのテストデータは作れそうに見える。
今回データを作るときに使ったコードは下記のようなものになった。
氏名、ランダムな文字列、単語、日付 のサンプルデータでCSVファイルを作成した。
require 'faker' Faker::Config.locale = 'ja' File.open('./sample_data01.csv', 'w') do |f| 100.times do |i| name = Faker::Name.name str = Faker::Lorem.characters(30) word = Faker::Lorem.word date = Faker::Date.backward str = "#{name},#{str},#{word},#{date}" f.puts str end end
これで100行のCSVファイルが作成できる。
名前の部分は
https://github.com/stympy/faker/blob/be75ff9a2c208e57a95bf836c400513a13cfb67f/lib/locales/ja.yml#L68
のYmlファイルからとってきているみたいだった。
作成されたデータの一部。
森田 誠,x8e2cojfg4xlhibhggyfzpsdec2t0t,しょくん,2019-01-03 小島 颯,pg6wy8ntlsqi09wekjnj1kqn6twj17,勇気,2018-08-06 菊地 大地,km0ruz1urb86yju24pwozbcv11ilf0,明治,2018-12-20 ...
ちょっと適当なデータで処理時間やメモリ消費の計測をやってみたいときとか、大きめのデータサイズのファイルを処理するときの軽い検証などに便利に使えそう。