# LogWatcher
Mix.install([
{:file_system, "~> 1.0"},
{:csv, "~> 3.2"}
])
## Section
defmodule Helper do
  @moduledoc """
  Helpers for tailing CSV log files: tracks how many bytes of each file have
  already been consumed and decodes only the bytes appended since then.
  """

  @doc """
  Reads the bytes appended to `path` since the size recorded in
  `state.file_sizes` and decodes them as CSV rows.

  `state` must be a map with a `:file_sizes` key (a map of path => byte size).
  A path that is not present yet is read from offset 0.

  Returns `{new_state, rows}` where `new_state` has the current size of `path`
  recorded, and `rows` is `[]` when nothing new could be read, or otherwise the
  enumerable returned by `CSV.decode/2` with the column names from `headers/0`.

  If the file is now smaller than the recorded size (it was truncated or
  replaced in place), reading restarts from offset 0 instead of crashing.

  Raises `File.Error` if the file cannot be stat'ed, opened or read.
  """
  def get_data_and_update_state(%{file_sizes: file_sizes} = state, path) do
    previous_file_size = Map.get(file_sizes, path, 0)
    current_file_size = get_size(path)

    # A shrinking file means truncation/rotation in place: the old offset is
    # past the end, so start over from the beginning of the new content.
    offset = if current_file_size < previous_file_size, do: 0, else: previous_file_size

    state = %{state | file_sizes: Map.put(file_sizes, path, current_file_size)}

    case read_chunk(path, offset, current_file_size - offset) do
      "" -> {state, []}
      data -> {state, CSV.decode([data], headers: headers())}
    end
  end

  # Reads `length` bytes of `path` starting at `offset`; returns "" when there
  # is nothing to read or the offset is already at/after end of file.
  defp read_chunk(_path, _offset, 0), do: ""

  defp read_chunk(path, offset, length) do
    # File.open!/3 with a function closes the file even if the function raises.
    File.open!(path, [:binary], fn file ->
      case :file.pread(file, offset, length) do
        {:ok, data} ->
          data

        # The file shrank between the stat and the read.
        :eof ->
          ""

        {:error, reason} ->
          raise File.Error, reason: reason, action: "read file", path: path
      end
    end)
  end

  # Column names applied, in order, to every decoded CSV row.
  # NOTE(review): the names look like the PostgreSQL csvlog columns - confirm
  # against the server version in use.
  # NOTE(review): `:transaction_i` is probably a typo for `:transaction_id`;
  # it is left unchanged because it is a key of the rows handed to callers.
  defp headers do
    [
      :log_time,
      :user_name,
      :database_name,
      :process_id,
      :connection_from,
      :session_id,
      :session_line_num,
      :command_tag,
      :session_start_time,
      :virtual_transaction_id,
      :transaction_i,
      :error_severity,
      :sql_state_code,
      :message,
      :detail,
      :hint,
      :internal_query,
      :internal_query_pos,
      :context,
      :query,
      :query_pos,
      :location,
      :application_name,
      :backend_type,
      :leader_pid,
      :query_id
    ]
  end

  @doc """
  Returns the size of the file at `path` in bytes.

  Symlinks are followed, so the size matches the file that is actually opened
  for reading. Raises `File.Error` if the path cannot be stat'ed.
  """
  def get_size(path) do
    File.stat!(path).size
  end
end
defmodule Watcher do
  @moduledoc """
  GenServer that subscribes to `FileSystem` events for a directory and, when a
  tracked log file is modified, prints the `:message` field of every row that
  was appended to it.

  Options (keyword list):

    * `:dir` - directory to watch (required)
    * `:extension` - file-name suffix of the files to tail, defaults to `"csv"`

  Any other option is forwarded to `FileSystem.start_link/1`.
  """
  use GenServer

  # Options consumed by this script; they are not FileSystem options and are
  # therefore not forwarded to the file-system watcher.
  @own_options [:dir, :extension, :frequency]

  @doc """
  Starts the watcher process linked to the caller. See the module
  documentation for the accepted options.
  """
  def start_link(args) do
    GenServer.start_link(__MODULE__, args)
  end

  @impl true
  def init(args) do
    dir = args[:dir]
    extension = args[:extension] || "csv"

    # Record the current size of every matching file so that only content
    # appended after start-up is reported.
    file_sizes =
      dir
      |> File.ls!()
      |> Enum.filter(&String.ends_with?(&1, extension))
      |> Enum.map(&Path.join(dir, &1))
      |> Map.new(&{&1, Helper.get_size(&1)})

    {:ok, watcher_pid} = FileSystem.start_link([dirs: [dir]] ++ Keyword.drop(args, @own_options))
    FileSystem.subscribe(watcher_pid)

    {:ok, %{watcher_pid: watcher_pid, file_sizes: file_sizes, dir: dir, extension: extension}}
  end

  @impl true
  def handle_info(
        {:file_event, watcher_pid, {path, [:modified]}},
        %{watcher_pid: watcher_pid, extension: extension} = state
      ) do
    # Only files with the configured suffix are tailed; modifications of other
    # files in the same directory are ignored rather than parsed as CSV.
    if String.ends_with?(path, extension) do
      {new_state, rows} = Helper.get_data_and_update_state(state, path)

      Enum.each(rows, fn
        {:ok, row} -> IO.puts(row[:message])
        {:error, error} -> IO.warn(error)
      end)

      {:noreply, new_state}
    else
      {:noreply, state}
    end
  end

  def handle_info({:file_event, watcher_pid, {path, events}}, %{watcher_pid: watcher_pid} = state) do
    # Your own logic for path and events
    IO.inspect({path, events})
    {:noreply, state}
  end

  def handle_info({:file_event, watcher_pid, :stop}, %{watcher_pid: watcher_pid} = state) do
    # Your own logic when monitor stop
    {:noreply, state}
  end
end
defmodule Pooler do
  @moduledoc """
  GenServer that periodically polls a single log file and prints the
  `:message` field of every row appended since the previous poll.

  Options (keyword list):

    * `:dir` - directory containing the csv files (required)
    * `:frequency` - polling interval in milliseconds, defaults to 5000

  The polled file is the csv path in `:dir` that sorts last at start-up.
  """
  use GenServer

  @default_frequency 5000

  @doc """
  Starts the polling process linked to the caller. See the module
  documentation for the accepted options.
  """
  def start_link(args) do
    GenServer.start_link(__MODULE__, args)
  end

  @impl true
  def init(args) do
    dir = args[:dir]
    frequency = args[:frequency] || @default_frequency

    # Record the current size of every csv file so that only content appended
    # after start-up is reported.
    file_sizes =
      dir
      |> File.ls!()
      |> Enum.filter(&String.ends_with?(&1, "csv"))
      |> Enum.map(&Path.join(dir, &1))
      |> Map.new(&{&1, Helper.get_size(&1)})

    case file_sizes |> Map.keys() |> Enum.sort(:desc) do
      [] ->
        # Nothing to poll: stop with a descriptive reason instead of crashing
        # on the head of an empty list.
        {:stop, {:no_csv_files, dir}}

      [last_file | _] ->
        Process.send_after(self(), :pool, 0)

        {:ok,
         %{
           file_sizes: file_sizes,
           dir: dir,
           last_file: last_file,
           count: map_size(file_sizes),
           frequency: frequency
         }}
    end
  end

  @impl true
  def handle_info(:pool, %{frequency: frequency, last_file: path} = state) do
    {new_state, rows} = Helper.get_data_and_update_state(state, path)

    Enum.each(rows, fn
      {:ok, row} -> IO.inspect(row[:message])
      # A malformed row is reported instead of killing the poller.
      {:error, error} -> IO.warn(error)
    end)

    Process.send_after(self(), :pool, frequency)
    {:noreply, new_state}
  end

  def handle_info(msg, state) do
    # Unexpected messages are only logged. The polling loop re-arms itself in
    # the `:pool` clause; scheduling another timer here would start an extra,
    # permanent polling loop for every stray message.
    IO.inspect(msg)
    {:noreply, state}
  end
end
# Start tailing the PostgreSQL log directory as soon as the script is evaluated.
watcher_options = [dir: "/var/log/postgres", extension: "csv", frequency: 5000]
Watcher.start_link(watcher_options)