Nx multivariate regression
Based very heavily on Sean Moriarity’s “Up and Running Nx”, which you can find at:
I have extended his univariate regression model:
y = m * x + b
to a multivariate regression model:
y = m1 * x1 + m2 * x2 + b
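For example, with m1 = 2, m2 = 3, and b = 1, the input (x1, x2) = (4, 5) gives:
y = 2 * 4 + 3 * 5 + 1 = 24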
Note: the code examples in Sean's article use Nx.to_scalar/1, which no longer exists; I switched to Nx.to_number/1. Everything else matches the pre-release API pinned below (Nx.random_normal/3, Nx.power/2); newer Nx releases rename Nx.power/2 to Nx.pow/2 and move random number generation to the Nx.Random module.
Mix.install([
{:exla, "~> 0.1.0-dev", github: "elixir-nx/nx", sparse: "exla"},
{:nx, "~> 0.1.0-dev", github: "elixir-nx/nx", sparse: "nx", override: true}
])
defmodule LinReg do
import Nx.Defn
defn predict({m, b}, x) do
m * x + b
end
defn loss(params, x, y) do
y_pred = predict(params, x)
Nx.mean(Nx.power(y - y_pred, 2))
end
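# One gradient-descent step: differentiate the loss with respect to the
# parameters and move each one against its gradient (learning rate 0.01)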
defn update({m, b} = params, inp, tar) do
{grad_m, grad_b} = grad(params, &loss(&1, inp, tar))
{
m - grad_m * 0.01,
b - grad_b * 0.01
}
end
defn init_random_params do
m = Nx.random_normal({}, 0.0, 0.1)
b = Nx.random_normal({}, 0.0, 0.1)
{m, b}
end
def train(epochs, data) do
init_params = init_random_params()
for _ <- 1..epochs, reduce: init_params do
acc ->
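# Each epoch consumes 200 batches of 32 examples from the (infinite) stream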
data
|> Enum.take(200)
|> Enum.reduce(
acc,
fn batch, cur_params ->
{inp, tar} = Enum.unzip(batch)
x = Nx.tensor(inp)
y = Nx.tensor(tar)
update(cur_params, x, y)
end
)
end
end
end
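# A quick sanity check of predict/2 with hand-picked (hypothetical) parameters:
# with m = 2.0 and b = 1.0, the prediction is 2 * x + 1 elementwise.
params = {Nx.tensor(2.0), Nx.tensor(1.0)}
LinReg.predict(params, Nx.tensor([1.0, 2.0, 3.0]))
# => f32[3] [3.0, 5.0, 7.0]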
# Sean's original example used `:rand.normal(0.0, 10.0)`, but a uniform draw
# (`:rand.uniform_real/0` scaled to 0.0..10.0) is easier to explain & understand
target_m = (:rand.uniform_real() * 10) |> IO.inspect(label: "actual target_m")
target_b = (:rand.uniform_real() * 10) |> IO.inspect(label: "actual target_b")
target_fn = fn x -> target_m * x + target_b end
# Generates a list of 32 uniform random values between 0.0 and 10.0
thirty_two = fn -> for _ <- 1..32, do: :rand.uniform() * 10 end
# Lazily generates lists of 32 two-element tuples where:
#   the first element is a uniform random value between 0.0 and 10.0
#   the second element is the result of calling `target_fn` with the first element as `x`
# In other words, 32 tuples of {`x`, `target_m` * `x` + `target_b`}
data =
Stream.repeatedly(fn -> thirty_two.() end)
|> Stream.map(fn x -> Enum.zip(x, Enum.map(x, target_fn)) end)
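# Peek at the first three batches to confirm the shape of the data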
data
|> Enum.take(3)
IO.puts("Target m: #{target_m}\tTarget b: #{target_b}\n")
{m, b} = LinReg.train(100, data)
IO.puts("Learned m: #{Nx.to_number(m)}\tLearned b: #{Nx.to_number(b)}")
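# Optional check (my addition): run the learned parameters over a fresh batch
# and compare against the target function applied to the same xs; the
# differences should be close to zero after 100 epochs.
xs = thirty_two.()
predicted = LinReg.predict({m, b}, Nx.tensor(xs))
expected = Nx.tensor(Enum.map(xs, target_fn))
Nx.subtract(expected, predicted)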
defmodule Eqn do
@target_m1 (:rand.uniform_real() * 10)
|> IO.inspect(label: "actual @target_m1")
@target_m2 (:rand.uniform_real() * 10.0)
|> IO.inspect(label: "actual @target_m2")
@target_b (:rand.uniform_real() * 5.0)
|> IO.inspect(label: "actual @target_b")
def target_fn({x1, x2}) do
@target_m1 * x1 + @target_m2 * x2 + @target_b
end
def calc_datapoints(exs) do
exs
|> Enum.zip(do_calc_datapoints(exs))
end
defp do_calc_datapoints(exs) do
Enum.map(exs, &do_calc_datapoint/1)
end
defp do_calc_datapoint({x1, x2}) do
target_fn({x1, x2})
end
end
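# Quick usage check (my addition) with two hand-picked points; each output
# pairs the inputs with @target_m1 * x1 + @target_m2 * x2 + @target_b.
Eqn.calc_datapoints([{1.0, 2.0}, {3.0, 4.0}])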
data2 =
Stream.repeatedly(fn -> for _ <- 1..32, do: {:rand.uniform() * 10, :rand.uniform() * 10} end)
|> Stream.map(fn xes -> Eqn.calc_datapoints(xes) end)
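# Peek at the first two batches to confirm the shape of the data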
data2
|> Enum.take(2)
defmodule LinReg2 do
import Nx.Defn
defn predict({m1, m2, b}, {x1, x2}) do
m1 * x1 + m2 * x2 + b
end
defn loss(params, xes, y) do
y_pred = predict(params, xes)
Nx.mean(Nx.power(y - y_pred, 2))
end
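# Same gradient-descent step as LinReg.update/3, now with three parameters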
defn update({m1, m2, b} = params, inp, tar) do
{grad_m1, grad_m2, grad_b} = grad(params, &loss(&1, inp, tar))
{
m1 - grad_m1 * 0.01,
m2 - grad_m2 * 0.01,
b - grad_b * 0.01
}
end
defn init_random_params do
m1 = Nx.random_normal({}, 0.0, 0.1)
m2 = Nx.random_normal({}, 0.0, 0.1)
b = Nx.random_normal({}, 0.0, 0.1)
{m1, m2, b}
end
def train(epochs, data) do
init_params = init_random_params()
for _ <- 1..epochs, reduce: init_params do
acc ->
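# As before, each epoch consumes 200 batches of 32 examples from the stream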
data
|> Enum.take(200)
|> Enum.reduce(
acc,
fn batch, cur_params ->
# IO.inspect(batch, label: "batch")
# IO.inspect(cur_params, label: "cur_params")
{xes, tar} = Enum.unzip(batch)
{inp_x1, inp_x2} = Enum.unzip(xes)
x1 = Nx.tensor(inp_x1)
x2 = Nx.tensor(inp_x2)
y = Nx.tensor(tar)
update(cur_params, {x1, x2}, y)
end
)
end
end
end
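# Sanity check of predict/2 with hand-picked (hypothetical) parameters:
# y = 2 * x1 + 3 * x2 + 1 for each pair of inputs.
params = {Nx.tensor(2.0), Nx.tensor(3.0), Nx.tensor(1.0)}
LinReg2.predict(params, {Nx.tensor([1.0, 2.0]), Nx.tensor([3.0, 4.0])})
# => f32[2] [12.0, 17.0]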
# The actual targets were printed at compile time by the IO.inspect calls inside Eqn above
{m1, m2, b} = LinReg2.train(10, data2)
IO.puts(
"Learned m1: #{Nx.to_number(m1)}\tLearned m2: #{Nx.to_number(m2)}\tLearned b: #{Nx.to_number(b)}"
)
# Closed-form least squares via the normal equations: b_hat = (X'X)⁻¹X'y
# The columns of X below are [intercept, x1, x2], so b = [2, 5, 7] encodes
# y = 5 * x1 + 7 * x2 + 2
x = Nx.tensor([[1, 1, 1], [1, 2, 2], [1, 3, 3], [1, 4, 5]])
b = Nx.tensor([2, 5, 7])
# Generate the outcomes: y = X·b (no noise)
y = Nx.dot(x, b)
x_prime_x = Nx.dot(Nx.transpose(x), x)
x_prime_y = Nx.dot(Nx.transpose(x), y)
b_hat = Nx.dot(Nx.LinAlg.invert(x_prime_x), x_prime_y)
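# Sanity check (my addition): y was generated as X·b with no noise, so
# X·b_hat should reproduce it, i.e. roughly [14, 26, 38, 57].
Nx.dot(x, b_hat)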
defmodule Regression do
@doc """
Ordinary least squares with two regressors plus an intercept, solved via
the normal equations.

E.g.:
y  = [14, 26, 38, 57]
x1 = [1, 2, 3, 4]
x2 = [1, 2, 3, 5]
"""
def bivariate(y, x1, x2)
    when is_list(y) and is_list(x1) and is_list(x2) and
           length(y) == length(x1) and length(x1) == length(x2) do
x0 = List.duplicate(1, length(x1))
x = Nx.tensor([x0, x1, x2]) |> Nx.transpose()
y = Nx.tensor(y)
x_prime_x = Nx.dot(Nx.transpose(x), x)
x_prime_y = Nx.dot(Nx.transpose(x), y)
b_hat = Nx.dot(Nx.LinAlg.invert(x_prime_x), x_prime_y)
y_hat = Nx.dot(x, b_hat)
%{b_hat: b_hat, y_hat: y_hat}
end
end
Regression.bivariate([14, 26, 38, 57], [1, 2, 3, 4], [1, 2, 3, 5])
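# With noise-free data, b_hat recovers the generating coefficients
# (approximately [2, 5, 7] = intercept, m1, m2, up to floating-point error),
# and y_hat reproduces y (approximately [14, 26, 38, 57])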