核心功能模块
controller是IPBlock-Operator-Plus的核心模块,在ipblock_controller.go中实现。它负责核心业务逻辑的调度和协调,监听 Kubernetes 中 IPBlock 自定义资源的变化,维护和管理其生命周期。它是整个项目的核心部分,协调各个模块协同工作。
Reconcile(调和、协调、解决冲突)是Kubernetes Operator的核心方法,用于驱动资源的状态的期望一致性,这也是Kubernetes controller的核心理念与任务。对于IPBlock-Operator-Plus而言,它的任务就是处理每一个IPBlock对象的状态变更、触发动作以及更新状态字段(Status)等核心逻辑。
结构定义
IPBlockReconciler接口定义
type IPBlockReconciler struct { |
ipblock_controller.go中实现的函数,如下:
// 处理CRD各种事件的具体业务逻辑, req包含标识当前对象的信息:名称和命名空间 |
字段定义
IPBlock定义
type IPBlock struct { |
状态机
phase
phase用来标识 IP 状态阶段
| Phase | 含义说明 |
|---|---|
pending |
初始化阶段(默认值) |
active |
已封禁状态 |
expired |
已解封(自动或手动) |
skipped |
跳过(如白名单命中) |
failed |
操作失败,如封禁失败、非法配置等 |

状态流转逻辑说明
初始状态:pending
创建时设为
pending若命中白名单 → 设为
skipped若未命中白名单,尝试封禁:
- 成功 →
active - 失败 →
failed
- 成功 →
白名单状态:skipped
- 表示已跳过处理
- 幂等逻辑下跳过执行(不重复封禁)
封禁中状态:active
- 表示 IP 当前已封禁
- 若设定
Duration 且未过期 → 等待时间结束后由 goroutine 调用scheduleAutoUnblock 解封 →expired - 若手动触发
Spec.Trigger = true → 重新执行封禁流程 - 若手动设置
Spec.Unblock = true → 执行解封操作 →expired
已解封状态:expired
- 解封成功后设置为
expired - 表示资源生命周期已闭环(可留待 GC)
失败状态:failed
- 封禁失败或非法配置(如
duration 解析失败) - 写入
.Status.Result = failed,并通过 notifier 通知错误
result
| Result 值 | 含义 |
|---|---|
success |
操作成功(封禁或解封成功) |
failed |
操作失败(封禁或解封失败) |
unblocked |
已解封(通常对应解封动作完成) |
skipped |
跳过操作(如白名单、重复跳过) |
| (空字符串) | 尚未执行操作或初始状态 |

Reconcile具体实现
reconcile主要做了五件事:
- 如果有手动解封,优先处理
- 白名单跳过(只在非 trigger 情况下判断)
- 手动强制封禁
- 幂等判断(状态无变化 + 未触发)
- 封禁操作
手动解封处理
spec 表示用户期望的状态,(想要什么)
status 表示控制器观察到的实际状态。(现在是什么)大致流程是:
- 读取
spec.Unblock == true,表明用户意图要解封- 执行解封逻辑 ——> 解封成功
- 更新
status.result == unblocked,记录了解封完成- 然后要把
spec.unblock更新为false,否则下次Reconcile又会重复解封,造成混乱
主要在判断unblocked是否为true。
预处理:
首先需要通过r.Get去集群中获取名为 req.Name 的 IPBlock 资源,看看有没有这个资源。
有的话初始化phase为pending,使用r.Update来更新。
手动解封判断:
首先判断当前资源的result是不是unblocked,是的话跳过。
不是的话,就需要用r.Adapter.Unban(ip)方法来解封ip,解封成功或失败都会用r.UpdateIPBlockStatus来更新资源的状态(Status),然后通过 Notify 进行通知。
最后利用r.Patch更新ipblock.Spec.Unblock为false
白名单跳过
首先判断白名单是否存在 && IP 是否在白名单中,命中(即phase == skipped),则更新status.phase和status.result为skipped
手动强制封禁
当 spec.trigger == true时,控制器在 Reconcile 中会识别到这一标志,先将其重置为false,并设置 triggered = true。
随后通过r.Update(ctx, &ipblock)更新资源对象,从而触发一次新的 Reconcile。
在下一轮 Reconcile 中,控制器由于检测到 triggered == true(即使 spec 内容未变、哈希一致),也会跳过幂等判断,进入并执行 Step 5 中的封禁操作。
这么做实现了一种 “按钮行为” ,不在这里直接调用Ban()方法是出于幂等性 + 声明式控制 + 最小变更原理考虑。
| 原因 | 说明 |
|---|---|
| 幂等性 | 封禁操作必须避免重复执行 → 所以控制器通过 hash 和triggered判断 |
| 声明式控制 | 控制器行为应由spec驱动 → 用户设置trigger: true即可,不需要命令式调用函数 |
| 对齐 controller-runtime 模式 | 所有资源状态变更由 Reconcile 驱动,包括 trigger 的清除 |
幂等判断
这里通过HashSpec()来进行哈希,也就是对Spec整个部分进行哈希编码
func HashSpec(spec opsv1.IPBlockSpec) (string, error) { |
控制器通过计算当前 IPBlock 的 spec 字段的哈希值(HashSpec(ipblock.Spec)),与之前记录在 status.lastSpecHash 中的哈希值进行对比,从而判断用户配置是否发生变更。
如果一致(即
LastSpecHash == currentHash),且Phase != pending,并且没有设置trigger == true,说明:- 用户未修改
spec - 控制器已经执行过封禁或跳过处理
- 当前状态与期望一致
因此就会跳过本次处理,保证了控制器的幂等性,避免了重复封禁
- 用户未修改
若哈希值发生变化或设置了
trigger == true,说明用户期望重新执行封禁逻辑,此时控制器将继续执行 Step 5 封禁操作。
封禁操作
在 Step 5 中,控制器根据 spec.Duration 字段判断封禁时长:
如果
Duration 为空字符串,表示永久封禁,设置isPermanent = true,并不传递具体时长;如果
Duration 非空,则调用time.ParseDuration 解析该字符串为具体时长banSeconds(秒数);- 如果解析失败,则更新状态为失败,并返回错误;
随后调用适配器接口 r.Adapter.Ban(ip, isPermanent, banSeconds) 执行封禁操作。
- 若封禁失败,更新状态为失败,并通过Notify发送错误通知;
- 若封禁成功,更新状态为成功,包括更新时间戳、记录最后的 spec 哈希等信息,并通过通知器发送封禁成功通知。
最后,若为非永久封禁,且未曾自动解封,则启动一个协程异步执行自动解封计划。
对于自动解封函数scheduleAutoUnblock,主要思路就是time.Sleep(传入的时间)阻塞当前协程等待这段时间(即封禁到期时间),然后执行r.Adapter.UnBan(ipblock.Spec.IP),对于成功/失败,更新状态并通知。
状态更新函数
func (r *IPBlockReconciler) UpdateIPBlockStatus(ctx context.Context, ipblock *opsv1.IPBlock, updateFn func(*opsv1.IPBlock)) (*opsv1.IPBlock, error) {} |
目的是为了安全地更新Status字段,特别解决并发更新是的版本冲突问题
定义一个变量
latest,用来存储每次重试时获取的最新 IPBlock 对象。调用
retry.RetryOnConflict(retry.DefaultBackoff, func() error {...}),该函数会捕获因为版本冲突导致的更新失败,并自动按照默认重试策略重试。在重试函数内部:
- 先调用
r.Get(...) 从 API Server 获取该 IPBlock 的最新版本,避免基于过时版本更新造成冲突。 - 调用传入的回调函数
updateFn(&latest),让调用方修改latest 对象的 Status 字段。 - 调用
r.Status().Update(ctx, &latest) 仅更新对象的 Status 子资源(符合 Kubernetes 设计,Status 和 Spec 可分开更新)。
- 先调用
如果
Update 出错且是版本冲突错误,RetryOnConflict 会自动重试,直到成功或超过最大重试次数。如果最终失败,记录错误日志。
返回最新状态的对象指针和错误。
通知模板与动态参数替换
这里主要使用的 Lark 的 card json 模板,将需要替换的部分写成变量形式。
... |
然后在使用时,动态传入参数即可。(模板类型,传递参数)
if r.Notifier != nil { |
ConfigMap热更新
在main.go中watchConfigMap实现。
func watchConfigMap(ctx context.Context, mgr ctrl.Manager, reconciler *controller.IPBlockReconciler) {} |
- 通过
mgr.GetCache().GetInformer(ctx, &corev1.ConfigMap{}) 获取 ConfigMap 的 Informer
Informer 是 controller-runtime 提供的机制,底层封装了 Kubernetes 的缓存和事件监听功能。
它会监听集群中 ConfigMap 资源的新增、更新、删除事件,触发相应回调函数。
- 通过
AddEventHandler 注册事件处理函数,即注册一个”当资源发生变化时需要执行的函数”
- AddFunc:当监听的 ConfigMap 新建时触发。
- UpdateFunc:当监听的 ConfigMap 更新时触发。
- 具体处理逻辑:
a. 读取并更新关键配置字段
gatewayHost:网关地址,变更时调用reconciler.UpdateGatewayHost 动态更新。白名单:调用
config.LoadWhitelistFromConfigMap 加载最新白名单,更新reconciler.UpdateWhitelist(并发安全更新)。适配器名称(
engine 字段):更新适配器实例。
b. 触发器(Triggers)加载和重启
从 ConfigMap 的
trigger 字段加载触发器配置。停止所有旧触发器,重新创建并注册新触发器,最后启动它们。
c. 通知(Notify)配置加载
读取通知类型、Webhook URL 和模板。
动态创建新的通知实例(如飞书 LarkNotify),替换旧实例,实现通知配置的热切换。
并发处理
IPBlock-Operator-Plus在实际使用中可能会出现并发问题,主要会出现以下三种情况:
场景举例:
- 单 Trigger 重复报警:Grafana 配置有误,1分钟内对同一 IP 发出10次报警。
- 多个 Trigger 同时触发:比如 Prometheus 和 Grafana 同时监控 TCP 和 HTTP 流量,触发了对同一 IP 的告警。
- 多个 IP 并发告警:高峰期系统受到 SYN Flood 攻击,多个 IP 同时发起连接,产生大量告警。
如果单Trigger频繁报警,POST了多个Webhook,可能会出现重复CR资源。
如果有多个Trigger,多个Trigger通知了同一个 IP,也可能出现重复CR资源,或者出现冲突。
多个Trigger通知不同 IP,可能出现资源竞争或冲突。
针对这三种情况,在internal/utils下添加了一些工具专门处理这几种并发问题。
debouncer.go:定义 Debouncer 接口,拥有一个ShouldAllow(key string) bool方法,用于决定是否允许本次处理。lru_debouncer.go:Debouncer 的接口具体实现。利用LRU处理Webhook风暴,避免因为大量 Webhook 通知同一个 IP,导致出现大量重复的 CR 资源。gen_name.go:对 IPBlock CR 名进行唯一性标识,避免多个 Trigger 通知同一个 IP,导致出现大量重复的 CR 资源。IPLock.go:用于管理 IP 维度的并发锁,避免多个 Trigger 并发冲突或重复创建 CR ,在同一时间只允许一个 Trigger (或一个 goroutine)处理同一个 IP。
LRU 防抖机制
确保同一个 IP 只会有一个资源,防止 Webhook 风暴。
LRU(Least Recently Used)缓存淘汰策略,是一种缓存淘汰机制。它会淘汰最近最少被访问的数据,腾出空间给新数据。
具体是通过维护两个链表:活跃链表和非活跃链表实现的。
有新数据访问时,把新数据插入非活跃链表的尾部。
如果访问了非活跃链表中的数据,就把这个数据放在活跃链表的头部。
如果访问了活跃链表中的数据,就把这个数据放在活跃链表的头部。
如果缓存满了,再插入时,淘汰位于链表最后的数据。
逻辑保存在utils/lru_debouncer.go中,这里是对 Debouncer 接口的实现。主要有两个方法:
// 使用"k8s.io/utils/lru"包,生成一个 LRU 防抖器。 |
核心方法是ShouldAllow,这个方法做了下面几件事:
- 给 cache 加上互斥锁,防止并发请求中对其访问和修改。
- 在缓存中检查是否有该 IP 的记录
- 有的话,就检查它上次触发时间 ts 和现在 time.Since(ts) 的间隔。
- 如果间隔时间小于设定的 TTL,说明是重复触发了,就返回 false,拒绝本次操作。
然后在internal/trigger/grafana.go中应用防抖(在创建 CR 之前)
// 防抖,防止重复创建 CR |
最后在main.go中,注册防抖器。
我们定义了防抖频率规则:
- 对于不同 IP,LRU 中最多保存1000个IP,达到1000个后会自动淘汰最近最少使用的 IP。
- 对于同一个IP,如果最近60s内发生过一次封禁,那么这60s内再次收到该IP的相同请求时,会被防抖识别为重复。
// 选择触发器 |
唯一性 CR 名称
确保同一个 IP 只会有一个资源。
既然可能会出现重复的 CR 资源,那我们就通过一些特征来让每个 CR 唯一化,这样多个 Trigger 在上报相同 IP 时,生成的 CR 名也是相同的。K8s不会出现相同名的 CR ,这样就有效避免了大量相同 CR 消耗资源的并发问题。
逻辑保存在utils/gen_name.go中,主要是对 IP 进行 md5 hash,取8位,然后和ipblock-进行拼接。
func GenCRName(ip string) string { |
然后我们在internal/trigger/grafana.go中应用,在创建 CR 之前,先判断 CR 是否已经存在(g.Client.Get() + apierrors.IsNotFound(err))。
crName := utils.GenCRName(ip) |
IP锁机制
确保同一时间只允许一个 Trigger (或一个 goroutine)处理同一个 IP 的封禁逻辑。
假设同一时刻,这两个 trigger 几乎同时收到了针对 IP
1.2.3.4 的 webhook:
- TriggerA 开始处理:打算创建一个
IPBlock 资源- TriggerB 也收到相同 IP 的告警,也想创建一个同样的
IPBlock如果没有加锁:
- 两个 trigger 可能几乎同时进入了
Client.Create(...)- 结果 Kubernetes 可能报错
AlreadyExists- 状态可能冲突,日志也会乱
逻辑保存在utils/IPLock.go中,主要做三件事:
type IPLock struct { |
// 新建一个锁管理器 |
核心是Lock,它做了这几件事:
- 新建一个全局锁,防止多个 goroutine 访问 map 出现问题。
- 查看该 IP 是否有对应的锁,没有就新建一个锁,并加入map中。
- 再释放全局锁。
- 最后调用具体 IP 的
lock.Lock(),阻塞等待获取锁。
然后应用在internal/trigger/grafana.go,对预警循环时,会循环获取到每个 IP,这里我们使用匿名函数,为了确保每个 IP 的锁在处理完成后能及时释放,避免因 defer 延迟释放导致的锁阻塞,我们为每个 IP 的处理逻辑包裹一个匿名函数:
// 避免并发时的竞争和死锁 |
这样设计的好处是每轮循环的 defer 都会在当前匿名函数结束时释放锁,避免多个 IP 的处理互相阻塞,保证并发安全。
