Youtube
# Handed to livebook_cluster below as its `:slug` configuration value.
slug = "youtube"

# Install the notebook dependencies and configure livebook_cluster in one go.
Mix.install(
  [
    # {:livebook_cluster, "0.1.1", git: "git@git.sr.ht:~minasmazar/livebook_cluster"},
    {:livebook_cluster, "0.1.1", path: Path.expand("~/workspace/livebook_cluster")},
    {:floki, "~> 0.36.2"},
    {:tesla, "~> 1.10"},
    {:kino, "~> 0.12.0"}
  ],
  config: [livebook_cluster: [slug: slug]]
)
Main modules
Video
defmodule Youtube.Video do
  @moduledoc """
  A YouTube video reference, identified by its `id` and/or `url`, with an optional `title`.
  """

  defstruct [:id, :url, title: nil]

  @watch_url "https://youtube.com/watch?v="

  @doc """
  Builds a video from `attrs` (a keyword list or a map).

  When no `:url` is given (or it is `nil`) and an `:id` is present, the watch URL is
  derived from the id. An explicitly given `:url` is always kept.
  """
  def new(attrs) do
    __MODULE__
    |> struct(attrs)
    |> put_default_url()
  end

  @doc """
  Returns `true` when both videos share the same non-nil `url` or the same non-nil `id`.

  Missing (`nil`) values never count as a match, so two videos that both lack an id
  are not considered equal on that basis.
  """
  def equal?(%__MODULE__{} = video, %__MODULE__{} = another) do
    same?(video.url, another.url) or same?(video.id, another.id)
  end

  @doc """
  Returns `true` when the video is missing its `title` or its `id`.
  """
  def needs_sync?(%__MODULE__{id: id, title: title}) do
    !title || !id
  end

  # A struct always carries the :url key, so Map.put_new/3 would never fill it in;
  # the missing value has to be detected explicitly.
  defp put_default_url(%__MODULE__{url: nil, id: id} = video) when not is_nil(id) do
    %{video | url: "#{@watch_url}#{id}"}
  end

  defp put_default_url(%__MODULE__{} = video), do: video

  # Equality that treats a missing value on either side as "unknown", not as a match.
  defp same?(nil, _other), do: false
  defp same?(_one, nil), do: false
  defp same?(one, other), do: one == other
end
Youtube
search/1
Searches for the given query on YouTube via yt-dlp.
get_rss/1
Given the channel ID (here is a video explaining how to find it), construct the feed URL as follows:
https://youtube.com/watch?v=CHANNEL-ID
to https://www.youtube.com/feeds/videos.xml?channel_id=CHANNEL-ID
defmodule Youtube do
  @moduledoc """
  Helpers for searching, playing and inspecting YouTube videos.

  Video metadata is obtained by running the `yt-dlp` executable; channel feed URLs
  are discovered by downloading a page and inspecting its HTML.
  """

  alias Youtube.Video

  @doc """
  Opens `url` through the system `open` command and returns the `System.cmd/2` result.
  """
  def play(url) do
    System.cmd("open", [url])
  end

  @doc """
  Searches YouTube for `query` via `yt-dlp` and returns up to `limit` videos.

  Returns a (possibly empty) list of `Youtube.Video` structs. The exit status of
  `yt-dlp` is ignored; output that contains no title/id pair yields `[]`.
  """
  def search(query, limit \\ 1) do
    {output, _status} =
      System.cmd("yt-dlp", ["ytsearch#{limit}:\"#{query}\"", "--get-id", "--get-title"])

    parse_videos(output)
  end

  @doc """
  Searches for `query` and plays the first hit.

  Returns the result of `play/1`, or the empty list when nothing was found.
  """
  def search_and_play(query) do
    with [%{id: id} | _] <- search(query, 1) do
      play("https://youtube.com/watch?v=#{id}&autoplay=true")
    end
  end

  @doc """
  Completes a video that is missing its title or id by asking `yt-dlp`.

  * A video with an id but no url first gets its watch URL derived from the id.
  * A video that has a url but lacks a title or an id is looked up with `yt-dlp`.
    If the lookup yields nothing usable, the video is returned unchanged instead
    of `nil`, so callers always get a `Youtube.Video` back.
  * Anything else is returned as is.
  """
  def fetch_info(%Video{url: nil, id: id} = vid) when is_binary(id) do
    fetch_info(%{vid | url: "https://youtube.com/watch?v=#{id}"})
  end

  def fetch_info(%Video{title: title, id: id, url: url} = vid)
      when (is_nil(title) or is_nil(id)) and is_binary(url) do
    {output, _status} = System.cmd("yt-dlp", [url, "--get-id", "--get-title"])
    IO.puts("Fetching info for #{inspect(vid)}")
    IO.puts("Received info #{inspect(output)}")

    case parse_videos(output, url: url) do
      [fetched | _] -> fetched
      # Nothing parseable came back (e.g. yt-dlp failed): keep what we already know.
      [] -> vid
    end
  end

  def fetch_info(vid), do: vid

  @doc """
  Downloads the page at `url` and returns the RSS feed URL of the channel it links to.

  Returns `nil` when the page has no matching channel link, or the `{:error, reason}`
  tuple from the HTTP client when the download fails.
  """
  def get_rss(url) do
    case get_page_html(url) do
      {:error, _reason} = error -> error
      html -> html |> parse_page() |> parse_channel_element()
    end
  end

  # Turns yt-dlp output (a title line followed by an id line, repeated) into videos.
  # Blank lines and a dangling unpaired line are skipped; `extra_attrs` are added to
  # every video that is built.
  defp parse_videos(output, extra_attrs \\ []) do
    pairs =
      output
      |> String.split("\n", trim: true)
      |> Enum.chunk_every(2)

    for [title, id] <- pairs do
      Video.new([id: id, title: title] ++ extra_attrs)
    end
  end

  # Extracts the channel id from the first <link itemprop="url"> whose href contains
  # "/channel" and builds the feed URL; nil when any step finds nothing.
  defp parse_channel_element(document) do
    with [{_tag, attributes, _children} | _] <-
           Floki.find(document, "link[itemprop='url'][href*='/channel']"),
         {"href", channel_url} <- List.keyfind(attributes, "href", 0),
         [_, id] <- Regex.run(~r[/channel/(.+)], channel_url) do
      "https://www.youtube.com/feeds/videos.xml?channel_id=#{id}"
    else
      _ -> nil
    end
  end

  defp parse_page(html) do
    Floki.parse_document!(html)
  end

  # Returns the response body on success; any other Tesla result is passed through.
  defp get_page_html(url) do
    with {:ok, %{body: body}} <- Tesla.get(url), do: body
  end
end
UX
defmodule UX do
  @moduledoc """
  GenServer that owns the notebook UI.

  The server state is the `Kino.Frame` the UI is rendered into. The UI consists of
  three forms (channel RSS lookup, search, search-and-play) and a "collection"
  frame with a Refresh button and a list of stored videos.
  """

  use GenServer

  ## Client API

  @doc "Starts the server, registered under the module name, rendering into `frame`."
  def start_link(frame \\ Kino.Frame.new()) do
    GenServer.start_link(__MODULE__, frame, name: __MODULE__)
  end

  @doc "Returns the frame currently held by the server."
  def frame, do: GenServer.call(__MODULE__, :frame)

  @doc "Builds the UI into `frame`, makes it the server's frame and returns it."
  def set_frame(frame), do: GenServer.call(__MODULE__, {:frame, frame})

  @doc "Asynchronously replaces the frame content with one link per video."
  def set_videos(videos) do
    GenServer.cast(__MODULE__, {:set_videos, videos})
  end

  ## Server callbacks

  @impl true
  def init(frame) do
    {:ok, build_ui(frame)}
  end

  @impl true
  def handle_call(:frame, _from, frame), do: {:reply, frame, frame}

  def handle_call({:frame, frame}, _from, _previous_frame) do
    frame = build_ui(frame)
    {:reply, frame, frame}
  end

  @impl true
  def handle_cast({:set_videos, videos}, frame) do
    # NOTE(review): this clears the whole frame, which also removes the forms that
    # build_ui/1 appended to it - confirm that is intended.
    Kino.Frame.clear(frame)
    Enum.each(videos, &Kino.Frame.append(frame, render_video(&1)))
    {:noreply, frame}
  end

  ## Rendering helpers

  defp render_video(video) do
    Kino.HTML.new("""
    <a href="#{escape_html(video.url)}">#{escape_html(video.title)}</a>
    """)
  end

  # Titles and URLs come from external sources, so they are escaped before being
  # interpolated into markup. `nil` becomes the empty string.
  defp escape_html(value) do
    value
    |> to_string()
    |> String.replace("&", "&amp;")
    |> String.replace("<", "&lt;")
    |> String.replace(">", "&gt;")
    |> String.replace("\"", "&quot;")
  end

  # A form with a single input named `field` and a "Submit" button.
  defp single_field_form(field, input) do
    Kino.Control.form([{field, input}], submit: "Submit")
  end

  # Builds all controls, wires their listeners and appends them to `frame`.
  # Returns `frame`.
  defp build_ui(frame) do
    channel_rss_from_url = single_field_form(:channel_url, Kino.Input.textarea("Channels:"))

    Kino.listen(channel_rss_from_url, fn event ->
      event.data.channel_url
      |> Youtube.get_rss()
      |> IO.inspect()
    end)

    search = single_field_form(:query, Kino.Input.text("Search:"))

    Kino.listen(search, fn event ->
      event.data.query
      |> Youtube.search(1)
      |> IO.inspect()
    end)

    search_and_play = single_field_form(:query, Kino.Input.text("Search and play:"))

    Kino.listen(search_and_play, fn event ->
      event.data.query
      |> Youtube.search_and_play()
      |> IO.inspect()
    end)

    collection = build_collection()

    Enum.each(
      [channel_rss_from_url, search, search_and_play, collection],
      &Kino.Frame.append(frame, &1)
    )

    frame
  end

  # A frame holding a Refresh button and a list frame that the button repopulates
  # from the "kv" node.
  defp build_collection do
    collection = Kino.Frame.new()
    refresh = Kino.Control.button("Refresh")
    Kino.Frame.append(collection, refresh)
    list = Kino.Frame.new()
    Kino.Frame.append(collection, list)

    Kino.listen(refresh, fn _event ->
      # Anything other than a list (e.g. an error or nil) is treated as "no videos".
      videos =
        case LivebookCluster.rpc("kv", {KV, :get, [:youtube]}) do
          videos when is_list(videos) -> videos
          _ -> []
        end

      Kino.Frame.clear(list)

      # NOTE(review): entries are read through `video.uri`, so they must expose a
      # `:uri` field accepted by URI.to_string/1 - verify against what is stored
      # under the :youtube key.
      Enum.each(videos, fn video ->
        url = URI.to_string(video.uri)
        IO.puts("found video #{url}")
        title = Map.get(video, :title, url)

        html =
          Kino.HTML.new("""
          <a href="#{escape_html(url)}" target="_blank">#{escape_html(title)}</a>
          """)

        Kino.Frame.append(list, html)
      end)
    end)

    collection
  end
end
# Start the UX server (registered under its module name) with a fresh default frame.
UX.start_link()
Collector
Fetches additional video info asynchronously.
defmodule Youtube.Collector do
  @moduledoc """
  GenServer that collects YouTube videos found in incoming messages.

  On start it subscribes to the "bifrost" topic through `LivebookCluster`. It
  handles `{:page, page}` and `{:event, event}` messages (presumably delivered by
  that subscription - confirm), looks for YouTube watch URLs in them and schedules
  a delayed `{:sync, video}` message for each video that still lacks an id or a
  title. Syncing fetches the missing info, stores the video in the server state,
  pushes the list to `UX` and broadcasts it to the "kv" topic.
  """

  use GenServer

  alias Youtube.Video

  @bifrost "bifrost"
  @kv "kv"
  @youtube_url_regex ~r[https?:\/\/(www\.)?youtube\.com\/watch\?v=(.{9,11})]

  @doc "Starts the collector, registered under the module name, with no videos."
  def start_link do
    GenServer.start_link(__MODULE__, [], name: __MODULE__)
  end

  @impl true
  def init(videos) do
    LivebookCluster.subscribe(@bifrost)
    {:ok, schedule_next(%{videos: videos}, :soon)}
  end

  # Periodic tick: currently it only re-arms the timer.
  @impl true
  def handle_info(:run, state) do
    {:noreply, schedule_next(state)}
  end

  # A page may carry its own :url plus lists of :articles and :links.
  def handle_info({:page, page}, state) do
    # For the page itself the complete URL is kept, not just the matched portion.
    with url when is_binary(url) <- Map.get(page, :url),
         true <- Regex.match?(@youtube_url_regex, url) do
      sync_later(Video.new(url: url))
    end

    page |> Map.get(:articles, []) |> Enum.each(&sync_first_match/1)
    page |> Map.get(:links, []) |> Enum.each(&sync_first_match/1)

    {:noreply, state}
  end

  # "copy" events are searched in their :selection, every other event in its :html.
  def handle_info({:event, event}, state) do
    text =
      case Map.get(event, :type) do
        "copy" -> Map.get(event, :selection) || ""
        _other -> Map.get(event, :html) || ""
      end

    sync_first_match(text)
    {:noreply, state}
  end

  def handle_info({:sync, video}, state) do
    known_video = Enum.find(state.videos, &Video.equal?(&1, video))

    videos =
      [Youtube.fetch_info(known_video || video) | state.videos]
      # Keep only real videos with an id; anything else (including a failed lookup
      # that produced no struct) is dropped instead of crashing the server.
      |> Enum.filter(fn
        %Video{id: id} -> not is_nil(id)
        _other -> false
      end)
      |> Enum.uniq_by(fn %Video{id: id} -> id end)

    IO.puts("Collector has now #{length(videos)}")
    UX.set_videos(videos)
    LivebookCluster.broadcast(@kv, {:set, :videos, videos})

    {:noreply, %{state | videos: videos}}
  end

  # Defining handle_info/2 clauses replaces GenServer's default, so without this
  # clause any unexpected message would crash the server.
  def handle_info(_message, state), do: {:noreply, state}

  defp schedule_next(state), do: schedule_next(state, 120_000)
  defp schedule_next(state, :soon), do: schedule_next(state, 4_000)

  defp schedule_next(state, timeout) do
    Process.send_after(self(), :run, timeout)
    state
  end

  # Queues a sync for the first YouTube watch URL found in `text`, if any.
  # Only the first match per text is considered.
  defp sync_first_match(text) do
    case Regex.run(@youtube_url_regex, text) do
      [url | _captures] -> sync_later(Video.new(url: url))
      _no_match -> nil
    end
  end

  # Schedules a {:sync, video} message after a randomized 5-15 second delay, but
  # only for videos that still miss their id or title.
  defp sync_later(video) do
    if Video.needs_sync?(video) do
      timeout = 5_000 + Enum.random(0..10_000)
      IO.puts("Syncing video #{inspect(video)} in #{timeout}msecs")
      Process.send_after(self(), {:sync, video}, timeout)
    end
  end
end
# Start the collector server (registered under its module name) with an empty video list.
Youtube.Collector.start_link()
Usage
# Build the UI into a brand-new frame and make it the UX server's frame;
# the call returns that frame.
UX.set_frame(Kino.Frame.new())