risingwave_connector/sink/
coordinate.rs1use std::cmp::Ordering;
16use std::future::pending;
17use std::num::NonZeroU64;
18use std::time::Instant;
19
20use anyhow::anyhow;
21use async_trait::async_trait;
22use risingwave_common::bail;
23use risingwave_common::bitmap::Bitmap;
24use risingwave_pb::connector_service::SinkMetadata;
25use tracing::{info, warn};
26
27use super::{
28 LogSinker, SinkCoordinationRpcClientEnum, SinkLogReader, SinkWriterMetrics, SinkWriterParam,
29};
30use crate::sink::writer::SinkWriter;
31use crate::sink::{LogStoreReadItem, Result, SinkError, SinkParam, TruncateOffset};
32
33pub struct CoordinatedLogSinker<W: SinkWriter<CommitMetadata = Option<SinkMetadata>>> {
34 writer: W,
35 sink_coordinate_client: SinkCoordinationRpcClientEnum,
36 param: SinkParam,
37 vnode_bitmap: Bitmap,
38 commit_checkpoint_interval: NonZeroU64,
39 sink_writer_metrics: SinkWriterMetrics,
40}
41
42impl<W: SinkWriter<CommitMetadata = Option<SinkMetadata>>> CoordinatedLogSinker<W> {
43 pub async fn new(
44 writer_param: &SinkWriterParam,
45 param: SinkParam,
46 writer: W,
47 commit_checkpoint_interval: NonZeroU64,
48 ) -> Result<Self> {
49 Ok(Self {
50 writer,
51 sink_coordinate_client: writer_param
52 .meta_client
53 .as_ref()
54 .ok_or_else(|| anyhow!("should have meta client"))?
55 .clone()
56 .sink_coordinate_client()
57 .await,
58 param,
59 vnode_bitmap: writer_param
60 .vnode_bitmap
61 .as_ref()
62 .ok_or_else(|| {
63 anyhow!("sink needs coordination and should not have singleton input")
64 })?
65 .clone(),
66 commit_checkpoint_interval,
67 sink_writer_metrics: SinkWriterMetrics::new(writer_param),
68 })
69 }
70}
71
72#[async_trait]
73impl<W: SinkWriter<CommitMetadata = Option<SinkMetadata>>> LogSinker for CoordinatedLogSinker<W> {
74 async fn consume_log_and_sink(self, mut log_reader: impl SinkLogReader) -> Result<!> {
75 let (mut coordinator_stream_handle, log_store_rewind_start_epoch) = self
76 .sink_coordinate_client
77 .new_stream_handle(&self.param, self.vnode_bitmap)
78 .await?;
79 let mut sink_writer = self.writer;
80 log_reader.start_from(log_store_rewind_start_epoch).await?;
81 let mut first_item = log_reader.next_item().await?;
82 if let (Some(log_store_rewind_start_epoch), (first_epoch, _)) =
83 (log_store_rewind_start_epoch, &first_item)
84 {
85 if log_store_rewind_start_epoch >= *first_epoch {
86 bail!(
87 "log_store_rewind_start_epoch {} not later than first_epoch {}",
88 log_store_rewind_start_epoch,
89 first_epoch
90 );
91 }
92 } else {
93 let &(initial_epoch, _) = &first_item;
94 let aligned_initial_epoch = coordinator_stream_handle
95 .align_initial_epoch(initial_epoch)
96 .await?;
97 if initial_epoch != aligned_initial_epoch {
98 warn!(
99 initial_epoch,
100 aligned_initial_epoch,
101 sink_id = self.param.sink_id.sink_id,
102 "initial epoch not matched aligned initial epoch"
103 );
104 let mut peeked_first = Some(first_item);
105 first_item = loop {
106 let (epoch, item) = if let Some(peeked_first) = peeked_first.take() {
107 peeked_first
108 } else {
109 log_reader.next_item().await?
110 };
111 match epoch.cmp(&aligned_initial_epoch) {
112 Ordering::Less => {
113 continue;
114 }
115 Ordering::Equal => {
116 break (epoch, item);
117 }
118 Ordering::Greater => {
119 return Err(anyhow!(
120 "initial epoch {} greater than aligned initial epoch {}",
121 initial_epoch,
122 aligned_initial_epoch
123 )
124 .into());
125 }
126 }
127 };
128 }
129 }
130
131 let mut first_item = Some(first_item);
132
133 #[derive(Debug)]
134 enum LogConsumerState {
135 Uninitialized,
137
138 EpochBegun { curr_epoch: u64 },
140
141 BarrierReceived { prev_epoch: u64 },
143 }
144
145 let mut state = LogConsumerState::Uninitialized;
146
147 let mut current_checkpoint: u64 = 0;
148 let commit_checkpoint_interval = self.commit_checkpoint_interval;
149 let sink_writer_metrics = self.sink_writer_metrics;
150
151 loop {
152 let (epoch, item) = if let Some(item) = first_item.take() {
153 item
154 } else {
155 log_reader.next_item().await?
156 };
157
158 state = match state {
160 LogConsumerState::Uninitialized => {
161 sink_writer.begin_epoch(epoch).await?;
162 LogConsumerState::EpochBegun { curr_epoch: epoch }
163 }
164 LogConsumerState::EpochBegun { curr_epoch } => {
165 assert!(
166 epoch >= curr_epoch,
167 "new epoch {} should not be below the current epoch {}",
168 epoch,
169 curr_epoch
170 );
171 LogConsumerState::EpochBegun { curr_epoch: epoch }
172 }
173 LogConsumerState::BarrierReceived { prev_epoch, .. } => {
174 assert!(
175 epoch > prev_epoch,
176 "new epoch {} should be greater than prev epoch {}",
177 epoch,
178 prev_epoch
179 );
180
181 sink_writer.begin_epoch(epoch).await?;
182 LogConsumerState::EpochBegun { curr_epoch: epoch }
183 }
184 };
185 match item {
186 LogStoreReadItem::StreamChunk { chunk, .. } => {
187 if let Err(e) = sink_writer.write_batch(chunk).await {
188 sink_writer.abort().await?;
189 return Err(e);
190 }
191 }
192 LogStoreReadItem::Barrier {
193 is_checkpoint,
194 new_vnode_bitmap,
195 is_stop,
196 add_columns,
197 } => {
198 let prev_epoch = match state {
199 LogConsumerState::EpochBegun { curr_epoch } => curr_epoch,
200 _ => unreachable!("epoch must have begun before handling barrier"),
201 };
202 if is_checkpoint {
203 current_checkpoint += 1;
204 if current_checkpoint >= commit_checkpoint_interval.get()
205 || new_vnode_bitmap.is_some()
206 || is_stop
207 || add_columns.is_some()
208 {
209 let start_time = Instant::now();
210 let metadata = sink_writer.barrier(true).await?;
211 let metadata = metadata.ok_or_else(|| {
212 SinkError::Coordinator(anyhow!(
213 "should get metadata on checkpoint barrier"
214 ))
215 })?;
216 if add_columns.is_some() {
217 assert!(
218 is_stop,
219 "add columns should stop current sink for sink {}",
220 self.param.sink_id
221 );
222 }
223 coordinator_stream_handle
224 .commit(epoch, metadata, add_columns)
225 .await?;
226 sink_writer_metrics
227 .sink_commit_duration
228 .observe(start_time.elapsed().as_millis() as f64);
229
230 current_checkpoint = 0;
231 if let Some(new_vnode_bitmap) = new_vnode_bitmap {
232 let epoch = coordinator_stream_handle
233 .update_vnode_bitmap(&new_vnode_bitmap)
234 .await?;
235 if epoch != prev_epoch {
236 bail!(
237 "newly start epoch {} after update vnode bitmap not matched with prev_epoch {}",
238 epoch,
239 prev_epoch
240 );
241 }
242 }
243 if is_stop {
244 coordinator_stream_handle.stop().await?;
245 info!(
246 sink_id = self.param.sink_id.sink_id,
247 "coordinated log sinker stops"
248 );
249 log_reader.truncate(TruncateOffset::Barrier { epoch })?;
250 return pending().await;
251 }
252 log_reader.truncate(TruncateOffset::Barrier { epoch })?;
253 } else {
254 let metadata = sink_writer.barrier(false).await?;
255 if let Some(metadata) = metadata {
256 warn!(?metadata, "get metadata on non-checkpoint barrier");
257 }
258 }
259 } else {
260 let metadata = sink_writer.barrier(false).await?;
261 if let Some(metadata) = metadata {
262 warn!(?metadata, "get metadata on non-checkpoint barrier");
263 }
264 }
265 state = LogConsumerState::BarrierReceived { prev_epoch }
266 }
267 }
268 }
269 }
270}