1pub mod desc;
16
17use std::collections::BTreeMap;
18use std::fmt::{Display, Formatter};
19
20use anyhow::anyhow;
21use itertools::Itertools;
22use risingwave_common::catalog::{
23 ColumnCatalog, ConnectionId, CreateType, DatabaseId, Field, OBJECT_ID_PLACEHOLDER, Schema,
24 SchemaId, TableId, UserId,
25};
26use risingwave_common::util::epoch::Epoch;
27use risingwave_common::util::sort_util::ColumnOrder;
28use risingwave_pb::catalog::{
29 PbCreateType, PbSink, PbSinkFormatDesc, PbSinkType, PbStreamJobStatus,
30};
31use risingwave_pb::secret::PbSecretRef;
32use serde_derive::Serialize;
33
34use super::{
35 CONNECTOR_TYPE_KEY, SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_OPTION,
36 SINK_TYPE_UPSERT, SinkError,
37};
38
/// Catalog id of a sink, a newtype wrapper around the raw `u32` object id.
#[derive(Clone, Copy, Debug, Default, Hash, PartialOrd, PartialEq, Eq)]
pub struct SinkId {
    // Raw id; also accessible via the `sink_id()` getter and `From`/`Into` conversions below.
    pub sink_id: u32,
}
43
44impl SinkId {
45 pub const fn new(sink_id: u32) -> Self {
46 SinkId { sink_id }
47 }
48
49 pub const fn placeholder() -> Self {
51 SinkId {
52 sink_id: OBJECT_ID_PLACEHOLDER,
53 }
54 }
55
56 pub fn sink_id(&self) -> u32 {
57 self.sink_id
58 }
59}
60
61impl std::fmt::Display for SinkId {
62 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
63 write!(f, "{}", self.sink_id)
64 }
65}
66
67impl From<u32> for SinkId {
68 fn from(id: u32) -> Self {
69 Self::new(id)
70 }
71}
72impl From<SinkId> for u32 {
73 fn from(id: SinkId) -> Self {
74 id.sink_id
75 }
76}
77
/// The changelog mode of a sink.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum SinkType {
    /// The sink's input is append-only (insert-only changes).
    AppendOnly,
    /// The sink is forced to behave as append-only even though its input may
    /// contain updates/deletes — NOTE(review): presumably non-insert changes are
    /// filtered upstream; confirm with the executor side.
    ForceAppendOnly,
    /// The sink consumes an upsert changelog.
    Upsert,
}
89
90impl SinkType {
91 pub fn is_append_only(&self) -> bool {
92 self == &Self::AppendOnly || self == &Self::ForceAppendOnly
93 }
94
95 pub fn is_upsert(&self) -> bool {
96 self == &Self::Upsert
97 }
98
99 pub fn to_proto(self) -> PbSinkType {
100 match self {
101 SinkType::AppendOnly => PbSinkType::AppendOnly,
102 SinkType::ForceAppendOnly => PbSinkType::ForceAppendOnly,
103 SinkType::Upsert => PbSinkType::Upsert,
104 }
105 }
106
107 pub fn from_proto(pb: PbSinkType) -> Self {
108 match pb {
109 PbSinkType::AppendOnly => SinkType::AppendOnly,
110 PbSinkType::ForceAppendOnly => SinkType::ForceAppendOnly,
111 PbSinkType::Upsert => SinkType::Upsert,
112 PbSinkType::Unspecified => unreachable!(),
113 }
114 }
115}
116
/// Parsed `FORMAT ... ENCODE ... (...)` descriptor of a sink.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct SinkFormatDesc {
    // Changelog format (`PLAIN`/`UPSERT`/`DEBEZIUM`).
    pub format: SinkFormat,
    // Payload encoding of the sink messages.
    pub encode: SinkEncode,
    // Options attached to the format clause.
    pub options: BTreeMap<String, String>,
    // Secret references used by `options`.
    pub secret_refs: BTreeMap<String, PbSecretRef>,
    // Optional separate encode for the message key; only `Text`/`Bytes` are
    // accepted when decoding from protobuf (see the `TryFrom` impl below).
    pub key_encode: Option<SinkEncode>,
    // Id of the connection object backing this descriptor, if any.
    pub connection_id: Option<u32>,
}
129
/// The `FORMAT` part of a sink's `FORMAT ... ENCODE ...` clause.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
pub enum SinkFormat {
    /// Insert-only changelog; maps to `PLAIN` in protobuf.
    AppendOnly,
    /// Upsert changelog.
    Upsert,
    /// Debezium-style changelog.
    Debezium,
}
137
138impl Display for SinkFormat {
139 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
140 write!(f, "{:?}", self)
141 }
142}
143
/// The `ENCODE` part of a sink's `FORMAT ... ENCODE ...` clause.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
pub enum SinkEncode {
    Json,
    Protobuf,
    Avro,
    Template,
    Parquet,
    // Only accepted as a *key* encode when decoding from protobuf
    // (see the `TryFrom<PbSinkFormatDesc>` impl below).
    Text,
    // Only accepted as a *key* encode when decoding from protobuf.
    Bytes,
}
155
156impl Display for SinkEncode {
157 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
158 write!(f, "{:?}", self)
159 }
160}
161
162impl SinkFormatDesc {
163 pub fn from_legacy_type(connector: &str, r#type: &str) -> Result<Option<Self>, SinkError> {
164 use crate::sink::Sink as _;
165 use crate::sink::kafka::KafkaSink;
166 use crate::sink::kinesis::KinesisSink;
167 use crate::sink::pulsar::PulsarSink;
168
169 let format = match r#type {
170 SINK_TYPE_APPEND_ONLY => SinkFormat::AppendOnly,
171 SINK_TYPE_UPSERT => SinkFormat::Upsert,
172 SINK_TYPE_DEBEZIUM => SinkFormat::Debezium,
173 _ => {
174 return Err(SinkError::Config(anyhow!(
175 "sink type unsupported: {}",
176 r#type
177 )));
178 }
179 };
180 let encode = match connector {
181 KafkaSink::SINK_NAME | KinesisSink::SINK_NAME | PulsarSink::SINK_NAME => {
182 SinkEncode::Json
183 }
184 _ => return Ok(None),
185 };
186 Ok(Some(Self {
187 format,
188 encode,
189 options: Default::default(),
190 secret_refs: Default::default(),
191 key_encode: None,
192 connection_id: None,
193 }))
194 }
195
196 pub fn to_proto(&self) -> PbSinkFormatDesc {
197 use risingwave_pb::plan_common::{EncodeType as E, FormatType as F};
198
199 let format = match self.format {
200 SinkFormat::AppendOnly => F::Plain,
201 SinkFormat::Upsert => F::Upsert,
202 SinkFormat::Debezium => F::Debezium,
203 };
204 let mapping_encode = |sink_encode: &SinkEncode| match sink_encode {
205 SinkEncode::Json => E::Json,
206 SinkEncode::Protobuf => E::Protobuf,
207 SinkEncode::Avro => E::Avro,
208 SinkEncode::Template => E::Template,
209 SinkEncode::Parquet => E::Parquet,
210 SinkEncode::Text => E::Text,
211 SinkEncode::Bytes => E::Bytes,
212 };
213
214 let encode = mapping_encode(&self.encode);
215 let key_encode = self.key_encode.as_ref().map(|e| mapping_encode(e).into());
216 let options = self
217 .options
218 .iter()
219 .map(|(k, v)| (k.clone(), v.clone()))
220 .collect();
221
222 PbSinkFormatDesc {
223 format: format.into(),
224 encode: encode.into(),
225 options,
226 key_encode,
227 secret_refs: self.secret_refs.clone(),
228 connection_id: self.connection_id,
229 }
230 }
231
232 pub fn plain_json_for_snowflake_only() -> Self {
235 Self {
236 format: SinkFormat::AppendOnly,
237 encode: SinkEncode::Json,
238 options: Default::default(),
239 secret_refs: Default::default(),
240 key_encode: None,
241 connection_id: None,
242 }
243 }
244}
245
impl TryFrom<PbSinkFormatDesc> for SinkFormatDesc {
    type Error = SinkError;

    /// Converts from the protobuf representation, rejecting format/encode
    /// combinations that sinks do not support.
    fn try_from(value: PbSinkFormatDesc) -> Result<Self, Self::Error> {
        use risingwave_pb::plan_common::{EncodeType as E, FormatType as F};

        // `PLAIN` on the wire means append-only. The remaining wire formats are
        // source-side only and rejected for sinks.
        let format = match value.format() {
            F::Plain => SinkFormat::AppendOnly,
            F::Upsert => SinkFormat::Upsert,
            F::Debezium => SinkFormat::Debezium,
            f @ (F::Unspecified
            | F::Native
            | F::DebeziumMongo
            | F::Maxwell
            | F::Canal
            | F::None) => {
                return Err(SinkError::Config(anyhow!(
                    "sink format unsupported: {}",
                    f.as_str_name()
                )));
            }
        };
        // Note the asymmetry with `key_encode` below: `Text`/`Bytes` are invalid
        // as the value encode but valid as the key encode.
        let encode = match value.encode() {
            E::Json => SinkEncode::Json,
            E::Protobuf => SinkEncode::Protobuf,
            E::Template => SinkEncode::Template,
            E::Avro => SinkEncode::Avro,
            E::Parquet => SinkEncode::Parquet,
            e @ (E::Unspecified | E::Native | E::Csv | E::Bytes | E::None | E::Text) => {
                return Err(SinkError::Config(anyhow!(
                    "sink encode unsupported: {}",
                    e.as_str_name()
                )));
            }
        };
        // `Unspecified` means "no separate key encode", not an error.
        let key_encode = match &value.key_encode() {
            E::Bytes => Some(SinkEncode::Bytes),
            E::Text => Some(SinkEncode::Text),
            E::Unspecified => None,
            encode @ (E::Avro
            | E::Csv
            | E::Json
            | E::Protobuf
            | E::Template
            | E::Native
            | E::Parquet
            | E::None) => {
                return Err(SinkError::Config(anyhow!(
                    "unsupported {} as sink key encode",
                    encode.as_str_name()
                )));
            }
        };

        Ok(Self {
            format,
            encode,
            options: value.options,
            key_encode,
            secret_refs: value.secret_refs,
            connection_id: value.connection_id,
        })
    }
}
310
/// In-memory catalog entry of a sink, mirroring [`PbSink`].
#[derive(Clone, Debug)]
pub struct SinkCatalog {
    // Id of the sink.
    pub id: SinkId,

    // Id of the schema the sink belongs to.
    pub schema_id: SchemaId,

    // Id of the database the sink belongs to.
    pub database_id: DatabaseId,

    // Name of the sink.
    pub name: String,

    // The SQL definition the sink was created with (returned by `create_sql`).
    pub definition: String,

    // All columns of the sink, including hidden ones. Kept private; access via
    // `full_columns()` / `visible_columns()`.
    columns: Vec<ColumnCatalog>,

    // Primary key of the sink's stream plan, as column orders.
    pub plan_pk: Vec<ColumnOrder>,

    // Column indices forming the downstream primary key.
    pub downstream_pk: Vec<usize>,

    // Column indices forming the distribution key.
    pub distribution_key: Vec<usize>,

    // `WITH` properties of the sink.
    pub properties: BTreeMap<String, String>,

    // Id of the owning user.
    pub owner: UserId,

    // Changelog mode (append-only / force-append-only / upsert).
    pub sink_type: SinkType,

    // `FORMAT ... ENCODE ...` descriptor, if one was specified or derivable.
    pub format_desc: Option<SinkFormatDesc>,

    // Id of the connection object used by the sink, if any.
    pub connection_id: Option<ConnectionId>,

    // Epoch at which the sink was created, if recorded.
    pub created_at_epoch: Option<Epoch>,

    // Epoch at which the sink was initialized, if recorded.
    pub initialized_at_epoch: Option<Epoch>,

    // Name of the database — NOTE(review): presumably consumed by connectors
    // needing source-database metadata; confirm exact consumers.
    pub db_name: String,

    // Name of the upstream relation the sink reads from.
    pub sink_from_name: String,

    // Target table id when this sink writes into a table.
    pub target_table: Option<TableId>,

    // Cluster versions at creation / initialization time, if recorded.
    pub created_at_cluster_version: Option<String>,
    pub initialized_at_cluster_version: Option<String>,
    // Whether the sink is created in the foreground or background.
    pub create_type: CreateType,

    // Secret references used by `properties`.
    pub secret_refs: BTreeMap<String, PbSecretRef>,

    // Only meaningful for sink-into-table: the target table's columns as of sink
    // creation — NOTE(review): presumably used to reconcile later target-table
    // schema changes; confirm.
    pub original_target_columns: Vec<ColumnCatalog>,
}
384
impl SinkCatalog {
    /// Serializes this catalog entry into its protobuf representation.
    ///
    /// NOTE(review): `stream_job_status` is always encoded as `Creating` here —
    /// presumably the authoritative status lives elsewhere (meta); confirm before
    /// relying on this field of the output.
    pub fn to_proto(&self) -> PbSink {
        #[allow(deprecated)] PbSink {
            id: self.id.into(),
            schema_id: self.schema_id.schema_id,
            database_id: self.database_id.database_id,
            name: self.name.clone(),
            definition: self.definition.clone(),
            columns: self.columns.iter().map(|c| c.to_protobuf()).collect_vec(),
            plan_pk: self.plan_pk.iter().map(|o| o.to_protobuf()).collect(),
            downstream_pk: self
                .downstream_pk
                .iter()
                .map(|idx| *idx as i32)
                .collect_vec(),
            // Deprecated field (hence `#[allow(deprecated)]` above); intentionally empty.
            dependent_relations: vec![],
            distribution_key: self
                .distribution_key
                .iter()
                .map(|k| *k as i32)
                .collect_vec(),
            owner: self.owner.into(),
            properties: self.properties.clone(),
            sink_type: self.sink_type.to_proto() as i32,
            format_desc: self.format_desc.as_ref().map(|f| f.to_proto()),
            connection_id: self.connection_id.map(|id| id.into()),
            initialized_at_epoch: self.initialized_at_epoch.map(|e| e.0),
            created_at_epoch: self.created_at_epoch.map(|e| e.0),
            db_name: self.db_name.clone(),
            sink_from_name: self.sink_from_name.clone(),
            stream_job_status: PbStreamJobStatus::Creating.into(),
            target_table: self.target_table.map(|table_id| table_id.table_id()),
            created_at_cluster_version: self.created_at_cluster_version.clone(),
            initialized_at_cluster_version: self.initialized_at_cluster_version.clone(),
            create_type: self.create_type.to_proto() as i32,
            secret_refs: self.secret_refs.clone(),
            original_target_columns: self
                .original_target_columns
                .iter()
                .map(|c| c.to_protobuf())
                .collect_vec(),
        }
    }

    /// Returns the SQL definition the sink was created with.
    pub fn create_sql(&self) -> String {
        self.definition.clone()
    }

    /// Iterates over the user-visible (non-hidden) columns.
    pub fn visible_columns(&self) -> impl Iterator<Item = &ColumnCatalog> {
        self.columns.iter().filter(|c| !c.is_hidden)
    }

    /// Returns all columns, including hidden ones.
    pub fn full_columns(&self) -> &[ColumnCatalog] {
        &self.columns
    }

    /// Builds a [`Schema`] over all columns, including hidden ones.
    pub fn full_schema(&self) -> Schema {
        let fields = self
            .full_columns()
            .iter()
            .map(|column| Field::from(column.column_desc.clone()))
            .collect_vec();
        Schema { fields }
    }

    /// Builds a [`Schema`] over the visible columns only.
    pub fn visible_schema(&self) -> Schema {
        let fields = self
            .visible_columns()
            .map(|column| Field::from(column.column_desc.clone()))
            .collect_vec();
        Schema { fields }
    }

    /// Returns a copy of the downstream primary-key column indices.
    pub fn downstream_pk_indices(&self) -> Vec<usize> {
        self.downstream_pk.clone()
    }

    /// Returns the identity string of this sink, computed from its protobuf form.
    pub fn unique_identity(&self) -> String {
        self.to_proto().unique_identity()
    }
}
469
impl From<PbSink> for SinkCatalog {
    /// Deserializes a protobuf sink into the in-memory catalog entry.
    fn from(pb: PbSink) -> Self {
        let sink_type = pb.get_sink_type().unwrap();
        // Older entries may lack a create type; default to foreground.
        let create_type = pb.get_create_type().unwrap_or(PbCreateType::Foreground);
        let format_desc = match pb.format_desc {
            // NOTE(review): `.ok()` silently maps a failed conversion to `None`
            // rather than propagating the error — confirm this is intended.
            Some(f) => f.try_into().ok(),
            None => {
                // No explicit format desc: try deriving one from the legacy
                // `connector`/`type` properties (pre-`FORMAT ... ENCODE` sinks).
                let connector = pb.properties.get(CONNECTOR_TYPE_KEY);
                let r#type = pb.properties.get(SINK_TYPE_OPTION);
                match (connector, r#type) {
                    (Some(c), Some(t)) => SinkFormatDesc::from_legacy_type(c, t).ok().flatten(),
                    _ => None,
                }
            }
        };
        SinkCatalog {
            id: pb.id.into(),
            name: pb.name,
            schema_id: pb.schema_id.into(),
            database_id: pb.database_id.into(),
            definition: pb.definition,
            columns: pb
                .columns
                .into_iter()
                .map(ColumnCatalog::from)
                .collect_vec(),
            plan_pk: pb
                .plan_pk
                .iter()
                .map(ColumnOrder::from_protobuf)
                .collect_vec(),
            downstream_pk: pb.downstream_pk.into_iter().map(|k| k as _).collect_vec(),
            distribution_key: pb
                .distribution_key
                .into_iter()
                .map(|k| k as _)
                .collect_vec(),
            properties: pb.properties,
            owner: pb.owner.into(),
            sink_type: SinkType::from_proto(sink_type),
            format_desc,
            connection_id: pb.connection_id.map(ConnectionId),
            created_at_epoch: pb.created_at_epoch.map(Epoch::from),
            initialized_at_epoch: pb.initialized_at_epoch.map(Epoch::from),
            db_name: pb.db_name,
            sink_from_name: pb.sink_from_name,
            target_table: pb.target_table.map(TableId::new),
            initialized_at_cluster_version: pb.initialized_at_cluster_version,
            created_at_cluster_version: pb.created_at_cluster_version,
            create_type: CreateType::from_proto(create_type),
            secret_refs: pb.secret_refs,
            original_target_columns: pb
                .original_target_columns
                .into_iter()
                .map(ColumnCatalog::from)
                .collect_vec(),
        }
    }
}
529
530impl From<&PbSink> for SinkCatalog {
531 fn from(pb: &PbSink) -> Self {
532 pb.clone().into()
533 }
534}