基于Redis的Golang延时任务队列实现
一、延时应用场景
一个完整的任务调度系统,对延时任务的支持必不可少。
延时任务、延迟消息、延迟队列基本语境和实现类似,那么它有哪些适用场景呢?
最常见的如:
用户下单xx分钟内未付款订单自动取消,释放库存;
订单发货后xx天自动确认收货;
订单结束后xx天自动评价;
用户注册后1min内触发xx动作等。
二、延时解决方案
延时作为常见的需求自然有众多解决方案,数据库轮询是最容易想到的一个方案,时间轮,小顶堆,有序链表,延时队列以及各类开源项目也是琳琅满目。了解每种解决方案的原理以及优缺点,可以帮助在生产中做好技术选型。
1.数据库轮询
最简单且容易想到的方案是后台启动定时脚本,定时轮询扫描数据库获取满足条件数据并处理,这种方案实现简单有效。
时间处理精度问题,linux系统crontab最小是1分钟,如果需要更细时间粒度可以通过脚本for{}无限循环轮询数据库,总执行时间为50秒,每次轮询后sleep10秒,类似操作可达成更小时间粒度。
此方案项目初级比较有效,但也有较多弊端:
- 轮询粒度不好把控,轮询间隔时间过长影响精准度,过短又会产生大量不必要的数据库扫描,增加数据库压力;
- 随着数据量增大此方案存在较大性能瓶颈;
- 延时任务过多也会造成定时脚本不易维护。
2.延迟消息队列
2.1RabbitMQ队列
RabbitMQ本身不支持延时消息,但可通过死信队列及死信路由设置间接达成。
TTL(Time to live)分消息TTL和队列TTL,控制消息超时时间,消息在队列中生存时间一旦超过TTL设置时间即成为dead letter(死信),然后通过Dead letter exchange死信路由交换机来重新路由消息。
利用成熟RabbitMQ消息组件,稳定、易扩展、支持分布式,消息支持持久化可靠性好。但消息的延时时间需要保持一致,死信队列还是先进先出,如果先进的队列由于未到执行时间会阻塞所有后入消息,因此一种延时时间需要建一套路由。
除死信队列方案外还有一些RabbitMQ的插件可以实现延时。
2.2 RocketMQ
RocketMQ是支持延时消息的,且足够高效可靠,但延迟消息的时间不是任意时间,而是仅支持18个固定的时间段,这里不再赘述。
3.时间轮算法
时间轮算法是实现延时最常用的算法,这里重点介绍它的实现方案。
3.1实现原理
可以想象一个时钟的表盘,有一个指针绕着转动,每走一个格子称为一个刻度(时间间隔interval),表盘每个格子上挂载待执行任务列表(任务桶buckets),指针转动一圈总长度(bucketSize),这些元素构成一个时间轮。
如果刻度是1s,总长度是60s,那么转一圈就是1分钟,可以实现1分钟内的延时。要实现更长时间跨度,可将总长度设置更大,但这会造成占用内存过大,更多空转浪费资源。有两种优化方案,使用多层时间轮或多级时间轮。
- 多层时间轮就是增加圈数circle,一圈代表60s,那么10圈就是10分钟。
- 多级时间轮可以想象成时钟的时针、分针、秒针,一级到达后执行二级,再到三级,直到满足执行任务。
3.2具体代码
定义时间轮结构如下:
type TimeWheel struct {
ticker *time.Ticker //ticker
interval time.Duration //time duration of moving one slot.
buckets []*list.List //bucket list
bucketSize int //total size of bucket
currentPos int //current position in buckets
callbackFunc func(interface{}) //execute func
stopChannel chan bool //stop the ticker channel
}
定时器触发使用time.Ticker,它是Go自身实现的内置定时器,基于最小堆结构实现。Buckets存放任务列表,使用双向链表container/list结构,注意它非线程安全。
新建一个时间轮实例:
//create timewheel instance
func New(interval time.Duration, bucketSize int, callbackFunc func(interface{})) (*TimeWheel, e
rror) {
if interval <= 0 || bucketSize <= 0 || callbackFunc == nil {
return nil, errors.New("create timewheel instance fail")
}
tw := &TimeWheel{
interval: interval,
buckets: make([]*list.List, bucketSize),
bucketSize: bucketSize,
currentPos: 0,
callbackFunc: callbackFunc,
stopChannel: make(chan bool),
}
//init bucket,every bucket will have a list
for i := 0; i < bucketSize; i++ {
tw.buckets[i] = list.New()
}
return tw, nil
}
定义任务Task结构体,并添加任务。为了构造多层时间轮,给任务添加circle代表该任务在第几圈。pos代表任务在当前表盘上的位置。
//define task
type Task struct {
Id interface{} //task id global uniqueness
Data interface{} //data of task
Delay time.Duration //delay time, 30 means after 30 second
Circle int //task position in timewheel
}
//add task
func (tw *TimeWheel) AddTask(task *Task) {
delaySeconds := int(task.Delay.Seconds())
intervalSeconds := int(tw.interval.Seconds())
circle := int(delaySeconds / intervalSeconds / tw.bucketSize)
pos := int(tw.currentPos+delaySeconds/intervalSeconds) % tw.bucketSize
task.Circle = circle
tw.buckets[pos].PushBack(task)
}
启动时间轮,每经过一刻度(这个刻度可以是1s、5s任意),做一次检查,如果当前格里有任务则取出执行,碰到多圈任务将circle-1。当指针走到末尾代表走完一圈,会重置再从头执行。
//start timewheel
func (tw *TimeWheel) Start() {
//add ticker
tw.ticker = time.NewTicker(tw.interval)
//receive chan
go func() {
for {
select {
case <-tw.ticker.C: //reach a tick
log.Println("1 tick")
tw.tickHandler()
case <-tw.stopChannel: //true
tw.ticker.Stop() //stop the ticker
return
}
}
}()
}
//1 tick handler
func (tw *TimeWheel) tickHandler() {
bucket := tw.buckets[tw.currentPos]
for e := bucket.Front(); e != nil; {
task := e.Value.(*Task) //e.value is a task
if task.Circle > 0 {
task.Circle--
e = e.Next()
continue
}
//do task
go tw.callbackFunc(task.Data)
//remove e
next := e.Next()
bucket.Remove(e)
e = next
}
//finish 1 circle,reset
if tw.currentPos == tw.bucketSize-1 {
log.Println("new circle")
tw.currentPos = 0
} else {
tw.currentPos++
}
}
测试时间轮一圈10s,间隔刻度1s,添加延时12s的延时任务,第13s后执行任务。
func TestTimeWheel(t *testing.T) {
tw, err := New(1*time.Second, 10, func(data interface{}) {
log.Println("do task", data)
})
if err != nil {
t.Error(err)
}
log.Println("start timewheel...")
tw.Start()
task := Task{Id: 1, Data: "test1", Delay: 12 * time.Second}
tw.AddTask(&task)
time.Sleep(20 * time.Second)
}
执行效果:
3.3 更多细节考虑
3.3.1 长时间跨度的解决方案
由于时间跨度越大轮子越大,会占用更多内存,所以可以考虑采用磁盘文件+内存时间轮相结合的方案。内存时间轮只加载1小时的任务,磁盘文件可以时间命名(2020101721代表2020年10月17日21:00-21:59:59所有延时任务),每小时一个文件,一天24个,一般情况不会保存太多文件。
3.3.2 内存时间轮的高可用性
因为采用内存时间轮,如果程序崩溃会导致数据丢失。将时间轮持久化保存成文件存储,到达时间后预加载到内存,程序崩溃、重启后也可以重新加载,文件保存可保障数据不会丢失,当然也可保存在redis或其他持久化存储中。
除内存时间轮外也可以直接使用redis的list结构替代container/list,redis的string结构保存时间轮当前指针。
3.3.3 任务执行方式
3.3.4 分布式集群任务分发
2020101721_0
2020101721_1
2020101721_2
...
2020101721_9
3.4 方案分析
时间轮方案执行效率高,时间精度高,但内存时间轮重启或宕机后需要考虑持久化和消费标记,集群扩展实现也较复杂。
4.排序链表算法
要使用排序链表数据结构,最先想到的就是redis的sorted set结构,这里以redis有序集合为基础来实现延时。
4.1 实现原理
redis有序集合zset结构是一个有序链表,可以通过zadd向链表添加元素,并将其score设置为延时任务执行的时间戳,值设为任务id。然后通过zrange获取链表第一个元素(默认是score最小元素),通过判断score和当前时间大小,决定是否到达执行时间。
4.2 具体代码
按时间轮设计思想定义一个带定时器的结构体:
//define bucket ticker
type BucketTicker struct {
Ticker *time.Ticker
Interval time.Duration
Name string
CallbackFunc func(interface{}) bool
}
//new ticker
func New(interval time.Duration, bucketName string, callbackFunc func(interface{}) bool) (*Buck
etTicker, error) {
if interval <= 0 || callbackFunc == nil {
return nil, errors.New("create bucket ticker instance fail")
}
bucket := &BucketTicker{
Interval: interval,
Name: bucketName,
CallbackFunc: callbackFunc,
}
return bucket, nil
}
定义任务及添加方法,将任务的执行时间(当前时间+延时时间)和任务唯一Id存到zset结构中,将任务主体序列化存到kv结构(string)中。
//define task
type Task struct {
Id string //task id global uniqueness
Data interface{} //data of task
Delay time.Duration //delay time, 30 means after 30 second
Timestamp int
}
//add task
func (bucket *BucketTicker) AddTask(task *Task) error {
//task id and delay time in redis zset
timestamp := time.Now().Add(task.Delay).Unix()
err := redisclient.ZAdd(bucket.Name, int(timestamp), task.Id)
if err != nil {
return err
}
//task body in redis string
data, err := json.Marshal(task)
if err != nil {
return err
}
err = redisclient.Set(task.Id, string(data))
if err != nil {
return err
}
return nil
}
启动定时器,每隔一个刻度,检查是否有满足执行时间的任务。间隔时间越长,可以减少与redis查询频率,但延时任务处理精度会降低。
func (bucket *BucketTicker) Start() {
timer := time.NewTicker(bucket.Interval) //interval
go func() {
for {
select {
case t := <-timer.C:
log.Println("1 tick")
bucket.tickHandler(t, bucket.Name)
}
}
}()
}
//tick handler
func (bucket *BucketTicker) tickHandler(currentTime time.Time, bucketName string) {
for {
task, err := getTask(bucketName)
if err != nil {
log.Println("error happen!", err)
return
}
if task == nil { //no task
return
}
//not arrival execution time
if task.Timestamp > int(currentTime.Unix()) {
return
}
//do task
taskDetail, err := getTaskDetail(task.Id)
if err != nil { //retry
log.Println("error happen!", err)
continue
}
//if callback success, remove finish task
if ok := bucket.CallbackFunc(taskDetail.Data); ok {
err = removeTask(bucketName, task.Id)
if err != nil {
continue
}
} else {
log.Println("error happen!", errors.New("callback error"))
continue //retry
}
return
}
}
getTask(),getTaskDetail()和removeTask()分别执行Redis操作。
//get task from redis zset
func getTask(bucketName string) (*Task, error) {
value, err := redisclient.ZRangeFirst(bucketName) //ZRANGE key 0 0 WITHSCORES
if err != nil {
return nil, err
}
if value == nil {
return nil, nil
}
timestamp := int(value[0].(float64))
taskId := value[1].(string)
task := Task{
Id: taskId,
Timestamp: timestamp,
}
return &task, nil
}
//get task detail by taskId
func getTaskDetail(taskId string) (*Task, error) {
v, err := redisclient.Get(taskId)
if err != nil {
return nil, err
}
if v == "" {
return nil, nil
}
task := Task{}
err = json.Unmarshal([]byte(v), &task)
if err != nil {
return nil, err
}
return &task, nil
}
//remove the task
func removeTask(bucketName string, taskId string) error {
err := redisclient.ZRem(bucketName, taskId)
if err != nil {
return err
}
err = redisclient.Del(taskId)
if err != nil {
return err
}
return nil
}
编写测试用例测试,添加2个延时任务分别是延时5秒和延时8秒。
func TestRedisDelay(t *testing.T) {
delay, err := New(1*time.Second, "test", func(data interface{}) bool {
log.Println("do task ", data)
return true
})
if err != nil {
t.Error(err)
}
log.Println("start ticker...")
delay.Start()
task1 := Task{Id: "1", Data: "task1", Delay: 5 * time.Second}
task2 := Task{Id: "2", Data: "task2", Delay: 8 * time.Second}
delay.AddTask(&task1)
delay.AddTask(&task2)
time.Sleep(10 * time.Second)
}
4.3 分布式集群任务分片
当有更多延时任务时,考虑存储多个bucket,每个bucket有自己的定时器,执行自己的任务列表。当有任务添加时,轮询加入不同bucket中。
4.4 方案分析
由于依赖比较成熟的组件redis,高可用程序挂掉重启后仍可继续处理,集群分片拓展也容易。但由于每次都取出数据比对score,会有频繁Redis IO操作,造成较大的资源浪费。
共有 0 条评论