
E.g.:

from pyspark.sql import SQLContext  # sc is an existing SparkContext

sqlContext = SQLContext(sc)

sample=sqlContext.sql("select Name ,age ,city from user")
sample.show()

The above statement prints the entire table on the terminal. But I want to access each row in that table using a for or while loop to perform further calculations.


7 Answers


No and Yes.

No:

Technically speaking, you simply cannot iterate over DataFrames and other distributed data structures. They can only be accessed through dedicated higher-order functions and/or SQL methods (see https://docs.python.org/3/glossary.html#term-iterable).

Yes:

You can use collect to get a local list of Row objects that can be iterated.

for row in df.rdd.collect():
    do_something(row)

or use toLocalIterator:

for row in df.rdd.toLocalIterator():
    do_something(row)

Note:

Spark's distributed data and distributed processing let you work on amounts of data that would otherwise be very hard to handle.

When using collect(), there is a trade-off: you can loop over the rows locally, but the data might no longer fit into local memory, or the computation might take much more time.


3 Comments

Newbie question: As iterating an already collected dataframe "beats the purpose", from a dataframe, how should I pick the rows I need for further processing?
Did some reading and looks like forming a new dataframe with where() would be the Spark-way of doing it properly.
"it beats all purpose of using Spark" is pretty strong and subjective language. The collect() method exists for a reason, and there are many valid use cases for it. Once Spark is done processing the data, iterating through the final results might be the only way to integrate with/write to external APIs or legacy systems.

To "loop" and take advantage of Spark's parallel computation framework, you could define a custom function and use map.

def customFunction(row):
    return (row.name, row.age, row.city)

sample2 = sample.rdd.map(customFunction)

or

sample2 = sample.rdd.map(lambda x: (x.name, x.age, x.city))

The custom function would then be applied to every row of the DataFrame. Note that sample2 will be an RDD, not a DataFrame.

map may be needed if you are going to perform more complex computations. If you just need to add a simple derived column, you can use withColumn, which returns a DataFrame.

sample3 = sample.withColumn('age2', sample.age + 2)

2 Comments

Can you please tell me how to actually use the customFunction so that the return values could be used inside a loop for further processing? I have a collect() based approach but my data is too large and it causes the Pyspark (v. 3) to fail. Thank you!
hi @David, if I use map() on a rdd, will each row run customFunction() in order? In my case, I hope every row will be processed sequentially.

Using list comprehensions in python, you can collect an entire column of values into a list using just two lines:

df = sqlContext.sql("show tables in default")
tableList = [x["tableName"] for x in df.rdd.collect()]

In the above example, we return a list of tables in database 'default', but the same can be adapted by replacing the query used in sql().

Or more abbreviated:

tableList = [x["tableName"] for x in sqlContext.sql("show tables in default").rdd.collect()]

And for your example of three columns, we can create a list of dictionaries, and then iterate through them in a for loop.

sql_text = "select name, age, city from user"
tupleList = [{name:x["name"], age:x["age"], city:x["city"]} 
             for x in sqlContext.sql(sql_text).rdd.collect()]
for row in tupleList:
    print("{} is a {} year old from {}".format(
        row["name"],
        row["age"],
        row["city"]))



Give it a try like this:

    result = spark.createDataFrame([('SpeciesId', 'int'), ('SpeciesName', 'string')],
                                   ["col_name", "data_type"])
    for f in result.collect():
        print(f.col_name)



It might not be the best practice, but you can simply target a specific column using collect(), export it as a list of Rows, and loop through the list.

Assume this is your df:

+----------+----------+-------------------+-----------+-----------+------------------+ 
|      Date|  New_Date|      New_Timestamp|date_sub_10|date_add_10|time_diff_from_now|
+----------+----------+-------------------+-----------+-----------+------------------+ 
|2020-09-23|2020-09-23|2020-09-23 00:00:00| 2020-09-13| 2020-10-03| 51148            | 
|2020-09-24|2020-09-24|2020-09-24 00:00:00| 2020-09-14| 2020-10-04| -35252           |
|2020-01-25|2020-01-25|2020-01-25 00:00:00| 2020-01-15| 2020-02-04| 20963548         |
|2020-01-11|2020-01-11|2020-01-11 00:00:00| 2020-01-01| 2020-01-21| 22173148         |
+----------+----------+-------------------+-----------+-----------+------------------+

To loop through the rows in the Date column:

rows = df3.select('Date').collect()

final_list = []
for i in rows:
    final_list.append(i[0])

print(final_list)



If you want to do something to each row in a DataFrame object, use map. This will allow you to perform further calculations on each row. It's the equivalent of looping across the entire dataset from 0 to len(dataset)-1.

Note that this will return a PipelinedRDD, not a DataFrame.



In the answer above,

tupleList = [{name:x["name"], age:x["age"], city:x["city"]} 

should be

tupleList = [{'name':x["name"], 'age':x["age"], 'city':x["city"]} 

because name, age, and city are not variables but simply keys of the dictionary.

2 Comments

Is a square bracket missing from right hand side of code line 2?
When you're not addressing the original question, don't post it as an answer; instead, comment on or suggest an edit to the partially correct answer.
