import pandas as pd
import re
from sparkkgml.data_acquisition import DataAcquisition
from kgextension.endpoints import DBpedia
from pyspark.sql import SparkSession
[docs]
def clean_column_names(df):
"""
Clean column names of a Pandas DataFrame by removing invalid characters.
Args:
df (pandas.DataFrame): The input Pandas DataFrame.
Returns:
pandas.DataFrame: The DataFrame with cleaned column names.
"""
# Regular expression pattern for removing invalid characters
pattern = r'[\s`/\\,:;\'"\[\]\{\}\(\).]'
# Apply strip to remove leading and trailing whitespace characters
#cleaned_columns = [col.strip() for col in df]
# Clean the column names
cleaned_columns = [re.sub(pattern, '', col) for col in df]
# Create a dictionary to map original column names to cleaned column names
column_mapping = {col: cleaned_col for col, cleaned_col in zip(df.columns, cleaned_columns)}
#Rename the columns in the DataFrame
df_cleaned = df.rename(columns=column_mapping)
return df_cleaned
[docs]
def spark_dbpedia_lookup_linker(sparkDataFrame, column, new_attribute_name='new_link', progress=True,
base_url='https://lookup.dbpedia.org/api/search/', max_hits=1, query_class='',
lookup_api='KeywordSearch', caching=True):
"""
Perform DBpedia entity linking on a Spark DataFrame using the DBpedia Spotlight service.
Args:
sparkDataFrame (pyspark.sql.DataFrame): The input Spark DataFrame.
column (str): Name of the column whose entities should be found.
new_attribute_name (str, optional): Name of the column containing the link to the knowledge graph.
Defaults to 'new_link'.
progress (bool, optional): If True, progress bars will be shown to inform the user about the progress.
Defaults to True.
base_url (str, optional): The base URL of the DBpedia Lookup API. Defaults to 'https://lookup.dbpedia.org/api/search/'.
max_hits (int, optional): Maximal number of URIs that should be returned per entity. Defaults to 1.
query_class (str, optional): Specifies whether the entities that occur first ('first'), that have the highest support
('support'), or that have the highest similarity score ('similarityScore') should be chosen.
Defaults to ''.
lookup_api (str, optional): The DBpedia Lookup API to use. Defaults to 'KeywordSearch'.
caching (bool, optional): Turn result-caching for queries issued during the execution on or off. Defaults to True.
Returns:
pyspark.sql.DataFrame: DataFrame with new column(s) containing the DBpedia URIs.
Notes:
This function performs DBpedia entity linking on a Spark DataFrame using the DBpedia Spotlight service. It follows these steps:
1. Convert the Spark DataFrame to a Pandas DataFrame.
2. Apply the `dbpedia_lookup_linker` function from the kgextension library to the Pandas DataFrame.
3. Convert the resulting Pandas DataFrame back to a Spark DataFrame.
4. Return the Spark DataFrame with the new column(s) containing the DBpedia URIs.
"""
from kgextension.linking import dbpedia_lookup_linker
spark = SparkSession.builder.getOrCreate()
DataAcquisitionObject=DataAcquisition(spark)
pandasDf=dbpedia_lookup_linker(sparkDataFrame.toPandas(), column, new_attribute_name, progress, base_url, max_hits, query_class, lookup_api, caching)
sparkDataFrame2= spark.createDataFrame(clean_column_names(DataAcquisitionObject.nullDrop(pandasDf)))
return sparkDataFrame2
[docs]
def spark_specific_relation_generator(sparkDataFrame, columns, endpoint=DBpedia, uri_data_model=False, progress=True,
direct_relation="http://purl.org/dc/terms/subject",
hierarchy_relation=None, max_hierarchy_depth=1, prefix_lookup=False, caching=True):
"""
Generate attributes from a specific direct relation on a Spark DataFrame.
Args:
sparkDataFrame (pyspark.sql.DataFrame): The input Spark DataFrame.
columns (str or list): Name(s) of column(s) that contain(s) the link(s) to the knowledge graph.
endpoint (Endpoint, optional): SPARQL Endpoint to be queried; ignored when 'uri_data_model' is True.
Defaults to DBpedia.
uri_data_model (bool, optional): If enabled, the URI is directly queried instead of a SPARQL endpoint.
Defaults to False.
progress (bool, optional): If True, progress bars will be shown to inform the user about the progress made
by the process. Defaults to True.
direct_relation (str, optional): Direct relation used to create features. Defaults to
'http://purl.org/dc/terms/subject'.
hierarchy_relation (str, optional): Hierarchy relation used to connect categories. Defaults to None.
max_hierarchy_depth (int, optional): Maximal number of hierarchy steps taken. Defaults to 1.
prefix_lookup (bool/str/dict, optional): True: Namespaces of prefixes will be looked up at prefix.cc and added
to the SPARQL query.
str: User provides the path to a JSON file with prefixes and namespaces.
dict: User provides a dictionary with prefixes and namespaces.
Defaults to False.
caching (bool, optional): Turn result-caching for queries issued during the execution on or off. Defaults to True.
Returns:
pyspark.sql.DataFrame: DataFrame with additional features.
Notes:
This function generates attributes from a specific direct relation on a Spark DataFrame. It follows these steps:
1. Convert the Spark DataFrame to a Pandas DataFrame.
2. Apply the `specific_relation_generator` function from the kgextension library to the Pandas DataFrame.
3. Convert the resulting Pandas DataFrame back to a Spark DataFrame.
4. Return the Spark DataFrame with the additional features.
"""
from kgextension.generator import specific_relation_generator
spark = SparkSession.builder.getOrCreate()
DataAcquisitionObject=DataAcquisition(spark)
pandasDf=specific_relation_generator(sparkDataFrame.toPandas(), columns, endpoint, uri_data_model, progress, direct_relation, hierarchy_relation, max_hierarchy_depth, prefix_lookup, caching)
sparkDataFrame2= spark.createDataFrame(clean_column_names(DataAcquisitionObject.nullDrop(pandasDf)))
return sparkDataFrame2