1pub mod desc;
16
17use std::collections::BTreeMap;
18use std::fmt::{Display, Formatter};
19
20use anyhow::anyhow;
21use itertools::Itertools;
22use risingwave_common::catalog::{
23 ColumnCatalog, ConnectionId, CreateType, DatabaseId, Field, OBJECT_ID_PLACEHOLDER, Schema,
24 SchemaId, TableId, UserId,
25};
26use risingwave_common::util::epoch::Epoch;
27use risingwave_common::util::sort_util::ColumnOrder;
28use risingwave_pb::catalog::{
29 PbCreateType, PbSink, PbSinkFormatDesc, PbSinkType, PbStreamJobStatus,
30};
31use risingwave_pb::secret::PbSecretRef;
32use serde_derive::Serialize;
33
34use super::{
35 CONNECTOR_TYPE_KEY, SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_OPTION,
36 SINK_TYPE_UPSERT, SinkError,
37};
38
/// Newtype wrapper around the raw `u32` id of a sink catalog object.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, PartialOrd, Hash)]
pub struct SinkId {
    /// The raw numeric id.
    pub sink_id: u32,
}
43
44impl SinkId {
45 pub const fn new(sink_id: u32) -> Self {
46 SinkId { sink_id }
47 }
48
49 pub const fn placeholder() -> Self {
51 SinkId {
52 sink_id: OBJECT_ID_PLACEHOLDER,
53 }
54 }
55
56 pub fn sink_id(&self) -> u32 {
57 self.sink_id
58 }
59}
60
61impl std::fmt::Display for SinkId {
62 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
63 write!(f, "{}", self.sink_id)
64 }
65}
66
67impl From<u32> for SinkId {
68 fn from(id: u32) -> Self {
69 Self::new(id)
70 }
71}
72impl From<SinkId> for u32 {
73 fn from(id: SinkId) -> Self {
74 id.sink_id
75 }
76}
77
/// Delivery mode of a sink.
///
/// Both [`SinkType::AppendOnly`] and [`SinkType::ForceAppendOnly`] are treated as
/// append-only by [`SinkType::is_append_only`].
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum SinkType {
    /// Append-only sink.
    AppendOnly,
    /// Append-only sink that is forced to be append-only.
    ///
    /// NOTE(review): presumably the input may contain updates/deletes that are dropped
    /// so only inserts reach the connector — confirm.
    ForceAppendOnly,
    /// Upsert sink.
    Upsert,
}
89
90impl SinkType {
91 pub fn is_append_only(&self) -> bool {
92 self == &Self::AppendOnly || self == &Self::ForceAppendOnly
93 }
94
95 pub fn is_upsert(&self) -> bool {
96 self == &Self::Upsert
97 }
98
99 pub fn to_proto(self) -> PbSinkType {
100 match self {
101 SinkType::AppendOnly => PbSinkType::AppendOnly,
102 SinkType::ForceAppendOnly => PbSinkType::ForceAppendOnly,
103 SinkType::Upsert => PbSinkType::Upsert,
104 }
105 }
106
107 pub fn from_proto(pb: PbSinkType) -> Self {
108 match pb {
109 PbSinkType::AppendOnly => SinkType::AppendOnly,
110 PbSinkType::ForceAppendOnly => SinkType::ForceAppendOnly,
111 PbSinkType::Upsert => SinkType::Upsert,
112 PbSinkType::Unspecified => unreachable!(),
113 }
114 }
115}
116
117#[derive(Debug, Clone, PartialEq, Eq, Hash)]
121pub struct SinkFormatDesc {
122 pub format: SinkFormat,
123 pub encode: SinkEncode,
124 pub options: BTreeMap<String, String>,
125 pub secret_refs: BTreeMap<String, PbSecretRef>,
126 pub key_encode: Option<SinkEncode>,
127 pub connection_id: Option<u32>,
128}
129
130#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
132pub enum SinkFormat {
133 AppendOnly,
134 Upsert,
135 Debezium,
136}
137
138impl Display for SinkFormat {
139 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
140 write!(f, "{:?}", self)
141 }
142}
143
144#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
146pub enum SinkEncode {
147 Json,
148 Protobuf,
149 Avro,
150 Template,
151 Parquet,
152 Text,
153 Bytes,
154}
155
156impl Display for SinkEncode {
157 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
158 write!(f, "{:?}", self)
159 }
160}
161
162impl SinkFormatDesc {
163 pub fn from_legacy_type(connector: &str, r#type: &str) -> Result<Option<Self>, SinkError> {
164 use crate::sink::Sink as _;
165 use crate::sink::kafka::KafkaSink;
166 use crate::sink::kinesis::KinesisSink;
167 use crate::sink::pulsar::PulsarSink;
168
169 let format = match r#type {
170 SINK_TYPE_APPEND_ONLY => SinkFormat::AppendOnly,
171 SINK_TYPE_UPSERT => SinkFormat::Upsert,
172 SINK_TYPE_DEBEZIUM => SinkFormat::Debezium,
173 _ => {
174 return Err(SinkError::Config(anyhow!(
175 "sink type unsupported: {}",
176 r#type
177 )));
178 }
179 };
180 let encode = match connector {
181 KafkaSink::SINK_NAME | KinesisSink::SINK_NAME | PulsarSink::SINK_NAME => {
182 SinkEncode::Json
183 }
184 _ => return Ok(None),
185 };
186 Ok(Some(Self {
187 format,
188 encode,
189 options: Default::default(),
190 secret_refs: Default::default(),
191 key_encode: None,
192 connection_id: None,
193 }))
194 }
195
196 pub fn to_proto(&self) -> PbSinkFormatDesc {
197 use risingwave_pb::plan_common::{EncodeType as E, FormatType as F};
198
199 let format = match self.format {
200 SinkFormat::AppendOnly => F::Plain,
201 SinkFormat::Upsert => F::Upsert,
202 SinkFormat::Debezium => F::Debezium,
203 };
204 let mapping_encode = |sink_encode: &SinkEncode| match sink_encode {
205 SinkEncode::Json => E::Json,
206 SinkEncode::Protobuf => E::Protobuf,
207 SinkEncode::Avro => E::Avro,
208 SinkEncode::Template => E::Template,
209 SinkEncode::Parquet => E::Parquet,
210 SinkEncode::Text => E::Text,
211 SinkEncode::Bytes => E::Bytes,
212 };
213
214 let encode = mapping_encode(&self.encode);
215 let key_encode = self.key_encode.as_ref().map(|e| mapping_encode(e).into());
216 let options = self
217 .options
218 .iter()
219 .map(|(k, v)| (k.clone(), v.clone()))
220 .collect();
221
222 PbSinkFormatDesc {
223 format: format.into(),
224 encode: encode.into(),
225 options,
226 key_encode,
227 secret_refs: self.secret_refs.clone(),
228 connection_id: self.connection_id,
229 }
230 }
231
232 pub fn plain_json_for_snowflake_only() -> Self {
235 Self {
236 format: SinkFormat::AppendOnly,
237 encode: SinkEncode::Json,
238 options: Default::default(),
239 secret_refs: Default::default(),
240 key_encode: None,
241 connection_id: None,
242 }
243 }
244}
245
246impl TryFrom<PbSinkFormatDesc> for SinkFormatDesc {
247 type Error = SinkError;
248
249 fn try_from(value: PbSinkFormatDesc) -> Result<Self, Self::Error> {
250 use risingwave_pb::plan_common::{EncodeType as E, FormatType as F};
251
252 let format = match value.format() {
253 F::Plain => SinkFormat::AppendOnly,
254 F::Upsert => SinkFormat::Upsert,
255 F::Debezium => SinkFormat::Debezium,
256 f @ (F::Unspecified
257 | F::Native
258 | F::DebeziumMongo
259 | F::Maxwell
260 | F::Canal
261 | F::None) => {
262 return Err(SinkError::Config(anyhow!(
263 "sink format unsupported: {}",
264 f.as_str_name()
265 )));
266 }
267 };
268 let encode = match value.encode() {
269 E::Json => SinkEncode::Json,
270 E::Protobuf => SinkEncode::Protobuf,
271 E::Template => SinkEncode::Template,
272 E::Avro => SinkEncode::Avro,
273 E::Parquet => SinkEncode::Parquet,
274 e @ (E::Unspecified | E::Native | E::Csv | E::Bytes | E::None | E::Text) => {
275 return Err(SinkError::Config(anyhow!(
276 "sink encode unsupported: {}",
277 e.as_str_name()
278 )));
279 }
280 };
281 let key_encode = match &value.key_encode() {
282 E::Bytes => Some(SinkEncode::Bytes),
283 E::Text => Some(SinkEncode::Text),
284 E::Unspecified => None,
285 encode @ (E::Avro
286 | E::Csv
287 | E::Json
288 | E::Protobuf
289 | E::Template
290 | E::Native
291 | E::Parquet
292 | E::None) => {
293 return Err(SinkError::Config(anyhow!(
294 "unsupported {} as sink key encode",
295 encode.as_str_name()
296 )));
297 }
298 };
299
300 Ok(Self {
301 format,
302 encode,
303 options: value.options,
304 key_encode,
305 secret_refs: value.secret_refs,
306 connection_id: value.connection_id,
307 })
308 }
309}
310
311#[derive(Clone, Debug)]
316pub struct SinkCatalog {
317 pub id: SinkId,
319
320 pub schema_id: SchemaId,
322
323 pub database_id: DatabaseId,
325
326 pub name: String,
328
329 pub definition: String,
331
332 columns: Vec<ColumnCatalog>,
334
335 pub plan_pk: Vec<ColumnOrder>,
337
338 pub downstream_pk: Vec<usize>,
340
341 pub distribution_key: Vec<usize>,
344
345 pub properties: BTreeMap<String, String>,
347
348 pub owner: UserId,
350
351 pub sink_type: SinkType,
355
356 pub format_desc: Option<SinkFormatDesc>,
358
359 pub connection_id: Option<ConnectionId>,
361
362 pub created_at_epoch: Option<Epoch>,
363
364 pub initialized_at_epoch: Option<Epoch>,
365
366 pub db_name: String,
368
369 pub sink_from_name: String,
371
372 pub target_table: Option<TableId>,
373
374 pub created_at_cluster_version: Option<String>,
375 pub initialized_at_cluster_version: Option<String>,
376 pub create_type: CreateType,
377
378 pub secret_refs: BTreeMap<String, PbSecretRef>,
380
381 pub original_target_columns: Vec<ColumnCatalog>,
383}
384
385impl SinkCatalog {
386 pub fn to_proto(&self) -> PbSink {
387 PbSink {
388 id: self.id.into(),
389 schema_id: self.schema_id.schema_id,
390 database_id: self.database_id.database_id,
391 name: self.name.clone(),
392 definition: self.definition.clone(),
393 columns: self.columns.iter().map(|c| c.to_protobuf()).collect_vec(),
394 plan_pk: self.plan_pk.iter().map(|o| o.to_protobuf()).collect(),
395 downstream_pk: self
396 .downstream_pk
397 .iter()
398 .map(|idx| *idx as i32)
399 .collect_vec(),
400 distribution_key: self
401 .distribution_key
402 .iter()
403 .map(|k| *k as i32)
404 .collect_vec(),
405 owner: self.owner.into(),
406 properties: self.properties.clone(),
407 sink_type: self.sink_type.to_proto() as i32,
408 format_desc: self.format_desc.as_ref().map(|f| f.to_proto()),
409 connection_id: self.connection_id.map(|id| id.into()),
410 initialized_at_epoch: self.initialized_at_epoch.map(|e| e.0),
411 created_at_epoch: self.created_at_epoch.map(|e| e.0),
412 db_name: self.db_name.clone(),
413 sink_from_name: self.sink_from_name.clone(),
414 stream_job_status: PbStreamJobStatus::Creating.into(),
415 target_table: self.target_table.map(|table_id| table_id.table_id()),
416 created_at_cluster_version: self.created_at_cluster_version.clone(),
417 initialized_at_cluster_version: self.initialized_at_cluster_version.clone(),
418 create_type: self.create_type.to_proto() as i32,
419 secret_refs: self.secret_refs.clone(),
420 original_target_columns: self
421 .original_target_columns
422 .iter()
423 .map(|c| c.to_protobuf())
424 .collect_vec(),
425 }
426 }
427
428 pub fn create_sql(&self) -> String {
430 self.definition.clone()
431 }
432
433 pub fn visible_columns(&self) -> impl Iterator<Item = &ColumnCatalog> {
434 self.columns.iter().filter(|c| !c.is_hidden)
435 }
436
437 pub fn full_columns(&self) -> &[ColumnCatalog] {
438 &self.columns
439 }
440
441 pub fn full_schema(&self) -> Schema {
442 let fields = self
443 .full_columns()
444 .iter()
445 .map(|column| Field::from(column.column_desc.clone()))
446 .collect_vec();
447 Schema { fields }
448 }
449
450 pub fn visible_schema(&self) -> Schema {
451 let fields = self
452 .visible_columns()
453 .map(|column| Field::from(column.column_desc.clone()))
454 .collect_vec();
455 Schema { fields }
456 }
457
458 pub fn downstream_pk_indices(&self) -> Vec<usize> {
459 self.downstream_pk.clone()
460 }
461
462 pub fn unique_identity(&self) -> String {
463 self.to_proto().unique_identity()
465 }
466}
467
468impl From<PbSink> for SinkCatalog {
469 fn from(pb: PbSink) -> Self {
470 let sink_type = pb.get_sink_type().unwrap();
471 let create_type = pb.get_create_type().unwrap_or(PbCreateType::Foreground);
472 let format_desc = match pb.format_desc {
473 Some(f) => f.try_into().ok(),
474 None => {
475 let connector = pb.properties.get(CONNECTOR_TYPE_KEY);
476 let r#type = pb.properties.get(SINK_TYPE_OPTION);
477 match (connector, r#type) {
478 (Some(c), Some(t)) => SinkFormatDesc::from_legacy_type(c, t).ok().flatten(),
479 _ => None,
480 }
481 }
482 };
483 SinkCatalog {
484 id: pb.id.into(),
485 name: pb.name,
486 schema_id: pb.schema_id.into(),
487 database_id: pb.database_id.into(),
488 definition: pb.definition,
489 columns: pb
490 .columns
491 .into_iter()
492 .map(ColumnCatalog::from)
493 .collect_vec(),
494 plan_pk: pb
495 .plan_pk
496 .iter()
497 .map(ColumnOrder::from_protobuf)
498 .collect_vec(),
499 downstream_pk: pb.downstream_pk.into_iter().map(|k| k as _).collect_vec(),
500 distribution_key: pb
501 .distribution_key
502 .into_iter()
503 .map(|k| k as _)
504 .collect_vec(),
505 properties: pb.properties,
506 owner: pb.owner.into(),
507 sink_type: SinkType::from_proto(sink_type),
508 format_desc,
509 connection_id: pb.connection_id.map(ConnectionId),
510 created_at_epoch: pb.created_at_epoch.map(Epoch::from),
511 initialized_at_epoch: pb.initialized_at_epoch.map(Epoch::from),
512 db_name: pb.db_name,
513 sink_from_name: pb.sink_from_name,
514 target_table: pb.target_table.map(TableId::new),
515 initialized_at_cluster_version: pb.initialized_at_cluster_version,
516 created_at_cluster_version: pb.created_at_cluster_version,
517 create_type: CreateType::from_proto(create_type),
518 secret_refs: pb.secret_refs,
519 original_target_columns: pb
520 .original_target_columns
521 .into_iter()
522 .map(ColumnCatalog::from)
523 .collect_vec(),
524 }
525 }
526}
527
528impl From<&PbSink> for SinkCatalog {
529 fn from(pb: &PbSink) -> Self {
530 pb.clone().into()
531 }
532}