
I'm trying to read data out of HBase and save it as a SequenceFile, but I'm getting this error:

java.io.IOException: Could not find a serializer for the Value class: 'org.apache.hadoop.hbase.client.Result'. Please ensure that the configuration 'io.serializations' is properly configured, if you're using custom serialization.

I saw two similar posts:

hadoop writables NotSerializableException with Apache Spark API

and

Spark HBase Join Error: object not serializable class: org.apache.hadoop.hbase.client.Result

Following those two posts, I registered the three classes below with Kryo, but still no luck.

Here's my program:

        String tableName = "validatorTableSample";
        System.out.println("Start indexing hbase: " + tableName);
        SparkConf sparkConf = new SparkConf().setAppName("HBaseRead");
        Class[] classes = {org.apache.hadoop.io.LongWritable.class, org.apache.hadoop.io.Text.class, org.apache.hadoop.hbase.client.Result.class};
        sparkConf.registerKryoClasses(classes);
        JavaSparkContext sc = new JavaSparkContext(sparkConf);
        Configuration conf = HBaseConfiguration.create();
        conf.set(TableInputFormat.INPUT_TABLE, tableName);
//      conf.setStrings("io.serializations",
//          conf.get("io.serializations"),
//          MutationSerialization.class.getName(),
//          ResultSerialization.class.getName());
        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");

        JavaPairRDD<ImmutableBytesWritable, Result> hBasePairRDD = sc.newAPIHadoopRDD(
            conf,
            TableInputFormat.class,
            ImmutableBytesWritable.class,
            Result.class);

        hBasePairRDD.saveAsNewAPIHadoopFile("/tmp/tempOutputPath", ImmutableBytesWritable.class, Result.class, SequenceFileOutputFormat.class);
        System.out.println("Finished readFromHbaseAndSaveAsSequenceFile() .........");

Here's the error stacktrace:

java.io.IOException: Could not find a serializer for the Value class: 'org.apache.hadoop.hbase.client.Result'. Please ensure that the configuration 'io.serializations' is properly configured, if you're using custom serialization.
    at org.apache.hadoop.io.SequenceFile$Writer.init(SequenceFile.java:1254)
    at org.apache.hadoop.io.SequenceFile$Writer.<init>(SequenceFile.java:1156)
    at org.apache.hadoop.io.SequenceFile.createWriter(SequenceFile.java:273)
    at org.apache.hadoop.io.SequenceFile.createWriter(SequenceFile.java:530)
    at org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat.getSequenceWriter(SequenceFileOutputFormat.java:64)
    at org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat.getRecordWriter(SequenceFileOutputFormat.java:75)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1112)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1095)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:86)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
17/05/25 10:58:38 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.io.IOException: Could not find a serializer for the Value class: 'org.apache.hadoop.hbase.client.Result'. Please ensure that the configuration 'io.serializations' is properly configured, if you're using custom serialization.
    at org.apache.hadoop.io.SequenceFile$Writer.init(SequenceFile.java:1254)
    at org.apache.hadoop.io.SequenceFile$Writer.<init>(SequenceFile.java:1156)
    at org.apache.hadoop.io.SequenceFile.createWriter(SequenceFile.java:273)
    at org.apache.hadoop.io.SequenceFile.createWriter(SequenceFile.java:530)
    at org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat.getSequenceWriter(SequenceFileOutputFormat.java:64)
    at org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat.getRecordWriter(SequenceFileOutputFormat.java:75)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1112)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1095)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:86)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

17/05/25 10:58:38 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; aborting job
  • Did you get your error resolved? Commented Aug 22, 2017 at 9:02
  • No, I haven't; I'm still facing this issue. Any clues, please? Commented Aug 25, 2017 at 20:59
  • I have posted an answer. Please try that once; it worked for me. Commented Aug 26, 2017 at 4:50

2 Answers


Here is what was needed to make it work:

Because the data comes from HBase and the value class being written is org.apache.hadoop.hbase.client.Result, Hadoop is telling us that it doesn't know how to serialize that type. That is why we need to help it: set the io.serializations property on the Hadoop configuration before the job runs.

conf.setStrings("io.serializations", new String[]{hbaseConf.get("io.serializations"), MutationSerialization.class.getName(), ResultSerialization.class.getName()});



The following code already passes the test for me; it reads the exported SequenceFile back and logs each cell:

object HbaseDataExport extends LoggingTime{
  def main(args: Array[String]): Unit = {
    val con = SparkConfig.getProperties()
    val sparkConf = SparkConfig.getSparkConf()
    val sc = SparkContext.getOrCreate(sparkConf)
    val config = HBaseConfiguration.create()
    config.setStrings("io.serializations",
      config.get("io.serializations"),
      "org.apache.hadoop.hbase.mapreduce.MutationSerialization",
      "org.apache.hadoop.hbase.mapreduce.ResultSerialization")
    val path = "/Users/jhTian/Desktop/hbaseTimeData/part-m-00030"
    val path1 = "hdfs://localhost:9000/hbaseTimeData/part-m-00030"

    sc.newAPIHadoopFile(path1, classOf[SequenceFileInputFormat[Text, Result]], classOf[Text], classOf[Result], config).foreach(x => {
      import collection.JavaConversions._
      for (i <- x._2.listCells) {
        logger.info(s"family:${Bytes.toString(CellUtil.cloneFamily(i))},qualifier:${Bytes.toString(CellUtil.cloneQualifier(i))},value:${Bytes.toString(CellUtil.cloneValue(i))}")
      }
    })
    sc.stop()
  }
}
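
For the question's Java program, a read-back check along the same lines might look roughly like the sketch below. It is untested and assumes the SequenceFile was written with ImmutableBytesWritable keys to /tmp/tempOutputPath, as in the question, plus the usual imports (Cell, CellUtil, Bytes, and org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat) on top of what the question already uses.

// Register the HBase serializers on the conf used to read the file back.
Configuration readConf = HBaseConfiguration.create();
readConf.setStrings("io.serializations",
    readConf.get("io.serializations"),
    "org.apache.hadoop.hbase.mapreduce.MutationSerialization",
    "org.apache.hadoop.hbase.mapreduce.ResultSerialization");

// Read the SequenceFile written by the question's code.
JavaPairRDD<ImmutableBytesWritable, Result> readBack = sc.newAPIHadoopFile(
    "/tmp/tempOutputPath",
    SequenceFileInputFormat.class,
    ImmutableBytesWritable.class,
    Result.class,
    readConf);

// Dump family:qualifier = value for every cell to verify the round trip.
readBack.foreach(pair -> {
    for (Cell cell : pair._2().listCells()) {
        System.out.println(
            Bytes.toString(CellUtil.cloneFamily(cell)) + ":"
            + Bytes.toString(CellUtil.cloneQualifier(cell)) + " = "
            + Bytes.toString(CellUtil.cloneValue(cell)));
    }
});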

