{"id":48559,"date":"2023-10-06T15:12:02","date_gmt":"2023-10-06T13:12:02","guid":{"rendered":"https:\/\/www.inovex.de\/?p=48559"},"modified":"2023-10-10T11:23:43","modified_gmt":"2023-10-10T09:23:43","slug":"datenpipeline-mit-streams-und-tasks-in-snowflake","status":"publish","type":"post","link":"https:\/\/www.inovex.de\/de\/blog\/datenpipeline-mit-streams-und-tasks-in-snowflake\/","title":{"rendered":"Kontinuierliche Datenpipeline mit Streams und Tasks in Snowflake"},"content":{"rendered":"<p>Willkommen im zweiten Artikel unserer Serie \u00fcber die Snowflake Data Cloud. <a href=\"https:\/\/www.inovex.de\/de\/blog\/kontinuierlicher-import-von-daten-mit-snowpipe\/\" rel=\"\">Der erste Artikel der Serie<\/a> hat sich mit Snowpipe besch\u00e4ftigt und wie es das\u00a0Laden von Dateien erm\u00f6glicht, sobald sie in einem externen Cloudspeicher zur Verf\u00fcgung stehen. \u00a0In diesem Teil stellen wir eine weitere M\u00f6glichkeit vor, Daten \u00fcber eine gesteuerte Pipeline zu laden und im selben Schritt Transformationen anwenden zu k\u00f6nnen. Bei den Werkzeugen handelt es sich um die Objekttypen <em>Streams<\/em> und <em>Tasks\u00a0<\/em>in Snowflake. Im Folgenden werden wir uns diese detailliert ansehen und gehen auf die Vorteile ein, die sie insbesondere im Zusammenspiel mit sich bringen.<!--more-->Ein Anwendungsfall in nahezu jedem datengetriebenen Projekt ist es, neue oder ge\u00e4nderte Daten einer Quelltabelle zuverl\u00e4ssig weiterzuverarbeiten und in entsprechende Zielstrukturen zu \u00fcberf\u00fchren.<br \/>\nGenau f\u00fcr einen solchen Use Case (auch die Bewirtschaftung einer Slowly Changing Dimension Typ 2 ist beispielsweise ein solches Szenario) bieten sich Streams und Tasks hervorragend an, da sie Features wie Scheduling und die automatisierbare Weiterverarbeitung von \u00c4nderungen bereits als integrierte Features mitbringen.<\/p>\n<div id=\"ez-toc-container\" class=\"ez-toc-v2_0_82_2 counter-hierarchy ez-toc-counter ez-toc-custom ez-toc-container-direction\">\n<div class=\"ez-toc-title-container\"><p class=\"ez-toc-title\" style=\"cursor:inherit\"><\/p>\n<\/div><nav><ul class='ez-toc-list ez-toc-list-level-1 ' ><li class='ez-toc-page-1 ez-toc-heading-level-2'><a class=\"ez-toc-link ez-toc-heading-1\" href=\"https:\/\/www.inovex.de\/de\/blog\/datenpipeline-mit-streams-und-tasks-in-snowflake\/#Use-Case\" >Use Case<\/a><\/li><li class='ez-toc-page-1 ez-toc-heading-level-2'><a class=\"ez-toc-link ez-toc-heading-2\" href=\"https:\/\/www.inovex.de\/de\/blog\/datenpipeline-mit-streams-und-tasks-in-snowflake\/#Streams\" >Streams<\/a><\/li><li class='ez-toc-page-1 ez-toc-heading-level-2'><a class=\"ez-toc-link ez-toc-heading-3\" href=\"https:\/\/www.inovex.de\/de\/blog\/datenpipeline-mit-streams-und-tasks-in-snowflake\/#Tasks\" >Tasks<\/a><\/li><li class='ez-toc-page-1 ez-toc-heading-level-2'><a class=\"ez-toc-link ez-toc-heading-4\" href=\"https:\/\/www.inovex.de\/de\/blog\/datenpipeline-mit-streams-und-tasks-in-snowflake\/#Zieltabelle\" >Zieltabelle<\/a><\/li><li class='ez-toc-page-1 ez-toc-heading-level-2'><a class=\"ez-toc-link ez-toc-heading-5\" href=\"https:\/\/www.inovex.de\/de\/blog\/datenpipeline-mit-streams-und-tasks-in-snowflake\/#Fazit-zu-Streams-und-Tasks-in-Snowflake\" >Fazit zu Streams und Tasks in Snowflake<\/a><\/li><\/ul><\/nav><\/div>\n<h2><span class=\"ez-toc-section\" id=\"Use-Case\"><\/span>Use Case<span class=\"ez-toc-section-end\"><\/span><\/h2>\n<p>Um genau zu verstehen, wie die Objekttypen Streams und Tasks in Snowflake funktionieren, bauen wir Schritt f\u00fcr Schritt einen geeigneten Anwendungsfall.<br \/>\nHierf\u00fcr das Beispieldataset \u201e<a href=\"https:\/\/docs.snowflake.com\/de\/user-guide\/sample-data-tpcds\">TPC-DS<\/a>\u201c zum Einsatz. Da es frei verf\u00fcgbar ist, lassen sich alle Code-Beispiele gut in der eigenen Umgebung nachbauen.<\/p>\n<p>In unserem Use Case werden wir Daten von einer Quell- in eine Zieltabelle via Datenpipeline mit Streams und Tasks \u00fcberf\u00fchren und dabei<\/p>\n<ul>\n<li>einen Klon der Quelltabelle &#8222;CUSTOMER&#8220; erstellen<\/li>\n<li>diesen Klon inkrementell mit Daten der Quelltabelle f\u00fcllen<\/li>\n<li>zus\u00e4tzlich zuf\u00e4llige Aktualisierungen und L\u00f6schungen ausf\u00fchren<\/li>\n<li>alle CDC-\u00c4nderungen an dieser Tabelle identifizieren und in eine Zieltabelle weiterverarbeiten<\/li>\n<li>zur besseren Nutzbarkeit der Daten eine kleine beispielhafte Transformation implementieren<\/li>\n<li>verschiedene Tasks erstellen, ausf\u00fchren und zeitgesteuert einplanen<\/li>\n<li>einen Stream erstellen<\/li>\n<\/ul>\n<p>In den nachfolgenden Skripten muss die Bezeichnung f\u00fcr das Warehouse angepasst werden, wobei hier ein Warehouse der Gr\u00f6\u00dfe &#8222;XSMALL&#8220; ausreichend ist. Um die Skripte 1:1 ausf\u00fchren zu k\u00f6nnen, kann mit folgendem Code ein Warehouse mit dem Namen &#8222;DATALAB_WH&#8220; erstellt werden.<\/p>\n<pre class=\"lang:tsql decode:true \">-- create warehouse for following executions\r\nCREATE OR REPLACE WAREHOUSE DATALAB_WH\r\nWITH WAREHOUSE_SIZE = 'XSMALL'\r\nAUTO_SUSPEND = 10\r\nAUTO_RESUME = TRUE\r\nINITIALLY_SUSPENDED = TRUE;<\/pre>\n<p>Um eine saubere Trennung zu den Qelldaten aus dem TPD-DS Dataset zu erm\u00f6glichen, erstellen wir zun\u00e4chst ein separates Schema und erstellen dort alle weiteren Datenbankobjekte.<\/p>\n<pre class=\"lang:tsql decode:true\">-- create a separate schema \r\nCREATE OR replace SCHEMA TPC_DS.BLOG;<\/pre>\n<p>&nbsp;<\/p>\n<h2><span class=\"ez-toc-section\" id=\"Streams\"><\/span>Streams<span class=\"ez-toc-section-end\"><\/span><\/h2>\n<p>In Snowflake stellt ein <a href=\"https:\/\/docs.snowflake.com\/en\/sql-reference\/sql\/create-stream\" target=\"_blank\" rel=\"noopener\">Stream<\/a> change data capture (CDC) Informationen zu einer Tabelle bereit. Hiermit lassen sich \u00c4nderungen (INSERT, UPDATE und DELETE) auf Zeilenebene zu einem Zeitpunkt identifizieren. Diese k\u00f6nnen dann je nach Anwendungsfall verschiedenartig weiterverarbeitet werden. Der Inhalt eines Streams kann \u00fcber eine gew\u00f6hnliche Query wie eine Tabelle abgefragt werden und enth\u00e4lt neben den Spalten der darunter liegenden Tabelle die folgenden drei zus\u00e4tzliche Spalten:<\/p>\n<ul>\n<li>&#8222;METADATA$ACTION&#8220;: Information \u00fcber die Art der Aktion. Kann die Werte INSERT oder DELETE annehmen. Updates werden in einem Stream als DELETE der urspr\u00fcnglichen Zeile und INSERT der Zeile mit neuem Wert dargestellt.<\/li>\n<li>&#8222;METADATA$ISUPDATE&#8220;: Information, ob es sich bei dieser Zeile um eine UPDATE Operation handelt. Kann die Werte TRUE oder FALSE annehmen.<\/li>\n<li>&#8222;METADATA$ROW_ID&#8220;: Eine eindeutige und unver\u00e4nderliche ID f\u00fcr jede Zeile in der Quelltabelle. Mit ihr lassen sich \u00fcber die Zeit hinweg \u00c4nderungen eindeutig zuordnen. Auch die durch UPDATES erzeugten DELETE und INSERT Eintr\u00e4ge im Stream k\u00f6nnen auf diese Weise durch dieselbe ID in Zusammenhang gesetzt werden.<\/li>\n<\/ul>\n<p>Wie zuvor erw\u00e4hnt, m\u00f6chten wir einen Staging-Klon der Customer-Tabelle im gerade erzeugten Schema erstellen. Diesen beladen wir sp\u00e4ter periodisch, um einen sich ver\u00e4ndernden Datenbestand einer produktiven Umgebung zu simulieren.<br \/>\nDer \u00dcbersicht halber arbeiten wir nur mit einer Auswahl der urspr\u00fcnglichen Spalten.<\/p>\n<pre class=\"lang:tsql decode:true\">-- create staging table clone from CUSTOMER as a new source we want to fill periodically\r\nCREATE OR replace TABLE TPC_DS.BLOG.STAGING_CUSTOMER (\r\n C_CUSTOMER_SK NUMBER(38, 0)\r\n ,C_CUSTOMER_ID VARCHAR(16777216)\r\n ,C_SALUTATION VARCHAR(16777216)\r\n ,C_FIRST_NAME VARCHAR(16777216)\r\n ,C_LAST_NAME VARCHAR(16777216)\r\n ,C_PREFERRED_CUST_FLAG VARCHAR(16777216)\r\n ,C_BIRTH_DAY NUMBER(38, 0)\r\n ,C_BIRTH_MONTH NUMBER(38, 0)\r\n ,C_BIRTH_YEAR NUMBER(38, 0)\r\n ,C_BIRTH_COUNTRY VARCHAR(16777216)\r\n ,C_LOGIN VARCHAR(16777216)\r\n ,C_EMAIL_ADDRESS VARCHAR(16777216)\r\n ,LAST_MODIFIED DATETIME\r\n );<\/pre>\n<p>Den Inhalt und die \u00c4nderungen an dieser Tabelle wollen wir sp\u00e4ter in einer kuratierten Zieltabelle festhalten. Um diese \u00c4nderungen zu erkennen, erstellen wir einen Stream.<\/p>\n<pre class=\"lang:tsql decode:true\">-- create stream on staging table\r\nCREATE OR replace stream STAGING_CUSTOMER_STREAM ON TABLE TPC_DS.BLOG.STAGING_CUSTOMER;<\/pre>\n<p><span style=\"font-weight: 400;\">Initial ist dieser Stream leer, da noch keine Operationen auf der darunter liegenden Tabelle ausgef\u00fchrt wurden.<\/span><\/p>\n<pre class=\"lang:tsql decode:true\">-- select stream\r\nSELECT * \r\nFROM TPC_DS.BLOG.STAGING_CUSTOMER_STREAM;<\/pre>\n<p><img loading=\"lazy\" decoding=\"async\" class=\"alignnone size-full wp-image-48683\" src=\"https:\/\/www.inovex.de\/wp-content\/uploads\/snowflake_select_empty_stream-2.png\" alt=\"\" width=\"1151\" height=\"374\" srcset=\"https:\/\/www.inovex.de\/wp-content\/uploads\/snowflake_select_empty_stream-2.png 1151w, https:\/\/www.inovex.de\/wp-content\/uploads\/snowflake_select_empty_stream-2-300x97.png 300w, https:\/\/www.inovex.de\/wp-content\/uploads\/snowflake_select_empty_stream-2-1024x333.png 1024w, https:\/\/www.inovex.de\/wp-content\/uploads\/snowflake_select_empty_stream-2-768x250.png 768w, https:\/\/www.inovex.de\/wp-content\/uploads\/snowflake_select_empty_stream-2-400x130.png 400w, https:\/\/www.inovex.de\/wp-content\/uploads\/snowflake_select_empty_stream-2-360x117.png 360w\" sizes=\"auto, (max-width: 1151px) 100vw, 1151px\" \/><\/p>\n<p>&nbsp;<\/p>\n<h2><span class=\"ez-toc-section\" id=\"Tasks\"><\/span>Tasks<span class=\"ez-toc-section-end\"><\/span><\/h2>\n<p>Ein <a href=\"https:\/\/docs.snowflake.com\/en\/sql-reference\/sql\/create-task\" target=\"_blank\" rel=\"noopener\">Task<\/a> bietet in Snowflake die M\u00f6glichkeit, auf einfache Art und Weise Transformationen, beispielsweise via SQL Statements, wiederholt ausf\u00fchren zu lassen.<\/p>\n<p>In unserem Fall werden wir mit einem Task den Inhalt eines Streams, also das Delta dessen zugrunde liegender Tabelle, gezielt mit einem SQL Statement weiterverarbeiten. Da ein Stream laufend \u00c4nderungen ansammelt und ein Task zur wiederkehrenden Ausf\u00fchrung gedacht ist, bietet sich die Verwendung in Kombination hervorragend f\u00fcr die Modellierung einer kontinuierlichen Datenpipeline an.<\/p>\n<p>Tasks kommen au\u00dferdem mit einer Vielzahl an zus\u00e4tzlichen Features daher, wie beispielsweise automatischem Logging und Monitoring, Scheduling sowie die an Bedingungen gekoppelte Ausf\u00fchrung.<\/p>\n<p><span style=\"font-weight: 400;\">Nachfolgend erzeugen wir einen Task, der ein Subset an Zeilen der Customer-Tabelle aus dem TPC-DS Dataset in den Staging-Klon l\u00e4dt. Wie eingangs erw\u00e4hnt, verschlanken wir die sehr breite Tabelle und arbeiten mit einer Auswahl an Spalten weiter.<\/span><\/p>\n<pre class=\"lang:tsql decode:true\">-- create task for incremental staging table load\r\nCREATE OR replace task STAGING_CUSTOMER_LOAD warehouse = DATALAB_WH schedule = '1 minute' AS\r\nINSERT INTO TPC_DS.BLOG.STAGING_CUSTOMER (\r\n  SELECT C_CUSTOMER_SK\r\n   ,C_CUSTOMER_ID\r\n   ,C_SALUTATION\r\n   ,C_FIRST_NAME\r\n   ,C_LAST_NAME\r\n   ,C_PREFERRED_CUST_FLAG\r\n   ,C_BIRTH_DAY\r\n   ,C_BIRTH_MONTH\r\n   ,C_BIRTH_YEAR\r\n   ,C_BIRTH_COUNTRY\r\n   ,C_LOGIN\r\n   ,C_EMAIL_ADDRESS\r\n   ,current_timestamp\r\n  FROM TPC_DS.silver_1gb.customer\r\n  WHERE round(c_customer_sk \/ 10) = (\r\n    SELECT COALESCE(MAX(ROUND(C_CUSTOMER_SK\/ 10)) + 1, 1)\r\n    FROM TPC_DS.BLOG.STAGING_CUSTOMER\r\n    )\r\n );<\/pre>\n<p>Ein Task ist bei der Erstellung standardm\u00e4\u00dfig deaktiviert. Es gibt die M\u00f6glichkeit<\/p>\n<ul>\n<li>den Task auszuf\u00fchren (&#8222;EXECUTE&#8220;)<\/li>\n<li>den konfigurierten zeitgesteuerten Plan zu aktivieren (&#8222;RESUME&#8220;)<\/li>\n<li>den zeitgesteuerten Plan zu deaktivieren\u00a0 (&#8222;SUPEND&#8220;).<\/li>\n<\/ul>\n<pre class=\"lang:tsql decode:true\">-- commands for the recent task\r\nexecute task TPC_DS.BLOG.STAGING_CUSTOMER_LOAD;\r\n-- alter task TPC_DS.BLOG.STAGING_CUSTOMER_LOAD resume;\r\n-- alter task TPC_DS.BLOG.STAGING_CUSTOMER_LOAD suspend;<\/pre>\n<p>Um einem realen Szenario m\u00f6glichst nahe zu kommen, erstellen wir auch Tasks, die L\u00f6schungen und \u00c4nderungen vornehmen.<\/p>\n<p>Bei dem Task &#8222;STAGING_CUSTOMER_DELETES&#8220; wird eine willk\u00fcrliche Zeile gel\u00f6scht.<br \/>\nBei dem Task &#8222;STAGING_CUSTOMER_UPDATES&#8220; \u00e4ndern wir den Wert des Felds &#8222;C_FIRST_NAME&#8220; ebenfalls f\u00fcr eine willk\u00fcrliche Zeile. Als Operation setzen wir den Inhalt des Felds \u00fcber die Funktion UPPER() auf Gro\u00dfbuchstaben. Beide Tasks werden noch \u00fcber den &#8222;EXECUTE&#8220;-Befehl ausgef\u00fchrt.<\/p>\n<pre class=\"lang:tsql decode:true\">-- create task for incremental delete simulation\r\nCREATE OR replace task STAGING_CUSTOMER_DELETES warehouse = DATALAB_WH schedule = '1 minute'\r\nAS\r\nDELETE FROM TPC_DS.BLOG.STAGING_CUSTOMER AS a USING (\r\n  SELECT TOP 1 c_customer_id\r\n  FROM TPC_DS.BLOG.STAGING_CUSTOMER\r\n  ORDER BY random()\r\n  ) AS b\r\nWHERE a.c_customer_id = b.c_customer_id;\r\n<\/pre>\n<pre class=\"lang:tsql decode:true\">-- commands for the recent task\r\nexecute task TPC_DS.BLOG.STAGING_CUSTOMER_DELETES;\r\n-- alter task TPC_DS.BLOG.STAGING_CUSTOMER_DELETES resume;\r\n-- alter task TPC_DS.BLOG.STAGING_CUSTOMER_DELETES suspend;\r\n<\/pre>\n<pre class=\"lang:tsql decode:true\">-- create task for incremental update simulation\r\nCREATE OR replace task STAGING_CUSTOMER_UPDATES warehouse = DATALAB_WH schedule = '1 minute'\r\nAS\r\nUPDATE TPC_DS.BLOG.STAGING_CUSTOMER\r\nSET C_FIRST_NAME = UPPER(C_FIRST_NAME)\r\nWHERE c_customer_id = (\r\n  SELECT TOP 1 c_customer_id\r\n  FROM TPC_DS.BLOG.STAGING_CUSTOMER\r\n  ORDER BY random()\r\n  );\r\n<\/pre>\n<pre class=\"lang:tsql decode:true\">-- commands for the recent task\r\nexecute task TPC_DS.BLOG.STAGING_CUSTOMER_UPDATES;\r\n-- alter task TPC_DS.BLOG.STAGING_CUSTOMER_UPDATES resume;\r\n-- alter task TPC_DS.BLOG.STAGING_CUSTOMER_UPDATES suspend;\r\n<\/pre>\n<p>Die drei historischen Ausf\u00fchrungen der Tasks (pro Task eine Ausf\u00fchrung) lassen sich \u00fcbersichtlich mit Bordmitteln abfragen. Es sind umfassende Metadaten zu den L\u00e4ufen vorhanden. Zu den bereitgestellten Informationen z\u00e4hlen beispielsweise<\/p>\n<ul>\n<li>der komplette Ausgef\u00fchrte Code in der Spalte &#8222;QUERY TEXT&#8220;,<\/li>\n<li>sofern konfiguriert, ist auch die Bedingung f\u00fcr jeden Ladelauf in der Spalte &#8222;CONDITION_TEXT&#8220; einsehbar,<\/li>\n<li>auch werden genaue Zeitstempel von Start &#8222;SCHEDULED_TIME&#8220; und Ende &#8222;COMPLETED_TIME&#8220; der Ausf\u00fchrung des Tasks, Start der Ausf\u00fchrung Query &#8222;QUERY_START_TIME&#8220; und n\u00e4chster planm\u00e4\u00dfiger Start &#8222;NEXT_SCHEDULED_TIME&#8220; des Tasks automatisch gepflegt.<\/li>\n<\/ul>\n<pre class=\"lang:tsql decode:true\">-- query task history\r\nSELECT *\r\nFROM TABLE (information_schema.task_history())\r\nWHERE name IN (\r\n  'STAGING_CUSTOMER_LOAD'\r\n  ,'STAGING_CUSTOMER_DELETES'\r\n  ,'STAGING_CUSTOMER_UPDATES'\r\n  )\r\nORDER BY scheduled_time DESC;\r\n<\/pre>\n<p><img loading=\"lazy\" decoding=\"async\" class=\"alignnone size-full wp-image-48717\" src=\"https:\/\/www.inovex.de\/wp-content\/uploads\/snowflake_select_executed_tasks.png\" alt=\"\" width=\"1119\" height=\"386\" srcset=\"https:\/\/www.inovex.de\/wp-content\/uploads\/snowflake_select_executed_tasks.png 1119w, https:\/\/www.inovex.de\/wp-content\/uploads\/snowflake_select_executed_tasks-300x103.png 300w, https:\/\/www.inovex.de\/wp-content\/uploads\/snowflake_select_executed_tasks-1024x353.png 1024w, https:\/\/www.inovex.de\/wp-content\/uploads\/snowflake_select_executed_tasks-768x265.png 768w, https:\/\/www.inovex.de\/wp-content\/uploads\/snowflake_select_executed_tasks-400x138.png 400w, https:\/\/www.inovex.de\/wp-content\/uploads\/snowflake_select_executed_tasks-360x124.png 360w\" sizes=\"auto, (max-width: 1119px) 100vw, 1119px\" \/><\/p>\n<h2><span class=\"ez-toc-section\" id=\"Zieltabelle\"><\/span>Zieltabelle<span class=\"ez-toc-section-end\"><\/span><\/h2>\n<p>Nachdem wir nun eine Quelltabelle f\u00fcr unsere Zwecke gebaut haben und auf dieser mit passenden Tasks INSERTS, UPDATES und DELETES erzeugen k\u00f6nnen, folgt wie eingangs erw\u00e4hnt unsere Zieltabelle. In dieser sollen sich grunds\u00e4tzlich alle durch die Tasks erzeugten oder ver\u00e4nderten Daten wiederfinden. Zus\u00e4tzlich sollen L\u00f6schungen erkenntlich gemacht und das umst\u00e4ndlich formatierte Geburtsdatum in den Datentyp DATE transformiert werden. Au\u00dferdem beladen wir die folgenden selbst erstellten Metadatenspalten:<\/p>\n<ul>\n<li>&#8222;CREATED_AT&#8220;: Zeitstempel der initialen Beladung.<\/li>\n<li>&#8222;DELETED_AT&#8220;: Zeitstempel der L\u00f6schung. NULL sofern Datensatz in der Quelltabelle nicht gel\u00f6scht wurde.<\/li>\n<li>&#8222;IS_DELETED&#8220;: Information dar\u00fcber, ob der Datensatz in der Quelltabelle gel\u00f6scht wurde. Kann die Werte TRUE oder FALSE annehmen.<\/li>\n<li>&#8222;LAST_MODIFIED_AT&#8220;: Zeitstempel der letzten \u00c4nderung des Datensatzes.<\/li>\n<\/ul>\n<p>Als Quelle dient uns der Inhalt des Streams. Die Weiterverarbeitung der Streamdaten l\u00e4sst sich je nach Anforderung gestalten.<\/p>\n<p>In unserem Beispiel m\u00f6chten wir zun\u00e4chst einmal alle neuen Eintr\u00e4ge und alle Aktualisierungen \u00fcberf\u00fchren. L\u00f6schungen sollen in der Zieltabelle bestehen bleiben und lediglich entsprechende Metadaten zur L\u00f6schung in die Felder &#8222;DELETED_AT&#8220; und &#8222;IS_DELETED&#8220; eingetragen werden.<\/p>\n<p>Bei dieser Gelegenheit (\u00dcberf\u00fchrung von Staging zu kuratiert) implementieren wir au\u00dferdem kleinere Datentransformationen, um die Daten f\u00fcr Endnutzer:innen besser lesbar zu machen. Konkret geht es um das Geburtsdatum, das in der Quelltabelle in separaten Spalten f\u00fcr Jahr, Monat und Tag gespeichert wurde. In unserem Fall bietet eine Umwandlung dieser Spalten in den Datentyp DATE eine bessere Lesbarkeit.<br \/>\nDie Zieltabelle wird mit folgendem Code erzeugt:<\/p>\n<pre class=\"lang:tsql decode:true \">-- create target table for customer stream data\r\nCREATE OR replace TABLE TPC_DS.BLOG.CUSTOMER (\r\n C_CUSTOMER_ID VARCHAR(16777216)\r\n ,C_SALUTATION VARCHAR(16777216)\r\n ,C_FIRST_NAME VARCHAR(16777216)\r\n ,C_LAST_NAME VARCHAR(16777216)\r\n ,C_BIRTH_COUNTRY VARCHAR(16777216)\r\n ,c_BIRTH_DATE DATE\r\n ,C_EMAIL_ADDRESS VARCHAR(16777216)\r\n ,CREATED_AT DATETIME\r\n ,DELETED_AT DATETIME\r\n ,IS_DELETED VARCHAR(7)\r\n ,LAST_MODIFIED_AT DATETIME\r\n );\r\n<\/pre>\n<p>Um diese Tabelle nun automatisiert zu bef\u00fcllen, erstellen wir einen letzten Task. Diesen konfigurieren wir so, dass er min\u00fctlich gestartet wird, und schalten \u00fcber den &#8222;RESUME&#8220;-Befehl die periodische Ausf\u00fchrung aktiv.<\/p>\n<pre class=\"lang:tsql decode:true \">-- create task to fill new customer target table (insert, update, delete) with stream data as source\r\nCREATE OR replace task CUSTOMER_LOAD warehouse = DATALAB_WH schedule = '1 minute' \r\nwhen system$stream_has_data('TPC_DS.BLOG.STAGING_CUSTOMER_STREAM')\r\nAS\r\nMERGE INTO TPC_DS.BLOG.CUSTOMER t\r\nUSING TPC_DS.BLOG.STAGING_CUSTOMER_STREAM s\r\n ON t.c_customer_id = s.c_customer_id\r\n  -- new entry: insert entry\r\nWHEN NOT MATCHED\r\n AND METADATA$ACTION = 'INSERT'\r\n AND METADATA$ISUPDATE = 'False'\r\n THEN\r\n  INSERT (\r\n   C_CUSTOMER_ID\r\n   ,C_SALUTATION\r\n   ,C_FIRST_NAME\r\n   ,C_LAST_NAME\r\n   ,C_BIRTH_COUNTRY\r\n   ,C_BIRTH_DATE\r\n   ,C_EMAIL_ADDRESS\r\n   ,CREATED_AT\r\n   ,DELETED_AT\r\n   ,IS_DELETED\r\n   ,LAST_MODIFIED_AT\r\n   )\r\n  VALUES (\r\n   s.C_CUSTOMER_ID\r\n   ,s.C_SALUTATION\r\n   ,s.C_FIRST_NAME\r\n   ,s.C_LAST_NAME\r\n   ,s.C_BIRTH_COUNTRY\r\n   ,TO_DATE(CONCAT(s.C_BIRTH_YEAR, '-', s.C_BIRTH_MONTH, '-',  s.C_BIRTH_DAY))\r\n   ,s.C_EMAIL_ADDRESS\r\n   ,current_timestamp\r\n   ,NULL\r\n   ,'False'\r\n   ,current_timestamp\r\n   )\r\n   -- deleted entry: update entry\r\nWHEN MATCHED\r\n AND s.METADATA$ACTION = 'DELETE'\r\n AND s.METADATA$ISUPDATE = 'False'\r\n THEN\r\n  UPDATE\r\n  SET t.IS_DELETED = 'True'\r\n   ,DELETED_AT = current_timestamp\r\n   -- updated entry: update entry\r\nWHEN MATCHED\r\n AND s.METADATA$ACTION = 'INSERT'\r\n AND s.METADATA$ISUPDATE = 'True'\r\n THEN\r\n  UPDATE\r\n  SET t.C_SALUTATION = s.C_SALUTATION\r\n   ,t.C_FIRST_NAME = s.C_FIRST_NAME\r\n   ,t.C_LAST_NAME = s.C_LAST_NAME\r\n   ,t.C_BIRTH_COUNTRY = s.C_BIRTH_COUNTRY\r\n   ,t.C_BIRTH_DATE = TO_DATE(CONCAT(s.C_BIRTH_YEAR, '-', s.C_BIRTH_MONTH, '-',  s.C_BIRTH_DAY))\r\n   ,t.C_EMAIL_ADDRESS = s.C_EMAIL_ADDRESS\r\n   ,t.LAST_MODIFIED_AT = current_timestamp;\r\n<\/pre>\n<pre class=\"lang:tsql decode:true\">-- commands for the recent task\r\n-- execute task TPC_DS.BLOG.CUSTOMER_LOAD;\r\nalter task TPC_DS.BLOG.CUSTOMER_LOAD resume;\r\n-- alter task TPC_DS.BLOG.CUSTOMER_LOAD suspend;\r\n<\/pre>\n<p>Da die Tasks zur Bef\u00fcllung der Staging-Tabelle bisher nur manuell ausgef\u00fchrt wurden und noch nicht zeitlich eingeplant sind, zeigt die Ausf\u00fchrungshistorie nur eine Aktivit\u00e4t (gr\u00fcne Markierung im n\u00e4chsten Screenshot). Danach wurde die Ausf\u00fchrung aufgrund der implementierten Bedingung \u00fcbersprungen (gelbe Markierung). Es handelt sich dabei um die Pr\u00fcfung auf den Inhalt des Streams, so dass der Task nur dann startet, wenn auch zu verarbeitende Zeilen vorhanden sind. Konkret umgesetzt wurde das elegant \u00fcber die Funktion &#8222;system$stream_has_data()&#8220;. Anschaulich wird das Ganze, wenn wir wenige Minuten, also ein paar Starts des Tasks, abwarten und dann die Historie abfragen. \u00dcber diese Selektion sind eine Vielzahl an Metainformationen zu den Aufrufen des Tasks einsehbar.<\/p>\n<pre class=\"lang:tsql decode:true\">-- query task history\r\nSELECT *\r\nFROM TABLE (information_schema.task_history())\r\nWHERE name IN (\r\n  'CUSTOMER_LOAD'\r\n  )\r\nORDER BY scheduled_time DESC;\r\n<\/pre>\n<p><img loading=\"lazy\" decoding=\"async\" class=\"alignnone size-full wp-image-48728\" src=\"https:\/\/www.inovex.de\/wp-content\/uploads\/snowflake_select_target_table_tasks_1-1.png\" alt=\"\" width=\"1122\" height=\"381\" srcset=\"https:\/\/www.inovex.de\/wp-content\/uploads\/snowflake_select_target_table_tasks_1-1.png 1122w, https:\/\/www.inovex.de\/wp-content\/uploads\/snowflake_select_target_table_tasks_1-1-300x102.png 300w, https:\/\/www.inovex.de\/wp-content\/uploads\/snowflake_select_target_table_tasks_1-1-1024x348.png 1024w, https:\/\/www.inovex.de\/wp-content\/uploads\/snowflake_select_target_table_tasks_1-1-768x261.png 768w, https:\/\/www.inovex.de\/wp-content\/uploads\/snowflake_select_target_table_tasks_1-1-400x136.png 400w, https:\/\/www.inovex.de\/wp-content\/uploads\/snowflake_select_target_table_tasks_1-1-360x122.png 360w\" sizes=\"auto, (max-width: 1122px) 100vw, 1122px\" \/><\/p>\n<p>Abschlie\u00dfend aktivieren wir die zeitgesteuerte Ausf\u00fchrung der zu Beginn angelegten Tasks. Diese wurden ebenfalls mit einem min\u00fctlichen Zeitintervall erstellt. \u00a0Wir warten nochmals ein paar Minuten und fragen die Ausf\u00fchrungshistorie \u00fcber obige Query erneut ab. Wir sehen nun mehrfach erfolgreich durchgef\u00fchrte L\u00e4ufe.<\/p>\n<pre class=\"lang:tsql decode:true\">-- resume recent tasks\r\nalter task TPC_DS.BLOG.STAGING_CUSTOMER_LOAD resume;\r\nalter task TPC_DS.BLOG.STAGING_CUSTOMER_DELETES resume;\r\nalter task TPC_DS.BLOG.STAGING_CUSTOMER_UPDATES resume;\r\n<\/pre>\n<pre class=\"lang:tsql decode:true \">-- query task history\r\nSELECT *\r\nFROM TABLE (information_schema.task_history())\r\nWHERE name IN (\r\n  'CUSTOMER_LOAD'\r\n  )\r\nORDER BY scheduled_time DESC;\r\n<\/pre>\n<p><img loading=\"lazy\" decoding=\"async\" class=\"alignnone size-full wp-image-48731\" src=\"https:\/\/www.inovex.de\/wp-content\/uploads\/snowflake_select_target_table_tasks_2.png\" alt=\"\" width=\"1114\" height=\"452\" srcset=\"https:\/\/www.inovex.de\/wp-content\/uploads\/snowflake_select_target_table_tasks_2.png 1114w, https:\/\/www.inovex.de\/wp-content\/uploads\/snowflake_select_target_table_tasks_2-300x122.png 300w, https:\/\/www.inovex.de\/wp-content\/uploads\/snowflake_select_target_table_tasks_2-1024x415.png 1024w, https:\/\/www.inovex.de\/wp-content\/uploads\/snowflake_select_target_table_tasks_2-768x312.png 768w, https:\/\/www.inovex.de\/wp-content\/uploads\/snowflake_select_target_table_tasks_2-400x162.png 400w, https:\/\/www.inovex.de\/wp-content\/uploads\/snowflake_select_target_table_tasks_2-360x146.png 360w\" sizes=\"auto, (max-width: 1114px) 100vw, 1114px\" \/><\/p>\n<p><span style=\"font-weight: 400;\">Um Kosten zu sparen, deaktivieren wir am Ende die automatische Ausf\u00fchrung aller Tasks und stoppen sicherheitshalber auch den Betrieb des Warehouses.<\/span><\/p>\n<pre class=\"lang:tsql decode:true\">-- suspend all tasks\r\nalter task TPC_DS.BLOG.STAGING_CUSTOMER_LOAD suspend;\r\nalter task TPC_DS.BLOG.STAGING_CUSTOMER_DELETES suspend;\r\nalter task TPC_DS.BLOG.STAGING_CUSTOMER_UPDATES suspend;\r\nalter task TPC_DS.BLOG.CUSTOMER_LOAD suspend;\r\n-- suspend warehouse\r\nalter warehouse DATALAB_WH suspend;\r\n<\/pre>\n<h2><span class=\"ez-toc-section\" id=\"Fazit-zu-Streams-und-Tasks-in-Snowflake\"><\/span>Fazit zu Streams und Tasks in Snowflake<span class=\"ez-toc-section-end\"><\/span><\/h2>\n<p>Selbst getrennt betrachtet bieten Streams und auch Tasks je f\u00fcr sich von Haus aus schon sehr viel Funktionalit\u00e4t und M\u00f6glichkeiten zur Konfiguration.<\/p>\n<p>Tasks sind ein sehr einfaches und schlankes Mittel, um wiederkehrende SQL Statements ausf\u00fchren zu lassen. Der zeitliche Turnus l\u00e4sst sich dabei je nach Anforderung bis auf eine min\u00fctliche Ausf\u00fchrung takten. Sollte mehr Individualit\u00e4t notwendig sein, kann das Scheduling auch via Crontab beliebig an die Gegebenheiten angepasst werden.<\/p>\n<p>F\u00fcr komplexere Abl\u00e4ufe k\u00f6nnen Tasks ganz einfach in Reihe \/ in Abh\u00e4ngigkeit geschaltet werden und so beliebige Abl\u00e4ufe steuern. Definiert werden diese dann \u00fcber einen Directed Acyclic Graph (DAG). Zum Einsatz kommt dies in unseren Projekten beispielsweise bei der Beladung von Data Warehouses. Bevor die Faktentabellen prozessiert werden k\u00f6nnen, muss sichergestellt sein, dass die dazugeh\u00f6rigen Quellsysteme und Stammdaten (Dimensionen) vollst\u00e4ndig beladen sind:<\/p>\n<p><img loading=\"lazy\" decoding=\"async\" class=\"alignnone size-full wp-image-48845\" src=\"https:\/\/www.inovex.de\/wp-content\/uploads\/snowflake_dag_diagram_data_warehouse-1.png\" alt=\"\" width=\"1136\" height=\"594\" srcset=\"https:\/\/www.inovex.de\/wp-content\/uploads\/snowflake_dag_diagram_data_warehouse-1.png 1136w, https:\/\/www.inovex.de\/wp-content\/uploads\/snowflake_dag_diagram_data_warehouse-1-300x157.png 300w, https:\/\/www.inovex.de\/wp-content\/uploads\/snowflake_dag_diagram_data_warehouse-1-1024x535.png 1024w, https:\/\/www.inovex.de\/wp-content\/uploads\/snowflake_dag_diagram_data_warehouse-1-768x402.png 768w, https:\/\/www.inovex.de\/wp-content\/uploads\/snowflake_dag_diagram_data_warehouse-1-400x209.png 400w, https:\/\/www.inovex.de\/wp-content\/uploads\/snowflake_dag_diagram_data_warehouse-1-360x188.png 360w\" sizes=\"auto, (max-width: 1136px) 100vw, 1136px\" \/><\/p>\n<p>Mit Streams haben wir ebenfalls ein sehr m\u00e4chtiges, aber dennoch schlankes Werkzeug zur Orchestrierung von Daten zur Verf\u00fcgung. Je nach Anwendungsfall kommt es in unseren gr\u00f6\u00dferen Datenplattformen dazu, dass wir mehrere Datenstr\u00f6me f\u00fcr den Inhalt eines Streams haben. Da dieser aber bei Verarbeitung sofort geleert wird, bietet es sich an, f\u00fcr verschiedene Abl\u00e4ufe separate Streams auf dieselbe Quelltabelle zu setzen. Ein Stream kann \u00fcbrigens nicht nur eine Tabelle, sondern auch Verzeichnistabellen, externe Tabellen, oder die einer View zugrunde liegenden Tabellen auf \u00c4nderungen \u00fcberwachen.<\/p>\n<p>Die individuellen St\u00e4rken entfalten sich \u2013 wie Eingangs im Artikel erw\u00e4hnt \u2013 umso mehr, wenn sie im Zusammenspiel als Datenpipeline implementiert werden. Auf anderen Stacks und insbesondere bei fr\u00fcheren L\u00f6sungen zu Datenplattformen musste hier noch sehr viel manueller Code geschrieben und gewartet werden. Es f\u00fchlt sich sehr gut an, wie diese beiden Tools nahtlos in Snowflake integriert sind und wie smooth sie sich in verschiedenen Konstellationen nutzen lassen.<\/p>\n","protected":false},"excerpt":{"rendered":"<p>Willkommen im zweiten Artikel unserer Serie \u00fcber die Snowflake Data Cloud. Der erste Artikel der Serie hat sich mit Snowpipe besch\u00e4ftigt und wie es das\u00a0Laden von Dateien erm\u00f6glicht, sobald sie in einem externen Cloudspeicher zur Verf\u00fcgung stehen. \u00a0In diesem Teil stellen wir eine weitere M\u00f6glichkeit vor, Daten \u00fcber eine gesteuerte Pipeline zu laden und im [&hellip;]<\/p>\n","protected":false},"author":379,"featured_media":49235,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"_acf_changed":false,"inline_featured_image":false,"ep_exclude_from_search":false,"footnotes":""},"tags":[181,385,1108],"service":[446,411],"coauthors":[{"id":379,"display_name":"Andreas Hauser","user_nicename":"ahauser"}],"class_list":["post-48559","post","type-post","status-publish","format-standard","has-post-thumbnail","hentry","tag-business-intelligence","tag-data-engineering","tag-snowflake","service-business-intelligence","service-data-engineering"],"acf":[],"yoast_head":"<!-- This site is optimized with the Yoast SEO plugin v27.4 - https:\/\/yoast.com\/product\/yoast-seo-wordpress\/ -->\n<title>Kontinuierliche Datenpipeline mit Streams und Tasks in Snowflake - inovex GmbH<\/title>\n<meta name=\"description\" content=\"In diesem Artikel zu Snowflake zeigen wir mit Streams und Tasks zwei Tools, die Daten \u00fcber eine gesteuerte Pipeline Laden und Transformieren.\" \/>\n<meta name=\"robots\" content=\"index, follow, max-snippet:-1, max-image-preview:large, max-video-preview:-1\" \/>\n<link rel=\"canonical\" href=\"https:\/\/www.inovex.de\/de\/blog\/datenpipeline-mit-streams-und-tasks-in-snowflake\/\" \/>\n<meta property=\"og:locale\" content=\"de_DE\" \/>\n<meta property=\"og:type\" content=\"article\" \/>\n<meta property=\"og:title\" content=\"Kontinuierliche Datenpipeline mit Streams und Tasks in Snowflake - inovex GmbH\" \/>\n<meta property=\"og:description\" content=\"In diesem Artikel zu Snowflake zeigen wir mit Streams und Tasks zwei Tools, die Daten \u00fcber eine gesteuerte Pipeline Laden und Transformieren.\" \/>\n<meta property=\"og:url\" content=\"https:\/\/www.inovex.de\/de\/blog\/datenpipeline-mit-streams-und-tasks-in-snowflake\/\" \/>\n<meta property=\"og:site_name\" content=\"inovex GmbH\" \/>\n<meta property=\"article:publisher\" content=\"https:\/\/www.facebook.com\/inovexde\" \/>\n<meta property=\"article:published_time\" content=\"2023-10-06T13:12:02+00:00\" \/>\n<meta property=\"article:modified_time\" content=\"2023-10-10T09:23:43+00:00\" \/>\n<meta property=\"og:image\" content=\"https:\/\/www.inovex.de\/wp-content\/uploads\/Kontinuierliche-Datenpipeline-in-Snowflake-2.png\" \/>\n\t<meta property=\"og:image:width\" content=\"1500\" \/>\n\t<meta property=\"og:image:height\" content=\"880\" \/>\n\t<meta property=\"og:image:type\" content=\"image\/png\" \/>\n<meta name=\"author\" content=\"Andreas Hauser\" \/>\n<meta name=\"twitter:card\" content=\"summary_large_image\" \/>\n<meta name=\"twitter:image\" content=\"https:\/\/www.inovex.de\/wp-content\/uploads\/Kontinuierliche-Datenpipeline-in-Snowflake-2-1024x601.png\" \/>\n<meta name=\"twitter:creator\" content=\"@inovexgmbh\" \/>\n<meta name=\"twitter:site\" content=\"@inovexgmbh\" \/>\n<meta name=\"twitter:label1\" content=\"Verfasst von\" \/>\n\t<meta name=\"twitter:data1\" content=\"Andreas Hauser\" \/>\n\t<meta name=\"twitter:label2\" content=\"Gesch\u00e4tzte Lesezeit\" \/>\n\t<meta name=\"twitter:data2\" content=\"14\u00a0Minuten\" \/>\n\t<meta name=\"twitter:label3\" content=\"Written by\" \/>\n\t<meta name=\"twitter:data3\" content=\"Andreas Hauser\" \/>\n<script type=\"application\/ld+json\" class=\"yoast-schema-graph\">{\"@context\":\"https:\\\/\\\/schema.org\",\"@graph\":[{\"@type\":\"Article\",\"@id\":\"https:\\\/\\\/www.inovex.de\\\/de\\\/blog\\\/datenpipeline-mit-streams-und-tasks-in-snowflake\\\/#article\",\"isPartOf\":{\"@id\":\"https:\\\/\\\/www.inovex.de\\\/de\\\/blog\\\/datenpipeline-mit-streams-und-tasks-in-snowflake\\\/\"},\"author\":{\"name\":\"Andreas Hauser\",\"@id\":\"https:\\\/\\\/www.inovex.de\\\/de\\\/#\\\/schema\\\/person\\\/4cddd44d8928450633aaa3bd4df2e3ce\"},\"headline\":\"Kontinuierliche Datenpipeline mit Streams und Tasks in Snowflake\",\"datePublished\":\"2023-10-06T13:12:02+00:00\",\"dateModified\":\"2023-10-10T09:23:43+00:00\",\"mainEntityOfPage\":{\"@id\":\"https:\\\/\\\/www.inovex.de\\\/de\\\/blog\\\/datenpipeline-mit-streams-und-tasks-in-snowflake\\\/\"},\"wordCount\":1795,\"commentCount\":0,\"publisher\":{\"@id\":\"https:\\\/\\\/www.inovex.de\\\/de\\\/#organization\"},\"image\":{\"@id\":\"https:\\\/\\\/www.inovex.de\\\/de\\\/blog\\\/datenpipeline-mit-streams-und-tasks-in-snowflake\\\/#primaryimage\"},\"thumbnailUrl\":\"https:\\\/\\\/www.inovex.de\\\/wp-content\\\/uploads\\\/Kontinuierliche-Datenpipeline-in-Snowflake-2.png\",\"keywords\":[\"Business Intelligence\",\"Data Engineering\",\"Snowflake\"],\"articleSection\":[\"Analytics\"],\"inLanguage\":\"de\",\"potentialAction\":[{\"@type\":\"CommentAction\",\"name\":\"Comment\",\"target\":[\"https:\\\/\\\/www.inovex.de\\\/de\\\/blog\\\/datenpipeline-mit-streams-und-tasks-in-snowflake\\\/#respond\"]}]},{\"@type\":\"WebPage\",\"@id\":\"https:\\\/\\\/www.inovex.de\\\/de\\\/blog\\\/datenpipeline-mit-streams-und-tasks-in-snowflake\\\/\",\"url\":\"https:\\\/\\\/www.inovex.de\\\/de\\\/blog\\\/datenpipeline-mit-streams-und-tasks-in-snowflake\\\/\",\"name\":\"Kontinuierliche Datenpipeline mit Streams und Tasks in Snowflake - inovex GmbH\",\"isPartOf\":{\"@id\":\"https:\\\/\\\/www.inovex.de\\\/de\\\/#website\"},\"primaryImageOfPage\":{\"@id\":\"https:\\\/\\\/www.inovex.de\\\/de\\\/blog\\\/datenpipeline-mit-streams-und-tasks-in-snowflake\\\/#primaryimage\"},\"image\":{\"@id\":\"https:\\\/\\\/www.inovex.de\\\/de\\\/blog\\\/datenpipeline-mit-streams-und-tasks-in-snowflake\\\/#primaryimage\"},\"thumbnailUrl\":\"https:\\\/\\\/www.inovex.de\\\/wp-content\\\/uploads\\\/Kontinuierliche-Datenpipeline-in-Snowflake-2.png\",\"datePublished\":\"2023-10-06T13:12:02+00:00\",\"dateModified\":\"2023-10-10T09:23:43+00:00\",\"description\":\"In diesem Artikel zu Snowflake zeigen wir mit Streams und Tasks zwei Tools, die Daten \u00fcber eine gesteuerte Pipeline Laden und Transformieren.\",\"breadcrumb\":{\"@id\":\"https:\\\/\\\/www.inovex.de\\\/de\\\/blog\\\/datenpipeline-mit-streams-und-tasks-in-snowflake\\\/#breadcrumb\"},\"inLanguage\":\"de\",\"potentialAction\":[{\"@type\":\"ReadAction\",\"target\":[\"https:\\\/\\\/www.inovex.de\\\/de\\\/blog\\\/datenpipeline-mit-streams-und-tasks-in-snowflake\\\/\"]}]},{\"@type\":\"ImageObject\",\"inLanguage\":\"de\",\"@id\":\"https:\\\/\\\/www.inovex.de\\\/de\\\/blog\\\/datenpipeline-mit-streams-und-tasks-in-snowflake\\\/#primaryimage\",\"url\":\"https:\\\/\\\/www.inovex.de\\\/wp-content\\\/uploads\\\/Kontinuierliche-Datenpipeline-in-Snowflake-2.png\",\"contentUrl\":\"https:\\\/\\\/www.inovex.de\\\/wp-content\\\/uploads\\\/Kontinuierliche-Datenpipeline-in-Snowflake-2.png\",\"width\":1500,\"height\":880,\"caption\":\"Kontinuierliche Datenpipeline in Snowflake\"},{\"@type\":\"BreadcrumbList\",\"@id\":\"https:\\\/\\\/www.inovex.de\\\/de\\\/blog\\\/datenpipeline-mit-streams-und-tasks-in-snowflake\\\/#breadcrumb\",\"itemListElement\":[{\"@type\":\"ListItem\",\"position\":1,\"name\":\"Home\",\"item\":\"https:\\\/\\\/www.inovex.de\\\/de\\\/\"},{\"@type\":\"ListItem\",\"position\":2,\"name\":\"Kontinuierliche Datenpipeline mit Streams und Tasks in Snowflake\"}]},{\"@type\":\"WebSite\",\"@id\":\"https:\\\/\\\/www.inovex.de\\\/de\\\/#website\",\"url\":\"https:\\\/\\\/www.inovex.de\\\/de\\\/\",\"name\":\"inovex GmbH\",\"description\":\"\",\"publisher\":{\"@id\":\"https:\\\/\\\/www.inovex.de\\\/de\\\/#organization\"},\"potentialAction\":[{\"@type\":\"SearchAction\",\"target\":{\"@type\":\"EntryPoint\",\"urlTemplate\":\"https:\\\/\\\/www.inovex.de\\\/de\\\/?s={search_term_string}\"},\"query-input\":{\"@type\":\"PropertyValueSpecification\",\"valueRequired\":true,\"valueName\":\"search_term_string\"}}],\"inLanguage\":\"de\"},{\"@type\":\"Organization\",\"@id\":\"https:\\\/\\\/www.inovex.de\\\/de\\\/#organization\",\"name\":\"inovex GmbH\",\"url\":\"https:\\\/\\\/www.inovex.de\\\/de\\\/\",\"logo\":{\"@type\":\"ImageObject\",\"inLanguage\":\"de\",\"@id\":\"https:\\\/\\\/www.inovex.de\\\/de\\\/#\\\/schema\\\/logo\\\/image\\\/\",\"url\":\"https:\\\/\\\/www.inovex.de\\\/wp-content\\\/uploads\\\/2021\\\/03\\\/inovex-logo-16-9-1.png\",\"contentUrl\":\"https:\\\/\\\/www.inovex.de\\\/wp-content\\\/uploads\\\/2021\\\/03\\\/inovex-logo-16-9-1.png\",\"width\":1921,\"height\":1081,\"caption\":\"inovex GmbH\"},\"image\":{\"@id\":\"https:\\\/\\\/www.inovex.de\\\/de\\\/#\\\/schema\\\/logo\\\/image\\\/\"},\"sameAs\":[\"https:\\\/\\\/www.facebook.com\\\/inovexde\",\"https:\\\/\\\/x.com\\\/inovexgmbh\",\"https:\\\/\\\/www.instagram.com\\\/inovexlife\\\/\",\"https:\\\/\\\/www.linkedin.com\\\/company\\\/inovex\",\"https:\\\/\\\/www.youtube.com\\\/channel\\\/UC7r66GT14hROB_RQsQBAQUQ\"]},{\"@type\":\"Person\",\"@id\":\"https:\\\/\\\/www.inovex.de\\\/de\\\/#\\\/schema\\\/person\\\/4cddd44d8928450633aaa3bd4df2e3ce\",\"name\":\"Andreas Hauser\",\"image\":{\"@type\":\"ImageObject\",\"inLanguage\":\"de\",\"@id\":\"https:\\\/\\\/secure.gravatar.com\\\/avatar\\\/cb4ef7717069c40e3590b1782ae86ba9e910455afefd8d57adfe31a7790e1e62?s=96&d=retro&r=g75aeb9af1b9caa8a4608c2d7502417e3\",\"url\":\"https:\\\/\\\/secure.gravatar.com\\\/avatar\\\/cb4ef7717069c40e3590b1782ae86ba9e910455afefd8d57adfe31a7790e1e62?s=96&d=retro&r=g\",\"contentUrl\":\"https:\\\/\\\/secure.gravatar.com\\\/avatar\\\/cb4ef7717069c40e3590b1782ae86ba9e910455afefd8d57adfe31a7790e1e62?s=96&d=retro&r=g\",\"caption\":\"Andreas Hauser\"},\"sameAs\":[\"https:\\\/\\\/www.linkedin.com\\\/in\\\/andreas-hauser-de\\\/\"],\"url\":\"https:\\\/\\\/www.inovex.de\\\/de\\\/blog\\\/author\\\/ahauser\\\/\"}]}<\/script>\n<!-- \/ Yoast SEO plugin. -->","yoast_head_json":{"title":"Kontinuierliche Datenpipeline mit Streams und Tasks in Snowflake - inovex GmbH","description":"In diesem Artikel zu Snowflake zeigen wir mit Streams und Tasks zwei Tools, die Daten \u00fcber eine gesteuerte Pipeline Laden und Transformieren.","robots":{"index":"index","follow":"follow","max-snippet":"max-snippet:-1","max-image-preview":"max-image-preview:large","max-video-preview":"max-video-preview:-1"},"canonical":"https:\/\/www.inovex.de\/de\/blog\/datenpipeline-mit-streams-und-tasks-in-snowflake\/","og_locale":"de_DE","og_type":"article","og_title":"Kontinuierliche Datenpipeline mit Streams und Tasks in Snowflake - inovex GmbH","og_description":"In diesem Artikel zu Snowflake zeigen wir mit Streams und Tasks zwei Tools, die Daten \u00fcber eine gesteuerte Pipeline Laden und Transformieren.","og_url":"https:\/\/www.inovex.de\/de\/blog\/datenpipeline-mit-streams-und-tasks-in-snowflake\/","og_site_name":"inovex GmbH","article_publisher":"https:\/\/www.facebook.com\/inovexde","article_published_time":"2023-10-06T13:12:02+00:00","article_modified_time":"2023-10-10T09:23:43+00:00","og_image":[{"width":1500,"height":880,"url":"https:\/\/www.inovex.de\/wp-content\/uploads\/Kontinuierliche-Datenpipeline-in-Snowflake-2.png","type":"image\/png"}],"author":"Andreas Hauser","twitter_card":"summary_large_image","twitter_image":"https:\/\/www.inovex.de\/wp-content\/uploads\/Kontinuierliche-Datenpipeline-in-Snowflake-2-1024x601.png","twitter_creator":"@inovexgmbh","twitter_site":"@inovexgmbh","twitter_misc":{"Verfasst von":"Andreas Hauser","Gesch\u00e4tzte Lesezeit":"14\u00a0Minuten","Written by":"Andreas Hauser"},"schema":{"@context":"https:\/\/schema.org","@graph":[{"@type":"Article","@id":"https:\/\/www.inovex.de\/de\/blog\/datenpipeline-mit-streams-und-tasks-in-snowflake\/#article","isPartOf":{"@id":"https:\/\/www.inovex.de\/de\/blog\/datenpipeline-mit-streams-und-tasks-in-snowflake\/"},"author":{"name":"Andreas Hauser","@id":"https:\/\/www.inovex.de\/de\/#\/schema\/person\/4cddd44d8928450633aaa3bd4df2e3ce"},"headline":"Kontinuierliche Datenpipeline mit Streams und Tasks in Snowflake","datePublished":"2023-10-06T13:12:02+00:00","dateModified":"2023-10-10T09:23:43+00:00","mainEntityOfPage":{"@id":"https:\/\/www.inovex.de\/de\/blog\/datenpipeline-mit-streams-und-tasks-in-snowflake\/"},"wordCount":1795,"commentCount":0,"publisher":{"@id":"https:\/\/www.inovex.de\/de\/#organization"},"image":{"@id":"https:\/\/www.inovex.de\/de\/blog\/datenpipeline-mit-streams-und-tasks-in-snowflake\/#primaryimage"},"thumbnailUrl":"https:\/\/www.inovex.de\/wp-content\/uploads\/Kontinuierliche-Datenpipeline-in-Snowflake-2.png","keywords":["Business Intelligence","Data Engineering","Snowflake"],"articleSection":["Analytics"],"inLanguage":"de","potentialAction":[{"@type":"CommentAction","name":"Comment","target":["https:\/\/www.inovex.de\/de\/blog\/datenpipeline-mit-streams-und-tasks-in-snowflake\/#respond"]}]},{"@type":"WebPage","@id":"https:\/\/www.inovex.de\/de\/blog\/datenpipeline-mit-streams-und-tasks-in-snowflake\/","url":"https:\/\/www.inovex.de\/de\/blog\/datenpipeline-mit-streams-und-tasks-in-snowflake\/","name":"Kontinuierliche Datenpipeline mit Streams und Tasks in Snowflake - inovex GmbH","isPartOf":{"@id":"https:\/\/www.inovex.de\/de\/#website"},"primaryImageOfPage":{"@id":"https:\/\/www.inovex.de\/de\/blog\/datenpipeline-mit-streams-und-tasks-in-snowflake\/#primaryimage"},"image":{"@id":"https:\/\/www.inovex.de\/de\/blog\/datenpipeline-mit-streams-und-tasks-in-snowflake\/#primaryimage"},"thumbnailUrl":"https:\/\/www.inovex.de\/wp-content\/uploads\/Kontinuierliche-Datenpipeline-in-Snowflake-2.png","datePublished":"2023-10-06T13:12:02+00:00","dateModified":"2023-10-10T09:23:43+00:00","description":"In diesem Artikel zu Snowflake zeigen wir mit Streams und Tasks zwei Tools, die Daten \u00fcber eine gesteuerte Pipeline Laden und Transformieren.","breadcrumb":{"@id":"https:\/\/www.inovex.de\/de\/blog\/datenpipeline-mit-streams-und-tasks-in-snowflake\/#breadcrumb"},"inLanguage":"de","potentialAction":[{"@type":"ReadAction","target":["https:\/\/www.inovex.de\/de\/blog\/datenpipeline-mit-streams-und-tasks-in-snowflake\/"]}]},{"@type":"ImageObject","inLanguage":"de","@id":"https:\/\/www.inovex.de\/de\/blog\/datenpipeline-mit-streams-und-tasks-in-snowflake\/#primaryimage","url":"https:\/\/www.inovex.de\/wp-content\/uploads\/Kontinuierliche-Datenpipeline-in-Snowflake-2.png","contentUrl":"https:\/\/www.inovex.de\/wp-content\/uploads\/Kontinuierliche-Datenpipeline-in-Snowflake-2.png","width":1500,"height":880,"caption":"Kontinuierliche Datenpipeline in Snowflake"},{"@type":"BreadcrumbList","@id":"https:\/\/www.inovex.de\/de\/blog\/datenpipeline-mit-streams-und-tasks-in-snowflake\/#breadcrumb","itemListElement":[{"@type":"ListItem","position":1,"name":"Home","item":"https:\/\/www.inovex.de\/de\/"},{"@type":"ListItem","position":2,"name":"Kontinuierliche Datenpipeline mit Streams und Tasks in Snowflake"}]},{"@type":"WebSite","@id":"https:\/\/www.inovex.de\/de\/#website","url":"https:\/\/www.inovex.de\/de\/","name":"inovex GmbH","description":"","publisher":{"@id":"https:\/\/www.inovex.de\/de\/#organization"},"potentialAction":[{"@type":"SearchAction","target":{"@type":"EntryPoint","urlTemplate":"https:\/\/www.inovex.de\/de\/?s={search_term_string}"},"query-input":{"@type":"PropertyValueSpecification","valueRequired":true,"valueName":"search_term_string"}}],"inLanguage":"de"},{"@type":"Organization","@id":"https:\/\/www.inovex.de\/de\/#organization","name":"inovex GmbH","url":"https:\/\/www.inovex.de\/de\/","logo":{"@type":"ImageObject","inLanguage":"de","@id":"https:\/\/www.inovex.de\/de\/#\/schema\/logo\/image\/","url":"https:\/\/www.inovex.de\/wp-content\/uploads\/2021\/03\/inovex-logo-16-9-1.png","contentUrl":"https:\/\/www.inovex.de\/wp-content\/uploads\/2021\/03\/inovex-logo-16-9-1.png","width":1921,"height":1081,"caption":"inovex GmbH"},"image":{"@id":"https:\/\/www.inovex.de\/de\/#\/schema\/logo\/image\/"},"sameAs":["https:\/\/www.facebook.com\/inovexde","https:\/\/x.com\/inovexgmbh","https:\/\/www.instagram.com\/inovexlife\/","https:\/\/www.linkedin.com\/company\/inovex","https:\/\/www.youtube.com\/channel\/UC7r66GT14hROB_RQsQBAQUQ"]},{"@type":"Person","@id":"https:\/\/www.inovex.de\/de\/#\/schema\/person\/4cddd44d8928450633aaa3bd4df2e3ce","name":"Andreas Hauser","image":{"@type":"ImageObject","inLanguage":"de","@id":"https:\/\/secure.gravatar.com\/avatar\/cb4ef7717069c40e3590b1782ae86ba9e910455afefd8d57adfe31a7790e1e62?s=96&d=retro&r=g75aeb9af1b9caa8a4608c2d7502417e3","url":"https:\/\/secure.gravatar.com\/avatar\/cb4ef7717069c40e3590b1782ae86ba9e910455afefd8d57adfe31a7790e1e62?s=96&d=retro&r=g","contentUrl":"https:\/\/secure.gravatar.com\/avatar\/cb4ef7717069c40e3590b1782ae86ba9e910455afefd8d57adfe31a7790e1e62?s=96&d=retro&r=g","caption":"Andreas Hauser"},"sameAs":["https:\/\/www.linkedin.com\/in\/andreas-hauser-de\/"],"url":"https:\/\/www.inovex.de\/de\/blog\/author\/ahauser\/"}]}},"_links":{"self":[{"href":"https:\/\/www.inovex.de\/de\/wp-json\/wp\/v2\/posts\/48559","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/www.inovex.de\/de\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/www.inovex.de\/de\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/www.inovex.de\/de\/wp-json\/wp\/v2\/users\/379"}],"replies":[{"embeddable":true,"href":"https:\/\/www.inovex.de\/de\/wp-json\/wp\/v2\/comments?post=48559"}],"version-history":[{"count":5,"href":"https:\/\/www.inovex.de\/de\/wp-json\/wp\/v2\/posts\/48559\/revisions"}],"predecessor-version":[{"id":49231,"href":"https:\/\/www.inovex.de\/de\/wp-json\/wp\/v2\/posts\/48559\/revisions\/49231"}],"wp:featuredmedia":[{"embeddable":true,"href":"https:\/\/www.inovex.de\/de\/wp-json\/wp\/v2\/media\/49235"}],"wp:attachment":[{"href":"https:\/\/www.inovex.de\/de\/wp-json\/wp\/v2\/media?parent=48559"}],"wp:term":[{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/www.inovex.de\/de\/wp-json\/wp\/v2\/tags?post=48559"},{"taxonomy":"service","embeddable":true,"href":"https:\/\/www.inovex.de\/de\/wp-json\/wp\/v2\/service?post=48559"},{"taxonomy":"author","embeddable":true,"href":"https:\/\/www.inovex.de\/de\/wp-json\/wp\/v2\/coauthors?post=48559"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}