Formação Full Stack Data & Analytics
Aula A008 - Feature Engineering
SPARK-SQL
Formação Full Stack Data & Analytics
Aula A008 - Feature Engineering
SPARK-SQL
Roberto SSoares - LfLngLrnng
in/roberto-dos-santos-soares
Portifólio: roberto-ssoares
" [+] Faturamento,
[-] Custo,
[+] Qualidade de vida "
"Bruno Jardim"
Agradeço a "PoD Academy" e "DSA Academy" pelo conhecimento transmitido.
e a minha linda esposa "Elizabete" por todo apoio.
CRISP-DM (Data Understanding)
Feature Engineering com SPARK-SQL
Instalando e Carregando Pacotes
Instalando e Carregando Pacotes
Instala o pacote watermark
#!pip install -q -U watermark
# Versões dos pacotes usados neste jupyter notebook
%reload_ext watermark
%watermark -a "Roberto Soares - LfLngLrnng"
Configuração do ambiente para utilização do Spark
Instalando:
# install java
#!apt-get install openjdk-8-jdk-headless -qq > /dev/null
#download the spark
#!wget https://dlcdn.apache.org/spark/spark-3.3.0/spark-3.3.0-bin-hadoop3.tgz
# unzip the spark files
#!tar xf spark-3.3.0-bin-hadoop3.tgz
,
# set the environment variables
#import os
#os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
#os.environ["SPARK_HOME"] = "/content/spark-3.3.0-bin-hadoop3"
Instala o pacote findspark:
#!pip install -q findspark
Ambiente Spark Local - Construindo SparkSession:
#import findspark
#findspark.init()
#from pyspark.sql import SparkSession
#spark = SparkSession.builder.master("local[*]").getOrCreate()
import os
os.environ['PYSPARK_PYTHON'] = r'D:\anaconda3\envs\spark-pod-env\python.exe'
import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
conf = pyspark.SparkConf() \
.setAppName('A008-FEATURE-ENGINEERING') \
.setMaster('local')
sc = pyspark.SparkContext(conf=conf)
spark = SparkSession(sc)
spark
#import sys
#print(sys.executable)
#print(sys.version)
#print(sys.version_info)
caminho = "D:/_jupyter/pod/ds/mod06-ciencia-de-dados/data/"
base_transacoes = "base_transacoes.csv"
base_churn = "base_churn.csv"
df_transacoes = spark.read.csv(caminho+base_transacoes,
header=True,
inferSchema=True
)
shape = (df_transacoes.count(), len(df_transacoes.columns))
print(shape)
df_publico = spark.read.csv(caminho+base_churn,
header=True,
inferSchema=True
)
shape = (df_publico.count(), len(df_publico.columns))
print(shape)
Mostrar o Schema da tabela - df_transacoes:
print(df_transacoes.printSchema())
print()
print(df_transacoes.show(10,False))
Observações:
Função para padronizar no nome das variáveis:
O código apresentado tem o objetivo de padronizar os nomes das colunas de um DataFrame do PySpark.
A padronização inclui:
import unicodedata
from pyspark.sql import DataFrame
def remover_acentuacao_e_formatar(nome_coluna: str) -> str:
"""
Remove acentuação, transforma cedilhas e converte espaços para underscores.
Transforma a string para caixa alta.
"""
# Remove acentuação e caracteres não-ASCII
nome_sem_acentuacao = unicodedata.normalize('NFKD', nome_coluna).encode('ASCII', 'ignore').decode('ASCII')
# Substitui espaços por underscores e converte para maiúsculas
nome_padronizado = nome_sem_acentuacao.replace(" ", "_").upper()
return nome_padronizado
def padronizar_nomes_colunas(dataframe: DataFrame) -> DataFrame:
"""
Padroniza os nomes das colunas de um DataFrame PySpark removendo acentuações,
substituindo espaços por underscores e convertendo para caixa alta.
"""
# Renomeia as colunas do DataFrame utilizando a função de padronização
novas_colunas = [remover_acentuacao_e_formatar(coluna) for coluna in dataframe.columns]
dataframe_padronizado = dataframe.toDF(*novas_colunas)
return dataframe_padronizado
Aplicando a padronização do nome das variáveis em df_transacoes:
# Exemplo de uso:
df_transacoes = padronizar_nomes_colunas(df_transacoes)
df_transacoes.printSchema()
Verificando se há valores nulos em df_transacoes:
Esse código em PySpark é usado para contar o número de valores nulos em cada coluna de um DataFrame.
Vamos analisar o que cada parte do código faz:
Resumo:
from pyspark.sql.functions import *
df_transacoes.select([count(when(isnull(c), c)).alias(c) for c in df_transacoes.columns]).show()
Observações:
Criando uma tabela temporária - sql_trancacoes:
df_transacoes.createOrReplaceTempView("sql_transacoes")
Mostrar o Schema da tabela - df_publico:
print(df_publico.printSchema())
print()
print(df_publico.show(10))
Aplicando a padronização do nome das variáveis em df_publico:
# Exemplo de uso:
df_publico = padronizar_nomes_colunas(df_publico)
df_publico.printSchema()
Verificando se há valores nulos em df_publico:
df_publico.select([count(when(isnull(c), c)).alias(c) for c in df_publico.columns]).show()
Observações:
Criando uma tabela temporária - sql_publico:
df_publico.createOrReplaceTempView("sql_publico")
Função de Metadados:
Você pode gerar um DataFrame de metadados utilizando Spark SQL. Abaixo está a função equivalente que gera o mesmo resultado usando SQL em PySpark:
Explicação:
Observação:
from pyspark.sql.functions import expr
def gerar_metadados_sql(table_name: str):
# Obtendo as colunas e seus tipos
dataframe = spark.table(table_name)
colunas = dataframe.columns
tipos = [str(dataframe.schema[col].dataType) for col in colunas]
# Lista para armazenar o resultado de cada coluna
metadados_list = []
# Iterando sobre cada coluna para gerar os metadados
for col, dtype in zip(colunas, tipos):
sql_query = f"""
SELECT
'{table_name}' AS nome_tabela,
'{col}' AS nome_variavel,
'{dtype}' AS tipo,
COUNT(*) AS total_linhas,
COUNT(CASE WHEN `{col}` IS NULL THEN 1 END) AS qt_nulos,
COUNT(DISTINCT `{col}`) AS cardinalidade,
(COUNT(CASE WHEN `{col}` IS NULL THEN 1 END) / COUNT(*)) * 100 AS percent_nulos
FROM
{table_name}
"""
# Executando a consulta SQL para cada coluna e coletando o resultado
metadados_df = spark.sql(sql_query)
metadados_list.append(metadados_df)
# Unindo os resultados de todas as colunas
final_metadados_df = metadados_list[0]
for df in metadados_list[1:]:
final_metadados_df = final_metadados_df.union(df)
return final_metadados_df
# Exemplo de uso:
metadados_transacoes = gerar_metadados_sql("sql_transacoes")
metadados_transacoes.show()
# Exemplo de uso:
metadados_publico = gerar_metadados_sql("sql_publico")
metadados_publico.show(truncate=False)
Spark-SQL - Querys aleatórias
# Query
spark.sql("""
SELECT
ID_CLIENTE,
ROUND(AVG(VALOR),2) AS VL_MED_ROUPAS,
COUNT(*) AS QT_ITENS
FROM sql_transacoes
WHERE CATEGORIA = 'Roupas' AND ID_CLIENTE = 1
GROUP BY ID_CLIENTE
ORDER BY ID_CLIENTE
""").show()
spark.sql("SELECT * FROM sql_transacoes WHERE Categoria ='Esportes' ORDER BY 'ID Cliente'").show(truncate=False)
Quantas transações a pessoa de ID 1 da Categoria Eletrônicos fez?
spark.sql("""
SELECT *
FROM sql_transacoes
WHERE ID_CLIENTE = 1 AND CATEGORIA = 'Eletrônicos'
""").show()
Qual valor médio das transações por categoria ?
spark.sql("""
SELECT
CATEGORIA,
ROUND(AVG(VALOR),2) AS VL_MED_CATEG,
COUNT(*) AS QT_ITENS
FROM sql_transacoes
WHERE ID_CLIENTE = 1
GROUP BY CATEGORIA
""").show()
Valores do Consumo:
df_temp_01 = spark.sql("""
SELECT
ID_CLIENTE,
ROUND(SUM(VALOR),2) AS VL_TOT_CONSUMO,
ROUND(AVG(VALOR),2) AS VL_MED_CONSUMO,
ROUND(MAX(VALOR),2) AS VL_MAX_CONSUMO,
ROUND(MIN(VALOR),2) AS VL_MIN_CONSUMO
FROM sql_transacoes
GROUP BY ID_CLIENTE
ORDER BY ID_CLIENTE
""")
shape = (df_temp_01.count(), len(df_temp_01.columns))
print(shape)
print()
print(df_temp_01.show())
Valores Total e Médio da Categoria Esporte:
df_temp_02 = spark.sql("""
SELECT
ID_CLIENTE,
ROUND(SUM(CASE WHEN CATEGORIA = 'Esportes' THEN VALOR ELSE 0 END),2) AS VL_TOT_CONS_ESPORTES,
ROUND(AVG(CASE WHEN CATEGORIA = 'Esportes' THEN VALOR ELSE NULL END),2) AS VL_MED_CONS_ESPORTES
FROM sql_transacoes
GROUP BY ID_CLIENTE
ORDER BY ID_CLIENTE
""")
shape = (df_temp_02.count(), len(df_temp_02.columns))
print(shape)
print()
print(df_temp_02.show())
CRISP-DM (Data Preparation) - Feature Engineering
Criar uma flag que marca ultimos 3 meses
Racional do código:
df_temp_03 = spark.sql("""
SELECT
*,
DATA,
CASE WHEN DATA BETWEEN DATE_ADD(MAX(DATA) OVER (PARTITION BY ID_CLIENTE), -90) AND MAX(DATA) OVER (PARTITION BY ID_CLIENTE) THEN 1 ELSE 0 END AS FLG_U03M,
CASE WHEN DATA BETWEEN DATE_ADD(MAX(DATA) OVER (PARTITION BY ID_CLIENTE), -180) AND MAX(DATA) OVER (PARTITION BY ID_CLIENTE) THEN 1 ELSE 0 END AS FLG_U06M,
CASE WHEN DATA BETWEEN DATE_ADD(MAX(DATA) OVER (PARTITION BY ID_CLIENTE), -365) AND MAX(DATA) OVER (PARTITION BY ID_CLIENTE) THEN 1 ELSE 0 END AS FLG_U12M
FROM sql_transacoes
ORDER BY ID_CLIENTE, DATA;
""")
df_temp_03.createOrReplaceTempView("df_temp_03")
shape = (df_temp_03.count(), len(df_temp_03.columns))
print(shape)
print()
print(df_temp_03.show())
Comentário
A construção das variáveis explicativas é agnóstica.
df_temp_04 = spark.sql("""
SELECT
ID_CLIENTE,
ROUND(SUM(CASE WHEN CATEGORIA = 'Esportes' THEN VALOR ELSE 0 END),2) AS VL_TOT_CONS_ESPORTES,
ROUND(AVG(CASE WHEN CATEGORIA = 'Esportes' THEN Valor ELSE NULL END),2) AS VL_MED_CONS_ESPORTES,
ROUND(AVG(CASE WHEN CATEGORIA = 'Esportes' AND FLG_U03M = 1 THEN VALOR ELSE NULL END),2) AS VL_MED_U03M_CONS_ESPORTES,
ROUND(AVG(CASE WHEN CATEGORIA = 'Esportes' AND FLG_U06M = 1 THEN VALOR ELSE NULL END),2) AS VL_MED_U06M_CONS_ESPORTES,
ROUND(AVG(CASE WHEN CATEGORIA = 'Esportes' AND FLG_U12M = 1 THEN VALOR ELSE NULL END),2) AS VL_MED_U12M_CONS_ESPORTES
FROM df_temp_03
GROUP BY ID_CLIENTE
ORDER BY ID_CLIENTE
""")
df_temp_04.show()
spark.sql("""SELECT * FROM sql_transacoes""").show(3)
spark.sql("""SELECT * FROM sql_publico """).show(3)
EAD - Exploratory Data Analysis:
1. Tempo desde a Última Transação:
dfqry01 = spark.sql("""
SELECT
*,
MAX(DATA) OVER () AS DT_MAIS_RECENTE,
DATEDIFF(MAX(DATA) OVER (), DATA) AS TMP_ULT_TRANSACAO
FROM
sql_transacoes
""")
print(dfqry01.sort("TMP_ULT_TRANSACAO","DATA").show(5))
print()
print(dfqry01.sort("TMP_ULT_TRANSACAO","DATA", ascending=False).show(5))
dfqry01.createOrReplaceTempView("tmpview01")
2. Frequência de Compra:
spark.sql("""SELECT ID_CLIENTE, COUNT(*) AS QTD_COMPRAS, MAX(DT_MAIS_RECENTE) AS ULT_TRANSACAO FROM tmpview01 GROUP BY ID_CLIENTE""").show(3)
from pyspark.sql.functions import months_between
# SQL query
dfqry02 = spark.sql("""
WITH CTE AS (
SELECT
ID_CLIENTE,
COUNT(*) AS QTD_COMPRAS,
MAX(DT_MAIS_RECENTE) AS ULT_TRANSACAO
FROM tmpview01
GROUP BY ID_CLIENTE
)
SELECT
A.*,
B.QTD_COMPRAS,
date_sub(B.ULT_TRANSACAO, A.DIAS_DESDE_A_INSCRICAO) AS DT_INSCRICAO,
ROUND(months_between(B.ULT_TRANSACAO, date_sub(B.ULT_TRANSACAO, A.DIAS_DESDE_A_INSCRICAO)),0) AS MESES_DESDE_INSCRICAO,
ROUND(B.QTD_COMPRAS / months_between(B.ULT_TRANSACAO, date_sub(B.ULT_TRANSACAO, A.DIAS_DESDE_A_INSCRICAO)),4) AS FREQUENCIA_COMPRA
FROM sql_publico AS A
LEFT OUTER JOIN CTE AS B
ON A.ID = B.ID_CLIENTE
""")
print(dfqry02.sort("FREQUENCIA_COMPRA").show(10))
print()
print(dfqry02.sort("FREQUENCIA_COMPRA", ascending=False).show(10))
dfqry02.createOrReplaceTempView("tmpview02")
spark.sql("""SELECT * FROM tmpview01 WHERE ID_CLIENTE = 145""").show()
3. Total Gasto:
spark.sql("""SELECT * FROM tmpview01""").show(3)
spark.sql("""SELECT ID_CLIENTE, ROUND(SUM(VALOR),2) AS TOT_GASTO_CLI FROM tmpview01 GROUP BY ID_CLIENTE ORDER BY TOT_GASTO_CLI DESC""").show()
4. Categoria Favorita:
spark.sql("""
SELECT
ID_CLIENTE,
CATEGORIA,
ROUND(SUM(VALOR),2) AS TOT_GASTO_CLI
FROM tmpview01
GROUP BY ID_CLIENTE, CATEGORIA
ORDER BY ID_CLIENTE,TOT_GASTO_CLI DESC
""").show()
spark.sql("""
WITH CTE AS (
SELECT
ID_CLIENTE,
CATEGORIA,
ROUND(SUM(VALOR),2) AS TOT_GASTO_CLI,
RANK(CATEGORIA) OVER (PARTITION BY ID_CLIENTE ORDER BY ROUND(SUM(VALOR),2) DESC) AS RANK_CATEG
FROM tmpview01
GROUP BY ID_CLIENTE, CATEGORIA
)
SELECT * FROM CTE WHERE WHERE RANK_CATEG = 1
""").show()
5. Gasto Médio por Transação:
spark.sql("""
SELECT
ROUND(SUM(VALOR)/COUNT(*),2) AS GASTO_MEDIO
FROM tmpview01
""").show()
6. Duração da Assinatura:
dfqry03 = spark.sql("""
WITH CTE AS (
SELECT DISTINCT
ID_CLIENTE,
DT_MAIS_RECENTE
FROM tmpview01
)
SELECT
A.*,
B.DT_MAIS_RECENTE,
DATEDIFF(B.DT_MAIS_RECENTE, DT_INSCRICAO) AS DURACAO_ASSINATURA
FROM tmpview02 AS A
LEFT OUTER JOIN CTE AS B
ON A.ID = B.ID_CLIENTE
""")
dfqry03.show()
dfqry03.createOrReplaceTempView("tmpview03")
7. Número de Categorias Compradas:
spark.sql("""SELECT ID_CLIENTE, COUNT(DISTINCT CATEGORIA) AS CATEG_DIFERENTES FROM tmpview01 GROUP BY ID_CLIENTE""").show()
8. Usou Suporte antes da Primeira Compra:
spark.sql("""SELECT DISTINCT ID_CLIENTE, MIN(DATA) OVER(PARTITION BY ID_CLIENTE) AS PRIM_COMPRA FROM tmpview01""").show(3)
dfqry04 = spark.sql("""
WITH CTE AS (
SELECT DISTINCT
ID_CLIENTE,
MIN(DATA) OVER(PARTITION BY ID_CLIENTE) AS PRIM_COMPRA
FROM tmpview01
)
SELECT
A.*,
PRIM_COMPRA,
datediff(DT_INSCRICAO,PRIM_COMPRA) AS DIAS_INSCRICAO_PRIM_COMPRA,
CASE WHEN datediff(DT_INSCRICAO,PRIM_COMPRA) > 0 AND USOU_SUPORTE = 1 THEN 1 ELSE 0 END AS SUPORTE_ANTES_PRIM_COMPRA
FROM tmpview02 AS A
LEFT OUTER JOIN CTE AS B
ON A.ID = B.ID_CLIENTE
""")
dfqry04.show()
dfqry04.createOrReplaceTempView("tmpview04")
9. Dias entre Inscrição e Primeira Compra:
spark.sql("""SELECT ID, DIAS_INSCRICAO_PRIM_COMPRA FROM tmpview04""").show()
10. Frequência de Transações por Plano:
spark.sql("""
WITH CTE AS (
SELECT
ID_CLIENTE,
COUNT(*) AS TOT_TRANSACOES
FROM tmpview01
GROUP BY ID_CLIENTE
)
SELECT ID, PLANO, ROUND((TOT_TRANSACOES / MESES_DESDE_INSCRICAO),2) AS FREQ_TRANSACOES_PLANO
FROM tmpview03
LEFT OUTER JOIN CTE
ON ID = ID_CLIENTE
""").show()
dfqry05 = spark.sql("""
SELECT
*,
DATA,
CASE WHEN DATA BETWEEN DATE_ADD(MAX(DATA) OVER (PARTITION BY ID_CLIENTE), -90) AND MAX(DATA) OVER (PARTITION BY ID_CLIENTE) THEN 1 ELSE 0 END AS FLG_U03M,
CASE WHEN DATA BETWEEN DATE_ADD(MAX(DATA) OVER (PARTITION BY ID_CLIENTE), -180) AND MAX(DATA) OVER (PARTITION BY ID_CLIENTE) THEN 1 ELSE 0 END AS FLG_U06M,
CASE WHEN DATA BETWEEN DATE_ADD(MAX(DATA) OVER (PARTITION BY ID_CLIENTE), -365) AND MAX(DATA) OVER (PARTITION BY ID_CLIENTE) THEN 1 ELSE 0 END AS FLG_U12M
FROM tmpview01
ORDER BY ID_CLIENTE, DATA;
""")
dfqry05.show()
dfqry05.createOrReplaceTempView("tmpview05")
11. Média do Valor Gasto em Esportes nos Últimos 3 Meses:
spark.sql("""
SELECT
ID_CLIENTE,
ROUND(SUM(CASE WHEN CATEGORIA = 'Esportes' THEN VALOR ELSE 0 END),2) AS VL_TOT_CONS_ESPORTES,
ROUND(AVG(CASE WHEN CATEGORIA = 'Esportes' THEN Valor ELSE NULL END),2) AS VL_MED_CONS_ESPORTES,
ROUND(AVG(CASE WHEN CATEGORIA = 'Esportes' AND FLG_U03M = 1 THEN VALOR ELSE NULL END),2) AS VL_MED_U03M_CONS_ESPORTES
FROM df_temp_03
WHERE CATEGORIA = 'Esportes'
GROUP BY ID_CLIENTE
ORDER BY ID_CLIENTE
""").show(10)
12. Média do Valor Gasto em Eletrônicos nos Últimos 3 Meses:
spark.sql("""
SELECT
ID_CLIENTE,
ROUND(SUM(CASE WHEN CATEGORIA = 'Eletrônicos' THEN VALOR ELSE 0 END),2) AS VL_TOT_CONS_ELETRONICOS,
ROUND(AVG(CASE WHEN CATEGORIA = 'Eletrônicos' THEN Valor ELSE NULL END),2) AS VL_MED_CONS_ELETRONICOS,
ROUND(AVG(CASE WHEN CATEGORIA = 'Eletrônicos' AND FLG_U03M = 1 THEN VALOR ELSE NULL END),2) AS VL_MED_U03M_CONS_ELETRONICOS
FROM df_temp_03
WHERE CATEGORIA = 'Eletrônicos'
GROUP BY ID_CLIENTE
ORDER BY ID_CLIENTE
""").show(10)
13. Média do Valor Gasto em Roupas nos Últimos 3 Meses:
spark.sql("""
SELECT
ID_CLIENTE,
ROUND(SUM(CASE WHEN CATEGORIA = 'Roupas' THEN VALOR ELSE 0 END),2) AS VL_TOT_CONS_ROUPAS,
ROUND(AVG(CASE WHEN CATEGORIA = 'Roupas' THEN Valor ELSE NULL END),2) AS VL_MED_CONS_ROUPAS,
ROUND(AVG(CASE WHEN CATEGORIA = 'Roupas' AND FLG_U03M = 1 THEN VALOR ELSE NULL END),2) AS VL_MED_U03M_CONS_ROUPAS
FROM df_temp_03
WHERE CATEGORIA = 'Roupas'
GROUP BY ID_CLIENTE
ORDER BY ID_CLIENTE
""").show(10)
14. Média do Valor Gasto em Alimentos nos Últimos 3 Meses:
spark.sql("""
SELECT
ID_CLIENTE,
ROUND(SUM(CASE WHEN CATEGORIA = 'Alimentos' THEN VALOR ELSE 0 END),2) AS VL_TOT_CONS_ALIMENTOS,
ROUND(AVG(CASE WHEN CATEGORIA = 'Alimentos' THEN Valor ELSE NULL END),2) AS VL_MED_CONS_ALIMENTOS,
ROUND(AVG(CASE WHEN CATEGORIA = 'Alimentos' AND FLG_U03M = 1 THEN VALOR ELSE NULL END),2) AS VL_MED_U03M_CONS_ALIMENTOS
FROM df_temp_03
WHERE CATEGORIA = 'Alimentos'
GROUP BY ID_CLIENTE
ORDER BY ID_CLIENTE
""").show(10)
15. Média do Valor Gasto em Livros nos Últimos 3 Meses:
spark.sql("""
SELECT
ID_CLIENTE,
ROUND(SUM(CASE WHEN CATEGORIA = 'Livros' THEN VALOR ELSE 0 END),2) AS VL_TOT_CONS_LIVROS,
ROUND(AVG(CASE WHEN CATEGORIA = 'Livros' THEN Valor ELSE NULL END),2) AS VL_MED_CONS_LIVROS,
ROUND(AVG(CASE WHEN CATEGORIA = 'Livros' AND FLG_U03M = 1 THEN VALOR ELSE NULL END),2) AS VL_MED_U03M_CONS_LIVROS
FROM df_temp_03
WHERE CATEGORIA = 'Livros'
GROUP BY ID_CLIENTE
ORDER BY ID_CLIENTE
""").show(10)
spark.sql("""
SELECT
ID_CLIENTE,
--ROUND(SUM(CASE WHEN CATEGORIA = 'Esportes' THEN VALOR ELSE 0 END),2) AS VL_TOT_CONS_ESPORTES,
--ROUND(AVG(CASE WHEN CATEGORIA = 'Esportes' THEN Valor ELSE NULL END),2) AS VL_MED_CONS_ESPORTES,
ROUND(AVG(CASE WHEN CATEGORIA = 'Esportes' AND FLG_U03M = 1 THEN VALOR ELSE NULL END),2) AS VL_MED_U03M_CONS_ESPORTES,
--ROUND(SUM(CASE WHEN CATEGORIA = 'Eletrônicos' THEN VALOR ELSE 0 END),2) AS VL_TOT_CONS_ELETRONICOS,
--ROUND(AVG(CASE WHEN CATEGORIA = 'Eletrônicos' THEN Valor ELSE NULL END),2) AS VL_MED_CONS_ELETRONICOS,
ROUND(AVG(CASE WHEN CATEGORIA = 'Eletrônicos' AND FLG_U03M = 1 THEN VALOR ELSE NULL END),2) AS VL_MED_U03M_CONS_ELETRONICOS,
--ROUND(SUM(CASE WHEN CATEGORIA = 'Roupas' THEN VALOR ELSE 0 END),2) AS VL_TOT_CONS_ROUPAS,
--ROUND(AVG(CASE WHEN CATEGORIA = 'Roupas' THEN Valor ELSE NULL END),2) AS VL_MED_CONS_ROUPAS,
ROUND(AVG(CASE WHEN CATEGORIA = 'Roupas' AND FLG_U03M = 1 THEN VALOR ELSE NULL END),2) AS VL_MED_U03M_CONS_ROUPAS,
--ROUND(SUM(CASE WHEN CATEGORIA = 'Alimentos' THEN VALOR ELSE 0 END),2) AS VL_TOT_CONS_ALIMENTOS,
--ROUND(AVG(CASE WHEN CATEGORIA = 'Alimentos' THEN Valor ELSE NULL END),2) AS VL_MED_CONS_ALIMENTOS,
ROUND(AVG(CASE WHEN CATEGORIA = 'Alimentos' AND FLG_U03M = 1 THEN VALOR ELSE NULL END),2) AS VL_MED_U03M_CONS_ALIMENTOS,
--ROUND(SUM(CASE WHEN CATEGORIA = 'Livros' THEN VALOR ELSE 0 END),2) AS VL_TOT_CONS_LIVROS,
--ROUND(AVG(CASE WHEN CATEGORIA = 'Livros' THEN Valor ELSE NULL END),2) AS VL_MED_CONS_LIVROS,
ROUND(AVG(CASE WHEN CATEGORIA = 'Livros' AND FLG_U03M = 1 THEN VALOR ELSE NULL END),2) AS VL_MED_U03M_CONS_LIVROS
FROM df_temp_03
GROUP BY ID_CLIENTE
ORDER BY ID_CLIENTE
""").show(10)
spark.sql("""
SELECT
ID_CLIENTE,
ROUND(AVG(CASE WHEN CATEGORIA = 'Esportes' AND FLG_U03M = 1 THEN VALOR ELSE NULL END),2) AS VL_MED_U03M_CONS_ESPORTES,
ROUND(AVG(CASE WHEN CATEGORIA = 'Eletrônicos' AND FLG_U03M = 1 THEN VALOR ELSE NULL END),2) AS VL_MED_U03M_CONS_ELETRONICOS,
ROUND(AVG(CASE WHEN CATEGORIA = 'Roupas' AND FLG_U03M = 1 THEN VALOR ELSE NULL END),2) AS VL_MED_U03M_CONS_ROUPAS,
ROUND(AVG(CASE WHEN CATEGORIA = 'Alimentos' AND FLG_U03M = 1 THEN VALOR ELSE NULL END),2) AS VL_MED_U03M_CONS_ALIMENTOS,
ROUND(AVG(CASE WHEN CATEGORIA = 'Livros' AND FLG_U03M = 1 THEN VALOR ELSE NULL END),2) AS VL_MED_U03M_CONS_LIVROS,
ROUND(AVG(CASE WHEN CATEGORIA = 'Esportes' AND FLG_U06M = 1 THEN VALOR ELSE NULL END),2) AS VL_MED_U06M_CONS_ESPORTES,
ROUND(AVG(CASE WHEN CATEGORIA = 'Eletrônicos' AND FLG_U06M = 1 THEN VALOR ELSE NULL END),2) AS VL_MED_U06M_CONS_ELETRONICOS,
ROUND(AVG(CASE WHEN CATEGORIA = 'Roupas' AND FLG_U06M = 1 THEN VALOR ELSE NULL END),2) AS VL_MED_U06M_CONS_ROUPAS,
ROUND(AVG(CASE WHEN CATEGORIA = 'Alimentos' AND FLG_U06M = 1 THEN VALOR ELSE NULL END),2) AS VL_MED_U06M_CONS_ALIMENTOS,
ROUND(AVG(CASE WHEN CATEGORIA = 'Livros' AND FLG_U06M = 1 THEN VALOR ELSE NULL END),2) AS VL_MED_U06M_CONS_LIVROS,
ROUND(AVG(CASE WHEN CATEGORIA = 'Esportes' AND FLG_U12M = 1 THEN VALOR ELSE NULL END),2) AS VL_MED_U12M_CONS_ESPORTES,
ROUND(AVG(CASE WHEN CATEGORIA = 'Eletrônicos' AND FLG_U12M = 1 THEN VALOR ELSE NULL END),2) AS VL_MED_U12M_CONS_ELETRONICOS,
ROUND(AVG(CASE WHEN CATEGORIA = 'Roupas' AND FLG_U12M = 1 THEN VALOR ELSE NULL END),2) AS VL_MED_U12M_CONS_ROUPAS,
ROUND(AVG(CASE WHEN CATEGORIA = 'Alimentos' AND FLG_U12M = 1 THEN VALOR ELSE NULL END),2) AS VL_MED_U12M_CONS_ALIMENTOS,
ROUND(AVG(CASE WHEN CATEGORIA = 'Livros' AND FLG_U12M = 1 THEN VALOR ELSE NULL END),2) AS VL_MED_U12M_CONS_LIVROS
FROM df_temp_03
GROUP BY ID_CLIENTE
ORDER BY ID_CLIENTE
""").show(10)
+----------+-------------------------+----------------------------+-----------------------+--------------------------+-----------------------+-------------------------+----------------------------+-----------------------+--------------------------+-----------------------+-------------------------+----------------------------+-----------------------+--------------------------+-----------------------+
|ID_CLIENTE|VL_MED_U03M_CONS_ESPORTES|VL_MED_U03M_CONS_ELETRONICOS|VL_MED_U03M_CONS_ROUPAS|VL_MED_U03M_CONS_ALIMENTOS|VL_MED_U03M_CONS_LIVROS|VL_MED_U06M_CONS_ESPORTES|VL_MED_U06M_CONS_ELETRONICOS|VL_MED_U06M_CONS_ROUPAS|VL_MED_U06M_CONS_ALIMENTOS|VL_MED_U06M_CONS_LIVROS|VL_MED_U12M_CONS_ESPORTES|VL_MED_U12M_CONS_ELETRONICOS|VL_MED_U12M_CONS_ROUPAS|VL_MED_U12M_CONS_ALIMENTOS|VL_MED_U12M_CONS_LIVROS|
+----------+-------------------------+----------------------------+-----------------------+--------------------------+-----------------------+-------------------------+----------------------------+-----------------------+--------------------------+-----------------------+-------------------------+----------------------------+-----------------------+--------------------------+-----------------------+
| 1| 57.29| 118.45| 96.4| NULL| 193.01| 57.29| 118.45| 66.07| 187.82| 193.01| 57.29| 132.24| 99.4| 187.82| 183.69|
| 2| 57.35| NULL| NULL| 47.58| NULL| 57.35| 69.05| NULL| 54.79| NULL| 84.62| 69.05| NULL| 54.79| NULL|
| 3| 164.27| 97.44| NULL| NULL| NULL| 164.27| 97.44| NULL| NULL| NULL| 164.27| 105.79| NULL| NULL| NULL|
| 4| NULL| NULL| NULL| 103.19| NULL| NULL| NULL| 178.83| 103.19| NULL| NULL| 59.87| 183.94| 103.19| NULL|
| 5| NULL| 141.41| 150.11| NULL| NULL| NULL| 141.41| 150.11| NULL| NULL| 148.03| 141.41| 150.11| NULL| NULL|
| 6| 127.34| 8.9| NULL| NULL| NULL| 127.34| 8.9| NULL| NULL| NULL| 127.34| 8.9| NULL| NULL| 199.29|
| 7| 97.0| NULL| 103.96| NULL| NULL| 68.6| NULL| 103.96| 51.54| NULL| 68.6| 124.52| 103.96| 51.54| 92.03|
| 8| NULL| 85.86| NULL| NULL| NULL| NULL| 93.56| NULL| NULL| NULL| 150.17| 93.56| NULL| 56.97| NULL|
| 9| NULL| NULL| NULL| 51.77| 165.23| NULL| NULL| NULL| 51.77| 165.23| NULL| NULL| NULL| 43.64| 165.23|
| 10| NULL| 144.9| NULL| NULL| NULL| NULL| 144.9| NULL| NULL| NULL| NULL| 144.9| NULL| 185.66| NULL|
+----------+-------------------------+----------------------------+-----------------------+--------------------------+-----------------------+-------------------------+----------------------------+-----------------------+--------------------------+-----------------------+-------------------------+----------------------------+-----------------------+--------------------------+-----------------------+
only showing top 10 rows
%reload_ext watermark
%watermark -a "RobertoSSoares-LfLngLrnng"
%watermark -v -m
%watermark --iversions
Fim