1pub mod desc;
16
17use std::collections::BTreeMap;
18use std::fmt::{Display, Formatter};
19
20use anyhow::anyhow;
21use itertools::Itertools;
22use risingwave_common::catalog::{
23 ColumnCatalog, ConnectionId, CreateType, DatabaseId, Field, Schema, SchemaId, StreamJobStatus,
24 TableId, UserId,
25};
26pub use risingwave_common::id::SinkId;
27use risingwave_common::util::epoch::Epoch;
28use risingwave_common::util::sort_util::ColumnOrder;
29use risingwave_pb::catalog::{
30 PbCreateType, PbSink, PbSinkFormatDesc, PbSinkType, PbStreamJobStatus,
31};
32use risingwave_pb::secret::PbSecretRef;
33use serde::Serialize;
34
35use super::{
36 CONNECTOR_TYPE_KEY, SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_OPTION,
37 SINK_TYPE_UPSERT, SinkError,
38};
39
/// The kind of change stream a sink writes out. Serialized to/from [`PbSinkType`].
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum SinkType {
    /// Append-only output; maps to the `append-only` type string (see `type_str()`).
    AppendOnly,
    /// Also reported as append-only by `is_append_only()` and sharing the `append-only`
    /// type string; presumably forces append-only output even when the upstream may
    /// produce updates — TODO(review): confirm against the planner.
    ForceAppendOnly,
    /// Upsert output; maps to the `upsert` type string.
    Upsert,
    /// Retract output; maps to the `retract` type string.
    Retract,
}
57
58impl SinkType {
59 pub fn is_append_only(self) -> bool {
61 self == Self::AppendOnly || self == Self::ForceAppendOnly
62 }
63
64 pub fn type_str(self) -> &'static str {
66 match self {
67 SinkType::AppendOnly | SinkType::ForceAppendOnly => "append-only",
68 SinkType::Upsert => "upsert",
69 SinkType::Retract => "retract",
70 }
71 }
72
73 pub fn to_proto(self) -> PbSinkType {
74 match self {
75 SinkType::AppendOnly => PbSinkType::AppendOnly,
76 SinkType::ForceAppendOnly => PbSinkType::ForceAppendOnly,
77 SinkType::Upsert => PbSinkType::Upsert,
78 SinkType::Retract => PbSinkType::Retract,
79 }
80 }
81
82 pub fn from_proto(pb: PbSinkType) -> Self {
83 match pb {
84 PbSinkType::AppendOnly => SinkType::AppendOnly,
85 PbSinkType::ForceAppendOnly => SinkType::ForceAppendOnly,
86 PbSinkType::Upsert => SinkType::Upsert,
87 PbSinkType::Retract => SinkType::Retract,
88 PbSinkType::Unspecified => unreachable!(),
89 }
90 }
91}
92
/// The `FORMAT ... ENCODE ...` descriptor of a sink, controlling how change events are
/// serialized before being handed to the connector.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct SinkFormatDesc {
    /// Change-event format (append-only / upsert / debezium).
    pub format: SinkFormat,
    /// Payload encoding of the event value.
    pub encode: SinkEncode,
    /// Extra `key = value` options attached to the format clause.
    pub options: BTreeMap<String, String>,
    /// Options whose values are stored as secret references rather than plain text.
    pub secret_refs: BTreeMap<String, PbSecretRef>,
    /// Optional separate encoding for the message key; `None` means unspecified. Only
    /// `Text` and `Bytes` are accepted when converting from protobuf (see
    /// `TryFrom<PbSinkFormatDesc>`).
    pub key_encode: Option<SinkEncode>,
    /// Catalog connection this descriptor refers to, if any.
    pub connection_id: Option<ConnectionId>,
}
105
/// The change-event format of a sink (the `FORMAT` half of `FORMAT ... ENCODE ...`).
///
/// `Display` renders the variant name via the derived `Debug`, e.g. `AppendOnly`.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
pub enum SinkFormat {
    AppendOnly,
    Upsert,
    Debezium,
}
113
114impl Display for SinkFormat {
115 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
116 write!(f, "{:?}", self)
117 }
118}
119
/// The payload encoding of a sink (the `ENCODE` half of `FORMAT ... ENCODE ...`).
///
/// `Display` renders the variant name via the derived `Debug`, e.g. `Json`.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
pub enum SinkEncode {
    Json,
    Protobuf,
    Avro,
    Template,
    Parquet,
    /// Only accepted as a *key* encode when converting from protobuf; rejected as a value
    /// encode there (see `TryFrom<PbSinkFormatDesc>`).
    Text,
    Bytes,
}
131
132impl Display for SinkEncode {
133 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
134 write!(f, "{:?}", self)
135 }
136}
137
138impl SinkFormatDesc {
139 pub fn from_legacy_type(connector: &str, r#type: &str) -> Result<Option<Self>, SinkError> {
140 use crate::sink::Sink as _;
141 use crate::sink::kafka::KafkaSink;
142 use crate::sink::kinesis::KinesisSink;
143 use crate::sink::pulsar::PulsarSink;
144
145 let format = match r#type {
146 SINK_TYPE_APPEND_ONLY => SinkFormat::AppendOnly,
147 SINK_TYPE_UPSERT => SinkFormat::Upsert,
148 SINK_TYPE_DEBEZIUM => SinkFormat::Debezium,
149 _ => {
150 return Err(SinkError::Config(anyhow!(
151 "sink type unsupported: {}",
152 r#type
153 )));
154 }
155 };
156 let encode = match connector {
157 KafkaSink::SINK_NAME | KinesisSink::SINK_NAME | PulsarSink::SINK_NAME => {
158 SinkEncode::Json
159 }
160 _ => return Ok(None),
161 };
162 Ok(Some(Self {
163 format,
164 encode,
165 options: Default::default(),
166 secret_refs: Default::default(),
167 key_encode: None,
168 connection_id: None,
169 }))
170 }
171
172 pub fn to_proto(&self) -> PbSinkFormatDesc {
173 use risingwave_pb::plan_common::{EncodeType as E, FormatType as F};
174
175 let format = match self.format {
176 SinkFormat::AppendOnly => F::Plain,
177 SinkFormat::Upsert => F::Upsert,
178 SinkFormat::Debezium => F::Debezium,
179 };
180 let mapping_encode = |sink_encode: &SinkEncode| match sink_encode {
181 SinkEncode::Json => E::Json,
182 SinkEncode::Protobuf => E::Protobuf,
183 SinkEncode::Avro => E::Avro,
184 SinkEncode::Template => E::Template,
185 SinkEncode::Parquet => E::Parquet,
186 SinkEncode::Text => E::Text,
187 SinkEncode::Bytes => E::Bytes,
188 };
189
190 let encode = mapping_encode(&self.encode);
191 let key_encode = self.key_encode.as_ref().map(|e| mapping_encode(e).into());
192 let options = self
193 .options
194 .iter()
195 .map(|(k, v)| (k.clone(), v.clone()))
196 .collect();
197
198 PbSinkFormatDesc {
199 format: format.into(),
200 encode: encode.into(),
201 options,
202 key_encode,
203 secret_refs: self.secret_refs.clone(),
204 connection_id: self.connection_id,
205 }
206 }
207
208 pub fn plain_json_for_snowflake_only() -> Self {
211 Self {
212 format: SinkFormat::AppendOnly,
213 encode: SinkEncode::Json,
214 options: Default::default(),
215 secret_refs: Default::default(),
216 key_encode: None,
217 connection_id: None,
218 }
219 }
220}
221
222impl TryFrom<PbSinkFormatDesc> for SinkFormatDesc {
223 type Error = SinkError;
224
225 fn try_from(value: PbSinkFormatDesc) -> Result<Self, Self::Error> {
226 use risingwave_pb::plan_common::{EncodeType as E, FormatType as F};
227
228 let format = match value.format() {
229 F::Plain => SinkFormat::AppendOnly,
230 F::Upsert => SinkFormat::Upsert,
231 F::Debezium => SinkFormat::Debezium,
232 f @ (F::Unspecified
233 | F::Native
234 | F::DebeziumMongo
235 | F::Maxwell
236 | F::Canal
237 | F::None) => {
238 return Err(SinkError::Config(anyhow!(
239 "sink format unsupported: {}",
240 f.as_str_name()
241 )));
242 }
243 };
244 let encode = match value.encode() {
245 E::Json => SinkEncode::Json,
246 E::Protobuf => SinkEncode::Protobuf,
247 E::Template => SinkEncode::Template,
248 E::Avro => SinkEncode::Avro,
249 E::Parquet => SinkEncode::Parquet,
250 E::Bytes => SinkEncode::Bytes,
251 e @ (E::Unspecified | E::Native | E::Csv | E::None | E::Text) => {
252 return Err(SinkError::Config(anyhow!(
253 "sink encode unsupported: {}",
254 e.as_str_name()
255 )));
256 }
257 };
258 let key_encode = match &value.key_encode() {
259 E::Bytes => Some(SinkEncode::Bytes),
260 E::Text => Some(SinkEncode::Text),
261 E::Unspecified => None,
262 encode @ (E::Avro
263 | E::Csv
264 | E::Json
265 | E::Protobuf
266 | E::Template
267 | E::Native
268 | E::Parquet
269 | E::None) => {
270 return Err(SinkError::Config(anyhow!(
271 "unsupported {} as sink key encode",
272 encode.as_str_name()
273 )));
274 }
275 };
276
277 Ok(Self {
278 format,
279 encode,
280 options: value.options,
281 key_encode,
282 secret_refs: value.secret_refs,
283 connection_id: value.connection_id,
284 })
285 }
286}
287
/// The in-memory catalog entry of a sink; the counterpart of [`PbSink`]
/// (see `to_proto` / `From<PbSink>`).
#[derive(Clone, Debug)]
pub struct SinkCatalog {
    /// Id of the sink.
    pub id: SinkId,

    /// Schema the sink belongs to.
    pub schema_id: SchemaId,

    /// Database the sink belongs to.
    pub database_id: DatabaseId,

    /// Name of the sink.
    pub name: String,

    /// The SQL definition of the sink, returned verbatim by `create_sql()`.
    pub definition: String,

    /// All columns of the sink, including hidden ones. Kept private; read via
    /// `full_columns()` / `visible_columns()`.
    columns: Vec<ColumnCatalog>,

    /// Primary key of the sink's plan (column index + order).
    pub plan_pk: Vec<ColumnOrder>,

    /// User-defined downstream primary key indices; `None` when unspecified
    /// (stored as an empty list in protobuf).
    pub downstream_pk: Option<Vec<usize>>,

    /// Distribution key column indices of the sink.
    pub distribution_key: Vec<usize>,

    /// The `WITH (...)` properties of the sink.
    pub properties: BTreeMap<String, String>,

    /// Owner of the sink.
    pub owner: UserId,

    /// Kind of change stream the sink emits (append-only / force-append-only /
    /// upsert / retract).
    pub sink_type: SinkType,

    /// `FORMAT ... ENCODE ...` descriptor; `None` when absent and no legacy
    /// `connector`/`type` mapping could be derived.
    pub format_desc: Option<SinkFormatDesc>,

    /// Catalog connection used by the sink, if any.
    pub connection_id: Option<ConnectionId>,

    pub created_at_epoch: Option<Epoch>,

    pub initialized_at_epoch: Option<Epoch>,

    /// Database name recorded with the sink — presumably that of the relation the sink
    /// selects from; TODO(review): confirm against the frontend.
    pub db_name: String,

    /// Name of the relation the sink selects from.
    pub sink_from_name: String,
    /// Table whose schema changes this sink automatically follows, if enabled.
    pub auto_refresh_schema_from_table: Option<TableId>,

    /// Target table id for a sink-into-table; `None` for external sinks.
    pub target_table: Option<TableId>,

    pub created_at_cluster_version: Option<String>,
    pub initialized_at_cluster_version: Option<String>,
    /// Whether the sink job is created in the foreground or background.
    pub create_type: CreateType,

    /// Status of the backing streaming job; `is_created()` checks for `Created`.
    pub stream_job_status: StreamJobStatus,

    /// Secret references used by the sink's properties.
    pub secret_refs: BTreeMap<String, PbSecretRef>,

    /// For sinks into a table: the target table's columns as of sink creation.
    pub original_target_columns: Vec<ColumnCatalog>,
}
366
367impl SinkCatalog {
368 pub fn to_proto(&self) -> PbSink {
369 PbSink {
370 id: self.id,
371 schema_id: self.schema_id,
372 database_id: self.database_id,
373 name: self.name.clone(),
374 definition: self.definition.clone(),
375 columns: self.columns.iter().map(|c| c.to_protobuf()).collect_vec(),
376 plan_pk: self.plan_pk.iter().map(|o| o.to_protobuf()).collect(),
377 downstream_pk: (self.downstream_pk.as_ref())
378 .map_or_else(Vec::new, |pk| pk.iter().map(|idx| *idx as _).collect_vec()),
379 distribution_key: self
380 .distribution_key
381 .iter()
382 .map(|k| *k as i32)
383 .collect_vec(),
384 owner: self.owner.into(),
385 properties: self.properties.clone(),
386 sink_type: self.sink_type.to_proto() as i32,
387 format_desc: self.format_desc.as_ref().map(|f| f.to_proto()),
388 connection_id: self.connection_id,
389 initialized_at_epoch: self.initialized_at_epoch.map(|e| e.0),
390 created_at_epoch: self.created_at_epoch.map(|e| e.0),
391 db_name: self.db_name.clone(),
392 sink_from_name: self.sink_from_name.clone(),
393 stream_job_status: self.stream_job_status.to_proto().into(),
394 target_table: self.target_table,
395 created_at_cluster_version: self.created_at_cluster_version.clone(),
396 initialized_at_cluster_version: self.initialized_at_cluster_version.clone(),
397 create_type: self.create_type.to_proto() as i32,
398 secret_refs: self.secret_refs.clone(),
399 original_target_columns: self
400 .original_target_columns
401 .iter()
402 .map(|c| c.to_protobuf())
403 .collect_vec(),
404 auto_refresh_schema_from_table: self.auto_refresh_schema_from_table,
405 }
406 }
407
408 pub fn create_sql(&self) -> String {
410 self.definition.clone()
411 }
412
413 pub fn visible_columns(&self) -> impl Iterator<Item = &ColumnCatalog> {
414 self.columns.iter().filter(|c| !c.is_hidden)
415 }
416
417 pub fn full_columns(&self) -> &[ColumnCatalog] {
418 &self.columns
419 }
420
421 pub fn full_schema(&self) -> Schema {
422 let fields = self
423 .full_columns()
424 .iter()
425 .map(|column| Field::from(column.column_desc.clone()))
426 .collect_vec();
427 Schema { fields }
428 }
429
430 pub fn visible_schema(&self) -> Schema {
431 let fields = self
432 .visible_columns()
433 .map(|column| Field::from(column.column_desc.clone()))
434 .collect_vec();
435 Schema { fields }
436 }
437
438 pub fn unique_identity(&self) -> String {
439 self.to_proto().unique_identity()
441 }
442
443 pub fn is_created(&self) -> bool {
444 self.stream_job_status == StreamJobStatus::Created
445 }
446}
447
impl From<PbSink> for SinkCatalog {
    fn from(pb: PbSink) -> Self {
        // `sink_type` must be present and valid; a corrupted catalog entry panics here.
        let sink_type = pb.get_sink_type().unwrap();
        // Fields added in later versions fall back to defaults for older catalogs.
        let create_type = pb.get_create_type().unwrap_or(PbCreateType::Foreground);
        let stream_job_status = pb
            .get_stream_job_status()
            .unwrap_or(PbStreamJobStatus::Created);
        // Prefer the explicit format descriptor; otherwise try to derive one from the
        // legacy `connector` + `type` properties. Both `.ok()` calls make this
        // best-effort: a malformed descriptor degrades to `None` instead of failing
        // the conversion.
        let format_desc = match pb.format_desc {
            Some(f) => f.try_into().ok(),
            None => {
                let connector = pb.properties.get(CONNECTOR_TYPE_KEY);
                let r#type = pb.properties.get(SINK_TYPE_OPTION);
                match (connector, r#type) {
                    (Some(c), Some(t)) => SinkFormatDesc::from_legacy_type(c, t).ok().flatten(),
                    _ => None,
                }
            }
        };
        SinkCatalog {
            id: pb.id,
            name: pb.name,
            schema_id: pb.schema_id,
            database_id: pb.database_id,
            definition: pb.definition,
            columns: pb
                .columns
                .into_iter()
                .map(ColumnCatalog::from)
                .collect_vec(),
            plan_pk: pb
                .plan_pk
                .iter()
                .map(ColumnOrder::from_protobuf)
                .collect_vec(),
            // An empty list in protobuf encodes "no user-defined downstream pk".
            downstream_pk: if pb.downstream_pk.is_empty() {
                None
            } else {
                Some(
                    (pb.downstream_pk.into_iter())
                        .map(|idx| idx as usize)
                        .collect_vec(),
                )
            },
            distribution_key: pb
                .distribution_key
                .into_iter()
                .map(|k| k as _)
                .collect_vec(),
            properties: pb.properties,
            owner: pb.owner.into(),
            sink_type: SinkType::from_proto(sink_type),
            format_desc,
            connection_id: pb.connection_id,
            created_at_epoch: pb.created_at_epoch.map(Epoch::from),
            initialized_at_epoch: pb.initialized_at_epoch.map(Epoch::from),
            db_name: pb.db_name,
            sink_from_name: pb.sink_from_name,
            auto_refresh_schema_from_table: pb.auto_refresh_schema_from_table,
            target_table: pb.target_table,
            initialized_at_cluster_version: pb.initialized_at_cluster_version,
            created_at_cluster_version: pb.created_at_cluster_version,
            create_type: CreateType::from_proto(create_type),
            stream_job_status: StreamJobStatus::from_proto(stream_job_status),
            secret_refs: pb.secret_refs,
            original_target_columns: pb
                .original_target_columns
                .into_iter()
                .map(ColumnCatalog::from)
                .collect_vec(),
        }
    }
}
520
521impl From<&PbSink> for SinkCatalog {
522 fn from(pb: &PbSink) -> Self {
523 pb.clone().into()
524 }
525}