Copying Tables Using Ecto.Adapters.SQL.stream/4
Mix.install(
  [
    {:ecto, "~> 3.11"},
    {:ecto_sql, "~> 3.11"},
    {:postgrex, "~> 0.18.0"}
  ],
  system_env: [
    SOURCE_DB_URL: "postgres://postgres@localhost/source_db",
    TARGET_DB_URL: "postgres://postgres@localhost/target_db"
  ]
)
Introduction
In this guide, we will explore how to efficiently stream data from one PostgreSQL database to another using Ecto. Because the data is processed in chunks rather than loaded all at once, this approach is particularly useful for handling large datasets without exhausting memory.
Streaming data between two databases is often necessary for data migrations, backups, or ETL (Extract, Transform, Load) processes. Using Ecto’s streaming capabilities together with PostgreSQL’s COPY command, we can copy data from one database to another without overloading system resources.
We will use the Ecto, Ecto SQL, and Postgrex libraries to facilitate this process. The steps below detail how to set up, seed, and copy data between two repositories.
Setting Up Repositories and Seeding Data
Configuring Database URLs
First, we need to set the database URLs for SourceRepo and TargetRepo using environment variables. These variables should be:
- SOURCE_DB_URL - The URL for the source database.
- TARGET_DB_URL - The URL for the target database.
Refer to the Ecto.Repo documentation for more information on how to properly configure URLs.
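The Mix.install/2 call at the top already sets these variables through its :system_env option. If you run this code outside of such a setup, you can export them yourself before the repos start; a sketch, where the user, host, and database names are placeholders for your own environment:
# Placeholder credentials, adjust to your environment
System.put_env("SOURCE_DB_URL", "postgres://postgres@localhost/source_db")
System.put_env("TARGET_DB_URL", "postgres://postgres@localhost/target_db")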
Defining Repositories
We need to define repository modules for both our source and target databases.
defmodule SourceRepo do
  use Ecto.Repo,
    otp_app: :stream_app,
    adapter: Ecto.Adapters.Postgres,
    log: false

  def init(_type, config) do
    {:ok, Keyword.merge(config, url: System.fetch_env!("SOURCE_DB_URL"), log: false)}
  end
end

defmodule TargetRepo do
  use Ecto.Repo,
    otp_app: :stream_app,
    adapter: Ecto.Adapters.Postgres,
    log: false

  def init(_type, config) do
    {:ok, Keyword.merge(config, url: System.fetch_env!("TARGET_DB_URL"), log: false)}
  end
end

{SourceRepo, TargetRepo}
After defining these modules, we need to start the repositories:
# Uncomment the lines below to create the databases if they do not exist yet:
# Mix.Tasks.Ecto.Create.run(["-r", "SourceRepo"])
# Mix.Tasks.Ecto.Create.run(["-r", "TargetRepo"])

# Stop any already-running repos so this cell can be re-evaluated safely
for repo <- Ecto.Repo.all_running() do
  repo.stop()
end
{:ok, _pid} = SourceRepo.start_link()
{:ok, _pid} = TargetRepo.start_link()
"Repos have been started."
"Repos have been started."
Creating the Users Table
Defining the Migration
We will create a users table in both databases, which will include a birthday column. Note that Ecto migrations add an auto-incrementing id primary key by default, which is why the seed data below provides both an id and a birthday for each row. The migration script for this is as follows:
defmodule Migrations.AddUsersTable do
  use Ecto.Migration

  def change do
    create_if_not_exists table("users") do
      add :birthday, :date, null: false
    end
  end
end
Migrations.AddUsersTable
Running the Migration
To apply this migration to both the source and target databases, run the following commands:
opts = [all: true, log_migrations_sql: :debug]
Ecto.Migrator.run(SourceRepo, [{0, Migrations.AddUsersTable}], :up, opts)
Ecto.Migrator.run(TargetRepo, [{0, Migrations.AddUsersTable}], :up, opts)
"Migrations have been applied."
Seeding Data into the Source Database
Generating Sample Data
To test our streaming process, we need to add some data to the users table in SourceRepo. Here, we generate 1,000,000 users with random birthdays:
users_stream =
  Stream.map(1..1_000_000, fn id ->
    birthday = Date.add(~D[1950-01-01], :rand.uniform(365 * 50))
    Enum.join([id, Date.to_iso8601(birthday)], "\t") <> "\n"
  end)
#Stream<[enum: 1..1000000, funs: [#Function<50.38948127/1 in Stream.map/2>]]>
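Each element of users_stream is one tab-separated line in the text format that COPY expects. You can peek at the first element to confirm the shape; the birthday is random, so the exact value will differ:
Enum.take(users_stream, 1)
# => ["1\t1983-06-14\n"] (example value; the date is random)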
Inserting Sample Data
The generated users are then inserted into the users table in SourceRepo using the following transaction. The stream returned by Ecto.Adapters.SQL.stream/4 is collectable, so Enum.into/2 can feed each generated line into the COPY statement:
SourceRepo.transaction(
  fn ->
    source_stream = Ecto.Adapters.SQL.stream(SourceRepo, "COPY users FROM STDIN", [])
    Enum.into(users_stream, source_stream)
  end,
  timeout: :infinity
)
{:ok,
%Ecto.Adapters.SQL.Stream{
meta: %{
pid: #PID<0.269.0>,
opts: [repo: SourceRepo, timeout: 15000, pool_size: 10, pool: DBConnection.ConnectionPool],
cache: #Reference<0.1091724032.3306815494.52797>,
stacktrace: nil,
adapter: Ecto.Adapters.Postgres,
repo: SourceRepo,
telemetry: {SourceRepo, false, [:source_repo, :query]},
sql: Ecto.Adapters.Postgres.Connection
},
statement: "COPY users FROM STDIN",
params: [],
opts: []
}}
Verifying the Inserted Data
We can verify that the data was successfully inserted by counting the rows in the users table:
import Ecto.Query
count = SourceRepo.aggregate(from("users"), :count)
"#{count} users are inserted"
"1000000 users are inserted"
Copying Data Between Databases
Preparing for Data Copying
Before copying the data, let’s check the current count of users in the TargetRepo to have a reference for later verification:
import Ecto.Query
queryable = from("users")
before_count = TargetRepo.aggregate(queryable, :count)
0
Streaming Data from Source to Target
To copy data from the SourceRepo to the TargetRepo, we use PostgreSQL’s COPY command on both ends of a stream: COPY users TO STDOUT reads rows out of the source inside a SourceRepo transaction, and COPY users FROM STDIN writes them into the target inside a nested TargetRepo transaction:
SourceRepo.transaction(
  fn ->
    source =
      SourceRepo
      |> Ecto.Adapters.SQL.stream("COPY users TO STDOUT", [])
      |> Stream.flat_map(fn %Postgrex.Result{rows: rows} -> rows end)

    TargetRepo.transaction(
      fn ->
        target = Ecto.Adapters.SQL.stream(TargetRepo, "COPY users FROM STDIN", [])
        Enum.into(source, target)
      end,
      timeout: :infinity
    )
  end,
  timeout: :infinity
)
{:ok,
{:ok,
%Ecto.Adapters.SQL.Stream{
meta: %{
pid: #PID<0.282.0>,
opts: [repo: TargetRepo, timeout: 15000, pool_size: 10, pool: DBConnection.ConnectionPool],
cache: #Reference<0.1091724032.3306815494.52834>,
stacktrace: nil,
adapter: Ecto.Adapters.Postgres,
repo: TargetRepo,
telemetry: {TargetRepo, false, [:target_repo, :query]},
sql: Ecto.Adapters.Postgres.Connection
},
statement: "COPY users FROM STDIN",
params: [],
opts: []
}}}
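The source stream yields rows in chunks (Postgrex defaults to 500 rows per chunk). If you want fewer round trips at the cost of more memory per chunk, you can pass the driver’s :max_rows option through the opts argument; a sketch, with 2_000 as an arbitrary example value:
source =
  SourceRepo
  # :max_rows is forwarded to Postgrex and controls the chunk size of the stream
  |> Ecto.Adapters.SQL.stream("COPY users TO STDOUT", [], max_rows: 2_000)
  |> Stream.flat_map(fn %Postgrex.Result{rows: rows} -> rows end)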
Verifying the Data Copy
Once the data is copied, we verify the operation by checking the count of rows in TargetRepo:
after_count = TargetRepo.aggregate(queryable, :count)
"#{after_count - before_count } users have been copied from the source db to the target db."
"1000000 users have been copied from the source db to the target db."
Conclusion
In this guide, we’ve learned how to use Ecto.Adapters.SQL.stream/4 together with PostgreSQL’s COPY command to efficiently stream data between two PostgreSQL databases. This approach allows us to handle large data transfers without consuming excessive memory, making it ideal for large-scale operations.
By following these steps, you can easily adapt this method to perform data migrations, ETL tasks, or simply move data between two databases in a resource-efficient manner.
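If you need to transform rows in Elixir along the way, Ecto.Repo.stream/2 offers a query-level alternative to raw COPY. A minimal sketch, using the same schemaless users table and an arbitrary batch size of 1_000:
import Ecto.Query

SourceRepo.transaction(
  fn ->
    # Repo.stream/2 must run inside a transaction; it fetches rows lazily
    from(u in "users", select: %{id: u.id, birthday: u.birthday})
    |> SourceRepo.stream(max_rows: 1_000)
    |> Stream.chunk_every(1_000)
    |> Enum.each(fn batch -> TargetRepo.insert_all("users", batch) end)
  end,
  timeout: :infinity
)
This variant is slower than COPY because every batch goes through regular INSERT statements, but it lets you map, filter, or validate rows between the two repos.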
Resetting the Database
Truncating the Users Table
To reset the database and clear the users table in both repositories, use the following commands:
Ecto.Adapters.SQL.query!(SourceRepo, "TRUNCATE users RESTART IDENTITY;", [])
Ecto.Adapters.SQL.query!(TargetRepo, "TRUNCATE users RESTART IDENTITY;", [])
:ok
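To remove the databases entirely rather than just emptying the table, the ecto.drop Mix task mirrors the ecto.create calls from earlier; commented out so it is not run by accident:
# Mix.Tasks.Ecto.Drop.run(["-r", "SourceRepo"])
# Mix.Tasks.Ecto.Drop.run(["-r", "TargetRepo"])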
Verifying the Reset
Finally, verify that both tables are empty:
import Ecto.Query
source_count = SourceRepo.aggregate(from("users"), :count)
target_count = TargetRepo.aggregate(from("users"), :count)
IO.puts("""
source: #{source_count} users
target: #{target_count} users
""")
source: 0 users
target: 0 users
:ok