risingwave_connector/sink/
writer.rs1use std::future::{Future, Ready};
16use std::pin::pin;
17use std::time::Instant;
18
19use async_trait::async_trait;
20use futures::TryFuture;
21use futures::future::{Either, select};
22use risingwave_common::array::StreamChunk;
23use rw_futures_util::drop_either_future;
24
25use crate::sink::encoder::SerTo;
26use crate::sink::formatter::SinkFormatter;
27use crate::sink::log_store::{
28 DeliveryFutureManager, DeliveryFutureManagerAddFuture, LogStoreReadItem, TruncateOffset,
29};
30use crate::sink::{LogSinker, Result, SinkError, SinkLogReader, SinkWriterMetrics};
31
32#[async_trait]
33pub trait SinkWriter: Send + 'static {
34 type CommitMetadata: Send = ();
35 async fn begin_epoch(&mut self, epoch: u64) -> Result<()>;
37
38 async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()>;
40
41 async fn barrier(&mut self, is_checkpoint: bool) -> Result<Self::CommitMetadata>;
44
45 async fn abort(&mut self) -> Result<()> {
47 Ok(())
48 }
49}
50
51pub type DummyDeliveryFuture = Ready<std::result::Result<(), SinkError>>;
52
53pub trait AsyncTruncateSinkWriter: Send + 'static {
54 type DeliveryFuture: TryFuture<Ok = (), Error = SinkError> + Unpin + Send + 'static =
55 DummyDeliveryFuture;
56
57 fn write_chunk<'a>(
58 &'a mut self,
59 chunk: StreamChunk,
60 add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>,
61 ) -> impl Future<Output = Result<()>> + Send + 'a;
62
63 fn barrier(&mut self, _is_checkpoint: bool) -> impl Future<Output = Result<()>> + Send + '_ {
64 async { Ok(()) }
65 }
66}
67
68pub trait FormattedSink {
79 type K;
80 type V;
81 async fn write_one(&mut self, k: Option<Self::K>, v: Option<Self::V>) -> Result<()>;
82
83 async fn write_chunk<F: SinkFormatter>(
84 &mut self,
85 chunk: StreamChunk,
86 formatter: &F,
87 ) -> Result<()>
88 where
89 F::K: SerTo<Self::K>,
90 F::V: SerTo<Self::V>,
91 {
92 for r in formatter.format_chunk(&chunk) {
93 let (event_key_object, event_object) = r?;
94
95 self.write_one(
96 event_key_object.map(SerTo::ser_to).transpose()?,
97 event_object.map(SerTo::ser_to).transpose()?,
98 )
99 .await?;
100 }
101
102 Ok(())
103 }
104}
105
106pub struct LogSinkerOf<W> {
107 writer: W,
108 sink_writer_metrics: SinkWriterMetrics,
109}
110
111impl<W> LogSinkerOf<W> {
112 pub fn new(writer: W, sink_writer_metrics: SinkWriterMetrics) -> Self {
113 LogSinkerOf {
114 writer,
115 sink_writer_metrics,
116 }
117 }
118}
119
120#[async_trait]
121impl<W: SinkWriter<CommitMetadata = ()>> LogSinker for LogSinkerOf<W> {
122 async fn consume_log_and_sink(self, mut log_reader: impl SinkLogReader) -> Result<!> {
123 log_reader.start_from(None).await?;
124 let mut sink_writer = self.writer;
125 let metrics = self.sink_writer_metrics;
126 #[derive(Debug)]
127 enum LogConsumerState {
128 Uninitialized,
130
131 EpochBegun { curr_epoch: u64 },
133
134 BarrierReceived { prev_epoch: u64 },
136 }
137
138 let mut state = LogConsumerState::Uninitialized;
139
140 loop {
141 let (epoch, item): (u64, LogStoreReadItem) = log_reader.next_item().await?;
142 state = match state {
144 LogConsumerState::Uninitialized => {
145 sink_writer.begin_epoch(epoch).await?;
146 LogConsumerState::EpochBegun { curr_epoch: epoch }
147 }
148 LogConsumerState::EpochBegun { curr_epoch } => {
149 assert!(
150 epoch >= curr_epoch,
151 "new epoch {} should not be below the current epoch {}",
152 epoch,
153 curr_epoch
154 );
155 LogConsumerState::EpochBegun { curr_epoch: epoch }
156 }
157 LogConsumerState::BarrierReceived { prev_epoch } => {
158 assert!(
159 epoch > prev_epoch,
160 "new epoch {} should be greater than prev epoch {}",
161 epoch,
162 prev_epoch
163 );
164 sink_writer.begin_epoch(epoch).await?;
165 LogConsumerState::EpochBegun { curr_epoch: epoch }
166 }
167 };
168 match item {
169 LogStoreReadItem::StreamChunk { chunk, .. } => {
170 if let Err(e) = sink_writer.write_batch(chunk).await {
171 sink_writer.abort().await?;
172 return Err(e);
173 }
174 }
175 LogStoreReadItem::Barrier {
176 is_checkpoint,
177 new_vnode_bitmap,
178 ..
179 } => {
180 let prev_epoch = match state {
181 LogConsumerState::EpochBegun { curr_epoch } => curr_epoch,
182 _ => unreachable!("epoch must have begun before handling barrier"),
183 };
184 if is_checkpoint {
185 let start_time = Instant::now();
186 sink_writer.barrier(true).await?;
187 metrics
188 .sink_commit_duration
189 .observe(start_time.elapsed().as_millis() as f64);
190 log_reader.truncate(TruncateOffset::Barrier { epoch })?;
191 } else {
192 assert!(new_vnode_bitmap.is_none());
193 sink_writer.barrier(false).await?;
194 }
195 state = LogConsumerState::BarrierReceived { prev_epoch }
196 }
197 }
198 }
199 }
200}
201
202#[easy_ext::ext(SinkWriterExt)]
203impl<T> T
204where
205 T: SinkWriter<CommitMetadata = ()> + Sized,
206{
207 pub fn into_log_sinker(self, sink_writer_metrics: SinkWriterMetrics) -> LogSinkerOf<Self> {
208 LogSinkerOf {
209 writer: self,
210 sink_writer_metrics,
211 }
212 }
213}
214
215pub struct AsyncTruncateLogSinkerOf<W: AsyncTruncateSinkWriter> {
216 writer: W,
217 future_manager: DeliveryFutureManager<W::DeliveryFuture>,
218}
219
220impl<W: AsyncTruncateSinkWriter> AsyncTruncateLogSinkerOf<W> {
221 pub fn new(writer: W, max_future_count: usize) -> Self {
222 AsyncTruncateLogSinkerOf {
223 writer,
224 future_manager: DeliveryFutureManager::new(max_future_count),
225 }
226 }
227}
228
229#[async_trait]
230impl<W: AsyncTruncateSinkWriter> LogSinker for AsyncTruncateLogSinkerOf<W> {
231 async fn consume_log_and_sink(mut self, mut log_reader: impl SinkLogReader) -> Result<!> {
232 log_reader.start_from(None).await?;
233 loop {
234 let select_result = drop_either_future(
235 select(
236 pin!(log_reader.next_item()),
237 pin!(self.future_manager.next_truncate_offset()),
238 )
239 .await,
240 );
241 match select_result {
242 Either::Left(item_result) => {
243 let (epoch, item) = item_result?;
244 match item {
245 LogStoreReadItem::StreamChunk { chunk_id, chunk } => {
246 let add_future = self.future_manager.start_write_chunk(epoch, chunk_id);
247 self.writer.write_chunk(chunk, add_future).await?;
248 }
249 LogStoreReadItem::Barrier { is_checkpoint, .. } => {
250 self.writer.barrier(is_checkpoint).await?;
251 self.future_manager.add_barrier(epoch);
252 }
253 }
254 }
255 Either::Right(offset_result) => {
256 let offset = offset_result?;
257 log_reader.truncate(offset)?;
258 }
259 }
260 }
261 }
262}
263
264#[easy_ext::ext(AsyncTruncateSinkWriterExt)]
265impl<T> T
266where
267 T: AsyncTruncateSinkWriter + Sized,
268{
269 pub fn into_log_sinker(self, max_future_count: usize) -> AsyncTruncateLogSinkerOf<Self> {
270 AsyncTruncateLogSinkerOf::new(self, max_future_count)
271 }
272}