
Getting started with pyspark.ml and the Pipelines API, I find myself writing custom Transformers for typical preprocessing tasks so that I can use them in a pipeline. Examples:

from pyspark.ml import Pipeline, Transformer


class CustomTransformer(Transformer):
    # lazy workaround - a Transformer needs to have these attributes
    _defaultParamMap = dict()
    _paramMap = dict()
    _params = dict()


class ColumnSelector(CustomTransformer):
    """Transformer that selects a subset of columns - to be used as a pipeline stage."""

    def __init__(self, columns):
        self.columns = columns

    def _transform(self, data):
        return data.select(self.columns)


class ColumnRenamer(CustomTransformer):
    """Transformer that renames one column."""

    def __init__(self, rename):
        self.rename = rename

    def _transform(self, data):
        colNameBefore, colNameAfter = self.rename
        return data.withColumnRenamed(colNameBefore, colNameAfter)


class NaDropper(CustomTransformer):
    """Transformer that drops rows containing at least one null value."""

    def __init__(self, cols=None):
        self.cols = cols

    def _transform(self, data):
        return data.dropna(subset=self.cols)


class ColumnCaster(CustomTransformer):
    """Transformer that casts one column to a given type."""

    def __init__(self, col, toType):
        self.col = col
        self.toType = toType

    def _transform(self, data):
        return data.withColumn(self.col, data[self.col].cast(self.toType))

They work, but I am wondering whether this is a pattern or an antipattern: are such transformers a good way to work with the Pipeline API? Did I need to implement them myself, or is equivalent functionality provided elsewhere?
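For reference, this is roughly how I chain them (a minimal sketch; the column names and types are just placeholders, and it assumes a DataFrame `df` already exists):

from pyspark.ml import Pipeline

# hypothetical DataFrame with columns: id, price, label
pipeline = Pipeline(stages=[
    ColumnSelector(["id", "price", "label"]),
    NaDropper(cols=["price", "label"]),
    ColumnCaster("price", "double"),
    ColumnRenamer(("label", "target")),
])

# with only Transformers as stages, fit() just collects them
# and transform() applies them in order
result = pipeline.fit(df).transform(df)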

  • How do you call the custom transformers? Commented May 16, 2019 at 16:05

1 Answer


I'd say it is primarily opinion-based, although this looks unnecessarily verbose, and Python Transformers don't integrate well with the rest of the Pipeline API.

It is also worth pointing out that everything you have here can be easily achieved with SQLTransformer. For example:

from pyspark.ml.feature import SQLTransformer

def column_selector(columns):
    return SQLTransformer(
        statement="SELECT {} FROM __THIS__".format(", ".join(columns))
    )

or

def na_dropper(columns):
    return SQLTransformer(
        statement="SELECT * FROM __THIS__ WHERE {}".format(
            " AND ".join(["{} IS NOT NULL".format(x) for x in columns])
        )
    )
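Either of these drops straight into a Pipeline like any other stage, for example (assuming a DataFrame df with columns x and y):

from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[
    column_selector(["x", "y"]),  # SELECT x, y FROM __THIS__
    na_dropper(["x", "y"]),       # SELECT * FROM __THIS__ WHERE x IS NOT NULL AND y IS NOT NULL
])
result = pipeline.fit(df).transform(df)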

With a little bit of effort you can use SQLAlchemy with the Hive dialect to avoid handwritten SQL.
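A rough, untested sketch of that idea, building the same na_dropper with SQLAlchemy instead of string formatting (assumes the PyHive package, which ships a SQLAlchemy Hive dialect, and a SQLAlchemy 1.4+ API):

from pyspark.ml.feature import SQLTransformer
from pyhive.sqlalchemy_hive import HiveDialect  # assumes PyHive is installed
from sqlalchemy import column, select, table, text


def na_dropper(columns):
    cols = [column(c) for c in columns]
    stmt = (
        select(text("*"))                           # SELECT *
        .select_from(table("__THIS__"))             # __THIS__ is SQLTransformer's placeholder
        .where(*[c.isnot(None) for c in cols])      # x IS NOT NULL AND y IS NOT NULL ...
    )
    # compile against the Hive dialect instead of concatenating SQL by hand
    return SQLTransformer(statement=str(stmt.compile(dialect=HiveDialect())))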


3 Comments

Could you elaborate on "Python Transformers don't integrate well with the rest of the Pipeline API"?
By default they are not MLWritable (although there are nice hacks).
Well, SQL is not the elegant alternative I had hoped for, but good answer nonetheless -> accept.
