API Reference
Client
The Fennel Client exposes the following methods:
extract_features
Extracts the current values of the requested output features, given the values of the input features.
Arguments:
output_feature_list: List[Union[Feature, Featureset]]
- list of features (written as the fully qualified name of a feature along with its featureset) that should be extracted. Can also take featureset objects as input, in which case all features in the featureset are extracted.
input_feature_list: List[Union[Feature, Featureset]]
- list of features/featuresets for which values are known.
input_dataframe: Dataframe
- a pandas dataframe object that contains the values of all features in the input feature list. Each row of the dataframe can be thought of as one entity for which features are desired.
log: bool
- boolean which indicates if the extracted features should also be logged (for the log-and-wait approach to training data generation). Default is False.
workflow: str
- the name of the workflow associated with the feature extraction. Only relevant when log is set to True.
sampling_rate: float
- the rate at which feature data should be sampled before logging. Only relevant when log is set to True. The default value is 1.0.
Example:
import pandas as pd

from fennel.client import Client
from fennel.featuresets import feature, featureset

client = Client(<URL>)

@featureset
class UserFeatures:
    userid: int = feature(id=1)
    # ... 6 more features

feature_df = client.extract_features(
    output_feature_list=[
        UserFeatures,
    ],
    input_feature_list=[UserFeatures.userid],
    input_dataframe=pd.DataFrame(
        {"UserFeatures.userid": [18232, 18234]}
    ),
)
assert feature_df.shape == (2, 7)
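With log=True, the same call also records the extracted rows for later training-data generation; a minimal sketch, where the workflow name "churn_v1" and the sampling rate are hypothetical illustrations:

# Log-and-wait variant: extract features and also log them server-side.
feature_df = client.extract_features(
    output_feature_list=[UserFeatures],
    input_feature_list=[UserFeatures.userid],
    input_dataframe=pd.DataFrame({"UserFeatures.userid": [18232, 18234]}),
    log=True,               # also log the extracted features
    workflow="churn_v1",    # hypothetical workflow name for the logged rows
    sampling_rate=0.5,      # log roughly half of the rows
)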
sync
Synchronizes the local dataset and featureset definitions with the server. This method should be called after all dataset and featureset definitions have been created using the client SDK. It creates the resources required for the datasets and featuresets on the server, and updates those resources (or throws errors) if the schemas of the datasets and featuresets have changed.
Arguments:
datasets: List[Dataset]
- a list of dataset definitions that need to be synced with the server.
featuresets: List[Featureset]
- a list of featureset definitions that need to be synced with the server.
Example
client.sync(
    datasets=[UserInfoDataset],
    featuresets=[UserFeatures],
)
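For context, a minimal sketch of a dataset definition that could be passed to the sync call above, assuming Fennel's dataset/field decorators and the column names used in the log example below:

from datetime import datetime

from fennel.datasets import dataset, field

@dataset
class UserInfoDataset:
    user_id: int = field(key=True)  # primary key for lookups
    name: str
    age: int
    country: str
    timestamp: datetime = field(timestamp=True)  # row timestamp column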
log
While Fennel supports built-in connectors to external datasets, it's also possible to "manually" log data to Fennel datasets using log.
Arguments:
webhook: str
- the name of the webhook via which the data is logged (e.g. "fennel_webhook" in the example below).
dataset_name: str
- the name of the dataset to which the data needs to be logged.
dataframe: Dataframe
- the data that needs to be logged, expressed as a Pandas dataframe.
batch_size: int
- the size of batches in which the dataframe is chunked before being sent to the server. Useful when sending very large dataframes. The default value is 1000.
This method throws an error if the schema of the dataframe (i.e. column names and types) is not compatible with the schema of the dataset.
Example
from datetime import datetime

import pandas as pd
import requests

now = datetime.now()
data = [
    [18232, "John", 32, "USA", now],
    [18234, "Monica", 24, "Chile", now],
]
columns = ["user_id", "name", "age", "country", "timestamp"]
df = pd.DataFrame(data, columns=columns)
response = client.log("fennel_webhook", "UserInfoDataset", df)
assert response.status_code == requests.codes.OK, response.json()
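For very large dataframes, batch_size bounds the payload per request; a minimal sketch, where big_df and the batch size of 5000 are hypothetical:

# Send a large dataframe in chunks of 5000 rows each.
response = client.log("fennel_webhook", "UserInfoDataset", big_df, batch_size=5000)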
extract_historical_features
For offline training of models, users often need to extract features for a large number of entities. This method extracts features for many entities in a single call while ensuring point-in-time correctness of the extracted features.
This is an asynchronous API that returns a request ID and the path to the output folder in S3 that will contain the extracted features.
Arguments:
input_feature_list: List[Union[Feature, Featureset]]
- list of features or featuresets to use as input.
output_feature_list: List[Union[Feature, Featureset]]
- list of features or featuresets to compute.
timestamp_column: str
- the name of the column containing the timestamps.
format: str
- the format of the input data. Can be either "pandas", "csv", "json" or "parquet". Default is "pandas".
input_dataframe: Optional[pd.DataFrame]
- dataframe containing the input features. Only relevant when format is "pandas".
output_bucket: Optional[str]
- the name of the S3 bucket where the output data should be stored.
output_prefix: Optional[str]
- the prefix of the S3 key where the output data should be stored.
The following parameters are only relevant when format is "csv", "json" or "parquet":
input_bucket: Optional[str]
- the name of the S3 bucket containing the input data.
input_prefix: Optional[str]
- the prefix of the S3 key containing the input data.
feature_to_column_map: Optional[Dict[Feature, str]]
- a dictionary mapping features to column names.
Returns:
Dict[str, Any]
- A dictionary containing the following information:
- request_id
- output s3 bucket
- output s3 path prefix
- completion rate
- failure rate
- status
A completion rate of 1.0 indicates that all processing has been completed; a completion rate of 1.0 together with a failure rate of 0.0 indicates that all processing completed successfully.
Example
response = client.extract_historical_features(
    output_feature_list=[
        UserFeatures,
    ],
    input_feature_list=[UserFeatures.userid],
    format="pandas",
    input_dataframe=pd.DataFrame(
        {"UserFeatures.userid": [18232, 18234], "timestamp": [now, now]}
    ),
    timestamp_column="timestamp",
)
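When the input lives in S3 instead of an in-memory dataframe, the bucket and prefix parameters are used; a minimal sketch, where the bucket names, prefixes, and the "uid" column mapping are hypothetical:

# Read CSV input from S3 and write extracted features back to S3.
response = client.extract_historical_features(
    output_feature_list=[UserFeatures],
    input_feature_list=[UserFeatures.userid],
    format="csv",
    timestamp_column="timestamp",
    input_bucket="my-input-bucket",      # hypothetical input bucket
    input_prefix="features/input/",      # hypothetical input prefix
    output_bucket="my-output-bucket",    # hypothetical output bucket
    output_prefix="features/output/",    # hypothetical output prefix
    feature_to_column_map={UserFeatures.userid: "uid"},  # input column "uid" holds userid
)
request_id = response["request_id"]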
extract_historical_features_progress
This method allows users to monitor the progress of an asynchronous extract_historical_features operation. It accepts the request ID that was returned by the extract_historical_features method and returns the current status of that operation. The response format of this function and the extract_historical_features function are identical.
Arguments:
request_id: str
- the request ID returned by the extract_historical_features method. This ID uniquely identifies the feature extraction operation.
Returns:
Dict[str, Any]
- A dictionary containing the following information:
- request_id
- output s3 bucket
- output s3 path prefix
- completion rate
- failure rate
- status
A completion rate of 1.0 indicates that all processing has been completed; a completion rate of 1.0 together with a failure rate of 0.0 indicates that all processing completed successfully.
Example
client.extract_historical_features_progress(request_id='bf5dfe5d-0040-4405-a224-b82c7a5bf085')
>>> {'request_id': 'bf5dfe5d-0040-4405-a224-b82c7a5bf085', 'output_bucket': <bucket_name>, 'output_prefix': <output_prefix>, 'completion_rate': 0.76, 'failure_rate': 0.0}
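Callers can poll this method until extraction finishes; a minimal sketch, where the 30-second interval is an arbitrary choice:

import time

# Poll until every row has been processed (completion_rate reaches 1.0).
status = client.extract_historical_features_progress(request_id=request_id)
while status["completion_rate"] < 1.0:
    time.sleep(30)  # arbitrary polling interval
    status = client.extract_historical_features_progress(request_id=request_id)
assert status["failure_rate"] == 0.0, "some rows failed to extract"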
extract_historical_cancel_request
The extract_historical_cancel_request method allows users to cancel an asynchronous extract_historical_features operation. The response format of this function and the extract_historical_features function are identical.
Arguments:
request_id: str
- the request ID returned by the extract_historical_features method. This ID uniquely identifies the feature extraction operation.
Returns:
Dict[str, Any]
- A dictionary containing the following information:
- request_id
- output s3 bucket
- output s3 path prefix
- completion rate
- failure rate
- status
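Example
A minimal sketch of cancelling a running extraction, reusing the illustrative request ID from the progress example above:

client.extract_historical_cancel_request(request_id='bf5dfe5d-0040-4405-a224-b82c7a5bf085')
# The response mirrors the extract_historical_features_progress output,
# e.g. completion_rate and failure_rate at the time of cancellation.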