risingwave_connector/sink/
test_sink.rs1use std::sync::{Arc, OnceLock};
16
17use anyhow::anyhow;
18use futures::future::BoxFuture;
19use parking_lot::Mutex;
20use sea_orm::DatabaseConnection;
21
22use crate::enforce_secret::EnforceSecret;
23use crate::sink::boxed::{BoxCoordinator, BoxLogSinker};
24use crate::sink::{Sink, SinkError, SinkParam, SinkWriterParam};
25
/// Trait alias for the closure shape used to build a boxed log sinker.
///
/// The closure is called with a [`SinkParam`] and a [`SinkWriterParam`] and returns a
/// `'static` boxed future that resolves to a `crate::sink::Result<BoxLogSinker>`.
/// It must be `Send + 'static` because it is stored in a process-wide registry.
pub trait BuildBoxLogSinkerTrait = FnMut(SinkParam, SinkWriterParam) -> BoxFuture<'static, crate::sink::Result<BoxLogSinker>>
    + Send
    + 'static;
/// Trait alias for the closure shape used to build a boxed coordinator.
///
/// The closure is called with a [`DatabaseConnection`] and returns a [`BoxCoordinator`]
/// directly (no `Result`, no future). It must be `Send + 'static`.
pub trait BuildBoxCoordinatorTrait = FnMut(DatabaseConnection) -> BoxCoordinator + Send + 'static;
30
/// Type-erased, heap-allocated log sinker builder as stored in the registry.
type BuildBoxLogSinker = Box<dyn BuildBoxLogSinkerTrait>;
/// Type-erased, heap-allocated coordinator builder as stored in the registry.
type BuildBoxCoordinator = Box<dyn BuildBoxCoordinatorTrait>;
/// Name of the test sink; the same literal as `TestSink`'s `Sink::SINK_NAME`.
pub const TEST_SINK_NAME: &str = "test";
34
/// Sink whose log sinker and coordinator are produced by builder closures that were
/// registered beforehand through the `register_build_*` functions of this module.
#[derive(Debug)]
pub struct TestSink {
    /// Parameters this sink was created from; a clone is handed to the registered log
    /// sinker builder each time a log sinker is requested.
    param: SinkParam,
}
39
// Empty impl: `TestSink` overrides nothing and relies entirely on whatever defaults the
// `EnforceSecret` trait provides.
impl EnforceSecret for TestSink {}
41
42impl TryFrom<SinkParam> for TestSink {
43 type Error = SinkError;
44
45 fn try_from(param: SinkParam) -> Result<Self, Self::Error> {
46 if cfg!(any(madsim, test)) {
47 Ok(TestSink { param })
48 } else {
49 Err(SinkError::Config(anyhow!("test sink only support in test")))
50 }
51 }
52}
53
54impl Sink for TestSink {
55 type Coordinator = BoxCoordinator;
56 type LogSinker = BoxLogSinker;
57
58 const SINK_NAME: &'static str = "test";
59
60 async fn validate(&self) -> crate::sink::Result<()> {
61 Ok(())
62 }
63
64 async fn new_log_sinker(
65 &self,
66 writer_param: SinkWriterParam,
67 ) -> crate::sink::Result<Self::LogSinker> {
68 build_box_log_sinker(self.param.clone(), writer_param).await
69 }
70
71 async fn new_coordinator(
72 &self,
73 db: DatabaseConnection,
74 ) -> crate::sink::Result<Self::Coordinator> {
75 Ok(build_box_coordinator(db))
76 }
77}
78
/// Storage for the builder closures backing [`TestSink`].
struct TestSinkRegistry {
    /// The registered `(log sinker builder, coordinator builder)` pair, or `None` when
    /// nothing is registered. At most one pair is held at a time.
    build_box_sink: Arc<Mutex<Option<(BuildBoxLogSinker, BuildBoxCoordinator)>>>,
}
82
83impl TestSinkRegistry {
84 fn new() -> Self {
85 TestSinkRegistry {
86 build_box_sink: Arc::new(Mutex::new(None)),
87 }
88 }
89}
90
91fn get_registry() -> &'static TestSinkRegistry {
92 static GLOBAL_REGISTRY: OnceLock<TestSinkRegistry> = OnceLock::new();
93 GLOBAL_REGISTRY.get_or_init(TestSinkRegistry::new)
94}
95
/// Guard returned by the `register_build_*` functions of this module.
///
/// Dropping the guard removes the registered builders from the global registry, so it
/// must be kept alive for as long as the test sink is expected to be buildable.
#[must_use = "dropping the guard immediately unregisters the test sink builders"]
pub struct TestSinkRegistryGuard;
97
98impl Drop for TestSinkRegistryGuard {
99 fn drop(&mut self) {
100 assert!(get_registry().build_box_sink.lock().take().is_some());
101 }
102}
103
104fn register_build_sink_inner(
105 build_box_log_sinker: impl BuildBoxLogSinkerTrait,
106 build_box_coordinator: impl BuildBoxCoordinatorTrait,
107) -> TestSinkRegistryGuard {
108 assert!(
109 get_registry()
110 .build_box_sink
111 .lock()
112 .replace((
113 Box::new(build_box_log_sinker),
114 Box::new(build_box_coordinator)
115 ))
116 .is_none()
117 );
118 TestSinkRegistryGuard
119}
120
/// Registers both a log sinker builder and a coordinator builder for the test sink.
///
/// Returns a guard; the builders stay registered until that guard is dropped.
///
/// # Panics
///
/// Panics if builders are already registered.
pub fn register_build_coordinated_sink(
    build_box_log_sinker: impl BuildBoxLogSinkerTrait,
    build_box_coordinator: impl BuildBoxCoordinatorTrait,
) -> TestSinkRegistryGuard {
    register_build_sink_inner(build_box_log_sinker, build_box_coordinator)
}
127
128pub fn register_build_sink(
129 build_box_log_sinker: impl BuildBoxLogSinkerTrait,
130) -> TestSinkRegistryGuard {
131 register_build_sink_inner(build_box_log_sinker, |_| {
132 unreachable!("no coordinator registered")
133 })
134}
135
136fn build_box_coordinator(db: DatabaseConnection) -> BoxCoordinator {
137 (get_registry()
138 .build_box_sink
139 .lock()
140 .as_mut()
141 .expect("should not be empty")
142 .1)(db)
143}
144
145async fn build_box_log_sinker(
146 param: SinkParam,
147 writer_param: SinkWriterParam,
148) -> crate::sink::Result<BoxLogSinker> {
149 let future = (get_registry()
150 .build_box_sink
151 .lock()
152 .as_mut()
153 .expect("should not be empty")
154 .0)(param, writer_param);
155 future.await
156}