1use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
16use std::sync::{Arc, LazyLock};
17
18use anyhow::Context;
19use either::Either;
20use iceberg::arrow::type_to_arrow_type;
21use iceberg::spec::Transform;
22use itertools::Itertools;
23use maplit::{convert_args, hashmap, hashset};
24use pgwire::pg_response::{PgResponse, StatementType};
25use risingwave_common::array::arrow::IcebergArrowConvert;
26use risingwave_common::array::arrow::arrow_schema_iceberg::DataType as ArrowDataType;
27use risingwave_common::bail;
28use risingwave_common::catalog::{
29 ColumnCatalog, ICEBERG_SINK_PREFIX, ObjectId, RISINGWAVE_ICEBERG_ROW_ID, ROW_ID_COLUMN_NAME,
30 Schema,
31};
32use risingwave_common::license::Feature;
33use risingwave_common::secret::LocalSecretManager;
34use risingwave_common::system_param::reader::SystemParamsRead;
35use risingwave_common::types::{DataType, Timestamptz};
36use risingwave_common::util::epoch::Epoch;
37use risingwave_connector::sink::catalog::{SinkCatalog, SinkFormatDesc};
38use risingwave_connector::sink::file_sink::s3::SnowflakeSink;
39use risingwave_connector::sink::iceberg::{ICEBERG_SINK, IcebergConfig};
40use risingwave_connector::sink::kafka::KAFKA_SINK;
41use risingwave_connector::sink::snowflake_redshift::redshift::RedshiftSink;
42use risingwave_connector::sink::snowflake_redshift::snowflake::SnowflakeV2Sink;
43use risingwave_connector::sink::{
44 CONNECTOR_TYPE_KEY, SINK_SNAPSHOT_OPTION, SINK_TYPE_OPTION, SINK_USER_FORCE_APPEND_ONLY_OPTION,
45 SINK_USER_IGNORE_DELETE_OPTION, Sink, enforce_secret_sink,
46};
47use risingwave_connector::{
48 AUTO_SCHEMA_CHANGE_KEY, SINK_CREATE_TABLE_IF_NOT_EXISTS_KEY, SINK_INTERMEDIATE_TABLE_NAME,
49 SINK_TARGET_TABLE_NAME, WithPropertiesExt,
50};
51use risingwave_pb::catalog::connection_params::PbConnectionType;
52use risingwave_pb::telemetry::TelemetryDatabaseObject;
53use risingwave_sqlparser::ast::{
54 CreateSink, CreateSinkStatement, EmitMode, Encode, ExplainOptions, Format, FormatEncodeOptions,
55 ObjectName, Query, Statement,
56};
57use risingwave_sqlparser::parser::Parser;
58
59use super::RwPgResponse;
60use super::create_mv::get_column_names;
61use super::create_source::UPSTREAM_SOURCE_KEY;
62use super::util::gen_query_from_table_name;
63use crate::binder::{Binder, Relation};
64use crate::catalog::root_catalog::SchemaPath;
65use crate::catalog::table_catalog::TableType;
66use crate::error::{ErrorCode, Result, RwError};
67use crate::expr::{ExprImpl, InputRef, rewrite_now_to_proctime};
68use crate::handler::HandlerArgs;
69use crate::handler::alter_table_column::fetch_table_catalog_for_alter;
70use crate::handler::create_mv::{
71 extract_streaming_job_resource_options, parse_column_names, resolve_streaming_job_resource_type,
72};
73use crate::handler::util::{
74 LongRunningNotificationAction, check_connector_match_connection_type,
75 ensure_connection_type_allowed, ensure_local_fs_connector_allowed,
76 execute_with_long_running_notification, get_table_catalog_by_table_name,
77 reject_internal_table_dependencies,
78};
79use crate::optimizer::backfill_order_strategy::plan_backfill_order;
80use crate::optimizer::plan_node::{
81 IcebergPartitionInfo, LogicalSource, PartitionComputeInfo, StreamPlanRef as PlanRef,
82 StreamProject, ensure_sync_log_store_fragment_root, generic,
83};
84use crate::optimizer::{OptimizerContext, RelationCollectorVisitor};
85use crate::scheduler::streaming_manager::CreatingStreamingJobInfo;
86use crate::session::SessionImpl;
87use crate::session::current::notice_to_user;
88use crate::stream_fragmenter::{GraphJobType, build_graph_with_strategy};
89use crate::utils::{resolve_connection_ref_and_secret_ref, resolve_privatelink_in_with_option};
90use crate::{Explain, Planner, TableCatalog, WithOptions, WithOptionsSecResolved};
91
92static SINK_ALLOWED_CONNECTION_CONNECTOR: LazyLock<HashSet<PbConnectionType>> =
93 LazyLock::new(|| {
94 hashset! {
95 PbConnectionType::Unspecified,
96 PbConnectionType::Kafka,
97 PbConnectionType::Iceberg,
98 PbConnectionType::Elasticsearch,
99 }
100 });
101
102static SINK_ALLOWED_CONNECTION_SCHEMA_REGISTRY: LazyLock<HashSet<PbConnectionType>> =
103 LazyLock::new(|| {
104 hashset! {
105 PbConnectionType::Unspecified,
106 PbConnectionType::SchemaRegistry,
107 }
108 });
109
/// Key of the `WITH` option holding a timestamptz string that `gen_sink_plan` converts
/// into an epoch. The option is removed from the sink properties once parsed.
const SINK_SINCE_TIMESTAMP_OPTION: &str = "since_timestamp";
111
112pub struct SinkPlanContext {
114 pub query: Box<Query>,
115 pub sink_plan: PlanRef,
116 pub sink_catalog: SinkCatalog,
117 pub target_table_catalog: Option<Arc<TableCatalog>>,
118 pub dependencies: HashSet<ObjectId>,
119 pub since_timestamp_epoch: Option<u64>,
120}
121
122pub async fn gen_sink_plan(
123 handler_args: HandlerArgs,
124 stmt: CreateSinkStatement,
125 explain_options: Option<ExplainOptions>,
126 is_iceberg_engine_internal: bool,
127) -> Result<SinkPlanContext> {
128 let session = handler_args.session.clone();
129 let session = session.as_ref();
130 let user_specified_columns = !stmt.columns.is_empty();
131 let db_name = &session.database();
132 let (sink_schema_name, sink_table_name) =
133 Binder::resolve_schema_qualified_name(db_name, &stmt.sink_name)?;
134
135 let mut with_options = handler_args.with_options.clone();
136 extract_streaming_job_resource_options(&mut with_options);
139
140 if session
141 .env()
142 .system_params_manager()
143 .get_params()
144 .load()
145 .enforce_secret()
146 && Feature::SecretManagement.check_available().is_ok()
147 {
148 enforce_secret_sink(&with_options)?;
149 }
150
151 resolve_privatelink_in_with_option(&mut with_options)?;
152 let (mut resolved_with_options, connection_type, connector_conn_ref) =
153 resolve_connection_ref_and_secret_ref(
154 with_options,
155 session,
156 Some(TelemetryDatabaseObject::Sink),
157 )?;
158
159 let since_timestamp_epoch = resolved_with_options
160 .remove(SINK_SINCE_TIMESTAMP_OPTION)
161 .map(|value| {
162 let timestamp = value.parse::<Timestamptz>().map_err(|err| {
163 ErrorCode::InvalidInputSyntax(format!(
164 "invalid value {value:?} of '{SINK_SINCE_TIMESTAMP_OPTION}' option: {err}; \
165 expected a timestamptz string with an explicit time zone, \
166 for example '2024-01-01 00:00:00Z'"
167 ))
168 })?;
169 let timestamp_millis = u64::try_from(timestamp.timestamp_millis()).unwrap_or(0);
170 Ok::<_, RwError>(Epoch::from_unix_millis_or_earliest(timestamp_millis).0)
171 })
172 .transpose()?;
173 if since_timestamp_epoch.is_some() {
174 Feature::SinkSinceTimestamp.check_available()?;
175 }
176
177 ensure_connection_type_allowed(connection_type, &SINK_ALLOWED_CONNECTION_CONNECTOR)?;
178
179 if !matches!(connection_type, PbConnectionType::Unspecified) {
181 let Some(connector) = resolved_with_options.get_connector() else {
182 return Err(RwError::from(ErrorCode::ProtocolError(format!(
183 "missing field '{}' in WITH clause",
184 CONNECTOR_TYPE_KEY
185 ))));
186 };
187 check_connector_match_connection_type(connector.as_str(), &connection_type)?;
188 }
189
190 let partition_info = get_partition_compute_info(&resolved_with_options).await?;
191
192 let context = if let Some(explain_options) = explain_options {
193 OptimizerContext::new(handler_args.clone(), explain_options)
194 } else {
195 OptimizerContext::from_handler_args(handler_args.clone())
196 };
197
198 let is_auto_schema_change = resolved_with_options
199 .remove(AUTO_SCHEMA_CHANGE_KEY)
200 .map(|value| {
201 value.parse::<bool>().map_err(|_| {
202 ErrorCode::InvalidInputSyntax(format!(
203 "invalid value {} of '{}' option, expect",
204 value, AUTO_SCHEMA_CHANGE_KEY
205 ))
206 })
207 })
208 .transpose()?
209 .unwrap_or(false);
210
211 if is_auto_schema_change {
212 Feature::SinkAutoSchemaChange.check_available()?;
213 }
214
215 let sink_into_table_name = stmt.into_table_name.as_ref().map(|name| name.real_value());
216 if sink_into_table_name.is_some() {
217 let prev = resolved_with_options.insert(CONNECTOR_TYPE_KEY.to_owned(), "table".to_owned());
218
219 if prev.is_some() {
220 return Err(RwError::from(ErrorCode::BindError(
221 "In the case of sinking into table, the 'connector' parameter should not be provided.".to_owned(),
222 )));
223 }
224 }
225 let connector = resolved_with_options
226 .get(CONNECTOR_TYPE_KEY)
227 .cloned()
228 .ok_or_else(|| ErrorCode::BindError(format!("missing field '{CONNECTOR_TYPE_KEY}'")))?;
229 ensure_local_fs_connector_allowed(session, &connector)?;
230
231 let sink_from_table_name;
233 let direct_sink_from_name: Option<(ObjectName, bool)>;
236 let mut query = match stmt.sink_from {
237 CreateSink::From(from_name) => {
238 sink_from_table_name = from_name.0.last().unwrap().real_value();
239 direct_sink_from_name = Some((from_name.clone(), is_auto_schema_change));
240 if is_auto_schema_change && sink_into_table_name.is_some() {
241 return Err(RwError::from(ErrorCode::InvalidInputSyntax(
242 "auto schema change not supported for sink-into-table".to_owned(),
243 )));
244 }
245 if resolved_with_options
246 .value_eq_ignore_case(SINK_CREATE_TABLE_IF_NOT_EXISTS_KEY, "true")
247 && connector == RedshiftSink::SINK_NAME
248 || connector == SnowflakeV2Sink::SINK_NAME
249 {
250 if let Some(table_name) = resolved_with_options.get(SINK_TARGET_TABLE_NAME) {
251 if resolved_with_options
253 .get(SINK_INTERMEDIATE_TABLE_NAME)
254 .is_none()
255 {
256 let intermediate_table_name = format!(
258 "rw_{}_{}_{}",
259 sink_table_name,
260 table_name,
261 uuid::Uuid::new_v4()
262 );
263 resolved_with_options.insert(
264 SINK_INTERMEDIATE_TABLE_NAME.to_owned(),
265 intermediate_table_name,
266 );
267 }
268 } else {
269 return Err(RwError::from(ErrorCode::BindError(
270 "'table.name' option must be specified.".to_owned(),
271 )));
272 }
273 }
274 Box::new(gen_query_from_table_name(from_name))
275 }
276 CreateSink::AsQuery(query) => {
277 if is_auto_schema_change {
278 return Err(RwError::from(ErrorCode::InvalidInputSyntax(
279 "auto schema change not supported for CREATE SINK AS QUERY".to_owned(),
280 )));
281 }
282 sink_from_table_name = sink_table_name.clone();
283 direct_sink_from_name = None;
284 query
285 }
286 };
287
288 if is_iceberg_engine_internal && let Some((from_name, _)) = &direct_sink_from_name {
289 let (table, _) = get_table_catalog_by_table_name(session, from_name)?;
290 let pk_names = table.pk_column_names();
291 if pk_names.len() == 1 && pk_names[0].eq(ROW_ID_COLUMN_NAME) {
292 let [stmt]: [_; 1] = Parser::parse_sql(&format!(
293 "select {} as {}, * from {}",
294 ROW_ID_COLUMN_NAME, RISINGWAVE_ICEBERG_ROW_ID, from_name
295 ))
296 .context("unable to parse query")?
297 .try_into()
298 .unwrap();
299 let Statement::Query(parsed_query) = stmt else {
300 panic!("unexpected statement: {:?}", stmt);
301 };
302 query = parsed_query;
303 }
304 }
305
306 let (sink_database_id, sink_schema_id) =
307 session.get_database_and_schema_id_for_create(sink_schema_name.clone())?;
308
309 if since_timestamp_epoch.is_some() {
310 if sink_into_table_name.is_some() {
311 return Err(ErrorCode::BindError(format!(
312 "`{SINK_SINCE_TIMESTAMP_OPTION}` does not support `CREATE SINK INTO TABLE`"
313 ))
314 .into());
315 }
316 if is_iceberg_engine_internal {
317 return Err(ErrorCode::BindError(format!(
318 "`{SINK_SINCE_TIMESTAMP_OPTION}` does not support iceberg engine internal sinks"
319 ))
320 .into());
321 }
322 if let Some((from_name, _)) = &direct_sink_from_name {
323 let (table, _) = get_table_catalog_by_table_name(session, from_name)?;
324 if table.database_id != sink_database_id {
325 return Err(ErrorCode::NotSupported(
326 format!(
327 "`{SINK_SINCE_TIMESTAMP_OPTION}` does not support cross-database sinks"
328 ),
329 "Please create the sink in the same database as the upstream table.".to_owned(),
330 )
331 .into());
332 }
333 }
334 }
335
336 let (
337 dependent_relations,
338 dependent_udfs,
339 dependent_secrets,
340 bound,
341 auto_refresh_schema_from_table,
342 ) = {
343 let mut binder = Binder::new_for_stream(session);
344 let auto_refresh_schema_from_table = if let Some((from_name, true)) = &direct_sink_from_name
345 {
346 let from_relation = binder.bind_relation_by_name(from_name, None, None, true)?;
347 if let Relation::BaseTable(table) = from_relation {
348 if table.table_catalog.table_type != TableType::Table {
349 return Err(ErrorCode::InvalidInputSyntax(format!(
350 "auto schema change only support on TABLE, but got {:?}",
351 table.table_catalog.table_type
352 ))
353 .into());
354 }
355 if table.table_catalog.database_id != sink_database_id {
356 return Err(ErrorCode::InvalidInputSyntax(
357 "auto schema change sink does not support created from cross database table".to_owned()
358 )
359 .into());
360 }
361 for col in &table.table_catalog.columns {
362 if !col.is_hidden() && (col.is_generated() || col.is_rw_sys_column()) {
363 return Err(ErrorCode::InvalidInputSyntax(format!("auto schema change not supported for table with non-hidden generated column or sys column, but got {}", col.name())).into());
364 }
365 }
366 Some(table.table_catalog)
367 } else {
368 return Err(RwError::from(ErrorCode::NotSupported(
369 "auto schema change only supported for TABLE".to_owned(),
370 "try recreating the sink from table".to_owned(),
371 )));
372 }
373 } else {
374 None
375 };
376
377 let bound = binder.bind_query(&query)?;
378
379 (
380 binder.included_relations().clone(),
381 binder.included_udfs().clone(),
382 binder.included_secrets().clone(),
383 bound,
384 auto_refresh_schema_from_table,
385 )
386 };
387
388 reject_internal_table_dependencies(session, &dependent_relations, "CREATE SINK")?;
389
390 let col_names = if sink_into_table_name.is_some() {
391 parse_column_names(&stmt.columns)
392 } else {
393 get_column_names(&bound, stmt.columns)?
395 };
396
397 let emit_on_window_close = stmt.emit_mode == Some(EmitMode::OnWindowClose);
398 if emit_on_window_close {
399 context.warn_to_user("EMIT ON WINDOW CLOSE is currently an experimental feature. Please use it with caution.");
400 }
401
402 let format_desc = match stmt.sink_schema {
403 Some(f) => {
405 validate_compatibility(&connector, &f)?;
406 Some(bind_sink_format_desc(session,f)?)
407 }
408 None => match resolved_with_options.get(SINK_TYPE_OPTION) {
409 Some(t) => SinkFormatDesc::from_legacy_type(&connector, t)?.map(|mut f| {
411 session.notice_to_user("Consider using the newer syntax `FORMAT ... ENCODE ...` instead of `type = '...'`.");
412 if let Some(v) = resolved_with_options.get(SINK_USER_FORCE_APPEND_ONLY_OPTION) {
413 f.options.insert(SINK_USER_FORCE_APPEND_ONLY_OPTION.into(), v.into());
414 }
415 if let Some(v) = resolved_with_options.get(SINK_USER_IGNORE_DELETE_OPTION) {
416 f.options.insert(SINK_USER_IGNORE_DELETE_OPTION.into(), v.into());
417 }
418 f
419 }),
420 None => None,
422 },
423 };
424
425 let definition = context.normalized_sql().to_owned();
426 let mut plan_root = if is_iceberg_engine_internal {
427 Planner::new_for_iceberg_table_engine_sink(context.into()).plan_query(bound)?
428 } else {
429 Planner::new_for_stream(context.into()).plan_query(bound)?
430 };
431 if let Some(col_names) = &col_names {
432 plan_root.set_out_names(col_names.clone())?;
433 };
434
435 let without_snapshot = matches!(
436 resolved_with_options.remove(SINK_SNAPSHOT_OPTION),
437 Some(flag) if flag.eq_ignore_ascii_case("false")
438 );
439
440 if since_timestamp_epoch.is_some() && !without_snapshot {
441 return Err(ErrorCode::BindError(format!(
442 "`{SINK_SINCE_TIMESTAMP_OPTION}` requires `snapshot = false`"
443 ))
444 .into());
445 }
446
447 let target_table_catalog = stmt
448 .into_table_name
449 .as_ref()
450 .map(|table_name| fetch_table_catalog_for_alter(session, table_name).map(|t| t.0))
451 .transpose()?;
452
453 if let Some(target_table_catalog) = &target_table_catalog {
454 if let Some(col_names) = col_names {
455 let target_table_columns = target_table_catalog
456 .columns()
457 .iter()
458 .map(|c| c.name())
459 .collect::<BTreeSet<_>>();
460 for c in col_names {
461 if !target_table_columns.contains(c.as_str()) {
462 return Err(RwError::from(ErrorCode::BindError(format!(
463 "Column {} not found in table {}",
464 c,
465 target_table_catalog.name()
466 ))));
467 }
468 }
469 }
470 if target_table_catalog
471 .columns()
472 .iter()
473 .any(|col| !col.nullable())
474 {
475 notice_to_user(format!(
476 "The target table `{}` contains columns with NOT NULL constraints. Any sinked rows violating the constraints will be ignored silently.",
477 target_table_catalog.name(),
478 ));
479 }
480 }
481
482 let sink_plan = plan_root.gen_sink_plan(
483 sink_table_name,
484 definition,
485 resolved_with_options,
486 emit_on_window_close,
487 db_name.to_owned(),
488 sink_from_table_name,
489 format_desc,
490 without_snapshot,
491 since_timestamp_epoch.is_some(),
492 is_iceberg_engine_internal,
493 target_table_catalog.clone(),
494 partition_info,
495 user_specified_columns,
496 auto_refresh_schema_from_table,
497 )?;
498
499 let sink_desc = sink_plan.sink_desc().clone();
500
501 let mut sink_plan: PlanRef = sink_plan.into_stream_plan()?;
502 sink_plan = ensure_sync_log_store_fragment_root(sink_plan);
503
504 let ctx = sink_plan.ctx();
505 let explain_trace = ctx.is_explain_trace();
506 if explain_trace {
507 ctx.trace("Create Sink:");
508 ctx.trace(sink_plan.explain_to_string());
509 }
510 tracing::trace!("sink_plan: {:?}", sink_plan.explain_to_string());
511
512 let dependencies =
515 RelationCollectorVisitor::collect_with(dependent_relations, sink_plan.clone())
516 .into_iter()
517 .chain(dependent_udfs.iter().copied().map_into())
518 .chain(
519 dependent_secrets
520 .iter()
521 .copied()
522 .map(|id| id.as_object_id()),
523 )
524 .collect();
525
526 let sink_catalog = sink_desc.into_catalog(
527 sink_schema_id,
528 sink_database_id,
529 session.user_id(),
530 connector_conn_ref,
531 );
532
533 if let Some(table_catalog) = &target_table_catalog {
534 for column in sink_catalog.full_columns() {
535 if !column.can_dml() {
536 unreachable!(
537 "can not derive generated columns and system column `_rw_timestamp` in a sink's catalog, but meet one"
538 );
539 }
540 }
541
542 let table_columns_without_rw_timestamp = table_catalog.columns_without_rw_timestamp();
543 let exprs = derive_default_column_project_for_sink(
544 &sink_catalog,
545 sink_plan.schema(),
546 &table_columns_without_rw_timestamp,
547 user_specified_columns,
548 )?;
549
550 let logical_project = generic::Project::new(exprs, sink_plan);
551
552 sink_plan = StreamProject::new(logical_project).into();
553
554 let exprs = LogicalSource::derive_output_exprs_from_generated_columns(
555 &table_columns_without_rw_timestamp,
556 )?;
557
558 if let Some(exprs) = exprs {
559 let logical_project = generic::Project::new(exprs, sink_plan);
560 sink_plan = StreamProject::new(logical_project).into();
561 }
562 };
563
564 Ok(SinkPlanContext {
565 query,
566 sink_plan,
567 sink_catalog,
568 target_table_catalog,
569 dependencies,
570 since_timestamp_epoch,
571 })
572}
573
574pub async fn get_partition_compute_info(
579 with_options: &WithOptionsSecResolved,
580) -> Result<Option<PartitionComputeInfo>> {
581 let (options, secret_refs) = with_options.clone().into_parts();
582 let Some(connector) = options.get(UPSTREAM_SOURCE_KEY).cloned() else {
583 return Ok(None);
584 };
585 let properties = LocalSecretManager::global().fill_secrets(options, secret_refs)?;
586 match connector.as_str() {
587 ICEBERG_SINK => {
588 let iceberg_config = IcebergConfig::from_btreemap(properties)?;
589 get_partition_compute_info_for_iceberg(&iceberg_config).await
590 }
591 _ => Ok(None),
592 }
593}
594
595async fn get_partition_compute_info_for_iceberg(
596 _iceberg_config: &IcebergConfig,
597) -> Result<Option<PartitionComputeInfo>> {
598 if _iceberg_config.create_table_if_not_exists {
600 return Ok(None);
601 }
602 let table = _iceberg_config.load_table().await?;
603 let partition_spec = table.metadata().default_partition_spec();
604 if partition_spec.is_unpartitioned() {
605 return Ok(None);
606 }
607
608 let has_sparse_partition = partition_spec.fields().iter().any(|f| match f.transform {
613 Transform::Identity | Transform::Truncate(_) | Transform::Bucket(_) => true,
615 Transform::Year
617 | Transform::Month
618 | Transform::Day
619 | Transform::Hour
620 | Transform::Void
621 | Transform::Unknown => false,
622 });
623 if !has_sparse_partition {
624 return Ok(None);
625 }
626
627 let arrow_type = type_to_arrow_type(&iceberg::spec::Type::Struct(
628 table.metadata().default_partition_type().clone(),
629 ))
630 .map_err(|_| {
631 RwError::from(ErrorCode::SinkError(
632 "Fail to convert iceberg partition type to arrow type".into(),
633 ))
634 })?;
635 let ArrowDataType::Struct(struct_fields) = arrow_type else {
636 return Err(RwError::from(ErrorCode::SinkError(
637 "Partition type of iceberg should be a struct type".into(),
638 )));
639 };
640
641 let schema = table.metadata().current_schema();
642 let partition_fields = partition_spec
643 .fields()
644 .iter()
645 .map(|f| {
646 let source_f =
647 schema
648 .field_by_id(f.source_id)
649 .ok_or(RwError::from(ErrorCode::SinkError(
650 "Fail to look up iceberg partition field".into(),
651 )))?;
652 Ok((source_f.name.clone(), f.transform))
653 })
654 .collect::<Result<Vec<_>>>()?;
655
656 Ok(Some(PartitionComputeInfo::Iceberg(IcebergPartitionInfo {
657 partition_type: IcebergArrowConvert.struct_from_fields(&struct_fields)?,
658 partition_fields,
659 })))
660}
661
662pub async fn handle_create_sink(
663 mut handle_args: HandlerArgs,
664 stmt: CreateSinkStatement,
665 is_iceberg_engine_internal: bool,
666) -> Result<RwPgResponse> {
667 let session = handle_args.session.clone();
668
669 session.check_cluster_limits().await?;
670
671 let mode = if stmt.or_replace {
672 prepare_replace_sink(&mut handle_args, &stmt)?
673 } else {
674 let if_not_exists = stmt.if_not_exists;
675 if let Either::Right(resp) = session.check_relation_name_duplicated(
676 stmt.sink_name.clone(),
677 StatementType::CREATE_SINK,
678 if_not_exists,
679 )? {
680 return Ok(resp);
681 }
682
683 if stmt.sink_name.base_name().starts_with(ICEBERG_SINK_PREFIX) {
684 return Err(RwError::from(ErrorCode::InvalidInputSyntax(format!(
685 "Sink name cannot start with reserved prefix '{}'",
686 ICEBERG_SINK_PREFIX
687 ))));
688 }
689
690 SinkCreateMode::Create { if_not_exists }
691 };
692
693 create_sink_or_replace(handle_args, stmt, is_iceberg_engine_internal, mode).await
694}
695
696enum SinkCreateMode {
697 Create { if_not_exists: bool },
698 Replace { original_sink: Arc<SinkCatalog> },
699}
700
701impl SinkCreateMode {
702 fn statement_name(&self) -> &'static str {
703 match self {
704 SinkCreateMode::Create { .. } => "CREATE SINK",
705 SinkCreateMode::Replace { .. } => "REPLACE SINK",
706 }
707 }
708}
709
710async fn create_sink_or_replace(
711 mut handle_args: HandlerArgs,
712 stmt: CreateSinkStatement,
713 is_iceberg_engine_internal: bool,
714 mode: SinkCreateMode,
715) -> Result<RwPgResponse> {
716 let session = handle_args.session.clone();
717
718 let resource_type =
719 resolve_streaming_job_resource_type(session.as_ref(), &mut handle_args.with_options)?;
720
721 let (sink, graph, dependencies, since_timestamp_epoch) = {
722 let backfill_order_strategy = handle_args.with_options.backfill_order_strategy();
723 let SinkPlanContext {
724 query,
725 sink_plan: plan,
726 sink_catalog: mut sink,
727 target_table_catalog,
728 dependencies,
729 since_timestamp_epoch,
730 } = gen_sink_plan(handle_args, stmt, None, is_iceberg_engine_internal).await?;
731
732 let has_order_by = !query.order_by.is_empty();
733 if has_order_by {
734 plan.ctx().warn_to_user(
735 r#"The ORDER BY clause in the CREATE SINK statement has no effect at all."#
736 .to_owned(),
737 );
738 }
739
740 match &mode {
741 SinkCreateMode::Create { .. } => {
742 if let Some(table_catalog) = &target_table_catalog {
743 sink.original_target_columns = table_catalog.columns_without_rw_timestamp();
744 }
745 }
746 SinkCreateMode::Replace { original_sink } => {
747 if target_table_catalog.is_some() {
748 return Err(ErrorCode::NotSupported(
749 "REPLACE SINK INTO TABLE is not supported yet".to_owned(),
750 "replace ordinary sinks first".to_owned(),
751 )
752 .into());
753 }
754
755 sink.schema_id = original_sink.schema_id;
756 sink.database_id = original_sink.database_id;
757 sink.name = original_sink.name.clone();
758 sink.owner = original_sink.owner;
759 }
760 }
761
762 let backfill_order =
763 plan_backfill_order(session.as_ref(), backfill_order_strategy, plan.clone())?;
764 let graph =
765 build_graph_with_strategy(plan, Some(GraphJobType::Sink), Some(backfill_order))?;
766
767 (sink, graph, dependencies, since_timestamp_epoch)
768 };
769
770 let statement_name = mode.statement_name();
771 let catalog_writer = session.catalog_writer()?;
772 match mode {
773 SinkCreateMode::Create { if_not_exists } => {
774 let _job_guard = session.env().creating_streaming_job_tracker().guard(
775 CreatingStreamingJobInfo::new(
776 session.session_id(),
777 sink.database_id,
778 sink.schema_id,
779 sink.name.clone(),
780 ),
781 );
782
783 execute_with_long_running_notification(
784 catalog_writer.create_sink(
785 sink.to_proto(),
786 graph,
787 dependencies,
788 resource_type,
789 if_not_exists,
790 since_timestamp_epoch,
791 ),
792 &session,
793 statement_name,
794 LongRunningNotificationAction::MonitorBackfillJob,
795 )
796 .await?;
797 }
798 SinkCreateMode::Replace { original_sink } => {
799 let original_sink_id = original_sink.id;
800 execute_with_long_running_notification(
801 catalog_writer.replace_sink(
802 original_sink_id,
803 sink.to_proto(),
804 graph,
805 dependencies,
806 resource_type,
807 ),
808 &session,
809 statement_name,
810 LongRunningNotificationAction::DiagnoseBarrierLatency,
811 )
812 .await?;
813
814 tracing::info!(
815 old_sink_id = %original_sink_id,
816 sink_name = %sink.name,
817 "replace sink plan submitted"
818 );
819 }
820 }
821
822 Ok(PgResponse::empty_result(StatementType::CREATE_SINK))
823}
824
825fn sink_replace_requires_exactly_once_state(sink: &SinkCatalog) -> bool {
826 match sink.properties.get("is_exactly_once") {
827 Some(value) => value.eq_ignore_ascii_case("true"),
828 None => sink
829 .properties
830 .get(CONNECTOR_TYPE_KEY)
831 .is_some_and(|connector| connector.eq_ignore_ascii_case(ICEBERG_SINK)),
832 }
833}
834
835fn prepare_replace_sink(
836 handle_args: &mut HandlerArgs,
837 stmt: &CreateSinkStatement,
838) -> Result<SinkCreateMode> {
839 let session = handle_args.session.clone();
840 if stmt.if_not_exists {
841 return Err(ErrorCode::InvalidInputSyntax(
842 "REPLACE SINK does not support IF NOT EXISTS".to_owned(),
843 )
844 .into());
845 }
846 if !matches!(&stmt.sink_from, CreateSink::From(_)) {
847 return Err(ErrorCode::NotSupported(
848 "REPLACE SINK currently only supports REPLACE SINK ... FROM table_or_mv".to_owned(),
849 "use REPLACE SINK name FROM existing_relation ...".to_owned(),
850 )
851 .into());
852 }
853 if stmt.into_table_name.is_some() {
854 return Err(ErrorCode::NotSupported(
855 "REPLACE SINK INTO TABLE is not supported yet".to_owned(),
856 "replace ordinary sinks first".to_owned(),
857 )
858 .into());
859 }
860 if handle_args
861 .with_options
862 .get(AUTO_SCHEMA_CHANGE_KEY)
863 .is_some_and(|value| value.eq_ignore_ascii_case("true"))
864 {
865 return Err(ErrorCode::NotSupported(
866 "REPLACE SINK with auto schema change is not supported yet".to_owned(),
867 "disable auto schema change for this replacement".to_owned(),
868 )
869 .into());
870 }
871 if handle_args
872 .with_options
873 .contains_key(SINK_SINCE_TIMESTAMP_OPTION)
874 {
875 return Err(ErrorCode::NotSupported(
876 "REPLACE SINK with since_timestamp is not supported yet".to_owned(),
877 "create a new sink with since_timestamp instead".to_owned(),
878 )
879 .into());
880 }
881 match handle_args.with_options.get(SINK_SNAPSHOT_OPTION) {
882 Some(value) if !value.eq_ignore_ascii_case("false") => {
883 return Err(ErrorCode::InvalidInputSyntax(
884 "REPLACE SINK must not enable snapshot backfill".to_owned(),
885 )
886 .into());
887 }
888 Some(_) => {}
889 None => {
890 handle_args
891 .with_options
892 .insert(SINK_SNAPSHOT_OPTION.to_owned(), "false".to_owned());
893 }
894 }
895
896 let db_name = session.database();
897 let (sink_schema_name, sink_table_name) =
898 Binder::resolve_schema_qualified_name(&db_name, &stmt.sink_name)?;
899 let original_sink = {
900 let search_path = session.config().search_path();
901 let user_name = session.user_name();
902 let schema_path = SchemaPath::new(sink_schema_name.as_deref(), &search_path, &user_name);
903 let reader = session.env().catalog_reader().read_guard();
904 let (sink, schema_name) =
905 reader.get_created_sink_by_name(&db_name, schema_path, &sink_table_name)?;
906 session.check_privilege_for_drop_alter(schema_name, &**sink)?;
907 if sink.target_table.is_some() {
908 return Err(ErrorCode::NotSupported(
909 "REPLACE SINK INTO TABLE is not supported yet".to_owned(),
910 "replace ordinary sinks first".to_owned(),
911 )
912 .into());
913 }
914 if sink.auto_refresh_schema_from_table.is_some() {
915 return Err(ErrorCode::NotSupported(
916 "REPLACE SINK with auto schema change is not supported yet".to_owned(),
917 "drop and recreate this auto schema change sink".to_owned(),
918 )
919 .into());
920 }
921 if sink_replace_requires_exactly_once_state(sink) {
922 return Err(ErrorCode::NotSupported(
923 "REPLACE SINK does not support exactly-once sinks yet".to_owned(),
924 "set is_exactly_once=false or recreate the sink manually".to_owned(),
925 )
926 .into());
927 }
928 sink.clone()
929 };
930
931 Ok(SinkCreateMode::Replace { original_sink })
932}
933
934pub fn fetch_incoming_sinks(
935 session: &Arc<SessionImpl>,
936 table: &TableCatalog,
937) -> Result<Vec<Arc<SinkCatalog>>> {
938 let reader = session.env().catalog_reader().read_guard();
939 let schema = reader.get_schema_by_id(table.database_id, table.schema_id)?;
940 let Some(incoming_sinks) = schema.table_incoming_sinks(table.id) else {
941 return Ok(vec![]);
942 };
943 let mut sinks = vec![];
944 for sink_id in incoming_sinks {
945 sinks.push(
946 schema
947 .get_sink_by_id(*sink_id)
948 .expect("should exist")
949 .clone(),
950 );
951 }
952 Ok(sinks)
953}
954
955fn derive_sink_to_table_expr(
956 sink_schema: &Schema,
957 idx: usize,
958 target_type: &DataType,
959) -> Result<ExprImpl> {
960 let input_type = &sink_schema.fields()[idx].data_type;
961
962 if !target_type.equals_datatype(input_type) {
963 bail!(
964 "column type mismatch: {:?} vs {:?}, column name: {:?}",
965 target_type,
966 input_type,
967 sink_schema.fields()[idx].name
968 );
969 } else {
970 Ok(ExprImpl::InputRef(Box::new(InputRef::new(
971 idx,
972 input_type.clone(),
973 ))))
974 }
975}
976
977pub(crate) fn derive_default_column_project_for_sink(
978 sink: &SinkCatalog,
979 sink_schema: &Schema,
980 columns: &[ColumnCatalog],
981 user_specified_columns: bool,
982) -> Result<Vec<ExprImpl>> {
983 assert_eq!(sink.full_schema().len(), sink_schema.len());
984
985 let default_column_exprs = TableCatalog::default_column_exprs(columns);
986
987 let mut exprs = vec![];
988
989 let sink_visible_col_idxes = sink
990 .full_columns()
991 .iter()
992 .positions(|c| !c.is_hidden())
993 .collect_vec();
994 let sink_visible_col_idxes_by_name = sink
995 .full_columns()
996 .iter()
997 .enumerate()
998 .filter(|(_, c)| !c.is_hidden())
999 .map(|(i, c)| (c.name(), i))
1000 .collect::<BTreeMap<_, _>>();
1001
1002 for (idx, column) in columns.iter().enumerate() {
1003 if !column.can_dml() {
1004 continue;
1005 }
1006
1007 let default_col_expr =
1008 || -> ExprImpl { rewrite_now_to_proctime(default_column_exprs[idx].clone()) };
1009
1010 let sink_col_expr = |sink_col_idx: usize| -> Result<ExprImpl> {
1011 derive_sink_to_table_expr(sink_schema, sink_col_idx, column.data_type())
1012 };
1013
1014 if user_specified_columns {
1018 if let Some(idx) = sink_visible_col_idxes_by_name.get(column.name()) {
1019 exprs.push(sink_col_expr(*idx)?);
1020 } else {
1021 exprs.push(default_col_expr());
1022 }
1023 } else {
1024 if idx < sink_visible_col_idxes.len() {
1025 exprs.push(sink_col_expr(sink_visible_col_idxes[idx])?);
1026 } else {
1027 exprs.push(default_col_expr());
1028 };
1029 }
1030 }
1031 Ok(exprs)
1032}
1033
1034fn bind_sink_format_desc(
1038 session: &SessionImpl,
1039 value: FormatEncodeOptions,
1040) -> Result<SinkFormatDesc> {
1041 use risingwave_connector::sink::catalog::{SinkEncode, SinkFormat};
1042 use risingwave_connector::sink::encoder::TimestamptzHandlingMode;
1043 use risingwave_sqlparser::ast::{Encode as E, Format as F};
1044
1045 let format = match value.format {
1046 F::Plain => SinkFormat::AppendOnly,
1047 F::Upsert => SinkFormat::Upsert,
1048 F::Debezium => SinkFormat::Debezium,
1049 f @ (F::Native | F::DebeziumMongo | F::Maxwell | F::Canal | F::None) => {
1050 return Err(ErrorCode::BindError(format!("sink format unsupported: {f}")).into());
1051 }
1052 };
1053 let encode = match value.row_encode {
1054 E::Json => SinkEncode::Json,
1055 E::Protobuf => SinkEncode::Protobuf,
1056 E::Avro => SinkEncode::Avro,
1057 E::Template => SinkEncode::Template,
1058 E::Parquet => SinkEncode::Parquet,
1059 E::Bytes => SinkEncode::Bytes,
1060 e @ (E::Native | E::Csv | E::None | E::Text) => {
1061 return Err(ErrorCode::BindError(format!("sink encode unsupported: {e}")).into());
1062 }
1063 };
1064
1065 let mut key_encode = None;
1066 if let Some(encode) = value.key_encode {
1067 match encode {
1068 E::Text => key_encode = Some(SinkEncode::Text),
1069 E::Bytes => key_encode = Some(SinkEncode::Bytes),
1070 _ => {
1071 return Err(ErrorCode::BindError(format!(
1072 "sink key encode unsupported: {encode}, only TEXT and BYTES supported"
1073 ))
1074 .into());
1075 }
1076 }
1077 }
1078
1079 let (props, connection_type_flag, schema_registry_conn_ref) =
1080 resolve_connection_ref_and_secret_ref(
1081 WithOptions::try_from(value.row_options.as_slice())?,
1082 session,
1083 Some(TelemetryDatabaseObject::Sink),
1084 )?;
1085 ensure_connection_type_allowed(
1086 connection_type_flag,
1087 &SINK_ALLOWED_CONNECTION_SCHEMA_REGISTRY,
1088 )?;
1089 let (mut options, secret_refs) = props.into_parts();
1090
1091 options
1092 .entry(TimestamptzHandlingMode::OPTION_KEY.to_owned())
1093 .or_insert(TimestamptzHandlingMode::FRONTEND_DEFAULT.to_owned());
1094
1095 Ok(SinkFormatDesc {
1096 format,
1097 encode,
1098 options,
1099 secret_refs,
1100 key_encode,
1101 connection_id: schema_registry_conn_ref,
1102 })
1103}
1104
1105static CONNECTORS_COMPATIBLE_FORMATS: LazyLock<HashMap<String, HashMap<Format, Vec<Encode>>>> =
1106 LazyLock::new(|| {
1107 use risingwave_connector::sink::Sink as _;
1108 use risingwave_connector::sink::file_sink::azblob::AzblobSink;
1109 use risingwave_connector::sink::file_sink::fs::FsSink;
1110 use risingwave_connector::sink::file_sink::gcs::GcsSink;
1111 use risingwave_connector::sink::file_sink::opendal_sink::FileSink;
1112 use risingwave_connector::sink::file_sink::s3::S3Sink;
1113 use risingwave_connector::sink::file_sink::webhdfs::WebhdfsSink;
1114 use risingwave_connector::sink::google_pubsub::GooglePubSubSink;
1115 use risingwave_connector::sink::kafka::KafkaSink;
1116 use risingwave_connector::sink::kinesis::KinesisSink;
1117 use risingwave_connector::sink::mqtt::MqttSink;
1118 use risingwave_connector::sink::pulsar::PulsarSink;
1119 use risingwave_connector::sink::redis::RedisSink;
1120
1121 convert_args!(hashmap!(
1122 GooglePubSubSink::SINK_NAME => hashmap!(
1123 Format::Plain => vec![Encode::Json],
1124 ),
1125 KafkaSink::SINK_NAME => hashmap!(
1126 Format::Plain => vec![Encode::Json, Encode::Avro, Encode::Protobuf, Encode::Bytes],
1127 Format::Upsert => vec![Encode::Json, Encode::Avro, Encode::Protobuf],
1128 Format::Debezium => vec![Encode::Json],
1129 ),
1130 FileSink::<S3Sink>::SINK_NAME => hashmap!(
1131 Format::Plain => vec![Encode::Parquet, Encode::Json],
1132 ),
1133 FileSink::<SnowflakeSink>::SINK_NAME => hashmap!(
1134 Format::Plain => vec![Encode::Parquet, Encode::Json],
1135 ),
1136 FileSink::<GcsSink>::SINK_NAME => hashmap!(
1137 Format::Plain => vec![Encode::Parquet, Encode::Json],
1138 ),
1139 FileSink::<AzblobSink>::SINK_NAME => hashmap!(
1140 Format::Plain => vec![Encode::Parquet, Encode::Json],
1141 ),
1142 FileSink::<WebhdfsSink>::SINK_NAME => hashmap!(
1143 Format::Plain => vec![Encode::Parquet, Encode::Json],
1144 ),
1145 FileSink::<FsSink>::SINK_NAME => hashmap!(
1146 Format::Plain => vec![Encode::Parquet, Encode::Json],
1147 ),
1148 KinesisSink::SINK_NAME => hashmap!(
1149 Format::Plain => vec![Encode::Json],
1150 Format::Upsert => vec![Encode::Json],
1151 Format::Debezium => vec![Encode::Json],
1152 ),
1153 MqttSink::SINK_NAME => hashmap!(
1154 Format::Plain => vec![Encode::Json, Encode::Protobuf],
1155 ),
1156 PulsarSink::SINK_NAME => hashmap!(
1157 Format::Plain => vec![Encode::Json],
1158 Format::Upsert => vec![Encode::Json],
1159 Format::Debezium => vec![Encode::Json],
1160 ),
1161 RedisSink::SINK_NAME => hashmap!(
1162 Format::Plain => vec![Encode::Json, Encode::Template],
1163 Format::Upsert => vec![Encode::Json, Encode::Template],
1164 ),
1165 ))
1166 });
1167
1168pub fn validate_compatibility(connector: &str, format_desc: &FormatEncodeOptions) -> Result<()> {
1169 let compatible_formats = CONNECTORS_COMPATIBLE_FORMATS
1170 .get(connector)
1171 .ok_or_else(|| {
1172 ErrorCode::BindError(format!(
1173 "connector {} is not supported by FORMAT ... ENCODE ... syntax",
1174 connector
1175 ))
1176 })?;
1177 let compatible_encodes = compatible_formats.get(&format_desc.format).ok_or_else(|| {
1178 ErrorCode::BindError(format!(
1179 "connector {} does not support format {:?}",
1180 connector, format_desc.format
1181 ))
1182 })?;
1183 if !compatible_encodes.contains(&format_desc.row_encode) {
1184 return Err(ErrorCode::BindError(format!(
1185 "connector {} does not support format {:?} with encode {:?}",
1186 connector, format_desc.format, format_desc.row_encode
1187 ))
1188 .into());
1189 }
1190
1191 if let Some(encode) = &format_desc.key_encode
1193 && connector != KAFKA_SINK
1194 && matches!(encode, Encode::Bytes)
1195 {
1196 return Err(ErrorCode::BindError(format!(
1197 "key encode bytes only works with kafka connector, but found {}",
1198 connector
1199 ))
1200 .into());
1201 }
1202
1203 Ok(())
1204}
1205
#[cfg(test)]
pub mod tests {
    use risingwave_common::catalog::{CreateType, DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME};
    use risingwave_common::config::FrontendConfig;

    use crate::catalog::root_catalog::SchemaPath;
    use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};

    /// Creates a source, a materialized view on it and a sink from the view, checks that
    /// all three are in the catalog, then runs `REPLACE SINK` and checks the sink again.
    #[tokio::test]
    async fn test_create_sink_handler() {
        let proto_file = create_proto_file(PROTO_FILE_DATA);
        let frontend = LocalFrontend::new(Default::default()).await;

        let create_source = format!(
            r#"CREATE SOURCE t1
    WITH (connector = 'kafka', kafka.topic = 'abc', kafka.brokers = 'localhost:1001')
    FORMAT PLAIN ENCODE PROTOBUF (message = '.test.TestRecord', schema.location = 'file://{}')"#,
            proto_file.path().to_str().unwrap()
        );
        frontend.run_sql(create_source).await.unwrap();

        let create_mv = "create materialized view mv1 as select t1.country from t1;";
        frontend.run_sql(create_mv).await.unwrap();

        let create_sink = r#"CREATE SINK snk1 FROM mv1
            WITH (connector = 'jdbc', mysql.endpoint = '127.0.0.1:3306', mysql.table =
                '<table_name>', mysql.database = '<database_name>', mysql.user = '<user_name>',
                mysql.password = '<password>', type = 'append-only', force_append_only = 'true');"#.to_owned();
        frontend.run_sql(create_sink).await.unwrap();

        let session = frontend.session_ref();

        // The read guard lives only inside this block, so it is released before the
        // `REPLACE SINK` statement below runs.
        let schema_name = {
            let catalog = session.env().catalog_reader().read_guard();
            let default_schema = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

            let (source, _) = catalog
                .get_source_by_name(DEFAULT_DATABASE_NAME, default_schema, "t1")
                .unwrap();
            assert_eq!(source.name, "t1");

            let (table, mv_schema) = catalog
                .get_created_table_by_name(DEFAULT_DATABASE_NAME, default_schema, "mv1")
                .unwrap();
            assert_eq!(table.name(), "mv1");
            let mv_schema = mv_schema.to_owned();

            let (sink, _) = catalog
                .get_created_sink_by_name(
                    DEFAULT_DATABASE_NAME,
                    SchemaPath::Name(&mv_schema),
                    "snk1",
                )
                .unwrap();
            assert_eq!(sink.name, "snk1");

            mv_schema
        };

        let replace_sink = r#"REPLACE SINK snk1 FROM mv1
            WITH (connector = 'jdbc', mysql.endpoint = '127.0.0.1:3306', mysql.table =
                '<table_name>', mysql.database = '<database_name>', mysql.user = '<user_name>',
                mysql.password = '<password>', type = 'append-only', force_append_only = 'true');"#.to_owned();
        frontend.run_sql(replace_sink).await.unwrap();

        // The replaced sink is looked up under the same name and schema as before.
        let catalog = session.env().catalog_reader().read_guard();
        let (sink, _) = catalog
            .get_created_sink_by_name(
                DEFAULT_DATABASE_NAME,
                SchemaPath::Name(&schema_name),
                "snk1",
            )
            .unwrap();
        assert_eq!(sink.name, "snk1");
        assert_eq!(sink.create_type, CreateType::Foreground);
    }

    /// With `unsafe_enable_local_fs_connector` set to false, creating an `fs` sink must
    /// fail with an error that mentions the config switch to enable it.
    #[tokio::test]
    async fn test_create_fs_sink_requires_frontend_config() {
        let frontend_config = FrontendConfig {
            unsafe_enable_local_fs_connector: false,
            ..Default::default()
        };
        let frontend =
            LocalFrontend::with_frontend_config(Default::default(), frontend_config).await;

        let setup_statements = [
            "CREATE TABLE t(v int);",
            "CREATE MATERIALIZED VIEW mv AS SELECT * FROM t;",
        ];
        for statement in setup_statements {
            frontend.run_sql(statement).await.unwrap();
        }

        let create_fs_sink = r#"CREATE SINK local_sink FROM mv
                WITH (
                    connector = 'fs',
                    fs.path = '/tmp/rw-local-sink',
                    type = 'append-only',
                    force_append_only = 'true'
                ) FORMAT PLAIN ENCODE JSON (force_append_only = 'true');"#
            .to_owned();
        let err = frontend.run_sql(create_fs_sink).await.unwrap_err();

        let message = err.to_string();
        assert!(
            message.contains("frontend.unsafe_enable_local_fs_connector = true"),
            "{err:?}"
        );
    }
}