use std::cmp::Ordering;
use std::collections::HashSet;
use std::num::NonZeroUsize;
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use std::time::Duration;

use anyhow::{Context, anyhow};
use await_tree::{InstrumentAwait, span};
use either::Either;
use itertools::Itertools;
use risingwave_common::catalog::{AlterDatabaseParam, FragmentTypeFlag};
use risingwave_common::config::DefaultParallelism;
use risingwave_common::hash::VnodeCountCompat;
use risingwave_common::secret::{LocalSecretManager, SecretEncryption};
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::util::stream_graph_visitor::{
    visit_stream_node, visit_stream_node_body, visit_stream_node_cont_mut,
};
use risingwave_common::{bail, bail_not_implemented, must_match};
use risingwave_connector::WithOptionsSecResolved;
use risingwave_connector::connector_common::validate_connection;
use risingwave_connector::source::{
    ConnectorProperties, SourceEnumeratorContext, UPSTREAM_SOURCE_KEY,
};
use risingwave_meta_model::exactly_once_iceberg_sink::{Column, Entity};
use risingwave_meta_model::object::ObjectType;
use risingwave_meta_model::{
    ConnectionId, DatabaseId, DispatcherType, FunctionId, IndexId, ObjectId, SchemaId, SecretId,
    SinkId, SourceId, SubscriptionId, TableId, UserId, ViewId,
};
use risingwave_pb::catalog::{
    Comment, Connection, CreateType, Database, Function, PbSink, Schema, Secret, Sink, Source,
    Subscription, Table, View,
};
use risingwave_pb::ddl_service::alter_owner_request::Object;
use risingwave_pb::ddl_service::{
    DdlProgress, TableJobType, WaitVersion, alter_name_request, alter_set_schema_request,
    alter_swap_rename_request,
};
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{
    MergeNode, PbDispatchOutputMapping, PbDispatcherType, PbStreamFragmentGraph,
    StreamFragmentGraph as StreamFragmentGraphProto,
};
use risingwave_pb::telemetry::{PbTelemetryDatabaseObject, PbTelemetryEventStage};
use sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter};
use strum::Display;
use thiserror_ext::AsReport;
use tokio::sync::{Semaphore, oneshot};
use tokio::time::sleep;
use tracing::Instrument;

use crate::barrier::BarrierManagerRef;
use crate::controller::catalog::{DropTableConnectorContext, ReleaseContext};
use crate::controller::cluster::StreamingClusterInfo;
use crate::controller::streaming_job::SinkIntoTableContext;
use crate::error::{MetaErrorInner, bail_invalid_parameter, bail_unavailable};
use crate::manager::{
    IGNORED_NOTIFICATION_VERSION, LocalNotification, MetaSrvEnv, MetadataManager,
    NotificationVersion, StreamingJob, StreamingJobType,
};
use crate::model::{
    DownstreamFragmentRelation, Fragment, StreamContext, StreamJobFragments,
    StreamJobFragmentsToCreate, TableParallelism,
};
use crate::stream::{
    ActorGraphBuildResult, ActorGraphBuilder, CompleteStreamFragmentGraph,
    CreateStreamingJobContext, CreateStreamingJobOption, GlobalStreamManagerRef,
    JobRescheduleTarget, ReplaceStreamJobContext, SourceChange, SourceManagerRef,
    StreamFragmentGraph, create_source_worker, state_match, validate_sink,
};
use crate::telemetry::report_event;
use crate::{MetaError, MetaResult};

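/// Whether a `DROP` statement cascades to dependent objects or is rejected when dependents exist.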
#[derive(PartialEq)]
pub enum DropMode {
    Restrict,
    Cascade,
}

impl DropMode {
    pub fn from_request_setting(cascade: bool) -> DropMode {
        if cascade {
            DropMode::Cascade
        } else {
            DropMode::Restrict
        }
    }
}

#[derive(strum::AsRefStr)]
pub enum StreamingJobId {
    MaterializedView(TableId),
    Sink(SinkId),
    Table(Option<SourceId>, TableId),
    Index(IndexId),
}

impl std::fmt::Display for StreamingJobId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.as_ref())?;
        write!(f, "({})", self.id())
    }
}

impl StreamingJobId {
    #[allow(dead_code)]
    fn id(&self) -> TableId {
        match self {
            StreamingJobId::MaterializedView(id)
            | StreamingJobId::Sink(id)
            | StreamingJobId::Table(_, id)
            | StreamingJobId::Index(id) => *id,
        }
    }
}

pub struct ReplaceStreamJobInfo {
    pub streaming_job: StreamingJob,
    pub fragment_graph: StreamFragmentGraphProto,
}

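/// A DDL command handled by [`DdlController::run_command`]. Each variant carries the catalog
/// objects and options required to execute the corresponding DDL statement.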
#[derive(Display)]
pub enum DdlCommand {
    CreateDatabase(Database),
    DropDatabase(DatabaseId),
    CreateSchema(Schema),
    DropSchema(SchemaId, DropMode),
    CreateNonSharedSource(Source),
    DropSource(SourceId, DropMode),
    CreateFunction(Function),
    DropFunction(FunctionId),
    CreateView(View, HashSet<ObjectId>),
    DropView(ViewId, DropMode),
    CreateStreamingJob {
        stream_job: StreamingJob,
        fragment_graph: StreamFragmentGraphProto,
        create_type: CreateType,
        affected_table_replace_info: Option<ReplaceStreamJobInfo>,
        dependencies: HashSet<ObjectId>,
        specific_resource_group: Option<String>,
        if_not_exists: bool,
    },
    DropStreamingJob {
        job_id: StreamingJobId,
        drop_mode: DropMode,
        target_replace_info: Option<ReplaceStreamJobInfo>,
    },
    AlterName(alter_name_request::Object, String),
    AlterSwapRename(alter_swap_rename_request::Object),
    ReplaceStreamJob(ReplaceStreamJobInfo),
    AlterNonSharedSource(Source),
    AlterObjectOwner(Object, UserId),
    AlterSetSchema(alter_set_schema_request::Object, SchemaId),
    CreateConnection(Connection),
    DropConnection(ConnectionId),
    CreateSecret(Secret),
    AlterSecret(Secret),
    DropSecret(SecretId),
    CommentOn(Comment),
    CreateSubscription(Subscription),
    DropSubscription(SubscriptionId, DropMode),
    AlterDatabaseParam(DatabaseId, AlterDatabaseParam),
}

impl DdlCommand {
    fn object(&self) -> Either<String, ObjectId> {
        use Either::*;
        match self {
            DdlCommand::CreateDatabase(database) => Left(database.name.clone()),
            DdlCommand::DropDatabase(id) => Right(*id),
            DdlCommand::CreateSchema(schema) => Left(schema.name.clone()),
            DdlCommand::DropSchema(id, _) => Right(*id),
            DdlCommand::CreateNonSharedSource(source) => Left(source.name.clone()),
            DdlCommand::DropSource(id, _) => Right(*id),
            DdlCommand::CreateFunction(function) => Left(function.name.clone()),
            DdlCommand::DropFunction(id) => Right(*id),
            DdlCommand::CreateView(view, _) => Left(view.name.clone()),
            DdlCommand::DropView(id, _) => Right(*id),
            DdlCommand::CreateStreamingJob { stream_job, .. } => Left(stream_job.name()),
            DdlCommand::DropStreamingJob { job_id, .. } => Right(job_id.id()),
            DdlCommand::AlterName(object, _) => Left(format!("{object:?}")),
            DdlCommand::AlterSwapRename(object) => Left(format!("{object:?}")),
            DdlCommand::ReplaceStreamJob(info) => Left(info.streaming_job.name()),
            DdlCommand::AlterNonSharedSource(source) => Left(source.name.clone()),
            DdlCommand::AlterObjectOwner(object, _) => Left(format!("{object:?}")),
            DdlCommand::AlterSetSchema(object, _) => Left(format!("{object:?}")),
            DdlCommand::CreateConnection(connection) => Left(connection.name.clone()),
            DdlCommand::DropConnection(id) => Right(*id),
            DdlCommand::CreateSecret(secret) => Left(secret.name.clone()),
            DdlCommand::AlterSecret(secret) => Left(secret.name.clone()),
            DdlCommand::DropSecret(id) => Right(*id),
            DdlCommand::CommentOn(comment) => Right(comment.table_id as _),
            DdlCommand::CreateSubscription(subscription) => Left(subscription.name.clone()),
            DdlCommand::DropSubscription(id, _) => Right(*id),
            DdlCommand::AlterDatabaseParam(id, _) => Right(*id),
        }
    }

    fn allow_in_recovery(&self) -> bool {
        match self {
            DdlCommand::DropDatabase(_)
            | DdlCommand::DropSchema(_, _)
            | DdlCommand::DropSource(_, _)
            | DdlCommand::DropFunction(_)
            | DdlCommand::DropView(_, _)
            | DdlCommand::DropStreamingJob { .. }
            | DdlCommand::DropConnection(_)
            | DdlCommand::DropSecret(_)
            | DdlCommand::DropSubscription(_, _)
            | DdlCommand::AlterName(_, _)
            | DdlCommand::AlterObjectOwner(_, _)
            | DdlCommand::AlterSetSchema(_, _)
            | DdlCommand::CreateDatabase(_)
            | DdlCommand::CreateSchema(_)
            | DdlCommand::CreateFunction(_)
            | DdlCommand::CreateView(_, _)
            | DdlCommand::CreateConnection(_)
            | DdlCommand::CommentOn(_)
            | DdlCommand::CreateSecret(_)
            | DdlCommand::AlterSecret(_)
            | DdlCommand::AlterSwapRename(_)
            | DdlCommand::AlterDatabaseParam(_, _) => true,
            DdlCommand::CreateStreamingJob { .. }
            | DdlCommand::CreateNonSharedSource(_)
            | DdlCommand::ReplaceStreamJob(_)
            | DdlCommand::AlterNonSharedSource(_)
            | DdlCommand::CreateSubscription(_) => false,
        }
    }
}

#[derive(Clone)]
pub struct DdlController {
    pub(crate) env: MetaSrvEnv,

    pub(crate) metadata_manager: MetadataManager,
    pub(crate) stream_manager: GlobalStreamManagerRef,
    pub(crate) source_manager: SourceManagerRef,
    barrier_manager: BarrierManagerRef,

    pub(crate) creating_streaming_job_permits: Arc<CreatingStreamingJobPermit>,

    seq: Arc<AtomicU64>,
}

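/// Semaphore limiting how many streaming jobs may be created concurrently. It is sized by the
/// `max_concurrent_creating_streaming_jobs` system parameter (0 is treated as unlimited) and
/// resized whenever the parameter changes.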
#[derive(Clone)]
pub struct CreatingStreamingJobPermit {
    pub(crate) semaphore: Arc<Semaphore>,
}

impl CreatingStreamingJobPermit {
    async fn new(env: &MetaSrvEnv) -> Self {
        let mut permits = env
            .system_params_reader()
            .await
            .max_concurrent_creating_streaming_jobs() as usize;
        if permits == 0 {
            permits = Semaphore::MAX_PERMITS;
        }
        let semaphore = Arc::new(Semaphore::new(permits));

        let (local_notification_tx, mut local_notification_rx) =
            tokio::sync::mpsc::unbounded_channel();
        env.notification_manager()
            .insert_local_sender(local_notification_tx)
            .await;
        let semaphore_clone = semaphore.clone();
        tokio::spawn(async move {
            while let Some(notification) = local_notification_rx.recv().await {
                let LocalNotification::SystemParamsChange(p) = &notification else {
                    continue;
                };
                let mut new_permits = p.max_concurrent_creating_streaming_jobs() as usize;
                if new_permits == 0 {
                    new_permits = Semaphore::MAX_PERMITS;
                }
                match permits.cmp(&new_permits) {
                    Ordering::Less => {
                        semaphore_clone.add_permits(new_permits - permits);
                    }
                    Ordering::Equal => continue,
                    Ordering::Greater => {
                        semaphore_clone
                            .acquire_many((permits - new_permits) as u32)
                            .await
                            .unwrap()
                            .forget();
                    }
                }
                tracing::info!(
                    "max_concurrent_creating_streaming_jobs changed from {} to {}",
                    permits,
                    new_permits
                );
                permits = new_permits;
            }
        });

        Self { semaphore }
    }
}

impl DdlController {
    pub async fn new(
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        stream_manager: GlobalStreamManagerRef,
        source_manager: SourceManagerRef,
        barrier_manager: BarrierManagerRef,
    ) -> Self {
        let creating_streaming_job_permits = Arc::new(CreatingStreamingJobPermit::new(&env).await);
        Self {
            env,
            metadata_manager,
            stream_manager,
            source_manager,
            barrier_manager,
            creating_streaming_job_permits,
            seq: Arc::new(AtomicU64::new(0)),
        }
    }

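    /// Returns the next monotonically increasing sequence number, used to key DDL commands in the
    /// await-tree registry.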
    pub fn next_seq(&self) -> u64 {
        self.seq.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
    }

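    /// Executes a DDL command end-to-end and returns the catalog version (plus the Hummock
    /// version id) that clients should wait for. Commands that are not allowed during recovery
    /// are rejected while the barrier manager is not running.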
    pub async fn run_command(&self, command: DdlCommand) -> MetaResult<Option<WaitVersion>> {
        if !command.allow_in_recovery() {
            self.barrier_manager.check_status_running()?;
        }

        let await_tree_key = format!("DDL Command {}", self.next_seq());
        let await_tree_span = await_tree::span!("{command}({})", command.object());

        let ctrl = self.clone();
        let fut = async move {
            match command {
                DdlCommand::CreateDatabase(database) => ctrl.create_database(database).await,
                DdlCommand::DropDatabase(database_id) => ctrl.drop_database(database_id).await,
                DdlCommand::CreateSchema(schema) => ctrl.create_schema(schema).await,
                DdlCommand::DropSchema(schema_id, drop_mode) => {
                    ctrl.drop_schema(schema_id, drop_mode).await
                }
                DdlCommand::CreateNonSharedSource(source) => {
                    ctrl.create_non_shared_source(source).await
                }
                DdlCommand::DropSource(source_id, drop_mode) => {
                    ctrl.drop_source(source_id, drop_mode).await
                }
                DdlCommand::CreateFunction(function) => ctrl.create_function(function).await,
                DdlCommand::DropFunction(function_id) => ctrl.drop_function(function_id).await,
                DdlCommand::CreateView(view, dependencies) => {
                    ctrl.create_view(view, dependencies).await
                }
                DdlCommand::DropView(view_id, drop_mode) => {
                    ctrl.drop_view(view_id, drop_mode).await
                }
                DdlCommand::CreateStreamingJob {
                    stream_job,
                    fragment_graph,
                    create_type: _,
                    affected_table_replace_info,
                    dependencies,
                    specific_resource_group,
                    if_not_exists,
                } => {
                    ctrl.create_streaming_job(
                        stream_job,
                        fragment_graph,
                        affected_table_replace_info,
                        dependencies,
                        specific_resource_group,
                        if_not_exists,
                    )
                    .await
                }
                DdlCommand::DropStreamingJob {
                    job_id,
                    drop_mode,
                    target_replace_info,
                } => {
                    ctrl.drop_streaming_job(job_id, drop_mode, target_replace_info)
                        .await
                }
                DdlCommand::ReplaceStreamJob(ReplaceStreamJobInfo {
                    streaming_job,
                    fragment_graph,
                }) => ctrl.replace_job(streaming_job, fragment_graph).await,
                DdlCommand::AlterName(relation, name) => ctrl.alter_name(relation, &name).await,
                DdlCommand::AlterObjectOwner(object, owner_id) => {
                    ctrl.alter_owner(object, owner_id).await
                }
                DdlCommand::AlterSetSchema(object, new_schema_id) => {
                    ctrl.alter_set_schema(object, new_schema_id).await
                }
                DdlCommand::CreateConnection(connection) => {
                    ctrl.create_connection(connection).await
                }
                DdlCommand::DropConnection(connection_id) => {
                    ctrl.drop_connection(connection_id).await
                }
                DdlCommand::CreateSecret(secret) => ctrl.create_secret(secret).await,
                DdlCommand::DropSecret(secret_id) => ctrl.drop_secret(secret_id).await,
                DdlCommand::AlterSecret(secret) => ctrl.alter_secret(secret).await,
                DdlCommand::AlterNonSharedSource(source) => {
                    ctrl.alter_non_shared_source(source).await
                }
                DdlCommand::CommentOn(comment) => ctrl.comment_on(comment).await,
                DdlCommand::CreateSubscription(subscription) => {
                    ctrl.create_subscription(subscription).await
                }
                DdlCommand::DropSubscription(subscription_id, drop_mode) => {
                    ctrl.drop_subscription(subscription_id, drop_mode).await
                }
                DdlCommand::AlterSwapRename(objects) => ctrl.alter_swap_rename(objects).await,
                DdlCommand::AlterDatabaseParam(database_id, param) => {
                    ctrl.alter_database_param(database_id, param).await
                }
            }
        }
        .in_current_span();
        let fut = (self.env.await_tree_reg())
            .register(await_tree_key, await_tree_span)
            .instrument(Box::pin(fut));
        let notification_version = tokio::spawn(fut).await.map_err(|e| anyhow!(e))??;
        Ok(Some(WaitVersion {
            catalog_version: notification_version,
            hummock_version_id: self.barrier_manager.get_hummock_version_id().await.to_u64(),
        }))
    }

    pub async fn get_ddl_progress(&self) -> MetaResult<Vec<DdlProgress>> {
        self.barrier_manager.get_ddl_progress().await
    }

    async fn create_database(&self, database: Database) -> MetaResult<NotificationVersion> {
        let (version, updated_db) = self
            .metadata_manager
            .catalog_controller
            .create_database(database)
            .await?;
        self.barrier_manager
            .update_database_barrier(
                updated_db.database_id,
                updated_db.barrier_interval_ms.map(|v| v as u32),
                updated_db.checkpoint_frequency.map(|v| v as u64),
            )
            .await?;
        Ok(version)
    }

    #[tracing::instrument(skip(self), level = "debug")]
    pub async fn reschedule_streaming_job(
        &self,
        job_id: u32,
        target: JobRescheduleTarget,
        mut deferred: bool,
    ) -> MetaResult<()> {
        tracing::info!("alter parallelism");
        if self.barrier_manager.check_status_running().is_err() {
            tracing::info!(
                "alter parallelism is set to deferred mode because the system is in recovery state"
            );
            deferred = true;
        }

        self.stream_manager
            .reschedule_streaming_job(job_id, target, deferred)
            .await
    }

    async fn drop_database(&self, database_id: DatabaseId) -> MetaResult<NotificationVersion> {
        self.drop_object(
            ObjectType::Database,
            database_id as _,
            DropMode::Cascade,
            None,
        )
        .await
    }

    async fn create_schema(&self, schema: Schema) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .create_schema(schema)
            .await
    }

    async fn drop_schema(
        &self,
        schema_id: SchemaId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Schema, schema_id as _, drop_mode, None)
            .await
    }

    async fn create_non_shared_source(&self, source: Source) -> MetaResult<NotificationVersion> {
        let handle = create_source_worker(&source, self.source_manager.metrics.clone())
            .await
            .context("failed to create source worker")?;

        let (source_id, version) = self
            .metadata_manager
            .catalog_controller
            .create_source(source)
            .await?;
        self.source_manager
            .register_source_with_handle(source_id, handle)
            .await;
        Ok(version)
    }

    async fn drop_source(
        &self,
        source_id: SourceId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Source, source_id as _, drop_mode, None)
            .await
    }

    async fn alter_non_shared_source(&self, source: Source) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .alter_non_shared_source(source)
            .await
    }

    async fn create_function(&self, function: Function) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .create_function(function)
            .await
    }

    async fn drop_function(&self, function_id: FunctionId) -> MetaResult<NotificationVersion> {
        self.drop_object(
            ObjectType::Function,
            function_id as _,
            DropMode::Restrict,
            None,
        )
        .await
    }

    async fn create_view(
        &self,
        view: View,
        dependencies: HashSet<ObjectId>,
    ) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .create_view(view, dependencies)
            .await
    }

    async fn drop_view(
        &self,
        view_id: ViewId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::View, view_id as _, drop_mode, None)
            .await
    }

    async fn create_connection(&self, connection: Connection) -> MetaResult<NotificationVersion> {
        validate_connection(&connection).await?;
        self.metadata_manager
            .catalog_controller
            .create_connection(connection)
            .await
    }

    async fn drop_connection(
        &self,
        connection_id: ConnectionId,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(
            ObjectType::Connection,
            connection_id as _,
            DropMode::Restrict,
            None,
        )
        .await
    }

    async fn alter_database_param(
        &self,
        database_id: DatabaseId,
        param: AlterDatabaseParam,
    ) -> MetaResult<NotificationVersion> {
        let (version, updated_db) = self
            .metadata_manager
            .catalog_controller
            .alter_database_param(database_id, param)
            .await?;
        self.barrier_manager
            .update_database_barrier(
                database_id,
                updated_db.barrier_interval_ms.map(|v| v as u32),
                updated_db.checkpoint_frequency.map(|v| v as u64),
            )
            .await?;
        Ok(version)
    }

    fn get_encrypted_payload(&self, secret: &Secret) -> MetaResult<Vec<u8>> {
        let secret_store_private_key = self
            .env
            .opts
            .secret_store_private_key
            .clone()
            .ok_or_else(|| anyhow!("secret_store_private_key is not configured"))?;

        let encrypted_payload = SecretEncryption::encrypt(
            secret_store_private_key.as_slice(),
            secret.get_value().as_slice(),
        )
        .context(format!("failed to encrypt secret {}", secret.name))?;
        Ok(encrypted_payload
            .serialize()
            .context(format!("failed to serialize secret {}", secret.name))?)
    }

    async fn create_secret(&self, mut secret: Secret) -> MetaResult<NotificationVersion> {
        let secret_plain_payload = secret.value.clone();
        let encrypted_payload = self.get_encrypted_payload(&secret)?;
        secret.value = encrypted_payload;

        self.metadata_manager
            .catalog_controller
            .create_secret(secret, secret_plain_payload)
            .await
    }

    async fn drop_secret(&self, secret_id: SecretId) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Secret, secret_id as _, DropMode::Restrict, None)
            .await
    }

    async fn alter_secret(&self, mut secret: Secret) -> MetaResult<NotificationVersion> {
        let secret_plain_payload = secret.value.clone();
        let encrypted_payload = self.get_encrypted_payload(&secret)?;
        secret.value = encrypted_payload;
        self.metadata_manager
            .catalog_controller
            .alter_secret(secret, secret_plain_payload)
            .await
    }

    async fn create_subscription(
        &self,
        mut subscription: Subscription,
    ) -> MetaResult<NotificationVersion> {
        tracing::debug!("create subscription");
        let _permit = self
            .creating_streaming_job_permits
            .semaphore
            .acquire()
            .await
            .unwrap();
        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
        self.metadata_manager
            .catalog_controller
            .create_subscription_catalog(&mut subscription)
            .await?;
        if let Err(err) = self.stream_manager.create_subscription(&subscription).await {
            tracing::debug!(error = %err.as_report(), "failed to create subscription");
            let _ = self
                .metadata_manager
                .catalog_controller
                .try_abort_creating_subscription(subscription.id as _)
                .await
                .inspect_err(|e| {
                    tracing::error!(
                        error = %e.as_report(),
                        "failed to abort create subscription after failure"
                    );
                });
            return Err(err);
        }

        let version = self
            .metadata_manager
            .catalog_controller
            .notify_create_subscription(subscription.id)
            .await?;
        tracing::debug!("finish create subscription");
        Ok(version)
    }

    async fn drop_subscription(
        &self,
        subscription_id: SubscriptionId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        tracing::debug!("preparing drop subscription");
        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
        let subscription = self
            .metadata_manager
            .catalog_controller
            .get_subscription_by_id(subscription_id)
            .await?;
        let table_id = subscription.dependent_table_id;
        let database_id = subscription.database_id.into();
        let (_, version) = self
            .metadata_manager
            .catalog_controller
            .drop_object(ObjectType::Subscription, subscription_id as _, drop_mode)
            .await?;
        self.stream_manager
            .drop_subscription(database_id, subscription_id as _, table_id)
            .await;
        tracing::debug!("finish drop subscription");
        Ok(version)
    }

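    /// Checks that a shared-CDC-source table job contains exactly one stream scan fragment with a
    /// single actor and a `StreamCdcScan` node, and verifies its connect properties by creating a
    /// split enumerator.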
    #[await_tree::instrument]
    pub(crate) async fn validate_cdc_table(
        table: &Table,
        table_fragments: &StreamJobFragments,
    ) -> MetaResult<()> {
        let stream_scan_fragment = table_fragments
            .fragments
            .values()
            .filter(|f| f.fragment_type_mask.contains(FragmentTypeFlag::StreamScan))
            .exactly_one()
            .ok()
            .with_context(|| {
                format!(
                    "expect exactly one stream scan fragment, got: {:?}",
                    table_fragments.fragments
                )
            })?;

        assert_eq!(
            stream_scan_fragment.actors.len(),
            1,
            "Stream scan fragment should have only one actor"
        );
        let mut found_cdc_scan = false;
        match &stream_scan_fragment.nodes.node_body {
            Some(NodeBody::StreamCdcScan(_)) => {
                if Self::validate_cdc_table_inner(&stream_scan_fragment.nodes.node_body, table.id)
                    .await?
                {
                    found_cdc_scan = true;
                }
            }
            Some(NodeBody::Project(_)) => {
                for input in &stream_scan_fragment.nodes.input {
                    if Self::validate_cdc_table_inner(&input.node_body, table.id).await? {
                        found_cdc_scan = true;
                    }
                }
            }
            _ => {
                bail!("Unexpected node body for stream cdc scan");
            }
        };
        if !found_cdc_scan {
            bail!("No stream cdc scan node found in stream scan fragment");
        }
        Ok(())
    }

    async fn validate_cdc_table_inner(
        node_body: &Option<NodeBody>,
        table_id: u32,
    ) -> MetaResult<bool> {
        if let Some(NodeBody::StreamCdcScan(stream_cdc_scan)) = node_body
            && let Some(ref cdc_table_desc) = stream_cdc_scan.cdc_table_desc
        {
            let options_with_secret = WithOptionsSecResolved::new(
                cdc_table_desc.connect_properties.clone(),
                cdc_table_desc.secret_refs.clone(),
            );

            let mut props = ConnectorProperties::extract(options_with_secret, true)?;
            props.init_from_pb_cdc_table_desc(cdc_table_desc);

            let _enumerator = props
                .create_split_enumerator(SourceEnumeratorContext::dummy().into())
                .await?;

            tracing::debug!(?table_id, "validate cdc table success");
            Ok(true)
        } else {
            Ok(false)
        }
    }
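
    /// Builds the replace-job plan for the target table of a `CREATE/DROP SINK ... INTO TABLE`
    /// operation, re-attaching every remaining incoming sink (plus the creating one, if any, and
    /// excluding the dropping one) to the union fragment of the new table plan.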
    pub(crate) async fn inject_replace_table_job_for_table_sink(
        &self,
        tmp_id: u32,
        mgr: &MetadataManager,
        stream_ctx: StreamContext,
        sink: Option<&Sink>,
        creating_sink_table_fragments: Option<&StreamJobFragments>,
        dropping_sink_id: Option<SinkId>,
        streaming_job: &StreamingJob,
        fragment_graph: StreamFragmentGraph,
    ) -> MetaResult<(ReplaceStreamJobContext, StreamJobFragmentsToCreate)> {
        let (mut replace_table_ctx, mut stream_job_fragments) = self
            .build_replace_job(stream_ctx, streaming_job, fragment_graph, tmp_id as _)
            .await?;

        let target_table = streaming_job.table().unwrap();

        if let Some(creating_sink_table_fragments) = creating_sink_table_fragments {
            let sink_fragment = creating_sink_table_fragments.sink_fragment().unwrap();
            let sink = sink.expect("sink not found");
            Self::inject_replace_table_plan_for_sink(
                sink.id,
                &sink_fragment,
                target_table,
                &mut replace_table_ctx,
                stream_job_fragments.inner.union_fragment_for_table(),
                None,
            );
        }

        let [table_catalog]: [_; 1] = mgr
            .get_table_catalog_by_ids(vec![target_table.id])
            .await?
            .try_into()
            .expect("Target table should exist in sink into table");

        {
            let catalogs = mgr
                .get_sink_catalog_by_ids(&table_catalog.incoming_sinks)
                .await?;

            for sink in catalogs {
                let sink_id = sink.id;

                if let Some(dropping_sink_id) = dropping_sink_id
                    && sink_id == (dropping_sink_id as u32)
                {
                    continue;
                };

                let sink_table_fragments = mgr
                    .get_job_fragments_by_id(&risingwave_common::catalog::TableId::new(sink_id))
                    .await?;

                let sink_fragment = sink_table_fragments.sink_fragment().unwrap();

                Self::inject_replace_table_plan_for_sink(
                    sink_id,
                    &sink_fragment,
                    target_table,
                    &mut replace_table_ctx,
                    stream_job_fragments.inner.union_fragment_for_table(),
                    Some(&sink.unique_identity()),
                );
            }
        }

        for fragment in stream_job_fragments.fragments.values() {
            {
                {
                    visit_stream_node_body(&fragment.nodes, |node| {
                        if let NodeBody::Merge(merge_node) = node {
                            let upstream_fragment_id = merge_node.upstream_fragment_id;
                            if let Some(external_upstream_fragment_downstreams) = replace_table_ctx
                                .upstream_fragment_downstreams
                                .get(&upstream_fragment_id)
                            {
                                let mut upstream_fragment_downstreams =
                                    external_upstream_fragment_downstreams.iter().filter(
                                        |downstream| {
                                            downstream.downstream_fragment_id
                                                == fragment.fragment_id
                                        },
                                    );
                                assert!(
                                    upstream_fragment_downstreams.next().is_some(),
                                    "All the mergers for the union should have been fully assigned beforehand."
                                );
                            } else {
                                let mut upstream_fragment_downstreams = stream_job_fragments
                                    .downstreams
                                    .get(&upstream_fragment_id)
                                    .into_iter()
                                    .flatten()
                                    .filter(|downstream| {
                                        downstream.downstream_fragment_id == fragment.fragment_id
                                    });
                                assert!(
                                    upstream_fragment_downstreams.next().is_some(),
                                    "All the mergers for the union should have been fully assigned beforehand."
                                );
                            }
                        }
                    });
                }
            }
        }

        Ok((replace_table_ctx, stream_job_fragments))
    }

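    /// Rewrites the union fragment of the target table so that one of its project/merge inputs
    /// consumes the given sink fragment, wiring the hash dispatcher and refreshing the merge
    /// node's upstream fragment id and fields.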
    pub(crate) fn inject_replace_table_plan_for_sink(
        sink_id: u32,
        sink_fragment: &Fragment,
        table: &Table,
        replace_table_ctx: &mut ReplaceStreamJobContext,
        union_fragment: &mut Fragment,
        unique_identity: Option<&str>,
    ) {
        let sink_fields = sink_fragment.nodes.fields.clone();

        let output_indices = sink_fields
            .iter()
            .enumerate()
            .map(|(idx, _)| idx as _)
            .collect_vec();

        let dist_key_indices = table.distribution_key.iter().map(|i| *i as _).collect_vec();

        let sink_fragment_downstreams = replace_table_ctx
            .upstream_fragment_downstreams
            .entry(sink_fragment.fragment_id)
            .or_default();

        {
            sink_fragment_downstreams.push(DownstreamFragmentRelation {
                downstream_fragment_id: union_fragment.fragment_id,
                dispatcher_type: DispatcherType::Hash,
                dist_key_indices: dist_key_indices.clone(),
                output_mapping: PbDispatchOutputMapping::simple(output_indices),
            });
        }

        let upstream_fragment_id = sink_fragment.fragment_id;

        let mut max_operator_id = 0;

        visit_stream_node(&union_fragment.nodes, |node| {
            max_operator_id = max_operator_id.max(node.operator_id);
        });

        {
            {
                visit_stream_node_cont_mut(&mut union_fragment.nodes, |node| {
                    if let Some(NodeBody::Union(_)) = &mut node.node_body {
                        for input_project_node in &mut node.input {
                            if let Some(NodeBody::Project(_)) = &mut input_project_node.node_body {
                                let merge_stream_node =
                                    input_project_node.input.iter_mut().exactly_one().unwrap();

                                if input_project_node.identity.as_str()
                                    != unique_identity
                                        .unwrap_or(PbSink::UNIQUE_IDENTITY_FOR_CREATING_TABLE_SINK)
                                {
                                    continue;
                                }

                                if let Some(NodeBody::Merge(merge_node)) =
                                    &mut merge_stream_node.node_body
                                {
                                    {
                                        merge_stream_node.identity =
                                            format!("MergeExecutor(from sink {})", sink_id);
                                        merge_stream_node.operator_id = max_operator_id + 1;

                                        input_project_node.identity =
                                            format!("ProjectExecutor(from sink {})", sink_id);
                                        input_project_node.operator_id = max_operator_id + 2;
                                    }

                                    **merge_node = {
                                        #[expect(deprecated)]
                                        MergeNode {
                                            upstream_actor_id: vec![],
                                            upstream_fragment_id,
                                            upstream_dispatcher_type: PbDispatcherType::Hash as _,
                                            fields: sink_fields.to_vec(),
                                        }
                                    };

                                    merge_stream_node.fields = sink_fields.to_vec();

                                    return false;
                                }
                            }
                        }
                    }
                    true
                });
            }
        }
    }

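    /// Entry point for creating a streaming job (materialized view, table, sink, index, or shared
    /// source). Persists the job catalog first, then builds and launches the dataflow; on failure
    /// the creating job is aborted and any source registered for it is dropped again.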
    #[await_tree::instrument(boxed, "create_streaming_job({streaming_job})")]
    pub async fn create_streaming_job(
        &self,
        mut streaming_job: StreamingJob,
        fragment_graph: StreamFragmentGraphProto,
        affected_table_replace_info: Option<ReplaceStreamJobInfo>,
        dependencies: HashSet<ObjectId>,
        specific_resource_group: Option<String>,
        if_not_exists: bool,
    ) -> MetaResult<NotificationVersion> {
        let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());
        let check_ret = self
            .metadata_manager
            .catalog_controller
            .create_job_catalog(
                &mut streaming_job,
                &ctx,
                &fragment_graph.parallelism,
                fragment_graph.max_parallelism as _,
                dependencies,
                specific_resource_group.clone(),
            )
            .await;
        if let Err(meta_err) = check_ret {
            if !if_not_exists {
                return Err(meta_err);
            }
            if let MetaErrorInner::Duplicated(_, _, Some(job_id)) = meta_err.inner() {
                if streaming_job.create_type() == CreateType::Foreground {
                    let database_id = streaming_job.database_id();
                    return self
                        .metadata_manager
                        .wait_streaming_job_finished(database_id.into(), *job_id)
                        .await;
                } else {
                    return Ok(IGNORED_NOTIFICATION_VERSION);
                }
            } else {
                return Err(meta_err);
            }
        }
        let job_id = streaming_job.id();

        tracing::debug!(
            id = job_id,
            definition = streaming_job.definition(),
            create_type = streaming_job.create_type().as_str_name(),
            job_type = ?streaming_job.job_type(),
            "starting streaming job",
        );
        let _permit = self
            .creating_streaming_job_permits
            .semaphore
            .acquire()
            .instrument_await("acquire_creating_streaming_job_permit")
            .await
            .unwrap();
        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;

        let name = streaming_job.name();
        let definition = streaming_job.definition();
        let source_id = match &streaming_job {
            StreamingJob::Table(Some(src), _, _) | StreamingJob::Source(src) => Some(src.id),
            _ => None,
        };

        match self
            .create_streaming_job_inner(
                ctx,
                streaming_job,
                fragment_graph,
                affected_table_replace_info,
                specific_resource_group,
            )
            .await
        {
            Ok(version) => Ok(version),
            Err(err) => {
                tracing::error!(id = job_id, error = %err.as_report(), "failed to create streaming job");
                let event = risingwave_pb::meta::event_log::EventCreateStreamJobFail {
                    id: job_id,
                    name,
                    definition,
                    error: err.as_report().to_string(),
                };
                self.env.event_log_manager_ref().add_event_logs(vec![
                    risingwave_pb::meta::event_log::Event::CreateStreamJobFail(event),
                ]);
                let (aborted, _) = self
                    .metadata_manager
                    .catalog_controller
                    .try_abort_creating_streaming_job(job_id as _, false)
                    .await?;
                if aborted {
                    tracing::warn!(id = job_id, "aborted streaming job");
                    if let Some(source_id) = source_id {
                        self.source_manager
                            .apply_source_change(SourceChange::DropSource {
                                dropped_source_ids: vec![source_id as SourceId],
                            })
                            .await;
                    }
                }
                Err(err)
            }
        }
    }

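    /// Builds the fragment and actor graphs for a new streaming job, validates its connectors and
    /// reports telemetry, then hands the job to the stream manager, either synchronously
    /// (foreground) or in a spawned background task.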
    #[await_tree::instrument(boxed)]
    async fn create_streaming_job_inner(
        &self,
        ctx: StreamContext,
        mut streaming_job: StreamingJob,
        fragment_graph: StreamFragmentGraphProto,
        affected_table_replace_info: Option<ReplaceStreamJobInfo>,
        specific_resource_group: Option<String>,
    ) -> MetaResult<NotificationVersion> {
        let mut fragment_graph =
            StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
        streaming_job.set_info_from_graph(&fragment_graph);

        let incomplete_internal_tables = fragment_graph
            .incomplete_internal_tables()
            .into_values()
            .collect_vec();
        let table_id_map = self
            .metadata_manager
            .catalog_controller
            .create_internal_table_catalog(&streaming_job, incomplete_internal_tables)
            .await?;
        fragment_graph.refill_internal_table_ids(table_id_map);

        let affected_table_replace_info = match affected_table_replace_info {
            Some(replace_table_info) => {
                assert!(
                    specific_resource_group.is_none(),
                    "specific_resource_group is not supported for replace table (alter column or sink into table)"
                );

                let ReplaceStreamJobInfo {
                    mut streaming_job,
                    fragment_graph,
                    ..
                } = replace_table_info;

                let original_max_parallelism = self
                    .metadata_manager
                    .get_job_max_parallelism(streaming_job.id().into())
                    .await?;
                let fragment_graph = PbStreamFragmentGraph {
                    max_parallelism: original_max_parallelism as _,
                    ..fragment_graph
                };

                let fragment_graph =
                    StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
                streaming_job.set_info_from_graph(&fragment_graph);
                let streaming_job = streaming_job;

                Some((streaming_job, fragment_graph))
            }
            None => None,
        };

        tracing::debug!(id = streaming_job.id(), "building streaming job");
        let (ctx, stream_job_fragments) = self
            .build_stream_job(
                ctx,
                streaming_job,
                fragment_graph,
                affected_table_replace_info,
                specific_resource_group,
            )
            .await?;

        let streaming_job = &ctx.streaming_job;

        match streaming_job {
            StreamingJob::Table(None, table, TableJobType::SharedCdcSource) => {
                Self::validate_cdc_table(table, &stream_job_fragments).await?;
            }
            StreamingJob::Table(Some(source), ..) => {
                self.source_manager.register_source(source).await?;
                let connector_name = source
                    .get_with_properties()
                    .get(UPSTREAM_SOURCE_KEY)
                    .cloned();
                let attr = source.info.as_ref().map(|source_info| {
                    jsonbb::json!({
                        "format": source_info.format().as_str_name(),
                        "encode": source_info.row_encode().as_str_name(),
                    })
                });
                report_create_object(
                    streaming_job.id(),
                    "source",
                    PbTelemetryDatabaseObject::Source,
                    connector_name,
                    attr,
                );
            }
            StreamingJob::Sink(sink, _) => {
                validate_sink(sink).await?;
                let connector_name = sink.get_properties().get(UPSTREAM_SOURCE_KEY).cloned();
                let attr = sink.format_desc.as_ref().map(|sink_info| {
                    jsonbb::json!({
                        "format": sink_info.format().as_str_name(),
                        "encode": sink_info.encode().as_str_name(),
                    })
                });
                report_create_object(
                    streaming_job.id(),
                    "sink",
                    PbTelemetryDatabaseObject::Sink,
                    connector_name,
                    attr,
                );
            }
            StreamingJob::Source(source) => {
                self.source_manager.register_source(source).await?;
                let connector_name = source
                    .get_with_properties()
                    .get(UPSTREAM_SOURCE_KEY)
                    .cloned();
                let attr = source.info.as_ref().map(|source_info| {
                    jsonbb::json!({
                        "format": source_info.format().as_str_name(),
                        "encode": source_info.row_encode().as_str_name(),
                    })
                });
                report_create_object(
                    streaming_job.id(),
                    "source",
                    PbTelemetryDatabaseObject::Source,
                    connector_name,
                    attr,
                );
            }
            _ => {}
        }

        self.metadata_manager
            .catalog_controller
            .prepare_streaming_job(&stream_job_fragments, streaming_job, false)
            .await?;

        let stream_job_id = streaming_job.id();
        match (streaming_job.create_type(), &streaming_job) {
            (CreateType::Unspecified, _)
            | (CreateType::Foreground, _)
            | (CreateType::Background, StreamingJob::Sink(_, _)) => {
                let version = self
                    .stream_manager
                    .create_streaming_job(stream_job_fragments, ctx, None)
                    .await?;
                Ok(version)
            }
            (CreateType::Background, _) => {
                let await_tree_key = format!("Background DDL Worker ({})", streaming_job.id());
                let await_tree_span =
                    span!("{:?}({})", streaming_job.job_type(), streaming_job.name());

                let ctrl = self.clone();
                let (tx, rx) = oneshot::channel();
                let fut = async move {
                    let _ = ctrl
                        .stream_manager
                        .create_streaming_job(stream_job_fragments, ctx, Some(tx))
                        .await
                        .inspect_err(|err| {
                            tracing::error!(id = stream_job_id, error = ?err.as_report(), "failed to create background streaming job");
                        });
                };

                let fut = (self.env.await_tree_reg())
                    .register(await_tree_key, await_tree_span)
                    .instrument(fut);
                tokio::spawn(fut);

                rx.instrument_await("wait_background_streaming_job_creation_started")
                    .await
                    .map_err(|_| {
                        anyhow!(
                            "failed to receive create streaming job result of job: {}",
                            stream_job_id
                        )
                    })??;
                Ok(IGNORED_NOTIFICATION_VERSION)
            }
        }
    }
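
    /// Drops a catalog object and releases all metadata that depends on it. When a sink that
    /// writes into a table is dropped, the target table is first replaced with a plan that no
    /// longer contains the sink's union input.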
    pub async fn drop_object(
        &self,
        object_type: ObjectType,
        object_id: ObjectId,
        drop_mode: DropMode,
        target_replace_info: Option<ReplaceStreamJobInfo>,
    ) -> MetaResult<NotificationVersion> {
        let (release_ctx, mut version) = self
            .metadata_manager
            .catalog_controller
            .drop_object(object_type, object_id, drop_mode)
            .await?;

        if let Some(replace_table_info) = target_replace_info {
            let stream_ctx =
                StreamContext::from_protobuf(replace_table_info.fragment_graph.get_ctx().unwrap());

            let ReplaceStreamJobInfo {
                mut streaming_job,
                fragment_graph,
                ..
            } = replace_table_info;

            let sink_id = if let ObjectType::Sink = object_type {
                object_id as _
            } else {
                panic!("additional replace table event only occurs when dropping sink into table")
            };

            let original_max_parallelism = self
                .metadata_manager
                .get_job_max_parallelism(streaming_job.id().into())
                .await?;
            let fragment_graph = PbStreamFragmentGraph {
                max_parallelism: original_max_parallelism as _,
                ..fragment_graph
            };

            let fragment_graph =
                StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
            streaming_job.set_info_from_graph(&fragment_graph);
            let streaming_job = streaming_job;

            streaming_job.table().expect("should be table job");

            tracing::debug!(id = streaming_job.id(), "replacing table for dropped sink");
            let tmp_id = self
                .metadata_manager
                .catalog_controller
                .create_job_catalog_for_replace(
                    &streaming_job,
                    &stream_ctx,
                    &fragment_graph.specified_parallelism(),
                    fragment_graph.max_parallelism(),
                )
                .await? as u32;

            let (ctx, stream_job_fragments) = self
                .inject_replace_table_job_for_table_sink(
                    tmp_id,
                    &self.metadata_manager,
                    stream_ctx,
                    None,
                    None,
                    Some(sink_id),
                    &streaming_job,
                    fragment_graph,
                )
                .await?;

            let result: MetaResult<_> = try {
                let replace_upstream = ctx.replace_upstream.clone();

                self.metadata_manager
                    .catalog_controller
                    .prepare_streaming_job(&stream_job_fragments, &streaming_job, true)
                    .await?;

                self.stream_manager
                    .replace_stream_job(stream_job_fragments, ctx)
                    .await?;

                replace_upstream
            };

            version = match result {
                Ok(replace_upstream) => {
                    let version = self
                        .metadata_manager
                        .catalog_controller
                        .finish_replace_streaming_job(
                            tmp_id as _,
                            streaming_job,
                            replace_upstream,
                            SinkIntoTableContext {
                                creating_sink_id: None,
                                dropping_sink_id: Some(sink_id),
                                updated_sink_catalogs: vec![],
                            },
                            None,
                        )
                        .await?;
                    Ok(version)
                }
                Err(err) => {
                    tracing::error!(id = object_id, error = ?err.as_report(), "failed to replace table");
                    let _ = self.metadata_manager
                        .catalog_controller
                        .try_abort_replacing_streaming_job(tmp_id as _)
                        .await
                        .inspect_err(|err| {
                            tracing::error!(id = object_id, error = ?err.as_report(), "failed to abort replacing table");
                        });
                    Err(err)
                }
            }?;
        }

        let ReleaseContext {
            database_id,
            removed_streaming_job_ids,
            removed_state_table_ids,
            removed_source_ids,
            removed_secret_ids: secret_ids,
            removed_source_fragments,
            removed_actors,
            removed_fragments,
        } = release_ctx;

        let _guard = self.source_manager.pause_tick().await;
        self.stream_manager
            .drop_streaming_jobs(
                risingwave_common::catalog::DatabaseId::new(database_id as _),
                removed_actors.iter().map(|id| *id as _).collect(),
                removed_streaming_job_ids,
                removed_state_table_ids,
                removed_fragments.iter().map(|id| *id as _).collect(),
            )
            .await;

        self.source_manager
            .apply_source_change(SourceChange::DropSource {
                dropped_source_ids: removed_source_ids.into_iter().map(|id| id as _).collect(),
            })
            .await;

        let dropped_source_fragments = removed_source_fragments
            .into_iter()
            .map(|(source_id, fragments)| {
                (
                    source_id,
                    fragments.into_iter().map(|id| id as u32).collect(),
                )
            })
            .collect();
        let dropped_actors = removed_actors.iter().map(|id| *id as _).collect();
        self.source_manager
            .apply_source_change(SourceChange::DropMv {
                dropped_source_fragments,
                dropped_actors,
            })
            .await;

        for secret in secret_ids {
            LocalSecretManager::global().remove_secret(secret as _);
        }
        Ok(version)
    }

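    /// Replaces an existing streaming job (e.g. an `ALTER TABLE`/`ALTER SOURCE` schema change)
    /// with a newly built plan under a temporary job id, then finishes the replacement or aborts
    /// it on failure.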
    #[await_tree::instrument(boxed, "replace_streaming_job({streaming_job})")]
    pub async fn replace_job(
        &self,
        mut streaming_job: StreamingJob,
        fragment_graph: StreamFragmentGraphProto,
    ) -> MetaResult<NotificationVersion> {
        match &mut streaming_job {
            StreamingJob::Table(..)
            | StreamingJob::Source(..)
            | StreamingJob::MaterializedView(..) => {}
            StreamingJob::Sink(..) | StreamingJob::Index(..) => {
                bail_not_implemented!("schema change for {}", streaming_job.job_type_str())
            }
        }

        let job_id = streaming_job.id();

        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
        let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());

        let original_max_parallelism = self
            .metadata_manager
            .get_job_max_parallelism(streaming_job.id().into())
            .await?;
        let fragment_graph = PbStreamFragmentGraph {
            max_parallelism: original_max_parallelism as _,
            ..fragment_graph
        };

        let fragment_graph = StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
        streaming_job.set_info_from_graph(&fragment_graph);

        let streaming_job = streaming_job;

        let tmp_id = self
            .metadata_manager
            .catalog_controller
            .create_job_catalog_for_replace(
                &streaming_job,
                &ctx,
                &fragment_graph.specified_parallelism(),
                fragment_graph.max_parallelism(),
            )
            .await?;

        tracing::debug!(id = job_id, "building replace streaming job");
        let mut updated_sink_catalogs = vec![];

        let mut drop_table_connector_ctx = None;
        let result: MetaResult<_> = try {
            let (mut ctx, mut stream_job_fragments) = self
                .build_replace_job(ctx, &streaming_job, fragment_graph, tmp_id as _)
                .await?;
            drop_table_connector_ctx = ctx.drop_table_connector_ctx.clone();

            if let StreamingJob::Table(_, table, ..) = &streaming_job {
                let catalogs = self
                    .metadata_manager
                    .get_sink_catalog_by_ids(&table.incoming_sinks)
                    .await?;

                for sink in catalogs {
                    let sink_id = &sink.id;

                    let sink_table_fragments = self
                        .metadata_manager
                        .get_job_fragments_by_id(&risingwave_common::catalog::TableId::new(
                            *sink_id,
                        ))
                        .await?;

                    let sink_fragment = sink_table_fragments.sink_fragment().unwrap();

                    Self::inject_replace_table_plan_for_sink(
                        *sink_id,
                        &sink_fragment,
                        table,
                        &mut ctx,
                        stream_job_fragments.inner.union_fragment_for_table(),
                        Some(&sink.unique_identity()),
                    );

                    if sink.original_target_columns.is_empty() {
                        updated_sink_catalogs.push(sink.id as _);
                    }
                }
            }

            let replace_upstream = ctx.replace_upstream.clone();

            self.metadata_manager
                .catalog_controller
                .prepare_streaming_job(&stream_job_fragments, &streaming_job, true)
                .await?;

            self.stream_manager
                .replace_stream_job(stream_job_fragments, ctx)
                .await?;
            replace_upstream
        };

        match result {
            Ok(replace_upstream) => {
                let version = self
                    .metadata_manager
                    .catalog_controller
                    .finish_replace_streaming_job(
                        tmp_id,
                        streaming_job,
                        replace_upstream,
                        SinkIntoTableContext {
                            creating_sink_id: None,
                            dropping_sink_id: None,
                            updated_sink_catalogs,
                        },
                        drop_table_connector_ctx.as_ref(),
                    )
                    .await?;
                if let Some(drop_table_connector_ctx) = &drop_table_connector_ctx {
                    self.source_manager
                        .apply_source_change(SourceChange::DropSource {
                            dropped_source_ids: vec![drop_table_connector_ctx.to_remove_source_id],
                        })
                        .await;
                }
                Ok(version)
            }
            Err(err) => {
                tracing::error!(id = job_id, error = ?err.as_report(), "failed to replace job");
                let _ = self.metadata_manager
                    .catalog_controller
                    .try_abort_replacing_streaming_job(tmp_id)
                    .await
                    .inspect_err(|err| {
                        tracing::error!(id = job_id, error = ?err.as_report(), "failed to abort replacing job");
                    });
                Err(err)
            }
        }
    }

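    /// Drops a streaming job by mapping it to the corresponding catalog object; for sinks, the
    /// exactly-once Iceberg sink bookkeeping rows are cleaned up as well.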
    #[await_tree::instrument(boxed, "drop_streaming_job{}({job_id})", if let DropMode::Cascade = drop_mode { "_cascade" } else { "" })]
    async fn drop_streaming_job(
        &self,
        job_id: StreamingJobId,
        drop_mode: DropMode,
        target_replace_info: Option<ReplaceStreamJobInfo>,
    ) -> MetaResult<NotificationVersion> {
        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;

        let (object_id, object_type) = match job_id {
            StreamingJobId::MaterializedView(id) => (id as _, ObjectType::Table),
            StreamingJobId::Sink(id) => (id as _, ObjectType::Sink),
            StreamingJobId::Table(_, id) => (id as _, ObjectType::Table),
            StreamingJobId::Index(idx) => (idx as _, ObjectType::Index),
        };

        let version = self
            .drop_object(object_type, object_id, drop_mode, target_replace_info)
            .await?;
        #[cfg(not(madsim))]
        if let StreamingJobId::Sink(sink_id) = job_id {
            let db = self.env.meta_store_ref().conn.clone();
            clean_all_rows_by_sink_id(&db, sink_id).await?;
        }
        Ok(version)
    }
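
    /// Resolves the parallelism to schedule a new job with: the specified value if it fits within
    /// the max parallelism and the slots available in the resource group, otherwise the configured
    /// default parallelism capped at the max parallelism.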
    fn resolve_stream_parallelism(
        &self,
        specified: Option<NonZeroUsize>,
        max: NonZeroUsize,
        cluster_info: &StreamingClusterInfo,
        resource_group: String,
    ) -> MetaResult<NonZeroUsize> {
        let available = cluster_info.parallelism(&resource_group);
        let Some(available) = NonZeroUsize::new(available) else {
            bail_unavailable!(
                "no available slots to schedule in resource group \"{}\", \
                 have you allocated any compute nodes within this resource group?",
                resource_group
            );
        };

        if let Some(specified) = specified {
            if specified > max {
                bail_invalid_parameter!(
                    "specified parallelism {} should not exceed max parallelism {}",
                    specified,
                    max,
                );
            }
            if specified > available {
                bail_unavailable!(
                    "insufficient parallelism to schedule in resource group \"{}\", \
                     required: {}, available: {}",
                    resource_group,
                    specified,
                    available,
                );
            }
            Ok(specified)
        } else {
            let default_parallelism = match self.env.opts.default_parallelism {
                DefaultParallelism::Full => available,
                DefaultParallelism::Default(num) => {
                    if num > available {
                        bail_unavailable!(
                            "insufficient parallelism to schedule in resource group \"{}\", \
                             required: {}, available: {}",
                            resource_group,
                            num,
                            available,
                        );
                    }
                    num
                }
            };

            if default_parallelism > max {
                tracing::warn!(
                    max_parallelism = max.get(),
                    resource_group,
                    "too many parallelism available, use max parallelism instead",
                );
            }
            Ok(default_parallelism.min(max))
        }
    }

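    /// Builds a complete streaming job: resolves upstream root fragments, schedules actors with
    /// the resolved parallelism, and assembles the [`CreateStreamingJobContext`] together with the
    /// fragments to create (including an optional replace-table plan for sink-into-table).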
1764 #[await_tree::instrument]
1770 pub(crate) async fn build_stream_job(
1771 &self,
1772 stream_ctx: StreamContext,
1773 mut stream_job: StreamingJob,
1774 fragment_graph: StreamFragmentGraph,
1775 affected_table_replace_info: Option<(StreamingJob, StreamFragmentGraph)>,
1776 specific_resource_group: Option<String>,
1777 ) -> MetaResult<(CreateStreamingJobContext, StreamJobFragmentsToCreate)> {
1778 let id = stream_job.id();
1779 let specified_parallelism = fragment_graph.specified_parallelism();
1780 let expr_context = stream_ctx.to_expr_context();
1781 let max_parallelism = NonZeroUsize::new(fragment_graph.max_parallelism()).unwrap();
1782
1783 let fragment_backfill_ordering = fragment_graph.create_fragment_backfill_ordering();
1785
1786 let (snapshot_backfill_info, cross_db_snapshot_backfill_info) =
1790 fragment_graph.collect_snapshot_backfill_info()?;
1791 assert!(
1792 snapshot_backfill_info
1793 .iter()
1794 .chain([&cross_db_snapshot_backfill_info])
1795 .flat_map(|info| info.upstream_mv_table_id_to_backfill_epoch.values())
1796 .all(|backfill_epoch| backfill_epoch.is_none()),
1797 "should not set backfill epoch when initially build the job: {:?} {:?}",
1798 snapshot_backfill_info,
1799 cross_db_snapshot_backfill_info
1800 );
1801
1802 self.metadata_manager
1804 .catalog_controller
1805 .validate_cross_db_snapshot_backfill(&cross_db_snapshot_backfill_info)
1806 .await?;
1807
1808 let upstream_table_ids = fragment_graph
1809 .dependent_table_ids()
1810 .iter()
1811 .filter(|id| {
1812 !cross_db_snapshot_backfill_info
1813 .upstream_mv_table_id_to_backfill_epoch
1814 .contains_key(id)
1815 })
1816 .cloned()
1817 .collect();
1818
1819 let (upstream_root_fragments, existing_actor_location) = self
1820 .metadata_manager
1821 .get_upstream_root_fragments(&upstream_table_ids)
1822 .await?;
1823
1824 if snapshot_backfill_info.is_some() {
1825 match stream_job {
1826 StreamingJob::MaterializedView(_)
1827 | StreamingJob::Sink(_, _)
1828 | StreamingJob::Index(_, _) => {}
1829 StreamingJob::Table(_, _, _) | StreamingJob::Source(_) => {
1830 return Err(
1831 anyhow!("snapshot_backfill not enabled for table and source").into(),
1832 );
1833 }
1834 }
1835 }
1836
1837 let upstream_actors = upstream_root_fragments
1838 .values()
1839 .map(|fragment| {
1840 (
1841 fragment.fragment_id,
1842 fragment.actors.iter().map(|actor| actor.actor_id).collect(),
1843 )
1844 })
1845 .collect();
1846
1847 let complete_graph = CompleteStreamFragmentGraph::with_upstreams(
1848 fragment_graph,
1849 upstream_root_fragments,
1850 existing_actor_location,
1851 (&stream_job).into(),
1852 )?;
1853
1854 let resource_group = match specific_resource_group {
1855 None => {
1856 self.metadata_manager
1857 .get_database_resource_group(stream_job.database_id() as ObjectId)
1858 .await?
1859 }
1860 Some(resource_group) => resource_group,
1861 };
1862
1863 let cluster_info = self.metadata_manager.get_streaming_cluster_info().await?;
1865
1866 let parallelism = self.resolve_stream_parallelism(
1867 specified_parallelism,
1868 max_parallelism,
1869 &cluster_info,
1870 resource_group.clone(),
1871 )?;
1872
1873 let parallelism = self
1874 .env
1875 .system_params_reader()
1876 .await
1877 .adaptive_parallelism_strategy()
1878 .compute_target_parallelism(parallelism.get());
1879
1880 let parallelism = NonZeroUsize::new(parallelism).expect("parallelism must be positive");
1881 let actor_graph_builder = ActorGraphBuilder::new(
1882 id,
1883 resource_group,
1884 complete_graph,
1885 cluster_info,
1886 parallelism,
1887 )?;
1888
1889 let ActorGraphBuildResult {
1890 graph,
1891 downstream_fragment_relations,
1892 building_locations,
1893 existing_locations,
1894 upstream_fragment_downstreams,
1895 new_no_shuffle,
1896 replace_upstream,
1897 ..
1898 } = actor_graph_builder.generate_graph(&self.env, &stream_job, expr_context)?;
1899 assert!(replace_upstream.is_empty());
1900
1901 let table_parallelism = match (specified_parallelism, &self.env.opts.default_parallelism) {
1908 (None, DefaultParallelism::Full) => TableParallelism::Adaptive,
1909 _ => TableParallelism::Fixed(parallelism.get()),
1910 };
1911
1912 let stream_job_fragments = StreamJobFragments::new(
1913 id.into(),
1914 graph,
1915 &building_locations.actor_locations,
1916 stream_ctx.clone(),
1917 table_parallelism,
1918 max_parallelism.get(),
1919 );
1920 let internal_tables = stream_job_fragments.internal_tables();
1921
1922 if let Some(mview_fragment) = stream_job_fragments.mview_fragment() {
1923 stream_job.set_table_vnode_count(mview_fragment.vnode_count());
1924 }
1925
        let replace_table_job_info = match affected_table_replace_info {
            Some((table_stream_job, fragment_graph)) => {
                if snapshot_backfill_info.is_some() {
                    return Err(anyhow!(
                        "snapshot backfill should not have replace table info: {table_stream_job:?}"
                    )
                    .into());
                }
                let StreamingJob::Sink(sink, target_table) = &mut stream_job else {
                    bail!("additional replace table event only occurs when sinking into table");
                };

                table_stream_job.table().expect("should be table job");
                let tmp_id = self
                    .metadata_manager
                    .catalog_controller
                    .create_job_catalog_for_replace(
                        &table_stream_job,
                        &stream_ctx,
                        &fragment_graph.specified_parallelism(),
                        fragment_graph.max_parallelism(),
                    )
                    .await? as u32;

                let (context, table_fragments) = self
                    .inject_replace_table_job_for_table_sink(
                        tmp_id,
                        &self.metadata_manager,
                        stream_ctx,
                        Some(sink),
                        Some(&stream_job_fragments),
                        None,
                        &table_stream_job,
                        fragment_graph,
                    )
                    .await?;
                must_match!(&table_stream_job, StreamingJob::Table(source, table, _) => {
                    *target_table = Some((table.clone(), source.clone()));
                });

                Some((table_stream_job, context, table_fragments))
            }
            None => None,
        };

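        // Assemble the context consumed when the streaming job is actually created.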
        let ctx = CreateStreamingJobContext {
            upstream_fragment_downstreams,
            new_no_shuffle,
            upstream_actors,
            internal_tables,
            building_locations,
            existing_locations,
            definition: stream_job.definition(),
            mv_table_id: stream_job.mv_table(),
            create_type: stream_job.create_type(),
            job_type: (&stream_job).into(),
            streaming_job: stream_job,
            replace_table_job_info,
            option: CreateStreamingJobOption {},
            snapshot_backfill_info,
            cross_db_snapshot_backfill_info,
            fragment_backfill_ordering,
        };

        Ok((
            ctx,
            StreamJobFragmentsToCreate {
                inner: stream_job_fragments,
                downstreams: downstream_fragment_relations,
            },
        ))
    }

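    /// Builds the new fragments for replacing an existing streaming job (e.g. `ALTER TABLE`,
    /// `ALTER SOURCE` or `ALTER MATERIALIZED VIEW`), using `tmp_job_id` as a placeholder
    /// catalog id, and returns the replace context together with the fragments to create.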
    pub(crate) async fn build_replace_job(
        &self,
        stream_ctx: StreamContext,
        stream_job: &StreamingJob,
        mut fragment_graph: StreamFragmentGraph,
        tmp_job_id: TableId,
    ) -> MetaResult<(ReplaceStreamJobContext, StreamJobFragmentsToCreate)> {
        match &stream_job {
            StreamingJob::Table(..)
            | StreamingJob::Source(..)
            | StreamingJob::MaterializedView(..) => {}
            StreamingJob::Sink(..) | StreamingJob::Index(..) => {
                bail_not_implemented!("schema change for {}", stream_job.job_type_str())
            }
        }

        let id = stream_job.id();
        let expr_context = stream_ctx.to_expr_context();

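        // If the new table job no longer has an associated source (`StreamingJob::Table(None, ..)`),
        // check whether the original table had one; a hit means the connector is being dropped.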
        let mut drop_table_associated_source_id = None;
        if let StreamingJob::Table(None, _, _) = &stream_job {
            drop_table_associated_source_id = self
                .metadata_manager
                .get_table_associated_source_id(id as _)
                .await?;
        }

        let old_fragments = self
            .metadata_manager
            .get_job_fragments_by_id(&id.into())
            .await?;
        let old_internal_table_ids = old_fragments.internal_table_ids();

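        // When the associated source is being dropped, record which source and state table to
        // remove; the source's state table is expected to be the job's only internal table here.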
        let mut drop_table_connector_ctx = None;
        if let Some(to_remove_source_id) = drop_table_associated_source_id {
            debug_assert!(old_internal_table_ids.len() == 1);

            drop_table_connector_ctx = Some(DropTableConnectorContext {
                to_change_streaming_job_id: id as i32,
                to_remove_state_table_id: old_internal_table_ids[0] as i32,
                to_remove_source_id,
            });
        } else if stream_job.is_materialized_view() {
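            // For `ALTER MATERIALIZED VIEW`, match the internal tables of the new graph against
            // those of the existing job by graph structure, and reuse the matched table ids.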
            let old_fragments_upstreams = self
                .metadata_manager
                .catalog_controller
                .upstream_fragments(old_fragments.fragment_ids())
                .await?;

            let old_state_graph =
                state_match::Graph::from_existing(&old_fragments, &old_fragments_upstreams);
            let new_state_graph = state_match::Graph::from_building(&fragment_graph);
            let mapping =
                state_match::match_graph_internal_tables(&new_state_graph, &old_state_graph)
                    .context("incompatible altering on the streaming job states")?;

            fragment_graph.fit_internal_table_ids_with_mapping(mapping);
        } else {
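            // For other jobs, the internal tables of the new graph are expected to correspond
            // one-to-one to the old ones, so fit them trivially.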
            let old_internal_tables = self
                .metadata_manager
                .get_table_catalog_by_ids(old_internal_table_ids)
                .await?;
            fragment_graph.fit_internal_tables_trivial(old_internal_tables)?;
        }

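        // Locate the root fragment of the existing job, which its downstream fragments merge from.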
        let original_root_fragment = old_fragments
            .root_fragment()
            .expect("root fragment not found");

        let job_type = StreamingJobType::from(stream_job);

        let (downstream_fragments, downstream_actor_location) =
            self.metadata_manager.get_downstream_fragments(id).await?;

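        // Extend the building graph with the existing upstream/downstream fragments, depending
        // on the job type: general tables and sources only have downstreams, while shared-CDC
        // tables and materialized views may also depend on upstream root fragments.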
        let complete_graph = match &job_type {
            StreamingJobType::Table(TableJobType::General) | StreamingJobType::Source => {
                CompleteStreamFragmentGraph::with_downstreams(
                    fragment_graph,
                    original_root_fragment.fragment_id,
                    downstream_fragments,
                    downstream_actor_location,
                    job_type,
                )?
            }
            StreamingJobType::Table(TableJobType::SharedCdcSource)
            | StreamingJobType::MaterializedView => {
                let (upstream_root_fragments, upstream_actor_location) = self
                    .metadata_manager
                    .get_upstream_root_fragments(fragment_graph.dependent_table_ids())
                    .await?;

                CompleteStreamFragmentGraph::with_upstreams_and_downstreams(
                    fragment_graph,
                    upstream_root_fragments,
                    upstream_actor_location,
                    original_root_fragment.fragment_id,
                    downstream_fragments,
                    downstream_actor_location,
                    job_type,
                )?
            }
            _ => unreachable!(),
        };

        let resource_group = self
            .metadata_manager
            .get_existing_job_resource_group(id as ObjectId)
            .await?;

        let cluster_info = self.metadata_manager.get_streaming_cluster_info().await?;

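        // Keep the parallelism of the original job: the new fragments are built with the same
        // number of actors as the old root fragment.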
        let parallelism = NonZeroUsize::new(original_root_fragment.actors.len())
            .expect("The number of actors in the original table fragment should be greater than 0");

        let actor_graph_builder = ActorGraphBuilder::new(
            id,
            resource_group,
            complete_graph,
            cluster_info,
            parallelism,
        )?;

        let ActorGraphBuildResult {
            graph,
            downstream_fragment_relations,
            building_locations,
            existing_locations,
            upstream_fragment_downstreams,
            replace_upstream,
            new_no_shuffle,
            ..
        } = actor_graph_builder.generate_graph(&self.env, stream_job, expr_context)?;

        if matches!(
            job_type,
            StreamingJobType::Source | StreamingJobType::Table(TableJobType::General)
        ) {
            assert!(upstream_fragment_downstreams.is_empty());
        }

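        // Build the new fragments under the temporary job id allocated for this replacement.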
        let stream_job_fragments = StreamJobFragments::new(
            (tmp_job_id as u32).into(),
            graph,
            &building_locations.actor_locations,
            stream_ctx,
            old_fragments.assigned_parallelism,
            old_fragments.max_parallelism,
        );

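        // Assemble the context consumed when the replacement is applied.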
        let ctx = ReplaceStreamJobContext {
            old_fragments,
            replace_upstream,
            new_no_shuffle,
            upstream_fragment_downstreams,
            building_locations,
            existing_locations,
            streaming_job: stream_job.clone(),
            tmp_id: tmp_job_id as _,
            drop_table_connector_ctx,
        };

        Ok((
            ctx,
            StreamJobFragmentsToCreate {
                inner: stream_job_fragments,
                downstreams: downstream_fragment_relations,
            },
        ))
    }

    async fn alter_name(
        &self,
        relation: alter_name_request::Object,
        new_name: &str,
    ) -> MetaResult<NotificationVersion> {
        let (obj_type, id) = match relation {
            alter_name_request::Object::TableId(id) => (ObjectType::Table, id as ObjectId),
            alter_name_request::Object::ViewId(id) => (ObjectType::View, id as ObjectId),
            alter_name_request::Object::IndexId(id) => (ObjectType::Index, id as ObjectId),
            alter_name_request::Object::SinkId(id) => (ObjectType::Sink, id as ObjectId),
            alter_name_request::Object::SourceId(id) => (ObjectType::Source, id as ObjectId),
            alter_name_request::Object::SchemaId(id) => (ObjectType::Schema, id as ObjectId),
            alter_name_request::Object::DatabaseId(id) => (ObjectType::Database, id as ObjectId),
            alter_name_request::Object::SubscriptionId(id) => {
                (ObjectType::Subscription, id as ObjectId)
            }
        };
        self.metadata_manager
            .catalog_controller
            .alter_name(obj_type, id, new_name)
            .await
    }

    async fn alter_swap_rename(
        &self,
        object: alter_swap_rename_request::Object,
    ) -> MetaResult<NotificationVersion> {
        let (obj_type, src_id, dst_id) = match object {
            alter_swap_rename_request::Object::Schema(_) => unimplemented!("schema swap"),
            alter_swap_rename_request::Object::Table(objs) => {
                let (src_id, dst_id) = (
                    objs.src_object_id as ObjectId,
                    objs.dst_object_id as ObjectId,
                );
                (ObjectType::Table, src_id, dst_id)
            }
            alter_swap_rename_request::Object::View(objs) => {
                let (src_id, dst_id) = (
                    objs.src_object_id as ObjectId,
                    objs.dst_object_id as ObjectId,
                );
                (ObjectType::View, src_id, dst_id)
            }
            alter_swap_rename_request::Object::Source(objs) => {
                let (src_id, dst_id) = (
                    objs.src_object_id as ObjectId,
                    objs.dst_object_id as ObjectId,
                );
                (ObjectType::Source, src_id, dst_id)
            }
            alter_swap_rename_request::Object::Sink(objs) => {
                let (src_id, dst_id) = (
                    objs.src_object_id as ObjectId,
                    objs.dst_object_id as ObjectId,
                );
                (ObjectType::Sink, src_id, dst_id)
            }
            alter_swap_rename_request::Object::Subscription(objs) => {
                let (src_id, dst_id) = (
                    objs.src_object_id as ObjectId,
                    objs.dst_object_id as ObjectId,
                );
                (ObjectType::Subscription, src_id, dst_id)
            }
        };

        self.metadata_manager
            .catalog_controller
            .alter_swap_rename(obj_type, src_id, dst_id)
            .await
    }

    async fn alter_owner(
        &self,
        object: Object,
        owner_id: UserId,
    ) -> MetaResult<NotificationVersion> {
        let (obj_type, id) = match object {
            Object::TableId(id) => (ObjectType::Table, id as ObjectId),
            Object::ViewId(id) => (ObjectType::View, id as ObjectId),
            Object::SourceId(id) => (ObjectType::Source, id as ObjectId),
            Object::SinkId(id) => (ObjectType::Sink, id as ObjectId),
            Object::SchemaId(id) => (ObjectType::Schema, id as ObjectId),
            Object::DatabaseId(id) => (ObjectType::Database, id as ObjectId),
            Object::SubscriptionId(id) => (ObjectType::Subscription, id as ObjectId),
            Object::ConnectionId(id) => (ObjectType::Connection, id as ObjectId),
        };
        self.metadata_manager
            .catalog_controller
            .alter_owner(obj_type, id, owner_id as _)
            .await
    }

    async fn alter_set_schema(
        &self,
        object: alter_set_schema_request::Object,
        new_schema_id: SchemaId,
    ) -> MetaResult<NotificationVersion> {
        let (obj_type, id) = match object {
            alter_set_schema_request::Object::TableId(id) => (ObjectType::Table, id as ObjectId),
            alter_set_schema_request::Object::ViewId(id) => (ObjectType::View, id as ObjectId),
            alter_set_schema_request::Object::SourceId(id) => (ObjectType::Source, id as ObjectId),
            alter_set_schema_request::Object::SinkId(id) => (ObjectType::Sink, id as ObjectId),
            alter_set_schema_request::Object::FunctionId(id) => {
                (ObjectType::Function, id as ObjectId)
            }
            alter_set_schema_request::Object::ConnectionId(id) => {
                (ObjectType::Connection, id as ObjectId)
            }
            alter_set_schema_request::Object::SubscriptionId(id) => {
                (ObjectType::Subscription, id as ObjectId)
            }
        };
        self.metadata_manager
            .catalog_controller
            .alter_schema(obj_type, id, new_schema_id as _)
            .await
    }

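    /// Waits until there is no background-creating materialized view left, polling once per
    /// millisecond and giving up after 30 minutes.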
    pub async fn wait(&self) -> MetaResult<()> {
        let timeout_ms = 30 * 60 * 1000;
        for _ in 0..timeout_ms {
            if self
                .metadata_manager
                .catalog_controller
                .list_background_creating_mviews(true)
                .await?
                .is_empty()
            {
                return Ok(());
            }

            sleep(Duration::from_millis(1)).await;
        }
        Err(MetaError::cancelled(format!(
            "timeout after {timeout_ms}ms"
        )))
    }

    async fn comment_on(&self, comment: Comment) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .comment_on(comment)
            .await
    }
}

fn report_create_object(
    catalog_id: u32,
    event_name: &str,
    obj_type: PbTelemetryDatabaseObject,
    connector_name: Option<String>,
    attr_info: Option<jsonbb::Value>,
) {
    report_event(
        PbTelemetryEventStage::CreateStreamJob,
        event_name,
        catalog_id.into(),
        connector_name,
        Some(obj_type),
        attr_info,
    );
}

async fn clean_all_rows_by_sink_id(db: &DatabaseConnection, sink_id: i32) -> MetaResult<()> {
    match Entity::delete_many()
        .filter(Column::SinkId.eq(sink_id))
        .exec(db)
        .await
    {
        Ok(result) => {
            let deleted_count = result.rows_affected;

            tracing::info!(
                "Deleted {} items for sink_id = {} in iceberg exactly once system table.",
                deleted_count,
                sink_id
            );
            Ok(())
        }
        Err(e) => {
            tracing::error!(
                "Error deleting records for sink_id = {} from iceberg exactly once system table: {:?}",
                sink_id,
                e.as_report()
            );
            Err(e.into())
        }
    }
}