前言
Spark YarnShuffleService是作为Hadoop Yarn模块中NodeManager的辅助服务寄生在其进程内部,大家都知道可以通过这个外部服务来削减Executor自身在shuffle过程中的压力,且得益于这个服务的常驻特性,Shuffle Write的文件可以跟着这个服务走,就可以实现动态资源分配等Spark的高级特性。
当然,这个过程中我们把压力实则转移到了NodeManger上,NodeManager的如果有过重的GC问题,在响应Shuffle Client的时候就会有问题,很多Shuffle Client的频繁重试乃至FetchFailed
相关的异常都基本和这个方面有关。另外,也可能给NodeManager带来OOM的风险,比如 YARN-7110
不得不说,Spark YarnShuffleService也是一个相对稳定的模块,高版本兼容低版本,低版本兼容高版本很多时候都没什么问题,因为这个模块几乎没有改动。此外由于这个服务的Jar包是需要放在NodeManager的ClassPath中,推动Hadoop的升级相对繁琐与困难,在兼容性测试通过的基础上,我们也不乐于去做这个推动者,以至于我们生产集群早起部署的Spark 2.1.2 based的分支光荣的承载的Spark 1.6.x~Spark 2.3.x。
凡事都有契机,
For users who enabled external shuffle service, this feature can only be worked when external shuffle service is newer than Spark 2.2.
这种情况就不得不升级了。
趁着夜黑风高,Hadoop Team上Capacity Scheduler/CGroup/NodeLabel/开超售,上2.9等等等等,我们这有个Spark 2.3.2的jar包,兄弟们拿去用吧。
问题来了(现象)
当然,这么大规模的升级动作,必然不能一蹴而就,经历的N次回滚之后,终于算是“尘埃落定”。当用户大爷们开始进场了,那么问题就来了。
1. NodeManager进程吃CPU
48核的机器,每个核60-80%的使用率,都给NodeManager进程吃了,还能不能好好跑Container了?
赶紧把NodeManager的jstack一打
哇哦,IO类的操作这么吃CPU,什么情况?
撸下代码
/** Number of threads used in the server thread pool. Default to 0, which is 2x#cores. */
public int serverThreads() { return conf.getInt(SPARK_NETWORK_IO_SERVERTHREADS_KEY, 0); }
原来是Shuffle Server服务端用了96个线程
/**
* Specifies an upper bound on the number of Netty threads that Spark requires by default.
* In practice, only 2-4 cores should be required to transfer roughly 10 Gb/s, and each core
* that we use will have an initial overhead of roughly 32 MB of off-heap memory, which comes
* at a premium.
*
* Thus, this value should still retain maximum throughput and reduce wasted off-heap memory
* allocation. It can be overridden by setting the number of serverThreads and clientThreads
* manually in Spark's configuration.
*/
private val MAX_DEFAULT_NETTY_THREADS = 8
根据官方描述2-4个线程基本就可以满足万兆网的数据传输服务了,保险起见那我们就把这个参数设置成8吧。
spark.shuffle.io.serverThreads=8
果然NodeManager的CPU消耗降下来了,CPU被限制在了600%-800%,但这几个CPU相当于还是爆满的。
2. ExecutorLostFailure
Spark启动Executor之后,首先由Executor端的CoarseGrainedExecutorBackend
发送RegisterExecutor
消息到Driver端的CoarseGrainedSchedulerBackend
override def onStart() {
logInfo("Connecting to driver: " + driverUrl)
rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
// This is a very fast action so we can use "ThreadUtils.sameThread"
driver = Some(ref)
ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls))
}(ThreadUtils.sameThread).onComplete {
// This is a very fast action so we can use "ThreadUtils.sameThread"
case Success(msg) =>
// Always receive `true`. Just ignore it
case Failure(e) =>
exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false)
}(ThreadUtils.sameThread)
}
CoarseGrainedSchedulerBackend
收到这个消息之后, 塞进executorDataMap
完成executor的注册,然后发送一个异步消息给CoarseGrainedExecutorBackend
(就不管了),就就就就开始makeOffers
, 下发task到这个executor了。
case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls) =>
//此处删掉一些代码
val data = new ExecutorData(executorRef, executorAddress, hostname,
cores, cores, logUrls)
// This must be synchronized because variables mutated
// in this block are read when requesting executors
CoarseGrainedSchedulerBackend.this.synchronized {
executorDataMap.put(executorId, data)
if (currentExecutorIdCounter < executorId.toInt) {
currentExecutorIdCounter = executorId.toInt
}
if (numPendingExecutors > 0) {
numPendingExecutors -= 1
logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
}
}
executorRef.send(RegisteredExecutor)
// Note: some tests expect the reply to come after we put the executor in the map
context.reply(true)
listenerBus.post(
SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))
makeOffers()
}
CoarseGrainedExecutorBackend
接受到RegisteredExecutor
消息后开始处理
case RegisteredExecutor =>
logInfo("Successfully registered with driver")
try {
executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
} catch {
case NonFatal(e) =>
exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
}
这个过程其实才算Executor
本尊的初始化过程,而且这个过程其实相对来说是比较“耗时”“容易出问题”的,一个是要完成block manager的初始化,其中需要完成Shuffle Client的初始化,如果是开启了External Shuffle Service服务,那就需要 registerWithExternalShuffleServer
,实例化并注册Shuffle Client到Shuffle Server端(也就是NodeManager 7337端口),
这个分为两个过程,一个是创建Shuffle Client的实例,另一个是Shuffle Client基于spark.shuffle.registration.timeout 5000
的超时间隔发一个RegisterExecutor
的同步消息给Shuffle Server,然后基于spark.shuffle.registration.maxAttempts 3
进行这个周期的重试,当这两个参数设置过大,或者Shuffle Client实例化的时间过长,就会阻塞整个Executor
本尊的初始化过程,这个过程完成后才能启动Executor
端的心跳上报机制,当上个过程不完成,心跳没上报,Driver
端就无法更新Executor
的最新状态。这个时候Task已然可能已经分发到这个Executor
上,然后就会出现以下这类ExecutorLostFailure,往往Executor
查看日志端也并没有task被分配到这个Executor
上。
往往Executor
查看日志端也并没有task被分配到这个Executor
上。基本上刷完下面的日志不成功连接,就会结果了这个Executor
2019-08-02 02:23:36,064 [129209] - ERROR [dispatcher-event-loop-0:Logging$class@91] - Failed to connect to external shuffle server, will retry 14 more times after waiting 5 seconds...
java.io.IOException: Failed to connect to hadoop3909.jd.163.org/10.196.68.54:7337
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:245)
at org.apache.spark.network.client.TransportClientFactory.createUnmanagedClient(TransportClientFactory.java:201)
at org.apache.spark.network.shuffle.ExternalShuffleClient.registerWithShuffleServer(ExternalShuffleClient.java:142)
at org.apache.spark.storage.BlockManager$anonfun$registerWithExternalShuffleServer$1.apply$mcVI$sp(BlockManager.scala:289)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
at org.apache.spark.storage.BlockManager.registerWithExternalShuffleServer(BlockManager.scala:286)
at org.apache.spark.storage.BlockManager.initialize(BlockManager.scala:260)
at org.apache.spark.executor.Executor.<init>(Executor.scala:116)
at org.apache.spark.executor.CoarseGrainedExecutorBackend$anonfun$receive$1.applyOrElse(CoarseGrainedExecutorBackend.scala:83)
at org.apache.spark.rpc.netty.Inbox$anonfun$process$1.apply$mcV$sp(Inbox.scala:117)
at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:205)
at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:101)
at org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:221)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
这块总体上可以算是Spark本身的一个缺陷,在executor未真正初始化之前不应该将它算成一个可用的计算节点,不应该将任务下发到这个节点,明显的这边task处于无效的等待中,最后无谓的失败掉!
另外当开始Dynamic Executor Allocation特性和BlackList特性时,如果minExecutors值设置较小,如1,如果任务被分配到这个节点上,就可能出现cannot run anywhere due to node and executor blacklist
样子的错误,导致整个Job 被杀掉,这问题就更严重了。两个增强Spark自身鲁棒性的功能,配合另一个反而更加容易让其挂掉,呵呵。
大量的FetchFailed
一种情况是,服务端主动或被动关闭连接,客户端收到RST信号
org.apache.spark.shuffle.FetchFailedException: Connection reset by peer
这种一般是由于服务端压力过大造成的,对于这类异常一般都是通过调整以下两个参数来调整客户端的重试次数来给客户端自己更多的机会,加大等待间隔来给服务端更多的喘息机会来消化自身的压力。
spark.shuffle.io.maxRetries 15
spark.shuffle.io.retryWait 6s
但是服务端的压力实在过大就需要从服务端入手了,盲目的调整这两个参数只会让你的任务更慢的失败而已。
另一种情况是,客户端判断有请求但通道中没有流量,这个时候超过网络超时的设置就会断开连接,
2019-08-02 01:31:57,119 [673532] - INFO [Executor task launch worker for task 5038:Logging$class@54] - Code generated in 32.595886 ms
2019-08-02 01:46:55,672 [1572085] - ERROR [shuffle-client-6-4:TransportChannelHandler@144] - Connection to hadoop3816.jd.163.org/10.196.77.46:7337 has been quiet for 900000 ms while there are outstanding requests. Assuming connection is dead; please adjust spark.network.timeout if this is wrong.
2019-08-02 01:46:55,674 [1572087] - ERROR [shuffle-client-6-4:TransportResponseHandler@144] - Still have 4 requests outstanding when connection from hadoop3816.jd.163.org/10.196.77.46:7337 is closed
2019-08-02 01:46:55,675 [1572088] - ERROR [shuffle-client-6-4:OneForOneBlockFetcher$1@138] - Failed while starting block fetches
java.io.IOException: Connection from hadoop3816.jd.163.org/10.196.77.46:7337 closed
此时同一Executor
上的别的task复用这个客户端时就会出现 connection * closed
异常,注意日志第一行和第二行的时间间隔,这时候由于spark.network.timeout
的设置,这个没有流量的通道其实闲置了近15分钟的时间,我们可以通过设置spark.shuffle.io.connectionTimeout=xxs
来单独控制这段逻辑的超时时间,而不是用spark.network.timeout
的统一设置。
什么原因(本质)
Shuffle 过程本质上是一个IO密集型的操作,但CPU消耗过大,其实才是真正的问题所在,再加上用户作业在整个过程中并不是一个变量,变量只有Hadoop相关的修改,及Spark Yarn ShuffleService的2.1-> 2.3的变化,抛开Hadoop先不说,先看下Spark自身的变化。
SPARK-15074,由于发现做一个大任务的时候花了大把的时间在shuffle fetch过程中,jstack信息显示主要的时间是花在了反复地读取index文件上,这和我们的场景很像,这个issue对应的PR中以entry的方式缓存index文件,而我们用的Spark 2.3通过SPARK-21501之后已经改成了以文件大小的方式缓存,默认的话只有100m的大小,对于集群上大部分shuffle作业,这点基本上不够看,等于没有缓存,所以大量的开销都花在了index文件的读取上。
通过调整spark.shuffle.service.index.cache.size=6144m
,大大缩减这块的开销,减少了NodeManager CPU压力,缓解了Shuffle Client注册,连接、传输超时和断连的问题。
总结
- 不建议通过调整spark.network.timeout来调整所有spark网络超时相关的参数,影响面太广,不可控,建议通过不同的timeout参数控制不同的逻辑
- Spark 大超时参数可能影响Executor注册及Task调度的初始逻辑,这里有坑,详细见上面
- 要注意Spark Yarn Shuffle Service在Index文件缓存方式上的变化,2.1的时候通过entry个数来限制总量,当单entry过大时容易造成NodeManager的内存压力和OOM风险,2.3的 spark.shuffle.service.index.cache.size 100M太小,需要调整。