Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

CO2可視化

livebooks/earthdata/oco2.livemd

CO2可視化

Mix.install(
  [
    {:nx, "~> 0.9"},
    {:evision, "~> 0.2"},
    {:exla, "~> 0.9"},
    {:netcdf, "~> 0.1"},
    {:geo, "~> 3.5"},
    {:kino, "~> 0.14"},
    {:kino_maplibre, "~> 0.1"},
    {:kino_vega_lite, "~> 0.1"}
  ],
  config: [nx: [default_backend: EXLA.Backend]]
)

出典

NASA Earthdata の OCO-2 衛星データを使用

https://www.earthdata.nasa.gov/

https://ocov2.jpl.nasa.gov/

以下のように OCO2_L2_Lite_FP 11r のデータを格納しているものとする

  • tmp
    • OCO2
      • 201612
        • oco2_LtCO2_161201_B11014Ar_230111183332s.nc4
      • 201712
        • oco2_LtCO2_171201_B11014Ar_221129005324s.nc4
      • 201812
        • oco2_LtCO2_181201_B11014Ar_221017172333s.nc4
      • 202212
        • oco2_LtCO2_221201_B11014Ar_230118175544s.nc4
        • oco2_LtCO2_221202_B11014Ar_230118180140s.nc4
        • oco2_LtCO2_221203_B11014Ar_230118180210s.nc4

衛星データの読込

oco2_dir = "/tmp/OCO2/"
target_month_dir = "202212"
{:ok, file} =
  [oco2_dir, target_month_dir]
  |> Path.join()
  |> File.ls!()
  |> Enum.sort()
  |> Enum.at(0)
  |> then(&[oco2_dir, target_month_dir, &1])
  |> Path.join()
  |> NetCDF.File.open()
variables =
  ["latitude", "longitude", "vertex_latitude", "vertex_longitude", "xco2"]
  |> Enum.map(fn var_name ->
    file
    |> NetCDF.Variable.load(var_name)
    |> elem(1)
  end)
as_nx_type = fn
  :i8 -> :s8
  :i16 -> :s16
  :i32 -> :s32
  :i64 -> :s64
  t -> t
end

var_tensors =
  for var <- variables, into: %{} do
    tensor = Nx.tensor(var.value, type: as_nx_type.(var.type))
    {var.name, tensor}
  end

衛星データのグラフ表示

display_graph = fn tensors, item_name ->
  plot_data =
    tensors[item_name]
    |> Nx.to_flat_list()
    |> Enum.with_index()
    |> Enum.map(fn {item, index} ->
      %{
        "index" => index,
        item_name => item
      }
    end)

  VegaLite.new(width: 700)
  |> VegaLite.data_from_values(plot_data)
  |> VegaLite.mark(:line)
  |> VegaLite.encode_field(:x, "index", type: :quantitative)
  |> VegaLite.encode_field(:y, item_name, type: :quantitative)
end
display_graph.(var_tensors, "longitude")
display_graph.(var_tensors, "latitude")
display_graph.(var_tensors, "xco2")
{max_co2, min_co2} = {
  var_tensors["xco2"] |> Nx.reduce_max() |> Nx.to_number(),
  var_tensors["xco2"] |> Nx.reduce_min() |> Nx.to_number()
}
plot_data =
  var_tensors["xco2"]
  |> Nx.to_flat_list()
  |> Enum.with_index()
  |> Enum.map(fn {item, index} ->
    %{
      "index" => index,
      "xco2" => item
    }
  end)

y_scale = %{"domain" => [min_co2, max_co2]}

VegaLite.new(width: 700)
|> VegaLite.data_from_values(plot_data)
|> VegaLite.mark(:line)
|> VegaLite.encode_field(:x, "index", type: :quantitative)
|> VegaLite.encode_field(:y, "xco2", type: :quantitative, scale: y_scale)

衛星データの地図表示

corrected_co2 =
  var_tensors["xco2"]
  |> Nx.subtract(min_co2)
  |> Nx.multiply(255 / (max_co2 - min_co2))
  |> Nx.as_type(:u8)
jet_map =
  Nx.iota({256})
  |> Nx.tile([256, 1])
  |> Nx.as_type(:u8)
  |> Evision.Mat.from_nx_2d()
  |> then(&amp;[src: &amp;1, colormap: Evision.Constant.cv_COLORMAPcv_COLORMAP_JET_WINTER()])
  |> Evision.applyColorMap()
common_expression = ["step", ["get", "xco2"]]

bgr =
  jet_map
  |> Evision.Mat.to_nx(EXLA.Backend)
  |> then(&amp; &amp;1[[0, 0..255]])

b = bgr[[0..255, 0]]
g = bgr[[0..255, 1]]
r = bgr[[0..255, 2]]

get_expression = fn table_tensor ->
  [_ | tail] =
    [Nx.iota({256}), table_tensor]
    |> Nx.stack()
    |> Nx.transpose()
    |> Nx.to_flat_list()

  tail
end

r_expression = common_expression ++ get_expression.(r)
g_expression = common_expression ++ get_expression.(g)
b_expression = common_expression ++ get_expression.(b)

jet_expression = [
  "rgb",
  ["round", r_expression],
  ["round", g_expression],
  ["round", b_expression]
]
map_source = %{
  "latitude" => Nx.to_flat_list(var_tensors["latitude"]),
  "longitude" => Nx.to_flat_list(var_tensors["longitude"]),
  "xco2" => Nx.to_flat_list(corrected_co2)
}
MapLibre.new()
|> MapLibre.add_table_source(
  "xco2",
  map_source,
  {:lng_lat, ["longitude", "latitude"]},
  properties: ["xco2"]
)
|> MapLibre.add_layer(
  id: "xco2_layer",
  source: "xco2",
  type: :circle,
  paint: [circle_color: jet_expression]
)

1ヶ月分の衛星データを表示する

load_japan_data = fn filename ->
  {:ok, nc_file} =
    [oco2_dir, target_month_dir, filename]
    |> Path.join()
    |> NetCDF.File.open()

  longitude =
    nc_file
    |> NetCDF.Variable.load("longitude")
    |> elem(1)
    |> then(&amp;Nx.tensor(&amp;1.value, type: as_nx_type.(&amp;1.type)))

  latitude =
    nc_file
    |> NetCDF.Variable.load("latitude")
    |> elem(1)
    |> then(&amp;Nx.tensor(&amp;1.value, type: as_nx_type.(&amp;1.type)))

  # 経度127-147,緯度26-46の範囲を対象にする
  is_japan =
    Nx.logical_and(
      Nx.logical_and(Nx.greater_equal(longitude, 127), Nx.less(longitude, 147)),
      Nx.logical_and(Nx.greater_equal(latitude, 26), Nx.less(latitude, 46))
    )

  japan_indeces =
    is_japan
    |> Nx.to_flat_list()
    |> Enum.with_index()
    |> Enum.filter(fn {is_true, _} -> is_true > 0 end)
    |> Enum.map(fn {_, index} -> index end)

  xco2 =
    nc_file
    |> NetCDF.Variable.load("xco2")
    |> elem(1)
    |> then(&amp;Nx.tensor(&amp;1.value, type: as_nx_type.(&amp;1.type)))

  case japan_indeces do
    [] ->
      nil

    japan_indeces ->
      [longitude, latitude, xco2]
      |> Nx.stack()
      |> Nx.take(Nx.tensor(japan_indeces), axis: 1)
  end
end
target_month_tensor =
  [oco2_dir, target_month_dir]
  |> Path.join()
  |> File.ls!()
  |> Enum.sort()
  |> Enum.map(&amp;load_japan_data.(&amp;1))
  |> Enum.filter(&amp;(&amp;1 != nil))
  |> Nx.concatenate(axis: 1)
{max_co2, min_co2} = {
  target_month_tensor[2] |> Nx.reduce_max() |> Nx.to_number(),
  target_month_tensor[2] |> Nx.reduce_min() |> Nx.to_number()
}

target_month_corrected_co2 =
  target_month_tensor[2]
  |> Nx.subtract(min_co2)
  |> Nx.multiply(255 / (max_co2 - min_co2))
  |> Nx.as_type(:u8)
map_source = %{
  "latitude" => Nx.to_flat_list(target_month_tensor[1]),
  "longitude" => Nx.to_flat_list(target_month_tensor[0]),
  "xco2" => Nx.to_flat_list(target_month_corrected_co2)
}

MapLibre.new(center: {136, 36}, zoom: 3)
|> MapLibre.add_table_source(
  "xco2",
  map_source,
  {:lng_lat, ["longitude", "latitude"]},
  properties: ["xco2"]
)
|> MapLibre.add_layer(
  id: "xco2_layer",
  source: "xco2",
  type: :circle,
  paint: [circle_color: jet_expression]
)

多角形による地図表示

polygon_tensor =
  [
    var_tensors["vertex_longitude"],
    var_tensors["vertex_latitude"]
  ]
  |> Nx.stack()
  |> Nx.transpose()
  # 観測範囲数 * 4(頂点数) * 2(経度と緯度)にする
  |> Nx.reshape({:auto, 4, 2})
co2_polygon_tensor =
  [
    polygon_tensor,
    corrected_co2 |> Nx.new_axis(-1) |> Nx.tile([4]) |> Nx.new_axis(-1)
  ]
  |> Nx.concatenate(axis: 2)
is_japan =
  Nx.logical_and(
    Nx.logical_and(
      Nx.greater_equal(var_tensors["longitude"], 127),
      Nx.less(var_tensors["longitude"], 147)
    ),
    Nx.logical_and(
      Nx.greater_equal(var_tensors["latitude"], 26),
      Nx.less(var_tensors["latitude"], 46)
    )
  )

japan_indeces =
  is_japan
  |> Nx.to_flat_list()
  |> Enum.with_index()
  |> Enum.filter(fn {is_true, _} -> is_true > 0 end)
  |> Enum.map(fn {_, index} -> index end)
  |> Nx.tensor()
japan_co2_polygon_tensor = Nx.take(co2_polygon_tensor, japan_indeces, axis: 0)
geo_collection =
  japan_co2_polygon_tensor
  |> Nx.to_batched(1)
  |> Enum.map(fn co2_polygon ->
    coordinates =
      co2_polygon[[0, 0..3, 0..1]]
      |> Nx.to_flat_list()
      |> Enum.chunk_every(2)
      |> Enum.map(&amp;List.to_tuple(&amp;1))

    co2 =
      co2_polygon[[0, 0, 2]]
      |> Nx.to_number()

    %Geo.Polygon{
      coordinates: [coordinates],
      properties: %{"xco2" => co2}
    }
  end)
  |> Enum.to_list()
  |> then(&amp;%Geo.GeometryCollection{geometries: &amp;1})
MapLibre.new(center: {141, 30}, zoom: 4)
|> MapLibre.add_geo_source("co2", geo_collection)
|> MapLibre.add_layer(
  id: "co2_polygon",
  source: "co2",
  type: :fill,
  paint: [fill_color: jet_expression]
)

1ヶ月分の多角形表示

load_japan_polygon_data = fn filename ->
  {:ok, nc_file} =
    [oco2_dir, target_month_dir, filename]
    |> Path.join()
    |> NetCDF.File.open()

  longitude =
    nc_file
    |> NetCDF.Variable.load("longitude")
    |> elem(1)
    |> then(&amp;Nx.tensor(&amp;1.value, type: as_nx_type.(&amp;1.type)))

  latitude =
    nc_file
    |> NetCDF.Variable.load("latitude")
    |> elem(1)
    |> then(&amp;Nx.tensor(&amp;1.value, type: as_nx_type.(&amp;1.type)))

  is_japan =
    Nx.logical_and(
      Nx.logical_and(Nx.greater_equal(longitude, 127), Nx.less(longitude, 147)),
      Nx.logical_and(Nx.greater_equal(latitude, 26), Nx.less(latitude, 46))
    )

  japan_indeces =
    is_japan
    |> Nx.to_flat_list()
    |> Enum.with_index()
    |> Enum.filter(fn {is_true, _} -> is_true > 0 end)
    |> Enum.map(fn {_, index} -> index end)

  xco2 =
    nc_file
    |> NetCDF.Variable.load("xco2")
    |> elem(1)
    |> then(&amp;Nx.tensor(&amp;1.value, type: as_nx_type.(&amp;1.type)))

  vertex_longitude =
    nc_file
    |> NetCDF.Variable.load("vertex_longitude")
    |> elem(1)
    |> then(&amp;Nx.tensor(&amp;1.value, type: as_nx_type.(&amp;1.type)))

  vertex_latitude =
    nc_file
    |> NetCDF.Variable.load("vertex_latitude")
    |> elem(1)
    |> then(&amp;Nx.tensor(&amp;1.value, type: as_nx_type.(&amp;1.type)))

  polygon_tensor =
    [
      vertex_longitude,
      vertex_latitude
    ]
    |> Nx.stack()
    |> Nx.transpose()
    |> Nx.reshape({:auto, 4, 2})

  co2_polygon_tensor =
    [
      polygon_tensor,
      xco2 |> Nx.new_axis(-1) |> Nx.tile([4]) |> Nx.new_axis(-1)
    ]
    |> Nx.concatenate(axis: 2)

  case japan_indeces do
    [] ->
      nil

    japan_indeces ->
      Nx.take(co2_polygon_tensor, Nx.tensor(japan_indeces), axis: 0)
  end
end
monthly_polygon_tensor =
  [oco2_dir, target_month_dir]
  |> Path.join()
  |> File.ls!()
  |> Enum.sort()
  |> Enum.map(fn filename ->
    load_japan_polygon_data.(filename)
  end)
  |> Enum.filter(&amp;(&amp;1 != nil))
  |> Nx.concatenate(axis: 0)
geometries =
  monthly_polygon_tensor
  |> Nx.to_batched(1)
  |> Enum.map(fn co2_polygon ->
    coordinates =
      co2_polygon[[0, 0..3, 0..1]]
      |> Nx.to_flat_list()
      |> Enum.chunk_every(2)
      |> Enum.map(&amp;List.to_tuple(&amp;1))

    co2 =
      co2_polygon[[0, 0, 2]]
      |> Nx.subtract(min_co2)
      |> Nx.multiply(255 / (max_co2 - min_co2))
      |> Nx.as_type(:u8)
      |> Nx.to_number()

    %Geo.Polygon{
      coordinates: [coordinates],
      properties: %{"xco2" => co2}
    }
  end)
  |> Enum.to_list()

geo_collection = %Geo.GeometryCollection{geometries: geometries}
MapLibre.new(center: {136, 36}, zoom: 3, style: :terrain)
|> MapLibre.add_geo_source("co2_geo", geo_collection)
|> MapLibre.add_layer(
  id: "co2_polygon",
  source: "co2_geo",
  type: :fill,
  paint: [fill_color: jet_expression, fill_opacity: 0.5]
)

年毎に地図表示する

load_tensors = fn file_path ->
  {:ok, nc_file} = NetCDF.File.open(file_path)

  for var_name <- ["longitude", "latitude", "xco2"], into: %{} do
    tensor =
      nc_file
      |> NetCDF.Variable.load(var_name)
      |> elem(1)
      |> then(&amp;Nx.tensor(&amp;1.value, type: as_nx_type.(&amp;1.type)))

    {var_name, tensor}
  end
end
month_list = ["201612", "201712", "201812", "201912", "202012", "202112", "202212"]
time_series_tensors =
  month_list
  |> Enum.map(fn month_dir ->
    [oco2_dir, month_dir]
    |> Path.join()
    |> File.ls!()
    |> Enum.sort()
    |> Enum.at(0)
    |> then(&amp;[oco2_dir, month_dir, &amp;1])
    |> Path.join()
    |> load_tensors.()
  end)
full_xco2 =
  time_series_tensors
  |> Enum.map(&amp; &amp;1["xco2"])
  |> Nx.concatenate()

{max_co2, min_co2} = {
  full_xco2 |> Nx.reduce_max() |> Nx.to_number(),
  full_xco2 |> Nx.reduce_min() |> Nx.to_number()
}
time_series_tensors
|> Enum.zip(month_list)
|> Enum.map(fn {tensors, month} ->
  corrected_co2 =
    tensors["xco2"]
    |> Nx.subtract(min_co2)
    |> Nx.multiply(255 / (max_co2 - min_co2))
    |> Nx.as_type(:u8)

  map_source = %{
    "latitude" => Nx.to_flat_list(tensors["latitude"]),
    "longitude" => Nx.to_flat_list(tensors["longitude"]),
    "xco2" => Nx.to_flat_list(corrected_co2)
  }

  map =
    MapLibre.new()
    |> MapLibre.add_table_source(
      "xco2",
      map_source,
      {:lng_lat, ["longitude", "latitude"]},
      properties: ["xco2"]
    )
    |> MapLibre.add_layer(
      id: "xco2_layer",
      source: "xco2",
      type: :circle,
      paint: [circle_color: jet_expression]
    )

  {month <> "01", map}
end)
|> Kino.Layout.tabs()

二酸化炭素濃度の時系列変化をグラフ化する

mean_xco2_list =
  time_series_tensors
  |> Enum.map(fn tensors ->
    Nx.mean(tensors["xco2"]) |> Nx.to_number()
  end)
plot_data =
  mean_xco2_list
  |> Enum.zip(month_list)
  |> Enum.map(fn {mean_xco2, month} ->
    %{
      mean_xco2: mean_xco2,
      year: month |> String.slice(0..3) |> Integer.parse() |> elem(0)
    }
  end)

VegaLite.new(width: 700)
|> VegaLite.data_from_values(plot_data)
|> VegaLite.mark(:line)
|> VegaLite.encode_field(:x, "year", type: :nominal)
|> VegaLite.encode_field(:y, "mean_xco2", type: :quantitative, scale: %{"domain" => [400, 420]})

回帰直線を求めて将来の値を予測する

points =
  mean_xco2_list
  |> Enum.zip(month_list)
  |> Enum.map(fn {mean_xco2, month} ->
    [month |> String.slice(0..3) |> Integer.parse() |> elem(0), mean_xco2]
  end)
  |> Nx.tensor()
[vx, vy, x, y] =
  points
  |> Evision.fitLine(Evision.Constant.cv_DIST_L2(), 0, 0.01, 0.01)
  |> Evision.Mat.to_nx(EXLA.Backend)
  |> Nx.to_flat_list()
slope = vy / vx
intercept = y - slope * x

{slope, intercept}
xco2_predictions =
  2023..2100
  |> Enum.map(fn year ->
    %{
      mean_xco2: slope * year + intercept,
      year: year
    }
  end)

xco2_predictions
|> Enum.at(-1)
predictions_data = plot_data ++ xco2_predictions

VegaLite.new(width: 700)
|> VegaLite.data_from_values(predictions_data)
|> VegaLite.mark(:line)
|> VegaLite.encode_field(:x, "year", type: :nominal)
|> VegaLite.encode_field(:y, "mean_xco2", type: :quantitative)