Email Information Extraction

Mix.install(
  [
    {:dsxir, path: Path.expand("../..", __DIR__)},
    {:kino, "~> 0.19"}
  ]
)

Overview

An end-to-end port of the DSPy email extraction tutorial to dsxir. We build a small program that classifies an email, extracts entities, summarizes the body, and proposes action items, then evaluate it on a tiny labeled set and compile a few-shot version with Dsxir.Optimizer.BootstrapFewShot.

The tutorial assumes you have read the README and are comfortable with Dsxir.Signature, Dsxir.Module, and Dsxir.context/2.

When run from a checkout of dsxir, Mix.install/1 above resolves the library from the parent directory. If you launch this livebook from elsewhere, replace the path: line with {:dsxir, "~> 0.1"}.

Configuring the LM

Credentials live in the per-request context, never in Dsxir.configure/1. We set the architectural defaults once and use a Kino input to keep the API key out of the notebook on disk.

Dsxir.configure(
  lm: {Dsxir.LM.Sycophant, [model: "openai:gpt-4o-mini"]},
  adapter: Dsxir.Adapter.Chat
)
:ok
api_key_input = Kino.Input.password("OPENAI_API_KEY")
lm_frame = fn ->
  api_key = Kino.Input.read(api_key_input)

  [lm: {Dsxir.LM.Sycophant,
        [model: "openai:gpt-4o-mini", api_key: api_key, temperature: 0.0]}]
end
#Function<43.113135111/0 in :erl_eval.expr/6>

Signatures

Four declarative input/output contracts. We use Zoi schemas inline so the JSON adapter can validate enum-typed outputs end-to-end. String enums keep the prompt and the parsed payload isomorphic.

defmodule MyApp.Email.ClassifyEmail do
  use Dsxir.Signature

  @email_types ~w(order_confirmation support_request meeting_invitation
                  newsletter promotional invoice shipping_notification other)
  @urgency_levels ~w(low medium high critical)

  signature do
    instruction """
    Classify an inbound email by topic and urgency. Use `other` when no
    category fits. Urgency is the responder's required speed, not the
    sender's tone.
    """

    input :email_subject, :string
    input :email_body, :string
    input :sender, :string, desc: "Display name and/or address of the sender."

    output :email_type, Zoi.enum(@email_types),
      desc: "One of the supported topical categories."
    output :urgency, Zoi.enum(@urgency_levels), desc: "Required response speed."
  end
end
{:module, MyApp.Email.ClassifyEmail, <<70, 79, 82, 49, 0, 0, 149, ...>>, ...}
defmodule MyApp.Email.ExtractEntities do
  use Dsxir.Signature

  signature do
    instruction """
    Extract structured entities from an email. `financial_amount` is the
    single most relevant monetary value as a float, or null when none.
    `important_dates` and `contact_info` are short strings, one per item.
    """

    input :email_content, :string
    input :email_type, :string

    output :key_entities, {:list, :string},
      desc: "Named entities relevant to the email's purpose."
    output :financial_amount, Zoi.nullable(Zoi.float()),
      desc: "Dominant monetary value, or null when none is present."
    output :important_dates, {:list, :string},
      desc: "Dates or date-times mentioned in the email."
    output :contact_info, {:list, :string},
      desc: "Phone numbers, emails, or other contact strings."
  end
end
{:module, MyApp.Email.ExtractEntities, <<70, 79, 82, 49, 0, 0, 182, ...>>, ...}
defmodule MyApp.Email.SummarizeEmail do
  use Dsxir.Signature

  signature do
    instruction "Summarize the email in two or three sentences."

    input :email_subject, :string
    input :email_body, :string
    input :key_entities, {:list, :string}

    output :summary, :string
  end
end
{:module, MyApp.Email.SummarizeEmail, <<70, 79, 82, 49, 0, 0, 127, ...>>, ...}
defmodule MyApp.Email.GenerateActionItems do
  use Dsxir.Signature

  signature do
    instruction """
    Decide whether the recipient must act, and if so list the actions.
    `priority_score` is an integer from 1 (lowest) to 10 (highest).
    `deadline` is a short string or null when no deadline is implied.
    """

    input :email_type, :string
    input :urgency, :string
    input :email_summary, :string
    input :extracted_entities, {:list, :string}

    output :action_required, :boolean
    output :action_items, {:list, :string}
    output :deadline, Zoi.nullable(Zoi.string())
    output :priority_score, :integer
  end
end
{:module, MyApp.Email.GenerateActionItems, <<70, 79, 82, 49, 0, 0, 189, ...>>, ...}

Zoi.nullable/1 is the right call for optional outputs — the JSON adapter validates null against the schema rather than reaching for a with/case fallback.

The module

The four signatures compose in a single forward/2. We thread the prog through each call/3 so per-predictor demos stay attached when we compile later.

defmodule MyApp.Email.Processor do
  use Dsxir.Module

  predictor :classify, Dsxir.Predictor.ChainOfThought,
    signature: MyApp.Email.ClassifyEmail

  predictor :extract, Dsxir.Predictor.ChainOfThought,
    signature: MyApp.Email.ExtractEntities

  predictor :summarize, Dsxir.Predictor.ChainOfThought,
    signature: MyApp.Email.SummarizeEmail

  predictor :actions, Dsxir.Predictor.ChainOfThought,
    signature: MyApp.Email.GenerateActionItems

  def forward(prog, %{email_subject: subj, email_body: body} = inputs) do
    sender = Map.get(inputs, :sender, "")

    {prog, classification} =
      call(prog, :classify, %{email_subject: subj, email_body: body, sender: sender})

    {prog, entities} =
      call(prog, :extract, %{
        email_content: subj <> "\n\n" <> body,
        email_type: classification.fields.email_type
      })

    {prog, summary} =
      call(prog, :summarize, %{
        email_subject: subj,
        email_body: body,
        key_entities: entities.fields.key_entities
      })

    {prog, actions} =
      call(prog, :actions, %{
        email_type: classification.fields.email_type,
        urgency: classification.fields.urgency,
        email_summary: summary.fields.summary,
        extracted_entities: entities.fields.key_entities
      })

    merged =
      classification.fields
      |> Map.merge(entities.fields)
      |> Map.merge(summary.fields)
      |> Map.merge(actions.fields)

    {prog, Dsxir.Prediction.new(merged)}
  end
end
{:module, MyApp.Email.Processor, <<70, 79, 82, 49, 0, 0, 113, ...>>, ...}

Notes:

  • We compose the merged prediction explicitly. Dsxir.Prediction.new/2 keeps the structure flat so callers reach for pred[:email_type], pred[:summary], and so on uniformly.
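  • Every ChainOfThought step emits a reasoning field, and because Map.merge/2 lets the right-hand map win on duplicate keys, only the last step’s reasoning (from :actions) survives the merge. If you want all four surfaced, namespace them (%{classify_reasoning: classification.fields.reasoning, ...}) before merging.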
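
To make the key collision concrete, here is a minimal sketch that uses plain maps as stand-ins for two of the .fields maps. The namespace helper and the sample values are hypothetical and not part of dsxir; the only behavior relied on is that Map.merge/2 keeps the right-hand value when both maps share a key.

```elixir
# Stand-ins for the `.fields` maps of two ChainOfThought predictions.
# The values are made up for illustration.
classification = %{email_type: "invoice", urgency: "low", reasoning: "classify step"}
actions = %{action_required: false, reasoning: "actions step"}

# Plain Map.merge/2: on a key collision the right-hand map wins,
# so only the later step's reasoning is kept.
flat = Map.merge(classification, actions)

# Hypothetical helper: rename :reasoning to :<step>_reasoning before merging.
namespace = fn fields, step ->
  case Map.pop(fields, :reasoning) do
    {nil, rest} -> rest
    {reasoning, rest} -> Map.put(rest, :"#{step}_reasoning", reasoning)
  end
end

merged =
  classification
  |> namespace.(:classify)
  |> Map.merge(namespace.(actions, :actions))

IO.inspect(flat.reasoning, label: "flat")
IO.inspect(Map.take(merged, [:classify_reasoning, :actions_reasoning]), label: "namespaced")
```

In forward/2 the same helper would be applied to each of the four .fields maps before the Map.merge/2 chain, at the cost of four extra keys in the merged prediction.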

Sample emails

emails = [
  %{
    email_subject: "Order Confirmation #12345",
    sender: "orders@techstore.com",
    email_body: """
    Thank you for your order! Your MacBook Pro 16" has been confirmed.
    Total: $2,399.00. Estimated delivery: December 15, 2026.
    Tracking number: 1Z999AA1234567890.
    """
  },
  %{
    email_subject: "URGENT: Server outage affecting production",
    sender: "alerts@company.com",
    email_body: """
    Critical alert: Production servers are experiencing 100% CPU usage.
    Customer-facing services are down. Please respond immediately.
    Call the on-call engineer at +1-555-0123. Issue started at 14:30 UTC.
    """
  },
  %{
    email_subject: "Q4 Planning Meeting - Action Required",
    sender: "ceo@company.com",
    email_body: """
    Please attend the Q4 planning meeting on Friday, December 8th at 2:00 PM
    in Conference Room A. We will discuss budget allocations and strategic
    initiatives. RSVP by December 6th.
    """
  }
]
[
  %{
    email_subject: "Order Confirmation #12345",
    email_body: "Thank you for your order! Your MacBook Pro 16\" has been confirmed.\nTotal: $2,399.00. Estimated delivery: December 15, 2026.\nTracking number: 1Z999AA1234567890.\n",
    sender: "orders@techstore.com"
  },
  %{
    email_subject: "URGENT: Server outage affecting production",
    email_body: "Critical alert: Production servers are experiencing 100% CPU usage.\nCustomer-facing services are down. Please respond immediately.\nCall the on-call engineer at +1-555-0123. Issue started at 14:30 UTC.\n",
    sender: "alerts@company.com"
  },
  %{
    email_subject: "Q4 Planning Meeting - Action Required",
    email_body: "Please attend the Q4 planning meeting on Friday, December 8th at 2:00 PM\nin Conference Room A. We will discuss budget allocations and strategic\ninitiatives. RSVP by December 6th.\n",
    sender: "ceo@company.com"
  }
]

Running on one email

Run this cell once you have entered an API key above.

Dsxir.context(lm_frame.(), fn ->
  prog = Dsxir.Program.new(MyApp.Email.Processor)
  {_prog, pred} = MyApp.Email.Processor.forward(prog, hd(emails))

  %{
    type: pred[:email_type],
    urgency: pred[:urgency],
    summary: pred[:summary],
    action_items: pred[:action_items],
    priority_score: pred[:priority_score]
  }
end)
%{
  type: "order_confirmation",
  summary: "Your order #12345 for a MacBook Pro 16\" has been confirmed with a total of $2,399.00. The estimated delivery date is December 15, 2026, and your tracking number is 1Z999AA1234567890.",
  urgency: "low",
  action_items: [],
  priority_score: 2
}

Building a labeled dataset

Dsxir.Example carries inputs and labels in one map. :input_keys marks which keys flow into forward/2; the rest are treated as labels for the metric.

trainset =
  Enum.map(emails, fn email ->
    label =
      case email.email_subject do
        "Order Confirmation" <> _ -> %{email_type: "order_confirmation", urgency: "low"}
        "URGENT" <> _ -> %{email_type: "other", urgency: "critical"}
        "Q4 Planning" <> _ -> %{email_type: "meeting_invitation", urgency: "medium"}
      end

    Dsxir.Example.new(Map.merge(email, label),
      input_keys: [:email_subject, :email_body, :sender]
    )
  end)
[
  %Dsxir.Example{
    data: %{
      email_subject: "Order Confirmation #12345",
      email_body: "Thank you for your order! Your MacBook Pro 16\" has been confirmed.\nTotal: $2,399.00. Estimated delivery: December 15, 2026.\nTracking number: 1Z999AA1234567890.\n",
      sender: "orders@techstore.com",
      email_type: "order_confirmation",
      urgency: "low"
    },
    input_keys: MapSet.new([:email_subject, :email_body, :sender])
  },
  %Dsxir.Example{
    data: %{
      email_subject: "URGENT: Server outage affecting production",
      email_body: "Critical alert: Production servers are experiencing 100% CPU usage.\nCustomer-facing services are down. Please respond immediately.\nCall the on-call engineer at +1-555-0123. Issue started at 14:30 UTC.\n",
      sender: "alerts@company.com",
      email_type: "other",
      urgency: "critical"
    },
    input_keys: MapSet.new([:email_subject, :email_body, :sender])
  },
  %Dsxir.Example{
    data: %{
      email_subject: "Q4 Planning Meeting - Action Required",
      email_body: "Please attend the Q4 planning meeting on Friday, December 8th at 2:00 PM\nin Conference Room A. We will discuss budget allocations and strategic\ninitiatives. RSVP by December 6th.\n",
      sender: "ceo@company.com",
      email_type: "meeting_invitation",
      urgency: "medium"
    },
    input_keys: MapSet.new([:email_subject, :email_body, :sender])
  }
]
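
The input/label split can be pictured with plain maps. This is a sketch of the idea only, assuming nothing about how Dsxir.Example implements it: the row below is a plain map rather than the struct, and Map.split/2 is used as a stand-in for whatever the library does internally.

```elixir
# A plain-map stand-in for one labeled row; not the Dsxir.Example struct itself.
row = %{
  email_subject: "Order Confirmation #12345",
  email_body: "Thank you for your order!",
  sender: "orders@techstore.com",
  email_type: "order_confirmation",
  urgency: "low"
}

input_keys = [:email_subject, :email_body, :sender]

# Map.split/2 returns {taken, rest}: the keys listed in input_keys would
# flow into forward/2, and everything else is left over as labels.
{inputs, labels} = Map.split(row, input_keys)

IO.inspect(Map.keys(inputs), label: "inputs")
IO.inspect(labels, label: "labels")
```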

A metric

The metric scores classification accuracy on email_type and urgency, returning a float in [0.0, 1.0]. Dsxir.Metric.apply/4 coerces booleans, so a strict “both must match” metric could return a boolean too.

defmodule MyApp.Email.Metrics do
  @spec classification(Dsxir.Example.t(), Dsxir.Prediction.t(), nil | list()) :: float()
  def classification(%Dsxir.Example{data: data}, %Dsxir.Prediction{fields: f}, _trace) do
    type_hit = if data.email_type == f.email_type, do: 1.0, else: 0.0
    urgency_hit = if data.urgency == f.urgency, do: 1.0, else: 0.0
    (type_hit + urgency_hit) / 2.0
  end
end
{:module, MyApp.Email.Metrics, <<70, 79, 82, 49, 0, 0, 12, ...>>, ...}
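
A strict variant that returns a boolean might look like the sketch below. The module name MyApp.Email.StrictMetrics and the function both_match/3 are hypothetical. The clauses match on plain map shapes so the snippet runs without dsxir; structs that carry :data and :fields keys match the same patterns.

```elixir
defmodule MyApp.Email.StrictMetrics do
  # Hypothetical strict metric: true only when both labels match.
  # Matching on %{data: _} and %{fields: _} accepts plain maps as well as
  # structs that expose those keys.
  @spec both_match(map(), map(), nil | list()) :: boolean()
  def both_match(%{data: data}, %{fields: fields}, _trace) do
    data.email_type == fields.email_type and data.urgency == fields.urgency
  end
end

example = %{data: %{email_type: "invoice", urgency: "low"}}
hit = %{fields: %{email_type: "invoice", urgency: "low"}}
miss = %{fields: %{email_type: "invoice", urgency: "high"}}

IO.inspect(MyApp.Email.StrictMetrics.both_match(example, hit, nil), label: "hit")
IO.inspect(MyApp.Email.StrictMetrics.both_match(example, miss, nil), label: "miss")
```

Compared with the averaged float, this version gives no partial credit, which makes a bootstrap threshold behave as a pass/fail gate.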

Evaluating

Dsxir.Evaluate runs the devset through Task.Supervisor.async_stream_nolink/4. Settings are snapshotted once and replayed in each worker, so the LM configuration from the surrounding Dsxir.context/2 reaches every row.

ev = %Dsxir.Evaluate{
  devset: trainset,
  metric: &MyApp.Email.Metrics.classification/3,
  num_threads: 3,
  max_errors: 1
}

Dsxir.context(lm_frame.(), fn ->
  prog = Dsxir.Program.new(MyApp.Email.Processor)
  result = Dsxir.evaluate(ev, prog)

  %{score: result.score, errors: result.errors}
end)
%{errors: %{count: 2, by_class: %{framework: 2}}, score: 33.3}

result.results carries one row per example with the example, the prediction, the metric value, and any captured error. When :save_as is set, the rows are written to JSON Lines before the call returns.
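
The snapshot-and-replay pattern can be sketched with the standard library alone. How dsxir actually stores its settings is not stated here, so the process dictionary below is an assumption made purely for illustration, as are the :settings key and the toy rows.

```elixir
# Toy stand-in for a settings frame held by the calling process.
# (Assumption: the process dictionary is used only to illustrate the pattern.)
Process.put(:settings, model: "openai:gpt-4o-mini", temperature: 0.0)

{:ok, sup} = Task.Supervisor.start_link()

# Snapshot once in the caller. Tasks run in their own processes and do not
# inherit the caller's process dictionary.
snapshot = Process.get(:settings)

results =
  sup
  |> Task.Supervisor.async_stream_nolink(
    [1, 2, 3],
    fn row ->
      # Replay the snapshot inside the worker before doing any work.
      Process.put(:settings, snapshot)
      {row, Process.get(:settings)[:model]}
    end,
    max_concurrency: 3,
    ordered: true
  )
  |> Enum.map(fn {:ok, value} -> value end)

IO.inspect(results, label: "rows")
```

Without the replay step, Process.get(:settings) inside the worker would return nil, which is the failure mode the snapshot is meant to prevent.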

Compiling with BootstrapFewShot

Dsxir.Optimizer.BootstrapFewShot slots labeled demos and then runs each example through Dsxir.with_trace/1 to harvest successful traces as bootstrapped demos.

tmp_dir = System.tmp_dir!()

Dsxir.context(lm_frame.(), fn ->
  prog = Dsxir.Program.new(MyApp.Email.Processor)

  {:ok, compiled, stats} =
    Dsxir.compile(
      Dsxir.Optimizer.BootstrapFewShot,
      prog,
      trainset,
      &MyApp.Email.Metrics.classification/3,
      max_labeled_demos: 2,
      max_bootstrapped_demos: 2,
      threshold: 0.5
    )

  path = Path.join(tmp_dir, "email_processor.v1.json")
  Dsxir.save!(compiled, path)

  %{stats: stats, saved_to: path}
end)
%{
  stats: %{
    threshold: 0.5,
    error_count: 0,
    max_errors: 10,
    labeled_demos: 2,
    predictor_count: 4,
    bootstrapped_demos: 8,
    rounds: 1
  },
  saved_to: "/var/folders/89/2p5fpn1s6010ds0ck4rct_bc0000gn/T/email_processor.v1.json"
}

stats reports labeled and bootstrapped demo counts, the round count, the configured threshold, and how many per-example errors were swallowed (capped by :max_errors). Bootstrap gets its diversity from a pushed settings frame with temperature: 1.0 and a per-call nonce, so repeated runs produce different traces and your code never has to touch the wire protocol.

Loading the compiled program

path = Path.join(tmp_dir, "email_processor.v1.json")

Dsxir.context(lm_frame.(), fn ->
  prog = Dsxir.load!(MyApp.Email.Processor, path)
  {_prog, pred} = MyApp.Email.Processor.forward(prog, hd(emails))
  pred.fields
end)
%{
  deadline: nil,
  reasoning: "1. The email type is an order confirmation, which typically serves to verify that a purchase has been successfully made.\n2. The urgency level is marked as low, indicating that there is no immediate need for action.\n3. The email summary provides details about the order, including the item (MacBook Pro 16\"), price, estimated delivery date (December 15, 2026), order number (#12345), and tracking number.\n4. Since the delivery date is far in the future, there is no pressing need to act on this information right now.\n5. While the recipient should keep this information for their records, no immediate actions are required.",
  summary: "The email confirms your order of a MacBook Pro 16\" for $2,399.00, with an estimated delivery date of December 15, 2026. Your order number is #12345, and a tracking number is included for shipment tracking.",
  email_type: "order_confirmation",
  urgency: "low",
  key_entities: ["MacBook Pro 16\"", "Order #12345"],
  financial_amount: 2399.0,
  important_dates: ["December 15, 2026"],
  contact_info: [],
  action_required: false,
  action_items: [],
  priority_score: 2
}

load!/2 validates the artifact’s predictor names and field shape against the target module’s signatures. A drift surfaces as Dsxir.Errors.Invalid.SignatureMismatch with a structural diff, so a breaking signature change fails loudly rather than producing silently wrong demos.
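
As a rough picture of what a structural diff can contain, the sketch below compares two maps of predictor name to output field names. ShapeCheck and its diff/2 function are hypothetical. This is a toy illustration of the idea, not dsxir's validation logic or the shape of its error.

```elixir
defmodule ShapeCheck do
  # Hypothetical helper: reports, per predictor, which fields the target
  # module expects but the artifact lacks, and vice versa.
  def diff(expected, found) do
    names = Enum.uniq(Map.keys(expected) ++ Map.keys(found))

    Enum.reduce(names, %{}, fn name, acc ->
      exp = MapSet.new(Map.get(expected, name, []))
      got = MapSet.new(Map.get(found, name, []))

      if MapSet.equal?(exp, got) do
        acc
      else
        Map.put(acc, name, %{
          missing: exp |> MapSet.difference(got) |> Enum.sort(),
          unexpected: got |> MapSet.difference(exp) |> Enum.sort()
        })
      end
    end)
  end
end

# Made-up shapes: the artifact was saved before :urgency replaced :priority.
expected = %{classify: [:email_type, :urgency], summarize: [:summary]}
artifact = %{classify: [:email_type, :priority], summarize: [:summary]}

drift = ShapeCheck.diff(expected, artifact)
IO.inspect(drift, label: "drift")
```

An empty map means the shapes agree; any entry names the predictor whose fields drifted.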

Multi-tenant deployment

When the program is hosted behind a Phoenix endpoint, every request gets its own Dsxir.context/2 frame carrying the tenant’s API key and metadata. The framework merges :metadata into every telemetry event, so per-tenant token and cost dashboards work out of the box.

# A plug's call/2. json/2 is Phoenix.Controller.json/2; import it if the
# enclosing module is a plain plug rather than a controller.
def call(conn, _opts) do
  tenant = conn.assigns.tenant

  Dsxir.context(
    [
      lm: {Dsxir.LM.Sycophant, [model: tenant.model_id, api_key: tenant.api_key]},
      cache: false,
      metadata: %{tenant_id: tenant.id, request_id: conn.assigns.request_id}
    ],
    fn ->
      # Load the tenant's compiled artifact inside the request's context frame.
      program =
        Dsxir.load!(MyApp.Email.Processor, "tenants/#{tenant.id}/email_processor.json")

      {_program, pred} =
        MyApp.Email.Processor.forward(program, %{
          email_subject: conn.params["subject"],
          email_body: conn.params["body"],
          sender: conn.params["sender"]
        })

      json(conn, pred.fields)
    end
  )
end

Where to go next

  • Swap Dsxir.Adapter.Chat for Dsxir.Adapter.Json when the upstream provider supports structured outputs — the Zoi enums then become hard-validated on the wire.
  • Attach [:dsxir, :predictor, :stop] handlers to record per-tenant token spend; the metadata map flows through unchanged.
  • Drop Dsxir.Predictor.ChainOfThought for the leaf predictors that do not benefit from reasoning (e.g. SummarizeEmail) — the reasoning field costs tokens.