Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

LogWatcher

logwatcher.livemd

LogWatcher

Mix.install([
  {:file_system, "~> 1.0"},
  {:csv, "~> 3.2"}
])

Section

defmodule Helper do
  @moduledoc """
  Incremental reader for growing CSV log files.

  Remembers the last observed size of every file in the `:file_sizes` map of the
  caller's state and, on each call, reads only the bytes written since then.
  """

  @doc """
  Reads the bytes appended to `path` since the size recorded in `state.file_sizes`.

  Returns `{new_state, rows}` where `new_state` has the current size of `path`
  stored under `:file_sizes`, and `rows` is either `[]` (nothing new) or the
  enumerable produced by `CSV.decode/2` for the new bytes, keyed by `headers/0`.

  A path that is not yet tracked is read from offset 0. If the file is now smaller
  than the recorded size (it was truncated or replaced), reading also restarts
  from offset 0 instead of requesting a negative number of bytes.

  Raises if the file cannot be stat-ed, opened, or read.
  """
  def get_data_and_update_state(%{file_sizes: file_sizes} = state, path) do
    previous_size = Map.get(file_sizes, path, 0)
    current_size = get_size(path)

    # A shrinking file means the old offset no longer points at valid data.
    offset = if current_size < previous_size, do: 0, else: previous_size

    new_state = %{state | file_sizes: Map.put(file_sizes, path, current_size)}

    case read_range(path, offset, current_size - offset) do
      "" -> {new_state, []}
      data -> {new_state, CSV.decode([data], headers: headers())}
    end
  end

  # Reads `length` bytes of `path` starting at `offset`; "" when there is nothing to read.
  defp read_range(_path, _offset, 0), do: ""

  defp read_range(path, offset, length) do
    # File.open!/3 closes the handle once the callback returns or raises.
    case File.open!(path, [:read, :binary], &:file.pread(&1, offset, length)) do
      {:ok, data} -> data
      # The file shrank between the stat and the read: nothing is available.
      :eof -> ""
      {:error, reason} -> raise File.Error, reason: reason, action: "read", path: path
    end
  end

  # Column names applied, in order, to every decoded CSV row.
  # The names appear to follow PostgreSQL's csvlog column layout -- TODO confirm
  # against the server version in use.
  defp headers do
    [
      :log_time,
      :user_name,
      :database_name,
      :process_id,
      :connection_from,
      :session_id,
      :session_line_num,
      :command_tag,
      :session_start_time,
      :virtual_transaction_id,
      # NOTE(review): looks like a typo for :transaction_id; kept unchanged because
      # code outside this module may already read this key.
      :transaction_i,
      :error_severity,
      :sql_state_code,
      :message,
      :detail,
      :hint,
      :internal_query,
      :internal_query_pos,
      :context,
      :query,
      :query_pos,
      :location,
      :application_name,
      :backend_type,
      :leader_pid,
      :query_id
    ]
  end

  @doc """
  Returns the size in bytes reported by `File.lstat!/1` for `path`.

  Raises `File.Error` if the path cannot be stat-ed.
  """
  def get_size(path) do
    File.lstat!(path).size
  end
end
defmodule Watcher do
  @moduledoc """
  GenServer that subscribes to `FileSystem` events for a directory and prints the
  `:message` field of every CSV row appended to a file in it.

  State: `%{watcher_pid: pid, file_sizes: %{path => size}, dir: dir}`.
  """
  use GenServer

  @doc """
  Starts the watcher.

  `args` is a keyword list; `:dir` is the directory to watch. The whole list is
  also forwarded to `FileSystem.start_link/1` together with `dirs: [dir]`.
  """
  def start_link(args) do
    GenServer.start_link(__MODULE__, args)
  end

  @impl true
  def init(args) do
    dir = args[:dir]

    # Record current sizes so that only data appended after start-up is reported.
    file_sizes =
      dir
      |> File.ls!()
      |> Enum.filter(&csv_file?/1)
      |> Map.new(fn name ->
        path = Path.join(dir, name)
        {path, Helper.get_size(path)}
      end)

    {:ok, watcher_pid} = FileSystem.start_link([dirs: [dir]] ++ args)
    FileSystem.subscribe(watcher_pid)
    {:ok, %{watcher_pid: watcher_pid, file_sizes: file_sizes, dir: dir}}
  end

  # A file was modified: print the newly appended rows, but only for CSV files.
  # init/1 tracks CSV files only, so other files in the directory must not be
  # handed to the CSV decoder.
  @impl true
  def handle_info(
        {:file_event, watcher_pid, {path, [:modified]}},
        %{watcher_pid: watcher_pid} = state
      ) do
    if csv_file?(path) do
      {new_state, rows} = Helper.get_data_and_update_state(state, path)
      Enum.each(rows, &print_row/1)
      {:noreply, new_state}
    else
      {:noreply, state}
    end
  end

  def handle_info({:file_event, watcher_pid, {path, events}}, %{watcher_pid: watcher_pid} = state) do
    # Any other event combination is only logged.
    IO.inspect({path, events})

    {:noreply, state}
  end

  def handle_info({:file_event, watcher_pid, :stop}, %{watcher_pid: watcher_pid} = state) do
    # The file-system monitor stopped; nothing to do.
    {:noreply, state}
  end

  # Same suffix test for directory entries at start-up and for event paths.
  defp csv_file?(path), do: String.ends_with?(path, "csv")

  # Prints the message of a decoded row, or warns about a row that failed to decode.
  defp print_row({:ok, row}), do: IO.puts(row[:message])
  defp print_row({:error, error}), do: IO.warn(error)
end
defmodule Pooler do
  @moduledoc """
  GenServer that periodically reads the data appended to one CSV file and prints
  the `:message` field of every new row.

  The file is the one whose path sorts last among the CSV files present in `:dir`
  when the server starts.

  NOTE(review): `:last_file` is chosen once in `init/1` and never re-evaluated, so
  a file created later is not picked up; `:count` is stored but not read anywhere
  in this module.
  """
  use GenServer

  @doc """
  Starts the poller.

  Options read by `init/1`:

    * `:dir` - directory containing the CSV files
    * `:frequency` - delay between polls in milliseconds (default `5000`)
  """
  def start_link(args) do
    GenServer.start_link(__MODULE__, args)
  end

  @impl true
  def init(args) do
    dir = args[:dir]
    frequency = args[:frequency] || 5000

    # Record current sizes so that only data appended after start-up is reported.
    file_sizes =
      dir
      |> File.ls!()
      |> Enum.filter(&String.ends_with?(&1, "csv"))
      |> Map.new(fn name ->
        path = Path.join(dir, name)
        {path, Helper.get_size(path)}
      end)

    case file_sizes |> Map.keys() |> Enum.sort(:desc) do
      [] ->
        # Nothing to poll: stop with a descriptive reason instead of crashing in hd/1.
        {:stop, {:no_csv_files, dir}}

      [last_file | _] ->
        Process.send_after(self(), :pool, 0)

        {:ok,
         %{
           file_sizes: file_sizes,
           dir: dir,
           last_file: last_file,
           count: map_size(file_sizes),
           frequency: frequency
         }}
    end
  end

  # Poll tick: print the rows appended since the previous tick, then schedule the next one.
  @impl true
  def handle_info(:pool, %{frequency: frequency, last_file: path} = state) do
    {new_state, rows} = Helper.get_data_and_update_state(state, path)
    Enum.each(rows, &print_row/1)
    Process.send_after(self(), :pool, frequency)
    {:noreply, new_state}
  end

  # Any other message is only logged. No timer is scheduled here: the :pool clause
  # already re-arms itself, so scheduling again would start an additional polling
  # loop for every stray message received.
  def handle_info(msg, %{file_sizes: _, frequency: _, last_file: _} = state) do
    IO.inspect(msg)
    {:noreply, state}
  end

  # Prints the message of a decoded row, or warns about a row that failed to decode.
  defp print_row({:ok, row}), do: IO.inspect(row[:message])
  defp print_row({:error, error}), do: IO.warn(error)
end
# Launch the watcher on the log directory. Watcher.init/1 reads only :dir itself
# and forwards the complete option list to FileSystem.start_link/1.
watcher_options = [dir: "/var/log/postgres", extension: "csv", frequency: 5000]
Watcher.start_link(watcher_options)