risingwave_connector/sink/
boxed.rs1use std::future::Future;
16use std::ops::DerefMut;
17
18use async_trait::async_trait;
19use futures::FutureExt;
20use futures::future::BoxFuture;
21use risingwave_common::catalog::Field;
22use risingwave_pb::connector_service::SinkMetadata;
23
24use super::SinkCommittedEpochSubscriber;
25use crate::sink::log_store::{LogStoreReadItem, LogStoreResult, TruncateOffset};
26use crate::sink::{LogSinker, SinkCommitCoordinator, SinkLogReader};
27
28pub type BoxCoordinator = Box<dyn SinkCommitCoordinator + Send + 'static>;
29
30pub type BoxLogSinker = Box<
31 dyn for<'a> FnOnce(&'a mut dyn DynLogReader) -> BoxFuture<'a, crate::sink::Result<!>>
32 + Send
33 + 'static,
34>;
35
36#[async_trait]
37pub trait DynLogReader: Send {
38 async fn dyn_start_from(&mut self, start_offset: Option<u64>) -> LogStoreResult<()>;
39 async fn dyn_next_item(&mut self) -> LogStoreResult<(u64, LogStoreReadItem)>;
40
41 fn dyn_truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()>;
42}
43
44#[async_trait]
45impl<R: SinkLogReader> DynLogReader for R {
46 async fn dyn_start_from(&mut self, start_offset: Option<u64>) -> LogStoreResult<()> {
47 R::start_from(self, start_offset).await
48 }
49
50 async fn dyn_next_item(&mut self) -> LogStoreResult<(u64, LogStoreReadItem)> {
51 R::next_item(self).await
52 }
53
54 fn dyn_truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
55 R::truncate(self, offset)
56 }
57}
58
59impl SinkLogReader for &mut dyn DynLogReader {
60 fn start_from(
61 &mut self,
62 start_offset: Option<u64>,
63 ) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
64 (*self).dyn_start_from(start_offset)
65 }
66
67 fn next_item(
68 &mut self,
69 ) -> impl Future<Output = LogStoreResult<(u64, LogStoreReadItem)>> + Send + '_ {
70 (*self).dyn_next_item()
71 }
72
73 fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
74 (*self).dyn_truncate(offset)
75 }
76}
77
78pub fn boxed_log_sinker(log_sinker: impl LogSinker) -> BoxLogSinker {
79 fn make_future<'a>(
80 log_sinker: impl LogSinker,
81 log_reader: &'a mut dyn DynLogReader,
82 ) -> BoxFuture<'a, crate::sink::Result<!>> {
83 log_sinker.consume_log_and_sink(log_reader).boxed()
84 }
85
86 Box::new(move |log_reader: &mut dyn DynLogReader| make_future(log_sinker, log_reader))
90}
91
92#[async_trait]
93impl LogSinker for BoxLogSinker {
94 async fn consume_log_and_sink(
95 self,
96 mut log_reader: impl SinkLogReader,
97 ) -> crate::sink::Result<!> {
98 (self)(&mut log_reader).await
99 }
100}
101
102#[async_trait]
103impl SinkCommitCoordinator for BoxCoordinator {
104 async fn init(
105 &mut self,
106 subscriber: SinkCommittedEpochSubscriber,
107 ) -> crate::sink::Result<Option<u64>> {
108 self.deref_mut().init(subscriber).await
109 }
110
111 async fn commit(
112 &mut self,
113 epoch: u64,
114 metadata: Vec<SinkMetadata>,
115 add_columns: Option<Vec<Field>>,
116 ) -> crate::sink::Result<()> {
117 self.deref_mut().commit(epoch, metadata, add_columns).await
118 }
119}