Lidando com arquivos CSV gigantes em NodeJS
Se você precisa processar arquivos CSV no seu dia-a-dia, muito provavelmente já fez uso de alguma das diversas bibliotecas de parse de arquivos CSV para JSON. Tudo flui muito bem até você precisar lidar com aquele CSV gigante com milhões de linhas e, ao tentar processar, estoura a quantidade de memória do Node e tenta alternativas como utilizar flags como --max-old-space-size=8192
para aumentar a memória disponível do Node que, dependendo do seu arquivo, pode até funcionar, mas quase que provavelmente, depois de vários minutos processando, recebe novamente aquela exceção de que o Node ficou sem memória.
Isto acontece devido a estes processamentos carregarem todo o conteúdo que foi feito parse em memória e em vez de ir processando e liberando, ele simplesmente acumula mais e mais informações até chegar no limite e explodir travar.
Depois de certa "bateção de cabeça" com isso, descobri uma forma de lidar com arquivos CSV gigantes sem ter problemas de memória utilizando streams. Tudo fluiu bem, o problema é que em atividades assíncronas, como por exemplo para cada linha desse CSV fosse feito uma consulta em banco de dados e o progresso do processamento dependesse do retorno desta consulta fazia com que o processo ficasse bem lento devido à execução de uma linha depender de que a anterior fosse toda processada.
Foi aí que achei a biblioteca promise-streams
que me permitiu definir o número de threads que rodariam em paralelo no processamento das linhas do CSV. Em resumo, eu poderia processar quantas linhas eu quisesse ao mesmo tempo em paralelo, como se fosse um cluster, onde a cada tarefa concluída, automaticamente uma nova já era posta em execução, e, em teoria, sempre teríamos sendo processadas o número X de threads definidos por nós no código.
Exemplo
const csv = require("csv-parser");
const fs = require("fs");
const ps = require("promise-streams");
function main() {
const stream = fs.createReadStream("./arquivo_gigante.csv");
stream
.pipe(csv({ separator: "," }))
.pipe(
ps.map({ concurrent: 10 }, async (data) => {
new Promise((resolve) => {
try {
// faz o processamento necessário
} catch (err) {
// trata o erro
}
resolve(true);
});
})
)
.wait()
.then(() => {
console.log("Processamento finalizado!");
});
}
main();
Observe que em concurrent eu setei como 10
, ou seja, o código seria executado paralelamente em 10 threads, deixando o código bem mais performático caso o meu processamento demande de alguma resposta de API ou até mesmo consulta em banco de dados.
Desta forma, a execução deste processamento do CSV não fica acumulando valores "parseados" em memória, ele recebe a linha, processa, e a descarta, fazendo este fluxo em 10 execuções paralelas, linha por linha.
Este é o meu primeiro guia de qualquer coisa. Espero ter contribuído positivamente com a comunidade e com certeza adições e observações ao código são mais do que bem-vindas para o enriquecimento deste tema.