Source code for sparkkgml.feature_engineering

from pyspark.sql.functions import *

[docs] class FeatureEngineering: """ A class for extracting features and their descriptions from a Spark DataFrame. Attributes: _entityColumn (str): The name of the entity column. Methods: get_entityColumn(): Getter method for the entity column. set_entityColumn(entityColumn): Setter method for the entity column. getFeatures(df): Extracts features and their descriptions from a DataFrame. """ def __init__(self): self._entityColumn = '' # getter functions
[docs] def get_entityColumn(self): return self._entityColumn
# setter functions
[docs] def set_entityColumn(self, entityColumn): self._entityColumn= entityColumn
[docs] def getFeatures(self, df): """Extracts features and their descriptions from a DataFrame. Args: df (pyspark.sql.DataFrame): The input DataFrame. Returns: tuple: A tuple containing the collapsed DataFrame and a dictionary of feature descriptions. Notes: This function analyzes each column in the input DataFrame and extracts features. The resulting features are stored in a collapsed DataFrame where each row represents a unique entity. A dictionary of feature descriptions provides information about each feature's properties. Feature Descriptions: - featureType (str): The type of the feature, combining information about whether it is a list or a single value, whether it is categorical or non-categorical, and the data type. - name (str): The name of the feature column. - nullable (bool): A flag indicating if the feature can have null values. Extracted based on the rule that a feature is nullable if it has at least one null value. - datatype (spark.DataType): The data type of the feature column. - numberDistinctValues (int): The number of distinct values in the feature column. - isListOfEntries (bool): A flag indicating if the feature is a list of entries. Extracted based on the rule that a feature is considered a list if it has more than one entry in at least one row. - isCategorical (bool): A flag indicating if the feature is categorical. Extracted based on the rule that a feature is considered categorical if the ratio of distinct values to the total number of entities is less than 0.1. """ #if entity column is not set, set it as the first column if self._entityColumn == '': self._entityColumn=df.columns[0] print(f'No entity column has been set, that is why the first column {self._entityColumn} is used as entity column') keyColumnNameString=self._entityColumn featureColumns = [col for col in df.columns if col != keyColumnNameString] collapsedDataframe = df.select(keyColumnNameString).dropDuplicates() numberRows = collapsedDataframe.count() #dictionary to map all the features of columns featureDescriptions = {} #apply and get all the features for all the columns for currentFeatureColumnNameString in featureColumns: #two column df for key and selected feature twoColumnDf = df.select(keyColumnNameString, currentFeatureColumnNameString).dropDuplicates() #groupBy the column according to key groupedTwoColumnDf = twoColumnDf.groupBy(keyColumnNameString) #make a list of feature if it is a list, add a new column for length of the list collapsedTwoColumnDfwithSize=groupedTwoColumnDf\ .agg(collect_list(currentFeatureColumnNameString).alias(currentFeatureColumnNameString))\ .withColumn("size", size(col(currentFeatureColumnNameString))) minNumberOfElements = collapsedTwoColumnDfwithSize.select("size").agg(min("size")).first()[0] maxNumberOfElements = collapsedTwoColumnDfwithSize.select("size").agg(max("size")).first()[0] #determine the feature specific properties nullable = True if minNumberOfElements == 0 else False datatype = twoColumnDf.schema[currentFeatureColumnNameString].dataType numberDistinctValues = twoColumnDf.select(currentFeatureColumnNameString).distinct().count() isListOfEntries = True if maxNumberOfElements > 1 else False #availability = collapsedTwoColumnDfwithSize.filter(F.col("size") > 0).count() / numberRows isCategorical = True if (numberDistinctValues / numberRows) < 0.1 else False #append a string for feature type "ListOf|Single + Categorical|NonCategorical + dataType" featureType = "ListOf_" if isListOfEntries else "Single_" featureType += "Categorical_" if isCategorical else "NonCategorical_" featureType += str(datatype).split("Type")[0] featureSummary = { "featureType": featureType, "name": currentFeatureColumnNameString, "nullable": nullable, "datatype": datatype, "numberDistinctValues": numberDistinctValues, "isListOfEntries": isListOfEntries, "isCategorical": isCategorical, #"avalability": availability } #append the features dictionary to mapping featureDescriptions[currentFeatureColumnNameString] = featureSummary #append the end-result feature column to end-result dataframe if isListOfEntries: joinable_df = collapsedTwoColumnDfwithSize.select(keyColumnNameString, currentFeatureColumnNameString) else: joinable_df = twoColumnDf.select(keyColumnNameString, currentFeatureColumnNameString) collapsedDataframe = collapsedDataframe.join(joinable_df, keyColumnNameString) return collapsedDataframe, featureDescriptions