You are basically looking for table pivoting: group by id, pivot col1 into the column headers, and aggregate col2 into a list with the collect_list function.
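As a working assumption (the input isn't shown here), the examples below run against a DataFrame reconstructed from the outputs; this presumes a spark-shell session so the spark implicits are in scope:

// assumed sample input, inferred from the result tables below
import org.apache.spark.sql.functions._
import spark.implicits._

val df = Seq(
  ("x", "p1", "a1"),
  ("x", "p2", "b1"),
  ("y", "p2", "b2"),
  ("y", "p2", "b3"),
  ("y", "p3", "c1")
).toDF("id", "col1", "col2")

With that in place, the pivot is a one-liner: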
df.groupBy("id").pivot("col1").agg(collect_list("col2")).show
+---+----+--------+----+
| id| p1| p2| p3|
+---+----+--------+----+
| x|[a1]| [b1]| []|
| y| []|[b2, b3]|[c1]|
+---+----+--------+----+
If it's guaranteed that there is at most one value per id in p1 and p3, you can convert those columns to StringType by taking the first element of the array:
df.groupBy("id").pivot("col1").agg(collect_list("col2"))
.withColumn("p1", $"p1"(0)).withColumn("p3", $"p3"(0))
.show
+---+----+--------+----+
| id| p1| p2| p3|
+---+----+--------+----+
| x| a1| [b1]|null|
| y|null|[b2, b3]| c1|
+---+----+--------+----+
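On Spark 2.4+, element_at is an equivalent, slightly more explicit way to spell the same lookup; note its array index is 1-based, and with the default (non-ANSI) settings it also returns null for an empty array, matching the output above:

// alternative spelling of the first-element lookup (Spark 2.4+)
df.groupBy("id").pivot("col1").agg(collect_list("col2"))
  .withColumn("p1", element_at($"p1", 1)).withColumn("p3", element_at($"p3", 1))
  .show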
If you need to decide the column types dynamically, i.e. keep the array type only for the columns that actually hold multiple values:
// find the col1 values that map to more than one col2 value for some id;
// the corresponding pivoted columns must stay arrays
val arrayColumns = df.groupBy("id", "col1").agg(count("*").as("N"))
  .where($"N" > 1).select("col1").distinct.collect.map(row => row.getString(0))
// arrayColumns: Array[String] = Array(p2)
// aggregate / pivot data frame
val aggDf = df.groupBy("id").pivot("col1").agg(collect_list("col2"))
// aggDf: org.apache.spark.sql.DataFrame = [id: string, p1: array<string> ... 2 more fields]
// the remaining pivoted columns hold at most one value and can be flattened to strings
val stringColumns = aggDf.columns.filter(x => x != "id" && !arrayColumns.contains(x))
// use foldLeft on string columns to convert the columns to string type
stringColumns.foldLeft(aggDf)((df, x) => df.withColumn(x, col(x)(0))).show
+---+----+--------+----+
| id| p1| p2| p3|
+---+----+--------+----+
| x| a1| [b1]|null|
| y|null|[b2, b3]| c1|
+---+----+--------+----+
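Two small notes on this approach: computing arrayColumns runs a Spark job and collects the multi-valued col1 keys to the driver, which is cheap because a pivot column should be low-cardinality anyway (Spark caps the number of distinct pivot values via spark.sql.pivotMaxValues). And foldLeft returns a new DataFrame at each step, so assign the final result to a val if you need it beyond the show.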