image

Access unlimited bootcamps and 650+ courses

50
%OFF
Article image
Luiz Júnior
Luiz Júnior29/08/2024 08:01
Share

Projeto Pipeline de Dados usando Telegram e AWS

  • #AWS
  • #Python
  • #AWS Lambda

1. Contexto

1.1 Afinal que é um chatbot?

Um chatbot é uma forma de software que interage com usuários por meio de conversas automatizadas em plataformas de mensagens. Um exemplo comum de aplicação para chatbots é o atendimento ao cliente, no qual, em muitos casos, auxiliam os clientes na resolução de problemas ou no esclarecimento de dúvidas frequentes antes mesmo da intervenção de um atendente humano.

1.2. O Telegram

O Telegram é uma plataforma de mensagens instantâneas freeware (distribuída gratuitamente) e, em grande parte, de código aberto. É amplamente utilizado por desenvolvedores devido à sua posição pioneira na implementação da funcionalidade de criação de chatbots, os quais possibilitam a automação de uma variedade de tarefas.

image

1.3. A Arquitetura

Uma tarefa analítica de interesse envolve a condução de uma análise exploratória dos dados recebidos por um chatbot para responder a perguntas como:

  1. Qual é o horário em que os usuários mais interagem com o bot?
  2. Qual é o problema ou dúvida mais comum?
  3. O bot está efetivamente resolvendo problemas ou esclarecendo dúvidas?

Dessa forma, propomos a criação de um pipeline de dados que realize a ingestão, processamento, armazenamento e exposição de mensagens de um grupo no Telegram, permitindo que profissionais de dados conduzam análises. A arquitetura sugerida é dividida em duas partes: uma transacional no Telegram, onde os dados são gerados, e outra analítica na Amazon Web Services (AWS), onde os dados são analisados.

image

  • Telegram
O Telegram atua como a fonte de dados transacionais. As mensagens enviadas por usuários em um grupo são capturadas por um bot e redirecionadas através do webhook do backend do aplicativo para um endpoint (um endereço web que aceita requisições HTTP) exposto pelo AWS API Gateway. As mensagens são transportadas no corpo ou payload da requisição.
  • AWS | Ingestão
Uma solicitação HTTP, contendo o conteúdo da mensagem em seu payload, é recebida pelo AWS API Gateway, que, por sua vez, encaminha essas informações para o AWS Lambda, atuando como seu gatilho. O AWS Lambda, por sua vez, recebe o payload da requisição no parâmetro "event", registra o conteúdo em um arquivo no formato JSON (mantendo o formato original do payload) e o armazena no AWS S3, organizado por partições diárias.
  • AWS | ETL
Diariamente, o AWS Event Bridge aciona o AWS Lambda, o qual processa todas as mensagens do dia anterior (com um atraso de um dia, ou D-1). Durante esse processo, ocorre a denormalização dos dados semiestruturados, comumente encontrados em arquivos no formato JSON. O conteúdo processado é então registrado em um arquivo com formato Apache Parquet e armazenado no AWS S3, organizado em partições diárias.
  • AWS | Apresentação
Finalmente, uma tabela no AWS Athena é vinculada ao bucket no AWS S3 que contém os dados processados: denormalizados, particionados e orientados por coluna. Isso possibilita que profissionais de dados executem consultas analíticas (agregações, ordenações, etc.) na tabela, utilizando SQL para extrair insights valiosos.

2. Ingestão de Dados

2.1 O que é uma API?

Para o projeto as mensagens são recolhidas por um bot e podem ser recuperadas por meio de uma API.

Uma API (Interface de Programação de Aplicações) é um conjunto de regras e definições que permite que softwares diferentes se comuniquem entre si. Essa interface possibilita que uma aplicação acesse funcionalidades ou dados de outra, agindo como uma ponte para a troca de informações. As APIs são fundamentais para a integração de sistemas e serviços, permitindo que desenvolvedores conectem diferentes componentes de software de maneira eficiente e interoperável.

A única informação necessária é o token de acesso concedido pelo BotFather durante a criação do bot.

Referência:

Oracle. (2022). What is an API? Understanding the Basics. https://www.oracle.com/cloud/what-is/api/
IMAGENS

A fase de ingestão tem a responsabilidade, como o próprio nome sugere, de incorporar os dados transacionais em ambientes analíticos. Em termos gerais, os dados ingeridos são mantidos no formato mais próximo possível do original, ou seja, sem qualquer alteração em seu conteúdo ou estrutura (esquema). A título de exemplo, dados provenientes de uma API web que segue o formato REST (representational state transfer) são recebidos e, consequentemente, armazenados no formato JSON.

Preservar os dados em seu formato original oferece diversas vantagens, especialmente a capacidade de reprocessamento. Essa persistência pode ser realizada de duas maneiras:

  • Batch: Blocos de dados são incorporados em uma frequência bem definida, geralmente em intervalos de horas ou dias.
  • Streaming: Dados são incorporados conforme são produzidos e disponibilizados.

No contexto deste projeto, as mensagens capturadas pelo bot do Telegram podem ser incorporadas através da API web de bots do Telegram, fornecendo os dados no formato JSON. Como o Telegram retém mensagens por apenas 24 horas em seus servidores, a ingestão via streaming é mais apropriada. Para possibilitar esse tipo de ingestão, será utilizado um webhook para redirecionar automaticamente as mensagens para outra API web.

Portanto, é necessário um serviço da AWS que forneça uma API web para receber os dados redirecionados, sendo o AWS API Gateway escolhido para essa função. Dentre suas diversas funcionalidades, o AWS API Gateway permite o encaminhamento dos dados recebidos para outros serviços da AWS. Assim, vamos conectá-lo ao AWS Lambda, que por sua vez, armazenará os dados em seu formato original (JSON) em um bucket do AWS S3.

Sistemas que respondem a eventos são comumente denominados como event-driven.

2.3 Passos Necessários

  1. stabelecer um "bucket" no AWS S3;
  2. Desenvolver uma função no AWS Lambda;
  3. Configurar uma API web no AWS API Gateway;
  4. Ajustar o "webhook" da API de bots do Telegram.

2.4 S3 na AWS

Na fase de ingestão, o AWS S3 desempenha o papel de armazenar de forma passiva as mensagens capturadas pelo bot do Telegram em seu formato original: JSON. Para isso, é suficiente criar um "bucket". Por padrão, adicionaremos o sufixo "-raw" ao seu nome (seguindo essa convenção para todos os serviços desta camada).

Uma pequena observação: um data lake é a denominação dada a um repositório de grande volume de dados. Ele é estruturado em zonas que armazenam réplicas dos dados em diversos níveis de processamento. Embora a nomenclatura das zonas possa variar, as mais comuns incluem: raw e enriched, ou bronze, silver e gold.

2.5 AWS Lambda

Na fase de ingestão, o AWS Lambda desempenha um papel ativo na persistência das mensagens capturadas pelo bot do Telegram em um bucket do AWS S3. Para isso, criaremos uma função que opera da seguinte maneira:

  1. Recebe a mensagem no parâmetro "event";
  2. Verifica se a mensagem tem origem no grupo correto do Telegram;
  3. Persiste a mensagem no formato JSON no bucket do AWS S3;
  4. Retorna uma mensagem de sucesso (código de retorno HTTP igual a 200) para a API de bots do Telegram.

Código usado no AWS Lambda

import os
import json
import logging
from datetime import datetime, timezone


import boto3



def lambda_handler(event: dict, context: dict) -> dict:


'''
Recebe uma mensagens do Telegram via AWS API Gateway, verifica no
seu conteúdo se foi produzida em um determinado grupo e a escreve,
em seu formato original JSON, em um bucket do AWS S3.
'''


# vars de ambiente


BUCKET = os.environ['AWS_S3_BUCKET']
TELEGRAM_CHAT_ID = int(os.environ['TELEGRAM_CHAT_ID'])


# vars lógicas


tzinfo = timezone(offset=timedelta(hours=-3))
date = datetime.now(tzinfo).strftime('%Y-%m-%d')
timestamp = datetime.now(tzinfo).strftime('%Y%m%d%H%M%S%f')


filename = f'{timestamp}.json'


# código principal


client = boto3.client('s3')


try:


  message = json.loads(event["body"])
  chat_id = message["message"]["chat"]["id"]


  if chat_id == TELEGRAM_CHAT_ID:


    with open(f"/tmp/{filename}", mode='w', encoding='utf8') as fp:
      json.dump(message, fp)


    client.upload_file(f'/tmp/{filename}', BUCKET, f'telegram/context_date={date}/{filename}')


except Exception as exc:
    logging.error(msg=exc)
    return dict(statusCode="500")


else:
    return dict(statusCode="200")


Para garantir o funcionamento adequado da função, algumas configurações necessitam ser ajustadas.

Variáveis de Ambiente:

  • Observe que o código requer a configuração de duas variáveis de ambiente: AWS_S3_BUCKET com o nome do bucket do AWS S3 e TELEGRAM_CHAT_ID com o ID do chat do grupo do Telegram. Para incluir variáveis de ambiente em uma função do AWS Lambda, basta acessar Configurações -> Variáveis de Ambiente no console da função.

Observação: Variáveis de ambiente representam uma excelente maneira de armazenar informações sensíveis.

Permissão:

  • Por último, é necessário conceder permissão de escrita no bucket do AWS S3 para a função do AWS Lambda no AWS IAM.

2.6 AWS API Gateway

Na etapa de ingestão, estou utilizando o AWS API Gateway para receber as mensagens capturadas pelo meu bot do Telegram, enviadas via webhook, e iniciar uma função no meu AWS Lambda, passando o conteúdo da mensagem no seu parâmetro "event". Para fazer isso, estou seguindo os passos abaixo ao criar uma API e configurá-la como gatilho da minha função no AWS Lambda:

  1. Acessei o serviço e escolhi: Criar API -> REST API;
  2. Inseri um nome, preferencialmente um que termina com o sufixo "-api";
  3. Selecionei: Ações -> Criar Método -> POST;

Na tela de configuração:

  • Selecionei o tipo de integração como Lambda Function; Habilitei o uso de integração de proxy Lambda; Busquei pelo nome da minha função no AWS Lambda. Pude testar a integração com o AWS Lambda por meio da ferramenta de testes do serviço. Ao final, procedi com a implantação da API para obter o endereço web.

2.7 No Telegram

Vou configurar o webhook para redirecionar as mensagens para a URL do AWS API Gateway.

setWebhook:

  • O método setWebhook configura o redirecionamento das mensagens capturadas pelo meu bot para o endereço web especificado no parâmetro "url".

Observação: os métodos getUpdates e setWebhook são mutuamente exclusivos, ou seja, enquanto o webhook estiver ativo, o método getUpdates não funcionará. Para desativar o webhook, basta usar o método deleteWebhook.

Códigos Usados

response = requests.get(url=f'{base_url}/setWebhook?url={aws_api_gateway_url}')


print(json.dumps(json.loads(response.text), indent=2))
  • getWebhookInfo
response = requests.get(url=f'{base_url}/getWebhookInfo')


print(json.dumps(json.loads(response.text), indent=2))

3. ETL - Extraction, Transformation and Load

A fase de extração, transformação e carregamento (ETL) é uma etapa abrangente encarregada da manipulação dos dados provenientes de sistemas transacionais, ou seja, dados já armazenados nas camadas cruas ou raw de sistemas analíticos. Os procedimentos executados nesta etapa variam consideravelmente, dependendo da área da empresa, do volume, variedade e velocidade dos dados consumidos, entre outros fatores. Em linhas gerais, entretanto, os dados crus ingeridos passam por um processo contínuo de "data wrangling", no qual são limpos, deduplicados, etc., antes de serem armazenados utilizando técnicas de particionamento, orientação a coluna e compressão. Ao final deste processo, os dados processados estão prontos para serem analisados por profissionais de dados.

No contexto deste projeto, as mensagens de um único dia, já armazenadas na camada cru, serão consolidadas em um único arquivo, orientado a coluna e comprimido, para então serem persistidas na camada enriquecida. Além disso, durante essa etapa, os dados também passarão por processos de "data wrangling".

Para alcançar esses objetivos, utilizaremos uma função do AWS Lambda como o motor de processamento e um bucket do AWS S3 como a camada enriquecida para armazenar os dados processados. Para garantir a recorrência desse processo, configuraremos uma regra no AWS Event Bridge para acionar diariamente a função.

Na fase de ETL, o AWS S3 desempenha o papel de armazenar passivamente as mensagens processadas de um dia em um único arquivo no formato Parquet. Para isso, é suficiente criar um "bucket". Por padrão, adicionaremos o sufixo "-enriched" ao seu nome (mantendo essa convenção para todos os serviços desta camada).

- *Relembrando: um data lake é a denominação atribuída a um repositório de grande volume de dados. Ele é estruturado em zonas que armazenam réplicas dos dados em diferentes estágios de processamento. A nomenclatura das zonas pode variar, entretanto, as mais comuns incluem: raw e enriched, ou bronze, silver e gold.*

3.3 AWS Lambda

Na fase de ETL, estou utilizando o AWS Lambda para ativamente processar as mensagens capturadas pelo meu bot do Telegram, as quais foram armazenadas na camada cru no bucket do AWS S3, e persisti-las na camada enriquecida, também em um bucket do AWS S3.

Assim, vou criar uma função que opera da seguinte maneira:

1. Lista todos os arquivos JSON de uma única partição da camada crua em um bucket do AWS S3;

1. Para cada arquivo listado:

- Faz o download do arquivo e carrega o conteúdo da mensagem;

- Executa uma função de "data wrangling";

- Cria uma tabela do PyArrow e a concatena com as demais.

- Persiste a tabela no formato Parquet na camada enriquecida em um bucket do AWS S3.

*Observação: O uso de duas camadas distintas de armazenamento e processamento possibilita o reprocessamento dos dados crus de diversas maneiras, sempre que necessário.*

O código usado foi o seguinte:

import os
import json
import logging
from datetime import datetime, timedelta, timezone


import boto3
import pyarrow as pa
import pyarrow.parquet as pq



def lambda_handler(event: dict, context: dict) -> bool:


'''
Diariamente é executado para compactar as diversas mensagensm, no formato
JSON, do dia anterior, armazenadas no bucket de dados cru, em um único
arquivo no formato PARQUET, armazenando-o no bucket de dados enriquecidos
'''


# vars de ambiente


RAW_BUCKET = os.environ['AWS_S3_BUCKET']
ENRICHED_BUCKET = os.environ['AWS_S3_ENRICHED']


# vars lógicas


tzinfo = timezone(offset=timedelta(hours=-3))
date = (datetime.now(tzinfo) - timedelta(days=1)).strftime('%Y-%m-%d')
timestamp = datetime.now(tzinfo).strftime('%Y%m%d%H%M%S%f')


# código principal


table = None
client = boto3.client('s3')


try:


    response = client.list_objects_v2(Bucket=RAW_BUCKET, Prefix=f'telegram/context_date={date}')


    for content in response['Contents']:


      key = content['Key']
      client.download_file(RAW_BUCKET, key, f"/tmp/{key.split('/')[-1]}")


      with open(f"/tmp/{key.split('/')[-1]}", mode='r', encoding='utf8') as fp:


        data = json.load(fp)
        data = data["message"]


      parsed_data = parse_data(data=data)
      iter_table = pa.Table.from_pydict(mapping=parsed_data)


      if table:


        table = pa.concat_tables([table, iter_table])


      else:


        table = iter_table
        iter_table = None


    pq.write_table(table=table, where=f'/tmp/{timestamp}.parquet')
    client.upload_file(f"/tmp/{timestamp}.parquet", ENRICHED_BUCKET, f"telegram/context_date={date}/{timestamp}.parquet")


    return True


except Exception as exc:
    logging.error(msg=exc)
    return False

E para fazer o wrangling usei o seguinte código.

def parse_data(data: dict) -> dict:


date = datetime.now().strftime('%Y-%m-%d')
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')


parsed_data = dict()


for key, value in data.items():


    if key == 'from':
        for k, v in data[key].items():
            if k in ['id', 'is_bot', 'first_name']:
              parsed_data[f"{key if key == 'chat' else 'user'}_{k}"] = [v]


    elif key == 'chat':
        for k, v in data[key].items():
            if k in ['id', 'type']:
              parsed_data[f"{key if key == 'chat' else 'user'}_{k}"] = [v]


    elif key in ['message_id', 'date', 'text']:
        parsed_data[key] = [value]


if not 'text' in parsed_data.keys():
  parsed_data['text'] = [None]


return parsed_data

Configurações de Variáveis de Ambiente

É importante observar que o código requer a configuração de duas variáveis de ambiente: `AWS_S3_BUCKET` e `AWS_S3_ENRICHED`, que correspondem aos nomes dos buckets do AWS S3 para as camadas cru e enriquecida, respectivamente. Para adicionar variáveis de ambiente a uma função do AWS Lambda, basta acessar Configurações -> Variáveis de Ambiente no console da função.

Permissões

É necessário conceder permissão de escrita nos buckets do AWS S3 para a função do AWS Lambda no AWS IAM.

Recursos

O timeout padrão para funções do AWS Lambda é de 3 segundos. No caso desta função, iremos aumentar o tempo para 5 minutos, especialmente para lidar com operações de entrada/saída (input/output) de arquivos no AWS S3.

3.4 AWS Event Bridge

Na etapa de ETL, o AWS Event Bridge desempenha um papel crucial ao ativar diariamente a função de ETL no AWS Lambda. Essa dinâmica operacional é essencial, uma vez que o Event Bridge atua como um confiável programador de tarefas, garantindo a execução regular e automatizada do processo de extração, transformação e carregamento (ETL). Dessa forma, essa integração entre o Event Bridge e o Lambda estabelece uma eficiente e programada operação, assegurando a consistência e a recorrência na manipulação dos dados ao longo do tempo.

4. Apresentação dos Dados

4.1. AWS Athena

Na fase de apresentação, o AWS Athena desempenha um papel fundamental ao fornecer dados por meio de uma interface SQL aos usuários do sistema analítico. Para criar essa interface, é simplesmente necessário estabelecer uma tabela externa com base nos dados armazenados na camada mais refinada da arquitetura, a camada enriquecida. O AWS Athena permite que os usuários realizem consultas SQL diretamente sobre os dados armazenados no formato Parquet, proporcionando uma experiência ágil e intuitiva para a análise de informações. Essa integração facilita a extração de insights valiosos por parte dos profissionais de dados, contribuindo para uma tomada de decisões informada e eficaz.

Criação da Tabela

CREATE EXTERNAL TABLE `telegram`(
`message_id` bigint,
`user_id` bigint,
`user_is_bot` boolean,
`user_first_name` string,
`chat_id` bigint,
`chat_type` string,
`text` string,
`date` bigint)
PARTITIONED BY (
`context_date` date)
ROW FORMAT SERDE
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
's3://<bucket-m42-enriquecido>/'

Vamos as consultas

SELECT * FROM `telegram` LIMIT 10;

A página do serviço Athena apresenta o resultado da consulta da seguinte forma:

image

A página do serviço Athena apresenta o resultado da consulta da seguinte forma:

Com os dados disponíveis, os usuários têm a capacidade de realizar diversas consultas analíticas. Abaixo estão alguns exemplos:

  1. Contagem de mensagens diárias.
SELECT
context_date,
count(1) AS "message_amount"
FROM "telegram"
GROUP BY context_date
ORDER BY context_date DESC

image

  • Quantidade de mensagens por usuário por dia.
SELECT
user_id,
user_first_name,
context_date,
count(1) AS "message_amount"
FROM "telegram"
GROUP BY
user_id,
user_first_name,
context_date
ORDER BY context_date DESC

image

  • Média do tamanho das mensagens por usuário por dia.
SELECT
user_id,
user_first_name,
context_date,
CAST(AVG(length(text)) AS INT) AS "average_message_length"
FROM "telegram"
GROUP BY
user_id,
user_first_name,
context_date
ORDER BY context_date DESC

image

  • Quantidade de mensagens por hora por dia da semana por número da semana.
WITH
parsed_date_cte AS (
  SELECT
      *,
      CAST(date_format(from_unixtime("date"),'%Y-%m-%d %H:%i:%s') AS timestamp) AS parsed_date
  FROM "telegram"
),
hour_week_cte AS (
  SELECT
      *,
      EXTRACT(hour FROM parsed_date) AS parsed_date_hour,
      EXTRACT(dow FROM parsed_date) AS parsed_date_weekday,
      EXTRACT(week FROM parsed_date) AS parsed_date_weeknum
  FROM parsed_date_cte
)
SELECT
  parsed_date_hour,
  parsed_date_weekday,
  parsed_date_weeknum,
  count(1) AS "message_amount"
FROM hour_week_cte
GROUP BY
  parsed_date_hour,
  parsed_date_weekday,
  parsed_date_weeknum
ORDER BY
  parsed_date_weeknum,
  parsed_date_weekday

image

5. Conclusão

A integração de chatbots em aplicativos de mensagens, como o Telegram, aliada à poderosa infraestrutura em nuvem proporcionada pela AWS, incluindo serviços notáveis como o S3, AWS Lambda e AWS Gateway, representa um marco significativo na melhoria da experiência do usuário e na otimização operacional. Essa convergência de tecnologias abre novas perspectivas para a análise de dados, promovendo avanços substanciais nas estratégias de negócios.

Ao integrar chatbots em plataformas de mensagens populares, como o Telegram, empresas podem proporcionar interações instantâneas e personalizadas, melhorando a comunicação com os usuários. A AWS, por sua vez, desempenha um papel crucial nesse cenário ao fornecer uma infraestrutura robusta e escalável. O Amazon S3 oferece armazenamento seguro e confiável para dados, enquanto o AWS Lambda permite a execução de código sem a necessidade de gerenciar servidores, proporcionando eficiência operacional e economia de recursos.

O AWS Gateway, como ponto de entrada para APIs, simplifica e agiliza a comunicação entre diferentes serviços, garantindo uma integração suave entre o chatbot e outros componentes do ecossistema. Essa coesão tecnológica não apenas aprimora a eficiência operacional, mas também oferece um terreno fértil para a análise de dados em larga escala.

A análise de dados torna-se mais acessível e eficiente, com a capacidade de extrair insights valiosos das interações dos usuários com o chatbot. Informações cruciais são processadas e armazenadas de forma segura no S3, enquanto o AWS Lambda permite a execução de análises em tempo real, possibilitando uma tomada de decisões mais informada e ágil.

Em conclusão, a integração de chatbots em aplicativos de mensagens, aliada à infraestrutura em nuvem oferecida pela AWS, cria uma sinergia poderosa. Essa abordagem não apenas eleva a experiência do usuário, mas também impulsiona a eficiência operacional e fornece uma base sólida para análises de dados avançadas, capacitando as empresas a prosperarem na era da transformação digital.

Share
Comments (2)
Luiz Júnior
Luiz Júnior - 30/08/2024 06:33

Valeu Johnnatan!!! o legal da área tech é que sempre estamos aprendendo algo zero o tempo todo, abs.

JM

Johnnatan Moreno - 29/08/2024 10:45

Muito bom suas explicações, estou iniciando do Zero a area de tech e estou iniciando buscando entender com o funcionam esses boots, sites, etc...