risingwave_connector/sink/
boxed.rs1use std::future::Future;
16use std::ops::DerefMut;
17
18use async_trait::async_trait;
19use futures::FutureExt;
20use futures::future::BoxFuture;
21use risingwave_pb::connector_service::SinkMetadata;
22
23use super::SinkCommittedEpochSubscriber;
24use crate::sink::log_store::{LogStoreReadItem, LogStoreResult, TruncateOffset};
25use crate::sink::{LogSinker, SinkCommitCoordinator, SinkLogReader};
26
27pub type BoxCoordinator = Box<dyn SinkCommitCoordinator + Send + 'static>;
28
29pub type BoxLogSinker = Box<
30 dyn for<'a> FnOnce(&'a mut dyn DynLogReader) -> BoxFuture<'a, crate::sink::Result<!>>
31 + Send
32 + 'static,
33>;
34
35#[async_trait]
36pub trait DynLogReader: Send {
37 async fn dyn_start_from(&mut self, start_offset: Option<u64>) -> LogStoreResult<()>;
38 async fn dyn_next_item(&mut self) -> LogStoreResult<(u64, LogStoreReadItem)>;
39
40 fn dyn_truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()>;
41}
42
43#[async_trait]
44impl<R: SinkLogReader> DynLogReader for R {
45 async fn dyn_start_from(&mut self, start_offset: Option<u64>) -> LogStoreResult<()> {
46 R::start_from(self, start_offset).await
47 }
48
49 async fn dyn_next_item(&mut self) -> LogStoreResult<(u64, LogStoreReadItem)> {
50 R::next_item(self).await
51 }
52
53 fn dyn_truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
54 R::truncate(self, offset)
55 }
56}
57
58impl SinkLogReader for &mut dyn DynLogReader {
59 fn start_from(
60 &mut self,
61 start_offset: Option<u64>,
62 ) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
63 (*self).dyn_start_from(start_offset)
64 }
65
66 fn next_item(
67 &mut self,
68 ) -> impl Future<Output = LogStoreResult<(u64, LogStoreReadItem)>> + Send + '_ {
69 (*self).dyn_next_item()
70 }
71
72 fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
73 (*self).dyn_truncate(offset)
74 }
75}
76
77pub fn boxed_log_sinker(log_sinker: impl LogSinker) -> BoxLogSinker {
78 fn make_future<'a>(
79 log_sinker: impl LogSinker,
80 log_reader: &'a mut dyn DynLogReader,
81 ) -> BoxFuture<'a, crate::sink::Result<!>> {
82 log_sinker.consume_log_and_sink(log_reader).boxed()
83 }
84
85 Box::new(move |log_reader: &mut dyn DynLogReader| make_future(log_sinker, log_reader))
89}
90
91#[async_trait]
92impl LogSinker for BoxLogSinker {
93 async fn consume_log_and_sink(
94 self,
95 mut log_reader: impl SinkLogReader,
96 ) -> crate::sink::Result<!> {
97 (self)(&mut log_reader).await
98 }
99}
100
101#[async_trait]
102impl SinkCommitCoordinator for BoxCoordinator {
103 async fn init(
104 &mut self,
105 subscriber: SinkCommittedEpochSubscriber,
106 ) -> crate::sink::Result<Option<u64>> {
107 self.deref_mut().init(subscriber).await
108 }
109
110 async fn commit(&mut self, epoch: u64, metadata: Vec<SinkMetadata>) -> crate::sink::Result<()> {
111 self.deref_mut().commit(epoch, metadata).await
112 }
113}