源码版本信息
Project: kubernetes
Branch: master
Last commit id: d25d741c (2021-09-26)
在 Operator 开发过程中难免会用到 Event 对象,所以很有必要了解 Event 相关细节,可以避免很多 bug 的产生。client-go 在处理 Event 的时候,有这样一些特性:
如果 apiserver 失联,会重试发送 12 次,第一次间隔是 [0,10),剩余每次间隔 10s,合计110-120 s 左右如果还连不上 apiserver 就会放弃本次事件的发送;
client-go 在发送 event 之前会先进行一系列预处理流程,如果相似 event 的聚合,效果就是新发送一个关于相同资源对象的 Reason 和 Message 都相同的 event,这时候新 event 的 count 就是这类事件发生的次数,LastTimestamp 是事件产生时间,FirstTimestamp 是第一次观察到这类事件的时间;并且快速发送多个一样的 event 满足一定条件时会被聚合成一个;
client-go 中发送 event 的 burst 是 25,qps 是 1/300,意味着令牌桶大小是 25,5分钟产生一块新令牌,换言之快速发送 25 个 event 之后,5分钟内发送的 event 会被丢弃;
消息广播器的缓冲区大小是 1000,如果产生事件的速度太快,当 EventWatcher 来不及处理时,新产生的 event 也会被直接丢弃;
以 job 控制器中 event 的用法为例,大致步骤如下
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartStructuredLogging(0)
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "job-controller"})
recorder.Eventf(object, v1.EventTypeWarning, FailedCreatePodReason, "Error creating: %v", err)
从这里我们可以得到几个关键信息,首先是涉及到的几个主要对象:
EventBroadcaster
EventSink
EventRecorder
从名字直接猜的话,也许是这样工作的:EventRecorder 产生 events,EventBroadcaster 广播 events,EventSink,不好猜,字面意思是个事件槽,大概就是事件的一个中转站,最后通过这个 Sink 事件会流转到其他地方,比如 logger 或者 apiserver?到这里为止纯属YY…… 下面具体来看。
EventBroadcaster 用来接收 events,然后发送到一个 EventSink、watcher 或者 log 中;先看下接口定义:
client-go/tools/record/event.go:113
type EventBroadcaster interface {
// 将从 EventBroadcaster 接收到的 events 丢给 eventHandler
StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface
// 将从 EventBroadcaster 接收到的 events 丢给 EventSink
StartRecordingToSink(sink EventSink) watch.Interface
// 将从 EventBroadcaster 接收到的 events 丢给指定的日志函数
StartLogging(logf func(format string, args ...interface{})) watch.Interface
// 将从 EventBroadcaster 接收到的 events 丢给指定的结构化日志函数
StartStructuredLogging(verbosity klog.Level) watch.Interface
// 用于获取 EventRecorder,EventRecorder 可以发送 events 给 EventBroadcaster
NewRecorder(scheme *runtime.Scheme, source v1.EventSource) EventRecorder
Shutdown()
}
这里注意不要找到 tools/events 包里去,两个包有很相似的代码,看起来是 tools/events 包已经过期了,目前引用的都是 tools/record 包。
EventBroadcaster 对应的实现是 eventBroadcasterImpl
client-go/tools/record/event.go:181
type eventBroadcasterImpl struct {
*watch.Broadcaster
sleepDuration time.Duration
options CorrelatorOptions
}
前面提到第一步就是 eventBroadcaster := record.NewBroadcaster()
调用,我们下面看看这里的 New 过程
实例化一个 EventBroadcaster 的过程中会直接开启一个 goroutine 来从 Broadcaster.imcoming
接收 Event,然后分发给所有的 Broadcaster.watchers
client-go/tools/record/event.go:159
func NewBroadcaster() EventBroadcaster {
return &eventBroadcasterImpl{
Broadcaster: watch.NewLongQueueBroadcaster(maxQueuedEvents, watch.DropIfChannelFull), // maxQueuedEvents == 1000
sleepDuration: defaultSleepDuration, // 10s
}
}
这里通过调用 watch.NewLongQueueBroadcaster(maxQueuedEvents, watch.DropIfChannelFull)
来创建 Broadcaster,继续看下逻辑:
func NewLongQueueBroadcaster(queueLength int, fullChannelBehavior FullChannelBehavior) *Broadcaster {
m := &Broadcaster{
watchers: map[int64]*broadcasterWatcher{},
incoming: make(chan Event, queueLength),
stopped: make(chan struct{}),
watchQueueLength: queueLength,
fullChannelBehavior: fullChannelBehavior,
}
m.distributing.Add(1) // wg.Add(1)
go m.loop()
return m
}
这里实例化了一个 Broadcaster,先看下 Broadcaster 的结构:
apimachinery/pkg/watch/mux.go:42
type Broadcaster struct {
watchers map[int64]*broadcasterWatcher
nextWatcher int64
distributing sync.WaitGroup
incoming chan Event
stopped chan struct{}
watchQueueLength int
fullChannelBehavior FullChannelBehavior
}
这里的 Event 是这个结构:
type Event struct {
Type EventType // "ADDED" / "MODIFIED" / "DELETED" / "BOOKMARK" / "ERROR"
// 如果 EventType 是 "ADDED"/"MODIFIED",Object 是对象的最新状态;
// 如果 EventType 是 "DELETED",Object 是对象删除前的状态;
// 如果 EventType 是 "BOOKMARK",Object 里只有 ResourceVersion 字段被设置,客户端会保证不会收到重复的 event 或者丢失任何一个 event
Object runtime.Object
}
回过来看调用的 m.loop()
,loop() 方法的逻辑是从 m.incoming 接受消息,然后分发给所有的 watchers
func (m *Broadcaster) loop() {
for event := range m.incoming { // chan Event 类型,这里的 Event apimachinery/pkg/watch.Event
if event.Type == internalRunFunctionMarker { // 如果是 fake 类型的 Event,就直接调用其内部 Obj 的方法,下面有具体 internalRunFunctionMarker 逻辑的用途分析,在 blockQueue 方法里会讲到。
event.Object.(functionFakeRuntimeObject)()
continue
}
m.distribute(event) // 分发逻辑
}
m.closeAll()
m.distributing.Done() // wg.Done()
}
继续看 distribute 逻辑,这里主要是将一个 Event 分发给所有的 watcher
func (m *Broadcaster) distribute(event Event) {
if m.fullChannelBehavior == DropIfChannelFull { // 默认行为,如果 channel 满了不阻塞,直接丢弃消息
for _, w := range m.watchers { // map[int64]*broadcasterWatcher
select {
case w.result <- event: // 将 event 丢给 broadcasterWatcher.result
case <-w.stopped:
default: // 当 w.result 满了写不进去时直接继续下一轮循环,区别于 else 里的阻塞行为
}
}
} else {
for _, w := range m.watchers {
select {
case w.result <- event: // result 满了会阻塞
case <-w.stopped:
}
}
}
}
开头提到的 job 控制器里 events 相关代码里接着两步是这样的:
eventBroadcaster.StartStructuredLogging(0)
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
这两个函数调用代码都不长,主要逻辑在内部的 StartEventWatcher 中。简单看下两个方法的定义:
// client-go/tools/record/event.go:190
func (e *eventBroadcasterImpl) StartStructuredLogging(verbosity klog.Level) watch.Interface {
return e.StartEventWatcher(
func(e *v1.Event) { // 收到一个 Event,直接打印到日志里
klog.V(verbosity).InfoS("Event occurred", "object", klog.KRef(e.InvolvedObject.Namespace, e.InvolvedObject.Name), "kind", e.InvolvedObject.Kind, "apiVersion", e.InvolvedObject.APIVersion, "type", e.Type, "reason", e.Reason, "message", e.Message)
})
}
// ......
// client-go/tools/record/event.go:291
func (e *eventBroadcasterImpl) StartRecordingToSink(sink EventSink) watch.Interface {
eventCorrelator := NewEventCorrelatorWithOptions(e.options)
return e.StartEventWatcher(
func(event *v1.Event) {
recordToSink(sink, event, eventCorrelator, e.sleepDuration)
})
}
我们来看 StartEventWatcher 逻辑,然后回过来聊这两个方法。
func (e *eventBroadcasterImpl) StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface {
watcher := e.Watch() // Watch 过程下面来看
go func() {
defer utilruntime.HandleCrash()
for watchEvent := range watcher.ResultChan() { // 前面 Broadcaster 就是往这里丢的 Event
event, ok := watchEvent.Object.(*v1.Event) // Event.Object 就是具体的 corev1.Event
if !ok {
// This is all local, so there's no reason this should
// ever happen.
continue
}
eventHandler(event) // 丢给 handler 函数处理
}
}()
return watcher
}
StartEventWatcher()
方法的入参是一个能处理 Event 的 handler 函数,这里的 Event 是 corev1.Event,也就是我们通过 kubectl 命令具体看到的 Event 资源对象。上面有一个 Watch()
方法的调用,我们分析下具体内容。
Watch() 方法会 new 一个 watcher,然后加到 m.watchers map 里,返回这个 watcher,这个 watcher 不会接收到历史 events,而且会阻塞到成功加入 Broadcaster 为止。比如 Broadcaster 到 incoming 队列里已经有很多 Event 了,这时候新启动一个 watcher 直接开始工作会收到老消息,下面通过 blockQueue 逻辑实现了只接收新消息的逻辑,具体看下代码:
func (m *Broadcaster) Watch() Interface {
var w *broadcasterWatcher
m.blockQueue(func() { // blockQueue() 下面有分析
id := m.nextWatcher
m.nextWatcher++
w = &broadcasterWatcher{
result: make(chan Event, m.watchQueueLength),
stopped: make(chan struct{}),
id: id,
m: m,
}
m.watchers[id] = w
})
if w == nil {
panic("broadcaster already stopped")
}
return w
}
blockQueue 用来阻塞 incoming 队列用的,就是往 incoming 里加入一个 fake 的 Event,然后挂起当前 gorouting,直到这个 Event 被处理到为止
func (m *Broadcaster) blockQueue(f func()) {
select {
case <-m.stopped:
return
default:
}
var wg sync.WaitGroup
wg.Add(1)
m.incoming <- Event{
Type: internalRunFunctionMarker, // "internal-do-function"
Object: functionFakeRuntimeObject(func() {
defer wg.Done() // 这个 Event 被消费后会调用到 Done()
f() // 阻塞结束后调用
}),
}
wg.Wait() // 阻塞直到上面加入到 Event 被处理完
}
讲完了 StartEventWatcher 的逻辑,回过头看一下 StartRecordingToSink 的具体逻辑。StartRecordingToSink 的作用是将从指定 eventBroadcaster 接收到的消息传送到给定的 sink 中去
client-go/tools/record/event.go:190
func (e *eventBroadcasterImpl) StartRecordingToSink(sink EventSink) watch.Interface {
eventCorrelator := NewEventCorrelatorWithOptions(e.options)
return e.StartEventWatcher(
func(event *v1.Event) {
recordToSink(sink, event, eventCorrelator, e.sleepDuration)
})
}
这里传给 StartEventWatcher()
方法的 handler 函数是 recordToSink(sink, event, eventCorrelator, e.sleepDuration)
,看下这个 handler 是怎么实现的:
func recordToSink(sink EventSink, event *v1.Event, eventCorrelator *EventCorrelator, sleepDuration time.Duration) {
// 修改前复制一份,因为一个 event 有多个 listener
eventCopy := *event
event = &eventCopy
result, err := eventCorrelator.EventCorrelate(event) // 聚合处理等,下面会提到
if err != nil {
utilruntime.HandleError(err)
}
if result.Skip {
return
}
tries := 0
for {
// 具体执行将 event 写到 sink 的过程,这里是在 if 的条件里,所以直到成功了才会 break
if recordEvent(sink, result.Event, result.Patch, result.Event.Count > 1, eventCorrelator) {
break
}
tries++
if tries >= maxTriesPerEvent { // 最多重试 12 次
klog.Errorf("Unable to write event '%#v' (retry limit exceeded!)", event)
break
}
// 第一次 sleep 随机一点,避免 apiserver 失联的时候所有 client 一起失败
if tries == 1 {
time.Sleep(time.Duration(float64(sleepDuration) * rand.Float64())) // 10s * [0.0,1.0)
} else {
time.Sleep(sleepDuration) // 10s
}
}
}
client-go/tools/record/event.go:238
func recordEvent(sink EventSink, event *v1.Event, patch []byte, updateExistingEvent bool, eventCorrelator *EventCorrelator) bool {
var newEvent *v1.Event
var err error
if updateExistingEvent { // 如果是更新已有的 event,则调用 Patch 方法
newEvent, err = sink.Patch(event, patch)
}
// 更新可能失败,因为这个 event 可能已经被删除了
if !updateExistingEvent || (updateExistingEvent && util.IsKeyNotFoundError(err)) {
// 如果是新建,则需要确保 ResourceVersion 为空
event.ResourceVersion = ""
newEvent, err = sink.Create(event)
}
if err == nil {
// 更新 eventCorrelator 状态
eventCorrelator.UpdateState(newEvent)
return true
}
// 连不上 apiserver 等原因引起的失败
switch err.(type) {
case *restclient.RequestConstructionError:
// 这种情况重试也会失败,所以直接返回 true
klog.Errorf("Unable to construct event '%#v': '%v' (will not retry!)", event, err)
return true
case *errors.StatusError: // 服务器端拒绝更新,放弃
if errors.IsAlreadyExists(err) {
klog.V(5).Infof("Server rejected event '%#v': '%v' (will not retry!)", event, err)
} else {
klog.Errorf("Server rejected event '%#v': '%v' (will not retry!)", event, err)
}
return true
case *errors.UnexpectedObjectError:
default: // http 传输问题,比如失联,需要重试
}
klog.Errorf("Unable to write event: '%#v': '%v'(may retry after sleeping)", event, err)
return false
}
接口定义如下
client-go/tools/record/event.go:47
type EventSink interface {
Create(event *v1.Event) (*v1.Event, error)
Update(event *v1.Event) (*v1.Event, error)
Patch(oldEvent *v1.Event, data []byte) (*v1.Event, error)
}
实现是
type EventSinkImpl struct { Interface EventInterface}
回到一开始的用法:eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
可以看到 Interface 引用的是 kubeClient.CoreV1().Events("")
,这里的逻辑就到了 client-go 的 clientset 中了,这里的类型也就是 corev1.EventInterface
,所以上面接口的 Create、Update、Patch 也就都是通过 clientset 来实现的。
EventCorrelator 的作用是预处理所有 events,聚合频繁产生的相似的 events,将多次接受到的 events 聚合成一个等,从而降低系统压力。
上面提到一个 eventCorrelator.EventCorrelate() 调用,首先看下对象定义:
client-go/tools/record/events_cache.go:405
type EventCorrelator struct {
// 过滤器
filterFunc EventFilterFunc
// 聚合器
aggregator *EventAggregator
// 观察器
logger *eventLogger
}
过滤器主要是限速用的,看一个具体的实现:
client-go/tools/record/events_cache.go:129
func (f *EventSourceObjectSpamFilter) Filter(event *v1.Event) bool {
var record spamRecord
// eventKey 的结构大概这样:event.Source.(Component+Host)+event.InvolvedObject.(Kind+Namespace+event+Name)+...
eventKey := f.spamKeyFunc(event)
f.Lock()
defer f.Unlock()
value, found := f.cache.Get(eventKey) // cache 是一个 LRU 缓存
if found {
record = value.(spamRecord)
}
// 没有限速器就加一个
if record.rateLimiter == nil {
// 默认一个 source+object 的 burst 是 25 ,qps 是 1/300(5分钟一个),也就是令牌桶初始容量是 25,然后 5 分钟才会多一个令牌进来
record.rateLimiter = flowcontrol.NewTokenBucketPassiveRateLimiterWithClock(f.qps, f.burst, f.clock)
}
// 看速率是否满足要求
filter := !record.rateLimiter.TryAccept()
// 更新 cache
f.cache.Add(eventKey, record)
return filter
}
聚合器的作用是将相似的 events 聚合成一个 event
聚合器定义如下
client-go/tools/record/events_cache.go:191
type EventAggregator struct {
sync.RWMutex
cache *lru.Cache
keyFunc EventAggregatorKeyFunc
messageFunc EventAggregatorMessageFunc
maxEvents uint // 当相似 event 数量超过这个最大值时就触发聚合操作,默认是 10
maxIntervalInSeconds uint // 过了这个间隔的两个相似 event 被认为是一个新的 event,默认 10min
clock clock.PassiveClock
}
对应的 New 函数
func NewEventAggregator(lruCacheSize int, keyFunc EventAggregatorKeyFunc, messageFunc EventAggregatorMessageFunc,
maxEvents int, maxIntervalInSeconds int, clock clock.PassiveClock) *EventAggregator {
return &EventAggregator{
cache: lru.New(lruCacheSize), // 默认 4096
keyFunc: keyFunc,
messageFunc: messageFunc,
maxEvents: uint(maxEvents), // 默认是 10
maxIntervalInSeconds: uint(maxIntervalInSeconds), // 默认 600s
clock: clock,
}
}
这里的 keyFunc 如下
func EventAggregatorByReasonFunc(event *v1.Event) (string, string) {
return strings.Join([]string{
event.Source.Component,
event.Source.Host,
event.InvolvedObject.Kind,
event.InvolvedObject.Namespace,
event.InvolvedObject.Name,
string(event.InvolvedObject.UID),
event.InvolvedObject.APIVersion,
event.Type,
event.Reason,
event.ReportingController,
event.ReportingInstance,
},
""), event.Message
}
messageFunc 如下
func EventAggregatorByReasonMessageFunc(event *v1.Event) string {
return "(combined from similar events): " + event.Message
}
聚合过程在 EventAggregate()
方法中实现
func (e *EventAggregator) EventAggregate(newEvent *v1.Event) (*v1.Event, string) {
now := metav1.NewTime(e.clock.Now())
var record aggregateRecord // 维护了所有接收到过的 event 的 key
// 计算这个 event 的 key
eventKey := getEventKey(newEvent)
// 类似这样 "(combined from similar events): " + event.Message
aggregateKey, localKey := e.keyFunc(newEvent)
// 查询 caches 里是否有相似 events 记录
e.Lock()
defer e.Unlock()
value, found := e.cache.Get(aggregateKey)
if found {
record = value.(aggregateRecord)
}
// 看向下之前的记录是否太旧了,这个事件是 10min,如果太旧了就更新。
maxInterval := time.Duration(e.maxIntervalInSeconds) * time.Second
interval := now.Time.Sub(record.lastTimestamp.Time)
if interval > maxInterval {
record = aggregateRecord{localKeys: sets.NewString()}
}
// 新 event 写入聚合 record 里,并且放到 cache 中
record.localKeys.Insert(localKey)
record.lastTimestamp = now
e.cache.Add(aggregateKey, record)
// 如果不重复的 events 数量小于10
if uint(record.localKeys.Len()) < e.maxEvents {
return newEvent, eventKey
}
// 保证 localKeys 不增长,pop 出来一个
record.localKeys.PopAny()
// 返回一个聚合后的 event 和对应的“聚合 key”
eventCopy := &v1.Event{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%v.%x", newEvent.InvolvedObject.Name, now.UnixNano()),
Namespace: newEvent.Namespace,
},
Count: 1,
FirstTimestamp: now,
InvolvedObject: newEvent.InvolvedObject,
LastTimestamp: now,
Message: e.messageFunc(newEvent),
Type: newEvent.Type,
Reason: newEvent.Reason,
Source: newEvent.Source,
}
return eventCopy, aggregateKey
}
观察器做的事情是将一个新产生的 Event 和 LRU 缓存里的做对比,如果 key 一致,也就是两个 Event 表示的信息一样,则更新缓存;如果不一样,就加到缓存里。
client-go/tools/record/events_cache.go:315
type eventLogger struct {
sync.RWMutex
cache *lru.Cache
clock clock.PassiveClock
}
观察器有一个 eventObserve() 方法,如果 key 是相同的,这个方法会直接更新已经存在的 event,反之增加一个新的 event
func (e *eventLogger) eventObserve(newEvent *v1.Event, key string) (*v1.Event, []byte, error) {
var (
patch []byte
err error
)
eventCopy := *newEvent // 复制一份
event := &eventCopy
e.Lock()
defer e.Unlock()
// 检查缓存里是否有需要更新的 event,这里的 key 就是前面提到的 EventAggregatorByReasonFunc() 计算出来的 key
lastObservation := e.lastEventObservationFromCache(key)
// 如果发现了需要更新的 event,也就是新的 event 已经存在已经老的和其各个属性都一样,Reason、Message 等都一样,而且属于同一个对象
if lastObservation.count > 0 {
// update the event based on the last observation so patch will work as desired
event.Name = lastObservation.name
event.ResourceVersion = lastObservation.resourceVersion
event.FirstTimestamp = lastObservation.firstTimestamp // Event 构造的时候会设置 firstTimestamp 和 lastTimestamp,这里更新了 firstTimestamp
event.Count = int32(lastObservation.count) + 1 // 计数器加1
eventCopy2 := *event
eventCopy2.Count = 0
eventCopy2.LastTimestamp = metav1.NewTime(time.Unix(0, 0))
eventCopy2.Message = ""
newData, _ := json.Marshal(event)
oldData, _ := json.Marshal(eventCopy2)
patch, err = strategicpatch.CreateTwoWayMergePatch(oldData, newData, event)
}
// 记录新观察到的 Event
e.cache.Add(
key,
eventLog{
count: uint(event.Count),
firstTimestamp: event.FirstTimestamp,
name: event.Name,
resourceVersion: event.ResourceVersion,
},
)
return event, patch, err
}
回到 EventCorrelate() 方法的实现上
client-go/tools/record/events_cache.go:510
func (c *EventCorrelator) EventCorrelate(newEvent *v1.Event) (*EventCorrelateResult, error) {
if newEvent == nil {
return nil, fmt.Errorf("event is nil")
}
aggregateEvent, ckey := c.aggregator.EventAggregate(newEvent) // 聚合器处理
observedEvent, patch, err := c.logger.eventObserve(aggregateEvent, ckey) // 观察器处理
if c.filterFunc(observedEvent) { // 过滤器处理
return &EventCorrelateResult{Skip: true}, nil
}
return &EventCorrelateResult{Event: observedEvent, Patch: patch}, err
}
最后两步是:
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "job-controller"})
recorder.Eventf(object, v1.EventTypeWarning, FailedCreatePodReason, "Error creating: %v", err)
这里的 NewRecorder 定义如下
func (e *eventBroadcasterImpl) NewRecorder(scheme *runtime.Scheme, source v1.EventSource) EventRecorder {
return &recorderImpl{scheme, source, e.Broadcaster, clock.RealClock{}}
}
入参是 Scheme 和 EventSource,EventSource 结构很简单:
type EventSource struct {
// Event 从哪个组件来的,比如:job-controller
Component string `json:"component,omitempty" protobuf:"bytes,1,opt,name=component"`
// Event 从哪个节点来的
Host string `json:"host,omitempty" protobuf:"bytes,2,opt,name=host"`
}
看下 EventRecorder 的定义和实现,接口长这样:
type EventRecorder interface {
// 这里的 object 是这个 event 相关的那个资源对象;eventtype 是 'Normal/Warning' 这类简单的字符串;reason 表示这个 event 产生的原因,message 是一个更详细的可读性好的描述信息
Event(object runtime.Object, eventtype, reason, message string)
// 和 Event() 类似,只是用来 Sprintf
Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{})
// Eventf() 基础上加了一个 annotations
AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{})
}
对应的实现是:
type recorderImpl struct {
scheme *runtime.Scheme
source v1.EventSource
*watch.Broadcaster
clock clock.PassiveClock
}
我们写代码时使用最多的 Eventf() 对应的就是 recorderImpl 对象的 Eventf() 方法,接下来看下 Eventf() 的具体实现。
Eventf() 方法只是简单地通过 fmt.Sprintf() 格式化字符串后调用 Event()
client-go/tools/record/event.go:354
func (recorder *recorderImpl) Event(object runtime.Object, eventtype, reason, message string) {
recorder.generateEvent(object, nil, eventtype, reason, message)
}
func (recorder *recorderImpl) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{}) {
recorder.Event(object, eventtype, reason, fmt.Sprintf(messageFmt, args...))
}
接着逻辑就到了generateEvent() 方法里
func (recorder *recorderImpl) generateEvent(object runtime.Object, annotations map[string]string, eventtype, reason, message string) {
ref, err := ref.GetReference(recorder.scheme, object) // 获取 object 的 ObjectReference
if err != nil {
klog.Errorf("Could not construct reference to: '%#v' due to: '%v'. Will not report event: '%v' '%v' '%v'", object, err, eventtype, reason, message)
return
}
if !util.ValidateEventType(eventtype) { // 校验 eventtype 是 "Normal"/"Warning"
klog.Errorf("Unsupported event type: '%v'", eventtype)
return
}
event := recorder.makeEvent(ref, annotations, eventtype, reason, message) // 构建 event
event.Source = recorder.source
// events 操作不应该阻塞,如果 event 太多的时候直接丢弃,然后打印一个日志
if sent := recorder.ActionOrDrop(watch.Added, event); !sent {
klog.Errorf("unable to record event: too many queued events, dropped event %#v", event)
}
}
这里有两个函数调用:
makeEvent()
ActionOrDrop()
makeEvent 从名字就能猜到这是构造一个 Event 对象的:
func (recorder *recorderImpl) makeEvent(ref *v1.ObjectReference, annotations map[string]string, eventtype, reason, message string) *v1.Event {
t := metav1.Time{Time: recorder.clock.Now()}
namespace := ref.Namespace
if namespace == "" {
namespace = metav1.NamespaceDefault
}
return &v1.Event{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%v.%x", ref.Name, t.UnixNano()), // 名字是相关 object 的 Name + 时间戳
Namespace: namespace, // 和 ref 在同一个 namespace 下,如果 ref 没有 namespace 就放到 default 下
Annotations: annotations, // 支持添加 annotations
},
InvolvedObject: *ref,
Reason: reason,
Message: message,
FirstTimestamp: t,
LastTimestamp: t,
Count: 1,
Type: eventtype,
}
}
ActionOrDrop 的逻辑是尝试往 Broadcaster.incoming channel 中写入 Event,如果失败了就直接 Drop 掉
func (m *Broadcaster) ActionOrDrop(action EventType, obj runtime.Object) bool {
select {
case m.incoming <- Event{action, obj}:
return true
default:
return false
}
}
(转载请保留本文原始链接 https://www.danielhu.cn/post/k8s/event/)