use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::sync::{Arc, LazyLock};

use either::Either;
use iceberg::arrow::type_to_arrow_type;
use iceberg::spec::Transform;
use itertools::Itertools;
use maplit::{convert_args, hashmap, hashset};
use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::array::arrow::IcebergArrowConvert;
use risingwave_common::array::arrow::arrow_schema_iceberg::DataType as ArrowDataType;
use risingwave_common::bail;
use risingwave_common::catalog::{
    ColumnCatalog, ConnectionId, DatabaseId, ObjectId, Schema, SchemaId, UserId,
};
use risingwave_common::license::Feature;
use risingwave_common::secret::LocalSecretManager;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::types::DataType;
use risingwave_connector::WithPropertiesExt;
use risingwave_connector::sink::catalog::{SinkCatalog, SinkFormatDesc};
use risingwave_connector::sink::iceberg::{ICEBERG_SINK, IcebergConfig};
use risingwave_connector::sink::kafka::KAFKA_SINK;
use risingwave_connector::sink::{
    CONNECTOR_TYPE_KEY, SINK_SNAPSHOT_OPTION, SINK_TYPE_OPTION, SINK_USER_FORCE_APPEND_ONLY_OPTION,
    enforce_secret_sink,
};
use risingwave_pb::catalog::PbSink;
use risingwave_pb::catalog::connection_params::PbConnectionType;
use risingwave_pb::ddl_service::{ReplaceJobPlan, TableJobType, replace_job_plan};
use risingwave_pb::stream_plan::stream_node::{NodeBody, PbNodeBody};
use risingwave_pb::stream_plan::{MergeNode, StreamFragmentGraph, StreamNode};
use risingwave_pb::telemetry::TelemetryDatabaseObject;
use risingwave_sqlparser::ast::{
    CreateSink, CreateSinkStatement, EmitMode, Encode, ExplainOptions, Format, FormatEncodeOptions,
    Query, Statement,
};

use super::RwPgResponse;
use super::create_mv::get_column_names;
use super::create_source::{SqlColumnStrategy, UPSTREAM_SOURCE_KEY};
use super::util::gen_query_from_table_name;
use crate::binder::Binder;
use crate::catalog::SinkId;
use crate::catalog::source_catalog::SourceCatalog;
use crate::error::{ErrorCode, Result, RwError};
use crate::expr::{ExprImpl, InputRef, rewrite_now_to_proctime};
use crate::handler::HandlerArgs;
use crate::handler::alter_table_column::fetch_table_catalog_for_alter;
use crate::handler::create_mv::parse_column_names;
use crate::handler::create_table::{ColumnIdGenerator, generate_stream_graph_for_replace_table};
use crate::handler::util::{check_connector_match_connection_type, ensure_connection_type_allowed};
use crate::optimizer::plan_node::{
    IcebergPartitionInfo, LogicalSource, PartitionComputeInfo, StreamProject, generic,
};
use crate::optimizer::{OptimizerContext, PlanRef, RelationCollectorVisitor};
use crate::scheduler::streaming_manager::CreatingStreamingJobInfo;
use crate::session::SessionImpl;
use crate::session::current::notice_to_user;
use crate::stream_fragmenter::{GraphJobType, build_graph};
use crate::utils::{resolve_connection_ref_and_secret_ref, resolve_privatelink_in_with_option};
use crate::{Explain, Planner, TableCatalog, WithOptions, WithOptionsSecResolved};

static SINK_ALLOWED_CONNECTION_CONNECTOR: LazyLock<HashSet<PbConnectionType>> =
    LazyLock::new(|| {
        hashset! {
            PbConnectionType::Unspecified,
            PbConnectionType::Kafka,
            PbConnectionType::Iceberg,
            PbConnectionType::Elasticsearch,
        }
    });

static SINK_ALLOWED_CONNECTION_SCHEMA_REGISTRY: LazyLock<HashSet<PbConnectionType>> =
    LazyLock::new(|| {
        hashset! {
            PbConnectionType::Unspecified,
            PbConnectionType::SchemaRegistry,
        }
    });

pub struct SinkPlanContext {
    pub query: Box<Query>,
    pub sink_plan: PlanRef,
    pub sink_catalog: SinkCatalog,
    pub target_table_catalog: Option<Arc<TableCatalog>>,
    pub dependencies: HashSet<ObjectId>,
}

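/// Plans a `CREATE SINK` statement: resolves connections and secrets in the WITH clause,
/// binds the source query (or the `FROM` relation), derives the sink format, and builds the
/// stream plan together with the sink catalog and its object dependencies.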
pub async fn gen_sink_plan(
    handler_args: HandlerArgs,
    stmt: CreateSinkStatement,
    explain_options: Option<ExplainOptions>,
    is_iceberg_engine_internal: bool,
) -> Result<SinkPlanContext> {
    let session = handler_args.session.clone();
    let session = session.as_ref();
    let user_specified_columns = !stmt.columns.is_empty();
    let db_name = &session.database();
    let (sink_schema_name, sink_table_name) =
        Binder::resolve_schema_qualified_name(db_name, stmt.sink_name.clone())?;

    let mut with_options = handler_args.with_options.clone();

    if session
        .env()
        .system_params_manager()
        .get_params()
        .load()
        .enforce_secret()
        && Feature::SecretManagement.check_available().is_ok()
    {
        enforce_secret_sink(&with_options)?;
    }

    resolve_privatelink_in_with_option(&mut with_options)?;
    let (mut resolved_with_options, connection_type, connector_conn_ref) =
        resolve_connection_ref_and_secret_ref(
            with_options,
            session,
            Some(TelemetryDatabaseObject::Sink),
        )?;
    ensure_connection_type_allowed(connection_type, &SINK_ALLOWED_CONNECTION_CONNECTOR)?;

    if !matches!(connection_type, PbConnectionType::Unspecified) {
        let Some(connector) = resolved_with_options.get_connector() else {
            return Err(RwError::from(ErrorCode::ProtocolError(format!(
                "missing field '{}' in WITH clause",
                CONNECTOR_TYPE_KEY
            ))));
        };
        check_connector_match_connection_type(connector.as_str(), &connection_type)?;
    }

    let partition_info = get_partition_compute_info(&resolved_with_options).await?;

    let context = if let Some(explain_options) = explain_options {
        OptimizerContext::new(handler_args.clone(), explain_options)
    } else {
        OptimizerContext::from_handler_args(handler_args.clone())
    };

    let sink_from_table_name;
    let direct_sink;
    let query = match stmt.sink_from {
        CreateSink::From(from_name) => {
            sink_from_table_name = from_name.0.last().unwrap().real_value();
            direct_sink = true;
            Box::new(gen_query_from_table_name(from_name))
        }
        CreateSink::AsQuery(query) => {
            sink_from_table_name = sink_table_name.clone();
            direct_sink = false;
            query
        }
    };

    let sink_into_table_name = stmt.into_table_name.as_ref().map(|name| name.real_value());

    let (sink_database_id, sink_schema_id) =
        session.get_database_and_schema_id_for_create(sink_schema_name.clone())?;

    let (dependent_relations, dependent_udfs, bound) = {
        let mut binder = Binder::new_for_stream(session);
        let bound = binder.bind_query(*query.clone())?;
        (
            binder.included_relations().clone(),
            binder.included_udfs().clone(),
            bound,
        )
    };

    let col_names = if sink_into_table_name.is_some() {
        parse_column_names(&stmt.columns)
    } else {
        get_column_names(&bound, stmt.columns)?
    };

    if sink_into_table_name.is_some() {
        let prev = resolved_with_options.insert(CONNECTOR_TYPE_KEY.to_owned(), "table".to_owned());

        if prev.is_some() {
            return Err(RwError::from(ErrorCode::BindError(
                "In the case of sinking into table, the 'connector' parameter should not be provided.".to_owned(),
            )));
        }
    }

    let emit_on_window_close = stmt.emit_mode == Some(EmitMode::OnWindowClose);
    if emit_on_window_close {
        context.warn_to_user("EMIT ON WINDOW CLOSE is currently an experimental feature. Please use it with caution.");
    }

    let connector = resolved_with_options
        .get(CONNECTOR_TYPE_KEY)
        .cloned()
        .ok_or_else(|| ErrorCode::BindError(format!("missing field '{CONNECTOR_TYPE_KEY}'")))?;

    let format_desc = match stmt.sink_schema {
        Some(f) => {
            validate_compatibility(&connector, &f)?;
            Some(bind_sink_format_desc(session, f)?)
        }
        None => match resolved_with_options.get(SINK_TYPE_OPTION) {
            Some(t) => SinkFormatDesc::from_legacy_type(&connector, t)?.map(|mut f| {
                session.notice_to_user("Consider using the newer syntax `FORMAT ... ENCODE ...` instead of `type = '...'`.");
                if let Some(v) = resolved_with_options.get(SINK_USER_FORCE_APPEND_ONLY_OPTION) {
                    f.options.insert(SINK_USER_FORCE_APPEND_ONLY_OPTION.into(), v.into());
                }
                f
            }),
            None => None,
        },
    };

    let definition = context.normalized_sql().to_owned();
    let mut plan_root = if is_iceberg_engine_internal {
        Planner::new_for_iceberg_table_engine_sink(context.into()).plan_query(bound)?
    } else {
        Planner::new_for_stream(context.into()).plan_query(bound)?
    };
    if let Some(col_names) = &col_names {
        plan_root.set_out_names(col_names.clone())?;
    };

    let without_backfill = match resolved_with_options.remove(SINK_SNAPSHOT_OPTION) {
        Some(flag) if flag.eq_ignore_ascii_case("false") => {
            if direct_sink || is_iceberg_engine_internal {
                true
            } else {
                return Err(ErrorCode::BindError(
                    "`snapshot = false` is only supported for `CREATE SINK FROM MV or TABLE`".to_owned(),
                )
                .into());
            }
        }
        _ => false,
    };

    let target_table_catalog = stmt
        .into_table_name
        .as_ref()
        .map(|table_name| fetch_table_catalog_for_alter(session, table_name))
        .transpose()?;

    if let Some(target_table_catalog) = &target_table_catalog {
        if let Some(col_names) = col_names {
            let target_table_columns = target_table_catalog
                .columns()
                .iter()
                .map(|c| c.name())
                .collect::<BTreeSet<_>>();
            for c in col_names {
                if !target_table_columns.contains(c.as_str()) {
                    return Err(RwError::from(ErrorCode::BindError(format!(
                        "Column {} not found in table {}",
                        c,
                        target_table_catalog.name()
                    ))));
                }
            }
        }
        if target_table_catalog
            .columns()
            .iter()
            .any(|col| !col.nullable())
        {
            notice_to_user(format!(
                "The target table `{}` contains columns with NOT NULL constraints. Any sinked rows violating the constraints will be ignored silently.",
                target_table_catalog.name(),
            ));
        }
    }

    let sink_plan = plan_root.gen_sink_plan(
        sink_table_name,
        definition,
        resolved_with_options,
        emit_on_window_close,
        db_name.to_owned(),
        sink_from_table_name,
        format_desc,
        without_backfill,
        target_table_catalog.clone(),
        partition_info,
        user_specified_columns,
    )?;

    let sink_desc = sink_plan.sink_desc().clone();

    let mut sink_plan: PlanRef = sink_plan.into();

    let ctx = sink_plan.ctx();
    let explain_trace = ctx.is_explain_trace();
    if explain_trace {
        ctx.trace("Create Sink:");
        ctx.trace(sink_plan.explain_to_string());
    }
    tracing::trace!("sink_plan: {:?}", sink_plan.explain_to_string());

    let dependencies =
        RelationCollectorVisitor::collect_with(dependent_relations, sink_plan.clone())
            .into_iter()
            .map(|id| id.table_id() as ObjectId)
            .chain(
                dependent_udfs
                    .into_iter()
                    .map(|id| id.function_id() as ObjectId),
            )
            .collect();

    let sink_catalog = sink_desc.into_catalog(
        SchemaId::new(sink_schema_id),
        DatabaseId::new(sink_database_id),
        UserId::new(session.user_id()),
        connector_conn_ref.map(ConnectionId::from),
    );

    if let Some(table_catalog) = &target_table_catalog {
        for column in sink_catalog.full_columns() {
            if !column.can_dml() {
                unreachable!(
                    "cannot derive generated columns and system column `_rw_timestamp` in a sink's catalog, but met one"
                );
            }
        }

        let table_columns_without_rw_timestamp = table_catalog.columns_without_rw_timestamp();
        let exprs = derive_default_column_project_for_sink(
            &sink_catalog,
            sink_plan.schema(),
            &table_columns_without_rw_timestamp,
            user_specified_columns,
        )?;

        let logical_project = generic::Project::new(exprs, sink_plan);

        sink_plan = StreamProject::new(logical_project).into();

        let exprs = LogicalSource::derive_output_exprs_from_generated_columns(
            &table_columns_without_rw_timestamp,
        )?;

        if let Some(exprs) = exprs {
            let logical_project = generic::Project::new(exprs, sink_plan);
            sink_plan = StreamProject::new(logical_project).into();
        }
    };

    Ok(SinkPlanContext {
        query,
        sink_plan,
        sink_catalog,
        target_table_catalog,
        dependencies,
    })
}

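/// Returns partition-compute information for connectors that support it (currently only
/// Iceberg); returns `None` for all other connectors or when no partitioning applies.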
pub async fn get_partition_compute_info(
    with_options: &WithOptionsSecResolved,
) -> Result<Option<PartitionComputeInfo>> {
    let (options, secret_refs) = with_options.clone().into_parts();
    let Some(connector) = options.get(UPSTREAM_SOURCE_KEY).cloned() else {
        return Ok(None);
    };
    let properties = LocalSecretManager::global().fill_secrets(options, secret_refs)?;
    match connector.as_str() {
        ICEBERG_SINK => {
            let iceberg_config = IcebergConfig::from_btreemap(properties)?;
            get_partition_compute_info_for_iceberg(&iceberg_config).await
        }
        _ => Ok(None),
    }
}

#[allow(clippy::unused_async)]
async fn get_partition_compute_info_for_iceberg(
    _iceberg_config: &IcebergConfig,
) -> Result<Option<PartitionComputeInfo>> {
    if _iceberg_config.create_table_if_not_exists {
        return Ok(None);
    }
    let table = _iceberg_config.load_table().await?;
    let partition_spec = table.metadata().default_partition_spec();
    if partition_spec.is_unpartitioned() {
        return Ok(None);
    }

    let has_sparse_partition = partition_spec.fields().iter().any(|f| match f.transform {
        Transform::Identity | Transform::Truncate(_) | Transform::Bucket(_) => true,
        Transform::Year
        | Transform::Month
        | Transform::Day
        | Transform::Hour
        | Transform::Void
        | Transform::Unknown => false,
    });
    if !has_sparse_partition {
        return Ok(None);
    }

    let arrow_type = type_to_arrow_type(&iceberg::spec::Type::Struct(
        table.metadata().default_partition_type().clone(),
    ))
    .map_err(|_| {
        RwError::from(ErrorCode::SinkError(
            "Fail to convert iceberg partition type to arrow type".into(),
        ))
    })?;
    let ArrowDataType::Struct(struct_fields) = arrow_type else {
        return Err(RwError::from(ErrorCode::SinkError(
            "Partition type of iceberg should be a struct type".into(),
        )));
    };

    let schema = table.metadata().current_schema();
    let partition_fields = partition_spec
        .fields()
        .iter()
        .map(|f| {
            let source_f =
                schema
                    .field_by_id(f.source_id)
                    .ok_or(RwError::from(ErrorCode::SinkError(
                        "Fail to look up iceberg partition field".into(),
                    )))?;
            Ok((source_f.name.clone(), f.transform))
        })
        .collect::<Result<Vec<_>>>()?;

    Ok(Some(PartitionComputeInfo::Iceberg(IcebergPartitionInfo {
        partition_type: IcebergArrowConvert.struct_from_fields(&struct_fields)?,
        partition_fields,
    })))
}

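/// Handles `CREATE SINK`: generates the sink plan, builds the stream fragment graph, prepares a
/// table-replace plan when sinking into a table, and submits everything to the catalog writer.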
pub async fn handle_create_sink(
    handle_args: HandlerArgs,
    stmt: CreateSinkStatement,
    is_iceberg_engine_internal: bool,
) -> Result<RwPgResponse> {
    let session = handle_args.session.clone();

    session.check_cluster_limits().await?;

    let if_not_exists = stmt.if_not_exists;
    if let Either::Right(resp) = session.check_relation_name_duplicated(
        stmt.sink_name.clone(),
        StatementType::CREATE_SINK,
        if_not_exists,
    )? {
        return Ok(resp);
    }

    let (mut sink, graph, target_table_catalog, dependencies) = {
        let SinkPlanContext {
            query,
            sink_plan: plan,
            sink_catalog: sink,
            target_table_catalog,
            dependencies,
        } = gen_sink_plan(handle_args, stmt, None, is_iceberg_engine_internal).await?;

        let has_order_by = !query.order_by.is_empty();
        if has_order_by {
            plan.ctx().warn_to_user(
                r#"The ORDER BY clause in the CREATE SINK statement has no effect at all."#
                    .to_owned(),
            );
        }

        let graph = build_graph(plan, Some(GraphJobType::Sink))?;

        (sink, graph, target_table_catalog, dependencies)
    };

    let mut target_table_replace_plan = None;
    if let Some(table_catalog) = target_table_catalog {
        use crate::handler::alter_table_column::hijack_merger_for_target_table;

        let (mut graph, mut table, source, target_job_type) =
            reparse_table_for_sink(&session, &table_catalog).await?;

        sink.original_target_columns = table.columns.clone();

        table
            .incoming_sinks
            .clone_from(&table_catalog.incoming_sinks);

        let incoming_sink_ids: HashSet<_> = table_catalog.incoming_sinks.iter().copied().collect();
        let incoming_sinks = fetch_incoming_sinks(&session, &incoming_sink_ids)?;

        let columns_without_rw_timestamp = table_catalog.columns_without_rw_timestamp();
        for existing_sink in incoming_sinks {
            hijack_merger_for_target_table(
                &mut graph,
                &columns_without_rw_timestamp,
                &existing_sink,
                Some(&existing_sink.unique_identity()),
            )?;
        }

        hijack_merger_for_target_table(&mut graph, &columns_without_rw_timestamp, &sink, None)?;

        target_table_replace_plan = Some(ReplaceJobPlan {
            replace_job: Some(replace_job_plan::ReplaceJob::ReplaceTable(
                replace_job_plan::ReplaceTable {
                    table: Some(table.to_prost()),
                    source: source.map(|x| x.to_prost()),
                    job_type: target_job_type as _,
                },
            )),
            fragment_graph: Some(graph),
        });
    }

    let _job_guard =
        session
            .env()
            .creating_streaming_job_tracker()
            .guard(CreatingStreamingJobInfo::new(
                session.session_id(),
                sink.database_id.database_id,
                sink.schema_id.schema_id,
                sink.name.clone(),
            ));

    let catalog_writer = session.catalog_writer()?;
    catalog_writer
        .create_sink(
            sink.to_proto(),
            graph,
            target_table_replace_plan,
            dependencies,
            if_not_exists,
        )
        .await?;

    Ok(PgResponse::empty_result(StatementType::CREATE_SINK))
}

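/// Looks up the catalogs of the given incoming sinks by scanning all schemas of the current
/// database.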
pub fn fetch_incoming_sinks(
    session: &Arc<SessionImpl>,
    incoming_sink_ids: &HashSet<SinkId>,
) -> Result<Vec<Arc<SinkCatalog>>> {
    let reader = session.env().catalog_reader().read_guard();
    let mut sinks = Vec::with_capacity(incoming_sink_ids.len());
    let db_name = &session.database();
    for schema in reader.iter_schemas(db_name)? {
        for sink in schema.iter_sink() {
            if incoming_sink_ids.contains(&sink.id.sink_id) {
                sinks.push(sink.clone());
            }
        }
    }

    Ok(sinks)
}

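/// Re-parses the purified `CREATE TABLE` definition of the target table and regenerates its
/// stream graph, so the table job can be replaced when a new sink is attached to it.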
pub(crate) async fn reparse_table_for_sink(
    session: &Arc<SessionImpl>,
    table_catalog: &Arc<TableCatalog>,
) -> Result<(
    StreamFragmentGraph,
    TableCatalog,
    Option<SourceCatalog>,
    TableJobType,
)> {
    let definition = table_catalog.create_sql_ast_purified()?;
    let Statement::CreateTable { name, .. } = &definition else {
        panic!("unexpected statement: {:?}", definition);
    };
    let table_name = name.clone();

    let handler_args = HandlerArgs::new(session.clone(), &definition, Arc::from(""))?;
    let col_id_gen = ColumnIdGenerator::new_alter(table_catalog);

    let (graph, table, source, job_type) = generate_stream_graph_for_replace_table(
        session,
        table_name,
        table_catalog,
        handler_args,
        definition,
        col_id_gen,
        SqlColumnStrategy::FollowUnchecked,
    )
    .await?;

    Ok((graph, table, source, job_type))
}

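/// Recursively finds the `Union` node in the stream graph and appends a new input chain
/// (a project over a placeholder merge node) for an incoming sink, identified by `uniq_identity`.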
pub(crate) fn insert_merger_to_union_with_project(
    node: &mut StreamNode,
    project_node: &PbNodeBody,
    uniq_identity: Option<&str>,
) {
    if let Some(NodeBody::Union(_union_node)) = &mut node.node_body {
        node.input.push(StreamNode {
            input: vec![StreamNode {
                node_body: Some(NodeBody::Merge(Box::new(MergeNode {
                    ..Default::default()
                }))),
                ..Default::default()
            }],
            identity: uniq_identity
                .unwrap_or(PbSink::UNIQUE_IDENTITY_FOR_CREATING_TABLE_SINK)
                .to_owned(),
            fields: node.fields.clone(),
            node_body: Some(project_node.clone()),
            ..Default::default()
        });

        return;
    }

    for input in &mut node.input {
        insert_merger_to_union_with_project(input, project_node, uniq_identity);
    }
}

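/// Builds an `InputRef` expression for sink column `idx`, failing if the sink column type does
/// not match the target table column type.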
fn derive_sink_to_table_expr(
    sink_schema: &Schema,
    idx: usize,
    target_type: &DataType,
) -> Result<ExprImpl> {
    let input_type = &sink_schema.fields()[idx].data_type;

    if target_type != input_type {
        bail!(
            "column type mismatch: {:?} vs {:?}, column name: {:?}",
            target_type,
            input_type,
            sink_schema.fields()[idx].name
        );
    } else {
        Ok(ExprImpl::InputRef(Box::new(InputRef::new(
            idx,
            input_type.clone(),
        ))))
    }
}

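/// Derives the projection from the sink schema to the target table columns: with user-specified
/// columns the mapping is by name, otherwise by position, and any remaining table columns fall
/// back to their default expressions (with `now()` rewritten to `proctime()`).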
pub(crate) fn derive_default_column_project_for_sink(
    sink: &SinkCatalog,
    sink_schema: &Schema,
    columns: &[ColumnCatalog],
    user_specified_columns: bool,
) -> Result<Vec<ExprImpl>> {
    assert_eq!(sink.full_schema().len(), sink_schema.len());

    let default_column_exprs = TableCatalog::default_column_exprs(columns);

    let mut exprs = vec![];

    let sink_visible_col_idxes = sink
        .full_columns()
        .iter()
        .positions(|c| !c.is_hidden())
        .collect_vec();
    let sink_visible_col_idxes_by_name = sink
        .full_columns()
        .iter()
        .enumerate()
        .filter(|(_, c)| !c.is_hidden())
        .map(|(i, c)| (c.name(), i))
        .collect::<BTreeMap<_, _>>();

    for (idx, column) in columns.iter().enumerate() {
        if !column.can_dml() {
            continue;
        }

        let default_col_expr =
            || -> ExprImpl { rewrite_now_to_proctime(default_column_exprs[idx].clone()) };

        let sink_col_expr = |sink_col_idx: usize| -> Result<ExprImpl> {
            derive_sink_to_table_expr(sink_schema, sink_col_idx, column.data_type())
        };

        #[allow(clippy::collapsible_else_if)]
        if user_specified_columns {
            if let Some(idx) = sink_visible_col_idxes_by_name.get(column.name()) {
                exprs.push(sink_col_expr(*idx)?);
            } else {
                exprs.push(default_col_expr());
            }
        } else {
            if idx < sink_visible_col_idxes.len() {
                exprs.push(sink_col_expr(sink_visible_col_idxes[idx])?);
            } else {
                exprs.push(default_col_expr());
            };
        }
    }
    Ok(exprs)
}

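/// Binds the `FORMAT ... ENCODE ... (...)` clause of a sink into a `SinkFormatDesc`, validating
/// the format, encode, and key encode, and resolving any schema-registry connection reference.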
fn bind_sink_format_desc(
    session: &SessionImpl,
    value: FormatEncodeOptions,
) -> Result<SinkFormatDesc> {
    use risingwave_connector::sink::catalog::{SinkEncode, SinkFormat};
    use risingwave_connector::sink::encoder::TimestamptzHandlingMode;
    use risingwave_sqlparser::ast::{Encode as E, Format as F};

    let format = match value.format {
        F::Plain => SinkFormat::AppendOnly,
        F::Upsert => SinkFormat::Upsert,
        F::Debezium => SinkFormat::Debezium,
        f @ (F::Native | F::DebeziumMongo | F::Maxwell | F::Canal | F::None) => {
            return Err(ErrorCode::BindError(format!("sink format unsupported: {f}")).into());
        }
    };
    let encode = match value.row_encode {
        E::Json => SinkEncode::Json,
        E::Protobuf => SinkEncode::Protobuf,
        E::Avro => SinkEncode::Avro,
        E::Template => SinkEncode::Template,
        E::Parquet => SinkEncode::Parquet,
        e @ (E::Native | E::Csv | E::Bytes | E::None | E::Text) => {
            return Err(ErrorCode::BindError(format!("sink encode unsupported: {e}")).into());
        }
    };

    let mut key_encode = None;
    if let Some(encode) = value.key_encode {
        match encode {
            E::Text => key_encode = Some(SinkEncode::Text),
            E::Bytes => key_encode = Some(SinkEncode::Bytes),
            _ => {
                return Err(ErrorCode::BindError(format!(
                    "sink key encode unsupported: {encode}, only TEXT and BYTES supported"
                ))
                .into());
            }
        }
    }

    let (props, connection_type_flag, schema_registry_conn_ref) =
        resolve_connection_ref_and_secret_ref(
            WithOptions::try_from(value.row_options.as_slice())?,
            session,
            Some(TelemetryDatabaseObject::Sink),
        )?;
    ensure_connection_type_allowed(
        connection_type_flag,
        &SINK_ALLOWED_CONNECTION_SCHEMA_REGISTRY,
    )?;
    let (mut options, secret_refs) = props.into_parts();

    options
        .entry(TimestamptzHandlingMode::OPTION_KEY.to_owned())
        .or_insert(TimestamptzHandlingMode::FRONTEND_DEFAULT.to_owned());

    Ok(SinkFormatDesc {
        format,
        encode,
        options,
        secret_refs,
        key_encode,
        connection_id: schema_registry_conn_ref,
    })
}

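/// Mapping from connector name to the `FORMAT ... ENCODE ...` combinations it accepts.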
static CONNECTORS_COMPATIBLE_FORMATS: LazyLock<HashMap<String, HashMap<Format, Vec<Encode>>>> =
    LazyLock::new(|| {
        use risingwave_connector::sink::Sink as _;
        use risingwave_connector::sink::file_sink::azblob::AzblobSink;
        use risingwave_connector::sink::file_sink::fs::FsSink;
        use risingwave_connector::sink::file_sink::gcs::GcsSink;
        use risingwave_connector::sink::file_sink::opendal_sink::FileSink;
        use risingwave_connector::sink::file_sink::s3::{S3Sink, SnowflakeSink};
        use risingwave_connector::sink::file_sink::webhdfs::WebhdfsSink;
        use risingwave_connector::sink::google_pubsub::GooglePubSubSink;
        use risingwave_connector::sink::kafka::KafkaSink;
        use risingwave_connector::sink::kinesis::KinesisSink;
        use risingwave_connector::sink::mqtt::MqttSink;
        use risingwave_connector::sink::pulsar::PulsarSink;
        use risingwave_connector::sink::redis::RedisSink;

        convert_args!(hashmap!(
            GooglePubSubSink::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Json],
            ),
            KafkaSink::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Json, Encode::Avro, Encode::Protobuf],
                Format::Upsert => vec![Encode::Json, Encode::Avro, Encode::Protobuf],
                Format::Debezium => vec![Encode::Json],
            ),
            FileSink::<S3Sink>::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Parquet, Encode::Json],
            ),
            FileSink::<SnowflakeSink>::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Parquet, Encode::Json],
            ),
            FileSink::<GcsSink>::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Parquet, Encode::Json],
            ),
            FileSink::<AzblobSink>::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Parquet, Encode::Json],
            ),
            FileSink::<WebhdfsSink>::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Parquet, Encode::Json],
            ),
            FileSink::<FsSink>::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Parquet, Encode::Json],
            ),
            KinesisSink::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Json],
                Format::Upsert => vec![Encode::Json],
                Format::Debezium => vec![Encode::Json],
            ),
            MqttSink::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Json, Encode::Protobuf],
            ),
            PulsarSink::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Json],
                Format::Upsert => vec![Encode::Json],
                Format::Debezium => vec![Encode::Json],
            ),
            RedisSink::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Json, Encode::Template],
                Format::Upsert => vec![Encode::Json, Encode::Template],
            ),
        ))
    });

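/// Validates that the connector supports the requested format/encode combination according to
/// `CONNECTORS_COMPATIBLE_FORMATS`, and that `ENCODE BYTES` for the key is only used with Kafka.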
pub fn validate_compatibility(connector: &str, format_desc: &FormatEncodeOptions) -> Result<()> {
    let compatible_formats = CONNECTORS_COMPATIBLE_FORMATS
        .get(connector)
        .ok_or_else(|| {
            ErrorCode::BindError(format!(
                "connector {} is not supported by FORMAT ... ENCODE ... syntax",
                connector
            ))
        })?;
    let compatible_encodes = compatible_formats.get(&format_desc.format).ok_or_else(|| {
        ErrorCode::BindError(format!(
            "connector {} does not support format {:?}",
            connector, format_desc.format
        ))
    })?;
    if !compatible_encodes.contains(&format_desc.row_encode) {
        return Err(ErrorCode::BindError(format!(
            "connector {} does not support format {:?} with encode {:?}",
            connector, format_desc.format, format_desc.row_encode
        ))
        .into());
    }

    if let Some(encode) = &format_desc.key_encode
        && connector != KAFKA_SINK
        && matches!(encode, Encode::Bytes)
    {
        return Err(ErrorCode::BindError(format!(
            "key encode bytes only works with kafka connector, but found {}",
            connector
        ))
        .into());
    }

    Ok(())
}

#[cfg(test)]
pub mod tests {
    use risingwave_common::catalog::{DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME};

    use crate::catalog::root_catalog::SchemaPath;
    use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};

    #[tokio::test]
    async fn test_create_sink_handler() {
        let proto_file = create_proto_file(PROTO_FILE_DATA);
        let sql = format!(
            r#"CREATE SOURCE t1
    WITH (connector = 'kafka', kafka.topic = 'abc', kafka.brokers = 'localhost:1001')
    FORMAT PLAIN ENCODE PROTOBUF (message = '.test.TestRecord', schema.location = 'file://{}')"#,
            proto_file.path().to_str().unwrap()
        );
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(sql).await.unwrap();

        let sql = "create materialized view mv1 as select t1.country from t1;";
        frontend.run_sql(sql).await.unwrap();

        let sql = r#"CREATE SINK snk1 FROM mv1
            WITH (connector = 'jdbc', mysql.endpoint = '127.0.0.1:3306', mysql.table =
                '<table_name>', mysql.database = '<database_name>', mysql.user = '<user_name>',
                mysql.password = '<password>', type = 'append-only', force_append_only = 'true');"#.to_owned();
        frontend.run_sql(sql).await.unwrap();

        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        // Check that the source exists.
        let (source, _) = catalog_reader
            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t1")
            .unwrap();
        assert_eq!(source.name, "t1");

        // Check that the materialized view exists.
        let (table, schema_name) = catalog_reader
            .get_created_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "mv1")
            .unwrap();
        assert_eq!(table.name(), "mv1");

        // Check that the sink exists.
        let (sink, _) = catalog_reader
            .get_sink_by_name(DEFAULT_DATABASE_NAME, SchemaPath::Name(schema_name), "snk1")
            .unwrap();
        assert_eq!(sink.name, "snk1");
    }
}