use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::sync::{Arc, LazyLock};

use anyhow::Context;
use either::Either;
use iceberg::arrow::type_to_arrow_type;
use iceberg::spec::Transform;
use itertools::Itertools;
use maplit::{convert_args, hashmap, hashset};
use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::array::arrow::IcebergArrowConvert;
use risingwave_common::array::arrow::arrow_schema_iceberg::DataType as ArrowDataType;
use risingwave_common::bail;
use risingwave_common::catalog::{
    ColumnCatalog, ICEBERG_SINK_PREFIX, ObjectId, RISINGWAVE_ICEBERG_ROW_ID, ROW_ID_COLUMN_NAME,
    Schema,
};
use risingwave_common::license::Feature;
use risingwave_common::secret::LocalSecretManager;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::types::DataType;
use risingwave_connector::sink::catalog::{SinkCatalog, SinkFormatDesc};
use risingwave_connector::sink::file_sink::s3::SnowflakeSink;
use risingwave_connector::sink::iceberg::{ICEBERG_SINK, IcebergConfig};
use risingwave_connector::sink::kafka::KAFKA_SINK;
use risingwave_connector::sink::snowflake_redshift::redshift::RedshiftSink;
use risingwave_connector::sink::snowflake_redshift::snowflake::SnowflakeV2Sink;
use risingwave_connector::sink::{
    CONNECTOR_TYPE_KEY, SINK_SNAPSHOT_OPTION, SINK_TYPE_OPTION, SINK_USER_FORCE_APPEND_ONLY_OPTION,
    SINK_USER_IGNORE_DELETE_OPTION, Sink, enforce_secret_sink,
};
use risingwave_connector::{
    AUTO_SCHEMA_CHANGE_KEY, SINK_CREATE_TABLE_IF_NOT_EXISTS_KEY, SINK_INTERMEDIATE_TABLE_NAME,
    SINK_TARGET_TABLE_NAME, WithPropertiesExt,
};
use risingwave_pb::catalog::connection_params::PbConnectionType;
use risingwave_pb::telemetry::TelemetryDatabaseObject;
use risingwave_sqlparser::ast::{
    CreateSink, CreateSinkStatement, EmitMode, Encode, ExplainOptions, Format, FormatEncodeOptions,
    ObjectName, Query, Statement,
};
use risingwave_sqlparser::parser::Parser;

use super::RwPgResponse;
use super::create_mv::get_column_names;
use super::create_source::UPSTREAM_SOURCE_KEY;
use super::util::gen_query_from_table_name;
use crate::binder::{Binder, Relation};
use crate::catalog::table_catalog::TableType;
use crate::error::{ErrorCode, Result, RwError};
use crate::expr::{ExprImpl, InputRef, rewrite_now_to_proctime};
use crate::handler::HandlerArgs;
use crate::handler::alter_table_column::fetch_table_catalog_for_alter;
use crate::handler::create_mv::parse_column_names;
use crate::handler::util::{
    LongRunningNotificationAction, check_connector_match_connection_type,
    ensure_connection_type_allowed, execute_with_long_running_notification,
    get_table_catalog_by_table_name, reject_internal_table_dependencies,
};
use crate::optimizer::backfill_order_strategy::plan_backfill_order;
use crate::optimizer::plan_node::{
    IcebergPartitionInfo, LogicalSource, PartitionComputeInfo, StreamPlanRef as PlanRef,
    StreamProject, generic,
};
use crate::optimizer::{OptimizerContext, RelationCollectorVisitor};
use crate::scheduler::streaming_manager::CreatingStreamingJobInfo;
use crate::session::SessionImpl;
use crate::session::current::notice_to_user;
use crate::stream_fragmenter::{GraphJobType, build_graph_with_strategy};
use crate::utils::{resolve_connection_ref_and_secret_ref, resolve_privatelink_in_with_option};
use crate::{Explain, Planner, TableCatalog, WithOptions, WithOptionsSecResolved};

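/// Connection types that a sink's `connection = ...` option may reference.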
static SINK_ALLOWED_CONNECTION_CONNECTOR: LazyLock<HashSet<PbConnectionType>> =
    LazyLock::new(|| {
        hashset! {
            PbConnectionType::Unspecified,
            PbConnectionType::Kafka,
            PbConnectionType::Iceberg,
            PbConnectionType::Elasticsearch,
        }
    });

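/// Connection types that may be referenced from `FORMAT ... ENCODE ...`
/// options (i.e., schema-registry style connections).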
static SINK_ALLOWED_CONNECTION_SCHEMA_REGISTRY: LazyLock<HashSet<PbConnectionType>> =
    LazyLock::new(|| {
        hashset! {
            PbConnectionType::Unspecified,
            PbConnectionType::SchemaRegistry,
        }
    });

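/// Everything produced by [`gen_sink_plan`] that the `CREATE SINK` handler
/// needs: the bound query, the stream plan, the sink catalog entry, the
/// optional sink-into-table target, and the objects the sink depends on.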
pub struct SinkPlanContext {
    pub query: Box<Query>,
    pub sink_plan: PlanRef,
    pub sink_catalog: SinkCatalog,
    pub target_table_catalog: Option<Arc<TableCatalog>>,
    pub dependencies: HashSet<ObjectId>,
}

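/// Plans a `CREATE SINK` statement and produces a [`SinkPlanContext`].
///
/// An illustrative statement shape (not exhaustive):
///
/// ```sql
/// CREATE SINK snk FROM mv
/// WITH (connector = 'kafka', ...)
/// FORMAT PLAIN ENCODE JSON;
/// ```
///
/// `explain_options` is `Some` when invoked from `EXPLAIN CREATE SINK`;
/// `is_iceberg_engine_internal` is set when planning the hidden sink that
/// backs a table using the Iceberg engine.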
pub async fn gen_sink_plan(
    handler_args: HandlerArgs,
    stmt: CreateSinkStatement,
    explain_options: Option<ExplainOptions>,
    is_iceberg_engine_internal: bool,
) -> Result<SinkPlanContext> {
    let session = handler_args.session.clone();
    let session = session.as_ref();
    let user_specified_columns = !stmt.columns.is_empty();
    let db_name = &session.database();
    let (sink_schema_name, sink_table_name) =
        Binder::resolve_schema_qualified_name(db_name, &stmt.sink_name)?;

    let mut with_options = handler_args.with_options.clone();

    if session
        .env()
        .system_params_manager()
        .get_params()
        .load()
        .enforce_secret()
        && Feature::SecretManagement.check_available().is_ok()
    {
        enforce_secret_sink(&with_options)?;
    }

    resolve_privatelink_in_with_option(&mut with_options)?;
    let (mut resolved_with_options, connection_type, connector_conn_ref) =
        resolve_connection_ref_and_secret_ref(
            with_options,
            session,
            Some(TelemetryDatabaseObject::Sink),
        )?;
    ensure_connection_type_allowed(connection_type, &SINK_ALLOWED_CONNECTION_CONNECTOR)?;

    if !matches!(connection_type, PbConnectionType::Unspecified) {
        let Some(connector) = resolved_with_options.get_connector() else {
            return Err(RwError::from(ErrorCode::ProtocolError(format!(
                "missing field '{}' in WITH clause",
                CONNECTOR_TYPE_KEY
            ))));
        };
        check_connector_match_connection_type(connector.as_str(), &connection_type)?;
    }

    let partition_info = get_partition_compute_info(&resolved_with_options).await?;

    let context = if let Some(explain_options) = explain_options {
        OptimizerContext::new(handler_args.clone(), explain_options)
    } else {
        OptimizerContext::from_handler_args(handler_args.clone())
    };

    let is_auto_schema_change = resolved_with_options
        .remove(AUTO_SCHEMA_CHANGE_KEY)
        .map(|value| {
            value.parse::<bool>().map_err(|_| {
                ErrorCode::InvalidInputSyntax(format!(
                    "invalid value {} of '{}' option, expected a boolean",
                    value, AUTO_SCHEMA_CHANGE_KEY
                ))
            })
        })
        .transpose()?
        .unwrap_or(false);

    if is_auto_schema_change {
        Feature::SinkAutoSchemaChange.check_available()?;
    }

    let sink_into_table_name = stmt.into_table_name.as_ref().map(|name| name.real_value());
    if sink_into_table_name.is_some() {
        let prev = resolved_with_options.insert(CONNECTOR_TYPE_KEY.to_owned(), "table".to_owned());

        if prev.is_some() {
            return Err(RwError::from(ErrorCode::BindError(
                "When sinking into a table, the 'connector' parameter must not be provided."
                    .to_owned(),
            )));
        }
    }
    let connector = resolved_with_options
        .get(CONNECTOR_TYPE_KEY)
        .cloned()
        .ok_or_else(|| ErrorCode::BindError(format!("missing field '{CONNECTOR_TYPE_KEY}'")))?;

    let sink_from_table_name;
    let direct_sink_from_name: Option<(ObjectName, bool)>;
    let mut query = match stmt.sink_from {
        CreateSink::From(from_name) => {
            sink_from_table_name = from_name.0.last().unwrap().real_value();
            direct_sink_from_name = Some((from_name.clone(), is_auto_schema_change));
            if is_auto_schema_change && sink_into_table_name.is_some() {
                return Err(RwError::from(ErrorCode::InvalidInputSyntax(
                    "auto schema change is not supported for sink-into-table".to_owned(),
                )));
            }
            if resolved_with_options
                .value_eq_ignore_case(SINK_CREATE_TABLE_IF_NOT_EXISTS_KEY, "true")
                && (connector == RedshiftSink::SINK_NAME
                    || connector == SnowflakeV2Sink::SINK_NAME)
            {
                if let Some(table_name) = resolved_with_options.get(SINK_TARGET_TABLE_NAME) {
                    // Generate a unique intermediate table name if the user did not provide one.
                    if resolved_with_options
                        .get(SINK_INTERMEDIATE_TABLE_NAME)
                        .is_none()
                    {
                        let intermediate_table_name = format!(
                            "rw_{}_{}_{}",
                            sink_table_name,
                            table_name,
                            uuid::Uuid::new_v4()
                        );
                        resolved_with_options.insert(
                            SINK_INTERMEDIATE_TABLE_NAME.to_owned(),
                            intermediate_table_name,
                        );
                    }
                } else {
                    return Err(RwError::from(ErrorCode::BindError(
                        "'table.name' option must be specified.".to_owned(),
                    )));
                }
            }
            Box::new(gen_query_from_table_name(from_name))
        }
        CreateSink::AsQuery(query) => {
            if is_auto_schema_change {
                return Err(RwError::from(ErrorCode::InvalidInputSyntax(
                    "auto schema change is not supported for CREATE SINK AS QUERY".to_owned(),
                )));
            }
            sink_from_table_name = sink_table_name.clone();
            direct_sink_from_name = None;
            query
        }
    };

    if is_iceberg_engine_internal && let Some((from_name, _)) = &direct_sink_from_name {
        let (table, _) = get_table_catalog_by_table_name(session, from_name)?;
        let pk_names = table.pk_column_names();
        if pk_names.len() == 1 && pk_names[0].eq(ROW_ID_COLUMN_NAME) {
            let [stmt]: [_; 1] = Parser::parse_sql(&format!(
                "select {} as {}, * from {}",
                ROW_ID_COLUMN_NAME, RISINGWAVE_ICEBERG_ROW_ID, from_name
            ))
            .context("unable to parse query")?
            .try_into()
            .unwrap();
            let Statement::Query(parsed_query) = stmt else {
                panic!("unexpected statement: {:?}", stmt);
            };
            query = parsed_query;
        }
    }

    let (sink_database_id, sink_schema_id) =
        session.get_database_and_schema_id_for_create(sink_schema_name.clone())?;

    let (
        dependent_relations,
        dependent_udfs,
        dependent_secrets,
        bound,
        auto_refresh_schema_from_table,
    ) = {
        let mut binder = Binder::new_for_stream(session);
        let auto_refresh_schema_from_table = if let Some((from_name, true)) = &direct_sink_from_name
        {
            let from_relation = binder.bind_relation_by_name(from_name, None, None, true)?;
            if let Relation::BaseTable(table) = from_relation {
                if table.table_catalog.table_type != TableType::Table {
                    return Err(ErrorCode::InvalidInputSyntax(format!(
                        "auto schema change is only supported on TABLE, but got {:?}",
                        table.table_catalog.table_type
                    ))
                    .into());
                }
                if table.table_catalog.database_id != sink_database_id {
                    return Err(ErrorCode::InvalidInputSyntax(
                        "auto schema change sinks cannot be created from a cross-database table"
                            .to_owned(),
                    )
                    .into());
                }
                for col in &table.table_catalog.columns {
                    if !col.is_hidden() && (col.is_generated() || col.is_rw_sys_column()) {
                        return Err(ErrorCode::InvalidInputSyntax(format!(
                            "auto schema change is not supported for tables with non-hidden generated or system columns, but got {}",
                            col.name()
                        ))
                        .into());
                    }
                }
                Some(table.table_catalog)
            } else {
                return Err(RwError::from(ErrorCode::NotSupported(
                    "auto schema change is only supported for TABLE".to_owned(),
                    "try recreating the sink from a table".to_owned(),
                )));
            }
        } else {
            None
        };

        let bound = binder.bind_query(&query)?;

        (
            binder.included_relations().clone(),
            binder.included_udfs().clone(),
            binder.included_secrets().clone(),
            bound,
            auto_refresh_schema_from_table,
        )
    };

    reject_internal_table_dependencies(session, &dependent_relations, "CREATE SINK")?;

    let col_names = if sink_into_table_name.is_some() {
        parse_column_names(&stmt.columns)
    } else {
        get_column_names(&bound, stmt.columns)?
    };

    let emit_on_window_close = stmt.emit_mode == Some(EmitMode::OnWindowClose);
    if emit_on_window_close {
        context.warn_to_user("EMIT ON WINDOW CLOSE is currently an experimental feature. Please use it with caution.");
    }

    let format_desc = match stmt.sink_schema {
        Some(f) => {
            validate_compatibility(&connector, &f)?;
            Some(bind_sink_format_desc(session, f)?)
        }
        None => match resolved_with_options.get(SINK_TYPE_OPTION) {
            Some(t) => SinkFormatDesc::from_legacy_type(&connector, t)?.map(|mut f| {
                session.notice_to_user("Consider using the newer syntax `FORMAT ... ENCODE ...` instead of `type = '...'`.");
                if let Some(v) = resolved_with_options.get(SINK_USER_FORCE_APPEND_ONLY_OPTION) {
                    f.options.insert(SINK_USER_FORCE_APPEND_ONLY_OPTION.into(), v.into());
                }
                if let Some(v) = resolved_with_options.get(SINK_USER_IGNORE_DELETE_OPTION) {
                    f.options.insert(SINK_USER_IGNORE_DELETE_OPTION.into(), v.into());
                }
                f
            }),
            None => None,
        },
    };

    let definition = context.normalized_sql().to_owned();
    let mut plan_root = if is_iceberg_engine_internal {
        Planner::new_for_iceberg_table_engine_sink(context.into()).plan_query(bound)?
    } else {
        Planner::new_for_stream(context.into()).plan_query(bound)?
    };
    if let Some(col_names) = &col_names {
        plan_root.set_out_names(col_names.clone())?;
    }

    let without_backfill = match resolved_with_options.remove(SINK_SNAPSHOT_OPTION) {
        Some(flag) if flag.eq_ignore_ascii_case("false") => {
            if direct_sink_from_name.is_some() || is_iceberg_engine_internal {
                true
            } else {
                return Err(ErrorCode::BindError(
                    "`snapshot = false` is only supported for `CREATE SINK FROM MV or TABLE`"
                        .to_owned(),
                )
                .into());
            }
        }
        _ => false,
    };

    let target_table_catalog = stmt
        .into_table_name
        .as_ref()
        .map(|table_name| fetch_table_catalog_for_alter(session, table_name).map(|t| t.0))
        .transpose()?;

    if let Some(target_table_catalog) = &target_table_catalog {
        if let Some(col_names) = col_names {
            let target_table_columns = target_table_catalog
                .columns()
                .iter()
                .map(|c| c.name())
                .collect::<BTreeSet<_>>();
            for c in col_names {
                if !target_table_columns.contains(c.as_str()) {
                    return Err(RwError::from(ErrorCode::BindError(format!(
                        "Column {} not found in table {}",
                        c,
                        target_table_catalog.name()
                    ))));
                }
            }
        }
        if target_table_catalog
            .columns()
            .iter()
            .any(|col| !col.nullable())
        {
            notice_to_user(format!(
                "The target table `{}` contains columns with NOT NULL constraints. Any sink rows violating these constraints will be silently ignored.",
                target_table_catalog.name(),
            ));
        }
    }

    let allow_snapshot_backfill = target_table_catalog.is_none() && !is_iceberg_engine_internal;

    let sink_plan = plan_root.gen_sink_plan(
        sink_table_name,
        definition,
        resolved_with_options,
        emit_on_window_close,
        db_name.to_owned(),
        sink_from_table_name,
        format_desc,
        without_backfill,
        target_table_catalog.clone(),
        partition_info,
        user_specified_columns,
        auto_refresh_schema_from_table,
        allow_snapshot_backfill,
    )?;

    let sink_desc = sink_plan.sink_desc().clone();

    let mut sink_plan: PlanRef = sink_plan.into_stream_plan()?;

    let ctx = sink_plan.ctx();
    let explain_trace = ctx.is_explain_trace();
    if explain_trace {
        ctx.trace("Create Sink:");
        ctx.trace(sink_plan.explain_to_string());
    }
    tracing::trace!("sink_plan: {:?}", sink_plan.explain_to_string());

    let dependencies =
        RelationCollectorVisitor::collect_with(dependent_relations, sink_plan.clone())
            .into_iter()
            .chain(dependent_udfs.iter().copied().map_into())
            .chain(
                dependent_secrets
                    .iter()
                    .copied()
                    .map(|id| id.as_object_id()),
            )
            .collect();

    let sink_catalog = sink_desc.into_catalog(
        sink_schema_id,
        sink_database_id,
        session.user_id(),
        connector_conn_ref,
    );

    if let Some(table_catalog) = &target_table_catalog {
        for column in sink_catalog.full_columns() {
            if !column.can_dml() {
                unreachable!(
                    "generated columns and the system column `_rw_timestamp` are never derived into a sink's catalog, but one was found"
                );
            }
        }

        let table_columns_without_rw_timestamp = table_catalog.columns_without_rw_timestamp();
        let exprs = derive_default_column_project_for_sink(
            &sink_catalog,
            sink_plan.schema(),
            &table_columns_without_rw_timestamp,
            user_specified_columns,
        )?;

        let logical_project = generic::Project::new(exprs, sink_plan);

        sink_plan = StreamProject::new(logical_project).into();

        let exprs = LogicalSource::derive_output_exprs_from_generated_columns(
            &table_columns_without_rw_timestamp,
        )?;

        if let Some(exprs) = exprs {
            let logical_project = generic::Project::new(exprs, sink_plan);
            sink_plan = StreamProject::new(logical_project).into();
        }
    }

    Ok(SinkPlanContext {
        query,
        sink_plan,
        sink_catalog,
        target_table_catalog,
        dependencies,
    })
}

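/// Inspects the sink's `WITH` options and, for connectors that support it
/// (currently only Iceberg), derives how rows should be pre-partitioned
/// before being written by the sink.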
pub async fn get_partition_compute_info(
    with_options: &WithOptionsSecResolved,
) -> Result<Option<PartitionComputeInfo>> {
    let (options, secret_refs) = with_options.clone().into_parts();
    let Some(connector) = options.get(UPSTREAM_SOURCE_KEY).cloned() else {
        return Ok(None);
    };
    let properties = LocalSecretManager::global().fill_secrets(options, secret_refs)?;
    match connector.as_str() {
        ICEBERG_SINK => {
            let iceberg_config = IcebergConfig::from_btreemap(properties)?;
            get_partition_compute_info_for_iceberg(&iceberg_config).await
        }
        _ => Ok(None),
    }
}

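/// Derives partition info from the Iceberg table's default partition spec.
/// Returns `Ok(None)` when pre-partitioning does not apply: the table is yet
/// to be created, it is unpartitioned, or it has no sparse (identity /
/// truncate / bucket) partition field.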
async fn get_partition_compute_info_for_iceberg(
    iceberg_config: &IcebergConfig,
) -> Result<Option<PartitionComputeInfo>> {
    // The table does not exist yet, so there is no partition spec to read.
    if iceberg_config.create_table_if_not_exists {
        return Ok(None);
    }
    let table = iceberg_config.load_table().await?;
    let partition_spec = table.metadata().default_partition_spec();
    if partition_spec.is_unpartitioned() {
        return Ok(None);
    }

    // Only sparse transforms benefit from computing partition values upfront;
    // temporal transforms (year/month/day/hour) and void do not.
    let has_sparse_partition = partition_spec.fields().iter().any(|f| match f.transform {
        Transform::Identity | Transform::Truncate(_) | Transform::Bucket(_) => true,
        Transform::Year
        | Transform::Month
        | Transform::Day
        | Transform::Hour
        | Transform::Void
        | Transform::Unknown => false,
    });
    if !has_sparse_partition {
        return Ok(None);
    }

    let arrow_type = type_to_arrow_type(&iceberg::spec::Type::Struct(
        table.metadata().default_partition_type().clone(),
    ))
    .map_err(|_| {
        RwError::from(ErrorCode::SinkError(
            "Fail to convert iceberg partition type to arrow type".into(),
        ))
    })?;
    let ArrowDataType::Struct(struct_fields) = arrow_type else {
        return Err(RwError::from(ErrorCode::SinkError(
            "Partition type of iceberg should be a struct type".into(),
        )));
    };

    let schema = table.metadata().current_schema();
    let partition_fields = partition_spec
        .fields()
        .iter()
        .map(|f| {
            let source_f =
                schema
                    .field_by_id(f.source_id)
                    .ok_or(RwError::from(ErrorCode::SinkError(
                        "Fail to look up iceberg partition field".into(),
                    )))?;
            Ok((source_f.name.clone(), f.transform))
        })
        .collect::<Result<Vec<_>>>()?;

    Ok(Some(PartitionComputeInfo::Iceberg(IcebergPartitionInfo {
        partition_type: IcebergArrowConvert.struct_from_fields(&struct_fields)?,
        partition_fields,
    })))
}

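/// Handler for `CREATE SINK`: plans the sink, builds the streaming job graph
/// (with backfill ordering), and submits it to the meta service, keeping the
/// user notified while the job is being created.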
pub async fn handle_create_sink(
    handle_args: HandlerArgs,
    stmt: CreateSinkStatement,
    is_iceberg_engine_internal: bool,
) -> Result<RwPgResponse> {
    let session = handle_args.session.clone();

    session.check_cluster_limits().await?;

    let if_not_exists = stmt.if_not_exists;
    if let Either::Right(resp) = session.check_relation_name_duplicated(
        stmt.sink_name.clone(),
        StatementType::CREATE_SINK,
        if_not_exists,
    )? {
        return Ok(resp);
    }

    if stmt.sink_name.base_name().starts_with(ICEBERG_SINK_PREFIX) {
        return Err(RwError::from(ErrorCode::InvalidInputSyntax(format!(
            "Sink name cannot start with reserved prefix '{}'",
            ICEBERG_SINK_PREFIX
        ))));
    }

    let (mut sink, graph, target_table_catalog, dependencies) = {
        let backfill_order_strategy = handle_args.with_options.backfill_order_strategy();

        let SinkPlanContext {
            query,
            sink_plan: plan,
            sink_catalog: sink,
            target_table_catalog,
            dependencies,
        } = gen_sink_plan(handle_args, stmt, None, is_iceberg_engine_internal).await?;

        let has_order_by = !query.order_by.is_empty();
        if has_order_by {
            plan.ctx().warn_to_user(
                r#"The ORDER BY clause in the CREATE SINK statement has no effect at all."#
                    .to_owned(),
            );
        }

        let backfill_order =
            plan_backfill_order(session.as_ref(), backfill_order_strategy, plan.clone())?;

        let graph =
            build_graph_with_strategy(plan, Some(GraphJobType::Sink), Some(backfill_order))?;

        (sink, graph, target_table_catalog, dependencies)
    };

    if let Some(table_catalog) = target_table_catalog {
        sink.original_target_columns = table_catalog.columns_without_rw_timestamp();
    }

    let _job_guard =
        session
            .env()
            .creating_streaming_job_tracker()
            .guard(CreatingStreamingJobInfo::new(
                session.session_id(),
                sink.database_id,
                sink.schema_id,
                sink.name.clone(),
            ));

    let catalog_writer = session.catalog_writer()?;
    execute_with_long_running_notification(
        catalog_writer.create_sink(sink.to_proto(), graph, dependencies, if_not_exists),
        &session,
        "CREATE SINK",
        LongRunningNotificationAction::MonitorBackfillJob,
    )
    .await?;

    Ok(PgResponse::empty_result(StatementType::CREATE_SINK))
}

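/// Returns the catalogs of all sinks that write into the given table, i.e.,
/// sinks created with `CREATE SINK ... INTO <table>`.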
pub fn fetch_incoming_sinks(
    session: &Arc<SessionImpl>,
    table: &TableCatalog,
) -> Result<Vec<Arc<SinkCatalog>>> {
    let reader = session.env().catalog_reader().read_guard();
    let schema = reader.get_schema_by_id(table.database_id, table.schema_id)?;
    let Some(incoming_sinks) = schema.table_incoming_sinks(table.id) else {
        return Ok(vec![]);
    };
    let mut sinks = vec![];
    for sink_id in incoming_sinks {
        sinks.push(
            schema
                .get_sink_by_id(*sink_id)
                .expect("should exist")
                .clone(),
        );
    }
    Ok(sinks)
}

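/// Builds an `InputRef` onto the sink column at `idx`, failing if its type
/// does not match the target table column's type.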
fn derive_sink_to_table_expr(
    sink_schema: &Schema,
    idx: usize,
    target_type: &DataType,
) -> Result<ExprImpl> {
    let input_type = &sink_schema.fields()[idx].data_type;

    if !target_type.equals_datatype(input_type) {
        bail!(
            "column type mismatch: {:?} vs {:?}, column name: {:?}",
            target_type,
            input_type,
            sink_schema.fields()[idx].name
        );
    } else {
        Ok(ExprImpl::InputRef(Box::new(InputRef::new(
            idx,
            input_type.clone(),
        ))))
    }
}

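/// Derives the projection that maps a sink's output rows onto the target
/// table's columns for `CREATE SINK ... INTO <table>`. Columns the sink does
/// not provide are filled with the table's default expressions, with `now()`
/// rewritten to `proctime()`.
///
/// A sketch of the two matching modes (illustrative, not a doctest; the
/// variable names are hypothetical):
///
/// ```ignore
/// // CREATE SINK s INTO t (a, b) FROM ...  -- user specified columns: match by name
/// let exprs = derive_default_column_project_for_sink(&sink, schema, &columns, true)?;
/// // CREATE SINK s INTO t FROM ...         -- otherwise: match by position
/// let exprs = derive_default_column_project_for_sink(&sink, schema, &columns, false)?;
/// ```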
pub(crate) fn derive_default_column_project_for_sink(
    sink: &SinkCatalog,
    sink_schema: &Schema,
    columns: &[ColumnCatalog],
    user_specified_columns: bool,
) -> Result<Vec<ExprImpl>> {
    assert_eq!(sink.full_schema().len(), sink_schema.len());

    let default_column_exprs = TableCatalog::default_column_exprs(columns);

    let mut exprs = vec![];

    let sink_visible_col_idxes = sink
        .full_columns()
        .iter()
        .positions(|c| !c.is_hidden())
        .collect_vec();
    let sink_visible_col_idxes_by_name = sink
        .full_columns()
        .iter()
        .enumerate()
        .filter(|(_, c)| !c.is_hidden())
        .map(|(i, c)| (c.name(), i))
        .collect::<BTreeMap<_, _>>();

    for (idx, column) in columns.iter().enumerate() {
        if !column.can_dml() {
            continue;
        }

        let default_col_expr =
            || -> ExprImpl { rewrite_now_to_proctime(default_column_exprs[idx].clone()) };

        let sink_col_expr = |sink_col_idx: usize| -> Result<ExprImpl> {
            derive_sink_to_table_expr(sink_schema, sink_col_idx, column.data_type())
        };

        // Match by name when the user specified a column list, otherwise by position.
        #[allow(clippy::collapsible_else_if)]
        if user_specified_columns {
            if let Some(idx) = sink_visible_col_idxes_by_name.get(column.name()) {
                exprs.push(sink_col_expr(*idx)?);
            } else {
                exprs.push(default_col_expr());
            }
        } else {
            if idx < sink_visible_col_idxes.len() {
                exprs.push(sink_col_expr(sink_visible_col_idxes[idx])?);
            } else {
                exprs.push(default_col_expr());
            }
        }
    }
    Ok(exprs)
}

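/// Binds a `FORMAT ... ENCODE ...` clause into a [`SinkFormatDesc`], resolving
/// any referenced schema-registry connection and secrets, and defaulting the
/// timestamptz handling option if unset.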
fn bind_sink_format_desc(
    session: &SessionImpl,
    value: FormatEncodeOptions,
) -> Result<SinkFormatDesc> {
    use risingwave_connector::sink::catalog::{SinkEncode, SinkFormat};
    use risingwave_connector::sink::encoder::TimestamptzHandlingMode;
    use risingwave_sqlparser::ast::{Encode as E, Format as F};

    let format = match value.format {
        F::Plain => SinkFormat::AppendOnly,
        F::Upsert => SinkFormat::Upsert,
        F::Debezium => SinkFormat::Debezium,
        f @ (F::Native | F::DebeziumMongo | F::Maxwell | F::Canal | F::None) => {
            return Err(ErrorCode::BindError(format!("sink format unsupported: {f}")).into());
        }
    };
    let encode = match value.row_encode {
        E::Json => SinkEncode::Json,
        E::Protobuf => SinkEncode::Protobuf,
        E::Avro => SinkEncode::Avro,
        E::Template => SinkEncode::Template,
        E::Parquet => SinkEncode::Parquet,
        E::Bytes => SinkEncode::Bytes,
        e @ (E::Native | E::Csv | E::None | E::Text) => {
            return Err(ErrorCode::BindError(format!("sink encode unsupported: {e}")).into());
        }
    };

    let mut key_encode = None;
    if let Some(encode) = value.key_encode {
        match encode {
            E::Text => key_encode = Some(SinkEncode::Text),
            E::Bytes => key_encode = Some(SinkEncode::Bytes),
            _ => {
                return Err(ErrorCode::BindError(format!(
                    "sink key encode unsupported: {encode}, only TEXT and BYTES supported"
                ))
                .into());
            }
        }
    }

    let (props, connection_type_flag, schema_registry_conn_ref) =
        resolve_connection_ref_and_secret_ref(
            WithOptions::try_from(value.row_options.as_slice())?,
            session,
            Some(TelemetryDatabaseObject::Sink),
        )?;
    ensure_connection_type_allowed(
        connection_type_flag,
        &SINK_ALLOWED_CONNECTION_SCHEMA_REGISTRY,
    )?;
    let (mut options, secret_refs) = props.into_parts();

    options
        .entry(TimestamptzHandlingMode::OPTION_KEY.to_owned())
        .or_insert(TimestamptzHandlingMode::FRONTEND_DEFAULT.to_owned());

    Ok(SinkFormatDesc {
        format,
        encode,
        options,
        secret_refs,
        key_encode,
        connection_id: schema_registry_conn_ref,
    })
}

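/// The `FORMAT ... ENCODE ...` combinations accepted by each connector; used
/// by [`validate_compatibility`].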
static CONNECTORS_COMPATIBLE_FORMATS: LazyLock<HashMap<String, HashMap<Format, Vec<Encode>>>> =
    LazyLock::new(|| {
        use risingwave_connector::sink::Sink as _;
        use risingwave_connector::sink::file_sink::azblob::AzblobSink;
        use risingwave_connector::sink::file_sink::fs::FsSink;
        use risingwave_connector::sink::file_sink::gcs::GcsSink;
        use risingwave_connector::sink::file_sink::opendal_sink::FileSink;
        use risingwave_connector::sink::file_sink::s3::S3Sink;
        use risingwave_connector::sink::file_sink::webhdfs::WebhdfsSink;
        use risingwave_connector::sink::google_pubsub::GooglePubSubSink;
        use risingwave_connector::sink::kafka::KafkaSink;
        use risingwave_connector::sink::kinesis::KinesisSink;
        use risingwave_connector::sink::mqtt::MqttSink;
        use risingwave_connector::sink::pulsar::PulsarSink;
        use risingwave_connector::sink::redis::RedisSink;

        convert_args!(hashmap!(
            GooglePubSubSink::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Json],
            ),
            KafkaSink::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Json, Encode::Avro, Encode::Protobuf, Encode::Bytes],
                Format::Upsert => vec![Encode::Json, Encode::Avro, Encode::Protobuf],
                Format::Debezium => vec![Encode::Json],
            ),
            FileSink::<S3Sink>::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Parquet, Encode::Json],
            ),
            FileSink::<SnowflakeSink>::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Parquet, Encode::Json],
            ),
            FileSink::<GcsSink>::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Parquet, Encode::Json],
            ),
            FileSink::<AzblobSink>::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Parquet, Encode::Json],
            ),
            FileSink::<WebhdfsSink>::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Parquet, Encode::Json],
            ),
            FileSink::<FsSink>::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Parquet, Encode::Json],
            ),
            KinesisSink::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Json],
                Format::Upsert => vec![Encode::Json],
                Format::Debezium => vec![Encode::Json],
            ),
            MqttSink::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Json, Encode::Protobuf],
            ),
            PulsarSink::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Json],
                Format::Upsert => vec![Encode::Json],
                Format::Debezium => vec![Encode::Json],
            ),
            RedisSink::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Json, Encode::Template],
                Format::Upsert => vec![Encode::Json, Encode::Template],
            ),
        ))
    });

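/// Checks that the requested `FORMAT ... ENCODE ...` combination is allowed
/// for the given connector, per [`CONNECTORS_COMPATIBLE_FORMATS`]. For
/// example (illustrative only; the descriptor variables are hypothetical):
///
/// ```ignore
/// // FORMAT UPSERT ENCODE AVRO is accepted for Kafka...
/// validate_compatibility("kafka", &upsert_avro_desc)?;
/// // ...but FORMAT DEBEZIUM is rejected for Redis.
/// assert!(validate_compatibility("redis", &debezium_desc).is_err());
/// ```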
pub fn validate_compatibility(connector: &str, format_desc: &FormatEncodeOptions) -> Result<()> {
    let compatible_formats = CONNECTORS_COMPATIBLE_FORMATS
        .get(connector)
        .ok_or_else(|| {
            ErrorCode::BindError(format!(
                "connector {} is not supported by FORMAT ... ENCODE ... syntax",
                connector
            ))
        })?;
    let compatible_encodes = compatible_formats.get(&format_desc.format).ok_or_else(|| {
        ErrorCode::BindError(format!(
            "connector {} does not support format {:?}",
            connector, format_desc.format
        ))
    })?;
    if !compatible_encodes.contains(&format_desc.row_encode) {
        return Err(ErrorCode::BindError(format!(
            "connector {} does not support format {:?} with encode {:?}",
            connector, format_desc.format, format_desc.row_encode
        ))
        .into());
    }

    if let Some(encode) = &format_desc.key_encode
        && connector != KAFKA_SINK
        && matches!(encode, Encode::Bytes)
    {
        return Err(ErrorCode::BindError(format!(
            "KEY ENCODE BYTES only works with the kafka connector, but found {}",
            connector
        ))
        .into());
    }

    Ok(())
}

#[cfg(test)]
pub mod tests {
    use risingwave_common::catalog::{DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME};

    use crate::catalog::root_catalog::SchemaPath;
    use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};

    #[tokio::test]
    async fn test_create_sink_handler() {
        let proto_file = create_proto_file(PROTO_FILE_DATA);
        let sql = format!(
            r#"CREATE SOURCE t1
    WITH (connector = 'kafka', kafka.topic = 'abc', kafka.brokers = 'localhost:1001')
    FORMAT PLAIN ENCODE PROTOBUF (message = '.test.TestRecord', schema.location = 'file://{}')"#,
            proto_file.path().to_str().unwrap()
        );
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(sql).await.unwrap();

        let sql = "create materialized view mv1 as select t1.country from t1;";
        frontend.run_sql(sql).await.unwrap();

        let sql = r#"CREATE SINK snk1 FROM mv1
            WITH (connector = 'jdbc', mysql.endpoint = '127.0.0.1:3306', mysql.table =
            '<table_name>', mysql.database = '<database_name>', mysql.user = '<user_name>',
            mysql.password = '<password>', type = 'append-only', force_append_only = 'true');"#
            .to_owned();
        frontend.run_sql(sql).await.unwrap();

        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        // Check that the source exists.
        let (source, _) = catalog_reader
            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t1")
            .unwrap();
        assert_eq!(source.name, "t1");

        // Check that the materialized view exists.
        let (table, schema_name) = catalog_reader
            .get_created_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "mv1")
            .unwrap();
        assert_eq!(table.name(), "mv1");

        // Check that the sink exists.
        let (sink, _) = catalog_reader
            .get_created_sink_by_name(DEFAULT_DATABASE_NAME, SchemaPath::Name(schema_name), "snk1")
            .unwrap();
        assert_eq!(sink.name, "snk1");
    }
}
1009}