nsq 是用go语言实现的分布式队列。阅读源码对go语言的chanel,分布式有着更好的理解
核心代码分位3部分:
官方的介绍为
nsqd is the daemon that receives, queues, and delivers messages to clients.
It can be run standalone but is normally configured in a cluster with nsqlookupd instance(s) (in which case it will announce topics and channels for discovery).
It listens on two TCP ports, one for clients and another for the HTTP API. It can optionally listen on a third port for HTTPS.
大意为:nsqd是接收,分发队列信息的守护进程。一般集群化运行,也可以独自部署。
下面对nsqd的2个逻辑做一次学习
在Makefile中,写到
$(BLDDIR)/nsqd: $(wildcard apps/nsqd/*.go nsqd/*.go nsq/*.go internal/*/*.go)
可以找到nsqd的代码入口在apps/nsqd/nsqd.go
apps/nsqd/nsqd.go
这个文件作为程序入口,主要做了几件事情:
首先作者使用svc包来控制程序的启动:
type program struct {
nsqd *nsqd.NSQD
}
func main() {
prg := &program{}
if err := svc.Run(prg, syscall.SIGINT, syscall.SIGTERM); err != nil {
log.Fatal(err)
}
}
func (p *program) Init(env svc.Environment) error {...}
func (p *program) Start() error {...}
func (p *program) Stop() error {...}
使用svc 能更简洁的保证程序干净的退出。在nsqd中,退出信号有两个:SIGINT(输入任意健) 和 SIGTERM(kill)。
Start()函数是主要逻辑的入口,在函数中引用了NewOptions(),它会创建一个默认的Options 结构。Options 后续会作为nsqd启动的参数来源
opts := nsqd.NewOptions()
作者通过flag包实现了命令行参数接收,如果命令行中执行配置文件,会同时读取配置文件。根据配置文件,命令行参数,来创建一个nsqd结构
options.Resolve(opts, flagSet, cfg)
nsqd := nsqd.New(opts)
接下来会加载数据
err := nsqd.LoadMetadata()
err = nsqd.PersistMetadata()
LoadMetadata()过程为:
PersistMetadata()过程为:
接下来调用启动nsqd的主逻辑nsqd.Main(),主要完成以下过程
n.waitGroup.Wrap(func() {
http_api.Serve(n.httpListener, httpServer, "HTTP", n.logf)
})
n.waitGroup.Wrap(func() { n.queueScanLoop() })
n.waitGroup.Wrap(func() { n.lookupLoop() })
if n.getOpts().StatsdAddress != "" {
n.waitGroup.Wrap(func() { n.statsdLoop() })
}
这里使用到了waitGroup,它是一个groutines 的控制包,能上线类似python 的join()功能。可以实现所有groutines都执行完再退出。
作者封装了waitGroup库
type WaitGroupWrapper struct {
sync.WaitGroup
}
func (w *WaitGroupWrapper) Wrap(cb func()) {
w.Add(1)
go func() {
cb()
w.Done()
}()
}
Add() 会计数器加1,Done()使得计数器减一。此外WaitGroup提供Wait()函数:当计数器归0时,继续执行,否则阻塞。等待线程执行完再退出的作用。
此外,将函数作为参数,再在内部groutines执行,和python的装饰器的用法类似。
回到Main()函数中,启动http_api利用到了github.com/nsqio/nsq/internal/http_api
包, 设置router等参数后,启动。
queueScanLoop() 是管道扫进程,他的逻辑是将tpic,channel中的数据读入到worker channel, 并每隔一定的时间更新worker数量,扫描chanel中的数据。
select {
case <-workTicker.C:
if len(channels) == 0 {
continue
}
case <-refreshTicker.C:
channels = n.channels()
n.resizePool(len(channels), workCh, responseCh, closeCh)
continue
case <-n.exitChan:
goto exit
}
这里使用select来监听io操作,每隔扫描间隔时,判断channel中的是否存在数据需要处理,如果没有,则略过本次扫描。
每隔刷新间隔判断worker数量是否发生变化。
loop:
numDirty := 0
for i := 0; i
numDirty++
}
}
if float64(numDirty)/float64(num) > n.getOpts().QueueScanDirtyPercent {
goto loop
}
这里还有dirty比率的概念,channel中有数据就认为是dirty,当该比率超过配置中的值时,则继续处理调用worker来处理,而不是等待固定间隔才进行扫描。
启动lookupLoop()和statsdLoop();这两个函数的作用初步看和nsqdlookup通信用,细节还未了解。
上面阐述了nsqd的启动逻辑。nsqd使用http api和用户交互
在api文档中,看到pub接口用来发布信息:
使用示例curl -d "
在nsqd/http.go中,定义了路由规则
func newHTTPServer(ctx *context, tlsEnabled bool, tlsRequired bool) *httpServer {
...
s := &httpServer{
ctx: ctx,
tlsEnabled: tlsEnabled,
tlsRequired: tlsRequired,
router: router,
}
router.Handle("POST", "/pub", http_api.Decorate(s.doPUB, http_api.V1))
...
}
在doPUB()函数中,可以看到数据存储时,最终调用了opic.PutMessage(msg)
err = topic.PutMessage(msg)
func (t *Topic) PutMessage(m *Message) error {
t.RLock()
defer t.RUnlock()
if atomic.LoadInt32(&t.exitFlag) == 1 {
return errors.New("exiting")
}
err := t.put(m)
if err != nil {
return err
}
atomic.AddUint64(&t.messageCount, 1)
return nil
}
PutMessage的逻辑是做并发控制(加锁)后,调Topic.put(*Message) 来写入信息。
这里有两个锁控制机制:
RLoclk
go语言中,sync包有两种锁,分别是互斥锁sync.Mutex和读写锁sync.RWMutex。
type Mutex
func (m *Mutex) Lock()
func (m *Mutex) Unlock()
type RWMutex
func (rw *RWMutex) Lock()
func (rw *RWMutex) RLock()
func (rw *RWMutex) RLocker() Locker
func (rw *RWMutex) RUnlock()
func (rw *RWMutex) Unlock()
互斥锁倾向于在全局使用,一旦加锁,就必须解锁之后才能访问。不二次加锁、二次解锁都会报错。
读写锁用在读远多于写的场景。
Lock()表示写加锁,加写锁前,如果已经存在写锁,或者其他读锁,会阻塞住,直到锁可用。已阻塞的 Lock 调用会从获得的锁中排除新的读取器,即写锁权限高于读锁,有写锁时优先进行写锁定。
RLock()表示读加锁,当有写锁时,无法加载读锁,当只有读锁或者没有锁时,可以加载读锁,读锁可以加载多个,所以适用于"读多写少"的场景。
关于读写锁的具体例子请参考golang中sync.RWMutex和sync.Mutex区别
atomic
atomic是sync包中的另一种锁机制,在实现上,它比互斥锁层级更低:互斥锁调用的是golang的api,而atomic是在内核层面实现。因此它比互斥锁效率更高,但是使用上也存在一定的限制。如果使用存储相关接口,存入的是nil,或者类型不对,会报错。
此外,在一些文章中,以及stack overflow中都提到尽量少用atomic,具体原因还不知道。
atomic有几种常见的函数:
具体查看atomic介绍
在上面的PutMessage逻辑中,增加topic读锁和topic中的部分值的原子操作锁后,调用了put()函数来实现写入。
func (t *Topic) put(m *Message) error {
select {
case t.memoryMsgChan <- m:
default:
b := bufferPoolGet()
err := writeMessageToBackend(b, m, t.backend)
bufferPoolPut(b)
t.ctx.nsqd.SetHealth(err)
if err != nil {
t.ctx.nsqd.logf(LOG_ERROR,
"TOPIC(%s) ERROR: failed to write message to backend - %s",
t.name, err)
return err
}
}
return nil
}
put函数的操作是,将Message写入channel,如果该topic的memoryMsgChan长度满了,则通过default逻辑,写入buffer中.
buffer的实现是利用了sync.Pool包,相当于是一块缓存,在gc前释放,存储的长度受限于内存大小。
这里有两个问题:
经过查找,发现处理上述两个channel的函数是messagePump,而messagePump在创建一个新Topic时会被后台调用:
func NewTopic(topicName string, ctx *context, deleteCallback func(*Topic)) *Topic {
...
t.waitGroup.Wrap(func() {t.messagePump()})
...
}
func (t *Topic) messagePump() {
...
if len(chans) > 0 {
memoryMsgChan = t.memoryMsgChan
backendChan = t.backend.ReadChan()
}
select {
case msg = <-memoryMsgChan:
case buf = <-backendChan:
msg, err = decodeMessage(buf)
if err != nil {
t.ctx.nsqd.logf(LOG_ERROR, "failed to decode message - %s", err)
continue
}
...
}
...
for i, channel := range chans {
chanMsg := msg
if i > 0 {
chanMsg = NewMessage(msg.ID, msg.Body)
chanMsg.Timestamp = msg.Timestamp
chanMsg.deferred = msg.deferred
}
...
err := channel.PutMessage(chanMsg)
...
}
...
}
上述调用了channel的PutMessage()完成了message写入channel的memoryMsgChan中,写入逻辑和写入topic逻辑类似。到这里完成了数据的写入流程分析。
官方的介绍如下
nsqlookupd is the daemon that manages topology information. Clients query nsqlookupd to discover nsqd producers for a specific topic and nsqd nodes broadcasts topic and channel information.
There are two interfaces: A TCP interface which is used by nsqd for broadcasts and an HTTP interface for clients to perform discovery and administrative actions.
大意为:nsqlookup是管理nsqd集群拓补信息的守护进程。nsqlookup用于
下面梳理一下nsqllookup的两个逻辑:
根据查询数据的过程进行梳理,nsq提供了几个封装好的查询接口,如果nsq_tail、nsq_to_file 等。此处从nsq_til 举例查看。
nsq_tail中主要逻辑如下:
consumers := []*nsq.Consumer{}
for i := 0; i
consumer, err := nsq.NewConsumer(topics[i], *channel, cfg)
if err != nil {
log.Fatal(err)
}
consumer.AddHandler(&TailHandler{topicName: topics[i], totalMessages: *totalMessages})
err = consumer.ConnectToNSQDs(nsqdTCPAddrs)
if err != nil {
log.Fatal(err)
}
err = consumer.ConnectToNSQLookupds(lookupdHTTPAddrs)
if err != nil {
log.Fatal(err)
}
cOnsumers= append(consumers, consumer)
}
nsq_tail的逻辑是针对每个topic,分别初始化一个消费者consumer, 此处consumer实现的库是go-nsq。
并实现一个nsq_tail逻辑的handler,初始化在consumer中。
之后从nsqd和nsqdlookup中获取数据,并调用handler处理。
在go-nsq/consumer.go中,ConnectToNSQLookupd()会调用queryLookupd()和lookupdLoop(),而lookupdLoop()又会定期调用queryLookupd()。代码如下:
func (r *Consumer) ConnectToNSQLookupd(addr string) error {
...
if numLookupd == 1 {
r.queryLookupd()
r.wg.Add(1)
go r.lookupdLoop()
}
...
}
func (r *Consumer) lookupdLoop() {
...
for {
select {
case <-ticker.C:
r.queryLookupd()
case <-r.lookupdRecheckChan:
r.queryLookupd()
case <-r.exitChan:
goto exit
}
}
...
}
// make an HTTP req to one of the configured nsqlookupd instances to discover
// which nsqd's provide the topic we are consuming.
//
// initiate a connection to any new producers that are identified.
func (r *Consumer) queryLookupd() {
...
var nsqdAddrs []string
for _, producer := range data.Producers {
broadcastAddress := producer.BroadcastAddress
port := producer.TCPPort
joined := net.JoinHostPort(broadcastAddress, strconv.Itoa(port))
nsqdAddrs = append(nsqdAddrs, joined)
}
// apply filter
if discoveryFilter, ok := r.behaviorDelegate.(DiscoveryFilter); ok {
nsqdAddrs = discoveryFilter.Filter(nsqdAddrs)
}
for _, addr := range nsqdAddrs {
err = r.ConnectToNSQD(addr)
if err != nil && err != ErrAlreadyConnected {
r.log(LogLevelError, "(%s) error connecting to nsqd - %s", addr, err)
continue
}
}}
在queryLookupd()中,获取到生产者信息后,调用ConnectToNSQD()连接每个nsqd server。用ConnectToNSQD()实现了读取message。
ConnectYpNSQD()调用了connection结果的函数readLoop()。
func (c *Conn) readLoop() {
for {
...
frameType, data, err := ReadUnpackedResponse(c)
...
switch frameType {
case FrameTypeResponse:
c.delegate.OnResponse(c, data)
case FrameTypeMessage:
msg, err := DecodeMessage(data)
if err != nil {
c.log(LogLevelError, "IO error - %s", err)
c.delegate.OnIOError(c, err)
goto exit
}
msg.Delegate = delegate
msg.NSQDAddress = c.String()
atomic.AddInt64(&c.rdyCount, -1)
atomic.AddInt64(&c.messagesInFlight, 1)
atomic.StoreInt64(&c.lastMsgTimestamp, time.Now().UnixNano())
c.delegate.OnMessage(c, msg)
...
}
}
在c.delegate.OnMessage(c, msg)中,会将message写入Consumer.incomingMessages。完成数据读取。