I'm looking for a way to debug a Spark pandas UDF in VS Code and PyCharm Community Edition (place a breakpoint and stop inside the UDF). At the moment, when a breakpoint is placed inside the UDF, the debugger doesn't stop.
The reference below describes Local mode and Distributed mode.
I'm trying to at least debug in Local mode. In PyCharm and VS Code there should be a way to debug a local environment via "Attach to Local Process", but I cannot figure out how.
At the moment I cannot find any answer on how to attach the PySpark debugger to a local process inside a UDF in VS Code (my dev IDE).
I found only the examples below, for PyCharm.
- Attach to local process: How can PySpark be called in debug mode?
When I try to attach to the process, I get the message below in PyCharm. In VS Code I get a message that the process cannot be attached.
Attaching to a process with PID=33,692
/home/usr_name/anaconda3/envs/yf/bin/python3.8 /snap/pycharm-community/223/plugins/python-ce/helpers/pydev/pydevd_attach_to_process/attach_pydevd.py --port 40717 --pid 33692
WARNING: The 'kernel.yama.ptrace_scope' parameter value is not 0, attach to process may not work correctly.
Please run 'sudo sysctl kernel.yama.ptrace_scope=0' to change the value temporary
or add the 'kernel.yama.ptrace_scope = 0' line to /etc/sysctl.d/10-ptrace.conf to set it permanently.
Process finished with exit code 0
Server stopped.
- pyspark_xray https://github.com/bradyjiang/pyspark_xray With this package it is possible to debug RDDs running on a worker, but I was not able to adapt the package to debug UDFs.
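The attach log above warns that kernel.yama.ptrace_scope is not 0. As a minimal sketch (Linux only), the current value can be read from /proc before retrying the attach. The helper names read_ptrace_scope and describe_ptrace_scope are hypothetical, introduced only for illustration; the meanings of the values follow the Linux Yama documentation.

```python
from pathlib import Path

# Standard location of the Yama setting on Linux; the file is absent on
# systems where Yama is not enabled.
PTRACE_SCOPE_PATH = Path("/proc/sys/kernel/yama/ptrace_scope")


def read_ptrace_scope(path=PTRACE_SCOPE_PATH):
    """Return the current ptrace_scope value as a string, or None if unreadable."""
    try:
        return path.read_text().strip()
    except OSError:
        return None


def describe_ptrace_scope(raw_value):
    """Map a ptrace_scope value to a short explanation (hypothetical helper)."""
    meanings = {
        "0": "classic: a process may attach to other processes of the same user",
        "1": "restricted: a process may attach only to its descendants",
        "2": "admin-only: attaching requires CAP_SYS_PTRACE",
        "3": "disabled: attaching is not allowed at all",
    }
    return meanings.get(raw_value.strip(), "unknown value")


if __name__ == "__main__":
    value = read_ptrace_scope()
    if value is None:
        print("ptrace_scope not available on this system")
    else:
        print(f"kernel.yama.ptrace_scope = {value} ({describe_ptrace_scope(value)})")
```

A value of 1 would be consistent with the warning shown in the log, since the IDE's attach helper is not a parent of the process it is attaching to.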
Example code; the breakpoint doesn't stop inside the UDF pandas_function(url_json):
import pandas as pd
import pyspark
from pyspark.sql import Row
from pyspark.sql import functions as F  # needed for F.monotonically_increasing_id()
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
spark = pyspark.sql.SparkSession.builder.appName("test") \
    .master('local[*]') \
    .getOrCreate()
sc = spark.sparkContext
# Create initial dataframe respond_sdf
d_list = [('api_1', "{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }"),
          ('api_2', "{'api': ['api_2', 'api_2', 'api_2'],'A': [7,8,9], 'B': [10,11,12] }")]
schema = StructType([
    StructField('url', StringType(), True),
    StructField('content', StringType(), True)
])
jsons = sc.parallelize(d_list)
respond_sdf = spark.createDataFrame(jsons, schema)
# Pandas UDF
def pandas_function(url_json):
    # Here I want to place breakpoint
    df = pd.DataFrame(eval(url_json['content'][0]))
    return df
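# Pandas UDF transformation applied to respond_sdf
# The output schema must describe the columns returned by pandas_function (api, A, B)
respond_sdf.groupby(F.monotonically_increasing_id()).applyInPandas(pandas_function, schema="api string, A long, B long").show()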
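For comparison, the function body can be exercised outside Spark by calling it directly on a hand-built pandas DataFrame; in that case it runs in the debugged process and a breakpoint inside it is hit as usual. This is only a sketch of checking the function's logic. It does not attach a debugger to Spark's Python worker, which is what the question is about. The sample_group frame is an assumption that mimics one group of respond_sdf, and ast.literal_eval is substituted here for eval.

```python
import ast

import pandas as pd


def pandas_function(url_json):
    # A breakpoint placed here is hit when the function is called directly,
    # because the call happens in the process the IDE is debugging.
    content = url_json["content"].iloc[0]
    return pd.DataFrame(ast.literal_eval(content))


# Hypothetical single group with the same columns as respond_sdf (url, content).
sample_group = pd.DataFrame({
    "url": ["api_1"],
    "content": ["{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }"],
})

result = pandas_function(sample_group)
print(result)
```

The returned frame has the columns api, A and B, which is also the shape the output schema passed to applyInPandas has to describe.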