risingwave_connector/sink/
utils.rs1use risingwave_common::array::StreamChunk;
16use serde_json::Value;
17
18use super::encoder::{JsonEncoder, RowEncoder};
19use crate::sink::Result;
20
21pub fn chunk_to_json(chunk: StreamChunk, encoder: &JsonEncoder) -> Result<Vec<String>> {
22 let mut records: Vec<String> = Vec::with_capacity(chunk.capacity());
23 for (_, row) in chunk.rows() {
24 let record = Value::Object(encoder.encode(row)?);
25
26 records.push(record.to_string());
27 }
28
29 Ok(records)
30}
31
32pub(crate) mod dummy {
34
35 use std::collections::BTreeMap;
36 use std::fmt::{Debug, Formatter};
37 use std::marker::PhantomData;
38
39 use anyhow::anyhow;
40 use phf::{Set, phf_set};
41 use risingwave_common::catalog::Field;
42 use risingwave_pb::connector_service::SinkMetadata;
43 use sea_orm::DatabaseConnection;
44 use tokio::sync::mpsc::UnboundedSender;
45
46 use crate::connector_common::IcebergSinkCompactionUpdate;
47 use crate::enforce_secret::EnforceSecret;
48 use crate::error::ConnectorResult;
49 use crate::sink::prelude::*;
50 use crate::sink::{
51 LogSinker, SinkCommitCoordinator, SinkCommittedEpochSubscriber, SinkLogReader,
52 };
53
54 pub fn err_feature_not_enabled(sink_name: &'static str) -> SinkError {
55 SinkError::Config(anyhow!(
56 "RisingWave is not compiled with feature `sink-{}`",
57 sink_name
58 ))
59 }
60
61 pub trait FeatureNotEnabledSinkMarker: Send + 'static {
63 const SINK_NAME: &'static str;
64 }
65
66 pub struct FeatureNotEnabledCoordinator<S: FeatureNotEnabledSinkMarker>(PhantomData<S>);
67 #[async_trait::async_trait]
68 impl<S: FeatureNotEnabledSinkMarker> SinkCommitCoordinator for FeatureNotEnabledCoordinator<S> {
69 async fn init(&mut self, _subscriber: SinkCommittedEpochSubscriber) -> Result<Option<u64>> {
70 Err(err_feature_not_enabled(S::SINK_NAME))
71 }
72
73 async fn commit(
74 &mut self,
75 _epoch: u64,
76 _metadata: Vec<SinkMetadata>,
77 _add_columns: Option<Vec<Field>>,
78 ) -> Result<()> {
79 Err(err_feature_not_enabled(S::SINK_NAME))
80 }
81 }
82
83 pub struct FeatureNotEnabledLogSinker<S: FeatureNotEnabledSinkMarker>(PhantomData<S>);
84 #[async_trait::async_trait]
85 impl<S: FeatureNotEnabledSinkMarker> LogSinker for FeatureNotEnabledLogSinker<S> {
86 async fn consume_log_and_sink(self, _log_reader: impl SinkLogReader) -> Result<!> {
87 Err(err_feature_not_enabled(S::SINK_NAME))
88 }
89 }
90
91 #[allow(dead_code)]
92 pub struct FeatureNotEnabledSink<S: FeatureNotEnabledSinkMarker>(PhantomData<S>);
93
94 impl<S: FeatureNotEnabledSinkMarker> Debug for FeatureNotEnabledSink<S> {
95 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
96 f.debug_struct("FeatureNotEnabledSink")
97 .field("sink_name", &S::SINK_NAME)
98 .finish()
99 }
100 }
101
102 impl<S: FeatureNotEnabledSinkMarker> TryFrom<SinkParam> for FeatureNotEnabledSink<S> {
103 type Error = SinkError;
104
105 fn try_from(_value: SinkParam) -> std::result::Result<Self, Self::Error> {
106 Err(err_feature_not_enabled(S::SINK_NAME))
107 }
108 }
109
110 impl<S: FeatureNotEnabledSinkMarker> EnforceSecret for FeatureNotEnabledSink<S> {
111 const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {};
112
113 fn enforce_secret<'a>(_prop_iter: impl Iterator<Item = &'a str>) -> ConnectorResult<()> {
114 Err(err_feature_not_enabled(S::SINK_NAME).into())
115 }
116
117 fn enforce_one(_prop: &str) -> ConnectorResult<()> {
118 Err(err_feature_not_enabled(S::SINK_NAME).into())
119 }
120 }
121
122 impl<S: FeatureNotEnabledSinkMarker> Sink for FeatureNotEnabledSink<S> {
123 type Coordinator = FeatureNotEnabledCoordinator<S>;
124 type LogSinker = FeatureNotEnabledLogSinker<S>;
125
126 const SINK_NAME: &'static str = S::SINK_NAME;
127
128 async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
129 Err(err_feature_not_enabled(S::SINK_NAME))
130 }
131
132 fn validate_alter_config(_config: &BTreeMap<String, String>) -> Result<()> {
133 Err(err_feature_not_enabled(S::SINK_NAME))
134 }
135
136 async fn validate(&self) -> Result<()> {
137 Err(err_feature_not_enabled(S::SINK_NAME))
138 }
139
140 fn is_coordinated_sink(&self) -> bool {
141 true
142 }
143
144 async fn new_coordinator(
145 &self,
146 _db: DatabaseConnection,
147 _iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
148 ) -> Result<Self::Coordinator> {
149 Err(err_feature_not_enabled(S::SINK_NAME))
150 }
151 }
152}