Powered by AppSignal & Oban Pro

Tris-to-Quads Converter Service with Nx and Zenoh

elixir/tris_to_quads_livebook.livemd

Tris-to-Quads Converter Service with Nx and Zenoh

# Livebook setup - copy this entire cell to run
#
# Mix.install/2 starts the installed applications, so settings that an
# application reads at boot must be supplied through the `:config` option;
# calling Application.put_env/3 afterwards is too late for them to apply.
Mix.install(
  [
    {:pythonx, "~> 0.4.7"},
    {:nx, "~> 0.7"},
    {:exla, "~> 0.7"},
    {:zenoh, "~> 0.1.0"},
    {:mime, "~> 2.0"},
    {:jason, "~> 1.4"},
    {:opentelemetry_api, "~> 1.3"},
    {:opentelemetry, "~> 1.3"}
  ],
  config: [
    # OpenTelemetry: batch span processor with every exporter disabled, so no
    # telemetry leaves the notebook.
    opentelemetry: [
      span_processor: :batch,
      traces_exporter: :none,
      metrics_exporter: :none,
      logs_exporter: :none
    ]
  ]
)

# Configure Nx backend
Nx.global_default_backend(EXLA.Backend)

Logger.configure(level: :info)

Setup Python Environment with Blender

# Python project definition handed to uv: Blender's `bpy` module plus PuLP,
# pinned to the Python 3.11 series.
pyproject_toml = """
[project]
name = "tris-to-quads-livebook"
version = "0.0.0"
requires-python = "==3.11.*"
dependencies = [
  "bpy==4.5.*",
  "pulp",
]
"""

# Bootstrap the embedded Python environment from that definition
Pythonx.uv_init(pyproject_toml)

IO.puts("✓ Python environment initialized with Blender and optimization libraries")

Nx Optimization Algorithm

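This section implements the core tris-to-quads edge selection using Nx. The problem is posed as a binary linear program (maximize the total length of dissolved edges while each triangle loses at most one edge), and the solver below approximates its solution with a greedy heuristic.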

defmodule TrisToQuadsNx do
  @moduledoc """
  Nx-powered edge selection for tris-to-quads conversion.

  The task is modelled as a binary linear program: choose the edges to dissolve
  so that the total selected edge length is maximal while every triangle loses
  at most one edge. The program is not solved exactly; `solve_binary_lp/2`
  applies a greedy heuristic, so the result always satisfies the constraints
  but is not guaranteed to be optimal.
  """

  import Nx.Defn

  @doc """
  Selects the edges to dissolve for a triangulated mesh.

  ## Parameters

    * `edges` - list of `{edge_id, length, face1_id, face2_id}` tuples
    * `faces` - list of `{face_id, edge1_id, edge2_id, edge3_id}` tuples whose
      edge ids refer to entries of `edges`

  ## Returns

  The subset of `edges`, in input order, chosen by the solver. Returns `[]`
  when either `edges` or `faces` is empty.

  Raises `ArgumentError` when a face references an edge id missing from `edges`.
  """
  def optimize_tris_to_quads(edges, faces)

  # Nothing to select, and Nx cannot build zero-sized tensors.
  def optimize_tris_to_quads([], _faces), do: []
  def optimize_tris_to_quads(_edges, []), do: []

  def optimize_tris_to_quads(edges, faces) do
    {a_matrix, c_vector} = build_optimization_problem(edges, faces)

    a_matrix
    |> solve_binary_lp(c_vector)
    |> extract_selected_edges(edges)
  end

  # Builds the objective vector `c` (one edge length per column) and the
  # constraint matrix `A` (one row per face, 1 where the face uses the edge).
  # Rows follow the order of `faces`, so face ids need not be 0-based or dense.
  defp build_optimization_problem(edges, faces) do
    num_edges = length(edges)

    c_vector = Nx.tensor(Enum.map(edges, fn {_id, edge_length, _f1, _f2} -> edge_length end))

    # edge id -> column index, built once instead of scanning the list per lookup
    column_of =
      edges
      |> Enum.with_index()
      |> Map.new(fn {{edge_id, _length, _f1, _f2}, column} -> {edge_id, column} end)

    rows =
      Enum.map(faces, fn {face_id, e1, e2, e3} ->
        used = MapSet.new([e1, e2, e3], &fetch_column!(column_of, &1, face_id))

        for column <- 0..(num_edges - 1) do
          if MapSet.member?(used, column), do: 1, else: 0
        end
      end)

    {Nx.tensor(rows, type: {:s, 32}), c_vector}
  end

  defp fetch_column!(column_of, edge_id, face_id) do
    case column_of do
      %{^edge_id => column} ->
        column

      _ ->
        raise ArgumentError,
              "face #{inspect(face_id)} references unknown edge #{inspect(edge_id)}"
    end
  end

  @doc """
  Greedy approximation of the binary program `maximize c·x` subject to
  `A·x <= 1`, `x ∈ {0, 1}`.

  Edges are visited in descending order of `c_vector`; an edge is selected
  unless doing so would push any row of `a_matrix` above 1.

  ## Parameters

    * `a_matrix` - `{num_faces, num_edges}` constraint matrix of 0/1 entries
    * `c_vector` - `{num_edges}` objective weights

  Returns a `{num_edges}` tensor of 0/1 flags (type `u8`).
  For production, consider using an exact solver.
  """
  defn solve_binary_lp(a_matrix, c_vector) do
    num_vars = Nx.axis_size(c_vector, 0)

    # Column indices sorted by weight, heaviest first
    order = Nx.argsort(c_vector, direction: :desc)
    selected = Nx.as_type(Nx.broadcast(0, {num_vars}), {:u, 8})

    # `a_matrix` and `order` travel through the loop state because a defn
    # `while` body may only use tensors that are part of its state.
    {selected, _i, _a_matrix, _order} =
      while {selected, i = 0, a_matrix, order}, i < num_vars do
        edge_idx = order[i]
        keep = Nx.logical_not(check_conflicts(a_matrix, selected, edge_idx))

        # Each edge is visited once and starts at 0, so writing the 0/1 flag
        # is equivalent to a conditional set.
        selected = Nx.put_slice(selected, [edge_idx], Nx.reshape(keep, {1}))

        {selected, i + 1, a_matrix, order}
      end

    selected
  end

  @doc """
  Returns a scalar 1 when adding edge `edge_idx` to `selected` would make any
  constraint row of `a_matrix` exceed 1, and 0 otherwise.
  """
  defn check_conflicts(a_matrix, selected, edge_idx) do
    # Faces touched by the candidate edge
    edge_constraints = a_matrix[[.., edge_idx]]
    # Number of already-selected edges per face
    selected_constraints = Nx.dot(a_matrix, selected)

    Nx.any(selected_constraints + edge_constraints > 1)
  end

  # Keeps the edges whose solution flag is set; `1.0 == 1` also holds, so a
  # float-typed solution is accepted as well.
  defp extract_selected_edges(solution, edges) do
    solution_list = Nx.to_list(solution)

    edges
    |> Enum.zip(solution_list)
    |> Enum.filter(fn {_edge, flag} -> flag == 1 end)
    |> Enum.map(fn {edge, _flag} -> edge end)
  end
end

IO.puts("✓ Nx optimization module loaded")

Mock 3D Mesh Data Generation

Generate sample mesh data for testing the optimization algorithm.

defmodule MockMesh do
  @moduledoc """
  Generates mock 3D mesh data for testing tris-to-quads optimization.

  Every generator returns `{vertices, edges, faces}` where

    * `vertices` is a list of `{x, y, z}` tuples,
    * `edges` is a list of `{edge_id, length, face1_id, face2_id}` tuples sorted
      by `edge_id`; `face2_id` is `nil` for an edge used by a single triangle,
    * `faces` is a list of `{face_id, edge1_id, edge2_id, edge3_id}` tuples, one
      per triangle, referring to `edges` by id.
  """

  @doc """
  Returns a unit cube whose six sides are each split into two triangles
  (8 vertices, 18 edges, 12 faces).
  """
  def generate_cube_mesh do
    # In a real implementation, this would come from Blender via Pythonx
    vertices = [
      # Bottom face
      {0, 0, 0}, {1, 0, 0}, {1, 1, 0}, {0, 1, 0},
      # Top face
      {0, 0, 1}, {1, 0, 1}, {1, 1, 1}, {0, 1, 1}
    ]

    # Two triangles per cube side, listed as vertex indices
    triangles = [
      # Bottom
      [0, 1, 2], [0, 2, 3],
      # Top
      [4, 5, 6], [4, 6, 7],
      # Front
      [0, 1, 5], [0, 5, 4],
      # Right
      [1, 2, 6], [1, 6, 5],
      # Back
      [2, 3, 7], [2, 7, 6],
      # Left
      [3, 0, 4], [3, 4, 7]
    ]

    build_mesh(vertices, triangles)
  end

  @doc """
  Returns a coarse sphere approximation of the given `radius`: an octahedron
  with 6 vertices, 12 edges and 8 faces.

  The second argument (subdivision level) is accepted for interface
  compatibility but is currently ignored; no subdivision is performed.
  """
  def generate_sphere_mesh(radius \\ 1.0, _subdivisions \\ 2) do
    build_mesh(generate_sphere_vertices(radius), triangulate_sphere_faces())
  end

  # Simplified sphere vertices: the six axis-aligned points at `radius`.
  defp generate_sphere_vertices(radius) do
    [
      {0, 0, radius},
      {radius, 0, 0},
      {0, radius, 0},
      {-radius, 0, 0},
      {0, -radius, 0},
      {0, 0, -radius}
    ]
  end

  # Triangles connecting the octahedron vertices.
  defp triangulate_sphere_faces do
    [
      # Top pyramid
      [0, 1, 2], [0, 2, 3], [0, 3, 4], [0, 4, 1],
      # Bottom pyramid
      [5, 2, 1], [5, 3, 2], [5, 4, 3], [5, 1, 4]
    ]
  end

  # Converts vertices plus vertex-index triangles into the optimizer format.
  defp build_mesh(vertices, triangles) do
    indexed_triangles = Enum.with_index(triangles)
    edge_table = index_edges(indexed_triangles, List.to_tuple(vertices))

    edges =
      edge_table
      |> Map.values()
      |> Enum.sort_by(&elem(&1, 0))

    faces =
      Enum.map(indexed_triangles, fn {triangle, face_id} ->
        [e1, e2, e3] =
          for key <- edge_keys(triangle), do: edge_table |> Map.fetch!(key) |> elem(0)

        {face_id, e1, e2, e3}
      end)

    {vertices, edges, faces}
  end

  # Builds a map from an undirected edge key `{low_vertex, high_vertex}` to its
  # `{edge_id, length, face1_id, face2_id}` record. Ids are assigned in order
  # of first appearance.
  defp index_edges(indexed_triangles, vertex_table) do
    {edge_table, _next_id} =
      Enum.reduce(indexed_triangles, {%{}, 0}, fn {triangle, face_id}, acc ->
        triangle
        |> edge_keys()
        |> Enum.reduce(acc, &register_edge(&1, &2, face_id, vertex_table))
      end)

    edge_table
  end

  defp register_edge(key, {edge_table, next_id}, face_id, vertex_table) do
    case edge_table do
      # Second triangle using this edge: record it as the neighbouring face
      %{^key => {id, length, first_face, nil}} ->
        {Map.put(edge_table, key, {id, length, first_face, face_id}), next_id}

      # Edge already has two faces (non-manifold input): keep the first two
      %{^key => _shared_edge} ->
        {edge_table, next_id}

      _ ->
        edge = {next_id, edge_length(vertex_table, key), face_id, nil}
        {Map.put(edge_table, key, edge), next_id + 1}
    end
  end

  # The three undirected edges of a triangle, each as `{low_vertex, high_vertex}`.
  defp edge_keys([v1, v2, v3]) do
    [
      {min(v1, v2), max(v1, v2)},
      {min(v2, v3), max(v2, v3)},
      {min(v3, v1), max(v3, v1)}
    ]
  end

  # Euclidean distance between two vertices; `vertex_table` is a tuple so the
  # lookup by index is constant time.
  defp edge_length(vertex_table, {a, b}) do
    {x1, y1, z1} = elem(vertex_table, a)
    {x2, y2, z2} = elem(vertex_table, b)

    dx = x2 - x1
    dy = y2 - y1
    dz = z2 - z1

    :math.sqrt(dx * dx + dy * dy + dz * dz)
  end
end

IO.puts("✓ Mock mesh generation module loaded")

Test the Nx Optimization

# Build the cube fixture used to exercise the optimizer
{vertices, edges, faces} = MockMesh.generate_cube_mesh()

IO.puts("Generated test mesh:")

# Report the size of each mesh component
for {label, items} <- [{"Vertices", vertices}, {"Edges", edges}, {"Faces", faces}] do
  IO.puts("  #{label}: #{length(items)}")
end

# Ask the optimizer which edges should be dissolved
selected_edges = TrisToQuadsNx.optimize_tris_to_quads(edges, faces)

IO.puts("Optimization Results:")
IO.puts("  Selected edges for conversion: #{length(selected_edges)}")

# One line per chosen edge, with its length rounded to three decimals
Enum.each(selected_edges, fn {id, edge_length, _face_a, _face_b} ->
  IO.puts("    Edge #{id}: length #{Float.round(edge_length, 3)}")
end)

Zenoh Service Integration

defmodule TrisToQuadsZenohService do
  @moduledoc """
  Zenoh-based tris-to-quads conversion service.

  Subscribes to `tris_to_quads/**`. A sample whose key contains `request` is
  treated as a conversion request: its payload is decoded as JSON, the base64
  `"model_data"` field is processed, and the base64 result is published on the
  same key with `request` replaced by `response` (or by `error` when processing
  fails). Samples whose key does not contain `request` are ignored, because
  the service's own responses and errors arrive on the same subscription.

  NOTE(review): the Zenoh calls (`Zenoh.start_session/0`,
  `Zenoh.Session.declare_subscriber/3`, `Zenoh.Session.put/3`), the
  `{:zenoh_sample, sample}` message and the `key_expr`/`value` sample fields
  are kept as originally written; confirm them against the installed `:zenoh`
  version.
  """

  use GenServer
  require Logger

  @zenoh_key_expr "tris_to_quads/**"
  # Key fragment that marks a sample as a request and is swapped out when replying
  @request_marker "request"

  @doc """
  Starts the service and registers it under the module name.
  """
  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @impl true
  def init(_opts) do
    Logger.info("Starting Tris-to-Quads Zenoh Service...")

    # Start Zenoh session
    {:ok, session} = Zenoh.start_session()

    # Declare subscriber; samples are delivered to this process
    {:ok, subscriber} = Zenoh.Session.declare_subscriber(session, @zenoh_key_expr, self())

    Logger.info("✓ Zenoh service listening on #{@zenoh_key_expr}")

    {:ok, %{
      session: session,
      subscriber: subscriber,
      python_initialized: false
    }}
  end

  @impl true
  def handle_info({:zenoh_sample, sample}, state) do
    if request_key?(sample.key_expr) do
      Logger.info("Received Zenoh sample: #{inspect(sample.key_expr)}")

      # Replies go out on the session this server owns; it is captured here
      # because the task runs in a separate process without access to `state`.
      session = state.session

      Task.start(fn ->
        process_conversion_request(sample, session)
      end)
    else
      # Responses and errors published by this service match the subscription
      # too; reprocessing them would publish yet another error on the same key.
      Logger.debug("Ignoring non-request sample: #{inspect(sample.key_expr)}")
    end

    {:noreply, state}
  end

  @impl true
  def handle_info(msg, state) do
    Logger.debug("Unhandled message: #{inspect(msg)}")
    {:noreply, state}
  end

  @impl true
  def terminate(_reason, state) do
    if state.subscriber, do: Zenoh.Subscriber.undeclare(state.subscriber)
    if state.session, do: Zenoh.Session.close(state.session)
  end

  defp request_key?(key), do: is_binary(key) and String.contains?(key, @request_marker)

  # Runs one request end to end; any failure is logged and reported on the
  # error key instead of crashing silently inside the task.
  defp process_conversion_request(sample, session) do
    request = parse_request(sample)

    result =
      request
      |> fetch_model_data()
      |> decode_mime_data()
      |> process_with_blender_and_nx(request)
      |> encode_mime_data()

    publish_result(session, sample, result)
  rescue
    e ->
      Logger.error("Error processing request: #{inspect(e)}")
      publish_error(session, sample, "Processing failed: #{inspect(e)}")
  end

  defp parse_request(sample) do
    # Parse JSON payload
    Jason.decode!(sample.value)
  end

  defp fetch_model_data(%{"model_data" => data}) when is_binary(data), do: data

  defp fetch_model_data(_request) do
    raise ArgumentError, "request must be a JSON object with a string \"model_data\" field"
  end

  defp decode_mime_data(mime_text) do
    # Decode base64 MIME data back to binary
    # In real implementation, handle proper MIME parsing
    case Base.decode64(mime_text) do
      {:ok, binary} -> binary
      :error -> raise "Invalid MIME data"
    end
  end

  defp encode_mime_data(binary_data) do
    # Encode binary data as base64 MIME text
    Base.encode64(binary_data)
  end

  defp process_with_blender_and_nx(binary_data, request) do
    # This would integrate with the full Blender pipeline
    # For demo, return mock processed data

    # Requests run concurrently, so the name combines a timestamp with a
    # per-VM unique integer to avoid two tasks sharing a file.
    temp_input =
      Path.join(
        System.tmp_dir!(),
        "input_#{System.system_time(:millisecond)}_#{System.unique_integer([:positive])}.glb"
      )

    File.write!(temp_input, binary_data)

    try do
      # Call Blender processing (would use Pythonx here)
      # result = call_blender_conversion(temp_input, request)

      # For demo, simulate processing
      result = "processed_#{request["filename"]}_quads.usdc"
      "mock processed binary data for #{result}"
    after
      # Remove the temp file even when processing raises
      File.rm(temp_input)
    end
  end

  defp publish_result(session, sample, result) do
    # Publish result back via Zenoh
    result_key = String.replace(sample.key_expr, @request_marker, "response")
    Zenoh.Session.put(session, result_key, result)
  end

  defp publish_error(session, sample, error) do
    # Publish error back via Zenoh
    error_key = String.replace(sample.key_expr, @request_marker, "error")
    Zenoh.Session.put(session, error_key, error)
  end
end

# Start the service (commented out for Livebook - uncomment to run)
# {:ok, _service} = TrisToQuadsZenohService.start_link()
# IO.puts("✓ Zenoh service started")

MIME Binary Data Handling

defmodule MimeHandler do
  @moduledoc """
  Handles MIME encoding/decoding for binary 3D model data.
  Converts between binary files and text-safe MIME format for Zenoh transport.

  The "MIME text" used here is a JSON object with the keys `"content_type"`,
  `"filename"`, `"data"` (base64), `"encoding"` and `"timestamp"`.
  """

  @doc """
  Encode binary 3D model data as MIME text.

  Returns a JSON string wrapping the base64-encoded `binary_data` together
  with `filename`, `content_type` and the current UTC timestamp.
  """
  def encode_binary_data(binary_data, filename, content_type \\ "model/gltf-binary") do
    mime_data = %{
      "content_type" => content_type,
      "filename" => filename,
      "data" => Base.encode64(binary_data),
      "encoding" => "base64",
      "timestamp" => DateTime.utc_now() |> DateTime.to_iso8601()
    }

    Jason.encode!(mime_data)
  end

  @doc """
  Decode MIME text back to binary 3D model data.

  Raises `Jason.DecodeError` when `mime_text` is not valid JSON and
  `RuntimeError` when the envelope is not a JSON object, uses an encoding
  other than `"base64"`, or carries missing or invalid base64 data.
  """
  def decode_mime_data(mime_text) do
    mime_text
    |> Jason.decode!()
    |> decode_envelope()
  end

  defp decode_envelope(%{"encoding" => "base64", "data" => data}) when is_binary(data) do
    case Base.decode64(data) do
      {:ok, binary} -> binary
      :error -> raise "Invalid base64 MIME data"
    end
  end

  # Declared as base64 but the payload is missing or not a string
  defp decode_envelope(%{"encoding" => "base64"}) do
    raise "Invalid base64 MIME data"
  end

  defp decode_envelope(%{} = envelope) do
    raise "Unsupported MIME encoding: #{envelope["encoding"]}"
  end

  defp decode_envelope(_other) do
    raise "Invalid MIME envelope: expected a JSON object"
  end

  @doc """
  Load 3D model file and encode as MIME.

  The content type is derived from the file extension. Raises `RuntimeError`
  when the file cannot be read.
  """
  def load_and_encode_file(file_path) do
    case File.read(file_path) do
      {:ok, binary_data} ->
        filename = Path.basename(file_path)
        encode_binary_data(binary_data, filename, detect_content_type(filename))

      {:error, reason} ->
        raise "Failed to read file #{file_path}: #{reason}"
    end
  end

  @doc """
  Decode MIME and save to file.

  Missing parent directories of `output_path` are created first. Returns `:ok`
  and raises `RuntimeError` when the directory or file cannot be written.
  """
  def decode_and_save_file(mime_text, output_path) do
    binary_data = decode_mime_data(mime_text)

    with :ok <- File.mkdir_p(Path.dirname(output_path)),
         :ok <- File.write(output_path, binary_data) do
      :ok
    else
      {:error, reason} -> raise "Failed to write file #{output_path}: #{reason}"
    end
  end

  # Maps a file extension (case-insensitive) to the content type stored in the
  # envelope; unknown extensions fall back to a generic binary type.
  defp detect_content_type(filename) do
    case filename |> Path.extname() |> String.downcase() do
      ".glb" -> "model/gltf-binary"
      ".gltf" -> "model/gltf+json"
      ".usdc" -> "model/usd"
      ".usda" -> "model/usd"
      ".fbx" -> "application/octet-stream"
      _ -> "application/octet-stream"
    end
  end
end

IO.puts("✓ MIME handler loaded")

Complete Service Pipeline Demo

# Demo: Load a 3D file, encode as MIME, "process" it, and decode
demo_file = "path/to/your/model.glb"  # Replace with actual file path

try do
  # Step 1: Load and encode file as MIME
  IO.puts("Step 1: Loading and encoding 3D model...")
  mime_data = MimeHandler.load_and_encode_file(demo_file)
  IO.puts("✓ Encoded as MIME text (#{byte_size(mime_data)} chars)")

  # Step 2: Simulate Zenoh transport (MIME text)
  IO.puts("Step 2: MIME data ready for Zenoh transport")
  IO.puts("First 100 chars: #{String.slice(mime_data, 0, 100)}...")

  # Step 3: Decode MIME back to binary
  IO.puts("Step 3: Decoding MIME back to binary...")
  binary_data = MimeHandler.decode_mime_data(mime_data)
  IO.puts("✓ Decoded binary data (#{byte_size(binary_data)} bytes)")

  # Step 4: Save processed result
  output_file = "output/processed_quads.usdc"
  # The output directory is not created anywhere else, so make sure it exists;
  # otherwise the write fails even when the input file is valid.
  File.mkdir_p!(Path.dirname(output_file))
  # In real service, this would be the converted model
  mock_result_mime = MimeHandler.encode_binary_data(binary_data, "processed_quads.usdc")
  MimeHandler.decode_and_save_file(mock_result_mime, output_file)

  IO.puts("✓ Saved processed model to #{output_file}")

rescue
  e ->
    IO.puts("Demo failed (expected if demo file doesn't exist): #{inspect(e)}")
    IO.puts("To run demo, replace demo_file with path to actual .glb/.gltf file")
end

Integration with Full Blender Pipeline

For production use, integrate this Livebook with the full Blender processing from tris_to_quads_converter.exs:

# Production integration example
defmodule FullTrisToQuadsService do
  @moduledoc """
  Sketch of the production path: decode a MIME-wrapped model, run it through
  the Blender conversion, and return the result as MIME text.
  The Blender call itself is still a placeholder.
  """

  @doc """
  Converts the model carried by `input_mime` and returns the result as MIME
  text labelled with `output_filename` and content type `model/usd`.

  The decoded input is written to a temporary file for the conversion step;
  the file is removed afterwards, including when the conversion raises.
  """
  def process_model_via_blender(input_mime, output_filename) do
    # Decode input
    input_binary = MimeHandler.decode_mime_data(input_mime)

    # Timestamp plus a per-VM unique integer keeps concurrent calls from
    # sharing a temp file.
    temp_input =
      Path.join(
        System.tmp_dir!(),
        "input_#{System.system_time(:millisecond)}_#{System.unique_integer([:positive])}.glb"
      )

    File.write!(temp_input, input_binary)

    result_binary =
      try do
        # Call full Blender pipeline (from tris_to_quads_converter.exs)
        # This would use Pythonx to execute the Blender conversion script
        call_blender_conversion(temp_input, output_filename)
      after
        File.rm(temp_input)
      end

    # Encode result as MIME
    MimeHandler.encode_binary_data(result_binary, output_filename, "model/usd")
  end

  # Placeholder for the Pythonx/Blender conversion from
  # tris_to_quads_converter.exs; currently ignores its arguments and returns a
  # fixed binary.
  defp call_blender_conversion(_input_path, _output_filename) do
    "mock_result"
  end
end

Usage Instructions

  1. Setup: Run the setup cells to initialize Python/Blender environment
  2. Test Nx: Run the optimization test with mock mesh data
  3. Test MIME: Try the MIME encoding/decoding demo
  4. Production: Integrate with full Blender pipeline for real 3D processing
  5. Zenoh: Uncomment service startup for real-time conversion requests

Performance Notes

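  • Nx Optimization: The greedy Nx solver is intended to be faster than Python PuLP on large meshes, but it is a heuristic and may select fewer or shorter edges than an exact LP solve would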
  • MIME Encoding: Text-safe base64 transport of binary 3D data (base64 adds roughly 33% to the payload size)
  • Zenoh: Low-latency pub/sub for distributed processing
  • Blender Integration: Handles complex 3D operations that Nx cannot

This Livebook provides the complete foundation for a production tris-to-quads conversion service!