{"id":30950,"date":"2021-09-06T07:33:46","date_gmt":"2021-09-06T06:33:46","guid":{"rendered":"https:\/\/www.inovex.de\/?p=30950"},"modified":"2022-11-21T09:22:37","modified_gmt":"2022-11-21T08:22:37","slug":"fully-managing-databricks-from-airflow-using-custom-operators","status":"publish","type":"post","link":"https:\/\/www.inovex.de\/de\/blog\/fully-managing-databricks-from-airflow-using-custom-operators\/","title":{"rendered":"Fully Managing Databricks from Airflow using Custom Operators"},"content":{"rendered":"<p>In this article we will explain how to use Airflow to orchestrate data processing applications built on Databricks beyond the provided functionality of the <span class=\"lang:default decode:true crayon-inline \">DatabricksSubmitRunOperator<\/span> and <span class=\"lang:default decode:true crayon-inline \">DatabricksRunNowOperator<\/span>. We will create custom Airflow operators that use the <span class=\"lang:default decode:true crayon-inline \">DatabricksHook<\/span>\u00a0 to make API calls so that we can manage the entire Databricks Workspace out of Airflow. As an example use case we want to create an Airflow sensor that listens for a specific file in our storage account. 
When it arrives, an all-purpose cluster gets created, a couple of processing jobs are submitted and a final (small) result will be retrieved before the all-purpose cluster is terminated and deleted.<!--more--><\/p>\n
<h2><span class=\"ez-toc-section\" id=\"Prerequisites\"><\/span>Prerequisites<span class=\"ez-toc-section-end\"><\/span><\/h2>\n<p>We assume the reader to be familiar with the following topics:<\/p>\n<ul>\n<li>Python Basics incl. Object Oriented Programming and API requests.<\/li>\n<li>Airflow Basics incl. Operator Extension, Hooks, Sensors, Templating, Providers and XComs. 
See, for instance, this excellent introduction by <a href=\"https:\/\/michal.karzynski.pl\/blog\/2017\/03\/19\/developing-workflows-with-apache-airflow\" target=\"_blank\" rel=\"noopener\">Michal Karzynski<\/a>.<\/li>\n<li><a href=\"https:\/\/docs.databricks.com\/workspace\/index.html\">Databricks Workspace<\/a><\/li>\n<\/ul>\n<h2><span class=\"ez-toc-section\" id=\"Motivation\"><\/span>Motivation<span class=\"ez-toc-section-end\"><\/span><\/h2>\n<p>When building big data processing applications that work with batch data, a common technology stack is the combination of Apache Spark as the distributed processing engine and Apache Airflow as the scheduler. Spark abstracts most of the complexity involved in distributed computing, while Airflow provides a powerful scheduler that lets you define complex dependencies between tasks in a Directed Acyclic Graph (DAG) with task-specific policies for retries, on-failure behaviour, etc. Out of the box, Airflow also comes with a powerful monitoring UI.<\/p>\n<p>When building a Spark application that runs on any major cloud platform (Azure, AWS, GCP), a natural choice for a Spark service is Databricks. Databricks describes itself as a &#8220;Unified Analytics Platform&#8221; bringing together (Big) Data Engineering, Data Science and Business Intelligence in a single service. Although it has many features such as MLflow integration, Delta Lake integration, interactive notebook sessions, a simple job scheduler, a REST API and more, at its core it is a managed Spark service that takes care of the cumbersome process of creating and configuring multiple machines into a Spark cluster in the cloud.<\/p>\n<p>Since Databricks&#8217; own scheduler is quite simple (comparable to a Linux cron job) and limited to Databricks jobs only, it is not suited to orchestrating complex data pipelines that use multiple cloud services, task dependencies and task-specific behavior. 
Therefore, Databricks also provides the Airflow operators <span class=\"lang:default decode:true crayon-inline \">DatabricksSubmitRunOperator<\/span> and <span class=\"lang:default decode:true crayon-inline \">DatabricksRunNowOperator<\/span>, which use its REST API to <a href=\"https:\/\/docs.databricks.com\/dev-tools\/data-pipelines.html\">trigger Databricks jobs from Airflow<\/a>.<\/p>\n<p>While the first one takes a full <a href=\"https:\/\/docs.databricks.com\/dev-tools\/api\/latest\/jobs.html#runs-submit\">JSON job definition<\/a> for the execution, the second one triggers an existing job in the workspace. Although this allows using Databricks in combination with Airflow, it still does not cover the full spectrum of use cases that complex data pipelines usually involve, such as getting small query results to be sent out via e-mail, triggering a pipeline based on an event, or starting an all-purpose cluster, executing multiple smaller jobs on it and terminating it afterwards.<\/p>\n<p>For all of these use cases (and many others) we can implement custom Airflow operators that leverage the powerful Databricks API using the <span class=\"lang:default decode:true crayon-inline \">DatabricksHook<\/span>. In this article we will describe a generic way to write custom Databricks operators and build an example data pipeline of specialized custom Databricks operators that address all of the above-mentioned use cases. 
The following figure shows the final DAG we want to arrive at.<\/p>\n<figure id=\"attachment_31059\" aria-describedby=\"caption-attachment-31059\" style=\"width: 780px\" class=\"wp-caption alignnone\"><img loading=\"lazy\" decoding=\"async\" class=\"size-full wp-image-31059\" src=\"https:\/\/www.inovex.de\/wp-content\/uploads\/airflow_final_dag_symm.png\" alt=\"Final Airflow DAG using custom operators\" width=\"780\" height=\"381\" srcset=\"https:\/\/www.inovex.de\/wp-content\/uploads\/airflow_final_dag_symm.png 780w, https:\/\/www.inovex.de\/wp-content\/uploads\/airflow_final_dag_symm-300x147.png 300w, https:\/\/www.inovex.de\/wp-content\/uploads\/airflow_final_dag_symm-768x375.png 768w, https:\/\/www.inovex.de\/wp-content\/uploads\/airflow_final_dag_symm-400x195.png 400w, https:\/\/www.inovex.de\/wp-content\/uploads\/airflow_final_dag_symm-360x176.png 360w\" sizes=\"auto, (max-width: 780px) 100vw, 780px\" \/><figcaption id=\"caption-attachment-31059\" class=\"wp-caption-text\">Final Airflow DAG using custom operators<\/figcaption><\/figure>\n<h2><span class=\"ez-toc-section\" id=\"Installation\"><\/span>Installation<span class=\"ez-toc-section-end\"><\/span><\/h2>\n<p>We will be using Python 3.7 from the <a href=\"https:\/\/www.anaconda.com\/products\/individual\">anaconda distribution<\/a> and Airflow 2.1.0 with the Databricks extension.<\/p>\n<pre class=\"theme:vs2012 lang:default decode:true\">conda create -n airflow_databricks python=3.7\r\nconda activate airflow_databricks\r\npip install apache-airflow[databricks]==2.1.0<\/pre>\n<p>A Databricks Workspace must be set up at the cloud provider of your choice. 
We will use Azure Databricks (as of 07\/2021) throughout this tutorial.<\/p>\n<p>This should be straightforward, but in case you need assistance you can find a short tutorial <a href=\"https:\/\/kyleake.medium.com\/how-to-create-and-deploy-a-databricks-workspace-using-the-azure-portal-ep-3-180c6f6d941f\">here<\/a>.<\/p>\n<h3><span class=\"ez-toc-section\" id=\"Setup-local-Airflow\"><\/span>Setup local Airflow<span class=\"ez-toc-section-end\"><\/span><\/h3>\n<p>From the official documentation:<\/p>\n<pre class=\"theme:vs2012 lang:sh decode:true\" title=\"Local airflow setup\"># initialize the database\r\nairflow db init\r\n\r\nairflow users create \\\r\n--username admin \\\r\n--firstname Peter \\\r\n--lastname Parker \\\r\n--role Admin \\\r\n--email spiderman@inovex.de   # ;)\r\n\r\n# start the web server, default port is 8080\r\nairflow webserver --port 8080\r\n\r\n# start the scheduler\r\n# open a new terminal or else run webserver with ``-D`` option to run it as a daemon\r\nairflow scheduler\r\n\r\n# visit localhost:8080 in the browser and use the admin account you just created<\/pre>\n<p>By default, this sets up Airflow in the <span class=\"lang:default decode:true crayon-inline \">~\/airflow<\/span> folder of your home directory. 
Alternatively, you can point the <span class=\"lang:default decode:true crayon-inline \">AIRFLOW_HOME<\/span> environment variable to another path before running these commands: <span class=\"lang:default decode:true crayon-inline \">export AIRFLOW_HOME=my\/path\/<\/span><\/p>\n<p>As another option you may use a <a href=\"https:\/\/airflow.apache.org\/docs\/apache-airflow\/stable\/start\/docker.html\">Docker container<\/a>.<\/p>\n<h2><span class=\"ez-toc-section\" id=\"Authorisation\"><\/span>Authorisation<span class=\"ez-toc-section-end\"><\/span><\/h2>\n<p>The simplest way for a given user to authenticate against the Databricks API is to generate a personal access token (PAT) in the Databricks workspace using the web UI:<\/p>\n<p>[ <em><strong>WORKSPACE_NAME<\/strong><\/em> ] <i>\u2192 <\/i><strong><em>User Settings<\/em><\/strong> (in the top right corner).<\/p>\n<figure id=\"attachment_30968\" aria-describedby=\"caption-attachment-30968\" style=\"width: 734px\" class=\"wp-caption alignnone\"><img loading=\"lazy\" decoding=\"async\" class=\"wp-image-30968 size-full\" src=\"https:\/\/www.inovex.de\/wp-content\/uploads\/db1.png\" alt=\"Generate a personal access token in your Databricks workspace\" width=\"734\" height=\"209\" srcset=\"https:\/\/www.inovex.de\/wp-content\/uploads\/db1.png 734w, https:\/\/www.inovex.de\/wp-content\/uploads\/db1-300x85.png 300w, https:\/\/www.inovex.de\/wp-content\/uploads\/db1-400x114.png 400w, https:\/\/www.inovex.de\/wp-content\/uploads\/db1-360x103.png 360w\" sizes=\"auto, (max-width: 734px) 100vw, 734px\" \/><figcaption id=\"caption-attachment-30968\" class=\"wp-caption-text\">Generate PAT in Databricks<\/figcaption><\/figure>\n<p>The token must be stored as an Airflow connection so that it can be accessed securely later.<\/p>\n<p>In the Airflow UI, go to <i><strong>Admin<\/strong> \u2192 <strong>Connections<\/strong><\/i>, select <span class=\"lang:default decode:true crayon-inline \">databricks_default<\/span> and fill in the form as 
follows:<\/p>\n<figure id=\"attachment_30984\" aria-describedby=\"caption-attachment-30984\" style=\"width: 618px\" class=\"wp-caption alignnone\"><img loading=\"lazy\" decoding=\"async\" class=\"size-full wp-image-30984\" src=\"https:\/\/www.inovex.de\/wp-content\/uploads\/databricks_connection_2.png\" alt=\"Airflow connection for Databricks\" width=\"618\" height=\"710\" srcset=\"https:\/\/www.inovex.de\/wp-content\/uploads\/databricks_connection_2.png 618w, https:\/\/www.inovex.de\/wp-content\/uploads\/databricks_connection_2-261x300.png 261w, https:\/\/www.inovex.de\/wp-content\/uploads\/databricks_connection_2-400x460.png 400w, https:\/\/www.inovex.de\/wp-content\/uploads\/databricks_connection_2-360x414.png 360w\" sizes=\"auto, (max-width: 618px) 100vw, 618px\" \/><figcaption id=\"caption-attachment-30984\" class=\"wp-caption-text\">Creating a new Airflow connection for Databricks<\/figcaption><\/figure>\n<p>Additional connections can be added via <strong>Admin<\/strong> \u2192 <strong>Connections<\/strong> \u2192 \u00a0<strong>+ <\/strong>. Make sure to select &#8222;Databricks&#8220; as the connection type.<\/p>\n<h2><span class=\"ez-toc-section\" id=\"Generic-Databricks-Operators\"><\/span>Generic Databricks Operators<span class=\"ez-toc-section-end\"><\/span><\/h2>\n<p>In order to be able to create custom operators that allow us to orchestrate Databricks, we must create an Airflow operator that inherits from Airflow&#8217;s <a href=\"https:\/\/airflow.apache.org\/docs\/apache-airflow\/stable\/_api\/airflow\/models\/baseoperator\/index.html\"><span class=\"lang:default decode:true crayon-inline\">BaseOperator<\/span><\/a>\u00a0 class. Therefore, let&#8217;s create a new module <em>inovex_databricks_operators.py<\/em> at <span class=\"lang:default decode:true crayon-inline \">airflow\/plugins\/operators<\/span>\u00a0. Each custom operator must implement at least an <span class=\"lang:python decode:true crayon-inline\">execute()<\/span>\u00a0 method. 
Additionally, one will usually implement a constructor to allow for flexible connection IDs and additional parameters. In the case of our Databricks operators we will be passing a request type, an endpoint URL and a dictionary acting as the request body for the API calls.<\/p>\n<p>Have a look at the following example of a generic custom operator: The operator takes an HTTP request type (e.g. <span class=\"lang:default decode:true crayon-inline \">POST<\/span>) and an endpoint URL as strings as well as an optional dictionary containing the request body and calls the Databricks API. Additional <span class=\"lang:default decode:true crayon-inline \">BaseOperator<\/span> arguments can be passed as keyword arguments.<\/p>\n<pre class=\"theme:vs2012 lang:python decode:true\">import json\r\nfrom typing import Dict, Optional\r\n\r\nfrom airflow.models import BaseOperator\r\nfrom airflow.providers.databricks.hooks.databricks import DatabricksHook\r\n\r\n\r\nclass GenericDatabricksOperator(BaseOperator):\r\n    \"\"\"Send an arbitrary request to the Databricks REST API.\"\"\"\r\n\r\n    # request_body is rendered as a template before execute() is called\r\n    template_fields = (\"request_body\",)\r\n\r\n    def __init__(\r\n        self,\r\n        request_type: str,\r\n        endpoint: str,\r\n        request_body: Optional[Dict] = None,\r\n        databricks_conn_id: str = \"databricks_default\",\r\n        **kwargs,\r\n    ):\r\n        super().__init__(**kwargs)\r\n\r\n        self.request_type = request_type\r\n        self.endpoint = endpoint\r\n        self.request_body = request_body or {}\r\n        self.databricks_conn_id = databricks_conn_id\r\n\r\n    def execute(self, context):\r\n        databricks_hook = DatabricksHook(databricks_conn_id=self.databricks_conn_id)\r\n        self.log.info(\"Submitting %s to %s\", self.request_type, self.endpoint)\r\n        responds = databricks_hook._do_api_call(\r\n            (self.request_type, self.endpoint), self.request_body\r\n        )\r\n        self.log.info(\"Response: %s\", json.dumps(responds))\r\n\r\n        # the returned value is pushed to XCom if do_xcom_push is enabled\r\n        return 
responds\r\n\r\n<\/pre>\n<p>Inside the <span class=\"lang:python decode:true crayon-inline \">execute()<\/span> method, we create an instance of the <span class=\"lang:default decode:true crayon-inline \">DatabricksHook<\/span> using the Databricks connection ID.<\/p>\n<p>The hook has a <span class=\"lang:python decode:true crayon-inline\">_do_api_call()<\/span> method which retrieves the credentials from the Airflow connection and makes API calls to Databricks using the third-party <span class=\"lang:default decode:true crayon-inline \">requests<\/span> library.<\/p>\n<p>The method takes two required arguments to build the request:<\/p>\n<ol>\n<li>A tuple of strings containing the HTTP request type (e.g. <span class=\"lang:default decode:true crayon-inline\">&quot;POST&quot;<\/span>, <span class=\"lang:default decode:true crayon-inline\">&quot;GET&quot;<\/span>) and the API endpoint (starting with <span class=\"lang:default decode:true crayon-inline\">&quot;api\/2.0\/&quot;<\/span>)<\/li>\n<li>A dictionary containing the request body expected by the endpoint. Since <span class=\"lang:default decode:true crayon-inline \">request_body<\/span> is a templated field of our operator, it can contain <a href=\"https:\/\/airflow.apache.org\/docs\/apache-airflow\/stable\/macros-ref.html\">Airflow macros<\/a> that will be rendered by the operator.<\/li>\n<\/ol>\n<p>You can find the required parameters for each possible API call in the <a href=\"https:\/\/docs.databricks.com\/dev-tools\/api\/latest\/index.html#apis\">Databricks API documentation<\/a>. Note that the leading underscore marks <span class=\"lang:python decode:true crayon-inline \">_do_api_call<\/span> as a protected method by convention, so it is not part of the hook&#8217;s public interface and may change between provider releases.<\/p>\n<p>This is basically it. 
With this operator you can build very flexible Databricks data pipelines.<br \/>\nAs an example, see the following DAG with a single task that starts an existing cluster.<\/p>\n<pre class=\"theme:vs2012 lang:python decode:true\">from datetime import datetime\r\n\r\nfrom airflow import DAG\r\n\r\nfrom inovex_databricks_operators import GenericDatabricksOperator\r\n\r\n\r\ndefault_args = {\"owner\": \"inovex\", \"retries\": 0, \"start_date\": datetime(2021, 1, 1)}\r\n\r\nwith DAG(\"GenericExample\", schedule_interval=None, default_args=default_args) as dag:\r\n    # Start an all-purpose cluster that we have already created manually,\r\n    # identified by its ID. We get the request structure from:\r\n    # https:\/\/docs.databricks.com\/dev-tools\/api\/latest\/clusters.html#start\r\n    start_existing_cluster = GenericDatabricksOperator(\r\n        task_id=\"start_existing_cluster\",\r\n        request_type=\"POST\",\r\n        endpoint=\"api\/2.0\/clusters\/start\",\r\n        request_body={\"cluster_id\": \"YOUR-CLUSTER-ID\"},\r\n        do_xcom_push=True,  # this way we could process the response in another task\r\n    )\r\n\r\n<\/pre>\n<p>Next, let&#8217;s have a look at some use cases for which we will create specialized operators.<\/p>\n<h2><span class=\"ez-toc-section\" id=\"Implementations-for-Specific-Use-Cases\"><\/span>Implementations for Specific Use Cases<span class=\"ez-toc-section-end\"><\/span><\/h2>\n<h3><span class=\"ez-toc-section\" id=\"Getting-Query-Results\"><\/span>Getting Query Results<span class=\"ez-toc-section-end\"><\/span><\/h3>\n<p>Querying a table, e.g. to send the result to some user via e-mail for reporting or monitoring, is a common task in many ETL workflows. 
This use case requires extracting some return value from Databricks into Airflow so that it can be sent out using the <span class=\"lang:default decode:true crayon-inline \">EmailOperator<\/span>\u00a0.<\/p>\n<p>Via the Databricks Job API, the results returned from the job run of a notebook can be retrieved by using the <span class=\"lang:default decode:true crayon-inline \">2.0\/jobs\/runs\/get-output<\/span>\u00a0 endpoint. The request must specify the run ID of the executed job. Values to be returned are passed to <span class=\"lang:python decode:true crayon-inline \">dbutils.notebook.exit()<\/span>\u00a0 as a string at the end of the Databricks notebook.<\/p>\n<figure id=\"attachment_31024\" aria-describedby=\"caption-attachment-31024\" style=\"width: 780px\" class=\"wp-caption alignnone\"><img loading=\"lazy\" decoding=\"async\" class=\"size-full wp-image-31024\" src=\"https:\/\/www.inovex.de\/wp-content\/uploads\/databricks_notebook_result.png\" alt=\"Return Databricks notebook result\" width=\"780\" height=\"445\" srcset=\"https:\/\/www.inovex.de\/wp-content\/uploads\/databricks_notebook_result.png 780w, https:\/\/www.inovex.de\/wp-content\/uploads\/databricks_notebook_result-300x171.png 300w, https:\/\/www.inovex.de\/wp-content\/uploads\/databricks_notebook_result-768x438.png 768w, https:\/\/www.inovex.de\/wp-content\/uploads\/databricks_notebook_result-400x228.png 400w, https:\/\/www.inovex.de\/wp-content\/uploads\/databricks_notebook_result-360x205.png 360w\" sizes=\"auto, (max-width: 780px) 100vw, 780px\" \/><figcaption id=\"caption-attachment-31024\" class=\"wp-caption-text\">Return Databricks notebook result<\/figcaption><\/figure>\n<p>As you can see in the code snippet below, we construct the <span class=\"lang:default decode:true crayon-inline \">GetDatabricksResultOperator<\/span> with a connection ID for Databricks and the task ID of a specific <span class=\"lang:default decode:true crayon-inline \">DatabricksRunNowOperator<\/span>\u00a0or <span 
class=\"lang:default decode:true crayon-inline \">DatabricksSubmitRunOperator<\/span> task instance. Make sure that the task pushes the run ID to xcom by setting the\u00a0 run operator\u2019s <span class=\"lang:default decode:true crayon-inline \">do_xcom_push<\/span> parameter to <span class=\"lang:python decode:true crayon-inline \">True<\/span>\u00a0.<\/p>\n<p>On execution, we retrieve the run ID of the specified task from xcom. We then inject that <span class=\"lang:default decode:true crayon-inline \">run_id<\/span> into the endpoint and execute the API call. From the response we retrieve the result and push it to xcom and thereby make it accessible to other tasks in a DAG (e.g. to an instance of the E-Mail operator).<\/p>\n<pre class=\"theme:vs2012 lang:python decode:true\" title=\"Custom operator for getting notebook results\">from airflow.models import BaseOperator\r\nfrom airflow.providers.databricks.hooks.databricks import DatabricksHook\r\n\r\nclass GetDatabricksResultOperator(BaseOperator):\r\n    def __init__(\r\n        self, run_task_id: str, databricks_conn_id: str = \"databricks_default\", **kwargs\r\n    ):\r\n\r\n        super().__init__(**kwargs)\r\n        self.run_task_id = run_task_id\r\n        self.databricks_conn_id = databricks_conn_id\r\n\r\n    def execute(self, context):\r\n        databricks_hook = DatabricksHook(databricks_conn_id=self.databricks_conn_id)\r\n        run_id = context[\"task_instance\"].xcom_pull(self.run_task_id, key=\"run_id\")\r\n        responds = databricks_hook._do_api_call(\r\n            (\"GET\", f\"api\/2.0\/jobs\/runs\/get-output?run_id={run_id}\"), {}\r\n        )\r\n        result = responds[\"notebook_output\"][\"result\"]\r\n        self.log.info(\"Result:\")\r\n        self.log.info(result)\r\n\r\n        return result\r\n\r\n<\/pre>\n<h3><span class=\"ez-toc-section\" id=\"Triggering-a-Pipeline-Based-on-an-Event\"><\/span>Triggering a Pipeline Based on an Event<span 
class=\"ez-toc-section-end\"><\/span><\/h3>\n<p>It is common to trigger a workflow on the occurrence of an external event, e.g. the appearance of a <span class=\"lang:default decode:true crayon-inline \">_SUCCESS<\/span> file in a storage bucket. While we could use some serverless cloud function service for that, it would require us to have this additional service available and configured, add another dependency to the application and make it less platform-agnostic. A simple alternative would be an Airflow sensor that listens for a file at a specified path and triggers the respective jobs.<\/p>\n<p>Our <span class=\"lang:default decode:true crayon-inline \">DatabricksFileExistsSensor<\/span> inherits from Airflow\u2019s <span class=\"lang:default decode:true crayon-inline \">BaseSensorOperator<\/span> and is initialized with a file path. On <span class=\"lang:python decode:true crayon-inline \">poke()<\/span>, a request is sent to the <span class=\"lang:default decode:true crayon-inline \">2.0\/dbfs\/get-status<\/span> endpoint, passing that file path as the <span class=\"lang:default decode:true crayon-inline \">path<\/span> parameter.<\/p>\n<p>Because the API call raises an exception if the specified path does not exist, we need to catch the exception with the error code <span class=\"lang:default decode:true crayon-inline \">&quot;RESOURCE_DOES_NOT_EXIST&quot;<\/span> and continue poking by returning <span class=\"lang:python decode:true crayon-inline \">False<\/span>. Any other exception is re-raised so that the task fails. When the request is successful, i.e. 
not throwing the exception, we complete the task by returning <span class=\"lang:python decode:true crayon-inline \">True<\/span>.<\/p>\n<pre class=\"lang:python decode:true\">from airflow.exceptions import AirflowException\r\nfrom airflow.providers.databricks.hooks.databricks import DatabricksHook\r\nfrom airflow.sensors.base import BaseSensorOperator\r\n\r\n\r\nclass DatabricksFileExistsSensor(BaseSensorOperator):\r\n    def __init__(\r\n        self, file_path: str, databricks_conn_id: str = \"databricks_default\", **kwargs\r\n    ):\r\n        super().__init__(**kwargs)\r\n        self.file_path = file_path\r\n        self.databricks_conn_id = databricks_conn_id\r\n\r\n    def poke(self, context):\r\n        databricks_hook = DatabricksHook(databricks_conn_id=self.databricks_conn_id)\r\n\r\n        try:\r\n            databricks_hook._do_api_call(\r\n                (\"GET\", \"api\/2.0\/dbfs\/get-status\"), {\"path\": self.file_path}\r\n            )\r\n        except AirflowException as e:\r\n            # a missing path is reported with the error code RESOURCE_DOES_NOT_EXIST\r\n            if '\"error_code\":\"RESOURCE_DOES_NOT_EXIST\"' in str(e):\r\n                self.log.info(\"No file found at %s\", self.file_path)\r\n                return False  # keep poking\r\n            raise  # any other error should fail the task\r\n\r\n        self.log.info(\"File %s found\", self.file_path)\r\n        return True\r\n\r\n<\/pre>\n<h3><span class=\"ez-toc-section\" id=\"Starting-All-Purpose-Cluster-Execute-Jobs-and-Terminate\"><\/span>Starting All Purpose Cluster, Execute Jobs and Terminate<span class=\"ez-toc-section-end\"><\/span><\/h3>\n<p>Last but not least, using the provided Databricks operators as they are for job execution has a fundamental drawback. Each Airflow task is executed as an individual Databricks job. In Databricks, each job either starts and shuts down a new job cluster or uses a predefined all-purpose cluster (identified by its ID in the job definition). 
For a workflow composed of many small jobs that run for 1 or 2 minutes each, the overhead of creating and deleting a job cluster for each task is disproportionately high compared to the jobs\u2019 execution times. On the other hand, using an all-purpose cluster requires manually creating that cluster and injecting the cluster ID into the job definitions. The termination of the cluster would have to be managed manually or by an auto-termination configuration (which will, more often than not, be set too long or too short and thus be wasteful).<\/p>\n<p>None of this is satisfactory for the efficient automation of complex workflows. What we would like to have instead is a way of starting a defined all-purpose cluster automatically at the beginning of the DAG, executing all jobs and finally terminating and deleting it.<\/p>\n<p>Note that using an all-purpose cluster instead of a job cluster is approx. <a href=\"https:\/\/databricks.com\/de\/product\/azure-pricing\">4x more expensive<\/a> in terms of Databricks Units. There is a somewhat indirect way to also run multiple jobs on one job cluster, but that deserves its own blog post.<\/p>\n<p>The <span class=\"lang:default decode:true crayon-inline\">CreateDatabricksClusterOperator<\/span> is used to create and start a Databricks all-purpose cluster according to a given <a href=\"https:\/\/docs.databricks.com\/dev-tools\/api\/latest\/clusters.html#create\">specification<\/a>. 
The API request to the endpoint <span class=\"lang:default decode:true crayon-inline \">2.0\/clusters\/create<\/span> returns the cluster\u2019s ID, which we push to XCom for future reference.<br \/>\nTo make sure that the cluster is in a running state after the execution of our operator, we keep requesting the cluster\u2019s state using the <span class=\"lang:default decode:true crayon-inline \">2.0\/clusters\/get<\/span> endpoint every 10 seconds until the returned <span class=\"lang:default decode:true crayon-inline \">state<\/span> is <span class=\"lang:default decode:true crayon-inline \">RUNNING<\/span>. If the state is neither <span class=\"lang:default decode:true crayon-inline \">PENDING<\/span> nor <span class=\"lang:default decode:true crayon-inline \">RUNNING<\/span>, the task fails with an exception. In addition, we define an <span class=\"lang:python decode:true crayon-inline \">on_kill()<\/span> method that makes sure the cluster is permanently deleted if the task gets interrupted before finishing.<\/p>\n<p>Once the task succeeds, any existing or new job can be executed on the running cluster using the <span class=\"lang:default decode:true crayon-inline \">DatabricksSubmitRunOperator<\/span> or the <span class=\"lang:default decode:true crayon-inline \">DatabricksRunNowOperator<\/span>. 
We only need to retrieve the cluster ID from XCom (as demonstrated in the final DAG).<\/p>\n<pre class=\"lang:python decode:true\">from time import sleep\r\nfrom typing import Dict\r\n\r\nfrom airflow.exceptions import AirflowException\r\nfrom airflow.models import BaseOperator\r\nfrom airflow.providers.databricks.hooks.databricks import DatabricksHook\r\n\r\n\r\nclass CreateDatabricksClusterOperator(BaseOperator):\r\n\r\n    template_fields = (\"request_body\",)\r\n\r\n    def __init__(\r\n        self,\r\n        request_body: Dict,\r\n        databricks_conn_id: str = \"databricks_default\",\r\n        **kwargs,\r\n    ):\r\n        super().__init__(**kwargs)\r\n\r\n        self.request_body = request_body\r\n        self.databricks_conn_id = databricks_conn_id\r\n\r\n        # both are set in execute() and needed again in on_kill()\r\n        self.databricks_hook = None\r\n        self.cluster_id = None\r\n\r\n    def execute(self, context):\r\n        self.databricks_hook = DatabricksHook(\r\n            databricks_conn_id=self.databricks_conn_id\r\n        )\r\n        responds = self.databricks_hook._do_api_call(\r\n            (\"POST\", \"api\/2.0\/clusters\/create\"), self.request_body\r\n        )\r\n        self.cluster_id = responds[\"cluster_id\"]\r\n        self._sleep_until_created()\r\n\r\n        context[\"task_instance\"].xcom_push(\"all_purpose_cluster_id\", self.cluster_id)\r\n\r\n    def _sleep_until_created(self):\r\n        self.log.info(\"Waiting until cluster %s is created\", self.cluster_id)\r\n        while True:\r\n            cluster_status_responds = self.databricks_hook._do_api_call(\r\n                (\"GET\", f\"api\/2.0\/clusters\/get?cluster_id={self.cluster_id}\"), {}\r\n            )\r\n\r\n            cluster_status = cluster_status_responds[\"state\"]\r\n\r\n            # any state other than PENDING or RUNNING means the start-up failed\r\n            if cluster_status not in (\"PENDING\", \"RUNNING\"):\r\n                raise AirflowException(\r\n                    f\"Cluster status of {self.cluster_id} is neither 'PENDING' \"\r\n                    f\"nor 'RUNNING'. It's {cluster_status}\"\r\n                )\r\n\r\n            if cluster_status == \"RUNNING\":\r\n                self.log.info(\"Cluster %s created\", self.cluster_id)\r\n                break\r\n\r\n            sleep(10)\r\n\r\n    def on_kill(self):\r\n        # nothing to clean up if the task is killed before a cluster was requested\r\n        if self.databricks_hook is None or self.cluster_id is None:\r\n            return\r\n        self.databricks_hook._do_api_call(\r\n            (\"POST\", \"api\/2.0\/clusters\/permanent-delete\"),\r\n            {\"cluster_id\": self.cluster_id},\r\n        )\r\n        self.log.info(\"Cluster %s killed\", self.cluster_id)\r\n\r\n<\/pre>\n<p>The <span class=\"lang:default decode:true crayon-inline \">TerminateDatabricksClusterOperator<\/span> allows us to terminate and permanently delete a specific cluster via the <span class=\"lang:default decode:true crayon-inline \">2.0\/clusters\/permanent-delete<\/span> endpoint, regardless of whether the upstream job execution tasks succeeded or failed. This behavior is enforced by setting the operator\u2019s trigger rule to <span class=\"lang:default decode:true crayon-inline \">&quot;all_done&quot;<\/span> (instead of <span class=\"lang:default decode:true crayon-inline \">&quot;all_success&quot;<\/span>).<\/p>\n<pre class=\"lang:python decode:true\">from airflow.models import BaseOperator\r\nfrom airflow.providers.databricks.hooks.databricks import DatabricksHook\r\n\r\n\r\nclass TerminateDatabricksClusterOperator(BaseOperator):\r\n    def __init__(\r\n        self,\r\n        create_cluster_task_id: str,\r\n        databricks_conn_id: str = \"databricks_default\",\r\n        trigger_rule: str = \"all_done\",  # we want to make sure the cluster is deleted in any case\r\n        **kwargs,\r\n    ):\r\n        super().__init__(trigger_rule=trigger_rule, **kwargs)\r\n\r\n        self.create_cluster_task_id = create_cluster_task_id\r\n        self.databricks_conn_id = databricks_conn_id\r\n\r\n    def execute(self, context):\r\n        cluster_id = context[\"task_instance\"].xcom_pull(\r\n            self.create_cluster_task_id, 
key=\"all_purpose_cluster_id\"\r\n        )\r\n        databricks_hook = DatabricksHook(databricks_conn_id=self.databricks_conn_id)\r\n        databricks_hook._do_api_call(\r\n            (\"POST\", \"api\/2.0\/clusters\/permanent-delete\"), {\"cluster_id\": cluster_id}\r\n        )\r\n        self.log.info(\"Cluster %s terminated\", cluster_id)\r\n\r\n<\/pre>\n<h2><span class=\"ez-toc-section\" id=\"Final-DAG\"><\/span>Final DAG<span class=\"ez-toc-section-end\"><\/span><\/h2>\n<p>Our final example DAG chains the custom Airflow operators together with the built-in <span class=\"lang:default decode:true crayon-inline \">DatabricksSubmitRunOperator<\/span>:<\/p>\n<ol>\n<li>File sensor: The sensor polls a given path in DBFS and succeeds once the requested file (created by an external process) exists.<\/li>\n<li>Start cluster: Using the <span class=\"lang:default decode:true crayon-inline \">CreateDatabricksClusterOperator<\/span> we create a cluster and wait until it is running.<\/li>\n<li>Execute job: We submit and run a job in Databricks using the <span class=\"lang:default decode:true crayon-inline \">DatabricksSubmitRunOperator<\/span>. In the request body we set <span class=\"lang:default decode:true crayon-inline \">existing_cluster_id<\/span> to the Airflow template <span class=\"lang:python decode:true crayon-inline\">\"{{ti.xcom_pull('start_cluster', key='all_purpose_cluster_id')}}\"<\/span>, which makes the operator pull the cluster ID of our all-purpose cluster from XCom. In this case, <span class=\"lang:default decode:true crayon-inline \">start_cluster<\/span> is the task_id of the <span class=\"lang:default decode:true crayon-inline \">CreateDatabricksClusterOperator<\/span> task that created the cluster and <span class=\"lang:default decode:true crayon-inline \">all_purpose_cluster_id<\/span> is the key under which that operator pushed the <span class=\"lang:default decode:true crayon-inline \">cluster_id<\/span> to XCom.<\/li>\n<li>Get result: The <span class=\"lang:default decode:true crayon-inline \">GetDatabricksResultOperator<\/span> fetches the value returned by the executed notebook and pushes it to XCom for later use (e.g. by an email operator).<\/li>\n<li>Terminate cluster: Regardless of how steps 3 and 4 finished, the Databricks cluster is shut down and deleted.<\/li>\n<\/ol>\n<pre class=\"lang:python decode:true\">from datetime import datetime\r\n\r\nfrom custom_databricks_operators import (\r\n    CreateDatabricksClusterOperator,\r\n    DatabricksFileExistsSensor,\r\n    GetDatabricksResultOperator,\r\n    TerminateDatabricksClusterOperator,\r\n)\r\n\r\nfrom airflow import DAG\r\nfrom airflow.providers.databricks.operators.databricks import (\r\n    DatabricksSubmitRunOperator,\r\n)\r\n\r\ndefault_args = {\"owner\": \"inovex\", \"retries\": 0, \"start_date\": datetime(2021, 1, 1)}\r\n\r\n\r\nwith DAG(\"FinalDag\", schedule_interval=None, default_args=default_args) as dag:\r\n\r\n    wait_for_file = DatabricksFileExistsSensor(\r\n        task_id=\"check_file\", file_path=\"\/tmp.parquet\/_SUCCESS\"\r\n    )\r\n\r\n    # https:\/\/docs.databricks.com\/dev-tools\/api\/latest\/clusters.html#create\r\n    create_request = {\r\n        \"cluster_name\": \"airflow_cluster\",\r\n        \"spark_version\": \"7.3.x-scala2.12\",\r\n        \"node_type_id\": \"Standard_DS3_v2\",\r\n        \"num_workers\": 1,\r\n    }\r\n    # NOTE: subject to all-purpose databricks 
pricing!\r\n    start_cluster = CreateDatabricksClusterOperator(\r\n        task_id=\"start_cluster\", request_body=create_request\r\n    )\r\n\r\n    # https:\/\/docs.databricks.com\/dev-tools\/api\/latest\/jobs.html#runs-submit\r\n    submit_job_request = {\r\n        \"existing_cluster_id\": \"{{ti.xcom_pull('start_cluster', key='all_purpose_cluster_id')}}\",\r\n        \"notebook_task\": {\"notebook_path\": \"\/Shared\/main\"},\r\n    }\r\n    run_notebook_task = DatabricksSubmitRunOperator(\r\n        task_id=\"run_notebook_task\", json=submit_job_request, do_xcom_push=True\r\n    )\r\n\r\n    # run_task_id = task_id from DatabricksSubmitRunOperator\r\n    get_result = GetDatabricksResultOperator(\r\n        task_id=\"get_result\", run_task_id=\"run_notebook_task\"\r\n    )\r\n\r\n    # create_cluster_task_id = task_id from CreateDatabricksClusterOperator\r\n    terminate_cluster = TerminateDatabricksClusterOperator(\r\n        task_id=\"terminate_cluster\", create_cluster_task_id=\"start_cluster\"\r\n    )\r\n\r\nwait_for_file &gt;&gt; start_cluster &gt;&gt; run_notebook_task &gt;&gt; get_result &gt;&gt; terminate_cluster\r\n\r\n<\/pre>\n<p>&nbsp;<\/p>\n","protected":false},"excerpt":{"rendered":"<p>In this article we will explain how to use Airflow to orchestrate data processing applications built on Databricks beyond the provided functionality of the DatabricksSubmitRunOperator and DatabricksRunNowOperator. We will create custom Airflow operators that use the DatabricksHook\u00a0 to make API calls so that we can manage the entire Databricks Workspace out of Airflow. 
As an [&hellip;]<\/p>\n","protected":false},"author":246,"featured_media":31597,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"_acf_changed":false,"inline_featured_image":false,"ep_exclude_from_search":false,"footnotes":""},"tags":[783,77,71,385,784],"service":[414,411],"coauthors":[{"id":246,"display_name":"Frauke Beccard","user_nicename":"fbeccard"},{"id":245,"display_name":"Alan Mazankiewicz","user_nicename":"amazankiewicz"}],"class_list":["post-30950","post","type-post","status-publish","format-standard","has-post-thumbnail","hentry","tag-airflow","tag-big-data","tag-cloud","tag-data-engineering","tag-databricks","service-cloud","service-data-engineering"],"acf":[],"yoast_head":"<!-- This site is optimized with the Yoast SEO plugin v26.5 - https:\/\/yoast.com\/wordpress\/plugins\/seo\/ -->\n<title>Fully Managing Databricks from Airflow using Custom Operators - inovex GmbH<\/title>\n<meta name=\"description\" content=\"This is a tutorial on custom Airflow operators to build advanced pipelines on Databricks beyond the provided &#039;Submit Run&#039; and &#039;Run Now&#039; functionality.\" \/>\n<meta name=\"robots\" content=\"index, follow, max-snippet:-1, max-image-preview:large, max-video-preview:-1\" \/>\n<link rel=\"canonical\" href=\"https:\/\/www.inovex.de\/de\/blog\/fully-managing-databricks-from-airflow-using-custom-operators\/\" \/>\n<meta property=\"og:locale\" content=\"de_DE\" \/>\n<meta property=\"og:type\" content=\"article\" \/>\n<meta property=\"og:title\" content=\"Fully Managing Databricks from Airflow using Custom Operators - inovex GmbH\" \/>\n<meta property=\"og:description\" content=\"This is a tutorial on custom Airflow operators to build advanced pipelines on Databricks beyond the provided &#039;Submit Run&#039; and &#039;Run Now&#039; functionality.\" \/>\n<meta property=\"og:url\" 
content=\"https:\/\/www.inovex.de\/de\/blog\/fully-managing-databricks-from-airflow-using-custom-operators\/\" \/>\n<meta property=\"og:site_name\" content=\"inovex GmbH\" \/>\n<meta property=\"article:publisher\" content=\"https:\/\/www.facebook.com\/inovexde\" \/>\n<meta property=\"article:published_time\" content=\"2021-09-06T06:33:46+00:00\" \/>\n<meta property=\"article:modified_time\" content=\"2022-11-21T08:22:37+00:00\" \/>\n<meta property=\"og:image\" content=\"https:\/\/www.inovex.de\/wp-content\/uploads\/managing-databricks-from-airflow.png\" \/>\n\t<meta property=\"og:image:width\" content=\"1920\" \/>\n\t<meta property=\"og:image:height\" content=\"1080\" \/>\n\t<meta property=\"og:image:type\" content=\"image\/png\" \/>\n<meta name=\"author\" content=\"Frauke Beccard, Alan Mazankiewicz\" \/>\n<meta name=\"twitter:card\" content=\"summary_large_image\" \/>\n<meta name=\"twitter:image\" content=\"https:\/\/www.inovex.de\/wp-content\/uploads\/managing-databricks-from-airflow-1024x576.png\" \/>\n<meta name=\"twitter:creator\" content=\"@inovexgmbh\" \/>\n<meta name=\"twitter:site\" content=\"@inovexgmbh\" \/>\n<meta name=\"twitter:label1\" content=\"Verfasst von\" \/>\n\t<meta name=\"twitter:data1\" content=\"Frauke Beccard\" \/>\n\t<meta name=\"twitter:label2\" content=\"Gesch\u00e4tzte Lesezeit\" \/>\n\t<meta name=\"twitter:data2\" content=\"13\u00a0Minuten\" \/>\n\t<meta name=\"twitter:label3\" content=\"Written by\" \/>\n\t<meta name=\"twitter:data3\" content=\"Frauke Beccard, Alan Mazankiewicz\" \/>\n<script type=\"application\/ld+json\" class=\"yoast-schema-graph\">{\"@context\":\"https:\/\/schema.org\",\"@graph\":[{\"@type\":\"Article\",\"@id\":\"https:\/\/www.inovex.de\/de\/blog\/fully-managing-databricks-from-airflow-using-custom-operators\/#article\",\"isPartOf\":{\"@id\":\"https:\/\/www.inovex.de\/de\/blog\/fully-managing-databricks-from-airflow-using-custom-operators\/\"},\"author\":{\"name\":\"Frauke 
Beccard\",\"@id\":\"https:\/\/www.inovex.de\/de\/#\/schema\/person\/e347c08d93adc0681a52c90fb28af485\"},\"headline\":\"Fully Managing Databricks from Airflow using Custom Operators\",\"datePublished\":\"2021-09-06T06:33:46+00:00\",\"dateModified\":\"2022-11-21T08:22:37+00:00\",\"mainEntityOfPage\":{\"@id\":\"https:\/\/www.inovex.de\/de\/blog\/fully-managing-databricks-from-airflow-using-custom-operators\/\"},\"wordCount\":2148,\"commentCount\":1,\"publisher\":{\"@id\":\"https:\/\/www.inovex.de\/de\/#organization\"},\"image\":{\"@id\":\"https:\/\/www.inovex.de\/de\/blog\/fully-managing-databricks-from-airflow-using-custom-operators\/#primaryimage\"},\"thumbnailUrl\":\"https:\/\/www.inovex.de\/wp-content\/uploads\/managing-databricks-from-airflow.png\",\"keywords\":[\"Airflow\",\"Big Data\",\"Cloud\",\"Data Engineering\",\"Databricks\"],\"articleSection\":[\"Analytics\",\"English Content\",\"General\"],\"inLanguage\":\"de\",\"potentialAction\":[{\"@type\":\"CommentAction\",\"name\":\"Comment\",\"target\":[\"https:\/\/www.inovex.de\/de\/blog\/fully-managing-databricks-from-airflow-using-custom-operators\/#respond\"]}]},{\"@type\":\"WebPage\",\"@id\":\"https:\/\/www.inovex.de\/de\/blog\/fully-managing-databricks-from-airflow-using-custom-operators\/\",\"url\":\"https:\/\/www.inovex.de\/de\/blog\/fully-managing-databricks-from-airflow-using-custom-operators\/\",\"name\":\"Fully Managing Databricks from Airflow using Custom Operators - inovex 
GmbH\",\"isPartOf\":{\"@id\":\"https:\/\/www.inovex.de\/de\/#website\"},\"primaryImageOfPage\":{\"@id\":\"https:\/\/www.inovex.de\/de\/blog\/fully-managing-databricks-from-airflow-using-custom-operators\/#primaryimage\"},\"image\":{\"@id\":\"https:\/\/www.inovex.de\/de\/blog\/fully-managing-databricks-from-airflow-using-custom-operators\/#primaryimage\"},\"thumbnailUrl\":\"https:\/\/www.inovex.de\/wp-content\/uploads\/managing-databricks-from-airflow.png\",\"datePublished\":\"2021-09-06T06:33:46+00:00\",\"dateModified\":\"2022-11-21T08:22:37+00:00\",\"description\":\"This is a tutorial on custom Airflow operators to build advanced pipelines on Databricks beyond the provided 'Submit Run' and 'Run Now' functionality.\",\"breadcrumb\":{\"@id\":\"https:\/\/www.inovex.de\/de\/blog\/fully-managing-databricks-from-airflow-using-custom-operators\/#breadcrumb\"},\"inLanguage\":\"de\",\"potentialAction\":[{\"@type\":\"ReadAction\",\"target\":[\"https:\/\/www.inovex.de\/de\/blog\/fully-managing-databricks-from-airflow-using-custom-operators\/\"]}]},{\"@type\":\"ImageObject\",\"inLanguage\":\"de\",\"@id\":\"https:\/\/www.inovex.de\/de\/blog\/fully-managing-databricks-from-airflow-using-custom-operators\/#primaryimage\",\"url\":\"https:\/\/www.inovex.de\/wp-content\/uploads\/managing-databricks-from-airflow.png\",\"contentUrl\":\"https:\/\/www.inovex.de\/wp-content\/uploads\/managing-databricks-from-airflow.png\",\"width\":1920,\"height\":1080,\"caption\":\"Abstract Illustration of Airflow Managing the Databricks Platform\"},{\"@type\":\"BreadcrumbList\",\"@id\":\"https:\/\/www.inovex.de\/de\/blog\/fully-managing-databricks-from-airflow-using-custom-operators\/#breadcrumb\",\"itemListElement\":[{\"@type\":\"ListItem\",\"position\":1,\"name\":\"Home\",\"item\":\"https:\/\/www.inovex.de\/de\/\"},{\"@type\":\"ListItem\",\"position\":2,\"name\":\"Fully Managing Databricks from Airflow using Custom 
Operators\"}]},{\"@type\":\"WebSite\",\"@id\":\"https:\/\/www.inovex.de\/de\/#website\",\"url\":\"https:\/\/www.inovex.de\/de\/\",\"name\":\"inovex GmbH\",\"description\":\"\",\"publisher\":{\"@id\":\"https:\/\/www.inovex.de\/de\/#organization\"},\"potentialAction\":[{\"@type\":\"SearchAction\",\"target\":{\"@type\":\"EntryPoint\",\"urlTemplate\":\"https:\/\/www.inovex.de\/de\/?s={search_term_string}\"},\"query-input\":{\"@type\":\"PropertyValueSpecification\",\"valueRequired\":true,\"valueName\":\"search_term_string\"}}],\"inLanguage\":\"de\"},{\"@type\":\"Organization\",\"@id\":\"https:\/\/www.inovex.de\/de\/#organization\",\"name\":\"inovex GmbH\",\"url\":\"https:\/\/www.inovex.de\/de\/\",\"logo\":{\"@type\":\"ImageObject\",\"inLanguage\":\"de\",\"@id\":\"https:\/\/www.inovex.de\/de\/#\/schema\/logo\/image\/\",\"url\":\"https:\/\/www.inovex.de\/wp-content\/uploads\/2021\/03\/inovex-logo-16-9-1.png\",\"contentUrl\":\"https:\/\/www.inovex.de\/wp-content\/uploads\/2021\/03\/inovex-logo-16-9-1.png\",\"width\":1921,\"height\":1081,\"caption\":\"inovex GmbH\"},\"image\":{\"@id\":\"https:\/\/www.inovex.de\/de\/#\/schema\/logo\/image\/\"},\"sameAs\":[\"https:\/\/www.facebook.com\/inovexde\",\"https:\/\/x.com\/inovexgmbh\",\"https:\/\/www.instagram.com\/inovexlife\/\",\"https:\/\/www.linkedin.com\/company\/inovex\",\"https:\/\/www.youtube.com\/channel\/UC7r66GT14hROB_RQsQBAQUQ\"]},{\"@type\":\"Person\",\"@id\":\"https:\/\/www.inovex.de\/de\/#\/schema\/person\/e347c08d93adc0681a52c90fb28af485\",\"name\":\"Frauke Beccard\",\"image\":{\"@type\":\"ImageObject\",\"inLanguage\":\"de\",\"@id\":\"https:\/\/www.inovex.de\/de\/#\/schema\/person\/image\/583cd8c17d8c96750794c8cdd28de0ff\",\"url\":\"https:\/\/secure.gravatar.com\/avatar\/09a3803d466f9d701864b10e087c084eea86fc2c67ef9cca3616dd9cd5823f8e?s=96&d=retro&r=g\",\"contentUrl\":\"https:\/\/secure.gravatar.com\/avatar\/09a3803d466f9d701864b10e087c084eea86fc2c67ef9cca3616dd9cd5823f8e?s=96&d=retro&r=g\",\"caption\":\"Frauke 
Beccard\"},\"url\":\"https:\/\/www.inovex.de\/de\/blog\/author\/fbeccard\/\"}]}<\/script>\n<!-- \/ Yoast SEO plugin. -->","_links":{"self":[{"href":"https:\/\/www.inovex.de\/de\/wp-json\/wp\/v2\/posts\/30950","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/www.inovex.de\/de\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/www.inovex.de\/de\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/www.inovex.de\/de\/wp-json\/wp\/v2\/users\/246"}],"replies":[{"embeddable":true,"href":"https:\/\/www.inovex.de\/de\/wp-json\/wp\/v2\/comments?post=30950"}],"version-history":[{"count":6,"href":"https:\/\/www.inovex.de\/de\/wp-json\/wp\/v2\/posts\/30950\/revisions"}],"predecessor-version":[{"id":31596,"href":"https:\/\/www.inovex.de\/de\/wp-json\/wp\/v2\/posts\/30950\/revisions\/31596"}],"wp:featuredmedia":[{"embeddable":true,"href":"https:\/\/www.inovex.de\/de\/wp-json\/wp\/v2\/media\/31597"}],"wp:attachment":[{"href":"https:\/\/www.inovex.de\/de\/wp-json\/wp\/v2\/media?parent=30950"}],"wp:term":[{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/www.inovex.de\/de\/wp-json\/wp\/v2\/tags?post=30950"},{"taxonomy":"service","embeddable":true,"href":"https:\/\/www.inovex.de\/de\/wp-json\/wp\/v2\/service?post=30950"},{"taxonomy":"author","embeddable":true,"href":"https:\/\/www.inovex.de\/de\/wp-json\/wp\/v2\/coauthors?post=30950"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}