猪JVM Java堆空间错误

4

我正在尝试运行一个调用Java编写的用户定义函数的Pig脚本。我试图使用一个非常小的264字节文件来测试此脚本。但是,我最终遇到了Java堆空间错误并且作业失败了。我尝试使用-Xms1024M选项运行作业,它可以运行较小的文件,但在处理较大的文件时会失败。 即使我的集群足够强大,也不应该因为这样的小文件而出问题,我想知道如何修复这个内存泄漏问题。 请问有人能帮忙吗?

import java.util.HashMap;
import java.lang.annotation.Annotation;
import java.lang.reflect.Array;
import java.lang.reflect.Method;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.util.Set;
import java.text.*;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.*;


import com.tictactec.ta.lib.CoreAnnotated;
import com.tictactec.ta.lib.MAType;
import com.tictactec.ta.lib.MInteger;
import com.tictactec.ta.lib.RetCode;
import com.tictactec.ta.lib.meta.annotation.InputParameterInfo;
import com.tictactec.ta.lib.meta.annotation.InputParameterType;
import com.tictactec.ta.lib.meta.annotation.OptInputParameterInfo;
import com.tictactec.ta.lib.meta.annotation.OptInputParameterType;
import com.tictactec.ta.lib.meta.annotation.OutputParameterInfo;
import com.tictactec.ta.lib.meta.annotation.OutputParameterType;

public class taLib extends EvalFunc<DataBag> 
{

    private static final int MIN_ARGS = 3;

    public static CoreAnnotated core = new CoreAnnotated();
    private static Method func_ref = null;

    public DecimalFormat df = new DecimalFormat("#.###");


    public DataBag exec(Tuple args) throws IOException 
    {

        DataBag input=null;
        MInteger outStart = new MInteger();
        MInteger outLen = new MInteger();
        Map<String,Object>outputParams=new HashMap<String, Object>();
        String func_name;
        List<Integer> ip_colmns= new ArrayList<Integer>();
        List<double[]>ip_list=new ArrayList<double[]>();
        List<String>opt_type=new ArrayList<String>();
        List<Object>opt_params=new ArrayList<Object>();
        //////

        long m1=Runtime.getRuntime().freeMemory();
        System.out.println(m1);
        long m2=Runtime.getRuntime().totalMemory();
        System.out.println(m2);
        //////
        int ip_noofparams=0;
        int op_noofparams=0;
        int opt_noofparams=0;

        if (args == null || args.size() < MIN_ARGS)
            throw new IllegalArgumentException("talib: must have at least " +   
MIN_ARGS + "  args");

        if(args.get(0) instanceof DataBag)
        {input = (DataBag)args.get(0);}
        else{throw new IllegalArgumentException("Only a valid bag name can be 
passed");}

        // get no of fields in bag
        Tuple t0=input.iterator().next();
        int fields_in_bag=t0.getAll().size();

        if(args.get(1) instanceof String)
        {func_name = (String)args.get(1);}
        else{throw new IllegalArgumentException("Only valid function name can be 
passed at arg 1");}
        func_ref=methodChk(func_name);

        if (func_ref == null) {
            throw new IllegalArgumentException("talib: function " 
                               + func_name + " was not found");
        }

        for (Annotation[] annotations : func_ref.getParameterAnnotations()) 
             {
              for (Annotation annotation : annotations) 
               {
                if(annotation instanceof InputParameterInfo)
                {
                 InputParameterInfo inputParameterInfo = 
(InputParameterInfo)annotation;


if(inputParameterInfo.type().equals(InputParameterType.TA_Input_Price))
                   {

ip_noofparams=numberOfSetBits(inputParameterInfo.flags());
                   }
                    else
                    {
                    ip_noofparams++;
                    }
                }
                if(annotation instanceof OptInputParameterInfo)
                {
                    OptInputParameterInfo optinputParameterInfo=
(OptInputParameterInfo)annotation;
                    opt_noofparams++;
                    if 
(optinputParameterInfo.type().equals(OptInputParameterType.TA_OptInput_IntegerRange))
                    {
                        opt_type.add("Integer");
                    }
                    else 
 if(optinputParameterInfo.type().equals(OptInputParameterType.TA_OptInput_RealRange))
                    {
                        opt_type.add("Double");
                    }
                    else  
 if(optinputParameterInfo.type().equals(OptInputParameterType.TA_OptInput_IntegerList))
                    {
                        opt_type.add("String");
                    }
                    else{throw new IllegalArgumentException("whoopsie ...serious 
 mess in opt_annotations");}

                }
                if (annotation instanceof OutputParameterInfo) 
                {
                            OutputParameterInfo outputParameterInfo = 
 (OutputParameterInfo) annotation;
                            op_noofparams++;
                    if 
 (outputParameterInfo.type().equals(OutputParameterType.TA_Output_Real)) 
                    {
                              outputParams.put(outputParameterInfo.paramName(), new 
 double[(int) input.size()]);
                    }
                    else if    
 (outputParameterInfo.type().equals(OutputParameterType.TA_Output_Integer)) 
                    {
                              outputParams.put(outputParameterInfo.paramName(), new  
  int[(int)input.size()]);
                            }
                } 
               }
            }

        int total_params =ip_noofparams+opt_noofparams;
        if((args.size()-2)!=total_params){throw new IllegalArgumentException("Wrong   
 no of argumets passed to UDF");}


         // get the ip colmns no's 
        for(int i=2;i<(2+ip_noofparams);i++)
        {   
           if(args.get(i) instanceof Integer )
             {
              if((Integer)args.get(i)>=0 && (Integer)args.get(i)<fields_in_bag)
            {
            ip_colmns.add((Integer) args.get(i));
            }
            else{throw new IllegalArgumentException("The input colmn specified 
 is invalid..please enter a valid colmn no:0-"+(fields_in_bag-1));}     
             }  
               else{throw new IllegalArgumentException("Wrong arguments entered: 
 Only"+ip_noofparams+"field no's of type(integer) allowed for fn"+func_name ); }

         }

        // create a list of ip arrays 
        for(int i=0;i<ip_colmns.size();i++)
        {
         ip_list.add((double[]) Array.newInstance(double.class, (int)input.size()));
        }
        int z=0;
        int x=0;
        // fill up the arrays
        for(Tuple t1: input)
        {       

            Iterator<double[]> itr=ip_list.iterator();
            z=0;
            while(itr.hasNext())
            {

             if((Double)t1.get(ip_colmns.get(z)) instanceof Double)
             {
              ((double[])itr.next())[x]=(Double) t1.get(ip_colmns.get(z++));
             }
             else{throw new IllegalArgumentException("Illegal argument while 
 filling up array...only double typr allowed");}
            }
            x++;
        }

        //deal with opt params
        int s=0;
        for(int i=(2+ip_noofparams);i<(2+ip_noofparams+opt_noofparams);i++)
        {



 if(opt_type.get(s).equalsIgnoreCase(args.get(i).getClass().getSimpleName().toString()))
            {       
                if(opt_type.get(s).equalsIgnoreCase("String"))
                {
                    String m=args.get(i).toString().toLowerCase();
                    String ma=m.substring(0, 1).toUpperCase();
                    String mac=m.substring(1);
                    String macd=ma+mac;
                    MAType type =MAType.valueOf(macd);
                    opt_params.add(type);
                    s++;
                }

                else{
                    opt_params.add(args.get(i));
                    s++;
                    }

            }
            else if(opt_type.get(s).equalsIgnoreCase("Double"))
                    {


 if(args.get(i).getClass().getSimpleName().toString().equalsIgnoreCase("Integer"))
                     {
                    opt_params.add((Double)((Integer)args.get(i)+0.0));
                    s++;
                     } 
                  else{throw new IllegalArgumentException("Opt arguments do   
 not match for fn:"+func_name+", pls enter opt arguments in right order"); }
                    }
            else{throw new IllegalArgumentException("Opt arguments do not match 
 for fn:"+func_name+", pls enter opt arguments in right order");}

        } 


        List<Object> ta_argl = new ArrayList<Object>();
        ta_argl.add(new Integer(0));
        ta_argl.add(new Integer((int)input.size() - 1));
        for(double[]in: ip_list)
        {
         ta_argl.add(in);
        }

        if(opt_noofparams!=0)
        {ta_argl.addAll(opt_params);}
        ta_argl.add(outStart);
        ta_argl.add(outLen);

        for (Map.Entry<String, Object> entry : outputParams.entrySet()) 
            {
             ta_argl.add(entry.getValue());
            }


            RetCode rc = RetCode.Success;
        try {
            rc = (RetCode)func_ref.invoke(core, ta_argl.toArray());
            } catch (Exception e) 
                    {
            assert false : "I died in ta-lib, but Java made me a zombie...";
            }

        assert rc == RetCode.Success : "ret code from " + func_name;



        if (outLen.value == 0) return null;

        //////
        DataBag ret=null;
        ret =outTA(input,outputParams,outStart);
        outputParams.clear();
        ip_list.clear();
        opt_params.clear();
        opt_type.clear();
        ip_colmns.clear();
        Runtime.getRuntime().gc();
        return ret;

    }





    public DataBag outTA(DataBag bag,Map<String, Object> outputParams,MInteger outStart)
    {
        DataBag nbag=null;
        TupleFactory mTupleFactory=TupleFactory.getInstance();
        BagFactory mBagFactory=BagFactory.getInstance();
        nbag=mBagFactory.newDefaultBag();
        Tuple tw=bag.iterator().next();
        int fieldsintup=tw.getAll().size();


        for(Tuple t0: bag)
        {
            Tuple t1=mTupleFactory.newTuple();

            for(int z=0;z<fieldsintup;z++)
            {
                try {
                    t1.append(t0.get(z));
                } catch (Exception e) {
                    // TODO Auto-generated catch block
                    System.out.println("Ouch");
                }
            }
            nbag.add(t1);
        }

        int i = 0;
        int j=0;        
        for (Tuple t2: nbag) 
        {
         if(i>=outStart.value)
          {
            for(Map.Entry<String,Object>entry: outputParams.entrySet())
            {
            t2.append(entry.getKey().substring(3).toString());

             if(entry.getValue() instanceof double[])
              {
               t2.append( new Double 
(df.format(((double[])entry.getValue())[j])));
              }
             else if(entry.getValue() instanceof int[])
              {
               t2.append( ((int[])entry.getValue())[j]);
              }
             else{throw new 
IllegalArgumentException(entry.getValue().getClass()+"not supported");}
            }       
            i++;j++;
          }
          else
           {t2.append(0.0);
            i++;    
               }

        }

        return nbag;
    }

    public Method methodChk(String fn)
    {
        String fn_name=fn;
        Method tmp_fn=null;
        for (Method meth: core.getClass().getDeclaredMethods()) 
     {
      if (meth.getName().equalsIgnoreCase(fn_name)) 
       {
        tmp_fn = meth;
        break;
       }
     }
        return tmp_fn;
    }


    public int numberOfSetBits(int i) {
        i = i - ((i >> 1) & 0x55555555);
        i = (i & 0x33333333) + ((i >> 2) & 0x33333333);
        return ((i + (i >> 4) & 0xF0F0F0F) * 0x1010101) >> 24;
    }

}

你能否在某个地方发布你的UDF代码?或者至少描述一下它在做什么? - Chris White
UDF基本上是根据用户传递给UDF的参数,在时间序列数据上计算函数(由用户指定)。我不认为我可以发布代码...它很大,但我可以发布我遇到的错误。 - user1426777
我问这个问题是因为如果你将值累加到一个集合中,而且这个集合变得很大,那么你很容易就会耗尽内存——也许你没有在调用之间清除集合/收集器,也许你正在追加到缓冲区而没有清除它等等。在你提供的当前信息量下,没有人能帮助你——这就像我说“我的车坏了,请修理它”,但我不能让你看或触摸它一样。 - Chris White
我试图在这里发布错误,但它不允许我自己回答问题,而且评论字段的字符数有限。但你是对的,我正在将值累加到HashMap中,但在调用结束时它被清除了,我所有其他的集合也是如此。 - user1426777
我还没有尝试过使用gzip,但我可以试一试。 - user1426777
显示剩余4条评论
1个回答

4

1
我设置了mapred作业的选项,所以我的命令看起来像这样: pig -Dpig.mapred.child.java.opts=-Xms1024M <jobname.pig> - user1426777
好的,经过一些尝试后,我可以确定是bz2压缩导致了堆空间问题...所以我转而使用gzip...看起来可以解决问题。 - user1426777
因此感谢您的建议...但是现在每当我尝试运行一个8 GB的较大文件时,某些原因会导致作业崩溃,并显示此错误。 - user1426777
org.apache.pig.backend.executionengine.ExecException: 错误2135:存储函数收到错误。与firstBadLink 165.222.220.222:50010的连接应答失败。 位于org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore.getNext(POStore.java:151) org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit.runPipeline(POSplit.java:254) org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit.processPlan(POSplit.java:236) org.apache.pig.backend.hadoop.executionengine.physicalLayer.relaton - user1426777

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接