Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

AWS Dynamo

livebooks/aws/dynamo.livemd

AWS Dynamo

Mix.install([
  {:ex_aws, "~> 2.5"},
  {:ex_aws_dynamo, "~> 4.2"},
  {:poison, "~> 5.0"},
  {:hackney, "~> 1.20"},
  {:sweet_xml, "~> 0.7"},
  {:explorer, "~> 0.9"},
  {:kino, "~> 0.14"}
])

準備

alias ExAws.Dynamo
alias Explorer.DataFrame
alias Explorer.Series
require Explorer.DataFrame

認証

# Credential inputs: both key fields are masked password inputs, the region is
# a plain text input.
access_key_id_input = Kino.Input.password("ACCESS_KEY_ID")
secret_access_key_input = Kino.Input.password("SECRET_ACCESS_KEY")
region_input = Kino.Input.text("REGION")

# Lay the three inputs out side by side in a single three-column grid.
Kino.Layout.grid(
  [access_key_id_input, secret_access_key_input, region_input],
  columns: 3
)
# Read the current value of every input and collect them into the keyword list
# that is passed as the second argument of each ExAws.request/2 call below.
auth_config =
  Enum.map(
    [
      access_key_id: access_key_id_input,
      secret_access_key: secret_access_key_input,
      region: region_input
    ],
    fn {option, input} -> {option, Kino.Input.read(input)} end
  )

# End the cell with Kino.nothing/0 so the credentials just read are not
# rendered as the cell result.
Kino.nothing()

テーブル作成

# Load Explorer's bundled fossil-fuels dataset; `df` is the source of every row
# written to DynamoDB below and of the column order used when displaying results.
df = Explorer.Datasets.fossil_fuels()

# Show the full dataset as an interactive table.
Kino.DataTable.new(df)
# Table definition: a composite primary key made of `year` (hash key, number)
# and `country` (range key, string), provisioned with 1 read / 1 write unit.
table_name = "fossil_fuels"
key_schema = [year: :hash, country: :range]
key_types = [year: :number, country: :string]
{read_capacity, write_capacity} = {1, 1}

# Create the table. The non-raising request/2 is used, so a failure comes back
# as a return value rather than an exception.
Dynamo.create_table(table_name, key_schema, key_types, read_capacity, write_capacity)
|> ExAws.request(auth_config)

# List the account's tables.
ExAws.request(Dynamo.list_tables(), auth_config)

データ追加

# Use the first row of the dataset (a map of column name => value) as a sample item.
item = List.first(DataFrame.to_rows(df))

# Write that single item to the table.
ExAws.request(Dynamo.put_item(table_name, item), auth_config)
# Preview the subset that will be uploaded: countries whose name starts with
# "A", for years 2012 and later. The helper `initial` column exists only for
# filtering; selecting the original column names drops it again.
df
|> DataFrame.put(:initial, Series.transform(df["country"], &String.first/1))
|> DataFrame.filter(initial == "A")
|> DataFrame.filter(year >= 2012)
|> DataFrame.select(DataFrame.names(df))
|> Kino.DataTable.new()
# Upload the same subset shown in the preview (countries starting with "A",
# year >= 2012), one put_item request per row. The cell evaluates to the list
# of request results.
df
|> DataFrame.put(:initial, Series.transform(df["country"], &String.first/1))
|> DataFrame.filter(initial == "A")
|> DataFrame.filter(year >= 2012)
|> DataFrame.select(DataFrame.names(df))
|> DataFrame.to_rows()
|> Enum.with_index(fn row, index ->
  # Print the row index as a progress indicator.
  IO.inspect(index)

  # Pause half a second before each write.
  # NOTE(review): presumably this keeps the writes within the table's single
  # provisioned write capacity unit — confirm.
  Process.sleep(500)

  put_result = ExAws.request(Dynamo.put_item(table_name, row), auth_config)
  IO.inspect(put_result)
end)

データ取得

# Scan the whole table. request!/2 raises on failure, so `res` is always the
# decoded response body, with the raw items under "Items".
res = ExAws.request!(Dynamo.scan(table_name), auth_config)
# Converts one raw DynamoDB item (attribute name => %{type_tag => value}) into
# a plain map of attribute name => Elixir value.
#
# Handled type tags:
#   "N"    -> integer when the text is a whole number, float otherwise
#   "S"    -> the string, unchanged
#   "BOOL" -> the boolean, unchanged
#   "NULL" -> nil
#   "NS"   -> list of numbers (each parsed like "N")
#   "SS"   -> list of strings, unchanged
# Any other tag (for example "L", "M" or "B") is passed through undecoded
# instead of raising, so one unexpected attribute does not abort decoding of
# the whole item.
decode = fn item ->
  # A value counts as an integer only when Integer.parse/1 consumes the whole
  # string; otherwise it is parsed as a float. Float.parse/1 also accepts
  # exponent notation without a decimal point (e.g. "1E+2"), which
  # String.to_float/1 rejects.
  parse_number = fn text ->
    case Integer.parse(text) do
      {integer, ""} ->
        integer

      _ ->
        {float, ""} = Float.parse(text)
        float
    end
  end

  Map.new(item, fn {key, type_value} ->
    # Each attribute value is a one-entry map such as %{"N" => "2012"}.
    [{type, value} | _] = Map.to_list(type_value)

    parsed =
      case type do
        "N" -> parse_number.(value)
        "S" -> value
        "BOOL" -> value
        "NULL" -> nil
        "NS" -> Enum.map(value, parse_number)
        "SS" -> value
        _other -> value
      end

    {key, parsed}
  end)
end
# Decode every scanned item into a plain map and build a dataframe from them.
decoded_df = DataFrame.new(Enum.map(res["Items"], decode))

# Display the result with columns in the same order as the source dataset.
decoded_df
|> DataFrame.select(DataFrame.names(df))
|> Kino.DataTable.new()
# Scan with a filter on `total` > 1000. The attribute is referenced through
# the `#name` placeholder rather than written directly in the expression.
# Note: per DynamoDB's Scan semantics, `limit` caps the number of items
# evaluated before the filter is applied, so fewer than 3 items may be returned.
decoded_df =
  Dynamo.scan(table_name,
    limit: 3,
    expression_attribute_values: [value: 1_000],
    expression_attribute_names: %{"#name" => "total"},
    filter_expression: "#name > :value"
  )
  |> ExAws.request!(auth_config)
  |> Map.get("Items")
  |> Enum.map(decode)
  |> DataFrame.new()

# Display the matches with columns in the same order as the source dataset.
decoded_df
|> DataFrame.select(DataFrame.names(df))
|> Kino.DataTable.new()
# Query by the hash key: items whose `year` equals 2013, with `limit: 5`.
# The key attribute is referenced through the `#name` placeholder.
decoded_df =
  Dynamo.query(table_name,
    limit: 5,
    expression_attribute_values: [value: 2013],
    expression_attribute_names: %{"#name" => "year"},
    key_condition_expression: "#name = :value"
  )
  |> ExAws.request!(auth_config)
  |> Map.get("Items")
  |> Enum.map(decode)
  |> DataFrame.new()

# Display the result with columns in the same order as the source dataset.
decoded_df
|> DataFrame.select(DataFrame.names(df))
|> Kino.DataTable.new()
# Fetch exactly one item by its full primary key (hash key `year` plus range
# key `country`), decode it and wrap it in a list to get a one-row dataframe.
decoded_df =
  Dynamo.get_item(table_name, %{year: 2012, country: "AFGHANISTAN"})
  |> ExAws.request!(auth_config)
  |> Map.get("Item")
  |> then(decode)
  |> List.wrap()
  |> DataFrame.new()

# Display the item with columns in the same order as the source dataset.
decoded_df
|> DataFrame.select(DataFrame.names(df))
|> Kino.DataTable.new()

データ更新

# Set the `cement` attribute of the (2012, "AFGHANISTAN") item to 6. Both the
# attribute name and the new value are passed through expression placeholders.
Dynamo.update_item(table_name, %{year: 2012, country: "AFGHANISTAN"},
  expression_attribute_values: [cement_value: 6],
  expression_attribute_names: %{"#cement_name" => "cement"},
  update_expression: "set #cement_name = :cement_value"
)
|> ExAws.request!(auth_config)

# Read the same item back so the updated value can be inspected.
decoded_df =
  Dynamo.get_item(table_name, %{year: 2012, country: "AFGHANISTAN"})
  |> ExAws.request!(auth_config)
  |> Map.get("Item")
  |> then(decode)
  |> List.wrap()
  |> DataFrame.new()

# Display the item with columns in the same order as the source dataset.
decoded_df
|> DataFrame.select(DataFrame.names(df))
|> Kino.DataTable.new()

データ削除

# Delete the (2012, "AFGHANISTAN") item by its full primary key.
ExAws.request!(
  Dynamo.delete_item(table_name, %{year: 2012, country: "AFGHANISTAN"}),
  auth_config
)

# Rescan the table and decode the items so the remaining rows can be inspected.
decoded_df =
  Dynamo.scan(table_name)
  |> ExAws.request!(auth_config)
  |> Map.get("Items")
  |> Enum.map(decode)
  |> DataFrame.new()

# Display the result with columns in the same order as the source dataset.
decoded_df
|> DataFrame.select(DataFrame.names(df))
|> Kino.DataTable.new()

テーブル設定更新

# Change the table's provisioned throughput to 3 read / 3 write capacity units.
ExAws.request(
  Dynamo.update_table(table_name, %{
    provisioned_throughput: %{read_capacity_units: 3, write_capacity_units: 3}
  }),
  auth_config
)

# Fetch the table description to inspect the table's current settings.
ExAws.request(Dynamo.describe_table(table_name), auth_config)

テーブル削除

# Delete the table created by this notebook.
ExAws.request(Dynamo.delete_table(table_name), auth_config)

# List the account's tables again.
ExAws.request(Dynamo.list_tables(), auth_config)