
I have a numba UDF:

import numba
import numpy as np
from typing import Union

@numba.jit(nopython=True)
def generate_sample_numba(cumulative_dollar_volume: np.ndarray, dollar_tau: Union[int, np.ndarray]) -> np.ndarray:
    """Generate the sample using numba for speed."""
    covered_dollar_volume = 0
    bar_index = 0
    bar_index_array = np.zeros_like(cumulative_dollar_volume, dtype=np.uint32)

    if isinstance(dollar_tau, int):
        # broadcast a scalar threshold to a per-row array
        dollar_tau = np.full(len(cumulative_dollar_volume), dollar_tau)

    for i in range(len(cumulative_dollar_volume)):
        bar_index_array[i] = bar_index
        if cumulative_dollar_volume[i] >= covered_dollar_volume + dollar_tau[i]:
            bar_index += 1
            covered_dollar_volume = cumulative_dollar_volume[i]
    return bar_index_array

The UDF takes two inputs:

  1. The cumulative_dollar_volume NumPy array, i.e. the values within each group of the group_by
  2. The dollar_tau threshold, which is either an integer or a NumPy array.

In this question, I am particularly interested in the NumPy array configuration. This post explains the idea behind the generate_sample_numba function well.
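To make the bar logic concrete, here is a pure-Python mirror of the UDF (no numba needed) run on a small made-up input; the four-row array and the threshold of 500 are illustrative values of my own, not from the question's dataset:

```python
import numpy as np

def generate_sample_py(cumulative_dollar_volume, dollar_tau):
    # Pure-Python mirror of generate_sample_numba, for illustration only.
    covered_dollar_volume = 0
    bar_index = 0
    bar_index_array = np.zeros(len(cumulative_dollar_volume), dtype=np.uint32)
    if isinstance(dollar_tau, int):
        dollar_tau = np.full(len(cumulative_dollar_volume), dollar_tau)
    for i in range(len(cumulative_dollar_volume)):
        bar_index_array[i] = bar_index
        # start a new bar once the running dollar volume clears the threshold
        if cumulative_dollar_volume[i] >= covered_dollar_volume + dollar_tau[i]:
            bar_index += 1
            covered_dollar_volume = cumulative_dollar_volume[i]
    return bar_index_array

print(generate_sample_py(np.array([100, 300, 600, 1000]), 500))
# → [0 0 0 1]
```

Row 2 (cumulative volume 600) is the first to clear the 500 threshold, so the bar index increments starting from the next row.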

I want to achieve the same result as the following Pandas code by using Polars:

data["bar_index"] = data.groupby(["ticker", "date"]).apply(lambda x: generate_sample_numba(x["cumulative_dollar_volume"].values, x["dollar_tau"].values)).explode().values.astype(int)

Apparently, the best option in Polars is group_by().agg(pl.col().map_batches()):

cqt_sample = cqt_sample.with_columns(
    (pl.col("price") * pl.col("size")).alias("dollar_volume")).with_columns(
    pl.col("dollar_volume").cum_sum().over(["ticker", "date"]).alias("cumulative_dollar_volume"),
    pl.lit(1_000_000).alias("dollar_tau")
    )

(cqt_sample
    .group_by(["ticker", "date"])
    .agg(pl.col(["cumulative_dollar_volume", "dollar_tau"])
         .map_batches(lambda x: generate_sample_numba(x["cumulative_dollar_volume"].to_numpy(), 1_000_000))
         # .alias("bar_index")
    )
    # .explode("bar_index")
)

but the map_batches() method seems to throw some strange results.

However, when I use the integer dollar_tau with a single input column, it works fine:

(cqt_sample
    .group_by(["ticker", "date"])
    .agg(pl.col("cumulative_dollar_volume")
         .map_batches(lambda x: generate_sample_numba(x.to_numpy(), 1_000_000))
         .alias("bar_index")
    )
    .explode("bar_index")
)
  • It's a bit unclear what the problem is. The only information we currently have is "strange results". Can you provide cqt_sample in order to make your example runnable, along with the expected output? Commented Feb 27, 2024 at 7:01
  • I apologize for the confusion, you can find 3 days of raw cqt datasets here. In terms of “strange results” I am getting two columns of lists: cumulative dollar volume and dollar tau transformed by the UDF. However, the dollar tau is supposed to help the UDF to determine the threshold. Commented Feb 27, 2024 at 7:08
  • I would use “ticker” for the identifier and [“TSLA”, “MMM”, “LCFY.W”] for testing. Commented Feb 27, 2024 at 7:10
  • Thank you for the links but that is a lot of data. Just a small 10-row sample to make your code runnable should be enough to show the issue. As for the problem: pl.col("a", "b").map_batches is shorthand for pl.col("a").map_batches, pl.col("b").map_batches - but you want to pass multiple items at the same time, so you would likely need a struct: pl.struct("a", "b").map_batches Commented Feb 27, 2024 at 7:41
  • Function names have since changed, but it's essentially this: stackoverflow.com/a/71679606 Commented Feb 27, 2024 at 7:44

2 Answers


As suggested in the comments, you'll need to call pl.Expr.map_batches on a struct column that contains all information needed by the function. Inside the function, you then pick the struct apart to obtain the desired information.

(
    data
    .group_by(["ticker", "date"])
    .agg(
        pl.struct("cumulative_dollar_volume", "dollar_tau").map_batches(lambda x: \
            generate_sample_numba(
                x.struct.field("cumulative_dollar_volume").to_numpy(),
                dollar_tau=x.struct.field("dollar_tau").to_numpy()
            )
        )
        .alias("bar_index")
    )
).explode("bar_index")

8 Comments

@KevinLi maintain_order=True ensures that the order of the groups is consistent with the input data, i.e. if ticker A came before ticker B in the input, the same is true for the output.
@KevinLi As we don't assign an alias to the column created by map_batches, it will automatically be named "cumulative_dollar_volume" (as it is the first column used in the expression). The returned column will be a list column that contains the samples. explode is then used to obtain a separate row for each sample.
@KevinLi If the dataframe was sorted by the group column before using group_by, sorting afterwards will give the same result as maintain_order=True, yes.
@KevinLi Sure, you can add .alias("new_name") at the end of the expression within .agg and, then, use .explode("new_name").
@KevinLi I've approved the suggested edit and included the alias. However, I've removed the final sort as it is not really contributing the original solution.

The original approach applies the map_batches method within group_by, which generates a new DataFrame. However, if we still want access to the information in the original DataFrame, a window function (.over()) is a better choice than merging the newly generated column(s) back into the original:

udf_expression = (
    pl.struct(["cumulative_volume", self.volume_tau])
    .map_batches(lambda x: self.generate_sample_numba(
        x.struct.field("cumulative_volume").to_numpy(),
        x.struct.field(self.volume_tau).to_numpy()
    ))
    .over([self.identifier_col, self.date_col])
    .alias("bar_index")
)

data = (data
        .filter(self.cqt_filter)
        .with_columns(
            pl.col(self.size_col).cum_sum().over([self.identifier_col, self.date_col]).alias("cumulative_volume")
            )
        .with_columns(udf_expression))

