risingwave_connector/sink/
test_sink.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::{Arc, OnceLock};
16
17use anyhow::anyhow;
18use parking_lot::Mutex;
19use sea_orm::DatabaseConnection;
20
21use crate::sink::boxed::{BoxCoordinator, BoxLogSinker};
22use crate::sink::{Sink, SinkError, SinkParam, SinkWriterParam};
23
24pub trait BuildBoxLogSinkerTrait =
25    FnMut(SinkParam, SinkWriterParam) -> BoxLogSinker + Send + 'static;
26pub trait BuildBoxCoordinatorTrait = FnMut(DatabaseConnection) -> BoxCoordinator + Send + 'static;
27
28type BuildBoxLogSinker = Box<dyn BuildBoxLogSinkerTrait>;
29type BuildBoxCoordinator = Box<dyn BuildBoxCoordinatorTrait>;
30pub const TEST_SINK_NAME: &str = "test";
31
32#[derive(Debug)]
33pub struct TestSink {
34    param: SinkParam,
35}
36
37impl TryFrom<SinkParam> for TestSink {
38    type Error = SinkError;
39
40    fn try_from(param: SinkParam) -> Result<Self, Self::Error> {
41        if cfg!(any(madsim, test)) {
42            Ok(TestSink { param })
43        } else {
44            Err(SinkError::Config(anyhow!("test sink only support in test")))
45        }
46    }
47}
48
49impl Sink for TestSink {
50    type Coordinator = BoxCoordinator;
51    type LogSinker = BoxLogSinker;
52
53    const SINK_NAME: &'static str = "test";
54
55    async fn validate(&self) -> crate::sink::Result<()> {
56        Ok(())
57    }
58
59    async fn new_log_sinker(
60        &self,
61        writer_param: SinkWriterParam,
62    ) -> crate::sink::Result<Self::LogSinker> {
63        Ok(build_box_log_sinker(self.param.clone(), writer_param))
64    }
65
66    async fn new_coordinator(
67        &self,
68        db: DatabaseConnection,
69    ) -> crate::sink::Result<Self::Coordinator> {
70        Ok(build_box_coordinator(db))
71    }
72}
73
74struct TestSinkRegistry {
75    build_box_sink: Arc<Mutex<Option<(BuildBoxLogSinker, BuildBoxCoordinator)>>>,
76}
77
78impl TestSinkRegistry {
79    fn new() -> Self {
80        TestSinkRegistry {
81            build_box_sink: Arc::new(Mutex::new(None)),
82        }
83    }
84}
85
86fn get_registry() -> &'static TestSinkRegistry {
87    static GLOBAL_REGISTRY: OnceLock<TestSinkRegistry> = OnceLock::new();
88    GLOBAL_REGISTRY.get_or_init(TestSinkRegistry::new)
89}
90
91pub struct TestSinkRegistryGuard;
92
93impl Drop for TestSinkRegistryGuard {
94    fn drop(&mut self) {
95        assert!(get_registry().build_box_sink.lock().take().is_some());
96    }
97}
98
99fn registry_build_sink_inner(
100    build_box_log_sinker: impl BuildBoxLogSinkerTrait,
101    build_box_coordinator: impl BuildBoxCoordinatorTrait,
102) -> TestSinkRegistryGuard {
103    assert!(
104        get_registry()
105            .build_box_sink
106            .lock()
107            .replace((
108                Box::new(build_box_log_sinker),
109                Box::new(build_box_coordinator)
110            ))
111            .is_none()
112    );
113    TestSinkRegistryGuard
114}
115
116pub fn registry_build_sink(
117    build_box_log_sinker: impl BuildBoxLogSinkerTrait,
118) -> TestSinkRegistryGuard {
119    registry_build_sink_inner(build_box_log_sinker, |_| {
120        unreachable!("no coordinator registered")
121    })
122}
123
124fn build_box_coordinator(db: DatabaseConnection) -> BoxCoordinator {
125    (get_registry()
126        .build_box_sink
127        .lock()
128        .as_mut()
129        .expect("should not be empty")
130        .1)(db)
131}
132
133fn build_box_log_sinker(param: SinkParam, writer_param: SinkWriterParam) -> BoxLogSinker {
134    (get_registry()
135        .build_box_sink
136        .lock()
137        .as_mut()
138        .expect("should not be empty")
139        .0)(param, writer_param)
140}