What can Kubernetes bring to Apache Spark pipelines?

Versions: Apache Spark 2.3.0, Kubernetes 1.10

The commercial version of Apache Spark distributed by Databricks offers a serverless and auto-scalable approach for applications written in this framework. Over time, other companies tried to provide similar alternatives, going as far as putting Apache Spark pipelines into AWS Lambda functions. But with version 2.3.0 another alternative appears as a solution to the scalability and elasticity overhead - Kubernetes.

This post starts a series about the Spark on Kubernetes component, released as alpha in version 2.3.0. It tries to explain how Kubernetes can help to improve data pipelines written with Apache Spark. It's divided into sections, each describing one specific point: fault tolerance, scalability, scheduling, ops, isolation and universality.

Managed fault tolerance

The first point where Kubernetes is interesting for Spark pipelines is fault tolerance. Thanks to its architecture, Kubernetes is able to monitor what happens in the cluster, which nodes are unhealthy and which pods don't meet the expected requirements (e.g. containers are down), and to do everything needed to bring the cluster back to the desired state.

Pods

A pod is one of the resources (objects) available in Kubernetes. It is composed of 1 or more containers deployed as a single unit.

Pods define fault tolerance through the restartPolicy property, which takes one of 3 values: Always, OnFailure or Never, and applies to all containers of a given pod. If one of them fails, Kubernetes looks at restartPolicy and, according to it, restarts the failed container or not. Another level of fault tolerance concerns the pods themselves. They need an extra object called a controller to monitor them and recreate them when, for instance, the node they're running on becomes unhealthy.
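The restart rule described above can be sketched as a small decision function. It only illustrates the semantics of the 3 values and is not the kubelet's code; the function name should_restart and the convention that exit code 0 means success are assumptions made for this example.

```python
def should_restart(restart_policy: str, exit_code: int) -> bool:
    """Decide whether a terminated container would be restarted.

    Hypothetical helper: it mirrors the meaning of the 3 restartPolicy
    values, assuming that exit code 0 stands for a successful termination.
    """
    if restart_policy == "Always":
        # Restarted whatever the reason of the termination.
        return True
    if restart_policy == "OnFailure":
        # Restarted only after a non-successful termination.
        return exit_code != 0
    if restart_policy == "Never":
        return False
    raise ValueError(f"Unknown restartPolicy: {restart_policy}")
```

For a batch-like Spark driver, OnFailure is probably the closest match to the intent: a crashed driver would come back, whereas a driver that finished its job successfully would stay terminated.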

Such managed fault tolerance can be interesting for Apache Spark workloads. The data processing framework deals pretty well with executor and task failures (you can learn about that in the posts Spark failure detection - heartbeats and Failed tasks resubmit). It also provides a solution for driver failures with the spark.driver.supervise property (limited to the Mesos and standalone schedulers though). Kubernetes' built-in fault tolerance could complement the existing solutions on both the executor and driver sides.

Scalability

Nowadays, whoever says cluster says scalability. With built-in features such as the horizontal pod autoscaler, Kubernetes can ensure that the number of running objects can support the current workload. The autoscaler is based on resource use (CPU, memory) and accordingly creates or removes pod replicas to handle increased or decreased load. The number of replicas is bounded by the minReplicas and maxReplicas properties.
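To make the idea more concrete, here is a minimal sketch of the ratio rule documented for the horizontal pod autoscaler: the desired number of replicas is the current number multiplied by the ratio between the observed and the targeted metric value, rounded up and then bounded by minReplicas and maxReplicas. The function and parameter names are hypothetical, and the sketch deliberately ignores the tolerance and the delays that the real autoscaler applies before acting.

```python
import math


def desired_replicas(current_replicas: int, current_metric: float,
                     target_metric: float, min_replicas: int,
                     max_replicas: int) -> int:
    """Simplified version of the horizontal pod autoscaler ratio rule."""
    # Scale proportionally to how far the observed metric is from the target.
    raw = math.ceil(current_replicas * current_metric / target_metric)
    # Never go below minReplicas nor above maxReplicas.
    return max(min_replicas, min(max_replicas, raw))
```

With 4 replicas running at 90% CPU for a 60% target, the rule asks for 6 replicas. With the same replicas at 20% CPU, it scales down to 2.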

But the horizontal pod autoscaler is not the only way to scale in Kubernetes. Elastic scaling, which consists of automatically adding nodes to the cluster and removing them from it, is also possible with third-party add-ons. One of them, Autoscaler, adapts the cluster to the resources running at a given moment. If a resource is created but has no place to run, Autoscaler will bring up a new node, which can be removed once it becomes unused.

This property could enhance Spark's dynamic resource allocation feature and make the pipelines much more pragmatic. We could start with a small cluster, sized to handle a little more than the expected normal load, and let Kubernetes scale the cluster according to the executors requested by dynamic resource allocation. And since new executors would be Kubernetes objects (pods, which will be detailed in one of the upcoming posts), Autoscaler could increase or decrease the number of cluster nodes to allow all pods to run or to free created but unused compute resources. However, an important challenge appears here because of the external shuffle service, which for now runs on the same node as its executor. In such a case it's obvious that we can't destroy an unused node very easily.
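As a reminder of what dynamic resource allocation looks like on the Spark side, the sketch below gathers the usual properties and turns them into spark-submit arguments. The property names are standard Spark configuration entries, but the values and the helper to_submit_args are only illustrative, and whether the Kubernetes scheduler backend honors them depends on the Spark version.

```python
# Illustrative values only; they describe a small cluster allowed to grow.
dynamic_allocation_conf = {
    "spark.dynamicAllocation.enabled": "true",
    "spark.dynamicAllocation.minExecutors": "2",
    "spark.dynamicAllocation.maxExecutors": "20",
    # Dynamic allocation relies on the external shuffle service,
    # which is the source of the challenge mentioned above.
    "spark.shuffle.service.enabled": "true",
}


def to_submit_args(conf: dict) -> list:
    """Hypothetical helper rendering a configuration as --conf arguments."""
    args = []
    for key, value in sorted(conf.items()):
        args += ["--conf", f"{key}={value}"]
    return args
```

The resulting list can be appended to a spark-submit command line, e.g. with " ".join(to_submit_args(dynamic_allocation_conf)).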

Scheduling

As mentioned in the previous sections, Kubernetes is able to schedule new resources in the cluster. This ability can be used to enhance Spark's speculative execution feature. You can learn more about it in the post speculative execution in Spark, but for now we can simply recall that the engine monitors tasks performing poorly and launches copies of them on another executor, taking whichever result is computed first.
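The detection of poorly performing tasks can be sketched as below. It's a simplified model and not Spark's scheduler code: once a given fraction of tasks has finished (spark.speculation.quantile, 0.75 by default), every running task that is slower than the median duration of the finished ones multiplied by a factor (spark.speculation.multiplier, 1.5 by default) becomes a candidate for speculation. The function name and the shape of its arguments are assumptions made for the example.

```python
from statistics import median


def speculatable_tasks(finished_durations: list, running_durations: dict,
                       total_tasks: int, quantile: float = 0.75,
                       multiplier: float = 1.5) -> list:
    """Return the ids of running tasks slow enough to be speculated.

    finished_durations: durations (in seconds) of the successful tasks
    running_durations: task id -> time elapsed so far (in seconds)
    """
    min_finished = int(quantile * total_tasks)
    # Too few finished tasks to know what a "normal" duration is.
    if not finished_durations or len(finished_durations) < min_finished:
        return []
    threshold = multiplier * median(finished_durations)
    return sorted(task_id for task_id, elapsed in running_durations.items()
                  if elapsed > threshold)
```

For 4 tasks where 3 finished in 10, 12 and 11 seconds, the threshold is 16.5 seconds, so a 4th task running for 40 seconds would be relaunched speculatively.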

Thanks to Kubernetes resource limits, speculative execution could go much further. We could, for instance, instruct Spark to schedule speculative tasks on executors with a little more resources than initially specified for the original (not speculative) executors. If the tasks executed on the speculative executors perform better, we could replace the original executors with the speculative ones.

For instance, we could start a job with 4 GB of RAM per executor. But an unexpected load increase could make this configuration inefficient. In this case Spark would launch 30% of speculative tasks with 6 GB of memory, estimate whether it helped and, according to the result, automatically rebalance to the new cluster state. And once some of these new resources become unused, it could scale them down.

It's an example of vertical scaling and, even though it has become less popular than horizontal scaling, it can be useful in some specific scenarios such as unbalanced datasets. These can be rebalanced with Spark's repartition methods, but such automatic management appears as an alternative for unexpected dataset changes (e.g. in a streaming context the dataset was initially evenly balanced but after some time became unbalanced).

Kubernetes and cluster resources

When a Kubernetes object is configured, we can often specify its requested and maximal cluster resources, such as CPU or memory. This helps the scheduler to find nodes matching the resource requirements of the created objects.
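In Kubernetes these two values are called requests and limits, and the scheduler places pods according to the requests. The sketch below models this filtering step in a very simplified way. The dictionary keys (cpu_millis, memory_mib), the node names and the function nodes_that_fit are hypothetical; real manifests express quantities as strings such as "1" or "4Gi", and the real scheduler applies many more rules.

```python
# Hypothetical executor: it asks for 1 CPU and 4 GB, and may use up to 2 CPUs and 6 GB.
executor_resources = {
    "requests": {"cpu_millis": 1000, "memory_mib": 4096},
    "limits": {"cpu_millis": 2000, "memory_mib": 6144},
}


def nodes_that_fit(requests: dict, free_by_node: dict) -> list:
    """Return the names of the nodes with enough free resources for the requests."""
    return sorted(
        name for name, free in free_by_node.items()
        if all(free.get(resource, 0) >= amount
               for resource, amount in requests.items())
    )
```

Given a node with only half a CPU free and another one with 4 free CPUs, both with 8 GB of free memory, only the second one would be kept as a candidate.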

Operational overhead reduced

Kubernetes comes with a concept called operators. An operator is a controller specific to a given application, extending the Kubernetes API in order to fully manage instances on behalf of a Kubernetes user. Operators use Kubernetes primitives such as replication or deployments to create easily deployable and fully manageable services. They apply not only to single-node software but also to distributed applications such as NoSQL databases.

The list of operators is growing, and among them we can already find some important references: Elasticsearch, Apache Spark, Apache Kafka, RDBMS (MySQL, PostgreSQL, ...), Prometheus and so forth. It means that Kubernetes could be used similarly to some cloud managed services, where we only need to specify some initial requirements (throughput, parallelism) and let the service take care of the remaining operational overhead (deployment, monitoring, provisioning, scalability, CRON scheduling).

Thus, operational complexity with Kubernetes decreases. Since it's able to manage applications, databases and infrastructure, we gain one common management piece for all of them.

Workloads isolation

Another interesting concept of Kubernetes is the ability to isolate workloads with the help of namespaces. A namespace is a kind of virtual cluster backed by one physical cluster. Namespaces group sets of resources and, among other things, let us isolate names (a name only has to be unique within its namespace) and allocate cluster resources to each namespace through resource quotas.

For data processing pipelines, these points help a lot. Name isolation helps to work with normalized code without taking care of environment prefixes in the names. For instance, instead of making 3 configuration entries for one database, one for each environment, e.g. dev-mysql-store, stage-mysql-store, prod-mysql-store, we can simply use mysql-store and let Kubernetes figure out which one should be used thanks to the defined namespace. The second point, resource allocation, is also important for data pipelines. We can take as an example a pipeline composed of batch jobs executed at regular intervals. With resource quotas we can create one namespace for business-critical jobs and reserve more resources for it than for another namespace with less critical jobs whose execution can take longer.
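The mysql-store example works because of the way Kubernetes DNS names services: inside a namespace the short name is enough, while the fully qualified name contains the namespace. A minimal sketch of this naming rule, where the namespaces dev, stage and prod are hypothetical and cluster.local is assumed as the cluster domain (a common default that can be configured differently):

```python
def service_fqdn(service: str, namespace: str,
                 cluster_domain: str = "cluster.local") -> str:
    """Build the fully qualified DNS name of a service living in a namespace."""
    return f"{service}.{namespace}.svc.{cluster_domain}"


# The pipeline code always uses the same short name, "mysql-store".
# The namespace it runs in decides which database it really reaches.
for environment in ("dev", "stage", "prod"):
    print(environment, "->", service_fqdn("mysql-store", environment))
```

The pipeline's configuration keeps a single entry and the environment is chosen at deployment time, simply by picking the namespace.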

Vendor agnostic

Kubernetes is also vendor-agnostic. In other words, it isn't coupled to one specific cloud provider, and the day we need to migrate our data pipelines to another provider, that will be a plus. Even though this scenario doesn't occur very often nowadays, it's still a point to keep in mind when implementing workloads.

A great example of Kubernetes' agnostic character is the way it deals with volumes. A volume is the place where files used by Kubernetes resources are stored. It can be a local file system or a network one (e.g. a disk provided by AWS or GCP) and can be used in 2 different ways. The first one consists of defining the expected volume explicitly. This approach is not portable since a migration would require changing the volumes of all pods. But there is another, more portable solution called volume claims. Here the resource doesn't ask for a concrete volume but rather for a specific amount of space somewhere, and it's up to the administrators to define such places. This approach is much more flexible than the previous one since it doesn't require any template changes.
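The difference between both approaches can be illustrated with the manifests below, written as Python dictionaries to keep the example self-contained. The field names follow the Kubernetes API, whereas the object names (pipeline-data, pipeline-data-claim), the requested size and the volume id placeholder are only assumptions made for this sketch.

```python
import json

# Explicit volume: the pod template names a provider-specific disk,
# so it must be rewritten when the pipeline moves to another provider.
explicit_volume = {
    "name": "pipeline-data",
    "awsElasticBlockStore": {"volumeID": "<volume-id>", "fsType": "ext4"},
}

# Volume claim: the pipeline only asks for 10 GiB of space "somewhere".
claim = {
    "apiVersion": "v1",
    "kind": "PersistentVolumeClaim",
    "metadata": {"name": "pipeline-data-claim"},
    "spec": {
        "accessModes": ["ReadWriteOnce"],
        "resources": {"requests": {"storage": "10Gi"}},
    },
}

# The pod template references the claim and knows nothing about the provider.
claimed_volume = {
    "name": "pipeline-data",
    "persistentVolumeClaim": {"claimName": claim["metadata"]["name"]},
}


def render(manifest: dict) -> str:
    """Serialize a manifest to JSON, a format accepted by kubectl apply."""
    return json.dumps(manifest, indent=2)
```

After a migration, only the storage defined by the administrators changes. The claim and the pod template stay the same.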

And this flexibility is interesting for the pipelines. Not only does it make migrating them easier, it also helps to compare them across providers just before making the final decision.

This post showed some interesting Kubernetes features that could enhance existing data processing frameworks, including Apache Spark. Reliability, scalability, isolation - all these keywords of good software promise a lot. However, there is still a gap between their utility and a real implementation. The Spark on Kubernetes project, released in alpha version in Apache Spark 2.3.0, has started to close this gap. The project will be the main topic of the subsequent posts on this blog.