// risingwave_connector/sink/test_sink.rs
use std::sync::{Arc, OnceLock};
16
17use anyhow::anyhow;
18use futures::future::BoxFuture;
19use parking_lot::Mutex;
20use sea_orm::DatabaseConnection;
21use tokio::sync::mpsc::UnboundedSender;
22
23use crate::connector_common::IcebergSinkCompactionUpdate;
24use crate::enforce_secret::EnforceSecret;
25use crate::sink::boxed::{BoxCoordinator, BoxLogSinker};
26use crate::sink::{Sink, SinkError, SinkParam, SinkWriterParam};
27
28pub trait BuildBoxLogSinkerTrait = FnMut(SinkParam, SinkWriterParam) -> BoxFuture<'static, crate::sink::Result<BoxLogSinker>>
29 + Send
30 + 'static;
31pub trait BuildBoxCoordinatorTrait = FnMut(
32 DatabaseConnection,
33 SinkParam,
34 Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
35 ) -> BoxCoordinator
36 + Send
37 + 'static;
38
39type BuildBoxLogSinker = Box<dyn BuildBoxLogSinkerTrait>;
40type BuildBoxCoordinator = Box<dyn BuildBoxCoordinatorTrait>;
/// Name under which the test sink is exposed.
pub const TEST_SINK_NAME: &str = "test";
42
43#[derive(Debug)]
44pub struct TestSink {
45 param: SinkParam,
46}
47
48impl EnforceSecret for TestSink {}
49
50impl TryFrom<SinkParam> for TestSink {
51 type Error = SinkError;
52
53 fn try_from(param: SinkParam) -> Result<Self, Self::Error> {
54 if cfg!(any(madsim, test)) {
55 Ok(TestSink { param })
56 } else {
57 Err(SinkError::Config(anyhow!("test sink only support in test")))
58 }
59 }
60}
61
62impl Sink for TestSink {
63 type Coordinator = BoxCoordinator;
64 type LogSinker = BoxLogSinker;
65
66 const SINK_NAME: &'static str = "test";
67
68 async fn validate(&self) -> crate::sink::Result<()> {
69 Ok(())
70 }
71
72 async fn new_log_sinker(
73 &self,
74 writer_param: SinkWriterParam,
75 ) -> crate::sink::Result<Self::LogSinker> {
76 build_box_log_sinker(self.param.clone(), writer_param).await
77 }
78
79 async fn new_coordinator(
80 &self,
81 db: DatabaseConnection,
82 iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
83 ) -> crate::sink::Result<Self::Coordinator> {
84 Ok(build_box_coordinator(
85 db,
86 self.param.clone(),
87 iceberg_compact_stat_sender,
88 ))
89 }
90}
91
92struct TestSinkRegistry {
93 build_box_sink: Arc<Mutex<Option<(BuildBoxLogSinker, BuildBoxCoordinator)>>>,
94}
95
96impl TestSinkRegistry {
97 fn new() -> Self {
98 TestSinkRegistry {
99 build_box_sink: Arc::new(Mutex::new(None)),
100 }
101 }
102}
103
104fn get_registry() -> &'static TestSinkRegistry {
105 static GLOBAL_REGISTRY: OnceLock<TestSinkRegistry> = OnceLock::new();
106 GLOBAL_REGISTRY.get_or_init(TestSinkRegistry::new)
107}
108
109pub struct TestSinkRegistryGuard;
110
111impl Drop for TestSinkRegistryGuard {
112 fn drop(&mut self) {
113 assert!(get_registry().build_box_sink.lock().take().is_some());
114 }
115}
116
117fn register_build_sink_inner(
118 build_box_log_sinker: impl BuildBoxLogSinkerTrait,
119 build_box_coordinator: impl BuildBoxCoordinatorTrait,
120) -> TestSinkRegistryGuard {
121 assert!(
122 get_registry()
123 .build_box_sink
124 .lock()
125 .replace((
126 Box::new(build_box_log_sinker),
127 Box::new(build_box_coordinator)
128 ))
129 .is_none()
130 );
131 TestSinkRegistryGuard
132}
133
134pub fn register_build_coordinated_sink(
135 build_box_log_sinker: impl BuildBoxLogSinkerTrait,
136 build_box_coordinator: impl BuildBoxCoordinatorTrait,
137) -> TestSinkRegistryGuard {
138 register_build_sink_inner(build_box_log_sinker, build_box_coordinator)
139}
140
141pub fn register_build_sink(
142 build_box_log_sinker: impl BuildBoxLogSinkerTrait,
143) -> TestSinkRegistryGuard {
144 register_build_sink_inner(build_box_log_sinker, |_, _, _| {
145 unreachable!("no coordinator registered")
146 })
147}
148
149fn build_box_coordinator(
150 db: DatabaseConnection,
151 sink_param: SinkParam,
152 iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
153) -> BoxCoordinator {
154 (get_registry()
155 .build_box_sink
156 .lock()
157 .as_mut()
158 .expect("should not be empty")
159 .1)(db, sink_param, iceberg_compact_stat_sender)
160}
161
162async fn build_box_log_sinker(
163 param: SinkParam,
164 writer_param: SinkWriterParam,
165) -> crate::sink::Result<BoxLogSinker> {
166 let future = (get_registry()
167 .build_box_sink
168 .lock()
169 .as_mut()
170 .expect("should not be empty")
171 .0)(param, writer_param);
172 future.await
173}