🛠️ Resolvendo o desafio de um bilhão de linhas em Go (de 1m40s para 8.4s)
Há um tempo atrás, um amigo comentou comigo sobre um desafio que envolvia ler um arquivo com 1 bilhão de linhas. Achei a ideia muito interessante, mas como estava em semana de provas na faculdade, acabei deixando para ver isso depois. Meses depois, vi um vídeo do Theo sobre o desafio e resolvi dar uma olhada mais de perto.
O objetivo do One Billion Row Challenge é calcular a temperatura mínima, máxima e média de uma lista de cidades - o detalhe é que são 1 bilhão de itens nessa lista, onde cada item consiste do nome de uma cidade e uma temperatura, sendo que cada cidade pode aparecer mais de uma vez. or fim, o programa deve exibir esses valores em ordem alfabética pelo nome da cidade.
Achei que seria divertido tentar resolver o desafio, mesmo que não houvesse uma recompensa. Enfim, fiz este texto descrevendo o meu processo.
Primeira tentativa: fazendo o código funcionar
Sempre que preciso resolver um problema mais complicado, minha primeira meta é fazer o programa funcionar. Pode não ser o código mais rápido e nem o mais limpo, mas é um código que funciona.
Basicamente, criei a estrutura Location
para representar cada cidade da lista, contendo a temperatura mínima, máxima, a soma das temperaturas e quantas vezes a cidade aparece na lista (esses dois últimos são necessários para calcular a média). Sei que existe uma maneira de calcular a média diretamente, sem precisar armazenar a quantidade de temperaturas e a soma delas. Mas, como mencionei anteriormente, o objetivo era fazer o código funcionar.
A lista de dados é formada pelo nome da cidade seguido pela temperatura, separados por um ponto e vírgula. Por exemplo:
Antananarivo;15.6
Iqaluit;-20.7
Dolisie;25.8
Kuopio;-6.8
O jeito mais simples para ler os dados é utilizando o Scan
, que permite ler uma linha de cada vez. Com a linha, é possível dividi-la em duas partes: os valores antes e após o ponto e vírgula. Para obter a temperatura, é possível usar o strconv.ParseFloat
, que converte uma string em um float. O código completo da primeira implementação pode ser visto abaixo:
package main
import (
"bufio"
"fmt"
"math"
"os"
"sort"
"strconv"
"strings"
)
type Location struct {
min float64
max float64
sum float64
count int
}
func NewLocation() *Location {
return &Location{
min: math.MaxInt16,
max: math.MinInt16,
sum: 0,
count: 0,
}
}
func (loc *Location) Add(temp float64) {
if temp < loc.min {
loc.min = temp
} else if temp > loc.max {
loc.max = temp
}
loc.sum += temp
loc.count += 1
}
var cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file")
func main() {
flag.Parse()
if *cpuprofile != "" {
f, err := os.Create(*cpuprofile)
if err != nil {
log.Fatal(err)
}
pprof.StartCPUProfile(f)
defer pprof.StopCPUProfile()
}
file, _ := os.Open("./measurements.txt")
defer file.Close()
m := map[string]*Location{}
scanner := bufio.NewScanner(file)
for scanner.Scan() {
line := scanner.Text()
name, tempStr, _ := strings.Cut(line, ";")
temp, _ := strconv.ParseFloat(tempStr, 32)
loc, ok := m[name]
if !ok {
loc = NewLocation()
m[name] = loc
}
loc.Add(temp)
}
keys := make([]string, 0, len(m))
for k := range m {
keys = append(keys, k)
}
sort.Strings(keys)
for _, name := range keys {
loc := m[name]
mean := loc.sum / float64(loc.count)
fmt.Printf("%s: %.1f/%.1f/%.1f\n", name, loc.min, mean, loc.max)
}
}
Essa versão mais simples levou aproximadamente 97 segundos
para rodar.
Otimizando a conversão de strings para floats
Analisando o profile da execução, percebi que um dos maiores gargalos era a função strconv.ParseFloat
. Basicamente, tempo total de execução dela foi de 23 segundos (aproximadamente 23% do tempo total).
O problema dessa função é que ela é genérica, ou seja, é feita para funcionar com qualquer float válido. Porém, nossos dados têm um formato de temperatura bem específico. Veja o exemplo abaixo:
Antananarivo;15.6
Iqaluit;-20.7
Dolisie;5.8
Kuopio;-6.8
O formato da temperatura é sempre o mesmo: um ou dois dígitos antes do ponto e um dígito após o ponto, podendo incluir um sinal de menos no inicio. Assim, podemos criar uma função que converte os valores de forma específica, otimizando o processo sem a necessidade de realizar todas as verificações genéricas do ParseFloat
.
func bytesToTemp(b []byte) float64 {
var v int16
var isNeg int16 = 1
for i := 0; i < len(b)-1; i++ {
char := b[i]
if char == '-' {
isNeg = -1
} else if char == '.' {
digit := int16(b[i+1] - '0')
v = v*10 + digit
} else {
digit := int16(char - '0')
v = v*10 + digit
}
}
return float64(v*isNeg) / 10
}
Para ler os dados em formato de bytes em vez de string, alterei o retorno do Scanner
de string para bytes
line := scanner.Bytes()
before, after, _ := bytes.Cut(line, []byte{';'})
name := string(before)
temp := bytesToTemp(after)
Essas pequenas mudanças fizeram o tempo de execução cair para 75 segundos
.
Lendo pedaços maiores de dados
A maior vantagem de usar o Scan
é que o programa não precisa carregar todo o arquivo de uma só vez na memória. Em vez disso, permite ler linha por linha, trocando desempenho por memória.
É importante ressaltar que existe um meio-termo entre ler uma linha por vez e carregar os 14 GB de dados de uma só vez. Esse meio termo é a leitura de chunks, que são pedaços de memória. Dessa forma, ao invés de ler todo o arquivo de uma vez, podemos ler blocos de 128 MB.
buf := make([]byte, chunkSize)
reader := bufio.NewReader(file)
var leftData []byte
for {
n, err := reader.Read(buf)
if err != nil {
if err == io.EOF {
break
}
panic(err)
}
chunk := append(leftData, buf[:n]...)
lastIndex := bytes.LastIndex(chunk, []byte{'\n'})
leftData = chunk[lastIndex+1:]
lines := bytes.Split(chunk[:lastIndex], []byte{'\n'})
for _, line := range lines {
before, after, _ := bytes.Cut(line, []byte{';'})
name := string(before)
temp := bytesToTemp(after)
loc, ok := m[name]
if !ok {
loc = NewLocation()
m[name] = loc
}
loc.Add(temp)
}
}
Com isso, o tempo de execução caiu para 70 segundos
. Melhor que antes, mas ainda da para melhorar.
Alterando os tipos dos dados
É fato que todo o desafio gira em torno de números com casas decimais. Porém, lidar com pontos flutuantes é sempre um grande desafio (vide IEEE-754). Nesse caso, por que não usar inteiros para representar a temperatura?
type Location struct {
min int16
max int16
sum int32
count int32
}
Como definido anteriormente, uma temperatura é sempre representada por até três dígitos. Logo, removendo a vírgula, os valores podem variar entre -999 e 999, de modo que int16
é o suficiente para representá-los. Para a soma e a contagem, int32
é mais que o suficiente, visto que esse tipo pode variar entre -2147483648 e 2147483647.
Dado que agora esperamos um valor inteiro de 16 bits para a temperatura, precisamos modificar a função bytesToTemp
. Para isso, mudamos o retorno para int16
e removemos a divisão no final. Assim, a função vai sempre vai retornar um número inteiro.
func bytesToTemp(b []byte) int16 {
var v int16
var isNeg int16 = 1
for i := 0; i < len(b)-1; i++ {
char := b[i]
if char == '-' {
isNeg = -1
} else if char == '.' {
digit := int16(b[i+1] - '0')
v = v*10 + digit
} else {
digit := int16(char - '0')
v = v*10 + digit
}
}
return v * isNeg
}
Para finalizar, modifiquei a função Add
para aceitar os valores inteiros e ajustei o print para dividir os valores antes de mostrá-los na tela. Com isso, o tempo caiu três segundos, indo para 60 segundos
. Não é muito, mas uma vitória é uma vitória.
Melhorando a Performance da Conversão de Bytes para String
Novamente analisando o profile, vi que tinha uma certa função chamada slicebytetostring
que custava 13,5 segundos de tempo de execução. Analisando, descobri que essa função é a responsável por converter um conjunto de bytes em uma string (o próprio nome da função deixa claro isso). No caso, essa é a função chamada internamente quando se usa a função string(bytes)
.
Em Go, assim como na maioria das linguagens, strings são imutáveis, o que significa que não podem ser modificadas após serem criadas (normalmente, quando se faz isso, uma nova string é criada). Por outro lado, listas são mutáveis. Ou seja, quando se faz uma conversão de uma lista de bytes para string, é preciso criar uma cópia da lista para garantir que a string não mude se a lista mudar.
Para evitar o custo adicional de alocação de memória nessas conversões, podemos utilizar a biblioteca unsafe
para realizar a conversão de bytes para string (Nota: ela é chamada de unsafe
por um motivo).
name := unsafe.String(unsafe.SliceData(before), len(before))
Diferente do caso anterior, a função acima reutiliza os bytes passados para gerar a string. O problema disso é que, se a lista original mudar, a string resultante também será afetada. Embora possamos garantir que isso não ocorrerá neste contexto específico, em aplicações maiores e mais complexas, o uso de unsafe
pode se tornar bem inseguro.
Com essa mudança, reduzimos o tempo de execução para 51 segundos
. Nada mal.
Reimplementando bytes.Cut
Lembra que eu mencionei que as temperaturas sempre tinham formatos específicos? Então, segundo o profile da execução, que separa a linha em duas partes (nome da cidade e temperatura), custa 5.38 segundos para rodar. E refizermos ela na mão?
Para separar os dois valores, precisamos encontrar onde está o ";". Como a gente já sabe, os valores da temperatura podem ter entre três e cinco caracteres. Assim, precisamos verificar se o caractere anterior aos dígitos é um ";". Simples, não?
idx := 0
if line[len(line)-4] == ';' {
idx = len(line) - 4
} else if line[len(line)-5] == ';' {
idx = len(line) - 5
} else {
idx = len(line) - 6
}
before := line[:idx]
after := line[idx+1:]
Com isso, o tempo de execução foi para 46 segundos
, cerca de 5 segundos a menos que antes (quem diria, não é?).
Paralelismo para acelerar o processamento
Todo esse tempo, nosso objetivo foi tornar o código o mais rápido possível em um núcleo. Mudando coisas aqui e ali, diminuímos o tempo de 97 segundos para 46 segundos. Claro, ainda daria para melhorar o tempo sem ter que lidar com paralelismo, mas a vida é curta demais para se preocupar com isso, não é?
Para poder rodar o código em vários núcleos, decidi usar a estrutura de canais nativa do Go. Além disso, também criei um grupo de espera que vai indicar quando o processamento dos dados terminaram.
Vale destacar que
workers
, nesse caso, é uma constante que define quantasgoroutines
serão criadas para processar os dados. No meu caso, são 12, visto que eu tenho um processador com 6 núcleos e 12 threads.
chunkChan := make(chan []byte, workers)
var wg sync.WaitGroup
wg.Add(workers)
O próximo passo foi criar as goroutines
que serão responsável por receber os dados do canal e processá-los. A lógica de processamento dos dados é semelhante ao modelo single thread.
for i := 0; i < workers; i++ {
go func() {
for chunk := range chunkChan {
lines := bytes.Split(chunk, []byte{'\n'})
for _, line := range lines {
before, after := parseLine(line)
name := unsafe.String(unsafe.SliceData(before), len(before))
temp := bytesToTemp(after)
loc, ok := m[name]
if !ok {
loc = NewLocation()
m[name] = loc
}
loc.Add(temp)
}
}
wg.Done()
}()
}
Por fim, o código responsável por ler os dados do disco e enviá-los ao canal:
for {
n, err := reader.Read(buf)
if err != nil {
if err == io.EOF {
break
}
panic(err)
}
chunk := append(leftData, buf[:n]...)
lastIndex := bytes.LastIndex(chunk, []byte{'\n'})
leftData = chunk[lastIndex+1:]
chunkChan <- chunk[:lastIndex]
}
close(chunkChan)
wg.Wait()
Vale ressaltar que os mapas em Go não são thread-safe. Isso significa que acessar ou alterar dados no mesmo mapa de forma concorrente pode levar a problemas de consistência ou erros. Embora não tenha observado problemas durante meus testes, vale a pena tratar esse problema.
Uma das maneiras de resolver esse problema seria criar um mecanismo de trava para o mapa, permitindo que somente uma goroutine
consiga utilizá-lo por vez. Isso, claro, poderia tornar a execução um pouco mais lenta.
A segunda alternativa consiste em criar um mapa para cada uma das goroutines
, de modo que não vai existir concorrência entre elas. Por fim, os mapas são colocados em um novo canal e os valores do mapa principal calculados a partir deles. Essa solução ainda vai ter um custo, mas vai ser menor que a anterior.
close(chunkChan)
go func() {
wg.Wait()
close(mapChan)
}()
keys := make([]string, 0, 825)
m := map[string]*Location{}
for lm := range mapChan {
for lk, lLoc := range lm {
loc, ok := m[lk]
if !ok {
keys = append(keys, lk)
m[lk] = lLoc
continue
}
if lLoc.min < loc.min {
loc.min = lLoc.min
}
if lLoc.max > loc.max {
loc.max = lLoc.max
}
loc.sum += lLoc.sum
loc.count += lLoc.count
}
}
Além disso, como o processamento passou a ser distribuído entre diferentes núcleos, diminui o tamanho do chunk de 128 MB para 2 MB. Cheguei nesse número testando vários valores, tendo entre 1 MB e 5 MB os melhores resultando. Na média, 2 MB obteve o melhor desempenho.
Enfim, o nosso tempo de processamento caiu de 46 segundos
para... 12 segundos
.
Otimizando a quebra de linhas no chunk
Todas as vezes que eu analisava o profile, a função bytes.Split
chamava a minha atenção. O tempo de execução dela era de 16 segundos (tempo total, considerando todas as goroutines
), o que parece justo, visto que ela é responsável por quebrar um chunk em linhas. No entanto, parecia um trabalho dobrado, dado que ela primeiro quebra as linhas para, em seguida, as linhas serem lidas uma a uma. Por que não fazer ambos ao mesmo tempo?
Minha abordagem foi percorrer o chunk e verificar se o byte atual correspondia a um \n
. Dessa forma, consegui percorrer todas as linhas ao mesmo tempo em que as quebrava, processando em seguida.
start := 0
start := 0
for end, b := range chunk {
if b != '\n' {
continue
}
before, after := parseLine(chunk[start:end])
// ...
start = end + 1
}
Com essa simples mudança, o tempo de execução caiu para aproximadamente 9 segundos
.
Executed in 8.45 secs fish external
usr time 58.47 secs 152.00 micros 58.47 secs
sys time 4.52 secs 136.00 micros 4.52 secs
Próximos passos
Atualmente, o maior gargalo da aplicação é o mapa. Somando todas as operações de leitura e escrita, são 32 segundos (de longe, o tempo mais alto). Talvez criar um algoritmo de hash mais eficiente resolva? Fica como ideia para o futuro.
No mais, conseguimos diminuir o tempo de 1 minuto e 40 segundos para quase 8 segundos, sem usar qualquer biblioteca externa. Além disso, tentando fazer a aplicação ficar cada vez mais rápida, me fez aprender muita coisa.
Gostaria de destacar alguns artigos incríveis que achei de pessoas que também participaram do desafio.