risingwave_connector/sink/
utils.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::array::StreamChunk;
16use serde_json::Value;
17
18use super::encoder::{JsonEncoder, RowEncoder};
19use crate::sink::Result;
20
21pub fn chunk_to_json(chunk: StreamChunk, encoder: &JsonEncoder) -> Result<Vec<String>> {
22    let mut records: Vec<String> = Vec::with_capacity(chunk.capacity());
23    for (_, row) in chunk.rows() {
24        let record = Value::Object(encoder.encode(row)?);
25
26        records.push(record.to_string());
27    }
28
29    Ok(records)
30}
31
32/// Dummy trait implementation for a sink when the feature is not enabled at compile time.
33pub(crate) mod dummy {
34
35    use std::collections::BTreeMap;
36    use std::fmt::{Debug, Formatter};
37    use std::marker::PhantomData;
38
39    use anyhow::anyhow;
40    use phf::{Set, phf_set};
41    use tokio::sync::mpsc::UnboundedSender;
42
43    use crate::connector_common::IcebergSinkCompactionUpdate;
44    use crate::enforce_secret::EnforceSecret;
45    use crate::error::ConnectorResult;
46    use crate::sink::prelude::*;
47    use crate::sink::{LogSinker, SinkCommitCoordinator, SinkLogReader};
48
49    #[allow(dead_code)]
50    pub fn err_feature_not_enabled(sink_name: &'static str) -> SinkError {
51        SinkError::Config(anyhow!(
52            "RisingWave is not compiled with feature `sink-{}`",
53            sink_name
54        ))
55    }
56
57    /// Implement this trait will bring a dummy `impl Sink` for the type which always returns an error.
58    pub trait FeatureNotEnabledSinkMarker: Send + 'static {
59        #[allow(dead_code)]
60        const SINK_NAME: &'static str;
61    }
62
63    /// A dummy coordinator that always returns an error.
64    #[allow(dead_code)]
65    pub struct FeatureNotEnabledLogSinker<S: FeatureNotEnabledSinkMarker>(PhantomData<S>);
66    #[async_trait::async_trait]
67    impl<S: FeatureNotEnabledSinkMarker> LogSinker for FeatureNotEnabledLogSinker<S> {
68        async fn consume_log_and_sink(self, _log_reader: impl SinkLogReader) -> Result<!> {
69            Err(err_feature_not_enabled(S::SINK_NAME))
70        }
71    }
72
73    /// A dummy sink that always returns an error.
74    #[allow(dead_code)]
75    pub struct FeatureNotEnabledSink<S: FeatureNotEnabledSinkMarker>(PhantomData<S>);
76
77    impl<S: FeatureNotEnabledSinkMarker> Debug for FeatureNotEnabledSink<S> {
78        fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
79            f.debug_struct("FeatureNotEnabledSink")
80                .field("sink_name", &S::SINK_NAME)
81                .finish()
82        }
83    }
84
85    impl<S: FeatureNotEnabledSinkMarker> TryFrom<SinkParam> for FeatureNotEnabledSink<S> {
86        type Error = SinkError;
87
88        fn try_from(_value: SinkParam) -> std::result::Result<Self, Self::Error> {
89            Err(err_feature_not_enabled(S::SINK_NAME))
90        }
91    }
92
93    impl<S: FeatureNotEnabledSinkMarker> EnforceSecret for FeatureNotEnabledSink<S> {
94        const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {};
95
96        fn enforce_secret<'a>(_prop_iter: impl Iterator<Item = &'a str>) -> ConnectorResult<()> {
97            Err(err_feature_not_enabled(S::SINK_NAME).into())
98        }
99
100        fn enforce_one(_prop: &str) -> ConnectorResult<()> {
101            Err(err_feature_not_enabled(S::SINK_NAME).into())
102        }
103    }
104
105    impl<S: FeatureNotEnabledSinkMarker> Sink for FeatureNotEnabledSink<S> {
106        type LogSinker = FeatureNotEnabledLogSinker<S>;
107
108        const SINK_NAME: &'static str = S::SINK_NAME;
109
110        async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
111            Err(err_feature_not_enabled(S::SINK_NAME))
112        }
113
114        fn validate_alter_config(_config: &BTreeMap<String, String>) -> Result<()> {
115            Err(err_feature_not_enabled(S::SINK_NAME))
116        }
117
118        async fn validate(&self) -> Result<()> {
119            Err(err_feature_not_enabled(S::SINK_NAME))
120        }
121
122        fn is_coordinated_sink(&self) -> bool {
123            true
124        }
125
126        async fn new_coordinator(
127            &self,
128            _iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
129        ) -> Result<SinkCommitCoordinator> {
130            Err(err_feature_not_enabled(S::SINK_NAME))
131        }
132    }
133}
134
135/// Define a sink module that is gated by a feature.
136///
137/// This is to allow some heavy or unpopular sink implementations (and their dependencies) to be disabled
138/// at compile time, in order to decrease compilation time and binary size.
139macro_rules! feature_gated_sink_mod {
140    ($mod_name:ident, $sink_name:literal) => {
141        crate::sink::utils::feature_gated_sink_mod!($mod_name, $mod_name, $sink_name);
142    };
143    ($mod_name:ident, $struct_prefix:ident, $sink_name:literal) => {
144        paste::paste! {
145        #[cfg(feature = "sink-" $sink_name)]
146        pub mod $mod_name;
147        #[cfg(not(feature = "sink-" $sink_name))]
148        pub mod $mod_name {
149            use crate::sink::utils::dummy::{FeatureNotEnabledSinkMarker, FeatureNotEnabledSink};
150            pub struct [<$struct_prefix:camel NotEnabled>];
151            pub const [<$sink_name:upper _SINK>]: &'static str = $sink_name;
152            impl FeatureNotEnabledSinkMarker for [<$struct_prefix:camel NotEnabled>] {
153                const SINK_NAME: &'static str = [<$sink_name:upper _SINK>];
154            }
155            #[doc = "A dummy sink that always returns an error, as the feature `sink-" $sink_name "` is currently not enabled."]
156            pub type [<$struct_prefix:camel Sink>] = FeatureNotEnabledSink<[<$struct_prefix:camel NotEnabled>]>;
157            #[doc = "A dummy sink config that is empty, as the feature `sink-" $sink_name "` is currently not enabled."]
158            pub struct [<$struct_prefix:camel Config>];
159        }
160        }
161    };
162}
163pub(super) use feature_gated_sink_mod;