# Kedro integration guide: Compare pipeline executions
You can compare metrics, parameters, dataset versions, and other metadata from Kedro pipelines.
This guide shows how to:
- Log data versions, parameters, and metrics for every Kedro pipeline execution.
- See the diff between Kedro pipeline executions in the Neptune app.
- Group Kedro pipeline executions by dataset versions and compare them.
See example in Neptune | Code examples
## Before you start
- Sign up at neptune.ai/register.
- Create a project for storing your metadata.
- Have the Kedro–Neptune plugin configured and initialized according to the Setup guide.
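If the plugin isn't set up yet, a minimal sketch of what the Setup guide covers (see that guide for configuring your Neptune API token and project name):

```bash
pip install kedro-neptune

# Run from the Kedro project root; initializes the plugin's
# Neptune configuration files in your project
kedro neptune init
```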
## Preparing the training runs
### Setting up the scripts
In this section, we'll set up the Kedro nodes and add Neptune logging to the code.
- To log the model training parameters to Neptune automatically, define them in the `conf/base/parameters.yml` file, as in the sketch below.
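The original example values aren't reproduced here; this sketch uses the parameter keys that the nodes shown later in this guide read, with illustrative values:

```yaml
# conf/base/parameters.yml (illustrative values)
train_fraction: 0.8
random_state: 3
target_column: species

rf_max_depth: 3
rf_n_estimators: 10
rf_max_features: 3

mlp_alpha: 0.001
mlp_max_iter: 100
```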
- To log the training datasets to Neptune automatically, define them in `conf/base/catalog.yml` or other catalog files. The `predictions@neptune` entry uses Kedro transcoding with the plugin's `kedro_neptune.NeptuneFileDataSet` type, so the predictions file is also logged to Neptune.

```yaml
# conf/base/catalog.yml
# Here you can define all your data sets by using simple YAML syntax.
#
# Documentation for this file format can be found in "The Data Catalog"
# Link: https://kedro.readthedocs.io/en/stable/data/data_catalog.html

example_iris_data:
  type: pandas.CSVDataSet
  filepath: data/01_raw/iris.csv

rf_model:
  type: kedro.extras.datasets.pickle.PickleDataSet
  filepath: data/06_models/rf_model.pkl

mlp_model:
  type: kedro.extras.datasets.pickle.PickleDataSet
  filepath: data/06_models/mlp_model.pkl

predictions:
  type: kedro.extras.datasets.json.JSONDataSet
  filepath: data/07_model_output/predictions.json

predictions@neptune:
  type: kedro_neptune.NeptuneFileDataSet
  filepath: data/07_model_output/predictions.json
```
- Create a model training node in the `src/KEDRO_PROJECT/nodes.py` file. Use the parameters you defined in the `conf/base/parameters.yml` file. The node should output a trained model.

```python
from sklearn.ensemble import RandomForestClassifier

...


def train_rf_model(
    train_x: pd.DataFrame, train_y: pd.DataFrame, parameters: Dict[str, Any]
):
    max_depth = parameters["rf_max_depth"]
    n_estimators = parameters["rf_n_estimators"]
    max_features = parameters["rf_max_features"]

    clf = RandomForestClassifier(
        max_depth=max_depth,
        n_estimators=n_estimators,
        max_features=max_features,
    )
    clf.fit(train_x, train_y)

    return clf
```
nodes.py""" This is a boilerplate pipeline generated using Kedro 0.18.4 """ from typing import Any, Dict, Tuple import matplotlib.pyplot as plt import neptune import numpy as np import pandas as pd from scikitplot.metrics import plot_precision_recall, plot_roc from sklearn.ensemble import RandomForestClassifier from sklearn.metrics import accuracy_score from sklearn.neural_network import MLPClassifier def split_data( data: pd.DataFrame, parameters: Dict[str, Any] ) -> Tuple[pd.DataFrame, pd.DataFrame, pd.Series, pd.Series]: """Splits data into features and target training and test sets. Args: data: Data containing features and target. parameters: Parameters defined in parameters.yml. Returns: Split data. """ data_train = data.sample( frac=parameters["train_fraction"], random_state=parameters["random_state"] ) data_test = data.drop(data_train.index) X_train = data_train.drop(columns=parameters["target_column"]) X_test = data_test.drop(columns=parameters["target_column"]) y_train = data_train[parameters["target_column"]] y_test = data_test[parameters["target_column"]] return X_train, X_test, y_train, y_test def train_rf_model( train_x: pd.DataFrame, train_y: pd.DataFrame, parameters: Dict[str, Any] ): max_depth = parameters["rf_max_depth"] n_estimators = parameters["rf_n_estimators"] max_features = parameters["rf_max_features"] clf = RandomForestClassifier( max_depth=max_depth, n_estimators=n_estimators, max_features=max_features, ) clf.fit(train_x, train_y) return clf def train_mlp_model( train_x: pd.DataFrame, train_y: pd.DataFrame, parameters: Dict[str, Any] ): """Node for training MLP model""" alpha = parameters["mlp_alpha"] max_iter = parameters["mlp_max_iter"] clf = MLPClassifier(alpha=alpha, max_iter=max_iter) clf.fit(train_x, train_y) return clf def get_predictions( rf_model: RandomForestClassifier, mlp_model: MLPClassifier, test_x: pd.DataFrame ) -> Dict[str, Any]: """Node for making predictions given a pre-trained model and a test set.""" predictions = {} for name, model in zip(["rf", "mlp"], [rf_model, mlp_model]): y_pred = model.predict_proba(test_x).tolist() predictions[name] = y_pred return predictions def evaluate_models( predictions: dict, test_y: pd.DataFrame, neptune_run: neptune.handler.Handler ): """Node for - evaluating Random Forest and MLP models - creating ROC and Precision-Recall Curves """ for name, y_pred_proba in predictions.items(): y_true = test_y.to_numpy() y_pred_proba = np.array(y_pred_proba) y_pred = np.argmax(y_pred_proba, axis=1) y_pred = np.where( y_pred == 0, "setosa", np.where(y_pred == 1, "versicolor", "virginica"), ) accuracy = accuracy_score(y_true, y_pred) neptune_run[f"nodes/evaluate_models/metrics/accuracy_{name}"] = accuracy
Note

In this example, we create a Kedro pipeline that trains and ensembles predictions from two models: Random Forest and MLPClassifier. For simplicity, we only show the Random Forest code snippets. See the full `nodes.py` listing at the end of this section for the MLPClassifier.
- Create a model prediction node in the `src/KEDRO_PROJECT/nodes.py` file. This node should output a dictionary with predictions for two models: Random Forest and MLPClassifier.

```python
def get_predictions(
    rf_model: RandomForestClassifier, mlp_model: MLPClassifier, test_x: pd.DataFrame
) -> Dict[str, Any]:
    """Node for making predictions given a pre-trained model and a test set."""
    predictions = {}
    for name, model in zip(["rf", "mlp"], [rf_model, mlp_model]):
        y_pred = model.predict_proba(test_x).tolist()
        predictions[name] = y_pred

    return predictions
```
nodes.py""" This is a boilerplate pipeline generated using Kedro 0.18.4 """ from typing import Any, Dict, Tuple import matplotlib.pyplot as plt import neptune import numpy as np import pandas as pd from scikitplot.metrics import plot_precision_recall, plot_roc from sklearn.ensemble import RandomForestClassifier from sklearn.metrics import accuracy_score from sklearn.neural_network import MLPClassifier def split_data( data: pd.DataFrame, parameters: Dict[str, Any] ) -> Tuple[pd.DataFrame, pd.DataFrame, pd.Series, pd.Series]: """Splits data into features and target training and test sets. Args: data: Data containing features and target. parameters: Parameters defined in parameters.yml. Returns: Split data. """ data_train = data.sample( frac=parameters["train_fraction"], random_state=parameters["random_state"] ) data_test = data.drop(data_train.index) X_train = data_train.drop(columns=parameters["target_column"]) X_test = data_test.drop(columns=parameters["target_column"]) y_train = data_train[parameters["target_column"]] y_test = data_test[parameters["target_column"]] return X_train, X_test, y_train, y_test def train_rf_model( train_x: pd.DataFrame, train_y: pd.DataFrame, parameters: Dict[str, Any] ): max_depth = parameters["rf_max_depth"] n_estimators = parameters["rf_n_estimators"] max_features = parameters["rf_max_features"] clf = RandomForestClassifier( max_depth=max_depth, n_estimators=n_estimators, max_features=max_features, ) clf.fit(train_x, train_y) return clf def train_mlp_model( train_x: pd.DataFrame, train_y: pd.DataFrame, parameters: Dict[str, Any] ): """Node for training MLP model""" alpha = parameters["mlp_alpha"] max_iter = parameters["mlp_max_iter"] clf = MLPClassifier(alpha=alpha, max_iter=max_iter) clf.fit(train_x, train_y) return clf def get_predictions( rf_model: RandomForestClassifier, mlp_model: MLPClassifier, test_x: pd.DataFrame ) -> Dict[str, Any]: """Node for making predictions given a pre-trained model and a test set.""" predictions = {} for name, model in zip(["rf", "mlp"], [rf_model, mlp_model]): y_pred = model.predict_proba(test_x).tolist() predictions[name] = y_pred return predictions def evaluate_models( predictions: dict, test_y: pd.DataFrame, neptune_run: neptune.handler.Handler ): """Node for - evaluating Random Forest and MLP models - creating ROC and Precision-Recall Curves """ for name, y_pred_proba in predictions.items(): y_true = test_y.to_numpy() y_pred_proba = np.array(y_pred_proba) y_pred = np.argmax(y_pred_proba, axis=1) y_pred = np.where( y_pred == 0, "setosa", np.where(y_pred == 1, "versicolor", "virginica"), ) accuracy = accuracy_score(y_true, y_pred) neptune_run[f"nodes/evaluate_models/metrics/accuracy_{name}"] = accuracy
- Import Neptune towards the top of the `nodes.py` file:

```python
"""
This is a boilerplate pipeline
generated using Kedro 0.18.4
"""
from typing import Any, Dict, Tuple

import matplotlib.pyplot as plt
import neptune
import numpy as np
import pandas as pd
from scikitplot.metrics import plot_precision_recall, plot_roc
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
from sklearn.neural_network import MLPClassifier


def split_data(
    ...
```
- Create a model evaluation node in the `src/KEDRO_PROJECT/nodes.py` file:

```python
def evaluate_models(
    predictions: dict, test_y: pd.DataFrame, neptune_run: neptune.handler.Handler
):
    """Node for
    - evaluating Random Forest and MLP models
    - creating ROC and Precision-Recall Curves
    """
    ...
```
Tip

You can treat `neptune_run` like a normal Neptune run and log metadata to it as you normally would, as in the sketch below. You must use the special string `neptune_run` as the run handler in Kedro pipelines.
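For instance, a minimal sketch of logging other metadata types through the handler inside a node. The field names, values, and the `fig` variable here are hypothetical, and this assumes the standard Neptune logging API:

```python
def evaluate_models(
    predictions: dict, test_y: pd.DataFrame, neptune_run: neptune.handler.Handler
):
    ...
    # Assign a single value to a custom field (hypothetical field name)
    neptune_run["nodes/evaluate_models/metrics/f1_rf"] = 0.95

    # Append a value to a series, e.g. per-fold or per-epoch metrics
    neptune_run["nodes/evaluate_models/metrics/fold_accuracy"].append(0.93)

    # Upload a matplotlib figure, assuming `fig` was created earlier in the node
    neptune_run["nodes/evaluate_models/charts/roc"].upload(fig)
```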
- Calculate and log the accuracy to the `nodes/evaluate_models/metrics/accuracy_{model_name}` namespace:

```python
from sklearn.metrics import accuracy_score

...


def evaluate_models(
    predictions: dict, test_y: pd.DataFrame, neptune_run: neptune.handler.Handler
):
    """Node for
    - evaluating Random Forest and MLP models
    - creating ROC and Precision-Recall Curves
    """
    for name, y_pred_proba in predictions.items():
        y_true = test_y.to_numpy()
        y_pred_proba = np.array(y_pred_proba)
        y_pred = np.argmax(y_pred_proba, axis=1)
        y_pred = np.where(
            y_pred == 0,
            "setosa",
            np.where(y_pred == 1, "versicolor", "virginica"),
        )

        accuracy = accuracy_score(y_true, y_pred)
        neptune_run[f"nodes/evaluate_models/metrics/accuracy_{name}"] = accuracy
```
nodes.py""" This is a boilerplate pipeline generated using Kedro 0.18.4 """ from typing import Any, Dict, Tuple import matplotlib.pyplot as plt import neptune import numpy as np import pandas as pd from scikitplot.metrics import plot_precision_recall, plot_roc from sklearn.ensemble import RandomForestClassifier from sklearn.metrics import accuracy_score from sklearn.neural_network import MLPClassifier def split_data( data: pd.DataFrame, parameters: Dict[str, Any] ) -> Tuple[pd.DataFrame, pd.DataFrame, pd.Series, pd.Series]: """Splits data into features and target training and test sets. Args: data: Data containing features and target. parameters: Parameters defined in parameters.yml. Returns: Split data. """ data_train = data.sample( frac=parameters["train_fraction"], random_state=parameters["random_state"] ) data_test = data.drop(data_train.index) X_train = data_train.drop(columns=parameters["target_column"]) X_test = data_test.drop(columns=parameters["target_column"]) y_train = data_train[parameters["target_column"]] y_test = data_test[parameters["target_column"]] return X_train, X_test, y_train, y_test def train_rf_model( train_x: pd.DataFrame, train_y: pd.DataFrame, parameters: Dict[str, Any] ): max_depth = parameters["rf_max_depth"] n_estimators = parameters["rf_n_estimators"] max_features = parameters["rf_max_features"] clf = RandomForestClassifier( max_depth=max_depth, n_estimators=n_estimators, max_features=max_features, ) clf.fit(train_x, train_y) return clf def train_mlp_model( train_x: pd.DataFrame, train_y: pd.DataFrame, parameters: Dict[str, Any] ): """Node for training MLP model""" alpha = parameters["mlp_alpha"] max_iter = parameters["mlp_max_iter"] clf = MLPClassifier(alpha=alpha, max_iter=max_iter) clf.fit(train_x, train_y) return clf def get_predictions( rf_model: RandomForestClassifier, mlp_model: MLPClassifier, test_x: pd.DataFrame ) -> Dict[str, Any]: """Node for making predictions given a pre-trained model and a test set.""" predictions = {} for name, model in zip(["rf", "mlp"], [rf_model, mlp_model]): y_pred = model.predict_proba(test_x).tolist() predictions[name] = y_pred return predictions def evaluate_models( predictions: dict, test_y: pd.DataFrame, neptune_run: neptune.handler.Handler ): """Node for - evaluating Random Forest and MLP models - creating ROC and Precision-Recall Curves """ for name, y_pred_proba in predictions.items(): y_true = test_y.to_numpy() y_pred_proba = np.array(y_pred_proba) y_pred = np.argmax(y_pred_proba, axis=1) y_pred = np.where( y_pred == 0, "setosa", np.where(y_pred == 1, "versicolor", "virginica"), ) accuracy = accuracy_score(y_true, y_pred) neptune_run[f"nodes/evaluate_models/metrics/accuracy_{name}"] = accuracy
The example `nodes.py` script is now ready.
## Adding the run to the pipeline
Next, we'll add the Neptune run handler to the Kedro pipeline.
- Go to a pipeline definition, such as `src/KEDRO_PROJECT/pipeline.py`.
- Add nodes to train the RF and MLP models, get predictions, and evaluate the models. Add `"neptune_run"` as an input to the `evaluate_models` node:
```python
from .nodes import (
    ...,
    evaluate_models,
    get_predictions,
    train_mlp_model,
    train_rf_model,
)

...

node(
    func=train_rf_model,
    inputs=["X_train", "y_train", "parameters"],
    outputs="rf_model",
    name="train_rf_model",
),
node(
    func=train_mlp_model,
    inputs=["X_train", "y_train", "parameters"],
    outputs="mlp_model",
    name="train_mlp_model",
),
node(
    func=get_predictions,
    inputs=["rf_model", "mlp_model", "X_test"],
    outputs="predictions",
    name="get_predictions",
),
node(
    func=evaluate_models,
    inputs=["predictions", "y_test", "neptune_run"],
    outputs=None,
    name="evaluate_models",
),
...
```
"""
This is a boilerplate pipeline
generated using Kedro 0.18.4
"""
from kedro.pipeline import Pipeline, node, pipeline
from .nodes import (
evaluate_models,
get_predictions,
split_data,
train_mlp_model,
train_rf_model,
)
def create_pipeline(**kwargs) -> Pipeline:
return pipeline(
[
node(
func=split_data,
inputs=["example_iris_data", "parameters"],
outputs=["X_train", "X_test", "y_train", "y_test"],
name="split",
),
node(
func=train_rf_model,
inputs=["X_train", "y_train", "parameters"],
outputs="rf_model",
name="train_rf_model",
),
node(
func=train_mlp_model,
inputs=["X_train", "y_train", "parameters"],
outputs="mlp_model",
name="train_mlp_model",
),
node(
func=get_predictions,
inputs=["rf_model", "mlp_model", "X_test"],
outputs="predictions",
name="get_predictions",
),
node(
func=evaluate_models,
inputs=["predictions", "y_test", "neptune_run"],
outputs=None,
name="evaluate_models",
),
]
)
## Executing the training runs
To have a few different runs to compare, run the training with different parameters and dataset versions.
- In the `conf/base/parameters.yml` file, change some model training hyperparameters, as in the sketch below.
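A hypothetical sketch of such a change; the values are illustrative, not the guide's originals:

```yaml
# conf/base/parameters.yml – tweaked hyperparameters (illustrative)
rf_max_depth: 4
rf_n_estimators: 25
mlp_max_iter: 200
```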
- In the `conf/base/catalog.yml` file, change the training dataset version (rename the existing file to ensure that the path is valid), as in the sketch below.
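A hypothetical sketch, assuming you renamed the data file to `iris_v2.csv`:

```yaml
# conf/base/catalog.yml – new dataset version (hypothetical filename)
example_iris_data:
  type: pandas.CSVDataSet
  filepath: data/01_raw/iris_v2.csv
```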
- On the command line, execute your Kedro pipeline.
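The standard Kedro CLI command runs the default pipeline:

```bash
kedro run
```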
Repeat the above steps a few more times.
## Comparing pipeline executions
We can now compare the Kedro pipeline executions in the Neptune app.
- Navigate to the Neptune project where you've logged your Kedro pipeline runs.
- In the runs table, click Add column.
- Add the following fields as columns in the table:
    - parameters from the `kedro/catalog/parameters/` namespace
    - metrics from the `kedro/nodes/evaluate_models/metrics/` namespace
    - the dataset path from the `kedro/catalog/datasets/example_iris_data/filepath` field
Tip

- To customize the name and color of a column, click the settings icon.
- To save your view for later, click Save view as new above the table.
- To select a few runs to compare, click the eye icon at the left edge of the table.
- Select Compare runs → Side-by-side.
- To explore the differences between the runs, check the Rows with diff only and Show cell changes boxes.
- To explore the runs in groups – for example, by the dataset version used for the run – select Group by, then type a field name to group the runs by.
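The app isn't the only way to compare executions. As a hedged sketch, assuming the current `neptune` client (1.x) and a placeholder project name, you can fetch the same fields into a pandas DataFrame and group them programmatically:

```python
import neptune

# Read-only connection; replace the project name with your own
project = neptune.init_project(project="workspace/project", mode="read-only")

# Fetch the same fields used as columns in the app
runs_df = project.fetch_runs_table(
    columns=[
        "sys/id",
        "kedro/catalog/datasets/example_iris_data/filepath",
        "kedro/nodes/evaluate_models/metrics/accuracy_rf",
        "kedro/nodes/evaluate_models/metrics/accuracy_mlp",
    ]
).to_pandas()

# Group executions by dataset version, mirroring "Group by" in the app
print(
    runs_df.groupby("kedro/catalog/datasets/example_iris_data/filepath")[
        [
            "kedro/nodes/evaluate_models/metrics/accuracy_rf",
            "kedro/nodes/evaluate_models/metrics/accuracy_mlp",
        ]
    ].mean()
)
```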
Related

- See example dashboard in Neptune
- Code examples