Using enums and functools to Upgrade Your Pandas Data Pipelines
by Byron Dolon, June 2023


A quick introduction to enums

You might first be wondering, “What’s an enum?”

An enum, short for enumeration, is a “set of symbolic names (members) bound to unique values” (Python docs, 2023). Practically speaking, this means that you can define and use a set of related variables under one main “class”.

A simple example would be an enum class “Color” with members like “Red”, “Green”, and “Blue” that you could use whenever you wanted to refer to those specific colors.
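As a minimal sketch, that Color enum could look like this:

from enum import Enum

class Color(Enum):
    RED = "Red"
    GREEN = "Green"
    BLUE = "Blue"

# members can be referenced by name instead of repeating raw strings
favorite = Color.GREEN
print(favorite)        # Color.GREEN
print(favorite.value)  # Green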

Next, you’re probably wondering what the point is of defining some variables in a separate enum class when you could just use the names you need directly in your data processing pipelines.

Enums have a few key benefits:

  • Defining enums lets you organize related constants in one (or many) classes that can act as a source of truth for the dimensions, measures, and other values you need to reference in your pipelines;
  • Using enums will allow you to avoid passing invalid values in your data pipelines, assuming you correctly define and maintain the enum class;
  • Enums allow users to work with a standardized set of data points and constants, which is helpful when multiple people are aggregating or creating models based on one main source of data (to help avoid having multiple definitions or aliases for the same column in the raw data source).

This sounds a little abstract, so let’s take a look at how you can practically apply enums when working with data in an example of a standard pre-processing pipeline.

Using enums in your data processing pipelines

We already have our initial DataFrame, so let’s begin by creating a function to add a few more columns to our data by splitting the purchase address.
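If you don’t have the original dataset handy, a small stand-in DataFrame (the values here are hypothetical, but the column names match what the pipeline below expects) is enough to follow along:

import pandas as pd
from pandas import DataFrame

df = pd.DataFrame(
    {
        "Order ID": ["176558", "176559"],
        "Product": ["USB-C Charging Cable", "Bose SoundSport Headphones"],
        "Quantity Ordered": ["2", "1"],
        "Price Each": ["11.95", "99.99"],
        "Purchase Address": [
            "917 1st St, Dallas, TX 75001",
            "682 Chestnut St, Boston, MA 02215",
        ],
    }
)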

def split_purchase_address(df_to_process: DataFrame) -> DataFrame:
    # split "Street, City, State Postal" into three new columns
    df_address_split = df_to_process["Purchase Address"].str.split(",", n=3, expand=True)
    df_address_split.columns = ["Street Name", "City", "State and Postal Code"]

    # then split the combined "State Postal" column into two more columns
    df_state_postal_split = (
        df_address_split["State and Postal Code"]
        .str.strip()
        .str.split(" ", n=2, expand=True)
    )
    df_state_postal_split.columns = ["State Code", "Postal Code"]

    return pd.concat([df_to_process, df_address_split, df_state_postal_split], axis=1)

Next, we can apply this to our existing table by using the native Pandas pipe method like so, where we call pipe on the DataFrame and pass the function name as an argument.

processed_df = df.pipe(split_purchase_address)
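Under the hood, pipe just calls the function with the DataFrame as its first argument, so the line above is equivalent to:

processed_df = split_purchase_address(df)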

Next, you’ll see that the data we have is still on a very granular level, with the Order ID as the primary key of the table. When we want to aggregate the data for further analysis, we can use the groupby method in Pandas to do so.

Some code you might see in Pandas to group data on a set of columns and then do an aggregate count on one of the dimensions (in this case we’ll use the Order ID) can look like this:

# groupby normally
grouped_df = (
    processed_df
    .groupby(
        ["Product", "Quantity Ordered", "Street Name", "City", "State Code", "Postal Code"]
    )
    ["Order ID"]
    .count()
    .reset_index()
    .sort_values("Order ID", ascending=False)
    .rename({"Order ID": "Count of Order IDs"}, axis=1)
)

The result is a new DataFrame with one row per unique combination of the grouped columns and a “Count of Order IDs” column, sorted in descending order.

In this simple example, grouping by just six columns is not too difficult and we can pass a list of these columns directly to the groupby method. However, this has a few drawbacks:

  • What if we were working with a larger dataset and wanted to group by 20 columns?
  • What if we had new requirements from end users come in and we needed to tweak the specific columns to group by?
  • What if the underlying table changes and the names or aliases of the columns changed?

We can address some of these issues in part by defining the columns in an enum class. For this case, we can define the groupby columns for our sales table in a new class, SalesGroupByColumns, as follows:

class SalesGroupByColumns(Enum):
    PRODUCT = "Product"
    QUANTITY_ORDERED = "Quantity Ordered"
    STREET_NAME = "Street Name"
    CITY = "City"
    STATE_CODE = "State Code"
    POSTAL_CODE = "Postal Code"

What we’re doing here is ultimately just defining the columns as constants inside a new Enum class (which we get from the earlier import, from enum import Enum).

Now that we have these new enum values defined, we can access individual members of the enum like this:

SalesGroupByColumns.PRODUCT        # <SalesGroupByColumns.PRODUCT: 'Product'>
SalesGroupByColumns.PRODUCT.value  # 'Product'

Referencing the enum name returns the enum member itself, and accessing value on the target member gives us its underlying string directly. Now, to get all members of the enum into a list we can pass to the groupby, we can use a list comprehension like this:

[column.value for column in SalesGroupByColumns]
# ['Product', 'Quantity Ordered', 'Street Name', 'City', 'State Code', 'Postal Code']

With the values collected in a list, we can assign the output to a variable and pass that variable to our groupby method instead of passing the raw list of strings directly:

# groupby adjusted
groupby_columns = [column.value for column in SalesGroupByColumns]

grouped_df = (
    processed_df
    .groupby(groupby_columns)
    ["Order ID"]
    .count()
    .reset_index()
    .sort_values("Order ID", ascending=False)
    .rename({"Order ID": "Count of Order IDs"}, axis=1)
)

grouped_df.head()

We arrive at the same table as before, but with code that’s a little cleaner to look at. The benefit for maintainability shows when you’re working on the processing pipeline over time.

For example, you might find that you want to group by a few new columns. Say you did a little more feature engineering and created house number and product category columns that you then want to add to the group by. You could update your enum class like this:

# what's the benefit? adding new columns!

class SalesGroupByColumns(Enum):
    PRODUCT = "Product"
    QUANTITY_ORDERED = "Quantity Ordered"
    STREET_NAME = "Street Name"
    CITY = "City"
    STATE_CODE = "State Code"
    POSTAL_CODE = "Postal Code"
    HOUSE_NUMBER = "House Number"
    PRODUCT_CATEGORY = "Product Category"

# then you run the same code as before and it would still work

Then, you wouldn’t need to change your existing processing code, because the list comprehension would automatically grab all the values in the SalesGroupByColumns class and apply that to your aggregation logic.

A good note here is that all of this only works if you know exactly what you’re defining in the enum class and use its members only as intended. If you make changes to the class while grabbing all of its columns to group by in a few different tables, make sure that’s actually what you intend to do.

Otherwise, you could instead define the set of enums you need for a specific table either in separate classes or, if it makes sense, in a separate list of columns (so you still avoid passing a raw list of strings to the groupby method).
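For example, a separate class for just one table’s groupby columns (the class name here is purely illustrative, not from the original pipeline) could look like this:

class StoreLocationsGroupByColumns(Enum):
    CITY = "City"
    STATE_CODE = "State Code"
    POSTAL_CODE = "Postal Code"

location_groupby_columns = [column.value for column in StoreLocationsGroupByColumns]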

Using enums for your data aggregation in Pandas

For another example, say we had a different case where we apply a few additional transformations to our data by changing some columns’ data types and creating a new total cost column. We can add to our previous pipeline like so:

def convert_numerical_column_types(df_to_process: DataFrame) -> DataFrame:
    # cast the string columns to their proper numerical types
    df_to_process["Quantity Ordered"] = df_to_process["Quantity Ordered"].astype(int)
    df_to_process["Price Each"] = df_to_process["Price Each"].astype(float)
    df_to_process["Order ID"] = df_to_process["Order ID"].astype(int)
    return df_to_process


def calculate_total_order_cost(df_to_process: DataFrame) -> DataFrame:
    # total cost of an order line is quantity times unit price
    df_to_process["Total Cost"] = df_to_process["Quantity Ordered"] * df_to_process["Price Each"]
    return df_to_process


processed_df = (
    df
    .pipe(split_purchase_address)
    .pipe(convert_numerical_column_types)
    .pipe(calculate_total_order_cost)
)

Now that our DataFrame is transformed on an Order ID level, let’s next perform another group by on a new set of columns, but this time aggregate on a few different measures:

# let's say we have a file now "SalesColumns.py"
# we can add to it

from enum import Enum

import numpy as np


class AddressColumns(Enum):
    STREET_NAME = "Street Name"
    CITY = "City"
    STATE_CODE = "State Code"
    POSTAL_CODE = "Postal Code"


class SalesMeasureColumns(Enum):
    TOTAL_COST = "Total Cost"
    QUANTITY_ORDERED = "Quantity Ordered"


# then separately we can do the groupby
groupby_columns = [column.value for column in AddressColumns]

grouped_df = (
    processed_df
    .groupby(groupby_columns)
    .agg(
        Total_Cost=(SalesMeasureColumns.TOTAL_COST.value, np.sum),
        Total_Quantity_Ordered=(SalesMeasureColumns.QUANTITY_ORDERED.value, np.sum),
    )
    .reset_index()
    .sort_values("Total_Cost", ascending=False)
)

There are a few key points to note here:

  • We’ve defined a new set of enum classes: AddressColumns and SalesMeasureColumns. Now for a different table where we want to group specifically on address fields, we can instead define the groupby_columns list to include those columns to later pass to the groupby method on the transformed DataFrame.
  • The SalesMeasureColumns class includes the measures we want to aggregate. Putting column names from the raw table in the class means that if other people also want to sum the cost and quantity ordered, they’ll call the proper columns.

We could additionally put this code that collects the list of columns and aggregates the table into a new function, and chain it onto our pipeline of piped functions from before, as shown below. The final code then becomes easier to read, and potentially to debug and log, over time.
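As a sketch of that idea (the function name aggregate_sales_by_address is just an example, not from the original article), the aggregation step could be wrapped and chained like the earlier steps:

def aggregate_sales_by_address(df_to_process: DataFrame) -> DataFrame:
    # collect the groupby columns from the enum, then aggregate the measures
    groupby_columns = [column.value for column in AddressColumns]
    return (
        df_to_process
        .groupby(groupby_columns)
        .agg(
            Total_Cost=(SalesMeasureColumns.TOTAL_COST.value, np.sum),
            Total_Quantity_Ordered=(SalesMeasureColumns.QUANTITY_ORDERED.value, np.sum),
        )
        .reset_index()
        .sort_values("Total_Cost", ascending=False)
    )

final_df = (
    df
    .pipe(split_purchase_address)
    .pipe(convert_numerical_column_types)
    .pipe(calculate_total_order_cost)
    .pipe(aggregate_sales_by_address)
)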

For the aggregation, it’s also possible that total cost and quantity ordered are defined differently for different tables, teams, and end users. Defining them in the SalesMeasureColumns enum means that, for the Sales table and its measures, all users can do the aggregation on these columns with the same definitions.


