
As the PostgreSQL documentation states in section 36.12.4 (Partial Aggregation), aggregate functions can support partial aggregation. To do this with a custom aggregate function, the documentation says we have to define a COMBINEFUNC. I have tried to do this, but it is never called. Is there any example of how to invoke partial aggregation correctly? I didn't find one either.

The general idea is that I have, for example, a table like this:

type | value
-----+------
   1 |     1
   1 |     2
   2 |     3
   2 |     4

And I want to sum the values for each type (e.g., for type=1: 1+2=3 and for type=2: 3+4=7) and then combine the per-type results, for example by multiplying them (e.g., 3*7=21).

I thought this could be achieved with partial aggregation, but I didn't find how to do it correctly. Or maybe I'm going about it completely wrong?

Update

Adding example SQL

CREATE OR REPLACE FUNCTION my_sum_sfunc(state integer, value integer) 
RETURNS integer 
LANGUAGE plpgsql
AS $$
BEGIN
    RAISE NOTICE 'SUM % + %', state, value;
    RETURN state + value;
END;
$$;

CREATE OR REPLACE FUNCTION my_sum_combinefunc(state1 integer, state2 integer)
RETURNS integer 
LANGUAGE plpgsql
AS $$
BEGIN
    RAISE NOTICE 'COMBINE % * %', state1, state2;
    RETURN state1 * state2;
END;
$$;

CREATE AGGREGATE my_sum(integer) (
    SFUNC = my_sum_sfunc,
    STYPE = integer,
    INITCOND = '0',
    COMBINEFUNC = my_sum_combinefunc,
    PARALLEL = SAFE
);

CREATE TABLE my_table (
    id integer PRIMARY KEY,
    "type" integer NOT NULL,
    value integer
);

INSERT INTO my_table (id, "type", value)
VALUES
    (1, 1, 1),
    (2, 1, 2),
    (3, 2, 3),
    (4, 2, 4);

I've tried a simple:

SELECT my_sum(v.value)
FROM my_table v

I also tried:

SELECT my_sum(v.value) OVER (PARTITION BY v."type")
FROM my_table v

Neither of these results in a call to the combine function.
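For what it's worth, a quick way to confirm whether partial aggregation is even being planned (a diagnostic sketch; COMBINEFUNC only runs when the planner chooses partial, typically parallel, aggregation):

```sql
-- Look for a "Partial Aggregate" node under a "Gather" node in the
-- output; without one, the combine function is never invoked.
EXPLAIN
SELECT my_sum(v.value)
FROM my_table v;
```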


Update 2

Posting my full bitwise aggregate functions, as asked:

CREATE OR REPLACE FUNCTION bit_mask_and_state(agg_state integer[], cur_value integer[])
 RETURNS integer[]
 LANGUAGE plpgsql
 IMMUTABLE
AS $function$
DECLARE res integer[];
BEGIN
    select array_agg(coalesce(a,x'FFFFFFFF'::integer) & coalesce(b,x'FFFFFFFF'::integer)) as s
    into res
    from unnest(agg_state, cur_value) x(a,b);
    return res;
END;
$function$
;

CREATE OR REPLACE AGGREGATE bit_array_and(integer[]) (
    SFUNC = bit_mask_and_state,
    STYPE = integer[]
);

CREATE OR REPLACE FUNCTION bit_mask_or_state(agg_state integer[], cur_value integer[])
 RETURNS integer[]
 LANGUAGE plpgsql
 IMMUTABLE
AS $function$
DECLARE res integer[];
BEGIN
    select array_agg(coalesce(a,0) | coalesce(b,0)) as s
    into res
    from unnest(agg_state, cur_value) x(a,b);
    return res;
END;
$function$
;

CREATE OR REPLACE AGGREGATE bit_array_or(integer[]) (
    SFUNC = bit_mask_or_state,
    STYPE = integer[]
);

I tried to create an aggregate like this:

CREATE AGGREGATE my_sum(integer[]) (
    SFUNC = bit_mask_or_state,
    STYPE = integer[],
    COMBINEFUNC = bit_mask_or_state,
    PARALLEL = SAFE
);

But it didn't work as I expected. I ended up with the following:

    select bit_mask_and_state(
            bit_array_or(r.mask_array) FILTER (WHERE r."type" = 1),
            bit_array_or(r.mask_array) FILTER (WHERE r."type" = 2)
        )
    from ... r
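A nested-aggregate formulation, as discussed in the answers below, would avoid hardcoding the type values (a sketch only; the source table is elided as `...` in the snippet above, so it stays elided here too, and the `mask_array` and `"type"` columns are taken from that snippet):

```sql
-- Per-type bitwise OR in a subquery, then bitwise AND across the
-- per-type results, without a FILTER clause per type.
-- "..." stands for the source table elided in the snippet above.
SELECT bit_array_and(or_per_type)
FROM (SELECT bit_array_or(r.mask_array) AS or_per_type
      FROM ... r
      GROUP BY r."type") AS per_type;
```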
  • From the docs you linked: "This mode can be used for parallel aggregation by having different worker processes scan different portions of a table." (emphasis mine) and "Worth noting also is that for an aggregate to be executed in parallel, the aggregate itself must be marked PARALLEL SAFE." Did you actually test this on a table where the query planner did consider it advantageous to use multiple workers? Commented Oct 30 at 1:48
  • Please show us the code you tried, including the definitions of the functions, the definition of the aggregate, the definition of the table and the data you generated, the query you ran, the settings you ran it under, and ideally a detailed query plan. Commented Oct 30 at 1:50
  • @JohnH: Don't just vote with the crowd if you don't actually understand the question Commented Oct 30 at 7:09
  • @Dale_K: Don't just vote with the crowd if you don't actually understand the question Commented Oct 30 at 7:09
  • @NickBarnes The question does not say "parallel", but the doc it references does; they're both quoting different sections of the same chapter. Also, Bergi's right on the money: bumping the sample size up to 300k values results in a parallel aggregate visible in the plan, also evident from the result always being zero (the initcond is 0, which it keeps multiplying by when combining partial results, so it ends up 0): db<>fiddle1. If there aren't enough rows, it only uses the base aggregate: db<>fiddle2 Commented Oct 30 at 8:01

2 Answers


Your aggregate definition looks correct.

The reason that the combine function is never called is that PostgreSQL doesn't choose a parallel plan, since the table is so tiny. Insert many more rows into my_table, and PostgreSQL will consider a parallel plan. Use EXPLAIN to see the execution plan — if there is a Gather node, PostgreSQL uses a parallel plan.
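To test on a table this small, the planner can also be nudged toward a parallel plan by making parallelism look cheap (a session-level sketch; these are standard planner settings, but thresholds and defaults vary by version and configuration):

```sql
SET parallel_setup_cost = 0;          -- treat launching workers as free
SET parallel_tuple_cost = 0;          -- treat inter-process tuple transfer as free
SET min_parallel_table_scan_size = 0; -- consider parallel scans even for tiny tables
SET max_parallel_workers_per_gather = 2;

EXPLAIN
SELECT my_sum(v.value)
FROM my_table v;
```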


sum values for each type (e.g., for type=1: 1+2=3 and for type=2: 3+4=7) and then combine the subset results, for example by multiplying them (e.g., 3*7=21).

That sounds like you need a nested aggregate, not (necessarily) a partial one:
demo at db<>fiddle

select my_multiply_agg(sum_per_type)
from (select sum(value) as sum_per_type
      from my_table
      group by type) as subquery;

 my_multiply_agg
-----------------
              21

It uses the regular, built-in sum() in a subquery (you could also use a CTE) to compute 1+2=3 for type 1 and 3+4=7 for type 2, then multiplies those using a custom aggregate:

CREATE OR REPLACE FUNCTION my_multiply_agg_sfunc(state numeric, value bigint) 
RETURNS numeric RETURN state * value;

CREATE OR REPLACE FUNCTION my_multiply_agg_combinefunc(state1 numeric, state2 numeric)
RETURNS numeric STRICT RETURN state1 * state2;

CREATE AGGREGATE my_multiply_agg(bigint) (
    SFUNC = my_multiply_agg_sfunc,
    STYPE = numeric,
    INITCOND = '1',
    COMBINEFUNC = my_multiply_agg_combinefunc,
    PARALLEL = SAFE
);

Partial aggregation exists only to let Postgres speed things up in some scenarios (e.g. high row counts, partitioning) by splitting the work between parallel workers. It's good practice to support it in user-defined aggregates, but it's not strictly necessary for what you're doing.

Also, it does not expose any way to control which rows end up in each part, which seems to be a requirement of the logic you laid out, so on its own it wouldn't let you implement it.

It seems that most people here, myself included, initially focused on the partial/parallel aggregate aspect far more than on the actual thing you described you're trying to do.


The functions you originally showed would always get you a zero if parallel execution kicked in. The initcond should typically be a neutral element: for multiplication that's 1, for addition it's 0. You're mixing both.

Also, if the aggregate definition specifies a non-null initcond, keep in mind that it will be used not only as the initial state for each partial aggregation run, but also as the initial state for the combine step, which combines each partial result into that state.
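Concretely, with the original definitions above (INITCOND = '0' and a multiplying combine function), finalization behaves roughly like this hypothetical trace, which is why the parallel result is always zero:

```sql
-- Combining partial sums 3 and 7 into the initial state 0:
-- my_sum_combinefunc(0, 3) = 0 * 3 = 0, then 0 * 7 = 0.
SELECT my_sum_combinefunc(my_sum_combinefunc(0, 3), 7);  -- 0
```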

CREATE OR REPLACE FUNCTION my_sum_sfunc(state bigint, value integer) 
RETURNS bigint RETURN coalesce(state,0) + value;

CREATE OR REPLACE FUNCTION my_sum_combinefunc(state1 bigint, state2 bigint)
RETURNS bigint STRICT RETURN state1*state2;

CREATE AGGREGATE my_sum(integer) (
    SFUNC = my_sum_sfunc,
    STYPE = bigint,
    --initcond replaced by `coalesce()` in sfunc and making combinefunc strict
    COMBINEFUNC = my_sum_combinefunc,
    PARALLEL = SAFE
);

After "fixing" that, you'll find your function is nondeterministic. The order of + and * is significant: 2*3+4 = 10 but 2+3*4 = 14. Since there's no way to guarantee which parallel worker picks which subset of rows, the same input can lead to different outputs at different times.

Another thing to keep in mind is that the regular sum() of an int returns bigint; otherwise it would risk overflow. Since you end up multiplying sums, that's even more of a concern in your case. Beyond bigint you'll need numeric, which is what I switched to in the example up top.
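Both points can be checked directly (a small sketch against the sample table):

```sql
-- The built-in sum() over an integer column is declared to return bigint:
SELECT pg_typeof(sum(value)) FROM my_table;  -- bigint
-- Plain integer addition, by contrast, can overflow:
SELECT 2147483647::integer + 1;              -- ERROR: integer out of range
```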

These are logical problems, but functionally your functions are fine. As immediately pointed out by @Bergi, confirmed by @Laurenz Albe and demonstrated on db<>fiddle, you just need more rows to trigger parallel execution. Here's your example on 300k rows:

explain analyze
SELECT my_sum(v.value)
FROM my_table v;
QUERY PLAN
Finalize Aggregate  (cost=48504.82..48504.83 rows=1 width=8) (actual time=5025.803..5025.879 rows=1 loops=1)
  ->  Gather  (cost=48504.46..48504.57 rows=1 width=8) (actual time=5020.055..5025.590 rows=2 loops=1)
        Workers Planned: 1
        Workers Launched: 1
        ->  Partial Aggregate  (cost=47504.46..47504.47 rows=1 width=8) (actual time=5008.662..5008.663 rows=1 loops=2)
              ->  Parallel Seq Scan on my_table v  (cost=0.00..3386.71 rows=176471 width=4) (actual time=0.012..81.643 rows=150000 loops=2)
Planning Time: 0.212 ms
Execution Time: 5025.948 ms
SELECT my_sum(v.value)
FROM my_table v;

 my_sum
--------
      0

7 Comments

Sorry for the confusion. This code is just an example. My real functions are different: the stype is integer[], the sfunc does a bitwise OR on array elements, and the combinefunc should do a bitwise AND on the aggregated sets. But it doesn't get called. The example is different but the problem is the same
Also, I should say that the expected number of rows in a set is about 2-4, so I expect there would be no parallel execution
I don't know enough to say for sure but I have a suspicion you might be actually looking for a nested aggregate, not a partial one. Re-reading your question, it sounds like you might want to first sum(value)..group by type in a subquery/CTE, and once you have a sum-per-type, you want to multiply them all together in the main/outer/top-level query. Same with the bitwise OR/AND example.
At this point, editing the question to add or swap out the example for the bitwise operations you described might derail the thread by changing or overextending the scope (it already went the partial aggregation way, which, if I'm understanding your intent right, is a blind alley), so it might be better if you simply post a new one, asking specifically about the example you're really working with (int[] with bit_or() and bit_and()) and showing a sample input vs desired output. The partial aggregation would only be worth a mention as one of the things you tried.
I've added an edit showing nested aggregates but I'd still be interested in taking a look at your actual example with the bit_or() and bit_and().
Updated post with real functions
Thanks. I'm afraid feeding two filtered aggregates into a scalar function will force you to go back and extend this code whenever a new type comes up. However, if by design there can only ever be at most two types, I guess the flexibility of simply nesting aggregates gives you no added value in this particular case. The fact that both functions are set up for aggregate calls, but you ended up using only the base function for one of them, smells like you might initially have tried nesting them directly. If so, unfortunately, the subquery/CTE is required for that to work
