risingwave_connector/sink/
boxed.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::future::Future;
16use std::ops::DerefMut;
17
18use async_trait::async_trait;
19use futures::FutureExt;
20use futures::future::BoxFuture;
21use risingwave_common::catalog::Field;
22use risingwave_pb::connector_service::SinkMetadata;
23
24use crate::sink::log_store::{LogStoreReadItem, LogStoreResult, TruncateOffset};
25use crate::sink::{
26    LogSinker, SinglePhaseCommitCoordinator, SinkLogReader, TwoPhaseCommitCoordinator,
27};
28
/// An owned, `Send + 'static` trait object of [`SinglePhaseCommitCoordinator`].
pub type BoxSinglePhaseCoordinator = Box<dyn SinglePhaseCommitCoordinator + Send + 'static>;
/// An owned, `Send + 'static` trait object of [`TwoPhaseCommitCoordinator`].
pub type BoxTwoPhaseCoordinator = Box<dyn TwoPhaseCommitCoordinator + Send + 'static>;
31
/// A type-erased log sinker, represented as a boxed one-shot closure.
///
/// The closure receives a mutable reference to a type-erased log reader and returns a boxed
/// future that may borrow that reader for the whole lifetime `'a`. The `for<'a>` bound makes the
/// closure callable with a reader borrow of any lifetime.
///
/// The success type of the future is `!`.
// NOTE(review): assuming `crate::sink::Result` is an alias of `std::result::Result`, the future
// can therefore only ever resolve with an error — confirm against the alias definition.
pub type BoxLogSinker = Box<
    dyn for<'a> FnOnce(&'a mut dyn DynLogReader) -> BoxFuture<'a, crate::sink::Result<!>>
        + Send
        + 'static,
>;
37
/// A mirror of [`SinkLogReader`] that is used in this file behind `dyn DynLogReader`.
///
/// Every `dyn_*` method corresponds to the [`SinkLogReader`] method of the same name without the
/// `dyn_` prefix. The async methods are declared through `#[async_trait]`.
#[async_trait]
pub trait DynLogReader: Send {
    /// Counterpart of `SinkLogReader::start_from`.
    async fn dyn_start_from(&mut self, start_offset: Option<u64>) -> LogStoreResult<()>;
    /// Counterpart of `SinkLogReader::next_item`.
    // NOTE(review): the `u64` of the returned pair is presumably an epoch — confirm against
    // the `SinkLogReader` definition.
    async fn dyn_next_item(&mut self) -> LogStoreResult<(u64, LogStoreReadItem)>;

    /// Counterpart of `SinkLogReader::truncate`. Unlike the other two methods, this one is
    /// synchronous.
    fn dyn_truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()>;
}
45
46#[async_trait]
47impl<R: SinkLogReader> DynLogReader for R {
48    async fn dyn_start_from(&mut self, start_offset: Option<u64>) -> LogStoreResult<()> {
49        R::start_from(self, start_offset).await
50    }
51
52    async fn dyn_next_item(&mut self) -> LogStoreResult<(u64, LogStoreReadItem)> {
53        R::next_item(self).await
54    }
55
56    fn dyn_truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
57        R::truncate(self, offset)
58    }
59}
60
61impl SinkLogReader for &mut dyn DynLogReader {
62    fn start_from(
63        &mut self,
64        start_offset: Option<u64>,
65    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
66        (*self).dyn_start_from(start_offset)
67    }
68
69    fn next_item(
70        &mut self,
71    ) -> impl Future<Output = LogStoreResult<(u64, LogStoreReadItem)>> + Send + '_ {
72        (*self).dyn_next_item()
73    }
74
75    fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
76        (*self).dyn_truncate(offset)
77    }
78}
79
/// Type-erases a [`LogSinker`] into a [`BoxLogSinker`].
///
/// The returned closure takes ownership of `log_sinker`. When it is invoked with a
/// `&mut dyn DynLogReader`, it calls `consume_log_and_sink` with that reader and boxes the
/// resulting future.
pub fn boxed_log_sinker(log_sinker: impl LogSinker) -> BoxLogSinker {
    // Helper with an explicit lifetime parameter `'a`, so that the signature itself states that
    // the returned future may borrow the reader for as long as the reader reference lives.
    fn make_future<'a>(
        log_sinker: impl LogSinker,
        log_reader: &'a mut dyn DynLogReader,
    ) -> BoxFuture<'a, crate::sink::Result<!>> {
        // `&mut dyn DynLogReader` implements `SinkLogReader`, so it can be handed over directly.
        log_sinker.consume_log_and_sink(log_reader).boxed()
    }

    // Note: somewhat surprisingly, the following expression coerces to the expected return type
    // without any explicit conversion, such as `<expr> as _` or `<expr>.into()`.
    // TODO: investigate the reason. It is unclear whether this will keep compiling with future
    // compiler versions, so avoid reshaping this expression without re-checking.
    Box::new(move |log_reader: &mut dyn DynLogReader| make_future(log_sinker, log_reader))
}
93
94#[async_trait]
95impl LogSinker for BoxLogSinker {
96    async fn consume_log_and_sink(
97        self,
98        mut log_reader: impl SinkLogReader,
99    ) -> crate::sink::Result<!> {
100        (self)(&mut log_reader).await
101    }
102}
103
104#[async_trait]
105impl SinglePhaseCommitCoordinator for BoxSinglePhaseCoordinator {
106    async fn init(&mut self) -> crate::sink::Result<()> {
107        self.deref_mut().init().await
108    }
109
110    async fn commit(
111        &mut self,
112        epoch: u64,
113        metadata: Vec<SinkMetadata>,
114        add_columns: Option<Vec<Field>>,
115    ) -> crate::sink::Result<()> {
116        self.deref_mut().commit(epoch, metadata, add_columns).await
117    }
118}
119
120#[async_trait]
121impl TwoPhaseCommitCoordinator for BoxTwoPhaseCoordinator {
122    async fn init(&mut self) -> crate::sink::Result<()> {
123        self.deref_mut().init().await
124    }
125
126    async fn pre_commit(
127        &mut self,
128        epoch: u64,
129        metadata: Vec<SinkMetadata>,
130        add_columns: Option<Vec<Field>>,
131    ) -> crate::sink::Result<Vec<u8>> {
132        self.deref_mut()
133            .pre_commit(epoch, metadata, add_columns)
134            .await
135    }
136
137    async fn commit(&mut self, epoch: u64, commit_metadata: Vec<u8>) -> crate::sink::Result<()> {
138        self.deref_mut().commit(epoch, commit_metadata).await
139    }
140
141    async fn abort(&mut self, epoch: u64, commit_metadata: Vec<u8>) {
142        self.deref_mut().abort(epoch, commit_metadata).await;
143    }
144}