1use std::cmp::Ordering;
16use std::collections::{HashMap, HashSet};
17use std::num::NonZeroUsize;
18use std::sync::Arc;
19use std::sync::atomic::AtomicU64;
20use std::time::Duration;
21
22use anyhow::{Context, anyhow};
23use await_tree::InstrumentAwait;
24use either::Either;
25use itertools::Itertools;
26use risingwave_common::catalog::{
27 AlterDatabaseParam, ColumnCatalog, ColumnId, Field, FragmentTypeFlag,
28};
29use risingwave_common::config::DefaultParallelism;
30use risingwave_common::hash::VnodeCountCompat;
31use risingwave_common::id::{JobId, TableId};
32use risingwave_common::secret::{LocalSecretManager, SecretEncryption};
33use risingwave_common::system_param::reader::SystemParamsRead;
34use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont_mut;
35use risingwave_common::{bail, bail_not_implemented};
36use risingwave_connector::WithOptionsSecResolved;
37use risingwave_connector::connector_common::validate_connection;
38use risingwave_connector::sink::SinkParam;
39use risingwave_connector::sink::iceberg::IcebergSink;
40use risingwave_connector::source::cdc::CdcScanOptions;
41use risingwave_connector::source::{
42 ConnectorProperties, SourceEnumeratorContext, UPSTREAM_SOURCE_KEY,
43};
44use risingwave_meta_model::object::ObjectType;
45use risingwave_meta_model::{
46 ConnectionId, DatabaseId, DispatcherType, FragmentId, FunctionId, IndexId, JobStatus, ObjectId,
47 SchemaId, SecretId, SinkId, SourceId, StreamingParallelism, SubscriptionId, UserId, ViewId,
48 streaming_job,
49};
50use risingwave_pb::catalog::{
51 Comment, Connection, CreateType, Database, Function, PbTable, Schema, Secret, Source,
52 Subscription, Table, View,
53};
54use risingwave_pb::ddl_service::alter_owner_request::Object;
55use risingwave_pb::ddl_service::{
56 DdlProgress, TableJobType, WaitVersion, alter_name_request, alter_set_schema_request,
57 alter_swap_rename_request, streaming_job_resource_type,
58};
59use risingwave_pb::meta::table_fragments::fragment::FragmentDistributionType as PbFragmentDistributionType;
60use risingwave_pb::plan_common::PbColumnCatalog;
61use risingwave_pb::stream_plan::stream_node::NodeBody;
62use risingwave_pb::stream_plan::{
63 PbDispatchOutputMapping, PbStreamFragmentGraph, PbStreamNode, PbUpstreamSinkInfo,
64 StreamFragmentGraph as StreamFragmentGraphProto,
65};
66use risingwave_pb::telemetry::{PbTelemetryDatabaseObject, PbTelemetryEventStage};
67use strum::Display;
68use thiserror_ext::AsReport;
69use tokio::sync::{OwnedSemaphorePermit, Semaphore};
70use tokio::time::sleep;
71use tracing::Instrument;
72
73use crate::barrier::{BarrierManagerRef, Command};
74use crate::controller::catalog::{DropTableConnectorContext, ReleaseContext};
75use crate::controller::cluster::StreamingClusterInfo;
76use crate::controller::streaming_job::{FinishAutoRefreshSchemaSinkContext, SinkIntoTableContext};
77use crate::controller::utils::build_select_node_list;
78use crate::error::{MetaErrorInner, bail_invalid_parameter, bail_unavailable};
79use crate::manager::iceberg_compaction::IcebergCompactionManagerRef;
80use crate::manager::sink_coordination::SinkCoordinatorManager;
81use crate::manager::{
82 IGNORED_NOTIFICATION_VERSION, LocalNotification, MetaSrvEnv, MetadataManager,
83 NotificationVersion, StreamingJob, StreamingJobType,
84};
85use crate::model::{
86 DownstreamFragmentRelation, FragmentDownstreamRelation, FragmentId as CatalogFragmentId,
87 StreamContext, StreamJobFragments, StreamJobFragmentsToCreate, TableParallelism,
88};
89use crate::stream::cdc::{
90 parallel_cdc_table_backfill_fragment, try_init_parallel_cdc_table_snapshot_splits,
91};
92use crate::stream::{
93 ActorGraphBuildResult, ActorGraphBuilder, AutoRefreshSchemaSinkContext,
94 CompleteStreamFragmentGraph, CreateStreamingJobContext, CreateStreamingJobOption,
95 FragmentGraphDownstreamContext, FragmentGraphUpstreamContext, GlobalStreamManagerRef,
96 ReplaceStreamJobContext, ReschedulePolicy, SourceChange, SourceManagerRef, StreamFragmentGraph,
97 UpstreamSinkInfo, check_sink_fragments_support_refresh_schema, create_source_worker,
98 rewrite_refresh_schema_sink_fragment, state_match, validate_sink,
99};
100use crate::telemetry::report_event;
101use crate::{MetaError, MetaResult};
102
/// Behavior when dropping an object that other objects may depend on,
/// mirroring SQL's `RESTRICT` / `CASCADE` drop options.
#[derive(PartialEq)]
pub enum DropMode {
    /// Refuse the drop if dependent objects exist.
    Restrict,
    /// Drop the object together with everything that depends on it.
    Cascade,
}
108
109impl DropMode {
110 pub fn from_request_setting(cascade: bool) -> DropMode {
111 if cascade {
112 DropMode::Cascade
113 } else {
114 DropMode::Restrict
115 }
116 }
117}
118
/// Identifier of a streaming job targeted by a DDL command, tagged with the
/// job's kind. `strum::AsRefStr` provides the variant name used by `Display`.
#[derive(strum::AsRefStr)]
pub enum StreamingJobId {
    MaterializedView(TableId),
    Sink(SinkId),
    // The `Option<SourceId>` is presumably the table's associated source
    // (e.g. a table with a connector) — confirm against callers.
    Table(Option<SourceId>, TableId),
    Index(IndexId),
}
126
127impl std::fmt::Display for StreamingJobId {
128 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
129 write!(f, "{}", self.as_ref())?;
130 write!(f, "({})", self.id())
131 }
132}
133
134impl StreamingJobId {
135 fn id(&self) -> JobId {
136 match self {
137 StreamingJobId::MaterializedView(id) | StreamingJobId::Table(_, id) => id.as_job_id(),
138 StreamingJobId::Index(id) => id.as_job_id(),
139 StreamingJobId::Sink(id) => id.as_job_id(),
140 }
141 }
142}
143
/// Payload of [`DdlCommand::ReplaceStreamJob`]: the job's new catalog
/// definition together with the new fragment graph to instantiate.
pub struct ReplaceStreamJobInfo {
    pub streaming_job: StreamingJob,
    pub fragment_graph: StreamFragmentGraphProto,
}
150
/// The set of DDL operations the controller can execute. Each variant carries
/// exactly the data needed to perform the operation; `strum::Display` renders
/// the variant name for logging / await-tree spans.
#[derive(Display)]
pub enum DdlCommand {
    CreateDatabase(Database),
    DropDatabase(DatabaseId),
    CreateSchema(Schema),
    DropSchema(SchemaId, DropMode),
    CreateNonSharedSource(Source),
    DropSource(SourceId, DropMode),
    ResetSource(SourceId),
    CreateFunction(Function),
    DropFunction(FunctionId, DropMode),
    // The `HashSet<ObjectId>` holds the objects the view depends on.
    CreateView(View, HashSet<ObjectId>),
    DropView(ViewId, DropMode),
    CreateStreamingJob {
        stream_job: StreamingJob,
        fragment_graph: StreamFragmentGraphProto,
        dependencies: HashSet<ObjectId>,
        resource_type: streaming_job_resource_type::ResourceType,
        if_not_exists: bool,
    },
    DropStreamingJob {
        job_id: StreamingJobId,
        drop_mode: DropMode,
    },
    AlterName(alter_name_request::Object, String),
    AlterSwapRename(alter_swap_rename_request::Object),
    ReplaceStreamJob(ReplaceStreamJobInfo),
    AlterNonSharedSource(Source),
    AlterObjectOwner(Object, UserId),
    AlterSetSchema(alter_set_schema_request::Object, SchemaId),
    CreateConnection(Connection),
    DropConnection(ConnectionId, DropMode),
    CreateSecret(Secret),
    AlterSecret(Secret),
    DropSecret(SecretId, DropMode),
    CommentOn(Comment),
    CreateSubscription(Subscription),
    DropSubscription(SubscriptionId, DropMode),
    AlterSubscriptionRetention {
        subscription_id: SubscriptionId,
        retention_seconds: u64,
        definition: String,
    },
    AlterDatabaseParam(DatabaseId, AlterDatabaseParam),
    // (job id, config entries to add/overwrite, config keys to remove)
    AlterStreamingJobConfig(JobId, HashMap<String, String>, Vec<String>),
}
197
impl DdlCommand {
    /// Returns a label for the object this command targets, used in the
    /// await-tree span of `run_command`: the object's *name* for commands
    /// whose target has no id yet (creates / alters carrying a full catalog
    /// object), or the object *id* otherwise.
    fn object(&self) -> Either<String, ObjectId> {
        use Either::*;
        match self {
            DdlCommand::CreateDatabase(database) => Left(database.name.clone()),
            DdlCommand::DropDatabase(id) => Right(id.as_object_id()),
            DdlCommand::CreateSchema(schema) => Left(schema.name.clone()),
            DdlCommand::DropSchema(id, _) => Right(id.as_object_id()),
            DdlCommand::CreateNonSharedSource(source) => Left(source.name.clone()),
            DdlCommand::DropSource(id, _) => Right(id.as_object_id()),
            DdlCommand::ResetSource(id) => Right(id.as_object_id()),
            DdlCommand::CreateFunction(function) => Left(function.name.clone()),
            DdlCommand::DropFunction(id, _) => Right(id.as_object_id()),
            DdlCommand::CreateView(view, _) => Left(view.name.clone()),
            DdlCommand::DropView(id, _) => Right(id.as_object_id()),
            DdlCommand::CreateStreamingJob { stream_job, .. } => Left(stream_job.name()),
            DdlCommand::DropStreamingJob { job_id, .. } => Right(job_id.id().as_object_id()),
            DdlCommand::AlterName(object, _) => Left(format!("{object:?}")),
            DdlCommand::AlterSwapRename(object) => Left(format!("{object:?}")),
            DdlCommand::ReplaceStreamJob(info) => Left(info.streaming_job.name()),
            DdlCommand::AlterNonSharedSource(source) => Left(source.name.clone()),
            DdlCommand::AlterObjectOwner(object, _) => Left(format!("{object:?}")),
            DdlCommand::AlterSetSchema(object, _) => Left(format!("{object:?}")),
            DdlCommand::CreateConnection(connection) => Left(connection.name.clone()),
            DdlCommand::DropConnection(id, _) => Right(id.as_object_id()),
            DdlCommand::CreateSecret(secret) => Left(secret.name.clone()),
            DdlCommand::AlterSecret(secret) => Left(secret.name.clone()),
            DdlCommand::DropSecret(id, _) => Right(id.as_object_id()),
            DdlCommand::CommentOn(comment) => Right(comment.table_id.into()),
            DdlCommand::CreateSubscription(subscription) => Left(subscription.name.clone()),
            DdlCommand::DropSubscription(id, _) => Right(id.as_object_id()),
            DdlCommand::AlterSubscriptionRetention {
                subscription_id, ..
            } => Right(subscription_id.as_object_id()),
            DdlCommand::AlterDatabaseParam(id, _) => Right(id.as_object_id()),
            DdlCommand::AlterStreamingJobConfig(job_id, _, _) => Right(job_id.as_object_id()),
        }
    }

    /// Whether this command may run while the cluster is still recovering.
    /// Catalog-only operations (drops, renames, comments, secrets, …) are
    /// allowed; commands that must build or mutate streaming graphs are not.
    fn allow_in_recovery(&self) -> bool {
        match self {
            DdlCommand::DropDatabase(_)
            | DdlCommand::DropSchema(_, _)
            | DdlCommand::DropSource(_, _)
            | DdlCommand::DropFunction(_, _)
            | DdlCommand::DropView(_, _)
            | DdlCommand::DropStreamingJob { .. }
            | DdlCommand::DropConnection(_, _)
            | DdlCommand::DropSecret(_, _)
            | DdlCommand::DropSubscription(_, _)
            | DdlCommand::AlterName(_, _)
            | DdlCommand::AlterObjectOwner(_, _)
            | DdlCommand::AlterSetSchema(_, _)
            | DdlCommand::CreateDatabase(_)
            | DdlCommand::CreateSchema(_)
            | DdlCommand::CreateFunction(_)
            | DdlCommand::CreateView(_, _)
            | DdlCommand::CreateConnection(_)
            | DdlCommand::CommentOn(_)
            | DdlCommand::CreateSecret(_)
            | DdlCommand::AlterSecret(_)
            | DdlCommand::AlterSwapRename(_)
            | DdlCommand::AlterDatabaseParam(_, _)
            | DdlCommand::AlterStreamingJobConfig(_, _, _)
            | DdlCommand::AlterSubscriptionRetention { .. } => true,
            DdlCommand::CreateStreamingJob { .. }
            | DdlCommand::CreateNonSharedSource(_)
            | DdlCommand::ReplaceStreamJob(_)
            | DdlCommand::AlterNonSharedSource(_)
            | DdlCommand::ResetSource(_)
            | DdlCommand::CreateSubscription(_) => false,
        }
    }
}
273
/// Entry point for all DDL operations on the meta node. Cheap to clone: every
/// field is a handle/ref-counted manager, so `run_command` can move a clone
/// into a spawned task.
#[derive(Clone)]
pub struct DdlController {
    pub(crate) env: MetaSrvEnv,

    pub(crate) metadata_manager: MetadataManager,
    pub(crate) stream_manager: GlobalStreamManagerRef,
    pub(crate) source_manager: SourceManagerRef,
    barrier_manager: BarrierManagerRef,
    sink_manager: SinkCoordinatorManager,
    iceberg_compaction_manager: IcebergCompactionManagerRef,

    // Semaphore bounding how many streaming jobs may be creating at once.
    pub(crate) creating_streaming_job_permits: Arc<CreatingStreamingJobPermit>,

    // Monotonic counter used to build unique await-tree keys per command.
    seq: Arc<AtomicU64>,
}
291
/// Concurrency limiter for streaming-job creation, backed by a semaphore whose
/// size tracks the `max_concurrent_creating_streaming_jobs` system parameter.
#[derive(Clone)]
pub struct CreatingStreamingJobPermit {
    pub(crate) semaphore: Arc<Semaphore>,
}
296
impl CreatingStreamingJobPermit {
    /// Sizes the semaphore from the current value of
    /// `max_concurrent_creating_streaming_jobs` and spawns a background task
    /// that resizes it whenever that system parameter changes.
    async fn new(env: &MetaSrvEnv) -> Self {
        let mut permits = env
            .system_params_reader()
            .await
            .max_concurrent_creating_streaming_jobs() as usize;
        if permits == 0 {
            // 0 is treated as "unlimited".
            permits = Semaphore::MAX_PERMITS;
        }
        let semaphore = Arc::new(Semaphore::new(permits));

        // Subscribe to local notifications to observe system-param changes.
        let (local_notification_tx, mut local_notification_rx) =
            tokio::sync::mpsc::unbounded_channel();
        env.notification_manager()
            .insert_local_sender(local_notification_tx);
        let semaphore_clone = semaphore.clone();
        tokio::spawn(async move {
            // `permits` (moved in) tracks the semaphore's nominal size across
            // iterations so deltas can be computed against the new value.
            while let Some(notification) = local_notification_rx.recv().await {
                let LocalNotification::SystemParamsChange(p) = &notification else {
                    continue;
                };
                let mut new_permits = p.max_concurrent_creating_streaming_jobs() as usize;
                if new_permits == 0 {
                    new_permits = Semaphore::MAX_PERMITS;
                }
                match permits.cmp(&new_permits) {
                    Ordering::Less => {
                        semaphore_clone.add_permits(new_permits - permits);
                    }
                    Ordering::Equal => continue,
                    Ordering::Greater => {
                        // Shrink by forgetting permits; `forget_permits` only
                        // removes currently-available ones, so the reduction
                        // may fall short if permits are checked out.
                        let to_release = permits - new_permits;
                        let reduced = semaphore_clone.forget_permits(to_release);
                        if reduced != to_release {
                            tracing::warn!(
                                "no enough permits to release, expected {}, but reduced {}",
                                to_release,
                                reduced
                            );
                        }
                    }
                }
                tracing::info!(
                    "max_concurrent_creating_streaming_jobs changed from {} to {}",
                    permits,
                    new_permits
                );
                permits = new_permits;
            }
        });

        Self { semaphore }
    }
}
353
354impl DdlController {
355 pub async fn new(
356 env: MetaSrvEnv,
357 metadata_manager: MetadataManager,
358 stream_manager: GlobalStreamManagerRef,
359 source_manager: SourceManagerRef,
360 barrier_manager: BarrierManagerRef,
361 sink_manager: SinkCoordinatorManager,
362 iceberg_compaction_manager: IcebergCompactionManagerRef,
363 ) -> Self {
364 let creating_streaming_job_permits = Arc::new(CreatingStreamingJobPermit::new(&env).await);
365 Self {
366 env,
367 metadata_manager,
368 stream_manager,
369 source_manager,
370 barrier_manager,
371 sink_manager,
372 iceberg_compaction_manager,
373 creating_streaming_job_permits,
374 seq: Arc::new(AtomicU64::new(0)),
375 }
376 }
377
    /// Returns the next value of the controller-wide sequence counter, used to
    /// build unique await-tree keys for DDL commands.
    ///
    /// Note: `fetch_add` returns the value *before* the increment, so the
    /// first call yields 0.
    pub fn next_seq(&self) -> u64 {
        self.seq.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
    }
383
    /// Executes a single DDL command to completion and returns the catalog
    /// version (plus hummock version id) the client should wait for.
    ///
    /// Commands not permitted during recovery are rejected up front while the
    /// barrier manager is not in the running state.
    #[allow(clippy::large_stack_frames)]
    pub async fn run_command(&self, command: DdlCommand) -> MetaResult<Option<WaitVersion>> {
        if !command.allow_in_recovery() {
            self.barrier_manager.check_status_running()?;
        }

        // Unique await-tree registration per command: sequence number as key,
        // command + target object as the span label.
        let await_tree_key = format!("DDL Command {}", self.next_seq());
        let await_tree_span = await_tree::span!("{command}({})", command.object());

        let ctrl = self.clone();
        // Dispatch to the per-command handler.
        let fut = Box::pin(async move {
            match command {
                DdlCommand::CreateDatabase(database) => ctrl.create_database(database).await,
                DdlCommand::DropDatabase(database_id) => ctrl.drop_database(database_id).await,
                DdlCommand::CreateSchema(schema) => ctrl.create_schema(schema).await,
                DdlCommand::DropSchema(schema_id, drop_mode) => {
                    ctrl.drop_schema(schema_id, drop_mode).await
                }
                DdlCommand::CreateNonSharedSource(source) => {
                    ctrl.create_non_shared_source(source).await
                }
                DdlCommand::DropSource(source_id, drop_mode) => {
                    ctrl.drop_source(source_id, drop_mode).await
                }
                DdlCommand::ResetSource(source_id) => ctrl.reset_source(source_id).await,
                DdlCommand::CreateFunction(function) => ctrl.create_function(function).await,
                DdlCommand::DropFunction(function_id, drop_mode) => {
                    ctrl.drop_function(function_id, drop_mode).await
                }
                DdlCommand::CreateView(view, dependencies) => {
                    ctrl.create_view(view, dependencies).await
                }
                DdlCommand::DropView(view_id, drop_mode) => {
                    ctrl.drop_view(view_id, drop_mode).await
                }
                DdlCommand::CreateStreamingJob {
                    stream_job,
                    fragment_graph,
                    dependencies,
                    resource_type,
                    if_not_exists,
                } => {
                    ctrl.create_streaming_job(
                        stream_job,
                        fragment_graph,
                        dependencies,
                        resource_type,
                        if_not_exists,
                    )
                    .await
                }
                DdlCommand::DropStreamingJob { job_id, drop_mode } => {
                    ctrl.drop_streaming_job(job_id, drop_mode).await
                }
                DdlCommand::ReplaceStreamJob(ReplaceStreamJobInfo {
                    streaming_job,
                    fragment_graph,
                }) => ctrl.replace_job(streaming_job, fragment_graph).await,
                DdlCommand::AlterName(relation, name) => ctrl.alter_name(relation, &name).await,
                DdlCommand::AlterObjectOwner(object, owner_id) => {
                    ctrl.alter_owner(object, owner_id).await
                }
                DdlCommand::AlterSetSchema(object, new_schema_id) => {
                    ctrl.alter_set_schema(object, new_schema_id).await
                }
                DdlCommand::CreateConnection(connection) => {
                    ctrl.create_connection(connection).await
                }
                DdlCommand::DropConnection(connection_id, drop_mode) => {
                    ctrl.drop_connection(connection_id, drop_mode).await
                }
                DdlCommand::CreateSecret(secret) => ctrl.create_secret(secret).await,
                DdlCommand::DropSecret(secret_id, drop_mode) => {
                    ctrl.drop_secret(secret_id, drop_mode).await
                }
                DdlCommand::AlterSecret(secret) => ctrl.alter_secret(secret).await,
                DdlCommand::AlterNonSharedSource(source) => {
                    ctrl.alter_non_shared_source(source).await
                }
                DdlCommand::CommentOn(comment) => ctrl.comment_on(comment).await,
                DdlCommand::CreateSubscription(subscription) => {
                    ctrl.create_subscription(subscription).await
                }
                DdlCommand::DropSubscription(subscription_id, drop_mode) => {
                    ctrl.drop_subscription(subscription_id, drop_mode).await
                }
                DdlCommand::AlterSubscriptionRetention {
                    subscription_id,
                    retention_seconds,
                    definition,
                } => {
                    ctrl.alter_subscription_retention(
                        subscription_id,
                        retention_seconds,
                        definition,
                    )
                    .await
                }
                DdlCommand::AlterSwapRename(objects) => ctrl.alter_swap_rename(objects).await,
                DdlCommand::AlterDatabaseParam(database_id, param) => {
                    ctrl.alter_database_param(database_id, param).await
                }
                DdlCommand::AlterStreamingJobConfig(job_id, entries_to_add, keys_to_remove) => {
                    ctrl.alter_streaming_job_config(job_id, entries_to_add, keys_to_remove)
                    .await
                }
            }
        })
        .in_current_span();
        let fut = (self.env.await_tree_reg())
            .register(await_tree_key, await_tree_span)
            .instrument(Box::pin(fut));
        // Run in a spawned task so the DDL keeps making progress even if this
        // future is dropped (e.g. the client cancels the RPC).
        let notification_version = tokio::spawn(fut).await.map_err(|e| anyhow!(e))??;
        Ok(Some(WaitVersion {
            catalog_version: notification_version,
            hummock_version_id: self.barrier_manager.get_hummock_version_id().await,
        }))
    }
508
    /// Reports the progress of in-flight DDL jobs, as tracked by the barrier
    /// manager.
    pub async fn get_ddl_progress(&self) -> MetaResult<Vec<DdlProgress>> {
        self.barrier_manager.get_ddl_progress().await
    }
512
    /// Creates a database in the catalog, then pushes its per-database barrier
    /// settings (interval / checkpoint frequency) to the barrier manager.
    async fn create_database(&self, database: Database) -> MetaResult<NotificationVersion> {
        let (version, updated_db) = self
            .metadata_manager
            .catalog_controller
            .create_database(database)
            .await?;
        self.barrier_manager
            .update_database_barrier(
                updated_db.database_id,
                updated_db.barrier_interval_ms.map(|v| v as u32),
                updated_db.checkpoint_frequency.map(|v| v as u64),
            )
            .await?;
        Ok(version)
    }
529
    /// Alters the parallelism of a streaming job. If the cluster is in
    /// recovery, the reschedule is forced into deferred mode rather than
    /// failing outright.
    #[tracing::instrument(skip(self), level = "debug")]
    pub async fn reschedule_streaming_job(
        &self,
        job_id: JobId,
        target: ReschedulePolicy,
        mut deferred: bool,
    ) -> MetaResult<()> {
        tracing::info!("altering parallelism for job {}", job_id);
        if self.barrier_manager.check_status_running().is_err() {
            tracing::info!(
                "alter parallelism is set to deferred mode because the system is in recovery state"
            );
            deferred = true;
        }

        self.stream_manager
            .reschedule_streaming_job(job_id, target, deferred)
            .await
    }
549
    /// Alters the backfill parallelism of a streaming job; falls back to
    /// deferred mode while the cluster is recovering.
    pub async fn reschedule_streaming_job_backfill_parallelism(
        &self,
        job_id: JobId,
        parallelism: Option<StreamingParallelism>,
        mut deferred: bool,
    ) -> MetaResult<()> {
        tracing::info!("altering backfill parallelism for job {}", job_id);
        if self.barrier_manager.check_status_running().is_err() {
            tracing::info!(
                "alter backfill parallelism is set to deferred mode because the system is in recovery state"
            );
            deferred = true;
        }

        self.stream_manager
            .reschedule_streaming_job_backfill_parallelism(job_id, parallelism, deferred)
            .await
    }
568
    /// Alters the backfill parallelism of a CDC table. Unlike the other
    /// reschedule entry points, this has no deferred mode and fails while the
    /// cluster is recovering.
    pub async fn reschedule_cdc_table_backfill(
        &self,
        job_id: JobId,
        target: ReschedulePolicy,
    ) -> MetaResult<()> {
        tracing::info!("alter CDC table backfill parallelism");
        if self.barrier_manager.check_status_running().is_err() {
            return Err(anyhow::anyhow!("CDC table backfill reschedule is unavailable because the system is in recovery state").into());
        }
        self.stream_manager
            .reschedule_cdc_table_backfill(job_id, target)
            .await
    }
582
583 pub async fn reschedule_fragments(
584 &self,
585 fragment_targets: HashMap<FragmentId, Option<StreamingParallelism>>,
586 ) -> MetaResult<()> {
587 tracing::info!(
588 "altering parallelism for fragments {:?}",
589 fragment_targets.keys()
590 );
591 let fragment_targets = fragment_targets
592 .into_iter()
593 .map(|(fragment_id, parallelism)| (fragment_id as CatalogFragmentId, parallelism))
594 .collect();
595
596 self.stream_manager
597 .reschedule_fragments(fragment_targets)
598 .await
599 }
600
    /// Drops a database; always cascades to everything it contains.
    async fn drop_database(&self, database_id: DatabaseId) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Database, database_id, DropMode::Cascade)
            .await
    }
605
    /// Creates a schema in the catalog.
    async fn create_schema(&self, schema: Schema) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .create_schema(schema)
            .await
    }
612
    /// Drops a schema with the requested RESTRICT/CASCADE semantics.
    async fn drop_schema(
        &self,
        schema_id: SchemaId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Schema, schema_id, drop_mode)
            .await
    }
621
622 async fn create_non_shared_source(&self, source: Source) -> MetaResult<NotificationVersion> {
624 let handle = create_source_worker(&source, self.source_manager.metrics.clone())
625 .await
626 .context("failed to create source worker")?;
627
628 let (source_id, version) = self
629 .metadata_manager
630 .catalog_controller
631 .create_source(source)
632 .await?;
633 self.source_manager
634 .register_source_with_handle(source_id, handle)
635 .await;
636 Ok(version)
637 }
638
    /// Drops a source with the requested RESTRICT/CASCADE semantics.
    async fn drop_source(
        &self,
        source_id: SourceId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Source, source_id, drop_mode)
            .await
    }
647
    /// Resets a CDC source's offset to the latest position by running a
    /// `ResetSource` barrier command against the source's database, then
    /// notifies frontends.
    async fn reset_source(&self, source_id: SourceId) -> MetaResult<NotificationVersion> {
        tracing::info!(source_id = %source_id, "resetting CDC source offset to latest");

        // The barrier scheduler is per-database, so resolve the database first.
        let database_id = self
            .metadata_manager
            .catalog_controller
            .get_object_database_id(source_id)
            .await?;

        self.stream_manager
            .barrier_scheduler
            .run_command(database_id, Command::ResetSource { source_id })
            .await?;

        // No catalog change happened; emit a trivial notification so callers
        // still get a version to wait on.
        let version = self
            .metadata_manager
            .catalog_controller
            .notify_frontend_trivial()
            .await;
        Ok(version)
    }
671
    /// Applies an ALTER to a non-shared source; purely a catalog operation.
    async fn alter_non_shared_source(&self, source: Source) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .alter_non_shared_source(source)
            .await
    }
680
    /// Creates a function in the catalog.
    async fn create_function(&self, function: Function) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .create_function(function)
            .await
    }
687
    /// Drops a function with the requested RESTRICT/CASCADE semantics.
    async fn drop_function(
        &self,
        function_id: FunctionId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Function, function_id, drop_mode)
            .await
    }
696
    /// Creates a (non-materialized) view in the catalog, recording the objects
    /// it depends on.
    async fn create_view(
        &self,
        view: View,
        dependencies: HashSet<ObjectId>,
    ) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .create_view(view, dependencies)
            .await
    }
707
    /// Drops a view with the requested RESTRICT/CASCADE semantics.
    async fn drop_view(
        &self,
        view_id: ViewId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::View, view_id, drop_mode).await
    }
715
    /// Validates connectivity of a connection definition, then persists it.
    async fn create_connection(&self, connection: Connection) -> MetaResult<NotificationVersion> {
        validate_connection(&connection).await?;
        self.metadata_manager
            .catalog_controller
            .create_connection(connection)
            .await
    }
723
    /// Drops a connection with the requested RESTRICT/CASCADE semantics.
    async fn drop_connection(
        &self,
        connection_id: ConnectionId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Connection, connection_id, drop_mode)
            .await
    }
732
    /// Updates a per-database parameter in the catalog, then pushes the
    /// refreshed barrier settings to the barrier manager (mirrors
    /// `create_database`).
    async fn alter_database_param(
        &self,
        database_id: DatabaseId,
        param: AlterDatabaseParam,
    ) -> MetaResult<NotificationVersion> {
        let (version, updated_db) = self
            .metadata_manager
            .catalog_controller
            .alter_database_param(database_id, param)
            .await?;
        self.barrier_manager
            .update_database_barrier(
                database_id,
                updated_db.barrier_interval_ms.map(|v| v as u32),
                updated_db.checkpoint_frequency.map(|v| v as u64),
            )
            .await?;
        Ok(version)
    }
753
754 fn get_encrypted_payload(&self, secret: &Secret) -> MetaResult<Vec<u8>> {
757 let secret_store_private_key = self
758 .env
759 .opts
760 .secret_store_private_key
761 .clone()
762 .ok_or_else(|| anyhow!("secret_store_private_key is not configured"))?;
763
764 let encrypted_payload = SecretEncryption::encrypt(
765 secret_store_private_key.as_slice(),
766 secret.get_value().as_slice(),
767 )
768 .context(format!("failed to encrypt secret {}", secret.name))?;
769 Ok(encrypted_payload
770 .serialize()
771 .context(format!("failed to serialize secret {}", secret.name))?)
772 }
773
774 async fn create_secret(&self, mut secret: Secret) -> MetaResult<NotificationVersion> {
775 let secret_plain_payload = secret.value.clone();
778 let encrypted_payload = self.get_encrypted_payload(&secret)?;
779 secret.value = encrypted_payload;
780
781 self.metadata_manager
782 .catalog_controller
783 .create_secret(secret, secret_plain_payload)
784 .await
785 }
786
    /// Drops a secret with the requested RESTRICT/CASCADE semantics.
    async fn drop_secret(
        &self,
        secret_id: SecretId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Secret, secret_id, drop_mode)
            .await
    }
795
796 async fn alter_secret(&self, mut secret: Secret) -> MetaResult<NotificationVersion> {
797 let secret_plain_payload = secret.value.clone();
798 let encrypted_payload = self.get_encrypted_payload(&secret)?;
799 secret.value = encrypted_payload;
800 self.metadata_manager
801 .catalog_controller
802 .alter_secret(secret, secret_plain_payload)
803 .await
804 }
805
    /// Creates a subscription: registers the catalog entry, builds the
    /// streaming side, and only then notifies frontends. On streaming failure
    /// the catalog entry is aborted (best-effort) and the error is returned.
    async fn create_subscription(
        &self,
        mut subscription: Subscription,
    ) -> MetaResult<NotificationVersion> {
        tracing::debug!("create subscription");
        // Subject to the same creating-jobs concurrency limit as streaming
        // jobs, and mutually exclusive with reschedules.
        let _permit = self
            .creating_streaming_job_permits
            .semaphore
            .acquire()
            .await
            .unwrap();
        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
        self.metadata_manager
            .catalog_controller
            .create_subscription_catalog(&mut subscription)
            .await?;
        if let Err(err) = self.stream_manager.create_subscription(&subscription).await {
            tracing::debug!(error = %err.as_report(), "failed to create subscription");
            // Compensate: roll back the catalog entry; a failure here is only
            // logged, and the original error is surfaced to the caller.
            let _ = self
                .metadata_manager
                .catalog_controller
                .try_abort_creating_subscription(subscription.id)
                .await
                .inspect_err(|e| {
                    tracing::error!(
                        error = %e.as_report(),
                        "failed to abort create subscription after failure"
                    );
                });
            return Err(err);
        }

        let version = self
            .metadata_manager
            .catalog_controller
            .notify_create_subscription(subscription.id)
            .await?;
        tracing::debug!("finish create subscription");
        Ok(version)
    }
846
    /// Drops a subscription: removes the catalog entry first, then tears down
    /// the streaming side on the subscription's upstream table.
    async fn drop_subscription(
        &self,
        subscription_id: SubscriptionId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        tracing::debug!("preparing drop subscription");
        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
        // Fetch the upstream table / database before the catalog entry is gone.
        let subscription = self
            .metadata_manager
            .catalog_controller
            .get_subscription_by_id(subscription_id)
            .await?;
        let table_id = subscription.dependent_table_id;
        let database_id = subscription.database_id;
        let (_, version) = self
            .metadata_manager
            .catalog_controller
            .drop_object(ObjectType::Subscription, subscription_id, drop_mode)
            .await?;
        self.stream_manager
            .drop_subscription(database_id, subscription_id, table_id)
            .await;
        tracing::debug!("finish drop subscription");
        Ok(version)
    }
872
    /// Changes a subscription's retention: updates the catalog first, then
    /// propagates the new retention to the streaming side.
    async fn alter_subscription_retention(
        &self,
        subscription_id: SubscriptionId,
        retention_seconds: u64,
        definition: String,
    ) -> MetaResult<NotificationVersion> {
        tracing::debug!("alter subscription retention");
        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
        let (version, subscription) = self
            .metadata_manager
            .catalog_controller
            .alter_subscription_retention(subscription_id, retention_seconds, definition)
            .await?;
        self.stream_manager
            .alter_subscription_retention(
                subscription.database_id,
                subscription.id,
                subscription.dependent_table_id,
                subscription.retention_seconds,
            )
            .await?;
        tracing::debug!("finish alter subscription retention");
        Ok(version)
    }
897
898 #[await_tree::instrument]
900 pub(crate) async fn validate_cdc_table(
901 &self,
902 table: &Table,
903 table_fragments: &StreamJobFragments,
904 ) -> MetaResult<()> {
905 let stream_scan_fragment = table_fragments
906 .fragments
907 .values()
908 .filter(|f| {
909 f.fragment_type_mask.contains(FragmentTypeFlag::StreamScan)
910 || f.fragment_type_mask
911 .contains(FragmentTypeFlag::StreamCdcScan)
912 })
913 .exactly_one()
914 .ok()
915 .with_context(|| {
916 format!(
917 "expect exactly one stream scan fragment, got: {:?}",
918 table_fragments.fragments
919 )
920 })?;
921 fn assert_parallelism(
922 distribution_type: PbFragmentDistributionType,
923 node_body: &Option<NodeBody>,
924 ) {
925 if let Some(NodeBody::StreamCdcScan(node)) = node_body {
926 if let Some(o) = node.options
927 && CdcScanOptions::from_proto(&o).is_parallelized_backfill()
928 {
929 } else {
931 assert_eq!(
932 distribution_type,
933 PbFragmentDistributionType::Single,
934 "Non-parallelized CDC scan fragment should have Single distribution"
935 );
936 }
937 }
938 }
939 let mut found_cdc_scan = false;
940 match &stream_scan_fragment.nodes.node_body {
941 Some(NodeBody::StreamCdcScan(_)) => {
942 assert_parallelism(
943 stream_scan_fragment.distribution_type,
944 &stream_scan_fragment.nodes.node_body,
945 );
946 if self
947 .validate_cdc_table_inner(&stream_scan_fragment.nodes.node_body, table.id)
948 .await?
949 {
950 found_cdc_scan = true;
951 }
952 }
953 Some(NodeBody::Project(_)) => {
955 for input in &stream_scan_fragment.nodes.input {
956 assert_parallelism(stream_scan_fragment.distribution_type, &input.node_body);
957 if self
958 .validate_cdc_table_inner(&input.node_body, table.id)
959 .await?
960 {
961 found_cdc_scan = true;
962 }
963 }
964 }
965 _ => {
966 bail!("Unexpected node body for stream cdc scan");
967 }
968 };
969 if !found_cdc_scan {
970 bail!("No stream cdc scan node found in stream scan fragment");
971 }
972 Ok(())
973 }
974
    /// If `node_body` is a CDC scan with a table descriptor, validates that
    /// the upstream external source is reachable by constructing a split
    /// enumerator from the resolved connector properties.
    ///
    /// Returns `Ok(true)` if a CDC scan was found and validated, `Ok(false)`
    /// if `node_body` is not a CDC scan (or has no descriptor).
    async fn validate_cdc_table_inner(
        &self,
        node_body: &Option<NodeBody>,
        table_id: TableId,
    ) -> MetaResult<bool> {
        if let Some(NodeBody::StreamCdcScan(stream_cdc_scan)) = node_body
            && let Some(ref cdc_table_desc) = stream_cdc_scan.cdc_table_desc
        {
            let options_with_secret = WithOptionsSecResolved::new(
                cdc_table_desc.connect_properties.clone(),
                cdc_table_desc.secret_refs.clone(),
            );

            let mut props = ConnectorProperties::extract(options_with_secret, true)?;
            props.init_from_pb_cdc_table_desc(cdc_table_desc);

            // Creating the enumerator is the connectivity check; the value
            // itself is discarded.
            let _enumerator = props
                .create_split_enumerator(SourceEnumeratorContext::dummy().into())
                .await?;

            tracing::debug!(?table_id, "validate cdc table success");
            Ok(true)
        } else {
            Ok(false)
        }
    }
1002
1003 pub async fn validate_table_for_sink(&self, table_id: TableId) -> MetaResult<()> {
1004 let migrated = self
1005 .metadata_manager
1006 .catalog_controller
1007 .has_table_been_migrated(table_id)
1008 .await?;
1009 if !migrated {
1010 Err(anyhow::anyhow!("Creating sink into table is not allowed for unmigrated table {}. Please migrate it first.", table_id).into())
1011 } else {
1012 Ok(())
1013 }
1014 }
1015
    /// Creates a new streaming job (table / materialized view / sink / source / index).
    ///
    /// High-level flow:
    /// 1. For a sink into a table, verify the target table is eligible (migrated).
    /// 2. Register the job in the catalog; with `if_not_exists`, a duplicate is tolerated
    ///    (foreground jobs additionally wait for the existing job to finish).
    /// 3. Acquire a creation permit (semaphore) and the reschedule read lock.
    /// 4. Delegate the build/run to `create_streaming_job_inner`; on failure, record an
    ///    event log entry and abort the partially-created catalog state.
    #[await_tree::instrument(boxed, "create_streaming_job({streaming_job})")]
    pub async fn create_streaming_job(
        &self,
        mut streaming_job: StreamingJob,
        fragment_graph: StreamFragmentGraphProto,
        dependencies: HashSet<ObjectId>,
        resource_type: streaming_job_resource_type::ResourceType,
        if_not_exists: bool,
    ) -> MetaResult<NotificationVersion> {
        // Sink-into-table requires the target table to have been migrated first.
        if let StreamingJob::Sink(sink) = &streaming_job
            && let Some(target_table) = sink.target_table
        {
            self.validate_table_for_sink(target_table).await?;
        }
        let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());
        let streaming_job_model = match self
            .metadata_manager
            .catalog_controller
            .create_job_catalog(
                &mut streaming_job,
                &ctx,
                &fragment_graph.parallelism,
                fragment_graph.max_parallelism as _,
                dependencies,
                resource_type.clone(),
                &fragment_graph.backfill_parallelism,
            )
            .await
        {
            Ok(model) => model,
            Err(meta_err) => {
                if !if_not_exists {
                    return Err(meta_err);
                }
                // IF NOT EXISTS: a duplicate-name error is not fatal. A foreground job waits
                // for the already-existing job to finish so the caller observes completion;
                // otherwise return immediately with an ignored notification version.
                return if let MetaErrorInner::Duplicated(_, _, Some(job_id)) = meta_err.inner() {
                    if streaming_job.create_type() == CreateType::Foreground {
                        let database_id = streaming_job.database_id();
                        self.metadata_manager
                            .wait_streaming_job_finished(database_id, *job_id)
                            .await
                    } else {
                        Ok(IGNORED_NOTIFICATION_VERSION)
                    }
                } else {
                    Err(meta_err)
                };
            }
        };
        let job_id = streaming_job.id();
        tracing::debug!(
            id = %job_id,
            definition = streaming_job.definition(),
            create_type = streaming_job.create_type().as_str_name(),
            job_type = ?streaming_job.job_type(),
            "starting streaming job",
        );
        // Bound the number of concurrently-creating streaming jobs via a semaphore permit.
        // `unwrap` can fail only if the semaphore is closed, which is treated as a bug here.
        let permit = self
            .creating_streaming_job_permits
            .semaphore
            .clone()
            .acquire_owned()
            .instrument_await("acquire_creating_streaming_job_permit")
            .await
            .unwrap();
        // Hold the reschedule read lock so no rescheduling happens while the job is created.
        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;

        // Capture identifying info up front for error reporting / cleanup after
        // `streaming_job` is moved into the inner call.
        let name = streaming_job.name();
        let definition = streaming_job.definition();
        let source_id = match &streaming_job {
            StreamingJob::Table(Some(src), _, _) | StreamingJob::Source(src) => Some(src.id),
            _ => None,
        };

        match self
            .create_streaming_job_inner(
                ctx,
                streaming_job,
                fragment_graph,
                resource_type,
                permit,
                streaming_job_model,
            )
            .await
        {
            Ok(version) => Ok(version),
            Err(err) => {
                tracing::error!(id = %job_id, error = %err.as_report(), "failed to create streaming job");
                // Record the failure in the event log for observability.
                let event = risingwave_pb::meta::event_log::EventCreateStreamJobFail {
                    id: job_id,
                    name,
                    definition,
                    error: err.as_report().to_string(),
                };
                self.env.event_log_manager_ref().add_event_logs(vec![
                    risingwave_pb::meta::event_log::Event::CreateStreamJobFail(event),
                ]);
                // Best-effort rollback of the catalog entry created above.
                let (aborted, _) = self
                    .metadata_manager
                    .catalog_controller
                    .try_abort_creating_streaming_job(job_id, false)
                    .await?;
                if aborted {
                    tracing::warn!(id = %job_id, "aborted streaming job");
                    // If the job carried an associated source, undo its registration too.
                    if let Some(source_id) = source_id {
                        self.source_manager
                            .apply_source_change(SourceChange::DropSource {
                                dropped_source_ids: vec![source_id],
                            })
                            .await;
                    }
                }
                Err(err)
            }
        }
    }
1136
    /// Builds and launches a streaming job whose catalog entry has already been created.
    ///
    /// Steps: build the fragment graph, allocate catalog entries for internal tables,
    /// build the actor graph, run per-job-type validation/registration (CDC table check,
    /// source registration, sink validation) plus telemetry reporting, persist the
    /// fragments, and finally hand the job to the stream manager for creation.
    /// The `permit` bounds concurrent creations and is consumed by the stream manager.
    #[await_tree::instrument(boxed)]
    async fn create_streaming_job_inner(
        &self,
        ctx: StreamContext,
        mut streaming_job: StreamingJob,
        fragment_graph: StreamFragmentGraphProto,
        resource_type: streaming_job_resource_type::ResourceType,
        permit: OwnedSemaphorePermit,
        streaming_job_model: streaming_job::Model,
    ) -> MetaResult<NotificationVersion> {
        let mut fragment_graph =
            StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
        streaming_job.set_info_from_graph(&fragment_graph);

        // Allocate catalog ids for the job's internal state tables, then patch the
        // fragment graph with the assigned ids.
        let incomplete_internal_tables = fragment_graph
            .incomplete_internal_tables()
            .into_values()
            .collect_vec();
        let table_id_map = self
            .metadata_manager
            .catalog_controller
            .create_internal_table_catalog(&streaming_job, incomplete_internal_tables)
            .await?;
        fragment_graph.refill_internal_table_ids(table_id_map);

        tracing::debug!(id = %streaming_job.id(), "building streaming job");
        let (ctx, stream_job_fragments) = self
            .build_stream_job(
                ctx,
                streaming_job,
                fragment_graph,
                resource_type,
                streaming_job_model,
            )
            .await?;

        let streaming_job = &ctx.streaming_job;

        // Per-job-type validation, source registration, and telemetry reporting.
        match streaming_job {
            StreamingJob::Table(None, table, TableJobType::SharedCdcSource) => {
                // CDC tables are validated by connecting to the upstream database.
                self.validate_cdc_table(table, &stream_job_fragments)
                    .await?;
            }
            StreamingJob::Table(Some(source), ..) => {
                // Register the table's associated source before the job starts.
                self.source_manager.register_source(source).await?;
                let connector_name = source
                    .get_with_properties()
                    .get(UPSTREAM_SOURCE_KEY)
                    .cloned();
                let attr = source.info.as_ref().map(|source_info| {
                    jsonbb::json!({
                        "format": source_info.format().as_str_name(),
                        "encode": source_info.row_encode().as_str_name(),
                    })
                });
                report_create_object(
                    streaming_job.id(),
                    "source",
                    PbTelemetryDatabaseObject::Source,
                    connector_name,
                    attr,
                );
            }
            StreamingJob::Sink(sink) => {
                // Sinks that auto-refresh schema from a table must have a compatible
                // fragment shape.
                if sink.auto_refresh_schema_from_table.is_some() {
                    check_sink_fragments_support_refresh_schema(&stream_job_fragments.fragments)?
                }
                validate_sink(sink).await?;
                let connector_name = sink.get_properties().get(UPSTREAM_SOURCE_KEY).cloned();
                let attr = sink.format_desc.as_ref().map(|sink_info| {
                    jsonbb::json!({
                        "format": sink_info.format().as_str_name(),
                        "encode": sink_info.encode().as_str_name(),
                    })
                });
                report_create_object(
                    streaming_job.id(),
                    "sink",
                    PbTelemetryDatabaseObject::Sink,
                    connector_name,
                    attr,
                );
            }
            StreamingJob::Source(source) => {
                self.source_manager.register_source(source).await?;
                let connector_name = source
                    .get_with_properties()
                    .get(UPSTREAM_SOURCE_KEY)
                    .cloned();
                let attr = source.info.as_ref().map(|source_info| {
                    jsonbb::json!({
                        "format": source_info.format().as_str_name(),
                        "encode": source_info.row_encode().as_str_name(),
                    })
                });
                report_create_object(
                    streaming_job.id(),
                    "source",
                    PbTelemetryDatabaseObject::Source,
                    connector_name,
                    attr,
                );
            }
            _ => {}
        }

        // Persist fragments (and backfill ordering) before asking the stream manager to
        // actually create the job.
        let backfill_orders = ctx.fragment_backfill_ordering.to_meta_model();
        self.metadata_manager
            .catalog_controller
            .prepare_stream_job_fragments(
                &stream_job_fragments,
                streaming_job,
                false,
                Some(backfill_orders),
            )
            .await?;

        let version = self
            .stream_manager
            .create_streaming_job(stream_job_fragments, ctx, permit)
            .await?;

        Ok(version)
    }
1267
    /// Drops a catalog object and releases everything that depended on it.
    ///
    /// After removing the object from the catalog, this tears down the released
    /// resources in order: serving fragment mappings, the streaming jobs themselves,
    /// source registrations, Iceberg table-sink external state, sink coordinators,
    /// and locally-cached secrets. Returns the catalog notification version.
    pub async fn drop_object(
        &self,
        object_type: ObjectType,
        object_id: impl Into<ObjectId>,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        let object_id = object_id.into();
        // Block rescheduling and pause the source tick while dropping.
        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
        let _source_tick_pause_guard = self.source_manager.pause_tick().await;

        let (release_ctx, version) = self
            .metadata_manager
            .catalog_controller
            .drop_object(object_type, object_id, drop_mode)
            .await?;

        // Local subscribers (e.g. within this meta node) are told about dropped sources.
        if object_type == ObjectType::Source {
            self.env
                .notification_manager_ref()
                .notify_local_subscribers(LocalNotification::SourceDropped(object_id));
        }

        // Everything the catalog released as a consequence of the drop.
        let ReleaseContext {
            database_id,
            removed_streaming_job_ids,
            removed_state_table_ids,
            removed_source_ids,
            removed_secret_ids: secret_ids,
            removed_source_fragments,
            removed_fragments,
            removed_sink_fragment_by_targets,
            removed_iceberg_table_sinks,
        } = release_ctx;

        self.env
            .notification_manager_ref()
            .notify_serving_fragment_mapping_delete(
                removed_fragments.iter().map(|id| *id as _).collect(),
            );

        // Tear down the removed streaming jobs (and sink-into-table edges) on the cluster.
        self.stream_manager
            .drop_streaming_jobs(
                database_id,
                removed_streaming_job_ids,
                removed_state_table_ids,
                removed_fragments.iter().map(|id| *id as _).collect(),
                removed_sink_fragment_by_targets
                    .into_iter()
                    .map(|(target, sinks)| {
                        (target as _, sinks.into_iter().map(|id| id as _).collect())
                    })
                    .collect(),
            )
            .await;

        self.source_manager
            .apply_source_change(SourceChange::DropSource {
                dropped_source_ids: removed_source_ids.into_iter().map(|id| id as _).collect(),
            })
            .await;

        // Source fragments owned by dropped MVs are released separately from sources.
        let dropped_source_fragments = removed_source_fragments;
        self.source_manager
            .apply_source_change(SourceChange::DropMv {
                dropped_source_fragments,
            })
            .await;

        let iceberg_sink_ids: Vec<SinkId> = removed_iceberg_table_sinks
            .iter()
            .map(|sink| sink.id)
            .collect();

        // Best-effort cleanup of external Iceberg tables backing dropped table sinks;
        // failures are logged but do not fail the drop.
        for sink in removed_iceberg_table_sinks {
            let sink_param = SinkParam::try_from_sink_catalog(sink.into())
                .expect("Iceberg sink should be valid");
            let iceberg_sink =
                IcebergSink::try_from(sink_param).expect("Iceberg sink should be valid");
            if let Ok(iceberg_catalog) = iceberg_sink.config.create_catalog().await {
                let table_identifier = iceberg_sink.config.full_table_name().unwrap();
                tracing::info!(
                    "dropping iceberg table {} for dropped sink",
                    table_identifier
                );

                let _ = iceberg_catalog
                    .drop_table(&table_identifier)
                    .await
                    .inspect_err(|err| {
                        tracing::error!(
                            "failed to drop iceberg table {} during cleanup: {}",
                            table_identifier,
                            err.as_report()
                        );
                    });
            }
        }

        if !iceberg_sink_ids.is_empty() {
            // Stop coordinators first, then clear any pending Iceberg commits per sink.
            self.sink_manager
                .stop_sink_coordinator(iceberg_sink_ids.clone())
                .await;

            for sink_id in iceberg_sink_ids {
                self.iceberg_compaction_manager
                    .clear_iceberg_commits_by_sink_id(sink_id);
            }
        }

        // Evict dropped secrets from the local secret cache.
        for secret in secret_ids {
            LocalSecretManager::global().remove_secret(secret);
        }
        Ok(version)
    }
1395
    /// Replaces an existing streaming job in place (used for schema change on tables,
    /// sources, and materialized views; not supported for sinks or indexes).
    ///
    /// The replacement is staged under a temporary job id: a new fragment graph is built
    /// against the original's max parallelism, sinks that auto-refresh their schema from
    /// the table are rewritten alongside, and on success the catalog atomically finishes
    /// the replacement; on failure the temporary job (and temporary sinks) are aborted.
    #[await_tree::instrument(boxed, "replace_streaming_job({streaming_job})")]
    pub async fn replace_job(
        &self,
        mut streaming_job: StreamingJob,
        fragment_graph: StreamFragmentGraphProto,
    ) -> MetaResult<NotificationVersion> {
        match &streaming_job {
            StreamingJob::Table(..)
            | StreamingJob::Source(..)
            | StreamingJob::MaterializedView(..) => {}
            StreamingJob::Sink(..) | StreamingJob::Index(..) => {
                bail_not_implemented!("schema change for {}", streaming_job.job_type_str())
            }
        }

        let job_id = streaming_job.id();

        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
        let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());

        // The replacement must keep the original job's max parallelism, so override
        // whatever the incoming graph carries.
        let original_max_parallelism = self
            .metadata_manager
            .get_job_max_parallelism(streaming_job.id())
            .await?;
        let fragment_graph = PbStreamFragmentGraph {
            max_parallelism: original_max_parallelism as _,
            ..fragment_graph
        };

        let fragment_graph = StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
        streaming_job.set_info_from_graph(&fragment_graph);

        // No further mutation of the job from here on.
        let streaming_job = streaming_job;

        // For tables, collect sinks configured to auto-refresh their schema from this
        // table; each such sink gets a rewritten fragment and a temporary catalog entry
        // that is finished together with the table replacement.
        let auto_refresh_schema_sinks = if let StreamingJob::Table(_, table, _) = &streaming_job {
            let auto_refresh_schema_sinks = self
                .metadata_manager
                .catalog_controller
                .get_sink_auto_refresh_schema_from(table.id)
                .await?;
            if !auto_refresh_schema_sinks.is_empty() {
                let original_table_columns = self
                    .metadata_manager
                    .catalog_controller
                    .get_table_columns(table.id)
                    .await?;
                let original_table_column_ids: HashSet<_> = original_table_columns
                    .iter()
                    .map(|col| col.column_id())
                    .collect();
                let new_table_column_ids: HashSet<_> = table
                    .columns
                    .iter()
                    .map(|col| ColumnId::new(col.column_desc.as_ref().unwrap().column_id as _))
                    .collect();
                // Diff the old and new table schemas by column id.
                let newly_added_columns = table
                    .columns
                    .iter()
                    .filter(|col| {
                        !original_table_column_ids.contains(&ColumnId::new(
                            col.column_desc.as_ref().unwrap().column_id as _,
                        ))
                    })
                    .map(|col| ColumnCatalog::from(col.clone()))
                    .collect_vec();
                let removed_columns = original_table_columns
                    .iter()
                    .filter(|col| !new_table_column_ids.contains(&col.column_id()))
                    .cloned()
                    .collect_vec();
                // Dropping columns is not supported while auto-refresh-schema sinks exist.
                if !removed_columns.is_empty() {
                    return Err(anyhow!(
                        "new table columns does not contains all original columns. new: {:?}, original: {:?}, not included: {:?}",
                        table.columns,
                        original_table_columns,
                        removed_columns
                            .iter()
                            .map(|col| col.column_id())
                            .collect_vec()
                    )
                    .into());
                }
                let mut sinks = Vec::with_capacity(auto_refresh_schema_sinks.len());
                for sink in auto_refresh_schema_sinks {
                    let sink_job_fragments = self
                        .metadata_manager
                        .get_job_fragments_by_id(sink.id.as_job_id())
                        .await?;
                    if sink_job_fragments.fragments.len() != 1 {
                        return Err(anyhow!(
                            "auto schema refresh sink must have only one fragment, but got {}",
                            sink_job_fragments.fragments.len()
                        )
                        .into());
                    }
                    let sink_ctx = sink_job_fragments.ctx;
                    let original_sink_fragment =
                        sink_job_fragments.fragments.into_values().next().unwrap();
                    // Rewrite the sink fragment against the new table schema.
                    let (new_sink_fragment, new_schema, new_log_store_table) =
                        rewrite_refresh_schema_sink_fragment(
                            &original_sink_fragment,
                            &sink,
                            &newly_added_columns,
                            &removed_columns,
                            table,
                            fragment_graph.table_fragment_id(),
                            self.env.id_gen_manager(),
                        )?;

                    // Temporarily wrap the sink as a StreamingJob to create its
                    // replacement catalog entry, then unwrap it again.
                    let streaming_job = StreamingJob::Sink(sink);

                    let tmp_sink_model = self
                        .metadata_manager
                        .catalog_controller
                        .create_job_catalog_for_replace(&streaming_job, None, None, None)
                        .await?;
                    let tmp_sink_id = tmp_sink_model.job_id.as_sink_id();
                    let StreamingJob::Sink(sink) = streaming_job else {
                        unreachable!()
                    };

                    sinks.push(AutoRefreshSchemaSinkContext {
                        tmp_sink_id,
                        original_sink: sink,
                        original_fragment: original_sink_fragment,
                        new_schema,
                        newly_add_fields: newly_added_columns
                            .iter()
                            .map(|col| Field::from(&col.column_desc))
                            .collect(),
                        removed_column_names: removed_columns
                            .iter()
                            .map(|col| col.name.clone())
                            .collect(),
                        new_fragment: new_sink_fragment,
                        new_log_store_table: new_log_store_table.map(Box::new),
                        ctx: sink_ctx,
                    });
                }
                Some(sinks)
            } else {
                None
            }
        } else {
            None
        };

        // Stage the replacement job under a temporary catalog id.
        let streaming_job_model = self
            .metadata_manager
            .catalog_controller
            .create_job_catalog_for_replace(
                &streaming_job,
                Some(&ctx),
                fragment_graph.specified_parallelism().as_ref(),
                Some(fragment_graph.max_parallelism()),
            )
            .await?;
        let tmp_id = streaming_job_model.job_id;

        // Temporary sink ids are needed for abort cleanup on failure.
        let tmp_sink_ids = auto_refresh_schema_sinks.as_ref().map(|sinks| {
            sinks
                .iter()
                .map(|sink| sink.tmp_sink_id.as_object_id())
                .collect_vec()
        });

        tracing::debug!(id = %job_id, "building replace streaming job");
        let mut updated_sink_catalogs = vec![];

        let mut drop_table_connector_ctx = None;
        // Everything fallible from here on is collected into `result` so that failures
        // fall through to a single abort path below.
        let result: MetaResult<_> = try {
            let (mut ctx, mut stream_job_fragments) = self
                .build_replace_job(
                    ctx,
                    &streaming_job,
                    fragment_graph,
                    tmp_id,
                    auto_refresh_schema_sinks,
                    streaming_job_model,
                )
                .await?;
            drop_table_connector_ctx = ctx.drop_table_connector_ctx.clone();
            let auto_refresh_schema_sink_finish_ctx =
                ctx.auto_refresh_schema_sinks.as_ref().map(|sinks| {
                    sinks
                        .iter()
                        .map(|sink| FinishAutoRefreshSchemaSinkContext {
                            tmp_sink_id: sink.tmp_sink_id,
                            original_sink_id: sink.original_sink.id,
                            columns: sink.new_schema.clone(),
                            new_log_store_table: sink.new_log_store_table.clone(),
                        })
                        .collect()
                });

            // For tables, re-attach existing sink-into-table upstreams to the new
            // union fragment and record which sink catalogs need updating.
            if let StreamingJob::Table(_, table, ..) = &streaming_job {
                let union_fragment = stream_job_fragments.inner.union_fragment_for_table();
                let upstream_infos = self
                    .metadata_manager
                    .catalog_controller
                    .get_all_upstream_sink_infos(table, union_fragment.fragment_id as _)
                    .await?;
                refill_upstream_sink_union_in_table(&mut union_fragment.nodes, &upstream_infos);

                for upstream_info in &upstream_infos {
                    let upstream_fragment_id = upstream_info.sink_fragment_id;
                    ctx.upstream_fragment_downstreams
                        .entry(upstream_fragment_id)
                        .or_default()
                        .push(upstream_info.new_sink_downstream.clone());
                    if upstream_info.sink_original_target_columns.is_empty() {
                        updated_sink_catalogs.push(upstream_info.sink_id);
                    }
                }
            }

            let replace_upstream = ctx.replace_upstream.clone();

            // Persist the rewritten auto-refresh-schema sink fragments under their
            // temporary ids.
            if let Some(sinks) = &ctx.auto_refresh_schema_sinks {
                let empty_downstreams = FragmentDownstreamRelation::default();
                for sink in sinks {
                    self.metadata_manager
                        .catalog_controller
                        .prepare_streaming_job(
                            sink.tmp_sink_id.as_job_id(),
                            || [&sink.new_fragment].into_iter(),
                            &empty_downstreams,
                            true,
                            None,
                            None,
                        )
                        .await?;
                }
            }

            self.metadata_manager
                .catalog_controller
                .prepare_stream_job_fragments(&stream_job_fragments, &streaming_job, true, None)
                .await?;

            self.stream_manager
                .replace_stream_job(stream_job_fragments, ctx)
                .await?;
            (replace_upstream, auto_refresh_schema_sink_finish_ctx)
        };

        match result {
            Ok((replace_upstream, auto_refresh_schema_sink_finish_ctx)) => {
                // Atomically swap the temporary job in for the original in the catalog.
                let version = self
                    .metadata_manager
                    .catalog_controller
                    .finish_replace_streaming_job(
                        tmp_id,
                        streaming_job,
                        replace_upstream,
                        SinkIntoTableContext {
                            updated_sink_catalogs,
                        },
                        drop_table_connector_ctx.as_ref(),
                        auto_refresh_schema_sink_finish_ctx,
                    )
                    .await?;
                // If the replacement dropped the table's connector, release its source.
                if let Some(drop_table_connector_ctx) = &drop_table_connector_ctx {
                    self.source_manager
                        .apply_source_change(SourceChange::DropSource {
                            dropped_source_ids: vec![drop_table_connector_ctx.to_remove_source_id],
                        })
                        .await;
                }
                Ok(version)
            }
            Err(err) => {
                tracing::error!(id = %job_id, error = ?err.as_report(), "failed to replace job");
                // Best-effort abort of the temporary job and temporary sinks.
                let _ = self.metadata_manager
                    .catalog_controller
                    .try_abort_replacing_streaming_job(tmp_id, tmp_sink_ids)
                    .await.inspect_err(|err| {
                        tracing::error!(id = %job_id, error = ?err.as_report(), "failed to abort replacing job");
                    });
                Err(err)
            }
        }
    }
1686
1687 #[await_tree::instrument(boxed, "drop_streaming_job{}({job_id})", if let DropMode::Cascade = drop_mode { "_cascade" } else { "" }
1688 )]
1689 async fn drop_streaming_job(
1690 &self,
1691 job_id: StreamingJobId,
1692 drop_mode: DropMode,
1693 ) -> MetaResult<NotificationVersion> {
1694 let (object_id, object_type) = match job_id {
1695 StreamingJobId::MaterializedView(id) => (id.as_object_id(), ObjectType::Table),
1696 StreamingJobId::Sink(id) => (id.as_object_id(), ObjectType::Sink),
1697 StreamingJobId::Table(_, id) => (id.as_object_id(), ObjectType::Table),
1698 StreamingJobId::Index(idx) => (idx.as_object_id(), ObjectType::Index),
1699 };
1700
1701 let job_status = self
1702 .metadata_manager
1703 .catalog_controller
1704 .get_streaming_job_status(job_id.id())
1705 .await?;
1706 let version = match job_status {
1707 JobStatus::Initial => {
1708 self.metadata_manager
1709 .catalog_controller
1710 .try_abort_creating_streaming_job(job_id.id(), true)
1711 .await?;
1712 IGNORED_NOTIFICATION_VERSION
1713 }
1714 JobStatus::Creating => {
1715 self.stream_manager
1716 .cancel_streaming_jobs(vec![job_id.id()])
1717 .await?;
1718 IGNORED_NOTIFICATION_VERSION
1719 }
1720 JobStatus::Created => self.drop_object(object_type, object_id, drop_mode).await?,
1721 };
1722
1723 Ok(version)
1724 }
1725
1726 fn resolve_stream_parallelism(
1730 &self,
1731 specified: Option<NonZeroUsize>,
1732 max: NonZeroUsize,
1733 cluster_info: &StreamingClusterInfo,
1734 resource_group: String,
1735 ) -> MetaResult<NonZeroUsize> {
1736 let available = NonZeroUsize::new(cluster_info.parallelism(&resource_group));
1737 DdlController::resolve_stream_parallelism_inner(
1738 specified,
1739 max,
1740 available,
1741 &self.env.opts.default_parallelism,
1742 &resource_group,
1743 )
1744 }
1745
1746 fn resolve_stream_parallelism_inner(
1747 specified: Option<NonZeroUsize>,
1748 max: NonZeroUsize,
1749 available: Option<NonZeroUsize>,
1750 default_parallelism: &DefaultParallelism,
1751 resource_group: &str,
1752 ) -> MetaResult<NonZeroUsize> {
1753 let Some(available) = available else {
1754 bail_unavailable!(
1755 "no available slots to schedule in resource group \"{}\", \
1756 have you allocated any compute nodes within this resource group?",
1757 resource_group
1758 );
1759 };
1760
1761 if let Some(specified) = specified {
1762 if specified > max {
1763 bail_invalid_parameter!(
1764 "specified parallelism {} should not exceed max parallelism {}",
1765 specified,
1766 max,
1767 );
1768 }
1769 if specified > available {
1770 tracing::warn!(
1771 resource_group,
1772 specified_parallelism = specified.get(),
1773 available_parallelism = available.get(),
1774 "specified parallelism exceeds available slots, scheduling with specified value",
1775 );
1776 }
1777 return Ok(specified);
1778 }
1779
1780 let default_parallelism = match default_parallelism {
1782 DefaultParallelism::Full => available,
1783 DefaultParallelism::Default(num) => {
1784 if *num > available {
1785 tracing::warn!(
1786 resource_group,
1787 configured_parallelism = num.get(),
1788 available_parallelism = available.get(),
1789 "default parallelism exceeds available slots, scheduling with configured value",
1790 );
1791 }
1792 *num
1793 }
1794 };
1795
1796 if default_parallelism > max {
1797 tracing::warn!(
1798 max_parallelism = max.get(),
1799 resource_group,
1800 "default parallelism exceeds max parallelism, capping to max",
1801 );
1802 }
1803 Ok(default_parallelism.min(max))
1804 }
1805
    /// Builds everything needed to create a streaming job: the actor graph, its
    /// creation context, and the fragments to persist.
    ///
    /// This resolves parallelism (including backfill/adaptive strategies), validates
    /// snapshot-backfill constraints, wires the new graph to its upstream root
    /// fragments, handles sink-into-table upstream info, and (for parallel CDC table
    /// backfill) initializes snapshot splits.
    #[await_tree::instrument]
    pub(crate) async fn build_stream_job(
        &self,
        stream_ctx: StreamContext,
        mut stream_job: StreamingJob,
        fragment_graph: StreamFragmentGraph,
        resource_type: streaming_job_resource_type::ResourceType,
        streaming_job_model: streaming_job::Model,
    ) -> MetaResult<(CreateStreamingJobContext, StreamJobFragmentsToCreate)> {
        let id = stream_job.id();
        let specified_parallelism = fragment_graph.specified_parallelism();
        let specified_backfill_parallelism = fragment_graph.specified_backfill_parallelism();
        let max_parallelism = NonZeroUsize::new(fragment_graph.max_parallelism()).unwrap();

        let fragment_backfill_ordering = fragment_graph.create_fragment_backfill_ordering();

        let (snapshot_backfill_info, cross_db_snapshot_backfill_info) =
            fragment_graph.collect_snapshot_backfill_info()?;
        // Backfill epochs are only assigned later (e.g. on recovery); a fresh build must
        // not carry any.
        assert!(
            snapshot_backfill_info
                .iter()
                .chain([&cross_db_snapshot_backfill_info])
                .flat_map(|info| info.upstream_mv_table_id_to_backfill_epoch.values())
                .all(|backfill_epoch| backfill_epoch.is_none()),
            "should not set backfill epoch when initially build the job: {:?} {:?}",
            snapshot_backfill_info,
            cross_db_snapshot_backfill_info
        );

        let locality_fragment_state_table_mapping =
            fragment_graph.find_locality_provider_fragment_state_table_mapping();

        self.metadata_manager
            .catalog_controller
            .validate_cross_db_snapshot_backfill(&cross_db_snapshot_backfill_info)
            .await?;

        // Cross-database backfill upstreams are resolved separately, so exclude them here.
        let upstream_table_ids = fragment_graph
            .dependent_table_ids()
            .iter()
            .filter(|id| {
                !cross_db_snapshot_backfill_info
                    .upstream_mv_table_id_to_backfill_epoch
                    .contains_key(*id)
            })
            .cloned()
            .collect();

        let upstream_root_fragments = self
            .metadata_manager
            .get_upstream_root_fragments(&upstream_table_ids)
            .await?;

        // Snapshot backfill is only supported for MV / sink / index jobs.
        if snapshot_backfill_info.is_some() {
            match stream_job {
                StreamingJob::MaterializedView(_)
                | StreamingJob::Sink(_)
                | StreamingJob::Index(_, _) => {}
                StreamingJob::Table(_, _, _) | StreamingJob::Source(_) => {
                    return Err(
                        anyhow!("snapshot_backfill not enabled for table and source").into(),
                    );
                }
            }
        }

        let complete_graph = CompleteStreamFragmentGraph::with_upstreams(
            fragment_graph,
            FragmentGraphUpstreamContext {
                upstream_root_fragments,
            },
            (&stream_job).into(),
        )?;
        // The job may run in an explicit resource group; otherwise inherit the database's.
        let resource_group = if let Some(group) = resource_type.resource_group() {
            group
        } else {
            self.metadata_manager
                .get_database_resource_group(stream_job.database_id())
                .await?
        };
        let is_serverless_backfill = matches!(
            &resource_type,
            streaming_job_resource_type::ResourceType::ServerlessBackfillResourceGroup(_)
        );

        let cluster_info = self.metadata_manager.get_streaming_cluster_info().await?;

        // Backfill parallelism (when specified) takes precedence for the initial build.
        let initial_parallelism = specified_backfill_parallelism.or(specified_parallelism);
        let parallelism = self.resolve_stream_parallelism(
            initial_parallelism,
            max_parallelism,
            &cluster_info,
            resource_group.clone(),
        )?;

        // Without an explicit value, apply the adaptive parallelism strategy (from the
        // stream context if set, otherwise from system params).
        let parallelism = if initial_parallelism.is_some() {
            parallelism.get()
        } else {
            let adaptive_strategy = match stream_ctx.adaptive_parallelism_strategy {
                Some(strategy) => strategy,
                None => self
                    .env
                    .system_params_reader()
                    .await
                    .adaptive_parallelism_strategy(),
            };
            adaptive_strategy.compute_target_parallelism(parallelism.get())
        };

        let parallelism = NonZeroUsize::new(parallelism).expect("parallelism must be positive");
        let actor_graph_builder = ActorGraphBuilder::new(id, complete_graph, parallelism)?;

        let ActorGraphBuildResult {
            graph,
            downstream_fragment_relations,
            upstream_fragment_downstreams,
            replace_upstream,
        } = actor_graph_builder.generate_graph()?;
        // A fresh job cannot be replacing any upstream fragments.
        assert!(replace_upstream.is_empty());

        // Only "no explicit parallelism + Full default" yields an adaptive table;
        // everything else is pinned to the resolved value.
        let table_parallelism = match (specified_parallelism, &self.env.opts.default_parallelism) {
            (None, DefaultParallelism::Full) => TableParallelism::Adaptive,
            _ => TableParallelism::Fixed(parallelism.get()),
        };

        let stream_job_fragments = StreamJobFragments::new(
            id,
            graph,
            stream_ctx.clone(),
            table_parallelism,
            max_parallelism.get(),
        );

        // Propagate the materialize fragment's vnode count back into the table catalog.
        if let Some(mview_fragment) = stream_job_fragments.mview_fragment() {
            stream_job.set_table_vnode_count(mview_fragment.vnode_count());
        }

        // For sink-into-table, precompute how this sink attaches to the target table's
        // materialize fragment.
        let new_upstream_sink = if let StreamingJob::Sink(sink) = &stream_job
            && let Ok(table_id) = sink.get_target_table()
        {
            let tables = self
                .metadata_manager
                .get_table_catalog_by_ids(&[*table_id])
                .await?;
            let target_table = tables
                .first()
                .ok_or_else(|| MetaError::catalog_id_not_found("table", *table_id))?;
            let sink_fragment = stream_job_fragments
                .sink_fragment()
                .ok_or_else(|| anyhow::anyhow!("sink fragment not found for sink {}", sink.id))?;
            let mview_fragment_id = self
                .metadata_manager
                .catalog_controller
                .get_mview_fragment_by_id(table_id.as_job_id())
                .await?;
            let upstream_sink_info = build_upstream_sink_info(
                sink.id,
                sink.original_target_columns.clone(),
                sink_fragment.fragment_id as _,
                target_table,
                mview_fragment_id,
            )?;
            Some(upstream_sink_info)
        } else {
            None
        };

        // Parallel CDC table backfill: initialize snapshot splits up front.
        let mut cdc_table_snapshot_splits = None;
        if let StreamingJob::Table(None, table, TableJobType::SharedCdcSource) = &stream_job
            && let Some((_, stream_cdc_scan)) =
                parallel_cdc_table_backfill_fragment(stream_job_fragments.fragments.values())
        {
            {
                let splits = try_init_parallel_cdc_table_snapshot_splits(
                    table.id,
                    stream_cdc_scan.cdc_table_desc.as_ref().unwrap(),
                    self.env.meta_store_ref(),
                    stream_cdc_scan.options.as_ref().unwrap(),
                    self.env.opts.cdc_table_split_init_insert_batch_size,
                    self.env.opts.cdc_table_split_init_sleep_interval_splits,
                    self.env.opts.cdc_table_split_init_sleep_duration_millis,
                )
                .await?;
                cdc_table_snapshot_splits = Some(splits);
            }
        }

        let ctx = CreateStreamingJobContext {
            upstream_fragment_downstreams,
            database_resource_group: resource_group,
            definition: stream_job.definition(),
            create_type: stream_job.create_type(),
            job_type: (&stream_job).into(),
            streaming_job: stream_job,
            new_upstream_sink,
            option: CreateStreamingJobOption {},
            snapshot_backfill_info,
            cross_db_snapshot_backfill_info,
            fragment_backfill_ordering,
            locality_fragment_state_table_mapping,
            cdc_table_snapshot_splits,
            is_serverless_backfill,
            streaming_job_model,
        };

        Ok((
            ctx,
            StreamJobFragmentsToCreate {
                inner: stream_job_fragments,
                downstreams: downstream_fragment_relations,
            },
        ))
    }
2038
2039 pub(crate) async fn build_replace_job(
2045 &self,
2046 stream_ctx: StreamContext,
2047 stream_job: &StreamingJob,
2048 mut fragment_graph: StreamFragmentGraph,
2049 tmp_job_id: JobId,
2050 auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,
2051 streaming_job_model: streaming_job::Model,
2052 ) -> MetaResult<(ReplaceStreamJobContext, StreamJobFragmentsToCreate)> {
2053 match &stream_job {
2054 StreamingJob::Table(..)
2055 | StreamingJob::Source(..)
2056 | StreamingJob::MaterializedView(..) => {}
2057 StreamingJob::Sink(..) | StreamingJob::Index(..) => {
2058 bail_not_implemented!("schema change for {}", stream_job.job_type_str())
2059 }
2060 }
2061
2062 let id = stream_job.id();
2063
2064 let mut drop_table_associated_source_id = None;
2066 if let StreamingJob::Table(None, _, _) = &stream_job {
2067 drop_table_associated_source_id = self
2068 .metadata_manager
2069 .get_table_associated_source_id(id.as_mv_table_id())
2070 .await?;
2071 }
2072
2073 let old_fragments = self.metadata_manager.get_job_fragments_by_id(id).await?;
2074 let old_internal_table_ids = old_fragments.internal_table_ids();
2075
2076 let mut drop_table_connector_ctx = None;
2078 if let Some(to_remove_source_id) = drop_table_associated_source_id {
2079 debug_assert!(old_internal_table_ids.len() == 1);
2081
2082 drop_table_connector_ctx = Some(DropTableConnectorContext {
2083 to_change_streaming_job_id: id,
2086 to_remove_state_table_id: old_internal_table_ids[0], to_remove_source_id,
2088 });
2089 } else if stream_job.is_materialized_view() {
2090 let old_fragments_upstreams = self
2093 .metadata_manager
2094 .catalog_controller
2095 .upstream_fragments(old_fragments.fragment_ids())
2096 .await?;
2097
2098 let old_state_graph =
2099 state_match::Graph::from_existing(&old_fragments, &old_fragments_upstreams);
2100 let new_state_graph = state_match::Graph::from_building(&fragment_graph);
2101 let result = state_match::match_graph(&new_state_graph, &old_state_graph)
2102 .context("incompatible altering on the streaming job states")?;
2103
2104 fragment_graph.fit_internal_table_ids_with_mapping(result.table_matches);
2105 fragment_graph.fit_snapshot_backfill_epochs(result.snapshot_backfill_epochs);
2106 } else {
2107 let old_internal_tables = self
2110 .metadata_manager
2111 .get_table_catalog_by_ids(&old_internal_table_ids)
2112 .await?;
2113 fragment_graph.fit_internal_tables_trivial(old_internal_tables)?;
2114 }
2115
2116 let original_root_fragment = old_fragments
2119 .root_fragment()
2120 .expect("root fragment not found");
2121
2122 let job_type = StreamingJobType::from(stream_job);
2123
2124 let mut downstream_fragments = self.metadata_manager.get_downstream_fragments(id).await?;
2126
2127 if let Some(auto_refresh_schema_sinks) = &auto_refresh_schema_sinks {
2128 let mut remaining_fragment: HashSet<_> = auto_refresh_schema_sinks
2129 .iter()
2130 .map(|sink| sink.original_fragment.fragment_id)
2131 .collect();
2132 for (_, downstream_fragment) in &mut downstream_fragments {
2133 if let Some(sink) = auto_refresh_schema_sinks.iter().find(|sink| {
2134 sink.original_fragment.fragment_id == downstream_fragment.fragment_id
2135 }) {
2136 assert!(remaining_fragment.remove(&downstream_fragment.fragment_id));
2137 *downstream_fragment = sink.new_fragment.clone();
2140 }
2141 }
2142 assert!(remaining_fragment.is_empty());
2143 }
2144
2145 let complete_graph = match &job_type {
2147 StreamingJobType::Table(TableJobType::General) | StreamingJobType::Source => {
2148 CompleteStreamFragmentGraph::with_downstreams(
2149 fragment_graph,
2150 FragmentGraphDownstreamContext {
2151 original_root_fragment_id: original_root_fragment.fragment_id,
2152 downstream_fragments,
2153 },
2154 job_type,
2155 )?
2156 }
2157 StreamingJobType::Table(TableJobType::SharedCdcSource)
2158 | StreamingJobType::MaterializedView => {
2159 let upstream_root_fragments = self
2161 .metadata_manager
2162 .get_upstream_root_fragments(fragment_graph.dependent_table_ids())
2163 .await?;
2164
2165 CompleteStreamFragmentGraph::with_upstreams_and_downstreams(
2166 fragment_graph,
2167 FragmentGraphUpstreamContext {
2168 upstream_root_fragments,
2169 },
2170 FragmentGraphDownstreamContext {
2171 original_root_fragment_id: original_root_fragment.fragment_id,
2172 downstream_fragments,
2173 },
2174 job_type,
2175 )?
2176 }
2177 _ => unreachable!(),
2178 };
2179
2180 let resource_group = self
2181 .metadata_manager
2182 .get_database_resource_group(stream_job.database_id())
2183 .await?;
2184
2185 let parallelism = NonZeroUsize::new(match old_fragments.assigned_parallelism {
2188 TableParallelism::Fixed(n) => n,
2189 TableParallelism::Adaptive | TableParallelism::Custom => 1,
2190 })
2191 .expect("The number of actors in the original table fragment should be greater than 0");
2192
2193 let actor_graph_builder = ActorGraphBuilder::new(id, complete_graph, parallelism)?;
2194
2195 let ActorGraphBuildResult {
2196 graph,
2197 downstream_fragment_relations,
2198 upstream_fragment_downstreams,
2199 mut replace_upstream,
2200 } = actor_graph_builder.generate_graph()?;
2201
2202 if matches!(
2204 job_type,
2205 StreamingJobType::Source | StreamingJobType::Table(TableJobType::General)
2206 ) {
2207 assert!(upstream_fragment_downstreams.is_empty());
2208 }
2209
2210 let stream_job_fragments = StreamJobFragments::new(
2214 tmp_job_id,
2215 graph,
2216 stream_ctx,
2217 old_fragments.assigned_parallelism,
2218 old_fragments.max_parallelism,
2219 );
2220
2221 if let Some(sinks) = &auto_refresh_schema_sinks {
2222 for sink in sinks {
2223 replace_upstream
2224 .remove(&sink.new_fragment.fragment_id)
2225 .expect("should exist");
2226 }
2227 }
2228
2229 let ctx = ReplaceStreamJobContext {
2233 old_fragments,
2234 replace_upstream,
2235 upstream_fragment_downstreams,
2236 streaming_job: stream_job.clone(),
2237 database_resource_group: resource_group,
2238 tmp_id: tmp_job_id,
2239 drop_table_connector_ctx,
2240 auto_refresh_schema_sinks,
2241 streaming_job_model,
2242 };
2243
2244 Ok((
2245 ctx,
2246 StreamJobFragmentsToCreate {
2247 inner: stream_job_fragments,
2248 downstreams: downstream_fragment_relations,
2249 },
2250 ))
2251 }
2252
2253 async fn alter_name(
2254 &self,
2255 relation: alter_name_request::Object,
2256 new_name: &str,
2257 ) -> MetaResult<NotificationVersion> {
2258 let (obj_type, id): (ObjectType, ObjectId) = match relation {
2259 alter_name_request::Object::TableId(id) => (ObjectType::Table, id.into()),
2260 alter_name_request::Object::ViewId(id) => (ObjectType::View, id.into()),
2261 alter_name_request::Object::IndexId(id) => (ObjectType::Index, id.into()),
2262 alter_name_request::Object::SinkId(id) => (ObjectType::Sink, id.into()),
2263 alter_name_request::Object::SourceId(id) => (ObjectType::Source, id.into()),
2264 alter_name_request::Object::SchemaId(id) => (ObjectType::Schema, id.into()),
2265 alter_name_request::Object::DatabaseId(id) => (ObjectType::Database, id.into()),
2266 alter_name_request::Object::SubscriptionId(id) => (ObjectType::Subscription, id.into()),
2267 };
2268 self.metadata_manager
2269 .catalog_controller
2270 .alter_name(obj_type, id, new_name)
2271 .await
2272 }
2273
2274 async fn alter_swap_rename(
2275 &self,
2276 object: alter_swap_rename_request::Object,
2277 ) -> MetaResult<NotificationVersion> {
2278 let (obj_type, src_id, dst_id) = match object {
2279 alter_swap_rename_request::Object::Schema(_) => unimplemented!("schema swap"),
2280 alter_swap_rename_request::Object::Table(objs) => {
2281 let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
2282 (ObjectType::Table, src_id, dst_id)
2283 }
2284 alter_swap_rename_request::Object::View(objs) => {
2285 let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
2286 (ObjectType::View, src_id, dst_id)
2287 }
2288 alter_swap_rename_request::Object::Source(objs) => {
2289 let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
2290 (ObjectType::Source, src_id, dst_id)
2291 }
2292 alter_swap_rename_request::Object::Sink(objs) => {
2293 let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
2294 (ObjectType::Sink, src_id, dst_id)
2295 }
2296 alter_swap_rename_request::Object::Subscription(objs) => {
2297 let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
2298 (ObjectType::Subscription, src_id, dst_id)
2299 }
2300 };
2301
2302 self.metadata_manager
2303 .catalog_controller
2304 .alter_swap_rename(obj_type, src_id, dst_id)
2305 .await
2306 }
2307
2308 async fn alter_owner(
2309 &self,
2310 object: Object,
2311 owner_id: UserId,
2312 ) -> MetaResult<NotificationVersion> {
2313 let (obj_type, id): (ObjectType, ObjectId) = match object {
2314 Object::TableId(id) => (ObjectType::Table, id.into()),
2315 Object::ViewId(id) => (ObjectType::View, id.into()),
2316 Object::SourceId(id) => (ObjectType::Source, id.into()),
2317 Object::SinkId(id) => (ObjectType::Sink, id.into()),
2318 Object::SchemaId(id) => (ObjectType::Schema, id.into()),
2319 Object::DatabaseId(id) => (ObjectType::Database, id.into()),
2320 Object::SubscriptionId(id) => (ObjectType::Subscription, id.into()),
2321 Object::ConnectionId(id) => (ObjectType::Connection, id.into()),
2322 Object::FunctionId(id) => (ObjectType::Function, id.into()),
2323 Object::SecretId(id) => (ObjectType::Secret, id.into()),
2324 };
2325 self.metadata_manager
2326 .catalog_controller
2327 .alter_owner(obj_type, id, owner_id as _)
2328 .await
2329 }
2330
2331 async fn alter_set_schema(
2332 &self,
2333 object: alter_set_schema_request::Object,
2334 new_schema_id: SchemaId,
2335 ) -> MetaResult<NotificationVersion> {
2336 let (obj_type, id): (ObjectType, ObjectId) = match object {
2337 alter_set_schema_request::Object::TableId(id) => (ObjectType::Table, id.into()),
2338 alter_set_schema_request::Object::ViewId(id) => (ObjectType::View, id.into()),
2339 alter_set_schema_request::Object::SourceId(id) => (ObjectType::Source, id.into()),
2340 alter_set_schema_request::Object::SinkId(id) => (ObjectType::Sink, id.into()),
2341 alter_set_schema_request::Object::FunctionId(id) => (ObjectType::Function, id.into()),
2342 alter_set_schema_request::Object::ConnectionId(id) => {
2343 (ObjectType::Connection, id.into())
2344 }
2345 alter_set_schema_request::Object::SubscriptionId(id) => {
2346 (ObjectType::Subscription, id.into())
2347 }
2348 };
2349 self.metadata_manager
2350 .catalog_controller
2351 .alter_schema(obj_type, id, new_schema_id)
2352 .await
2353 }
2354
2355 pub async fn wait(&self, job_id: Option<JobId>) -> MetaResult<WaitVersion> {
2356 if let Some(job_id) = job_id {
2357 let database_id = self
2358 .metadata_manager
2359 .catalog_controller
2360 .get_object_database_id(job_id)
2361 .await?;
2362 let catalog_version = self
2363 .metadata_manager
2364 .wait_streaming_job_finished(database_id, job_id)
2365 .await?;
2366 let hummock_version_id = self.barrier_manager.get_hummock_version_id().await;
2367 return Ok(WaitVersion {
2368 catalog_version,
2369 hummock_version_id,
2370 });
2371 }
2372
2373 let timeout_ms = 2 * 60 * 60 * 1000;
2374 let poll_interval = Duration::from_millis(100);
2375 for _ in 0..(timeout_ms / poll_interval.as_millis() as usize) {
2376 let background_jobs = self
2377 .metadata_manager
2378 .catalog_controller
2379 .list_background_creating_jobs(true, None)
2380 .await?;
2381 if background_jobs.is_empty() {
2382 let catalog_version = self
2383 .metadata_manager
2384 .catalog_controller
2385 .notify_frontend_trivial()
2386 .await;
2387 let hummock_version_id = self.barrier_manager.get_hummock_version_id().await;
2388 return Ok(WaitVersion {
2389 catalog_version,
2390 hummock_version_id,
2391 });
2392 }
2393
2394 sleep(poll_interval).await;
2395 }
2396 Err(MetaError::cancelled(format!(
2397 "timeout after {timeout_ms}ms"
2398 )))
2399 }
2400
2401 async fn comment_on(&self, comment: Comment) -> MetaResult<NotificationVersion> {
2402 self.metadata_manager
2403 .catalog_controller
2404 .comment_on(comment)
2405 .await
2406 }
2407
2408 async fn alter_streaming_job_config(
2409 &self,
2410 job_id: JobId,
2411 entries_to_add: HashMap<String, String>,
2412 keys_to_remove: Vec<String>,
2413 ) -> MetaResult<NotificationVersion> {
2414 self.metadata_manager
2415 .catalog_controller
2416 .alter_streaming_job_config(job_id, entries_to_add, keys_to_remove)
2417 .await
2418 }
2419}
2420
/// Reports a telemetry event for the creation of a streaming object.
///
/// Forwards to `report_event` with the stage fixed to
/// `PbTelemetryEventStage::CreateStreamJob`, using the raw job id as the
/// event's object id.
fn report_create_object(
    job_id: JobId,
    event_name: &str,
    obj_type: PbTelemetryDatabaseObject,
    connector_name: Option<String>,
    attr_info: Option<jsonbb::Value>,
) {
    report_event(
        PbTelemetryEventStage::CreateStreamJob,
        event_name,
        // Cast to whatever integer type `report_event` expects for the id.
        job_id.as_raw_id() as _,
        connector_name,
        Some(obj_type),
        attr_info,
    );
}
2437
2438pub fn build_upstream_sink_info(
2439 sink_id: SinkId,
2440 original_target_columns: Vec<PbColumnCatalog>,
2441 sink_fragment_id: FragmentId,
2442 target_table: &PbTable,
2443 target_fragment_id: FragmentId,
2444) -> MetaResult<UpstreamSinkInfo> {
2445 let sink_columns = if !original_target_columns.is_empty() {
2446 original_target_columns.clone()
2447 } else {
2448 target_table.columns.clone()
2453 };
2454
2455 let sink_output_fields = sink_columns
2456 .iter()
2457 .map(|col| Field::from(col.column_desc.as_ref().unwrap()).to_prost())
2458 .collect_vec();
2459 let output_indices = (0..sink_output_fields.len())
2460 .map(|i| i as u32)
2461 .collect_vec();
2462
2463 let dist_key_indices: anyhow::Result<Vec<u32>> = try {
2464 let sink_idx_by_col_id = sink_columns
2465 .iter()
2466 .enumerate()
2467 .map(|(idx, col)| {
2468 let column_id = col.column_desc.as_ref().unwrap().column_id;
2469 (column_id, idx as u32)
2470 })
2471 .collect::<HashMap<_, _>>();
2472 target_table
2473 .distribution_key
2474 .iter()
2475 .map(|dist_idx| {
2476 let column_id = target_table.columns[*dist_idx as usize]
2477 .column_desc
2478 .as_ref()
2479 .unwrap()
2480 .column_id;
2481 let sink_idx = sink_idx_by_col_id
2482 .get(&column_id)
2483 .ok_or_else(|| anyhow::anyhow!("column id {} not found in sink", column_id))?;
2484 Ok(*sink_idx)
2485 })
2486 .collect::<anyhow::Result<Vec<_>>>()?
2487 };
2488 let dist_key_indices =
2489 dist_key_indices.map_err(|e| e.context("failed to get distribution key indices"))?;
2490 let downstream_fragment_id = target_fragment_id as _;
2491 let new_downstream_relation = DownstreamFragmentRelation {
2492 downstream_fragment_id,
2493 dispatcher_type: DispatcherType::Hash,
2494 dist_key_indices,
2495 output_mapping: PbDispatchOutputMapping::simple(output_indices),
2496 };
2497 let current_target_columns = target_table.get_columns();
2498 let project_exprs = build_select_node_list(&sink_columns, current_target_columns)?;
2499 Ok(UpstreamSinkInfo {
2500 sink_id,
2501 sink_fragment_id: sink_fragment_id as _,
2502 sink_output_fields,
2503 sink_original_target_columns: original_target_columns,
2504 project_exprs,
2505 new_sink_downstream: new_downstream_relation,
2506 })
2507}
2508
2509pub fn refill_upstream_sink_union_in_table(
2510 union_fragment_root: &mut PbStreamNode,
2511 upstream_sink_infos: &Vec<UpstreamSinkInfo>,
2512) {
2513 visit_stream_node_cont_mut(union_fragment_root, |node| {
2514 if let Some(NodeBody::UpstreamSinkUnion(upstream_sink_union)) = &mut node.node_body {
2515 let init_upstreams = upstream_sink_infos
2516 .iter()
2517 .map(|info| PbUpstreamSinkInfo {
2518 upstream_fragment_id: info.sink_fragment_id,
2519 sink_output_schema: info.sink_output_fields.clone(),
2520 project_exprs: info.project_exprs.clone(),
2521 })
2522 .collect();
2523 upstream_sink_union.init_upstreams = init_upstreams;
2524 false
2525 } else {
2526 true
2527 }
2528 });
2529}
2530
#[cfg(test)]
mod tests {
    use std::num::NonZeroUsize;

    use super::*;

    /// A user-specified parallelism larger than the currently available
    /// parallelism is accepted as-is (it is only checked against the max).
    #[test]
    fn test_specified_parallelism_exceeds_available() {
        let result = DdlController::resolve_stream_parallelism_inner(
            Some(NonZeroUsize::new(100).unwrap()),
            NonZeroUsize::new(256).unwrap(),
            Some(NonZeroUsize::new(4).unwrap()),
            &DefaultParallelism::Full,
            "default",
        )
        .unwrap();
        assert_eq!(result.get(), 100);
    }

    /// With no explicit parallelism, a configured default larger than the
    /// available parallelism is still used.
    #[test]
    fn test_allows_default_parallelism_over_available() {
        let result = DdlController::resolve_stream_parallelism_inner(
            None,
            NonZeroUsize::new(256).unwrap(),
            Some(NonZeroUsize::new(4).unwrap()),
            &DefaultParallelism::Default(NonZeroUsize::new(50).unwrap()),
            "default",
        )
        .unwrap();
        assert_eq!(result.get(), 50);
    }

    /// `DefaultParallelism::Full` is capped by the max parallelism even when
    /// more slots are available.
    #[test]
    fn test_full_parallelism_capped_by_max() {
        let result = DdlController::resolve_stream_parallelism_inner(
            None,
            NonZeroUsize::new(6).unwrap(),
            Some(NonZeroUsize::new(10).unwrap()),
            &DefaultParallelism::Full,
            "default",
        )
        .unwrap();
        assert_eq!(result.get(), 6);
    }

    /// With no available parallelism at all, resolution fails with an
    /// `Unavailable` error.
    #[test]
    fn test_no_available_slots_returns_error() {
        let result = DdlController::resolve_stream_parallelism_inner(
            None,
            NonZeroUsize::new(4).unwrap(),
            None,
            &DefaultParallelism::Full,
            "default",
        );
        assert!(matches!(
            result,
            Err(ref e) if matches!(e.inner(), MetaErrorInner::Unavailable(_))
        ));
    }

    /// A user-specified parallelism above the max parallelism is rejected with
    /// an `InvalidParameter` error.
    #[test]
    fn test_specified_over_max_returns_error() {
        let result = DdlController::resolve_stream_parallelism_inner(
            Some(NonZeroUsize::new(8).unwrap()),
            NonZeroUsize::new(4).unwrap(),
            Some(NonZeroUsize::new(10).unwrap()),
            &DefaultParallelism::Full,
            "default",
        );
        assert!(matches!(
            result,
            Err(ref e) if matches!(e.inner(), MetaErrorInner::InvalidParameter(_))
        ));
    }
}