Read Policies

What is a Read Policy?

A Read Policy determines how feature set data is ingested from a data source. It's important to pay attention to the chosen Read Policy when defining a new data source.

This document describes the various read policies available in Qwak.

🚧

Default Read Policy

The default read policy is NewOnly; it applies whenever no read policy is set.
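
A minimal sketch of relying on the default (the source and column names are illustrative, and this assumes data sources may also be passed as a plain list of names):

import qwak.feature_store.feature_sets.batch as batch
from qwak.feature_store.feature_sets.transformations import SparkSqlTransformation

@batch.feature_set(
    name="default-read-policy-example",
    entity="user",
    data_sources=["snowflake"],  # no explicit read policy --> NewOnly is applied
    timestamp_column_name="date_created"
)
def user_features():
    return SparkSqlTransformation(sql="""
        SELECT user_id,
               date_created
        FROM snowflake_users_table
    """)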


New Only

Each batch execution reads only the newly added records: those added between the last batch execution time and the current job execution time.

The execution window is evaluated against the timestamp in the timestamp_column_name column, which represents the configured event time.


import qwak.feature_store.feature_sets.read_policies as ReadPolicy
import qwak.feature_store.feature_sets.batch as batch
from qwak.feature_store.feature_sets.transformations import SparkSqlTransformation

@batch.feature_set(
    name="aggregation-by-time-v1",
    entity="user",
    data_sources={"snowflake": ReadPolicy.NewOnly()},
    timestamp_column_name="date_created" # --> must be included in transformation output
)
def user_features():
    return SparkSqlTransformation(sql="""
        SELECT user_id,
               registration_country,
               registration_device,
               date_created
        FROM snowflake_users_table
    """)

Full Read

Each batch reads all the records in the data source up until the job execution time.

Use Cases

  1. Performing a "snapshot" of data source. Each batch job will read all the data until the execution time

    import qwak.feature_store.feature_sets.read_policies as ReadPolicy
    import qwak.feature_store.feature_sets.batch as batch
    
    @batch.feature_set(
        name="aggregation-by-time",
        entity="user_id",
        data_sources={"full_read_source": ReadPolicy.FullRead()},
        timestamp_column_name="processing_time"  # --> must be included in transformation output using qwak timestamp
    )
    
  2. Joining a dimension table of user information with another data source

    import qwak.feature_store.feature_sets.read_policies as ReadPolicy
    import qwak.feature_store.feature_sets.batch as batch
    
    @batch.feature_set(
        name="aggregation-by-time",
        entity="transaction",
        data_sources={"snowflake": ReadPolicy.NewOnly(),
                      "full_read_source": ReadPolicy.FullRead()},
        timestamp_column_name="processing_time"  # --> must be included in transformation output using qwak timestamp
    )
    
  3. Computing lifetime aggregates (e.g., the total sum of payments a user has ever made).
    With this policy, every entity that had at least one record since the last scan has all of its records retrieved. For example, in a payments use case where the entities are users, each user with at least one payment since the last scan has ALL of their payments retrieved (see the sketch after this list).

    import qwak.feature_store.feature_sets.read_policies as ReadPolicy
    import qwak.feature_store.feature_sets.batch as batch
    
    @batch.feature_set(
        name="aggregation-by-time",
        entity="user_id",
        data_sources={
            # flavor referenced from the read_policies module imported above
            "full_read_source": ReadPolicy.FullRead(flavor=ReadPolicy.PopulationTimeframe())
        },
        timestamp_column_name="processing_time"  # --> must be included in transformation output using qwak timestamp
    )
    
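To make use case 3 concrete, here is a minimal sketch of a lifetime-aggregate transformation. The feature set name, the amount column, and the SQL are illustrative assumptions; the qwak_ingestion_end_timestamp parameter used to populate the timestamp column is described in the next section.

import qwak.feature_store.feature_sets.read_policies as ReadPolicy
import qwak.feature_store.feature_sets.batch as batch
from qwak.feature_store.feature_sets.transformations import SparkSqlTransformation

@batch.feature_set(
    name="user-lifetime-payments",
    entity="user_id",
    data_sources={"full_read_source": ReadPolicy.FullRead()},
    timestamp_column_name="processing_time"
)
def user_lifetime_payments():
    # FullRead retrieves ALL records for each entity seen since the last scan,
    # so SUM(amount) is a lifetime total rather than an increment.
    return SparkSqlTransformation(sql="""
        SELECT user_id,
               SUM(amount) AS lifetime_payment_total,
               to_timestamp(${qwak_ingestion_end_timestamp}) AS processing_time
        FROM full_read_source
        GROUP BY user_id
    """)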

🚧

Past Timestamps

Batch feature sets cannot insert rows with an older timestamp than the current oldest timestamp in the offline feature store.

Each batch must produce timestamps equal to or later than those of the previous batch.


To use the FullRead policy while still satisfying this timestamp requirement, Qwak feature set transformations support the following parameters:

  1. qwak_ingestion_start_timestamp
  2. qwak_ingestion_end_timestamp

These parameters may be used to define timestamp columns as follows:

  1. Using Spark SQL transformation

    import qwak.feature_store.feature_sets.read_policies as ReadPolicy
    import qwak.feature_store.feature_sets.batch as batch
    from qwak.feature_store.feature_sets.transformations import SparkSqlTransformation
    
    @batch.feature_set(
        name="aggregation-by-time",
        entity="user_id",
        data_sources={"full_read_source": ReadPolicy.FullRead()},
        timestamp_column_name="processing_time"  # populated from qwak_ingestion_end_timestamp below
    )
    def transform():
        return SparkSqlTransformation(sql="""
            SELECT user_id,
                   to_timestamp(${qwak_ingestion_start_timestamp}) as processing_time_start,
                   to_timestamp(${qwak_ingestion_end_timestamp}) as processing_time
            FROM full_read_source""")
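
Since processing_time is derived from qwak_ingestion_end_timestamp, each batch writes timestamps at least as late as those of the previous batch, which satisfies the Past Timestamps constraint above.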
    
    

Time Frame

Each batch reads records within a specified time frame: from a defined period in the past up to the job execution time.

πŸ“˜

Example

We want to track a user's total transaction amount over the past 7 days.

We can use a TimeFrame read policy to compute this aggregate over a sliding window.

This read policy reads only the records added in the last 7 days and writes the updated aggregates to the feature store.

import qwak.feature_store.feature_sets.read_policies as ReadPolicy
import qwak.feature_store.feature_sets.batch as batch
from qwak.feature_store.feature_sets.transformations import SparkSqlTransformation

@batch.feature_set(
    name='total-transactions-7d',
    entity="transaction",
    data_sources={"snowflake": ReadPolicy.TimeFrame(days=7)},
)
@batch.scheduling(cron_expression="@daily")
def transform():
    return SparkSqlTransformation(sql="""
        SELECT transaction,
               SUM(amount) as transactions_total_amount_7d
        FROM snowflake
        GROUP BY transaction
    """)

πŸ“˜

Read Policy Entities

Entities that belonged to a previous window but are not present in the current window will have null feature values. For example, with the 7-day window above, an entity with no records in the past 7 days will have a null transactions_total_amount_7d.