Powered by AppSignal & Oban Pro

DiffSingerElixirPoC

simple_run.livemd

DiffSingerElixirPoC

# System.shell("mix hex.clean --all")
# System.shell("mix hex.config http_timeout 120")

# If the download fails, you can comment out all the git repositories first,
# and then install them incrementally one by one
# after the hex packages are installed.
# Orchid is the workflow engine; DiffSinger loads the voicebank metadata;
# Kino/VegaLite/Benchee power the Livebook widgets, charts and benchmarks.
Mix.install([
  {:orchid, "~> 0.5"},
  {:diff_singer, git: "https://github.com/GES233/DiffSinger.git"},
  {:orchid_stratum, "~> 0.1"},
  {:orchid_intervention, "~> 0.1"},
  {:kino, "~> 0.19.0"},
  {:vega_lite, "~> 0.1.8"},
  {:kino_vega_lite, "~> 0.1.11"},
  {:kino_benchee, "~> 0.1.0"}
])

# Log any exception raised inside an Orchid step via the handler that ships
# with the Orchid runner.
:telemetry.attach(
  "orchid-step-exception-logger",
  [:orchid, :step, :exception],
  &Orchid.Runner.Hooks.Telemetry.error_handler/4,
  %{}
)

Fetch Model’s Metadata

Obtain the metadata and basic information of the sound library’s models as the skeleton for subsequent dependency graph construction.

Here, we take Qixuan v2.5.0 (for OpenUTAU), maintained by OpenVPI, as an example.

# If you have your local version
# Change the path to yours
model_root_path = "E:/ProgramAssets/OpenUTAUSingers/Qixuan_v2.5.0_DiffSinger_OpenUtau"
# Reads the voicebank's configuration/metadata; it is used below both to load
# the ONNX models and to build the recipe's dictionaries.
model_config = DiffSinger.VoiceBank.Config.fetch_overview(model_root_path)

:ok

Prelude Steps

This is a node that converts lyrics/MIDI into corresponding phoneme IDs and pitches (MIDI) based on a phoneme dictionary.

A primary reason for needing MIDI is to use its duration information as a reference for subsequent phoneme duration prediction.

A rasterized node will be implemented later to accommodate the processing of notes and phonemes.

defmodule CommonEncoder do
  @moduledoc """
  Shared first-stage encoder: turns a word list into five parallel sequences —
  language ids, phoneme token ids, per-word phoneme counts, word durations,
  and a per-phoneme copy of the word's MIDI note.

  Each word is `{[{lang, phoneme}, ...], duration, midi_note}`.
  """

  def run_partial(%Orchid.Param{payload: words}, opts) do
    lang_dict = Keyword.fetch!(opts, :lang_dict)
    phoneme_dict = Keyword.fetch!(opts, :phoneme_dict)

    # Accumulate by prepending (O(1)) instead of `acc ++ list` (O(n) per word,
    # O(n^2) overall), then restore order with a single reverse per sequence.
    {langs, toks, w_div, w_dur, midis} =
      Enum.reduce(
        words,
        {[], [], [], [], []},
        fn {phonemes, duration, midi_note}, {acc_l, acc_t, acc_wdiv, acc_wdur, acc_midi} ->
          ph_count = length(phonemes)

          {curr_langs, curr_toks} =
            phonemes
            |> Enum.map(fn {lang, phone} -> {lang_dict[lang], phoneme_dict[phone]} end)
            |> Enum.unzip()

          {
            Enum.reverse(curr_langs, acc_l),
            Enum.reverse(curr_toks, acc_t),
            [ph_count | acc_wdiv],
            [duration | acc_wdur],
            # Every phoneme of the word carries the same MIDI note.
            List.duplicate(midi_note, ph_count) ++ acc_midi
          }
        end
      )

    {Enum.reverse(langs), Enum.reverse(toks), Enum.reverse(w_div), Enum.reverse(w_dur),
     Enum.reverse(midis)}
  end
end
defmodule DurationPredictEncoder do
  use Orchid.Step

  # Encodes the word list into the five s64 tensors consumed by the
  # duration-prediction stage.
  def run(param, opts) do
    {lang_ids, token_ids, divisions, durations, midi_notes} =
      CommonEncoder.run_partial(param, opts)

    params =
      [
        lang_map: lang_ids,
        phoneme_map: token_ids,
        word_division: divisions,
        word_duration: durations,
        ph_midi: midi_notes
      ]
      |> Enum.map(fn {name, values} ->
        Orchid.Param.new(name, :payload, Nx.tensor([values], type: :s64))
      end)

    {:ok, params}
  end
end
defmodule PitchPredictEncoder do
  use Orchid.Step

  # Builds the four s64 input tensors for pitch prediction; the per-word
  # phoneme counts from the common encoder are not needed here.
  def run(%Orchid.Param{} = param, opts) do
    {lang_ids, token_ids, _divisions, durations, midi_notes} =
      CommonEncoder.run_partial(param, opts)

    as_s64 = fn values -> Nx.tensor([values], type: :s64) end

    {:ok,
     [
       Orchid.Param.new(:lang_map, :payload, as_s64.(lang_ids)),
       Orchid.Param.new(:phoneme_map, :payload, as_s64.(token_ids)),
       Orchid.Param.new(:word_duration, :payload, as_s64.(durations)),
       Orchid.Param.new(:ph_midi, :payload, as_s64.(midi_notes))
     ]}
  end
end
# Used for Variance model
defmodule VarianceEncoder do
  use Orchid.Step

  # Same encoding as PitchPredictEncoder: language/token/duration/midi
  # tensors for the variance model (word divisions are dropped).
  def run(param, opts) do
    {lang_ids, token_ids, _divisions, durations, midi_notes} =
      CommonEncoder.run_partial(param, opts)

    to_param = fn name, values ->
      Orchid.Param.new(name, :payload, Nx.tensor([values], type: :s64))
    end

    {:ok,
     [
       to_param.(:lang_map, lang_ids),
       to_param.(:phoneme_map, token_ids),
       to_param.(:word_duration, durations),
       to_param.(:ph_midi, midi_notes)
     ]}
  end
end

Variance Model

Predict Phoneme Duration:

defmodule PredictDuration do
  @moduledoc """
  Two-stage phoneme-duration inference: a linguistic encoder followed by the
  duration predictor, both served through `OrchidSymbiont`.
  """
  @behaviour OrchidSymbiont.Step

  def required, do: [:duration_linguistic, :duration_predict]

  # Params arrive in the order declared in the recipe:
  # languages / phonemes / word divisions / word durations / per-phoneme MIDI.
  def run_with_model(
    [
      %Orchid.Param{payload: lang_map},
      %Orchid.Param{payload: phoneme_map},
      %Orchid.Param{payload: word_division},
      %Orchid.Param{payload: word_duration},
      %Orchid.Param{payload: phoneme_midi}
    ],
    handlers,
    _opts
  ) do
    duration_linguistic = handlers.duration_linguistic
    duration_predict = handlers.duration_predict

    inputs = {phoneme_map, lang_map, word_division, word_duration}

    # :infinity — model inference can take longer than a default call timeout.
    {:ok, {encoder_out_tensor, mask_tensor}} = OrchidSymbiont.call(duration_linguistic, {:infer, inputs}, :infinity)

    {:ok, {result}} = OrchidSymbiont.call(duration_predict, {:infer, {encoder_out_tensor, mask_tensor, phoneme_midi}}, :infinity)

    # NOTE(review): the param kind here is :encoder_out while every other step
    # in this notebook uses :payload, and downstream steps destructure
    # %Orchid.Param{payload: ...} — confirm :encoder_out is intentional.
    {:ok, [
      Orchid.Param.new(:ph_dur_pred, :encoder_out, result)
    ]}
  end
end

Predict Pitch:

defmodule PredictPitch do
  @moduledoc """
  Two-stage pitch inference: a linguistic encoder followed by the pitch
  predictor, both served through `OrchidSymbiont`.
  """
  @behaviour OrchidSymbiont.Step

  def required, do: [:pitch_linguistic, :pitch_predict]

  # Params arrive in recipe order:
  # languages / phonemes / predicted phoneme durations / note MIDI.
  def run_with_model(
    [
      %Orchid.Param{payload: languages},
      %Orchid.Param{payload: phonemes},
      %Orchid.Param{payload: phoneme_duration},
      %Orchid.Param{payload: note_midi}
    ], handlers, opts) do

    # Predicted durations are floats; round to integer frame counts.
    ph_dur =
      phoneme_duration
      |> Nx.backend_transfer(Nx.BinaryBackend)
      |> Nx.round()
      |> Nx.as_type(:s64)

    {:ok, {encoder_out, _mask}} =
      OrchidSymbiont.call(
        handlers.pitch_linguistic,
        {:infer, {phonemes, languages, ph_dur}},
        :infinity
      )

    # Total frame count sizes all frame-level control tensors below.
    total_frames =
      ph_dur
      |> Nx.sum()
      |> Nx.to_number()

    # Seeded from the wall clock, so each run draws different noise
    # (intentionally non-deterministic).
    key = Nx.Random.key(System.system_time())

    {pitch_noise, _key} =
      Nx.Random.normal(
        key,
        0.0,
        1.0,
        shape: {1, total_frames},
        type: :f32
      )

    # Neutral expressiveness (1.0) on every frame.
    expr =
      Nx.broadcast(Nx.tensor(1.0, type: :f32), {1, total_frames})

    # retake = 1 everywhere: re-predict the whole pitch curve.
    retake =
      Nx.broadcast(Nx.tensor(1, type: :u8), {1, total_frames})

    # MIDI value 0 marks rests (e.g. the "AP" breath words in the demo input).
    note_rest =
      note_midi
      |> Nx.equal(0)
      |> Nx.as_type(:u8)

    note_midi = note_midi |> Nx.as_type(:f32)

    # In this simplified pipeline, notes and phonemes share durations.
    note_dur = ph_dur

    # Sampling step count, overridable via opts (default 20).
    steps =
      opts
      |> Keyword.get(:steps, 20)
      |> Nx.tensor(type: :s64)

    # The tuple order is the model's positional input contract — do not reorder.
    {:ok, {pitch_pred}} =
      OrchidSymbiont.call(
        handlers.pitch_predict,
        {:infer,
          {
            encoder_out,
            ph_dur,
            note_midi,
            note_rest,
            note_dur,
            pitch_noise,
            expr,
            retake,
            steps
          }},
        :infinity
      )

    # NOTE(review): returns a bare param while PredictDuration returns a list —
    # confirm Orchid normalizes both shapes.
    {:ok, Orchid.Param.new(:pitch_pred, :payload, pitch_pred)}
  end
end
defmodule MIDIToPitch do
  use Orchid.Step

  # Converts predicted MIDI note values to F0 in Hz via the equal-temperament
  # formula 440 * 2^((midi - 69) / 12), zeroing frames whose MIDI is negative.
  def run(%Orchid.Param{payload: midi_pred}, _opts) do
    midi = Nx.backend_transfer(midi_pred, Nx.BinaryBackend)

    hertz =
      midi
      |> Nx.add(-69.0)
      |> Nx.divide(12.0)
      |> then(&Nx.pow(2.0, &1))
      |> Nx.multiply(440)

    f0 = Nx.select(Nx.less(midi, 0.0), Nx.tensor(0.0), hertz)

    {:ok, Orchid.Param.new(:f0, :tensor, f0)}
  end
end

Other Variance Params:

defmodule VarianceModel do
  @moduledoc """
  Predicts breathiness and voicing curves from phonemes, durations and pitch,
  via a linguistic encoder plus the variance model.
  """
  @behaviour OrchidSymbiont.Step

  def required, do: [:variance_linguistic, :variance]

  def run_with_model(
    [
      %Orchid.Param{payload: languages},
      %Orchid.Param{payload: phonemes},
      %Orchid.Param{payload: ph_dur},
      %Orchid.Param{payload: pitch}
    ],
    handlers,
    opts
  ) do

    # Predicted durations are floats; round to integer frame counts.
    ph_dur =
      ph_dur
      |> Nx.backend_transfer(Nx.BinaryBackend)
      |> Nx.round()
      |> Nx.as_type(:s64)

    # Fix: wait with :infinity like every other {:infer, _} call in this
    # notebook — the 2-arity form presumably uses a finite default timeout,
    # which long inferences could exceed.
    {:ok, {encoder_out, _mask}} =
      OrchidSymbiont.call(
        handlers.variance_linguistic,
        {:infer, {phonemes, languages, ph_dur}},
        :infinity
      )

    total_frames =
      ph_dur
      |> Nx.sum()
      |> Nx.to_number()

    # Wall-clock seed: each run samples fresh noise (non-deterministic by design).
    key = Nx.Random.key(System.system_time())

    {breath_noise, key} =
      Nx.Random.normal(key, 0.0, 1.0,
        shape: {1, total_frames},
        type: :f32
      )

    {voice_noise, _key} =
      Nx.Random.normal(key, 0.0, 1.0,
        shape: {1, total_frames},
        type: :f32
      )

    # retake = 1 on both channels: re-predict breathiness and voicing everywhere.
    retake =
      Nx.broadcast(
        Nx.tensor([1, 1], type: :u8),
        {1, total_frames, 2}
      )

    # Sampling step count, overridable via opts (default 20).
    steps =
      opts
      |> Keyword.get(:steps, 20)
      |> Nx.tensor(type: :s64)

    # The tuple order is the model's positional input contract — do not reorder.
    {:ok, {breath_pred, voice_pred}} =
      OrchidSymbiont.call(
        handlers.variance,
        {:infer,
          {
            encoder_out,
            ph_dur,
            pitch,
            breath_noise,
            voice_noise,
            retake,
            steps
          }},
        :infinity
      )

    {:ok, [
      Orchid.Param.new(:breathiness, :payload, breath_pred),
      Orchid.Param.new(:voicing, :payload, voice_pred)
    ]}
  end
end

Acoustic Model and Vocoder

Acoustic Model:

defmodule Acoustic do
  @moduledoc """
  Runs the acoustic model: phonemes, durations, pitch and variance curves in;
  mel spectrogram out.
  """
  @behaviour OrchidSymbiont.Step

  def required, do: [:acoustic]

  def run_with_model(
    [
      %Orchid.Param{payload: languages},
      %Orchid.Param{payload: phonemes},
      %Orchid.Param{payload: phoneme_duration},
      %Orchid.Param{payload: pitch},
      %Orchid.Param{payload: breathiness},
      %Orchid.Param{payload: voicing}
    ],
    handlers,
    opts
  ) do

    # Predicted durations are floats; round to integer frame counts.
    durations =
      phoneme_duration
      |> Nx.backend_transfer(Nx.BinaryBackend)
      |> Nx.round()
      |> Nx.as_type(:s64)

    # Frame-level controls must match the pitch curve's frame count.
    frames = Nx.axis_size(pitch, 1)

    # Neutral gender shift (0.0) and nominal velocity (1.0) on every frame.
    gender =
      Nx.broadcast(Nx.tensor(0.0, type: :f32), {1, frames})

    velocity =
      Nx.broadcast(Nx.tensor(1.0, type: :f32), {1, frames})

    # Sampling step count and depth, overridable via opts.
    steps =
      opts
      |> Keyword.get(:steps, 20)
      |> Nx.tensor(type: :s64)

    depth =
      opts
      |> Keyword.get(:depth, 1.0)
      |> Nx.tensor(type: :f32)

    # The tuple order is the model's positional input contract — do not reorder.
    {:ok, {mel}} =
      OrchidSymbiont.call(
        handlers.acoustic,
        {:infer,
          {
            phonemes,
            languages,
            durations,
            pitch,
            breathiness,
            voicing,
            gender,
            velocity,
            depth,
            steps
          }},
        :infinity
      )

    {:ok, Orchid.Param.new(:mel, :payload, mel)}
  end
end

Vocoder:

defmodule NSFHifiGAN_Vocoder do
  @moduledoc """
  Vocoder stage: turns a mel spectrogram plus an F0 curve into a waveform
  tensor via the registered NSF-HiFiGAN model.
  """
  @behaviour OrchidSymbiont.Step

  def required, do: [:vocoder]

  def run_with_model(
    [%Orchid.Param{payload: mel}, %Orchid.Param{payload: f0}],
    handlers,
    _opts
  ) do

    # Fix: wait with :infinity like every other {:infer, _} call in this
    # notebook — vocoding can exceed a finite default call timeout.
    {:ok, {audio}} = OrchidSymbiont.call(handlers.vocoder, {:infer, {mel, f0}}, :infinity)

    {:ok, Orchid.Param.new(:audio, :payload, audio)}
  end
end

Post-process (tensor to audio)

defmodule TensorToWave do
  @moduledoc """
  Converts a float waveform tensor into a mono 16-bit PCM WAV binary.

  Options:
    * `:sample_rate` — output sample rate in Hz (default `44_100`)
    * `:output_path` — file the WAV is also written to; pass `nil` to skip
      the write. Defaults to `"E:/final.wav"` for backward compatibility
      (the path was previously hard-coded).
  """
  use Orchid.Step

  def run(%Orchid.Param{payload: wave_tensor}, opts) do
    # Scale [-1.0, 1.0] floats to the s16 range and serialize to raw PCM.
    pcm_data =
      wave_tensor
      |> Nx.flatten()
      |> Nx.backend_transfer(Nx.BinaryBackend)
      |> Nx.multiply(32767.0)
      |> Nx.clip(-32768.0, 32767.0)
      |> Nx.as_type(:s16)
      |> Nx.to_binary()

    sample_rate = Keyword.get(opts, :sample_rate, 44_100)
    wav = build_wav(pcm_data, sample_rate)

    # Generalized: destination is configurable; nil disables the side effect.
    case Keyword.get(opts, :output_path, "E:/final.wav") do
      nil -> :ok
      path -> File.write!(path, wav)
    end

    {:ok, Orchid.Param.new(:final, :audio, wav)}
  end

  # Prepends the canonical 44-byte RIFF/WAVE header (PCM, mono, 16-bit).
  defp build_wav(pcm_data, sample_rate) do
    byte_rate = sample_rate * 1 * 2  # sample_rate * channels * bytes_per_sample (Int16=2)
    data_size = byte_size(pcm_data)
    file_size = 36 + data_size

    header = <<
      "RIFF", file_size::little-integer-size(32), "WAVE",
      "fmt ", 16::little-integer-size(32),       # Subchunk1Size
      1::little-integer-size(16),                # AudioFormat (1 = PCM)
      1::little-integer-size(16),                # NumChannels (1 = Mono)
      sample_rate::little-integer-size(32),      # SampleRate
      byte_rate::little-integer-size(32),        # ByteRate
      2::little-integer-size(16),                # BlockAlign (channels * 2)
      16::little-integer-size(16),               # BitsPerSample (16 bits)
      "data", data_size::little-integer-size(32) # Subchunk2Size
    >>

    header <> pcm_data
  end
end

Build Pipeline and Prepare to Demo

defmodule QixuanPipeline do
  @moduledoc """
  Wires the Qixuan voicebank's models into an Orchid recipe:
  duration -> pitch -> variance -> acoustic -> vocoder -> WAV.
  """
  require Logger

  # Registers every model file with OrchidSymbiont under a well-known name;
  # these names match the `required/0` lists of the step modules above.
  def load_models(model_root_path, model_config) do
    Logger.info("Loading DiffSinger models...")

    models =[
      {:duration_linguistic, model_config.predict_map.maybe_duration.linguistic.path},
      {:duration_predict,    model_config.predict_map.maybe_duration.duration.path},
      {:pitch_linguistic,    model_config.predict_map.maybe_pitch.linguistic.path},
      {:pitch_predict,       model_config.predict_map.maybe_pitch.predict.path},
      {:variance_linguistic, model_config.variance.linguistic.path},
      {:variance,            model_config.variance.variance.path},
      {:acoustic,            model_config.acoustic.infer.path},
      {:vocoder,             model_config.vocoder.path}
    ]

    # rel_path is a list of path segments relative to the voicebank root.
    for {name, rel_path} <- models do
      path = Path.join([model_root_path] ++ rel_path)
      :ok = OrchidSymbiont.register(name, {Orchid.Symbiont.OrtexRunner, [name: name, path: path]})
    end

    Logger.info("All models loaded successfully.")
    :ok
  end

  # Builds the step graph. Each entry is {impl, in_keys, out_keys, opts};
  # shared key names (e.g. :phoneme_duration_predict) link one step's outputs
  # to later steps' inputs.
  def build_recipe(model_config) do
    # Hook stack that injects the registered model handlers into each
    # OrchidSymbiont step.
    injector = [extra_hooks_stack: [OrchidSymbiont.Hooks.Injector]]

    # Each sub-model ships its own phoneme/language dictionaries.
    dur_dict   = model_config.predict_map.maybe_duration.phonemes
    pitch_dict = model_config.predict_map.maybe_pitch.phonemes
    var_dict   = model_config.variance.phonemes
    sample_rate = model_config.vocoder.maybe_config["sample_rate"]

    duration_steps =[
      {DurationPredictEncoder, :words,[:duration_lang, :duration_phoneme, :word_division, :word_duration, :duration_ph_midi],[lang_dict: dur_dict.maybe_lang_dict, phoneme_dict: dur_dict.phoneme_dict]},
      {PredictDuration,[:duration_lang, :duration_phoneme, :word_division, :word_duration, :duration_ph_midi], 
        :phoneme_duration_predict, injector}
    ]

    pitch_steps = [
      {PitchPredictEncoder, :words,[:pitch_lang, :pitch_phoneme, :word_duration_from_pitch, :pitch_ph_midi],[lang_dict: pitch_dict.maybe_lang_dict, phoneme_dict: pitch_dict.phoneme_dict]},
      {PredictPitch,[:pitch_lang, :pitch_phoneme, :phoneme_duration_predict, :pitch_ph_midi], 
        :pitch_pred_midi, injector},
      {MIDIToPitch, :pitch_pred_midi, :pitch_pred}
    ]

    variance_steps = [
      {VarianceEncoder, :words,[:variance_lang, :variance_phoneme, :word_duration_from_variance, :variance_ph_midi],[lang_dict: var_dict.maybe_lang_dict, phoneme_dict: var_dict.phoneme_dict]},
      {VarianceModel,[:variance_lang, :variance_phoneme, :phoneme_duration_predict, :pitch_pred_midi],[:breathiness_pred, :voice_pred], injector}
    ]

    acoustic_step =[
      {Acoustic,[:variance_lang, :variance_phoneme, :phoneme_duration_predict, :pitch_pred, :breathiness_pred, :voice_pred], 
        :mel, injector}
    ]

    vocoder_step = [
      {NSFHifiGAN_Vocoder,[:mel, :pitch_pred], :wave_tensor, injector},
      {TensorToWave, :wave_tensor, :audio,[sample_rate: sample_rate]}
    ]

    all_steps = duration_steps ++ pitch_steps ++ variance_steps ++ acoustic_step ++ vocoder_step

    Orchid.Recipe.new(all_steps)
  end
end

Add a tracker

defmodule Orchid.Livebook.GanttTracker do
  @moduledoc """
  Collects `[:orchid, :step, *]` telemetry events into an Agent so the
  notebook can render a Gantt-style execution timeline.
  """
  use Agent

  def start_link(_) do
    Agent.start_link(fn -> [] end, name: __MODULE__)
  end

  @doc "Telemetry handler: records one `{run_id, status, time, meta}` event."
  def handle_event([:orchid, :step, status], measurements, meta, _config) do
    # Fix: :"$callers" is nil when the handler runs outside a Task;
    # List.first/1 would raise on nil, so default to an empty list.
    run_id = List.first(Process.get(:"$callers") || []) || self()
    run_id_str = inspect(run_id)

    # :start carries an absolute system_time; the closing events carry a duration.
    time_val = if status == :start, do: measurements.system_time, else: measurements.duration

    Agent.update(__MODULE__, fn state -> [{run_id_str, status, time_val, meta} | state] end)
  end

  @doc """
  Pairs each :start event with its matching :done/:exception/:special event
  (keyed by run id + step identity) and returns span maps with millisecond
  start/end/duration values.
  """
  def get_spans() do
    # Fix: the capture operator was HTML-escaped ("&amp;") in the original.
    events = Agent.get(__MODULE__, & &1) |> Enum.reverse()

    {_pending, spans} =
      Enum.reduce(events, {%{}, []}, fn
        {run_id, :start, sys_time, meta}, {pending, spans} ->
          start_ms = System.convert_time_unit(sys_time, :native, :microsecond) / 1000.0
          sig = {run_id, meta.impl, meta.in_keys, meta.out_keys}
          {Map.put(pending, sig, start_ms), spans}

        {run_id, status, duration, meta}, {pending, spans}
        when status in [:done, :exception, :special] ->
          duration_ms = System.convert_time_unit(duration, :native, :microsecond) / 1000.0
          sig = {run_id, meta.impl, meta.in_keys, meta.out_keys}

          case Map.pop(pending, sig) do
            {start_ms, new_pending} when not is_nil(start_ms) ->
              impl_name =
                if is_function(meta.impl),
                  do: "Anonymous",
                  else: inspect(meta.impl) |> String.split(".") |> List.last()

              out_keys = inspect(meta.out_keys) |> String.slice(0..14) |> Kernel.<>("...")

              span = %{
                run_id: run_id,
                step: "#{impl_name}(#{out_keys})",
                start: start_ms,
                end: start_ms + duration_ms,
                # Pad very short steps so they stay visible on the chart.
                visual_end: start_ms + max(duration_ms, 10.0),
                duration: duration_ms,
                status: status
              }

              {new_pending, [span | spans]}

            # No matching :start recorded — drop the closing event.
            {nil, ^pending} ->
              {pending, spans}
          end
      end)

    spans
  end

  def clear(), do: Agent.update(__MODULE__, fn _ -> [] end)
end
Kino.start_child({Orchid.Livebook.GanttTracker, []})

# One telemetry attachment per step-lifecycle event, all funneled into the
# tracker.
events = [
  {[:orchid, :step, :start], "start"},
  {[:orchid, :step, :done], "done"},
  {[:orchid, :step, :exception], "exception"},
  {[:orchid, :step, :special], "special"}
]

for {event, suffix} <- events do
  # Fix: the capture operator was HTML-escaped ("&amp;") in the original.
  :telemetry.attach(
    "orchid-gantt-#{suffix}",
    event,
    &Orchid.Livebook.GanttTracker.handle_event/4,
    nil
  )
end
defmodule MermaidRenderer do
  @moduledoc """
  Renders an `Orchid.Recipe` as a Mermaid `graph TD` diagram in Livebook:
  one node per step, dotted edges from input keys, thick edges to output keys.
  """

  def render(%Orchid.Recipe{} = recipe) do
    recipe.steps
    |> generate_mermaid()
    |> Kino.Mermaid.new()
  end

  defp generate_mermaid(steps) do
    header = "graph TD\n"

    body =
      steps
      |> Enum.with_index()
      |> Enum.flat_map(fn {step, idx} ->
        {impl, in_keys, out_keys, _opts} = Orchid.Step.ensure_full_step(step)

        step_id = "step_#{idx}"
        step_name = format_impl(impl)
        step_node = ~s|    #{step_id}["⚙️ #{step_name}"]:::stepClass|

        in_edges =
          in_keys
          |> normalize_keys()
          |> Enum.map(fn k -> ~s|    #{k}(["#{k}"]) -.-> #{step_id}| end)

        out_edges =
          out_keys
          |> normalize_keys()
          |> Enum.map(fn k -> ~s|    #{step_id} ==> #{k}(["#{k}"])| end)

        [step_node] ++ in_edges ++ out_edges
      end)
      # Key nodes repeat across steps; uniq keeps the diagram minimal.
      |> Enum.uniq()
      |> Enum.join("\n")

    styles = """

        classDef stepClass fill:#2eb82e,stroke:#fff,stroke-width:2px,color:#fff;
    """

    header <> body <> styles
  end

  defp normalize_keys(keys) do
    keys
    |> Orchid.Step.ID.normalize_keys_to_set()
    |> MapSet.to_list()
    # Fix: the capture operator was HTML-escaped ("&amp;") in the original.
    |> Enum.map(&to_string/1)
  end

  defp format_impl(impl) when is_function(impl), do: "Anonymous Function"

  defp format_impl(impl) do
    inspect(impl) |> String.replace_prefix("Elixir.", "")
  end
end

Pitch Intervention

defmodule PitchOffset do
  @behaviour OrchidIntervention.Operate

  def short_circuit?, do: false
  def data_enable, do: {true, true}

  # Adds the intervention tensor onto the pipeline's inner payload; both are
  # moved to the binary backend before the addition.
  def merge(inner_payload, intervention_payload) do
    shifted =
      Nx.add(
        Nx.backend_transfer(inner_payload, Nx.BinaryBackend),
        Nx.backend_transfer(intervention_payload, Nx.BinaryBackend)
      )

    {:ok, shifted}
  end
end

Run Workflow

QixuanPipeline.load_models(model_root_path, model_config)

recipe = QixuanPipeline.build_recipe(model_config)

# Each word is {[{lang, phoneme}, ...], duration, midi_note}; "AP" (MIDI 0)
# is a breath. (Fix: removed the trailing comma after the param — Elixir
# lists do not allow trailing commas.)
inputs = [
  %Orchid.Param{name: :words, payload: [
    {[{"zh", "AP"}], 10, 0},
    {[{"zh", "zh/l"}, {"zh", "zh/iang"}], 40, 60},
    {[{"zh", "zh/zh"}, {"zh", "zh/i"}], 40, 62},
    {[{"zh", "zh/l"}, {"zh", "zh/ao"}], 40, 64},
    {[{"zh", "zh/h"}, {"zh", "zh/u"}], 40, 60},

    {[{"zh", "AP"}], 1, 0},

    {[{"zh", "zh/l"}, {"zh", "zh/iang"}], 40, 60},
    {[{"zh", "zh/zh"}, {"zh", "zh/i"}], 40, 62},
    {[{"zh", "zh/l"}, {"zh", "zh/ao"}], 40, 64},
    {[{"zh", "zh/h"}, {"zh", "zh/u"}], 40, 60}
  ]}
]

# Shift the predicted pitch curve up one octave (+12 MIDI) via the
# intervention hook on the :pitch_pred_midi key.
interventions = %{
  :pitch_pred_midi => {PitchOffset, Orchid.Param.new(:pitch_pred_midi, :payload, Nx.tensor(12.0, type: :f32))}
}

Orchid.Livebook.GanttTracker.clear()

# Fix: the capture operator was HTML-escaped ("&amp;") in the original.
{elapse, {:ok, results}} = :timer.tc(
  &Orchid.run/3,
  [recipe, inputs, [
    global_hooks_stack: [Orchid.Hook.ApplyInterventions],
    baggage: %{interventions: interventions}
  ]],
  :microsecond
)

require Logger
Logger.info "Used #{elapse / 1000}ms."

Show Graph

MermaidRenderer.render recipe

Execution Timeline:

alias VegaLite, as: Vl

spans = Orchid.Livebook.GanttTracker.get_spans()
# Fix: the capture operator was HTML-escaped ("&amp;") in the original.
min_start = spans |> Enum.map(& &1.start) |> Enum.min(fn -> 0 end)

# Normalize all spans to start at t=0 for the chart.
chart_data =
  Enum.map(spans, fn span ->
    %{
      "step" => span.step,
      "start" => span.start - min_start,
      "end" => span.visual_end - min_start,
      "duration" => Float.round(span.duration, 2),
      "status" => Atom.to_string(span.status)
    }
  end)

Vl.new(width: 600, height: 250, title: "Orchid Workflow Execution Timeline")
|> Vl.data_from_values(chart_data)
|> Vl.mark(:bar, corner_radius: 4, height: 20)
|> Vl.encode_field(:y, "step", type: :nominal, title: "Steps", sort: [field: "start", op: "min"], axis: [labelLimit: 400])
|> Vl.encode_field(:x, "start", type: :quantitative, title: "Time Offset (ms)")
|> Vl.encode_field(:x2, "end")
|> Vl.encode_field(:color, "status", type: :nominal, title: "Status", scale: [range: ["#2eb82e", "#d9534f"]])
|> Vl.encode_field(:tooltip, "duration", type: :quantitative, title: "Duration (ms)")
|> Kino.VegaLite.new()

Prepare for the demo:

alias VegaLite, as: Vl


# First batch of the predicted mel, moved to the binary backend so it can be
# iterated on the host.
mel_tensor =
  results.mel.payload[0]
  |> Nx.backend_transfer(Nx.BinaryBackend)

# One rect per (frame, mel-bin) cell for the heatmap layer below.
mel_data =
  mel_tensor
  |> Nx.to_batched(1)
  |> Enum.with_index()
  |> Enum.flat_map(fn {row_tensor, frame_idx} ->
    row_tensor
    |> Nx.to_flat_list()
    |> Enum.with_index()
    |> Enum.map(fn {val, bin_idx} ->
      %{
        "x1" => frame_idx, "x2" => frame_idx + 1,
        "y1" => bin_idx, "y2" => bin_idx + 1,
        "value" => val
      }
    end)
  end)

# Per-frame F0 values (Hz) from the pitch-prediction step.
f0_data =
  results.pitch_pred.payload[0]
  |> Nx.backend_transfer(Nx.BinaryBackend)
  |> Nx.to_flat_list()
  |> Enum.with_index()
  |> Enum.map(fn {f0, frame} -> %{"frame" => frame, "f0" => f0} end)

durations = results.phoneme_duration_predict.payload[0]
  |> Nx.backend_transfer(Nx.BinaryBackend)
  |> Nx.to_flat_list()

# Cumulative duration sums give the phoneme boundary positions (in frames).
boundaries = Enum.scan(durations, 0, fn dur, acc -> dur + acc end)
boundary_data = Enum.map(boundaries, fn b -> %{"frame" => b} end)


# Grey-scale mel spectrogram heatmap.
mel_layer =
  Vl.new()
  |> Vl.data_from_values(mel_data)
  |> Vl.mark(:rect, tooltip: false, stroke: nil)
  |> Vl.encode_field(:x, "x1", type: :quantitative, title: "Time (Frames)")
  |> Vl.encode_field(:x2, "x2")
  |> Vl.encode_field(:y, "y1", type: :quantitative, title: "Mel Frequency Bin")
  |> Vl.encode_field(:y2, "y2")
  |> Vl.encode_field(:color, "value",
    type: :quantitative,
    scale: [scheme: "greys", reverse: true, domain: [-11.0, 2.0]],
    legend: false
  )

# F0 curve drawn on a secondary (right-hand) axis.
f0_layer =
  Vl.new()
  |> Vl.data_from_values(f0_data)
  |> Vl.mark(:line, color: "#007bff", strokeWidth: 2)
  |> Vl.encode_field(:x, "frame", type: :quantitative)
  |> Vl.encode_field(:y, "f0",
    type: :quantitative,
    title: "Pitch (Hz)",
    axis: [orient: "right", titleColor: "#007bff"],
    scale: [domain: [50, 800]]
  )

# Dashed vertical rules marking each phoneme boundary.
boundary_layer =
  Vl.new()
  |> Vl.data_from_values(boundary_data)
  |> Vl.mark(:rule, color: "red", strokeDash: [4, 4], strokeWidth: 1.5)
  |> Vl.encode_field(:x, "frame", type: :quantitative)

:ok

Demonstration

# Dump the finished WAV to disk (TensorToWave also wrote this same file).
File.write!("E:/final.wav", results.audio.payload)

# NOTE(review): Livebook only renders a cell's last expression; if this
# Kino.Audio line shares a cell with the chart below, the audio widget's
# return value is discarded — confirm the cell boundaries.
Kino.Audio.new(results.audio.payload, :wav)
Vl.new(width: 800, height: 400, title: "Output Analysis (Mel Spectrum + F0 + Phoneme Duration)")
|> Vl.resolve(:scale, y: :independent)
|> Vl.layers([mel_layer, f0_layer, boundary_layer])
|> Kino.VegaLite.new()

Multiple Pipeline

Try concurrent.

# NOTE(review): the tracker Agent was already started in an earlier cell; its
# name is still registered, so confirm Kino.start_child tolerates a second
# start here.
Kino.start_child({Orchid.Livebook.GanttTracker,[]})
Orchid.Livebook.GanttTracker.clear()

# Three identical pipeline runs in parallel to exercise the async executor.
pipeline_tasks = 
  for _ <- 1..3 do
    Task.async(fn ->
      # NOTE(review): MetaStore/BlobStore are bare module atoms here, unlike
      # the {adapter, ref} tuples used in the cache section below — confirm
      # this baggage shape is intentional.
      Orchid.run(recipe, inputs, [executor_and_opts: {Orchid.Executor.Async, []},
      baggage: %{
        meta_store: MetaStore,
        blob_store: BlobStore
      },
      global_hooks_stack: [OrchidStratum.BypassHook]
    ])
    end)
  end


# :timer.tc/1 runs the zero-arity fun and returns {elapsed_microseconds, result}.
{elapse, _res} = fn -> Task.await_many(pipeline_tasks, :infinity) end
|> :timer.tc

# Est: T = 4 * Concurrent + 3
Logger.info "Total used #{elapse / 1000_000}s."
alias VegaLite, as: Vl

spans = Orchid.Livebook.GanttTracker.get_spans()
# Fix: the capture operator was HTML-escaped ("&amp;") in the original.
min_start = spans |> Enum.map(& &1.start) |> Enum.min(fn -> 0 end)

# Normalize to t=0 and convert ms -> s for the concurrent view.
chart_data =
  Enum.map(spans, fn span ->
    %{
      "run_id" => span.run_id,
      "step" => span.step,
      "start" => (span.start - min_start) / 1000,
      # Fix: `span.end` does not parse (`end` is a reserved word in Elixir);
      # read the :end key via Access instead.
      "end" => (span[:end] - min_start) / 1000,
      "duration" => Float.round(span.duration, 2),
      "status" => Atom.to_string(span.status)
    }
  end)

Vl.new(width: 600, height: 250, title: "Concurrent Orchid Workflows")
|> Vl.data_from_values(chart_data)
|> Vl.encode_field(:row, "run_id", type: :nominal, title: "Pipeline Instance (PID)")
|> Vl.mark(:bar, corner_radius: 4, height: 20)
|> Vl.encode_field(:y, "step",
     type: :nominal,
     title: nil,
     sort: [field: "start", op: "min"],
     axis: [labelLimit: 300]
   )
|> Vl.encode_field(:x, "start", type: :quantitative, title: "Global Time (ms)")
|> Vl.encode_field(:x2, "end")
|> Vl.encode_field(:color, "status", type: :nominal, scale: [range: ["#2eb82e", "#d9534f"]])
|> Vl.encode_field(:tooltip, "duration", type: :quantitative, title: "Duration (ms)")
|> Vl.resolve(:scale, x: :shared)
|> Kino.VegaLite.new()

Cache

# ETS-backed stores for step metadata and result blobs.
metaref = OrchidStratum.MetaStorage.EtsAdapter.init()
blobref = OrchidStratum.BlobStorage.EtsAdapter.init()

cache_recipe_opts = [
  executor_and_opts: {Orchid.Executor.Async, []},
  baggage: %{
    meta_store: {OrchidStratum.MetaStorage.EtsAdapter, metaref},
    blob_store: {OrchidStratum.BlobStorage.EtsAdapter, blobref}
  },
  global_hooks_stack: [OrchidStratum.BypassHook]
]

# Tag every step as cacheable; steps that already carry options also expose
# their option keys so those participate in the cache key.
recipe_with_cache =
  Orchid.Recipe.walk(recipe.steps, fn step ->
    case step do
      {impl, in_keys, out_keys} ->
        {impl, in_keys, out_keys, [cache: true]}

      {impl, in_keys, out_keys, old_opts} ->
        new_opts = old_opts ++ [cache: true, cache_keys: Keyword.keys(old_opts)]
        {impl, in_keys, out_keys, new_opts}
    end
  end)

# Prewarm: one full run populates both stores.
Orchid.run(recipe_with_cache, inputs, cache_recipe_opts)

defmodule BenchmarkRunner do
  @moduledoc "Thin wrappers keeping the Benchee scenario definitions readable."

  def run_without_cache(recipe, inputs) do
    opts = [executor_and_opts: {Orchid.Executor.Async, []}]
    Orchid.run(recipe, inputs, opts)
  end

  def run_with_cache(recipe_with_cache, inputs, cache_recipe_opts) do
    Orchid.run(recipe_with_cache, inputs, cache_recipe_opts)
  end
end

# Compare cold (no cache) vs. warm (Stratum-cached) pipeline runs.
Benchee.run(
  %{
    "run_without_cache" => fn -> BenchmarkRunner.run_without_cache(recipe, inputs) end,
    "run_with_cache"    => fn -> BenchmarkRunner.run_with_cache(recipe_with_cache, inputs, cache_recipe_opts) end
  },
  time: 3,
  memory_time: 3,
  reduction_time: 2
)