use std::cmp::Ordering;
use std::collections::{HashMap, HashSet};
use std::num::NonZeroUsize;
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use std::time::Duration;

use anyhow::{Context, anyhow};
use await_tree::InstrumentAwait;
use either::Either;
use itertools::Itertools;
use risingwave_common::catalog::{
    AlterDatabaseParam, ColumnCatalog, ColumnId, Field, FragmentTypeFlag,
};
use risingwave_common::config::DefaultParallelism;
use risingwave_common::hash::VnodeCountCompat;
use risingwave_common::id::{JobId, TableId};
use risingwave_common::secret::{LocalSecretManager, SecretEncryption};
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont_mut;
use risingwave_common::{bail, bail_not_implemented};
use risingwave_connector::WithOptionsSecResolved;
use risingwave_connector::connector_common::validate_connection;
use risingwave_connector::sink::SinkParam;
use risingwave_connector::sink::iceberg::IcebergSink;
use risingwave_connector::source::cdc::CdcScanOptions;
use risingwave_connector::source::{
    ConnectorProperties, SourceEnumeratorContext, UPSTREAM_SOURCE_KEY,
};
use risingwave_meta_model::object::ObjectType;
use risingwave_meta_model::{
    ConnectionId, DatabaseId, DispatcherType, FragmentId, FunctionId, IndexId, JobStatus, ObjectId,
    SchemaId, SecretId, SinkId, SourceId, StreamingParallelism, SubscriptionId, UserId, ViewId,
};
use risingwave_pb::catalog::{
    Comment, Connection, CreateType, Database, Function, PbTable, Schema, Secret, Source,
    Subscription, Table, View,
};
use risingwave_pb::common::PbActorLocation;
use risingwave_pb::ddl_service::alter_owner_request::Object;
use risingwave_pb::ddl_service::{
    DdlProgress, TableJobType, WaitVersion, alter_name_request, alter_set_schema_request,
    alter_swap_rename_request, streaming_job_resource_type,
};
use risingwave_pb::meta::table_fragments::PbActorStatus;
use risingwave_pb::plan_common::PbColumnCatalog;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{
    PbDispatchOutputMapping, PbStreamFragmentGraph, PbStreamNode, PbUpstreamSinkInfo,
    StreamFragmentGraph as StreamFragmentGraphProto,
};
use risingwave_pb::telemetry::{PbTelemetryDatabaseObject, PbTelemetryEventStage};
use strum::Display;
use thiserror_ext::AsReport;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use tokio::time::sleep;
use tracing::Instrument;

use crate::barrier::{BarrierManagerRef, Command};
use crate::controller::catalog::{DropTableConnectorContext, ReleaseContext};
use crate::controller::cluster::StreamingClusterInfo;
use crate::controller::streaming_job::{FinishAutoRefreshSchemaSinkContext, SinkIntoTableContext};
use crate::controller::utils::build_select_node_list;
use crate::error::{MetaErrorInner, bail_invalid_parameter, bail_unavailable};
use crate::manager::iceberg_compaction::IcebergCompactionManagerRef;
use crate::manager::sink_coordination::SinkCoordinatorManager;
use crate::manager::{
    IGNORED_NOTIFICATION_VERSION, LocalNotification, MetaSrvEnv, MetadataManager,
    NotificationVersion, StreamingJob, StreamingJobType,
};
use crate::model::{
    DownstreamFragmentRelation, Fragment, FragmentDownstreamRelation,
    FragmentId as CatalogFragmentId, StreamContext, StreamJobFragments, StreamJobFragmentsToCreate,
    TableParallelism,
};
use crate::stream::cdc::{
    parallel_cdc_table_backfill_fragment, try_init_parallel_cdc_table_snapshot_splits,
};
use crate::stream::{
    ActorGraphBuildResult, ActorGraphBuilder, AutoRefreshSchemaSinkContext,
    CompleteStreamFragmentGraph, CreateStreamingJobContext, CreateStreamingJobOption,
    FragmentGraphDownstreamContext, FragmentGraphUpstreamContext, GlobalStreamManagerRef,
    ReplaceStreamJobContext, ReschedulePolicy, SourceChange, SourceManagerRef, StreamFragmentGraph,
    UpstreamSinkInfo, check_sink_fragments_support_refresh_schema, create_source_worker,
    rewrite_refresh_schema_sink_fragment, state_match, validate_sink,
};
use crate::telemetry::report_event;
use crate::{MetaError, MetaResult};

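/// Drop behavior for catalog objects, mirroring the SQL `RESTRICT` / `CASCADE` drop clauses.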
#[derive(PartialEq)]
pub enum DropMode {
    Restrict,
    Cascade,
}

impl DropMode {
    pub fn from_request_setting(cascade: bool) -> DropMode {
        if cascade {
            DropMode::Cascade
        } else {
            DropMode::Restrict
        }
    }
}

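/// Identifier of the streaming job targeted by a DDL command. For tables, the optional
/// `SourceId` is the associated source, if the table was created with a connector.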
#[derive(strum::AsRefStr)]
pub enum StreamingJobId {
    MaterializedView(TableId),
    Sink(SinkId),
    Table(Option<SourceId>, TableId),
    Index(IndexId),
}

impl std::fmt::Display for StreamingJobId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.as_ref())?;
        write!(f, "({})", self.id())
    }
}

impl StreamingJobId {
    fn id(&self) -> JobId {
        match self {
            StreamingJobId::MaterializedView(id) | StreamingJobId::Table(_, id) => id.as_job_id(),
            StreamingJobId::Index(id) => id.as_job_id(),
            StreamingJobId::Sink(id) => id.as_job_id(),
        }
    }
}

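/// A streaming job together with the new fragment graph that should replace its current plan.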
pub struct ReplaceStreamJobInfo {
    pub streaming_job: StreamingJob,
    pub fragment_graph: StreamFragmentGraphProto,
}

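/// All DDL commands handled by [`DdlController::run_command`], one variant per DDL statement.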
#[derive(Display)]
pub enum DdlCommand {
    CreateDatabase(Database),
    DropDatabase(DatabaseId),
    CreateSchema(Schema),
    DropSchema(SchemaId, DropMode),
    CreateNonSharedSource(Source),
    DropSource(SourceId, DropMode),
    ResetSource(SourceId),
    CreateFunction(Function),
    DropFunction(FunctionId, DropMode),
    CreateView(View, HashSet<ObjectId>),
    DropView(ViewId, DropMode),
    CreateStreamingJob {
        stream_job: StreamingJob,
        fragment_graph: StreamFragmentGraphProto,
        dependencies: HashSet<ObjectId>,
        resource_type: streaming_job_resource_type::ResourceType,
        if_not_exists: bool,
    },
    DropStreamingJob {
        job_id: StreamingJobId,
        drop_mode: DropMode,
    },
    AlterName(alter_name_request::Object, String),
    AlterSwapRename(alter_swap_rename_request::Object),
    ReplaceStreamJob(ReplaceStreamJobInfo),
    AlterNonSharedSource(Source),
    AlterObjectOwner(Object, UserId),
    AlterSetSchema(alter_set_schema_request::Object, SchemaId),
    CreateConnection(Connection),
    DropConnection(ConnectionId, DropMode),
    CreateSecret(Secret),
    AlterSecret(Secret),
    DropSecret(SecretId, DropMode),
    CommentOn(Comment),
    CreateSubscription(Subscription),
    DropSubscription(SubscriptionId, DropMode),
    AlterSubscriptionRetention {
        subscription_id: SubscriptionId,
        retention_seconds: u64,
        definition: String,
    },
    AlterDatabaseParam(DatabaseId, AlterDatabaseParam),
    AlterStreamingJobConfig(JobId, HashMap<String, String>, Vec<String>),
}

impl DdlCommand {
    /// Returns the name or id of the object this command operates on, for logging and tracing.
    fn object(&self) -> Either<String, ObjectId> {
        use Either::*;
        match self {
            DdlCommand::CreateDatabase(database) => Left(database.name.clone()),
            DdlCommand::DropDatabase(id) => Right(id.as_object_id()),
            DdlCommand::CreateSchema(schema) => Left(schema.name.clone()),
            DdlCommand::DropSchema(id, _) => Right(id.as_object_id()),
            DdlCommand::CreateNonSharedSource(source) => Left(source.name.clone()),
            DdlCommand::DropSource(id, _) => Right(id.as_object_id()),
            DdlCommand::ResetSource(id) => Right(id.as_object_id()),
            DdlCommand::CreateFunction(function) => Left(function.name.clone()),
            DdlCommand::DropFunction(id, _) => Right(id.as_object_id()),
            DdlCommand::CreateView(view, _) => Left(view.name.clone()),
            DdlCommand::DropView(id, _) => Right(id.as_object_id()),
            DdlCommand::CreateStreamingJob { stream_job, .. } => Left(stream_job.name()),
            DdlCommand::DropStreamingJob { job_id, .. } => Right(job_id.id().as_object_id()),
            DdlCommand::AlterName(object, _) => Left(format!("{object:?}")),
            DdlCommand::AlterSwapRename(object) => Left(format!("{object:?}")),
            DdlCommand::ReplaceStreamJob(info) => Left(info.streaming_job.name()),
            DdlCommand::AlterNonSharedSource(source) => Left(source.name.clone()),
            DdlCommand::AlterObjectOwner(object, _) => Left(format!("{object:?}")),
            DdlCommand::AlterSetSchema(object, _) => Left(format!("{object:?}")),
            DdlCommand::CreateConnection(connection) => Left(connection.name.clone()),
            DdlCommand::DropConnection(id, _) => Right(id.as_object_id()),
            DdlCommand::CreateSecret(secret) => Left(secret.name.clone()),
            DdlCommand::AlterSecret(secret) => Left(secret.name.clone()),
            DdlCommand::DropSecret(id, _) => Right(id.as_object_id()),
            DdlCommand::CommentOn(comment) => Right(comment.table_id.into()),
            DdlCommand::CreateSubscription(subscription) => Left(subscription.name.clone()),
            DdlCommand::DropSubscription(id, _) => Right(id.as_object_id()),
            DdlCommand::AlterSubscriptionRetention {
                subscription_id, ..
            } => Right(subscription_id.as_object_id()),
            DdlCommand::AlterDatabaseParam(id, _) => Right(id.as_object_id()),
            DdlCommand::AlterStreamingJobConfig(job_id, _, _) => Right(job_id.as_object_id()),
        }
    }

    /// Whether this command may run while the cluster is still recovering.
    fn allow_in_recovery(&self) -> bool {
        match self {
            DdlCommand::DropDatabase(_)
            | DdlCommand::DropSchema(_, _)
            | DdlCommand::DropSource(_, _)
            | DdlCommand::DropFunction(_, _)
            | DdlCommand::DropView(_, _)
            | DdlCommand::DropStreamingJob { .. }
            | DdlCommand::DropConnection(_, _)
            | DdlCommand::DropSecret(_, _)
            | DdlCommand::DropSubscription(_, _)
            | DdlCommand::AlterName(_, _)
            | DdlCommand::AlterObjectOwner(_, _)
            | DdlCommand::AlterSetSchema(_, _)
            | DdlCommand::CreateDatabase(_)
            | DdlCommand::CreateSchema(_)
            | DdlCommand::CreateFunction(_)
            | DdlCommand::CreateView(_, _)
            | DdlCommand::CreateConnection(_)
            | DdlCommand::CommentOn(_)
            | DdlCommand::CreateSecret(_)
            | DdlCommand::AlterSecret(_)
            | DdlCommand::AlterSwapRename(_)
            | DdlCommand::AlterDatabaseParam(_, _)
            | DdlCommand::AlterStreamingJobConfig(_, _, _)
            | DdlCommand::AlterSubscriptionRetention { .. } => true,
            DdlCommand::CreateStreamingJob { .. }
            | DdlCommand::CreateNonSharedSource(_)
            | DdlCommand::ReplaceStreamJob(_)
            | DdlCommand::AlterNonSharedSource(_)
            | DdlCommand::ResetSource(_)
            | DdlCommand::CreateSubscription(_) => false,
        }
    }
}

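/// Entry point of DDL execution on the meta node: it validates each [`DdlCommand`] and
/// coordinates the catalog, stream, source, sink and barrier managers to apply it.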
#[derive(Clone)]
pub struct DdlController {
    pub(crate) env: MetaSrvEnv,

    pub(crate) metadata_manager: MetadataManager,
    pub(crate) stream_manager: GlobalStreamManagerRef,
    pub(crate) source_manager: SourceManagerRef,
    barrier_manager: BarrierManagerRef,
    sink_manager: SinkCoordinatorManager,
    iceberg_compaction_manager: IcebergCompactionManagerRef,

    /// Permits limiting how many streaming jobs may be created concurrently.
    pub(crate) creating_streaming_job_permits: Arc<CreatingStreamingJobPermit>,

    /// Sequence number counter for identifying DDL commands.
    seq: Arc<AtomicU64>,
}

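/// Semaphore bounding how many streaming jobs may be in the creating state at once, sized
/// by the `max_concurrent_creating_streaming_jobs` system parameter (0 means unlimited).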
#[derive(Clone)]
pub struct CreatingStreamingJobPermit {
    pub(crate) semaphore: Arc<Semaphore>,
}

impl CreatingStreamingJobPermit {
    async fn new(env: &MetaSrvEnv) -> Self {
        let mut permits = env
            .system_params_reader()
            .await
            .max_concurrent_creating_streaming_jobs() as usize;
        if permits == 0 {
            // A value of 0 means unlimited.
            permits = Semaphore::MAX_PERMITS;
        }
        let semaphore = Arc::new(Semaphore::new(permits));

        // Watch for system parameter changes and resize the semaphore accordingly.
        let (local_notification_tx, mut local_notification_rx) =
            tokio::sync::mpsc::unbounded_channel();
        env.notification_manager()
            .insert_local_sender(local_notification_tx);
        let semaphore_clone = semaphore.clone();
        tokio::spawn(async move {
            while let Some(notification) = local_notification_rx.recv().await {
                let LocalNotification::SystemParamsChange(p) = &notification else {
                    continue;
                };
                let mut new_permits = p.max_concurrent_creating_streaming_jobs() as usize;
                if new_permits == 0 {
                    new_permits = Semaphore::MAX_PERMITS;
                }
                match permits.cmp(&new_permits) {
                    Ordering::Less => {
                        semaphore_clone.add_permits(new_permits - permits);
                    }
                    Ordering::Equal => continue,
                    Ordering::Greater => {
                        let to_release = permits - new_permits;
                        let reduced = semaphore_clone.forget_permits(to_release);
                        if reduced != to_release {
                            tracing::warn!(
                                "not enough permits to release, expected {}, but reduced {}",
                                to_release,
                                reduced
                            );
                        }
                    }
                }
                tracing::info!(
                    "max_concurrent_creating_streaming_jobs changed from {} to {}",
                    permits,
                    new_permits
                );
                permits = new_permits;
            }
        });

        Self { semaphore }
    }
}

impl DdlController {
    pub async fn new(
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        stream_manager: GlobalStreamManagerRef,
        source_manager: SourceManagerRef,
        barrier_manager: BarrierManagerRef,
        sink_manager: SinkCoordinatorManager,
        iceberg_compaction_manager: IcebergCompactionManagerRef,
    ) -> Self {
        let creating_streaming_job_permits = Arc::new(CreatingStreamingJobPermit::new(&env).await);
        Self {
            env,
            metadata_manager,
            stream_manager,
            source_manager,
            barrier_manager,
            sink_manager,
            iceberg_compaction_manager,
            creating_streaming_job_permits,
            seq: Arc::new(AtomicU64::new(0)),
        }
    }

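    /// Returns the next sequence number used to identify a DDL command within this process.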
    pub fn next_seq(&self) -> u64 {
        // `Relaxed` ordering suffices: the value only needs to be unique, not ordered
        // relative to other operations.
        self.seq.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
    }

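    /// Dispatches a [`DdlCommand`] to its handler on a spawned task, registering it in the
    /// await-tree registry, and returns the catalog/hummock versions to wait for.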
    #[allow(clippy::large_stack_frames)]
    pub async fn run_command(&self, command: DdlCommand) -> MetaResult<Option<WaitVersion>> {
        if !command.allow_in_recovery() {
            self.barrier_manager.check_status_running()?;
        }

        let await_tree_key = format!("DDL Command {}", self.next_seq());
        let await_tree_span = await_tree::span!("{command}({})", command.object());

        let ctrl = self.clone();
        let fut = Box::pin(async move {
            match command {
                DdlCommand::CreateDatabase(database) => ctrl.create_database(database).await,
                DdlCommand::DropDatabase(database_id) => ctrl.drop_database(database_id).await,
                DdlCommand::CreateSchema(schema) => ctrl.create_schema(schema).await,
                DdlCommand::DropSchema(schema_id, drop_mode) => {
                    ctrl.drop_schema(schema_id, drop_mode).await
                }
                DdlCommand::CreateNonSharedSource(source) => {
                    ctrl.create_non_shared_source(source).await
                }
                DdlCommand::DropSource(source_id, drop_mode) => {
                    ctrl.drop_source(source_id, drop_mode).await
                }
                DdlCommand::ResetSource(source_id) => ctrl.reset_source(source_id).await,
                DdlCommand::CreateFunction(function) => ctrl.create_function(function).await,
                DdlCommand::DropFunction(function_id, drop_mode) => {
                    ctrl.drop_function(function_id, drop_mode).await
                }
                DdlCommand::CreateView(view, dependencies) => {
                    ctrl.create_view(view, dependencies).await
                }
                DdlCommand::DropView(view_id, drop_mode) => {
                    ctrl.drop_view(view_id, drop_mode).await
                }
                DdlCommand::CreateStreamingJob {
                    stream_job,
                    fragment_graph,
                    dependencies,
                    resource_type,
                    if_not_exists,
                } => {
                    ctrl.create_streaming_job(
                        stream_job,
                        fragment_graph,
                        dependencies,
                        resource_type,
                        if_not_exists,
                    )
                    .await
                }
                DdlCommand::DropStreamingJob { job_id, drop_mode } => {
                    ctrl.drop_streaming_job(job_id, drop_mode).await
                }
                DdlCommand::ReplaceStreamJob(ReplaceStreamJobInfo {
                    streaming_job,
                    fragment_graph,
                }) => ctrl.replace_job(streaming_job, fragment_graph).await,
                DdlCommand::AlterName(relation, name) => ctrl.alter_name(relation, &name).await,
                DdlCommand::AlterObjectOwner(object, owner_id) => {
                    ctrl.alter_owner(object, owner_id).await
                }
                DdlCommand::AlterSetSchema(object, new_schema_id) => {
                    ctrl.alter_set_schema(object, new_schema_id).await
                }
                DdlCommand::CreateConnection(connection) => {
                    ctrl.create_connection(connection).await
                }
                DdlCommand::DropConnection(connection_id, drop_mode) => {
                    ctrl.drop_connection(connection_id, drop_mode).await
                }
                DdlCommand::CreateSecret(secret) => ctrl.create_secret(secret).await,
                DdlCommand::DropSecret(secret_id, drop_mode) => {
                    ctrl.drop_secret(secret_id, drop_mode).await
                }
                DdlCommand::AlterSecret(secret) => ctrl.alter_secret(secret).await,
                DdlCommand::AlterNonSharedSource(source) => {
                    ctrl.alter_non_shared_source(source).await
                }
                DdlCommand::CommentOn(comment) => ctrl.comment_on(comment).await,
                DdlCommand::CreateSubscription(subscription) => {
                    ctrl.create_subscription(subscription).await
                }
                DdlCommand::DropSubscription(subscription_id, drop_mode) => {
                    ctrl.drop_subscription(subscription_id, drop_mode).await
                }
                DdlCommand::AlterSubscriptionRetention {
                    subscription_id,
                    retention_seconds,
                    definition,
                } => {
                    ctrl.alter_subscription_retention(
                        subscription_id,
                        retention_seconds,
                        definition,
                    )
                    .await
                }
                DdlCommand::AlterSwapRename(objects) => ctrl.alter_swap_rename(objects).await,
                DdlCommand::AlterDatabaseParam(database_id, param) => {
                    ctrl.alter_database_param(database_id, param).await
                }
                DdlCommand::AlterStreamingJobConfig(job_id, entries_to_add, keys_to_remove) => {
                    ctrl.alter_streaming_job_config(job_id, entries_to_add, keys_to_remove)
                        .await
                }
            }
        })
        .in_current_span();
        let fut = (self.env.await_tree_reg())
            .register(await_tree_key, await_tree_span)
            .instrument(Box::pin(fut));
        let notification_version = tokio::spawn(fut).await.map_err(|e| anyhow!(e))??;
        Ok(Some(WaitVersion {
            catalog_version: notification_version,
            hummock_version_id: self.barrier_manager.get_hummock_version_id().await,
        }))
    }

    pub async fn get_ddl_progress(&self) -> MetaResult<Vec<DdlProgress>> {
        self.barrier_manager.get_ddl_progress().await
    }

    async fn create_database(&self, database: Database) -> MetaResult<NotificationVersion> {
        let (version, updated_db) = self
            .metadata_manager
            .catalog_controller
            .create_database(database)
            .await?;
        // Apply the per-database barrier settings to the barrier manager.
        self.barrier_manager
            .update_database_barrier(
                updated_db.database_id,
                updated_db.barrier_interval_ms.map(|v| v as u32),
                updated_db.checkpoint_frequency.map(|v| v as u64),
            )
            .await?;
        Ok(version)
    }

    #[tracing::instrument(skip(self), level = "debug")]
    pub async fn reschedule_streaming_job(
        &self,
        job_id: JobId,
        target: ReschedulePolicy,
        mut deferred: bool,
    ) -> MetaResult<()> {
        tracing::info!("altering parallelism for job {}", job_id);
        if self.barrier_manager.check_status_running().is_err() {
            tracing::info!(
                "alter parallelism is set to deferred mode because the system is in recovery state"
            );
            deferred = true;
        }

        self.stream_manager
            .reschedule_streaming_job(job_id, target, deferred)
            .await
    }

    pub async fn reschedule_streaming_job_backfill_parallelism(
        &self,
        job_id: JobId,
        parallelism: Option<StreamingParallelism>,
        mut deferred: bool,
    ) -> MetaResult<()> {
        tracing::info!("altering backfill parallelism for job {}", job_id);
        if self.barrier_manager.check_status_running().is_err() {
            tracing::info!(
                "alter backfill parallelism is set to deferred mode because the system is in recovery state"
            );
            deferred = true;
        }

        self.stream_manager
            .reschedule_streaming_job_backfill_parallelism(job_id, parallelism, deferred)
            .await
    }

    pub async fn reschedule_cdc_table_backfill(
        &self,
        job_id: JobId,
        target: ReschedulePolicy,
    ) -> MetaResult<()> {
        tracing::info!("altering CDC table backfill parallelism for job {}", job_id);
        if self.barrier_manager.check_status_running().is_err() {
            return Err(anyhow::anyhow!(
                "CDC table backfill reschedule is unavailable because the system is in recovery state"
            )
            .into());
        }
        self.stream_manager
            .reschedule_cdc_table_backfill(job_id, target)
            .await
    }

    pub async fn reschedule_fragments(
        &self,
        fragment_targets: HashMap<FragmentId, Option<StreamingParallelism>>,
    ) -> MetaResult<()> {
        tracing::info!(
            "altering parallelism for fragments {:?}",
            fragment_targets.keys()
        );
        let fragment_targets = fragment_targets
            .into_iter()
            .map(|(fragment_id, parallelism)| (fragment_id as CatalogFragmentId, parallelism))
            .collect();

        self.stream_manager
            .reschedule_fragments(fragment_targets)
            .await
    }

    async fn drop_database(&self, database_id: DatabaseId) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Database, database_id, DropMode::Cascade)
            .await
    }

    async fn create_schema(&self, schema: Schema) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .create_schema(schema)
            .await
    }

    async fn drop_schema(
        &self,
        schema_id: SchemaId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Schema, schema_id, drop_mode)
            .await
    }

    async fn create_non_shared_source(&self, source: Source) -> MetaResult<NotificationVersion> {
        let handle = create_source_worker(&source, self.source_manager.metrics.clone())
            .await
            .context("failed to create source worker")?;

        let (source_id, version) = self
            .metadata_manager
            .catalog_controller
            .create_source(source)
            .await?;
        self.source_manager
            .register_source_with_handle(source_id, handle)
            .await;
        Ok(version)
    }

    async fn drop_source(
        &self,
        source_id: SourceId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Source, source_id, drop_mode)
            .await
    }

    async fn reset_source(&self, source_id: SourceId) -> MetaResult<NotificationVersion> {
        tracing::info!(source_id = %source_id, "resetting CDC source offset to latest");

        let database_id = self
            .metadata_manager
            .catalog_controller
            .get_object_database_id(source_id)
            .await?;

        self.stream_manager
            .barrier_scheduler
            .run_command(database_id, Command::ResetSource { source_id })
            .await?;

        let version = self
            .metadata_manager
            .catalog_controller
            .notify_frontend_trivial()
            .await;
        Ok(version)
    }

    async fn alter_non_shared_source(&self, source: Source) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .alter_non_shared_source(source)
            .await
    }

    async fn create_function(&self, function: Function) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .create_function(function)
            .await
    }

    async fn drop_function(
        &self,
        function_id: FunctionId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Function, function_id, drop_mode)
            .await
    }

    async fn create_view(
        &self,
        view: View,
        dependencies: HashSet<ObjectId>,
    ) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .create_view(view, dependencies)
            .await
    }

    async fn drop_view(
        &self,
        view_id: ViewId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::View, view_id, drop_mode).await
    }

    async fn create_connection(&self, connection: Connection) -> MetaResult<NotificationVersion> {
        validate_connection(&connection).await?;
        self.metadata_manager
            .catalog_controller
            .create_connection(connection)
            .await
    }

    async fn drop_connection(
        &self,
        connection_id: ConnectionId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Connection, connection_id, drop_mode)
            .await
    }

    async fn alter_database_param(
        &self,
        database_id: DatabaseId,
        param: AlterDatabaseParam,
    ) -> MetaResult<NotificationVersion> {
        let (version, updated_db) = self
            .metadata_manager
            .catalog_controller
            .alter_database_param(database_id, param)
            .await?;
        self.barrier_manager
            .update_database_barrier(
                database_id,
                updated_db.barrier_interval_ms.map(|v| v as u32),
                updated_db.checkpoint_frequency.map(|v| v as u64),
            )
            .await?;
        Ok(version)
    }

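    /// Encrypts the secret's plaintext value with the configured `secret_store_private_key`
    /// and returns the serialized ciphertext to be persisted in the catalog.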
    fn get_encrypted_payload(&self, secret: &Secret) -> MetaResult<Vec<u8>> {
        let secret_store_private_key = self
            .env
            .opts
            .secret_store_private_key
            .clone()
            .ok_or_else(|| anyhow!("secret_store_private_key is not configured"))?;

        let encrypted_payload = SecretEncryption::encrypt(
            secret_store_private_key.as_slice(),
            secret.get_value().as_slice(),
        )
        .context(format!("failed to encrypt secret {}", secret.name))?;
        Ok(encrypted_payload
            .serialize()
            .context(format!("failed to serialize secret {}", secret.name))?)
    }

    async fn create_secret(&self, mut secret: Secret) -> MetaResult<NotificationVersion> {
        // The plaintext payload is kept in memory only; the catalog stores the ciphertext.
        let secret_plain_payload = secret.value.clone();
        let encrypted_payload = self.get_encrypted_payload(&secret)?;
        secret.value = encrypted_payload;

        self.metadata_manager
            .catalog_controller
            .create_secret(secret, secret_plain_payload)
            .await
    }

    async fn drop_secret(
        &self,
        secret_id: SecretId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Secret, secret_id, drop_mode)
            .await
    }

    async fn alter_secret(&self, mut secret: Secret) -> MetaResult<NotificationVersion> {
        let secret_plain_payload = secret.value.clone();
        let encrypted_payload = self.get_encrypted_payload(&secret)?;
        secret.value = encrypted_payload;
        self.metadata_manager
            .catalog_controller
            .alter_secret(secret, secret_plain_payload)
            .await
    }

    async fn create_subscription(
        &self,
        mut subscription: Subscription,
    ) -> MetaResult<NotificationVersion> {
        tracing::debug!("create subscription");
        let _permit = self
            .creating_streaming_job_permits
            .semaphore
            .acquire()
            .await
            .unwrap();
        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
        self.metadata_manager
            .catalog_controller
            .create_subscription_catalog(&mut subscription)
            .await?;
        if let Err(err) = self.stream_manager.create_subscription(&subscription).await {
            tracing::debug!(error = %err.as_report(), "failed to create subscription");
            let _ = self
                .metadata_manager
                .catalog_controller
                .try_abort_creating_subscription(subscription.id)
                .await
                .inspect_err(|e| {
                    tracing::error!(
                        error = %e.as_report(),
                        "failed to abort create subscription after failure"
                    );
                });
            return Err(err);
        }

        let version = self
            .metadata_manager
            .catalog_controller
            .notify_create_subscription(subscription.id)
            .await?;
        tracing::debug!("finish create subscription");
        Ok(version)
    }

    async fn drop_subscription(
        &self,
        subscription_id: SubscriptionId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        tracing::debug!("preparing drop subscription");
        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
        let subscription = self
            .metadata_manager
            .catalog_controller
            .get_subscription_by_id(subscription_id)
            .await?;
        let table_id = subscription.dependent_table_id;
        let database_id = subscription.database_id;
        let (_, version) = self
            .metadata_manager
            .catalog_controller
            .drop_object(ObjectType::Subscription, subscription_id, drop_mode)
            .await?;
        self.stream_manager
            .drop_subscription(database_id, subscription_id, table_id)
            .await;
        tracing::debug!("finish drop subscription");
        Ok(version)
    }

    async fn alter_subscription_retention(
        &self,
        subscription_id: SubscriptionId,
        retention_seconds: u64,
        definition: String,
    ) -> MetaResult<NotificationVersion> {
        tracing::debug!("alter subscription retention");
        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
        let (version, subscription) = self
            .metadata_manager
            .catalog_controller
            .alter_subscription_retention(subscription_id, retention_seconds, definition)
            .await?;
        self.stream_manager
            .alter_subscription_retention(
                subscription.database_id,
                subscription.id,
                subscription.dependent_table_id,
                subscription.retention_seconds,
            )
            .await?;
        tracing::debug!("finish alter subscription retention");
        Ok(version)
    }

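    /// Validates a CDC table before creation: locates its single stream scan fragment and
    /// checks connectivity to the upstream database by instantiating a split enumerator.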
    #[await_tree::instrument]
    pub(crate) async fn validate_cdc_table(
        &self,
        table: &Table,
        table_fragments: &StreamJobFragments,
    ) -> MetaResult<()> {
        let stream_scan_fragment = table_fragments
            .fragments
            .values()
            .filter(|f| {
                f.fragment_type_mask.contains(FragmentTypeFlag::StreamScan)
                    || f.fragment_type_mask
                        .contains(FragmentTypeFlag::StreamCdcScan)
            })
            .exactly_one()
            .ok()
            .with_context(|| {
                format!(
                    "expect exactly one stream scan fragment, got: {:?}",
                    table_fragments.fragments
                )
            })?;
        fn assert_parallelism(stream_scan_fragment: &Fragment, node_body: &Option<NodeBody>) {
            if let Some(NodeBody::StreamCdcScan(node)) = node_body {
                if let Some(o) = node.options
                    && CdcScanOptions::from_proto(&o).is_parallelized_backfill()
                {
                    // Parallelized backfill may run with multiple actors.
                } else {
                    assert_eq!(
                        stream_scan_fragment.actors.len(),
                        1,
                        "Stream scan fragment should have only one actor"
                    );
                }
            }
        }
        let mut found_cdc_scan = false;
        match &stream_scan_fragment.nodes.node_body {
            Some(NodeBody::StreamCdcScan(_)) => {
                assert_parallelism(stream_scan_fragment, &stream_scan_fragment.nodes.node_body);
                if self
                    .validate_cdc_table_inner(&stream_scan_fragment.nodes.node_body, table.id)
                    .await?
                {
                    found_cdc_scan = true;
                }
            }
            Some(NodeBody::Project(_)) => {
                // The CDC scan may be wrapped in a Project node; check each input.
                for input in &stream_scan_fragment.nodes.input {
                    assert_parallelism(stream_scan_fragment, &input.node_body);
                    if self
                        .validate_cdc_table_inner(&input.node_body, table.id)
                        .await?
                    {
                        found_cdc_scan = true;
                    }
                }
            }
            _ => {
                bail!("Unexpected node body for stream cdc scan");
            }
        };
        if !found_cdc_scan {
            bail!("No stream cdc scan node found in stream scan fragment");
        }
        Ok(())
    }

    async fn validate_cdc_table_inner(
        &self,
        node_body: &Option<NodeBody>,
        table_id: TableId,
    ) -> MetaResult<bool> {
        if let Some(NodeBody::StreamCdcScan(stream_cdc_scan)) = node_body
            && let Some(ref cdc_table_desc) = stream_cdc_scan.cdc_table_desc
        {
            let options_with_secret = WithOptionsSecResolved::new(
                cdc_table_desc.connect_properties.clone(),
                cdc_table_desc.secret_refs.clone(),
            );

            let mut props = ConnectorProperties::extract(options_with_secret, true)?;
            props.init_from_pb_cdc_table_desc(cdc_table_desc);

            // Creating a split enumerator is enough to verify connectivity.
            let _enumerator = props
                .create_split_enumerator(SourceEnumeratorContext::dummy().into())
                .await?;

            tracing::debug!(?table_id, "validate cdc table success");
            Ok(true)
        } else {
            Ok(false)
        }
    }

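    /// Ensures the target table has been migrated before a sink into it may be created.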
    pub async fn validate_table_for_sink(&self, table_id: TableId) -> MetaResult<()> {
        let migrated = self
            .metadata_manager
            .catalog_controller
            .has_table_been_migrated(table_id)
            .await?;
        if !migrated {
            Err(anyhow::anyhow!(
                "Creating a sink into unmigrated table {} is not allowed. Please migrate the table first.",
                table_id
            )
            .into())
        } else {
            Ok(())
        }
    }

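    /// Creates a streaming job: persists the job catalog, builds fragments and actors, and
    /// hands the job to the stream manager. Failures abort the job and clean up its resources.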
    #[await_tree::instrument(boxed, "create_streaming_job({streaming_job})")]
    pub async fn create_streaming_job(
        &self,
        mut streaming_job: StreamingJob,
        fragment_graph: StreamFragmentGraphProto,
        dependencies: HashSet<ObjectId>,
        resource_type: streaming_job_resource_type::ResourceType,
        if_not_exists: bool,
    ) -> MetaResult<NotificationVersion> {
        if let StreamingJob::Sink(sink) = &streaming_job
            && let Some(target_table) = sink.target_table
        {
            self.validate_table_for_sink(target_table).await?;
        }
        let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());
        let check_ret = self
            .metadata_manager
            .catalog_controller
            .create_job_catalog(
                &mut streaming_job,
                &ctx,
                &fragment_graph.parallelism,
                fragment_graph.max_parallelism as _,
                dependencies,
                resource_type.clone(),
                &fragment_graph.backfill_parallelism,
            )
            .await;
        if let Err(meta_err) = check_ret {
            if !if_not_exists {
                return Err(meta_err);
            }
            return if let MetaErrorInner::Duplicated(_, _, Some(job_id)) = meta_err.inner() {
                if streaming_job.create_type() == CreateType::Foreground {
                    let database_id = streaming_job.database_id();
                    self.metadata_manager
                        .wait_streaming_job_finished(database_id, *job_id)
                        .await
                } else {
                    Ok(IGNORED_NOTIFICATION_VERSION)
                }
            } else {
                Err(meta_err)
            };
        }
        let job_id = streaming_job.id();
        tracing::debug!(
            id = %job_id,
            definition = streaming_job.definition(),
            create_type = streaming_job.create_type().as_str_name(),
            job_type = ?streaming_job.job_type(),
            "starting streaming job",
        );
        let permit = self
            .creating_streaming_job_permits
            .semaphore
            .clone()
            .acquire_owned()
            .instrument_await("acquire_creating_streaming_job_permit")
            .await
            .unwrap();
        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;

        let name = streaming_job.name();
        let definition = streaming_job.definition();
        let source_id = match &streaming_job {
            StreamingJob::Table(Some(src), _, _) | StreamingJob::Source(src) => Some(src.id),
            _ => None,
        };

        // Build and start the job. On failure, abort the creating job and drop the source
        // if one was registered for it.
        match self
            .create_streaming_job_inner(ctx, streaming_job, fragment_graph, resource_type, permit)
            .await
        {
            Ok(version) => Ok(version),
            Err(err) => {
                tracing::error!(id = %job_id, error = %err.as_report(), "failed to create streaming job");
                let event = risingwave_pb::meta::event_log::EventCreateStreamJobFail {
                    id: job_id,
                    name,
                    definition,
                    error: err.as_report().to_string(),
                };
                self.env.event_log_manager_ref().add_event_logs(vec![
                    risingwave_pb::meta::event_log::Event::CreateStreamJobFail(event),
                ]);
                let (aborted, _) = self
                    .metadata_manager
                    .catalog_controller
                    .try_abort_creating_streaming_job(job_id, false)
                    .await?;
                if aborted {
                    tracing::warn!(id = %job_id, "aborted streaming job");
                    if let Some(source_id) = source_id {
                        self.source_manager
                            .apply_source_change(SourceChange::DropSource {
                                dropped_source_ids: vec![source_id],
                            })
                            .await;
                    }
                }
                Err(err)
            }
        }
    }

    #[await_tree::instrument(boxed)]
    async fn create_streaming_job_inner(
        &self,
        ctx: StreamContext,
        mut streaming_job: StreamingJob,
        fragment_graph: StreamFragmentGraphProto,
        resource_type: streaming_job_resource_type::ResourceType,
        permit: OwnedSemaphorePermit,
    ) -> MetaResult<NotificationVersion> {
        let mut fragment_graph =
            StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
        streaming_job.set_info_from_graph(&fragment_graph);

        // Create catalogs for the internal tables and refill their ids into the graph.
        let incomplete_internal_tables = fragment_graph
            .incomplete_internal_tables()
            .into_values()
            .collect_vec();
        let table_id_map = self
            .metadata_manager
            .catalog_controller
            .create_internal_table_catalog(&streaming_job, incomplete_internal_tables)
            .await?;
        fragment_graph.refill_internal_table_ids(table_id_map);

        tracing::debug!(id = %streaming_job.id(), "building streaming job");
        let (ctx, stream_job_fragments) = self
            .build_stream_job(ctx, streaming_job, fragment_graph, resource_type)
            .await?;

        let streaming_job = &ctx.streaming_job;

        match streaming_job {
            StreamingJob::Table(None, table, TableJobType::SharedCdcSource) => {
                self.validate_cdc_table(table, &stream_job_fragments)
                    .await?;
            }
            StreamingJob::Table(Some(source), ..) => {
                // Register the associated source and report telemetry.
                self.source_manager.register_source(source).await?;
                let connector_name = source
                    .get_with_properties()
                    .get(UPSTREAM_SOURCE_KEY)
                    .cloned();
                let attr = source.info.as_ref().map(|source_info| {
                    jsonbb::json!({
                        "format": source_info.format().as_str_name(),
                        "encode": source_info.row_encode().as_str_name(),
                    })
                });
                report_create_object(
                    streaming_job.id(),
                    "source",
                    PbTelemetryDatabaseObject::Source,
                    connector_name,
                    attr,
                );
            }
            StreamingJob::Sink(sink) => {
                if sink.auto_refresh_schema_from_table.is_some() {
                    check_sink_fragments_support_refresh_schema(&stream_job_fragments.fragments)?
                }
                // Validate the sink against the external system before it goes online.
                validate_sink(sink).await?;
                let connector_name = sink.get_properties().get(UPSTREAM_SOURCE_KEY).cloned();
                let attr = sink.format_desc.as_ref().map(|sink_info| {
                    jsonbb::json!({
                        "format": sink_info.format().as_str_name(),
                        "encode": sink_info.encode().as_str_name(),
                    })
                });
                report_create_object(
                    streaming_job.id(),
                    "sink",
                    PbTelemetryDatabaseObject::Sink,
                    connector_name,
                    attr,
                );
            }
            StreamingJob::Source(source) => {
                self.source_manager.register_source(source).await?;
                let connector_name = source
                    .get_with_properties()
                    .get(UPSTREAM_SOURCE_KEY)
                    .cloned();
                let attr = source.info.as_ref().map(|source_info| {
                    jsonbb::json!({
                        "format": source_info.format().as_str_name(),
                        "encode": source_info.row_encode().as_str_name(),
                    })
                });
                report_create_object(
                    streaming_job.id(),
                    "source",
                    PbTelemetryDatabaseObject::Source,
                    connector_name,
                    attr,
                );
            }
            _ => {}
        }

        let backfill_orders = ctx.fragment_backfill_ordering.to_meta_model();
        self.metadata_manager
            .catalog_controller
            .prepare_stream_job_fragments(
                &stream_job_fragments,
                streaming_job,
                false,
                Some(backfill_orders),
            )
            .await?;

        let version = self
            .stream_manager
            .create_streaming_job(stream_job_fragments, ctx, permit)
            .await?;

        Ok(version)
    }

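    /// Drops a catalog object together with everything released with it: streaming jobs,
    /// state tables, sources and their fragments, secrets, and Iceberg table sinks.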
    pub async fn drop_object(
        &self,
        object_type: ObjectType,
        object_id: impl Into<ObjectId>,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        let object_id = object_id.into();
        // Hold the reschedule lock and pause source ticks for the duration of the drop.
        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
        let _source_tick_pause_guard = self.source_manager.pause_tick().await;

        let (release_ctx, version) = self
            .metadata_manager
            .catalog_controller
            .drop_object(object_type, object_id, drop_mode)
            .await?;

        if object_type == ObjectType::Source {
            self.env
                .notification_manager_ref()
                .notify_local_subscribers(LocalNotification::SourceDropped(object_id));
        }

        let ReleaseContext {
            database_id,
            removed_streaming_job_ids,
            removed_state_table_ids,
            removed_source_ids,
            removed_secret_ids: secret_ids,
            removed_source_fragments,
            removed_actors,
            removed_fragments,
            removed_sink_fragment_by_targets,
            removed_iceberg_table_sinks,
        } = release_ctx;

        self.stream_manager
            .drop_streaming_jobs(
                database_id,
                removed_actors.iter().map(|id| *id as _).collect(),
                removed_streaming_job_ids,
                removed_state_table_ids,
                removed_fragments.iter().map(|id| *id as _).collect(),
                removed_sink_fragment_by_targets
                    .into_iter()
                    .map(|(target, sinks)| {
                        (target as _, sinks.into_iter().map(|id| id as _).collect())
                    })
                    .collect(),
            )
            .await;

        // Unregister the dropped sources from the source manager.
        self.source_manager
            .apply_source_change(SourceChange::DropSource {
                dropped_source_ids: removed_source_ids.into_iter().map(|id| id as _).collect(),
            })
            .await;

        // Clean up the source fragments of dropped materialized views.
        let dropped_source_fragments = removed_source_fragments;
        self.source_manager
            .apply_source_change(SourceChange::DropMv {
                dropped_source_fragments,
            })
            .await;

        let iceberg_sink_ids: Vec<SinkId> = removed_iceberg_table_sinks
            .iter()
            .map(|sink| sink.id)
            .collect();

        for sink in removed_iceberg_table_sinks {
            let sink_param = SinkParam::try_from_sink_catalog(sink.into())
                .expect("Iceberg sink should be valid");
            let iceberg_sink =
                IcebergSink::try_from(sink_param).expect("Iceberg sink should be valid");
            if let Ok(iceberg_catalog) = iceberg_sink.config.create_catalog().await {
                let table_identifier = iceberg_sink.config.full_table_name().unwrap();
                tracing::info!(
                    "dropping iceberg table {} for dropped sink",
                    table_identifier
                );

                let _ = iceberg_catalog
                    .drop_table(&table_identifier)
                    .await
                    .inspect_err(|err| {
                        tracing::error!(
                            "failed to drop iceberg table {} during cleanup: {}",
                            table_identifier,
                            err.as_report()
                        );
                    });
            }
        }

        if !iceberg_sink_ids.is_empty() {
            // Stop the sink coordinators and clear pending Iceberg commits for the dropped sinks.
            self.sink_manager
                .stop_sink_coordinator(iceberg_sink_ids.clone())
                .await;

            for sink_id in iceberg_sink_ids {
                self.iceberg_compaction_manager
                    .clear_iceberg_commits_by_sink_id(sink_id);
            }
        }

        for secret in secret_ids {
            LocalSecretManager::global().remove_secret(secret);
        }
        Ok(version)
    }

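    /// Replaces the plan of an existing streaming job with a new fragment graph, keeping the
    /// original max parallelism. Used by `ALTER`-style schema changes.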
    #[await_tree::instrument(boxed, "replace_streaming_job({streaming_job})")]
    pub async fn replace_job(
        &self,
        mut streaming_job: StreamingJob,
        fragment_graph: StreamFragmentGraphProto,
    ) -> MetaResult<NotificationVersion> {
        match &streaming_job {
            StreamingJob::Table(..)
            | StreamingJob::Source(..)
            | StreamingJob::MaterializedView(..) => {}
            StreamingJob::Sink(..) | StreamingJob::Index(..) => {
                bail_not_implemented!("schema change for {}", streaming_job.job_type_str())
            }
        }

        let job_id = streaming_job.id();

        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
        let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());

        // The max parallelism must stay the same as the original job's.
        let original_max_parallelism = self
            .metadata_manager
            .get_job_max_parallelism(streaming_job.id())
            .await?;
        let fragment_graph = PbStreamFragmentGraph {
            max_parallelism: original_max_parallelism as _,
            ..fragment_graph
        };

        let fragment_graph = StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
        streaming_job.set_info_from_graph(&fragment_graph);

        // Rebind as immutable: no further mutation from this point on.
        let streaming_job = streaming_job;

        let auto_refresh_schema_sinks = if let StreamingJob::Table(_, table, _) = &streaming_job {
            let auto_refresh_schema_sinks = self
                .metadata_manager
                .catalog_controller
                .get_sink_auto_refresh_schema_from(table.id)
                .await?;
            if !auto_refresh_schema_sinks.is_empty() {
                let original_table_columns = self
                    .metadata_manager
                    .catalog_controller
                    .get_table_columns(table.id)
                    .await?;
                let mut original_table_column_ids: HashSet<_> = original_table_columns
                    .iter()
                    .map(|col| col.column_id())
                    .collect();
                let newly_added_columns = table
                    .columns
                    .iter()
                    .filter(|col| {
                        !original_table_column_ids.remove(&ColumnId::new(
                            col.column_desc.as_ref().unwrap().column_id as _,
                        ))
                    })
                    .map(|col| ColumnCatalog::from(col.clone()))
                    .collect_vec();
                if !original_table_column_ids.is_empty() {
                    return Err(anyhow!(
                        "new table columns do not contain all original columns. new: {:?}, original: {:?}, not included: {:?}",
                        table.columns,
                        original_table_columns,
                        original_table_column_ids
                    )
                    .into());
                }
                let mut sinks = Vec::with_capacity(auto_refresh_schema_sinks.len());
                for sink in auto_refresh_schema_sinks {
                    let sink_job_fragments = self
                        .metadata_manager
                        .get_job_fragments_by_id(sink.id.as_job_id())
                        .await?;
                    if sink_job_fragments.fragments.len() != 1 {
                        return Err(anyhow!(
                            "auto schema refresh sink must have only one fragment, but got {}",
                            sink_job_fragments.fragments.len()
                        )
                        .into());
                    }
                    let original_sink_fragment =
                        sink_job_fragments.fragments.into_values().next().unwrap();
                    let (new_sink_fragment, new_schema, new_log_store_table) =
                        rewrite_refresh_schema_sink_fragment(
                            &original_sink_fragment,
                            &sink,
                            &newly_added_columns,
                            table,
                            fragment_graph.table_fragment_id(),
                            self.env.id_gen_manager(),
                            self.env.actor_id_generator(),
                        )?;

                    // The new sink actors are colocated with the original ones.
                    assert_eq!(
                        original_sink_fragment.actors.len(),
                        new_sink_fragment.actors.len()
                    );
                    let actor_status = (0..original_sink_fragment.actors.len())
                        .map(|i| {
                            let worker_node_id = sink_job_fragments.actor_status
                                [&original_sink_fragment.actors[i].actor_id]
                                .location
                                .as_ref()
                                .unwrap()
                                .worker_node_id;
                            (
                                new_sink_fragment.actors[i].actor_id,
                                PbActorStatus {
                                    location: Some(PbActorLocation { worker_node_id }),
                                },
                            )
                        })
                        .collect();

                    let streaming_job = StreamingJob::Sink(sink);

                    let tmp_sink_id = self
                        .metadata_manager
                        .catalog_controller
                        .create_job_catalog_for_replace(&streaming_job, None, None, None)
                        .await?
                        .as_sink_id();
                    let StreamingJob::Sink(sink) = streaming_job else {
                        unreachable!()
                    };

                    sinks.push(AutoRefreshSchemaSinkContext {
                        tmp_sink_id,
                        original_sink: sink,
                        original_fragment: original_sink_fragment,
                        new_schema,
                        newly_add_fields: newly_added_columns
                            .iter()
                            .map(|col| Field::from(&col.column_desc))
                            .collect(),
                        new_fragment: new_sink_fragment,
                        new_log_store_table,
                        actor_status,
                    });
                }
                Some(sinks)
            } else {
                None
            }
        } else {
            None
        };

        let tmp_id = self
            .metadata_manager
            .catalog_controller
            .create_job_catalog_for_replace(
                &streaming_job,
                Some(&ctx),
                fragment_graph.specified_parallelism().as_ref(),
                Some(fragment_graph.max_parallelism()),
            )
            .await?;

        let tmp_sink_ids = auto_refresh_schema_sinks.as_ref().map(|sinks| {
            sinks
                .iter()
                .map(|sink| sink.tmp_sink_id.as_object_id())
                .collect_vec()
        });

        tracing::debug!(id = %job_id, "building replace streaming job");
        let mut updated_sink_catalogs = vec![];

        let mut drop_table_connector_ctx = None;
        let result: MetaResult<_> = try {
            let (mut ctx, mut stream_job_fragments) = self
                .build_replace_job(
                    ctx,
                    &streaming_job,
                    fragment_graph,
                    tmp_id,
                    auto_refresh_schema_sinks,
                )
                .await?;
            drop_table_connector_ctx = ctx.drop_table_connector_ctx.clone();
            let auto_refresh_schema_sink_finish_ctx =
                ctx.auto_refresh_schema_sinks.as_ref().map(|sinks| {
                    sinks
                        .iter()
                        .map(|sink| FinishAutoRefreshSchemaSinkContext {
                            tmp_sink_id: sink.tmp_sink_id,
                            original_sink_id: sink.original_sink.id,
                            columns: sink.new_schema.clone(),
                            new_log_store_table: sink
                                .new_log_store_table
                                .as_ref()
                                .map(|table| (table.id, table.columns.clone())),
                        })
                        .collect()
                });

            if let StreamingJob::Table(_, table, ..) = &streaming_job {
                let union_fragment = stream_job_fragments.inner.union_fragment_for_table();
                let upstream_infos = self
                    .metadata_manager
                    .catalog_controller
                    .get_all_upstream_sink_infos(table, union_fragment.fragment_id as _)
                    .await?;
                refill_upstream_sink_union_in_table(&mut union_fragment.nodes, &upstream_infos);

                for upstream_info in &upstream_infos {
                    let upstream_fragment_id = upstream_info.sink_fragment_id;
                    ctx.upstream_fragment_downstreams
                        .entry(upstream_fragment_id)
                        .or_default()
                        .push(upstream_info.new_sink_downstream.clone());
                    if upstream_info.sink_original_target_columns.is_empty() {
                        updated_sink_catalogs.push(upstream_info.sink_id);
                    }
                }
            }

            let replace_upstream = ctx.replace_upstream.clone();

            if let Some(sinks) = &ctx.auto_refresh_schema_sinks {
                let empty_downstreams = FragmentDownstreamRelation::default();
                for sink in sinks {
                    self.metadata_manager
                        .catalog_controller
                        .prepare_streaming_job(
                            sink.tmp_sink_id.as_job_id(),
                            || [&sink.new_fragment].into_iter(),
                            &empty_downstreams,
                            true,
                            None,
                            None,
                        )
                        .await?;
                }
            }

            self.metadata_manager
                .catalog_controller
                .prepare_stream_job_fragments(&stream_job_fragments, &streaming_job, true, None)
                .await?;

            self.stream_manager
                .replace_stream_job(stream_job_fragments, ctx)
                .await?;
            (replace_upstream, auto_refresh_schema_sink_finish_ctx)
        };

        match result {
            Ok((replace_upstream, auto_refresh_schema_sink_finish_ctx)) => {
                let version = self
                    .metadata_manager
                    .catalog_controller
                    .finish_replace_streaming_job(
                        tmp_id,
                        streaming_job,
                        replace_upstream,
                        SinkIntoTableContext {
                            updated_sink_catalogs,
                        },
                        drop_table_connector_ctx.as_ref(),
                        auto_refresh_schema_sink_finish_ctx,
                    )
                    .await?;
                if let Some(drop_table_connector_ctx) = &drop_table_connector_ctx {
                    self.source_manager
                        .apply_source_change(SourceChange::DropSource {
                            dropped_source_ids: vec![drop_table_connector_ctx.to_remove_source_id],
                        })
                        .await;
                }
                Ok(version)
            }
            Err(err) => {
                tracing::error!(id = %job_id, error = ?err.as_report(), "failed to replace job");
                let _ = self
                    .metadata_manager
                    .catalog_controller
                    .try_abort_replacing_streaming_job(tmp_id, tmp_sink_ids)
                    .await
                    .inspect_err(|err| {
                        tracing::error!(id = %job_id, error = ?err.as_report(), "failed to abort replacing job");
                    });
                Err(err)
            }
        }
    }

    #[await_tree::instrument(
        boxed,
        "drop_streaming_job{}({job_id})",
        if let DropMode::Cascade = drop_mode { "_cascade" } else { "" }
    )]
    async fn drop_streaming_job(
        &self,
        job_id: StreamingJobId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        let (object_id, object_type) = match job_id {
            StreamingJobId::MaterializedView(id) => (id.as_object_id(), ObjectType::Table),
            StreamingJobId::Sink(id) => (id.as_object_id(), ObjectType::Sink),
            StreamingJobId::Table(_, id) => (id.as_object_id(), ObjectType::Table),
            StreamingJobId::Index(idx) => (idx.as_object_id(), ObjectType::Index),
        };

        let job_status = self
            .metadata_manager
            .catalog_controller
            .get_streaming_job_status(job_id.id())
            .await?;
        let version = match job_status {
            JobStatus::Initial => {
                unreachable!(
                    "Job with Initial status should not notify frontend and therefore should not arrive here"
                );
            }
            JobStatus::Creating => {
                self.stream_manager
                    .cancel_streaming_jobs(vec![job_id.id()])
                    .await?;
                IGNORED_NOTIFICATION_VERSION
            }
            JobStatus::Created => self.drop_object(object_type, object_id, drop_mode).await?,
        };

        Ok(version)
    }

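    /// Resolves the parallelism to schedule a job with. A user-specified value wins (but must
    /// not exceed `max`); otherwise the configured default applies, capped by `max`.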
    fn resolve_stream_parallelism(
        &self,
        specified: Option<NonZeroUsize>,
        max: NonZeroUsize,
        cluster_info: &StreamingClusterInfo,
        resource_group: String,
    ) -> MetaResult<NonZeroUsize> {
        let available = NonZeroUsize::new(cluster_info.parallelism(&resource_group));
        DdlController::resolve_stream_parallelism_inner(
            specified,
            max,
            available,
            &self.env.opts.default_parallelism,
            &resource_group,
        )
    }

    fn resolve_stream_parallelism_inner(
        specified: Option<NonZeroUsize>,
        max: NonZeroUsize,
        available: Option<NonZeroUsize>,
        default_parallelism: &DefaultParallelism,
        resource_group: &str,
    ) -> MetaResult<NonZeroUsize> {
        let Some(available) = available else {
            bail_unavailable!(
                "no available slots to schedule in resource group \"{}\", \
                 have you allocated any compute nodes within this resource group?",
                resource_group
            );
        };

        if let Some(specified) = specified {
            if specified > max {
                bail_invalid_parameter!(
                    "specified parallelism {} should not exceed max parallelism {}",
                    specified,
                    max,
                );
            }
            if specified > available {
                tracing::warn!(
                    resource_group,
                    specified_parallelism = specified.get(),
                    available_parallelism = available.get(),
                    "specified parallelism exceeds available slots, scheduling with specified value",
                );
            }
            return Ok(specified);
        }

        // No parallelism specified: fall back to the configured default.
        let default_parallelism = match default_parallelism {
            DefaultParallelism::Full => available,
            DefaultParallelism::Default(num) => {
                if *num > available {
                    tracing::warn!(
                        resource_group,
                        configured_parallelism = num.get(),
                        available_parallelism = available.get(),
                        "default parallelism exceeds available slots, scheduling with configured value",
                    );
                }
                *num
            }
        };

        if default_parallelism > max {
            tracing::warn!(
                max_parallelism = max.get(),
                resource_group,
                "default parallelism exceeds max parallelism, capping to max",
            );
        }
        Ok(default_parallelism.min(max))
    }

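    /// Builds the complete fragment graph and actor graph for a new streaming job, resolving
    /// upstream fragments, resource group, parallelism and actor locations.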
1772 #[await_tree::instrument]
1778 pub(crate) async fn build_stream_job(
1779 &self,
1780 stream_ctx: StreamContext,
1781 mut stream_job: StreamingJob,
1782 fragment_graph: StreamFragmentGraph,
1783 resource_type: streaming_job_resource_type::ResourceType,
1784 ) -> MetaResult<(CreateStreamingJobContext, StreamJobFragmentsToCreate)> {
1785 let id = stream_job.id();
1786 let specified_parallelism = fragment_graph.specified_parallelism();
1787 let specified_backfill_parallelism = fragment_graph.specified_backfill_parallelism();
1788 let max_parallelism = NonZeroUsize::new(fragment_graph.max_parallelism()).unwrap();
1789
1790 let fragment_backfill_ordering = fragment_graph.create_fragment_backfill_ordering();
1792
1793 let (snapshot_backfill_info, cross_db_snapshot_backfill_info) =
1797 fragment_graph.collect_snapshot_backfill_info()?;
1798 assert!(
1799 snapshot_backfill_info
1800 .iter()
1801 .chain([&cross_db_snapshot_backfill_info])
1802 .flat_map(|info| info.upstream_mv_table_id_to_backfill_epoch.values())
1803 .all(|backfill_epoch| backfill_epoch.is_none()),
1804 "should not set backfill epoch when initially build the job: {:?} {:?}",
1805 snapshot_backfill_info,
1806 cross_db_snapshot_backfill_info
1807 );
1808
1809 let locality_fragment_state_table_mapping =
1810 fragment_graph.find_locality_provider_fragment_state_table_mapping();
1811
1812 self.metadata_manager
1814 .catalog_controller
1815 .validate_cross_db_snapshot_backfill(&cross_db_snapshot_backfill_info)
1816 .await?;
1817
1818 let upstream_table_ids = fragment_graph
1819 .dependent_table_ids()
1820 .iter()
1821 .filter(|id| {
1822 !cross_db_snapshot_backfill_info
1823 .upstream_mv_table_id_to_backfill_epoch
1824 .contains_key(*id)
1825 })
1826 .cloned()
1827 .collect();
1828
1829 let (upstream_root_fragments, existing_actor_location) = self
1830 .metadata_manager
1831 .get_upstream_root_fragments(&upstream_table_ids)
1832 .await?;
1833
1834 if snapshot_backfill_info.is_some() {
1835 match stream_job {
1836 StreamingJob::MaterializedView(_)
1837 | StreamingJob::Sink(_)
1838 | StreamingJob::Index(_, _) => {}
1839 StreamingJob::Table(_, _, _) | StreamingJob::Source(_) => {
1840 return Err(
1841 anyhow!("snapshot_backfill not enabled for table and source").into(),
1842 );
1843 }
1844 }
1845 }

        let complete_graph = CompleteStreamFragmentGraph::with_upstreams(
            fragment_graph,
            FragmentGraphUpstreamContext {
                upstream_root_fragments,
                upstream_actor_location: existing_actor_location,
            },
            (&stream_job).into(),
        )?;
        let resource_group = if let Some(group) = resource_type.resource_group() {
            group
        } else {
            self.metadata_manager
                .get_database_resource_group(stream_job.database_id())
                .await?
        };
        let is_serverless_backfill = matches!(
            &resource_type,
            streaming_job_resource_type::ResourceType::ServerlessBackfillResourceGroup(_)
        );

        let cluster_info = self.metadata_manager.get_streaming_cluster_info().await?;

        let initial_parallelism = specified_backfill_parallelism.or(specified_parallelism);
        let parallelism = self.resolve_stream_parallelism(
            initial_parallelism,
            max_parallelism,
            &cluster_info,
            resource_group.clone(),
        )?;

        let parallelism = if initial_parallelism.is_some() {
            parallelism.get()
        } else {
            let adaptive_strategy = match stream_ctx.adaptive_parallelism_strategy {
                Some(strategy) => strategy,
                None => self
                    .env
                    .system_params_reader()
                    .await
                    .adaptive_parallelism_strategy(),
            };
            adaptive_strategy.compute_target_parallelism(parallelism.get())
        };

        let parallelism = NonZeroUsize::new(parallelism).expect("parallelism must be positive");
        let actor_graph_builder = ActorGraphBuilder::new(
            id,
            resource_group,
            complete_graph,
            cluster_info,
            parallelism,
        )?;

        let ActorGraphBuildResult {
            graph,
            downstream_fragment_relations,
            building_locations,
            upstream_fragment_downstreams,
            replace_upstream,
            ..
        } = actor_graph_builder.generate_graph(&self.env, &stream_job, stream_ctx.clone())?;
        assert!(replace_upstream.is_empty());

        let table_parallelism = match (specified_parallelism, &self.env.opts.default_parallelism) {
            (None, DefaultParallelism::Full) => TableParallelism::Adaptive,
            _ => TableParallelism::Fixed(parallelism.get()),
        };

        let stream_job_fragments = StreamJobFragments::new(
            id,
            graph,
            &building_locations.actor_locations,
            stream_ctx.clone(),
            table_parallelism,
            max_parallelism.get(),
        );

        if let Some(mview_fragment) = stream_job_fragments.mview_fragment() {
            stream_job.set_table_vnode_count(mview_fragment.vnode_count());
        }

        let new_upstream_sink = if let StreamingJob::Sink(sink) = &stream_job
            && let Ok(table_id) = sink.get_target_table()
        {
            let tables = self
                .metadata_manager
                .get_table_catalog_by_ids(&[*table_id])
                .await?;
            let target_table = tables
                .first()
                .ok_or_else(|| MetaError::catalog_id_not_found("table", *table_id))?;
            let sink_fragment = stream_job_fragments
                .sink_fragment()
                .ok_or_else(|| anyhow::anyhow!("sink fragment not found for sink {}", sink.id))?;
            let mview_fragment_id = self
                .metadata_manager
                .catalog_controller
                .get_mview_fragment_by_id(table_id.as_job_id())
                .await?;
            let upstream_sink_info = build_upstream_sink_info(
                sink.id,
                sink.original_target_columns.clone(),
                sink_fragment.fragment_id as _,
                target_table,
                mview_fragment_id,
            )?;
            Some(upstream_sink_info)
        } else {
            None
        };

        let mut cdc_table_snapshot_splits = None;
        if let StreamingJob::Table(None, table, TableJobType::SharedCdcSource) = &stream_job
            && let Some((_, stream_cdc_scan)) =
                parallel_cdc_table_backfill_fragment(stream_job_fragments.fragments.values())
        {
            let splits = try_init_parallel_cdc_table_snapshot_splits(
                table.id,
                stream_cdc_scan.cdc_table_desc.as_ref().unwrap(),
                self.env.meta_store_ref(),
                stream_cdc_scan.options.as_ref().unwrap(),
                self.env.opts.cdc_table_split_init_insert_batch_size,
                self.env.opts.cdc_table_split_init_sleep_interval_splits,
                self.env.opts.cdc_table_split_init_sleep_duration_millis,
            )
            .await?;
            cdc_table_snapshot_splits = Some(splits);
        }

        let ctx = CreateStreamingJobContext {
            upstream_fragment_downstreams,
            building_locations,
            definition: stream_job.definition(),
            create_type: stream_job.create_type(),
            job_type: (&stream_job).into(),
            streaming_job: stream_job,
            new_upstream_sink,
            option: CreateStreamingJobOption {},
            snapshot_backfill_info,
            cross_db_snapshot_backfill_info,
            fragment_backfill_ordering,
            locality_fragment_state_table_mapping,
            cdc_table_snapshot_splits,
            is_serverless_backfill,
        };

        Ok((
            ctx,
            StreamJobFragmentsToCreate {
                inner: stream_job_fragments,
                downstreams: downstream_fragment_relations,
            },
        ))
    }

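    /// Builds the replacement fragments for an existing streaming job (e.g. on a
    /// schema change), fitting the new graph's internal state tables onto the old
    /// ones and rebuilding the actor graph with the old job's parallelism.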
    pub(crate) async fn build_replace_job(
        &self,
        stream_ctx: StreamContext,
        stream_job: &StreamingJob,
        mut fragment_graph: StreamFragmentGraph,
        tmp_job_id: JobId,
        auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,
    ) -> MetaResult<(ReplaceStreamJobContext, StreamJobFragmentsToCreate)> {
        match &stream_job {
            StreamingJob::Table(..)
            | StreamingJob::Source(..)
            | StreamingJob::MaterializedView(..) => {}
            StreamingJob::Sink(..) | StreamingJob::Index(..) => {
                bail_not_implemented!("schema change for {}", stream_job.job_type_str())
            }
        }

        let id = stream_job.id();

        let mut drop_table_associated_source_id = None;
        if let StreamingJob::Table(None, _, _) = &stream_job {
            drop_table_associated_source_id = self
                .metadata_manager
                .get_table_associated_source_id(id.as_mv_table_id())
                .await?;
        }

        let old_fragments = self.metadata_manager.get_job_fragments_by_id(id).await?;
        let old_internal_table_ids = old_fragments.internal_table_ids();

        let mut drop_table_connector_ctx = None;
        if let Some(to_remove_source_id) = drop_table_associated_source_id {
            debug_assert!(old_internal_table_ids.len() == 1);

            drop_table_connector_ctx = Some(DropTableConnectorContext {
                to_change_streaming_job_id: id,
                to_remove_state_table_id: old_internal_table_ids[0],
                to_remove_source_id,
            });
        } else if stream_job.is_materialized_view() {
            let old_fragments_upstreams = self
                .metadata_manager
                .catalog_controller
                .upstream_fragments(old_fragments.fragment_ids())
                .await?;

            let old_state_graph =
                state_match::Graph::from_existing(&old_fragments, &old_fragments_upstreams);
            let new_state_graph = state_match::Graph::from_building(&fragment_graph);
            let result = state_match::match_graph(&new_state_graph, &old_state_graph)
                .context("incompatible altering on the streaming job states")?;

            fragment_graph.fit_internal_table_ids_with_mapping(result.table_matches);
            fragment_graph.fit_snapshot_backfill_epochs(result.snapshot_backfill_epochs);
        } else {
            let old_internal_tables = self
                .metadata_manager
                .get_table_catalog_by_ids(&old_internal_table_ids)
                .await?;
            fragment_graph.fit_internal_tables_trivial(old_internal_tables)?;
        }

        let original_root_fragment = old_fragments
            .root_fragment()
            .expect("root fragment not found");

        let job_type = StreamingJobType::from(stream_job);

        let (mut downstream_fragments, mut downstream_actor_location) =
            self.metadata_manager.get_downstream_fragments(id).await?;

        if let Some(auto_refresh_schema_sinks) = &auto_refresh_schema_sinks {
            let mut remaining_fragment: HashSet<_> = auto_refresh_schema_sinks
                .iter()
                .map(|sink| sink.original_fragment.fragment_id)
                .collect();
            for (_, downstream_fragment, nodes) in &mut downstream_fragments {
                if let Some(sink) = auto_refresh_schema_sinks.iter().find(|sink| {
                    sink.original_fragment.fragment_id == downstream_fragment.fragment_id
                }) {
                    assert!(remaining_fragment.remove(&downstream_fragment.fragment_id));
                    for actor_id in downstream_fragment.actors.keys() {
                        downstream_actor_location.remove(actor_id);
                    }
                    for (actor_id, status) in &sink.actor_status {
                        downstream_actor_location
                            .insert(*actor_id, status.location.as_ref().unwrap().worker_node_id);
                    }

                    *downstream_fragment = (&sink.new_fragment_info(), stream_job.id()).into();
                    *nodes = sink.new_fragment.nodes.clone();
                }
            }
            assert!(remaining_fragment.is_empty());
        }

        let complete_graph = match &job_type {
            StreamingJobType::Table(TableJobType::General) | StreamingJobType::Source => {
                CompleteStreamFragmentGraph::with_downstreams(
                    fragment_graph,
                    FragmentGraphDownstreamContext {
                        original_root_fragment_id: original_root_fragment.fragment_id,
                        downstream_fragments,
                        downstream_actor_location,
                    },
                    job_type,
                )?
            }
            StreamingJobType::Table(TableJobType::SharedCdcSource)
            | StreamingJobType::MaterializedView => {
                let (upstream_root_fragments, upstream_actor_location) = self
                    .metadata_manager
                    .get_upstream_root_fragments(fragment_graph.dependent_table_ids())
                    .await?;

                CompleteStreamFragmentGraph::with_upstreams_and_downstreams(
                    fragment_graph,
                    FragmentGraphUpstreamContext {
                        upstream_root_fragments,
                        upstream_actor_location,
                    },
                    FragmentGraphDownstreamContext {
                        original_root_fragment_id: original_root_fragment.fragment_id,
                        downstream_fragments,
                        downstream_actor_location,
                    },
                    job_type,
                )?
            }
            _ => unreachable!(),
        };

        let resource_group = self
            .metadata_manager
            .get_existing_job_resource_group(id)
            .await?;

        let cluster_info = self.metadata_manager.get_streaming_cluster_info().await?;

        let parallelism = NonZeroUsize::new(original_root_fragment.actors.len())
            .expect("the number of actors in the original root fragment should be positive");

        let actor_graph_builder = ActorGraphBuilder::new(
            id,
            resource_group,
            complete_graph,
            cluster_info,
            parallelism,
        )?;

        let ActorGraphBuildResult {
            graph,
            downstream_fragment_relations,
            building_locations,
            upstream_fragment_downstreams,
            mut replace_upstream,
            ..
        } = actor_graph_builder.generate_graph(&self.env, stream_job, stream_ctx.clone())?;

        if matches!(
            job_type,
            StreamingJobType::Source | StreamingJobType::Table(TableJobType::General)
        ) {
            assert!(upstream_fragment_downstreams.is_empty());
        }

        let stream_job_fragments = StreamJobFragments::new(
            tmp_job_id,
            graph,
            &building_locations.actor_locations,
            stream_ctx,
            old_fragments.assigned_parallelism,
            old_fragments.max_parallelism,
        );

        if let Some(sinks) = &auto_refresh_schema_sinks {
            for sink in sinks {
                replace_upstream
                    .remove(&sink.new_fragment.fragment_id)
                    .expect("should exist");
            }
        }

        let ctx = ReplaceStreamJobContext {
            old_fragments,
            replace_upstream,
            upstream_fragment_downstreams,
            building_locations,
            streaming_job: stream_job.clone(),
            tmp_id: tmp_job_id,
            drop_table_connector_ctx,
            auto_refresh_schema_sinks,
        };

        Ok((
            ctx,
            StreamJobFragmentsToCreate {
                inner: stream_job_fragments,
                downstreams: downstream_fragment_relations,
            },
        ))
    }

    async fn alter_name(
        &self,
        relation: alter_name_request::Object,
        new_name: &str,
    ) -> MetaResult<NotificationVersion> {
        let (obj_type, id): (ObjectType, ObjectId) = match relation {
            alter_name_request::Object::TableId(id) => (ObjectType::Table, id.into()),
            alter_name_request::Object::ViewId(id) => (ObjectType::View, id.into()),
            alter_name_request::Object::IndexId(id) => (ObjectType::Index, id.into()),
            alter_name_request::Object::SinkId(id) => (ObjectType::Sink, id.into()),
            alter_name_request::Object::SourceId(id) => (ObjectType::Source, id.into()),
            alter_name_request::Object::SchemaId(id) => (ObjectType::Schema, id.into()),
            alter_name_request::Object::DatabaseId(id) => (ObjectType::Database, id.into()),
            alter_name_request::Object::SubscriptionId(id) => (ObjectType::Subscription, id.into()),
        };
        self.metadata_manager
            .catalog_controller
            .alter_name(obj_type, id, new_name)
            .await
    }

    async fn alter_swap_rename(
        &self,
        object: alter_swap_rename_request::Object,
    ) -> MetaResult<NotificationVersion> {
        let (obj_type, src_id, dst_id) = match object {
            alter_swap_rename_request::Object::Schema(_) => unimplemented!("schema swap"),
            alter_swap_rename_request::Object::Table(objs) => {
                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
                (ObjectType::Table, src_id, dst_id)
            }
            alter_swap_rename_request::Object::View(objs) => {
                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
                (ObjectType::View, src_id, dst_id)
            }
            alter_swap_rename_request::Object::Source(objs) => {
                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
                (ObjectType::Source, src_id, dst_id)
            }
            alter_swap_rename_request::Object::Sink(objs) => {
                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
                (ObjectType::Sink, src_id, dst_id)
            }
            alter_swap_rename_request::Object::Subscription(objs) => {
                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
                (ObjectType::Subscription, src_id, dst_id)
            }
        };

        self.metadata_manager
            .catalog_controller
            .alter_swap_rename(obj_type, src_id, dst_id)
            .await
    }

    async fn alter_owner(
        &self,
        object: Object,
        owner_id: UserId,
    ) -> MetaResult<NotificationVersion> {
        let (obj_type, id): (ObjectType, ObjectId) = match object {
            Object::TableId(id) => (ObjectType::Table, id.into()),
            Object::ViewId(id) => (ObjectType::View, id.into()),
            Object::SourceId(id) => (ObjectType::Source, id.into()),
            Object::SinkId(id) => (ObjectType::Sink, id.into()),
            Object::SchemaId(id) => (ObjectType::Schema, id.into()),
            Object::DatabaseId(id) => (ObjectType::Database, id.into()),
            Object::SubscriptionId(id) => (ObjectType::Subscription, id.into()),
            Object::ConnectionId(id) => (ObjectType::Connection, id.into()),
            Object::FunctionId(id) => (ObjectType::Function, id.into()),
            Object::SecretId(id) => (ObjectType::Secret, id.into()),
        };
        self.metadata_manager
            .catalog_controller
            .alter_owner(obj_type, id, owner_id as _)
            .await
    }

    async fn alter_set_schema(
        &self,
        object: alter_set_schema_request::Object,
        new_schema_id: SchemaId,
    ) -> MetaResult<NotificationVersion> {
        let (obj_type, id): (ObjectType, ObjectId) = match object {
            alter_set_schema_request::Object::TableId(id) => (ObjectType::Table, id.into()),
            alter_set_schema_request::Object::ViewId(id) => (ObjectType::View, id.into()),
            alter_set_schema_request::Object::SourceId(id) => (ObjectType::Source, id.into()),
            alter_set_schema_request::Object::SinkId(id) => (ObjectType::Sink, id.into()),
            alter_set_schema_request::Object::FunctionId(id) => (ObjectType::Function, id.into()),
            alter_set_schema_request::Object::ConnectionId(id) => {
                (ObjectType::Connection, id.into())
            }
            alter_set_schema_request::Object::SubscriptionId(id) => {
                (ObjectType::Subscription, id.into())
            }
        };
        self.metadata_manager
            .catalog_controller
            .alter_schema(obj_type, id, new_schema_id)
            .await
    }

    /// Waits until no background creating jobs remain, polling roughly once per
    /// millisecond for up to 30 minutes, and returns the latest catalog and
    /// hummock versions.
    pub async fn wait(&self) -> MetaResult<WaitVersion> {
        let timeout_ms = 30 * 60 * 1000;
        for _ in 0..timeout_ms {
            if self
                .metadata_manager
                .catalog_controller
                .list_background_creating_jobs(true, None)
                .await?
                .is_empty()
            {
                let catalog_version = self
                    .metadata_manager
                    .catalog_controller
                    .notify_frontend_trivial()
                    .await;
                let hummock_version_id = self.barrier_manager.get_hummock_version_id().await;
                return Ok(WaitVersion {
                    catalog_version,
                    hummock_version_id,
                });
            }

            sleep(Duration::from_millis(1)).await;
        }
        Err(MetaError::cancelled(format!("timeout after {timeout_ms}ms")))
    }

    async fn comment_on(&self, comment: Comment) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .comment_on(comment)
            .await
    }

    async fn alter_streaming_job_config(
        &self,
        job_id: JobId,
        entries_to_add: HashMap<String, String>,
        keys_to_remove: Vec<String>,
    ) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .alter_streaming_job_config(job_id, entries_to_add, keys_to_remove)
            .await
    }
}

fn report_create_object(
    job_id: JobId,
    event_name: &str,
    obj_type: PbTelemetryDatabaseObject,
    connector_name: Option<String>,
    attr_info: Option<jsonbb::Value>,
) {
    report_event(
        PbTelemetryEventStage::CreateStreamJob,
        event_name,
        job_id.as_raw_id() as _,
        connector_name,
        Some(obj_type),
        attr_info,
    );
}

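/// Builds the [`UpstreamSinkInfo`] for a sink that writes into a table: derives
/// the sink's output schema, maps the target table's distribution key onto sink
/// column indices by column id, and prepares the projection aligning the sink's
/// columns with the table's current columns.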
pub fn build_upstream_sink_info(
    sink_id: SinkId,
    original_target_columns: Vec<PbColumnCatalog>,
    sink_fragment_id: FragmentId,
    target_table: &PbTable,
    target_fragment_id: FragmentId,
) -> MetaResult<UpstreamSinkInfo> {
    let sink_columns = if !original_target_columns.is_empty() {
        original_target_columns.clone()
    } else {
        target_table.columns.clone()
    };

    let sink_output_fields = sink_columns
        .iter()
        .map(|col| Field::from(col.column_desc.as_ref().unwrap()).to_prost())
        .collect_vec();
    let output_indices = (0..sink_output_fields.len())
        .map(|i| i as u32)
        .collect_vec();

    let dist_key_indices: anyhow::Result<Vec<u32>> = try {
        let sink_idx_by_col_id = sink_columns
            .iter()
            .enumerate()
            .map(|(idx, col)| {
                let column_id = col.column_desc.as_ref().unwrap().column_id;
                (column_id, idx as u32)
            })
            .collect::<HashMap<_, _>>();
        target_table
            .distribution_key
            .iter()
            .map(|dist_idx| {
                let column_id = target_table.columns[*dist_idx as usize]
                    .column_desc
                    .as_ref()
                    .unwrap()
                    .column_id;
                let sink_idx = sink_idx_by_col_id
                    .get(&column_id)
                    .ok_or_else(|| anyhow::anyhow!("column id {} not found in sink", column_id))?;
                Ok(*sink_idx)
            })
            .collect::<anyhow::Result<Vec<_>>>()?
    };
    let dist_key_indices =
        dist_key_indices.map_err(|e| e.context("failed to get distribution key indices"))?;
    let downstream_fragment_id = target_fragment_id as _;
    let new_downstream_relation = DownstreamFragmentRelation {
        downstream_fragment_id,
        dispatcher_type: DispatcherType::Hash,
        dist_key_indices,
        output_mapping: PbDispatchOutputMapping::simple(output_indices),
    };
    let current_target_columns = target_table.get_columns();
    let project_exprs = build_select_node_list(&sink_columns, current_target_columns)?;
    Ok(UpstreamSinkInfo {
        sink_id,
        sink_fragment_id: sink_fragment_id as _,
        sink_output_fields,
        sink_original_target_columns: original_target_columns,
        project_exprs,
        new_sink_downstream: new_downstream_relation,
    })
}

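/// Refills the `UpstreamSinkUnion` node inside a table's union fragment with the
/// given upstream sinks' fragment ids, output schemas, and projection expressions.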
pub fn refill_upstream_sink_union_in_table(
    union_fragment_root: &mut PbStreamNode,
    upstream_sink_infos: &[UpstreamSinkInfo],
) {
    visit_stream_node_cont_mut(union_fragment_root, |node| {
        if let Some(NodeBody::UpstreamSinkUnion(upstream_sink_union)) = &mut node.node_body {
            let init_upstreams = upstream_sink_infos
                .iter()
                .map(|info| PbUpstreamSinkInfo {
                    upstream_fragment_id: info.sink_fragment_id,
                    sink_output_schema: info.sink_output_fields.clone(),
                    project_exprs: info.project_exprs.clone(),
                })
                .collect();
            upstream_sink_union.init_upstreams = init_upstreams;
            // Stop traversal once the union node has been refilled.
            false
        } else {
            true
        }
    });
}

#[cfg(test)]
mod tests {
    use std::num::NonZeroUsize;

    use super::*;

    #[test]
    fn test_specified_parallelism_exceeds_available() {
        let result = DdlController::resolve_stream_parallelism_inner(
            Some(NonZeroUsize::new(100).unwrap()),
            NonZeroUsize::new(256).unwrap(),
            Some(NonZeroUsize::new(4).unwrap()),
            &DefaultParallelism::Full,
            "default",
        )
        .unwrap();
        assert_eq!(result.get(), 100);
    }

    #[test]
    fn test_allows_default_parallelism_over_available() {
        let result = DdlController::resolve_stream_parallelism_inner(
            None,
            NonZeroUsize::new(256).unwrap(),
            Some(NonZeroUsize::new(4).unwrap()),
            &DefaultParallelism::Default(NonZeroUsize::new(50).unwrap()),
            "default",
        )
        .unwrap();
        assert_eq!(result.get(), 50);
    }

    #[test]
    fn test_full_parallelism_capped_by_max() {
        let result = DdlController::resolve_stream_parallelism_inner(
            None,
            NonZeroUsize::new(6).unwrap(),
            Some(NonZeroUsize::new(10).unwrap()),
            &DefaultParallelism::Full,
            "default",
        )
        .unwrap();
        assert_eq!(result.get(), 6);
    }

    #[test]
    fn test_no_available_slots_returns_error() {
        let result = DdlController::resolve_stream_parallelism_inner(
            None,
            NonZeroUsize::new(4).unwrap(),
            None,
            &DefaultParallelism::Full,
            "default",
        );
        assert!(matches!(
            result,
            Err(ref e) if matches!(e.inner(), MetaErrorInner::Unavailable(_))
        ));
    }

    #[test]
    fn test_specified_over_max_returns_error() {
        let result = DdlController::resolve_stream_parallelism_inner(
            Some(NonZeroUsize::new(8).unwrap()),
            NonZeroUsize::new(4).unwrap(),
            Some(NonZeroUsize::new(10).unwrap()),
            &DefaultParallelism::Full,
            "default",
        );
        assert!(matches!(
            result,
            Err(ref e) if matches!(e.inner(), MetaErrorInner::InvalidParameter(_))
        ));
    }
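
    // A sketch of one more case implied by the capping branch in
    // `resolve_stream_parallelism_inner`: a configured default parallelism that
    // exceeds the max parallelism is warned about and capped, not rejected.
    #[test]
    fn test_default_parallelism_capped_by_max() {
        let result = DdlController::resolve_stream_parallelism_inner(
            None,
            NonZeroUsize::new(4).unwrap(),
            Some(NonZeroUsize::new(10).unwrap()),
            &DefaultParallelism::Default(NonZeroUsize::new(8).unwrap()),
            "default",
        )
        .unwrap();
        assert_eq!(result.get(), 4);
    }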
}