pub mod desc;

use std::collections::BTreeMap;
use std::fmt::{Display, Formatter};

use anyhow::anyhow;
use itertools::Itertools;
use risingwave_common::catalog::{
    ColumnCatalog, ConnectionId, CreateType, DatabaseId, Field, Schema, SchemaId, StreamJobStatus,
    TableId, UserId,
};
pub use risingwave_common::id::SinkId;
use risingwave_common::util::epoch::Epoch;
use risingwave_common::util::sort_util::ColumnOrder;
use risingwave_pb::catalog::{
    PbCreateType, PbSink, PbSinkFormatDesc, PbSinkType, PbStreamJobStatus,
};
use risingwave_pb::secret::PbSecretRef;
use serde::Serialize;

use super::{
    CONNECTOR_TYPE_KEY, SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_OPTION,
    SINK_TYPE_UPSERT, SinkError,
};
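/// The kind of change stream a sink accepts from its upstream job. The append-only variants
/// carry only `INSERT`s, while `Upsert` and `Retract` also propagate updates and deletions.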
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum SinkType {
    /// The data written into the sink connector can only be `INSERT`; no `UPDATE` or `DELETE` is
    /// allowed.
    AppendOnly,
    /// The input of the sink operator may contain `UPDATE` or `DELETE` changes, but they are
    /// dropped and only `INSERT` rows are written to the sink connector.
    ForceAppendOnly,
    /// The data written into the sink connector can be `INSERT`, `UPDATE`, or `DELETE`, keyed by
    /// the downstream primary key.
    Upsert,
    /// The full change stream is written to the sink connector, with an update represented as a
    /// retraction of the old row followed by an insertion of the new one.
    Retract,
}

impl SinkType {
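    /// Returns whether the rows written to the sink are append-only, i.e. the connector never
    /// receives `UPDATE` or `DELETE` changes.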
    pub fn is_append_only(self) -> bool {
        self == Self::AppendOnly || self == Self::ForceAppendOnly
    }

    pub fn type_str(self) -> &'static str {
        match self {
            SinkType::AppendOnly | SinkType::ForceAppendOnly => "append-only",
            SinkType::Upsert => "upsert",
            SinkType::Retract => "retract",
        }
    }

    pub fn to_proto(self) -> PbSinkType {
        match self {
            SinkType::AppendOnly => PbSinkType::AppendOnly,
            SinkType::ForceAppendOnly => PbSinkType::ForceAppendOnly,
            SinkType::Upsert => PbSinkType::Upsert,
            SinkType::Retract => PbSinkType::Retract,
        }
    }

    pub fn from_proto(pb: PbSinkType) -> Self {
        match pb {
            PbSinkType::AppendOnly => SinkType::AppendOnly,
            PbSinkType::ForceAppendOnly => SinkType::ForceAppendOnly,
            PbSinkType::Upsert => SinkType::Upsert,
            PbSinkType::Retract => SinkType::Retract,
            PbSinkType::Unspecified => unreachable!(),
        }
    }
}

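/// A sink format/encode description, i.e. the `FORMAT ... ENCODE ...` clause of a `CREATE SINK`
/// statement, together with its options, secret references, and optional key encode.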
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct SinkFormatDesc {
    pub format: SinkFormat,
    pub encode: SinkEncode,
    pub options: BTreeMap<String, String>,
    pub secret_refs: BTreeMap<String, PbSecretRef>,
    pub key_encode: Option<SinkEncode>,
    pub connection_id: Option<ConnectionId>,
}

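/// The `FORMAT` part of the clause: how changes are represented in the emitted payload.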
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
pub enum SinkFormat {
    AppendOnly,
    Upsert,
    Debezium,
}

impl Display for SinkFormat {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "{:?}", self)
    }
}

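/// The `ENCODE` part of the clause: the serialization used for the payload, or, via
/// `SinkFormatDesc::key_encode`, for the message key.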
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
pub enum SinkEncode {
    Json,
    Protobuf,
    Avro,
    Template,
    Parquet,
    Text,
    Bytes,
}

impl Display for SinkEncode {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "{:?}", self)
    }
}

impl SinkFormatDesc {
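    /// Derives a format description from the legacy `type = '...'` syntax, which predates the
    /// `FORMAT ... ENCODE ...` clause. Only Kafka, Kinesis, and Pulsar sinks carried an implicit
    /// encode (JSON) under that syntax; for other connectors this returns `Ok(None)`.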
    pub fn from_legacy_type(connector: &str, r#type: &str) -> Result<Option<Self>, SinkError> {
        use crate::sink::Sink as _;
        use crate::sink::kafka::KafkaSink;
        use crate::sink::kinesis::KinesisSink;
        use crate::sink::pulsar::PulsarSink;

        let format = match r#type {
            SINK_TYPE_APPEND_ONLY => SinkFormat::AppendOnly,
            SINK_TYPE_UPSERT => SinkFormat::Upsert,
            SINK_TYPE_DEBEZIUM => SinkFormat::Debezium,
            _ => {
                return Err(SinkError::Config(anyhow!(
                    "sink type unsupported: {}",
                    r#type
                )));
            }
        };
        let encode = match connector {
            KafkaSink::SINK_NAME | KinesisSink::SINK_NAME | PulsarSink::SINK_NAME => {
                SinkEncode::Json
            }
            _ => return Ok(None),
        };
        Ok(Some(Self {
            format,
            encode,
            options: Default::default(),
            secret_refs: Default::default(),
            key_encode: None,
            connection_id: None,
        }))
    }

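    /// Converts the description into its protobuf form, mapping `SinkFormat`/`SinkEncode` onto
    /// the shared `plan_common` `FormatType`/`EncodeType` enums (`AppendOnly` maps to `Plain`).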
    pub fn to_proto(&self) -> PbSinkFormatDesc {
        use risingwave_pb::plan_common::{EncodeType as E, FormatType as F};

        let format = match self.format {
            SinkFormat::AppendOnly => F::Plain,
            SinkFormat::Upsert => F::Upsert,
            SinkFormat::Debezium => F::Debezium,
        };
        let mapping_encode = |sink_encode: &SinkEncode| match sink_encode {
            SinkEncode::Json => E::Json,
            SinkEncode::Protobuf => E::Protobuf,
            SinkEncode::Avro => E::Avro,
            SinkEncode::Template => E::Template,
            SinkEncode::Parquet => E::Parquet,
            SinkEncode::Text => E::Text,
            SinkEncode::Bytes => E::Bytes,
        };

        let encode = mapping_encode(&self.encode);
        let key_encode = self.key_encode.as_ref().map(|e| mapping_encode(e).into());
        let options = self
            .options
            .iter()
            .map(|(k, v)| (k.clone(), v.clone()))
            .collect();

        PbSinkFormatDesc {
            format: format.into(),
            encode: encode.into(),
            options,
            key_encode,
            secret_refs: self.secret_refs.clone(),
            connection_id: self.connection_id,
        }
    }

    /// A fixed `FORMAT PLAIN ENCODE JSON` description with no extra options, used by the
    /// Snowflake sink only.
    pub fn plain_json_for_snowflake_only() -> Self {
        Self {
            format: SinkFormat::AppendOnly,
            encode: SinkEncode::Json,
            options: Default::default(),
            secret_refs: Default::default(),
            key_encode: None,
            connection_id: None,
        }
    }
}

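/// Conversion back from the protobuf representation. Format/encode combinations that the sink
/// layer does not support are rejected with a `SinkError::Config` rather than mapped lossily.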
impl TryFrom<PbSinkFormatDesc> for SinkFormatDesc {
    type Error = SinkError;

    fn try_from(value: PbSinkFormatDesc) -> Result<Self, Self::Error> {
        use risingwave_pb::plan_common::{EncodeType as E, FormatType as F};

        let format = match value.format() {
            F::Plain => SinkFormat::AppendOnly,
            F::Upsert => SinkFormat::Upsert,
            F::Debezium => SinkFormat::Debezium,
            f @ (F::Unspecified
            | F::Native
            | F::DebeziumMongo
            | F::Maxwell
            | F::Canal
            | F::None) => {
                return Err(SinkError::Config(anyhow!(
                    "sink format unsupported: {}",
                    f.as_str_name()
                )));
            }
        };
        let encode = match value.encode() {
            E::Json => SinkEncode::Json,
            E::Protobuf => SinkEncode::Protobuf,
            E::Template => SinkEncode::Template,
            E::Avro => SinkEncode::Avro,
            E::Parquet => SinkEncode::Parquet,
            e @ (E::Unspecified | E::Native | E::Csv | E::Bytes | E::None | E::Text) => {
                return Err(SinkError::Config(anyhow!(
                    "sink encode unsupported: {}",
                    e.as_str_name()
                )));
            }
        };
        let key_encode = match &value.key_encode() {
            E::Bytes => Some(SinkEncode::Bytes),
            E::Text => Some(SinkEncode::Text),
            E::Unspecified => None,
            encode @ (E::Avro
            | E::Csv
            | E::Json
            | E::Protobuf
            | E::Template
            | E::Native
            | E::Parquet
            | E::None) => {
                return Err(SinkError::Config(anyhow!(
                    "unsupported {} as sink key encode",
                    encode.as_str_name()
                )));
            }
        };

        Ok(Self {
            format,
            encode,
            options: value.options,
            key_encode,
            secret_refs: value.secret_refs,
            connection_id: value.connection_id,
        })
    }
}

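/// The in-memory catalog entry for a sink, mirroring `PbSink` in the catalog protobuf. It is
/// constructed from (and converted back to) the protobuf form via `From<PbSink>` and
/// [`SinkCatalog::to_proto`].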
#[derive(Clone, Debug)]
pub struct SinkCatalog {
    /// Id of the sink.
    pub id: SinkId,

    /// Schema of the sink.
    pub schema_id: SchemaId,

    /// Database of the sink.
    pub database_id: DatabaseId,

    /// Name of the sink.
    pub name: String,

    /// The full SQL definition of the sink.
    pub definition: String,

    /// All columns of the sink.
    columns: Vec<ColumnCatalog>,

    /// Primary keys of the sink, derived by the frontend from the query plan.
    pub plan_pk: Vec<ColumnOrder>,

    /// User-defined primary key indices for the upsert sink, if any.
    pub downstream_pk: Option<Vec<usize>>,

    /// Distribution key indices of the sink, e.g. if `distribution_key = [1, 2]`, then
    /// `columns[1]` and `columns[2]` form the distribution key.
    pub distribution_key: Vec<usize>,

    /// The properties of the sink.
    pub properties: BTreeMap<String, String>,

    /// Owner of the sink.
    pub owner: UserId,

    /// The kind of change stream the sink accepts.
    pub sink_type: SinkType,

    /// The format and encode of the sink, if specified with `FORMAT ... ENCODE ...`.
    pub format_desc: Option<SinkFormatDesc>,

    /// The connection the sink uses to reach the downstream system, if any.
    pub connection_id: Option<ConnectionId>,

    pub created_at_epoch: Option<Epoch>,

    pub initialized_at_epoch: Option<Epoch>,

    /// Name of the database the sink belongs to.
    pub db_name: String,

    /// Name of the upstream relation the sink is created from.
    pub sink_from_name: String,
    pub auto_refresh_schema_from_table: Option<TableId>,

    pub target_table: Option<TableId>,

    pub created_at_cluster_version: Option<String>,
    pub initialized_at_cluster_version: Option<String>,
    pub create_type: CreateType,

    /// Whether the stream job that creates the sink has finished.
    pub stream_job_status: StreamJobStatus,

    /// Secret references used by the sink's properties.
    pub secret_refs: BTreeMap<String, PbSecretRef>,

    /// Only for sinks into tables: the columns of the target table when the sink was created.
    pub original_target_columns: Vec<ColumnCatalog>,
}

impl SinkCatalog {
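    /// Converts the catalog entry back into its protobuf representation, the inverse of
    /// `From<PbSink>`. An absent `downstream_pk` is encoded as an empty list.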
    pub fn to_proto(&self) -> PbSink {
        PbSink {
            id: self.id,
            schema_id: self.schema_id,
            database_id: self.database_id,
            name: self.name.clone(),
            definition: self.definition.clone(),
            columns: self.columns.iter().map(|c| c.to_protobuf()).collect_vec(),
            plan_pk: self.plan_pk.iter().map(|o| o.to_protobuf()).collect(),
            downstream_pk: (self.downstream_pk.as_ref())
                .map_or_else(Vec::new, |pk| pk.iter().map(|idx| *idx as _).collect_vec()),
            distribution_key: self
                .distribution_key
                .iter()
                .map(|k| *k as i32)
                .collect_vec(),
            owner: self.owner.into(),
            properties: self.properties.clone(),
            sink_type: self.sink_type.to_proto() as i32,
            format_desc: self.format_desc.as_ref().map(|f| f.to_proto()),
            connection_id: self.connection_id,
            initialized_at_epoch: self.initialized_at_epoch.map(|e| e.0),
            created_at_epoch: self.created_at_epoch.map(|e| e.0),
            db_name: self.db_name.clone(),
            sink_from_name: self.sink_from_name.clone(),
            stream_job_status: self.stream_job_status.to_proto().into(),
            target_table: self.target_table,
            created_at_cluster_version: self.created_at_cluster_version.clone(),
            initialized_at_cluster_version: self.initialized_at_cluster_version.clone(),
            create_type: self.create_type.to_proto() as i32,
            secret_refs: self.secret_refs.clone(),
            original_target_columns: self
                .original_target_columns
                .iter()
                .map(|c| c.to_protobuf())
                .collect_vec(),
            auto_refresh_schema_from_table: self.auto_refresh_schema_from_table,
        }
    }

    pub fn create_sql(&self) -> String {
        self.definition.clone()
    }

    pub fn visible_columns(&self) -> impl Iterator<Item = &ColumnCatalog> {
        self.columns.iter().filter(|c| !c.is_hidden)
    }

    pub fn full_columns(&self) -> &[ColumnCatalog] {
        &self.columns
    }

    pub fn full_schema(&self) -> Schema {
        let fields = self
            .full_columns()
            .iter()
            .map(|column| Field::from(column.column_desc.clone()))
            .collect_vec();
        Schema { fields }
    }

    pub fn visible_schema(&self) -> Schema {
        let fields = self
            .visible_columns()
            .map(|column| Field::from(column.column_desc.clone()))
            .collect_vec();
        Schema { fields }
    }

    pub fn unique_identity(&self) -> String {
        self.to_proto().unique_identity()
    }

    pub fn is_created(&self) -> bool {
        self.stream_job_status == StreamJobStatus::Created
    }
}

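/// Builds the catalog entry from the protobuf form. When no `format_desc` is present, a
/// best-effort description is derived from the legacy `connector`/`type` properties.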
impl From<PbSink> for SinkCatalog {
    fn from(pb: PbSink) -> Self {
        let sink_type = pb.get_sink_type().unwrap();
        let create_type = pb.get_create_type().unwrap_or(PbCreateType::Foreground);
        let stream_job_status = pb
            .get_stream_job_status()
            .unwrap_or(PbStreamJobStatus::Created);
        let format_desc = match pb.format_desc {
            Some(f) => f.try_into().ok(),
            None => {
                let connector = pb.properties.get(CONNECTOR_TYPE_KEY);
                let r#type = pb.properties.get(SINK_TYPE_OPTION);
                match (connector, r#type) {
                    (Some(c), Some(t)) => SinkFormatDesc::from_legacy_type(c, t).ok().flatten(),
                    _ => None,
                }
            }
        };
        SinkCatalog {
            id: pb.id,
            name: pb.name,
            schema_id: pb.schema_id,
            database_id: pb.database_id,
            definition: pb.definition,
            columns: pb
                .columns
                .into_iter()
                .map(ColumnCatalog::from)
                .collect_vec(),
            plan_pk: pb
                .plan_pk
                .iter()
                .map(ColumnOrder::from_protobuf)
                .collect_vec(),
            downstream_pk: if pb.downstream_pk.is_empty() {
                None
            } else {
                Some(
                    (pb.downstream_pk.into_iter())
                        .map(|idx| idx as usize)
                        .collect_vec(),
                )
            },
            distribution_key: pb
                .distribution_key
                .into_iter()
                .map(|k| k as _)
                .collect_vec(),
            properties: pb.properties,
            owner: pb.owner.into(),
            sink_type: SinkType::from_proto(sink_type),
            format_desc,
            connection_id: pb.connection_id,
            created_at_epoch: pb.created_at_epoch.map(Epoch::from),
            initialized_at_epoch: pb.initialized_at_epoch.map(Epoch::from),
            db_name: pb.db_name,
            sink_from_name: pb.sink_from_name,
            auto_refresh_schema_from_table: pb.auto_refresh_schema_from_table,
            target_table: pb.target_table,
            initialized_at_cluster_version: pb.initialized_at_cluster_version,
            created_at_cluster_version: pb.created_at_cluster_version,
            create_type: CreateType::from_proto(create_type),
            stream_job_status: StreamJobStatus::from_proto(stream_job_status),
            secret_refs: pb.secret_refs,
            original_target_columns: pb
                .original_target_columns
                .into_iter()
                .map(ColumnCatalog::from)
                .collect_vec(),
        }
    }
}

impl From<&PbSink> for SinkCatalog {
    fn from(pb: &PbSink) -> Self {
        pb.clone().into()
    }
}