Building Storage Systems with ExCodecs
Mix.install([
{:ex_codecs, path: Path.join(__DIR__, "..")},
{:kino, "~> 0.14"}
])
Practical Usage Patterns
The Basic Pattern
# All ExCodecs compression work reduces to the encode/decode pair shown here.
data = "Important data to store"

# Compress: a successful encode is matched as `{:ok, compressed}`.
{:ok, compressed} = ExCodecs.encode(:zstd, data)

# Decompress: pass the same codec that produced `compressed`.
{:ok, recovered} = ExCodecs.decode(:zstd, compressed)

round_trip_ok? = recovered == data
IO.puts("Round-trip OK: #{round_trip_ok?}")
Choosing a Codec at Runtime
# Let the workload decide
defmodule StorageConfig do
  @moduledoc """
  Maps a payload's content type to the codec used to compress it.
  """

  # Content types with a dedicated codec; every other value uses @fallback_codec.
  @codec_by_content_type %{
    "application/json" => :zstd,
    "application/octet-stream" => :lz4,
    "application/x-array" => :blosc2
  }
  @fallback_codec :zstd

  @doc """
  Returns the codec atom for `content_type`.

  Any value without a dedicated mapping yields `:zstd`.
  """
  def codec_for_content_type(content_type) do
    Map.get(@codec_by_content_type, content_type, @fallback_codec)
  end
end
# Resolve the codec from the payload's content type, then report whether
# ExCodecs says it is supported.
content_type = "application/json"
codec = StorageConfig.codec_for_content_type(content_type)

IO.puts("Selected codec: #{inspect(codec)}")

supported? = ExCodecs.supports?(codec)
IO.puts("Supported: #{supported?}")
Building a Compressed Storage Layer
defmodule CompressedStore do
  @moduledoc """
  A simple key-value store with transparent compression.

  Values are compressed with `ExCodecs.encode/3` on write and decompressed
  with `ExCodecs.decode/2` on read. The codec and its options are fixed per
  store instance when the store is started.
  """
  use Agent

  defstruct [:codec, :opts, :table]

  @doc """
  Starts a store process.

  ## Options

    * `:codec` - codec used for every value (default `:zstd`)
    * `:opts` - keyword list passed to `ExCodecs.encode/3` (default `[]`)
    * `:name` - name the Agent is registered under (default `CompressedStore`)
  """
  def start_link(opts \\ []) do
    codec = Keyword.get(opts, :codec, :zstd)
    codec_opts = Keyword.get(opts, :opts, [])
    name = Keyword.get(opts, :name, __MODULE__)

    Agent.start_link(
      fn -> %__MODULE__{codec: codec, opts: codec_opts, table: %{}} end,
      name: name
    )
  end

  @doc """
  Compresses `value` and stores it under `key`.

  Returns `{:ok, compressed_byte_size}`, or the `{:error, err}` produced by
  `ExCodecs.encode/3`, in which case the store is left unchanged.
  """
  def put(store \\ __MODULE__, key, value) do
    Agent.get_and_update(store, fn %__MODULE__{} = state ->
      case ExCodecs.encode(state.codec, value, state.opts) do
        {:ok, compressed} ->
          entry = %{
            compressed: compressed,
            original_size: byte_size(value),
            codec: state.codec
          }

          # Update only the table field: the new Agent state must remain the
          # struct, otherwise the codec and options are lost after one write.
          new_state = %{state | table: Map.put(state.table, key, entry)}
          {{:ok, byte_size(compressed)}, new_state}

        {:error, err} ->
          {{:error, err}, state}
      end
    end)
  end

  @doc """
  Returns `{:ok, value}` for `key`, `{:error, :not_found}` when the key is
  absent, or whatever error `ExCodecs.decode/2` returns.
  """
  def get(store \\ __MODULE__, key) do
    Agent.get(store, fn state ->
      case Map.get(state.table, key) do
        nil -> {:error, :not_found}
        # Decode with the codec recorded on the entry itself.
        entry -> ExCodecs.decode(entry.codec, entry.compressed)
      end
    end)
  end

  @doc """
  Returns a map summarising the store: `:codec`, `:entry_count`,
  `:total_original_bytes`, `:total_compressed_bytes` and `:savings_pct`.
  """
  def info(store \\ __MODULE__) do
    Agent.get(store, fn state ->
      entries = Map.values(state.table)
      total_original = Enum.sum(Enum.map(entries, & &1.original_size))
      total_compressed = Enum.sum(Enum.map(entries, &byte_size(&1.compressed)))

      %{
        codec: state.codec,
        entry_count: map_size(state.table),
        total_original_bytes: total_original,
        total_compressed_bytes: total_compressed,
        savings_pct: savings_pct(total_original, total_compressed)
      }
    end)
  end

  @doc """
  Removes `key` and returns the stored entry map, or `nil` when the key was
  not present.
  """
  def delete(store \\ __MODULE__, key) do
    Agent.get_and_update(store, fn %__MODULE__{} = state ->
      {removed, remaining} = Map.pop(state.table, key)
      {removed, %{state | table: remaining}}
    end)
  end

  # With nothing stored there is nothing saved; avoids reporting 100% for an
  # empty store and dividing by zero.
  defp savings_pct(0, _total_compressed), do: 0.0

  defp savings_pct(total_original, total_compressed) do
    Float.round(100 * (1 - total_compressed / total_original), 1)
  end
end
Using the Compressed Store
# Codec options go under :opts — start_link/1 reads only :codec, :opts and
# :name, so a top-level `level: 3` would be ignored.
{:ok, _pid} = CompressedStore.start_link(codec: :zstd, opts: [level: 3])

# Store various kinds of data
CompressedStore.put(:doc1, String.duplicate("Hello, World! ", 1000))
CompressedStore.put(:doc2, :crypto.strong_rand_bytes(4096) |> Base.encode64())

# The JSON document is assembled by hand because Jason is not among the
# dependencies installed by this notebook.
json_data =
  Enum.map_join(1..100, ",", fn i ->
    "{\"id\":#{i},\"value\":#{:rand.uniform(1000)}}"
  end)

CompressedStore.put(:doc3, "[#{json_data}]")

# Retrieve and verify
{:ok, doc1} = CompressedStore.get(:doc1)
IO.puts("Retrieved doc1 (#{byte_size(doc1)} bytes)")

# Check storage efficiency
info = CompressedStore.info()
IO.puts("\nStorage Summary:")
IO.puts(" Codec: #{inspect(info.codec)}")
IO.puts(" Entries: #{info.entry_count}")
IO.puts(" Original: #{info.total_original_bytes} bytes")
IO.puts(" Compressed: #{info.total_compressed_bytes} bytes")
IO.puts(" Savings: #{info.savings_pct}%")
Multi-Codec Store
defmodule MultiCodecStore do
  @moduledoc """
  A store that selects the best codec per value.

  Tries multiple codecs and picks the one with the best ratio.
  Stores the chosen codec alongside the compressed data.
  """
  use Agent

  @codecs [:lz4, :snappy, :zstd, :bzip2]

  @doc """
  Starts the store. `:name` sets the registered Agent name
  (default `MultiCodecStore`).
  """
  def start_link(opts \\ []) do
    name = Keyword.get(opts, :name, __MODULE__)
    Agent.start_link(fn -> %{} end, name: name)
  end

  @doc """
  Encodes `value` with every candidate codec and stores the smallest result
  under `key`.

  Returns `{:ok, entry}` where `entry` holds `:codec`, `:compressed`,
  `:original_size` and `:compressed_size`, or `{:error, :no_codec_available}`
  when no candidate codec returns `{:ok, _}`.
  """
  def put(store \\ __MODULE__, key, value) do
    # Encoding needs no store state, so it runs in the caller; the Agent is
    # only involved for the final map update. Codecs that return an error are
    # skipped by the pattern in the generator instead of crashing.
    candidates =
      for codec <- @codecs,
          {:ok, encoded} <- [ExCodecs.encode(codec, value)],
          do: {codec, encoded, byte_size(encoded)}

    case candidates do
      [] ->
        {:error, :no_codec_available}

      _ ->
        {best_codec, best_enc, best_size} =
          Enum.min_by(candidates, fn {_codec, _enc, size} -> size end)

        entry = %{
          codec: best_codec,
          compressed: best_enc,
          original_size: byte_size(value),
          compressed_size: best_size
        }

        :ok = Agent.update(store, &Map.put(&1, key, entry))
        {:ok, entry}
    end
  end

  @doc """
  Returns `{:ok, value}` for `key`, `{:error, :not_found}` when the key is
  absent, or whatever error `ExCodecs.decode/2` returns.
  """
  def get(store \\ __MODULE__, key) do
    Agent.get(store, fn state ->
      case Map.get(state, key) do
        nil -> {:error, :not_found}
        entry -> ExCodecs.decode(entry.codec, entry.compressed)
      end
    end)
  end

  @doc """
  Returns one `{codec, entry_count, original_bytes, compressed_bytes}` tuple
  per codec in use, sorted by ascending entry count.
  """
  def stats(store \\ __MODULE__) do
    Agent.get(store, fn state ->
      state
      |> Map.values()
      |> Enum.group_by(& &1.codec)
      |> Enum.map(fn {codec, entries} ->
        {codec, length(entries), Enum.sum(Enum.map(entries, & &1.original_size)),
         Enum.sum(Enum.map(entries, & &1.compressed_size))}
      end)
      |> Enum.sort_by(&elem(&1, 1))
    end)
  end
end
Using the Multi-Codec Store
{:ok, _pid} = MultiCodecStore.start_link()

# Three payloads with different compressibility characteristics.
repetitive = String.duplicate("abcabcabc", 5000)
semi_random = for(_ <- 1..10000, into: <<>>, do: <<:rand.uniform(255)>>)
floats = for(i <- 1..4096, into: <<>>, do: <<i * 0.1::float-size(64)-little>>)

datasets = %{repetitive: repetitive, semi_random: semi_random, floats: floats}

# Store each payload and print which codec was chosen for it.
Enum.map(datasets, fn {name, data} ->
  {:ok, entry} = MultiCodecStore.put(name, data)
  ratio_pct = Float.round(100 * entry.compressed_size / entry.original_size, 1)

  IO.puts(
    "#{name}: best codec = #{inspect(entry.codec)}, " <>
      "#{entry.original_size} -> #{entry.compressed_size} bytes " <>
      "(#{ratio_pct}%)"
  )
end)

IO.puts("\nCodec distribution:")

# One summary line per codec currently in use.
Enum.map(MultiCodecStore.stats(), fn {codec, count, orig, comp} ->
  IO.puts(" #{inspect(codec)}: #{count} entries, #{orig} -> #{comp} bytes")
end)
Chunked Compression for Large Files
defmodule ChunkedCompressor do
  @moduledoc """
  Compresses large data in chunks to limit memory usage.

  Each chunk is compressed independently, enabling:

    - Parallel compression
    - Random access on decompression
    - Memory-bounded processing

  ## Container layout (all integers unsigned, big-endian)

      <<chunk_count::32, original_size::64>>   header
      <<offset::32>> repeated chunk_count      start of each chunk in the payload
      payload                                  compressed chunks, back to back
  """

  # 64 KB
  @default_chunk_size 64 * 1024

  # Offsets are written as 32-bit integers, which bounds the payload size.
  @max_payload_bytes 0xFFFFFFFF

  @doc """
  Splits `data` into `chunk_size`-byte pieces, encodes each with `codec` and
  `opts`, and returns `{:ok, container_binary}`.

  The final piece may be shorter than `chunk_size`. Returns the first
  `{:error, err}` from `ExCodecs.encode/3`, or `{:error, :payload_too_large}`
  when the compressed payload cannot be addressed with 32-bit offsets.
  `chunk_size` must be a positive integer.
  """
  def compress(data, codec \\ :zstd, opts \\ [], chunk_size \\ @default_chunk_size)
      when is_binary(data) and is_integer(chunk_size) and chunk_size > 0 do
    chunks = chunk_binary(data, chunk_size)

    with {:ok, compressed_chunks} <- map_ok(chunks, &ExCodecs.encode(codec, &1, opts)) do
      # Each chunk starts where the previous ones end: a running sum of sizes.
      {offsets, payload_size} =
        Enum.map_reduce(compressed_chunks, 0, fn chunk, pos ->
          {pos, pos + byte_size(chunk)}
        end)

      if payload_size > @max_payload_bytes do
        {:error, :payload_too_large}
      else
        header = <<
          length(compressed_chunks)::unsigned-big-32,
          byte_size(data)::unsigned-big-64
        >>

        offset_table = for offset <- offsets, into: <<>>, do: <<offset::unsigned-big-32>>
        {:ok, IO.iodata_to_binary([header, offset_table, compressed_chunks])}
      end
    end
  end

  @doc """
  Reverses `compress/4`: decodes every chunk with `codec` and concatenates the
  results.

  Returns `{:ok, data}`; `{:error, :invalid_format}` when the container header
  or offsets are inconsistent; `{:error, :size_mismatch}` when the decoded
  data does not have the size recorded in the header; or the first error from
  `ExCodecs.decode/2`.
  """
  def decompress(blob, codec \\ :zstd)

  def decompress(
        <<num_chunks::unsigned-big-32, original_size::unsigned-big-64, rest::binary>>,
        codec
      )
      when byte_size(rest) >= num_chunks * 4 do
    table_size = num_chunks * 4
    <<offset_table::binary-size(table_size), payload::binary>> = rest

    starts = for <<offset::unsigned-big-32 <- offset_table>>, do: offset
    # A chunk ends where the next one starts; the last ends with the payload.
    stops = Enum.drop(starts, 1) ++ [byte_size(payload)]

    with {:ok, chunks} <- slice_chunks(payload, Enum.zip(starts, stops)),
         {:ok, parts} <- map_ok(chunks, &ExCodecs.decode(codec, &1)) do
      data = IO.iodata_to_binary(parts)

      if byte_size(data) == original_size do
        {:ok, data}
      else
        {:error, :size_mismatch}
      end
    end
  end

  def decompress(_blob, _codec), do: {:error, :invalid_format}

  # Splits `data` into consecutive pieces, keeping the short tail piece.
  defp chunk_binary(<<>>, _chunk_size), do: []
  defp chunk_binary(data, chunk_size) when byte_size(data) <= chunk_size, do: [data]

  defp chunk_binary(data, chunk_size) do
    <<chunk::binary-size(chunk_size), rest::binary>> = data
    [chunk | chunk_binary(rest, chunk_size)]
  end

  # Cuts the payload at the given {start, stop} bounds after validating them.
  defp slice_chunks(payload, bounds) do
    limit = byte_size(payload)

    if Enum.all?(bounds, fn {from, to} -> from <= to and to <= limit end) do
      {:ok, Enum.map(bounds, fn {from, to} -> binary_part(payload, from, to - from) end)}
    else
      {:error, :invalid_format}
    end
  end

  # Applies `fun` to each item in order, stopping at the first {:error, _}.
  defp map_ok(items, fun) do
    result =
      Enum.reduce_while(items, {:ok, []}, fn item, {:ok, acc} ->
        case fun.(item) do
          {:ok, value} -> {:cont, {:ok, [value | acc]}}
          {:error, _reason} = error -> {:halt, error}
        end
      end)

    with {:ok, reversed} <- result, do: {:ok, Enum.reverse(reversed)}
  end
end
# Demo: round-trip a repetitive payload through the chunked container.
large_data = String.duplicate("This is a chunk of data that repeats. ", 10000)
{:ok, compressed} = ChunkedCompressor.compress(large_data, :zstd)
{:ok, recovered} = ChunkedCompressor.decompress(compressed, :zstd)

original_bytes = byte_size(large_data)
compressed_bytes = byte_size(compressed)
ratio_pct = Float.round(100 * compressed_bytes / original_bytes, 1)

IO.puts("Original: #{original_bytes} bytes")
IO.puts("Compressed: #{compressed_bytes} bytes")
IO.puts("Ratio: #{ratio_pct}%")
IO.puts("Round-trip: #{recovered == large_data}")
Error Handling Best Practices
# Pattern 1: Always handle both success and error
# NOTE(review): `my_data` and `write_to_disk/1` are not defined anywhere in the
# visible source — presumably placeholders for the caller's own payload and
# persistence function; confirm and define them before evaluating this snippet.
#
# Every branch ends in a tagged tuple, so the `case` evaluates to
# `{:ok, compressed}` or `{:error, reason_atom}`. There is no catch-all
# clause: an error whose reason is not listed below raises CaseClauseError.
case ExCodecs.encode(:zstd, my_data) do
{:ok, compressed} ->
# Assert the write succeeded; any return other than :ok raises MatchError.
:ok = write_to_disk(compressed)
{:ok, compressed}
{:error, %ExCodecs.Error{reason: :unsupported_codec}} ->
# Codec not available — fall back or fail gracefully
{:error, :codec_missing}
{:error, %ExCodecs.Error{reason: :invalid_data} = err} ->
# Data is not binary — log and reject
IO.puts("Invalid data: #{err.message}")
{:error, :bad_input}
{:error, %ExCodecs.Error{reason: :compression_failed} = err} ->
# Compression itself failed — possibly corrupted or too large
IO.puts("Compression failed: #{err.message}")
{:error, :compress_failed}
end
# Pattern 2: Validate before compress
defmodule SafeCompress do
  @moduledoc """
  Validation wrapper around `ExCodecs.encode/3`.

  `def` is only allowed inside a module, so the helper lives here and is
  called as `SafeCompress.safe_compress/3`.
  """

  @doc """
  Compresses `data` with `codec` after validating the inputs, then decodes the
  result to confirm it round-trips.

  Returns `{:ok, compressed}` on success, otherwise one of:

    * `{:error, :unsupported_codec}` - `ExCodecs.supports?/1` returned `false`
    * `{:error, :not_binary}` - `data` is not a binary
    * `{:error, :round_trip_mismatch}` - decoding gave back different data
    * `{:error, err}` - the error returned by encode or decode
  """
  def safe_compress(codec, data, opts \\ []) do
    # The validation steps are tagged so the `else` block can tell them apart;
    # each pattern on the left must carry the same tag as the tuple it tests.
    with {:check_codec, true} <- {:check_codec, ExCodecs.supports?(codec)},
         {:check_binary, true} <- {:check_binary, is_binary(data)},
         {:ok, compressed} <- ExCodecs.encode(codec, data, opts),
         {:ok, ^data} <- ExCodecs.decode(codec, compressed) do
      {:ok, compressed}
    else
      {:check_codec, false} -> {:error, :unsupported_codec}
      {:check_binary, false} -> {:error, :not_binary}
      # Decode succeeded but the pinned ^data did not match.
      {:ok, _different} -> {:error, :round_trip_mismatch}
      {:error, err} -> {:error, err}
    end
  end
end

{:ok, result} = SafeCompress.safe_compress(:zstd, "verify round-trip")
IO.puts("Safe compress OK: #{is_binary(result)}")
# Pattern 3: Fallback codec chain
defmodule FallbackCompress do
  @moduledoc """
  Compression with an ordered list of fallback codecs.

  `def` is only allowed inside a module, so the helper lives here and is
  called as `FallbackCompress.compress_with_fallback/2`.
  """

  @doc """
  Tries each codec in `codecs` in order and stops at the first that encodes
  `data` successfully.

  Returns `{:ok, {codec, compressed}}` for the first success,
  `{:error, :codec_failed}` when every codec returned an error, or
  `{:error, :no_codec_available}` when `codecs` is empty.
  """
  def compress_with_fallback(data, codecs \\ [:zstd, :lz4, :snappy]) do
    # The initial accumulator is only returned when there is nothing to try.
    Enum.reduce_while(codecs, {:error, :no_codec_available}, fn codec, _acc ->
      case ExCodecs.encode(codec, data) do
        {:ok, compressed} -> {:halt, {:ok, {codec, compressed}}}
        {:error, _} -> {:cont, {:error, :codec_failed}}
      end
    end)
  end
end

{:ok, {used_codec, compressed}} = FallbackCompress.compress_with_fallback("fallback test data")
IO.puts("Used codec: #{inspect(used_codec)}")
IO.puts("Compressed: #{byte_size(compressed)} bytes")
ETS Integration
defmodule CompressedETS do
  @moduledoc """
  ETS-backed storage with transparent compression.

  Stores compressed values in ETS for fast in-memory access
  with reduced memory footprint. Each row has the shape
  `{key, compressed, codec, original_byte_size}`.
  """

  @doc """
  Creates a public, named `:set` table called `name` and returns `name`.
  """
  def new(name \\ :compressed_store) do
    :ets.new(name, [:set, :public, :named_table])
    name
  end

  @doc """
  Compresses `value` with `codec` and `opts` and stores it under `key`.

  Returns `{:ok, compressed_byte_size}`, or the `{:error, err}` from
  `ExCodecs.encode/3`, in which case nothing is inserted.
  """
  def insert(table, key, value, codec \\ :zstd, opts \\ []) do
    # Propagate encode failures as tagged tuples, matching lookup/2, instead
    # of raising MatchError.
    with {:ok, compressed} <- ExCodecs.encode(codec, value, opts) do
      true = :ets.insert(table, {key, compressed, codec, byte_size(value)})
      {:ok, byte_size(compressed)}
    end
  end

  @doc """
  Returns `{:ok, value}` for `key`, `{:error, :not_found}` when the key is
  absent, or whatever error `ExCodecs.decode/2` returns.
  """
  def lookup(table, key) do
    case :ets.lookup(table, key) do
      [{^key, compressed, codec, _orig_size}] ->
        # The codec stored in the row is the one used to decode it.
        ExCodecs.decode(codec, compressed)

      [] ->
        {:error, :not_found}
    end
  end

  @doc """
  Returns a map with `:entries`, `:original_bytes`, `:compressed_bytes` and
  `:savings_pct` for the whole table.
  """
  def stats(table) do
    # A single fold gathers all three totals without first copying the table
    # into a list.
    {entries, total_orig, total_comp} =
      :ets.foldl(
        fn {_key, comp, _codec, orig}, {count, orig_acc, comp_acc} ->
          {count + 1, orig_acc + orig, comp_acc + byte_size(comp)}
        end,
        {0, 0, 0},
        table
      )

    %{
      entries: entries,
      original_bytes: total_orig,
      compressed_bytes: total_comp,
      savings_pct: savings_pct(total_orig, total_comp)
    }
  end

  # With nothing stored there is nothing saved; avoids reporting 100% for an
  # empty table and dividing by zero.
  defp savings_pct(0, _total_comp), do: 0.0

  defp savings_pct(total_orig, total_comp) do
    Float.round(100 * (1 - total_comp / total_orig), 1)
  end
end
table = CompressedETS.new()

# Insert 100 rows built from ten distinct, highly repetitive values.
Enum.each(1..100, fn i ->
  data = String.duplicate("item-#{rem(i, 10)} ", 500)
  CompressedETS.insert(table, "key_#{i}", data)
end)

stats = CompressedETS.stats(table)

IO.puts("ETS Compressed Store:")
IO.puts(" Entries: #{stats.entries}")
IO.puts(" Original: #{stats.original_bytes} bytes")
IO.puts(" Compressed: #{stats.compressed_bytes} bytes")
IO.puts(" Savings: #{stats.savings_pct}%")

# Read one row back to confirm it decodes.
{:ok, val} = CompressedETS.lookup(table, "key_1")
IO.puts("\nRetrieved #{byte_size(val)} bytes successfully")

# Drop the table once the demo is finished.
:ets.delete(table)
File Storage Integration
defmodule CompressedFileIO do
  @moduledoc """
  File I/O with transparent compression.

  Writes compressed data to disk with a codec header for
  self-describing files.

  ## File layout (all integers unsigned, big-endian)

      "EXCD"                 4-byte magic
      codec name             8 bytes, space-padded on the right
      original size          64 bits
      compressed size        64 bits
      compressed payload     exactly `compressed size` bytes
  """

  @magic "EXCD"
  @codec_field_bytes 8

  @doc """
  Compresses `data` with `codec` and `opts` and writes it to `path`.

  Returns `:ok`, the `{:error, err}` from `ExCodecs.encode/3`, the
  `{:error, posix}` from `File.write/2`, or `{:error, :codec_name_too_long}`
  when the codec name does not fit the 8-byte header field.
  """
  def write(path, data, codec \\ :zstd, opts \\ []) do
    with {:ok, codec_field} <- codec_to_field(codec),
         {:ok, compressed} <- ExCodecs.encode(codec, data, opts) do
      header = <<
        @magic::binary,
        codec_field::binary,
        byte_size(data)::unsigned-big-64,
        byte_size(compressed)::unsigned-big-64
      >>

      # File.write/2 accepts iodata, so header and payload need not be joined.
      File.write(path, [header, compressed])
    end
  end

  @doc """
  Reads a file produced by `write/4` and returns `{:ok, data}`.

  Returns `{:error, :invalid_format}` when the header is malformed, the
  payload length disagrees with the header, or the codec name is not a known
  atom; `{:error, :size_mismatch}` when the decoded data does not have the
  recorded original size; the `{:error, posix}` from `File.read/1`; or the
  error from `ExCodecs.decode/2`.
  """
  def read(path) do
    case File.read(path) do
      {:ok,
       <<@magic::binary, codec_field::binary-size(@codec_field_bytes),
         orig_size::unsigned-big-64, comp_size::unsigned-big-64,
         compressed::binary-size(comp_size)>>} ->
        decode_payload(codec_field, compressed, orig_size)

      {:ok, _} ->
        {:error, :invalid_format}

      {:error, reason} ->
        {:error, reason}
    end
  end

  # Decodes the payload and checks it against the size recorded in the header.
  defp decode_payload(codec_field, compressed, orig_size) do
    with {:ok, codec} <- field_to_codec(codec_field),
         {:ok, data} <- ExCodecs.decode(codec, compressed) do
      if byte_size(data) == orig_size do
        {:ok, data}
      else
        {:error, :size_mismatch}
      end
    end
  end

  # Pads by bytes rather than graphemes so the field is always exactly
  # @codec_field_bytes long; longer names are rejected instead of silently
  # shifting every field that follows.
  defp codec_to_field(codec) when is_atom(codec) do
    name = Atom.to_string(codec)

    case @codec_field_bytes - byte_size(name) do
      padding when padding >= 0 -> {:ok, name <> String.duplicate(" ", padding)}
      _ -> {:error, :codec_name_too_long}
    end
  end

  # The field comes from file contents, so it must never create new atoms
  # (atoms are not garbage collected). Only atoms that already exist are
  # accepted.
  defp field_to_codec(field) do
    # NOTE(review): loading ExCodecs first so its codec atoms exist before the
    # lookup — assumes the codec names appear in that module; confirm.
    _ = Code.ensure_loaded(ExCodecs)
    {:ok, field |> String.trim_trailing() |> String.to_existing_atom()}
  rescue
    ArgumentError -> {:error, :invalid_format}
  end
end
# Demo: write a compressed file to the temp directory, then read it back.
test_data = String.duplicate("Compressed file storage example. ", 2000)
path = Path.join(System.tmp_dir!(), "ex_codecs_demo.dat")

:ok = CompressedFileIO.write(path, test_data, :zstd, level: 9)
{:ok, recovered} = CompressedFileIO.read(path)

written_bytes = byte_size(test_data)
IO.puts("Wrote and read #{written_bytes} bytes")
IO.puts("Round-trip OK: #{recovered == test_data}")

file_bytes = File.stat!(path).size
IO.puts("File size: #{file_bytes} bytes")

# Remove the demo file.
File.rm(path)
Key Takeaways
- Wrap encode/decode in your own modules to centralize codec choice and error handling
- Always handle errors — codecs can fail on invalid input, memory pressure, etc.
- Store the codec name alongside compressed data so you know how to decompress
- Use ETS for compressed in-memory caches — significant memory savings
- Chunk large files — enables random access, parallel processing, and bounded memory
- Try multiple codecs for mixed workloads — different data compresses best with different algorithms
Next: Zarr-Style Workloads