I am trying to parse a log file with millions of records. Each record contains a host name, timestamp, status code, and so on. I parsed the host, status code, and URL successfully, but extracting the timestamp fails with an error. Here is my code:

from pyspark.sql import Row
from pyspark.sql.functions import col, regexp_extract

lines = sc.textFile(filepath)
df_log = lines.map(lambda x: Row(header=x)).toDF()
timestamp_pattern = r'\[\d{2}\/\w{3}\/\d{4}\:\d{2}\:\d{2}\:\d{2}\s\S+\d{4}]'
df2 = df_log.select(regexp_extract(col('header'), timestamp_pattern, 1).alias("timestamp"))

Everything works up to this point. When I then call df2.show(10), I get the following error:

Py4JJavaError                             Traceback (most recent call last)
<ipython-input-112-5a86d13b2926> in <module>()
----> 1 df2.show(1)

/opt/cloudera/parcels/SPARK2/lib/spark2/python/pyspark/sql/dataframe.pyc in show(self, n, truncate)
    316         """
    317         if isinstance(truncate, bool) and truncate:
--> 318             print(self._jdf.showString(n, 20))
    319         else:
    320             print(self._jdf.showString(n, int(truncate)))

/opt/cloudera/parcels/SPARK2/lib/spark2/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1131         answer = self.gateway_client.send_command(command)
   1132         return_value = get_return_value(
-> 1133             answer, self.gateway_client, self.target_id, self.name)
   1134
   1135         for temp_arg in temp_args:

/opt/cloudera/parcels/SPARK2/lib/spark2/python/pyspark/sql/utils.pyc in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/opt/cloudera/parcels/SPARK2/lib/spark2/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    317                 raise Py4JJavaError(
    318                     "An error occurred while calling {0}{1}{2}.\n".
--> 319                     format(target_id, ".", name), value)
    320             else:
    321                 raise Py4JError(

Py4JJavaError: An error occurred while calling o965.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2.0 (TID 5, ip-20-0-31-210.ec2.internal, executor 2): java.lang.IndexOutOfBoundsException: No group 1
    at java.util.regex.Matcher.group(Matcher.java:538)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:325)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1430)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1417)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1417)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:797)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:797)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:797)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1645)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1600)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1589)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:623)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1930)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1943)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1956)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:333)
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
    at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2378)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
    at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2772)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2377)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2384)
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2120)
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2119)
    at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2802)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2119)
    at org.apache.spark.sql.Dataset.take(Dataset.scala:2334)
    at org.apache.spark.sql.Dataset.showString(Dataset.scala:248)
    at sun.reflect.GeneratedMethodAccessor71.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IndexOutOfBoundsException: No group 1
    at java.util.regex.Matcher.group(Matcher.java:538)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:325)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more

Please help me with this.

For reference, here is an excerpt of the records in the log file:

109.169.248.247 - - [12/Dec/2015:18:25:11 +0100] GET /administrator/ HTTP/1.1 200 4263 - Mozilla/5.0 (Windows NT 6.0; rv:34.0) Gecko/20100101 Firefox/34.0 -

109.169.248.247 - - [12/Dec/2015:18:25:11 +0100] GET /administrator/ HTTP/1.1 200 4263 - Mozilla/5.0 (Windows NT 6.0; rv:34.0) Gecko/20100101 Firefox/34.0 -

109.169.248.247 - - [12/Dec/2015:18:25:11 +0100] GET /administrator/ HTTP/1.1 200 4263 - Mozilla/5.0 (Windows NT 6.0; rv:34.0) Gecko/20100101 Firefox/34.0 -

109.169.248.247 - - [12/Dec/2015:18:25:11 +0100] POST /administrator/index.php HTTP/1.1 200 4494 http://almhuette-raith.at/administrator/ Mozilla/5.0 (Windows NT 6.0; rv:34.0) Gecko/20100101 Firefox/34.0 -

46.72.177.4 - - [12/Dec/2015:18:31:08 +0100] GET /administrator/ HTTP/1.1 200 4263 - Mozilla/5.0 (Windows NT 6.0; rv:34.0) Gecko/20100101 Firefox/34.0 -

46.72.177.4 - - [12/Dec/2015:18:31:08 +0100] POST /administrator/index.php HTTP/1.1 200 4494 http://almhuette-raith.at/administrator/ Mozilla/5.0 (Windows NT 6.0; rv:34.0) Gecko/20100101 Firefox/34.0 -

83.167.113.100 - - [12/Dec/2015:18:31:25 +0100] GET /administrator/ HTTP/1.1 200 4263 - Mozilla/5.0 (Windows NT 6.0; rv:34.0) Gecko/20100101 Firefox/34.0 -

83.167.113.100 - - [12/Dec/2015:18:31:25 +0100] POST /administrator/index.php HTTP/1.1 200 4494 http://almhuette-raith.at/administrator/ Mozilla/5.0 (Windows NT 6.0; rv:34.0) Gecko/20100101 Firefox/34.0 -

95.29.198.15 - - [12/Dec/2015:18:32:10 +0100] GET /administrator/ HTTP/1.1 200 4263 - Mozilla/5.0 (Windows NT 6.0; rv:34.0) Gecko/20100101 Firefox/34.0 -

95.29.198.15 - - [12/Dec/2015:18:32:11 +0100] POST /administrator/index.php HTTP/1.1 200 4494 http://almhuette-raith.at/administrator/ Mozilla/5.0 (Windows NT 6.0; rv:34.0) Gecko/20100101 Firefox/34.0 -

109.184.11.34 - - [12/Dec/2015:18:32:56 +0100] GET /administrator/ HTTP/1.1 200 4263 - Mozilla/5.0 (Windows NT 6.0; rv:34.0) Gecko/20100101 Firefox/34.0 -

109.184.11.34 - - [12/Dec/2015:18:32:56 +0100] POST /administrator/index.php HTTP/1.1 200 4494 http://almhuette-raith.at/administrator/ Mozilla/5.0 (Windows NT 6.0; rv:34.0) Gecko/20100101 Firefox/34.0 -

1 Answer

Change the last line so that regexp_extract uses group index 0:

timestamp_pattern= r'\[\d{2}\/\w{3}\/\d{4}\:\d{2}\:\d{2}\:\d{2}\s\S+\d{4}]'
df2=df_log.select(regexp_extract(col('header'),timestamp_pattern,0).alias("timestamp"))

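Note: I changed the group index from 1 to 0 because your timestamp_pattern contains no capturing group. Index 0 returns the entire match, whereas index 1 asks for the first parenthesized group, which does not exist in this pattern. That is what the "java.lang.IndexOutOfBoundsException: No group 1" in the stack trace is reporting.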
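The difference between group 0 and group 1 can be checked without a cluster. The sketch below uses Python's re module on one record from the question. Spark evaluates the pattern with java.util.regex rather than Python's engine, so this only illustrates group numbering, which works the same way for this pattern. The second pattern, with_group, is a hypothetical variant that is not in the original post: it adds parentheses so that group 1 exists and holds the timestamp without the surrounding brackets.

```python
import re

# One record copied from the log excerpt in the question.
line = ('109.169.248.247 - - [12/Dec/2015:18:25:11 +0100] GET /administrator/ '
        'HTTP/1.1 200 4263 - Mozilla/5.0 (Windows NT 6.0; rv:34.0) '
        'Gecko/20100101 Firefox/34.0 -')

# Pattern from the question: no parentheses, so only group 0 (the whole match) exists.
no_group = re.compile(r'\[\d{2}\/\w{3}\/\d{4}\:\d{2}\:\d{2}\:\d{2}\s\S+\d{4}]')

# Hypothetical variant (an assumption, not from the post): parentheses around
# the text between the brackets create capturing group 1.
with_group = re.compile(r'\[(\d{2}/\w{3}/\d{4}:\d{2}:\d{2}:\d{2}\s\S+\d{4})\]')

m0 = no_group.search(line)
whole_match = m0.group(0)  # entire matched text, brackets included

# Asking for group 1 on a pattern without groups fails in Python too
# (IndexError here, IndexOutOfBoundsException in Java).
try:
    m0.group(1)
    group1_exists = True
except IndexError:
    group1_exists = False

# With the parenthesized variant, group 1 is the timestamp without brackets.
inner = with_group.search(line).group(1)

print(whole_match)
print(group1_exists)
print(inner)
```

If the brackets are not wanted in the timestamp column, the parenthesized pattern could be passed to regexp_extract with index 1 instead of using index 0.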
