.. DO NOT EDIT. .. THIS FILE WAS AUTOMATICALLY GENERATED BY SPHINX-GALLERY. .. TO MAKE CHANGES, EDIT THE SOURCE PYTHON FILE: .. "auto/integrations/kubernetes/pod/pod.py" .. LINE NUMBERS ARE GIVEN BELOW. .. only:: html .. note:: :class: sphx-glr-download-link-note Click :ref:`here ` to download the full example code .. rst-class:: sphx-glr-example-title .. _sphx_glr_auto_integrations_kubernetes_pod_pod.py: Pod Example ----------- Pod tasks can be used whenever multiple containers need to spin up within a single task. They expose a fully modifiable Kubernetes `pod spec `__ which can be used to customize the task execution runtime. All we need to do to use pod tasks are: 1. Define a pod spec 2. Specify the name of the primary container The primary container is the driver for Flyte task execution, for example, producing inputs and outputs. Pod tasks accept arguments that ordinary container tasks usually accept, such as resource specifications, etc. However, these are only applied to the primary container. To customize other containers brought up during the execution, we can define a full-fledged pod spec. This is done using the `Kubernetes Python client library `__'s, `V1PodSpec `__. In order to use pod tasks, install the Flyte pod plugin first: .. prompt:: bash $ pip install flytekitplugins-pod .. GENERATED FROM PYTHON SOURCE LINES 30-37 Simple Pod Task =============== In this example, let's define a simple pod spec in which a shared volume is mounted in both primary and secondary containers. The secondary writes a file that the primary waits on before completing. First, we import the necessary libraries and define the shared data path. .. GENERATED FROM PYTHON SOURCE LINES 37-54 .. code-block:: default import os import time from typing import List from flytekit import Resources, TaskMetadata, dynamic, map_task, task, workflow from flytekitplugins.pod import Pod from kubernetes.client.models import ( V1Container, V1EmptyDirVolumeSource, V1PodSpec, V1ResourceRequirements, V1Volume, V1VolumeMount, ) _SHARED_DATA_PATH = "/data/message.txt" .. GENERATED FROM PYTHON SOURCE LINES 55-56 We define a simple pod spec with two containers. .. GENERATED FROM PYTHON SOURCE LINES 56-99 .. code-block:: default def generate_pod_spec_for_task(): # Primary containers do not require us to specify an image, the default image built for Flyte tasks will get used. primary_container = V1Container(name="primary") # NOTE: For non-primary containers, we must specify the image. secondary_container = V1Container( name="secondary", image="alpine", ) secondary_container.command = ["/bin/sh"] secondary_container.args = [ "-c", "echo hi pod world > {}".format(_SHARED_DATA_PATH), ] resources = V1ResourceRequirements( requests={"cpu": "1", "memory": "100Mi"}, limits={"cpu": "1", "memory": "100Mi"} ) primary_container.resources = resources secondary_container.resources = resources shared_volume_mount = V1VolumeMount( name="shared-data", mount_path="/data", ) secondary_container.volume_mounts = [shared_volume_mount] primary_container.volume_mounts = [shared_volume_mount] pod_spec = V1PodSpec( containers=[primary_container, secondary_container], volumes=[ V1Volume( name="shared-data", empty_dir=V1EmptyDirVolumeSource(medium="Memory") ) ], ) return pod_spec .. GENERATED FROM PYTHON SOURCE LINES 100-102 Although pod tasks for the most part allow us to customize Kubernetes container attributes, we can still use Flyte directives to specify resources and even the image. The default image built for Flyte tasks will be used unless ``container_image`` task attribute is specified. .. GENERATED FROM PYTHON SOURCE LINES 102-125 .. code-block:: default @task( task_config=Pod( pod_spec=generate_pod_spec_for_task(), primary_container_name="primary" ), requests=Resources( mem="1G", ), ) def my_pod_task() -> str: # The code defined in this task will get injected into the primary container. while not os.path.isfile(_SHARED_DATA_PATH): time.sleep(5) with open(_SHARED_DATA_PATH, "r") as shared_message_file: return shared_message_file.read() @workflow def pod_workflow() -> str: s = my_pod_task() return s .. GENERATED FROM PYTHON SOURCE LINES 126-131 Pod Task in Map Task ==================== To use pod task as part of map task, we send pod task definition to :py:func:`~flytekit:flytekit.map_task`. This will run pod task across a collection of inputs. .. GENERATED FROM PYTHON SOURCE LINES 131-179 .. code-block:: default @task( task_config=Pod( pod_spec=V1PodSpec( containers=[ V1Container( name="primary", resources=V1ResourceRequirements( requests={"cpu": ".5", "memory": "500Mi"}, limits={"cpu": ".5", "memory": "500Mi"}, ), ) ], init_containers=[ V1Container( image="alpine", name="init", command=["/bin/sh"], args=["-c", 'echo "I\'m a customizable init container"'], resources=V1ResourceRequirements( limits={"cpu": ".5", "memory": "500Mi"}, ), ) ], ), primary_container_name="primary", ) ) def my_pod_map_task(stringify: int) -> str: return str(stringify) @task def coalesce(b: List[str]) -> str: coalesced = ", ".join(b) return coalesced @workflow def my_map_workflow(a: List[int]) -> str: mapped_out = map_task(my_pod_map_task, metadata=TaskMetadata(retries=1))( stringify=a ) coalesced = coalesce(b=mapped_out) return coalesced .. GENERATED FROM PYTHON SOURCE LINES 180-184 Dynamic Pod Tasks ==================== To use pod task a dynamic task, simply pass the pod task config to an annotated dynamic task. .. GENERATED FROM PYTHON SOURCE LINES 184-217 .. code-block:: default @task def simple_stringify(val: int) -> str: return f"{val} served courtesy of a dynamic pod task!" @dynamic( task_config=Pod( pod_spec=V1PodSpec( containers=[ V1Container( name="primary", resources=V1ResourceRequirements( requests={"cpu": ".5", "memory": "450Mi"}, limits={"cpu": ".5", "memory": "500Mi"}, ), ) ], ), primary_container_name="primary", ) ) def my_dynamic_pod_task(val: int) -> str: return simple_stringify(val=val) @workflow def my_dynamic_pod_task_workflow(val: int = 6) -> str: s = my_dynamic_pod_task(val=val) return s .. GENERATED FROM PYTHON SOURCE LINES 218-219 The workflows can be executed locally as follows: .. GENERATED FROM PYTHON SOURCE LINES 219-224 .. code-block:: default if __name__ == "__main__": print(f"Running {__file__} main...") print(f"Calling PodWorkflow()... {pod_workflow()}") print(f"Calling my_map_workflow()... {my_map_workflow()}") print(f"Calling my_dynamic_pod_task_workflow()... {my_dynamic_pod_task_workflow()}") .. rst-class:: sphx-glr-timing **Total running time of the script:** ( 0 minutes 0.000 seconds) .. _sphx_glr_download_auto_integrations_kubernetes_pod_pod.py: .. only:: html .. container:: sphx-glr-footer sphx-glr-footer-example .. container:: sphx-glr-download sphx-glr-download-python :download:`Download Python source code: pod.py ` .. container:: sphx-glr-download sphx-glr-download-jupyter :download:`Download Jupyter notebook: pod.ipynb ` .. only:: html .. rst-class:: sphx-glr-signature `Gallery generated by Sphinx-Gallery `_