java.lang.IllegalArgumentException: 错误的文件系统: , 期望的是: hdfs://localhost:9000。

5
我正在尝试实现reduce端连接,并使用mapfile reader查找分布式缓存,但在stderr中检查时显示以下错误,lookupfile文件已经存在于hdfs中,并且似乎已经正确加载到缓存中,如stdout所示。
java.lang.IllegalArgumentException: Wrong FS: file:/app/hadoop/tmp/mapred/local/taskTracker/distcache/-8118663285704962921_-1196516983_170706299/localhost/input/delivery_status/DeliveryStatusCodes/data, expected: hdfs://localhost:9000 at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:390) at org.apache.hadoop.hdfs.DistributedFileSystem.getPathName(DistributedFileSystem.java:140) at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:554) at org.apache.hadoop.fs.FileSystem.getLength(FileSystem.java:816) at org.apache.hadoop.io.SequenceFile$Reader.(SequenceFile.java:1479) at org.apache.hadoop.io.SequenceFile$Reader.(SequenceFile.java:1474) at org.apache.hadoop.io.MapFile$Reader.createDataFileReader(MapFile.java:302) at org.apache.hadoop.io.MapFile$Reader.open(MapFile.java:284) at org.apache.hadoop.io.MapFile$Reader.(MapFile.java:273) at org.apache.hadoop.io.MapFile$Reader.(MapFile.java:260) at org.apache.hadoop.io.MapFile$Reader.(MapFile.java:253) at mr_poc.reducerrsj.initializeDepartmentsMap(reducerrsj.java:59) at mr_poc.reducerrsj.setup(reducerrsj.java:42) at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:174) at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:649) at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:418) at org.apache.hadoop.mapred.Child$4.run(Child.java:255) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:416) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1190) at org.apache.hadoop.mapred.Child.main(Child.java:249) java.lang.NullPointerException at mr_poc.reducerrsj.buildOutputValue(reducerrsj.java:83) at mr_poc.reducerrsj.reduce(reducerrsj.java:127) at mr_poc.reducerrsj.reduce(reducerrsj.java:1) at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:177) at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:649) at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:418) at org.apache.hadoop.mapred.Child$4.run(Child.java:255) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:416) at org.apache.hadoop.security.
这是我的驱动程序代码,
package mr_poc;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class driverrsj extends Configured implements Tool{

    @Override
    public int run(String[] arg) throws Exception {
if(arg.length != 3)
{
    System.out.printf("3 parameters are required for DriverRSJ- <Input Dir1> <Input Dir2> <Output Dir> \n");
    return -1;
}
Job job = new Job(getConf());
Configuration conf = job.getConfiguration();
DistributedCache.addCacheFile(new URI("/input/delivery_status"), conf);
System.out.println("Cache : " + job.getConfiguration().get("mapred.cache.files"));
job.setJarByClass(driverrsj.class);
conf.setInt("cust_info", 1);
conf.setInt("status", 2);
StringBuilder inputPaths = new StringBuilder();
inputPaths.append(arg[0].toString()).append(",").append(arg[1].toString());
FileInputFormat.setInputPaths(job, inputPaths.toString());
FileOutputFormat.setOutputPath(job, new Path(arg[2]));
job.setJarByClass(driverrsj.class);
job.setMapperClass(mappperRSJ.class);
job.setReducerClass(reducerrsj.class);
job.setMapOutputKeyClass(CompositeKeyWritableRSJ.class);
job.setMapOutputValueClass(Text.class);
//job.setPartitionerClass(partinonrsj.class);
job.setSortComparatorClass(secondarysortcomp.class);
job.setGroupingComparatorClass(GroupingComparatorRSJ.class);
job.setNumReduceTasks(1);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Text.class);


boolean success =job.waitForCompletion(true);
return success? 0 : 1;

    }
    
    public static void main(String[] args) throws Exception{
        int exitCode = ToolRunner.run(new Configuration(), new driverrsj(),args);
        System.exit(exitCode);
        
    }
    

}

这是我的 reducer 代码。
包 mr_poc;
import java.io.File;
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class reducerrsj extends Reducer<CompositeKeyWritableRSJ, Text, NullWritable, Text>{
    StringBuilder reduceValueBuilder = new StringBuilder("");
    NullWritable nullWritableKey = NullWritable.get();
    Text reduceOutputValue = new Text("");
    String strSeparator = ",";
    private MapFile.Reader deptMapReader = null;
    Text txtMapFileLookupKey = new Text();
    Text txtMapFileLookupValue = new Text();
    //Path[] cacheFilesLocal;
    //Path[] eachPath;
    
    @Override
    protected void setup(Context context) throws IOException,InterruptedException {
        Path[] cacheFiles = DistributedCache.getLocalCacheFiles(context.getConfiguration());
        
        
        
        for ( Path eachPath : cacheFiles){
            
            System.out.println(eachPath.toString());
              System.out.println(eachPath.getName());
            if(eachPath.getName().toString().contains("delivery_status"))
            {
                  
                URI uriUncompressedFile = new File(eachPath.toString()+ "/DeliveryStatusCodes").toURI();
                initializeDepartmentsMap(uriUncompressedFile, context);
            
            }
            }
        }
    
    //@SuppressWarnings("deprecation")
    private void initializeDepartmentsMap(URI uriUncompressedFile, Context context)
    throws IOException {
    // {{
    // Initialize the reader of the map file (side data)
        Configuration conf = context.getConfiguration();
        conf.addResource(new Path("/usr/local/hadoop-1.2.1/conf/core-site.xml"));
        FileSystem dfs = FileSystem.get(conf);
    try {
        
        
    deptMapReader = new MapFile.Reader(dfs,uriUncompressedFile.toString(), context.getConfiguration());
    } catch (Exception e) {
    e.printStackTrace();
    }
    // }}
    }
    private StringBuilder buildOutputValue(CompositeKeyWritableRSJ key,
            StringBuilder reduceValueBuilder, Text value) {
             
            if (key.getsourceindex() == 2) {
            
             
            String arrSalAttributes[] = value.toString().split(",");
            txtMapFileLookupKey.set(arrSalAttributes[0].toString());
            System.out.println("key=" + txtMapFileLookupKey);
            
            
            try {
                
            deptMapReader.get(txtMapFileLookupKey, txtMapFileLookupValue);
            }
             catch (Exception e) {
            txtMapFileLookupValue.set("");
                e.printStackTrace();
            } finally {
            txtMapFileLookupValue
            .set((txtMapFileLookupValue.equals(null) || txtMapFileLookupValue
            .equals("")) ? "NOT-FOUND"
            : txtMapFileLookupValue.toString());
            }
        
            reduceValueBuilder.append(txtMapFileLookupValue.toString());
            
             
            } else if(key.getsourceindex() == 1) {
        
            String arrEmpAttributes[] = value.toString().split(",");
            reduceValueBuilder.append(arrEmpAttributes[0].toString()).append(
            strSeparator);
            } 
            
             
            
            txtMapFileLookupKey.set("");
            txtMapFileLookupValue.set("");
            
            return reduceValueBuilder;
    }
     
    @Override
    public void reduce(CompositeKeyWritableRSJ key, Iterable<Text> values,
    Context context) throws IOException, InterruptedException {
     
    
    for (Text value : values) {
    buildOutputValue(key, reduceValueBuilder, value);
    }
     
    // Drop last comma, set value, and emit output
    if (reduceValueBuilder.length() > 1) {
     
    //reduceValueBuilder.setLength(reduceValueBuilder.length() - 1);
    // Emit output
    reduceOutputValue.set(reduceValueBuilder.toString());
    context.write(nullWritableKey, reduceOutputValue);
    } else {
    System.out.println("Key=" + key.getjoinkey() + "src="
    + key.getsourceindex());
     
    }
    // Reset variables
    reduceValueBuilder.setLength(0);
    reduceOutputValue.set("");
     
    }
    @Override
    protected void cleanup(Context context) throws IOException,
    InterruptedException {
         if(deptMapReader != null)
         {
deptMapReader.close();
    }
    }
}

这是我的 core-site.xml 文件。

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<!-- Put site-specific property overrides in this file. -->

<configuration>
<property>
  <name>hadoop.tmp.dir</name>
  <value>/app/hadoop/tmp</value>
  <description>A base for other temporary directories.</description>
</property>
<property>
  <name>fs.default.name</name>
  <value>hdfs://localhost:9000</value>
  <description>The name of the default file system.  A URI whose
  scheme and authority determine the FileSystem implementation.  The
  uri's scheme determines the config property (fs.SCHEME.impl) naming
  the FileSystem implementation class.  The uri's authority is used to
  determine the host, port, etc. for a filesystem.</description>
</property>
</configuration>

任何帮助都将不胜感激。提前感谢!!!
3个回答

10

我也遇到了同样的问题,我通过添加代码解决了这个问题。

FileSystem fs = FileSystem.get(new URI("hdfs://localhost:9000"),conf)

在驱动程序类中。

您需要从java.net.URI导入URI


4

您应该根据您的core-site.xml文件设置conf属性,像这样:

conf.set("fs.defaultFS", "hdfs://host:port");
conf.set("mapreduce.jobtracker.address", "host:port");

0
请在作业运行器中包含以下行:

DistributedCache.addCacheFile(new URI(“”),conf);

请在mapper的设置方法中包含以下代码。
@Override
protected void setup(Context context) throws IOException, InterruptedException {
    Configuration configuration = new Configuration();
    FileSystem fileSystem = null;
    try {
         fileSystem = FileSystem.get(new URI("<File location"),configuration);
    } catch (URISyntaxException e) {
        e.printStackTrace();
    }

    String location = <S3 file location>;
    FSDataInputStream fsDataInputStream =fileSystem.open(new Path(location));
    Scanner scanner = new Scanner(fsDataInputStream);
    int i = 1;
    while(scanner.hasNextLine()) {
        String str[] = scanner.nextLine().split(",");
        LOG.info("keys are \t" + str[0] + str[1]);
        stickerMap.put(str[0] + str[1], i);
        ++i;
    }
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接