Executando verificação de segurança...
25

TabNewsLake - Um Datalake para nossas News

TabNewsLake

Consumo da API pública do TabNews para criação de um Datalake.

"Arquitetura TabNews Lake"

Sobre

Essa iniciativa tem como objetivo as seguintes ações em relação aos dados:

  • Coletar
  • Armazenar
  • Organizar
  • Analisar

Bem como compartilhar os resultados obtidos e fazer uso dos mesmos dados para treinamentos voltados à área de Engenharia, Ciência e Análise de dados.

Etapas

1. Ingestão

No primeiro momento estamos coletando os dados diretamente da API fornecida pelo time da TabNews. Utilizamos como referência este post do GabrielSozinho.

Para coletar os dados estamos utilizando um script Python com a biblioteca requests no endpoint https://www.tabnews.com.br/api/v1/contents . Onde a busca é feita do content mais recente para o mais antigo. Vale dizer que por hora, fazemos uma varredura iterativa até que sejam coletados todos os contents com os devidos status atualizado. Isto é, com o valor atualizado de TabCoins, número de contents filhos, etc.

Foi aberta uma issue sugerindo a possibilidade de receber os contents mais atulizados no lugar dos mais recentes e/ou relevantes, assim, não seria necessário passar por todos contents da plataforma para atualizar a base toda.

def get_data(**kwargs):
    url = "https://tabnews.com.br/api/v1/contents"
    resp = requests.get(url, params=kwargs)
    return resp

O script Python criado, em sua primeira versão, realizaria a ingestão diretamente no Datalake utilizando Apache Spark, entretando, o custo envolvido para disponibilizar o cluster não faria sentido para o trabalho. Assim, como alternativa (e melhor opção na realidade), subimos um delivery stream no Kinesis Data Firehose na AWS. Desta forma, o script Python realiza um PUT diretamente no Firehose, onde o mesmo entrega o dado no S3 em formato JSON.

def save_with_firehose(data, firehose_client):

    data = [[i] for i in data]

    d = json.dumps(data)[1:-1].replace("], [", "]\n[") + "\n"

    firehose_client.put_record(
        DeliveryStreamName="tabnews-contents",
        Record={"Data": d},
    )

    return None

Levou um tempo para descobrir o formato ideal de como enviar os dados para o Firehose de maneira que o Apache Spark fosse capaz de ler, mas deu certo!

Para executar o script python basta:

$ python src/raw/contents.py --help
usage: contents.py [-h] [--date DATE] [--save_type {firehose,spark}]

optional arguments:
  -h, --help            show this help message and exit
  --date DATE           Data limite para busca (mais antiga): YYYY-MM-DD
  --save_type {firehose,spark}
                        Modo de salvar os dados

Assim, caso deseje coletar dados do dia atual até 01/06/2022, enviando os dados para seu delivery stream:

$ python src/raw/contents.py --date 2022-06-01 --save_type firehose

2. Consumo e criação de tabela em Bronze

Estamos trabalhando com camadas de dados. Isto é, separando em níveis a qualidade de nosso dado. O S3 onde o dado é entregue pelo Firehose chamamos de Raw. Nesta camada, pode haver dados duplicados, com muitos arquivos (pois é onde chega tudo no formato mais cru) e com possíveis inconsistências.

Para criar tabelas que sejam mais fáceis de se trabalhar, e aplicar a delicinha do SQL, criamos a camada Bronze, utilizando Apache Spark Structed Streaming com Auto Loader do Databricks. Com esses carinhas, podemos ler os dados em Raw e gravá-los em Bronze no formato Delta, facilitando e otimizando a leitura, bem como garantindo maior integridade dos dados.

Ainda que o cluster de Apache Spark não fique ligado 24/7 esperando os dados de Raw chegarem, utilizamos o CloudFiles com Stream, o que facilita nossa vida, pois o mesmo é capaz de gerenciar quais arquivos ele já leu e quais não, iteração por iteração. Ou seja, ao realizar o processamento dos dados em Raw, ele lê, processa e armazena em Bronze os dados, na sua próxima iteração (agendada), haverão dados novos (ou não) em Raw, assim, ele lerá apenas esses novos dados e fará seu processo normalmente (mesmo que estejam no mesmo bucket://folder que os dados antigos, ignorando-os).

df_stream = (spark.readStream
                  .format("cloudFiles")
                  .schema(schema)
                  .option("cloudFiles.format", "json")
                  .load(raw_path))

stream = (df_stream.writeStream
                  .trigger(once=True)
                  .option("checkpointLocation", checkpoint_path)
                  .foreachBatch(lambda df, batchId: upsert(df, df_delta, table))
                  .start())

Próximos passos

  • Realizar estatísticas descritivas dos posts e usuários da plataforma

Bônus

Algumas estatísticas a partir dos dados coletados:

Estatísticas por data

Quantidade histórica de conteúdos postados e usuários distintos realizando posts:

Estatísticas Diárias

Claro que há interesse em entender o tamanho atual desta plataforma, assim podemos considerar os valores acumulados:

Estatísticas Acumuladas

Estatísticas por usuário

Bora acompanhar os top 10 usuários que mais realizando posts por aqui?

Post Usuários

Podemos fazer o mesmo para TabCoins? Está na mão!

TabCoins Usuários

Por enquanto é isso.

Carregando publicação patrocinada...
2

Muito legal os gráficos gerados, principalmente os dois primeiros. O TabNews tem a página /status que contém só algumas informações básicas. Será legal um dia poder consumir esse tipo de informação diretamente por lá, mesmo que os gráficos sejam atualizados apenas uma vez ao dia para evitar um consumo excessivo de processamento.

2

Excelente trabalho teomewhy! As ferramentas para datalake da AWS são muito boas, não só para estatística, como também para otimização de processos. Na empresa onde eu trabalho, conseguimos otimizar um processo importante de negócio (que não posso dar detalhes aqui, por questão de segurança) onde o processo demorava cerca de 12 horas para finalizar e conseguimos reduzir esse tempo para cerca 1h, somente utilizando datalake e lambda.

1

Da hora de mais! Usar o Apache Spark como solução de engine de dados em memória ajuda muito a escalar o processamento, de forma barata. Gosto bastante hehe.

2

Que sensacional, tanto a abordagem dos dados quanto toda descrição do que foi feito aqui nessa publicação! Vou divulgar no meu LinkedIn amanhã 🎉

Uma dúvida: no último gráfico, ele representa a soma das TabCoins dos contents de cada usuário? Se sim, isso inclui tanto contents com e sem parent_id?

2

Salve, filipedeschamps!
Obrigado pelo feedback! 😁

Então, pelo que entendi, o endpoint de content só retorna os posts pais e suas respectivas estatísticas. Nenhum dos dados obtidos por este endpoint tem parent_id diferente de null, i.e. são todos nulos.

Fiz um bate de números para alguns posts, e o valor de TabCoins para eles bate com o que está no campo tabcoins que a API retorna, e é este mesmo campo que utilizei para somar as TabCoins:

select owner_username,
       count(distinct id) as qtdContent,
       sum(tabcoins) as qtTabcoins,
       sum(tabcoins) / count(distinct id) as qtTabCoinsContent

from bronze.tabnews.contents

group by 1
order by 3 desc

limit 10

Acho que temos como oportunidade navegar post a post para pegar todos os posts filhos, hehe.

1

Boa, correto! Por padrão o endpoint /contents vai assumir stragegy=best e é o que está sendo utilizado na Home 🤝

Outra estratégia que você pode utilizar para o scraping é o stragegy=new que irá listar as publicações de forma descrescente, veja:

https://www.tabnews.com.br/api/v1/contents?strategy=new&page=1

E caso queira pegar tudo de um usuário (tanto publicações root quanto child):

https://www.tabnews.com.br/api/v1/contents/filipedeschamps?strategy=new&page=1

1

Perfeito! No nosso script já utilizamos o strategy=new. A questão é que se um post (particularmente os mais antigos) é atualizado ou ganha mais Tabcoins, mesmo pegando por essa estratégia não vamos ter visibilidade, certo? A não ser que percorra sempre todas as pages.

Por isso pode ser uma boa ideia ter uma strategy=updated_at (algo assim), ordenando os contents pela sua atualização, seja por edição, tabcoins, childs, etc. Assim evitamos percorrer todas as pages para pegar todos os contents atualizados, trabalhando apenas com as pages mais recentes. Não sei se me fiz entender hehe. Mas é algo que ajudaria tanto na coleta dos dados (em velocidade), quanto evitaria o escalonamento de minhas requests à API.

Coloquei esses pontos nessa issue

2

Perfeito! No nosso script já utilizamos o strategy=new. A questão é que se um post (particularmente os mais antigos) é atualizado ou ganha mais Tabcoins, mesmo pegando por essa estratégia não vamos ter visibilidade, certo? A não ser que percorra sempre todas as pages.

Hmm é verdade!

Por isso pode ser uma boa ideia ter uma strategy=updated_at (algo assim), ordenando os contents pela sua atualização, seja por edição, tabcoins, childs, etc. Assim evitamos percorrer todas as pages para pegar todos os contents atualizados, trabalhando apenas com as pages mais recentes. Não sei se me fiz entender hehe. Mas é algo que ajudaria tanto na coleta dos dados (em velocidade), quanto evitaria o escalonamento de minhas requests à API.

Seria bom até para outras coisas 🤝

Em paralelo, para o maior controle possível seria legal expor a tabela balance_operations com todas as movimentações. No futuro gostaria de fazer isso 🤝

1
1

Muito legal Téo, meus parabéns. Seria legal subir essa base no Kaggle para que mais pessoas rodem análises. Vou pegar os dados e gerar algumas análises aqui.

1

Caramba, se eu te disser que inicei uma tentativa de webscraping, mas de uma forma bem mais caseira, acredita?!

Muito boa a sua ideia e conteúdo, parabéns!

1

Fala aí Teo!? Muito bacana o seu conteúdo... parabéns meu caro.

Deixo uma sugestão...

Fazer a infraestrutura do DL com Terraform e compartilhar junto com o código do projeto...
T+

2

Salve! Obrigado!

Isso é uma ótima ideia, principalmente por eu não ter contato direto com o Terraform.

Esse lake foi criado usando uma stack do cloudformation disponibilizado pelo Databricks. Na verdade eu criei o S3 via console e o ambiente do databricks em si que foi criado via cloudformation. Além de usarmos o mesmo ambiente para diferentes repos/projetos.

Gostei da sugestão, acho que vale dar uma estudada sim e pensar em algo. Valeu!

1
1
1
1
1