.. DO NOT EDIT.
.. THIS FILE WAS AUTOMATICALLY GENERATED BY SPHINX-GALLERY.
.. TO MAKE CHANGES, EDIT THE SOURCE PYTHON FILE:
.. "auto/integrations/kubernetes/k8s_spark/dataframe_passing.py"
.. LINE NUMBERS ARE GIVEN BELOW.

.. rst-class:: sphx-glr-example-title

.. _sphx_glr_auto_integrations_kubernetes_k8s_spark_dataframe_passing.py:

.. _intermediate_spark_dataframes_passing:

Converting a Spark DataFrame to a Pandas DataFrame
==================================================

In this example, you will see how a Spark DataFrame can be returned from one task and consumed as a pandas DataFrame in another task.
If the DataFrame does not fit in memory, the conversion will fail at runtime.

.. GENERATED FROM PYTHON SOURCE LINES 12-13

Let's import the libraries.

.. GENERATED FROM PYTHON SOURCE LINES 13-24

.. code-block:: default

    import flytekit
    import pandas
    from flytekit import Resources, kwtypes, task, workflow
    from flytekit.types.structured.structured_dataset import StructuredDataset
    from flytekitplugins.spark import Spark

    try:
        from typing import Annotated
    except ImportError:
        from typing_extensions import Annotated

.. GENERATED FROM PYTHON SOURCE LINES 25-26

We define two column types: ``name: str`` and ``age: int``.

.. GENERATED FROM PYTHON SOURCE LINES 26-28

.. code-block:: default

    columns = kwtypes(name=str, age=int)

.. GENERATED FROM PYTHON SOURCE LINES 29-30

We define a task that returns a Spark DataFrame.

.. GENERATED FROM PYTHON SOURCE LINES 30-61

.. code-block:: default

    @task(
        task_config=Spark(
            spark_conf={
                "spark.driver.memory": "1000M",
                "spark.executor.memory": "1000M",
                "spark.executor.cores": "1",
                "spark.executor.instances": "2",
                "spark.driver.cores": "1",
            }
        ),
        limits=Resources(mem="2000M"),
        cache_version="1",
    )
    def create_spark_df() -> Annotated[StructuredDataset, columns]:
        """
        This task returns a Spark dataset that conforms to the defined schema.
        Failure to do so should result in a runtime error.
        TODO: runtime error enforcement
        """
        sess = flytekit.current_context().spark_session
        return StructuredDataset(
            dataframe=sess.createDataFrame(
                [
                    ("Alice", 5),
                    ("Bob", 10),
                    ("Charlie", 15),
                ],
                ["name", "age"],
            )
        )

.. GENERATED FROM PYTHON SOURCE LINES 62-67

``create_spark_df`` is a Spark task that runs within a Spark context (and therefore requires a running Spark cluster).
Notice that the task builds a ``pyspark.DataFrame`` and returns it wrapped in the declared ``StructuredDataset`` return type.
A ``StructuredDataset`` is an abstract representation of a DataFrame that can be materialized in different DataFrame formats; the flytekit type system takes care of converting between them.

.. GENERATED FROM PYTHON SOURCE LINES 69-70

We also define a task that consumes the Spark DataFrame.

.. GENERATED FROM PYTHON SOURCE LINES 70-76

.. code-block:: default

    @task(cache_version="1")
    def sum_of_all_ages(s: Annotated[StructuredDataset, columns]) -> int:
        df: pandas.DataFrame = s.open(pandas.DataFrame).all()
        return int(df["age"].sum())

.. GENERATED FROM PYTHON SOURCE LINES 77-80

The task ``sum_of_all_ages`` receives a parameter of type ``StructuredDataset``.
The ``open`` method specifies the DataFrame format to materialize, ``pandas.DataFrame`` in our case; the data is loaded into memory (or downloaded, when running remotely) only when ``all`` is called.
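Because a ``StructuredDataset`` is not tied to Spark, the consuming task can also be exercised without a Spark session.
The snippet below is an illustrative sketch, not part of the generated example: it assumes local task execution and flytekit's built-in pandas support for ``StructuredDataset``, and it reuses the imports and the ``sum_of_all_ages`` task defined above.

.. code-block:: default

    # Illustrative local check only (hypothetical, not in the original script):
    # wrap a small pandas DataFrame in a StructuredDataset and call the task
    # directly, relying on flytekit's default pandas encoder/decoder.
    sample_df = pandas.DataFrame({"name": ["Alice", "Bob", "Charlie"], "age": [5, 10, 15]})
    assert sum_of_all_ages(s=StructuredDataset(dataframe=sample_df)) == 30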
.. GENERATED FROM PYTHON SOURCE LINES 82-83

Finally, we define the workflow.

.. GENERATED FROM PYTHON SOURCE LINES 83-93

.. code-block:: default

    @workflow
    def my_smart_structured_dataset() -> int:
        """
        This workflow shows how a DataFrame can be created in a Spark task, passed to a
        downstream Python task, and accessed there as a pandas DataFrame. A StructuredDataset
        is an abstract DataFrame that is not tied to a specific in-memory representation.
        """
        df = create_spark_df()
        return sum_of_all_ages(s=df)

.. GENERATED FROM PYTHON SOURCE LINES 94-96

The workflow can connect ``create_spark_df`` to ``sum_of_all_ages`` because the return type of the first task matches the parameter type of the second.

.. GENERATED FROM PYTHON SOURCE LINES 98-99

This program can also be executed locally, which makes it easier for end users to work across disparate DataFrame technologies.

.. GENERATED FROM PYTHON SOURCE LINES 99-103

.. code-block:: default

    if __name__ == "__main__":
        print(f"Running {__file__} main...")
        print(f"Running my_smart_structured_dataset() -> {my_smart_structured_dataset()}")

.. GENERATED FROM PYTHON SOURCE LINES 104-108

.. note::

   New DataFrame technologies can be dynamically loaded into flytekit's TypeEngine.
   To register a custom DataFrame type, define an encoder and a decoder for ``StructuredDataset`` as outlined in the :ref:`structured_dataset_example` example.
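As an illustration of this pluggability, the sketch below is hypothetical and not part of the generated example: it assumes that the installed ``flytekitplugins-spark`` version registers ``StructuredDataset`` handlers for ``pyspark.sql.DataFrame``.
Under that assumption, a downstream Spark task can open the same dataset as a Spark DataFrame instead of pandas, keeping the aggregation on the cluster.

.. code-block:: default

    from pyspark.sql import DataFrame as SparkDataFrame

    # Hypothetical variant of sum_of_all_ages: materialize the StructuredDataset
    # as a pyspark DataFrame (assumes the Spark plugin's StructuredDataset
    # handlers are registered) and aggregate with Spark instead of pandas.
    @task(
        task_config=Spark(spark_conf={"spark.driver.memory": "1000M"}),
        limits=Resources(mem="2000M"),
    )
    def spark_sum_of_all_ages(s: Annotated[StructuredDataset, columns]) -> int:
        spark_df = s.open(SparkDataFrame).all()
        return int(spark_df.agg({"age": "sum"}).collect()[0][0])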