
I am working with <spark.version>2.2.1</spark.version> and would like to write a DataFrame that has a map field into Postgres as a json column.

Example code:

import java.util.Properties

import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}

import scala.collection.immutable.HashMap

case class ExampleJson(map: HashMap[String,Long])

object JdbcLoaderJson extends App {

  val finalUrl = s"jdbc:postgresql://localhost:54321/development"
  val user = "user"
  val password = "123456"

  val sparkConf = new SparkConf()

  sparkConf.setMaster(s"local[2]")
  val spark = SparkSession.builder().config(sparkConf).getOrCreate()

  def writeWithJson(tableName: String) : Unit = {

    def getProperties: Properties = {
      val prop = new Properties()
      prop.setProperty("user", user)
      prop.setProperty("password", password)
      prop
    }

    val asList = List(ExampleJson(HashMap("x" -> 1L, "y" -> 2L)),
                      ExampleJson(HashMap("y" -> 3L, "z" -> 4L)))

    val asDf = spark.createDataFrame(asList)
    asDf.show(false)
    asDf.write.mode(SaveMode.Overwrite).jdbc(finalUrl, tableName, getProperties)

  }

  writeWithJson("with_json")

}

Output:

+-------------------+
|map                |
+-------------------+
|Map(x -> 1, y -> 2)|
|Map(y -> 3, z -> 4)|
+-------------------+

Exception in thread "main" java.lang.IllegalArgumentException: Can't get JDBC type for map<string,bigint>
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$getJdbcType$2.apply(JdbcUtils.scala:172)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$getJdbcType$2.apply(JdbcUtils.scala:172)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$getJdbcType(JdbcUtils.scala:171)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$schemaString$1$$anonfun$23.apply(JdbcUtils.scala:707)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$schemaString$1$$anonfun$23.apply(JdbcUtils.scala:707)
    at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
    at scala.collection.AbstractMap.getOrElse(Map.scala:59)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$schemaString$1.apply(JdbcUtils.scala:707)
    
Process finished with exit code 1

I am actually OK with a string as well, instead of the map; this is more about writing a json column to Postgres from Spark.

Comments:
  • Can you post your target table schema? (May 19, 2021 at 13:39)
  • Usually Spark will create it automatically, but it does not matter; any table with a json column works. (May 19, 2021 at 15:56)
  • OK, can you add some sample final output in a table? (May 19, 2021 at 16:02)

1 Answer


Convert the HashMap data into a JSON string, something like below:

import org.apache.spark.sql.functions.{struct, to_json}
import spark.implicits._ // needed for the $"..." column syntax

asDf
  .select(
    to_json(struct($"*")) // serialize each row to a JSON string
      .as("map")
  )
  .write
  .mode(SaveMode.Overwrite)
  .jdbc(finalUrl, tableName, getProperties)
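
Note that to_json(struct($"*")) serializes the whole row, so the stored value is {"map":{"x":1,"y":2}} rather than {"x":1,"y":2}; on Spark 2.3+, to_json can also take the map column directly. Either way the column lands in Postgres as TEXT, because Spark's JDBC writer has no JSON mapping. If you need an actual json/jsonb column, a common workaround (a sketch going beyond this answer) is to pre-create the table and add stringtype=unspecified to the JDBC URL, so the PostgreSQL driver sends strings untyped and the server casts them on insert:

// A minimal sketch, reusing finalUrl, getProperties and asDf from the
// question. Assumes the target table was created up front, e.g. in psql:
//   CREATE TABLE public.with_json (map jsonb);
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.{struct, to_json}
import spark.implicits._

// stringtype=unspecified makes the driver send string parameters with no
// declared type, so Postgres casts them to the jsonb column on insert
val jsonUrl = finalUrl + "?stringtype=unspecified"

asDf
  .select(to_json(struct($"*")).as("map"))
  .write
  .mode(SaveMode.Append) // Overwrite would drop the table and recreate the column as TEXT
  .jdbc(jsonUrl, "with_json", getProperties)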