risingwave_connector/sink/
boxed.rs1use std::future::Future;
16use std::ops::DerefMut;
17
18use async_trait::async_trait;
19use futures::FutureExt;
20use futures::future::BoxFuture;
21use risingwave_pb::connector_service::SinkMetadata;
22use risingwave_pb::stream_plan::PbSinkSchemaChange;
23
24use crate::sink::log_store::{LogStoreReadItem, LogStoreResult, TruncateOffset};
25use crate::sink::{
26 LogSinker, SinglePhaseCommitCoordinator, SinkLogReader, TwoPhaseCommitCoordinator,
27};
28
29pub type BoxSinglePhaseCoordinator = Box<dyn SinglePhaseCommitCoordinator + Send + 'static>;
30pub type BoxTwoPhaseCoordinator = Box<dyn TwoPhaseCommitCoordinator + Send + 'static>;
31
32pub type BoxLogSinker = Box<
33 dyn for<'a> FnOnce(&'a mut dyn DynLogReader) -> BoxFuture<'a, crate::sink::Result<!>>
34 + Send
35 + 'static,
36>;
37
38#[async_trait]
39pub trait DynLogReader: Send {
40 async fn dyn_start_from(&mut self, start_offset: Option<u64>) -> LogStoreResult<()>;
41 async fn dyn_next_item(&mut self) -> LogStoreResult<(u64, LogStoreReadItem)>;
42
43 fn dyn_truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()>;
44}
45
46#[async_trait]
47impl<R: SinkLogReader> DynLogReader for R {
48 async fn dyn_start_from(&mut self, start_offset: Option<u64>) -> LogStoreResult<()> {
49 R::start_from(self, start_offset).await
50 }
51
52 async fn dyn_next_item(&mut self) -> LogStoreResult<(u64, LogStoreReadItem)> {
53 R::next_item(self).await
54 }
55
56 fn dyn_truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
57 R::truncate(self, offset)
58 }
59}
60
61impl SinkLogReader for &mut dyn DynLogReader {
62 fn start_from(
63 &mut self,
64 start_offset: Option<u64>,
65 ) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
66 (*self).dyn_start_from(start_offset)
67 }
68
69 fn next_item(
70 &mut self,
71 ) -> impl Future<Output = LogStoreResult<(u64, LogStoreReadItem)>> + Send + '_ {
72 (*self).dyn_next_item()
73 }
74
75 fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
76 (*self).dyn_truncate(offset)
77 }
78}
79
80pub fn boxed_log_sinker(log_sinker: impl LogSinker) -> BoxLogSinker {
81 fn make_future<'a>(
82 log_sinker: impl LogSinker,
83 log_reader: &'a mut dyn DynLogReader,
84 ) -> BoxFuture<'a, crate::sink::Result<!>> {
85 log_sinker.consume_log_and_sink(log_reader).boxed()
86 }
87
88 Box::new(move |log_reader: &mut dyn DynLogReader| make_future(log_sinker, log_reader))
92}
93
94#[async_trait]
95impl LogSinker for BoxLogSinker {
96 async fn consume_log_and_sink(
97 self,
98 mut log_reader: impl SinkLogReader,
99 ) -> crate::sink::Result<!> {
100 (self)(&mut log_reader).await
101 }
102}
103
104#[async_trait]
105impl SinglePhaseCommitCoordinator for BoxSinglePhaseCoordinator {
106 async fn init(&mut self) -> crate::sink::Result<()> {
107 self.deref_mut().init().await
108 }
109
110 async fn commit_data(
111 &mut self,
112 epoch: u64,
113 metadata: Vec<SinkMetadata>,
114 ) -> crate::sink::Result<()> {
115 self.deref_mut().commit_data(epoch, metadata).await
116 }
117
118 async fn commit_schema_change(
119 &mut self,
120 epoch: u64,
121 schema_change: PbSinkSchemaChange,
122 ) -> crate::sink::Result<()> {
123 self.deref_mut()
124 .commit_schema_change(epoch, schema_change)
125 .await
126 }
127}
128
129#[async_trait]
130impl TwoPhaseCommitCoordinator for BoxTwoPhaseCoordinator {
131 async fn init(&mut self) -> crate::sink::Result<()> {
132 self.deref_mut().init().await
133 }
134
135 async fn pre_commit(
136 &mut self,
137 epoch: u64,
138 metadata: Vec<SinkMetadata>,
139 schema_change: Option<PbSinkSchemaChange>,
140 ) -> crate::sink::Result<Option<Vec<u8>>> {
141 self.deref_mut()
142 .pre_commit(epoch, metadata, schema_change)
143 .await
144 }
145
146 async fn commit_data(
147 &mut self,
148 epoch: u64,
149 commit_metadata: Vec<u8>,
150 ) -> crate::sink::Result<()> {
151 self.deref_mut().commit_data(epoch, commit_metadata).await
152 }
153
154 async fn commit_schema_change(
155 &mut self,
156 epoch: u64,
157 schema_change: PbSinkSchemaChange,
158 ) -> crate::sink::Result<()> {
159 self.deref_mut()
160 .commit_schema_change(epoch, schema_change)
161 .await
162 }
163
164 async fn abort(&mut self, epoch: u64, commit_metadata: Vec<u8>) {
165 self.deref_mut().abort(epoch, commit_metadata).await;
166 }
167}