use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::sync::{Arc, LazyLock};

use either::Either;
use itertools::Itertools;
use maplit::{convert_args, hashmap, hashset};
use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::bail;
use risingwave_common::catalog::{
    ColumnCatalog, ConnectionId, DatabaseId, ObjectId, Schema, SchemaId, UserId,
};
use risingwave_common::secret::LocalSecretManager;
use risingwave_common::types::DataType;
use risingwave_connector::WithPropertiesExt;
use risingwave_connector::sink::catalog::{SinkCatalog, SinkFormatDesc};
use risingwave_connector::sink::iceberg::{ICEBERG_SINK, IcebergConfig};
use risingwave_connector::sink::kafka::KAFKA_SINK;
use risingwave_connector::sink::{
    CONNECTOR_TYPE_KEY, SINK_TYPE_OPTION, SINK_USER_FORCE_APPEND_ONLY_OPTION, SINK_WITHOUT_BACKFILL,
};
use risingwave_pb::catalog::connection_params::PbConnectionType;
use risingwave_pb::catalog::{PbSink, PbSource, Table};
use risingwave_pb::ddl_service::{ReplaceJobPlan, TableJobType, replace_job_plan};
use risingwave_pb::stream_plan::stream_node::{NodeBody, PbNodeBody};
use risingwave_pb::stream_plan::{MergeNode, StreamFragmentGraph, StreamNode};
use risingwave_pb::telemetry::TelemetryDatabaseObject;
use risingwave_sqlparser::ast::{
    CreateSink, CreateSinkStatement, EmitMode, Encode, ExplainOptions, Format, FormatEncodeOptions,
    Query, Statement,
};

use super::RwPgResponse;
use super::create_mv::get_column_names;
use super::create_source::{SqlColumnStrategy, UPSTREAM_SOURCE_KEY};
use super::util::gen_query_from_table_name;
use crate::binder::Binder;
use crate::catalog::SinkId;
use crate::error::{ErrorCode, Result, RwError};
use crate::expr::{ExprImpl, InputRef, rewrite_now_to_proctime};
use crate::handler::HandlerArgs;
use crate::handler::alter_table_column::fetch_table_catalog_for_alter;
use crate::handler::create_mv::parse_column_names;
use crate::handler::create_table::{ColumnIdGenerator, generate_stream_graph_for_replace_table};
use crate::handler::util::{check_connector_match_connection_type, ensure_connection_type_allowed};
use crate::optimizer::plan_node::{LogicalSource, PartitionComputeInfo, StreamProject, generic};
use crate::optimizer::{OptimizerContext, PlanRef, RelationCollectorVisitor};
use crate::scheduler::streaming_manager::CreatingStreamingJobInfo;
use crate::session::SessionImpl;
use crate::session::current::notice_to_user;
use crate::stream_fragmenter::build_graph;
use crate::utils::{resolve_connection_ref_and_secret_ref, resolve_privatelink_in_with_option};
use crate::{Explain, Planner, TableCatalog, WithOptions, WithOptionsSecResolved};

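/// Connection types that a sink's `connection = ...` reference may resolve to.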
static SINK_ALLOWED_CONNECTION_CONNECTOR: LazyLock<HashSet<PbConnectionType>> =
    LazyLock::new(|| {
        hashset! {
            PbConnectionType::Unspecified,
            PbConnectionType::Kafka,
            PbConnectionType::Iceberg,
            PbConnectionType::Elasticsearch,
        }
    });

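/// Connection types allowed for the schema registry reference in a sink's
/// `FORMAT ... ENCODE ...` options.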
static SINK_ALLOWED_CONNECTION_SCHEMA_REGISTRY: LazyLock<HashSet<PbConnectionType>> =
    LazyLock::new(|| {
        hashset! {
            PbConnectionType::Unspecified,
            PbConnectionType::SchemaRegistry,
        }
    });

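/// Everything derived from planning a `CREATE SINK` statement, consumed either by the
/// handler that actually creates the sink or by `EXPLAIN`.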
pub struct SinkPlanContext {
    pub query: Box<Query>,
    pub sink_plan: PlanRef,
    pub sink_catalog: SinkCatalog,
    pub target_table_catalog: Option<Arc<TableCatalog>>,
    pub dependencies: HashSet<ObjectId>,
}

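/// Binds and plans a `CREATE SINK` statement, producing the stream plan, the sink catalog,
/// the target table catalog for `CREATE SINK ... INTO ...` (if any), and the set of objects
/// the sink depends on. Passing `explain_options` switches the optimizer context into
/// `EXPLAIN` mode instead of planning for actual creation.
///
/// For illustration, statements of the following shapes end up here (the connector options
/// shown are examples only):
///
/// ```sql
/// CREATE SINK s1 FROM mv1 WITH (connector = 'kafka', topic = 't1') FORMAT PLAIN ENCODE JSON;
/// CREATE SINK s2 INTO t1 AS SELECT v1, v2 FROM mv1;
/// ```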
pub async fn gen_sink_plan(
    handler_args: HandlerArgs,
    stmt: CreateSinkStatement,
    explain_options: Option<ExplainOptions>,
) -> Result<SinkPlanContext> {
    let session = handler_args.session.clone();
    let session = session.as_ref();
    let user_specified_columns = !stmt.columns.is_empty();
    let db_name = &session.database();
    let (sink_schema_name, sink_table_name) =
        Binder::resolve_schema_qualified_name(db_name, stmt.sink_name.clone())?;

    let mut with_options = handler_args.with_options.clone();

    resolve_privatelink_in_with_option(&mut with_options)?;
    let (mut resolved_with_options, connection_type, connector_conn_ref) =
        resolve_connection_ref_and_secret_ref(
            with_options,
            session,
            TelemetryDatabaseObject::Sink,
        )?;
    ensure_connection_type_allowed(connection_type, &SINK_ALLOWED_CONNECTION_CONNECTOR)?;

    // A connection reference implies the connector option is present; enforce that the
    // connector matches the connection type.
    if !matches!(connection_type, PbConnectionType::Unspecified) {
        let connector = resolved_with_options.get_connector().unwrap();
        check_connector_match_connection_type(connector.as_str(), &connection_type)?;
    }

    let partition_info = get_partition_compute_info(&resolved_with_options).await?;

    let context = if let Some(explain_options) = explain_options {
        OptimizerContext::new(handler_args.clone(), explain_options)
    } else {
        OptimizerContext::from_handler_args(handler_args.clone())
    };

    let sink_from_table_name;
    let direct_sink;
    let query = match stmt.sink_from {
        CreateSink::From(from_name) => {
            sink_from_table_name = from_name.0.last().unwrap().real_value();
            direct_sink = true;
            Box::new(gen_query_from_table_name(from_name))
        }
        CreateSink::AsQuery(query) => {
            sink_from_table_name = sink_table_name.clone();
            direct_sink = false;
            query
        }
    };

    let sink_into_table_name = stmt.into_table_name.as_ref().map(|name| name.real_value());

    let (sink_database_id, sink_schema_id) =
        session.get_database_and_schema_id_for_create(sink_schema_name.clone())?;

    let (dependent_relations, dependent_udfs, bound) = {
        let mut binder = Binder::new_for_stream(session);
        let bound = binder.bind_query(*query.clone())?;
        (
            binder.included_relations().clone(),
            binder.included_udfs().clone(),
            bound,
        )
    };

    let col_names = if sink_into_table_name.is_some() {
        parse_column_names(&stmt.columns)
    } else {
        get_column_names(&bound, stmt.columns)?
    };

    if sink_into_table_name.is_some() {
        // Sinking into a table is implemented via an internal "table" connector, so a
        // user-provided connector would conflict with it.
        let prev = resolved_with_options.insert(CONNECTOR_TYPE_KEY.to_owned(), "table".to_owned());

        if prev.is_some() {
            return Err(RwError::from(ErrorCode::BindError(
                "When sinking into a table, the 'connector' parameter must not be provided."
                    .to_owned(),
            )));
        }
    }

    let emit_on_window_close = stmt.emit_mode == Some(EmitMode::OnWindowClose);
    if emit_on_window_close {
        context.warn_to_user("EMIT ON WINDOW CLOSE is currently an experimental feature. Please use it with caution.");
    }

    let connector = resolved_with_options
        .get(CONNECTOR_TYPE_KEY)
        .cloned()
        .ok_or_else(|| ErrorCode::BindError(format!("missing field '{CONNECTOR_TYPE_KEY}'")))?;

    let format_desc = match stmt.sink_schema {
        // Case A: `FORMAT ... ENCODE ...` is specified explicitly.
        Some(f) => {
            validate_compatibility(&connector, &f)?;
            Some(bind_sink_format_desc(session, f)?)
        }
        // Case B: fall back to the legacy `type = '...'` option.
        None => match resolved_with_options.get(SINK_TYPE_OPTION) {
            Some(t) => SinkFormatDesc::from_legacy_type(&connector, t)?.map(|mut f| {
                session.notice_to_user("Consider using the newer syntax `FORMAT ... ENCODE ...` instead of `type = '...'`.");
                if let Some(v) = resolved_with_options.get(SINK_USER_FORCE_APPEND_ONLY_OPTION) {
                    f.options.insert(SINK_USER_FORCE_APPEND_ONLY_OPTION.into(), v.into());
                }
                f
            }),
            None => None,
        },
    };

    let definition = context.normalized_sql().to_owned();
    let mut plan_root = Planner::new_for_stream(context.into()).plan_query(bound)?;
    if let Some(col_names) = &col_names {
        plan_root.set_out_names(col_names.clone())?;
    }

    let without_backfill = match resolved_with_options.remove(SINK_WITHOUT_BACKFILL) {
        Some(flag) if flag.eq_ignore_ascii_case("false") => {
            if direct_sink {
                true
            } else {
                return Err(ErrorCode::BindError(
                    "`snapshot = false` is only supported for `CREATE SINK FROM MV or TABLE`"
                        .to_owned(),
                )
                .into());
            }
        }
        _ => false,
    };

    let target_table_catalog = stmt
        .into_table_name
        .as_ref()
        .map(|table_name| fetch_table_catalog_for_alter(session, table_name))
        .transpose()?;

    if let Some(target_table_catalog) = &target_table_catalog {
        if let Some(col_names) = col_names {
            let target_table_columns = target_table_catalog
                .columns()
                .iter()
                .map(|c| c.name())
                .collect::<BTreeSet<_>>();
            for c in col_names {
                if !target_table_columns.contains(c.as_str()) {
                    return Err(RwError::from(ErrorCode::BindError(format!(
                        "Column {} not found in table {}",
                        c,
                        target_table_catalog.name()
                    ))));
                }
            }
        }
        if target_table_catalog
            .columns()
            .iter()
            .any(|col| !col.nullable())
        {
            notice_to_user(format!(
                "The target table `{}` contains columns with NOT NULL constraints. Any rows that violate these constraints will be silently ignored.",
                target_table_catalog.name(),
            ));
        }
    }

    let sink_plan = plan_root.gen_sink_plan(
        sink_table_name,
        definition,
        resolved_with_options,
        emit_on_window_close,
        db_name.to_owned(),
        sink_from_table_name,
        format_desc,
        without_backfill,
        target_table_catalog.clone(),
        partition_info,
        user_specified_columns,
    )?;

    let sink_desc = sink_plan.sink_desc().clone();

    let mut sink_plan: PlanRef = sink_plan.into();

    let ctx = sink_plan.ctx();
    let explain_trace = ctx.is_explain_trace();
    if explain_trace {
        ctx.trace("Create Sink:");
        ctx.trace(sink_plan.explain_to_string());
    }

    let dependencies =
        RelationCollectorVisitor::collect_with(dependent_relations, sink_plan.clone())
            .into_iter()
            .map(|id| id.table_id() as ObjectId)
            .chain(
                dependent_udfs
                    .into_iter()
                    .map(|id| id.function_id() as ObjectId),
            )
            .collect();

    let sink_catalog = sink_desc.into_catalog(
        SchemaId::new(sink_schema_id),
        DatabaseId::new(sink_database_id),
        UserId::new(session.user_id()),
        connector_conn_ref.map(ConnectionId::from),
    );

    if let Some(table_catalog) = &target_table_catalog {
        for column in sink_catalog.full_columns() {
            if !column.can_dml() {
                unreachable!(
                    "generated columns and the system column `_rw_timestamp` should never appear in a sink's catalog, but one was found"
                );
            }
        }

        let table_columns_without_rw_timestamp = table_catalog.columns_without_rw_timestamp();
        let exprs = derive_default_column_project_for_sink(
            &sink_catalog,
            sink_plan.schema(),
            &table_columns_without_rw_timestamp,
            user_specified_columns,
        )?;

        let logical_project = generic::Project::new(exprs, sink_plan);

        sink_plan = StreamProject::new(logical_project).into();

        let exprs = LogicalSource::derive_output_exprs_from_generated_columns(
            &table_columns_without_rw_timestamp,
        )?;

        if let Some(exprs) = exprs {
            let logical_project = generic::Project::new(exprs, sink_plan);
            sink_plan = StreamProject::new(logical_project).into();
        }
    }

    Ok(SinkPlanContext {
        query,
        sink_plan,
        sink_catalog,
        target_table_catalog,
        dependencies,
    })
}

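/// Inspects the resolved `WITH` options and derives partition-compute information for
/// connectors that want rows pre-partitioned before the sink. Currently only the Iceberg
/// connector is probed; all other connectors yield `None`.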
pub async fn get_partition_compute_info(
    with_options: &WithOptionsSecResolved,
) -> Result<Option<PartitionComputeInfo>> {
    let (options, secret_refs) = with_options.clone().into_parts();
    let Some(connector) = options.get(UPSTREAM_SOURCE_KEY).cloned() else {
        return Ok(None);
    };
    let properties = LocalSecretManager::global().fill_secrets(options, secret_refs)?;
    match connector.as_str() {
        ICEBERG_SINK => {
            let iceberg_config = IcebergConfig::from_btreemap(properties)?;
            get_partition_compute_info_for_iceberg(&iceberg_config).await
        }
        _ => Ok(None),
    }
}

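/// Placeholder for deriving partition info from Iceberg table metadata; it currently performs
/// no lookup and always returns `None`.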
#[allow(clippy::unused_async)]
async fn get_partition_compute_info_for_iceberg(
    _iceberg_config: &IcebergConfig,
) -> Result<Option<PartitionComputeInfo>> {
    Ok(None)
}

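/// Handles `CREATE SINK`: plans the sink via [`gen_sink_plan`], builds the stream fragment
/// graph, and, when sinking into a table, prepares a [`ReplaceJobPlan`] that re-plans the
/// target table and re-attaches the mergers of all of its incoming sinks. The results are
/// then submitted to the meta service through the catalog writer.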
pub async fn handle_create_sink(
    handle_args: HandlerArgs,
    stmt: CreateSinkStatement,
) -> Result<RwPgResponse> {
    let session = handle_args.session.clone();

    session.check_cluster_limits().await?;

    if let Either::Right(resp) = session.check_relation_name_duplicated(
        stmt.sink_name.clone(),
        StatementType::CREATE_SINK,
        stmt.if_not_exists,
    )? {
        return Ok(resp);
    }

    let (mut sink, graph, target_table_catalog, dependencies) = {
        let SinkPlanContext {
            query,
            sink_plan: plan,
            sink_catalog: sink,
            target_table_catalog,
            dependencies,
        } = gen_sink_plan(handle_args, stmt, None).await?;

        let has_order_by = !query.order_by.is_empty();
        if has_order_by {
            plan.ctx().warn_to_user(
                r#"The ORDER BY clause in the CREATE SINK statement has no effect at all."#
                    .to_owned(),
            );
        }

        let graph = build_graph(plan)?;

        (sink, graph, target_table_catalog, dependencies)
    };

    let mut target_table_replace_plan = None;
    if let Some(table_catalog) = target_table_catalog {
        use crate::handler::alter_table_column::hijack_merger_for_target_table;

        let (mut graph, mut table, source) =
            reparse_table_for_sink(&session, &table_catalog).await?;

        sink.original_target_columns = table
            .columns
            .iter()
            .map(|col| ColumnCatalog::from(col.clone()))
            .collect_vec();

        table
            .incoming_sinks
            .clone_from(&table_catalog.incoming_sinks);

        // Re-attach the merger fragments of all sinks already writing into the table, then
        // add one for the sink being created.
        let incoming_sink_ids: HashSet<_> = table_catalog.incoming_sinks.iter().copied().collect();
        let incoming_sinks = fetch_incoming_sinks(&session, &incoming_sink_ids)?;

        let columns_without_rw_timestamp = table_catalog.columns_without_rw_timestamp();
        for existing_sink in incoming_sinks {
            hijack_merger_for_target_table(
                &mut graph,
                &columns_without_rw_timestamp,
                &existing_sink,
                Some(&existing_sink.unique_identity()),
            )?;
        }

        hijack_merger_for_target_table(&mut graph, &columns_without_rw_timestamp, &sink, None)?;

        target_table_replace_plan = Some(ReplaceJobPlan {
            replace_job: Some(replace_job_plan::ReplaceJob::ReplaceTable(
                replace_job_plan::ReplaceTable {
                    table: Some(table),
                    source,
                    job_type: TableJobType::General as _,
                },
            )),
            fragment_graph: Some(graph),
            table_col_index_mapping: None,
        });
    }

    let _job_guard =
        session
            .env()
            .creating_streaming_job_tracker()
            .guard(CreatingStreamingJobInfo::new(
                session.session_id(),
                sink.database_id.database_id,
                sink.schema_id.schema_id,
                sink.name.clone(),
            ));

    let catalog_writer = session.catalog_writer()?;
    catalog_writer
        .create_sink(
            sink.to_proto(),
            graph,
            target_table_replace_plan,
            dependencies,
        )
        .await?;

    Ok(PgResponse::empty_result(StatementType::CREATE_SINK))
}

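/// Collects the catalogs of all sinks whose ids appear in `incoming_sink_ids`, scanning every
/// schema of the current database.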
pub fn fetch_incoming_sinks(
    session: &Arc<SessionImpl>,
    incoming_sink_ids: &HashSet<SinkId>,
) -> Result<Vec<Arc<SinkCatalog>>> {
    let reader = session.env().catalog_reader().read_guard();
    let mut sinks = Vec::with_capacity(incoming_sink_ids.len());
    let db_name = &session.database();
    for schema in reader.iter_schemas(db_name)? {
        for sink in schema.iter_sink() {
            if incoming_sink_ids.contains(&sink.id.sink_id) {
                sinks.push(sink.clone());
            }
        }
    }

    Ok(sinks)
}

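/// Re-plans the target table of a `CREATE SINK ... INTO ...` from its purified
/// `CREATE TABLE` definition, returning the new fragment graph together with the table and
/// its optional associated source. Used to build the replace-table job when a sink is added.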
pub(crate) async fn reparse_table_for_sink(
    session: &Arc<SessionImpl>,
    table_catalog: &Arc<TableCatalog>,
) -> Result<(StreamFragmentGraph, Table, Option<PbSource>)> {
    let definition = table_catalog.create_sql_ast_purified()?;
    let Statement::CreateTable { name, .. } = &definition else {
        panic!("unexpected statement: {:?}", definition);
    };
    let table_name = name.clone();

    let handler_args = HandlerArgs::new(session.clone(), &definition, Arc::from(""))?;
    let col_id_gen = ColumnIdGenerator::new_alter(table_catalog);

    let (graph, table, source, _) = generate_stream_graph_for_replace_table(
        session,
        table_name,
        table_catalog,
        handler_args,
        definition,
        col_id_gen,
        SqlColumnStrategy::FollowUnchecked,
    )
    .await?;

    Ok((graph, table, source))
}

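/// Recursively walks `node` until the `Union` operator of the target table is found, then
/// appends a new input chain (the given `Project` on top of a fresh `Merge`) that will
/// receive the rows of an incoming sink.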
pub(crate) fn insert_merger_to_union_with_project(
    node: &mut StreamNode,
    project_node: &PbNodeBody,
    uniq_identity: Option<&str>,
) {
    if let Some(NodeBody::Union(_union_node)) = &mut node.node_body {
        node.input.push(StreamNode {
            input: vec![StreamNode {
                node_body: Some(NodeBody::Merge(Box::new(MergeNode {
                    ..Default::default()
                }))),
                ..Default::default()
            }],
            identity: uniq_identity
                .unwrap_or(PbSink::UNIQUE_IDENTITY_FOR_CREATING_TABLE_SINK)
                .to_owned(),
            fields: node.fields.clone(),
            node_body: Some(project_node.clone()),
            ..Default::default()
        });

        return;
    }

    for input in &mut node.input {
        insert_merger_to_union_with_project(input, project_node, uniq_identity);
    }
}

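/// Builds the `InputRef` mapping sink column `idx` to a target table column, bailing out if
/// the two column types do not match exactly.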
fn derive_sink_to_table_expr(
    sink_schema: &Schema,
    idx: usize,
    target_type: &DataType,
) -> Result<ExprImpl> {
    let input_type = &sink_schema.fields()[idx].data_type;

    if target_type != input_type {
        bail!(
            "column type mismatch: {:?} vs {:?}, column name: {:?}",
            target_type,
            input_type,
            sink_schema.fields()[idx].name
        );
    } else {
        Ok(ExprImpl::InputRef(Box::new(InputRef::new(
            idx,
            input_type.clone(),
        ))))
    }
}

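/// Derives the projection that adapts the sink's output schema to the target table's
/// columns: sink columns are matched by name (when the user listed columns explicitly) or by
/// position, and any remaining table columns are filled with their default expressions, with
/// `NOW()` rewritten to `PROCTIME()`.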
pub(crate) fn derive_default_column_project_for_sink(
    sink: &SinkCatalog,
    sink_schema: &Schema,
    columns: &[ColumnCatalog],
    user_specified_columns: bool,
) -> Result<Vec<ExprImpl>> {
    assert_eq!(sink.full_schema().len(), sink_schema.len());

    let default_column_exprs = TableCatalog::default_column_exprs(columns);

    let mut exprs = vec![];

    let sink_visible_col_idxes = sink
        .full_columns()
        .iter()
        .positions(|c| !c.is_hidden())
        .collect_vec();
    let sink_visible_col_idxes_by_name = sink
        .full_columns()
        .iter()
        .enumerate()
        .filter(|(_, c)| !c.is_hidden())
        .map(|(i, c)| (c.name(), i))
        .collect::<BTreeMap<_, _>>();

    for (idx, column) in columns.iter().enumerate() {
        if !column.can_dml() {
            continue;
        }

        let default_col_expr =
            || -> ExprImpl { rewrite_now_to_proctime(default_column_exprs[idx].clone()) };

        let sink_col_expr = |sink_col_idx: usize| -> Result<ExprImpl> {
            derive_sink_to_table_expr(sink_schema, sink_col_idx, column.data_type())
        };

        // If the user specified the target columns, e.g. `CREATE SINK s INTO t(a, b)`, match
        // sink columns to table columns by name and fill the rest with default expressions.
        // Otherwise, e.g. `CREATE SINK s INTO t`, match them by position.
        #[allow(clippy::collapsible_else_if)]
        if user_specified_columns {
            if let Some(idx) = sink_visible_col_idxes_by_name.get(column.name()) {
                exprs.push(sink_col_expr(*idx)?);
            } else {
                exprs.push(default_col_expr());
            }
        } else {
            if idx < sink_visible_col_idxes.len() {
                exprs.push(sink_col_expr(sink_visible_col_idxes[idx])?);
            } else {
                exprs.push(default_col_expr());
            };
        }
    }
    Ok(exprs)
}

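/// Binds the `FORMAT ... ENCODE ... (...)` clause of `CREATE SINK` into a [`SinkFormatDesc`],
/// rejecting formats and encodes that sinks do not support, resolving connection and secret
/// references in the options, and defaulting the timestamptz handling mode if unset.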
fn bind_sink_format_desc(
    session: &SessionImpl,
    value: FormatEncodeOptions,
) -> Result<SinkFormatDesc> {
    use risingwave_connector::sink::catalog::{SinkEncode, SinkFormat};
    use risingwave_connector::sink::encoder::TimestamptzHandlingMode;
    use risingwave_sqlparser::ast::{Encode as E, Format as F};

    let format = match value.format {
        F::Plain => SinkFormat::AppendOnly,
        F::Upsert => SinkFormat::Upsert,
        F::Debezium => SinkFormat::Debezium,
        f @ (F::Native | F::DebeziumMongo | F::Maxwell | F::Canal | F::None) => {
            return Err(ErrorCode::BindError(format!("sink format unsupported: {f}")).into());
        }
    };
    let encode = match value.row_encode {
        E::Json => SinkEncode::Json,
        E::Protobuf => SinkEncode::Protobuf,
        E::Avro => SinkEncode::Avro,
        E::Template => SinkEncode::Template,
        E::Parquet => SinkEncode::Parquet,
        e @ (E::Native | E::Csv | E::Bytes | E::None | E::Text) => {
            return Err(ErrorCode::BindError(format!("sink encode unsupported: {e}")).into());
        }
    };

    let mut key_encode = None;
    if let Some(encode) = value.key_encode {
        match encode {
            E::Text => key_encode = Some(SinkEncode::Text),
            E::Bytes => key_encode = Some(SinkEncode::Bytes),
            _ => {
                return Err(ErrorCode::BindError(format!(
                    "sink key encode unsupported: {encode}, only TEXT and BYTES supported"
                ))
                .into());
            }
        }
    }

    let (props, connection_type_flag, schema_registry_conn_ref) =
        resolve_connection_ref_and_secret_ref(
            WithOptions::try_from(value.row_options.as_slice())?,
            session,
            TelemetryDatabaseObject::Sink,
        )?;
    ensure_connection_type_allowed(
        connection_type_flag,
        &SINK_ALLOWED_CONNECTION_SCHEMA_REGISTRY,
    )?;
    let (mut options, secret_refs) = props.into_parts();

    options
        .entry(TimestamptzHandlingMode::OPTION_KEY.to_owned())
        .or_insert(TimestamptzHandlingMode::FRONTEND_DEFAULT.to_owned());

    Ok(SinkFormatDesc {
        format,
        encode,
        options,
        secret_refs,
        key_encode,
        connection_id: schema_registry_conn_ref,
    })
}

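/// The `FORMAT ... ENCODE ...` combinations accepted by each connector, consulted by
/// [`validate_compatibility`].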
static CONNECTORS_COMPATIBLE_FORMATS: LazyLock<HashMap<String, HashMap<Format, Vec<Encode>>>> =
    LazyLock::new(|| {
        use risingwave_connector::sink::Sink as _;
        use risingwave_connector::sink::file_sink::azblob::AzblobSink;
        use risingwave_connector::sink::file_sink::fs::FsSink;
        use risingwave_connector::sink::file_sink::gcs::GcsSink;
        use risingwave_connector::sink::file_sink::opendal_sink::FileSink;
        use risingwave_connector::sink::file_sink::s3::{S3Sink, SnowflakeSink};
        use risingwave_connector::sink::file_sink::webhdfs::WebhdfsSink;
        use risingwave_connector::sink::google_pubsub::GooglePubSubSink;
        use risingwave_connector::sink::kafka::KafkaSink;
        use risingwave_connector::sink::kinesis::KinesisSink;
        use risingwave_connector::sink::mqtt::MqttSink;
        use risingwave_connector::sink::pulsar::PulsarSink;
        use risingwave_connector::sink::redis::RedisSink;

        convert_args!(hashmap!(
            GooglePubSubSink::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Json],
            ),
            KafkaSink::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Json, Encode::Avro, Encode::Protobuf],
                Format::Upsert => vec![Encode::Json, Encode::Avro, Encode::Protobuf],
                Format::Debezium => vec![Encode::Json],
            ),
            FileSink::<S3Sink>::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Parquet, Encode::Json],
            ),
            FileSink::<SnowflakeSink>::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Parquet, Encode::Json],
            ),
            FileSink::<GcsSink>::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Parquet, Encode::Json],
            ),
            FileSink::<AzblobSink>::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Parquet, Encode::Json],
            ),
            FileSink::<WebhdfsSink>::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Parquet, Encode::Json],
            ),
            FileSink::<FsSink>::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Parquet, Encode::Json],
            ),
            KinesisSink::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Json],
                Format::Upsert => vec![Encode::Json],
                Format::Debezium => vec![Encode::Json],
            ),
            MqttSink::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Json, Encode::Protobuf],
            ),
            PulsarSink::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Json],
                Format::Upsert => vec![Encode::Json],
                Format::Debezium => vec![Encode::Json],
            ),
            RedisSink::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Json, Encode::Template],
                Format::Upsert => vec![Encode::Json, Encode::Template],
            ),
        ))
    });

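/// Validates that `connector` supports the requested `FORMAT ... ENCODE ...` combination
/// according to [`CONNECTORS_COMPATIBLE_FORMATS`], and that key `ENCODE BYTES` is only used
/// with the Kafka connector.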
pub fn validate_compatibility(connector: &str, format_desc: &FormatEncodeOptions) -> Result<()> {
    let compatible_formats = CONNECTORS_COMPATIBLE_FORMATS
        .get(connector)
        .ok_or_else(|| {
            ErrorCode::BindError(format!(
                "connector {} is not supported by FORMAT ... ENCODE ... syntax",
                connector
            ))
        })?;
    let compatible_encodes = compatible_formats.get(&format_desc.format).ok_or_else(|| {
        ErrorCode::BindError(format!(
            "connector {} does not support format {:?}",
            connector, format_desc.format
        ))
    })?;
    if !compatible_encodes.contains(&format_desc.row_encode) {
        return Err(ErrorCode::BindError(format!(
            "connector {} does not support format {:?} with encode {:?}",
            connector, format_desc.format, format_desc.row_encode
        ))
        .into());
    }

    if let Some(encode) = &format_desc.key_encode
        && connector != KAFKA_SINK
        && matches!(encode, Encode::Bytes)
    {
        return Err(ErrorCode::BindError(format!(
            "key encode bytes only works with kafka connector, but found {}",
            connector
        ))
        .into());
    }

    Ok(())
}

#[cfg(test)]
pub mod tests {
    use risingwave_common::catalog::{DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME};

    use crate::catalog::root_catalog::SchemaPath;
    use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};

    #[tokio::test]
    async fn test_create_sink_handler() {
        let proto_file = create_proto_file(PROTO_FILE_DATA);
        let sql = format!(
            r#"CREATE SOURCE t1
    WITH (connector = 'kafka', kafka.topic = 'abc', kafka.brokers = 'localhost:1001')
    FORMAT PLAIN ENCODE PROTOBUF (message = '.test.TestRecord', schema.location = 'file://{}')"#,
            proto_file.path().to_str().unwrap()
        );
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(sql).await.unwrap();

        let sql = "create materialized view mv1 as select t1.country from t1;";
        frontend.run_sql(sql).await.unwrap();

        let sql = r#"CREATE SINK snk1 FROM mv1
            WITH (connector = 'jdbc', mysql.endpoint = '127.0.0.1:3306', mysql.table =
            '<table_name>', mysql.database = '<database_name>', mysql.user = '<user_name>',
            mysql.password = '<password>', type = 'append-only', force_append_only = 'true');"#
            .to_owned();
        frontend.run_sql(sql).await.unwrap();

        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        // Check the source catalog.
        let (source, _) = catalog_reader
            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t1")
            .unwrap();
        assert_eq!(source.name, "t1");

        // Check the materialized view catalog.
        let (table, schema_name) = catalog_reader
            .get_created_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "mv1")
            .unwrap();
        assert_eq!(table.name(), "mv1");

        // Check the sink catalog.
        let (sink, _) = catalog_reader
            .get_sink_by_name(DEFAULT_DATABASE_NAME, SchemaPath::Name(schema_name), "snk1")
            .unwrap();
        assert_eq!(sink.name, "snk1");
    }
}
869}