0

I have a source file that contains some bad data records. I want to handle ArrayIndexOutOfBoundsException and increment an accumulator that is used to report the number of bad data records.

val test = sourceFile.map(x => x.split("\\|")).
  filter(line => line(0).contains("1017")).map(x => {
    try {
      x.filter(line => {
        line.length >= 37
      }).map(x => x(38))
    }
    catch {
      case x: ArrayIndexOutOfBoundsException => {
        println("Bad Data Found".format(x))
        Linecounter += 1
        None
      }
    }
  })

test.count()
test.saveAsTextFile(Quotepath)
println("Bad Data Count %s:-".format(Linecounter))

The problem is that the accumulator output does not show the count of bad records (for example, 1). Can anyone help? I'm not sure what's wrong here.

5
  • What's your actual/expected output? Commented Oct 23, 2015 at 11:08
  • 0 records in the output, as expected, but I want my accumulator to be incremented whenever there is an ArrayIndexOutOfBoundsException, for example: Bad Data Count:- 1 Commented Oct 23, 2015 at 11:26
  • Do you get "Bad Data Found" printed? What's the definition of Linecounter? Commented Oct 23, 2015 at 11:30
  • Bad Data Count :- 0 is what gets printed at the moment; I would like it to be Bad Data Count :- 1 Commented Oct 23, 2015 at 11:36
  • If you're using ArrayIndexOutOfBoundsException to find which 'x' values don't have 39 elements, e.g. x(38), then why not just count the values that meet that criteria? In other words, why go through the obfuscated method of exception handling??? Commented Oct 23, 2015 at 12:15

2 Answers

4

Let

val xs = (1 to 5).toArray

We would like to fetch values from xs by index, but the index may be out of bounds. An array can be viewed as a partial function from indices to values; lift turns it into a total function from indices to optional values,

val xsL = xs.lift
xsL: Int => Option[Int] = <function1>

We now fetch values through xsL by index; out-of-bounds indices are not defined for the underlying array, so the lifted function returns None for them instead of throwing,

val res = xs.map( i => xsL(i*2) )
Array[Option[Int]] = Array(Some(3), Some(5), None, None, None)

namely indices 6, 8, and 10 are out of bounds.

In order to collect the defined part consider

res.flatten
Array[Int] = Array(3, 5)

In order to count the number of out of bound indices consider

res.count(_ == None)
Int = 3

This approach avoids the use of exception catching and (mutable) variables, while containing all the information required.
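
Applied to the pipe-delimited records in the question, the same idea might look like the sketch below. The sample `lines` and the small `fieldIndex` (standing in for index 38) are made up for illustration, and plain Scala collections are used here rather than Spark:

```scala
// Hypothetical sample records; the real input would be the lines of sourceFile.
val lines = Seq(
  "1017|a|b|c", // 4 fields: index 2 exists
  "1017|a",     // 2 fields: index 2 is out of bounds
  "1013|a|b|c"  // different record type, filtered out below
)

// Assumed small index for the example; the question uses 38.
val fieldIndex = 2

val picked = lines
  .map(_.split("\\|"))
  .filter(_(0).contains("1017"))
  .map(fields => fields.lift(fieldIndex)) // Some(value) or None, never throws

// Values from records that had the field.
val goodValues = picked.flatten

// Number of records where the field was missing.
val badCount = picked.count(_.isEmpty)

println(s"Good values: $goodValues")
println(s"Bad Data Count: $badCount")
```

Here `badCount` plays the role of the accumulator, but it is computed from the data rather than mutated as a side effect.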

0

There is no need to use 'try' & 'catch' clauses to get the functionality you're looking for:

val goodLines = sourceFile.map(x => x.split("\\|"))
                          .filter(_(0).contains("1017"))
                          .toSeq //just in case sourceFile is an Iterator

val test = goodLines.filter(_.length > 38)
                    .map(_(38))

val Linecounter = goodLines.count(_.length < 39)
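
As a rough sketch of a variant, `partition` can split the records into good and bad in a single pass. The sample data and `minFields` (standing in for 39) are assumptions for illustration, and this again uses plain Scala collections:

```scala
// Hypothetical sample records standing in for sourceFile.
val sourceLines = Seq("1017|x|y|z", "1017|x", "1041|x|y|z")

// Assumed minimum field count for the example; the question needs 39.
val minFields = 3

// Split matching records into those with enough fields and those without.
val (good, bad) = sourceLines
  .map(_.split("\\|"))
  .filter(_(0).contains("1017"))
  .partition(_.length >= minFields)

// Last required field of every good record (index 38 in the question).
val values = good.map(_(minFields - 1))

// Count of bad records, with no exception handling or mutable state.
val badRecords = bad.size

println(s"Values: $values")
println(s"Bad Data Count: $badRecords")
```

If sourceFile is a Spark RDD rather than a Scala collection, note that `count()` on an RDD takes no predicate and there is no `partition(predicate)` method, so the two-filter form with `filter(...).count()` is the closer fit there.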

4 Comments

println("Count of Bad Status Rows:-%s".format(FilterDataSet("1013").filter(_.length < 10).count())) println("Count of Bad Luld Rows:-%s".format(FilterDataSet("1041").filter(_.length < 10).count())) println("Count of Bad Quote Rows:-%s".format(FilterDataSet("1017").filter(_.length < 10).count())) I can't get size on the commands above, so I need to use count. Is there another way to get the size?
Instead of filter(_.length < 10).count() I think what you want is .count(_.length < 10).
Hi, I liked your first code, which you explained using size without count. count is an action, which is overhead in Spark, but I wasn't able to find a size function.
There are many ways to skin the cat: (i) .filter(_.length < 10).size, (ii) .count(_.length < 10), (iii) .map(l => if (l.length < 10) 1 else 0).sum
