
Kedro integration guide: Compare pipeline executions

Comparing and grouping pipeline runs in the Neptune runs table

You can compare metrics, parameters, dataset versions, and other metadata from Kedro pipelines.

This guide shows how to:

  • Log data versions, parameters, and metrics for every Kedro pipeline execution.
  • See the diff between Kedro pipeline executions in the Neptune app.
  • Group Kedro pipeline executions by dataset versions and compare them.


Before you start
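
This guide assumes that you have a Kedro project with the kedro-neptune plugin installed and connected to your Neptune project. If you're starting from scratch, a typical setup looks like the sketch below; kedro neptune init prompts for your Neptune API token and project name:

    pip install kedro kedro-neptune neptune
    kedro neptune init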

Preparing the training runs

Setting up the scripts

In this section, we'll set up the Kedro nodes and add Neptune logging to the code.

  1. To log the model training parameters to Neptune automatically, define them in the conf/base/parameters.yml file:

    parameters.yml (snippet)
    # Random forest parameters
    rf_max_depth: 3
    rf_max_features: 3
    rf_n_estimators: 25
    
    parameters.yml (full file)
    train_fraction: 0.8
    random_state: 3
    target_column: species
    
    # Random forest parameters
    rf_max_depth: 3
    rf_max_features: 3
    rf_n_estimators: 25
    
    # MLP parameters
    mlp_alpha: 0.01
    mlp_max_iter: 50
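
    The kedro-neptune plugin picks these up automatically and logs them under the kedro/catalog/parameters namespace of the run, which is what you'll use for comparison later in this guide.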
    
  2. To log the training datasets to Neptune automatically, define them in conf/base/catalog.yml or other catalog files:

    catalog.yml (snippet)
    example_iris_data:
        type: pandas.CSVDataSet
        filepath: data/01_raw/iris.csv
    
    catalog.yml (full file)
    # Here you can define all your data sets by using simple YAML syntax.
    #
    # Documentation for this file format can be found in "The Data Catalog"
    # Link: https://kedro.readthedocs.io/en/stable/data/data_catalog.html
    #
    # We support interacting with a variety of data stores including local file systems, cloud, network and HDFS
    #
    # An example data set definition can look as follows:
    #
    #bikes:
    #  type: pandas.CSVDataSet
    #  filepath: "data/01_raw/bikes.csv"
    #
    #weather:
    #  type: spark.SparkDataSet
    #  filepath: s3a://your_bucket/data/01_raw/weather*
    #  file_format: csv
    #  credentials: dev_s3
    #  load_args:
    #    header: True
    #    inferSchema: True
    #  save_args:
    #    sep: '|'
    #    header: True
    #
    #scooters:
    #  type: pandas.SQLTableDataSet
    #  credentials: scooters_credentials
    #  table_name: scooters
    #  load_args:
    #    index_col: ['name']
    #    columns: ['name', 'gear']
    #  save_args:
    #    if_exists: 'replace'
    #    # if_exists: 'fail'
    #    # if_exists: 'append'
    #
    # The Data Catalog supports being able to reference the same file using two different DataSet implementations
    # (transcoding), templating and a way to reuse arguments that are frequently repeated. See more here:
    # https://kedro.readthedocs.io/en/stable/data/data_catalog.html
    #
    # This is a data set used by the "Hello World" example pipeline provided with the project
    # template. Please feel free to remove it once you remove the example pipeline.
    
    example_iris_data:
        type: pandas.CSVDataSet
        filepath: data/01_raw/iris.csv
    
    rf_model:
        type: kedro.extras.datasets.pickle.PickleDataSet
        filepath: data/06_models/rf_model.pkl
    
    mlp_model:
        type: kedro.extras.datasets.pickle.PickleDataSet
        filepath: data/06_models/mlp_model.pkl
    
    predictions:
        type: kedro.extras.datasets.json.JSONDataSet
        filepath: data/07_model_output/predictions.json
    
    predictions@neptune:
        type: kedro_neptune.NeptuneFileDataSet
        filepath: data/07_model_output/predictions.json
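
    Note

    The predictions@neptune entry uses the kedro_neptune.NeptuneFileDataSet type. It points at the same file as the regular predictions entry, so on top of saving the JSON locally, the plugin uploads a snapshot of it to Neptune, where you can preview it in the app.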
    
  3. Create a model training node in the src/KEDRO_PROJECT/nodes.py file.

    Use the parameters you defined in the conf/base/parameters.yml file. The node should output a trained model.

    nodes.py (snippet)
    from sklearn.ensemble import RandomForestClassifier
    ...
    
    def train_rf_model(
        train_x: pd.DataFrame, train_y: pd.DataFrame, parameters: Dict[str, Any]
    ):
        """Node for training a Random Forest model."""
        max_depth = parameters["rf_max_depth"]
        n_estimators = parameters["rf_n_estimators"]
        max_features = parameters["rf_max_features"]
    
        clf = RandomForestClassifier(
            max_depth=max_depth,
            n_estimators=n_estimators,
            max_features=max_features,
        )
        clf.fit(train_x, train_y)
    
        return clf
    

    nodes.py (full file)
    """
    This is a boilerplate pipeline
    generated using Kedro 0.18.4
    """
    
    from typing import Any, Dict, Tuple
    
    import matplotlib.pyplot as plt
    import neptune
    import numpy as np
    import pandas as pd
    from scikitplot.metrics import plot_precision_recall, plot_roc
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.metrics import accuracy_score
    from sklearn.neural_network import MLPClassifier
    
    
    def split_data(
        data: pd.DataFrame, parameters: Dict[str, Any]
    ) -> Tuple[pd.DataFrame, pd.DataFrame, pd.Series, pd.Series]:
        """Splits data into features and target training and test sets.
    
        Args:
            data: Data containing features and target.
            parameters: Parameters defined in parameters.yml.
        Returns:
            Split data.
        """
    
        data_train = data.sample(
            frac=parameters["train_fraction"], random_state=parameters["random_state"]
        )
        data_test = data.drop(data_train.index)
    
        X_train = data_train.drop(columns=parameters["target_column"])
        X_test = data_test.drop(columns=parameters["target_column"])
        y_train = data_train[parameters["target_column"]]
        y_test = data_test[parameters["target_column"]]
    
        return X_train, X_test, y_train, y_test
    
    
    def train_rf_model(
        train_x: pd.DataFrame, train_y: pd.DataFrame, parameters: Dict[str, Any]
    ):
        """Node for training a Random Forest model."""
        max_depth = parameters["rf_max_depth"]
        n_estimators = parameters["rf_n_estimators"]
        max_features = parameters["rf_max_features"]
    
        clf = RandomForestClassifier(
            max_depth=max_depth,
            n_estimators=n_estimators,
            max_features=max_features,
        )
        clf.fit(train_x, train_y)
    
        return clf
    
    
    def train_mlp_model(
        train_x: pd.DataFrame, train_y: pd.DataFrame, parameters: Dict[str, Any]
    ):
        """Node for training MLP model"""
        alpha = parameters["mlp_alpha"]
        max_iter = parameters["mlp_max_iter"]
    
        clf = MLPClassifier(alpha=alpha, max_iter=max_iter)
        clf.fit(train_x, train_y)
    
        return clf
    
    
    def get_predictions(
        rf_model: RandomForestClassifier,
        mlp_model: MLPClassifier,
        test_x: pd.DataFrame
    ) -> Dict[str, Any]:
        """Node for making predictions given a pre-trained model and a test set."""
        predictions = {}
        for name, model in zip(["rf", "mlp"], [rf_model, mlp_model]):
            y_pred = model.predict_proba(test_x).tolist()
            predictions[name] = y_pred
    
        return predictions
    
    
    def evaluate_models(
        predictions: dict, test_y: pd.DataFrame, neptune_run: neptune.handler.Handler
    ):
        """Node for
        - evaluating Random Forest and MLP models
        - creating ROC and Precision-Recall Curves
        """
    
        for name, y_pred_proba in predictions.items():
    
            y_true = test_y.to_numpy()
            y_pred_proba = np.array(y_pred_proba)
            y_pred = np.argmax(y_pred_proba, axis=1)
            y_pred = np.where(
                y_pred == 0, "setosa",
                np.where(y_pred == 1, "versicolor", "virginica"),
            )
    
            accuracy = accuracy_score(y_true, y_pred)
            neptune_run[f"nodes/evaluate_models/metrics/accuracy_{name}"] = accuracy
    

    Note

    In this example, we create a Kedro pipeline that trains two models – Random Forest and MLPClassifier – and collects and evaluates their predictions.

    For simplicity, we only show the Random Forest code snippets. See the full nodes.py for the MLPClassifier.

  4. Create a model prediction node in the src/KEDRO_PROJECT/nodes.py file.

    This node should output a dictionary with predictions for two models: Random Forest and MLPClassifier.

    nodes.py
    def get_predictions(
        rf_model: RandomForestClassifier,
        mlp_model: MLPClassifier,
        test_x: pd.DataFrame
    ) -> Dict[str, Any]:
        """Node for making predictions given a pre-trained model and a test set."""
        predictions = {}
        for name, model in zip(["rf", "mlp"], [rf_model, mlp_model]):
            y_pred = model.predict_proba(test_x).tolist()
            predictions[name] = y_pred
    
        return predictions
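
    Because the catalog defines predictions as a JSONDataSet, this dictionary is saved to data/07_model_output/predictions.json, and the matching predictions@neptune entry uploads a snapshot of that file to the run.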
    


  5. Import Neptune towards the top of the nodes.py file:

    nodes.py
    """
    This is a boilerplate pipeline
    generated using Kedro 0.18.4
    """
    
    from typing import Any, Dict, Tuple
    
    import matplotlib.pyplot as plt
    import neptune
    import numpy as np
    import pandas as pd
    from scikitplot.metrics import plot_precision_recall, plot_roc
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.metrics import accuracy_score
    from sklearn.neural_network import MLPClassifier
    
    
    def split_data(
    ...
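
    Among other things, the neptune import provides the neptune.handler.Handler type used to annotate the run handler in the evaluation node below.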
    
  6. Create a model evaluation node in the src/KEDRO_PROJECT/nodes.py file.

    nodes.py
    def evaluate_models(
        predictions: dict,
        test_y: pd.DataFrame,
        neptune_run: neptune.handler.Handler):
        """Node for
        - evaluating Random Forest and MLP models
        - creating ROC and Precision-Recall Curves
        """
        ...
    

    Tip

    You can treat neptune_run like a normal Neptune run and log metadata to it as you normally would.

    You must use the special string neptune_run as the run handler in Kedro pipelines.
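
    For example, inside the node you can attach any extra metadata through the handler. A minimal sketch – the field paths and values below are only illustrative:

        def evaluate_models(
            predictions: dict, test_y: pd.DataFrame, neptune_run: neptune.handler.Handler
        ):
            # The handler accepts any field path, exactly like a regular run object
            neptune_run["nodes/evaluate_models/test_set_size"] = len(test_y)
            neptune_run["nodes/evaluate_models/notes"] = "baseline comparison"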

  7. Calculate and log the accuracy to the nodes/evaluate_models/metrics/accuracy_{model_name} namespace:

    nodes.py
    from sklearn.metrics import accuracy_score
    ...
    
    def evaluate_models(
        predictions: dict,
        test_y: pd.DataFrame,
        neptune_run: neptune.handler.Handler):
        """Node for
        - evaluating Random Forest and MLP models
        - creating ROC and Precision-Recall Curves
        """
    
        for name, y_pred_proba in predictions.items():
    
            y_true = test_y.to_numpy()
            y_pred_proba = np.array(y_pred_proba)
            y_pred = np.argmax(y_pred_proba, axis=1)
            y_pred = np.where(
                y_pred == 0,
                "setosa",
                np.where(y_pred == 1, "versicolor", "virginica"),
            )
    
            accuracy = accuracy_score(y_true, y_pred)
            neptune_run[f"nodes/evaluate_models/metrics/accuracy_{name}"] = accuracy
    


    The example nodes.py script is now ready.
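
    Optionally, you can extend evaluate_models to also log the ROC and precision-recall curves – the full file already imports matplotlib and the scikitplot plotting helpers. A sketch assuming the neptune client's File.as_image helper; the log_curves function name is ours:

        import matplotlib.pyplot as plt
        from neptune.types import File
        from scikitplot.metrics import plot_precision_recall, plot_roc

        def log_curves(neptune_run, name, y_true, y_pred_proba):
            """Upload ROC and precision-recall curves for one model to the run."""
            fig, ax = plt.subplots()
            plot_roc(y_true, y_pred_proba, ax=ax)
            neptune_run[f"nodes/evaluate_models/plots/roc_{name}"].upload(File.as_image(fig))

            fig, ax = plt.subplots()
            plot_precision_recall(y_true, y_pred_proba, ax=ax)
            neptune_run[f"nodes/evaluate_models/plots/pr_{name}"].upload(File.as_image(fig))
            plt.close("all")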

Adding the run to the pipeline

Next, we'll add the Neptune run handler to the Kedro pipeline.

  1. Go to a pipeline definition, such as src/KEDRO_PROJECT/pipeline.py.
  2. Add nodes to train the RF and MLP models, get predictions, and evaluate the models. Add "neptune_run" as an input to the evaluate_models node.
pipeline.py (snippet)
from .nodes import (
    ...,
    evaluate_models,
    get_predictions,
    train_mlp_model,
    train_rf_model,
)

...
    node(
        func=train_rf_model,
        inputs=["X_train", "y_train", "parameters"],
        outputs="rf_model",
        name="train_rf_model",
    ),
    node(
        func=train_mlp_model,
        inputs=["X_train", "y_train", "parameters"],
        outputs="mlp_model",
        name="train_mlp_model",
    ),
    node(
        func=get_predictions,
        inputs=["rf_model", "mlp_model", "X_test"],
        outputs="predictions",
        name="get_predictions",
    ),
    node(
        func=evaluate_models,
        inputs=["predictions", "y_test", "neptune_run"],
        outputs=None,
        name="evaluate_models",
    ),
...
pipeline.py (full file)
"""
This is a boilerplate pipeline
generated using Kedro 0.18.4
"""

from kedro.pipeline import Pipeline, node, pipeline

from .nodes import (
    evaluate_models,
    get_predictions,
    split_data,
    train_mlp_model,
    train_rf_model,
)


def create_pipeline(**kwargs) -> Pipeline:
    return pipeline(
        [
            node(
                func=split_data,
                inputs=["example_iris_data", "parameters"],
                outputs=["X_train", "X_test", "y_train", "y_test"],
                name="split",
            ),
            node(
                func=train_rf_model,
                inputs=["X_train", "y_train", "parameters"],
                outputs="rf_model",
                name="train_rf_model",
            ),
            node(
                func=train_mlp_model,
                inputs=["X_train", "y_train", "parameters"],
                outputs="mlp_model",
                name="train_mlp_model",
            ),
            node(
                func=get_predictions,
                inputs=["rf_model", "mlp_model", "X_test"],
                outputs="predictions",
                name="get_predictions",
            ),
            node(
                func=evaluate_models,
                inputs=["predictions", "y_test", "neptune_run"],
                outputs=None,
                name="evaluate_models",
            ),
        ]
    )
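
Note

You don't define neptune_run in the data catalog yourself. The kedro-neptune plugin injects it at runtime, which is why evaluate_models can declare it as a regular input.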

Executing the training runs

To have a few different runs to compare, run the training with different parameters and dataset versions.

  1. In the conf/base/parameters.yml file, change some model training hyperparameters:

    # Random forest parameters
    rf_max_depth: 2
    rf_max_features: 4
    rf_n_estimators: 25
    
    # MLP parameters
    mlp_alpha: 0.01
    mlp_max_iter: 50
    
  2. In the conf/base/catalog.yml file, point the training dataset at a new version (copy or rename the existing file so that the new path actually exists):

    example_iris_data:
        type: pandas.CSVDataSet
        # filepath: data/01_raw/iris.csv
        filepath: data/01_raw/iris_v2.csv
    
  3. On the command line, execute your Kedro pipeline:

    kedro run
    

Repeat the above steps a few more times.
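
To generate variations faster, you can also override parameters on the command line instead of editing parameters.yml each time. A hedged example – Kedro 0.18 expects key:value pairs (newer releases use key=value):

    kedro run --params "rf_max_depth:4,rf_n_estimators:50"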

Comparing pipeline executions

We can now compare the Kedro pipeline executions in the Neptune app.

  1. Navigate to the Neptune project where you've logged your Kedro pipeline runs.
  2. In the runs table, click Add column.
  3. Add the following fields as columns in the table:

    • parameters from the kedro/catalog/parameters/ namespace
    • metrics from the kedro/nodes/evaluate_models/metrics/ namespace
    • dataset path from the kedro/catalog/datasets/example_iris_data/filepath field

    Tip

    • To customize the name and color of a column, click the settings icon.
    • To save your view for later, click Save view as new above the table.


  4. To select a few runs to compare, at the left edge of the table, click the eye icon.

  5. Select Compare runs > Side-by-side.
  6. To explore the differences between the runs, check the Rows with diff only and Show cell changes boxes.


  7. To explore the runs in groups – for example, by dataset version used for the run – select Group by.

    Type a field name to group the runs by.

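
If you prefer to work with these results programmatically, you can also fetch the runs table with the Neptune client. A minimal sketch, assuming neptune 1.x – the project name and column are placeholders:

    import neptune

    # Read-only connection to the project that stores the Kedro runs
    project = neptune.init_project(project="workspace/kedro-demo", mode="read-only")

    # Fetch selected run-table columns as a pandas DataFrame
    df = project.fetch_runs_table(
        columns=["kedro/nodes/evaluate_models/metrics/accuracy_rf"]
    ).to_pandas()
    print(df.head())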
