Skip to main content

risingwave_frontend/handler/
create_sink.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
16use std::sync::{Arc, LazyLock};
17
18use anyhow::Context;
19use either::Either;
20use iceberg::arrow::type_to_arrow_type;
21use iceberg::spec::Transform;
22use itertools::Itertools;
23use maplit::{convert_args, hashmap, hashset};
24use pgwire::pg_response::{PgResponse, StatementType};
25use risingwave_common::array::arrow::IcebergArrowConvert;
26use risingwave_common::array::arrow::arrow_schema_iceberg::DataType as ArrowDataType;
27use risingwave_common::bail;
28use risingwave_common::catalog::{
29    ColumnCatalog, ICEBERG_SINK_PREFIX, ObjectId, RISINGWAVE_ICEBERG_ROW_ID, ROW_ID_COLUMN_NAME,
30    Schema,
31};
32use risingwave_common::license::Feature;
33use risingwave_common::secret::LocalSecretManager;
34use risingwave_common::system_param::reader::SystemParamsRead;
35use risingwave_common::types::{DataType, Timestamptz};
36use risingwave_common::util::epoch::Epoch;
37use risingwave_connector::sink::catalog::{SinkCatalog, SinkFormatDesc};
38use risingwave_connector::sink::file_sink::s3::SnowflakeSink;
39use risingwave_connector::sink::iceberg::{ICEBERG_SINK, IcebergConfig};
40use risingwave_connector::sink::kafka::KAFKA_SINK;
41use risingwave_connector::sink::snowflake_redshift::redshift::RedshiftSink;
42use risingwave_connector::sink::snowflake_redshift::snowflake::SnowflakeV2Sink;
43use risingwave_connector::sink::{
44    CONNECTOR_TYPE_KEY, SINK_SNAPSHOT_OPTION, SINK_TYPE_OPTION, SINK_USER_FORCE_APPEND_ONLY_OPTION,
45    SINK_USER_IGNORE_DELETE_OPTION, Sink, enforce_secret_sink,
46};
47use risingwave_connector::{
48    AUTO_SCHEMA_CHANGE_KEY, SINK_CREATE_TABLE_IF_NOT_EXISTS_KEY, SINK_INTERMEDIATE_TABLE_NAME,
49    SINK_TARGET_TABLE_NAME, WithPropertiesExt,
50};
51use risingwave_pb::catalog::connection_params::PbConnectionType;
52use risingwave_pb::telemetry::TelemetryDatabaseObject;
53use risingwave_sqlparser::ast::{
54    CreateSink, CreateSinkStatement, EmitMode, Encode, ExplainOptions, Format, FormatEncodeOptions,
55    ObjectName, Query, Statement,
56};
57use risingwave_sqlparser::parser::Parser;
58
59use super::RwPgResponse;
60use super::create_mv::get_column_names;
61use super::create_source::UPSTREAM_SOURCE_KEY;
62use super::util::gen_query_from_table_name;
63use crate::binder::{Binder, Relation};
64use crate::catalog::root_catalog::SchemaPath;
65use crate::catalog::table_catalog::TableType;
66use crate::error::{ErrorCode, Result, RwError};
67use crate::expr::{ExprImpl, InputRef, rewrite_now_to_proctime};
68use crate::handler::HandlerArgs;
69use crate::handler::alter_table_column::fetch_table_catalog_for_alter;
70use crate::handler::create_mv::{
71    extract_streaming_job_resource_options, parse_column_names, resolve_streaming_job_resource_type,
72};
73use crate::handler::util::{
74    LongRunningNotificationAction, check_connector_match_connection_type,
75    ensure_connection_type_allowed, ensure_local_fs_connector_allowed,
76    execute_with_long_running_notification, get_table_catalog_by_table_name,
77    reject_internal_table_dependencies,
78};
79use crate::optimizer::backfill_order_strategy::plan_backfill_order;
80use crate::optimizer::plan_node::{
81    IcebergPartitionInfo, LogicalSource, PartitionComputeInfo, StreamPlanRef as PlanRef,
82    StreamProject, ensure_sync_log_store_fragment_root, generic,
83};
84use crate::optimizer::{OptimizerContext, RelationCollectorVisitor};
85use crate::scheduler::streaming_manager::CreatingStreamingJobInfo;
86use crate::session::SessionImpl;
87use crate::session::current::notice_to_user;
88use crate::stream_fragmenter::{GraphJobType, build_graph_with_strategy};
89use crate::utils::{resolve_connection_ref_and_secret_ref, resolve_privatelink_in_with_option};
90use crate::{Explain, Planner, TableCatalog, WithOptions, WithOptionsSecResolved};
91
92static SINK_ALLOWED_CONNECTION_CONNECTOR: LazyLock<HashSet<PbConnectionType>> =
93    LazyLock::new(|| {
94        hashset! {
95            PbConnectionType::Unspecified,
96            PbConnectionType::Kafka,
97            PbConnectionType::Iceberg,
98            PbConnectionType::Elasticsearch,
99        }
100    });
101
102static SINK_ALLOWED_CONNECTION_SCHEMA_REGISTRY: LazyLock<HashSet<PbConnectionType>> =
103    LazyLock::new(|| {
104        hashset! {
105            PbConnectionType::Unspecified,
106            PbConnectionType::SchemaRegistry,
107        }
108    });
109
110const SINK_SINCE_TIMESTAMP_OPTION: &str = "since_timestamp";
111
112// used to store result of `gen_sink_plan`
113pub struct SinkPlanContext {
114    pub query: Box<Query>,
115    pub sink_plan: PlanRef,
116    pub sink_catalog: SinkCatalog,
117    pub target_table_catalog: Option<Arc<TableCatalog>>,
118    pub dependencies: HashSet<ObjectId>,
119    pub since_timestamp_epoch: Option<u64>,
120}
121
122pub async fn gen_sink_plan(
123    handler_args: HandlerArgs,
124    stmt: CreateSinkStatement,
125    explain_options: Option<ExplainOptions>,
126    is_iceberg_engine_internal: bool,
127) -> Result<SinkPlanContext> {
128    let session = handler_args.session.clone();
129    let session = session.as_ref();
130    let user_specified_columns = !stmt.columns.is_empty();
131    let db_name = &session.database();
132    let (sink_schema_name, sink_table_name) =
133        Binder::resolve_schema_qualified_name(db_name, &stmt.sink_name)?;
134
135    let mut with_options = handler_args.with_options.clone();
136    // These are frontend-level streaming job options. They must not be passed to connector
137    // property validation.
138    extract_streaming_job_resource_options(&mut with_options);
139
140    if session
141        .env()
142        .system_params_manager()
143        .get_params()
144        .load()
145        .enforce_secret()
146        && Feature::SecretManagement.check_available().is_ok()
147    {
148        enforce_secret_sink(&with_options)?;
149    }
150
151    resolve_privatelink_in_with_option(&mut with_options)?;
152    let (mut resolved_with_options, connection_type, connector_conn_ref) =
153        resolve_connection_ref_and_secret_ref(
154            with_options,
155            session,
156            Some(TelemetryDatabaseObject::Sink),
157        )?;
158
159    let since_timestamp_epoch = resolved_with_options
160        .remove(SINK_SINCE_TIMESTAMP_OPTION)
161        .map(|value| {
162            let timestamp = value.parse::<Timestamptz>().map_err(|err| {
163                ErrorCode::InvalidInputSyntax(format!(
164                    "invalid value {value:?} of '{SINK_SINCE_TIMESTAMP_OPTION}' option: {err}; \
165                     expected a timestamptz string with an explicit time zone, \
166                     for example '2024-01-01 00:00:00Z'"
167                ))
168            })?;
169            let timestamp_millis = u64::try_from(timestamp.timestamp_millis()).unwrap_or(0);
170            Ok::<_, RwError>(Epoch::from_unix_millis_or_earliest(timestamp_millis).0)
171        })
172        .transpose()?;
173    if since_timestamp_epoch.is_some() {
174        Feature::SinkSinceTimestamp.check_available()?;
175    }
176
177    ensure_connection_type_allowed(connection_type, &SINK_ALLOWED_CONNECTION_CONNECTOR)?;
178
179    // if not using connection, we don't need to check connector match connection type
180    if !matches!(connection_type, PbConnectionType::Unspecified) {
181        let Some(connector) = resolved_with_options.get_connector() else {
182            return Err(RwError::from(ErrorCode::ProtocolError(format!(
183                "missing field '{}' in WITH clause",
184                CONNECTOR_TYPE_KEY
185            ))));
186        };
187        check_connector_match_connection_type(connector.as_str(), &connection_type)?;
188    }
189
190    let partition_info = get_partition_compute_info(&resolved_with_options).await?;
191
192    let context = if let Some(explain_options) = explain_options {
193        OptimizerContext::new(handler_args.clone(), explain_options)
194    } else {
195        OptimizerContext::from_handler_args(handler_args.clone())
196    };
197
198    let is_auto_schema_change = resolved_with_options
199        .remove(AUTO_SCHEMA_CHANGE_KEY)
200        .map(|value| {
201            value.parse::<bool>().map_err(|_| {
202                ErrorCode::InvalidInputSyntax(format!(
203                    "invalid value {} of '{}' option, expect",
204                    value, AUTO_SCHEMA_CHANGE_KEY
205                ))
206            })
207        })
208        .transpose()?
209        .unwrap_or(false);
210
211    if is_auto_schema_change {
212        Feature::SinkAutoSchemaChange.check_available()?;
213    }
214
215    let sink_into_table_name = stmt.into_table_name.as_ref().map(|name| name.real_value());
216    if sink_into_table_name.is_some() {
217        let prev = resolved_with_options.insert(CONNECTOR_TYPE_KEY.to_owned(), "table".to_owned());
218
219        if prev.is_some() {
220            return Err(RwError::from(ErrorCode::BindError(
221                "In the case of sinking into table, the 'connector' parameter should not be provided.".to_owned(),
222            )));
223        }
224    }
225    let connector = resolved_with_options
226        .get(CONNECTOR_TYPE_KEY)
227        .cloned()
228        .ok_or_else(|| ErrorCode::BindError(format!("missing field '{CONNECTOR_TYPE_KEY}'")))?;
229    ensure_local_fs_connector_allowed(session, &connector)?;
230
231    // Used for debezium's table name
232    let sink_from_table_name;
233    // `true` means that sink statement has the form: `CREATE SINK s1 FROM ...`
234    // `false` means that sink statement has the form: `CREATE SINK s1 AS <query>`
235    let direct_sink_from_name: Option<(ObjectName, bool)>;
236    let mut query = match stmt.sink_from {
237        CreateSink::From(from_name) => {
238            sink_from_table_name = from_name.0.last().unwrap().real_value();
239            direct_sink_from_name = Some((from_name.clone(), is_auto_schema_change));
240            if is_auto_schema_change && sink_into_table_name.is_some() {
241                return Err(RwError::from(ErrorCode::InvalidInputSyntax(
242                    "auto schema change not supported for sink-into-table".to_owned(),
243                )));
244            }
245            if resolved_with_options
246                .value_eq_ignore_case(SINK_CREATE_TABLE_IF_NOT_EXISTS_KEY, "true")
247                && connector == RedshiftSink::SINK_NAME
248                || connector == SnowflakeV2Sink::SINK_NAME
249            {
250                if let Some(table_name) = resolved_with_options.get(SINK_TARGET_TABLE_NAME) {
251                    // auto fill intermediate table name if target table name is specified
252                    if resolved_with_options
253                        .get(SINK_INTERMEDIATE_TABLE_NAME)
254                        .is_none()
255                    {
256                        // generate the intermediate table name with random value appended to the target table name
257                        let intermediate_table_name = format!(
258                            "rw_{}_{}_{}",
259                            sink_table_name,
260                            table_name,
261                            uuid::Uuid::new_v4()
262                        );
263                        resolved_with_options.insert(
264                            SINK_INTERMEDIATE_TABLE_NAME.to_owned(),
265                            intermediate_table_name,
266                        );
267                    }
268                } else {
269                    return Err(RwError::from(ErrorCode::BindError(
270                        "'table.name' option must be specified.".to_owned(),
271                    )));
272                }
273            }
274            Box::new(gen_query_from_table_name(from_name))
275        }
276        CreateSink::AsQuery(query) => {
277            if is_auto_schema_change {
278                return Err(RwError::from(ErrorCode::InvalidInputSyntax(
279                    "auto schema change not supported for CREATE SINK AS QUERY".to_owned(),
280                )));
281            }
282            sink_from_table_name = sink_table_name.clone();
283            direct_sink_from_name = None;
284            query
285        }
286    };
287
288    if is_iceberg_engine_internal && let Some((from_name, _)) = &direct_sink_from_name {
289        let (table, _) = get_table_catalog_by_table_name(session, from_name)?;
290        let pk_names = table.pk_column_names();
291        if pk_names.len() == 1 && pk_names[0].eq(ROW_ID_COLUMN_NAME) {
292            let [stmt]: [_; 1] = Parser::parse_sql(&format!(
293                "select {} as {}, * from {}",
294                ROW_ID_COLUMN_NAME, RISINGWAVE_ICEBERG_ROW_ID, from_name
295            ))
296            .context("unable to parse query")?
297            .try_into()
298            .unwrap();
299            let Statement::Query(parsed_query) = stmt else {
300                panic!("unexpected statement: {:?}", stmt);
301            };
302            query = parsed_query;
303        }
304    }
305
306    let (sink_database_id, sink_schema_id) =
307        session.get_database_and_schema_id_for_create(sink_schema_name.clone())?;
308
309    if since_timestamp_epoch.is_some() {
310        if sink_into_table_name.is_some() {
311            return Err(ErrorCode::BindError(format!(
312                "`{SINK_SINCE_TIMESTAMP_OPTION}` does not support `CREATE SINK INTO TABLE`"
313            ))
314            .into());
315        }
316        if is_iceberg_engine_internal {
317            return Err(ErrorCode::BindError(format!(
318                "`{SINK_SINCE_TIMESTAMP_OPTION}` does not support iceberg engine internal sinks"
319            ))
320            .into());
321        }
322        if let Some((from_name, _)) = &direct_sink_from_name {
323            let (table, _) = get_table_catalog_by_table_name(session, from_name)?;
324            if table.database_id != sink_database_id {
325                return Err(ErrorCode::NotSupported(
326                    format!(
327                        "`{SINK_SINCE_TIMESTAMP_OPTION}` does not support cross-database sinks"
328                    ),
329                    "Please create the sink in the same database as the upstream table.".to_owned(),
330                )
331                .into());
332            }
333        }
334    }
335
336    let (
337        dependent_relations,
338        dependent_udfs,
339        dependent_secrets,
340        bound,
341        auto_refresh_schema_from_table,
342    ) = {
343        let mut binder = Binder::new_for_stream(session);
344        let auto_refresh_schema_from_table = if let Some((from_name, true)) = &direct_sink_from_name
345        {
346            let from_relation = binder.bind_relation_by_name(from_name, None, None, true)?;
347            if let Relation::BaseTable(table) = from_relation {
348                if table.table_catalog.table_type != TableType::Table {
349                    return Err(ErrorCode::InvalidInputSyntax(format!(
350                        "auto schema change only support on TABLE, but got {:?}",
351                        table.table_catalog.table_type
352                    ))
353                    .into());
354                }
355                if table.table_catalog.database_id != sink_database_id {
356                    return Err(ErrorCode::InvalidInputSyntax(
357                        "auto schema change sink does not support created from cross database table".to_owned()
358                    )
359                        .into());
360                }
361                for col in &table.table_catalog.columns {
362                    if !col.is_hidden() && (col.is_generated() || col.is_rw_sys_column()) {
363                        return Err(ErrorCode::InvalidInputSyntax(format!("auto schema change not supported for table with non-hidden generated column or sys column, but got {}", col.name())).into());
364                    }
365                }
366                Some(table.table_catalog)
367            } else {
368                return Err(RwError::from(ErrorCode::NotSupported(
369                    "auto schema change only supported for TABLE".to_owned(),
370                    "try recreating the sink from table".to_owned(),
371                )));
372            }
373        } else {
374            None
375        };
376
377        let bound = binder.bind_query(&query)?;
378
379        (
380            binder.included_relations().clone(),
381            binder.included_udfs().clone(),
382            binder.included_secrets().clone(),
383            bound,
384            auto_refresh_schema_from_table,
385        )
386    };
387
388    reject_internal_table_dependencies(session, &dependent_relations, "CREATE SINK")?;
389
390    let col_names = if sink_into_table_name.is_some() {
391        parse_column_names(&stmt.columns)
392    } else {
393        // If column names not specified, use the name in the bound query, which is equal with the plan root's original field name.
394        get_column_names(&bound, stmt.columns)?
395    };
396
397    let emit_on_window_close = stmt.emit_mode == Some(EmitMode::OnWindowClose);
398    if emit_on_window_close {
399        context.warn_to_user("EMIT ON WINDOW CLOSE is currently an experimental feature. Please use it with caution.");
400    }
401
402    let format_desc = match stmt.sink_schema {
403        // Case A: new syntax `format ... encode ...`
404        Some(f) => {
405            validate_compatibility(&connector, &f)?;
406            Some(bind_sink_format_desc(session,f)?)
407        }
408        None => match resolved_with_options.get(SINK_TYPE_OPTION) {
409            // Case B: old syntax `type = '...'`
410            Some(t) => SinkFormatDesc::from_legacy_type(&connector, t)?.map(|mut f| {
411                session.notice_to_user("Consider using the newer syntax `FORMAT ... ENCODE ...` instead of `type = '...'`.");
412                if let Some(v) = resolved_with_options.get(SINK_USER_FORCE_APPEND_ONLY_OPTION) {
413                    f.options.insert(SINK_USER_FORCE_APPEND_ONLY_OPTION.into(), v.into());
414                }
415                if let Some(v) = resolved_with_options.get(SINK_USER_IGNORE_DELETE_OPTION) {
416                    f.options.insert(SINK_USER_IGNORE_DELETE_OPTION.into(), v.into());
417                }
418                f
419            }),
420            // Case C: no format + encode required
421            None => None,
422        },
423    };
424
425    let definition = context.normalized_sql().to_owned();
426    let mut plan_root = if is_iceberg_engine_internal {
427        Planner::new_for_iceberg_table_engine_sink(context.into()).plan_query(bound)?
428    } else {
429        Planner::new_for_stream(context.into()).plan_query(bound)?
430    };
431    if let Some(col_names) = &col_names {
432        plan_root.set_out_names(col_names.clone())?;
433    };
434
435    let without_snapshot = matches!(
436        resolved_with_options.remove(SINK_SNAPSHOT_OPTION),
437        Some(flag) if flag.eq_ignore_ascii_case("false")
438    );
439
440    if since_timestamp_epoch.is_some() && !without_snapshot {
441        return Err(ErrorCode::BindError(format!(
442            "`{SINK_SINCE_TIMESTAMP_OPTION}` requires `snapshot = false`"
443        ))
444        .into());
445    }
446
447    let target_table_catalog = stmt
448        .into_table_name
449        .as_ref()
450        .map(|table_name| fetch_table_catalog_for_alter(session, table_name).map(|t| t.0))
451        .transpose()?;
452
453    if let Some(target_table_catalog) = &target_table_catalog {
454        if let Some(col_names) = col_names {
455            let target_table_columns = target_table_catalog
456                .columns()
457                .iter()
458                .map(|c| c.name())
459                .collect::<BTreeSet<_>>();
460            for c in col_names {
461                if !target_table_columns.contains(c.as_str()) {
462                    return Err(RwError::from(ErrorCode::BindError(format!(
463                        "Column {} not found in table {}",
464                        c,
465                        target_table_catalog.name()
466                    ))));
467                }
468            }
469        }
470        if target_table_catalog
471            .columns()
472            .iter()
473            .any(|col| !col.nullable())
474        {
475            notice_to_user(format!(
476                "The target table `{}` contains columns with NOT NULL constraints. Any sinked rows violating the constraints will be ignored silently.",
477                target_table_catalog.name(),
478            ));
479        }
480    }
481
482    let sink_plan = plan_root.gen_sink_plan(
483        sink_table_name,
484        definition,
485        resolved_with_options,
486        emit_on_window_close,
487        db_name.to_owned(),
488        sink_from_table_name,
489        format_desc,
490        without_snapshot,
491        since_timestamp_epoch.is_some(),
492        is_iceberg_engine_internal,
493        target_table_catalog.clone(),
494        partition_info,
495        user_specified_columns,
496        auto_refresh_schema_from_table,
497    )?;
498
499    let sink_desc = sink_plan.sink_desc().clone();
500
501    let mut sink_plan: PlanRef = sink_plan.into_stream_plan()?;
502    sink_plan = ensure_sync_log_store_fragment_root(sink_plan);
503
504    let ctx = sink_plan.ctx();
505    let explain_trace = ctx.is_explain_trace();
506    if explain_trace {
507        ctx.trace("Create Sink:");
508        ctx.trace(sink_plan.explain_to_string());
509    }
510    tracing::trace!("sink_plan: {:?}", sink_plan.explain_to_string());
511
512    // TODO(rc): To be consistent with UDF dependency check, we should collect relation dependencies
513    // during binding instead of visiting the optimized plan.
514    let dependencies =
515        RelationCollectorVisitor::collect_with(dependent_relations, sink_plan.clone())
516            .into_iter()
517            .chain(dependent_udfs.iter().copied().map_into())
518            .chain(
519                dependent_secrets
520                    .iter()
521                    .copied()
522                    .map(|id| id.as_object_id()),
523            )
524            .collect();
525
526    let sink_catalog = sink_desc.into_catalog(
527        sink_schema_id,
528        sink_database_id,
529        session.user_id(),
530        connector_conn_ref,
531    );
532
533    if let Some(table_catalog) = &target_table_catalog {
534        for column in sink_catalog.full_columns() {
535            if !column.can_dml() {
536                unreachable!(
537                    "can not derive generated columns and system column `_rw_timestamp` in a sink's catalog, but meet one"
538                );
539            }
540        }
541
542        let table_columns_without_rw_timestamp = table_catalog.columns_without_rw_timestamp();
543        let exprs = derive_default_column_project_for_sink(
544            &sink_catalog,
545            sink_plan.schema(),
546            &table_columns_without_rw_timestamp,
547            user_specified_columns,
548        )?;
549
550        let logical_project = generic::Project::new(exprs, sink_plan);
551
552        sink_plan = StreamProject::new(logical_project).into();
553
554        let exprs = LogicalSource::derive_output_exprs_from_generated_columns(
555            &table_columns_without_rw_timestamp,
556        )?;
557
558        if let Some(exprs) = exprs {
559            let logical_project = generic::Project::new(exprs, sink_plan);
560            sink_plan = StreamProject::new(logical_project).into();
561        }
562    };
563
564    Ok(SinkPlanContext {
565        query,
566        sink_plan,
567        sink_catalog,
568        target_table_catalog,
569        dependencies,
570        since_timestamp_epoch,
571    })
572}
573
574// This function is used to return partition compute info for a sink. More details refer in `PartitionComputeInfo`.
575// Return:
576// `Some(PartitionComputeInfo)` if the sink need to compute partition.
577// `None` if the sink does not need to compute partition.
578pub async fn get_partition_compute_info(
579    with_options: &WithOptionsSecResolved,
580) -> Result<Option<PartitionComputeInfo>> {
581    let (options, secret_refs) = with_options.clone().into_parts();
582    let Some(connector) = options.get(UPSTREAM_SOURCE_KEY).cloned() else {
583        return Ok(None);
584    };
585    let properties = LocalSecretManager::global().fill_secrets(options, secret_refs)?;
586    match connector.as_str() {
587        ICEBERG_SINK => {
588            let iceberg_config = IcebergConfig::from_btreemap(properties)?;
589            get_partition_compute_info_for_iceberg(&iceberg_config).await
590        }
591        _ => Ok(None),
592    }
593}
594
595async fn get_partition_compute_info_for_iceberg(
596    _iceberg_config: &IcebergConfig,
597) -> Result<Option<PartitionComputeInfo>> {
598    // TODO: check table if exists
599    if _iceberg_config.create_table_if_not_exists {
600        return Ok(None);
601    }
602    let table = _iceberg_config.load_table().await?;
603    let partition_spec = table.metadata().default_partition_spec();
604    if partition_spec.is_unpartitioned() {
605        return Ok(None);
606    }
607
608    // Separate the partition spec into two parts: sparse partition and range partition.
609    // Sparse partition means that the data distribution is more sparse at a given time.
610    // Range partition means that the data distribution is likely same at a given time.
611    // Only compute the partition and shuffle by them for the sparse partition.
612    let has_sparse_partition = partition_spec.fields().iter().any(|f| match f.transform {
613        // Sparse partition
614        Transform::Identity | Transform::Truncate(_) | Transform::Bucket(_) => true,
615        // Range partition
616        Transform::Year
617        | Transform::Month
618        | Transform::Day
619        | Transform::Hour
620        | Transform::Void
621        | Transform::Unknown => false,
622    });
623    if !has_sparse_partition {
624        return Ok(None);
625    }
626
627    let arrow_type = type_to_arrow_type(&iceberg::spec::Type::Struct(
628        table.metadata().default_partition_type().clone(),
629    ))
630    .map_err(|_| {
631        RwError::from(ErrorCode::SinkError(
632            "Fail to convert iceberg partition type to arrow type".into(),
633        ))
634    })?;
635    let ArrowDataType::Struct(struct_fields) = arrow_type else {
636        return Err(RwError::from(ErrorCode::SinkError(
637            "Partition type of iceberg should be a struct type".into(),
638        )));
639    };
640
641    let schema = table.metadata().current_schema();
642    let partition_fields = partition_spec
643        .fields()
644        .iter()
645        .map(|f| {
646            let source_f =
647                schema
648                    .field_by_id(f.source_id)
649                    .ok_or(RwError::from(ErrorCode::SinkError(
650                        "Fail to look up iceberg partition field".into(),
651                    )))?;
652            Ok((source_f.name.clone(), f.transform))
653        })
654        .collect::<Result<Vec<_>>>()?;
655
656    Ok(Some(PartitionComputeInfo::Iceberg(IcebergPartitionInfo {
657        partition_type: IcebergArrowConvert.struct_from_fields(&struct_fields)?,
658        partition_fields,
659    })))
660}
661
662pub async fn handle_create_sink(
663    mut handle_args: HandlerArgs,
664    stmt: CreateSinkStatement,
665    is_iceberg_engine_internal: bool,
666) -> Result<RwPgResponse> {
667    let session = handle_args.session.clone();
668
669    session.check_cluster_limits().await?;
670
671    let mode = if stmt.or_replace {
672        prepare_replace_sink(&mut handle_args, &stmt)?
673    } else {
674        let if_not_exists = stmt.if_not_exists;
675        if let Either::Right(resp) = session.check_relation_name_duplicated(
676            stmt.sink_name.clone(),
677            StatementType::CREATE_SINK,
678            if_not_exists,
679        )? {
680            return Ok(resp);
681        }
682
683        if stmt.sink_name.base_name().starts_with(ICEBERG_SINK_PREFIX) {
684            return Err(RwError::from(ErrorCode::InvalidInputSyntax(format!(
685                "Sink name cannot start with reserved prefix '{}'",
686                ICEBERG_SINK_PREFIX
687            ))));
688        }
689
690        SinkCreateMode::Create { if_not_exists }
691    };
692
693    create_sink_or_replace(handle_args, stmt, is_iceberg_engine_internal, mode).await
694}
695
696enum SinkCreateMode {
697    Create { if_not_exists: bool },
698    Replace { original_sink: Arc<SinkCatalog> },
699}
700
701impl SinkCreateMode {
702    fn statement_name(&self) -> &'static str {
703        match self {
704            SinkCreateMode::Create { .. } => "CREATE SINK",
705            SinkCreateMode::Replace { .. } => "REPLACE SINK",
706        }
707    }
708}
709
710async fn create_sink_or_replace(
711    mut handle_args: HandlerArgs,
712    stmt: CreateSinkStatement,
713    is_iceberg_engine_internal: bool,
714    mode: SinkCreateMode,
715) -> Result<RwPgResponse> {
716    let session = handle_args.session.clone();
717
718    let resource_type =
719        resolve_streaming_job_resource_type(session.as_ref(), &mut handle_args.with_options)?;
720
721    let (sink, graph, dependencies, since_timestamp_epoch) = {
722        let backfill_order_strategy = handle_args.with_options.backfill_order_strategy();
723        let SinkPlanContext {
724            query,
725            sink_plan: plan,
726            sink_catalog: mut sink,
727            target_table_catalog,
728            dependencies,
729            since_timestamp_epoch,
730        } = gen_sink_plan(handle_args, stmt, None, is_iceberg_engine_internal).await?;
731
732        let has_order_by = !query.order_by.is_empty();
733        if has_order_by {
734            plan.ctx().warn_to_user(
735                r#"The ORDER BY clause in the CREATE SINK statement has no effect at all."#
736                    .to_owned(),
737            );
738        }
739
740        match &mode {
741            SinkCreateMode::Create { .. } => {
742                if let Some(table_catalog) = &target_table_catalog {
743                    sink.original_target_columns = table_catalog.columns_without_rw_timestamp();
744                }
745            }
746            SinkCreateMode::Replace { original_sink } => {
747                if target_table_catalog.is_some() {
748                    return Err(ErrorCode::NotSupported(
749                        "REPLACE SINK INTO TABLE is not supported yet".to_owned(),
750                        "replace ordinary sinks first".to_owned(),
751                    )
752                    .into());
753                }
754
755                sink.schema_id = original_sink.schema_id;
756                sink.database_id = original_sink.database_id;
757                sink.name = original_sink.name.clone();
758                sink.owner = original_sink.owner;
759            }
760        }
761
762        let backfill_order =
763            plan_backfill_order(session.as_ref(), backfill_order_strategy, plan.clone())?;
764        let graph =
765            build_graph_with_strategy(plan, Some(GraphJobType::Sink), Some(backfill_order))?;
766
767        (sink, graph, dependencies, since_timestamp_epoch)
768    };
769
770    let statement_name = mode.statement_name();
771    let catalog_writer = session.catalog_writer()?;
772    match mode {
773        SinkCreateMode::Create { if_not_exists } => {
774            let _job_guard = session.env().creating_streaming_job_tracker().guard(
775                CreatingStreamingJobInfo::new(
776                    session.session_id(),
777                    sink.database_id,
778                    sink.schema_id,
779                    sink.name.clone(),
780                ),
781            );
782
783            execute_with_long_running_notification(
784                catalog_writer.create_sink(
785                    sink.to_proto(),
786                    graph,
787                    dependencies,
788                    resource_type,
789                    if_not_exists,
790                    since_timestamp_epoch,
791                ),
792                &session,
793                statement_name,
794                LongRunningNotificationAction::MonitorBackfillJob,
795            )
796            .await?;
797        }
798        SinkCreateMode::Replace { original_sink } => {
799            let original_sink_id = original_sink.id;
800            execute_with_long_running_notification(
801                catalog_writer.replace_sink(
802                    original_sink_id,
803                    sink.to_proto(),
804                    graph,
805                    dependencies,
806                    resource_type,
807                ),
808                &session,
809                statement_name,
810                LongRunningNotificationAction::DiagnoseBarrierLatency,
811            )
812            .await?;
813
814            tracing::info!(
815                old_sink_id = %original_sink_id,
816                sink_name = %sink.name,
817                "replace sink plan submitted"
818            );
819        }
820    }
821
822    Ok(PgResponse::empty_result(StatementType::CREATE_SINK))
823}
824
825fn sink_replace_requires_exactly_once_state(sink: &SinkCatalog) -> bool {
826    match sink.properties.get("is_exactly_once") {
827        Some(value) => value.eq_ignore_ascii_case("true"),
828        None => sink
829            .properties
830            .get(CONNECTOR_TYPE_KEY)
831            .is_some_and(|connector| connector.eq_ignore_ascii_case(ICEBERG_SINK)),
832    }
833}
834
835fn prepare_replace_sink(
836    handle_args: &mut HandlerArgs,
837    stmt: &CreateSinkStatement,
838) -> Result<SinkCreateMode> {
839    let session = handle_args.session.clone();
840    if stmt.if_not_exists {
841        return Err(ErrorCode::InvalidInputSyntax(
842            "REPLACE SINK does not support IF NOT EXISTS".to_owned(),
843        )
844        .into());
845    }
846    if !matches!(&stmt.sink_from, CreateSink::From(_)) {
847        return Err(ErrorCode::NotSupported(
848            "REPLACE SINK currently only supports REPLACE SINK ... FROM table_or_mv".to_owned(),
849            "use REPLACE SINK name FROM existing_relation ...".to_owned(),
850        )
851        .into());
852    }
853    if stmt.into_table_name.is_some() {
854        return Err(ErrorCode::NotSupported(
855            "REPLACE SINK INTO TABLE is not supported yet".to_owned(),
856            "replace ordinary sinks first".to_owned(),
857        )
858        .into());
859    }
860    if handle_args
861        .with_options
862        .get(AUTO_SCHEMA_CHANGE_KEY)
863        .is_some_and(|value| value.eq_ignore_ascii_case("true"))
864    {
865        return Err(ErrorCode::NotSupported(
866            "REPLACE SINK with auto schema change is not supported yet".to_owned(),
867            "disable auto schema change for this replacement".to_owned(),
868        )
869        .into());
870    }
871    if handle_args
872        .with_options
873        .contains_key(SINK_SINCE_TIMESTAMP_OPTION)
874    {
875        return Err(ErrorCode::NotSupported(
876            "REPLACE SINK with since_timestamp is not supported yet".to_owned(),
877            "create a new sink with since_timestamp instead".to_owned(),
878        )
879        .into());
880    }
881    match handle_args.with_options.get(SINK_SNAPSHOT_OPTION) {
882        Some(value) if !value.eq_ignore_ascii_case("false") => {
883            return Err(ErrorCode::InvalidInputSyntax(
884                "REPLACE SINK must not enable snapshot backfill".to_owned(),
885            )
886            .into());
887        }
888        Some(_) => {}
889        None => {
890            handle_args
891                .with_options
892                .insert(SINK_SNAPSHOT_OPTION.to_owned(), "false".to_owned());
893        }
894    }
895
896    let db_name = session.database();
897    let (sink_schema_name, sink_table_name) =
898        Binder::resolve_schema_qualified_name(&db_name, &stmt.sink_name)?;
899    let original_sink = {
900        let search_path = session.config().search_path();
901        let user_name = session.user_name();
902        let schema_path = SchemaPath::new(sink_schema_name.as_deref(), &search_path, &user_name);
903        let reader = session.env().catalog_reader().read_guard();
904        let (sink, schema_name) =
905            reader.get_created_sink_by_name(&db_name, schema_path, &sink_table_name)?;
906        session.check_privilege_for_drop_alter(schema_name, &**sink)?;
907        if sink.target_table.is_some() {
908            return Err(ErrorCode::NotSupported(
909                "REPLACE SINK INTO TABLE is not supported yet".to_owned(),
910                "replace ordinary sinks first".to_owned(),
911            )
912            .into());
913        }
914        if sink.auto_refresh_schema_from_table.is_some() {
915            return Err(ErrorCode::NotSupported(
916                "REPLACE SINK with auto schema change is not supported yet".to_owned(),
917                "drop and recreate this auto schema change sink".to_owned(),
918            )
919            .into());
920        }
921        if sink_replace_requires_exactly_once_state(sink) {
922            return Err(ErrorCode::NotSupported(
923                "REPLACE SINK does not support exactly-once sinks yet".to_owned(),
924                "set is_exactly_once=false or recreate the sink manually".to_owned(),
925            )
926            .into());
927        }
928        sink.clone()
929    };
930
931    Ok(SinkCreateMode::Replace { original_sink })
932}
933
934pub fn fetch_incoming_sinks(
935    session: &Arc<SessionImpl>,
936    table: &TableCatalog,
937) -> Result<Vec<Arc<SinkCatalog>>> {
938    let reader = session.env().catalog_reader().read_guard();
939    let schema = reader.get_schema_by_id(table.database_id, table.schema_id)?;
940    let Some(incoming_sinks) = schema.table_incoming_sinks(table.id) else {
941        return Ok(vec![]);
942    };
943    let mut sinks = vec![];
944    for sink_id in incoming_sinks {
945        sinks.push(
946            schema
947                .get_sink_by_id(*sink_id)
948                .expect("should exist")
949                .clone(),
950        );
951    }
952    Ok(sinks)
953}
954
955fn derive_sink_to_table_expr(
956    sink_schema: &Schema,
957    idx: usize,
958    target_type: &DataType,
959) -> Result<ExprImpl> {
960    let input_type = &sink_schema.fields()[idx].data_type;
961
962    if !target_type.equals_datatype(input_type) {
963        bail!(
964            "column type mismatch: {:?} vs {:?}, column name: {:?}",
965            target_type,
966            input_type,
967            sink_schema.fields()[idx].name
968        );
969    } else {
970        Ok(ExprImpl::InputRef(Box::new(InputRef::new(
971            idx,
972            input_type.clone(),
973        ))))
974    }
975}
976
977pub(crate) fn derive_default_column_project_for_sink(
978    sink: &SinkCatalog,
979    sink_schema: &Schema,
980    columns: &[ColumnCatalog],
981    user_specified_columns: bool,
982) -> Result<Vec<ExprImpl>> {
983    assert_eq!(sink.full_schema().len(), sink_schema.len());
984
985    let default_column_exprs = TableCatalog::default_column_exprs(columns);
986
987    let mut exprs = vec![];
988
989    let sink_visible_col_idxes = sink
990        .full_columns()
991        .iter()
992        .positions(|c| !c.is_hidden())
993        .collect_vec();
994    let sink_visible_col_idxes_by_name = sink
995        .full_columns()
996        .iter()
997        .enumerate()
998        .filter(|(_, c)| !c.is_hidden())
999        .map(|(i, c)| (c.name(), i))
1000        .collect::<BTreeMap<_, _>>();
1001
1002    for (idx, column) in columns.iter().enumerate() {
1003        if !column.can_dml() {
1004            continue;
1005        }
1006
1007        let default_col_expr =
1008            || -> ExprImpl { rewrite_now_to_proctime(default_column_exprs[idx].clone()) };
1009
1010        let sink_col_expr = |sink_col_idx: usize| -> Result<ExprImpl> {
1011            derive_sink_to_table_expr(sink_schema, sink_col_idx, column.data_type())
1012        };
1013
1014        // If users specified the columns to be inserted e.g. `CREATE SINK s INTO t(a, b)`, the expressions of `Project` will be generated accordingly.
1015        // The missing columns will be filled with default value (`null` if not explicitly defined).
1016        // Otherwise, e.g. `CREATE SINK s INTO t`, the columns will be matched by their order in `select` query and the target table.
1017        if user_specified_columns {
1018            if let Some(idx) = sink_visible_col_idxes_by_name.get(column.name()) {
1019                exprs.push(sink_col_expr(*idx)?);
1020            } else {
1021                exprs.push(default_col_expr());
1022            }
1023        } else {
1024            if idx < sink_visible_col_idxes.len() {
1025                exprs.push(sink_col_expr(sink_visible_col_idxes[idx])?);
1026            } else {
1027                exprs.push(default_col_expr());
1028            };
1029        }
1030    }
1031    Ok(exprs)
1032}
1033
1034/// Transforms the (format, encode, options) from sqlparser AST into an internal struct `SinkFormatDesc`.
1035/// This is an analogy to (part of) [`crate::handler::create_source::bind_columns_from_source`]
1036/// which transforms sqlparser AST `SourceSchemaV2` into `StreamSourceInfo`.
1037fn bind_sink_format_desc(
1038    session: &SessionImpl,
1039    value: FormatEncodeOptions,
1040) -> Result<SinkFormatDesc> {
1041    use risingwave_connector::sink::catalog::{SinkEncode, SinkFormat};
1042    use risingwave_connector::sink::encoder::TimestamptzHandlingMode;
1043    use risingwave_sqlparser::ast::{Encode as E, Format as F};
1044
1045    let format = match value.format {
1046        F::Plain => SinkFormat::AppendOnly,
1047        F::Upsert => SinkFormat::Upsert,
1048        F::Debezium => SinkFormat::Debezium,
1049        f @ (F::Native | F::DebeziumMongo | F::Maxwell | F::Canal | F::None) => {
1050            return Err(ErrorCode::BindError(format!("sink format unsupported: {f}")).into());
1051        }
1052    };
1053    let encode = match value.row_encode {
1054        E::Json => SinkEncode::Json,
1055        E::Protobuf => SinkEncode::Protobuf,
1056        E::Avro => SinkEncode::Avro,
1057        E::Template => SinkEncode::Template,
1058        E::Parquet => SinkEncode::Parquet,
1059        E::Bytes => SinkEncode::Bytes,
1060        e @ (E::Native | E::Csv | E::None | E::Text) => {
1061            return Err(ErrorCode::BindError(format!("sink encode unsupported: {e}")).into());
1062        }
1063    };
1064
1065    let mut key_encode = None;
1066    if let Some(encode) = value.key_encode {
1067        match encode {
1068            E::Text => key_encode = Some(SinkEncode::Text),
1069            E::Bytes => key_encode = Some(SinkEncode::Bytes),
1070            _ => {
1071                return Err(ErrorCode::BindError(format!(
1072                    "sink key encode unsupported: {encode}, only TEXT and BYTES supported"
1073                ))
1074                .into());
1075            }
1076        }
1077    }
1078
1079    let (props, connection_type_flag, schema_registry_conn_ref) =
1080        resolve_connection_ref_and_secret_ref(
1081            WithOptions::try_from(value.row_options.as_slice())?,
1082            session,
1083            Some(TelemetryDatabaseObject::Sink),
1084        )?;
1085    ensure_connection_type_allowed(
1086        connection_type_flag,
1087        &SINK_ALLOWED_CONNECTION_SCHEMA_REGISTRY,
1088    )?;
1089    let (mut options, secret_refs) = props.into_parts();
1090
1091    options
1092        .entry(TimestamptzHandlingMode::OPTION_KEY.to_owned())
1093        .or_insert(TimestamptzHandlingMode::FRONTEND_DEFAULT.to_owned());
1094
1095    Ok(SinkFormatDesc {
1096        format,
1097        encode,
1098        options,
1099        secret_refs,
1100        key_encode,
1101        connection_id: schema_registry_conn_ref,
1102    })
1103}
1104
1105static CONNECTORS_COMPATIBLE_FORMATS: LazyLock<HashMap<String, HashMap<Format, Vec<Encode>>>> =
1106    LazyLock::new(|| {
1107        use risingwave_connector::sink::Sink as _;
1108        use risingwave_connector::sink::file_sink::azblob::AzblobSink;
1109        use risingwave_connector::sink::file_sink::fs::FsSink;
1110        use risingwave_connector::sink::file_sink::gcs::GcsSink;
1111        use risingwave_connector::sink::file_sink::opendal_sink::FileSink;
1112        use risingwave_connector::sink::file_sink::s3::S3Sink;
1113        use risingwave_connector::sink::file_sink::webhdfs::WebhdfsSink;
1114        use risingwave_connector::sink::google_pubsub::GooglePubSubSink;
1115        use risingwave_connector::sink::kafka::KafkaSink;
1116        use risingwave_connector::sink::kinesis::KinesisSink;
1117        use risingwave_connector::sink::mqtt::MqttSink;
1118        use risingwave_connector::sink::pulsar::PulsarSink;
1119        use risingwave_connector::sink::redis::RedisSink;
1120
1121        convert_args!(hashmap!(
1122                GooglePubSubSink::SINK_NAME => hashmap!(
1123                    Format::Plain => vec![Encode::Json],
1124                ),
1125                KafkaSink::SINK_NAME => hashmap!(
1126                    Format::Plain => vec![Encode::Json, Encode::Avro, Encode::Protobuf, Encode::Bytes],
1127                    Format::Upsert => vec![Encode::Json, Encode::Avro, Encode::Protobuf],
1128                    Format::Debezium => vec![Encode::Json],
1129                ),
1130                FileSink::<S3Sink>::SINK_NAME => hashmap!(
1131                    Format::Plain => vec![Encode::Parquet, Encode::Json],
1132                ),
1133                FileSink::<SnowflakeSink>::SINK_NAME => hashmap!(
1134                    Format::Plain => vec![Encode::Parquet, Encode::Json],
1135                ),
1136                FileSink::<GcsSink>::SINK_NAME => hashmap!(
1137                    Format::Plain => vec![Encode::Parquet, Encode::Json],
1138                ),
1139                FileSink::<AzblobSink>::SINK_NAME => hashmap!(
1140                    Format::Plain => vec![Encode::Parquet, Encode::Json],
1141                ),
1142                FileSink::<WebhdfsSink>::SINK_NAME => hashmap!(
1143                    Format::Plain => vec![Encode::Parquet, Encode::Json],
1144                ),
1145                FileSink::<FsSink>::SINK_NAME => hashmap!(
1146                    Format::Plain => vec![Encode::Parquet, Encode::Json],
1147                ),
1148                KinesisSink::SINK_NAME => hashmap!(
1149                    Format::Plain => vec![Encode::Json],
1150                    Format::Upsert => vec![Encode::Json],
1151                    Format::Debezium => vec![Encode::Json],
1152                ),
1153                MqttSink::SINK_NAME => hashmap!(
1154                    Format::Plain => vec![Encode::Json, Encode::Protobuf],
1155                ),
1156                PulsarSink::SINK_NAME => hashmap!(
1157                    Format::Plain => vec![Encode::Json],
1158                    Format::Upsert => vec![Encode::Json],
1159                    Format::Debezium => vec![Encode::Json],
1160                ),
1161                RedisSink::SINK_NAME => hashmap!(
1162                    Format::Plain => vec![Encode::Json, Encode::Template],
1163                    Format::Upsert => vec![Encode::Json, Encode::Template],
1164                ),
1165        ))
1166    });
1167
1168pub fn validate_compatibility(connector: &str, format_desc: &FormatEncodeOptions) -> Result<()> {
1169    let compatible_formats = CONNECTORS_COMPATIBLE_FORMATS
1170        .get(connector)
1171        .ok_or_else(|| {
1172            ErrorCode::BindError(format!(
1173                "connector {} is not supported by FORMAT ... ENCODE ... syntax",
1174                connector
1175            ))
1176        })?;
1177    let compatible_encodes = compatible_formats.get(&format_desc.format).ok_or_else(|| {
1178        ErrorCode::BindError(format!(
1179            "connector {} does not support format {:?}",
1180            connector, format_desc.format
1181        ))
1182    })?;
1183    if !compatible_encodes.contains(&format_desc.row_encode) {
1184        return Err(ErrorCode::BindError(format!(
1185            "connector {} does not support format {:?} with encode {:?}",
1186            connector, format_desc.format, format_desc.row_encode
1187        ))
1188        .into());
1189    }
1190
1191    // only allow Kafka connector work with `bytes` as key encode
1192    if let Some(encode) = &format_desc.key_encode
1193        && connector != KAFKA_SINK
1194        && matches!(encode, Encode::Bytes)
1195    {
1196        return Err(ErrorCode::BindError(format!(
1197            "key encode bytes only works with kafka connector, but found {}",
1198            connector
1199        ))
1200        .into());
1201    }
1202
1203    Ok(())
1204}
1205
#[cfg(test)]
pub mod tests {
    use risingwave_common::catalog::{CreateType, DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME};
    use risingwave_common::config::FrontendConfig;

    use crate::catalog::root_catalog::SchemaPath;
    use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};

    /// Creates a source, a materialized view and a sink on top of it, then replaces the
    /// sink, checking the catalog after the create and after the replace.
    #[tokio::test]
    async fn test_create_sink_handler() {
        let proto_file = create_proto_file(PROTO_FILE_DATA);
        let create_source_sql = format!(
            r#"CREATE SOURCE t1
    WITH (connector = 'kafka', kafka.topic = 'abc', kafka.brokers = 'localhost:1001')
    FORMAT PLAIN ENCODE PROTOBUF (message = '.test.TestRecord', schema.location = 'file://{}')"#,
            proto_file.path().to_str().unwrap()
        );
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(create_source_sql).await.unwrap();

        frontend
            .run_sql("create materialized view mv1 as select t1.country from t1;")
            .await
            .unwrap();

        let create_sink_sql = r#"CREATE SINK snk1 FROM mv1
                    WITH (connector = 'jdbc', mysql.endpoint = '127.0.0.1:3306', mysql.table =
                        '<table_name>', mysql.database = '<database_name>', mysql.user = '<user_name>',
                        mysql.password = '<password>', type = 'append-only', force_append_only = 'true');"#.to_owned();
        frontend.run_sql(create_sink_sql).await.unwrap();

        let session = frontend.session_ref();
        let default_schema = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        // The read guard lives only inside this block, so it is released before the next
        // statement runs.
        let schema_name = {
            let reader = session.env().catalog_reader().read_guard();

            // Check source exists.
            let (source, _) = reader
                .get_source_by_name(DEFAULT_DATABASE_NAME, default_schema, "t1")
                .unwrap();
            assert_eq!(source.name, "t1");

            // Check table exists.
            let (table, table_schema) = reader
                .get_created_table_by_name(DEFAULT_DATABASE_NAME, default_schema, "mv1")
                .unwrap();
            assert_eq!(table.name(), "mv1");
            let table_schema = table_schema.to_owned();

            // Check sink exists.
            let (sink, _) = reader
                .get_created_sink_by_name(
                    DEFAULT_DATABASE_NAME,
                    SchemaPath::Name(&table_schema),
                    "snk1",
                )
                .unwrap();
            assert_eq!(sink.name, "snk1");

            table_schema
        };

        let replace_sink_sql = r#"REPLACE SINK snk1 FROM mv1
                    WITH (connector = 'jdbc', mysql.endpoint = '127.0.0.1:3306', mysql.table =
                        '<table_name>', mysql.database = '<database_name>', mysql.user = '<user_name>',
                        mysql.password = '<password>', type = 'append-only', force_append_only = 'true');"#.to_owned();
        frontend.run_sql(replace_sink_sql).await.unwrap();

        let reader = session.env().catalog_reader().read_guard();
        let (replaced_sink, _) = reader
            .get_created_sink_by_name(
                DEFAULT_DATABASE_NAME,
                SchemaPath::Name(&schema_name),
                "snk1",
            )
            .unwrap();
        assert_eq!(replaced_sink.name, "snk1");
        // Frontend leaves the replacement job foreground for the meta foreground wait path.
        // Meta switches it to background when marking the job Creating during cutover.
        assert_eq!(replaced_sink.create_type, CreateType::Foreground);
    }

    /// Creating an `fs` sink must fail while `unsafe_enable_local_fs_connector` is off,
    /// and the error must name the config entry that enables it.
    #[tokio::test]
    async fn test_create_fs_sink_requires_frontend_config() {
        let frontend_config = FrontendConfig {
            unsafe_enable_local_fs_connector: false,
            ..Default::default()
        };
        let frontend =
            LocalFrontend::with_frontend_config(Default::default(), frontend_config).await;

        frontend.run_sql("CREATE TABLE t(v int);").await.unwrap();
        frontend
            .run_sql("CREATE MATERIALIZED VIEW mv AS SELECT * FROM t;")
            .await
            .unwrap();

        let create_fs_sink_sql = r#"CREATE SINK local_sink FROM mv
                    WITH (
                        connector = 'fs',
                        fs.path = '/tmp/rw-local-sink',
                        type = 'append-only',
                        force_append_only = 'true'
                    ) FORMAT PLAIN ENCODE JSON (force_append_only = 'true');"#
            .to_owned();
        let err = frontend.run_sql(create_fs_sink_sql).await.unwrap_err();

        let message = err.to_string();
        assert!(
            message.contains("frontend.unsafe_enable_local_fs_connector = true"),
            "{err:?}"
        );
    }
}