use std::cmp::Ordering;
use std::collections::{HashMap, HashSet};
use std::num::NonZeroUsize;
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use std::time::Duration;

use anyhow::{Context, anyhow};
use await_tree::{InstrumentAwait, span};
use either::Either;
use itertools::Itertools;
use risingwave_common::catalog::{
    AlterDatabaseParam, ColumnCatalog, ColumnId, Field, FragmentTypeFlag,
};
use risingwave_common::config::DefaultParallelism;
use risingwave_common::hash::VnodeCountCompat;
use risingwave_common::secret::{LocalSecretManager, SecretEncryption};
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont_mut;
use risingwave_common::{bail, bail_not_implemented};
use risingwave_connector::WithOptionsSecResolved;
use risingwave_connector::connector_common::validate_connection;
use risingwave_connector::source::cdc::CdcScanOptions;
use risingwave_connector::source::{
    ConnectorProperties, SourceEnumeratorContext, UPSTREAM_SOURCE_KEY,
};
use risingwave_meta_model::exactly_once_iceberg_sink::{Column, Entity};
use risingwave_meta_model::object::ObjectType;
use risingwave_meta_model::{
    ConnectionId, DatabaseId, DispatcherType, FragmentId, FunctionId, IndexId, JobStatus, ObjectId,
    SchemaId, SecretId, SinkId, SourceId, SubscriptionId, TableId, UserId, ViewId, WorkerId,
};
use risingwave_pb::catalog::{
    Comment, Connection, CreateType, Database, Function, PbSink, PbTable, Schema, Secret, Source,
    Subscription, Table, View,
};
use risingwave_pb::common::PbActorLocation;
use risingwave_pb::ddl_service::alter_owner_request::Object;
use risingwave_pb::ddl_service::{
    DdlProgress, TableJobType, WaitVersion, alter_name_request, alter_set_schema_request,
    alter_swap_rename_request,
};
use risingwave_pb::meta::table_fragments::PbActorStatus;
use risingwave_pb::meta::table_fragments::actor_status::PbActorState;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{
    PbDispatchOutputMapping, PbStreamFragmentGraph, PbStreamNode, PbUpstreamSinkInfo,
    StreamFragmentGraph as StreamFragmentGraphProto,
};
use risingwave_pb::telemetry::{PbTelemetryDatabaseObject, PbTelemetryEventStage};
use sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter};
use strum::Display;
use thiserror_ext::AsReport;
use tokio::sync::{OwnedSemaphorePermit, Semaphore, oneshot};
use tokio::time::sleep;
use tracing::Instrument;

use crate::barrier::BarrierManagerRef;
use crate::controller::catalog::{DropTableConnectorContext, ReleaseContext};
use crate::controller::cluster::StreamingClusterInfo;
use crate::controller::streaming_job::{FinishAutoRefreshSchemaSinkContext, SinkIntoTableContext};
use crate::controller::utils::build_select_node_list;
use crate::error::{MetaErrorInner, bail_invalid_parameter, bail_unavailable};
use crate::manager::{
    IGNORED_NOTIFICATION_VERSION, LocalNotification, MetaSrvEnv, MetadataManager,
    NotificationVersion, StreamingJob, StreamingJobType,
};
use crate::model::{
    DownstreamFragmentRelation, Fragment, FragmentDownstreamRelation, StreamContext,
    StreamJobFragments, StreamJobFragmentsToCreate, TableParallelism,
};
use crate::stream::cdc::{
    is_parallelized_backfill_enabled, try_init_parallel_cdc_table_snapshot_splits,
};
use crate::stream::{
    ActorGraphBuildResult, ActorGraphBuilder, AutoRefreshSchemaSinkContext,
    CompleteStreamFragmentGraph, CreateStreamingJobContext, CreateStreamingJobOption,
    GlobalStreamManagerRef, JobRescheduleTarget, ReplaceStreamJobContext, SourceChange,
    SourceManagerRef, StreamFragmentGraph, UpstreamSinkInfo,
    check_sink_fragments_support_refresh_schema, create_source_worker,
    rewrite_refresh_schema_sink_fragment, state_match, validate_sink,
};
use crate::telemetry::report_event;
use crate::{MetaError, MetaResult};

#[derive(PartialEq)]
pub enum DropMode {
    Restrict,
    Cascade,
}

impl DropMode {
    pub fn from_request_setting(cascade: bool) -> DropMode {
        if cascade {
            DropMode::Cascade
        } else {
            DropMode::Restrict
        }
    }
}

#[derive(strum::AsRefStr)]
pub enum StreamingJobId {
    MaterializedView(TableId),
    Sink(SinkId),
    Table(Option<SourceId>, TableId),
    Index(IndexId),
}

impl std::fmt::Display for StreamingJobId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.as_ref())?;
        write!(f, "({})", self.id())
    }
}

impl StreamingJobId {
    #[allow(dead_code)]
    fn id(&self) -> TableId {
        match self {
            StreamingJobId::MaterializedView(id)
            | StreamingJobId::Sink(id)
            | StreamingJobId::Table(_, id)
            | StreamingJobId::Index(id) => *id,
        }
    }
}

pub struct ReplaceStreamJobInfo {
    pub streaming_job: StreamingJob,
    pub fragment_graph: StreamFragmentGraphProto,
}

#[derive(Display)]
pub enum DdlCommand {
    CreateDatabase(Database),
    DropDatabase(DatabaseId),
    CreateSchema(Schema),
    DropSchema(SchemaId, DropMode),
    CreateNonSharedSource(Source),
    DropSource(SourceId, DropMode),
    CreateFunction(Function),
    DropFunction(FunctionId, DropMode),
    CreateView(View, HashSet<ObjectId>),
    DropView(ViewId, DropMode),
    CreateStreamingJob {
        stream_job: StreamingJob,
        fragment_graph: StreamFragmentGraphProto,
        dependencies: HashSet<ObjectId>,
        specific_resource_group: Option<String>,
        if_not_exists: bool,
    },
    DropStreamingJob {
        job_id: StreamingJobId,
        drop_mode: DropMode,
    },
    AlterName(alter_name_request::Object, String),
    AlterSwapRename(alter_swap_rename_request::Object),
    ReplaceStreamJob(ReplaceStreamJobInfo),
    AlterNonSharedSource(Source),
    AlterObjectOwner(Object, UserId),
    AlterSetSchema(alter_set_schema_request::Object, SchemaId),
    CreateConnection(Connection),
    DropConnection(ConnectionId, DropMode),
    CreateSecret(Secret),
    AlterSecret(Secret),
    DropSecret(SecretId),
    CommentOn(Comment),
    CreateSubscription(Subscription),
    DropSubscription(SubscriptionId, DropMode),
    AlterDatabaseParam(DatabaseId, AlterDatabaseParam),
}

impl DdlCommand {
    fn object(&self) -> Either<String, ObjectId> {
        use Either::*;
        match self {
            DdlCommand::CreateDatabase(database) => Left(database.name.clone()),
            DdlCommand::DropDatabase(id) => Right(*id),
            DdlCommand::CreateSchema(schema) => Left(schema.name.clone()),
            DdlCommand::DropSchema(id, _) => Right(*id),
            DdlCommand::CreateNonSharedSource(source) => Left(source.name.clone()),
            DdlCommand::DropSource(id, _) => Right(*id),
            DdlCommand::CreateFunction(function) => Left(function.name.clone()),
            DdlCommand::DropFunction(id, _) => Right(*id),
            DdlCommand::CreateView(view, _) => Left(view.name.clone()),
            DdlCommand::DropView(id, _) => Right(*id),
            DdlCommand::CreateStreamingJob { stream_job, .. } => Left(stream_job.name()),
            DdlCommand::DropStreamingJob { job_id, .. } => Right(job_id.id()),
            DdlCommand::AlterName(object, _) => Left(format!("{object:?}")),
            DdlCommand::AlterSwapRename(object) => Left(format!("{object:?}")),
            DdlCommand::ReplaceStreamJob(info) => Left(info.streaming_job.name()),
            DdlCommand::AlterNonSharedSource(source) => Left(source.name.clone()),
            DdlCommand::AlterObjectOwner(object, _) => Left(format!("{object:?}")),
            DdlCommand::AlterSetSchema(object, _) => Left(format!("{object:?}")),
            DdlCommand::CreateConnection(connection) => Left(connection.name.clone()),
            DdlCommand::DropConnection(id, _) => Right(*id),
            DdlCommand::CreateSecret(secret) => Left(secret.name.clone()),
            DdlCommand::AlterSecret(secret) => Left(secret.name.clone()),
            DdlCommand::DropSecret(id) => Right(*id),
            DdlCommand::CommentOn(comment) => Right(comment.table_id as _),
            DdlCommand::CreateSubscription(subscription) => Left(subscription.name.clone()),
            DdlCommand::DropSubscription(id, _) => Right(*id),
            DdlCommand::AlterDatabaseParam(id, _) => Right(*id),
        }
    }
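    /// Whether this command may run while the barrier manager is still recovering. Catalog-only
    /// creates, drops, and alters are safe to apply; commands that must build and schedule a new
    /// streaming graph (streaming job creation and replacement, non-shared source creation and
    /// alteration, and subscription creation) are rejected until recovery completes.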
    fn allow_in_recovery(&self) -> bool {
        match self {
            DdlCommand::DropDatabase(_)
            | DdlCommand::DropSchema(_, _)
            | DdlCommand::DropSource(_, _)
            | DdlCommand::DropFunction(_, _)
            | DdlCommand::DropView(_, _)
            | DdlCommand::DropStreamingJob { .. }
            | DdlCommand::DropConnection(_, _)
            | DdlCommand::DropSecret(_)
            | DdlCommand::DropSubscription(_, _)
            | DdlCommand::AlterName(_, _)
            | DdlCommand::AlterObjectOwner(_, _)
            | DdlCommand::AlterSetSchema(_, _)
            | DdlCommand::CreateDatabase(_)
            | DdlCommand::CreateSchema(_)
            | DdlCommand::CreateFunction(_)
            | DdlCommand::CreateView(_, _)
            | DdlCommand::CreateConnection(_)
            | DdlCommand::CommentOn(_)
            | DdlCommand::CreateSecret(_)
            | DdlCommand::AlterSecret(_)
            | DdlCommand::AlterSwapRename(_)
            | DdlCommand::AlterDatabaseParam(_, _) => true,
            DdlCommand::CreateStreamingJob { .. }
            | DdlCommand::CreateNonSharedSource(_)
            | DdlCommand::ReplaceStreamJob(_)
            | DdlCommand::AlterNonSharedSource(_)
            | DdlCommand::CreateSubscription(_) => false,
        }
    }
}

#[derive(Clone)]
pub struct DdlController {
    pub(crate) env: MetaSrvEnv,

    pub(crate) metadata_manager: MetadataManager,
    pub(crate) stream_manager: GlobalStreamManagerRef,
    pub(crate) source_manager: SourceManagerRef,
    barrier_manager: BarrierManagerRef,

    // Permits to limit the number of concurrently creating streaming jobs.
    pub(crate) creating_streaming_job_permits: Arc<CreatingStreamingJobPermit>,

    // Sequence number for DDL commands, used as the await-tree registry key.
    seq: Arc<AtomicU64>,
}

#[derive(Clone)]
pub struct CreatingStreamingJobPermit {
    pub(crate) semaphore: Arc<Semaphore>,
}

impl CreatingStreamingJobPermit {
    async fn new(env: &MetaSrvEnv) -> Self {
        let mut permits = env
            .system_params_reader()
            .await
            .max_concurrent_creating_streaming_jobs() as usize;
        if permits == 0 {
            permits = Semaphore::MAX_PERMITS;
        }
        let semaphore = Arc::new(Semaphore::new(permits));

        let (local_notification_tx, mut local_notification_rx) =
            tokio::sync::mpsc::unbounded_channel();
        env.notification_manager()
            .insert_local_sender(local_notification_tx);
        let semaphore_clone = semaphore.clone();
        tokio::spawn(async move {
            while let Some(notification) = local_notification_rx.recv().await {
                let LocalNotification::SystemParamsChange(p) = &notification else {
                    continue;
                };
                let mut new_permits = p.max_concurrent_creating_streaming_jobs() as usize;
                if new_permits == 0 {
                    new_permits = Semaphore::MAX_PERMITS;
                }
                match permits.cmp(&new_permits) {
                    Ordering::Less => {
                        semaphore_clone.add_permits(new_permits - permits);
                    }
                    Ordering::Equal => continue,
                    Ordering::Greater => {
                        let to_release = permits - new_permits;
                        let reduced = semaphore_clone.forget_permits(to_release);
                        if reduced != to_release {
                            tracing::warn!(
                                "not enough permits to release, expected {}, but reduced {}",
                                to_release,
                                reduced
                            );
                        }
                    }
                }
                tracing::info!(
                    "max_concurrent_creating_streaming_jobs changed from {} to {}",
                    permits,
                    new_permits
                );
                permits = new_permits;
            }
        });

        Self { semaphore }
    }
}

impl DdlController {
    pub async fn new(
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        stream_manager: GlobalStreamManagerRef,
        source_manager: SourceManagerRef,
        barrier_manager: BarrierManagerRef,
    ) -> Self {
        let creating_streaming_job_permits = Arc::new(CreatingStreamingJobPermit::new(&env).await);
        Self {
            env,
            metadata_manager,
            stream_manager,
            source_manager,
            barrier_manager,
            creating_streaming_job_permits,
            seq: Arc::new(AtomicU64::new(0)),
        }
    }

    /// Obtains the next sequence number, used to key DDL commands in the await-tree registry.
    pub fn next_seq(&self) -> u64 {
        self.seq.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
    }
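    /// Runs a [`DdlCommand`] to completion and returns the version to wait for.
    ///
    /// The command is dispatched on a separate tokio task that is registered in the await-tree
    /// registry under a fresh sequence number, so in-flight DDLs can be inspected and are not
    /// cancelled together with the RPC future. The returned [`WaitVersion`] carries the catalog
    /// notification version plus the current hummock version id, which the frontend waits on to
    /// observe the effects of the DDL.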
    pub async fn run_command(&self, command: DdlCommand) -> MetaResult<Option<WaitVersion>> {
        if !command.allow_in_recovery() {
            self.barrier_manager.check_status_running()?;
        }

        let await_tree_key = format!("DDL Command {}", self.next_seq());
        let await_tree_span = await_tree::span!("{command}({})", command.object());

        let ctrl = self.clone();
        let fut = async move {
            match command {
                DdlCommand::CreateDatabase(database) => ctrl.create_database(database).await,
                DdlCommand::DropDatabase(database_id) => ctrl.drop_database(database_id).await,
                DdlCommand::CreateSchema(schema) => ctrl.create_schema(schema).await,
                DdlCommand::DropSchema(schema_id, drop_mode) => {
                    ctrl.drop_schema(schema_id, drop_mode).await
                }
                DdlCommand::CreateNonSharedSource(source) => {
                    ctrl.create_non_shared_source(source).await
                }
                DdlCommand::DropSource(source_id, drop_mode) => {
                    ctrl.drop_source(source_id, drop_mode).await
                }
                DdlCommand::CreateFunction(function) => ctrl.create_function(function).await,
                DdlCommand::DropFunction(function_id, drop_mode) => {
                    ctrl.drop_function(function_id, drop_mode).await
                }
                DdlCommand::CreateView(view, dependencies) => {
                    ctrl.create_view(view, dependencies).await
                }
                DdlCommand::DropView(view_id, drop_mode) => {
                    ctrl.drop_view(view_id, drop_mode).await
                }
                DdlCommand::CreateStreamingJob {
                    stream_job,
                    fragment_graph,
                    dependencies,
                    specific_resource_group,
                    if_not_exists,
                } => {
                    ctrl.create_streaming_job(
                        stream_job,
                        fragment_graph,
                        dependencies,
                        specific_resource_group,
                        if_not_exists,
                    )
                    .await
                }
                DdlCommand::DropStreamingJob { job_id, drop_mode } => {
                    ctrl.drop_streaming_job(job_id, drop_mode).await
                }
                DdlCommand::ReplaceStreamJob(ReplaceStreamJobInfo {
                    streaming_job,
                    fragment_graph,
                }) => ctrl.replace_job(streaming_job, fragment_graph).await,
                DdlCommand::AlterName(relation, name) => ctrl.alter_name(relation, &name).await,
                DdlCommand::AlterObjectOwner(object, owner_id) => {
                    ctrl.alter_owner(object, owner_id).await
                }
                DdlCommand::AlterSetSchema(object, new_schema_id) => {
                    ctrl.alter_set_schema(object, new_schema_id).await
                }
                DdlCommand::CreateConnection(connection) => {
                    ctrl.create_connection(connection).await
                }
                DdlCommand::DropConnection(connection_id, drop_mode) => {
                    ctrl.drop_connection(connection_id, drop_mode).await
                }
                DdlCommand::CreateSecret(secret) => ctrl.create_secret(secret).await,
                DdlCommand::DropSecret(secret_id) => ctrl.drop_secret(secret_id).await,
                DdlCommand::AlterSecret(secret) => ctrl.alter_secret(secret).await,
                DdlCommand::AlterNonSharedSource(source) => {
                    ctrl.alter_non_shared_source(source).await
                }
                DdlCommand::CommentOn(comment) => ctrl.comment_on(comment).await,
                DdlCommand::CreateSubscription(subscription) => {
                    ctrl.create_subscription(subscription).await
                }
                DdlCommand::DropSubscription(subscription_id, drop_mode) => {
                    ctrl.drop_subscription(subscription_id, drop_mode).await
                }
                DdlCommand::AlterSwapRename(objects) => ctrl.alter_swap_rename(objects).await,
                DdlCommand::AlterDatabaseParam(database_id, param) => {
                    ctrl.alter_database_param(database_id, param).await
                }
            }
        }
        .in_current_span();
        let fut = (self.env.await_tree_reg())
            .register(await_tree_key, await_tree_span)
            .instrument(Box::pin(fut));
        let notification_version = tokio::spawn(fut).await.map_err(|e| anyhow!(e))??;
        Ok(Some(WaitVersion {
            catalog_version: notification_version,
            hummock_version_id: self.barrier_manager.get_hummock_version_id().await.to_u64(),
        }))
    }

    pub async fn get_ddl_progress(&self) -> MetaResult<Vec<DdlProgress>> {
        self.barrier_manager.get_ddl_progress().await
    }

    async fn create_database(&self, database: Database) -> MetaResult<NotificationVersion> {
        let (version, updated_db) = self
            .metadata_manager
            .catalog_controller
            .create_database(database)
            .await?;
        self.barrier_manager
            .update_database_barrier(
                updated_db.database_id,
                updated_db.barrier_interval_ms.map(|v| v as u32),
                updated_db.checkpoint_frequency.map(|v| v as u64),
            )
            .await?;
        Ok(version)
    }

    #[tracing::instrument(skip(self), level = "debug")]
    pub async fn reschedule_streaming_job(
        &self,
        job_id: u32,
        target: JobRescheduleTarget,
        mut deferred: bool,
    ) -> MetaResult<()> {
        tracing::info!("alter parallelism");
        if self.barrier_manager.check_status_running().is_err() {
            tracing::info!(
                "alter parallelism is set to deferred mode because the system is in recovery state"
            );
            deferred = true;
        }

        self.stream_manager
            .reschedule_streaming_job(job_id, target, deferred)
            .await
    }

    pub async fn reschedule_cdc_table_backfill(
        &self,
        job_id: u32,
        target: JobRescheduleTarget,
    ) -> MetaResult<()> {
        tracing::info!("alter CDC table backfill parallelism");
        if self.barrier_manager.check_status_running().is_err() {
            return Err(anyhow::anyhow!(
                "CDC table backfill reschedule is unavailable because the system is in recovery state"
            )
            .into());
        }
        self.stream_manager
            .reschedule_cdc_table_backfill(job_id, target)
            .await
    }

    async fn drop_database(&self, database_id: DatabaseId) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Database, database_id as _, DropMode::Cascade)
            .await
    }

    async fn create_schema(&self, schema: Schema) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .create_schema(schema)
            .await
    }

    async fn drop_schema(
        &self,
        schema_id: SchemaId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Schema, schema_id as _, drop_mode)
            .await
    }

    async fn create_non_shared_source(&self, source: Source) -> MetaResult<NotificationVersion> {
        let handle = create_source_worker(&source, self.source_manager.metrics.clone())
            .await
            .context("failed to create source worker")?;

        let (source_id, version) = self
            .metadata_manager
            .catalog_controller
            .create_source(source)
            .await?;
        self.source_manager
            .register_source_with_handle(source_id, handle)
            .await;
        Ok(version)
    }

    async fn drop_source(
        &self,
        source_id: SourceId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Source, source_id as _, drop_mode)
            .await
    }

    async fn alter_non_shared_source(&self, source: Source) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .alter_non_shared_source(source)
            .await
    }

    async fn create_function(&self, function: Function) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .create_function(function)
            .await
    }

    async fn drop_function(
        &self,
        function_id: FunctionId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Function, function_id as _, drop_mode)
            .await
    }

    async fn create_view(
        &self,
        view: View,
        dependencies: HashSet<ObjectId>,
    ) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .create_view(view, dependencies)
            .await
    }

    async fn drop_view(
        &self,
        view_id: ViewId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::View, view_id as _, drop_mode)
            .await
    }

    async fn create_connection(&self, connection: Connection) -> MetaResult<NotificationVersion> {
        validate_connection(&connection).await?;
        self.metadata_manager
            .catalog_controller
            .create_connection(connection)
            .await
    }

    async fn drop_connection(
        &self,
        connection_id: ConnectionId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Connection, connection_id as _, drop_mode)
            .await
    }

    async fn alter_database_param(
        &self,
        database_id: DatabaseId,
        param: AlterDatabaseParam,
    ) -> MetaResult<NotificationVersion> {
        let (version, updated_db) = self
            .metadata_manager
            .catalog_controller
            .alter_database_param(database_id, param)
            .await?;
        self.barrier_manager
            .update_database_barrier(
                database_id,
                updated_db.barrier_interval_ms.map(|v| v as u32),
                updated_db.checkpoint_frequency.map(|v| v as u64),
            )
            .await?;
        Ok(version)
    }

    fn get_encrypted_payload(&self, secret: &Secret) -> MetaResult<Vec<u8>> {
        let secret_store_private_key = self
            .env
            .opts
            .secret_store_private_key
            .clone()
            .ok_or_else(|| anyhow!("secret_store_private_key is not configured"))?;

        let encrypted_payload = SecretEncryption::encrypt(
            secret_store_private_key.as_slice(),
            secret.get_value().as_slice(),
        )
        .context(format!("failed to encrypt secret {}", secret.name))?;
        Ok(encrypted_payload
            .serialize()
            .context(format!("failed to serialize secret {}", secret.name))?)
    }
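    /// Note on secret handling: the value is encrypted with the configured
    /// `secret_store_private_key` before it is persisted, while the plaintext payload is passed
    /// alongside, presumably so it can be broadcast to the in-memory [`LocalSecretManager`] on
    /// worker nodes.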
    async fn create_secret(&self, mut secret: Secret) -> MetaResult<NotificationVersion> {
        let secret_plain_payload = secret.value.clone();
        let encrypted_payload = self.get_encrypted_payload(&secret)?;
        secret.value = encrypted_payload;

        self.metadata_manager
            .catalog_controller
            .create_secret(secret, secret_plain_payload)
            .await
    }

    async fn drop_secret(&self, secret_id: SecretId) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Secret, secret_id as _, DropMode::Restrict)
            .await
    }

    async fn alter_secret(&self, mut secret: Secret) -> MetaResult<NotificationVersion> {
        let secret_plain_payload = secret.value.clone();
        let encrypted_payload = self.get_encrypted_payload(&secret)?;
        secret.value = encrypted_payload;
        self.metadata_manager
            .catalog_controller
            .alter_secret(secret, secret_plain_payload)
            .await
    }

    async fn create_subscription(
        &self,
        mut subscription: Subscription,
    ) -> MetaResult<NotificationVersion> {
        tracing::debug!("create subscription");
        let _permit = self
            .creating_streaming_job_permits
            .semaphore
            .acquire()
            .await
            .unwrap();
        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
        self.metadata_manager
            .catalog_controller
            .create_subscription_catalog(&mut subscription)
            .await?;
        if let Err(err) = self.stream_manager.create_subscription(&subscription).await {
            tracing::debug!(error = %err.as_report(), "failed to create subscription");
            let _ = self
                .metadata_manager
                .catalog_controller
                .try_abort_creating_subscription(subscription.id as _)
                .await
                .inspect_err(|e| {
                    tracing::error!(
                        error = %e.as_report(),
                        "failed to abort create subscription after failure"
                    );
                });
            return Err(err);
        }

        let version = self
            .metadata_manager
            .catalog_controller
            .notify_create_subscription(subscription.id)
            .await?;
        tracing::debug!("finish create subscription");
        Ok(version)
    }

    async fn drop_subscription(
        &self,
        subscription_id: SubscriptionId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        tracing::debug!("preparing drop subscription");
        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
        let subscription = self
            .metadata_manager
            .catalog_controller
            .get_subscription_by_id(subscription_id)
            .await?;
        let table_id = subscription.dependent_table_id;
        let database_id = subscription.database_id.into();
        let (_, version) = self
            .metadata_manager
            .catalog_controller
            .drop_object(ObjectType::Subscription, subscription_id as _, drop_mode)
            .await?;
        self.stream_manager
            .drop_subscription(database_id, subscription_id as _, table_id)
            .await;
        tracing::debug!("finish drop subscription");
        Ok(version)
    }
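    /// Validates a CDC table before creation: the job must contain exactly one stream-scan
    /// fragment, a non-parallelized CDC backfill must run as a single actor, and the upstream
    /// connector properties must be usable, which is checked by constructing a split enumerator
    /// against the external source.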
    #[await_tree::instrument]
    pub(crate) async fn validate_cdc_table(
        &self,
        table: &Table,
        table_fragments: &StreamJobFragments,
    ) -> MetaResult<()> {
        let stream_scan_fragment = table_fragments
            .fragments
            .values()
            .filter(|f| {
                f.fragment_type_mask.contains(FragmentTypeFlag::StreamScan)
                    || f.fragment_type_mask
                        .contains(FragmentTypeFlag::StreamCdcScan)
            })
            .exactly_one()
            .ok()
            .with_context(|| {
                format!(
                    "expect exactly one stream scan fragment, got: {:?}",
                    table_fragments.fragments
                )
            })?;
        fn assert_parallelism(stream_scan_fragment: &Fragment, node_body: &Option<NodeBody>) {
            if let Some(NodeBody::StreamCdcScan(node)) = node_body {
                if let Some(o) = node.options
                    && CdcScanOptions::from_proto(&o).is_parallelized_backfill()
                {
                    // Parallelized CDC backfill may run with multiple actors.
                } else {
                    assert_eq!(
                        stream_scan_fragment.actors.len(),
                        1,
                        "Stream scan fragment should have only one actor"
                    );
                }
            }
        }
        let mut found_cdc_scan = false;
        match &stream_scan_fragment.nodes.node_body {
            Some(NodeBody::StreamCdcScan(_)) => {
                assert_parallelism(stream_scan_fragment, &stream_scan_fragment.nodes.node_body);
                if self
                    .validate_cdc_table_inner(&stream_scan_fragment.nodes.node_body, table.id)
                    .await?
                {
                    found_cdc_scan = true;
                }
            }
            // The `StreamCdcScan` may be wrapped under a `Project`; check each input in that case.
            Some(NodeBody::Project(_)) => {
                for input in &stream_scan_fragment.nodes.input {
                    assert_parallelism(stream_scan_fragment, &input.node_body);
                    if self
                        .validate_cdc_table_inner(&input.node_body, table.id)
                        .await?
                    {
                        found_cdc_scan = true;
                    }
                }
            }
            _ => {
                bail!("Unexpected node body for stream cdc scan");
            }
        };
        if !found_cdc_scan {
            bail!("No stream cdc scan node found in stream scan fragment");
        }
        Ok(())
    }

    async fn validate_cdc_table_inner(
        &self,
        node_body: &Option<NodeBody>,
        table_id: u32,
    ) -> MetaResult<bool> {
        let meta_store = self.env.meta_store_ref();
        if let Some(NodeBody::StreamCdcScan(stream_cdc_scan)) = node_body
            && let Some(ref cdc_table_desc) = stream_cdc_scan.cdc_table_desc
        {
            let options_with_secret = WithOptionsSecResolved::new(
                cdc_table_desc.connect_properties.clone(),
                cdc_table_desc.secret_refs.clone(),
            );

            let mut props = ConnectorProperties::extract(options_with_secret, true)?;
            props.init_from_pb_cdc_table_desc(cdc_table_desc);

            // Try creating a split enumerator to validate the connector properties.
            let _enumerator = props
                .create_split_enumerator(SourceEnumeratorContext::dummy().into())
                .await?;

            if is_parallelized_backfill_enabled(stream_cdc_scan) {
                try_init_parallel_cdc_table_snapshot_splits(
                    table_id,
                    cdc_table_desc,
                    meta_store,
                    &stream_cdc_scan.options,
                    self.env.opts.cdc_table_split_init_insert_batch_size,
                    self.env.opts.cdc_table_split_init_sleep_interval_splits,
                    self.env.opts.cdc_table_split_init_sleep_duration_millis,
                )
                .await?;
            }

            tracing::debug!(?table_id, "validate cdc table success");
            Ok(true)
        } else {
            Ok(false)
        }
    }

    pub async fn validate_table_for_sink(&self, table_id: TableId) -> MetaResult<()> {
        let migrated = self
            .metadata_manager
            .catalog_controller
            .has_table_been_migrated(table_id)
            .await?;
        if !migrated {
            Err(anyhow::anyhow!(
                "Creating sink into table is not allowed for unmigrated table {}. Please migrate it first.",
                table_id
            )
            .into())
        } else {
            Ok(())
        }
    }
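    /// End-to-end creation of a streaming job (materialized view, sink, table, index, or shared
    /// source): registers the job catalog (honoring `if_not_exists` on duplicates), acquires a
    /// creating-job permit and the reschedule read lock, then builds and schedules the stream
    /// graph. On failure the job catalog is aborted and any registered source worker is dropped,
    /// so a failed creation leaves no residue behind.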
    #[await_tree::instrument(boxed, "create_streaming_job({streaming_job})")]
    pub async fn create_streaming_job(
        &self,
        mut streaming_job: StreamingJob,
        fragment_graph: StreamFragmentGraphProto,
        dependencies: HashSet<ObjectId>,
        specific_resource_group: Option<String>,
        if_not_exists: bool,
    ) -> MetaResult<NotificationVersion> {
        if let StreamingJob::Sink(sink) = &streaming_job
            && let Some(target_table) = sink.target_table
        {
            self.validate_table_for_sink(target_table as _).await?;
        }
        let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());
        let check_ret = self
            .metadata_manager
            .catalog_controller
            .create_job_catalog(
                &mut streaming_job,
                &ctx,
                &fragment_graph.parallelism,
                fragment_graph.max_parallelism as _,
                dependencies,
                specific_resource_group.clone(),
            )
            .await;
        if let Err(meta_err) = check_ret {
            if !if_not_exists {
                return Err(meta_err);
            }
            return if let MetaErrorInner::Duplicated(_, _, Some(job_id)) = meta_err.inner() {
                if streaming_job.create_type() == CreateType::Foreground {
                    let database_id = streaming_job.database_id();
                    self.metadata_manager
                        .wait_streaming_job_finished(database_id.into(), *job_id)
                        .await
                } else {
                    Ok(IGNORED_NOTIFICATION_VERSION)
                }
            } else {
                Err(meta_err)
            };
        }
        let job_id = streaming_job.id();
        tracing::debug!(
            id = job_id,
            definition = streaming_job.definition(),
            create_type = streaming_job.create_type().as_str_name(),
            job_type = ?streaming_job.job_type(),
            "starting streaming job",
        );
        let permit = self
            .creating_streaming_job_permits
            .semaphore
            .clone()
            .acquire_owned()
            .instrument_await("acquire_creating_streaming_job_permit")
            .await
            .unwrap();
        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;

        let name = streaming_job.name();
        let definition = streaming_job.definition();
        let source_id = match &streaming_job {
            StreamingJob::Table(Some(src), _, _) | StreamingJob::Source(src) => Some(src.id),
            _ => None,
        };

        match self
            .create_streaming_job_inner(
                ctx,
                streaming_job,
                fragment_graph,
                specific_resource_group,
                permit,
            )
            .await
        {
            Ok(version) => Ok(version),
            Err(err) => {
                tracing::error!(id = job_id, error = %err.as_report(), "failed to create streaming job");
                let event = risingwave_pb::meta::event_log::EventCreateStreamJobFail {
                    id: job_id,
                    name,
                    definition,
                    error: err.as_report().to_string(),
                };
                self.env.event_log_manager_ref().add_event_logs(vec![
                    risingwave_pb::meta::event_log::Event::CreateStreamJobFail(event),
                ]);
                let (aborted, _) = self
                    .metadata_manager
                    .catalog_controller
                    .try_abort_creating_streaming_job(job_id as _, false)
                    .await?;
                if aborted {
                    tracing::warn!(id = job_id, "aborted streaming job");
                    // If the job registered a source, drop its source worker as well.
                    if let Some(source_id) = source_id {
                        self.source_manager
                            .apply_source_change(SourceChange::DropSource {
                                dropped_source_ids: vec![source_id as SourceId],
                            })
                            .await;
                    }
                }
                Err(err)
            }
        }
    }
    #[await_tree::instrument(boxed)]
    async fn create_streaming_job_inner(
        &self,
        ctx: StreamContext,
        mut streaming_job: StreamingJob,
        fragment_graph: StreamFragmentGraphProto,
        specific_resource_group: Option<String>,
        permit: OwnedSemaphorePermit,
    ) -> MetaResult<NotificationVersion> {
        let mut fragment_graph =
            StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
        streaming_job.set_info_from_graph(&fragment_graph);

        // Create catalogs for the internal tables and refill their ids into the graph.
        let incomplete_internal_tables = fragment_graph
            .incomplete_internal_tables()
            .into_values()
            .collect_vec();
        let table_id_map = self
            .metadata_manager
            .catalog_controller
            .create_internal_table_catalog(&streaming_job, incomplete_internal_tables)
            .await?;
        fragment_graph.refill_internal_table_ids(table_id_map);

        tracing::debug!(id = streaming_job.id(), "building streaming job");
        let (ctx, stream_job_fragments) = self
            .build_stream_job(ctx, streaming_job, fragment_graph, specific_resource_group)
            .await?;

        let streaming_job = &ctx.streaming_job;

        match streaming_job {
            StreamingJob::Table(None, table, TableJobType::SharedCdcSource) => {
                self.validate_cdc_table(table, &stream_job_fragments)
                    .await?;
            }
            StreamingJob::Table(Some(source), ..) => {
                // Register the source worker on the meta node.
                self.source_manager.register_source(source).await?;
                let connector_name = source
                    .get_with_properties()
                    .get(UPSTREAM_SOURCE_KEY)
                    .cloned();
                let attr = source.info.as_ref().map(|source_info| {
                    jsonbb::json!({
                        "format": source_info.format().as_str_name(),
                        "encode": source_info.row_encode().as_str_name(),
                    })
                });
                report_create_object(
                    streaming_job.id(),
                    "source",
                    PbTelemetryDatabaseObject::Source,
                    connector_name,
                    attr,
                );
            }
            StreamingJob::Sink(sink) => {
                if sink.auto_refresh_schema_from_table.is_some() {
                    check_sink_fragments_support_refresh_schema(&stream_job_fragments.fragments)?
                }
                // Validate the sink against the external system before creation.
                validate_sink(sink).await?;
                let connector_name = sink.get_properties().get(UPSTREAM_SOURCE_KEY).cloned();
                let attr = sink.format_desc.as_ref().map(|sink_info| {
                    jsonbb::json!({
                        "format": sink_info.format().as_str_name(),
                        "encode": sink_info.encode().as_str_name(),
                    })
                });
                report_create_object(
                    streaming_job.id(),
                    "sink",
                    PbTelemetryDatabaseObject::Sink,
                    connector_name,
                    attr,
                );
            }
            StreamingJob::Source(source) => {
                // Register the source worker on the meta node.
                self.source_manager.register_source(source).await?;
                let connector_name = source
                    .get_with_properties()
                    .get(UPSTREAM_SOURCE_KEY)
                    .cloned();
                let attr = source.info.as_ref().map(|source_info| {
                    jsonbb::json!({
                        "format": source_info.format().as_str_name(),
                        "encode": source_info.row_encode().as_str_name(),
                    })
                });
                report_create_object(
                    streaming_job.id(),
                    "source",
                    PbTelemetryDatabaseObject::Source,
                    connector_name,
                    attr,
                );
            }
            _ => {}
        }

        self.metadata_manager
            .catalog_controller
            .prepare_stream_job_fragments(&stream_job_fragments, streaming_job, false)
            .await?;

        let stream_job_id = streaming_job.id();
        match streaming_job.create_type() {
            CreateType::Unspecified | CreateType::Foreground => {
                let version = self
                    .stream_manager
                    .create_streaming_job(stream_job_fragments, ctx, None)
                    .await?;
                Ok(version)
            }
            CreateType::Background => {
                let await_tree_key = format!("Background DDL Worker ({})", stream_job_id);
                let await_tree_span =
                    span!("{:?}({})", streaming_job.job_type(), streaming_job.name());

                let ctrl = self.clone();
                let (tx, rx) = oneshot::channel();
                let fut = async move {
                    let _ = ctrl
                        .stream_manager
                        .create_streaming_job(stream_job_fragments, ctx, Some(tx))
                        .await
                        .inspect_err(|err| {
                            tracing::error!(id = stream_job_id, error = ?err.as_report(), "failed to create background streaming job");
                        });
                    // Hold the creating-job permit until the background creation finishes.
                    drop(permit);
                };

                let fut = (self.env.await_tree_reg())
                    .register(await_tree_key, await_tree_span)
                    .instrument(fut);
                tokio::spawn(fut);

                rx.instrument_await("wait_background_streaming_job_creation_started")
                    .await
                    .map_err(|_| {
                        anyhow!(
                            "failed to receive create streaming job result of job: {}",
                            stream_job_id
                        )
                    })??;
                Ok(IGNORED_NOTIFICATION_VERSION)
            }
        }
    }
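    /// Drops an object from the catalog together with everything it transitively owns, as
    /// described by the returned [`ReleaseContext`]: streaming jobs and their state tables,
    /// fragments and actors, source workers, and secrets cached in the [`LocalSecretManager`].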
    pub async fn drop_object(
        &self,
        object_type: ObjectType,
        object_id: ObjectId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        let (release_ctx, version) = self
            .metadata_manager
            .catalog_controller
            .drop_object(object_type, object_id, drop_mode)
            .await?;

        let ReleaseContext {
            database_id,
            removed_streaming_job_ids,
            removed_state_table_ids,
            removed_source_ids,
            removed_secret_ids: secret_ids,
            removed_source_fragments,
            removed_actors,
            removed_fragments,
            removed_sink_fragment_by_targets,
        } = release_ctx;

        let _guard = self.source_manager.pause_tick().await;
        self.stream_manager
            .drop_streaming_jobs(
                risingwave_common::catalog::DatabaseId::new(database_id as _),
                removed_actors.iter().map(|id| *id as _).collect(),
                removed_streaming_job_ids,
                removed_state_table_ids,
                removed_fragments.iter().map(|id| *id as _).collect(),
                removed_sink_fragment_by_targets
                    .into_iter()
                    .map(|(target, sinks)| {
                        (target as _, sinks.into_iter().map(|id| id as _).collect())
                    })
                    .collect(),
            )
            .await;

        // Drop the source workers of any removed sources.
        self.source_manager
            .apply_source_change(SourceChange::DropSource {
                dropped_source_ids: removed_source_ids.into_iter().map(|id| id as _).collect(),
            })
            .await;

        // Unregister the fragments and actors that were consuming sources.
        let dropped_source_fragments = removed_source_fragments
            .into_iter()
            .map(|(source_id, fragments)| {
                (
                    source_id,
                    fragments.into_iter().map(|id| id as u32).collect(),
                )
            })
            .collect();
        let dropped_actors = removed_actors.iter().map(|id| *id as _).collect();
        self.source_manager
            .apply_source_change(SourceChange::DropMv {
                dropped_source_fragments,
                dropped_actors,
            })
            .await;

        // Remove the dropped secrets from the local secret manager.
        for secret in secret_ids {
            LocalSecretManager::global().remove_secret(secret as _);
        }
        Ok(version)
    }
    #[await_tree::instrument(boxed, "replace_streaming_job({streaming_job})")]
    pub async fn replace_job(
        &self,
        mut streaming_job: StreamingJob,
        fragment_graph: StreamFragmentGraphProto,
    ) -> MetaResult<NotificationVersion> {
        match &streaming_job {
            StreamingJob::Table(..)
            | StreamingJob::Source(..)
            | StreamingJob::MaterializedView(..) => {}
            StreamingJob::Sink(..) | StreamingJob::Index(..) => {
                bail_not_implemented!("schema change for {}", streaming_job.job_type_str())
            }
        }

        let job_id = streaming_job.id();

        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
        let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());

        // The max parallelism must not change when replacing the job.
        let original_max_parallelism = self
            .metadata_manager
            .get_job_max_parallelism(streaming_job.id().into())
            .await?;
        let fragment_graph = PbStreamFragmentGraph {
            max_parallelism: original_max_parallelism as _,
            ..fragment_graph
        };

        // Build the fragment graph and fill the job info derived from it.
        let fragment_graph = StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
        streaming_job.set_info_from_graph(&fragment_graph);

        // Make the streaming job immutable from now on.
        let streaming_job = streaming_job;

        let auto_refresh_schema_sinks = if let StreamingJob::Table(_, table, _) = &streaming_job {
            let auto_refresh_schema_sinks = self
                .metadata_manager
                .catalog_controller
                .get_sink_auto_refresh_schema_from(table.id as _)
                .await?;
            if !auto_refresh_schema_sinks.is_empty() {
                let original_table_columns = self
                    .metadata_manager
                    .catalog_controller
                    .get_table_columns(table.id as _)
                    .await?;
                let mut original_table_column_ids: HashSet<_> = original_table_columns
                    .iter()
                    .map(|col| col.column_id())
                    .collect();
                let newly_added_columns = table
                    .columns
                    .iter()
                    .filter(|col| {
                        !original_table_column_ids.remove(&ColumnId::new(
                            col.column_desc.as_ref().unwrap().column_id as _,
                        ))
                    })
                    .map(|col| ColumnCatalog::from(col.clone()))
                    .collect_vec();
                if !original_table_column_ids.is_empty() {
                    return Err(anyhow!(
                        "new table columns do not contain all original columns. new: {:?}, original: {:?}, not included: {:?}",
                        table.columns,
                        original_table_columns,
                        original_table_column_ids
                    )
                    .into());
                }
                let mut sinks = Vec::with_capacity(auto_refresh_schema_sinks.len());
                for sink in auto_refresh_schema_sinks {
                    let sink_job_fragments = self
                        .metadata_manager
                        .get_job_fragments_by_id(&risingwave_common::catalog::TableId::new(sink.id))
                        .await?;
                    if sink_job_fragments.fragments.len() != 1 {
                        return Err(anyhow!(
                            "auto schema refresh sink must have only one fragment, but got {}",
                            sink_job_fragments.fragments.len()
                        )
                        .into());
                    }
                    let original_sink_fragment =
                        sink_job_fragments.fragments.into_values().next().unwrap();
                    let (new_sink_fragment, new_schema, new_log_store_table) =
                        rewrite_refresh_schema_sink_fragment(
                            &original_sink_fragment,
                            &sink,
                            &newly_added_columns,
                            table,
                            fragment_graph.table_fragment_id(),
                            self.env.id_gen_manager(),
                        )?;

                    assert_eq!(
                        original_sink_fragment.actors.len(),
                        new_sink_fragment.actors.len()
                    );
                    let actor_status = (0..original_sink_fragment.actors.len())
                        .map(|i| {
                            let worker_node_id = sink_job_fragments.actor_status
                                [&original_sink_fragment.actors[i].actor_id]
                                .location
                                .as_ref()
                                .unwrap()
                                .worker_node_id;
                            (
                                new_sink_fragment.actors[i].actor_id,
                                PbActorStatus {
                                    location: Some(PbActorLocation { worker_node_id }),
                                    state: PbActorState::Inactive as _,
                                },
                            )
                        })
                        .collect();

                    let streaming_job = StreamingJob::Sink(sink);

                    let tmp_sink_id = self
                        .metadata_manager
                        .catalog_controller
                        .create_job_catalog_for_replace(&streaming_job, None, None, None)
                        .await?;
                    let StreamingJob::Sink(sink) = streaming_job else {
                        unreachable!()
                    };

                    sinks.push(AutoRefreshSchemaSinkContext {
                        tmp_sink_id,
                        original_sink: sink,
                        original_fragment: original_sink_fragment,
                        new_schema,
                        newly_add_fields: newly_added_columns
                            .iter()
                            .map(|col| Field::from(&col.column_desc))
                            .collect(),
                        new_fragment: new_sink_fragment,
                        new_log_store_table,
                        actor_status,
                    });
                }
                Some(sinks)
            } else {
                None
            }
        } else {
            None
        };

        let tmp_id = self
            .metadata_manager
            .catalog_controller
            .create_job_catalog_for_replace(
                &streaming_job,
                Some(&ctx),
                fragment_graph.specified_parallelism().as_ref(),
                Some(fragment_graph.max_parallelism()),
            )
            .await?;

        let tmp_sink_ids = auto_refresh_schema_sinks
            .as_ref()
            .map(|sinks| sinks.iter().map(|sink| sink.tmp_sink_id).collect_vec());

        tracing::debug!(id = job_id, "building replace streaming job");
        let mut updated_sink_catalogs = vec![];

        let mut drop_table_connector_ctx = None;
        let result: MetaResult<_> = try {
            let (mut ctx, mut stream_job_fragments) = self
                .build_replace_job(
                    ctx,
                    &streaming_job,
                    fragment_graph,
                    tmp_id as _,
                    auto_refresh_schema_sinks,
                )
                .await?;
            drop_table_connector_ctx = ctx.drop_table_connector_ctx.clone();
            let auto_refresh_schema_sink_finish_ctx =
                ctx.auto_refresh_schema_sinks.as_ref().map(|sinks| {
                    sinks
                        .iter()
                        .map(|sink| FinishAutoRefreshSchemaSinkContext {
                            tmp_sink_id: sink.tmp_sink_id,
                            original_sink_id: sink.original_sink.id as _,
                            columns: sink.new_schema.clone(),
                            new_log_store_table: sink
                                .new_log_store_table
                                .as_ref()
                                .map(|table| (table.id as _, table.columns.clone())),
                        })
                        .collect()
                });

            if let StreamingJob::Table(_, table, ..) = &streaming_job {
                let union_fragment = stream_job_fragments.inner.union_fragment_for_table();
                let upstream_infos = self
                    .metadata_manager
                    .catalog_controller
                    .get_all_upstream_sink_infos(table, union_fragment.fragment_id as _)
                    .await?;
                refill_upstream_sink_union_in_table(&mut union_fragment.nodes, &upstream_infos);

                for upstream_info in &upstream_infos {
                    let upstream_fragment_id = upstream_info.sink_fragment_id;
                    ctx.upstream_fragment_downstreams
                        .entry(upstream_fragment_id)
                        .or_default()
                        .push(upstream_info.new_sink_downstream.clone());
                    if upstream_info.sink_original_target_columns.is_empty() {
                        updated_sink_catalogs.push(upstream_info.sink_id);
                    }
                }
            }

            let replace_upstream = ctx.replace_upstream.clone();

            if let Some(sinks) = &ctx.auto_refresh_schema_sinks {
                let empty_actor_splits = HashMap::new();
                let empty_downstreams = FragmentDownstreamRelation::default();
                for sink in sinks {
                    self.metadata_manager
                        .catalog_controller
                        .prepare_streaming_job(
                            sink.tmp_sink_id,
                            || [&sink.new_fragment].into_iter(),
                            &sink.actor_status,
                            &empty_actor_splits,
                            &empty_downstreams,
                            true,
                            None,
                        )
                        .await?;
                }
            }

            self.metadata_manager
                .catalog_controller
                .prepare_stream_job_fragments(&stream_job_fragments, &streaming_job, true)
                .await?;

            self.stream_manager
                .replace_stream_job(stream_job_fragments, ctx)
                .await?;
            (replace_upstream, auto_refresh_schema_sink_finish_ctx)
        };

        match result {
            Ok((replace_upstream, auto_refresh_schema_sink_finish_ctx)) => {
                let version = self
                    .metadata_manager
                    .catalog_controller
                    .finish_replace_streaming_job(
                        tmp_id,
                        streaming_job,
                        replace_upstream,
                        SinkIntoTableContext {
                            updated_sink_catalogs,
                        },
                        drop_table_connector_ctx.as_ref(),
                        auto_refresh_schema_sink_finish_ctx,
                    )
                    .await?;
                if let Some(drop_table_connector_ctx) = &drop_table_connector_ctx {
                    self.source_manager
                        .apply_source_change(SourceChange::DropSource {
                            dropped_source_ids: vec![drop_table_connector_ctx.to_remove_source_id],
                        })
                        .await;
                }
                Ok(version)
            }
            Err(err) => {
                tracing::error!(id = job_id, error = ?err.as_report(), "failed to replace job");
                let _ = self
                    .metadata_manager
                    .catalog_controller
                    .try_abort_replacing_streaming_job(tmp_id, tmp_sink_ids)
                    .await
                    .inspect_err(|err| {
                        tracing::error!(id = job_id, error = ?err.as_report(), "failed to abort replacing job");
                    });
                Err(err)
            }
        }
    }
    #[await_tree::instrument(boxed, "drop_streaming_job{}({job_id})", if let DropMode::Cascade = drop_mode { "_cascade" } else { "" })]
    async fn drop_streaming_job(
        &self,
        job_id: StreamingJobId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;

        let (object_id, object_type) = match job_id {
            StreamingJobId::MaterializedView(id) => (id as _, ObjectType::Table),
            StreamingJobId::Sink(id) => (id as _, ObjectType::Sink),
            StreamingJobId::Table(_, id) => (id as _, ObjectType::Table),
            StreamingJobId::Index(idx) => (idx as _, ObjectType::Index),
        };

        let job_status = self
            .metadata_manager
            .catalog_controller
            .get_streaming_job_status(job_id.id())
            .await?;
        let version = match job_status {
            JobStatus::Initial => {
                unreachable!(
                    "Job with Initial status should not notify frontend and therefore should not arrive here"
                );
            }
            JobStatus::Creating => {
                let canceled_jobs = self
                    .stream_manager
                    .cancel_streaming_jobs(vec![(job_id.id() as u32).into()])
                    .await;
                if canceled_jobs.is_empty() {
                    tracing::warn!(job_id = job_id.id(), "failed to cancel streaming job");
                }
                IGNORED_NOTIFICATION_VERSION
            }
            JobStatus::Created => {
                let version = self.drop_object(object_type, object_id, drop_mode).await?;
                // Clean up exactly-once iceberg sink metadata for dropped sinks.
                #[cfg(not(madsim))]
                if let StreamingJobId::Sink(sink_id) = job_id {
                    let db = self.env.meta_store_ref().conn.clone();
                    clean_all_rows_by_sink_id(&db, sink_id).await?;
                }
                version
            }
        };

        Ok(version)
    }
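    /// Resolves the parallelism a streaming job starts with inside a resource group. A worked
    /// example of the branches below (illustrative numbers): with 8 slots available and
    /// `max = 4`, an explicit `specified = 6` is rejected for exceeding the max, `specified = 3`
    /// is used as-is, and an unspecified parallelism under `DefaultParallelism::Full` is clamped
    /// to `min(8, 4) = 4`.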
    fn resolve_stream_parallelism(
        &self,
        specified: Option<NonZeroUsize>,
        max: NonZeroUsize,
        cluster_info: &StreamingClusterInfo,
        resource_group: String,
    ) -> MetaResult<NonZeroUsize> {
        let available = cluster_info.parallelism(&resource_group);
        let Some(available) = NonZeroUsize::new(available) else {
            bail_unavailable!(
                "no available slots to schedule in resource group \"{}\", \
                 have you allocated any compute nodes within this resource group?",
                resource_group
            );
        };

        if let Some(specified) = specified {
            if specified > max {
                bail_invalid_parameter!(
                    "specified parallelism {} should not exceed max parallelism {}",
                    specified,
                    max,
                );
            }
            if specified > available {
                bail_unavailable!(
                    "insufficient parallelism to schedule in resource group \"{}\", \
                     required: {}, available: {}",
                    resource_group,
                    specified,
                    available,
                );
            }
            Ok(specified)
        } else {
            // Use the default parallelism from the configuration when none is specified.
            let default_parallelism = match self.env.opts.default_parallelism {
                DefaultParallelism::Full => available,
                DefaultParallelism::Default(num) => {
                    if num > available {
                        bail_unavailable!(
                            "insufficient parallelism to schedule in resource group \"{}\", \
                             required: {}, available: {}",
                            resource_group,
                            num,
                            available,
                        );
                    }
                    num
                }
            };

            if default_parallelism > max {
                tracing::warn!(
                    max_parallelism = max.get(),
                    resource_group,
                    "available parallelism exceeds max parallelism, using max parallelism instead",
                );
            }
            Ok(default_parallelism.min(max))
        }
    }
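    /// Builds a streaming job from its fragment graph: resolves upstream root fragments and
    /// completes the graph with them, decides the resource group and the actual parallelism,
    /// then expands the fragment graph into a concrete actor graph, returning the creation
    /// context together with the fragments to create.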
    #[await_tree::instrument]
    pub(crate) async fn build_stream_job(
        &self,
        stream_ctx: StreamContext,
        mut stream_job: StreamingJob,
        fragment_graph: StreamFragmentGraph,
        specific_resource_group: Option<String>,
    ) -> MetaResult<(CreateStreamingJobContext, StreamJobFragmentsToCreate)> {
        let id = stream_job.id();
        let specified_parallelism = fragment_graph.specified_parallelism();
        let expr_context = stream_ctx.to_expr_context();
        let max_parallelism = NonZeroUsize::new(fragment_graph.max_parallelism()).unwrap();

        let fragment_backfill_ordering = fragment_graph.create_fragment_backfill_ordering();

        // 1. Resolve the upstream fragments and extend the fragment graph to a complete graph
        // that contains all information needed for building the actor graph.
        let (snapshot_backfill_info, cross_db_snapshot_backfill_info) =
            fragment_graph.collect_snapshot_backfill_info()?;
        assert!(
            snapshot_backfill_info
                .iter()
                .chain([&cross_db_snapshot_backfill_info])
                .flat_map(|info| info.upstream_mv_table_id_to_backfill_epoch.values())
                .all(|backfill_epoch| backfill_epoch.is_none()),
            "should not set backfill epoch when initially build the job: {:?} {:?}",
            snapshot_backfill_info,
            cross_db_snapshot_backfill_info
        );

        self.metadata_manager
            .catalog_controller
            .validate_cross_db_snapshot_backfill(&cross_db_snapshot_backfill_info)
            .await?;

        let upstream_table_ids = fragment_graph
            .dependent_table_ids()
            .iter()
            .filter(|id| {
                !cross_db_snapshot_backfill_info
                    .upstream_mv_table_id_to_backfill_epoch
                    .contains_key(id)
            })
            .cloned()
            .collect();

        let (upstream_root_fragments, existing_actor_location) = self
            .metadata_manager
            .get_upstream_root_fragments(&upstream_table_ids)
            .await?;

        if snapshot_backfill_info.is_some() {
            match stream_job {
                StreamingJob::MaterializedView(_)
                | StreamingJob::Sink(_)
                | StreamingJob::Index(_, _) => {}
                StreamingJob::Table(_, _, _) | StreamingJob::Source(_) => {
                    return Err(
                        anyhow!("snapshot_backfill not enabled for table and source").into(),
                    );
                }
            }
        }

        let upstream_actors = upstream_root_fragments
            .values()
            .map(|fragment| {
                (
                    fragment.fragment_id,
                    fragment.actors.iter().map(|actor| actor.actor_id).collect(),
                )
            })
            .collect();

        let complete_graph = CompleteStreamFragmentGraph::with_upstreams(
            fragment_graph,
            upstream_root_fragments,
            existing_actor_location,
            (&stream_job).into(),
        )?;

        let resource_group = match specific_resource_group {
            None => {
                self.metadata_manager
                    .get_database_resource_group(stream_job.database_id() as ObjectId)
                    .await?
            }
            Some(resource_group) => resource_group,
        };

        // 2. Build the actor graph and schedule actors onto compute nodes.
        let cluster_info = self.metadata_manager.get_streaming_cluster_info().await?;

        let parallelism = self.resolve_stream_parallelism(
            specified_parallelism,
            max_parallelism,
            &cluster_info,
            resource_group.clone(),
        )?;

        let parallelism = self
            .env
            .system_params_reader()
            .await
            .adaptive_parallelism_strategy()
            .compute_target_parallelism(parallelism.get());

        let parallelism = NonZeroUsize::new(parallelism).expect("parallelism must be positive");
        let actor_graph_builder = ActorGraphBuilder::new(
            id,
            resource_group,
            complete_graph,
            cluster_info,
            parallelism,
        )?;

        let ActorGraphBuildResult {
            graph,
            downstream_fragment_relations,
            building_locations,
            upstream_fragment_downstreams,
            new_no_shuffle,
            replace_upstream,
            ..
        } = actor_graph_builder.generate_graph(&self.env, &stream_job, expr_context)?;
        assert!(replace_upstream.is_empty());

        // If the frontend does not specify a degree of parallelism and the default parallelism
        // is `Full`, mark the job as adaptive; otherwise pin it to the resolved parallelism.
        let table_parallelism = match (specified_parallelism, &self.env.opts.default_parallelism) {
            (None, DefaultParallelism::Full) => TableParallelism::Adaptive,
            _ => TableParallelism::Fixed(parallelism.get()),
        };

        let stream_job_fragments = StreamJobFragments::new(
            id.into(),
            graph,
            &building_locations.actor_locations,
            stream_ctx.clone(),
            table_parallelism,
            max_parallelism.get(),
        );

        if let Some(mview_fragment) = stream_job_fragments.mview_fragment() {
            stream_job.set_table_vnode_count(mview_fragment.vnode_count());
        }

        let new_upstream_sink = if let StreamingJob::Sink(sink) = &stream_job
            && let Ok(table_id) = sink.get_target_table()
        {
            let tables = self
                .metadata_manager
                .get_table_catalog_by_ids(&[*table_id as _])
                .await?;
            let target_table = tables
                .first()
                .ok_or_else(|| MetaError::catalog_id_not_found("table", *table_id))?;
            let sink_fragment = stream_job_fragments
                .sink_fragment()
                .ok_or_else(|| anyhow::anyhow!("sink fragment not found for sink {}", sink.id))?;
            let mview_fragment_id = self
                .metadata_manager
                .catalog_controller
                .get_mview_fragment_by_id(*table_id as _)
                .await?;
            let upstream_sink_info = build_upstream_sink_info(
                sink,
                sink_fragment.fragment_id as _,
                target_table,
                mview_fragment_id,
            )?;
            Some(upstream_sink_info)
        } else {
            None
        };

        let ctx = CreateStreamingJobContext {
            upstream_fragment_downstreams,
            new_no_shuffle,
            upstream_actors,
            building_locations,
            definition: stream_job.definition(),
            mv_table_id: stream_job.mv_table(),
            create_type: stream_job.create_type(),
            job_type: (&stream_job).into(),
            streaming_job: stream_job,
            new_upstream_sink,
            option: CreateStreamingJobOption {},
            snapshot_backfill_info,
            cross_db_snapshot_backfill_info,
            fragment_backfill_ordering,
        };

        Ok((
            ctx,
            StreamJobFragmentsToCreate {
                inner: stream_job_fragments,
                downstreams: downstream_fragment_relations,
            },
        ))
    }
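    /// Builds the replacement graph for an existing streaming job (e.g. `ALTER TABLE` schema
    /// changes) under a temporary job id: internal state tables are matched between the old and
    /// new graphs, and the new root fragment is wired to the existing downstream fragments.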
    pub(crate) async fn build_replace_job(
        &self,
        stream_ctx: StreamContext,
        stream_job: &StreamingJob,
        mut fragment_graph: StreamFragmentGraph,
        tmp_job_id: TableId,
        auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,
    ) -> MetaResult<(ReplaceStreamJobContext, StreamJobFragmentsToCreate)> {
        match &stream_job {
            StreamingJob::Table(..)
            | StreamingJob::Source(..)
            | StreamingJob::MaterializedView(..) => {}
            StreamingJob::Sink(..) | StreamingJob::Index(..) => {
                bail_not_implemented!("schema change for {}", stream_job.job_type_str())
            }
        }

        let id = stream_job.id();
        let expr_context = stream_ctx.to_expr_context();

        let mut drop_table_associated_source_id = None;
        if let StreamingJob::Table(None, _, _) = &stream_job {
            drop_table_associated_source_id = self
                .metadata_manager
                .get_table_associated_source_id(id as _)
                .await?;
        }

        let old_fragments = self
            .metadata_manager
            .get_job_fragments_by_id(&id.into())
            .await?;
        let old_internal_table_ids = old_fragments.internal_table_ids();

        let mut drop_table_connector_ctx = None;
        if let Some(to_remove_source_id) = drop_table_associated_source_id {
            // The associated source's state table should be the job's only internal
            // table at this point.
            debug_assert!(old_internal_table_ids.len() == 1);

            drop_table_connector_ctx = Some(DropTableConnectorContext {
                to_change_streaming_job_id: id as i32,
                to_remove_state_table_id: old_internal_table_ids[0] as i32,
                to_remove_source_id,
            });
        } else if stream_job.is_materialized_view() {
            // For materialized views, match the new graph's internal tables against the
            // old ones by graph structure so that existing state can be reused.
            let old_fragments_upstreams = self
                .metadata_manager
                .catalog_controller
                .upstream_fragments(old_fragments.fragment_ids())
                .await?;

            let old_state_graph =
                state_match::Graph::from_existing(&old_fragments, &old_fragments_upstreams);
            let new_state_graph = state_match::Graph::from_building(&fragment_graph);
            let mapping =
                state_match::match_graph_internal_tables(&new_state_graph, &old_state_graph)
                    .context("incompatible altering on the streaming job states")?;

            fragment_graph.fit_internal_table_ids_with_mapping(mapping);
        } else {
            let old_internal_tables = self
                .metadata_manager
                .get_table_catalog_by_ids(&old_internal_table_ids)
                .await?;
            fragment_graph.fit_internal_tables_trivial(old_internal_tables)?;
        }

        let original_root_fragment = old_fragments
            .root_fragment()
            .expect("root fragment not found");

        let job_type = StreamingJobType::from(stream_job);

        let (mut downstream_fragments, mut downstream_actor_location) =
            self.metadata_manager.get_downstream_fragments(id).await?;

        if let Some(auto_refresh_schema_sinks) = &auto_refresh_schema_sinks {
            // Substitute each auto-refresh sink's original fragment with its newly built
            // one, relocating the recorded actor locations accordingly.
            let mut remaining_fragment: HashSet<_> = auto_refresh_schema_sinks
                .iter()
                .map(|sink| sink.original_fragment.fragment_id)
                .collect();
            for (_, downstream_fragment) in &mut downstream_fragments {
                if let Some(sink) = auto_refresh_schema_sinks.iter().find(|sink| {
                    sink.original_fragment.fragment_id == downstream_fragment.fragment_id
                }) {
                    assert!(remaining_fragment.remove(&downstream_fragment.fragment_id));
                    for actor in &downstream_fragment.actors {
                        downstream_actor_location.remove(&actor.actor_id);
                    }
                    for (actor_id, status) in &sink.actor_status {
                        downstream_actor_location.insert(
                            *actor_id,
                            status.location.as_ref().unwrap().worker_node_id as WorkerId,
                        );
                    }
                    *downstream_fragment = sink.new_fragment.clone();
                }
            }
            assert!(remaining_fragment.is_empty());
        }

        let complete_graph = match &job_type {
            StreamingJobType::Table(TableJobType::General) | StreamingJobType::Source => {
                CompleteStreamFragmentGraph::with_downstreams(
                    fragment_graph,
                    original_root_fragment.fragment_id,
                    downstream_fragments,
                    downstream_actor_location,
                    job_type,
                )?
            }
            StreamingJobType::Table(TableJobType::SharedCdcSource)
            | StreamingJobType::MaterializedView => {
                let (upstream_root_fragments, upstream_actor_location) = self
                    .metadata_manager
                    .get_upstream_root_fragments(fragment_graph.dependent_table_ids())
                    .await?;

                CompleteStreamFragmentGraph::with_upstreams_and_downstreams(
                    fragment_graph,
                    upstream_root_fragments,
                    upstream_actor_location,
                    original_root_fragment.fragment_id,
                    downstream_fragments,
                    downstream_actor_location,
                    job_type,
                )?
            }
            _ => unreachable!(),
        };

        let resource_group = self
            .metadata_manager
            .get_existing_job_resource_group(id as ObjectId)
            .await?;

        let cluster_info = self.metadata_manager.get_streaming_cluster_info().await?;

        let parallelism = NonZeroUsize::new(original_root_fragment.actors.len())
            .expect("The number of actors in the original table fragment should be greater than 0");

        let actor_graph_builder = ActorGraphBuilder::new(
            id,
            resource_group,
            complete_graph,
            cluster_info,
            parallelism,
        )?;

        let ActorGraphBuildResult {
            graph,
            downstream_fragment_relations,
            building_locations,
            upstream_fragment_downstreams,
            mut replace_upstream,
            new_no_shuffle,
            ..
        } = actor_graph_builder.generate_graph(&self.env, stream_job, expr_context)?;

        if matches!(
            job_type,
            StreamingJobType::Source | StreamingJobType::Table(TableJobType::General)
        ) {
            assert!(upstream_fragment_downstreams.is_empty());
        }

        // Register the new fragments under the temporary job id; parallelism settings
        // are inherited from the job being replaced.
        let stream_job_fragments = StreamJobFragments::new(
            (tmp_job_id as u32).into(),
            graph,
            &building_locations.actor_locations,
            stream_ctx,
            old_fragments.assigned_parallelism,
            old_fragments.max_parallelism,
        );

        if let Some(sinks) = &auto_refresh_schema_sinks {
            for sink in sinks {
                replace_upstream
                    .remove(&sink.new_fragment.fragment_id)
                    .expect("should exist");
            }
        }

        let ctx = ReplaceStreamJobContext {
            old_fragments,
            replace_upstream,
            new_no_shuffle,
            upstream_fragment_downstreams,
            building_locations,
            streaming_job: stream_job.clone(),
            tmp_id: tmp_job_id as _,
            drop_table_connector_ctx,
            auto_refresh_schema_sinks,
        };

        Ok((
            ctx,
            StreamJobFragmentsToCreate {
                inner: stream_job_fragments,
                downstreams: downstream_fragment_relations,
            },
        ))
    }

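    /// Renames a catalog object, dispatching on the object type carried in the request.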
    async fn alter_name(
        &self,
        relation: alter_name_request::Object,
        new_name: &str,
    ) -> MetaResult<NotificationVersion> {
        let (obj_type, id) = match relation {
            alter_name_request::Object::TableId(id) => (ObjectType::Table, id as ObjectId),
            alter_name_request::Object::ViewId(id) => (ObjectType::View, id as ObjectId),
            alter_name_request::Object::IndexId(id) => (ObjectType::Index, id as ObjectId),
            alter_name_request::Object::SinkId(id) => (ObjectType::Sink, id as ObjectId),
            alter_name_request::Object::SourceId(id) => (ObjectType::Source, id as ObjectId),
            alter_name_request::Object::SchemaId(id) => (ObjectType::Schema, id as ObjectId),
            alter_name_request::Object::DatabaseId(id) => (ObjectType::Database, id as ObjectId),
            alter_name_request::Object::SubscriptionId(id) => {
                (ObjectType::Subscription, id as ObjectId)
            }
        };
        self.metadata_manager
            .catalog_controller
            .alter_name(obj_type, id, new_name)
            .await
    }

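    /// Swaps the names of two catalog objects of the same type. Schema swap is not yet
    /// supported.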
    async fn alter_swap_rename(
        &self,
        object: alter_swap_rename_request::Object,
    ) -> MetaResult<NotificationVersion> {
        let (obj_type, src_id, dst_id) = match object {
            alter_swap_rename_request::Object::Schema(_) => unimplemented!("schema swap"),
            alter_swap_rename_request::Object::Table(objs) => (
                ObjectType::Table,
                objs.src_object_id as ObjectId,
                objs.dst_object_id as ObjectId,
            ),
            alter_swap_rename_request::Object::View(objs) => (
                ObjectType::View,
                objs.src_object_id as ObjectId,
                objs.dst_object_id as ObjectId,
            ),
            alter_swap_rename_request::Object::Source(objs) => (
                ObjectType::Source,
                objs.src_object_id as ObjectId,
                objs.dst_object_id as ObjectId,
            ),
            alter_swap_rename_request::Object::Sink(objs) => (
                ObjectType::Sink,
                objs.src_object_id as ObjectId,
                objs.dst_object_id as ObjectId,
            ),
            alter_swap_rename_request::Object::Subscription(objs) => (
                ObjectType::Subscription,
                objs.src_object_id as ObjectId,
                objs.dst_object_id as ObjectId,
            ),
        };

        self.metadata_manager
            .catalog_controller
            .alter_swap_rename(obj_type, src_id, dst_id)
            .await
    }

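    /// Transfers ownership of a catalog object to the given user.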
    async fn alter_owner(
        &self,
        object: Object,
        owner_id: UserId,
    ) -> MetaResult<NotificationVersion> {
        let (obj_type, id) = match object {
            Object::TableId(id) => (ObjectType::Table, id as ObjectId),
            Object::ViewId(id) => (ObjectType::View, id as ObjectId),
            Object::SourceId(id) => (ObjectType::Source, id as ObjectId),
            Object::SinkId(id) => (ObjectType::Sink, id as ObjectId),
            Object::SchemaId(id) => (ObjectType::Schema, id as ObjectId),
            Object::DatabaseId(id) => (ObjectType::Database, id as ObjectId),
            Object::SubscriptionId(id) => (ObjectType::Subscription, id as ObjectId),
            Object::ConnectionId(id) => (ObjectType::Connection, id as ObjectId),
        };
        self.metadata_manager
            .catalog_controller
            .alter_owner(obj_type, id, owner_id as _)
            .await
    }

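    /// Moves a catalog object into another schema.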
    async fn alter_set_schema(
        &self,
        object: alter_set_schema_request::Object,
        new_schema_id: SchemaId,
    ) -> MetaResult<NotificationVersion> {
        let (obj_type, id) = match object {
            alter_set_schema_request::Object::TableId(id) => (ObjectType::Table, id as ObjectId),
            alter_set_schema_request::Object::ViewId(id) => (ObjectType::View, id as ObjectId),
            alter_set_schema_request::Object::SourceId(id) => (ObjectType::Source, id as ObjectId),
            alter_set_schema_request::Object::SinkId(id) => (ObjectType::Sink, id as ObjectId),
            alter_set_schema_request::Object::FunctionId(id) => {
                (ObjectType::Function, id as ObjectId)
            }
            alter_set_schema_request::Object::ConnectionId(id) => {
                (ObjectType::Connection, id as ObjectId)
            }
            alter_set_schema_request::Object::SubscriptionId(id) => {
                (ObjectType::Subscription, id as ObjectId)
            }
        };
        self.metadata_manager
            .catalog_controller
            .alter_schema(obj_type, id, new_schema_id as _)
            .await
    }

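    /// Waits until no background creating streaming jobs remain, checking roughly every
    /// millisecond and timing out after 30 minutes.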
    pub async fn wait(&self) -> MetaResult<()> {
        // Poll roughly once per millisecond, for up to 30 minutes.
        let timeout_ms = 30 * 60 * 1000;
        for _ in 0..timeout_ms {
            if self
                .metadata_manager
                .catalog_controller
                .list_background_creating_jobs(true)
                .await?
                .is_empty()
            {
                return Ok(());
            }

            sleep(Duration::from_millis(1)).await;
        }
        Err(MetaError::cancelled(format!(
            "timeout after {timeout_ms}ms"
        )))
    }

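    /// Attaches or updates a comment on a catalog object.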
    async fn comment_on(&self, comment: Comment) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .comment_on(comment)
            .await
    }
}

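/// Reports a telemetry event for the creation of a catalog object.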
fn report_create_object(
    catalog_id: u32,
    event_name: &str,
    obj_type: PbTelemetryDatabaseObject,
    connector_name: Option<String>,
    attr_info: Option<jsonbb::Value>,
) {
    report_event(
        PbTelemetryEventStage::CreateStreamJob,
        event_name,
        catalog_id.into(),
        connector_name,
        Some(obj_type),
        attr_info,
    );
}

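/// Deletes all rows belonging to the given sink from the iceberg exactly-once system
/// table, logging the outcome.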
async fn clean_all_rows_by_sink_id(db: &DatabaseConnection, sink_id: i32) -> MetaResult<()> {
    match Entity::delete_many()
        .filter(Column::SinkId.eq(sink_id))
        .exec(db)
        .await
    {
        Ok(result) => {
            let deleted_count = result.rows_affected;

            tracing::info!(
                "Deleted {} rows for sink_id = {} from the iceberg exactly-once system table.",
                deleted_count,
                sink_id
            );
            Ok(())
        }
        Err(e) => {
            tracing::error!(
                "Error deleting rows for sink_id = {} from the iceberg exactly-once system table: {:?}",
                sink_id,
                e.as_report()
            );
            Err(e.into())
        }
    }
}

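/// Builds the `UpstreamSinkInfo` that describes how a sink fragment feeds its target
/// table's union fragment: the sink's output fields, the projection onto the table's
/// current columns, and a hash-dispatched downstream relation keyed on the table's
/// distribution key.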
pub fn build_upstream_sink_info(
    sink: &PbSink,
    sink_fragment_id: FragmentId,
    target_table: &PbTable,
    target_fragment_id: FragmentId,
) -> MetaResult<UpstreamSinkInfo> {
    // Prefer the target columns recorded when the sink was created; fall back to the
    // table's current columns if none were recorded.
    let sink_columns = if !sink.original_target_columns.is_empty() {
        sink.original_target_columns.clone()
    } else {
        target_table.columns.clone()
    };

    let sink_output_fields = sink_columns
        .iter()
        .map(|col| Field::from(col.column_desc.as_ref().unwrap()).to_prost())
        .collect_vec();
    let output_indices = (0..sink_output_fields.len())
        .map(|i| i as u32)
        .collect_vec();

    // Map the target table's distribution-key column ids back to their positions in the
    // sink's output columns.
    let dist_key_indices: anyhow::Result<Vec<u32>> = try {
        let sink_idx_by_col_id = sink_columns
            .iter()
            .enumerate()
            .map(|(idx, col)| {
                let column_id = col.column_desc.as_ref().unwrap().column_id;
                (column_id, idx as u32)
            })
            .collect::<HashMap<_, _>>();
        target_table
            .distribution_key
            .iter()
            .map(|dist_idx| {
                let column_id = target_table.columns[*dist_idx as usize]
                    .column_desc
                    .as_ref()
                    .unwrap()
                    .column_id;
                let sink_idx = sink_idx_by_col_id
                    .get(&column_id)
                    .ok_or_else(|| anyhow::anyhow!("column id {} not found in sink", column_id))?;
                Ok(*sink_idx)
            })
            .collect::<anyhow::Result<Vec<_>>>()?
    };
    let dist_key_indices =
        dist_key_indices.map_err(|e| e.context("failed to get distribution key indices"))?;
    let downstream_fragment_id = target_fragment_id as _;
    let new_downstream_relation = DownstreamFragmentRelation {
        downstream_fragment_id,
        dispatcher_type: DispatcherType::Hash,
        dist_key_indices,
        output_mapping: PbDispatchOutputMapping::simple(output_indices),
    };
    let current_target_columns = target_table.get_columns();
    let project_exprs = build_select_node_list(&sink_columns, current_target_columns)?;
    Ok(UpstreamSinkInfo {
        sink_id: sink.id as _,
        sink_fragment_id: sink_fragment_id as _,
        sink_output_fields: sink_output_fields.clone(),
        sink_original_target_columns: sink.get_original_target_columns().clone(),
        project_exprs,
        new_sink_downstream: new_downstream_relation,
    })
}

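/// Rewrites the `UpstreamSinkUnion` node within the given union fragment so that its
/// initial upstreams reflect the provided sink infos.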
pub fn refill_upstream_sink_union_in_table(
    union_fragment_root: &mut PbStreamNode,
    upstream_sink_infos: &[UpstreamSinkInfo],
) {
    visit_stream_node_cont_mut(union_fragment_root, |node| {
        if let Some(NodeBody::UpstreamSinkUnion(upstream_sink_union)) = &mut node.node_body {
            let init_upstreams = upstream_sink_infos
                .iter()
                .map(|info| PbUpstreamSinkInfo {
                    upstream_fragment_id: info.sink_fragment_id,
                    sink_output_schema: info.sink_output_fields.clone(),
                    project_exprs: info.project_exprs.clone(),
                })
                .collect();
            upstream_sink_union.init_upstreams = init_upstreams;
            // Stop visiting once the union node has been updated.
            false
        } else {
            true
        }
    });
}