Pipelines#

The GEOAnalytics Canada Pipeline system helps you develop and build portable, scalable Earth Observation pre-processing pipelines and machine learning (ML) workflows based on Docker containers. The underlying system uses Argo Workflows, and the Argo Project documentation can be useful when debugging or extending a pipeline's functionality.

The Pipelines platform consists of:

  • A UI for managing and tracking pipelines and their execution.

  • An engine for scheduling a pipeline’s execution.

  • An SDK for defining, building, and deploying pipelines in Python.

A pipeline is a representation of a workflow containing the parameters required to run the workflow and the inputs and outputs of each component. Each pipeline component is a self-contained code block, packaged as a Docker image.

A Workflow can also leverage GEOAnalytics Canada’s Cloud Storage as an Artifact store to share artifacts between Tasks/Steps.

[8]:
# Import the required libraries
import os

from hera.shared import global_config
from hera.workflows import Container, Workflow, Steps

Configuration#

Setting some global values within the notebook makes the workflow easier to update for different Container Registries, Containers, and Images.

[9]:
# CONFIG
# -------------------------
CR_URL="someregistry.domain.com"
IMG_LABEL="repository/imagename"
TAG_LABEL="0.1.0"
IMG_TAG=f"{CR_URL}/{IMG_LABEL}:{TAG_LABEL}"
# -------------------------

Setting Up The Workflow#

The next cell shows a template for a single-step Workflow. The Workflow is created with a single Container, which is then executed during the Steps procedure within the Workflow.

[ ]:
global_config.api_version = "argoproj.io/v1"
global_config.host = os.getenv('WORKFLOW_HOST')

with Workflow(
    name='unique-name-of-workflow', # Must be unique and lowercase
    namespace=os.getenv('WORKFLOW_NS'), # Namespace to run workflow in <- Preconfigured in GEOAnalytics Canada
    entrypoint='name-of-entry-task-step', # The name of the entrypoint task/step
    parallelism=1, # Number of tasks to run in parallel
) as w:

    # This section defines a template that can be used in multiple steps
    t = Container(
        name='unique-container-name',
        image=f'{IMG_TAG}',
        command=["sh", "./entrypoint.sh"], # Command executed when the container starts
    )

    # This section defines the entrypoint task/step
    # Here you can organise the flow of your workflow
    with Steps(name="name-of-step-template"):
        t(name="name-of-task-step")

Note
Contact your GEOAnalytics Canada Administrator for more information about available resources.

Here is an alternative approach to writing Workflows (using a context manager, as above, is still recommended so that objects are properly garbage-collected). This example demonstrates how to use a Python function as the source of a Task instead of a prebuilt container command. However, you will still need to provide a base Docker Image to act as the environment for your source code to run in.

[ ]:

import os

from hera.env import Env
from hera.workflow import Workflow, Task

w = Workflow(
    name='unique-name-of-workflow',
    namespace=os.getenv('WORKFLOW_NS'),
    entrypoint='source-task',
    parallelism=10, # Number of tasks to run in parallel
)

def some_func():
    import os
    # do something
    print(os.getenv('TASKSAY'))

# Environment Variables for running source/container
env_list = [
    Env(name='SOME_ENV', value='SOME_VAL'),
    Env(name='TASKSAY', value='Workflows Are Powerful!')
]

# Task using a function as input
t1 = Task(
    name='source-task',
    image='registry.eo4ph.geoanalytics.ca/project-name/image-name:image-tag',
    source=some_func,
    env=env_list
)

# Task using a prebuilt Docker Image running some Python application
t2 = Task(
    name='container-task',
    image='registry.eo4ph.geoanalytics.ca/project-name/image-name:image-tag',
    command=['/bin/bash', '-c', 'python run.py'],
    env=env_list
)

w.add_task(t1) # Add source-task to workflow
w.add_task(t2) # Add container-task to workflow

t1 >> t2 # DAG - make t1 run before t2

Submitting A Workflow to Pipelines#

All that is left to do to submit your workflow is to run the .create() method on the workflow object. This uses the GEOAnalytics Canada preconfigured backend settings to ensure your workflows are submitted with the correct permissions and security.

[ ]:
w.create()
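
If you would like to review the manifest before submitting it, a minimal sketch is shown below; it reuses the same to_dict() method demonstrated at the end of this notebook and assumes w is the workflow object defined above.

[ ]:
import json

# Preview the manifest for the workflow object `w` before submission
print(json.dumps(w.to_dict(), indent=2))

# Once it looks correct, submit it with:
# w.create()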

Workflow Approaches#

There are different ways to organize your workflow; the application, use case, and goal of your system will dictate how it flows. You can leverage the mechanisms described below to control flow and state.

  • Steps and Parallelism

  • Artifact Passing

  • Accessing the GEOAnalytics Shared Data

Steps and Parallelism#

The following workflow highlights a strength of using Pipelines: parallelism. Decoupled or independent tasks can be executed concurrently by using the Steps.parallel() method. The workflow below will first execute A and then run B and C at the same time. Without Steps.parallel(), the Steps would be executed sequentially (a sketch of the sequential form is shown after the diagram below).

[ ]:
# The following source was used to create the below code:
# https://hera.readthedocs.io/en/latest/examples/workflows/steps_with_callable_container/

import os

from hera.shared import global_config
from hera.workflows import Container, Parameter, Workflow, Steps

# CONFIG
# -------------------------
CR_URL="docker.io"
IMG_LABEL="docker/whalesay"
TAG_LABEL="latest"
IMG_TAG=f"{CR_URL}/{IMG_LABEL}:{TAG_LABEL}"
# -------------------------

global_config.api_version = "argoproj.io/v1"
global_config.host = os.getenv('WORKFLOW_HOST')

with Workflow(
    name='step-parallel',
    namespace=os.getenv('WORKFLOW_NS'),
    entrypoint='workflowsteps',
    parallelism=2, # Allow up to two tasks to run in parallel (needed for B and C to run concurrently)
) as w:
    container_task = Container(
        name='whalesay-geoanalytics',
        image=f'{IMG_TAG}',
        command=["cowsay"],
        inputs=[Parameter(name="message")],
        args=["{{inputs.parameters.message}}"],
    )

    with Steps(name='workflowsteps') as s:
        container_task(
            name='A',
            arguments=[Parameter(name='message', value='Hi!')]
        )

        with s.parallel():
            container_task(
                name='B',
                arguments=[Parameter(name='message', value='Hello!')]
            )
            container_task(
                name='C',
                arguments=[Parameter(name='message', value='General Kenobi!')]
            )

The above Workflow would result in a DAG that looks like this:

A precedes B and C, which run in parallel. An extra task, D, is included in the diagram to show that the workflow is complete.
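
For contrast, here is a minimal sketch of the same Steps block written without s.parallel(). It assumes the same container_task template and would replace the Steps block inside the Workflow above; A, B, and C would then execute one after another.

    # Sequential alternative to the Steps block above (inside the same Workflow context):
    # without s.parallel(), A, B, and C run one after another.
    with Steps(name='workflowsteps'):
        container_task(
            name='A',
            arguments=[Parameter(name='message', value='Hi!')]
        )
        container_task(
            name='B',
            arguments=[Parameter(name='message', value='Hello!')]
        )
        container_task(
            name='C',
            arguments=[Parameter(name='message', value='General Kenobi!')]
        )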

Artifact Passing#

In a lot of cases it is necessary to pass an output to the next Step of a workflow. Artifacts address this scenario: you can create an Artifact as an output of one Step to be consumed by a subsequent Step. The Workflow below demonstrates an Artifact being created and then consumed in the next Step. An important thing to notice is that the output and input paths differ; this allows more flexible placement of the information within your Step implementations.

Note
Using our shared-data Blob Container as the Artifact Repository allows artifacts to be archived in a location accessible to all users on the tenant.
[ ]:
import os

from hera.shared import global_config
from hera.workflows import Container, Artifact, Workflow, Steps, Step

# CONFIG
# -------------------------
IMG_LABEL="busybox"
TAG_LABEL="1.36.0"
IMG_TAG=f"{IMG_LABEL}:{TAG_LABEL}"
# -------------------------

global_config.api_version = "argoproj.io/v1"
global_config.host = os.getenv('WORKFLOW_HOST')

with Workflow(
    name='artifact-passing',
    namespace=os.getenv('WORKFLOW_NS'),
    entrypoint='workflowsteps',
    parallelism=1, # Number of tasks to run in parallel
) as w:
    create_file = Container(
        name='creator',
        image=f'{IMG_TAG}',
        command=['/bin/sh', '-c', 'echo "Hello from Task1" >> /tmp/hello.txt'],
        outputs=[Artifact(name='hellomessage', path='/tmp/hello.txt')]
    )
    read_file = Container(
        name='reader',
        image=f'{IMG_TAG}',
        command=["cat"],
        args=["/tmp/artifact/hello.txt"],
        inputs=[Artifact(name='inputmessage', path='/tmp/artifact/hello.txt')]
    )
    with Steps(name='workflowsteps') as s:
        Step(name='createfile', template=create_file)
        Step(
            name='readfile',
            template=read_file,
            arguments=[Artifact(
                name='inputmessage',
                from_='{{steps.createfile.outputs.artifacts.hellomessage}}',
                sub_path='/tmp/hello.txt'
            )]
        )

The above Workflow will generate a DAG that looks like this:

Accessing the Shared Data Bucket#

We can access the shared-data bucket in GEOAnalytics Canada, which is used as a common data storage location for each tenant group.

Using a pre-populated file test-read.txt that contains the string "Hello from the Bucket!", we can cat the file contents to stdout and observe the output in the UI logs.

[ ]:
import os

from hera.shared import global_config
from hera.workflows import Container, Workflow, Steps
from hera.workflows.models import VolumeMount

# CONFIG
# -------------------------
IMG_LABEL="busybox"
TAG_LABEL="1.36.0"
IMG_TAG=f"{IMG_LABEL}:{TAG_LABEL}"
# -------------------------

global_config.api_version = "argoproj.io/v1"
global_config.host = os.getenv('WORKFLOW_HOST')

with Workflow(
    name='shared-data-access',
    namespace=os.getenv('WORKFLOW_NS'),
    entrypoint='workflowsteps',
    parallelism=1, # Number of tasks to run in parallel
) as w:
    container_task = Container(
        name='busybox-geoanalytics',
        image=f'{IMG_TAG}',
        command=["cat", "/mnt/vol/shared-data/test-read.txt"],
        volume_mounts=[
            VolumeMount(name="shared-data", mount_path="/mnt/vol"),
        ],
    )
    with Steps(name='workflowsteps') as s:
        container_task(
            name='readit'
        )

The above Workflow is similar in concept to the Artifact store, except that the Cloud storage is mounted directly into the Workflow's Task Container and accessed via the filesystem. This is useful for datasets that are too large to upload or manage with the Artifact store.
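
If the shared-data mount is writable for your tenant, a Task can also write results back to the shared location. Below is a minimal sketch that assumes the same shared-data volume and /mnt/vol mount path as above; the output filename results.txt is purely illustrative.

    # Hypothetical writer task, defined inside the same Workflow context as above.
    # Assumes the 'shared-data' volume is writable; results.txt is an illustrative filename.
    writer_task = Container(
        name='busybox-writer',
        image=f'{IMG_TAG}',
        command=['/bin/sh', '-c', 'echo "Result from workflow" > /mnt/vol/shared-data/results.txt'],
        volume_mounts=[
            VolumeMount(name="shared-data", mount_path="/mnt/vol"),
        ],
    )
    # Call writer_task(name='writeit') inside the Steps block to run it.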

You can inspect portions of your workflow by leveraging the IPython.display.JSON widget. Some parameters may appear to be missing; defaults are inserted by the Workflow Controller to enable the workflow to execute in the GEOAnalytics Platform.

[ ]:
from IPython.display import JSON
JSON(w.to_dict())