
I am new to Python and Spark and I need your help; thanks in advance!

So here we go: I have this script:

from datetime import datetime
from pyspark import SparkContext

def getNormalizedDate(dateOfCL):
        #the result will be in [0,1]
        dot=datetime.now()
        od=datetime.strptime("Jan 01 2010", "%b %d %Y")

        return (float((dateOfCL-od).days)/float((dot-od).days))

def addition(a, b):
        a1=a
        b1=b
        if not type(a) is float:
                a1=getNormalizedDate(a)
        if not type(b) is float:
                b1=getNormalizedDate(b)

        return float(a1+b1)

def debugFunction(x):
        print "x[0]: " + str(type(x[0]))
        print "x[1]: " + str(type(x[1])) + " --> " + str(x[1])
        return x[1]



if __name__ == '__main__':
        sc = SparkContext("local", "File Scores")

        textFile = sc.textFile("/data/spark/file.csv")
        #print "Number of lines: " + str(textFile.count())

        test1 = textFile.map(lambda line: line.split(";"))
        # result of this:
        # [u'01', u'01', u'add', u'fileName', u'Path', u'1', u'info', u'info2', u'info3', u'Sep 24 2014']

        test2 = test1.map(lambda line: (line[3], datetime.strptime(line[len(line)-1], "%b %d %Y")))

        test6=test2.reduceByKey(addition)
        #print test6
        test6.persist()

        result=sorted(test6.collect(), key=debugFunction)

This ends with an error:

Traceback (most recent call last):
  File "/data/spark/script.py", line 40, in <module>
    result=sorted(test6.collect(), key=lambda x:x[1])
TypeError: can't compare datetime.datetime to float

For info, test6.collect() gives this content

[(u'file1', 0.95606060606060606), 
(u'file2', 0.91515151515151516), 
(u'file3', 0.8797979797979798), 
(u'file4', 0.0), 
(u'file5', 0.94696969696969702), 
(u'file6', 0.95606060606060606), 
(u'file7', 0.98131313131313136), 
(u'file8', 0.86161616161616161)]

and I want to sort it by the float value (not the key). How should I proceed, please?

Thank you guys.

  • OK, I modified the code to add a debugFunction. I wanted to display the content of my test6 and, weirdly, it still has some "dates" inside, for instance:

        x[0]: <type 'unicode'>  x[1]: <type 'float'> --> 19.0141414141
        x[0]: <type 'unicode'>  x[1]: <type 'datetime.datetime'> --> 2014-09-19 00:00:00

    Could someone explain why I still get dates, please? Commented Jun 4, 2015 at 16:19

2 Answers


For those who might be interested, I found the problem. I was reducing by key and converting dates to floats inside the addition function. But some of the files are unique: reduceByKey never calls the reduce function for a key that has only one value, so those entries were never converted and still held a date instead of a float.
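A minimal demonstration of that behavior (toy data, not from the post): reduceByKey only invokes the reduce function for keys that occur more than once, so a singleton key's value passes through untouched.

rdd = sc.parallelize([("a", 1.0), ("a", 2.0), ("b", 3.0)])
print rdd.reduceByKey(lambda x, y: x + y).collect()
# [('a', 3.0), ('b', 3.0)] -- "b" occurs once, so the lambda never ran for it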

what i do now is

test2 = test1.map(lambda line: (line[3], getNormalizedDate(datetime.strptime(line[-1], "%b %d %Y"))))

That makes pairs of (file, float).

Only then do I reduce by key.

Finally, the step

result = sorted(test6.collect(), key=lambda x: x[1])

gives me the sorting I was looking for.
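Putting it all together, here is a minimal sketch of the corrected pipeline (assuming the same semicolon-delimited layout as in the question, with the file name in column 3 and the date in the last column):

from datetime import datetime
from pyspark import SparkContext

def getNormalizedDate(dateString):
        # map a "Mon DD YYYY" string onto [0, 1] relative to Jan 01 2010
        dot = datetime.now()
        od = datetime.strptime("Jan 01 2010", "%b %d %Y")
        d = datetime.strptime(dateString, "%b %d %Y")
        return float((d - od).days) / float((dot - od).days)

sc = SparkContext("local", "File Scores")
lines = sc.textFile("/data/spark/file.csv").map(lambda line: line.split(";"))
# normalize to a float *before* reducing, so every value has the same type
pairs = lines.map(lambda line: (line[3], getNormalizedDate(line[-1])))
totals = pairs.reduceByKey(lambda a, b: a + b)
result = sorted(totals.collect(), key=lambda x: x[1])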

I hope this helps!!




I for one prefer working with DataFrames over RDDs whenever possible; the API is more high-level. You can order the data in a DataFrame by a specific column like so:

df = spark.read.csv('input_data.csv')
df.sort('column_name').write.csv(path='output_path')

where spark is an instance of the pyspark.sql.session.SparkSession class.
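Adapted to the data in the question, a sketch might look like this (assumes Spark 2.x; the default _c3/_c9 column names follow from the sample row, and the normalize UDF is illustrative, not from the original post):

from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

spark = SparkSession.builder.appName("File Scores").getOrCreate()

def normalize(dateString):
        # same [0, 1] normalization as in the question
        od = datetime.strptime("Jan 01 2010", "%b %d %Y")
        d = datetime.strptime(dateString, "%b %d %Y")
        return float((d - od).days) / float((datetime.now() - od).days)

normalizeUdf = udf(normalize, DoubleType())

df = spark.read.csv("/data/spark/file.csv", sep=";")
# _c3 is the file name and _c9 the date in the default schema
scores = (df.withColumn("score", normalizeUdf(df["_c9"]))
            .groupBy("_c3").sum("score")
            .orderBy("sum(score)"))
scores.show()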

Comments
