3

My DataFrame table contains rows such as

['row1', 'col_1', 'col_2', 'col_3', ..., 'col_N', 'alpha']

N (the number of columns, excluding the first and the last) is relatively large.

Now I need to create another DataFrame from this one by multiplying each of the columns named col_i by the alpha column. Is there a smarter way than writing out the multiplication manually for each column, as in:

from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext()
sqlc = SQLContext(sc)

sqlc.sql('SELECT col_1 * alpha, col_2 * alpha, ..., col_N * alpha FROM table')
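
(As an aside, the query above assumes the DataFrame has already been registered under the name table. A minimal sketch of that step, where df is an assumed name for the loaded DataFrame and is not taken from the question:)

df.registerTempTable('table')  # on newer Spark versions: df.createOrReplaceTempView('table')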

So I'd like to know whether it's possible to apply the same operation to each column without spelling it out for every one.

3 Answers

2

Not sure how efficient this is, but I might do something like this:

col_names = df.columns
# start at 1 to skip the row column, stop at -1 to skip the alpha column
for x in range(1, len(col_names) - 1):
    new_column_name = col_names[x] + "_x_alpha"  # descriptive name for the derived column
    df = df.withColumn(new_column_name, df[col_names[x]] * df["alpha"])

This will yield the same DataFrame you had originally, but with new columns that multiply each col_* entry by the entry in alpha.
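
For context, here is a minimal sketch of the trivial example frame used below, reconstructed from the values in the output (the original answer does not show how it was built, and sqlc is the context from the question):

df = sqlc.createDataFrame(
    [(1, 2, 3, 4), (2, 3, 4, 5), (3, 4, 5, 6)],
    ['row', 'col_1', 'col_2', 'alpha']
)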

When I run df.show() on my trivial example, I get this output:

+---+-----+-----+-----+-------------+-------------+
|row|col_1|col_2|alpha|col_1_x_alpha|col_2_x_alpha|
+---+-----+-----+-----+-------------+-------------+
|  1|    2|    3|    4|            8|           12|
|  2|    3|    4|    5|           15|           20|
|  3|    4|    5|    6|           24|           30|
+---+-----+-----+-----+-------------+-------------+

Then you could run a SQL query to keep only the columns named col_*_x_alpha.
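
For example, selecting by name suffix does the same job as that SQL query (a sketch, assuming the _x_alpha suffix and the row column from the toy example above):

df.select(['row'] + [c for c in df.columns if c.endswith('_x_alpha')])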



1

You can use a list comprehension:

from pyspark.sql import functions as F

df.select(*[(F.col(c) * F.col('alpha')).alias(c) for c in df.columns if c != 'alpha'])
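
Note that this multiplies every column except alpha, including the row identifier. A variant that only transforms the col_* columns and passes everything else through unchanged could look like this (a sketch; the col_ prefix check is an assumption based on the question's naming):

from pyspark.sql import functions as F

df.select(*[
    (F.col(c) * F.col('alpha')).alias(c) if c.startswith('col_') else F.col(c)
    for c in df.columns
])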


0

Old question... but I ran into the same issue and found this question while looking for the answer.

As @Katya Willard suspected, her answer is not the most efficient. The PySpark documentation notes the following about withColumn():

(...) calling it multiple times, for instance, via loops in order to add multiple columns can generate big plans which can cause performance issues and even StackOverflowException. To avoid this, use select() with the multiple columns at once.

https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.withColumn.html

That raises the question of how to apply the operation to the columns with a select statement. You want to build the list of column expressions in a loop and only then send it to your DataFrame in a single select:

from pyspark.sql.functions import col
# The 'col' function lets us reference columns by name without the df.colname syntax

def col_transform_select_list(col_name_list_for_transform, full_col_list):
    '''Prepare a SELECT column list where the chosen columns have their value
    multiplied by the value from 'alpha' and are renamed accordingly.
    The other columns are returned untouched and the column order is preserved.
    '''
    selection_col_list = []
    alias_pattern = "{}_x_alpha"
    for col_name in full_col_list:
        if col_name in col_name_list_for_transform:
            selection_col_list.append((col(col_name) * col('alpha')).alias(alias_pattern.format(col_name)))
        else:
            selection_col_list.append(col_name)
    return selection_col_list

# Now run the select query directly on your table:
cols_to_multiply = [colname for colname in table.columns if colname.startswith('col_')]
prepared_column_list = col_transform_select_list(cols_to_multiply, table.columns)
multiplied_table = table.select(prepared_column_list)
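
As a small usage note, select() accepts either a single list of column expressions or the expressions unpacked as separate arguments, so this call is equivalent:

multiplied_table = table.select(*prepared_column_list)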

