Sponsored by AppSignal
Would you like to see your link here? Contact us
Notesclub

ElixirTube Data Wizard

priv/data/wizard.livemd

ElixirTube Data Wizard

Mix.install(
  [
    {:kino, "~> 0.11.2", override: true},
    {:tesla, "~> 1.8"},
    {:jason, "~> 1.4"},
    {:openai, "~> 0.5.4"},
    {:slugify, "~> 1.3"},
    {:yaml_elixir, "~> 2.9"}
  ],
  config: [
    openai: [
      api_key: System.get_env("LB_OPEN_AI_API_KEY"),
      organization_key: System.get_env("LB_OPEN_AI_ORGANIZATION_ID"),
      http_options: [recv_timeout: 120_000]
    ],
    youtube: [
      api_key: System.get_env("LB_YOUTUBE_DATA_API_KEY")
    ]
  ]
)

Intro

This notebook will help you enrich ElixirTube library by extracting data from YouTube and optionally normalizing/transforming it with GPT3.5.

To use this notebook you need to add the following secrets to your Livebook hub:

  • YOUTUBE_DATA_API_KEY -> required, can be obtained from Google’s developer console.

Optional: if you choose to use GPT3.5, you also need to have an OpenAI account with credit balance and set these environment variables:

  • OPEN_AI_API_KEY
  • OPEN_AI_ORGANIZATION_ID

Modules and structs

You may want to scroll down to Input section to make use of these.

# Entity structs

defmodule Series do
  defstruct [:title, description: "", urls: []]
end

defmodule Speaker do
  defstruct [:name, bio: "", urls: []]
end

defmodule Playlist do
  defstruct [
    :title,
    :locations,
    :source,
    :thumbnails,
    :published_at,
    description: "",
    urls: []
  ]
end

defmodule Video do
  defstruct [
    :title,
    :raw_title,
    :speaker_names,
    :source,
    :thumbnails,
    :published_at,
    description: ""
  ]
end

# Value structs

defmodule Thumbnails do
  defstruct ~w[xs s m l xl]a
end

# Relationship:

# Series has many playlists has many videos
# Speakers has many and belongs to videos

Kino.nothing()
defmodule YouTube do
  use Tesla

  plug(Tesla.Middleware.BaseUrl, "https://youtube.googleapis.com")
  plug(Tesla.Middleware.JSON)

  @hosts ~w[www.youtube.com youtube.com]
  @parts "id,snippet,status"

  def extract_playlist_data(url) when is_binary(url) do
    with %{host: yt, path: "/playlist", query: q} when yt in @hosts <- URI.parse(url),
         %{"list" => id} when is_binary(id) <- URI.decode_query(q),
         {:ok, playlist} <- fetch_playlist(id),
         {:ok, videos} <- fetch_videos(id) do
      {:ok, %{playlist: playlist, videos: videos}}
    else
      %URI{} -> {:error, :invalid_uri}
      %{} -> {:error, :missing_playlist_id}
      error -> error
    end
  end

  defp api_key do
    Application.fetch_env!(:youtube, :api_key)
  end

  defp fetch_playlist(id) do
    query = [key: api_key(), part: @parts, id: id, maxResults: 1]

    case get("/youtube/v3/playlists", query: query) do
      {:ok, %{status: 200, body: %{"items" => [%{"id" => ^id} = data]}}} ->
        {:ok, to_playlist(data)}

      {_, response} ->
        {:error, response}
    end
  end

  defp fetch_videos(playlist_id) do
    Stream.resource(
      fn -> [key: api_key(), part: @parts, playlistId: playlist_id, maxResults: 50] end,
      fn
        :no_more_page ->
          {:halt, :no_more_page}

        query ->
          case get("/youtube/v3/playlistItems", query: query) do
            {:ok, %{status: 200, body: %{"items" => videos, "nextPageToken" => page_token}}} ->
              {videos, Keyword.put(query, :pageToken, page_token)}

            {:ok, %{status: 200, body: %{"items" => videos}}} ->
              {videos, :no_more_page}
          end
      end,
      fn _ -> :noop end
    )
    |> Stream.filter(&amp;public?/1)
    |> Stream.map(&amp;to_video/1)
    |> Enum.to_list()
    |> case do
      [] -> {:error, :fetch_videos_empty_handed}
      videos -> {:ok, videos}
    end
  end

  defp public?(%{"status" => %{"privacyStatus" => "public"}}), do: true
  defp public?(_), do: false

  defp to_playlist(%{"id" => id, "snippet" => %{"thumbnails" => t} = s}) do
    %Playlist{
      title: s["title"],
      description: s["description"],
      source: "youtube:#{id}",
      thumbnails: to_thumbnails(t),
      published_at: s["publishedAt"]
    }
  end

  defp to_video(%{"snippet" => %{"thumbnails" => t} = s}) do
    %Video{
      title: s["title"],
      raw_title: s["title"],
      description: s["description"],
      speaker_names: [],
      source: "youtube:#{s["resourceId"]["videoId"]}",
      thumbnails: to_thumbnails(t),
      published_at: s["publishedAt"]
    }
  end

  defp to_thumbnails(t) do
    %Thumbnails{
      xs: t["default"]["url"],
      s: t["medium"]["url"],
      m: t["high"]["url"],
      l: t["standard"]["url"],
      xl: t["maxres"]["url"]
    }
  end
end

Kino.nothing()
defmodule GPT do
  def retitle_and_set_speakers(videos, event_name) do
    input = Enum.map(videos, fn v -> %{title: v.raw_title, speakers: v.speaker_names} end)
    input_json = Jason.encode!(%{talks: input})

    prompt = """
      #{input_json}

      Update the input JSON above with:

      For each object:
        1. find speaker names inside the `title`
        2. set `speakers` to array of speaker names
        3. update the `title` using these steps:
          3.1. Remove speaker names
          3.2. Remove '#{event_name}' (account for occasional slight typo and variation in capitalization)

      Do not include any explanations, only return a RFC8259 compliant JSON response adhering to the input JSON structure without deviation.
    """

    max_retries = 8

    outputs =
      Enum.reduce_while(1..max_retries, [], fn nth, _ac ->
        gpt_result =
          OpenAI.completions(
            model: "gpt-3.5-turbo-instruct",
            max_tokens: ceil(String.length(input_json) * 1.5),
            prompt: prompt
          )

        with {:ok, %{choices: [%{"finish_reason" => "stop", "text" => json}]}} <- gpt_result,
             {:ok, %{"talks" => list}} <- Jason.decode(json) do
          {:halt, {:ok, list}}
        else
          error ->
            case nth do
              ^max_retries ->
                {:halt, {:gpterror, error}}

              _ ->
                {:cont, []}
            end
        end
      end)

    case outputs do
      {:ok, list} when is_list(list) ->
        videos =
          videos
          |> Enum.zip(list)
          |> Enum.map(fn {v, o} ->
            %{
              v
              | title: Map.get(o, "title", v.raw_title),
                speaker_names: Map.get(o, "speakers", v.speaker_names)
            }
          end)

        {:ok, videos}

      {:gpterror, _} = error ->
        error
    end
  end
end

Kino.nothing()
defmodule SaveData do
  @template_dir Path.join(__DIR__, ".templates")
  @templates %{
    Series => Path.join(@template_dir, "series.yml.eex"),
    Speaker => Path.join(@template_dir, "speaker.yml.eex"),
    Playlist => Path.join(@template_dir, "playlist.yml.eex"),
    Video => Path.join(@template_dir, "video.yml.eex")
  }

  def write_all_overwriting_duplicates!(data) do
    # Save speakers
    dir = Path.join(__DIR__, "speakers")
    File.mkdir_p!(dir)

    Enum.each(data.speakers, fn speaker ->
      Path.join(dir, "#{Slug.slugify(speaker.name)}.yml")
      |> write!(speaker)
    end)

    # Save series
    slug = Slug.slugify(data.series.title)
    dir = Path.join([__DIR__, "series", slug])
    File.mkdir_p!(dir)

    Path.join(dir, "series.yml")
    |> write!(data.series)

    # Save playlist
    slug = Slug.slugify(data.playlist.title)
    dir = Path.join(dir, slug)
    File.mkdir_p!(dir)

    Path.join(dir, "playlist.yml")
    |> write!(data.playlist)

    # Save videos
    dir = Path.join(dir, "media")
    File.mkdir_p!(dir)

    data.videos
    |> Enum.with_index()
    |> Enum.each(fn {video, idx} ->
      Path.join(dir, "#{idx}_#{Slug.slugify(video.title)}.yml")
      |> write!(video)
    end)
  end

  defp write!(path, data) do
    content = eval_template!(data)
    File.write!(path, content)
  end

  defp eval_template!(%strukt{} = data) do
    template = Map.fetch!(@templates, strukt)
    assigns = Map.from_struct(data)
    EEx.eval_file(template, assigns: assigns)
  end
end

Kino.nothing()

Input

"""
### Series information

*Series* is a way to organize recurring events/conferences.
As an example, `ElixirConf 2014`, `ElixirConf 2015`, etc. belongs to `ElixirConf` series.
"""
|> Kino.Markdown.new()
|> Kino.render()

series_data =
  __DIR__
  |> Path.join("series/*/series.yml")
  |> Path.wildcard()
  |> Enum.map(fn path ->
    %{"series" => data} = YamlElixir.read_from_file!(path)
    %{data | "urls" => Enum.join(data["urls"], "\n")}
  end)
  |> then(&amp;[%{} | &amp;1])

series_options =
  series_data
  |> Stream.with_index()
  |> Enum.map(fn
    {_data, 0} -> {0, ""}
    {data, at} -> {at, data["title"]}
  end)

select_series = Kino.Input.select("Select existing or create a new series", series_options)
data = %{series: %Series{}, playlist: %Playlist{}, videos: [], speakers: []}

series_data =
  select_series
  |> Kino.Input.read()
  |> then(&amp;Enum.at(series_data, &amp;1))

"### Edit series data"
|> Kino.Markdown.new()
|> Kino.render()

input_series_title =
  Kino.Input.textarea(
    "Series title (unique identifier, will overwrite existing series of the same title)",
    default: Map.get(series_data, "title", "")
  )
  |> tap(&amp;Kino.render/1)

input_series_description =
  Kino.Input.textarea("Series description", default: Map.get(series_data, "description", ""))
  |> tap(&amp;Kino.render/1)

input_series_urls =
  Kino.Input.textarea("Series URLs (one per line)",
    default: Map.get(series_data, "urls", "")
  )
  |> tap(&amp;Kino.render/1)

"""
### Video source

Enter a YouTube playlist URL for an event:
"""
|> Kino.Markdown.new()
|> Kino.render()

input_playlist_url =
  Kino.Input.textarea("YouTube playlist URL")
series = %Series{
  title: input_series_title |> Kino.Input.read() |> String.trim(),
  description: input_series_description |> Kino.Input.read() |> String.trim(),
  urls:
    input_series_urls
    |> Kino.Input.read()
    |> String.trim()
    |> String.split("\n")
    |> Enum.map(&amp;String.trim/1)
}

{:ok, %{playlist: _, videos: _} = result} =
  input_playlist_url
  |> Kino.Input.read()
  |> YouTube.extract_playlist_data()

result =
  Map.update(result, :videos, [], fn videos ->
    Enum.uniq_by(videos, &amp; &amp;1.source)
  end)

data =
  data
  |> Map.put(:series, series)
  |> Map.merge(result)

"""
### Playlist Information

**Number of videos**: #{Enum.count(data.videos)}

Edit as you see fit:
"""
|> Kino.Markdown.new()
|> Kino.render()

input_playlist_title =
  Kino.Input.textarea("Title", default: result.playlist.title)
  |> tap(&amp;Kino.render/1)

input_playlist_description =
  Kino.Input.textarea("Description", default: result.playlist.description)
  |> tap(&amp;Kino.render/1)

input_playlist_locations =
  Kino.Input.textarea("Locations (one per line)", default: result.playlist.locations)
  |> tap(&amp;Kino.render/1)

input_playlist_urls =
  Kino.Input.textarea("URLs (one per line)", default: "")
playlist =
  %{
    result.playlist
    | title: input_playlist_title |> Kino.Input.read() |> String.trim(),
      locations:
        input_playlist_locations
        |> Kino.Input.read()
        |> String.trim()
        |> String.split("\n")
        |> Enum.map(&amp;String.trim/1),
      description: input_playlist_description |> Kino.Input.read() |> String.trim(),
      urls:
        input_playlist_urls
        |> Kino.Input.read()
        |> String.trim()
        |> String.split("\n")
        |> Enum.map(&amp;String.trim/1)
  }

data = %{data | playlist: playlist}

Kino.Tree.new(data)
"### Video selection"
|> Kino.Markdown.new()
|> Kino.render()

include_all_videos? = Kino.Input.checkbox("Include all videos from the playlist?", default: true)
Kino.Markdown.new("Select videos to include:")
|> Kino.render()

include_video? = Kino.Input.read(include_all_videos?)

videos_to_include =
  data.videos
  |> Enum.with_index()
  |> Enum.map(fn {video, idx} ->
    input =
      Kino.Input.checkbox("##{idx} #{video.title}", default: include_video?)

    Kino.render(input)
    %{video: video, input: input}
  end)

Kino.Markdown.new("### Video data")
|> Kino.render()

use_gpt? = Kino.Input.checkbox("Use GPT 3.5 Turbo to set speakers and clean up video titles?")
videos =
  videos_to_include
  |> Enum.filter(fn %{input: i} -> Kino.Input.read(i) end)
  |> Enum.map(fn %{video: v} -> v end)

videos =
  case Kino.Input.read(use_gpt?) do
    false ->
      videos

    true ->
      videos
      |> Enum.chunk_every(10)
      |> Enum.map(fn ten_videos ->
        Task.async(fn -> GPT.retitle_and_set_speakers(ten_videos, data.playlist.title) end)
      end)
      |> Enum.flat_map(fn task ->
        case Task.await(task, :infinity) do
          {:ok, updated_videos} when is_list(updated_videos) ->
            updated_videos

          error ->
            Kino.Tree.new(error) |> Kino.render()
            []
        end
      end)
  end

:ok
Kino.Markdown.new("Review and edit the video data as you see fit:")
|> Kino.render()

video_data_inputs =
  Enum.map(videos, fn video ->
    frame = Kino.Frame.new()

    video_url =
      case video.source do
        "youtube:" <> id -> "https://youtu.be/#{id}"
      end

    """
    [![](#{video.thumbnails.s})](#{video_url})

    Original title:

    ```
    #{video.raw_title}
    ```
    """
    |> Kino.Markdown.new()
    |> then(&amp;Kino.Frame.render(frame, &amp;1))

    inputs = [
      title: Kino.Input.textarea("Title", default: video.title),
      description: Kino.Input.textarea("Description", default: video.description),
      speaker_names:
        Kino.Input.textarea("Speaker names (one name per line)",
          default: Enum.join(video.speaker_names, "\n")
        )
    ]

    inputs
    |> Enum.each(fn {_, input} -> Kino.Frame.append(frame, input) end)

    %{video: video, frame: frame, inputs: inputs}
  end)

video_data_inputs
|> Enum.with_index()
|> Enum.map(fn {%{frame: frame}, idx} -> {"##{idx}", frame} end)
|> Kino.Layout.tabs()

Save data

videos =
  video_data_inputs
  |> Enum.map(fn %{video: video, inputs: inputs} ->
    %{
      video
      | title:
          inputs[:title]
          |> Kino.Input.read()
          |> String.trim(),
        description:
          inputs[:description]
          |> Kino.Input.read()
          |> String.trim(),
        speaker_names:
          inputs[:speaker_names]
          |> Kino.Input.read()
          |> String.trim()
          |> String.split("\n")
          |> Enum.map(&amp;String.trim/1)
          |> Enum.reject(&amp;(&amp;1 == ""))
    }
  end)

speakers =
  videos
  |> Enum.flat_map(&amp; &amp;1.speaker_names)
  |> Enum.uniq()
  |> Enum.map(&amp;%Speaker{name: &amp;1})

Kino.Markdown.new("Data preview:") |> Kino.render()

data = %{data | videos: videos, speakers: speakers}

Kino.Tree.new(data) |> Kino.render()

Kino.Markdown.new(
  "The data above will be stored as YAML files in `#{__DIR__}` which you can further edit using any text editor."
)
# Let's do this!

SaveData.write_all_overwriting_duplicates!(data)