一个Citus集群由一个coordinator实例和多个worker实例组成。数据在工作人员上进行分片和复制,协调器存储有关这些分片的元数据。对集群的所有查询都通过协调器执行。协调器将查询分成更小的查询片段,其中每个查询片段都可以在分片上独立运行。协调器然后将查询片段分发给工作人员,监督他们的执行,合并他们的结果,并将最终结果返回给用户。下图可以简要描述查询处理架构。Citus的查询处理管道包括两个组件:分布式查询规划器和执行器以及PostgreSQL规划器和执行器我们将在后续部分中更详细地讨论它们。DistributedQueryPlannerCitus的DistributedQueryPlanner采用SQL查询并计划它以进行分布式执行。对于SELECT查询,规划器首先创建输入查询的计划树并将其转换为可交换和关联的形式,以便可以并行化。它还应用了一些优化来确保以可扩展的方式执行查询并最大限度地减少网络I/O。接下来,规划器将查询分成两部分——在协调器上运行的协调器查询和在工作器上的各个分片上运行的工作器查询片段。然后规划器将这些查询片段分发给工作人员,以便他们的所有资源得到有效利用。这一步之后,分布式查询计划被传递给分布式执行器执行。对于分布式列上的键值查找或修改查询,计划过程略有不同,因为它们恰好命中一个分片。一旦规划器收到传入查询,它需要决定查询应该路由到的正确分片。为此,它提取传入行中的分布列并查找元数据以确定查询的正确分片。然后规划器重写命令的SQL以引用分片表而不是原始表。然后将这个重写的计划传递给分布式执行器。分布式查询执行器Citus的分布式执行器运行分布式查询计划并处理故障。Executors非常适合快速响应涉及过滤器、聚合和并置连接的查询,以及运行具有完整SQL覆盖率的单租户查询。它根据需要为每个分片打开一个到woker的连接,并将所有片段查询发送给它们。然后它从每个片段查询中获取结果,合并它们,并将最终结果返回给用户。子查询/CTE推拉执行如有必要,Citus可以将子查询和CTE的结果收集到协调器节点中,并将它们推回工作程序以供外部查询使用。这允许Citus支持更广泛的SQL结构。例如,在WHERE子句中包含子查询有时不能与主查询同时内联执行,而必须单独执行。假设Web分析应用程序维护一个按page_id分区的page_views表。要查找访问量最大的前20个页面上的访问者主机数,我们可以使用子查询来查找页面列表,然后使用外部查询来计算主机数。SELECTpage_id,count(distincthost_ip)FROMpage_viewsWHEREpage_idIN(SELECTpage_idFROMpage_viewsGROUPBYpage_idORDERBYcount(*)DESCLIMIT20)GROUPBYpage_id;执行者希望通过page_id为每个分片运行此查询的片段,计算不同的host_ips并在协调器上组合结果。但是,子查询中的LIMIT意味着子查询不能作为片段的一部分执行。通过递归规划查询,Citus可以单独运行子查询,将结果推送给所有工作人员,运行主片段查询,并将结果拉回协调器。推拉设计支持以上子查询。让我们通过查看此查询的EXPLAIN输出来了解这一点。它对应于:GroupAggregate(cost=0.00..0.00rows=0width=0)GroupKey:remote_scan.page_id->Sort(cost=0.00..0.00rows=0width=0)SortKey:remote_scan.page_id->自定义扫描(Citus自适应)(成本=0.00..0.00行=0宽度=0)->分布式子计划6_1->限制(成本=0.00..0.00行=0宽度=0)->排序(成本=0.00。.0.00行=0宽度=0)排序键:COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.worker_column_2))::bigint,'0'::bigint))))::bigint,'0'::bigint)DESC->HashAggregate(cost=0.00..0.00rows=0width=0)GroupKey:remote_scan.page_id->CustomScan(CitusAdaptive)(cost=0.00..0.00rows=0width=0)任务计数:32显示的任务:32个中的一个->任务节点:host=localhostport=9701dbname=postgres->HashAggregate(cost=54.70..56.70rows=200width=12)组键:page_id->对page_views_102008page_views进行序列扫描(成本=0.00..43.47行=2247宽度=4)任务计数:32显示的任务:32之一->任务节点:host=localhostport=9701dbname=postgres->HashAggregate(cost=84.50..86.75rows=225width=36)GroupKey:page_views.page_id,page_views.host_ip->HashJoin(cost=17.00..78.88rows=1124width=36)哈希条件:(page_views.page_id=intermediate_result.page_id)->对page_views_102008page_views进行序列扫描(成本=0.00..43.47行=2247宽度=36)->哈希(成本=14.50..14.50行=200宽度=4)->HashAggregate(cost=12.50..14.50rows=200width=4)GroupKey:intermediate_result.page_id->FunctionScanonread_intermediate_resultintermediate_result(cost=0.00..10.00rows=1000width=4)让我们把它拆开并检查每一块GroupAggregate(cost=0.00..0.00rows=0width=0)GroupKey:remote_scan.page_id->Sort(cost=0.00..0.00rows=0width=0)SortKey:remote_scan.page_id树的根是协调器节点如何处理工作人员的结果。在这种情况下,它将对它们进行分组,并且GroupAggregate要求首先对它们进行排序。->自定义扫描(Citus自适应)(成本=0.00..0.00行=0宽度=0)->分布式子计划6_1。自定义扫描有两个大的子树,从“分布式子计划”开始。->限制(成本=0.00..0.00行=0宽度=0)->排序(成本=0.00..0.00行=0宽度=0)排序键:COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.worker_column_2)::bigint,'0'::bigint))))::bigint,'0'::bigint)DESC->HashAggregate(cost=0.00..0.00rows=0width=0)组键:remote_scan.page_id->自定义扫描(Citus自适应)(成本=0.00..0.00行=0宽度=0)任务计数:32显示的任务:32个中的一个->任务节点:host=localhostport=9701dbname=postgres->HashAggregate(cost=54.70..56.70rows=200width=12)组键:page_id->page_views_102008page_views上的SeqScan(cost=0.00..43.47rows=2247width=4)。有32个工作节点,每个分片都运行上面的(Citus正在选择一个代表来显示)。我们可以识别IN(...)子查询的所有部分:排序、分组和限制。当所有工作人员完成此查询后,他们将输出发送回协调器,协调器将其组合成“中间结果”。任务计数:32显示的任务:32个中的一个->任务节点:host=localhostport=9701dbname=postgres->HashAggregate(cost=84.50..86.75rows=225width=36)GroupKey:page_views.page_id,page_views。host_ip->HashJoin(cost=17.00..78.88rows=1124width=36)HashCond:(page_views.page_id=intermediate_result.page_id).Citus在第二个子树中启动另一个执行器作业。它将计算page_views中的不同主机。它使用JOIN连接中间结果。中间结果将有助于将其限制在前二十页。->page_views_102008page_views上的序列扫描(成本=0.00..43.47行=2247宽度=36)->哈希(成本=14.50..14.50行=200宽度=4)->HashAggregate(成本=12.50..14.50行=200width=4)GroupKey:intermediate_result.page_id->FunctionScanonread_intermediate_resultintermediate_result(cost=0.00..10.00rows=1000width=4).Workers在内部使用read_intermediate_result函数检索中间结果,该函数是从协调器节点Load文件中复制的数据。这个例子展示了Citus如何使用分布式子计划来执行多步查询,以及如何使用EXPLAIN来理解分布式查询执行。PostgreSQL规划器和执行器一旦分布式执行器将查询片段发送给工作人员,它们就会像常规PostgreSQL查询一样被处理。该worker上的PostgreSQL规划器选择最佳计划在相应的分片表上本地执行该查询。PostgreSQL执行器然后运行查询并将查询结果返回给分布式执行器。您可以从PostgreSQL手册中了解有关PostgreSQL计划器和执行器的更多信息。最后,分布式执行器将结果传递给协调器进行最终聚合。计划者http://www.postgresql.org/docs/current/static/planner-optimizer.html执行者http://www.postgresql.org/docs/current/static/executor.html
