# Using `JSON` data with `Python`

First, let's use `Python` to serialize data to `JSON` and deserialize it back, for data containing only built-in types.

## Serialization and deserialization of built-in types

In [None]:
import json

obj = {
    "name": "Foo Bar",
    "age": 78,
    "friends": ["Jane","John"],
    "balance": 345.80,
    "other_names":("Doe","Joe"),
    "active": True,
    "spouse": None
}

print(json.dumps(obj, sort_keys=True, indent=4))

In [None]:
with open('user.json','w') as file:
    json.dump(obj, file, sort_keys=True, indent=4)

In [None]:
!cat user.json

In [None]:
json.loads('{"active": true, "age": 78, "balance": 345.8, "friends": ["Jane","John"], "name": "Foo Bar", "other_names": ["Doe","Joe"],"spouse":null}')

In [None]:
with open('user.json', 'r') as file:
    user_data = json.load(file)

print(user_data)
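One detail worth noticing in the output above: JSON has no tuple type, so the tuple stored under `other_names` is written as a JSON array and comes back as a list. A minimal sketch of that round trip, on a reduced version of the same dictionary:

```python
import json

original = {"other_names": ("Doe", "Joe"), "spouse": None, "active": True}

# Serialize to a JSON string, then parse it back
restored = json.loads(json.dumps(original))

# The tuple was encoded as a JSON array, so it is decoded as a list
print(type(original["other_names"]).__name__)  # tuple
print(type(restored["other_names"]).__name__)  # list

# As a consequence, the round trip does not give back an equal dictionary
print(restored == original)  # False
```

If the distinction between tuples and lists matters, it has to be restored explicitly after decoding.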

## Serialization and deserialization of custom objects

In [None]:
class User(object):
    """Custom User Class
    """
    def __init__(self, name, age, active, balance, 
                 other_names, friends, spouse):
        self.name = name
        self.age = age
        self.active = active
        self.balance = balance
        self.other_names = other_names
        self.friends = friends
        self.spouse = spouse
            
    def __repr__(self):
        s = "User("
        s += "name=" + repr(self.name)
        s += ", age=" + repr(self.age)
        s += ", active=" + repr(self.active)
        s += ", balance=" + repr(self.balance)
        s += ", other_names=" + repr(self.other_names)
        s += ", friends=" + repr(self.friends)
        s += ", spouse=" + repr(self.spouse) + ")"
        return s

new_user = User(
    name = "Foo Bar",
    age = 78,
    friends = ["Jane", "John"],
    balance = 345.80,
    other_names = ("Doe", "Joe"),
    active = True,
    spouse = None
)

new_user

In [None]:
# This will raise a TypeError
json.dumps(new_user)

As expected, the custom object `new_user` is not JSON serializable. So let's build a function that handles this for us.

- This should come as no surprise, since we established earlier that the `json` module only understands the built-in types, and `User` is not one of those.

- We need to send our user data to a client over a network, so how do we get out of this error state?

- A simple solution is to convert our custom type into a serializable type, i.e. a built-in type. We can define a function `obj_to_dict()` that returns a dictionary representation of our object. `json.dumps()` takes an optional argument, `default`, which specifies a function to be called when an object is not serializable. This function must return a JSON-encodable version of the object.

In [None]:
def obj_to_dict(obj):
    """Converts an object to a dictionary representation of the object including 
    meta-data information about the object's module and class name.

    Parameters
    ----------
    obj : `object`
        A python object to be converted into a dictionary representation

    Returns
    -------
    output : `dict`
        A dictionary representation of the object
    """
    # Add object meta data 
    obj_dict = {
        "__class__": obj.__class__.__name__,
        "__module__": obj.__module__
    }
    # Add the object properties
    obj_dict.update(obj.__dict__)
    return obj_dict

obj_to_dict(new_user)

The function `obj_to_dict` does the following:

- It creates a dictionary named `obj_dict` to act as the dict representation of our object.

- The special attributes `__class__.__name__` and `__module__` provide crucial metadata on the object: the class name and the module name

- We add the instance attributes of the object using `obj.__dict__` (`Python` stores instance attributes in a dictionary under the hood)

- The resulting `dict` is now serializable

Now we can comfortably call `json.dumps()` on the object and pass `default=obj_to_dict`

**Remark.** Obviously this fails if one of the attributes is not `JSON` serializable.
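To illustrate the remark, here is a small sketch in which an attribute holds a `datetime`, which the `json` module cannot encode. The `Event` class and the `to_jsonable` helper are hypothetical names introduced only for this example. The helper handles `datetime` explicitly and falls back to the instance dictionary otherwise. Since `default` is called again for each value that still cannot be encoded, the `datetime` nested inside the returned dictionary is handled too.

```python
import json
from datetime import datetime


class Event:
    """Hypothetical class with an attribute that is not JSON serializable."""

    def __init__(self, name, when):
        self.name = name
        self.when = when  # a datetime, unknown to the json module


def to_jsonable(obj):
    """Fallback passed as `default`: called for anything json cannot encode."""
    if isinstance(obj, datetime):
        # Represent dates as ISO 8601 strings
        return obj.isoformat()
    if hasattr(obj, "__dict__"):
        # Generic fallback: the instance attributes
        return dict(obj.__dict__)
    raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")


event = Event("launch", datetime(2020, 1, 2, 3, 4, 5))
encoded = json.dumps(event, default=to_jsonable, sort_keys=True)
print(encoded)
```

Note that the date comes back as a plain string when decoding: converting it back to a `datetime` is the job of the decoding side.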

In [None]:
print(json.dumps(new_user, default=obj_to_dict, indent=4, sort_keys=True))

Now, if we want to decode (deserialize) a custom object and create the correct object type, we need a function that does the opposite of `obj_to_dict`, since `json.loads` simply returns a `dict`:

In [None]:
user_data = json.loads(json.dumps(new_user, default=obj_to_dict))
print(user_data)

We need `json.loads()` to reconstruct a `User` object from this dictionary: `json.loads()`
takes an optional argument `object_hook` which specifies a function that returns the desired custom object, given the decoded output (which in this case is a `dict`).

In [None]:
def dict_to_obj(input_dict):
    """Converts a dictionary representation of an object to an instance of the object.

    Parameters
    ----------
    input_dict : `dict`
        A dictionary representation of the object, containing "__module__" 
        and "__class__" metadata

    Returns
    -------    
    obj : `object`
        A python object constructed from the dictionary representation    
    """
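    # importlib.import_module returns the module itself, even for dotted names
    # such as "package.module" (a bare __import__ would return "package")
    import importlib

    assert "__class__" in input_dict and "__module__" in input_dict
    class_name = input_dict.pop("__class__")
    module_name = input_dict.pop("__module__")
    module = importlib.import_module(module_name)
    class_ = getattr(module, class_name)
    # The remaining keys are the constructor arguments
    obj = class_(**input_dict)
    return obj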

This function does the following: 

- Extract the class name from the dictionary under the key `__class__`

- Extract the module name from the dictionary under the key `__module__`

- Import the module and get the class

- Instantiate the class by giving to the class constructor all the instance arguments through dictionary unpacking
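One caveat: `object_hook` is called for every JSON object that is decoded, including nested ones, so a hook that asserts on the metadata keys fails as soon as a plain dictionary appears in the data. A minimal sketch of a more tolerant hook follows. It is an illustration under assumptions, not the function above: `Point`, `KNOWN_CLASSES` and `tolerant_hook` are hypothetical names, and classes are looked up in an explicit registry rather than imported by name.

```python
import json


class Point:
    """Hypothetical class used only for this illustration."""

    def __init__(self, x, y):
        self.x = x
        self.y = y


# Explicit registry of the classes we accept to rebuild (an assumption of this sketch)
KNOWN_CLASSES = {"Point": Point}


def tolerant_hook(d):
    """Rebuild known classes, return any other dictionary unchanged."""
    class_name = d.get("__class__")
    if class_name not in KNOWN_CLASSES:
        return d  # plain dictionary: nothing to rebuild
    # Drop the metadata keys and pass the rest to the constructor
    kwargs = {k: v for k, v in d.items() if k not in ("__class__", "__module__")}
    return KNOWN_CLASSES[class_name](**kwargs)


text = (
    '{"label": "origin", '
    '"point": {"__class__": "Point", "__module__": "__main__", "x": 0, "y": 0}}'
)
decoded = json.loads(text, object_hook=tolerant_hook)
print(type(decoded).__name__, type(decoded["point"]).__name__)  # dict Point
```

Looking classes up in a registry also means that the decoded text cannot trigger the import of an arbitrary module, which matters when the JSON comes from an untrusted source.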

In [None]:
obj_data = json.dumps(new_user, default=obj_to_dict)
new_object = json.loads(obj_data, object_hook=dict_to_obj)
new_object

In [None]:
type(new_object)

In [None]:
new_object.age

# Using `JSON` with Spark

First, we download the data if it's not there yet

In [None]:
import requests, zipfile, io
from pathlib import Path

path = Path('drug-enforcement.json')
if not path.exists():
    url = "https://stephanegaiffas.github.io/big_data_course/data/drug-enforcement.json.zip"
    r = requests.get(url)
    z = zipfile.ZipFile(io.BytesIO(r.content))
    z.extractall(path='./')

## Reading a `JSON` dataset with `Spark`

In [None]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import functions as fn
from pyspark.sql.functions import col

conf = SparkConf().setAppName("Spark JSON")
sc = SparkContext(conf=conf)

spark = (SparkSession
    .builder
    .appName("Spark JSON")
    .getOrCreate()
)

In [None]:
filename = "drug-enforcement.json"

First, let's look at the data. It's a large set of JSON records about drug enforcement.

In [None]:
!head -n 1000 drug-enforcement.json

We need to tell Spark that each record spans several lines, using the `multiLine` option

In [None]:
df = spark.read.json(filename, multiLine=True)
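By default Spark expects one self-contained JSON document per line (the JSON Lines layout), which is why a pretty-printed file needs this option. The difference can be illustrated without Spark, using only the `json` module on a small made-up record:

```python
import json

# Made-up record, only for illustration
record = {"city": "Paris", "openfda": {"brand_name": ["X"]}}

json_lines_text = json.dumps(record)        # one record on a single line
pretty_text = json.dumps(record, indent=4)  # same record spread over several lines


def parse_line_by_line(text):
    """Parse each non-empty line as an independent JSON document."""
    return [json.loads(line) for line in text.splitlines() if line.strip()]


# Line-by-line parsing works for the single-line layout...
print(parse_line_by_line(json_lines_text) == [record])  # True

# ...but fails for the pretty-printed one, whose first line is just "{"
try:
    parse_line_by_line(pretty_text)
except json.JSONDecodeError as err:
    print("line-by-line parsing fails:", err.msg)

# Parsing the whole text at once works
print(json.loads(pretty_text) == record)  # True
```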

In [None]:
df.printSchema()

In [None]:
df.schema

This dataset is a little bit of a mess!

- First, there is a nested `openfda` dictionary. Each element of the dictionary is an array
- A good first idea is to **"flatten" the schema of the DataFrame**, so that there are no nested types

## Flattening of the schema

All the fields of the nested structure `openfda` are moved up to the top level of the schema.
The fields nested in `openfda` are as follows:

In [None]:
df.select('openfda.*').columns

In [None]:
df.select("openfda.*").head(2)

In [None]:
for c in df.select("openfda.*").columns:
    df = df.withColumn("openfda_" + c, col("openfda." + c))

In [None]:
df = df.select([c for c in df.columns if c != "openfda"])
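To make the effect of these two cells concrete, here is the same transformation applied to a single record represented as a plain Python dictionary. This is only an analogy on made-up values (`flatten_struct` is a hypothetical helper, not a Spark function): each field of the nested structure becomes a top-level field prefixed with `openfda_`, and the nested field itself is dropped.

```python
def flatten_struct(record, key, sep="_"):
    """Move the fields nested under `key` to the top level, prefixed by `key`."""
    # Keep every top-level field except the nested one
    flat = {k: v for k, v in record.items() if k != key}
    # Add one prefixed field per nested field (nothing if `key` is missing or None)
    for sub_key, value in (record.get(key) or {}).items():
        flat[key + sep + sub_key] = value
    return flat


# Made-up record, only for illustration
record = {
    "city": "South San Francisco",
    "openfda": {"brand_name": ["A", "B"], "route": ["ORAL"]},
}

flat_record = flatten_struct(record, "openfda")
print(flat_record)
```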

In [None]:
df.printSchema()

In [None]:
df.head(2)

Note that the display of the `DataFrame` is not as usual... it displays the dataframe like a list of `Row`, since the columns "openfda*" contain arrays of varying length

## Missing data

A strategy can be to remove rows with missing data. 
`dropna` has several options, explained below.

In [None]:
df.dropna().count()

If we remove all lines with at least one missing value, we end up with an empty dataframe !

In [None]:
df.dropna(how='all').count()

`dropna` accepts the following arguments

- `how`: can be `'any'` or `'all'`. If `'any'`, rows containing any null values will be dropped entirely (this is the default). If `'all'`, only rows which are entirely empty will be dropped.

- `thresh`: accepts an integer giving the minimum number of non-null values a row must contain to be kept: rows with fewer than `thresh` non-null values are dropped. `thresh` is a middle ground between `how='any'` and `how='all'`. When `thresh` is given, it overrides `how`

- `subset`: accepts a list of column names. When a subset is present, N/A values will only be checked against the columns whose names are provided.
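The semantics of `how` and `thresh` can be sketched on plain Python rows, with `None` standing for a missing value. The helper `keep_row` below is a hypothetical re-implementation for illustration only, not Spark code: with `thresh`, a row is kept when it has at least `thresh` non-null values.

```python
def keep_row(row, how="any", thresh=None):
    """Return True if `row` would be kept, mimicking the how/thresh semantics."""
    non_null = sum(value is not None for value in row)
    if thresh is not None:
        # thresh takes precedence over how
        return non_null >= thresh
    if how == "any":
        # dropped as soon as one value is missing
        return non_null == len(row)
    # how == "all": dropped only if every value is missing
    return non_null > 0


rows = [
    ("a", "b", "c"),
    ("a", None, "c"),
    (None, None, None),
]

print([keep_row(r) for r in rows])             # [True, False, False]
print([keep_row(r, how="all") for r in rows])  # [True, True, False]
print([keep_row(r, thresh=2) for r in rows])   # [True, True, False]
```

In particular, setting `thresh` to the number of columns gives the same result as `how='any'`.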

In [None]:
n_columns = len(df.columns)

In [None]:
df.dropna(thresh=n_columns).count()

In [None]:
df.dropna(thresh=n_columns-1).count()

In [None]:
df.dropna(thresh=n_columns-10).count()

In [None]:
df = df.dropna(subset=['postal_code', 'city', 'country', 'address_1'])
df.count()

Before going further, let's count the number of missing values in each column

In [None]:
# For each column we create a new column containing 1 if the value is null and 0 otherwise.
# We need to cast Boolean to Int so that we can use fn.sum afterwards
for c in df.columns:
    # Do not do this for _isnull columns (in case you run this cell twice...)
    if not c.endswith("_isnull"):
        df = df.withColumn(c + "_isnull", fn.isnull(col(c)).cast('int'))

In [None]:
df.head()

In [None]:
# Get the list of _isnull columns
isnull_columns = [c for c in df.columns if c.endswith("_isnull")]

# On the _isnull columns :
#  - we compute the sum to have the number of null values and rename the column
#  - convert to pandas for better readability
#  - transpose the pandas dataframe for better readability
missing_values = df.select(isnull_columns)\
    .agg(*[fn.sum(c).alias(c.replace("_isnull", "")) for c in isnull_columns])\
    .toPandas()

missing_values.T\
    .rename({0: "missing values"}, axis="columns")

We see that `more_code_info` is always null and that `termination_date` is often null. 
Most of the `openfda*` columns are also almost always empty.

We can keep only the columns with no missing values

In [None]:
# This line may look complicated: run each piece separately to understand it
kept_columns = list(
    missing_values.columns[(missing_values.iloc[0] == 0).values]
)

In [None]:
df_kept = df.select(kept_columns)

In [None]:
df_kept.head(2)

In [None]:
df_kept.printSchema()

In [None]:
df_kept.count()

## Filtering by string values 

Cases from South San Francisco

In [None]:
df.filter(df.city == "South San Francisco")\
    .toPandas()

**Remark.** Once again, we use `.toPandas()` to pretty format the results in the notebook. 
But it's a BAD idea to do this if the spark DataFrame is large, since it requires a `collect()`

Aside from filtering strings by a perfect match, there are plenty of other powerful ways to filter by strings in `pyspark` :

- `df.filter(df.city.contains('San Francisco'))`: returns rows where strings of a column contain a provided substring. In our example, filtering by rows which contain the substring "San Francisco" would be a good way to get all rows in San Francisco, instead of just "South San Francisco".

- `df.filter(df.city.startswith('San'))`: Returns rows where a string starts with a provided substring.

- `df.filter(df.city.endswith('ice'))`: Returns rows where a string ends with a provided substring.

- `df.filter(df.city.isNull())`: Returns rows where values in a provided column are null.

- `df.filter(df.city.isNotNull())`: Opposite of the above.

- `df.filter(df.city.like('San%'))`: Performs a SQL-like query containing the LIKE clause.

- `df.filter(df.city.rlike('[A-Z]*ice$'))`: Performs a regexp filter.

- `df.filter(df.city.isin('San Francisco', 'Los Angeles'))`: Looks for rows where the string value of a column matches any of the provided strings exactly.
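For intuition about `like` and `rlike`, here are rough plain-Python analogues on a few made-up city names. This is a sketch under assumptions: `sql_like` is a hypothetical helper that only handles the `%` and `_` wildcards, and `re.search` is used because `rlike` matches when the pattern is found anywhere in the string.

```python
import re


def sql_like(value, pattern):
    """Rough analogue of SQL LIKE: % matches any sequence, _ any single character."""
    parts = []
    for ch in pattern:
        if ch == "%":
            parts.append(".*")
        elif ch == "_":
            parts.append(".")
        else:
            parts.append(re.escape(ch))
    # LIKE must match the whole string, hence fullmatch
    return re.fullmatch("".join(parts), value, flags=re.DOTALL) is not None


# Made-up values, only for illustration
cities = ["San Francisco", "South San Francisco", "Venice", "Nice", "Los Angeles"]

# like('San%'): the whole string must start with "San"
print([c for c in cities if sql_like(c, "San%")])
# ['San Francisco']

# rlike('[A-Z]*ice$'): the pattern may match anywhere, here anchored at the end
print([c for c in cities if re.search(r"[A-Z]*ice$", c)])
# ['Venice', 'Nice']

# contains('San Francisco'): plain substring test
print([c for c in cities if "San Francisco" in c])
# ['San Francisco', 'South San Francisco']
```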

You can try some of these to understand

In [None]:
df.filter(df.city.contains('San Francisco'))\
    .toPandas()

In [None]:
df.filter(df.city.isin('San Francisco', 'Los Angeles')).toPandas()

## Filtering by Date Values

In addition to filtering by strings, we can also filter by columns where the values are stored as dates or datetimes (or strings that can be inferred as dates). Perhaps the most useful way to filter dates is by using the `between()` method, which allows us to find results within a certain date range. Here we find all the results which were reported between 2013-01-01 and 2015-01-11:

In [None]:
df.filter(df.city == "South San Francisco")\
    .filter(df.report_date.between('2013-01-01 00:00:00','2015-01-11 00:00:00'))\
    .toPandas()

Spark is smart and understands that the string actually contains a date.
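`between(lower, upper)` is inclusive on both ends, i.e. it is shorthand for `lower <= value` and `value <= upper`. The plain-Python sketch below, on made-up dates, shows the same inclusive comparison with `datetime` objects; it only illustrates the semantics and does not involve Spark. The bounds chosen here select exactly the years 2013 and 2014.

```python
from datetime import datetime


def between(value, lower, upper):
    """Inclusive range test, mirroring the semantics of Column.between."""
    return lower <= value <= upper


lower = datetime(2013, 1, 1)
upper = datetime(2014, 12, 31)

# Made-up report dates, only for illustration
report_dates = [
    datetime(2012, 12, 31),
    datetime(2013, 1, 1),    # equal to the lower bound: kept
    datetime(2014, 6, 15),
    datetime(2014, 12, 31),  # equal to the upper bound: kept
    datetime(2015, 1, 1),
]

selected = [d for d in report_dates if between(d, lower, upper)]
print([d.strftime("%Y-%m-%d") for d in selected])
# ['2013-01-01', '2014-06-15', '2014-12-31']
```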