risingwave_connector/sink/
boxed.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::future::Future;
16
17use async_trait::async_trait;
18use futures::FutureExt;
19use futures::future::BoxFuture;
20
21use crate::sink::log_store::{LogStoreReadItem, LogStoreResult, TruncateOffset};
22use crate::sink::{
23    LogSinker, SinglePhaseCommitCoordinator, SinkLogReader, TwoPhaseCommitCoordinator,
24};
25
/// Owned, type-erased [`SinglePhaseCommitCoordinator`] that is `Send` and `'static`.
pub type BoxSinglePhaseCoordinator = Box<dyn SinglePhaseCommitCoordinator + Send + 'static>;
/// Owned, type-erased [`TwoPhaseCommitCoordinator`] that is `Send` and `'static`.
pub type BoxTwoPhaseCoordinator = Box<dyn TwoPhaseCommitCoordinator + Send + 'static>;
28
/// Type-erased log sinker.
///
/// It is a boxed, one-shot (`FnOnce`) closure that takes a mutable borrow of a
/// type-erased log reader and returns a boxed future which may keep borrowing that
/// reader for the same lifetime `'a`. The closure is higher-ranked over `'a`, so it
/// accepts a reader borrow of any lifetime.
///
/// The future's success type is `!`. Assuming `crate::sink::Result` is a plain
/// `Result` alias, the future can therefore only resolve with an error.
pub type BoxLogSinker = Box<
    dyn for<'a> FnOnce(&'a mut dyn DynLogReader) -> BoxFuture<'a, crate::sink::Result<!>>
        + Send
        + 'static,
>;
34
/// Trait-object-friendly mirror of [`SinkLogReader`].
///
/// The async methods go through `#[async_trait]`, which lets the trait be used as
/// `dyn DynLogReader`. Every method is prefixed with `dyn_` so that it does not
/// collide with the identically purposed method on [`SinkLogReader`].
#[async_trait]
pub trait DynLogReader: Send {
    /// Counterpart of [`SinkLogReader::start_from`] for use through a trait object.
    async fn dyn_start_from(&mut self, start_offset: Option<u64>) -> LogStoreResult<()>;
    /// Counterpart of [`SinkLogReader::next_item`] for use through a trait object.
    async fn dyn_next_item(&mut self) -> LogStoreResult<(u64, LogStoreReadItem)>;

    /// Counterpart of [`SinkLogReader::truncate`]; synchronous, like the original.
    fn dyn_truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()>;
}
42
43#[async_trait]
44impl<R: SinkLogReader> DynLogReader for R {
45    async fn dyn_start_from(&mut self, start_offset: Option<u64>) -> LogStoreResult<()> {
46        R::start_from(self, start_offset).await
47    }
48
49    async fn dyn_next_item(&mut self) -> LogStoreResult<(u64, LogStoreReadItem)> {
50        R::next_item(self).await
51    }
52
53    fn dyn_truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
54        R::truncate(self, offset)
55    }
56}
57
58impl SinkLogReader for &mut dyn DynLogReader {
59    fn start_from(
60        &mut self,
61        start_offset: Option<u64>,
62    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
63        (*self).dyn_start_from(start_offset)
64    }
65
66    fn next_item(
67        &mut self,
68    ) -> impl Future<Output = LogStoreResult<(u64, LogStoreReadItem)>> + Send + '_ {
69        (*self).dyn_next_item()
70    }
71
72    fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
73        (*self).dyn_truncate(offset)
74    }
75}
76
77pub fn boxed_log_sinker(log_sinker: impl LogSinker) -> BoxLogSinker {
78    fn make_future<'a>(
79        log_sinker: impl LogSinker,
80        log_reader: &'a mut dyn DynLogReader,
81    ) -> BoxFuture<'a, crate::sink::Result<!>> {
82        log_sinker.consume_log_and_sink(log_reader).boxed()
83    }
84
85    // Note: it's magical that the following expression can be cast to the expected return type
86    // without any explicit conversion, such as `<expr> as _` or `<expr>.into()`.
87    // TODO: may investigate the reason. The currently successful compilation seems volatile to future compatibility.
88    Box::new(move |log_reader: &mut dyn DynLogReader| make_future(log_sinker, log_reader))
89}
90
91#[async_trait]
92impl LogSinker for BoxLogSinker {
93    async fn consume_log_and_sink(
94        self,
95        mut log_reader: impl SinkLogReader,
96    ) -> crate::sink::Result<!> {
97        (self)(&mut log_reader).await
98    }
99}