当前位置: 首页 > 科技观察

使用 SingleStore 作为时间序列数据库

时间:2023-03-12 21:14:13 科技观察

使用SingleStore作为时序数据库基于关系技术,支持key-value、JSON、全文搜索、地理空间和时序等多模型能力。本文将使用Kaggle的历史标准普尔500股票数据来探索SingleStore对时间序列数据的支持。我们还将构建一个快速仪表板,以使用Streamlit可视化烛台图表。本文中使用的SQL脚本、Python代码和笔记本文件在GitHub上以DBC、HTML和iPython格式提供。简介自关系数据库技术出现以来,出现了许多管理数据的新要求。MartinFowler等知名人士提出了PolyglotPersistence作为管理各种数据和数据处理需求的解决方案,如图1所示。但是,混合持久性是有代价的,并招致了批评,例如:在一个经常被引用的混合在持久性帖子中,MartinFowler为一家虚构的零售商绘制了一个Web应用程序草图,该应用程序使用Riak、Neo4j、MongoDB、Cassandra和RDBMS来处理不同的数据集。不难想象他的零售商的DevOps工程师会一个接一个地跳槽。—StephenPimentel另外:我过去曾看到,如果您尝试做其中的六项[技术],您至少需要18个人来操作存储端——即六项存储技术。那是不可扩展的,而且成本太高。—戴夫·麦克罗里(DaveMcCrory)近年来,也出现了一些使用微服务实现混合持久化架构的提议。但是,SingleStore可以通过在单个多模型数据库系统中支持不同的数据类型和处理需求来提供更简单的解决方案。这带来了许多好处,例如更低的TCO(总拥有成本)、开发人员学习多种产品的负担更小、没有集成麻烦等。我们将在一系列文章中更详细地讨论SingleStore的多模型功能,从现在开始与时间序列数据。首先,我们需要在SingleStore网站上创建一个免费的托管服务帐户,并在Databricks网站上创建一个免费的社区版(CE)帐户。在撰写本文时,SingleStore的托管服务帐户附带500美元的信用额度,这对于本文中描述的案例研究来说绰绰有余。对于DatabricksCE,我们没有注册试用版,而是注册了一个免费帐户。在上一篇文章中,我们指出Spark非常适合带有SingleStore的ETL,因此这就是此处使用Spark的原因。如果您没有Kaggle帐户,请创建一个并下载all_stocks_5yr.csv文件。Kaggle网站指出文件大小为29.58MB。数据集由以下字段组成:日期:从2013年2月8日到2018年2月7日的五年每日时间段。没有缺失值。open:开盘价。11个缺失值。high:最高价。8个缺失值。low:最低价。8个缺失值。close:收盘价。没有缺失值。交易量:交易量。没有缺失值。名称:交易代码。505个唯一值。没有缺失值。在开始阶段,我们将使用日期、关闭和名称信息。配置DatabricksCE上一篇文章详细说明了如何配置DatabricksCE并将其与SingleStore一起使用,因此我们可以将它们用于此用例。上传CSV文件要使用CSV文件,我们需要将其上传到DatabricksCE环境。上一篇文章提供了有关如何上传可用于此用例的CSV文件的详细说明。创建数据库表在我们的SingleStore托管服务帐户中,使用SQL编辑器创建一个新的timeseries_db数据库。如下图:SQL:CREATEDATABASEIFNOTEXISTStimeseries_db;再建一张表如下:SQL:USEtimeseries_db;CREATEROWSTORETABLEIFNOTEXISTStick(tsDATETIMESERIESTIMESTAMP,symbolVARCHAR(5),priceNUMERIC(18,4),KEY(ts));每行都有一个名为ts的时间值属性。我们使用DATETIME而不是DATETIME(6)因为我们在这个例子中不使用小数秒。SERIESTIMESTAMP将表列指定为默认时间戳。在ts上创建一个KEY,因为这允许我们有效地过滤值的范围。填写笔记本现在创建一个名为DataLoaderforTimeSeries的新DatabricksCEPython笔记本。将新笔记本附加到Spark集群。在新的代码单元中,添加以下代码:Python:frompyspark.sql.typesimport*tick_schema=StructType([StructField("ts",TimestampType(),True),StructField("open",DoubleType(),True),StructField("high",DoubleType(),True),StructField("low",DoubleType(),True),StructField("price",DoubleType(),True),StructField("volume",IntegerType(),True),StructField("symbol",StringType(),True)])这种模式确保我们拥有正确的列类型。在下一个代码单元格中创建一个新的数据框,如下所示:Python:tick_df=spark.read.csv("/FileStore/all_stocks_5yr.csv",header=True,schema=tick_schema)这将读取CSV文件并创建一个名为tick_df。我们还告诉Spark有一个标题行,并要求它使用之前定义的模式。在下一个代码单元中,我们获取行数:Python:tick_df.count()执行此操作并获取值619040。根据前面的初步分析决定,我们删除一些列如下:Python:tick_df=tick_df.drop("open","high","low","volume")并对数据进行排序:Python:tick_df=tick_df.sort("ts","symbol")在下一个代码单元中,让我们看一下Dataframe的结构:Python:tick_df.show(10)输出如下:PlainText:+-------------------+--------+-----+|TS|价格|符号|+-----------------+--------+------+|2013-02-0800:00:00|45.08|一个||2013-02-0800:00:00|14.75|AAL||2013-02-0800:00:00|78.9|AAP||2013-02-0800:00:00|67.8542|AAPL||2013-02-0800:00:00|36.25|ABBV||2013-02-0800:00:00|46.89|美国广播公司||2013-02-0800:00:00|34.41|ABT||2013-02-0800:00:00|73.31|乙腈||2013-02-0800:00:00|39.12|ADBE||2013-02-0800:00:00|45.7|ADI|+-----------------+--------+------+仅显示前10行现在准备将Dataframe写入SingleStore.在下一个代码单元中,您可以添加以下内容:Python:%run./Setup在设置笔记本中,您需要确保已为SingleStoreManagedServices集群添加服务器地址和密码。在下一个代码单元中,我们将为SingleStoreSpark连接器设置一些参数,如下所示:Python:spark.conf.set("spark.datasource.singlestore.ddlEndpoint",cluster)spark.conf.set("spark.datasource.singlestore.user","admin")spark.conf.set("spark.datasource.singlestore.password",password)spark.conf.set("spark.datasource.singlestore.disablePushdown","false")最后准备使用SparkConnector将Dataframe写入SingleStorePython:(tick_df.write.format("singlestore").option("loadDataCompression","LZ4").mode("ignore").save("timeseries_db.tick"))This会将Dataframe写入timeseries_db数据库中的tick表。从SingleStore可以检查表是否已成功填充。示例查询现在我们已经构建了系统,我们可以运行一些查询。SingleStore支持一组用于处理时间序列数据的有用函数。让我们看一些例子。平均值下面的查询说明了如何计算表中所有时间序列值的简单平均值:SQL:SELECTsymbol,AVG(price)FROMtickGROUPBYsymbolORDERBYsymbol;输出应该是:纯文本:+---------+----------------+|符号|AVG(价格)|+--------+-------------+|一个|49.20202542||艾尔|38.39325226||亚太经合组织|132.43346307||美国航空航天局|109.06669849||艾比维|聚合和分组。SingleStore支持几个函数:FIRST:与最小时间戳关联的值。本文档包含其他详细信息和示例。LAST:与最大时间戳关联的值。本文档包含其他详细信息和示例。TIME_BUCKET:将时间规范化为最近存储桶的开始时间。本文档包含其他详细信息和示例。例如,您可以使用TIME_BUCKET查询按五天间隔分组的平均时间序列值,如下所示:SQL:SELECTsymbol,TIME_BUCKET("5d",ts),AVG(price)FROMtickWHEREsymbol="AAPL"GROUPBY1,2ORDERBY1,2;输出应为:纯文本:+--------+------------------------+-------------+|符号|TIME_BUCKET("5d",ts)|AVG(价格)|+--------+----------------------+-------------+|美国航空航天局|2013-02-0800:00:00.0|67.75280000||美国航空航天局|2013-02-1300:00:00.0|66.36943333||美国航空航天局|2013-02-1800:00:00.0|64.48960000||美国航空航天局|02-2800:00:00.0|61.51996667|.........这些函数也可以结合起来创建烛台图表,显示一只股票随时间变化的最高价、最低价、开盘价和收盘价,以五天为一个窗口单位,如下:SQL:SELECTTIME_BUCKET("5d")ASts,symbol,MIN(price)ASlow,MAX(price)AShigh,FIRST(price)ASopen,LAST(price)AScloseFROMtickWHEREsymbol="AAPL"GROUPBY2,1ORDERBY2,1;输出应为:纯文本:+------------+--------+----------+---------+------------+----------+|ts|symbol|lo瓦|高|打开|关闭|+------------+--------+--------+----------+---------+------------+|2013-02-08|美国航空航天局|66.8428|68.5614|67.8542||65.7371|66.77156|66.7156|65.7371||2013-02-18|苹果|63.7228|65.7128|65.7128|64.4014||2013-02-23|苹果|63.2571|64.1385|63.5099||2013-02-28|美国航空航天局|60.0071|63.0571|63.0571|60.0071|............平滑可以使用AVG聚合overwindows来平滑时间序列数据这是一个示例,查看过去三个报价的价格和移动平均线:SQL:SELECTsymbol,ts,price,AVG(price)OVER(ORDERBYtsROWSBETWEEN3PRECEDINGANDCURRENTROW)ASsmoothed_priceFROMtickWHEREsymbol=“美国航空航天局”;输出应为:纯文本:+--------+------------------------+----------+----------------+|符号|TS|价格|平滑价格|+--------+----------------------+--------+--------------+|美国航空航天局|2013-02-0800:00:00.0|67.8542|67.85420000||美国航空航天局|2013-02-1100:00:00.0|68.5614|68.20780000||美国航空航天局|2013-02-1300:00:00.0|66.7156|67.49350000||美国航空航天局|2013-02-1400:00:00.0|66.6556|某个时间点的当前表行也是常见的时间序列需求。这可以通过ORDERBY和LIMIT轻松实现。这是一个示例:SQL:SELECT*FROMtickWHEREts<="2021-10-1100:00:00"ANDsymbol="AAPL"ORDERBYtsDESCLIMIT1;输出应为:纯文本:+----------------------+--------+----------+|TS|符号|价格|+--------------------+--------+----------+|2018-02-0700:00:00.0|美国航空航天局|159.5400|+--------------------+--------+----------+Interpolationtimeseriesdatamayhavemissing值。我们可以插入缺失点。SingleStore文档提供了一个示例存储过程,可在处理报价数据时用于此目的。加分项:前面提到过Streamlit可视化烛台图表,如果能够将这些图表视为图形而不是表格,那就太好了,使用Streamlit这很容易做到。上一篇文章展示了我们可以轻松地将Streamlit连接到SingleStore。安装所需的软件我们需要安装以下包:纯文本:streamlitpandasplotlyPymysql这些可以在GitHub上的requirements.txt文件中找到。运行文件如下:Shell:pipinstall-rrequirements.txt示例应用程序以下是streamlit_app.py的完整代码清单:Python:#streamlit_app.pyimportstreamlitasstimportpandasaspdimportplotly.graph_objectsasgoimportpymysql#Initializeconnection.definit_connection():返回pymysql.connect(**st.secrets["singlestore"])conn=init_connection()symbol=st.sidebar.text_input("Symbol",value="AAPL",max_chars=None,key=None,type="default")num_days=st.sidebar.slider("Numberofdays",2,30,5)#Performquery.data=pd.read_sql("""SELECTTIME_BUCKET(%s)ASday,symbol,MIN(价格)为低,MAX(价格)为高,FIRST(价格)为开盘价,LAST(价格)为closeFROMtickWHERE符号=%sGROUPBY2,1ORDERBY2,1;""",conn,params=(str(num_days)+"d",symbol.upper()))st.subheader(symbol.upper())fig=go.Figure(data=[go.Candlestick(x=data["day"],open=数据[“开”],高=数据[“高”],低=数据[“低”],关闭=数据[“关闭”],名称=符号l,)])fig.update_xaxes(type="category")fig.update_layout(height=700)st.plotly_chart(fig,use_container_width=True)st.write(data)创建机密文件读取根目录下的机密文件.streamlit/secrets.toml的目录你需要创建这个文件如下:password=""host和password要替换成创建集群时从SingleStore托管服务中获取的对应值。运行代码按如下方式运行Streamlit应用程序:Shell:streamlitrunstreamlit_app.pyWeb浏览器中的输出应如图2所示。在网页上,您可以在文本框中输入新的股票代码并使用滑块更改TIME_BUCKET的天数。随意试验代码以满足您的需要。总结本文表明SingleStore是处理时间序列数据的有效解决方案。借助SQL和内置函数的强大功能,我们可以实现很多。SingleStore通过添加FIRST、LAST和TIME_BUCKET扩展了对时间序列的支持。致谢感谢JohnPickford博士对适当时间序列数据集的建议和指导。还要感谢Part-TimeLarry提供的精彩视频Streamlit——使用Python构建金融仪表盘和启发本文Streamlit可视化部分的GitHub代码。译者介绍杨小娟,51CTO社区编辑,西安电子科技大学计算机专业硕士研究生,高级研发工程师,信息系统项目经理,近20年Java开发经验。在NEC、Oracle、英方从事过Oracle数据库的数据存储、数据迁移、同构/异构数据库复制工作,尤其对数据库和数据编码有深入的学习和理解。原标题:UsingSingleStoreasaTimeSeriesDatabase,作者:AkmalChaudhri