Carbonara storage format

on waitingforcode.com

Carbonara storage format

Even though carbonara is mostly known as an Italian pasta dish, in the context of Gnocchi it means completely different thing. Carbonara is the name of time points storage format in Gnocchi.

This post tries to explain the characteristics of the Carbonara in 4 sections. The first one focuses on its main concepts. The next 3 are dedicated to the data representation formats involved in the processing lifecycle: BoundTimeSerie, GroupedTimeSerie and finally AggregatedTimeSerie.

Carbonara internals

Gnocchi can store time series points in different storage engines: in the files (local or distributed file system) or in Redis. The data is stored as bytes so the underlying storage is transparent for the system. Among the stored types we can distinguish: raw measures and aggregation results. The former ones represent the new measures that need to be processed. The latter ones concern the result of the processing.

The raw measures are represented as an array composed of the tuples representing: timestamp and registered value. These values are persisted in plain (uncompressed) format, simply as serialized bytes. The serialization happens in gnocchi.incoming.StorageDriver#_encode_measures(self, measures) method. Its result is later persisted to the storage used in the incoming layer.

More complicated process has place for the aggregated time series that are constructed after the processing of the raw measures by the aggregation engine (metricd). The most noticeable difference is the compression. The aggregation results are compressed with LZ4 that, according to the release notes of the version 3, helped to reduce the storage requirements by 50 points. And who tells the compression, he automatically tells columnar format which often performs better when the consecutive sets of data share the same characteristics (= compressing 1 type of data is often more efficient than mixed one, you can learn more in the post about compression in Parquet). In Gnocchi the processed measures are persisted as 2 distinct arrays. The first contains the timestamps (or booleans, we'll see that in the part about AggregatedTimeSerie) and the second aggregated measures.

Under-the-hood, the processed time series are represented by the children classes of TimeSerie. The parent provides mainly getters and properties such as: first or last entry among the stored points, values or timestamps that are included in the field called ts. This class also define the compression method (LZ4) and the factory method to create an instance of GroupedTimeSerie, explained in further in the post.

BoundTimeSerie

When the raw measures are processed, they're first converted back from the bytes format to the Numpy's array. The first processing step consists on retrieving the unaggregated time series that are represented by the instance of BoundTimeSerie class. Its name fully illustrates the responsibility of this class: keeping a time serie limited in size. The limits of stored points are controlled by 2 properties of the class:

  • block_size - defines how many points will be stored in given BoundTimeSerie and later persisted to the storage. It's expressed as a time unit, e.g. block_size of 1 second for the measures gathered at 1 second interval will only store 1 measure point in the BoundTimeSerie. The tests below assert this behavior:
    def should_keep_only_1_second_of_data_because_of_the_block_size(self):
      # the method, together with serialize/unserialize is mostly used when new measures are processed.
      # In this situation the already stored measures (unaggregated) are unserialized and the new
      # measures are serialized to make them available for the further processing
      bounded_timeserie = carbonara.BoundTimeSerie.from_data(
          [datetime64(2018, 1, 1, 10, 0, 0), datetime64(2018, 1, 1, 10, 0, 1),
            datetime64(2018, 1, 1, 10, 0, 2), datetime64(2018, 1, 1, 10, 0, 3),
            datetime64(2018, 1, 1, 10, 0, 4), datetime64(2018, 1, 1, 10, 0, 5)],
          [0, 1, 2, 3, 4, 5],
          block_size=numpy.timedelta64(1, 's'))
      # at this moment the BoundTimeSerie stores all 6 points
      self.assertEqual(len(bounded_timeserie), 6)
      # but now, when new values are set the block_size is involved and keeps only 1 second of data
      bounded_timeserie.set_values(numpy.array([(datetime64(2018, 1, 1, 10, 0, 6), 6),
                                                (datetime64(2018, 1, 1, 10, 0, 7), 7)],
                                                dtype=carbonara.TIMESERIES_ARRAY_DTYPE))
      self.assertEqual(len(bounded_timeserie), 1)
      self.assertEqual(bounded_timeserie.timestamps[0], datetime64(2018, 1, 1, 10, 0, 7))
    
    def should_keep_all_points_because_of_a_large_block_size(self):
      # Unlike the previous test case, here the block_size is big enough
      # to keep all points of given time serie
      bounded_timeserie = carbonara.BoundTimeSerie.from_data(
          [datetime64(2018, 1, 1, 10, 0, 0), datetime64(2018, 1, 1, 10, 0, 1),
            datetime64(2018, 1, 1, 10, 0, 2), datetime64(2018, 1, 1, 10, 0, 3),
            datetime64(2018, 1, 1, 10, 0, 4), datetime64(2018, 1, 1, 10, 0, 5)],
          [0, 1, 2, 3, 4, 5],
          block_size=numpy.timedelta64(20, 'm'))
      self.assertEqual(len(bounded_timeserie), 6)
      bounded_timeserie.set_values(numpy.array([(datetime64(2018, 1, 1, 10, 0, 6), 6),
                                                (datetime64(2018, 1, 1, 10, 0, 7), 7)],
                                                dtype=carbonara.TIMESERIES_ARRAY_DTYPE))
      self.assertEqual(len(bounded_timeserie), 8)
      
  • back_window - the behavior of block_size is influenced by the value defined for back_window. This property determines if BoundTimeSerie will store points prior to the first stored timestamp. For instance if the first stored timestamp is 10:06:00 and the back_window is of 60 seconds, the file will be able to store the data from 10:05:00. The following tests show this case:
    def should_keep_1_second_point_when_back_window_is_specified(self):
      bounded_timeserie = carbonara.BoundTimeSerie.from_data(
        [datetime64(2018, 1, 1, 10, 0, 0), datetime64(2018, 1, 1, 10, 0, 1),
          datetime64(2018, 1, 1, 10, 0, 2), datetime64(2018, 1, 1, 10, 0, 3),
          datetime64(2018, 1, 1, 10, 0, 4), datetime64(2018, 1, 1, 10, 0, 5)],
        [0, 1, 2, 3, 4, 5],
        block_size=numpy.timedelta64(1, 's'), back_window=1)
      # at this moment the BoundTimeSerie stores all 6 points
      self.assertEqual(len(bounded_timeserie), 6)
      # but now, when new values are set the block_size is involved and keeps only 1 second of data
      # + 1 second of past data
      bounded_timeserie.set_values(numpy.array([(datetime64(2018, 1, 1, 10, 0, 6), 6)],
                                                dtype=carbonara.TIMESERIES_ARRAY_DTYPE))
      self.assertEqual(len(bounded_timeserie), 2)
      expected_timestamps = numpy.array([datetime64(2018, 1, 1, 10, 0, 5),
                                        datetime64(2018, 1, 1, 10, 0, 6)], dtype='
      

During the processing the BoundTimeSerie with new measures is persisted as an unaggregated time serie. Thanks to that it's available for the next aggregation step.

GroupedTimeSerie

However the aggregation step described in the previous section is not so linear because meantime it converts BoundTimeSerie to other types of objects. The first such intermediary object is GroupedTimeSerie. Unlike BoundTimeSerie it's not a children of TimeSerie class and it's not persisted by the StorageDriver. Instead it's only an in-memory representation of the points used to compute the final aggregation.

The grouped time serie implements the logic of time window construction. The points, logically separated in BoundTimeSerie instance, are merged to the timespan according to defined granularity level. After grouping separate points to the timespans it's possible to apply aggregation methods defined in GroupedTimeSerie:

def should_group_bound_time_series_to_groups_respecting_defined_timespan(self):
  bounded_timeserie = carbonara.BoundTimeSerie.from_data(
    [datetime64(2018, 1, 1, 10, 0, 0), datetime64(2018, 1, 1, 10, 0, 1),
      datetime64(2018, 1, 1, 10, 0, 2), datetime64(2018, 1, 1, 10, 0, 6),
      datetime64(2018, 1, 1, 10, 0, 40), datetime64(2018, 1, 1, 10, 0, 59)],
    [0, 1, 2, 3, 4, 5])

  grouped_timeserie = bounded_timeserie.group_serie(granularity=numpy.timedelta64(3, 's'),
                                                    start=None)

  self.assertEqual(len(grouped_timeserie.tstamps), 4)
  expected_timespans = [datetime64(2018, 1, 1, 10, 0, 0), datetime64(2018, 1, 1, 10, 0, 6),
                        datetime64(2018, 1, 1, 10, 0, 39), datetime64(2018, 1, 1, 10, 0, 57)]
  self.assertIn(expected_timespans, grouped_timeserie.tstamps)

def should_compute_aggregates_on_grouped_time_serie(self):
  bounded_timeserie = carbonara.BoundTimeSerie.from_data(
    [datetime64(2018, 1, 1, 10, 0, 0), datetime64(2018, 1, 1, 10, 0, 1),
      datetime64(2018, 1, 1, 10, 0, 2), datetime64(2018, 1, 1, 10, 0, 6),
      datetime64(2018, 1, 1, 10, 0, 40), datetime64(2018, 1, 1, 10, 0, 59)],
    [100, 101, 102, 200, 300, 400])

  grouped_timeserie = bounded_timeserie.group_serie(numpy.timedelta64(3, 's'))
  sums = grouped_timeserie.sum()

  self.assertEqual(len(sums['values']), 4)
  self.assertIn([303, 200, 300, 400], sums['values'])

The results generated by the aggregation methods are later passed to the AggregatedTimeSerie object, detailed in the next section.

AggregatedTimeSerie

In the next step, for each granularity and aggregation method, Gnocchi uses GroupedTimeSerie to build an instance of AggregatedTimeSerie through gnocchi.carbonara.AggregatedTimeSerie#from_grouped_serie(grouped_serie, sampling, aggregation_method) factory method. It invokes another one, gnocchi.carbonara.AggregatedTimeSerie#_resample_grouped(grouped_serie, agg_name, q=None) that in its turn executes the aggregation method on given group of points and contructs new time series, exactly as shown in the following test case:

def should_build_aggregated_timeserie_from_grouped_timeserie(self):
  bounded_timeserie = carbonara.BoundTimeSerie.from_data(
    [datetime64(2018, 1, 1, 10, 0, 0), datetime64(2018, 1, 1, 10, 0, 1),
      datetime64(2018, 1, 1, 10, 0, 2), datetime64(2018, 1, 1, 10, 0, 6),
      datetime64(2018, 1, 1, 10, 0, 40), datetime64(2018, 1, 1, 10, 0, 59)],
    [1, 1, 1, 10, 20, 30])
  granularity = numpy.timedelta64(3, 's')
  grouped_timeserie = bounded_timeserie.group_serie(granularity)

  aggregated_timeserie = carbonara.AggregatedTimeSerie.from_grouped_serie(grouped_timeserie,
                                                                        granularity, 'sum')
  self.assertIn([3, 10, 20, 30], aggregated_timeserie.values)
  expected_timespans = [datetime64(2018, 1, 1, 10, 0, 0), datetime64(2018, 1, 1, 10, 0, 6),
                        datetime64(2018, 1, 1, 10, 0, 39), datetime64(2018, 1, 1, 10, 0, 57)]
  self.assertIn(expected_timespans, grouped_timeserie.tstamps)

def should_build_aggregated_timeserie_and_align_the_granularity_level(self):
  bounded_timeserie = carbonara.BoundTimeSerie.from_data(
    [datetime64(2018, 1, 1, 10, 0, 0), datetime64(2018, 1, 1, 10, 0, 1),
      datetime64(2018, 1, 1, 10, 0, 2), datetime64(2018, 1, 1, 10, 0, 6),
      datetime64(2018, 1, 1, 10, 0, 40), datetime64(2018, 1, 1, 10, 0, 59)],
    [1, 1, 1, 10, 20, 30])
  granularity_grouped = numpy.timedelta64(3, 's')
  grouped_timeserie = bounded_timeserie.group_serie(granularity_grouped)

  granularity_aggregated = numpy.timedelta64(30, 's')
  aggregated_timeserie = carbonara.AggregatedTimeSerie.from_grouped_serie(grouped_timeserie,
                                                                          granularity_aggregated, 'sum')
  # At this stage the different granularity doesn't have any impact
  # The granularity is passed only to the constructor and only when
  # resample method is called we can receive the aggregates matching the new granularity
  self.assertIn([3, 10, 20, 30], aggregated_timeserie.values)
  aggregated_timeserie = aggregated_timeserie.resample(granularity_aggregated)
  self.assertIn([13, 50], aggregated_timeserie.values)

The granularity is an important property when the aggregations are persisted to the storage. Together with a property called POINTS_PER_SPLIT it defines how many points will be stored in a single aggregated time serie split. The whole AggregatedTimeSerie is divided in chunks in its split() method which results are returned to the StorageDriver and persisted to the underlying storage mechanism:

# gnocchi.storage.StorageDriver#_add_measures
for key, split in ts.split():
    if oldest_key_to_keep is None or key >= oldest_key_to_keep:
        LOG.debug(
            "Storing split %s (%s) for metric %s",
            key, aggregation, metric)
        self._store_timeserie_split(
            metric, key, split, aggregation, oldest_mutable_timestamp,
            oldest_point_to_keep)

The idea of splitting the aggregation results is summarized in the following picture:

Each split is nonetheless a smaller instance of AggregatedTimeSerie that is later serialized and persisted to the used storage. As in the case of BoundTimeSerie, AggregatedTimeSerie writes 2 distinct arrays: timestamps and values. But it's true when the data is compressed. The uncompressed data is represented in two arrays but with different types. The first array stores boolean flags telling if the measure was taken for given time range. The second array, as in the case of compressed version, stores the values for each flag. The number of items in each array is equal to the POINTS_PER_SPLIT parameter. Let's see now some tests showing that:

def should_split_aggregation_on_multiple_chunks(self):
  bounded_timeserie = carbonara.BoundTimeSerie.from_data(
      [datetime64(2018, 1, 1, 10, 0, 0), datetime64(2018, 1, 1, 10, 0, 1),
        datetime64(2018, 1, 1, 10, 0, 2), datetime64(2018, 1, 1, 10, 0, 6),
        datetime64(2018, 1, 1, 10, 0, 40), datetime64(2018, 1, 1, 10, 0, 59)],
      [1, 1, 1, 10, 20, 30])
  granularity = numpy.timedelta64(3, 's')
  grouped_timeserie = bounded_timeserie.group_serie(granularity)

  aggregated_timeserie = carbonara.AggregatedTimeSerie.from_grouped_serie(grouped_timeserie,
                                                                          granularity, 'sum')
  # We define here that each aggregated chunk will contain 10 points. Since the
  # granularity is 3 seconds, the chunk will store 30 seconds of data
  original_split = carbonara.SplitKey.POINTS_PER_SPLIT
  carbonara.SplitKey.POINTS_PER_SPLIT = 10
  try:
      chunks = list(aggregated_timeserie.split())
      self.assertEqual(2, len(chunks))
      chunk_1 = chunks[0]
      self.assertEqual(datetime64(2018, 1, 1, 10, 0, 0), chunk_1[0].key)
      self.assertIn([3, 10], chunk_1[1].values)
      self.assertIn([datetime64(2018, 1, 1, 10, 0, 0), datetime64(2018, 1, 1, 10, 0, 6)],
                    chunk_1[1].timestamps)
      chunk_2 = chunks[1]
      self.assertEqual(datetime64(2018, 1, 1, 10, 0, 30), chunk_2[0].key)
      self.assertIn([20, 30], chunk_2[1].values)
      self.assertIn([datetime64(2018, 1, 1, 10, 0, 39), datetime64(2018, 1, 1, 10, 0, 57)],
                    chunk_2[1].timestamps)
  finally:
      carbonara.SplitKey.POINTS_PER_SPLIT = original_split

def should_show_the_serialized_format_for_uncompressed_data(self):
  bounded_timeserie = carbonara.BoundTimeSerie.from_data(
      [datetime64(2018, 1, 1, 10, 0, 0), datetime64(2018, 1, 1, 10, 0, 1),
        datetime64(2018, 1, 1, 10, 0, 2), datetime64(2018, 1, 1, 10, 0, 6),
        datetime64(2018, 1, 1, 10, 0, 40), datetime64(2018, 1, 1, 10, 0, 59)],
      [1, 1, 1, 10, 20, 30])
  granularity = numpy.timedelta64(20, 's')
  grouped_timeserie = bounded_timeserie.group_serie(granularity)

  aggregated_timeserie = carbonara.AggregatedTimeSerie.from_grouped_serie(grouped_timeserie,
                                                                          granularity, 'sum')
  offset, serialized_timeserie = aggregated_timeserie.serialize(
      carbonara.SplitKey(datetime64(2018, 1, 1, 10, 0, 0), granularity), False)
  unserialized_serialization_format = numpy.frombuffer(serialized_timeserie, dtype=[('b', '<?'), ('v', '<d')])

  self.assertEqual(3, len(unserialized_serialization_format))
  self.assertIn([True, False, True], unserialized_serialization_format['b'])
  self.assertIn([13, 0, 50], unserialized_serialization_format['v'])

The data is Gnocchi is stored thanks to the different representations of time series. The most basic one consists on storing the raw measures as received. This storage differs from the one of the aggregated time series. The latter one is stored compressed (LZ4) and also with the columnar-like format where the array with timestamps (or boolean flags) is followed by the array of values.

Read also about Carbonara storage format here: Timeseries storage and data compression , Gnocchi 4 Introspective .

Share, like or comment this post on Twitter: