risingwave_connector/sink/
utils.rs1use risingwave_common::array::StreamChunk;
16use serde_json::Value;
17
18use super::encoder::{JsonEncoder, RowEncoder};
19use crate::sink::Result;
20
21pub fn chunk_to_json(chunk: StreamChunk, encoder: &JsonEncoder) -> Result<Vec<String>> {
22 let mut records: Vec<String> = Vec::with_capacity(chunk.capacity());
23 for (_, row) in chunk.rows() {
24 let record = Value::Object(encoder.encode(row)?);
25
26 records.push(record.to_string());
27 }
28
29 Ok(records)
30}
31
32pub(crate) mod dummy {
34
35 use std::collections::BTreeMap;
36 use std::fmt::{Debug, Formatter};
37 use std::marker::PhantomData;
38
39 use anyhow::anyhow;
40 use phf::{Set, phf_set};
41 use tokio::sync::mpsc::UnboundedSender;
42
43 use crate::connector_common::IcebergSinkCompactionUpdate;
44 use crate::enforce_secret::EnforceSecret;
45 use crate::error::ConnectorResult;
46 use crate::sink::prelude::*;
47 use crate::sink::{LogSinker, SinkCommitCoordinator, SinkLogReader};
48
49 #[allow(dead_code)]
50 pub fn err_feature_not_enabled(sink_name: &'static str) -> SinkError {
51 SinkError::Config(anyhow!(
52 "RisingWave is not compiled with feature `sink-{}`",
53 sink_name
54 ))
55 }
56
57 pub trait FeatureNotEnabledSinkMarker: Send + 'static {
59 #[allow(dead_code)]
60 const SINK_NAME: &'static str;
61 }
62
63 #[allow(dead_code)]
65 pub struct FeatureNotEnabledLogSinker<S: FeatureNotEnabledSinkMarker>(PhantomData<S>);
66 #[async_trait::async_trait]
67 impl<S: FeatureNotEnabledSinkMarker> LogSinker for FeatureNotEnabledLogSinker<S> {
68 async fn consume_log_and_sink(self, _log_reader: impl SinkLogReader) -> Result<!> {
69 Err(err_feature_not_enabled(S::SINK_NAME))
70 }
71 }
72
73 #[allow(dead_code)]
75 pub struct FeatureNotEnabledSink<S: FeatureNotEnabledSinkMarker>(PhantomData<S>);
76
77 impl<S: FeatureNotEnabledSinkMarker> Debug for FeatureNotEnabledSink<S> {
78 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
79 f.debug_struct("FeatureNotEnabledSink")
80 .field("sink_name", &S::SINK_NAME)
81 .finish()
82 }
83 }
84
85 impl<S: FeatureNotEnabledSinkMarker> TryFrom<SinkParam> for FeatureNotEnabledSink<S> {
86 type Error = SinkError;
87
88 fn try_from(_value: SinkParam) -> std::result::Result<Self, Self::Error> {
89 Err(err_feature_not_enabled(S::SINK_NAME))
90 }
91 }
92
93 impl<S: FeatureNotEnabledSinkMarker> EnforceSecret for FeatureNotEnabledSink<S> {
94 const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {};
95
96 fn enforce_secret<'a>(_prop_iter: impl Iterator<Item = &'a str>) -> ConnectorResult<()> {
97 Err(err_feature_not_enabled(S::SINK_NAME).into())
98 }
99
100 fn enforce_one(_prop: &str) -> ConnectorResult<()> {
101 Err(err_feature_not_enabled(S::SINK_NAME).into())
102 }
103 }
104
105 impl<S: FeatureNotEnabledSinkMarker> Sink for FeatureNotEnabledSink<S> {
106 type LogSinker = FeatureNotEnabledLogSinker<S>;
107
108 const SINK_NAME: &'static str = S::SINK_NAME;
109
110 async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
111 Err(err_feature_not_enabled(S::SINK_NAME))
112 }
113
114 fn validate_alter_config(_config: &BTreeMap<String, String>) -> Result<()> {
115 Err(err_feature_not_enabled(S::SINK_NAME))
116 }
117
118 async fn validate(&self) -> Result<()> {
119 Err(err_feature_not_enabled(S::SINK_NAME))
120 }
121
122 fn is_coordinated_sink(&self) -> bool {
123 true
124 }
125
126 async fn new_coordinator(
127 &self,
128 _iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
129 ) -> Result<SinkCommitCoordinator> {
130 Err(err_feature_not_enabled(S::SINK_NAME))
131 }
132 }
133}
134
135macro_rules! feature_gated_sink_mod {
140 ($mod_name:ident, $sink_name:literal) => {
141 crate::sink::utils::feature_gated_sink_mod!($mod_name, $mod_name, $sink_name);
142 };
143 ($mod_name:ident, $struct_prefix:ident, $sink_name:literal) => {
144 paste::paste! {
145 #[cfg(feature = "sink-" $sink_name)]
146 pub mod $mod_name;
147 #[cfg(not(feature = "sink-" $sink_name))]
148 pub mod $mod_name {
149 use crate::sink::utils::dummy::{FeatureNotEnabledSinkMarker, FeatureNotEnabledSink};
150 pub struct [<$struct_prefix:camel NotEnabled>];
151 pub const [<$sink_name:upper _SINK>]: &'static str = $sink_name;
152 impl FeatureNotEnabledSinkMarker for [<$struct_prefix:camel NotEnabled>] {
153 const SINK_NAME: &'static str = [<$sink_name:upper _SINK>];
154 }
155 #[doc = "A dummy sink that always returns an error, as the feature `sink-" $sink_name "` is currently not enabled."]
156 pub type [<$struct_prefix:camel Sink>] = FeatureNotEnabledSink<[<$struct_prefix:camel NotEnabled>]>;
157 #[doc = "A dummy sink config that is empty, as the feature `sink-" $sink_name "` is currently not enabled."]
158 pub struct [<$struct_prefix:camel Config>];
159 }
160 }
161 };
162}
163pub(super) use feature_gated_sink_mod;