Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

Linkage: Mato Grosso do Sul

books/ms/linkage.livemd

Linkage: Mato Grosso do Sul

Informações Gerais

> Lembre-se de ativar o runtime para Mix Standalone com o caminho para /data

O objetivo deste book é unificar as seguintes bases do estado de Mato Grosso do Sul:

  • SI-PNI
  • e-SUS VE
  • SIVEP

As três bases são arquivos .csv (delimitado por ;) e os arquivos resultantes são .csv (delimitado por ,).

Entrada

Saída

Identificação dos caminhos

# Asks the user for the path of `context`, expands it relative to the
# notebook directory and raises when nothing exists at that location.
get_input_path = fn context ->
  path =
    IO.gets("Caminho para #{context}: ")
    |> String.trim()
    |> Path.expand(__DIR__)

  if not File.exists?(path) do
    raise "Caminho #{path} não existe."
  end

  path
end

# Asks the user for a directory for `context`, expands it relative to the
# notebook directory and creates it (with parents) when missing.
get_destination_path = fn context ->
  path = Path.expand(String.trim(IO.gets("Caminho para #{context}: ")), __DIR__)
  File.mkdir_p!(path)
  path
end

# Collects every path used by the notebook, prompting the user interactively.
# The three input bases must already exist (get_input_path raises otherwise);
# the processing (`pipe`) and results (`output`) directories are created when
# missing (get_destination_path calls File.mkdir_p!/1).
paths = %{
  input: %{
    esus_ve: get_input_path.("a base do e-SUS VE"),
    sipni: get_input_path.("a base do SI-PNI"),
    sivep: get_input_path.("a base do SIVEP")
  },
  pipe: get_destination_path.("o diretório de processamento"),
  output: get_destination_path.("o diretório de resultados")
}

csvfy: Normalização de CSV

  • CSV por padrão deve ser separado por vírgula
  • Codificação do arquivo deve ser UTF-8
# Normalizes each raw `;`-delimited export through Phi.XSV.csvfy/3, storing
# the results in the processing directory.
op = &Phi.XSV.csvfy(&1, &2, ";")
id = :csvfy

run_csvfy = fn name, input_path ->
  pipe = Phi.Pipe.new(name, input_path)
  Phi.Pipe.run(pipe, id, op, result_dir: paths.pipe)
end

esus_ve = run_csvfy.(:esus_ve, paths.input.esus_ve)
sipni = run_csvfy.(:sipni, paths.input.sipni)
sivep = run_csvfy.(:sivep, paths.input.sivep)

filter_columns: Corte de colunas

Colunas de interesse

e-SUS VE

  • #3 dataInicioSintomas: DateTime
  • #9 cpf: string (hash)
  • #26 sintomas: string
  • #30 tipoTeste: string
  • #31 resultadoTeste: Positivo | Negativo
  • #47 idade: integer
  • #63 municipioIBGE: integer (IBGE 7 dígitos)

SIPNI

  • #10 paciente_cpf: string (hash)
  • #14 paciente_endereco_coIbgeMunicipio: integer (IBGE 6 dígitos)
  • #20 paciente_idade: integer
  • #27 vacina_codigo: integer
  • #28 vacina_dataAplicacao: dd/MM/yyyy
  • #33 vacina_nome: string
  • #34 DOSE: 1 | 2 | 8

Versão antiga:

  • #14 paciente_cpf: string (hash)
  • #19 paciente_endereco_coIbgeMunicipio: integer (IBGE 6 dígitos)
  • #26 paciente_idade: float
  • #37 vacina_codigo: integer
  • #38 vacina_dataAplicacao: DateTime
  • #45 vacina_nome: string
  • #46 vacina_numDose: integer

SIVEP

  • #4 DT_SIN_PRI: dd/MM/yyyy
  • #13 NU_CPF: string (hash)
  • #17 NU_IDADE_N: integer
  • #32 CO_MUN_RES: integer (IBGE 6 dígitos)
  • #81 HOSPITAL: 1 | 2 | 9
  • #102 PCR_RESUL: 1 | 0
  • #124 CLASSI_FIN: 5 covid | ?
  • #127 EVOLUCAO: 1 cura | 2 óbito | 9
  • #140 PCR_SARS2: 1 | 0
  • #151 TP_TES_AN: 2 teste rápido antigênico | ?
  • #159 AN_SARS2: 1 | ?
# Builds a pipe operation that keeps only `columns` (in the listed order).
op = fn columns ->
  fn input, output -> Phi.XSV.select(input, output, columns) end
end

id = :filter_columns

# e-SUS VE
columns = ~w(dataInicioSintomas cpf sintomas tipoTeste resultadoTeste idade municipioIBGE)
esus_ve = Phi.Pipe.run(esus_ve, id, op.(columns))

# SI-PNI (current export layout)
columns =
  ~w(paciente_cpf paciente_endereco_coIbgeMunicipio paciente_idade vacina_codigo vacina_dataAplicacao vacina_nome DOSE)

sipni = Phi.Pipe.run(sipni, id, op.(columns))

# Phi.XSV.run(~w(frequency --select DOSE #{sipni.path})) |> IO.puts
# Phi.XSV.run(~w(headers #{sipni.path})) |> IO.puts

# SIVEP
columns =
  ~w(DT_SIN_PRI NU_CPF NU_IDADE_N CO_MUN_RES HOSPITAL PCR_RESUL CLASSI_FIN EVOLUCAO PCR_SARS2 TP_TES_AN AN_SARS2)

sivep = Phi.Pipe.run(sivep, id, op.(columns))

filter_year_2021: Filtro por ano

Restrições:

  • e-SUS VE:
    • dataInicioSintomas:
      • year == 2021
  • SIPNI:
    • vacina_dataAplicacao:
      • year == 2021
  • SIVEP:
    • DT_SIN_PRI:
      • year == 2021
# Builds a pipe operation keeping rows whose `column` holds a 2021 date.
# `:datetime` values start with the year; `:dd_mm_yyyy` values end with it.
op = fn column, format ->
  year_regex =
    case format do
      :datetime -> "^2021"
      :dd_mm_yyyy -> "2021$"
    end

  fn input, output -> Phi.XSV.search(input, output, column, year_regex) end
end

id = :filter_year

esus_ve = Phi.Pipe.run(esus_ve, id, op.("dataInicioSintomas", :datetime))
sipni = Phi.Pipe.run(sipni, id, op.("vacina_dataAplicacao", :dd_mm_yyyy))

# Phi.XSV.run(~w(frequency --select vacina_dataAplicacao #{sipni.path})) |> IO.puts
sivep = Phi.Pipe.run(sivep, id, op.("DT_SIN_PRI", :dd_mm_yyyy))

filter_age_15_plus: Filtro por idade

Restrições:

  • e-SUS VE:
    • idade:
      • ≥ 15
  • SIPNI:
    • paciente_idade:
      • ≥ 15
  • SIVEP:
    • NU_IDADE_N:
      • ≥ 15
# Builds a pipe operation keeping rows whose `column_index` column holds an
# age from 15 to 139. `:integer` expects the bare number ("42"); `:float` also
# accepts a trailing ".0" ("42.0").
#
# The alternatives are grouped before anchoring. In the previous pattern
# `^1[5-9]|1[0-3][0-9]|[2-9][0-9]$` the anchors bound only the first and last
# alternatives, so values such as "150", "199", "2021" or "-20" also passed.
# The `.` of the float suffix is escaped so that only a literal dot matches.
op = fn column_index, type ->
  ages = "(1[5-9]|[2-9][0-9]|1[0-3][0-9])"

  regex =
    case type do
      :float -> "^#{ages}(\\.0)?$"
      :integer -> "^#{ages}$"
    end

  &Phi.XSV.search(&1, &2, column_index, regex)
end

id = :filter_age_15_plus

esus_ve =
  esus_ve
  |> Phi.Pipe.run(id, op.("idade", :integer))

sipni =
  sipni
  |> Phi.Pipe.run(id, op.("paciente_idade", :integer))

# Phi.XSV.run(~w(frequency --select paciente_idade #{sipni.path})) |> IO.puts
# NOTE(review): confirm that NU_IDADE_N is always expressed in years here; the
# age-unit column of the SIVEP export (if any) is not selected by this notebook.
sivep =
  sivep
  |> Phi.Pipe.run(id, op.("NU_IDADE_N", :integer))

filter_state_50: Filtro por município do estado

Restringir para residentes do Mato Grosso do Sul

Restrições:

  • e-SUS VE:
    • municipioIBGE:
      • 50XXXXX
  • SIPNI:
    • paciente_endereco_coIbgeMunicipio:
      • 50XXXX
  • SIVEP:
    • CO_MUN_RES:
      • 50XXXX
# Builds a pipe operation keeping rows whose `column` starts with "50", the
# IBGE prefix of Mato Grosso do Sul municipalities.
op = fn column ->
  fn input, output -> Phi.XSV.search(input, output, column, "^50") end
end

id = :filter_state

esus_ve = Phi.Pipe.run(esus_ve, id, op.("municipioIBGE"))
sipni = Phi.Pipe.run(sipni, id, op.("paciente_endereco_coIbgeMunicipio"))
sivep = Phi.Pipe.run(sivep, id, op.("CO_MUN_RES"))

filter_null: Filtro de valores nulos em colunas obrigatórias

e-SUS VE

  • dataInicioSintomas
  • sintomas
  • tipoTeste
  • resultadoTeste
  • idade
  • municipioIBGE

SIPNI

Versão nova:

  • paciente_endereco_coIbgeMunicipio
  • paciente_idade
  • vacina_codigo
  • vacina_dataAplicacao
  • DOSE

Versão antiga:

  • paciente_endereco_coIbgeMunicipio
  • paciente_idade
  • vacina_codigo
  • vacina_dataAplicacao
  • vacina_numDose

SIVEP

  • DT_SIN_PRI
  • NU_IDADE_N
  • CO_MUN_RES
# Builds a pipe operation keeping rows whose `column` is non-empty.
op = fn column ->
  fn input, output -> Phi.XSV.search(input, output, column, "^.+") end
end

id = :filter_null

# Each base gets one operation per mandatory column, applied in list order.
esus_ve =
  Phi.Pipe.run_many(
    esus_ve,
    id,
    Enum.map(
      ~w(dataInicioSintomas sintomas tipoTeste resultadoTeste idade municipioIBGE),
      op
    )
  )

sipni =
  Phi.Pipe.run_many(
    sipni,
    id,
    Enum.map(
      ~w(paciente_endereco_coIbgeMunicipio paciente_idade vacina_codigo vacina_dataAplicacao DOSE),
      op
    )
  )

sivep =
  Phi.Pipe.run_many(
    sivep,
    id,
    Enum.map(~w(DT_SIN_PRI NU_IDADE_N CO_MUN_RES), op)
  )

format: Validação e Formatação

  • e-SUS VE:
    • dataInicioSintomas
      • #2 esus_symptoms_date: date
    • cpf
      • #1 esus_cpf: string (hash)
    • sintomas
      • Valida se é sintomático
    • tipoTeste
      • Valida se é RT-PCR ou TESTE RÁPIDO - ANTÍGENO
    • resultadoTeste
      • Valida se teste foi positivo
    • idade
      • #4 esus_age_index_15_to_80_by_10: integer
    • municipioIBGE
      • #3 esus_city: integer (IBGE 7 dígitos)
# Formats e-SUS VE rows into the linkage layout
# (esus_cpf, esus_symptoms_date, esus_city, esus_age_index_15_to_80_by_10).
# Only rows tested by "RT-PCR" or "TESTE RÁPIDO - ANTÍGENO" whose result
# starts with "P" (Positivo) are kept; all other rows are dropped.
esus_ve =
  esus_ve
  |> Phi.Pipe.run(:format, fn input_path, output_path ->
    # File.open!/3 closes the file even if parsing or formatting raises, and
    # returns the function's result (:ok from Stream.run/1).
    File.open!(output_path, [:append], fn file ->
      IO.write(
        file,
        [
          "esus_cpf",
          "esus_symptoms_date",
          "esus_city",
          "esus_age_index_15_to_80_by_10"
        ]
        |> Enum.join(",")
        |> Kernel.<>("\n")
      )

      input_path
      |> File.stream!(read_ahead: 100_000)
      |> NimbleCSV.RFC4180.parse_stream()
      |> Flow.from_enumerable()
      # Columns arrive in the order chosen by the filter_columns step.
      |> Flow.map(fn [date, cpf, _symptoms, type, result, age, city] ->
        if type in ["RT-PCR", "TESTE RÁPIDO - ANTÍGENO"] and String.first(result) == "P" do
          [
            cpf,
            Phi.FormatUtils.date(date, :datetime),
            Phi.FormatUtils.city(city),
            # NOTE(review): the output column says 15_to_80 but the option is
            # :from_18_to_80_by_10 — confirm which range is intended.
            Phi.FormatUtils.age_index(age, :from_18_to_80_by_10)
          ]
        end
      end)
      |> Flow.reject(&is_nil/1)
      |> Stream.chunk_every(100_000)
      |> Stream.map(&IO.write(file, NimbleCSV.RFC4180.dump_to_iodata(&1)))
      |> Stream.run()
    end)
  end)
  • SIPNI:
    • paciente_cpf
      • #1 sipni_cpf: string (hash)
    • paciente_endereco_coIbgeMunicipio
      • #3 sipni_city: integer (IBGE 7 dígitos)
    • paciente_idade
      • #4 sipni_age_index_15_to_80_by_10: integer
    • vacina_codigo
      • Para definir #5 sipni_is_full_vaccination em [1]
      • #1 vaccine_code em [2]
    • vacina_dataAplicacao
      • #2 sipni_vaccination_date: date
    • vacina_nome
      • #2 vaccine_name em [2]
    • DOSE
      • Para definir #5 sipni_is_full_vaccination em [1]
  • Arquivos:
    • [1] format: Dados para linkage
    • [2] vaccines: Dados de vacinas
# `:cities` maps the first 6 digits of each 7-digit IBGE code listed in
# sandbox/input/ms_cities.csv to the full 7-digit code (as an integer), so that
# 6-digit codes can be converted to 7 digits. The table is created on first
# run and emptied on re-runs.
if :ets.whereis(:cities) == :undefined do
  :ets.new(:cities, [:set, :public, :named_table])
else
  :ets.delete_all_objects(:cities)
end

"sandbox/input/ms_cities.csv"
|> Path.expand(__DIR__)
|> File.read!()
|> NimbleCSV.RFC4180.parse_string()
|> Enum.map(fn [id] -> {String.slice(id, 0, 6), String.to_integer(id)} end)
|> then(&:ets.insert(:cities, &1))

# `:vaccines` collects {vaccine code, vaccine name} pairs while SI-PNI is
# formatted; it is used to build the vaccines table. Created on first run,
# emptied on re-runs.
if :ets.whereis(:vaccines) == :undefined do
  :ets.new(:vaccines, [:set, :public, :named_table])
else
  :ets.delete_all_objects(:vaccines)
end

# Formats SI-PNI rows into the linkage layout. As a second step, it writes the
# table of vaccines seen (code, name, sorted by code) to the results directory
# and drops the `:vaccines` ETS table. Finally it points the pipe's `path` back
# at the formatted file, so later joins read the linkage data.
sipni =
  sipni
  |> Phi.Pipe.run(:format, fn input_path, output_path ->
    # File.open!/3 guarantees the file is closed even if a row raises.
    File.open!(output_path, [:append], fn file ->
      IO.write(
        file,
        [
          "sipni_cpf",
          "sipni_vaccination_date",
          "sipni_city",
          "sipni_age_index_15_to_80_by_10",
          "sipni_is_full_vaccination"
        ]
        |> Enum.join(",")
        |> Kernel.<>("\n")
      )

      input_path
      |> File.stream!(read_ahead: 100_000)
      |> NimbleCSV.RFC4180.parse_stream()
      |> Flow.from_enumerable()
      |> Flow.map(fn [cpf, city, age, v_code, date, v_name, v_dosage] ->
        v_code = String.to_integer(v_code)
        # insert_new/2 never overwrites, so each code keeps the first name
        # inserted. Flow stages run concurrently, so this is not necessarily
        # the first name in file order.
        :ets.insert_new(:vaccines, {v_code, v_name})

        # NOTE(review): "500000" is written as an empty city; presumably an
        # "unknown municipality" code — confirm.
        city =
          if city == "500000" do
            nil
          else
            Phi.FormatUtils.city(city, :cities)
          end

        # Vaccine code 88 counts as full vaccination regardless of DOSE.
        is_full_vaccination =
          if v_code == 88 or v_dosage == "2" do
            1
          else
            0
          end

        [
          cpf,
          Phi.FormatUtils.date(date, :dd_mm_yyyy),
          city,
          Phi.FormatUtils.age_index(age, :from_18_to_80_by_10),
          is_full_vaccination
        ]
      end)
      |> Stream.chunk_every(100_000)
      |> Stream.map(&IO.write(file, NimbleCSV.RFC4180.dump_to_iodata(&1)))
      |> Stream.run()
    end)
  end)
  |> Phi.Pipe.run(
    :vaccines,
    fn _input_path, output_path ->
      :vaccines
      |> :ets.tab2list()
      |> Enum.map(fn {code, name} -> [code, name] end)
      |> Enum.sort_by(&hd/1)
      |> then(&[~w(code name) | &1])
      |> NimbleCSV.RFC4180.dump_to_iodata()
      |> then(&File.write!(output_path, &1))
    end,
    result_dir: paths.output
  )
  |> tap(fn _sipni -> :ets.delete(:vaccines) end)
  |> then(&Map.put(&1, :path, &1.history.format))
  • SIVEP:

    • DT_SIN_PRI
      • #2 sivep_symptoms_date: date
    • NU_CPF
      • #1 sivep_cpf: string (hash)
    • NU_IDADE_N
      • #4 sivep_age_index_15_to_80_by_10: integer
    • CO_MUN_RES
      • #3 sivep_city: integer (IBGE 7 dígitos)
    • HOSPITAL
      • #6 sivep_is_hospitalization: 1 | 0
    • EVOLUCAO
      • #7 sivep_is_death: 1 | 0
    • PCR_SARS2
  • Mantém apenas os registros considerados caso, isto é, quando:

    • CLASSI_FIN é 5 OU
    • PCR_RESUL é 1 OU
    • PCR_SARS2 é 1 OU
    • TP_TES_AN é 2 E
      • AN_SARS2 é 1
    • Invalida #81 HOSPITAL se valor não for 1
    • Invalida #127 EVOLUCAO se valor não for 2 (óbito)
# `:cities` maps the first 6 digits of each 7-digit IBGE code listed in
# sandbox/input/ms_cities.csv to the full 7-digit code (as an integer), so that
# 6-digit codes can be converted to 7 digits. The table is created on first
# run and emptied on re-runs.
if :ets.whereis(:cities) == :undefined do
  :ets.new(:cities, [:set, :public, :named_table])
else
  :ets.delete_all_objects(:cities)
end

"sandbox/input/ms_cities.csv"
|> Path.expand(__DIR__)
|> File.read!()
|> NimbleCSV.RFC4180.parse_string()
|> Enum.map(fn [id] -> {String.slice(id, 0, 6), String.to_integer(id)} end)
|> then(&:ets.insert(:cities, &1))

# Formats SIVEP rows into the linkage layout. Only confirmed cases are kept:
# CLASSI_FIN == "5", or PCR_RESUL == "1", or PCR_SARS2 == "1", or
# (TP_TES_AN == "2" and AN_SARS2 == "1"). Every other row is dropped.
sivep =
  sivep
  |> Phi.Pipe.run(:format, fn input_path, output_path ->
    # File.open!/3 guarantees the file is closed even if a row raises.
    File.open!(output_path, [:append], fn file ->
      IO.write(
        file,
        [
          "sivep_cpf",
          "sivep_symptoms_date",
          "sivep_city",
          "sivep_age_index_15_to_80_by_10",
          "sivep_is_case",
          "sivep_is_hospitalization",
          "sivep_is_death"
        ]
        |> Enum.join(",")
        |> Kernel.<>("\n")
      )

      input_path
      |> File.stream!(read_ahead: 100_000)
      |> NimbleCSV.RFC4180.parse_stream()
      |> Flow.from_enumerable()
      |> Flow.map(fn [
                       date,
                       cpf,
                       age,
                       city,
                       hospitalization,
                       pcr_result,
                       classification,
                       death,
                       pcr_sars2,
                       test_type,
                       test_sars2_result
                     ] ->
        if classification == "5" or pcr_result == "1" or pcr_sars2 == "1" or
             (test_type == "2" and test_sars2_result == "1") do
          # NOTE(review): "500000" is written as an empty city; presumably an
          # "unknown municipality" code — confirm.
          city =
            if city == "500000" do
              nil
            else
              Phi.FormatUtils.city(city, :cities)
            end

          [
            cpf,
            Phi.FormatUtils.date(date, :dd_mm_yyyy),
            city,
            Phi.FormatUtils.age_index(age, :from_18_to_80_by_10),
            1,
            # HOSPITAL "1" and EVOLUCAO "2" (óbito) become 1; any other value
            # becomes an empty field (nil), not 0.
            if(hospitalization == "1", do: 1),
            if(death == "2", do: 1)
          ]
        end
      end)
      |> Flow.reject(&is_nil/1)
      |> Stream.chunk_every(100_000)
      |> Stream.map(&IO.write(file, NimbleCSV.RFC4180.dump_to_iodata(&1)))
      |> Stream.run()
    end)
  end)

linkage: Linkage entre as bases

Full join entre as bases através das colunas de CPF, partindo da base do e-SUS VE.

  • Resultado:
    • #1 cpf: string (hash)
    • #2 city: integer (IBGE 7 dígitos)
    • #3 age_index_15_to_80_by_10: integer
    • #4 esus_symptoms_date: date
    • #5 sivep_symptoms_date: date
    • #6 sipni_vaccination_date: date
    • #7 sivep_is_case: 1 | 0
    • #8 sivep_is_hospitalization: 1 | 0
    • #9 sivep_is_death: 1 | 0
    • #10 sipni_is_full_vaccination: 1 | 0
# Builds a pipe operation that full-joins the pipe's input file (on
# `input1_column`) with the file at `input2_path` (on `input2_column`),
# writing to the pipe's output path.
op = fn input1_column, input2_path, input2_column ->
  &Phi.XSV.full_join(&1, input2_path, &2, input1_column, input2_column)
end

id = :linkage

# Picks a value from the two sides of a join, preferring the first:
# returns {:ok, value}, or nil when both are empty strings.
one_of = fn
  "", "" -> nil
  "", s2 -> {:ok, s2}
  s1, _s2 -> {:ok, s1}
end

# Header shared by the fully linked files (after the SIVEP join).
linkage_header =
  [
    "cpf",
    "city",
    "age_index_15_to_80_by_10",
    "esus_symptoms_date",
    "sivep_symptoms_date",
    "sipni_vaccination_date",
    "sivep_is_case",
    "sivep_is_hospitalization",
    "sivep_is_death",
    "sipni_is_full_vaccination"
  ]
  |> Enum.join(",")
  |> Kernel.<>("\n")

# Links the formatted bases by CPF, starting from e-SUS VE:
#   1. full join with SI-PNI, merging the duplicated cpf/city/age columns;
#   2. full join with SIVEP, merging them again into the final layout;
#   3. fully sort the result and drop repeated rows;
#   4. sort with `select: [2]` into the results directory.
# A row is dropped when both sides of a join have an empty cpf, city or age
# (one_of/2 returns nil and the `with` falls through to nil).
esus_ve =
  esus_ve
  |> Phi.Pipe.run(:esus_sipni_linkage, op.("esus_cpf", sipni.path, "sipni_cpf"))
  |> Phi.Pipe.run(:linkage_format, fn input_path, output_path ->
    File.open!(output_path, [:append], fn file ->
      IO.write(
        file,
        [
          "cpf",
          "city",
          "age_index_15_to_80_by_10",
          "esus_symptoms_date",
          "sipni_vaccination_date",
          "sipni_is_full_vaccination"
        ]
        |> Enum.join(",")
        |> Kernel.<>("\n")
      )

      input_path
      |> File.stream!(read_ahead: 100_000)
      |> NimbleCSV.RFC4180.parse_stream()
      |> Flow.from_enumerable()
      # e-SUS VE format columns followed by SI-PNI format columns.
      |> Flow.map(fn [cpf1, e2, c1, a1, cpf2, s2, c2, a2, s5] ->
        with {:ok, cpf} <- one_of.(cpf1, cpf2),
             {:ok, city} <- one_of.(c1, c2),
             {:ok, age} <- one_of.(a1, a2) do
          [cpf, city, age, e2, s2, s5]
        end
      end)
      |> Flow.reject(&is_nil/1)
      |> Stream.chunk_every(100_000)
      |> Stream.map(&IO.write(file, NimbleCSV.RFC4180.dump_to_iodata(&1)))
      |> Stream.run()
    end)
  end)
  |> Phi.Pipe.run(:sivep_linkage, op.("cpf", sivep.path, "sivep_cpf"))
  |> Phi.Pipe.run(:full_linkage, fn input_path, output_path ->
    File.open!(output_path, [:append], fn file ->
      IO.write(file, linkage_header)

      input_path
      |> File.stream!(read_ahead: 100_000)
      |> NimbleCSV.RFC4180.parse_stream()
      |> Flow.from_enumerable()
      # Linked e-SUS VE/SI-PNI columns followed by SIVEP format columns.
      |> Flow.map(fn [cpf1, c1, a1, e2, s2, s5, cpf2, si2, c2, a2, si5, si6, si7] ->
        with {:ok, cpf} <- one_of.(cpf1, cpf2),
             {:ok, city} <- one_of.(c1, c2),
             {:ok, age} <- one_of.(a1, a2) do
          [cpf, city, age, e2, si2, s2, si5, si6, si7, s5]
        end
      end)
      |> Flow.reject(&is_nil/1)
      |> Stream.chunk_every(100_000)
      |> Stream.map(&IO.write(file, NimbleCSV.RFC4180.dump_to_iodata(&1)))
      |> Stream.run()
    end)
  end)
  |> Phi.Pipe.run(:linkage_sort, &Phi.XSV.full_sort(&1, &2))
  |> Phi.Pipe.run(:linkage_duplicity_filter, fn input_path, output_path ->
    File.open!(output_path, [:append], fn file ->
      IO.write(file, linkage_header)

      # Assumes full_sort keeps the header on the first line and leaves
      # identical rows adjacent, so Stream.dedup/1 removes repeats while
      # streaming. The previous hand-written reduce discarded the current
      # line every time it flushed a 100_000-row chunk.
      input_path
      |> File.stream!(read_ahead: 100_000)
      |> Stream.drop(1)
      |> Stream.dedup()
      |> Stream.chunk_every(100_000)
      |> Enum.each(&IO.write(file, &1))
    end)
  end)
  |> Phi.Pipe.run(id, &Phi.XSV.sort(&1, &2, select: [2]), result_dir: paths.output)
# Prints the header of the final linkage file. Arguments are passed as an
# explicit list: `~w(headers #{path})` splits the interpolated path on
# whitespace, which breaks paths that contain spaces.
Phi.XSV.run(["headers", esus_ve.path]) |> IO.puts()

Sanitização

> Atenção! A partir desta seção, o caminho (path) de cada Pipe poderá ser inválido, pois os arquivos podem ter sido removidos ou renomeados.

  • Confirme :rename_results? para renomear os arquivos de saída para nomes mais significativos
  • Confirme :purge? para remover o diretório de processamento
# Final cleanup driven by keyword options (both default to false):
#   * `purge?: true` deletes the whole processing directory (`paths.pipe`);
#   * `rename_results?: true` renames the linkage and vaccines result files to
#     `linkage.csv` and `vaccines.csv` inside their own directories, skipping
#     any file that no longer exists.
sanitize = fn opts ->
  rename_in_place = fn path, new_name ->
    if File.exists?(path) do
      File.rename!(path, Path.join(Path.dirname(path), new_name))
    end
  end

  if Keyword.get(opts, :purge?, false) == true, do: File.rm_rf!(paths.pipe)

  if Keyword.get(opts, :rename_results?, false) == true do
    rename_in_place.(esus_ve.history.linkage, "linkage.csv")
    rename_in_place.(sipni.history.vaccines, "vaccines.csv")
  end
end

sanitize.(rename_results?: true, purge?: true)