LinkPrediction(Scala)
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType};
import org.apache.spark.sql.functions._
import scala.collection.mutable.WrappedArray
import org.apache.spark.sql.DataFrame
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.{RandomForestClassificationModel, RandomForestClassifier}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.FeatureHasher
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType} import org.apache.spark.sql.functions._ import scala.collection.mutable.WrappedArray import org.apache.spark.sql.DataFrame import org.apache.spark.ml.Pipeline import org.apache.spark.ml.classification.{RandomForestClassificationModel, RandomForestClassifier} import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator import org.apache.spark.ml.feature.FeatureHasher
object LinkPrediction {
  // Unsupervised methods for link prediction
  def linkPredictionMeasures(dataset: DataFrame, adjacency_list: DataFrame): DataFrame = {
         // adding source node neighbors to the dataset
       val df1 = dataset.join(adjacency_list, dataset.col("source_node") === adjacency_list.col("source")).select("source_node", "destination_node", "neighbors", "label").toDF("source_node", "destination_node", "src_neighbors", "label")
        
        // adding destination node neighbors to the dataset
       val df2 = df1.join(adjacency_list, df1.col("destination_node") === adjacency_list.col("source")).select("source_node", "destination_node", "src_neighbors", "neighbors", "label").toDF("source_node", "destination_node", "src_neighbors", "dest_neighbors", "label")
        
        
       // Common neighbors count measure
       val common_neighbors_count = udf { 
         (Set1: WrappedArray[Int], Set2: WrappedArray[Int]) => 
           (Set1.toList.intersect(Set2.toList)).size}
      
       val common_neighbors_df = df2.withColumn("common_neighbors_count", common_neighbors_count($"src_neighbors", $"dest_neighbors"))
    
      // Jaccard coefficient measure
      val jaccard_coefficient = udf { 
       (Set1: WrappedArray[Int], Set2: WrappedArray[Int]) => 
         (Set1.toList.intersect(Set2.toList)).size.toDouble/(Set1.toList.union(Set2.toList)).distinct.size.toDouble}
      
      val jaccard_coefficient_df = common_neighbors_df.withColumn("jaccard_coefficient", jaccard_coefficient($"src_neighbors", $"dest_neighbors"))
    
      // preferential attachment score
      val preferential_attachment_score = udf { 
       (Set1: WrappedArray[Int], Set2: WrappedArray[Int]) => 
         (Set1.toList.size*Set2.toList.size) }
      
      val pref_attach_score_df = jaccard_coefficient_df.withColumn("pref_attach_score", preferential_attachment_score($"src_neighbors", $"dest_neighbors"))
//       pref_attach_score_df.sort($"pref_attach_score".desc).show(10, false)
          
      // Adamic score and Resource Allocation Index
      val common_neighbors = udf { 
       (Set1: WrappedArray[String], Set2: WrappedArray[String]) => 
         (Set1.toList.intersect(Set2.toList)) }
    
      // Appending common neighbors column to the dataframe
      val df3 = pref_attach_score_df.withColumn("common_neighbors", common_neighbors($"src_neighbors", $"dest_neighbors")).select("source_node", "destination_node", "common_neighbors")
    
      val common_neighbors_explode_df = df3.withColumn("common_neighbors_explode", explode($"common_neighbors")).select("source_node", "destination_node", "common_neighbors_explode")
    
      val neighbors_count_df = adjacency_list.withColumn("degree", size($"neighbors"))
    
      val df4 = common_neighbors_explode_df.join(neighbors_count_df, common_neighbors_explode_df.col("common_neighbors_explode") === neighbors_count_df.col("source")).select("source_node", "destination_node", "common_neighbors_explode", "degree")
      
      // expression for adamic score
      val exp1 = "sum(1.0/log10(degree))"
      // expression for resource allocation index
      val exp2 = "sum(1.0/degree)"
      
      val adamic_and_resource_allocation_df = df4.groupBy($"source_node", $"destination_node").agg(expr(exp1).as("adamic_score"), expr(exp2).as("resource_allocation_index")).select("source_node", "destination_node", "adamic_score", "resource_allocation_index").toDF("source", "destination", "adamic_score", "resource_allocation_index")
    
      // Neighborhood distance measure
      val neighborhood_distance_score = udf { 
       (common_neighbor_count: Double, pref_attach_score: Double) => 
         common_neighbor_count/math.sqrt(pref_attach_score) }
    
      val neighborhood_distance_score_df = pref_attach_score_df.withColumn("neighborhood_distance_score", neighborhood_distance_score($"common_neighbors_count", $"pref_attach_score"))
    
      // Source node degree measure
      val source_node_degree_df = neighborhood_distance_score_df.withColumn("source_node_degree", size($"src_neighbors"))
    
      val link_prediction_measures = source_node_degree_df.join(adamic_and_resource_allocation_df, source_node_degree_df("source_node") === adamic_and_resource_allocation_df("source") && source_node_degree_df("destination_node") === adamic_and_resource_allocation_df("destination")).select("source_node", "destination_node", "common_neighbors_count", "jaccard_coefficient", "pref_attach_score", "adamic_score", "resource_allocation_index", "neighborhood_distance_score", "source_node_degree", "label")
      link_prediction_measures
    }
  
  
  def main(args: Array[String]): Unit = {
      // Reading the dataset of the DBLP graph
      val graphSchema = new StructType()
         .add("source_node",IntegerType,true)
         .add("destination_node",IntegerType,true)
      val dblp_graph = spark.read.option("header", "true").option("sep", "\t").option("inferSchema","true").schema(graphSchema).csv("/FileStore/tables/com_dblp_ungraph-6a67b.txt")
      val reversed_dblp_graph = dblp_graph.select("destination_node", "source_node").toDF("source_node", "destination_node")
      val dataset = dblp_graph.union(reversed_dblp_graph)
      dataset.cache()
      
      // Total edges
      val edges_count = dataset.count()
      println("Total number of edges are "+edges_count)
      
      // Total nodes
      println("Total number of nodes are "+dataset.select("source_node").distinct.count())
    
      val adj_list = dataset.groupBy("source_node").agg(collect_list("destination_node") as "neighbors").toDF("source", "neighbors")
          
      // Getting all 2 length missing edges 
      val dataset1 = dataset.toDF("source", "destination")
      val graph_missing_edges = dataset.join(dataset1, dataset.col("destination_node") === dataset1.col("source")).select("source_node", "destination")
    
      // filter self edges
      val df11 = graph_missing_edges.filter($"source_node" =!= $"destination").toDF("source_node", "destination_node")
      // filter existing edges
      val df12 = df11.except(dataset)
      
      // Train and Test Split
      val train_split: Double = 0.8
      val test_split: Double = 0.2
    
      val dataset_with_label = df11.intersect(dblp_graph).withColumn("label", lit(1))
      val Array(train_edges, test_edges) = dataset_with_label.randomSplit(Array(train_split, test_split))
      println("dataset with label count "+dataset_with_label.count())
    
      val limit_missing_edges_count = dataset_with_label.count()
      
      // Limit the missing edges to the count of the existing edges
      val missing_edges = df12.limit(limit_missing_edges_count.toInt)
      val missing_edges_count = missing_edges.count()
      println("Total number of missing edges are "+missing_edges_count)
      
      val missing_edges_with_label = missing_edges.withColumn("label", lit(0))
      val Array(missing_train_edges, missing_test_edges) = missing_edges_with_label.randomSplit(Array(train_split, test_split))
      
      val train_df = missing_train_edges.union(train_edges)
      val test_df = missing_test_edges.union(test_edges)
      println("training set length "+train_df.count())
      println("testing set length "+test_df.count())
      val train_with_measures_df = linkPredictionMeasures(train_df, adj_list)
      val test_with_mesaures_df = linkPredictionMeasures(test_df, adj_list)
      
      // Training the above dataset with random forest classifier
      val hasher = new FeatureHasher().setInputCols(Array("resource_allocation_index","common_neighbors_count","adamic_score","neighborhood_distance_score","pref_attach_score","jaccard_coefficient","source_node_degree")).setOutputCol("features")
      val trainingData = hasher.transform(train_with_measures_df)
      val testData = hasher.transform(test_with_mesaures_df)
    
      // Train a RandomForest model.
      val rf = new RandomForestClassifier()
        .setLabelCol("label")
        .setFeaturesCol("features")
        .setNumTrees(5)

      // Train model. This also runs the indexers.
      val model = rf.fit(trainingData)
    
      // Make predictions
      val predictions = model.transform(testData)

      // Select example rows to display.
      predictions.select("prediction", "label", "features").show(5)
    
      // Select (prediction, true label) and compute test error.
      val evaluator = new MulticlassClassificationEvaluator()
        .setLabelCol("label")
        .setPredictionCol("prediction")
        .setMetricName("accuracy")
      val accuracy = evaluator.evaluate(predictions)
      println(s"Test Error = ${(1.0 - accuracy)}")
  }
}
defined object LinkPrediction
val emptyArray =  Array.empty[String]
LinkPrediction.main(emptyArray);
Total number of edges are 2099732 Total number of nodes are 317080 dataset with label count 976724 Total number of missing edges are 976724 training set length 1563750 testing set length 389698
org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 628.0 failed 1 times, most recent failure: Lost task 4.0 in stage 628.0 (TID 36135, localhost, executor driver): java.lang.OutOfMemoryError: Java heap space
// Reading the training dataset of facebook graph
val graphSchema = new StructType()
  .add("source_node",IntegerType,true)
  .add("destination_node",IntegerType,true)
val dblp_graph = spark.read.option("header", "true").option("sep", "\t").option("inferSchema","true").schema(graphSchema).csv("/FileStore/tables/com_dblp_ungraph-6a67b.txt")
val reversed_dblp_graph = dblp_graph.select("destination_node", "source_node").toDF("source_node", "destination_node")
val train_df = dblp_graph.union(reversed_dblp_graph)
train_df.cache()
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType} graphSchema: org.apache.spark.sql.types.StructType = StructType(StructField(source_node,IntegerType,true), StructField(destination_node,IntegerType,true)) dblp_graph: org.apache.spark.sql.DataFrame = [source_node: int, destination_node: int] reversed_dblp_graph: org.apache.spark.sql.DataFrame = [source_node: int, destination_node: int] train_df: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [source_node: int, destination_node: int] res61: train_df.type = [source_node: int, destination_node: int]
// length of the dataset
// reversed_dblp_graph.show(10, false)
train_df.show(10, false)
+-----------+----------------+ |source_node|destination_node| +-----------+----------------+ |0 |1 | |0 |2 | |0 |4519 | |0 |23073 | |0 |33043 | |0 |33971 | |0 |75503 | |0 |101215 | |0 |120044 | |0 |123880 | +-----------+----------------+ only showing top 10 rows
val edges_count = train_df.count()
println("Total number of edges present are : "+edges_count)
Total number of edges present are : 2099732 edges_count: Long = 2099732
// Number of vertices
train_df.select("source_node").distinct.count()
res74: Long = 317080
train_df.filter($"source_node" === 1 && $"destination_node" === 0).count()
res66: Long = 1
var graph = train_df.rdd
val graphFinal = graph.map(x=>(x(0).asInstanceOf[String],x(1).asInstanceOf[String]))
graph: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[21] at rdd at command-804872739992427:1 graphFinal: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[22] at map at command-804872739992427:2
graphFinal.take(10)
res3: Array[(String, String)] = Array((1,690569), (1,315892), (1,189226), (2,834328), (2,1615927), (2,1194519), (2,470294), (2,961886), (2,626040), (3,176995))
val adj_list = graphFinal.groupByKey()
// adj_list.lookup("1")(0).toList.map(x => println(x))
println(adj_list.lookup("3").length)
1 adj_list: org.apache.spark.rdd.RDD[(String, Iterable[String])] = ShuffledRDD[26] at groupByKey at command-1799595110804121:1
val train_df1 = train_df.toDF("source", "destination")
val graph_missing_edges = train_df.join(train_df1, train_df.col("destination_node") === train_df1.col("source")).select("source_node", "destination")
train_df1: org.apache.spark.sql.DataFrame = [source: int, destination: int] graph_missing_edges: org.apache.spark.sql.DataFrame = [source_node: int, destination: int]
// filter self edges
val df11 = graph_missing_edges.filter($"source_node" =!= $"destination").toDF("source_node", "destination_node")
// filter existing edges
val df12 = df11.except(train_df)
// limit to edges present
val df2 = df12.limit(edges_count.toInt)
df11: org.apache.spark.sql.DataFrame = [source_node: int, destination_node: int] df12: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [source_node: int, destination_node: int] df2: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [source_node: int, destination_node: int]
// Total number of missing edges
df2.count()
res80: Long = 2099732
df2.show(10, false)
+-----------+----------------+ |source_node|destination_node| +-----------+----------------+ |750 |44583 | |1111 |22957 | |1945 |72776 | |3248 |246813 | |3305 |32682 | |3412 |367506 | |4069 |23067 | |4799 |350297 | |4941 |1886 | |6473 |126974 | +-----------+----------------+ only showing top 10 rows