Use cases of Java ServiceLoader

Versions: Apache Beam 2.2.0, Apache Spark 2.2.0, Java 8

While analyzing the Apache Spark and Apache Beam projects, I discovered that both use ServiceLoader, a Java utility class that loads other classes dynamically.

This post presents the ServiceLoader feature. The first section describes possible use cases and explains its API. The next two sections show the role of ServiceLoader in Apache Spark and Apache Beam. The last part shows how ServiceLoader can be used through some simple learning tests. Please note that the ServiceLoader presented below is the Java 8 one. The Java 9 version is deliberately omitted since it isn't used in the Spark and Beam versions covered here.

ServiceLoader explained

In order to understand ServiceLoader, it's important to know the definitions of service and service provider. In the Java world, a service is a set of interfaces and classes describing what can be done. Very often it's a single interface or an abstract class. A service provider, on the other hand, is an implementation of the service. That means it defines how the operations declared by the service are carried out.

ServiceLoader is a class that helps to load the service providers of a service. It resolves the classes to load from a configuration file placed in the META-INF/services directory. The file must be named after the fully qualified name of the loaded service. For instance, if we want to load a service provider for com.waitingforcode.TestService, we'd create a file with this name in the META-INF/services directory. The file should contain the fully qualified class names of all available service providers, one per line.

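The service providers can be loaded with one of 3 static factory methods: load(Class<S> service), which uses the context class loader of the current thread, load(Class<S> service, ClassLoader loader), which uses the given class loader, and loadInstalled(Class<S> service), which uses the extension class loader.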
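To make the configuration file and the factory methods more concrete, here is a minimal Java sketch. The TestService and DefaultTestService types, as well as the FactoryMethodsSketch class, are hypothetical names invented for the illustration. To stay self-contained, the sketch writes the META-INF/services file into a temporary directory at runtime and exposes it through a URLClassLoader; in a real project the file would simply be packaged with the JAR.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

class FactoryMethodsSketch {

  /** Hypothetical service: it only declares what can be done. */
  public interface TestService {
    String name();
  }

  /** Hypothetical provider: public class with a public no-arg constructor. */
  public static class DefaultTestService implements TestService {
    @Override public String name() { return "default"; }
  }

  /** Creates META-INF/services/<service name> in a temp dir and returns a class loader seeing it. */
  static ClassLoader loaderWithServicesFile() {
    try {
      Path root = Files.createTempDirectory("serviceloader-sketch");
      Path services = Files.createDirectories(root.resolve("META-INF").resolve("services"));
      // File name = binary name of the service, content = binary name of the provider
      Files.write(services.resolve(TestService.class.getName()),
          DefaultTestService.class.getName().getBytes(StandardCharsets.UTF_8));
      return new URLClassLoader(new URL[] {root.toUri().toURL()},
          FactoryMethodsSketch.class.getClassLoader());
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  static List<String> names(ServiceLoader<TestService> loader) {
    List<String> names = new ArrayList<>();
    for (TestService service : loader) {
      names.add(service.name());
    }
    return names;
  }

  /** load(Class, ClassLoader): the class loader is given explicitly. */
  static List<String> loadWithExplicitLoader() {
    return names(ServiceLoader.load(TestService.class, loaderWithServicesFile()));
  }

  /** load(Class): the context class loader of the current thread is used. */
  static List<String> loadWithContextLoader() {
    Thread thread = Thread.currentThread();
    ClassLoader previous = thread.getContextClassLoader();
    thread.setContextClassLoader(loaderWithServicesFile());
    try {
      return names(ServiceLoader.load(TestService.class));
    } finally {
      thread.setContextClassLoader(previous);
    }
  }

  /** loadInstalled(Class): only installed providers are visible, so nothing is found here. */
  static List<String> loadInstalled() {
    return names(ServiceLoader.loadInstalled(TestService.class));
  }

  public static void main(String[] args) {
    System.out.println("explicit class loader: " + loadWithExplicitLoader());
    System.out.println("context class loader:  " + loadWithContextLoader());
    System.out.println("loadInstalled:         " + loadInstalled());
  }
}
```

The first two calls find the provider because the services file is visible to the class loader they use. The third one returns nothing since the hypothetical provider isn't installed in the Java runtime itself.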

All available providers are accessible through the iterator() method. It returns an iterator typed to the class of the loaded service and instantiates the providers lazily. The last interesting method (the usual ones inherited from Object, such as toString or notify, are omitted) is reload(). As its name indicates, it clears the provider cache so that the service providers are loaded again from scratch. Thanks to this method we can easily change ServiceLoader behavior at runtime (e.g. overwrite the services file and reload the ServiceLoader afterwards).
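The caching and the effect of reload() can be observed with a small Java sketch. All the names used here (ReloadSketch, WelcomeService, EnglishWelcome, FrenchWelcome) are hypothetical, and the services file is written to a temporary directory only to keep the example self-contained. The file is overwritten between two iterations to show that the change is picked up only after reload().

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.ServiceLoader;

class ReloadSketch {

  /** Hypothetical service and providers. */
  public interface WelcomeService {
    String sayHi();
  }

  public static class EnglishWelcome implements WelcomeService {
    @Override public String sayHi() { return "Hi"; }
  }

  public static class FrenchWelcome implements WelcomeService {
    @Override public String sayHi() { return "Salut"; }
  }

  /** (Over)writes the services file with the given providers, one per line. */
  static void writeProviders(Path root, Class<?>... providers) throws IOException {
    Path services = Files.createDirectories(root.resolve("META-INF").resolve("services"));
    List<String> lines = new ArrayList<>();
    for (Class<?> provider : providers) {
      lines.add(provider.getName());
    }
    Files.write(services.resolve(WelcomeService.class.getName()), lines, StandardCharsets.UTF_8);
  }

  static List<String> greetings(ServiceLoader<WelcomeService> loader) {
    List<String> greetings = new ArrayList<>();
    for (WelcomeService service : loader) {
      greetings.add(service.sayHi());
    }
    return greetings;
  }

  /** Returns the greetings seen initially, after the file rewrite and after reload(). */
  static List<List<String>> run() {
    try {
      Path root = Files.createTempDirectory("serviceloader-reload");
      writeProviders(root, EnglishWelcome.class);
      ClassLoader classLoader = new URLClassLoader(new URL[] {root.toUri().toURL()},
          ReloadSketch.class.getClassLoader());
      ServiceLoader<WelcomeService> loader = ServiceLoader.load(WelcomeService.class, classLoader);

      List<String> initial = greetings(loader);
      // The file changes on disk but the loader still serves its cached providers
      writeProviders(root, EnglishWelcome.class, FrenchWelcome.class);
      List<String> cached = greetings(loader);
      // reload() drops the cache, so the next iteration parses the file again
      loader.reload();
      List<String> reloaded = greetings(loader);
      return Arrays.asList(initial, cached, reloaded);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(run()); // prints [[Hi], [Hi], [Hi, Salut]]
  }
}
```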

If 2 or more configuration files exist on the classpath for a given service, ServiceLoader simply returns all of the declared providers through its iterator. The loaded configuration files are represented by the Enumeration<URL> java.util.ServiceLoader.LazyIterator#configs field, created with java.lang.ClassLoader's getResources(String name) method or, when no class loader is given, with getSystemResources(String name).
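The aggregation of several configuration files can be sketched in Java as follows. Two temporary directories play the role of 2 classpath entries (for instance the project's classes and an external JAR), and each of them declares a different provider of the same service. The MultipleConfigFilesSketch class and its helper methods are hypothetical names used only for this illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.TreeSet;

class MultipleConfigFilesSketch {

  /** Hypothetical service with 2 providers, each declared in its own classpath entry. */
  public interface DispatchedService {}

  public static class FirstDispatchedService implements DispatchedService {}

  public static class SecondDispatchedService implements DispatchedService {}

  static final String CONFIG_NAME = "META-INF/services/" + DispatchedService.class.getName();

  /** Creates a directory acting as one classpath entry declaring a single provider. */
  static URL rootDeclaring(Class<?> provider) throws IOException {
    Path root = Files.createTempDirectory("serviceloader-root");
    Path config = root.resolve(CONFIG_NAME);
    Files.createDirectories(config.getParent());
    Files.write(config, provider.getName().getBytes(StandardCharsets.UTF_8));
    return root.toUri().toURL();
  }

  static ClassLoader twoRootsLoader() {
    try {
      URL[] roots = {
          rootDeclaring(FirstDispatchedService.class),
          rootDeclaring(SecondDispatchedService.class)
      };
      return new URLClassLoader(roots, MultipleConfigFilesSketch.class.getClassLoader());
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Counts the configuration files returned by ClassLoader#getResources. */
  static int countConfigFiles() {
    try {
      return Collections.list(twoRootsLoader().getResources(CONFIG_NAME)).size();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Collects the simple names of the providers loaded from both configuration files. */
  static Set<String> loadedProviderNames() {
    Set<String> names = new TreeSet<>();
    for (DispatchedService service
        : ServiceLoader.load(DispatchedService.class, twoRootsLoader())) {
      names.add(service.getClass().getSimpleName());
    }
    return names;
  }

  public static void main(String[] args) {
    System.out.println(countConfigFiles() + " configuration files found");
    System.out.println("providers: " + loadedProviderNames());
  }
}
```

Both files share the same name and differ only by the classpath entry they live in, which is why getResources returns 2 URLs and the iterator exposes both providers.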

ServiceLoader in Apache Spark

Apache Spark's use of ServiceLoader is mainly visible in 2 places in the SQL module: the structured streaming data source registry and the SQL tab of the UI. The service providers for both of these services are declared in the appropriate files:

bartosz:~/programming/spark-sql-2.2.0/META-INF$ tree services/
services/
├── org.apache.spark.scheduler.SparkHistoryListenerFactory
└── org.apache.spark.sql.sources.DataSourceRegister

0 directories, 2 files

The use of both is pretty straightforward. In the case of the data source registry, ServiceLoader is simply used to resolve the service provider for a given data source, either from the data source name or from the full class name. The whole mechanism is explained in the Kafka source provider section of the post about Apache Kafka in Spark Structured Streaming module.

The SQL tab of the UI, on the other hand, is loaded in the org.apache.spark.ui.SparkUI#createHistoryUI(conf: SparkConf, listenerBus: SparkListenerBus, securityManager: SecurityManager, appName: String, basePath: String, startTime: Long) method:

val listenerFactories = ServiceLoader.load(classOf[SparkHistoryListenerFactory],
  Utils.getContextOrSparkClassLoader).asScala
listenerFactories.foreach { listenerFactory =>
  val listeners = listenerFactory.createListeners(conf, sparkUI)
  listeners.foreach(listenerBus.addListener)
}

As you can see, there is no magic. The listener factories are first loaded with ServiceLoader and then used to create specific listener instances. These instances are finally added to the SparkListenerBus instance which, as its name indicates, posts each type of event to the corresponding listener method (e.g. onStageSubmitted for a stage submit event, onJobStart for a job start event and so on).

Another place where Spark uses ServiceLoader is the retrieval of the cluster manager. It can be found in the spark-yarn module, in the META-INF/services/org.apache.spark.scheduler.ExternalClusterManager file. The ExternalClusterManager interface defines a cluster manager that can be used to schedule Spark jobs. One of its implementations is org.apache.spark.scheduler.cluster.YarnClusterManager. The cluster manager to use is loaded in the org.apache.spark.SparkContext#getClusterManager(url: String) method, similarly to the data source:

val serviceLoaders =
  ServiceLoader.load(classOf[ExternalClusterManager], loader).asScala.filter(_.canCreate(url))

// Example of canCreate(url) in YarnClusterManager:
override def canCreate(masterURL: String): Boolean = {
  masterURL == "yarn"
}
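The same selection pattern, i.e. loading every declared provider and keeping the one able to handle a given parameter, can be sketched in plain Java. This is not Spark's code: ClusterManager, YarnLikeManager, LocalLikeManager and the helper methods are hypothetical names, and rejecting ambiguous matches with an exception is a choice made for this sketch.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.ServiceLoader;

class ClusterManagerSelectionSketch {

  /** Hypothetical service: every provider tells whether it can handle a master URL. */
  public interface ClusterManager {
    boolean canCreate(String masterUrl);
    String name();
  }

  public static class YarnLikeManager implements ClusterManager {
    @Override public boolean canCreate(String masterUrl) { return "yarn".equals(masterUrl); }
    @Override public String name() { return "yarn-like"; }
  }

  public static class LocalLikeManager implements ClusterManager {
    @Override public boolean canCreate(String masterUrl) {
      return masterUrl != null && masterUrl.startsWith("local");
    }
    @Override public String name() { return "local-like"; }
  }

  /** Declares both providers in a services file created in a temp dir. */
  static ClassLoader loaderDeclaringProviders() {
    try {
      Path root = Files.createTempDirectory("serviceloader-select");
      Path services = Files.createDirectories(root.resolve("META-INF").resolve("services"));
      List<String> providers = new ArrayList<>();
      providers.add(YarnLikeManager.class.getName());
      providers.add(LocalLikeManager.class.getName());
      Files.write(services.resolve(ClusterManager.class.getName()), providers,
          StandardCharsets.UTF_8);
      return new URLClassLoader(new URL[] {root.toUri().toURL()},
          ClusterManagerSelectionSketch.class.getClassLoader());
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Returns the name of the only manager accepting the URL, if there is one. */
  static Optional<String> select(String masterUrl) {
    List<ClusterManager> candidates = new ArrayList<>();
    for (ClusterManager manager
        : ServiceLoader.load(ClusterManager.class, loaderDeclaringProviders())) {
      if (manager.canCreate(masterUrl)) {
        candidates.add(manager);
      }
    }
    if (candidates.size() > 1) {
      throw new IllegalStateException("Multiple cluster managers accept " + masterUrl);
    }
    return candidates.isEmpty() ? Optional.empty() : Optional.of(candidates.get(0).name());
  }

  public static void main(String[] args) {
    System.out.println(select("yarn"));
    System.out.println(select("local[2]"));
    System.out.println(select("mesos://host:5050"));
  }
}
```

The caller never references a concrete provider class. Supporting a new cluster manager only requires adding its implementation and its line in the services file.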

ServiceLoader in Apache Beam

ServiceLoader in Apache Beam is used to adapt the behavior to the runtime environment. As you probably remember from previous posts about Apache Beam, this data processing framework translates the processing pipeline into code that the chosen runner understands. We can see that through the example of 2 loaded services, FileSystemRegistrar and PipelineOptionsRegistrar:

bartosz:~/programming/beam-2.2.0/META-INF$ tree services/
services/
├── org.apache.beam.sdk.io.FileSystemRegistrar
└── org.apache.beam.sdk.options.PipelineOptionsRegistrar

0 directories, 2 files

The pipeline options registry, represented here by the org.apache.beam.sdk.options.PipelineOptionsRegistrar file, defines all available pipeline options. In the main SDK module (beam-sdks-java-core), these options are exposed by org.apache.beam.sdk.options.DefaultPipelineOptionsRegistrar, which implements the getPipelineOptions() method of the org.apache.beam.sdk.options.PipelineOptionsRegistrar interface. For the Google Cloud Platform environment, additional pipeline options are defined in org.apache.beam.sdk.extensions.gcp.options.GcpPipelineOptionsRegistrar.

And since pipeline options are coupled to the runners, we can also find a service file under META-INF/services/org.apache.beam.sdk.runners.PipelineRunnerRegistrar. According to the Javadoc comment of PipelineRunnerRegistrar, it's a facility to register runners automatically:

{@link PipelineRunner} creators have the ability to automatically have their 
{@link PipelineRunner} registered with this SDK by creating a {@link ServiceLoader} entry
and a concrete implementation of this interface.

A similar role, i.e. adapting the code to the runtime context, is played by org.apache.beam.sdk.io.FileSystemRegistrar. Apache Beam, like other data processing frameworks, can work with different file systems. It's able to work with the local file system on Windows, Linux or Mac, but it's also able to deal with remote file systems such as Google Cloud Storage. All of these implementations are loaded by ServiceLoader from the FileSystemRegistrar services file.
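The registrar idea can be illustrated with a simplified Java sketch in which every registrar loaded by ServiceLoader contributes entries to a registry keyed by URI scheme. It doesn't reproduce Beam's API: SchemeRegistrar, LocalRegistrar, CloudRegistrar, the returned descriptions and the fallback to the "file" scheme are all assumptions made for the example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.TreeMap;

class FileSystemRegistrySketch {

  /** Hypothetical registrar: maps URI schemes to a description of the file system. */
  public interface SchemeRegistrar {
    Map<String, String> fileSystems();
  }

  public static class LocalRegistrar implements SchemeRegistrar {
    @Override public Map<String, String> fileSystems() {
      return Collections.singletonMap("file", "local file system");
    }
  }

  public static class CloudRegistrar implements SchemeRegistrar {
    @Override public Map<String, String> fileSystems() {
      return Collections.singletonMap("gs", "remote object store");
    }
  }

  /** Declares both registrars in a services file created in a temp dir. */
  static ClassLoader loaderDeclaringRegistrars() {
    try {
      Path root = Files.createTempDirectory("serviceloader-registry");
      Path services = Files.createDirectories(root.resolve("META-INF").resolve("services"));
      Files.write(services.resolve(SchemeRegistrar.class.getName()),
          Arrays.asList(LocalRegistrar.class.getName(), CloudRegistrar.class.getName()),
          StandardCharsets.UTF_8);
      return new URLClassLoader(new URL[] {root.toUri().toURL()},
          FileSystemRegistrySketch.class.getClassLoader());
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Merges the entries of all registrars and refuses a scheme registered twice. */
  static Map<String, String> buildRegistry() {
    Map<String, String> registry = new TreeMap<>();
    for (SchemeRegistrar registrar
        : ServiceLoader.load(SchemeRegistrar.class, loaderDeclaringRegistrars())) {
      for (Map.Entry<String, String> entry : registrar.fileSystems().entrySet()) {
        if (registry.putIfAbsent(entry.getKey(), entry.getValue()) != null) {
          throw new IllegalStateException("Scheme registered twice: " + entry.getKey());
        }
      }
    }
    return registry;
  }

  /** Resolves the file system of a path; paths without a scheme are considered local. */
  static String resolve(String path) {
    int separator = path.indexOf("://");
    String scheme = separator < 0 ? "file" : path.substring(0, separator);
    String fileSystem = buildRegistry().get(scheme);
    if (fileSystem == null) {
      throw new IllegalArgumentException("No file system registered for scheme " + scheme);
    }
    return fileSystem;
  }

  public static void main(String[] args) {
    System.out.println(resolve("gs://bucket/data.txt")); // prints remote object store
    System.out.println(resolve("/tmp/data.txt"));        // prints local file system
  }
}
```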

Besides runners and execution context, ServiceLoader is also used in different extensions. For instance, we can find a META-INF/services file in the Protobuf module (beam-sdks-java-extensions-protobuf). The file is called org.apache.beam.sdk.coders.CoderProviderRegistrar. This service defines a method returning org.apache.beam.sdk.coders.CoderProvider instances.

ServiceLoader example

The tests below show how ServiceLoader behaves in different contexts: when 1 provider is defined, when 2 providers are defined in the same file and when 2 providers are declared in 2 different configuration files (one on the local classpath and one in an attached external JAR). Before the code, let's see the content of the META-INF/services files:

# com.waitingforcode.serviceloader.WelcomeService
com.waitingforcode.serviceloader.EnglishWelcomeGoodbyeService

# com.waitingforcode.serviceloader.GoodbyeService
com.waitingforcode.serviceloader.EnglishWelcomeGoodbyeService
com.waitingforcode.serviceloader.FrenchWelcomeGoodbyeService

# com.waitingforcode.serviceloader.DispatchedService - from local classpath
com.waitingforcode.serviceloader.SecondDispatchedService

# com.waitingforcode.serviceloader.DispatchedService - from serviceloader-test-1.0-SNAPSHOT.jar
com.waitingforcode.serviceloader.FirstDispatchedService

And finally the tests and services are defined as:

import java.util.ServiceLoader

import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}

class ServiceLoaderTest extends FunSuite with Matchers with BeforeAndAfter {

  test("should load only the single one defined implementation of service") {
    val welcomeServiceLoader = ServiceLoader.load(classOf[WelcomeService])
    val serviceToUse = welcomeServiceLoader.iterator().next()

    serviceToUse shouldBe a [EnglishWelcomeGoodbyeService]
  }

  test("should find goodbye service with language prefix") {
    val serviceToUse = loadGoodbyeService("French")

    serviceToUse shouldBe defined
    serviceToUse.get shouldBe a [FrenchWelcomeGoodbyeService]
  }

  // Returns the first provider whose class name starts with the given language prefix
  private def loadGoodbyeService(languagePrefix: String): Option[GoodbyeService] = {
    import scala.collection.JavaConverters._
    ServiceLoader.load(classOf[GoodbyeService]).asScala
      .find(service => service.getClass.getSimpleName.startsWith(languagePrefix))
  }

  test("should load 2 available providers - 1 from classpath and 1 from jar") {
    import scala.collection.JavaConverters._
    val dispatchedServiceLoader = ServiceLoader.load(classOf[DispatchedService])

    val loadedClasses = dispatchedServiceLoader.asScala.map(loadedClass => loadedClass.getClass)

    loadedClasses should have size 2
    loadedClasses should contain allOf(classOf[FirstDispatchedService], classOf[SecondDispatchedService])
  }

}

trait WelcomeService {
  def sayHi: String
}

trait GoodbyeService {
  def sayGoodbye: String
}

class EnglishWelcomeGoodbyeService extends WelcomeService with GoodbyeService {
  override def sayHi: String = "Hi"

  override def sayGoodbye: String = "Goodbye"
}

class FrenchWelcomeGoodbyeService extends WelcomeService with GoodbyeService {
  override def sayHi: String = "Salut"

  override def sayGoodbye: String = "Au revoir"
}

trait DispatchedService {}
class FirstDispatchedService extends DispatchedService {}
class SecondDispatchedService extends DispatchedService {}

As shown in the sections above, ServiceLoader is a simple tool that loads service providers through one of 3 methods (load, load with a custom class loader and load from the extension class loader). The service providers are declared in files located in the META-INF/services directory. The second and third sections presented the use of ServiceLoader in two data processing frameworks, Spark and Beam. The last part showed some simple use cases of ServiceLoader through learning tests.