risingwave_connector/sink/
test_sink.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::{Arc, OnceLock};
16
17use anyhow::anyhow;
18use futures::future::BoxFuture;
19use parking_lot::Mutex;
20use sea_orm::DatabaseConnection;
21
22use crate::enforce_secret::EnforceSecret;
23use crate::sink::boxed::{BoxCoordinator, BoxLogSinker};
24use crate::sink::{Sink, SinkError, SinkParam, SinkWriterParam};
25
26pub trait BuildBoxLogSinkerTrait = FnMut(SinkParam, SinkWriterParam) -> BoxFuture<'static, crate::sink::Result<BoxLogSinker>>
27    + Send
28    + 'static;
29pub trait BuildBoxCoordinatorTrait = FnMut(DatabaseConnection) -> BoxCoordinator + Send + 'static;
30
31type BuildBoxLogSinker = Box<dyn BuildBoxLogSinkerTrait>;
32type BuildBoxCoordinator = Box<dyn BuildBoxCoordinatorTrait>;
33pub const TEST_SINK_NAME: &str = "test";
34
35#[derive(Debug)]
36pub struct TestSink {
37    param: SinkParam,
38}
39
40impl EnforceSecret for TestSink {}
41
42impl TryFrom<SinkParam> for TestSink {
43    type Error = SinkError;
44
45    fn try_from(param: SinkParam) -> Result<Self, Self::Error> {
46        if cfg!(any(madsim, test)) {
47            Ok(TestSink { param })
48        } else {
49            Err(SinkError::Config(anyhow!("test sink only support in test")))
50        }
51    }
52}
53
54impl Sink for TestSink {
55    type Coordinator = BoxCoordinator;
56    type LogSinker = BoxLogSinker;
57
58    const SINK_NAME: &'static str = "test";
59
60    async fn validate(&self) -> crate::sink::Result<()> {
61        Ok(())
62    }
63
64    async fn new_log_sinker(
65        &self,
66        writer_param: SinkWriterParam,
67    ) -> crate::sink::Result<Self::LogSinker> {
68        build_box_log_sinker(self.param.clone(), writer_param).await
69    }
70
71    async fn new_coordinator(
72        &self,
73        db: DatabaseConnection,
74    ) -> crate::sink::Result<Self::Coordinator> {
75        Ok(build_box_coordinator(db))
76    }
77}
78
79struct TestSinkRegistry {
80    build_box_sink: Arc<Mutex<Option<(BuildBoxLogSinker, BuildBoxCoordinator)>>>,
81}
82
83impl TestSinkRegistry {
84    fn new() -> Self {
85        TestSinkRegistry {
86            build_box_sink: Arc::new(Mutex::new(None)),
87        }
88    }
89}
90
91fn get_registry() -> &'static TestSinkRegistry {
92    static GLOBAL_REGISTRY: OnceLock<TestSinkRegistry> = OnceLock::new();
93    GLOBAL_REGISTRY.get_or_init(TestSinkRegistry::new)
94}
95
96pub struct TestSinkRegistryGuard;
97
98impl Drop for TestSinkRegistryGuard {
99    fn drop(&mut self) {
100        assert!(get_registry().build_box_sink.lock().take().is_some());
101    }
102}
103
104fn register_build_sink_inner(
105    build_box_log_sinker: impl BuildBoxLogSinkerTrait,
106    build_box_coordinator: impl BuildBoxCoordinatorTrait,
107) -> TestSinkRegistryGuard {
108    assert!(
109        get_registry()
110            .build_box_sink
111            .lock()
112            .replace((
113                Box::new(build_box_log_sinker),
114                Box::new(build_box_coordinator)
115            ))
116            .is_none()
117    );
118    TestSinkRegistryGuard
119}
120
121pub fn register_build_coordinated_sink(
122    build_box_log_sinker: impl BuildBoxLogSinkerTrait,
123    build_box_coordinator: impl BuildBoxCoordinatorTrait,
124) -> TestSinkRegistryGuard {
125    register_build_sink_inner(build_box_log_sinker, build_box_coordinator)
126}
127
128pub fn register_build_sink(
129    build_box_log_sinker: impl BuildBoxLogSinkerTrait,
130) -> TestSinkRegistryGuard {
131    register_build_sink_inner(build_box_log_sinker, |_| {
132        unreachable!("no coordinator registered")
133    })
134}
135
136fn build_box_coordinator(db: DatabaseConnection) -> BoxCoordinator {
137    (get_registry()
138        .build_box_sink
139        .lock()
140        .as_mut()
141        .expect("should not be empty")
142        .1)(db)
143}
144
145async fn build_box_log_sinker(
146    param: SinkParam,
147    writer_param: SinkWriterParam,
148) -> crate::sink::Result<BoxLogSinker> {
149    let future = (get_registry()
150        .build_box_sink
151        .lock()
152        .as_mut()
153        .expect("should not be empty")
154        .0)(param, writer_param);
155    future.await
156}