Token Bucket为令牌桶算法,常被用于流量整形,或对客户端的限速。

假设家里是200M的宽带, 为了保证打游戏的流畅,我要把电视设置为10M。那么实现上可以如下:

 

方案一:

累加电视的所有接收/发出的数据包长度,如果超出10M,则丢弃报文;每秒更新一次累加值为0。

思路非常简单,也非常实用;从长时间统计上看,结果上偏差不大。

但可能限制出来流量图就会如下,不均匀,像抽风似的。如果设置了10条限速规则对10个设备进行限速时,则每秒开始的0.1秒,CPU压力很大,然后0.9秒很闲。

 

这种漏桶似的算法,利用率和平衡性较差。

方案二:

如果改进一下,将统计时间细化0.1秒,将效果会好不少。再进一步,将时间细化到每个报文接收,则效果就达到极致。

这就是Token Bucket做的事情。

 

流程出下:

1、当收到数据报文时,先试图获取token,如果剩余token足够则放行;

2、不够则更新token,等待token足够再继续。token按指定速率添加到bucket中,超过其容量则设置为最大值。

预期效果图如下:

 

示例代码实现如下:假设允许的速率每秒100,模拟包长在50~100随机,统计一下发送时间和平均速率。


 

# -- coding: utf-8 --# Good luck.import timeimport randomclass tb():    def __init__(self, token_per_sec):        self.captoken = token_per_sec        self.curr_token = 0        self.timestamp = time.time()    def refill(self):        if time.time() > self.timestamp:            refill_token = self.captoken*(time.time() - self.timestamp)            self.curr_token = min(self.curr_token + refill_token, self.captoken)            print "\tRefill tokens Current: %d. (Add %d)" %(self.curr_token, refill_token)            self.timestamp = time.time()        else:            print "\tNo refill...."    def consume(self, num):        ret = False        if self.curr_token > 0: # Control precision            self.curr_token -=  num            print "\tGet token %d : remain %d" %(num, self.curr_token)            ret = True        self.refill()        return rettest = tb(100)run_time = time.time()all_len = 0;times = 0while True:    t = 0.1    ts = time.time()    send_len = random.randint(10, 50)    print "Need send %d" %(send_len)    while not test.consume(send_len):        time.sleep(t)    te = time.time()    print "Use Time:%s Send:%d " %(te-ts, send_len)    all_len += send_len    times += 1    print "Avg = %s (times=%d)" %(all_len/(ts - run_time+0.1), times)    time.sleep(t)

运行效果

C:\Python27\python.exe X:/GIT/wxy_code_backup/Flash_tools/tokenbucket.py

Need send 44

No refill....

Refill tokens Current: 9. (Add 9)

Get token 44 : remain -34

Refill tokens Current: -23. (Add 10)

Use Time:0.200999975204 Send:44

Avg = 440.0 (times=1)

Need send 27

Refill tokens Current: -13. (Add 9)

Refill tokens Current: -3. (Add 10)

Refill tokens Current: 6. (Add 10)

Get token 27 : remain -20

Refill tokens Current: -10. (Add 10)

Use Time:0.302000045776 Send:27

Avg = 177.057409665 (times=2)

Need send 33

Refill tokens Current: 0. (Add 10)

Refill tokens Current: 9. (Add 10)

Get token 33 : remain -23

Refill tokens Current: -13. (Add 10)

Use Time:0.200999975204 Send:33

Avg = 129.51431022 (times=3)

Need send 42

Refill tokens Current: -3. (Add 10)

Refill tokens Current: 6. (Add 10)

Get token 42 : remain -35

Refill tokens Current: -25. (Add 10)

Use Time:0.201000213623 Send:42

Avg = 132.126711657 (times=4)

Need send 16

Refill tokens Current: -15. (Add 9)

Refill tokens Current: -5. (Add 9)

Refill tokens Current: 4. (Add 10)

Get token 16 : remain -11

Refill tokens Current: -1. (Add 9)

Use Time:0.300999879837 Send:16

Avg = 115.22048411 (times=5)

......

Need send 21

Refill tokens Current: -4. (Add 10)

Refill tokens Current: 5. (Add 9)

Get token 21 : remain -15

Refill tokens Current: -5. (Add 10)

Use Time:0.200999975204 Send:21

Avg = 100.389763573 (times=138)

Need send 41

Refill tokens Current: 4. (Add 9)

Get token 41 : remain -36

Refill tokens Current: -26. (Add 10)

Use Time:0.101000070572 Send:41

Avg = 100.66413934 (times=139)

Need send 18

Refill tokens Current: -15. (Add 10)

Refill tokens Current: -5. (Add 10)

Refill tokens Current: 4. (Add 10)

Get token 18 : remain -13

Refill tokens Current: -3. (Add 10)

Use Time:0.301999807358 Send:18

Avg = 100.607594694 (times=140)

 

稳定后效果还算可以,从发包点上也相对均匀。当然初始仅为波动而已,可以通过调整优化掉,但从长时间看,效果是一致的。

 

方案三:

方案一、二都缺少缓冲的支持。如果报文来了,但是现在资源不够,不能发送,直接丢弃,则客户端再重发,这个过程太浪费。

因此增加缓冲队列,根据系统的内存,设定队列的大小,对于不能发送的报文,先入队。

如果队列满了呢? 那只能丢弃了,毫无办法。(不考虑共享宽带的情况)。

从Linux收发包角度,每个以太网设备有一个发送队列,如果满了,则设置为状态不可用,则不再收包。道理是一样的。

 

关于更新Token时间的设计,如上例程序,仅为演示,我没有加线程额外处理。

在存在缓冲队列的情况下,则必须增加更新线程,因为存在收发异常的情况。

 

示例如下,不过缓冲不过是增加一些弹性,丢包依然不可避免。结果可预知:由于缓冲的存在,平均流量会在一段时间后,才能趋向限制值。

# -- coding: utf-8 --# Good luck.import timeimport randomimport threadingclass tb():    def __init__(self, token_per_sec):        self.captoken = token_per_sec        self.curr_token = 0        self.timestamp = time.time()        self.queue = []        self.q_max = 5    def refill(self):        if time.time() > self.timestamp:            refill_token = self.captoken*(time.time() - self.timestamp)            self.curr_token = min(self.curr_token + refill_token, self.captoken)            print "\tRefill tokens Current: %d. (Add %d)" %(self.curr_token, refill_token)            self.timestamp = time.time()        else:            print "\tNo refill...."    def consume(self, num):        ret = False        if self.curr_token > 0: # Control precision            self.curr_token -=  num            print "\tGet token %d : remain %d" %(num, self.curr_token)            ret = True        elif len(self.queue) < self.q_max:            # Enter queue            self.queue.append(num)            print "Enqueue Action."            ret = True        else:            # Drop it.            ret = False        self.refill()        return ret    def dequeue(self):        if len(self.queue) and self.curr_token > 0:            tmp = self.queue.pop()            self.curr_token -= tmp            print "Dequeue Action : %d (remain %d)" %(tmp, self.curr_token)def update_bucket(args):    print "Args=%s" %(args.captoken)    while True:        time.sleep(0.1)        args.refill()        args.dequeue()test = tb(100)run_time = time.time()all_len = 0times = 0tx_queue = []# update bucket thread.update_t = threading.Thread(target=update_bucket, args=(test,))update_t.start()while True:    t = 0.2    ts = time.time()    send_len = random.randint(40, 50)    print "Need send %d" %(send_len)    if test.consume(send_len):        te = time.time()        print "Use Time:%s Send:%d " %(te-ts, send_len)        all_len += send_len    else:        print "Drop Action."    times += 1    print "Avg = %s (times=%d)" %(all_len/(ts - run_time+0.1), times)    time.sleep(t)

运行结果:

Args=100

Need send 46

Enqueue Action.

Refill tokens Current: 0. (Add 0)

Use Time:0.0 Send:46

Avg = 455.44587139 (times=1)

Refill tokens Current: 10. (Add 9)

Dequeue Action : 46 (remain -35)

Need send 41

Enqueue Action.

Refill tokens Current: -25. (Add 10)

Use Time:0.0 Send:41

Avg = 289.036568661 (times=2)

Refill tokens Current: -25. (Add 0)

Refill tokens Current: -15. (Add 10)

Need send 47

Enqueue Action.

Refill tokens Current: -5. (Add 9)

Use Time:0.0 Send:47

Avg = 266.932297286 (times=3)

Refill tokens Current: -5. (Add 0)

Refill tokens Current: 4. (Add 10)

Dequeue Action : 47 (remain -42)

 

......

Drop Action.

Avg = 106.217083485 (times=170)

Refill tokens Current: -9. (Add 2)

Refill tokens Current: 0. (Add 10)

Dequeue Action : 42 (remain -41)

Need send 50

Enqueue Action.

Refill tokens Current: -34. (Add 7)

Use Time:0.0 Send:50

Avg = 107.055107617 (times=171)

Refill tokens Current: -31. (Add 2)

 

在有针对多个用户的多个限速规则,必然存在多个队列后,就需要有一定的调度算法,常用的RR、DRR、WRR,后面使用一些示例,展开讨论一下。

 

 

 

 

 

 

©著作权归作者所有:来自51CTO博客作者wangxinyu2011的原创作品,如需转载,请注明出处,否则将追究法律责任

更多相关文章

  1. OSPF学习笔记整理(上)
  2. 【原创】关于交换机端口链路类型Access、trunk、hybrid的理解(下)
  3. 【原创】关于交换机端口链路类型Access、trunk、hybrid的理解(上)
  4. CentOS 7 部署RabbitMQ 服务
  5. Iptables 规则用法小结
  6. Python下操作Memcache/Redis/RabbitMQ说明
  7. Android 网络开发抓包工具Fiddler、tcpdump和Wireshark的使用
  8. RabbitMQ演讲稿(精简版)
  9. Python爬虫面试题分享!这三点很重要

随机推荐

  1. Android系统驱动概述
  2. android常用软件下载资源链接
  3. 如何在Windows上安装Android SDK / ADB
  4. android的属性
  5. android:layout_gravity="bottom"不起作
  6. android 利用ksoap2方式连接webservice
  7. android drawable中的state属性说明
  8. Android消息推送时刻
  9. 14条Android Studio常用的的配置
  10. Android学习笔记__1__Android体系架构