1pub mod desc;
16
17use std::collections::BTreeMap;
18use std::fmt::{Display, Formatter};
19
20use anyhow::anyhow;
21use itertools::Itertools;
22use risingwave_common::catalog::{
23 ColumnCatalog, ConnectionId, CreateType, DatabaseId, Field, Schema, SchemaId, StreamJobStatus,
24 TableId, UserId,
25};
26pub use risingwave_common::id::SinkId;
27use risingwave_common::util::epoch::Epoch;
28use risingwave_common::util::sort_util::ColumnOrder;
29use risingwave_pb::catalog::{
30 PbCreateType, PbSink, PbSinkFormatDesc, PbSinkType, PbStreamJobStatus,
31};
32use risingwave_pb::secret::PbSecretRef;
33use serde::Serialize;
34
35use super::{
36 CONNECTOR_TYPE_KEY, SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_OPTION,
37 SINK_TYPE_UPSERT, SinkError,
38};
39
/// The change semantics a sink emits downstream.
///
/// Converted to/from [`PbSinkType`] via [`SinkType::to_proto`] /
/// [`SinkType::from_proto`]; `type_str` gives the user-facing name.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum SinkType {
    /// Only inserts are emitted ("append-only").
    AppendOnly,
    /// Emits upserts — NOTE(review): exact delivery semantics are defined by
    /// the executors, not visible here; confirm before relying on this doc.
    Upsert,
    /// Emits retractions — NOTE(review): semantics defined elsewhere; confirm.
    Retract,
}
54
55impl SinkType {
56 pub fn is_append_only(self) -> bool {
58 self == Self::AppendOnly
59 }
60
61 pub fn type_str(self) -> &'static str {
63 match self {
64 SinkType::AppendOnly => "append-only",
65 SinkType::Upsert => "upsert",
66 SinkType::Retract => "retract",
67 }
68 }
69
70 pub fn to_proto(self) -> PbSinkType {
71 match self {
72 SinkType::AppendOnly => PbSinkType::AppendOnly,
73 SinkType::Upsert => PbSinkType::Upsert,
74 SinkType::Retract => PbSinkType::Retract,
75 }
76 }
77
78 pub fn from_proto(pb: PbSinkType) -> Self {
79 match pb {
80 PbSinkType::AppendOnly => SinkType::AppendOnly,
81 PbSinkType::ForceAppendOnly => SinkType::AppendOnly,
84 PbSinkType::Upsert => SinkType::Upsert,
85 PbSinkType::Retract => SinkType::Retract,
86 PbSinkType::Unspecified => unreachable!(),
87 }
88 }
89}
90
/// Descriptor for a sink's `FORMAT ... ENCODE ...` clause.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct SinkFormatDesc {
    /// The data format (plain/upsert/debezium).
    pub format: SinkFormat,
    /// The payload encoding.
    pub encode: SinkEncode,
    /// Free-form options attached to the clause — assumed to be things like
    /// schema-registry settings; TODO(review): confirm against the parser.
    pub options: BTreeMap<String, String>,
    /// Secret references used by `options`.
    pub secret_refs: BTreeMap<String, PbSecretRef>,
    /// Optional encoding for the message key. Note: the proto round-trip
    /// (`TryFrom<PbSinkFormatDesc>`) only accepts `Text`/`Bytes` here.
    pub key_encode: Option<SinkEncode>,
    /// Connection referenced by the format clause, if any.
    pub connection_id: Option<ConnectionId>,
}
103
/// The `FORMAT` half of `FORMAT ... ENCODE ...` for sinks.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
pub enum SinkFormat {
    /// `FORMAT PLAIN` (append-only output).
    AppendOnly,
    /// `FORMAT UPSERT`.
    Upsert,
    /// `FORMAT DEBEZIUM`.
    Debezium,
}
111
112impl Display for SinkFormat {
113 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
114 write!(f, "{:?}", self)
115 }
116}
117
/// The `ENCODE` half of `FORMAT ... ENCODE ...` for sinks.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
pub enum SinkEncode {
    Json,
    Protobuf,
    Avro,
    Template,
    Parquet,
    /// Only valid as a key encode in the proto round-trip (see
    /// `TryFrom<PbSinkFormatDesc>`), not as the payload encode.
    Text,
    Bytes,
}
129
130impl Display for SinkEncode {
131 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
132 write!(f, "{:?}", self)
133 }
134}
135
136impl SinkFormatDesc {
137 pub fn from_legacy_type(connector: &str, r#type: &str) -> Result<Option<Self>, SinkError> {
138 use crate::sink::Sink as _;
139 use crate::sink::kafka::KafkaSink;
140 use crate::sink::kinesis::KinesisSink;
141 use crate::sink::pulsar::PulsarSink;
142
143 let format = match r#type {
144 SINK_TYPE_APPEND_ONLY => SinkFormat::AppendOnly,
145 SINK_TYPE_UPSERT => SinkFormat::Upsert,
146 SINK_TYPE_DEBEZIUM => SinkFormat::Debezium,
147 _ => {
148 return Err(SinkError::Config(anyhow!(
149 "sink type unsupported: {}",
150 r#type
151 )));
152 }
153 };
154 let encode = match connector {
155 KafkaSink::SINK_NAME | KinesisSink::SINK_NAME | PulsarSink::SINK_NAME => {
156 SinkEncode::Json
157 }
158 _ => return Ok(None),
159 };
160 Ok(Some(Self {
161 format,
162 encode,
163 options: Default::default(),
164 secret_refs: Default::default(),
165 key_encode: None,
166 connection_id: None,
167 }))
168 }
169
170 pub fn to_proto(&self) -> PbSinkFormatDesc {
171 use risingwave_pb::plan_common::{EncodeType as E, FormatType as F};
172
173 let format = match self.format {
174 SinkFormat::AppendOnly => F::Plain,
175 SinkFormat::Upsert => F::Upsert,
176 SinkFormat::Debezium => F::Debezium,
177 };
178 let mapping_encode = |sink_encode: &SinkEncode| match sink_encode {
179 SinkEncode::Json => E::Json,
180 SinkEncode::Protobuf => E::Protobuf,
181 SinkEncode::Avro => E::Avro,
182 SinkEncode::Template => E::Template,
183 SinkEncode::Parquet => E::Parquet,
184 SinkEncode::Text => E::Text,
185 SinkEncode::Bytes => E::Bytes,
186 };
187
188 let encode = mapping_encode(&self.encode);
189 let key_encode = self.key_encode.as_ref().map(|e| mapping_encode(e).into());
190 let options = self
191 .options
192 .iter()
193 .map(|(k, v)| (k.clone(), v.clone()))
194 .collect();
195
196 PbSinkFormatDesc {
197 format: format.into(),
198 encode: encode.into(),
199 options,
200 key_encode,
201 secret_refs: self.secret_refs.clone(),
202 connection_id: self.connection_id,
203 }
204 }
205
206 pub fn plain_json_for_snowflake_only() -> Self {
209 Self {
210 format: SinkFormat::AppendOnly,
211 encode: SinkEncode::Json,
212 options: Default::default(),
213 secret_refs: Default::default(),
214 key_encode: None,
215 connection_id: None,
216 }
217 }
218}
219
impl TryFrom<PbSinkFormatDesc> for SinkFormatDesc {
    type Error = SinkError;

    /// Converts the protobuf form back into [`SinkFormatDesc`], rejecting any
    /// format/encode value that sinks do not support.
    fn try_from(value: PbSinkFormatDesc) -> Result<Self, Self::Error> {
        use risingwave_pb::plan_common::{EncodeType as E, FormatType as F};

        let format = match value.format() {
            F::Plain => SinkFormat::AppendOnly,
            F::Upsert => SinkFormat::Upsert,
            F::Debezium => SinkFormat::Debezium,
            // All remaining formats are rejected for sinks. The binding keeps
            // the match exhaustive so new proto variants force a decision here.
            f @ (F::Unspecified
            | F::Native
            | F::DebeziumMongo
            | F::Maxwell
            | F::Canal
            | F::None) => {
                return Err(SinkError::Config(anyhow!(
                    "sink format unsupported: {}",
                    f.as_str_name()
                )));
            }
        };
        let encode = match value.encode() {
            E::Json => SinkEncode::Json,
            E::Protobuf => SinkEncode::Protobuf,
            E::Template => SinkEncode::Template,
            E::Avro => SinkEncode::Avro,
            E::Parquet => SinkEncode::Parquet,
            E::Bytes => SinkEncode::Bytes,
            // Note: `Text` is accepted as a *key* encode below, but not as
            // the payload encode.
            e @ (E::Unspecified | E::Native | E::Csv | E::None | E::Text) => {
                return Err(SinkError::Config(anyhow!(
                    "sink encode unsupported: {}",
                    e.as_str_name()
                )));
            }
        };
        // The key encode is optional: `Unspecified` means it was absent, and
        // only `Text` / `Bytes` are valid when present.
        let key_encode = match &value.key_encode() {
            E::Bytes => Some(SinkEncode::Bytes),
            E::Text => Some(SinkEncode::Text),
            E::Unspecified => None,
            encode @ (E::Avro
            | E::Csv
            | E::Json
            | E::Protobuf
            | E::Template
            | E::Native
            | E::Parquet
            | E::None) => {
                return Err(SinkError::Config(anyhow!(
                    "unsupported {} as sink key encode",
                    encode.as_str_name()
                )));
            }
        };

        Ok(Self {
            format,
            encode,
            options: value.options,
            key_encode,
            secret_refs: value.secret_refs,
            connection_id: value.connection_id,
        })
    }
}
285
/// In-memory catalog entry for a sink, mirroring [`PbSink`].
#[derive(Clone, Debug)]
pub struct SinkCatalog {
    /// Id of the sink.
    pub id: SinkId,

    /// Schema the sink belongs to.
    pub schema_id: SchemaId,

    /// Database the sink belongs to.
    pub database_id: DatabaseId,

    /// Name of the sink.
    pub name: String,

    /// The full SQL definition (`CREATE SINK ...`) of the sink.
    pub definition: String,

    // Kept private; access through `full_columns` / `visible_columns`,
    // which distinguish hidden columns.
    columns: Vec<ColumnCatalog>,

    /// Primary-key ordering of the sink's plan — NOTE(review): presumably the
    /// plan/stream key rather than a user-declared key; confirm.
    pub plan_pk: Vec<ColumnOrder>,

    /// User-defined downstream primary-key column indices, if any. Encoded as
    /// an empty list in protobuf when absent.
    pub downstream_pk: Option<Vec<usize>>,

    /// Distribution key column indices.
    pub distribution_key: Vec<usize>,

    /// `WITH (...)` properties of the sink.
    pub properties: BTreeMap<String, String>,

    /// Owner of the sink.
    pub owner: UserId,

    /// Change semantics emitted by the sink; see [`SinkType`].
    pub sink_type: SinkType,
    /// Serialized as `raw_ignore_delete` in protobuf.
    pub ignore_delete: bool,

    /// The `FORMAT ... ENCODE ...` descriptor, if one applies.
    pub format_desc: Option<SinkFormatDesc>,

    /// Connection the sink uses, if any.
    pub connection_id: Option<ConnectionId>,

    pub created_at_epoch: Option<Epoch>,

    pub initialized_at_epoch: Option<Epoch>,

    /// Name of the database — NOTE(review): assumed to refer to the upstream
    /// relation's database; confirm against callers.
    pub db_name: String,

    /// Name of the relation the sink selects from.
    pub sink_from_name: String,
    /// Table to automatically refresh this sink's schema from, if enabled.
    pub auto_refresh_schema_from_table: Option<TableId>,

    /// Target table for sink-into-table, if any.
    pub target_table: Option<TableId>,

    pub created_at_cluster_version: Option<String>,
    pub initialized_at_cluster_version: Option<String>,
    pub create_type: CreateType,

    /// Creation status of the sink's streaming job; see `is_created`.
    pub stream_job_status: StreamJobStatus,

    /// Secret references used by `properties`.
    pub secret_refs: BTreeMap<String, PbSecretRef>,

    /// NOTE(review): presumably a snapshot of the target table's columns at
    /// sink creation time (sink-into-table only) — confirm.
    pub original_target_columns: Vec<ColumnCatalog>,
}
366
impl SinkCatalog {
    /// Serializes this catalog entry into its protobuf representation.
    pub fn to_proto(&self) -> PbSink {
        PbSink {
            id: self.id,
            schema_id: self.schema_id,
            database_id: self.database_id,
            name: self.name.clone(),
            definition: self.definition.clone(),
            columns: self.columns.iter().map(|c| c.to_protobuf()).collect_vec(),
            plan_pk: self.plan_pk.iter().map(|o| o.to_protobuf()).collect(),
            // `None` is encoded as an empty list in protobuf.
            downstream_pk: (self.downstream_pk.as_ref())
                .map_or_else(Vec::new, |pk| pk.iter().map(|idx| *idx as _).collect_vec()),
            distribution_key: self
                .distribution_key
                .iter()
                .map(|k| *k as i32)
                .collect_vec(),
            owner: self.owner.into(),
            properties: self.properties.clone(),
            sink_type: self.sink_type.to_proto() as i32,
            raw_ignore_delete: self.ignore_delete,
            format_desc: self.format_desc.as_ref().map(|f| f.to_proto()),
            connection_id: self.connection_id,
            initialized_at_epoch: self.initialized_at_epoch.map(|e| e.0),
            created_at_epoch: self.created_at_epoch.map(|e| e.0),
            db_name: self.db_name.clone(),
            sink_from_name: self.sink_from_name.clone(),
            stream_job_status: self.stream_job_status.to_proto().into(),
            target_table: self.target_table,
            created_at_cluster_version: self.created_at_cluster_version.clone(),
            initialized_at_cluster_version: self.initialized_at_cluster_version.clone(),
            create_type: self.create_type.to_proto() as i32,
            secret_refs: self.secret_refs.clone(),
            original_target_columns: self
                .original_target_columns
                .iter()
                .map(|c| c.to_protobuf())
                .collect_vec(),
            auto_refresh_schema_from_table: self.auto_refresh_schema_from_table,
        }
    }

    /// Returns the SQL statement that created this sink.
    pub fn create_sql(&self) -> String {
        self.definition.clone()
    }

    /// Iterates over user-visible (non-hidden) columns.
    pub fn visible_columns(&self) -> impl Iterator<Item = &ColumnCatalog> {
        self.columns.iter().filter(|c| !c.is_hidden)
    }

    /// All columns, including hidden ones.
    pub fn full_columns(&self) -> &[ColumnCatalog] {
        &self.columns
    }

    /// Schema over all columns, including hidden ones.
    pub fn full_schema(&self) -> Schema {
        let fields = self
            .full_columns()
            .iter()
            .map(|column| Field::from(column.column_desc.clone()))
            .collect_vec();
        Schema { fields }
    }

    /// Schema over the user-visible columns only.
    pub fn visible_schema(&self) -> Schema {
        let fields = self
            .visible_columns()
            .map(|column| Field::from(column.column_desc.clone()))
            .collect_vec();
        Schema { fields }
    }

    /// Unique identity string, delegated to the protobuf representation.
    /// Note: performs a full `to_proto` conversion on every call.
    pub fn unique_identity(&self) -> String {
        self.to_proto().unique_identity()
    }

    /// Whether the sink's streaming job has finished creation.
    pub fn is_created(&self) -> bool {
        self.stream_job_status == StreamJobStatus::Created
    }
}
448
impl From<PbSink> for SinkCatalog {
    /// Deserializes a protobuf sink into the in-memory catalog form.
    fn from(pb: PbSink) -> Self {
        // Panics on an out-of-range enum value; assumes the proto was written
        // by a compatible node.
        let sink_type = pb.get_sink_type().unwrap();
        let ignore_delete = pb.ignore_delete();
        // Older protos may lack these fields; fall back to sensible defaults.
        let create_type = pb.get_create_type().unwrap_or(PbCreateType::Foreground);
        let stream_job_status = pb
            .get_stream_job_status()
            .unwrap_or(PbStreamJobStatus::Created);
        // Prefer the explicit descriptor; otherwise try to derive one from
        // the legacy `connector` + `type` properties.
        // NOTE(review): conversion errors are silently discarded (`.ok()`),
        // leaving `format_desc` as `None` — confirm this is intended.
        let format_desc = match pb.format_desc {
            Some(f) => f.try_into().ok(),
            None => {
                let connector = pb.properties.get(CONNECTOR_TYPE_KEY);
                let r#type = pb.properties.get(SINK_TYPE_OPTION);
                match (connector, r#type) {
                    (Some(c), Some(t)) => SinkFormatDesc::from_legacy_type(c, t).ok().flatten(),
                    _ => None,
                }
            }
        };
        SinkCatalog {
            id: pb.id,
            name: pb.name,
            schema_id: pb.schema_id,
            database_id: pb.database_id,
            definition: pb.definition,
            columns: pb
                .columns
                .into_iter()
                .map(ColumnCatalog::from)
                .collect_vec(),
            plan_pk: pb
                .plan_pk
                .iter()
                .map(ColumnOrder::from_protobuf)
                .collect_vec(),
            // An empty list in protobuf means "no user-defined downstream pk".
            downstream_pk: if pb.downstream_pk.is_empty() {
                None
            } else {
                Some(
                    (pb.downstream_pk.into_iter())
                        .map(|idx| idx as usize)
                        .collect_vec(),
                )
            },
            distribution_key: pb
                .distribution_key
                .into_iter()
                .map(|k| k as _)
                .collect_vec(),
            properties: pb.properties,
            owner: pb.owner.into(),
            sink_type: SinkType::from_proto(sink_type),
            ignore_delete,
            format_desc,
            connection_id: pb.connection_id,
            created_at_epoch: pb.created_at_epoch.map(Epoch::from),
            initialized_at_epoch: pb.initialized_at_epoch.map(Epoch::from),
            db_name: pb.db_name,
            sink_from_name: pb.sink_from_name,
            auto_refresh_schema_from_table: pb.auto_refresh_schema_from_table,
            target_table: pb.target_table,
            initialized_at_cluster_version: pb.initialized_at_cluster_version,
            created_at_cluster_version: pb.created_at_cluster_version,
            create_type: CreateType::from_proto(create_type),
            stream_job_status: StreamJobStatus::from_proto(stream_job_status),
            secret_refs: pb.secret_refs,
            original_target_columns: pb
                .original_target_columns
                .into_iter()
                .map(ColumnCatalog::from)
                .collect_vec(),
        }
    }
}
523
524impl From<&PbSink> for SinkCatalog {
525 fn from(pb: &PbSink) -> Self {
526 pb.clone().into()
527 }
528}