.. DO NOT EDIT.
.. THIS FILE WAS AUTOMATICALLY GENERATED BY SPHINX-GALLERY.
.. TO MAKE CHANGES, EDIT THE SOURCE PYTHON FILE:
.. "auto/integrations/kubernetes/k8s_spark/pyspark_pi.py"
.. LINE NUMBERS ARE GIVEN BELOW.

.. only:: html

    .. note::
        :class: sphx-glr-download-link-note

        Click :ref:`here ` to download the full example code

.. rst-class:: sphx-glr-example-title

.. _sphx_glr_auto_integrations_kubernetes_k8s_spark_pyspark_pi.py:

.. _intermediate_using_spark_tasks:

Writing a PySpark Task
------------------------

Flyte has an optional plugin that runs `Apache Spark `_ jobs natively on your Kubernetes cluster.
The plugin has been used extensively at Lyft and is battle-tested.
It lets you run your PySpark code as a task (Scala/Java support is coming soon).
For each Spark execution, the plugin dynamically creates a new virtual cluster, and Flyte manages the execution and auto-scaling of the Spark job.

Spark in Flytekit
=================

For a more complete example, refer to :std:ref:`example-spark`.

#. Ensure you have ``flytekit>=0.16.0``.
#. Enable Spark in the backend, following the previous section.
#. Install the `flytekit spark plugin `_ ::

       pip install flytekitplugins-spark

#. Write regular PySpark code, with one change in the ``@task`` decorator. Refer to the example below:

   .. code-block:: python

       @task(
           task_config=Spark(
               # this configuration is applied to the spark cluster
               spark_conf={
                   "spark.driver.memory": "1000M",
                   "spark.executor.instances": "2",
                   "spark.driver.cores": "1",
               }
           ),
           cache_version="1",
           cache=True,
       )
       def hello_spark(partitions: int) -> float:
           ...
           sess = flytekit.current_context().spark_session
           # Regular PySpark code
           ...

#. Run it locally:

   .. code-block:: python

       hello_spark(partitions=10)

#. Use it in a workflow (check cookbook).
#. Run it on a remote cluster. To do this, you have to build the correct Dockerfile, as explained in :std:ref:`spark-docker-image`.
   You can also use the `Standard Dockerfile recommended by Spark `_.

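The ``spark_conf`` mapping in the ``@task`` decorator step above gives every Spark setting as a string.
The following is a minimal sketch of how such a mapping could be assembled from numeric inputs.
It assumes a hypothetical helper, ``make_spark_conf``, which is not part of flytekit or the Spark plugin; the helper name, its parameters, and its defaults are illustrative only.

.. code-block:: python

    from typing import Dict


    def make_spark_conf(
        driver_memory_mb: int = 1000,
        driver_cores: int = 1,
        executor_instances: int = 2,
    ) -> Dict[str, str]:
        """Hypothetical helper (not a flytekit API): build a string-valued ``spark_conf``."""
        settings = {
            "driver_memory_mb": driver_memory_mb,
            "driver_cores": driver_cores,
            "executor_instances": executor_instances,
        }
        # Reject non-positive values early instead of letting the Spark job fail later.
        for name, value in settings.items():
            if value <= 0:
                raise ValueError(f"{name} must be a positive integer, got {value}")
        # Every value is rendered as a string, mirroring the settings shown in the step above.
        return {
            "spark.driver.memory": f"{driver_memory_mb}M",
            "spark.executor.instances": str(executor_instances),
            "spark.driver.cores": str(driver_cores),
        }


    conf = make_spark_conf(executor_instances=4)
    print(conf)

The resulting dictionary could then be passed as ``Spark(spark_conf=conf)`` in the ``@task`` decorator, in place of the literal mapping.
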
Examples
========

.. _example-spark:

How Flytekit Simplifies Usage of PySpark in a User's Code
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

The task ``hello_spark`` runs on a new Spark cluster. When run locally, it uses a single-node, client-only cluster;
when run remotely, it spins up an arbitrarily sized cluster depending on the Spark configuration specified in ``spark_conf``.

This example also shows how a user can create two tasks that use different Docker images. For more information, refer to :any:`hosted_multi_images`.

.. GENERATED FROM PYTHON SOURCE LINES 67-74

.. code-block:: default

    import datetime
    import random
    from operator import add

    import flytekit
    from flytekit import Resources, task, workflow


.. GENERATED FROM PYTHON SOURCE LINES 75-76

The following import is required to configure a Spark server in Flyte:

.. GENERATED FROM PYTHON SOURCE LINES 76-79

.. code-block:: default

    from flytekitplugins.spark import Spark


.. GENERATED FROM PYTHON SOURCE LINES 80-86

Spark Task Sample
^^^^^^^^^^^^^^^^^

This example shows how a Spark task can be written simply by adding a ``@task(task_config=Spark(...)...)`` decorator.
Refer to the `Spark `__ class to understand the various configuration options.

.. GENERATED FROM PYTHON SOURCE LINES 86-119

.. code-block:: default

    @task(
        task_config=Spark(
            # this configuration is applied to the spark cluster
            spark_conf={
                "spark.driver.memory": "1000M",
                "spark.executor.memory": "1000M",
                "spark.executor.cores": "1",
                "spark.executor.instances": "2",
                "spark.driver.cores": "1",
            }
        ),
        limits=Resources(mem="2000M"),
        cache_version="1",
    )
    def hello_spark(partitions: int) -> float:
        print("Starting Spark with Partitions: {}".format(partitions))

        # Total number of random points to sample across all partitions
        n = 100000 * partitions
        sess = flytekit.current_context().spark_session
        # Count how many sampled points fall inside the unit circle
        count = (
            sess.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
        )
        # The fraction of points inside the circle approximates pi / 4
        pi_val = 4.0 * count / n
        print("Pi val is :{}".format(pi_val))
        return pi_val


    def f(_):
        # Sample a point in the square [-1, 1) x [-1, 1); return 1 if it lies in the unit circle
        x = random.random() * 2 - 1
        y = random.random() * 2 - 1
        return 1 if x**2 + y**2 <= 1 else 0


.. GENERATED FROM PYTHON SOURCE LINES 120-121

This is a regular Python function task. It will not execute on the Spark cluster.

.. GENERATED FROM PYTHON SOURCE LINES 121-127

.. code-block:: default

    @task(cache_version="1")
    def print_every_time(value_to_print: float, date_triggered: datetime.datetime) -> int:
        print("My printed value: {} @ {}".format(value_to_print, date_triggered))
        return 1


.. GENERATED FROM PYTHON SOURCE LINES 128-129

The workflow shows that a Spark task and any Python function task (or any other task type) can be chained together as long as their parameter specifications match.

.. GENERATED FROM PYTHON SOURCE LINES 129-140

.. code-block:: default

    @workflow
    def my_spark(triggered_date: datetime.datetime) -> float:
        """
        Using this workflow is the same as using any other workflow. Because the image is a property of the task,
        the workflow does not care how the image is configured.
        """
        pi = hello_spark(partitions=50)
        print_every_time(value_to_print=pi, date_triggered=triggered_date)
        return pi


.. GENERATED FROM PYTHON SOURCE LINES 141-142

Workflows with Spark tasks can be executed locally.
Some aspects of Spark, like links to Hive metastores, may not work, but these are limitations of using Spark and are not introduced by Flyte.

.. GENERATED FROM PYTHON SOURCE LINES 142-151

.. code-block:: default

    if __name__ == "__main__":
        """
        NOTE: To run a multi-image workflow locally, all dependencies of all the tasks should be installed;
        ignoring this may result in local runtime failures.
        """
        print(f"Running {__file__} main...")
        print(
            f"Running my_spark(triggered_date=datetime.datetime.now()) {my_spark(triggered_date=datetime.datetime.now())}"
        )


.. rst-class:: sphx-glr-timing

   **Total running time of the script:** ( 0 minutes 0.000 seconds)


.. _sphx_glr_download_auto_integrations_kubernetes_k8s_spark_pyspark_pi.py:

.. only:: html

  .. container:: sphx-glr-footer sphx-glr-footer-example

    .. container:: sphx-glr-download sphx-glr-download-python

      :download:`Download Python source code: pyspark_pi.py `

    .. container:: sphx-glr-download sphx-glr-download-jupyter

      :download:`Download Jupyter notebook: pyspark_pi.ipynb `


.. only:: html

 .. rst-class:: sphx-glr-signature

    `Gallery generated by Sphinx-Gallery `_