
🔥 Hi everyone! In this blog we will talk about efficiently orchestrating complex machine learning workflows on Kubernetes. Kubeflow provides a powerful platform for managing these workflows, but to truly harness its potential, it’s important to define configurations that make effective use of resources while ensuring reliable execution.
👉🏻 It’s important to note that pipeline configuration goes beyond just packaging code into components and deploying them on Kubernetes clusters. It is the fine-grained control of execution behaviour, resource allocation, and handling of unexpected scenarios.
🥁 Pipeline configurations are the backbone of robust and efficient machine learning workflows. Here are a few reasons why:
- Execution Workflow Management: Pipelines often consist of multiple interconnected components that need to execute in a specific order, forming a Directed Acyclic Graph (DAG). Stitching configurations allow you to define dependencies and sequencing logic, ensuring that components run in the correct order and adhere to your desired workflow.
- Resource Allocation and Customisation: Each component in a Kubeflow pipeline may have distinct resource requirements. For instance, one pod may need general computing resources with specific CPU, RAM, and disk specifications, while another may require specialised hardware like NVIDIA GPUs for accelerated computing.
- Cost Optimisation: Resource configurations help manage cloud costs effectively. By controlling resource allocation and scaling strategies, you can prevent over-provisioning, leading to cost savings in cloud computing environments.
- Fault Tolerance and Error Handling: Pipelines can encounter failures due to node failures or issues within the code. Error Handling configurations enable you to define strategies for handling such failures, such as retries, fallback components, or graceful termination.
- Pod Placement on Nodes: Efficient pod placement on Kubernetes nodes is important for optimising resource utilisation and performance. With affinity and toleration configurations, you can fine-tune where your pods are scheduled, ensuring that they land on nodes with the necessary hardware and software.
Moreover, pipeline configuration empowers you to customise, manage, and control every aspect of your pipeline, ensuring that it runs smoothly and optimally, and handles unexpected scenarios gracefully.
While building a pipeline, you need to load components before stitching them together. Kubeflow provides several ways to incorporate components; here are a few of the most popular.
from kfp import components

# Load a component from a file
add_op = components.load_component_from_file('~/path/to/component.yaml')

# Using create_component_from_func as a function
def add(a: float, b: float) -> float:
    """Returns sum of two arguments"""
    return a + b

add_op = components.create_component_from_func(
    func=add,
    base_image='python:3.7',  # Optional
    output_component_file='add.component.yaml',  # Optional
    packages_to_install=['pandas==0.24'],  # Optional
)

# Using create_component_from_func as a decorator
@components.create_component_from_func
def add_op(a: float, b: float) -> float:
    """Returns sum of two arguments"""
    return a + b
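Beyond files and Python functions, the components module can also load a component definition from a URL or from an inline YAML string. A minimal sketch; the URL and the Echo component below are placeholders, not real artifacts:
web_op = components.load_component_from_url(
    'https://raw.githubusercontent.com/org/repo/component.yaml')  # placeholder URL

echo_op = components.load_component_from_text('''
name: Echo
implementation:
  container:
    image: alpine
    command: [echo, hello]
''')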
In a Kubeflow pipeline, creating a linear sequence of components is just one aspect of orchestration. However, pipelines often require more complex workflows. Let’s explore essential techniques for stitching components together.
a. After Method
Sequential execution using after in Kubeflow Pipelines ensures that components run one after the other, creating a clear sequence of tasks.
from kfp import dsl

@dsl.pipeline(name='my-pipeline')
def my_pipeline():
    task1 = my_component(text='1st task')
    task2 = my_component(text='2nd task').after(task1)
b. Parallel Method
The ParallelFor construct in Kubeflow Pipelines enables parallel execution of multiple components. Tasks within the ParallelFor block can run concurrently, making it ideal for scenarios where you want to process data independently and efficiently.
with dsl.ParallelFor(
    items=[{'a': 1, 'b': 10}, {'a': 2, 'b': 20}],
    parallelism=1  # caps concurrent iterations at one; raise or omit to run items fully in parallel
) as item:
    task1 = my_component(..., number=item.a)
    task2 = my_component(..., number=item.b)
c. Condition
Conditional execution with Condition allows you to create branches in your pipeline. Tasks within the conditional block execute based on specific conditions.
task1 = my_component1(...)
with dsl.Condition(task1.output == 'pizza', 'pizza-condition'):
    task2 = my_component2(...)
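These constructs compose naturally in a single pipeline, which you can then compile for submission. Here is a minimal sketch; preprocess_op, train_op, and deploy_op are hypothetical components assumed to exist, and the condition assumes train_op exposes a single numeric output:
import kfp
from kfp import dsl

# preprocess_op, train_op, and deploy_op are hypothetical components
@dsl.pipeline(name='stitched-pipeline')
def stitched_pipeline():
    prep = preprocess_op()
    train = train_op().after(prep)  # sequential dependency
    with dsl.Condition(train.output > 0.9, 'deploy-condition'):
        deploy_op()  # runs only when the metric clears 0.9

# Compile the stitched pipeline into a workflow spec for upload
kfp.compiler.Compiler().compile(stitched_pipeline, 'stitched_pipeline.yaml')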
Once your pipeline components are stitched together, it’s time to place your pipeline on dedicated, accelerated, general, compute-optimised, or spot node groups. Pod placement using V1Affinity and V1Toleration is a way of managing how and where your pods are scheduled onto nodes within your cluster.
from kubernetes.client import (V1Affinity, V1NodeAffinity, V1NodeSelector,
                               V1NodeSelectorTerm, V1NodeSelectorRequirement,
                               V1Toleration)

task = my_component(...)

aff_general_computing = V1Affinity(
    node_affinity=V1NodeAffinity(
        required_during_scheduling_ignored_during_execution=V1NodeSelector(
            node_selector_terms=[V1NodeSelectorTerm(
                match_expressions=[V1NodeSelectorRequirement(
                    key='beta.kubernetes.io/instance-type',
                    operator='In',
                    values=['p2.xlarge'])])])))

tol_general_computing = V1Toleration(
    effect='NoExecute',
    key='beta.kubernetes.io/instance-type',
    operator='Equal',
    value='p2.xlarge')

task.add_affinity(affinity=aff_general_computing)
task.add_toleration(tolerance=tol_general_computing)
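For the common case of simply pinning a task to nodes carrying a given label, KFP v1 tasks also expose a shorter helper. A brief sketch reusing the same instance-type label:
task.add_node_selector_constraint(
    label_name='beta.kubernetes.io/instance-type',
    value='p2.xlarge')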
Once you have decided where your pipeline pods should go, you can set resource requests and limits to ensure that pods receive the minimum and maximum CPU, GPU, storage, and memory resources they need to run effectively, preventing resource contention and potential pod evictions. Moreover, resource requests in k8s are important for efficient cluster management.
a. CPU
CPU resource request and limit allow you to specify the minimum and maximum cpu resources that a pod should use.
task = my_component(...)

# Set cpu request (minimum) for this operator.
task.set_cpu_request(cpu="1000m")

# Set cpu limit (maximum) for this operator.
task.set_cpu_limit(cpu="2000m")
b. Memory
Memory resource request and limit define the minimum and maximum memory usage for a pod.
# The value is a string: a plain number, or a number followed by one of
# "E", "P", "T", "G", "M", "K".
task = my_component(...)

# Set memory request (minimum) for this operator.
task.set_memory_request(memory="1G")

# Set memory limit (maximum) for this operator.
task.set_memory_limit(memory="2G")
c. Storage
Ephemeral storage request and limit determine the minimum and maximum storage space a pod can use for temporary storage.
# The value is a string: a plain number, or a number followed by one of
# "E", "P", "T", "G", "M", "K".
task = my_component(...)

# Set ephemeral-storage request (minimum) for this operator.
task.set_ephemeral_storage_request(size="5G")

# Set ephemeral-storage limit (maximum) for this operator.
task.set_ephemeral_storage_limit(size="10G")
d. GPU
GPU resource configuration is different from CPU and memory. In k8s, you don’t specify GPU requests; you only set a GPU limit.
task = my_component(...)

# Note that there is no need to add a GPU request:
# GPUs are only supposed to be specified in the limits section.
task.set_gpu_limit(gpu="1", vendor="nvidia")  # maps to the nvidia.com/gpu resource
Once you have decided how much CPU, memory, and storage your pipeline pods will use, it’s time to access cluster configuration, such as ConfigMaps, from within the pods using the Python k8s client SDK.
from kubernetes import client, config

# Load in-cluster configuration
config.load_incluster_config()

# Create API client
api_client = client.CoreV1Api()

# Define namespace
namespace = "xyz"

def get_config_map(namespace, config_map_name):
    try:
        # Get the ConfigMap
        config_map = api_client.read_namespaced_config_map(
            name=config_map_name, namespace=namespace)
        return config_map
    except Exception as e:
        print(f"Error: {e}")
        return None
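A quick usage sketch (the ConfigMap name below is hypothetical): the returned V1ConfigMap object exposes its stored key/value pairs on the .data attribute.
config_map = get_config_map(namespace="xyz", config_map_name="pipeline-config")
if config_map is not None:
    print(config_map.data)  # dict of key/value pairs stored in the ConfigMap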
Kubeflow volume mounts are essential for providing persistent storage and data sharing within machine learning pipeline components. They enable tasks to access and manipulate data, models, and other resources, ensuring data consistency across pipeline stages.
a. PVC Volume Mount
V1Volume defines the volume source and is declared at the Pod level, while V1VolumeMount specifies how that volume is mounted into a container and is declared at the container level. These two objects work together to enable data sharing and storage access within Kubernetes Pods.
from kubernetes import client as k8s_client

pvc_name = 'pipeline-claim'
volume_name = 'pipeline'
volume_mount_path = '/mnt/pipeline'

train = train_op(...)

local_pvc = k8s_client.V1PersistentVolumeClaimVolumeSource(claim_name=pvc_name)
volume = k8s_client.V1Volume(name=volume_name, persistent_volume_claim=local_pvc)
mount = k8s_client.V1VolumeMount(mount_path=volume_mount_path, name=volume_name)

train.add_volume(volume).add_volume_mount(mount)
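If you would rather provision the claim as part of the run instead of referencing a pre-created PVC, KFP v1 also offers dsl.VolumeOp. A minimal sketch; the names and size below are illustrative:
from kfp import dsl

vop = dsl.VolumeOp(
    name='create-pipeline-volume',
    resource_name='pipeline-claim',
    size='10Gi',
    modes=dsl.VOLUME_MODE_RWO)

# add_pvolumes mounts the created volume at the given path
train = train_op(...).add_pvolumes({'/mnt/pipeline': vop.volume})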
b. Remote Storage
AWS s3 sync can be used to sync data in from a bucket to the pod running ML code, or out from the pod to the bucket for other pods to access during the run.
aws s3 sync s3://data-bucket/experiment/artifacts local/folder
aws s3 sync local/folder s3://data-bucket/experiment/artifacts
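To run this inside a pipeline, one option is to wrap the CLI call in a lightweight Python component. A hedged sketch, assuming the pod has AWS credentials available (for example via an IAM role); sync_from_s3 and the image choice are illustrative:
from kfp import components

def sync_from_s3(bucket_uri: str, local_dir: str):
    """Sync artifacts from S3 into the pod's local filesystem."""
    import subprocess
    subprocess.run(['aws', 's3', 'sync', bucket_uri, local_dir], check=True)

sync_op = components.create_component_from_func(
    sync_from_s3,
    base_image='python:3.9',
    packages_to_install=['awscli'],  # pip-installs the `aws` CLI inside the container
)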
Handling abrupt failures in a Kubeflow pipeline ensures robustness and resilience. Two essential mechanisms to achieve this are retry policies and exit handlers:
a. Retry
Implementing retry policies allows you to specify how your pipeline components respond to transient failures.
task = my_component(...)

ALLOWED_RETRY_POLICIES = (
    'Always',
    'OnError',
    'OnFailure',
    'OnTransientError',
)

# Sets the number of times the task is retried until it's declared failed.
task.set_retry(
    num_retries=3,
    policy=ALLOWED_RETRY_POLICIES[2],  # 'OnFailure'
    backoff_duration="2m",
)
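In recent KFP v1 releases, set_retry also accepts exponential backoff parameters; a brief sketch:
task.set_retry(
    num_retries=5,
    policy='Always',
    backoff_duration='1m',       # initial delay before the first retry
    backoff_factor=2,            # multiply the delay after each attempt
    backoff_max_duration='10m',  # upper bound on the delay
)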
b. Exit Handlers
Exit handlers are used to define cleanup actions or graceful termination procedures when a pipeline component fails or completes.
from kfp import dsl
from kfp.dsl import component

@component
def print_op(message: str):
    """Prints a message."""
    print(message)

@component
def fail_op(message: str):
    """Fails."""
    import sys
    print(message)
    sys.exit(1)

@dsl.pipeline(name='pipeline-with-exit-handler')
def pipeline_exit_handler(message: str = 'Hello World!'):
    exit_task = print_op(message='Exit handler has worked!')
    with dsl.ExitHandler(exit_task):
        print_op(message=message)
        fail_op(message='Task failed.')
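To see the exit handler fire, you can submit the pipeline for a run. A minimal sketch, assuming a reachable Kubeflow Pipelines endpoint (the host below is a placeholder):
import kfp

client = kfp.Client(host='http://localhost:8080')  # placeholder endpoint
client.create_run_from_pipeline_func(
    pipeline_exit_handler,
    arguments={'message': 'Hello World!'},
)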
In the world of machine learning pipelines, getting the configuration right is key. We’ve covered methods to tune resources, manage variables, and orchestrate tasks. With all of the above configurations, your pipelines become robust and adaptable. Mastering these techniques helps you manage Kubeflow pipelines effectively, making your machine learning journeys more efficient and resilient.
⭐ If you love and appreciate what I write, please consider subscribing. Your support is an encouragement, and it fuels my passion for providing even better content! 🚀📚