热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

KubernetesEvent原理和源码分析

在Operator开发过程中难免会用到Event对象,所以很有必要了解Event相关细节,可以避

概述

源码版本信息

  • Project: kubernetes

  • Branch: master

  • Last commit id: d25d741c (2021-09-26)

在 Operator 开发过程中难免会用到 Event 对象,所以很有必要了解 Event 相关细节,可以避免很多 bug 的产生。client-go 在处理 Event 的时候,有这样一些特性:

  1. 如果 apiserver 失联,会重试发送 12 次,第一次间隔是 [0,10),剩余每次间隔 10s,合计110-120 s 左右如果还连不上 apiserver 就会放弃本次事件的发送;

  2. client-go 在发送 event 之前会先进行一系列预处理流程,如果相似 event 的聚合,效果就是新发送一个关于相同资源对象的 Reason 和 Message 都相同的 event,这时候新 event 的 count 就是这类事件发生的次数,LastTimestamp 是事件产生时间,FirstTimestamp 是第一次观察到这类事件的时间;并且快速发送多个一样的 event 满足一定条件时会被聚合成一个;

  3. client-go 中发送 event 的 burst 是 25,qps 是 1/300,意味着令牌桶大小是 25,5分钟产生一块新令牌,换言之快速发送 25 个 event 之后,5分钟内发送的 event 会被丢弃;

  4. 消息广播器的缓冲区大小是 1000,如果产生事件的速度太快,当 EventWatcher 来不及处理时,新产生的 event 也会被直接丢弃;

event

以 job 控制器中 event 的用法为例,大致步骤如下

eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartStructuredLogging(0)
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "job-controller"})
recorder.Eventf(object, v1.EventTypeWarning, FailedCreatePodReason, "Error creating: %v", err)

从这里我们可以得到几个关键信息,首先是涉及到的几个主要对象:

  • EventBroadcaster

  • EventSink

  • EventRecorder

从名字直接猜的话,也许是这样工作的:EventRecorder 产生 events,EventBroadcaster 广播 events,EventSink,不好猜,字面意思是个事件槽,大概就是事件的一个中转站,最后通过这个 Sink 事件会流转到其他地方,比如 logger 或者 apiserver?到这里为止纯属YY…… 下面具体来看。

EventBroadcaster

EventBroadcaster 用来接收 events,然后发送到一个 EventSink、watcher 或者 log 中;先看下接口定义:

  • client-go/tools/record/event.go:113

type EventBroadcaster interface {
   // 将从 EventBroadcaster 接收到的 events 丢给 eventHandler
   StartEventWatcher(eventHandler func(*v1.Event)watch.Interface 
   // 将从 EventBroadcaster 接收到的 events 丢给 EventSink
   StartRecordingToSink(sink EventSink) watch.Interface
   // 将从 EventBroadcaster 接收到的 events 丢给指定的日志函数
   StartLogging(logf func(format string, args ...interface{})watch.Interface
   // 将从 EventBroadcaster 接收到的 events 丢给指定的结构化日志函数
   StartStructuredLogging(verbosity klog.Level) watch.Interface
   // 用于获取 EventRecorderEventRecorder 可以发送 events 给 EventBroadcaster
   NewRecorder(scheme *runtime.Scheme, source v1.EventSource) EventRecorder
   Shutdown()
}

这里注意不要找到 tools/events 包里去,两个包有很相似的代码,看起来是 tools/events 包已经过期了,目前引用的都是 tools/record 包。

EventBroadcaster 对应的实现是 eventBroadcasterImpl

  • client-go/tools/record/event.go:181

type eventBroadcasterImpl struct {
     *watch.Broadcaster
     sleepDuration time.Duration
     options       CorrelatorOptions
}

前面提到第一步就是 eventBroadcaster := record.NewBroadcaster()
 调用,我们下面看看这里的 New 过程

NewBroadcaster()

实例化一个 EventBroadcaster 的过程中会直接开启一个 goroutine 来从 Broadcaster.imcoming
 接收 Event,然后分发给所有的 Broadcaster.watchers

  • client-go/tools/record/event.go:159

func NewBroadcaster() EventBroadcaster {
    return &eventBroadcasterImpl{
        Broadcaster:   watch.NewLongQueueBroadcaster(maxQueuedEvents, watch.DropIfChannelFull), // maxQueuedEvents == 1000
        sleepDuration: defaultSleepDuration, // 10s
    }
}

这里通过调用 watch.NewLongQueueBroadcaster(maxQueuedEvents, watch.DropIfChannelFull)
 来创建 Broadcaster,继续看下逻辑:

func NewLongQueueBroadcaster(queueLength int, fullChannelBehavior FullChannelBehavior) *Broadcaster {
   m := &Broadcaster{
      watchers:            map[int64]*broadcasterWatcher{},
      incoming:            make(chan Event, queueLength),
      stopped:             make(chan struct{}),
      watchQueueLength:    queueLength,
      fullChannelBehavior: fullChannelBehavior,
   }
   m.distributing.Add(1// wg.Add(1)
   go m.loop()
   return m
}

这里实例化了一个 Broadcaster,先看下 Broadcaster 的结构:

  • apimachinery/pkg/watch/mux.go:42

type Broadcaster struct {
   watchers     map[int64]*broadcasterWatcher
   nextWatcher  int64
   distributing sync.WaitGroup
   incoming chan Event
   stopped  chan struct{}
   watchQueueLength int
   fullChannelBehavior FullChannelBehavior
}

这里的 Event 是这个结构:

type Event struct {
   Type EventType // "ADDED" / "MODIFIED" / "DELETED" / "BOOKMARK" / "ERROR"
   // 如果 EventType 是 "ADDED"/"MODIFIED",Object 是对象的最新状态;
   // 如果 EventType 是 "DELETED",Object 是对象删除前的状态;
   // 如果 EventType 是 "BOOKMARK",Object 里只有 ResourceVersion 字段被设置,客户端会保证不会收到重复的 event 或者丢失任何一个 event
   Object runtime.Object
}

回过来看调用的 m.loop()
,loop() 方法的逻辑是从 m.incoming 接受消息,然后分发给所有的 watchers

func (m *Broadcaster) loop() {
   for event := range m.incoming { // chan Event 类型,这里的 Event apimachinery/pkg/watch.Event
      if event.Type == internalRunFunctionMarker { // 如果是 fake 类型的 Event,就直接调用其内部 Obj 的方法,下面有具体 internalRunFunctionMarker 逻辑的用途分析,在 blockQueue 方法里会讲到。
         event.Object.(functionFakeRuntimeObject)()
         continue
      }
      m.distribute(event) // 分发逻辑
   }
   m.closeAll()
   m.distributing.Done() // wg.Done()
}

继续看 distribute 逻辑,这里主要是将一个 Event 分发给所有的 watcher

func (m *Broadcaster) distribute(event Event) {
   if m.fullChannelBehavior == DropIfChannelFull { // 默认行为,如果 channel 满了不阻塞,直接丢弃消息
      for _, w := range m.watchers { // map[int64]*broadcasterWatcher
         select {
         case w.result <- event: // 将 event 丢给 broadcasterWatcher.result
         case <-w.stopped:
         default// 当 w.result 满了写不进去时直接继续下一轮循环,区别于 else 里的阻塞行为
         }
      }
   } else {
      for _, w := range m.watchers {
         select {
         case w.result <- event: // result 满了会阻塞
         case <-w.stopped:
         }
      }
   }
}

StartEventWatcher

开头提到的 job 控制器里 events 相关代码里接着两步是这样的:

eventBroadcaster.StartStructuredLogging(0)
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})

这两个函数调用代码都不长,主要逻辑在内部的 StartEventWatcher 中。简单看下两个方法的定义:

// client-go/tools/record/event.go:190
func (e *eventBroadcasterImpl) StartStructuredLogging(verbosity klog.Level) watch.Interface {
   return e.StartEventWatcher(
      func(e *v1.Event) { // 收到一个 Event,直接打印到日志里
         klog.V(verbosity).InfoS("Event occurred""object", klog.KRef(e.InvolvedObject.Namespace, e.InvolvedObject.Name), "kind", e.InvolvedObject.Kind, "apiVersion", e.InvolvedObject.APIVersion, "type", e.Type, "reason", e.Reason, "message", e.Message)
      })
}
// ......
// client-go/tools/record/event.go:291
func (e *eventBroadcasterImpl) StartRecordingToSink(sink EventSink) watch.Interface {
   eventCorrelator := NewEventCorrelatorWithOptions(e.options)
   return e.StartEventWatcher(
      func(event *v1.Event) {
         recordToSink(sink, event, eventCorrelator, e.sleepDuration)
      })
}

我们来看 StartEventWatcher 逻辑,然后回过来聊这两个方法。

func (e *eventBroadcasterImpl) StartEventWatcher(eventHandler func(*v1.Event)watch.Interface {
   watcher := e.Watch() // Watch 过程下面来看
   go func() {
      defer utilruntime.HandleCrash()
      for watchEvent := range watcher.ResultChan() { // 前面 Broadcaster 就是往这里丢的 Event
         event, ok := watchEvent.Object.(*v1.Event) // Event.Object 就是具体的 corev1.Event
         if !ok {
            // This is all local, so there's no reason this should
            // ever happen.
            continue
         }
         eventHandler(event) // 丢给 handler 函数处理
      }
   }()
   return watcher
}

StartEventWatcher()
 方法的入参是一个能处理 Event 的 handler 函数,这里的 Event 是 corev1.Event,也就是我们通过 kubectl 命令具体看到的 Event 资源对象。上面有一个 Watch()
 方法的调用,我们分析下具体内容。

Watch() 方法会 new 一个 watcher,然后加到 m.watchers map 里,返回这个 watcher,这个 watcher 不会接收到历史 events,而且会阻塞到成功加入 Broadcaster 为止。比如 Broadcaster 到 incoming 队列里已经有很多 Event 了,这时候新启动一个 watcher 直接开始工作会收到老消息,下面通过 blockQueue 逻辑实现了只接收新消息的逻辑,具体看下代码:

func (m *Broadcaster) Watch() Interface {
   var w *broadcasterWatcher
   m.blockQueue(func() { // blockQueue() 下面有分析
      id := m.nextWatcher
      m.nextWatcher++
      w = &broadcasterWatcher{
         result:  make(chan Event, m.watchQueueLength),
         stopped: make(chan struct{}),
         id:      id,
         m:       m,
      }
      m.watchers[id] = w
   })
   if w == nil {
      panic("broadcaster already stopped")
   }
   return w
}

blockQueue 用来阻塞 incoming 队列用的,就是往 incoming 里加入一个 fake 的 Event,然后挂起当前 gorouting,直到这个 Event 被处理到为止

func (m *Broadcaster) blockQueue(f func()) {
   select {
   case <-m.stopped:
      return
   default:
   }
   var wg sync.WaitGroup
   wg.Add(1)
   m.incoming <- Event{
      Type: internalRunFunctionMarker, // "internal-do-function"
      Object: functionFakeRuntimeObject(func() {
         defer wg.Done() // 这个 Event 被消费后会调用到 Done()
         f() // 阻塞结束后调用
      }),
   }
   wg.Wait() // 阻塞直到上面加入到 Event 被处理完
}

StartRecordingToSink

讲完了 StartEventWatcher 的逻辑,回过头看一下 StartRecordingToSink 的具体逻辑。StartRecordingToSink 的作用是将从指定 eventBroadcaster 接收到的消息传送到给定的 sink 中去

  • client-go/tools/record/event.go:190

func (e *eventBroadcasterImpl) StartRecordingToSink(sink EventSink) watch.Interface {
   eventCorrelator := NewEventCorrelatorWithOptions(e.options)
   return e.StartEventWatcher(
      func(event *v1.Event) {
         recordToSink(sink, event, eventCorrelator, e.sleepDuration)
      })
}

这里传给 StartEventWatcher()
 方法的 handler 函数是 recordToSink(sink, event, eventCorrelator, e.sleepDuration)
,看下这个 handler 是怎么实现的:

func recordToSink(sink EventSink, event *v1.Event, eventCorrelator *EventCorrelator, sleepDuration time.Duration) {
   // 修改前复制一份,因为一个 event 有多个 listener
   eventCopy := *event
   event = &eventCopy
   result, err := eventCorrelator.EventCorrelate(event) // 聚合处理等,下面会提到
   if err != nil {
      utilruntime.HandleError(err)
   }
   if result.Skip {
      return
   }
   tries := 0
   for {
      // 具体执行将 event 写到 sink 的过程,这里是在 if 的条件里,所以直到成功了才会 break
      if recordEvent(sink, result.Event, result.Patch, result.Event.Count > 1, eventCorrelator) {
         break
      }
      tries++
      if tries >= maxTriesPerEvent { // 最多重试 12 次
         klog.Errorf("Unable to write event '%#v' (retry limit exceeded!)", event)
         break
      }
      // 第一次 sleep 随机一点,避免 apiserver 失联的时候所有 client 一起失败
      if tries == 1 {
        time.Sleep(time.Duration(float64(sleepDuration) * rand.Float64())) // 10s * [0.0,1.0)
      } else {
         time.Sleep(sleepDuration) // 10s
      }
   }
}

recordEvent

  • client-go/tools/record/event.go:238

func recordEvent(sink EventSink, event *v1.Event, patch []byte, updateExistingEvent bool, eventCorrelator *EventCorrelator) bool {
   var newEvent *v1.Event
   var err error
   if updateExistingEvent { // 如果是更新已有的 event,则调用 Patch 方法
      newEvent, err = sink.Patch(event, patch)
   }
   // 更新可能失败,因为这个 event 可能已经被删除了
   if !updateExistingEvent || (updateExistingEvent && util.IsKeyNotFoundError(err)) {
      // 如果是新建,则需要确保 ResourceVersion 为空
      event.ResourceVersion = ""
      newEvent, err = sink.Create(event)
   }
   if err == nil {
      // 更新 eventCorrelator 状态
      eventCorrelator.UpdateState(newEvent)
      return true
   }

   // 连不上 apiserver 等原因引起的失败
   switch err.(type) {
   case *restclient.RequestConstructionError:
      // 这种情况重试也会失败,所以直接返回 true
      klog.Errorf("Unable to construct event '%#v': '%v' (will not retry!)", event, err)
      return true
   case *errors.StatusError: // 服务器端拒绝更新,放弃
      if errors.IsAlreadyExists(err) {
         klog.V(5).Infof("Server rejected event '%#v': '%v' (will not retry!)", event, err)
      } else {
         klog.Errorf("Server rejected event '%#v': '%v' (will not retry!)", event, err)
      }
      return true
   case *errors.UnexpectedObjectError:
   default// http 传输问题,比如失联,需要重试

   }
   klog.Errorf("Unable to write event: '%#v': '%v'(may retry after sleeping)", event, err)
   return false
}

EventSink

接口定义如下

  • client-go/tools/record/event.go:47

type EventSink interface {
   Create(event *v1.Event) (*v1.Event, error)
   Update(event *v1.Event) (*v1.Event, error)
   Patch(oldEvent *v1.Event, data []byte) (*v1.Event, error)
}

实现是

type EventSinkImpl struct {   Interface EventInterface}

回到一开始的用法:eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})

可以看到 Interface 引用的是 kubeClient.CoreV1().Events("")
 ,这里的逻辑就到了 client-go 的 clientset 中了,这里的类型也就是 corev1.EventInterface
,所以上面接口的 Create、Update、Patch 也就都是通过 clientset 来实现的。

EventCorrelator

EventCorrelator 的作用是预处理所有 events,聚合频繁产生的相似的 events,将多次接受到的 events 聚合成一个等,从而降低系统压力。

上面提到一个 eventCorrelator.EventCorrelate() 调用,首先看下对象定义:

  • client-go/tools/record/events_cache.go:405

type EventCorrelator struct {
   // 过滤器
   filterFunc EventFilterFunc
   // 聚合器
   aggregator *EventAggregator
   // 观察器
   logger *eventLogger
}

EventFilterFunc

过滤器主要是限速用的,看一个具体的实现:

  • client-go/tools/record/events_cache.go:129

func (f *EventSourceObjectSpamFilter) Filter(event *v1.Event) bool {
   var record spamRecord
  // eventKey 的结构大概这样:event.Source.(Component+Host)+event.InvolvedObject.(Kind+Namespace+event+Name)+...
   eventKey := f.spamKeyFunc(event)

   f.Lock()
   defer f.Unlock()
   value, found := f.cache.Get(eventKey) // cache 是一个 LRU 缓存
   if found {
      record = value.(spamRecord)
   }

   // 没有限速器就加一个
   if record.rateLimiter == nil {
      // 默认一个 source+object 的 burst 是 25 ,qps 是 1/300(5分钟一个),也就是令牌桶初始容量是 25,然后 5 分钟才会多一个令牌进来
      record.rateLimiter = flowcontrol.NewTokenBucketPassiveRateLimiterWithClock(f.qps, f.burst, f.clock)
   }

   // 看速率是否满足要求
   filter := !record.rateLimiter.TryAccept()
   // 更新 cache
   f.cache.Add(eventKey, record)

   return filter
}

EventAggregator

聚合器的作用是将相似的 events 聚合成一个 event

聚合器定义如下

  • client-go/tools/record/events_cache.go:191

type EventAggregator struct {
   sync.RWMutex
   cache *lru.Cache
   keyFunc EventAggregatorKeyFunc
   messageFunc EventAggregatorMessageFunc
   maxEvents uint            // 当相似 event 数量超过这个最大值时就触发聚合操作,默认是 10
   maxIntervalInSeconds uint // 过了这个间隔的两个相似 event 被认为是一个新的 event,默认 10min
   clock clock.PassiveClock
}

对应的 New 函数

func NewEventAggregator(lruCacheSize int, keyFunc EventAggregatorKeyFunc, messageFunc EventAggregatorMessageFunc,
   maxEvents int, maxIntervalInSeconds int, clock clock.PassiveClock)
 *EventAggregator
 {
   return &EventAggregator{
      cache:                lru.New(lruCacheSize), // 默认 4096
      keyFunc:              keyFunc,
      messageFunc:          messageFunc,
      maxEvents:            uint(maxEvents), // 默认是 10
      maxIntervalInSeconds: uint(maxIntervalInSeconds), // 默认 600s
      clock:                clock,
   }
}

这里的 keyFunc 如下

func EventAggregatorByReasonFunc(event *v1.Event) (stringstring) {
   return strings.Join([]string{
      event.Source.Component,
      event.Source.Host,
      event.InvolvedObject.Kind,
      event.InvolvedObject.Namespace,
      event.InvolvedObject.Name,
      string(event.InvolvedObject.UID),
      event.InvolvedObject.APIVersion,
      event.Type,
      event.Reason,
      event.ReportingController,
      event.ReportingInstance,
   },
      ""), event.Message
}

messageFunc 如下

func EventAggregatorByReasonMessageFunc(event *v1.Event) string {
   return "(combined from similar events): " + event.Message
}

聚合过程在 EventAggregate()
 方法中实现

func (e *EventAggregator) EventAggregate(newEvent *v1.Event) (*v1.Event, string) {
   now := metav1.NewTime(e.clock.Now())
   var record aggregateRecord // 维护了所有接收到过的 event 的 key
   // 计算这个 event 的 key
   eventKey := getEventKey(newEvent)
   // 类似这样 "(combined from similar events): " + event.Message
   aggregateKey, localKey := e.keyFunc(newEvent)

   // 查询 caches 里是否有相似 events 记录
   e.Lock()
   defer e.Unlock()
   value, found := e.cache.Get(aggregateKey)
   if found {
      record = value.(aggregateRecord)
   }

   // 看向下之前的记录是否太旧了,这个事件是 10min,如果太旧了就更新。
   maxInterval := time.Duration(e.maxIntervalInSeconds) * time.Second
   interval := now.Time.Sub(record.lastTimestamp.Time)
   if interval > maxInterval {
      record = aggregateRecord{localKeys: sets.NewString()}
   }

   // 新 event 写入聚合 record 里,并且放到 cache 中
   record.localKeys.Insert(localKey)
   record.lastTimestamp = now
   e.cache.Add(aggregateKey, record)

   // 如果不重复的 events 数量小于10
   if uint(record.localKeys.Len()) < e.maxEvents {
      return newEvent, eventKey
   }

   // 保证 localKeys 不增长,pop 出来一个
   record.localKeys.PopAny()

   // 返回一个聚合后的 event 和对应的“聚合 key”
   eventCopy := &v1.Event{
      ObjectMeta: metav1.ObjectMeta{
         Name:      fmt.Sprintf("%v.%x", newEvent.InvolvedObject.Name, now.UnixNano()),
         Namespace: newEvent.Namespace,
      },
      Count:          1,
      FirstTimestamp: now,
      InvolvedObject: newEvent.InvolvedObject,
      LastTimestamp:  now,
      Message:        e.messageFunc(newEvent),
      Type:           newEvent.Type,
      Reason:         newEvent.Reason,
      Source:         newEvent.Source,
   }
   return eventCopy, aggregateKey
}

eventLogger

观察器做的事情是将一个新产生的 Event 和 LRU 缓存里的做对比,如果 key 一致,也就是两个 Event 表示的信息一样,则更新缓存;如果不一样,就加到缓存里。

  • client-go/tools/record/events_cache.go:315

type eventLogger struct {
   sync.RWMutex
   cache *lru.Cache
   clock clock.PassiveClock
}

观察器有一个 eventObserve() 方法,如果 key 是相同的,这个方法会直接更新已经存在的 event,反之增加一个新的 event

func (e *eventLogger) eventObserve(newEvent *v1.Event, key string) (*v1.Event, []byte, error) {
   var (
      patch []byte
      err   error
   )
   eventCopy := *newEvent // 复制一份
   event := &eventCopy

   e.Lock()
   defer e.Unlock()

   // 检查缓存里是否有需要更新的 event,这里的 key 就是前面提到的 EventAggregatorByReasonFunc() 计算出来的 key
   lastObservation := e.lastEventObservationFromCache(key)

   // 如果发现了需要更新的 event,也就是新的 event 已经存在已经老的和其各个属性都一样,Reason、Message 等都一样,而且属于同一个对象
   if lastObservation.count > 0 {
      // update the event based on the last observation so patch will work as desired
      event.Name = lastObservation.name
      event.ResourceVersion = lastObservation.resourceVersion
      event.FirstTimestamp = lastObservation.firstTimestamp // Event 构造的时候会设置 firstTimestamp 和 lastTimestamp,这里更新了 firstTimestamp
      event.Count = int32(lastObservation.count) + 1 // 计数器加1

      eventCopy2 := *event
      eventCopy2.Count = 0
      eventCopy2.LastTimestamp = metav1.NewTime(time.Unix(00))
      eventCopy2.Message = ""

      newData, _ := json.Marshal(event)
      oldData, _ := json.Marshal(eventCopy2)
      patch, err = strategicpatch.CreateTwoWayMergePatch(oldData, newData, event)
   }

   // 记录新观察到的 Event
   e.cache.Add(
      key,
      eventLog{
         count:           uint(event.Count),
         firstTimestamp:  event.FirstTimestamp,
         name:            event.Name,
         resourceVersion: event.ResourceVersion,
      },
   )
   return event, patch, err
}

EventCorrelator.EventCorrelate()

回到 EventCorrelate() 方法的实现上

  • client-go/tools/record/events_cache.go:510

func (c *EventCorrelator) EventCorrelate(newEvent *v1.Event) (*EventCorrelateResult, error) {
   if newEvent == nil {
      return nil, fmt.Errorf("event is nil")
   }
   aggregateEvent, ckey := c.aggregator.EventAggregate(newEvent) // 聚合器处理
   observedEvent, patch, err := c.logger.eventObserve(aggregateEvent, ckey) // 观察器处理
   if c.filterFunc(observedEvent) { // 过滤器处理
      return &EventCorrelateResult{Skip: true}, nil
   }
   return &EventCorrelateResult{Event: observedEvent, Patch: patch}, err
}

EventRecorder

最后两步是:

recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "job-controller"})
recorder.Eventf(object, v1.EventTypeWarning, FailedCreatePodReason, "Error creating: %v", err)

这里的 NewRecorder 定义如下

func (e *eventBroadcasterImpl) NewRecorder(scheme *runtime.Scheme, source v1.EventSource) EventRecorder {
   return &recorderImpl{scheme, source, e.Broadcaster, clock.RealClock{}}
}

入参是 Scheme 和 EventSource,EventSource 结构很简单:

type EventSource struct {
   // Event 从哪个组件来的,比如:job-controller
   Component string `json:"component,omitempty" protobuf:"bytes,1,opt,name=component"`
   // Event 从哪个节点来的
   Host string `json:"host,omitempty" protobuf:"bytes,2,opt,name=host"`
}

看下 EventRecorder 的定义和实现,接口长这样:

type EventRecorder interface {
   // 这里的 object 是这个 event 相关的那个资源对象;eventtype 是 'Normal/Warning' 这类简单的字符串;reason 表示这个 event 产生的原因,message 是一个更详细的可读性好的描述信息
   Event(object runtime.Object, eventtype, reason, message string)
   // 和 Event() 类似,只是用来 Sprintf
   Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{})
   // Eventf() 基础上加了一个 annotations
   AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{})
}

对应的实现是:

type recorderImpl struct {
   scheme *runtime.Scheme
   source v1.EventSource
   *watch.Broadcaster
   clock clock.PassiveClock
}

recorderImpl.Eventf()

我们写代码时使用最多的 Eventf() 对应的就是 recorderImpl 对象的 Eventf() 方法,接下来看下 Eventf() 的具体实现。

Eventf() 方法只是简单地通过 fmt.Sprintf() 格式化字符串后调用 Event()

  • client-go/tools/record/event.go:354

func (recorder *recorderImpl) Event(object runtime.Object, eventtype, reason, message string) {
    recorder.generateEvent(object, nil, eventtype, reason, message)
}

func (recorder *recorderImpl) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{}) {
   recorder.Event(object, eventtype, reason, fmt.Sprintf(messageFmt, args...))
}

接着逻辑就到了generateEvent() 方法里

func (recorder *recorderImpl) generateEvent(object runtime.Object, annotations map[string]string, eventtype, reason, message string) {
   ref, err := ref.GetReference(recorder.scheme, object) // 获取 object 的 ObjectReference
   if err != nil {
      klog.Errorf("Could not construct reference to: '%#v' due to: '%v'. Will not report event: '%v' '%v' '%v'", object, err, eventtype, reason, message)
      return
   }

   if !util.ValidateEventType(eventtype) { // 校验 eventtype 是 "Normal"/"Warning"
      klog.Errorf("Unsupported event type: '%v'", eventtype)
      return
   }

   event := recorder.makeEvent(ref, annotations, eventtype, reason, message) // 构建 event
   event.Source = recorder.source
   // events 操作不应该阻塞,如果 event 太多的时候直接丢弃,然后打印一个日志
   if sent := recorder.ActionOrDrop(watch.Added, event); !sent {
      klog.Errorf("unable to record event: too many queued events, dropped event %#v", event)
   }
}

这里有两个函数调用:

  • makeEvent()

  • ActionOrDrop()

makeEvent 从名字就能猜到这是构造一个 Event 对象的:

func (recorder *recorderImpl) makeEvent(ref *v1.ObjectReference, annotations map[string]string, eventtype, reason, message string) *v1.Event {
   t := metav1.Time{Time: recorder.clock.Now()}
   namespace := ref.Namespace
   if namespace == "" {
      namespace = metav1.NamespaceDefault
   }
   return &v1.Event{
      ObjectMeta: metav1.ObjectMeta{
         Name:        fmt.Sprintf("%v.%x", ref.Name, t.UnixNano()), // 名字是相关 object 的 Name + 时间戳
         Namespace:   namespace, // 和 ref 在同一个 namespace 下,如果 ref 没有 namespace 就放到 default 下
         Annotations: annotations, // 支持添加 annotations
      },
      InvolvedObject: *ref,
      Reason:         reason,
      Message:        message,
      FirstTimestamp: t,
      LastTimestamp:  t,
      Count:          1,
      Type:           eventtype,
   }
}

ActionOrDrop 的逻辑是尝试往 Broadcaster.incoming channel 中写入 Event,如果失败了就直接 Drop 掉

func (m *Broadcaster) ActionOrDrop(action EventType, obj runtime.Object) bool {
   select {
   case m.incoming <- Event{action, obj}:
      return true
   default:
      return false
   }
}

(转载请保留本文原始链接 https://www.danielhu.cn/post/k8s/event/)




推荐阅读
  • andr ... [详细]
  • 数据库内核开发入门 | 搭建研发环境的初步指南
    本课程将带你从零开始,逐步掌握数据库内核开发的基础知识和实践技能,重点介绍如何搭建OceanBase的开发环境。 ... [详细]
  • 优化ListView性能
    本文深入探讨了如何通过多种技术手段优化ListView的性能,包括视图复用、ViewHolder模式、分批加载数据、图片优化及内存管理等。这些方法能够显著提升应用的响应速度和用户体验。 ... [详细]
  • Explore a common issue encountered when implementing an OAuth 1.0a API, specifically the inability to encode null objects and how to resolve it. ... [详细]
  • 1:有如下一段程序:packagea.b.c;publicclassTest{privatestaticinti0;publicintgetNext(){return ... [详细]
  • 题目描述:给定n个半开区间[a, b),要求使用两个互不重叠的记录器,求最多可以记录多少个区间。解决方案采用贪心算法,通过排序和遍历实现最优解。 ... [详细]
  • 深入解析:手把手教你构建决策树算法
    本文详细介绍了机器学习中广泛应用的决策树算法,通过天气数据集的实例演示了ID3和CART算法的手动推导过程。文章长度约2000字,建议阅读时间5分钟。 ... [详细]
  • 中科院学位论文排版指南
    随着毕业季的到来,许多即将毕业的学生开始撰写学位论文。本文介绍了使用LaTeX排版学位论文的方法,特别是针对中国科学院大学研究生学位论文撰写规范指导意见的最新要求。LaTeX以其精确的控制和美观的排版效果成为许多学者的首选。 ... [详细]
  • Hadoop发行版本选择指南:技术解析与应用实践
    本文详细介绍了Hadoop的不同发行版本及其特点,帮助读者根据实际需求选择最合适的Hadoop版本。内容涵盖Apache Hadoop、Cloudera CDH等主流版本的特性及应用场景。 ... [详细]
  • 深入解析Java枚举及其高级特性
    本文详细介绍了Java枚举的概念、语法、使用规则和应用场景,并探讨了其在实际编程中的高级应用。所有相关内容已收录于GitHub仓库[JavaLearningmanual](https://github.com/Ziphtracks/JavaLearningmanual),欢迎Star并持续关注。 ... [详细]
  • 版本控制工具——Git常用操作(下)
    本文由云+社区发表作者:工程师小熊摘要:上一集我们一起入门学习了git的基本概念和git常用的操作,包括提交和同步代码、使用分支、出现代码冲突的解决办法、紧急保存现场和恢复 ... [详细]
  • 本文探讨了如何在Classic ASP中实现与PHP的hash_hmac('SHA256', $message, pack('H*', $secret))函数等效的哈希生成方法。通过分析不同实现方式及其产生的差异,提供了一种使用Microsoft .NET Framework的解决方案。 ... [详细]
  • 优化SQL Server批量数据插入存储过程的实现
    本文介绍了一种改进的SQL Server存储过程,用于生成批量插入语句。该方法不仅提高了性能,还支持单行和多行模式,适用于SQL Server 2005及以上版本。 ... [详细]
  • 访问一个网页的全过程
    准备:DHCPUDPIP和以太网启动主机,用一根以太网电缆连接到学校的以太网交换机,交换机又与学校的路由器相连.学校的这台路由器与一个ISP链接,此ISP(Intern ... [详细]
  • 本文探讨了在Git子模块目录中运行pre-commit时遇到的错误,并提供了一种通过Docker环境解决此问题的方法。 ... [详细]
author-avatar
ThinkSNS
ThinkSNS(简称TS),一款全平台综合性社交系统,为国内外大中小企业和创业者提供社会化软件研发及技术解决方案,目前最新版本为ThinkSNS+。
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有