[Python] PySpark to M, SQL or Pandas

Hace tiempo escribí un artículo sobre como escribir en pandas algunos códigos de referencia de SQL o M (power query). Si bien en su momento fue de gran utilidad, lo cierto es que hoy existe otro lenguaje que representa un fuerte pie en el análisis de datos.

Spark se convirtió en el jugar principal para lectura de datos en Lakes. Aunque sea cierto que existe SparkSQL, no quise dejar de traer estas analogías de código entre PySpark, M, SQL y Pandas para quienes estén familiarizados con un lenguaje, puedan ver como realizar una acción con el otro.

Lo primero es ponernos de acuerdo en la lectura del post.

Power Query corre en capas. Cada linea llama a la anterior (que devuelve una tabla) generando esta perspectiva o visión en capas. Por ello cuando leamos en el código #"Paso anterior" hablamos de una tabla.

En Python, asumiremos a "df" como un pandas dataframe (pandas.DataFrame) ya cargado y a "spark_frame" a un frame de pyspark cargado (spark.read)

Conozcamos los ejemplos que serán listados en el siguiente orden: SQL, PySpark, Pandas, Power Query.

En SQL:

SELECT TOP 5 * FROM table

En PySpark

spark_frame.limit(5)

En Pandas:

df.head()

En Power Query:

Table.FirstN(#"Paso Anterior",5)

Contar filas

SELECT COUNT(*) FROM table1

spark_frame.count()

df.shape()

Table.RowCount(#"Paso Anterior")

Seleccionar filas

SELECT column1, column2 FROM table1

spark_frame.select("column1", "column2")

df[["column1", "column2"]]

#"Paso Anterior"[[Columna1],[Columna2]]

O podría ser:

Table.SelectColumns(#"Paso Anterior", {"Columna1", "Columna2"} )

Filtrar filas

SELECT column1, column2 FROM table1 WHERE column1 = 2

spark_frame.filter("column1 = 2")
# OR
spark_frame.filter(spark_frame['column1'] == 2)

df[['column1', 'column2']].loc[df['column1'] == 2]

Table.SelectRows(#"Paso Anterior", each [column1] == 2 )

Varios filtros de filas

SELECT * FROM table1 WHERE column1 > 1 AND column2 < 25

spark_frame.filter((spark_frame['column1'] > 1) & (spark_frame['column2'] < 25))

O con operadores OR y NOT

spark_frame.filter((spark_frame['column1'] > 1) | ~(spark_frame['column2'] < 25))

df.loc[(df['column1'] > 1) & (df['column2'] < 25)]

O con operadores OR y NOT

df.loc[(df['column1'] > 1) | ~(df['column2'] < 25)]

Table.SelectRows(#"Paso Anterior", each [column1] > 1 and column2 < 25 )

O con operadores OR y NOT

Table.SelectRows(#"Paso Anterior", each [column1] > 1 or not ([column1] < 25 ) )

Filtros con operadores complejos

SELECT * FROM table1 WHERE column1 BETWEEN 1 and 5 AND column2 IN (20,30,40,50) AND column3 LIKE '%arcelona%'

from pyspark.sql.functions import col
spark_frame.filter(
(col('column1').between(1, 5)) &
(col('column2').isin(20, 30, 40, 50)) &
(col('column3').like('%arcelona%'))
)
# O
spark_frame.where(
(col('column1').between(1, 5)) &
(col('column2').isin(20, 30, 40, 50)) &
(col('column3').contains('arcelona'))
)

df.loc[(df['colum1'].between(1,5)) & (df['column2'].isin([20,30,40,50])) & (df['column3'].str.contains('arcelona'))]

Table.SelectRows(#"Paso Anterior", each ([column1] > 1 and [column1] < 5) and List.Contains({20,30,40,50}, [column2]) and Text.Contains([column3], "arcelona") )

Join tables

SELECT t1.column1, t2.column1 FROM table1 t1 LEFT JOIN table2 t2 ON t1.column_id = t2.column_id

Sería correcto cambiar el alias de columnas de mismo nombre así:

spark_frame1.join(spark_frame2, spark_frame1["column_id"] == spark_frame2["column_id"], "left").select(spark_frame1["column1"].alias("column1_df1"), spark_frame2["column1"].alias("column1_df2"))

Hay dos funciones que pueden ayudarnos en este proceso merge y join.

df_joined = df1.merge(df2, left_on='lkey', right_on='rkey', how='left')
df_joined = df1.join(df2, on='column_id', how='left')Luego seleccionamos dos columnas
df_joined.loc[['column1_df1', 'column1_df2']]

En Power Query vamos a ir eligiendo una columna de antemano y luego añadiendo la segunda.

#"Origen" = #"Paso Anterior"[[column1_t1]]
#"Paso Join" = Table.NestedJoin(#"Origen", {"column_t1_id"}, table2, {"column_t2_id"}, "Prefijo", JoinKind.LeftOuter)
#"Expansion" = Table.ExpandTableColumn(#"Paso Join", "Prefijo", {"column1_t2"}, {"Prefijo_column1_t2"})

Group By

SELECT column1, count(*) FROM table1 GROUP BY column1

from pyspark.sql.functions import count
spark_frame.groupBy("column1").agg(count("*").alias("count"))

df.groupby('column1')['column1'].count()

Table.Group(#"Paso Anterior", {"column1"}, {{"Alias de count", each Table.RowCount(_), type number}})

Filtrando un agrupado

SELECT store, sum(sales) FROM table1 GROUP BY store HAVING sum(sales) > 1000

from pyspark.sql.functions import sum as spark_sum
spark_frame.groupBy("store").agg(spark_sum("sales").alias("total_sales")).filter("total_sales > 1000")

df_grouped = df.groupby('store')['sales'].sum()
df_grouped.loc[df_grouped > 1000]

#"Grouping" = Table.Group(#"Paso Anterior", {"store"}, {{"Alias de sum", each List.Sum([sales]), type number}})
#"Final" = Table.SelectRows( #"Grouping" , each [Alias de sum] > 1000 )

Ordenar descendente por columna

SELECT * FROM table1 ORDER BY column1 DESC

spark_frame.orderBy("column1", ascending=False)

df.sort_values(by=['column1'], ascending=False)

Table.Sort(#"Paso Anterior",{{"column1", Order.Descending}})

Unir una tabla con otra de la misma característica

SELECT * FROM table1 UNION SELECT * FROM table2

spark_frame1.union(spark_frame2)

En Pandas tenemos dos opciones conocidas, la función append y concat.

df.append(df2)
pd.concat([df1, df2])

Table.Combine({table1, table2})

Transformaciones

Las siguientes transformaciones son directamente entre PySpark, Pandas y Power Query puesto que no son tan comunes en un lenguaje de consulta como SQL. Puede que su resultado no sea idéntico pero si similar para el caso a resolver.

Analizar el contenido de una tabla

spark_frame.summary()

df.describe()

Table.Profile(#"Paso Anterior")

Chequear valores únicos de las columnas

spark_frame.groupBy("column1").count().show()

df.value_counts("columna1")

Table.Profile(#"Paso Anterior")[[Column],[DistinctCount]]

Generar Tabla de prueba con datos cargados a mano

spark_frame = spark.createDataFrame([(1, "Boris Yeltsin"), (2, "Mikhail Gorbachev")], inferSchema=True)

df = pd.DataFrame([[1,2],["Boris Yeltsin", "Mikhail Gorbachev"]], columns=["CustomerID", "Name"])

Table.FromRecords({[CustomerID = 1, Name = "Bob", Phone = "123-4567"]})

Quitar una columna

spark_frame.drop("column1")

df.drop(columns=['column1'])
df.drop(['column1'], axis=1)

Table.RemoveColumns(#"Paso Anterior",{"column1"})

Aplicar transformaciones sobre una columna

spark_frame.withColumn("column1", col("column1") + 1)

df.apply(lambda x : x['column1'] + 1 , axis = 1)

Table.TransformColumns(#"Paso Anterior", {{"column1", each _ + 1, type number}})

Hemos terminado el largo camino de consultas y transformaciones que nos ayudarían a tener un mejor tiempo a puro código con PySpark, SQL, Pandas y Power Query para que conociendo uno sepamos usar el otro.