risingwave_connector/sink/
utils.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::array::StreamChunk;
16use serde_json::Value;
17
18use super::encoder::{JsonEncoder, RowEncoder};
19use crate::sink::Result;
20
21pub fn chunk_to_json(chunk: StreamChunk, encoder: &JsonEncoder) -> Result<Vec<String>> {
22    let mut records: Vec<String> = Vec::with_capacity(chunk.capacity());
23    for (_, row) in chunk.rows() {
24        let record = Value::Object(encoder.encode(row)?);
25
26        records.push(record.to_string());
27    }
28
29    Ok(records)
30}
31
32/// Dummy trait implementation for a sink when the feature is not enabled at compile time.
33pub(crate) mod dummy {
34
35    use std::collections::BTreeMap;
36    use std::fmt::{Debug, Formatter};
37    use std::marker::PhantomData;
38
39    use anyhow::anyhow;
40    use phf::{Set, phf_set};
41    use risingwave_pb::connector_service::SinkMetadata;
42    use sea_orm::DatabaseConnection;
43    use tokio::sync::mpsc::UnboundedSender;
44
45    use crate::connector_common::IcebergSinkCompactionUpdate;
46    use crate::enforce_secret::EnforceSecret;
47    use crate::error::ConnectorResult;
48    use crate::sink::prelude::*;
49    use crate::sink::{
50        LogSinker, SinkCommitCoordinator, SinkCommittedEpochSubscriber, SinkLogReader,
51    };
52
53    pub fn err_feature_not_enabled(sink_name: &'static str) -> SinkError {
54        SinkError::Config(anyhow!(
55            "RisingWave is not compiled with feature `sink-{}`",
56            sink_name
57        ))
58    }
59
60    /// Implement this trait will bring a dummy `impl Sink` for the type which always returns an error.
61    pub trait FeatureNotEnabledSinkMarker: Send + 'static {
62        const SINK_NAME: &'static str;
63    }
64
65    pub struct FeatureNotEnabledCoordinator<S: FeatureNotEnabledSinkMarker>(PhantomData<S>);
66    #[async_trait::async_trait]
67    impl<S: FeatureNotEnabledSinkMarker> SinkCommitCoordinator for FeatureNotEnabledCoordinator<S> {
68        async fn init(&mut self, _subscriber: SinkCommittedEpochSubscriber) -> Result<Option<u64>> {
69            Err(err_feature_not_enabled(S::SINK_NAME))
70        }
71
72        async fn commit(&mut self, _epoch: u64, _metadata: Vec<SinkMetadata>) -> Result<()> {
73            Err(err_feature_not_enabled(S::SINK_NAME))
74        }
75    }
76
77    pub struct FeatureNotEnabledLogSinker<S: FeatureNotEnabledSinkMarker>(PhantomData<S>);
78    #[async_trait::async_trait]
79    impl<S: FeatureNotEnabledSinkMarker> LogSinker for FeatureNotEnabledLogSinker<S> {
80        async fn consume_log_and_sink(self, _log_reader: impl SinkLogReader) -> Result<!> {
81            Err(err_feature_not_enabled(S::SINK_NAME))
82        }
83    }
84
85    pub struct FeatureNotEnabledSink<S: FeatureNotEnabledSinkMarker>(PhantomData<S>);
86
87    impl<S: FeatureNotEnabledSinkMarker> Debug for FeatureNotEnabledSink<S> {
88        fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
89            f.debug_struct("FeatureNotEnabledSink")
90                .field("sink_name", &S::SINK_NAME)
91                .finish()
92        }
93    }
94
95    impl<S: FeatureNotEnabledSinkMarker> TryFrom<SinkParam> for FeatureNotEnabledSink<S> {
96        type Error = SinkError;
97
98        fn try_from(_value: SinkParam) -> std::result::Result<Self, Self::Error> {
99            Err(err_feature_not_enabled(S::SINK_NAME))
100        }
101    }
102
103    impl<S: FeatureNotEnabledSinkMarker> EnforceSecret for FeatureNotEnabledSink<S> {
104        const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {};
105
106        fn enforce_secret<'a>(_prop_iter: impl Iterator<Item = &'a str>) -> ConnectorResult<()> {
107            Err(err_feature_not_enabled(S::SINK_NAME).into())
108        }
109
110        fn enforce_one(_prop: &str) -> ConnectorResult<()> {
111            Err(err_feature_not_enabled(S::SINK_NAME).into())
112        }
113    }
114
115    impl<S: FeatureNotEnabledSinkMarker> Sink for FeatureNotEnabledSink<S> {
116        type Coordinator = FeatureNotEnabledCoordinator<S>;
117        type LogSinker = FeatureNotEnabledLogSinker<S>;
118
119        const SINK_ALTER_CONFIG_LIST: &'static [&'static str] = &[];
120        const SINK_NAME: &'static str = S::SINK_NAME;
121
122        async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
123            Err(err_feature_not_enabled(S::SINK_NAME))
124        }
125
126        fn validate_alter_config(_config: &BTreeMap<String, String>) -> Result<()> {
127            Err(err_feature_not_enabled(S::SINK_NAME))
128        }
129
130        async fn validate(&self) -> Result<()> {
131            Err(err_feature_not_enabled(S::SINK_NAME))
132        }
133
134        fn is_coordinated_sink(&self) -> bool {
135            true
136        }
137
138        async fn new_coordinator(
139            &self,
140            _db: DatabaseConnection,
141            _iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
142        ) -> Result<Self::Coordinator> {
143            Err(err_feature_not_enabled(S::SINK_NAME))
144        }
145    }
146}