use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::sync::{Arc, LazyLock};

use either::Either;
use iceberg::arrow::type_to_arrow_type;
use iceberg::spec::Transform;
use itertools::Itertools;
use maplit::{convert_args, hashmap, hashset};
use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::array::arrow::IcebergArrowConvert;
use risingwave_common::array::arrow::arrow_schema_iceberg::DataType as ArrowDataType;
use risingwave_common::bail;
use risingwave_common::catalog::{
    ColumnCatalog, ConnectionId, DatabaseId, ObjectId, Schema, SchemaId, UserId,
};
use risingwave_common::license::Feature;
use risingwave_common::secret::LocalSecretManager;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::types::DataType;
use risingwave_connector::sink::catalog::{SinkCatalog, SinkFormatDesc};
use risingwave_connector::sink::iceberg::{ICEBERG_SINK, IcebergConfig};
use risingwave_connector::sink::kafka::KAFKA_SINK;
use risingwave_connector::sink::{
    CONNECTOR_TYPE_KEY, SINK_SNAPSHOT_OPTION, SINK_TYPE_OPTION, SINK_USER_FORCE_APPEND_ONLY_OPTION,
    enforce_secret_sink,
};
use risingwave_connector::{AUTO_SCHEMA_CHANGE_KEY, WithPropertiesExt};
use risingwave_pb::catalog::PbSink;
use risingwave_pb::catalog::connection_params::PbConnectionType;
use risingwave_pb::ddl_service::{ReplaceJobPlan, TableJobType, replace_job_plan};
use risingwave_pb::stream_plan::stream_node::{NodeBody, PbNodeBody};
use risingwave_pb::stream_plan::{MergeNode, StreamFragmentGraph, StreamNode};
use risingwave_pb::telemetry::TelemetryDatabaseObject;
use risingwave_sqlparser::ast::{
    CreateSink, CreateSinkStatement, EmitMode, Encode, ExplainOptions, Format, FormatEncodeOptions,
    ObjectName, Query, Statement,
};

use super::RwPgResponse;
use super::create_mv::get_column_names;
use super::create_source::{SqlColumnStrategy, UPSTREAM_SOURCE_KEY};
use super::util::gen_query_from_table_name;
use crate::binder::{Binder, Relation};
use crate::catalog::SinkId;
use crate::catalog::source_catalog::SourceCatalog;
use crate::catalog::table_catalog::TableType;
use crate::error::{ErrorCode, Result, RwError};
use crate::expr::{ExprImpl, InputRef, rewrite_now_to_proctime};
use crate::handler::HandlerArgs;
use crate::handler::alter_table_column::fetch_table_catalog_for_alter;
use crate::handler::create_mv::parse_column_names;
use crate::handler::create_table::{ColumnIdGenerator, generate_stream_graph_for_replace_table};
use crate::handler::util::{check_connector_match_connection_type, ensure_connection_type_allowed};
use crate::optimizer::plan_node::{
    IcebergPartitionInfo, LogicalSource, PartitionComputeInfo, StreamPlanRef as PlanRef,
    StreamProject, generic,
};
use crate::optimizer::{OptimizerContext, RelationCollectorVisitor};
use crate::scheduler::streaming_manager::CreatingStreamingJobInfo;
use crate::session::SessionImpl;
use crate::session::current::notice_to_user;
use crate::stream_fragmenter::{GraphJobType, build_graph};
use crate::utils::{resolve_connection_ref_and_secret_ref, resolve_privatelink_in_with_option};
use crate::{Explain, Planner, TableCatalog, WithOptions, WithOptionsSecResolved};

static SINK_ALLOWED_CONNECTION_CONNECTOR: LazyLock<HashSet<PbConnectionType>> =
    LazyLock::new(|| {
        hashset! {
            PbConnectionType::Unspecified,
            PbConnectionType::Kafka,
            PbConnectionType::Iceberg,
            PbConnectionType::Elasticsearch,
        }
    });

static SINK_ALLOWED_CONNECTION_SCHEMA_REGISTRY: LazyLock<HashSet<PbConnectionType>> =
    LazyLock::new(|| {
        hashset! {
            PbConnectionType::Unspecified,
            PbConnectionType::SchemaRegistry,
        }
    });

pub struct SinkPlanContext {
    pub query: Box<Query>,
    pub sink_plan: PlanRef,
    pub sink_catalog: SinkCatalog,
    pub target_table_catalog: Option<Arc<TableCatalog>>,
    pub dependencies: HashSet<ObjectId>,
}

pub async fn gen_sink_plan(
    handler_args: HandlerArgs,
    stmt: CreateSinkStatement,
    explain_options: Option<ExplainOptions>,
    is_iceberg_engine_internal: bool,
) -> Result<SinkPlanContext> {
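    // Rough outline: resolve the WITH options (secrets, connections, private link), bind the
    // FROM relation or AS query, plan the bound query into a streaming sink plan, and assemble
    // the `SinkCatalog` plus the object IDs it depends on. Handles both
    // `CREATE SINK ... FROM <relation>` and `CREATE SINK ... AS <query>`, e.g.
    // `CREATE SINK s FROM mv WITH (connector = 'kafka', ...) FORMAT PLAIN ENCODE JSON`.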
    let session = handler_args.session.clone();
    let session = session.as_ref();
    let user_specified_columns = !stmt.columns.is_empty();
    let db_name = &session.database();
    let (sink_schema_name, sink_table_name) =
        Binder::resolve_schema_qualified_name(db_name, &stmt.sink_name)?;

    let mut with_options = handler_args.with_options.clone();

    if session
        .env()
        .system_params_manager()
        .get_params()
        .load()
        .enforce_secret()
        && Feature::SecretManagement.check_available().is_ok()
    {
        enforce_secret_sink(&with_options)?;
    }

    resolve_privatelink_in_with_option(&mut with_options)?;
    let (mut resolved_with_options, connection_type, connector_conn_ref) =
        resolve_connection_ref_and_secret_ref(
            with_options,
            session,
            Some(TelemetryDatabaseObject::Sink),
        )?;
    ensure_connection_type_allowed(connection_type, &SINK_ALLOWED_CONNECTION_CONNECTOR)?;

    if !matches!(connection_type, PbConnectionType::Unspecified) {
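        // A CONNECTION object is referenced in the WITH clause; the sink's `connector`
        // must be compatible with the connection's type.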
        let Some(connector) = resolved_with_options.get_connector() else {
            return Err(RwError::from(ErrorCode::ProtocolError(format!(
                "missing field '{}' in WITH clause",
                CONNECTOR_TYPE_KEY
            ))));
        };
        check_connector_match_connection_type(connector.as_str(), &connection_type)?;
    }

    let partition_info = get_partition_compute_info(&resolved_with_options).await?;

    let context = if let Some(explain_options) = explain_options {
        OptimizerContext::new(handler_args.clone(), explain_options)
    } else {
        OptimizerContext::from_handler_args(handler_args.clone())
    };

    let is_auto_schema_change = resolved_with_options
        .remove(AUTO_SCHEMA_CHANGE_KEY)
        .map(|value| {
            value.parse::<bool>().map_err(|_| {
                ErrorCode::InvalidInputSyntax(format!(
166 "invalid value {} of '{}' option, expect",
                    value, AUTO_SCHEMA_CHANGE_KEY
                ))
            })
        })
        .transpose()?
        .unwrap_or(false);

    if is_auto_schema_change {
        Feature::SinkAutoSchemaChange.check_available()?;
    }

    let sink_from_table_name;
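    // When sinking directly FROM a table or MV, remember the source relation's name and
    // whether auto schema change was requested for it.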
    let direct_sink_from_name: Option<(ObjectName, bool)>;
    let query = match stmt.sink_from {
        CreateSink::From(from_name) => {
            sink_from_table_name = from_name.0.last().unwrap().real_value();
            direct_sink_from_name = Some((from_name.clone(), is_auto_schema_change));
            if is_auto_schema_change && stmt.into_table_name.is_some() {
                return Err(RwError::from(ErrorCode::InvalidInputSyntax(
                    "auto schema change not supported for sink-into-table".to_owned(),
                )));
            }
            Box::new(gen_query_from_table_name(from_name))
        }
        CreateSink::AsQuery(query) => {
            if is_auto_schema_change {
                return Err(RwError::from(ErrorCode::InvalidInputSyntax(
                    "auto schema change not supported for CREATE SINK AS QUERY".to_owned(),
                )));
            }
            sink_from_table_name = sink_table_name.clone();
            direct_sink_from_name = None;
            query
        }
    };

    let sink_into_table_name = stmt.into_table_name.as_ref().map(|name| name.real_value());

    let (sink_database_id, sink_schema_id) =
        session.get_database_and_schema_id_for_create(sink_schema_name.clone())?;

    let (dependent_relations, dependent_udfs, bound, auto_refresh_schema_from_table) = {
        let mut binder = Binder::new_for_stream(session);
        let auto_refresh_schema_from_table = if let Some((from_name, true)) = &direct_sink_from_name
        {
            let from_relation = binder.bind_relation_by_name(from_name, None, None, true)?;
            if let Relation::BaseTable(table) = from_relation {
                if table.table_catalog.table_type != TableType::Table {
                    return Err(ErrorCode::InvalidInputSyntax(format!(
                        "auto schema change is only supported on TABLE, but got {:?}",
                        table.table_catalog.table_type
                    ))
                    .into());
                }
                if table.table_catalog.database_id != sink_database_id {
                    return Err(ErrorCode::InvalidInputSyntax(
                        "auto schema change sink cannot be created from a cross-database table".to_owned()
                    )
                    .into());
                }
                for col in &table.table_catalog.columns {
                    if !col.is_hidden() && (col.is_generated() || col.is_rw_sys_column()) {
                        return Err(ErrorCode::InvalidInputSyntax(format!("auto schema change is not supported for tables with non-hidden generated columns or system columns, but got {}", col.name())).into());
                    }
                }
                Some(table.table_catalog)
            } else {
                return Err(RwError::from(ErrorCode::NotSupported(
                    "auto schema change is only supported for TABLE".to_owned(),
                    "try recreating the sink from table".to_owned(),
                )));
            }
        } else {
            None
        };
        let bound = binder.bind_query(&query)?;

        (
            binder.included_relations().clone(),
            binder.included_udfs().clone(),
            bound,
            auto_refresh_schema_from_table,
        )
    };

    let col_names = if sink_into_table_name.is_some() {
        parse_column_names(&stmt.columns)
    } else {
        get_column_names(&bound, stmt.columns)?
    };

    if sink_into_table_name.is_some() {
        let prev = resolved_with_options.insert(CONNECTOR_TYPE_KEY.to_owned(), "table".to_owned());

        if prev.is_some() {
            return Err(RwError::from(ErrorCode::BindError(
                "In the case of sinking into table, the 'connector' parameter should not be provided.".to_owned(),
            )));
        }
    }

    let emit_on_window_close = stmt.emit_mode == Some(EmitMode::OnWindowClose);
    if emit_on_window_close {
        context.warn_to_user("EMIT ON WINDOW CLOSE is currently an experimental feature. Please use it with caution.");
    }

    let connector = resolved_with_options
        .get(CONNECTOR_TYPE_KEY)
        .cloned()
        .ok_or_else(|| ErrorCode::BindError(format!("missing field '{CONNECTOR_TYPE_KEY}'")))?;

    let format_desc = match stmt.sink_schema {
        Some(f) => {
            validate_compatibility(&connector, &f)?;
            Some(bind_sink_format_desc(session, f)?)
        }
        None => match resolved_with_options.get(SINK_TYPE_OPTION) {
            Some(t) => SinkFormatDesc::from_legacy_type(&connector, t)?.map(|mut f| {
                session.notice_to_user("Consider using the newer syntax `FORMAT ... ENCODE ...` instead of `type = '...'`.");
                if let Some(v) = resolved_with_options.get(SINK_USER_FORCE_APPEND_ONLY_OPTION) {
                    f.options.insert(SINK_USER_FORCE_APPEND_ONLY_OPTION.into(), v.into());
                }
                f
            }),
            None => None,
        },
    };

    let definition = context.normalized_sql().to_owned();
    let mut plan_root = if is_iceberg_engine_internal {
        Planner::new_for_iceberg_table_engine_sink(context.into()).plan_query(bound)?
    } else {
        Planner::new_for_stream(context.into()).plan_query(bound)?
    };
    if let Some(col_names) = &col_names {
        plan_root.set_out_names(col_names.clone())?;
    };

    let without_backfill = match resolved_with_options.remove(SINK_SNAPSHOT_OPTION) {
        Some(flag) if flag.eq_ignore_ascii_case("false") => {
            if direct_sink_from_name.is_some() || is_iceberg_engine_internal {
                true
            } else {
                return Err(ErrorCode::BindError(
318 "`snapshot = false` only support `CREATE SINK FROM MV or TABLE`".to_owned(),
                )
                .into());
            }
        }
        _ => false,
    };

    let target_table_catalog = stmt
        .into_table_name
        .as_ref()
        .map(|table_name| fetch_table_catalog_for_alter(session, table_name))
        .transpose()?;

    if let Some(target_table_catalog) = &target_table_catalog {
        if let Some(col_names) = col_names {
            let target_table_columns = target_table_catalog
                .columns()
                .iter()
                .map(|c| c.name())
                .collect::<BTreeSet<_>>();
            for c in col_names {
                if !target_table_columns.contains(c.as_str()) {
                    return Err(RwError::from(ErrorCode::BindError(format!(
                        "Column {} not found in table {}",
                        c,
                        target_table_catalog.name()
                    ))));
                }
            }
        }
        if target_table_catalog
            .columns()
            .iter()
            .any(|col| !col.nullable())
        {
            notice_to_user(format!(
                "The target table `{}` contains columns with NOT NULL constraints. Any sinked rows violating the constraints will be ignored silently.",
                target_table_catalog.name(),
            ));
        }
    }

    let sink_plan = plan_root.gen_sink_plan(
        sink_table_name,
        definition,
        resolved_with_options,
        emit_on_window_close,
        db_name.to_owned(),
        sink_from_table_name,
        format_desc,
        without_backfill,
        target_table_catalog.clone(),
        partition_info,
        user_specified_columns,
        auto_refresh_schema_from_table,
    )?;

    let sink_desc = sink_plan.sink_desc().clone();

    let mut sink_plan: PlanRef = sink_plan.into();

    let ctx = sink_plan.ctx();
    let explain_trace = ctx.is_explain_trace();
    if explain_trace {
        ctx.trace("Create Sink:");
        ctx.trace(sink_plan.explain_to_string());
    }
    tracing::trace!("sink_plan: {:?}", sink_plan.explain_to_string());

    let dependencies =
        RelationCollectorVisitor::collect_with(dependent_relations, sink_plan.clone())
            .into_iter()
            .map(|id| id.table_id() as ObjectId)
            .chain(
                dependent_udfs
                    .into_iter()
                    .map(|id| id.function_id() as ObjectId),
            )
            .collect();

    let sink_catalog = sink_desc.into_catalog(
        SchemaId::new(sink_schema_id),
        DatabaseId::new(sink_database_id),
        UserId::new(session.user_id()),
        connector_conn_ref.map(ConnectionId::from),
    );

    if let Some(table_catalog) = &target_table_catalog {
        for column in sink_catalog.full_columns() {
            if !column.can_dml() {
                unreachable!(
412 "can not derive generated columns and system column `_rw_timestamp` in a sink's catalog, but meet one"
                );
            }
        }

        let table_columns_without_rw_timestamp = table_catalog.columns_without_rw_timestamp();
        let exprs = derive_default_column_project_for_sink(
            &sink_catalog,
            sink_plan.schema(),
            &table_columns_without_rw_timestamp,
            user_specified_columns,
        )?;

        let logical_project = generic::Project::new(exprs, sink_plan);

        sink_plan = StreamProject::new(logical_project).into();

        let exprs = LogicalSource::derive_output_exprs_from_generated_columns(
            &table_columns_without_rw_timestamp,
        )?;

        if let Some(exprs) = exprs {
            let logical_project = generic::Project::new(exprs, sink_plan);
            sink_plan = StreamProject::new(logical_project).into();
        }
    };

    Ok(SinkPlanContext {
        query,
        sink_plan,
        sink_catalog,
        target_table_catalog,
        dependencies,
    })
}

pub async fn get_partition_compute_info(
    with_options: &WithOptionsSecResolved,
) -> Result<Option<PartitionComputeInfo>> {
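    // Only the Iceberg sink may need to pre-compute partition values; all other connectors
    // return `None` here.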
    let (options, secret_refs) = with_options.clone().into_parts();
    let Some(connector) = options.get(UPSTREAM_SOURCE_KEY).cloned() else {
        return Ok(None);
    };
    let properties = LocalSecretManager::global().fill_secrets(options, secret_refs)?;
    match connector.as_str() {
        ICEBERG_SINK => {
            let iceberg_config = IcebergConfig::from_btreemap(properties)?;
            get_partition_compute_info_for_iceberg(&iceberg_config).await
        }
        _ => Ok(None),
    }
}

#[allow(clippy::unused_async)]
async fn get_partition_compute_info_for_iceberg(
    _iceberg_config: &IcebergConfig,
) -> Result<Option<PartitionComputeInfo>> {
    if _iceberg_config.create_table_if_not_exists {
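        // The target table may not exist yet, so there is no partition spec to inspect.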
        return Ok(None);
    }
    let table = _iceberg_config.load_table().await?;
    let partition_spec = table.metadata().default_partition_spec();
    if partition_spec.is_unpartitioned() {
        return Ok(None);
    }

    let has_sparse_partition = partition_spec.fields().iter().any(|f| match f.transform {
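        // "Sparse" transforms (identity, truncate, bucket) cluster rows with the same
        // partition value, so pre-computing the partition value is worthwhile; time-based,
        // void and unknown transforms are not treated this way.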
        Transform::Identity | Transform::Truncate(_) | Transform::Bucket(_) => true,
        Transform::Year
        | Transform::Month
        | Transform::Day
        | Transform::Hour
        | Transform::Void
        | Transform::Unknown => false,
    });
    if !has_sparse_partition {
        return Ok(None);
    }

    let arrow_type = type_to_arrow_type(&iceberg::spec::Type::Struct(
        table.metadata().default_partition_type().clone(),
    ))
    .map_err(|_| {
        RwError::from(ErrorCode::SinkError(
507 "Fail to convert iceberg partition type to arrow type".into(),
        ))
    })?;
    let ArrowDataType::Struct(struct_fields) = arrow_type else {
        return Err(RwError::from(ErrorCode::SinkError(
            "Partition type of iceberg should be a struct type".into(),
        )));
    };

    let schema = table.metadata().current_schema();
    let partition_fields = partition_spec
        .fields()
        .iter()
        .map(|f| {
            let source_f =
                schema
                    .field_by_id(f.source_id)
                    .ok_or(RwError::from(ErrorCode::SinkError(
525 "Fail to look up iceberg partition field".into(),
                    )))?;
            Ok((source_f.name.clone(), f.transform))
        })
        .collect::<Result<Vec<_>>>()?;

    Ok(Some(PartitionComputeInfo::Iceberg(IcebergPartitionInfo {
        partition_type: IcebergArrowConvert.struct_from_fields(&struct_fields)?,
        partition_fields,
    })))
}

pub async fn handle_create_sink(
    handle_args: HandlerArgs,
    stmt: CreateSinkStatement,
    is_iceberg_engine_internal: bool,
) -> Result<RwPgResponse> {
    let session = handle_args.session.clone();

    session.check_cluster_limits().await?;

    let if_not_exists = stmt.if_not_exists;
    if let Either::Right(resp) = session.check_relation_name_duplicated(
        stmt.sink_name.clone(),
        StatementType::CREATE_SINK,
        if_not_exists,
    )? {
        return Ok(resp);
    }

    let (mut sink, graph, target_table_catalog, dependencies) = {
        let SinkPlanContext {
            query,
            sink_plan: plan,
            sink_catalog: sink,
            target_table_catalog,
            dependencies,
        } = gen_sink_plan(handle_args, stmt, None, is_iceberg_engine_internal).await?;

        let has_order_by = !query.order_by.is_empty();
        if has_order_by {
            plan.ctx().warn_to_user(
                r#"The ORDER BY clause in the CREATE SINK statement has no effect at all."#
                    .to_owned(),
            );
        }

        let graph = build_graph(plan, Some(GraphJobType::Sink))?;

        (sink, graph, target_table_catalog, dependencies)
    };

    let mut target_table_replace_plan = None;
    if let Some(table_catalog) = target_table_catalog {
        use crate::handler::alter_table_column::hijack_merger_for_target_table;

        let (mut graph, mut table, source, target_job_type) =
            reparse_table_for_sink(&session, &table_catalog).await?;

        sink.original_target_columns = table.columns.clone();

        table
            .incoming_sinks
            .clone_from(&table_catalog.incoming_sinks);

        let incoming_sink_ids: HashSet<_> = table_catalog.incoming_sinks.iter().copied().collect();
        let incoming_sinks = fetch_incoming_sinks(&session, &incoming_sink_ids)?;

        let columns_without_rw_timestamp = table_catalog.columns_without_rw_timestamp();
        for existing_sink in incoming_sinks {
            hijack_merger_for_target_table(
                &mut graph,
                &columns_without_rw_timestamp,
                &existing_sink,
                Some(&existing_sink.unique_identity()),
            )?;
        }

        hijack_merger_for_target_table(&mut graph, &columns_without_rw_timestamp, &sink, None)?;

        target_table_replace_plan = Some(ReplaceJobPlan {
            replace_job: Some(replace_job_plan::ReplaceJob::ReplaceTable(
                replace_job_plan::ReplaceTable {
                    table: Some(table.to_prost()),
                    source: source.map(|x| x.to_prost()),
                    job_type: target_job_type as _,
                },
            )),
            fragment_graph: Some(graph),
        });
    }

    let _job_guard =
        session
            .env()
            .creating_streaming_job_tracker()
            .guard(CreatingStreamingJobInfo::new(
                session.session_id(),
                sink.database_id.database_id,
                sink.schema_id.schema_id,
                sink.name.clone(),
            ));

    let catalog_writer = session.catalog_writer()?;
    catalog_writer
        .create_sink(
            sink.to_proto(),
            graph,
            target_table_replace_plan,
            dependencies,
            if_not_exists,
        )
        .await?;

    Ok(PgResponse::empty_result(StatementType::CREATE_SINK))
}

pub fn fetch_incoming_sinks(
    session: &Arc<SessionImpl>,
    incoming_sink_ids: &HashSet<SinkId>,
) -> Result<Vec<Arc<SinkCatalog>>> {
    let reader = session.env().catalog_reader().read_guard();
    let mut sinks = Vec::with_capacity(incoming_sink_ids.len());
    let db_name = &session.database();
    for schema in reader.iter_schemas(db_name)? {
        for sink in schema.iter_sink() {
            if incoming_sink_ids.contains(&sink.id.sink_id) {
                sinks.push(sink.clone());
            }
        }
    }

    Ok(sinks)
}

pub(crate) async fn reparse_table_for_sink(
    session: &Arc<SessionImpl>,
    table_catalog: &Arc<TableCatalog>,
) -> Result<(
    StreamFragmentGraph,
    TableCatalog,
    Option<SourceCatalog>,
    TableJobType,
)> {
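    // Re-plan the table from its purified `CREATE TABLE` definition so that the
    // replacement job reflects the table's current schema.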
    let definition = table_catalog.create_sql_ast_purified()?;
    let Statement::CreateTable { name, .. } = &definition else {
        panic!("unexpected statement: {:?}", definition);
    };
    let table_name = name.clone();

    let handler_args = HandlerArgs::new(session.clone(), &definition, Arc::from(""))?;
    let col_id_gen = ColumnIdGenerator::new_alter(table_catalog);

    let (graph, table, source, job_type) = generate_stream_graph_for_replace_table(
        session,
        table_name,
        table_catalog,
        handler_args,
        definition,
        col_id_gen,
        SqlColumnStrategy::FollowUnchecked,
    )
    .await?;

    Ok((graph, table, source, job_type))
}

pub(crate) fn insert_merger_to_union_with_project(
    node: &mut StreamNode,
    project_node: &PbNodeBody,
    uniq_identity: Option<&str>,
) {
    if let Some(NodeBody::Union(_union_node)) = &mut node.node_body {
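        // Found the Union node: push a new input that applies the given projection on top
        // of a fresh Merge node, which will later receive the sink's stream.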
        node.input.push(StreamNode {
            input: vec![StreamNode {
                node_body: Some(NodeBody::Merge(Box::new(MergeNode {
                    ..Default::default()
                }))),
                ..Default::default()
            }],
            identity: uniq_identity
                .unwrap_or(PbSink::UNIQUE_IDENTITY_FOR_CREATING_TABLE_SINK)
                .to_owned(),
            fields: node.fields.clone(),
            node_body: Some(project_node.clone()),
            ..Default::default()
        });

        return;
    }

    for input in &mut node.input {
        insert_merger_to_union_with_project(input, project_node, uniq_identity);
    }
}

fn derive_sink_to_table_expr(
    sink_schema: &Schema,
    idx: usize,
    target_type: &DataType,
) -> Result<ExprImpl> {
    let input_type = &sink_schema.fields()[idx].data_type;

    if target_type != input_type {
        bail!(
            "column type mismatch: {:?} vs {:?}, column name: {:?}",
            target_type,
            input_type,
            sink_schema.fields()[idx].name
        );
    } else {
        Ok(ExprImpl::InputRef(Box::new(InputRef::new(
            idx,
            input_type.clone(),
        ))))
    }
}

pub(crate) fn derive_default_column_project_for_sink(
    sink: &SinkCatalog,
    sink_schema: &Schema,
    columns: &[ColumnCatalog],
    user_specified_columns: bool,
) -> Result<Vec<ExprImpl>> {
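    // For every DML-able target column, project the matching sink column if there is one,
    // otherwise fall back to the column's default expression.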
    assert_eq!(sink.full_schema().len(), sink_schema.len());

    let default_column_exprs = TableCatalog::default_column_exprs(columns);

    let mut exprs = vec![];

    let sink_visible_col_idxes = sink
        .full_columns()
        .iter()
        .positions(|c| !c.is_hidden())
        .collect_vec();
    let sink_visible_col_idxes_by_name = sink
        .full_columns()
        .iter()
        .enumerate()
        .filter(|(_, c)| !c.is_hidden())
        .map(|(i, c)| (c.name(), i))
        .collect::<BTreeMap<_, _>>();

    for (idx, column) in columns.iter().enumerate() {
        if !column.can_dml() {
            continue;
        }

        let default_col_expr =
            || -> ExprImpl { rewrite_now_to_proctime(default_column_exprs[idx].clone()) };

        let sink_col_expr = |sink_col_idx: usize| -> Result<ExprImpl> {
            derive_sink_to_table_expr(sink_schema, sink_col_idx, column.data_type())
        };

        #[allow(clippy::collapsible_else_if)]
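        // Match sink columns to table columns by name when the user listed columns
        // explicitly; otherwise match them by position.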
        if user_specified_columns {
            if let Some(idx) = sink_visible_col_idxes_by_name.get(column.name()) {
                exprs.push(sink_col_expr(*idx)?);
            } else {
                exprs.push(default_col_expr());
            }
        } else {
            if idx < sink_visible_col_idxes.len() {
                exprs.push(sink_col_expr(sink_visible_col_idxes[idx])?);
            } else {
                exprs.push(default_col_expr());
            };
        }
    }
    Ok(exprs)
}

fn bind_sink_format_desc(
    session: &SessionImpl,
    value: FormatEncodeOptions,
) -> Result<SinkFormatDesc> {
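    // Translate the `FORMAT ... ENCODE ...` clause into a `SinkFormatDesc`, resolving any
    // schema-registry connection and secret references along the way.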
    use risingwave_connector::sink::catalog::{SinkEncode, SinkFormat};
    use risingwave_connector::sink::encoder::TimestamptzHandlingMode;
    use risingwave_sqlparser::ast::{Encode as E, Format as F};

    let format = match value.format {
        F::Plain => SinkFormat::AppendOnly,
        F::Upsert => SinkFormat::Upsert,
        F::Debezium => SinkFormat::Debezium,
        f @ (F::Native | F::DebeziumMongo | F::Maxwell | F::Canal | F::None) => {
            return Err(ErrorCode::BindError(format!("sink format unsupported: {f}")).into());
        }
    };
    let encode = match value.row_encode {
        E::Json => SinkEncode::Json,
        E::Protobuf => SinkEncode::Protobuf,
        E::Avro => SinkEncode::Avro,
        E::Template => SinkEncode::Template,
        E::Parquet => SinkEncode::Parquet,
        e @ (E::Native | E::Csv | E::Bytes | E::None | E::Text) => {
            return Err(ErrorCode::BindError(format!("sink encode unsupported: {e}")).into());
        }
    };

    let mut key_encode = None;
    if let Some(encode) = value.key_encode {
        match encode {
            E::Text => key_encode = Some(SinkEncode::Text),
            E::Bytes => key_encode = Some(SinkEncode::Bytes),
            _ => {
                return Err(ErrorCode::BindError(format!(
                    "sink key encode unsupported: {encode}, only TEXT and BYTES supported"
                ))
                .into());
            }
        }
    }

    let (props, connection_type_flag, schema_registry_conn_ref) =
        resolve_connection_ref_and_secret_ref(
            WithOptions::try_from(value.row_options.as_slice())?,
            session,
            Some(TelemetryDatabaseObject::Sink),
        )?;
    ensure_connection_type_allowed(
        connection_type_flag,
        &SINK_ALLOWED_CONNECTION_SCHEMA_REGISTRY,
    )?;
    let (mut options, secret_refs) = props.into_parts();

    options
        .entry(TimestamptzHandlingMode::OPTION_KEY.to_owned())
        .or_insert(TimestamptzHandlingMode::FRONTEND_DEFAULT.to_owned());

    Ok(SinkFormatDesc {
        format,
        encode,
        options,
        secret_refs,
        key_encode,
        connection_id: schema_registry_conn_ref,
    })
}

static CONNECTORS_COMPATIBLE_FORMATS: LazyLock<HashMap<String, HashMap<Format, Vec<Encode>>>> =
    LazyLock::new(|| {
        use risingwave_connector::sink::Sink as _;
        use risingwave_connector::sink::file_sink::azblob::AzblobSink;
        use risingwave_connector::sink::file_sink::fs::FsSink;
        use risingwave_connector::sink::file_sink::gcs::GcsSink;
        use risingwave_connector::sink::file_sink::opendal_sink::FileSink;
        use risingwave_connector::sink::file_sink::s3::{S3Sink, SnowflakeSink};
        use risingwave_connector::sink::file_sink::webhdfs::WebhdfsSink;
        use risingwave_connector::sink::google_pubsub::GooglePubSubSink;
        use risingwave_connector::sink::kafka::KafkaSink;
        use risingwave_connector::sink::kinesis::KinesisSink;
        use risingwave_connector::sink::mqtt::MqttSink;
        use risingwave_connector::sink::pulsar::PulsarSink;
        use risingwave_connector::sink::redis::RedisSink;

        convert_args!(hashmap!(
            GooglePubSubSink::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Json],
            ),
            KafkaSink::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Json, Encode::Avro, Encode::Protobuf],
                Format::Upsert => vec![Encode::Json, Encode::Avro, Encode::Protobuf],
                Format::Debezium => vec![Encode::Json],
            ),
            FileSink::<S3Sink>::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Parquet, Encode::Json],
            ),
            FileSink::<SnowflakeSink>::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Parquet, Encode::Json],
            ),
            FileSink::<GcsSink>::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Parquet, Encode::Json],
            ),
            FileSink::<AzblobSink>::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Parquet, Encode::Json],
            ),
            FileSink::<WebhdfsSink>::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Parquet, Encode::Json],
            ),
            FileSink::<FsSink>::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Parquet, Encode::Json],
            ),
            KinesisSink::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Json],
                Format::Upsert => vec![Encode::Json],
                Format::Debezium => vec![Encode::Json],
            ),
            MqttSink::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Json, Encode::Protobuf],
            ),
            PulsarSink::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Json],
                Format::Upsert => vec![Encode::Json],
                Format::Debezium => vec![Encode::Json],
            ),
            RedisSink::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Json, Encode::Template],
                Format::Upsert => vec![Encode::Json, Encode::Template],
            ),
        ))
    });

pub fn validate_compatibility(connector: &str, format_desc: &FormatEncodeOptions) -> Result<()> {
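    // Reject FORMAT/ENCODE combinations that the connector does not support.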
    let compatible_formats = CONNECTORS_COMPATIBLE_FORMATS
        .get(connector)
        .ok_or_else(|| {
            ErrorCode::BindError(format!(
                "connector {} is not supported by FORMAT ... ENCODE ... syntax",
                connector
            ))
        })?;
    let compatible_encodes = compatible_formats.get(&format_desc.format).ok_or_else(|| {
        ErrorCode::BindError(format!(
            "connector {} does not support format {:?}",
            connector, format_desc.format
        ))
    })?;
    if !compatible_encodes.contains(&format_desc.row_encode) {
        return Err(ErrorCode::BindError(format!(
            "connector {} does not support format {:?} with encode {:?}",
            connector, format_desc.format, format_desc.row_encode
        ))
        .into());
    }

    if let Some(encode) = &format_desc.key_encode
        && connector != KAFKA_SINK
        && matches!(encode, Encode::Bytes)
    {
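        // `ENCODE BYTES` for the key is only supported by the Kafka sink.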
        return Err(ErrorCode::BindError(format!(
            "key encode bytes only works with kafka connector, but found {}",
            connector
        ))
        .into());
    }

    Ok(())
}

#[cfg(test)]
pub mod tests {
    use risingwave_common::catalog::{DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME};

    use crate::catalog::root_catalog::SchemaPath;
    use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};

    #[tokio::test]
    async fn test_create_sink_handler() {
        let proto_file = create_proto_file(PROTO_FILE_DATA);
        let sql = format!(
            r#"CREATE SOURCE t1
    WITH (connector = 'kafka', kafka.topic = 'abc', kafka.brokers = 'localhost:1001')
    FORMAT PLAIN ENCODE PROTOBUF (message = '.test.TestRecord', schema.location = 'file://{}')"#,
            proto_file.path().to_str().unwrap()
        );
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(sql).await.unwrap();

        let sql = "create materialized view mv1 as select t1.country from t1;";
        frontend.run_sql(sql).await.unwrap();

        let sql = r#"CREATE SINK snk1 FROM mv1
            WITH (connector = 'jdbc', mysql.endpoint = '127.0.0.1:3306', mysql.table =
            '<table_name>', mysql.database = '<database_name>', mysql.user = '<user_name>',
            mysql.password = '<password>', type = 'append-only', force_append_only = 'true');"#.to_owned();
        frontend.run_sql(sql).await.unwrap();

        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        let (source, _) = catalog_reader
            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t1")
            .unwrap();
        assert_eq!(source.name, "t1");

        let (table, schema_name) = catalog_reader
            .get_created_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "mv1")
            .unwrap();
        assert_eq!(table.name(), "mv1");

        let (sink, _) = catalog_reader
            .get_created_sink_by_name(DEFAULT_DATABASE_NAME, SchemaPath::Name(schema_name), "snk1")
            .unwrap();
        assert_eq!(sink.name, "snk1");
    }
}