はじめに
DynamoDBは、シンプルなデータ構造を扱う用途では非常に便利ですが、検索要件が加わると構成に悩むことがあります。
特に、「データ管理は DynamoDBのまま進めたい」「一方で、検索は OpenSearchのように柔軟にしたい」「しかも同期処理はなるべく自前で持ちたくない」といったケースは少なくありません。
本記事では、そうした課題に対する選択肢として、Amazon DynamoDBとAmazon OpenSearch Service を、Lambdaなどのデータパイプラインを自前で実装せずに連携する構成(いわゆるゼロETL に相当する取り込み)の構築手順を紹介します。
DynamoDBを活用しながら検索機能も強化したい方や、AWS マネージドサービスを活かして運用負荷を抑えたい方の参考になれば幸いです。
ゼロETL連携について
概要
Lambdaなどによる独自のデータパイプラインを用意せず、できるだけ構成要素を抑えてデータ連携を行う手法です。AWS が提供するマネージドな取り込み経路として、本記事では OpenSearch Ingestion のパイプラインを用い、DynamoDB の変更ストリームやテーブルエクスポートと OpenSearchを接続します。
メリット
- 運用負荷の低減:同期専用の Lambdaなどを自前で開発・保守する必要がなく、パイプラインとマネージド機能に処理を任せられます。
- ほぼリアルタイムの反映:DynamoDB Streams から変更を取り込むことで、アプリケーションがテーブルを更新したあと、検索側へ低遅延で反映しやすくなります。
- 初期データと増分の一貫性:初回のフルロードと、更新時の差分取り込みを、同じパイプライン設計のまま扱えます。
- 軽い変換をパイプライン内で完結:プロセッサ(スクリプトなど)でインデックス向けの軽い変換やフィールド付与ができ、小さな加工であれば追加サービスが不要な場合があります。
- AWS ネイティブな権限設計:IAM ロールと OpenSearchのロールマッピングでアクセスを制御でき、VPC やドメイン設定とあわせてセキュリティを設計しやすいです。
デメリット
- 柔軟性の限界:複雑なビジネスロジックや多段の結合・集計が主目的の場合は、従来型の ETL やイベント駆動の独自実装が必要となることがあります。
- コストの積み上がり:DynamoDB Streams、PITR(ポイントインタイムリカバリ)、OpenSearchドメイン、Ingestion パイプライン、S3、データ転送などが複合的に発生します。トラフィックとテーブル規模に応じた見積もりが必要です。
- リージョンと機能の前提:パイプラインや連携機能の利用可能リージョン、OpenSearchの提供形態(マネージドドメイン/Serverlessなど)の制約に依存します。
- スキーマとマッピングの維持:DynamoDBのアイテム構造と OpenSearchのインデックスマッピングの整合を、テーブル設計とパイプライン設定の両面で保つ必要があります。

構築手順
早速、今回紹介するゼロETL統合の構築手順をご紹介します。
従来ではLambdaなどを用いてデータの連携を行なっていた部分を、AWS上の設定によって実現することができます。
1.DynamoDBの構築
要件に沿ったテーブルを作成します。次の設定が必須です。
- DynamoDB Streams を有効にする(OpenSearch連携に必要)
- PITRを有効にする(同上)
いずれも本連携で前提となる設定です。

2.OpenSearchの構築
要件に沿った検索ドメインを作成します。本手順において、ドメイン側で特別なオプションは不要です。

3.S3の構築
DynamoDBのエクスポート先となるバケットを作成します。本手順において、バケット側で特別なオプションは不要です。

4.専用IAMロールの作成
OpenSearch Ingestion のパイプライン用に、専用の IAM ロールを作成します。
- IAM コンソールでロールを作成し、ユースケースとして OpenSearch Ingestion pipelines を選択する。
- この時点では追加のポリシーは付けず、そのままロールを作成して問題ありません。

5.カスタムポリシーの作成
IAM で先ほどのロールにアタッチするカスタムポリシーを作成します。以下は設定例です。Resource の各 ARN は、ご自身の環境で作成したテーブル・ドメイン・バケットに合わせて置き換えてください。
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "allowRunExportJob",
"Effect": "Allow",
"Action": [
"dynamodb:DescribeTable",
"dynamodb:DescribeContinuousBackups",
"dynamodb:ExportTableToPointInTime"
],
"Resource": [
"arn:aws:dynamodb:ap-northeast-1:915494428181:table/Test_Table"
]
},
{
"Sid": "allowCheckExportjob",
"Effect": "Allow",
"Action": [
"dynamodb:DescribeExport"
],
"Resource": [
"arn:aws:dynamodb:ap-northeast-1:915494428181:table/Test_Table/export/*"
]
},
{
"Sid": "allowReadFromStream",
"Effect": "Allow",
"Action": [
"dynamodb:DescribeStream",
"dynamodb:GetRecords",
"dynamodb:GetShardIterator",
"dynamodb:ListStreams"
],
"Resource": [
"arn:aws:dynamodb:ap-northeast-1:915494428181:table/Test_Table/stream/*"
]
},
{
"Effect": "Allow",
"Action": [
"es:ESHttp*",
"es:ESHttpGet",
"es:DescribeDomain"
],
"Resource": "arn:aws:es:ap-northeast-1:915494428181:domain/pretest2/*"
},
{
"Sid": "allowReadAndWriteToS3ForExport",
"Effect": "Allow",
"Action": [
"s3:GetObject",
"s3:AbortMultipartUpload",
"s3:PutObject",
"s3:PutObjectAcl",
"s3:DeleteObject",
"s3:ListBucket"
],
"Resource": [
"arn:aws:s3:::lixil-test-bucket",
"arn:aws:s3:::lixil-test-bucket/*"
]
}
]
}
パイプライン作成やデータ連携がうまくいかない場合は、まずこのポリシーの Resource やアクションの不足がないか確認してください。検証環境のみ、切り分けのため一時的に Resource を “*” に広げることもありますが、本番では最小権限のままにしてください。
6.パイプラインの作成
OpenSearchサービスコンソールの左ペインから Pipelines を開き、パイプラインを作成します。

以下はパイプライン定義の例です(YAML 形式)。各 ARN・バケット名・ホスト URL は、これまでの手順で作成したリソースに合わせて変更してください。コンソールのブループリントで同等のソース・シンク・プロセッサ構成にしていても構いません。
version: '2'
extension:
osis_configuration_metadata:
builder_type: visual
lixil-test-pipeline:
source:
dynamodb:
acknowledgments: true
tables:
- table_arn: 'arn:aws:dynamodb:ap-northeast-1:915494428181:table/Test_Table'
stream:
start_position: LATEST
view_on_remove: OLD_IMAGE
export:
s3_bucket: lixil-test-bucket
s3_region: ap-northeast-1
s3_prefix: ddb-to-opensearch-export/
aws:
region: ap-northeast-1
processor:
- script:
lang: "javascript"
source: |
function parseDynamoDBValue(dynamoValue) {
if (dynamoValue.S !== undefined) return dynamoValue.S;
if (dynamoValue.N !== undefined) return parseFloat(dynamoValue.N);
return null;
}
if (ctx.dynamodb.NewImage && ctx.dynamodb.NewImage.product_details) {
let raw_value = parseDynamoDBValue(ctx.dynamodb.NewImage.product_details);
if (raw_value) {
ctx.product_details = raw_value.toString().toLowerCase();
}
}
- set:
fields:
index_name: my-custom-index-table
sink:
- opensearch:
hosts:
- >-
https://search-pretest2-inczyztgojswyf5ig7nj7gkjyi.ap-northeast-1.es.amazonaws.com
aws:
serverless: false
region: ap-northeast-1
index_type: custom
index: my-custom-index-table
document_id: '${getMetadata("primary_key")}'
action: '${getMetadata("opensearch_action")}'
document_version: '${getMetadata("document_version")}'
document_version_type: external
flush_timeout: '-1'
設定を保存し、パイプラインのステータスが Active になるまで待ちます。アクティブ化後、指定した DynamoDB テーブルから OpenSearchドメインへデータが取り込まれます。
データ連携がうまくいかない場合
ここまでの構築でエクスポートが始まらないなど、データの連携に失敗するときは、次の手順で OpenSearchのセキュリティ設定を確認してください。
- OpenSearch Dashboards にサインイン:ドメイン概要の Dashboards URL を開き、マスターユーザー(IAM または内部ユーザー)でサインインします。
- セキュリティ画面を開く:左ナビから Security を選択します。
- ロールの選択または作成:Roles で、既存ロール(例: all_access)を使うか、カスタムロールを作成します。パイプライン書き込みには、クラスター権限(例: cluster_monitor)とインデックス権限(例: create_index、CRUD)が必要です。
- バックエンドロールのマッピング:利用するロールの詳細で Mapped users タブを開き、Manage mapping から Backend roles に パイプラインが使用する IAM ロールの ARN を追加して保存します。次のスクリーンショットのようにマッピングされていれば問題ありません。

5.動作確認:パイプラインのメトリクスと OpenSearchのインデックスを確認し、必要ならパイプラインを再起動します。
ここまでの設定が完了すれば、DynamoDBからOpenSearchへの取り込みが動作するはずです。
テーブルへの更新はストリーム経由でインデックスにも反映されます。検索の確認は、ドメイン画面から OpenSearch Dashboardsを開き、Discoverなどで行えます。


最後に
今回、弊社のプロジェクトにて扱ったDynamoDBとOpenSearchのゼロETL統合について紹介しました。
従来では、Lambdaなどを用いてデータの連携を行う必要がありましたが、今回の構成を用いることでデータ連携の難易度は大幅にと下がると感じています。
今後、「管理するデータがシンプルなためRDSではなくDynamoDBを使いたい」「ただし検索機能もある程度充実させたい」といった要件があれば、ぜひこちらの記事の内容を参考にDynamoDBとOpenSearchのデータ連携をお試しいただければ幸いです。










