Flink实时任务里TiDB出现锁冲突的解决方案
前言
目前公司已经有一套功能完善的大数据管理平台。业务上很多实时处理Job的技术栈是 Flink+Kafka+TiDB
以前一直没有接触过TiDB 不过听旁边的技术大佬说,和MySQL一样……
官方注释如下: TiDB 是 PingCAP 公司自主设计、研发的开源分布式关系型数据库,是一款同时支持在线事务处理与在线分析处理的融合型分布式数据库产品,具备水平扩容或者缩容、金融级高可用、实时 HTAP、云原生的分布式数据库、兼容 MySQL 5.7 协议和 MySQL 生态等重要特性。适合高可用、强一致要求较高、数据规模较大等各种应用场景。

问题提出
总之,TiDB用下来感觉…不怎么好。接触下来问题倒挺多。除了经常发生的DDL异常,最先遇到的就是数据库锁冲突问题。
Lock wait timeout exceeded; try restarting transaction…
造成锁冲突的原因很多,主要有以下几点:
- 执行了锁表DML没commit,或进行了删除操作等。
- 同一事务内多个线程对同一条数据进行插入或者更新操作。
- 索引设计不当,出现死锁。
- 长事务,阻塞DDL,最终阻塞该表的所有后续操作。
总之就是从两个方面入手,一是数据库本身的限制,从dml和索引进行排查,二是从业务逻辑的角度,排查数据入库的逻辑是否规范。
解决方案
首先看下TiDB给出的问题解释和官方答案:
Lock wait timeout exceeded
In the pessimistic transaction mode, transactions wait for locks of each other. The timeout for waiting a lock is defined by the innodb_lock_wait_timeout parameter of TiDB. This is the maximum wait lock time at the SQL statement level, which is the expectation of a SQL statement Locking, but the lock has never been acquired. After this time, TiDB will not try to lock again and will return the corresponding error message to the client.
When a wait lock timeout occurs, the following error message will be returned to the client:
ERROR 1205 (HY000): Lock wait timeout exceeded; try restarting transaction
Solutions:If the above error occurs frequently, it is recommended to adjust the application logic.
简单翻译下,在悲观锁的模式下,不同的事务之间持有锁将会相互等待。而锁超时的时间是由TiDB的参数 innodb_lock_wait_timeout 配置的。一般不建议修改这个,说白了这个参数是控制锁竞争激烈条件下单个事务持有锁的最长时间,为了保持集群的稳定,需要在适当的时间内不再去竞争锁,给客户端返回失败。
结合公司具体某一业务入库的数据分析,单条记录主键重复比例高达50%。也就是说同一时刻,存在大量相同的主键频繁操作同一批数据。结合Flink Window会给不同的线程。满足上述的第二个条件。最终官方答案验证,是需要从自身代码逻辑上进行解决。
核心思路:
- 切换Sink的方式,将原来异步入库的方式改为同步。进而解决多线程并发带来的冲突问题。
- 优化去重的方案。在一段可允许的时间内,过滤掉重复的数据。
由于方案一会大大降低实时流处理的消费速率,加大并行度也会造成资源的浪费。这里选择方案二。
去重方法:
目前业内对于实时流处理的去重方案很多,不同的业务场景对去重的要求不一,需要兼顾到到去重的精确性和效率。灵活判断是否满足当前的需要。
- 缓存去重(redis)
- Flink 状态去重
- SQL 方式去重
- Bitmap 精确去重
这里实际上验证了Redis和Flink State的方法。
Redis缓存关键信息,在算子的开头和结尾加入去重的逻辑,由于集群Redis性能不佳,故已放弃该方法。
Flink Keyed DataStream State。 先对主键进行KeyBy然后定义一个装配了TTL的 MapState 。这是由Flink进行管理的状态, 一些序列化、故障恢复等由Flink自动完成.。开发只需要专注于业务上的判断即可。
关键代码如下:
1 | public class DupProcessFunction extends KeyedProcessFunction<String,Tuple2<String,BizEntity>,BizEntity){ |
在Process算子里先定义一个MapState,然后重写算子的初始化方法。把状态的配置定义到open方法里。一是定义state类型,二是指定TTL参数。
在processElement方法里,设置的时间段内是否已经有key进入,否则标识后返回。这个地方也可以灵活处理,判断record的重复。
经过测试,flink state的读写大大优于Redis的相同逻辑。
结合process算子,将原有的逻辑先进行处理,处理后经过state的去重,然后开窗进行聚合,最后Sink数据。
改造后,锁冲突的报警消失。几天观察,任务平稳运行,消费能力和吞吐都有所增加。
结论
文章简要分析了问题,并提出了相应的解决方法。如有疑问欢迎交流。
很多时候看似简单的问题,一到实际生产环境就显得特么难搞。所以冷静分析的力量必不可少,不能被眼前的表象所迷惑!马克思基本原理告诉我们要透过现象看本质。在这里,锁冲突就是表象,重复才是问题的内因!






