53 questions
0
votes
0
answers
110
views
Delta Table to Iceberg metadata migration is failing
I am trying to migrate Delta Tables to Iceberg using Scala-Spark keeping the data intact, following this - https://iceberg.apache.org/docs/1.4.3/delta-lake-migration/
Here is the sample code (...
0
votes
1
answer
40
views
Caching Processed Data In Spark in the Event of Task Failures so as to Not Reprocess the same when the Task Restarts
We have a use-case wherein we need to cache certain data that has been processed so that Spark does not reprocess the same data in the event of task failures.
So say we have a thousand Foo objects for ...
1
vote
1
answer
64
views
Ignoring codec because it collides with previously generated codec
I am trying to register a custom codec (for a map type) like below
val session: CqlSession = CassandraConnector.apply(spark.sparkContext).openSession()
val codecRegistry: MutableCodecRegistry = session....
0
votes
1
answer
52
views
Is there a way to partition/group data where the sum of column values per group is under a limit?
I want to partition/group rows so that each group's total size is <= limit.
For example, if I have:
+--------+----------+
| id| size|
+--------+----------+
| 1| 3|
| 2| 6|
...
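The core of this question is a greedy bucketing problem. A plain-Scala sketch of that logic (outside Spark, with a hypothetical `limit` of 10) that an answer might adapt into a window-function running sum or a UDF:

```scala
// Greedy bucketing: walk rows in order, start a new group whenever
// adding the next size would push the running sum over the limit.
// Plain-Scala illustration only; in Spark the same idea is usually
// expressed as a running sum over a window, then bucketing that sum.
def assignGroups(sizes: Seq[Int], limit: Int): Seq[Int] = {
  val (groups, _, _) = sizes.foldLeft((Vector.empty[Int], 0, 0)) {
    case ((acc, group, running), size) =>
      if (running + size > limit && running > 0)
        (acc :+ (group + 1), group + 1, size) // start a new group
      else
        (acc :+ group, group, running + size) // stay in current group
  }
  groups
}

assignGroups(Seq(3, 6, 5, 4), limit = 10)
// Seq(0, 0, 1, 1): sizes 3 and 6 fill group 0 (sum 9); 5 and 4 fill group 1
```

Note this greedy pass depends on row order, so in distributed Spark the rows would need a deterministic ordering before the running sum is computed.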
0
votes
1
answer
43
views
Scala spark query optimization
I have two dataframes that have 300 columns and 1000 rows each. They have the same column names. The values are of mixed datatypes like Struct/List/Timestamp/String/etc. I am trying to compare the ...
2
votes
1
answer
1k
views
org.apache.hadoop.fs.RawLocalFileSystem$DeprecatedRawLocalFileStatus can't be cast to org.apache.spark.sql.execution.datasources.FileStatusWithMetadata
Getting the following error while creating a Delta table using Scala Spark. _delta_log is getting created at the warehouse, but it lands in this error after the _delta_log creation:
Exception in thread &...
1
vote
3
answers
1k
views
How to setup and run scala-spark in intellij?
I am trying to use IntelliJ to build Spark applications written in Scala. I get the following error when I execute the Scala program:
Exception in thread "main" java.lang....
0
votes
0
answers
79
views
Scala Spark distributed run on Google Cloud Platform but worker not working
I'm a newbie to Scala Spark programming. I have to build a recommendation system for movies in Scala Spark using Google Cloud Platform. The dataset is composed of (movie_id, user_id, rating) ...
0
votes
1
answer
73
views
Explode nested list of objects into DataFrame in Spark
I have a dataframe that looks like this
| Column |
|------------------------------------------------|
|[{a: 2, b: 4}, {a: 2, b: 3}] |
|-------...
-1
votes
3
answers
111
views
Access newly created column in withColumn
I have the following dataset:
|value|
+-----+
| 1|
| 2|
| 3|
I want to create a new column newValue that takes the value of newValue from the previous row and does something with it. For ...
1
vote
2
answers
136
views
Spark Array column - Find max interval between two values
I have a Scala Spark dataframe with the schema:
root
|-- passengerId: string (nullable = true)
|-- travelHist: array (nullable = true)
| |-- element: integer (containsNull = true)...
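One reading of "max interval between two values" is the widest gap between consecutive occurrences of a target value in the array; the exact definition depends on the full question. A plain-Scala sketch of that reading, which could run inside a UDF over the `travelHist` column:

```scala
// Widest gap (in element positions) between two consecutive
// occurrences of a target value. Returns 0 when the value appears
// fewer than two times. "Interval" here is an assumed interpretation.
def maxInterval(hist: Seq[Int], target: Int): Int = {
  val positions = hist.zipWithIndex.collect { case (v, i) if v == target => i }
  if (positions.size < 2) 0
  else positions.sliding(2).map { case Seq(a, b) => b - a }.max
}

maxInterval(Seq(0, 1, 0, 0, 0, 1, 0, 1), target = 1)
// 4: the 1s sit at indices 1, 5, 7, so the gaps are 4 and 2
```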
-1
votes
2
answers
127
views
Divide a column value into multiple rows by number of months based on start date & end date columns
I want to divide the quantity value into multiple rows, split by the number of months between the start date & end date columns. Each row should have the start date and end date of the month. I also want ...
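The month-expansion logic can be sketched in plain Scala with `java.time` (field names here are hypothetical); in Spark the same idea is often expressed with `sequence()` over months plus `explode`:

```scala
import java.time.LocalDate

// One output row per calendar month between start and end, each row
// carrying the month's first and last day and an equal share of the
// quantity. Whether the share should instead be prorated by days is
// a detail the question leaves open.
case class MonthShare(monthStart: LocalDate, monthEnd: LocalDate, qty: Double)

def splitByMonth(start: LocalDate, end: LocalDate, quantity: Double): Seq[MonthShare] = {
  val firstMonth = start.withDayOfMonth(1)
  val months = Iterator.iterate(firstMonth)(_.plusMonths(1))
    .takeWhile(!_.isAfter(end)).toVector
  val share = quantity / months.size
  months.map(m => MonthShare(m, m.withDayOfMonth(m.lengthOfMonth()), share))
}

splitByMonth(LocalDate.parse("2023-01-15"), LocalDate.parse("2023-03-10"), 300.0)
// 3 rows (Jan, Feb, Mar), each with qty 100.0
```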
0
votes
1
answer
2k
views
Scala - Create Dataframe with only 1 row from a List using for comprehension
For some weird reason I need to get the column names of a dataframe and insert them as the first row (I cannot just import without a header). I tried using a for comprehension to create a dataframe that ...
1
vote
1
answer
609
views
How to improve spark filter() performance on an array of struct?
I am working on a Spark project and have a performance issue that I am struggling with; any help will be appreciated.
I have a column Collection that is an array of struct:
root
|-- Collection: ...
0
votes
1
answer
210
views
Spark extract values from Json struct
I have a Spark dataframe column (custHeader) in the below format, and I want to extract the value of the key - phone - into a separate column. Trying to use the from_json function, but it is giving me a ...
0
votes
1
answer
222
views
Spark broadcasts right dataset from left join, which causes org.apache.spark.sql.execution.OutOfMemorySparkException
Spark broadcasts right dataset from left join, which causes org.apache.spark.sql.execution.OutOfMemorySparkException: Size of broadcasted table far exceeds estimates and exceeds limit of spark.driver....
-1
votes
1
answer
69
views
How to apply custom logics on spark dataframe using scala
Imagine the data for this question is in a nested JSON structure. I have flattened the data from JSON using explode() and added it into one dataframe with columns project, Task, Task-Evidence, Task-...
1
vote
2
answers
775
views
Performance Implications of spark read
I am trying to understand if there is any difference in the following approaches, in terms of memory usage, optimisation, parallelism etc.
Scenario:
CSV files in an S3 bucket. 100 columns, more than ...
3
votes
1
answer
3k
views
MountVolume.SetUp failed for volume "spark-conf-volume-driver" : configmap "spark-drv-27c9b887c306cb9c-conf-map" not found
I am trying to run scala distributed code using spark-submit with cluster mode in minikube.
1. I used this Dockerfile
FROM datamechanics/spark:2.4.6-hadoop-3.1.0-java-8-scala-2.12-python-3.7-dm18
...
0
votes
3
answers
135
views
Convert Vector[String] to Dataframe in Scala Spark
I have this Vector[String]:
user_uid,score,value
255938,34096,8
259117,34599,10
253664,28891,7
how can I convert it to DataFrame?
I already tried this:
val dataInVectorRow = dataInVectorString
.map(...
0
votes
1
answer
100
views
spark scala exploding struct array throwing error ambiguous reference to fields
Currently I'm working on exploding a struct array where pairs of keys are the same.
{
"A": [{
"AA": {
"AB": "21",
"AC": &...
0
votes
1
answer
630
views
save dataframe with records limit but also make sure same value is not across multiple files
suppose I have this dataframe:
+---+-----+
| id|value|
+---+-----+
|  A|    1|
|  A|    2|
|  A|    3|
|  B|    1|
|  B|    2|
|  C|    1|
|  D|    1|
|  D|    2|
+---+-----+
and so on. Basically, I want to make sure that even with a records limit any certain id can only appear in one single file (suppose ...
0
votes
1
answer
70
views
Why is the behavior different when mixed case is used vs. the same case in Spark 3.2?
I am running a simple query in Spark 3.2:
val df1 = sc.parallelize(List((1,2,3,4,5),(1,2,3,4,5))).toDF("id","col2","col3","col4", "col5")
val ...
0
votes
1
answer
63
views
Issues running Graph queries after upgrading Spark 2.4.3 to 3.1.3
We are upgrading our Scala Spark stack:
Spark from 2.4.3 to 3.1.3
scalaVersion from 2.11.8 to 2.12.10
spark-cassandra-connector from 2.4.2 to 3.1.0
Cassandra version 3.2 and all the subsequent dependencies.
...
1
vote
1
answer
358
views
Performance degraded after upgrading from spark-cassandra-connector 2.4.0 to 3.1.0
Context: Working on a message processing application which processes millions of messages every day. The application is built using Scala and Spark, and uses Kafka and Cassandra DB. Multiple DB queries are ...
1
vote
2
answers
119
views
Get the column names of the max and second value for each record in a scala dataframe
I have a dataframe with a lot of columns, but for this example, we can use this one:
`val dfIn = sqlContext.createDataFrame(Seq(("r0", 0, 2, 3, "a"),("r1", 1, 0, 0, "...
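The row-wise logic of picking the largest and second-largest column names can be sketched in plain Scala (column names below mirror the style of the question's `dfIn`, but are hypothetical); in Spark it would typically live in a UDF or be built from an array of structs that is sorted and sliced:

```scala
// Given one row's numeric columns as (name, value) pairs, return the
// names of the largest and second-largest values. Ties keep the
// left-to-right column order, which may or may not match the asker's
// intent.
def topTwoCols(row: Seq[(String, Int)]): (String, String) = {
  val byValueDesc = row.sortBy(-_._2).map(_._1)
  (byValueDesc(0), byValueDesc(1))
}

topTwoCols(Seq("c1" -> 0, "c2" -> 2, "c3" -> 3)) // ("c3", "c2")
```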
0
votes
1
answer
166
views
How to create Scala trait which stores data from other columns in dataset and then create new dataset with column storing the trait in Scala?
I am new to Scala and am currently studying datasets for Scala and Spark. Based on my input dataset below, I am trying to create a new dataset (see below). In the new dataset, I aim to have a new ...
1
vote
2
answers
1k
views
Different behaviour of same query in Spark 2.3 vs Spark 3.2
I am running a simple query in two versions of spark, 2.3 & 3.2. The code is as below
spark-shell --master yarn --deploy-mode client
val df1 = sc.parallelize(List((1,2,3,4,5),(1,2,3,4,5))).toDF(&...
0
votes
0
answers
205
views
How to integrate Intellij and Databricks, like when using the jdwp with a regular Spark cluster?
I have been looking online for a while, but have found nothing, thus this question. I would like to be able to debug my Apache Spark code (written in Scala) remotely on Databricks, similar to the way ...
0
votes
1
answer
173
views
Add a tag to the list in the DataFrame based on the threshold given for the values in the list in Scala Spark
I have a Dataframe that has a column "grades" containing a list of Grade objects that have 2 fields: name (String) and value (Double). I would like to add the word PASS to the list of tags ...
0
votes
1
answer
67
views
Add a tag to the list in the DataFrame based on the data from the second DataFrame
I have two DataFrames - the first one with the columns model, cnd, age, tags (this is a repeatable field - String list/array), min, max and the second one with the main_model column.
I would like to ...
0
votes
1
answer
756
views
Convert spark scala dataset of one type to another
I have a dataset with following case class type:
case class AddressRawData(
addressId: String,
customerId: String,
address: ...
0
votes
1
answer
2k
views
inequality test of two columns from same dataframe in pyspark
In Scala Spark we can filter where column A's value is not equal to column B of the same dataframe, as
df.filter(col("A")=!=col("B"))
How can we do the same in PySpark?
I have tried ...
0
votes
0
answers
131
views
DataFrame values Changing after adding columns using withColumn
I have created a dataframe by reading the data from DB2, and the dataframe looks like below.
df1.show()
Table_Name | Source_count | Target_Count
----------------------------------------
Test_tab | 12750 ...
1
vote
1
answer
109
views
spark-submit error loading class with fatjar on macOS
I am trying to run a simple hello world spark application
This is my code
package com.sd.proj.executables
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.{DataFrame, ...
0
votes
1
answer
56
views
Why do we use val for accumulators and not var in Scala?
Why do we use val instead of var for accumulators? If it's like a counter that's shared across multiple executor nodes to just update/change it, then doesn't that mean reassigning a val?
val accum = sc....
0
votes
2
answers
370
views
Coalesce dynamic column list from two datasets
I am trying to translate a pyspark job, which is dynamically coalescing the columns from two datasets with additional filters/condition.
conditions_ = [when(df1[c]!=df2[c], lit(c)).otherwise("&...
0
votes
2
answers
8k
views
How to use when() .otherwise function in Spark with multiple conditions
This is my first post, so let me know if I need to give more details.
I am trying to create a boolean column, "immediate", that shows true when at least one of the columns has some data in it. ...
0
votes
0
answers
67
views
ScalaSpark - Difference between 2 dataframes - Identify inserts, updates and deletes
I am trying to translate the below code from PySpark to Scala.
I was able to successfully create the dataframes from the input data.
from pyspark.sql.functions import col, array, when, array_remove, lit, size, ...
0
votes
1
answer
635
views
How does Spark broadcast the data in a broadcast join?
How does Spark broadcast the data when we use a broadcast join with a hint? As I can see, when we use the broadcast hint it calls this function:
def broadcast[T](df: Dataset[T]): Dataset[T] = {
Dataset[T](...
2
votes
2
answers
13k
views
How to set Spark Config in an AWS Glue job, using Scala Spark?
When running my job, I am getting the following exception:
Exception in User Class: org.apache.spark.SparkException : Job aborted due to stage failure: Task 32 in stage 2.0 failed 4 times, most recent ...
0
votes
0
answers
54
views
Joining large RDDs in Scala Spark
I want to join a large (1 TB) data RDD with a medium (10 GB) data RDD. An earlier processing job on the large data was completing in 8 hours. I then joined the medium-sized data to get an info ...
0
votes
1
answer
495
views
How to convert 'Jul 24 2022' to '2022-07-24' in spark sql
I want to convert a string date column to a date or timestamp (YYYY-MM-DD). How can I do it in Scala Spark SQL?
Input:
D1
Apr 24 2022|
Jul 08 2021|
Jan 16 2022|
Expected :
D2
2022-04-24|
2021-07-08|
...
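In Spark SQL this is usually a one-liner, `to_date(col("D1"), "MMM dd yyyy")` (assuming the column really uses English month abbreviations). The underlying parse can be sketched with plain `java.time`:

```scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter
import java.util.Locale

// Parse "MMM dd yyyy" strings and render ISO dates. Locale.ENGLISH
// matters: "Apr"/"Jul" may fail to parse under a non-English default
// locale.
val inFmt = DateTimeFormatter.ofPattern("MMM dd yyyy", Locale.ENGLISH)

def toIso(s: String): String =
  LocalDate.parse(s.trim, inFmt).toString // LocalDate.toString is ISO yyyy-MM-dd

Seq("Apr 24 2022", "Jul 08 2021", "Jan 16 2022").map(toIso)
// List("2022-04-24", "2021-07-08", "2022-01-16")
```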
0
votes
1
answer
417
views
Need to add quotes around all values in Spark
Need to add quotes around all values in a Spark dataframe.
Input:
val someDF = Seq(
| ("user1", "math","algebra-1","90"),
| ("user1", "physics&...
2
votes
0
answers
2k
views
spark GroupBy throws StateSchemaNotCompatible exception with different "Existing key schema"
I am reading and writing events from EventHub in Spark, trying to aggregate based on a few keys like this:
val df1 = df0
.groupBy(
colKey,
colTimestamp
)
.agg(...
0
votes
1
answer
360
views
Cannot stream files in subfolders with wildcards in pySpark streaming
This code works only if I make directory="s3://bucket/folder/2022/10/18/4/*"
from pyspark.sql.functions import from_json
from pyspark.streaming import StreamingContext
ssc = ...
1
vote
2
answers
1k
views
Spark merge two columns that are arrays of different structs with overlapping field
I have a question I was unable to solve when working with Scala Spark (or PySpark).
How can we merge two fields that are arrays of structs with different fields?
For example, if I have schema like so:
...
0
votes
1
answer
806
views
Installing external packages in spark on spylon kernel
I'm looking for a way to install outside packages on the spylon kernel. I already tried initializing spark-shell with the --packages option inside spylon, but it just creates another instance. I tried %%...
3
votes
2
answers
3k
views
How to move files from one S3 bucket directory to another directory in same bucket? Scala/Java
I want to move all files under a directory in my s3 bucket to another directory within the same bucket, using scala.
Here is what I have:
def copyFromInputFilesToArchive(spark: SparkSession) : Unit = {...
1
vote
1
answer
479
views
CSV comma-delimiter split in Spark RDD, but NOT splitting commas within double quotes
I have a CSV file with data as below
id,name,comp_name
1,raj,"rajeswari,motors"
2,shiva,amber kings
My requirement is to read this file into a Spark RDD, then do a map split with a comma delimiter.
...
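The classic trick for this is to split only on commas followed by an even number of double quotes, so commas inside quoted fields survive. A plain-Scala sketch that could be applied per line in an RDD `map`:

```scala
// Split on commas that sit outside double quotes: the lookahead
// requires the remainder of the line to contain quotes only in pairs.
// Handles the simple quoting in the sample; a real CSV reader
// (e.g. spark.read.csv with its quote option) is safer for edge
// cases like escaped quotes or embedded newlines.
val splitRegex = ",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)"

def splitCsvLine(line: String): Array[String] = line.split(splitRegex, -1)

splitCsvLine("1,raj,\"rajeswari,motors\"")
// Array("1", "raj", "\"rajeswari,motors\"")
```

The `-1` limit keeps trailing empty fields, which a bare `split(",")` would silently drop.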