热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

【队列源码研究】消息队列beanstalkd源码详解

顺风车运营研发团队李乐1.消息队列简介计算机软件发展的一个重要目标是降低软件耦合性;网站架构中,系统解耦合的重要手段就是异步,业务之间的消

顺风车运营研发团队 李乐

1.消息队列简介

计算机软件发展的一个重要目标是降低软件耦合性;

网站架构中,系统解耦合的重要手段就是异步,业务之间的消息传递不是同步调用,而是将一个业务操作分为多个阶段,每个阶段之间通过共享数据的方式异步执行;

在分布式系统中,多个服务器集群通过分布式消息队列实现异步;分布式消息队列可以看作是内存队列的分布式部署;

分布式消息队列架构图通常如下所示:

clipboard.png

消息队列是典型的生产者消费者模式,两者不存在直接调用,只要保持数据结构不变,彼此功能实现可以随意改变而不互相影响;异步消息队列还有以下特点:

  • 提高系统可用性:消费者服务器发生故障时,生产者服务器可以继续处理业务请求,系统整体表现无故障;此时数据会在消息队列服务器堆积,待消费者服务器恢复后,可以继续处理消息队列中的数据;
  • 加快网站相应速度:业务处理前端的生产者服务器在处理完业务请求后,将数据写入消息队列,不需要等待消费者服务器处理就可以返回,减少响应延迟;
  • 消除并发访问高峰:用户访问是随机的,存在高峰和低谷;可以使用消息队列将突然增加的访问请求数据放入消息队列中,等待消费者服务器依次处理;

消费者消费消息时,通常有两种模式可以选择:拉模型与推模型。

  • 拉模型是由消息的消费者发起的,主动权把握在消费者手中,它会根据自己的情况对生产者发起调用;
  • 推模式消费者只会被动接受消息,消息队列一旦发现消息进入,就会通知消费者执行对消息的处理;

2.beanstalkd基本知识

2.1beanstalkd简介

beanstalkd是一个轻量级的消息队列;主要有一下特点:

  • 拉模式,消费者需要主动从服务器拉取消息数据;
  • tube:类似于消息主题topic,一个beanstalkd中可以支持多个tube,每个tube都有自己的producer和consumer;多个生产者可以往同一个tube生产job,多个消费者也能监听同一个tube获取job;
  • job:代替了传统的message,与消息最大的区别是,job有多种状态;
  • conn:代表一个客户端链接;
  • 优先级:job可以有0~2^32个优先级,0代表最高优先级,beanstalkd使用堆处理job的优先级排序,因此reserve命令的时间复杂度是O(logN);
  • 延时:生产者发布任务时可以指定延时,到达延迟时间后,job才能被消费者消费;
  • 超时机制:消费者从beanstalkd获取一个job后,必须在预设的 TTR (time-to-run) 时间内处理完任务,并发送 delete / release/ bury 命令改变任务状态;否则 Beanstalkd 会认为消息消费失败,重置job状态,使其可以被其他消费者消费。如果消费者预计在 TTR (time-to-run) 时间内无法完成任务, 也可以发送 touch 命令, 它的作用是让 Beanstalkd 从重新计时TTR;
  • 暂停:pause命令可以暂停当前tube,暂停时期内所有job都不能够被消费者消费;

job有一下几种状态:

  • READY,需要立即处理的任务,当延时 (DELAYED) 任务到期后会自动成为当前任务;
  • DELAYED,延迟执行的任务,;
  • RESERVED,已经被消费者获取, 正在执行的任务,Beanstalkd 负责检查任务是否在 TTR(time-to-run) 内完成;
  • BURIED,保留的任务: 任务不会被执行,也不会消失,除非有人将他修改为其他状态;
  • DELETED,消息被彻底删除。Beanstalkd 不再维持这些消息。

状态之间的转移图如下所示:

clipboard.png

思考:

  • beanstalkd如何维护job的状态?tube有3个集合delay、ready和 buried分别存放对应状态的job,conn的reserved_jobs集合存储状态为reserved的job(消费者获取一个job后,job的状态才会改变为reserved,因此这个集合由conn维护);
  • delay状态的job怎么修改为ready?delay集合是一个按照时间排序的最小堆,beanstalkd不定时循环从堆根节点获取job,校验是否需要改变其状态未ready;
  • 如何实现优先级?只有ready状态的job才能被消费者获取消费,ready集合是一个按照优先级排序的最小堆,根节点始终是优先级最高得job;
  • 拉模式实现?消费者使用reserve命令获取job,beanstalkd检查消费者监听的所有tube,查找到ready的job即返回,否则阻塞消费者知道有ready状态的job产生为止;

2.2beanstalkd命令

beanstalkd支持以下命令:

clipboard.png

3.beanstalkd源码分析

3.1数据结构

3.1.1基础结构体

//堆
struct Heap {int cap; //堆容量int len; //堆元素数目void **data; //元素数组Less less; //元素比较的函数指针Record rec; //函数指针,将元素插入堆时,会调用此函数
};//函数指针定义:
typedef int(*Less)(void*, void*);
typedef void(*Record)(void*, int);//API:元素的插入与删除
void * heapremove(Heap *h, int k);
int heapinsert(Heap *h, void *x)

//集合
struct ms {size_t used, cap, last; //cap为当前集合容量;used集合中元素数目;last上次访问的集合元素的位置void **items; //存储元素的数组ms_event_fn oninsert, onremove; //往集合插入元素,删除元素时调用的函数
};//函数指针定义如下
typedef void(*ms_event_fn)(ms a, void *item, size_t i);//API
void ms_init(ms a, ms_event_fn oninsert, ms_event_fn onremove);//初始化集合
int ms_append(ms a, void *item) //往集合追加元素
int ms_contains(ms a, void *item)//判断集合是否包含元素
void * ms_take(ms a) //获取并删除元素(会从上次访问的位置last开始查找)
int ms_remove(ms a, void *item) //删除元素,从头开始查找
int ms_clear(ms a) //清空集合

3.1.2 管道tube

struct tube {uint refs; //引用计数char name[MAX_TUBE_NAME_LEN]; //名称Heap ready; //存储状态未ready的job,按照优先级排序Heap delay; //存储状态未delayed的job,按照到期时间排序struct ms waiting; //等待当前tube有job产生的消费者集合int64 pause; //执行pause命令后,pause字段记录暂停时间int64 deadline_at; //deadline_at记录暂停到达时间struct job buried; //存储状态为buried的job,是一个链表
};

创建tube的代码如下:

tube make_tube(const char *name)
{tube t;//底层调用malloc分配空间t = new(struct tube);if (!t) return NULL;t->name[MAX_TUBE_NAME_LEN - 1] = '\0';strncpy(t->name, name, MAX_TUBE_NAME_LEN - 1);if (t->name[MAX_TUBE_NAME_LEN - 1] != '\0') twarnx("truncating tube name");//设置ready与delay堆的函数指针t->ready.less = job_pri_less;t->delay.less = job_delay_less;t->ready.rec = job_setheappos;t->delay.rec = job_setheappos;t->buried = (struct job) { };t->buried.prev = t->buried.next = &t->buried;ms_init(&t->waiting, NULL, NULL);return t;
}

两个堆指针函数如下:

//按照优先级比较
int job_pri_less(void *ax, void *bx)
{job a = ax, b = bx;if (a->r.pri r.pri) return 1;if (a->r.pri > b->r.pri) return 0;return a->r.id r.id;
}
//按照过期时间比较
int job_delay_less(void *ax, void *bx)
{job a = ax, b = bx;if (a->r.deadline_at r.deadline_at) return 1;if (a->r.deadline_at > b->r.deadline_at) return 0;return a->r.id r.id;
}
//设置每个job在堆的index
void job_setheappos(void *j, int pos)
{((job)j)->heap_index = pos;
}

3.1.3任务job

注:job创建完成后,先会保存在全局变量all_jobs的hash表中;然后才会插入到tube的各job队列中;

struct job {Jobrec r; // 存储job信息tube tube; //指向其所属tubejob prev, next; //job可以组织为双向链表(buried状态的job就是链表)job ht_next; //所有的job都存储在一个hash表中(拉链法),job的id为hash值;(tube中的job集合存储指针指向各个job)size_t heap_index; /* where is this job in its current heap */…………char body[];//job的数据,柔性数组
};// job的描述信息
struct Jobrec {uint64 id;uint32 pri;int64 delay;int64 ttr;int32 body_size;int64 created_at; //创建时间int64 deadline_at; //延迟job的过期时间//统计计数uint32 reserve_ct;uint32 timeout_ct;uint32 release_ct;uint32 bury_ct;uint32 kick_ct;byte state;//当前状态
};

3.14套接字socket

struct Socket {int fd;Handle f; //socket发生事件时的处理函数void *x; //服务器监听的socket指向server结构体;客户端对应的socket指向conn结构体int added; //往epoll注册事件时,计算操作类型
};

3.15服务器server

struct Server {char *port;char *addr;Socket sock; //监听的socketHeap conns; //存储即将有事件发生的客户端;按照事件发生的时间排序的最小堆;//例如:当客户端获取job后,若唱过TTR时间没处理完,job会状态应重置为ready状态;//当客户端调用reserve获取job但当前tube没有ready状态的job时,客户端会被阻塞timeout时间;
};

3.1.6客户端链接conn

struct Conn {Server *srv; //执行服务器Socket sock; //客户端socketchar state; //客户端状态:等待接收命令,等待接收数据,等待回复命令,等待返回job,关闭,获取job阻塞中char type; //客户端类型:生产者,消费者,获取job阻塞中Conn *next;tube use; //当前使用的tube;put命令发布的job会插入到当前tube中int64 tickat; //客户端处理job的TTR到期时间;或者客户端阻塞的到期时间;用于在server的conns堆比较int tickpos; // 在srv->conns堆里的位置job soonest_job; //所有reserve任务里到期时间最近的jobint rw; //当前关心的事件: 'r', 'w', or 'h'(读、写、关闭连接)int pending_timeout; //客户端获取job而阻塞的到期时间char halfclosed; //表示客户端断开连接char cmd[LINE_BUF_SIZE]; // 输入缓冲区int cmd_len;int cmd_read;char *reply; //输出缓冲区int reply_len;int reply_sent;char reply_buf[LINE_BUF_SIZE];//put命令发布job时,从客户端读入的jobint in_job_read;job in_job;//待返回给客户端的jobjob out_job;int out_job_sent;//当前客户端监听的所有tube集合struct ms watch;//当前客户端的所有reserved状态的jobstruct job reserved_jobs;
};

3.2 服务器启动过程

3.2.1 epoll简介

epoll结构体:

typedef union epoll_data {void *ptr;int fd;__uint32_t u32;__uint64_t u64;
} epoll_data_t;//保存触发事件的某个fd相关的数据struct epoll_event {__uint32_t events; /* epoll event */epoll_data_t data; /* User data variable */
};
//其中events表示感兴趣的事件和被触发的事件,可能的取值为:
//EPOLLIN:表示对应的文件描述符可以读;
//EPOLLOUT:表示对应的文件描述符可以写;
//EPOLLPRI:表示对应的文件描述符有紧急的数可读;
//EPOLLERR:表示对应的文件描述符发生错误;
//EPOLLHUP:表示对应的文件描述符被挂断;

epoll API定义如下:

int epoll_create(int size) //生成一个epoll专用的文件描述符,其中的参数是指定生成描述符的最大范围;int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event) //用于控制某个文件描述符上的事件,可以注册事件,修改事件,删除事件int epoll_wait(int epfd,struct epoll_event * events,int maxevents,int timeout) //轮询I/O事件的发生;

3.2.2 beanstalkd使用epoll

//创建epoll:
epfd = epoll_create(1);//注册事件
int sockwant(Socket *s, int rw)
{…………ev.events |= EPOLLRDHUP | EPOLLPRI;ev.data.ptr = s; //注意:传入的是sokcet指针;(socket的x字段会指向server或者conn结构体,当socket对应的fd发生事件时,可以得到server或conn对象)return epoll_ctl(epfd, op, s->fd, &ev);
}//等待事件发生
//返回h r w 事件类型
int socknext(Socket **s, int64 timeout)
{…………r = epoll_wait(epfd, &ev, 1, (int)(timeout/1000000));if (r == -1 && errno != EINTR) {twarn("epoll_wait");exit(1);}if (r > 0) {*s = ev.data.ptr; //返回发生事件的socketif (ev.events & (EPOLLHUP|EPOLLRDHUP)) {return 'h';} else if (ev.events & EPOLLIN) {return 'r';} else if (ev.events & EPOLLOUT) {return 'w';}}return 0;
}

3.2.3服务器启动

int main(int argc, char **argv)
{optparse(&srv, argv+1);//解析输入参数r = make_server_socket(srv.addr, srv.port); //创建socketprot_init(); //初始化全局tubes集合,创建名称为default的默认tubesrvserve(&srv);//启动服务器return 0;
}struct ms tubes;//全局变量void prot_init()
{//初始化tube集合ms_init(&tubes, NULL, NULL);//创建默认tube;tube_find_or_make方法会先从tubes集合查找指定名称为tube,查找到直接返回;否则创建新的tubeTUBE_ASSIGN(default_tube, tube_find_or_make("default"));
}void srvserve(Server *s)
{//s->sock为server监听的socket;设置其处理函数为srvaccept;s->sock.x = s;s->sock.f = (Handle)srvaccept;s->conns.less = (Less)connless; //设置s->conns堆的函数指针s->conns.rec = (Record)connrec;r = listen(s->sock.fd, 1024); //监听r = sockwant(&s->sock, 'r'); //注册到epoll//开启循环for (;;) {//服务器有一些事件需要在特定时间执行,获得最早待执行事件的时间间隔,作为epoll_wait的等待时间;后面详细分析函数内部period = prottick(s);int rw = socknext(&sock, period); //epoll waitif (rw) {sock->f(sock->x, rw); //调用socket的处理函数}}
}
//至此,服务器启动完毕,等待客户端链接

conns堆分析:

上面说过,conns存储即将有事件发生的客户端;按照事件发生的时间排序的最小堆;
例如:当客户端获取job后,若唱过TTR时间没处理完,job会状态应重置为ready状态;
当客户端调用reserve获取job但当前tube没有ready状态的job时,客户端会被阻塞timeout时间;

//堆节点比较的函数指针:
int connless(Conn *a, Conn *b)
{return a->tickat tickat;
}//将客户端对象插入conns堆时,tickpos记录其插入的index(避免客户端重复插入;插入之前发现其tickpos>-1则先删除再插入)
void connrec(Conn *c, int i)
{c->tickpos = i;
}

处理客户端链接请求:

void srvaccept(Server *s, int ev)
{h_accept(s->sock.fd, ev, s);
}void h_accept(const int fd, const short which, Server *s)
{cfd = accept(fd, (struct sockaddr *)&addr, &addrlen);flags = fcntl(cfd, F_GETFL, 0); //获得fd标识r = fcntl(cfd, F_SETFL, flags | O_NONBLOCK); //设置fd非阻塞,使用epoll必须设置非阻塞,负责epoll无法同时监听多个fd//创建conn对象;默认监听default_tube(c->watch存储所有监听的tube);默认使用default_tube(c->use)//注意:初始化conn对象时,客户端状态为STATE_WANTCOMMAND,即等待接收客户端命令;c = make_conn(cfd, STATE_WANTCOMMAND, default_tube, default_tube);c->srv = s;c->sock.x = c;c->sock.f = (Handle)prothandle; //设置客户端处理函数c->sock.fd = cfd;r = sockwant(&c->sock, 'r'); //epoll注册,监听可读事件
}

当客户端socket可读或可写时,会执行prothandle函数:

static void prothandle(Conn *c, int ev)
{h_conn(c->sock.fd, ev, c);
}static void h_conn(const int fd, const short which, Conn *c)
{//客户端断开链接,标记if (which == 'h') {c->halfclosed = 1;}//客户端数据交互(根据客户端状态不同执行不同的读写操作)conn_data(c);//解析完命令时,执行命令while (cmd_data_ready(c) && (c->cmd_len = cmd_len(c))) do_cmd(c);}

3.3 服务器与客户端的数据交互

beanstalkd将客户端conn分为以下几种状态:等待接受命令,等待接收数据,等待回复数据,等待返回job等;

#define STATE_WANTCOMMAND 0
#define STATE_WANTDATA 1
#define STATE_SENDJOB 2
#define STATE_SENDWORD 3
#define STATE_WAIT 4
#define STATE_BITBUCKET 5
#define STATE_CLOSE 6

当客户端fd可读或者可写时,服务器根据当前客户端的状态执行不同的操作:

注意:TCP是基于流的,因此存在半包、粘包问题;即,服务器一次read的命令请求数据可能不完整,或者一次read多个命令请求的数据;

//有些状态操作已省略
static void conn_data(Conn *c)
{switch (c->state) {case STATE_WANTCOMMAND:r = read(c->sock.fd, c->cmd + c->cmd_read, LINE_BUF_SIZE - c->cmd_read); //读取命令到输入缓冲区cmdc->cmd_read += r;c->cmd_len = cmd_len(c); //定位\r\n,并返回命令请求开始位置到\r\n长度;如果没有\r\b说明命令请求不完全,返回0if (c->cmd_len) return do_cmd(c); //如果读取完整的命令,则处理;否则意味着命令不完全,需要下次继续接收break;case STATE_WANTDATA: //只有当使用put命令发布任务时,才会携带数据;客户端状态才会成为STATE_WANTDATA;//而读取命令行时,已经携带了任务的必要参数,那时已经创建了任务,并存储在c->in_job字段j = c->in_job;r = read(c->sock.fd, j->body + c->in_job_read, j->r.body_size -c->in_job_read); //读取任务数据c->in_job_read += r; //记录任务读取了多少数据maybe_enqueue_incoming_job(c); //函数会判断任务数据是否已经读取完全,完全则将任务写入tube的ready或delay队列;后面会将break;case STATE_SENDWORD: //回复客户端命令请求r= write(c->sock.fd, c->reply + c->reply_sent, c->reply_len - c->reply_sent);c->reply_sent += r; //已经发送的字节数if (c->reply_sent == c->reply_len) return reset_conn(c); //如果返回数据已经发完,则重置客户端rw,关心可读事件;否则继续待发送数据状态break;case STATE_SENDJOB: //待发送jobj = c->out_job;//返回数据与jobiov[0].iov_base = (void *)(c->reply + c->reply_sent);iov[0].iov_len = c->reply_len - c->reply_sent; /* maybe 0 */iov[1].iov_base = j->body + c->out_job_sent;iov[1].iov_len = j->r.body_size - c->out_job_sent;r = writev(c->sock.fd, iov, 2);c->reply_sent += r;if (c->reply_sent >= c->reply_len) {c->out_job_sent += c->reply_sent - c->reply_len;c->reply_sent = c->reply_len;}if (c->out_job_sent == j->r.body_size) { //如果job的数据已经发完,则重置客户端rw,关心可读事件;否则继续待发送jobreturn reset_conn(c);}break;}
}

3.4 命令的处理过程

3.4.1查找命令

//命令执行的入口函数
static void do_cmd(Conn *c)
{dispatch_cmd(c); //分发并执行命令fill_extra_data(c); //put命令时,不仅需要执行命令,还需要接续job数据
}static void
dispatch_cmd(Conn *c)
{//查找命令类型type = which_cmd(c);//switch处理各个命令switch (type) {…………}
}

beanstalkd有以下命令定义:

//命令字符串
#define CMD_PUT "put "
#define CMD_PEEK_READY "peek-ready"
#define CMD_RESERVE "reserve"
#define CMD_RELEASE "release "
…………//命令编码类型:
#define OP_UNKNOWN 0
#define OP_PUT 1
#define OP_PEEKJOB 2
#define OP_RESERVE 3
#define OP_DELETE 4
#define OP_RELEASE 5
…………

查找命令其实就是字符串比较:

static int which_cmd(Conn *c)
{//宏定义;比较输入缓冲区命令字符串与命令表中字符串比较,返回命令类型#define TEST_CMD(s,c,o) if (strncmp((s), (c), CONSTSTRLEN(c)) == 0) return (o);//宏替换后就是一系列if语句TEST_CMD(c->cmd, CMD_PUT, OP_PUT);TEST_CMD(c->cmd, CMD_PEEKJOB, OP_PEEKJOB);…………
}

3.4.2命令1——发布任务

case OP_PUT:r = read_pri(&pri, c->cmd + 4, &delay_buf); //解析优先级prir = read_delay(&delay, delay_buf, &ttr_buf); //解析delayr = read_ttr(&ttr, ttr_buf, &size_buf); //解析ttrbody_size = strtoul(size_buf, &end_buf, 10); //解析job字节数op_ct[type]++; //统计if (body_size > job_data_size_limit) { //job长度超过限制;返回return skip(c, body_size + 2, MSG_JOB_TOO_BIG);}//put,说明是生产者,设置conn类型为生产者connsetproducer(c);//初始化job结构体,存储在hash表all_jobs中c->in_job = make_job(pri, delay, ttr, body_size + 2, c->use);//解析客户端发来的任务数据,存储在c->in_job的body数据字段fill_extra_data(c);//校验job数据是否读取完毕,完了则入tube的队列maybe_enqueue_incoming_job(c);

任务入队列:

static void maybe_enqueue_incoming_job(Conn *c)
{job j = c->in_job;//任务数据已经读取完毕,入队列(ready或者delay队列)if (c->in_job_read == j->r.body_size) return enqueue_incoming_job(c);//任务数据没有读取完毕,则设置客户端conn状态未等待接收数据STATE_WANTDATAc->state = STATE_WANTDATA;
}static void enqueue_incoming_job(Conn *c)
{int r;job j = c->in_job;c->in_job = NULL; /* the connection no longer owns this job */c->in_job_read = 0;//入队列r = enqueue_job(c->srv, j, j->r.delay, 1);//返回数据;并设置conn状态为STATE_SENDWORDreply_line(c, STATE_SENDWORD, MSG_BURIED_FMT, j->r.id);
}static int enqueue_job(Server *s, job j, int64 delay, char update_store)
{int r;j->reserver = NULL;if (delay) { //入delay队列,设置任务的deadline_atj->r.deadline_at = nanoseconds() + delay;r = heapinsert(&j->tube->delay, j);j->r.state = Delayed;} else { //入ready队列r = heapinsert(&j->tube->ready, j);if (!r) return 0;j->r.state = Ready; }//检查有没有消费者正在阻塞等待此tube产生job,若有需要返回job;process_queue();return 1;
}

返回命令回复给客户端:

//reply_line函数组装命令回复数据,调用reply函数;只是将数据写入到输出缓冲区,并修改了客户端状态为STATE_SENDWORD,实际发送数据在3.3节已经说过;
static void reply(Conn *c, char *line, int len, int state)
{if (!c) return;connwant(c, 'w');//修改关心的事件为可写事件c->next = dirty; //放入dirty链表dirty = c;c->reply = line; //输出数据缓冲区c->reply_len = len;c->reply_sent = 0;c->state = state; //设置conn状态
}

connwant函数实现如下:

void connwant(Conn *c, int rw)
{c->rw = rw; //c->rw记录当前客户端关心的socket事件connsched(c);
}void connsched(Conn *c)
{if (c->tickpos > -1) { //c->tickpos记录当前客户端在srv->conns堆的索引;(思考:tickpos在什么时候赋值的?heap的函数指针rec)heapremove(&c->srv->conns, c->tickpos);}c->tickat = conntickat(c); //计算当前客户端待发生的某个事件的时间if (c->tickat) {heapinsert(&c->srv->conns, c); //插入srv->conns堆}
}

问题1:connwant只是修改了conn的rw字段为‘w’,表示关心客户端的读时间,什么时候调用epoll注册呢?dirty链表又是做什么的呢?

beanstalkd有个函数update_conns负责更新客户端socket的事件到epoll;其在每次循环开始,执行epoll_wait之前都会执行;

static void update_conns()
{int r;Conn *c;while (dirty) { //遍历dirty链表,更新每一个conn关心的socket事件c = dirty;dirty = dirty->next;c->next = NULL;r = sockwant(&c->sock, c->rw);if (r == -1) {twarn("sockwant");connclose(c);}}
}

问题2:srv->conns存储的客户端都是在某个时间点有事件待处理的,客户端都有哪些事件需要处理呢?

  • 1)消费者获取job后,job的状态改为reserved,当TTR时间过后,如果客户端还没有处理完这个job,服务器会将这个job的状态重置为ready,以便让其他消费者可以消费;
  • 2)消费者调用reserve获取job时,假如其监听的tube没有ready状态的job,那么客户端将会被阻塞,直到有job产生,或者阻塞超时;

//计算当前客户端待处理事件的deadline
static int64 conntickat(Conn *c)
{//客户端正在阻塞if (conn_waiting(c)) {margin = SAFETY_MARGIN;}//如果客户端有reserved状态的任务,则获取到期时间最近的;(当客户端处于阻塞状态时,应该提前SAFETY_MARGIN时间处理此事件)//connsoonestjob:获取到期时间最近的reserved jobif (has_reserved_job(c)) {t = connsoonestjob(c)->r.deadline_at - nanoseconds() - margin;should_timeout = 1;}//客户端阻塞超时时间if (c->pending_timeout >= 0) {t = min(t, ((int64)c->pending_timeout) * 1000000000);should_timeout = 1;}//返回时间发生的时间;后续会将此客户端插入srv->conns堆,且是按照此时间排序的;if (should_timeout) {return nanoseconds() + t;}return 0;
}

问题3:当生产者新发布一个job到某个tube时,此时可能有其他消费者监听此tube,且阻塞等待job的产生,此时就需要将此job返回给消费者;处理函数为process_queue

static void process_queue()
{int64 now = nanoseconds();while ((j = next_eligible_job(now))) { //遍历所有tube,当tube有客户端等待,且有ready状态的job时,返回jobheapremove(&j->tube->ready, j->heap_index);//ms_take:将客户端从此job所属tube的waiting集合中删除;并返回客户端conn//remove_waiting_conn:从当前客户端conn监听的所有tube的waiting队列中移除自己//reserve_job:返回此job给客户端reserve_job(remove_waiting_conn(ms_take(&j->tube->waiting)), j);}
}static job next_eligible_job(int64 now)
{tube t;size_t i;job j = NULL, candidate;//循环所有tubefor (i = 0; i pause) { //假如tube正在暂停,且超时时间未到,则跳过if (t->deadline_at > now) continue;t->pause = 0;}if (t->waiting.used && t->ready.len) { //tube的waiting集合有元素说明有客户端正在阻塞等待此tube产生任务;有ready状态的任务candidate = t->ready.data[0]; //从tubes里获取满足条件的优先级最高得job返回if (!j || job_pri_less(candidate, j)) {j = candidate;}}}return j;
}Conn * remove_waiting_conn(Conn *c)
{tube t;size_t i;if (!conn_waiting(c)) return NULL;c->type &= ~CONN_TYPE_WAITING; //去除CONN_TYPE_WAITING标志global_stat.waiting_ct--;for (i = 0; i watch.used; i++) { //遍历客户端监听的所有tube,挨个从tube的waiting队列中删除自己t = c->watch.items[i];t->stat.waiting_ct--;ms_remove(&t->waiting, c);}return c;
}static void reserve_job(Conn *c, job j)
{j->r.deadline_at = nanoseconds() + j->r.ttr; //job的实效时间j->r.state = Reserved; //状态改为Reservedjob_insert(&c->reserved_jobs, j); //插入客户端的reserved_jobs链表j->reserver = c; //记录job当前消费者if (c->soonest_job && j->r.deadline_at soonest_job->r.deadline_at) { //soonest_job记录最近要到期的Reserved状态的job,更新;c->soonest_job = j;}return reply_job(c, j, MSG_RESERVED); //返回job
}

3.4.3 命令2——获取任务reserve

case OP_RESERVE_TIMEOUT:timeout = strtol(c->cmd + CMD_RESERVE_TIMEOUT_LEN, &end_buf, 10); //reserve可以设置阻塞超时时间,解析case OP_RESERVE:op_ct[type]++;connsetworker(c); //设置客户端类型为消费者CONN_TYPE_WORKER//当客户端有多个任务正在处理,处于reserved状态,且超时时间即将到达时;如果此时客户端监听的所有tube都没有ready状态的任务,则直接返回MSG_DEADLINE_SOON给客户端if (conndeadlinesoon(c) && !conn_ready(c)) {return reply_msg(c, MSG_DEADLINE_SOON);}//设置当前客户端正在等待jobwait_for_job(c, timeout);//同3.4.2节process_queue();

上面说过,当客户端有多个任务正在处理,处于reserved状态,且超时时间即将到达时;

如果此时客户端监听的所有tube都没有ready状态的任务,则直接返回MSG_DEADLINE_SOON给客户端;

否则会导致客户端的阻塞,导致这些reserved的任务超时;

static void wait_for_job(Conn *c, int timeout)
{c->state = STATE_WAIT; //设置客户端状态为STATE_WAITenqueue_waiting_conn(c); //将客户端添加到其监听的所有tube的waiting队列中//设置客户端的超时时间c->pending_timeout = timeout;//修改关心的事件为可读事件connwant(c, 'h');c->next = dirty; //将当前客户端添加到dirty链表中dirty = c;
}static void enqueue_waiting_conn(Conn *c)
{tube t;size_t i;global_stat.waiting_ct++;c->type |= CONN_TYPE_WAITING;for (i = 0; i watch.used; i++) { //c->watch为客户端监听的所有tubet = c->watch.items[i];t->stat.waiting_ct++;ms_append(&t->waiting, c); //t->waiting为等待当前tube有任务产生的所有客户端}
}

3.4.4 循环之始epoll_wait之前

在执行epoll_wait之前,需要计算超时时间;不能被epoll_wait一直阻塞;服务器还有很多事情待处理;

  • 1)将状态未delay的且已经到期的job移到ready队列;
  • 2)tube暂停时间到达,如果tube存在消费者阻塞等待获取job,需要返回job给客户端;
  • 3)消费者消费的状态为reserved的job可能即将超时到期;
  • 4)客户端阻塞等待job的超时时间可能即将达到;

服务器需要及时处理这些所有事情,因此epoll_wait等待时间不能过长;

int64 prottick(Server *s)
{int64 period &#61; 0x34630B8A000LL; //默认epoll_wait等待时间now &#61; nanoseconds();while ((j &#61; delay_q_peek())) { //遍历所有tube的delay队列中过期时间已经到达或者即将的job&#xff08;即将到达时间最小&#xff09;d &#61; j->r.deadline_at - now;if (d > 0) {period &#61; min(period, d); //即将到达&#xff0c;更新periodbreak;}j &#61; delay_q_take();r &#61; enqueue_job(s, j, 0, 0); //job入队到ready队列if (r <1) bury_job(s, j, 0); /* out of memory, so bury it */}for (i &#61; 0; i deadline_at - now;if (t->pause && d <&#61; 0) { //tube暂停期限达到&#xff0c;process_queue同3.4.2节t->pause &#61; 0;process_queue();}else if (d > 0) {period &#61; min(period, d); //tube暂停即将到期&#xff0c;更新period}}while (s->conns.len) {Conn *c &#61; s->conns.data[0]; //循环获取conn待执行事件发生时间最早的d &#61; c->tickat - now;if (d > 0) { //发生事件未到&#xff0c;更新period&#xff0c;结束循环period &#61; min(period, d);break;}heapremove(&s->conns, 0); //否则&#xff0c;移除conn&#xff0c;处理客户端事件conn_timeout(c);}update_conns(); //更新客户端关心的socke事件&#xff0c;其实就是遍历dirty链表return period;
}static job delay_q_peek()
{int i;tube t;job j &#61; NULL, nj;for (i &#61; 0; i delay.len &#61;&#61; 0) {continue;}nj &#61; t->delay.data[0];if (!j || nj->r.deadline_at r.deadline_at) j &#61; nj;}return j;
}static void conn_timeout(Conn *c)
{int r, should_timeout &#61; 0;job j;//客户端正在被阻塞时&#xff0c;如果有reserved状态的job即将到期&#xff0c;则需要解除客户端阻塞//conndeadlinesoon&#xff1a;查询到期时间最小的reserved job&#xff0c;校验其是否即将到期&#xff08;1秒内到期&#xff09;if (conn_waiting(c) && conndeadlinesoon(c)) should_timeout &#61; 1;//connsoonestjob获取到期时间最近的reserved jobwhile ((j &#61; connsoonestjob(c))) {if (j->r.deadline_at >&#61; nanoseconds()) break;timeout_ct&#43;&#43;; //已经超时j->r.timeout_ct&#43;&#43;;r &#61; enqueue_job(c->srv, remove_this_reserved_job(c, j), 0, 0); //从客户端的reserved_jobs链表移除job&#xff0c;重新入到tube的相应job队列if (r <1) bury_job(c->srv, j, 0); /* out of memory, so bury it */connsched(c); //重新计算conn待处理事件的时间&#xff0c;入srv->conns堆}if (should_timeout) {return reply_msg(remove_waiting_conn(c), MSG_DEADLINE_SOON); //reserved即将到期&#xff0c;解除阻塞&#xff0c;返回MSG_DEADLINE_SOON消息} else if (conn_waiting(c) && c->pending_timeout >&#61; 0) { //客户端阻塞超时&#xff0c;解除阻塞c->pending_timeout &#61; -1;return reply_msg(remove_waiting_conn(c), MSG_TIMED_OUT);}
}

4、总结

本文主要介绍beanstalkd基本设计思路&#xff1b;从源码层次分析主要数据结构&#xff0c;服务器初始化过程&#xff0c;简要介绍了put和reserve两个命令执行过程&#xff1b;

beanstalkd其他的命令就不再介绍了&#xff0c;基本类似&#xff0c;感兴趣的可以自己研究。



推荐阅读
  • http:my.oschina.netleejun2005blog136820刚看到群里又有同学在说HTTP协议下的Get请求参数长度是有大小限制的,最大不能超过XX ... [详细]
  • 本文介绍了解决Netty拆包粘包问题的一种方法——使用特殊结束符。在通讯过程中,客户端和服务器协商定义一个特殊的分隔符号,只要没有发送分隔符号,就代表一条数据没有结束。文章还提供了服务端的示例代码。 ... [详细]
  • 计算机存储系统的层次结构及其优势
    本文介绍了计算机存储系统的层次结构,包括高速缓存、主存储器和辅助存储器三个层次。通过分层存储数据可以提高程序的执行效率。计算机存储系统的层次结构将各种不同存储容量、存取速度和价格的存储器有机组合成整体,形成可寻址存储空间比主存储器空间大得多的存储整体。由于辅助存储器容量大、价格低,使得整体存储系统的平均价格降低。同时,高速缓存的存取速度可以和CPU的工作速度相匹配,进一步提高程序执行效率。 ... [详细]
  • 自动轮播,反转播放的ViewPagerAdapter的使用方法和效果展示
    本文介绍了如何使用自动轮播、反转播放的ViewPagerAdapter,并展示了其效果。该ViewPagerAdapter支持无限循环、触摸暂停、切换缩放等功能。同时提供了使用GIF.gif的示例和github地址。通过LoopFragmentPagerAdapter类的getActualCount、getActualItem和getActualPagerTitle方法可以实现自定义的循环效果和标题展示。 ... [详细]
  • [大整数乘法] java代码实现
    本文介绍了使用java代码实现大整数乘法的过程,同时也涉及到大整数加法和大整数减法的计算方法。通过分治算法来提高计算效率,并对算法的时间复杂度进行了研究。详细代码实现请参考文章链接。 ... [详细]
  • Java中包装类的设计原因以及操作方法
    本文主要介绍了Java中设计包装类的原因以及操作方法。在Java中,除了对象类型,还有八大基本类型,为了将基本类型转换成对象,Java引入了包装类。文章通过介绍包装类的定义和实现,解答了为什么需要包装类的问题,并提供了简单易用的操作方法。通过本文的学习,读者可以更好地理解和应用Java中的包装类。 ... [详细]
  • Whatsthedifferencebetweento_aandto_ary?to_a和to_ary有什么区别? ... [详细]
  • 本文讨论了在VMWARE5.1的虚拟服务器Windows Server 2008R2上安装oracle 10g客户端时出现的问题,并提供了解决方法。错误日志显示了异常访问违例,通过分析日志中的问题帧,找到了解决问题的线索。文章详细介绍了解决方法,帮助读者顺利安装oracle 10g客户端。 ... [详细]
  • ejava,刘聪dejava
    本文目录一览:1、什么是Java?2、java ... [详细]
  • 流数据流和IO流的使用及应用
    本文介绍了流数据流和IO流的基本概念和用法,包括输入流、输出流、字节流、字符流、缓冲区等。同时还介绍了异常处理和常用的流类,如FileReader、FileWriter、FileInputStream、FileOutputStream、OutputStreamWriter、InputStreamReader、BufferedReader、BufferedWriter等。此外,还介绍了系统流和标准流的使用。 ... [详细]
  • 本文介绍了使用Java实现大数乘法的分治算法,包括输入数据的处理、普通大数乘法的结果和Karatsuba大数乘法的结果。通过改变long类型可以适应不同范围的大数乘法计算。 ... [详细]
  • Webpack5内置处理图片资源的配置方法
    本文介绍了在Webpack5中处理图片资源的配置方法。在Webpack4中,我们需要使用file-loader和url-loader来处理图片资源,但是在Webpack5中,这两个Loader的功能已经被内置到Webpack中,我们只需要简单配置即可实现图片资源的处理。本文还介绍了一些常用的配置方法,如匹配不同类型的图片文件、设置输出路径等。通过本文的学习,读者可以快速掌握Webpack5处理图片资源的方法。 ... [详细]
  • 本文介绍了Redis的基础数据结构string的应用场景,并以面试的形式进行问答讲解,帮助读者更好地理解和应用Redis。同时,描述了一位面试者的心理状态和面试官的行为。 ... [详细]
  • C语言注释工具及快捷键,删除C语言注释工具的实现思路
    本文介绍了C语言中注释的两种方式以及注释的作用,提供了删除C语言注释的工具实现思路,并分享了C语言中注释的快捷键操作方法。 ... [详细]
  • 本文介绍了如何在给定的有序字符序列中插入新字符,并保持序列的有序性。通过示例代码演示了插入过程,以及插入后的字符序列。 ... [详细]
author-avatar
手机用户2602902715
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有