risingwave_connector/sink/
test_sink.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::{Arc, OnceLock};
16
17use anyhow::anyhow;
18use futures::future::BoxFuture;
19use parking_lot::Mutex;
20use tokio::sync::mpsc::UnboundedSender;
21
22use crate::connector_common::IcebergSinkCompactionUpdate;
23use crate::enforce_secret::EnforceSecret;
24use crate::sink::boxed::BoxLogSinker;
25use crate::sink::{Sink, SinkCommitCoordinator, SinkError, SinkParam, SinkWriterParam};
26
27pub trait BuildBoxLogSinkerTrait = FnMut(SinkParam, SinkWriterParam) -> BoxFuture<'static, crate::sink::Result<BoxLogSinker>>
28    + Send
29    + 'static;
30pub trait BuildSinkCoordinatorTrait = FnMut(SinkParam, Option<UnboundedSender<IcebergSinkCompactionUpdate>>) -> SinkCommitCoordinator
31    + Send
32    + 'static;
33
34type BuildBoxLogSinker = Box<dyn BuildBoxLogSinkerTrait>;
35type BuildSinkCoordinator = Box<dyn BuildSinkCoordinatorTrait>;
36pub const TEST_SINK_NAME: &str = "test";
37
38#[derive(Debug)]
39pub struct TestSink {
40    param: SinkParam,
41}
42
43impl EnforceSecret for TestSink {}
44
45impl TryFrom<SinkParam> for TestSink {
46    type Error = SinkError;
47
48    fn try_from(param: SinkParam) -> Result<Self, Self::Error> {
49        if cfg!(any(madsim, test)) {
50            Ok(TestSink { param })
51        } else {
52            Err(SinkError::Config(anyhow!("test sink only support in test")))
53        }
54    }
55}
56
57impl Sink for TestSink {
58    type LogSinker = BoxLogSinker;
59
60    const SINK_NAME: &'static str = "test";
61
62    fn support_schema_change() -> bool {
63        true
64    }
65
66    async fn validate(&self) -> crate::sink::Result<()> {
67        Ok(())
68    }
69
70    async fn new_log_sinker(
71        &self,
72        writer_param: SinkWriterParam,
73    ) -> crate::sink::Result<Self::LogSinker> {
74        build_box_log_sinker(self.param.clone(), writer_param).await
75    }
76
77    async fn new_coordinator(
78        &self,
79        iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
80    ) -> crate::sink::Result<SinkCommitCoordinator> {
81        Ok(build_sink_coordinator(
82            self.param.clone(),
83            iceberg_compact_stat_sender,
84        ))
85    }
86}
87
88struct TestSinkRegistry {
89    build_box_sink: Arc<Mutex<Option<(BuildBoxLogSinker, BuildSinkCoordinator)>>>,
90}
91
92impl TestSinkRegistry {
93    fn new() -> Self {
94        TestSinkRegistry {
95            build_box_sink: Arc::new(Mutex::new(None)),
96        }
97    }
98}
99
100fn get_registry() -> &'static TestSinkRegistry {
101    static GLOBAL_REGISTRY: OnceLock<TestSinkRegistry> = OnceLock::new();
102    GLOBAL_REGISTRY.get_or_init(TestSinkRegistry::new)
103}
104
105pub struct TestSinkRegistryGuard;
106
107impl Drop for TestSinkRegistryGuard {
108    fn drop(&mut self) {
109        assert!(get_registry().build_box_sink.lock().take().is_some());
110    }
111}
112
113fn register_build_sink_inner(
114    build_box_log_sinker: impl BuildBoxLogSinkerTrait,
115    build_box_coordinator: impl BuildSinkCoordinatorTrait,
116) -> TestSinkRegistryGuard {
117    assert!(
118        get_registry()
119            .build_box_sink
120            .lock()
121            .replace((
122                Box::new(build_box_log_sinker),
123                Box::new(build_box_coordinator)
124            ))
125            .is_none()
126    );
127    TestSinkRegistryGuard
128}
129
130pub fn register_build_coordinated_sink(
131    build_box_log_sinker: impl BuildBoxLogSinkerTrait,
132    build_box_coordinator: impl BuildSinkCoordinatorTrait,
133) -> TestSinkRegistryGuard {
134    register_build_sink_inner(build_box_log_sinker, build_box_coordinator)
135}
136
137pub fn register_build_sink(
138    build_box_log_sinker: impl BuildBoxLogSinkerTrait,
139) -> TestSinkRegistryGuard {
140    register_build_sink_inner(build_box_log_sinker, |_, _| {
141        unreachable!("no coordinator registered")
142    })
143}
144
145fn build_sink_coordinator(
146    sink_param: SinkParam,
147    iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
148) -> SinkCommitCoordinator {
149    (get_registry()
150        .build_box_sink
151        .lock()
152        .as_mut()
153        .expect("should not be empty")
154        .1)(sink_param, iceberg_compact_stat_sender)
155}
156
157async fn build_box_log_sinker(
158    param: SinkParam,
159    writer_param: SinkWriterParam,
160) -> crate::sink::Result<BoxLogSinker> {
161    let future = (get_registry()
162        .build_box_sink
163        .lock()
164        .as_mut()
165        .expect("should not be empty")
166        .0)(param, writer_param);
167    future.await
168}