Findings in Running Google Dataproc

Posted on: October 17, 2018

In this article we share the experience we have gained from running Dataproc clusters on Google Cloud. We selected topics that you will definitely have to deal with if you want to operate Dataproc clusters in production, and that differ from the practices we are used to from on-premises clusters.

The topics are:

  • Cluster Creation
  • Cluster Access
  • Google Cloud Storage
  • Configuration Management
  • Configuration Tips
  • Monitoring

Cluster Creation

Before you start to create Dataproc clusters in the Google Cloud, you have to decide where the clusters should live. Different factors should be taken into account. For example, it makes sense that clusters are deployed next to systems which provide or consume required data. This affects not only the latency but also the costs. Traffic costs increase if the traffic leaves the boundaries of zones, regions or even the Google Cloud.

The network also has to meet some requirements so that Dataproc nodes are able to communicate with each other. In one of our cases we couldn’t run a Dataproc cluster in the same network used by a Kubernetes cluster due to some communication restrictions. However, we needed access to data stored in Kafka, which was also deployed on Kubernetes. So we decided to deploy our cluster in a subnet of this network and defined appropriate firewall rules. Of course you have to size the subnet according to the number of planned cluster nodes. In this phase we recommend involving networking teams that are familiar with networks in the Google Cloud.

There are multiple ways to create a Dataproc cluster: using the REST API, the gcloud command line tool or the Google Cloud Platform Console (GUI). You should consider using the REST API or the command line to create new clusters, because the graphical console doesn’t support the full feature set. This way it is also easy to store commands and execute them automatically.

Now you have to think about the sizing of the cluster. For a long-running cluster, you should activate high availability. But if you plan to start short-lived clusters, you don’t have to pay for two additional master nodes. The concept of preemptible VM instances can also save you money. These are instances that run for a maximum of 24 hours and may be terminated at any time by Compute Engine. In return, preemptible instances run at a much lower price and are therefore well suited for batch jobs, for example. Note that preemptible instances only participate in computing and cannot extend HDFS storage.
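As a rough sketch of these sizing options, the following functions wrap two gcloud dataproc clusters create calls: one for a long-running cluster with three masters, one for a short-lived batch cluster with preemptible workers. The region, subnet, image version and node counts are placeholder assumptions, not values from our setup. Note that --num-secondary-workers is the current name of the flag that used to be called --num-preemptible-workers.

```shell
REGION="europe-west1"       # placeholder region
SUBNET="dataproc-subnet"    # placeholder subnet, must exist in REGION
IMAGE_VERSION="2.2"         # placeholder; pick a currently supported version

# Long-running cluster: three master nodes for high availability.
create_ha_cluster() {
  gcloud dataproc clusters create "$1" \
    --region="$REGION" \
    --subnet="$SUBNET" \
    --image-version="$IMAGE_VERSION" \
    --num-masters=3 \
    --num-workers=4
}

# Short-lived batch cluster: a single master, a small set of primary workers
# (which hold HDFS) and secondary workers, which are preemptible by default
# and only add compute capacity.
create_batch_cluster() {
  gcloud dataproc clusters create "$1" \
    --region="$REGION" \
    --subnet="$SUBNET" \
    --image-version="$IMAGE_VERSION" \
    --num-workers=2 \
    --num-secondary-workers=8
}
```

Calling create_batch_cluster my-batch-cluster would then start the short-lived variant.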

When creating a cluster, you also have to select a Dataproc image version. Google provides an overview of the supported versions in its version list. Be aware that support for old versions expires (the conditions are described in the official documentation), so try to use the most current version. If you later have to update your jobs to a new version, simply run a second cluster in parallel to the old one and migrate your workflows one by one.

At this point we would like to give you a general hint regarding the official Google Cloud documentation: Always read the English version! Other versions are often outdated or contain less information. For example, in the German translation of the version list the most current versions were not listed at the time of writing.

Cluster Access

The easiest way to connect to a Dataproc node is via the Cloud Console. There you can select your cluster and in the VMs tab you can open a shell in the browser by clicking the SSH button next to a master node.

[Figure: Cloud Console Dataproc cluster details view]

If you want to access nodes from your local machine, you have to install the Google Cloud SDK. After following the guides to install and initialize the SDK you can execute the following commands to open a shell to any Dataproc node:
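A minimal sketch of such a command, assuming a cluster named my-dataproc-cluster in zone europe-west1-b of a project called my-project (zone and project are placeholders). Dataproc names the master node <cluster>-m and the workers <cluster>-w-0, <cluster>-w-1 and so on.

```shell
CLUSTER="my-dataproc-cluster"
ZONE="europe-west1-b"    # placeholder zone
PROJECT="my-project"     # placeholder project id

# Open an SSH session to the given Dataproc node.
open_shell() {
  node="$1"
  gcloud compute ssh "$node" --project="$PROJECT" --zone="$ZONE"
}
```

For example, open_shell "${CLUSTER}-m" connects to the master and open_shell "${CLUSTER}-w-0" to the first worker.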

You can also get the command to connect to your cluster by clicking View gcloud command next to the SSH button in the Cloud Console.

The recommended way to access web interfaces is to run a SOCKS proxy which uses an SSH tunnel to communicate with the cluster (see documentation). In the Cloud SDK shell execute:
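A sketch of the tunnel command, following the pattern from the Dataproc documentation. Everything after the bare -- is passed to ssh: -D opens a local SOCKS port and -N tells ssh not to start a remote shell. The zone, the project and port 1080 are placeholder assumptions.

```shell
CLUSTER="my-dataproc-cluster"
ZONE="europe-west1-b"    # placeholder zone
PROJECT="my-project"     # placeholder project id
PROXY_PORT=1080          # local port for the SOCKS proxy

# Start an SSH tunnel to the master node that acts as a local SOCKS proxy.
# The command blocks until it is interrupted with Ctrl+C.
start_socks_proxy() {
  gcloud compute ssh "${CLUSTER}-m" \
    --project="$PROJECT" \
    --zone="$ZONE" \
    -- -D "$PROXY_PORT" -N
}
```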

When the connection is established, open a browser which uses the SOCKS proxy, e.g. with:
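For Chrome this could look as follows. The binary path assumes a Linux installation, and the profile directory is a hypothetical throwaway location that keeps the proxied session separate from your normal browser profile. The port has to match the one used for the SSH tunnel.

```shell
PROXY_PORT=1080
CHROME_BIN="${CHROME_BIN:-/usr/bin/google-chrome}"   # assumption: Chrome on Linux

# Start a separate Chrome instance that sends all traffic through the SOCKS proxy.
open_proxied_browser() {
  url="$1"
  "$CHROME_BIN" \
    --proxy-server="socks5://localhost:${PROXY_PORT}" \
    --user-data-dir="/tmp/dataproc-proxy-profile" \
    "$url"
}
```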

Now you can navigate for example to the YARN UI on http://my-dataproc-cluster-m:8088/.

Google Cloud Storage

Google Cloud provides an object storage comparable to Amazon S3, which you should definitely consider for multiple use cases. Google Cloud Storage (GCS) is independent of your Dataproc clusters, so you can use it to separate data storage and computation. This allows you to share your data between clusters and facilitates software updates. For example, you can set up a new cluster with an up-to-date Spark version and move your workflows one by one to the new system without having to move the data. Keep in mind, however, that the performance of GCS is not the same as that of HDFS. You have to decide for yourself whether you accept, for example, a higher latency in exchange for advantages such as lower storage costs.

Another use case is to store common configuration or property files in GCS. We also use the Google Cloud Storage as destination for logging output of our jobs and to store the Spark event logs. This way we avoid the risk that our disk space on the nodes runs out because of huge log files.

The Java library Cloud Storage connector enables you to use GCS in your Spark jobs just like HDFS. Simply replace the prefix hdfs:// with gs:// in your paths. The library is installed on Dataproc clusters by default and is also integrated in the HDFS shell commands. For example, it is possible to list the contents of a bucket with hdfs dfs -ls gs://mybucket/.

Other ways to access GCS are the graphical Google Cloud Platform Console and gsutil, a command line tool to manage GCS buckets. As an example, the following command copies all contents from a GCS path to a local directory (this may be executed on a cluster node):
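A minimal sketch of such a copy, wrapped in a small helper. The bucket path and the target directory in the usage note are hypothetical. The wildcard is quoted so that gsutil expands it, not the local shell.

```shell
# Copy everything below a GCS prefix into a local directory.
fetch_from_gcs() {
  src="$1"     # GCS prefix without trailing slash
  dest="$2"    # local target directory, created if missing
  mkdir -p "$dest"
  # -m copies in parallel, -r descends into "subdirectories" of the prefix.
  gsutil -m cp -r "${src}/*" "$dest"
}
```

A call could look like fetch_from_gcs gs://my-bucket/cluster/my-dataproc-cluster/conf /tmp/conf.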

It is also possible to mount GCS buckets as file systems on your cluster nodes using Cloud Storage FUSE.

Configuration Management

You have to think about how to handle configuration changes before creating a cluster. Google Dataproc doesn’t provide a solution to manage configurations as we know it from other Hadoop distributions (e.g. Cloudera Manager), so you have to do it by hand or with custom scripts.

There are two challenges to be solved: How to change configurations of running nodes and how to provide the most current configuration to nodes added to a cluster automatically.

You don’t want to connect to every node in the cluster, edit the configuration files and restart the affected services by hand every time a simple value has to be changed. So we recommend writing a script that executes all these tasks automatically.

We can simplify the handling of configuration changes by storing copies of configuration files (e.g. yarn-site.xml) in GCS. So you just have to edit these copies and then distribute them to the nodes by replacing the old versions.

We already explained how to copy files from GCS to the cluster. Custom commands can be executed on the nodes using gcloud compute ssh. This command accepts an argument --command where you can specify the commands to restart the modified services (see reference).

With these building blocks you are able to create a custom script to distribute configuration changes. But how does a new node added to a cluster know about these changes?
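To make the first part concrete, here is a sketch of such a distribution script for yarn-site.xml. It is not our production script. The bucket path and zone are placeholders, and the systemd service names (hadoop-yarn-resourcemanager, hadoop-yarn-nodemanager) and the target path /etc/hadoop/conf are assumptions you should verify on your image version. It also assumes a single master and only primary workers.

```shell
CLUSTER="my-dataproc-cluster"
ZONE="europe-west1-b"                                          # placeholder zone
CONFIG_GCS="gs://my-bucket/cluster/my-dataproc-cluster/conf"   # placeholder path

# Replace yarn-site.xml on one node with the copy in GCS, then restart a service.
push_config() {
  node="$1"
  service="$2"
  gcloud compute ssh "$node" --zone="$ZONE" --command \
    "sudo gsutil cp ${CONFIG_GCS}/yarn-site.xml /etc/hadoop/conf/yarn-site.xml && sudo systemctl restart ${service}"
}

# Roll the change out to the master and to the given number of primary workers.
distribute_config() {
  num_workers="$1"
  push_config "${CLUSTER}-m" hadoop-yarn-resourcemanager
  i=0
  while [ "$i" -lt "$num_workers" ]; do
    push_config "${CLUSTER}-w-${i}" hadoop-yarn-nodemanager
    i=$((i + 1))
  done
}
```

Running distribute_config 4 would update the master and workers w-0 to w-3 one after another.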

For most Hadoop components on Google Dataproc, property changes can be specified on cluster creation via the --properties flag. All property changes specified this way are applied before the daemons start on the cluster. The supported configuration files are listed in the official documentation.

The following command for example activates cleaning of Spark event logs (in spark-defaults.conf) and YARN log aggregation (in yarn-site.xml):
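A sketch of such a call, with a placeholder region. Each entry has the form prefix:key=value, where the prefix selects the configuration file (spark for spark-defaults.conf, yarn for yarn-site.xml), and entries are separated by commas.

```shell
REGION="europe-west1"    # placeholder region

# Create a cluster with Spark event log cleaning and YARN log aggregation enabled.
create_cluster_with_log_settings() {
  gcloud dataproc clusters create "$1" \
    --region="$REGION" \
    --properties="spark:spark.history.fs.cleaner.enabled=true,yarn:yarn.log-aggregation-enable=true"
}
```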

To modify files or other environment characteristics not specified in the properties list, you have to use initialization actions. These actions are scripts executed immediately after a cluster starts up. There are some predefined actions available but you can also write your own actions as simple scripts stored in GCS. It is also possible to define for each script if it should be executed on all nodes or on master nodes only (or only on workers, respectively).

Like the properties mentioned before, you have to specify initialization actions on cluster creation, e.g.:
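A sketch with two hypothetical scripts stored in GCS. The script names, the region and the timeout value are assumptions. Scripts are given as a comma-separated list, and --initialization-action-timeout limits how long each script may run.

```shell
REGION="europe-west1"               # placeholder region
INIT_GCS="gs://my-bucket/init"      # placeholder location of the scripts

# Create a cluster that runs two initialization actions on every node.
create_cluster_with_init_actions() {
  gcloud dataproc clusters create "$1" \
    --region="$REGION" \
    --initialization-actions="${INIT_GCS}/install-tools.sh,${INIT_GCS}/configure-logging.sh" \
    --initialization-action-timeout=10m
}
```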

The property changes and initialization actions defined for a cluster are also executed when you add a new node. Unfortunately, it is not possible to add or modify these actions after the cluster has been created. As a workaround we recommend writing a ‘master script’ which is able to load and run other scripts stored in GCS dynamically. With this method you are able to add scripts in the future for use cases you didn’t have in mind before.
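One possible shape of such a master script is sketched below; it is an illustration, not the script we run. It assumes a hypothetical GCS layout with one folder of scripts for all nodes and one folder per role. The role lookup via get_metadata_value attributes/dataproc-role follows the pattern from the Dataproc documentation and returns Master or Worker.

```shell
# Hypothetical layout: <SCRIPTS_GCS>/all/*.sh, <SCRIPTS_GCS>/Master/*.sh, <SCRIPTS_GCS>/Worker/*.sh
SCRIPTS_GCS="${SCRIPTS_GCS:-gs://my-bucket/cluster/my-dataproc-cluster/init.d}"

# Dataproc exposes the node role ("Master" or "Worker") as instance metadata.
node_role() {
  /usr/share/google/get_metadata_value attributes/dataproc-role
}

# Download all scripts below a GCS prefix and run them in lexical order.
run_scripts_from() {
  prefix="$1"
  workdir=$(mktemp -d)
  # A failed copy is treated as "nothing to run"; this also hides real errors
  # such as missing permissions, so a production script should check more carefully.
  gsutil -m cp "${prefix}/*.sh" "$workdir/" 2>/dev/null || return 0
  for script in "$workdir"/*.sh; do
    [ -f "$script" ] || continue
    # Scripts are run with sh here; the first failing script aborts the rest.
    sh "$script" || return 1
  done
}

main() {
  run_scripts_from "${SCRIPTS_GCS}/all" &&
    run_scripts_from "${SCRIPTS_GCS}/$(node_role)"
}
```

To use the sketch as an initialization action, end the file with a call to main. New scripts can then be added to the GCS folders at any time and are picked up by every node that joins the cluster afterwards.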

The described challenges are mainly relevant if you want to maintain a long running cluster (e.g. for 24/7 streaming workflows). It is not that complicated if you just start Dataproc clusters for short-time analytics and then delete them afterwards. In this case you may just use the properties flag and simple initialization actions without worrying about future changes.

In addition to initialization actions you may take a look at Cloud Dataproc Optional Components which are currently in beta status.

Configuration Tips

Here are some example properties you may want to set on cluster creation. As already mentioned, we recommend writing Spark event logs to GCS. To do so, you can modify the following properties:

  • spark:spark.eventLog.dir=gs://my-bucket/cluster/my-dataproc-cluster/spark/eventlog
  • spark:spark.history.fs.logDirectory=gs://my-bucket/cluster/my-dataproc-cluster/spark/eventlog
  • spark:spark.history.fs.cleaner.enabled=true

To activate aggregation of YARN logs, the following properties are applicable:

  • yarn:yarn.log-aggregation-enable=true
  • yarn:yarn.log-aggregation.retain-seconds=604800 (just an example value)
  • yarn:yarn.log.server.url=http://my-dataproc-cluster-m-0:19888/jobhistory/logs
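Since the --properties flag expects a single comma-separated value, longer lists like the ones above become hard to read on one line. The following sketch keeps one property per line and joins them with a small helper. The helper name and the region are our own assumptions; the property values are the examples from the lists above.

```shell
REGION="europe-west1"    # placeholder region

# Join all arguments with commas. The subshell body keeps the IFS change local.
join_props() (
  IFS=,
  printf '%s\n' "$*"
)

# Note: values that contain commas themselves need gcloud's alternative
# delimiter syntax (see "gcloud topic escaping").
PROPERTIES=$(join_props \
  "spark:spark.eventLog.dir=gs://my-bucket/cluster/my-dataproc-cluster/spark/eventlog" \
  "spark:spark.history.fs.logDirectory=gs://my-bucket/cluster/my-dataproc-cluster/spark/eventlog" \
  "spark:spark.history.fs.cleaner.enabled=true" \
  "yarn:yarn.log-aggregation-enable=true" \
  "yarn:yarn.log-aggregation.retain-seconds=604800")

create_cluster_with_properties() {
  gcloud dataproc clusters create "$1" --region="$REGION" --properties="$PROPERTIES"
}
```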

On the cluster properties list there are also some properties specific to Google Dataproc. For example, we had trouble connecting to a secured Kafka cluster from our Dataproc cluster. We found out that the library conscrypt from Google caused authentication errors. Our solution was to disable this library as primary Java security provider by setting the following property:
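The cluster property that the English documentation describes for this purpose is, to our knowledge, dataproc:dataproc.conscrypt.provider.enable. Please verify the name against the current cluster properties list before relying on it. A sketch of how it could be passed on cluster creation, with a placeholder region:

```shell
REGION="europe-west1"    # placeholder region

# Create a cluster on which Conscrypt is not the primary Java security provider.
create_cluster_without_conscrypt() {
  gcloud dataproc clusters create "$1" \
    --region="$REGION" \
    --properties="dataproc:dataproc.conscrypt.provider.enable=false"
}
```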

This property is not documented e.g. in the German translation of the official documentation. So remember to always consider the English version.

Some default values for configurations on Dataproc are not what we would expect. For example, Spark dynamic resource allocation is enabled by default with an initial set of 10,000 executors. In general, dynamic allocation makes sense, especially in cloud environments, because it enables you to dynamically add executors to running jobs. But such a high value of spark.dynamicAllocation.initialExecutors means that every time you start a simple job, Spark tries to allocate resources for at least 10,000 executors (!). According to the Spark documentation, spark.dynamicAllocation.enabled=false is the default. So be careful and check how property values differ on Google Dataproc.

Monitoring

Similar to configuration management, Google Cloud doesn’t offer a sophisticated monitoring solution for Big Data applications out of the box. However, Google offers a generic monitoring tool named Stackdriver. Stackdriver provides a monitoring and alerting platform by collecting metrics and logs from various systems.

Some metrics like CPU usage, disk and network I/O of Dataproc instances are collected by default and can be viewed in the Stackdriver UI. There are also Stackdriver logging agents running on the clusters which gather log messages from various log files. For example, Hadoop, Spark and also YARN user logs are supported if the logged messages are in a compatible format. Under the hood, Google uses Fluentd, and you can inspect or modify the configs containing the supported log files and formats under /etc/google-fluentd/ on your cluster nodes.

The collected logs can be examined in the Cloud Console Logging UI. It is possible to search and filter the messages by source, label, log level etc. which is very helpful to investigate errors:

[Figure: Cloud Console Stackdriver Logging UI]

Integrating special application metrics, like the number of failed Spark tasks or how many virtual cores are available to YARN, into Stackdriver is not trivial. Therefore, in one of our projects we decided to build our own monitoring agent, which polls the APIs of YARN and Spark and delivers selected metrics to an existing monitoring infrastructure based on Prometheus and Grafana. This enables us to supervise the overall health of a cluster (active nodes, available cores or memory, pending applications etc.) and the stability of our Spark jobs (failed apps or tasks, average input rate or scheduling delay of streaming jobs etc.). The effort of building a custom monitoring agent is not as high as you may think, and if you already have a comparable monitoring solution, this might be an appropriate approach.
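To give an impression of the polling part, the following sketch reads the cluster metrics endpoint of the YARN ResourceManager REST API (/ws/v1/cluster/metrics) and prints a few values in the Prometheus text format. It is a simplified illustration, not our agent: the metric name prefix is made up, the host name is the example master from above, and the grep-based extraction only works because the response is a flat JSON object. A real agent should use a proper JSON parser.

```shell
YARN_URL="${YARN_URL:-http://my-dataproc-cluster-m:8088}"

# Extract one numeric field from the flat clusterMetrics JSON object on stdin.
metric_value() {
  key="$1"
  grep -oE "\"${key}\"[[:space:]]*:[[:space:]]*[0-9]+" | grep -oE '[0-9]+$'
}

# Convert the JSON response on stdin into Prometheus text format lines.
# Fields missing from the response are silently skipped.
yarn_metrics_to_prometheus() {
  json=$(cat)
  for key in activeNodes availableVirtualCores availableMB appsPending; do
    value=$(printf '%s' "$json" | metric_value "$key" || true)
    [ -n "$value" ] && printf 'yarn_cluster_%s %s\n' "$key" "$value"
  done
  return 0
}

# Poll the ResourceManager once and print the selected metrics.
poll_yarn() {
  curl -s "${YARN_URL}/ws/v1/cluster/metrics" | yarn_metrics_to_prometheus
}
```

How the output of poll_yarn reaches Prometheus depends on your setup; writing it to a file for a textfile collector or pushing it to a gateway are two common options.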

Additionally, we evaluated the commercial monitoring product Datadog, which is quite simple to deploy on Dataproc clusters. Datadog offers many integrations out of the box, including YARN, HDFS and Spark. The metrics collected by the agents running on the cluster nodes are sent to external servers and can be observed on a central website. Costs are calculated per started agent, which may equal the number of cluster nodes. However, it may also suffice to deploy Datadog agents only on the master nodes, because the status of all nodes can be derived from the YARN metrics.

So if you want to start fast or don’t want to build a monitoring system of your own and have the necessary budget, Datadog may be worth a look.


2018-11-15T13:59:50+00:00