Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

Regresión Segmentada

segmented_least_squares.livemd

Regresión Segmentada

Mix.install([
  {:scholar, "~> 0.2.1"},
  {:nx, "~> 0.6.2"},
  {:exla, "~> 0.6.1"},
  {:explorer, "~> 0.7.1"},
  {:kino_vega_lite, "~> 0.1.10"},
  {:tucan, "~> 0.2.1"}
])

Nx.global_default_backend(EXLA.Backend)
Nx.Defn.global_default_options(compiler: EXLA, client: :host)

Introducción

En análisis de regresión, al obtener un conjunto de datos que pueden ser visualizados en el plano bidimensional, uno trata de pasar una línea que mejor se ajusta a los datos. Este problema se puede resolver a través de mínimos cuadrados lineal (OLS).

Formalmente, sea un conjunto de $n$ datos $P={(x1,y_1),(x_2,y_2),…,(x_n,y_n)}$ tal que $x_1 < x_2 < … < x_n$. Dada una recta $L$ definida por $y=wx+b$, decimos que el _error de $L$ respecto de los datos $P$ es la suma de las distancias cuadráticas. El objetivo del problema es minimizar esta medida de error.

$$ Error(L,P)=\sum_{i=1}^{n}(y_i-(wx_i+b))^2 $$

En la siguiente gráfica vemos un a regresión lineal resuelta a través de OLS. Sin embargo, notar que la línea no se ajusta tan bien a los datos (puntos rojos y azules). Existen diferentes metedologías de regresión por la cual se puede encontrar un mejor estimador.

En este notebook vamos a realizar una implementación del método de mínimos cuadrados segmentado para realizar una regresión por pedazos (piecewise regression). Lo interesante de este método es que su implementación utiliza una técnica de diseño de algoritmos llamada programación dinámica.

Formulación del problema

Sea el conjunto de datos $P = {(x_1, y_1), (x_2, y_2), …, (x_n, y_n)}$, con $x_1 < x_2 < … < x_n$. Usaremos $p_i$ para denotar el punto $(x_i, y_i)$.

Primero debemos particionar $P$ en algún número de segmentos. Cada segmento es un subconjunto de $P$ que representa un conjunto contiguo de coordenadas $x$; es decir, es un subconjunto de la forma ${pi, p{i+1}, …, p_{j-1}, p_j}$ para ciertos índices $i ≤ j$. Luego, para cada segmento $S$ en nuestra partición de $P$, calculamos la función de ajuste lineal que minimiza el error con respecto a los puntos en $S$ mediante OLS.

La penalización de una partición $X$ esta definida como

$$ f(X) = \lambda|X| + \sum{S{i,j} \subset X}e_{i,j} $$

donde $e{i,j}$ es el error mínimo de ajustar una recta en los puntos $S{i,j} = {pi, p{i+1} …, p_j}$, y $\lambda$ es un hiperparámetro de penalización tal que $\lambda > 0$.

El objetivo en este problema es hallar una partición de penalización mínima.

$$ X{opt} = \argmin{X}f(X) $$

Librerías numéricas en Elixir

Para implementar el algoritmo de regresión segmentada vamos a utilizar algunas librerías numéricas de Elixir.

La principal librería numérica es Nx, la cual cumple un rol similar al de Numpy en el lenguaje Python. Esta librería nos va a permitir operar con vectores y matrices, y provee una gran variedad de funciones numéricas de gran utilidad.

Otra librería que vamos a utilizar es Scholar, una librería de orientada al aprendizaje automático tradicional implementada sobre Nx. Para este notebook vamos a utilizar los módulos de regresión lineal y de distancias.

alias Scholar.Linear.LinearRegression
alias Scholar.Metrics.Distance

Finalmente, vamos a necesitar una librería para visualizar los resultados de la regresión segmentada. Para ello vamos a utilizar Tucan, la cual está implementada sobre VegaLite y simplifica considerablemente la API para construir gráficas.

Diseño del algoritmo

Vamos a utilizar una técnica de diseño de algoritmos llamada programación dinámica, vista en el libro Algorithm Design de Jon Kleinberg y Eva Tardos. Consiste en, a partir del problema inicial, obtener un conjunto de subproblemas, los cuales vamos a solucionar para construir una solución al problema original. Debemos ser capaces de construir soluciones a los subproblemas mediante una relación de recurrencia.

Para resolver la regresión segmentada, la siguiente observación es muy útil: el último punto $pn$ pertenece a un solo segmento en la partición óptima $X{opt}$, y dicho segmento comienza en un punto anterior $pi$. Esta observación nos lleva al conjunto de subproblemas que buscamos: si sabemos solucionar el problema para el último segmento ${p_i,…,p_n}$, entonces podemos quitarlos de consideración para el subproblema de los puntos restantes ${p_1,…,p{i-1}}$.

Entonces si $OPT(i-1)$ representa la solución óptima para el subconjunto de puntos ${p1,…,p{i-1}}$ y denotamos $e_{i,j}$ como el error de regresión lineal en el último segmento ${p_i,…,p_n}$, la solución óptima del problema es

$$ OPT(n) = e_{i,n} + \lambda + OPT(i-1) $$

De manera general, podemos aplicar la misma lógica para el subproblema determinado para el segmento ${p1,…,p{j}}$

$$ OPT(j) = \min{1 \leq i \leq j} {e{i,j} + \lambda + OPT(i-1)} $$

incluyendo dicho segmento en la solución óptima si y solo si el mínimo es obtenido en el indice $i$.

Implementación

defmodule SegmentedRegression do
  def compute_opt(n, errors, lambda \\ 1) do
    opt = initial_opt(n)

    for j <- 0..(n - 1) do
      opt_j =
        for i <- 1..j do
          opt[i - 1]
          |> Nx.add(errors[i][j])
          |> Nx.add(lambda)
          |> Nx.to_number()
        end
        |> Enum.min()

      opt = update_opt(opt, j, opt_j)
      Nx.to_number(opt[j])
    end
    |> Nx.tensor()
  end

  def find_segments(j, x, y, errors, opt, lambda \\ 1)
  def find_segments(j, _, _, _, _, _) when j <= 0, do: []

  def find_segments(j, x, y, errors, opt, lambda) do
    {_, min_index} =
      for i <- 1..j do
        opt[i - 1]
        |> Nx.add(errors[i][j])
        |> Nx.add(lambda)
        |> Nx.to_number()
      end
      |> Enum.zip(0..j)
      |> Enum.min_by(&amp;elem(&amp;1, 0))

    [
      {x[min_index..j], y[min_index..j]}
      | find_segments(min_index - 1, x, y, errors, opt, lambda)
    ]
  end

  def calculate_errors(x, y, n) do
    for i <- 0..(n - 1) do
      for j <- 0..(n - 1) do
        if i <= j do
          x[i..j]
          |> LinearRegression.fit(y[i..j])
          |> LinearRegression.predict(x[i..j])
          |> Distance.squared_euclidean(y[i..j])
          |> Nx.to_number()
        else
          :nan
        end
      end
    end
    |> Nx.tensor()
  end

  defp initial_opt(n) do
    Nx.tensor(:nan)
    |> Nx.tile([n])
    |> Nx.indexed_put(Nx.tensor([[0]]), Nx.tensor([0]))
  end

  defp update_opt(opt, index, update) do
    Nx.indexed_put(opt, Nx.tensor([[index]]), Nx.tensor([update]))
  end
end

Ejemplos con diferentes datasets

# Datos de cultivos de papa (ton/ha) en función de precipitaciones (mm)
# Se combinaron y ordenaron datos de entrenamiento y validacion 
samples =
  Enum.zip(
    [206, 188, 219, 372, 345, 231, 203, 170, 55, 91, 292, 141, 129, 170, 324] ++
      [213, 80, 391, 250, 57, 303, 263, 157, 72, 157, 188, 216, 362, 283, 308],
    [29, 25, 31, 25, 29, 30, 26, 23, 12, 15, 28, 24, 23, 22, 30] ++
      [30, 16, 25, 26, 9, 28, 28, 25, 13, 23, 26, 25, 28, 33, 30]
  )
  |> Enum.sort_by(fn {x, _y} -> x end, &amp;<=/2)

n_samples = length(samples)

x =
  samples
  |> Enum.map(fn {x, _y} -> x end)
  |> Nx.tensor()
  |> Nx.stack(axis: 1)

y =
  samples
  |> Enum.map(fn {_x, y} -> y end)
  |> Nx.tensor()
  |> Nx.stack(axis: 1)

errors = SegmentedRegression.calculate_errors(x, y, n_samples)
opt = SegmentedRegression.compute_opt(n_samples, errors)
segments = SegmentedRegression.find_segments(n_samples - 1, x, y, errors, opt)
alias Explorer.DataFrame, as: DF

scatter =
  [x, y]
  |> Nx.concatenate(axis: 1)
  |> DF.new()
  |> Tucan.scatter("x1", "x2",
    width: 500,
    height: 500,
    filled: true,
    point_size: 50,
    point_color: "red"
  )

Tucan.layers(
  for {x, y} <- segments do
    y_best_fit =
      LinearRegression.fit(x, y)
      |> LinearRegression.predict(x)
      |> Nx.as_type(:f64)

    Nx.concatenate([x, y_best_fit], axis: 1)
    |> DF.new()
    |> Tucan.lineplot("x1", "x2",
      width: 500,
      height: 350
    )
  end ++ [scatter]
)
ice_cream_df =
  Kino.FS.file_path("ice_cream_selling_data.csv")
  |> DF.from_csv!()

ice_cream_df
|> Tucan.scatter("Temperature (°C)", "Ice Cream Sales (units)",
  width: 500,
  height: 350,
  filled: true,
  point_size: 50,
  point_color: "red"
)
{x, y} =
  ice_cream_df
  |> Nx.stack(axis: 1)
  |> Nx.sort(direction: :asc, axis: 1)
  |> Nx.split(0.5, axis: 1)

n_samples = elem(Nx.shape(x), 0)

errors = SegmentedRegression.calculate_errors(x, y, n_samples)
opt = SegmentedRegression.compute_opt(n_samples, errors)
segments = SegmentedRegression.find_segments(n_samples - 1, x, y, errors, opt)
scatter =
  [x, y]
  |> Nx.concatenate(axis: 1)
  |> DF.new()
  |> Tucan.scatter("x1", "x2",
    width: 500,
    height: 500,
    filled: true,
    point_size: 50,
    point_color: "red"
  )

Tucan.layers(
  for {x, y} <- segments do
    y_best_fit =
      LinearRegression.fit(x, y)
      |> LinearRegression.predict(x)
      |> Nx.as_type(:f64)

    Nx.concatenate([x, y_best_fit], axis: 1)
    |> DF.new()
    |> Tucan.lineplot("x1", "x2",
      width: 500,
      height: 350
    )
  end ++ [scatter]
)