risingwave_connector/sink/
utils.rs1use risingwave_common::array::StreamChunk;
16use serde_json::Value;
17
18use super::encoder::{JsonEncoder, RowEncoder};
19use crate::sink::Result;
20
21pub fn chunk_to_json(chunk: StreamChunk, encoder: &JsonEncoder) -> Result<Vec<String>> {
22 let mut records: Vec<String> = Vec::with_capacity(chunk.capacity());
23 for (_, row) in chunk.rows() {
24 let record = Value::Object(encoder.encode(row)?);
25
26 records.push(record.to_string());
27 }
28
29 Ok(records)
30}
31
32pub(crate) mod dummy {
34
35 use std::collections::BTreeMap;
36 use std::fmt::{Debug, Formatter};
37 use std::marker::PhantomData;
38
39 use anyhow::anyhow;
40 use phf::{Set, phf_set};
41 use risingwave_common::catalog::Field;
42 use risingwave_pb::connector_service::SinkMetadata;
43 use sea_orm::DatabaseConnection;
44 use tokio::sync::mpsc::UnboundedSender;
45
46 use crate::connector_common::IcebergSinkCompactionUpdate;
47 use crate::enforce_secret::EnforceSecret;
48 use crate::error::ConnectorResult;
49 use crate::sink::prelude::*;
50 use crate::sink::{
51 LogSinker, SinkCommitCoordinator, SinkCommittedEpochSubscriber, SinkLogReader,
52 };
53
54 #[allow(dead_code)]
55 pub fn err_feature_not_enabled(sink_name: &'static str) -> SinkError {
56 SinkError::Config(anyhow!(
57 "RisingWave is not compiled with feature `sink-{}`",
58 sink_name
59 ))
60 }
61
62 pub trait FeatureNotEnabledSinkMarker: Send + 'static {
64 #[allow(dead_code)]
65 const SINK_NAME: &'static str;
66 }
67
68 #[allow(dead_code)]
69 pub struct FeatureNotEnabledCoordinator<S: FeatureNotEnabledSinkMarker>(PhantomData<S>);
70 #[async_trait::async_trait]
71 impl<S: FeatureNotEnabledSinkMarker> SinkCommitCoordinator for FeatureNotEnabledCoordinator<S> {
72 async fn init(&mut self, _subscriber: SinkCommittedEpochSubscriber) -> Result<Option<u64>> {
73 Err(err_feature_not_enabled(S::SINK_NAME))
74 }
75
76 async fn commit(
77 &mut self,
78 _epoch: u64,
79 _metadata: Vec<SinkMetadata>,
80 _add_columns: Option<Vec<Field>>,
81 ) -> Result<()> {
82 Err(err_feature_not_enabled(S::SINK_NAME))
83 }
84 }
85
86 #[allow(dead_code)]
87 pub struct FeatureNotEnabledLogSinker<S: FeatureNotEnabledSinkMarker>(PhantomData<S>);
88 #[async_trait::async_trait]
89 impl<S: FeatureNotEnabledSinkMarker> LogSinker for FeatureNotEnabledLogSinker<S> {
90 async fn consume_log_and_sink(self, _log_reader: impl SinkLogReader) -> Result<!> {
91 Err(err_feature_not_enabled(S::SINK_NAME))
92 }
93 }
94
95 #[allow(dead_code)]
96 pub struct FeatureNotEnabledSink<S: FeatureNotEnabledSinkMarker>(PhantomData<S>);
97
98 impl<S: FeatureNotEnabledSinkMarker> Debug for FeatureNotEnabledSink<S> {
99 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
100 f.debug_struct("FeatureNotEnabledSink")
101 .field("sink_name", &S::SINK_NAME)
102 .finish()
103 }
104 }
105
106 impl<S: FeatureNotEnabledSinkMarker> TryFrom<SinkParam> for FeatureNotEnabledSink<S> {
107 type Error = SinkError;
108
109 fn try_from(_value: SinkParam) -> std::result::Result<Self, Self::Error> {
110 Err(err_feature_not_enabled(S::SINK_NAME))
111 }
112 }
113
114 impl<S: FeatureNotEnabledSinkMarker> EnforceSecret for FeatureNotEnabledSink<S> {
115 const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {};
116
117 fn enforce_secret<'a>(_prop_iter: impl Iterator<Item = &'a str>) -> ConnectorResult<()> {
118 Err(err_feature_not_enabled(S::SINK_NAME).into())
119 }
120
121 fn enforce_one(_prop: &str) -> ConnectorResult<()> {
122 Err(err_feature_not_enabled(S::SINK_NAME).into())
123 }
124 }
125
126 impl<S: FeatureNotEnabledSinkMarker> Sink for FeatureNotEnabledSink<S> {
127 type Coordinator = FeatureNotEnabledCoordinator<S>;
128 type LogSinker = FeatureNotEnabledLogSinker<S>;
129
130 const SINK_NAME: &'static str = S::SINK_NAME;
131
132 async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
133 Err(err_feature_not_enabled(S::SINK_NAME))
134 }
135
136 fn validate_alter_config(_config: &BTreeMap<String, String>) -> Result<()> {
137 Err(err_feature_not_enabled(S::SINK_NAME))
138 }
139
140 async fn validate(&self) -> Result<()> {
141 Err(err_feature_not_enabled(S::SINK_NAME))
142 }
143
144 fn is_coordinated_sink(&self) -> bool {
145 true
146 }
147
148 async fn new_coordinator(
149 &self,
150 _db: DatabaseConnection,
151 _iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
152 ) -> Result<Self::Coordinator> {
153 Err(err_feature_not_enabled(S::SINK_NAME))
154 }
155 }
156}