Postgres-XC 聚合原理 以及 如何编写聚合函数

本文涉及的产品
云原生数据库 PolarDB MySQL 版,Serverless 5000PCU 100GB
云原生数据库 PolarDB 分布式版,标准版 2核8GB
云数据库 RDS MySQL Serverless,0.5-2RCU 50GB
简介:

Postgres-XC聚合与PostgreSQL的聚合有一定的区别, 因为Postgres-XC的数据存储在datanode, 聚合时数据可能分布在多个datanode上.
Postgres-XC支持传统的聚合方法, 聚合操作可以将数据从所有的数据节点传到coordinator节点后, 在coordinator节点进行聚合. 但是这种方法对于数据量较大的情况效率明显偏低.
Postgres-XC还支持另一种聚合方式, 就是数据在各自的datanode执行, 形成结果后, 将datanode聚合的结果传输到coordinator节点再次聚合.

集群聚合与单节点聚合的区别

Postgres-XC相比PostgreSQL的聚合增加了cfunc :
sfunc( internal-state, next-data-values ) ---> next-internal-state
这个过程是在datanode节点完成的. input 是该datanode节点上的所有行(一次1行的进行调用).

cfunc( internal-state, internal-state ) ---> next-internal-state
这个过程是在coordinator节点完成的. input是datanode节点的最终结果.

ffunc( internal-state ) ---> aggregate-value
这个过程是在coordinator节点完成的. input是cfunc的结果.

集群聚合函数的语法

CREATE AGGREGATE name ( input_data_type [ , ... ] ) (  
    SFUNC = sfunc,  
    STYPE = state_data_type  
    [ , CFUNC = cfunc ]  
    [ , FINALFUNC = ffunc ]  
    [ , INITCOND = initial_condition ]  
    [ , INITCOLLECT = initial_collection_condition ]  
    [ , SORTOP = sort_operator ]  
)  

单点聚合函数的语法

CREATE AGGREGATE name (  
    BASETYPE = base_type,  
    SFUNC = sfunc,  
    STYPE = state_data_type  
    [ , FINALFUNC = ffunc ]  
    [ , INITCOND = initial_condition ]  
    [ , SORTOP = sort_operator ]  
)  

sfunc和stype是必须的. 如果没有定义cfunc那么这个聚合就只支持传统的PostgreSQL聚合方法.
如果定义了cfunc, 那么这个聚合支持传统的聚合方法, 同时还支持分布式聚合方法.

例子

创建sfunc

digoal=# create or replace function d_sum(int,int) returns int as $$  
select $1+$2;  
$$ language sql strict;  

创建聚合

digoal=# create AGGREGATE d_sum(int)          
(  
sfunc=d_sum,  
stype=int,  
cfunc=d_sum,  
initcond='0',  
initcollect='0'  
);  

创建测试表

digoal=# create table t1(id int, info text) distribute by (id) to group gp0;  
digoal=# insert into t1 select generate_series(1,10),'test';  
digoal=# insert into t1 values (null,'test');  

比较d_sum和sum的聚合结果

digoal=# select d_sum(id) from t1;  
 d_sum   
-------  
    55  
(1 row)  
digoal=# select sum(id) from t1;  
 sum   
-----  
  55  
(1 row)  

比较sum和d_sum的执行计划

digoal=# explain (analyze,verbose,buffers) select d_sum(id) from t1;  
                                                              QUERY PLAN                                                              

------------------------------------------------------------------------------------------------------------------------------------  
---  
 Aggregate  (cost=250.00..250.01 rows=1 width=4) (actual time=1.657..1.657 rows=1 loops=1)  
   Output: d_sum((d_sum(t1.id)))  
   ->  Materialize  (cost=0.00..0.00 rows=0 width=0) (actual time=0.703..0.743 rows=5 loops=1)  
         Output: (d_sum(t1.id))  
         ->  Data Node Scan on "__REMOTE_GROUP_QUERY__"  (cost=0.00..0.00 rows=1000 width=4) (actual time=0.702..0.740 rows=5 loops=  
1)  
               Output: d_sum(t1.id)  
               Node/s: datanode_1, datanode_2, datanode_3, datanode_4, datanode_5  
               Remote query: SELECT d_sum(group_1.id)  FROM (SELECT id, info FROM ONLY t1 WHERE true) group_1     
 Total runtime: 1.689 ms  
(9 rows)  

d_sum, sum两者都是有了分布式聚合.

digoal=# explain (analyze,verbose,buffers) select sum(id) from t1;  
                                                              QUERY PLAN                                                              

------------------------------------------------------------------------------------------------------------------------------------  
---  
 Aggregate  (cost=2.50..2.51 rows=1 width=4) (actual time=0.674..0.674 rows=1 loops=1)  
   Output: pg_catalog.sum((sum(t1.id)))  
   ->  Materialize  (cost=0.00..0.00 rows=0 width=0) (actual time=0.507..0.659 rows=5 loops=1)  
         Output: (sum(t1.id))  
         ->  Data Node Scan on "__REMOTE_GROUP_QUERY__"  (cost=0.00..0.00 rows=1000 width=4) (actual time=0.507..0.655 rows=5 loops=  
1)  
               Output: sum(t1.id)  
               Node/s: datanode_1, datanode_2, datanode_3, datanode_4, datanode_5  
               Remote query: SELECT sum(group_1.id)  FROM (SELECT id, info FROM ONLY t1 WHERE true) group_1     
 Total runtime: 0.705 ms  
(9 rows)  

在没有cfunc的情况下, 数据必须从所有的datanode汇总到coordinator节点后执行sfunc聚合. 如下:

digoal=# drop AGGREGATE d_sum(int);  
DROP AGGREGATE  
digoal=# create AGGREGATE d_sum(int)   
(  
sfunc=d_sum,  
stype=int,  
initcond='0'  
);  
CREATE AGGREGATE  
digoal=# select d_sum(id) from t1;  
 d_sum   
-------  
    55  
(1 row)  
digoal=# explain (analyze,verbose,buffers) select d_sum(id) from t1;  
                                                 QUERY PLAN                                                   
------------------------------------------------------------------------------------------------------------  
 Aggregate  (cost=250.00..250.01 rows=1 width=4) (actual time=1.981..1.981 rows=1 loops=1)  
   Output: d_sum(id)  
   ->  Data Node Scan on t1  (cost=0.00..0.00 rows=1000 width=4) (actual time=0.567..0.610 rows=11 loops=1)  
         Output: id, info  
         Node/s: datanode_1, datanode_2, datanode_3, datanode_4, datanode_5  
         Remote query: SELECT id, info FROM ONLY t1 WHERE true  
 Total runtime: 2.015 ms  
(7 rows)  

聚合数据处理流程简介

-1. 数据汇总聚合方法数据处理流程 :

Two phased aggregation - is used when the entire aggregation takes place on the Coordinator node.   
In first phase called transition phase, Postgres-XC creates a temporary variable of data type stype to hold the current internal state of the aggregate.   
创建初始变量 tmpvar(stype)  

At each input row, the aggregate argument value(s) are calculated and the state transition function is invoked with the current state value and the new argument value(s) to calculate a new internal state value.   
per row迭代计算tmpvar : sfunc(tmpvar, vardict parameters) -> tmpvar   

After all the rows have been processed, in the second phase or finalization phase the final function is invoked once to calculate the aggregate's return value.   
If there is no final function then the ending state value is returned as-is.  
final函数 : ffunc(tmpvar) -> result(对应聚合函数输出类型)   

-2. 分布式聚合方法数据处理流程 :

Three phased aggregation - is used when the process of aggregation is divided between Coordinator and Datanodes.   
In this mode, each Postgres-XC Datanode involved in the query carries out the first phase named transition phase.   
This phase is similar to the first phase in the two phased aggregation mode discussed above, except that, every Datanode applies this phase on the rows available at the Datanode.   
The result of transition phase is then transferred to the Coordinator node. Second phase called collection phase takes place on the Coordinator.   
Postgres-XC Coordinator node creates a temporary variable of data type stype to hold the current internal state of the collection phase.   
For every input from the Datanode (result of transition phase on that node), the collection function is invoked with the current collection state value and the new transition value (obtained from the Datanode) to calculate a new internal collection state value.   
After all the transition values from data nodes have been processed, in the third or finalization phase the final function is invoked once to calculate the aggregate's return value.   
If there is no final function then the ending collection state value is returned as-is.  

聚合中的数据类型简介

sfunc( internal-state, next-data-values ) ---> next-internal-state  
cfunc( internal-state, internal-state ) ---> next-internal-state  
ffunc( internal-state ) ---> aggregate-value  

聚合语法中的 input_data_type 对应 next-data-values
聚合语法中的 state_data_type 对应 internal-state
聚合最终输出类型有两种aggregate-value 或者 internal-state.
-1. 当定义了finalfunc时是finalfunc的返回类型.
-2. 当没有定义finalfunc时, 输出类型是cfunc或者sfunc的返回类型, 也就是internal-state类型, 既stype.
从流程来分析, sfunc的第一个参数的数据类型以及返回数据类型都必须和stype定义的类型相同.
sfunc的第二个参数必须和聚合的input类型相同.
cfunc的两个参数以及返回类型都必须和stype定义的类型相同.
ffunc的输入参数必须和stype定义的类型相同.
所以整个聚合涉及了3个数据类型:
-1. 聚合的输入类型,
-2. stype , (同时initcond, initcollect类型=stype);
-3. 以及 finalfunc的返回类型.

聚合对空值的处理简介

与sfunc,cfunc,finalfunc的定义有关, 是否strict. 是则忽略null值, 不进行调用.
不是strict则需要函数自己处理null值. 所以要特别小心.

An aggregate function can provide an initial condition, that is, an initial value for the internal transition or collection state value. This is specified and stored in the database as a value of type text, but it must be a valid external representation of a constant of the state value data type. If it is not supplied then the state value starts out null.  

If the collection function is declared "strict", then it cannot be called with null inputs. With such a collection function, aggregate execution behaves as follows. Null state transition results are ignored (the function is not called and the previous collection state value is retained). If the initial state value is null, then at the first non-null state transition result replaces the collection state value, and the collection function is invoked at subsequent rows with all-nonnull transition values. This is handy for implementing aggregates like max.  

If the state transition function is declared "strict", then it cannot be called with null inputs. With such a transition function, aggregate execution behaves as follows. Rows with any null input values are ignored (the function is not called and the previous state value is retained). If the initial state value is null, then at the first row with all-nonnull input values, the first argument value replaces the state value, and the transition function is invoked at subsequent rows with all-nonnull input values. This is handy for implementing aggregates like max. Note that this behavior is only available when state_data_type is the same as the first input_data_type. When these types are different, you must supply a nonnull initial condition or use a nonstrict transition function.  

If the state transition and/or collection function is not strict, then it will be called unconditionally at each input row, and must deal with null inputs and null transition/collection values for itself. This allows the aggregate author to have full control over the aggregate's handling of null values.  

If the final function is declared "strict", then it will not be called when the ending state value is null; instead a null result will be returned automatically. (Of course this is just the normal behavior of strict functions.) In any case the final function has the option of returning a null value. For example, the final function for avg returns null when it sees there were zero input rows.  

例如count, id有一条为null :

digoal=# select count(id) from t1;  
 count   
-------  
    10  
(1 row)  
digoal=# select count(*) from t1;  
 count   
-------  
    11  
(1 row)  

假如把d_sum(int,int)函数改成called on null input.
这种情况下, 即使row取到的是空值也会调用func. 因此null+val=null, 最后得到的就是null了.

digoal=# alter function d_sum(int,int) called on null input;  
ALTER FUNCTION  
digoal=# select d_sum(id) from t1;  
 d_sum   
-------  

(1 row)  

聚合的优化

使用btree.

sort_operator  
The associated sort operator for a MIN- or MAX-like aggregate. This is just an operator name (possibly schema-qualified). The operator is assumed to have the same input data types as the aggregate (which must be a single-argument aggregate).  

Aggregates that behave like MIN or MAX can sometimes be optimized by looking into an index instead of scanning every input row. If this aggregate can be so optimized, indicate it by specifying a sort operator. The basic requirement is that the aggregate must yield the first element in the sort ordering induced by the operator; in other words:  

SELECT agg(col) FROM tab;  
must be equivalent to:  
SELECT col FROM tab ORDER BY col USING sortop LIMIT 1;  

Further assumptions are that the aggregate ignores null inputs, and that it delivers a null result if and only if there were no non-null inputs. Ordinarily, a data type's < operator is the proper sort operator for MIN, and > is the proper sort operator for MAX. Note that the optimization will never actually take effect unless the specified operator is the "less than" or "greater than" strategy member of a B-tree index operator class.  

定义类似count(*)这样的无参数聚合

digoal=# select * from pg_aggregate where aggfnoid::text ~ 'count';  
     aggfnoid     |      aggtransfn       |   aggcollectfn   | aggfinalfn | aggsortop | aggtranstype | agginitval | agginitcollect   
------------------+-----------------------+------------------+------------+-----------+--------------+------------+----------------  
 pg_catalog.count | int8inc_any           | int8_sum_to_int8 | -          |         0 |           20 | 0          | 0  
 pg_catalog.count | int8inc               | int8_sum_to_int8 | -          |         0 |           20 | 0          | 0  
digoal=# \df+ int8inc_any  
                                                                        List of functions  
   Schema   |    Name     | Result data type | Argument data types |  Type  | Volatility |  Owner   | Language | Source code |        
      Description               
------------+-------------+------------------+---------------------+--------+------------+----------+----------+-------------+------  
------------------------------  
 pg_catalog | int8inc_any | bigint           | bigint, "any"       | normal | immutable  | postgres | internal | int8inc_any | incre  
ment, ignores second argument  
(1 row)  

digoal=# \df+ int8inc  
                                                           List of functions  
   Schema   |  Name   | Result data type | Argument data types |  Type  | Volatility |  Owner   | Language | Source code | Descripti  
on   
------------+---------+------------------+---------------------+--------+------------+----------+----------+-------------+----------  
---  
 pg_catalog | int8inc | bigint           | bigint              | normal | immutable  | postgres | internal | int8inc     | increment  
(1 row)  
digoal=# select proisstrict from pg_proc where proname='int8inc';  
 proisstrict   
-------------  
 t  
(1 row)  
digoal=# select proisstrict from pg_proc where proname='int8inc_any';  
 proisstrict   
-------------  
 t  
(1 row)  

以上count实际上分了两种, 一种是count(列名字), 一种是count(*). int8inc_any用于count(列名), int8inc用于count(*).
举例 :

-- 创建sfunc  
digoal=# create or replace function d_count(int8) returns int8 as $$       
select $1+1;   
$$ language sql strict;  
-- 创建cfunc  
digoal=# create or replace function d_count(int8,int8) returns int8 as $$  
select $1+$2;  
$$ language sql strict;  
-- 创建聚合  
digoal=# create aggregate d_count(*)                                       
(              
sfunc=d_count,  
stype=int8,  
cfunc=d_count,  
initcond='0',  
initcollect='0'  
);  
digoal=# select d_count(*) from t1;  
 d_count   
---------  
      11  
(1 row)  
digoal=# explain (analyze,verbose,buffers) select d_count(*) from t1;  
                                                              QUERY PLAN                                                              

------------------------------------------------------------------------------------------------------------------------------------  
---  
 Aggregate  (cost=250.00..250.01 rows=1 width=0) (actual time=1.308..1.308 rows=1 loops=1)  
   Output: sj.d_count(*)  
   ->  Materialize  (cost=0.00..0.00 rows=0 width=0) (actual time=0.712..0.725 rows=5 loops=1)  
         Output: (d_count(*))  
         ->  Data Node Scan on "__REMOTE_GROUP_QUERY__"  (cost=0.00..0.00 rows=1000 width=0) (actual time=0.712..0.721 rows=5 loops=  
1)  
               Output: d_count(*)  
               Node/s: datanode_1, datanode_2, datanode_3, datanode_4, datanode_5  
               Remote query: SELECT d_count(*)  FROM (SELECT id, info FROM ONLY t1 WHERE true) group_1     
 Total runtime: 1.339 ms  
(9 rows)  

注意, 为什么d_count(id)得到的结果不是10呢? 因为我没有定义d_count("any"), 只定义了d_count(*) :

digoal=# select d_count(id) from t1;        
 d_count   
---------  
       2  
       3  
       4  
       9  
      10  
       5  
       6  
       7  
       8  
      11  

(11 rows)  

因为这里的d_count是普通函数, 而不是聚合.
实际上调用的是d_count(int8)函数.
将sfunc,cfunc改名后就更能看出来了.

digoal=# alter function d_count(int8) rename to s_d_count;  
ALTER FUNCTION  
Time: 18.591 ms  
digoal=# alter function d_count(int8,int8) rename to c_d_count;  
ALTER FUNCTION  
Time: 4.049 ms  
digoal=# create aggregate d_count(*)                                       
(              
sfunc=s_d_count,  
stype=int8,  
cfunc=c_d_count,  
initcond='0',  
initcollect='0'  
);  
CREATE AGGREGATE  
Time: 21.752 ms  
digoal=# explain (analyze,verbose,buffers) select d_count(*) from t1;  
                                                              QUERY PLAN                                                              

------------------------------------------------------------------------------------------------------------------------------------  
---  
 Aggregate  (cost=250.00..250.01 rows=1 width=0) (actual time=1.732..1.732 rows=1 loops=1)  
   Output: sj.d_count(*)  
   ->  Materialize  (cost=0.00..0.00 rows=0 width=0) (actual time=0.986..1.008 rows=5 loops=1)  
         Output: (d_count(*))  
         ->  Data Node Scan on "__REMOTE_GROUP_QUERY__"  (cost=0.00..0.00 rows=1000 width=0) (actual time=0.985..1.002 rows=5 loops=  
1)  
               Output: d_count(*)  
               Node/s: datanode_1, datanode_2, datanode_3, datanode_4, datanode_5  
               Remote query: SELECT d_count(*)  FROM (SELECT id, info FROM ONLY t1 WHERE true) group_1     
 Total runtime: 1.766 ms  
(9 rows)  

Time: 3.088 ms  
digoal=# explain (analyze,verbose,buffers) select d_count(id) from t1;  
ERROR:  function d_count(integer) does not exist  
LINE 1: explain (analyze,verbose,buffers) select d_count(id) from t1...  
                                                 ^  
HINT:  No function matches the given name and argument types. You might need to add explicit type casts.  

定义类似count(*)和count(列名)的函数还有一个count(列名)是怎么定义的呢

digoal=# create or replace function s_d_count(int8,"anyelement") returns int8 as $$  
select $1+1;  
$$ language sql strict;  
CREATE FUNCTION  
Time: 4.715 ms  
digoal=# create aggregate d_count("anyelement")                                       
(              
sfunc=s_d_count,  
stype=int8,  
cfunc=c_d_count,  
initcond='0',  
initcollect='0'  
);  
CREATE AGGREGATE  
Time: 69.536 ms  
digoal=# select d_count(id) from t1;  
 d_count   
---------  
      10  
(1 row)  
Time: 3.694 ms  
digoal=# explain (analyze,verbose,buffers) select d_count(id) from t1;  
                                                              QUERY PLAN                                                              

------------------------------------------------------------------------------------------------------------------------------------  
---  
 Aggregate  (cost=250.00..250.01 rows=1 width=4) (actual time=1.913..1.913 rows=1 loops=1)  
   Output: d_count((d_count(t1.id)))  
   ->  Materialize  (cost=0.00..0.00 rows=0 width=0) (actual time=0.893..0.923 rows=5 loops=1)  
         Output: (d_count(t1.id))  
         ->  Data Node Scan on "__REMOTE_GROUP_QUERY__"  (cost=0.00..0.00 rows=1000 width=4) (actual time=0.892..0.917 rows=5 loops=  
1)  
               Output: d_count(t1.id)  
               Node/s: datanode_1, datanode_2, datanode_3, datanode_4, datanode_5  
               Remote query: SELECT d_count(group_1.id)  FROM (SELECT id, info FROM ONLY t1 WHERE true) group_1     
 Total runtime: 1.947 ms  
(9 rows) 

第二个d_count聚合复用了前面定义的c_d_count(int8,int8).

digoal=# select * from pg_aggregate where aggfnoid::text ~ 'd_count';  
  aggfnoid  |  aggtransfn  | aggcollectfn | aggfinalfn | aggsortop | aggtranstype | agginitval | agginitcollect   
------------+--------------+--------------+------------+-----------+--------------+------------+----------------  
 sj.d_count | sj.s_d_count | c_d_count    | -          |         0 |           20 | 0          | 0  
 sj.d_count | sj.s_d_count | c_d_count    | -          |         0 |           20 | 0          | 0  
(2 rows)  
Time: 2.811 ms  

[注意]

-1. 如果你定义的聚合既支持分布式聚合同时又支持传统的数据汇总聚合, Postgres-XC会根据成本选择合适的聚合方法, 所以在这种情况下, 必须确保两种聚合方法得到的聚合结果是一致的, 否则会有问题.
-2. 聚合最好不要与函数名重复. 否则会难以排错.

[参考]

-1. http://postgres-xc.sourceforge.net/docs/1_0_3/xaggr.html
-2. http://postgres-xc.sourceforge.net/docs/1_0_3/sql-createaggregate.html
-3. http://blog.163.com/digoal@126/blog/static/16387704020121118112533410/

相关文章
|
8月前
|
SQL HIVE
Hive学习---4、函数(单行函数、高级聚合函数、炸裂函数、窗口函数)(二)
Hive学习---4、函数(单行函数、高级聚合函数、炸裂函数、窗口函数)(二)
|
关系型数据库 PostgreSQL
PostgreSQL listagg within group (order by) 聚合兼容用法 string_agg ( order by) - 行列变换,CSV构造...
标签 PostgreSQL , order-set agg , listagg , string_agg , order 背景 listagg — Rows to Delimited Strings The listagg function transforms values from a g...
5824 0
|
8月前
|
SQL JSON Java
Hive学习---4、函数(单行函数、高级聚合函数、炸裂函数、窗口函数)(一)
Hive学习---4、函数(单行函数、高级聚合函数、炸裂函数、窗口函数)(一)
|
9月前
|
关系型数据库
GreenPlum和openGauss进行简单聚合时对扫描列的区别
GreenPlum和openGauss进行简单聚合时对扫描列的区别
85 0
|
11月前
|
缓存 自然语言处理 数据挖掘
白话Elasticsearch50-深入聚合数据分析之基于doc values正排索引的聚合内部原理
白话Elasticsearch50-深入聚合数据分析之基于doc values正排索引的聚合内部原理
78 0
|
SQL HIVE 开发者
Hive 高阶--分组窗口函数--聚合函数集成分组函数(SUM)|学习笔记
快速学习 Hive 高阶--分组窗口函数--聚合函数集成分组函数(SUM)
246 0
|
SQL
SQL 分组查询 -简述及举例
SQL 分组查询 -简述及举例
137 0
|
SQL XML 数据格式
SQL字符串的分组聚合(ZT)
本文转载于T-Sql:字符串分组聚合,也许你还有更简单的办法?         今天在看订阅的RSS的时候,看到这么一个问题:T-Sql中如何对分组的信息进行聚合,并以逗号连接字符;也就是对一个表中的某个字段进行分组,然后对另一个字段聚合,如果表达得不太清楚,请看下面的表。
942 0
|
关系型数据库 索引
MySQL · 源码分析 · 聚合函数(Aggregate Function)的实现过程
--- title: MySQL · 源码分析 · 聚合函数(Aggregate Function)的实现过程 author: 道客 --- ## 总览 聚合函数(Aggregate Function)顾名思义,就是将一组数据进行统一计算,常常用于分析型数据库中,当然在应用中是非常重要不可或缺的函数计算方式。比如我们常见的COUNT/AVG/SUM/MIN/MAX等等。本文主要分析下
1814 0