Horizontal scalability in Gnocchi

Versions: Gnocchi 4.2

One of the reasons behind the choice of Gnocchi as time series database to study was its naturally provided horizontal scalability. At the moment of making that choice I was relying only on the official documentation. Now it's a good moment to come back and analyze the horizontal scalability by myself.

This post shows how the horizontal scalability was implemented in Gnocchi. The first part recalls some context for the 2 remaining parts presenting the actors involved in the scaling. The former one presents the coordinator while the latter the partitioner. The post doesn't contain the tests but instead of them the source code snippets are added to prove the telling.

Aggregation layer

The raw measures are processed by the aggregation layer that generates the aggregation results exposed later to the end user. In order to guarantee the freshness of the data this part must be scalable. Nobody wants to see the results generate with hours of delay. It's why Gnocchi comes here with the horizontal scalability support, i.e. we can add as many nodes (metricd service) as we need to generate the data with low latency. And unlike other time-series systems, it's neither a commercial extension nor difficult to achieve.

The data processing in the aggregation layer is distributed: one metricd service can be responsible for at least 1 sack (parallelization unit). And the responsibility can be not exclusive, i.e. different services can be responsible for the same sack because of the replication. It brings automatically the concepts of locking, i.e. situation when only one of these services processes the sack at given moment.

Coordinator

As in a lot of similar distributed architectures, the workload distribution is orchestrated by a coordinator. Every data processor connects to it during the configuration step, just here:

# gnocchi.cli.metricd.MetricProcessBase#_configure
def _configure(self):
    member_id = "%s.%s.%s" % (socket.gethostname(),
                              self.worker_id,
                              # NOTE(jd) Still use a uuid here so we're
                              # sure there's no conflict in case of
                              # crash/restart
                              str(uuid.uuid4()))
    self.coord = get_coordinator_and_start(member_id,
                                            self.conf.coordination_url)

The coordinator is not a part of Gnocchi. It's provided with Tooz library also included in the OpenStack platform. As you can see in the above snippet the coordinator is resolved through the configuration entry. However when it's not provided, it's set by default to the URL of the index storage.

The processing daemons connect to the coordinator to acquire the locks of the processed sacks:

# locking from gnocchi.cli.metricd.MetricProcessor#_run_job
for s in sacks:
    # TODO(gordc): support delay release lock so we don't
    # process a sack right after another process
    lock = self.incoming.get_sack_lock(self.coord, s)
    if not lock.acquire(blocking=False):
        continue

Another responsibility of the coordinator is the group membership management. Each of nodes establishing a connection with the coordinator is the member of a group that it provided during the configuration step:

# gnocchi.cli.metricd.MetricProcessor#_configure
def _configure(self):
    super(MetricProcessor, self)._configure()

    # create fallback in case paritioning fails or assigned no tasks
    self.fallback_tasks = list(
        six.moves.range(self.incoming.NUM_SACKS))
    try:
        self.partitioner = self.coord.join_partitioned_group(
            self.GROUP_ID, partitions=200)

The metric processors informs the coordinator that they're still alive by sending the heartbeats. Fortunately it's done automatically thanks to the start_heart=True parameter defined when the connection is established.

Partitioner

If you looked carefully at the configuration step of MetricProcessor class, you certainly noticed that the join_partitioned_group method returns another object called partitioner. It's used to assign the sacks to the particular nodes, as shown here:

# gnocchi.cli.metricd.MetricProcessor#_get_sacks_to_process
def _get_sacks_to_process(self):
    try:
        self.coord.run_watchers()
        if (not self._tasks or
                self.group_state != self.partitioner.ring.nodes):
            self.group_state = self.partitioner.ring.nodes.copy()
            self._tasks = [
                i for i in six.moves.range(self.incoming.NUM_SACKS)
                if self.partitioner.belongs_to_self(
                    i, replicas=self.conf.metricd.processing_replicas)]

As you can notice, the processor calls run_watchers method before retrieving the sacks (here represented as _tasks field). The run_watchers call guarantees to the partitioner to be up-to-date with all possible changes in the group that could happen from the last processing. It will invoke the appropriate callbacks defined in the tooz.partitioner.Partitioner class:

Objects, and in the case of Gnocchi, sacks distribution is based on the principle of consistent hashing ring, presented more in details in the post about Dynamo paper and consistent hashing. Gnocchi also uses here the implementation provided by Tooz library, tooz.hashring.HashRing.

Obviously it would be counter-effective to reinvent the wheel and define a new coordination and resources sharing mechanism. Initially Gnocchi was developed in the environment of OpenStack and quite naturally it uses the solutions provided by the Tooz library. As we could learn in the first section, the distributed character of the system is mainly visible in the example of the processing layer where one node is responsible for different sacks. All is coordinated by the coordinator implemented in Tooz. Among the main duties of the coordinator we can distinguish locks and group membership management. It's completed by the object called partitioner that knows which objects (sacks in Gnocchi) belongs to which nodes of the group.