1use std::cmp::Ordering;
16use std::collections::HashSet;
17use std::num::NonZeroUsize;
18use std::sync::Arc;
19use std::sync::atomic::AtomicU64;
20use std::time::Duration;
21
22use anyhow::{Context, anyhow};
23use await_tree::{InstrumentAwait, span};
24use either::Either;
25use itertools::Itertools;
26use risingwave_common::catalog::AlterDatabaseParam;
27use risingwave_common::config::DefaultParallelism;
28use risingwave_common::hash::VnodeCountCompat;
29use risingwave_common::secret::{LocalSecretManager, SecretEncryption};
30use risingwave_common::system_param::reader::SystemParamsRead;
31use risingwave_common::util::stream_graph_visitor::{
32 visit_stream_node, visit_stream_node_cont_mut,
33};
34use risingwave_common::{bail, bail_not_implemented, must_match};
35use risingwave_connector::WithOptionsSecResolved;
36use risingwave_connector::connector_common::validate_connection;
37use risingwave_connector::source::{
38 ConnectorProperties, SourceEnumeratorContext, UPSTREAM_SOURCE_KEY,
39};
40use risingwave_meta_model::exactly_once_iceberg_sink::{Column, Entity};
41use risingwave_meta_model::object::ObjectType;
42use risingwave_meta_model::{
43 ConnectionId, DatabaseId, DispatcherType, FunctionId, IndexId, ObjectId, SchemaId, SecretId,
44 SinkId, SourceId, SubscriptionId, TableId, UserId, ViewId,
45};
46use risingwave_pb::catalog::{
47 Comment, Connection, CreateType, Database, Function, PbSink, Schema, Secret, Sink, Source,
48 Subscription, Table, View,
49};
50use risingwave_pb::ddl_service::alter_owner_request::Object;
51use risingwave_pb::ddl_service::{
52 DdlProgress, TableJobType, WaitVersion, alter_name_request, alter_set_schema_request,
53 alter_swap_rename_request,
54};
55use risingwave_pb::stream_plan::stream_node::NodeBody;
56use risingwave_pb::stream_plan::{
57 FragmentTypeFlag, MergeNode, PbDispatchOutputMapping, PbDispatcherType, PbStreamFragmentGraph,
58 StreamFragmentGraph as StreamFragmentGraphProto,
59};
60use risingwave_pb::telemetry::{PbTelemetryDatabaseObject, PbTelemetryEventStage};
61use sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter};
62use strum::Display;
63use thiserror_ext::AsReport;
64use tokio::sync::{Semaphore, oneshot};
65use tokio::time::sleep;
66use tracing::Instrument;
67
68use crate::barrier::BarrierManagerRef;
69use crate::controller::catalog::{DropTableConnectorContext, ReleaseContext};
70use crate::controller::cluster::StreamingClusterInfo;
71use crate::controller::streaming_job::SinkIntoTableContext;
72use crate::error::{MetaErrorInner, bail_invalid_parameter, bail_unavailable};
73use crate::manager::{
74 IGNORED_NOTIFICATION_VERSION, LocalNotification, MetaSrvEnv, MetadataManager,
75 NotificationVersion, StreamingJob, StreamingJobType,
76};
77use crate::model::{
78 DownstreamFragmentRelation, Fragment, StreamContext, StreamJobFragments,
79 StreamJobFragmentsToCreate, TableParallelism,
80};
81use crate::stream::{
82 ActorGraphBuildResult, ActorGraphBuilder, CompleteStreamFragmentGraph,
83 CreateStreamingJobContext, CreateStreamingJobOption, GlobalStreamManagerRef,
84 JobRescheduleTarget, ReplaceStreamJobContext, SourceChange, SourceManagerRef,
85 StreamFragmentGraph, create_source_worker, validate_sink,
86};
87use crate::telemetry::report_event;
88use crate::{MetaError, MetaResult};
89
90#[derive(PartialEq)]
91pub enum DropMode {
92 Restrict,
93 Cascade,
94}
95
96impl DropMode {
97 pub fn from_request_setting(cascade: bool) -> DropMode {
98 if cascade {
99 DropMode::Cascade
100 } else {
101 DropMode::Restrict
102 }
103 }
104}
105
106#[derive(strum::AsRefStr)]
107pub enum StreamingJobId {
108 MaterializedView(TableId),
109 Sink(SinkId),
110 Table(Option<SourceId>, TableId),
111 Index(IndexId),
112}
113
114impl std::fmt::Display for StreamingJobId {
115 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
116 write!(f, "{}", self.as_ref())?;
117 write!(f, "({})", self.id())
118 }
119}
120
121impl StreamingJobId {
122 #[allow(dead_code)]
123 fn id(&self) -> TableId {
124 match self {
125 StreamingJobId::MaterializedView(id)
126 | StreamingJobId::Sink(id)
127 | StreamingJobId::Table(_, id)
128 | StreamingJobId::Index(id) => *id,
129 }
130 }
131}
132
133pub struct ReplaceStreamJobInfo {
136 pub streaming_job: StreamingJob,
137 pub fragment_graph: StreamFragmentGraphProto,
138}
139
140#[derive(Display)]
141pub enum DdlCommand {
142 CreateDatabase(Database),
143 DropDatabase(DatabaseId),
144 CreateSchema(Schema),
145 DropSchema(SchemaId, DropMode),
146 CreateNonSharedSource(Source),
147 DropSource(SourceId, DropMode),
148 CreateFunction(Function),
149 DropFunction(FunctionId),
150 CreateView(View),
151 DropView(ViewId, DropMode),
152 CreateStreamingJob {
153 stream_job: StreamingJob,
154 fragment_graph: StreamFragmentGraphProto,
155 create_type: CreateType,
156 affected_table_replace_info: Option<ReplaceStreamJobInfo>,
157 dependencies: HashSet<ObjectId>,
158 specific_resource_group: Option<String>, if_not_exists: bool,
160 },
161 DropStreamingJob {
162 job_id: StreamingJobId,
163 drop_mode: DropMode,
164 target_replace_info: Option<ReplaceStreamJobInfo>,
165 },
166 AlterName(alter_name_request::Object, String),
167 AlterSwapRename(alter_swap_rename_request::Object),
168 ReplaceStreamJob(ReplaceStreamJobInfo),
169 AlterNonSharedSource(Source),
170 AlterObjectOwner(Object, UserId),
171 AlterSetSchema(alter_set_schema_request::Object, SchemaId),
172 CreateConnection(Connection),
173 DropConnection(ConnectionId),
174 CreateSecret(Secret),
175 AlterSecret(Secret),
176 DropSecret(SecretId),
177 CommentOn(Comment),
178 CreateSubscription(Subscription),
179 DropSubscription(SubscriptionId, DropMode),
180 AlterDatabaseParam(DatabaseId, AlterDatabaseParam),
181}
182
183impl DdlCommand {
184 fn object(&self) -> Either<String, ObjectId> {
186 use Either::*;
187 match self {
188 DdlCommand::CreateDatabase(database) => Left(database.name.clone()),
189 DdlCommand::DropDatabase(id) => Right(*id),
190 DdlCommand::CreateSchema(schema) => Left(schema.name.clone()),
191 DdlCommand::DropSchema(id, _) => Right(*id),
192 DdlCommand::CreateNonSharedSource(source) => Left(source.name.clone()),
193 DdlCommand::DropSource(id, _) => Right(*id),
194 DdlCommand::CreateFunction(function) => Left(function.name.clone()),
195 DdlCommand::DropFunction(id) => Right(*id),
196 DdlCommand::CreateView(view) => Left(view.name.clone()),
197 DdlCommand::DropView(id, _) => Right(*id),
198 DdlCommand::CreateStreamingJob { stream_job, .. } => Left(stream_job.name()),
199 DdlCommand::DropStreamingJob { job_id, .. } => Right(job_id.id()),
200 DdlCommand::AlterName(object, _) => Left(format!("{object:?}")),
201 DdlCommand::AlterSwapRename(object) => Left(format!("{object:?}")),
202 DdlCommand::ReplaceStreamJob(info) => Left(info.streaming_job.name()),
203 DdlCommand::AlterNonSharedSource(source) => Left(source.name.clone()),
204 DdlCommand::AlterObjectOwner(object, _) => Left(format!("{object:?}")),
205 DdlCommand::AlterSetSchema(object, _) => Left(format!("{object:?}")),
206 DdlCommand::CreateConnection(connection) => Left(connection.name.clone()),
207 DdlCommand::DropConnection(id) => Right(*id),
208 DdlCommand::CreateSecret(secret) => Left(secret.name.clone()),
209 DdlCommand::AlterSecret(secret) => Left(secret.name.clone()),
210 DdlCommand::DropSecret(id) => Right(*id),
211 DdlCommand::CommentOn(comment) => Right(comment.table_id as _),
212 DdlCommand::CreateSubscription(subscription) => Left(subscription.name.clone()),
213 DdlCommand::DropSubscription(id, _) => Right(*id),
214 DdlCommand::AlterDatabaseParam(id, _) => Right(*id),
215 }
216 }
217
218 fn allow_in_recovery(&self) -> bool {
219 match self {
220 DdlCommand::DropDatabase(_)
221 | DdlCommand::DropSchema(_, _)
222 | DdlCommand::DropSource(_, _)
223 | DdlCommand::DropFunction(_)
224 | DdlCommand::DropView(_, _)
225 | DdlCommand::DropStreamingJob { .. }
226 | DdlCommand::DropConnection(_)
227 | DdlCommand::DropSecret(_)
228 | DdlCommand::DropSubscription(_, _)
229 | DdlCommand::AlterName(_, _)
230 | DdlCommand::AlterObjectOwner(_, _)
231 | DdlCommand::AlterSetSchema(_, _)
232 | DdlCommand::CreateDatabase(_)
233 | DdlCommand::CreateSchema(_)
234 | DdlCommand::CreateFunction(_)
235 | DdlCommand::CreateView(_)
236 | DdlCommand::CreateConnection(_)
237 | DdlCommand::CommentOn(_)
238 | DdlCommand::CreateSecret(_)
239 | DdlCommand::AlterSecret(_)
240 | DdlCommand::AlterSwapRename(_)
241 | DdlCommand::AlterDatabaseParam(_, _) => true,
242 DdlCommand::CreateStreamingJob { .. }
243 | DdlCommand::CreateNonSharedSource(_)
244 | DdlCommand::ReplaceStreamJob(_)
245 | DdlCommand::AlterNonSharedSource(_)
246 | DdlCommand::CreateSubscription(_) => false,
247 }
248 }
249}
250
251#[derive(Clone)]
252pub struct DdlController {
253 pub(crate) env: MetaSrvEnv,
254
255 pub(crate) metadata_manager: MetadataManager,
256 pub(crate) stream_manager: GlobalStreamManagerRef,
257 pub(crate) source_manager: SourceManagerRef,
258 barrier_manager: BarrierManagerRef,
259
260 pub(crate) creating_streaming_job_permits: Arc<CreatingStreamingJobPermit>,
262
263 seq: Arc<AtomicU64>,
265}
266
267#[derive(Clone)]
268pub struct CreatingStreamingJobPermit {
269 pub(crate) semaphore: Arc<Semaphore>,
270}
271
272impl CreatingStreamingJobPermit {
273 async fn new(env: &MetaSrvEnv) -> Self {
274 let mut permits = env
275 .system_params_reader()
276 .await
277 .max_concurrent_creating_streaming_jobs() as usize;
278 if permits == 0 {
279 permits = Semaphore::MAX_PERMITS;
281 }
282 let semaphore = Arc::new(Semaphore::new(permits));
283
284 let (local_notification_tx, mut local_notification_rx) =
285 tokio::sync::mpsc::unbounded_channel();
286 env.notification_manager()
287 .insert_local_sender(local_notification_tx)
288 .await;
289 let semaphore_clone = semaphore.clone();
290 tokio::spawn(async move {
291 while let Some(notification) = local_notification_rx.recv().await {
292 let LocalNotification::SystemParamsChange(p) = ¬ification else {
293 continue;
294 };
295 let mut new_permits = p.max_concurrent_creating_streaming_jobs() as usize;
296 if new_permits == 0 {
297 new_permits = Semaphore::MAX_PERMITS;
298 }
299 match permits.cmp(&new_permits) {
300 Ordering::Less => {
301 semaphore_clone.add_permits(new_permits - permits);
302 }
303 Ordering::Equal => continue,
304 Ordering::Greater => {
305 semaphore_clone
306 .acquire_many((permits - new_permits) as u32)
307 .await
308 .unwrap()
309 .forget();
310 }
311 }
312 tracing::info!(
313 "max_concurrent_creating_streaming_jobs changed from {} to {}",
314 permits,
315 new_permits
316 );
317 permits = new_permits;
318 }
319 });
320
321 Self { semaphore }
322 }
323}
324
325impl DdlController {
326 pub async fn new(
327 env: MetaSrvEnv,
328 metadata_manager: MetadataManager,
329 stream_manager: GlobalStreamManagerRef,
330 source_manager: SourceManagerRef,
331 barrier_manager: BarrierManagerRef,
332 ) -> Self {
333 let creating_streaming_job_permits = Arc::new(CreatingStreamingJobPermit::new(&env).await);
334 Self {
335 env,
336 metadata_manager,
337 stream_manager,
338 source_manager,
339 barrier_manager,
340 creating_streaming_job_permits,
341 seq: Arc::new(AtomicU64::new(0)),
342 }
343 }
344
345 pub fn next_seq(&self) -> u64 {
347 self.seq.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
348 }
349
350 pub async fn run_command(&self, command: DdlCommand) -> MetaResult<Option<WaitVersion>> {
357 if !command.allow_in_recovery() {
358 self.barrier_manager.check_status_running()?;
359 }
360
361 let await_tree_key = format!("DDL Command {}", self.next_seq());
362 let await_tree_span = await_tree::span!("{command}({})", command.object());
363
364 let ctrl = self.clone();
365 let fut = async move {
366 match command {
367 DdlCommand::CreateDatabase(database) => ctrl.create_database(database).await,
368 DdlCommand::DropDatabase(database_id) => ctrl.drop_database(database_id).await,
369 DdlCommand::CreateSchema(schema) => ctrl.create_schema(schema).await,
370 DdlCommand::DropSchema(schema_id, drop_mode) => {
371 ctrl.drop_schema(schema_id, drop_mode).await
372 }
373 DdlCommand::CreateNonSharedSource(source) => {
374 ctrl.create_non_shared_source(source).await
375 }
376 DdlCommand::DropSource(source_id, drop_mode) => {
377 ctrl.drop_source(source_id, drop_mode).await
378 }
379 DdlCommand::CreateFunction(function) => ctrl.create_function(function).await,
380 DdlCommand::DropFunction(function_id) => ctrl.drop_function(function_id).await,
381 DdlCommand::CreateView(view) => ctrl.create_view(view).await,
382 DdlCommand::DropView(view_id, drop_mode) => {
383 ctrl.drop_view(view_id, drop_mode).await
384 }
385 DdlCommand::CreateStreamingJob {
386 stream_job,
387 fragment_graph,
388 create_type: _,
389 affected_table_replace_info,
390 dependencies,
391 specific_resource_group,
392 if_not_exists,
393 } => {
394 ctrl.create_streaming_job(
395 stream_job,
396 fragment_graph,
397 affected_table_replace_info,
398 dependencies,
399 specific_resource_group,
400 if_not_exists,
401 )
402 .await
403 }
404 DdlCommand::DropStreamingJob {
405 job_id,
406 drop_mode,
407 target_replace_info,
408 } => {
409 ctrl.drop_streaming_job(job_id, drop_mode, target_replace_info)
410 .await
411 }
412 DdlCommand::ReplaceStreamJob(ReplaceStreamJobInfo {
413 streaming_job,
414 fragment_graph,
415 }) => ctrl.replace_job(streaming_job, fragment_graph).await,
416 DdlCommand::AlterName(relation, name) => ctrl.alter_name(relation, &name).await,
417 DdlCommand::AlterObjectOwner(object, owner_id) => {
418 ctrl.alter_owner(object, owner_id).await
419 }
420 DdlCommand::AlterSetSchema(object, new_schema_id) => {
421 ctrl.alter_set_schema(object, new_schema_id).await
422 }
423 DdlCommand::CreateConnection(connection) => {
424 ctrl.create_connection(connection).await
425 }
426 DdlCommand::DropConnection(connection_id) => {
427 ctrl.drop_connection(connection_id).await
428 }
429 DdlCommand::CreateSecret(secret) => ctrl.create_secret(secret).await,
430 DdlCommand::DropSecret(secret_id) => ctrl.drop_secret(secret_id).await,
431 DdlCommand::AlterSecret(secret) => ctrl.alter_secret(secret).await,
432 DdlCommand::AlterNonSharedSource(source) => {
433 ctrl.alter_non_shared_source(source).await
434 }
435 DdlCommand::CommentOn(comment) => ctrl.comment_on(comment).await,
436 DdlCommand::CreateSubscription(subscription) => {
437 ctrl.create_subscription(subscription).await
438 }
439 DdlCommand::DropSubscription(subscription_id, drop_mode) => {
440 ctrl.drop_subscription(subscription_id, drop_mode).await
441 }
442 DdlCommand::AlterSwapRename(objects) => ctrl.alter_swap_rename(objects).await,
443 DdlCommand::AlterDatabaseParam(database_id, param) => {
444 ctrl.alter_database_param(database_id, param).await
445 }
446 }
447 }
448 .in_current_span();
449 let fut = (self.env.await_tree_reg())
450 .register(await_tree_key, await_tree_span)
451 .instrument(Box::pin(fut));
452 let notification_version = tokio::spawn(fut).await.map_err(|e| anyhow!(e))??;
453 Ok(Some(WaitVersion {
454 catalog_version: notification_version,
455 hummock_version_id: self.barrier_manager.get_hummock_version_id().await.to_u64(),
456 }))
457 }
458
459 pub async fn get_ddl_progress(&self) -> MetaResult<Vec<DdlProgress>> {
460 self.barrier_manager.get_ddl_progress().await
461 }
462
463 async fn create_database(&self, database: Database) -> MetaResult<NotificationVersion> {
464 self.metadata_manager
465 .catalog_controller
466 .create_database(database)
467 .await
468 }
469
470 #[tracing::instrument(skip(self), level = "debug")]
471 pub async fn reschedule_streaming_job(
472 &self,
473 job_id: u32,
474 target: JobRescheduleTarget,
475 mut deferred: bool,
476 ) -> MetaResult<()> {
477 tracing::info!("alter parallelism");
478 if self.barrier_manager.check_status_running().is_err() {
479 tracing::info!(
480 "alter parallelism is set to deferred mode because the system is in recovery state"
481 );
482 deferred = true;
483 }
484
485 self.stream_manager
486 .reschedule_streaming_job(job_id, target, deferred)
487 .await
488 }
489
490 async fn drop_database(&self, database_id: DatabaseId) -> MetaResult<NotificationVersion> {
491 self.drop_object(
492 ObjectType::Database,
493 database_id as _,
494 DropMode::Cascade,
495 None,
496 )
497 .await
498 }
499
500 async fn create_schema(&self, schema: Schema) -> MetaResult<NotificationVersion> {
501 self.metadata_manager
502 .catalog_controller
503 .create_schema(schema)
504 .await
505 }
506
507 async fn drop_schema(
508 &self,
509 schema_id: SchemaId,
510 drop_mode: DropMode,
511 ) -> MetaResult<NotificationVersion> {
512 self.drop_object(ObjectType::Schema, schema_id as _, drop_mode, None)
513 .await
514 }
515
516 async fn create_non_shared_source(&self, source: Source) -> MetaResult<NotificationVersion> {
518 let handle = create_source_worker(&source, self.source_manager.metrics.clone())
519 .await
520 .context("failed to create source worker")?;
521
522 let (source_id, version) = self
523 .metadata_manager
524 .catalog_controller
525 .create_source(source)
526 .await?;
527 self.source_manager
528 .register_source_with_handle(source_id, handle)
529 .await;
530 Ok(version)
531 }
532
533 async fn drop_source(
534 &self,
535 source_id: SourceId,
536 drop_mode: DropMode,
537 ) -> MetaResult<NotificationVersion> {
538 self.drop_object(ObjectType::Source, source_id as _, drop_mode, None)
539 .await
540 }
541
542 async fn alter_non_shared_source(&self, source: Source) -> MetaResult<NotificationVersion> {
545 self.metadata_manager
546 .catalog_controller
547 .alter_non_shared_source(source)
548 .await
549 }
550
551 async fn create_function(&self, function: Function) -> MetaResult<NotificationVersion> {
552 self.metadata_manager
553 .catalog_controller
554 .create_function(function)
555 .await
556 }
557
558 async fn drop_function(&self, function_id: FunctionId) -> MetaResult<NotificationVersion> {
559 self.drop_object(
560 ObjectType::Function,
561 function_id as _,
562 DropMode::Restrict,
563 None,
564 )
565 .await
566 }
567
568 async fn create_view(&self, view: View) -> MetaResult<NotificationVersion> {
569 self.metadata_manager
570 .catalog_controller
571 .create_view(view)
572 .await
573 }
574
575 async fn drop_view(
576 &self,
577 view_id: ViewId,
578 drop_mode: DropMode,
579 ) -> MetaResult<NotificationVersion> {
580 self.drop_object(ObjectType::View, view_id as _, drop_mode, None)
581 .await
582 }
583
584 async fn create_connection(&self, connection: Connection) -> MetaResult<NotificationVersion> {
585 validate_connection(&connection).await?;
586 self.metadata_manager
587 .catalog_controller
588 .create_connection(connection)
589 .await
590 }
591
592 async fn drop_connection(
593 &self,
594 connection_id: ConnectionId,
595 ) -> MetaResult<NotificationVersion> {
596 self.drop_object(
597 ObjectType::Connection,
598 connection_id as _,
599 DropMode::Restrict,
600 None,
601 )
602 .await
603 }
604
605 async fn alter_database_param(
606 &self,
607 database_id: DatabaseId,
608 param: AlterDatabaseParam,
609 ) -> MetaResult<NotificationVersion> {
610 self.metadata_manager
611 .catalog_controller
612 .alter_database_param(database_id, param)
613 .await
614 }
615
616 fn get_encrypted_payload(&self, secret: &Secret) -> MetaResult<Vec<u8>> {
619 let secret_store_private_key = self
620 .env
621 .opts
622 .secret_store_private_key
623 .clone()
624 .ok_or_else(|| anyhow!("secret_store_private_key is not configured"))?;
625
626 let encrypted_payload = SecretEncryption::encrypt(
627 secret_store_private_key.as_slice(),
628 secret.get_value().as_slice(),
629 )
630 .context(format!("failed to encrypt secret {}", secret.name))?;
631 Ok(encrypted_payload
632 .serialize()
633 .context(format!("failed to serialize secret {}", secret.name))?)
634 }
635
636 async fn create_secret(&self, mut secret: Secret) -> MetaResult<NotificationVersion> {
637 let secret_plain_payload = secret.value.clone();
640 let encrypted_payload = self.get_encrypted_payload(&secret)?;
641 secret.value = encrypted_payload;
642
643 self.metadata_manager
644 .catalog_controller
645 .create_secret(secret, secret_plain_payload)
646 .await
647 }
648
649 async fn drop_secret(&self, secret_id: SecretId) -> MetaResult<NotificationVersion> {
650 self.drop_object(ObjectType::Secret, secret_id as _, DropMode::Restrict, None)
651 .await
652 }
653
654 async fn alter_secret(&self, mut secret: Secret) -> MetaResult<NotificationVersion> {
655 let secret_plain_payload = secret.value.clone();
656 let encrypted_payload = self.get_encrypted_payload(&secret)?;
657 secret.value = encrypted_payload;
658 self.metadata_manager
659 .catalog_controller
660 .alter_secret(secret, secret_plain_payload)
661 .await
662 }
663
664 async fn create_subscription(
665 &self,
666 mut subscription: Subscription,
667 ) -> MetaResult<NotificationVersion> {
668 tracing::debug!("create subscription");
669 let _permit = self
670 .creating_streaming_job_permits
671 .semaphore
672 .acquire()
673 .await
674 .unwrap();
675 let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
676 self.metadata_manager
677 .catalog_controller
678 .create_subscription_catalog(&mut subscription)
679 .await?;
680 self.stream_manager
681 .create_subscription(&subscription)
682 .await
683 .inspect_err(|e| {
684 tracing::debug!(error = %e.as_report(), "cancel create subscription");
685 })?;
686
687 let version = self
688 .metadata_manager
689 .catalog_controller
690 .notify_create_subscription(subscription.id)
691 .await?;
692 tracing::debug!("finish create subscription");
693 Ok(version)
694 }
695
696 async fn drop_subscription(
697 &self,
698 subscription_id: SubscriptionId,
699 drop_mode: DropMode,
700 ) -> MetaResult<NotificationVersion> {
701 tracing::debug!("preparing drop subscription");
702 let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
703 let subscription = self
704 .metadata_manager
705 .catalog_controller
706 .get_subscription_by_id(subscription_id)
707 .await?;
708 let table_id = subscription.dependent_table_id;
709 let database_id = subscription.database_id.into();
710 let (_, version) = self
711 .metadata_manager
712 .catalog_controller
713 .drop_object(ObjectType::Subscription, subscription_id as _, drop_mode)
714 .await?;
715 self.stream_manager
716 .drop_subscription(database_id, subscription_id as _, table_id)
717 .await;
718 tracing::debug!("finish drop subscription");
719 Ok(version)
720 }
721
722 #[await_tree::instrument]
724 pub(crate) async fn validate_cdc_table(
725 table: &Table,
726 table_fragments: &StreamJobFragments,
727 ) -> MetaResult<()> {
728 let stream_scan_fragment = table_fragments
729 .fragments
730 .values()
731 .filter(|f| f.fragment_type_mask & FragmentTypeFlag::StreamScan as u32 != 0)
732 .exactly_one()
733 .ok()
734 .with_context(|| {
735 format!(
736 "expect exactly one stream scan fragment, got: {:?}",
737 table_fragments.fragments
738 )
739 })?;
740
741 assert_eq!(
742 stream_scan_fragment.actors.len(),
743 1,
744 "Stream scan fragment should have only one actor"
745 );
746 let mut found_cdc_scan = false;
747 match &stream_scan_fragment.nodes.node_body {
748 Some(NodeBody::StreamCdcScan(_)) => {
749 if Self::validate_cdc_table_inner(&stream_scan_fragment.nodes.node_body, table.id)
750 .await?
751 {
752 found_cdc_scan = true;
753 }
754 }
755 Some(NodeBody::Project(_)) => {
757 for input in &stream_scan_fragment.nodes.input {
758 if Self::validate_cdc_table_inner(&input.node_body, table.id).await? {
759 found_cdc_scan = true;
760 }
761 }
762 }
763 _ => {
764 bail!("Unexpected node body for stream cdc scan");
765 }
766 };
767 if !found_cdc_scan {
768 bail!("No stream cdc scan node found in stream scan fragment");
769 }
770 Ok(())
771 }
772
773 async fn validate_cdc_table_inner(
774 node_body: &Option<NodeBody>,
775 table_id: u32,
776 ) -> MetaResult<bool> {
777 if let Some(NodeBody::StreamCdcScan(stream_cdc_scan)) = node_body
778 && let Some(ref cdc_table_desc) = stream_cdc_scan.cdc_table_desc
779 {
780 let options_with_secret = WithOptionsSecResolved::new(
781 cdc_table_desc.connect_properties.clone(),
782 cdc_table_desc.secret_refs.clone(),
783 );
784
785 let mut props = ConnectorProperties::extract(options_with_secret, true)?;
786 props.init_from_pb_cdc_table_desc(cdc_table_desc);
787
788 let _enumerator = props
790 .create_split_enumerator(SourceEnumeratorContext::dummy().into())
791 .await?;
792
793 tracing::debug!(?table_id, "validate cdc table success");
794 Ok(true)
795 } else {
796 Ok(false)
797 }
798 }
799
800 pub(crate) async fn inject_replace_table_job_for_table_sink(
804 &self,
805 tmp_id: u32,
806 mgr: &MetadataManager,
807 stream_ctx: StreamContext,
808 sink: Option<&Sink>,
809 creating_sink_table_fragments: Option<&StreamJobFragments>,
810 dropping_sink_id: Option<SinkId>,
811 streaming_job: &StreamingJob,
812 fragment_graph: StreamFragmentGraph,
813 ) -> MetaResult<(ReplaceStreamJobContext, StreamJobFragmentsToCreate)> {
814 let (mut replace_table_ctx, mut stream_job_fragments) = self
815 .build_replace_job(stream_ctx, streaming_job, fragment_graph, tmp_id as _)
816 .await?;
817
818 let target_table = streaming_job.table().unwrap();
819
820 if let Some(creating_sink_table_fragments) = creating_sink_table_fragments {
821 let sink_fragment = creating_sink_table_fragments.sink_fragment().unwrap();
822 let sink = sink.expect("sink not found");
823 Self::inject_replace_table_plan_for_sink(
824 sink.id,
825 &sink_fragment,
826 target_table,
827 &mut replace_table_ctx,
828 stream_job_fragments.inner.union_fragment_for_table(),
829 None,
830 );
831 }
832
833 let [table_catalog]: [_; 1] = mgr
834 .get_table_catalog_by_ids(vec![target_table.id])
835 .await?
836 .try_into()
837 .expect("Target table should exist in sink into table");
838
839 {
840 let catalogs = mgr
841 .get_sink_catalog_by_ids(&table_catalog.incoming_sinks)
842 .await?;
843
844 for sink in catalogs {
845 let sink_id = sink.id;
846
847 if let Some(dropping_sink_id) = dropping_sink_id
848 && sink_id == (dropping_sink_id as u32)
849 {
850 continue;
851 };
852
853 let sink_table_fragments = mgr
854 .get_job_fragments_by_id(&risingwave_common::catalog::TableId::new(sink_id))
855 .await?;
856
857 let sink_fragment = sink_table_fragments.sink_fragment().unwrap();
858
859 Self::inject_replace_table_plan_for_sink(
860 sink_id,
861 &sink_fragment,
862 target_table,
863 &mut replace_table_ctx,
864 stream_job_fragments.inner.union_fragment_for_table(),
865 Some(&sink.unique_identity()),
866 );
867 }
868 }
869
870 for fragment in stream_job_fragments.fragments.values() {
872 {
873 {
874 visit_stream_node(&fragment.nodes, |node| {
875 if let NodeBody::Merge(merge_node) = node {
876 let upstream_fragment_id = merge_node.upstream_fragment_id;
877 if let Some(external_upstream_fragment_downstreams) = replace_table_ctx
878 .upstream_fragment_downstreams
879 .get(&upstream_fragment_id)
880 {
881 let mut upstream_fragment_downstreams =
882 external_upstream_fragment_downstreams.iter().filter(
883 |downstream| {
884 downstream.downstream_fragment_id
885 == fragment.fragment_id
886 },
887 );
888 assert!(
889 upstream_fragment_downstreams.next().is_some(),
890 "All the mergers for the union should have been fully assigned beforehand."
891 );
892 } else {
893 let mut upstream_fragment_downstreams = stream_job_fragments
894 .downstreams
895 .get(&upstream_fragment_id)
896 .into_iter()
897 .flatten()
898 .filter(|downstream| {
899 downstream.downstream_fragment_id == fragment.fragment_id
900 });
901 assert!(
902 upstream_fragment_downstreams.next().is_some(),
903 "All the mergers for the union should have been fully assigned beforehand."
904 );
905 }
906 }
907 });
908 }
909 }
910 }
911
912 Ok((replace_table_ctx, stream_job_fragments))
913 }
914
915 pub(crate) fn inject_replace_table_plan_for_sink(
916 sink_id: u32,
917 sink_fragment: &Fragment,
918 table: &Table,
919 replace_table_ctx: &mut ReplaceStreamJobContext,
920 union_fragment: &mut Fragment,
921 unique_identity: Option<&str>,
922 ) {
923 let sink_fields = sink_fragment.nodes.fields.clone();
924
925 let output_indices = sink_fields
926 .iter()
927 .enumerate()
928 .map(|(idx, _)| idx as _)
929 .collect_vec();
930
931 let dist_key_indices = table.distribution_key.iter().map(|i| *i as _).collect_vec();
932
933 let sink_fragment_downstreams = replace_table_ctx
934 .upstream_fragment_downstreams
935 .entry(sink_fragment.fragment_id)
936 .or_default();
937
938 {
939 sink_fragment_downstreams.push(DownstreamFragmentRelation {
940 downstream_fragment_id: union_fragment.fragment_id,
941 dispatcher_type: DispatcherType::Hash,
942 dist_key_indices: dist_key_indices.clone(),
943 output_mapping: PbDispatchOutputMapping::simple(output_indices),
944 });
945 }
946
947 let upstream_fragment_id = sink_fragment.fragment_id;
948
949 {
950 {
951 visit_stream_node_cont_mut(&mut union_fragment.nodes, |node| {
952 if let Some(NodeBody::Union(_)) = &mut node.node_body {
953 for input_project_node in &mut node.input {
954 if let Some(NodeBody::Project(_)) = &mut input_project_node.node_body {
955 let merge_stream_node =
956 input_project_node.input.iter_mut().exactly_one().unwrap();
957
958 if input_project_node.identity.as_str()
960 != unique_identity
961 .unwrap_or(PbSink::UNIQUE_IDENTITY_FOR_CREATING_TABLE_SINK)
962 {
963 continue;
964 }
965
966 if let Some(NodeBody::Merge(merge_node)) =
967 &mut merge_stream_node.node_body
968 {
969 {
970 merge_stream_node.identity =
971 format!("MergeExecutor(from sink {})", sink_id);
972
973 input_project_node.identity =
974 format!("ProjectExecutor(from sink {})", sink_id);
975 }
976
977 **merge_node = {
978 #[expect(deprecated)]
979 MergeNode {
980 upstream_actor_id: vec![],
981 upstream_fragment_id,
982 upstream_dispatcher_type: PbDispatcherType::Hash as _,
983 fields: sink_fields.to_vec(),
984 }
985 };
986
987 merge_stream_node.fields = sink_fields.to_vec();
988
989 return false;
990 }
991 }
992 }
993 }
994 true
995 });
996 }
997 }
998 }
999
1000 #[await_tree::instrument(boxed, "create_streaming_job({streaming_job})")]
1003 pub async fn create_streaming_job(
1004 &self,
1005 mut streaming_job: StreamingJob,
1006 fragment_graph: StreamFragmentGraphProto,
1007 affected_table_replace_info: Option<ReplaceStreamJobInfo>,
1008 dependencies: HashSet<ObjectId>,
1009 specific_resource_group: Option<String>,
1010 if_not_exists: bool,
1011 ) -> MetaResult<NotificationVersion> {
1012 let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());
1013 let check_ret = self
1014 .metadata_manager
1015 .catalog_controller
1016 .create_job_catalog(
1017 &mut streaming_job,
1018 &ctx,
1019 &fragment_graph.parallelism,
1020 fragment_graph.max_parallelism as _,
1021 dependencies,
1022 specific_resource_group.clone(),
1023 )
1024 .await;
1025 if let Err(meta_err) = check_ret {
1026 if !if_not_exists {
1027 return Err(meta_err);
1028 }
1029 if let MetaErrorInner::Duplicated(_, _, Some(job_id)) = meta_err.inner() {
1030 if streaming_job.create_type() == CreateType::Foreground {
1031 let database_id = streaming_job.database_id();
1032 return self
1033 .metadata_manager
1034 .wait_streaming_job_finished(database_id.into(), *job_id)
1035 .await;
1036 } else {
1037 return Ok(IGNORED_NOTIFICATION_VERSION);
1038 }
1039 } else {
1040 return Err(meta_err);
1041 }
1042 }
1043 let job_id = streaming_job.id();
1044
1045 tracing::debug!(
1046 id = job_id,
1047 definition = streaming_job.definition(),
1048 create_type = streaming_job.create_type().as_str_name(),
1049 job_type = ?streaming_job.job_type(),
1050 "starting streaming job",
1051 );
1052 let _permit = self
1053 .creating_streaming_job_permits
1054 .semaphore
1055 .acquire()
1056 .instrument_await("acquire_creating_streaming_job_permit")
1057 .await
1058 .unwrap();
1059 let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
1060
1061 let name = streaming_job.name();
1062 let definition = streaming_job.definition();
1063 let source_id = match &streaming_job {
1064 StreamingJob::Table(Some(src), _, _) | StreamingJob::Source(src) => Some(src.id),
1065 _ => None,
1066 };
1067
1068 match self
1070 .create_streaming_job_inner(
1071 ctx,
1072 streaming_job,
1073 fragment_graph,
1074 affected_table_replace_info,
1075 specific_resource_group,
1076 )
1077 .await
1078 {
1079 Ok(version) => Ok(version),
1080 Err(err) => {
1081 tracing::error!(id = job_id, error = %err.as_report(), "failed to create streaming job");
1082 let event = risingwave_pb::meta::event_log::EventCreateStreamJobFail {
1083 id: job_id,
1084 name,
1085 definition,
1086 error: err.as_report().to_string(),
1087 };
1088 self.env.event_log_manager_ref().add_event_logs(vec![
1089 risingwave_pb::meta::event_log::Event::CreateStreamJobFail(event),
1090 ]);
1091 let (aborted, _) = self
1092 .metadata_manager
1093 .catalog_controller
1094 .try_abort_creating_streaming_job(job_id as _, false)
1095 .await?;
1096 if aborted {
1097 tracing::warn!(id = job_id, "aborted streaming job");
1098 if let Some(source_id) = source_id {
1100 self.source_manager
1101 .apply_source_change(SourceChange::DropSource {
1102 dropped_source_ids: vec![source_id as SourceId],
1103 })
1104 .await;
1105 }
1106 }
1107 Err(err)
1108 }
1109 }
1110 }
1111
1112 #[await_tree::instrument(boxed)]
1113 async fn create_streaming_job_inner(
1114 &self,
1115 ctx: StreamContext,
1116 mut streaming_job: StreamingJob,
1117 fragment_graph: StreamFragmentGraphProto,
1118 affected_table_replace_info: Option<ReplaceStreamJobInfo>,
1119 specific_resource_group: Option<String>,
1120 ) -> MetaResult<NotificationVersion> {
1121 let mut fragment_graph =
1122 StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
1123 streaming_job.set_info_from_graph(&fragment_graph);
1124
1125 let incomplete_internal_tables = fragment_graph
1127 .incomplete_internal_tables()
1128 .into_values()
1129 .collect_vec();
1130 let table_id_map = self
1131 .metadata_manager
1132 .catalog_controller
1133 .create_internal_table_catalog(&streaming_job, incomplete_internal_tables)
1134 .await?;
1135 fragment_graph.refill_internal_table_ids(table_id_map);
1136
1137 let affected_table_replace_info = match affected_table_replace_info {
1138 Some(replace_table_info) => {
1139 assert!(
1140 specific_resource_group.is_none(),
1141 "specific_resource_group is not supported for replace table (alter column or sink into table)"
1142 );
1143
1144 let ReplaceStreamJobInfo {
1145 mut streaming_job,
1146 fragment_graph,
1147 ..
1148 } = replace_table_info;
1149
1150 let original_max_parallelism = self
1152 .metadata_manager
1153 .get_job_max_parallelism(streaming_job.id().into())
1154 .await?;
1155 let fragment_graph = PbStreamFragmentGraph {
1156 max_parallelism: original_max_parallelism as _,
1157 ..fragment_graph
1158 };
1159
1160 let fragment_graph =
1161 StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
1162 streaming_job.set_info_from_graph(&fragment_graph);
1163 let streaming_job = streaming_job;
1164
1165 Some((streaming_job, fragment_graph))
1166 }
1167 None => None,
1168 };
1169
1170 tracing::debug!(id = streaming_job.id(), "building streaming job");
1172 let (ctx, stream_job_fragments) = self
1173 .build_stream_job(
1174 ctx,
1175 streaming_job,
1176 fragment_graph,
1177 affected_table_replace_info,
1178 specific_resource_group,
1179 )
1180 .await?;
1181
1182 let streaming_job = &ctx.streaming_job;
1183
1184 match streaming_job {
1185 StreamingJob::Table(None, table, TableJobType::SharedCdcSource) => {
1186 Self::validate_cdc_table(table, &stream_job_fragments).await?;
1187 }
1188 StreamingJob::Table(Some(source), ..) => {
1189 self.source_manager.register_source(source).await?;
1191 let connector_name = source
1192 .get_with_properties()
1193 .get(UPSTREAM_SOURCE_KEY)
1194 .cloned();
1195 let attr = source.info.as_ref().map(|source_info| {
1196 jsonbb::json!({
1197 "format": source_info.format().as_str_name(),
1198 "encode": source_info.row_encode().as_str_name(),
1199 })
1200 });
1201 report_create_object(
1202 streaming_job.id(),
1203 "source",
1204 PbTelemetryDatabaseObject::Source,
1205 connector_name,
1206 attr,
1207 );
1208 }
1209 StreamingJob::Sink(sink, _) => {
1210 validate_sink(sink).await?;
1212 let connector_name = sink.get_properties().get(UPSTREAM_SOURCE_KEY).cloned();
1213 let attr = sink.format_desc.as_ref().map(|sink_info| {
1214 jsonbb::json!({
1215 "format": sink_info.format().as_str_name(),
1216 "encode": sink_info.encode().as_str_name(),
1217 })
1218 });
1219 report_create_object(
1220 streaming_job.id(),
1221 "sink",
1222 PbTelemetryDatabaseObject::Sink,
1223 connector_name,
1224 attr,
1225 );
1226 }
1227 StreamingJob::Source(source) => {
1228 self.source_manager.register_source(source).await?;
1230 let connector_name = source
1231 .get_with_properties()
1232 .get(UPSTREAM_SOURCE_KEY)
1233 .cloned();
1234 let attr = source.info.as_ref().map(|source_info| {
1235 jsonbb::json!({
1236 "format": source_info.format().as_str_name(),
1237 "encode": source_info.row_encode().as_str_name(),
1238 })
1239 });
1240 report_create_object(
1241 streaming_job.id(),
1242 "source",
1243 PbTelemetryDatabaseObject::Source,
1244 connector_name,
1245 attr,
1246 );
1247 }
1248 _ => {}
1249 }
1250
1251 self.metadata_manager
1252 .catalog_controller
1253 .prepare_streaming_job(&stream_job_fragments, streaming_job, false)
1254 .await?;
1255
1256 let stream_job_id = streaming_job.id();
1258 match (streaming_job.create_type(), &streaming_job) {
1259 (CreateType::Unspecified, _)
1261 | (CreateType::Foreground, _)
1262 | (CreateType::Background, StreamingJob::Sink(_, _)) => {
1263 let version = self
1264 .stream_manager
1265 .create_streaming_job(stream_job_fragments, ctx, None)
1266 .await?;
1267 Ok(version)
1268 }
1269 (CreateType::Background, _) => {
1270 let await_tree_key = format!("Background DDL Worker ({})", streaming_job.id());
1271 let await_tree_span =
1272 span!("{:?}({})", streaming_job.job_type(), streaming_job.name());
1273
1274 let ctrl = self.clone();
1275 let (tx, rx) = oneshot::channel();
1276 let fut = async move {
1277 let _ = ctrl
1278 .stream_manager
1279 .create_streaming_job(stream_job_fragments, ctx, Some(tx))
1280 .await
1281 .inspect_err(|err| {
1282 tracing::error!(id = stream_job_id, error = ?err.as_report(), "failed to create background streaming job");
1283 });
1284 };
1285
1286 let fut = (self.env.await_tree_reg())
1287 .register(await_tree_key, await_tree_span)
1288 .instrument(fut);
1289 tokio::spawn(fut);
1290
1291 rx.instrument_await("wait_background_streaming_job_creation_started")
1292 .await
1293 .map_err(|_| {
1294 anyhow!(
1295 "failed to receive create streaming job result of job: {}",
1296 stream_job_id
1297 )
1298 })??;
1299 Ok(IGNORED_NOTIFICATION_VERSION)
1300 }
1301 }
1302 }
1303
1304 pub async fn drop_object(
1306 &self,
1307 object_type: ObjectType,
1308 object_id: ObjectId,
1309 drop_mode: DropMode,
1310 target_replace_info: Option<ReplaceStreamJobInfo>,
1311 ) -> MetaResult<NotificationVersion> {
1312 let (release_ctx, mut version) = self
1313 .metadata_manager
1314 .catalog_controller
1315 .drop_object(object_type, object_id, drop_mode)
1316 .await?;
1317
1318 if let Some(replace_table_info) = target_replace_info {
1319 let stream_ctx =
1320 StreamContext::from_protobuf(replace_table_info.fragment_graph.get_ctx().unwrap());
1321
1322 let ReplaceStreamJobInfo {
1323 mut streaming_job,
1324 fragment_graph,
1325 ..
1326 } = replace_table_info;
1327
1328 let sink_id = if let ObjectType::Sink = object_type {
1329 object_id as _
1330 } else {
1331 panic!("additional replace table event only occurs when dropping sink into table")
1332 };
1333
1334 let original_max_parallelism = self
1336 .metadata_manager
1337 .get_job_max_parallelism(streaming_job.id().into())
1338 .await?;
1339 let fragment_graph = PbStreamFragmentGraph {
1340 max_parallelism: original_max_parallelism as _,
1341 ..fragment_graph
1342 };
1343
1344 let fragment_graph =
1345 StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
1346 streaming_job.set_info_from_graph(&fragment_graph);
1347 let streaming_job = streaming_job;
1348
1349 streaming_job.table().expect("should be table job");
1350
1351 tracing::debug!(id = streaming_job.id(), "replacing table for dropped sink");
1352 let tmp_id = self
1353 .metadata_manager
1354 .catalog_controller
1355 .create_job_catalog_for_replace(
1356 &streaming_job,
1357 &stream_ctx,
1358 &fragment_graph.specified_parallelism(),
1359 fragment_graph.max_parallelism(),
1360 )
1361 .await? as u32;
1362
1363 let (ctx, stream_job_fragments) = self
1364 .inject_replace_table_job_for_table_sink(
1365 tmp_id,
1366 &self.metadata_manager,
1367 stream_ctx,
1368 None,
1369 None,
1370 Some(sink_id),
1371 &streaming_job,
1372 fragment_graph,
1373 )
1374 .await?;
1375
1376 let result: MetaResult<_> = try {
1377 let replace_upstream = ctx.replace_upstream.clone();
1378
1379 self.metadata_manager
1380 .catalog_controller
1381 .prepare_streaming_job(&stream_job_fragments, &streaming_job, true)
1382 .await?;
1383
1384 self.stream_manager
1385 .replace_stream_job(stream_job_fragments, ctx)
1386 .await?;
1387
1388 replace_upstream
1389 };
1390
1391 version = match result {
1392 Ok(replace_upstream) => {
1393 let version = self
1394 .metadata_manager
1395 .catalog_controller
1396 .finish_replace_streaming_job(
1397 tmp_id as _,
1398 streaming_job,
1399 replace_upstream,
1400 SinkIntoTableContext {
1401 creating_sink_id: None,
1402 dropping_sink_id: Some(sink_id),
1403 updated_sink_catalogs: vec![],
1404 },
1405 None, )
1407 .await?;
1408 Ok(version)
1409 }
1410 Err(err) => {
1411 tracing::error!(id = object_id, error = ?err.as_report(), "failed to replace table");
1412 let _ = self.metadata_manager
1413 .catalog_controller
1414 .try_abort_replacing_streaming_job(tmp_id as _)
1415 .await
1416 .inspect_err(|err| {
1417 tracing::error!(id = object_id, error = ?err.as_report(), "failed to abort replacing table");
1418 });
1419 Err(err)
1420 }
1421 }?;
1422 }
1423
1424 let ReleaseContext {
1425 database_id,
1426 removed_streaming_job_ids,
1427 removed_state_table_ids,
1428 removed_source_ids,
1429 removed_secret_ids: secret_ids,
1430 removed_source_fragments,
1431 removed_actors,
1432 removed_fragments,
1433 } = release_ctx;
1434
1435 let _guard = self.source_manager.pause_tick().await;
1436 self.stream_manager
1437 .drop_streaming_jobs(
1438 risingwave_common::catalog::DatabaseId::new(database_id as _),
1439 removed_actors.iter().map(|id| *id as _).collect(),
1440 removed_streaming_job_ids,
1441 removed_state_table_ids,
1442 removed_fragments.iter().map(|id| *id as _).collect(),
1443 )
1444 .await;
1445
1446 self.source_manager
1449 .apply_source_change(SourceChange::DropSource {
1450 dropped_source_ids: removed_source_ids.into_iter().map(|id| id as _).collect(),
1451 })
1452 .await;
1453
1454 let dropped_source_fragments = removed_source_fragments
1457 .into_iter()
1458 .map(|(source_id, fragments)| {
1459 (
1460 source_id,
1461 fragments.into_iter().map(|id| id as u32).collect(),
1462 )
1463 })
1464 .collect();
1465 let dropped_actors = removed_actors.iter().map(|id| *id as _).collect();
1466 self.source_manager
1467 .apply_source_change(SourceChange::DropMv {
1468 dropped_source_fragments,
1469 dropped_actors,
1470 })
1471 .await;
1472
1473 for secret in secret_ids {
1475 LocalSecretManager::global().remove_secret(secret as _);
1476 }
1477 Ok(version)
1478 }
1479
1480 #[await_tree::instrument(boxed, "replace_streaming_job({streaming_job})")]
1482 pub async fn replace_job(
1483 &self,
1484 mut streaming_job: StreamingJob,
1485 fragment_graph: StreamFragmentGraphProto,
1486 ) -> MetaResult<NotificationVersion> {
1487 match &mut streaming_job {
1488 StreamingJob::Table(..) | StreamingJob::Source(..) => {}
1489 StreamingJob::MaterializedView(..)
1490 | StreamingJob::Sink(..)
1491 | StreamingJob::Index(..) => {
1492 bail_not_implemented!("schema change for {}", streaming_job.job_type_str())
1493 }
1494 }
1495
1496 let job_id = streaming_job.id();
1497
1498 let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
1499 let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());
1500
1501 let original_max_parallelism = self
1503 .metadata_manager
1504 .get_job_max_parallelism(streaming_job.id().into())
1505 .await?;
1506 let fragment_graph = PbStreamFragmentGraph {
1507 max_parallelism: original_max_parallelism as _,
1508 ..fragment_graph
1509 };
1510
1511 let fragment_graph = StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
1513 streaming_job.set_info_from_graph(&fragment_graph);
1514
1515 let streaming_job = streaming_job;
1517
1518 let tmp_id = self
1519 .metadata_manager
1520 .catalog_controller
1521 .create_job_catalog_for_replace(
1522 &streaming_job,
1523 &ctx,
1524 &fragment_graph.specified_parallelism(),
1525 fragment_graph.max_parallelism(),
1526 )
1527 .await?;
1528
1529 tracing::debug!(id = job_id, "building replace streaming job");
1530 let mut updated_sink_catalogs = vec![];
1531
1532 let mut drop_table_connector_ctx = None;
1533 let result: MetaResult<_> = try {
1534 let (mut ctx, mut stream_job_fragments) = self
1535 .build_replace_job(ctx, &streaming_job, fragment_graph, tmp_id as _)
1536 .await?;
1537 drop_table_connector_ctx = ctx.drop_table_connector_ctx.clone();
1538
1539 if let StreamingJob::Table(_, table, ..) = &streaming_job {
1540 let catalogs = self
1541 .metadata_manager
1542 .get_sink_catalog_by_ids(&table.incoming_sinks)
1543 .await?;
1544
1545 for sink in catalogs {
1546 let sink_id = &sink.id;
1547
1548 let sink_table_fragments = self
1549 .metadata_manager
1550 .get_job_fragments_by_id(&risingwave_common::catalog::TableId::new(
1551 *sink_id,
1552 ))
1553 .await?;
1554
1555 let sink_fragment = sink_table_fragments.sink_fragment().unwrap();
1556
1557 Self::inject_replace_table_plan_for_sink(
1558 *sink_id,
1559 &sink_fragment,
1560 table,
1561 &mut ctx,
1562 stream_job_fragments.inner.union_fragment_for_table(),
1563 Some(&sink.unique_identity()),
1564 );
1565
1566 if sink.original_target_columns.is_empty() {
1567 updated_sink_catalogs.push(sink.id as _);
1568 }
1569 }
1570 }
1571
1572 let replace_upstream = ctx.replace_upstream.clone();
1573
1574 self.metadata_manager
1575 .catalog_controller
1576 .prepare_streaming_job(&stream_job_fragments, &streaming_job, true)
1577 .await?;
1578
1579 self.stream_manager
1580 .replace_stream_job(stream_job_fragments, ctx)
1581 .await?;
1582 replace_upstream
1583 };
1584
1585 match result {
1586 Ok(replace_upstream) => {
1587 let version = self
1588 .metadata_manager
1589 .catalog_controller
1590 .finish_replace_streaming_job(
1591 tmp_id,
1592 streaming_job,
1593 replace_upstream,
1594 SinkIntoTableContext {
1595 creating_sink_id: None,
1596 dropping_sink_id: None,
1597 updated_sink_catalogs,
1598 },
1599 drop_table_connector_ctx.as_ref(),
1600 )
1601 .await?;
1602 if let Some(drop_table_connector_ctx) = &drop_table_connector_ctx {
1603 self.source_manager
1604 .apply_source_change(SourceChange::DropSource {
1605 dropped_source_ids: vec![drop_table_connector_ctx.to_remove_source_id],
1606 })
1607 .await;
1608 }
1609 Ok(version)
1610 }
1611 Err(err) => {
1612 tracing::error!(id = job_id, error = ?err.as_report(), "failed to replace job");
1613 let _ = self.metadata_manager
1614 .catalog_controller
1615 .try_abort_replacing_streaming_job(tmp_id)
1616 .await.inspect_err(|err| {
1617 tracing::error!(id = job_id, error = ?err.as_report(), "failed to abort replacing job");
1618 });
1619 Err(err)
1620 }
1621 }
1622 }
1623
1624 #[await_tree::instrument(boxed, "drop_streaming_job{}({job_id})", if let DropMode::Cascade = drop_mode { "_cascade" } else { "" })]
1625 async fn drop_streaming_job(
1626 &self,
1627 job_id: StreamingJobId,
1628 drop_mode: DropMode,
1629 target_replace_info: Option<ReplaceStreamJobInfo>,
1630 ) -> MetaResult<NotificationVersion> {
1631 let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
1632
1633 let (object_id, object_type) = match job_id {
1634 StreamingJobId::MaterializedView(id) => (id as _, ObjectType::Table),
1635 StreamingJobId::Sink(id) => (id as _, ObjectType::Sink),
1636 StreamingJobId::Table(_, id) => (id as _, ObjectType::Table),
1637 StreamingJobId::Index(idx) => (idx as _, ObjectType::Index),
1638 };
1639
1640 let version = self
1641 .drop_object(object_type, object_id, drop_mode, target_replace_info)
1642 .await?;
1643 #[cfg(not(madsim))]
1644 if let StreamingJobId::Sink(sink_id) = job_id {
1645 let db = self.env.meta_store_ref().conn.clone();
1648 clean_all_rows_by_sink_id(&db, sink_id).await?;
1649 }
1650 Ok(version)
1651 }
1652
1653 fn resolve_stream_parallelism(
1657 &self,
1658 specified: Option<NonZeroUsize>,
1659 max: NonZeroUsize,
1660 cluster_info: &StreamingClusterInfo,
1661 resource_group: String,
1662 ) -> MetaResult<NonZeroUsize> {
1663 let available = cluster_info.parallelism(&resource_group);
1664 let Some(available) = NonZeroUsize::new(available) else {
1665 bail_unavailable!(
1666 "no available slots to schedule in resource group \"{}\", \
1667 have you allocated any compute nodes within this resource group?",
1668 resource_group
1669 );
1670 };
1671
1672 if let Some(specified) = specified {
1673 if specified > max {
1674 bail_invalid_parameter!(
1675 "specified parallelism {} should not exceed max parallelism {}",
1676 specified,
1677 max,
1678 );
1679 }
1680 if specified > available {
1681 bail_unavailable!(
1682 "insufficient parallelism to schedule in resource group \"{}\", \
1683 required: {}, available: {}",
1684 resource_group,
1685 specified,
1686 available,
1687 );
1688 }
1689 Ok(specified)
1690 } else {
1691 let default_parallelism = match self.env.opts.default_parallelism {
1693 DefaultParallelism::Full => available,
1694 DefaultParallelism::Default(num) => {
1695 if num > available {
1696 bail_unavailable!(
1697 "insufficient parallelism to schedule in resource group \"{}\", \
1698 required: {}, available: {}",
1699 resource_group,
1700 num,
1701 available,
1702 );
1703 }
1704 num
1705 }
1706 };
1707
1708 if default_parallelism > max {
1709 tracing::warn!(
1710 max_parallelism = max.get(),
1711 resource_group,
1712 "too many parallelism available, use max parallelism instead",
1713 );
1714 }
1715 Ok(default_parallelism.min(max))
1716 }
1717 }
1718
1719 #[await_tree::instrument]
1725 pub(crate) async fn build_stream_job(
1726 &self,
1727 stream_ctx: StreamContext,
1728 mut stream_job: StreamingJob,
1729 fragment_graph: StreamFragmentGraph,
1730 affected_table_replace_info: Option<(StreamingJob, StreamFragmentGraph)>,
1731 specific_resource_group: Option<String>,
1732 ) -> MetaResult<(CreateStreamingJobContext, StreamJobFragmentsToCreate)> {
1733 let id = stream_job.id();
1734 let specified_parallelism = fragment_graph.specified_parallelism();
1735 let expr_context = stream_ctx.to_expr_context();
1736 let max_parallelism = NonZeroUsize::new(fragment_graph.max_parallelism()).unwrap();
1737
1738 let fragment_backfill_ordering = fragment_graph.create_fragment_backfill_ordering();
1740
1741 let (snapshot_backfill_info, cross_db_snapshot_backfill_info) =
1745 fragment_graph.collect_snapshot_backfill_info()?;
1746 assert!(
1747 snapshot_backfill_info
1748 .iter()
1749 .chain([&cross_db_snapshot_backfill_info])
1750 .flat_map(|info| info.upstream_mv_table_id_to_backfill_epoch.values())
1751 .all(|backfill_epoch| backfill_epoch.is_none()),
1752 "should not set backfill epoch when initially build the job: {:?} {:?}",
1753 snapshot_backfill_info,
1754 cross_db_snapshot_backfill_info
1755 );
1756
1757 self.metadata_manager
1759 .catalog_controller
1760 .validate_cross_db_snapshot_backfill(&cross_db_snapshot_backfill_info)
1761 .await?;
1762
1763 let upstream_table_ids = fragment_graph
1764 .dependent_table_ids()
1765 .iter()
1766 .filter(|id| {
1767 !cross_db_snapshot_backfill_info
1768 .upstream_mv_table_id_to_backfill_epoch
1769 .contains_key(id)
1770 })
1771 .cloned()
1772 .collect();
1773
1774 let (upstream_root_fragments, existing_actor_location) = self
1775 .metadata_manager
1776 .get_upstream_root_fragments(&upstream_table_ids)
1777 .await?;
1778
1779 if snapshot_backfill_info.is_some() {
1780 match stream_job {
1781 StreamingJob::MaterializedView(_)
1782 | StreamingJob::Sink(_, _)
1783 | StreamingJob::Index(_, _) => {}
1784 StreamingJob::Table(_, _, _) | StreamingJob::Source(_) => {
1785 return Err(
1786 anyhow!("snapshot_backfill not enabled for table and source").into(),
1787 );
1788 }
1789 }
1790 }
1791
1792 let upstream_actors = upstream_root_fragments
1793 .values()
1794 .map(|fragment| {
1795 (
1796 fragment.fragment_id,
1797 fragment.actors.iter().map(|actor| actor.actor_id).collect(),
1798 )
1799 })
1800 .collect();
1801
1802 let complete_graph = CompleteStreamFragmentGraph::with_upstreams(
1803 fragment_graph,
1804 upstream_root_fragments,
1805 existing_actor_location,
1806 (&stream_job).into(),
1807 )?;
1808
1809 let resource_group = match specific_resource_group {
1810 None => {
1811 self.metadata_manager
1812 .get_database_resource_group(stream_job.database_id() as ObjectId)
1813 .await?
1814 }
1815 Some(resource_group) => resource_group,
1816 };
1817
1818 let cluster_info = self.metadata_manager.get_streaming_cluster_info().await?;
1820
1821 let parallelism = self.resolve_stream_parallelism(
1822 specified_parallelism,
1823 max_parallelism,
1824 &cluster_info,
1825 resource_group.clone(),
1826 )?;
1827
1828 let parallelism = self
1829 .env
1830 .system_params_reader()
1831 .await
1832 .adaptive_parallelism_strategy()
1833 .compute_target_parallelism(parallelism.get());
1834
1835 let parallelism = NonZeroUsize::new(parallelism).expect("parallelism must be positive");
1836 let actor_graph_builder = ActorGraphBuilder::new(
1837 id,
1838 resource_group,
1839 complete_graph,
1840 cluster_info,
1841 parallelism,
1842 )?;
1843
1844 let ActorGraphBuildResult {
1845 graph,
1846 downstream_fragment_relations,
1847 building_locations,
1848 existing_locations,
1849 upstream_fragment_downstreams,
1850 new_no_shuffle,
1851 replace_upstream,
1852 ..
1853 } = actor_graph_builder.generate_graph(&self.env, &stream_job, expr_context)?;
1854 assert!(replace_upstream.is_empty());
1855
1856 let table_parallelism = match (specified_parallelism, &self.env.opts.default_parallelism) {
1863 (None, DefaultParallelism::Full) => TableParallelism::Adaptive,
1864 _ => TableParallelism::Fixed(parallelism.get()),
1865 };
1866
1867 let stream_job_fragments = StreamJobFragments::new(
1868 id.into(),
1869 graph,
1870 &building_locations.actor_locations,
1871 stream_ctx.clone(),
1872 table_parallelism,
1873 max_parallelism.get(),
1874 );
1875 let internal_tables = stream_job_fragments.internal_tables();
1876
1877 if let Some(mview_fragment) = stream_job_fragments.mview_fragment() {
1878 stream_job.set_table_vnode_count(mview_fragment.vnode_count());
1879 }
1880
1881 let replace_table_job_info = match affected_table_replace_info {
1882 Some((table_stream_job, fragment_graph)) => {
1883 if snapshot_backfill_info.is_some() {
1884 return Err(anyhow!(
1885 "snapshot backfill should not have replace table info: {table_stream_job:?}"
1886 )
1887 .into());
1888 }
1889 let StreamingJob::Sink(sink, target_table) = &mut stream_job else {
1890 bail!("additional replace table event only occurs when sinking into table");
1891 };
1892
1893 table_stream_job.table().expect("should be table job");
1894 let tmp_id = self
1895 .metadata_manager
1896 .catalog_controller
1897 .create_job_catalog_for_replace(
1898 &table_stream_job,
1899 &stream_ctx,
1900 &fragment_graph.specified_parallelism(),
1901 fragment_graph.max_parallelism(),
1902 )
1903 .await? as u32;
1904
1905 let (context, table_fragments) = self
1906 .inject_replace_table_job_for_table_sink(
1907 tmp_id,
1908 &self.metadata_manager,
1909 stream_ctx,
1910 Some(sink),
1911 Some(&stream_job_fragments),
1912 None,
1913 &table_stream_job,
1914 fragment_graph,
1915 )
1916 .await?;
1917 must_match!(&table_stream_job, StreamingJob::Table(source, table, _) => {
1921 *target_table = Some((table.clone(), source.clone()));
1923 });
1924
1925 Some((table_stream_job, context, table_fragments))
1926 }
1927 None => None,
1928 };
1929
1930 let ctx = CreateStreamingJobContext {
1931 upstream_fragment_downstreams,
1932 new_no_shuffle,
1933 upstream_actors,
1934 internal_tables,
1935 building_locations,
1936 existing_locations,
1937 definition: stream_job.definition(),
1938 mv_table_id: stream_job.mv_table(),
1939 create_type: stream_job.create_type(),
1940 job_type: (&stream_job).into(),
1941 streaming_job: stream_job,
1942 replace_table_job_info,
1943 option: CreateStreamingJobOption {},
1944 snapshot_backfill_info,
1945 cross_db_snapshot_backfill_info,
1946 fragment_backfill_ordering,
1947 };
1948
1949 Ok((
1950 ctx,
1951 StreamJobFragmentsToCreate {
1952 inner: stream_job_fragments,
1953 downstreams: downstream_fragment_relations,
1954 },
1955 ))
1956 }
1957
1958 pub(crate) async fn build_replace_job(
1964 &self,
1965 stream_ctx: StreamContext,
1966 stream_job: &StreamingJob,
1967 mut fragment_graph: StreamFragmentGraph,
1968 tmp_job_id: TableId,
1969 ) -> MetaResult<(ReplaceStreamJobContext, StreamJobFragmentsToCreate)> {
1970 match &stream_job {
1971 StreamingJob::Table(..) | StreamingJob::Source(..) => {}
1972 StreamingJob::MaterializedView(..)
1973 | StreamingJob::Sink(..)
1974 | StreamingJob::Index(..) => {
1975 bail_not_implemented!("schema change for {}", stream_job.job_type_str())
1976 }
1977 }
1978
1979 let id = stream_job.id();
1980 let expr_context = stream_ctx.to_expr_context();
1981
1982 let mut drop_table_associated_source_id = None;
1984 if let StreamingJob::Table(None, _, _) = &stream_job {
1985 drop_table_associated_source_id = self
1986 .metadata_manager
1987 .get_table_associated_source_id(id as _)
1988 .await?;
1989 }
1990
1991 let old_fragments = self
1992 .metadata_manager
1993 .get_job_fragments_by_id(&id.into())
1994 .await?;
1995 let old_internal_table_ids = old_fragments.internal_table_ids();
1996
1997 let mut drop_table_connector_ctx = None;
1999 if drop_table_associated_source_id.is_some() {
2000 debug_assert!(old_internal_table_ids.len() == 1);
2002
2003 drop_table_connector_ctx = Some(DropTableConnectorContext {
2004 to_change_streaming_job_id: id as i32,
2007 to_remove_state_table_id: old_internal_table_ids[0] as i32, to_remove_source_id: drop_table_associated_source_id.unwrap(),
2009 });
2010 } else {
2011 let old_internal_tables = self
2012 .metadata_manager
2013 .get_table_catalog_by_ids(old_internal_table_ids)
2014 .await?;
2015 fragment_graph.fit_internal_table_ids(old_internal_tables)?;
2016 }
2017
2018 let original_root_fragment = old_fragments
2021 .root_fragment()
2022 .expect("root fragment not found");
2023
2024 let job_type = StreamingJobType::from(stream_job);
2025
2026 let (downstream_fragments, downstream_actor_location) =
2028 self.metadata_manager.get_downstream_fragments(id).await?;
2029
2030 let complete_graph = match &job_type {
2032 StreamingJobType::Table(TableJobType::General) | StreamingJobType::Source => {
2033 CompleteStreamFragmentGraph::with_downstreams(
2034 fragment_graph,
2035 original_root_fragment.fragment_id,
2036 downstream_fragments,
2037 downstream_actor_location,
2038 job_type,
2039 )?
2040 }
2041 StreamingJobType::Table(TableJobType::SharedCdcSource) => {
2042 let (upstream_root_fragments, upstream_actor_location) = self
2044 .metadata_manager
2045 .get_upstream_root_fragments(fragment_graph.dependent_table_ids())
2046 .await?;
2047
2048 CompleteStreamFragmentGraph::with_upstreams_and_downstreams(
2049 fragment_graph,
2050 upstream_root_fragments,
2051 upstream_actor_location,
2052 original_root_fragment.fragment_id,
2053 downstream_fragments,
2054 downstream_actor_location,
2055 job_type,
2056 )?
2057 }
2058 _ => unreachable!(),
2059 };
2060
2061 let resource_group = self
2062 .metadata_manager
2063 .get_existing_job_resource_group(id as ObjectId)
2064 .await?;
2065
2066 let cluster_info = self.metadata_manager.get_streaming_cluster_info().await?;
2068
2069 let parallelism = NonZeroUsize::new(original_root_fragment.actors.len())
2072 .expect("The number of actors in the original table fragment should be greater than 0");
2073
2074 let actor_graph_builder = ActorGraphBuilder::new(
2075 id,
2076 resource_group,
2077 complete_graph,
2078 cluster_info,
2079 parallelism,
2080 )?;
2081
2082 let ActorGraphBuildResult {
2083 graph,
2084 downstream_fragment_relations,
2085 building_locations,
2086 existing_locations,
2087 upstream_fragment_downstreams,
2088 replace_upstream,
2089 new_no_shuffle,
2090 ..
2091 } = actor_graph_builder.generate_graph(&self.env, stream_job, expr_context)?;
2092
2093 if matches!(
2095 job_type,
2096 StreamingJobType::Source | StreamingJobType::Table(TableJobType::General)
2097 ) {
2098 assert!(upstream_fragment_downstreams.is_empty());
2099 }
2100
2101 let stream_job_fragments = StreamJobFragments::new(
2105 (tmp_job_id as u32).into(),
2106 graph,
2107 &building_locations.actor_locations,
2108 stream_ctx,
2109 old_fragments.assigned_parallelism,
2110 old_fragments.max_parallelism,
2111 );
2112
2113 let ctx = ReplaceStreamJobContext {
2117 old_fragments,
2118 replace_upstream,
2119 new_no_shuffle,
2120 upstream_fragment_downstreams,
2121 building_locations,
2122 existing_locations,
2123 streaming_job: stream_job.clone(),
2124 tmp_id: tmp_job_id as _,
2125 drop_table_connector_ctx,
2126 };
2127
2128 Ok((
2129 ctx,
2130 StreamJobFragmentsToCreate {
2131 inner: stream_job_fragments,
2132 downstreams: downstream_fragment_relations,
2133 },
2134 ))
2135 }
2136
2137 async fn alter_name(
2138 &self,
2139 relation: alter_name_request::Object,
2140 new_name: &str,
2141 ) -> MetaResult<NotificationVersion> {
2142 let (obj_type, id) = match relation {
2143 alter_name_request::Object::TableId(id) => (ObjectType::Table, id as ObjectId),
2144 alter_name_request::Object::ViewId(id) => (ObjectType::View, id as ObjectId),
2145 alter_name_request::Object::IndexId(id) => (ObjectType::Index, id as ObjectId),
2146 alter_name_request::Object::SinkId(id) => (ObjectType::Sink, id as ObjectId),
2147 alter_name_request::Object::SourceId(id) => (ObjectType::Source, id as ObjectId),
2148 alter_name_request::Object::SchemaId(id) => (ObjectType::Schema, id as ObjectId),
2149 alter_name_request::Object::DatabaseId(id) => (ObjectType::Database, id as ObjectId),
2150 alter_name_request::Object::SubscriptionId(id) => {
2151 (ObjectType::Subscription, id as ObjectId)
2152 }
2153 };
2154 self.metadata_manager
2155 .catalog_controller
2156 .alter_name(obj_type, id, new_name)
2157 .await
2158 }
2159
2160 async fn alter_swap_rename(
2161 &self,
2162 object: alter_swap_rename_request::Object,
2163 ) -> MetaResult<NotificationVersion> {
2164 let (obj_type, src_id, dst_id) = match object {
2165 alter_swap_rename_request::Object::Schema(_) => unimplemented!("schema swap"),
2166 alter_swap_rename_request::Object::Table(objs) => {
2167 let (src_id, dst_id) = (
2168 objs.src_object_id as ObjectId,
2169 objs.dst_object_id as ObjectId,
2170 );
2171 (ObjectType::Table, src_id, dst_id)
2172 }
2173 alter_swap_rename_request::Object::View(objs) => {
2174 let (src_id, dst_id) = (
2175 objs.src_object_id as ObjectId,
2176 objs.dst_object_id as ObjectId,
2177 );
2178 (ObjectType::View, src_id, dst_id)
2179 }
2180 alter_swap_rename_request::Object::Source(objs) => {
2181 let (src_id, dst_id) = (
2182 objs.src_object_id as ObjectId,
2183 objs.dst_object_id as ObjectId,
2184 );
2185 (ObjectType::Source, src_id, dst_id)
2186 }
2187 alter_swap_rename_request::Object::Sink(objs) => {
2188 let (src_id, dst_id) = (
2189 objs.src_object_id as ObjectId,
2190 objs.dst_object_id as ObjectId,
2191 );
2192 (ObjectType::Sink, src_id, dst_id)
2193 }
2194 alter_swap_rename_request::Object::Subscription(objs) => {
2195 let (src_id, dst_id) = (
2196 objs.src_object_id as ObjectId,
2197 objs.dst_object_id as ObjectId,
2198 );
2199 (ObjectType::Subscription, src_id, dst_id)
2200 }
2201 };
2202
2203 self.metadata_manager
2204 .catalog_controller
2205 .alter_swap_rename(obj_type, src_id, dst_id)
2206 .await
2207 }
2208
2209 async fn alter_owner(
2210 &self,
2211 object: Object,
2212 owner_id: UserId,
2213 ) -> MetaResult<NotificationVersion> {
2214 let (obj_type, id) = match object {
2215 Object::TableId(id) => (ObjectType::Table, id as ObjectId),
2216 Object::ViewId(id) => (ObjectType::View, id as ObjectId),
2217 Object::SourceId(id) => (ObjectType::Source, id as ObjectId),
2218 Object::SinkId(id) => (ObjectType::Sink, id as ObjectId),
2219 Object::SchemaId(id) => (ObjectType::Schema, id as ObjectId),
2220 Object::DatabaseId(id) => (ObjectType::Database, id as ObjectId),
2221 Object::SubscriptionId(id) => (ObjectType::Subscription, id as ObjectId),
2222 Object::ConnectionId(id) => (ObjectType::Connection, id as ObjectId),
2223 };
2224 self.metadata_manager
2225 .catalog_controller
2226 .alter_owner(obj_type, id, owner_id as _)
2227 .await
2228 }
2229
2230 async fn alter_set_schema(
2231 &self,
2232 object: alter_set_schema_request::Object,
2233 new_schema_id: SchemaId,
2234 ) -> MetaResult<NotificationVersion> {
2235 let (obj_type, id) = match object {
2236 alter_set_schema_request::Object::TableId(id) => (ObjectType::Table, id as ObjectId),
2237 alter_set_schema_request::Object::ViewId(id) => (ObjectType::View, id as ObjectId),
2238 alter_set_schema_request::Object::SourceId(id) => (ObjectType::Source, id as ObjectId),
2239 alter_set_schema_request::Object::SinkId(id) => (ObjectType::Sink, id as ObjectId),
2240 alter_set_schema_request::Object::FunctionId(id) => {
2241 (ObjectType::Function, id as ObjectId)
2242 }
2243 alter_set_schema_request::Object::ConnectionId(id) => {
2244 (ObjectType::Connection, id as ObjectId)
2245 }
2246 alter_set_schema_request::Object::SubscriptionId(id) => {
2247 (ObjectType::Subscription, id as ObjectId)
2248 }
2249 };
2250 self.metadata_manager
2251 .catalog_controller
2252 .alter_schema(obj_type, id, new_schema_id as _)
2253 .await
2254 }
2255
2256 pub async fn wait(&self) -> MetaResult<()> {
2257 let timeout_ms = 30 * 60 * 1000;
2258 for _ in 0..timeout_ms {
2259 if self
2260 .metadata_manager
2261 .catalog_controller
2262 .list_background_creating_mviews(true)
2263 .await?
2264 .is_empty()
2265 {
2266 return Ok(());
2267 }
2268
2269 sleep(Duration::from_millis(1)).await;
2270 }
2271 Err(MetaError::cancelled(format!(
2272 "timeout after {timeout_ms}ms"
2273 )))
2274 }
2275
2276 async fn comment_on(&self, comment: Comment) -> MetaResult<NotificationVersion> {
2277 self.metadata_manager
2278 .catalog_controller
2279 .comment_on(comment)
2280 .await
2281 }
2282}
2283
2284fn report_create_object(
2285 catalog_id: u32,
2286 event_name: &str,
2287 obj_type: PbTelemetryDatabaseObject,
2288 connector_name: Option<String>,
2289 attr_info: Option<jsonbb::Value>,
2290) {
2291 report_event(
2292 PbTelemetryEventStage::CreateStreamJob,
2293 event_name,
2294 catalog_id.into(),
2295 connector_name,
2296 Some(obj_type),
2297 attr_info,
2298 );
2299}
2300
2301async fn clean_all_rows_by_sink_id(db: &DatabaseConnection, sink_id: i32) -> MetaResult<()> {
2302 match Entity::delete_many()
2303 .filter(Column::SinkId.eq(sink_id))
2304 .exec(db)
2305 .await
2306 {
2307 Ok(result) => {
2308 let deleted_count = result.rows_affected;
2309
2310 tracing::info!(
2311 "Deleted {} items for sink_id = {} in iceberg exactly once system table.",
2312 deleted_count,
2313 sink_id
2314 );
2315 Ok(())
2316 }
2317 Err(e) => {
2318 tracing::error!(
2319 "Error deleting records for sink_id = {} from iceberg exactly once system table: {:?}",
2320 sink_id,
2321 e.as_report()
2322 );
2323 Err(e.into())
2324 }
2325 }
2326}