
Copying Tables Using `Ecto.Repo.stream/2`


Mix.install(
  [
    {:ecto, "~> 3.11"},
    {:ecto_sql, "~> 3.11"},
    {:postgrex, "~> 0.18.0"}
  ],
  system_env: [
    SOURCE_DB_URL: "postgres://postgres@localhost/source_db",
    TARGET_DB_URL: "postgres://postgres@localhost/target_db"
  ]
)

Introduction

In this guide, we will explore how to efficiently stream data from one PostgreSQL database to another using Ecto. This approach is particularly useful for handling large datasets without consuming too much memory, as it processes the data in chunks rather than loading it all at once.

Streaming data between two databases is often necessary for data migrations, backups, or ETL (Extract, Transform, Load) processes. Using Ecto’s streaming capabilities, we can efficiently copy data from one database to another without overloading system resources.

We will use the Ecto, Ecto SQL, and Postgrex libraries to facilitate this process. The steps below detail how to set up, seed, and copy data between two repositories.

Setting Up Repositories and Seeding Data

Configuring Database URLs

First, we need to set the database URLs for SourceRepo and TargetRepo using environment variables. These variables should be:

  • SOURCE_DB_URL - The URL for the source database.
  • TARGET_DB_URL - The URL for the target database.

Refer to the Ecto.Repo documentation for more information on how to properly configure URLs.
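As a quick sanity check (an optional addition, not part of the original notebook), you can confirm that both variables are set before defining the repositories; System.fetch_env!/1 raises a descriptive error if one is missing:

for var <- ["SOURCE_DB_URL", "TARGET_DB_URL"] do
  # raises if the variable is unset, so misconfiguration fails fast
  System.fetch_env!(var)
end

"Database URLs are configured."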

Defining Repositories

We need to define repository modules for both our source and target databases.

defmodule SourceRepo do
  use Ecto.Repo,
    otp_app: :stream_app,
    adapter: Ecto.Adapters.Postgres,
    log: false

  def init(_type, config) do
    {:ok, Keyword.merge(config, url: System.fetch_env!("SOURCE_DB_URL"), log: false)}
  end
end

defmodule TargetRepo do
  use Ecto.Repo,
    otp_app: :stream_app,
    adapter: Ecto.Adapters.Postgres,
    log: false

  def init(_type, config) do
    {:ok, Keyword.merge(config, url: System.fetch_env!("TARGET_DB_URL"), log: false)}
  end
end

{SourceRepo, TargetRepo}

After defining these modules, we need to start the repositories:

# Uncomment the lines below to create the databases:
# Mix.Tasks.Ecto.Create.run(["-r", "SourceRepo"])
# Mix.Tasks.Ecto.Create.run(["-r", "TargetRepo"])

# Stop any repos that may still be running from a previous evaluation
for repo <- Ecto.Repo.all_running() do
  repo.stop()
end

{:ok, _pid} = SourceRepo.start_link()
{:ok, _pid} = TargetRepo.start_link()

"Repos have been started."
"Repos have been started."

Creating the Users Table

Defining the Migration

We will create a users table in both databases with a birthday column. Note that Ecto's create also adds an id primary key column by default, which is why the seed data later includes two values per row. The migration is as follows:

defmodule Migrations.AddUsersTable do
  use Ecto.Migration

  def change do
    create_if_not_exists table("users") do
      add :birthday, :date, null: false
    end
  end
end

Migrations.AddUsersTable

Running the Migration

To apply this migration to both the source and target databases, run the following commands:

opts = [all: true, log_migrations_sql: :debug]

Ecto.Migrator.run(SourceRepo, [{0, Migrations.AddUsersTable}], :up, opts)
Ecto.Migrator.run(TargetRepo, [{0, Migrations.AddUsersTable}], :up, opts)

"Migrations have been applied."

15:53:29.021 [info] Migrations already up

15:53:29.024 [info] Migrations already up
"Migrations have been applied."

Seeding Data into the Source Database

Generating Sample Data

To test our streaming process, we need to add some data to the users table in SourceRepo. Here, we generate 1,000,000 users with random birthdays:

users_stream = Stream.map(1..1_000_000, fn id ->
  birthday = Date.add(~D[1950-01-01], :rand.uniform(365 * 50))
  Enum.join([id, Date.to_iso8601(birthday)], "\t") <> "\n"
end)
#Stream<[enum: 1..1000000, funs: [#Function<50.38948127/1 in Stream.map/2>]]>
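To see the wire format before inserting, you can peek at a few generated lines (an optional check, not part of the original notebook). Each line is "<id>\t<birthday>\n", the tab-separated text format that PostgreSQL's COPY ... FROM STDIN expects by default:

# Forces only the first three elements; the stream itself stays lazy
Enum.take(users_stream, 3)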

Inserting Sample Data

The generated users are then inserted into the users table in SourceRepo. Note that Ecto.Adapters.SQL.stream/4 can only be enumerated inside a transaction, which is why the COPY is wrapped in SourceRepo.transaction/2:

SourceRepo.transaction(
  fn ->
    source_stream = Ecto.Adapters.SQL.stream(SourceRepo, "COPY users FROM STDIN", [])
    Enum.into(users_stream, source_stream)
  end,
  timeout: :infinity
)
{:ok,
 %Ecto.Adapters.SQL.Stream{
   meta: %{
     pid: #PID<0.269.0>,
     opts: [repo: SourceRepo, timeout: 15000, pool_size: 10, pool: DBConnection.ConnectionPool],
     cache: #Reference<0.1091724032.3306815494.52797>,
     stacktrace: nil,
     adapter: Ecto.Adapters.Postgres,
     repo: SourceRepo,
     telemetry: {SourceRepo, false, [:source_repo, :query]},
     sql: Ecto.Adapters.Postgres.Connection
   },
   statement: "COPY users FROM STDIN",
   params: [],
   opts: []
 }}
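Enum.into/2 above sends each generated line to the database as its own copy-data message. If that turns out to be too chatty for your setup, one possible variation (a sketch, not part of the original notebook; the chunk size of 10_000 is an arbitrary choice) is to batch lines before piping them into the COPY stream:

SourceRepo.transaction(
  fn ->
    source_stream = Ecto.Adapters.SQL.stream(SourceRepo, "COPY users FROM STDIN", [])

    users_stream
    # group 10_000 lines into a single binary per copy-data message
    |> Stream.chunk_every(10_000)
    |> Stream.map(&IO.iodata_to_binary/1)
    |> Enum.into(source_stream)
  end,
  timeout: :infinity
)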

Verifying the Inserted Data

We can verify that the data was successfully inserted by counting the rows in the users table:

import Ecto.Query
count = SourceRepo.aggregate(from("users"), :count)

"#{count} users are inserted"
"1000000 users are inserted"

Copying Data Between Databases

Preparing for Data Copying

Before copying the data, let’s check the current count of users in the TargetRepo to have a reference for later verification:

import Ecto.Query
queryable = from("users")

before_count = TargetRepo.aggregate(queryable, :count)
0

Streaming Data from Source to Target

To copy data from SourceRepo to TargetRepo, we combine PostgreSQL's COPY command with Ecto's SQL streaming. Each element the COPY ... TO STDOUT stream emits is a %Postgrex.Result{} whose rows hold the raw copy lines, so we flatten them with Stream.flat_map/2 and pipe them straight into the target's COPY ... FROM STDIN stream:

SourceRepo.transaction(
  fn ->
    source =
      SourceRepo
      |> Ecto.Adapters.SQL.stream("COPY users TO STDOUT", [])
      |> Stream.flat_map(fn %Postgrex.Result{rows: rows} -> rows end)
  
    TargetRepo.transaction(
      fn ->
        target = Ecto.Adapters.SQL.stream(TargetRepo, "COPY users FROM STDIN", [])
        Enum.into(source, target)
      end,
      timeout: :infinity
    )
  end,
  timeout: :infinity
)
{:ok,
 {:ok,
  %Ecto.Adapters.SQL.Stream{
    meta: %{
      pid: #PID<0.282.0>,
      opts: [repo: TargetRepo, timeout: 15000, pool_size: 10, pool: DBConnection.ConnectionPool],
      cache: #Reference<0.1091724032.3306815494.52834>,
      stacktrace: nil,
      adapter: Ecto.Adapters.Postgres,
      repo: TargetRepo,
      telemetry: {TargetRepo, false, [:target_repo, :query]},
      sql: Ecto.Adapters.Postgres.Connection
    },
    statement: "COPY users FROM STDIN",
    params: [],
    opts: []
  }}}
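PostgreSQL also accepts a query in place of a table name in COPY ... TO STDOUT, which makes it possible to copy only a subset of rows. A minimal sketch of that variation (the WHERE clause is illustrative, not part of the original notebook):

SourceRepo.transaction(
  fn ->
    source =
      SourceRepo
      |> Ecto.Adapters.SQL.stream(
        "COPY (SELECT * FROM users WHERE birthday >= '1990-01-01') TO STDOUT",
        []
      )
      |> Stream.flat_map(fn %Postgrex.Result{rows: rows} -> rows end)

    TargetRepo.transaction(
      fn ->
        target = Ecto.Adapters.SQL.stream(TargetRepo, "COPY users FROM STDIN", [])
        Enum.into(source, target)
      end,
      timeout: :infinity
    )
  end,
  timeout: :infinity
)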

Verifying the Data Copy

Once the data is copied, we verify the operation by checking the count of rows in TargetRepo:

after_count = TargetRepo.aggregate(queryable, :count)

"#{after_count - before_count } users have been copied from the source db to the target db."
"1000000 users have been copied from the source db to the target db."

Conclusion

In this guide, we’ve learned how to efficiently stream data between two PostgreSQL databases. The heavy lifting was done by Ecto.Adapters.SQL.stream/4 wrapping PostgreSQL’s COPY command, which lets us handle large data transfers without consuming excessive memory, making it ideal for large-scale operations.

By following these steps, you can easily adapt this method to perform data migrations, ETL tasks, or simply move data between two databases in a resource-efficient manner.
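For completeness, here is what a copy based on Ecto.Repo.stream/2 itself could look like. It streams query results row by row instead of raw COPY data, so it is slower, but it works with any Ecto adapter. This is a minimal sketch (the batch size of 1_000 is an arbitrary choice, and it relies on the default id primary key created by the migration):

import Ecto.Query

SourceRepo.transaction(
  fn ->
    from(u in "users", select: %{id: u.id, birthday: u.birthday})
    |> SourceRepo.stream(max_rows: 1_000)
    |> Stream.chunk_every(1_000)
    # insert_all/2 takes a list of maps matching the table's columns
    |> Enum.each(fn batch -> TargetRepo.insert_all("users", batch) end)
  end,
  timeout: :infinity
)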

Resetting the Database

Truncating the Users Table

To reset the database and clear the users table in both repositories, use the following commands:

Ecto.Adapters.SQL.query!(SourceRepo, "TRUNCATE users RESTART IDENTITY;", [])
Ecto.Adapters.SQL.query!(TargetRepo, "TRUNCATE users RESTART IDENTITY;", [])

:ok

Verifying the Reset

Finally, verify that both tables are empty:

import Ecto.Query
source_count = SourceRepo.aggregate(from("users"), :count)
target_count = TargetRepo.aggregate(from("users"), :count)

IO.puts("""
source: #{source_count} users
target: #{target_count} users
""")
source: 0 users
target: 0 users
:ok