前言
目前,数据处理的链路很长,由于业务的不断迭代,导致了数据的分层。例如ODS->DWD->DWS->ADS。每个层都有不同的数据处理逻辑。 而层与层之间目前是通过Kafka队列实现解耦的。在某些场景下,上下层需要通过查询数据库来实现某种数据的更新交互。由于网络延迟和一些不可预知的原因,导致下游查询不到最新的数据,从而无法得到正确的更新。
解决方案
既然是因为时间差导致的不一致问题,那直接可以使用延迟队列。虽然kafka也支持延迟队列,但是使用上限制较多,且需要配置,维护比较麻烦。在环境允许的情况下,可以优先选用RabbitMQ进行解决。网上示例很多,可以进行参考。
由于笔者是Flink流处理任务,DataSteam的底层API天然就支持对时间的精确控制。时间语义和窗口是让无界流可以聚合精准处理的两大核心要素。
大致思路如下:
首先把上游推送的标识ID进行逻辑分区
开窗口进行ID去重,保证一段时间内数据的唯一性,避免下游重复查询
定义一个状态,缓存延迟的ID
注册定时器,触发ID的推送逻辑
实现代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
KafkaSource<String> source = KafkaSource.<String>builder() .setBootstrapServers("192.168.88.130:9092") .setTopics("input-topic") .setGroupId("my-group") .setStartingOffsets(OffsetsInitializer.latest()) .setValueOnlyDeserializer(new SimpleStringSchema()) .build();
DataStreamSource<String> kafka_source = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
env.setParallelism(2);
source.keyBy(String::hashCode) .window(TumblingProcessingTimeWindows.of(Time.seconds(30L))) .process(new ProcessWindowFunction<String, String, Integer, TimeWindow>() { @Override public void process(Integer integer, ProcessWindowFunction<String, String, Integer, TimeWindow>.Context context, Iterable<String> iterable, Collector<String> collector) throws Exception { Set<String> resList = new HashSet<>(); for (String it : iterable) { resList.add(it); } for (String it : resList) { collector.collect(it); } } }) .keyBy(String::hashCode) .process(new StateTimerFunction(3)) .print();
env.execute("my job");
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
| package com.hc.process;
import org.apache.flink.api.common.state.*; import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; import org.apache.flink.util.Collector;
public class StateTimerFunction extends KeyedProcessFunction<Integer, String, String> {
private MapState<String,String> mapCacheStorage; private ValueState<String> valCacheStorage;
private int TTL_SEC;
public StateTimerFunction(int TTL_MINUTE) { this.TTL_SEC = TTL_MINUTE; }
public void open(Configuration parameters) throws Exception {
ValueStateDescriptor<String> valStateDescriptor = new ValueStateDescriptor<>("valState", String.class);
StateTtlConfig ttl = StateTtlConfig .newBuilder(Time.seconds(5)) .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) .build();
valStateDescriptor.enableTimeToLive(ttl);
valCacheStorage = getRuntimeContext().getState(valStateDescriptor); }
@Override public void processElement(String bytes, Context context, Collector<String> collector) throws Exception { try {
context.timerService().registerProcessingTimeTimer(System.currentTimeMillis() + TTL_SEC *1000); if (valCacheStorage.value()==null){ valCacheStorage.update(bytes); collector.collect(bytes); } } catch (Exception e) { e.printStackTrace(); }
}
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception { if (valCacheStorage.value()!=null){ out.collect(valCacheStorage.value().concat("selectDB")); } } }
|
运行的结果如图所示

结论
Flink作为以流为核心的高可用、高性能的分布式计算引擎。具备流批一体,高吞吐、低延迟,容错能力,大规模复杂计算等特点。结合其底层的核心DataSteamAPI可以非常灵活的处理各种需求。
Copyright Notice: 本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!