1pub mod desc;
16
17use std::collections::BTreeMap;
18use std::fmt::{Display, Formatter};
19
20use anyhow::anyhow;
21use itertools::Itertools;
22use risingwave_common::catalog::{
23 ColumnCatalog, ConnectionId, CreateType, DatabaseId, Field, Schema, SchemaId, StreamJobStatus,
24 TableId, UserId,
25};
26pub use risingwave_common::id::SinkId;
27use risingwave_common::util::epoch::Epoch;
28use risingwave_common::util::sort_util::ColumnOrder;
29use risingwave_pb::catalog::{
30 PbCreateType, PbSink, PbSinkFormatDesc, PbSinkType, PbStreamJobStatus,
31};
32use risingwave_pb::secret::PbSecretRef;
33use serde::Serialize;
34
35use super::{
36 CONNECTOR_TYPE_KEY, SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_OPTION,
37 SINK_TYPE_UPSERT, SinkError,
38};
39
/// The changelog flavor a sink emits, converted to/from [`PbSinkType`].
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum SinkType {
    /// Serialized as `"append-only"`; maps to `PbSinkType::AppendOnly`.
    AppendOnly,
    /// Serialized as `"upsert"`; maps to `PbSinkType::Upsert`.
    Upsert,
    /// Serialized as `"retract"`; maps to `PbSinkType::Retract`.
    Retract,
}
54
55impl SinkType {
56 pub fn is_append_only(self) -> bool {
58 self == Self::AppendOnly
59 }
60
61 pub fn type_str(self) -> &'static str {
63 match self {
64 SinkType::AppendOnly => "append-only",
65 SinkType::Upsert => "upsert",
66 SinkType::Retract => "retract",
67 }
68 }
69
70 pub fn to_proto(self) -> PbSinkType {
71 match self {
72 SinkType::AppendOnly => PbSinkType::AppendOnly,
73 SinkType::Upsert => PbSinkType::Upsert,
74 SinkType::Retract => PbSinkType::Retract,
75 }
76 }
77
78 pub fn from_proto(pb: PbSinkType) -> Self {
79 match pb {
80 PbSinkType::AppendOnly => SinkType::AppendOnly,
81 #[expect(deprecated)]
84 PbSinkType::ForceAppendOnly => SinkType::AppendOnly,
85 PbSinkType::Upsert => SinkType::Upsert,
86 PbSinkType::Retract => SinkType::Retract,
87 PbSinkType::Unspecified => unreachable!(),
88 }
89 }
90}
91
/// A `FORMAT ... ENCODE ...` descriptor attached to a sink,
/// converted to/from [`PbSinkFormatDesc`].
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct SinkFormatDesc {
    /// The changelog format (`FORMAT` clause).
    pub format: SinkFormat,
    /// Encoding of the message payload (`ENCODE` clause).
    pub encode: SinkEncode,
    /// Additional format-level options.
    pub options: BTreeMap<String, String>,
    /// Secret references used by the options.
    pub secret_refs: BTreeMap<String, PbSecretRef>,
    /// Optional encoding of the message key. Only `Text` and `Bytes` are accepted
    /// when decoding from protobuf (see the `TryFrom<PbSinkFormatDesc>` impl).
    pub key_encode: Option<SinkEncode>,
    /// Optional catalog connection backing this descriptor.
    pub connection_id: Option<ConnectionId>,
}
104
/// The changelog format of a sink (`FORMAT` clause).
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
pub enum SinkFormat {
    /// Maps to `FormatType::Plain` in protobuf (see `SinkFormatDesc::to_proto`).
    AppendOnly,
    Upsert,
    Debezium,
}
112
113impl Display for SinkFormat {
114 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
115 write!(f, "{:?}", self)
116 }
117}
118
/// The payload (or key) encoding of a sink (`ENCODE` clause).
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
pub enum SinkEncode {
    Json,
    Protobuf,
    Avro,
    Template,
    Parquet,
    // `Text` and `Bytes` are only accepted as *key* encodes when decoding from
    // protobuf (see the `TryFrom<PbSinkFormatDesc>` impl).
    Text,
    Bytes,
}
130
131impl Display for SinkEncode {
132 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
133 write!(f, "{:?}", self)
134 }
135}
136
137impl SinkFormatDesc {
138 pub fn from_legacy_type(connector: &str, r#type: &str) -> Result<Option<Self>, SinkError> {
139 use crate::sink::Sink as _;
140 use crate::sink::kafka::KafkaSink;
141 use crate::sink::kinesis::KinesisSink;
142 use crate::sink::pulsar::PulsarSink;
143
144 let format = match r#type {
145 SINK_TYPE_APPEND_ONLY => SinkFormat::AppendOnly,
146 SINK_TYPE_UPSERT => SinkFormat::Upsert,
147 SINK_TYPE_DEBEZIUM => SinkFormat::Debezium,
148 _ => {
149 return Err(SinkError::Config(anyhow!(
150 "sink type unsupported: {}",
151 r#type
152 )));
153 }
154 };
155 let encode = match connector {
156 KafkaSink::SINK_NAME | KinesisSink::SINK_NAME | PulsarSink::SINK_NAME => {
157 SinkEncode::Json
158 }
159 _ => return Ok(None),
160 };
161 Ok(Some(Self {
162 format,
163 encode,
164 options: Default::default(),
165 secret_refs: Default::default(),
166 key_encode: None,
167 connection_id: None,
168 }))
169 }
170
171 pub fn to_proto(&self) -> PbSinkFormatDesc {
172 use risingwave_pb::plan_common::{EncodeType as E, FormatType as F};
173
174 let format = match self.format {
175 SinkFormat::AppendOnly => F::Plain,
176 SinkFormat::Upsert => F::Upsert,
177 SinkFormat::Debezium => F::Debezium,
178 };
179 let mapping_encode = |sink_encode: &SinkEncode| match sink_encode {
180 SinkEncode::Json => E::Json,
181 SinkEncode::Protobuf => E::Protobuf,
182 SinkEncode::Avro => E::Avro,
183 SinkEncode::Template => E::Template,
184 SinkEncode::Parquet => E::Parquet,
185 SinkEncode::Text => E::Text,
186 SinkEncode::Bytes => E::Bytes,
187 };
188
189 let encode = mapping_encode(&self.encode);
190 let key_encode = self.key_encode.as_ref().map(|e| mapping_encode(e).into());
191 let options = self
192 .options
193 .iter()
194 .map(|(k, v)| (k.clone(), v.clone()))
195 .collect();
196
197 PbSinkFormatDesc {
198 format: format.into(),
199 encode: encode.into(),
200 options,
201 key_encode,
202 secret_refs: self.secret_refs.clone(),
203 connection_id: self.connection_id,
204 }
205 }
206
207 pub fn plain_json_for_snowflake_only() -> Self {
210 Self {
211 format: SinkFormat::AppendOnly,
212 encode: SinkEncode::Json,
213 options: Default::default(),
214 secret_refs: Default::default(),
215 key_encode: None,
216 connection_id: None,
217 }
218 }
219}
220
/// Conversion from the protobuf format descriptor.
///
/// Fails on format/encode variants that are valid in the shared proto enums
/// (e.g. for sources) but not supported by sinks.
impl TryFrom<PbSinkFormatDesc> for SinkFormatDesc {
    type Error = SinkError;

    fn try_from(value: PbSinkFormatDesc) -> Result<Self, Self::Error> {
        use risingwave_pb::plan_common::{EncodeType as E, FormatType as F};

        let format = match value.format() {
            F::Plain => SinkFormat::AppendOnly,
            F::Upsert => SinkFormat::Upsert,
            F::Debezium => SinkFormat::Debezium,
            // Exhaustively reject the remaining variants so that adding a new
            // proto variant forces a decision here at compile time.
            f @ (F::Unspecified
            | F::Native
            | F::DebeziumMongo
            | F::Maxwell
            | F::Canal
            | F::None) => {
                return Err(SinkError::Config(anyhow!(
                    "sink format unsupported: {}",
                    f.as_str_name()
                )));
            }
        };
        let encode = match value.encode() {
            E::Json => SinkEncode::Json,
            E::Protobuf => SinkEncode::Protobuf,
            E::Template => SinkEncode::Template,
            E::Avro => SinkEncode::Avro,
            E::Parquet => SinkEncode::Parquet,
            E::Bytes => SinkEncode::Bytes,
            // `Text` is allowed as a key encode below, but not as the payload encode.
            e @ (E::Unspecified | E::Native | E::Csv | E::None | E::Text) => {
                return Err(SinkError::Config(anyhow!(
                    "sink encode unsupported: {}",
                    e.as_str_name()
                )));
            }
        };
        // Only `Text`/`Bytes` key encodes are accepted; `Unspecified` means the
        // key encode was simply not set.
        let key_encode = match &value.key_encode() {
            E::Bytes => Some(SinkEncode::Bytes),
            E::Text => Some(SinkEncode::Text),
            E::Unspecified => None,
            encode @ (E::Avro
            | E::Csv
            | E::Json
            | E::Protobuf
            | E::Template
            | E::Native
            | E::Parquet
            | E::None) => {
                return Err(SinkError::Config(anyhow!(
                    "unsupported {} as sink key encode",
                    encode.as_str_name()
                )));
            }
        };

        Ok(Self {
            format,
            encode,
            options: value.options,
            key_encode,
            secret_refs: value.secret_refs,
            connection_id: value.connection_id,
        })
    }
}
286
/// In-memory catalog entry for a sink, converted to/from [`PbSink`].
#[derive(Clone, Debug)]
pub struct SinkCatalog {
    /// Id of the sink.
    pub id: SinkId,

    /// Schema the sink belongs to.
    pub schema_id: SchemaId,

    /// Database the sink belongs to.
    pub database_id: DatabaseId,

    /// Name of the sink.
    pub name: String,

    /// The full `CREATE SINK` definition of the sink (returned by `create_sql`).
    pub definition: String,

    // All columns of the sink. Access via `full_columns` / `visible_columns`,
    // which also filters out hidden columns.
    columns: Vec<ColumnCatalog>,

    /// Primary key of the sink as column orders.
    // NOTE(review): presumably the pk derived from the stream plan — confirm.
    pub plan_pk: Vec<ColumnOrder>,

    /// Downstream primary key column indices; `None` when not specified
    /// (encoded as an empty list in protobuf — see `to_proto` / `From<PbSink>`).
    pub downstream_pk: Option<Vec<usize>>,

    /// Distribution key column indices of the sink.
    pub distribution_key: Vec<usize>,

    /// The properties (`WITH` options) of the sink.
    pub properties: BTreeMap<String, String>,

    /// Owner of the sink.
    pub owner: UserId,

    /// The changelog flavor of the sink.
    pub sink_type: SinkType,
    /// Whether delete changes are ignored; serialized as `raw_ignore_delete`.
    pub ignore_delete: bool,

    /// The `FORMAT ... ENCODE ...` descriptor, if any.
    pub format_desc: Option<SinkFormatDesc>,

    /// Optional catalog connection used by the sink.
    pub connection_id: Option<ConnectionId>,

    /// Epoch at which the sink was created.
    pub created_at_epoch: Option<Epoch>,

    /// Epoch at which the sink was initialized.
    pub initialized_at_epoch: Option<Epoch>,

    /// Name of the database the sink resides in.
    pub db_name: String,

    /// Name of the relation the sink reads from (`CREATE SINK ... FROM <name>`).
    pub sink_from_name: String,
    /// Table whose schema changes this sink automatically follows, if any.
    pub auto_refresh_schema_from_table: Option<TableId>,

    /// Target table id when this is a sink-into-table.
    pub target_table: Option<TableId>,

    /// Cluster version at creation time.
    pub created_at_cluster_version: Option<String>,
    /// Cluster version at initialization time.
    pub initialized_at_cluster_version: Option<String>,
    /// Whether the sink is created in the foreground or background.
    pub create_type: CreateType,

    /// Whether the backing stream job has finished creating (see `is_created`).
    pub stream_job_status: StreamJobStatus,

    /// Secret references used by the sink properties.
    pub secret_refs: BTreeMap<String, PbSecretRef>,

    /// Columns of the target table at sink creation time (sink-into-table only).
    // NOTE(review): kept to detect later schema changes of the target — confirm.
    pub original_target_columns: Vec<ColumnCatalog>,
}
367
impl SinkCatalog {
    /// Converts the catalog entry into its protobuf representation.
    pub fn to_proto(&self) -> PbSink {
        PbSink {
            id: self.id,
            schema_id: self.schema_id,
            database_id: self.database_id,
            name: self.name.clone(),
            definition: self.definition.clone(),
            columns: self.columns.iter().map(|c| c.to_protobuf()).collect_vec(),
            plan_pk: self.plan_pk.iter().map(|o| o.to_protobuf()).collect(),
            // `None` downstream pk is encoded as an empty list in protobuf.
            downstream_pk: (self.downstream_pk.as_ref())
                .map_or_else(Vec::new, |pk| pk.iter().map(|idx| *idx as _).collect_vec()),
            distribution_key: self
                .distribution_key
                .iter()
                .map(|k| *k as i32)
                .collect_vec(),
            owner: self.owner,
            properties: self.properties.clone(),
            sink_type: self.sink_type.to_proto() as i32,
            raw_ignore_delete: self.ignore_delete,
            format_desc: self.format_desc.as_ref().map(|f| f.to_proto()),
            connection_id: self.connection_id,
            initialized_at_epoch: self.initialized_at_epoch.map(|e| e.0),
            created_at_epoch: self.created_at_epoch.map(|e| e.0),
            db_name: self.db_name.clone(),
            sink_from_name: self.sink_from_name.clone(),
            stream_job_status: self.stream_job_status.to_proto().into(),
            target_table: self.target_table,
            created_at_cluster_version: self.created_at_cluster_version.clone(),
            initialized_at_cluster_version: self.initialized_at_cluster_version.clone(),
            create_type: self.create_type.to_proto() as i32,
            secret_refs: self.secret_refs.clone(),
            original_target_columns: self
                .original_target_columns
                .iter()
                .map(|c| c.to_protobuf())
                .collect_vec(),
            auto_refresh_schema_from_table: self.auto_refresh_schema_from_table,
        }
    }

    /// The SQL statement that created this sink.
    pub fn create_sql(&self) -> String {
        self.definition.clone()
    }

    /// Iterates over the non-hidden columns of the sink.
    pub fn visible_columns(&self) -> impl Iterator<Item = &ColumnCatalog> {
        self.columns.iter().filter(|c| !c.is_hidden)
    }

    /// All columns of the sink, including hidden ones.
    pub fn full_columns(&self) -> &[ColumnCatalog] {
        &self.columns
    }

    /// Schema over all columns, including hidden ones.
    pub fn full_schema(&self) -> Schema {
        let fields = self
            .full_columns()
            .iter()
            .map(|column| Field::from(column.column_desc.clone()))
            .collect_vec();
        Schema { fields }
    }

    /// Schema over the non-hidden columns only.
    pub fn visible_schema(&self) -> Schema {
        let fields = self
            .visible_columns()
            .map(|column| Field::from(column.column_desc.clone()))
            .collect_vec();
        Schema { fields }
    }

    /// Identity string uniquely identifying the sink, delegated to the protobuf
    /// representation.
    pub fn unique_identity(&self) -> String {
        self.to_proto().unique_identity()
    }

    /// Whether the backing stream job has finished creating.
    pub fn is_created(&self) -> bool {
        self.stream_job_status == StreamJobStatus::Created
    }
}
449
impl From<PbSink> for SinkCatalog {
    fn from(pb: PbSink) -> Self {
        // Panics if the proto carries an enum value unknown to this build.
        let sink_type = pb.get_sink_type().unwrap();
        let ignore_delete = pb.ignore_delete();
        // Older protos may lack these fields; fall back to sensible defaults.
        let create_type = pb.get_create_type().unwrap_or(PbCreateType::Foreground);
        let stream_job_status = pb
            .get_stream_job_status()
            .unwrap_or(PbStreamJobStatus::Created);
        let format_desc = match pb.format_desc {
            // NOTE(review): an invalid descriptor is silently dropped to `None`
            // here instead of surfacing the error — confirm this is intentional.
            Some(f) => f.try_into().ok(),
            None => {
                // Legacy sinks (pre `FORMAT ... ENCODE ...`) may still derive a
                // descriptor from the connector + `type` properties.
                let connector = pb.properties.get(CONNECTOR_TYPE_KEY);
                let r#type = pb.properties.get(SINK_TYPE_OPTION);
                match (connector, r#type) {
                    (Some(c), Some(t)) => SinkFormatDesc::from_legacy_type(c, t).ok().flatten(),
                    _ => None,
                }
            }
        };
        SinkCatalog {
            id: pb.id,
            name: pb.name,
            schema_id: pb.schema_id,
            database_id: pb.database_id,
            definition: pb.definition,
            columns: pb
                .columns
                .into_iter()
                .map(ColumnCatalog::from)
                .collect_vec(),
            plan_pk: pb
                .plan_pk
                .iter()
                .map(ColumnOrder::from_protobuf)
                .collect_vec(),
            // An empty list in protobuf means "not specified" (see `to_proto`).
            downstream_pk: if pb.downstream_pk.is_empty() {
                None
            } else {
                Some(
                    (pb.downstream_pk.into_iter())
                        .map(|idx| idx as usize)
                        .collect_vec(),
                )
            },
            distribution_key: pb
                .distribution_key
                .into_iter()
                .map(|k| k as _)
                .collect_vec(),
            properties: pb.properties,
            owner: pb.owner,
            sink_type: SinkType::from_proto(sink_type),
            ignore_delete,
            format_desc,
            connection_id: pb.connection_id,
            created_at_epoch: pb.created_at_epoch.map(Epoch::from),
            initialized_at_epoch: pb.initialized_at_epoch.map(Epoch::from),
            db_name: pb.db_name,
            sink_from_name: pb.sink_from_name,
            auto_refresh_schema_from_table: pb.auto_refresh_schema_from_table,
            target_table: pb.target_table,
            initialized_at_cluster_version: pb.initialized_at_cluster_version,
            created_at_cluster_version: pb.created_at_cluster_version,
            create_type: CreateType::from_proto(create_type),
            stream_job_status: StreamJobStatus::from_proto(stream_job_status),
            secret_refs: pb.secret_refs,
            original_target_columns: pb
                .original_target_columns
                .into_iter()
                .map(ColumnCatalog::from)
                .collect_vec(),
        }
    }
}
524
525impl From<&PbSink> for SinkCatalog {
526 fn from(pb: &PbSink) -> Self {
527 pb.clone().into()
528 }
529}