Powered by AppSignal & Oban Pro

Amazon S3 Tables

livebooks/aws/s3_tables.livemd

Amazon S3 Tables

Mix.install([
  {:aws, "~> 1.0"},
  {:hackney, "~> 1.16"},
  {:uuid, "~> 1.1"},
  {:explorer, "~> 0.11"},
  {:kino, "~> 0.19"}
])

概要

S3 Tables は、Amazon S3 上に Apache Iceberg テーブルを管理し、Athena などから SQL で扱いやすくする仕組みです この Livebook では、認証、カタログ作成、テーブルバケット作成、データ投入、検索、削除までを順番に試します

準備

Explorer の別名を定義して、後続のセルを読みやすくします

alias Explorer.DataFrame
alias Explorer.Series
require Explorer.DataFrame

認証

AWS のアクセスキー、シークレットキー、リージョンを Livebook 上で入力できるようにします

access_key_id_input = Kino.Input.password("ACCESS_KEY_ID")
secret_access_key_input = Kino.Input.password("SECRET_ACCESS_KEY")
region_input = Kino.Input.text("REGION")

[
  access_key_id_input,
  secret_access_key_input,
  region_input
]
|> Kino.Layout.grid(columns: 3)

入力した認証情報から AWS クライアントを作成します

client =
  AWS.Client.create(
    Kino.Input.read(access_key_id_input),
    Kino.Input.read(secret_access_key_input),
    Kino.Input.read(region_input)
  )

後続で使うリージョン文字列を取り出します

region = Kino.Input.read(region_input)

Glue カタログ作成時に使う AWS アカウント ID を取得します

account_id =
  client
  |> AWS.STS.get_caller_identity(%{})
  |> elem(1)
  |> Map.get("GetCallerIdentityResponse")
  |> Map.get("GetCallerIdentityResult")
  |> Map.get("Account")

データ作成

サンプルとして Explorer の wine データセットを読み込み、内容を確認します

wine_df = Explorer.Datasets.wine()

Kino.DataTable.new(wine_df)

データカタログの作成

Glue に s3tablescatalog が既にあるかを確認します

exist_s3tablescatalog =
  client
  |> AWS.Glue.get_catalogs(%{})
  |> elem(1)
  |> Map.get("CatalogList")
  |> Enum.any?(fn catalog -> Map.get(catalog, "Name") == "s3tablescatalog" end)

存在しない場合だけ、S3 Tables 用の Federated Catalog を作成します

if not exist_s3tablescatalog do
  client
  |> AWS.Glue.create_catalog(%{
    "Name" => "s3tablescatalog",
    "CatalogInput" => %{
      "Description" => "Federated catalog for S3 Tables",
      "FederatedCatalog" => %{
        "Identifier" => "arn:aws:s3tables:#{region}:#{account_id}:bucket/*",
        "ConnectionName" => "aws:s3tables"
      },
      "CreateDatabaseDefaultPermissions" => [%{
        "Principal" => %{
          "DataLakePrincipalIdentifier" => "IAM_ALLOWED_PRINCIPALS"
        },
        "Permissions" => ["ALL"]
      }],
      "CreateTableDefaultPermissions" => [%{
        "Principal" => %{
          "DataLakePrincipalIdentifier" => "IAM_ALLOWED_PRINCIPALS"
        },
        "Permissions" => ["ALL"]
      }]
    }
  })
end

作成後のカタログ一覧を表示して、登録されたことを確認します

client
|> AWS.Glue.get_catalogs(%{})
|> elem(1)
|> Map.get("CatalogList")

出力先バケット作成

Athena のクエリ結果を書き出す S3 バケット名を入力します

output_bucket_name_input = Kino.Input.text("OUTPUT_BUCKET_NAME")

入力したバケット名を読み取ります

output_bucket_name = Kino.Input.read(output_bucket_name_input)

Athena の結果出力先となるバケットを作成します

AWS.S3.create_bucket(client, output_bucket_name, %{
  "CreateBucketConfiguration" => %{
    "LocationConstraint" => region
  }
})

テーブルバケット作成

S3 Tables 本体の保存先となるテーブルバケット名を入力します

table_bucket_name_input = Kino.Input.text("TABLE_BUCKET_NAME")

入力したテーブルバケット名を読み取ります

table_bucket_name = Kino.Input.read(table_bucket_name_input)

S3 Tables のテーブルバケットを作成します

buckets_res =
  client
  |> AWS.S3Tables.create_table_bucket(%{
    "name" => table_bucket_name
  })

後続 API で使うため、作成したテーブルバケットの ARN を取り出します

buckets_arn =
  buckets_res
  |> elem(1)
  |> Map.get("arn")

テーブルバケットの一覧を表示して、作成結果を確認します

client
|> AWS.S3Tables.list_table_buckets()
|> elem(1)
|> Map.get("tableBuckets")

名前空間作成

テーブルをまとめる論理的な単位として名前空間を作成します

namespace_res =
  client
  |> AWS.S3Tables.create_namespace(buckets_arn, %{
    "namespace" => ["sample_namespace"]
  })

名前空間一覧を表示して、作成できたことを確認します

client
|> AWS.S3Tables.list_namespaces(buckets_arn)
|> elem(1)
|> Map.get("namespaces")

テーブル作成

Explorer の型情報を S3 Tables 用のスキーマ定義に変換します

fields =
  wine_df
  |> Map.get(:dtypes)
  |> Enum.map(fn {name, type} ->
    %{
      "name" => name,
      "type" => case type do
        {:f, _} -> "float"
        {:s, _} -> "int"
        _ -> "string"
      end
    }
  end)

Iceberg 形式で wine テーブルを作成します

table_res =
  client
  |> AWS.S3Tables.create_table("sample_namespace", buckets_arn, %{
    "name" => "wine",
    "format" => "ICEBERG",
    "metadata" => %{
      "iceberg" => %{
        "schema" => %{
          "fields" => fields
        }
      }
    }
  })

テーブル一覧を表示して、作成されたテーブルを確認します

client
|> AWS.S3Tables.list_tables(buckets_arn)
|> elem(1)
|> Map.get("tables")

データの追加

Athena に SQL を投げて実行 ID を返すヘルパー関数を定義します

exec_query = fn query ->
  client
  |> AWS.Athena.start_query_execution(%{
    "QueryString" => query,
    "ClientRequestToken" => UUID.uuid1(),
    "QueryExecutionContext" => %{
      "Catalog" => "s3tablescatalog/#{table_bucket_name}",
      "Database" => "sample_namespace"      
    },
    "ResultConfiguration" => %{
      "OutputLocation" => "s3://#{output_bucket_name}"
    }
  })
  |> elem(1)
  |> Map.get("QueryExecutionId")
end

クエリ実行状態を取得するヘルパー関数を定義します

get_state = fn exec_id ->
  client
  |> AWS.Athena.get_query_execution(%{"QueryExecutionId" => exec_id})
  |> elem(1)
  |> then(& &1["QueryExecution"]["Status"]["State"])
end

Athena の結果を DataFrame に変換するヘルパー関数を定義します

get_results = fn exec_id ->
  results =
    client
    |> AWS.Athena.get_query_results(%{"QueryExecutionId" => exec_id})
    |> elem(1)

  results_df =
    results
    |> then(& &1["ResultSet"]["Rows"])
    |> Enum.map(& &1["Data"])
    |> then(fn columns ->
      [headers | values] = columns

      headers
      |> Enum.map(& &1["VarCharValue"])
      |> Enum.with_index()
      |> Enum.reduce(%{}, fn {col_name, index}, acc ->
        col_values =
          Enum.map(values, fn each_values ->
            each_values
            |> Enum.at(index)
            |> then(& &1["VarCharValue"])
          end)

        %{col_name => col_values}
        |> Map.merge(acc)
      end)
    end)
    |> DataFrame.new()

  results
  |> then(& &1["ResultSet"]["ResultSetMetadata"]["ColumnInfo"])
  |> Enum.map(fn info ->
    {
      info["Name"],
      case info["Type"] do
        "integer" ->
          :integer

        "float" ->
          :float

        _ ->
          :string
      end
    }
  end)
  |> Enum.reduce(results_df, fn {col_name, col_type}, df ->
    DataFrame.mutate_with(df, &%{col_name => Series.cast(&1[col_name], col_type)})
  end)
end

念のため既存データを削除して、再実行しやすい状態にします

exec_id = exec_query.("DELETE FROM \"wine\";")

削除クエリの実行状態を確認します

get_state.(exec_id)

削除後のテーブル内容を取得するクエリを実行します

exec_id = exec_query.("SELECT * FROM \"wine\";")

削除後に空になっていることを確認します

exec_id
|> get_results.()
|> Kino.DataTable.new()

INSERT 文を組み立てるために列名を取り出します

cols = Explorer.DataFrame.names(wine_df)

DataFrame の各行を SQL 用の値リストに変換します

vals =
  wine_df
  |> Explorer.DataFrame.to_rows()
  |> Enum.map(fn row ->
    Enum.map(cols, fn name ->
      Map.get(row, name) |> then(fn val -> "#{val}" end)
    end)
  end)

各行に対応する INSERT 文の一覧を作成します

queries =
  vals
  |> Enum.map(fn row ->
    "INSERT INTO wine (#{Enum.join(cols, ",")}) VALUES (#{Enum.join(row, ",")})"
  end)

INSERT 文を順番に実行して、状態を表示します

Enum.map(queries, fn query ->
  exec_id = exec_query.(query)

  Process.sleep(1_000)

  exec_id
  |> get_state.()
  |> IO.inspect()
end)

データ取得

テーブル全件を取得するクエリを実行します

exec_id = exec_query.("SELECT * FROM \"wine\"")

取得結果をテーブル表示します

exec_id
|> get_results.()
|> Kino.DataTable.new()

条件付き検索の例として、一部の列だけを絞り込んで取得します

exec_id = exec_query.("
  SELECT
    class,
    color_intensity,
    flavanoids
  FROM
    wine
  WHERE
    color_intensity < 5.0
    AND flavanoids >= 2.0
  ORDER BY
    class
  ")

条件付き検索の結果を表示します

exec_id
|> get_results.()
|> Kino.DataTable.new()

集計の例として、class ごとの最大 alcohol を取得します

exec_id = exec_query.("
  SELECT
    class,
    max(alcohol) AS alcohol
  FROM
    wine
  GROUP BY
    class
  ORDER BY
    alcohol DESC
  ")

集計クエリの結果を表示します

exec_id
|> get_results.()
|> Kino.DataTable.new()

テーブル削除

作成した wine テーブルを削除します

AWS.S3Tables.delete_table(client, "wine", "sample_namespace", buckets_arn, %{})

名前空間削除

テーブル削除後に名前空間も削除します

AWS.S3Tables.delete_namespace(client, "sample_namespace", buckets_arn, %{})

テーブルバケット削除

最後に S3 Tables のテーブルバケットを削除します

AWS.S3Tables.delete_table_bucket(client, buckets_arn, %{})

出力先バケット削除

バケット削除の前に、Athena が出力したオブジェクト一覧を取得します

keys =
  client
  |> AWS.S3.list_objects(output_bucket_name)
  |> elem(1)
  |> Map.get("ListBucketResult")
  |> Map.get("Contents", [])
  |> Enum.map(fn content -> Map.get(content, "Key") end)

削除対象オブジェクトを先にすべて消します

keys
|> Enum.map(fn key ->
  AWS.S3.delete_object(client, output_bucket_name, key, %{})
end)

空になった出力先バケット自体を削除します

AWS.S3.delete_bucket(client, output_bucket_name, %{})