DiffSingerElixirPoC
# Optional hex maintenance, useful on flaky networks:
# System.shell("mix hex.clean --all")
# System.shell("mix hex.config http_timeout 120")
# If the download fails, you can comment out all the git repositories first,
# and then install them incrementally one by one
# after the hex packages are installed.
Mix.install([
  {:orchid, "~> 0.5"},
  {:diff_singer, git: "https://github.com/GES233/DiffSinger.git"},
  {:orchid_stratum, "~> 0.1"},
  {:orchid_intervention, "~> 0.1"},
  {:kino, "~> 0.19.0"},
  {:vega_lite, "~> 0.1.8"},
  {:kino_vega_lite, "~> 0.1.11"},
  {:kino_benchee, "~> 0.1.0"}
])

# Log every exception raised inside an Orchid step via the library's
# built-in telemetry error handler.
:telemetry.attach(
  "orchid-step-exception-logger",
  [:orchid, :step, :exception],
  &Orchid.Runner.Hooks.Telemetry.error_handler/4,
  %{}
)
Fetch Model’s Metadata
Obtain the metadata and basic information of the sound library’s models as the skeleton for subsequent dependency graph construction.
Here, we take Qixuan v2.5.0 (for OpenUTAU), maintained by OpenVPI, as an example.
# If you have a local copy of the voicebank,
# change this path to point at it.
model_root_path = "E:/ProgramAssets/OpenUTAUSingers/Qixuan_v2.5.0_DiffSinger_OpenUtau"

# Reads the voicebank's overview config (model paths, dictionaries, vocoder settings).
model_config = DiffSinger.VoiceBank.Config.fetch_overview(model_root_path)
:ok
Prelude Steps
This is a node that converts lyrics/MIDI into corresponding phoneme IDs and pitches (MIDI) based on a phoneme dictionary.
A primary reason for needing MIDI is to use its duration information as a reference for subsequent phoneme duration prediction.
A rasterized node will be implemented later to accommodate the processing of notes and phonemes.
defmodule CommonEncoder do
  @moduledoc """
  Shared front-end encoding used by all encoder steps.

  Flattens a list of words `{phonemes, duration, midi_note}` — where
  `phonemes` is a list of `{lang, phone}` pairs — into five parallel lists:

    * per-phoneme language ids (via `:lang_dict`)
    * per-phoneme token ids (via `:phoneme_dict`)
    * per-word phoneme counts (word division)
    * per-word durations
    * per-phoneme MIDI notes (the word's note repeated for each phoneme)
  """

  @doc """
  Encodes the `:words` payload. Requires `:lang_dict` and `:phoneme_dict`
  lookup maps in `opts`. Returns
  `{langs, tokens, word_divisions, word_durations, ph_midis}`.
  """
  def run_partial(%Orchid.Param{payload: words}, opts) do
    lang_dict = Keyword.fetch!(opts, :lang_dict)
    phoneme_dict = Keyword.fetch!(opts, :phoneme_dict)

    # FIX: accumulate by prepending (O(1) per word) instead of `acc ++ list`
    # (O(n^2) overall), then reverse each accumulator once at the end.
    {langs, toks, divs, durs, midis} =
      Enum.reduce(words, {[], [], [], [], []}, fn
        {phonemes, duration, midi_note}, {acc_l, acc_t, acc_wdiv, acc_wdur, acc_midi} ->
          ph_count = length(phonemes)

          {curr_langs, curr_toks} =
            phonemes
            |> Enum.map(fn {lang, phone} -> {lang_dict[lang], phoneme_dict[phone]} end)
            |> Enum.unzip()

          {
            Enum.reverse(curr_langs, acc_l),
            Enum.reverse(curr_toks, acc_t),
            [ph_count | acc_wdiv],
            [duration | acc_wdur],
            Enum.reverse(List.duplicate(midi_note, ph_count), acc_midi)
          }
      end)

    {
      Enum.reverse(langs),
      Enum.reverse(toks),
      Enum.reverse(divs),
      Enum.reverse(durs),
      Enum.reverse(midis)
    }
  end
end
defmodule DurationPredictEncoder do
  use Orchid.Step

  # Packs the shared encoding into the five s64 tensor params consumed by
  # the duration-prediction models (order matters downstream).
  def run(param, opts) do
    {langs, toks, w_div, w_dur, ph_midis} = CommonEncoder.run_partial(param, opts)

    params =
      for {name, values} <- [
            lang_map: langs,
            phoneme_map: toks,
            word_division: w_div,
            word_duration: w_dur,
            ph_midi: ph_midis
          ] do
        Orchid.Param.new(name, :payload, Nx.tensor([values], type: :s64))
      end

    {:ok, params}
  end
end
defmodule PitchPredictEncoder do
  use Orchid.Step

  # Same as the duration encoder but without word division — pitch
  # prediction does not consume it.
  def run(%Orchid.Param{} = param, opts) do
    {langs, toks, _w_div, w_dur, ph_midis} = CommonEncoder.run_partial(param, opts)

    params =
      for {name, values} <- [
            lang_map: langs,
            phoneme_map: toks,
            word_duration: w_dur,
            ph_midi: ph_midis
          ] do
        Orchid.Param.new(name, :payload, Nx.tensor([values], type: :s64))
      end

    {:ok, params}
  end
end
# Used for the Variance model (breathiness / voicing prediction).
defmodule VarianceEncoder do
  use Orchid.Step

  # Identical shape to PitchPredictEncoder: four s64 tensor params,
  # built from the variance model's own dictionaries (passed via opts).
  def run(param, opts) do
    {langs, toks, _w_div, w_dur, ph_midis} = CommonEncoder.run_partial(param, opts)

    params =
      for {name, values} <- [
            lang_map: langs,
            phoneme_map: toks,
            word_duration: w_dur,
            ph_midi: ph_midis
          ] do
        Orchid.Param.new(name, :payload, Nx.tensor([values], type: :s64))
      end

    {:ok, params}
  end
end
Variance Model
Predict Phoneme Duration:
defmodule PredictDuration do
  @behaviour OrchidSymbiont.Step

  # Needs the linguistic encoder and the duration predictor sessions.
  def required, do: [:duration_linguistic, :duration_predict]

  # Two-stage inference: linguistic encoder -> duration predictor.
  # Returns the predicted per-phoneme durations as :ph_dur_pred.
  def run_with_model(
        [
          %Orchid.Param{payload: lang_map},
          %Orchid.Param{payload: phoneme_map},
          %Orchid.Param{payload: word_division},
          %Orchid.Param{payload: word_duration},
          %Orchid.Param{payload: phoneme_midi}
        ],
        handlers,
        _opts
      ) do
    linguistic_inputs = {phoneme_map, lang_map, word_division, word_duration}

    {:ok, {encoder_out, mask}} =
      OrchidSymbiont.call(handlers.duration_linguistic, {:infer, linguistic_inputs}, :infinity)

    {:ok, {ph_dur_pred}} =
      OrchidSymbiont.call(
        handlers.duration_predict,
        {:infer, {encoder_out, mask, phoneme_midi}},
        :infinity
      )

    {:ok, [Orchid.Param.new(:ph_dur_pred, :encoder_out, ph_dur_pred)]}
  end
end
Predict Pitch:
defmodule PredictPitch do
  @behaviour OrchidSymbiont.Step

  # Needs the pitch linguistic encoder and the pitch diffusion predictor.
  def required, do: [:pitch_linguistic, :pitch_predict]

  # Predicts a per-frame MIDI pitch curve from phoneme/language ids, the
  # predicted phoneme durations (float frame counts) and per-phoneme MIDI.
  def run_with_model(
        [
          %Orchid.Param{payload: languages},
          %Orchid.Param{payload: phonemes},
          %Orchid.Param{payload: phoneme_duration},
          %Orchid.Param{payload: note_midi}
        ], handlers, opts) do
    # The ONNX graph expects integer frame counts.
    ph_dur =
      phoneme_duration
      |> Nx.backend_transfer(Nx.BinaryBackend)
      |> Nx.round()
      |> Nx.as_type(:s64)

    {:ok, {encoder_out, _mask}} =
      OrchidSymbiont.call(
        handlers.pitch_linguistic,
        {:infer, {phonemes, languages, ph_dur}},
        :infinity
      )

    # Total frame count drives the shapes of all frame-level inputs below.
    total_frames =
      ph_dur
      |> Nx.sum()
      |> Nx.to_number()

    # Diffusion start noise; seeded from wall-clock time, so results are
    # nondeterministic across runs.
    key = Nx.Random.key(System.system_time())
    {pitch_noise, _key} =
      Nx.Random.normal(
        key,
        0.0,
        1.0,
        shape: {1, total_frames},
        type: :f32
      )

    # Neutral expressiveness and retake=1 everywhere (re-predict all frames).
    expr =
      Nx.broadcast(Nx.tensor(1.0, type: :f32), {1, total_frames})
    retake =
      Nx.broadcast(Nx.tensor(1, type: :u8), {1, total_frames})

    # MIDI note 0 marks rest entries (e.g. "AP" in the inputs); flagged for
    # the model as note_rest.
    note_rest =
      note_midi
      |> Nx.equal(0)
      |> Nx.as_type(:u8)

    note_midi = note_midi |> Nx.as_type(:f32)
    # NOTE(review): note durations are taken from the *phoneme* durations
    # rather than per-note durations — confirm the model expects this.
    note_dur = ph_dur

    # Number of diffusion sampling steps, overridable via opts.
    steps =
      opts
      |> Keyword.get(:steps, 20)
      |> Nx.tensor(type: :s64)

    # Input tuple order is fixed by the ONNX model signature — do not reorder.
    {:ok, {pitch_pred}} =
      OrchidSymbiont.call(
        handlers.pitch_predict,
        {:infer,
         {
           encoder_out,
           ph_dur,
           note_midi,
           note_rest,
           note_dur,
           pitch_noise,
           expr,
           retake,
           steps
         }},
        :infinity
      )

    # Single param (not a list), keyed :pitch_pred (MIDI-space curve).
    {:ok, Orchid.Param.new(:pitch_pred, :payload, pitch_pred)}
  end
end
defmodule MIDIToPitch do
  @moduledoc """
  Converts a predicted MIDI pitch curve to frequency in Hz using
  `f0 = 440 * 2^((midi - 69) / 12)`, mapping negative MIDI values to 0 Hz
  (rest/unvoiced frames).
  """
  use Orchid.Step

  def run(%Orchid.Param{payload: midi_pred}, _opts) do
    # FIX: transfer the tensor to the binary backend once instead of twice
    # (the original re-transferred midi_pred inside the Nx.select mask).
    midi = Nx.backend_transfer(midi_pred, Nx.BinaryBackend)

    f0 =
      midi
      |> Nx.subtract(69.0)
      |> Nx.divide(12.0)
      |> then(&Nx.pow(2.0, &1))
      |> Nx.multiply(440)

    # Negative MIDI values are clamped to 0 Hz for the vocoder.
    f0 = Nx.select(Nx.less(midi, 0.0), Nx.tensor(0.0), f0)

    {:ok, Orchid.Param.new(:f0, :tensor, f0)}
  end
end
Other Variance Params:
defmodule VarianceModel do
  @moduledoc """
  Predicts breathiness and voicing curves from the linguistic encoding,
  phoneme durations and the predicted (MIDI-space) pitch curve.
  """
  @behaviour OrchidSymbiont.Step

  def required, do: [:variance_linguistic, :variance]

  def run_with_model(
        [
          %Orchid.Param{payload: languages},
          %Orchid.Param{payload: phonemes},
          %Orchid.Param{payload: ph_dur},
          %Orchid.Param{payload: pitch}
        ],
        handlers,
        opts
      ) do
    # Float frame counts -> integer, as the ONNX graph expects.
    ph_dur =
      ph_dur
      |> Nx.backend_transfer(Nx.BinaryBackend)
      |> Nx.round()
      |> Nx.as_type(:s64)

    # FIX: pass an explicit :infinity timeout like every other model call in
    # this notebook — the default call timeout can crash on slow inference.
    {:ok, {encoder_out, _mask}} =
      OrchidSymbiont.call(
        handlers.variance_linguistic,
        {:infer, {phonemes, languages, ph_dur}},
        :infinity
      )

    total_frames =
      ph_dur
      |> Nx.sum()
      |> Nx.to_number()

    # Independent Gaussian noise for the two diffusion branches; the key is
    # threaded so breath and voice noise differ.
    key = Nx.Random.key(System.system_time())
    {breath_noise, key} =
      Nx.Random.normal(key, 0.0, 1.0,
        shape: {1, total_frames},
        type: :f32
      )
    {voice_noise, _key} =
      Nx.Random.normal(key, 0.0, 1.0,
        shape: {1, total_frames},
        type: :f32
      )

    # retake = 1 for both channels: re-predict breathiness AND voicing
    # on every frame.
    retake =
      Nx.broadcast(
        Nx.tensor([1, 1], type: :u8),
        {1, total_frames, 2}
      )

    # Diffusion sampling steps, overridable via opts.
    steps =
      opts
      |> Keyword.get(:steps, 20)
      |> Nx.tensor(type: :s64)

    # Input tuple order is fixed by the ONNX model signature.
    {:ok, {breath_pred, voice_pred}} =
      OrchidSymbiont.call(
        handlers.variance,
        {:infer,
         {
           encoder_out,
           ph_dur,
           pitch,
           breath_noise,
           voice_noise,
           retake,
           steps
         }},
        :infinity
      )

    {:ok,
     [
       Orchid.Param.new(:breathiness, :payload, breath_pred),
       Orchid.Param.new(:voicing, :payload, voice_pred)
     ]}
  end
end
Acoustic Model and Vocoder
Acoustic Model:
# Acoustic model: renders a mel spectrogram from linguistic features,
# durations, the f0 curve (Hz) and the predicted variance parameters.
defmodule Acoustic do
  @behaviour OrchidSymbiont.Step

  def required, do: [:acoustic]

  def run_with_model(
        [
          %Orchid.Param{payload: languages},
          %Orchid.Param{payload: phonemes},
          %Orchid.Param{payload: phoneme_duration},
          %Orchid.Param{payload: pitch},
          %Orchid.Param{payload: breathiness},
          %Orchid.Param{payload: voicing}
        ],
        handlers,
        opts
      ) do
    # Float frame counts -> integer, as the ONNX graph expects.
    durations =
      phoneme_duration
      |> Nx.backend_transfer(Nx.BinaryBackend)
      |> Nx.round()
      |> Nx.as_type(:s64)

    # Frame-level control curves held at constant neutral values
    # (presumably gender shift = 0.0, velocity = 1.0 — per DiffSinger inputs).
    frames = Nx.axis_size(pitch, 1)
    gender =
      Nx.broadcast(Nx.tensor(0.0, type: :f32), {1, frames})
    velocity =
      Nx.broadcast(Nx.tensor(1.0, type: :f32), {1, frames})

    # Diffusion sampling controls, overridable via opts.
    steps =
      opts
      |> Keyword.get(:steps, 20)
      |> Nx.tensor(type: :s64)
    depth =
      opts
      |> Keyword.get(:depth, 1.0)
      |> Nx.tensor(type: :f32)

    # Input tuple order is fixed by the ONNX model signature — do not reorder.
    {:ok, {mel}} =
      OrchidSymbiont.call(
        handlers.acoustic,
        {:infer,
         {
           phonemes,
           languages,
           durations,
           pitch,
           breathiness,
           voicing,
           gender,
           velocity,
           depth,
           steps
         }},
        :infinity
      )

    {:ok, Orchid.Param.new(:mel, :payload, mel)}
  end
end
Vocoder:
defmodule NSFHifiGAN_Vocoder do
  @moduledoc """
  Renders the mel spectrogram plus f0 curve into a waveform tensor using
  the NSF-HiFiGAN vocoder model.
  """
  @behaviour OrchidSymbiont.Step

  def required, do: [:vocoder]

  def run_with_model(
        [%Orchid.Param{payload: mel}, %Orchid.Param{payload: f0}],
        handlers,
        _opts
      ) do
    # FIX: use :infinity like every other model call in this notebook —
    # vocoding longer clips can exceed the default call timeout.
    {:ok, {audio}} = OrchidSymbiont.call(handlers.vocoder, {:infer, {mel, f0}}, :infinity)
    {:ok, Orchid.Param.new(:audio, :payload, audio)}
  end
end
Post-process (tensor to audio)
defmodule TensorToWave do
  @moduledoc """
  Converts the float waveform tensor (samples expected in ~[-1.0, 1.0])
  into a mono, 16-bit PCM WAV binary.

  Options:
    * `:sample_rate` — defaults to 44100 Hz
    * `:output_path` — debug copy written to disk; defaults to "E:/final.wav"
      to preserve the original behavior
  """
  use Orchid.Step

  def run(%Orchid.Param{payload: wave_tensor}, opts) do
    # Scale float samples to Int16 range and serialize little-endian PCM.
    pcm_data =
      wave_tensor
      |> Nx.flatten()
      |> Nx.backend_transfer(Nx.BinaryBackend)
      |> Nx.multiply(32767.0)
      |> Nx.clip(-32768.0, 32767.0)
      |> Nx.as_type(:s16)
      |> Nx.to_binary()

    sample_rate = Keyword.get(opts, :sample_rate, 44100)
    wav = wav_header(byte_size(pcm_data), sample_rate) <> pcm_data

    # GENERALIZED: hard-coded path is now an option (same default as before).
    File.write!(Keyword.get(opts, :output_path, "E:/final.wav"), wav)

    {:ok, Orchid.Param.new(:final, :audio, wav)}
  end

  # Canonical 44-byte RIFF/WAVE header for mono 16-bit PCM audio.
  defp wav_header(data_size, sample_rate) do
    byte_rate = sample_rate * 1 * 2 # sample_rate * channels * bytes_per_sample

    <<
      "RIFF", (36 + data_size)::little-integer-size(32), "WAVE",
      "fmt ", 16::little-integer-size(32),  # Subchunk1Size
      1::little-integer-size(16),           # AudioFormat (1 = PCM)
      1::little-integer-size(16),           # NumChannels (1 = Mono)
      sample_rate::little-integer-size(32), # SampleRate
      byte_rate::little-integer-size(32),   # ByteRate
      2::little-integer-size(16),           # BlockAlign (channels * 2)
      16::little-integer-size(16),          # BitsPerSample
      "data", data_size::little-integer-size(32) # Subchunk2Size
    >>
  end
end
Build Pipeline and Prepare to Demo
defmodule QixuanPipeline do
  @moduledoc """
  Wires the Qixuan voicebank models into one Orchid recipe:
  duration -> pitch -> variance -> acoustic -> vocoder.
  """
  require Logger

  # Registers one Ortex (ONNX) runner per model in OrchidSymbiont,
  # resolving each model's relative path against the voicebank root.
  def load_models(model_root_path, model_config) do
    Logger.info("Loading DiffSinger models...")
    models = [
      {:duration_linguistic, model_config.predict_map.maybe_duration.linguistic.path},
      {:duration_predict, model_config.predict_map.maybe_duration.duration.path},
      {:pitch_linguistic, model_config.predict_map.maybe_pitch.linguistic.path},
      {:pitch_predict, model_config.predict_map.maybe_pitch.predict.path},
      {:variance_linguistic, model_config.variance.linguistic.path},
      {:variance, model_config.variance.variance.path},
      {:acoustic, model_config.acoustic.infer.path},
      {:vocoder, model_config.vocoder.path}
    ]
    for {name, rel_path} <- models do
      # rel_path is a list of path segments relative to the voicebank root.
      path = Path.join([model_root_path] ++ rel_path)
      :ok = OrchidSymbiont.register(name, {Orchid.Symbiont.OrtexRunner, [name: name, path: path]})
    end
    Logger.info("All models loaded successfully.")
    :ok
  end

  # Builds the step graph. Steps are connected purely by data-key names;
  # each entry is {impl, in_keys, out_keys, opts}.
  def build_recipe(model_config) do
    # Hook that injects the registered model handlers into Symbiont steps.
    injector = [extra_hooks_stack: [OrchidSymbiont.Hooks.Injector]]

    # Each model family ships its own phoneme/language dictionaries.
    dur_dict = model_config.predict_map.maybe_duration.phonemes
    pitch_dict = model_config.predict_map.maybe_pitch.phonemes
    var_dict = model_config.variance.phonemes
    sample_rate = model_config.vocoder.maybe_config["sample_rate"]

    # words -> per-phoneme tensors -> predicted phoneme durations.
    duration_steps = [
      {DurationPredictEncoder, :words, [:duration_lang, :duration_phoneme, :word_division, :word_duration, :duration_ph_midi], [lang_dict: dur_dict.maybe_lang_dict, phoneme_dict: dur_dict.phoneme_dict]},
      {PredictDuration, [:duration_lang, :duration_phoneme, :word_division, :word_duration, :duration_ph_midi],
       :phoneme_duration_predict, injector}
    ]

    # words + predicted durations -> MIDI pitch curve -> f0 in Hz.
    pitch_steps = [
      {PitchPredictEncoder, :words, [:pitch_lang, :pitch_phoneme, :word_duration_from_pitch, :pitch_ph_midi], [lang_dict: pitch_dict.maybe_lang_dict, phoneme_dict: pitch_dict.phoneme_dict]},
      {PredictPitch, [:pitch_lang, :pitch_phoneme, :phoneme_duration_predict, :pitch_ph_midi],
       :pitch_pred_midi, injector},
      {MIDIToPitch, :pitch_pred_midi, :pitch_pred}
    ]

    # breathiness / voicing curves from durations + MIDI-space pitch.
    variance_steps = [
      {VarianceEncoder, :words, [:variance_lang, :variance_phoneme, :word_duration_from_variance, :variance_ph_midi], [lang_dict: var_dict.maybe_lang_dict, phoneme_dict: var_dict.phoneme_dict]},
      {VarianceModel, [:variance_lang, :variance_phoneme, :phoneme_duration_predict, :pitch_pred_midi], [:breathiness_pred, :voice_pred], injector}
    ]

    # mel spectrogram from all conditioning curves (consumes f0 in Hz).
    acoustic_step = [
      {Acoustic, [:variance_lang, :variance_phoneme, :phoneme_duration_predict, :pitch_pred, :breathiness_pred, :voice_pred],
       :mel, injector}
    ]

    # mel + f0 -> waveform tensor -> WAV binary.
    vocoder_step = [
      {NSFHifiGAN_Vocoder, [:mel, :pitch_pred], :wave_tensor, injector},
      {TensorToWave, :wave_tensor, :audio, [sample_rate: sample_rate]}
    ]

    all_steps = duration_steps ++ pitch_steps ++ variance_steps ++ acoustic_step ++ vocoder_step
    Orchid.Recipe.new(all_steps)
  end
end
Add a tracker
defmodule Orchid.Livebook.GanttTracker do
  @moduledoc """
  Agent that accumulates raw `[:orchid, :step, _]` telemetry events and
  converts them into start/end spans for Gantt-style charting.
  """
  use Agent

  def start_link(_) do
    Agent.start_link(fn -> [] end, name: __MODULE__)
  end

  @doc """
  Telemetry handler (attach with `&handle_event/4`). Stores one raw event
  per call: `:start` events carry `system_time`, terminal events carry
  `duration` (both in native time units).
  """
  def handle_event([:orchid, :step, status], measurements, meta, _config) do
    # FIX: `:"$callers"` is only set inside Task processes. Default to []
    # so this no longer crashes with `List.first(nil)` when a step runs
    # directly in the calling process.
    run_id = List.first(Process.get(:"$callers", [])) || self()
    run_id_str = inspect(run_id)
    time_val = if status == :start, do: measurements.system_time, else: measurements.duration

    Agent.update(__MODULE__, fn state -> [{run_id_str, status, time_val, meta} | state] end)
  end

  @doc """
  Pairs each `:start` event with its terminal event (matched on
  `{run_id, impl, in_keys, out_keys}`) and returns chart-ready span maps
  with times in milliseconds. Terminal events without a matching start
  are dropped.
  """
  def get_spans() do
    events = Agent.get(__MODULE__, & &1) |> Enum.reverse()

    {_, spans} =
      Enum.reduce(events, {%{}, []}, fn
        {run_id, :start, sys_time, meta}, {pending, spans} ->
          start_ms = System.convert_time_unit(sys_time, :native, :microsecond) / 1000.0
          sig = {run_id, meta.impl, meta.in_keys, meta.out_keys}
          {Map.put(pending, sig, start_ms), spans}

        {run_id, status, duration, meta}, {pending, spans}
        when status in [:done, :exception, :special] ->
          duration_ms = System.convert_time_unit(duration, :native, :microsecond) / 1000.0
          sig = {run_id, meta.impl, meta.in_keys, meta.out_keys}

          case Map.pop(pending, sig) do
            {start_ms, new_pending} when not is_nil(start_ms) ->
              impl_name =
                if is_function(meta.impl),
                  do: "Anonymous",
                  else: inspect(meta.impl) |> String.split(".") |> List.last()

              out_keys = inspect(meta.out_keys) |> String.slice(0..14) |> Kernel.<>("...")

              span = %{
                run_id: run_id,
                step: "#{impl_name}(#{out_keys})",
                start: start_ms,
                end: start_ms + duration_ms,
                # Pad very short steps to >= 10ms so their bar stays visible.
                visual_end: start_ms + max(duration_ms, 10.0),
                duration: duration_ms,
                status: status
              }

              {new_pending, [span | spans]}

            {nil, ^pending} ->
              {pending, spans}
          end
      end)

    spans
  end

  def clear(), do: Agent.update(__MODULE__, fn _ -> [] end)
end
# Start the tracker under the Livebook supervision tree.
Kino.start_child({Orchid.Livebook.GanttTracker, []})

# Subscribe the tracker to every step-level telemetry event.
events = [
  {[:orchid, :step, :start], "start"},
  {[:orchid, :step, :done], "done"},
  {[:orchid, :step, :exception], "exception"},
  {[:orchid, :step, :special], "special"}
]

# One attachment per event — telemetry handler ids must be unique.
for {event, suffix} <- events do
  :telemetry.attach(
    "orchid-gantt-#{suffix}",
    event,
    &Orchid.Livebook.GanttTracker.handle_event/4,
    nil
  )
end
defmodule MermaidRenderer do
  # Renders an Orchid recipe as a Mermaid dependency graph in Livebook.
  def render(%Orchid.Recipe{} = recipe) do
    mermaid_code = generate_mermaid(recipe.steps)
    Kino.Mermaid.new(mermaid_code)
  end

  # Builds the Mermaid source: one node per step, dashed edges for inputs,
  # bold edges for outputs; duplicate lines collapsed with Enum.uniq.
  defp generate_mermaid(steps) do
    header = "graph TD\n"
    body =
      steps
      |> Enum.with_index()
      |> Enum.flat_map(fn {step, idx} ->
        # Normalize any step shape to the full {impl, in, out, opts} tuple.
        {impl, in_keys, out_keys, _opts} = Orchid.Step.ensure_full_step(step)
        step_id = "step_#{idx}"
        step_name = format_impl(impl)
        step_node = ~s| #{step_id}["⚙️ #{step_name}"]:::stepClass|
        in_edges =
          in_keys
          |> normalize_keys()
          |> Enum.map(fn k -> ~s| #{k}(["#{k}"]) -.-> #{step_id}| end)
        out_edges =
          out_keys
          |> normalize_keys()
          |> Enum.map(fn k -> ~s| #{step_id} ==> #{k}(["#{k}"])| end)
        [step_node] ++ in_edges ++ out_edges
      end)
      |> Enum.uniq()
      |> Enum.join("\n")
    styles = """
classDef stepClass fill:#2eb82e,stroke:#fff,stroke-width:2px,color:#fff;
"""
    # NOTE(review): `body` has no trailing newline, so the classDef line is
    # concatenated directly after the last edge — confirm Mermaid parses this.
    header <> body <> styles
  end

  # in/out keys may be a single atom or a list; normalize to string list.
  defp normalize_keys(keys) do
    keys
    |> Orchid.Step.ID.normalize_keys_to_set()
    |> MapSet.to_list()
    |> Enum.map(&to_string/1)
  end

  defp format_impl(impl) when is_function(impl), do: "Anonymous Function"
  defp format_impl(impl) do
    inspect(impl) |> String.replace_prefix("Elixir.", "")
  end
end
Pitch Intervention
defmodule PitchOffset do
  @behaviour OrchidIntervention.Operate

  # Additive intervention: offsets the predicted MIDI pitch curve by the
  # intervention payload. Does not short-circuit the step it wraps.
  def short_circuit?, do: false
  def data_enable, do: {true, true}

  def merge(inner_payload, intervention_payload) do
    sum =
      Nx.add(
        Nx.backend_transfer(inner_payload, Nx.BinaryBackend),
        Nx.backend_transfer(intervention_payload, Nx.BinaryBackend)
      )

    {:ok, sum}
  end
end
Run Workflow
# Register all ONNX sessions, then build the recipe for this voicebank.
QixuanPipeline.load_models(model_root_path, model_config)
recipe = QixuanPipeline.build_recipe(model_config)

# Input score: one {phonemes, duration, midi_note} tuple per word;
# "AP" with MIDI 0 marks a breath/rest.
# FIX: removed the trailing comma after the list's only element — Elixir
# does not allow trailing commas in collection literals.
inputs = [
  %Orchid.Param{name: :words, payload: [
    {[{"zh", "AP"}], 10, 0},
    {[{"zh", "zh/l"}, {"zh", "zh/iang"}], 40, 60},
    {[{"zh", "zh/zh"}, {"zh", "zh/i"}], 40, 62},
    {[{"zh", "zh/l"}, {"zh", "zh/ao"}], 40, 64},
    {[{"zh", "zh/h"}, {"zh", "zh/u"}], 40, 60},
    {[{"zh", "AP"}], 1, 0},
    {[{"zh", "zh/l"}, {"zh", "zh/iang"}], 40, 60},
    {[{"zh", "zh/zh"}, {"zh", "zh/i"}], 40, 62},
    {[{"zh", "zh/l"}, {"zh", "zh/ao"}], 40, 64},
    {[{"zh", "zh/h"}, {"zh", "zh/u"}], 40, 60}
  ]}
]

# Shift the predicted pitch curve up one octave (+12 semitones, MIDI space).
interventions = %{
  :pitch_pred_midi => {PitchOffset, Orchid.Param.new(:pitch_pred_midi, :payload, Nx.tensor(12.0, type: :f32))}
}

Orchid.Livebook.GanttTracker.clear()

# Time the full synthesis run in microseconds.
{elapse, {:ok, results}} = :timer.tc(
  &Orchid.run/3,
  [recipe, inputs, [
    global_hooks_stack: [Orchid.Hook.ApplyInterventions],
    baggage: %{interventions: interventions}
  ]],
  :microsecond
)

require Logger
Logger.info "Used #{elapse / 1000}ms."
Show Graph
MermaidRenderer.render recipe
Execution Timeline:
alias VegaLite, as: Vl

# Collect the recorded spans and rebase timestamps to the earliest start.
spans = Orchid.Livebook.GanttTracker.get_spans()
min_start = spans |> Enum.map(& &1.start) |> Enum.min(fn -> 0 end)

chart_data =
  Enum.map(spans, fn span ->
    %{
      "step" => span.step,
      "start" => span.start - min_start,
      # visual_end pads very short steps so their bar stays visible.
      "end" => span.visual_end - min_start,
      "duration" => Float.round(span.duration, 2),
      "status" => Atom.to_string(span.status)
    }
  end)

# Gantt chart: one bar per step, sorted by start time, colored by status.
Vl.new(width: 600, height: 250, title: "Orchid Workflow Execution Timeline")
|> Vl.data_from_values(chart_data)
|> Vl.mark(:bar, corner_radius: 4, height: 20)
|> Vl.encode_field(:y, "step", type: :nominal, title: "Steps", sort: [field: "start", op: "min"], axis: [labelLimit: 400])
|> Vl.encode_field(:x, "start", type: :quantitative, title: "Time Offset (ms)")
|> Vl.encode_field(:x2, "end")
|> Vl.encode_field(:color, "status", type: :nominal, title: "Status", scale: [range: ["#2eb82e", "#d9534f"]])
|> Vl.encode_field(:tooltip, "duration", type: :quantitative, title: "Duration (ms)")
|> Kino.VegaLite.new()
Prepare for the demo:
alias VegaLite, as: Vl

# First batch item of the mel spectrogram, moved to a host tensor.
mel_tensor =
  results.mel.payload[0]
  |> Nx.backend_transfer(Nx.BinaryBackend)

# One rect per (frame, mel-bin) cell for the heatmap layer.
mel_data =
  mel_tensor
  |> Nx.to_batched(1)
  |> Enum.with_index()
  |> Enum.flat_map(fn {row_tensor, frame_idx} ->
    row_tensor
    |> Nx.to_flat_list()
    |> Enum.with_index()
    |> Enum.map(fn {val, bin_idx} ->
      %{
        "x1" => frame_idx, "x2" => frame_idx + 1,
        "y1" => bin_idx, "y2" => bin_idx + 1,
        "value" => val
      }
    end)
  end)

# Per-frame f0 curve (Hz) from the pitch prediction.
f0_data =
  results.pitch_pred.payload[0]
  |> Nx.backend_transfer(Nx.BinaryBackend)
  |> Nx.to_flat_list()
  |> Enum.with_index()
  |> Enum.map(fn {f0, frame} -> %{"frame" => frame, "f0" => f0} end)

# Cumulative phoneme durations -> phoneme boundary positions (frames).
durations = results.phoneme_duration_predict.payload[0]
  |> Nx.backend_transfer(Nx.BinaryBackend)
  |> Nx.to_flat_list()

boundaries = Enum.scan(durations, 0, fn dur, acc -> dur + acc end)
boundary_data = Enum.map(boundaries, fn b -> %{"frame" => b} end)

# Grey-scale heatmap of the mel spectrogram.
mel_layer =
  Vl.new()
  |> Vl.data_from_values(mel_data)
  |> Vl.mark(:rect, tooltip: false, stroke: nil)
  |> Vl.encode_field(:x, "x1", type: :quantitative, title: "Time (Frames)")
  |> Vl.encode_field(:x2, "x2")
  |> Vl.encode_field(:y, "y1", type: :quantitative, title: "Mel Frequency Bin")
  |> Vl.encode_field(:y2, "y2")
  |> Vl.encode_field(:color, "value",
    type: :quantitative,
    scale: [scheme: "greys", reverse: true, domain: [-11.0, 2.0]],
    legend: false
  )

# Blue f0 line on a secondary (right-hand) axis.
f0_layer =
  Vl.new()
  |> Vl.data_from_values(f0_data)
  |> Vl.mark(:line, color: "#007bff", strokeWidth: 2)
  |> Vl.encode_field(:x, "frame", type: :quantitative)
  |> Vl.encode_field(:y, "f0",
    type: :quantitative,
    title: "Pitch (Hz)",
    axis: [orient: "right", titleColor: "#007bff"],
    scale: [domain: [50, 800]]
  )

# Red dashed rules marking phoneme boundaries.
boundary_layer =
  Vl.new()
  |> Vl.data_from_values(boundary_data)
  |> Vl.mark(:rule, color: "red", strokeDash: [4, 4], strokeWidth: 1.5)
  |> Vl.encode_field(:x, "frame", type: :quantitative)

:ok
Demonstration
# NOTE(review): TensorToWave already writes this same file inside the
# pipeline — this second write is redundant.
File.write!("E:/final.wav", results.audio.payload)
Kino.Audio.new(results.audio.payload, :wav)

# Combined view: mel heatmap + f0 line + phoneme-boundary rules,
# with independent y-scales for the two axes.
Vl.new(width: 800, height: 400, title: "Output Analysis (Mel Spectrum + F0 + Phoneme Duration)")
|> Vl.resolve(:scale, y: :independent)
|> Vl.layers([mel_layer, f0_layer, boundary_layer])
|> Kino.VegaLite.new()
Multiple Pipelines
Try running several pipelines concurrently.
# Ensure the tracker is running, then launch three identical pipeline runs
# concurrently using the async executor and the Stratum bypass hook.
Kino.start_child({Orchid.Livebook.GanttTracker, []})
Orchid.Livebook.GanttTracker.clear()

pipeline_tasks =
  for _ <- 1..3 do
    Task.async(fn ->
      Orchid.run(recipe, inputs, [executor_and_opts: {Orchid.Executor.Async, []},
        baggage: %{
          # NOTE(review): MetaStore/BlobStore are bare module atoms here —
          # confirm they resolve, unlike the ETS adapter tuples used below.
          meta_store: MetaStore,
          blob_store: BlobStore
        },
        global_hooks_stack: [OrchidStratum.BypassHook]
      ])
    end)
  end

# Wait for all runs and measure the total wall-clock time (microseconds).
{elapse, _res} = fn -> Task.await_many(pipeline_tasks, :infinity) end
|> :timer.tc

# Est: T = 4 * Concurrent + 3
Logger.info "Total used #{elapse / 1000_000}s."
alias VegaLite, as: Vl

spans = Orchid.Livebook.GanttTracker.get_spans()
min_start = spans |> Enum.map(& &1.start) |> Enum.min(fn -> 0 end)

chart_data =
  Enum.map(spans, fn span ->
    %{
      "run_id" => span.run_id,
      "step" => span.step,
      # NOTE(review): spans are in milliseconds, so dividing by 1000 yields
      # seconds, yet the x-axis below is titled "Global Time (ms)" — confirm
      # which unit is intended.
      "start" => (span.start - min_start) / 1000,
      "end" => (span.end - min_start) / 1000,
      "duration" => Float.round(span.duration, 2),
      "status" => Atom.to_string(span.status)
    }
  end)

# One row of bars per pipeline process (faceted by run_id), shared x scale.
Vl.new(width: 600, height: 250, title: "Concurrent Orchid Workflows")
|> Vl.data_from_values(chart_data)
|> Vl.encode_field(:row, "run_id", type: :nominal, title: "Pipeline Instance (PID)")
|> Vl.mark(:bar, corner_radius: 4, height: 20)
|> Vl.encode_field(:y, "step",
  type: :nominal,
  title: nil,
  sort: [field: "start", op: "min"],
  axis: [labelLimit: 300]
)
|> Vl.encode_field(:x, "start", type: :quantitative, title: "Global Time (ms)")
|> Vl.encode_field(:x2, "end")
|> Vl.encode_field(:color, "status", type: :nominal, scale: [range: ["#2eb82e", "#d9534f"]])
|> Vl.encode_field(:tooltip, "duration", type: :quantitative, title: "Duration (ms)")
|> Vl.resolve(:scale, x: :shared)
|> Kino.VegaLite.new()
Cache
# In-memory ETS-backed meta/blob stores for the Stratum cache layer.
metaref = OrchidStratum.MetaStorage.EtsAdapter.init()
blobref = OrchidStratum.BlobStorage.EtsAdapter.init()

cache_recipe_opts = [
  executor_and_opts: {Orchid.Executor.Async, []},
  baggage: %{
    meta_store: {OrchidStratum.MetaStorage.EtsAdapter, metaref},
    blob_store: {OrchidStratum.BlobStorage.EtsAdapter, blobref}
  },
  global_hooks_stack: [OrchidStratum.BypassHook]
]

# Mark every step cacheable; steps that already carry opts also record their
# option keys so those options participate in the cache key.
recipe_with_cache = Orchid.Recipe.walk(
  recipe.steps,
  fn step ->
    case step do
      {impl, i, o} -> {impl, i, o, [cache: true]}
      {impl, i, o, old_opts} ->
        {impl, i, o, old_opts ++
          [cache: true,
           cache_keys: Keyword.keys(old_opts)
          ]
        }
    end
  end
)

# Prewarm the cache with one full run.
Orchid.run(recipe_with_cache, inputs, cache_recipe_opts)
defmodule BenchmarkRunner do
  @moduledoc false

  # Baseline: async executor, no caching hooks or stores.
  def run_without_cache(recipe, inputs) do
    opts = [executor_and_opts: {Orchid.Executor.Async, []}]
    Orchid.run(recipe, inputs, opts)
  end

  # Cached variant: caller supplies the recipe annotated with cache flags
  # plus the Stratum store/hook options.
  def run_with_cache(recipe_with_cache, inputs, cache_recipe_opts) do
    Orchid.run(recipe_with_cache, inputs, cache_recipe_opts)
  end
end
# Compare cached vs. uncached pipeline throughput (3s runtime each, plus
# memory and reduction measurements).
Benchee.run(
  %{
    "run_without_cache" => fn -> BenchmarkRunner.run_without_cache(recipe, inputs) end,
    "run_with_cache" => fn -> BenchmarkRunner.run_with_cache(recipe_with_cache, inputs, cache_recipe_opts) end
  },
  time: 3,
  memory_time: 3,
  reduction_time: 2
)