Streaming Feature Set

A Streaming Feature Set is identical to a Batch Feature Set in terms of its use (retrieving online/offline features), but instead of reading data from a Batch Source, it reads from an infinite Stream Source - For example, Apache Kafka.

The 2 basic building blocks that define a Streaming Feature Set are its Streaming Source (e.g., Kafka) and a Transformation.

See Streaming Sources section for more details regarding the available Streaming Sources.

Streaming Feature Set Creation

To create a streaming feature set in Qwak, follow these steps, which involve defining a feature transformation function and utilizing the @streaming.feature_set decorator along with the specified parameters:

  1. Feature Transformation Function:
    • Begin by crafting a feature transformation function tailored to the desired processing of your raw data.
  2. Decorator Implementation:
    • Apply the @streaming.feature_set decorator to your transformation function, ensuring to include the following parameters:
      • name: If not explicitly defined, the decorated function's name is used. The name field is restricted to alphanumeric and hyphen characters, with a maximum length of 40 characters.
      • key: Specify the key for which to calculate the features in the feature set.
      • data_sources: Provide a list containing the names of relevant data sources that the feature set data will be ingested from. Currently streaming feature sets support only a single data source configuration.
      • timestamp_column_name:The name of the column in the data source that contains timestamp information. This is used to order the data chronologically and ensure that the feature values are updated in the correct order.
      • offline_scheduling_policy: A crontab definition of the the offline ingestion policy - which affects the data freshness the offline store. defaults to */30 * * * * (every 30 minutes)
      • online_trigger_interval: Defines the online ingestion policy - which affects the data freshness of the online store. Defaults to 5 seconds.

These steps ensure the seamless creation of a batch feature set, allowing users to define the transformation logic and specify the essential parameters for efficient feature extraction and processing within the Qwak ecosystem.

Streaming Feature Set Example

from qwak.feature_store.feature_sets import streaming
from qwak.feature_store.feature_sets.transformations import SparkSqlTransformation

@streaming.feature_set(
    name="user-features",
    key="user_id",
    data_sources=["my_kafka_source"],
    timestamp_column_name="date_created",
    offline_scheduling_policy="0 * * * *",
    online_trigger_interval=30
)
def user_features():
    return SparkSqlTransformation(sql="""
        SELECT user_id,
               registration_country,
               registration_device,
               date_created
        FROM my_kafka_source""")

This example:

  • Creates a streaming feature set, with online store freshness of 30 seconds and an hourly offline store freshness
  • Ingests data from the my_kafka_source source.
  • Creates a transformed feature vector with the fields: user_id, registration_country, registration_device
  • Ingests the feature vector into the Qwak Feature Store

Adding Metadata

An optional decorator for defining feature set metadata information of:
owner - User name of feature set owner
description - Describe what does this feature set do
display_name - Alternative feature set name for UI display

from qwak.feature_store.feature_sets import streaming
from qwak.feature_store.feature_sets.transformations import SparkSqlTransformation

@streaming.feature_set(
    name="user-features",
    key="user_id",
    data_sources=["my_kafka_source"],
    timestamp_column_name="date_created",
    offline_scheduling_policy="0 * * * *",
    online_trigger_interval=30
)
@streaming.metadata(
    owner="John Doe",
    display_name="User Aggregation Data",
    description="User origin country and devices"
)
def user_features():
    return SparkSqlTransformation(sql="""
        SELECT user_id,
               registration_country,
               registration_device,
               date_created
        FROM my_kafka_source""")

Specifying Execution Resources

At Qwak, the allocation of resources crucial for streaming execution jobs, often termed as the "cluster template," is vital. This template encompasses resources like CPU, memory, and temporary storage, essential for executing user-defined transformations and facilitating feature ingestion into designated stores.

The default size for the cluster template initiates at MEDIUM if none is explicitly specified.

For streaming feature sets, two different resource specifications are provided:

  • online_cluster_template online feature store ingestion job resources
  • offline_cluster_template offline feature store ingestion job resources
from qwak.feature_store.feature_sets import streaming
from qwak.feature_store.feature_sets.execution_spec import ClusterTemplate
from qwak.feature_store.feature_sets.transformations import SparkSqlTransformation

@streaming.feature_set(
    name="user-features",
    key="user_id",
    data_sources=["my_kafka_source"],
    timestamp_column_name="date_created",
    offline_scheduling_policy="0 * * * *",
    online_trigger_interval=30
)
@streaming.execution_specification(
    online_cluster_template=ClusterTemplate.SMALL,
    offline_cluster_template=ClusterTemplate.MEDIUM,
)
def user_features():
    return SparkSqlTransformation(sql="""
        SELECT user_id,
               registration_country,
               registration_device,
               date_created
        FROM my_kafka_source""")

Transformations

Row-Level transformations that are applied to the data (in a streaming fashion) - these transformations produce the actual features.

SQL Transformations

Row-Level arbitrary SQL, with support for PySpark Pandas UDFs inside SQL Transforms, leveraging Vectorized computation using PyArrow.

from qwak.feature_store.feature_sets import streaming
from qwak.feature_store.feature_sets.transformations import SparkSqlTransformation

@streaming.feature_set(
    name='sample_fs',
    key='user_id',
    data_source=['sample-streaming-source'],
    timestamp_column_name='timestamp'
)
def transform():
    return SparkSqlTransformation(
        sql="select timestamp, user_id, first_name, last_name from sample_source"
    )
)

Or, when using a Pandas UDF:

from qwak.feature_store.feature_sets import streaming
from qwak.feature_store.feature_sets.transformations import SparkSqlTransformation, qwak_pandas_df, Schema, Column

import pandas as pd


@qwak_pandas_udf(output_schema=Schema([
    Column(type=Type.long)]))
def plus_one(column_a: pd.Series) -> pd.Series:
    return column_a + 1

@qwak_pandas_udf(output_schema=Schema([
    Column(type=Type.long)]))
def mul_by_two(column_a: pd.Series) -> pd.Series:
    return column_a * 2

@streaming.feature_set(
    name='sample_fs',
    key='user_id',
    data_source=['sample-streaming-source'],
    timestamp_column_name='timestamp'
)
def transform():
    return SparkSqlTransformation(
        sql="select timestamp, mul_by_two(value) as col1, plus_one(value) as col2 from ds1",
        functions=[plus_one, mul_by_two]
    )
)

Full-DataFrame Pandas UDF Transforms

Just like the Pandas UDFs supported in SQL Transforms, but defined on the entire DataFrame (no need to write any SQL):

from qwak.feature_store.feature_sets import streaming
from qwak.feature_store.feature_sets.transformations import UdfTransformation, qwak_pandas_udf, Schema, Column

import pandas as pd

@qwak_pandas_udf(output_schema=Schema([
    Column(name='rate_mul', type=Type.long),
    Column(name='date_created', type=Type.timestamp),
]))
def func(df: pd.DataFrame) -> pd.DataFrame:
    df = pd.DataFrame({'rate_mul': df['value'] * 1000, 'date_created': df['date_created']})
    return df


@streaming.feature_set(
    name="user-features",
    key="user_id",
    data_sources=["my_kafka_source"],
    timestamp_column_name="date_created",
    offline_scheduling_policy="0 * * * *",
    online_trigger_interval=30
)
def user_features():
    return UdfTransformation(functions=func)

Event-time Aggregations

In addition to row-level operations, event-time aggregations are also supported, with EXACTLY ONCE semantics.
Internally, we employ our highly-optimized proprietary implementation, partially based on open source Apache Spark :tm:.

This implementation is designed to optimize for data freshness, low serving latency and high throughput, while handling multiple long and short, overlapping time windows and out-of-order data (late arrivals) without the intense resource consumption that is often incurred in these cases.

Enabling Aggregations is done by adding them on top of the row-level transform, for example:

sql = "SELECT timestamp, user_id, transaction_amount, is_remote, offset, topic, partition FROM transaction_stream"
SqlTransformation(sql)\
    .aggregate(QwakAggregation.avg("transaction_amount"))\
    .aggregate(QwakAggregation.boolean_or("is_remote"))\
    .aggregate(QwakAggregation.sum("transaction_amount"))\
    .by_windows("1 minute, 1 hour, 3 hour, 1 day, 7 day")

In the above example, we declare that we are interested in the average transaction amount, sum of transaction amounts and whether there was only remote transaction, all computed on 5 time windows, from 1 minute to 7 days.

Let's break this example into pieces:

  1. row-level transform: the regular row-level transform defined for row-level streaming - this can either be a SQL transform (with or without pandas UDFs) or a full-dataframe pandas udf. all aggregations are defined on columns that belong to the output of this transform. all row-level modifications prior to aggregation (string manipulation, currency conversion, boolean conditions etc.)

atm, we also need to select 3 kafka metadata columns - these are internally used by Qwak to guarantee EXACTLY ONCE.

  1. Declarative aggregates: we add each aggregation in a chaining fashion, in the above example we had avg, sum, and boolean_or.
  2. time windows: define the time windows on which we aggregate - in that case we had 3 aggregates and 5 time windows - meaning the resulting Featureset will have 15 features.

We currently support the following aggregates:

  1. SUM - a sum of column, for example, QwakAggregation.sum("transaction_amount")
  2. COUNT - count (not distinct), a column is specified for API uniformity. for example, QwakAggregation.count("transaction_amount")
  3. AVERAGE - mean value, for example QwakAggregation.avg("transaction_amount")
  4. MIN - minimum value, for example QwakAggregation.min("transaction_amount")
  5. MAX - maximum value, for example QwakAggregation.max("transaction_amount")
  6. BOOLEAN OR - boolean or, defined over a boolean column, for example QwakAggregation.boolean_or("is_remote")
  7. BOOLEAN AND - boolean and, defined over a boolean column, for example QwakAggregation.boolean_and("is_remote")
  8. Sample Variance - QwakAggregation.sample_variance("transaction_amount")
  9. Sample STDEV - QwakAggregation.sample_stdev("transaction_amount")
  10. Population Variance - QwakAggregation.population_variance("transaction_amount")
  11. Population STDEV - QwakAggregation.population_stdev("transaction_amount")

In addition, it's also possible to add an Alias - a prefix for the result feature name.
by default, an aggregate results in a feature named <aggregate_name>_<column_name>_<window_size>, for each window defined.
In some cases, it may be desired to a have different prefix rather than <aggregate_name>_<column_name>- in these cases, we simply specify an alias:

SqlTransformation(sql)\
    .aggregate(QwakAggregation.avg("transaction_amount"))\
    .aggregate(QwakAggregation.boolean_or("is_remote").alias("had_remote_transaction"))\
    .by_windows("1 minute, 1 hour")

in the above sample, we've aliased the boolean_or aggregate, so it's now called had_remote_transactions_<window>, where <window> is the time window.
the example below will result in 4 features: avg_transaction_amount_1m, avg_transaction_amount_1h, had_remote_transactions_1m, had_remote_transactions_1h

Event-time Aggregations Backfill

For streaming aggregation featuresets, adding backfill spec will populate historical features values from batch data sources before deploying the actual streaming featureset

The StreamingBackfill parameters are:

  • start_datetime: Datetime to start fetching values from
  • end_datetime: Datetime to end fetching values from

🚧

end_datetime must be divisible by slice size

  • transform: An SQL transformation that select the relevant features from the batch sources,
    and it's output schema must include the raw streaming source schema
  • data_source_specs: List of existing batch data source names to fetch from
  • execution_spec: [optional] resource template for backfill step
@streaming.feature_set(
  key="user_id",
  data_sources=["users_registration_stream"],
  timestamp_column_name="reg_date",
  name="my_backfilled_streaming_agg_fs",
)
@streaming.backfill(
  start_date=start_date=(2022,1,1,0,0,0),
  end_date=start_date=(2023,9,1,0,0,0),
  data_sources=[{"batch_backfill_source_name": BackfillSourceRange(start_date=(2023,1,1,0,0,0),
                                                                   end_date=(2023,8,1,0,0,0))}],
  backfill_cluster_template=ClusterTemplate.XLARGE
  backfill_transformation=SparkSqlTransformation("SELECT user_id, amount, reg_date FROM backfill_data_source")
)
def user_streaming_features():
  return SparkSqlTransformation("SELECT user_id, amount, reg_date FROM data_source")\
							.aggregate(QwakAggregation.avg("amount"))\
							.by_windows("1 minute, 1 hour")

In the example above we are extracting from batch_backfill_source_name (which is an existing batch data source), the user_id and amount features and
rename them to match the desired feature name of the raw stream source column names
The data will be between 1/1/2020 and 1/9/2022

If it is needed to specify specific datetime filter for each batch source (i.e selecting from different sub start and end time for each source), we need to pass BackfillBatchDataSourceSpec:

πŸ“˜

Filtering per data source is optional,
keep in mind that the general start and end time filter set for the backfill will be lower and upper limits for any sub specific backfill source filter

data_source_specs=[BackfillBatchDataSourceSpec(data_source_name="sample-batch-source",
                                               start_datetime=datetime(2021, 1, 1),
                                               end_datetime=datetime(2021, 3, 1))],