risingwave_connector/sink/
utils.rs1use risingwave_common::array::StreamChunk;
16use serde_json::Value;
17
18use super::encoder::{JsonEncoder, RowEncoder};
19use crate::sink::Result;
20
21pub fn chunk_to_json(chunk: StreamChunk, encoder: &JsonEncoder) -> Result<Vec<String>> {
22 let mut records: Vec<String> = Vec::with_capacity(chunk.capacity());
23 for (_, row) in chunk.rows() {
24 let record = Value::Object(encoder.encode(row)?);
25
26 records.push(record.to_string());
27 }
28
29 Ok(records)
30}
31
32pub(crate) mod dummy {
34
35 use std::collections::BTreeMap;
36 use std::fmt::{Debug, Formatter};
37 use std::marker::PhantomData;
38
39 use anyhow::anyhow;
40 use phf::{Set, phf_set};
41 use risingwave_common::catalog::Field;
42 use risingwave_pb::connector_service::SinkMetadata;
43 use sea_orm::DatabaseConnection;
44 use tokio::sync::mpsc::UnboundedSender;
45
46 use crate::connector_common::IcebergSinkCompactionUpdate;
47 use crate::enforce_secret::EnforceSecret;
48 use crate::error::ConnectorResult;
49 use crate::sink::prelude::*;
50 use crate::sink::{
51 LogSinker, SinkCommitCoordinator, SinkCommittedEpochSubscriber, SinkLogReader,
52 };
53
54 #[allow(dead_code)]
55 pub fn err_feature_not_enabled(sink_name: &'static str) -> SinkError {
56 SinkError::Config(anyhow!(
57 "RisingWave is not compiled with feature `sink-{}`",
58 sink_name
59 ))
60 }
61
62 pub trait FeatureNotEnabledSinkMarker: Send + 'static {
64 #[allow(dead_code)]
65 const SINK_NAME: &'static str;
66 }
67
68 #[allow(dead_code)]
70 pub struct FeatureNotEnabledCoordinator<S: FeatureNotEnabledSinkMarker>(PhantomData<S>);
71 #[async_trait::async_trait]
72 impl<S: FeatureNotEnabledSinkMarker> SinkCommitCoordinator for FeatureNotEnabledCoordinator<S> {
73 async fn init(&mut self, _subscriber: SinkCommittedEpochSubscriber) -> Result<Option<u64>> {
74 Err(err_feature_not_enabled(S::SINK_NAME))
75 }
76
77 async fn commit(
78 &mut self,
79 _epoch: u64,
80 _metadata: Vec<SinkMetadata>,
81 _add_columns: Option<Vec<Field>>,
82 ) -> Result<()> {
83 Err(err_feature_not_enabled(S::SINK_NAME))
84 }
85 }
86
87 #[allow(dead_code)]
89 pub struct FeatureNotEnabledLogSinker<S: FeatureNotEnabledSinkMarker>(PhantomData<S>);
90 #[async_trait::async_trait]
91 impl<S: FeatureNotEnabledSinkMarker> LogSinker for FeatureNotEnabledLogSinker<S> {
92 async fn consume_log_and_sink(self, _log_reader: impl SinkLogReader) -> Result<!> {
93 Err(err_feature_not_enabled(S::SINK_NAME))
94 }
95 }
96
97 #[allow(dead_code)]
99 pub struct FeatureNotEnabledSink<S: FeatureNotEnabledSinkMarker>(PhantomData<S>);
100
101 impl<S: FeatureNotEnabledSinkMarker> Debug for FeatureNotEnabledSink<S> {
102 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
103 f.debug_struct("FeatureNotEnabledSink")
104 .field("sink_name", &S::SINK_NAME)
105 .finish()
106 }
107 }
108
109 impl<S: FeatureNotEnabledSinkMarker> TryFrom<SinkParam> for FeatureNotEnabledSink<S> {
110 type Error = SinkError;
111
112 fn try_from(_value: SinkParam) -> std::result::Result<Self, Self::Error> {
113 Err(err_feature_not_enabled(S::SINK_NAME))
114 }
115 }
116
117 impl<S: FeatureNotEnabledSinkMarker> EnforceSecret for FeatureNotEnabledSink<S> {
118 const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {};
119
120 fn enforce_secret<'a>(_prop_iter: impl Iterator<Item = &'a str>) -> ConnectorResult<()> {
121 Err(err_feature_not_enabled(S::SINK_NAME).into())
122 }
123
124 fn enforce_one(_prop: &str) -> ConnectorResult<()> {
125 Err(err_feature_not_enabled(S::SINK_NAME).into())
126 }
127 }
128
129 impl<S: FeatureNotEnabledSinkMarker> Sink for FeatureNotEnabledSink<S> {
130 type Coordinator = FeatureNotEnabledCoordinator<S>;
131 type LogSinker = FeatureNotEnabledLogSinker<S>;
132
133 const SINK_NAME: &'static str = S::SINK_NAME;
134
135 async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
136 Err(err_feature_not_enabled(S::SINK_NAME))
137 }
138
139 fn validate_alter_config(_config: &BTreeMap<String, String>) -> Result<()> {
140 Err(err_feature_not_enabled(S::SINK_NAME))
141 }
142
143 async fn validate(&self) -> Result<()> {
144 Err(err_feature_not_enabled(S::SINK_NAME))
145 }
146
147 fn is_coordinated_sink(&self) -> bool {
148 true
149 }
150
151 async fn new_coordinator(
152 &self,
153 _db: DatabaseConnection,
154 _iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
155 ) -> Result<Self::Coordinator> {
156 Err(err_feature_not_enabled(S::SINK_NAME))
157 }
158 }
159}
160
161macro_rules! feature_gated_sink_mod {
166 ($mod_name:ident, $sink_name:literal) => {
167 crate::sink::utils::feature_gated_sink_mod!($mod_name, $mod_name, $sink_name);
168 };
169 ($mod_name:ident, $struct_prefix:ident, $sink_name:literal) => {
170 paste::paste! {
171 #[cfg(feature = "sink-" $sink_name)]
172 pub mod $mod_name;
173 #[cfg(not(feature = "sink-" $sink_name))]
174 pub mod $mod_name {
175 use crate::sink::utils::dummy::{FeatureNotEnabledSinkMarker, FeatureNotEnabledSink};
176 pub struct [<$struct_prefix:camel NotEnabled>];
177 pub const [<$sink_name:upper _SINK>]: &'static str = $sink_name;
178 impl FeatureNotEnabledSinkMarker for [<$struct_prefix:camel NotEnabled>] {
179 const SINK_NAME: &'static str = [<$sink_name:upper _SINK>];
180 }
181 #[doc = "A dummy sink that always returns an error, as the feature `sink-" $sink_name "` is currently not enabled."]
182 pub type [<$struct_prefix:camel Sink>] = FeatureNotEnabledSink<[<$struct_prefix:camel NotEnabled>]>;
183 #[doc = "A dummy sink config that is empty, as the feature `sink-" $sink_name "` is currently not enabled."]
184 pub struct [<$struct_prefix:camel Config>];
185 }
186 }
187 };
188}
189pub(super) use feature_gated_sink_mod;