CO2可視化
Mix.install(
  [
    {:nx, "~> 0.9"},
    {:evision, "~> 0.2"},
    {:exla, "~> 0.9"},
    {:netcdf, "~> 0.1"},
    {:geo, "~> 3.5"},
    {:kino, "~> 0.15"},
    {:kino_maplibre, "~> 0.1"},
    {:kino_vega_lite, "~> 0.1"}
  ],
  config: [nx: [default_backend: EXLA.Backend]]
)
出典
NASA Earthdata の OCO-2 衛星データを使用
https://www.earthdata.nasa.gov/
以下のように OCO2_L2_Lite_FP 11r のデータを格納しているものとする
- 
tmp    
- 
OCO2        
- 
201612            
- oco2_LtCO2_161201_B11014Ar_230111183332s.nc4
 
 - 
201712            
- oco2_LtCO2_171201_B11014Ar_221129005324s.nc4
 
 - 
201812            
- oco2_LtCO2_181201_B11014Ar_221017172333s.nc4
 
 - …
 - 
202212            
- oco2_LtCO2_221201_B11014Ar_230118175544s.nc4
 - oco2_LtCO2_221202_B11014Ar_230118180140s.nc4
 - oco2_LtCO2_221203_B11014Ar_230118180210s.nc4
 - …
 
 
 - 
201612            
 
 - 
OCO2        
 
衛星データの読込
oco2_dir = "/tmp/OCO2/"
target_month_dir = "202212"
{:ok, file} =
  [oco2_dir, target_month_dir]
  |> Path.join()
  |> File.ls!()
  |> Enum.sort()
  |> Enum.at(0)
  |> then(&[oco2_dir, target_month_dir, &1])
  |> Path.join()
  |> NetCDF.File.open()
variables =
  ["latitude", "longitude", "vertex_latitude", "vertex_longitude", "xco2"]
  |> Enum.map(fn var_name ->
    file
    |> NetCDF.Variable.load(var_name)
    |> elem(1)
  end)
as_nx_type = fn
  :i8 -> :s8
  :i16 -> :s16
  :i32 -> :s32
  :i64 -> :s64
  t -> t
end
var_tensors =
  for var <- variables, into: %{} do
    tensor = Nx.tensor(var.value, type: as_nx_type.(var.type))
    {var.name, tensor}
  end
衛星データのグラフ表示
display_graph = fn tensors, item_name ->
  plot_data =
    tensors[item_name]
    |> Nx.to_flat_list()
    |> Enum.with_index()
    |> Enum.map(fn {item, index} ->
      %{
        "index" => index,
        item_name => item
      }
    end)
  VegaLite.new(width: 700)
  |> VegaLite.data_from_values(plot_data)
  |> VegaLite.mark(:line)
  |> VegaLite.encode_field(:x, "index", type: :quantitative)
  |> VegaLite.encode_field(:y, item_name, type: :quantitative)
end
display_graph.(var_tensors, "longitude")
display_graph.(var_tensors, "latitude")
display_graph.(var_tensors, "xco2")
{max_co2, min_co2} = {
  var_tensors["xco2"] |> Nx.reduce_max() |> Nx.to_number(),
  var_tensors["xco2"] |> Nx.reduce_min() |> Nx.to_number()
}
plot_data =
  var_tensors["xco2"]
  |> Nx.to_flat_list()
  |> Enum.with_index()
  |> Enum.map(fn {item, index} ->
    %{
      "index" => index,
      "xco2" => item
    }
  end)
y_scale = %{"domain" => [min_co2, max_co2]}
VegaLite.new(width: 700)
|> VegaLite.data_from_values(plot_data)
|> VegaLite.mark(:line)
|> VegaLite.encode_field(:x, "index", type: :quantitative)
|> VegaLite.encode_field(:y, "xco2", type: :quantitative, scale: y_scale)
衛星データの地図表示
corrected_co2 =
  var_tensors["xco2"]
  |> Nx.subtract(min_co2)
  |> Nx.multiply(255 / (max_co2 - min_co2))
  |> Nx.as_type(:u8)
jet_map =
  Nx.iota({256})
  |> Nx.tile([256, 1])
  |> Nx.as_type(:u8)
  |> Evision.Mat.from_nx_2d()
  |> then(&[src: &1, colormap: Evision.Constant.cv_COLORMAPcv_COLORMAP_JET_WINTER()])
  |> Evision.applyColorMap()
common_expression = ["step", ["get", "xco2"]]
bgr =
  jet_map
  |> Evision.Mat.to_nx(EXLA.Backend)
  |> then(& &1[[0, 0..255]])
b = bgr[[0..255, 0]]
g = bgr[[0..255, 1]]
r = bgr[[0..255, 2]]
get_expression = fn table_tensor ->
  [_ | tail] =
    [Nx.iota({256}), table_tensor]
    |> Nx.stack()
    |> Nx.transpose()
    |> Nx.to_flat_list()
  tail
end
r_expression = common_expression ++ get_expression.(r)
g_expression = common_expression ++ get_expression.(g)
b_expression = common_expression ++ get_expression.(b)
jet_expression = [
  "rgb",
  ["round", r_expression],
  ["round", g_expression],
  ["round", b_expression]
]
map_source = %{
  "latitude" => Nx.to_flat_list(var_tensors["latitude"]),
  "longitude" => Nx.to_flat_list(var_tensors["longitude"]),
  "xco2" => Nx.to_flat_list(corrected_co2)
}
MapLibre.new()
|> MapLibre.add_table_source(
  "xco2",
  map_source,
  {:lng_lat, ["longitude", "latitude"]},
  properties: ["xco2"]
)
|> MapLibre.add_layer(
  id: "xco2_layer",
  source: "xco2",
  type: :circle,
  paint: [circle_color: jet_expression]
)
1ヶ月分の衛星データを表示する
load_japan_data = fn filename ->
  {:ok, nc_file} =
    [oco2_dir, target_month_dir, filename]
    |> Path.join()
    |> NetCDF.File.open()
  longitude =
    nc_file
    |> NetCDF.Variable.load("longitude")
    |> elem(1)
    |> then(&Nx.tensor(&1.value, type: as_nx_type.(&1.type)))
  latitude =
    nc_file
    |> NetCDF.Variable.load("latitude")
    |> elem(1)
    |> then(&Nx.tensor(&1.value, type: as_nx_type.(&1.type)))
  # 経度127-147,緯度26-46の範囲を対象にする
  is_japan =
    Nx.logical_and(
      Nx.logical_and(Nx.greater_equal(longitude, 127), Nx.less(longitude, 147)),
      Nx.logical_and(Nx.greater_equal(latitude, 26), Nx.less(latitude, 46))
    )
  japan_indeces =
    is_japan
    |> Nx.to_flat_list()
    |> Enum.with_index()
    |> Enum.filter(fn {is_true, _} -> is_true > 0 end)
    |> Enum.map(fn {_, index} -> index end)
  xco2 =
    nc_file
    |> NetCDF.Variable.load("xco2")
    |> elem(1)
    |> then(&Nx.tensor(&1.value, type: as_nx_type.(&1.type)))
  case japan_indeces do
    [] ->
      nil
    japan_indeces ->
      [longitude, latitude, xco2]
      |> Nx.stack()
      |> Nx.take(Nx.tensor(japan_indeces), axis: 1)
  end
end
target_month_tensor =
  [oco2_dir, target_month_dir]
  |> Path.join()
  |> File.ls!()
  |> Enum.sort()
  |> Enum.map(&load_japan_data.(&1))
  |> Enum.filter(&(&1 != nil))
  |> Nx.concatenate(axis: 1)
{max_co2, min_co2} = {
  target_month_tensor[2] |> Nx.reduce_max() |> Nx.to_number(),
  target_month_tensor[2] |> Nx.reduce_min() |> Nx.to_number()
}
target_month_corrected_co2 =
  target_month_tensor[2]
  |> Nx.subtract(min_co2)
  |> Nx.multiply(255 / (max_co2 - min_co2))
  |> Nx.as_type(:u8)
map_source = %{
  "latitude" => Nx.to_flat_list(target_month_tensor[1]),
  "longitude" => Nx.to_flat_list(target_month_tensor[0]),
  "xco2" => Nx.to_flat_list(target_month_corrected_co2)
}
MapLibre.new(center: {136, 36}, zoom: 3)
|> MapLibre.add_table_source(
  "xco2",
  map_source,
  {:lng_lat, ["longitude", "latitude"]},
  properties: ["xco2"]
)
|> MapLibre.add_layer(
  id: "xco2_layer",
  source: "xco2",
  type: :circle,
  paint: [circle_color: jet_expression]
)
多角形による地図表示
polygon_tensor =
  [
    var_tensors["vertex_longitude"],
    var_tensors["vertex_latitude"]
  ]
  |> Nx.stack()
  |> Nx.transpose()
  # 観測範囲数 * 4(頂点数) * 2(経度と緯度)にする
  |> Nx.reshape({:auto, 4, 2})
co2_polygon_tensor =
  [
    polygon_tensor,
    corrected_co2 |> Nx.new_axis(-1) |> Nx.tile([4]) |> Nx.new_axis(-1)
  ]
  |> Nx.concatenate(axis: 2)
is_japan =
  Nx.logical_and(
    Nx.logical_and(
      Nx.greater_equal(var_tensors["longitude"], 127),
      Nx.less(var_tensors["longitude"], 147)
    ),
    Nx.logical_and(
      Nx.greater_equal(var_tensors["latitude"], 26),
      Nx.less(var_tensors["latitude"], 46)
    )
  )
japan_indeces =
  is_japan
  |> Nx.to_flat_list()
  |> Enum.with_index()
  |> Enum.filter(fn {is_true, _} -> is_true > 0 end)
  |> Enum.map(fn {_, index} -> index end)
  |> Nx.tensor()
japan_co2_polygon_tensor = Nx.take(co2_polygon_tensor, japan_indeces, axis: 0)
geo_collection =
  japan_co2_polygon_tensor
  |> Nx.to_batched(1)
  |> Enum.map(fn co2_polygon ->
    coordinates =
      co2_polygon[[0, 0..3, 0..1]]
      |> Nx.to_flat_list()
      |> Enum.chunk_every(2)
      |> Enum.map(&List.to_tuple(&1))
    co2 =
      co2_polygon[[0, 0, 2]]
      |> Nx.to_number()
    %Geo.Polygon{
      coordinates: [coordinates],
      properties: %{"xco2" => co2}
    }
  end)
  |> Enum.to_list()
  |> then(&%Geo.GeometryCollection{geometries: &1})
MapLibre.new(center: {141, 30}, zoom: 4)
|> MapLibre.add_geo_source("co2", geo_collection)
|> MapLibre.add_layer(
  id: "co2_polygon",
  source: "co2",
  type: :fill,
  paint: [fill_color: jet_expression]
)
1ヶ月分の多角形表示
load_japan_polygon_data = fn filename ->
  {:ok, nc_file} =
    [oco2_dir, target_month_dir, filename]
    |> Path.join()
    |> NetCDF.File.open()
  longitude =
    nc_file
    |> NetCDF.Variable.load("longitude")
    |> elem(1)
    |> then(&Nx.tensor(&1.value, type: as_nx_type.(&1.type)))
  latitude =
    nc_file
    |> NetCDF.Variable.load("latitude")
    |> elem(1)
    |> then(&Nx.tensor(&1.value, type: as_nx_type.(&1.type)))
  is_japan =
    Nx.logical_and(
      Nx.logical_and(Nx.greater_equal(longitude, 127), Nx.less(longitude, 147)),
      Nx.logical_and(Nx.greater_equal(latitude, 26), Nx.less(latitude, 46))
    )
  japan_indeces =
    is_japan
    |> Nx.to_flat_list()
    |> Enum.with_index()
    |> Enum.filter(fn {is_true, _} -> is_true > 0 end)
    |> Enum.map(fn {_, index} -> index end)
  xco2 =
    nc_file
    |> NetCDF.Variable.load("xco2")
    |> elem(1)
    |> then(&Nx.tensor(&1.value, type: as_nx_type.(&1.type)))
  vertex_longitude =
    nc_file
    |> NetCDF.Variable.load("vertex_longitude")
    |> elem(1)
    |> then(&Nx.tensor(&1.value, type: as_nx_type.(&1.type)))
  vertex_latitude =
    nc_file
    |> NetCDF.Variable.load("vertex_latitude")
    |> elem(1)
    |> then(&Nx.tensor(&1.value, type: as_nx_type.(&1.type)))
  polygon_tensor =
    [
      vertex_longitude,
      vertex_latitude
    ]
    |> Nx.stack()
    |> Nx.transpose()
    |> Nx.reshape({:auto, 4, 2})
  co2_polygon_tensor =
    [
      polygon_tensor,
      xco2 |> Nx.new_axis(-1) |> Nx.tile([4]) |> Nx.new_axis(-1)
    ]
    |> Nx.concatenate(axis: 2)
  case japan_indeces do
    [] ->
      nil
    japan_indeces ->
      Nx.take(co2_polygon_tensor, Nx.tensor(japan_indeces), axis: 0)
  end
end
monthly_polygon_tensor =
  [oco2_dir, target_month_dir]
  |> Path.join()
  |> File.ls!()
  |> Enum.sort()
  |> Enum.map(fn filename ->
    load_japan_polygon_data.(filename)
  end)
  |> Enum.filter(&(&1 != nil))
  |> Nx.concatenate(axis: 0)
geometries =
  monthly_polygon_tensor
  |> Nx.to_batched(1)
  |> Enum.map(fn co2_polygon ->
    coordinates =
      co2_polygon[[0, 0..3, 0..1]]
      |> Nx.to_flat_list()
      |> Enum.chunk_every(2)
      |> Enum.map(&List.to_tuple(&1))
    co2 =
      co2_polygon[[0, 0, 2]]
      |> Nx.subtract(min_co2)
      |> Nx.multiply(255 / (max_co2 - min_co2))
      |> Nx.as_type(:u8)
      |> Nx.to_number()
    %Geo.Polygon{
      coordinates: [coordinates],
      properties: %{"xco2" => co2}
    }
  end)
  |> Enum.to_list()
geo_collection = %Geo.GeometryCollection{geometries: geometries}
MapLibre.new(center: {136, 36}, zoom: 3, style: :terrain)
|> MapLibre.add_geo_source("co2_geo", geo_collection)
|> MapLibre.add_layer(
  id: "co2_polygon",
  source: "co2_geo",
  type: :fill,
  paint: [fill_color: jet_expression, fill_opacity: 0.5]
)
年毎に地図表示する
load_tensors = fn file_path ->
  {:ok, nc_file} = NetCDF.File.open(file_path)
  for var_name <- ["longitude", "latitude", "xco2"], into: %{} do
    tensor =
      nc_file
      |> NetCDF.Variable.load(var_name)
      |> elem(1)
      |> then(&Nx.tensor(&1.value, type: as_nx_type.(&1.type)))
    {var_name, tensor}
  end
end
month_list = ["201612", "201712", "201812", "201912", "202012", "202112", "202212"]
time_series_tensors =
  month_list
  |> Enum.map(fn month_dir ->
    [oco2_dir, month_dir]
    |> Path.join()
    |> File.ls!()
    |> Enum.sort()
    |> Enum.at(0)
    |> then(&[oco2_dir, month_dir, &1])
    |> Path.join()
    |> load_tensors.()
  end)
full_xco2 =
  time_series_tensors
  |> Enum.map(& &1["xco2"])
  |> Nx.concatenate()
{max_co2, min_co2} = {
  full_xco2 |> Nx.reduce_max() |> Nx.to_number(),
  full_xco2 |> Nx.reduce_min() |> Nx.to_number()
}
time_series_tensors
|> Enum.zip(month_list)
|> Enum.map(fn {tensors, month} ->
  corrected_co2 =
    tensors["xco2"]
    |> Nx.subtract(min_co2)
    |> Nx.multiply(255 / (max_co2 - min_co2))
    |> Nx.as_type(:u8)
  map_source = %{
    "latitude" => Nx.to_flat_list(tensors["latitude"]),
    "longitude" => Nx.to_flat_list(tensors["longitude"]),
    "xco2" => Nx.to_flat_list(corrected_co2)
  }
  map =
    MapLibre.new()
    |> MapLibre.add_table_source(
      "xco2",
      map_source,
      {:lng_lat, ["longitude", "latitude"]},
      properties: ["xco2"]
    )
    |> MapLibre.add_layer(
      id: "xco2_layer",
      source: "xco2",
      type: :circle,
      paint: [circle_color: jet_expression]
    )
  {month <> "01", map}
end)
|> Kino.Layout.tabs()
二酸化炭素濃度の時系列変化をグラフ化する
mean_xco2_list =
  time_series_tensors
  |> Enum.map(fn tensors ->
    Nx.mean(tensors["xco2"]) |> Nx.to_number()
  end)
plot_data =
  mean_xco2_list
  |> Enum.zip(month_list)
  |> Enum.map(fn {mean_xco2, month} ->
    %{
      mean_xco2: mean_xco2,
      year: month |> String.slice(0..3) |> Integer.parse() |> elem(0)
    }
  end)
VegaLite.new(width: 700)
|> VegaLite.data_from_values(plot_data)
|> VegaLite.mark(:line)
|> VegaLite.encode_field(:x, "year", type: :nominal)
|> VegaLite.encode_field(:y, "mean_xco2", type: :quantitative, scale: %{"domain" => [400, 420]})
回帰直線を求めて将来の値を予測する
points =
  mean_xco2_list
  |> Enum.zip(month_list)
  |> Enum.map(fn {mean_xco2, month} ->
    [month |> String.slice(0..3) |> Integer.parse() |> elem(0), mean_xco2]
  end)
  |> Nx.tensor()
[vx, vy, x, y] =
  points
  |> Evision.fitLine(Evision.Constant.cv_DIST_L2(), 0, 0.01, 0.01)
  |> Evision.Mat.to_nx(EXLA.Backend)
  |> Nx.to_flat_list()
slope = vy / vx
intercept = y - slope * x
{slope, intercept}
xco2_predictions =
  2023..2100
  |> Enum.map(fn year ->
    %{
      mean_xco2: slope * year + intercept,
      year: year
    }
  end)
xco2_predictions
|> Enum.at(-1)
predictions_data = plot_data ++ xco2_predictions
VegaLite.new(width: 700)
|> VegaLite.data_from_values(predictions_data)
|> VegaLite.mark(:line)
|> VegaLite.encode_field(:x, "year", type: :nominal)
|> VegaLite.encode_field(:y, "mean_xco2", type: :quantitative)