Pipelines#

The GEOAnalytics Canada Pipeline system helps you develop and build portable, scalable Earth Observation pre-processing pipelines and machine learning (ML) workflows based on Docker containers. The underlying system uses Argo Workflows, and the Argo Project documentation can be useful when debugging or extending a pipeline's functionality.

The Pipelines platform consists of:

  • A UI for managing and tracking pipelines and their execution.

  • An engine for scheduling a pipeline's execution.

  • An SDK for defining, building, and deploying pipelines in Python. The SDK we use is the Hera Python library.

A pipeline is a representation of a workflow containing the parameters required to run the workflow and the inputs and outputs of each component. Each pipeline component is a self-contained code block, packaged as a Docker image.

A Workflow can also leverage GEOAnalytics Canada’s Cloud Storage as an Artifact store to share artifacts between Tasks/Steps.

[8]:
# Import the required libraries

import os

from hera.shared import global_config
from hera.workflows import Container, Workflow, Steps

Configuration#

Setting some global values within the notebook makes it easier to update the workflow to use a different container registry, image, or tag.

[9]:
# CONFIG
# -------------------------
CR_URL="someregistry.domain.com"
IMG_LABEL="repository/imagename"
TAG_LABEL="0.1.0"
IMG_TAG=f"{CR_URL}/{IMG_LABEL}:{TAG_LABEL}"
# -------------------------

Setting Up The Workflow#

The next cell shows a template for a single-step Workflow. The Workflow defines a single Container, which is then executed as a step in the Workflow's Steps template.

[10]:
global_config.api_version = "argoproj.io/v1"
global_config.host = os.getenv('WORKFLOW_HOST')

with Workflow(
    name='nameofworkflow', # Must be lowercase
    namespace=os.getenv('WORKFLOW_NS'),
    entrypoint='name-of-entry-task-step',
    parallelism=1, # Number of tasks to run in parallel
) as w:
    t = Container(
        name='unique-container-name',
        image=f'{IMG_TAG}',
        command=["sh", "./entrypoint.sh"], # Run the entrypoint script packaged in the image
    )

    with Steps(name="name-of-step-template"):
        t(name="name-of-task-step")
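
Hera also lets you define a step directly from a Python function using its script decorator, as an alternative to referencing a prebuilt container image. The cell below is a minimal sketch of the same single-step pattern; the workflow name, function, and message are placeholder values, and the object is assigned to w_script so it does not replace the w defined above.

[ ]:
import os

from hera.workflows import Workflow, Steps, script

@script() # Runs the decorated function body in a default Python image unless an image is specified
def say(message: str):
    print(message)

with Workflow(
    name='script-example', # Placeholder name - must be lowercase
    namespace=os.getenv('WORKFLOW_NS'),
    entrypoint='workflowsteps',
) as w_script:
    with Steps(name='workflowsteps'):
        say(arguments={'message': 'Hello from a script step!'})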

Submitting A Workflow to Pipelines#

To submit your workflow, all that is left to do is call the .create() method on the workflow object. This uses GEOAnalytics Canada's preconfigured backend settings to ensure your workflows are submitted with the correct permissions and security.

[ ]:
w.create()
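
If you would like to review what will be submitted before calling .create(), the Workflow object can also be serialized. The cell below is a small sketch; w.to_yaml() assumes the PyYAML package is available in the environment.

[ ]:
# Preview the generated Workflow manifest before submitting it
print(w.to_yaml()) # Assumes PyYAML is installed; w.to_dict() works without it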

Workflow Approaches#

  • Steps and Parallelism

  • Artifact Passing

  • Accessing the GEOAnalytics Shared Data

Steps and Parallelism#

The following workflow highlights a strength of Pipelines: parallelism. Decoupled or independent tasks can be executed concurrently using the Steps.parallel() method. The workflow below first executes A and then runs B and C at the same time. Without Steps.parallel(), the steps would be executed sequentially.

[ ]:
# The following source was used to create the below code:
# https://hera.readthedocs.io/en/latest/examples/workflows/steps_with_callable_container/

import os

from hera.shared import global_config
from hera.workflows import Container, Parameter, Workflow, Steps

# CONFIG
# -------------------------
CR_URL="docker.io"
IMG_LABEL="docker/whalesay"
TAG_LABEL="latest"
IMG_TAG=f"{CR_URL}/{IMG_LABEL}:{TAG_LABEL}"
# -------------------------

global_config.api_version = "argoproj.io/v1"
global_config.host = os.getenv('WORKFLOW_HOST')

with Workflow(
    name='step-parallel',
    namespace=os.getenv('WORKFLOW_NS'),
    entrypoint='workflowsteps',
    parallelism=2, # Allow up to two tasks (B and C) to run in parallel
) as w:
    container_task = Container(
        name='whalesay-geoanalytics',
        image=f'{IMG_TAG}',
        command=["cowsay"],
        inputs=[Parameter(name="message")],
        args=["{{inputs.parameters.message}}"],
    )

    with Steps(name='workflowsteps') as s:
        container_task(
            name='A',
            arguments=[Parameter(name='message', value='Hi!')]
        )

        with s.parallel():
            container_task(
                name='B',
                arguments=[Parameter(name='message', value='Hello!')]
            )
            container_task(
                name='C',
                arguments=[Parameter(name='message', value='General Kenobi!')]
            )

Artifact Passing#

In many cases it's necessary to pass an output to the next Step of a workflow. Artifacts solve this scenario: an Artifact produced as an output of one Step can be consumed by a subsequent Step. The Workflow below demonstrates an Artifact being created and then consumed in the next Step. Note that the output and input paths differ; this allows more flexibility in where each Step reads and writes its data.

Note
Using our shared-data Blob Container as the Artifact Repository allows artifacts to be archived in an accessible location for all users on the tenant.
[ ]:
import os

from hera.shared import global_config
from hera.workflows import Container, Artifact, Workflow, Steps, Step

# CONFIG
# -------------------------
IMG_LABEL="busybox"
TAG_LABEL="1.36.0"
IMG_TAG=f"{IMG_LABEL}:{TAG_LABEL}"
# -------------------------

global_config.api_version = "argoproj.io/v1"
global_config.host = os.getenv('WORKFLOW_HOST')

with Workflow(
    name='artifact-passing',
    namespace=os.getenv('WORKFLOW_NS'),
    entrypoint='workflowsteps',
    parallelism=1, # Number of tasks to run in parallel
) as w:
    create_file = Container(
        name='creator',
        image=f'{IMG_TAG}',
        command=['/bin/sh', '-c', 'echo "Hello from Task1" >> /tmp/hello.txt'],
        outputs=[Artifact(name='hellomessage', path='/tmp/hello.txt')]
    )
    read_file = Container(
        name='reader',
        image=f'{IMG_TAG}',
        command=["cat"],
        args=["/tmp/artifact/hello.txt"],
        inputs=[Artifact(name='inputmessage', path='/tmp/artifact/hello.txt')]
    )
    with Steps(name='workflowsteps') as s:
        Step(name='createfile', template=create_file)
        Step(
            name='readfile',
            template=read_file,
            arguments=[Artifact(
                name='inputmessage',
                from_='{{steps.createfile.outputs.artifacts.hellomessage}}',
                sub_path='/tmp/hello.txt'
            )]
        )

Accessing the Shared Data Bucket#

We can access the shared-data bucket in GEOAnalytics Canada, which is used as a common data storage location for each tenant group.

Using a pre-populated file test-read.txt that contains the string "Hello from the Bucket!", we can cat the file contents to stdout and observe the output in the UI logs.

[ ]:
import os

from hera.shared import global_config
from hera.workflows import Container, Workflow, Steps
from hera.workflows.models import VolumeMount

# CONFIG
# -------------------------
IMG_LABEL="busybox"
TAG_LABEL="1.36.0"
IMG_TAG=f"{IMG_LABEL}:{TAG_LABEL}"
# -------------------------

global_config.api_version = "argoproj.io/v1"
global_config.host = os.getenv('WORKFLOW_HOST')

with Workflow(
    name='shared-data-access',
    namespace=os.getenv('WORKFLOW_NS'),
    entrypoint='workflowsteps',
    parallelism=1, # Number of tasks to run in parallel
) as w:
    container_task = Container(
        name='busybox-geoanalytics',
        image=f'{IMG_TAG}',
        command=["cat", "/mnt/vol/shared-data/test-read.txt"],
        volume_mounts=[
            VolumeMount(name="shared-data", mount_path="/mnt/vol"),
        ],
    )
    with Steps(name='workflowsteps') as s:
        container_task(
            name='readit'
        )

You can inspect portions of your workflow by leveraging the IPython.display.JSON widget. Some parameters may appear to be missing; the Workflow Controller inserts defaults that enable the workflow to execute on the GEOAnalytics Platform.

[ ]:
from IPython.display import JSON
JSON(w.to_dict())