risingwave_connector/sink/
coordinate.rs1use std::cmp::Ordering;
16use std::future::pending;
17use std::num::NonZeroU64;
18use std::time::Instant;
19
20use anyhow::anyhow;
21use async_trait::async_trait;
22use risingwave_common::bail;
23use risingwave_common::bitmap::Bitmap;
24use risingwave_pb::connector_service::SinkMetadata;
25use tracing::{info, warn};
26
27use super::{
28 LogSinker, SinkCoordinationRpcClientEnum, SinkError, SinkLogReader, SinkWriterMetrics,
29 SinkWriterParam,
30};
31use crate::sink::writer::SinkWriter;
32use crate::sink::{LogStoreReadItem, Result, SinkParam, TruncateOffset};
33
34pub struct CoordinatedLogSinker<W: SinkWriter<CommitMetadata = Option<SinkMetadata>>> {
35 writer: W,
36 sink_coordinate_client: SinkCoordinationRpcClientEnum,
37 param: SinkParam,
38 vnode_bitmap: Bitmap,
39 commit_checkpoint_interval: NonZeroU64,
40 sink_writer_metrics: SinkWriterMetrics,
41}
42
43impl<W: SinkWriter<CommitMetadata = Option<SinkMetadata>>> CoordinatedLogSinker<W> {
44 pub async fn new(
45 writer_param: &SinkWriterParam,
46 param: SinkParam,
47 writer: W,
48 commit_checkpoint_interval: NonZeroU64,
49 ) -> Result<Self> {
50 Ok(Self {
51 writer,
52 sink_coordinate_client: writer_param
53 .meta_client
54 .as_ref()
55 .ok_or_else(|| anyhow!("should have meta client"))?
56 .clone()
57 .sink_coordinate_client()
58 .await,
59 param,
60 vnode_bitmap: writer_param
61 .vnode_bitmap
62 .as_ref()
63 .ok_or_else(|| {
64 anyhow!("sink needs coordination and should not have singleton input")
65 })?
66 .clone(),
67 commit_checkpoint_interval,
68 sink_writer_metrics: SinkWriterMetrics::new(writer_param),
69 })
70 }
71}
72
73#[async_trait]
74impl<W: SinkWriter<CommitMetadata = Option<SinkMetadata>>> LogSinker for CoordinatedLogSinker<W> {
75 async fn consume_log_and_sink(self, mut log_reader: impl SinkLogReader) -> Result<!> {
76 let (mut coordinator_stream_handle, log_store_rewind_start_epoch) = self
77 .sink_coordinate_client
78 .new_stream_handle(&self.param, self.vnode_bitmap)
79 .await?;
80 let mut sink_writer = self.writer;
81 log_reader.start_from(log_store_rewind_start_epoch).await?;
82 let mut first_item = log_reader.next_item().await?;
83 if let (Some(log_store_rewind_start_epoch), (first_epoch, _)) =
84 (log_store_rewind_start_epoch, &first_item)
85 {
86 if log_store_rewind_start_epoch >= *first_epoch {
87 bail!(
88 "log_store_rewind_start_epoch {} not later than first_epoch {}",
89 log_store_rewind_start_epoch,
90 first_epoch
91 );
92 }
93 } else {
94 let &(initial_epoch, _) = &first_item;
95 let aligned_initial_epoch = coordinator_stream_handle
96 .align_initial_epoch(initial_epoch)
97 .await?;
98 if initial_epoch != aligned_initial_epoch {
99 warn!(
100 initial_epoch,
101 aligned_initial_epoch,
102 sink_id = self.param.sink_id.sink_id,
103 "initial epoch not matched aligned initial epoch"
104 );
105 let mut peeked_first = Some(first_item);
106 first_item = loop {
107 let (epoch, item) = if let Some(peeked_first) = peeked_first.take() {
108 peeked_first
109 } else {
110 log_reader.next_item().await?
111 };
112 match epoch.cmp(&aligned_initial_epoch) {
113 Ordering::Less => {
114 continue;
115 }
116 Ordering::Equal => {
117 break (epoch, item);
118 }
119 Ordering::Greater => {
120 return Err(anyhow!(
121 "initial epoch {} greater than aligned initial epoch {}",
122 initial_epoch,
123 aligned_initial_epoch
124 )
125 .into());
126 }
127 }
128 };
129 }
130 }
131
132 let mut first_item = Some(first_item);
133
134 #[derive(Debug)]
135 enum LogConsumerState {
136 Uninitialized,
138
139 EpochBegun { curr_epoch: u64 },
141
142 BarrierReceived { prev_epoch: u64 },
144 }
145
146 let mut state = LogConsumerState::Uninitialized;
147
148 let mut current_checkpoint: u64 = 0;
149 let commit_checkpoint_interval = self.commit_checkpoint_interval;
150 let sink_writer_metrics = self.sink_writer_metrics;
151
152 loop {
153 let (epoch, item) = if let Some(item) = first_item.take() {
154 item
155 } else {
156 log_reader.next_item().await?
157 };
158
159 state = match state {
161 LogConsumerState::Uninitialized => {
162 sink_writer.begin_epoch(epoch).await?;
163 LogConsumerState::EpochBegun { curr_epoch: epoch }
164 }
165 LogConsumerState::EpochBegun { curr_epoch } => {
166 assert!(
167 epoch >= curr_epoch,
168 "new epoch {} should not be below the current epoch {}",
169 epoch,
170 curr_epoch
171 );
172 LogConsumerState::EpochBegun { curr_epoch: epoch }
173 }
174 LogConsumerState::BarrierReceived { prev_epoch, .. } => {
175 assert!(
176 epoch > prev_epoch,
177 "new epoch {} should be greater than prev epoch {}",
178 epoch,
179 prev_epoch
180 );
181
182 sink_writer.begin_epoch(epoch).await?;
183 LogConsumerState::EpochBegun { curr_epoch: epoch }
184 }
185 };
186 match item {
187 LogStoreReadItem::StreamChunk { chunk, .. } => {
188 if let Err(e) = sink_writer.write_batch(chunk).await {
189 sink_writer.abort().await?;
190 return Err(e);
191 }
192 }
193 LogStoreReadItem::Barrier {
194 is_checkpoint,
195 new_vnode_bitmap,
196 is_stop,
197 } => {
198 let prev_epoch = match state {
199 LogConsumerState::EpochBegun { curr_epoch } => curr_epoch,
200 _ => unreachable!("epoch must have begun before handling barrier"),
201 };
202 if is_checkpoint {
203 current_checkpoint += 1;
204 if current_checkpoint >= commit_checkpoint_interval.get()
205 || new_vnode_bitmap.is_some()
206 || is_stop
207 {
208 let start_time = Instant::now();
209 let metadata = sink_writer.barrier(true).await?;
210 let metadata = metadata.ok_or_else(|| {
211 SinkError::Coordinator(anyhow!(
212 "should get metadata on checkpoint barrier"
213 ))
214 })?;
215 coordinator_stream_handle.commit(epoch, metadata).await?;
216 sink_writer_metrics
217 .sink_commit_duration
218 .observe(start_time.elapsed().as_millis() as f64);
219
220 current_checkpoint = 0;
221 if let Some(new_vnode_bitmap) = new_vnode_bitmap {
222 let epoch = coordinator_stream_handle
223 .update_vnode_bitmap(&new_vnode_bitmap)
224 .await?;
225 if epoch != prev_epoch {
226 bail!(
227 "newly start epoch {} after update vnode bitmap not matched with prev_epoch {}",
228 epoch,
229 prev_epoch
230 );
231 }
232 }
233 if is_stop {
234 coordinator_stream_handle.stop().await?;
235 info!(
236 sink_id = self.param.sink_id.sink_id,
237 "coordinated log sinker stops"
238 );
239 log_reader.truncate(TruncateOffset::Barrier { epoch })?;
240 return pending().await;
241 }
242 log_reader.truncate(TruncateOffset::Barrier { epoch })?;
243 } else {
244 let metadata = sink_writer.barrier(false).await?;
245 if let Some(metadata) = metadata {
246 warn!(?metadata, "get metadata on non-checkpoint barrier");
247 }
248 }
249 } else {
250 let metadata = sink_writer.barrier(false).await?;
251 if let Some(metadata) = metadata {
252 warn!(?metadata, "get metadata on non-checkpoint barrier");
253 }
254 }
255 state = LogConsumerState::BarrierReceived { prev_epoch }
256 }
257 }
258 }
259 }
260}