Transformations

This section describes the various transformations supported by Qwak.

SQL

📘

Qwak runs Spark SQL in the background. Please comply with Spark Standards.

The following is an implementation of creating a transformation using a SQL:

import qwak.feature_store.feature_sets.read_policies as ReadPolicy
import qwak.feature_store.feature_sets.batch as batch
from  qwak.feature_store.feature_sets.transformations import SparkSqlTransformation

@batch.feature_set(
    name="user-transaction-aggregations",
    key="user_id",
    data_sources={"snowflake_datasource": ReadPolicy.TimeFrame(days=30)},
)
def user_features():
    return SparkSqlTransformation(sql="""
            SELECT
            user_id,
            AVG(credit_amount) as avg_credit_amount,
            STD(credit_amount) as std_credit_amount,
            MAX(credit_amount) as max_credit_amount,
            MIN(date_created) as first_transaction,
            AVG(duration) as avg_loan_duration,
            AVG(job) as seniority_level
        FROM snowflake_datasource
        Group By user_id""")

Creating Transformations

When creating transformations, keep the following guidelines in mind:

  1. Key Inclusion:
    • The resulting feature vector must incorporate the feature set key, used in the definition.
  2. Timestamp Column Requirement:
    • For read policies such as NewOnly and FullRead, it is imperative to include the timestamp column in the returned feature vector.
  3. Use the data source as the table name in the FROM clause.
  4. Make sure the column names resulting from the SQL has no special characters. The allowed characters are: a-z,A-Z,0-9,_.

📘

Logging

We support the default Python logger, which you can import from the standard python logging library.

PySpark

To use this feature, ensure that you have installed the qwak-sdk with the feature-store extra.

pip install -U "qwak-sdk[feature-store]"

PySpark transformation is defined by creating a UDF which is responsible for the transformation logic.

UDF definition:

  • Arguments:
    • df_dict: spark.DataFrame- Mandatory
      A dictionary in the form of {'\<batch_sourcename>': df ...}.
    • qwargs: Dict[str, Any]- Optional
      If added, runtime parameters will be injected via qwargs (e.g. qwak_ingestion_start_timestamp, qwak_ingestion_end_timestamp)
  • Return value: spark.DataFrame

The returned df (PySpark DataFrame) must contain a column representing the configured key. The df column names must not include whitespaces or special characters.

🚧

Python and Dependency Restrictions

To ensure compatibility and stability, it is mandatory to use Python 3.8 when registering a feature set with a Koalas transformation. Additionally, ensure that cloudpickle version is locked to 2.2.1.

from typing import Dict, Any

import pyspark.sql as spark  
import pyspark.sql.functions as F

from qwak.feature_store.feature_sets import batch  
from qwak.feature_store.feature_sets.read_policies import ReadPolicy  
from qwak.feature_store.feature_sets.transformations import PySparkTransformation

@batch.feature_set(  
    name="user-features",
    key="user",  
    data_sources={"snowflake_transactions_table": ReadPolicy.TimeFrame(days=30)},  
    timestamp_column_name="date_created"  
)  
@batch.scheduling(cron_expression="0 8 * * *")  
def transform():  
  def amount_stats(df_dict: Dict[str, spark.DataFrame], qwargs: Dict[str, Any]) -> spark.DataFrame:  
    df = df_dict['snowflake_transactions_table']  
    agg_df = df.groupby('user').agg(F.max('amount').alias("max_duration"))  
    
    return agg_df
  
  return PySparkTransformation(function=amount_stats)

❗️

Function scope and dependencies

PySpark function scope and variables must be defined under the transform function, as shown in the code snippet above.
At runtime, only PySpark and python native library, are available.

📘

Logging

We support the default Python logger, which you can import from the standard python logging library.

Pandas On Spark

Pandas On Spark is a pandas implementation using Spark. Please ensure your code is Pandas On Spark library compliant.
The User Defined Function (UDF) receives a dictionary
in the form of {'\<batch_source_name>': pyspark.pandas.DataFrame ...} as input.
The returned pyspark.pandas.DataFrame (Pandas On Spark DataFrame) must contain a column representing the configured key and timestamp column. The psdf must not include complex columns, such as multi-index, and the name must not include whitespaces or special characters.
Make sure that column names returned from the UDF do not contain special characters.
The allowed characters are: a-z,A-Z,0-9,_..

🚧

Restrictions

Deployment - supported for Hybrid deployments ONLY.

Dependencies - to ensure compatibility and stability, it is mandatory to use Python 3.8 when registering a Feature Set with a Pandas On Spark transformation.

from typing import Dict, Any
import qwak.feature_store.feature_sets.batch as batch
from qwak.feature_store.feature_sets.read_policies import ReadPolicy
from qwak.feature_store.feature_sets.transformations import PandasOnSparkTransformation
from pyspark.pandas import DataFrame

@batch.feature_set(  
    name="user-features",
    key="user",  
    data_sources={"snowflake_transactions_table": ReadPolicy.TimeFrame(days=30)},  
    timestamp_column_name="date_created"  
)  
@batch.scheduling(cron_expression="0 8 * * *")  
def transform():
  
  def amount_stats(df_dict: Dict[str, DataFrame], qwargs: Dict[str, Any]) -> DataFrame:
    ps_df = df_dict['snowflake_transactions_table']
    agg_psdf = ps_df.groupby('user').agg({'amount': ['avg', 'sum']})
    return agg_psdf
                    
  return PandasOnSparkTransformation(function=amount_stats)

❗️

Function scope and dependencies

Pandas On Spark function scope and variables must be defined under the transform function, as shown in the code snippet above.

📘

Logging

We support the default Python logger, which you can import from the standard python logging library.

Koalas (deprecated)

Koalas is a pandas implementation using Spark. Ensure your code complies with Databricks Koalas library.

The UDF receives as input a dictionary in the form of {'\<batch_sourcename>': kdf ...}.

The returned kdf (Koalas DataFrame) must contain a column representing the configured key. The kdf must not include complex columns, such as multi index, and the name must not include whitespaces or special characters.

Make sure that column names returned from the UDF has no special characters.
The allowed characters are: a-z,A-Z,0-9,_.

🚧

Python and Dependency Restrictions

To ensure compatibility and stability, it is mandatory to use Python 3.8 when registering a feature set with a Koalas transformation. Additionally, ensure that cloudpickle version is locked to 2.2.1.

import qwak.feature_store.feature_sets.batch as batch
from qwak.feature_store.feature_sets.read_policies import ReadPolicy
from qwak.feature_store.feature_sets.transformations import KoalasTransformation

@batch.feature_set(  
    name="user-features",
    key="user",  
    data_sources={"snowflake_transactions_table": ReadPolicy.TimeFrame(days=30)},  
    timestamp_column_name="date_created"  
)  
@batch.scheduling(cron_expression="0 8 * * *")  
def transform():

  def amount_stats(kdf_dict):  
    kdf = kdf_dict['snowflake_transactions_table']  
    agg_kdf = kdf.groupby('user').agg({'amount': ['avg', 'sum']})  
    agg_kdf.columns = ['_'.join(column) for column in kdf.columns.values]  
    return agg_kdf

  return KoalasTransformation(function=amount_stats)

❗️

Function scope and dependencies

Koalas function scope and variables must be defined under the transform function, as shown in the code snippet above.
At runtime, only pandas and koalas and python native library, are available.

📘

Logging

We support the default Python logger, which you can import from the standard python logging library.

Supported Spark column types

IntegerType, LongType, StringType, DoubleType, DecimalType, FloatType, BooleanType, TimestampType, DateType, ArrayType.

ArrayType column may include any of the above column types, except for another ArrayType column.

❗️

Batch cannot insert rows with an older timestamp than the current oldest timestamp in the offline feature store.

Each batch must produce a timestamp equal to or larger than the last batch.