risingwave_connector/sink/
test_sink.rs1use std::sync::{Arc, OnceLock};
16
17use anyhow::anyhow;
18use futures::future::BoxFuture;
19use parking_lot::Mutex;
20use tokio::sync::mpsc::UnboundedSender;
21
22use crate::connector_common::IcebergSinkCompactionUpdate;
23use crate::enforce_secret::EnforceSecret;
24use crate::sink::boxed::BoxLogSinker;
25use crate::sink::{Sink, SinkCommitCoordinator, SinkError, SinkParam, SinkWriterParam};
26
27pub trait BuildBoxLogSinkerTrait = FnMut(SinkParam, SinkWriterParam) -> BoxFuture<'static, crate::sink::Result<BoxLogSinker>>
28 + Send
29 + 'static;
30pub trait BuildSinkCoordinatorTrait = FnMut(SinkParam, Option<UnboundedSender<IcebergSinkCompactionUpdate>>) -> SinkCommitCoordinator
31 + Send
32 + 'static;
33
34type BuildBoxLogSinker = Box<dyn BuildBoxLogSinkerTrait>;
35type BuildSinkCoordinator = Box<dyn BuildSinkCoordinatorTrait>;
/// Name of the test sink connector.
pub const TEST_SINK_NAME: &str = "test";
37
38#[derive(Debug)]
39pub struct TestSink {
40 param: SinkParam,
41}
42
43impl EnforceSecret for TestSink {}
44
45impl TryFrom<SinkParam> for TestSink {
46 type Error = SinkError;
47
48 fn try_from(param: SinkParam) -> Result<Self, Self::Error> {
49 if cfg!(any(madsim, test)) {
50 Ok(TestSink { param })
51 } else {
52 Err(SinkError::Config(anyhow!("test sink only support in test")))
53 }
54 }
55}
56
57impl Sink for TestSink {
58 type LogSinker = BoxLogSinker;
59
60 const SINK_NAME: &'static str = "test";
61
62 async fn validate(&self) -> crate::sink::Result<()> {
63 Ok(())
64 }
65
66 async fn new_log_sinker(
67 &self,
68 writer_param: SinkWriterParam,
69 ) -> crate::sink::Result<Self::LogSinker> {
70 build_box_log_sinker(self.param.clone(), writer_param).await
71 }
72
73 async fn new_coordinator(
74 &self,
75 iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
76 ) -> crate::sink::Result<SinkCommitCoordinator> {
77 Ok(build_sink_coordinator(
78 self.param.clone(),
79 iceberg_compact_stat_sender,
80 ))
81 }
82}
83
84struct TestSinkRegistry {
85 build_box_sink: Arc<Mutex<Option<(BuildBoxLogSinker, BuildSinkCoordinator)>>>,
86}
87
88impl TestSinkRegistry {
89 fn new() -> Self {
90 TestSinkRegistry {
91 build_box_sink: Arc::new(Mutex::new(None)),
92 }
93 }
94}
95
96fn get_registry() -> &'static TestSinkRegistry {
97 static GLOBAL_REGISTRY: OnceLock<TestSinkRegistry> = OnceLock::new();
98 GLOBAL_REGISTRY.get_or_init(TestSinkRegistry::new)
99}
100
/// Guard returned when test sink builders are registered.
///
/// Dropping the guard removes the registered builders from the global registry, so it must be
/// kept alive for as long as the test sink is expected to be buildable.
#[must_use = "dropping the guard immediately unregisters the test sink builders"]
pub struct TestSinkRegistryGuard;
102
103impl Drop for TestSinkRegistryGuard {
104 fn drop(&mut self) {
105 assert!(get_registry().build_box_sink.lock().take().is_some());
106 }
107}
108
109fn register_build_sink_inner(
110 build_box_log_sinker: impl BuildBoxLogSinkerTrait,
111 build_box_coordinator: impl BuildSinkCoordinatorTrait,
112) -> TestSinkRegistryGuard {
113 assert!(
114 get_registry()
115 .build_box_sink
116 .lock()
117 .replace((
118 Box::new(build_box_log_sinker),
119 Box::new(build_box_coordinator)
120 ))
121 .is_none()
122 );
123 TestSinkRegistryGuard
124}
125
126pub fn register_build_coordinated_sink(
127 build_box_log_sinker: impl BuildBoxLogSinkerTrait,
128 build_box_coordinator: impl BuildSinkCoordinatorTrait,
129) -> TestSinkRegistryGuard {
130 register_build_sink_inner(build_box_log_sinker, build_box_coordinator)
131}
132
133pub fn register_build_sink(
134 build_box_log_sinker: impl BuildBoxLogSinkerTrait,
135) -> TestSinkRegistryGuard {
136 register_build_sink_inner(build_box_log_sinker, |_, _| {
137 unreachable!("no coordinator registered")
138 })
139}
140
141fn build_sink_coordinator(
142 sink_param: SinkParam,
143 iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
144) -> SinkCommitCoordinator {
145 (get_registry()
146 .build_box_sink
147 .lock()
148 .as_mut()
149 .expect("should not be empty")
150 .1)(sink_param, iceberg_compact_stat_sender)
151}
152
153async fn build_box_log_sinker(
154 param: SinkParam,
155 writer_param: SinkWriterParam,
156) -> crate::sink::Result<BoxLogSinker> {
157 let future = (get_registry()
158 .build_box_sink
159 .lock()
160 .as_mut()
161 .expect("should not be empty")
162 .0)(param, writer_param);
163 future.await
164}