Amazon S3 Tables
Mix.install([
{:aws, "~> 1.0"},
{:hackney, "~> 1.16"},
{:uuid, "~> 1.1"},
{:explorer, "~> 0.11"},
{:kino, "~> 0.19"}
])
概要
S3 Tables は、Amazon S3 上に Apache Iceberg テーブルを管理し、Athena などから SQL で扱いやすくする仕組みです この Livebook では、認証、カタログ作成、テーブルバケット作成、データ投入、検索、削除までを順番に試します
準備
Explorer の別名を定義して、後続のセルを読みやすくします
alias Explorer.DataFrame
alias Explorer.Series
require Explorer.DataFrame
認証
AWS のアクセスキー、シークレットキー、リージョンを Livebook 上で入力できるようにします
access_key_id_input = Kino.Input.password("ACCESS_KEY_ID")
secret_access_key_input = Kino.Input.password("SECRET_ACCESS_KEY")
region_input = Kino.Input.text("REGION")
[
access_key_id_input,
secret_access_key_input,
region_input
]
|> Kino.Layout.grid(columns: 3)
入力した認証情報から AWS クライアントを作成します
client =
AWS.Client.create(
Kino.Input.read(access_key_id_input),
Kino.Input.read(secret_access_key_input),
Kino.Input.read(region_input)
)
後続で使うリージョン文字列を取り出します
region = Kino.Input.read(region_input)
Glue カタログ作成時に使う AWS アカウント ID を取得します
account_id =
client
|> AWS.STS.get_caller_identity(%{})
|> elem(1)
|> Map.get("GetCallerIdentityResponse")
|> Map.get("GetCallerIdentityResult")
|> Map.get("Account")
データ作成
サンプルとして Explorer の wine データセットを読み込み、内容を確認します
wine_df = Explorer.Datasets.wine()
Kino.DataTable.new(wine_df)
データカタログの作成
Glue に s3tablescatalog が既にあるかを確認します
exist_s3tablescatalog =
client
|> AWS.Glue.get_catalogs(%{})
|> elem(1)
|> Map.get("CatalogList")
|> Enum.any?(fn catalog -> Map.get(catalog, "Name") == "s3tablescatalog" end)
存在しない場合だけ、S3 Tables 用の Federated Catalog を作成します
if not exist_s3tablescatalog do
client
|> AWS.Glue.create_catalog(%{
"Name" => "s3tablescatalog",
"CatalogInput" => %{
"Description" => "Federated catalog for S3 Tables",
"FederatedCatalog" => %{
"Identifier" => "arn:aws:s3tables:#{region}:#{account_id}:bucket/*",
"ConnectionName" => "aws:s3tables"
},
"CreateDatabaseDefaultPermissions" => [%{
"Principal" => %{
"DataLakePrincipalIdentifier" => "IAM_ALLOWED_PRINCIPALS"
},
"Permissions" => ["ALL"]
}],
"CreateTableDefaultPermissions" => [%{
"Principal" => %{
"DataLakePrincipalIdentifier" => "IAM_ALLOWED_PRINCIPALS"
},
"Permissions" => ["ALL"]
}]
}
})
end
作成後のカタログ一覧を表示して、登録されたことを確認します
client
|> AWS.Glue.get_catalogs(%{})
|> elem(1)
|> Map.get("CatalogList")
出力先バケット作成
Athena のクエリ結果を書き出す S3 バケット名を入力します
output_bucket_name_input = Kino.Input.text("OUTPUT_BUCKET_NAME")
入力したバケット名を読み取ります
output_bucket_name = Kino.Input.read(output_bucket_name_input)
Athena の結果出力先となるバケットを作成します
AWS.S3.create_bucket(client, output_bucket_name, %{
"CreateBucketConfiguration" => %{
"LocationConstraint" => region
}
})
テーブルバケット作成
S3 Tables 本体の保存先となるテーブルバケット名を入力します
table_bucket_name_input = Kino.Input.text("TABLE_BUCKET_NAME")
入力したテーブルバケット名を読み取ります
table_bucket_name = Kino.Input.read(table_bucket_name_input)
S3 Tables のテーブルバケットを作成します
buckets_res =
client
|> AWS.S3Tables.create_table_bucket(%{
"name" => table_bucket_name
})
後続 API で使うため、作成したテーブルバケットの ARN を取り出します
buckets_arn =
buckets_res
|> elem(1)
|> Map.get("arn")
テーブルバケットの一覧を表示して、作成結果を確認します
client
|> AWS.S3Tables.list_table_buckets()
|> elem(1)
|> Map.get("tableBuckets")
名前空間作成
テーブルをまとめる論理的な単位として名前空間を作成します
namespace_res =
client
|> AWS.S3Tables.create_namespace(buckets_arn, %{
"namespace" => ["sample_namespace"]
})
名前空間一覧を表示して、作成できたことを確認します
client
|> AWS.S3Tables.list_namespaces(buckets_arn)
|> elem(1)
|> Map.get("namespaces")
テーブル作成
Explorer の型情報を S3 Tables 用のスキーマ定義に変換します
fields =
wine_df
|> Map.get(:dtypes)
|> Enum.map(fn {name, type} ->
%{
"name" => name,
"type" => case type do
{:f, _} -> "float"
{:s, _} -> "int"
_ -> "string"
end
}
end)
Iceberg 形式で wine テーブルを作成します
table_res =
client
|> AWS.S3Tables.create_table("sample_namespace", buckets_arn, %{
"name" => "wine",
"format" => "ICEBERG",
"metadata" => %{
"iceberg" => %{
"schema" => %{
"fields" => fields
}
}
}
})
テーブル一覧を表示して、作成されたテーブルを確認します
client
|> AWS.S3Tables.list_tables(buckets_arn)
|> elem(1)
|> Map.get("tables")
データの追加
Athena に SQL を投げて実行 ID を返すヘルパー関数を定義します
exec_query = fn query ->
client
|> AWS.Athena.start_query_execution(%{
"QueryString" => query,
"ClientRequestToken" => UUID.uuid1(),
"QueryExecutionContext" => %{
"Catalog" => "s3tablescatalog/#{table_bucket_name}",
"Database" => "sample_namespace"
},
"ResultConfiguration" => %{
"OutputLocation" => "s3://#{output_bucket_name}"
}
})
|> elem(1)
|> Map.get("QueryExecutionId")
end
クエリ実行状態を取得するヘルパー関数を定義します
get_state = fn exec_id ->
client
|> AWS.Athena.get_query_execution(%{"QueryExecutionId" => exec_id})
|> elem(1)
|> then(& &1["QueryExecution"]["Status"]["State"])
end
Athena の結果を DataFrame に変換するヘルパー関数を定義します
get_results = fn exec_id ->
results =
client
|> AWS.Athena.get_query_results(%{"QueryExecutionId" => exec_id})
|> elem(1)
results_df =
results
|> then(& &1["ResultSet"]["Rows"])
|> Enum.map(& &1["Data"])
|> then(fn columns ->
[headers | values] = columns
headers
|> Enum.map(& &1["VarCharValue"])
|> Enum.with_index()
|> Enum.reduce(%{}, fn {col_name, index}, acc ->
col_values =
Enum.map(values, fn each_values ->
each_values
|> Enum.at(index)
|> then(& &1["VarCharValue"])
end)
%{col_name => col_values}
|> Map.merge(acc)
end)
end)
|> DataFrame.new()
results
|> then(& &1["ResultSet"]["ResultSetMetadata"]["ColumnInfo"])
|> Enum.map(fn info ->
{
info["Name"],
case info["Type"] do
"integer" ->
:integer
"float" ->
:float
_ ->
:string
end
}
end)
|> Enum.reduce(results_df, fn {col_name, col_type}, df ->
DataFrame.mutate_with(df, &%{col_name => Series.cast(&1[col_name], col_type)})
end)
end
念のため既存データを削除して、再実行しやすい状態にします
exec_id = exec_query.("DELETE FROM \"wine\";")
削除クエリの実行状態を確認します
get_state.(exec_id)
削除後のテーブル内容を取得するクエリを実行します
exec_id = exec_query.("SELECT * FROM \"wine\";")
削除後に空になっていることを確認します
exec_id
|> get_results.()
|> Kino.DataTable.new()
INSERT 文を組み立てるために列名を取り出します
cols = Explorer.DataFrame.names(wine_df)
DataFrame の各行を SQL 用の値リストに変換します
vals =
wine_df
|> Explorer.DataFrame.to_rows()
|> Enum.map(fn row ->
Enum.map(cols, fn name ->
Map.get(row, name) |> then(fn val -> "#{val}" end)
end)
end)
各行に対応する INSERT 文の一覧を作成します
queries =
vals
|> Enum.map(fn row ->
"INSERT INTO wine (#{Enum.join(cols, ",")}) VALUES (#{Enum.join(row, ",")})"
end)
INSERT 文を順番に実行して、状態を表示します
Enum.map(queries, fn query ->
exec_id = exec_query.(query)
Process.sleep(1_000)
exec_id
|> get_state.()
|> IO.inspect()
end)
データ取得
テーブル全件を取得するクエリを実行します
exec_id = exec_query.("SELECT * FROM \"wine\"")
取得結果をテーブル表示します
exec_id
|> get_results.()
|> Kino.DataTable.new()
条件付き検索の例として、一部の列だけを絞り込んで取得します
exec_id = exec_query.("
SELECT
class,
color_intensity,
flavanoids
FROM
wine
WHERE
color_intensity < 5.0
AND flavanoids >= 2.0
ORDER BY
class
")
条件付き検索の結果を表示します
exec_id
|> get_results.()
|> Kino.DataTable.new()
集計の例として、class ごとの最大 alcohol を取得します
exec_id = exec_query.("
SELECT
class,
max(alcohol) AS alcohol
FROM
wine
GROUP BY
class
ORDER BY
alcohol DESC
")
集計クエリの結果を表示します
exec_id
|> get_results.()
|> Kino.DataTable.new()
テーブル削除
作成した wine テーブルを削除します
AWS.S3Tables.delete_table(client, "wine", "sample_namespace", buckets_arn, %{})
名前空間削除
テーブル削除後に名前空間も削除します
AWS.S3Tables.delete_namespace(client, "sample_namespace", buckets_arn, %{})
テーブルバケット削除
最後に S3 Tables のテーブルバケットを削除します
AWS.S3Tables.delete_table_bucket(client, buckets_arn, %{})
出力先バケット削除
バケット削除の前に、Athena が出力したオブジェクト一覧を取得します
keys =
client
|> AWS.S3.list_objects(output_bucket_name)
|> elem(1)
|> Map.get("ListBucketResult")
|> Map.get("Contents", [])
|> Enum.map(fn content -> Map.get(content, "Key") end)
削除対象オブジェクトを先にすべて消します
keys
|> Enum.map(fn key ->
AWS.S3.delete_object(client, output_bucket_name, key, %{})
end)
空になった出力先バケット自体を削除します
AWS.S3.delete_bucket(client, output_bucket_name, %{})