How to Do Data Validation on Your Data on Pandas with pytest
By Byron Dolon, May 2023


The code we’ll be working with in this piece is this set of Python functions that use Pandas to read in and process data. It includes a function to read the raw data in chunks, then a few functions that perform some transformations on the raw data.

# data_processing.py
import pandas as pd
from pandas import DataFrame

def read_raw_data(file_path: str, chunk_size: int = 1000) -> DataFrame:
    csv_reader = pd.read_csv(file_path, chunksize=chunk_size)
    processed_chunks = []

    for chunk in csv_reader:
        chunk = chunk.loc[chunk["Order ID"] != "Order ID"].dropna()
        processed_chunks.append(chunk)

    return pd.concat(processed_chunks, axis=0)

def split_purchase_address(df_to_process: DataFrame) -> DataFrame:
    df_address_split = df_to_process["Purchase Address"].str.split(
        ",", n=3, expand=True
    )
    df_address_split.columns = ["Street Name", "City", "State and Postal Code"]

    df_state_postal_split = (
        df_address_split["State and Postal Code"]
        .str.strip()
        .str.split(" ", n=2, expand=True)
    )
    df_state_postal_split.columns = ["State Code", "Postal Code"]

    return pd.concat([df_to_process, df_address_split, df_state_postal_split], axis=1)

def extract_product_pack_information(df_to_process: DataFrame) -> DataFrame:
    # pull the text inside parentheses from the product name (e.g. "4-pack")
    df_to_process["Pack Information"] = (
        df_to_process["Product"].str.extract(r".*\((.*)\).*").fillna("Not Pack")
    )

    return df_to_process

def one_hot_encode_product_column(df_to_process: DataFrame) -> DataFrame:
    return pd.get_dummies(df_to_process, columns=["Product"])

def process_raw_data(file_path: str, chunk_size: int) -> DataFrame:
    df = read_raw_data(file_path=file_path, chunk_size=chunk_size)

    return (
        df.pipe(split_purchase_address)
        .pipe(extract_product_pack_information)
        .pipe(one_hot_encode_product_column)
    )

Next, we can get started with implementing our first data validation test. If you’re going to follow along in a notebook or IDE, you should import the following in a new file (or in another cell in your notebook):

import pandas as pd
import numpy as np
import pytest
from pandas import DataFrame
from data_processing import (
    read_raw_data,
    split_purchase_address,
    extract_product_pack_information,
    one_hot_encode_product_column,
)
from pandas.testing import assert_series_equal, assert_index_equal

You can read more on how to actually run pytest (naming conventions for files and how tests are discovered) in the pytest documentation, but for our case, all you need to do is create a new file called test_data_processing.py, and as you add to it in your IDE, you can simply run pytest, optionally with the "--verbose" flag.

Quick Introduction to pytest and Simple Data Validation Check

Pytest is a testing framework in Python that makes it easy for you to write tests for your data pipelines. You can primarily make use of the assert statement, which essentially checks whether the condition you place after assert evaluates to True or False. If it evaluates to False, it will raise an AssertionError exception (and when used with pytest, will cause the test to fail).
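As a quick standalone illustration (this snippet is ours, not part of the pipeline):

x = 5
assert x > 0  # True, so nothing happens
assert x > 10, "x should be greater than 10"  # False, so this raises an AssertionError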

So first, let’s test something simple. All we’re going to do is check if the output of one of our functions (the first one to read the raw data) returns a DataFrame.

As a quick aside, you’ll notice that in the original function we use the arrow (->) syntax to add a type hint saying that the function should return a DataFrame. This means that if your function returns something other than a DataFrame, your IDE will flag it as an invalid output (but this won’t technically break your code or prevent it from running).
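For example, in this contrived sketch (the function name is made up purely for illustration), the type hint and the actual return value disagree; a type checker or IDE will complain, but Python still runs it:

def get_row_count() -> DataFrame:
    # annotated to return a DataFrame but actually returns an int;
    # this runs fine at runtime, though a type checker will flag it
    return 42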

To actually check if the function returns a DataFrame, we’ll implement a function to test the read_raw_data function and just call it test_read_raw_data.

def test_read_raw_data():
    """Testing output of raw table read in is DataFrame"""
    test_df = read_raw_data(file_path="Updated_sales.csv", chunk_size=1000)
    assert isinstance(test_df, DataFrame)  # checking if it's a DataFrame

In this function, we add a one-line docstring to explain that our test function just checks whether the output is a DataFrame. Then, we assign the output of the existing read_raw_data function to a variable and use isinstance, which returns True or False depending on whether the specified object is of the given type. In this case, we check if test_df is a DataFrame.

We can similarly do this for the rest of our functions that just take a DataFrame as input and are expected to return a DataFrame as output. Implementing it can look like this:

def test_pipe_functions_output_df():
    """Testing output of each pipe function is a DataFrame"""
    test_df = read_raw_data(file_path="Updated_sales.csv", chunk_size=1000)
    all_pipe_functions = [
        split_purchase_address,
        extract_product_pack_information,
        one_hot_encode_product_column,
    ]
    for function in all_pipe_functions:
        assert isinstance(function(test_df), DataFrame)

Note that you can also use the assert statement in a for loop, so we just go through each of the functions, passing in a DataFrame as input and checking to see if the output is also a DataFrame.

Implementing fixtures in pytest for more efficient testing

You can see above that we had to write the exact same line twice in our two different test functions:

test_df = read_raw_data(file_path="Updated_sales.csv", chunk_size=1000)

This is because for both test functions, we needed a DataFrame as input to check whether the output of our data processing functions is also a DataFrame. To avoid copying the same code into all your test functions, you can use fixtures, which let you write setup code that pytest will reuse across your different tests. Doing so looks like this:

@pytest.fixture
def test_df() -> DataFrame:
    return read_raw_data(file_path="Updated_sales.csv", chunk_size=1000)

def test_read_raw_data(test_df):
    """Testing output of raw table read in is DataFrame"""
    assert isinstance(test_df, DataFrame)  # checking if it's a DataFrame

def test_pipe_functions_output_df(test_df):
    """Testing output of each pipe function is a DataFrame"""
    all_pipe_functions = [
        split_purchase_address,
        extract_product_pack_information,
        one_hot_encode_product_column,
    ]
    for function in all_pipe_functions:
        assert isinstance(function(test_df), DataFrame)

This time, we define test_df in a function decorated with @pytest.fixture that returns the raw DataFrame. Then, in our test functions, we just include test_df as a parameter, and we can use it just as we did before.
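One caveat worth knowing (this goes beyond the code above): by default, pytest recreates a fixture for every test function that requests it, so each test re-reads the CSV from disk. If that becomes slow, you could widen the fixture's scope, for example:

@pytest.fixture(scope="module")
def test_df() -> DataFrame:
    # with module scope, the CSV is read once per test file instead of once per test
    return read_raw_data(file_path="Updated_sales.csv", chunk_size=1000)

The trade-off is that a module-scoped DataFrame is shared across tests, so any test that modifies it in place (as extract_product_pack_information does) can affect the others; the default function scope is the safer choice for mutable data like this.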

Next, let’s get into checking our split_purchase_address function, which essentially outputs the same DataFrame passed as input but with additional address columns. Our test function will look like this:

def test_split_purchase_address(test_df):
    """Testing multiple columns in output and rows unchanged"""
    split_purchase_address_df = split_purchase_address(test_df)
    assert len(split_purchase_address_df.columns) > len(test_df.columns)
    assert split_purchase_address_df.index.__len__() == test_df.index.__len__()
    assert_index_equal(split_purchase_address_df.index, test_df.index)  # using the Pandas testing module

Here, we’ll check two main things:

  1. Does the output DataFrame have more columns than the original DataFrame?
  2. Is the index of the output DataFrame unchanged from the original DataFrame?

First, we run the split_purchase_address function, passing the test_df as input and assigning the result to a new variable. This gives us the output of the original function that we can then test.

To actually do the test, we could check if a specific column exists in the output DataFrame, but a simpler (not necessarily better) way of doing it is just checking with the assert statement if the output DataFrame has more columns than the original. Similarly, we can assert that the length of the index is the same for each of the DataFrames.

You can also check the Pandas testing documentation for some built-in testing functions, but there are only a few, and they essentially just check whether two DataFrames, indexes, or Series are equal. Here, we use the assert_index_equal function to do the same thing we did with index.__len__().
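For instance, assert_frame_equal from the same pandas.testing module compares two whole DataFrames and raises an AssertionError describing the first mismatch it finds. A minimal sketch (these toy DataFrames are ours, not from the pipeline):

from pandas.testing import assert_frame_equal

df_left = pd.DataFrame({"a": [1, 2]})
df_right = pd.DataFrame({"a": [1.0, 2.0]})

# the values match, so this passes once we relax the dtype check (int64 vs. float64)
assert_frame_equal(df_left, df_right, check_dtype=False)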

As mentioned before, we can also check if a DataFrame contains a specific column. We’ll move on to the next function extract_product_pack_information which should always output the original DataFrame with an additional column called “Pack Information”. Our test function will look like this:

def test_extract_product_pack_information(test_df):
    """Test specific output column in new DataFrame"""
    product_pack_df = extract_product_pack_information(test_df)
    assert "Pack Information" in product_pack_df.columns

Here, all we do is call columns again on the output of the original function, but this time check specifically if the “Pack Information” column is in the list of columns. If for some reason we edited our original extract_product_pack_information function to return additional columns or renamed the output column, this test would fail. This would be a good reminder to check whether whatever we use the final data for (like a machine learning model) also takes that change into account.

We could then do two things:

  1. Make changes downstream in our code pipeline (like code that refers to the “Pack Information” column);
  2. Edit our tests to reflect the changes in our processing function.

Another thing we should be doing is checking to see if the DataFrame returned by our functions has columns of our desired data types. For example, if we’re doing calculations on numerical columns, we should see if the columns are returned as an int or float, depending on what we need.
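As a sketch of what that check can look like, Pandas exposes type-checking helpers under pd.api.types (the “Price Each” column here is a hypothetical example, not one our pipeline creates):

def assert_numeric_column(df: DataFrame, column: str) -> None:
    # fail loudly if a column we plan to do calculations on isn't numeric
    assert pd.api.types.is_numeric_dtype(df[column]), f"{column} is not numeric"

# hypothetical usage: assert_numeric_column(test_df, "Price Each")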

Let’s test data types on our one_hot_encode_product_column function, where we do a common feature-engineering step on one of the categorical columns in the original DataFrame. We expect all the encoded columns to be of the uint8 data type (what the get_dummies function in Pandas returned by default at the time of writing; as of Pandas 2.0, it returns bool columns instead), so we can test that like this.

def test_one_hot_encode_product_column(test_df):
    """Testing if column types are correct"""
    encoded_df = one_hot_encode_product_column(test_df)
    encoded_columns = [column for column in encoded_df.columns if "_" in column]
    for encoded_column in encoded_columns:
        assert encoded_df[encoded_column].dtype == np.dtype("uint8")

The columns created by the get_dummies function have names that contain an underscore (the original column name joined to each category value), which is how we identify them here. This could, of course, be done more robustly by checking for the actual column names, as we did for a specific column in the previous test function.

Here, all we’re doing is looping over the target columns and checking that each one is of the np.dtype("uint8") data type. I checked this beforehand in a notebook by looking at the data type of one of the output columns with column.dtype.

Another good practice in addition to testing the individual functions you have that make up your data processing and transformation pipelines is testing the final output of your pipeline.

To do so, we’ll simulate running our entire pipeline in the test, and then check the resulting DataFrame.

def test_process_raw_data(test_df):
    """Testing the final output DataFrame as a final sanity check"""
    processed_df = (
        test_df.pipe(split_purchase_address)
        .pipe(extract_product_pack_information)
        .pipe(one_hot_encode_product_column)
    )

    # check if all original columns are still in DataFrame
    for column in test_df.columns:
        if column not in processed_df.columns:
            raise AssertionError(f"COLUMN -- {column} -- not in final DataFrame")

    assert all(
        element in list(processed_df.columns) for element in list(test_df.columns)
    )

    # check if final DataFrame doesn't have duplicates
    # (assert_series_equal raises an AssertionError itself if the Series differ)
    assert_series_equal(
        processed_df["Order ID"].drop_duplicates(), test_df["Order ID"]
    )

Our final test_process_raw_data will check for two final things:

  1. Checking if the original columns are still present in the final DataFrame: this isn’t always a requirement, but it might be that you want all the raw data to still be available (and not transformed) in your output. Doing so is simple: we just check if each column in the test_df is still present in the processed_df. This time, we raise an AssertionError (similar to just using an assert statement) if a column is not present. This is a nice example of how you can output a specific message in your tests when needed.
  2. Checking if the final DataFrame doesn’t have any duplicates: there are a lot of different ways you can do this; in this case, we’re just using the “Order ID” column (which we expect to act like an index) and assert_series_equal to see that the output DataFrame didn’t generate any duplicate rows. A simpler alternative is sketched after this list.
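As a sketch of that simpler alternative, Pandas’ built-in duplicated method makes the same check without comparing two Series (this would go inside the same test function):

# no "Order ID" should appear more than once in the final DataFrame
assert not processed_df["Order ID"].duplicated().any()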

Checking the pytest output

For a quick look at what running pytest looks like, in your IDE just run:

pytest --verbose

Pytest will discover all the test functions in the new test file and run them! This is a simple implementation of having a series of data validation and testing checks on your data processing pipeline. If you run the above, the output should look something like this:

[Images: the output of pytest (parts 1–3), by the Author]

You can see that our final test failed: specifically, the part of the test where we check if all of the columns from the initial DataFrame are present in the final one. You can also see that the custom error message in the AssertionError we defined earlier is populating correctly: the “Product” column from our original DataFrame is not showing up in the final DataFrame (see if you can figure out why, based on our initial data processing functions).

There’s a lot more room to improve on this testing—we just have a really simple implementation with basic testing and data validation cases. For more complex pipelines, you may want to have a lot more testing both for your individual data processing functions, as well as on your raw and final output DataFrames to ensure that the data you end up using is data you can trust.
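As one example of where you could take this further, pytest’s parametrize marker runs the same assertion across a list of inputs without duplicating test code. Here’s a sketch using column names from our pipeline (the test name is our own):

@pytest.mark.parametrize(
    "expected_column", ["Order ID", "Purchase Address", "Pack Information"]
)
def test_expected_columns_present(test_df, expected_column):
    # this test runs once per column name in the list above
    processed_df = extract_product_pack_information(test_df)
    assert expected_column in processed_df.columns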


