import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}
import org.apache.spark.sql.functions._
import scala.collection.mutable.WrappedArray
import org.apache.spark.sql.DataFrame
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.{RandomForestClassificationModel, RandomForestClassifier}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.FeatureHasher
object LinkPrediction {

  // Unsupervised measures for link prediction
  def linkPredictionMeasures(dataset: DataFrame, adjacency_list: DataFrame): DataFrame = {
    // adding source node neighbors to the dataset
    val df1 = dataset.join(adjacency_list, dataset.col("source_node") === adjacency_list.col("source"))
      .select("source_node", "destination_node", "neighbors", "label")
      .toDF("source_node", "destination_node", "src_neighbors", "label")

    // adding destination node neighbors to the dataset
    val df2 = df1.join(adjacency_list, df1.col("destination_node") === adjacency_list.col("source"))
      .select("source_node", "destination_node", "src_neighbors", "neighbors", "label")
      .toDF("source_node", "destination_node", "src_neighbors", "dest_neighbors", "label")

    // Common neighbors count measure
    val common_neighbors_count = udf { (set1: WrappedArray[Int], set2: WrappedArray[Int]) =>
      set1.toList.intersect(set2.toList).size
    }
    val common_neighbors_df = df2.withColumn("common_neighbors_count", common_neighbors_count($"src_neighbors", $"dest_neighbors"))

    // Jaccard coefficient measure
    val jaccard_coefficient = udf { (set1: WrappedArray[Int], set2: WrappedArray[Int]) =>
      set1.toList.intersect(set2.toList).size.toDouble / set1.toList.union(set2.toList).distinct.size.toDouble
    }
    val jaccard_coefficient_df = common_neighbors_df.withColumn("jaccard_coefficient", jaccard_coefficient($"src_neighbors", $"dest_neighbors"))

    // Preferential attachment score
    val preferential_attachment_score = udf { (set1: WrappedArray[Int], set2: WrappedArray[Int]) =>
      set1.toList.size * set2.toList.size
    }
    val pref_attach_score_df = jaccard_coefficient_df.withColumn("pref_attach_score", preferential_attachment_score($"src_neighbors", $"dest_neighbors"))
    // pref_attach_score_df.sort($"pref_attach_score".desc).show(10, false)

    // Adamic score and Resource Allocation Index
    val common_neighbors = udf { (set1: WrappedArray[Int], set2: WrappedArray[Int]) =>
      set1.toList.intersect(set2.toList)
    }
    // Appending the common neighbors column to the dataframe
    val df3 = pref_attach_score_df.withColumn("common_neighbors", common_neighbors($"src_neighbors", $"dest_neighbors"))
      .select("source_node", "destination_node", "common_neighbors")
    val common_neighbors_explode_df = df3.withColumn("common_neighbors_explode", explode($"common_neighbors"))
      .select("source_node", "destination_node", "common_neighbors_explode")
    val neighbors_count_df = adjacency_list.withColumn("degree", size($"neighbors"))
    val df4 = common_neighbors_explode_df.join(neighbors_count_df, common_neighbors_explode_df.col("common_neighbors_explode") === neighbors_count_df.col("source"))
      .select("source_node", "destination_node", "common_neighbors_explode", "degree")

    // expression for the Adamic score
    val exp1 = "sum(1.0/log10(degree))"
    // expression for the Resource Allocation Index
    val exp2 = "sum(1.0/degree)"
    val adamic_and_resource_allocation_df = df4.groupBy($"source_node", $"destination_node")
      .agg(expr(exp1).as("adamic_score"), expr(exp2).as("resource_allocation_index"))
      .select("source_node", "destination_node", "adamic_score", "resource_allocation_index")
      .toDF("source", "destination", "adamic_score", "resource_allocation_index")

    // Neighborhood distance measure
    val neighborhood_distance_score = udf { (common_neighbor_count: Double, pref_attach_score: Double) =>
      common_neighbor_count / math.sqrt(pref_attach_score)
    }
    val neighborhood_distance_score_df = pref_attach_score_df.withColumn("neighborhood_distance_score", neighborhood_distance_score($"common_neighbors_count", $"pref_attach_score"))

    // Source node degree measure
    val source_node_degree_df = neighborhood_distance_score_df.withColumn("source_node_degree", size($"src_neighbors"))

    val link_prediction_measures = source_node_degree_df.join(adamic_and_resource_allocation_df,
        source_node_degree_df("source_node") === adamic_and_resource_allocation_df("source") &&
        source_node_degree_df("destination_node") === adamic_and_resource_allocation_df("destination"))
      .select("source_node", "destination_node", "common_neighbors_count", "jaccard_coefficient", "pref_attach_score", "adamic_score", "resource_allocation_index", "neighborhood_distance_score", "source_node_degree", "label")
    link_prediction_measures
  }

  def main(args: Array[String]): Unit = {
    // Reading the dataset of the DBLP graph
    val graphSchema = new StructType()
      .add("source_node", IntegerType, true)
      .add("destination_node", IntegerType, true)
    val dblp_graph = spark.read.option("header", "true").option("sep", "\t").option("inferSchema", "true").schema(graphSchema).csv("/FileStore/tables/com_dblp_ungraph-6a67b.txt")
    val reversed_dblp_graph = dblp_graph.select("destination_node", "source_node").toDF("source_node", "destination_node")
    val dataset = dblp_graph.union(reversed_dblp_graph)
    dataset.cache()

    // Total edges
    val edges_count = dataset.count()
    println("Total number of edges are " + edges_count)
    // Total nodes
    println("Total number of nodes are " + dataset.select("source_node").distinct.count())

    val adj_list = dataset.groupBy("source_node").agg(collect_list("destination_node") as "neighbors").toDF("source", "neighbors")

    // Getting all 2-length missing edges
    val dataset1 = dataset.toDF("source", "destination")
    val graph_missing_edges = dataset.join(dataset1, dataset.col("destination_node") === dataset1.col("source")).select("source_node", "destination")
    // filter self edges
    val df11 = graph_missing_edges.filter($"source_node" =!= $"destination").toDF("source_node", "destination_node")
    // filter existing edges
    val df12 = df11.except(dataset)

    // Train and test split
    val train_split: Double = 0.8
    val test_split: Double = 0.2
    val dataset_with_label = df11.intersect(dblp_graph).withColumn("label", lit(1))
    val Array(train_edges, test_edges) = dataset_with_label.randomSplit(Array(train_split, test_split))
    println("dataset with label count " + dataset_with_label.count())
    val limit_missing_edges_count = dataset_with_label.count()

    // Limit the missing edges to the count of the existing edges
    val missing_edges = df12.limit(limit_missing_edges_count.toInt)
    val missing_edges_count = missing_edges.count()
    println("Total number of missing edges are " + missing_edges_count)
    val missing_edges_with_label = missing_edges.withColumn("label", lit(0))
    val Array(missing_train_edges, missing_test_edges) = missing_edges_with_label.randomSplit(Array(train_split, test_split))

    val train_df = missing_train_edges.union(train_edges)
    val test_df = missing_test_edges.union(test_edges)
    println("training set length " + train_df.count())
    println("testing set length " + test_df.count())

    val train_with_measures_df = linkPredictionMeasures(train_df, adj_list)
    val test_with_measures_df = linkPredictionMeasures(test_df, adj_list)

    // Training the above dataset with a random forest classifier
    val hasher = new FeatureHasher()
      .setInputCols(Array("resource_allocation_index", "common_neighbors_count", "adamic_score", "neighborhood_distance_score", "pref_attach_score", "jaccard_coefficient", "source_node_degree"))
      .setOutputCol("features")
    val trainingData = hasher.transform(train_with_measures_df)
    val testData = hasher.transform(test_with_measures_df)

    // Train a RandomForest model
    val rf = new RandomForestClassifier()
      .setLabelCol("label")
      .setFeaturesCol("features")
      .setNumTrees(5)
    val model = rf.fit(trainingData)

    // Make predictions
    val predictions = model.transform(testData)

    // Select example rows to display
    predictions.select("prediction", "label", "features").show(5)

    // Select (prediction, true label) and compute the test error
    val evaluator = new MulticlassClassificationEvaluator()
      .setLabelCol("label")
      .setPredictionCol("prediction")
      .setMetricName("accuracy")
    val accuracy = evaluator.evaluate(predictions)
    println(s"Test Error = ${(1.0 - accuracy)}")
  }
}
defined object LinkPrediction
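To make the measures above concrete, here is a minimal sketch that is not part of the original notebook: it runs linkPredictionMeasures on a small made-up toy graph. The node ids, edges and labels below are invented purely for illustration, and it assumes the usual Databricks/spark-shell environment where spark and spark.implicits._ are in scope.

// toy undirected graph 1-2, 1-3, 2-3, 2-4, 3-4, stored in both directions like the DBLP edge list
val toy_graph = Seq((1, 2), (2, 1), (1, 3), (3, 1), (2, 3), (3, 2), (2, 4), (4, 2), (3, 4), (4, 3)).toDF("source_node", "destination_node")
// adjacency list in the shape expected by linkPredictionMeasures: (source, neighbors)
val toy_adj_list = toy_graph.groupBy("source_node").agg(collect_list("destination_node") as "neighbors").toDF("source", "neighbors")
// candidate pairs to score, with the label column the function expects
val toy_pairs = Seq((1, 2, 1), (1, 4, 0), (2, 3, 1)).toDF("source_node", "destination_node", "label")
LinkPrediction.linkPredictionMeasures(toy_pairs, toy_adj_list).show(false)

For the pair (1, 4), for example, the common neighbors are 2 and 3, so common_neighbors_count is 2, pref_attach_score is 2 * 2 = 4, and neighborhood_distance_score is 2 / sqrt(4) = 1.0.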
// Reading the dataset of the DBLP graph
val graphSchema = new StructType()
  .add("source_node", IntegerType, true)
  .add("destination_node", IntegerType, true)
val dblp_graph = spark.read.option("header", "true").option("sep", "\t").option("inferSchema", "true").schema(graphSchema).csv("/FileStore/tables/com_dblp_ungraph-6a67b.txt")
// add the reversed edges so the undirected graph is stored in both directions
val reversed_dblp_graph = dblp_graph.select("destination_node", "source_node").toDF("source_node", "destination_node")
val train_df = dblp_graph.union(reversed_dblp_graph)
train_df.cache()
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}
graphSchema: org.apache.spark.sql.types.StructType = StructType(StructField(source_node,IntegerType,true), StructField(destination_node,IntegerType,true))
dblp_graph: org.apache.spark.sql.DataFrame = [source_node: int, destination_node: int]
reversed_dblp_graph: org.apache.spark.sql.DataFrame = [source_node: int, destination_node: int]
train_df: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [source_node: int, destination_node: int]
res61: train_df.type = [source_node: int, destination_node: int]
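As a quick sanity check on the symmetrized edge list, the same counts printed by LinkPrediction.main can be reproduced here (a sketch; after the union each undirected edge appears once in each direction):

println("Total number of edges are " + train_df.count())
println("Total number of nodes are " + train_df.select("source_node").distinct.count())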
// preview of the dataset
// reversed_dblp_graph.show(10, false)
train_df.show(10, false)
+-----------+----------------+
|source_node|destination_node|
+-----------+----------------+
|0 |1 |
|0 |2 |
|0 |4519 |
|0 |23073 |
|0 |33043 |
|0 |33971 |
|0 |75503 |
|0 |101215 |
|0 |120044 |
|0 |123880 |
+-----------+----------------+
only showing top 10 rows
// convert the edge DataFrame to an RDD of (source, destination) pairs; both columns are ints
var graph = train_df.rdd
val graphFinal = graph.map(x => (x.getInt(0), x.getInt(1)))
graph: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[21] at rdd at command-804872739992427:1
graphFinal: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[22] at map at command-804872739992427:2
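The per-Row field extraction in the cell above can also be done through a typed Dataset; a sketch of that alternative (assuming spark.implicits._ is in scope, as it is in a Databricks notebook):

// select the two int columns as a typed Dataset[(Int, Int)] and take its RDD
val graphPairs = train_df.select($"source_node".as[Int], $"destination_node".as[Int]).rdd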
// self-join on destination_node == source to get 2-hop paths (a -> b -> c), i.e. candidate edges a -> c
val train_df1 = train_df.toDF("source", "destination")
val graph_missing_edges = train_df.join(train_df1, train_df.col("destination_node") === train_df1.col("source")).select("source_node", "destination")
train_df1: org.apache.spark.sql.DataFrame = [source: int, destination: int]
graph_missing_edges: org.apache.spark.sql.DataFrame = [source_node: int, destination: int]
// filter self edges
val df11 = graph_missing_edges.filter($"source_node" =!= $"destination").toDF("source_node", "destination_node")
// filter existing edges
val df12 = df11.except(train_df)
// limit the candidates to the number of edges present
// (edges_count is assumed to have been computed earlier in the notebook, e.g. as train_df.count())
val df2 = df12.limit(edges_count.toInt)
df11: org.apache.spark.sql.DataFrame = [source_node: int, destination_node: int]
df12: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [source_node: int, destination_node: int]
df2: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [source_node: int, destination_node: int]
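A small sanity check sketch on the candidate edges built above (only names already defined in this notebook are used): both counts should be zero if the self-edge and existing-edge filters did their job.

println("self loops left: " + df2.filter($"source_node" === $"destination_node").count())
println("existing edges left: " + df2.intersect(train_df).count())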
df2.show(10, false)
+-----------+----------------+
|source_node|destination_node|
+-----------+----------------+
|750 |44583 |
|1111 |22957 |
|1945 |72776 |
|3248 |246813 |
|3305 |32682 |
|3412 |367506 |
|4069 |23067 |
|4799 |350297 |
|4941 |1886 |
|6473 |126974 |
+-----------+----------------+
only showing top 10 rows