Powered by AppSignal & Oban Pro

Cloudflareq Clients

livebooks/clients.livemd

Cloudflareq Clients

Mix.install([
  {:cloudflareq, path: Path.join(__DIR__, "..")},
  {:req_s3, "~> 0.2"}
])

Configuration

All clients require a Cloudflare Account ID and API Token. Set them here and they’ll be shared across all examples.

account_id = System.fetch_env!("CLOUDFLARE_ACCOUNT_ID")
api_token = System.fetch_env!("CLOUDFLARE_API_TOKEN")

shared_opts = [cf_account_id: account_id, cf_api_token: api_token]

:ok

D1 (SQL Database)

The Cloudflareq.D1 client provides access to Cloudflare’s D1 serverless SQL databases.

Create a client

d1 = Cloudflareq.D1.new(shared_opts)

List databases

{:ok, databases} = Cloudflareq.D1.list_databases(d1)
databases

Create a database

{:ok, db} = Cloudflareq.D1.create_database(d1, "livebook-demo")
db

Run queries

Pass the :database_id option to target a specific database.

d1_db = Cloudflareq.D1.new(shared_opts ++ [database_id: db.uuid])

Create a table

{:ok, result} =
  Cloudflareq.D1.query(d1_db, """
  CREATE TABLE IF NOT EXISTS users (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    name TEXT NOT NULL,
    email TEXT NOT NULL UNIQUE
  )
  """)

result

Insert rows with parameterized queries

{:ok, result} =
  Cloudflareq.D1.query(
    d1_db,
    "INSERT INTO users (name, email) VALUES (?1, ?2)",
    ["Alice", "alice@example.com"]
  )

result.meta

Batch queries

Execute multiple queries in a single request. Each entry can be a SQL string or a {sql, params} tuple.

{:ok, results} =
  Cloudflareq.D1.batch(d1_db, [
    {"INSERT INTO users (name, email) VALUES (?1, ?2)", ["Bob", "bob@example.com"]},
    {"INSERT INTO users (name, email) VALUES (?1, ?2)", ["Carol", "carol@example.com"]},
    "SELECT * FROM users"
  ])

# The last result contains the SELECT rows
List.last(results).rows

Clean up

Cloudflareq.D1.delete_database(d1, db.uuid)

R2 (Object Storage)

The Cloudflareq.R2 client manages Cloudflare R2 buckets and provides S3-compatible access for object operations.

Create a client

r2 = Cloudflareq.R2.new(shared_opts)

List buckets

{:ok, %{buckets: buckets}} = Cloudflareq.R2.list_buckets(r2)
buckets

Create a bucket

{:ok, bucket} = Cloudflareq.R2.create_bucket(r2, "livebook-demo")
bucket

Bucket details

{:ok, bucket} = Cloudflareq.R2.get_bucket(r2, "livebook-demo")
bucket

S3-compatible object operations

R2 is S3-compatible. Use create_temp_credentials/3 to get short-lived credentials, then s3/2 to create a Req request with ReqS3 attached.

> Note: S3 operations require the req_s3 dependency and a parent access key. See the > Cloudflare docs for creating R2 API > tokens with S3 access.

# Create temporary credentials scoped to the bucket
{:ok, creds} =
  Cloudflareq.R2.create_temp_credentials(r2,
    bucket: "livebook-demo",
    parent_access_key_id: System.fetch_env!("R2_ACCESS_KEY_ID"),
    permission: "object-read-write",
    ttl_seconds: 900
  )

# Build an S3-compatible request using the temp credentials
s3_req = Cloudflareq.R2.s3(r2, creds)

Upload an object

Req.put!(s3_req, url: "s3://livebook-demo/hello.txt", body: "Hello from Livebook!")

Download an object

resp = Req.get!(s3_req, url: "s3://livebook-demo/hello.txt")
resp.body

Generate a presigned URL

{:ok, url} = Cloudflareq.R2.presign_url(r2, "livebook-demo", "hello.txt", expires_in: 3600)
url

Clean up

Cloudflareq.R2.delete_bucket(r2, "livebook-demo")

Workers (Serverless Scripts)

The Cloudflareq.Workers client manages Cloudflare Worker scripts.

Create a client

workers = Cloudflareq.Workers.new(shared_opts)

List scripts

{:ok, scripts} = Cloudflareq.Workers.list_scripts(workers)
scripts

Upload a script

script_content = """
export default {
  async fetch(request) {
    return new Response("Hello from Cloudflareq!");
  }
};
"""

{:ok, script} =
  Cloudflareq.Workers.upload_script(workers, "livebook-demo", script_content, %{
    "main_module" => "worker.mjs",
    "compatibility_date" => "2024-01-01"
  })

script

Get script content

{:ok, content} = Cloudflareq.Workers.get_script_content(workers, "livebook-demo")
content

Update script content

updated_content = """
export default {
  async fetch(request) {
    const url = new URL(request.url);
    return new Response(`Hello from ${url.pathname}!`);
  }
};
"""

{:ok, script} = Cloudflareq.Workers.put_script_content(workers, "livebook-demo", updated_content)
script

Clean up

Cloudflareq.Workers.delete_script(workers, "livebook-demo", force: true)

Queues (Message Queues)

The Cloudflareq.Queues client manages Cloudflare Queues for asynchronous message passing.

Create a client

queues = Cloudflareq.Queues.new(shared_opts)

List queues

{:ok, queue_list} = Cloudflareq.Queues.list_queues(queues)
queue_list

Create a queue

{:ok, queue} = Cloudflareq.Queues.create_queue(queues, "livebook-demo")
queue

Send a message

:ok = Cloudflareq.Queues.send_message(queues, queue.queue_id, %{event: "user.signup", user_id: 42})

Send a batch of messages

messages = [
  %{body: %{event: "order.created", order_id: 1}},
  %{body: %{event: "order.created", order_id: 2}},
  %{body: %{event: "order.shipped", order_id: 1}, delay_seconds: 30}
]

:ok = Cloudflareq.Queues.send_message_batch(queues, queue.queue_id, messages)

Pull messages

Queues support pull-based consumers. Create an HTTP pull consumer first, then pull messages.

{:ok, consumer} =
  Cloudflareq.Queues.create_consumer(queues, queue.queue_id, type: "http_pull")

consumer
{:ok, %{messages: messages}} =
  Cloudflareq.Queues.pull_messages(queues, queue.queue_id, batch_size: 10)

messages

Acknowledge messages

After processing, acknowledge messages to remove them from the queue, or retry them.

if messages != [] do
  acks = Enum.map(messages, &%{lease_id: &1.lease_id})

  {:ok, ack_result} =
    Cloudflareq.Queues.ack_messages(queues, queue.queue_id, acks: acks)

  ack_result
end

Consumer management

{:ok, consumers} = Cloudflareq.Queues.list_consumers(queues, queue.queue_id)
consumers

Clean up

Cloudflareq.Queues.delete_consumer(queues, queue.queue_id, consumer.consumer_id)
Cloudflareq.Queues.delete_queue(queues, queue.queue_id)