
不能靠 sleep 轮询,也不能全量重算——增量处理的核心是位点(position)和状态快照(state)。
为什么 time.Sleep 轮询在大规模数据下必然失败
轮询看似简单,但面对高写入密度或不均匀延迟时,会直接漏数据或重复消费。比如上游 1 秒内写入 200 条记录,而你的 time.Sleep(5 * time.Second) 固定间隔,中间几批变更就永远丢失;更糟的是刚查完就插入一条,这条要等下一个周期才被拉取,滞留整整 5 秒。
真正可用的增量必须满足三个条件:可比较、单调递增、能精确定位到某条记录之后的位置。常见位点包括:
MySQL:binlog position(推荐)、updated_at + id 复合条件(需注意时钟漂移)PostgreSQL:LSN 或 logical replication slot(搭配 wal2json 插件更易解析)通用兜底:自增 id(仅限单写、无软删/硬删、无 update 覆盖场景)
用 go-mysql-org/go-mysql 做 MySQL binlog 增量监听
这个库封装了 MySQL 的 binlog dump 协议,比手动解析 SHOW BINLOG EVENTS 稳定得多,且原生支持断线重连与位点续传。
立即学习“go语言免费学习笔记(深入)”;
关键配置和避坑点:
MySQL 必须开启 binlog,且 binlog_format = ROW(否则拿不到行级变更)server_id 必须全局唯一,否则 MySQL 会拒绝连接首次启动建议从 SHOW MASTER STATUS 拿最新 File 和 Pos,存到本地文件或 etcd;重启时读该位点,调用 Syncer.StartSync(pos)收到 *replication.RowsEvent 后,务必先判空再用:if x.Table != nil && x.Action != 0,有些 event 是 DDL 或空事件
示例片段(简化):
syncer := replication.NewBinlogSyncer(&replication.BinlogSyncerConfig{ ServerID: 1001, Flavor: "mysql", Host: "127.0.0.1", Port: 3306, User: "repl", Password: "xxx",})streamer, _ := syncer.StartSync(mysql.Position{File: "mysql-bin.000001", Pos: 4})for { ev, _ := streamer.GetEvent(context.Background()) switch x := ev.Event.(type) { case *replication.RowsEvent: if x.Action == replication.DeleteAction { // 处理删除 } }}
用 pglogrepl 实现 PostgreSQL 逻辑复制同步
pglogrepl 是目前最轻量、纯 Go 实现的 PG 逻辑复制客户端,不依赖 libpq,适合嵌入长期运行的服务中。
和 MySQL 不同,PG 的逻辑复制需要前置创建 replication slot,并指定输出插件(如 wal2json)。如果你只关心结构化变更,wal2json 比原生 pgoutput 更友好,返回 JSON 格式,字段语义清晰。
操作步骤:
首次连接前,手动执行:SELECT * FROM pg_create_logical_replication_slot(‘my_slot’, ‘wal2json’);连接时指定 slot 名和 plugin:pglogrepl.StartReplication(…, pglogrepl.ReplicationOptions{PluginArgs: []string{"include-transaction", "include-timestamp"}})收到消息后,用 wal2json 解析器解析 msg.Data,提取 change 数组中的每条变更每次成功处理后,调用 pglogrepl.SendStandbyStatusUpdate() 上报已消费 LSN,防止 WAL 被回收
通用 state.json 增量方案(适用于无 binlog / 无逻辑复制的场景)
当数据库不支持或无法启用变更日志时,可用“状态快照 + 时间戳对比”兜底,典型用于 Sitemap 增量生成、静态内容同步等。
核心是维护一个 state.json 文件,记录每个 URL 及其最后更新时间:{"https://example.com/item/1": "2025-11-08T12:34:56Z"}。每次运行只拉取 lastmod 变更或新增的条目。
要点:
必须用 RFC3339 格式存储时间(带时区),避免跨机器时钟漂移导致误判全量扫描时,不要一次性加载全部 URL 到内存,改用流式遍历(如 database/sql 分页游标 + rows.Next())state 更新必须原子:先写新 state.tmp,再 os.Rename() 覆盖旧文件,防止中断损坏并发写入时加文件锁(flock 或 syscall.Flock),否则多进程可能覆盖彼此 state
diff 逻辑极简示例:
func diffURLs(old StateMap, all []UrlEntry) (changed []UrlEntry, newState StateMap) { newState = make(StateMap, len(all)) for _, u := range all { newState[u.Loc] = u.LastMod if last, ok := old[u.Loc]; !ok || last != u.LastMod { changed = append(changed, u) } } return}
位点管理比想象中脆弱——哪怕只漏一次 LSN 或 binlog position,后续所有增量都可能错位。务必把位点持久化、校验、上报做成不可跳过的原子步骤,而不是“等出问题再补”。

评论(0)