Setting up Apache Spark on Kubernetes with microk8s

on waitingforcode.com

When I discovered microk8s I was delighted! Installation takes only a few steps, and you can start playing with Kubernetes locally (I tried it on Ubuntu 16). However, running Apache Spark 2.4.4 on top of microk8s is not a piece of cake. In this post I will show you 4 different problems you may encounter and propose possible solutions.

microk8s installation is well explained in its official documentation (link in the Further reading section), so I won't cover it here. Let's suppose it's up and running. Unfortunately, that isn't enough to get an Apache Spark application running :( In the next 4 sections I will show you the problems I encountered while testing SparkPi on Kubernetes locally.

PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target

Throughout this article we'll try to submit a new Apache Spark job with this command:

./bin/spark-submit \
  --master k8s://127.0.0.1:16443 \
  --deploy-mode cluster \
  --name spark-pi \
  --class org.apache.spark.examples.SparkPi \
  --conf spark.executor.instances=2 \
  --conf spark.kubernetes.container.image=waitingforcode_spark:v0.2_spark2.4.4 \
  --conf spark.app.name=spark-pi \
  --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
  local:///opt/spark/examples/target/original-spark-examples_2.11-2.4.4.jar 5

If you set it up like this, there is a big risk that it won't work because of this exception (truncated for readability):

Exception in thread "kubernetes-dispatcher-0" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@38727517 rejected from java.util.concurrent.ScheduledThreadPoolExecutor@2a3f563d[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
    at java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:326)
    at java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:533)
    at java.util.concurrent.ScheduledThreadPoolExecutor.submit(ScheduledThreadPoolExecutor.java:632)
    at java.lang.Thread.run(Thread.java:748)
Exception in thread "main" io.fabric8.kubernetes.client.KubernetesClientException: Failed to start websocket
    at io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager$2.onFailure(WatchConnectionManager.java:207)
    at okhttp3.internal.ws.RealWebSocket.failWebSocket(RealWebSocket.java:543)
    at okhttp3.internal.ws.RealWebSocket$2.onFailure(RealWebSocket.java:208)
    at okhttp3.RealCall$AsyncCall.execute(RealCall.java:148)
    at java.lang.Thread.run(Thread.java:748)
Caused by: javax.net.ssl.SSLHandshakeException: sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
    at sun.security.ssl.Alerts.getSSLException(Alerts.java:192)
    at sun.security.ssl.SSLSocketImpl.fatal(SSLSocketImpl.java:1946)
    at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:316)
    at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:310)
    at sun.security.ssl.ClientHandshaker.serverCertificate(ClientHandshaker.java:1639)
    at sun.security.ssl.ClientHandshaker.processMessage(ClientHandshaker.java:223)
    at okhttp3.internal.connection.StreamAllocation.findHealthyConnection(StreamAllocation.java:121)
    at okhttp3.internal.connection.StreamAllocation.newStream(StreamAllocation.java:100)
    at okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:42)
    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
    at okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:93)
    ... 4 more
Caused by: sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
    at sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:397)
    at sun.security.validator.PKIXValidator.engineValidate(PKIXValidator.java:302)
    at sun.security.validator.Validator.validate(Validator.java:262)

According to the microk8s documentation about ports, our master's port (16443) is SSL encrypted. The exception clearly points to a problem with the provided certificates, or rather the lack of them. Fortunately, the community behind Spark on Kubernetes thought about that case and added several authentication parameters to the spark-submit command. The one that solves our problem is spark.kubernetes.authenticate.submission.caCertFile, which provides the CA cert file used when we start the driver.

Thanks to caCertFile, our spark-submit command should now include --conf spark.kubernetes.authenticate.submission.caCertFile=/var/snap/microk8s/current/certs/ca.crt and execute correctly on Kubernetes. Or, almost correctly.
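To make the change concrete, here is a minimal sketch that collects the arguments of the command above, plus the CA certificate property, in one place. The function name spark_pi_args and the CA_CERT variable are mine, not part of Spark; the sketch uses the submission.caCertFile property described earlier and assumes the microk8s certificate path quoted in this section.

```shell
#!/usr/bin/env bash
# Sketch only: spark_pi_args and CA_CERT are hypothetical helper names.
# CA_CERT defaults to the microk8s path quoted in the article; override it if yours differs.
CA_CERT="${CA_CERT:-/var/snap/microk8s/current/certs/ca.crt}"

# Prints the spark-submit arguments, one per line, so they can be inspected before running.
spark_pi_args() {
  printf '%s\n' \
    --master k8s://127.0.0.1:16443 \
    --deploy-mode cluster \
    --name spark-pi \
    --class org.apache.spark.examples.SparkPi \
    --conf spark.executor.instances=2 \
    --conf spark.kubernetes.container.image=waitingforcode_spark:v0.2_spark2.4.4 \
    --conf spark.app.name=spark-pi \
    --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
    --conf "spark.kubernetes.authenticate.submission.caCertFile=${CA_CERT}" \
    local:///opt/spark/examples/target/original-spark-examples_2.11-2.4.4.jar \
    5
}

# To submit for real, from the Spark home directory (not executed here):
#   mapfile -t args < <(spark_pi_args)
#   ./bin/spark-submit "${args[@]}"
```

Keeping the arguments in a function makes it easy to append the further properties as the next problems show up.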

Unauthorized

I wrote "almost correctly" because you should now get an authorization exception like:

Exception in thread "main" io.fabric8.kubernetes.client.KubernetesClientException: Unauthorized
    at io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager$2.onFailure(WatchConnectionManager.java:201)
    at okhttp3.internal.ws.RealWebSocket.failWebSocket(RealWebSocket.java:543)
    at okhttp3.internal.ws.RealWebSocket$2.onResponse(RealWebSocket.java:185)
    at okhttp3.RealCall$AsyncCall.execute(RealCall.java:141)
    at okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

The exception indicates a problem with authentication against the Kubernetes API server. According to the Kubernetes documentation, "Kubernetes uses client certificates, bearer tokens, an authenticating proxy, or HTTP basic auth to authenticate API requests through authentication plugins". From that you can easily find another Spark configuration property to overcome the issue, called spark.kubernetes.authenticate.submission.oauthToken. It defines the OAuth token to use when authenticating against the Kubernetes API server. To retrieve your token, first find the secret whose name starts with "default-token-":

bartosz:/data/spark-2.4.4$ microk8s.kubectl -n kube-system get secret
NAME                                             TYPE                                  DATA   AGE
attachdetach-controller-token-2462t              kubernetes.io/service-account-token   3      2d3h
certificate-controller-token-d2msj               kubernetes.io/service-account-token   3      2d3h
clusterrole-aggregation-controller-token-kgkh5   kubernetes.io/service-account-token   3      2d3h
coredns-token-5xwj5                              kubernetes.io/service-account-token   3      52m
cronjob-controller-token-v9lj5                   kubernetes.io/service-account-token   3      2d3h
daemon-set-controller-token-mswf4                kubernetes.io/service-account-token   3      2d3h

default-token-cndqq                              kubernetes.io/service-account-token   3      2d3h

After that, issue microk8s.kubectl -n kube-system describe secret default-token-cndqq:

bartosz:/data/spark-2.4.4$ microk8s.kubectl -n kube-system describe secret default-token-cndqq
Name:         default-token-cndqq
Namespace:    kube-system
Labels:       <none>
Annotations:  kubernetes.io/service-account.name: default
              kubernetes.io/service-account.uid: 099279d3-1e07-4656-a525-638bae57f5b6

Type:  kubernetes.io/service-account-token

Data
====
ca.crt:     1103 bytes
namespace:  11 bytes
token:      MY_TOKEN

With the token in hand, spark-submit needs one extra parameter: --conf spark.kubernetes.authenticate.submission.oauthToken=MY_TOKEN.
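The two lookups can also be scripted. The sketch below is one possible way to do it, not the method used in this article: the function names and the KUBECTL variable are hypothetical, and it assumes the secret lives in kube-system as shown above. Note that get -o jsonpath returns the token base64-encoded, whereas describe prints it already decoded, hence the extra decoding step.

```shell
#!/usr/bin/env bash
# Sketch only: KUBECTL, default_token_secret and secret_token are hypothetical names.
KUBECTL="${KUBECTL:-microk8s.kubectl}"

# Prints the name of the first secret in kube-system starting with "default-token-".
default_token_secret() {
  "$KUBECTL" -n kube-system get secret -o name \
    | sed -n 's|^secret/\(default-token-.*\)$|\1|p' \
    | head -n 1
}

# Prints the decoded bearer token stored in the secret passed as first argument.
secret_token() {
  "$KUBECTL" -n kube-system get secret "$1" -o jsonpath='{.data.token}' \
    | base64 --decode
}

# Usage, from the Spark home directory (not executed here):
#   TOKEN="$(secret_token "$(default_token_secret)")"
#   ./bin/spark-submit ... \
#     --conf "spark.kubernetes.authenticate.submission.oauthToken=${TOKEN}" ...
```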

UnknownHostException: kubernetes.default.svc: Try again

After adding these 2 properties to spark-submit we're able to send the job to Kubernetes. However, the cluster may still fail to execute it. That happened to me because of this exception (truncated for readability):

20/01/10 16:36:31 ERROR SparkContext: Error initializing SparkContext.
org.apache.spark.SparkException: External scheduler cannot be instantiated
    at org.apache.spark.SparkContext$.org$apache$spark$SparkContext$$createTaskScheduler(SparkContext.scala:2794)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:493)
    at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2520)
    at org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:935)
Caused by: io.fabric8.kubernetes.client.KubernetesClientException: Operation: [get]  for kind: [Pod]  with name: [spark-pi-1578674176026-driver]  in namespace: [default]  failed.
    at io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:64)
    at io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:72)
    at io.fabric8.kubernetes.client.dsl.base.BaseOperation.getMandatory(BaseOperation.java:229)
Caused by: java.net.UnknownHostException: kubernetes.default.svc: Try again
    at java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method)

After some digging, I found a potential solution in the "Cannot resolve pod ip address #188" issue on GitHub. What helped me solve the problem was simply disabling and re-enabling the DNS:

bartosz:/data/spark-2.4.4$ microk8s.disable dns
Disabling DNS
Reconfiguring kubelet
Removing DNS manifest
[sudo] password for bartosz:
DNS is disabled

bartosz:/data/spark-2.4.4$ microk8s.enable dns
Enabling DNS
Applying manifest
serviceaccount/coredns created
configmap/coredns created
deployment.apps/coredns created
service/kube-dns created
clusterrole.rbac.authorization.k8s.io/coredns created
clusterrolebinding.rbac.authorization.k8s.io/coredns created
Restarting kubelet
DNS is enabled

To check if it worked, you can use busybox and test nslookup against kubernetes.default, like here:

bartosz:/data/spark-2.4.4$ microk8s.kubectl run -i --tty --image busybox:1.28.4 dns-test2 --restart=Never --rm /bin/sh

nslookup kubernetes.default
Server:    10.152.183.10
Address 1: 10.152.183.10 kube-dns.kube-system.svc.cluster.local

Name:      kubernetes.default
Address 1: 10.152.183.1 kubernetes.default.svc.cluster.local
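If you prefer a check that does not require typing into the pod's shell, the same test can be run non-interactively. This is a sketch under assumptions: the function name check_cluster_dns, the pod name dns-check and the KUBECTL variable are mine, and success is detected by looking for the "Name:" line that nslookup prints in the output above.

```shell
#!/usr/bin/env bash
# Sketch only: KUBECTL, check_cluster_dns and the pod name dns-check are hypothetical.
KUBECTL="${KUBECTL:-microk8s.kubectl}"

# Runs nslookup in a throwaway busybox pod, prints its output, and returns 0
# only if the output contains a "Name:" line, as in the successful lookup above.
check_cluster_dns() {
  local name="${1:-kubernetes.default}"
  local output
  output="$("$KUBECTL" run dns-check --image busybox:1.28.4 --restart=Never --rm -i \
    -- nslookup "$name" 2>&1)" || true
  printf '%s\n' "$output"
  printf '%s\n' "$output" | grep -q '^Name:'
}

# Usage (not executed here):
#   check_cluster_dns kubernetes.default && echo "cluster DNS works"
```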

Bad Kubernetes client version for Apache Spark 2.4.4

I thought that the unknown host error was the last one but unfortunately, no. After resubmitting the application with spark-submit, I got a new exception:

20/01/10 17:30:50 INFO SparkContext: Added JAR file:///opt/spark/examples/target/original-spark-examples_2.11-2.4.4.jar at spark://spark-pi-1578677440483-driver-svc.default.svc:7078/jars/original-spark-examples_2.11-2.4.4.jar with timestamp 1578677450586
20/01/10 17:30:54 INFO ExecutorPodsAllocator: Going to request 2 executors from Kubernetes.
20/01/10 17:30:54 WARN WatchConnectionManager: Exec Failure: HTTP 403, Status: 403 -
java.net.ProtocolException: Expected HTTP 101 response but was '403 Forbidden'
    at okhttp3.internal.ws.RealWebSocket.checkResponse(RealWebSocket.java:216)
    at okhttp3.internal.ws.RealWebSocket$2.onResponse(RealWebSocket.java:183)
    at okhttp3.RealCall$AsyncCall.execute(RealCall.java:141)
    at okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
20/01/10 17:30:54 WARN ExecutorPodsWatchSnapshotSource: Kubernetes client has been closed (this is expected if the application is shutting down.)

Once again, after searching a little, I found Andy Grove's article "EKS security patches cause Apache Spark jobs to fail with permissions error", which explains that a security fix caused some regressions for Apache Spark 2.4.4. Fortunately, Andy gives 3 possible solutions to overcome the issue. The one that worked for me was Option 2, where I set the io.fabric8 kubernetes-client dependency in resource-managers/kubernetes/core/pom.xml to 4.4.2:

    <dependency>
      <groupId>io.fabric8</groupId>
      <artifactId>kubernetes-client</artifactId>
      <version>4.4.2</version>
      <exclusions>
        <exclusion>
          <groupId>com.fasterxml.jackson.core</groupId>
          <artifactId>*</artifactId>
        </exclusion>
        <exclusion>
          <groupId>com.fasterxml.jackson.dataformat</groupId>
          <artifactId>jackson-dataformat-yaml</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

After recompiling the project and rebuilding the Apache Spark image with docker-image-tool.sh, I was finally able to see my spark-submit working!
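For reference, the recompile and rebuild steps could look like the sketch below. It assumes you run it from the root of the Spark source tree and uses the Maven wrapper and docker-image-tool.sh shipped with Spark; the function name and the MVN and IMAGE_TOOL variables are hypothetical. By default docker-image-tool.sh names the image spark:TAG, so an image called waitingforcode_spark, as used in this article, would need an additional docker tag step that is not shown here.

```shell
#!/usr/bin/env bash
# Sketch only: MVN, IMAGE_TOOL and rebuild_spark_image are hypothetical names.
# Both paths are relative to the root of the Spark source tree.
MVN="${MVN:-./build/mvn}"
IMAGE_TOOL="${IMAGE_TOOL:-./bin/docker-image-tool.sh}"

# Recompiles Spark with Kubernetes support, then builds the Docker image with the given tag.
rebuild_spark_image() {
  local tag="${1:?usage: rebuild_spark_image TAG}"
  "$MVN" -Pkubernetes -DskipTests clean package &&
    "$IMAGE_TOOL" -t "$tag" build
}

# Usage (not executed here):
#   rebuild_spark_image v0.2_spark2.4.4
```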

As of this writing, the Kubernetes resource manager is still marked as an experimental feature and, as you can see in this article, it still has some issues that make it difficult for beginners to use. However, the Kubernetes and Spark on Kubernetes communities are very active, and finding answers to our questions is very often only a matter of time.
