use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::sync::{Arc, LazyLock};

use either::Either;
use iceberg::arrow::type_to_arrow_type;
use iceberg::spec::Transform;
use itertools::Itertools;
use maplit::{convert_args, hashmap, hashset};
use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::array::arrow::IcebergArrowConvert;
use risingwave_common::array::arrow::arrow_schema_iceberg::DataType as ArrowDataType;
use risingwave_common::bail;
use risingwave_common::catalog::{ColumnCatalog, ICEBERG_SINK_PREFIX, ObjectId, Schema, UserId};
use risingwave_common::license::Feature;
use risingwave_common::secret::LocalSecretManager;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::types::DataType;
use risingwave_connector::sink::catalog::{SinkCatalog, SinkFormatDesc};
use risingwave_connector::sink::file_sink::s3::SnowflakeSink;
use risingwave_connector::sink::iceberg::{ICEBERG_SINK, IcebergConfig};
use risingwave_connector::sink::kafka::KAFKA_SINK;
use risingwave_connector::sink::snowflake_redshift::redshift::RedshiftSink;
use risingwave_connector::sink::snowflake_redshift::snowflake::SnowflakeV2Sink;
use risingwave_connector::sink::{
    CONNECTOR_TYPE_KEY, SINK_SNAPSHOT_OPTION, SINK_TYPE_OPTION, SINK_USER_FORCE_APPEND_ONLY_OPTION,
    SINK_USER_IGNORE_DELETE_OPTION, Sink, enforce_secret_sink,
};
use risingwave_connector::{
    AUTO_SCHEMA_CHANGE_KEY, SINK_CREATE_TABLE_IF_NOT_EXISTS_KEY, SINK_INTERMEDIATE_TABLE_NAME,
    SINK_TARGET_TABLE_NAME, WithPropertiesExt,
};
use risingwave_pb::catalog::connection_params::PbConnectionType;
use risingwave_pb::telemetry::TelemetryDatabaseObject;
use risingwave_sqlparser::ast::{
    CreateSink, CreateSinkStatement, EmitMode, Encode, ExplainOptions, Format, FormatEncodeOptions,
    ObjectName, Query,
};

use super::RwPgResponse;
use super::create_mv::get_column_names;
use super::create_source::UPSTREAM_SOURCE_KEY;
use super::util::gen_query_from_table_name;
use crate::binder::{Binder, Relation};
use crate::catalog::table_catalog::TableType;
use crate::error::{ErrorCode, Result, RwError};
use crate::expr::{ExprImpl, InputRef, rewrite_now_to_proctime};
use crate::handler::HandlerArgs;
use crate::handler::alter_table_column::fetch_table_catalog_for_alter;
use crate::handler::create_mv::parse_column_names;
use crate::handler::util::{
    LongRunningNotificationAction, check_connector_match_connection_type,
    ensure_connection_type_allowed, execute_with_long_running_notification,
};
use crate::optimizer::backfill_order_strategy::plan_backfill_order;
use crate::optimizer::plan_node::{
    IcebergPartitionInfo, LogicalSource, PartitionComputeInfo, StreamPlanRef as PlanRef,
    StreamProject, generic,
};
use crate::optimizer::{OptimizerContext, RelationCollectorVisitor};
use crate::scheduler::streaming_manager::CreatingStreamingJobInfo;
use crate::session::SessionImpl;
use crate::session::current::notice_to_user;
use crate::stream_fragmenter::{GraphJobType, build_graph_with_strategy};
use crate::utils::{resolve_connection_ref_and_secret_ref, resolve_privatelink_in_with_option};
use crate::{Explain, Planner, TableCatalog, WithOptions, WithOptionsSecResolved};

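/// Connection types that the `connection = ...` reference in a sink's WITH clause is allowed to
/// resolve to.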
static SINK_ALLOWED_CONNECTION_CONNECTOR: LazyLock<HashSet<PbConnectionType>> =
    LazyLock::new(|| {
        hashset! {
            PbConnectionType::Unspecified,
            PbConnectionType::Kafka,
            PbConnectionType::Iceberg,
            PbConnectionType::Elasticsearch,
        }
    });

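/// Connection types that may back the schema-registry connection referenced in a sink's
/// `FORMAT ... ENCODE ...` options.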
static SINK_ALLOWED_CONNECTION_SCHEMA_REGISTRY: LazyLock<HashSet<PbConnectionType>> =
    LazyLock::new(|| {
        hashset! {
            PbConnectionType::Unspecified,
            PbConnectionType::SchemaRegistry,
        }
    });

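/// Everything produced by planning a `CREATE SINK` statement: the bound query, the stream plan,
/// the sink catalog entry, the target table (for sink-into-table), and the objects the sink
/// depends on.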
pub struct SinkPlanContext {
    pub query: Box<Query>,
    pub sink_plan: PlanRef,
    pub sink_catalog: SinkCatalog,
    pub target_table_catalog: Option<Arc<TableCatalog>>,
    pub dependencies: HashSet<ObjectId>,
}

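/// Plans a `CREATE SINK` statement: resolves WITH options, connections, and secrets, binds the
/// `FROM` relation or `AS` query, and builds the stream plan together with the sink catalog
/// entry. Also validates sink-into-table and auto-schema-change restrictions.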
pub async fn gen_sink_plan(
    handler_args: HandlerArgs,
    stmt: CreateSinkStatement,
    explain_options: Option<ExplainOptions>,
    is_iceberg_engine_internal: bool,
) -> Result<SinkPlanContext> {
    let session = handler_args.session.clone();
    let session = session.as_ref();
    let user_specified_columns = !stmt.columns.is_empty();
    let db_name = &session.database();
    let (sink_schema_name, sink_table_name) =
        Binder::resolve_schema_qualified_name(db_name, &stmt.sink_name)?;

    let mut with_options = handler_args.with_options.clone();

    if session
        .env()
        .system_params_manager()
        .get_params()
        .load()
        .enforce_secret()
        && Feature::SecretManagement.check_available().is_ok()
    {
        enforce_secret_sink(&with_options)?;
    }

    resolve_privatelink_in_with_option(&mut with_options)?;
    let (mut resolved_with_options, connection_type, connector_conn_ref) =
        resolve_connection_ref_and_secret_ref(
            with_options,
            session,
            Some(TelemetryDatabaseObject::Sink),
        )?;
    ensure_connection_type_allowed(connection_type, &SINK_ALLOWED_CONNECTION_CONNECTOR)?;

    if !matches!(connection_type, PbConnectionType::Unspecified) {
        let Some(connector) = resolved_with_options.get_connector() else {
            return Err(RwError::from(ErrorCode::ProtocolError(format!(
                "missing field '{}' in WITH clause",
                CONNECTOR_TYPE_KEY
            ))));
        };
        check_connector_match_connection_type(connector.as_str(), &connection_type)?;
    }

    let partition_info = get_partition_compute_info(&resolved_with_options).await?;

    let context = if let Some(explain_options) = explain_options {
        OptimizerContext::new(handler_args.clone(), explain_options)
    } else {
        OptimizerContext::from_handler_args(handler_args.clone())
    };

    let is_auto_schema_change = resolved_with_options
        .remove(AUTO_SCHEMA_CHANGE_KEY)
        .map(|value| {
            value.parse::<bool>().map_err(|_| {
                ErrorCode::InvalidInputSyntax(format!(
                    "invalid value {} of '{}' option, expected a boolean",
                    value, AUTO_SCHEMA_CHANGE_KEY
                ))
            })
        })
        .transpose()?
        .unwrap_or(false);

    if is_auto_schema_change {
        Feature::SinkAutoSchemaChange.check_available()?;
    }

    let sink_into_table_name = stmt.into_table_name.as_ref().map(|name| name.real_value());
    if sink_into_table_name.is_some() {
        let prev = resolved_with_options.insert(CONNECTOR_TYPE_KEY.to_owned(), "table".to_owned());

        if prev.is_some() {
            return Err(RwError::from(ErrorCode::BindError(
                "When sinking into a table, the 'connector' parameter must not be provided.".to_owned(),
            )));
        }
    }
    let connector = resolved_with_options
        .get(CONNECTOR_TYPE_KEY)
        .cloned()
        .ok_or_else(|| ErrorCode::BindError(format!("missing field '{CONNECTOR_TYPE_KEY}'")))?;
    let sink_from_table_name;
    let direct_sink_from_name: Option<(ObjectName, bool)>;
    let query = match stmt.sink_from {
        CreateSink::From(from_name) => {
            sink_from_table_name = from_name.0.last().unwrap().real_value();
            direct_sink_from_name = Some((from_name.clone(), is_auto_schema_change));
            if is_auto_schema_change && sink_into_table_name.is_some() {
                return Err(RwError::from(ErrorCode::InvalidInputSyntax(
                    "auto schema change not supported for sink-into-table".to_owned(),
                )));
            }
            // Redshift / SnowflakeV2 sinks with `create_table_if_not_exists = true` get an
            // auto-generated intermediate table name if none was given.
            if resolved_with_options
                .value_eq_ignore_case(SINK_CREATE_TABLE_IF_NOT_EXISTS_KEY, "true")
                && (connector == RedshiftSink::SINK_NAME
                    || connector == SnowflakeV2Sink::SINK_NAME)
            {
                if let Some(table_name) = resolved_with_options.get(SINK_TARGET_TABLE_NAME) {
                    if resolved_with_options
                        .get(SINK_INTERMEDIATE_TABLE_NAME)
                        .is_none()
                    {
                        let intermediate_table_name = format!(
                            "rw_{}_{}_{}",
                            sink_table_name,
                            table_name,
                            uuid::Uuid::new_v4()
                        );
                        resolved_with_options.insert(
                            SINK_INTERMEDIATE_TABLE_NAME.to_owned(),
                            intermediate_table_name,
                        );
                    }
                } else {
                    return Err(RwError::from(ErrorCode::BindError(
                        "'table.name' option must be specified.".to_owned(),
                    )));
                }
            }
            Box::new(gen_query_from_table_name(from_name))
        }
        CreateSink::AsQuery(query) => {
            if is_auto_schema_change {
                return Err(RwError::from(ErrorCode::InvalidInputSyntax(
                    "auto schema change not supported for CREATE SINK AS QUERY".to_owned(),
                )));
            }
            sink_from_table_name = sink_table_name.clone();
            direct_sink_from_name = None;
            query
        }
    };

    let (sink_database_id, sink_schema_id) =
        session.get_database_and_schema_id_for_create(sink_schema_name.clone())?;

    let (dependent_relations, dependent_udfs, bound, auto_refresh_schema_from_table) = {
        let mut binder = Binder::new_for_stream(session);
        let auto_refresh_schema_from_table = if let Some((from_name, true)) = &direct_sink_from_name
        {
            let from_relation = binder.bind_relation_by_name(from_name, None, None, true)?;
            if let Relation::BaseTable(table) = from_relation {
                if table.table_catalog.table_type != TableType::Table {
                    return Err(ErrorCode::InvalidInputSyntax(format!(
                        "auto schema change is only supported on TABLE, but got {:?}",
                        table.table_catalog.table_type
                    ))
                    .into());
                }
                if table.table_catalog.database_id != sink_database_id {
                    return Err(ErrorCode::InvalidInputSyntax(
                        "auto schema change sink cannot be created from a cross-database table".to_owned()
                    )
                    .into());
                }
                for col in &table.table_catalog.columns {
                    if !col.is_hidden() && (col.is_generated() || col.is_rw_sys_column()) {
                        return Err(ErrorCode::InvalidInputSyntax(format!("auto schema change is not supported for tables with non-hidden generated or system columns, but got {}", col.name())).into());
                    }
                }
                Some(table.table_catalog)
            } else {
                return Err(RwError::from(ErrorCode::NotSupported(
                    "auto schema change is only supported for TABLE".to_owned(),
                    "try recreating the sink from a table".to_owned(),
                )));
            }
        } else {
            None
        };

        let bound = binder.bind_query(&query)?;

        (
            binder.included_relations().clone(),
            binder.included_udfs().clone(),
            bound,
            auto_refresh_schema_from_table,
        )
    };

    let col_names = if sink_into_table_name.is_some() {
        parse_column_names(&stmt.columns)
    } else {
        get_column_names(&bound, stmt.columns)?
    };

    let emit_on_window_close = stmt.emit_mode == Some(EmitMode::OnWindowClose);
    if emit_on_window_close {
        context.warn_to_user("EMIT ON WINDOW CLOSE is currently an experimental feature. Please use it with caution.");
    }

    let format_desc = match stmt.sink_schema {
        Some(f) => {
            validate_compatibility(&connector, &f)?;
            Some(bind_sink_format_desc(session, f)?)
        }
        None => match resolved_with_options.get(SINK_TYPE_OPTION) {
            Some(t) => SinkFormatDesc::from_legacy_type(&connector, t)?.map(|mut f| {
                session.notice_to_user("Consider using the newer syntax `FORMAT ... ENCODE ...` instead of `type = '...'`.");
                if let Some(v) = resolved_with_options.get(SINK_USER_FORCE_APPEND_ONLY_OPTION) {
                    f.options.insert(SINK_USER_FORCE_APPEND_ONLY_OPTION.into(), v.into());
                }
                if let Some(v) = resolved_with_options.get(SINK_USER_IGNORE_DELETE_OPTION) {
                    f.options.insert(SINK_USER_IGNORE_DELETE_OPTION.into(), v.into());
                }
                f
            }),
            None => None,
        },
    };

    let definition = context.normalized_sql().to_owned();
    let mut plan_root = if is_iceberg_engine_internal {
        Planner::new_for_iceberg_table_engine_sink(context.into()).plan_query(bound)?
    } else {
        Planner::new_for_stream(context.into()).plan_query(bound)?
    };
    if let Some(col_names) = &col_names {
        plan_root.set_out_names(col_names.clone())?;
    }

    let without_backfill = match resolved_with_options.remove(SINK_SNAPSHOT_OPTION) {
        Some(flag) if flag.eq_ignore_ascii_case("false") => {
            if direct_sink_from_name.is_some() || is_iceberg_engine_internal {
                true
            } else {
                return Err(ErrorCode::BindError(
                    "`snapshot = false` is only supported with `CREATE SINK FROM MV or TABLE`".to_owned(),
                )
                .into());
            }
        }
        _ => false,
    };

    let target_table_catalog = stmt
        .into_table_name
        .as_ref()
        .map(|table_name| fetch_table_catalog_for_alter(session, table_name).map(|t| t.0))
        .transpose()?;

    if let Some(target_table_catalog) = &target_table_catalog {
        if let Some(col_names) = col_names {
            let target_table_columns = target_table_catalog
                .columns()
                .iter()
                .map(|c| c.name())
                .collect::<BTreeSet<_>>();
            for c in col_names {
                if !target_table_columns.contains(c.as_str()) {
                    return Err(RwError::from(ErrorCode::BindError(format!(
                        "Column {} not found in table {}",
                        c,
                        target_table_catalog.name()
                    ))));
                }
            }
        }
        if target_table_catalog
            .columns()
            .iter()
            .any(|col| !col.nullable())
        {
            notice_to_user(format!(
                "The target table `{}` contains columns with NOT NULL constraints. Any rows written by the sink that violate these constraints will be silently ignored.",
                target_table_catalog.name(),
            ));
        }
    }

    let allow_snapshot_backfill = target_table_catalog.is_none() && !is_iceberg_engine_internal;

    let sink_plan = plan_root.gen_sink_plan(
        sink_table_name,
        definition,
        resolved_with_options,
        emit_on_window_close,
        db_name.to_owned(),
        sink_from_table_name,
        format_desc,
        without_backfill,
        target_table_catalog.clone(),
        partition_info,
        user_specified_columns,
        auto_refresh_schema_from_table,
        allow_snapshot_backfill,
    )?;

    let sink_desc = sink_plan.sink_desc().clone();

    let mut sink_plan: PlanRef = sink_plan.into();

    let ctx = sink_plan.ctx();
    let explain_trace = ctx.is_explain_trace();
    if explain_trace {
        ctx.trace("Create Sink:");
        ctx.trace(sink_plan.explain_to_string());
    }
    tracing::trace!("sink_plan: {:?}", sink_plan.explain_to_string());

    let dependencies =
        RelationCollectorVisitor::collect_with(dependent_relations, sink_plan.clone())
            .into_iter()
            .chain(dependent_udfs.iter().copied().map_into())
            .collect();

    let sink_catalog = sink_desc.into_catalog(
        sink_schema_id,
        sink_database_id,
        UserId::new(session.user_id()),
        connector_conn_ref,
    );

    if let Some(table_catalog) = &target_table_catalog {
        for column in sink_catalog.full_columns() {
            if !column.can_dml() {
                unreachable!(
                    "cannot derive generated columns or the system column `_rw_timestamp` in a sink's catalog, but found one"
                );
            }
        }

        let table_columns_without_rw_timestamp = table_catalog.columns_without_rw_timestamp();
        let exprs = derive_default_column_project_for_sink(
            &sink_catalog,
            sink_plan.schema(),
            &table_columns_without_rw_timestamp,
            user_specified_columns,
        )?;

        let logical_project = generic::Project::new(exprs, sink_plan);

        sink_plan = StreamProject::new(logical_project).into();

        let exprs = LogicalSource::derive_output_exprs_from_generated_columns(
            &table_columns_without_rw_timestamp,
        )?;

        if let Some(exprs) = exprs {
            let logical_project = generic::Project::new(exprs, sink_plan);
            sink_plan = StreamProject::new(logical_project).into();
        }
    }

    Ok(SinkPlanContext {
        query,
        sink_plan,
        sink_catalog,
        target_table_catalog,
        dependencies,
    })
}

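/// Inspects the sink's WITH options and, for Iceberg sinks, pre-computes the partition metadata
/// via [`get_partition_compute_info_for_iceberg`]. Returns `None` for all other connectors.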
pub async fn get_partition_compute_info(
    with_options: &WithOptionsSecResolved,
) -> Result<Option<PartitionComputeInfo>> {
    let (options, secret_refs) = with_options.clone().into_parts();
    let Some(connector) = options.get(UPSTREAM_SOURCE_KEY).cloned() else {
        return Ok(None);
    };
    let properties = LocalSecretManager::global().fill_secrets(options, secret_refs)?;
    match connector.as_str() {
        ICEBERG_SINK => {
            let iceberg_config = IcebergConfig::from_btreemap(properties)?;
            get_partition_compute_info_for_iceberg(&iceberg_config).await
        }
        _ => Ok(None),
    }
}

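/// Loads the Iceberg table and, when its default partition spec contains a "sparse" transform
/// (identity, truncate, or bucket), returns the partition fields together with their Arrow
/// struct type. Returns `None` for tables created on the fly, unpartitioned tables, or specs
/// that only use temporal/void transforms.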
#[allow(clippy::unused_async)]
async fn get_partition_compute_info_for_iceberg(
    iceberg_config: &IcebergConfig,
) -> Result<Option<PartitionComputeInfo>> {
    if iceberg_config.create_table_if_not_exists {
        return Ok(None);
    }
    let table = iceberg_config.load_table().await?;
    let partition_spec = table.metadata().default_partition_spec();
    if partition_spec.is_unpartitioned() {
        return Ok(None);
    }

    let has_sparse_partition = partition_spec.fields().iter().any(|f| match f.transform {
        Transform::Identity | Transform::Truncate(_) | Transform::Bucket(_) => true,
        Transform::Year
        | Transform::Month
        | Transform::Day
        | Transform::Hour
        | Transform::Void
        | Transform::Unknown => false,
    });
    if !has_sparse_partition {
        return Ok(None);
    }

    let arrow_type = type_to_arrow_type(&iceberg::spec::Type::Struct(
        table.metadata().default_partition_type().clone(),
    ))
    .map_err(|_| {
        RwError::from(ErrorCode::SinkError(
            "Failed to convert iceberg partition type to arrow type".into(),
        ))
    })?;
    let ArrowDataType::Struct(struct_fields) = arrow_type else {
        return Err(RwError::from(ErrorCode::SinkError(
            "Partition type of iceberg should be a struct type".into(),
        )));
    };

    let schema = table.metadata().current_schema();
    let partition_fields = partition_spec
        .fields()
        .iter()
        .map(|f| {
            let source_f =
                schema
                    .field_by_id(f.source_id)
                    .ok_or(RwError::from(ErrorCode::SinkError(
                        "Failed to look up iceberg partition field".into(),
                    )))?;
            Ok((source_f.name.clone(), f.transform))
        })
        .collect::<Result<Vec<_>>>()?;

    Ok(Some(PartitionComputeInfo::Iceberg(IcebergPartitionInfo {
        partition_type: IcebergArrowConvert.struct_from_fields(&struct_fields)?,
        partition_fields,
    })))
}

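/// Handles `CREATE SINK`: checks cluster limits and name conflicts, plans the sink via
/// [`gen_sink_plan`], builds the streaming graph (with backfill ordering), and submits the job
/// through the catalog writer.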
pub async fn handle_create_sink(
    handle_args: HandlerArgs,
    stmt: CreateSinkStatement,
    is_iceberg_engine_internal: bool,
) -> Result<RwPgResponse> {
    let session = handle_args.session.clone();

    session.check_cluster_limits().await?;

    let if_not_exists = stmt.if_not_exists;
    if let Either::Right(resp) = session.check_relation_name_duplicated(
        stmt.sink_name.clone(),
        StatementType::CREATE_SINK,
        if_not_exists,
    )? {
        return Ok(resp);
    }

    if stmt.sink_name.base_name().starts_with(ICEBERG_SINK_PREFIX) {
        return Err(RwError::from(ErrorCode::InvalidInputSyntax(format!(
            "Sink name cannot start with reserved prefix '{}'",
            ICEBERG_SINK_PREFIX
        ))));
    }

    let (mut sink, graph, target_table_catalog, dependencies) = {
        let backfill_order_strategy = handle_args.with_options.backfill_order_strategy();

        let SinkPlanContext {
            query,
            sink_plan: plan,
            sink_catalog: sink,
            target_table_catalog,
            dependencies,
        } = gen_sink_plan(handle_args, stmt, None, is_iceberg_engine_internal).await?;

        let has_order_by = !query.order_by.is_empty();
        if has_order_by {
            plan.ctx().warn_to_user(
                r#"The ORDER BY clause in the CREATE SINK statement has no effect at all."#
                    .to_owned(),
            );
        }

        let backfill_order =
            plan_backfill_order(session.as_ref(), backfill_order_strategy, plan.clone())?;

        let graph =
            build_graph_with_strategy(plan, Some(GraphJobType::Sink), Some(backfill_order))?;

        (sink, graph, target_table_catalog, dependencies)
    };

    if let Some(table_catalog) = target_table_catalog {
        sink.original_target_columns = table_catalog.columns_without_rw_timestamp();
    }

    let _job_guard =
        session
            .env()
            .creating_streaming_job_tracker()
            .guard(CreatingStreamingJobInfo::new(
                session.session_id(),
                sink.database_id,
                sink.schema_id,
                sink.name.clone(),
            ));

    let catalog_writer = session.catalog_writer()?;
    execute_with_long_running_notification(
        catalog_writer.create_sink(sink.to_proto(), graph, dependencies, if_not_exists),
        &session,
        "CREATE SINK",
        LongRunningNotificationAction::MonitorBackfillJob,
    )
    .await?;

    Ok(PgResponse::empty_result(StatementType::CREATE_SINK))
}

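/// Returns the catalogs of all sinks that currently write into the given table
/// (i.e. sinks created with `CREATE SINK ... INTO <table>`).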
pub fn fetch_incoming_sinks(
    session: &Arc<SessionImpl>,
    table: &TableCatalog,
) -> Result<Vec<Arc<SinkCatalog>>> {
    let reader = session.env().catalog_reader().read_guard();
    let schema = reader.get_schema_by_id(table.database_id, table.schema_id)?;
    let Some(incoming_sinks) = schema.table_incoming_sinks(table.id) else {
        return Ok(vec![]);
    };
    let mut sinks = vec![];
    for sink_id in incoming_sinks {
        sinks.push(
            schema
                .get_sink_by_id(*sink_id)
                .expect("should exist")
                .clone(),
        );
    }
    Ok(sinks)
}

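/// Builds the expression that maps sink column `idx` onto a target-table column, returning an
/// `InputRef` when the data types match exactly and an error otherwise.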
fn derive_sink_to_table_expr(
    sink_schema: &Schema,
    idx: usize,
    target_type: &DataType,
) -> Result<ExprImpl> {
    let input_type = &sink_schema.fields()[idx].data_type;

    if !target_type.equals_datatype(input_type) {
        bail!(
            "column type mismatch: {:?} vs {:?}, column name: {:?}",
            target_type,
            input_type,
            sink_schema.fields()[idx].name
        );
    } else {
        Ok(ExprImpl::InputRef(Box::new(InputRef::new(
            idx,
            input_type.clone(),
        ))))
    }
}

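/// For a sink-into-table job, derives one projection expression per writable target-table
/// column: a reference to the matching sink column (matched by name when the user listed
/// columns explicitly, by position otherwise), or the column's default expression (with
/// `now()` rewritten to `proctime()`) when the sink does not provide it.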
pub(crate) fn derive_default_column_project_for_sink(
    sink: &SinkCatalog,
    sink_schema: &Schema,
    columns: &[ColumnCatalog],
    user_specified_columns: bool,
) -> Result<Vec<ExprImpl>> {
    assert_eq!(sink.full_schema().len(), sink_schema.len());

    let default_column_exprs = TableCatalog::default_column_exprs(columns);

    let mut exprs = vec![];

    let sink_visible_col_idxes = sink
        .full_columns()
        .iter()
        .positions(|c| !c.is_hidden())
        .collect_vec();
    let sink_visible_col_idxes_by_name = sink
        .full_columns()
        .iter()
        .enumerate()
        .filter(|(_, c)| !c.is_hidden())
        .map(|(i, c)| (c.name(), i))
        .collect::<BTreeMap<_, _>>();

    for (idx, column) in columns.iter().enumerate() {
        if !column.can_dml() {
            continue;
        }

        let default_col_expr =
            || -> ExprImpl { rewrite_now_to_proctime(default_column_exprs[idx].clone()) };

        let sink_col_expr = |sink_col_idx: usize| -> Result<ExprImpl> {
            derive_sink_to_table_expr(sink_schema, sink_col_idx, column.data_type())
        };

        #[allow(clippy::collapsible_else_if)]
        if user_specified_columns {
            if let Some(idx) = sink_visible_col_idxes_by_name.get(column.name()) {
                exprs.push(sink_col_expr(*idx)?);
            } else {
                exprs.push(default_col_expr());
            }
        } else {
            if idx < sink_visible_col_idxes.len() {
                exprs.push(sink_col_expr(sink_visible_col_idxes[idx])?);
            } else {
                exprs.push(default_col_expr());
            }
        }
    }
    Ok(exprs)
}

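/// Binds the `FORMAT ... ENCODE ... (...)` clause of a sink into a [`SinkFormatDesc`],
/// rejecting unsupported format/encode combinations, resolving any schema-registry connection
/// and secrets in the options, and defaulting the timestamptz handling mode.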
fn bind_sink_format_desc(
    session: &SessionImpl,
    value: FormatEncodeOptions,
) -> Result<SinkFormatDesc> {
    use risingwave_connector::sink::catalog::{SinkEncode, SinkFormat};
    use risingwave_connector::sink::encoder::TimestamptzHandlingMode;
    use risingwave_sqlparser::ast::{Encode as E, Format as F};

    let format = match value.format {
        F::Plain => SinkFormat::AppendOnly,
        F::Upsert => SinkFormat::Upsert,
        F::Debezium => SinkFormat::Debezium,
        f @ (F::Native | F::DebeziumMongo | F::Maxwell | F::Canal | F::None) => {
            return Err(ErrorCode::BindError(format!("sink format unsupported: {f}")).into());
        }
    };
    let encode = match value.row_encode {
        E::Json => SinkEncode::Json,
        E::Protobuf => SinkEncode::Protobuf,
        E::Avro => SinkEncode::Avro,
        E::Template => SinkEncode::Template,
        E::Parquet => SinkEncode::Parquet,
        E::Bytes => SinkEncode::Bytes,
        e @ (E::Native | E::Csv | E::None | E::Text) => {
            return Err(ErrorCode::BindError(format!("sink encode unsupported: {e}")).into());
        }
    };

    let mut key_encode = None;
    if let Some(encode) = value.key_encode {
        match encode {
            E::Text => key_encode = Some(SinkEncode::Text),
            E::Bytes => key_encode = Some(SinkEncode::Bytes),
            _ => {
                return Err(ErrorCode::BindError(format!(
                    "sink key encode unsupported: {encode}, only TEXT and BYTES supported"
                ))
                .into());
            }
        }
    }

    let (props, connection_type_flag, schema_registry_conn_ref) =
        resolve_connection_ref_and_secret_ref(
            WithOptions::try_from(value.row_options.as_slice())?,
            session,
            Some(TelemetryDatabaseObject::Sink),
        )?;
    ensure_connection_type_allowed(
        connection_type_flag,
        &SINK_ALLOWED_CONNECTION_SCHEMA_REGISTRY,
    )?;
    let (mut options, secret_refs) = props.into_parts();

    options
        .entry(TimestamptzHandlingMode::OPTION_KEY.to_owned())
        .or_insert(TimestamptzHandlingMode::FRONTEND_DEFAULT.to_owned());

    Ok(SinkFormatDesc {
        format,
        encode,
        options,
        secret_refs,
        key_encode,
        connection_id: schema_registry_conn_ref,
    })
}

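/// For each connector that accepts the `FORMAT ... ENCODE ...` syntax, the formats it supports
/// and the encodes allowed for each format; consulted by [`validate_compatibility`].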
static CONNECTORS_COMPATIBLE_FORMATS: LazyLock<HashMap<String, HashMap<Format, Vec<Encode>>>> =
    LazyLock::new(|| {
        use risingwave_connector::sink::Sink as _;
        use risingwave_connector::sink::file_sink::azblob::AzblobSink;
        use risingwave_connector::sink::file_sink::fs::FsSink;
        use risingwave_connector::sink::file_sink::gcs::GcsSink;
        use risingwave_connector::sink::file_sink::opendal_sink::FileSink;
        use risingwave_connector::sink::file_sink::s3::S3Sink;
        use risingwave_connector::sink::file_sink::webhdfs::WebhdfsSink;
        use risingwave_connector::sink::google_pubsub::GooglePubSubSink;
        use risingwave_connector::sink::kafka::KafkaSink;
        use risingwave_connector::sink::kinesis::KinesisSink;
        use risingwave_connector::sink::mqtt::MqttSink;
        use risingwave_connector::sink::pulsar::PulsarSink;
        use risingwave_connector::sink::redis::RedisSink;

        convert_args!(hashmap!(
            GooglePubSubSink::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Json],
            ),
            KafkaSink::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Json, Encode::Avro, Encode::Protobuf, Encode::Bytes],
                Format::Upsert => vec![Encode::Json, Encode::Avro, Encode::Protobuf],
                Format::Debezium => vec![Encode::Json],
            ),
            FileSink::<S3Sink>::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Parquet, Encode::Json],
            ),
            FileSink::<SnowflakeSink>::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Parquet, Encode::Json],
            ),
            FileSink::<GcsSink>::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Parquet, Encode::Json],
            ),
            FileSink::<AzblobSink>::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Parquet, Encode::Json],
            ),
            FileSink::<WebhdfsSink>::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Parquet, Encode::Json],
            ),
            FileSink::<FsSink>::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Parquet, Encode::Json],
            ),
            KinesisSink::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Json],
                Format::Upsert => vec![Encode::Json],
                Format::Debezium => vec![Encode::Json],
            ),
            MqttSink::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Json, Encode::Protobuf],
            ),
            PulsarSink::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Json],
                Format::Upsert => vec![Encode::Json],
                Format::Debezium => vec![Encode::Json],
            ),
            RedisSink::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Json, Encode::Template],
                Format::Upsert => vec![Encode::Json, Encode::Template],
            ),
        ))
    });

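/// Checks that the connector supports the requested `FORMAT ... ENCODE ...` combination
/// (e.g. `FORMAT UPSERT ENCODE JSON` for Kafka) according to [`CONNECTORS_COMPATIBLE_FORMATS`],
/// and that `KEY ENCODE BYTES` is only used with the Kafka connector.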
pub fn validate_compatibility(connector: &str, format_desc: &FormatEncodeOptions) -> Result<()> {
    let compatible_formats = CONNECTORS_COMPATIBLE_FORMATS
        .get(connector)
        .ok_or_else(|| {
            ErrorCode::BindError(format!(
                "connector {} is not supported by FORMAT ... ENCODE ... syntax",
                connector
            ))
        })?;
    let compatible_encodes = compatible_formats.get(&format_desc.format).ok_or_else(|| {
        ErrorCode::BindError(format!(
            "connector {} does not support format {:?}",
            connector, format_desc.format
        ))
    })?;
    if !compatible_encodes.contains(&format_desc.row_encode) {
        return Err(ErrorCode::BindError(format!(
            "connector {} does not support format {:?} with encode {:?}",
            connector, format_desc.format, format_desc.row_encode
        ))
        .into());
    }

    if let Some(encode) = &format_desc.key_encode
        && connector != KAFKA_SINK
        && matches!(encode, Encode::Bytes)
    {
        return Err(ErrorCode::BindError(format!(
            "key encode bytes only works with kafka connector, but found {}",
            connector
        ))
        .into());
    }

    Ok(())
}

#[cfg(test)]
pub mod tests {
    use risingwave_common::catalog::{DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME};

    use crate::catalog::root_catalog::SchemaPath;
    use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};

    #[tokio::test]
    async fn test_create_sink_handler() {
        let proto_file = create_proto_file(PROTO_FILE_DATA);
        let sql = format!(
            r#"CREATE SOURCE t1
    WITH (connector = 'kafka', kafka.topic = 'abc', kafka.brokers = 'localhost:1001')
    FORMAT PLAIN ENCODE PROTOBUF (message = '.test.TestRecord', schema.location = 'file://{}')"#,
            proto_file.path().to_str().unwrap()
        );
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(sql).await.unwrap();

        let sql = "create materialized view mv1 as select t1.country from t1;";
        frontend.run_sql(sql).await.unwrap();

        let sql = r#"CREATE SINK snk1 FROM mv1
            WITH (connector = 'jdbc', mysql.endpoint = '127.0.0.1:3306', mysql.table =
                '<table_name>', mysql.database = '<database_name>', mysql.user = '<user_name>',
                mysql.password = '<password>', type = 'append-only', force_append_only = 'true');"#.to_owned();
        frontend.run_sql(sql).await.unwrap();

        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        let (source, _) = catalog_reader
            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t1")
            .unwrap();
        assert_eq!(source.name, "t1");

        let (table, schema_name) = catalog_reader
            .get_created_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "mv1")
            .unwrap();
        assert_eq!(table.name(), "mv1");

        let (sink, _) = catalog_reader
            .get_created_sink_by_name(DEFAULT_DATABASE_NAME, SchemaPath::Name(schema_name), "snk1")
            .unwrap();
        assert_eq!(sink.name, "snk1");
    }
}