risingwave_connector/sink/
boxed.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::future::Future;
16use std::ops::DerefMut;
17
18use async_trait::async_trait;
19use futures::FutureExt;
20use futures::future::BoxFuture;
21use risingwave_common::catalog::Field;
22use risingwave_pb::connector_service::SinkMetadata;
23
24use super::SinkCommittedEpochSubscriber;
25use crate::sink::log_store::{LogStoreReadItem, LogStoreResult, TruncateOffset};
26use crate::sink::{LogSinker, SinkCommitCoordinator, SinkLogReader};
27
28pub type BoxCoordinator = Box<dyn SinkCommitCoordinator + Send + 'static>;
29
30pub type BoxLogSinker = Box<
31    dyn for<'a> FnOnce(&'a mut dyn DynLogReader) -> BoxFuture<'a, crate::sink::Result<!>>
32        + Send
33        + 'static,
34>;
35
36#[async_trait]
37pub trait DynLogReader: Send {
38    async fn dyn_start_from(&mut self, start_offset: Option<u64>) -> LogStoreResult<()>;
39    async fn dyn_next_item(&mut self) -> LogStoreResult<(u64, LogStoreReadItem)>;
40
41    fn dyn_truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()>;
42}
43
44#[async_trait]
45impl<R: SinkLogReader> DynLogReader for R {
46    async fn dyn_start_from(&mut self, start_offset: Option<u64>) -> LogStoreResult<()> {
47        R::start_from(self, start_offset).await
48    }
49
50    async fn dyn_next_item(&mut self) -> LogStoreResult<(u64, LogStoreReadItem)> {
51        R::next_item(self).await
52    }
53
54    fn dyn_truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
55        R::truncate(self, offset)
56    }
57}
58
59impl SinkLogReader for &mut dyn DynLogReader {
60    fn start_from(
61        &mut self,
62        start_offset: Option<u64>,
63    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
64        (*self).dyn_start_from(start_offset)
65    }
66
67    fn next_item(
68        &mut self,
69    ) -> impl Future<Output = LogStoreResult<(u64, LogStoreReadItem)>> + Send + '_ {
70        (*self).dyn_next_item()
71    }
72
73    fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
74        (*self).dyn_truncate(offset)
75    }
76}
77
78pub fn boxed_log_sinker(log_sinker: impl LogSinker) -> BoxLogSinker {
79    fn make_future<'a>(
80        log_sinker: impl LogSinker,
81        log_reader: &'a mut dyn DynLogReader,
82    ) -> BoxFuture<'a, crate::sink::Result<!>> {
83        log_sinker.consume_log_and_sink(log_reader).boxed()
84    }
85
86    // Note: it's magical that the following expression can be cast to the expected return type
87    // without any explicit conversion, such as `<expr> as _` or `<expr>.into()`.
88    // TODO: may investigate the reason. The currently successful compilation seems volatile to future compatibility.
89    Box::new(move |log_reader: &mut dyn DynLogReader| make_future(log_sinker, log_reader))
90}
91
92#[async_trait]
93impl LogSinker for BoxLogSinker {
94    async fn consume_log_and_sink(
95        self,
96        mut log_reader: impl SinkLogReader,
97    ) -> crate::sink::Result<!> {
98        (self)(&mut log_reader).await
99    }
100}
101
102#[async_trait]
103impl SinkCommitCoordinator for BoxCoordinator {
104    async fn init(
105        &mut self,
106        subscriber: SinkCommittedEpochSubscriber,
107    ) -> crate::sink::Result<Option<u64>> {
108        self.deref_mut().init(subscriber).await
109    }
110
111    async fn commit(
112        &mut self,
113        epoch: u64,
114        metadata: Vec<SinkMetadata>,
115        add_columns: Option<Vec<Field>>,
116    ) -> crate::sink::Result<()> {
117        self.deref_mut().commit(epoch, metadata, add_columns).await
118    }
119}