Criei um pipeline de dados com a api do jogo league of legends utilizando ferramentas open source
Do inicio
O projeto nasceu no final de um bootcamp sobre engenharia de dados, onde para receber o certificado era necessário criar um projeto com as tecnologias vistas durante o bootcamp, não era para recriar um exercício, ou 'copiar só que diferente', é você ver um problema e então resolver.
A um tempo atras eu jogava league of legends, até pensei que era besteira achar que recuperar dados de um jogo é um problema, mas ao longo do tempo fui parando de jogar, para só assistir, ver campeonatos, live streams, e percebi que não é só um joguinho, existe muito dinheiro rolando no mundo dos jogos competitivos, e o Brasil está neste mundo, o mercado dos jogos competitivos.
A algum tempo criou-se uma discussão sobre o cenário dos jogadores de league of legends do Brasil. Uma discussão que os desafiantes (maior rank que um jogadores pode chegar), os 200 melhores, estavam criando uma leva de jogadores profissionais sem qualidade, aonde o desafiante no Brasil seria um platina em outros servidores.
Eu não sou nem de perto um jogador do desafiante, mas eu sei aonde consigo os dados destes jogadores, e as partidas que jogaram, e até o que acontece dentro delas. Então eu queria mostrar um resumo dos jogadores desafiantes e quem sabe comparar isso com outras nações. Assim surgiu o projeto.
Arquitetura do pipeline
O bootcamp tinha um foco com a AWS, porém a AWS abstrai algumas coisas, o Data Bricks também, e eu gosto de entender um pouco mais das ferramentas de forma conceitual, então eu trouxe toda a parte de configurações das ferramentas (não tudo, só o suficiente) para minha máquina, com zero custos adicionais, utlizando Docker e docker-compose.
Como eu vim do desenvolvimento de software, acabei criando uma aplicação que abstrais as consultas para a API da Riot (empresa dona do jogo), persiste os dados no data lake, e que poderia ser utilizada pelo Airflow.
Data Colector
A aplicação se encontra no diretório data_colector, em uma arquitetura simples, aonde o resultado são duas classes que serão instanciadas dentro do Airflow para ai sim serem utilizadas pelas tasks. As duas classes principais são:
APIService: Responsavel pela chamada a api da riot games, fazendo todo o tratamento de erros, e evitando o bloquei de ip, através a biblioteca backoff.
MinioWriter: Responsavel por persistir os dados no data lake.
A classe APIService tem diversos métodos que basicamente chamam um end-point diferente da API, alguns métodos dependem de dados vindos de execuções anteriores, e ai entra o nosso orquestrador, o Airflow.
Data Lake
O data lake foi construido utilizando a ferramenta Minio, sendo um armazem de objetos, você pode salvar todo tipo de arquivo, json, xml, fotos, videos etc...
A arquitetura do data lake é separado por 3 camadas (buckets), bronze, gold, silver, em outras arquiteturas podem mudar os nomes (raw, curated etc...), ou ter mais camadas.
Bronze: Onde os dados chegam crus, sem nenhum tratamento, apenas são armazenados e particionados.
Silver: Os dados são parseados, estruturados de forma tabular, deduplicados, e salvos em formato delta table (mais para frente falaremos sobre o delta lake).
Gold: Os dados são agregados, aplica-se regras de negócio e estão disponiveis para uso.
Orquestração com Airflow
O Airflow é um orquestrador, o intuito dele é basicamente mandar, mas neste projeto ele vai executar também. A dag instancia o data_colector, então executa seus métodos, de forma separada para ser algo mais assincrono.
Tudo começa com a busca do summoners (jogadores), no elo desafiente, que ao todos são 200. Então com os summoners, podemos recuperar suas maestrias (uma métrica dos campeões que os jogadores utilizam) e suas ultimas 10 partidas. Aqui entramos em um ponto um pouco chato, para recuperar 10 partidas de cada jogador, demora um bom tempo, pela latência das respostas da API e também a limitação dada as keys devs. A API da riot deixa você criar suas aplicações, e suas aplicações geram keys permitindo fazer chamadas a API, porem as keys de dev (personal) e produção tem diferenças, que é uma limitação de requests.
E isso fez uma diferença enorme, pois são 200 summoners * 10 = 2000 requests, isto para recuperar as partidas. Pois logo apos isto, é feito um filtro para eliminar partidas dulpicadas (pois pode ser que os jogadores jogaram juntos na mesma partida), e então é feito o request para recuparar os detalhes das partidas, com as informações de kills, itens, quem ganhou quem perdeu etc... 2000 partidas.
Existe também as infos estáticas, são json's com os dados de nome do campeão, nome dos poderes, valores base de cada item e entre outras informações. Optei por criar diretamente no airflow as funções que fazem estas chamas, por serem pequenas e simples. A task fetch_static_data utiliza a classe MinioWriter para persistir os dados no camada bronze do data lake.
Leva mais ou menos 50 minutos para rodar todo o ETL da API. Algumas tasks são executadas em paralelo como podem ver a dag no Airflow:
O orquetstrador utiliza 3 dags, esta da imagem acima, e as dags que executam o job spark para fazer a transformação dos dados dentro do data lake. Uma coisa que eu achei legal, no final desta dag ela chama outra dag spark_etl, utilizando TriggerDagRunOperator, a dag então faz a transição dos dados da camada bronze para a silver.
Separei as dags que fazem as transformações de cada camada, para eu poder executá-las separadamente, isolando a responsabilidade por dag. Então se eu querer mexer nos meus jobs spark que só afetam a camada silver, eu posso fazê-lo sem ter que esperar todo o ETL acontecer.
Spark
Spark é um ferramenta para processamento de dados distribuidos, mas como estamos em um container local, temos um container com o master e um worker, então não tem uma fila com vários workers que irão executar as tarefas e entregar em segundos. Mas a ideia aqui foi conseguir fazer com que essas três ferramentas se comuniquem: Minio + Spark + Airflow.
São as grandes ferramentas do mercado, tirando o minio, mas ele faz o papel do Amazon S3. Quando consegui conectar os três, minha mente explodiu, agora eu posso criar projetos com essas tecnologias open source, local e sem custo adicional. Ainda não subi na mão, um cluster com estas ferramentas, mas é no mínimo interessante ter esta possibilidade 'desbloqueada'.
Para conectar o spark ao minio é como conectar se conectar no s3, a diferença é que eles estão no mesmo network, então é só chamar a url e pronto. A imagem que utilizei eu achei através de um outro projeto do Rodrigo Carneiro (repo), essa imagem ja vem com hadoop instalado e configurado, então só precisa se preocupar com a configuração do master e do worker.
Mas para podermos fazer conexão entre o spark e o minio, e outras ferramentas, como o postgresql e utilizar o delta lake. Precisamos lidar com os packages .jar, e isso foi um problema, eu demorei para entender onde os jars deveriam estar, tanto no container onde o spark estava quanto até fora dele, na máquina local, para poder criar os jobs spark. Graças ao Pedro Simas Neto, me deu umas dicas que me ajudaram muito a entender esta parte. Mostrou que daria para baixar os próprios packages pelo spark-submit, através do maven. Você passa os packages por parâmetro e pronto, mas tem um certo porém, ainda não fui muito a fundo, o tempo de execução do seu job pode crescer consideravelmente, pois ele irá baixar estas dependências sempre que rodar, se tiver alguma outra forma de fazer, comente ai.
Delta Lake
Delta Lake nos permite versionar nossos dados, gerenciar o lake de forma que podemos deletar e atualizar, como se fosse um data warehouse, tanto que é o Delta que abriu o mundo do lake house. Utilizei o Delta apenas para entender como funciona sua instalação, o que me surpreendeu por ser simples, basicamente instalamos o jar do Delta, adicionamos as linhas seguintes linhas na configuração, e pronto, está rodando.
"spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension" "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog"
Os dados são salvos em formato delta, mas podeira salvar somente em parquet que para o propósito do projeto, daria no mesmo, resumindo, instalei mas não utilizei para o seu real uso (quem sabe mais para frente monto em lake house).
PostgreSql
Os dados já preparados, saindo da camada gold, então são persistidos no banco de dados PostgreSql, que seria o data warehouse, persisto em modo overwrite as tabelas de summoners e matchs, então toda vez que o ETL roda as tabelas são re-criadas, do zero, mas não tem problema, os dados históricos, que são os dados do summoner vem junto com a camada gold. Já os dados de matchs, não precisam de histórico, pois quando a partida acaba os dados não mudam mais, não vai ter mais kills ou mais jogadores, então é persistido somente uma versão. O summoner tem dados de agregação como quantidade de partidas, o nome, se é veterano ou não, quantidade de pontos, esses dados mudam ao longo do tempo, e isso eu gostaria de saber, por isto versionados estes dados.
Metabase
Metabase a plataforma para a analise e visualização dos dados, é open source, e muiito completa. O Metabase se conecta a fonte, que é o PostgreSql, e atraves dos componentes de visualização podemos criar gráficos apartir de queries:
Acabou?
O projeto é consideravelmente grande, eu comecei a desenvolver o data_colector nos tempos vagos, mas foi nas ferias do trabalho que eu parei para construir toda a arquitetura do pipeline, então me dei este tempo para fazer o possível para ter a POC pronta, chegando a este resultado. Pela limitação das requests da API, busquei somente as últimas 10 partidas por jogador, ao longo dos dias foram adicionando novas partidas mas não no volume que eu esperava, o que seria 100 partidas mais ou menos por jogador. Vou procurar solicitar o token de produção pois a quantidade de requests aumenta consideravelmente, mas como é um projeto de estudos e não aberto para outros jogadores, pode ser que não se encaixe nas políticas da Riot, mas o jeito é tentar.
Mas da para melhorar?
Claro, eu tenho algumas ideias do que irei ir implementando para deixar o projeto mais "redondinho":
- Criar um sistema de checkpoint, hoje se o script que busca os dados das partidas parar, ele vai começar do zero, então se estivermos em 1400 de 2000 partidas, e por algum motivo o job parar, já era, irá começar desde o inicio. A ideia seria salvar a lista de matchs em um Redis, a cada match recuperada é removida da lista e persistida novamente, o job continua até zerar a lista OU se começar o ETL desde o início.
- Teste unitárias no data_colector, a implementação de teste ajudaria tanto na cobertura de falhas como ajudaria na melhoria do código.
- Decidir se fica ou sai o Delta Lake, poderiasmos utilizar o lake como um lake house e o banco só para conectar na ferramenta de data viz OU remover o Delta Lake e modelar o PosgreSql com star schema ou snow flake.
- Expandir o data_colector e com isto os jobs para trazerem dados de vários servidores, assim conseguimos comparar cada server por região, seria uma feature MUITO interessante, talvez até comece por esta.
- Talvez, separar o data_colector em outro repositório e importar para o projeto através do pip.
AWS
Tenho um plano para subir boa parte da arquitetura na AWS, talvez o airflow continue local não sei bem, vamos ver os custos, mas segue a imagem da arquitetura utilizando a AWS:
Tabnews
Migrei o post para o tabnews, adorei a iniciativa, quero postar aqui e só por o link nas outras plataformas no futuro :D