使用mongoid ruby适配器分批查找mongoDB记录

42

如何在使用mongoDB和mongoid适配器的rails 3中批量查找mongo DB?我需要获取特定mongo DB集合中的所有记录并在solr中进行索引(用于搜索数据的初始索引)。

问题是,使用Model.all会将所有记录都读入内存中。当我遍历这些记录并在solr中进行索引时,我的内存被占满,进程停止运行。

我想要实现的是批量查找mongo,以便我可以每次迭代处理1,000条记录,将它们传递给solr进行索引,然后处理下一批1,000条记录。以下是我目前的代码:

Model.all.each do |r|
  Sunspot.index(r)
end

对于一个大约有1.5百万条记录的集合,这将占用8GB以上的内存并杀死进程。 在ActiveRecord中,有一个find_in_batches方法,它允许我将查询分成可管理的批次来控制内存。 但是,我似乎找不到类似于mongoDB/mongoid的东西。

我希望能够做类似以下的事情:

Model.all.in_batches_of(1000) do |batch|
  Sunpot.index(batch)
end

这将通过每次只处理可管理的问题集来缓解我的内存问题和查询困难。然而,在mongoDB中批量查找方面的文档很稀少。我看到很多关于批量插入的文档,但没有关于批量查找的文档。


1
你确定这个程序存在内存问题吗?Mongoid和底层的Mongo驱动程序已经使用游标批量查询,这可以保持内存占用较小。 - Ryan McGeary
1
顺便提一下,你应该将接受的答案更改为@RyanMcGeary的答案 - 这样,您问题的所有未来访问者都将看到正确的答案,没有人会实现已由驱动程序完成的手动优化。 - Aliaksei Kliuchnikau
6个回答

96

在 Mongoid 中,您不需要手动分批查询。

在 Mongoid 中,Model.all 返回一个 Mongoid::Criteria 实例。在对该 Criteria 调用 #each 时,会实例化一个 Mongo 驱动程序游标并用于迭代记录。这个底层的 Mongo 驱动程序游标已经批处理了所有的记录。默认情况下,batch_size 是 100。

要了解更多信息,请阅读来自 Mongoid 作者和维护者的此评论

总之,您只需要这样做:

Model.all.each do |r|
  Sunspot.index(r)
end

дёҚй”ҷпјҢйӮЈе…¶д»–зҡ„Enumerableж–№жі•еғҸmapжҲ–иҖ…collectе‘ўпјҹ - Bogdan Gusiev
1
@Edmund,“Hit”可能不是在这里使用的最好词语,因为它暗示着每次重新运行查询。这是一个数据库游标。将其视为以100个批次流式传输数据。 - Ryan McGeary
1
@RyanMcGeary,你回答中的链接已经失效了。你能否编辑/纠正一下? - p.matsinopoulos
1
@p.matsinopoulos 我花了一些时间才找到相同的评论。已经过去将近5年了,Mongoid已经从GitHub Issues切换到JIRA。我想我找到了适当的评论。 - Ryan McGeary
1
最近版本中,批处理大小通常从100开始,然后增加以减少对数据库的调用次数。这样做的好处是它适用于所有可枚举的方法,因此如果您想要在实际的Ruby批处理中获取记录(例如100个数组),您可以执行以下操作:Model.all.each_slice(100) { |array| ... } - Adrien Rey-Jarthon
显示剩余4条评论

8

如果您正在迭代一个需要进行大量处理的集合(例如为每个项目查询外部API),则光标可能会超时。在这种情况下,您需要执行多个查询以便不要保持光标处于打开状态。

require 'mongoid'

module Mongoid
  class Criteria
    def in_batches_of(count = 100)
      Enumerator.new do |y|
        total = 0

        loop do
          batch = 0

          self.limit(count).skip(total).each do |item|
            total += 1
            batch += 1
            y << item
          end

          break if batch == 0
        end
      end
    end
  end
end

以下是可用于添加批处理功能的助手方法。可以像这样使用:

Post.all.order_by(:id => 1).in_batches_of(7).each_with_index do |post, index|
  # call external slow API
end

一定要在查询中始终使用order_by。否则,分页可能无法按照您所需的方式执行。此外,建议每次批处理100条或更少。正如在被接受的答案中所说,Mongoid每次查询100条记录以确保在处理期间不保留游标。


2
在标准中使用.no_timeout方法可以避免手动重新连接:Post.all.order_by(:id => 1).batch_size(7).no_timeout.each_with_index do ... - rewritten
在某些情况下,即使使用了“no_timeout”,它也不起作用;我不知道限制是什么,但从我观察到的情况来看,如果在集合上迭代大约2-3个小时,它将超时。 - Curious Sam

7

批量发送到 Sunspot 会更快。
以下是我的做法:

records = []
Model.batch_size(1000).no_timeout.only(:your_text_field, :_id).all.each do |r|
  records << r
  if records.size > 1000
    Sunspot.index! records
    records.clear
  end
end
Sunspot.index! records

no_timeout: 防止游标在默认10分钟后断开连接。

only: 仅选择已索引的ID和字段。

batch_size: 每次获取1000个条目而不是100个。


1
在循环结束后记得运行'Sunspot.index! records',否则你将无法索引最后一组小于1000的记录。 - matt walters
正确。我忘记复制这部分了。 - Mic92

2

我不太确定批处理,但你可以这样做

current_page = 0
item_count = Model.count
while item_count > 0
  Model.all.skip(current_page * 1000).limit(1000).each do |item|
    Sunpot.index(item)
  end
  item_count-=1000
  current_page+=1
end

但如果你正在寻找一个完美的长期解决方案,我不建议这样做。让我解释一下我是如何在我的应用程序中处理相同的情况的。与其执行批处理作业,

  • i have created a resque job which updates the solr index

    class SolrUpdator
     @queue = :solr_updator
    
     def self.perform(item_id)
       item = Model.find(item_id)
       #i have used RSolr, u can change the below code to handle sunspot
       solr = RSolr.connect :url => Rails.application.config.solr_path
       js = JSON.parse(item.to_json)
       solr.add js         
     end
    

    end

  • After adding the item, i just put an entry to the resque queue

    Resque.enqueue(SolrUpdator, item.id.to_s)
    
  • Thats all, start the resque and it will take care of everything

Ramesh,你提供的第一个代码块非常适合我的用例。这只是使用脚本文件进行一次加载和数据索引,因此在我特定的情况下使用resque可能过于复杂。但是批处理功能完美地运行! - Dan L
不需要这样做。Mongoid和底层Mongo驱动程序已经使用游标批处理查询了。这可以保持内存占用小。 - Ryan McGeary

-3

以下内容适用于您,只需尝试即可

Model.all.in_groups_of(1000, false) do |r|
  Sunspot.index! r
end

将所有数据库加载到内存中... 呃。这样做的整个目的是能够批量查询文档,如果您有400万个文档,首先将它们加载到单个数组中,然后再加载到另一个组数组中,将会使您的服务器崩溃。 - rewritten
@rewritten请检查上面的解决方案,和我提供的一样,他进行了解释。感谢Ryan McGeary的解释。 - ratnakar
"in_groups_of" 是 Rails 数组方法,使用时应将 "Model.all" 转换为数组,但这并不被推荐。-1 是为了警告人们不要这样做。 - Sebastián Palma

-3
正如@RyanMcGeary所说,您不需要担心批处理查询。然而,逐个索引对象比批量索引要慢得多。
Model.all.to_a.in_groups_of(1000, false) do |records|
  Sunspot.index! records
end

1
Model.all.to_a 会将整个集合加载到内存中。 - Brian Armstrong
没错,请不要这样做:当我们谈论大型数据集时,避免一次性将整个集合转换为数组:使用Model.find_each或分批处理,但永远不要使用Model.all.to_a - Adit Saxena
1
Model.find_each 不是 Mongoid 的方法。你应该使用 Model.all.each。 - Paul McClean

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接