
Working with Kedro: Comparing pipeline executions#

Comparing and grouping pipeline runs in the Neptune runs table

You can compare metrics, parameters, dataset versions, and other metadata from Kedro pipelines.

This guide shows how to:

  • Log data versions, parameters, and metrics for every Kedro pipeline execution.
  • See the diff between Kedro pipeline executions in the Neptune app.
  • Group Kedro pipeline executions by dataset versions and compare them.

See example dashboard in Neptune  Code examples 

Before you start#
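
This guide assumes you have the following set up (a suggested baseline; details may differ for your environment):

  • A Neptune account and project, with your Neptune API token configured.
  • A Kedro project with the kedro-neptune plugin installed and initialized:

    pip install kedro-neptune
    kedro neptune init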

Preparing the training runs#

Setting up the scripts#

In this section, we'll set up the Kedro nodes and add Neptune logging to the code.

  1. To log the model training parameters to Neptune automatically, define them in the conf/base/parameters.yml file:

    parameters.yml
    # Parameters for the example pipeline. Feel free to delete these once you
    # remove the example pipeline from hooks.py and the example nodes in
    # src/pipelines/
    
    # Data split parameters
    example_test_data_ratio: 0.2
    
    # Random forest parameters
    rf_max_depth: 3
    rf_max_features: 3
    rf_n_estimators: 25
    
    # MLP parameters
    mlp_alpha: 0.02
    mlp_max_iter: 50
    
  2. To log the training datasets to Neptune automatically, define them in conf/base/catalog.yml or other catalog files:

    catalog.yml
    # Here you can define all your data sets by using simple YAML syntax.
    #
    # Documentation for this file format can be found in "The Data Catalog"
    # Link: https://kedro.readthedocs.io/en/stable/05_data/01_data_catalog.html
    
    example_iris_data:
        type: pandas.CSVDataSet
        filepath: data/01_raw/iris.csv
    
    rf_model:
        type: kedro.extras.datasets.pickle.PickleDataSet
        filepath: data/06_models/rf_model.pkl
    
    mlp_model:
        type: kedro.extras.datasets.pickle.PickleDataSet
        filepath: data/06_models/mlp_model.pkl
    
    predictions:
        type: kedro.extras.datasets.json.JSONDataSet
        filepath: data/07_model_output/predictions.json
    
    predictions@neptune:
        type: kedro_neptune.NeptuneFileDataSet
        filepath: data/07_model_output/predictions.json
    
  3. Create a model training node in the src/KEDRO_PROJECT/pipelines/data_science/nodes.py file.

    Use the parameters you defined in the conf/base/parameters.yml file. The node should output a trained model.

    def train_rf_model(
        train_x: pd.DataFrame, train_y: pd.DataFrame, parameters: Dict[str, Any]
    ):
    
        max_depth = parameters["rf_max_depth"]
        n_estimators = parameters["rf_n_estimators"]
        max_features = parameters["rf_max_features"]
    
        clf = RandomForestClassifier(
            max_depth=max_depth,
            n_estimators=n_estimators,
            max_features=max_features,
        )
        clf.fit(train_x, train_y.idxmax(axis=1))
    
        return clf
    

    """Example code for the nodes in the example pipeline. This code is meant
    just for illustrating basic Kedro features.
    
    Delete this when you start working on your own Kedro project.
    """
    # pylint: disable=invalid-name
    
    import logging
    from typing import Any, Dict
    
    import matplotlib.pyplot as plt
    import numpy as np
    import pandas as pd
    from scikitplot.metrics import plot_precision_recall_curve, plot_roc_curve
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.metrics import accuracy_score
    from sklearn.neural_network import MLPClassifier
    
    import neptune.new as neptune
    
    
    def train_rf_model(
        train_x: pd.DataFrame, train_y: pd.DataFrame, parameters: Dict[str, Any]
    ):
        """Node for training Random Forest model"""
        max_depth = parameters["rf_max_depth"]
        n_estimators = parameters["rf_n_estimators"]
        max_features = parameters["rf_max_features"]
    
        clf = RandomForestClassifier(
            max_depth=max_depth,
            n_estimators=n_estimators,
            max_features=max_features,
        )
        clf.fit(train_x, train_y.idxmax(axis=1))
    
        return clf
    
    
    def train_mlp_model(
        train_x: pd.DataFrame, train_y: pd.DataFrame, parameters: Dict[str, Any]
    ):
        """Node for training MLP model"""
        alpha = parameters["mlp_alpha"]
        max_iter = parameters["mlp_max_iter"]
    
        clf = MLPClassifier(alpha=alpha, max_iter=max_iter)
        clf.fit(train_x, train_y)
    
        return clf
    
    
    def get_predictions(
        rf_model: RandomForestClassifier,
        mlp_model: MLPClassifier,
        test_x: pd.DataFrame,
    ) -> Dict[str, Any]:
        """Node for making predictions given a pre-trained model and a test set."""
        predictions = {}
        for name, model in zip(["rf", "mlp"], [rf_model, mlp_model]):
            y_pred = model.predict_proba(test_x).tolist()
            predictions[name] = y_pred
    
        return predictions
    
    
    def evaluate_models(
        predictions: dict, test_y: pd.DataFrame, neptune_run: neptune.new.handler.Handler
    ):
        """Node for
        - evaluating Random Forest and MLP models
        - creating ROC and Precision-Recall Curves
        """
    
        for name, y_pred in predictions.items():
            y_true = test_y.to_numpy().argmax(axis=1)
            y_pred = np.array(y_pred)
    
            accuracy = accuracy_score(y_true, y_pred.argmax(axis=1).ravel())
            neptune_run[
                f"nodes/evaluate_models/metrics/accuracy_{name}"
            ] = accuracy
    
            fig, ax = plt.subplots()
            plot_roc_curve(
                test_y.idxmax(axis=1),
                y_pred,
                ax=ax,
                title=f"ROC curve {name}",
            )
            neptune_run["nodes/evaluate_models/plots/plot_roc_curve"].log(fig)
    
            fig, ax = plt.subplots()
            plot_precision_recall_curve(
                test_y.idxmax(axis=1), y_pred, ax=ax, title=f"PR curve {name}"
            )
            neptune_run[
                "nodes/evaluate_models/plots/plot_precision_recall_curve"
            ].log(fig)
    
    
    def ensemble_models(
        predictions: dict, test_y: pd.DataFrame, neptune_run: neptune.new.handler.Handler
    ) -> np.ndarray:
        """Node for averaging predictions of Random Forest and MLP models"""
        y_true = test_y.to_numpy().argmax(axis=1)
        y_pred_averaged = np.stack(predictions.values()).mean(axis=0)
    
        accuracy = accuracy_score(y_true, y_pred_averaged.argmax(axis=1).ravel())
        neptune_run[f"nodes/ensemble_models/metrics/accuracy_ensemble"] = accuracy
    

    Note

    In this example, we create a Kedro pipeline that trains and ensembles predictions from two models: Random Forest and MLPClassifier.

    For simplicity, we only show the Random Forest code snippets here. The full nodes.py, including the MLPClassifier code, is listed in step 7.

  4. Create a model prediction node in the src/KEDRO_PROJECT/pipelines/data_science/nodes.py file.

    This node should output a dictionary with predictions for two models: Random Forest and MLPClassifier.

    def get_predictions(
        rf_model: RandomForestClassifier,
        mlp_model: MLPClassifier,
        test_x: pd.DataFrame,
    ):
        """Node for making predictions given a pre-trained model and a test set."""
        predictions = {}
        for name, model in zip(["rf", "mlp"], [rf_model, mlp_model]):
            y_pred = model.predict_proba(test_x).tolist()
            predictions[name] = y_pred
    
        return predictions
    

    """Example code for the nodes in the example pipeline. This code is meant
    just for illustrating basic Kedro features.
    
    Delete this when you start working on your own Kedro project.
    """
    # pylint: disable=invalid-name
    
    import logging
    from typing import Any, Dict
    
    import matplotlib.pyplot as plt
    import numpy as np
    import pandas as pd
    from scikitplot.metrics import plot_precision_recall_curve, plot_roc_curve
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.metrics import accuracy_score
    from sklearn.neural_network import MLPClassifier
    
    import neptune.new as neptune
    
    
    def train_rf_model(
        train_x: pd.DataFrame, train_y: pd.DataFrame, parameters: Dict[str, Any]
    ):
        """Node for training Random Forest model"""
        max_depth = parameters["rf_max_depth"]
        n_estimators = parameters["rf_n_estimators"]
        max_features = parameters["rf_max_features"]
    
        clf = RandomForestClassifier(
            max_depth=max_depth,
            n_estimators=n_estimators,
            max_features=max_features,
        )
        clf.fit(train_x, train_y.idxmax(axis=1))
    
        return clf
    
    
    def train_mlp_model(
        train_x: pd.DataFrame, train_y: pd.DataFrame, parameters: Dict[str, Any]
    ):
        """Node for training MLP model"""
        alpha = parameters["mlp_alpha"]
        max_iter = parameters["mlp_max_iter"]
    
        clf = MLPClassifier(alpha=alpha, max_iter=max_iter)
        clf.fit(train_x, train_y)
    
        return clf
    
    
    def get_predictions(
        rf_model: RandomForestClassifier,
        mlp_model: MLPClassifier,
        test_x: pd.DataFrame,
    ) -> Dict[str, Any]:
        """Node for making predictions given a pre-trained model and a test set."""
        predictions = {}
        for name, model in zip(["rf", "mlp"], [rf_model, mlp_model]):
            y_pred = model.predict_proba(test_x).tolist()
            predictions[name] = y_pred
    
        return predictions
    
    
    def evaluate_models(
        predictions: dict, test_y: pd.DataFrame, neptune_run: neptune.new.handler.Handler
    ):
        """Node for
        - evaluating Random Forest and MLP models
        - creating ROC and Precision-Recall Curves
        """
    
        for name, y_pred in predictions.items():
            y_true = test_y.to_numpy().argmax(axis=1)
            y_pred = np.array(y_pred)
    
            accuracy = accuracy_score(y_true, y_pred.argmax(axis=1).ravel())
            neptune_run[
                f"nodes/evaluate_models/metrics/accuracy_{name}"
            ] = accuracy
    
            fig, ax = plt.subplots()
            plot_roc_curve(
                test_y.idxmax(axis=1), y_pred, ax=ax, title=f"ROC curve {name}"
            )
            neptune_run["nodes/evaluate_models/plots/plot_roc_curve"].log(fig)
    
            fig, ax = plt.subplots()
            plot_precision_recall_curve(
                test_y.idxmax(axis=1), y_pred, ax=ax, title=f"PR curve {name}"
            )
            neptune_run[
                "nodes/evaluate_models/plots/plot_precision_recall_curve"
            ].log(fig)
    
    
    def ensemble_models(
        predictions: dict, test_y: pd.DataFrame, neptune_run: neptune.new.handler.Handler
    ) -> np.ndarray:
        """Node for averaging predictions of Random Forest and MLP models"""
        y_true = test_y.to_numpy().argmax(axis=1)
        y_pred_averaged = np.stack(predictions.values()).mean(axis=0)
    
        accuracy = accuracy_score(y_true, y_pred_averaged.argmax(axis=1).ravel())
        neptune_run[f"nodes/ensemble_models/metrics/accuracy_ensemble"] = accuracy
    

  5. Towards the top of the nodes.py file, import Neptune:

    nodes.py
    import logging
    import matplotlib.pyplot as plt
    import neptune.new as neptune
    import numpy as np
    import pandas as pd
    from scikitplot.metrics import plot_roc_curve, plot_precision_recall_curve
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.metrics import accuracy_score
    from sklearn.neural_network import MLPClassifier
    from typing import Any, Dict
    
    
    def train_rf_model(
    ...
    

  6. Add the neptune_run argument of type neptune.handler.Handler (neptune being the neptune.new alias imported above) to the evaluate_models() function:

    nodes.py
    def evaluate_models(
        predictions: dict,
        test_y: pd.DataFrame,
        neptune_run: neptune.handler.Handler,
    ):
    ...
    

    Tip

    You can treat neptune_run like a normal Neptune run and log metadata to it as you normally would.

    You must use the special string neptune_run as the run handler in Kedro pipelines.
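
    For example, inside a node you could log any extra metadata through the handler, just as with a regular run (the field paths below are only illustrative):

    neptune_run["nodes/report/status"] = "training finished"  # assign a single value
    neptune_run["nodes/report/score"].log(0.97)  # append to a float series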

  7. Calculate and log the accuracy to the nodes/evaluate_models/metrics/accuracy_{model_name} namespace:

    nodes.py
    def evaluate_models(
        predictions: dict,
        test_y: pd.DataFrame,
        neptune_run: neptune.handler.Handler,
    ):
        ...
    
        for name, y_pred in predictions.items():
            y_true = test_y.to_numpy().argmax(axis=1)
            y_pred = np.array(y_pred)
    
            accuracy = accuracy_score(y_true, y_pred.argmax(axis=1).ravel())
            neptune_run[
                f"nodes/evaluate_models/metrics/accuracy_{name}"
            ] = accuracy
    

    The complete nodes.py file now looks like this:

    nodes.py
    """Example code for the nodes in the example pipeline. This code is meant
    just for illustrating basic Kedro features.
    
    Delete this when you start working on your own Kedro project.
    """
    # pylint: disable=invalid-name
    
    import logging
    from typing import Any, Dict
    
    import matplotlib.pyplot as plt
    import numpy as np
    import pandas as pd
    from scikitplot.metrics import plot_precision_recall_curve, plot_roc_curve
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.metrics import accuracy_score
    from sklearn.neural_network import MLPClassifier
    
    import neptune.new as neptune
    
    
    def train_rf_model(
        train_x: pd.DataFrame, train_y: pd.DataFrame, parameters: Dict[str, Any]
    ):
        """Node for training Random Forest model"""
        max_depth = parameters["rf_max_depth"]
        n_estimators = parameters["rf_n_estimators"]
        max_features = parameters["rf_max_features"]
    
        clf = RandomForestClassifier(
            max_depth=max_depth,
            n_estimators=n_estimators,
            max_features=max_features,
        )
        clf.fit(train_x, train_y.idxmax(axis=1))
    
        return clf
    
    
    def train_mlp_model(
        train_x: pd.DataFrame, train_y: pd.DataFrame, parameters: Dict[str, Any]
    ):
        """Node for training MLP model"""
        alpha = parameters["mlp_alpha"]
        max_iter = parameters["mlp_max_iter"]
    
        clf = MLPClassifier(alpha=alpha, max_iter=max_iter)
        clf.fit(train_x, train_y)
    
        return clf
    
    
    def get_predictions(
        rf_model: RandomForestClassifier,
        mlp_model: MLPClassifier,
        test_x: pd.DataFrame,
    ) -> Dict[str, Any]:
        """Node for making predictions given a pre-trained model and a test set."""
        predictions = {}
        for name, model in zip(["rf", "mlp"], [rf_model, mlp_model]):
            y_pred = model.predict_proba(test_x).tolist()
            predictions[name] = y_pred
    
        return predictions
    
    
    def evaluate_models(
        predictions: dict,
        test_y: pd.DataFrame,
        neptune_run: neptune.handler.Handler,
    ):
        """Node for
        - evaluating Random Forest and MLP models
        - creating ROC and Precision-Recall Curves
        """
    
        for name, y_pred in predictions.items():
            y_true = test_y.to_numpy().argmax(axis=1)
            y_pred = np.array(y_pred)
    
            accuracy = accuracy_score(y_true, y_pred.argmax(axis=1).ravel())
            neptune_run[
                f"nodes/evaluate_models/metrics/accuracy_{name}"
            ] = accuracy
    
            fig, ax = plt.subplots()
            plot_roc_curve(
                test_y.idxmax(axis=1), y_pred, ax=ax, title=f"ROC curve {name}"
            )
            neptune_run["nodes/evaluate_models/plots/plot_roc_curve"].log(fig)
    
            fig, ax = plt.subplots()
            plot_precision_recall_curve(
                test_y.idxmax(axis=1), y_pred, ax=ax, title=f"PR curve {name}"
            )
            neptune_run[
                "nodes/evaluate_models/plots/plot_precision_recall_curve"
            ].log(fig)
    
    
    def ensemble_models(
        predictions: dict, test_y: pd.DataFrame, neptune_run: neptune.handler.Handler
    ):
        """Node for averaging predictions of Random Forest and MLP models"""
        y_true = test_y.to_numpy().argmax(axis=1)
        y_pred_averaged = np.stack(list(predictions.values())).mean(axis=0)
    
        accuracy = accuracy_score(y_true, y_pred_averaged.argmax(axis=1).ravel())
        neptune_run["nodes/ensemble_models/metrics/accuracy_ensemble"] = accuracy
    

    The example nodes.py script is now ready.

Adding the run to the pipeline#

Next, we'll add the Neptune run handler to the Kedro pipeline.

  1. Go to a pipeline definition, such as src/KEDRO_PROJECT/pipelines/data_science/pipelines.py.
  2. Add the neptune_run handler as an input to the evaluate_models node.
pipelines.py
node(
    evaluate_models,
    dict(
        predictions="predictions",
        test_y="example_test_y",
        neptune_run="neptune_run",
    ),
    None,
    name="evaluate_models",
),
pipelines.py
"""Example code for the pipeline in the example project. This code is meant
just for illustrating basic Kedro features.

Delete this when you start working on your own Kedro project.
"""

from kedro.pipeline import Pipeline, node

from .nodes import (
    ensemble_models,
    evaluate_models,
    get_predictions,
    train_mlp_model,
    train_rf_model,
)


def create_pipeline(**kwargs):
    return Pipeline(
        [
            node(
                train_rf_model,
                ["example_train_x", "example_train_y", "parameters"],
                "rf_model",
                name="train_rf_model",
            ),
            node(
                train_mlp_model,
                ["example_train_x", "example_train_y", "parameters"],
                "mlp_model",
                name="train_mlp_model",
            ),
            node(
                get_predictions,
                dict(
                    rf_model="rf_model",
                    mlp_model="mlp_model",
                    test_x="example_test_x",
                ),
                "predictions",
                name="get_predictions",
            ),
            node(
                evaluate_models,
                dict(
                    predictions="predictions",
                    test_y="example_test_y",
                    neptune_run="neptune_run",
                ),
                None,
                name="evaluate_models",
            ),
            node(
                ensemble_models,
                dict(
                    predictions="predictions",
                    test_y="example_test_y",
                    neptune_run="neptune_run",
                ),
                None,
                name="ensemble_models",
            ),
        ]
    )

Executing the training runs#

To have a few different runs to compare, run the training with different parameters and dataset versions.

  1. In the conf/base/parameters.yml file, change some model training hyperparameters:

    # Random forest parameters
    rf_max_depth: 2
    rf_max_features: 4
    rf_n_estimators: 25
    
    # MLP parameters
    mlp_alpha: 0.01
    mlp_max_iter: 50
    
  2. In the conf/base/catalog.yml file, change the training dataset version:

    example_iris_data:
        type: pandas.CSVDataSet
        # filepath: data/01_raw/iris.csv
        filepath: data/01_raw/iris_v2.csv
    
  3. On the command line, execute your Kedro pipeline:

    kedro run
    

Repeat the above steps a few more times.
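
If you'd rather not edit the YAML files between runs, Kedro can also override parameters from the command line. For example (the exact --params syntax depends on your Kedro version):

    kedro run --params rf_max_depth:5,rf_n_estimators:50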

Comparing pipeline executions#

We can now compare the Kedro pipeline executions in the Neptune app.

  1. In the Neptune project where you've logged your Kedro pipeline runs, click the Runs tab.
  2. In the runs table, click Add column.
  3. Add the following fields as columns in the table:

    • parameters from the kedro/catalog/parameters/ namespace
    • metrics from the kedro/nodes/evaluate_models/metrics/ namespace
    • dataset versions from the kedro/catalog/datasets/example_iris_data/ namespace

    Tip

    • To customize the name and color of a column, click the settings icon.
    • To save your view for later, click Save view as new above the table.

    See table view in Neptune 

  4. To select a few runs to compare, at the left edge of the table, click the eye icon.

  5. In the left pane, select Compare runs > Side-by-side.
  6. To explore the differences between the runs, check the Rows with diff only and Show cell changes boxes.

    See comparison view in Neptune 

  7. To explore the runs in groups – for example, by dataset version used for the run – return to the runs table and click Group by.

    Type a field name to group the runs by.

    See grouped runs in Neptune 
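
You can also fetch the logged metadata programmatically and compare runs in a script. Below is a minimal sketch using the Neptune API (replace workspace/project with your own project name):

    import neptune.new as neptune
    
    # Fetch the project's runs table as a pandas DataFrame
    project = neptune.get_project(name="workspace/project")
    runs_df = project.fetch_runs_table().to_pandas()
    
    # Column names are flattened field paths, such as the metrics
    # logged by the evaluate_models node
    metric_cols = [
        col
        for col in runs_df.columns
        if col.startswith("kedro/nodes/evaluate_models/metrics")
    ]
    print(runs_df[metric_cols])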

See example dashboard in Neptune  Code examples