Pipelines#
The GeoAnalytics Canada Pipeline system helps you develop and build portable, scalable Earth Observation pre-processing pipelines and machine learning (ML) workflows based on Docker containers. The underlying system uses Argo Workflows, and the Argo Project documentation can be useful when debugging or extending a pipeline's functionality.
The Pipelines platform consists of:
A UI for managing and tracking pipelines and their execution.
An engine for scheduling a pipeline’s execution.
An SDK for defining, building, and deploying pipelines in Python.
The SDK we use is Hera, a Python framework for defining and submitting Argo Workflows (see https://hera.readthedocs.io).
A pipeline is a representation of a workflow containing the parameters required to run the workflow and the inputs and outputs of each component. Each pipeline component is a self-contained code block, packaged as a Docker image.
A Workflow can also leverage GEOAnalytics Canada’s Cloud Storage as an Artifact store to share artifacts between Tasks/Steps.
[8]:
# Import the required libraries
import os
from hera.shared import global_config
from hera.workflows import Container, Workflow, Steps
Configuration#
Setting some global values within the notebook makes the workflow easier to update for different Container Registries, Images, and Tags.
[9]:
# CONFIG
# -------------------------
CR_URL="someregistry.domain.com"
IMG_LABEL="repository/imagename"
TAG_LABEL="0.1.0"
IMG_TAG=f"{CR_URL}/{IMG_LABEL}:{TAG_LABEL}"
# -------------------------
Setting Up The Workflow#
The next cell provides a template for a single-step Workflow. The Workflow is created with a single Container, which is then executed during the Steps procedure within the Workflow.
[10]:
global_config.api_version = "argoproj.io/v1"
global_config.host = os.getenv('WORKFLOW_HOST')
with Workflow(
    name='nameofworkflow',  # Must be lowercase
    namespace=os.getenv('WORKFLOW_NS'),
    entrypoint='name-of-step-template',  # Must match a template name (here, the Steps template below)
    parallelism=1,  # Number of tasks to run in parallel
) as w:
    t = Container(
        name='unique-container-name',
        image=f'{IMG_TAG}',
        command=["sh", "./entrypoint.sh"],  # Run the entrypoint script packaged in the image
    )
    with Steps(name="name-of-step-template"):
        t(name="name-of-task-step")
Submitting A Workflow to Pipelines#
All that is left to do to submit your workflow is to run the .create()
method on the workflow object. This uses the GEOAnalytics Canada preconfigured backend settings to ensure your workflows are submitted with the correct permissions and security.
[ ]:
w.create()
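If you would like to review what will be submitted, Hera can also serialize the workflow to its Argo YAML manifest. This is a minimal sketch, assuming the optional PyYAML dependency required by Hera's to_yaml() method is installed in your environment.
[ ]:
# Optional: inspect the generated Argo Workflow manifest before (or after) submitting.
# Assumes PyYAML is installed, which Hera's to_yaml() relies on.
print(w.to_yaml())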
Workflow Approaches#
Steps and Parallelism
Artifact Passing
Accessing the GEOAnalytics Shared Data
Steps and Parallelism#
The following workflow highlights a strength of using Pipelines: parallelism. Decoupled or independent tasks can be executed concurrently using the Steps.parallel() method. The workflow below first executes A and then runs B and C at the same time. Without Steps.parallel(), the Steps would be executed sequentially.
[ ]:
# The following source was used to create the below code:
# https://hera.readthedocs.io/en/latest/examples/workflows/steps_with_callable_container/
import os
from hera.shared import global_config
from hera.workflows import Container, Parameter, Workflow, Steps
# CONFIG
# -------------------------
CR_URL="docker.io"
IMG_LABEL="docker/whalesay"
TAG_LABEL="latest"
IMG_TAG=f"{CR_URL}/{IMG_LABEL}:{TAG_LABEL}"
# -------------------------
global_config.api_version = "argoproj.io/v1"
global_config.host = os.getenv('WORKFLOW_HOST')
with Workflow(
    name='step-parallel',
    namespace=os.getenv('WORKFLOW_NS'),
    entrypoint='workflowsteps',
    parallelism=2,  # Allow B and C to run at the same time
) as w:
    container_task = Container(
        name='whalesay-geoanalytics',
        image=f'{IMG_TAG}',
        command=["cowsay"],
        inputs=[Parameter(name="message")],
        args=["{{inputs.parameters.message}}"],
    )
    with Steps(name='workflowsteps') as s:
        container_task(
            name='A',
            arguments=[Parameter(name='message', value='Hi!')]
        )
        with s.parallel():
            container_task(
                name='B',
                arguments=[Parameter(name='message', value='Hello!')]
            )
            container_task(
                name='C',
                arguments=[Parameter(name='message', value='General Kenobi!')]
            )
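As with the single-step template above, the workflow is submitted by calling the .create() method on the workflow object:
[ ]:
# Submit the parallel workflow using the preconfigured backend settings
w.create()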
Artifact Passing#
In a lot of cases it’s necessary to pass an output to the next Step of a workflow. Artifacts solve this scenario. You can create an Artifact as an output from a previous Step to be consumed by a subsequent Step. The Workflow below demonstrates an Artifact being created and then consumed in the next Step. An important thing to notice is that the output and input paths differ; this allows more flexible insertion of information in your Step implementations.
Note: Using our shared-data Blob Container as the Artifact Repository allows artifacts to be archived in a location accessible to all users on the tenant.
[ ]:
import os
from hera.shared import global_config
from hera.workflows import Container, Artifact, Workflow, Steps, Step
# CONFIG
# -------------------------
IMG_LABEL="busybox"
TAG_LABEL="1.36.0"
IMG_TAG=f"{IMG_LABEL}:{TAG_LABEL}"
# -------------------------
global_config.api_version = "argoproj.io/v1"
global_config.host = os.getenv('WORKFLOW_HOST')
with Workflow(
    name='artifact-passing',
    namespace=os.getenv('WORKFLOW_NS'),
    entrypoint='workflowsteps',
    parallelism=1,  # Number of tasks to run in parallel
) as w:
    create_file = Container(
        name='creator',
        image=f'{IMG_TAG}',
        command=['/bin/sh', '-c', 'echo "Hello from Task1" >> /tmp/hello.txt'],
        outputs=[Artifact(name='hellomessage', path='/tmp/hello.txt')]
    )
    read_file = Container(
        name='reader',
        image=f'{IMG_TAG}',
        command=["cat"],
        args=["/tmp/artifact/hello.txt"],
        inputs=[Artifact(name='inputmessage', path='/tmp/artifact/hello.txt')]
    )
    with Steps(name='workflowsteps') as s:
        Step(name='createfile', template=create_file)
        Step(
            name='readfile',
            template=read_file,
            arguments=[Artifact(
                name='inputmessage',
                from_='{{steps.createfile.outputs.artifacts.hellomessage}}',
                subpath='/tmp/hello.txt'
            )]
        )
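This workflow is submitted in the same way. The intermediate hellomessage artifact is archived in the configured Artifact Repository (for example, the shared-data Blob Container mentioned in the Note above) between the two Steps.
[ ]:
# Submit the artifact-passing workflow; the 'hellomessage' artifact produced by the
# 'createfile' step is archived in the Artifact Repository and mounted into 'readfile'.
w.create()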