我希望能够从分布式(而非本地)Storm拓扑向HBase编写新条目。已经存在一些GitHub项目,提供HBase Mappers或预制的Storm bolts来将Tuples写入HBase。这些项目为在LocalCluster上执行其示例提供说明。
我遇到的问题是,无论是使用这两个项目还是直接从bolt访问HBase API,都需要将HBase-site.xml文件包含在类路径中。使用直接API方法,并且也许用GitHub的方法,当您执行
如何修改storm bolt的类路径以包含Hbase配置文件?
更新:使用danehammer的答案,以下是我使其正常工作的方式
将以下文件复制到您的〜/.storm目录中:
通过构造函数将字节数组传递给您的 Spout 类。Spout 类将此字节数组保存到字段中(不要在构造函数中反序列化。我发现如果 Spout 有一个 Configuration 字段,则在运行拓扑时会出现无法序列化异常)
在 Spout 的 open 方法中,反序列化配置并访问 HBase 表:
现在,在您的nextTuple方法中,您可以使用Scanner获取下一行:
从结果中提取所需内容,并将这些值传递到一些可序列化对象中以供处理。
我遇到的问题是,无论是使用这两个项目还是直接从bolt访问HBase API,都需要将HBase-site.xml文件包含在类路径中。使用直接API方法,并且也许用GitHub的方法,当您执行
HBaseConfiguration.create();
时,它将尝试从类路径上的一个条目中找到所需的信息。如何修改storm bolt的类路径以包含Hbase配置文件?
更新:使用danehammer的答案,以下是我使其正常工作的方式
将以下文件复制到您的〜/.storm目录中:
- hbase-common-0.98.0.2.1.2.0-402-hadoop2.jar
- hbase-site.xml
- storm.yaml: 注意:如果您不将storm.yaml复制到该目录中,则storm jar命令将不会在类路径中使用该目录(请查看storm.py python脚本,以了解自己的逻辑 - 如果有文档记录会很好)
接下来,在拓扑类的主方法中获取HBase配置并进行序列化:
final Configuration hbaseConfig = HBaseConfiguration.create();
final DataOutputBuffer databufHbaseConfig = new DataOutputBuffer();
hbaseConfig.write(databufHbaseConfig);
final byte[] baHbaseConfigSerialized = databufHbaseConfig.getData();
通过构造函数将字节数组传递给您的 Spout 类。Spout 类将此字节数组保存到字段中(不要在构造函数中反序列化。我发现如果 Spout 有一个 Configuration 字段,则在运行拓扑时会出现无法序列化异常)
在 Spout 的 open 方法中,反序列化配置并访问 HBase 表:
Configuration hBaseConfiguration = new Configuration();
ByteArrayInputStream bas = new ByteArrayInputStream(baHbaseConfigSerialized);
hBaseConfiguration.readFields(new DataInputStream(bas));
HTable tbl = new HTable(hBaseConfiguration, HBASE_TABLE_NAME);
Scan scan = new Scan();
scan.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("YOUR_COLUMN"));
scnrTbl = tbl.getScanner(scan);
现在,在您的nextTuple方法中,您可以使用Scanner获取下一行:
Result rsltWaveform = scnrWaveformTbl.next();
从结果中提取所需内容,并将这些值传递到一些可序列化对象中以供处理。