Mail Aggregator System
Setup
Mix.install([
{:phoenix_live_view, "~> 0.20.17"},
{:notmuch, "~> 0.1"}, # Notmuch Elixir binding
{:imap, "~> 0.3"}, # For IMAP support
{:swoosh, "~> 1.5"}, # Email handling
{:kino, "~> 0.11.0"},
{:timex, "~> 3.7"},
{:jason, "~> 1.4"}
])
defmodule MailAggregator do
@moduledoc """
Aggregates and manages multiple email sources
"""
defmodule Source do
defstruct [:id, :type, :config, :last_sync, :status]
end
defmodule Newsletter do
defstruct [:id, :source_id, :subject, :content, :received_at, :tags, metadata: %{}]
end
def source_types do
%{
"notmuch" => %{
name: "Notmuch",
config_schema: [
db_path: :string,
query: :string,
tags: [:string]
],
handler: MailAggregator.Sources.Notmuch
},
"imap" => %{
name: "IMAP",
config_schema: [
host: :string,
port: :integer,
username: :string,
password: :string,
folder: :string,
ssl: :boolean
],
handler: MailAggregator.Sources.IMAP
},
"asteroid" => %{
name: "Asteroid Mail",
config_schema: [
api_endpoint: :string,
api_key: :string,
folders: [:string]
],
handler: MailAggregator.Sources.Asteroid
}
}
end
end
defmodule MailAggregator.Sources.Notmuch do
@moduledoc """
Notmuch email source handler
"""
def sync(config) do
with {:ok, db} <- Notmuch.DB.open(config.db_path),
{:ok, query} <- Notmuch.Query.create(db, config.query),
{:ok, threads} <- Notmuch.Query.search_threads(query) do
threads
|> Enum.map(&process_thread(&1, config))
|> Enum.filter(&(&1 != nil))
else
error -> {:error, error}
end
end
defp process_thread(thread, config) do
with {:ok, messages} <- Notmuch.Thread.get_messages(thread),
message <- List.first(messages),
true <- should_process?(message, config) do
%MailAggregator.Newsletter{
id: Notmuch.Message.get_message_id(message),
source_id: "notmuch",
subject: Notmuch.Message.get_header(message, "subject"),
content: get_content(message),
received_at: Notmuch.Message.get_date(message),
tags: Notmuch.Message.get_tags(message)
}
else
_ -> nil
end
end
defp should_process?(message, config) do
tags = Notmuch.Message.get_tags(message)
Enum.any?(config.tags, &(&1 in tags))
end
end
defmodule MailAggregator.Sources.Asteroid do
@moduledoc """
Asteroid Mail source handler
"""
def sync(config) do
# Implementation would depend on Asteroid Mail's API
with {:ok, client} <- connect(config),
{:ok, messages} <- fetch_messages(client, config.folders) do
messages
|> Enum.map(&process_message/1)
end
end
defp connect(config) do
# Implement Asteroid Mail API connection
{:ok, %{endpoint: config.api_endpoint, key: config.api_key}}
end
end
defmodule MailAggregator.Aggregator do
@moduledoc """
Manages newsletter aggregation from multiple sources
"""
use GenServer
def start_link(opts) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end
def init(opts) do
schedule_sync()
{:ok, %{sources: [], newsletters: [], last_sync: nil}}
end
def handle_info(:sync, state) do
new_state = sync_all_sources(state)
schedule_sync()
{:noreply, new_state}
end
defp sync_all_sources(state) do
newsletters = state.sources
|> Enum.map(&sync_source/1)
|> List.flatten()
|> sort_and_deduplicate()
%{state |
newsletters: newsletters,
last_sync: DateTime.utc_now()
}
end
defp schedule_sync do
Process.send_after(self(), :sync, :timer.minutes(15))
end
end
LiveView Interface
defmodule MailAggregatorWeb.NewsletterLive do
use Phoenix.LiveView
def mount(_params, _session, socket) do
if connected?(socket) do
MailAggregator.Aggregator.subscribe()
end
{:ok, assign(socket,
newsletters: MailAggregator.Aggregator.get_newsletters(),
filters: %{
sources: [],
tags: [],
search: nil
},
view_mode: :grid
)}
end
def render(assigns) do
~H"""
Sources
<%= for source <- @sources do %>
<%= source.name %>
<% end %>
Tags
<%= for tag <- available_tags(@newsletters) do %>
<%= tag %>
<% end %>
Grid
List
<%= for newsletter <- filtered_newsletters(@newsletters, @filters) do %>
<%= newsletter.subject %>
<%= newsletter.source_id %>
<%= Timex.format!(newsletter.received_at, "{relative}") %>
<%= for tag <- newsletter.tags do %>
<%= tag %>
<% end %>
<%= preview_content(newsletter.content) %>
<% end %>
"""
end
def handle_event("toggle_source", %{"id" => source_id}, socket) do
{:noreply, update_filters(socket, :sources, source_id)}
end
def handle_event("search", %{"value" => search}, socket) do
{:noreply, put_in(socket.assigns.filters.search, search)}
end
defp filtered_newsletters(newsletters, filters) do
newsletters
|> filter_by_sources(filters.sources)
|> filter_by_tags(filters.tags)
|> filter_by_search(filters.search)
end
end
Configuration Example
# config/config.exs
config :mail_aggregator, MailAggregator,
sources: [
%{
id: "notmuch_main",
type: "notmuch",
config: %{
db_path: "~/.mail",
query: "tag:newsletter",
tags: ["newsletter", "important"]
}
},
%{
id: "asteroid_work",
type: "asteroid",
config: %{
api_endpoint: "https://mail.example.com/api",
api_key: {:system, "ASTEROID_API_KEY"},
folders: ["Newsletters", "Updates"]
}
}
]
Key features:
-
Multiple Source Support:
- Notmuch integration
- Asteroid Mail integration
- IMAP fallback
- Extensible source system
-
Smart Aggregation:
- Deduplication
- Tag management
- Search capabilities
- Configurable sync intervals
-
Rich UI:
- Grid/List views
- Filtering by source/tags
- Search functionality
- Real-time updates
-
Content Processing:
- Newsletter detection
- Content extraction
- Preview generation
- Metadata enrichment
To set up:
-
Configure Notmuch:
notmuch setup notmuch new
-
Configure Asteroid Mail (if using):
config :mail_aggregator, :asteroid, api_endpoint: "https://your-asteroid-instance.com", api_key: System.get_env("ASTEROID_API_KEY")
-
Start the aggregator:
MailAggregator.start_link()
Would you like me to expand on any particular aspect or add more features?