## 介绍

controller是IPBlock-Operator-Plus的核心模块,在ipblock_controller.go​中实现。它负责核心业务逻辑的调度和协调,监听 Kubernetes 中 IPBlock 自定义资源的变化,维护和管理其生命周期。它是整个项目的核心部分,协调各个模块协同工作。

Reconcile(调和、协调、解决冲突)​是Kubernetes Operator的核心方法,用于驱动资源的状态的期望一致性,这也是Kubernetes controller的核心理念与任务。对于IPBlock-Operator-Plus而言,它的任务就是处理每一个IPBlock​对象的状态变更、触发动作以及更新状态字段(Status)等核心逻辑。

结构定义

IPBlockReconciler​接口定义

type IPBlockReconciler struct {
client.Client // 客户端通信
Scheme *runtime.Scheme // 序列化和反序列化
Recorder record.EventRecorder // Event记录器
Adapter engine.Adapter // 封禁适配器接口
AdapterName string
GatewayHost string
CmName string
CmNamespace string
Whitelist *policy.Whitelist // ConfigMap读取
mu sync.RWMutex // 读写锁
Notifier notify.Notifier // 通知接口
}

ipblock_controller.go​中实现的函数,如下:

// 处理CRD各种事件的具体业务逻辑, req包含标识当前对象的信息:名称和命名空间
// ctrl 是 controller-runtime的主入口包
// ctrl.Request 表示控制器需要处理的资源对象标识,也就是哪个资源要 Reconcile
// ctrl.Result 控制是否需要重新排队执行该资源,也就是是否需要再次执行
func (r *IPBlockReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {}

// 计算当前Spec的Hash
func HashSpec(spec opsv1.IPBlockSpec) (string, error) {}

// 用于解决对象版本冲突引发的更新问题
func (r *IPBlockReconciler) UpdateIPBlockStatus(ctx context.Context, ipblock *opsv1.IPBlock, updateFn func(*opsv1.IPBlock)) (*opsv1.IPBlock, error) {}

// 定时解封
func (r *IPBlockReconciler) scheduleAutoUnblock(ipblock *opsv1.IPBlock) {}

// 并发安全,通过锁来更新白名单
func (r *IPBlockReconciler) UpdateWhitelist(wl *policy.Whitelist) {}

// 并发安全,通过锁来更新GatewayHost
func (r *IPBlockReconciler) UpdateGatewayHost(newHost string) {}

字段定义

IPBlock定义

type IPBlock struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`

Spec IPBlockSpec `json:"spec,omitempty"`
Status IPBlockStatus `json:"status,omitempty"`
}

// Spec
// 封禁请求
type IPBlockSpec struct {
// INSERT ADDITIONAL SPEC FIELDS - desired state of cluster
// Important: Run "make" to regenerate code after modifying this file

// Foo is an example field of IPBlock. Edit ipblock_types.go to remove/update
Foo string `json:"foo,omitempty"`
IP string `json:"ip"` // 目标IP
Reason string `json:"reason,omitempty"` // 封禁原因
Source string `json:"source,omitempty"` // 封禁来源,如 "alertmanager"、"manual"、"webhook",便于追踪
By string `json:"by,omitempty"` // 谁触发的封禁
Duration string `json:"duration,omitempty"` // 封禁持续时间
Tags []string `json:"tags,omitempty"` // 关键词筛选
Unblock bool `json:"unblock,omitempty"` // 用户显式解封
Trigger bool `json:"trigger,omitempty"` // 用户显式请求重新封禁

}

// Status
// 封禁状态
type IPBlockStatus struct {
// INSERT ADDITIONAL STATUS FIELD - define observed state of cluster
// Important: Run "make" to regenerate code after modifying this file
Result string `json:"result,omitempty"` // success, failed, unblocked
Phase string `json:"phase,omitempty"` // pending, active, expired
BlockedAt string `json:"blockedAt,omitempty"`
UnblockedAt string `json:"unblockedAt,omitempty"`
Message string `json:"message,omitempty"`
LastSpecHash string `json:"lastSpecHash,omitempty"`
}

状态机

phase

phase用来标识 IP 状态阶段

Phase 含义说明
pending 初始化阶段(默认值)
active 已封禁状态
expired 已解封(自动或手动)
skipped 跳过(如白名单命中)
failed 操作失败,如封禁失败、非法配置等

phase状态机.drawio

状态流转逻辑说明
初始状态:pending
  • 创建时设为 pending

  • 若命中白名单 → 设为 skipped

  • 若未命中白名单,尝试封禁:

    • 成功 → active
    • 失败 → failed
白名单状态:skipped
  • 表示已跳过处理
  • 幂等逻辑下跳过执行(不重复封禁)
封禁中状态:active
  • 表示 IP 当前已封禁
  • 若设定 Duration​ 且未过期 → 等待时间结束后由 goroutine 调用 scheduleAutoUnblock​ 解封 → expired
  • 若手动触发 Spec.Trigger = true​ → 重新执行封禁流程
  • 若手动设置 Spec.Unblock = true​ → 执行解封操作 → expired
已解封状态:expired
  • 解封成功后设置为 expired
  • 表示资源生命周期已闭环(可留待 GC)
失败状态:failed
  • 封禁失败或非法配置(如 duration​ 解析失败)
  • 写入 .Status.Result = failed​,并通过 notifier 通知错误

result

Result 值 含义
success 操作成功(封禁或解封成功)
failed 操作失败(封禁或解封失败)
unblocked 已解封(通常对应解封动作完成)
skipped 跳过操作(如白名单、重复跳过)
(空字符串) 尚未执行操作或初始状态

result状态机.drawio

Reconcile具体实现

reconcile主要做了五件事:

  1. 如果有手动解封,优先处理
  2. 白名单跳过(只在非 trigger 情况下判断)
  3. 手动强制封禁
  4. 幂等判断(状态无变化 + 未触发)
  5. 封禁操作

手动解封处理

spec​ 表示用户期望的状态,(想要什么)
status​ 表示控制器观察到的实际状态。(现在是什么)

大致流程是:

  • 读取spec.Unblock == true​,表明用户意图要解封
  • 执行解封逻辑 ——> 解封成功
  • 更新status.result == unblocked​,记录了解封完成
  • 然后要把spec.unblock​更新为false​,否则下次Reconcile又会重复解封,造成混乱

主要在判断unblocked​是否为true​。

预处理:

首先需要通过r.Get​去集群中获取名为 req.Name​ 的 IPBlock​ 资源,看看有没有这个资源。

有的话初始化phase​为pending​,使用r.Update​来更新。

手动解封判断:

首先判断当前资源的result​是不是unblocked​,是的话跳过。

不是的话,就需要用r.Adapter.Unban(ip)​方法来解封ip,解封成功或失败都会用r.UpdateIPBlockStatus​来更新资源的状态(Status),然后通过 Notify 进行通知。

最后利用r.Patch​更新ipblock.Spec.Unblock​为false

白名单跳过

首先判断白名单是否存在 && IP 是否在白名单中,命中(即phase == skipped​),则更新status.phase​和status.result​为skipped

手动强制封禁

spec.trigger == true​时,控制器在 Reconcile 中会识别到这一标志,先将其重置为false​,并设置 triggered = true​。
随后通过r.Update(ctx, &ipblock)​更新资源对象,从而触发一次新的 Reconcile​。
在下一轮 Reconcile 中,控制器由于检测到 triggered == true​(即使 spec 内容未变、哈希一致),也会跳过幂等判断,进入并执行 Step 5 中的封禁操作。

这么做实现了一种 “按钮行为” ,不在这里直接调用Ban()​方法是出于幂等性 + 声明式控制 + 最小变更原理考虑。

原因 说明
幂等性 封禁操作必须避免重复执行 → 所以控制器通过 hash 和triggered​判断
声明式控制 控制器行为应由spec​驱动 → 用户设置trigger: true​即可,不需要命令式调用函数
对齐 controller-runtime 模式 所有资源状态变更由 Reconcile 驱动,包括 trigger 的清除

幂等判断

这里通过HashSpec()​来进行哈希,也就是对Spec整个部分进行哈希编码

func HashSpec(spec opsv1.IPBlockSpec) (string, error) {
b, err := json.Marshal(spec)
if err != nil {
return "", err
}

h := sha256.Sum256(b)
return fmt.Sprintf("%x", h), nil
}

控制器通过计算当前 IPBlock 的 spec​ 字段的哈希值(HashSpec(ipblock.Spec)​),与之前记录在 status.lastSpecHash​ 中的哈希值进行对比,从而判断用户配置是否发生变更。

  • 如果一致(即 LastSpecHash == currentHash​),且 Phase != pending​,并且没有设置 trigger == true​,说明:

    • 用户未修改spec
    • 控制器已经执行过封禁或跳过处理
    • 当前状态与期望一致

    因此就会跳过本次处理,保证了控制器的幂等性,避免了重复封禁

  • 若哈希值发生变化或设置了 trigger == true​,说明用户期望重新执行封禁逻辑,此时控制器将继续执行 Step 5 封禁操作。

封禁操作

在 Step 5 中,控制器根据 spec.Duration​ 字段判断封禁时长:

  • 如果 Duration​ 为空字符串,表示永久封禁,设置 isPermanent = true​,并不传递具体时长;

  • 如果 Duration​ 非空,则调用 time.ParseDuration​ 解析该字符串为具体时长 banSeconds​(秒数);

    • 如果解析失败,则更新状态为失败,并返回错误;

随后调用适配器接口 r.Adapter.Ban(ip, isPermanent, banSeconds)​ 执行封禁操作。

  • 若封禁失败,更新状态为失败,并通过Notify发送错误通知;
  • 若封禁成功,更新状态为成功,包括更新时间戳、记录最后的 spec 哈希等信息,并通过通知器发送封禁成功通知。

最后,若为非永久封禁,且未曾自动解封,则启动一个协程异步执行自动解封计划。

对于自动解封函数scheduleAutoUnblock​,主要思路就是time.Sleep(传入的时间)​阻塞当前协程等待这段时间(即封禁到期时间),然后执行r.Adapter.UnBan(ipblock.Spec.IP)​,对于成功/失败,更新状态并通知。

状态更新函数

func (r *IPBlockReconciler) UpdateIPBlockStatus(ctx context.Context, ipblock *opsv1.IPBlock, updateFn func(*opsv1.IPBlock)) (*opsv1.IPBlock, error) {}

目的是为了安全地更新Status字段,特别解决并发更新是的版本冲突问题

  • 定义一个变量 latest​,用来存储每次重试时获取的最新 IPBlock 对象。

  • 调用 retry.RetryOnConflict(retry.DefaultBackoff, func() error {...})​,该函数会捕获因为版本冲突导致的更新失败,并自动按照默认重试策略重试。

  • 在重试函数内部:

    • 先调用 r.Get(...)​ 从 API Server 获取该 IPBlock 的最新版本,避免基于过时版本更新造成冲突。
    • 调用传入的回调函数 updateFn(&latest)​,让调用方修改 latest​ 对象的 Status 字段。
    • 调用 r.Status().Update(ctx, &latest)​ 仅更新对象的 Status 子资源(符合 Kubernetes 设计,Status 和 Spec 可分开更新)。
  • 如果 Update​ 出错且是版本冲突错误,RetryOnConflict​ 会自动重试,直到成功或超过最大重试次数。

  • 如果最终失败,记录错误日志。

  • 返回最新状态的对象指针和错误。

通知模板与动态参数替换

这里主要使用的 Lark 的 card json 模板,将需要替换的部分写成变量形式。

son
...
"elements": [
{
"tag": "markdown",
"content": "<font color=\"grey\">告警内容</font>\n**${ip}**\n 发起请求过于频繁(1分钟内 **${count}** 次)",
"i18n_content": {
"en_us": "<font color=\"grey\">Alert details</font>\nMobile client crash rate at 5%"
},
...

然后在使用时,动态传入参数即可。(模板类型,传递参数)

if r.Notifier != nil {
logger.Info("Notifier found, sending ban notification", "ip", ip)
err := r.Notifier.Notify(ctx, "ban", map[string]string{
"alarm_time": time.Now().Format("2006-01-02 15:04:05"),
"ip": ip,
"count": countExtra(ipblock.Spec.Reason), // 这里填实际count
})
if err != nil {
logger.Error(err, "发送封禁通知失败", "ip", ip)
} else {
logger.Info("发送封禁通知成功", "ip", ip)
}
} else {
logger.Info("Notifier is nil, skipping ban notification")
}

ConfigMap热更新

main.go​中watchConfigMap​实现。

func watchConfigMap(ctx context.Context, mgr ctrl.Manager, reconciler *controller.IPBlockReconciler) {}
  1. 通过 mgr.GetCache().GetInformer(ctx, &corev1.ConfigMap{})​ 获取 ConfigMap 的 Informer
  • Informer 是 controller-runtime 提供的机制,底层封装了 Kubernetes 的缓存和事件监听功能。

  • 它会监听集群中 ConfigMap 资源的新增、更新、删除事件,触发相应回调函数。

  1. 通过 AddEventHandler​ 注册事件处理函数,即注册一个”当资源发生变化时需要执行的函数”
  • AddFunc:当监听的 ConfigMap 新建时触发。
  • UpdateFunc:当监听的 ConfigMap 更新时触发。
  1. 具体处理逻辑:

a. 读取并更新关键配置字段

  • gatewayHost​:网关地址,变更时调用 reconciler.UpdateGatewayHost​ 动态更新。

  • 白名单:调用 config.LoadWhitelistFromConfigMap​ 加载最新白名单,更新 reconciler.UpdateWhitelist​(并发安全更新)。

  • 适配器名称(engine​ 字段):更新适配器实例。

b. 触发器(Triggers)加载和重启

  • 从 ConfigMap 的 trigger​ 字段加载触发器配置。

  • 停止所有旧触发器,重新创建并注册新触发器,最后启动它们。

c. 通知(Notify)配置加载

  • 读取通知类型、Webhook URL 和模板。

  • 动态创建新的通知实例(如飞书 LarkNotify),替换旧实例,实现通知配置的热切换。

并发处理

IPBlock-Operator-Plus在实际使用中可能会出现并发问题,主要会出现以下三种情况:

场景举例:

  • 单 Trigger 重复报警:Grafana 配置有误,1分钟内对同一 IP 发出10次报警。
  • 多个 Trigger 同时触发:比如 Prometheus 和 Grafana 同时监控 TCP 和 HTTP 流量,触发了对同一 IP 的告警。
  • 多个 IP 并发告警:高峰期系统受到 SYN Flood 攻击,多个 IP 同时发起连接,产生大量告警。
  1. 如果单Trigger频繁报警,POST了多个Webhook,可能会出现重复CR资源。

  2. 如果有多个Trigger,多个Trigger通知了同一个 IP,也可能出现重复CR资源,或者出现冲突。

  3. 多个Trigger通知不同 IP,可能出现资源竞争或冲突。

针对这三种情况,在internal/utils​下添加了一些工具专门处理这几种并发问题。

  • debouncer.go​:定义 Debouncer 接口,拥有一个ShouldAllow(key string) bool​方法,用于决定是否允许本次处理。
  • lru_debouncer.go​:Debouncer 的接口具体实现。利用LRU处理Webhook风暴,避免因为大量 Webhook 通知同一个 IP,导致出现大量重复的 CR 资源。
  • gen_name.go​:对 IPBlock CR 名进行唯一性标识,避免多个 Trigger 通知同一个 IP,导致出现大量重复的 CR 资源。
  • IPLock.go​:用于管理 IP 维度的并发锁,避免多个 Trigger 并发冲突或重复创建 CR ,在同一时间只允许一个 Trigger (或一个 goroutine)处理同一个 IP。

LRU 防抖机制

确保同一个 IP 只会有一个资源,防止 Webhook 风暴。

LRU(Least Recently Used)缓存淘汰策略,是一种缓存淘汰机制。它会淘汰最近最少被访问的数据,腾出空间给新数据。

具体是通过维护两个链表:活跃链表和非活跃链表实现的。

有新数据访问时,把新数据插入非活跃链表的尾部。

如果访问了非活跃链表中的数据,就把这个数据放在活跃链表的头部。

如果访问了活跃链表中的数据,就把这个数据放在活跃链表的头部。

如果缓存满了,再插入时,淘汰位于链表最后的数据。

逻辑保存在utils/lru_debouncer.go​中,这里是对 Debouncer 接口的实现。主要有两个方法:

// 使用"k8s.io/utils/lru"包,生成一个 LRU 防抖器。
func NewLRUDebouncer(size int, ttl time.Duration) *lruDebouncer {}

// 检查当前行为是否合法。
func (d *lruDebouncer) ShouldAllow(ip string) bool {}

核心方法是ShouldAllow​,这个方法做了下面几件事:

  1. 给 cache 加上互斥锁,防止并发请求中对其访问和修改。
  2. 在缓存中检查是否有该 IP 的记录
  3. 有的话,就检查它上次触发时间 ts 和现在 time.Since(ts) 的间隔。
  4. 如果间隔时间小于设定的 TTL,说明是重复触发了,就返回 false,拒绝本次操作。

然后在internal/trigger/grafana.go​中应用防抖(在创建 CR 之前)

// 防抖,防止重复创建 CR
if !g.Debouncer.ShouldAllow(ip) {
logger.Info("[grafana] Skip duplicate IPBlock within TTL", "ip", ip)
return
}

最后在main.go​中,注册防抖器。

我们定义了防抖频率规则:

  • 对于不同 IP,LRU 中最多保存1000个IP,达到1000个后会自动淘汰最近最少使用的 IP。
  • 对于同一个IP,如果最近60s内发生过一次封禁,那么这60s内再次收到该IP的相同请求时,会被防抖识别为重复。
// 选择触发器
func CreateTriggerByConfig(cfg TriggerConfig, mgr ctrl.Manager) trigger.Trigger {
switch cfg.Name {
case "grafana":
return &trigger.GrafanaTrigger{
Client: mgr.GetClient(),
Addr: cfg.Addr,
Path: cfg.Path,
// 1000 个 IP, 60 秒防抖
Debouncer: utils.NewLRUDebouncer(1000, 60*time.Second),
IPLocker: utils.NewIPLock(),
}
// TODO 其他触发器 ...
default:
return nil
}
}

唯一性 CR 名称

确保同一个 IP 只会有一个资源。

既然可能会出现重复的 CR 资源,那我们就通过一些特征来让每个 CR 唯一化,这样多个 Trigger 在上报相同 IP 时,生成的 CR 名也是相同的。K8s不会出现相同名的 CR ,这样就有效避免了大量相同 CR 消耗资源的并发问题。

逻辑保存在utils/gen_name.go​中,主要是对 IP 进行 md5 hash,取8位,然后和ipblock-​进行拼接。

func GenCRName(ip string) string {
hash := md5.Sum([]byte(ip))
return "ipblock-" + hex.EncodeToString(hash[:8]) // 16位
}

然后我们在internal/trigger/grafana.go​中应用,在创建 CR 之前,先判断 CR 是否已经存在(g.Client.Get()​ + apierrors.IsNotFound(err)​)。

crName := utils.GenCRName(ip)

// 先判断 CR 是否已经存在
var existing opsv1.IPBlock
err := g.Client.Get(context.Background(), client.ObjectKey{
Name: crName,
Namespace: "default",
}, &existing)
if err == nil {
logger.Info("[grafana] IPBlock already exists, skip creation", "ip", ip)
return
}

// 如果 NotFound,继续创建
if err != nil && !apierrors.IsNotFound(err) {
logger.Error(err, "[grafana] Error checking IPBlock existence", "ip", ip)
return
}

IP锁机制

确保同一时间只允许一个 Trigger (或一个 goroutine)处理同一个 IP 的封禁逻辑。

假设同一时刻,这两个 trigger 几乎同时收到了针对 IP 1.2.3.4​ 的 webhook:

  • TriggerA 开始处理:打算创建一个 IPBlock​ 资源
  • TriggerB 也收到相同 IP 的告警,也想创建一个同样的 IPBlock

如果没有加锁:

  • 两个 trigger 可能几乎同时进入了 Client.Create(...)
  • 结果 Kubernetes 可能报错 AlreadyExists
  • 状态可能冲突,日志也会乱

逻辑保存在utils/IPLock.go​中,主要做三件事:

type IPLock struct {
global sync.Mutex // 保护 locks map
locks map[string]*sync.Mutex // 全局锁,用来保护 locks 这个 map,防止并发读写它时发生竞态。
}
// 新建一个锁管理器
func NewIPLock() *IPLock {}

// 获取某个 IP 的锁
func (i *IPLock) Lock(ip string) {}

// 释放某个 IP 的锁
func (i *IPLock) Unlock(ip string) {}

核心是Lock​,它做了这几件事:

  1. 新建一个全局锁,防止多个 goroutine 访问 map 出现问题。
  2. 查看该 IP 是否有对应的锁,没有就新建一个锁,并加入map中。
  3. 再释放全局锁。
  4. 最后调用具体 IP 的lock.Lock()​,阻塞等待获取锁。

然后应用在internal/trigger/grafana.go​,对预警循环时,会循环获取到每个 IP,这里我们使用匿名函数,为了确保每个 IP 的锁在处理完成后能及时释放,避免因 defer​ 延迟释放导致的锁阻塞,我们为每个 IP 的处理逻辑包裹一个匿名函数:

// 避免并发时的竞争和死锁
func(ip string) {
g.IPLocker.Lock(ip)
defer g.IPLocker.Unlock(ip)
...
}(ip)

这样设计的好处是每轮循环的 defer 都会在当前匿名函数结束时释放锁,避免多个 IP 的处理互相阻塞,保证并发安全。