【.com快译】为什么转用Spark?虽然我们都在谈论大数据,但我们通常在进入职场一段时间后才会遇到大数据。在我工作的Wix.com,超过1.6亿用户正在生成大量数据,因此我们需要扩展我们的数据流程。尽管还有其他选择(例如Dask),但我们决定选择Spark主要有两个原因:(1)它是最新的技术,广泛用于大数据。(2)我们拥有Spark所需的基础设施。如何使用PySpark为pandas人群编写代码?你可能对熊猫很熟悉。仅仅获得正确的语法可能是一个好的开始,但需要更多的条件来确保PySpark项目的成功。你需要了解Spark的工作原理。让Spark工作起来很困难,但一旦你开始工作,它就会非常棒!Spark的简要概述建议查看这篇文章并阅读MapReduce方面以获得更深入的理解:《如何使用Spark处理大数据?》(https://towardsdatascience.com/the-hitchhikers-guide-to-handle-big-data-using-spark-90b9be0fe89a).我们在这里要理解的概念是水平缩放。从扩大规模开始比较容易。如果我们有一个运行良好的pandas代码,但随后数据变得对它来说太大了,我们可能会转移到具有更多内存的更强大的机器上,希望它能够处理它。这意味着我们仍然有一台机器同时处理所有数据——这就是垂直缩放。如果我们改为决定使用MapReduce,并将数据分成块,并让不同的机器处理每个块,那就是水平缩放。五种Spark最佳实践这五种Spark最佳实践帮助我将运行时间缩短了10倍并扩展了项目。1.从小处着手——抽样数据如果我们想让大数据发挥作用,我们首先需要使用少量数据,看看我们正朝着正确的方向前进。在我的项目中,我抽取了10%的数据并确保管道正常工作,这使我可以使用SparkUI的SQL部分并查看数字在流程中的流动,而不必等待流程运行太久.以我的经验,如果你能用小样本量达到所需的运行时间,你通常可以很容易地扩展。2.了解基础知识:任务、分区和内核这可能是使用Spark时需要了解的最重要的事情:1个分区用于在1个内核上运行的1个任务。你总是想知道你有多少个分区-密切关注每个阶段的任务数量,并将它们与Spark连接中正确的核心数量相匹配。一些技巧和经验法则可以帮助您做到这一点(所有这些都需要针对您的情况进行测试):任务与核心的比例应该是每个核心大约2到4个任务。每个分区的大小应该在200MB-400MB左右,具体取决于每个worker的内存大小,可以根据需要进行调整。3.调试SparkSpark使用惰性求值,即等待动作被调用后再执行计算指令图。动作示例包括show()和count()等。这样就很难知道我们的代码哪里有bug,哪里需要优化。我发现有很大帮助的一种做法是使用df.cache()将代码分成几个部分,然后使用df.count()强制Spark计算每个部分的df。现在使用SparkUI,您可以查看计算的每个部分并找出问题所在。值得一提的是,在没有我们在(1)中提到的采样的情况下使用它会产生非常长的运行时间,这将很难调试。4.找到并修复偏度让我们从定义偏度开始。正如我们提到的,我们的数据被分成多个分区;转换后,每个分区的大小可能会相应改变。这可能导致分区之间的大小差异很大,这意味着我们的数据是倾斜的。通过查看SparkUI中的阶段性详细信息并查找最大值和中值之间的显着差异,可以发现偏度:图1.大差异(中值=3秒,最大值=7.5分钟)意味着数据存在偏差。这意味着我们有几个任务比其他任务慢得多。为什么这很糟糕——这会导致其他阶段等待这几个任务,让核心处于等待状态而无事可做。如果你知道偏斜来自哪里,你可以直接修复它并改变分区。如果您不知道/或无法直接弄清楚,请尝试以下操作:调整任务与核心的比例如前所述,如果我们的任务多于核心,我们希望其他核心在更长的任务时仍然存在忙于其他任务。虽然这是事实,但前面提到的比率(2-4:1)并不能真正解释任务持续时间之间如此大的差异。我们可以尝试将比率增加到10:1,看看是否有帮助,但这种方法可能还有其他缺点。向数据中添加随机字符串(salting)Salting是指使用随机键重新分区数据,以便新分区可以平衡。下面是PySpark的代码示例(使用经常导致偏斜的groupby):图25.Spark中迭代代码的问题这是一个棘手的问题。如前所述,Spark使用惰性求值,因此在运行代码时,它只构建一个计算图(DAG)。但是当你有一个迭代过程时,这种方法可能会有很大的问题,因为DAG重新打开以前的迭代,它变得非常大。这对于驱动程序来说可能太大而无法放入内存。由于应用程序卡住了,因此很难找到问题所在,但在SparkUI中,似乎很长时间都没有作业在运行(确实如此),直到驱动程序最终崩溃才发现并非如此。这是目前Spark的一个固有问题,对我有用的解决方法是每5-6次迭代使用df.checkpoint()/df.localCheckpoint()(稍微试验一下以找到适合您的数字)。这个技巧起作用的原因是checkpoint()打破了沿袭和DAG(不像cache()),保存结果,并从一个新的检查点开始。缺点是如果出现问题,您没有整个DAG来重新创建df。原标题:5个ApacheSparkBestPracticesForDataScience,作者:ZionBadash
