Airflow integration guide#
Apache Airflow is an open-source platform for batch-oriented workflows. With Neptune, you can log metadata generated by the different tasks of your Airflow DAG runs.
You can do the following with the Neptune-Airflow integration:
- Compare model results
- Track the workflow/DAG config
- Log metadata from different tasks in one place
Before you start#
- Sign up at neptune.ai/register.
- Create a project for storing your metadata.
- Have Airflow installed and configured.
Setting Neptune credentials#
Save your Neptune credentials as Airflow Variables to ensure that they work for all tasks.
- `NEPTUNE_API_TOKEN` – Neptune API token of the account that is doing the logging.
- `NEPTUNE_PROJECT` – Full name of the Neptune project where the metadata should be logged (`workspace-name/project-name`).
You can also set your credentials as regular environment variables, but this needs to be done on all machines. Airflow Variables will take precedence.
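For example, assuming placeholder values for the token and project name, you can set the Airflow Variables with the `airflow variables set` CLI command:

```shell
# Store the Neptune credentials as Airflow Variables
# (replace the placeholder values with your own)
airflow variables set NEPTUNE_API_TOKEN "your-api-token"
airflow variables set NEPTUNE_PROJECT "workspace-name/project-name"
```
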
Passing your Neptune credentials
Once you've registered and created a project, set your Neptune API token and full project name to the `NEPTUNE_API_TOKEN` and `NEPTUNE_PROJECT` environment variables, respectively.
To find your API token: In the bottom-left corner of the Neptune app, expand the user menu and select Get my API token.
Your full project name has the form `workspace-name/project-name`. You can copy it from the project settings: click the menu in the top-right → Details & privacy.
On Windows, navigate to Settings → Edit the system environment variables, or enter the following in Command Prompt: `setx SOME_NEPTUNE_VARIABLE "some-value"`
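On Linux and macOS, a common approach (shown here with placeholder values) is to export the variables in your shell session or profile:

```shell
# Export the Neptune credentials as environment variables
# (replace the placeholder values with your own)
export NEPTUNE_API_TOKEN="your-api-token"
export NEPTUNE_PROJECT="workspace-name/project-name"
```
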
Although it's not recommended, especially for the API token, you can also pass your credentials in the code when initializing Neptune:

```python
run = neptune.init_run(
    project="ml-team/classification",  # your full project name here
    api_token="h0dHBzOi8aHR0cHM6Lkc78ghs74kl0jvYh...3Kb8",  # your API token here
)
```
For more help, see Set Neptune credentials.
Installing the integration#
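Assuming the integration is published on PyPI under the same name as its GitHub repository, you can install or upgrade it with pip:

```shell
# Install (or upgrade to) the latest version of the integration
pip install -U neptune-airflow
```
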
Basic logging example#
To enable Neptune logging, create a `NeptuneLogger` instance and use it in your task:

```python
from airflow import DAG

from neptune_airflow import NeptuneLogger

with DAG(
    ...
) as dag:

    def your_task(**context):
        logger = NeptuneLogger()
        return task_results(logger, **context)
```
More options#
Getting the task run and logging metadata#
You can get the Neptune run of a context with the `get_run_from_context()` method, then log whatever metadata you want to the returned `Run` object.

```python
from airflow import DAG

from neptune_airflow import NeptuneLogger

with DAG(
    ...
) as dag:

    def task(**context):
        logger = NeptuneLogger()
        ...
        with logger.get_run_from_context(
            context=context, log_context=True
        ) as run:
            ...
            run["model_checkpoint/checkpoint"].upload_files("my_model.h5")
            run.sync()  # the run is stopped automatically when the context exits
```
Logging to a run namespace based on task ID#
You can get a namespace handler based on the task ID with the `get_task_handler_from_context()` method. This lets you organize metadata from different tasks into separate namespaces within the same Neptune `Run` object.
```python
from airflow import DAG

from neptune.types import File
from neptune_airflow import NeptuneLogger

with DAG(
    ...
) as dag:

    def task(**context):
        logger = NeptuneLogger()
        ...
        with logger.get_task_handler_from_context(
            context=context, log_context=True
        ) as handler:
            ...
            for image, label in zip(x_test[:10], y_test[:10]):
                prediction = model.predict(image[None], verbose=0)
                predicted = prediction.argmax()
                desc = f"label : {label} | predicted : {predicted}"
                handler["visualization/test_prediction"].append(
                    File.as_image(image), description=desc
                )
```
The above example logs the predictions under `<task_id>/visualization/test_prediction` inside the Neptune run.
Finding the Neptune run associated with a DAG run#
The custom ID of the Neptune run is generated from the DAG run ID and is kept to at most 36 characters. You can use this to find the custom Neptune ID of any DAG run based on its ID. The custom run ID is stored in the run's `sys/custom_run_id` field.
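As an illustrative sketch (the helper below is not part of the neptune-airflow API), assuming you have exported the project's runs table to a pandas DataFrame with Neptune's `fetch_runs_table().to_pandas()`, you could look up the Neptune ID of a run by its custom run ID like this:

```python
import pandas as pd


def find_neptune_ids(runs_df: pd.DataFrame, custom_run_id: str) -> list:
    """Return the Neptune IDs whose sys/custom_run_id field matches the given ID."""
    matches = runs_df[runs_df["sys/custom_run_id"] == custom_run_id]
    return matches["sys/id"].tolist()


# Stand-in DataFrame for illustration; in practice you would build it with:
# runs_df = neptune.init_project(
#     project="workspace-name/project-name", mode="read-only"
# ).fetch_runs_table().to_pandas()
runs_df = pd.DataFrame(
    {
        "sys/id": ["AIR-1", "AIR-2"],
        "sys/custom_run_id": ["dag-run-a", "dag-run-b"],
    }
)
print(find_neptune_ids(runs_df, "dag-run-b"))  # → ['AIR-2']
```
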
Related
- What you can log and display
- Neptune-Airflow API reference
- neptune-airflow repo on GitHub
- Airflow repo on GitHub