use std::cmp::Ordering;
use std::collections::{HashMap, HashSet};
use std::num::NonZeroUsize;
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use std::time::Duration;

use anyhow::{Context, anyhow};
use await_tree::InstrumentAwait;
use either::Either;
use itertools::Itertools;
use risingwave_common::catalog::{
    AlterDatabaseParam, ColumnCatalog, ColumnId, Field, FragmentTypeFlag,
};
use risingwave_common::config::DefaultParallelism;
use risingwave_common::hash::VnodeCountCompat;
use risingwave_common::id::{JobId, TableId};
use risingwave_common::secret::{LocalSecretManager, SecretEncryption};
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont_mut;
use risingwave_common::{bail, bail_not_implemented};
use risingwave_connector::WithOptionsSecResolved;
use risingwave_connector::connector_common::validate_connection;
use risingwave_connector::sink::SinkParam;
use risingwave_connector::sink::iceberg::IcebergSink;
use risingwave_connector::source::cdc::CdcScanOptions;
use risingwave_connector::source::{
    ConnectorProperties, SourceEnumeratorContext, UPSTREAM_SOURCE_KEY,
};
use risingwave_meta_model::exactly_once_iceberg_sink::{Column, Entity};
use risingwave_meta_model::object::ObjectType;
use risingwave_meta_model::{
    ConnectionId, DatabaseId, DispatcherType, FragmentId, FunctionId, IndexId, JobStatus, ObjectId,
    SchemaId, SecretId, SinkId, SourceId, StreamingParallelism, SubscriptionId, UserId, ViewId,
};
use risingwave_pb::catalog::{
    Comment, Connection, CreateType, Database, Function, PbSink, PbTable, Schema, Secret, Source,
    Subscription, Table, View,
};
use risingwave_pb::common::PbActorLocation;
use risingwave_pb::ddl_service::alter_owner_request::Object;
use risingwave_pb::ddl_service::{
    DdlProgress, TableJobType, WaitVersion, alter_name_request, alter_set_schema_request,
    alter_swap_rename_request,
};
use risingwave_pb::meta::table_fragments::PbActorStatus;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{
    PbDispatchOutputMapping, PbStreamFragmentGraph, PbStreamNode, PbUpstreamSinkInfo,
    StreamFragmentGraph as StreamFragmentGraphProto,
};
use risingwave_pb::telemetry::{PbTelemetryDatabaseObject, PbTelemetryEventStage};
use sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter};
use strum::Display;
use thiserror_ext::AsReport;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use tokio::time::sleep;
use tracing::Instrument;

use crate::barrier::BarrierManagerRef;
use crate::controller::catalog::{DropTableConnectorContext, ReleaseContext};
use crate::controller::cluster::StreamingClusterInfo;
use crate::controller::streaming_job::{FinishAutoRefreshSchemaSinkContext, SinkIntoTableContext};
use crate::controller::utils::build_select_node_list;
use crate::error::{MetaErrorInner, bail_invalid_parameter, bail_unavailable};
use crate::manager::{
    IGNORED_NOTIFICATION_VERSION, LocalNotification, MetaSrvEnv, MetadataManager,
    NotificationVersion, StreamingJob, StreamingJobType,
};
use crate::model::{
    DownstreamFragmentRelation, Fragment, FragmentDownstreamRelation,
    FragmentId as CatalogFragmentId, StreamContext, StreamJobFragments, StreamJobFragmentsToCreate,
    TableParallelism,
};
use crate::stream::cdc::{
    is_parallelized_backfill_enabled, try_init_parallel_cdc_table_snapshot_splits,
};
use crate::stream::{
    ActorGraphBuildResult, ActorGraphBuilder, AutoRefreshSchemaSinkContext,
    CompleteStreamFragmentGraph, CreateStreamingJobContext, CreateStreamingJobOption,
    FragmentGraphDownstreamContext, FragmentGraphUpstreamContext, GlobalStreamManagerRef,
    ReplaceStreamJobContext, ReschedulePolicy, SourceChange, SourceManagerRef, StreamFragmentGraph,
    UpstreamSinkInfo, check_sink_fragments_support_refresh_schema, create_source_worker,
    rewrite_refresh_schema_sink_fragment, state_match, validate_sink,
};
use crate::telemetry::report_event;
use crate::{MetaError, MetaResult};

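/// Drop behavior for dependent objects, mirroring the `RESTRICT` / `CASCADE`
/// clauses of `DROP` statements.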
#[derive(PartialEq)]
pub enum DropMode {
    Restrict,
    Cascade,
}

impl DropMode {
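    /// Converts the `cascade` flag of a drop request into a [`DropMode`].
    ///
    /// A minimal usage sketch:
    ///
    /// ```ignore
    /// assert!(DropMode::from_request_setting(true) == DropMode::Cascade);
    /// assert!(DropMode::from_request_setting(false) == DropMode::Restrict);
    /// ```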
    pub fn from_request_setting(cascade: bool) -> DropMode {
        if cascade {
            DropMode::Cascade
        } else {
            DropMode::Restrict
        }
    }
}

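/// Identifier of a streaming job to drop, tagged with the job's catalog type.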
#[derive(strum::AsRefStr)]
pub enum StreamingJobId {
    MaterializedView(TableId),
    Sink(SinkId),
    Table(Option<SourceId>, TableId),
    Index(IndexId),
}

impl std::fmt::Display for StreamingJobId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.as_ref())?;
        write!(f, "({})", self.id())
    }
}

impl StreamingJobId {
    fn id(&self) -> JobId {
        match self {
            StreamingJobId::MaterializedView(id) | StreamingJobId::Table(_, id) => id.as_job_id(),
            StreamingJobId::Index(id) => id.as_job_id(),
            StreamingJobId::Sink(id) => id.as_job_id(),
        }
    }
}

pub struct ReplaceStreamJobInfo {
    pub streaming_job: StreamingJob,
    pub fragment_graph: StreamFragmentGraphProto,
}

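/// A DDL command sent to [`DdlController::run_command`], covering catalog
/// creation, drop, and alteration as well as streaming job lifecycle changes.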
#[derive(Display)]
pub enum DdlCommand {
    CreateDatabase(Database),
    DropDatabase(DatabaseId),
    CreateSchema(Schema),
    DropSchema(SchemaId, DropMode),
    CreateNonSharedSource(Source),
    DropSource(SourceId, DropMode),
    CreateFunction(Function),
    DropFunction(FunctionId, DropMode),
    CreateView(View, HashSet<ObjectId>),
    DropView(ViewId, DropMode),
    CreateStreamingJob {
        stream_job: StreamingJob,
        fragment_graph: StreamFragmentGraphProto,
        dependencies: HashSet<ObjectId>,
        specific_resource_group: Option<String>,
        if_not_exists: bool,
    },
    DropStreamingJob {
        job_id: StreamingJobId,
        drop_mode: DropMode,
    },
    AlterName(alter_name_request::Object, String),
    AlterSwapRename(alter_swap_rename_request::Object),
    ReplaceStreamJob(ReplaceStreamJobInfo),
    AlterNonSharedSource(Source),
    AlterObjectOwner(Object, UserId),
    AlterSetSchema(alter_set_schema_request::Object, SchemaId),
    CreateConnection(Connection),
    DropConnection(ConnectionId, DropMode),
    CreateSecret(Secret),
    AlterSecret(Secret),
    DropSecret(SecretId),
    CommentOn(Comment),
    CreateSubscription(Subscription),
    DropSubscription(SubscriptionId, DropMode),
    AlterDatabaseParam(DatabaseId, AlterDatabaseParam),
}

impl DdlCommand {
    fn object(&self) -> Either<String, ObjectId> {
        use Either::*;
        match self {
            DdlCommand::CreateDatabase(database) => Left(database.name.clone()),
            DdlCommand::DropDatabase(id) => Right(id.as_object_id()),
            DdlCommand::CreateSchema(schema) => Left(schema.name.clone()),
            DdlCommand::DropSchema(id, _) => Right(id.as_object_id()),
            DdlCommand::CreateNonSharedSource(source) => Left(source.name.clone()),
            DdlCommand::DropSource(id, _) => Right(id.as_object_id()),
            DdlCommand::CreateFunction(function) => Left(function.name.clone()),
            DdlCommand::DropFunction(id, _) => Right(id.as_object_id()),
            DdlCommand::CreateView(view, _) => Left(view.name.clone()),
            DdlCommand::DropView(id, _) => Right(id.as_object_id()),
            DdlCommand::CreateStreamingJob { stream_job, .. } => Left(stream_job.name()),
            DdlCommand::DropStreamingJob { job_id, .. } => Right(job_id.id().as_object_id()),
            DdlCommand::AlterName(object, _) => Left(format!("{object:?}")),
            DdlCommand::AlterSwapRename(object) => Left(format!("{object:?}")),
            DdlCommand::ReplaceStreamJob(info) => Left(info.streaming_job.name()),
            DdlCommand::AlterNonSharedSource(source) => Left(source.name.clone()),
            DdlCommand::AlterObjectOwner(object, _) => Left(format!("{object:?}")),
            DdlCommand::AlterSetSchema(object, _) => Left(format!("{object:?}")),
            DdlCommand::CreateConnection(connection) => Left(connection.name.clone()),
            DdlCommand::DropConnection(id, _) => Right(id.as_object_id()),
            DdlCommand::CreateSecret(secret) => Left(secret.name.clone()),
            DdlCommand::AlterSecret(secret) => Left(secret.name.clone()),
            DdlCommand::DropSecret(id) => Right(id.as_object_id()),
            DdlCommand::CommentOn(comment) => Right(comment.table_id.into()),
            DdlCommand::CreateSubscription(subscription) => Left(subscription.name.clone()),
            DdlCommand::DropSubscription(id, _) => Right(id.as_object_id()),
            DdlCommand::AlterDatabaseParam(id, _) => Right(id.as_object_id()),
        }
    }

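    /// Returns whether the command is allowed to run while the barrier manager
    /// is still recovering: drops, alters, and purely catalog-level creations
    /// are permitted, while commands that launch or replace streaming jobs are
    /// rejected until recovery completes.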
    fn allow_in_recovery(&self) -> bool {
        match self {
            DdlCommand::DropDatabase(_)
            | DdlCommand::DropSchema(_, _)
            | DdlCommand::DropSource(_, _)
            | DdlCommand::DropFunction(_, _)
            | DdlCommand::DropView(_, _)
            | DdlCommand::DropStreamingJob { .. }
            | DdlCommand::DropConnection(_, _)
            | DdlCommand::DropSecret(_)
            | DdlCommand::DropSubscription(_, _)
            | DdlCommand::AlterName(_, _)
            | DdlCommand::AlterObjectOwner(_, _)
            | DdlCommand::AlterSetSchema(_, _)
            | DdlCommand::CreateDatabase(_)
            | DdlCommand::CreateSchema(_)
            | DdlCommand::CreateFunction(_)
            | DdlCommand::CreateView(_, _)
            | DdlCommand::CreateConnection(_)
            | DdlCommand::CommentOn(_)
            | DdlCommand::CreateSecret(_)
            | DdlCommand::AlterSecret(_)
            | DdlCommand::AlterSwapRename(_)
            | DdlCommand::AlterDatabaseParam(_, _) => true,
            DdlCommand::CreateStreamingJob { .. }
            | DdlCommand::CreateNonSharedSource(_)
            | DdlCommand::ReplaceStreamJob(_)
            | DdlCommand::AlterNonSharedSource(_)
            | DdlCommand::CreateSubscription(_) => false,
        }
    }
}

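/// Entry point of all DDL on the meta node: validates commands, persists
/// catalog changes through the metadata manager, and drives the stream and
/// source managers accordingly.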
#[derive(Clone)]
pub struct DdlController {
    pub(crate) env: MetaSrvEnv,

    pub(crate) metadata_manager: MetadataManager,
    pub(crate) stream_manager: GlobalStreamManagerRef,
    pub(crate) source_manager: SourceManagerRef,
    barrier_manager: BarrierManagerRef,

    /// Limits the number of concurrently creating streaming jobs.
    pub(crate) creating_streaming_job_permits: Arc<CreatingStreamingJobPermit>,

    /// Sequence number used to key each DDL command in the await-tree registry.
    seq: Arc<AtomicU64>,
}

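/// A semaphore sized by the `max_concurrent_creating_streaming_jobs` system
/// parameter; a background task resizes it whenever the parameter changes.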
#[derive(Clone)]
pub struct CreatingStreamingJobPermit {
    pub(crate) semaphore: Arc<Semaphore>,
}

impl CreatingStreamingJobPermit {
    async fn new(env: &MetaSrvEnv) -> Self {
        let mut permits = env
            .system_params_reader()
            .await
            .max_concurrent_creating_streaming_jobs() as usize;
        if permits == 0 {
            // `0` means no limit, which is mapped to the maximum permit count.
            permits = Semaphore::MAX_PERMITS;
        }
        let semaphore = Arc::new(Semaphore::new(permits));

        let (local_notification_tx, mut local_notification_rx) =
            tokio::sync::mpsc::unbounded_channel();
        env.notification_manager()
            .insert_local_sender(local_notification_tx);
        let semaphore_clone = semaphore.clone();
        tokio::spawn(async move {
            while let Some(notification) = local_notification_rx.recv().await {
                let LocalNotification::SystemParamsChange(p) = &notification else {
                    continue;
                };
                let mut new_permits = p.max_concurrent_creating_streaming_jobs() as usize;
                if new_permits == 0 {
                    new_permits = Semaphore::MAX_PERMITS;
                }
                match permits.cmp(&new_permits) {
                    Ordering::Less => {
                        semaphore_clone.add_permits(new_permits - permits);
                    }
                    Ordering::Equal => continue,
                    Ordering::Greater => {
                        let to_release = permits - new_permits;
                        let reduced = semaphore_clone.forget_permits(to_release);
                        if reduced != to_release {
                            tracing::warn!(
                                "not enough permits to release, expected {}, but reduced {}",
                                to_release,
                                reduced
                            );
                        }
                    }
                }
                tracing::info!(
                    "max_concurrent_creating_streaming_jobs changed from {} to {}",
                    permits,
                    new_permits
                );
                permits = new_permits;
            }
        });

        Self { semaphore }
    }
}

impl DdlController {
    pub async fn new(
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        stream_manager: GlobalStreamManagerRef,
        source_manager: SourceManagerRef,
        barrier_manager: BarrierManagerRef,
    ) -> Self {
        let creating_streaming_job_permits = Arc::new(CreatingStreamingJobPermit::new(&env).await);
        Self {
            env,
            metadata_manager,
            stream_manager,
            source_manager,
            barrier_manager,
            creating_streaming_job_permits,
            seq: Arc::new(AtomicU64::new(0)),
        }
    }

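    /// Returns the next monotonically increasing sequence number for a DDL
    /// command, used to build a unique await-tree key.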
    pub fn next_seq(&self) -> u64 {
        self.seq.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
    }

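    /// Runs a DDL command and returns the version to wait for before the
    /// change becomes visible. The command is spawned onto a separate task and
    /// registered in the await tree, so it keeps running even if the RPC that
    /// issued it is cancelled.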
    pub async fn run_command(&self, command: DdlCommand) -> MetaResult<Option<WaitVersion>> {
        if !command.allow_in_recovery() {
            self.barrier_manager.check_status_running()?;
        }

        let await_tree_key = format!("DDL Command {}", self.next_seq());
        let await_tree_span = await_tree::span!("{command}({})", command.object());

        let ctrl = self.clone();
        let fut = async move {
            match command {
                DdlCommand::CreateDatabase(database) => ctrl.create_database(database).await,
                DdlCommand::DropDatabase(database_id) => ctrl.drop_database(database_id).await,
                DdlCommand::CreateSchema(schema) => ctrl.create_schema(schema).await,
                DdlCommand::DropSchema(schema_id, drop_mode) => {
                    ctrl.drop_schema(schema_id, drop_mode).await
                }
                DdlCommand::CreateNonSharedSource(source) => {
                    ctrl.create_non_shared_source(source).await
                }
                DdlCommand::DropSource(source_id, drop_mode) => {
                    ctrl.drop_source(source_id, drop_mode).await
                }
                DdlCommand::CreateFunction(function) => ctrl.create_function(function).await,
                DdlCommand::DropFunction(function_id, drop_mode) => {
                    ctrl.drop_function(function_id, drop_mode).await
                }
                DdlCommand::CreateView(view, dependencies) => {
                    ctrl.create_view(view, dependencies).await
                }
                DdlCommand::DropView(view_id, drop_mode) => {
                    ctrl.drop_view(view_id, drop_mode).await
                }
                DdlCommand::CreateStreamingJob {
                    stream_job,
                    fragment_graph,
                    dependencies,
                    specific_resource_group,
                    if_not_exists,
                } => {
                    ctrl.create_streaming_job(
                        stream_job,
                        fragment_graph,
                        dependencies,
                        specific_resource_group,
                        if_not_exists,
                    )
                    .await
                }
                DdlCommand::DropStreamingJob { job_id, drop_mode } => {
                    ctrl.drop_streaming_job(job_id, drop_mode).await
                }
                DdlCommand::ReplaceStreamJob(ReplaceStreamJobInfo {
                    streaming_job,
                    fragment_graph,
                }) => ctrl.replace_job(streaming_job, fragment_graph).await,
                DdlCommand::AlterName(relation, name) => ctrl.alter_name(relation, &name).await,
                DdlCommand::AlterObjectOwner(object, owner_id) => {
                    ctrl.alter_owner(object, owner_id).await
                }
                DdlCommand::AlterSetSchema(object, new_schema_id) => {
                    ctrl.alter_set_schema(object, new_schema_id).await
                }
                DdlCommand::CreateConnection(connection) => {
                    ctrl.create_connection(connection).await
                }
                DdlCommand::DropConnection(connection_id, drop_mode) => {
                    ctrl.drop_connection(connection_id, drop_mode).await
                }
                DdlCommand::CreateSecret(secret) => ctrl.create_secret(secret).await,
                DdlCommand::DropSecret(secret_id) => ctrl.drop_secret(secret_id).await,
                DdlCommand::AlterSecret(secret) => ctrl.alter_secret(secret).await,
                DdlCommand::AlterNonSharedSource(source) => {
                    ctrl.alter_non_shared_source(source).await
                }
                DdlCommand::CommentOn(comment) => ctrl.comment_on(comment).await,
                DdlCommand::CreateSubscription(subscription) => {
                    ctrl.create_subscription(subscription).await
                }
                DdlCommand::DropSubscription(subscription_id, drop_mode) => {
                    ctrl.drop_subscription(subscription_id, drop_mode).await
                }
                DdlCommand::AlterSwapRename(objects) => ctrl.alter_swap_rename(objects).await,
                DdlCommand::AlterDatabaseParam(database_id, param) => {
                    ctrl.alter_database_param(database_id, param).await
                }
            }
        }
        .in_current_span();
        let fut = (self.env.await_tree_reg())
            .register(await_tree_key, await_tree_span)
            .instrument(Box::pin(fut));
        let notification_version = tokio::spawn(fut).await.map_err(|e| anyhow!(e))??;
        Ok(Some(WaitVersion {
            catalog_version: notification_version,
            hummock_version_id: self.barrier_manager.get_hummock_version_id().await.to_u64(),
        }))
    }

    pub async fn get_ddl_progress(&self) -> MetaResult<Vec<DdlProgress>> {
        self.barrier_manager.get_ddl_progress().await
    }

    async fn create_database(&self, database: Database) -> MetaResult<NotificationVersion> {
        let (version, updated_db) = self
            .metadata_manager
            .catalog_controller
            .create_database(database)
            .await?;
        self.barrier_manager
            .update_database_barrier(
                updated_db.database_id,
                updated_db.barrier_interval_ms.map(|v| v as u32),
                updated_db.checkpoint_frequency.map(|v| v as u64),
            )
            .await?;
        Ok(version)
    }

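    /// Reschedules the parallelism of a streaming job. If the system is in
    /// recovery, the request is downgraded to deferred mode and applied once
    /// recovery completes.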
    #[tracing::instrument(skip(self), level = "debug")]
    pub async fn reschedule_streaming_job(
        &self,
        job_id: JobId,
        target: ReschedulePolicy,
        mut deferred: bool,
    ) -> MetaResult<()> {
        tracing::info!("altering parallelism for job {}", job_id);
        if self.barrier_manager.check_status_running().is_err() {
            tracing::info!(
                "alter parallelism is set to deferred mode because the system is in recovery state"
            );
            deferred = true;
        }

        self.stream_manager
            .reschedule_streaming_job(job_id, target, deferred)
            .await
    }

    pub async fn reschedule_cdc_table_backfill(
        &self,
        job_id: JobId,
        target: ReschedulePolicy,
    ) -> MetaResult<()> {
        tracing::info!("alter CDC table backfill parallelism");
        if self.barrier_manager.check_status_running().is_err() {
            return Err(anyhow::anyhow!(
                "CDC table backfill reschedule is unavailable because the system is in recovery state"
            )
            .into());
        }
        self.stream_manager
            .reschedule_cdc_table_backfill(job_id, target)
            .await
    }

    pub async fn reschedule_fragments(
        &self,
        fragment_targets: HashMap<FragmentId, Option<StreamingParallelism>>,
    ) -> MetaResult<()> {
        tracing::info!(
            "altering parallelism for fragments {:?}",
            fragment_targets.keys()
        );
        let fragment_targets = fragment_targets
            .into_iter()
            .map(|(fragment_id, parallelism)| (fragment_id as CatalogFragmentId, parallelism))
            .collect();

        self.stream_manager
            .reschedule_fragments(fragment_targets)
            .await
    }

    async fn drop_database(&self, database_id: DatabaseId) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Database, database_id, DropMode::Cascade)
            .await
    }

    async fn create_schema(&self, schema: Schema) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .create_schema(schema)
            .await
    }

    async fn drop_schema(
        &self,
        schema_id: SchemaId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Schema, schema_id, drop_mode)
            .await
    }

    async fn create_non_shared_source(&self, source: Source) -> MetaResult<NotificationVersion> {
        let handle = create_source_worker(&source, self.source_manager.metrics.clone())
            .await
            .context("failed to create source worker")?;

        let (source_id, version) = self
            .metadata_manager
            .catalog_controller
            .create_source(source)
            .await?;
        self.source_manager
            .register_source_with_handle(source_id, handle)
            .await;
        Ok(version)
    }

    async fn drop_source(
        &self,
        source_id: SourceId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Source, source_id, drop_mode)
            .await
    }

    async fn alter_non_shared_source(&self, source: Source) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .alter_non_shared_source(source)
            .await
    }

    async fn create_function(&self, function: Function) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .create_function(function)
            .await
    }

    async fn drop_function(
        &self,
        function_id: FunctionId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Function, function_id, drop_mode)
            .await
    }

    async fn create_view(
        &self,
        view: View,
        dependencies: HashSet<ObjectId>,
    ) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .create_view(view, dependencies)
            .await
    }

    async fn drop_view(
        &self,
        view_id: ViewId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::View, view_id, drop_mode).await
    }

    async fn create_connection(&self, connection: Connection) -> MetaResult<NotificationVersion> {
        validate_connection(&connection).await?;
        self.metadata_manager
            .catalog_controller
            .create_connection(connection)
            .await
    }

    async fn drop_connection(
        &self,
        connection_id: ConnectionId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Connection, connection_id, drop_mode)
            .await
    }

    async fn alter_database_param(
        &self,
        database_id: DatabaseId,
        param: AlterDatabaseParam,
    ) -> MetaResult<NotificationVersion> {
        let (version, updated_db) = self
            .metadata_manager
            .catalog_controller
            .alter_database_param(database_id, param)
            .await?;
        self.barrier_manager
            .update_database_barrier(
                database_id,
                updated_db.barrier_interval_ms.map(|v| v as u32),
                updated_db.checkpoint_frequency.map(|v| v as u64),
            )
            .await?;
        Ok(version)
    }

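    /// Encrypts a secret's plaintext value with the configured
    /// `secret_store_private_key` and returns the serialized ciphertext.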
    fn get_encrypted_payload(&self, secret: &Secret) -> MetaResult<Vec<u8>> {
        let secret_store_private_key = self
            .env
            .opts
            .secret_store_private_key
            .clone()
            .ok_or_else(|| anyhow!("secret_store_private_key is not configured"))?;

        let encrypted_payload = SecretEncryption::encrypt(
            secret_store_private_key.as_slice(),
            secret.get_value().as_slice(),
        )
        .context(format!("failed to encrypt secret {}", secret.name))?;
        Ok(encrypted_payload
            .serialize()
            .context(format!("failed to serialize secret {}", secret.name))?)
    }

    async fn create_secret(&self, mut secret: Secret) -> MetaResult<NotificationVersion> {
        // The secret value must be encrypted before it is stored in the catalog.
        let secret_plain_payload = secret.value.clone();
        let encrypted_payload = self.get_encrypted_payload(&secret)?;
        secret.value = encrypted_payload;

        self.metadata_manager
            .catalog_controller
            .create_secret(secret, secret_plain_payload)
            .await
    }

    async fn drop_secret(&self, secret_id: SecretId) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Secret, secret_id, DropMode::Restrict)
            .await
    }

    async fn alter_secret(&self, mut secret: Secret) -> MetaResult<NotificationVersion> {
        let secret_plain_payload = secret.value.clone();
        let encrypted_payload = self.get_encrypted_payload(&secret)?;
        secret.value = encrypted_payload;
        self.metadata_manager
            .catalog_controller
            .alter_secret(secret, secret_plain_payload)
            .await
    }

    async fn create_subscription(
        &self,
        mut subscription: Subscription,
    ) -> MetaResult<NotificationVersion> {
        tracing::debug!("create subscription");
        let _permit = self
            .creating_streaming_job_permits
            .semaphore
            .acquire()
            .await
            .unwrap();
        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
        self.metadata_manager
            .catalog_controller
            .create_subscription_catalog(&mut subscription)
            .await?;
        if let Err(err) = self.stream_manager.create_subscription(&subscription).await {
            tracing::debug!(error = %err.as_report(), "failed to create subscription");
            let _ = self
                .metadata_manager
                .catalog_controller
                .try_abort_creating_subscription(subscription.id)
                .await
                .inspect_err(|e| {
                    tracing::error!(
                        error = %e.as_report(),
                        "failed to abort create subscription after failure"
                    );
                });
            return Err(err);
        }

        let version = self
            .metadata_manager
            .catalog_controller
            .notify_create_subscription(subscription.id)
            .await?;
        tracing::debug!("finish create subscription");
        Ok(version)
    }

    async fn drop_subscription(
        &self,
        subscription_id: SubscriptionId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        tracing::debug!("preparing drop subscription");
        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
        let subscription = self
            .metadata_manager
            .catalog_controller
            .get_subscription_by_id(subscription_id)
            .await?;
        let table_id = subscription.dependent_table_id;
        let database_id = subscription.database_id;
        let (_, version) = self
            .metadata_manager
            .catalog_controller
            .drop_object(ObjectType::Subscription, subscription_id, drop_mode)
            .await?;
        self.stream_manager
            .drop_subscription(database_id, subscription_id, table_id)
            .await;
        tracing::debug!("finish drop subscription");
        Ok(version)
    }

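    /// Validates a CDC table before it is created: checks that the fragment
    /// graph contains exactly one stream scan fragment and that a split
    /// enumerator can be built from the CDC table's connector properties.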
    #[await_tree::instrument]
    pub(crate) async fn validate_cdc_table(
        &self,
        table: &Table,
        table_fragments: &StreamJobFragments,
    ) -> MetaResult<()> {
        let stream_scan_fragment = table_fragments
            .fragments
            .values()
            .filter(|f| {
                f.fragment_type_mask.contains(FragmentTypeFlag::StreamScan)
                    || f.fragment_type_mask
                        .contains(FragmentTypeFlag::StreamCdcScan)
            })
            .exactly_one()
            .ok()
            .with_context(|| {
                format!(
                    "expect exactly one stream scan fragment, got: {:?}",
                    table_fragments.fragments
                )
            })?;
        fn assert_parallelism(stream_scan_fragment: &Fragment, node_body: &Option<NodeBody>) {
            if let Some(NodeBody::StreamCdcScan(node)) = node_body {
                if let Some(o) = node.options
                    && CdcScanOptions::from_proto(&o).is_parallelized_backfill()
                {
                    // Parallelized backfill may be executed by multiple actors.
                } else {
                    assert_eq!(
                        stream_scan_fragment.actors.len(),
                        1,
                        "Stream scan fragment should have only one actor"
                    );
                }
            }
        }
        let mut found_cdc_scan = false;
        match &stream_scan_fragment.nodes.node_body {
            Some(NodeBody::StreamCdcScan(_)) => {
                assert_parallelism(stream_scan_fragment, &stream_scan_fragment.nodes.node_body);
                if self
                    .validate_cdc_table_inner(&stream_scan_fragment.nodes.node_body, table.id)
                    .await?
                {
                    found_cdc_scan = true;
                }
            }
            Some(NodeBody::Project(_)) => {
                for input in &stream_scan_fragment.nodes.input {
                    assert_parallelism(stream_scan_fragment, &input.node_body);
                    if self
                        .validate_cdc_table_inner(&input.node_body, table.id)
                        .await?
                    {
                        found_cdc_scan = true;
                    }
                }
            }
            _ => {
                bail!("Unexpected node body for stream cdc scan");
            }
        };
        if !found_cdc_scan {
            bail!("No stream cdc scan node found in stream scan fragment");
        }
        Ok(())
    }

    async fn validate_cdc_table_inner(
        &self,
        node_body: &Option<NodeBody>,
        table_id: TableId,
    ) -> MetaResult<bool> {
        let meta_store = self.env.meta_store_ref();
        if let Some(NodeBody::StreamCdcScan(stream_cdc_scan)) = node_body
            && let Some(ref cdc_table_desc) = stream_cdc_scan.cdc_table_desc
        {
            let options_with_secret = WithOptionsSecResolved::new(
                cdc_table_desc.connect_properties.clone(),
                cdc_table_desc.secret_refs.clone(),
            );

            let mut props = ConnectorProperties::extract(options_with_secret, true)?;
            props.init_from_pb_cdc_table_desc(cdc_table_desc);

            // Try to create a split enumerator to validate connectivity.
            let _enumerator = props
                .create_split_enumerator(SourceEnumeratorContext::dummy().into())
                .await?;

            if is_parallelized_backfill_enabled(stream_cdc_scan) {
                try_init_parallel_cdc_table_snapshot_splits(
                    table_id,
                    cdc_table_desc,
                    meta_store,
                    &stream_cdc_scan.options,
                    self.env.opts.cdc_table_split_init_insert_batch_size,
                    self.env.opts.cdc_table_split_init_sleep_interval_splits,
                    self.env.opts.cdc_table_split_init_sleep_duration_millis,
                )
                .await?;
            }

            tracing::debug!(?table_id, "validate cdc table success");
            Ok(true)
        } else {
            Ok(false)
        }
    }

    pub async fn validate_table_for_sink(&self, table_id: TableId) -> MetaResult<()> {
        let migrated = self
            .metadata_manager
            .catalog_controller
            .has_table_been_migrated(table_id)
            .await?;
        if !migrated {
            Err(anyhow::anyhow!(
                "Creating sink into table is not allowed for unmigrated table {}. Please migrate it first.",
                table_id
            )
            .into())
        } else {
            Ok(())
        }
    }

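    /// Creates a streaming job: persists the job catalog, builds the fragment
    /// graph, and hands it to the stream manager. On failure, the job is
    /// aborted and any registered source is cleaned up.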
    #[await_tree::instrument(boxed, "create_streaming_job({streaming_job})")]
    pub async fn create_streaming_job(
        &self,
        mut streaming_job: StreamingJob,
        fragment_graph: StreamFragmentGraphProto,
        dependencies: HashSet<ObjectId>,
        specific_resource_group: Option<String>,
        if_not_exists: bool,
    ) -> MetaResult<NotificationVersion> {
        if let StreamingJob::Sink(sink) = &streaming_job
            && let Some(target_table) = sink.target_table
        {
            self.validate_table_for_sink(target_table).await?;
        }
        let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());
        let check_ret = self
            .metadata_manager
            .catalog_controller
            .create_job_catalog(
                &mut streaming_job,
                &ctx,
                &fragment_graph.parallelism,
                fragment_graph.max_parallelism as _,
                dependencies,
                specific_resource_group.clone(),
            )
            .await;
        if let Err(meta_err) = check_ret {
            if !if_not_exists {
                return Err(meta_err);
            }
            return if let MetaErrorInner::Duplicated(_, _, Some(job_id)) = meta_err.inner() {
                if streaming_job.create_type() == CreateType::Foreground {
                    let database_id = streaming_job.database_id();
                    self.metadata_manager
                        .wait_streaming_job_finished(database_id, *job_id)
                        .await
                } else {
                    Ok(IGNORED_NOTIFICATION_VERSION)
                }
            } else {
                Err(meta_err)
            };
        }
        let job_id = streaming_job.id();
        tracing::debug!(
            id = %job_id,
            definition = streaming_job.definition(),
            create_type = streaming_job.create_type().as_str_name(),
            job_type = ?streaming_job.job_type(),
            "starting streaming job",
        );
        let permit = self
            .creating_streaming_job_permits
            .semaphore
            .clone()
            .acquire_owned()
            .instrument_await("acquire_creating_streaming_job_permit")
            .await
            .unwrap();
        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;

        let name = streaming_job.name();
        let definition = streaming_job.definition();
        let source_id = match &streaming_job {
            StreamingJob::Table(Some(src), _, _) | StreamingJob::Source(src) => Some(src.id),
            _ => None,
        };

        // Build and run the streaming job; abort and clean up on failure.
        match self
            .create_streaming_job_inner(
                ctx,
                streaming_job,
                fragment_graph,
                specific_resource_group,
                permit,
            )
            .await
        {
            Ok(version) => Ok(version),
            Err(err) => {
                tracing::error!(id = %job_id, error = %err.as_report(), "failed to create streaming job");
                let event = risingwave_pb::meta::event_log::EventCreateStreamJobFail {
                    id: job_id,
                    name,
                    definition,
                    error: err.as_report().to_string(),
                };
                self.env.event_log_manager_ref().add_event_logs(vec![
                    risingwave_pb::meta::event_log::Event::CreateStreamJobFail(event),
                ]);
                let (aborted, _) = self
                    .metadata_manager
                    .catalog_controller
                    .try_abort_creating_streaming_job(job_id, false)
                    .await?;
                if aborted {
                    tracing::warn!(id = %job_id, "aborted streaming job");
                    // The job may have registered a source; drop it as well.
                    if let Some(source_id) = source_id {
                        self.source_manager
                            .apply_source_change(SourceChange::DropSource {
                                dropped_source_ids: vec![source_id],
                            })
                            .await;
                    }
                }
                Err(err)
            }
        }
    }

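    /// The inner implementation of streaming job creation: builds the fragment
    /// graph and internal tables, validates connector-specific properties, and
    /// persists the fragments before starting the job.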
    #[await_tree::instrument(boxed)]
    async fn create_streaming_job_inner(
        &self,
        ctx: StreamContext,
        mut streaming_job: StreamingJob,
        fragment_graph: StreamFragmentGraphProto,
        specific_resource_group: Option<String>,
        permit: OwnedSemaphorePermit,
    ) -> MetaResult<NotificationVersion> {
        let mut fragment_graph =
            StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
        streaming_job.set_info_from_graph(&fragment_graph);

        // Create catalogs for the internal tables and refill their ids in the graph.
        let incomplete_internal_tables = fragment_graph
            .incomplete_internal_tables()
            .into_values()
            .collect_vec();
        let table_id_map = self
            .metadata_manager
            .catalog_controller
            .create_internal_table_catalog(&streaming_job, incomplete_internal_tables)
            .await?;
        fragment_graph.refill_internal_table_ids(table_id_map);

        tracing::debug!(id = %streaming_job.id(), "building streaming job");
        let (ctx, stream_job_fragments) = self
            .build_stream_job(ctx, streaming_job, fragment_graph, specific_resource_group)
            .await?;

        let streaming_job = &ctx.streaming_job;

        match streaming_job {
            StreamingJob::Table(None, table, TableJobType::SharedCdcSource) => {
                self.validate_cdc_table(table, &stream_job_fragments)
                    .await?;
            }
            StreamingJob::Table(Some(source), ..) => {
                // Register the table's associated source with the source manager.
                self.source_manager.register_source(source).await?;
                let connector_name = source
                    .get_with_properties()
                    .get(UPSTREAM_SOURCE_KEY)
                    .cloned();
                let attr = source.info.as_ref().map(|source_info| {
                    jsonbb::json!({
                        "format": source_info.format().as_str_name(),
                        "encode": source_info.row_encode().as_str_name(),
                    })
                });
                report_create_object(
                    streaming_job.id(),
                    "source",
                    PbTelemetryDatabaseObject::Source,
                    connector_name,
                    attr,
                );
            }
            StreamingJob::Sink(sink) => {
                if sink.auto_refresh_schema_from_table.is_some() {
                    check_sink_fragments_support_refresh_schema(&stream_job_fragments.fragments)?
                }
                // Validate the sink's connector properties before creation.
                validate_sink(sink).await?;
                let connector_name = sink.get_properties().get(UPSTREAM_SOURCE_KEY).cloned();
                let attr = sink.format_desc.as_ref().map(|sink_info| {
                    jsonbb::json!({
                        "format": sink_info.format().as_str_name(),
                        "encode": sink_info.encode().as_str_name(),
                    })
                });
                report_create_object(
                    streaming_job.id(),
                    "sink",
                    PbTelemetryDatabaseObject::Sink,
                    connector_name,
                    attr,
                );
            }
            StreamingJob::Source(source) => {
                self.source_manager.register_source(source).await?;
                let connector_name = source
                    .get_with_properties()
                    .get(UPSTREAM_SOURCE_KEY)
                    .cloned();
                let attr = source.info.as_ref().map(|source_info| {
                    jsonbb::json!({
                        "format": source_info.format().as_str_name(),
                        "encode": source_info.row_encode().as_str_name(),
                    })
                });
                report_create_object(
                    streaming_job.id(),
                    "source",
                    PbTelemetryDatabaseObject::Source,
                    connector_name,
                    attr,
                );
            }
            _ => {}
        }

        self.metadata_manager
            .catalog_controller
            .prepare_stream_job_fragments(&stream_job_fragments, streaming_job, false)
            .await?;

        // Pass the permit into the stream manager; it is released when creation finishes.
        let version = self
            .stream_manager
            .create_streaming_job(stream_job_fragments, ctx, permit)
            .await?;

        Ok(version)
    }

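    /// Drops an object from the catalog and releases everything it owns:
    /// streaming jobs, state tables, source workers, secrets, and (for
    /// Iceberg table sinks) the external Iceberg tables themselves.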
    pub async fn drop_object(
        &self,
        object_type: ObjectType,
        object_id: impl Into<ObjectId>,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        let object_id = object_id.into();
        let (release_ctx, version) = self
            .metadata_manager
            .catalog_controller
            .drop_object(object_type, object_id, drop_mode)
            .await?;

        if object_type == ObjectType::Source {
            self.env
                .notification_manager_ref()
                .notify_local_subscribers(LocalNotification::SourceDropped(object_id));
        }

        let ReleaseContext {
            database_id,
            removed_streaming_job_ids,
            removed_state_table_ids,
            removed_source_ids,
            removed_secret_ids: secret_ids,
            removed_source_fragments,
            removed_actors,
            removed_fragments,
            removed_sink_fragment_by_targets,
            removed_iceberg_table_sinks,
        } = release_ctx;

        let _guard = self.source_manager.pause_tick().await;
        self.stream_manager
            .drop_streaming_jobs(
                database_id,
                removed_actors.iter().map(|id| *id as _).collect(),
                removed_streaming_job_ids,
                removed_state_table_ids,
                removed_fragments.iter().map(|id| *id as _).collect(),
                removed_sink_fragment_by_targets
                    .into_iter()
                    .map(|(target, sinks)| {
                        (target as _, sinks.into_iter().map(|id| id as _).collect())
                    })
                    .collect(),
            )
            .await;

        // Unregister the dropped sources from the source manager.
        self.source_manager
            .apply_source_change(SourceChange::DropSource {
                dropped_source_ids: removed_source_ids.into_iter().map(|id| id as _).collect(),
            })
            .await;

        // Unregister the source fragments that belonged to the dropped jobs.
        let dropped_source_fragments = removed_source_fragments;
        self.source_manager
            .apply_source_change(SourceChange::DropMv {
                dropped_source_fragments,
            })
            .await;

        // Drop the external Iceberg tables backing the removed Iceberg table sinks.
        for sink in removed_iceberg_table_sinks {
            let sink_param = SinkParam::try_from_sink_catalog(sink.into())
                .expect("Iceberg sink should be valid");
            let iceberg_sink =
                IcebergSink::try_from(sink_param).expect("Iceberg sink should be valid");
            if let Ok(iceberg_catalog) = iceberg_sink.config.create_catalog().await {
                let table_identifier = iceberg_sink.config.full_table_name().unwrap();
                tracing::info!(
                    "dropping iceberg table {} for dropped sink",
                    table_identifier
                );

                let _ = iceberg_catalog
                    .drop_table(&table_identifier)
                    .await
                    .inspect_err(|err| {
                        tracing::error!(
                            "failed to drop iceberg table {} during cleanup: {}",
                            table_identifier,
                            err.as_report()
                        );
                    });
            }
        }

        // Remove the dropped secrets from the local secret manager.
        for secret in secret_ids {
            LocalSecretManager::global().remove_secret(secret);
        }
        Ok(version)
    }

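    /// Replaces the plan of an existing streaming job (e.g. for schema
    /// change). A temporary job catalog is created for the new plan and either
    /// swapped in on success or aborted on failure.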
    #[await_tree::instrument(boxed, "replace_streaming_job({streaming_job})")]
    pub async fn replace_job(
        &self,
        mut streaming_job: StreamingJob,
        fragment_graph: StreamFragmentGraphProto,
    ) -> MetaResult<NotificationVersion> {
        match &streaming_job {
            StreamingJob::Table(..)
            | StreamingJob::Source(..)
            | StreamingJob::MaterializedView(..) => {}
            StreamingJob::Sink(..) | StreamingJob::Index(..) => {
                bail_not_implemented!("schema change for {}", streaming_job.job_type_str())
            }
        }

        let job_id = streaming_job.id();

        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
        let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());

        // The max parallelism of a job must stay unchanged across replacements.
        let original_max_parallelism = self
            .metadata_manager
            .get_job_max_parallelism(streaming_job.id())
            .await?;
        let fragment_graph = PbStreamFragmentGraph {
            max_parallelism: original_max_parallelism as _,
            ..fragment_graph
        };

        let fragment_graph = StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
        streaming_job.set_info_from_graph(&fragment_graph);

        // Make the job immutable from here on.
        let streaming_job = streaming_job;

        let auto_refresh_schema_sinks = if let StreamingJob::Table(_, table, _) = &streaming_job {
            let auto_refresh_schema_sinks = self
                .metadata_manager
                .catalog_controller
                .get_sink_auto_refresh_schema_from(table.id)
                .await?;
            if !auto_refresh_schema_sinks.is_empty() {
                let original_table_columns = self
                    .metadata_manager
                    .catalog_controller
                    .get_table_columns(table.id)
                    .await?;
                let mut original_table_column_ids: HashSet<_> = original_table_columns
                    .iter()
                    .map(|col| col.column_id())
                    .collect();
                let newly_added_columns = table
                    .columns
                    .iter()
                    .filter(|col| {
                        !original_table_column_ids.remove(&ColumnId::new(
                            col.column_desc.as_ref().unwrap().column_id as _,
                        ))
                    })
                    .map(|col| ColumnCatalog::from(col.clone()))
                    .collect_vec();
                if !original_table_column_ids.is_empty() {
                    return Err(anyhow!(
                        "new table columns do not contain all original columns. new: {:?}, original: {:?}, not included: {:?}",
                        table.columns,
                        original_table_columns,
                        original_table_column_ids
                    )
                    .into());
                }
                let mut sinks = Vec::with_capacity(auto_refresh_schema_sinks.len());
                for sink in auto_refresh_schema_sinks {
                    let sink_job_fragments = self
                        .metadata_manager
                        .get_job_fragments_by_id(sink.id.as_job_id())
                        .await?;
                    if sink_job_fragments.fragments.len() != 1 {
                        return Err(anyhow!(
                            "auto schema refresh sink must have only one fragment, but got {}",
                            sink_job_fragments.fragments.len()
                        )
                        .into());
                    }
                    let original_sink_fragment =
                        sink_job_fragments.fragments.into_values().next().unwrap();
                    let (new_sink_fragment, new_schema, new_log_store_table) =
                        rewrite_refresh_schema_sink_fragment(
                            &original_sink_fragment,
                            &sink,
                            &newly_added_columns,
                            table,
                            fragment_graph.table_fragment_id(),
                            self.env.id_gen_manager(),
                            self.env.actor_id_generator(),
                        )?;

                    assert_eq!(
                        original_sink_fragment.actors.len(),
                        new_sink_fragment.actors.len()
                    );
                    let actor_status = (0..original_sink_fragment.actors.len())
                        .map(|i| {
                            let worker_node_id = sink_job_fragments.actor_status
                                [&original_sink_fragment.actors[i].actor_id]
                                .location
                                .as_ref()
                                .unwrap()
                                .worker_node_id;
                            (
                                new_sink_fragment.actors[i].actor_id,
                                PbActorStatus {
                                    location: Some(PbActorLocation { worker_node_id }),
                                },
                            )
                        })
                        .collect();

                    let streaming_job = StreamingJob::Sink(sink);

                    let tmp_sink_id = self
                        .metadata_manager
                        .catalog_controller
                        .create_job_catalog_for_replace(&streaming_job, None, None, None)
                        .await?
                        .as_sink_id();
                    let StreamingJob::Sink(sink) = streaming_job else {
                        unreachable!()
                    };

                    sinks.push(AutoRefreshSchemaSinkContext {
                        tmp_sink_id,
                        original_sink: sink,
                        original_fragment: original_sink_fragment,
                        new_schema,
                        newly_add_fields: newly_added_columns
                            .iter()
                            .map(|col| Field::from(&col.column_desc))
                            .collect(),
                        new_fragment: new_sink_fragment,
                        new_log_store_table,
                        actor_status,
                    });
                }
                Some(sinks)
            } else {
                None
            }
        } else {
            None
        };

        let tmp_id = self
            .metadata_manager
            .catalog_controller
            .create_job_catalog_for_replace(
                &streaming_job,
                Some(&ctx),
                fragment_graph.specified_parallelism().as_ref(),
                Some(fragment_graph.max_parallelism()),
            )
            .await?;

        let tmp_sink_ids = auto_refresh_schema_sinks.as_ref().map(|sinks| {
            sinks
                .iter()
                .map(|sink| sink.tmp_sink_id.as_object_id())
                .collect_vec()
        });

        tracing::debug!(id = %job_id, "building replace streaming job");
        let mut updated_sink_catalogs = vec![];

        let mut drop_table_connector_ctx = None;
        let result: MetaResult<_> = try {
            let (mut ctx, mut stream_job_fragments) = self
                .build_replace_job(
                    ctx,
                    &streaming_job,
                    fragment_graph,
                    tmp_id,
                    auto_refresh_schema_sinks,
                )
                .await?;
            drop_table_connector_ctx = ctx.drop_table_connector_ctx.clone();
            let auto_refresh_schema_sink_finish_ctx =
                ctx.auto_refresh_schema_sinks.as_ref().map(|sinks| {
                    sinks
                        .iter()
                        .map(|sink| FinishAutoRefreshSchemaSinkContext {
                            tmp_sink_id: sink.tmp_sink_id,
                            original_sink_id: sink.original_sink.id,
                            columns: sink.new_schema.clone(),
                            new_log_store_table: sink
                                .new_log_store_table
                                .as_ref()
                                .map(|table| (table.id, table.columns.clone())),
                        })
                        .collect()
                });

            if let StreamingJob::Table(_, table, ..) = &streaming_job {
                let union_fragment = stream_job_fragments.inner.union_fragment_for_table();
                let upstream_infos = self
                    .metadata_manager
                    .catalog_controller
                    .get_all_upstream_sink_infos(table, union_fragment.fragment_id as _)
                    .await?;
                refill_upstream_sink_union_in_table(&mut union_fragment.nodes, &upstream_infos);

                for upstream_info in &upstream_infos {
                    let upstream_fragment_id = upstream_info.sink_fragment_id;
                    ctx.upstream_fragment_downstreams
                        .entry(upstream_fragment_id)
                        .or_default()
                        .push(upstream_info.new_sink_downstream.clone());
                    if upstream_info.sink_original_target_columns.is_empty() {
                        updated_sink_catalogs.push(upstream_info.sink_id);
                    }
                }
            }

            let replace_upstream = ctx.replace_upstream.clone();

            if let Some(sinks) = &ctx.auto_refresh_schema_sinks {
                let empty_downstreams = FragmentDownstreamRelation::default();
                for sink in sinks {
                    self.metadata_manager
                        .catalog_controller
                        .prepare_streaming_job(
                            sink.tmp_sink_id.as_job_id(),
                            || [&sink.new_fragment].into_iter(),
                            &empty_downstreams,
                            true,
                            None,
                        )
                        .await?;
                }
            }

            self.metadata_manager
                .catalog_controller
                .prepare_stream_job_fragments(&stream_job_fragments, &streaming_job, true)
                .await?;

            self.stream_manager
                .replace_stream_job(stream_job_fragments, ctx)
                .await?;
            (replace_upstream, auto_refresh_schema_sink_finish_ctx)
        };

        match result {
            Ok((replace_upstream, auto_refresh_schema_sink_finish_ctx)) => {
                let version = self
                    .metadata_manager
                    .catalog_controller
                    .finish_replace_streaming_job(
                        tmp_id,
                        streaming_job,
                        replace_upstream,
                        SinkIntoTableContext {
                            updated_sink_catalogs,
                        },
                        drop_table_connector_ctx.as_ref(),
                        auto_refresh_schema_sink_finish_ctx,
                    )
                    .await?;
                if let Some(drop_table_connector_ctx) = &drop_table_connector_ctx {
                    self.source_manager
                        .apply_source_change(SourceChange::DropSource {
                            dropped_source_ids: vec![drop_table_connector_ctx.to_remove_source_id],
                        })
                        .await;
                }
                Ok(version)
            }
            Err(err) => {
                tracing::error!(id = %job_id, error = ?err.as_report(), "failed to replace job");
                let _ = self
                    .metadata_manager
                    .catalog_controller
                    .try_abort_replacing_streaming_job(tmp_id, tmp_sink_ids)
                    .await
                    .inspect_err(|err| {
                        tracing::error!(id = %job_id, error = ?err.as_report(), "failed to abort replacing job");
                    });
                Err(err)
            }
        }
    }

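    /// Drops a streaming job. Jobs still in the `Creating` state are
    /// cancelled; `Created` jobs are dropped through the catalog, and dropped
    /// sinks additionally have their exactly-once Iceberg metadata cleaned up.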
    #[await_tree::instrument(
        boxed,
        "drop_streaming_job{}({job_id})",
        if let DropMode::Cascade = drop_mode { "_cascade" } else { "" }
    )]
    async fn drop_streaming_job(
        &self,
        job_id: StreamingJobId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;

        let (object_id, object_type) = match job_id {
            StreamingJobId::MaterializedView(id) => (id.as_object_id(), ObjectType::Table),
            StreamingJobId::Sink(id) => (id.as_object_id(), ObjectType::Sink),
            StreamingJobId::Table(_, id) => (id.as_object_id(), ObjectType::Table),
            StreamingJobId::Index(idx) => (idx.as_object_id(), ObjectType::Index),
        };

        let job_status = self
            .metadata_manager
            .catalog_controller
            .get_streaming_job_status(job_id.id())
            .await?;
        let version = match job_status {
            JobStatus::Initial => {
                unreachable!(
                    "Job with Initial status should not notify frontend and therefore should not arrive here"
                );
            }
            JobStatus::Creating => {
                self.stream_manager
                    .cancel_streaming_jobs(vec![job_id.id()])
                    .await?;
                IGNORED_NOTIFICATION_VERSION
            }
            JobStatus::Created => {
                let version = self.drop_object(object_type, object_id, drop_mode).await?;
                #[cfg(not(madsim))]
                if let StreamingJobId::Sink(sink_id) = job_id {
                    // Clean up the metadata kept for exactly-once Iceberg sinks.
                    let db = self.env.meta_store_ref().conn.clone();
                    clean_all_rows_by_sink_id(&db, sink_id).await?;
                }
                version
            }
        };

        Ok(version)
    }

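    /// Resolves the parallelism a new streaming job should be scheduled with,
    /// based on the specified parallelism, the job's max parallelism, and the
    /// parallelism available in the target resource group.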
    fn resolve_stream_parallelism(
        &self,
        specified: Option<NonZeroUsize>,
        max: NonZeroUsize,
        cluster_info: &StreamingClusterInfo,
        resource_group: String,
    ) -> MetaResult<NonZeroUsize> {
        let available = cluster_info.parallelism(&resource_group);
        let Some(available) = NonZeroUsize::new(available) else {
            bail_unavailable!(
                "no available slots to schedule in resource group \"{}\", \
                 have you allocated any compute nodes within this resource group?",
                resource_group
            );
        };

        if let Some(specified) = specified {
            if specified > max {
                bail_invalid_parameter!(
                    "specified parallelism {} should not exceed max parallelism {}",
                    specified,
                    max,
                );
            }
            if specified > available {
                bail_unavailable!(
                    "insufficient parallelism to schedule in resource group \"{}\", \
                     required: {}, available: {}",
                    resource_group,
                    specified,
                    available,
                );
            }
            Ok(specified)
        } else {
            // Fall back to the configured default parallelism.
            let default_parallelism = match self.env.opts.default_parallelism {
                DefaultParallelism::Full => available,
                DefaultParallelism::Default(num) => {
                    if num > available {
                        bail_unavailable!(
                            "insufficient parallelism to schedule in resource group \"{}\", \
                             required: {}, available: {}",
                            resource_group,
                            num,
                            available,
                        );
                    }
                    num
                }
            };

            if default_parallelism > max {
                tracing::warn!(
                    max_parallelism = max.get(),
                    resource_group,
                    "default parallelism exceeds the max parallelism, using max parallelism instead",
                );
            }
            Ok(default_parallelism.min(max))
        }
    }

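    /// Builds the actor graph for a new streaming job on top of the current
    /// cluster info and upstream fragments, returning the creation context and
    /// the fragments to create.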
    #[await_tree::instrument]
    pub(crate) async fn build_stream_job(
        &self,
        stream_ctx: StreamContext,
        mut stream_job: StreamingJob,
        fragment_graph: StreamFragmentGraph,
        specific_resource_group: Option<String>,
    ) -> MetaResult<(CreateStreamingJobContext, StreamJobFragmentsToCreate)> {
        let id = stream_job.id();
        let specified_parallelism = fragment_graph.specified_parallelism();
        let expr_context = stream_ctx.to_expr_context();
        let max_parallelism = NonZeroUsize::new(fragment_graph.max_parallelism()).unwrap();

        let fragment_backfill_ordering = fragment_graph.create_fragment_backfill_ordering();

        let (snapshot_backfill_info, cross_db_snapshot_backfill_info) =
            fragment_graph.collect_snapshot_backfill_info()?;
        assert!(
            snapshot_backfill_info
                .iter()
                .chain([&cross_db_snapshot_backfill_info])
                .flat_map(|info| info.upstream_mv_table_id_to_backfill_epoch.values())
                .all(|backfill_epoch| backfill_epoch.is_none()),
            "should not set backfill epoch when initially build the job: {:?} {:?}",
            snapshot_backfill_info,
            cross_db_snapshot_backfill_info
        );

        let locality_fragment_state_table_mapping =
            fragment_graph.find_locality_provider_fragment_state_table_mapping();

        self.metadata_manager
            .catalog_controller
            .validate_cross_db_snapshot_backfill(&cross_db_snapshot_backfill_info)
            .await?;

        let upstream_table_ids = fragment_graph
            .dependent_table_ids()
            .iter()
            .filter(|id| {
                !cross_db_snapshot_backfill_info
                    .upstream_mv_table_id_to_backfill_epoch
                    .contains_key(id)
            })
            .cloned()
            .collect();

        let (upstream_root_fragments, existing_actor_location) = self
            .metadata_manager
            .get_upstream_root_fragments(&upstream_table_ids)
            .await?;

        if snapshot_backfill_info.is_some() {
            match stream_job {
                StreamingJob::MaterializedView(_)
                | StreamingJob::Sink(_)
                | StreamingJob::Index(_, _) => {}
                StreamingJob::Table(_, _, _) | StreamingJob::Source(_) => {
                    return Err(
                        anyhow!("snapshot_backfill not enabled for table and source").into(),
                    );
                }
            }
        }

        let upstream_actors = upstream_root_fragments
            .values()
            .map(|(fragment, _)| {
                (
                    fragment.fragment_id,
                    fragment.actors.keys().copied().collect(),
                )
            })
            .collect();

        let complete_graph = CompleteStreamFragmentGraph::with_upstreams(
            fragment_graph,
            FragmentGraphUpstreamContext {
                upstream_root_fragments,
                upstream_actor_location: existing_actor_location,
            },
            (&stream_job).into(),
        )?;
        let resource_group = match specific_resource_group {
            None => {
                self.metadata_manager
                    .get_database_resource_group(stream_job.database_id())
                    .await?
            }
            Some(resource_group) => resource_group,
        };

        // Resolve the parallelism to schedule the job with.
        let cluster_info = self.metadata_manager.get_streaming_cluster_info().await?;

        let parallelism = self.resolve_stream_parallelism(
            specified_parallelism,
            max_parallelism,
            &cluster_info,
            resource_group.clone(),
        )?;

        let parallelism = self
            .env
            .system_params_reader()
            .await
            .adaptive_parallelism_strategy()
            .compute_target_parallelism(parallelism.get());

        let parallelism = NonZeroUsize::new(parallelism).expect("parallelism must be positive");
        let actor_graph_builder = ActorGraphBuilder::new(
            id,
            resource_group,
            complete_graph,
            cluster_info,
            parallelism,
        )?;

        let ActorGraphBuildResult {
            graph,
            downstream_fragment_relations,
            building_locations,
            upstream_fragment_downstreams,
            new_no_shuffle,
            replace_upstream,
            ..
        } = actor_graph_builder.generate_graph(&self.env, &stream_job, expr_context)?;
        assert!(replace_upstream.is_empty());

        // If no parallelism is specified and the default parallelism is `Full`,
        // the job is created as adaptive; otherwise it is pinned to the resolved
        // fixed parallelism.
        let table_parallelism = match (specified_parallelism, &self.env.opts.default_parallelism) {
            (None, DefaultParallelism::Full) => TableParallelism::Adaptive,
            _ => TableParallelism::Fixed(parallelism.get()),
        };

        let stream_job_fragments = StreamJobFragments::new(
            id,
            graph,
            &building_locations.actor_locations,
            stream_ctx.clone(),
            table_parallelism,
            max_parallelism.get(),
        );

        if let Some(mview_fragment) = stream_job_fragments.mview_fragment() {
            stream_job.set_table_vnode_count(mview_fragment.vnode_count());
        }

        let new_upstream_sink = if let StreamingJob::Sink(sink) = &stream_job
            && let Ok(table_id) = sink.get_target_table()
        {
            let tables = self
                .metadata_manager
                .get_table_catalog_by_ids(&[*table_id])
                .await?;
            let target_table = tables
                .first()
                .ok_or_else(|| MetaError::catalog_id_not_found("table", *table_id))?;
            let sink_fragment = stream_job_fragments
                .sink_fragment()
                .ok_or_else(|| anyhow::anyhow!("sink fragment not found for sink {}", sink.id))?;
            let mview_fragment_id = self
                .metadata_manager
                .catalog_controller
                .get_mview_fragment_by_id(table_id.as_job_id())
                .await?;
            let upstream_sink_info = build_upstream_sink_info(
                sink,
                sink_fragment.fragment_id as _,
                target_table,
                mview_fragment_id,
            )?;
            Some(upstream_sink_info)
        } else {
            None
        };

        let ctx = CreateStreamingJobContext {
            upstream_fragment_downstreams,
            new_no_shuffle,
            upstream_actors,
            building_locations,
            definition: stream_job.definition(),
            create_type: stream_job.create_type(),
            job_type: (&stream_job).into(),
            streaming_job: stream_job,
            new_upstream_sink,
            option: CreateStreamingJobOption {},
            snapshot_backfill_info,
            cross_db_snapshot_backfill_info,
            fragment_backfill_ordering,
            locality_fragment_state_table_mapping,
        };

        Ok((
            ctx,
            StreamJobFragmentsToCreate {
                inner: stream_job_fragments,
                downstreams: downstream_fragment_relations,
            },
        ))
    }

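    /// Builds the actor graph for replacing an existing streaming job,
    /// matching the new fragment graph's internal tables against the old ones
    /// and wiring the new fragments to the job's existing upstreams and
    /// downstreams.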
1866 pub(crate) async fn build_replace_job(
1872 &self,
1873 stream_ctx: StreamContext,
1874 stream_job: &StreamingJob,
1875 mut fragment_graph: StreamFragmentGraph,
1876 tmp_job_id: JobId,
1877 auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,
1878 ) -> MetaResult<(ReplaceStreamJobContext, StreamJobFragmentsToCreate)> {
1879 match &stream_job {
1880 StreamingJob::Table(..)
1881 | StreamingJob::Source(..)
1882 | StreamingJob::MaterializedView(..) => {}
1883 StreamingJob::Sink(..) | StreamingJob::Index(..) => {
1884 bail_not_implemented!("schema change for {}", stream_job.job_type_str())
1885 }
1886 }
1887
1888 let id = stream_job.id();
1889 let expr_context = stream_ctx.to_expr_context();
1890
        let mut drop_table_associated_source_id = None;
        if let StreamingJob::Table(None, _, _) = &stream_job {
            drop_table_associated_source_id = self
                .metadata_manager
                .get_table_associated_source_id(id.as_mv_table_id())
                .await?;
        }

        let old_fragments = self.metadata_manager.get_job_fragments_by_id(id).await?;
        let old_internal_table_ids = old_fragments.internal_table_ids();

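        // Reuse the old job's internal state tables in the new fragment graph. Three cases: a
        // table dropping its associated source (whose single state table is removed), a
        // materialized view (internal tables matched structurally between the old and new
        // graphs), and the trivial case where the tables can be matched one-to-one.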
        let mut drop_table_connector_ctx = None;
        if let Some(to_remove_source_id) = drop_table_associated_source_id {
            debug_assert!(old_internal_table_ids.len() == 1);

            drop_table_connector_ctx = Some(DropTableConnectorContext {
                to_change_streaming_job_id: id,
                to_remove_state_table_id: old_internal_table_ids[0],
                to_remove_source_id,
            });
        } else if stream_job.is_materialized_view() {
            let old_fragments_upstreams = self
                .metadata_manager
                .catalog_controller
                .upstream_fragments(old_fragments.fragment_ids())
                .await?;

            let old_state_graph =
                state_match::Graph::from_existing(&old_fragments, &old_fragments_upstreams);
            let new_state_graph = state_match::Graph::from_building(&fragment_graph);
            let mapping =
                state_match::match_graph_internal_tables(&new_state_graph, &old_state_graph)
                    .context("incompatible altering on the streaming job states")?;

            fragment_graph.fit_internal_table_ids_with_mapping(mapping);
        } else {
            let old_internal_tables = self
                .metadata_manager
                .get_table_catalog_by_ids(&old_internal_table_ids)
                .await?;
            fragment_graph.fit_internal_tables_trivial(old_internal_tables)?;
        }

        let original_root_fragment = old_fragments
            .root_fragment()
            .expect("root fragment not found");

        let job_type = StreamingJobType::from(stream_job);

        let (mut downstream_fragments, mut downstream_actor_location) =
            self.metadata_manager.get_downstream_fragments(id).await?;

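        // When sink fragments are replaced together with the table (auto schema refresh), swap
        // each affected downstream fragment for its rebuilt counterpart and repoint the actor
        // locations at the new actors.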
        if let Some(auto_refresh_schema_sinks) = &auto_refresh_schema_sinks {
            let mut remaining_fragment: HashSet<_> = auto_refresh_schema_sinks
                .iter()
                .map(|sink| sink.original_fragment.fragment_id)
                .collect();
            for (_, downstream_fragment, _) in &mut downstream_fragments {
                if let Some(sink) = auto_refresh_schema_sinks.iter().find(|sink| {
                    sink.original_fragment.fragment_id == downstream_fragment.fragment_id
                }) {
                    assert!(remaining_fragment.remove(&downstream_fragment.fragment_id));
                    for actor_id in downstream_fragment.actors.keys() {
                        downstream_actor_location.remove(actor_id);
                    }
                    for (actor_id, status) in &sink.actor_status {
                        downstream_actor_location
                            .insert(*actor_id, status.location.as_ref().unwrap().worker_node_id);
                    }

                    *downstream_fragment = (&sink.new_fragment_info(), stream_job.id()).into();
                }
            }
            assert!(remaining_fragment.is_empty());
        }

        let complete_graph = match &job_type {
            StreamingJobType::Table(TableJobType::General) | StreamingJobType::Source => {
                CompleteStreamFragmentGraph::with_downstreams(
                    fragment_graph,
                    FragmentGraphDownstreamContext {
                        original_root_fragment_id: original_root_fragment.fragment_id,
                        downstream_fragments,
                        downstream_actor_location,
                    },
                    job_type,
                )?
            }
            StreamingJobType::Table(TableJobType::SharedCdcSource)
            | StreamingJobType::MaterializedView => {
                let (upstream_root_fragments, upstream_actor_location) = self
                    .metadata_manager
                    .get_upstream_root_fragments(fragment_graph.dependent_table_ids())
                    .await?;

                CompleteStreamFragmentGraph::with_upstreams_and_downstreams(
                    fragment_graph,
                    FragmentGraphUpstreamContext {
                        upstream_root_fragments,
                        upstream_actor_location,
                    },
                    FragmentGraphDownstreamContext {
                        original_root_fragment_id: original_root_fragment.fragment_id,
                        downstream_fragments,
                        downstream_actor_location,
                    },
                    job_type,
                )?
            }
            _ => unreachable!(),
        };

        let resource_group = self
            .metadata_manager
            .get_existing_job_resource_group(id)
            .await?;

        let cluster_info = self.metadata_manager.get_streaming_cluster_info().await?;

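        // A replacement keeps the job's parallelism: derive it from the number of actors in the
        // root fragment being replaced.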
        let parallelism = NonZeroUsize::new(original_root_fragment.actors.len())
            .expect("The number of actors in the original table fragment should be greater than 0");

        let actor_graph_builder = ActorGraphBuilder::new(
            id,
            resource_group,
            complete_graph,
            cluster_info,
            parallelism,
        )?;

        let ActorGraphBuildResult {
            graph,
            downstream_fragment_relations,
            building_locations,
            upstream_fragment_downstreams,
            mut replace_upstream,
            new_no_shuffle,
            ..
        } = actor_graph_builder.generate_graph(&self.env, stream_job, expr_context)?;

        if matches!(
            job_type,
            StreamingJobType::Source | StreamingJobType::Table(TableJobType::General)
        ) {
            assert!(upstream_fragment_downstreams.is_empty());
        }

        let stream_job_fragments = StreamJobFragments::new(
            tmp_job_id,
            graph,
            &building_locations.actor_locations,
            stream_ctx,
            old_fragments.assigned_parallelism,
            old_fragments.max_parallelism,
        );

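        // The rebuilt sink fragments of auto-refresh-schema sinks are replaced wholesale, so
        // their upstream edges are already part of the new graph; drop them from the set of
        // fragments whose upstream needs to be patched.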
        if let Some(sinks) = &auto_refresh_schema_sinks {
            for sink in sinks {
                replace_upstream
                    .remove(&sink.new_fragment.fragment_id)
                    .expect("should exist");
            }
        }

        let ctx = ReplaceStreamJobContext {
            old_fragments,
            replace_upstream,
            new_no_shuffle,
            upstream_fragment_downstreams,
            building_locations,
            streaming_job: stream_job.clone(),
            tmp_id: tmp_job_id,
            drop_table_connector_ctx,
            auto_refresh_schema_sinks,
        };

        Ok((
            ctx,
            StreamJobFragmentsToCreate {
                inner: stream_job_fragments,
                downstreams: downstream_fragment_relations,
            },
        ))
    }

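    /// Renames a catalog object; the object type is derived from the request variant, and the
    /// rename itself is delegated to the catalog controller.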
    async fn alter_name(
        &self,
        relation: alter_name_request::Object,
        new_name: &str,
    ) -> MetaResult<NotificationVersion> {
        let (obj_type, id) = match relation {
            alter_name_request::Object::TableId(id) => (ObjectType::Table, id),
            alter_name_request::Object::ViewId(id) => (ObjectType::View, id),
            alter_name_request::Object::IndexId(id) => (ObjectType::Index, id),
            alter_name_request::Object::SinkId(id) => (ObjectType::Sink, id),
            alter_name_request::Object::SourceId(id) => (ObjectType::Source, id),
            alter_name_request::Object::SchemaId(id) => (ObjectType::Schema, id),
            alter_name_request::Object::DatabaseId(id) => (ObjectType::Database, id),
            alter_name_request::Object::SubscriptionId(id) => (ObjectType::Subscription, id),
        };
        self.metadata_manager
            .catalog_controller
            .alter_name(obj_type, id, new_name)
            .await
    }

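    /// Swaps the names of two catalog objects of the same type. Schema swapping is not yet
    /// implemented.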
    async fn alter_swap_rename(
        &self,
        object: alter_swap_rename_request::Object,
    ) -> MetaResult<NotificationVersion> {
        let (obj_type, src_id, dst_id) = match object {
            alter_swap_rename_request::Object::Schema(_) => unimplemented!("schema swap"),
            alter_swap_rename_request::Object::Table(objs) => {
                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
                (ObjectType::Table, src_id, dst_id)
            }
            alter_swap_rename_request::Object::View(objs) => {
                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
                (ObjectType::View, src_id, dst_id)
            }
            alter_swap_rename_request::Object::Source(objs) => {
                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
                (ObjectType::Source, src_id, dst_id)
            }
            alter_swap_rename_request::Object::Sink(objs) => {
                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
                (ObjectType::Sink, src_id, dst_id)
            }
            alter_swap_rename_request::Object::Subscription(objs) => {
                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
                (ObjectType::Subscription, src_id, dst_id)
            }
        };

        self.metadata_manager
            .catalog_controller
            .alter_swap_rename(obj_type, src_id, dst_id)
            .await
    }

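    /// Transfers ownership of a catalog object to the given user.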
    async fn alter_owner(
        &self,
        object: Object,
        owner_id: UserId,
    ) -> MetaResult<NotificationVersion> {
        let (obj_type, id) = match object {
            Object::TableId(id) => (ObjectType::Table, id),
            Object::ViewId(id) => (ObjectType::View, id),
            Object::SourceId(id) => (ObjectType::Source, id),
            Object::SinkId(id) => (ObjectType::Sink, id),
            Object::SchemaId(id) => (ObjectType::Schema, id),
            Object::DatabaseId(id) => (ObjectType::Database, id),
            Object::SubscriptionId(id) => (ObjectType::Subscription, id),
            Object::ConnectionId(id) => (ObjectType::Connection, id),
        };
        self.metadata_manager
            .catalog_controller
            .alter_owner(obj_type, id.into(), owner_id as _)
            .await
    }

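    /// Moves a catalog object into another schema.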
    async fn alter_set_schema(
        &self,
        object: alter_set_schema_request::Object,
        new_schema_id: SchemaId,
    ) -> MetaResult<NotificationVersion> {
        let (obj_type, id) = match object {
            alter_set_schema_request::Object::TableId(id) => (ObjectType::Table, id),
            alter_set_schema_request::Object::ViewId(id) => (ObjectType::View, id),
            alter_set_schema_request::Object::SourceId(id) => (ObjectType::Source, id),
            alter_set_schema_request::Object::SinkId(id) => (ObjectType::Sink, id),
            alter_set_schema_request::Object::FunctionId(id) => (ObjectType::Function, id),
            alter_set_schema_request::Object::ConnectionId(id) => (ObjectType::Connection, id),
            alter_set_schema_request::Object::SubscriptionId(id) => (ObjectType::Subscription, id),
        };
        self.metadata_manager
            .catalog_controller
            .alter_schema(obj_type, id.into(), new_schema_id as _)
            .await
    }

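    /// Blocks until no background creating job remains, sleeping 1 ms between checks for up to
    /// 30 minutes of accumulated sleep (the catalog queries add further wall-clock time on top).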
    pub async fn wait(&self) -> MetaResult<()> {
        let timeout_ms = 30 * 60 * 1000;
        for _ in 0..timeout_ms {
            if self
                .metadata_manager
                .catalog_controller
                .list_background_creating_jobs(true, None)
                .await?
                .is_empty()
            {
                return Ok(());
            }

            sleep(Duration::from_millis(1)).await;
        }
        Err(MetaError::cancelled(format!(
            "timeout after {timeout_ms}ms"
        )))
    }

    async fn comment_on(&self, comment: Comment) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .comment_on(comment)
            .await
    }
}

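/// Emits a `CreateStreamJob` telemetry event for the given job, object type, and optional
/// connector and attribute info.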
fn report_create_object(
    job_id: JobId,
    event_name: &str,
    obj_type: PbTelemetryDatabaseObject,
    connector_name: Option<String>,
    attr_info: Option<jsonbb::Value>,
) {
    report_event(
        PbTelemetryEventStage::CreateStreamJob,
        event_name,
        job_id.as_raw_id() as _,
        connector_name,
        Some(obj_type),
        attr_info,
    );
}

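/// Deletes all rows belonging to `sink_id` from the iceberg exactly-once system table, logging
/// the outcome either way.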
async fn clean_all_rows_by_sink_id(db: &DatabaseConnection, sink_id: SinkId) -> MetaResult<()> {
    match Entity::delete_many()
        .filter(Column::SinkId.eq(sink_id))
        .exec(db)
        .await
    {
        Ok(result) => {
            let deleted_count = result.rows_affected;

            tracing::info!(
                "Deleted {} rows for sink_id = {} from the iceberg exactly-once system table.",
                deleted_count,
                sink_id
            );
            Ok(())
        }
        Err(e) => {
            tracing::error!(
                "Error deleting rows for sink_id = {} from the iceberg exactly-once system table: {:?}",
                sink_id,
                e.as_report()
            );
            Err(e.into())
        }
    }
}

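/// Builds the [`UpstreamSinkInfo`] that wires a sink's output into its target table's union
/// executor: the sink's output schema, a hash-dispatched edge keyed on the table's distribution
/// key (re-expressed as positions in the sink output), and projection expressions adapting the
/// sink columns to the table's current columns.
///
/// As an illustrative example (not drawn from a real catalog): if the sink outputs columns with
/// ids `[1, 2, 3]` and the target table's distribution key refers to the column with id `3`,
/// then `dist_key_indices` is `[2]`, that column's position in the sink output.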
pub fn build_upstream_sink_info(
    sink: &PbSink,
    sink_fragment_id: FragmentId,
    target_table: &PbTable,
    target_fragment_id: FragmentId,
) -> MetaResult<UpstreamSinkInfo> {
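    // Prefer the target columns recorded when the sink was created; if none were recorded
    // (presumably sinks created before `original_target_columns` was persisted), fall back to
    // the table's current columns.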
    let sink_columns = if !sink.original_target_columns.is_empty() {
        sink.original_target_columns.clone()
    } else {
        target_table.columns.clone()
    };

    let sink_output_fields = sink_columns
        .iter()
        .map(|col| Field::from(col.column_desc.as_ref().unwrap()).to_prost())
        .collect_vec();
    let output_indices = (0..sink_output_fields.len())
        .map(|i| i as u32)
        .collect_vec();

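    // Re-express the table's distribution key as positions in the sink's output by matching
    // column ids between the two schemas; a column id missing from the sink means the sink
    // cannot feed the table's distribution key and is reported as an error.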
    let dist_key_indices: anyhow::Result<Vec<u32>> = try {
        let sink_idx_by_col_id = sink_columns
            .iter()
            .enumerate()
            .map(|(idx, col)| {
                let column_id = col.column_desc.as_ref().unwrap().column_id;
                (column_id, idx as u32)
            })
            .collect::<HashMap<_, _>>();
        target_table
            .distribution_key
            .iter()
            .map(|dist_idx| {
                let column_id = target_table.columns[*dist_idx as usize]
                    .column_desc
                    .as_ref()
                    .unwrap()
                    .column_id;
                let sink_idx = sink_idx_by_col_id
                    .get(&column_id)
                    .ok_or_else(|| anyhow::anyhow!("column id {} not found in sink", column_id))?;
                Ok(*sink_idx)
            })
            .collect::<anyhow::Result<Vec<_>>>()?
    };
    let dist_key_indices =
        dist_key_indices.map_err(|e| e.context("failed to get distribution key indices"))?;
    let downstream_fragment_id = target_fragment_id as _;
    let new_downstream_relation = DownstreamFragmentRelation {
        downstream_fragment_id,
        dispatcher_type: DispatcherType::Hash,
        dist_key_indices,
        output_mapping: PbDispatchOutputMapping::simple(output_indices),
    };
    let current_target_columns = target_table.get_columns();
    let project_exprs = build_select_node_list(&sink_columns, current_target_columns)?;
    Ok(UpstreamSinkInfo {
        sink_id: sink.id,
        sink_fragment_id: sink_fragment_id as _,
        sink_output_fields,
        sink_original_target_columns: sink.get_original_target_columns().clone(),
        project_exprs,
        new_sink_downstream: new_downstream_relation,
    })
}

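/// Rewrites the `UpstreamSinkUnion` node inside a table's union fragment so it lists the given
/// sinks as its initial upstreams. The visitor returns `false` on the first `UpstreamSinkUnion`
/// node it finds, which stops the traversal there.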
pub fn refill_upstream_sink_union_in_table(
    union_fragment_root: &mut PbStreamNode,
    upstream_sink_infos: &[UpstreamSinkInfo],
) {
    visit_stream_node_cont_mut(union_fragment_root, |node| {
        if let Some(NodeBody::UpstreamSinkUnion(upstream_sink_union)) = &mut node.node_body {
            let init_upstreams = upstream_sink_infos
                .iter()
                .map(|info| PbUpstreamSinkInfo {
                    upstream_fragment_id: info.sink_fragment_id,
                    sink_output_schema: info.sink_output_fields.clone(),
                    project_exprs: info.project_exprs.clone(),
                })
                .collect();
            upstream_sink_union.init_upstreams = init_upstreams;
            false
        } else {
            true
        }
    });
}