Web Scraper
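# Dependencies: Kino renders the Livebook form, frame, and download button; Floki
# parses HTML; HTTPoison (backed by hackney) performs the HTTP requests; Jason
# encodes the scraped results as JSON.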
Mix.install([
{:kino, "~> 0.10.0"},
{:floki, "~> 0.34.3"},
{:jason, "~> 1.4"},
{:httpoison, "~> 2.1"},
{:hackney, "~> 1.18"}
])
Define the Scraper Module
defmodule Scraper do
@moduledoc """
A simple web scraper.
"""
require Logger
alias HTTPoison, as: HTTP
def scrape_all(data) do
url_list = to_string(data.url_list) |> String.trim()
urls =
if String.length(url_list) > 0 do
String.split(url_list, ",")
|> Enum.map(&String.trim/1)
|> Enum.reject(&(&1 == ""))
else
url = data.main_url_slug |> to_string()
num_pages = get_clean_input(data.num_pages, :int)
start_from = get_clean_input(data.start_from, :int)
page_change_delta = get_clean_input(data.page_change_delta, :int)
pages_range =
case data.page_change_direction do
:increase ->
# step by page_change_delta so only every delta-th page number is generated
Enum.to_list(start_from..(start_from + (num_pages - 1) * page_change_delta)//page_change_delta)
:decrease ->
Enum.to_list(start_from..(start_from - (num_pages - 1) * page_change_delta)//-page_change_delta)
_ ->
[]
end
pages_range
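# substitute each page number into the literal "#{}" placeholder typed into the slug field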
|> Enum.map(fn page ->
url
|> String.replace("\#{}", Integer.to_string(page))
end)
end
IO.inspect(urls)
output = Enum.flat_map(urls, fn url -> get_page(Map.merge(data, %{main_url: url})) end)
IO.inspect(length(output))
output
end
def get_page(data) do
# follow the link from each main item and scrape that item's own page
if data.scrape_each_page do
page_links =
scrape(data.main_url, data.main_item_selector)
|> Enum.map(fn x ->
partial_url = Floki.attribute(x, "href") |> Enum.at(0) |> to_string()
full_url = to_string(data.relative_url_prefix) <> partial_url
full_url
end)
IO.inspect(page_links)
# go to each page, get the attributes from each page
output =
Enum.map(
page_links,
fn url ->
updated_url =
if String.length(data.main_item_url_prefix) > 0 and
!String.contains?(url, "https://") do
to_string(data.main_item_url_prefix) <> url
else
url
end
info =
case scrape_page(updated_url) do
{:ok, document} -> get_attributes(document, data)
_ -> nil
end
info
end
)
|> Enum.filter(fn x -> x != nil end)
output
else
# get_attributes for each item and then add the static information to each item
items = scrape(data.main_url, data.main_item_selector)
Enum.map(items, &get_attributes(&1, data))
|> Enum.filter(fn x -> x != nil end)
end
end
# functions for getting main items from a page
def scrape(url, main_item_selector) do
case HTTP.get(url, ["Accept": "*/*", "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36"],
ssl: [
verify_fun:
{fn _, reason, state ->
case reason do
{:bad_cert, :cert_expired} -> {:valid, state}
{:bad_cert, :unknown_ca} -> {:valid, state}
{:extension, _} -> {:valid, state}
:valid -> {:valid, state}
:valid_peer -> {:valid, state}
error -> {:fail, error}
end
end, []}
],
recv_timeout: 10000,
timeout: 25000
) do
{:ok, %HTTP.Response{status_code: 200, body: body}} ->
IO.inspect("main url body #{body}")
{:ok, document} = Floki.parse_document(body)
IO.inspect(document)
get_items(document, main_item_selector)
{:ok, %HTTP.Response{status_code: status_code, body: _body}} ->
Logger.info("Received status code #{status_code} from #{url}")
# return an empty list so a failed page does not crash callers that enumerate the result
[]
{:error, %HTTP.Error{reason: reason}} ->
Logger.error("HTTP error: #{inspect(reason)}")
[]
end
end
defp get_items(body, selector) do
Floki.find(body, selector)
end
# functions for getting information from a product page
def scrape_page(url) do
case HTTP.get(url, ["Accept": "*/*","User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36"],
ssl: [
verify_fun:
{fn _, reason, state ->
case reason do
{:bad_cert, :cert_expired} -> {:valid, state}
{:bad_cert, :unknown_ca} -> {:valid, state}
{:extension, _} -> {:valid, state}
:valid -> {:valid, state}
:valid_peer -> {:valid, state}
error -> {:fail, error}
end
end, []}
],
follow_redirect: true,
recv_timeout: 10000,
timeout: 25000
) do
{:ok, %HTTP.Response{status_code: 200, body: body}} ->
Floki.parse_document(body)
{:ok, %HTTP.Response{status_code: status_code, body: _body}} ->
Logger.info("Received status code #{status_code} from #{url}")
:error
{:error, %HTTP.Error{reason: reason}} ->
Logger.error("HTTP error: #{inspect(reason)}")
:error
_ ->
Logger.error("Unexpected response when getting #{url}")
:error
end
end
defp get_attributes(item, data) do
IO.inspect("now getting attributes")
# use the selectors and static information given in data to extract details from the item
title = get_text_from_selector(item, data.title_selector)
url_selector_1 = get_clean_input(data.item_url_selector, :str)
url_selector_2 = get_clean_input(data.item_url_selector_2, :str)
url_selector_3 = get_clean_input(data.item_url_selector_3, :str)
possible_urls =
Enum.map(
[url_selector_1, url_selector_2, url_selector_3],
fn x ->
get_attribute_from_selector(item, x, "href")
end
)
|> Enum.filter(fn x ->
String.length(x) != 0
end)
IO.inspect(possible_urls)
url = Enum.at(possible_urls, 0)
related_url = to_string(data.related_url_input) |> String.trim()
if String.length(title) == 0 or url == nil or String.length(url) == String.length(related_url) do
nil
else
url_prefix = to_string(data.relative_url_prefix) |> String.trim()
url =
if String.length(url_prefix) > 0 and
(!String.contains?(url, "https://") or !String.contains?(url, url_prefix)) do
url_prefix <> url
else
url
end
{resource_type, _} = to_string(data.resource_type_input) |> Integer.parse()
publisher = to_string(data.publisher_input) |> String.trim()
year_selector = data.year_selector
date_selector = data.date_selector
{year, date_year_month_day} =
cond do
year_selector != "" and date_selector != "" ->
{get_text_from_selector(item, year_selector),
get_text_from_selector(item, date_selector)}
year_selector != "" and date_selector == "" ->
{get_text_from_selector(item, year_selector), ""}
year_selector == "" and date_selector != "" ->
timestamp = get_text_from_selector(item, date_selector)
year =
case DateTime.from_iso8601(timestamp) do
{:ok, parsed_date, _offset} -> parsed_date.year
_ -> ""
end
{year, timestamp}
true ->
{"", ""}
end
details =
Map.new([
{"title", title},
{"url", url},
{"year", year},
{"date_year_month_day", date_year_month_day},
{"resource_type", resource_type},
{"related_url", related_url},
{"publisher", publisher},
{"is_global", false},
{"is_pdf", String.contains?(url, ".pdf")}
])
details
end
end
defp get_text_from_selector(item, selector) do
Floki.find(item, selector)
|> Floki.text()
|> to_string()
|> String.replace("Published:", "")
|> String.replace("Recent Publication:", "")
|> String.trim()
end
defp get_attribute_from_selector(item, selector, attribute) do
Floki.find(item, selector)
|> Floki.attribute(attribute)
|> Enum.at(0)
|> to_string()
|> String.trim()
end
defp get_clean_input(item, type) do
cleaned = to_string(item) |> String.trim()
output =
case type do
:int ->
{num, _} = Integer.parse(cleaned)
num
:str ->
cleaned
_ ->
cleaned
end
output
end
end
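For quick testing outside the form, the module can be called directly with a map shaped like the form data. This is a minimal sketch: the URL, selectors, and static values below are placeholders and need to be adapted to a real site; the keys mirror the form fields defined in the next section.
sample_data = %{
  main_url: "https://example.com/publications",
  main_item_selector: "div.publication",
  scrape_each_page: false,
  main_item_url_prefix: "",
  item_url_selector: "a",
  item_url_selector_2: "",
  item_url_selector_3: "",
  relative_url_prefix: "https://example.com",
  title_selector: "h3",
  year_selector: "",
  date_selector: "",
  related_url_input: "https://example.com",
  resource_type_input: "1",
  publisher_input: "Example Publisher"
}

Scraper.get_page(sample_data)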
Build the Form
url_list = Kino.Input.textarea("Enter a list of URLs separated by commas")
main_url = Kino.Input.text("Main URL")
has_pagination = Kino.Input.checkbox("Has Pagination")
num_pages = Kino.Input.text("Number of pages to scrape")
main_url_slug =
Kino.Input.text(
"Re-enter main URL with a placeholder for page number e.g. 'https://example.com/#{}'"
)
start_from = Kino.Input.text("Enter the first value to substitute into the slug placeholder")
page_change_direction =
Kino.Input.select("Increase or decrease value to get new pages",
increase: "Increase",
decrease: "Decrease"
)
page_change_delta =
Kino.Input.text(
"Enter the integer delta to get next page e.g. if pages are 1,2,3 this should be 1"
)
main_item_selector = Kino.Input.text("CSS Selector for the main item")
main_item_url_prefix = Kino.Input.text("Main item URL Prefix")
scrape_each_page = Kino.Input.checkbox("Need to scrape individual page?")
item_url_selector =
Kino.Input.text("CSS selector for main url (should be url of pdf if item is pdf)")
item_url_selector_2 = Kino.Input.text("1st Alternative CSS selector for main url")
item_url_selector_3 = Kino.Input.text("2nd Alternative CSS selector for main url")
relative_url_prefix = Kino.Input.text("Prefix to add to relative urls")
title_selector = Kino.Input.text("CSS selector for title")
# Static values to include
related_url_input = Kino.Input.text("Enter the related URL")
resource_type_input = Kino.Input.text("Enter the resource type as an integer")
publisher_input = Kino.Input.text("Enter the name of the publisher")
year_selector = Kino.Input.text("Enter the selector for year")
date_selector = Kino.Input.text("Enter the selector for date")
scraper_form =
Kino.Control.form(
[
main_url: main_url,
url_list: url_list,
has_pagination: has_pagination,
num_pages: num_pages,
main_url_slug: main_url_slug,
start_from: start_from,
page_change_direction: page_change_direction,
page_change_delta: page_change_delta,
related_url_input: related_url_input,
main_item_selector: main_item_selector,
main_item_url_prefix: main_item_url_prefix,
item_url_selector: item_url_selector,
item_url_selector_2: item_url_selector_2,
item_url_selector_3: item_url_selector_3,
scrape_each_page: scrape_each_page,
relative_url_prefix: relative_url_prefix,
title_selector: title_selector,
year_selector: year_selector,
date_selector: date_selector,
resource_type_input: resource_type_input,
publisher_input: publisher_input
],
submit: "Process"
)
Listen to Form Submissions and Process Them
frame = Kino.Frame.new()
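# the frame is a placeholder output; the download button is rendered into it for
# the client that submitted the form (see `to: event.origin` below)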
Kino.listen(
scraper_form,
fn event ->
IO.inspect(event)
# Create a function that generates the JSON output
content_fun = fn ->
results =
if event.data.has_pagination do
Scraper.scrape_all(event.data)
else
Scraper.get_page(event.data)
end
Jason.encode!(results)
end
# Create the download button
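# Kino.Download.new/2 also accepts options if a fixed file name or label is wanted, e.g.
# Kino.Download.new(content_fun, filename: "scraped.json", label: "Download JSON")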
Kino.Frame.render(frame, Kino.Download.new(content_fun), to: event.origin)
end
)
Thanks Jonatan Klosko!