1use std::cmp::Ordering;
16use std::collections::{HashMap, HashSet};
17use std::num::NonZeroUsize;
18use std::sync::Arc;
19use std::sync::atomic::AtomicU64;
20use std::time::Duration;
21
22use anyhow::{Context, anyhow};
23use await_tree::InstrumentAwait;
24use either::Either;
25use itertools::Itertools;
26use risingwave_common::catalog::{
27 AlterDatabaseParam, ColumnCatalog, ColumnId, Field, FragmentTypeFlag,
28};
29use risingwave_common::config::DefaultParallelism;
30use risingwave_common::hash::VnodeCountCompat;
31use risingwave_common::id::{JobId, TableId};
32use risingwave_common::secret::{LocalSecretManager, SecretEncryption};
33use risingwave_common::system_param::reader::SystemParamsRead;
34use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont_mut;
35use risingwave_common::{bail, bail_not_implemented};
36use risingwave_connector::WithOptionsSecResolved;
37use risingwave_connector::connector_common::validate_connection;
38use risingwave_connector::sink::SinkParam;
39use risingwave_connector::sink::iceberg::IcebergSink;
40use risingwave_connector::source::cdc::CdcScanOptions;
41use risingwave_connector::source::{
42 ConnectorProperties, SourceEnumeratorContext, UPSTREAM_SOURCE_KEY,
43};
44use risingwave_meta_model::exactly_once_iceberg_sink::{Column, Entity};
45use risingwave_meta_model::object::ObjectType;
46use risingwave_meta_model::{
47 ConnectionId, DatabaseId, DispatcherType, FragmentId, FunctionId, IndexId, JobStatus, ObjectId,
48 SchemaId, SecretId, SinkId, SourceId, StreamingParallelism, SubscriptionId, UserId, ViewId,
49};
50use risingwave_pb::catalog::{
51 Comment, Connection, CreateType, Database, Function, PbSink, PbTable, Schema, Secret, Source,
52 Subscription, Table, View,
53};
54use risingwave_pb::common::PbActorLocation;
55use risingwave_pb::ddl_service::alter_owner_request::Object;
56use risingwave_pb::ddl_service::{
57 DdlProgress, TableJobType, WaitVersion, alter_name_request, alter_set_schema_request,
58 alter_swap_rename_request,
59};
60use risingwave_pb::meta::table_fragments::PbActorStatus;
61use risingwave_pb::stream_plan::stream_node::NodeBody;
62use risingwave_pb::stream_plan::{
63 PbDispatchOutputMapping, PbStreamFragmentGraph, PbStreamNode, PbUpstreamSinkInfo,
64 StreamFragmentGraph as StreamFragmentGraphProto,
65};
66use risingwave_pb::telemetry::{PbTelemetryDatabaseObject, PbTelemetryEventStage};
67use sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter};
68use strum::Display;
69use thiserror_ext::AsReport;
70use tokio::sync::{OwnedSemaphorePermit, Semaphore};
71use tokio::time::sleep;
72use tracing::Instrument;
73
74use crate::barrier::BarrierManagerRef;
75use crate::controller::catalog::{DropTableConnectorContext, ReleaseContext};
76use crate::controller::cluster::StreamingClusterInfo;
77use crate::controller::streaming_job::{FinishAutoRefreshSchemaSinkContext, SinkIntoTableContext};
78use crate::controller::utils::build_select_node_list;
79use crate::error::{MetaErrorInner, bail_invalid_parameter, bail_unavailable};
80use crate::manager::{
81 IGNORED_NOTIFICATION_VERSION, LocalNotification, MetaSrvEnv, MetadataManager,
82 NotificationVersion, StreamingJob, StreamingJobType,
83};
84use crate::model::{
85 DownstreamFragmentRelation, Fragment, FragmentDownstreamRelation,
86 FragmentId as CatalogFragmentId, StreamContext, StreamJobFragments, StreamJobFragmentsToCreate,
87 TableParallelism,
88};
89use crate::stream::cdc::{
90 is_parallelized_backfill_enabled, try_init_parallel_cdc_table_snapshot_splits,
91};
92use crate::stream::{
93 ActorGraphBuildResult, ActorGraphBuilder, AutoRefreshSchemaSinkContext,
94 CompleteStreamFragmentGraph, CreateStreamingJobContext, CreateStreamingJobOption,
95 FragmentGraphDownstreamContext, FragmentGraphUpstreamContext, GlobalStreamManagerRef,
96 ReplaceStreamJobContext, ReschedulePolicy, SourceChange, SourceManagerRef, StreamFragmentGraph,
97 UpstreamSinkInfo, check_sink_fragments_support_refresh_schema, create_source_worker,
98 rewrite_refresh_schema_sink_fragment, state_match, validate_sink,
99};
100use crate::telemetry::report_event;
101use crate::{MetaError, MetaResult};
102
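/// Drop behavior for a `DROP` command: `Restrict` refuses to drop an object that still has
/// dependants, while `Cascade` drops the dependants together with the target.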
103#[derive(PartialEq)]
104pub enum DropMode {
105 Restrict,
106 Cascade,
107}
108
109impl DropMode {
110 pub fn from_request_setting(cascade: bool) -> DropMode {
111 if cascade {
112 DropMode::Cascade
113 } else {
114 DropMode::Restrict
115 }
116 }
117}
118
119#[derive(strum::AsRefStr)]
120pub enum StreamingJobId {
121 MaterializedView(TableId),
122 Sink(SinkId),
123 Table(Option<SourceId>, TableId),
124 Index(IndexId),
125}
126
127impl std::fmt::Display for StreamingJobId {
128 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
129 write!(f, "{}", self.as_ref())?;
130 write!(f, "({})", self.id())
131 }
132}
133
134impl StreamingJobId {
135 fn id(&self) -> JobId {
136 match self {
137 StreamingJobId::MaterializedView(id) | StreamingJobId::Table(_, id) => id.as_job_id(),
138 StreamingJobId::Index(id) => id.as_job_id(),
139 StreamingJobId::Sink(id) => id.as_job_id(),
140 }
141 }
142}
143
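/// A streaming job together with the new fragment graph that should replace its current plan.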
144pub struct ReplaceStreamJobInfo {
147 pub streaming_job: StreamingJob,
148 pub fragment_graph: StreamFragmentGraphProto,
149}
150
151#[derive(Display)]
152pub enum DdlCommand {
153 CreateDatabase(Database),
154 DropDatabase(DatabaseId),
155 CreateSchema(Schema),
156 DropSchema(SchemaId, DropMode),
157 CreateNonSharedSource(Source),
158 DropSource(SourceId, DropMode),
159 CreateFunction(Function),
160 DropFunction(FunctionId, DropMode),
161 CreateView(View, HashSet<ObjectId>),
162 DropView(ViewId, DropMode),
163 CreateStreamingJob {
164 stream_job: StreamingJob,
165 fragment_graph: StreamFragmentGraphProto,
166 dependencies: HashSet<ObjectId>,
167 specific_resource_group: Option<String>,
168 if_not_exists: bool,
169 },
170 DropStreamingJob {
171 job_id: StreamingJobId,
172 drop_mode: DropMode,
173 },
174 AlterName(alter_name_request::Object, String),
175 AlterSwapRename(alter_swap_rename_request::Object),
176 ReplaceStreamJob(ReplaceStreamJobInfo),
177 AlterNonSharedSource(Source),
178 AlterObjectOwner(Object, UserId),
179 AlterSetSchema(alter_set_schema_request::Object, SchemaId),
180 CreateConnection(Connection),
181 DropConnection(ConnectionId, DropMode),
182 CreateSecret(Secret),
183 AlterSecret(Secret),
184 DropSecret(SecretId),
185 CommentOn(Comment),
186 CreateSubscription(Subscription),
187 DropSubscription(SubscriptionId, DropMode),
188 AlterDatabaseParam(DatabaseId, AlterDatabaseParam),
189 AlterStreamingJobConfig(JobId, HashMap<String, String>, Vec<String>),
190}
191
192impl DdlCommand {
193 fn object(&self) -> Either<String, ObjectId> {
195 use Either::*;
196 match self {
197 DdlCommand::CreateDatabase(database) => Left(database.name.clone()),
198 DdlCommand::DropDatabase(id) => Right(id.as_object_id()),
199 DdlCommand::CreateSchema(schema) => Left(schema.name.clone()),
200 DdlCommand::DropSchema(id, _) => Right(id.as_object_id()),
201 DdlCommand::CreateNonSharedSource(source) => Left(source.name.clone()),
202 DdlCommand::DropSource(id, _) => Right(id.as_object_id()),
203 DdlCommand::CreateFunction(function) => Left(function.name.clone()),
204 DdlCommand::DropFunction(id, _) => Right(id.as_object_id()),
205 DdlCommand::CreateView(view, _) => Left(view.name.clone()),
206 DdlCommand::DropView(id, _) => Right(id.as_object_id()),
207 DdlCommand::CreateStreamingJob { stream_job, .. } => Left(stream_job.name()),
208 DdlCommand::DropStreamingJob { job_id, .. } => Right(job_id.id().as_object_id()),
209 DdlCommand::AlterName(object, _) => Left(format!("{object:?}")),
210 DdlCommand::AlterSwapRename(object) => Left(format!("{object:?}")),
211 DdlCommand::ReplaceStreamJob(info) => Left(info.streaming_job.name()),
212 DdlCommand::AlterNonSharedSource(source) => Left(source.name.clone()),
213 DdlCommand::AlterObjectOwner(object, _) => Left(format!("{object:?}")),
214 DdlCommand::AlterSetSchema(object, _) => Left(format!("{object:?}")),
215 DdlCommand::CreateConnection(connection) => Left(connection.name.clone()),
216 DdlCommand::DropConnection(id, _) => Right(id.as_object_id()),
217 DdlCommand::CreateSecret(secret) => Left(secret.name.clone()),
218 DdlCommand::AlterSecret(secret) => Left(secret.name.clone()),
219 DdlCommand::DropSecret(id) => Right(id.as_object_id()),
220 DdlCommand::CommentOn(comment) => Right(comment.table_id.into()),
221 DdlCommand::CreateSubscription(subscription) => Left(subscription.name.clone()),
222 DdlCommand::DropSubscription(id, _) => Right(id.as_object_id()),
223 DdlCommand::AlterDatabaseParam(id, _) => Right(id.as_object_id()),
224 DdlCommand::AlterStreamingJobConfig(job_id, _, _) => Right(job_id.as_object_id()),
225 }
226 }
227
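    /// Whether this command may run while the cluster is still recovering.
    /// Commands that build or replace streaming pipelines are rejected until recovery completes.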
228 fn allow_in_recovery(&self) -> bool {
229 match self {
230 DdlCommand::DropDatabase(_)
231 | DdlCommand::DropSchema(_, _)
232 | DdlCommand::DropSource(_, _)
233 | DdlCommand::DropFunction(_, _)
234 | DdlCommand::DropView(_, _)
235 | DdlCommand::DropStreamingJob { .. }
236 | DdlCommand::DropConnection(_, _)
237 | DdlCommand::DropSecret(_)
238 | DdlCommand::DropSubscription(_, _)
239 | DdlCommand::AlterName(_, _)
240 | DdlCommand::AlterObjectOwner(_, _)
241 | DdlCommand::AlterSetSchema(_, _)
242 | DdlCommand::CreateDatabase(_)
243 | DdlCommand::CreateSchema(_)
244 | DdlCommand::CreateFunction(_)
245 | DdlCommand::CreateView(_, _)
246 | DdlCommand::CreateConnection(_)
247 | DdlCommand::CommentOn(_)
248 | DdlCommand::CreateSecret(_)
249 | DdlCommand::AlterSecret(_)
250 | DdlCommand::AlterSwapRename(_)
251 | DdlCommand::AlterDatabaseParam(_, _)
252 | DdlCommand::AlterStreamingJobConfig(_, _, _) => true,
253 DdlCommand::CreateStreamingJob { .. }
254 | DdlCommand::CreateNonSharedSource(_)
255 | DdlCommand::ReplaceStreamJob(_)
256 | DdlCommand::AlterNonSharedSource(_)
257 | DdlCommand::CreateSubscription(_) => false,
258 }
259 }
260}
261
262#[derive(Clone)]
263pub struct DdlController {
264 pub(crate) env: MetaSrvEnv,
265
266 pub(crate) metadata_manager: MetadataManager,
267 pub(crate) stream_manager: GlobalStreamManagerRef,
268 pub(crate) source_manager: SourceManagerRef,
269 barrier_manager: BarrierManagerRef,
270
271 pub(crate) creating_streaming_job_permits: Arc<CreatingStreamingJobPermit>,
273
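    /// Sequence number for tagging DDL commands, e.g. used as part of the await-tree registration key.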
274 seq: Arc<AtomicU64>,
276}
277
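/// Semaphore limiting how many streaming jobs may be in the creating state at the same time,
/// kept in sync with the `max_concurrent_creating_streaming_jobs` system parameter.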
278#[derive(Clone)]
279pub struct CreatingStreamingJobPermit {
280 pub(crate) semaphore: Arc<Semaphore>,
281}
282
283impl CreatingStreamingJobPermit {
284 async fn new(env: &MetaSrvEnv) -> Self {
285 let mut permits = env
286 .system_params_reader()
287 .await
288 .max_concurrent_creating_streaming_jobs() as usize;
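        // A configured value of 0 means there is no limit on concurrently creating streaming jobs.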
289 if permits == 0 {
290 permits = Semaphore::MAX_PERMITS;
292 }
293 let semaphore = Arc::new(Semaphore::new(permits));
294
295 let (local_notification_tx, mut local_notification_rx) =
296 tokio::sync::mpsc::unbounded_channel();
297 env.notification_manager()
298 .insert_local_sender(local_notification_tx);
299 let semaphore_clone = semaphore.clone();
300 tokio::spawn(async move {
301 while let Some(notification) = local_notification_rx.recv().await {
302 let LocalNotification::SystemParamsChange(p) = &notification else {
303 continue;
304 };
305 let mut new_permits = p.max_concurrent_creating_streaming_jobs() as usize;
306 if new_permits == 0 {
307 new_permits = Semaphore::MAX_PERMITS;
308 }
309 match permits.cmp(&new_permits) {
310 Ordering::Less => {
311 semaphore_clone.add_permits(new_permits - permits);
312 }
313 Ordering::Equal => continue,
314 Ordering::Greater => {
315 let to_release = permits - new_permits;
316 let reduced = semaphore_clone.forget_permits(to_release);
317 if reduced != to_release {
319 tracing::warn!(
320 "no enough permits to release, expected {}, but reduced {}",
321 to_release,
322 reduced
323 );
324 }
325 }
326 }
327 tracing::info!(
328 "max_concurrent_creating_streaming_jobs changed from {} to {}",
329 permits,
330 new_permits
331 );
332 permits = new_permits;
333 }
334 });
335
336 Self { semaphore }
337 }
338}
339
340impl DdlController {
341 pub async fn new(
342 env: MetaSrvEnv,
343 metadata_manager: MetadataManager,
344 stream_manager: GlobalStreamManagerRef,
345 source_manager: SourceManagerRef,
346 barrier_manager: BarrierManagerRef,
347 ) -> Self {
348 let creating_streaming_job_permits = Arc::new(CreatingStreamingJobPermit::new(&env).await);
349 Self {
350 env,
351 metadata_manager,
352 stream_manager,
353 source_manager,
354 barrier_manager,
355 creating_streaming_job_permits,
356 seq: Arc::new(AtomicU64::new(0)),
357 }
358 }
359
360 pub fn next_seq(&self) -> u64 {
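        // `Relaxed` is sufficient: `fetch_add` on a single atomic still yields unique, increasing values.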
362 self.seq.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
364 }
365
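    /// Runs a [`DdlCommand`] to completion and returns the catalog/hummock versions the caller
    /// should wait for before observing the result.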
366 pub async fn run_command(&self, command: DdlCommand) -> MetaResult<Option<WaitVersion>> {
373 if !command.allow_in_recovery() {
374 self.barrier_manager.check_status_running()?;
375 }
376
377 let await_tree_key = format!("DDL Command {}", self.next_seq());
378 let await_tree_span = await_tree::span!("{command}({})", command.object());
379
380 let ctrl = self.clone();
381 let fut = async move {
382 match command {
383 DdlCommand::CreateDatabase(database) => ctrl.create_database(database).await,
384 DdlCommand::DropDatabase(database_id) => ctrl.drop_database(database_id).await,
385 DdlCommand::CreateSchema(schema) => ctrl.create_schema(schema).await,
386 DdlCommand::DropSchema(schema_id, drop_mode) => {
387 ctrl.drop_schema(schema_id, drop_mode).await
388 }
389 DdlCommand::CreateNonSharedSource(source) => {
390 ctrl.create_non_shared_source(source).await
391 }
392 DdlCommand::DropSource(source_id, drop_mode) => {
393 ctrl.drop_source(source_id, drop_mode).await
394 }
395 DdlCommand::CreateFunction(function) => ctrl.create_function(function).await,
396 DdlCommand::DropFunction(function_id, drop_mode) => {
397 ctrl.drop_function(function_id, drop_mode).await
398 }
399 DdlCommand::CreateView(view, dependencies) => {
400 ctrl.create_view(view, dependencies).await
401 }
402 DdlCommand::DropView(view_id, drop_mode) => {
403 ctrl.drop_view(view_id, drop_mode).await
404 }
405 DdlCommand::CreateStreamingJob {
406 stream_job,
407 fragment_graph,
408 dependencies,
409 specific_resource_group,
410 if_not_exists,
411 } => {
412 ctrl.create_streaming_job(
413 stream_job,
414 fragment_graph,
415 dependencies,
416 specific_resource_group,
417 if_not_exists,
418 )
419 .await
420 }
421 DdlCommand::DropStreamingJob { job_id, drop_mode } => {
422 ctrl.drop_streaming_job(job_id, drop_mode).await
423 }
424 DdlCommand::ReplaceStreamJob(ReplaceStreamJobInfo {
425 streaming_job,
426 fragment_graph,
427 }) => ctrl.replace_job(streaming_job, fragment_graph).await,
428 DdlCommand::AlterName(relation, name) => ctrl.alter_name(relation, &name).await,
429 DdlCommand::AlterObjectOwner(object, owner_id) => {
430 ctrl.alter_owner(object, owner_id).await
431 }
432 DdlCommand::AlterSetSchema(object, new_schema_id) => {
433 ctrl.alter_set_schema(object, new_schema_id).await
434 }
435 DdlCommand::CreateConnection(connection) => {
436 ctrl.create_connection(connection).await
437 }
438 DdlCommand::DropConnection(connection_id, drop_mode) => {
439 ctrl.drop_connection(connection_id, drop_mode).await
440 }
441 DdlCommand::CreateSecret(secret) => ctrl.create_secret(secret).await,
442 DdlCommand::DropSecret(secret_id) => ctrl.drop_secret(secret_id).await,
443 DdlCommand::AlterSecret(secret) => ctrl.alter_secret(secret).await,
444 DdlCommand::AlterNonSharedSource(source) => {
445 ctrl.alter_non_shared_source(source).await
446 }
447 DdlCommand::CommentOn(comment) => ctrl.comment_on(comment).await,
448 DdlCommand::CreateSubscription(subscription) => {
449 ctrl.create_subscription(subscription).await
450 }
451 DdlCommand::DropSubscription(subscription_id, drop_mode) => {
452 ctrl.drop_subscription(subscription_id, drop_mode).await
453 }
454 DdlCommand::AlterSwapRename(objects) => ctrl.alter_swap_rename(objects).await,
455 DdlCommand::AlterDatabaseParam(database_id, param) => {
456 ctrl.alter_database_param(database_id, param).await
457 }
458 DdlCommand::AlterStreamingJobConfig(job_id, entries_to_add, keys_to_remove) => {
459 ctrl.alter_streaming_job_config(job_id, entries_to_add, keys_to_remove)
460 .await
461 }
462 }
463 }
464 .in_current_span();
465 let fut = (self.env.await_tree_reg())
466 .register(await_tree_key, await_tree_span)
467 .instrument(Box::pin(fut));
468 let notification_version = tokio::spawn(fut).await.map_err(|e| anyhow!(e))??;
469 Ok(Some(WaitVersion {
470 catalog_version: notification_version,
471 hummock_version_id: self.barrier_manager.get_hummock_version_id().await.to_u64(),
472 }))
473 }
474
475 pub async fn get_ddl_progress(&self) -> MetaResult<Vec<DdlProgress>> {
476 self.barrier_manager.get_ddl_progress().await
477 }
478
479 async fn create_database(&self, database: Database) -> MetaResult<NotificationVersion> {
480 let (version, updated_db) = self
481 .metadata_manager
482 .catalog_controller
483 .create_database(database)
484 .await?;
485 self.barrier_manager
487 .update_database_barrier(
488 updated_db.database_id,
489 updated_db.barrier_interval_ms.map(|v| v as u32),
490 updated_db.checkpoint_frequency.map(|v| v as u64),
491 )
492 .await?;
493 Ok(version)
494 }
495
496 #[tracing::instrument(skip(self), level = "debug")]
497 pub async fn reschedule_streaming_job(
498 &self,
499 job_id: JobId,
500 target: ReschedulePolicy,
501 mut deferred: bool,
502 ) -> MetaResult<()> {
503 tracing::info!("altering parallelism for job {}", job_id);
504 if self.barrier_manager.check_status_running().is_err() {
505 tracing::info!(
506 "alter parallelism is set to deferred mode because the system is in recovery state"
507 );
508 deferred = true;
509 }
510
511 self.stream_manager
512 .reschedule_streaming_job(job_id, target, deferred)
513 .await
514 }
515
516 pub async fn reschedule_cdc_table_backfill(
517 &self,
518 job_id: JobId,
519 target: ReschedulePolicy,
520 ) -> MetaResult<()> {
521 tracing::info!("alter CDC table backfill parallelism");
522 if self.barrier_manager.check_status_running().is_err() {
523 return Err(anyhow::anyhow!("CDC table backfill reschedule is unavailable because the system is in recovery state").into());
524 }
525 self.stream_manager
526 .reschedule_cdc_table_backfill(job_id, target)
527 .await
528 }
529
530 pub async fn reschedule_fragments(
531 &self,
532 fragment_targets: HashMap<FragmentId, Option<StreamingParallelism>>,
533 ) -> MetaResult<()> {
534 tracing::info!(
535 "altering parallelism for fragments {:?}",
536 fragment_targets.keys()
537 );
538 let fragment_targets = fragment_targets
539 .into_iter()
540 .map(|(fragment_id, parallelism)| (fragment_id as CatalogFragmentId, parallelism))
541 .collect();
542
543 self.stream_manager
544 .reschedule_fragments(fragment_targets)
545 .await
546 }
547
548 async fn drop_database(&self, database_id: DatabaseId) -> MetaResult<NotificationVersion> {
549 self.drop_object(ObjectType::Database, database_id, DropMode::Cascade)
550 .await
551 }
552
553 async fn create_schema(&self, schema: Schema) -> MetaResult<NotificationVersion> {
554 self.metadata_manager
555 .catalog_controller
556 .create_schema(schema)
557 .await
558 }
559
560 async fn drop_schema(
561 &self,
562 schema_id: SchemaId,
563 drop_mode: DropMode,
564 ) -> MetaResult<NotificationVersion> {
565 self.drop_object(ObjectType::Schema, schema_id, drop_mode)
566 .await
567 }
568
569 async fn create_non_shared_source(&self, source: Source) -> MetaResult<NotificationVersion> {
571 let handle = create_source_worker(&source, self.source_manager.metrics.clone())
572 .await
573 .context("failed to create source worker")?;
574
575 let (source_id, version) = self
576 .metadata_manager
577 .catalog_controller
578 .create_source(source)
579 .await?;
580 self.source_manager
581 .register_source_with_handle(source_id, handle)
582 .await;
583 Ok(version)
584 }
585
586 async fn drop_source(
587 &self,
588 source_id: SourceId,
589 drop_mode: DropMode,
590 ) -> MetaResult<NotificationVersion> {
591 self.drop_object(ObjectType::Source, source_id, drop_mode)
592 .await
593 }
594
595 async fn alter_non_shared_source(&self, source: Source) -> MetaResult<NotificationVersion> {
598 self.metadata_manager
599 .catalog_controller
600 .alter_non_shared_source(source)
601 .await
602 }
603
604 async fn create_function(&self, function: Function) -> MetaResult<NotificationVersion> {
605 self.metadata_manager
606 .catalog_controller
607 .create_function(function)
608 .await
609 }
610
611 async fn drop_function(
612 &self,
613 function_id: FunctionId,
614 drop_mode: DropMode,
615 ) -> MetaResult<NotificationVersion> {
616 self.drop_object(ObjectType::Function, function_id, drop_mode)
617 .await
618 }
619
620 async fn create_view(
621 &self,
622 view: View,
623 dependencies: HashSet<ObjectId>,
624 ) -> MetaResult<NotificationVersion> {
625 self.metadata_manager
626 .catalog_controller
627 .create_view(view, dependencies)
628 .await
629 }
630
631 async fn drop_view(
632 &self,
633 view_id: ViewId,
634 drop_mode: DropMode,
635 ) -> MetaResult<NotificationVersion> {
636 self.drop_object(ObjectType::View, view_id, drop_mode).await
637 }
638
639 async fn create_connection(&self, connection: Connection) -> MetaResult<NotificationVersion> {
640 validate_connection(&connection).await?;
641 self.metadata_manager
642 .catalog_controller
643 .create_connection(connection)
644 .await
645 }
646
647 async fn drop_connection(
648 &self,
649 connection_id: ConnectionId,
650 drop_mode: DropMode,
651 ) -> MetaResult<NotificationVersion> {
652 self.drop_object(ObjectType::Connection, connection_id, drop_mode)
653 .await
654 }
655
656 async fn alter_database_param(
657 &self,
658 database_id: DatabaseId,
659 param: AlterDatabaseParam,
660 ) -> MetaResult<NotificationVersion> {
661 let (version, updated_db) = self
662 .metadata_manager
663 .catalog_controller
664 .alter_database_param(database_id, param)
665 .await?;
666 self.barrier_manager
668 .update_database_barrier(
669 database_id,
670 updated_db.barrier_interval_ms.map(|v| v as u32),
671 updated_db.checkpoint_frequency.map(|v| v as u64),
672 )
673 .await?;
674 Ok(version)
675 }
676
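    /// Encrypts the secret's plaintext value with the configured `secret_store_private_key`
    /// and returns the serialized ciphertext.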
677 fn get_encrypted_payload(&self, secret: &Secret) -> MetaResult<Vec<u8>> {
680 let secret_store_private_key = self
681 .env
682 .opts
683 .secret_store_private_key
684 .clone()
685 .ok_or_else(|| anyhow!("secret_store_private_key is not configured"))?;
686
687 let encrypted_payload = SecretEncryption::encrypt(
688 secret_store_private_key.as_slice(),
689 secret.get_value().as_slice(),
690 )
691 .context(format!("failed to encrypt secret {}", secret.name))?;
692 Ok(encrypted_payload
693 .serialize()
694 .context(format!("failed to serialize secret {}", secret.name))?)
695 }
696
697 async fn create_secret(&self, mut secret: Secret) -> MetaResult<NotificationVersion> {
698 let secret_plain_payload = secret.value.clone();
701 let encrypted_payload = self.get_encrypted_payload(&secret)?;
702 secret.value = encrypted_payload;
703
704 self.metadata_manager
705 .catalog_controller
706 .create_secret(secret, secret_plain_payload)
707 .await
708 }
709
710 async fn drop_secret(&self, secret_id: SecretId) -> MetaResult<NotificationVersion> {
711 self.drop_object(ObjectType::Secret, secret_id, DropMode::Restrict)
712 .await
713 }
714
715 async fn alter_secret(&self, mut secret: Secret) -> MetaResult<NotificationVersion> {
716 let secret_plain_payload = secret.value.clone();
717 let encrypted_payload = self.get_encrypted_payload(&secret)?;
718 secret.value = encrypted_payload;
719 self.metadata_manager
720 .catalog_controller
721 .alter_secret(secret, secret_plain_payload)
722 .await
723 }
724
725 async fn create_subscription(
726 &self,
727 mut subscription: Subscription,
728 ) -> MetaResult<NotificationVersion> {
729 tracing::debug!("create subscription");
730 let _permit = self
731 .creating_streaming_job_permits
732 .semaphore
733 .acquire()
734 .await
735 .unwrap();
736 let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
737 self.metadata_manager
738 .catalog_controller
739 .create_subscription_catalog(&mut subscription)
740 .await?;
741 if let Err(err) = self.stream_manager.create_subscription(&subscription).await {
742 tracing::debug!(error = %err.as_report(), "failed to create subscription");
743 let _ = self
744 .metadata_manager
745 .catalog_controller
746 .try_abort_creating_subscription(subscription.id)
747 .await
748 .inspect_err(|e| {
749 tracing::error!(
750 error = %e.as_report(),
751 "failed to abort create subscription after failure"
752 );
753 });
754 return Err(err);
755 }
756
757 let version = self
758 .metadata_manager
759 .catalog_controller
760 .notify_create_subscription(subscription.id)
761 .await?;
762 tracing::debug!("finish create subscription");
763 Ok(version)
764 }
765
766 async fn drop_subscription(
767 &self,
768 subscription_id: SubscriptionId,
769 drop_mode: DropMode,
770 ) -> MetaResult<NotificationVersion> {
771 tracing::debug!("preparing drop subscription");
772 let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
773 let subscription = self
774 .metadata_manager
775 .catalog_controller
776 .get_subscription_by_id(subscription_id)
777 .await?;
778 let table_id = subscription.dependent_table_id;
779 let database_id = subscription.database_id;
780 let (_, version) = self
781 .metadata_manager
782 .catalog_controller
783 .drop_object(ObjectType::Subscription, subscription_id, drop_mode)
784 .await?;
785 self.stream_manager
786 .drop_subscription(database_id, subscription_id, table_id)
787 .await;
788 tracing::debug!("finish drop subscription");
789 Ok(version)
790 }
791
792 #[await_tree::instrument]
794 pub(crate) async fn validate_cdc_table(
795 &self,
796 table: &Table,
797 table_fragments: &StreamJobFragments,
798 ) -> MetaResult<()> {
799 let stream_scan_fragment = table_fragments
800 .fragments
801 .values()
802 .filter(|f| {
803 f.fragment_type_mask.contains(FragmentTypeFlag::StreamScan)
804 || f.fragment_type_mask
805 .contains(FragmentTypeFlag::StreamCdcScan)
806 })
807 .exactly_one()
808 .ok()
809 .with_context(|| {
810 format!(
811 "expect exactly one stream scan fragment, got: {:?}",
812 table_fragments.fragments
813 )
814 })?;
815 fn assert_parallelism(stream_scan_fragment: &Fragment, node_body: &Option<NodeBody>) {
816 if let Some(NodeBody::StreamCdcScan(node)) = node_body {
817 if let Some(o) = node.options
818 && CdcScanOptions::from_proto(&o).is_parallelized_backfill()
819 {
820 } else {
822 assert_eq!(
823 stream_scan_fragment.actors.len(),
824 1,
825 "Stream scan fragment should have only one actor"
826 );
827 }
828 }
829 }
830 let mut found_cdc_scan = false;
831 match &stream_scan_fragment.nodes.node_body {
832 Some(NodeBody::StreamCdcScan(_)) => {
833 assert_parallelism(stream_scan_fragment, &stream_scan_fragment.nodes.node_body);
834 if self
835 .validate_cdc_table_inner(&stream_scan_fragment.nodes.node_body, table.id)
836 .await?
837 {
838 found_cdc_scan = true;
839 }
840 }
841 Some(NodeBody::Project(_)) => {
843 for input in &stream_scan_fragment.nodes.input {
844 assert_parallelism(stream_scan_fragment, &input.node_body);
845 if self
846 .validate_cdc_table_inner(&input.node_body, table.id)
847 .await?
848 {
849 found_cdc_scan = true;
850 }
851 }
852 }
853 _ => {
854 bail!("Unexpected node body for stream cdc scan");
855 }
856 };
857 if !found_cdc_scan {
858 bail!("No stream cdc scan node found in stream scan fragment");
859 }
860 Ok(())
861 }
862
863 async fn validate_cdc_table_inner(
864 &self,
865 node_body: &Option<NodeBody>,
866 table_id: TableId,
867 ) -> MetaResult<bool> {
868 let meta_store = self.env.meta_store_ref();
869 if let Some(NodeBody::StreamCdcScan(stream_cdc_scan)) = node_body
870 && let Some(ref cdc_table_desc) = stream_cdc_scan.cdc_table_desc
871 {
872 let options_with_secret = WithOptionsSecResolved::new(
873 cdc_table_desc.connect_properties.clone(),
874 cdc_table_desc.secret_refs.clone(),
875 );
876
877 let mut props = ConnectorProperties::extract(options_with_secret, true)?;
878 props.init_from_pb_cdc_table_desc(cdc_table_desc);
879
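            // Build a split enumerator to verify that the upstream CDC source is reachable.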
880 let _enumerator = props
882 .create_split_enumerator(SourceEnumeratorContext::dummy().into())
883 .await?;
884
885 if is_parallelized_backfill_enabled(stream_cdc_scan) {
886 try_init_parallel_cdc_table_snapshot_splits(
888 table_id,
889 cdc_table_desc,
890 meta_store,
891 &stream_cdc_scan.options,
892 self.env.opts.cdc_table_split_init_insert_batch_size,
893 self.env.opts.cdc_table_split_init_sleep_interval_splits,
894 self.env.opts.cdc_table_split_init_sleep_duration_millis,
895 )
896 .await?;
897 }
898
899 tracing::debug!(?table_id, "validate cdc table success");
900 Ok(true)
901 } else {
902 Ok(false)
903 }
904 }
905
906 pub async fn validate_table_for_sink(&self, table_id: TableId) -> MetaResult<()> {
907 let migrated = self
908 .metadata_manager
909 .catalog_controller
910 .has_table_been_migrated(table_id)
911 .await?;
912 if !migrated {
913 Err(anyhow::anyhow!("Creating sink into table is not allowed for unmigrated table {}. Please migrate it first.", table_id).into())
914 } else {
915 Ok(())
916 }
917 }
918
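    /// Creates a streaming job (table, materialized view, sink, index or source): registers the
    /// job catalog, builds and launches the fragments, and aborts the catalog entry on failure.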
919 #[await_tree::instrument(boxed, "create_streaming_job({streaming_job})")]
922 pub async fn create_streaming_job(
923 &self,
924 mut streaming_job: StreamingJob,
925 fragment_graph: StreamFragmentGraphProto,
926 dependencies: HashSet<ObjectId>,
927 specific_resource_group: Option<String>,
928 if_not_exists: bool,
929 ) -> MetaResult<NotificationVersion> {
930 if let StreamingJob::Sink(sink) = &streaming_job
931 && let Some(target_table) = sink.target_table
932 {
933 self.validate_table_for_sink(target_table).await?;
934 }
935 let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());
936 let check_ret = self
937 .metadata_manager
938 .catalog_controller
939 .create_job_catalog(
940 &mut streaming_job,
941 &ctx,
942 &fragment_graph.parallelism,
943 fragment_graph.max_parallelism as _,
944 dependencies,
945 specific_resource_group.clone(),
946 )
947 .await;
948 if let Err(meta_err) = check_ret {
949 if !if_not_exists {
950 return Err(meta_err);
951 }
952 return if let MetaErrorInner::Duplicated(_, _, Some(job_id)) = meta_err.inner() {
953 if streaming_job.create_type() == CreateType::Foreground {
954 let database_id = streaming_job.database_id();
955 self.metadata_manager
956 .wait_streaming_job_finished(database_id, *job_id)
957 .await
958 } else {
959 Ok(IGNORED_NOTIFICATION_VERSION)
960 }
961 } else {
962 Err(meta_err)
963 };
964 }
965 let job_id = streaming_job.id();
966 tracing::debug!(
967 id = %job_id,
968 definition = streaming_job.definition(),
969 create_type = streaming_job.create_type().as_str_name(),
970 job_type = ?streaming_job.job_type(),
971 "starting streaming job",
972 );
973 let permit = self
975 .creating_streaming_job_permits
976 .semaphore
977 .clone()
978 .acquire_owned()
979 .instrument_await("acquire_creating_streaming_job_permit")
980 .await
981 .unwrap();
982 let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
983
984 let name = streaming_job.name();
985 let definition = streaming_job.definition();
986 let source_id = match &streaming_job {
987 StreamingJob::Table(Some(src), _, _) | StreamingJob::Source(src) => Some(src.id),
988 _ => None,
989 };
990
991 match self
993 .create_streaming_job_inner(
994 ctx,
995 streaming_job,
996 fragment_graph,
997 specific_resource_group,
998 permit,
999 )
1000 .await
1001 {
1002 Ok(version) => Ok(version),
1003 Err(err) => {
1004 tracing::error!(id = %job_id, error = %err.as_report(), "failed to create streaming job");
1005 let event = risingwave_pb::meta::event_log::EventCreateStreamJobFail {
1006 id: job_id,
1007 name,
1008 definition,
1009 error: err.as_report().to_string(),
1010 };
1011 self.env.event_log_manager_ref().add_event_logs(vec![
1012 risingwave_pb::meta::event_log::Event::CreateStreamJobFail(event),
1013 ]);
1014 let (aborted, _) = self
1015 .metadata_manager
1016 .catalog_controller
1017 .try_abort_creating_streaming_job(job_id, false)
1018 .await?;
1019 if aborted {
1020 tracing::warn!(id = %job_id, "aborted streaming job");
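                    // If the aborted job carried an associated source, drop its registered source as well.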
1021 if let Some(source_id) = source_id {
1023 self.source_manager
1024 .apply_source_change(SourceChange::DropSource {
1025 dropped_source_ids: vec![source_id],
1026 })
1027 .await;
1028 }
1029 }
1030 Err(err)
1031 }
1032 }
1033 }
1034
1035 #[await_tree::instrument(boxed)]
1036 async fn create_streaming_job_inner(
1037 &self,
1038 ctx: StreamContext,
1039 mut streaming_job: StreamingJob,
1040 fragment_graph: StreamFragmentGraphProto,
1041 specific_resource_group: Option<String>,
1042 permit: OwnedSemaphorePermit,
1043 ) -> MetaResult<NotificationVersion> {
1044 let mut fragment_graph =
1045 StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
1046 streaming_job.set_info_from_graph(&fragment_graph);
1047
1048 let incomplete_internal_tables = fragment_graph
1050 .incomplete_internal_tables()
1051 .into_values()
1052 .collect_vec();
1053 let table_id_map = self
1054 .metadata_manager
1055 .catalog_controller
1056 .create_internal_table_catalog(&streaming_job, incomplete_internal_tables)
1057 .await?;
1058 fragment_graph.refill_internal_table_ids(table_id_map);
1059
1060 tracing::debug!(id = %streaming_job.id(), "building streaming job");
1062 let (ctx, stream_job_fragments) = self
1063 .build_stream_job(ctx, streaming_job, fragment_graph, specific_resource_group)
1064 .await?;
1065
1066 let streaming_job = &ctx.streaming_job;
1067
1068 match streaming_job {
1069 StreamingJob::Table(None, table, TableJobType::SharedCdcSource) => {
1070 self.validate_cdc_table(table, &stream_job_fragments)
1071 .await?;
1072 }
1073 StreamingJob::Table(Some(source), ..) => {
1074 self.source_manager.register_source(source).await?;
1076 let connector_name = source
1077 .get_with_properties()
1078 .get(UPSTREAM_SOURCE_KEY)
1079 .cloned();
1080 let attr = source.info.as_ref().map(|source_info| {
1081 jsonbb::json!({
1082 "format": source_info.format().as_str_name(),
1083 "encode": source_info.row_encode().as_str_name(),
1084 })
1085 });
1086 report_create_object(
1087 streaming_job.id(),
1088 "source",
1089 PbTelemetryDatabaseObject::Source,
1090 connector_name,
1091 attr,
1092 );
1093 }
1094 StreamingJob::Sink(sink) => {
1095 if sink.auto_refresh_schema_from_table.is_some() {
1096 check_sink_fragments_support_refresh_schema(&stream_job_fragments.fragments)?
1097 }
1098 validate_sink(sink).await?;
1100 let connector_name = sink.get_properties().get(UPSTREAM_SOURCE_KEY).cloned();
1101 let attr = sink.format_desc.as_ref().map(|sink_info| {
1102 jsonbb::json!({
1103 "format": sink_info.format().as_str_name(),
1104 "encode": sink_info.encode().as_str_name(),
1105 })
1106 });
1107 report_create_object(
1108 streaming_job.id(),
1109 "sink",
1110 PbTelemetryDatabaseObject::Sink,
1111 connector_name,
1112 attr,
1113 );
1114 }
1115 StreamingJob::Source(source) => {
1116 self.source_manager.register_source(source).await?;
1118 let connector_name = source
1119 .get_with_properties()
1120 .get(UPSTREAM_SOURCE_KEY)
1121 .cloned();
1122 let attr = source.info.as_ref().map(|source_info| {
1123 jsonbb::json!({
1124 "format": source_info.format().as_str_name(),
1125 "encode": source_info.row_encode().as_str_name(),
1126 })
1127 });
1128 report_create_object(
1129 streaming_job.id(),
1130 "source",
1131 PbTelemetryDatabaseObject::Source,
1132 connector_name,
1133 attr,
1134 );
1135 }
1136 _ => {}
1137 }
1138
1139 self.metadata_manager
1140 .catalog_controller
1141 .prepare_stream_job_fragments(&stream_job_fragments, streaming_job, false)
1142 .await?;
1143
1144 let version = self
1146 .stream_manager
1147 .create_streaming_job(stream_job_fragments, ctx, permit)
1148 .await?;
1149
1150 Ok(version)
1151 }
1152
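    /// Drops a catalog object (and, in cascade mode, its dependants), then releases everything it
    /// owned: actors, fragments, source workers, Iceberg table sinks and secrets.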
1153 pub async fn drop_object(
1155 &self,
1156 object_type: ObjectType,
1157 object_id: impl Into<ObjectId>,
1158 drop_mode: DropMode,
1159 ) -> MetaResult<NotificationVersion> {
1160 let object_id = object_id.into();
1161 let (release_ctx, version) = self
1162 .metadata_manager
1163 .catalog_controller
1164 .drop_object(object_type, object_id, drop_mode)
1165 .await?;
1166
1167 if object_type == ObjectType::Source {
1168 self.env
1169 .notification_manager_ref()
1170 .notify_local_subscribers(LocalNotification::SourceDropped(object_id));
1171 }
1172
1173 let ReleaseContext {
1174 database_id,
1175 removed_streaming_job_ids,
1176 removed_state_table_ids,
1177 removed_source_ids,
1178 removed_secret_ids: secret_ids,
1179 removed_source_fragments,
1180 removed_actors,
1181 removed_fragments,
1182 removed_sink_fragment_by_targets,
1183 removed_iceberg_table_sinks,
1184 } = release_ctx;
1185
1186 let _guard = self.source_manager.pause_tick().await;
1187 self.stream_manager
1188 .drop_streaming_jobs(
1189 database_id,
1190 removed_actors.iter().map(|id| *id as _).collect(),
1191 removed_streaming_job_ids,
1192 removed_state_table_ids,
1193 removed_fragments.iter().map(|id| *id as _).collect(),
1194 removed_sink_fragment_by_targets
1195 .into_iter()
1196 .map(|(target, sinks)| {
1197 (target as _, sinks.into_iter().map(|id| id as _).collect())
1198 })
1199 .collect(),
1200 )
1201 .await;
1202
1203 self.source_manager
1206 .apply_source_change(SourceChange::DropSource {
1207 dropped_source_ids: removed_source_ids.into_iter().map(|id| id as _).collect(),
1208 })
1209 .await;
1210
1211 let dropped_source_fragments = removed_source_fragments;
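        // Unregister the source fragments that belonged to the dropped streaming jobs.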
1214 self.source_manager
1215 .apply_source_change(SourceChange::DropMv {
1216 dropped_source_fragments,
1217 })
1218 .await;
1219
1220 for sink in removed_iceberg_table_sinks {
1222 let sink_param = SinkParam::try_from_sink_catalog(sink.into())
1223 .expect("Iceberg sink should be valid");
1224 let iceberg_sink =
1225 IcebergSink::try_from(sink_param).expect("Iceberg sink should be valid");
1226 if let Ok(iceberg_catalog) = iceberg_sink.config.create_catalog().await {
1227 let table_identifier = iceberg_sink.config.full_table_name().unwrap();
1228 tracing::info!(
1229 "dropping iceberg table {} for dropped sink",
1230 table_identifier
1231 );
1232
1233 let _ = iceberg_catalog
1234 .drop_table(&table_identifier)
1235 .await
1236 .inspect_err(|err| {
1237 tracing::error!(
1238 "failed to drop iceberg table {} during cleanup: {}",
1239 table_identifier,
1240 err.as_report()
1241 );
1242 });
1243 }
1244 }
1245
1246 for secret in secret_ids {
1248 LocalSecretManager::global().remove_secret(secret);
1249 }
1250 Ok(version)
1251 }
1252
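    /// Replaces the plan of an existing streaming job (e.g. for `ALTER TABLE`), creating a
    /// temporary catalog entry for the new version and aborting it if the replacement fails.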
1253 #[await_tree::instrument(boxed, "replace_streaming_job({streaming_job})")]
1255 pub async fn replace_job(
1256 &self,
1257 mut streaming_job: StreamingJob,
1258 fragment_graph: StreamFragmentGraphProto,
1259 ) -> MetaResult<NotificationVersion> {
1260 match &streaming_job {
1261 StreamingJob::Table(..)
1262 | StreamingJob::Source(..)
1263 | StreamingJob::MaterializedView(..) => {}
1264 StreamingJob::Sink(..) | StreamingJob::Index(..) => {
1265 bail_not_implemented!("schema change for {}", streaming_job.job_type_str())
1266 }
1267 }
1268
1269 let job_id = streaming_job.id();
1270
1271 let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
1272 let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());
1273
1274 let original_max_parallelism = self
1276 .metadata_manager
1277 .get_job_max_parallelism(streaming_job.id())
1278 .await?;
1279 let fragment_graph = PbStreamFragmentGraph {
1280 max_parallelism: original_max_parallelism as _,
1281 ..fragment_graph
1282 };
1283
1284 let fragment_graph = StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
1286 streaming_job.set_info_from_graph(&fragment_graph);
1287
1288 let streaming_job = streaming_job;
1290
1291 let auto_refresh_schema_sinks = if let StreamingJob::Table(_, table, _) = &streaming_job {
1292 let auto_refresh_schema_sinks = self
1293 .metadata_manager
1294 .catalog_controller
1295 .get_sink_auto_refresh_schema_from(table.id)
1296 .await?;
1297 if !auto_refresh_schema_sinks.is_empty() {
1298 let original_table_columns = self
1299 .metadata_manager
1300 .catalog_controller
1301 .get_table_columns(table.id)
1302 .await?;
1303 let mut original_table_column_ids: HashSet<_> = original_table_columns
1305 .iter()
1306 .map(|col| col.column_id())
1307 .collect();
1308 let newly_added_columns = table
1309 .columns
1310 .iter()
1311 .filter(|col| {
1312 !original_table_column_ids.remove(&ColumnId::new(
1313 col.column_desc.as_ref().unwrap().column_id as _,
1314 ))
1315 })
1316 .map(|col| ColumnCatalog::from(col.clone()))
1317 .collect_vec();
1318 if !original_table_column_ids.is_empty() {
1319 return Err(anyhow!("new table columns do not contain all original columns. new: {:?}, original: {:?}, not included: {:?}", table.columns, original_table_columns, original_table_column_ids).into());
1320 }
1321 let mut sinks = Vec::with_capacity(auto_refresh_schema_sinks.len());
1322 for sink in auto_refresh_schema_sinks {
1323 let sink_job_fragments = self
1324 .metadata_manager
1325 .get_job_fragments_by_id(sink.id.as_job_id())
1326 .await?;
1327 if sink_job_fragments.fragments.len() != 1 {
1328 return Err(anyhow!(
1329 "auto schema refresh sink must have only one fragment, but got {}",
1330 sink_job_fragments.fragments.len()
1331 )
1332 .into());
1333 }
1334 let original_sink_fragment =
1335 sink_job_fragments.fragments.into_values().next().unwrap();
1336 let (new_sink_fragment, new_schema, new_log_store_table) =
1337 rewrite_refresh_schema_sink_fragment(
1338 &original_sink_fragment,
1339 &sink,
1340 &newly_added_columns,
1341 table,
1342 fragment_graph.table_fragment_id(),
1343 self.env.id_gen_manager(),
1344 self.env.actor_id_generator(),
1345 )?;
1346
1347 assert_eq!(
1348 original_sink_fragment.actors.len(),
1349 new_sink_fragment.actors.len()
1350 );
1351 let actor_status = (0..original_sink_fragment.actors.len())
1352 .map(|i| {
1353 let worker_node_id = sink_job_fragments.actor_status
1354 [&original_sink_fragment.actors[i].actor_id]
1355 .location
1356 .as_ref()
1357 .unwrap()
1358 .worker_node_id;
1359 (
1360 new_sink_fragment.actors[i].actor_id,
1361 PbActorStatus {
1362 location: Some(PbActorLocation { worker_node_id }),
1363 },
1364 )
1365 })
1366 .collect();
1367
1368 let streaming_job = StreamingJob::Sink(sink);
1369
1370 let tmp_sink_id = self
1371 .metadata_manager
1372 .catalog_controller
1373 .create_job_catalog_for_replace(&streaming_job, None, None, None)
1374 .await?
1375 .as_sink_id();
1376 let StreamingJob::Sink(sink) = streaming_job else {
1377 unreachable!()
1378 };
1379
1380 sinks.push(AutoRefreshSchemaSinkContext {
1381 tmp_sink_id,
1382 original_sink: sink,
1383 original_fragment: original_sink_fragment,
1384 new_schema,
1385 newly_add_fields: newly_added_columns
1386 .iter()
1387 .map(|col| Field::from(&col.column_desc))
1388 .collect(),
1389 new_fragment: new_sink_fragment,
1390 new_log_store_table,
1391 actor_status,
1392 });
1393 }
1394 Some(sinks)
1395 } else {
1396 None
1397 }
1398 } else {
1399 None
1400 };
1401
1402 let tmp_id = self
1403 .metadata_manager
1404 .catalog_controller
1405 .create_job_catalog_for_replace(
1406 &streaming_job,
1407 Some(&ctx),
1408 fragment_graph.specified_parallelism().as_ref(),
1409 Some(fragment_graph.max_parallelism()),
1410 )
1411 .await?;
1412
1413 let tmp_sink_ids = auto_refresh_schema_sinks.as_ref().map(|sinks| {
1414 sinks
1415 .iter()
1416 .map(|sink| sink.tmp_sink_id.as_object_id())
1417 .collect_vec()
1418 });
1419
1420 tracing::debug!(id = %job_id, "building replace streaming job");
1421 let mut updated_sink_catalogs = vec![];
1422
1423 let mut drop_table_connector_ctx = None;
1424 let result: MetaResult<_> = try {
1425 let (mut ctx, mut stream_job_fragments) = self
1426 .build_replace_job(
1427 ctx,
1428 &streaming_job,
1429 fragment_graph,
1430 tmp_id,
1431 auto_refresh_schema_sinks,
1432 )
1433 .await?;
1434 drop_table_connector_ctx = ctx.drop_table_connector_ctx.clone();
1435 let auto_refresh_schema_sink_finish_ctx =
1436 ctx.auto_refresh_schema_sinks.as_ref().map(|sinks| {
1437 sinks
1438 .iter()
1439 .map(|sink| FinishAutoRefreshSchemaSinkContext {
1440 tmp_sink_id: sink.tmp_sink_id,
1441 original_sink_id: sink.original_sink.id,
1442 columns: sink.new_schema.clone(),
1443 new_log_store_table: sink
1444 .new_log_store_table
1445 .as_ref()
1446 .map(|table| (table.id, table.columns.clone())),
1447 })
1448 .collect()
1449 });
1450
1451 if let StreamingJob::Table(_, table, ..) = &streaming_job {
1453 let union_fragment = stream_job_fragments.inner.union_fragment_for_table();
1454 let upstream_infos = self
1455 .metadata_manager
1456 .catalog_controller
1457 .get_all_upstream_sink_infos(table, union_fragment.fragment_id as _)
1458 .await?;
1459 refill_upstream_sink_union_in_table(&mut union_fragment.nodes, &upstream_infos);
1460
1461 for upstream_info in &upstream_infos {
1462 let upstream_fragment_id = upstream_info.sink_fragment_id;
1463 ctx.upstream_fragment_downstreams
1464 .entry(upstream_fragment_id)
1465 .or_default()
1466 .push(upstream_info.new_sink_downstream.clone());
1467 if upstream_info.sink_original_target_columns.is_empty() {
1468 updated_sink_catalogs.push(upstream_info.sink_id);
1469 }
1470 }
1471 }
1472
1473 let replace_upstream = ctx.replace_upstream.clone();
1474
1475 if let Some(sinks) = &ctx.auto_refresh_schema_sinks {
1476 let empty_downstreams = FragmentDownstreamRelation::default();
1477 for sink in sinks {
1478 self.metadata_manager
1479 .catalog_controller
1480 .prepare_streaming_job(
1481 sink.tmp_sink_id.as_job_id(),
1482 || [&sink.new_fragment].into_iter(),
1483 &empty_downstreams,
1484 true,
1485 None,
1486 )
1487 .await?;
1488 }
1489 }
1490
1491 self.metadata_manager
1492 .catalog_controller
1493 .prepare_stream_job_fragments(&stream_job_fragments, &streaming_job, true)
1494 .await?;
1495
1496 self.stream_manager
1497 .replace_stream_job(stream_job_fragments, ctx)
1498 .await?;
1499 (replace_upstream, auto_refresh_schema_sink_finish_ctx)
1500 };
1501
1502 match result {
1503 Ok((replace_upstream, auto_refresh_schema_sink_finish_ctx)) => {
1504 let version = self
1505 .metadata_manager
1506 .catalog_controller
1507 .finish_replace_streaming_job(
1508 tmp_id,
1509 streaming_job,
1510 replace_upstream,
1511 SinkIntoTableContext {
1512 updated_sink_catalogs,
1513 },
1514 drop_table_connector_ctx.as_ref(),
1515 auto_refresh_schema_sink_finish_ctx,
1516 )
1517 .await?;
1518 if let Some(drop_table_connector_ctx) = &drop_table_connector_ctx {
1519 self.source_manager
1520 .apply_source_change(SourceChange::DropSource {
1521 dropped_source_ids: vec![drop_table_connector_ctx.to_remove_source_id],
1522 })
1523 .await;
1524 }
1525 Ok(version)
1526 }
1527 Err(err) => {
1528 tracing::error!(id = %job_id, error = ?err.as_report(), "failed to replace job");
1529 let _ = self.metadata_manager
1530 .catalog_controller
1531 .try_abort_replacing_streaming_job(tmp_id, tmp_sink_ids)
1532 .await.inspect_err(|err| {
1533 tracing::error!(id = %job_id, error = ?err.as_report(), "failed to abort replacing job");
1534 });
1535 Err(err)
1536 }
1537 }
1538 }
1539
1540 #[await_tree::instrument(boxed, "drop_streaming_job{}({job_id})", if let DropMode::Cascade = drop_mode { "_cascade" } else { "" }
1541 )]
1542 async fn drop_streaming_job(
1543 &self,
1544 job_id: StreamingJobId,
1545 drop_mode: DropMode,
1546 ) -> MetaResult<NotificationVersion> {
1547 let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
1548
1549 let (object_id, object_type) = match job_id {
1550 StreamingJobId::MaterializedView(id) => (id.as_object_id(), ObjectType::Table),
1551 StreamingJobId::Sink(id) => (id.as_object_id(), ObjectType::Sink),
1552 StreamingJobId::Table(_, id) => (id.as_object_id(), ObjectType::Table),
1553 StreamingJobId::Index(idx) => (idx.as_object_id(), ObjectType::Index),
1554 };
1555
1556 let job_status = self
1557 .metadata_manager
1558 .catalog_controller
1559 .get_streaming_job_status(job_id.id())
1560 .await?;
1561 let version = match job_status {
1562 JobStatus::Initial => {
1563 unreachable!(
1564 "Job with Initial status should not notify frontend and therefore should not arrive here"
1565 );
1566 }
1567 JobStatus::Creating => {
1568 self.stream_manager
1569 .cancel_streaming_jobs(vec![job_id.id()])
1570 .await?;
1571 IGNORED_NOTIFICATION_VERSION
1572 }
1573 JobStatus::Created => {
1574 let version = self.drop_object(object_type, object_id, drop_mode).await?;
1575 #[cfg(not(madsim))]
1576 if let StreamingJobId::Sink(sink_id) = job_id {
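                    // Clean up the exactly-once Iceberg sink bookkeeping rows left by this sink.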
1577 let db = self.env.meta_store_ref().conn.clone();
1580 clean_all_rows_by_sink_id(&db, sink_id).await?;
1581 }
1582 version
1583 }
1584 };
1585
1586 Ok(version)
1587 }
1588
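    /// Resolves the parallelism to schedule a new job with, from the user-specified value, the
    /// job's max parallelism and the slots available in the target resource group.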
1589 fn resolve_stream_parallelism(
1593 &self,
1594 specified: Option<NonZeroUsize>,
1595 max: NonZeroUsize,
1596 cluster_info: &StreamingClusterInfo,
1597 resource_group: String,
1598 ) -> MetaResult<NonZeroUsize> {
1599 let available = NonZeroUsize::new(cluster_info.parallelism(&resource_group));
1600 DdlController::resolve_stream_parallelism_inner(
1601 specified,
1602 max,
1603 available,
1604 &self.env.opts.default_parallelism,
1605 &resource_group,
1606 )
1607 }
1608
1609 fn resolve_stream_parallelism_inner(
1610 specified: Option<NonZeroUsize>,
1611 max: NonZeroUsize,
1612 available: Option<NonZeroUsize>,
1613 default_parallelism: &DefaultParallelism,
1614 resource_group: &str,
1615 ) -> MetaResult<NonZeroUsize> {
1616 let Some(available) = available else {
1617 bail_unavailable!(
1618 "no available slots to schedule in resource group \"{}\", \
1619 have you allocated any compute nodes within this resource group?",
1620 resource_group
1621 );
1622 };
1623
1624 if let Some(specified) = specified {
1625 if specified > max {
1626 bail_invalid_parameter!(
1627 "specified parallelism {} should not exceed max parallelism {}",
1628 specified,
1629 max,
1630 );
1631 }
1632 if specified > available {
1633 tracing::warn!(
1634 resource_group,
1635 specified_parallelism = specified.get(),
1636 available_parallelism = available.get(),
1637 "specified parallelism exceeds available slots, scheduling with specified value",
1638 );
1639 }
1640 return Ok(specified);
1641 }
1642
1643 let default_parallelism = match default_parallelism {
1645 DefaultParallelism::Full => available,
1646 DefaultParallelism::Default(num) => {
1647 if *num > available {
1648 tracing::warn!(
1649 resource_group,
1650 configured_parallelism = num.get(),
1651 available_parallelism = available.get(),
1652 "default parallelism exceeds available slots, scheduling with configured value",
1653 );
1654 }
1655 *num
1656 }
1657 };
1658
1659 if default_parallelism > max {
1660 tracing::warn!(
1661 max_parallelism = max.get(),
1662 resource_group,
1663 "default parallelism exceeds max parallelism, capping to max",
1664 );
1665 }
1666 Ok(default_parallelism.min(max))
1667 }
1668
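    /// Builds the actor graph and the full creation context for a new streaming job from its
    /// fragment graph: upstream relations, parallelism, backfill ordering and sink-into-table wiring.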
1669 #[await_tree::instrument]
1675 pub(crate) async fn build_stream_job(
1676 &self,
1677 stream_ctx: StreamContext,
1678 mut stream_job: StreamingJob,
1679 fragment_graph: StreamFragmentGraph,
1680 specific_resource_group: Option<String>,
1681 ) -> MetaResult<(CreateStreamingJobContext, StreamJobFragmentsToCreate)> {
1682 let id = stream_job.id();
1683 let specified_parallelism = fragment_graph.specified_parallelism();
1684 let max_parallelism = NonZeroUsize::new(fragment_graph.max_parallelism()).unwrap();
1685
1686 let fragment_backfill_ordering = fragment_graph.create_fragment_backfill_ordering();
1688
1689 let (snapshot_backfill_info, cross_db_snapshot_backfill_info) =
1693 fragment_graph.collect_snapshot_backfill_info()?;
1694 assert!(
1695 snapshot_backfill_info
1696 .iter()
1697 .chain([&cross_db_snapshot_backfill_info])
1698 .flat_map(|info| info.upstream_mv_table_id_to_backfill_epoch.values())
1699 .all(|backfill_epoch| backfill_epoch.is_none()),
1700 "should not set backfill epoch when initially build the job: {:?} {:?}",
1701 snapshot_backfill_info,
1702 cross_db_snapshot_backfill_info
1703 );
1704
1705 let locality_fragment_state_table_mapping =
1706 fragment_graph.find_locality_provider_fragment_state_table_mapping();
1707
1708 self.metadata_manager
1710 .catalog_controller
1711 .validate_cross_db_snapshot_backfill(&cross_db_snapshot_backfill_info)
1712 .await?;
1713
1714 let upstream_table_ids = fragment_graph
1715 .dependent_table_ids()
1716 .iter()
1717 .filter(|id| {
1718 !cross_db_snapshot_backfill_info
1719 .upstream_mv_table_id_to_backfill_epoch
1720 .contains_key(id)
1721 })
1722 .cloned()
1723 .collect();
1724
1725 let (upstream_root_fragments, existing_actor_location) = self
1726 .metadata_manager
1727 .get_upstream_root_fragments(&upstream_table_ids)
1728 .await?;
1729
1730 if snapshot_backfill_info.is_some() {
1731 match stream_job {
1732 StreamingJob::MaterializedView(_)
1733 | StreamingJob::Sink(_)
1734 | StreamingJob::Index(_, _) => {}
1735 StreamingJob::Table(_, _, _) | StreamingJob::Source(_) => {
1736 return Err(
1737 anyhow!("snapshot_backfill not enabled for table and source").into(),
1738 );
1739 }
1740 }
1741 }
1742
1743 let upstream_actors = upstream_root_fragments
1744 .values()
1745 .map(|(fragment, _)| {
1746 (
1747 fragment.fragment_id,
1748 fragment.actors.keys().copied().collect(),
1749 )
1750 })
1751 .collect();
1752
1753 let complete_graph = CompleteStreamFragmentGraph::with_upstreams(
1754 fragment_graph,
1755 FragmentGraphUpstreamContext {
1756 upstream_root_fragments,
1757 upstream_actor_location: existing_actor_location,
1758 },
1759 (&stream_job).into(),
1760 )?;
1761 let resource_group = match specific_resource_group {
1762 None => {
1763 self.metadata_manager
1764 .get_database_resource_group(stream_job.database_id())
1765 .await?
1766 }
1767 Some(resource_group) => resource_group,
1768 };
1769
1770 let cluster_info = self.metadata_manager.get_streaming_cluster_info().await?;
1772
1773 let parallelism = self.resolve_stream_parallelism(
1774 specified_parallelism,
1775 max_parallelism,
1776 &cluster_info,
1777 resource_group.clone(),
1778 )?;
1779
1780 let parallelism = self
1781 .env
1782 .system_params_reader()
1783 .await
1784 .adaptive_parallelism_strategy()
1785 .compute_target_parallelism(parallelism.get());
1786
1787 let parallelism = NonZeroUsize::new(parallelism).expect("parallelism must be positive");
1788 let actor_graph_builder = ActorGraphBuilder::new(
1789 id,
1790 resource_group,
1791 complete_graph,
1792 cluster_info,
1793 parallelism,
1794 )?;
1795
1796 let ActorGraphBuildResult {
1797 graph,
1798 downstream_fragment_relations,
1799 building_locations,
1800 upstream_fragment_downstreams,
1801 new_no_shuffle,
1802 replace_upstream,
1803 ..
1804 } = actor_graph_builder.generate_graph(&self.env, &stream_job, stream_ctx.clone())?;
1805 assert!(replace_upstream.is_empty());
1806
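        // With no user-specified parallelism and a `Full` default, let the job scale adaptively;
        // otherwise pin it to the resolved fixed parallelism.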
1807 let table_parallelism = match (specified_parallelism, &self.env.opts.default_parallelism) {
1814 (None, DefaultParallelism::Full) => TableParallelism::Adaptive,
1815 _ => TableParallelism::Fixed(parallelism.get()),
1816 };
1817
1818 let stream_job_fragments = StreamJobFragments::new(
1819 id,
1820 graph,
1821 &building_locations.actor_locations,
1822 stream_ctx.clone(),
1823 table_parallelism,
1824 max_parallelism.get(),
1825 );
1826
1827 if let Some(mview_fragment) = stream_job_fragments.mview_fragment() {
1828 stream_job.set_table_vnode_count(mview_fragment.vnode_count());
1829 }
1830
        let new_upstream_sink = if let StreamingJob::Sink(sink) = &stream_job
            && let Ok(table_id) = sink.get_target_table()
        {
            let tables = self
                .metadata_manager
                .get_table_catalog_by_ids(&[*table_id])
                .await?;
            let target_table = tables
                .first()
                .ok_or_else(|| MetaError::catalog_id_not_found("table", *table_id))?;
            let sink_fragment = stream_job_fragments
                .sink_fragment()
                .ok_or_else(|| anyhow::anyhow!("sink fragment not found for sink {}", sink.id))?;
            let mview_fragment_id = self
                .metadata_manager
                .catalog_controller
                .get_mview_fragment_by_id(table_id.as_job_id())
                .await?;
            let upstream_sink_info = build_upstream_sink_info(
                sink,
                sink_fragment.fragment_id as _,
                target_table,
                mview_fragment_id,
            )?;
            Some(upstream_sink_info)
        } else {
            None
        };

        let ctx = CreateStreamingJobContext {
            upstream_fragment_downstreams,
            new_no_shuffle,
            upstream_actors,
            building_locations,
            definition: stream_job.definition(),
            create_type: stream_job.create_type(),
            job_type: (&stream_job).into(),
            streaming_job: stream_job,
            new_upstream_sink,
            option: CreateStreamingJobOption {},
            snapshot_backfill_info,
            cross_db_snapshot_backfill_info,
            fragment_backfill_ordering,
            locality_fragment_state_table_mapping,
        };

        Ok((
            ctx,
            StreamJobFragmentsToCreate {
                inner: stream_job_fragments,
                downstreams: downstream_fragment_relations,
            },
        ))
    }

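    /// Builds the fragment graph and context for replacing an existing streaming job,
    /// which backs schema changes on tables, sources, and materialized views.
    ///
    /// The replacement fragments are created under `tmp_job_id`. Internal state tables
    /// of the old job are mapped onto the new graph (trivially, or via state matching
    /// for materialized views), and existing downstream fragments are rewired to the
    /// new root fragment.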
    pub(crate) async fn build_replace_job(
        &self,
        stream_ctx: StreamContext,
        stream_job: &StreamingJob,
        mut fragment_graph: StreamFragmentGraph,
        tmp_job_id: JobId,
        auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,
    ) -> MetaResult<(ReplaceStreamJobContext, StreamJobFragmentsToCreate)> {
        match &stream_job {
            StreamingJob::Table(..)
            | StreamingJob::Source(..)
            | StreamingJob::MaterializedView(..) => {}
            StreamingJob::Sink(..) | StreamingJob::Index(..) => {
                bail_not_implemented!("schema change for {}", stream_job.job_type_str())
            }
        }

        let id = stream_job.id();

        let mut drop_table_associated_source_id = None;
        if let StreamingJob::Table(None, _, _) = &stream_job {
            drop_table_associated_source_id = self
                .metadata_manager
                .get_table_associated_source_id(id.as_mv_table_id())
                .await?;
        }

        let old_fragments = self.metadata_manager.get_job_fragments_by_id(id).await?;
        let old_internal_table_ids = old_fragments.internal_table_ids();

        let mut drop_table_connector_ctx = None;
        if let Some(to_remove_source_id) = drop_table_associated_source_id {
            debug_assert!(old_internal_table_ids.len() == 1);

            drop_table_connector_ctx = Some(DropTableConnectorContext {
                to_change_streaming_job_id: id,
                to_remove_state_table_id: old_internal_table_ids[0],
                to_remove_source_id,
            });
        } else if stream_job.is_materialized_view() {
            let old_fragments_upstreams = self
                .metadata_manager
                .catalog_controller
                .upstream_fragments(old_fragments.fragment_ids())
                .await?;

            let old_state_graph =
                state_match::Graph::from_existing(&old_fragments, &old_fragments_upstreams);
            let new_state_graph = state_match::Graph::from_building(&fragment_graph);
            let mapping =
                state_match::match_graph_internal_tables(&new_state_graph, &old_state_graph)
                    .context("incompatible altering on the streaming job states")?;

            fragment_graph.fit_internal_table_ids_with_mapping(mapping);
        } else {
            let old_internal_tables = self
                .metadata_manager
                .get_table_catalog_by_ids(&old_internal_table_ids)
                .await?;
            fragment_graph.fit_internal_tables_trivial(old_internal_tables)?;
        }

        let original_root_fragment = old_fragments
            .root_fragment()
            .expect("root fragment not found");

        let job_type = StreamingJobType::from(stream_job);

        let (mut downstream_fragments, mut downstream_actor_location) =
            self.metadata_manager.get_downstream_fragments(id).await?;

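        // When sinks are auto-refreshed to follow this job's schema change, swap their
        // original fragments in the downstream list with the rebuilt ones and fix up the
        // recorded actor locations to match the new actors.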
        if let Some(auto_refresh_schema_sinks) = &auto_refresh_schema_sinks {
            let mut remaining_fragment: HashSet<_> = auto_refresh_schema_sinks
                .iter()
                .map(|sink| sink.original_fragment.fragment_id)
                .collect();
            for (_, downstream_fragment, _) in &mut downstream_fragments {
                if let Some(sink) = auto_refresh_schema_sinks.iter().find(|sink| {
                    sink.original_fragment.fragment_id == downstream_fragment.fragment_id
                }) {
                    assert!(remaining_fragment.remove(&downstream_fragment.fragment_id));
                    for actor_id in downstream_fragment.actors.keys() {
                        downstream_actor_location.remove(actor_id);
                    }
                    for (actor_id, status) in &sink.actor_status {
                        downstream_actor_location
                            .insert(*actor_id, status.location.as_ref().unwrap().worker_node_id);
                    }

                    *downstream_fragment = (&sink.new_fragment_info(), stream_job.id()).into();
                }
            }
            assert!(remaining_fragment.is_empty());
        }

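        // Assemble the complete fragment graph for the replacement. Plain tables and
        // sources only need their downstreams rewired; shared-CDC tables and
        // materialized views additionally need their upstream root fragments resolved.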
        let complete_graph = match &job_type {
            StreamingJobType::Table(TableJobType::General) | StreamingJobType::Source => {
                CompleteStreamFragmentGraph::with_downstreams(
                    fragment_graph,
                    FragmentGraphDownstreamContext {
                        original_root_fragment_id: original_root_fragment.fragment_id,
                        downstream_fragments,
                        downstream_actor_location,
                    },
                    job_type,
                )?
            }
            StreamingJobType::Table(TableJobType::SharedCdcSource)
            | StreamingJobType::MaterializedView => {
                let (upstream_root_fragments, upstream_actor_location) = self
                    .metadata_manager
                    .get_upstream_root_fragments(fragment_graph.dependent_table_ids())
                    .await?;

                CompleteStreamFragmentGraph::with_upstreams_and_downstreams(
                    fragment_graph,
                    FragmentGraphUpstreamContext {
                        upstream_root_fragments,
                        upstream_actor_location,
                    },
                    FragmentGraphDownstreamContext {
                        original_root_fragment_id: original_root_fragment.fragment_id,
                        downstream_fragments,
                        downstream_actor_location,
                    },
                    job_type,
                )?
            }
            _ => unreachable!(),
        };

        let resource_group = self
            .metadata_manager
            .get_existing_job_resource_group(id)
            .await?;

        let cluster_info = self.metadata_manager.get_streaming_cluster_info().await?;

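        // A replacement keeps the shape of the existing job: reuse the actor count of
        // the original root fragment as the parallelism for the new graph.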
        let parallelism = NonZeroUsize::new(original_root_fragment.actors.len())
            .expect("The number of actors in the original table fragment should be greater than 0");

        let actor_graph_builder = ActorGraphBuilder::new(
            id,
            resource_group,
            complete_graph,
            cluster_info,
            parallelism,
        )?;

        let ActorGraphBuildResult {
            graph,
            downstream_fragment_relations,
            building_locations,
            upstream_fragment_downstreams,
            mut replace_upstream,
            new_no_shuffle,
            ..
        } = actor_graph_builder.generate_graph(&self.env, stream_job, stream_ctx.clone())?;

        if matches!(
            job_type,
            StreamingJobType::Source | StreamingJobType::Table(TableJobType::General)
        ) {
            assert!(upstream_fragment_downstreams.is_empty());
        }

        let stream_job_fragments = StreamJobFragments::new(
            tmp_job_id,
            graph,
            &building_locations.actor_locations,
            stream_ctx,
            old_fragments.assigned_parallelism,
            old_fragments.max_parallelism,
        );

        if let Some(sinks) = &auto_refresh_schema_sinks {
            for sink in sinks {
                replace_upstream
                    .remove(&sink.new_fragment.fragment_id)
                    .expect("should exist");
            }
        }

        let ctx = ReplaceStreamJobContext {
            old_fragments,
            replace_upstream,
            new_no_shuffle,
            upstream_fragment_downstreams,
            building_locations,
            streaming_job: stream_job.clone(),
            tmp_id: tmp_job_id,
            drop_table_connector_ctx,
            auto_refresh_schema_sinks,
        };

        Ok((
            ctx,
            StreamJobFragmentsToCreate {
                inner: stream_job_fragments,
                downstreams: downstream_fragment_relations,
            },
        ))
    }

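    /// Renames a catalog object. Maps the request variant to its catalog `ObjectType`
    /// and delegates the actual rename to the catalog controller.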
    async fn alter_name(
        &self,
        relation: alter_name_request::Object,
        new_name: &str,
    ) -> MetaResult<NotificationVersion> {
        let (obj_type, id) = match relation {
            alter_name_request::Object::TableId(id) => (ObjectType::Table, id),
            alter_name_request::Object::ViewId(id) => (ObjectType::View, id),
            alter_name_request::Object::IndexId(id) => (ObjectType::Index, id),
            alter_name_request::Object::SinkId(id) => (ObjectType::Sink, id),
            alter_name_request::Object::SourceId(id) => (ObjectType::Source, id),
            alter_name_request::Object::SchemaId(id) => (ObjectType::Schema, id),
            alter_name_request::Object::DatabaseId(id) => (ObjectType::Database, id),
            alter_name_request::Object::SubscriptionId(id) => (ObjectType::Subscription, id),
        };
        self.metadata_manager
            .catalog_controller
            .alter_name(obj_type, id, new_name)
            .await
    }

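    /// Swaps the names of two objects of the same type (presumably backing the
    /// `ALTER ... SWAP WITH ...` DDL). Schema swapping is not supported yet.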
    async fn alter_swap_rename(
        &self,
        object: alter_swap_rename_request::Object,
    ) -> MetaResult<NotificationVersion> {
        let (obj_type, src_id, dst_id) = match object {
            alter_swap_rename_request::Object::Schema(_) => unimplemented!("schema swap"),
            alter_swap_rename_request::Object::Table(objs) => {
                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
                (ObjectType::Table, src_id, dst_id)
            }
            alter_swap_rename_request::Object::View(objs) => {
                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
                (ObjectType::View, src_id, dst_id)
            }
            alter_swap_rename_request::Object::Source(objs) => {
                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
                (ObjectType::Source, src_id, dst_id)
            }
            alter_swap_rename_request::Object::Sink(objs) => {
                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
                (ObjectType::Sink, src_id, dst_id)
            }
            alter_swap_rename_request::Object::Subscription(objs) => {
                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
                (ObjectType::Subscription, src_id, dst_id)
            }
        };

        self.metadata_manager
            .catalog_controller
            .alter_swap_rename(obj_type, src_id, dst_id)
            .await
    }

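    /// Changes the owner of a catalog object to the given user by delegating to the
    /// catalog controller.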
    async fn alter_owner(
        &self,
        object: Object,
        owner_id: UserId,
    ) -> MetaResult<NotificationVersion> {
        let (obj_type, id) = match object {
            Object::TableId(id) => (ObjectType::Table, id),
            Object::ViewId(id) => (ObjectType::View, id),
            Object::SourceId(id) => (ObjectType::Source, id),
            Object::SinkId(id) => (ObjectType::Sink, id),
            Object::SchemaId(id) => (ObjectType::Schema, id),
            Object::DatabaseId(id) => (ObjectType::Database, id),
            Object::SubscriptionId(id) => (ObjectType::Subscription, id),
            Object::ConnectionId(id) => (ObjectType::Connection, id),
        };
        self.metadata_manager
            .catalog_controller
            .alter_owner(obj_type, id.into(), owner_id as _)
            .await
    }

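    /// Moves a catalog object into a different schema (`ALTER ... SET SCHEMA`).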
    async fn alter_set_schema(
        &self,
        object: alter_set_schema_request::Object,
        new_schema_id: SchemaId,
    ) -> MetaResult<NotificationVersion> {
        let (obj_type, id) = match object {
            alter_set_schema_request::Object::TableId(id) => (ObjectType::Table, id),
            alter_set_schema_request::Object::ViewId(id) => (ObjectType::View, id),
            alter_set_schema_request::Object::SourceId(id) => (ObjectType::Source, id),
            alter_set_schema_request::Object::SinkId(id) => (ObjectType::Sink, id),
            alter_set_schema_request::Object::FunctionId(id) => (ObjectType::Function, id),
            alter_set_schema_request::Object::ConnectionId(id) => (ObjectType::Connection, id),
            alter_set_schema_request::Object::SubscriptionId(id) => (ObjectType::Subscription, id),
        };
        self.metadata_manager
            .catalog_controller
            .alter_schema(obj_type, id.into(), new_schema_id as _)
            .await
    }

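    /// Blocks until there are no background creating jobs left, polling roughly once
    /// per millisecond, and gives up with a cancellation error after about 30 minutes
    /// (ignoring the cost of each poll).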
    pub async fn wait(&self) -> MetaResult<()> {
        let timeout_ms = 30 * 60 * 1000;
        for _ in 0..timeout_ms {
            if self
                .metadata_manager
                .catalog_controller
                .list_background_creating_jobs(true, None)
                .await?
                .is_empty()
            {
                return Ok(());
            }

            sleep(Duration::from_millis(1)).await;
        }
        Err(MetaError::cancelled(format!(
            "timeout after {timeout_ms}ms"
        )))
    }

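    /// Attaches or updates a `COMMENT ON` entry for a catalog object.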
    async fn comment_on(&self, comment: Comment) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .comment_on(comment)
            .await
    }

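    /// Adds and/or removes per-job config entries for a streaming job, persisting the
    /// change through the catalog controller.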
    async fn alter_streaming_job_config(
        &self,
        job_id: JobId,
        entries_to_add: HashMap<String, String>,
        keys_to_remove: Vec<String>,
    ) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .alter_streaming_job_config(job_id, entries_to_add, keys_to_remove)
            .await
    }
}

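/// Reports a telemetry event for the creation of a streaming object, tagging it with
/// the job id and, when available, the connector name and extra attributes.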
fn report_create_object(
    job_id: JobId,
    event_name: &str,
    obj_type: PbTelemetryDatabaseObject,
    connector_name: Option<String>,
    attr_info: Option<jsonbb::Value>,
) {
    report_event(
        PbTelemetryEventStage::CreateStreamJob,
        event_name,
        job_id.as_raw_id() as _,
        connector_name,
        Some(obj_type),
        attr_info,
    );
}

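/// Deletes all bookkeeping rows of the given sink from the iceberg exactly-once system
/// table, logging how many rows were removed (or the error if the deletion fails).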
async fn clean_all_rows_by_sink_id(db: &DatabaseConnection, sink_id: SinkId) -> MetaResult<()> {
    match Entity::delete_many()
        .filter(Column::SinkId.eq(sink_id))
        .exec(db)
        .await
    {
        Ok(result) => {
            let deleted_count = result.rows_affected;

            tracing::info!(
                "Deleted {} items for sink_id = {} in iceberg exactly once system table.",
                deleted_count,
                sink_id
            );
            Ok(())
        }
        Err(e) => {
            tracing::error!(
                "Error deleting records for sink_id = {} from iceberg exactly once system table: {:?}",
                sink_id,
                e.as_report()
            );
            Err(e.into())
        }
    }
}

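/// Builds the [`UpstreamSinkInfo`] used to connect a sink fragment to the target
/// table's materialize fragment.
///
/// The sink's output schema is taken from its recorded original target columns
/// (falling back to the table's current columns), the table's distribution key is
/// remapped from table column indices to positions in the sink output, and a
/// projection is generated to align the sink output with the table's current columns.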
pub fn build_upstream_sink_info(
    sink: &PbSink,
    sink_fragment_id: FragmentId,
    target_table: &PbTable,
    target_fragment_id: FragmentId,
) -> MetaResult<UpstreamSinkInfo> {
    let sink_columns = if !sink.original_target_columns.is_empty() {
        sink.original_target_columns.clone()
    } else {
        target_table.columns.clone()
    };

    let sink_output_fields = sink_columns
        .iter()
        .map(|col| Field::from(col.column_desc.as_ref().unwrap()).to_prost())
        .collect_vec();
    let output_indices = (0..sink_output_fields.len())
        .map(|i| i as u32)
        .collect_vec();

    let dist_key_indices: anyhow::Result<Vec<u32>> = try {
        let sink_idx_by_col_id = sink_columns
            .iter()
            .enumerate()
            .map(|(idx, col)| {
                let column_id = col.column_desc.as_ref().unwrap().column_id;
                (column_id, idx as u32)
            })
            .collect::<HashMap<_, _>>();
        target_table
            .distribution_key
            .iter()
            .map(|dist_idx| {
                let column_id = target_table.columns[*dist_idx as usize]
                    .column_desc
                    .as_ref()
                    .unwrap()
                    .column_id;
                let sink_idx = sink_idx_by_col_id
                    .get(&column_id)
                    .ok_or_else(|| anyhow::anyhow!("column id {} not found in sink", column_id))?;
                Ok(*sink_idx)
            })
            .collect::<anyhow::Result<Vec<_>>>()?
    };
    let dist_key_indices =
        dist_key_indices.map_err(|e| e.context("failed to get distribution key indices"))?;
    let downstream_fragment_id = target_fragment_id as _;
    let new_downstream_relation = DownstreamFragmentRelation {
        downstream_fragment_id,
        dispatcher_type: DispatcherType::Hash,
        dist_key_indices,
        output_mapping: PbDispatchOutputMapping::simple(output_indices),
    };
    let current_target_columns = target_table.get_columns();
    let project_exprs = build_select_node_list(&sink_columns, current_target_columns)?;
    Ok(UpstreamSinkInfo {
        sink_id: sink.id,
        sink_fragment_id: sink_fragment_id as _,
        sink_output_fields,
        sink_original_target_columns: sink.get_original_target_columns().clone(),
        project_exprs,
        new_sink_downstream: new_downstream_relation,
    })
}

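/// Rewrites the `UpstreamSinkUnion` node inside a table's union fragment so that its
/// initial upstreams reflect the given set of incoming sinks (fragment id, output
/// schema, and projection exprs for each).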
pub fn refill_upstream_sink_union_in_table(
    union_fragment_root: &mut PbStreamNode,
    upstream_sink_infos: &Vec<UpstreamSinkInfo>,
) {
    visit_stream_node_cont_mut(union_fragment_root, |node| {
        if let Some(NodeBody::UpstreamSinkUnion(upstream_sink_union)) = &mut node.node_body {
            let init_upstreams = upstream_sink_infos
                .iter()
                .map(|info| PbUpstreamSinkInfo {
                    upstream_fragment_id: info.sink_fragment_id,
                    sink_output_schema: info.sink_output_fields.clone(),
                    project_exprs: info.project_exprs.clone(),
                })
                .collect();
            upstream_sink_union.init_upstreams = init_upstreams;
            false
        } else {
            true
        }
    });
}

#[cfg(test)]
mod tests {
    use std::num::NonZeroUsize;

    use super::*;

    #[test]
    fn test_specified_parallelism_exceeds_available() {
        let result = DdlController::resolve_stream_parallelism_inner(
            Some(NonZeroUsize::new(100).unwrap()),
            NonZeroUsize::new(256).unwrap(),
            Some(NonZeroUsize::new(4).unwrap()),
            &DefaultParallelism::Full,
            "default",
        )
        .unwrap();
        assert_eq!(result.get(), 100);
    }

    #[test]
    fn test_allows_default_parallelism_over_available() {
        let result = DdlController::resolve_stream_parallelism_inner(
            None,
            NonZeroUsize::new(256).unwrap(),
            Some(NonZeroUsize::new(4).unwrap()),
            &DefaultParallelism::Default(NonZeroUsize::new(50).unwrap()),
            "default",
        )
        .unwrap();
        assert_eq!(result.get(), 50);
    }

    #[test]
    fn test_full_parallelism_capped_by_max() {
        let result = DdlController::resolve_stream_parallelism_inner(
            None,
            NonZeroUsize::new(6).unwrap(),
            Some(NonZeroUsize::new(10).unwrap()),
            &DefaultParallelism::Full,
            "default",
        )
        .unwrap();
        assert_eq!(result.get(), 6);
    }

    #[test]
    fn test_no_available_slots_returns_error() {
        let result = DdlController::resolve_stream_parallelism_inner(
            None,
            NonZeroUsize::new(4).unwrap(),
            None,
            &DefaultParallelism::Full,
            "default",
        );
        assert!(matches!(
            result,
            Err(ref e) if matches!(e.inner(), MetaErrorInner::Unavailable(_))
        ));
    }

    #[test]
    fn test_specified_over_max_returns_error() {
        let result = DdlController::resolve_stream_parallelism_inner(
            Some(NonZeroUsize::new(8).unwrap()),
            NonZeroUsize::new(4).unwrap(),
            Some(NonZeroUsize::new(10).unwrap()),
            &DefaultParallelism::Full,
            "default",
        );
        assert!(matches!(
            result,
            Err(ref e) if matches!(e.inner(), MetaErrorInner::InvalidParameter(_))
        ));
    }
}