DynamoDB × OpenSearch で実現するゼロETL連携の構築手順

DynamoDB × OpenSearch で実現するゼロETL連携の構築手順

はじめに

DynamoDBは、シンプルなデータ構造を扱う用途では非常に便利ですが、検索要件が加わると構成に悩むことがあります。

特に、「データ管理は DynamoDBのまま進めたい」「一方で、検索は OpenSearchのように柔軟にしたい」「しかも同期処理はなるべく自前で持ちたくない」といったケースは少なくありません。

本記事では、そうした課題に対する選択肢として、Amazon DynamoDBとAmazon OpenSearch Service を、Lambdaなどのデータパイプラインを自前で実装せずに連携する構成(いわゆるゼロETL に相当する取り込み)の構築手順を紹介します。

DynamoDBを活用しながら検索機能も強化したい方や、AWS マネージドサービスを活かして運用負荷を抑えたい方の参考になれば幸いです。

ゼロETL連携について

概要

Lambdaなどによる独自のデータパイプラインを用意せず、できるだけ構成要素を抑えてデータ連携を行う手法です。AWS が提供するマネージドな取り込み経路として、本記事では OpenSearch Ingestion のパイプラインを用い、DynamoDB の変更ストリームやテーブルエクスポートと OpenSearchを接続します。

メリット

  • 運用負荷の低減:同期専用の Lambdaなどを自前で開発・保守する必要がなく、パイプラインとマネージド機能に処理を任せられます。
  • ほぼリアルタイムの反映:DynamoDB Streams から変更を取り込むことで、アプリケーションがテーブルを更新したあと、検索側へ低遅延で反映しやすくなります。
  • 初期データと増分の一貫性:初回のフルロードと、更新時の差分取り込みを、同じパイプライン設計のまま扱えます。
  • 軽い変換をパイプライン内で完結:プロセッサ(スクリプトなど)でインデックス向けの軽い変換やフィールド付与ができ、小さな加工であれば追加サービスが不要な場合があります。
  • AWS ネイティブな権限設計:IAM ロールと OpenSearchのロールマッピングでアクセスを制御でき、VPC やドメイン設定とあわせてセキュリティを設計しやすいです。

デメリット

  • 柔軟性の限界:複雑なビジネスロジックや多段の結合・集計が主目的の場合は、従来型の ETL やイベント駆動の独自実装が必要となることがあります。
  • コストの積み上がり:DynamoDB Streams、PITR(ポイントインタイムリカバリ)、OpenSearchドメイン、Ingestion パイプライン、S3、データ転送などが複合的に発生します。トラフィックとテーブル規模に応じた見積もりが必要です。
  • リージョンと機能の前提:パイプラインや連携機能の利用可能リージョン、OpenSearchの提供形態(マネージドドメイン/Serverlessなど)の制約に依存します。
  • スキーマとマッピングの維持:DynamoDBのアイテム構造と OpenSearchのインデックスマッピングの整合を、テーブル設計とパイプライン設定の両面で保つ必要があります。

構築手順

早速、今回紹介するゼロETL統合の構築手順をご紹介します。

従来ではLambdaなどを用いてデータの連携を行なっていた部分を、AWS上の設定によって実現することができます。

1.DynamoDBの構築

要件に沿ったテーブルを作成します。次の設定が必須です。

  • DynamoDB Streams を有効にする(OpenSearch連携に必要)
  • PITRを有効にする(同上)

いずれも本連携で前提となる設定です。

2.OpenSearchの構築

要件に沿った検索ドメインを作成します。本手順において、ドメイン側で特別なオプションは不要です。

3.S3の構築

DynamoDBのエクスポート先となるバケットを作成します。本手順において、バケット側で特別なオプションは不要です。

4.専用IAMロールの作成

OpenSearch Ingestion のパイプライン用に、専用の IAM ロールを作成します。

  • IAM コンソールでロールを作成し、ユースケースとして OpenSearch Ingestion pipelines を選択する。
  • この時点では追加のポリシーは付けず、そのままロールを作成して問題ありません。

5.カスタムポリシーの作成

IAM で先ほどのロールにアタッチするカスタムポリシーを作成します。以下は設定例です。Resource の各 ARN は、ご自身の環境で作成したテーブル・ドメイン・バケットに合わせて置き換えてください。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "allowRunExportJob",
            "Effect": "Allow",
            "Action": [
                "dynamodb:DescribeTable",
                "dynamodb:DescribeContinuousBackups",
                "dynamodb:ExportTableToPointInTime"
            ],
            "Resource": [
                "arn:aws:dynamodb:ap-northeast-1:915494428181:table/Test_Table"
            ]
        },
        {
            "Sid": "allowCheckExportjob",
            "Effect": "Allow",
            "Action": [
                "dynamodb:DescribeExport"
            ],
            "Resource": [
                "arn:aws:dynamodb:ap-northeast-1:915494428181:table/Test_Table/export/*"
            ]
        },
        {
            "Sid": "allowReadFromStream",
            "Effect": "Allow",
            "Action": [
                "dynamodb:DescribeStream",
                "dynamodb:GetRecords",
                "dynamodb:GetShardIterator",
                "dynamodb:ListStreams"
            ],
            "Resource": [
                "arn:aws:dynamodb:ap-northeast-1:915494428181:table/Test_Table/stream/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "es:ESHttp*",
                "es:ESHttpGet",
                "es:DescribeDomain"
            ],
            "Resource": "arn:aws:es:ap-northeast-1:915494428181:domain/pretest2/*"
        },
        {
            "Sid": "allowReadAndWriteToS3ForExport",
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:AbortMultipartUpload",
                "s3:PutObject",
                "s3:PutObjectAcl",
                "s3:DeleteObject",
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::lixil-test-bucket",
                "arn:aws:s3:::lixil-test-bucket/*"
            ]
        }
    ]
}

パイプライン作成やデータ連携がうまくいかない場合は、まずこのポリシーの Resource やアクションの不足がないか確認してください。検証環境のみ、切り分けのため一時的に Resource を “*” に広げることもありますが、本番では最小権限のままにしてください。

6.パイプラインの作成

OpenSearchサービスコンソールの左ペインから Pipelines を開き、パイプラインを作成します。

以下はパイプライン定義の例です(YAML 形式)。各 ARN・バケット名・ホスト URL は、これまでの手順で作成したリソースに合わせて変更してください。コンソールのブループリントで同等のソース・シンク・プロセッサ構成にしていても構いません。

version: '2'
extension:
  osis_configuration_metadata:
    builder_type: visual
lixil-test-pipeline:
  source:
    dynamodb:
      acknowledgments: true
      tables:
        - table_arn: 'arn:aws:dynamodb:ap-northeast-1:915494428181:table/Test_Table'
          stream:
            start_position: LATEST
            view_on_remove: OLD_IMAGE
          export:
            s3_bucket: lixil-test-bucket
            s3_region: ap-northeast-1
            s3_prefix: ddb-to-opensearch-export/
      aws:
        region: ap-northeast-1
    processor:
      - script:
          lang: "javascript"
          source: |
            function parseDynamoDBValue(dynamoValue) {
              if (dynamoValue.S !== undefined) return dynamoValue.S;
              if (dynamoValue.N !== undefined) return parseFloat(dynamoValue.N);
              return null;
            }

            if (ctx.dynamodb.NewImage && ctx.dynamodb.NewImage.product_details) {
              let raw_value = parseDynamoDBValue(ctx.dynamodb.NewImage.product_details);
              if (raw_value) {
                ctx.product_details = raw_value.toString().toLowerCase();
              }
            }

      - set:
          fields:
            index_name: my-custom-index-table
  sink:
    - opensearch:
        hosts:
          - >-
            https://search-pretest2-inczyztgojswyf5ig7nj7gkjyi.ap-northeast-1.es.amazonaws.com
        aws:
          serverless: false
          region: ap-northeast-1
        index_type: custom
        index: my-custom-index-table
        document_id: '${getMetadata("primary_key")}'
        action: '${getMetadata("opensearch_action")}'
        document_version: '${getMetadata("document_version")}'
        document_version_type: external
        flush_timeout: '-1'

設定を保存し、パイプラインのステータスが Active になるまで待ちます。アクティブ化後、指定した DynamoDB テーブルから OpenSearchドメインへデータが取り込まれます。

データ連携がうまくいかない場合

ここまでの構築でエクスポートが始まらないなど、データの連携に失敗するときは、次の手順で OpenSearchのセキュリティ設定を確認してください。

  1. OpenSearch Dashboards にサインイン:ドメイン概要の Dashboards URL を開き、マスターユーザー(IAM または内部ユーザー)でサインインします。
  2. セキュリティ画面を開く:左ナビから Security を選択します。
  3. ロールの選択または作成Roles で、既存ロール(例: all_access)を使うか、カスタムロールを作成します。パイプライン書き込みには、クラスター権限(例: cluster_monitor)とインデックス権限(例: create_index、CRUD)が必要です。
  4. バックエンドロールのマッピング:利用するロールの詳細で Mapped users タブを開き、Manage mapping から Backend roles に  パイプラインが使用する IAM ロールの ARN を追加して保存します。次のスクリーンショットのようにマッピングされていれば問題ありません。

5.動作確認:パイプラインのメトリクスと OpenSearchのインデックスを確認し、必要ならパイプラインを再起動します。

ここまでの設定が完了すれば、DynamoDBからOpenSearchへの取り込みが動作するはずです。

テーブルへの更新はストリーム経由でインデックスにも反映されます。検索の確認は、ドメイン画面から OpenSearch Dashboardsを開き、Discoverなどで行えます。

最後に

今回、弊社のプロジェクトにて扱ったDynamoDBとOpenSearchのゼロETL統合について紹介しました。

従来では、Lambdaなどを用いてデータの連携を行う必要がありましたが、今回の構成を用いることでデータ連携の難易度は大幅にと下がると感じています。

今後、「管理するデータがシンプルなためRDSではなくDynamoDBを使いたい」「ただし検索機能もある程度充実させたい」といった要件があれば、ぜひこちらの記事の内容を参考にDynamoDBとOpenSearchのデータ連携をお試しいただければ幸いです。

AWSの新サービス "DevOps Agent" にIAMロールの整理整頓を手伝ってもらう
サムネイル

ABOUT US
TETSU
新卒でSler会社に入社し、主にサーバーサイドを中心に要件定義から開発、保守まで幅広く経験。 2025年8月にアイスリーデザインに入社し、現在は経験が浅いフロントエンド案件に挑戦中。