risingwave_connector/sink/
utils.rs1use risingwave_common::array::StreamChunk;
16use serde_json::Value;
17
18use super::encoder::{JsonEncoder, RowEncoder};
19use crate::sink::Result;
20
21pub fn chunk_to_json(chunk: StreamChunk, encoder: &JsonEncoder) -> Result<Vec<String>> {
22 let mut records: Vec<String> = Vec::with_capacity(chunk.capacity());
23 for (_, row) in chunk.rows() {
24 let record = Value::Object(encoder.encode(row)?);
25
26 records.push(record.to_string());
27 }
28
29 Ok(records)
30}
31
32pub(crate) mod dummy {
34
35 use std::collections::BTreeMap;
36 use std::fmt::{Debug, Formatter};
37 use std::marker::PhantomData;
38
39 use anyhow::anyhow;
40 use phf::{Set, phf_set};
41 use risingwave_pb::connector_service::SinkMetadata;
42 use sea_orm::DatabaseConnection;
43 use tokio::sync::mpsc::UnboundedSender;
44
45 use crate::connector_common::IcebergSinkCompactionUpdate;
46 use crate::enforce_secret::EnforceSecret;
47 use crate::error::ConnectorResult;
48 use crate::sink::prelude::*;
49 use crate::sink::{
50 LogSinker, SinkCommitCoordinator, SinkCommittedEpochSubscriber, SinkLogReader,
51 };
52
53 pub fn err_feature_not_enabled(sink_name: &'static str) -> SinkError {
54 SinkError::Config(anyhow!(
55 "RisingWave is not compiled with feature `sink-{}`",
56 sink_name
57 ))
58 }
59
60 pub trait FeatureNotEnabledSinkMarker: Send + 'static {
62 const SINK_NAME: &'static str;
63 }
64
65 pub struct FeatureNotEnabledCoordinator<S: FeatureNotEnabledSinkMarker>(PhantomData<S>);
66 #[async_trait::async_trait]
67 impl<S: FeatureNotEnabledSinkMarker> SinkCommitCoordinator for FeatureNotEnabledCoordinator<S> {
68 async fn init(&mut self, _subscriber: SinkCommittedEpochSubscriber) -> Result<Option<u64>> {
69 Err(err_feature_not_enabled(S::SINK_NAME))
70 }
71
72 async fn commit(&mut self, _epoch: u64, _metadata: Vec<SinkMetadata>) -> Result<()> {
73 Err(err_feature_not_enabled(S::SINK_NAME))
74 }
75 }
76
77 pub struct FeatureNotEnabledLogSinker<S: FeatureNotEnabledSinkMarker>(PhantomData<S>);
78 #[async_trait::async_trait]
79 impl<S: FeatureNotEnabledSinkMarker> LogSinker for FeatureNotEnabledLogSinker<S> {
80 async fn consume_log_and_sink(self, _log_reader: impl SinkLogReader) -> Result<!> {
81 Err(err_feature_not_enabled(S::SINK_NAME))
82 }
83 }
84
85 pub struct FeatureNotEnabledSink<S: FeatureNotEnabledSinkMarker>(PhantomData<S>);
86
87 impl<S: FeatureNotEnabledSinkMarker> Debug for FeatureNotEnabledSink<S> {
88 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
89 f.debug_struct("FeatureNotEnabledSink")
90 .field("sink_name", &S::SINK_NAME)
91 .finish()
92 }
93 }
94
95 impl<S: FeatureNotEnabledSinkMarker> TryFrom<SinkParam> for FeatureNotEnabledSink<S> {
96 type Error = SinkError;
97
98 fn try_from(_value: SinkParam) -> std::result::Result<Self, Self::Error> {
99 Err(err_feature_not_enabled(S::SINK_NAME))
100 }
101 }
102
103 impl<S: FeatureNotEnabledSinkMarker> EnforceSecret for FeatureNotEnabledSink<S> {
104 const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {};
105
106 fn enforce_secret<'a>(_prop_iter: impl Iterator<Item = &'a str>) -> ConnectorResult<()> {
107 Err(err_feature_not_enabled(S::SINK_NAME).into())
108 }
109
110 fn enforce_one(_prop: &str) -> ConnectorResult<()> {
111 Err(err_feature_not_enabled(S::SINK_NAME).into())
112 }
113 }
114
115 impl<S: FeatureNotEnabledSinkMarker> Sink for FeatureNotEnabledSink<S> {
116 type Coordinator = FeatureNotEnabledCoordinator<S>;
117 type LogSinker = FeatureNotEnabledLogSinker<S>;
118
119 const SINK_ALTER_CONFIG_LIST: &'static [&'static str] = &[];
120 const SINK_NAME: &'static str = S::SINK_NAME;
121
122 async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
123 Err(err_feature_not_enabled(S::SINK_NAME))
124 }
125
126 fn validate_alter_config(_config: &BTreeMap<String, String>) -> Result<()> {
127 Err(err_feature_not_enabled(S::SINK_NAME))
128 }
129
130 async fn validate(&self) -> Result<()> {
131 Err(err_feature_not_enabled(S::SINK_NAME))
132 }
133
134 fn is_coordinated_sink(&self) -> bool {
135 true
136 }
137
138 async fn new_coordinator(
139 &self,
140 _db: DatabaseConnection,
141 _iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
142 ) -> Result<Self::Coordinator> {
143 Err(err_feature_not_enabled(S::SINK_NAME))
144 }
145 }
146}