当前位置: 首页 > 科技观察

如何优化时间序列数据的K-means聚类速度?

时间:2023-03-20 18:06:10 科技观察

时间序列数据(TimeSeriesData)是按时间排序的数据。利率、汇率、股票价格都是时间序列数据。时间序列数据的时间间隔可以是分钟、秒(如高频金融数据),也可以是日、周、月、季度、年甚至更大的时间单位。数据分析解决方案提供商NewRelic在博客中介绍了针对时间序列数据优化K-means聚类以提高速度的方法。作者整理介绍了这篇文章。在NewRelic,我们每分钟收集13.7亿个数据点。我们为客户收集、分析和呈现的数据中有很大一部分是时间序列数据。为了创建应用程序与其他实体(例如服务器和容器)之间的关系,以创建新的智能产品,例如NewRelicRadar,我们不断探索对时间序列数据进行分组的更快、更有效的方法。鉴于我们收集的数据量巨大,更快的聚类时间至关重要。加速k均值聚类k均值聚类是一种流行的数据分组方法。k-means方法的基本原理包括确定每个数据点之间的距离并将它们分组到有意义的集群中。我们通常使用平面上的2D数据来演示此过程。二维以上的聚类当然是可能的,但可视化此类数据的过程变得更加复杂。例如,下图显示了k-means聚类在经过多次迭代后在任意两个维度上的收敛性:不幸的是,这种方法不适用于时间序列数据,因为它们通常是变化的一维数据。但是,我们仍然可以使用一些不同的函数来计算两个时间序列数据之间的距离因子。在这些情况下,我们可以使用均方误差(MSE)来探索不同的k-means实现。在测试这些实现时,我们注意到很多都存在严重的性能问题,但我们仍然能够展示加速k-means聚类的可能方法,在某些情况下甚至可以提高一个数量级。这里我们将使用Python的NumPy包。如果您决定继续,您可以将此代码直接复制并粘贴到JupyterNotebook中。让我们从导入包开始,这是我们一直使用的包:importtimeimportnumpyasnpimportmatplotlib.pyplotasplt%matplotlibinline在接下来的测试中,我们首先生成10000个随机时间序列数据,每个样本长度为500。然后我们将噪声添加到正弦中随机长度的波。虽然这种类型的数据对于k-means聚类方法来说并不理想,但对于未优化的实现来说已经足够了。n=10000ts_len=500phases=np.array(np.random.randint(0,50,[n,2]))pure=np.sin([np.linspace(-np.pi*x[0],-np.pi*x[1],ts_len)forxinphases])noise=np.array([np.random.normal(0,1,ts_len)forxinrange(n)])signals=pure*noise#Normalizeeverythingbetween0and1signals+=np.abs(np.min(signals))signals/=np.max(signals)plt.plot(signals[0])第一个实现让我们从最基本和最直接的实现开始。euclid_dist为距离函数实现了一个简单的MSE估计器,k_means实现了基本的k-means算法。我们从初始数据集中选择num_clust随机时间序列数据作为质心(代表每个簇的中心)。在num_iter迭代过程中,我们不断移动质心,同时最小化这些质心与其余时间序列数据之间的距离。defeuclid_dist(t1,t2):returnp.sqrt(((t1-t2)**2).sum())defk_means(data,num_clust,num_iter):质心=信号[np.random.randint(0,signals.shape[0],num_clust)]forninrange(num_iter):assignments={}forind,iinenumerate(data):min_dist=float('inf')closest_clust=Noneforc_ind,jinenumerate(centroids):dist=euclid_dist(i,j)ifdist0:print("adding{}centroid(s)".format(k-new_centroids.shape[0]))additional_centroids=data[np.random.randint(0,data.shape[0],k-new_centroids.shape[0])]new_centroids=np.append(new_centroids,additional_centroids,axis=0)returnnew_centroidsdefk_means(data,num_clust,num_iter):centroids=信号[np.random.randint(0,signals.shape[0],num_clust)]last_centroids=centroidsforninrange(num_iter):closest=closest_centroids(data,centroids)centroids=move_centroids(data,closest,centroids)ifnotnp.any(last_centroids!=centroids):print("earlyfinish!")breaklast_centroids=centroids返回centroidst1=time.time()centroids=k_means(signals,100,100)t2=time。time()print("Took{}seconds".format(t2-t1))adding1centroid(s)earlyfinish!took206.72993397712708seconds花了3.5分钟多一点,这很好!但我们也想更快地完成。k-means++实现我们的下一个实现使用k-means++算法。该算法的目的是选择一个更好的初始质心。让我们看看这个优化是否有效...definit_centroids(data,num_clust):centroids=np.zeros([num_clust,data.shape[1]])centroids[0,:]=data[np.random.randint(0,data.shape[0],1)]foriinrange(1,num_clust):D2=np.min([np.linalg.norm(data-c,axis=1)**2forcincentroids[0:i,:]],axis=0)probs=D2/D2.sum()cumprobs=probs.cumsum()ind=np.where(cumprobs>=np.random.random())[0][0]质心[i,:]=np.expand_dims(data[ind],axis=0)返回质心defk_means(data,num_clust,num_iter):centroids=init_centroids(data,num_clust)last_centroids=centroidsforninrange(num_iter):closest=closest_centroids(data,centroids))centroids=move_centroids(data,closest,centroids)ifnotnp.any(last_centroids!=centroids):print("提前完成!")breaklast_centroids=centroidsreturncentroidst1=time.time()centroids=k_means(signals,100,100)t2=time.time()print("Took{}seconds".format(t2-t1))earlyfinish!took180.91435194015503seconds和我们之前的迭代相比,加入k-means++算法可以得到稍好的性能。然而,当我们将其并行化时,这种优化方法才真正开始显着回报。并行实现到目前为止,我们所有的实现都是单线程的,因此我们决定探索k-means++算法的并行化部分。因为我们使用的是JupyterNotebook,所以我们选择使用ipyparallel进行并行计算来管理并行度(ipyparallel地址:https://github.com/ipython/ipyparallel)。有了ipyparallel,我们就不用担心fork整个服务器了,只是需要解决一些特殊的问题。例如,我们必须指示我们的工作节点加载NumPy。importipyparallelasippc=ipp.Client()v=c[:]v.use_cloudpickle()withv.sync_imports():importnumpyasnp加载worker的更多细节请参考ipyparallel入门指南:https://ipyparallel.readthedocs.io/en/latest/在这个实现中,我们关注并行化的两个方面。首先,calc_centroids有一个循环遍历每个质心并将其与我们的时间序列数据进行比较。我们使用map_sync将这些迭代中的每一个发送给我们的工作人员。接下来,我们在k-means++质心搜索中并行化一个类似的循环。注意对v.push的调用:因为我们的lambda引用数据,我们需要确保它在工作节点上可用。为此,我们调用ipyparallel的push方法将该变量复制到worker的全局范围内。看代码:defcalc_centroids(data,centroids):returnp.array(v.map_sync(lambdax:np.sqrt(((x-data)**2).sum(axis=1)),centroids))defclosest_centroids(点,质心):dist=calc_centroids(points,centroids)returnnp.argmin(dist,axis=0)definit_centroids(data,num_clust):v.push(dict(data=data))centroids=np.zeros([num_clust,数据.shape[1]])centroids[0,:]=data[np.random.randint(0,data.shape[0],1)]foriinrange(1,num_clust):D2=np.min(v.map_sync(lambdac:np.linalg.norm(data-c,axis=1)**2,centroids[0:i,:]),axis=0)probs=D2/D2.sum()cumprobs=probs.cumsum()ind=np.where(cumprobs>=np.random.random())[0][0]centroids[i,:]=np.expand_dims(data[ind],axis=0)返回centroidst1=time。time()centroids=k_means(signals,100,100)t2=time.time()print("Took{}seconds".format(t2-t1))adding2centroid(s)earlyfinish!took143.49819207191467seconds结果只多了一点两分钟,这是我们迄今为止实现的最快速度!下一步:更快!我们仅在这些测试中使用中央处理器(CPU)。CPU提供了方便的并行化,但我们认为只要付出更多的努力,我们就可以使用图形处理单元(GPU)实施集群,并且速度提高一个数量级。我们或许可以使用TensorFlow来做到这一点,TensorFlow是一种用于数值计算和机器学习的开源软件。事实上,TensorFlow已经包含一个k-means实现,但我们几乎肯定需要对其进行调整以将其用于时间序列聚类。无论如何,我们都不会停止寻找更快、更高效的聚类算法来帮助管理我们的用户数据。