
I want to use multiprocessing on a model to get predictions using a data frame as input. I have the following code:

from multiprocessing import Pool
import logging

def perform_model_predictions(model, dataFrame, cores=4):
    try:
        with Pool(processes=cores) as pool:
            result = pool.map(model.predict, dataFrame)
            return result
        # return model.predict(dataFrame)
    except AttributeError:
        logging.error("AttributeError occurred", exc_info=True)

The error I'm getting is:

raise TypeError("sparse matrix length is ambiguous; use getnnz()"
TypeError: sparse matrix length is ambiguous; use getnnz() or shape[0]

I think the issue is that I'm passing a data frame as the second argument to pool.map. Any advice or help would be appreciated.

1 Answer


The trick is to split your dataframe into chunks. map expects a list of objects that will each be processed by model.predict. Here's a full working example, with the model obviously mocked:

import logging
import numpy as np
import pandas as pd
from multiprocessing import Pool

no_cores = 4

# Build a sample two-column dataframe and split it into roughly core-sized chunks.
large_df = pd.concat([pd.Series(np.random.rand(1111)), pd.Series(np.random.rand(1111))], axis=1)
chunk_size = len(large_df) // no_cores + no_cores
chunks = [df_chunk for g, df_chunk in large_df.groupby(np.arange(len(large_df)) // chunk_size)]

class model(object):
    # Mocked model: returns a random label regardless of the input chunk.
    @staticmethod
    def predict(df):
        return np.random.randint(0, 2)

def perform_model_predictions(model, dataFrame, cores):
    try:
        with Pool(processes=cores) as pool:
            # Each worker receives one dataframe chunk and calls model.predict on it.
            result = pool.map(model.predict, dataFrame)
            return result
    except AttributeError:
        logging.error("AttributeError occurred", exc_info=True)

perform_model_predictions(model, chunks, no_cores)

Mind that the number of chunks here is selected so that it matches the number of cores (or any other number you want to allocate). This way each core gets a fair share and multiprocessing does not spend much time on object serialization.

If you'd like to process each row (pd.Series) separately, time spent on serialization could be a concern. In such a case I'd recommend using joblib and reading the docs on its various backends. I did not go into it here as it seemed you want to call predict on a pd.DataFrame; a rough sketch follows below anyway.
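For illustration only, a minimal sketch of the row-wise variant with joblib; predict_row is a hypothetical stand-in for your model.predict:

import numpy as np
import pandas as pd
from joblib import Parallel, delayed

def predict_row(row):
    # Stand-in for a real model.predict call on a single row (pd.Series).
    return np.random.randint(0, 2)

large_df = pd.concat([pd.Series(np.random.rand(1111)),
                      pd.Series(np.random.rand(1111))], axis=1)

# n_jobs=4 spawns four workers with joblib's default "loky" backend;
# each row is serialized and sent to a worker separately, which is exactly
# the per-row overhead mentioned above.
results = Parallel(n_jobs=4)(
    delayed(predict_row)(row) for _, row in large_df.iterrows()
)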

Extra warning

It can happen that multiprocessing, instead of improving performance, makes it worse. This happens in rather rare situations when your model.predict calls external modules that themselves spawn threads. I wrote about the issue here. Long story short, joblib could again be the answer; a sketch of that follows below.
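As a rough sketch only (reusing predict_row and large_df from the earlier snippet): recent joblib versions let you cap the threads that libraries such as BLAS may spawn inside each worker, so the processes don't oversubscribe the CPU. The inner_max_num_threads argument is an assumption about your joblib version, so check the docs for your release.

from joblib import Parallel, delayed, parallel_backend

# Limit nested threading inside each "loky" worker process.
with parallel_backend("loky", inner_max_num_threads=1):
    results = Parallel(n_jobs=4)(
        delayed(predict_row)(row) for _, row in large_df.iterrows()
    )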


Comments

I tried that using: data = class_name.perform_model_transformation(model, transformed_data); chunk_size = len(data) // cores + cores; chunks = [df_chunk for g, df_chunk in data.groupby(np.arange(len(data)) // chunk_size)]; labelled_data = class_name.perform_model_predictions(model, chunks, cores), and I got the same error: sparse matrix length is ambiguous; use getnnz() or shape[0]
The error is coming from your predict code, and we have no idea what's happening there. Your original code was incorrect because it passed a dataframe where an iterable of elements was expected. Copy and paste my code to see that it works, then adjust your predict code so it works with a dataframe chunk.
For reference: the type of data is: <class 'scipy.sparse.csr.csr_matrix'> and the shape is (49, 800)
Then your title and text are misleading, as they make people think you're using dataframes. You can use the same logic as presented here to chop the matrix into smaller chunks; see the sketch after the comments.
@YihanBao is it clear to you how to chop your matrix into chunks?
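For illustration, a rough sketch of the same chunking idea applied to a scipy csr_matrix; predict_chunk is a hypothetical stand-in for your model.predict, and the random matrix merely mimics the (49, 800) shape mentioned above:

import numpy as np
from scipy import sparse
from multiprocessing import Pool

def predict_chunk(chunk):
    # Hypothetical stand-in for model.predict on a sparse chunk;
    # returns one label per row of the chunk.
    return np.random.randint(0, 2, size=chunk.shape[0])

data = sparse.random(49, 800, density=0.1, format="csr")
cores = 4

# Slice the csr_matrix by rows instead of calling len() on it
# (len() is what raises "sparse matrix length is ambiguous").
chunk_size = data.shape[0] // cores + 1
chunks = [data[i:i + chunk_size] for i in range(0, data.shape[0], chunk_size)]

with Pool(processes=cores) as pool:
    results = pool.map(predict_chunk, chunks)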
