云风的skynet,定义为一个游戏服务器框架,用c + lua基于Actor模型实现。代码极其精简,c部分的代码只有三千行左右。
整个skynet框架要解决的核心问题是:把一个消息(数据包)从一个服务(Actor)发送给另一个服务(Actor),并接收其返回。也就是在同一进程内(作者也强调并非只限于同一进程,因为可能会有集群间的通讯)的一个服务通过类似rpc之类的调用同一进程内的另外一个服务,并接收处理结果。而skynet就是处理这些服务间发送数据包的规则和正确性。
skynet的核心层全部是c来实现。
当系统启动的时候,会得到一个提前分配好的节点id,我们称之为harbor id,这个id是集群用的,一个集群内可以启动很多个skynet节点,每个节点都会分配到唯一的id。
一个节点(即一个进程)内有很多个服务,服务可以狭义地暂且理解为功能模块。
当初始化一个服务的时候,会生成一个skynet_context来作为服务的实例;一个唯一(即使是在集群里也是唯一)的服务handle,即服务的唯一id,用来识别服务;一个消息队列message_queue;还要向框架注册一个callback,当服务收到有发送来的消息时,通过这个方法传入。
初始化一个服务的代码如下:
struct skynet_context *
skynet_context_new(const char * name, const char *param) {struct skynet_module * mod = skynet_module_query(name);if (mod == NULL)return NULL;void *inst = skynet_module_instance_create(mod);if (inst == NULL)return NULL;struct skynet_context * ctx = skynet_malloc(sizeof(*ctx));CHECKCALLING_INIT(ctx)ctx->mod = mod;ctx->instance = inst;ctx->ref = 2;ctx->cb = NULL;ctx->cb_ud = NULL;ctx->session_id = 0;ctx->logfile = NULL;ctx->init = false;ctx->endless = false;ctx->handle = 0; ctx->handle = skynet_handle_register(ctx);struct message_queue * queue = ctx->queue = skynet_mq_create(ctx->handle);context_inc();CHECKCALLING_BEGIN(ctx)int r = skynet_module_instance_init(mod, inst, ctx, param);CHECKCALLING_END(ctx)if (r == 0) {struct skynet_context * ret = skynet_context_release(ctx);if (ret) {ctx->init = true;}skynet_globalmq_push(queue);if (ret) {skynet_error(ret, "LAUNCH %s %s", name, param ? param : "");}return ret;} else {skynet_error(ctx, "FAILED launch %s", name);uint32_t handle = ctx->handle;skynet_context_release(ctx);skynet_handle_retire(handle);struct drop_t d = { handle };skynet_mq_release(queue, drop_message, &d);return NULL;}
}
在skynet_handle_register方法中生成一个服务handle,handle是一个32位的整数,在生成handle的时候,是把该节点的harbor id写到了handle的高8位里面,所以一个服务的handle,就可以知道这个服务是哪个节点的。
s->handle_index = handle + 1;rwlock_wunlock(&s->lock);handle |= s->harbor;return handle;
所以说,harbor id最高也只有256个,也就意味着skynet集群最多只能有256个节点,而一个节点里最多也只能有24位个服务,即1.6M个。因为一个handle是32位的整数,高8位用来存储harbor id,只有低的24位用来分配给本节点的handle。
消息队列message_queue是用来存储发送给该服务的消息的。所有发送给该服务的消息,都要先压到该服务的消息队列中。
服务启动起来了,来看看数据包是如何从一个服务发送给另一个服务的。来看看 skynet_send 和 callback 函数的定义:
int skynet_send(struct skynet_context * context,uint32_t source,uint32_t destination,int type,int session,void * msg,size_t sz
);typedef int (*skynet_cb)(struct skynet_context * context,void *ud,int type,int session,uint32_t source ,const void * msg,size_t sz
);
source和destination分别是发送方和接收方的handle。
type是发送方和接收方处理数据包的协议
session识别本次调用的口令,发送方发送一个消息后,保留该session,以便收到回应数据包时,能识别出是哪一次调用。
msg/sz是数据包的内容和长度,成对使用
skynet 的消息调度
Skynet 维护了两级消息队列。
每个服务实体有一个私有的消息队列,队列中是一个个发送给它的消息。消息由四部分构成:
struct skynet_message {uint32_t source;int session;void * data;size_t sz;
};
向一个服务发送一个消息,就是把这样一个消息体压入这个服务的私有消息队列中。这个结构的值复制进消息队列的,但消息内容本身不做复制。
Skynet 维护了一个全局消息队列,里面放的是诸个不为空的次级消息队列。
在 Skynet 启动时,建立了若干工作线程(数量可配置),它们不断的从主消息列队中取出一个次级消息队列来,再从次级队列中取去一条消息,调用对应的服务的 callback 函数进行出来。为了调用公平,一次仅处理一条消息,而不是耗净所有消息(虽然那样的局部效率更高,因为减少了查询服务实体的次数,以及主消息队列进出的次数),这样可以保证没有服务会被饿死。
这样,skynet就实现了把一个消息(数据包)从一个服务发送给另一个服务。
给大家推荐一个关于skynet项目实战的一个训练营 现在报名相当于免费,主讲内容:
多核并发编程
消息队列。线程池
actor消息调度
网络模块实现
时间轮定时器实现
lua/c/接口编程
skynet编程精要
demo演示actor编程思维
地址:点击观看视频
更多skynet资料加群:832218493免费领取!