使用AccumuloRowInputFormat时,Accumulo MapReduce作业失败,出现java.io.EOFException

3

我所有的映射器都出现了以下异常。为了简洁起见,我仅展示了最后一次失败。

为什么会发生这种情况,我该如何解决?

16/09/21 17:01:57 INFO mapred.JobClient: Task Id : attempt_201609151451_0044_m_000002_2, Status : FAILED
java.io.EOFException
    at java.io.DataInputStream.readFully(DataInputStream.java:197)
    at java.io.DataInputStream.readUTF(DataInputStream.java:609)
    at java.io.DataInputStream.readUTF(DataInputStream.java:564)
    at org.apache.accumulo.core.client.mapreduce.RangeInputSplit.readFields(RangeInputSplit.java:154)
    at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:71)
    at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:42)
    at org.apache.hadoop.mapred.MapTask.getSplitDetails(MapTask.java:356)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:640)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:330)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:268)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
    at org.ap
16/09/21 17:02:00 INFO mapred.JobClient: Job complete: job_201609151451_0044
16/09/21 17:02:00 INFO mapred.JobClient: Counters: 8
16/09/21 17:02:00 INFO mapred.JobClient:   Job Counters
16/09/21 17:02:00 INFO mapred.JobClient:     Failed map tasks=1
16/09/21 17:02:00 INFO mapred.JobClient:     Launched map tasks=48
16/09/21 17:02:00 INFO mapred.JobClient:     Data-local map tasks=13
16/09/21 17:02:00 INFO mapred.JobClient:     Rack-local map tasks=35
16/09/21 17:02:00 INFO mapred.JobClient:     Total time spent by all maps in occupied slots (ms)=343982
16/09/21 17:02:00 INFO mapred.JobClient:     Total time spent by all reduces in occupied slots (ms)=0
16/09/21 17:02:00 INFO mapred.JobClient:     Total time spent by all maps waiting after reserving slots (ms)=0
16/09/21 17:02:00 INFO mapred.JobClient:     Total time spent by all reduces waiting after reserving slots (ms)=0

我正在使用Accumulo表作为我的输入数据。我的设置如下:

@Override
public int run(String[] args) throws Exception {
    Configuration conf = getConf();
    String idMapFileContent = readResourceFile(TYPE_ID_MAP_FILENAME);
    conf.set(TYPE_ID_MAP_KEY, idMapFileContent);

    Job job = Job.getInstance(conf, this.getClass().getSimpleName());
    job.setJarByClass(this.getClass());
    job.setMapperClass(DanglingLinksFinderMapper.class);
    job.setReducerClass(DanglingLinksFinderReducer.class);
    this.setupRowInputFormat(job);

    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    Path out = new Path(args[0]);
    LOGGER.info("Writing to output directory: " + out.toUri());
    FileOutputFormat.setOutputPath(job, out);

    int exitCode = job.waitForCompletion(true) ? 0 : 1;
}

private Job setupRowInputFormat(Job job)
        throws IOException, AccumuloSecurityException
{
    job.setInputFormatClass(AccumuloRowInputFormat.class);
    Configuration conf = job.getConfiguration();

    AccumuloConnectInfo connectInfo = new AccumuloConnectInfo(conf);
    LOGGER.info(connectInfo.toString());

    AccumuloRowInputFormat.setZooKeeperInstance(job, connectInfo.getInstanceNames(), connectInfo.getZookeeperInstanceNames());
    AccumuloRowInputFormat.setConnectorInfo(job, connectInfo.getUserName(), connectInfo.getPassword());
    AccumuloRowInputFormat.setScanAuthorizations(job, new Authorizations());
    AccumuloRowInputFormat.setInputTableName(job, TABLE_NAME);
    return job;
}

我正在使用Hadoop 2.6.0,Accumulo 1.5.0和Java 1.7。
我前几天曾经成功运行过,而且(据我所知)没有更改任何东西。所以我认为可能与我正在运行的服务器上的配置或数据状态有关?在我的本地机器上的Docker容器上运行测试表时,作业可以正常工作,但在远程测试服务器上失败了。
我可以登录`accumulo shell`并扫描我正在处理的表。那里面一切看起来都很好。我还尝试在测试服务器上运行压缩,这也正常工作,但未解决问题。

你使用的 Accumulo 版本是什么? - elserj
更新了答案以包含Accumulo版本。 - Mack
谢谢!顺便说一句,我极其强烈地建议您升级到新版本。1.5已经很久不再支持了。 - elserj
1个回答

2
我猜你使用的累积卡库版本与你为MapReduce作业本身(Mapper/Reducer)使用的库(通过DistributedCache或libjars CLI选项)不匹配。
因为您未指定范围,所以AccumuloInputFormat将自动获取表中所有Tablet边界,并创建与表中Tablet数量相同的RangeInputSplit对象。这个分割创建是在本地JVM中完成的(当您提交作业时创建的JVM)。这些RangeInputSplit对象被序列化并传递到YARN。
提供的错误是当Mapper获取其中一个序列化的RangeInputSplit对象并尝试反序列化它时发生的。由于序列化的数据不足以反序列化Mapper运行中的Accumulo版本期望读取的内容,因此出现此问题。
这可能只是您所使用的Accumulo版本中的序列化错误(请分享该版本),但我没有听说过这样的错误。我猜想本地类路径和Mapper类路径上的Accumulo版本存在差异。

“jar mismatch” 的概念对我来说很有道理。但是,我实际上没有将任何依赖项打包到我正在使用的 jar 文件中,并且我正在使用 accumulo 的 tool.sh 脚本运行该 jar,我相信该脚本会将安装在 Accumulo 中的 jar 加载到类路径中。尽管如此,这并不能消除由该脚本加载的 jar 或错误的环境设置所加载的 jar 之间可能存在冲突的可能性。 - Mack
我已经与我们的系统管理员核实,这绝对是一个jar不匹配的问题。显然,在配置测试服务器时,安装了Accumulo 1.5.4。然后它被降级到1.5.0,但是一些最初的jar文件仍然保留在原地。我认为这会导致由于此已知不兼容性而导致失败。 - Mack
太好了。很高兴你解决了它。 - elserj

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接