为了账号安全,请及时绑定邮箱和手机立即绑定

如何同步慢速计算并缓存它?

如何同步慢速计算并缓存它?

Go
小怪兽爱吃肉 2022-10-04 19:56:43
在golang后端,我想为多个客户端提供一个值,让我们称之为分数。分数随时间而变化,计算速度很慢。计算不依赖于先前的结果。当没有客户时,我根本不想计算它。因此,计算应该只在有要求的情况下进行。但还有另一个事实 - 分数不能在5秒内改变。所以我尝试了不同的建议,一切都有其缺点:在客户缺席的情况下进行昂贵的计算:var score interface{}// run in a separate goroutinefunc calculateScorePeriodically() {    for{        select{        case <-time.After(5*time.Second):            score = calculateScoreExpensiveAndSlow()        }    }}func serveScore(w http.ResponseWriter, r* http.Request) {    b, _ := json.Marshal(score)    w.Write(b)}在很长的计算周期内阻止所有客户端(但实际上可能只是向它们提供旧数据)。而且你不能移动到互斥体之外,因为这样多个客户端可以同时进入计算块,并且不会在5秒间隔内进行计算,而是按顺序进行计算:ifvar (    score interface{}    mutex sync.Mutex    updatedAt time.Time)func getCachedScore() float64 {    mutex.Lock()    defer mutex.Unlock()    currentTime := time.Now()    if currentTime.Sub(updatedAt) < 5*time.Second {        return score    }    updatedAt = currentTime    score = calculateScoreExpensiveAndSlow()    return score}func serveScore(w http.ResponseWriter, r* http.Request) {    b, _ := json.Marshal(getCachedScore())    w.Write(b)}如何解决上述两个缺点?PS.我认为这是一个通用问题,也是一种模式 - 它有一个特殊的名字吗?
查看完整描述

2 回答

?
largeQ

TA贡献2039条经验 获得超7个赞

可能有多种解决方案。一个简单的解决方案是使用指定的 goroutine 进行计算,您可以通过在通道上发送值来表示需要重新计算。发送可能是非阻塞的,因此,如果计算正在进行中,则不会发生任何事情。


下面是一个可重用的缓存实现:


type cache struct {

    mu      sync.RWMutex

    value   interface{}

    updated time.Time


    calcCh     chan struct{}

    expiration time.Duration

}


func NewCache(calc func() interface{}, expiration time.Duration) *cache {

    c := &cache{

        value:   calc(),

        updated: time.Now(),

        calcCh:  make(chan struct{}),

    }


    go func() {

        for range c.calcCh {

            v := calc()


            c.mu.Lock()

            c.value, c.updated = v, time.Now()

            c.mu.Unlock()

        }

    }()


    return c

}


func (c *cache) Get() (value interface{}, updated time.Time) {

    c.mu.RLock()

    value, updated = c.value, c.updated

    c.mu.RUnlock()


    if time.Since(updated) > c.expiration {

        // Trigger a new calculation (will happen in another goroutine).

        // Do non-blocking send, if a calculation is in progress,

        // this will have no effect

        select {

        case c.calcCh <- struct{}{}:

        default:

        }

    }


    return

}


func (c *cache) Stop() {

    close(c.calcCh)

}

注意:是停止背景戈鲁廷。呼叫后,不得调用。Cache.Stop()Cache.Stop()Cache.Get()


将其用于您的情况:


func getCachedScore() interface{} {

    // ...

}


var scoreCache = NewCache(getCachedScore, 5*time.Second)


func serveScore(w http.ResponseWriter, r* http.Request) {

    score, _ := scoreCache.Get()

    b, _ := json.Marshal(score)

    w.Write(b)

}


查看完整回答
反对 回复 2022-10-04
?
汪汪一只猫

TA贡献1898条经验 获得超8个赞

这是我实现的,与icza的答案相关,但具有更多功能:


package common


import (

    "context"

    "sync/atomic"

    "time"

)


type (

    CachedUpdater func() interface{}

    ChanStruct    chan struct{}

)


type Cached struct {

    value        atomic.Value  // holds the cached value's interface{}

    updatedAt    atomic.Value  // holds time.Time, time when last update sequence was started at

    updatePeriod time.Duration // controls minimal anount of time between updates

    needUpdate   ChanStruct

}


//cachedUpdater is a user-provided function with long expensive calculation, that gets current state

func MakeCached(ctx context.Context, updatePeriod time.Duration, cachedUpdater CachedUpdater) *Cached {

    v := &Cached{

        updatePeriod: updatePeriod,

        needUpdate:   make(ChanStruct),

    }

    //v.updatedAt.Store(time.Time{}) // "was never updated", but time should never be nil interface

    v.doUpdate(time.Now(), cachedUpdater)

    go v.updaterController(ctx, cachedUpdater)

    return v

}


//client will get cached value immediately, and optionally may trigger an update, if value is outdated

func (v *Cached) Get() interface{} {

    if v.IsExpired(time.Now()) {

        v.RequestUpdate()

    }

    return v.value.Load()

}


//updateController goroutine can be terminated both by cancelling context, provided to maker, or by closing chan

func (v *Cached) Stop() {

    close(v.needUpdate)

}


//returns true if value is outdated and updater function was likely not called yet

func (v *Cached) IsExpired(currentTime time.Time) bool {

    updatedAt := v.updatedAt.Load().(time.Time)

    return currentTime.Sub(updatedAt) > v.updatePeriod

}


//requests updaterController to perform update, using non-blocking send to unbuffered chan. controller can decide not to update in case if it has recently updated value

func (v *Cached) RequestUpdate() bool {

    select {

    case v.needUpdate <- struct{}{}:

        return true

    default:

        return false

    }

}


func (v *Cached) updaterController(ctx context.Context, cachedUpdater CachedUpdater) {

    for {

        select {

        case <-ctx.Done():

            return

        case _, ok := <-v.needUpdate:

            if !ok {

                return

            }

            currentTime := time.Now()

            if !v.IsExpired(currentTime) {

                continue

            }

            v.doUpdate(currentTime, cachedUpdater)

        }

    }

}


func (v *Cached) doUpdate(currentTime time.Time, cachedUpdater CachedUpdater) {

    v.updatedAt.Store(currentTime)

    v.value.Store(cachedUpdater())

}


查看完整回答
反对 回复 2022-10-04
  • 2 回答
  • 0 关注
  • 61 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信