use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::sync::{Arc, LazyLock};

use either::Either;
use iceberg::arrow::type_to_arrow_type;
use iceberg::spec::Transform;
use itertools::Itertools;
use maplit::{convert_args, hashmap, hashset};
use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::array::arrow::IcebergArrowConvert;
use risingwave_common::array::arrow::arrow_schema_iceberg::DataType as ArrowDataType;
use risingwave_common::bail;
use risingwave_common::catalog::{
    ColumnCatalog, ConnectionId, DatabaseId, ObjectId, Schema, SchemaId, UserId,
};
use risingwave_common::license::Feature;
use risingwave_common::secret::LocalSecretManager;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::types::DataType;
use risingwave_connector::WithPropertiesExt;
use risingwave_connector::sink::catalog::{SinkCatalog, SinkFormatDesc};
use risingwave_connector::sink::iceberg::{ICEBERG_SINK, IcebergConfig};
use risingwave_connector::sink::kafka::KAFKA_SINK;
use risingwave_connector::sink::{
    CONNECTOR_TYPE_KEY, SINK_SNAPSHOT_OPTION, SINK_TYPE_OPTION, SINK_USER_FORCE_APPEND_ONLY_OPTION,
    enforce_secret_sink,
};
use risingwave_pb::catalog::connection_params::PbConnectionType;
use risingwave_pb::catalog::{PbSink, PbSource, Table};
use risingwave_pb::ddl_service::{ReplaceJobPlan, TableJobType, replace_job_plan};
use risingwave_pb::stream_plan::stream_node::{NodeBody, PbNodeBody};
use risingwave_pb::stream_plan::{MergeNode, StreamFragmentGraph, StreamNode};
use risingwave_pb::telemetry::TelemetryDatabaseObject;
use risingwave_sqlparser::ast::{
    CreateSink, CreateSinkStatement, EmitMode, Encode, ExplainOptions, Format, FormatEncodeOptions,
    Query, Statement,
};

use super::RwPgResponse;
use super::create_mv::get_column_names;
use super::create_source::{SqlColumnStrategy, UPSTREAM_SOURCE_KEY};
use super::util::gen_query_from_table_name;
use crate::binder::Binder;
use crate::catalog::SinkId;
use crate::error::{ErrorCode, Result, RwError};
use crate::expr::{ExprImpl, InputRef, rewrite_now_to_proctime};
use crate::handler::HandlerArgs;
use crate::handler::alter_table_column::fetch_table_catalog_for_alter;
use crate::handler::create_mv::parse_column_names;
use crate::handler::create_table::{ColumnIdGenerator, generate_stream_graph_for_replace_table};
use crate::handler::util::{check_connector_match_connection_type, ensure_connection_type_allowed};
use crate::optimizer::plan_node::{
    IcebergPartitionInfo, LogicalSource, PartitionComputeInfo, StreamProject, generic,
};
use crate::optimizer::{OptimizerContext, PlanRef, RelationCollectorVisitor};
use crate::scheduler::streaming_manager::CreatingStreamingJobInfo;
use crate::session::SessionImpl;
use crate::session::current::notice_to_user;
use crate::stream_fragmenter::{GraphJobType, build_graph};
use crate::utils::{resolve_connection_ref_and_secret_ref, resolve_privatelink_in_with_option};
use crate::{Explain, Planner, TableCatalog, WithOptions, WithOptionsSecResolved};

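/// Connection types that a `CREATE SINK` statement may reference via a connector connection
/// (`Unspecified` covers sinks created without an explicit connection reference).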
static SINK_ALLOWED_CONNECTION_CONNECTOR: LazyLock<HashSet<PbConnectionType>> =
    LazyLock::new(|| {
        hashset! {
            PbConnectionType::Unspecified,
            PbConnectionType::Kafka,
            PbConnectionType::Iceberg,
            PbConnectionType::Elasticsearch,
        }
    });

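/// Connection types that may back the schema registry referenced in a sink's
/// `FORMAT ... ENCODE ...` clause (see [`bind_sink_format_desc`]).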
static SINK_ALLOWED_CONNECTION_SCHEMA_REGISTRY: LazyLock<HashSet<PbConnectionType>> =
    LazyLock::new(|| {
        hashset! {
            PbConnectionType::Unspecified,
            PbConnectionType::SchemaRegistry,
        }
    });

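/// Everything produced by [`gen_sink_plan`] that the `CREATE SINK` handler needs afterwards:
/// the bound query, the stream plan, the sink catalog entry, the target table catalog (for
/// `CREATE SINK ... INTO table`), and the object IDs the sink depends on.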
pub struct SinkPlanContext {
    pub query: Box<Query>,
    pub sink_plan: PlanRef,
    pub sink_catalog: SinkCatalog,
    pub target_table_catalog: Option<Arc<TableCatalog>>,
    pub dependencies: HashSet<ObjectId>,
}

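/// Binds and plans a `CREATE SINK` statement, producing a [`SinkPlanContext`].
///
/// Handles both `CREATE SINK ... FROM <relation>` and `CREATE SINK ... AS <query>`, resolves
/// connection and secret references in the `WITH` clause, binds the `FORMAT ... ENCODE ...`
/// options (or the legacy `type = '...'` option), and, for `CREATE SINK ... INTO <table>`,
/// validates the target columns. An illustrative (not exhaustive) invocation of the syntax
/// handled here:
///
/// ```sql
/// CREATE SINK snk FROM mv
/// WITH (connector = 'kafka', ...)
/// FORMAT PLAIN ENCODE JSON;
/// ```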
pub async fn gen_sink_plan(
    handler_args: HandlerArgs,
    stmt: CreateSinkStatement,
    explain_options: Option<ExplainOptions>,
    is_iceberg_engine_internal: bool,
) -> Result<SinkPlanContext> {
    let session = handler_args.session.clone();
    let session = session.as_ref();
    let user_specified_columns = !stmt.columns.is_empty();
    let db_name = &session.database();
    let (sink_schema_name, sink_table_name) =
        Binder::resolve_schema_qualified_name(db_name, stmt.sink_name.clone())?;

    let mut with_options = handler_args.with_options.clone();

    if session
        .env()
        .system_params_manager()
        .get_params()
        .load()
        .enforce_secret()
        && Feature::SecretManagement.check_available().is_ok()
    {
        enforce_secret_sink(&with_options)?;
    }

    resolve_privatelink_in_with_option(&mut with_options)?;
    let (mut resolved_with_options, connection_type, connector_conn_ref) =
        resolve_connection_ref_and_secret_ref(
            with_options,
            session,
            Some(TelemetryDatabaseObject::Sink),
        )?;
    ensure_connection_type_allowed(connection_type, &SINK_ALLOWED_CONNECTION_CONNECTOR)?;

    if !matches!(connection_type, PbConnectionType::Unspecified) {
        let Some(connector) = resolved_with_options.get_connector() else {
            return Err(RwError::from(ErrorCode::ProtocolError(format!(
                "missing field '{}' in WITH clause",
                CONNECTOR_TYPE_KEY
            ))));
        };
        check_connector_match_connection_type(connector.as_str(), &connection_type)?;
    }

    let partition_info = get_partition_compute_info(&resolved_with_options).await?;

    let context = if let Some(explain_options) = explain_options {
        OptimizerContext::new(handler_args.clone(), explain_options)
    } else {
        OptimizerContext::from_handler_args(handler_args.clone())
    };

    let sink_from_table_name;
    let direct_sink;
    let query = match stmt.sink_from {
        CreateSink::From(from_name) => {
            sink_from_table_name = from_name.0.last().unwrap().real_value();
            direct_sink = true;
            Box::new(gen_query_from_table_name(from_name))
        }
        CreateSink::AsQuery(query) => {
            sink_from_table_name = sink_table_name.clone();
            direct_sink = false;
            query
        }
    };

    let sink_into_table_name = stmt.into_table_name.as_ref().map(|name| name.real_value());

    let (sink_database_id, sink_schema_id) =
        session.get_database_and_schema_id_for_create(sink_schema_name.clone())?;

    let (dependent_relations, dependent_udfs, bound) = {
        let mut binder = Binder::new_for_stream(session);
        let bound = binder.bind_query(*query.clone())?;
        (
            binder.included_relations().clone(),
            binder.included_udfs().clone(),
            bound,
        )
    };

    let col_names = if sink_into_table_name.is_some() {
        parse_column_names(&stmt.columns)
    } else {
        get_column_names(&bound, stmt.columns)?
    };

    if sink_into_table_name.is_some() {
        let prev = resolved_with_options.insert(CONNECTOR_TYPE_KEY.to_owned(), "table".to_owned());

        if prev.is_some() {
            return Err(RwError::from(ErrorCode::BindError(
                "When sinking into a table, the 'connector' parameter must not be provided.".to_owned(),
            )));
        }
    }

    let emit_on_window_close = stmt.emit_mode == Some(EmitMode::OnWindowClose);
    if emit_on_window_close {
        context.warn_to_user("EMIT ON WINDOW CLOSE is currently an experimental feature. Please use it with caution.");
    }

    let connector = resolved_with_options
        .get(CONNECTOR_TYPE_KEY)
        .cloned()
        .ok_or_else(|| ErrorCode::BindError(format!("missing field '{CONNECTOR_TYPE_KEY}'")))?;

    let format_desc = match stmt.sink_schema {
        Some(f) => {
            validate_compatibility(&connector, &f)?;
            Some(bind_sink_format_desc(session, f)?)
        }
        None => match resolved_with_options.get(SINK_TYPE_OPTION) {
            Some(t) => SinkFormatDesc::from_legacy_type(&connector, t)?.map(|mut f| {
                session.notice_to_user("Consider using the newer syntax `FORMAT ... ENCODE ...` instead of `type = '...'`.");
                if let Some(v) = resolved_with_options.get(SINK_USER_FORCE_APPEND_ONLY_OPTION) {
                    f.options.insert(SINK_USER_FORCE_APPEND_ONLY_OPTION.into(), v.into());
                }
                f
            }),
            None => None,
        },
    };

    let definition = context.normalized_sql().to_owned();
    let mut plan_root = if is_iceberg_engine_internal {
        Planner::new_for_iceberg_table_engine_sink(context.into()).plan_query(bound)?
    } else {
        Planner::new_for_stream(context.into()).plan_query(bound)?
    };
    if let Some(col_names) = &col_names {
        plan_root.set_out_names(col_names.clone())?;
    };

    let without_backfill = match resolved_with_options.remove(SINK_SNAPSHOT_OPTION) {
        Some(flag) if flag.eq_ignore_ascii_case("false") => {
            if direct_sink || is_iceberg_engine_internal {
                true
            } else {
                return Err(ErrorCode::BindError(
                    "`snapshot = false` is only supported for `CREATE SINK FROM MV or TABLE`".to_owned(),
                )
                .into());
            }
        }
        _ => false,
    };

    let target_table_catalog = stmt
        .into_table_name
        .as_ref()
        .map(|table_name| fetch_table_catalog_for_alter(session, table_name))
        .transpose()?;

    if let Some(target_table_catalog) = &target_table_catalog {
        if let Some(col_names) = col_names {
            let target_table_columns = target_table_catalog
                .columns()
                .iter()
                .map(|c| c.name())
                .collect::<BTreeSet<_>>();
            for c in col_names {
                if !target_table_columns.contains(c.as_str()) {
                    return Err(RwError::from(ErrorCode::BindError(format!(
                        "Column {} not found in table {}",
                        c,
                        target_table_catalog.name()
                    ))));
                }
            }
        }
        if target_table_catalog
            .columns()
            .iter()
            .any(|col| !col.nullable())
        {
            notice_to_user(format!(
                "The target table `{}` contains columns with NOT NULL constraints. Rows from the sink that violate these constraints will be silently ignored.",
                target_table_catalog.name(),
            ));
        }
    }

    let sink_plan = plan_root.gen_sink_plan(
        sink_table_name,
        definition,
        resolved_with_options,
        emit_on_window_close,
        db_name.to_owned(),
        sink_from_table_name,
        format_desc,
        without_backfill,
        target_table_catalog.clone(),
        partition_info,
        user_specified_columns,
    )?;

    let sink_desc = sink_plan.sink_desc().clone();

    let mut sink_plan: PlanRef = sink_plan.into();

    let ctx = sink_plan.ctx();
    let explain_trace = ctx.is_explain_trace();
    if explain_trace {
        ctx.trace("Create Sink:");
        ctx.trace(sink_plan.explain_to_string());
    }
    tracing::trace!("sink_plan: {:?}", sink_plan.explain_to_string());

    let dependencies =
        RelationCollectorVisitor::collect_with(dependent_relations, sink_plan.clone())
            .into_iter()
            .map(|id| id.table_id() as ObjectId)
            .chain(
                dependent_udfs
                    .into_iter()
                    .map(|id| id.function_id() as ObjectId),
            )
            .collect();

    let sink_catalog = sink_desc.into_catalog(
        SchemaId::new(sink_schema_id),
        DatabaseId::new(sink_database_id),
        UserId::new(session.user_id()),
        connector_conn_ref.map(ConnectionId::from),
    );

    if let Some(table_catalog) = &target_table_catalog {
        for column in sink_catalog.full_columns() {
            if !column.can_dml() {
                unreachable!(
                    "generated columns and the system column `_rw_timestamp` cannot appear in a sink's catalog, but one was found"
                );
            }
        }

        let table_columns_without_rw_timestamp = table_catalog.columns_without_rw_timestamp();
        let exprs = derive_default_column_project_for_sink(
            &sink_catalog,
            sink_plan.schema(),
            &table_columns_without_rw_timestamp,
            user_specified_columns,
        )?;

        let logical_project = generic::Project::new(exprs, sink_plan);

        sink_plan = StreamProject::new(logical_project).into();

        let exprs = LogicalSource::derive_output_exprs_from_generated_columns(
            &table_columns_without_rw_timestamp,
        )?;

        if let Some(exprs) = exprs {
            let logical_project = generic::Project::new(exprs, sink_plan);
            sink_plan = StreamProject::new(logical_project).into();
        }
    };

    Ok(SinkPlanContext {
        query,
        sink_plan,
        sink_catalog,
        target_table_catalog,
        dependencies,
    })
}

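/// Inspects the sink's `WITH` options and, for connectors that can make use of it, returns how
/// partition values should be computed ahead of the sink. Currently only the Iceberg sink
/// participates; all other connectors return `None`.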
pub async fn get_partition_compute_info(
    with_options: &WithOptionsSecResolved,
) -> Result<Option<PartitionComputeInfo>> {
    let (options, secret_refs) = with_options.clone().into_parts();
    let Some(connector) = options.get(UPSTREAM_SOURCE_KEY).cloned() else {
        return Ok(None);
    };
    let properties = LocalSecretManager::global().fill_secrets(options, secret_refs)?;
    match connector.as_str() {
        ICEBERG_SINK => {
            let iceberg_config = IcebergConfig::from_btreemap(properties)?;
            get_partition_compute_info_for_iceberg(&iceberg_config).await
        }
        _ => Ok(None),
    }
}

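/// Loads the Iceberg table and derives partition-compute info from its default partition spec.
///
/// Returns `None` when the table is created by the sink itself, is unpartitioned, or only uses
/// range-style transforms (`Year`/`Month`/`Day`/`Hour`/`Void`/`Unknown`). Only specs containing
/// a sparse transform (`Identity`, `Truncate`, `Bucket`) yield an [`IcebergPartitionInfo`],
/// presumably so partition values can be pre-computed before writing.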
async fn get_partition_compute_info_for_iceberg(
    iceberg_config: &IcebergConfig,
) -> Result<Option<PartitionComputeInfo>> {
    if iceberg_config.create_table_if_not_exists {
        return Ok(None);
    }
    let table = iceberg_config.load_table().await?;
    let partition_spec = table.metadata().default_partition_spec();
    if partition_spec.is_unpartitioned() {
        return Ok(None);
    }

    let has_sparse_partition = partition_spec.fields().iter().any(|f| match f.transform {
        Transform::Identity | Transform::Truncate(_) | Transform::Bucket(_) => true,
        Transform::Year
        | Transform::Month
        | Transform::Day
        | Transform::Hour
        | Transform::Void
        | Transform::Unknown => false,
    });
    if !has_sparse_partition {
        return Ok(None);
    }

    let arrow_type = type_to_arrow_type(&iceberg::spec::Type::Struct(
        table.metadata().default_partition_type().clone(),
    ))
    .map_err(|_| {
        RwError::from(ErrorCode::SinkError(
            "Failed to convert iceberg partition type to arrow type".into(),
        ))
    })?;
    let ArrowDataType::Struct(struct_fields) = arrow_type else {
        return Err(RwError::from(ErrorCode::SinkError(
            "Partition type of iceberg should be a struct type".into(),
        )));
    };

    let schema = table.metadata().current_schema();
    let partition_fields = partition_spec
        .fields()
        .iter()
        .map(|f| {
            let source_f =
                schema
                    .field_by_id(f.source_id)
                    .ok_or(RwError::from(ErrorCode::SinkError(
                        "Failed to look up iceberg partition field".into(),
                    )))?;
            Ok((source_f.name.clone(), f.transform))
        })
        .collect::<Result<Vec<_>>>()?;

    Ok(Some(PartitionComputeInfo::Iceberg(IcebergPartitionInfo {
        partition_type: IcebergArrowConvert.struct_from_fields(&struct_fields)?,
        partition_fields,
    })))
}

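/// Handles `CREATE SINK`: plans the sink via [`gen_sink_plan`], builds the stream fragment graph,
/// and registers the sink through the catalog writer. For `CREATE SINK ... INTO <table>` it also
/// re-plans the target table and prepares a [`ReplaceJobPlan`] that wires the new sink (and any
/// existing incoming sinks) into the table's union via merger nodes.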
pub async fn handle_create_sink(
    handle_args: HandlerArgs,
    stmt: CreateSinkStatement,
    is_iceberg_engine_internal: bool,
) -> Result<RwPgResponse> {
    let session = handle_args.session.clone();

    session.check_cluster_limits().await?;

    if let Either::Right(resp) = session.check_relation_name_duplicated(
        stmt.sink_name.clone(),
        StatementType::CREATE_SINK,
        stmt.if_not_exists,
    )? {
        return Ok(resp);
    }

    let (mut sink, graph, target_table_catalog, dependencies) = {
        let SinkPlanContext {
            query,
            sink_plan: plan,
            sink_catalog: sink,
            target_table_catalog,
            dependencies,
        } = gen_sink_plan(handle_args, stmt, None, is_iceberg_engine_internal).await?;

        let has_order_by = !query.order_by.is_empty();
        if has_order_by {
            plan.ctx().warn_to_user(
                r#"The ORDER BY clause in the CREATE SINK statement has no effect at all."#
                    .to_owned(),
            );
        }

        let graph = build_graph(plan, Some(GraphJobType::Sink))?;

        (sink, graph, target_table_catalog, dependencies)
    };

    let mut target_table_replace_plan = None;
    if let Some(table_catalog) = target_table_catalog {
        use crate::handler::alter_table_column::hijack_merger_for_target_table;

        let (mut graph, mut table, source, target_job_type) =
            reparse_table_for_sink(&session, &table_catalog).await?;

        sink.original_target_columns = table
            .columns
            .iter()
            .map(|col| ColumnCatalog::from(col.clone()))
            .collect_vec();

        table
            .incoming_sinks
            .clone_from(&table_catalog.incoming_sinks);

        let incoming_sink_ids: HashSet<_> = table_catalog.incoming_sinks.iter().copied().collect();
        let incoming_sinks = fetch_incoming_sinks(&session, &incoming_sink_ids)?;

        let columns_without_rw_timestamp = table_catalog.columns_without_rw_timestamp();
        for existing_sink in incoming_sinks {
            hijack_merger_for_target_table(
                &mut graph,
                &columns_without_rw_timestamp,
                &existing_sink,
                Some(&existing_sink.unique_identity()),
            )?;
        }

        hijack_merger_for_target_table(&mut graph, &columns_without_rw_timestamp, &sink, None)?;

        target_table_replace_plan = Some(ReplaceJobPlan {
            replace_job: Some(replace_job_plan::ReplaceJob::ReplaceTable(
                replace_job_plan::ReplaceTable {
                    table: Some(table),
                    source,
                    job_type: target_job_type as _,
                },
            )),
            fragment_graph: Some(graph),
            table_col_index_mapping: None,
        });
    }

    let _job_guard =
        session
            .env()
            .creating_streaming_job_tracker()
            .guard(CreatingStreamingJobInfo::new(
                session.session_id(),
                sink.database_id.database_id,
                sink.schema_id.schema_id,
                sink.name.clone(),
            ));

    let catalog_writer = session.catalog_writer()?;
    catalog_writer
        .create_sink(
            sink.to_proto(),
            graph,
            target_table_replace_plan,
            dependencies,
        )
        .await?;

    Ok(PgResponse::empty_result(StatementType::CREATE_SINK))
}

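/// Looks up the catalogs of the given incoming sink IDs by scanning all schemas of the current
/// database.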
pub fn fetch_incoming_sinks(
    session: &Arc<SessionImpl>,
    incoming_sink_ids: &HashSet<SinkId>,
) -> Result<Vec<Arc<SinkCatalog>>> {
    let reader = session.env().catalog_reader().read_guard();
    let mut sinks = Vec::with_capacity(incoming_sink_ids.len());
    let db_name = &session.database();
    for schema in reader.iter_schemas(db_name)? {
        for sink in schema.iter_sink() {
            if incoming_sink_ids.contains(&sink.id.sink_id) {
                sinks.push(sink.clone());
            }
        }
    }

    Ok(sinks)
}

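/// Re-parses the target table's purified `CREATE TABLE` definition and regenerates its stream
/// graph, in the same way `ALTER TABLE` does, so that the table job can be replaced when a sink
/// is created into it.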
pub(crate) async fn reparse_table_for_sink(
    session: &Arc<SessionImpl>,
    table_catalog: &Arc<TableCatalog>,
) -> Result<(StreamFragmentGraph, Table, Option<PbSource>, TableJobType)> {
    let definition = table_catalog.create_sql_ast_purified()?;
    let Statement::CreateTable { name, .. } = &definition else {
        panic!("unexpected statement: {:?}", definition);
    };
    let table_name = name.clone();

    let handler_args = HandlerArgs::new(session.clone(), &definition, Arc::from(""))?;
    let col_id_gen = ColumnIdGenerator::new_alter(table_catalog);

    let (graph, table, source, job_type) = generate_stream_graph_for_replace_table(
        session,
        table_name,
        table_catalog,
        handler_args,
        definition,
        col_id_gen,
        SqlColumnStrategy::FollowUnchecked,
    )
    .await?;

    Ok((graph, table, source, job_type))
}

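/// Recursively walks the stream node tree and, at the first `Union` node found on each branch,
/// appends a new input consisting of a `Project` (the given `project_node`) on top of an empty
/// `Merge` node, labeled with `uniq_identity` when provided.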
pub(crate) fn insert_merger_to_union_with_project(
    node: &mut StreamNode,
    project_node: &PbNodeBody,
    uniq_identity: Option<&str>,
) {
    if let Some(NodeBody::Union(_union_node)) = &mut node.node_body {
        node.input.push(StreamNode {
            input: vec![StreamNode {
                node_body: Some(NodeBody::Merge(Box::new(MergeNode {
                    ..Default::default()
                }))),
                ..Default::default()
            }],
            identity: uniq_identity
                .unwrap_or(PbSink::UNIQUE_IDENTITY_FOR_CREATING_TABLE_SINK)
                .to_owned(),
            fields: node.fields.clone(),
            node_body: Some(project_node.clone()),
            ..Default::default()
        });

        return;
    }

    for input in &mut node.input {
        insert_merger_to_union_with_project(input, project_node, uniq_identity);
    }
}

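/// Builds an `InputRef` expression referring to sink column `idx`, returning an error if the sink
/// column's type does not exactly match the target table column's type.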
fn derive_sink_to_table_expr(
    sink_schema: &Schema,
    idx: usize,
    target_type: &DataType,
) -> Result<ExprImpl> {
    let input_type = &sink_schema.fields()[idx].data_type;

    if target_type != input_type {
        bail!(
            "column type mismatch: {:?} vs {:?}, column name: {:?}",
            target_type,
            input_type,
            sink_schema.fields()[idx].name
        );
    } else {
        Ok(ExprImpl::InputRef(Box::new(InputRef::new(
            idx,
            input_type.clone(),
        ))))
    }
}

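/// Derives the projection that maps the sink's output columns onto the target table's columns.
///
/// If the user listed sink columns explicitly, matching is done by column name; otherwise the
/// sink's visible columns are matched to the table's columns by position. Table columns without
/// a corresponding sink column fall back to their default expressions (with `now()` rewritten to
/// proctime), and non-DML columns (generated columns, `_rw_timestamp`) are skipped.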
pub(crate) fn derive_default_column_project_for_sink(
    sink: &SinkCatalog,
    sink_schema: &Schema,
    columns: &[ColumnCatalog],
    user_specified_columns: bool,
) -> Result<Vec<ExprImpl>> {
    assert_eq!(sink.full_schema().len(), sink_schema.len());

    let default_column_exprs = TableCatalog::default_column_exprs(columns);

    let mut exprs = vec![];

    let sink_visible_col_idxes = sink
        .full_columns()
        .iter()
        .positions(|c| !c.is_hidden())
        .collect_vec();
    let sink_visible_col_idxes_by_name = sink
        .full_columns()
        .iter()
        .enumerate()
        .filter(|(_, c)| !c.is_hidden())
        .map(|(i, c)| (c.name(), i))
        .collect::<BTreeMap<_, _>>();

    for (idx, column) in columns.iter().enumerate() {
        if !column.can_dml() {
            continue;
        }

        let default_col_expr =
            || -> ExprImpl { rewrite_now_to_proctime(default_column_exprs[idx].clone()) };

        let sink_col_expr = |sink_col_idx: usize| -> Result<ExprImpl> {
            derive_sink_to_table_expr(sink_schema, sink_col_idx, column.data_type())
        };

        #[allow(clippy::collapsible_else_if)]
        if user_specified_columns {
            if let Some(idx) = sink_visible_col_idxes_by_name.get(column.name()) {
                exprs.push(sink_col_expr(*idx)?);
            } else {
                exprs.push(default_col_expr());
            }
        } else {
            if idx < sink_visible_col_idxes.len() {
                exprs.push(sink_col_expr(sink_visible_col_idxes[idx])?);
            } else {
                exprs.push(default_col_expr());
            };
        }
    }
    Ok(exprs)
}

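/// Binds the `FORMAT ... ENCODE ... (...)` clause of a sink into a [`SinkFormatDesc`]: maps the
/// format and encode to their sink counterparts (rejecting unsupported ones), handles the
/// optional `KEY ENCODE`, resolves schema registry connection and secret references in the
/// options, and defaults the timestamptz handling option when it was not set explicitly.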
fn bind_sink_format_desc(
    session: &SessionImpl,
    value: FormatEncodeOptions,
) -> Result<SinkFormatDesc> {
    use risingwave_connector::sink::catalog::{SinkEncode, SinkFormat};
    use risingwave_connector::sink::encoder::TimestamptzHandlingMode;
    use risingwave_sqlparser::ast::{Encode as E, Format as F};

    let format = match value.format {
        F::Plain => SinkFormat::AppendOnly,
        F::Upsert => SinkFormat::Upsert,
        F::Debezium => SinkFormat::Debezium,
        f @ (F::Native | F::DebeziumMongo | F::Maxwell | F::Canal | F::None) => {
            return Err(ErrorCode::BindError(format!("sink format unsupported: {f}")).into());
        }
    };
    let encode = match value.row_encode {
        E::Json => SinkEncode::Json,
        E::Protobuf => SinkEncode::Protobuf,
        E::Avro => SinkEncode::Avro,
        E::Template => SinkEncode::Template,
        E::Parquet => SinkEncode::Parquet,
        e @ (E::Native | E::Csv | E::Bytes | E::None | E::Text) => {
            return Err(ErrorCode::BindError(format!("sink encode unsupported: {e}")).into());
        }
    };

    let mut key_encode = None;
    if let Some(encode) = value.key_encode {
        match encode {
            E::Text => key_encode = Some(SinkEncode::Text),
            E::Bytes => key_encode = Some(SinkEncode::Bytes),
            _ => {
                return Err(ErrorCode::BindError(format!(
                    "sink key encode unsupported: {encode}, only TEXT and BYTES are supported"
                ))
                .into());
            }
        }
    }

    let (props, connection_type_flag, schema_registry_conn_ref) =
        resolve_connection_ref_and_secret_ref(
            WithOptions::try_from(value.row_options.as_slice())?,
            session,
            Some(TelemetryDatabaseObject::Sink),
        )?;
    ensure_connection_type_allowed(
        connection_type_flag,
        &SINK_ALLOWED_CONNECTION_SCHEMA_REGISTRY,
    )?;
    let (mut options, secret_refs) = props.into_parts();

    options
        .entry(TimestamptzHandlingMode::OPTION_KEY.to_owned())
        .or_insert(TimestamptzHandlingMode::FRONTEND_DEFAULT.to_owned());

    Ok(SinkFormatDesc {
        format,
        encode,
        options,
        secret_refs,
        key_encode,
        connection_id: schema_registry_conn_ref,
    })
}

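/// For each connector that supports `FORMAT ... ENCODE ...`, the formats it accepts and the
/// encodes allowed for each format. Consulted by [`validate_compatibility`].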
static CONNECTORS_COMPATIBLE_FORMATS: LazyLock<HashMap<String, HashMap<Format, Vec<Encode>>>> =
    LazyLock::new(|| {
        use risingwave_connector::sink::Sink as _;
        use risingwave_connector::sink::file_sink::azblob::AzblobSink;
        use risingwave_connector::sink::file_sink::fs::FsSink;
        use risingwave_connector::sink::file_sink::gcs::GcsSink;
        use risingwave_connector::sink::file_sink::opendal_sink::FileSink;
        use risingwave_connector::sink::file_sink::s3::{S3Sink, SnowflakeSink};
        use risingwave_connector::sink::file_sink::webhdfs::WebhdfsSink;
        use risingwave_connector::sink::google_pubsub::GooglePubSubSink;
        use risingwave_connector::sink::kafka::KafkaSink;
        use risingwave_connector::sink::kinesis::KinesisSink;
        use risingwave_connector::sink::mqtt::MqttSink;
        use risingwave_connector::sink::pulsar::PulsarSink;
        use risingwave_connector::sink::redis::RedisSink;

        convert_args!(hashmap!(
            GooglePubSubSink::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Json],
            ),
            KafkaSink::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Json, Encode::Avro, Encode::Protobuf],
                Format::Upsert => vec![Encode::Json, Encode::Avro, Encode::Protobuf],
                Format::Debezium => vec![Encode::Json],
            ),
            FileSink::<S3Sink>::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Parquet, Encode::Json],
            ),
            FileSink::<SnowflakeSink>::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Parquet, Encode::Json],
            ),
            FileSink::<GcsSink>::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Parquet, Encode::Json],
            ),
            FileSink::<AzblobSink>::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Parquet, Encode::Json],
            ),
            FileSink::<WebhdfsSink>::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Parquet, Encode::Json],
            ),
            FileSink::<FsSink>::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Parquet, Encode::Json],
            ),
            KinesisSink::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Json],
                Format::Upsert => vec![Encode::Json],
                Format::Debezium => vec![Encode::Json],
            ),
            MqttSink::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Json, Encode::Protobuf],
            ),
            PulsarSink::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Json],
                Format::Upsert => vec![Encode::Json],
                Format::Debezium => vec![Encode::Json],
            ),
            RedisSink::SINK_NAME => hashmap!(
                Format::Plain => vec![Encode::Json, Encode::Template],
                Format::Upsert => vec![Encode::Json, Encode::Template],
            ),
        ))
    });

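/// Checks a sink's `FORMAT ... ENCODE ...` clause against [`CONNECTORS_COMPATIBLE_FORMATS`] and
/// additionally rejects `KEY ENCODE BYTES` for any connector other than Kafka.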
pub fn validate_compatibility(connector: &str, format_desc: &FormatEncodeOptions) -> Result<()> {
    let compatible_formats = CONNECTORS_COMPATIBLE_FORMATS
        .get(connector)
        .ok_or_else(|| {
            ErrorCode::BindError(format!(
                "connector {} is not supported by the FORMAT ... ENCODE ... syntax",
                connector
            ))
        })?;
    let compatible_encodes = compatible_formats.get(&format_desc.format).ok_or_else(|| {
        ErrorCode::BindError(format!(
            "connector {} does not support format {:?}",
            connector, format_desc.format
        ))
    })?;
    if !compatible_encodes.contains(&format_desc.row_encode) {
        return Err(ErrorCode::BindError(format!(
            "connector {} does not support format {:?} with encode {:?}",
            connector, format_desc.format, format_desc.row_encode
        ))
        .into());
    }

    if let Some(encode) = &format_desc.key_encode
        && connector != KAFKA_SINK
        && matches!(encode, Encode::Bytes)
    {
        return Err(ErrorCode::BindError(format!(
            "key encode bytes only works with the kafka connector, but found {}",
            connector
        ))
        .into());
    }

    Ok(())
}

#[cfg(test)]
pub mod tests {
    use risingwave_common::catalog::{DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME};

    use crate::catalog::root_catalog::SchemaPath;
    use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};

    #[tokio::test]
    async fn test_create_sink_handler() {
        let proto_file = create_proto_file(PROTO_FILE_DATA);
        let sql = format!(
            r#"CREATE SOURCE t1
    WITH (connector = 'kafka', kafka.topic = 'abc', kafka.brokers = 'localhost:1001')
    FORMAT PLAIN ENCODE PROTOBUF (message = '.test.TestRecord', schema.location = 'file://{}')"#,
            proto_file.path().to_str().unwrap()
        );
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(sql).await.unwrap();

        let sql = "create materialized view mv1 as select t1.country from t1;";
        frontend.run_sql(sql).await.unwrap();

        let sql = r#"CREATE SINK snk1 FROM mv1
            WITH (connector = 'jdbc', mysql.endpoint = '127.0.0.1:3306', mysql.table =
                '<table_name>', mysql.database = '<database_name>', mysql.user = '<user_name>',
                mysql.password = '<password>', type = 'append-only', force_append_only = 'true');"#
            .to_owned();
        frontend.run_sql(sql).await.unwrap();

        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        let (source, _) = catalog_reader
            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t1")
            .unwrap();
        assert_eq!(source.name, "t1");

        let (table, schema_name) = catalog_reader
            .get_created_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "mv1")
            .unwrap();
        assert_eq!(table.name(), "mv1");

        let (sink, _) = catalog_reader
            .get_sink_by_name(DEFAULT_DATABASE_NAME, SchemaPath::Name(schema_name), "snk1")
            .unwrap();
        assert_eq!(sink.name, "snk1");
    }
}