Linkage: Mato Grosso do Sul
Informações Gerais
> Lembre-se de ativar o runtime para Mix Standalone
com o caminho para /data
O objetivo deste notebook é unificar as seguintes bases de dados do estado de Mato Grosso do Sul:
- SI-PNI
- e-SUS VE
- SIVEP
As três bases são arquivos `.csv` (delimitados por `;`) e os arquivos resultantes são `.csv` (delimitados por `,`).
Entrada
Saída
Identificação dos caminhos
# Prompts for the location of an input file and returns its expanded path.
#
# `context` is interpolated into the prompt text. The answer is trimmed and
# expanded relative to this notebook's directory. Raises a RuntimeError when
# nothing exists at the resulting path.
get_input_path = fn context ->
  path =
    IO.gets("Caminho para #{context}: ")
    |> String.trim()
    |> Path.expand(__DIR__)

  if File.exists?(path) do
    path
  else
    raise "Caminho #{path} não existe."
  end
end
# Prompts for a destination directory and returns its expanded path.
#
# `context` is interpolated into the prompt text. The answer is trimmed and
# expanded relative to this notebook's directory; the directory (and any missing
# parents) is created before the path is returned.
get_destination_path = fn context ->
  path =
    IO.gets("Caminho para #{context}: ")
    |> String.trim()
    |> Path.expand(__DIR__)

  File.mkdir_p!(path)
  path
end
# Collects every path used by the notebook by prompting the user once per entry:
#   * `input`  - the three source files (each must already exist)
#   * `pipe`   - directory for intermediate results (created when missing)
#   * `output` - directory for final results (created when missing)
paths = %{
input: %{
esus_ve: get_input_path.("a base do e-SUS VE"),
sipni: get_input_path.("a base do SI-PNI"),
sivep: get_input_path.("a base do SIVEP")
},
pipe: get_destination_path.("o diretório de processamento"),
output: get_destination_path.("o diretório de resultados")
}
csvfy: Normalização de CSV
- CSV por padrão deve ser separado por vírgula
- Codificação do arquivo deve ser UTF-8
# Normalization step shared by the three bases. `Phi.XSV.csvfy/3` receives ";"
# as its third argument (presumably the source delimiter - confirm in Phi.XSV).
op = fn input_path, output_path -> Phi.XSV.csvfy(input_path, output_path, ";") end
id = :csvfy

# One pipe per base, created from its input path and run in the same order as
# before (e-SUS VE, SI-PNI, SIVEP); results go to the processing directory.
[esus_ve, sipni, sivep] =
  Enum.map([:esus_ve, :sipni, :sivep], fn base ->
    base
    |> Phi.Pipe.new(Map.fetch!(paths.input, base))
    |> Phi.Pipe.run(id, op, result_dir: paths.pipe)
  end)
filter_columns: Corte de colunas
Colunas de interesse
e-SUS VE
-
#3 dataInicioSintomas
:DateTime
-
#9 cpf
:string (hash)
-
#26 sintomas
:string
-
#30 tipoTeste
:string
-
#31 resultadoTeste
:Positivo | Negativo
-
#47 idade
:integer
-
#63 municipioIBGE
:integer (IBGE 7 dígitos)
SIPNI
-
#10 paciente_cpf
:string (hash)
-
#14 paciente_endereco_coIbgeMunicipio
:integer (IBGE 6 dígitos)
-
#20 paciente_idade
:integer
-
#27 vacina_codigo
:integer
-
#28 vacina_dataAplicacao
:dd/MM/yyyy
-
#33 vacina_nome
:string
-
#34 DOSE
:1 | 2 | 8
Versão antiga:
-
#14 paciente_cpf
:string (hash)
-
#19 paciente_endereco_coIbgeMunicipio
:integer (IBGE 6 dígitos)
-
#26 paciente_idade
:float
-
#37 vacina_codigo
:integer
-
#38 vacina_dataAplicacao
:DateTime
-
#45 vacina_nome
:string
-
#46 vacina_numDose
:integer
SIVEP
-
#4 DT_SIN_PRI
:dd/MM/yyyy
-
#13 NU_CPF
:string (hash)
-
#17 NU_IDADE_N
:integer
-
#32 CO_MUN_RES
:integer (IBGE 6 dígitos)
-
#81 HOSPITAL
:1 | 2 | 9
-
#102 PCR_RESUL
:1 | 0
-
#124 CLASSI_FIN
:5 covid | ?
-
#127 EVOLUCAO
:1 cura | 2 óbito | 9
-
#140 PCR_SARS2
:1 | 0
-
#151 TP_TES_AN
:2 teste rápido antigênico | ?
-
#159 AN_SARS2
:1 | ?
# Builds a two-argument pipe operation that calls `Phi.XSV.select/3` with the
# given list of column names.
op = fn columns ->
  fn input_path, output_path -> Phi.XSV.select(input_path, output_path, columns) end
end

id = :filter_columns

# e-SUS VE: columns of interest.
columns = ~w(
  dataInicioSintomas
  cpf
  sintomas
  tipoTeste
  resultadoTeste
  idade
  municipioIBGE
)

esus_ve = Phi.Pipe.run(esus_ve, id, op.(columns))

# SI-PNI: columns of interest.
columns = ~w(
  paciente_cpf
  paciente_endereco_coIbgeMunicipio
  paciente_idade
  vacina_codigo
  vacina_dataAplicacao
  vacina_nome
  DOSE
)

sipni = Phi.Pipe.run(sipni, id, op.(columns))

# Debug helpers kept for manual inspection:
# Phi.XSV.run(~w(frequency --select DOSE #{sipni.path})) |> IO.puts
# Phi.XSV.run(~w(headers #{sipni.path})) |> IO.puts

# SIVEP: columns of interest.
columns = ~w(
  DT_SIN_PRI
  NU_CPF
  NU_IDADE_N
  CO_MUN_RES
  HOSPITAL
  PCR_RESUL
  CLASSI_FIN
  EVOLUCAO
  PCR_SARS2
  TP_TES_AN
  AN_SARS2
)

sivep = Phi.Pipe.run(sivep, id, op.(columns))
filter_year_2021: Filtro por ano
Restrições:
-
e-SUS VE:
-
dataInicioSintomas
:-
year == 2021
-
-
-
SIPNI:
-
vacina_dataAplicacao
:-
year == 2021
-
-
-
SIVEP:
-
DT_SIN_PRI
:-
year == 2021
-
-
# Builds a pipe operation that keeps the rows whose date column belongs to 2021.
#
# `type` selects where the year sits in the value:
#   * `:datetime`   - the value starts with the year
#   * `:dd_mm_yyyy` - the value ends with the year
# Any other `type` raises a CaseClauseError.
op = fn columns_indexes, type ->
  regex =
    case type do
      :datetime -> "^2021"
      :dd_mm_yyyy -> "2021$"
    end

  fn input_path, output_path ->
    Phi.XSV.search(input_path, output_path, columns_indexes, regex)
  end
end

id = :filter_year

esus_ve = Phi.Pipe.run(esus_ve, id, op.("dataInicioSintomas", :datetime))

sipni = Phi.Pipe.run(sipni, id, op.("vacina_dataAplicacao", :dd_mm_yyyy))

# Debug helper kept for manual inspection:
# Phi.XSV.run(~w(frequency --select vacina_dataAplicacao #{sipni.path})) |> IO.puts

sivep = Phi.Pipe.run(sivep, id, op.("DT_SIN_PRI", :dd_mm_yyyy))
filter_age_15_plus: Filtro por idade
Restrições:
-
e-SUS VE:
-
idade
:-
>= 15
-
-
-
SIPNI:
-
paciente_idade
:-
>= 15
-
-
-
SIVEP:
-
NU_IDADE_N
:-
>= 15
-
-
# Builds a pipe operation that keeps the rows whose age column is 15 or more.
#
# `type` describes how the age is written:
#   * `:integer` - digits only, e.g. "42"
#   * `:float`   - digits with an optional decimal part, e.g. "42.0"
# Any other `type` raises a CaseClauseError.
op = fn column_index, type ->
  # Accepted ages: 15-19, 20-99 and 100-139. The alternatives are wrapped in a
  # group so that `^` and `$` apply to all of them. Without the group the
  # pattern reads as `^1[5-9]` OR `1[0-3][0-9]` OR `[2-9][0-9]$`, which lets
  # values such as "1025" or "150" through.
  age = "(?:1[5-9]|[2-9][0-9]|1[0-3][0-9])"

  regex =
    case type do
      # `[.]` is a literal dot; a bare `.` would match any character.
      :float -> "^" <> age <> "(?:[.][0-9]+)?$"
      :integer -> "^" <> age <> "$"
    end

  fn input_path, output_path ->
    Phi.XSV.search(input_path, output_path, column_index, regex)
  end
end

id = :filter_age_15_plus

esus_ve = Phi.Pipe.run(esus_ve, id, op.("idade", :integer))

sipni = Phi.Pipe.run(sipni, id, op.("paciente_idade", :integer))

# Debug helper kept for manual inspection:
# Phi.XSV.run(~w(frequency --select paciente_idade #{sipni.path})) |> IO.puts

sivep = Phi.Pipe.run(sivep, id, op.("NU_IDADE_N", :integer))
filter_state_50: Filtro por município do estado
Restringir para residentes do Mato Grosso do Sul
Restrições:
-
e-SUS VE:
-
municipioIBGE
:-
50XXXXX
-
-
-
SIPNI:
-
paciente_endereco_coIbgeMunicipio
:-
50XXXX
-
-
-
SIVEP:
-
CO_MUN_RES
:-
50XXXX
-
-
# Builds a pipe operation that keeps the rows whose municipality code starts
# with "50" (the prefix this notebook uses for Mato Grosso do Sul).
op = fn column_index ->
  fn input_path, output_path ->
    Phi.XSV.search(input_path, output_path, column_index, "^50")
  end
end

id = :filter_state

esus_ve = Phi.Pipe.run(esus_ve, id, op.("municipioIBGE"))

sipni = Phi.Pipe.run(sipni, id, op.("paciente_endereco_coIbgeMunicipio"))

sivep = Phi.Pipe.run(sivep, id, op.("CO_MUN_RES"))
filter_null: Filtro de valores nulos em colunas obrigatórias
e-SUS VE
-
dataInicioSintomas
-
sintomas
-
tipoTeste
-
resultadoTeste
-
idade
-
municipioIBGE
SIPNI
Versão nova:
-
paciente_endereco_coIbgeMunicipio
-
paciente_idade
-
vacina_codigo
-
vacina_dataAplicacao
-
DOSE
Versão antiga:
-
paciente_endereco_coIbgeMunicipio
-
paciente_idade
-
vacina_codigo
-
vacina_dataAplicacao
-
vacina_numDose
SIVEP
-
DT_SIN_PRI
-
NU_IDADE_N
-
CO_MUN_RES
# Builds a pipe operation that keeps the rows whose given column is not empty
# (the pattern requires at least one character).
op = fn column_index ->
  fn input_path, output_path ->
    Phi.XSV.search(input_path, output_path, column_index, "^.+")
  end
end

id = :filter_null

# Each base gets one operation per mandatory column, in the listed order.
esus_ve =
  Phi.Pipe.run_many(
    esus_ve,
    id,
    Enum.map(~w(dataInicioSintomas sintomas tipoTeste resultadoTeste idade municipioIBGE), op)
  )

sipni =
  Phi.Pipe.run_many(
    sipni,
    id,
    Enum.map(
      ~w(paciente_endereco_coIbgeMunicipio paciente_idade vacina_codigo vacina_dataAplicacao DOSE),
      op
    )
  )

sivep =
  Phi.Pipe.run_many(
    sivep,
    id,
    Enum.map(~w(DT_SIN_PRI NU_IDADE_N CO_MUN_RES), op)
  )
format: Validação e Formatação
-
e-SUS VE:
-
dataInicioSintomas
-
#2 esus_symptoms_date
:date
-
-
cpf
-
#1 esus_cpf
:string (hash)
-
-
sintomas
- Valida se é sintomático
-
tipoTeste
-
Valida se é
RT-PCR
ou TESTE RÁPIDO - ANTÍGENO
-
Valida se é
-
resultadoTeste
- Valida se teste foi positivo
-
idade
-
#4 esus_age_index_15_to_80_by_10
:integer
-
-
municipioIBGE
-
#3 esus_city
:integer (IBGE 7 dígitos)
-
-
# Formats the e-SUS VE base for linkage.
#
# Only rows whose test type is "RT-PCR" or "TESTE RÁPIDO - ANTÍGENO" and whose
# result starts with "P" are kept. Each kept row is rewritten as
# [cpf, symptoms date, city, age index].
#
# NOTE(review): the output column is named `..._15_to_80_by_10` while the
# bucket passed to `Phi.FormatUtils.age_index/2` is `:from_18_to_80_by_10` -
# confirm which one is intended.
esus_ve =
  Phi.Pipe.run(esus_ve, :format, fn input_path, output_path ->
    header = ~w(esus_cpf esus_symptoms_date esus_city esus_age_index_15_to_80_by_10)

    # The function form of File.open!/3 closes the file even when the body
    # raises, so a failure mid-stream no longer leaks the handle.
    File.open!(output_path, [:append], fn file ->
      IO.write(file, Enum.join(header, ",") <> "\n")

      input_path
      |> File.stream!(read_ahead: 100_000)
      |> NimbleCSV.RFC4180.parse_stream()
      |> Flow.from_enumerable()
      |> Flow.map(fn [date, cpf, _symptoms, type, result, age, city] ->
        # Rows that fail the test-type/result check become nil and are dropped below.
        if type in ["RT-PCR", "TESTE RÁPIDO - ANTÍGENO"] and String.first(result) == "P" do
          [
            cpf,
            Phi.FormatUtils.date(date, :datetime),
            Phi.FormatUtils.city(city),
            Phi.FormatUtils.age_index(age, :from_18_to_80_by_10)
          ]
        end
      end)
      |> Flow.reject(&is_nil/1)
      # Write in batches to limit the number of IO calls.
      |> Stream.chunk_every(100_000)
      |> Stream.each(&IO.write(file, NimbleCSV.RFC4180.dump_to_iodata(&1)))
      |> Stream.run()
    end)
  end)
-
SIPNI:
-
paciente_cpf
-
#1 sipni_cpf
:string (hash)
-
-
paciente_endereco_coIbgeMunicipio
-
#3 sipni_city
:integer (IBGE 7 dígitos)
-
-
paciente_idade
-
#4 sipni_age_index_15_to_80_by_10
:integer
-
-
vacina_codigo
-
Para definir
#5 sipni_is_full_vaccination
em[1]
-
#1 vaccine_code
em[2]
-
Para definir
-
vacina_dataAplicacao
-
#2 sipni_vaccination_date
:date
-
-
vacina_nome
-
#2 vaccine_name
em[2]
-
-
DOSE
-
Para definir
#5 sipni_is_full_vaccination
em[1]
-
Para definir
-
-
Arquivos:
-
[1] format
: Dados para linkage -
[2] vaccines
: Dados de vacinas
-
# Lookup table used to convert 6-digit IBGE codes into 7-digit ones.
#
# The table is created on first evaluation and emptied on re-evaluation. The
# existence check replaces a broad `rescue`, which would also have hidden any
# unrelated failure of `:ets.new/2`.
case :ets.whereis(:cities) do
  :undefined -> :ets.new(:cities, [:set, :public, :named_table])
  _table -> :ets.delete_all_objects(:cities)
end

# Each row holds one code: the key is its first six characters and the value is
# the whole code as an integer.
"sandbox/input/ms_cities.csv"
|> Path.expand(__DIR__)
|> File.read!()
|> NimbleCSV.RFC4180.parse_string()
|> Enum.map(fn [id] -> {String.slice(id, 0, 6), String.to_integer(id)} end)
|> then(&:ets.insert(:cities, &1))
# Table that collects the {vaccine code, vaccine name} pairs seen while the
# SI-PNI base is formatted.
#
# The table is created on first evaluation and emptied on re-evaluation. The
# existence check replaces a broad `rescue`, which would also have hidden any
# unrelated failure of `:ets.new/2`.
case :ets.whereis(:vaccines) do
  :undefined -> :ets.new(:vaccines, [:set, :public, :named_table])
  _table -> :ets.delete_all_objects(:vaccines)
end
# Formats the SI-PNI base for linkage and exports the table of vaccines.
#
# Step `:format` rewrites every row as
# [cpf, vaccination date, city, age index, is_full_vaccination] and records each
# {vaccine code, vaccine name} pair in the `:vaccines` ETS table.
# Step `:vaccines` dumps that table, sorted by code, into the results directory.
# Afterwards the ETS table is deleted and the pipe's `:path` is pointed back at
# the `:format` result, so later steps read the formatted base.
sipni =
  sipni
  |> Phi.Pipe.run(:format, fn input_path, output_path ->
    header = ~w(
      sipni_cpf
      sipni_vaccination_date
      sipni_city
      sipni_age_index_15_to_80_by_10
      sipni_is_full_vaccination
    )

    # The function form of File.open!/3 closes the file even when the body
    # raises, so a failure mid-stream no longer leaks the handle.
    File.open!(output_path, [:append], fn file ->
      IO.write(file, Enum.join(header, ",") <> "\n")

      input_path
      |> File.stream!(read_ahead: 100_000)
      |> NimbleCSV.RFC4180.parse_stream()
      |> Flow.from_enumerable()
      |> Flow.map(fn [cpf, city, age, v_code, date, v_name, v_dosage] ->
        v_code = String.to_integer(v_code)
        # insert_new keeps the first name seen for each vaccine code.
        :ets.insert_new(:vaccines, {v_code, v_name})

        # "500000" is written out as an empty city.
        # NOTE(review): presumably a state-level code with no municipality - confirm.
        city =
          if city == "500000" do
            nil
          else
            Phi.FormatUtils.city(city, :cities)
          end

        # NOTE(review): code 88 is presumably a single-dose vaccine - confirm.
        is_full_vaccination = if v_code == 88 or v_dosage == "2", do: 1, else: 0

        [
          cpf,
          Phi.FormatUtils.date(date, :dd_mm_yyyy),
          city,
          Phi.FormatUtils.age_index(age, :from_18_to_80_by_10),
          is_full_vaccination
        ]
      end)
      # Write in batches to limit the number of IO calls.
      |> Stream.chunk_every(100_000)
      |> Stream.each(&IO.write(file, NimbleCSV.RFC4180.dump_to_iodata(&1)))
      |> Stream.run()
    end)
  end)
  |> Phi.Pipe.run(
    :vaccines,
    fn _input_path, output_path ->
      rows =
        :vaccines
        |> :ets.tab2list()
        |> Enum.sort_by(&elem(&1, 0))
        |> Enum.map(&Tuple.to_list/1)

      File.write!(output_path, NimbleCSV.RFC4180.dump_to_iodata([~w(code name) | rows]))
    end,
    result_dir: paths.output
  )
  |> tap(fn _sipni -> :ets.delete(:vaccines) end)
  |> then(&Map.put(&1, :path, &1.history.format))
-
SIVEP:
-
DT_SIN_PRI
-
#2 sivep_symptoms_date
:date
-
-
NU_CPF
-
#1 sivep_cpf
:string (hash)
-
-
NU_IDADE_N
-
#4 sivep_age_index_15_to_80_by_10
:integer
-
-
CO_MUN_RES
-
#3 sivep_city
:integer (IBGE 7 dígitos)
-
-
HOSPITAL
-
#6 sivep_is_hospitalization
:1 | 0
-
-
EVOLUCAO
-
#7 sivep_is_death
:1 | 0
-
-
PCR_SARS2
-
-
Condicional de caso (`sivep_is_case`):
- `CLASSI_FIN` é `5` OU
- `PCR_RESUL` é `1` OU
- `PCR_SARS2` é `1` OU
- `TP_TES_AN` é `2` E `AN_SARS2` é `1`
-
-
Invalida
#81 HOSPITAL
se valor não for 1
-
Invalida
#127 EVOLUCAO
se valor não for 2 (óbito)
-
# Lookup table used to convert 6-digit IBGE codes into 7-digit ones.
#
# The table is created on first evaluation and emptied on re-evaluation. The
# existence check replaces a broad `rescue`, which would also have hidden any
# unrelated failure of `:ets.new/2`.
case :ets.whereis(:cities) do
  :undefined -> :ets.new(:cities, [:set, :public, :named_table])
  _table -> :ets.delete_all_objects(:cities)
end

# Each row holds one code: the key is its first six characters and the value is
# the whole code as an integer.
"sandbox/input/ms_cities.csv"
|> Path.expand(__DIR__)
|> File.read!()
|> NimbleCSV.RFC4180.parse_string()
|> Enum.map(fn [id] -> {String.slice(id, 0, 6), String.to_integer(id)} end)
|> then(&:ets.insert(:cities, &1))
# Formats the SIVEP base for linkage.
#
# A row is kept when it is a confirmed case: classification "5", or PCR result
# "1", or PCR SARS2 "1", or (test type "2" and antigen SARS2 result "1").
# Each kept row is rewritten as
# [cpf, symptoms date, city, age index, is_case, is_hospitalization, is_death].
# The last two fields are 1 or empty: hospitalization is 1 when HOSPITAL is "1",
# death is 1 when EVOLUCAO is "2".
sivep =
  Phi.Pipe.run(sivep, :format, fn input_path, output_path ->
    header = ~w(
      sivep_cpf
      sivep_symptoms_date
      sivep_city
      sivep_age_index_15_to_80_by_10
      sivep_is_case
      sivep_is_hospitalization
      sivep_is_death
    )

    # The function form of File.open!/3 closes the file even when the body
    # raises, so a failure mid-stream no longer leaks the handle.
    File.open!(output_path, [:append], fn file ->
      IO.write(file, Enum.join(header, ",") <> "\n")

      input_path
      |> File.stream!(read_ahead: 100_000)
      |> NimbleCSV.RFC4180.parse_stream()
      |> Flow.from_enumerable()
      |> Flow.map(fn [
                       date,
                       cpf,
                       age,
                       city,
                       hospitalization,
                       pcr_result,
                       classification,
                       death,
                       pcr_sars2,
                       test_type,
                       test_sars2_result
                     ] ->
        # Rows that are not confirmed cases become nil and are dropped below.
        if classification == "5" or pcr_result == "1" or pcr_sars2 == "1" or
             (test_type == "2" and test_sars2_result == "1") do
          # "500000" is written out as an empty city.
          # NOTE(review): presumably a state-level code with no municipality - confirm.
          city =
            if city == "500000" do
              nil
            else
              Phi.FormatUtils.city(city, :cities)
            end

          [
            cpf,
            Phi.FormatUtils.date(date, :dd_mm_yyyy),
            city,
            Phi.FormatUtils.age_index(age, :from_18_to_80_by_10),
            1,
            if(hospitalization == "1", do: 1),
            if(death == "2", do: 1)
          ]
        end
      end)
      |> Flow.reject(&is_nil/1)
      # Write in batches to limit the number of IO calls.
      |> Stream.chunk_every(100_000)
      |> Stream.each(&IO.write(file, NimbleCSV.RFC4180.dump_to_iodata(&1)))
      |> Stream.run()
    end)
  end)
linkage: Linkage entre as bases
Full join entre as bases através das colunas de CPF, partindo da base do e-SUS VE.
-
Resultado:
-
#1 cpf
:string (hash)
-
#2 city
:integer (IBGE 7 dígitos)
-
#3 age_index_15_to_80_by_10
:integer
-
#4 esus_symptoms_date
:date
-
#5 sivep_symptoms_date
:date
-
#6 sipni_vaccination_date
:date
-
#7 sivep_is_case
:1 | 0
-
#8 sivep_is_hospitalization
:1 | 0
-
#9 sivep_is_death
:1 | 0
-
#10 sipni_is_full_vaccination
:1 | 0
-
# Builds a pipe operation that full-joins the pipe's current file with the file
# at `input2_path`, matching `input1_column` against `input2_column`.
op = fn input1_column, input2_path, input2_column ->
  fn input1_path, output_path ->
    Phi.XSV.full_join(input1_path, input2_path, output_path, input1_column, input2_column)
  end
end

id = :linkage
# Picks the first non-empty of two strings.
#
# Returns `{:ok, value}` with the first argument unless it is "", in which case
# the second one is used; returns `nil` when both are "".
one_of = fn first, second ->
  case {first, second} do
    {"", ""} -> nil
    {"", fallback} -> {:ok, fallback}
    {preferred, _ignored} -> {:ok, preferred}
  end
end
# Links the three bases by CPF, starting from the e-SUS VE base.
#
# Steps:
#   1. :esus_sipni_linkage       - full join e-SUS VE x SI-PNI on CPF
#   2. :linkage_format           - merge the duplicated cpf/city/age columns
#   3. :sivep_linkage            - full join of the result x SIVEP on CPF
#   4. :full_linkage             - merge the duplicated columns again
#   5. :linkage_sort             - sort so that identical rows become adjacent
#   6. :linkage_duplicity_filter - drop consecutive identical rows
#   7. `id`                      - final sort by the second column (city), written
#                                  to the results directory
#
# In steps 2 and 4 a row is dropped when cpf, city or age is empty on both
# sides of the join (`one_of` returns nil and the `with` falls through).
#
# Every writer uses the function form of File.open!/3 so that the output file
# is closed even when the stream raises.
esus_ve =
  esus_ve
  |> Phi.Pipe.run(:esus_sipni_linkage, op.("esus_cpf", sipni.path, "sipni_cpf"))
  |> Phi.Pipe.run(:linkage_format, fn input_path, output_path ->
    header = ~w(
      cpf
      city
      age_index_15_to_80_by_10
      esus_symptoms_date
      sipni_vaccination_date
      sipni_is_full_vaccination
    )

    File.open!(output_path, [:append], fn file ->
      IO.write(file, Enum.join(header, ",") <> "\n")

      input_path
      |> File.stream!(read_ahead: 100_000)
      |> NimbleCSV.RFC4180.parse_stream()
      |> Flow.from_enumerable()
      # Left side: e-SUS VE columns; right side: SI-PNI columns.
      |> Flow.map(fn [cpf1, e2, c1, a1, cpf2, s2, c2, a2, s5] ->
        with {:ok, cpf} <- one_of.(cpf1, cpf2),
             {:ok, city} <- one_of.(c1, c2),
             {:ok, age} <- one_of.(a1, a2) do
          [cpf, city, age, e2, s2, s5]
        end
      end)
      |> Flow.reject(&is_nil/1)
      |> Stream.chunk_every(100_000)
      |> Stream.each(&IO.write(file, NimbleCSV.RFC4180.dump_to_iodata(&1)))
      |> Stream.run()
    end)
  end)
  |> Phi.Pipe.run(:sivep_linkage, op.("cpf", sivep.path, "sivep_cpf"))
  |> Phi.Pipe.run(:full_linkage, fn input_path, output_path ->
    header = ~w(
      cpf
      city
      age_index_15_to_80_by_10
      esus_symptoms_date
      sivep_symptoms_date
      sipni_vaccination_date
      sivep_is_case
      sivep_is_hospitalization
      sivep_is_death
      sipni_is_full_vaccination
    )

    File.open!(output_path, [:append], fn file ->
      IO.write(file, Enum.join(header, ",") <> "\n")

      input_path
      |> File.stream!(read_ahead: 100_000)
      |> NimbleCSV.RFC4180.parse_stream()
      |> Flow.from_enumerable()
      # Left side: result of step 2; right side: SIVEP columns.
      |> Flow.map(fn [cpf1, c1, a1, e2, s2, s5, cpf2, si2, c2, a2, si5, si6, si7] ->
        with {:ok, cpf} <- one_of.(cpf1, cpf2),
             {:ok, city} <- one_of.(c1, c2),
             {:ok, age} <- one_of.(a1, a2) do
          [cpf, city, age, e2, si2, s2, si5, si6, si7, s5]
        end
      end)
      |> Flow.reject(&is_nil/1)
      |> Stream.chunk_every(100_000)
      |> Stream.each(&IO.write(file, NimbleCSV.RFC4180.dump_to_iodata(&1)))
      |> Stream.run()
    end)
  end)
  |> Phi.Pipe.run(:linkage_sort, &Phi.XSV.full_sort(&1, &2))
  |> Phi.Pipe.run(:linkage_duplicity_filter, fn input_path, output_path ->
    header = ~w(
      cpf
      city
      age_index_15_to_80_by_10
      esus_symptoms_date
      sivep_symptoms_date
      sipni_vaccination_date
      sivep_is_case
      sivep_is_hospitalization
      sivep_is_death
      sipni_is_full_vaccination
    )

    File.open!(output_path, [:append], fn file ->
      IO.write(file, Enum.join(header, ",") <> "\n")

      # The input is sorted, so equal rows are adjacent and Stream.dedup/1
      # removes them. The previous hand-written reduce discarded the current
      # line every time its buffer was flushed, losing one record per ~100k
      # rows. Rows are now written in input order; the old buffer wrote each
      # chunk reversed.
      input_path
      |> File.stream!(read_ahead: 100_000)
      |> Stream.drop(1)
      |> Stream.dedup()
      |> Stream.chunk_every(100_000)
      |> Enum.each(&IO.write(file, &1))
    end)
  end)
  |> Phi.Pipe.run(id, &Phi.XSV.sort(&1, &2, select: [2]), result_dir: paths.output)

# An explicit argument list keeps a path that contains spaces as one argument.
IO.puts(Phi.XSV.run(["headers", esus_ve.path]))
Sanitização
> Atenção! A partir desta seção os caminhos do path de cada Pipe poderão ser inválidos
-
Confirme
:rename_results?
para renomear os arquivos de saída para nomes mais significativos -
Confirme
:purge?
para remover o diretório de processamento
# Cleans up after the notebook has run.
#
# Options (both default to false and must be exactly `true` to take effect):
#   * `:purge?`          - remove the processing directory and its contents
#   * `:rename_results?` - rename the linkage and vaccines results, when they
#                          exist, to "linkage.csv" and "vaccines.csv" inside
#                          their current directories
sanitize = fn opts ->
  purge? = Keyword.get(opts, :purge?, false)
  rename_results? = Keyword.get(opts, :rename_results?, false)

  if purge? == true, do: File.rm_rf!(paths.pipe)

  if rename_results? == true do
    linkage_path = esus_ve.history.linkage

    if File.exists?(linkage_path) do
      File.rename!(linkage_path, Path.join(Path.dirname(linkage_path), "linkage.csv"))
    end

    vaccines_path = sipni.history.vaccines

    if File.exists?(vaccines_path) do
      File.rename!(vaccines_path, Path.join(Path.dirname(vaccines_path), "vaccines.csv"))
    end
  end
end

sanitize.(rename_results?: true, purge?: true)