AWS Dynamo
Mix.install([
{:ex_aws, "~> 2.5"},
{:ex_aws_dynamo, "~> 4.2"},
{:poison, "~> 5.0"},
{:hackney, "~> 1.20"},
{:sweet_xml, "~> 0.7"},
{:explorer, "~> 0.9"},
{:kino, "~> 0.14"}
])
準備
alias ExAws.Dynamo
alias Explorer.DataFrame
alias Explorer.Series
require Explorer.DataFrame
認証
access_key_id_input = Kino.Input.password("ACCESS_KEY_ID")
secret_access_key_input = Kino.Input.password("SECRET_ACCESS_KEY")
region_input = Kino.Input.text("REGION")
[
access_key_id_input,
secret_access_key_input,
region_input
]
|> Kino.Layout.grid(columns: 3)
auth_config = [
access_key_id: Kino.Input.read(access_key_id_input),
secret_access_key: Kino.Input.read(secret_access_key_input),
region: Kino.Input.read(region_input)
]
Kino.nothing()
テーブル作成
df = Explorer.Datasets.fossil_fuels()
Kino.DataTable.new(df)
table_name = "fossil_fuels"
key_schema = [
year: :hash,
country: :range
]
key_types = [
year: :number,
country: :string
]
{read_capacity, write_capacity} = {1, 1}
table_name
|> Dynamo.create_table(key_schema, key_types, read_capacity, write_capacity)
|> ExAws.request(auth_config)
Dynamo.list_tables()
|> ExAws.request(auth_config)
データ追加
item =
df
|> DataFrame.to_rows()
|> Enum.at(0)
table_name
|> Dynamo.put_item(item)
|> ExAws.request(auth_config)
df
|> DataFrame.put(
:initial,
Series.transform(df["country"], fn country -> String.first(country) end)
)
|> DataFrame.filter(initial == "A")
|> DataFrame.filter(year >= 2012)
|> DataFrame.select(DataFrame.names(df))
|> Kino.DataTable.new()
df
|> DataFrame.put(
:initial,
Series.transform(df["country"], fn country -> String.first(country) end)
)
|> DataFrame.filter(initial == "A")
|> DataFrame.filter(year >= 2012)
|> DataFrame.select(DataFrame.names(df))
|> DataFrame.to_rows()
|> Enum.with_index()
|> Enum.map(fn {row, index} ->
IO.inspect(index)
Process.sleep(500)
table_name
|> Dynamo.put_item(row)
|> ExAws.request(auth_config)
|> IO.inspect()
end)
データ取得
res =
table_name
|> Dynamo.scan()
|> ExAws.request!(auth_config)
decode = fn item ->
item
|> Enum.reduce(%{}, fn {key, type_value}, merged ->
type =
type_value
|> Map.keys()
|> Enum.at(0)
value =
type_value
|> Map.values()
|> Enum.at(0)
parsed =
case type do
"N" ->
if String.contains?(value, ".") do
String.to_float(value)
else
String.to_integer(value)
end
"S" ->
value
end
%{key => parsed}
|> Map.merge(merged)
end)
end
decoded_df =
res["Items"]
|> Enum.map(&decode.(&1))
|> DataFrame.new()
decoded_df
|> DataFrame.select(DataFrame.names(df))
|> Kino.DataTable.new()
decoded_df =
table_name
|> Dynamo.scan(
limit: 3,
expression_attribute_values: [value: 1_000],
expression_attribute_names: %{"#name" => "total"},
filter_expression: "#name > :value"
)
|> ExAws.request!(auth_config)
|> Map.get("Items")
|> Enum.map(&decode.(&1))
|> DataFrame.new()
decoded_df
|> DataFrame.select(DataFrame.names(df))
|> Kino.DataTable.new()
decoded_df =
table_name
|> Dynamo.query(
limit: 5,
expression_attribute_values: [value: 2013],
expression_attribute_names: %{"#name" => "year"},
key_condition_expression: "#name = :value"
)
|> ExAws.request!(auth_config)
|> Map.get("Items")
|> Enum.map(&decode.(&1))
|> DataFrame.new()
decoded_df
|> DataFrame.select(DataFrame.names(df))
|> Kino.DataTable.new()
decoded_df =
table_name
|> Dynamo.get_item(%{year: 2012, country: "AFGHANISTAN"})
|> ExAws.request!(auth_config)
|> Map.get("Item")
|> decode.()
|> List.wrap()
|> DataFrame.new()
decoded_df
|> DataFrame.select(DataFrame.names(df))
|> Kino.DataTable.new()
データ更新
table_name
|> Dynamo.update_item(
%{year: 2012, country: "AFGHANISTAN"},
expression_attribute_values: [cement_value: 6],
expression_attribute_names: %{"#cement_name" => "cement"},
update_expression: "set #cement_name = :cement_value"
)
|> ExAws.request!(auth_config)
decoded_df =
table_name
|> Dynamo.get_item(%{year: 2012, country: "AFGHANISTAN"})
|> ExAws.request!(auth_config)
|> Map.get("Item")
|> decode.()
|> List.wrap()
|> DataFrame.new()
decoded_df
|> DataFrame.select(DataFrame.names(df))
|> Kino.DataTable.new()
データ削除
table_name
|> Dynamo.delete_item(%{year: 2012, country: "AFGHANISTAN"})
|> ExAws.request!(auth_config)
decoded_df =
table_name
|> Dynamo.scan()
|> ExAws.request!(auth_config)
|> Map.get("Items")
|> Enum.map(&decode.(&1))
|> DataFrame.new()
decoded_df
|> DataFrame.select(DataFrame.names(df))
|> Kino.DataTable.new()
テーブル設定更新
table_name
|> Dynamo.update_table(%{
provisioned_throughput: %{
read_capacity_units: 3,
write_capacity_units: 3
}
})
|> ExAws.request(auth_config)
table_name
|> Dynamo.describe_table()
|> ExAws.request(auth_config)
テーブル削除
table_name
|> Dynamo.delete_table()
|> ExAws.request(auth_config)
Dynamo.list_tables()
|> ExAws.request(auth_config)