Amazon Athena
Mix.install([
{:aws, "~> 0.13"},
{:uuid, "~> 1.1"},
{:hackney, "~> 1.20"},
{:explorer, "~> 0.9"},
{:kino, "~> 0.14"}
])
準備
alias Explorer.DataFrame
alias Explorer.Series
require Explorer.DataFrame
認証
# Inputs for the AWS credentials (password fields) and the region (text field).
access_key_id_input = Kino.Input.password("ACCESS_KEY_ID")
secret_access_key_input = Kino.Input.password("SECRET_ACCESS_KEY")
region_input = Kino.Input.text("REGION")

# Show the three inputs in a single three-column grid.
Kino.Layout.grid(
  [access_key_id_input, secret_access_key_input, region_input],
  columns: 3
)

# Read the current value of each input (in the same order as above) and
# build the AWS client that every later request goes through.
client =
  [access_key_id_input, secret_access_key_input, region_input]
  |> Enum.map(&Kino.Input.read/1)
  |> then(fn [key_id, secret_key, region] ->
    AWS.Client.create(key_id, secret_key, region)
  end)
データ作成
# Load the wine sample dataset from Explorer and render it as a table.
wine_df = Explorer.Datasets.wine()
wine_df |> Kino.DataTable.new()

# Save the data frame as a CSV file in the current working directory.
csv_filename = "wine.csv"
wine_df |> DataFrame.to_csv(csv_filename)
データアップロード
# List the S3 buckets visible to the configured credentials.
# `elem(1)` takes the decoded response body out of the result tuple.
buckets_res =
  client
  |> AWS.S3.list_buckets()
  |> elem(1)

# NOTE(review): the XML body appears to decode a single <Bucket> element as a
# bare map and several as a list of maps — confirm against aws-elixir.
# `List.wrap/1` normalises both shapes (and a missing entry, which becomes [])
# so the table always receives a list of rows.
buckets_res["ListAllMyBucketsResult"]["Buckets"]["Bucket"]
|> List.wrap()
|> Kino.DataTable.new()
# Ask for the destination bucket and read the value that was entered.
bucket_name_input = Kino.Input.text("BUCKET_NAME")
bucket_name = bucket_name_input |> Kino.Input.read()

# Key prefix under which the CSV is stored; the Athena table later points at it.
athena_prefix = "athena/"

# Read the CSV into memory and compute its Base64-encoded MD5 digest,
# which is sent along with the object as "ContentMD5".
file = csv_filename |> File.read!()
md5 = Base.encode64(:crypto.hash(:md5, file))

# Upload the CSV to s3://<bucket_name>/athena/wine.csv.
AWS.S3.put_object(
  client,
  bucket_name,
  athena_prefix <> csv_filename,
  %{"Body" => file, "ContentMD5" => md5}
)
データベース定義
# DDL statement and the client request token sent with it.
create_db_query = "CREATE DATABASE athena_sample"
create_db_token = UUID.uuid1()

# Start the query; Athena writes its output files to the bucket root.
# The id of the started execution is kept so its state can be looked up.
exec_id =
  %{
    "QueryString" => create_db_query,
    "ClientRequestToken" => create_db_token,
    "ResultConfiguration" => %{"OutputLocation" => "s3://#{bucket_name}"}
  }
  |> then(&AWS.Athena.start_query_execution(client, &1))
  |> elem(1)
  |> Map.get("QueryExecutionId")

# Look up the state of that execution at the moment of the call.
%{"QueryExecutionId" => exec_id}
|> then(&AWS.Athena.get_query_execution(client, &1))
|> elem(1)
|> get_in(["QueryExecution", "Status", "State"])
SQL実行関数定義
# Starts `query` on Athena and returns the "QueryExecutionId" of the started
# execution. A fresh UUID is used as the client request token on every call,
# and query output is written to the root of `bucket_name`.
# The function returns as soon as the query is submitted; it does not wait
# for the query to finish.
exec_query = fn query ->
  request = %{
    "QueryString" => query,
    "ClientRequestToken" => UUID.uuid1(),
    "ResultConfiguration" => %{"OutputLocation" => "s3://#{bucket_name}"}
  }

  response =
    client
    |> AWS.Athena.start_query_execution(request)
    |> elem(1)

  Map.get(response, "QueryExecutionId")
end
# Returns the value at QueryExecution -> Status -> State for the execution
# identified by `exec_id`, as reported at the moment of the call.
get_state = fn exec_id ->
  execution =
    client
    |> AWS.Athena.get_query_execution(%{"QueryExecutionId" => exec_id})
    |> elem(1)

  get_in(execution, ["QueryExecution", "Status", "State"])
end
テーブル作成
# Drop any table left over from a previous run so the notebook can be re-run.
exec_id = "DROP TABLE IF EXISTS athena_sample.wine_table" |> exec_query.()
exec_id |> get_state.()
# Pair every column name with its dtype, in data-frame column order.
# The order comes from `DataFrame.names/1` (not from the dtype map) because
# the column list of the CREATE statement is built from it in this order.
# The dtype map is fetched once instead of once per column.
types =
  wine_df
  |> DataFrame.dtypes()
  |> then(fn dtypes ->
    wine_df
    |> DataFrame.names()
    |> Enum.map(fn name -> {name, dtypes[name]} end)
  end)

# Build the column definitions for the CREATE statement.
# Recent Explorer releases report dtypes as tuples such as {:s, 64} and
# {:f, 64} instead of :integer / :float, so both spellings are accepted;
# matching only the atoms raised CaseClauseError. The Athena types stay
# "int" and "float" as before. Any other dtype falls back to "string".
table_columns =
  types
  |> Enum.map_join(",", fn {name, type} ->
    athena_type =
      case type do
        :integer -> "int"
        {:s, _bits} -> "int"
        {:u, _bits} -> "int"
        :float -> "float"
        {:f, _bits} -> "float"
        _other -> "string"
      end

    name <> " " <> athena_type
  end)
# Create an external table named athena_sample.wine_table over the files
# stored under s3://<bucket_name>/<athena_prefix>.
# - The column list is interpolated from `table_columns`.
# - Rows are parsed as comma-delimited text.
# - 'skip.header.line.count'='1' skips the first line of each file
#   (presumably the header row of the uploaded CSV — confirm).
exec_id = exec_query.("
CREATE EXTERNAL TABLE
athena_sample.wine_table (#{table_columns})
ROW FORMAT
SerDe 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SerDeProperties (
'serialization.format' = ',',
'field.delim' = ','
)
STORED AS TEXTFILE
LOCATION
's3://#{bucket_name}/#{athena_prefix}'
TBLPROPERTIES (
'has_encrypted_data'='false',
'skip.header.line.count'='1'
)
")
# State of the CREATE TABLE execution at the moment of the call.
get_state.(exec_id)
データ取得
# Select every row and column of the table; `exec_id` identifies this
# execution and is used below to fetch its results.
exec_id = exec_query.("
SELECT
*
FROM
athena_sample.wine_table
")
# State of the SELECT execution at the moment of the call.
get_state.(exec_id)
# Fetch the results of the execution identified by `exec_id`.
# Only the response to this single request is used; no further pages are
# requested.
results =
  client
  |> AWS.Athena.get_query_results(%{"QueryExecutionId" => exec_id})
  |> elem(1)

# Derive {column name, target dtype} pairs from the result-set metadata.
# All integer-like and float-like Athena types are mapped to numeric dtypes so
# that e.g. bigint or double columns are not left as strings; every other type
# stays a string.
types =
  results["ResultSet"]["ResultSetMetadata"]["ColumnInfo"]
  |> Enum.map(fn info ->
    dtype =
      case info["Type"] do
        t when t in ["tinyint", "smallint", "integer", "bigint"] -> :integer
        t when t in ["float", "real", "double"] -> :float
        _ -> :string
      end

    {info["Name"], dtype}
  end)
# Turn the row-oriented result set into a data frame.
# The first row is used as the header row (column names); every remaining row
# contributes one value per column, read from "VarCharValue".
results_df =
  results["ResultSet"]["Rows"]
  |> Enum.map(fn row -> row["Data"] end)
  |> then(fn cells ->
    [header_cells | data_rows] = cells

    header_cells
    |> Enum.with_index()
    |> Enum.reduce(%{}, fn {header_cell, position}, collected ->
      column = for row <- data_rows, do: Enum.at(row, position)["VarCharValue"]

      # Keep the first column if a name occurs more than once.
      Map.put_new(collected, header_cell["VarCharValue"], column)
    end)
  end)
  |> DataFrame.new()

# Cast each column to the dtype recorded for it in `types`.
parsed_df =
  Enum.reduce(types, results_df, fn {col_name, col_type}, df ->
    DataFrame.mutate_with(df, fn lazy_df ->
      %{col_name => Series.cast(lazy_df[col_name], col_type)}
    end)
  end)

parsed_df |> Kino.DataTable.new()
# Returns the results of the Athena execution `exec_id` as a data frame whose
# columns are cast according to the result-set metadata.
#
# The function first waits until the execution leaves the QUEUED/RUNNING
# states (polling every 200 ms, at most 300 times) and raises a RuntimeError
# if the final state is not "SUCCEEDED". Without the wait, calling it right
# after starting a query asks for results of an unfinished execution.
#
# Only the response to a single get_query_results request is used; no further
# pages are requested.
get_results = fn exec_id ->
  final_state =
    Enum.reduce_while(1..300, nil, fn _attempt, _previous_state ->
      state =
        client
        |> AWS.Athena.get_query_execution(%{"QueryExecutionId" => exec_id})
        |> elem(1)
        |> get_in(["QueryExecution", "Status", "State"])

      if state in ["QUEUED", "RUNNING"] do
        Process.sleep(200)
        {:cont, state}
      else
        {:halt, state}
      end
    end)

  if final_state != "SUCCEEDED" do
    raise "Athena query #{exec_id} did not succeed (last state: #{inspect(final_state)})"
  end

  results =
    client
    |> AWS.Athena.get_query_results(%{"QueryExecutionId" => exec_id})
    |> elem(1)

  # The first row is used as the header row; the remaining rows hold the data.
  results_df =
    results["ResultSet"]["Rows"]
    |> Enum.map(& &1["Data"])
    |> then(fn cells ->
      [headers | values] = cells

      headers
      |> Enum.map(& &1["VarCharValue"])
      |> Enum.with_index()
      |> Enum.reduce(%{}, fn {col_name, index}, acc ->
        col_values =
          Enum.map(values, fn row ->
            row
            |> Enum.at(index)
            |> then(& &1["VarCharValue"])
          end)

        # Keep the first column if a name occurs more than once.
        Map.put_new(acc, col_name, col_values)
      end)
    end)
    |> DataFrame.new()

  # Integer-like and float-like Athena types become numeric columns; anything
  # else stays a string.
  results["ResultSet"]["ResultSetMetadata"]["ColumnInfo"]
  |> Enum.map(fn info ->
    dtype =
      case info["Type"] do
        t when t in ["tinyint", "smallint", "integer", "bigint"] -> :integer
        t when t in ["float", "real", "double"] -> :float
        _ -> :string
      end

    {info["Name"], dtype}
  end)
  |> Enum.reduce(results_df, fn {col_name, col_type}, df ->
    DataFrame.mutate_with(df, &%{col_name => Series.cast(&1[col_name], col_type)})
  end)
end
# Rows with color_intensity below 5.0 and flavanoids of at least 2.0,
# restricted to three columns and ordered by class.
exec_id = "
SELECT
class,
color_intensity,
flavanoids
FROM
athena_sample.wine_table
WHERE
color_intensity < 5.0
AND flavanoids >= 2.0
ORDER BY
class
" |> exec_query.()

# Fetch the results as a data frame and render them.
Kino.DataTable.new(get_results.(exec_id))
# Highest alcohol value per class, largest first.
exec_id = "
SELECT
class,
max(alcohol) AS alcohol
FROM
athena_sample.wine_table
GROUP BY
class
ORDER BY
alcohol DESC
" |> exec_query.()

# Fetch the results as a data frame and render them.
Kino.DataTable.new(get_results.(exec_id))
テーブル削除
# Remove the table definition, then look up the state of that execution.
exec_id = "DROP TABLE athena_sample.wine_table" |> exec_query.()
exec_id |> get_state.()
データベース削除
# Remove the database, then look up the state of that execution.
exec_id = "DROP DATABASE athena_sample" |> exec_query.()
exec_id |> get_state.()
DataFrameでの操作
# Same selection as the first Athena query, done locally on the data frame:
# keep rows matching both conditions, order by class, keep three columns.
wine_df
|> DataFrame.filter(color_intensity < 5 and flavanoids >= 2.0)
|> DataFrame.sort_by(asc: class)
|> DataFrame.select(["class", "color_intensity", "flavanoids"])
|> Kino.DataTable.new()
# Same aggregation as the second Athena query, done locally on the data frame:
# highest alcohol value per class, largest first.
Kino.DataTable.new(
  wine_df
  |> DataFrame.group_by(["class"])
  |> DataFrame.summarise(alcohol: max(alcohol))
  |> DataFrame.sort_by(desc: alcohol)
)