Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

Web Scraper

main.livemd

Web Scraper

Mix.install([
  {:kino, "~> 0.10.0"},
  {:floki, "~> 0.34.3"},
  {:jason, "~> 1.4"},
  {:httpoison, "~> 2.1"},
  {:hackney, "~> 1.18"}
])

Define the Scraper Module

defmodule Scraper do
  @moduledoc """
  A simple web sraper
  """

  require Logger
  alias HTTPoison, as: HTTP

  def scrape_all(data) do
    url_list = to_string(data.url_list) |> String.trim()

    urls =
      if String.length(url_list) > 0 do
        String.split(url_list, ",")
        |> Enum.map(&String.trim(&1))
        |> Enum.filter(fn x -> x != "" end)
      else
        url = data.main_url_slug |> to_string()

        num_pages = get_clean_input(data.num_pages, :int)

        start_from = get_clean_input(data.start_from, :int)

        page_change_delta = get_clean_input(data.page_change_delta, :int)

        pages_range =
          case data.page_change_direction do
            :increase ->
              Enum.to_list(start_from..(start_from + (num_pages - 1) * page_change_delta))

            :decrease ->
              Enum.to_list(start_from..(start_from - (num_pages - 1) * page_change_delta))

            _ ->
              []
          end

        pages_range
        |> Enum.map(fn page ->
          url
          |> String.replace("\#{}", Integer.to_string(page))
        end)
      end

    IO.inspect(urls)
    output = for(x <- urls, do: get_page(Map.merge(data, %{main_url: x}))) |> List.flatten()

    IO.inspect(length(output))
    output
  end

  def get_page(data) do
    # should run scrape on a single page

    if data.scrape_each_page == true do
      page_links =
        scrape(data.main_url, data.main_item_selector)
        |> Enum.map(fn x ->
          partial_url = Floki.attribute(x, "href") |> Enum.at(0) |> to_string()
          full_url = to_string(data.relative_url_prefix) <> partial_url
          full_url
        end)

      IO.inspect(page_links)
      # go to each page, get the attributes from each page
      output =
        Enum.map(
          page_links,
          fn url ->
            updated_url =
              if String.length(data.main_item_url_prefix) > 0 and
                   !String.contains?(url, "https://") do
                to_string(data.main_item_url_prefix) <> url
              else
                url
              end

            info =
              case scrape_page(updated_url) do
                {:ok, document} -> get_attributes(document, data)
                _ -> nil
              end

            info
          end
        )
        |> Enum.filter(fn x -> x != nil end)

      output
    else
      # get_attributes for each item and then add the static information to each item
      items = scrape(data.main_url, data.main_item_selector)

      Enum.map(items, &amp;get_attributes(&amp;1, data))
      |> Enum.filter(fn x -> x != nil end)
    end
  end

  # functions for getting main items from a page
  def scrape(url, main_item_selector) do
    case HTTP.get(url, ["Accept": "*/*", "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36"],
           ssl: [
             verify_fun:
               {fn _, reason, state ->
                  case reason do
                    {:bad_cert, :cert_expired} -> {:valid, state}
                    {:bad_cert, :unknown_ca} -> {:valid, state}
                    {:extension, _} -> {:valid, state}
                    :valid -> {:valid, state}
                    :valid_peer -> {:valid, state}
                    error -> {:fail, error}
                  end
                end, []}
           ],
           recv_timeout: 10000,
           timeout: 25000
         ) do
      {:ok, %HTTP.Response{status_code: 200, body: body}} ->
        IO.inspect("main url body #{body}")
        {:ok, document} = Floki.parse_document(body)
        IO.inspect(document)
        get_items(document, main_item_selector)

      {:ok, %HTTP.Response{status_code: status_code, body: _body}} ->
        Logger.info("Received status code #{status_code} from #{url}")
        :error

      {:error, %HTTP.Error{reason: reason}} ->
        Logger.error("HTTP Error: #{reason}")
        :error
    end
  end

  defp get_items(body, selector) do
    items =
      body
      |> Floki.find(selector)

    items
  end

  # functions from getting information from a product page
  def scrape_page(url) do
    case HTTP.get(url, ["Accept": "*/*","User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36"],
           ssl: [
             verify_fun:
               {fn _, reason, state ->
                  case reason do
                    {:bad_cert, :cert_expired} -> {:valid, state}
                    {:bad_cert, :unknown_ca} -> {:valid, state}
                    {:extension, _} -> {:valid, state}
                    :valid -> {:valid, state}
                    :valid_peer -> {:valid, state}
                    error -> {:fail, error}
                  end
                end, []}
           ],
           follow_redirect: true,
           recv_timeout: 10000,
           timeout: 25000
         ) do
      {:ok, %HTTP.Response{status_code: 200, body: body}} ->
        Floki.parse_document(body)

      {:ok, %HTTP.Response{status_code: status_code, body: _body}} ->
        Logger.info("Received status code #{status_code} from #{url}")
        :error

      {:error, %HTTP.Error{reason: reason}} ->
        Logger.error("HTTP Error: #{reason}")
        :error

      _ ->
        IO.inspect("Error getting url #{url}")
        :error
    end
  end

  defp get_attributes(item, data) do
    IO.inspect("now getting attributes")
    # use the selectors and static information given in data to extract details from the item
    title = get_text_from_selector(item, data.title_selector)
    url_selector_1 = get_clean_input(data.item_url_selector, :str)
    url_selector_2 = get_clean_input(data.item_url_selector_2, :str)
    url_selector_3 = get_clean_input(data.item_url_selector_3, :str)

    possible_urls =
      Enum.map(
        [url_selector_1, url_selector_2, url_selector_3],
        fn x ->
          get_attribute_from_selector(item, x, "href")
        end
      )
      |> Enum.filter(fn x ->
        String.length(x) != 0
      end)

    IO.inspect(possible_urls)
    url = Enum.at(possible_urls, 0)

    related_url = to_string(data.related_url_input) |> String.trim()

    if String.length(title) == 0 or url == nil or String.length(url) == String.length(related_url) do
      nil
    else
      url_prefix = to_string(data.relative_url_prefix) |> String.trim()

      url =
        if String.length(url_prefix) > 0 and
             (!String.contains?(url, "https://") or !String.contains?(url, url_prefix)) do
          url_prefix <> url
        else
          url
        end

      {resource_type, _} = to_string(data.resource_type_input) |> Integer.parse()

      publisher = to_string(data.publisher_input) |> String.trim()

      year_selector = data.year_selector
      date_selector = data.date_selector

      {year, date_year_month_day} =
        cond do
          year_selector != "" and date_selector != "" ->
            {get_text_from_selector(item, year_selector),
             get_text_from_selector(item, date_selector)}

          year_selector != "" and date_selector == "" ->
            {get_text_from_selector(item, year_selector), ""}

          year_selector == "" and date_selector != "" ->
            timestamp = get_text_from_selector(item, date_selector)
            parsed_date_tuple = DateTime.from_iso8601(timestamp)

            year =
              case parsed_date_tuple do
                {_, parsed_date, _} -> parsed_date.year
                _ -> ""
              end

            {year, timestamp}

          true ->
            {"", ""}
        end

      details =
        Map.new([
          {"title", title},
          {"url", url},
          {"year", year},
          {"date_year_month_day", date_year_month_day},
          {"resource_type", resource_type},
          {"related_url", related_url},
          {"publisher", publisher},
          {"is_global", false},
          {"is_pdf", String.contains?(url, ".pdf")}
        ])

      details
    end
  end

  defp get_text_from_selector(item, selector) do
    Floki.find(item, selector)
    |> Floki.text()
    |> to_string()
    |> String.replace("Published:", "")
    |> String.replace("Recent Publication:", "")
    |> String.trim()
  end

  defp get_attribute_from_selector(item, selector, attribute) do
    Floki.find(item, selector)
    |> Floki.attribute(attribute)
    |> Enum.at(0)
    |> to_string()
    |> String.trim()
  end

  defp get_clean_input(item, type) do
    cleaned = to_string(item) |> String.trim()

    output =
      case type do
        :int ->
          {num, _} = Integer.parse(cleaned)
          num

        :str ->
          cleaned

        _ ->
          cleaned
      end

    output
  end
end

Build the Form

url_list = Kino.Input.textarea("Enter list of urls separated by commas")
main_url = Kino.Input.text("Main URL")

has_pagination = Kino.Input.checkbox("Has Pagination")
num_pages = Kino.Input.text("Number of pages to scrape")

main_url_slug =
  Kino.Input.text(
    "Re-enter main URL with a placeholder for page number e.g. 'https://example.com/#{}'"
  )

start_from = Kino.Input.text("Enter first value to apply to slug")

page_change_direction =
  Kino.Input.select("Increase or decrease value to get new pages",
    increase: "Increase",
    decrease: "Decrease"
  )

page_change_delta =
  Kino.Input.text(
    "Enter the integer delta to get next page e.g. if pages are 1,2,3 this should be 1"
  )

main_item_selector = Kino.Input.text("CSS Selector for the main item")
main_item_url_prefix = Kino.Input.text("Main item URL Prefix")
scrape_each_page = Kino.Input.checkbox("Need to scrape individual page?")

item_url_selector =
  Kino.Input.text("CSS selector for main url (should be url of pdf if item is pdf)")

item_url_selector_2 = Kino.Input.text("1st Alternative CSS selector for main url")
item_url_selector_3 = Kino.Input.text("2nd Alternative CSS selector for main url")
relative_url_prefix = Kino.Input.text("Prefix to add to relative urls")
title_selector = Kino.Input.text("CSS selector for title")

# Static values to include
related_url_input = Kino.Input.text("Enter the related url")
resource_type_input = Kino.Input.text("Enter the resource type as integer")
publisher_input = Kino.Input.text("Enter the name of publisher")
year_selector = Kino.Input.text("Enter the selector for year")
date_selector = Kino.Input.text("Enter the selector for date")

scraper_form =
  Kino.Control.form(
    [
      main_url: main_url,
      url_list: url_list,
      has_pagination: has_pagination,
      num_pages: num_pages,
      main_url_slug: main_url_slug,
      start_from: start_from,
      page_change_direction: page_change_direction,
      page_change_delta: page_change_delta,
      related_url_input: related_url_input,
      main_item_selector: main_item_selector,
      main_item_url_prefix: main_item_url_prefix,
      item_url_selector: item_url_selector,
      item_url_selector_2: item_url_selector_2,
      item_url_selector_3: item_url_selector_3,
      scrape_each_page: scrape_each_page,
      relative_url_prefix: relative_url_prefix,
      title_selector: title_selector,
      year_selector: year_selector,
      date_selector: date_selector,
      resource_type_input: resource_type_input,
      publisher_input: publisher_input
    ],
    submit: "Process"
  )

Listen to Form Submissions and Process them

frame = Kino.Frame.new()
Kino.listen(
  scraper_form,
  fn event ->
    IO.inspect(event)
    # Create a function that generates the JSON output
    content_fun = fn ->
      output =
        if event.data.has_pagination == true do
          Scraper.scrape_all(event.data)
          |> Jason.encode!()
        else
          Scraper.get_page(event.data)
          |> Jason.encode!()
        end

      output
    end

    # Create the download button
    Kino.Frame.render(frame, Kino.Download.new(content_fun), to: event.origin)
  end
)

Thanks Jonatan Klosko!