Sacks - data parallelization unit in Gnocchi

Versions: Gnocchi 4.2

To facilitate parallel processing, Apache Spark and Apache Kafka have the concept of partitions, Apache Beam works with bundles, and Gnocchi deals with sacks. Despite the different name, sacks play the same role in Gnocchi as partitions do in Spark or Kafka: they are the unit of work parallelization.

This post focuses on sacks, another important concept in Gnocchi. The first section explains how they're generated. The second presents how they are read to prepare the time series aggregations. The final section covers their configuration.

Sacks generation

To facilitate and parallelize data processing, the incoming data is put into different sacks. New measures are stored by one of these methods: gnocchi.incoming.IncomingDriver#_store_new_measures(metric_id, data) or gnocchi.incoming.IncomingDriver#add_measures_batch(metrics_and_measures). Normally the latter calls the former under the hood, except for the Redis and Ceph storages, which implement it independently. Either way, they all use sacks. For instance, if the backend is a local filesystem, the sacks are paths in the working directory, and Gnocchi creates inside each of them one directory per metric to hold that metric's measures, as shown in the following tests:

def should_build_correct_sack_name(self):
  file_storage = incoming.file.FileStorage(Conf())

  sack_path = file_storage._sack_path(7)

  # as you can see, the sack path is a concatenation of:
  # "incoming" + number of sacks (defined in the gnocchi-config file under "sacks" key) + sack number
  self.assertEqual('/incoming30-7', sack_path)

def should_store_new_measures_in_appropriate_metric_sack_in_different_files(self):
  sacks_path = '/tmp/incoming30-4/00000000-0000-0000-0000-000000000004'
  try:
      os.makedirs(sacks_path)
  except OSError as exc:
      if exc.errno == errno.EEXIST and os.path.isdir(sacks_path):
          shutil.rmtree(sacks_path)

  file_storage = incoming.file.FileStorage(Conf())
  file_storage.basepath_tmp = '/tmp/'

  for i in range(0, 5):
      file_storage.add_measures(uuid.UUID(int=4), [])
      time.sleep(5)

  # As you can see, at least for local filesystem storage, new measures are
  # added as separate files to the sacks directory
  # Overall, the directory looks like this:
  # bartosz:/tmp$ tree incoming30-4/00000000-0000-0000-0000-000000000004/
  #    incoming30-4/00000000-0000-0000-0000-000000000004/
  #    ├── 449d66be-d6c5-485d-97f1-4cc2311d918f_20180425_13:19:02
  #    ├── 8304a8e1-c1bd-44f2-9f82-66872c52b61e_20180425_13:19:07
  #    ├── 86ac1386-2998-4c7e-8185-965a972d7fd3_20180425_13:18:47
  #    ├── aa74f624-dbc5-438f-a9bd-f32af4574c55_20180425_13:18:52
  #    └── d1bb81b4-7c23-4270-ba3e-aecad3724306_20180425_13:18:57
  # 0 directories, 5 files

  measures = [measure_file for measure_file in listdir(sacks_path)
              if isfile(join(sacks_path, measure_file))]
  self.assertEqual(5, len(measures))

As the tests above show, new measures are written directly to new timestamped files. The sack number for a given metric is computed in gnocchi.incoming.IncomingDriver#sack_for_metric(metric_id) and is independent of the underlying storage:

def sack_for_metric(self, metric_id):
    return metric_id.int % self.NUM_SACKS
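
The modulo rule above can be tried outside Gnocchi with a few lines of plain Python. This is only a sketch: the standalone function and the NUM_SACKS value of 30 (taken from the "incoming30-N" names used in the tests) are assumptions made for illustration, not the driver's code.

```python
import uuid
from collections import Counter

NUM_SACKS = 30  # assumed value, matching the "incoming30-N" names above


def sack_for_metric(metric_id, num_sacks=NUM_SACKS):
    """Return the sack number of a metric: its UUID as an integer, modulo the sack count."""
    return metric_id.int % num_sacks


# The metric used in the test above, UUID(int=4), lands in sack number 4.
print(sack_for_metric(uuid.UUID(int=4)))

# Count how many randomly generated metric ids fall into each sack.
counts = Counter(sack_for_metric(uuid.uuid4()) for _ in range(3000))
print(sorted(counts.items()))
```

Since metric ids are random UUIDs, the counts printed at the end should be of a similar size for every sack, which is what makes the sack a reasonable unit for sharing work.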

The other storage engines behave similarly. Since that's not always easy to demonstrate through tests, let's verify it by reading the source code, which for Redis looks like this:

def add_measures_batch(self, metrics_and_measures):
  notified_sacks = set()
  pipe = self._client.pipeline(transaction=False)
  for metric_id, measures in six.iteritems(metrics_and_measures):
      sack_name = self.get_sack_name(self.sack_for_metric(metric_id))
      path = self._build_measure_path_with_sack(metric_id, sack_name)
      pipe.rpush(path, self._encode_measures(measures))
      if self.greedy and sack_name not in notified_sacks:
          # value has no meaning, we just use this for notification
          pipe.setnx(sack_name, 1)
          notified_sacks.add(sack_name)
  pipe.execute()

As you can see, the concept of sack and metric path is also present here. The difference is that this path is used as a key in the Redis store and new measures are appended to the list stored under this key.
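
To make the append-to-a-list behavior concrete, here is a minimal in-memory sketch where a dictionary of lists stands in for Redis. The key layout built by measure_key is hypothetical (the real driver may join the sack name and the metric id differently), and NUM_SACKS = 30 is assumed as in the filesystem example.

```python
import uuid
from collections import defaultdict

NUM_SACKS = 30  # assumed, as in the file storage example


def sack_name(metric_id):
    # Same "incoming<total>-<number>" naming as in the file driver test.
    return "incoming%d-%d" % (NUM_SACKS, metric_id.int % NUM_SACKS)


def measure_key(metric_id):
    # Hypothetical key layout, for illustration only.
    return "%s_%s" % (sack_name(metric_id), metric_id)


store = defaultdict(list)  # plays the role of the Redis lists


def add_measures_batch(metrics_and_measures):
    for metric_id, measures in metrics_and_measures.items():
        # Equivalent of RPUSH: append the batch at the tail of the list.
        store[measure_key(metric_id)].append(measures)


metric = uuid.UUID(int=4)
add_measures_batch({metric: [(1524662327, 1.0)]})
add_measures_batch({metric: [(1524662332, 2.0)]})
print(measure_key(metric))
print(store[measure_key(metric)])
```

Each call adds one more entry under the same key, so a later reader can pick up every pending batch of a metric with a single lookup.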

Sacks in aggregations computation

The sacks are processed by the aggregation daemons called metricd. Each of them runs on a different node and they all belong to the same group in the cluster. Thanks to this membership, they can share the responsibility for the different sacks (the details are explained in the post about Horizontal scalability in Gnocchi). Each aggregation daemon knows the numbers of the sacks assigned to it.

During processing, the first thing a daemon does is discover which metrics need to be processed. To do so, it queries the location given by the sack path presented in the previous section. Let's continue with our local filesystem example, where this discovery is made through these 2 methods:

def list_metric_with_measures_to_process(self, sack):
  return set(self._list_target(self._sack_path(sack)))

@staticmethod
def _list_target(target):
  try:
    return os.listdir(target)
  except OSError as e:
    # Some other process treated this one, then do nothing
    if e.errno == errno.ENOENT:
      return []
    raise

# This method reads the new measures, yields them, and then deletes the files it read
@contextlib.contextmanager
def process_measure_for_metric(self, metric_id):
  files = self._list_measures_container_for_metric(metric_id)
  measures = self._make_measures_array() # creates numpy.array([], dtype=TIMESERIES_ARRAY_DTYPE)
  for f in files:
    abspath = self._build_measure_path(metric_id, f)
    with open(abspath, "rb") as e:
        measures = numpy.concatenate((
            measures, self._unserialize_measures(f, e.read())))

  yield measures

  self._delete_measures_files_for_metric(metric_id, files)

The code follows a classic pattern: the measures to process are read from the metric's directory, unserialized, appended to the measures array and handed to metricd for processing. Once the processing ends, all the files that were read are deleted. One potential problem is the ordering of the measures: nothing guarantees that the measure files, named with a random UUID followed by the generation datetime, are listed in chronological order. It isn't a problem at this stage, though, because the method computing the aggregates sorts the returned measures itself:

# gnocchi.storage.StorageDriver#_compute_and_store_timeseries
def _compute_and_store_timeseries(self, metric, measures):
  # NOTE(mnaser): The metric could have been handled by
  #               another worker, ignore if no measures.
  if len(measures) == 0:
      LOG.debug("Skipping %s (already processed)", metric)
      return

  measures = numpy.sort(measures, order='timestamps')

  agg_methods = list(metric.archive_policy.aggregation_methods)
  block_size = metric.archive_policy.max_block_size
  back_window = metric.archive_policy.back_window
  # ...

Thus, for each processing job, Gnocchi's aggregation daemon retrieves all newly registered measures for a given metric, concatenates them into a single array and returns it to the method responsible for generating the final aggregations.
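
The read, yield and delete cycle described above can be reproduced with the standard library alone. The sketch below is an illustration under several assumptions: the process_measures function, the JSON file format and the file names are invented for the example, whereas Gnocchi uses its own binary serialization and numpy arrays.

```python
import contextlib
import json
import os
import tempfile


@contextlib.contextmanager
def process_measures(metric_dir):
    """Yield all pending measures of one metric, then delete the files read."""
    files = os.listdir(metric_dir)
    measures = []
    for name in files:
        with open(os.path.join(metric_dir, name)) as f:
            measures.extend(tuple(m) for m in json.load(f))
    yield measures
    # Reached only if the caller's block finished without raising.
    for name in files:
        os.unlink(os.path.join(metric_dir, name))


metric_dir = tempfile.mkdtemp()
# Two measure files whose content is not in chronological order.
for name, batch in [("b", [[30, 3.0]]), ("a", [[20, 2.0], [10, 1.0]])]:
    with open(os.path.join(metric_dir, name), "w") as f:
        json.dump(batch, f)

with process_measures(metric_dir) as pending:
    # Sorting is the consumer's job, as in _compute_and_store_timeseries.
    ordered = sorted(pending, key=lambda m: m[0])

print(ordered)
print(os.listdir(metric_dir))
```

Because the deletion happens after the yield and without a try/finally, an exception raised while the measures are being processed leaves the files in place, so they can be picked up again by a later run.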

Configuration

Gnocchi's documentation gives some guidance about sacks. First of all, it provides a formula to compute the initial number of sacks:

sacks = number of active metrics / 300

In the above formula we can replace 300, depending on the case, by 500 (if we don't expect to add new metrics) or by 100 (if we've just started and the number of active metrics is expected to grow). This formula also raises another question: how to change the number of sacks on an already running system? Unfortunately, it can't be done without downtime. The operation consists of calling the gnocchi-change-sack-size $number_of_sacks command, which executes only when there are no remaining measures to process. Thus, we need to stop the input services and wait until metricd finishes its work before making the change. After that we can restart all services with the modified number of sacks.

To reduce the downtime, an alternative exists. It consists of applying gnocchi-change-sack-size to a new storage. After this, we must make all input services send their measures to the new storage. Finally, once all metricd daemons have finished processing the data from the old storage, we switch them as well to consume measures from the new location.
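
The sizing rule given at the beginning of this section is simple enough to wrap in a small helper. The function below is only a sketch: its name, the profile labels, the rounding up and the minimum of one sack are assumptions added for the example; only the divisors 100, 300 and 500 come from the text.

```python
import math

# Divisors quoted in this section: 100 when the number of metrics is expected
# to grow, 300 by default, 500 when no new metrics are expected.
DIVISORS = {"growing": 100, "default": 300, "stable": 500}


def suggested_sacks(active_metrics, profile="default"):
    """Return a starting number of sacks for the given number of active metrics."""
    # Rounding up and the floor of 1 sack are assumptions of this sketch.
    return max(1, math.ceil(active_metrics / DIVISORS[profile]))


for profile in DIVISORS:
    print(profile, suggested_sacks(9000, profile))
```

For 9,000 active metrics this gives 90, 30 and 18 sacks respectively. The resulting value is what would then be passed to gnocchi-change-sack-size if the number of sacks has to be adjusted later.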

Gnocchi uses the concept of sacks to represent the parallelization unit. As shown through the tests and the source code above, the concept is similar for all storage engines: each of them builds a unique path for every sack and stores under it the associated metrics with all their new measures to process. The recommended number of sacks depends strongly on the number of active metrics, which helps to avoid an uneven distribution of work.