In [1]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

# Reuse an already-running context when this cell is executed again:
# calling SparkContext(...) twice in the same kernel raises a ValueError
# ("Cannot run multiple SparkContexts at once").
conf = SparkConf().setAppName("Spark SQL Course")
sc = SparkContext.getOrCreate(conf=conf)

# getOrCreate() likewise returns the existing session if there is one.
spark = (
    SparkSession.builder
    .appName("Spark SQL Course")
    .getOrCreate()
)

# `DataFrame`

In [59]:
from pyspark.sql import Row

# Three records built with keyword fields; a Row supports lookup by field name.
row1, row2, row3 = (
    Row(name="John", age=21),
    Row(name="James", age=32),
    Row(name="Jane", age=18),
)
row1["name"]

'John'

In [60]:
df = spark.createDataFrame([row1, row2, row3])

In [63]:
df.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)



In [64]:
df.show()

+-----+---+
| name|age|
+-----+---+
| John| 21|
|James| 32|
| Jane| 18|
+-----+---+



In [65]:
print(df.rdd.toDebugString().decode("utf-8"))

(16) MapPartitionsRDD[376] at javaToPython at NativeMethodAccessorImpl.java:0 []
 |   MapPartitionsRDD[375] at javaToPython at NativeMethodAccessorImpl.java:0 []
 |   SQLExecutionRDD[374] at javaToPython at NativeMethodAccessorImpl.java:0 []
 |   MapPartitionsRDD[373] at javaToPython at NativeMethodAccessorImpl.java:0 []
 |   MapPartitionsRDD[370] at applySchemaToPythonRDD at <unknown>:0 []
 |   MapPartitionsRDD[369] at map at SerDeUtil.scala:137 []
 |   MapPartitionsRDD[368] at mapPartitions at SerDeUtil.scala:184 []
 |   PythonRDD[367] at RDD at PythonRDD.scala:53 []
 |   ParallelCollectionRDD[366] at readRDDFromFile at PythonRDD.scala:262 []


In [66]:
df.rdd.getNumPartitions()

16

## Creating dataframes

In [8]:
# Each Row carries its own field names, so no separate column list is passed.
rows = [
    Row(name="John", age=21, gender="male"),
    Row(name="James", age=25, gender="female"),
    Row(name="Albert", age=46, gender="male")
]
df = spark.createDataFrame(rows)
df.show()

+------+---+------+
|  name|age|gender|
+------+---+------+
|  John| 21|  male|
| James| 25|female|
|Albert| 46|  male|
+------+---+------+



In [9]:
# Plain Python lists carry no field names: the column names are passed
# separately as the second argument of createDataFrame.
column_names = ["name", "age", "gender"]
rows = [
    ["John", 21, "male"],
    ["James", 25, "female"],
    ["Albert", 46, "male"]
]
df = spark.createDataFrame(rows, column_names)
df.show()

+------+---+------+
|  name|age|gender|
+------+---+------+
|  John| 21|  male|
| James| 25|female|
|Albert| 46|  male|
+------+---+------+



In [10]:
df.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- gender: string (nullable = true)



In [11]:
# Same records again, but first distributed as an RDD of tuples through the
# SparkContext `sc`; createDataFrame accepts the RDD plus the column names.
column_names = ["name", "age", "gender"]
rdd = sc.parallelize([
    ("John", 21, "male"),
    ("James", 25, "female"),
    ("Albert", 46, "male")
])
df = spark.createDataFrame(rdd, column_names)
df.show()

+------+---+------+
|  name|age|gender|
+------+---+------+
|  John| 21|  male|
| James| 25|female|
|Albert| 46|  male|
+------+---+------+



## Schema

In [12]:
df.schema

StructType(List(StructField(name,StringType,true),StructField(age,LongType,true),StructField(gender,StringType,true)))

In [13]:
type(df.schema)

pyspark.sql.types.StructType

In [14]:
# Import only the type classes this notebook uses instead of `import *`,
# so it stays clear where each name comes from.
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

# Explicit schema: each StructField is (column name, data type, nullable).
# `age` is declared as an integer here, whereas the schema inferred from
# Python ints in the previous cells was `long`.
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("gender", StringType(), True)
])
rows = [("John", 21, "male")]
df = spark.createDataFrame(rows, schema)
df.printSchema()
df.show()

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- gender: string (nullable = true)

+----+---+------+
|name|age|gender|
+----+---+------+
|John| 21|  male|
+----+---+------+



# Queries

In [15]:
# Two-row sample DataFrame; the query examples below operate on `df`.
column_names = ["name", "age", "gender"]
rows = [
    ["John", 21, "male"],
    ["Jane", 25, "female"]
]
df = spark.createDataFrame(rows, column_names)

# Register the DataFrame as a temporary view so SQL text can refer to it by name
df.createOrReplaceTempView("new_view")

# Run the query: spark.sql returns the result as a new DataFrame
query = "SELECT name, age FROM new_view WHERE gender='male'"
men_df = spark.sql(query)
men_df.show()

+----+---+
|name|age|
+----+---+
|John| 21|
+----+---+



## `SELECT`

In [16]:
df.createOrReplaceTempView("table")
query = "SELECT name, age FROM table"
spark.sql(query).show()

+----+---+
|name|age|
+----+---+
|John| 21|
|Jane| 25|
+----+---+



In [17]:
df.select("name", "age").show()

+----+---+
|name|age|
+----+---+
|John| 21|
|Jane| 25|
+----+---+



## `WHERE`

In [18]:
df.createOrReplaceTempView("table")
query = "SELECT * FROM table WHERE age > 21"
spark.sql(query).show()

+----+---+------+
|name|age|gender|
+----+---+------+
|Jane| 25|female|
+----+---+------+



In [19]:
df.where("age > 21").show()

+----+---+------+
|name|age|gender|
+----+---+------+
|Jane| 25|female|
+----+---+------+



In [20]:
# Alternatively:
df.where(df['age'] > 21).show()

+----+---+------+
|name|age|gender|
+----+---+------+
|Jane| 25|female|
+----+---+------+



In [21]:
df.where(df.age > 21).show()

+----+---+------+
|name|age|gender|
+----+---+------+
|Jane| 25|female|
+----+---+------+



In [22]:
df.select("*").where("age > 21").show()

+----+---+------+
|name|age|gender|
+----+---+------+
|Jane| 25|female|
+----+---+------+



## `LIMIT`

In [23]:
# Expose the DataFrame to SQL, then keep only the first row of the result.
df.createOrReplaceTempView("table")
query = "SELECT * FROM table LIMIT 1"
spark.sql(query).show()

+----+---+------+
|name|age|gender|
+----+---+------+
|John| 21|  male|
+----+---+------+



In [24]:
df.limit(1).show()

+----+---+------+
|name|age|gender|
+----+---+------+
|John| 21|  male|
+----+---+------+



In [25]:
df.select("*").limit(1).show()

+----+---+------+
|name|age|gender|
+----+---+------+
|John| 21|  male|
+----+---+------+



## `ORDER BY`

In [26]:
df.createOrReplaceTempView("table")
query = "SELECT * FROM table ORDER BY name ASC"
spark.sql(query).show()

+----+---+------+
|name|age|gender|
+----+---+------+
|Jane| 25|female|
|John| 21|  male|
+----+---+------+



In [27]:
df.orderBy(df.name.asc()).show()

+----+---+------+
|name|age|gender|
+----+---+------+
|Jane| 25|female|
|John| 21|  male|
+----+---+------+



## `ALIAS`

In [28]:
df.createOrReplaceTempView("table")
query = "SELECT name, age, gender AS sex FROM table"
spark.sql(query).show()

+----+---+------+
|name|age|   sex|
+----+---+------+
|John| 21|  male|
|Jane| 25|female|
+----+---+------+



In [29]:
df.select(df.name, df.age, df.gender.alias('sex')).show()

+----+---+------+
|name|age|   sex|
+----+---+------+
|John| 21|  male|
|Jane| 25|female|
+----+---+------+



## `CAST`

In [30]:
df.createOrReplaceTempView("table")
query = "SELECT name, cast(age AS float) AS age_f FROM table"
spark.sql(query).show()

+----+-----+
|name|age_f|
+----+-----+
|John| 21.0|
|Jane| 25.0|
+----+-----+



In [31]:
df.select(df.name, df.age.cast("float").alias("age_f")).show()

+----+-----+
|name|age_f|
+----+-----+
|John| 21.0|
|Jane| 25.0|
+----+-----+



In [32]:
new_age_col = df.age.cast("float").alias("age_f")
df.select(df.name, new_age_col).show()

+----+-----+
|name|age_f|
+----+-----+
|John| 21.0|
|Jane| 25.0|
+----+-----+



## Adding new columns

In [33]:
df.createOrReplaceTempView("table")
query = "SELECT *, 12*age AS age_months FROM table"
spark.sql(query).show()

+----+---+------+----------+
|name|age|gender|age_months|
+----+---+------+----------+
|John| 21|  male|       252|
|Jane| 25|female|       300|
+----+---+------+----------+



In [34]:
df.withColumn("age_months", df.age * 12).show()

+----+---+------+----------+
|name|age|gender|age_months|
+----+---+------+----------+
|John| 21|  male|       252|
|Jane| 25|female|       300|
+----+---+------+----------+



In [35]:
df.select("*", (df.age * 12).alias("age_months")).show()

+----+---+------+----------+
|name|age|gender|age_months|
+----+---+------+----------+
|John| 21|  male|       252|
|Jane| 25|female|       300|
+----+---+------+----------+



# Column functions

## Numeric functions examples

In [36]:
from pyspark.sql import functions as fn

# Sample prices used to compare the three rounding helpers.
columns = ["brand", "cost"]
df = spark.createDataFrame(
    [
        ("garnier", 3.49),
        ("elseve", 2.71),
    ],
    columns,
)

round_cost = fn.round(df.cost, 1)  # rounded to one decimal place
floor_cost = fn.floor(df.cost)     # rounded down
ceil_cost = fn.ceil(df.cost)       # rounded up

(
    df
    .withColumn("round", round_cost)
    .withColumn("floor", floor_cost)
    .withColumn("ceil", ceil_cost)
    .show()
)

+-------+----+-----+-----+----+
|  brand|cost|round|floor|ceil|
+-------+----+-----+-----+----+
|garnier|3.49|  3.5|    3|   4|
| elseve|2.71|  2.7|    2|   3|
+-------+----+-----+-----+----+



## String functions examples

In [37]:
from pyspark.sql import functions as fn

columns = ["first_name", "last_name"]

df = spark.createDataFrame([
    ("John", "Doe"),
    ("Mary", "Jane")
], columns)

# Spark's substring() uses 1-based positions, so the first character is pos=1.
# (pos=0 happens to return the same character, but hides that convention.)
last_name_initial = fn.substring(df.last_name, 1, 1)
# concat_ws joins the given columns using the separator passed first.
name = fn.concat_ws(" ", df.first_name, last_name_initial)
df.withColumn("name", name).show()

+----------+---------+------+
|first_name|last_name|  name|
+----------+---------+------+
|      John|      Doe|John D|
|      Mary|     Jane|Mary J|
+----------+---------+------+



## Date functions examples

In [38]:
from datetime import date
from pyspark.sql import functions as fn

# Two (start, end) date pairs to demonstrate the date helpers.
df = spark.createDataFrame(
    [
        (date(2015, 1, 1), date(2015, 1, 15)),
        (date(2015, 2, 21), date(2015, 3, 8)),
    ],
    ["start_date", "end_date"],
)

days_between = fn.datediff(df.end_date, df.start_date)  # end minus start, in days
start_month = fn.month(df.start_date)                   # month number of the start date

(
    df
    .withColumn("days_between", days_between)
    .withColumn("start_month", start_month)
    .show()
)

+----------+----------+------------+-----------+
|start_date|  end_date|days_between|start_month|
+----------+----------+------------+-----------+
|2015-01-01|2015-01-15|          14|          1|
|2015-02-21|2015-03-08|          15|          2|
+----------+----------+------------+-----------+



## Conditional transformations

In [39]:
df = spark.createDataFrame(
    [
        ("John", 21, "male"),
        ("Jane", 25, "female"),
        ("Albert", 46, "male"),
        ("Brad", 49, "super-hero"),
    ],
    ["name", "age", "gender"],
)

# Chained when() clauses pick a value per row; otherwise() supplies the
# fallback for rows that match none of them.
supervisor = (
    fn.when(df.gender == "male", "Mr. Smith")
    .when(df.gender == "female", "Miss Jones")
    .otherwise("NA")
)

df.withColumn("supervisor", supervisor).show()

+------+---+----------+----------+
|  name|age|    gender|supervisor|
+------+---+----------+----------+
|  John| 21|      male| Mr. Smith|
|  Jane| 25|    female|Miss Jones|
|Albert| 46|      male| Mr. Smith|
|  Brad| 49|super-hero|        NA|
+------+---+----------+----------+



## User-defined functions

In [40]:
from pyspark.sql import functions as fn
from pyspark.sql.types import StringType

df = spark.createDataFrame([(1, 3), (4, 2)], ["first", "second"])

def my_func(col_1, col_2):
    """Describe, as a sentence, how two values compare.

    Args:
        col_1: First value (one cell of the first column when used as a UDF).
        col_2: Second value, comparable with ``col_1``.

    Returns:
        ``"<larger> is bigger than <smaller>"`` when the values differ,
        ``"<a> is equal to <b>"`` when they are equal, and ``None`` when
        either input is ``None``.
    """
    # A missing value cannot be ordered in Python 3 (None > 1 raises
    # TypeError), so propagate it instead of failing.
    if col_1 is None or col_2 is None:
        return None
    # Equal values used to fall into the "bigger than" branch.
    if col_1 == col_2:
        return "{} is equal to {}".format(col_1, col_2)
    bigger, smaller = (col_1, col_2) if col_1 > col_2 else (col_2, col_1)
    return "{} is bigger than {}".format(bigger, smaller)

my_udf = fn.udf(my_func, StringType())

df.withColumn("udf", my_udf(df['first'], df['second'])).show()

+-----+------+------------------+
|first|second|               udf|
+-----+------+------------------+
|    1|     3|3 is bigger than 1|
|    4|     2|4 is bigger than 2|
+-----+------+------------------+



# Joins

## Using the `DataFrame` API

In [41]:
from datetime import date

# Product catalogue; `prod_id` is stored as a string.
products = spark.createDataFrame([
    ('1', 'mouse', 'microsoft', 39.99),
    ('2', 'keyboard', 'logitech', 59.99),
], ['prod_id', 'prod_cat', 'prod_brand', 'prod_value'])

# Purchases refer to products through a column with the same name, `prod_id`.
purchases = spark.createDataFrame([
    (date(2017, 11, 1), 2, '1'),
    (date(2017, 11, 2), 1, '1'),
    (date(2017, 11, 5), 1, '2'),
], ['date', 'quantity', 'prod_id'])

# No join type is given, so the default ("inner") applies.
# Joining on a column *name* keeps a single `prod_id` column in the result.
purchases.join(products, 'prod_id').show()

+-------+----------+--------+--------+----------+----------+
|prod_id|      date|quantity|prod_cat|prod_brand|prod_value|
+-------+----------+--------+--------+----------+----------+
|      1|2017-11-01|       2|   mouse| microsoft|     39.99|
|      1|2017-11-02|       1|   mouse| microsoft|     39.99|
|      2|2017-11-05|       1|keyboard|  logitech|     59.99|
+-------+----------+--------+--------+----------+----------+



## Using a `SQL` query

In [42]:
products.createOrReplaceTempView("products")
purchases.createOrReplaceTempView("purchases")

query = """
SELECT * FROM
(purchases AS prc INNER JOIN products AS prd 
on prc.prod_id = prd.prod_id)
"""
spark.sql(query).show()


+----------+--------+-------+-------+--------+----------+----------+
|      date|quantity|prod_id|prod_id|prod_cat|prod_brand|prod_value|
+----------+--------+-------+-------+--------+----------+----------+
|2017-11-01|       2|      1|      1|   mouse| microsoft|     39.99|
|2017-11-02|       1|      1|      1|   mouse| microsoft|     39.99|
|2017-11-05|       1|      2|      2|keyboard|  logitech|     59.99|
+----------+--------+-------+-------+--------+----------+----------+



In [43]:
# The second purchase references product '3', which is not in `products`.
new_purchases = spark.createDataFrame([
    (date(2017, 11, 1), 2, '1'),
    (date(2017, 11, 2), 1, '3'),
], ['date', 'quantity', 'prod_id_x'])

# The key columns have different names, so the join condition is spelled out.
# The explicit 'left' join type keeps every purchase, filling the product
# columns with null when no product matches.
join_rule = new_purchases.prod_id_x == products.prod_id
new_purchases.join(products, join_rule, 'left').show()


+----------+--------+---------+-------+--------+----------+----------+
|      date|quantity|prod_id_x|prod_id|prod_cat|prod_brand|prod_value|
+----------+--------+---------+-------+--------+----------+----------+
|2017-11-02|       1|        3|   null|    null|      null|      null|
|2017-11-01|       2|        1|      1|   mouse| microsoft|     39.99|
+----------+--------+---------+-------+--------+----------+----------+



In [44]:
# NOTE(review): this cell repeats the previous cell verbatim; consider
# removing one of the two.
new_purchases = spark.createDataFrame([
    (date(2017, 11, 1), 2, '1'),
    (date(2017, 11, 2), 1, '3'),
], ['date', 'quantity', 'prod_id_x'])

# An explicit 'left' join (not the default inner join): every purchase is
# kept, with nulls in the product columns when no product matches.
join_rule = new_purchases.prod_id_x == products.prod_id
new_purchases.join(products, join_rule, 'left').show()

+----------+--------+---------+-------+--------+----------+----------+
|      date|quantity|prod_id_x|prod_id|prod_cat|prod_brand|prod_value|
+----------+--------+---------+-------+--------+----------+----------+
|2017-11-02|       1|        3|   null|    null|      null|      null|
|2017-11-01|       2|        1|      1|   mouse| microsoft|     39.99|
+----------+--------+---------+-------+--------+----------+----------+



## Various types of joins

In [45]:
# Two small tables sharing the `id` key: ids 3 and 4 exist on both sides,
# and id 4 appears twice on the right.
left = spark.createDataFrame(
    [(1, "A1"), (2, "A2"), (3, "A3"), (4, "A4")],
    ["id", "value"],
)

right = spark.createDataFrame(
    [(3, "A3"), (4, "A4"), (4, "A4_1"), (5, "A5"), (6, "A6")],
    ["id", "value"],
)

# Join strategies to compare.
join_types = ["inner", "outer", "left", "right", "leftsemi", "leftanti"]

In [46]:
# Print the result of every join strategy, sorted by id for easy comparison.
for join_type in join_types:
    print(join_type)
    (
        left
        .join(right, on="id", how=join_type)
        .orderBy("id")
        .show()
    )

inner
+---+-----+-----+
| id|value|value|
+---+-----+-----+
|  3|   A3|   A3|
|  4|   A4|   A4|
|  4|   A4| A4_1|
+---+-----+-----+

outer
+---+-----+-----+
| id|value|value|
+---+-----+-----+
|  1|   A1| null|
|  2|   A2| null|
|  3|   A3|   A3|
|  4|   A4| A4_1|
|  4|   A4|   A4|
|  5| null|   A5|
|  6| null|   A6|
+---+-----+-----+

left
+---+-----+-----+
| id|value|value|
+---+-----+-----+
|  1|   A1| null|
|  2|   A2| null|
|  3|   A3|   A3|
|  4|   A4|   A4|
|  4|   A4| A4_1|
+---+-----+-----+

right
+---+-----+-----+
| id|value|value|
+---+-----+-----+
|  3|   A3|   A3|
|  4|   A4|   A4|
|  4|   A4| A4_1|
|  5| null|   A5|
|  6| null|   A6|
+---+-----+-----+

leftsemi
+---+-----+
| id|value|
+---+-----+
|  3|   A3|
|  4|   A4|
+---+-----+

leftanti
+---+-----+
| id|value|
+---+-----+
|  1|   A1|
|  2|   A2|
+---+-----+



# Aggregations

## Examples using the API

In [47]:
from pyspark.sql import functions as fn

# Five-row product catalogue used by the aggregation examples.
products = spark.createDataFrame([
    ('1', 'mouse', 'microsoft', 39.99),
    ('2', 'mouse', 'microsoft', 59.99),
    ('3', 'keyboard', 'microsoft', 59.99),
    ('4', 'keyboard', 'logitech', 59.99),
    ('5', 'mouse', 'logitech', 29.99),
], ['prod_id', 'prod_cat', 'prod_brand', 'prod_value'])

# Average value per category, using the avg() shortcut on the grouped data.
products.groupBy('prod_cat').avg('prod_value').show()

+--------+-----------------+
|prod_cat|  avg(prod_value)|
+--------+-----------------+
|keyboard|            59.99|
|   mouse|43.32333333333333|
+--------+-----------------+



In [48]:
products.groupBy('prod_cat').agg(fn.avg('prod_value')).show()

+--------+-----------------+
|prod_cat|  avg(prod_value)|
+--------+-----------------+
|keyboard|            59.99|
|   mouse|43.32333333333333|
+--------+-----------------+



In [49]:
from pyspark.sql import functions as fn

# Grouping on two columns: one average per (brand, category) pair.
(
    products
    .groupBy("prod_brand", "prod_cat")
    .agg(fn.avg("prod_value"))
    .show()
)

+----------+--------+---------------+
|prod_brand|prod_cat|avg(prod_value)|
+----------+--------+---------------+
| microsoft|   mouse|          49.99|
|  logitech|keyboard|          59.99|
| microsoft|keyboard|          59.99|
|  logitech|   mouse|          29.99|
+----------+--------+---------------+



In [50]:
from pyspark.sql import functions as fn

# Several aggregates per brand, each renamed with alias().
(
    products
    .groupBy("prod_brand")
    .agg(
        fn.round(fn.avg("prod_value"), 1).alias("average"),
        fn.ceil(fn.sum("prod_value")).alias("sum"),
        fn.min("prod_value").alias("min"),
    )
    .show()
)

+----------+-------+---+-----+
|prod_brand|average|sum|  min|
+----------+-------+---+-----+
|  logitech|   45.0| 90|29.99|
| microsoft|   53.3|160|39.99|
+----------+-------+---+-----+



## Example using a query

In [51]:
# Register the catalogue under the name used in the FROM clause below.
products.createOrReplaceTempView("products")

# Per-brand aggregation written in SQL: rounded average and minimum value.
query = """
SELECT
prod_brand,
round(avg(prod_value), 1) AS average,
min(prod_value) AS min
FROM products
GROUP BY prod_brand
"""

spark.sql(query).show()

+----------+-------+-----+
|prod_brand|average|  min|
+----------+-------+-----+
|  logitech|   45.0|29.99|
| microsoft|   53.3|39.99|
+----------+-------+-----+



# Window functions

## Numerical window functions

In [52]:
from pyspark.sql import Window
from pyspark.sql import functions as fn

# First, we create the Window definition: rows are partitioned by brand
window = Window.partitionBy('prod_brand')

# Then, we can use "over" to evaluate the aggregate on this window
avg = fn.avg('prod_value').over(window)

# Finally, we can use it like any other column
products.withColumn('avg_brand_value', fn.round(avg, 2)).show()

+-------+--------+----------+----------+---------------+
|prod_id|prod_cat|prod_brand|prod_value|avg_brand_value|
+-------+--------+----------+----------+---------------+
|      4|keyboard|  logitech|     59.99|          44.99|
|      5|   mouse|  logitech|     29.99|          44.99|
|      1|   mouse| microsoft|     39.99|          53.32|
|      2|   mouse| microsoft|     59.99|          53.32|
|      3|keyboard| microsoft|     59.99|          53.32|
+-------+--------+----------+----------+---------------+



In [53]:
from pyspark.sql import Window
from pyspark.sql import functions as fn

# Partitioning on two columns gives one window per (brand, category) pair.
window = Window.partitionBy("prod_brand", "prod_cat")
avg = fn.avg("prod_value").over(window)

(
    products
    .withColumn("avg_value", fn.round(avg, 2))
    .show()
)

+-------+--------+----------+----------+---------+
|prod_id|prod_cat|prod_brand|prod_value|avg_value|
+-------+--------+----------+----------+---------+
|      1|   mouse| microsoft|     39.99|    49.99|
|      2|   mouse| microsoft|     59.99|    49.99|
|      4|keyboard|  logitech|     59.99|    59.99|
|      3|keyboard| microsoft|     59.99|    59.99|
|      5|   mouse|  logitech|     29.99|    29.99|
+-------+--------+----------+----------+---------+



In [54]:
from pyspark.sql import Window
from pyspark.sql import functions as fn

# Independent windows can be combined in the same query:
# one partitioned by brand, the other by category.
window1 = Window.partitionBy("prod_brand")
window2 = Window.partitionBy("prod_cat")

avg_brand = fn.avg("prod_value").over(window1)
avg_cat = fn.avg("prod_value").over(window2)

(
    products
    .withColumn("avg_by_brand", fn.round(avg_brand, 2))
    .withColumn("avg_by_cat", fn.round(avg_cat, 2))
    .show()
)

+-------+--------+----------+----------+------------+----------+
|prod_id|prod_cat|prod_brand|prod_value|avg_by_brand|avg_by_cat|
+-------+--------+----------+----------+------------+----------+
|      4|keyboard|  logitech|     59.99|       44.99|     59.99|
|      3|keyboard| microsoft|     59.99|       53.32|     59.99|
|      5|   mouse|  logitech|     29.99|       44.99|     43.32|
|      1|   mouse| microsoft|     39.99|       53.32|     43.32|
|      2|   mouse| microsoft|     59.99|       53.32|     43.32|
+-------+--------+----------+----------+------------+----------+



## Lag and Lead

In [55]:
# Purchase log: one row per purchase, with its date and product category.
purchases = spark.createDataFrame(
    [
        (date(2017, 11, 1), "mouse"),
        (date(2017, 11, 2), "mouse"),
        (date(2017, 11, 4), "keyboard"),
        (date(2017, 11, 6), "keyboard"),
        (date(2017, 11, 9), "keyboard"),
        (date(2017, 11, 12), "mouse"),
        (date(2017, 11, 18), "keyboard"),
    ],
    ["date", "prod_cat"],
)
purchases.show()

# Within each category, rows are ordered by date.
window = Window.partitionBy("prod_cat").orderBy("date")

prev_purch = fn.lag("date", 1).over(window)   # date one row earlier in the window
next_purch = fn.lead("date", 1).over(window)  # date one row later in the window

(
    purchases
    .withColumn("prev", prev_purch)
    .withColumn("next", next_purch)
    .orderBy("prod_cat", "date")
    .show()
)

+----------+--------+
|      date|prod_cat|
+----------+--------+
|2017-11-01|   mouse|
|2017-11-02|   mouse|
|2017-11-04|keyboard|
|2017-11-06|keyboard|
|2017-11-09|keyboard|
|2017-11-12|   mouse|
|2017-11-18|keyboard|
+----------+--------+

+----------+--------+----------+----------+
|      date|prod_cat|      prev|      next|
+----------+--------+----------+----------+
|2017-11-04|keyboard|      null|2017-11-06|
|2017-11-06|keyboard|2017-11-04|2017-11-09|
|2017-11-09|keyboard|2017-11-06|2017-11-18|
|2017-11-18|keyboard|2017-11-09|      null|
|2017-11-01|   mouse|      null|2017-11-02|
|2017-11-02|   mouse|2017-11-01|2017-11-12|
|2017-11-12|   mouse|2017-11-02|      null|
+----------+--------+----------+----------+



## Rank, DenseRank and RowNumber

In [56]:
# Scores per contestant in two categories. The 'young' category contains a
# tie at 4000 points, which is where the ranking functions differ.
contestants = spark.createDataFrame([
    ('veterans', 'John', 3000),
    ('veterans', 'Bob', 3200),
    ('veterans', 'Mary', 4000),
    ('young', 'Jane', 4000),
    ('young', 'April', 3100),
    ('young', 'Alice', 3700),
    ('young', 'Micheal', 4000),
], ['category', 'name', 'points'])

contestants.show()

+--------+-------+------+
|category|   name|points|
+--------+-------+------+
|veterans|   John|  3000|
|veterans|    Bob|  3200|
|veterans|   Mary|  4000|
|   young|   Jane|  4000|
|   young|  April|  3100|
|   young|  Alice|  3700|
|   young|Micheal|  4000|
+--------+-------+------+



In [57]:
# Rank contestants inside each category, highest score first.
window = (
    Window
    .partitionBy("category")
    .orderBy(contestants.points.desc())
)

rank = fn.rank().over(window)              # tied rows share a rank, then a gap follows
dense_rank = fn.dense_rank().over(window)  # tied rows share a rank, no gap
row_number = fn.row_number().over(window)  # consecutive numbers, even for tied rows

(
    contestants
    .withColumn("rank", rank)
    .withColumn("dense_rank", dense_rank)
    .withColumn("row_number", row_number)
    .orderBy("category", fn.col("points").desc())
    .show()
)

+--------+-------+------+----+----------+----------+
|category|   name|points|rank|dense_rank|row_number|
+--------+-------+------+----+----------+----------+
|veterans|   Mary|  4000|   1|         1|         1|
|veterans|    Bob|  3200|   2|         2|         2|
|veterans|   John|  3000|   3|         3|         3|
|   young|   Jane|  4000|   1|         1|         1|
|   young|Micheal|  4000|   1|         1|         2|
|   young|  Alice|  3700|   3|         2|         3|
|   young|  April|  3100|   4|         3|         4|
+--------+-------+------+----+----------+----------+

