
I am very new to Scala (typically I do this kind of thing in R).

I have imported a large dataframe (2000+ columns, 100000+ rows) that is zero-inflated.

Task: convert the data to libsvm format.
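
For reference, libsvm format stores one example per line as label index:value pairs, listing only the non-zero features (the indices written by Spark's MLUtils.saveAsLibSVMFile are 1-based). For example (made-up values):

  2 1:0.5 7:2.0 15:1.25
  0 3:1.0 7:0.8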

Steps: as I understand it, the steps are as follows.

  1. Ensure feature columns are set to DoubleType and Target is an Int
  2. Iterate through each row, retaining each value >0 in one array and index of its column in another array
  3. Convert to RDD[LabeledPoint]
  4. Save RDD in libsvm format

I am stuck on step 3, but that may be because I am doing step 2 wrong.

Here is my code:

Main Function:

@Test
def testSpark(): Unit =
{
try
{

  val mDF: DataFrame = spark.read.option("header", "true").option("inferSchema", "true").csv("src/test/resources/knimeMergedTRimmedVariables.csv")


  val mDFTyped = castAllTypedColumnsTo(mDF, IntegerType, DoubleType)

  val indexer = new StringIndexer()
    .setInputCol("Majors_Final")
    .setOutputCol("Majors_Final_Indexed")
  val mDFTypedIndexed = indexer.fit(mDFTyped).transform(mDFTyped)
  val mDFFinal = castColumnTo(mDFTypedIndexed,"Majors_Final_Indexed", IntegerType)



  //only doubles accepted by sparse vector, so that's what we filter for
  val fieldSeq: scala.collection.Seq[StructField] = mDFFinal.schema.fields.toSeq.filter(f => f.dataType == DoubleType)

  val fieldNameSeq: Seq[String] = fieldSeq.map(f => f.name)


  val labeled:DataFrame = mDFFinal.map(row => convertRowToLabeledPoint(row,fieldNameSeq,row.getAs("Majors_Final_Indexed"))).toDF()


  assertTrue(true)
}
catch
{
  case ex: Exception =>
  {

    println(s"There has been an Exception. Message is ${ex.getMessage} and ${ex}")
    fail()
  }
  }
}
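
The helper functions castAllTypedColumnsTo and castColumnTo used above are not shown in the question; a minimal sketch of what they might look like (assumed implementations that simply cast the matching columns, not the original code):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.DataType

// sketch: cast every column whose type matches sourceType to targetType
def castAllTypedColumnsTo(df: DataFrame, sourceType: DataType, targetType: DataType): DataFrame =
  df.schema.fields
    .filter(_.dataType == sourceType)
    .foldLeft(df)((acc, f) => acc.withColumn(f.name, col(f.name).cast(targetType)))

// sketch: cast a single named column to targetType
def castColumnTo(df: DataFrame, colName: String, targetType: DataType): DataFrame =
  df.withColumn(colName, col(colName).cast(targetType))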

Convert each row to LabeledPoint:

 @throws(classOf[Exception])
private def convertRowToLabeledPoint(rowIn: Row, fieldNameSeq: Seq[String], label:Int): LabeledPoint =
{
  try
  {
    val values: Map[String, Double] = rowIn.getValuesMap(fieldNameSeq)

    val sortedValuesMap = ListMap(values.toSeq.sortBy(_._1): _*)
    val rowValuesItr: Iterable[Double] = sortedValuesMap.values

    var positionsArray: ArrayBuffer[Int] = ArrayBuffer[Int]()
    var valuesArray: ArrayBuffer[Double] = ArrayBuffer[Double]()
    var currentPosition: Int = 0
    rowValuesItr.foreach
    {
      kv =>
        if (kv > 0)
        {
          valuesArray += kv;
          positionsArray += currentPosition;
        }
        currentPosition = currentPosition + 1;
    }

    // the vector size must be the total number of double features, not the number of non-zero entries
    val lp: LabeledPoint = new LabeledPoint(label, org.apache.spark.mllib.linalg.Vectors.sparse(fieldNameSeq.size, positionsArray.toArray, valuesArray.toArray))

    return lp

  }
  catch
  {
    case ex: Exception =>
    {
      throw new Exception(ex)
    }
  }
}

Problem: so then I try to create a DataFrame of LabeledPoints, which can easily be converted to an RDD.

val labeled:DataFrame = mDFFinal.map(row => convertRowToLabeledPoint(row,fieldNameSeq,row.getAs("Majors_Final_Indexed"))).toDF()

But I get the following error:

SparkTest.scala:285: error: Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases. [INFO] val labeled:DataFrame = mDFFinal.map(row => convertRowToLabeledPoint(row,fieldNameSeq,row.getAs("Majors_Final_Indexed"))).toDF()

  • Did you try import spark.implicits._ as the error message mentioned? Also, return is usually not used in Scala; it can create problems. Commented Sep 1, 2017 at 3:30
  • Task not serializable and org.apache.spark.SparkException: Task not serializable after adding implicits Commented Sep 1, 2017 at 3:55
  • I think I need to look at matrices Commented Sep 1, 2017 at 3:56

1 Answer


OK, so I skipped the DataFrame and created an Array of LabeledPoints, which is easily converted to an RDD. The rest is easy.

I stress that while this works, I am new to Scala and there may be more efficient ways to do this.

Main Function is now as follows:

  val mDF: DataFrame = spark.read.option("header", "true").option("inferSchema", "true").csv("src/test/resources/knimeMergedTRimmedVariables.csv")
  val mDFTyped = castAllTypedColumnsTo(mDF, IntegerType, DoubleType)

  val indexer = new StringIndexer()
    .setInputCol("Majors_Final")
    .setOutputCol("Majors_Final_Indexed")
  val mDFTypedIndexed = indexer.fit(mDFTyped).transform(mDFTyped)
  val mDFFinal = castColumnTo(mDFTypedIndexed,"Majors_Final_Indexed", IntegerType)

  mDFFinal.show()
  //only doubles accepted by sparse vector, so that's what we filter for
  val fieldSeq: scala.collection.Seq[StructField] = mDFFinal.schema.fields.toSeq.filter(f => f.dataType == DoubleType)
  val fieldNameSeq: Seq[String] = fieldSeq.map(f => f.name)

  var labeledPointsArray: ArrayBuffer[LabeledPoint] = ArrayBuffer[LabeledPoint]()

  mDFFinal.collect().foreach
  {
    row => labeledPointsArray += convertRowToLabeledPoint(row, fieldNameSeq, row.getAs("Majors_Final_Indexed"))
  }

  val mRdd: RDD[LabeledPoint] = spark.sparkContext.parallelize(labeledPointsArray.toSeq)

  MLUtils.saveAsLibSVMFile(mRdd, "./output/libsvm")
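
One possible refinement (a sketch only, not tested against the data above): collect() pulls every row to the driver, so for a 100000+ row frame it may be better to map over the DataFrame's underlying RDD and keep the conversion distributed. This assumes convertRowToLabeledPoint is defined somewhere serializable, otherwise the "Task not serializable" error mentioned in the comments will reappear:

  import org.apache.spark.mllib.regression.LabeledPoint
  import org.apache.spark.mllib.util.MLUtils
  import org.apache.spark.rdd.RDD

  // build the RDD[LabeledPoint] on the executors instead of collecting rows to the driver
  val mRddDistributed: RDD[LabeledPoint] = mDFFinal.rdd.map { row =>
    convertRowToLabeledPoint(row, fieldNameSeq, row.getAs[Int]("Majors_Final_Indexed"))
  }

  // hypothetical output path, chosen so it does not clash with the directory written above
  MLUtils.saveAsLibSVMFile(mRddDistributed, "./output/libsvm-distributed")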
