risingwave_connector/sink/writer.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::future::{Future, Ready};
16use std::pin::pin;
17use std::time::Instant;
18
19use async_trait::async_trait;
20use futures::TryFuture;
21use futures::future::{Either, select};
22use risingwave_common::array::StreamChunk;
23use rw_futures_util::drop_either_future;
24
25use crate::sink::encoder::SerTo;
26use crate::sink::formatter::SinkFormatter;
27use crate::sink::log_store::{
28    DeliveryFutureManager, DeliveryFutureManagerAddFuture, LogStoreReadItem, TruncateOffset,
29};
30use crate::sink::{LogSinker, Result, SinkError, SinkLogReader, SinkWriterMetrics};
31
#[async_trait]
/// A sink writer driven by epochs and barriers: `begin_epoch` opens an epoch,
/// `write_batch` appends chunks within it, and `barrier` closes it (committing
/// when `is_checkpoint` is true).
pub trait SinkWriter: Send + 'static {
    /// Metadata produced when an epoch is committed via [`Self::barrier`].
    /// Defaults to `()` for writers that have no commit metadata to report.
    type CommitMetadata: Send = ();
    /// Begin a new epoch
    async fn begin_epoch(&mut self, epoch: u64) -> Result<()>;

    /// Write a stream chunk to sink
    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()>;

    /// Receive a barrier and mark the end of current epoch. When `is_checkpoint` is true, the sink
    /// writer should commit the current epoch.
    async fn barrier(&mut self, is_checkpoint: bool) -> Result<Self::CommitMetadata>;

    /// Return true when the writer wants to commit on the next checkpoint barrier earlier than
    /// the configured checkpoint interval. Defaults to `false` (no early commit).
    fn should_commit_on_checkpoint(&self) -> bool {
        false
    }

    /// Clean up after a failed write; the default implementation does nothing.
    async fn abort(&mut self) -> Result<()> {
        Ok(())
    }
}
56
/// An already-completed delivery future, used as the default
/// [`AsyncTruncateSinkWriter::DeliveryFuture`] for writers that have no
/// asynchronous delivery to track (`Ready` resolves immediately when polled).
pub type DummyDeliveryFuture = Ready<std::result::Result<(), SinkError>>;
58
/// A sink writer with asynchronous delivery: `write_chunk` may register in-flight
/// delivery futures through `add_future` instead of awaiting delivery inline, and
/// the log store is truncated only as those futures complete (see
/// [`AsyncTruncateLogSinkerOf`]).
pub trait AsyncTruncateSinkWriter: Send + 'static {
    /// Future that resolves once previously written data has been delivered.
    /// Defaults to [`DummyDeliveryFuture`] for writers with no async delivery.
    type DeliveryFuture: TryFuture<Ok = (), Error = SinkError> + Unpin + Send + 'static =
        DummyDeliveryFuture;

    /// Write a chunk, optionally pushing pending delivery futures into `add_future`.
    fn write_chunk<'a>(
        &'a mut self,
        chunk: StreamChunk,
        add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>,
    ) -> impl Future<Output = Result<()>> + Send + 'a;

    /// Called on each barrier; the default implementation is a no-op.
    fn barrier(&mut self, _is_checkpoint: bool) -> impl Future<Output = Result<()>> + Send + '_ {
        async { Ok(()) }
    }
}
73
74/// A free-form sink that may output in multiple formats and encodings. Examples include kafka,
75/// kinesis, nats and redis.
76///
77/// The implementor specifies required key & value type (likely string or bytes), as well as how to
78/// write a single pair. The provided `write_chunk` method would handle the interaction with a
79/// `SinkFormatter`.
80///
81/// Currently kafka takes `&mut self` while kinesis takes `&self`. So we use `&mut self` in trait
82/// but implement it for `&Kinesis`. This allows us to hold `&mut &Kinesis` and `&Kinesis`
83/// simultaneously, preventing the schema clone issue propagating from kafka to kinesis.
84pub trait FormattedSink {
85    type K;
86    type V;
87    async fn write_one(&mut self, k: Option<Self::K>, v: Option<Self::V>) -> Result<()>;
88
89    async fn write_chunk<F: SinkFormatter>(
90        &mut self,
91        chunk: StreamChunk,
92        formatter: &F,
93    ) -> Result<()>
94    where
95        F::K: SerTo<Self::K>,
96        F::V: SerTo<Self::V>,
97    {
98        for r in formatter.format_chunk(&chunk) {
99            let (event_key_object, event_object) = r?;
100
101            self.write_one(
102                event_key_object.map(SerTo::ser_to).transpose()?,
103                event_object.map(SerTo::ser_to).transpose()?,
104            )
105            .await?;
106        }
107
108        Ok(())
109    }
110}
111
/// A [`LogSinker`] adapter that drives a [`SinkWriter`] from log store items,
/// committing the writer on checkpoint barriers.
pub struct LogSinkerOf<W> {
    writer: W,
    // Used to observe commit duration (`sink_commit_duration`).
    sink_writer_metrics: SinkWriterMetrics,
}
116
117impl<W> LogSinkerOf<W> {
118    pub fn new(writer: W, sink_writer_metrics: SinkWriterMetrics) -> Self {
119        LogSinkerOf {
120            writer,
121            sink_writer_metrics,
122        }
123    }
124}
125
#[async_trait]
impl<W: SinkWriter<CommitMetadata = ()>> LogSinker for LogSinkerOf<W> {
    /// Drive the wrapped [`SinkWriter`] from the log reader forever (the `!`
    /// return type means this only exits by returning an error).
    async fn consume_log_and_sink(self, mut log_reader: impl SinkLogReader) -> Result<!> {
        log_reader.start_from(None).await?;
        let mut sink_writer = self.writer;
        let metrics = self.sink_writer_metrics;
        // Small state machine tracking whether the current epoch has been opened
        // on the writer, so `begin_epoch` is called exactly once per epoch.
        #[derive(Debug)]
        enum LogConsumerState {
            /// Mark that the log consumer is not initialized yet
            Uninitialized,

            /// Mark that a new epoch has begun.
            EpochBegun { curr_epoch: u64 },

            /// Mark that the consumer has just received a barrier
            BarrierReceived { prev_epoch: u64 },
        }

        let mut state = LogConsumerState::Uninitialized;

        loop {
            let (epoch, item): (u64, LogStoreReadItem) = log_reader.next_item().await?;
            // begin_epoch when not previously began
            state = match state {
                LogConsumerState::Uninitialized => {
                    sink_writer.begin_epoch(epoch).await?;
                    LogConsumerState::EpochBegun { curr_epoch: epoch }
                }
                LogConsumerState::EpochBegun { curr_epoch } => {
                    // Within an open epoch the epoch may advance without a barrier;
                    // it must never go backwards. Note `begin_epoch` is NOT re-called
                    // here — the epoch is simply tracked forward.
                    assert!(
                        epoch >= curr_epoch,
                        "new epoch {} should not be below the current epoch {}",
                        epoch,
                        curr_epoch
                    );
                    LogConsumerState::EpochBegun { curr_epoch: epoch }
                }
                LogConsumerState::BarrierReceived { prev_epoch } => {
                    // After a barrier the next item must belong to a strictly newer
                    // epoch, which is opened on the writer here.
                    assert!(
                        epoch > prev_epoch,
                        "new epoch {} should be greater than prev epoch {}",
                        epoch,
                        prev_epoch
                    );
                    sink_writer.begin_epoch(epoch).await?;
                    LogConsumerState::EpochBegun { curr_epoch: epoch }
                }
            };
            match item {
                LogStoreReadItem::StreamChunk { chunk, .. } => {
                    // On write failure, attempt cleanup before surfacing the error.
                    // NOTE(review): if `abort()` itself fails, its error replaces `e`
                    // (the original write error) — confirm this masking is intended.
                    if let Err(e) = sink_writer.write_batch(chunk).await {
                        sink_writer.abort().await?;
                        return Err(e);
                    }
                }
                LogStoreReadItem::Barrier {
                    is_checkpoint,
                    new_vnode_bitmap,
                    ..
                } => {
                    let prev_epoch = match state {
                        LogConsumerState::EpochBegun { curr_epoch } => curr_epoch,
                        _ => unreachable!("epoch must have begun before handling barrier"),
                    };
                    if is_checkpoint {
                        let start_time = Instant::now();
                        sink_writer.barrier(true).await?;
                        metrics
                            .sink_commit_duration
                            .observe(start_time.elapsed().as_secs_f64());
                        // The log store is truncated only on checkpoint barriers,
                        // i.e. once the writer has committed everything up to `epoch`.
                        log_reader.truncate(TruncateOffset::Barrier { epoch })?;
                    } else {
                        assert!(new_vnode_bitmap.is_none());
                        sink_writer.barrier(false).await?;
                    }
                    state = LogConsumerState::BarrierReceived { prev_epoch }
                }
            }
        }
    }
}
207
208#[easy_ext::ext(SinkWriterExt)]
209impl<T> T
210where
211    T: SinkWriter<CommitMetadata = ()> + Sized,
212{
213    pub fn into_log_sinker(self, sink_writer_metrics: SinkWriterMetrics) -> LogSinkerOf<Self> {
214        LogSinkerOf {
215            writer: self,
216            sink_writer_metrics,
217        }
218    }
219}
220
/// A [`LogSinker`] adapter for [`AsyncTruncateSinkWriter`]: instead of truncating
/// on checkpoint commit, it truncates the log store as in-flight delivery futures
/// complete.
pub struct AsyncTruncateLogSinkerOf<W: AsyncTruncateSinkWriter> {
    writer: W,
    // Tracks pending delivery futures and yields offsets that are safe to truncate.
    future_manager: DeliveryFutureManager<W::DeliveryFuture>,
}
225
226impl<W: AsyncTruncateSinkWriter> AsyncTruncateLogSinkerOf<W> {
227    pub fn new(writer: W, max_future_count: usize) -> Self {
228        AsyncTruncateLogSinkerOf {
229            writer,
230            future_manager: DeliveryFutureManager::new(max_future_count),
231        }
232    }
233}
234
#[async_trait]
impl<W: AsyncTruncateSinkWriter> LogSinker for AsyncTruncateLogSinkerOf<W> {
    /// Drive the writer forever: interleave consuming new log items with
    /// truncating the log store as earlier deliveries complete.
    async fn consume_log_and_sink(mut self, mut log_reader: impl SinkLogReader) -> Result<!> {
        log_reader.start_from(None).await?;
        loop {
            // Race fetching the next log item against the completion of in-flight
            // delivery futures, so truncation makes progress even while no new
            // item is available.
            let select_result = drop_either_future(
                select(
                    pin!(log_reader.next_item()),
                    pin!(self.future_manager.next_truncate_offset()),
                )
                .await,
            );
            match select_result {
                // A new log item arrived first.
                Either::Left(item_result) => {
                    let (epoch, item) = item_result?;
                    match item {
                        LogStoreReadItem::StreamChunk { chunk_id, chunk } => {
                            // Associate the chunk's delivery futures with
                            // (epoch, chunk_id) so completion maps back to an offset.
                            let add_future = self.future_manager.start_write_chunk(epoch, chunk_id);
                            self.writer.write_chunk(chunk, add_future).await?;
                        }
                        LogStoreReadItem::Barrier { is_checkpoint, .. } => {
                            self.writer.barrier(is_checkpoint).await?;
                            self.future_manager.add_barrier(epoch);
                        }
                    }
                }
                // Previously written data finished delivery; truncate up to its offset.
                Either::Right(offset_result) => {
                    let offset = offset_result?;
                    log_reader.truncate(offset)?;
                }
            }
        }
    }
}
269
#[easy_ext::ext(AsyncTruncateSinkWriterExt)]
impl<T> T
where
    T: AsyncTruncateSinkWriter + Sized,
{
    /// Wrap this writer into an [`AsyncTruncateLogSinkerOf`] with at most
    /// `max_future_count` delivery futures tracked at once.
    pub fn into_log_sinker(self, max_future_count: usize) -> AsyncTruncateLogSinkerOf<Self> {
        AsyncTruncateLogSinkerOf::new(self, max_future_count)
    }
}