Skip to content

How to use callbacks when sync is interrupted#

If the metadata synchronization lags too much or does not progress at all, you can enable a callback that stops the connection to the Neptune servers automatically. The callback mechanism is available in the asynchronous (default) connection mode.

There are two scenarios which may trigger the callback:

  • Nothing has been synchronized with the server for a certain duration ("no progress").
  • Operations are being synchronized, but there is a delay between queueing an operation and synchronizing it with the servers ("lag").

The callback is disabled by default. You can enable it for both or just one of the above scenarios.

Neptune provides a default callback, which you can activate with an environment variable, but you can also define your own.

Enabling the default callback#

To enable the default callback for either scenario, set the corresponding environment variable to TRUE:

Synchronization is lagging:

export NEPTUNE_ENABLE_DEFAULT_ASYNC_LAG_CALLBACK='TRUE'
export NEPTUNE_ENABLE_DEFAULT_ASYNC_LAG_CALLBACK='TRUE'
setx NEPTUNE_ENABLE_DEFAULT_ASYNC_LAG_CALLBACK 'TRUE'
%env NEPTUNE_ENABLE_DEFAULT_ASYNC_LAG_CALLBACK='TRUE'

Synchronization is not progressing at all:

export NEPTUNE_ENABLE_DEFAULT_ASYNC_NO_PROGRESS_CALLBACK='TRUE'
export NEPTUNE_ENABLE_DEFAULT_ASYNC_NO_PROGRESS_CALLBACK='TRUE'
setx NEPTUNE_ENABLE_DEFAULT_ASYNC_NO_PROGRESS_CALLBACK 'TRUE'
%env NEPTUNE_ENABLE_DEFAULT_ASYNC_NO_PROGRESS_CALLBACK='TRUE'

The callback is triggered if the threshold for the lag or lack of progress is exceeded. You can override the default thresholds with the async_lag_threshold and async_no_progress_threshold initialization parameters, respectively.

For details about the default callback function, see utils.stop_synchronization_callback().

Passing init argument instead of using environment variable

You can also enable the default callback by passing this function to the argument of either async_lag_callback or async_no_progress_callback when initializing a run or other object.

Enable default callback in case of no sync progress
import neptune
from neptune.utils import stop_synchronization_callback

run = neptune.init_run(async_no_progress_callback = stop_synchronization_callback)
Enable default callback in case of too much sync lag
import neptune
from neptune.utils import stop_synchronization_callback

run = neptune.init_run(async_lag_callback = stop_synchronization_callback)
Enable default callback in both scenarios
import neptune
from neptune.utils import stop_synchronization_callback

run = neptune.init_run(
    async_no_progress_callback = stop_synchronization_callback,
    async_lag_callback = stop_synchronization_callback,
)

Defining a custom callback#

You can also define your own callback function and pass it to any init function.

The callback function should take a Neptune object as its argument and can use the stop() method to stop it, but you can decide what the function should do.

Parameters

Name      Type Default Description
neptune_object Neptune object (Run, Project) - Neptune object to be stopped by the callback.

Examples

Define custom callback
import neptune

def my_custom_callback(run):
    print("Server synchronization is flaky. Stopping the connection in 60 seconds.")
    run.stop(seconds=60.0)
Enable custom callback in case of lag
run = neptune.init_run(
    async_lag_callback = my_custom_callback,
)
Enable custom callback in both scenarios, with default thresholds
run = neptune.init_run(
    async_no_progress_callback = my_custom_callback,
    async_lag_callback = my_custom_callback,
)

Setting thresholds#

You can set the thresholds separately for each scenario, using the corresponding initialization parameters. The threshold is expressed in seconds.

Lag#

Example: Trigger a custom callback if the delay between the queueing and syncing of an operation exceeds 2000 seconds:

run = neptune.init_run(
    async_lag_callback = my_custom_callback,
    async_lag_threshold = 2000.0,
)

No progress#

Example: Trigger the default callback if nothing has been synchronized with the server for 360 seconds at some point during the run:

from neptune.utils import stop_synchronization_callback

run = neptune.init_run(
    async_no_progress_callback = stop_synchronization_callback, # (1)!
    async_no_progress_threshold = 360.0,
)
  1. You can leave this (and the import statement) out if you've enabled the default callback with the corresponding environment variable.