如何在X小时后高效地调用函数?

13

我知道我可以这样做:

func randomFunc() {
    // do stuff
    go destroyObjectAfterXHours(4, "idofobject")
    // do other stuff
}

func destroyObjectAfterXHours(hours int, id string) {
    time.Sleep(hours * time.Hour)
    destroyObject(id)
}

但是如果我们想象 destroyObjectAfterXHours 在几分钟内被调用了一百万次,这个解决方案将会非常糟糕。

我希望有人能分享一个更有效的解决方案。

我一直在思考一个潜在的解决方案,在那里销毁时间和对象ID被存储在某个地方,然后会有一个函数每隔X分钟遍历该列表,销毁需要销毁的对象,并从存储信息的位置中删除它们的ID和时间信息。这会是一个好的解决方案吗?

我担心这也会是一个糟糕的解决方案,因为您将不得不一直遍历一个包含数百万个项目的列表,然后必须有效地删除其中一些项目等等。


你能再描述一下你的使用情况吗?我问这个问题是因为可能有更优雅的解决方案。例如,MongoDB允许基于时间戳的TTL索引自动删除文档。 - Markus W Mahlberg
4个回答

30

time.AfterFunc 函数就是为这种情况设计的:

func randomFunc() {
    // do stuff
    time.AfterFunc(4*time.Hour, func() { destroyObject("idofobject") })
    // do other stuff
}

time.AfterFunc 非常高效且易于使用。

如文档所述,一旦经过持续时间,该函数会在goroutine中调用。与问题中预先创建的goroutine不同。


据我所理解,这将生成自己的goroutine,这意味着一百万个请求仍将导致创建一百万个goroutine? - fisker
1
是的,当计时器到期时,运行时会创建一个goroutine来运行该函数。就计时器到期而言,运行时实现非常高效(运行时维护了分桶堆的计时器)。 - Charlie Tumahai
1
在计时器到期后,将创建goroutine。在调用destroyObject之前很长时间就会执行do other stuff。如果destroyObject很快并且过期时间分散在一段时间内,您不需要担心成千上万的goroutine。 - Charlie Tumahai
这真的让我大吃一惊...我想不起来曾经见过应该如此复杂的东西变得如此简单...我会尝试测试它...再次感谢 :)! - fisker
1
AfterFunc 是一个阻塞调用吗? - Abhijit Sarkar
显示剩余2条评论

4
所以,我同意你的解决方案#2而不是第一个。
遍历一百万个数字列表比拥有一百万个单独的Go例程要容易得多。
与循环相比,Go例程是昂贵的,并且需要内存和处理时间。例如,一百万个Go例程需要大约4GB的RAM。
另一方面,遍历一个列表需要非常少的空间,并且在O(n)时间内完成。
一个很好的例子就是Go Cache这个精确的函数,它会定期运行Go例程来删除过期的元素。

https://github.com/patrickmn/go-cache/blob/master/cache.go#L931

这是一个更详细的例子,说明了他们如何做到这一点:
type Item struct {
    Object     interface{}
    Expiration int64
}


func (item Item) Expired() bool {
    if item.Expiration == 0 {
        return false
    }
   return time.Now().UnixNano() > item.Expiration
}

func RemoveItem(s []Item, index int) []int {
     return append(s[:index], s[index+1:]...)
}

func deleteExpired(items []Item){ 
    var deletedItems  []int
    for k, v := range items {
        if v.Expired(){
            deletedItems = append(deletedItems, k)
        }
    }
    for key, deletedIndex := range deleteditems{ 
        items = RemoveItem(items, deletedIndex)
    }
}

上述实现使用链表而不是数组肯定会更好,但这是一般思路。

1
顺便说一下,正如你所说,链表可能更好,我认为你应该在这里使用golang内置的双向链表,https://golang.org/pkg/container/list/ - John Balvin Arias

1
这是一个有趣的问题。我想到了一种解决方案,使用堆来维护待销毁物品的队列,并睡眠恰好的时间,直到下一个物品需要被销毁。我认为这样更有效率,但在某些情况下收益可能很小。无论如何,您可以在此处查看代码:
package main
import (
    "container/heap"
    "fmt"
    "time"
)

type Item struct {
    Expiration time.Time
    Object     interface{} // It would make more sence to be *interface{}, but not as convinient
}

//MINIT is the minimal interval for delete to run. In most cases, it is better to be set as 0
const MININT = 1 * time.Second

func deleteExpired(addCh chan Item) (quitCh chan bool) {
    quitCh = make(chan bool)
    go func() {
        h := make(ExpHeap, 0)
        var t *time.Timer

        item := <-addCh
        heap.Push(&h, &item)
        t = time.NewTimer(time.Until(h[0].Expiration))

        for {
            //Check unfinished incoming first
            for incoming := true; incoming; {
                select {
                case item := <-addCh:
                    heap.Push(&h, &item)
                default:
                    incoming = false
                }
            }
            if delta := time.Until(h[0].Expiration); delta >= MININT {
                t.Reset(delta)
            } else {
                t.Reset(MININT)
            }

            select {
            case <-quitCh:
                return
            //New Item incoming, break the timer
            case item := <-addCh:
                heap.Push(&h, &item)
                if item.Expiration.After(h[0].Expiration) {
                    continue
                }
                if delta := time.Until(item.Expiration); delta >= MININT {
                    t.Reset(delta)
                } else {
                    t.Reset(MININT)
                }
            //Wait until next item to be deleted
            case <-t.C:
                for !h[0].Expiration.After(time.Now()) {
                    item := heap.Pop(&h).(*Item)
                    destroy(item.Object)
                }
                if delta := time.Until(h[0].Expiration); delta >= MININT {
                    t.Reset(delta)
                } else {
                    t.Reset(MININT)
                }
            }
        }
    }()
    return quitCh
}

type ExpHeap []*Item

func (h ExpHeap) Len() int {
    return len(h)
}

func (h ExpHeap) Swap(i, j int) {
    h[i], h[j] = h[j], h[i]
}

func (h ExpHeap) Less(i, j int) bool {
    return h[i].Expiration.Before(h[j].Expiration)
}

func (h *ExpHeap) Push(x interface{}) {
    item := x.(*Item)
    *h = append(*h, item)
}

func (h *ExpHeap) Pop() interface{} {
    old, n := *h, len(*h)
    item := old[n-1]
    *h = old[:n-1]
    return item
}

//Auctural destroy code.
func destroy(x interface{}) {
    fmt.Printf("%v @ %v\n", x, time.Now())
}

func main() {
    addCh := make(chan Item)
    quitCh := deleteExpired(addCh)

    for i := 30; i > 0; i-- {
        t := time.Now().Add(time.Duration(i) * time.Second / 2)
        addCh <- Item{t, t}
    }
    time.Sleep(7 * time.Second)
    quitCh <- true
}

游乐场: https://play.golang.org/p/JNV_6VJ_yfK

顺便提一下,有像cron这样的作业管理包,但我不熟悉它们,所以无法对其效率发表评论。

编辑: 我仍然没有足够的声望来评论:( 关于性能:这段代码基本上具有更少的CPU使用量,因为它只在必要时唤醒自己,并且仅遍历需要销毁的项而不是整个列表。根据个人(实际上是ACM经验),大约现代CPU可以在1.2秒左右处理10^9的循环,这意味着在10^6的规模上,遍历整个列表需要超过1毫秒,不包括实际销毁代码和数据复制(平均情况下将花费大量时间超过数千次运行,达到100毫秒左右的规模)。我的代码方法是O(lg N),在10^6的规模上至少快了一千倍(考虑常数)。请再次注意,所有这些计算都是基于经验而不是基准测试的(有,但我无法提供)。

编辑2: 经过再次思考,我认为简单的解决方案可以使用一个简单的优化:
func deleteExpired(items []Item){ 
    tail = len(items)
    for index, v := range items { //better naming
        if v.Expired(){
            tail--
            items[tail],items[index] = v,items[tail]
        }
    }
    deleteditems := items[tail:]
    items:=items[:tail]
}

通过这个改变,它不再低效地复制数据并且不会分配额外的空间。

编辑3: 从这里更改代码。 我测试了afterfunc的内存使用情况。在我的笔记本电脑上,每次调用它使用250字节,而在playground上是69字节(我很好奇原因)。使用我的代码,一个指针+一个time.Time是28字节。在一百万的规模下,差异很小。使用After Func是一个更好的选择。


非常感激,但对我来说看起来有点复杂:p。。我会尝试更仔细地研究它。你认为性能会比cjds答案中的那个好吗? - fisker
非常感谢 <3 如果 Cerise 的 time.AfterFunc 不按照我希望的方式工作,我会尝试集成这段代码 (: - fisker
堆确实是解决问题的好方法,但为什么不使用运行时的计时器堆呢? - Charlie Tumahai
不知怎么的,我神奇地获得了评论的能力:)。time.AfterFunc每次被调用时都会生成一个goroutine,并且只在计时器停止后释放goroutine。所以基本上它与您的原始解决方案相同。性能分析已编辑到答案中。 - leaf bebop
@leafbebop 当调用time.AfterFunc时,不会生成一个goroutine。goroutine是在goFunc中启动的。当时间到期时,将调用函数goFunc - Charlie Tumahai
显示剩余2条评论

1
如果只需一次性操作,可以轻松实现此操作。
// Make the destruction cancelable
cancel := make(chan bool)

go func(t time.Duration, id int){
 expired := time.NewTimer(t).C
 select {
   // destroy the object when the timer is expired
   case <-expired:
     destroyObject(id)
   // or cancel the destruction in case we get a cancel signal
   // before it is destroyed
   case <-cancel:
     fmt.Println("Cancelled destruction of",id)
     return
 }
}(time.Hours * 4, id)

if weather == weather.SUNNY {
  cancel <- true
}

如果您想每4小时执行此操作:
// Same as above, though since id may have already been destroyed
// once, I name the channel different
done := make(chan bool)

go func(t time.Duration,id int){

  // Sends to the channel every t
  tick := time.NewTicker(t).C

  // Wrap, otherwise select will only execute the first tick
  for{
    select {
      // t has passed, so id can be destroyed
      case <-tick:
        destroyObject(id)
      // We are finished destroying stuff
      case <-done:
        fmt.Println("Ok, ok, I quit destroying...")
        return
    }
  }
}()

if weather == weather.RAINY {
  done <- true
}

这背后的想法是为每个可取消的销毁任务运行单个goroutine。比如,您有一个会话并且用户执行了某些操作,因此您希望保持会话活动状态。由于goroutines非常便宜,因此您可以简单地启动另一个goroutine。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接