转载本文请联系黑客下午茶公众号。Citus提供大型数据集的实时查询。我们在Citus的常见工作负载之一涉及为事件数据的实时仪表板提供支持。例如,您可以是帮助其他企业监控其HTTP流量的云服务提供商。每次您的某个客户端收到HTTP请求时,您的服务都会收到一条日志记录。您想要提取所有这些记录并创建一个HTTP分析仪表板,为您的客户提供洞察力,例如他们的网站服务所服务的HTTP错误数量。重要的是,这些数据的显示要尽可能短,这样您的客户就可以解决他们站点的问题。仪表板显示历史趋势图也很重要。或者,您可能正在构建广告网络并希望向客户展示其广告系列的点击率。延迟在这个例子中也很关键,原始数据量很大,历史和实时数据都很重要。在本节中,我们将演示如何构建第一个示例的一部分,但该架构同样适用于第二个和许多其他用例。real-time-analytics-Hands-On-Lab-Hyperscale-Citushttps://github.com/citusdata/real-time-analytics-Hands-On-Lab-Hyperscale-Citus为您的客户构建实时分析https://github.com/citusdata/postgres-analytics-tutorial数据模型我们正在处理的数据是不可变的日志数据流。我们将直接插入Citus,但这种数据首先通过Kafka之类的东西进行路由也很常见。这具有通常的优势,即一旦卷变得无法管理,就可以更轻松地预聚合数据。我们将使用一个简单的模式来摄取HTTP事件数据。以这个schema为例展示整体架构;一个真实的系统可能会使用额外的列。--这是在协调器上运行CREATETABLEhttp_request(site_idINT,ingest_timeTIMESTAMPTZDEFAULTnow(),urlTEXT,request_countryTEXT,ip_addressTEXT,status_codeINT,response_time_msecINT);SELECTcreate_distributed_table('http_request',)when'site当我们调用create_distributed_table时,我们要求Citus使用site_id列散列分发http_request。这意味着特定站点的所有数据都将存在于同一个分片中。create_distributed_tablehttps://docs.citusdata.com/en/v10.2/develop/api_udf.html#create-distributed-tableUDF使用分片计数的默认配置值。我们建议使用集群中CPU核心数量的2-4倍的分片。使用这么多分片可以让您在添加新工作节点后重新平衡集群中的数据。使用2-4倍CPU内核数进行分片https://docs.citusdata.com/en/v10.2/faq/faq.html#faq-choose-shard-countAzureDatabaseforPostgreSQL-Hyperscale(Citus)usingstreams复制以实现高可用性,因此维护分片副本是多余的。在流复制不可用的任何生产环境中,您应该将citus.shard_replication_factor设置为2或更高以实现容错。AzureDatabaseforPostgreSQLhttps://docs.microsoft.com/azure/postgresql/hyperscale/流复制https://www.postgresql.org/docs/current/static/warm-standby.html有了这个,系统可以数据被接受并提供查询!在继续执行本文中的其他命令时,让以下循环在psql控制台的后台运行。它每秒或每两秒生成一次假数据。DO$$BEGINLOOPINSERTINTOhttp_request(site_id,ingest_time,url,request_country,ip_address,status_code,response_time_msec)值(trunc(random()*32),clock_timestamp(),concat('http://example.com/',md5(random()::text)),('{China,India,USA,Indonesia}'::text[])[ceil(random()*4)],concat(trunc(random()*250+2),'.',trunc(random()*250+2),'.',trunc(random()*250+2),'.',trunc(random()*250+2))::inet,('{200,404}'::int[])[ceil(random()*2)],5+trunc(random()*150));犯罪;执行pg_sleep(random()*0.25);ENDLOOP;END$$;拍摄数据后,您可以运行仪表板查询,例如:SELECTsite_id,date_trunc('minute',ingest_time)asminute,COUNT(1)ASrequest_count,SUM(CASEWHEN(status_codebetween200and299)THEN1ELSE0END)assuccess_count,SUM(CASEWHEN(status_codebetween200and299)THEN0ELSE1END)aserror_count,SUM(response_time_msec)/COUNT(1)ASaverage_response_time_msecFROMhttp_requestWHEREdate_trunc('minute',ingest_time)>now()-'5minutes'::intervalGROUPBYsite_id,minuteORDERBYminuteASC;上述设置有效,但有两个缺点:每次您需要生成图表时,您的HTTP分析仪表板都必须遍历每一行。例如,如果您的客户对过去一年的趋势感兴趣,您的查询将从头开始汇总过去一年的每一行。您的存储成本将随着摄取率和可查询历史记录的长度按比例增长。实际上,您可能希望将原始事件保留较短的时间段(一个月),而查看较长时间段(年)的历史图表。聚合您可以通过将原始数据聚合成预先聚合的形式来克服这两个缺点。在这里,我们将原始数据聚合到一个表中,该表存储1分钟间隔的摘要。在生产系统中,您可能还需要1小时和1天等间隔,这与仪表板中的缩放级别相对应。当用户想要上个月的请求时间时,仪表板可以简单地读取和绘制过去30天中每一天的值。CREATETABLEhttp_request_1min(site_idINT,ingest_timeTIMESTAMPTZ,--这一行代表哪一分钟error_countINT,success_countINT,request_countINT,average_response_time_msecINT,CHECK(request_count=error_count+success_count),CHECK(timeminingest_run='));选择create_distributed_table('http_request_1min','site_id');在http_request_1min(site_id,ingest_time)上创建索引http_request_1min_idx;这看起来很像前面的代码块。最重要的是:它还在site_id上进行分片,并对分片计数和复制因子使用相同的默认配置。因为三者都匹配,所以http_requestshard和http_request_1minshard是一一对应的,Citus会把匹配的shard放在同一个worker上。这称为主机托管;它使连接等查询更快,并使我们的聚合成为可能。并置(co-location)https://docs.citusdata.com/en/v10.2/sharding/data_modeling.html#colocation要填充http_request_1min,我们将定期运行INSERTINTOSELECT。这是可能的,因为表位于同一位置。为方便起见,以下函数包装了聚合查询。--当我们汇总lastCREATETABLElatest_rollup(minutetimestamptzPRIMARYKEY,--"minute"应该不比分钟更精确CHECK(minute=date_trunc('minute',minute)));--初始化为很久以前的时间INSERTINTOlatest_rollupVALUES('10-10-1901');--functiontodotherollupCREATEORREPLACEFUNCTIONrollup_http_request()RETURNSvoidAS$$DECLAREcurr_rollup_timetimestamptz:=date_trunc('minute',now());last_rollup_timetimestamptz:=latest_rollup分钟;开始插入http_request_1min(site_id,ingest_time,request_count,success_count,error_count,average_response_time_msec)SELECTsite_id,date_trunc('minute',ingest_time),COUNT(1)作为request_count,SUM(CASEWHEN(status_code)在200和299之间)THEN1ELSE0END)作为success_count,SUM(CASEWHEN(status_code在200和299之间)THEN0ELSE1END)作为error_count,SUM(response_time_msec)/COUNT(1)ASaverage_response_time_msecFROMhttp_request--仅汇总自last_rollup_time以来的新数据WHEREdate_trunc('minute',ingest_time)<@tstzrange(last_rollup_time,curr_rollup_time,'(]')GROUPBY1,2;--更新最新的that_rollup中的值我们运行--rollup它将对比curr_rollup_time更新的数据进行操作UPDATElatest_rollupSETminute=curr_rollup_time;END;$$LANGUAGEplpgsql;上面的函数应该每分钟调用一次你可以通过在协调器节点上添加一个crontab条目来做到这一点:*****psql-c'SELECTrollup_http_request();'或者,诸如pg_cron之类的扩展允许您直接从数据库安排定期查询。pg_cron之前的检测https://github.com/citusdata/pg_cron板查询是现在好多了:SELECTsite_id,ingest_timeasminute,request_count,success_count,error_count,average_response_time_msecFROMhttp_request_1minWHEREingest_time>date_trunc('minute',now())-'5minutes'::interval;Queriesare更快,但我们仍然需要使旧数据过期以避免无限的存储成本。只需决定您希望为每个粒度保留多长时间的数据,并使用标准查询来删除过期数据。在以下示例中,我们决定将原始数据保留一天,将按分钟聚合的数据保留一个月:DELETEFROMhttp_requestWHEREingest_time
