// risingwave_connector/sink/coordinate.rs
1use std::future::pending;
16use std::num::NonZeroU64;
17use std::time::Instant;
18
19use anyhow::anyhow;
20use async_trait::async_trait;
21use risingwave_common::bitmap::Bitmap;
22use risingwave_pb::connector_service::SinkMetadata;
23use tracing::{info, warn};
24
25use super::{
26 LogSinker, SinkCoordinationRpcClientEnum, SinkError, SinkLogReader, SinkWriterMetrics,
27 SinkWriterParam,
28};
29use crate::sink::writer::SinkWriter;
30use crate::sink::{LogStoreReadItem, Result, SinkParam, TruncateOffset};
31
32pub struct CoordinatedLogSinker<W: SinkWriter<CommitMetadata = Option<SinkMetadata>>> {
33 writer: W,
34 sink_coordinate_client: SinkCoordinationRpcClientEnum,
35 param: SinkParam,
36 vnode_bitmap: Bitmap,
37 commit_checkpoint_interval: NonZeroU64,
38 sink_writer_metrics: SinkWriterMetrics,
39}
40
41impl<W: SinkWriter<CommitMetadata = Option<SinkMetadata>>> CoordinatedLogSinker<W> {
42 pub async fn new(
43 writer_param: &SinkWriterParam,
44 param: SinkParam,
45 writer: W,
46 commit_checkpoint_interval: NonZeroU64,
47 ) -> Result<Self> {
48 Ok(Self {
49 writer,
50 sink_coordinate_client: writer_param
51 .meta_client
52 .as_ref()
53 .ok_or_else(|| anyhow!("should have meta client"))?
54 .clone()
55 .sink_coordinate_client()
56 .await,
57 param,
58 vnode_bitmap: writer_param
59 .vnode_bitmap
60 .as_ref()
61 .ok_or_else(|| {
62 anyhow!("sink needs coordination and should not have singleton input")
63 })?
64 .clone(),
65 commit_checkpoint_interval,
66 sink_writer_metrics: SinkWriterMetrics::new(writer_param),
67 })
68 }
69}
70
71#[async_trait]
72impl<W: SinkWriter<CommitMetadata = Option<SinkMetadata>>> LogSinker for CoordinatedLogSinker<W> {
73 async fn consume_log_and_sink(self, mut log_reader: impl SinkLogReader) -> Result<!> {
74 let (mut coordinator_stream_handle, log_store_rewind_start_epoch) = self
75 .sink_coordinate_client
76 .new_stream_handle(&self.param, self.vnode_bitmap)
77 .await?;
78 let mut sink_writer = self.writer;
79 log_reader.start_from(log_store_rewind_start_epoch).await?;
80 #[derive(Debug)]
81 enum LogConsumerState {
82 Uninitialized,
84
85 EpochBegun { curr_epoch: u64 },
87
88 BarrierReceived { prev_epoch: u64 },
90 }
91
92 let mut state = LogConsumerState::Uninitialized;
93
94 let mut current_checkpoint: u64 = 0;
95 let commit_checkpoint_interval = self.commit_checkpoint_interval;
96 let sink_writer_metrics = self.sink_writer_metrics;
97
98 loop {
99 let (epoch, item): (u64, LogStoreReadItem) = log_reader.next_item().await?;
100 state = match state {
102 LogConsumerState::Uninitialized => {
103 sink_writer.begin_epoch(epoch).await?;
104 LogConsumerState::EpochBegun { curr_epoch: epoch }
105 }
106 LogConsumerState::EpochBegun { curr_epoch } => {
107 assert!(
108 epoch >= curr_epoch,
109 "new epoch {} should not be below the current epoch {}",
110 epoch,
111 curr_epoch
112 );
113 LogConsumerState::EpochBegun { curr_epoch: epoch }
114 }
115 LogConsumerState::BarrierReceived { prev_epoch, .. } => {
116 assert!(
117 epoch > prev_epoch,
118 "new epoch {} should be greater than prev epoch {}",
119 epoch,
120 prev_epoch
121 );
122
123 sink_writer.begin_epoch(epoch).await?;
124 LogConsumerState::EpochBegun { curr_epoch: epoch }
125 }
126 };
127 match item {
128 LogStoreReadItem::StreamChunk { chunk, .. } => {
129 if let Err(e) = sink_writer.write_batch(chunk).await {
130 sink_writer.abort().await?;
131 return Err(e);
132 }
133 }
134 LogStoreReadItem::Barrier {
135 is_checkpoint,
136 new_vnode_bitmap,
137 is_stop,
138 } => {
139 let prev_epoch = match state {
140 LogConsumerState::EpochBegun { curr_epoch } => curr_epoch,
141 _ => unreachable!("epoch must have begun before handling barrier"),
142 };
143 if is_checkpoint {
144 current_checkpoint += 1;
145 if current_checkpoint >= commit_checkpoint_interval.get()
146 || new_vnode_bitmap.is_some()
147 || is_stop
148 {
149 let start_time = Instant::now();
150 let metadata = sink_writer.barrier(true).await?;
151 let metadata = metadata.ok_or_else(|| {
152 SinkError::Coordinator(anyhow!(
153 "should get metadata on checkpoint barrier"
154 ))
155 })?;
156 coordinator_stream_handle.commit(epoch, metadata).await?;
157 sink_writer_metrics
158 .sink_commit_duration
159 .observe(start_time.elapsed().as_millis() as f64);
160 log_reader.truncate(TruncateOffset::Barrier { epoch })?;
161
162 current_checkpoint = 0;
163 if let Some(new_vnode_bitmap) = new_vnode_bitmap {
164 coordinator_stream_handle
165 .update_vnode_bitmap(&new_vnode_bitmap)
166 .await?;
167 }
168 if is_stop {
169 coordinator_stream_handle.stop().await?;
170 info!(
171 sink_id = self.param.sink_id.sink_id,
172 "coordinated log sinker stops"
173 );
174 return pending().await;
175 }
176 } else {
177 let metadata = sink_writer.barrier(false).await?;
178 if let Some(metadata) = metadata {
179 warn!(?metadata, "get metadata on non-checkpoint barrier");
180 }
181 }
182 } else {
183 let metadata = sink_writer.barrier(false).await?;
184 if let Some(metadata) = metadata {
185 warn!(?metadata, "get metadata on non-checkpoint barrier");
186 }
187 }
188 state = LogConsumerState::BarrierReceived { prev_epoch }
189 }
190 }
191 }
192 }
193}