我可能听起来很幼稚,但这是我最近在项目中遇到的问题。需要更好地理解它。
df.persist(StorageLevel.MEMORY_AND_DISK)
每当我们在HBase读取时使用这样的persist - 对于流作业的其他后续批次,相同的数据会再次返回,但是HBase会在每个批次运行时进行更新。
HBase读取代码:
val df = sqlContext.read.options(Map(HBaseTableCatalog.tableCatalog -> schema)).format(dbSetup.dbClass).load().persist(StorageLevel.MEMORY_AND_DISK)
我用
cache()
代替了persist(StorageLevel.MEMORY_AND_DISK)
,从HBase表返回的更新记录与预期一样。我们尝试使用
persist(StorageLevel.MEMORY_AND_DISK)
的原因是为了确保内存存储不会满,并且我们不会在执行特定流程时重新进行所有转换。
Spark版本 - 1.6.3
HBase版本 - 1.1.2.2.6.4.42-1能有人解释一下这个,并帮助我更好地理解吗?
foreach
还是类似的东西?有什么理由坚持使用 1.6.3 吗?我怀疑它是否会得到很多关注(如果有的话)。 - Jacek Laskowskiforeach
或类似的东西。为了提供一个我们如何读取HBase的示例,请参考下面的链接。 [link]https://github.com/hortonworks-spark/shc/blob/master/examples/src/main/scala/org/apache/spark/sql/execution/datasources/hbase/HBaseSource.scala请查看上面链接中的withCatalog方法。那应该会给你一个更好的想法。 - Dasarathy D R