Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

Amazon Athena

livebooks/aws/athena.livemd

Amazon Athena

Mix.install([
  {:aws, "~> 0.13"},
  {:uuid, "~> 1.1"},
  {:hackney, "~> 1.20"},
  {:explorer, "~> 0.8"},
  {:kino, "~> 0.13"}
])

準備

alias Explorer.DataFrame
alias Explorer.Series
require Explorer.DataFrame

認証

# Credential inputs: two password fields and one plain-text field for the region.
access_key_id_input = Kino.Input.password("ACCESS_KEY_ID")
secret_access_key_input = Kino.Input.password("SECRET_ACCESS_KEY")
region_input = Kino.Input.text("REGION")

# Arrange the three inputs in a three-column grid.
Kino.Layout.grid(
  [access_key_id_input, secret_access_key_input, region_input],
  columns: 3
)

# Read the inputs (in the same order as above) and build the AWS client from them.
client =
  [access_key_id_input, secret_access_key_input, region_input]
  |> Enum.map(&Kino.Input.read/1)
  |> then(fn [access_key_id, secret_access_key, region] ->
    AWS.Client.create(access_key_id, secret_access_key, region)
  end)

データ作成

# Load the wine dataset that ships with Explorer and show it as a table.
wine_df = Explorer.Datasets.wine()

wine_df |> Kino.DataTable.new()

# Save the dataset to a CSV file.
csv_filename = "wine.csv"

wine_df |> DataFrame.to_csv(csv_filename)

データアップロード

# Check the bucket list; elem(1) takes the second element of the returned tuple.
buckets_res = elem(AWS.S3.list_buckets(client), 1)

buckets_res
|> get_in(["ListAllMyBucketsResult", "Buckets", "Bucket"])
|> Kino.DataTable.new()

# Target bucket (entered by the user) and the key prefix used for the Athena data.
bucket_name_input = Kino.Input.text("BUCKET_NAME")
bucket_name = Kino.Input.read(bucket_name_input)
athena_prefix = "athena/"

# Read the CSV back and compute the Base64-encoded MD5 digest sent as "ContentMD5".
file = File.read!(csv_filename)
md5 = Base.encode64(:crypto.hash(:md5, file))

# Upload the CSV under the Athena prefix.
AWS.S3.put_object(
  client,
  bucket_name,
  athena_prefix <> csv_filename,
  %{"Body" => file, "ContentMD5" => md5}
)

データベース定義

# Submit the CREATE DATABASE statement. A UUID is used as the client request
# token and the query output location is the bucket itself.
create_db_query = "CREATE DATABASE athena_sample"
create_db_token = UUID.uuid1()

create_db_request = %{
  "QueryString" => create_db_query,
  "ClientRequestToken" => create_db_token,
  "ResultConfiguration" => %{"OutputLocation" => "s3://#{bucket_name}"}
}

# Keep the execution id so the query state can be looked up afterwards.
exec_id =
  client
  |> AWS.Athena.start_query_execution(create_db_request)
  |> elem(1)
  |> Map.get("QueryExecutionId")
# Look up the current state of the query execution identified by exec_id.
client
|> AWS.Athena.get_query_execution(%{"QueryExecutionId" => exec_id})
|> elem(1)
|> then(& &1["QueryExecution"]["Status"]["State"])

SQL実行関数定義

# exec_query.(query) starts an Athena query execution for the given SQL string
# and returns the "QueryExecutionId" taken from the response.
# Every call uses a fresh UUID as the client request token and points the
# query output at s3://<bucket_name>.
exec_query = fn query ->
  request = %{
    "QueryString" => query,
    "ClientRequestToken" => UUID.uuid1(),
    "ResultConfiguration" => %{"OutputLocation" => "s3://#{bucket_name}"}
  }

  response = client |> AWS.Athena.start_query_execution(request) |> elem(1)
  Map.get(response, "QueryExecutionId")
end
# get_state.(exec_id) fetches the query execution for the given id and returns
# the value found at ["QueryExecution"]["Status"]["State"] in the response.
get_state = fn exec_id ->
  client
  |> AWS.Athena.get_query_execution(%{"QueryExecutionId" => exec_id})
  |> elem(1)
  |> then(& &1["QueryExecution"]["Status"]["State"])
end

テーブル作成

# Drop the table first so the notebook can be re-run.
exec_id = "DROP TABLE IF EXISTS athena_sample.wine_table" |> exec_query.()
exec_id |> get_state.()
# Look the dtypes up once instead of once per column.
wine_dtypes = DataFrame.dtypes(wine_df)

# Pair every column name with its Explorer dtype, in DataFrame column order.
types =
  wine_df
  |> DataFrame.names()
  |> Enum.map(fn name -> {name, wine_dtypes[name]} end)

# Build the column definitions for the CREATE statement.
table_columns =
  Enum.map_join(types, ",", fn {name, type} ->
    sql_type =
      case type do
        # Explorer 0.8 reports integer columns as {:s, bits} / {:u, bits};
        # the bare :integer atom is kept for older releases.
        :integer -> "int"
        {sign, _bits} when sign in [:s, :u] -> "int"
        # Likewise floats are reported as {:f, bits}; :float is the older form.
        :float -> "float"
        {:f, _bits} -> "float"
      end

    name <> " " <> sql_type
  end)
# CREATE EXTERNAL TABLE statement: comma-delimited text files stored under the
# Athena prefix of the bucket, with the first (header) line skipped.
create_table_query = "
  CREATE EXTERNAL TABLE
    athena_sample.wine_table (#{table_columns})
  ROW FORMAT
    SerDe 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' 
  WITH SerDeProperties (
    'serialization.format' = ',',
    'field.delim' = ','
  )
  STORED AS TEXTFILE
  LOCATION
    's3://#{bucket_name}/#{athena_prefix}' 
  TBLPROPERTIES (
    'has_encrypted_data'='false',
    'skip.header.line.count'='1'
  )
  "

exec_id = exec_query.(create_table_query)
exec_id |> get_state.()

データ取得

# Select every row and column of the wine table, then check the query state.
select_all_query = "
  SELECT
    *
  FROM
    athena_sample.wine_table
  "

exec_id = exec_query.(select_all_query)
exec_id |> get_state.()
# Fetch the results of the query identified by exec_id.
results =
  elem(AWS.Athena.get_query_results(client, %{"QueryExecutionId" => exec_id}), 1)
# Get each column's name and type from the result metadata.
# "integer" and "float" map to the matching Explorer dtype; every other
# type is treated as a string.
types =
  results
  |> then(& &1["ResultSet"]["ResultSetMetadata"]["ColumnInfo"])
  |> Enum.map(fn info ->
    dtype =
      case info["Type"] do
        "integer" -> :integer
        "float" -> :float
        _ -> :string
      end

    {info["Name"], dtype}
  end)
# Build a DataFrame from the result rows. The first row is used as the column
# headers; each remaining row contributes one "VarCharValue" per column.
results_df =
  results
  |> then(& &1["ResultSet"]["Rows"])
  |> Enum.map(& &1["Data"])
  |> then(fn columns ->
    [headers | values] = columns

    headers
    |> Enum.map(& &1["VarCharValue"])
    |> Enum.with_index()
    |> Enum.reduce(%{}, fn {col_name, index}, acc ->
      # Collect the value at this column position from every data row.
      col_values =
        Enum.map(values, fn each_values ->
          Enum.at(each_values, index)["VarCharValue"]
        end)

      # An already-collected column with the same name is kept.
      Map.put_new(acc, col_name, col_values)
    end)
  end)
  |> DataFrame.new()
# Cast every column of results_df to the dtype paired with it in types,
# then display the result.
parsed_df =
  Enum.reduce(types, results_df, fn {col_name, col_type}, df ->
    DataFrame.mutate_with(df, &%{col_name => Series.cast(&1[col_name], col_type)})
  end)

Kino.DataTable.new(parsed_df)
# get_results.(exec_id) fetches the results of the given query execution and
# returns them as a DataFrame whose columns are cast according to the result
# metadata ("integer" -> :integer, "float" -> :float, anything else -> :string).
# NOTE(review): no completion check is made before fetching — presumably the
# caller makes sure the query has finished; confirm.
get_results = fn exec_id ->
  results =
    client
    |> AWS.Athena.get_query_results(%{"QueryExecutionId" => exec_id})
    |> elem(1)

  # The first row is used as the column headers; each remaining row
  # contributes one "VarCharValue" per column.
  results_df =
    results
    |> then(& &1["ResultSet"]["Rows"])
    |> Enum.map(& &1["Data"])
    |> then(fn columns ->
      [headers | values] = columns

      headers
      |> Enum.map(& &1["VarCharValue"])
      |> Enum.with_index()
      |> Enum.reduce(%{}, fn {col_name, index}, acc ->
        col_values =
          Enum.map(values, fn each_values ->
            Enum.at(each_values, index)["VarCharValue"]
          end)

        # An already-collected column with the same name is kept.
        Map.put_new(acc, col_name, col_values)
      end)
    end)
    |> DataFrame.new()

  # Cast each column to the dtype derived from its metadata entry.
  results
  |> then(& &1["ResultSet"]["ResultSetMetadata"]["ColumnInfo"])
  |> Enum.map(fn info ->
    dtype =
      case info["Type"] do
        "integer" -> :integer
        "float" -> :float
        _ -> :string
      end

    {info["Name"], dtype}
  end)
  |> Enum.reduce(results_df, fn {col_name, col_type}, df ->
    DataFrame.mutate_with(df, &%{col_name => Series.cast(&1[col_name], col_type)})
  end)
end
# Rows with color_intensity below 5.0 and flavanoids of at least 2.0,
# restricted to three columns and ordered by class.
filtered_query = "
  SELECT
    class,
    color_intensity,
    flavanoids
  FROM
    athena_sample.wine_table
  WHERE
    color_intensity < 5.0
    AND flavanoids >= 2.0
  ORDER BY
    class
  "

exec_id = exec_query.(filtered_query)

# Fetch the results as a DataFrame and display them.
Kino.DataTable.new(get_results.(exec_id))
# Maximum alcohol per class, highest first.
grouped_query = "
  SELECT
    class,
    max(alcohol) AS alcohol
  FROM
    athena_sample.wine_table
  GROUP BY
    class
  ORDER BY
    alcohol DESC
  "

exec_id = exec_query.(grouped_query)

# Fetch the results as a DataFrame and display them.
Kino.DataTable.new(get_results.(exec_id))

テーブル削除

# Drop the table and check the state of the drop query.
exec_id = "DROP TABLE athena_sample.wine_table" |> exec_query.()
exec_id |> get_state.()

データベース削除

# Drop the database and check the state of the drop query.
exec_id = "DROP DATABASE athena_sample" |> exec_query.()
exec_id |> get_state.()

DataFrameでの操作

# The same filter as the first SQL query, done locally on the DataFrame:
# two successive filters, a sort by class, then a three-column projection.
low_intensity_df = DataFrame.filter(wine_df, color_intensity < 5)
high_flavanoid_df = DataFrame.filter(low_intensity_df, flavanoids >= 2.0)
sorted_by_class_df = DataFrame.sort_by(high_flavanoid_df, class)

sorted_by_class_df
|> DataFrame.select(["class", "color_intensity", "flavanoids"])
|> Kino.DataTable.new()
# The same aggregation as the second SQL query, done locally on the DataFrame:
# maximum alcohol per class, sorted from highest to lowest.
grouped_by_class_df = DataFrame.group_by(wine_df, "class")
max_alcohol_df = DataFrame.summarise(grouped_by_class_df, alcohol: max(alcohol))

max_alcohol_df
|> DataFrame.sort_by(desc: alcohol)
|> Kino.DataTable.new()