// risingwave_connector/sink/decouple_checkpoint_log_sink.rs
use std::num::NonZeroU64;
use std::time::Instant;

use async_trait::async_trait;

use crate::sink::log_store::{LogStoreReadItem, TruncateOffset};
use crate::sink::writer::SinkWriter;
use crate::sink::{LogSinker, Result, SinkLogReader, SinkWriterMetrics};

/// Default commit checkpoint interval for the "with sink decouple" case (per the constant's
/// name; the code selecting between the two defaults is outside this file).
pub const DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE: u64 = 10;
/// Default commit checkpoint interval for the "without sink decouple" case (per the constant's
/// name; the code selecting between the two defaults is outside this file).
pub const DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITHOUT_SINK_DECOUPLE: u64 = 1;
/// The literal string `commit_checkpoint_interval`.
///
/// NOTE(review): presumably the option key under which the interval is configured; its
/// consumers are not visible in this file.
pub const COMMIT_CHECKPOINT_INTERVAL: &str = "commit_checkpoint_interval";

/// Returns [`DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE`] (currently `10`).
pub fn default_commit_checkpoint_interval() -> u64 {
    DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE
}

32pub struct DecoupleCheckpointLogSinkerOf<W> {
36 writer: W,
37 sink_writer_metrics: SinkWriterMetrics,
38 commit_checkpoint_interval: NonZeroU64,
39}
40
41impl<W> DecoupleCheckpointLogSinkerOf<W> {
42 pub fn new(
45 writer: W,
46 sink_writer_metrics: SinkWriterMetrics,
47 commit_checkpoint_interval: NonZeroU64,
48 ) -> Self {
49 DecoupleCheckpointLogSinkerOf {
50 writer,
51 sink_writer_metrics,
52 commit_checkpoint_interval,
53 }
54 }
55}
56
57#[async_trait]
58impl<W: SinkWriter<CommitMetadata = ()>> LogSinker for DecoupleCheckpointLogSinkerOf<W> {
59 async fn consume_log_and_sink(self, mut log_reader: impl SinkLogReader) -> Result<!> {
60 let mut sink_writer = self.writer;
61 log_reader.start_from(None).await?;
62 #[derive(Debug)]
63 enum LogConsumerState {
64 Uninitialized,
66
67 EpochBegun { curr_epoch: u64 },
69
70 BarrierReceived { prev_epoch: u64 },
72 }
73
74 let mut state = LogConsumerState::Uninitialized;
75
76 let mut current_checkpoint: u64 = 0;
77 let commit_checkpoint_interval = self.commit_checkpoint_interval;
78 let sink_writer_metrics = self.sink_writer_metrics;
79
80 loop {
81 let (epoch, item): (u64, LogStoreReadItem) = log_reader.next_item().await?;
82 state = match state {
84 LogConsumerState::Uninitialized => {
85 sink_writer.begin_epoch(epoch).await?;
86 LogConsumerState::EpochBegun { curr_epoch: epoch }
87 }
88 LogConsumerState::EpochBegun { curr_epoch } => {
89 assert!(
90 epoch >= curr_epoch,
91 "new epoch {} should not be below the current epoch {}",
92 epoch,
93 curr_epoch
94 );
95 LogConsumerState::EpochBegun { curr_epoch: epoch }
96 }
97 LogConsumerState::BarrierReceived { prev_epoch, .. } => {
98 assert!(
99 epoch > prev_epoch,
100 "new epoch {} should be greater than prev epoch {}",
101 epoch,
102 prev_epoch
103 );
104
105 sink_writer.begin_epoch(epoch).await?;
106 LogConsumerState::EpochBegun { curr_epoch: epoch }
107 }
108 };
109 match item {
110 LogStoreReadItem::StreamChunk { chunk, .. } => {
111 if let Err(e) = sink_writer.write_batch(chunk).await {
112 sink_writer.abort().await?;
113 return Err(e);
114 }
115 }
116 LogStoreReadItem::Barrier {
117 is_checkpoint,
118 new_vnode_bitmap,
119 ..
120 } => {
121 let prev_epoch = match state {
122 LogConsumerState::EpochBegun { curr_epoch } => curr_epoch,
123 _ => unreachable!("epoch must have begun before handling barrier"),
124 };
125 if is_checkpoint {
126 current_checkpoint += 1;
127 if current_checkpoint >= commit_checkpoint_interval.get()
128 || new_vnode_bitmap.is_some()
129 {
130 let start_time = Instant::now();
131 sink_writer.barrier(true).await?;
132 sink_writer_metrics
133 .sink_commit_duration
134 .observe(start_time.elapsed().as_millis() as f64);
135 log_reader.truncate(TruncateOffset::Barrier { epoch })?;
136
137 current_checkpoint = 0;
138 } else {
139 sink_writer.barrier(false).await?;
140 }
141 } else {
142 assert!(new_vnode_bitmap.is_none());
143 sink_writer.barrier(false).await?;
144 }
145 state = LogConsumerState::BarrierReceived { prev_epoch }
146 }
147 }
148 }
149 }
150}