I'm new to Scala and Spark and I need to build a graph from a DataFrame. This is the structure of my DataFrame, where columns S and O hold the nodes and column P holds the edges.

+---------------------------+---------------------+----------------------------+
|S                          |P                    |O                           |
+---------------------------+---------------------+----------------------------+
|http://website/Jimmy_Carter|http://web/name      |James Earl Carter           |
|http://website/Jimmy_Car   |http://web/country   |http://website/United_States|
|http://website/Jimmy_Car   |http://web/birthPlace|http://web/Georgia_(US)     |
+---------------------------+---------------------+----------------------------+

This is the code that builds the DataFrame. I want to create a graph from the DataFrame "dfA".

 import scala.util.Try
 import org.apache.spark.sql.functions.regexp_extract

 val test = sc
     .textFile("testfile.ttl")
     .map(_.split(" "))
     .map(p => Triple(Try(p(0).toString()).toOption,
                      Try(p(1).toString()).toOption,
                      Try(p(2).toString()).toOption))
     .toDF()

  val url_regex = """^(?:"|<{1}\s?)(.*)(?:>(?:\s\.)?|,\s.*)$"""
  val dfA = test
      .withColumn("Subject", regexp_extract($"Subject", url_regex, 1))
      .withColumn("Predicate", regexp_extract($"Predicate", url_regex, 1))
      .withColumn("Object", regexp_extract($"Object", url_regex, 1))
  • Have you had a chance to take a look at the answer? Is it solving your problem? Commented Apr 9, 2019 at 15:49
  • Hi, yes, the answer solved the problem, but I changed ' val edges : RDD[Edge(VertexId, VertexId, String)] ' to ' val edges : RDD[Edge[String]] '. Thanks a lot! Commented Apr 9, 2019 at 17:02
  • And sorry for the late reply, I had issues and I wasn't able to verify the code. I did it today. Commented Apr 9, 2019 at 17:13

1 Answer

To create a GraphX graph, you need to extract the vertices from your dataframe and associate them with IDs. Then you need to extract the edges (pairs of vertices plus metadata) using these IDs. All of that needs to be in RDDs, not dataframes.

In other words, you need an RDD[(VertexId, X)] for the vertices and an RDD[Edge[Y]] for the edges, where X is the vertex metadata and Y the edge metadata. Each Edge holds its source VertexId, its destination VertexId, and an attribute of type Y. Note that VertexId is just an alias for Long.
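To make the ID assignment concrete before bringing Spark into it, here is a minimal sketch of the same two steps on plain Scala collections. The sample triples come from the table in the question; the names `triples`, `nodeIds`, and `edgeTuples` are made up for illustration and are not part of GraphX.

```scala
// Sample (subject, predicate, object) triples copied from the question's table.
val triples: Seq[(String, String, String)] = Seq(
  ("http://website/Jimmy_Carter", "http://web/name", "James Earl Carter"),
  ("http://website/Jimmy_Car", "http://web/country", "http://website/United_States"),
  ("http://website/Jimmy_Car", "http://web/birthPlace", "http://web/Georgia_(US)")
)

// Step 1: collect every distinct node (subjects and objects) and give each a Long ID.
// `distinct` keeps the first occurrence, so IDs follow the order of appearance.
val nodeIds: Map[String, Long] = triples
  .flatMap { case (s, _, o) => Seq(s, o) }
  .distinct
  .zipWithIndex
  .map { case (node, idx) => node -> idx.toLong }
  .toMap

// Step 2: rewrite each triple as (sourceId, destinationId, edgeLabel).
val edgeTuples: Seq[(Long, Long, String)] = triples.map {
  case (s, p, o) => (nodeIds(s), nodeIds(o), p)
}
```

The GraphX version does the same thing, except that the ID lookup becomes a join because the data lives in distributed dataframes rather than in a local Map.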

In your case, with "S" and "O" the vertex columns and "P" the edge column, it would go as follows.

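import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.functions.{array, explode}
// Outside spark-shell you also need `import spark.implicits._` for 'S and toDF.
// `df` is assumed to be your dataframe with columns S, P and O.

// Let's create the vertex RDD.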
val vertices : RDD[(VertexId, String)] = df
    .select(explode(array('S, 'O))) // S and O are the vertices
    .distinct // we remove duplicates
    .rdd.map(_.getAs[String](0)) // transform to RDD
    .zipWithIndex // associate a long index to each vertex
    .map(_.swap)

// Now let's define a vertex dataframe because joins are clearer in sparkSQL
val vertexDf = vertices.toDF("id", "node")

// And let's extract the edges and join their vertices with their respective IDs
val edges : RDD[Edge[String]] = df
    .join(vertexDf, df("S") === vertexDf("node")) // getting the IDs for "S"
    .select('P, 'O, 'id as 'idS)
    .join(vertexDf, df("O") === vertexDf("node")) // getting the IDs for "O"
    .rdd.map(row => // creating the edge using column "P" as metadata 
      Edge(row.getAs[Long]("idS"), row.getAs[Long]("id"), row.getAs[String]("P")))

// And finally
val graph = Graph(vertices, edges)
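
To check that the graph looks right, one option is to print its triplets. The sketch below builds a tiny hand-made graph so it can run on its own; the `sample*` names, the local session settings, and the single sample edge are assumptions for illustration, standing in for the `vertices` and `edges` RDDs computed above.

```scala
import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

// Assumption: a local session for experimentation; in spark-shell, `spark` and `sc` already exist.
val spark = SparkSession.builder().master("local[*]").appName("graphx-sketch").getOrCreate()
val sc = spark.sparkContext

// Hand-built stand-ins for the vertex and edge RDDs.
val sampleVertices: RDD[(VertexId, String)] = sc.parallelize(Seq(
  (0L, "http://website/Jimmy_Car"),
  (1L, "http://website/United_States")
))
val sampleEdges: RDD[Edge[String]] = sc.parallelize(Seq(
  Edge(0L, 1L, "http://web/country")
))
val sampleGraph: Graph[String, String] = Graph(sampleVertices, sampleEdges)

// Each triplet exposes the source vertex attribute, the edge attribute,
// and the destination vertex attribute.
val described: Array[String] = sampleGraph.triplets
  .map(t => s"${t.srcAttr} --[${t.attr}]--> ${t.dstAttr}")
  .collect()

described.foreach(println)
```

With your own data, the same `triplets` call on `graph` should list one line per row of the original dataframe.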

2 Comments

I have a node with multiple metadata fields. Can you help me create the vertices RDD for that case?
At least in Spark 3.4.x, it should be Edge[String] and not Edge[(VertexId, VertexId, String)]
