.. DO NOT EDIT. .. THIS FILE WAS AUTOMATICALLY GENERATED BY SPHINX-GALLERY. .. TO MAKE CHANGES, EDIT THE SOURCE PYTHON FILE: .. "auto/integrations/aws/athena/athena.py" .. LINE NUMBERS ARE GIVEN BELOW. .. only:: html .. note:: :class: sphx-glr-download-link-note Click :ref:`here ` to download the full example code .. rst-class:: sphx-glr-example-title .. _sphx_glr_auto_integrations_aws_athena_athena.py: Athena Query ############ This example shows how to use a Flyte AthenaTask to execute a query. .. GENERATED FROM PYTHON SOURCE LINES 7-12 .. code-block:: default from flytekit import kwtypes, task, workflow from flytekit.types.schema import FlyteSchema from flytekitplugins.athena import AthenaConfig, AthenaTask .. GENERATED FROM PYTHON SOURCE LINES 13-15 This is the world's simplest query. Note that in order for registration to work properly, you'll need to give your Athena task a name that's unique across your project/domain for your Flyte installation. .. GENERATED FROM PYTHON SOURCE LINES 15-31 .. code-block:: default athena_task_no_io = AthenaTask( name="sql.athena.no_io", inputs={}, query_template=""" select 1 """, output_schema_type=None, task_config=AthenaConfig(database="mnist"), ) @workflow def no_io_wf(): return athena_task_no_io() .. GENERATED FROM PYTHON SOURCE LINES 32-76 Of course, in real world applications we are usually more interested in using Athena to query a dataset. In this case we've populated our vaccinations table with the publicly available dataset `here `__. For a primer on how upload a dataset, checkout of the official `AWS docs `__. The data is formatted according to this schema: +----------------------------------------------+ | country (string) | +----------------------------------------------+ | iso_code (string) | +----------------------------------------------+ | date (string) | +----------------------------------------------+ | total_vaccinations (string) | +----------------------------------------------+ | people_vaccinated (string) | +----------------------------------------------+ | people_fully_vaccinated (string) | +----------------------------------------------+ | daily_vaccinations_raw (string) | +----------------------------------------------+ | daily_vaccinations (string) | +----------------------------------------------+ | total_vaccinations_per_hundred (string) | +----------------------------------------------+ | people_vaccinated_per_hundred (string) | +----------------------------------------------+ | people_fully_vaccinated_per_hundred (string) | +----------------------------------------------+ | daily_vaccinations_per_million (string) | +----------------------------------------------+ | vaccines (string) | +----------------------------------------------+ | source_name (string) | +----------------------------------------------+ | source_website (string) | +----------------------------------------------+ Let's look out how we can parameterize our query to filter results for a specific country, provided as a user input specifying a country iso code. We'll produce a FlyteSchema that we can use in downstream flyte tasks for further analysis or manipulation. Note that we cache this output data so we don't have to re-run the query in future workflow iterations should we decide to change how we manipulate data downstream .. GENERATED FROM PYTHON SOURCE LINES 76-96 .. code-block:: default athena_task_templatized_query = AthenaTask( name="sql.athena.w_io", # Define inputs as well as their types that can be used to customize the query. inputs=kwtypes(iso_code=str), task_config=AthenaConfig(database="vaccinations"), query_template=""" SELECT * FROM vaccinations where iso_code like {{ .inputs.iso_code }} """, # While we define a generic schema as the output here, Flyte supports more strongly typed schemas to provide # better compile-time checks for task compatibility. Refer to :py:class:`flytekit.FlyteSchema` for more details output_schema_type=FlyteSchema, # Cache the output data so we don't have to re-run the query in future workflow iterations # should we decide to change how we manipulate data downstream. # For more information about caching, visit :ref:`Task Caching ` cache=True, cache_version="1.0", ) .. GENERATED FROM PYTHON SOURCE LINES 97-98 Now we (trivially) clean up and interact with the data produced from the above Athena query in a separate Flyte task. .. GENERATED FROM PYTHON SOURCE LINES 98-108 .. code-block:: default @task def manipulate_athena_schema(s: FlyteSchema) -> FlyteSchema: df = s.open().all() return df[df.total_vaccinations.notnull()] @workflow def full_athena_wf(country_iso_code: str) -> FlyteSchema: demo_schema = athena_task_templatized_query(iso_code=country_iso_code) return manipulate_athena_schema(s=demo_schema) .. rst-class:: sphx-glr-timing **Total running time of the script:** ( 0 minutes 0.000 seconds) .. _sphx_glr_download_auto_integrations_aws_athena_athena.py: .. only:: html .. container:: sphx-glr-footer sphx-glr-footer-example .. container:: sphx-glr-download sphx-glr-download-python :download:`Download Python source code: athena.py ` .. container:: sphx-glr-download sphx-glr-download-jupyter :download:`Download Jupyter notebook: athena.ipynb ` .. only:: html .. rst-class:: sphx-glr-signature `Gallery generated by Sphinx-Gallery `_