
I had the following code:

     import org.jooq._
     import org.jooq.impl._
     import org.jooq.impl.DSL._
     import java.sql.DriverManager
     import org.apache.spark.sql.SparkSession
     import org.apache.log4j.receivers.db.dialect.SQLDialect

     val session = SparkSession.builder().getOrCreate()
     var df1 = session.emptyDataFrame
     var df2 = session.emptyDataFrame

     val userName = "user"
     val password = "pass"

     val c = DriverManager.getConnection("jdbc:mysql://blah_blah.com", userName, password)

     df1 = sql(s"select * from $db1_name.$tb1_name")
     df2 = c.prepareStatement(s"select * from $db2_name.$tb2_name")

Then I got the following error:

    found : org.jooq.SQL
    required: org.apache.spark.sql.DataFrame
    (which expands to) 
    org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]
    df1 = sql(s"select * from $db1_name.$tb1_name")
             ^

    found : java.sql.PreparedStatement
    required: org.apache.spark.sql.DataFrame
    (which expands to) 
    org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]
    df2 = c.prepareStatement(s"select * from $db2_name.$tb2_name")
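
The mismatch happens because, with `import org.jooq.impl.DSL._` in scope, the bare `sql(...)` call resolves to jOOQ's `DSL.sql`, which only builds a query fragment; Spark's `sql` lives on the `SparkSession`. A minimal sketch of the distinction (the table names are hypothetical placeholders):

```scala
// The bare sql(...) resolves to jOOQ's DSL.sql, which builds a query
// fragment (org.jooq.SQL), not data. Spark's sql is a SparkSession method.
val query = "select * from some_db.some_table"

// jOOQ (what the code above accidentally called):
//   val fragment: org.jooq.SQL = org.jooq.impl.DSL.sql(query)
// Spark (what was intended -- returns a DataFrame, i.e. Dataset[Row]):
//   val df1 = session.sql(query)
```

Similarly, `c.prepareStatement(...)` returns a `java.sql.PreparedStatement`, which can never be assigned to a `DataFrame` var.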

Then, per the suggestions in the comments, I changed my code to the following:

    val userName = "user"
    val password = "pass"

    val session = SparkSession.builder().getOrCreate()
    var df1 = session.emptyDataFrame
    var df2 = session.emptyDataFrame

    ....
    ....
    df1 = sql(s"select * from $db1_name.$tb1_name")
    df2 = session.read.format("jdbc").
    option("url", "jdbc:mysql://blah_blah.com").
    option("driver", "com.mysql.jdbc.Driver").
    option("useUnicode", "true").
    option("continueBatchOnError","true").
    option("useSSL", "false").
    option("user", userName).
    option("password", password).
    option("dbtable",s"select * from $db2_name.$tb2_name").load()

I am getting the following error:

    The last packet sent successfully to the server was 0 milliseconds 
    ago. The driver has not received any packets from the server.
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at com.mysql.jdbc.Util.handleNewInstance(Util.java:425)
    at com.mysql.jdbc.SQLError.createCommunicationsException(SQLError.java:989)
    at com.mysql.jdbc.MysqlIO.readPacket(MysqlIO.java:632)
    at com.mysql.jdbc.MysqlIO.doHandshake(MysqlIO.java:1016)
    at com.mysql.jdbc.ConnectionImpl.coreConnect(ConnectionImpl.java:2194)
    at com.mysql.jdbc.ConnectionImpl.connectOneTryOnly(ConnectionImpl.java:2225)
    at com.mysql.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:2024)
    at com.mysql.jdbc.ConnectionImpl.<init>(ConnectionImpl.java:779)
    at com.mysql.jdbc.JDBC4Connection.<init>(JDBC4Connection.java:47)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at com.mysql.jdbc.Util.handleNewInstance(Util.java:425)
    at com.mysql.jdbc.ConnectionImpl.getInstance(ConnectionImpl.java:389)
    at com.mysql.jdbc.NonRegisteringDriver.connect(NonRegisteringDriver.java:330)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:63)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:54)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:56)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation.<init>(JDBCRelation.scala:115)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:52)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:341)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:239)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:227)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:164)
    ... 78 elided
    Caused by: java.io.EOFException: Can not read response from server. 
    Expected to read 4 bytes, read 0 bytes before connection was unexpectedly lost.
    at com.mysql.jdbc.MysqlIO.readFully(MysqlIO.java:3011)
    at com.mysql.jdbc.MysqlIO.readPacket(MysqlIO.java:567)
    ... 100 more

Is there any solution or suggestion for these two errors?

I have also tried the PostgreSQL and H2 drivers (e.g. org.postgresql.Driver), but I get similar (though not identical) errors.

Comments:

  • Maybe helpful? That solution uses Spark directly to query the database rather than pulling data over JDBC and then trying to fit it into a DataFrame. Commented Sep 23, 2019 at 20:41
  • I get this error: The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server. Commented Sep 24, 2019 at 3:17
  • spark.sql, where spark means the Spark session. Commented Sep 24, 2019 at 11:54

2 Answers


Your issue is that the Scala compiler has already initialized the vars df1 and df2 as empty DataFrames. You should try to read directly through Spark:

    spark.read.format("jdbc")
      .option("url", jdbcUrl)
      .option("query", "select c1, c2 from t1")
      .load()

For more information, see the Apache Spark JDBC data source documentation:

https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html
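
To make the snippet above concrete, here is a hedged sketch of the same read with the options spelled out. Every connection detail below is a hypothetical placeholder, and the commented-out `load()` assumes a live SparkSession:

```scala
// Sketch: reading with the `query` option; all details are placeholders.
val jdbcUrl = "jdbc:mysql://some.host.name:3306/some_db"
val options = Map(
  "url"      -> jdbcUrl,
  "driver"   -> "com.mysql.jdbc.Driver",
  "user"     -> "someUser",
  "password" -> "somePass",
  // `query` takes a bare SELECT; by contrast, the `dbtable` option expects a
  // table name or an aliased subquery such as "(select c1, c2 from t1) as t",
  // which is why passing a raw SELECT to `dbtable` (as in the question) fails.
  "query"    -> "select c1, c2 from t1"
)
// With a live SparkSession this becomes:
//   val df = spark.read.format("jdbc").options(options).load()
```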


8 Comments

  • I need to have them saved as a DataFrame. I tried your solution, but the output of the queries won't be a DataFrame.
  • By the definition in the Apache Spark library, the code that I wrote returns a sql.DataFrame. Are you sure that you wrote it in the right way? Post your code!
  • I can assign to an empty DataFrame as I was going to, but the connection is refused and I am not sure why: Caused by: java.net.ConnectException: Connection refused (Connection refused). So I do: var df2 = SparkSession.builder().getOrCreate().emptyDataFrame and then df2 = session.read.format("jdbc").option("url", "jdbc:mysql://blah_blah.com").option("driver", "com.mysql.jdbc.Driver").option("useUnicode", "true").option("continueBatchOnError","true").option("useSSL", "false").option("user", userName).option("password", password).option("dbtable", s"select * from $db2_name.$tb2_name").load()
  • @Alan Can you post the full stack trace for the connection refusal and update the code in your post? Are you sure that you are still using the right username and password?
  • Yes, I use the correct username and password. I can even SSH to the server from GCP using the credentials. I updated the question along with the errors that I am seeing.

You can simply get a DataFrame by reading as below. Set your connection details:

    import java.util.Properties

    val jdbcHostname = "some.host.name"
    val jdbcDatabase = "some_db"
    val driver = "com.mysql.cj.jdbc.Driver" // update the driver as needed; in your case it will be `org.postgresql.Driver`
    // URL to the DB
    val jdbcUrl = s"jdbc:mysql://$jdbcHostname:3306/$jdbcDatabase"
    val username = "someUser"
    val password = "somePass"

    // create a properties map for your DB connection
    val connectionProperties = new Properties()

    connectionProperties.put("user", username)
    connectionProperties.put("password", password)
    connectionProperties.setProperty("driver", driver)

and then read from JDBC as:

    // use the URL and connection properties created above to fetch data
    val tableName = "some-table"
    val mytable = spark.read.jdbc(jdbcUrl, tableName, connectionProperties)

Spark automatically reads the schema from the database table and maps its types back to Spark SQL types.

You can use the mytable DataFrame above to run your queries or save the data.

Say you want to select some of the columns and then save the result:

    // your select query
    val selectedDF = mytable.select("c1", "c2")
    // now you can save the above DataFrame
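
The save step itself is not shown above; here is a minimal sketch, assuming the selectedDF, jdbcUrl, and connectionProperties values from earlier (the output path and target table name are hypothetical):

```scala
// Hypothetical destination for the selected columns
val outputPath = "/tmp/t1_c1_c2"
// With a live SparkSession, either write to files:
//   selectedDF.write.mode("overwrite").parquet(outputPath)
// or append back into the database:
//   selectedDF.write.mode("append").jdbc(jdbcUrl, "t1_copy", connectionProperties)
```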

6 Comments

  • Thanks for your detailed answer. I get this error though: java.lang.ClassNotFoundException: com.mysql.cj.jdbc.Driver
  • @Alan You can download the MySQL connector and add it to the classpath. You can even add this dependency to your project: <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>8.0.11</version>
  • @Alan I just noticed you need to use org.postgresql.Driver as your driver
  • Thanks for your comment. Actually, as I mentioned in my main post, I have tried org.postgresql.Driver as well.
  • @Alan You are facing connection issues to the server. Can you add ?useSSL=false&autoReconnect=true to your JDBC URL? If SSL false is not working, you can remove just that and try.
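
The URL tweak suggested in the last comment can be sketched as follows (host and database are placeholders):

```scala
// Hypothetical base URL; appending the parameters disables SSL and
// asks Connector/J to reconnect after a dropped connection.
val baseUrl = "jdbc:mysql://some.host.name:3306/some_db"
val urlWithParams = s"$baseUrl?useSSL=false&autoReconnect=true"
```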
