如何在Hadoop Streaming中使用"typedbytes"或"rawbytes"?

5
我遇到了一个问题,需要使用Hadoop Streaming的"typedbytes"或"rawbytes"模式来解决,这样可以在非Java语言中分析二进制数据。(否则,Streaming会将某些字符(通常是\t和\n)解释为分隔符,并抱怨非UTF-8字符。将所有二进制数据转换为Base64会减慢工作流程,达不到预期目的。)
这些二进制模式是由HADOOP-1722添加的。在调用Hadoop Streaming作业的命令行上,"-io rawbytes"允许您将数据定义为32位整数大小,后跟该大小的原始数据;"-io typedbytes"允许您将数据定义为1位零(表示原始字节),后跟32位整数大小,后跟该大小的原始数据。我已经创建了这些格式的文件(包含一个或多个记录),并通过使用/对比typedbytes.py的输出来验证它们是否处于正确的格式中。我还尝试了所有可能的变化(大端、小端、不同的字节偏移等)。我正在使用CDH4中的Hadoop 0.20,它具有实现typedbytes处理的类,并且当设置"-io"开关时,它会进入这些类。
我使用"hadoop fs -copyFromLocal"将二进制文件复制到HDFS。当我尝试将其用作MapReduce作业的输入时,它在试图创建指定长度(例如3个字节)的字节数组的行上失败,并显示一个OutOfMemoryError。它可能会错误地读取数字并尝试分配一个巨大的块。尽管如此,它确实成功地将记录传递给Mapper(是前面的记录吗?不确定),Mapper将其写入标准错误,以便我可以看到它。记录开头始终有太多字节:例如,如果文件是"\x00\x00\x00\x00\x03hey",则Mapper将看到"\x04\x00\x00\x00\x00\x00\x00\x00\x00\x07\x00\x00\x00\x08\x00\x00\x00\x00\x03hey"(可重现的位,但我看不出任何规律)。

这个讲座的第5页我了解到,流处理中有“loadtb”和“dumptb”子命令,可以在一步中将类型化的字节复制到/从HDFS并将其包装/解包到SequenceFile中。当与“-inputformat org.apache.hadoop.mapred.SequenceFileAsBinaryInputFormat”一起使用时,Hadoop正确地解压缩了SequenceFile,但随后以完全相同的方式错误地解释了其中包含的类型化字节。

此外,我找不到任何关于此功能的文档。在2月7日(我将其电子邮件发送给自己),它在Apache上的streaming.html页面中简要提到,但是这个r0.21.0网页已被删除,r1.1.1的等效页面没有提到原始字节或类型化字节。

我的问题是:在Hadoop Streaming中使用rawbytes或typedbytes的正确方法是什么?有人曾经成功过吗?如果有,能否发布一份说明书?似乎这对于想要在Hadoop Streaming中使用二进制数据的任何人来说都是一个问题,而这应该是一个相当广泛的群体。
附言:我注意到DumboHadoopyrmr都使用了这个功能,但应该有一种直接使用它的方法,而不需要通过基于Python或R的框架进行介绍。

"\x04\x00\x00\x00\x00\x00\x00\x00\x00\x07\x00\x00\x00\x08\x00\x00\x00\x00\x03hey" 是一个长整型(类型 4),其值为 0,后面跟着一个长度为 8 的字符串(类型 7)("\x00\x00\x00\x00\x03hey")。这是一些 typedbytes 的编码值(0(类型),8(长度),字节)。 - Seth Fitzsimmons
3个回答

5

好的,我找到了一种奇怪但可行的组合。

  1. Prepare a valid typedbytes file in your local filesystem, following the documentation or by imitating typedbytes.py.

  2. Use

    hadoop jar path/to/streaming.jar loadtb path/on/HDFS.sequencefile < local/typedbytes.tb
    

    to wrap the typedbytes in a SequenceFile and put it in HDFS, in one step.

  3. Use

    hadoop jar path/to/streaming.jar -inputformat org.apache.hadoop.mapred.SequenceFileAsBinaryInputFormat ...
    

    to run a map-reduce job in which the mapper gets input from the SequenceFile. Note that -io typedbytes or -D stream.map.input=typedbytes should not be used--- explicitly asking for typedbytes leads to the misinterpretation I described in my question. But fear not: Hadoop Streaming splits the input on its binary record boundaries and not on its '\n' characters. The data arrive in the mapper as "rawdata" separated by '\t' and '\n', like this:

    1. 32-bit signed integer, representing length (note: no type character)
    2. block of raw binary with that length: this is the key
    3. '\t' (tab character... why?)
    4. 32-bit signed integer, representing length
    5. block of raw binary with that length: this is the value
    6. '\n' (newline character... ?)
  4. If you want to additionally send raw data from mapper to reducer, add

    -D stream.map.output=typedbytes -D stream.reduce.input=typedbytes
    

    to your Hadoop command line and format the mapper's output and reducer's expected input as valid typedbytes. They also alternate for key-value pairs, but this time with type characters and without '\t' and '\n'. Hadoop Streaming correctly splits these pairs on their binary record boundaries and groups by keys.

我找到的关于stream.map.outputstream.reduce.input唯一的文档是在HADOOP-1722交流中,从09年2月6日开始。(早期讨论考虑了一种不同的参数化格式。)
这个方法并没有为输入提供强类型:类型字符在创建SequenceFile和使用时丢失。但它确实在二进制记录边界处提供拆分,而不是'\n',这是真正重要的事情,并提供了mapper和reducer之间的强类型。

为了清晰起见,至少在hadoop 2.0.0-cdh4.2.0中,“原始二进制数据”块实际上是类型字节本身(即一个类型字节,一个长度,然后是数据)。这刚刚让我有点困惑。 - Danica
如果您的TypedBytes文件包含'\n',它是否仍然有效?您是否获得确切的字节或者它被改变了一些东西? - Y.H.
你会得到确切的字节,其中包含一些我不理解目的的附加字节。"\t"和"\n"包括在Hadoop提供给mapper(而不是reducer,那是一系列不同的字节)之前和之后的未修改字节中。你不需要向发送到loadtb的typedbytes添加额外的字节。你提供给loadtb的内容完全遵循规范。 - Jim Pivarski
我这周花了很多时间来研究这个问题,也许使用Hadoop(和Node.js)进行二进制流传输可以帮助更好地解决问题。特别是,我想我已经找到了如何使用-io typedbytes而不会破坏mapper或reducer输入的方法。(它还避免了引入“意外”的制表符或换行符,而是选择配对类型化字节。) - Seth Fitzsimmons

1
我们通过对二进制数据进行十六进制编码来解决在向 Mapper 流式传输数据时的数据问题。这将利用并增加操作的并行效率,而不是在节点上处理数据之前先转换数据。

感谢您的评论。在调查typedbytes之前,我正在进行即时的流式转换,将数据从base64编码转换为二进制,并将其转换回来(这可能就是您所描述的“hexaencoding”)。我试图避免所有的转换,以便一个二进制数据块可以按原样使用。 - Jim Pivarski
Jim,FYI,@ratang2000 只是拼写错误:它只是十六进制编码,意思是将字节转换为其十六进制字符串等效物。这与Base64编码不同。他的用法与此处提到的相同:https://issues.apache.org/jira/browse/HADOOP-1722。 - Subfuzion

0

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接