risingwave_connector/sink/
utils.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::array::StreamChunk;
16use serde_json::Value;
17
18use super::encoder::{JsonEncoder, RowEncoder};
19use crate::sink::Result;
20
21pub fn chunk_to_json(chunk: StreamChunk, encoder: &JsonEncoder) -> Result<Vec<String>> {
22    let mut records: Vec<String> = Vec::with_capacity(chunk.capacity());
23    for (_, row) in chunk.rows() {
24        let record = Value::Object(encoder.encode(row)?);
25
26        records.push(record.to_string());
27    }
28
29    Ok(records)
30}
31
32/// Dummy trait implementation for a sink when the feature is not enabled at compile time.
33pub(crate) mod dummy {
34
35    use std::collections::BTreeMap;
36    use std::fmt::{Debug, Formatter};
37    use std::marker::PhantomData;
38
39    use anyhow::anyhow;
40    use phf::{Set, phf_set};
41    use risingwave_common::catalog::Field;
42    use risingwave_pb::connector_service::SinkMetadata;
43    use sea_orm::DatabaseConnection;
44    use tokio::sync::mpsc::UnboundedSender;
45
46    use crate::connector_common::IcebergSinkCompactionUpdate;
47    use crate::enforce_secret::EnforceSecret;
48    use crate::error::ConnectorResult;
49    use crate::sink::prelude::*;
50    use crate::sink::{
51        LogSinker, SinkCommitCoordinator, SinkCommittedEpochSubscriber, SinkLogReader,
52    };
53
54    #[allow(dead_code)]
55    pub fn err_feature_not_enabled(sink_name: &'static str) -> SinkError {
56        SinkError::Config(anyhow!(
57            "RisingWave is not compiled with feature `sink-{}`",
58            sink_name
59        ))
60    }
61
62    /// Implement this trait will bring a dummy `impl Sink` for the type which always returns an error.
63    pub trait FeatureNotEnabledSinkMarker: Send + 'static {
64        #[allow(dead_code)]
65        const SINK_NAME: &'static str;
66    }
67
68    /// A dummy coordinator that always returns an error.
69    #[allow(dead_code)]
70    pub struct FeatureNotEnabledCoordinator<S: FeatureNotEnabledSinkMarker>(PhantomData<S>);
71    #[async_trait::async_trait]
72    impl<S: FeatureNotEnabledSinkMarker> SinkCommitCoordinator for FeatureNotEnabledCoordinator<S> {
73        async fn init(&mut self, _subscriber: SinkCommittedEpochSubscriber) -> Result<Option<u64>> {
74            Err(err_feature_not_enabled(S::SINK_NAME))
75        }
76
77        async fn commit(
78            &mut self,
79            _epoch: u64,
80            _metadata: Vec<SinkMetadata>,
81            _add_columns: Option<Vec<Field>>,
82        ) -> Result<()> {
83            Err(err_feature_not_enabled(S::SINK_NAME))
84        }
85    }
86
87    /// A dummy log sinker that always returns an error.
88    #[allow(dead_code)]
89    pub struct FeatureNotEnabledLogSinker<S: FeatureNotEnabledSinkMarker>(PhantomData<S>);
90    #[async_trait::async_trait]
91    impl<S: FeatureNotEnabledSinkMarker> LogSinker for FeatureNotEnabledLogSinker<S> {
92        async fn consume_log_and_sink(self, _log_reader: impl SinkLogReader) -> Result<!> {
93            Err(err_feature_not_enabled(S::SINK_NAME))
94        }
95    }
96
97    /// A dummy sink that always returns an error.
98    #[allow(dead_code)]
99    pub struct FeatureNotEnabledSink<S: FeatureNotEnabledSinkMarker>(PhantomData<S>);
100
101    impl<S: FeatureNotEnabledSinkMarker> Debug for FeatureNotEnabledSink<S> {
102        fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
103            f.debug_struct("FeatureNotEnabledSink")
104                .field("sink_name", &S::SINK_NAME)
105                .finish()
106        }
107    }
108
109    impl<S: FeatureNotEnabledSinkMarker> TryFrom<SinkParam> for FeatureNotEnabledSink<S> {
110        type Error = SinkError;
111
112        fn try_from(_value: SinkParam) -> std::result::Result<Self, Self::Error> {
113            Err(err_feature_not_enabled(S::SINK_NAME))
114        }
115    }
116
117    impl<S: FeatureNotEnabledSinkMarker> EnforceSecret for FeatureNotEnabledSink<S> {
118        const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {};
119
120        fn enforce_secret<'a>(_prop_iter: impl Iterator<Item = &'a str>) -> ConnectorResult<()> {
121            Err(err_feature_not_enabled(S::SINK_NAME).into())
122        }
123
124        fn enforce_one(_prop: &str) -> ConnectorResult<()> {
125            Err(err_feature_not_enabled(S::SINK_NAME).into())
126        }
127    }
128
129    impl<S: FeatureNotEnabledSinkMarker> Sink for FeatureNotEnabledSink<S> {
130        type Coordinator = FeatureNotEnabledCoordinator<S>;
131        type LogSinker = FeatureNotEnabledLogSinker<S>;
132
133        const SINK_NAME: &'static str = S::SINK_NAME;
134
135        async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
136            Err(err_feature_not_enabled(S::SINK_NAME))
137        }
138
139        fn validate_alter_config(_config: &BTreeMap<String, String>) -> Result<()> {
140            Err(err_feature_not_enabled(S::SINK_NAME))
141        }
142
143        async fn validate(&self) -> Result<()> {
144            Err(err_feature_not_enabled(S::SINK_NAME))
145        }
146
147        fn is_coordinated_sink(&self) -> bool {
148            true
149        }
150
151        async fn new_coordinator(
152            &self,
153            _db: DatabaseConnection,
154            _iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
155        ) -> Result<Self::Coordinator> {
156            Err(err_feature_not_enabled(S::SINK_NAME))
157        }
158    }
159}
160
/// Declares a sink module that is gated behind the cargo feature `sink-<name>`.
///
/// Gating lets heavy or rarely used sink implementations (and their dependencies) be left out
/// at compile time, which reduces compilation time and binary size.
///
/// With the feature enabled, the macro expands to a plain `pub mod <module>;`. Without it, an
/// inline module of the same name is generated that contains:
///
/// - a `<NAME>_SINK` constant holding the sink name,
/// - a `<Prefix>NotEnabled` marker struct implementing `FeatureNotEnabledSinkMarker`,
/// - a `<Prefix>Sink` alias for `FeatureNotEnabledSink<<Prefix>NotEnabled>`,
/// - an empty `<Prefix>Config` struct.
///
/// The two-argument form uses the module name as the struct prefix.
macro_rules! feature_gated_sink_mod {
    // Short form: reuse the module name as the struct prefix.
    ($module:ident, $name:literal) => {
        crate::sink::utils::feature_gated_sink_mod!($module, $module, $name);
    };
    // Full form: module name, struct prefix, and sink name given separately.
    ($module:ident, $prefix:ident, $name:literal) => {
        paste::paste! {
        #[cfg(feature = "sink-" $name)]
        pub mod $module;
        #[cfg(not(feature = "sink-" $name))]
        pub mod $module {
            use crate::sink::utils::dummy::{FeatureNotEnabledSink, FeatureNotEnabledSinkMarker};

            pub const [<$name:upper _SINK>]: &'static str = $name;

            pub struct [<$prefix:camel NotEnabled>];
            impl FeatureNotEnabledSinkMarker for [<$prefix:camel NotEnabled>] {
                const SINK_NAME: &'static str = [<$name:upper _SINK>];
            }

            #[doc = "A dummy sink that always returns an error, as the feature `sink-" $name "` is currently not enabled."]
            pub type [<$prefix:camel Sink>] = FeatureNotEnabledSink<[<$prefix:camel NotEnabled>]>;

            #[doc = "A dummy sink config that is empty, as the feature `sink-" $name "` is currently not enabled."]
            pub struct [<$prefix:camel Config>];
        }
        }
    };
}
189pub(super) use feature_gated_sink_mod;