Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

Youtube

livebooks/youtube.livemd

Youtube

# Slug passed to livebook_cluster through its application config below.
slug = "youtube"

# Notebook dependencies. livebook_cluster is loaded from a local checkout;
# the git remote is kept as a comment for reference.
dependencies = [
  # {:livebook_cluster, "0.1.1", git: "git@git.sr.ht:~minasmazar/livebook_cluster"},
  {:livebook_cluster, "0.1.1", path: Path.expand("~/workspace/livebook_cluster")},
  {:floki, "~> 0.36.2"},
  {:tesla, "~> 1.10"},
  {:kino, "~> 0.12.0"}
]

# Application environment set at install time.
app_config = [livebook_cluster: [slug: slug]]

Mix.install(dependencies, config: app_config)

Main modules

Video

defmodule Youtube.Video do
  @moduledoc """
  A YouTube video reference, identified by its `id`, its watch `url`, or both.
  """

  defstruct [:id, :url, title: nil]

  @doc """
  Builds a video from a keyword list or map of attributes.

  When no `:url` is given but an `:id` is, the watch URL is derived from the id.
  """
  def new(attrs) do
    __MODULE__
    |> struct(attrs)
    |> put_default_url()
  end

  @doc """
  Returns `true` when both videos share the same non-nil `url` or the same
  non-nil `id`. Fields that are `nil` never count as a match.
  """
  def equal?(%__MODULE__{} = video, %__MODULE__{} = another) do
    same_present?(video.url, another.url) or same_present?(video.id, another.id)
  end

  @doc """
  Returns `true` when the video is missing its `title` or its `id`.
  """
  def needs_sync?(%__MODULE__{id: id, title: title}) do
    !title || !id
  end

  # A struct always carries the :url key, so Map.put_new/3 can never fill it in;
  # the nil value has to be matched explicitly.
  defp put_default_url(%__MODULE__{url: nil, id: id} = video) when not is_nil(id) do
    %{video | url: "https://youtube.com/watch?v=#{id}"}
  end

  defp put_default_url(%__MODULE__{} = video), do: video

  # Two missing values are "unknown", not "equal".
  defp same_present?(nil, _other), do: false
  defp same_present?(value, other), do: value == other
end

Youtube

search/1

Searches YouTube for the given query via yt-dlp.

get_rss/1

Given the channel ID (here is a video explaining how to find it), construct the feed URL as follows:

https://www.youtube.com/channel/CHANNEL-ID becomes https://www.youtube.com/feeds/videos.xml?channel_id=CHANNEL-ID

defmodule Youtube do
  @moduledoc """
  Helpers for searching, playing and inspecting YouTube videos.

  Video metadata is obtained by shelling out to `yt-dlp`; channel feed URLs are
  discovered by fetching a page with Tesla and parsing it with Floki.
  """

  alias Youtube.Video

  @doc """
  Opens `url` with the system `open` command and returns the `System.cmd/2` result.
  """
  def play(url) do
    System.cmd("open", [url])
  end

  @doc """
  Searches YouTube for `query` via `yt-dlp` and returns a list of `Youtube.Video`
  structs (at most `limit` results are requested).
  """
  def search(query, limit \\ 1) do
    # NOTE(review): the query is wrapped in literal double quotes. System.cmd/3
    # does not go through a shell, so yt-dlp receives them as part of the search
    # term - confirm this is intended.
    {output, _status} =
      System.cmd("yt-dlp", ["ytsearch#{limit}:\"#{query}\"", "--get-id", "--get-title"])

    for {title, id} <- parse_title_id_pairs(output) do
      Video.new(id: id, title: title)
    end
  end

  @doc """
  Searches for `query` and plays the first result. When the search yields
  nothing, the (empty) search result is returned unchanged.
  """
  def search_and_play(query) do
    with [%{id: id} | _] <- search(query, 1) do
      play("https://youtube.com/watch?v=#{id}&autoplay=true")
    end
  end

  @doc """
  Fills in the missing `id`/`title` of a video by asking `yt-dlp`.

  Videos that already have both fields, or that have neither a URL nor an id to
  look up, are returned untouched. When `yt-dlp` produces no usable output the
  original video is returned as well, so callers always get a `Youtube.Video`.
  """
  def fetch_info(%Video{url: nil, id: id} = vid) when is_binary(id) do
    fetch_info(%{vid | url: "https://youtube.com/watch?v=#{id}"})
  end

  def fetch_info(%Video{title: title, id: id, url: url} = vid)
      when (is_nil(title) or is_nil(id)) and is_binary(url) do
    IO.puts("Fetching info for #{inspect(vid)}")
    {output, _status} = System.cmd("yt-dlp", [url, "--get-id", "--get-title"])
    IO.puts("Received info #{inspect(output)}")

    case parse_title_id_pairs(output) do
      [{fetched_title, fetched_id} | _] ->
        Video.new(id: fetched_id, title: fetched_title, url: url)

      [] ->
        # Nothing parsed (e.g. yt-dlp failed): keep what we already know.
        vid
    end
  end

  def fetch_info(vid), do: vid

  @doc """
  Fetches the page at `url` and returns the RSS feed URL of the channel it
  links to.

  Returns `nil` when the page has no channel link, and passes through the
  non-matching value from `Tesla.get/1` (e.g. `{:error, reason}`) when the page
  cannot be fetched.
  """
  def get_rss(url) do
    with {:ok, html} <- get_page_html(url) do
      html
      |> parse_page()
      |> parse_channel_element()
    end
  end

  # Extracts the channel id from the first `<link itemprop="url">` pointing at
  # a /channel URL and builds the feed URL from it.
  defp parse_channel_element(document) do
    with [{_tag, attrs, _children} | _] <-
           Floki.find(document, "link[itemprop='url'][href*='/channel']"),
         {"href", channel_url} <- List.keyfind(attrs, "href", 0),
         [_, id] <- Regex.run(~r[/channel/(.+)], channel_url) do
      "https://www.youtube.com/feeds/videos.xml?channel_id=#{id}"
    else
      _no_channel -> nil
    end
  end

  defp parse_page(html) do
    Floki.parse_document!(html)
  end

  # Returns {:ok, body} on success; any other Tesla result is passed through.
  defp get_page_html(url) do
    with {:ok, %{body: body}} <- Tesla.get(url), do: {:ok, body}
  end

  # yt-dlp prints the title and the id on alternating lines. Pair them up and
  # drop any incomplete trailing chunk (such as the one left by the final newline).
  defp parse_title_id_pairs(output) do
    output
    |> String.split("\n")
    |> Enum.chunk_every(2)
    |> Enum.flat_map(fn
      [title, id] -> [{title, id}]
      _incomplete -> []
    end)
  end
end

UX

defmodule UX do
  @moduledoc """
  GenServer that owns the notebook UI.

  Its state is the `Kino.Frame` into which the forms (channel RSS lookup,
  search, search-and-play) and the video collection are rendered.
  """

  use GenServer

  # Client API

  def start_link(frame \\ Kino.Frame.new()) do
    GenServer.start_link(__MODULE__, frame, name: __MODULE__)
  end

  @doc "Returns the frame currently holding the UI."
  def frame, do: GenServer.call(__MODULE__, :frame)

  @doc "Builds the UI inside `frame`, makes it the current frame and returns it."
  def set_frame(frame), do: GenServer.call(__MODULE__, {:frame, frame})

  @doc "Replaces the frame content with one link per video."
  def set_videos(videos) do
    GenServer.cast(__MODULE__, {:set_videos, videos})
  end

  # Server callbacks

  @impl true
  def init(frame) do
    {:ok, build_ui(frame)}
  end

  @impl true
  def handle_call(:frame, _from, frame), do: {:reply, frame, frame}

  def handle_call({:frame, new_frame}, _from, _old_frame) do
    frame = build_ui(new_frame)
    {:reply, frame, frame}
  end

  @impl true
  def handle_cast({:set_videos, videos}, frame) do
    # NOTE(review): this clears the same frame build_ui/1 appended the forms to,
    # so the forms disappear once videos are set - confirm that is intended.
    Kino.Frame.clear(frame)
    Enum.each(videos, fn video -> Kino.Frame.append(frame, render_video(video)) end)
    {:noreply, frame}
  end

  defp render_video(video) do
    link_html(video.url, video.title)
  end

  # Renders an anchor element. URL and label come from external sources
  # (yt-dlp output, scraped pages), so both are escaped before interpolation.
  defp link_html(url, label, extra_attrs \\ "") do
    Kino.HTML.new("""
    <a href="#{escape_html(url)}"#{extra_attrs}>#{escape_html(label)}</a>
    """)
  end

  defp escape_html(value) do
    value
    |> to_string()
    |> String.replace("&", "&amp;")
    |> String.replace("<", "&lt;")
    |> String.replace(">", "&gt;")
    |> String.replace("\"", "&quot;")
  end

  # A form with a single input and a "Submit" button.
  defp single_input_form(field, input) do
    Kino.Control.form([{field, input}], submit: "Submit")
  end

  # Appends every control to `frame`, wires up the listeners and returns `frame`.
  defp build_ui(frame) do
    channel_rss_from_url = single_input_form(:channel_url, Kino.Input.textarea("Channels:"))

    Kino.listen(channel_rss_from_url, fn event ->
      IO.puts(inspect(Youtube.get_rss(event.data.channel_url)))
    end)

    search = single_input_form(:query, Kino.Input.text("Search:"))

    Kino.listen(search, fn event ->
      IO.puts(inspect(Youtube.search(event.data.query, 1)))
    end)

    search_and_play = single_input_form(:query, Kino.Input.text("Search and play:"))

    Kino.listen(search_and_play, fn event ->
      IO.puts(inspect(Youtube.search_and_play(event.data.query)))
    end)

    collection = Kino.Frame.new()
    refresh = Kino.Control.button("Refresh")
    Kino.Frame.append(collection, refresh)
    list = Kino.Frame.new()
    Kino.Frame.append(collection, list)

    Kino.listen(refresh, fn _event ->
      videos =
        case LivebookCluster.rpc("kv", {KV, :get, [:youtube]}) do
          videos when is_list(videos) -> videos
          _ -> []
        end

      Kino.Frame.clear(list)

      Enum.each(videos, fn video ->
        # NOTE(review): entries are read through `video.uri` (a URI), whereas
        # Youtube.Video has a `url` field - confirm the shape stored under :youtube.
        url = URI.to_string(video.uri)
        IO.puts("found video #{url}")
        # Fall back to the URL when the title is absent or nil.
        title = Map.get(video, :title) || url
        Kino.Frame.append(list, link_html(url, title, ~s( target="_blank")))
      end)
    end)

    Enum.each(
      [channel_rss_from_url, search, search_and_play, collection],
      &Kino.Frame.append(frame, &1)
    )

    frame
  end
end

# Start the UI server; with no argument it builds the UI in a fresh Kino.Frame.
UX.start_link()

Collector

Fetches additional video info asynchronously.

defmodule Youtube.Collector do
  @moduledoc """
  GenServer that collects YouTube videos seen in messages from the "bifrost"
  topic.

  Every YouTube watch URL found in an incoming page or event is scheduled for a
  delayed sync. A sync fetches the missing id/title through
  `Youtube.fetch_info/1`, stores the video in the server state, pushes the list
  to `UX` and broadcasts it on the "kv" topic.
  """

  use GenServer

  alias Youtube.Video

  @bifrost "bifrost"
  @kv "kv"
  @youtube_url_regex ~r[https?:\/\/(www\.)?youtube\.com\/watch\?v=(.{9,11})]

  def start_link do
    GenServer.start_link(__MODULE__, [], name: __MODULE__)
  end

  @impl true
  def init(videos) do
    LivebookCluster.subscribe(@bifrost)
    {:ok, schedule_next(%{videos: videos}, :soon)}
  end

  # Periodic tick: currently only re-arms the timer.
  @impl true
  def handle_info(:run, state) do
    {:noreply, schedule_next(state)}
  end

  # A page was seen: sync the page URL itself (kept in full when it is a
  # YouTube watch URL) plus the first watch URL found in each article and link.
  def handle_info({:page, page}, state) do
    page_url = Map.get(page, :url)

    if is_binary(page_url) and Regex.match?(@youtube_url_regex, page_url) do
      sync_later(Video.new(url: page_url))
    end

    page |> Map.get(:articles, []) |> Enum.each(&sync_first_match/1)
    page |> Map.get(:links, []) |> Enum.each(&sync_first_match/1)

    {:noreply, state}
  end

  # A browser event: "copy" events are scanned in their selection, every other
  # event type in its html.
  def handle_info({:event, event}, state) do
    text =
      case Map.get(event, :type) do
        "copy" -> Map.get(event, :selection)
        _other -> Map.get(event, :html)
      end

    sync_first_match(text)

    {:noreply, state}
  end

  def handle_info({:sync, video}, state) do
    known_video = Enum.find(state.videos, &Video.equal?(&1, video))
    fetched = Youtube.fetch_info(known_video || video)

    videos =
      [fetched | state.videos]
      # Keep only videos with an id; tolerate a non-video result from
      # fetch_info instead of crashing on it.
      |> Enum.filter(fn
        %Video{id: id} -> id
        _other -> false
      end)
      |> Enum.uniq_by(fn %Video{id: id} -> id end)

    IO.puts("Collector has now #{length(videos)}")
    UX.set_videos(videos)
    # NOTE(review): UX's refresh handler reads key :youtube from KV while this
    # broadcast sets :videos - confirm the two are meant to differ.
    LivebookCluster.broadcast(@kv, {:set, :videos, videos})

    {:noreply, %{state | videos: videos}}
  end

  # Defining handle_info/2 clauses replaces the default catch-all, so any other
  # message arriving on the subscribed topic would crash the server without this.
  def handle_info(_other, state), do: {:noreply, state}

  defp schedule_next(state), do: schedule_next(state, 120_000)
  defp schedule_next(state, :soon), do: schedule_next(state, 4_000)

  defp schedule_next(state, timeout) do
    Process.send_after(self(), :run, timeout)
    state
  end

  # Schedules a sync for the first YouTube watch URL found in `text`, if any.
  defp sync_first_match(text) when is_binary(text) do
    case Regex.run(@youtube_url_regex, text) do
      [url | _groups] -> sync_later(Video.new(url: url))
      _no_match -> nil
    end
  end

  defp sync_first_match(_not_text), do: nil

  # Sends {:sync, video} to this process after a 5-15 second randomized delay,
  # but only for videos still missing their id or title.
  defp sync_later(video) do
    if Video.needs_sync?(video) do
      timeout = 5_000 + Enum.random(0..10_000)
      IO.puts("Syncing video #{inspect(video)} in #{timeout}msecs")
      Process.send_after(self(), {:sync, video}, timeout)
    end
  end
end

# Start the collector with an empty video list.
Youtube.Collector.start_link()

Usage

# Build the UI inside a new frame; the call returns that frame.
UX.set_frame(Kino.Frame.new())