Regresión Segmentada
Mix.install([
{:scholar, "~> 0.2.1"},
{:nx, "~> 0.6.2"},
{:exla, "~> 0.6.1"},
{:explorer, "~> 0.7.1"},
{:kino_vega_lite, "~> 0.1.10"},
{:tucan, "~> 0.2.1"}
])
Nx.global_default_backend(EXLA.Backend)
Nx.Defn.global_default_options(compiler: EXLA, client: :host)
Introducción
En análisis de regresión, al obtener un conjunto de datos que pueden ser visualizados en el plano bidimensional, uno trata de pasar una línea que mejor se ajusta a los datos. Este problema se puede resolver a través de mínimos cuadrados lineal (OLS).
Formalmente, sea un conjunto de $n$ datos $P={(x1,y_1),(x_2,y_2),…,(x_n,y_n)}$ tal que $x_1 < x_2 < … < x_n$. Dada una recta $L$ definida por $y=wx+b$, decimos que el _error de $L$ respecto de los datos $P$ es la suma de las distancias cuadráticas. El objetivo del problema es minimizar esta medida de error.
$$ Error(L,P)=\sum_{i=1}^{n}(y_i-(wx_i+b))^2 $$
En la siguiente gráfica vemos un a regresión lineal resuelta a través de OLS. Sin embargo, notar que la línea no se ajusta tan bien a los datos (puntos rojos y azules). Existen diferentes metedologías de regresión por la cual se puede encontrar un mejor estimador.
En este notebook vamos a realizar una implementación del método de mínimos cuadrados segmentado para realizar una regresión por pedazos (piecewise regression). Lo interesante de este método es que su implementación utiliza una técnica de diseño de algoritmos llamada programación dinámica.
Formulación del problema
Sea el conjunto de datos $P = {(x_1, y_1), (x_2, y_2), …, (x_n, y_n)}$, con $x_1 < x_2 < … < x_n$. Usaremos $p_i$ para denotar el punto $(x_i, y_i)$.
Primero debemos particionar $P$ en algún número de segmentos. Cada segmento es un subconjunto de $P$ que representa un conjunto contiguo de coordenadas $x$; es decir, es un subconjunto de la forma ${pi, p{i+1}, …, p_{j-1}, p_j}$ para ciertos índices $i ≤ j$. Luego, para cada segmento $S$ en nuestra partición de $P$, calculamos la función de ajuste lineal que minimiza el error con respecto a los puntos en $S$ mediante OLS.
La penalización de una partición $X$ esta definida como
$$ f(X) = \lambda|X| + \sum{S{i,j} \subset X}e_{i,j} $$
donde $e{i,j}$ es el error mínimo de ajustar una recta en los puntos $S{i,j} = {pi, p{i+1} …, p_j}$, y $\lambda$ es un hiperparámetro de penalización tal que $\lambda > 0$.
El objetivo en este problema es hallar una partición de penalización mínima.
$$ X{opt} = \argmin{X}f(X) $$
Librerías numéricas en Elixir
Para implementar el algoritmo de regresión segmentada vamos a utilizar algunas librerías numéricas de Elixir.
La principal librería numérica es Nx, la cual cumple un rol similar al de Numpy en el lenguaje Python. Esta librería nos va a permitir operar con vectores y matrices, y provee una gran variedad de funciones numéricas de gran utilidad.
Otra librería que vamos a utilizar es Scholar, una librería de orientada al aprendizaje automático tradicional implementada sobre Nx. Para este notebook vamos a utilizar los módulos de regresión lineal y de distancias.
alias Scholar.Linear.LinearRegression
alias Scholar.Metrics.Distance
Finalmente, vamos a necesitar una librería para visualizar los resultados de la regresión segmentada. Para ello vamos a utilizar Tucan, la cual está implementada sobre VegaLite y simplifica considerablemente la API para construir gráficas.
Diseño del algoritmo
Vamos a utilizar una técnica de diseño de algoritmos llamada programación dinámica, vista en el libro Algorithm Design de Jon Kleinberg y Eva Tardos. Consiste en, a partir del problema inicial, obtener un conjunto de subproblemas, los cuales vamos a solucionar para construir una solución al problema original. Debemos ser capaces de construir soluciones a los subproblemas mediante una relación de recurrencia.
Para resolver la regresión segmentada, la siguiente observación es muy útil: el último punto $pn$ pertenece a un solo segmento en la partición óptima $X{opt}$, y dicho segmento comienza en un punto anterior $pi$. Esta observación nos lleva al conjunto de subproblemas que buscamos: si sabemos solucionar el problema para el último segmento ${p_i,…,p_n}$, entonces podemos quitarlos de consideración para el subproblema de los puntos restantes ${p_1,…,p{i-1}}$.
Entonces si $OPT(i-1)$ representa la solución óptima para el subconjunto de puntos ${p1,…,p{i-1}}$ y denotamos $e_{i,j}$ como el error de regresión lineal en el último segmento ${p_i,…,p_n}$, la solución óptima del problema es
$$ OPT(n) = e_{i,n} + \lambda + OPT(i-1) $$
De manera general, podemos aplicar la misma lógica para el subproblema determinado para el segmento ${p1,…,p{j}}$
$$ OPT(j) = \min{1 \leq i \leq j} {e{i,j} + \lambda + OPT(i-1)} $$
incluyendo dicho segmento en la solución óptima si y solo si el mínimo es obtenido en el indice $i$.
Implementación
defmodule SegmentedRegression do
def compute_opt(n, errors, lambda \\ 1) do
opt = initial_opt(n)
for j <- 0..(n - 1) do
opt_j =
for i <- 1..j do
opt[i - 1]
|> Nx.add(errors[i][j])
|> Nx.add(lambda)
|> Nx.to_number()
end
|> Enum.min()
opt = update_opt(opt, j, opt_j)
Nx.to_number(opt[j])
end
|> Nx.tensor()
end
def find_segments(j, x, y, errors, opt, lambda \\ 1)
def find_segments(j, _, _, _, _, _) when j <= 0, do: []
def find_segments(j, x, y, errors, opt, lambda) do
{_, min_index} =
for i <- 1..j do
opt[i - 1]
|> Nx.add(errors[i][j])
|> Nx.add(lambda)
|> Nx.to_number()
end
|> Enum.zip(0..j)
|> Enum.min_by(&elem(&1, 0))
[
{x[min_index..j], y[min_index..j]}
| find_segments(min_index - 1, x, y, errors, opt, lambda)
]
end
def calculate_errors(x, y, n) do
for i <- 0..(n - 1) do
for j <- 0..(n - 1) do
if i <= j do
x[i..j]
|> LinearRegression.fit(y[i..j])
|> LinearRegression.predict(x[i..j])
|> Distance.squared_euclidean(y[i..j])
|> Nx.to_number()
else
:nan
end
end
end
|> Nx.tensor()
end
defp initial_opt(n) do
Nx.tensor(:nan)
|> Nx.tile([n])
|> Nx.indexed_put(Nx.tensor([[0]]), Nx.tensor([0]))
end
defp update_opt(opt, index, update) do
Nx.indexed_put(opt, Nx.tensor([[index]]), Nx.tensor([update]))
end
end
Ejemplos con diferentes datasets
# Datos de cultivos de papa (ton/ha) en función de precipitaciones (mm)
# Se combinaron y ordenaron datos de entrenamiento y validacion
samples =
Enum.zip(
[206, 188, 219, 372, 345, 231, 203, 170, 55, 91, 292, 141, 129, 170, 324] ++
[213, 80, 391, 250, 57, 303, 263, 157, 72, 157, 188, 216, 362, 283, 308],
[29, 25, 31, 25, 29, 30, 26, 23, 12, 15, 28, 24, 23, 22, 30] ++
[30, 16, 25, 26, 9, 28, 28, 25, 13, 23, 26, 25, 28, 33, 30]
)
|> Enum.sort_by(fn {x, _y} -> x end, &<=/2)
n_samples = length(samples)
x =
samples
|> Enum.map(fn {x, _y} -> x end)
|> Nx.tensor()
|> Nx.stack(axis: 1)
y =
samples
|> Enum.map(fn {_x, y} -> y end)
|> Nx.tensor()
|> Nx.stack(axis: 1)
errors = SegmentedRegression.calculate_errors(x, y, n_samples)
opt = SegmentedRegression.compute_opt(n_samples, errors)
segments = SegmentedRegression.find_segments(n_samples - 1, x, y, errors, opt)
alias Explorer.DataFrame, as: DF
scatter =
[x, y]
|> Nx.concatenate(axis: 1)
|> DF.new()
|> Tucan.scatter("x1", "x2",
width: 500,
height: 500,
filled: true,
point_size: 50,
point_color: "red"
)
Tucan.layers(
for {x, y} <- segments do
y_best_fit =
LinearRegression.fit(x, y)
|> LinearRegression.predict(x)
|> Nx.as_type(:f64)
Nx.concatenate([x, y_best_fit], axis: 1)
|> DF.new()
|> Tucan.lineplot("x1", "x2",
width: 500,
height: 350
)
end ++ [scatter]
)
ice_cream_df =
Kino.FS.file_path("ice_cream_selling_data.csv")
|> DF.from_csv!()
ice_cream_df
|> Tucan.scatter("Temperature (°C)", "Ice Cream Sales (units)",
width: 500,
height: 350,
filled: true,
point_size: 50,
point_color: "red"
)
{x, y} =
ice_cream_df
|> Nx.stack(axis: 1)
|> Nx.sort(direction: :asc, axis: 1)
|> Nx.split(0.5, axis: 1)
n_samples = elem(Nx.shape(x), 0)
errors = SegmentedRegression.calculate_errors(x, y, n_samples)
opt = SegmentedRegression.compute_opt(n_samples, errors)
segments = SegmentedRegression.find_segments(n_samples - 1, x, y, errors, opt)
scatter =
[x, y]
|> Nx.concatenate(axis: 1)
|> DF.new()
|> Tucan.scatter("x1", "x2",
width: 500,
height: 500,
filled: true,
point_size: 50,
point_color: "red"
)
Tucan.layers(
for {x, y} <- segments do
y_best_fit =
LinearRegression.fit(x, y)
|> LinearRegression.predict(x)
|> Nx.as_type(:f64)
Nx.concatenate([x, y_best_fit], axis: 1)
|> DF.new()
|> Tucan.lineplot("x1", "x2",
width: 500,
height: 350
)
end ++ [scatter]
)