普通调优
jvm调优
数据倾斜
- 数据倾斜只会发生在 shuffle 过程中
- 在进行 shuffle 的时候,必须将各个节点上相同的 key 拉取到某个节点上的一个 task 来进行处理,比如按照 key 进行聚合或 join 等操作。此时如果某个 key 对应的数据量特别大的话,就会发生数据倾斜。
shuffle调优
- Spark 是根据 shuffle 类算子来进行 stage 的划分
- 触发 shuffle 操作的算子: distinct、groupByKey、reduceByKey、 aggregateByKey、join、cogroup、repartition 等
- 可以通过 Spark Web UI 来查看当前运行到了第几个 stage, 看一下当前这个stage 各个 task 分配的数据量,从而进一步确定是不是 task 分配的数据不均匀 导致了数据倾斜。
参数调优
spark.shuffle.file.buffer
- 默认值: 32k
- 参数说明: 该参数用于设置 shuffle write task 的 BufferedOutputStream 的buffer 缓冲大小。将数据写到磁盘文件之前,会先写入 buffer 缓冲中,待缓冲 写满之后,才会溢写到磁盘.
- 调优建议: 如果作业可用的内存资源较为充足的话,可以适当增加这个参 数的大小(比如 64k),从而减少 shuffle write 过程中溢写磁盘文件的次数,也 就可以减少磁盘 IO 次数,进而提升性能。在实践中发现,合理调节该参数, 性能会有 1%~5%的提升.
spark.reducer.maxSizeInFlight
- 默认值: 48m
- 参数说明:该参数用于设置 shuffle read task 的 buffer 缓冲大小,而这个buffer 缓冲决定了每次能够拉取多少数据.
- 调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如 96m),从而减少拉取数据的次数,也就可以减少网络传输的 次数,进而提升性能。在实践中发现,合理调节该参数,性能会有 1%~5% 的提升.
spark.shuffle.io.maxRetries
- 默认值: 3
- 参数说明:shuffle read task 从 shuffle write task 所在节点拉取属于自己的数据时,如果因为网络异常导致拉取失败,是会自动进行重试的。该参数就代 表了可以重试的最大次数。如果在指定次数之内拉取还是没有成功,就可能会 导致作业执行失败。
- 调优建议:对于那些包含了特别耗时的 shuffle 操作的作业,建议增加重试 最大次数(比如 60 次),以避免由于 JVM 的 full gc 或者网络不稳定等因素导 致的数据拉取失败。在实践中发现,对于针对超大数据量(数十亿~上百亿)的 shuffle 过程,调节该参数可以大幅度提升稳定性.
解决方案
- Kafka数据源, 均匀分布在不同partitio种
- Hive中key分布不均衡, 处理成Hive中间表
- 调整并行度分散同一个Task的不同key
- SparkSQL:
spark.sql.shuffle.partitions
, shuffle read task 的并行度,该值默认是 200,对于很多场景来说都有点过小 - RDD:
reduceByKey(1000)
程序开发调优
RDD相关
- 避免创建重复的 RDD
- 尽可能复用同一个 RDD
- 对多次使用的 RDD 进行持久化: 调用RDD的cache()和persist()方法
- cache()方法表示: 使用非序列化的方式将 RDD 中的数据全部尝试持久化到内存中.
- persist()方法表示: 手动选择持久化级别,并使用指定的方式进行持久化.
- 使用 MEMORY_ONLY发生OOM, 尝试使 用 MEMORY_ONLY_SER/MEMORY_AND_DISK_SER 级别.
算子调优
- 尽量避免使用 shuffle 类算子, 尽量使用 map 类的非 shuffle 算子
- 使用 map-side 预聚合的 shuffle 操作
- 通常来说,在可能的情况下,建议 使用 reduceByKey 或者 aggregateByKey 算子来替代掉 groupByKey 算子
- 使用高性能的算子
- 使用 reduceByKey/aggregateByKey 替代 groupByKey
- 使用 mapPartitions 替代普通 map, 注意出现OOM的情况(单partition数据量过大)
- 使用 foreachPartitions 替代 foreach
- 使用 filter 之后进行 coalesce 操作
- 使用 repartitionAndSortWithinPartitions 替代 repartition 与 sort 类操作
- 广播大变量
- 在算子函数中使用到外部变量时,默认情况下,Spark 会将该变量复制多 个副本,通过网络传输到 task 中,此时每个 task 都有一个变量副本。大量的变量副本在网络中传输的性能开销,以及在各个节点的 Executor 中占用过多内存导致的频繁 GC,都 会极大地影响性能。
- 广播后的变量,会保证每个 Executor 的内存中,只 驻留一份变量副本,而 Executor 中的 task 执行时共享该 Executor 中的那份变 量副本。这样的话,可以大大减少变量副本的数量,从而减少网络传输的性能 开销,并减少对 Executor 内存的占用开销,降低 GC 的频率。
- 使用 Kryo 优化序列化性能
- 优化数据结构
- 三种类型比较耗费内存: 对象, 字符串和集合类型
- 在 Spark 编码实现中,特别是对于算子函数中的代 码,尽量不要使用上述三种数据结构,尽量使用字符串替代对象,使用原始类 型(比如 Int、Long)替代字符串,使用数组替代集合类型,这样尽可能地减少 内存占用,从而降低 GC 频率,提升性能。
运行资源调优
参数
num-executors
- 参数调优建议:每个 Spark 作业的运行一般设置 50~100 个左右的 Executor 进程比较合适,设置太少或太多的 Executor 进程都不好。设置的太少,无法充 分利用集群资源;设置的太多的话,大部分队列可能无法给予充分的资源。
executor-memory
- 参数说明:
- 该参数用于设置每个 Executor 进程的内存。Executor 内存的大 小,很多时候直接决定了 Spark 作业的性能,而且跟常见的 JVM OOM 异常, 也有直接的关联。
- 参数调优建议:
- 每个 Executor 进程的内存设置 4G~8G 较为合适。
- 但是这 只是一个参考值,具体的设置还是得根据不同部门的资源队列来定。可以看看 自己团队的资源队列的最大内存限制是多少,num-executors 乘以 executor- memory,是不能超过队列的最大内存量的。此外,如果你是跟团队里其他人共 享这个资源队列,那么申请的内存量最好不要超过资源队列最大总内存的 1/3~1/2,避免你自己的 Spark 作业占用了队列所有的资源,导致别的同学的作 业无法运行。
executor-cores
driver-memory
spark.default.parallelism
- 参数说明: 该参数用于设置每个 stage 的默认 task 数量。这个参数极为重要,如果不设置可能会直接影响你的 Spark 作业性能。
- 参数调优建议:
- Spark 作业的默认 task 数量为 500~1000 个较为合适。
- 官网建议的设置原则是,设置该参数为 num-executors * executor-cores 的 2~3 倍较为合适