1pub mod desc;
16
17use std::collections::BTreeMap;
18use std::fmt::{Display, Formatter};
19
20use anyhow::anyhow;
21use itertools::Itertools;
22use risingwave_common::catalog::{
23 ColumnCatalog, ConnectionId, CreateType, DatabaseId, Field, OBJECT_ID_PLACEHOLDER, Schema,
24 SchemaId, StreamJobStatus, TableId, UserId,
25};
26use risingwave_common::util::epoch::Epoch;
27use risingwave_common::util::sort_util::ColumnOrder;
28use risingwave_pb::catalog::{
29 PbCreateType, PbSink, PbSinkFormatDesc, PbSinkType, PbStreamJobStatus,
30};
31use risingwave_pb::secret::PbSecretRef;
32use serde::Serialize;
33
34use super::{
35 CONNECTOR_TYPE_KEY, SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_OPTION,
36 SINK_TYPE_UPSERT, SinkError,
37};
38
/// Identifier of a sink: a thin wrapper around the raw `u32` id.
///
/// Equality, ordering and hashing are derived from the wrapped integer, and the derived
/// `Default` yields the id `0`.
#[derive(Clone, Copy, Debug, Default, Hash, PartialOrd, PartialEq, Eq)]
pub struct SinkId {
    /// The raw numeric id.
    pub sink_id: u32,
}
43
44impl SinkId {
45 pub const fn new(sink_id: u32) -> Self {
46 SinkId { sink_id }
47 }
48
49 pub const fn placeholder() -> Self {
51 SinkId {
52 sink_id: OBJECT_ID_PLACEHOLDER,
53 }
54 }
55
56 pub fn sink_id(&self) -> u32 {
57 self.sink_id
58 }
59}
60
61impl std::fmt::Display for SinkId {
62 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
63 write!(f, "{}", self.sink_id)
64 }
65}
66
67impl From<u32> for SinkId {
68 fn from(id: u32) -> Self {
69 Self::new(id)
70 }
71}
72impl From<SinkId> for u32 {
73 fn from(id: SinkId) -> Self {
74 id.sink_id
75 }
76}
77
/// Kind of a sink, mirrored one-to-one by the non-`Unspecified` variants of `PbSinkType`.
///
/// NOTE(review): the precise change-log semantics of each variant are not visible in this
/// file; the per-variant notes below only state what the methods on this type show.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum SinkType {
    /// Append-only sink; `type_str` reports it as `"append-only"`.
    AppendOnly,
    /// Also counted as append-only by `is_append_only` and reported as `"append-only"`;
    /// presumably the "forced" flavour of append-only — TODO confirm exact semantics.
    ForceAppendOnly,
    /// Upsert sink; `type_str` reports it as `"upsert"`.
    Upsert,
    /// Retract sink; `type_str` reports it as `"retract"`.
    Retract,
}
95
96impl SinkType {
97 pub fn is_append_only(self) -> bool {
99 self == Self::AppendOnly || self == Self::ForceAppendOnly
100 }
101
102 pub fn type_str(self) -> &'static str {
104 match self {
105 SinkType::AppendOnly | SinkType::ForceAppendOnly => "append-only",
106 SinkType::Upsert => "upsert",
107 SinkType::Retract => "retract",
108 }
109 }
110
111 pub fn to_proto(self) -> PbSinkType {
112 match self {
113 SinkType::AppendOnly => PbSinkType::AppendOnly,
114 SinkType::ForceAppendOnly => PbSinkType::ForceAppendOnly,
115 SinkType::Upsert => PbSinkType::Upsert,
116 SinkType::Retract => PbSinkType::Retract,
117 }
118 }
119
120 pub fn from_proto(pb: PbSinkType) -> Self {
121 match pb {
122 PbSinkType::AppendOnly => SinkType::AppendOnly,
123 PbSinkType::ForceAppendOnly => SinkType::ForceAppendOnly,
124 PbSinkType::Upsert => SinkType::Upsert,
125 PbSinkType::Retract => SinkType::Retract,
126 PbSinkType::Unspecified => unreachable!(),
127 }
128 }
129}
130
/// Format/encode description of a sink, convertible to and from `PbSinkFormatDesc`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct SinkFormatDesc {
    /// Format of the sink output (see [`SinkFormat`]).
    pub format: SinkFormat,
    /// Encoding stored in the protobuf `encode` field (see [`SinkEncode`]).
    pub encode: SinkEncode,
    /// String key/value options; copied verbatim in both conversion directions.
    pub options: BTreeMap<String, String>,
    /// Secret references keyed by string; presumably the key is an option name — TODO confirm.
    pub secret_refs: BTreeMap<String, PbSecretRef>,
    /// Optional encoding stored in the protobuf `key_encode` field; `None` when unset.
    /// The `TryFrom<PbSinkFormatDesc>` conversion in this file only accepts bytes or text here.
    pub key_encode: Option<SinkEncode>,
    /// Optional raw connection id, copied verbatim in both conversion directions.
    pub connection_id: Option<u32>,
}
143
/// Output format of a sink.
///
/// Its `Display` output is identical to its `Debug` output (the variant name).
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
pub enum SinkFormat {
    /// Corresponds to the protobuf `FormatType::Plain`.
    AppendOnly,
    /// Corresponds to the protobuf `FormatType::Upsert`.
    Upsert,
    /// Corresponds to the protobuf `FormatType::Debezium`.
    Debezium,
}
151
152impl Display for SinkFormat {
153 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
154 write!(f, "{:?}", self)
155 }
156}
157
/// Encoding of a sink's output.
///
/// Each variant maps to the same-named protobuf `EncodeType` in `SinkFormatDesc::to_proto`.
/// Its `Display` output is identical to its `Debug` output (the variant name).
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
pub enum SinkEncode {
    /// JSON encoding.
    Json,
    /// Protobuf encoding.
    Protobuf,
    /// Avro encoding.
    Avro,
    /// Template encoding.
    Template,
    /// Parquet encoding.
    Parquet,
    /// Text encoding; when decoding from protobuf in this file it is accepted only as a key
    /// encode.
    Text,
    /// Bytes encoding; when decoding from protobuf in this file it is accepted only as a key
    /// encode.
    Bytes,
}
169
170impl Display for SinkEncode {
171 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
172 write!(f, "{:?}", self)
173 }
174}
175
176impl SinkFormatDesc {
177 pub fn from_legacy_type(connector: &str, r#type: &str) -> Result<Option<Self>, SinkError> {
178 use crate::sink::Sink as _;
179 use crate::sink::kafka::KafkaSink;
180 use crate::sink::kinesis::KinesisSink;
181 use crate::sink::pulsar::PulsarSink;
182
183 let format = match r#type {
184 SINK_TYPE_APPEND_ONLY => SinkFormat::AppendOnly,
185 SINK_TYPE_UPSERT => SinkFormat::Upsert,
186 SINK_TYPE_DEBEZIUM => SinkFormat::Debezium,
187 _ => {
188 return Err(SinkError::Config(anyhow!(
189 "sink type unsupported: {}",
190 r#type
191 )));
192 }
193 };
194 let encode = match connector {
195 KafkaSink::SINK_NAME | KinesisSink::SINK_NAME | PulsarSink::SINK_NAME => {
196 SinkEncode::Json
197 }
198 _ => return Ok(None),
199 };
200 Ok(Some(Self {
201 format,
202 encode,
203 options: Default::default(),
204 secret_refs: Default::default(),
205 key_encode: None,
206 connection_id: None,
207 }))
208 }
209
210 pub fn to_proto(&self) -> PbSinkFormatDesc {
211 use risingwave_pb::plan_common::{EncodeType as E, FormatType as F};
212
213 let format = match self.format {
214 SinkFormat::AppendOnly => F::Plain,
215 SinkFormat::Upsert => F::Upsert,
216 SinkFormat::Debezium => F::Debezium,
217 };
218 let mapping_encode = |sink_encode: &SinkEncode| match sink_encode {
219 SinkEncode::Json => E::Json,
220 SinkEncode::Protobuf => E::Protobuf,
221 SinkEncode::Avro => E::Avro,
222 SinkEncode::Template => E::Template,
223 SinkEncode::Parquet => E::Parquet,
224 SinkEncode::Text => E::Text,
225 SinkEncode::Bytes => E::Bytes,
226 };
227
228 let encode = mapping_encode(&self.encode);
229 let key_encode = self.key_encode.as_ref().map(|e| mapping_encode(e).into());
230 let options = self
231 .options
232 .iter()
233 .map(|(k, v)| (k.clone(), v.clone()))
234 .collect();
235
236 PbSinkFormatDesc {
237 format: format.into(),
238 encode: encode.into(),
239 options,
240 key_encode,
241 secret_refs: self.secret_refs.clone(),
242 connection_id: self.connection_id,
243 }
244 }
245
246 pub fn plain_json_for_snowflake_only() -> Self {
249 Self {
250 format: SinkFormat::AppendOnly,
251 encode: SinkEncode::Json,
252 options: Default::default(),
253 secret_refs: Default::default(),
254 key_encode: None,
255 connection_id: None,
256 }
257 }
258}
259
260impl TryFrom<PbSinkFormatDesc> for SinkFormatDesc {
261 type Error = SinkError;
262
263 fn try_from(value: PbSinkFormatDesc) -> Result<Self, Self::Error> {
264 use risingwave_pb::plan_common::{EncodeType as E, FormatType as F};
265
266 let format = match value.format() {
267 F::Plain => SinkFormat::AppendOnly,
268 F::Upsert => SinkFormat::Upsert,
269 F::Debezium => SinkFormat::Debezium,
270 f @ (F::Unspecified
271 | F::Native
272 | F::DebeziumMongo
273 | F::Maxwell
274 | F::Canal
275 | F::None) => {
276 return Err(SinkError::Config(anyhow!(
277 "sink format unsupported: {}",
278 f.as_str_name()
279 )));
280 }
281 };
282 let encode = match value.encode() {
283 E::Json => SinkEncode::Json,
284 E::Protobuf => SinkEncode::Protobuf,
285 E::Template => SinkEncode::Template,
286 E::Avro => SinkEncode::Avro,
287 E::Parquet => SinkEncode::Parquet,
288 e @ (E::Unspecified | E::Native | E::Csv | E::Bytes | E::None | E::Text) => {
289 return Err(SinkError::Config(anyhow!(
290 "sink encode unsupported: {}",
291 e.as_str_name()
292 )));
293 }
294 };
295 let key_encode = match &value.key_encode() {
296 E::Bytes => Some(SinkEncode::Bytes),
297 E::Text => Some(SinkEncode::Text),
298 E::Unspecified => None,
299 encode @ (E::Avro
300 | E::Csv
301 | E::Json
302 | E::Protobuf
303 | E::Template
304 | E::Native
305 | E::Parquet
306 | E::None) => {
307 return Err(SinkError::Config(anyhow!(
308 "unsupported {} as sink key encode",
309 encode.as_str_name()
310 )));
311 }
312 };
313
314 Ok(Self {
315 format,
316 encode,
317 options: value.options,
318 key_encode,
319 secret_refs: value.secret_refs,
320 connection_id: value.connection_id,
321 })
322 }
323}
324
/// In-memory catalog entry of a sink, convertible to and from the protobuf `PbSink`.
///
/// NOTE(review): field notes below state only what the types and the conversions in this
/// file show; anything marked "presumably" should be confirmed against the callers.
#[derive(Clone, Debug)]
pub struct SinkCatalog {
    /// Id of the sink.
    pub id: SinkId,

    /// Schema id recorded for the sink.
    pub schema_id: SchemaId,

    /// Database id recorded for the sink.
    pub database_id: DatabaseId,

    /// Name of the sink.
    pub name: String,

    /// Definition text of the sink; returned as-is by `create_sql`.
    pub definition: String,

    /// All columns of the sink, hidden ones included. Private: read it through
    /// `full_columns` / `visible_columns`.
    columns: Vec<ColumnCatalog>,

    /// Column orders forming the plan-level primary key (`plan_pk` in protobuf).
    pub plan_pk: Vec<ColumnOrder>,

    /// Downstream primary key as column indices. `None` is serialized as an empty list,
    /// and an empty list is deserialized back to `None`.
    pub downstream_pk: Option<Vec<usize>>,

    /// Distribution key as indices; presumably indices into `columns` — TODO confirm.
    pub distribution_key: Vec<usize>,

    /// String key/value properties of the sink. When no format description is stored in
    /// protobuf, the connector and sink-type entries are used to derive `format_desc`.
    pub properties: BTreeMap<String, String>,

    /// Owner of the sink.
    pub owner: UserId,

    /// Type of the sink (see [`SinkType`]).
    pub sink_type: SinkType,

    /// Format/encode description. `None` when it is absent, fails to decode, or cannot be
    /// derived from the legacy properties.
    pub format_desc: Option<SinkFormatDesc>,

    /// Optional connection id associated with the sink.
    pub connection_id: Option<ConnectionId>,

    /// Epoch recorded as the creation time, if any.
    pub created_at_epoch: Option<Epoch>,

    /// Epoch recorded as the initialization time, if any.
    pub initialized_at_epoch: Option<Epoch>,

    /// Name of the database.
    pub db_name: String,

    /// Name of the object the sink reads from (`sink_from_name` in protobuf).
    pub sink_from_name: String,
    /// Optional table id; presumably the table whose schema changes are auto-applied to the
    /// sink — TODO confirm.
    pub auto_refresh_schema_from_table: Option<TableId>,

    /// Optional id of a target table.
    pub target_table: Option<TableId>,

    /// Cluster version recorded at creation, if any.
    pub created_at_cluster_version: Option<String>,
    /// Cluster version recorded at initialization, if any.
    pub initialized_at_cluster_version: Option<String>,
    /// Create type of the job; defaults to foreground when protobuf carries no valid value.
    pub create_type: CreateType,

    /// Status of the stream job; defaults to created when protobuf carries no valid value.
    /// `is_created` checks it against `StreamJobStatus::Created`.
    pub stream_job_status: StreamJobStatus,

    /// Secret references keyed by string; presumably the key is a property name — TODO
    /// confirm.
    pub secret_refs: BTreeMap<String, PbSecretRef>,

    /// Columns stored under `original_target_columns`; presumably the target table's columns
    /// at sink creation time — TODO confirm.
    pub original_target_columns: Vec<ColumnCatalog>,
}
403
404impl SinkCatalog {
405 pub fn to_proto(&self) -> PbSink {
406 PbSink {
407 id: self.id.into(),
408 schema_id: self.schema_id,
409 database_id: self.database_id,
410 name: self.name.clone(),
411 definition: self.definition.clone(),
412 columns: self.columns.iter().map(|c| c.to_protobuf()).collect_vec(),
413 plan_pk: self.plan_pk.iter().map(|o| o.to_protobuf()).collect(),
414 downstream_pk: (self.downstream_pk.as_ref())
415 .map_or_else(Vec::new, |pk| pk.iter().map(|idx| *idx as _).collect_vec()),
416 distribution_key: self
417 .distribution_key
418 .iter()
419 .map(|k| *k as i32)
420 .collect_vec(),
421 owner: self.owner.into(),
422 properties: self.properties.clone(),
423 sink_type: self.sink_type.to_proto() as i32,
424 format_desc: self.format_desc.as_ref().map(|f| f.to_proto()),
425 connection_id: self.connection_id.map(|id| id.into()),
426 initialized_at_epoch: self.initialized_at_epoch.map(|e| e.0),
427 created_at_epoch: self.created_at_epoch.map(|e| e.0),
428 db_name: self.db_name.clone(),
429 sink_from_name: self.sink_from_name.clone(),
430 stream_job_status: self.stream_job_status.to_proto().into(),
431 target_table: self.target_table,
432 created_at_cluster_version: self.created_at_cluster_version.clone(),
433 initialized_at_cluster_version: self.initialized_at_cluster_version.clone(),
434 create_type: self.create_type.to_proto() as i32,
435 secret_refs: self.secret_refs.clone(),
436 original_target_columns: self
437 .original_target_columns
438 .iter()
439 .map(|c| c.to_protobuf())
440 .collect_vec(),
441 auto_refresh_schema_from_table: self.auto_refresh_schema_from_table,
442 }
443 }
444
445 pub fn create_sql(&self) -> String {
447 self.definition.clone()
448 }
449
450 pub fn visible_columns(&self) -> impl Iterator<Item = &ColumnCatalog> {
451 self.columns.iter().filter(|c| !c.is_hidden)
452 }
453
454 pub fn full_columns(&self) -> &[ColumnCatalog] {
455 &self.columns
456 }
457
458 pub fn full_schema(&self) -> Schema {
459 let fields = self
460 .full_columns()
461 .iter()
462 .map(|column| Field::from(column.column_desc.clone()))
463 .collect_vec();
464 Schema { fields }
465 }
466
467 pub fn visible_schema(&self) -> Schema {
468 let fields = self
469 .visible_columns()
470 .map(|column| Field::from(column.column_desc.clone()))
471 .collect_vec();
472 Schema { fields }
473 }
474
475 pub fn unique_identity(&self) -> String {
476 self.to_proto().unique_identity()
478 }
479
480 pub fn is_created(&self) -> bool {
481 self.stream_job_status == StreamJobStatus::Created
482 }
483}
484
485impl From<PbSink> for SinkCatalog {
486 fn from(pb: PbSink) -> Self {
487 let sink_type = pb.get_sink_type().unwrap();
488 let create_type = pb.get_create_type().unwrap_or(PbCreateType::Foreground);
489 let stream_job_status = pb
490 .get_stream_job_status()
491 .unwrap_or(PbStreamJobStatus::Created);
492 let format_desc = match pb.format_desc {
493 Some(f) => f.try_into().ok(),
494 None => {
495 let connector = pb.properties.get(CONNECTOR_TYPE_KEY);
496 let r#type = pb.properties.get(SINK_TYPE_OPTION);
497 match (connector, r#type) {
498 (Some(c), Some(t)) => SinkFormatDesc::from_legacy_type(c, t).ok().flatten(),
499 _ => None,
500 }
501 }
502 };
503 SinkCatalog {
504 id: pb.id.into(),
505 name: pb.name,
506 schema_id: pb.schema_id,
507 database_id: pb.database_id,
508 definition: pb.definition,
509 columns: pb
510 .columns
511 .into_iter()
512 .map(ColumnCatalog::from)
513 .collect_vec(),
514 plan_pk: pb
515 .plan_pk
516 .iter()
517 .map(ColumnOrder::from_protobuf)
518 .collect_vec(),
519 downstream_pk: if pb.downstream_pk.is_empty() {
520 None
521 } else {
522 Some(
523 (pb.downstream_pk.into_iter())
524 .map(|idx| idx as usize)
525 .collect_vec(),
526 )
527 },
528 distribution_key: pb
529 .distribution_key
530 .into_iter()
531 .map(|k| k as _)
532 .collect_vec(),
533 properties: pb.properties,
534 owner: pb.owner.into(),
535 sink_type: SinkType::from_proto(sink_type),
536 format_desc,
537 connection_id: pb.connection_id.map(ConnectionId),
538 created_at_epoch: pb.created_at_epoch.map(Epoch::from),
539 initialized_at_epoch: pb.initialized_at_epoch.map(Epoch::from),
540 db_name: pb.db_name,
541 sink_from_name: pb.sink_from_name,
542 auto_refresh_schema_from_table: pb.auto_refresh_schema_from_table,
543 target_table: pb.target_table,
544 initialized_at_cluster_version: pb.initialized_at_cluster_version,
545 created_at_cluster_version: pb.created_at_cluster_version,
546 create_type: CreateType::from_proto(create_type),
547 stream_job_status: StreamJobStatus::from_proto(stream_job_status),
548 secret_refs: pb.secret_refs,
549 original_target_columns: pb
550 .original_target_columns
551 .into_iter()
552 .map(ColumnCatalog::from)
553 .collect_vec(),
554 }
555 }
556}
557
558impl From<&PbSink> for SinkCatalog {
559 fn from(pb: &PbSink) -> Self {
560 pb.clone().into()
561 }
562}