
This is how I load my CSV file into a Spark DataFrame:

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._

import org.apache.spark.{ SparkConf, SparkContext }
import java.sql.{Date, Timestamp}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.{lit, udf}



val get_cus_val = spark.udf.register("get_cus_val", (filePath: String) => filePath.split("\\.")(4))

val df1With_ = df.toDF(df.columns.map(_.replace(".", "_")): _*)
val column_to_keep = df1With_.columns.filter(v => (!v.contains("^") && !v.contains("!") && !v.contains("_c"))).toSeq
val df1result = df1With_.select(column_to_keep.head, column_to_keep.tail: _*)
val df1Final = df1result.withColumn("DataPartition", lit(null: String))

Here are examples of my input file names:

Fundamental.FinancialLineItem.FinancialLineItem.SelfSourcedPrivate.CUS.1.2017-09-07-1056.Full

Fundamental.FinancialLineItem.FinancialLineItem.Japan.CUS.1.2017-09-07-1056.Full.txt

Now I want to read the file name, split it on the "." separator, and add CUS as a new column in place of DataPartition.

Can I do it without any UDF?

Here is the schema of the existing DataFrame:

root
 |-- LineItem_organizationId: long (nullable = true)
 |-- LineItem_lineItemId: integer (nullable = true)
 |-- StatementTypeCode: string (nullable = true)
 |-- LineItemName: string (nullable = true)
 |-- LocalLanguageLabel: string (nullable = true)
 |-- FinancialConceptLocal: string (nullable = true)
 |-- FinancialConceptGlobal: string (nullable = true)
 |-- IsDimensional: boolean (nullable = true)
 |-- InstrumentId: string (nullable = true)
 |-- LineItemSequence: string (nullable = true)
 |-- PhysicalMeasureId: string (nullable = true)
 |-- FinancialConceptCodeGlobalSecondary: string (nullable = true)
 |-- IsRangeAllowed: boolean (nullable = true)
 |-- IsSegmentedByOrigin: boolean (nullable = true)
 |-- SegmentGroupDescription: string (nullable = true)
 |-- SegmentChildDescription: string (nullable = true)
 |-- SegmentChildLocalLanguageLabel: string (nullable = true)
 |-- LocalLanguageLabel_languageId: integer (nullable = true)
 |-- LineItemName_languageId: integer (nullable = true)
 |-- SegmentChildDescription_languageId: integer (nullable = true)
 |-- SegmentChildLocalLanguageLabel_languageId: integer (nullable = true)
 |-- SegmentGroupDescription_languageId: integer (nullable = true)
 |-- SegmentMultipleFundbDescription: string (nullable = true)
 |-- SegmentMultipleFundbDescription_languageId: integer (nullable = true)
 |-- IsCredit: boolean (nullable = true)
 |-- FinancialConceptLocalId: integer (nullable = true)
 |-- FinancialConceptGlobalId: integer (nullable = true)
 |-- FinancialConceptCodeGlobalSecondaryId: string (nullable = true)
 |-- FFAction: string (nullable = true)

Updating the code after the suggested answer:

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._

import org.apache.spark.{ SparkConf, SparkContext }
import java.sql.{Date, Timestamp}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.functions.{input_file_name, regexp_extract}

val get_cus_val = spark.udf.register("get_cus_val", (filePath: String) => filePath.split("\\.")(4))

val df = sqlContext.read.format("csv").option("header", "true").option("delimiter", "|").option("inferSchema","true").load("s3://trfsdisu/SPARK/FinancialLineItem/MAIN")

val df1With_ = df.toDF(df.columns.map(_.replace(".", "_")): _*)
val column_to_keep = df1With_.columns.filter(v => (!v.contains("^") && !v.contains("!") && !v.contains("_c"))).toSeq
val df1result = df1With_.select(column_to_keep.head, column_to_keep.tail: _*)

val df1Final = df1result.withColumn("cus_val", get_cus_val(input_file_name()))

df1Final.printSchema()
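
If the column is attached as above, the printed schema should now end with one extra line (the UDF returns a String, so Spark marks the new column as a nullable string):

 |-- cus_val: string (nullable = true)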
  • Does extracting the text between SelfSourcedPrivate. and a dot (.) work for you? (Let's assume the given substring is SelfSourcedPrivate.CUS.) Commented Oct 6, 2017 at 7:12
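
For reference, the commenter's idea might look like the sketch below (an illustration, not code from the thread; it assumes the literal prefix SelfSourcedPrivate. always precedes the value, which the Japan sample name shows is not guaranteed):

import org.apache.spark.sql.functions.{input_file_name, regexp_extract}

// Hypothetical variant of the comment's suggestion: capture whatever
// sits between the literal "SelfSourcedPrivate." and the next dot.
df.withColumn("cus_val",
  regexp_extract(input_file_name(), "SelfSourcedPrivate\\.([^.]+)\\.", 1))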

1 Answer


You can get the file name with the built-in function input_file_name(). After that, you can either create a UDF to extract CUS, or use regexp_extract without a UDF.

Using regexp_extract without a UDF:

import org.apache.spark.sql.functions.input_file_name
import org.apache.spark.sql.functions.regexp_extract

df.withColumn("cus_val",
  regexp_extract(input_file_name(), "\\.(\\w+)\\.[0-9]+\\.", 1))
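
As a quick sanity check outside Spark, the capture group picks out CUS from the first sample file name in the question (a plain-Scala sketch for illustration):

// Check the pattern against the first sample file name.
val pattern = "\\.(\\w+)\\.[0-9]+\\.".r
val name = "Fundamental.FinancialLineItem.FinancialLineItem.SelfSourcedPrivate.CUS.1.2017-09-07-1056.Full"
pattern.findFirstMatchIn(name).map(_.group(1))  // Some(CUS)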

Using a custom UDF:

import org.apache.spark.sql.functions.udf

val get_cus_val = udf((filePath: String) => filePath.split("\\.")(4))

import org.apache.spark.sql.functions.input_file_name

df.withColumn("cus_val", get_cus_val(input_file_name()))
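
To see why index 4 is the right token, here is what the split produces on a sample name (a sketch; note that input_file_name() returns the full path, so this indexing assumes the directory portion of the path contains no dots):

// Dot-split of the second sample file name; index 4 is the fifth token.
val name = "Fundamental.FinancialLineItem.FinancialLineItem.Japan.CUS.1.2017-09-07-1056.Full.txt"
name.split("\\.")
// Array(Fundamental, FinancialLineItem, FinancialLineItem, Japan, CUS, 1, 2017-09-07-1056, Full, txt)
name.split("\\.")(4)  // "CUS"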

1 Comment

The regex you have provided is returning empty values
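
That is most likely an escaping issue: inside an ordinary Scala string literal each backslash must be doubled ("\\.(\\w+)\\.[0-9]+\\."), or the pattern written as a triple-quoted string ("""\.(\w+)\.[0-9]+\."""); when the pattern finds no match, regexp_extract returns an empty string. The snippet above uses the escaped form.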
