Feature stores - Feast example

Recently you've discovered the building blocks of a feature store. This time I would like to demonstrate a feature store in action. I've chosen Feast as a playground use case because it's Open Source, has good working examples, and implements an Apache Spark ingestion job!

Features definition

The first part I'd like to focus on is the features definition. To define a feature with Feast, you have to initialize a FeatureTable and define all feature properties. Among these properties you will find a list of:

Features definition is only a matter of a simple class initialization. The classes interact with Feast components only when they are applied to the Feast's Client instance. You can find an example below where I'm initializing a Player feature based on players.parquet file:

goals = Feature("goals", ValueType.INT64)
assists = Feature("assists", ValueType.INT64)
player_id_entity = Entity(name="player_id", description="Player unique ID", value_type=ValueType.INT64)
players_feature_table = FeatureTable(
    name="players",
    entities=["player_id"],
    features=[goals, assists],
    labels={
        "creation_time": "2021-05-24T08:00",
        "author": "waitingforcode"
    },
    batch_source=FileSource(
        file_format=ParquetFormat(),
        event_timestamp_column="event_time",
        created_timestamp_column="event_time",
        file_url="file:///home/bartosz/workspace/wfc-playground/feature-store-feast/input"
    )
)

client = Client(
    core_url="localhost:6565",
    serving_url="localhost:6566"
)


client.apply(player_id_entity)
client.apply(players_feature_table)

As you can notice, I had to call the client's apply twice. First for the entity of the FeatureTable, and the second time for the FeatureTable itself. Without that, you will encounter an error like Feature Table refers to no existent Entity: (table:players, entity: player_id, project: default). But it's only the definition; the features are not yet indexed in the offline store. To make it happen, we have to ingest them explicitly. We can then have a different feature engineering job relying on it and writing the generated features to the offline store:

client = Client(
    core_url="localhost:6565",
    serving_url="localhost:6566"
)

client.ingest('players',
              source='file:///home/bartosz/workspace/wfc-playground/feature-store-feast/input/players.parquet')

Features can be ingested to Feast store twofold, either as Pandas DataFrame or file paths. I went with the latter strategy because I prepared the dataset with PySpark, and it seemed easier to implement. After this operation, you should see the confirmation log like:

Removing temporary file(s)...
Data has been successfully ingested into FeatureTable batch source.

That's cool but how to use all these features? We can export them with the help of an Apache Spark job managed by the feast-spark.

Apache Spark jobs

So far we've been working with core and serving modules of Feast. To use Apache Spark jobs, we'll need to interact with the jobs service. But it's not the one that will process the data, though. You can set the runtime environment to EMR, Dataproc or k8s with SPARK_LAUNCHER property. The service will then be responsible only for monitoring the final outcome of the job. It will delegate the physical execution to the used runtime. Apache Spark integration comes with feast-spark module and we can use it for various things like loading offline features to the online store or exporting offline features for training.

I will start by exporting the features from the offline store. In my export query, I'm selecting the features (feature_refs), defining the entities for which I want to select the features (entity_source) and finally, I'm setting the location where the job will write the extracted features:

client = Client(
    core_url="localhost:6565",
    serving_url="localhost:6566",
    spark_staging_location="file:///home/bartosz/workspace/wfc-playground/feature-store-feast/staging",
    spark_launcher="standalone",
    # The version we're using here for the
    spark_home="/home/bartosz/learning/apache_spark/envs/spark-3.0.2-bin-hadoop3.2"
)

player = pd.DataFrame(columns=['player_id', 'event_timestamp'])
player['player_id'] = [100, 200, 300, 400, 500, 600]
player['event_timestamp'] = pd.to_datetime(
    ['2021-05-25T00:00:00.000']*6)
historical_feature_retrieval_job = feast_spark.Client(client).get_historical_features(
    feature_refs=['players:assists', 'players:goals'],
    entity_source=player,
    output_location='file:///home/bartosz/workspace/wfc-playground/feature-store-feast/output_features'
)

One important thing to notice here is the player['event_timestamp'] filter. It specifies the max timestamp. Put another way, you will get all features created earlier than this value. If you run a PySpark job on top of the extracted files, you should see the content like:

+---------+-------------------+--------------+----------------+
|player_id|event_timestamp    |players__goals|players__assists|
+---------+-------------------+--------------+----------------+
|100      |2021-05-25 02:00:00|1             |2               |
|200      |2021-05-25 02:00:00|3             |4               |
|400      |2021-05-25 02:00:00|7             |8               |
|300      |2021-05-25 02:00:00|5             |6               |
|600      |2021-05-25 02:00:00|null          |null            |
|500      |2021-05-25 02:00:00|null          |null            |
+---------+-------------------+--------------+----------------+

As you can see, some of the entities don't have features and it's normal because the ingested dataset contained only 4 players (100, 200, 300, 400). Now you can use the features in your training job. But what if we needed these features for a low latency requirement? We must start by synchronizing the offline features with the online store:

players_feature_table = client.get_feature_table('players')
job = feast_spark.Client(client).start_offline_to_online_ingestion(players_feature_table,
                                                                   start=datetime.datetime(2021, 1, 1),
                                                                   end=datetime.datetime(2021, 5, 31))

Now, we can get the online features from the online store. I didn't mention that before but I'm using here the docker-compose provided with Feast and the online store will be Redis:

online_response = client.get_online_features(
    feature_refs=['players:assists', 'players:goals'],
    entity_rows=[{"player_id": 100}]
)
print(online_response.to_dict())

online_response = client.get_online_features(
    feature_refs=['players:assists', 'players:goals'],
    entity_rows=[{"player_id": 600}]
)
print(online_response.to_dict())

On your screen you should see:

{'player_id': [100], 'players:goals': [1], 'players:assists': [2]}
{'players:assists': [None], 'players:goals': [None], 'player_id': [600]}

As you can see, it worked! If you want to watch the code in action, I summarized them in the following demo:

After writing these 2 blog posts about feature stores, I feel that it tries to bring closer data science and data engineering. At first glance, it may look like a yet-another data science SDK, but in the hidden part, it implies some data engineering aspects like orchestration, datasets versioning, distributed data processing or NoSQL data stores.