risingwave_connector/sink/
writer.rs1use std::future::{Future, Ready};
16use std::pin::pin;
17use std::time::Instant;
18
19use async_trait::async_trait;
20use futures::TryFuture;
21use futures::future::{Either, select};
22use risingwave_common::array::StreamChunk;
23use rw_futures_util::drop_either_future;
24
25use crate::sink::encoder::SerTo;
26use crate::sink::formatter::SinkFormatter;
27use crate::sink::log_store::{
28 DeliveryFutureManager, DeliveryFutureManagerAddFuture, LogStoreReadItem, TruncateOffset,
29};
30use crate::sink::{LogSinker, Result, SinkError, SinkLogReader, SinkWriterMetrics};
31
/// A sink writer that is driven one epoch at a time.
///
/// `LogSinkerOf` in this file drives an implementation as follows: `begin_epoch` is called for
/// the first item read and for the first item read after a barrier, `write_batch` for every
/// stream chunk, and `barrier` for every barrier item.
#[async_trait]
pub trait SinkWriter: Send + 'static {
    /// Value returned by [`SinkWriter::barrier`]. Defaults to `()`.
    type CommitMetadata: Send = ();
    /// Starts the epoch identified by `epoch`.
    async fn begin_epoch(&mut self, epoch: u64) -> Result<()>;

    /// Writes one stream chunk to the sink.
    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()>;

    /// Handles a barrier. `is_checkpoint` tells whether the barrier is a checkpoint barrier.
    async fn barrier(&mut self, is_checkpoint: bool) -> Result<Self::CommitMetadata>;

    /// Defaults to `false`.
    // NOTE(review): no caller is visible in this file; presumably this tells the driver whether
    // the writer only commits on checkpoint barriers. Confirm against the callers.
    fn should_commit_on_checkpoint(&self) -> bool {
        false
    }

    /// Aborts the writer. The default implementation does nothing and returns `Ok(())`.
    ///
    /// `LogSinkerOf` calls this after a failed `write_batch`.
    async fn abort(&mut self) -> Result<()> {
        Ok(())
    }
}
56
/// An already-resolved future yielding `Result<(), SinkError>`.
///
/// Used as the default for `AsyncTruncateSinkWriter::DeliveryFuture`.
pub type DummyDeliveryFuture = Ready<std::result::Result<(), SinkError>>;
58
59pub trait AsyncTruncateSinkWriter: Send + 'static {
60 type DeliveryFuture: TryFuture<Ok = (), Error = SinkError> + Unpin + Send + 'static =
61 DummyDeliveryFuture;
62
63 fn write_chunk<'a>(
64 &'a mut self,
65 chunk: StreamChunk,
66 add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>,
67 ) -> impl Future<Output = Result<()>> + Send + 'a;
68
69 fn barrier(&mut self, _is_checkpoint: bool) -> impl Future<Output = Result<()>> + Send + '_ {
70 async { Ok(()) }
71 }
72}
73
74pub trait FormattedSink {
85 type K;
86 type V;
87 async fn write_one(&mut self, k: Option<Self::K>, v: Option<Self::V>) -> Result<()>;
88
89 async fn write_chunk<F: SinkFormatter>(
90 &mut self,
91 chunk: StreamChunk,
92 formatter: &F,
93 ) -> Result<()>
94 where
95 F::K: SerTo<Self::K>,
96 F::V: SerTo<Self::V>,
97 {
98 for r in formatter.format_chunk(&chunk) {
99 let (event_key_object, event_object) = r?;
100
101 self.write_one(
102 event_key_object.map(SerTo::ser_to).transpose()?,
103 event_object.map(SerTo::ser_to).transpose()?,
104 )
105 .await?;
106 }
107
108 Ok(())
109 }
110}
111
/// Adapter that holds a writer `W` together with its `SinkWriterMetrics`.
///
/// When `W` is a `SinkWriter` with `CommitMetadata = ()`, this type implements `LogSinker`.
pub struct LogSinkerOf<W> {
    /// The wrapped writer.
    writer: W,
    /// Metrics handle; its `sink_commit_duration` is observed on checkpoint barriers.
    sink_writer_metrics: SinkWriterMetrics,
}
116
117impl<W> LogSinkerOf<W> {
118 pub fn new(writer: W, sink_writer_metrics: SinkWriterMetrics) -> Self {
119 LogSinkerOf {
120 writer,
121 sink_writer_metrics,
122 }
123 }
124}
125
126#[async_trait]
127impl<W: SinkWriter<CommitMetadata = ()>> LogSinker for LogSinkerOf<W> {
128 async fn consume_log_and_sink(self, mut log_reader: impl SinkLogReader) -> Result<!> {
129 log_reader.start_from(None).await?;
130 let mut sink_writer = self.writer;
131 let metrics = self.sink_writer_metrics;
132 #[derive(Debug)]
133 enum LogConsumerState {
134 Uninitialized,
136
137 EpochBegun { curr_epoch: u64 },
139
140 BarrierReceived { prev_epoch: u64 },
142 }
143
144 let mut state = LogConsumerState::Uninitialized;
145
146 loop {
147 let (epoch, item): (u64, LogStoreReadItem) = log_reader.next_item().await?;
148 state = match state {
150 LogConsumerState::Uninitialized => {
151 sink_writer.begin_epoch(epoch).await?;
152 LogConsumerState::EpochBegun { curr_epoch: epoch }
153 }
154 LogConsumerState::EpochBegun { curr_epoch } => {
155 assert!(
156 epoch >= curr_epoch,
157 "new epoch {} should not be below the current epoch {}",
158 epoch,
159 curr_epoch
160 );
161 LogConsumerState::EpochBegun { curr_epoch: epoch }
162 }
163 LogConsumerState::BarrierReceived { prev_epoch } => {
164 assert!(
165 epoch > prev_epoch,
166 "new epoch {} should be greater than prev epoch {}",
167 epoch,
168 prev_epoch
169 );
170 sink_writer.begin_epoch(epoch).await?;
171 LogConsumerState::EpochBegun { curr_epoch: epoch }
172 }
173 };
174 match item {
175 LogStoreReadItem::StreamChunk { chunk, .. } => {
176 if let Err(e) = sink_writer.write_batch(chunk).await {
177 sink_writer.abort().await?;
178 return Err(e);
179 }
180 }
181 LogStoreReadItem::Barrier {
182 is_checkpoint,
183 new_vnode_bitmap,
184 ..
185 } => {
186 let prev_epoch = match state {
187 LogConsumerState::EpochBegun { curr_epoch } => curr_epoch,
188 _ => unreachable!("epoch must have begun before handling barrier"),
189 };
190 if is_checkpoint {
191 let start_time = Instant::now();
192 sink_writer.barrier(true).await?;
193 metrics
194 .sink_commit_duration
195 .observe(start_time.elapsed().as_secs_f64());
196 log_reader.truncate(TruncateOffset::Barrier { epoch })?;
197 } else {
198 assert!(new_vnode_bitmap.is_none());
199 sink_writer.barrier(false).await?;
200 }
201 state = LogConsumerState::BarrierReceived { prev_epoch }
202 }
203 }
204 }
205 }
206}
207
208#[easy_ext::ext(SinkWriterExt)]
209impl<T> T
210where
211 T: SinkWriter<CommitMetadata = ()> + Sized,
212{
213 pub fn into_log_sinker(self, sink_writer_metrics: SinkWriterMetrics) -> LogSinkerOf<Self> {
214 LogSinkerOf {
215 writer: self,
216 sink_writer_metrics,
217 }
218 }
219}
220
/// Adapter that pairs an `AsyncTruncateSinkWriter` with a `DeliveryFutureManager` for the
/// writer's delivery futures. It implements `LogSinker`.
pub struct AsyncTruncateLogSinkerOf<W: AsyncTruncateSinkWriter> {
    /// The wrapped writer.
    writer: W,
    /// Manager for the writer's delivery futures; also the source of truncate offsets.
    future_manager: DeliveryFutureManager<W::DeliveryFuture>,
}
225
226impl<W: AsyncTruncateSinkWriter> AsyncTruncateLogSinkerOf<W> {
227 pub fn new(writer: W, max_future_count: usize) -> Self {
228 AsyncTruncateLogSinkerOf {
229 writer,
230 future_manager: DeliveryFutureManager::new(max_future_count),
231 }
232 }
233}
234
235#[async_trait]
236impl<W: AsyncTruncateSinkWriter> LogSinker for AsyncTruncateLogSinkerOf<W> {
237 async fn consume_log_and_sink(mut self, mut log_reader: impl SinkLogReader) -> Result<!> {
238 log_reader.start_from(None).await?;
239 loop {
240 let select_result = drop_either_future(
241 select(
242 pin!(log_reader.next_item()),
243 pin!(self.future_manager.next_truncate_offset()),
244 )
245 .await,
246 );
247 match select_result {
248 Either::Left(item_result) => {
249 let (epoch, item) = item_result?;
250 match item {
251 LogStoreReadItem::StreamChunk { chunk_id, chunk } => {
252 let add_future = self.future_manager.start_write_chunk(epoch, chunk_id);
253 self.writer.write_chunk(chunk, add_future).await?;
254 }
255 LogStoreReadItem::Barrier { is_checkpoint, .. } => {
256 self.writer.barrier(is_checkpoint).await?;
257 self.future_manager.add_barrier(epoch);
258 }
259 }
260 }
261 Either::Right(offset_result) => {
262 let offset = offset_result?;
263 log_reader.truncate(offset)?;
264 }
265 }
266 }
267 }
268}
269
270#[easy_ext::ext(AsyncTruncateSinkWriterExt)]
271impl<T> T
272where
273 T: AsyncTruncateSinkWriter + Sized,
274{
275 pub fn into_log_sinker(self, max_future_count: usize) -> AsyncTruncateLogSinkerOf<Self> {
276 AsyncTruncateLogSinkerOf::new(self, max_future_count)
277 }
278}