从Storm bolt向HBase插入行

3
我希望能够从分布式(而非本地)Storm拓扑向HBase编写新条目。已经存在一些GitHub项目,提供HBase Mappers预制的Storm bolts来将Tuples写入HBase。这些项目为在LocalCluster上执行其示例提供说明。
我遇到的问题是,无论是使用这两个项目还是直接从bolt访问HBase API,都需要将HBase-site.xml文件包含在类路径中。使用直接API方法,并且也许用GitHub的方法,当您执行HBaseConfiguration.create();时,它将尝试从类路径上的一个条目中找到所需的信息。
如何修改storm bolt的类路径以包含Hbase配置文件?
更新:使用danehammer的答案,以下是我使其正常工作的方式
将以下文件复制到您的〜/.storm目录中:
  • hbase-common-0.98.0.2.1.2.0-402-hadoop2.jar
  • hbase-site.xml
  • storm.yaml: 注意:如果您不将storm.yaml复制到该目录中,则storm jar命令将不会在类路径中使用该目录(请查看storm.py python脚本,以了解自己的逻辑 - 如果有文档记录会很好)

接下来,在拓扑类的主方法中获取HBase配置并进行序列化:

final Configuration hbaseConfig = HBaseConfiguration.create();
final DataOutputBuffer databufHbaseConfig = new DataOutputBuffer();
hbaseConfig.write(databufHbaseConfig);
final byte[] baHbaseConfigSerialized = databufHbaseConfig.getData();

通过构造函数将字节数组传递给您的 Spout 类。Spout 类将此字节数组保存到字段中(不要在构造函数中反序列化。我发现如果 Spout 有一个 Configuration 字段,则在运行拓扑时会出现无法序列化异常)
在 Spout 的 open 方法中,反序列化配置并访问 HBase 表:
Configuration hBaseConfiguration = new Configuration();
ByteArrayInputStream bas = new ByteArrayInputStream(baHbaseConfigSerialized);
hBaseConfiguration.readFields(new DataInputStream(bas));
HTable tbl = new HTable(hBaseConfiguration, HBASE_TABLE_NAME);

Scan scan = new Scan();
scan.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("YOUR_COLUMN"));

scnrTbl = tbl.getScanner(scan);

现在,在您的nextTuple方法中,您可以使用Scanner获取下一行:
Result rsltWaveform = scnrWaveformTbl.next();

从结果中提取所需内容,并将这些值传递到一些可序列化对象中以供处理。

在构造函数中不对字节数组进行反序列化,再加上一个。 - Kali_89
1个回答

2
当您使用“storm jar”命令部署拓扑时,~/.storm文件夹将位于类路径上(请参见this link下的jar命令)。如果您将hbase-site.xml文件(或相关的*-site.xml文件)放置在该文件夹中,则“storm jar”期间的HBaseConfiguration.create()会找到该文件并正确地返回一个org.apache.hadoop.configuration.Configuration。这需要在您的拓扑中存储和序列化,以便在集群中分发该配置。

你是说你会在拓扑类中创建HBaseConfiguration,然后序列化并传递给Bolt(可能是在setBolt(...).addConfiguration方法中传递)? - Steven Magana-Zook

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接