sparkkgml.data_acquisition

Classes

DataAcquisition

A class for knowledge graph querying and data preprocessing tasks such as null value replacement and null dropping.

Module Contents

class sparkkgml.data_acquisition.DataAcquisition(sparkSession: pyspark.sql.SparkSession = None)[source]

A class for knowledge graph querying and data preprocessing tasks such as null value replacement and null dropping.

_endpoint

The endpoint for retrieving data.

Type:

str

_query

The query for retrieving data.

Type:

str

_amputationMethod

The method for handling null values in the DataFrame (‘nullReplacement’ or ‘nullDrop’).

Type:

str

_rowNullDropPercent

The percentage threshold for dropping rows with null values.

Type:

int

_columnNullDropPercent

The percentage threshold for dropping columns with null values.

Type:

int

_nullReplacementMethod

The method for replacing null values (‘median’, ‘mean’, ‘mod’, or ‘customValue’).

Type:

str

_customValueVariable

The name of the variable used as a custom replacement value for null values.

Type:

str

_customStringValueVariable

The name of the variable used as a custom replacement string for null values.

Type:

str

_endpoint = ''
_query = ''
_handleNullValues = 'True'
_amputationMethod = 'nullDrop'
_rowNullDropPercent = 100
_columnNullDropPercent = 0
_nullReplacementMethod = 'mod'
_customValueVariable = ''
_customStringValueVariable = ''
get_endpoint()[source]
get_query()[source]
get_amputationMethod()[source]
get_rowNullDropPercent()[source]
get_columnNullDropPercent()[source]
get_nullReplacementMethod()[source]
get_customValueVariable()[source]
get_customStringValueVariable()[source]
set_endpoint(endpoint)[source]
set_query(query)[source]
set_amputationMethod(amputationMethod)[source]
set_rowNullDropPercent(rowNullDropPercent)[source]
set_columnNullDropPercent(columnNullDropPercent)[source]
set_nullReplacementMethod(nullReplacementMethod)[source]
set_customValueVariable(customValueVariable)[source]
set_customStringValueVariable(customStringValueVariable)[source]
getDataFrame(endpoint=None, query=None)[source]

Retrieve data from a SPARQL endpoint and convert it into a Spark DataFrame.

Parameters:
  • endpoint (str, optional) – The SPARQL endpoint URL. If not provided, the default endpoint will be used.

  • query (str, optional) – The SPARQL query string. If not provided, the default query will be used.

Returns:

The resulting Spark DataFrame.

Return type:

pyspark.sql.DataFrame

Raises:

TypeError – If there are null values in the Pandas DataFrame and no handling method is specified.

Notes

This function retrieves data from a SPARQL endpoint and converts it into a Spark DataFrame. It proceeds as follows:

  1. If the endpoint is not provided, the default endpoint is used. If the default endpoint is not set, an error message is displayed and the function returns.

  2. If the query is not provided, the default query is used. If the default query is not set, an error message is displayed and the function returns.

  3. The data is queried from the SPARQL endpoint and converted into a Pandas DataFrame.

  4. If there are null values in the Pandas DataFrame, handling methods are applied based on the configured amputation method.

  5. The Pandas DataFrame is then converted into a Spark DataFrame.

  6. The resulting Spark DataFrame is returned.
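
The null-handling dispatch in step 4 can be sketched in plain pandas. This is an illustrative stand-in, not the library's internals: handle_nulls is a hypothetical helper, the dropna/fillna calls stand in for the class's nullDrop and nullReplacement logic, and the final pandas-to-Spark conversion (spark.createDataFrame) is elided.

```python
import pandas as pd

def handle_nulls(df: pd.DataFrame, amputation_method: str = "nullDrop") -> pd.DataFrame:
    """Illustrative dispatch mirroring step 4 of getDataFrame."""
    if not df.isnull().values.any():
        return df  # no nulls, nothing to handle
    if amputation_method == "nullDrop":
        return df.dropna()  # stand-in for the class's nullDrop logic
    if amputation_method == "nullReplacement":
        # stand-in for nullReplacement: fill each column with its first mode
        return df.fillna(df.mode().iloc[0])
    # mirrors the documented TypeError when no handling method applies
    raise TypeError("null values present but no handling method is configured")

df = pd.DataFrame({"a": [1.0, None, 3.0], "b": ["x", "y", None]})
clean = handle_nulls(df, "nullDrop")
```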

query_local_rdf(rdf_file_path, rdf_format, query)[source]

Query RDF data stored locally and convert results to a Spark DataFrame.

This function loads RDF data from the specified file in the given format, executes a SPARQL query against it, and converts the results into a Spark DataFrame. The RDF data is loaded into an RDFLib Graph object and queried with the provided SPARQL query. The query results are processed as follows:

  1. The query results are converted to a list of dictionaries.

  2. A CSV representation of the results is created with csv.DictWriter.

  3. The CSV data is read into a Pandas DataFrame.

  4. Depending on the ‘amputationMethod’ attribute of the ‘DataAcquisition’ class, null values in the Pandas DataFrame may be handled. If ‘amputationMethod’ is ‘nullDrop’, null values are dropped; if ‘amputationMethod’ is ‘nullReplacement’, null values are replaced based on the defined strategy.

  5. Finally, a Spark DataFrame is created from the Pandas DataFrame.

Parameters:
  • rdf_file_path (str) – The path to the RDF data file.

  • rdf_format (str) – The format of the RDF data (e.g., “turtle”, “xml”, “n3”).

  • query (str) – The SPARQL query to execute on the RDF data.

Returns:

A Spark DataFrame containing the query results.

Return type:

pyspark.sql.DataFrame

Raises:

ValueError – If any of the required parameters (rdf_file_path, rdf_format, or query) is missing or empty.

Note

  • This function assumes the existence of a SparkSession in the ‘DataAcquisition’ class context.

  • Handling of null values is based on the ‘amputationMethod’ attribute of the ‘DataAcquisition’ class.
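
Steps 1–3 above can be sketched with the standard library and pandas. The rows list below is a mocked stand-in for RDFLib query results; a real run would obtain them from rdflib.Graph().parse(...) followed by .query(...).

```python
import csv
import io
import pandas as pd

# Step 1: query results as a list of dictionaries (mocked here; an actual
# run would iterate over the bindings of an rdflib query result).
rows = [
    {"s": "ex:alice", "age": "34"},
    {"s": "ex:bob", "age": "29"},
]

# Step 2: write a CSV representation with csv.DictWriter.
buf = io.StringIO()
writer = csv.DictWriter(buf, fieldnames=rows[0].keys())
writer.writeheader()
writer.writerows(rows)

# Step 3: read the CSV data back into a pandas DataFrame.
buf.seek(0)
df = pd.read_csv(buf)
```

From here, null handling (step 4) and the Spark conversion (step 5) proceed as described above.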

nullReplacement(df)[source]

Apply null replacement methods on variables with NaN values in a DataFrame.

Parameters:

df (pandas.DataFrame) – The input DataFrame.

Returns:

The DataFrame with null values replaced according to the specified method.

Return type:

pandas.DataFrame

Raises:

ValueError – If the customStringValueVariable or customValueVariable is not defined for the ‘customValue’ null replacement method.

Notes

This function applies null replacement methods on variables with NaN values in the input DataFrame. It proceeds as follows:

  1. If the null replacement method is set to ‘median’, iterate over the columns of the DataFrame that have null values and fill them with the column’s median value. Note that this method cannot be applied to string columns.

  2. If the null replacement method is set to ‘mean’, iterate over the columns of the DataFrame that have null values and fill them with the column’s mean value.

  3. If the null replacement method is set to ‘mod’ (mode/most frequent value), iterate over the columns of the DataFrame that have null values and fill them with the column’s mode (first most frequent value). Note that this method cannot be applied to string columns.

  4. If the null replacement method is set to ‘customValue’, iterate over the columns of the DataFrame that have null values. If the column’s data type is an object (string), fill the null values with the specified custom string value. If the column’s data type is not an object, fill the null values with the specified custom numeric value.

  5. The resulting DataFrame with replaced null values is returned.
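
A rough pandas sketch of the ‘mod’ and ‘customValue’ strategies described above; the custom values here are illustrative placeholders, not the class's configured _customValueVariable/_customStringValueVariable.

```python
import pandas as pd

df = pd.DataFrame({
    "num": [1.0, None, 1.0, 4.0],
    "txt": ["a", None, "b", "a"],
})

# 'mod' strategy (step 3): fill numeric nulls with the column's first mode;
# per the notes, string columns are not handled by this strategy.
filled = df.copy()
for col in filled.columns[filled.isnull().any()]:
    if filled[col].dtype != object:
        filled[col] = filled[col].fillna(filled[col].mode()[0])

# 'customValue' strategy (step 4): the column dtype decides which custom
# value is used (string columns get the custom string, others the number).
custom = df.copy()
custom_value, custom_string = 0.0, "missing"  # illustrative custom values
for col in custom.columns[custom.isnull().any()]:
    if custom[col].dtype == object:
        custom[col] = custom[col].fillna(custom_string)
    else:
        custom[col] = custom[col].fillna(custom_value)
```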

nullDrop(df)[source]

Apply null dropping according to thresholds on a DataFrame.

Parameters:

df (pandas.DataFrame) – The input DataFrame.

Returns:

The DataFrame with null values dropped according to the specified thresholds.

Return type:

pandas.DataFrame

Notes

This function applies null dropping on the input DataFrame based on the following steps:

  1. Drop columns where the percentage of missing values is greater than or equal to the specified self._columnNullDropPercent threshold.

  2. Drop rows where the percentage of missing values is greater than or equal to the specified self._rowNullDropPercent threshold.

  3. If there are still null values in the DataFrame after dropping, the nullReplacement function is called to apply the specified null replacement method.

  4. The resulting DataFrame with dropped null values is returned.

Warning

  • The nullReplacement method will be called if there are still null values after dropping. Make sure the nullReplacementMethod is properly configured.
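
The two threshold checks in steps 1 and 2 can be sketched in plain pandas; the thresholds below are illustrative, not the class defaults, and the follow-up nullReplacement call from step 3 is elided.

```python
import pandas as pd

df = pd.DataFrame({
    "a": [1, 2, 3, 4],
    "b": [None, None, None, 4],  # 75% null
    "c": [1, None, 3, 4],        # 25% null
})

column_null_drop_percent = 50  # illustrative thresholds, not the defaults
row_null_drop_percent = 50

# Step 1: drop columns whose null percentage meets or exceeds the threshold.
col_null_pct = df.isnull().mean() * 100
df = df.loc[:, col_null_pct < column_null_drop_percent]

# Step 2: drop rows whose null percentage meets or exceeds the threshold.
row_null_pct = df.isnull().mean(axis=1) * 100
df = df.loc[row_null_pct < row_null_drop_percent]
```

With these thresholds, column ‘b’ (75% null) is dropped first, and the row whose remaining cells are 50% null is dropped afterwards.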