risingwave_connector/sink/
boxed.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::future::Future;
16use std::ops::DerefMut;
17
18use async_trait::async_trait;
19use futures::FutureExt;
20use futures::future::BoxFuture;
21use risingwave_pb::connector_service::SinkMetadata;
22use risingwave_pb::stream_plan::PbSinkSchemaChange;
23
24use crate::sink::log_store::{LogStoreReadItem, LogStoreResult, TruncateOffset};
25use crate::sink::{
26    LogSinker, SinglePhaseCommitCoordinator, SinkLogReader, TwoPhaseCommitCoordinator,
27};
28
29pub type BoxSinglePhaseCoordinator = Box<dyn SinglePhaseCommitCoordinator + Send + 'static>;
30pub type BoxTwoPhaseCoordinator = Box<dyn TwoPhaseCommitCoordinator + Send + 'static>;
31
32pub type BoxLogSinker = Box<
33    dyn for<'a> FnOnce(&'a mut dyn DynLogReader) -> BoxFuture<'a, crate::sink::Result<!>>
34        + Send
35        + 'static,
36>;
37
38#[async_trait]
39pub trait DynLogReader: Send {
40    async fn dyn_start_from(&mut self, start_offset: Option<u64>) -> LogStoreResult<()>;
41    async fn dyn_next_item(&mut self) -> LogStoreResult<(u64, LogStoreReadItem)>;
42
43    fn dyn_truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()>;
44}
45
46#[async_trait]
47impl<R: SinkLogReader> DynLogReader for R {
48    async fn dyn_start_from(&mut self, start_offset: Option<u64>) -> LogStoreResult<()> {
49        R::start_from(self, start_offset).await
50    }
51
52    async fn dyn_next_item(&mut self) -> LogStoreResult<(u64, LogStoreReadItem)> {
53        R::next_item(self).await
54    }
55
56    fn dyn_truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
57        R::truncate(self, offset)
58    }
59}
60
61impl SinkLogReader for &mut dyn DynLogReader {
62    fn start_from(
63        &mut self,
64        start_offset: Option<u64>,
65    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
66        (*self).dyn_start_from(start_offset)
67    }
68
69    fn next_item(
70        &mut self,
71    ) -> impl Future<Output = LogStoreResult<(u64, LogStoreReadItem)>> + Send + '_ {
72        (*self).dyn_next_item()
73    }
74
75    fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
76        (*self).dyn_truncate(offset)
77    }
78}
79
80pub fn boxed_log_sinker(log_sinker: impl LogSinker) -> BoxLogSinker {
81    fn make_future<'a>(
82        log_sinker: impl LogSinker,
83        log_reader: &'a mut dyn DynLogReader,
84    ) -> BoxFuture<'a, crate::sink::Result<!>> {
85        log_sinker.consume_log_and_sink(log_reader).boxed()
86    }
87
88    // Note: it's magical that the following expression can be cast to the expected return type
89    // without any explicit conversion, such as `<expr> as _` or `<expr>.into()`.
90    // TODO: may investigate the reason. The currently successful compilation seems volatile to future compatibility.
91    Box::new(move |log_reader: &mut dyn DynLogReader| make_future(log_sinker, log_reader))
92}
93
94#[async_trait]
95impl LogSinker for BoxLogSinker {
96    async fn consume_log_and_sink(
97        self,
98        mut log_reader: impl SinkLogReader,
99    ) -> crate::sink::Result<!> {
100        (self)(&mut log_reader).await
101    }
102}
103
104#[async_trait]
105impl SinglePhaseCommitCoordinator for BoxSinglePhaseCoordinator {
106    async fn init(&mut self) -> crate::sink::Result<()> {
107        self.deref_mut().init().await
108    }
109
110    async fn commit_data(
111        &mut self,
112        epoch: u64,
113        metadata: Vec<SinkMetadata>,
114    ) -> crate::sink::Result<()> {
115        self.deref_mut().commit_data(epoch, metadata).await
116    }
117
118    async fn commit_schema_change(
119        &mut self,
120        epoch: u64,
121        schema_change: PbSinkSchemaChange,
122    ) -> crate::sink::Result<()> {
123        self.deref_mut()
124            .commit_schema_change(epoch, schema_change)
125            .await
126    }
127}
128
129#[async_trait]
130impl TwoPhaseCommitCoordinator for BoxTwoPhaseCoordinator {
131    async fn init(&mut self) -> crate::sink::Result<()> {
132        self.deref_mut().init().await
133    }
134
135    async fn pre_commit(
136        &mut self,
137        epoch: u64,
138        metadata: Vec<SinkMetadata>,
139        schema_change: Option<PbSinkSchemaChange>,
140    ) -> crate::sink::Result<Option<Vec<u8>>> {
141        self.deref_mut()
142            .pre_commit(epoch, metadata, schema_change)
143            .await
144    }
145
146    async fn commit_data(
147        &mut self,
148        epoch: u64,
149        commit_metadata: Vec<u8>,
150    ) -> crate::sink::Result<()> {
151        self.deref_mut().commit_data(epoch, commit_metadata).await
152    }
153
154    async fn commit_schema_change(
155        &mut self,
156        epoch: u64,
157        schema_change: PbSinkSchemaChange,
158    ) -> crate::sink::Result<()> {
159        self.deref_mut()
160            .commit_schema_change(epoch, schema_change)
161            .await
162    }
163
164    async fn abort(&mut self, epoch: u64, commit_metadata: Vec<u8>) {
165        self.deref_mut().abort(epoch, commit_metadata).await;
166    }
167}