risingwave_connector/sink/
utils.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::array::StreamChunk;
16use serde_json::Value;
17
18use super::encoder::{JsonEncoder, RowEncoder};
19use crate::sink::Result;
20
21pub fn chunk_to_json(chunk: StreamChunk, encoder: &JsonEncoder) -> Result<Vec<String>> {
22    let mut records: Vec<String> = Vec::with_capacity(chunk.capacity());
23    for (_, row) in chunk.rows() {
24        let record = Value::Object(encoder.encode(row)?);
25
26        records.push(record.to_string());
27    }
28
29    Ok(records)
30}
31
32/// Dummy trait implementation for a sink when the feature is not enabled at compile time.
33pub(crate) mod dummy {
34
35    use std::collections::BTreeMap;
36    use std::fmt::{Debug, Formatter};
37    use std::marker::PhantomData;
38
39    use anyhow::anyhow;
40    use phf::{Set, phf_set};
41    use risingwave_common::catalog::Field;
42    use risingwave_pb::connector_service::SinkMetadata;
43    use sea_orm::DatabaseConnection;
44    use tokio::sync::mpsc::UnboundedSender;
45
46    use crate::connector_common::IcebergSinkCompactionUpdate;
47    use crate::enforce_secret::EnforceSecret;
48    use crate::error::ConnectorResult;
49    use crate::sink::prelude::*;
50    use crate::sink::{
51        LogSinker, SinkCommitCoordinator, SinkCommittedEpochSubscriber, SinkLogReader,
52    };
53
54    pub fn err_feature_not_enabled(sink_name: &'static str) -> SinkError {
55        SinkError::Config(anyhow!(
56            "RisingWave is not compiled with feature `sink-{}`",
57            sink_name
58        ))
59    }
60
    /// Marker for a sink whose feature was not enabled at compile time.
    ///
    /// Using an implementor `S` as the type parameter of [`FeatureNotEnabledSink`] yields a
    /// dummy `impl Sink` whose fallible operations always return an error.
    pub trait FeatureNotEnabledSinkMarker: Send + 'static {
        /// Name of the sink. It is interpolated into the "feature not enabled" error message
        /// and re-exported as `Sink::SINK_NAME` by [`FeatureNotEnabledSink`].
        const SINK_NAME: &'static str;
    }
65
66    pub struct FeatureNotEnabledCoordinator<S: FeatureNotEnabledSinkMarker>(PhantomData<S>);
67    #[async_trait::async_trait]
68    impl<S: FeatureNotEnabledSinkMarker> SinkCommitCoordinator for FeatureNotEnabledCoordinator<S> {
69        async fn init(&mut self, _subscriber: SinkCommittedEpochSubscriber) -> Result<Option<u64>> {
70            Err(err_feature_not_enabled(S::SINK_NAME))
71        }
72
73        async fn commit(
74            &mut self,
75            _epoch: u64,
76            _metadata: Vec<SinkMetadata>,
77            _add_columns: Option<Vec<Field>>,
78        ) -> Result<()> {
79            Err(err_feature_not_enabled(S::SINK_NAME))
80        }
81    }
82
83    pub struct FeatureNotEnabledLogSinker<S: FeatureNotEnabledSinkMarker>(PhantomData<S>);
84    #[async_trait::async_trait]
85    impl<S: FeatureNotEnabledSinkMarker> LogSinker for FeatureNotEnabledLogSinker<S> {
86        async fn consume_log_and_sink(self, _log_reader: impl SinkLogReader) -> Result<!> {
87            Err(err_feature_not_enabled(S::SINK_NAME))
88        }
89    }
90
91    #[allow(dead_code)]
92    pub struct FeatureNotEnabledSink<S: FeatureNotEnabledSinkMarker>(PhantomData<S>);
93
94    impl<S: FeatureNotEnabledSinkMarker> Debug for FeatureNotEnabledSink<S> {
95        fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
96            f.debug_struct("FeatureNotEnabledSink")
97                .field("sink_name", &S::SINK_NAME)
98                .finish()
99        }
100    }
101
102    impl<S: FeatureNotEnabledSinkMarker> TryFrom<SinkParam> for FeatureNotEnabledSink<S> {
103        type Error = SinkError;
104
105        fn try_from(_value: SinkParam) -> std::result::Result<Self, Self::Error> {
106            Err(err_feature_not_enabled(S::SINK_NAME))
107        }
108    }
109
110    impl<S: FeatureNotEnabledSinkMarker> EnforceSecret for FeatureNotEnabledSink<S> {
111        const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {};
112
113        fn enforce_secret<'a>(_prop_iter: impl Iterator<Item = &'a str>) -> ConnectorResult<()> {
114            Err(err_feature_not_enabled(S::SINK_NAME).into())
115        }
116
117        fn enforce_one(_prop: &str) -> ConnectorResult<()> {
118            Err(err_feature_not_enabled(S::SINK_NAME).into())
119        }
120    }
121
122    impl<S: FeatureNotEnabledSinkMarker> Sink for FeatureNotEnabledSink<S> {
123        type Coordinator = FeatureNotEnabledCoordinator<S>;
124        type LogSinker = FeatureNotEnabledLogSinker<S>;
125
126        const SINK_NAME: &'static str = S::SINK_NAME;
127
128        async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
129            Err(err_feature_not_enabled(S::SINK_NAME))
130        }
131
132        fn validate_alter_config(_config: &BTreeMap<String, String>) -> Result<()> {
133            Err(err_feature_not_enabled(S::SINK_NAME))
134        }
135
136        async fn validate(&self) -> Result<()> {
137            Err(err_feature_not_enabled(S::SINK_NAME))
138        }
139
140        fn is_coordinated_sink(&self) -> bool {
141            true
142        }
143
144        async fn new_coordinator(
145            &self,
146            _db: DatabaseConnection,
147            _iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
148        ) -> Result<Self::Coordinator> {
149            Err(err_feature_not_enabled(S::SINK_NAME))
150        }
151    }
152}