市区町村のNDVI
Mix.install(
[
{:nx, "~> 0.9"},
{:evision, "~> 0.2"},
{:exla, "~> 0.9"},
{:geo, "~> 3.5"},
{:jason, "~> 1.4"},
{:kino, "~> 0.14"},
{:kino_maplibre, "~> 0.1"},
{:kino_vega_lite, "~> 0.1"}
],
config: [nx: [default_backend: EXLA.Backend]]
)
データの指定
提供:だいち(ALOS) AVNIR-2 データ(JAXA)
だいち(ALOS) AVNIR-2 の仕様はこちら
https://www.eorc.jaxa.jp/ALOS/jp/alos/sensor/avnir2_j.htm
scene_id_list = [
"202ce08d-ba4b-4ffe-8165-109fd3a8b917",
"34d8dc6f-fdd1-4542-a038-c1235a5a97fa",
"12ad308b-6ce1-40ec-9ebf-f0215c30500e",
"e2e85b2e-a208-4a65-87fd-b92721b037a8",
"208a3618-7700-421b-bf05-fd59551cc1aa",
"d5ce7320-5b25-4ced-bda5-0e25a9d75940",
"9d14706f-cee7-4eb4-9151-2558609c3de0",
"3f4555ac-eaf3-4066-a1ba-20bb1ec1c0b3"
]
ヘッダーファイルの読込
ヘッダーファイルの仕様はこちら
defmodule Coordinate do
defstruct latitude: 0.0, longitude: 0.0
end
defmodule BandInfo do
defstruct gain: 0.0, offset: 0.0
end
defmodule Affine do
defstruct a: 0.0, b: 0.0, c: 0.0, d: 0.0
end
defmodule Conversion do
defstruct x: 0.0, y: 0.0
end
defmodule HeaderInfo do
defstruct blue_band: %BandInfo{},
green_band: %BandInfo{},
red_band: %BandInfo{},
nir_band: %BandInfo{},
center: %Coordinate{},
left_top: %Coordinate{},
right_top: %Coordinate{},
left_bottom: %Coordinate{},
right_bottom: %Coordinate{},
affine: %Affine{},
conversion: %Conversion{},
degree: 0.0,
map_degree: 0.0,
datetime: nil,
product_id: ""
def get_string(info, start, value) do
info
|> String.slice(start, value)
|> String.trim()
end
def get_value(info, start, value) do
info
|> get_string(start, value)
|> String.to_float()
end
def read(hdr_file_path) do
info = File.read!(hdr_file_path)
%HeaderInfo{
# 青色光バンド
blue_band: %BandInfo{
gain: get_value(info, 1721, 8),
offset: get_value(info, 1729, 8)
},
# 緑色光バンド
green_band: %BandInfo{
gain: get_value(info, 1737, 8),
offset: get_value(info, 1745, 8)
},
# 赤色光バンド
red_band: %BandInfo{
gain: get_value(info, 1752, 8),
offset: get_value(info, 1760, 8)
},
# 近赤外線バンド
nir_band: %BandInfo{
gain: get_value(info, 1768, 8),
offset: get_value(info, 1776, 8)
},
# 画像中央
center: %Coordinate{
latitude: get_value(info, 248, 16),
longitude: get_value(info, 264, 16)
},
# 画像左上
left_top: %Coordinate{
latitude: get_value(info, 376, 16),
longitude: get_value(info, 392, 16)
},
# 画像右上
right_top: %Coordinate{
latitude: get_value(info, 408, 16),
longitude: get_value(info, 424, 16)
},
# 画像左下
left_bottom: %Coordinate{
latitude: get_value(info, 440, 16),
longitude: get_value(info, 456, 16)
},
# 画像右下
right_bottom: %Coordinate{
latitude: get_value(info, 472, 16),
longitude: get_value(info, 488, 16)
},
affine: %Affine{
a: get_value(info, 1224, 16),
b: get_value(info, 1240, 16),
c: get_value(info, 1256, 16),
d: get_value(info, 1272, 16)
},
conversion: %Conversion{
x: get_value(info, 1208, 8),
y: get_value(info, 1216, 8)
},
degree: get_value(info, 760, 16),
map_degree: get_value(info, 921, 16),
datetime:
info
|> get_string(192, 24)
|> then(fn str ->
String.slice(str, 0, 4) <>
"-" <>
String.slice(str, 4, 2) <>
"-" <>
String.slice(str, 6, 2) <>
"T" <>
String.slice(str, 8, 2) <>
":" <>
String.slice(str, 10, 2) <>
":" <> String.slice(str, 12, 2)
end)
|> NaiveDateTime.from_iso8601!(),
product_id: get_string(info, 128, 16)
}
end
end
NDVIの算出
defmodule NDVI do
def read_header(file_path_list) do
file_path_list
|> Enum.find(fn file -> Path.extname(file) == ".txt" end)
|> HeaderInfo.read()
end
def get_band_tensor(file_path_list, prefix) do
file_path_list
|> Enum.find(fn file ->
file
|> Path.basename()
|> String.starts_with?(prefix)
end)
|> Evision.imread(flags: Evision.Constant.cv_IMREAD_GRAYSCALE())
|> Evision.Mat.to_nx(EXLA.Backend)
end
def calc(file_path_list) do
header_info = read_header(file_path_list)
red_tensor =
file_path_list
|> get_band_tensor("IMG-03")
|> Nx.multiply(header_info.red_band.gain)
|> Nx.add(header_info.red_band.offset)
nir_tensor =
file_path_list
|> get_band_tensor("IMG-04")
|> Nx.multiply(header_info.nir_band.gain)
|> Nx.add(header_info.nir_band.offset)
ndvi_tensor =
Nx.select(
Nx.multiply(
Nx.not_equal(red_tensor, 0),
Nx.not_equal(nir_tensor, 0)
),
Nx.divide(
Nx.subtract(nir_tensor, red_tensor),
Nx.add(nir_tensor, red_tensor)
),
0
)
{header_info, ndvi_tensor}
end
end
ndvi_list =
scene_id_list
|> Enum.map(fn scene_id ->
"/tmp/#{scene_id}"
|> File.ls!()
|> Enum.map(fn filename -> Path.join(["/tmp", scene_id, filename]) end)
|> NDVI.calc()
end)
{header_info, ndvi_tensor} = Enum.at(ndvi_list, 0)
ndvi_map =
ndvi_tensor
|> Nx.multiply(128)
|> Nx.add(128)
|> Nx.as_type(:u8)
|> Evision.Mat.from_nx_2d()
|> then(&[src: &1, colormap: Evision.Constant.cv_COLORMAP_WINTER()])
|> Evision.applyColorMap()
Evision.resize(ndvi_map, {640, 640})
center = {header_info.center.longitude, header_info.center.latitude}
coordinates = [
[header_info.left_top.longitude, header_info.left_top.latitude],
[header_info.right_top.longitude, header_info.right_top.latitude],
[header_info.right_bottom.longitude, header_info.right_bottom.latitude],
[header_info.left_bottom.longitude, header_info.left_bottom.latitude]
]
# 地図にプロットするために BASE64 エンコードする
get_data_url = fn mat ->
Evision.imencode(".png", mat)
|> Base.encode64()
|> then(&"data:image/png;base64,#{&1}")
end
img_url =
ndvi_map
|> Evision.resize({4960, 4960})
|> get_data_url.()
MapLibre.new(center: center, zoom: 8, style: :terrain)
|> MapLibre.add_source("image", type: :image, url: img_url, coordinates: coordinates)
|> MapLibre.add_layer(id: "overlay", source: "image", type: :raster)
市区町村のポリゴンデータを用意する
青森県の行政区域データを /tmp/GML にダウンロードしておく
出典: 「国土数値情報(行政区域データ)」(国土交通省)(https://nlftp.mlit.go.jp/ksj/gml/datalist/KsjTmplt-N03-2024.html)を加工して作成
gml_dir = "/tmp/GML"
geojson_file =
gml_dir
# ファイル一覧取得
|> File.ls!()
# `.geojson` で終わるもののうち先頭を取得
|> Enum.find(&String.ends_with?(&1, ".geojson"))
geojson_data =
[gml_dir, geojson_file]
|> Path.join()
|> File.read!()
|> Jason.decode!()
|> Geo.JSON.decode!()
行政コード = 02361 (藤崎町)のデータを取得する
city_geojson = %Geo.GeometryCollection{
geometries: Enum.filter(geojson_data.geometries, &(&1.properties["N03_007"] == "02361"))
}
city_map =
MapLibre.new(center: {140.5, 40.68}, zoom: 11, style: :terrain)
|> MapLibre.add_geo_source("city_geojson", city_geojson)
|> MapLibre.add_layer(
id: "city_geojson",
source: "city_geojson",
type: :fill,
paint: [fill_color: "#00ff00", fill_opacity: 0.5]
)
NDVI画像の回転
{ndvi_height, ndvi_width, 3} = Evision.Mat.shape(ndvi_map)
header_info.degree
wrap_rotate = fn mat, degree ->
# 元の大きさ
{src_height, src_width, 3} = mat.shape
# 四隅がはみ出ることを考慮しない回転のアフィン変換
affine =
{src_width / 2, src_height / 2}
|> Evision.getRotationMatrix2D(degree, 1)
|> Evision.Mat.to_nx(EXLA.Backend)
# 回転後に四隅を画像内に含むためのサイズを計算
cos = Nx.abs(affine[[0, 0]])
sin = Nx.abs(affine[[0, 1]])
dst_width =
Nx.add(Nx.multiply(src_height, sin), Nx.multiply(src_width, cos))
|> Nx.to_number()
|> trunc()
dst_height =
Nx.add(Nx.multiply(src_height, cos), Nx.multiply(src_width, sin))
|> Nx.to_number()
|> trunc()
# (変換後サイズ - 変更前サイズ) / 2 の分だけずらして画像の中心を回転の中心に合わせる
bias =
Nx.tensor([
[0, 0, (dst_width - src_width) / 2],
[0, 0, (dst_height - src_height) / 2]
])
# 四隅を画像内に含めるアフィン変換
new_affine = Nx.add(affine, bias)
Evision.warpAffine(mat, new_affine, {dst_width, dst_height})
end
rotated_map = wrap_rotate.(ndvi_map, -header_info.degree)
Evision.resize(rotated_map, {640, 640})
上辺と下辺の差を計算する
diff_longitude =
header_info.left_top.longitude - header_info.right_top.longitude -
(header_info.left_bottom.longitude - header_info.right_bottom.longitude)
1ピクセルあたりの緯度経度を計算する
ndvi_longitudes = [
header_info.left_top.longitude,
header_info.right_top.longitude,
header_info.left_bottom.longitude,
header_info.right_bottom.longitude
]
ndvi_latitudes = [
header_info.left_top.latitude,
header_info.right_top.latitude,
header_info.left_bottom.latitude,
header_info.right_bottom.latitude
]
ndvi_min_longitude = Enum.min(ndvi_longitudes)
ndvi_max_longitude = Enum.max(ndvi_longitudes)
ndvi_min_latitude = Enum.min(ndvi_latitudes)
ndvi_max_latitude = Enum.max(ndvi_latitudes)
{ndvi_min_longitude, ndvi_max_longitude, ndvi_min_latitude, ndvi_max_latitude}
{ndvi_height, ndvi_width, 3} = Evision.Mat.shape(rotated_map)
pix_per_longitude = ndvi_width / (ndvi_max_longitude - ndvi_min_longitude)
pix_per_latitude = ndvi_height / (ndvi_max_latitude - ndvi_min_latitude)
{pix_per_longitude, pix_per_latitude}
横方向への補正値を計算する
diff_x = trunc(diff_longitude / 2 * pix_per_longitude)
補正値の分だけ水平移動する
affine =
[
[1, 0, diff_x],
[0, 1, 0]
]
|> Nx.tensor()
|> Nx.as_type(:f32)
corrected_map = Evision.warpAffine(rotated_map, affine, {ndvi_width, ndvi_height})
Evision.resize(corrected_map, {640, 640})
マスク画像による切り抜き
# 緯度経度の最大最小を求める
city_coordinates =
city_geojson.geometries
|> Enum.map(& &1.coordinates)
|> List.flatten()
city_longitudes = Enum.map(city_coordinates, &elem(&1, 0))
city_latitudes = Enum.map(city_coordinates, &elem(&1, 1))
city_min_longitude = Enum.min(city_longitudes)
city_max_longitude = Enum.max(city_longitudes)
city_min_latitude = Enum.min(city_latitudes)
city_max_latitude = Enum.max(city_latitudes)
{city_min_longitude, city_max_longitude, city_min_latitude, city_max_latitude}
# 最大最小の差からマスク画像の大きさを求める
city_width = trunc((city_max_longitude - city_min_longitude) * pix_per_longitude)
city_height = trunc((city_max_latitude - city_min_latitude) * pix_per_latitude)
{city_width, city_height}
# 緯度経度の最小から最大をピクセル数の0から幅・高さにスケールする
normalized_points =
city_geojson.geometries
|> Enum.map(& &1.coordinates)
|> Enum.map(fn coordinate ->
coordinate
|> Enum.at(0)
|> Enum.map(fn {x, y} ->
[
trunc((x - city_min_longitude) * pix_per_longitude),
# 縦方向は緯度が北緯の場合逆転するため、高さから引く
trunc(city_height - (y - city_min_latitude) * pix_per_latitude)
]
end)
|> Nx.tensor(type: :s32)
end)
# 空画像を用意する
empty_mat =
[0, 0, 0, 255]
|> Nx.tensor(type: :u8)
|> Nx.tile([city_height, city_width, 1])
|> Evision.Mat.from_nx_2d()
Evision.resize(empty_mat, {640, 640})
# ポリゴンを透明色で塗りつぶす
mask_mat = Evision.fillPoly(empty_mat, normalized_points, {0, 0, 0, 0})
Evision.resize(mask_mat, {640, 640})
地図へのプロット
city_center = {
(city_min_longitude + city_max_longitude) / 2,
(city_min_latitude + city_max_latitude) / 2
}
city_bbox_coordinates = [
[city_min_longitude, city_max_latitude],
[city_max_longitude, city_max_latitude],
[city_max_longitude, city_min_latitude],
[city_min_longitude, city_min_latitude]
]
image_url =
mask_mat
|> Evision.resize({trunc(city_height / 4), trunc(city_width / 4)})
|> get_data_url.()
MapLibre.new(center: city_center, zoom: 11, style: :terrain)
|> MapLibre.add_source("city_mask",
type: :image,
url: image_url,
coordinates: city_bbox_coordinates
)
|> MapLibre.add_layer(id: "overlay", source: "city_mask", type: :raster)
NDVI を行政区域で切り抜く
# 緯度経度を元に NDVI 画像内の行政区域の位置を計算する
city_left_top_x = trunc((city_min_longitude - ndvi_min_longitude) * pix_per_longitude)
city_left_top_y = trunc((ndvi_max_latitude - city_max_latitude) * pix_per_latitude)
{city_left_top_x, city_left_top_y}
city_ndvi_bbox_map =
corrected_map
|> Evision.Mat.to_nx(EXLA.Backend)
|> then(
& &1[
[
city_left_top_y..(city_left_top_y + city_height - 1),
city_left_top_x..(city_left_top_x + city_width - 1)
]
]
)
|> Evision.Mat.from_nx_2d()
mask_mat
|> Evision.Mat.to_nx(EXLA.Backend)
|> Nx.slice_along_axis(3, 1, axis: 2)
|> Nx.tile([3])
|> Nx.select(0, Evision.Mat.to_nx(city_ndvi_bbox_map, EXLA.Backend))
|> Nx.as_type(:u8)
|> Evision.Mat.from_nx_2d()
時系列による推移
defmodule CityNDVI do
def get_lat_lon(header_info) do
ndvi_longitudes = [
header_info.left_top.longitude,
header_info.right_top.longitude,
header_info.left_bottom.longitude,
header_info.right_bottom.longitude
]
ndvi_latitudes = [
header_info.left_top.latitude,
header_info.right_top.latitude,
header_info.left_bottom.latitude,
header_info.right_bottom.latitude
]
ndvi_min_longitude = Enum.min(ndvi_longitudes)
ndvi_max_longitude = Enum.max(ndvi_longitudes)
ndvi_min_latitude = Enum.min(ndvi_latitudes)
ndvi_max_latitude = Enum.max(ndvi_latitudes)
{ndvi_min_longitude, ndvi_max_longitude, ndvi_min_latitude, ndvi_max_latitude}
end
def wrap_rotate(tensor, degree) do
{src_height, src_width} = Nx.shape(tensor)
affine =
{src_width / 2, src_height / 2}
|> Evision.getRotationMatrix2D(degree, 1)
|> Evision.Mat.to_nx(EXLA.Backend)
cos = Nx.abs(affine[[0, 0]])
sin = Nx.abs(affine[[0, 1]])
dst_width =
Nx.add(Nx.multiply(src_height, sin), Nx.multiply(src_width, cos))
|> Nx.to_number()
|> trunc()
dst_height =
Nx.add(Nx.multiply(src_height, cos), Nx.multiply(src_width, sin))
|> Nx.to_number()
|> trunc()
bias =
Nx.tensor([
[0, 0, (dst_width - src_width) / 2],
[0, 0, (dst_height - src_height) / 2]
])
new_affine = Nx.add(affine, bias)
Evision.warpAffine(tensor, new_affine, {dst_width, dst_height})
end
def correct(ndvi_tensor, header_info, ndvi_lat_lon) do
{ndvi_min_longitude, ndvi_max_longitude, _, _} = ndvi_lat_lon
rotated_mat = wrap_rotate(ndvi_tensor, -header_info.degree)
diff_longitude =
header_info.left_top.longitude - header_info.right_top.longitude -
(header_info.left_bottom.longitude - header_info.right_bottom.longitude)
{ndvi_height, ndvi_width} = rotated_mat.shape
pix_per_longitude = ndvi_width / (ndvi_max_longitude - ndvi_min_longitude)
diff_x = trunc(diff_longitude / 2 * pix_per_longitude)
affine =
[
[1, 0, diff_x],
[0, 1, 0]
]
|> Nx.tensor()
|> Nx.as_type(:f32)
Evision.warpAffine(rotated_mat, affine, {ndvi_width, ndvi_height})
end
def get_city_bbox(corrected_mat, mask_mat, city_geojson, ndvi_lat_lon) do
{ndvi_min_longitude, ndvi_max_longitude, ndvi_min_latitude, ndvi_max_latitude} = ndvi_lat_lon
city_coordinates =
city_geojson.geometries
|> Enum.map(& &1.coordinates)
|> List.flatten()
city_longitudes = Enum.map(city_coordinates, &elem(&1, 0))
city_latitudes = Enum.map(city_coordinates, &elem(&1, 1))
city_min_longitude = Enum.min(city_longitudes)
city_max_latitude = Enum.max(city_latitudes)
{ndvi_height, ndvi_width} = corrected_mat.shape
pix_per_longitude = ndvi_width / (ndvi_max_longitude - ndvi_min_longitude)
pix_per_latitude = ndvi_height / (ndvi_max_latitude - ndvi_min_latitude)
{city_height, city_width, 4} = mask_mat.shape
city_left_top_x = trunc((city_min_longitude - ndvi_min_longitude) * pix_per_longitude)
city_left_top_y = trunc((ndvi_max_latitude - city_max_latitude) * pix_per_latitude)
corrected_mat
|> Evision.Mat.to_nx(EXLA.Backend)
|> then(
& &1[
[
city_left_top_y..(city_left_top_y + city_height - 1),
city_left_top_x..(city_left_top_x + city_width - 1)
]
]
)
end
def crop(city_bbox, mask_mat) do
mask_mat
|> Evision.Mat.to_nx(EXLA.Backend)
|> Nx.slice_along_axis(3, 1, axis: 2)
|> Nx.select(0, city_bbox)
end
def get_city_ndvi(ndvi_tensor, header_info, mask_mat, city_geojson) do
ndvi_lat_lon = get_lat_lon(header_info)
corrected_mat = correct(ndvi_tensor, header_info, ndvi_lat_lon)
city_bbox = get_city_bbox(corrected_mat, mask_mat, city_geojson, ndvi_lat_lon)
city_mean_ndvi =
mask_mat
|> Evision.Mat.to_nx(EXLA.Backend)
|> Nx.slice_along_axis(3, 1, axis: 2)
|> Nx.squeeze()
# マスク内だけで平均する
|> Nx.select(0, 1)
|> then(&Nx.weighted_mean(city_bbox, &1))
|> Nx.to_number()
city_ndvi_map =
city_bbox
|> Nx.multiply(128)
|> Nx.add(128)
|> Nx.as_type(:u8)
|> then(&[src: &1, colormap: Evision.Constant.cv_COLORMAP_WINTER()])
|> Evision.applyColorMap()
|> Evision.Mat.to_nx(EXLA.Backend)
croped_city_ndvi_map =
mask_mat
|> Evision.Mat.to_nx(EXLA.Backend)
|> Nx.slice_along_axis(3, 1, axis: 2)
|> Nx.tile([3])
|> Nx.select(0, city_ndvi_map)
|> Nx.as_type(:u8)
{city_mean_ndvi, croped_city_ndvi_map}
end
end
city_ndvi_list =
ndvi_list
|> Enum.map(fn {header_info, ndvi_tensor} ->
# 観測日をタブにする
date =
header_info.datetime
|> NaiveDateTime.to_iso8601()
|> String.slice(0..9)
IO.inspect(date)
{city_ndvi, city_ndvi_map} =
CityNDVI.get_city_ndvi(ndvi_tensor, header_info, mask_mat, city_geojson)
{date, city_ndvi, city_ndvi_map}
end)
city_ndvi_list
|> Enum.map(fn {date, _, city_ndvi_map} ->
{date, city_ndvi_map |> Nx.reverse(axes: [2]) |> Kino.Image.new()}
end)
|> Kino.Layout.tabs()
plot_data =
city_ndvi_list
|> Enum.map(fn {date, city_mean_ndvi, _} ->
%{
date: date,
ndvi: city_mean_ndvi
}
end)
VegaLite.new(width: 700)
|> VegaLite.data_from_values(plot_data)
|> VegaLite.mark(:bar)
|> VegaLite.encode_field(:x, "date", type: :temporal)
|> VegaLite.encode_field(:y, "ndvi", type: :quantitative)