
I am very new to AWS Glue. I am working on a small project where I need to read a file from an S3 bucket, transpose it, and load it into a MySQL table. The source data in the S3 bucket looks like this:

    +----+----+-------+-----+---+--+--------+
    |cost|data|minutes|name |sms|id|category|
    +----+----+-------+-----+---+--+--------+
    |  5 |1000|  200  |prod1|500|p1|service |
    +----+----+-------+-----+---+--+--------+

The target table structure is Product_id, Parameter, value

I am expecting the target table to contain the following values:

p1, cost, 5

p1, data, 1000

I am able to load the target table with the ID and value, but I am not able to populate the parameter column. This column is not present in the input data; I want to fill it with a fixed string that depends on which source column's value I am loading.

Here is the code I used for the cost column:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

## @type: DataSource
## @args: [database = "mainclouddb", table_name = "s3product", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "mainclouddb", table_name = "s3product", transformation_ctx = "datasource0")

## @type: ApplyMapping
## @args: [mapping = [("cost", "long", "value", "int"), ("id", "string", "product_id", "string")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("cost", "long", "value", "int"), ("id", "string", "product_id", "string")], transformation_ctx = "applymapping1")

## @type: SelectFields
## @args: [paths = ["product_id", "parameter", "value"], transformation_ctx = "selectfields2"]
## @return: selectfields2
## @inputs: [frame = applymapping1]
selectfields2 = SelectFields.apply(frame = applymapping1, paths = ["product_id", "parameter", "value"], transformation_ctx = "selectfields2")

## @type: ResolveChoice
## @args: [choice = "MATCH_CATALOG", database = "mainclouddb", table_name = "mysqlmaincloud_product_parameter_mapping", transformation_ctx = "resolvechoice3"]
## @return: resolvechoice3
## @inputs: [frame = selectfields2]
resolvechoice3 = ResolveChoice.apply(frame = selectfields2, choice = "MATCH_CATALOG", database = "mainclouddb", table_name = "mysqlmaincloud_product_parameter_mapping", transformation_ctx = "resolvechoice3")

## @type: ResolveChoice
## @args: [choice = "make_cols", transformation_ctx = "resolvechoice4"]
## @return: resolvechoice4
## @inputs: [frame = resolvechoice3]
resolvechoice4 = ResolveChoice.apply(frame = resolvechoice3, choice = "make_cols", transformation_ctx = "resolvechoice4")

## @type: DataSink
## @args: [database = "mainclouddb", table_name = "mysqlmaincloud_product_parameter_mapping", transformation_ctx = "datasink5"]
## @return: datasink5
## @inputs: [frame = resolvechoice4]
datasink5 = glueContext.write_dynamic_frame.from_catalog(frame = resolvechoice4, database = "mainclouddb", table_name = "mysqlmaincloud_product_parameter_mapping", transformation_ctx = "datasink5")

job.commit()

Can somebody help me add this new column to my dynamic frame so that it can be written to the table?

Thanks

  • Btw, just noticed that the spark variable is unused... Commented Nov 12, 2019 at 17:00
  • Could you be more clear on how you want to derive the column which is not present in your source data? Commented Nov 27, 2019 at 6:43
  • I am planning to use the column name. Basically it will be a name-value pair, and there will be one record for each name - cost, data, and so on. I can use the column name as a hardcoded string for this new column. Commented Nov 27, 2019 at 19:57

2 Answers


For a smaller dataframe you can do the following:

  1. convert the dynamic frame to spark dataframe
  2. add a column
  3. convert back to dynamic frame

step 1

datasource0 = datasource0.toDF()

step 2

from pyspark.sql.functions import udf, col

getNewValues = udf(lambda val: val + 1)  # you can do what you need to do here instead of val + 1

datasource0 = datasource0.withColumn('New_Col_Name', getNewValues(col('some_existing_col')))

step 3

from awsglue.dynamicframe import DynamicFrame
datasource0 = DynamicFrame.fromDF(datasource0, glueContext, "datasource0")

The issue is that when you have a large dataset, the toDF() operation is very expensive!


1 Comment

So what's the solution for bigger dataframes?

One way to add columns to a dynamicframe directly, without converting to a Spark dataframe in between, is to use a Map transformation (note that this is different from ApplyMapping).

So let's assume that your input dynframe (with the data looking like in your example row) is called dyf_in.

You can do something like the following to create 2 separate dynamicframes, one with the cost entries, and another with the data entries:

from awsglue.gluetypes import _create_dynamic_record

def getCosts(rec):
    return _create_dynamic_record({
        'Product_id': rec['id'],
        'Parameter': 'cost',
        'value': rec['cost'],
    })

def getDatas(rec):
    return _create_dynamic_record({
        'Product_id': rec['id'],
        'Parameter': 'data',
        'value': rec['data'],
    })

dyf_costs = Map.apply(frame=dyf_in, f=getCosts, transformation_ctx='dyf_costs')
dyf_datas = Map.apply(frame=dyf_in, f=getDatas, transformation_ctx='dyf_datas')
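Since the functions handed to Map.apply are ordinary per-record callables, their melt logic can be sanity-checked locally on a plain dict before wiring them into the job. A sketch using the sample row from the question (with _create_dynamic_record left out, since a dict is enough to see the output shape):

```python
def melt_record(rec, parameter):
    # One output row per (product, parameter) pair, mirroring getCosts/getDatas.
    return {
        'Product_id': rec['id'],
        'Parameter': parameter,
        'value': rec[parameter],
    }

row = {'cost': 5, 'data': 1000, 'minutes': 200,
       'name': 'prod1', 'sms': 500, 'id': 'p1', 'category': 'service'}

melted = [melt_record(row, p) for p in ('cost', 'data')]
# melted[0] -> {'Product_id': 'p1', 'Parameter': 'cost', 'value': 5}
```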

And then you either push those dynamicframes into the same sink, or use something like Join (after adding an extra column in the Map funcs to use as a unique join key, and then dropping it afterwards) to concatenate the two dynamicframes into a single one.

One thing I'm not sure Glue can do (at least with Map) is this sort of transpose (which is essentially what you're trying to do) directly, without running through the same dynamicframe twice as my example does.

Glue Databrew seems to have some sort of a transpose function available, but I don't know much about Databrew, and maybe it's not even applicable to your situation, so I won't comment on that further.
