
I'm sure this is a simple SQLContext question, but I can't find any answer in the Spark docs or on Stack Overflow.

I want to create a Spark DataFrame from a SQL query on MySQL.

For example, I have a complicated MySQL query like:

SELECT a.X,b.Y,c.Z FROM FOO as a JOIN BAR as b ON ... JOIN ZOT as c ON ... WHERE ...

and I want a DataFrame with columns X, Y, and Z.

I figured out how to load entire tables into Spark, so I could load them all and then do the joining and selection there. However, that is very inefficient. I just want to load the table generated by my SQL query.

Here is my current approximation of the code, which doesn't work. MySQL Connector has an option "dbtable" that can be used to load a whole table. I am hoping there is some way to specify a query instead:

  val df = sqlContext.format("jdbc").
    option("url", "jdbc:mysql://localhost:3306/local_content").
    option("driver", "com.mysql.jdbc.Driver").
    option("useUnicode", "true").
    option("continueBatchOnError","true").
    option("useSSL", "false").
    option("user", "root").
    option("password", "").
    sql(
"""
select dl.DialogLineID, dlwim.Sequence, wi.WordRootID from Dialog as d
join DialogLine as dl on dl.DialogID=d.DialogID
join DialogLineWordInstanceMatch as dlwim on dlwim.DialogLineID=dl.DialogLineID
join WordInstance as wi on wi.WordInstanceID=dlwim.WordInstanceID
join WordRoot as wr on wr.WordRootID=wi.WordRootID
where d.InSite=1 and dl.Active=1
limit 100
"""
    ).load()

5 Answers


I found this here: Bulk data migration through Spark SQL.

The dbtable parameter can be any query wrapped in parentheses with an alias. So in my case, I need to do this:

val query = """
  (select dl.DialogLineID, dlwim.Sequence, wi.WordRootID from Dialog as d
    join DialogLine as dl on dl.DialogID=d.DialogID
    join DialogLineWordInstanceMatch as dlwim on dlwim.DialogLineID=dl.DialogLineID
    join WordInstance as wi on wi.WordInstanceID=dlwim.WordInstanceID
    join WordRoot as wr on wr.WordRootID=wi.WordRootID
    where d.InSite=1 and dl.Active=1
    limit 100) foo
"""

val df = sqlContext.read.format("jdbc").
  option("url", "jdbc:mysql://localhost:3306/local_content").
  option("driver", "com.mysql.jdbc.Driver").
  option("useUnicode", "true").
  option("continueBatchOnError","true").
  option("useSSL", "false").
  option("user", "root").
  option("password", "").
  option("dbtable",query).
  load()

As expected, loading each table as its own Dataframe and joining them in Spark was very inefficient.
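The wrapping step above can be factored into a small helper. This is a plain-Scala sketch; the helper name asSubquery is my own, not part of any Spark API:

```scala
// Hypothetical helper: wrap an arbitrary SQL query so it fits the JDBC
// "dbtable" option, which expects either a table name or a parenthesized
// subquery with an alias. Strips any trailing semicolon, which would be
// a syntax error inside a subquery.
def asSubquery(query: String, alias: String = "subq"): String =
  s"(${query.trim.stripSuffix(";")}) AS $alias"
```

For example, `asSubquery("select 1;")` yields `(select 1) AS subq`, which can be passed straight to `option("dbtable", ...)`.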




If you have your table already registered in your SQLContext, you can simply use the sql method:

val resultDF = sqlContext.sql("SELECT a.X,b.Y,c.Z FROM FOO as a JOIN BAR as b ON ... JOIN ZOT as c ON ... WHERE ...")

3 Comments

By the way, this is not the gist of your question, so I add it as a comment. If you are looking for how to achieve the previous steps (connecting to MySQL, etc.), you can check this post: Spark + MySQL example.
Thanks. I already found how to load an entire table into Spark. However, my problem is that I have a complicated query joining many large tables and selecting only a few columns. I was hoping to create a single simple DataFrame with only the selected columns.
How to register the table with SQLContext?
1

TL;DR: just create a view in your database.

Detail: I have a table t_city in my postgres database, on which I create a view:

create view v_city_3500 as
    select asciiname, country, population, elevation
    from t_city
    where elevation>3500
    and population>100000

select * from v_city_3500;

 asciiname | country | population | elevation
-----------+---------+------------+-----------
 Potosi    | BO      |     141251 |      3967
 Oruro     | BO      |     208684 |      3936
 La Paz    | BO      |     812799 |      3782
 Lhasa     | CN      |     118721 |      3651
 Puno      | PE      |     116552 |      3825
 Juliaca   | PE      |     245675 |      3834

In the spark-shell:

val sx = new org.apache.spark.sql.SQLContext(sc)

val props = new java.util.Properties()
props.setProperty("driver", "org.postgresql.Driver")
val url = "jdbc:postgresql://buya/dmn?user=dmn&password=dmn"

val city_df = sx.read.jdbc(url, "t_city", props)
val city_3500_df = sx.read.jdbc(url, "v_city_3500", props)

Result:

city_df.count()
Long = 145725

city_3500_df.count()
Long = 6
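The 6-versus-145725 counts reflect the predicate baked into the view. As a pure-Scala sketch of that predicate, run over a tiny in-memory sample (rows taken from the output above, plus one hypothetical row the view would exclude):

```scala
// Sketch of the filter that v_city_3500 applies, in plain Scala.
case class City(asciiname: String, country: String, population: Int, elevation: Int)

val sample = Seq(
  City("Potosi", "BO", 141251, 3967),
  City("Oruro", "BO", 208684, 3936),
  City("La Paz", "BO", 812799, 3782),
  City("Quito", "EC", 2011388, 2850) // hypothetical extra row: elevation too low, excluded
)

// Same predicate as the view: elevation > 3500 and population > 100000.
val filtered = sample.filter(c => c.elevation > 3500 && c.population > 100000)
```

Here `filtered` keeps the first three rows and drops Quito, mirroring what the database does before Spark ever sees the data.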

1 Comment

With this approach, the responsibility of filtering the rows is not in the application anymore, but in the database (separation of concerns). Depending on the use case and ecosystem, it may be an advantage (e.g. if reused in multiple applications) or a disadvantage (e.g. if you only use it once).

To save the output of a query to a new DataFrame, simply set the result equal to a variable:

val newDataFrame = spark.sql("SELECT a.X,b.Y,c.Z FROM FOO as a JOIN BAR as b ON ... JOIN ZOT as c ON ... WHERE ...")

and now newDataFrame is a DataFrame with all the DataFrame functionality available to it.

1 Comment

Thanks, but I need to join many tables in SQL and select only a few columns. I don't want to load every table into Spark. Can I create a SQL table to load via a query? See the details I added to my question.

To read/load data from MySQL, do something like below:

val conf = new SparkConf().setAppName("SparkMe Application").setMaster("local[2]")
val sc = new SparkContext(conf)
sc.setLogLevel("ERROR")
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val jdbcDF = sqlContext.read.format("jdbc").options(
  Map("url" -> "jdbc:mysql://<host>:3306/corbonJDBC?user=user&password=password",
      "dbtable" -> "TABLE_NAME")).load()

To write data to a table, do as below:

import java.util.Properties

val prop = new Properties()
prop.put("user", "<>")
prop.put("password", "simple$123")
val dfWriter = jdbcDF.write.mode("append")
dfWriter.jdbc("jdbc:mysql://<host>:3306/corbonJDBC?user=user&password=password", "tableName", prop)
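Note that the URL above embeds user and password as query parameters while the Properties object carries them again. As a plain-Scala sketch, such a URL can be assembled from its parts (the helper name mysqlUrl is my own, not a Spark or JDBC API):

```scala
// Hypothetical helper: assemble a MySQL JDBC URL, with optional
// connection settings passed as query parameters.
def mysqlUrl(host: String, port: Int, db: String,
             params: Map[String, String] = Map.empty): String = {
  val qs = params.map { case (k, v) => s"$k=$v" }.mkString("&")
  s"jdbc:mysql://$host:$port/$db" + (if (qs.isEmpty) "" else s"?$qs")
}
```

In practice it is cleaner to keep credentials only in the Properties object rather than duplicating them in the URL.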

To create a DataFrame from a query, do something like below:

val finalModelDataDF = {
  val query = "select * from table_name"
  sqlContext.sql(query)
}

finalModelDataDF.show()

