Read Policies

Read Policies in Feature Set Data Ingestion

The Read Policy you select significantly influences how data is ingested into a feature set. Choose it carefully when defining a data source in the feature set definition.

This document provides an overview of the read policies available in Qwak.

🚧

Default Read Policy

If no read policy is set, NewOnly is the default read policy.


New Only

With the "New Only" read policy, each batch execution processes only the records added between the previous batch execution and the current job execution.

In this context, record time is determined by the timestamp in the timestamp_column_name column, which represents the configured Event time.

When consuming data under the "New Only" read policy, Qwak defines "new data" as records generated after the completion of the previous batch. The notion of "after" is grounded in the time definition established through the data source and the feature set's date-created column.

For instance, consider a scenario with a "New Only" read policy: if the initial job, triggered on 20/11/2023 at 00:00:00, ingested data with a maximum timestamp of 19/11/2023 at 23:55:00, subsequent executions will treat data as new if it arrived after 19/11/2023 at 23:55:00, up to and including the present moment.
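Conceptually, each "New Only" batch therefore reads the window between the previous batch's maximum event time and the current job execution time. The following sketch expresses that filter as plain Spark SQL inside Python. It is illustrative only: the boundary values are tracked by Qwak internally, and the source and column names are taken from the example that follows.

previous_batch_max_ts = "2023-11-19 23:55:00"  # max event time of the previous batch

# Illustrative only -- not Qwak API: the effective "New Only" window
effective_query = f"""
SELECT *
FROM snowflake_datasource
WHERE date_created > TIMESTAMP '{previous_batch_max_ts}'  -- strictly newer records
  AND date_created <= current_timestamp()                 -- up to job execution time
"""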


from datetime import datetime
from qwak.feature_store.feature_sets import batch
from qwak.feature_store.feature_sets.transformations import SparkSqlTransformation
from qwak.feature_store.feature_sets.execution_spec import ClusterTemplate
from qwak.feature_store.feature_sets.read_policies import ReadPolicy


@batch.feature_set(
    name="user-transaction-aggregations-prod",
    key="user_id",
    data_sources={"snowflake_datasource": ReadPolicy.NewOnly},
    timestamp_column_name="date_created"  # must be included in the transformation output
)
@batch.scheduling(cron_expression="0 0 * * *")
@batch.execution_specification(cluster_template=ClusterTemplate.MEDIUM)
@batch.backfill(start_date=datetime(2019, 12, 31, 22, 0, 0))
def transform():
    return SparkSqlTransformation(sql="""
        SELECT user_id,
               age,
               sex,
               job,
               date_created
        FROM snowflake_datasource
    """)

Full Read

The "Full Read" policy, when applied to a single data source, involves consuming all available data from that source up to the scheduled batch time. This essentially means that the defined query or data retrieval process will run against the specified data source (e.g., Snowflake) and retrieve records based on the conditions defined by the timestamp column, up until the scheduled batch time.

Use Cases

  1. Snapshot of Data Source:

    1. In scenarios where a complete snapshot of the data source is required, each batch job reads all the data until the execution time.

      from qwak.feature_store.feature_sets import batch
      from qwak.feature_store.feature_sets.read_policies import ReadPolicy
      
      @batch.feature_set(
          name="aggregation-by-time",
          key="user_id",
          data_sources={"full_read_source": ReadPolicy.FullRead},
          timestamp_column_name="processing_time"
      )
      def transform():
          # Transformation omitted for brevity; see "Handling Timestamps"
          # below for a full Full Read transformation example.
          ...
      
  2. Joining Dimension Tables:

    1. When joining a dimension table of user information with another data source, the Full Read policy ensures that all relevant records are considered.

      from qwak.feature_store.feature_sets import batch
      from qwak.feature_store.feature_sets.read_policies import ReadPolicy
      
      @batch.feature_set(
          name="aggregation-by-time",
          key="transaction_id",
          data_sources={"snowflake": ReadPolicy.NewOnly, "full_read_source": ReadPolicy.FullRead},
          timestamp_column_name="processing_time"
      )
      def transform():
          # Transformation omitted for brevity; a full join example appears
          # under "Using Multiple Read Policies" below.
          ...
      

Timestamp Considerations:

  • Batch Feature Sets Constraints:
    • Batch feature sets cannot insert rows with timestamps earlier than the latest timestamp already present in the offline feature store: each batch must produce timestamps equal to or later than those of the previous batch.
  • Handling Timestamps:
    • To utilize the Full Read policy and manage timestamps, Qwak feature set transformations support the qwak_ingestion_start_timestamp and qwak_ingestion_end_timestamp parameters. These can be used to define timestamp columns in transformations, as in the example below.
    from qwak.feature_store.feature_sets import batch
    from qwak.feature_store.feature_sets.read_policies import ReadPolicy
    from qwak.feature_store.feature_sets.transformations import SparkSqlTransformation
    
    @batch.feature_set(
        name="aggregation-by-time",
        key="user_id",
        data_sources={"full_read_source": ReadPolicy.FullRead},
        timestamp_column_name="processing_time"
    )
    def transform():
        return SparkSqlTransformation(sql="""
                SELECT user_id,
                       to_timestamp(${qwak_ingestion_start_timestamp}) as processing_time_start,
                       to_timestamp(${qwak_ingestion_end_timestamp}) as processing_time
                FROM full_read_source""")
    

Time Frame

Each batch reads records within a specified time frame: from a defined period in the past up to the job execution time.

📘

Example

Suppose we want to track the total transaction amount a user generated in the past 7 days.

We can use a TimeFrame read policy to compute the aggregation over a sliding window.

This read policy reads only the records added in the last 7 days and writes the updated aggregation to the feature store.

from qwak.feature_store.feature_sets import batch
from qwak.feature_store.feature_sets.read_policies import ReadPolicy
from qwak.feature_store.feature_sets.transformations import SparkSqlTransformation

@batch.feature_set(
    name='total-transactions-7d',
    key="user_id",
    data_sources={"snowflake": ReadPolicy.TimeFrame(days=7)},
)
@batch.scheduling(cron_expression="@daily")
def transform():
    return SparkSqlTransformation(sql="""
        SELECT user_id,
               SUM(amount) as transactions_total_amount_7d
        FROM snowflake
        GROUP BY user_id
    """)

TimeFrame with the Aggregations.Population Flavor

from qwak.feature_store.feature_sets import batch
# Assumption: Aggregations is importable alongside ReadPolicy; adjust to your SDK version
from qwak.feature_store.feature_sets.read_policies import ReadPolicy, Aggregations
from qwak.feature_store.feature_sets.transformations import SparkSqlTransformation

@batch.feature_set(
    name='total-transactions-7d',
    key="user_id",
    data_sources={
        "snowflake": ReadPolicy.TimeFrame(
            days=7,
            flavor=Aggregations.Population
        )
    },
)
@batch.scheduling(cron_expression="@daily")
def transform():
    return SparkSqlTransformation(sql="""
        SELECT user_id,
               SUM(amount) as transactions_total_amount_7d
        FROM snowflake
        GROUP BY user_id
    """)

Using the Aggregations.Population flavor of the TimeFrame read policy, keys that belonged to a previous window but are not present in the current window receive null feature values.
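For example (a hypothetical illustration in plain Python; key names and values are made up):

# Keys present in the previous and current 7-day windows
previous_window_keys = {"user_a", "user_b"}
current_window_keys = {"user_b", "user_c"}

# Under Aggregations.Population, keys from earlier windows that are absent
# from the current window keep a row but get null (None) feature values.
population = previous_window_keys | current_window_keys
feature_values = {
    key: (42.0 if key in current_window_keys else None)
    for key in population
}
# -> {"user_a": None, "user_b": 42.0, "user_c": 42.0}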

🚧

Note

The Backfill API is currently not supported for feature sets defined with the Aggregations.Population flavor.

Using Multiple Read Policies

Qwak feature sets support fetching data from multiple sources, each with its own read policy, as in the sketch below.
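This sketch pairs a NewOnly transactional source with a FullRead dimension source, following the pattern from the Full Read use cases above. The feature set name and the column names are illustrative:

from qwak.feature_store.feature_sets import batch
from qwak.feature_store.feature_sets.read_policies import ReadPolicy
from qwak.feature_store.feature_sets.transformations import SparkSqlTransformation

@batch.feature_set(
    name="transactions-with-user-dimensions",  # illustrative name
    key="transaction_id",
    data_sources={
        "snowflake": ReadPolicy.NewOnly,          # only records added since the last batch
        "full_read_source": ReadPolicy.FullRead,  # full snapshot of the dimension table
    },
    timestamp_column_name="date_created",
)
def transform():
    # Join new transactions against the full user dimension snapshot
    return SparkSqlTransformation(sql="""
        SELECT t.transaction_id,
               t.amount,
               u.age,
               t.date_created
        FROM snowflake t
        JOIN full_read_source u ON t.user_id = u.user_id
    """)

Within a single batch execution, each source is read according to its own policy before the transformation runs.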