image

Acesse bootcamps ilimitados e +650 cursos

50
%OFF
Article image
Hugo Neto
Hugo Neto03/10/2022 19:42
Compartilhe

ETL e análise de dados com Apache Spark (PySpark)

    Já não se faz mais ETL como antigamente. O bom e velho Excel já não atende mais as necessidades das profissionais que trabalham com isso e deu lugar a ferramentas específicas para essa atividade (Talend, Apache Hop, SSIS). Até mesmo ferramentas de self service BI, possuem funcionalidades para realizar esse trabalho. Os Pythonistas de plantão já contavam com a Biblioteca Pandas para realizar esse trabalho, porém ao submeter um volume estrondoso de dados ao um script utilizando a dita cuja, percebe-se que ela não segura o rojão. Foi necessária a criação de uma ferramenta que possibilita-se o processamento distribuído dos dados. Surge então o ecossistema Hadoop, e mais a frente, o Apache Spark.

    A ideia aqui é apresentar de forma prática a utilização do Spark, interagindo com ele através da SDK PySpark. O script lê 5 datasets, faz a conversão deste para dataframe e seguida, converte eles para views de modo a permitir a interação com os dados através de SQL.

    import pyspark
    from pyspark.sql import SparkSession
    spark = SparkSession.builder.master('local[*]').getOrCreate()
    clientes = spark.read.csv("C:\\Users\\gugum\\dados\\clientes.csv", sep=';', inferSchema=True,header=True)
    vendedores = spark.read.csv("C:\\Users\\gugum\\dados\\vendedores.csv", sep=';', inferSchema=True,header=True)
    produtos = spark.read.csv("C:\\Users\\gugum\\dados\\produtos.csv", sep=';', inferSchema=True,header=True)
    vendas = spark.read.csv("C:\\Users\\gugum\\dados\\vendas.csv", sep=';', inferSchema=True,header=True)
    itensvendas = spark.read.csv("C:\\Users\\gugum\\dados\\itensvenda.csv", sep=';', inferSchema=True,header=True)
    clientes.createOrReplaceTempView("clientesView")
    produtos.createOrReplaceTempView("produtosView")
    vendedores.createOrReplaceTempView("vendedoresView")
    vendas.createOrReplaceTempView("vendasView")
    itensvendas.createOrReplaceTempView("itensvendasView")
    df_desnormalizado = spark.sql("""select
    itensvendasView.IdProduto,
    itensvendasView.Quantidade,
    itensvendasView.ValorUnitario,
    itensvendasView.ValorTotal,
    itensvendasView.Desconto,
    vendasView.IDVenda,
    vendasView.Data,
    vendasView.Total,
    produtosView.IDProduto,
    produtosView.Produto,
    produtosView.Preco,
    clientesView.Cliente,
    clientesView.Estado,
    clientesView.Sexo,
    clientesView.Status,
    vendedoresView.IDVendedor,
    vendedoresView.Nome
    from itensvendasView
    inner join vendasView on vendasView.idvenda = itensvendasview.idvenda
    inner join produtosView on produtosView.idproduto = itensvendasview.idproduto
    inner join clientesView on clientesView.idcliente = vendasview.idcliente
    inner join vendedoresView on vendedoresview.idvendedor  =  vendasview.idvendedor
    """)
    #vendas por cliente
    teste2 = spark.sql("""select
    distinct clientesView.cliente,
    round(sum(itensvendasView.valorunitario)) as total_vendas
    from itensvendasView
    inner join vendasView on vendasView.idvenda = itensvendasview.idvenda
    inner join produtosView on produtosView.idproduto = itensvendasview.idproduto
    inner join clientesView on clientesView.idcliente = vendasview.idcliente
    inner join vendedoresView on vendedoresview.idvendedor  =  vendasview.idvendedor
    group by clientesview.cliente
    order by total_vendas desc
    """)
    teste2.show(250)
    df_desnormalizado.to_csv("vendas.csv")
    

    Ao final é feita uma análise dos dados processados e em seguida, é gerado um arquivo CSV com dados desnormalizados.

    Lembrando que esse código foi escrito para fins didáticos. Numa solução feita para ambiente produtivo, as tarefas seriam separadas, formando um Pipeline a ser orquestrado e “startado” por alguma ferramenta do tipo Job Scheduling.

    Até a próxima pessoal!

    Compartilhe
    Comentários (2)
    Laio Silva
    Laio Silva - 04/10/2022 18:31

    Olá, tudo bom?

    Bacana seu artigo.

    Qual sua visão sobre controle de qualidade?

    Por favor, posso contar com seu voto no artigo abaixo?

    DIO| Codifique o seu futuro global agora

    Qual sua opinião? Algo a acrescentar?

    Desde já, te agradeço!

    Victor Marques
    Victor Marques - 03/10/2022 22:49

    valeu por compartilhar