risingwave_connector/sink/
utils.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::array::StreamChunk;
16use serde_json::Value;
17
18use super::encoder::{JsonEncoder, RowEncoder};
19use crate::sink::Result;
20
21pub fn chunk_to_json(chunk: StreamChunk, encoder: &JsonEncoder) -> Result<Vec<String>> {
22    let mut records: Vec<String> = Vec::with_capacity(chunk.capacity());
23    for (_, row) in chunk.rows() {
24        let record = Value::Object(encoder.encode(row)?);
25
26        records.push(record.to_string());
27    }
28
29    Ok(records)
30}
31
32/// Dummy trait implementation for a sink when the feature is not enabled at compile time.
33pub(crate) mod dummy {
34
35    use std::collections::BTreeMap;
36    use std::fmt::{Debug, Formatter};
37    use std::marker::PhantomData;
38
39    use anyhow::anyhow;
40    use phf::{Set, phf_set};
41    use risingwave_common::catalog::Field;
42    use risingwave_pb::connector_service::SinkMetadata;
43    use sea_orm::DatabaseConnection;
44    use tokio::sync::mpsc::UnboundedSender;
45
46    use crate::connector_common::IcebergSinkCompactionUpdate;
47    use crate::enforce_secret::EnforceSecret;
48    use crate::error::ConnectorResult;
49    use crate::sink::prelude::*;
50    use crate::sink::{
51        LogSinker, SinkCommitCoordinator, SinkCommittedEpochSubscriber, SinkLogReader,
52    };
53
54    #[allow(dead_code)]
55    pub fn err_feature_not_enabled(sink_name: &'static str) -> SinkError {
56        SinkError::Config(anyhow!(
57            "RisingWave is not compiled with feature `sink-{}`",
58            sink_name
59        ))
60    }
61
62    /// Implement this trait will bring a dummy `impl Sink` for the type which always returns an error.
63    pub trait FeatureNotEnabledSinkMarker: Send + 'static {
64        #[allow(dead_code)]
65        const SINK_NAME: &'static str;
66    }
67
68    #[allow(dead_code)]
69    pub struct FeatureNotEnabledCoordinator<S: FeatureNotEnabledSinkMarker>(PhantomData<S>);
70    #[async_trait::async_trait]
71    impl<S: FeatureNotEnabledSinkMarker> SinkCommitCoordinator for FeatureNotEnabledCoordinator<S> {
72        async fn init(&mut self, _subscriber: SinkCommittedEpochSubscriber) -> Result<Option<u64>> {
73            Err(err_feature_not_enabled(S::SINK_NAME))
74        }
75
76        async fn commit(
77            &mut self,
78            _epoch: u64,
79            _metadata: Vec<SinkMetadata>,
80            _add_columns: Option<Vec<Field>>,
81        ) -> Result<()> {
82            Err(err_feature_not_enabled(S::SINK_NAME))
83        }
84    }
85
86    #[allow(dead_code)]
87    pub struct FeatureNotEnabledLogSinker<S: FeatureNotEnabledSinkMarker>(PhantomData<S>);
88    #[async_trait::async_trait]
89    impl<S: FeatureNotEnabledSinkMarker> LogSinker for FeatureNotEnabledLogSinker<S> {
90        async fn consume_log_and_sink(self, _log_reader: impl SinkLogReader) -> Result<!> {
91            Err(err_feature_not_enabled(S::SINK_NAME))
92        }
93    }
94
95    #[allow(dead_code)]
96    pub struct FeatureNotEnabledSink<S: FeatureNotEnabledSinkMarker>(PhantomData<S>);
97
98    impl<S: FeatureNotEnabledSinkMarker> Debug for FeatureNotEnabledSink<S> {
99        fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
100            f.debug_struct("FeatureNotEnabledSink")
101                .field("sink_name", &S::SINK_NAME)
102                .finish()
103        }
104    }
105
106    impl<S: FeatureNotEnabledSinkMarker> TryFrom<SinkParam> for FeatureNotEnabledSink<S> {
107        type Error = SinkError;
108
109        fn try_from(_value: SinkParam) -> std::result::Result<Self, Self::Error> {
110            Err(err_feature_not_enabled(S::SINK_NAME))
111        }
112    }
113
114    impl<S: FeatureNotEnabledSinkMarker> EnforceSecret for FeatureNotEnabledSink<S> {
115        const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {};
116
117        fn enforce_secret<'a>(_prop_iter: impl Iterator<Item = &'a str>) -> ConnectorResult<()> {
118            Err(err_feature_not_enabled(S::SINK_NAME).into())
119        }
120
121        fn enforce_one(_prop: &str) -> ConnectorResult<()> {
122            Err(err_feature_not_enabled(S::SINK_NAME).into())
123        }
124    }
125
126    impl<S: FeatureNotEnabledSinkMarker> Sink for FeatureNotEnabledSink<S> {
127        type Coordinator = FeatureNotEnabledCoordinator<S>;
128        type LogSinker = FeatureNotEnabledLogSinker<S>;
129
130        const SINK_NAME: &'static str = S::SINK_NAME;
131
132        async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
133            Err(err_feature_not_enabled(S::SINK_NAME))
134        }
135
136        fn validate_alter_config(_config: &BTreeMap<String, String>) -> Result<()> {
137            Err(err_feature_not_enabled(S::SINK_NAME))
138        }
139
140        async fn validate(&self) -> Result<()> {
141            Err(err_feature_not_enabled(S::SINK_NAME))
142        }
143
144        fn is_coordinated_sink(&self) -> bool {
145            true
146        }
147
148        async fn new_coordinator(
149            &self,
150            _db: DatabaseConnection,
151            _iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
152        ) -> Result<Self::Coordinator> {
153            Err(err_feature_not_enabled(S::SINK_NAME))
154        }
155    }
156}