Pandas pipelines

Method chaining is a great way to write pandas code, as it allows us to go from:

raw_data = pd.read_parquet(...)
data_with_types = set_dtypes(raw_data)
data_without_outliers = remove_outliers(data_with_types)

to

data = (
    pd.read_parquet(...)
    .pipe(set_dtypes)
    .pipe(remove_outliers)
)

But it does come at a cost, mostly in our ability to debug long pipelines. If there is a mistake somewhere along the way, you can only inspect the end result; the intermediate results are no longer available. One mitigation is to add decorators to your pipeline functions that log common attributes of the dataframe at each step.
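To make the idea concrete, here is a minimal sketch of what such a decorator could look like. It is not scikit-lego's implementation: log_shape and drop_missing are hypothetical names, and the sketch only assumes that each step returns an object with a .shape attribute, as a pandas dataframe does.

```python
import datetime as dt
import functools
import logging

logger = logging.getLogger(__name__)


def log_shape(func):
    """Hypothetical step logger: report the result's shape and the time taken."""

    @functools.wraps(func)  # keep the wrapped function's name and docstring
    def wrapper(df, *args, **kwargs):
        start = dt.datetime.now()
        result = func(df, *args, **kwargs)
        elapsed = dt.datetime.now() - start
        # Assumption: the step returns something dataframe-like with a 2-tuple .shape
        n_obs, n_col = result.shape
        logger.info(
            "[%s(df)] n_obs=%d n_col=%d time=%s",
            func.__name__, n_obs, n_col, elapsed,
        )
        return result

    return wrapper


@log_shape
def drop_missing(df):
    # Example pipeline step: takes a dataframe and returns a dataframe.
    return df.dropna()
```

With a dataframe df and a configured logger, df.pipe(drop_missing) would then emit one INFO record describing the result of that step, so every intermediate shape in a chain becomes visible without breaking the chain apart.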

Logging in method chaining

To use the logging capabilities, we first need to make sure a logger is configured. We do this by running logging.basicConfig(level=logging.DEBUG).

[1]:
from sklego.datasets import load_chicken
from sklego.pandas_utils import log_step
chickweight = load_chicken(give_pandas=True)
[2]:
import logging

logging.basicConfig(level=logging.DEBUG)

If we now add the log_step decorator to our pipeline function and execute it, we get some logging statements for free:

[3]:
@log_step
def set_dtypes(chickweight):
    return chickweight.assign(
        diet=lambda d: d['diet'].astype('category'),
        chick=lambda d: d['chick'].astype('category'),
    )
[4]:
chickweight.pipe(set_dtypes).head()
INFO:__main__:[set_dtypes(df)] n_obs=578 n_col=4 time=0:00:00.003235
[4]:
   weight  time  chick  diet
0      42     0      1     1
1      51     2      1     1
2      59     4      1     1
3      64     6      1     1
4      76     8      1     1

We can choose to log at different log levels. For example, if we have a remove_outliers function that calls separate removal functions for different types of outliers, we might generally be interested only in the total number of outliers removed. To get that, we set the log level of the specific implementations to logging.DEBUG:

[5]:
@log_step(level=logging.DEBUG)
def remove_dead_chickens(chickweight):
    # Chicks with fewer than 12 measurements are treated as dead.
    dead_chickens = chickweight.groupby('chick').size().loc[lambda s: s < 12]
    # dead_chickens is indexed by chick id and holds counts as values,
    # so match on its index rather than on the counts.
    return chickweight.loc[lambda d: ~d['chick'].isin(dead_chickens.index)]


@log_step
def remove_outliers(chickweight):
    return chickweight.pipe(remove_dead_chickens)
[6]:
chickweight.pipe(set_dtypes).pipe(remove_outliers).head()
INFO:__main__:[set_dtypes(df)] n_obs=578 n_col=4 time=0:00:00.002340
DEBUG:__main__:[remove_dead_chickens(df)] n_obs=540 n_col=4 time=0:00:00.005238
INFO:__main__:[remove_outliers(df)] n_obs=540 n_col=4 time=0:00:00.006010
[6]:
   weight  time  chick  diet
0      42     0      1     1
1      51     2      1     1
2      59     4      1     1
3      64     6      1     1
4      76     8      1     1

We can now easily switch between log levels to get either the full detail or the general overview:

[7]:
logging.getLogger(__name__).setLevel(logging.INFO)
chickweight.pipe(set_dtypes).pipe(remove_outliers).head()
INFO:__main__:[set_dtypes(df)] n_obs=578 n_col=4 time=0:00:00.002603
INFO:__main__:[remove_outliers(df)] n_obs=540 n_col=4 time=0:00:00.003335
[7]:
   weight  time  chick  diet
0      42     0      1     1
1      51     2      1     1
2      59     4      1     1
3      64     6      1     1
4      76     8      1     1
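The filtering itself is done by the standard logging module and does not depend on pandas. The sketch below reproduces the effect in isolation, assuming a hypothetical logger named pipeline_demo and a small hypothetical handler (ListHandler) that collects messages in a list so they can be inspected:

```python
import logging

captured = []


class ListHandler(logging.Handler):
    """Hypothetical handler that stores each message together with its level name."""

    def emit(self, record):
        captured.append(f"{record.levelname}:{record.getMessage()}")


demo_logger = logging.getLogger("pipeline_demo")  # hypothetical logger name
demo_logger.addHandler(ListHandler())
demo_logger.propagate = False  # keep the demo records away from the root logger


def run_steps():
    # Stand-ins for an inner step logged at DEBUG and an outer step logged at INFO.
    demo_logger.debug("inner step finished")
    demo_logger.info("outer step finished")


demo_logger.setLevel(logging.DEBUG)
run_steps()  # both records pass the logger's threshold

demo_logger.setLevel(logging.INFO)
run_steps()  # the DEBUG record is dropped, only the INFO record passes
```

After both runs, captured holds three messages: the DEBUG and INFO records from the first run and only the INFO record from the second. Because the level is set on the logger rather than inside the steps, the pipeline functions themselves do not need to change.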