risingwave_connector/sink/
test_sink.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::{Arc, OnceLock};
16
17use anyhow::anyhow;
18use futures::future::BoxFuture;
19use parking_lot::Mutex;
20use sea_orm::DatabaseConnection;
21use tokio::sync::mpsc::UnboundedSender;
22
23use crate::connector_common::IcebergSinkCompactionUpdate;
24use crate::enforce_secret::EnforceSecret;
25use crate::sink::boxed::{BoxCoordinator, BoxLogSinker};
26use crate::sink::{Sink, SinkError, SinkParam, SinkWriterParam};
27
28pub trait BuildBoxLogSinkerTrait = FnMut(SinkParam, SinkWriterParam) -> BoxFuture<'static, crate::sink::Result<BoxLogSinker>>
29    + Send
30    + 'static;
31pub trait BuildBoxCoordinatorTrait = FnMut(
32        DatabaseConnection,
33        SinkParam,
34        Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
35    ) -> BoxCoordinator
36    + Send
37    + 'static;
38
39type BuildBoxLogSinker = Box<dyn BuildBoxLogSinkerTrait>;
40type BuildBoxCoordinator = Box<dyn BuildBoxCoordinatorTrait>;
41pub const TEST_SINK_NAME: &str = "test";
42
43#[derive(Debug)]
44pub struct TestSink {
45    param: SinkParam,
46}
47
48impl EnforceSecret for TestSink {}
49
50impl TryFrom<SinkParam> for TestSink {
51    type Error = SinkError;
52
53    fn try_from(param: SinkParam) -> Result<Self, Self::Error> {
54        if cfg!(any(madsim, test)) {
55            Ok(TestSink { param })
56        } else {
57            Err(SinkError::Config(anyhow!("test sink only support in test")))
58        }
59    }
60}
61
62impl Sink for TestSink {
63    type Coordinator = BoxCoordinator;
64    type LogSinker = BoxLogSinker;
65
66    const SINK_NAME: &'static str = "test";
67
68    async fn validate(&self) -> crate::sink::Result<()> {
69        Ok(())
70    }
71
72    async fn new_log_sinker(
73        &self,
74        writer_param: SinkWriterParam,
75    ) -> crate::sink::Result<Self::LogSinker> {
76        build_box_log_sinker(self.param.clone(), writer_param).await
77    }
78
79    async fn new_coordinator(
80        &self,
81        db: DatabaseConnection,
82        iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
83    ) -> crate::sink::Result<Self::Coordinator> {
84        Ok(build_box_coordinator(
85            db,
86            self.param.clone(),
87            iceberg_compact_stat_sender,
88        ))
89    }
90}
91
92struct TestSinkRegistry {
93    build_box_sink: Arc<Mutex<Option<(BuildBoxLogSinker, BuildBoxCoordinator)>>>,
94}
95
96impl TestSinkRegistry {
97    fn new() -> Self {
98        TestSinkRegistry {
99            build_box_sink: Arc::new(Mutex::new(None)),
100        }
101    }
102}
103
104fn get_registry() -> &'static TestSinkRegistry {
105    static GLOBAL_REGISTRY: OnceLock<TestSinkRegistry> = OnceLock::new();
106    GLOBAL_REGISTRY.get_or_init(TestSinkRegistry::new)
107}
108
109pub struct TestSinkRegistryGuard;
110
111impl Drop for TestSinkRegistryGuard {
112    fn drop(&mut self) {
113        assert!(get_registry().build_box_sink.lock().take().is_some());
114    }
115}
116
117fn register_build_sink_inner(
118    build_box_log_sinker: impl BuildBoxLogSinkerTrait,
119    build_box_coordinator: impl BuildBoxCoordinatorTrait,
120) -> TestSinkRegistryGuard {
121    assert!(
122        get_registry()
123            .build_box_sink
124            .lock()
125            .replace((
126                Box::new(build_box_log_sinker),
127                Box::new(build_box_coordinator)
128            ))
129            .is_none()
130    );
131    TestSinkRegistryGuard
132}
133
134pub fn register_build_coordinated_sink(
135    build_box_log_sinker: impl BuildBoxLogSinkerTrait,
136    build_box_coordinator: impl BuildBoxCoordinatorTrait,
137) -> TestSinkRegistryGuard {
138    register_build_sink_inner(build_box_log_sinker, build_box_coordinator)
139}
140
141pub fn register_build_sink(
142    build_box_log_sinker: impl BuildBoxLogSinkerTrait,
143) -> TestSinkRegistryGuard {
144    register_build_sink_inner(build_box_log_sinker, |_, _, _| {
145        unreachable!("no coordinator registered")
146    })
147}
148
149fn build_box_coordinator(
150    db: DatabaseConnection,
151    sink_param: SinkParam,
152    iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
153) -> BoxCoordinator {
154    (get_registry()
155        .build_box_sink
156        .lock()
157        .as_mut()
158        .expect("should not be empty")
159        .1)(db, sink_param, iceberg_compact_stat_sender)
160}
161
162async fn build_box_log_sinker(
163    param: SinkParam,
164    writer_param: SinkWriterParam,
165) -> crate::sink::Result<BoxLogSinker> {
166    let future = (get_registry()
167        .build_box_sink
168        .lock()
169        .as_mut()
170        .expect("should not be empty")
171        .0)(param, writer_param);
172    future.await
173}