{"id":21066,"date":"2017-10-17T07:58:59","date_gmt":"2017-10-17T06:58:59","guid":{"rendered":"https:\/\/www.inovex.de\/blog\/?p=3539"},"modified":"2024-08-29T08:04:31","modified_gmt":"2024-08-29T06:04:31","slug":"efficient-udafs-with-pyspark","status":"publish","type":"post","link":"https:\/\/www.inovex.de\/de\/blog\/efficient-udafs-with-pyspark\/","title":{"rendered":"Efficient UD(A)Fs with PySpark"},"content":{"rendered":"<p>Nowadays, Spark surely is one of the most prevalent technologies in the fields of data science and big data. Luckily, even though it is developed in Scala and runs in the Java Virtual Machine (JVM), it comes with Python bindings also known as PySpark, whose API was heavily influenced by <a href=\"http:\/\/pandas.pydata.org\/\" target=\"_blank\" rel=\"noopener noreferrer\">Pandas<\/a>. With respect to functionality, modern PySpark has about the same capabilities as Pandas when it comes to typical ETL and data wrangling, e.g. groupby, aggregations and so on. As a general rule of thumb, one should consider an alternative to Pandas whenever the data set has more than 10,000,000 rows which, depending on the number of columns and data types, translates to about 5-10 GB of memory usage. At that point PySpark might be an option for you that does the job, but of course there are others like for instance <a href=\"http:\/\/dask.pydata.org\/en\/latest\/index.html\" target=\"_blank\" rel=\"noopener noreferrer\">Dask<\/a> which won\u2019t be addressed in this post.<!--more--><\/p>\n<p>If you are new to Spark, one important thing to note is that Spark has two remarkable features besides its programmatic data wrangling capabilities. One is that Spark comes with SQL as an alternative way of defining queries and the other is <a href=\"https:\/\/spark.apache.org\/docs\/latest\/ml-guide.html\" target=\"_blank\" rel=\"noopener noreferrer\">Spark MLlib<\/a> for machine learning. Both topics are beyond the scope of this post but should be taken into account if you are considering PySpark as an alternative to Pandas and scikit-learn for larger data sets.<\/p>\n<p>But enough praise for PySpark, there are still some ugly sides as well as rough edges to it and we want to address some of them here, of course, in a constructive way. First of all, due to its relatively young age, PySpark lacks some features that Pandas provides, for example in areas such as reshaping\/pivoting or time series. Also, it is not as straightforward to use advanced mathematical functions from SciPy within PySpark. That\u2019s why sooner or later, you might walk into a scenario where you want to apply some Pandas or SciPy operations to your data frame in PySpark. Unfortunately, there is no built-in mechanism for using Pandas transformations in PySpark. In fact, this requires a lot of boilerplate code with many error-prone details to consider. Therefore we make a wish to the coding fairy, cross two fingers that someone else already solved this and start googling\u2026 and here we are \ud83d\ude09<\/p>\n<p>The remainder of this blog post walks you through the process of writing efficient Pandas UDAFs in PySpark. In fact, we end up abstracting all the necessary boilerplate code into a single Python decorator, which allows us to conveniently specify our PySpark Pandas function. To give more insights into performance considerations, this post also contains a little journey into the internals of PySpark.<\/p>\n<div id=\"ez-toc-container\" class=\"ez-toc-v2_0_83 counter-hierarchy ez-toc-counter ez-toc-custom ez-toc-container-direction\">\n<div class=\"ez-toc-title-container\"><p class=\"ez-toc-title\" style=\"cursor:inherit\"><\/p>\n<\/div><nav><ul class='ez-toc-list ez-toc-list-level-1 ' ><li class='ez-toc-page-1 ez-toc-heading-level-2'><a class=\"ez-toc-link ez-toc-heading-1\" href=\"https:\/\/www.inovex.de\/de\/blog\/efficient-udafs-with-pyspark\/#UDAFs-with-RDDs\" >UDAFs with RDDs<\/a><\/li><li class='ez-toc-page-1 ez-toc-heading-level-2'><a class=\"ez-toc-link ez-toc-heading-2\" href=\"https:\/\/www.inovex.de\/de\/blog\/efficient-udafs-with-pyspark\/#PySpark-internals\" >PySpark internals<\/a><\/li><li class='ez-toc-page-1 ez-toc-heading-level-2'><a class=\"ez-toc-link ez-toc-heading-3\" href=\"https:\/\/www.inovex.de\/de\/blog\/efficient-udafs-with-pyspark\/#PySpark-UDAFs-with-Pandas\" >PySpark UDAFs with Pandas<\/a><\/li><li class='ez-toc-page-1 ez-toc-heading-level-2'><a class=\"ez-toc-link ez-toc-heading-4\" href=\"https:\/\/www.inovex.de\/de\/blog\/efficient-udafs-with-pyspark\/#Summary\" >Summary<\/a><\/li><\/ul><\/nav><\/div>\n<h2><span class=\"ez-toc-section\" id=\"UDAFs-with-RDDs\"><\/span>UDAFs with RDDs<span class=\"ez-toc-section-end\"><\/span><\/h2>\n<p>To start with a recap, an aggregation function is a function that operates on a set of rows and produces a result, for example a <span class=\"lang:python decode:true crayon-inline \">sum()<\/span> or <span class=\"lang:python decode:true crayon-inline \">count()<\/span> function. A User-Defined Aggregation Function (UDAF) is typically used for more complex aggregations that are not natively shipped with your analysis tool in question. In our case, this means we provide some Python code that takes a set of rows and produces an aggregate result. At the time of writing &#8211; with PySpark 2.2 as latest version &#8211; there is no \u201cofficial\u201c way of defining an arbitrary UDAF function. Also, the tracking Jira issue <a href=\"https:\/\/issues.apache.org\/jira\/browse\/SPARK-10915\" target=\"_blank\" rel=\"noopener noreferrer\">SPARK-10915<\/a> does not indicate that this changes in near future. Depending on your use-case, this might even be a reason to completely discard PySpark as a viable solution. However, as you might have guessed from the title of this article, there are workarounds to the rescue. This is where the RDD API comes in. As a reminder, a Resilient Distributed Dataset (RDD) is the low-level data structure of Spark and a Spark DataFrame is built on top of it. As we are mostly dealing with DataFrames in PySpark, we can get access to the underlying RDD with the help of the <span class=\"lang:python decode:true crayon-inline \">rdd<\/span> attribute and convert it back with <span class=\"lang:python decode:true crayon-inline \">toDF()<\/span> . This RDD API allows us to specify arbitrary Python functions that get executed on the data. To give an example, let\u2019s say we have a DataFrame <span class=\"lang:python decode:true crayon-inline \">df<\/span> of one billion rows with a boolean\u00a0<span class=\"lang:python decode:true crayon-inline\">is_sold<\/span>\u00a0column and we want to filter for rows with sold products. One could accomplish this with the code<\/p>\n<pre class=\"lang:python decode:true\">df.rdd.filter(lambda x: x.is_sold == True).toDF()<\/pre>\n<p>Although not explicitly declared as such, this lambda function is essentially a user-defined function (UDF). For this exact use case, we could also use the more high-level DataFrame <span class=\"lang:python decode:true crayon-inline \">filter()<\/span> method, producing the same result:<\/p>\n<pre class=\"lang:python decode:true \">df.filter(df.is_sold == True)<\/pre>\n<p>Before we now go into the details on how to implement UDAFs using the RDD API, there is something important to keep in mind which might sound counterintuitive to the title of this post: in PySpark you should avoid all kind of Python UDFs &#8211; like RDD functions or data frame UDFs &#8211; as much as possible! Whenever there is a built-in DataFrame method available, this will be much faster than its RDD counterpart. To get a better understanding of the substantial performance difference, we will now take a little detour and investigate what happens behind the scenes in those two filter examples.<\/p>\n<h2><span class=\"ez-toc-section\" id=\"PySpark-internals\"><\/span>PySpark internals<span class=\"ez-toc-section-end\"><\/span><\/h2>\n<p>PySpark is actually a wrapper around the Spark core written in Scala. When you start your <a href=\"http:\/\/spark.apache.org\/docs\/latest\/api\/python\/pyspark.sql.html#pyspark.sql.SparkSession\" target=\"_blank\" rel=\"noopener noreferrer\">SparkSession<\/a> in Python, in the background PySpark uses <a href=\"https:\/\/www.py4j.org\/\" target=\"_blank\" rel=\"noopener noreferrer\">Py4J<\/a> to launch a JVM and create a Java SparkContext. All PySpark operations, for example our <span class=\"lang:python decode:true crayon-inline \">df.filter()<\/span> method call, behind the scenes get translated into corresponding calls on the respective Spark DataFrame object within the JVM SparkContext. This is in general extremely fast and the overhead can be neglected as long as you don\u2019t call the function millions of times. So in our <span class=\"lang:python decode:true crayon-inline \">df.filter()<\/span> example, the DataFrame operation and the filter condition will be send to the Java SparkContext, where it gets compiled into an overall optimized query plan. Once the query is executed, the filter condition is evaluated on the distributed DataFrame within Java, without any callback to Python! In case our workflow loads the DataFrame from Hive and saves the resulting DataFrame as Hive table, throughout the entire query execution all data operations are performed in a distributed fashion within Java Spark workers, which allows Spark to be very fast for queries on large data sets. Okay, so why is the RDD <span class=\"lang:python decode:true crayon-inline \">filter()<\/span> method then so much slower? The reason is that the lambda function cannot be directly applied to the DataFrame residing in JVM memory. What actually happens internally is that Spark spins up Python workers next to the Spark executors on the cluster nodes. At execution time, the Spark workers send our lambda function to those Python workers. Next, the Spark workers start serializing their RDD partitions and pipe them to the Python workers via sockets, where our lambda function gets evaluated on each row. For the resulting rows, the whole serialization\/deserialization procedure happens again in the opposite direction so that the actual <span class=\"lang:python decode:true crayon-inline \">filter()<\/span> can be applied to the result set.<\/p>\n<p>The entire data flow when using arbitrary Python functions in PySpark is also shown in the following image, which has been taken from the old <a href=\"https:\/\/cwiki.apache.org\/confluence\/display\/SPARK\/PySpark+Internals\" target=\"_blank\" rel=\"noopener noreferrer\">PySpark Internals wiki<\/a>:<\/p>\n<p><img loading=\"lazy\" decoding=\"async\" class=\"alignnone size-full wp-image-3541\" src=\"https:\/\/www.inovex.de\/wp-content\/uploads\/2017\/10\/pyspark_udf_dataflow.jpg\" alt=\"UDAF Data Flow in PySpark\" width=\"640\" height=\"480\" \/><\/p>\n<p>Even if all of this sounded awkwardly technical to you, you get the point that executing Python functions in a distributed Java system is very expensive in terms of execution time due to excessive copying of data back and forth.<\/p>\n<p>To give a short summary to this low-level excursion: as long as we avoid all kind of Python UDFs, a PySpark program will be approximately as fast as Spark program based on Scala. If we cannot avoid UDFs, we should at least try to make them as efficient as possible, which is what show in the remaining post. Before we move on though, one side note should be kept in mind. The general problem of accessing data frames from different programming languages in the realm of data analytics is currently addressed by the creator of Pandas <a href=\"http:\/\/wesmckinney.com\/\" target=\"_blank\" rel=\"noopener noreferrer\">Wes McKinney<\/a>. He is also the initiator of the <a href=\"http:\/\/arrow.apache.org\/\" target=\"_blank\" rel=\"noopener noreferrer\">Apache Arrow<\/a> project which tries to standardize the way columnar data is stored in memory so that everyone using Arrow won\u2019t need to do the cumbersome object translation by serialization and deserialization anymore. Hopefully with version 2.3, as shown in the issues <a href=\"https:\/\/issues.apache.org\/jira\/browse\/SPARK-13534\" target=\"_blank\" rel=\"noopener noreferrer\">SPARK-13534<\/a> and <a href=\"https:\/\/issues.apache.org\/jira\/browse\/SPARK-21190\" target=\"_blank\" rel=\"noopener noreferrer\">SPARK-21190<\/a>, Spark will make use of Arrow, which should drastically speed up our Python UDFs. Still, even in that case we should always prefer built-in Spark functions whenever possible.<\/p>\n<h2><span class=\"ez-toc-section\" id=\"PySpark-UDAFs-with-Pandas\"><\/span>PySpark UDAFs with Pandas<span class=\"ez-toc-section-end\"><\/span><\/h2>\n<p>As mentioned before our detour into the internals of PySpark, for defining an arbitrary UDAF function we need an operation that allows us to operate on multiple rows and produce one or multiple resulting rows. This functionality is provided by the RDD method <span class=\"lang:python decode:true crayon-inline \">mapPartitions<\/span> , where we can apply an arbitrary Python function <span class=\"lang:python decode:true crayon-inline \">my_func<\/span> to a DataFrame <span class=\"lang:python decode:true crayon-inline \">df<\/span> partition with:<\/p>\n<pre class=\"lang:python decode:true\">df.rdd.mapPartitions(my_func).toDF()<\/pre>\n<p>If you want to further read up on RDDs and partitions, you can checkout the chapter <a href=\"https:\/\/jaceklaskowski.gitbooks.io\/mastering-apache-spark\/spark-rdd-partitions.html\" target=\"_blank\" rel=\"noopener noreferrer\">Partitions and Partitioning<\/a> of the excellent Mastering Apache Spark 2 book by Jacek Laskowski. In most cases we would want to control the number of partitions, like 100, or even group by a column, let\u2019s say <span class=\"lang:python decode:true crayon-inline \">country<\/span> , in which case we would write:<\/p>\n<pre class=\"lang:python decode:true\">df.repartition(100).rdd.mapPartitions(my_func).toDF()\r\n\r\n<\/pre>\n<p>or<\/p>\n<pre class=\"lang:python decode:true \">df.repartition('country').rdd.mapPartitions(my_func).toDF()\r\n\r\n<\/pre>\n<p>Having solved one problem, as it is quite often in life, we have introduced another problem. As we are working now with the low-level RDD interface, our function <span class=\"lang:python decode:true crayon-inline \">my_func<\/span> will be passed an iterator of PySpark <a href=\"https:\/\/spark.apache.org\/docs\/latest\/api\/python\/reference\/pyspark.sql\/api\/pyspark.sql.Row.html\" target=\"_blank\" rel=\"noopener noreferrer\">Row<\/a> objects and needs to return them as well. A <span class=\"lang:python decode:true crayon-inline \">Row<\/span> object itself is only a container for the column values in one row, as you might have guessed. When we return such a <span class=\"lang:python decode:true crayon-inline \">Row<\/span> , the data types of these values therein must be interpretable by Spark in order to translate them back to Scala. This is a lot of low-level stuff to deal with since in most cases we would love to implement our UDF\/UDAF with the help of Pandas, keeping in mind that one partition should hold less than 10 million rows.<\/p>\n<p>So first we need to define a nice function that will convert a <span class=\"lang:python decode:true crayon-inline \">Row<\/span> iterator into a Pandas DataFrame:<\/p>\n<pre class=\"lang:python decode:true\">import logging\r\n\r\nimport pandas as pd\r\n\r\n_logger = logging.getLogger(__name__)\r\n\r\ndef rows_to_pandas(rows):\r\n\r\n    \"\"\"Converts a Spark Row iterator of a partition to a Pandas DataFrame assuming YARN\r\n\r\n    Args:\r\n\r\n        rows: iterator over PySpark Row objects\r\n\r\n    Returns:\r\n\r\n        Pandas DataFrame\r\n\r\n    \"\"\"\r\n\r\n    first_row, rows = peek(rows)\r\n\r\n    if not first_row:\r\n\r\n        _logger.warning(\"Spark DataFrame is empty! Returning empty Pandas DataFrame!\")\r\n\r\n        return pd.DataFrame()\r\n\r\n    first_row_info = [\"{} ({}): {}\".format(k, rtype(first_row[k]), first_row[k])\r\n\r\n                      for k in first_row.__fields__]\r\n\r\n    _logger.debug(\"First partition row: {}\".format(first_row_info))\r\n\r\n    df = pd.DataFrame.from_records(rows, columns=first_row.__fields__)\r\n\r\n    _logger.debug(\"Converted partition to DataFrame of shape {} with types:\\n{}\".format(df.shape, df.dtypes))\r\n\r\n    return df<\/pre>\n<p>This function actually does only one thing which is calling <span class=\"lang:python decode:true crayon-inline \">df = pd.DataFrame.from_records(rows, columns=first_row.__fields__)<\/span> in order to generate a DataFrame. The rest of the code makes sure that the iterator is not empty and for debugging reasons we also peek into the first row and print the value as well as the datatype of each column. This has proven in practice to be extremely helpful in case something goes wrong and one needs to debug what\u2019s going on in the UDF\/UDAF. The functions <span class=\"lang:python decode:true crayon-inline \">peek<\/span> and <span class=\"lang:python decode:true crayon-inline \">rtype<\/span> are defined as follows:<\/p>\n<pre class=\"lang:python decode:true \">from itertools import chain\r\n\r\ndef peek(iterable):\r\n\r\n    \"\"\"Peek into the first element and return the whole iterator again\r\n\r\n    Args:\r\n\r\n        iterable: iterable object like list or iterator\r\n\r\n    Returns:\r\n\r\n        tuple of first element and original iterable\r\n\r\n    \"\"\"\r\n\r\n    iterable = iter(iterable)\r\n\r\n    try:\r\n\r\n        first_elem = next(iterable)\r\n\r\n    except StopIteration:\r\n\r\n        return None, iterable\r\n\r\n    iterable = chain([first_elem], iterable)\r\n\r\n    return first_elem, iterable\r\n\r\ndef rtype(var):\r\n\r\n    \"\"\"Heuristic representation for nested types\/containers\r\n\r\n    Args:\r\n\r\n        var: some (nested) variable\r\n\r\n    Returns:\r\n\r\n        str: string representation of nested datatype (NA=Not Available)\r\n\r\n    \"\"\"\r\n\r\n    def etype(x):\r\n\r\n        return type(x).__name__\r\n\r\n    if isinstance(var, list):\r\n\r\n        elem_type = etype(var[0]) if var else \"NA\"\r\n\r\n        return \"List[{}]\".format(elem_type)\r\n\r\n    elif isinstance(var, dict):\r\n\r\n        keys = list(var.keys())\r\n\r\n        if keys:\r\n\r\n            key = keys[0]\r\n\r\n            key_type, val_type = etype(key), etype(var[key])\r\n\r\n        else:\r\n\r\n            key_type, val_type = \"NA\", \"NA\"\r\n\r\n        return \"Dict[{}, {}]\".format(key_type, val_type)\r\n\r\n    elif isinstance(var, tuple):\r\n\r\n        elem_types = ', '.join(etype(elem) for elem in var)\r\n\r\n        return \"Tuple[{}]\".format(elem_types)\r\n\r\n    else:\r\n\r\n        return etype(var)<\/pre>\n<p>The next part is to actually convert the result of our UDF\/UDAF back to an iterator of Row objects. Since our result will most likely be a Pandas DataFrame or Series, we define the following:<\/p>\n<pre class=\"lang:python decode:true \">import numpy as np\r\n\r\nfrom pyspark.sql.types import Row\r\n\r\ndef convert_dtypes(rows):\r\n\r\n    \"\"\"Converts some Pandas data types to pure Python data types\r\n\r\n    Args:\r\n\r\n        rows (array): numpy recarray holding all rows\r\n\r\n    Returns:\r\n\r\n        Iterator over lists of row values\r\n\r\n    \"\"\"\r\n\r\n    dtype_map = {pd.Timestamp: lambda x: x.to_pydatetime(),\r\n\r\n                 np.int8: lambda x: int(x),\r\n\r\n                 np.int16: lambda x: int(x),\r\n\r\n                 np.int32: lambda x: int(x),\r\n\r\n                 np.int64: lambda x: int(x),\r\n\r\n                 np.float16: lambda x: float(x),\r\n\r\n                 np.float32: lambda x: float(x),\r\n\r\n                 np.float64: lambda x: float(x),\r\n\r\n                 np.float128: lambda x: float(x)}\r\n\r\n    for row in rows:\r\n\r\n        yield [dtype_map.get(type(elem), lambda x: x)(elem) for elem in row]\r\n\r\ndef pandas_to_rows(df):\r\n\r\n    \"\"\"Converts Pandas DataFrame to iterator of Row objects\r\n\r\n    Args:\r\n\r\n        df: Pandas DataFrame\r\n\r\n    Returns:\r\n\r\n        Iterator over PySpark Row objects\r\n\r\n    \"\"\"\r\n\r\n    if df is None:\r\n\r\n        _logger.debug(\"Returning nothing\")\r\n\r\n        return iter([])\r\n\r\n    if type(df) is pd.Series:\r\n\r\n        df = df.to_frame().T\r\n\r\n    if df.empty:\r\n\r\n        _logger.warning(\"Pandas DataFrame is empty! Returning nothing!\")\r\n\r\n        return iter([])\r\n\r\n    _logger.debug(\"Convert DataFrame of shape {} to partition with types:\\n{}\".format(df.shape, df.dtypes))\r\n\r\n    records = df.to_records(index=False)\r\n\r\n    records = convert_dtypes(records)\r\n\r\n    first_row, records = peek(records)\r\n\r\n    first_row_info = [\"{} ({}): {}\".format(k, rtype(v), v) for k, v in zip(df.columns, first_row)]\r\n\r\n    _logger.debug(\"First record row: {}\".format(first_row_info))\r\n\r\n    row = Row(*df.columns)\r\n\r\n    return (row(*elems) for elems in records)<\/pre>\n<p>This looks a bit more complicated but essentially we convert a Pandas Series to a DataFrame if necessary and handle the edge cases of an empty DataFrame or <span class=\"lang:python decode:true crayon-inline \">None<\/span> as return value. We then convert the <span class=\"lang:python decode:true crayon-inline \">DataFrame<\/span> to records, convert some NumPy data types to the Python equivalent and create an iterator over <span class=\"lang:python decode:true crayon-inline \">Row<\/span> objects from the converted records.<\/p>\n<p>With these functions at hand we can define a <a href=\"https:\/\/wiki.python.org\/moin\/PythonDecorators#What_is_a_Decorator\" target=\"_blank\" rel=\"noopener noreferrer\">Python decorator<\/a> that will allow us to automatically call the functions <span class=\"lang:python decode:true crayon-inline \">rows_to_pandas<\/span> and <span class=\"lang:python decode:true crayon-inline \">pandas_to_rows<\/span> at the right time:<\/p>\n<pre class=\"lang:python decode:true \">from functools import wraps\r\n\r\nclass pandas_udaf(object):\r\n\r\n    \"\"\"Decorator for PySpark UDAFs using Pandas\r\n\r\n    Args:\r\n\r\n        loglevel (int): minimum loglevel for emitting messages\r\n\r\n    \"\"\"\r\n\r\n    def __init__(self, loglevel=logging.INFO):\r\n\r\n        self.loglevel = loglevel\r\n\r\n    def __call__(self, func):\r\n\r\n        @wraps(func)\r\n\r\n        def wrapper(*args):\r\n\r\n            # use *args to allow decorating methods (incl. self arg)\r\n\r\n            args = list(args)\r\n\r\n            setup_logger(loglevel=self.loglevel)\r\n\r\n            args[-1] = rows_to_pandas(args[-1])\r\n\r\n            df = func(*args)\r\n\r\n            return pandas_to_rows(df)\r\n\r\n        return wrapper<\/pre>\n<p>The code is pretty much self-explanatory if you have ever written a Python decorator; otherwise, you should read about it since it takes some time to wrap your head around it. Basically, we set up a default logger, create a Pandas DataFrame from the Row iterator, pass it to our UDF\/UDAF and convert its return value back to a Row iterator. The only additional thing that might still raise questions is the usage of <span class=\"lang:python decode:true crayon-inline \">args[-1]<\/span> . This is due to the fact that <span class=\"lang:python decode:true crayon-inline \">func<\/span> might also be a method of an object. In this case, the first argument would be <span class=\"lang:python decode:true crayon-inline \">self<\/span> but the last argument is in either cases the actual argument that <span class=\"lang:python decode:true crayon-inline \">mapPartitions<\/span> will pass to us. The code of <span class=\"lang:python decode:true crayon-inline \">setup_logger<\/span> depends on your Spark installation. In case you are using Spark on Apache <a href=\"https:\/\/hortonworks.com\/apache\/yarn\/\" target=\"_blank\" rel=\"noopener noreferrer\">YARN<\/a>, it might look like this:<\/p>\n<pre class=\"lang:python decode:true \">import os\r\n\r\nimport sys\r\n\r\ndef setup_logger(loglevel=logging.INFO, logfile=\"pyspark.log\"):\r\n\r\n    \"\"\"Setup basic logging for logging on the executor\r\n\r\n    Args:\r\n\r\n        loglevel (int): minimum loglevel for emitting messages\r\n\r\n        logfile (str): name of the logfile\r\n\r\n    \"\"\"\r\n\r\n    logformat = \"%(asctime)s %(levelname)s %(module)s.%(funcName)s: %(message)s\"\r\n\r\n    datefmt = \"%y\/%m\/%d %H:%M:%S\"\r\n\r\n    try:\r\n\r\n        logfile = os.path.join(os.environ['LOG_DIRS'].split(',')[0], logfile)\r\n\r\n    except (KeyError, IndexError):\r\n\r\n        logging.basicConfig(level=loglevel,\r\n\r\n                            stream=sys.stdout,\r\n\r\n                            format=logformat,\r\n\r\n                            datefmt=datefmt)\r\n\r\n        logger = logging.getLogger(__name__)\r\n\r\n        logger.error(\"LOG_DIRS is not in environment variables or empty, using STDOUT instead.\")\r\n\r\n    logging.basicConfig(level=loglevel,\r\n\r\n                        filename=logfile,\r\n\r\n                        format=logformat,\r\n\r\n                        datefmt=datefmt)<\/pre>\n<p>Now having all parts in place let\u2019s assume the code above resides in the python module <a href=\"http:\/\/www.florianwilhelm.info\/src\/pyspark_udaf.py\" target=\"_blank\" rel=\"noopener noreferrer\">pyspark_udaf.py<\/a>. A future post will cover the topic of deploying dependencies in a systematic way for production requirements. For now we just presume that <a href=\"http:\/\/www.florianwilhelm.info\/src\/pyspark_udaf.py\" target=\"_blank\" rel=\"noopener noreferrer\">pyspark_udaf.py<\/a> as well as all its dependencies like Pandas, NumPy, etc. are accessible by the Spark driver as well as the executors. This allows us to then easily define an example UDAF <span class=\"lang:python decode:true crayon-inline \">my_func<\/span> that collects some basic statistics for each country as:<\/p>\n<pre class=\"lang:python decode:true \">import pyspark_udaf\r\n\r\nimport logging\r\n\r\n@pyspark_udaf.pandas_udaf(loglevel=logging.DEBUG)\r\n\r\ndef my_func(df):\r\n\r\n    if df.empty:\r\n\r\n        return\r\n\r\n    df = df.groupby('country').apply(lambda x: x.drop('country', axis=1).describe())\r\n\r\n    return df.reset_index()\r\n\r\n<\/pre>\n<p>It is of course not really useful in practice to return some statistics with the help of a UDAF that could also be retrieved with basic PySpark functionality but this is just an example. We now generate a dummy data DataFrame and apply the function to each partition as above with:<\/p>\n<pre class=\"lang:python decode:true \">i\r\n\r\n# make pyspark_udaf.py available to the executors\r\n\r\nspark.sparkContext.addFile('.\/pyspark_udaf.py')\r\n\r\ndf = spark.createDataFrame(\r\n\r\n    data = [('DEU', 2, 1.0), ('DEU', 3, 8.0), ('FRA', 2, 6.0),\r\n\r\n            ('FRA', 0, 8.0), ('DEU', 3, 8.0), ('FRA', 1, 3.0)],\r\n\r\n    schema = ['country', 'feature1', 'feature2'])\r\n\r\nstats_df = df.repartition('country').rdd.mapPartitions(my_func).toDF()\r\n\r\nprint(stats_df.toPandas())<\/pre>\n<p>The code above can be easily tested with the help of a Jupyter notebook with PySpark where the <a href=\"http:\/\/spark.apache.org\/docs\/latest\/api\/python\/pyspark.sql.html#pyspark.sql.SparkSession\" target=\"_blank\" rel=\"noopener noreferrer\">SparkSession<\/a> <span class=\"lang:python decode:true crayon-inline \">spark<\/span> is predefined.<\/p>\n<h2><span class=\"ez-toc-section\" id=\"Summary\"><\/span>Summary<span class=\"ez-toc-section-end\"><\/span><\/h2>\n<p>Overall, this proposed method allows the definition of an UDF as well as an UDAF since it is up to the function <span class=\"lang:python decode:true crayon-inline \">my_func<\/span> if it returns (1) a DataFrame having as many rows as the input DataFrame (think <a href=\"https:\/\/pandas.pydata.org\/pandas-docs\/stable\/generated\/pandas.DataFrame.transform.html\" target=\"_blank\" rel=\"noopener noreferrer\">Pandas transform<\/a>), (2) a DataFrame of only a single row or (3) optionally a Series (think <a href=\"https:\/\/pandas.pydata.org\/pandas-docs\/stable\/generated\/pandas.DataFrame.aggregate.html\" target=\"_blank\" rel=\"noopener noreferrer\">Pandas aggregate<\/a>) or a DataFrame with an arbitrary number of rows (think <a href=\"https:\/\/pandas.pydata.org\/pandas-docs\/stable\/generated\/pandas.DataFrame.apply.html\" target=\"_blank\" rel=\"noopener noreferrer\">Pandas apply<\/a>) with even varying columns. Therefore, this approach should be applicable to a variety of use cases where the built-in PySpark functionality is not sufficient.<\/p>\n<p>To wrap it up, this blog post gives you a template on how to write PySpark UD(A)Fs while abstracting all the boilerplate in a dedicated module. We also went down the rabbit hole to explore the technical difficulties the Spark developers face in providing Python bindings to a distributed JVM-based system. In this respect we are really looking forward to closer integration of <a href=\"http:\/\/arrow.apache.org\/\" target=\"_blank\" rel=\"noopener noreferrer\">Apache Arrow<\/a> and Spark in the upcoming Spark 2.3 and future versions.<\/p>\n<p><em>This article originally appeared on <a href=\"http:\/\/www.florianwilhelm.info\/2017\/10\/efficient_udfs_with_pyspark\/\" target=\"_blank\" rel=\"noopener noreferrer\">florianwilhelm.de<\/a>.<\/em><\/p>\n","protected":false},"excerpt":{"rendered":"<p>Nowadays, Spark surely is one of the most prevalent technologies in the fields of data science and big data. Luckily, even though it is developed in Scala and runs in the Java Virtual Machine (JVM), it comes with Python bindings also known as PySpark, whose API was heavily influenced by Pandas. With respect to functionality, [&hellip;]<\/p>\n","protected":false},"author":52,"featured_media":13094,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"_acf_changed":false,"inline_featured_image":false,"ep_exclude_from_search":false,"footnotes":""},"tags":[206],"service":[431],"coauthors":[{"id":52,"display_name":"Florian Wilhelm","user_nicename":"fwilhelm"},{"id":55,"display_name":"Bernhard Sch\u00e4fer","user_nicename":"bschaefer"}],"class_list":["post-21066","post","type-post","status-publish","format-standard","has-post-thumbnail","hentry","tag-data-science","service-data-science"],"acf":[],"yoast_head":"<!-- This site is optimized with the Yoast SEO plugin v27.6 - https:\/\/yoast.com\/product\/yoast-seo-wordpress\/ -->\n<title>Implementing efficient UD(A)Fs with PySpark<\/title>\n<meta name=\"robots\" content=\"index, follow, max-snippet:-1, max-image-preview:large, max-video-preview:-1\" \/>\n<link rel=\"canonical\" href=\"https:\/\/www.inovex.de\/de\/blog\/efficient-udafs-with-pyspark\/\" \/>\n<meta property=\"og:locale\" content=\"de_DE\" \/>\n<meta property=\"og:type\" content=\"article\" \/>\n<meta property=\"og:title\" content=\"Implementing efficient UD(A)Fs with PySpark\" \/>\n<meta property=\"og:description\" content=\"Nowadays, Spark surely is one of the most prevalent technologies in the fields of data science and big data. Luckily, even though it is developed in Scala and runs in the Java Virtual Machine (JVM), it comes with Python bindings also known as PySpark, whose API was heavily influenced by Pandas. With respect to functionality, [&hellip;]\" \/>\n<meta property=\"og:url\" content=\"https:\/\/www.inovex.de\/de\/blog\/efficient-udafs-with-pyspark\/\" \/>\n<meta property=\"og:site_name\" content=\"inovex GmbH\" \/>\n<meta property=\"article:publisher\" content=\"https:\/\/www.facebook.com\/inovexde\" \/>\n<meta property=\"article:published_time\" content=\"2017-10-17T06:58:59+00:00\" \/>\n<meta property=\"article:modified_time\" content=\"2024-08-29T06:04:31+00:00\" \/>\n<meta property=\"og:image\" content=\"https:\/\/www.inovex.de\/wp-content\/uploads\/2017\/10\/pyspark.jpg\" \/>\n\t<meta property=\"og:image:width\" content=\"1920\" \/>\n\t<meta property=\"og:image:height\" content=\"1080\" \/>\n\t<meta property=\"og:image:type\" content=\"image\/jpeg\" \/>\n<meta name=\"author\" content=\"Florian Wilhelm, Bernhard Sch\u00e4fer\" \/>\n<meta name=\"twitter:card\" content=\"summary_large_image\" \/>\n<meta name=\"twitter:image\" content=\"https:\/\/www.inovex.de\/wp-content\/uploads\/2017\/10\/pyspark-1024x576.jpg\" \/>\n<meta name=\"twitter:creator\" content=\"@inovexgmbh\" \/>\n<meta name=\"twitter:site\" content=\"@inovexgmbh\" \/>\n<meta name=\"twitter:label1\" content=\"Verfasst von\" \/>\n\t<meta name=\"twitter:data1\" content=\"Florian Wilhelm\" \/>\n\t<meta name=\"twitter:label2\" content=\"Gesch\u00e4tzte Lesezeit\" \/>\n\t<meta name=\"twitter:data2\" content=\"17\u00a0Minuten\" \/>\n\t<meta name=\"twitter:label3\" content=\"Written by\" \/>\n\t<meta name=\"twitter:data3\" content=\"Florian Wilhelm, Bernhard Sch\u00e4fer\" \/>\n<script type=\"application\/ld+json\" class=\"yoast-schema-graph\">{\"@context\":\"https:\\\/\\\/schema.org\",\"@graph\":[{\"@type\":\"Article\",\"@id\":\"https:\\\/\\\/www.inovex.de\\\/de\\\/blog\\\/efficient-udafs-with-pyspark\\\/#article\",\"isPartOf\":{\"@id\":\"https:\\\/\\\/www.inovex.de\\\/de\\\/blog\\\/efficient-udafs-with-pyspark\\\/\"},\"author\":{\"name\":\"Florian Wilhelm\",\"@id\":\"https:\\\/\\\/www.inovex.de\\\/de\\\/#\\\/schema\\\/person\\\/57ad7c24ee7f9ec59ed87598c73fe79e\"},\"headline\":\"Efficient UD(A)Fs with PySpark\",\"datePublished\":\"2017-10-17T06:58:59+00:00\",\"dateModified\":\"2024-08-29T06:04:31+00:00\",\"mainEntityOfPage\":{\"@id\":\"https:\\\/\\\/www.inovex.de\\\/de\\\/blog\\\/efficient-udafs-with-pyspark\\\/\"},\"wordCount\":2386,\"commentCount\":9,\"publisher\":{\"@id\":\"https:\\\/\\\/www.inovex.de\\\/de\\\/#organization\"},\"image\":{\"@id\":\"https:\\\/\\\/www.inovex.de\\\/de\\\/blog\\\/efficient-udafs-with-pyspark\\\/#primaryimage\"},\"thumbnailUrl\":\"https:\\\/\\\/www.inovex.de\\\/wp-content\\\/uploads\\\/2017\\\/10\\\/pyspark.jpg\",\"keywords\":[\"Data Science\"],\"articleSection\":[\"Analytics\",\"English Content\",\"General\"],\"inLanguage\":\"de\",\"potentialAction\":[{\"@type\":\"CommentAction\",\"name\":\"Comment\",\"target\":[\"https:\\\/\\\/www.inovex.de\\\/de\\\/blog\\\/efficient-udafs-with-pyspark\\\/#respond\"]}]},{\"@type\":\"WebPage\",\"@id\":\"https:\\\/\\\/www.inovex.de\\\/de\\\/blog\\\/efficient-udafs-with-pyspark\\\/\",\"url\":\"https:\\\/\\\/www.inovex.de\\\/de\\\/blog\\\/efficient-udafs-with-pyspark\\\/\",\"name\":\"Implementing efficient UD(A)Fs with PySpark\",\"isPartOf\":{\"@id\":\"https:\\\/\\\/www.inovex.de\\\/de\\\/#website\"},\"primaryImageOfPage\":{\"@id\":\"https:\\\/\\\/www.inovex.de\\\/de\\\/blog\\\/efficient-udafs-with-pyspark\\\/#primaryimage\"},\"image\":{\"@id\":\"https:\\\/\\\/www.inovex.de\\\/de\\\/blog\\\/efficient-udafs-with-pyspark\\\/#primaryimage\"},\"thumbnailUrl\":\"https:\\\/\\\/www.inovex.de\\\/wp-content\\\/uploads\\\/2017\\\/10\\\/pyspark.jpg\",\"datePublished\":\"2017-10-17T06:58:59+00:00\",\"dateModified\":\"2024-08-29T06:04:31+00:00\",\"breadcrumb\":{\"@id\":\"https:\\\/\\\/www.inovex.de\\\/de\\\/blog\\\/efficient-udafs-with-pyspark\\\/#breadcrumb\"},\"inLanguage\":\"de\",\"potentialAction\":[{\"@type\":\"ReadAction\",\"target\":[\"https:\\\/\\\/www.inovex.de\\\/de\\\/blog\\\/efficient-udafs-with-pyspark\\\/\"]}]},{\"@type\":\"ImageObject\",\"inLanguage\":\"de\",\"@id\":\"https:\\\/\\\/www.inovex.de\\\/de\\\/blog\\\/efficient-udafs-with-pyspark\\\/#primaryimage\",\"url\":\"https:\\\/\\\/www.inovex.de\\\/wp-content\\\/uploads\\\/2017\\\/10\\\/pyspark.jpg\",\"contentUrl\":\"https:\\\/\\\/www.inovex.de\\\/wp-content\\\/uploads\\\/2017\\\/10\\\/pyspark.jpg\",\"width\":1920,\"height\":1080,\"caption\":\"Pyspark Logo\"},{\"@type\":\"BreadcrumbList\",\"@id\":\"https:\\\/\\\/www.inovex.de\\\/de\\\/blog\\\/efficient-udafs-with-pyspark\\\/#breadcrumb\",\"itemListElement\":[{\"@type\":\"ListItem\",\"position\":1,\"name\":\"Home\",\"item\":\"https:\\\/\\\/www.inovex.de\\\/de\\\/\"},{\"@type\":\"ListItem\",\"position\":2,\"name\":\"Efficient UD(A)Fs with PySpark\"}]},{\"@type\":\"WebSite\",\"@id\":\"https:\\\/\\\/www.inovex.de\\\/de\\\/#website\",\"url\":\"https:\\\/\\\/www.inovex.de\\\/de\\\/\",\"name\":\"inovex GmbH\",\"description\":\"\",\"publisher\":{\"@id\":\"https:\\\/\\\/www.inovex.de\\\/de\\\/#organization\"},\"potentialAction\":[{\"@type\":\"SearchAction\",\"target\":{\"@type\":\"EntryPoint\",\"urlTemplate\":\"https:\\\/\\\/www.inovex.de\\\/de\\\/?s={search_term_string}\"},\"query-input\":{\"@type\":\"PropertyValueSpecification\",\"valueRequired\":true,\"valueName\":\"search_term_string\"}}],\"inLanguage\":\"de\"},{\"@type\":\"Organization\",\"@id\":\"https:\\\/\\\/www.inovex.de\\\/de\\\/#organization\",\"name\":\"inovex GmbH\",\"url\":\"https:\\\/\\\/www.inovex.de\\\/de\\\/\",\"logo\":{\"@type\":\"ImageObject\",\"inLanguage\":\"de\",\"@id\":\"https:\\\/\\\/www.inovex.de\\\/de\\\/#\\\/schema\\\/logo\\\/image\\\/\",\"url\":\"https:\\\/\\\/www.inovex.de\\\/wp-content\\\/uploads\\\/2021\\\/03\\\/inovex-logo-16-9-1.png\",\"contentUrl\":\"https:\\\/\\\/www.inovex.de\\\/wp-content\\\/uploads\\\/2021\\\/03\\\/inovex-logo-16-9-1.png\",\"width\":1921,\"height\":1081,\"caption\":\"inovex GmbH\"},\"image\":{\"@id\":\"https:\\\/\\\/www.inovex.de\\\/de\\\/#\\\/schema\\\/logo\\\/image\\\/\"},\"sameAs\":[\"https:\\\/\\\/www.facebook.com\\\/inovexde\",\"https:\\\/\\\/x.com\\\/inovexgmbh\",\"https:\\\/\\\/www.instagram.com\\\/inovexlife\\\/\",\"https:\\\/\\\/www.linkedin.com\\\/company\\\/inovex\",\"https:\\\/\\\/www.youtube.com\\\/channel\\\/UC7r66GT14hROB_RQsQBAQUQ\"]},{\"@type\":\"Person\",\"@id\":\"https:\\\/\\\/www.inovex.de\\\/de\\\/#\\\/schema\\\/person\\\/57ad7c24ee7f9ec59ed87598c73fe79e\",\"name\":\"Florian Wilhelm\",\"image\":{\"@type\":\"ImageObject\",\"inLanguage\":\"de\",\"@id\":\"https:\\\/\\\/www.inovex.de\\\/wp-content\\\/uploads\\\/cropped-florian-1-IMG_5829-800x610-1-96x96.jpg5db1abe47435abb84b0b7484ce0890e9\",\"url\":\"https:\\\/\\\/www.inovex.de\\\/wp-content\\\/uploads\\\/cropped-florian-1-IMG_5829-800x610-1-96x96.jpg\",\"contentUrl\":\"https:\\\/\\\/www.inovex.de\\\/wp-content\\\/uploads\\\/cropped-florian-1-IMG_5829-800x610-1-96x96.jpg\",\"caption\":\"Florian Wilhelm\"},\"url\":\"https:\\\/\\\/www.inovex.de\\\/de\\\/blog\\\/author\\\/fwilhelm\\\/\"}]}<\/script>\n<!-- \/ Yoast SEO plugin. -->","yoast_head_json":{"title":"Implementing efficient UD(A)Fs with PySpark","robots":{"index":"index","follow":"follow","max-snippet":"max-snippet:-1","max-image-preview":"max-image-preview:large","max-video-preview":"max-video-preview:-1"},"canonical":"https:\/\/www.inovex.de\/de\/blog\/efficient-udafs-with-pyspark\/","og_locale":"de_DE","og_type":"article","og_title":"Implementing efficient UD(A)Fs with PySpark","og_description":"Nowadays, Spark surely is one of the most prevalent technologies in the fields of data science and big data. Luckily, even though it is developed in Scala and runs in the Java Virtual Machine (JVM), it comes with Python bindings also known as PySpark, whose API was heavily influenced by Pandas. With respect to functionality, [&hellip;]","og_url":"https:\/\/www.inovex.de\/de\/blog\/efficient-udafs-with-pyspark\/","og_site_name":"inovex GmbH","article_publisher":"https:\/\/www.facebook.com\/inovexde","article_published_time":"2017-10-17T06:58:59+00:00","article_modified_time":"2024-08-29T06:04:31+00:00","og_image":[{"width":1920,"height":1080,"url":"https:\/\/www.inovex.de\/wp-content\/uploads\/2017\/10\/pyspark.jpg","type":"image\/jpeg"}],"author":"Florian Wilhelm, Bernhard Sch\u00e4fer","twitter_card":"summary_large_image","twitter_image":"https:\/\/www.inovex.de\/wp-content\/uploads\/2017\/10\/pyspark-1024x576.jpg","twitter_creator":"@inovexgmbh","twitter_site":"@inovexgmbh","twitter_misc":{"Verfasst von":"Florian Wilhelm","Gesch\u00e4tzte Lesezeit":"17\u00a0Minuten","Written by":"Florian Wilhelm, Bernhard Sch\u00e4fer"},"schema":{"@context":"https:\/\/schema.org","@graph":[{"@type":"Article","@id":"https:\/\/www.inovex.de\/de\/blog\/efficient-udafs-with-pyspark\/#article","isPartOf":{"@id":"https:\/\/www.inovex.de\/de\/blog\/efficient-udafs-with-pyspark\/"},"author":{"name":"Florian Wilhelm","@id":"https:\/\/www.inovex.de\/de\/#\/schema\/person\/57ad7c24ee7f9ec59ed87598c73fe79e"},"headline":"Efficient UD(A)Fs with PySpark","datePublished":"2017-10-17T06:58:59+00:00","dateModified":"2024-08-29T06:04:31+00:00","mainEntityOfPage":{"@id":"https:\/\/www.inovex.de\/de\/blog\/efficient-udafs-with-pyspark\/"},"wordCount":2386,"commentCount":9,"publisher":{"@id":"https:\/\/www.inovex.de\/de\/#organization"},"image":{"@id":"https:\/\/www.inovex.de\/de\/blog\/efficient-udafs-with-pyspark\/#primaryimage"},"thumbnailUrl":"https:\/\/www.inovex.de\/wp-content\/uploads\/2017\/10\/pyspark.jpg","keywords":["Data Science"],"articleSection":["Analytics","English Content","General"],"inLanguage":"de","potentialAction":[{"@type":"CommentAction","name":"Comment","target":["https:\/\/www.inovex.de\/de\/blog\/efficient-udafs-with-pyspark\/#respond"]}]},{"@type":"WebPage","@id":"https:\/\/www.inovex.de\/de\/blog\/efficient-udafs-with-pyspark\/","url":"https:\/\/www.inovex.de\/de\/blog\/efficient-udafs-with-pyspark\/","name":"Implementing efficient UD(A)Fs with PySpark","isPartOf":{"@id":"https:\/\/www.inovex.de\/de\/#website"},"primaryImageOfPage":{"@id":"https:\/\/www.inovex.de\/de\/blog\/efficient-udafs-with-pyspark\/#primaryimage"},"image":{"@id":"https:\/\/www.inovex.de\/de\/blog\/efficient-udafs-with-pyspark\/#primaryimage"},"thumbnailUrl":"https:\/\/www.inovex.de\/wp-content\/uploads\/2017\/10\/pyspark.jpg","datePublished":"2017-10-17T06:58:59+00:00","dateModified":"2024-08-29T06:04:31+00:00","breadcrumb":{"@id":"https:\/\/www.inovex.de\/de\/blog\/efficient-udafs-with-pyspark\/#breadcrumb"},"inLanguage":"de","potentialAction":[{"@type":"ReadAction","target":["https:\/\/www.inovex.de\/de\/blog\/efficient-udafs-with-pyspark\/"]}]},{"@type":"ImageObject","inLanguage":"de","@id":"https:\/\/www.inovex.de\/de\/blog\/efficient-udafs-with-pyspark\/#primaryimage","url":"https:\/\/www.inovex.de\/wp-content\/uploads\/2017\/10\/pyspark.jpg","contentUrl":"https:\/\/www.inovex.de\/wp-content\/uploads\/2017\/10\/pyspark.jpg","width":1920,"height":1080,"caption":"Pyspark Logo"},{"@type":"BreadcrumbList","@id":"https:\/\/www.inovex.de\/de\/blog\/efficient-udafs-with-pyspark\/#breadcrumb","itemListElement":[{"@type":"ListItem","position":1,"name":"Home","item":"https:\/\/www.inovex.de\/de\/"},{"@type":"ListItem","position":2,"name":"Efficient UD(A)Fs with PySpark"}]},{"@type":"WebSite","@id":"https:\/\/www.inovex.de\/de\/#website","url":"https:\/\/www.inovex.de\/de\/","name":"inovex GmbH","description":"","publisher":{"@id":"https:\/\/www.inovex.de\/de\/#organization"},"potentialAction":[{"@type":"SearchAction","target":{"@type":"EntryPoint","urlTemplate":"https:\/\/www.inovex.de\/de\/?s={search_term_string}"},"query-input":{"@type":"PropertyValueSpecification","valueRequired":true,"valueName":"search_term_string"}}],"inLanguage":"de"},{"@type":"Organization","@id":"https:\/\/www.inovex.de\/de\/#organization","name":"inovex GmbH","url":"https:\/\/www.inovex.de\/de\/","logo":{"@type":"ImageObject","inLanguage":"de","@id":"https:\/\/www.inovex.de\/de\/#\/schema\/logo\/image\/","url":"https:\/\/www.inovex.de\/wp-content\/uploads\/2021\/03\/inovex-logo-16-9-1.png","contentUrl":"https:\/\/www.inovex.de\/wp-content\/uploads\/2021\/03\/inovex-logo-16-9-1.png","width":1921,"height":1081,"caption":"inovex GmbH"},"image":{"@id":"https:\/\/www.inovex.de\/de\/#\/schema\/logo\/image\/"},"sameAs":["https:\/\/www.facebook.com\/inovexde","https:\/\/x.com\/inovexgmbh","https:\/\/www.instagram.com\/inovexlife\/","https:\/\/www.linkedin.com\/company\/inovex","https:\/\/www.youtube.com\/channel\/UC7r66GT14hROB_RQsQBAQUQ"]},{"@type":"Person","@id":"https:\/\/www.inovex.de\/de\/#\/schema\/person\/57ad7c24ee7f9ec59ed87598c73fe79e","name":"Florian Wilhelm","image":{"@type":"ImageObject","inLanguage":"de","@id":"https:\/\/www.inovex.de\/wp-content\/uploads\/cropped-florian-1-IMG_5829-800x610-1-96x96.jpg5db1abe47435abb84b0b7484ce0890e9","url":"https:\/\/www.inovex.de\/wp-content\/uploads\/cropped-florian-1-IMG_5829-800x610-1-96x96.jpg","contentUrl":"https:\/\/www.inovex.de\/wp-content\/uploads\/cropped-florian-1-IMG_5829-800x610-1-96x96.jpg","caption":"Florian Wilhelm"},"url":"https:\/\/www.inovex.de\/de\/blog\/author\/fwilhelm\/"}]}},"_links":{"self":[{"href":"https:\/\/www.inovex.de\/de\/wp-json\/wp\/v2\/posts\/21066","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/www.inovex.de\/de\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/www.inovex.de\/de\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/www.inovex.de\/de\/wp-json\/wp\/v2\/users\/52"}],"replies":[{"embeddable":true,"href":"https:\/\/www.inovex.de\/de\/wp-json\/wp\/v2\/comments?post=21066"}],"version-history":[{"count":5,"href":"https:\/\/www.inovex.de\/de\/wp-json\/wp\/v2\/posts\/21066\/revisions"}],"predecessor-version":[{"id":57649,"href":"https:\/\/www.inovex.de\/de\/wp-json\/wp\/v2\/posts\/21066\/revisions\/57649"}],"wp:featuredmedia":[{"embeddable":true,"href":"https:\/\/www.inovex.de\/de\/wp-json\/wp\/v2\/media\/13094"}],"wp:attachment":[{"href":"https:\/\/www.inovex.de\/de\/wp-json\/wp\/v2\/media?parent=21066"}],"wp:term":[{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/www.inovex.de\/de\/wp-json\/wp\/v2\/tags?post=21066"},{"taxonomy":"service","embeddable":true,"href":"https:\/\/www.inovex.de\/de\/wp-json\/wp\/v2\/service?post=21066"},{"taxonomy":"author","embeddable":true,"href":"https:\/\/www.inovex.de\/de\/wp-json\/wp\/v2\/coauthors?post=21066"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}