Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

Intro to Timescale

guides/intro.livemd

Intro to Timescale

Mix.install([
  {:timescale, "~> 0.1.0"},
  {:kino_db, "~> 0.2.0"},
  {:jason, "~> 1.4"},
  {:postgrex, "~> 0.16.5"},
  {:kino_ecto, git: "https://github.com/vorce/kino_ecto"}
])

# Add https://github.com/vorce/kino_ecto

Building a health tracker

Let’s learn about using the timescale Elixir library for TimescaleDB.

For our example, let’s imagine we’re building a fitness application that tracks your heart rate through a wearable device. Our application will receive (literal) heartbeats from the wearable device at varying intervals that we will record into the database.

Setting up the Repo

First and foremost, please ensure you have TimescaleDB installed on your machine using the installation guide. To go through these examples, you’ll need the Timescale Toolkit installed on your machine as well. The best way to install is with Docker, or if you’re brave, build from source.

Once Postgres/Timescale is up in running, you’ll also need to set up secrets for connecting to a database. This will also require creating a Postgres database. Try the following commands in your shell:

$ psql -c 'create database timescale_fitness'

You’ll also need to set the following secrets in Livebook

  • POSTGRES_HOST - postgres hostname to connect to (default localhost)
  • POSTGRES_USER - postgres username to connect with (default postgres)
  • POSTGRES_PASS - postgres password to connect with (default postgres)
  • POSTGRES_DATABASE - database name (default timescale_fitness)
defmodule Fitness.Repo do
  use Ecto.Repo,
    otp_app: :fitness,
    adapter: Ecto.Adapters.Postgres

  def init(_type, config) do
    {:ok,
     Keyword.merge(config,
       migration_lock: :pg_advisory_lock,
       hostname: System.get_env("LB_POSTGRES_HOST", "localhost"),
       port: 5432,
       username: System.get_env("LB_POSTGRES_USER", "postgres"),
       password: System.get_env("LB_POSTGRES_PASSWORD", "postgres"),
       # If using Fly, enable IPv6 below
       # socket_options: [:inet6],
       database: System.get_env("LB_POSTGRES_DATABASE", "timescale_fitness")
     )}
  end

  # Helper function for LiveBook demonstrations.
  def migrate({num, migration}, direction) when is_atom(migration) do
    {:ok, _, _} =
      Ecto.Migrator.with_repo(__MODULE__, fn repo ->
        Ecto.Migrator.run(repo, [{num, migration}], direction, all: true)
      end)

    "Successfully Migrated #{inspect(migration)}"
  end
end

alias Fitness.Repo
import Ecto.Query, except: [first: 2, last: 2]
{:ok, repo_pid} = Kino.start_child(Repo)
opts = [
  hostname: "localhost",
  port: 5432,
  username: "postgres",
  password: "postgres",
  database: "timescale_fitness"
]

{:ok, conn} = Kino.start_child({Postgrex, opts})

Creating our hypertable migration

defmodule Fitness.Repo.Migrations.CreateHeartbeat do
  use Ecto.Migration

  import Timescale.Migration

  def up do
    create_timescaledb_extension()

    create_if_not_exists table(:users) do
      add(:fullname, :string)
    end

    create_if_not_exists(unique_index(:users, [:fullname]))

    create_if_not_exists table(:heartbeats, primary_key: false) do
      add(:timestamp, :naive_datetime_usec, null: false)
      add(:user_id, references(:users), null: false)
    end

    create_hypertable(:heartbeats, :timestamp)
  end

  def down do
    drop(table("heartbeats"), mode: :cascade)
    drop(table("users"), mode: :cascade)

    drop_timescaledb_extension()
  end
end

Repo.migrate({0, Fitness.Repo.Migrations.CreateHeartbeat}, :up)

Generating mock data

To facilitate our example, let’s create three users who are tracking their heartbeats.

users = [
  %{fullname: "Dave Lucia"},
  %{fullname: "Alex Koutmos"},
  %{fullname: "Peter Ullrich"}
]

Repo.insert_all("users", users, on_conflict: :nothing)

query =
  from(u in "users", order_by: [desc: u.fullname], select: %{id: u.id, fullname: u.fullname})

[%{id: peter_id} = peter, %{id: dave_id} = dave, %{id: alex_id} = alex] = Repo.all(query)

Next, we’ve built a little module to help us simulate heartbeats for an entire day.

defmodule Fitness.Generator do
  @ms_in_day :timer.hours(24)

  @doc """
  Given a date, will generate a list of heartbeats for the day,
  represented as a list of maps with a `timestamp` field
  """
  @spec generate_heartbeats(%{id: integer()}, Date.t()) :: list(%{timestamp: NaiveDateTime.t()})
  def generate_heartbeats(user, day) do
    do_generate_heartbeats(user, [], NaiveDateTime.new!(day, ~T[00:00:00.000]), 0)
  end

  defp do_generate_heartbeats(user, heartbeats, day, ms) do
    # keep it between 60-200 beats per minute
    next = floor(:timer.minutes(1) / Enum.random(60..200)) + ms

    if next < @ms_in_day do
      heartbeat = %{timestamp: NaiveDateTime.add(day, next, :millisecond), user_id: user.id}
      do_generate_heartbeats(user, [heartbeat | heartbeats], day, next)
    else
      Enum.reverse(heartbeats)
    end
  end
end

# TODO try method in https://klotzandrew.com/blog/postgres-passing-65535-parameter-limit

batch_insert = fn heartbeats ->
  heartbeats
  |> Enum.chunk_every(100)
  |> Enum.map(fn chunk ->
    Repo.insert_all("heartbeats", chunk)
  end)
end

Generate small dataset

Next, we generate heartbeats for one user, and batch insert them into the database.

batch_insert.(Fitness.Generator.generate_heartbeats(alex, ~D[2022-09-22]))

Querying with Timescale

Now that we have a dataset generated, let’s try some queries using Timescale hyperfunctions. Let’s start with the basics, and try to get the first and last values in our timeseries, using the first and last hyperfunctions.

import Timescale.Hyperfunctions

Repo.all(
  from(h in "heartbeats",
    where: h.user_id == ^alex_id,
    select: {first(h.timestamp, h.timestamp), last(h.timestamp, h.timestamp)}
  )
)

Ok, so not so interesting, but we can validate that our data starts roughly at the beginning of today and ends towards midnight.

For more of a challenge, let’s use the time_bucket hyperfunction to calculate average BPM for each hour.

First, let’s explore how the time_bucket hyperfunction works.

Repo.all(
  from(h in "heartbeats",
    where: h.user_id == ^alex_id,
    select: {h.timestamp, time_bucket(h.timestamp, "1 second")},
    limit: 5
  )
)

The first item in each tuple is the actual timestamp of the heartbeat, down to the microsecond. The second item is the result of time_bucket/2 on the timestamp, bucketed down to the nearest second. time_bucket/2 acts like the floor/1 math function, and as we’ll see in a moment, enables further aggregation over time-series.

Repo.all(
  from(h in "heartbeats",
    where: h.user_id == ^alex_id,
    group_by: selected_as(:minute),
    select: %{
      minute: selected_as(time_bucket(h.timestamp, "1 minute"), :minute),
      bpm: count(h)
    },
    limit: 5
  )
)

Performance with large datasets

So far, we’ve only taken a look at a relatively small dataset.

Repo.one(from(h in "heartbeats", select: count(h)))

Let’s make this way more interesting by generating a months worth of heartbeats for our second user. As shown above, that should put us around 5 million rows of data. Go grab a soda, as this is gonna take a little while to execute.

for date <- Date.range(~D[2022-10-01], ~D[2022-10-31]) do
  batch_insert.(Fitness.Generator.generate_heartbeats(dave, date))
end
result3 = Postgrex.query!(conn, "select * from approximate_row_count('heartbeats');", [])

Now that we have a large dataset, let’s retry our previous queries and see how they perform.

result4 =
  Postgrex.query!(
    conn,
    "explain analyze select first(h.timestamp, h.timestamp), last(h.timestamp, h.timestamp) from heartbeats as h where user_id = 1;",
    []
  )

Wow! They’re still lightning fast. Timescale changes the query plan to make use of the hypertable architecture. Each chunk of our hypertable is aware of the time column, and can easily eliminate rows in bulk from our query plan.

Compression

One of the most compelling features of TimescaleDB is its ability to compress data. While data storage may be relatively cheap, the rate at which time-series data grows changes the equation a bit!

result5 =
  Postgrex.query!(
    conn,
    "select pg_size_pretty(hypertable_size('heartbeats')) as heartbeats_size",
    []
  )
defmodule Fitness.Repo.Migrations.CompressHeartbeats do
  use Ecto.Migration

  import Timescale.Migration

  def up do
    enable_hypertable_compression(:heartbeats, segment_by: :user_id)
    add_compression_policy(:heartbeats, "7 days")
  end

  def down do
    execute("SELECT decompress_chunk(i, true) from show_chunks('heartbeats') i;")
    flush()
    remove_compression_policy(:test_hypertable, if_exists: true)
    disable_hypertable_compression(:heartbeats)
  end
end

Repo.migrate({1, Fitness.Repo.Migrations.CompressHeartbeats}, :up)

Now that we’ve compressed the table, we should observe that its size as been dramatically reduced. For some datasets, its common to see compression ratios of up to 96%.

result6 =
  Postgrex.query!(
    conn,
    "select pg_size_pretty(hypertable_size('heartbeats')) as heartbeats_size",
    []
  )

Continuous Aggregates

As should be clear by now, aggregating data over time is extremely common when working with time-series data. Often, you’ll want to compute aggregations across your dataset, grouped by various attributes such as your users, location, and of course, time.

TimescaleDB has a feature called Continuous Aggregates that dramatically improves the performance of expensive aggregations. It accomplishes this with some clever usage of materialized views, triggers, and hypertables, to produce views on data that update in real-time and query instaneously.

Let’s build a continuous aggregate for calcuating the min and max daily BPM for each user.

result7 =
  Postgrex.query!(
    conn,
    """
    CREATE MATERIALIZED VIEW heartbeats_minutely
    WITH (timescaledb.continuous) AS
      SELECT time_bucket('1 minute', h.timestamp) as minute, count(h) as bpm, user_id
      FROM heartbeats as h
      GROUP BY minute, user_id
    """,
    []
  )
result8 =
  Postgrex.query!(
    conn,
    "SELECT add_retention_policy('heartbeats', INTERVAL '1 year');",
    []
  )

Filling gaps in time-series

Warning: this section requires the TimescaleDB Toolkit to be installed, which may require building from source.

defmodule Fitness.Repo.Migrations.AddTimescaleToolkit do
  use Ecto.Migration

  import Timescale.Migration

  def up do
    create_timescaledb_toolkit_extension()
  end

  def down do
    drop_timescaledb_toolkit_extension()
  end
end

Repo.migrate({3, Fitness.Repo.Migrations.AddTimescaleToolkit}, :up)

With any system, there are opportunities for the system to go down. This could be due to a datacenter outage, global DNS being attacked, or simply an application configuration mishap. Regardless, it is possible for gaps in the collection of our data, which poses problems for calculations and aggregations.

Luckily, TimescaleDB provides hyperfunctions for gapfilling and interpoltion so smooth over these gaps.

Let’s add some heartbeats in for another user every second, skipping over a few.

Repo.insert_all("heartbeats", [
  %{timestamp: ~N[2022-11-01 09:00:00], user_id: peter_id},
  %{timestamp: ~N[2022-11-01 09:00:01], user_id: peter_id},
  # No heartbeat at 9:00:02
  %{timestamp: ~N[2022-11-01 09:00:03], user_id: peter_id},
  # No heartbeat at 9:00:04
  %{timestamp: ~N[2022-11-01 09:00:05], user_id: peter_id}
])

Now we’ve got 4 heartbeats, with two missing at seconds 2 and 4. We can use the time_bucket_gapfill/2 function to fill in heartbeats.

Repo.all(
  from(h in "heartbeats",
    where: h.user_id == ^peter_id,
    select: %{
      second: selected_as(time_bucket_gapfill(h.timestamp, "1 second"), :second),
      user_id: h.user_id
    },
    where:
      h.timestamp >= ^~N[2022-11-01 09:00:00] and
        h.timestamp <= ^~N[2022-11-01 09:00:05],
    group_by: [selected_as(:second), h.user_id]
  )
)