为了账号安全,请及时绑定邮箱和手机立即绑定

分布式出站 http 速率限制器

分布式出站 http 速率限制器

Go
HUWWW 2023-06-26 17:48:51
我有一个微服务架构应用程序,其中多个服务轮询外部 API。外部 API 的速率限制为每分钟 600 个请求。如何让所有实例一起保持在共享 600 速率限制以下?Google只给我带来了3个解决方案,最有希望的是:myntra/golimit是这三个中最有前途的,但我实际上不知道如何设置它。wallstreetcn/rate似乎仅在达到限制时才拒绝(我的应用程序需要等到它可以发出请求)并且funcEvery中的函数rate.NewLimiter似乎是不同的导入/依赖项,我无法弄清楚它是什么manavo/go-rate-limiter有一个&ldquo;软&rdquo;限制,显然,这可能会让我超过限制。有些端点如果几秒钟内无法访问我并不介意,但其他端点请求应该尽可能有效。目前我有一个业余解决方案。下面的代码允许我设置每分钟的限制,并且它在请求之间休眠以将请求分散到一分钟内。此客户端速率限制是针对每个实例的,因此我必须对 600 个请求除以实例数量进行硬编码。var semaphore = make(chan struct{}, 5)var rate = make(chan struct{}, 10)func init(){    // leaky bucket    go func() {        ticker := time.NewTicker(100 * time.Millisecond)        defer ticker.Stop()        for range ticker.C {            _, ok := <-rate            // if this isn't going to run indefinitely, signal            // this to return by closing the rate channel.            if !ok {                return            }        }}()以及发出 http API 请求的函数内部。rate <- struct{}{}    // check the concurrency semaphore    semaphore <- struct{}{}    defer func() {        <-semaphore}()如何让所有实例一起保持在共享 600 速率限制以下?首选项: - 基于密钥的速率限制计数器,因此可以设置多个计数器。- 在设定的持续时间内分散请求,以便在前 30 秒内而不是在整分钟持续时间内发送 600 个请求。
查看完整描述

2 回答

?
拉风的咖菲猫

TA贡献1995条经验 获得超2个赞

我无法与您找到的库交谈,但漏桶速率限制器非常简单。您需要某种共享事务存储。每个存储桶(或速率限制器)只是一个整数和一个时间值。该整数是特定时间桶中的滴数。每次必须应用速率限制时,减去自上次更新以来泄漏的滴数,然后加一,然后检查滴数是否在桶的容量范围内。

我们正在使用 Redis 来完成这类事情。要在 Redis 中实现此事务性,需要一个脚本(。例如,在 SQL 数据库中,aSELECT FOR UPDATE后跟一条语句可以实现相同的效果。UPDATE这是我们的 Redis 脚本:

-- replicate_commands allows us to use the TIME command. We depend on accurate

-- (and reasonably consistent) timestamps. Multiple clients may have

-- inacceptable clock drift.

redis.replicate_commands()


local rate = tonumber(ARGV[1]) -- how many drops leak away in one second

local cap = tonumber(ARGV[2]) -- how many drops fit in the bucket

local now, _ = unpack(redis.call('TIME'))


-- A bucket is represented by a hash with two keys, n and t. n is the number of

-- drops in the bucket at time t (seconds since epoch).

local xs = redis.call('HMGET', KEYS[1], 'n', 't')

local n = tonumber(xs[1])

local t = tonumber(xs[2])


if type(n) ~= "number" or type(t) ~= "number" then

    -- The bucket doesn't exist yet (n and t are false), or someone messed with

    -- our hash values. Either way, pretend the bucket is empty.

    n, t = 0, now

end


-- remove drops that leaked since t

n = n - (now-t)*rate

if n < 0 then

    n = 0

end


-- add one drop if it fits

if n < cap then

    n = n + 1

else

    n = cap

end


redis.call('HMSET', KEYS[1], 'n', n, 't', now)

redis.call('EXPIRE', KEYS[1], math.floor(n/rate) + 1)


return n

每秒 10 滴的调用示例,容量为 10 滴:


EVALSHA <SHA_IN_HEX> 1 rate-limit:my-bucket 10 10 

该脚本返回桶中的滴数。如果该数字等于容量,您可以短暂休眠并重试,或者完全拒绝该请求,具体取决于您的要求。


请注意,脚本永远不会返回大于容量的值,因此在您的情况下恢复时间不会超过十分之一秒。这可能不是您所需要的,因为您正在尝试匹配第三方速率限制器。也就是说,您可能可以接受桶溢出,从而导致突发请求后恢复时间更长。


查看完整回答
反对 回复 2023-06-26
?
慕森王

TA贡献1777条经验 获得超3个赞

如果你想要一个全局速率限制器,你需要一个地方来维护分布式状态,比如zookeeper。通常,我们不想支付管理费用。或者,您可以设置转发代理,在其中进行速率限制。



查看完整回答
反对 回复 2023-06-26
  • 2 回答
  • 0 关注
  • 105 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信