运行Apache Spark作业时出现“Connection reset by peer”错误

16
We have two HDP clusters, named A and B.
CLUSTER A NODES: - It consists of a total of 20 commodity machines. - There are 20 data nodes. - Namenode HA is configured with one active and one standby namenode.
CLUSTER B NODES: - It consists of a total of 5 commodity machines. - There are 5 datanodes. - No HA is configured, and this cluster has one primary and one secondary namenode.
We have three major components in our application that perform ETL (Extract, Transform, and Load) operations on incoming files. These components are referred to as E, T, and L, respectively.
COMPONENT E CHARACTERISTICS: - This component is an Apache Spark Job and runs solely on Cluster B. - Its job is to pick up files from NAS storage and put them into HDFS in Cluster B.
COMPONENT T CHARACTERISTICS:
  • 这个组件也是一个Apache Spark作业,它在B集群上运行。
  • 它的工作是拾取由E组件写入HDFS的文件,对其进行转换,然后将转换后的文件写入A集群中的HDFS。

COMPONENT L CHARACTERISTICS :

  • 这个组件也是一个Apache Spark作业,它仅在A集群上运行。
  • 它的工作是拾取由T组件编写的文件,并将数据加载到存在于A集群中的Hive表中。

Component L是三个组件中最好的组件,我们没有遇到任何问题。在组件E中有一些小的无法解释的问题,但组件T是最麻烦的。

组件E和T都使用DFS客户端与namenode通信。

以下是我们在运行组件T时不时观察到的异常摘录:

clusterA.namenode.com/10.141.160.141:8020. Trying to fail over immediately.
java.io.IOException: Failed on local exception: java.io.IOException: Connection reset by peer; Host Details : local host is: "clusterB.datanode.com"; destination host is: "clusterA.namenode.com":8020;
            at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:782)
            at org.apache.hadoop.ipc.Client.call(Client.java:1459)
            at org.apache.hadoop.ipc.Client.call(Client.java:1392)
            at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
            at com.sun.proxy.$Proxy15.complete(Unknown Source)
            at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.complete(ClientNamenodeProtocolTranslatorPB.java:464)
            at sun.reflect.GeneratedMethodAccessor1240.invoke(Unknown Source)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
            at java.lang.reflect.Method.invoke(Method.java:498)
            at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:258)
            at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:104)
            at com.sun.proxy.$Proxy16.complete(Unknown Source)
            at org.apache.hadoop.hdfs.DFSOutputStream.completeFile(DFSOutputStream.java:2361)
            at org.apache.hadoop.hdfs.DFSOutputStream.closeImpl(DFSOutputStream.java:2338)
            at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:2303)
            at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
            at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
            at org.apache.hadoop.io.compress.CompressorStream.close(CompressorStream.java:109)
            at sun.nio.cs.StreamEncoder.implClose(StreamEncoder.java:320)
            at sun.nio.cs.StreamEncoder.close(StreamEncoder.java:149)
            at java.io.OutputStreamWriter.close(OutputStreamWriter.java:233)
            at com.abc.xyz.io.CounterWriter.close(CounterWriter.java:34)
            at com.abc.xyz.common.io.PathDataSink.close(PathDataSink.java:47)
            at com.abc.xyz.diamond.parse.map.node.AbstractOutputNode.finalise(AbstractOutputNode.java:142)
            at com.abc.xyz.diamond.parse.map.application.spark.node.SparkOutputNode.finalise(SparkOutputNode.java:239)
            at com.abc.xyz.diamond.parse.map.DiamondMapper.onParseComplete(DiamondMapper.java:1072)
            at com.abc.xyz.diamond.parse.decode.decoder.DiamondDecoder.parse(DiamondDecoder.java:956)
            at com.abc.xyz.parsing.functions.ProcessorWrapper.process(ProcessorWrapper.java:96)
            at com.abc.xyz.parser.FlumeEvent2AvroBytes.call(FlumeEvent2AvroBytes.java:131)
            at com.abc.xyz.parser.FlumeEvent2AvroBytes.call(FlumeEvent2AvroBytes.java:45)
            at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:129)
            at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:129)
            at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
            at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
            at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:29)
            at com.abc.xyz.zzz.ParseFrameHolder$ToKafkaStream.call(ParseFrameHolder.java:123)
            at com.abc.xyz.zzz.ParseFrameHolder$ToKafkaStream.call(ParseFrameHolder.java:82)
            at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:225)
            at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:225)
            at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$35.apply(RDD.scala:927)
            at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$35.apply(RDD.scala:927)
            at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1882)
            at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1882)
            at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
            at org.apache.spark.scheduler.Task.run(Task.scala:89)
            at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
            at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Connection reset by peer
            at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
            at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
            at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
            at sun.nio.ch.IOUtil.read(IOUtil.java:197)
            at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
            at org.apache.hadoop.net.SocketInputStream$Reader.performIO(SocketInputStream.java:57)
            at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:142)
            at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)
            at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:131)
            at java.io.FilterInputStream.read(FilterInputStream.java:133)
            at java.io.FilterInputStream.read(FilterInputStream.java:133)
            at org.apache.hadoop.ipc.Client$Connection$PingInputStream.read(Client.java:554)
            at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
            at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
            at java.io.DataInputStream.readInt(DataInputStream.java:387)
            at org.apache.hadoop.ipc.Client$Connection.receiveRpcResponse(Client.java:1116)
            at org.apache.hadoop.ipc.Client$Connection.run(Client.java:1011)   

如前所述,我们面临这种异常的发生是非常间歇性的,当它发生时,我们的应用程序会卡住,导致我们不得不重新启动它。

我们尝试过的解决方案:

我们最初怀疑的是,在集群A中,我们过载了活动namenode,因为组件T确实同时打开了许多DFS客户端,并在不同文件上执行文件操作(没有同一文件的争用问题)。在解决这个问题的努力中,我们查看了namenode的两个关键参数dfs.namenode.handler.count和ipc.server.listen.queue.size,并将后者从128(默认值)提高到1024。
不幸的是,组件T仍然存在问题。我们开始采取不同的方法来解决这个问题。我们专注于找出发生连接重置原因的唯一原因。根据许多文章和堆栈交换讨论,该问题描述如下,“对等方设置了RST标志,导致立即终止连接”。在我们的案例中,我们确定对等方是集群A的namenode。
记住RST标志,我深入了解了TCP通信的内部工作原理,只涉及RST标志的原因。
Linux发行版(不包括BSD)中的每个套接字都有两个与之关联的队列,即接受和回溯队列。
在TCP握手过程中,所有请求都保留在回溯队列中,直到从开始建立连接的节点接收到ACK数据包。一旦接收到,请求就会转移到接受队列,打开套接字的应用程序可以开始从远程客户端接收数据包。
回溯队列的大小由两个内核级别参数控制,即net.ipv4.tcp_max_syn_backlog和net.core.somaxconn,而应用程序(在我们的情况下是namenode)可以请求内核所需的队列大小,受上限限制(我们认为接受队列大小是由ipc.server.listen.queue.size定义的队列大小)。
此外,这里还有另一个有趣的事情要注意,如果net.ipv4.tcp_max_syn_backlog的大小大于net.core.somaxconn,则前者的值将被截断为后者的值。该声明基于Linux文档,并可在https://linux.die.net/man/2/listen中找到。
回到重点,当回溯完全填满时,TCP会以两种方式行为,并且此行为也可以通过称为net.ipv4.tcp_abort_on_overflow的内核参数进行控制。默认情况下设置为0,当回溯队列已满时,导致内核丢弃任何新的SYN数据包,这反过来让发送方重新发送SYN数据包。当设置为1时,内核将在数据包中标记RST标志并将其发送给发送方,从而突然终止连接。
我们检查了上述内核参数的值,并发现所有机器上的net.core.somaxconn均设置为1024,net.ipv4.tcp_abort_on_overflow设置为0,net.ipv4.tcp_max_syn_backlog设置为4096,包括两个集群中的所有机器。
现在我们唯一怀疑的是将集群A与集群B连接的交换机,因为任何一个集群中的机器都不会设置RST标志,因为参数net.ipv4.tcp_abort_on_overflow设置为0。

我的问题

  • 从HDFS文档中可以看出,DFS客户端使用RPC与namenode通信执行文件操作。每个RPC调用都需要与namenode建立TCP连接吗?
  • 参数ipc.server.listen.queue.size是否定义了namenode接受RPC请求的套接字的accept队列长度?
  • 当namenode负载过重时,它能否隐式关闭与DFS客户端的连接,从而使内核发送一个设置了RST标志的数据包,即使内核参数net.ipv4.tcp_abort_on_overflow设置为0?
  • L2或L3交换机(用于连接我们两个集群中的机器)能否设置RST标志,因为它们无法处理突发流量?

我们解决这个问题的下一步方法是通过使用tcpdump或wireshark分析数据包来确定哪台机器或交换机(没有路由器参与)正在设置RST标志。我们还将将上述所有队列的大小增加到4096,以有效处理突发流量。

名称节点日志没有显示任何异常,除了在某些时间点上看到的Ambari中的名称节点连接负载峰值,并不一定是发生连接重置异常时发生的。

总之,我想知道我们是否朝着解决这个问题的正确方向前进,还是只会走上死路?

附言:对于我的问题,内容有些长,我想在请求任何帮助或建议之前向读者呈现整个上下文。感谢您的耐心阅读。

1个回答

3

首先,您的网络确实可能存在异常情况,也许您可以通过所提到的步骤来追踪问题的根源。

话虽如此,在看到这些步骤时,我个人认为有一些反直觉的事情发生了。

您目前让步骤T执行转换和最脆弱的集群内传输。也许您正在经历比其他人更糟糕的可靠性问题,但我会认真考虑将复杂的部分和脆弱的部分分离。

如果您这样做(或者只是将工作分成较小的块),那么就可以很容易地设计出解决方案,使其脆弱的步骤在某些情况下可能会失败,但当这种情况发生时,它将简单地重试。当然,重试的代价非常小,因为只需要重试一个小部分的工作。


总之:解决连接问题可能有所帮助,但如果可能的话,最好设计一种间歇性失败的解决方案。


我们已经为所有组件配置了重试,并且还分离了脆弱的部分。这是由于本地集群网络升级引起的问题。感谢您的回复。 - Aniketh Jain
@aniketh 很高兴听到这个。如果答案有用,您可以通过接受和/或投票来向其他人表明这一点。 - Dennis Jaheruddin

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接