1use std::cmp::Ordering;
16use std::collections::{HashMap, HashSet};
17use std::num::NonZeroUsize;
18use std::sync::Arc;
19use std::sync::atomic::AtomicU64;
20use std::time::Duration;
21
22use anyhow::{Context, anyhow};
23use await_tree::InstrumentAwait;
24use either::Either;
25use itertools::Itertools;
26use risingwave_common::catalog::{
27 AlterDatabaseParam, ColumnCatalog, ColumnId, Field, FragmentTypeFlag,
28};
29use risingwave_common::hash::VnodeCountCompat;
30use risingwave_common::id::{JobId, TableId};
31use risingwave_common::secret::{LocalSecretManager, SecretEncryption};
32use risingwave_common::system_param::adaptive_parallelism_strategy::parse_strategy;
33use risingwave_common::system_param::reader::SystemParamsRead;
34use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont_mut;
35use risingwave_common::{bail, bail_not_implemented};
36use risingwave_connector::WithOptionsSecResolved;
37use risingwave_connector::connector_common::validate_connection;
38use risingwave_connector::sink::SinkParam;
39use risingwave_connector::sink::iceberg::IcebergSink;
40use risingwave_connector::source::cdc::CdcScanOptions;
41use risingwave_connector::source::{
42 ConnectorProperties, SourceEnumeratorContext, UPSTREAM_SOURCE_KEY,
43};
44use risingwave_meta_model::object::ObjectType;
45use risingwave_meta_model::{
46 ConnectionId, DatabaseId, DispatcherType, FragmentId, FunctionId, IndexId, JobStatus, ObjectId,
47 SchemaId, SecretId, SinkId, SourceId, StreamingParallelism, SubscriptionId, UserId, ViewId,
48 streaming_job,
49};
50use risingwave_pb::catalog::{
51 Comment, Connection, CreateType, Database, Function, PbTable, Schema, Secret, Source,
52 Subscription, Table, View,
53};
54use risingwave_pb::ddl_service::alter_owner_request::Object;
55use risingwave_pb::ddl_service::{
56 DdlProgress, TableJobType, WaitVersion, alter_name_request, alter_set_schema_request,
57 alter_swap_rename_request, streaming_job_resource_type,
58};
59use risingwave_pb::meta::table_fragments::fragment::FragmentDistributionType as PbFragmentDistributionType;
60use risingwave_pb::plan_common::PbColumnCatalog;
61use risingwave_pb::stream_plan::stream_node::NodeBody;
62use risingwave_pb::stream_plan::{
63 PbDispatchOutputMapping, PbStreamFragmentGraph, PbStreamNode, PbUpstreamSinkInfo,
64 StreamFragmentGraph as StreamFragmentGraphProto,
65};
66use risingwave_pb::telemetry::{PbTelemetryDatabaseObject, PbTelemetryEventStage};
67use strum::Display;
68use thiserror_ext::AsReport;
69use tokio::sync::Semaphore;
70use tokio::time::sleep;
71use tracing::Instrument;
72
73use crate::barrier::{BarrierManagerRef, Command};
74use crate::controller::catalog::{DropTableConnectorContext, ReleaseContext};
75use crate::controller::streaming_job::{FinishAutoRefreshSchemaSinkContext, SinkIntoTableContext};
76use crate::controller::utils::build_select_node_list;
77use crate::error::{MetaErrorInner, bail_invalid_parameter};
78use crate::manager::iceberg_compaction::IcebergCompactionManagerRef;
79use crate::manager::sink_coordination::SinkCoordinatorManager;
80use crate::manager::{
81 IGNORED_NOTIFICATION_VERSION, LocalNotification, MetaSrvEnv, MetadataManager,
82 NotificationVersion, StreamingJob, StreamingJobType,
83};
84use crate::model::{
85 DownstreamFragmentRelation, FragmentDownstreamRelation, FragmentId as CatalogFragmentId,
86 StreamContext, StreamJobFragments, StreamJobFragmentsToCreate,
87};
88use crate::stream::cdc::{
89 parallel_cdc_table_backfill_fragment, try_init_parallel_cdc_table_snapshot_splits,
90};
91use crate::stream::{
92 ActorGraphBuildResult, ActorGraphBuilder, AutoRefreshSchemaSinkContext,
93 CompleteStreamFragmentGraph, CreateStreamingJobContext, CreateStreamingJobOption,
94 FragmentGraphDownstreamContext, FragmentGraphUpstreamContext, GlobalStreamManagerRef,
95 ParallelismPolicy, ReplaceStreamJobContext, ReschedulePolicy, SourceChange, SourceManagerRef,
96 StreamFragmentGraph, UpstreamSinkInfo, check_sink_fragments_support_refresh_schema,
97 create_source_worker, rewrite_refresh_schema_sink_fragment, state_match, validate_sink,
98};
99use crate::telemetry::report_event;
100use crate::{MetaError, MetaResult};
101
/// Drop behaviour carried by the `Drop*` variants of [`DdlCommand`].
///
/// Built from the boolean `cascade` flag of a request via
/// [`DropMode::from_request_setting`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DropMode {
    /// Chosen when the request's `cascade` flag is `false`.
    Restrict,
    /// Chosen when the request's `cascade` flag is `true`.
    Cascade,
}
107
108impl DropMode {
109 pub fn from_request_setting(cascade: bool) -> DropMode {
110 if cascade {
111 DropMode::Cascade
112 } else {
113 DropMode::Restrict
114 }
115 }
116}
117
/// Identifies the streaming job targeted by [`DdlCommand::DropStreamingJob`].
///
/// The `strum::AsRefStr` derive exposes the variant name as a `&str`; the
/// `Display` impl for this type prints that name followed by the job id.
#[derive(strum::AsRefStr)]
pub enum StreamingJobId {
    MaterializedView(TableId),
    Sink(SinkId),
    // The first field is an optional source id carried alongside the table id.
    Table(Option<SourceId>, TableId),
    Index(IndexId),
}
125
126impl std::fmt::Display for StreamingJobId {
127 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
128 write!(f, "{}", self.as_ref())?;
129 write!(f, "({})", self.id())
130 }
131}
132
133impl StreamingJobId {
134 fn id(&self) -> JobId {
135 match self {
136 StreamingJobId::MaterializedView(id) | StreamingJobId::Table(_, id) => id.as_job_id(),
137 StreamingJobId::Index(id) => id.as_job_id(),
138 StreamingJobId::Sink(id) => id.as_job_id(),
139 }
140 }
141}
142
/// Payload of [`DdlCommand::ReplaceStreamJob`]: the streaming job together
/// with the fragment graph that `run_command` hands to `replace_job`.
pub struct ReplaceStreamJobInfo {
    pub streaming_job: StreamingJob,
    pub fragment_graph: StreamFragmentGraphProto,
}
149
/// A DDL operation accepted by `DdlController::run_command`.
///
/// `Display` is derived via `strum`; `run_command` uses it when building the
/// await-tree span for the command.
#[derive(Display)]
pub enum DdlCommand {
    CreateDatabase(Database),
    DropDatabase(DatabaseId),
    CreateSchema(Schema),
    DropSchema(SchemaId, DropMode),
    CreateNonSharedSource(Source),
    DropSource(SourceId, DropMode),
    ResetSource(SourceId),
    CreateFunction(Function),
    DropFunction(FunctionId, DropMode),
    // The set holds the view's dependencies (see the `create_view` call in `run_command`).
    CreateView(View, HashSet<ObjectId>),
    DropView(ViewId, DropMode),
    CreateStreamingJob {
        stream_job: StreamingJob,
        fragment_graph: StreamFragmentGraphProto,
        dependencies: HashSet<ObjectId>,
        resource_type: streaming_job_resource_type::ResourceType,
        if_not_exists: bool,
        refresh_interval_sec: Option<u64>,
    },
    DropStreamingJob {
        job_id: StreamingJobId,
        drop_mode: DropMode,
    },
    // Target object and its new name.
    AlterName(alter_name_request::Object, String),
    AlterSwapRename(alter_swap_rename_request::Object),
    ReplaceStreamJob(ReplaceStreamJobInfo),
    AlterNonSharedSource(Source),
    // Target object and the id of its new owner.
    AlterObjectOwner(Object, UserId),
    // Target object and the id of its new schema.
    AlterSetSchema(alter_set_schema_request::Object, SchemaId),
    CreateConnection(Connection),
    DropConnection(ConnectionId, DropMode),
    CreateSecret(Secret),
    AlterSecret(Secret),
    DropSecret(SecretId, DropMode),
    CommentOn(Comment),
    CreateSubscription(Subscription),
    DropSubscription(SubscriptionId, DropMode),
    AlterSubscriptionRetention {
        subscription_id: SubscriptionId,
        retention_seconds: u64,
        definition: String,
    },
    AlterDatabaseParam(DatabaseId, AlterDatabaseParam),
    // Job id, config entries to add, and config keys to remove
    // (named `entries_to_add` / `keys_to_remove` in `run_command`).
    AlterStreamingJobConfig(JobId, HashMap<String, String>, Vec<String>),
}
197
impl DdlCommand {
    /// Returns a handle on the object this command refers to, used by
    /// `run_command` when formatting the await-tree span.
    ///
    /// `Left` carries a name (or the `Debug` rendering of the request object for
    /// the `Alter*` variants that wrap one); `Right` carries an [`ObjectId`].
    fn object(&self) -> Either<String, ObjectId> {
        use Either::*;
        match self {
            DdlCommand::CreateDatabase(database) => Left(database.name.clone()),
            DdlCommand::DropDatabase(id) => Right(id.as_object_id()),
            DdlCommand::CreateSchema(schema) => Left(schema.name.clone()),
            DdlCommand::DropSchema(id, _) => Right(id.as_object_id()),
            DdlCommand::CreateNonSharedSource(source) => Left(source.name.clone()),
            DdlCommand::DropSource(id, _) => Right(id.as_object_id()),
            DdlCommand::ResetSource(id) => Right(id.as_object_id()),
            DdlCommand::CreateFunction(function) => Left(function.name.clone()),
            DdlCommand::DropFunction(id, _) => Right(id.as_object_id()),
            DdlCommand::CreateView(view, _) => Left(view.name.clone()),
            DdlCommand::DropView(id, _) => Right(id.as_object_id()),
            DdlCommand::CreateStreamingJob { stream_job, .. } => Left(stream_job.name()),
            DdlCommand::DropStreamingJob { job_id, .. } => Right(job_id.id().as_object_id()),
            DdlCommand::AlterName(object, _) => Left(format!("{object:?}")),
            DdlCommand::AlterSwapRename(object) => Left(format!("{object:?}")),
            DdlCommand::ReplaceStreamJob(info) => Left(info.streaming_job.name()),
            DdlCommand::AlterNonSharedSource(source) => Left(source.name.clone()),
            DdlCommand::AlterObjectOwner(object, _) => Left(format!("{object:?}")),
            DdlCommand::AlterSetSchema(object, _) => Left(format!("{object:?}")),
            DdlCommand::CreateConnection(connection) => Left(connection.name.clone()),
            DdlCommand::DropConnection(id, _) => Right(id.as_object_id()),
            DdlCommand::CreateSecret(secret) => Left(secret.name.clone()),
            DdlCommand::AlterSecret(secret) => Left(secret.name.clone()),
            DdlCommand::DropSecret(id, _) => Right(id.as_object_id()),
            DdlCommand::CommentOn(comment) => Right(comment.table_id.into()),
            DdlCommand::CreateSubscription(subscription) => Left(subscription.name.clone()),
            DdlCommand::DropSubscription(id, _) => Right(id.as_object_id()),
            DdlCommand::AlterSubscriptionRetention {
                subscription_id, ..
            } => Right(subscription_id.as_object_id()),
            DdlCommand::AlterDatabaseParam(id, _) => Right(id.as_object_id()),
            DdlCommand::AlterStreamingJobConfig(job_id, _, _) => Right(job_id.as_object_id()),
        }
    }

    /// Whether `run_command` may execute this command without first checking
    /// that the barrier manager reports a running status.
    ///
    /// Every variant is listed explicitly (no wildcard arm), so adding a new
    /// variant forces a decision here at compile time.
    fn allow_in_recovery(&self) -> bool {
        match self {
            DdlCommand::DropDatabase(_)
            | DdlCommand::DropSchema(_, _)
            | DdlCommand::DropSource(_, _)
            | DdlCommand::DropFunction(_, _)
            | DdlCommand::DropView(_, _)
            | DdlCommand::DropStreamingJob { .. }
            | DdlCommand::DropConnection(_, _)
            | DdlCommand::DropSecret(_, _)
            | DdlCommand::DropSubscription(_, _)
            | DdlCommand::AlterName(_, _)
            | DdlCommand::AlterObjectOwner(_, _)
            | DdlCommand::AlterSetSchema(_, _)
            | DdlCommand::CreateDatabase(_)
            | DdlCommand::CreateSchema(_)
            | DdlCommand::CreateFunction(_)
            | DdlCommand::CreateView(_, _)
            | DdlCommand::CreateConnection(_)
            | DdlCommand::CommentOn(_)
            | DdlCommand::CreateSecret(_)
            | DdlCommand::AlterSecret(_)
            | DdlCommand::AlterSwapRename(_)
            | DdlCommand::AlterDatabaseParam(_, _)
            | DdlCommand::AlterStreamingJobConfig(_, _, _)
            | DdlCommand::AlterSubscriptionRetention { .. } => true,
            DdlCommand::CreateStreamingJob { .. }
            | DdlCommand::CreateNonSharedSource(_)
            | DdlCommand::ReplaceStreamJob(_)
            | DdlCommand::AlterNonSharedSource(_)
            | DdlCommand::ResetSource(_)
            | DdlCommand::CreateSubscription(_) => false,
        }
    }
}
273
/// Entry point for executing [`DdlCommand`]s on the meta node.
///
/// The controller is `Clone`; `run_command` clones it into the task it spawns
/// for each command.
#[derive(Clone)]
pub struct DdlController {
    pub(crate) env: MetaSrvEnv,

    pub(crate) metadata_manager: MetadataManager,
    pub(crate) stream_manager: GlobalStreamManagerRef,
    pub(crate) source_manager: SourceManagerRef,
    barrier_manager: BarrierManagerRef,
    sink_manager: SinkCoordinatorManager,
    iceberg_compaction_manager: IcebergCompactionManagerRef,

    // Semaphore-backed permits; `create_subscription` acquires one before doing its work.
    pub(crate) creating_streaming_job_permits: Arc<CreatingStreamingJobPermit>,

    // Monotonic counter behind `next_seq`, used to build per-command await-tree keys.
    seq: Arc<AtomicU64>,
}
291
/// Semaphore whose permit count follows the
/// `max_concurrent_creating_streaming_jobs` system parameter.
#[derive(Clone)]
pub struct CreatingStreamingJobPermit {
    pub(crate) semaphore: Arc<Semaphore>,
}
296
297impl CreatingStreamingJobPermit {
298 async fn new(env: &MetaSrvEnv) -> Self {
299 let mut permits = env
300 .system_params_reader()
301 .await
302 .max_concurrent_creating_streaming_jobs() as usize;
303 if permits == 0 {
304 permits = Semaphore::MAX_PERMITS;
306 }
307 let semaphore = Arc::new(Semaphore::new(permits));
308
309 let (local_notification_tx, mut local_notification_rx) =
310 tokio::sync::mpsc::unbounded_channel();
311 env.notification_manager()
312 .insert_local_sender(local_notification_tx);
313 let semaphore_clone = semaphore.clone();
314 tokio::spawn(async move {
315 while let Some(notification) = local_notification_rx.recv().await {
316 let LocalNotification::SystemParamsChange(p) = ¬ification else {
317 continue;
318 };
319 let mut new_permits = p.max_concurrent_creating_streaming_jobs() as usize;
320 if new_permits == 0 {
321 new_permits = Semaphore::MAX_PERMITS;
322 }
323 match permits.cmp(&new_permits) {
324 Ordering::Less => {
325 semaphore_clone.add_permits(new_permits - permits);
326 }
327 Ordering::Equal => continue,
328 Ordering::Greater => {
329 let to_release = permits - new_permits;
330 let reduced = semaphore_clone.forget_permits(to_release);
331 if reduced != to_release {
333 tracing::warn!(
334 "no enough permits to release, expected {}, but reduced {}",
335 to_release,
336 reduced
337 );
338 }
339 }
340 }
341 tracing::info!(
342 "max_concurrent_creating_streaming_jobs changed from {} to {}",
343 permits,
344 new_permits
345 );
346 permits = new_permits;
347 }
348 });
349
350 Self { semaphore }
351 }
352}
353
354impl DdlController {
355 fn validate_specified_parallelism(
356 specified_parallelism: Option<NonZeroUsize>,
357 specified_backfill_parallelism: Option<NonZeroUsize>,
358 max_parallelism: NonZeroUsize,
359 ) -> MetaResult<()> {
360 if let Some(parallelism) = specified_parallelism
361 && parallelism > max_parallelism
362 {
363 bail_invalid_parameter!(
364 "specified parallelism {} should not exceed max parallelism {}",
365 parallelism,
366 max_parallelism,
367 );
368 }
369 if let Some(backfill_parallelism) = specified_backfill_parallelism
370 && backfill_parallelism > max_parallelism
371 {
372 bail_invalid_parameter!(
373 "specified backfill parallelism {} should not exceed max parallelism {}",
374 backfill_parallelism,
375 max_parallelism,
376 );
377 }
378 Ok(())
379 }
380
381 pub async fn new(
382 env: MetaSrvEnv,
383 metadata_manager: MetadataManager,
384 stream_manager: GlobalStreamManagerRef,
385 source_manager: SourceManagerRef,
386 barrier_manager: BarrierManagerRef,
387 sink_manager: SinkCoordinatorManager,
388 iceberg_compaction_manager: IcebergCompactionManagerRef,
389 ) -> Self {
390 let creating_streaming_job_permits = Arc::new(CreatingStreamingJobPermit::new(&env).await);
391 Self {
392 env,
393 metadata_manager,
394 stream_manager,
395 source_manager,
396 barrier_manager,
397 sink_manager,
398 iceberg_compaction_manager,
399 creating_streaming_job_permits,
400 seq: Arc::new(AtomicU64::new(0)),
401 }
402 }
403
404 pub fn next_seq(&self) -> u64 {
406 self.seq.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
408 }
409
    /// Executes a [`DdlCommand`] and returns the versions a caller can wait on.
    ///
    /// Commands for which `allow_in_recovery` is `false` first require the
    /// barrier manager to report a running status. The command is then
    /// dispatched to the matching handler inside a task spawned with
    /// `tokio::spawn`, registered in the await-tree registry under a key built
    /// from [`Self::next_seq`].
    ///
    /// On success the result is always `Some`, combining the handler's
    /// notification version with the barrier manager's hummock version id.
    ///
    /// # Errors
    ///
    /// Returns the status-check error, a join error of the spawned task
    /// (wrapped via `anyhow!`), or the handler's own error.
    #[expect(clippy::large_stack_frames)]
    pub async fn run_command(&self, command: DdlCommand) -> MetaResult<Option<WaitVersion>> {
        if !command.allow_in_recovery() {
            self.barrier_manager.check_status_running()?;
        }

        let await_tree_key = format!("DDL Command {}", self.next_seq());
        let await_tree_span = await_tree::span!("{command}({})", command.object());

        // The spawned task needs an owned controller.
        let ctrl = self.clone();
        let fut = Box::pin(async move {
            match command {
                DdlCommand::CreateDatabase(database) => ctrl.create_database(database).await,
                DdlCommand::DropDatabase(database_id) => ctrl.drop_database(database_id).await,
                DdlCommand::CreateSchema(schema) => ctrl.create_schema(schema).await,
                DdlCommand::DropSchema(schema_id, drop_mode) => {
                    ctrl.drop_schema(schema_id, drop_mode).await
                }
                DdlCommand::CreateNonSharedSource(source) => {
                    ctrl.create_non_shared_source(source).await
                }
                DdlCommand::DropSource(source_id, drop_mode) => {
                    ctrl.drop_source(source_id, drop_mode).await
                }
                DdlCommand::ResetSource(source_id) => ctrl.reset_source(source_id).await,
                DdlCommand::CreateFunction(function) => ctrl.create_function(function).await,
                DdlCommand::DropFunction(function_id, drop_mode) => {
                    ctrl.drop_function(function_id, drop_mode).await
                }
                DdlCommand::CreateView(view, dependencies) => {
                    ctrl.create_view(view, dependencies).await
                }
                DdlCommand::DropView(view_id, drop_mode) => {
                    ctrl.drop_view(view_id, drop_mode).await
                }
                DdlCommand::CreateStreamingJob {
                    stream_job,
                    fragment_graph,
                    dependencies,
                    resource_type,
                    if_not_exists,
                    refresh_interval_sec,
                } => {
                    ctrl.create_streaming_job(
                        stream_job,
                        fragment_graph,
                        dependencies,
                        resource_type,
                        if_not_exists,
                        refresh_interval_sec,
                    )
                    .await
                }
                DdlCommand::DropStreamingJob { job_id, drop_mode } => {
                    ctrl.drop_streaming_job(job_id, drop_mode).await
                }
                DdlCommand::ReplaceStreamJob(ReplaceStreamJobInfo {
                    streaming_job,
                    fragment_graph,
                }) => ctrl.replace_job(streaming_job, fragment_graph).await,
                DdlCommand::AlterName(relation, name) => ctrl.alter_name(relation, &name).await,
                DdlCommand::AlterObjectOwner(object, owner_id) => {
                    ctrl.alter_owner(object, owner_id).await
                }
                DdlCommand::AlterSetSchema(object, new_schema_id) => {
                    ctrl.alter_set_schema(object, new_schema_id).await
                }
                DdlCommand::CreateConnection(connection) => {
                    ctrl.create_connection(connection).await
                }
                DdlCommand::DropConnection(connection_id, drop_mode) => {
                    ctrl.drop_connection(connection_id, drop_mode).await
                }
                DdlCommand::CreateSecret(secret) => ctrl.create_secret(secret).await,
                DdlCommand::DropSecret(secret_id, drop_mode) => {
                    ctrl.drop_secret(secret_id, drop_mode).await
                }
                DdlCommand::AlterSecret(secret) => ctrl.alter_secret(secret).await,
                DdlCommand::AlterNonSharedSource(source) => {
                    ctrl.alter_non_shared_source(source).await
                }
                DdlCommand::CommentOn(comment) => ctrl.comment_on(comment).await,
                DdlCommand::CreateSubscription(subscription) => {
                    ctrl.create_subscription(subscription).await
                }
                DdlCommand::DropSubscription(subscription_id, drop_mode) => {
                    ctrl.drop_subscription(subscription_id, drop_mode).await
                }
                DdlCommand::AlterSubscriptionRetention {
                    subscription_id,
                    retention_seconds,
                    definition,
                } => {
                    ctrl.alter_subscription_retention(
                        subscription_id,
                        retention_seconds,
                        definition,
                    )
                    .await
                }
                DdlCommand::AlterSwapRename(objects) => ctrl.alter_swap_rename(objects).await,
                DdlCommand::AlterDatabaseParam(database_id, param) => {
                    ctrl.alter_database_param(database_id, param).await
                }
                DdlCommand::AlterStreamingJobConfig(job_id, entries_to_add, keys_to_remove) => {
                    ctrl.alter_streaming_job_config(job_id, entries_to_add, keys_to_remove)
                        .await
                }
            }
        })
        .in_current_span();
        let fut = (self.env.await_tree_reg())
            .register(await_tree_key, await_tree_span)
            .instrument(Box::pin(fut));
        // First `?` surfaces the join error of the spawned task, second the handler's own error.
        let notification_version = tokio::spawn(fut).await.map_err(|e| anyhow!(e))??;
        Ok(Some(WaitVersion {
            catalog_version: notification_version,
            hummock_version_id: self.barrier_manager.get_hummock_version_id().await,
        }))
    }
536
537 pub async fn get_ddl_progress(&self) -> MetaResult<Vec<DdlProgress>> {
538 self.barrier_manager.get_ddl_progress().await
539 }
540
541 async fn create_database(&self, database: Database) -> MetaResult<NotificationVersion> {
542 let (version, updated_db) = self
543 .metadata_manager
544 .catalog_controller
545 .create_database(database)
546 .await?;
547 self.barrier_manager
549 .update_database_barrier(
550 updated_db.database_id,
551 updated_db.barrier_interval_ms.map(|v| v as u32),
552 updated_db.checkpoint_frequency.map(|v| v as u64),
553 )
554 .await?;
555 Ok(version)
556 }
557
558 #[tracing::instrument(skip(self), level = "debug")]
559 pub async fn reschedule_streaming_job(
560 &self,
561 job_id: JobId,
562 target: ReschedulePolicy,
563 mut deferred: bool,
564 ) -> MetaResult<()> {
565 tracing::info!("altering parallelism for job {}", job_id);
566 if self.barrier_manager.check_status_running().is_err() {
567 tracing::info!(
568 "alter parallelism is set to deferred mode because the system is in recovery state"
569 );
570 deferred = true;
571 }
572
573 self.stream_manager
574 .reschedule_streaming_job(job_id, target, deferred)
575 .await
576 }
577
578 pub async fn reschedule_streaming_job_backfill_parallelism(
579 &self,
580 job_id: JobId,
581 parallelism: Option<ParallelismPolicy>,
582 mut deferred: bool,
583 ) -> MetaResult<()> {
584 tracing::info!("altering backfill parallelism for job {}", job_id);
585 if self.barrier_manager.check_status_running().is_err() {
586 tracing::info!(
587 "alter backfill parallelism is set to deferred mode because the system is in recovery state"
588 );
589 deferred = true;
590 }
591
592 self.stream_manager
593 .reschedule_streaming_job_backfill_parallelism(job_id, parallelism, deferred)
594 .await
595 }
596
597 pub async fn reschedule_cdc_table_backfill(
598 &self,
599 job_id: JobId,
600 target: ReschedulePolicy,
601 ) -> MetaResult<()> {
602 tracing::info!("alter CDC table backfill parallelism");
603 if self.barrier_manager.check_status_running().is_err() {
604 return Err(anyhow::anyhow!("CDC table backfill reschedule is unavailable because the system is in recovery state").into());
605 }
606 self.stream_manager
607 .reschedule_cdc_table_backfill(job_id, target)
608 .await
609 }
610
611 pub async fn reschedule_fragments(
612 &self,
613 fragment_targets: HashMap<FragmentId, Option<StreamingParallelism>>,
614 ) -> MetaResult<()> {
615 tracing::info!(
616 "altering parallelism for fragments {:?}",
617 fragment_targets.keys()
618 );
619 let fragment_targets = fragment_targets
620 .into_iter()
621 .map(|(fragment_id, parallelism)| (fragment_id as CatalogFragmentId, parallelism))
622 .collect();
623
624 self.stream_manager
625 .reschedule_fragments(fragment_targets)
626 .await
627 }
628
629 async fn drop_database(&self, database_id: DatabaseId) -> MetaResult<NotificationVersion> {
630 self.drop_object(ObjectType::Database, database_id, DropMode::Cascade)
631 .await
632 }
633
634 async fn create_schema(&self, schema: Schema) -> MetaResult<NotificationVersion> {
635 self.metadata_manager
636 .catalog_controller
637 .create_schema(schema)
638 .await
639 }
640
641 async fn drop_schema(
642 &self,
643 schema_id: SchemaId,
644 drop_mode: DropMode,
645 ) -> MetaResult<NotificationVersion> {
646 self.drop_object(ObjectType::Schema, schema_id, drop_mode)
647 .await
648 }
649
650 async fn create_non_shared_source(&self, source: Source) -> MetaResult<NotificationVersion> {
652 let handle = create_source_worker(&source, self.source_manager.metrics.clone())
653 .await
654 .context("failed to create source worker")?;
655
656 let (source_id, version) = self
657 .metadata_manager
658 .catalog_controller
659 .create_source(source)
660 .await?;
661 self.source_manager
662 .register_source_with_handle(source_id, handle)
663 .await;
664 Ok(version)
665 }
666
667 async fn drop_source(
668 &self,
669 source_id: SourceId,
670 drop_mode: DropMode,
671 ) -> MetaResult<NotificationVersion> {
672 self.drop_object(ObjectType::Source, source_id, drop_mode)
673 .await
674 }
675
676 async fn reset_source(&self, source_id: SourceId) -> MetaResult<NotificationVersion> {
677 tracing::info!(source_id = %source_id, "resetting CDC source offset to latest");
678
679 let database_id = self
681 .metadata_manager
682 .catalog_controller
683 .get_object_database_id(source_id)
684 .await?;
685
686 self.stream_manager
687 .barrier_scheduler
688 .run_command(database_id, Command::ResetSource { source_id })
689 .await?;
690
691 let version = self
693 .metadata_manager
694 .catalog_controller
695 .notify_frontend_trivial()
696 .await;
697 Ok(version)
698 }
699
700 async fn alter_non_shared_source(&self, source: Source) -> MetaResult<NotificationVersion> {
703 self.metadata_manager
704 .catalog_controller
705 .alter_non_shared_source(source)
706 .await
707 }
708
709 async fn create_function(&self, function: Function) -> MetaResult<NotificationVersion> {
710 self.metadata_manager
711 .catalog_controller
712 .create_function(function)
713 .await
714 }
715
716 async fn drop_function(
717 &self,
718 function_id: FunctionId,
719 drop_mode: DropMode,
720 ) -> MetaResult<NotificationVersion> {
721 self.drop_object(ObjectType::Function, function_id, drop_mode)
722 .await
723 }
724
725 async fn create_view(
726 &self,
727 view: View,
728 dependencies: HashSet<ObjectId>,
729 ) -> MetaResult<NotificationVersion> {
730 self.metadata_manager
731 .catalog_controller
732 .create_view(view, dependencies)
733 .await
734 }
735
736 async fn drop_view(
737 &self,
738 view_id: ViewId,
739 drop_mode: DropMode,
740 ) -> MetaResult<NotificationVersion> {
741 self.drop_object(ObjectType::View, view_id, drop_mode).await
742 }
743
744 async fn create_connection(&self, connection: Connection) -> MetaResult<NotificationVersion> {
745 validate_connection(&connection).await?;
746 self.metadata_manager
747 .catalog_controller
748 .create_connection(connection)
749 .await
750 }
751
752 async fn drop_connection(
753 &self,
754 connection_id: ConnectionId,
755 drop_mode: DropMode,
756 ) -> MetaResult<NotificationVersion> {
757 self.drop_object(ObjectType::Connection, connection_id, drop_mode)
758 .await
759 }
760
761 async fn alter_database_param(
762 &self,
763 database_id: DatabaseId,
764 param: AlterDatabaseParam,
765 ) -> MetaResult<NotificationVersion> {
766 let (version, updated_db) = self
767 .metadata_manager
768 .catalog_controller
769 .alter_database_param(database_id, param)
770 .await?;
771 self.barrier_manager
773 .update_database_barrier(
774 database_id,
775 updated_db.barrier_interval_ms.map(|v| v as u32),
776 updated_db.checkpoint_frequency.map(|v| v as u64),
777 )
778 .await?;
779 Ok(version)
780 }
781
782 fn get_encrypted_payload(&self, secret: &Secret) -> MetaResult<Vec<u8>> {
785 let secret_store_private_key = self
786 .env
787 .opts
788 .secret_store_private_key
789 .clone()
790 .ok_or_else(|| anyhow!("secret_store_private_key is not configured"))?;
791
792 let encrypted_payload = SecretEncryption::encrypt(
793 secret_store_private_key.as_slice(),
794 secret.get_value().as_slice(),
795 )
796 .context(format!("failed to encrypt secret {}", secret.name))?;
797 Ok(encrypted_payload
798 .serialize()
799 .context(format!("failed to serialize secret {}", secret.name))?)
800 }
801
802 async fn create_secret(&self, mut secret: Secret) -> MetaResult<NotificationVersion> {
803 let secret_plain_payload = secret.value.clone();
806 let encrypted_payload = self.get_encrypted_payload(&secret)?;
807 secret.value = encrypted_payload;
808
809 self.metadata_manager
810 .catalog_controller
811 .create_secret(secret, secret_plain_payload)
812 .await
813 }
814
815 async fn drop_secret(
816 &self,
817 secret_id: SecretId,
818 drop_mode: DropMode,
819 ) -> MetaResult<NotificationVersion> {
820 self.drop_object(ObjectType::Secret, secret_id, drop_mode)
821 .await
822 }
823
824 async fn alter_secret(&self, mut secret: Secret) -> MetaResult<NotificationVersion> {
825 let secret_plain_payload = secret.value.clone();
826 let encrypted_payload = self.get_encrypted_payload(&secret)?;
827 secret.value = encrypted_payload;
828 self.metadata_manager
829 .catalog_controller
830 .alter_secret(secret, secret_plain_payload)
831 .await
832 }
833
    /// Creates a subscription.
    ///
    /// Steps, in order: acquire a creating-streaming-job permit, take the
    /// reschedule read lock, create the catalog entry, create the subscription
    /// in the stream manager, and finally notify via
    /// `notify_create_subscription`.
    ///
    /// If the stream manager step fails, the catalog entry is aborted on a
    /// best-effort basis (a failure of the abort itself is only logged) and the
    /// stream manager's error is returned.
    async fn create_subscription(
        &self,
        mut subscription: Subscription,
    ) -> MetaResult<NotificationVersion> {
        tracing::debug!("create subscription");
        // Held until the function returns.
        let _permit = self
            .creating_streaming_job_permits
            .semaphore
            .acquire()
            .await
            .unwrap();
        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
        // Takes `&mut subscription`; `subscription.id` is read afterwards.
        self.metadata_manager
            .catalog_controller
            .create_subscription_catalog(&mut subscription)
            .await?;
        if let Err(err) = self.stream_manager.create_subscription(&subscription).await {
            tracing::debug!(error = %err.as_report(), "failed to create subscription");
            // Best-effort cleanup: the original error is what gets returned.
            let _ = self
                .metadata_manager
                .catalog_controller
                .try_abort_creating_subscription(subscription.id)
                .await
                .inspect_err(|e| {
                    tracing::error!(
                        error = %e.as_report(),
                        "failed to abort create subscription after failure"
                    );
                });
            return Err(err);
        }

        let version = self
            .metadata_manager
            .catalog_controller
            .notify_create_subscription(subscription.id)
            .await?;
        tracing::debug!("finish create subscription");
        Ok(version)
    }
874
875 async fn drop_subscription(
876 &self,
877 subscription_id: SubscriptionId,
878 drop_mode: DropMode,
879 ) -> MetaResult<NotificationVersion> {
880 tracing::debug!("preparing drop subscription");
881 let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
882 let subscription = self
883 .metadata_manager
884 .catalog_controller
885 .get_subscription_by_id(subscription_id)
886 .await?;
887 let table_id = subscription.dependent_table_id;
888 let database_id = subscription.database_id;
889 let (_, version) = self
890 .metadata_manager
891 .catalog_controller
892 .drop_object(ObjectType::Subscription, subscription_id, drop_mode)
893 .await?;
894 self.stream_manager
895 .drop_subscription(database_id, subscription_id, table_id)
896 .await;
897 tracing::debug!("finish drop subscription");
898 Ok(version)
899 }
900
901 async fn alter_subscription_retention(
902 &self,
903 subscription_id: SubscriptionId,
904 retention_seconds: u64,
905 definition: String,
906 ) -> MetaResult<NotificationVersion> {
907 tracing::debug!("alter subscription retention");
908 let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
909 let (version, subscription) = self
910 .metadata_manager
911 .catalog_controller
912 .alter_subscription_retention(subscription_id, retention_seconds, definition)
913 .await?;
914 self.stream_manager
915 .alter_subscription_retention(
916 subscription.database_id,
917 subscription.id,
918 subscription.dependent_table_id,
919 subscription.retention_seconds,
920 )
921 .await?;
922 tracing::debug!("finish alter subscription retention");
923 Ok(version)
924 }
925
    /// Validates the CDC scan of a table's stream job fragments.
    ///
    /// Exactly one fragment flagged `StreamScan` or `StreamCdcScan` must exist.
    /// Its root node must be either a `StreamCdcScan` node or a `Project` node
    /// whose inputs are inspected; each candidate node is checked with
    /// `validate_cdc_table_inner`.
    ///
    /// # Errors
    ///
    /// Fails when the scan fragment is missing or not unique, when the root node
    /// body is neither of the two expected kinds, when no CDC scan node is
    /// found, or when `validate_cdc_table_inner` fails.
    ///
    /// # Panics
    ///
    /// Panics if a CDC scan node without parallelized-backfill options sits in a
    /// fragment whose distribution type is not `Single`.
    #[await_tree::instrument]
    pub(crate) async fn validate_cdc_table(
        &self,
        table: &Table,
        table_fragments: &StreamJobFragments,
    ) -> MetaResult<()> {
        let stream_scan_fragment = table_fragments
            .fragments
            .values()
            .filter(|f| {
                f.fragment_type_mask.contains(FragmentTypeFlag::StreamScan)
                    || f.fragment_type_mask
                        .contains(FragmentTypeFlag::StreamCdcScan)
            })
            .exactly_one()
            .ok()
            .with_context(|| {
                format!(
                    "expect exactly one stream scan fragment, got: {:?}",
                    table_fragments.fragments
                )
            })?;
        // Asserts the distribution invariant for CDC scan nodes; other node bodies are ignored.
        fn assert_parallelism(
            distribution_type: PbFragmentDistributionType,
            node_body: &Option<NodeBody>,
        ) {
            if let Some(NodeBody::StreamCdcScan(node)) = node_body {
                if let Some(o) = node.options
                    && CdcScanOptions::from_proto(&o).is_parallelized_backfill()
                {
                    // Parallelized backfill: no distribution requirement is asserted.
                } else {
                    assert_eq!(
                        distribution_type,
                        PbFragmentDistributionType::Single,
                        "Non-parallelized CDC scan fragment should have Single distribution"
                    );
                }
            }
        }
        let mut found_cdc_scan = false;
        match &stream_scan_fragment.nodes.node_body {
            // The fragment root is the CDC scan itself.
            Some(NodeBody::StreamCdcScan(_)) => {
                assert_parallelism(
                    stream_scan_fragment.distribution_type,
                    &stream_scan_fragment.nodes.node_body,
                );
                if self
                    .validate_cdc_table_inner(&stream_scan_fragment.nodes.node_body, table.id)
                    .await?
                {
                    found_cdc_scan = true;
                }
            }
            // The fragment root is a projection: look at each of its direct inputs.
            Some(NodeBody::Project(_)) => {
                for input in &stream_scan_fragment.nodes.input {
                    assert_parallelism(stream_scan_fragment.distribution_type, &input.node_body);
                    if self
                        .validate_cdc_table_inner(&input.node_body, table.id)
                        .await?
                    {
                        found_cdc_scan = true;
                    }
                }
            }
            _ => {
                bail!("Unexpected node body for stream cdc scan");
            }
        };
        if !found_cdc_scan {
            bail!("No stream cdc scan node found in stream scan fragment");
        }
        Ok(())
    }
1002
1003 async fn validate_cdc_table_inner(
1004 &self,
1005 node_body: &Option<NodeBody>,
1006 table_id: TableId,
1007 ) -> MetaResult<bool> {
1008 if let Some(NodeBody::StreamCdcScan(stream_cdc_scan)) = node_body
1009 && let Some(ref cdc_table_desc) = stream_cdc_scan.cdc_table_desc
1010 {
1011 let options_with_secret = WithOptionsSecResolved::new(
1012 cdc_table_desc.connect_properties.clone(),
1013 cdc_table_desc.secret_refs.clone(),
1014 );
1015
1016 let mut props = ConnectorProperties::extract(options_with_secret, true)?;
1017 props.init_from_pb_cdc_table_desc(cdc_table_desc);
1018
1019 let _enumerator = props
1021 .create_split_enumerator(SourceEnumeratorContext::dummy().into())
1022 .await?;
1023
1024 tracing::debug!(?table_id, "validate cdc table success");
1025 Ok(true)
1026 } else {
1027 Ok(false)
1028 }
1029 }
1030
1031 pub async fn validate_table_for_sink(&self, table_id: TableId) -> MetaResult<()> {
1032 let migrated = self
1033 .metadata_manager
1034 .catalog_controller
1035 .has_table_been_migrated(table_id)
1036 .await?;
1037 if !migrated {
1038 Err(anyhow::anyhow!("Creating sink into table is not allowed for unmigrated table {}. Please migrate it first.", table_id).into())
1039 } else {
1040 Ok(())
1041 }
1042 }
1043
1044 #[await_tree::instrument(boxed, "create_streaming_job({streaming_job})")]
1047 pub async fn create_streaming_job(
1048 &self,
1049 mut streaming_job: StreamingJob,
1050 fragment_graph: StreamFragmentGraphProto,
1051 dependencies: HashSet<ObjectId>,
1052 resource_type: streaming_job_resource_type::ResourceType,
1053 if_not_exists: bool,
1054 refresh_interval_sec: Option<u64>,
1055 ) -> MetaResult<NotificationVersion> {
1056 if let StreamingJob::Sink(sink) = &streaming_job
1057 && let Some(target_table) = sink.target_table
1058 {
1059 self.validate_table_for_sink(target_table).await?;
1060 }
1061 let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());
1062 let adaptive_parallelism_strategy =
1063 (!fragment_graph.adaptive_parallelism_strategy.is_empty()).then(|| {
1064 parse_strategy(&fragment_graph.adaptive_parallelism_strategy)
1065 .expect("adaptive parallelism strategy should be validated in frontend")
1066 });
1067 let backfill_adaptive_parallelism_strategy = (!fragment_graph
1068 .backfill_adaptive_parallelism_strategy
1069 .is_empty())
1070 .then(|| {
1071 parse_strategy(&fragment_graph.backfill_adaptive_parallelism_strategy)
1072 .expect("backfill adaptive parallelism strategy should be validated in frontend")
1073 });
1074 let streaming_job_model = match self
1075 .metadata_manager
1076 .catalog_controller
1077 .create_job_catalog(
1078 &mut streaming_job,
1079 &ctx,
1080 &fragment_graph.parallelism,
1081 fragment_graph.max_parallelism as _,
1082 dependencies,
1083 resource_type.clone(),
1084 &fragment_graph.backfill_parallelism,
1085 adaptive_parallelism_strategy,
1086 backfill_adaptive_parallelism_strategy,
1087 refresh_interval_sec,
1088 )
1089 .await
1090 {
1091 Ok(model) => model,
1092 Err(meta_err) => {
1093 if !if_not_exists {
1094 return Err(meta_err);
1095 }
1096 return if let MetaErrorInner::Duplicated(_, _, Some(job_id)) = meta_err.inner() {
1097 if streaming_job.create_type() == CreateType::Foreground {
1098 let database_id = streaming_job.database_id();
1099 self.metadata_manager
1100 .wait_streaming_job_finished(database_id, *job_id)
1101 .await
1102 } else {
1103 Ok(IGNORED_NOTIFICATION_VERSION)
1104 }
1105 } else {
1106 Err(meta_err)
1107 };
1108 }
1109 };
1110 let job_id = streaming_job.id();
1111 tracing::debug!(
1112 id = %job_id,
1113 definition = streaming_job.definition(),
1114 create_type = streaming_job.create_type().as_str_name(),
1115 job_type = ?streaming_job.job_type(),
1116 "starting streaming job",
1117 );
1118 let permit = self
1120 .creating_streaming_job_permits
1121 .semaphore
1122 .clone()
1123 .acquire_owned()
1124 .instrument_await("acquire_creating_streaming_job_permit")
1125 .await
1126 .unwrap();
1127 let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
1128
1129 let name = streaming_job.name();
1130 let definition = streaming_job.definition();
1131 let source_id = match &streaming_job {
1132 StreamingJob::Table(Some(src), _, _) | StreamingJob::Source(src) => Some(src.id),
1133 _ => None,
1134 };
1135 let create_result = match self
1138 .generate_streaming_job(
1139 ctx,
1140 streaming_job,
1141 fragment_graph,
1142 resource_type,
1143 streaming_job_model,
1144 )
1145 .await
1146 {
1147 Ok((stream_job_fragments, ctx)) => self
1148 .stream_manager
1149 .create_streaming_job(stream_job_fragments, ctx, permit)
1150 .await
1151 .map_err(|err| (err, true)),
1152 Err(err) => Err((err, false)),
1153 };
1154
1155 match create_result {
1156 Ok(version) => Ok(version),
1157 Err((err, is_cancelled)) => {
1158 tracing::error!(id = %job_id, error = %err.as_report(), "failed to create streaming job");
1159 let event = risingwave_pb::meta::event_log::EventCreateStreamJobFail {
1160 id: job_id,
1161 name,
1162 definition,
1163 error: err.as_report().to_string(),
1164 };
1165 self.env.event_log_manager_ref().add_event_logs(vec![
1166 risingwave_pb::meta::event_log::Event::CreateStreamJobFail(event),
1167 ]);
1168 let (aborted, _) = self
1169 .metadata_manager
1170 .catalog_controller
1171 .try_abort_creating_streaming_job(job_id, is_cancelled)
1172 .await?;
1173 if aborted {
1174 tracing::warn!(id = %job_id, is_cancelled, "aborted streaming job");
1175 if let Some(source_id) = source_id {
1177 self.source_manager
1178 .apply_source_change(SourceChange::DropSource {
1179 dropped_source_ids: vec![source_id],
1180 })
1181 .await;
1182 }
1183 }
1184 Err(err)
1185 }
1186 }
1187 }
1188
1189 #[await_tree::instrument(boxed)]
1190 async fn generate_streaming_job(
1191 &self,
1192 ctx: StreamContext,
1193 mut streaming_job: StreamingJob,
1194 fragment_graph: StreamFragmentGraphProto,
1195 resource_type: streaming_job_resource_type::ResourceType,
1196 streaming_job_model: streaming_job::Model,
1197 ) -> MetaResult<(StreamJobFragmentsToCreate, CreateStreamingJobContext)> {
1198 let mut fragment_graph =
1199 StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
1200 streaming_job.set_info_from_graph(&fragment_graph);
1201
1202 let incomplete_internal_tables = fragment_graph
1204 .incomplete_internal_tables()
1205 .into_values()
1206 .collect_vec();
1207 let table_id_map = self
1208 .metadata_manager
1209 .catalog_controller
1210 .create_internal_table_catalog(&streaming_job, incomplete_internal_tables)
1211 .await?;
1212 fragment_graph.refill_internal_table_ids(table_id_map);
1213
1214 tracing::debug!(id = %streaming_job.id(), "building streaming job");
1216 let (ctx, stream_job_fragments) = self
1217 .build_stream_job(
1218 ctx,
1219 streaming_job,
1220 fragment_graph,
1221 resource_type,
1222 streaming_job_model,
1223 )
1224 .await?;
1225
1226 let streaming_job = &ctx.streaming_job;
1227
1228 match streaming_job {
1229 StreamingJob::Table(None, table, TableJobType::SharedCdcSource) => {
1230 self.validate_cdc_table(table, &stream_job_fragments)
1231 .await?;
1232 }
1233 StreamingJob::Table(Some(source), ..) => {
1234 self.source_manager.register_source(source).await?;
1236 let connector_name = source
1237 .get_with_properties()
1238 .get(UPSTREAM_SOURCE_KEY)
1239 .cloned();
1240 let attr = source.info.as_ref().map(|source_info| {
1241 jsonbb::json!({
1242 "format": source_info.format().as_str_name(),
1243 "encode": source_info.row_encode().as_str_name(),
1244 })
1245 });
1246 report_create_object(
1247 streaming_job.id(),
1248 "source",
1249 PbTelemetryDatabaseObject::Source,
1250 connector_name,
1251 attr,
1252 );
1253 }
1254 StreamingJob::Sink(sink) => {
1255 if sink.auto_refresh_schema_from_table.is_some() {
1256 check_sink_fragments_support_refresh_schema(&stream_job_fragments.fragments)?;
1257 }
1258 validate_sink(sink).await?;
1260 let connector_name = sink.get_properties().get(UPSTREAM_SOURCE_KEY).cloned();
1261 let attr = sink.format_desc.as_ref().map(|sink_info| {
1262 jsonbb::json!({
1263 "format": sink_info.format().as_str_name(),
1264 "encode": sink_info.encode().as_str_name(),
1265 })
1266 });
1267 report_create_object(
1268 streaming_job.id(),
1269 "sink",
1270 PbTelemetryDatabaseObject::Sink,
1271 connector_name,
1272 attr,
1273 );
1274 }
1275 StreamingJob::Source(source) => {
1276 self.source_manager.register_source(source).await?;
1278 let connector_name = source
1279 .get_with_properties()
1280 .get(UPSTREAM_SOURCE_KEY)
1281 .cloned();
1282 let attr = source.info.as_ref().map(|source_info| {
1283 jsonbb::json!({
1284 "format": source_info.format().as_str_name(),
1285 "encode": source_info.row_encode().as_str_name(),
1286 })
1287 });
1288 report_create_object(
1289 streaming_job.id(),
1290 "source",
1291 PbTelemetryDatabaseObject::Source,
1292 connector_name,
1293 attr,
1294 );
1295 }
1296 _ => {}
1297 }
1298
1299 let backfill_orders = ctx.fragment_backfill_ordering.to_meta_model();
1300 self.metadata_manager
1301 .catalog_controller
1302 .prepare_stream_job_fragments(
1303 &stream_job_fragments,
1304 streaming_job,
1305 false,
1306 Some(backfill_orders),
1307 )
1308 .await?;
1309
1310 Ok((stream_job_fragments, ctx))
1311 }
1312
1313 pub async fn drop_object(
1315 &self,
1316 object_type: ObjectType,
1317 object_id: impl Into<ObjectId>,
1318 drop_mode: DropMode,
1319 ) -> MetaResult<NotificationVersion> {
1320 let object_id = object_id.into();
1321 let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
1324 let _source_tick_pause_guard = self.source_manager.pause_tick().await;
1325
1326 let (release_ctx, version) = self
1327 .metadata_manager
1328 .catalog_controller
1329 .drop_object(object_type, object_id, drop_mode)
1330 .await?;
1331
1332 if object_type == ObjectType::Source {
1333 self.env
1334 .notification_manager_ref()
1335 .notify_local_subscribers(LocalNotification::SourceDropped(object_id));
1336 }
1337
1338 let ReleaseContext {
1339 database_id,
1340 removed_streaming_job_ids,
1341 removed_state_table_ids,
1342 removed_source_ids,
1343 removed_secret_ids: secret_ids,
1344 removed_source_fragments,
1345 removed_fragments,
1346 removed_sink_fragment_by_targets,
1347 removed_iceberg_table_sinks,
1348 } = release_ctx;
1349
1350 self.env
1354 .notification_manager_ref()
1355 .notify_serving_fragment_mapping_delete(
1356 removed_fragments.iter().map(|id| *id as _).collect(),
1357 );
1358
1359 self.stream_manager
1360 .drop_streaming_jobs(
1361 database_id,
1362 removed_streaming_job_ids,
1363 removed_state_table_ids,
1364 removed_sink_fragment_by_targets
1365 .into_iter()
1366 .map(|(target, sinks)| {
1367 (target as _, sinks.into_iter().map(|id| id as _).collect())
1368 })
1369 .collect(),
1370 )
1371 .await;
1372
1373 self.source_manager
1376 .apply_source_change(SourceChange::DropSource {
1377 dropped_source_ids: removed_source_ids.into_iter().map(|id| id as _).collect(),
1378 })
1379 .await;
1380
1381 let dropped_source_fragments = removed_source_fragments;
1384 self.source_manager
1385 .apply_source_change(SourceChange::DropMv {
1386 dropped_source_fragments,
1387 })
1388 .await;
1389
1390 let iceberg_sink_ids: Vec<SinkId> = removed_iceberg_table_sinks
1392 .iter()
1393 .map(|sink| sink.id)
1394 .collect();
1395
1396 for sink in removed_iceberg_table_sinks {
1397 let sink_param = SinkParam::try_from_sink_catalog(sink.into())
1398 .expect("Iceberg sink should be valid");
1399 let iceberg_sink =
1400 IcebergSink::try_from(sink_param).expect("Iceberg sink should be valid");
1401 if let Ok(iceberg_catalog) = iceberg_sink.config.create_catalog().await {
1402 let table_identifier = iceberg_sink.config.full_table_name().unwrap();
1403 tracing::info!(
1404 "dropping iceberg table {} for dropped sink",
1405 table_identifier
1406 );
1407
1408 let _ = iceberg_catalog
1409 .drop_table(&table_identifier)
1410 .await
1411 .inspect_err(|err| {
1412 tracing::error!(
1413 "failed to drop iceberg table {} during cleanup: {}",
1414 table_identifier,
1415 err.as_report()
1416 );
1417 });
1418 }
1419 }
1420
1421 if !iceberg_sink_ids.is_empty() {
1423 self.sink_manager
1424 .stop_sink_coordinator(iceberg_sink_ids.clone())
1425 .await;
1426
1427 for sink_id in iceberg_sink_ids {
1428 self.iceberg_compaction_manager
1429 .clear_iceberg_maintenance_by_sink_id(sink_id);
1430 }
1431 }
1432
1433 for secret in secret_ids {
1435 LocalSecretManager::global().remove_secret(secret);
1436 }
1437 Ok(version)
1438 }
1439
1440 #[await_tree::instrument(boxed, "replace_streaming_job({streaming_job})")]
1442 pub async fn replace_job(
1443 &self,
1444 mut streaming_job: StreamingJob,
1445 fragment_graph: StreamFragmentGraphProto,
1446 ) -> MetaResult<NotificationVersion> {
1447 match &streaming_job {
1448 StreamingJob::Table(..)
1449 | StreamingJob::Source(..)
1450 | StreamingJob::MaterializedView(..) => {}
1451 StreamingJob::Sink(..) | StreamingJob::Index(..) => {
1452 bail_not_implemented!("schema change for {}", streaming_job.job_type_str())
1453 }
1454 }
1455
1456 let job_id = streaming_job.id();
1457
1458 let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
1459 let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());
1460
1461 let original_max_parallelism = self
1463 .metadata_manager
1464 .get_job_max_parallelism(streaming_job.id())
1465 .await?;
1466 let fragment_graph = PbStreamFragmentGraph {
1467 max_parallelism: original_max_parallelism as _,
1468 ..fragment_graph
1469 };
1470
1471 let fragment_graph = StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
1473 streaming_job.set_info_from_graph(&fragment_graph);
1474
1475 let streaming_job = streaming_job;
1477
1478 let auto_refresh_schema_sinks = if let StreamingJob::Table(_, table, _) = &streaming_job {
1479 let auto_refresh_schema_sinks = self
1480 .metadata_manager
1481 .catalog_controller
1482 .get_sink_auto_refresh_schema_from(table.id)
1483 .await?;
1484 if !auto_refresh_schema_sinks.is_empty() {
1485 let original_table_columns = self
1486 .metadata_manager
1487 .catalog_controller
1488 .get_table_columns(table.id)
1489 .await?;
1490 let original_table_column_ids: HashSet<_> = original_table_columns
1492 .iter()
1493 .map(|col| col.column_id())
1494 .collect();
1495 let new_table_column_ids: HashSet<_> = table
1496 .columns
1497 .iter()
1498 .map(|col| ColumnId::new(col.column_desc.as_ref().unwrap().column_id as _))
1499 .collect();
1500 let newly_added_columns = table
1501 .columns
1502 .iter()
1503 .filter(|col| {
1504 !original_table_column_ids.contains(&ColumnId::new(
1505 col.column_desc.as_ref().unwrap().column_id as _,
1506 ))
1507 })
1508 .map(|col| ColumnCatalog::from(col.clone()))
1509 .collect_vec();
1510 let removed_columns = original_table_columns
1511 .iter()
1512 .filter(|col| !new_table_column_ids.contains(&col.column_id()))
1513 .cloned()
1514 .collect_vec();
1515 let mut sinks = Vec::with_capacity(auto_refresh_schema_sinks.len());
1516 for sink in auto_refresh_schema_sinks {
1517 let sink_job_fragments = self
1518 .metadata_manager
1519 .get_job_fragments_by_id(sink.id.as_job_id())
1520 .await?;
1521 if sink_job_fragments.fragments.len() != 1 {
1522 return Err(anyhow!(
1523 "auto schema refresh sink must have only one fragment, but got {}",
1524 sink_job_fragments.fragments.len()
1525 )
1526 .into());
1527 }
1528 let sink_ctx = sink_job_fragments.ctx;
1529 let original_sink_fragment =
1530 sink_job_fragments.fragments.into_values().next().unwrap();
1531 let (new_sink_fragment, new_schema, new_log_store_table) =
1532 rewrite_refresh_schema_sink_fragment(
1533 &original_sink_fragment,
1534 &sink,
1535 &newly_added_columns,
1536 &removed_columns,
1537 table,
1538 fragment_graph.table_fragment_id(),
1539 self.env.id_gen_manager(),
1540 )?;
1541
1542 let streaming_job = StreamingJob::Sink(sink);
1543
1544 let tmp_sink_model = self
1545 .metadata_manager
1546 .catalog_controller
1547 .create_job_catalog_for_replace(&streaming_job, None, None, None)
1548 .await?;
1549 let tmp_sink_id = tmp_sink_model.job_id.as_sink_id();
1550 let StreamingJob::Sink(sink) = streaming_job else {
1551 unreachable!()
1552 };
1553
1554 sinks.push(AutoRefreshSchemaSinkContext {
1555 tmp_sink_id,
1556 original_sink: sink,
1557 original_fragment: original_sink_fragment,
1558 new_schema,
1559 newly_add_fields: newly_added_columns
1560 .iter()
1561 .map(|col| Field::from(&col.column_desc))
1562 .collect(),
1563 removed_column_names: removed_columns
1564 .iter()
1565 .map(|col| col.name.clone())
1566 .collect(),
1567 new_fragment: new_sink_fragment,
1568 new_log_store_table: new_log_store_table.map(Box::new),
1569 ctx: sink_ctx,
1570 });
1571 }
1572 Some(sinks)
1573 } else {
1574 None
1575 }
1576 } else {
1577 None
1578 };
1579
1580 let streaming_job_model = self
1581 .metadata_manager
1582 .catalog_controller
1583 .create_job_catalog_for_replace(
1584 &streaming_job,
1585 Some(&ctx),
1586 fragment_graph.specified_parallelism().as_ref(),
1587 Some(fragment_graph.max_parallelism()),
1588 )
1589 .await?;
1590 let tmp_id = streaming_job_model.job_id;
1591
1592 let tmp_sink_ids = auto_refresh_schema_sinks.as_ref().map(|sinks| {
1593 sinks
1594 .iter()
1595 .map(|sink| sink.tmp_sink_id.as_object_id())
1596 .collect_vec()
1597 });
1598
1599 tracing::debug!(id = %job_id, "building replace streaming job");
1600 let mut updated_sink_catalogs = vec![];
1601
1602 let mut drop_table_connector_ctx = None;
1603 let result: MetaResult<_> = try {
1604 let (mut ctx, mut stream_job_fragments) = self
1605 .build_replace_job(
1606 ctx,
1607 &streaming_job,
1608 fragment_graph,
1609 tmp_id,
1610 auto_refresh_schema_sinks,
1611 streaming_job_model,
1612 )
1613 .await?;
1614 drop_table_connector_ctx = ctx.drop_table_connector_ctx.clone();
1615 let auto_refresh_schema_sink_finish_ctx =
1616 ctx.auto_refresh_schema_sinks.as_ref().map(|sinks| {
1617 sinks
1618 .iter()
1619 .map(|sink| FinishAutoRefreshSchemaSinkContext {
1620 tmp_sink_id: sink.tmp_sink_id,
1621 original_sink_id: sink.original_sink.id,
1622 columns: sink.new_schema.clone(),
1623 new_log_store_table: sink.new_log_store_table.clone(),
1624 })
1625 .collect()
1626 });
1627
1628 if let StreamingJob::Table(_, table, ..) = &streaming_job {
1630 let union_fragment = stream_job_fragments.inner.union_fragment_for_table();
1631 let upstream_infos = self
1632 .metadata_manager
1633 .catalog_controller
1634 .get_all_upstream_sink_infos(table, union_fragment.fragment_id as _)
1635 .await?;
1636 refill_upstream_sink_union_in_table(&mut union_fragment.nodes, &upstream_infos);
1637
1638 for upstream_info in &upstream_infos {
1639 let upstream_fragment_id = upstream_info.sink_fragment_id;
1640 ctx.upstream_fragment_downstreams
1641 .entry(upstream_fragment_id)
1642 .or_default()
1643 .push(upstream_info.new_sink_downstream.clone());
1644 if upstream_info.sink_original_target_columns.is_empty() {
1645 updated_sink_catalogs.push(upstream_info.sink_id);
1646 }
1647 }
1648 }
1649
1650 let replace_upstream = ctx.replace_upstream.clone();
1651
1652 if let Some(sinks) = &ctx.auto_refresh_schema_sinks {
1653 let empty_downstreams = FragmentDownstreamRelation::default();
1654 for sink in sinks {
1655 self.metadata_manager
1656 .catalog_controller
1657 .prepare_streaming_job(
1658 sink.tmp_sink_id.as_job_id(),
1659 || [&sink.new_fragment].into_iter(),
1660 &empty_downstreams,
1661 true,
1662 None,
1663 None,
1664 )
1665 .await?;
1666 }
1667 }
1668
1669 self.metadata_manager
1670 .catalog_controller
1671 .prepare_stream_job_fragments(&stream_job_fragments, &streaming_job, true, None)
1672 .await?;
1673
1674 self.stream_manager
1675 .replace_stream_job(stream_job_fragments, ctx)
1676 .await?;
1677 (replace_upstream, auto_refresh_schema_sink_finish_ctx)
1678 };
1679
1680 match result {
1681 Ok((replace_upstream, auto_refresh_schema_sink_finish_ctx)) => {
1682 let version = self
1683 .metadata_manager
1684 .catalog_controller
1685 .finish_replace_streaming_job(
1686 tmp_id,
1687 streaming_job,
1688 replace_upstream,
1689 SinkIntoTableContext {
1690 updated_sink_catalogs,
1691 },
1692 drop_table_connector_ctx.as_ref(),
1693 auto_refresh_schema_sink_finish_ctx,
1694 )
1695 .await?;
1696 if let Some(drop_table_connector_ctx) = &drop_table_connector_ctx {
1697 self.source_manager
1698 .apply_source_change(SourceChange::DropSource {
1699 dropped_source_ids: vec![drop_table_connector_ctx.to_remove_source_id],
1700 })
1701 .await;
1702 }
1703 Ok(version)
1704 }
1705 Err(err) => {
1706 tracing::error!(id = %job_id, error = ?err.as_report(), "failed to replace job");
1707 let _ = self.metadata_manager
1708 .catalog_controller
1709 .try_abort_replacing_streaming_job(tmp_id, tmp_sink_ids)
1710 .await.inspect_err(|err| {
1711 tracing::error!(id = %job_id, error = ?err.as_report(), "failed to abort replacing job");
1712 });
1713 Err(err)
1714 }
1715 }
1716 }
1717
1718 #[await_tree::instrument(boxed, "drop_streaming_job{}({job_id})", if let DropMode::Cascade = drop_mode { "_cascade" } else { "" }
1719 )]
1720 async fn drop_streaming_job(
1721 &self,
1722 job_id: StreamingJobId,
1723 drop_mode: DropMode,
1724 ) -> MetaResult<NotificationVersion> {
1725 let (object_id, object_type) = match job_id {
1726 StreamingJobId::MaterializedView(id) => (id.as_object_id(), ObjectType::Table),
1727 StreamingJobId::Sink(id) => (id.as_object_id(), ObjectType::Sink),
1728 StreamingJobId::Table(_, id) => (id.as_object_id(), ObjectType::Table),
1729 StreamingJobId::Index(idx) => (idx.as_object_id(), ObjectType::Index),
1730 };
1731
1732 let job_status = self
1733 .metadata_manager
1734 .catalog_controller
1735 .get_streaming_job_status(job_id.id())
1736 .await?;
1737 let version = match job_status {
1738 JobStatus::Initial => {
1739 self.metadata_manager
1740 .catalog_controller
1741 .try_abort_creating_streaming_job(job_id.id(), true)
1742 .await?;
1743 IGNORED_NOTIFICATION_VERSION
1744 }
1745 JobStatus::Creating => {
1746 self.stream_manager
1747 .cancel_streaming_jobs(vec![job_id.id()])
1748 .await?;
1749 IGNORED_NOTIFICATION_VERSION
1750 }
1751 JobStatus::Created => self.drop_object(object_type, object_id, drop_mode).await?,
1752 };
1753
1754 Ok(version)
1755 }
1756
1757 #[await_tree::instrument]
1763 pub(crate) async fn build_stream_job(
1764 &self,
1765 stream_ctx: StreamContext,
1766 mut stream_job: StreamingJob,
1767 fragment_graph: StreamFragmentGraph,
1768 resource_type: streaming_job_resource_type::ResourceType,
1769 streaming_job_model: streaming_job::Model,
1770 ) -> MetaResult<(CreateStreamingJobContext, StreamJobFragmentsToCreate)> {
1771 let id = stream_job.id();
1772 let max_parallelism = NonZeroUsize::new(fragment_graph.max_parallelism()).unwrap();
1773 Self::validate_specified_parallelism(
1774 fragment_graph.specified_parallelism(),
1775 fragment_graph.specified_backfill_parallelism(),
1776 max_parallelism,
1777 )?;
1778
1779 let fragment_backfill_ordering = fragment_graph.create_fragment_backfill_ordering();
1781
1782 let (snapshot_backfill_info, cross_db_snapshot_backfill_info) =
1786 fragment_graph.collect_snapshot_backfill_info()?;
1787 assert!(
1788 snapshot_backfill_info
1789 .iter()
1790 .chain([&cross_db_snapshot_backfill_info])
1791 .flat_map(|info| info.upstream_mv_table_id_to_backfill_epoch.values())
1792 .all(|backfill_epoch| backfill_epoch.is_none()),
1793 "should not set backfill epoch when initially build the job: {:?} {:?}",
1794 snapshot_backfill_info,
1795 cross_db_snapshot_backfill_info
1796 );
1797
1798 let locality_fragment_state_table_mapping =
1799 fragment_graph.find_locality_provider_fragment_state_table_mapping();
1800
1801 self.metadata_manager
1803 .catalog_controller
1804 .validate_cross_db_snapshot_backfill(&cross_db_snapshot_backfill_info)
1805 .await?;
1806
1807 let upstream_table_ids = fragment_graph
1808 .dependent_table_ids()
1809 .iter()
1810 .filter(|id| {
1811 !cross_db_snapshot_backfill_info
1812 .upstream_mv_table_id_to_backfill_epoch
1813 .contains_key(*id)
1814 })
1815 .cloned()
1816 .collect();
1817
1818 let upstream_root_fragments = self
1819 .metadata_manager
1820 .get_upstream_root_fragments(&upstream_table_ids)
1821 .await?;
1822
1823 if snapshot_backfill_info.is_some() {
1824 match stream_job {
1825 StreamingJob::MaterializedView(_)
1826 | StreamingJob::Sink(_)
1827 | StreamingJob::Index(_, _) => {}
1828 StreamingJob::Table(_, _, _) | StreamingJob::Source(_) => {
1829 return Err(
1830 anyhow!("snapshot_backfill not enabled for table and source").into(),
1831 );
1832 }
1833 }
1834 }
1835
1836 let complete_graph = CompleteStreamFragmentGraph::with_upstreams(
1837 fragment_graph,
1838 FragmentGraphUpstreamContext {
1839 upstream_root_fragments,
1840 },
1841 (&stream_job).into(),
1842 )?;
1843 let resource_group = if let Some(group) = resource_type.resource_group() {
1844 group
1845 } else {
1846 self.metadata_manager
1847 .get_database_resource_group(stream_job.database_id())
1848 .await?
1849 };
1850 let is_serverless_backfill = matches!(
1851 &resource_type,
1852 streaming_job_resource_type::ResourceType::ServerlessBackfillResourceGroup(_)
1853 );
1854
1855 let actor_graph_builder = ActorGraphBuilder::new(complete_graph)?;
1857
1858 let ActorGraphBuildResult {
1859 graph,
1860 downstream_fragment_relations,
1861 upstream_fragment_downstreams,
1862 replace_upstream,
1863 } = actor_graph_builder.generate_graph()?;
1864 assert!(replace_upstream.is_empty());
1865
1866 let stream_job_fragments =
1871 StreamJobFragments::new(id, graph, stream_ctx.clone(), max_parallelism.get());
1872
1873 if let Some(mview_fragment) = stream_job_fragments.mview_fragment() {
1874 stream_job.set_table_vnode_count(mview_fragment.vnode_count());
1875 }
1876
1877 let new_upstream_sink = if let StreamingJob::Sink(sink) = &stream_job
1878 && let Ok(table_id) = sink.get_target_table()
1879 {
1880 let tables = self
1881 .metadata_manager
1882 .get_table_catalog_by_ids(&[*table_id])
1883 .await?;
1884 let target_table = tables
1885 .first()
1886 .ok_or_else(|| MetaError::catalog_id_not_found("table", *table_id))?;
1887 let sink_fragment = stream_job_fragments
1888 .sink_fragment()
1889 .ok_or_else(|| anyhow::anyhow!("sink fragment not found for sink {}", sink.id))?;
1890 let mview_fragment_id = self
1891 .metadata_manager
1892 .catalog_controller
1893 .get_mview_fragment_by_id(table_id.as_job_id())
1894 .await?;
1895 let upstream_sink_info = build_upstream_sink_info(
1896 sink.id,
1897 sink.original_target_columns.clone(),
1898 sink_fragment.fragment_id as _,
1899 target_table,
1900 mview_fragment_id,
1901 )?;
1902 Some(upstream_sink_info)
1903 } else {
1904 None
1905 };
1906
1907 let mut cdc_table_snapshot_splits = None;
1908 if let StreamingJob::Table(None, table, TableJobType::SharedCdcSource) = &stream_job
1909 && let Some((_, stream_cdc_scan)) =
1910 parallel_cdc_table_backfill_fragment(stream_job_fragments.fragments.values())
1911 {
1912 {
1913 let splits = try_init_parallel_cdc_table_snapshot_splits(
1915 table.id,
1916 stream_cdc_scan.cdc_table_desc.as_ref().unwrap(),
1917 self.env.meta_store_ref(),
1918 stream_cdc_scan.options.as_ref().unwrap(),
1919 self.env.opts.cdc_table_split_init_insert_batch_size,
1920 self.env.opts.cdc_table_split_init_sleep_interval_splits,
1921 self.env.opts.cdc_table_split_init_sleep_duration_millis,
1922 )
1923 .await?;
1924 cdc_table_snapshot_splits = Some(splits);
1925 }
1926 }
1927
1928 let ctx = CreateStreamingJobContext {
1929 upstream_fragment_downstreams,
1930 database_resource_group: resource_group,
1931 definition: stream_job.definition(),
1932 create_type: stream_job.create_type(),
1933 job_type: (&stream_job).into(),
1934 streaming_job: stream_job,
1935 new_upstream_sink,
1936 option: CreateStreamingJobOption {},
1937 snapshot_backfill_info,
1938 cross_db_snapshot_backfill_info,
1939 fragment_backfill_ordering,
1940 locality_fragment_state_table_mapping,
1941 cdc_table_snapshot_splits,
1942 is_serverless_backfill,
1943 streaming_job_model: streaming_job_model.clone(),
1944 refresh_interval_sec: streaming_job_model.refresh_interval_sec.map(|s| s as u64),
1945 };
1946
1947 Ok((
1948 ctx,
1949 StreamJobFragmentsToCreate {
1950 inner: stream_job_fragments,
1951 downstreams: downstream_fragment_relations,
1952 },
1953 ))
1954 }
1955
1956 pub(crate) async fn build_replace_job(
1962 &self,
1963 stream_ctx: StreamContext,
1964 stream_job: &StreamingJob,
1965 mut fragment_graph: StreamFragmentGraph,
1966 tmp_job_id: JobId,
1967 auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,
1968 streaming_job_model: streaming_job::Model,
1969 ) -> MetaResult<(ReplaceStreamJobContext, StreamJobFragmentsToCreate)> {
1970 match &stream_job {
1971 StreamingJob::Table(..)
1972 | StreamingJob::Source(..)
1973 | StreamingJob::MaterializedView(..) => {}
1974 StreamingJob::Sink(..) | StreamingJob::Index(..) => {
1975 bail_not_implemented!("schema change for {}", stream_job.job_type_str())
1976 }
1977 }
1978
1979 let id = stream_job.id();
1980
1981 let mut drop_table_associated_source_id = None;
1983 if let StreamingJob::Table(None, _, _) = &stream_job {
1984 drop_table_associated_source_id = self
1985 .metadata_manager
1986 .get_table_associated_source_id(id.as_mv_table_id())
1987 .await?;
1988 }
1989
1990 let old_fragments = self.metadata_manager.get_job_fragments_by_id(id).await?;
1991 let old_internal_table_ids = old_fragments.internal_table_ids();
1992
1993 let mut drop_table_connector_ctx = None;
1995 if let Some(to_remove_source_id) = drop_table_associated_source_id {
1996 debug_assert!(old_internal_table_ids.len() == 1);
1998
1999 drop_table_connector_ctx = Some(DropTableConnectorContext {
2000 to_change_streaming_job_id: id,
2003 to_remove_state_table_id: old_internal_table_ids[0], to_remove_source_id,
2005 });
2006 } else if stream_job.is_materialized_view() {
2007 let old_fragments_upstreams = self
2010 .metadata_manager
2011 .catalog_controller
2012 .upstream_fragments(old_fragments.fragment_ids())
2013 .await?;
2014
2015 let old_state_graph =
2016 state_match::Graph::from_existing(&old_fragments, &old_fragments_upstreams);
2017 let new_state_graph = state_match::Graph::from_building(&fragment_graph);
2018 let result = state_match::match_graph(&new_state_graph, &old_state_graph)
2019 .context("incompatible altering on the streaming job states")?;
2020
2021 fragment_graph.fit_internal_table_ids_with_mapping(result.table_matches);
2022 fragment_graph.fit_snapshot_backfill_epochs(result.snapshot_backfill_epochs);
2023 } else {
2024 let old_internal_tables = self
2027 .metadata_manager
2028 .get_table_catalog_by_ids(&old_internal_table_ids)
2029 .await?;
2030 fragment_graph.fit_internal_tables_trivial(old_internal_tables)?;
2031 }
2032
2033 let original_root_fragment = old_fragments
2036 .root_fragment()
2037 .expect("root fragment not found");
2038
2039 let job_type = StreamingJobType::from(stream_job);
2040
2041 let mut downstream_fragments = self.metadata_manager.get_downstream_fragments(id).await?;
2043
2044 if let Some(auto_refresh_schema_sinks) = &auto_refresh_schema_sinks {
2045 let mut remaining_fragment: HashSet<_> = auto_refresh_schema_sinks
2046 .iter()
2047 .map(|sink| sink.original_fragment.fragment_id)
2048 .collect();
2049 for (_, downstream_fragment) in &mut downstream_fragments {
2050 if let Some(sink) = auto_refresh_schema_sinks.iter().find(|sink| {
2051 sink.original_fragment.fragment_id == downstream_fragment.fragment_id
2052 }) {
2053 assert!(remaining_fragment.remove(&downstream_fragment.fragment_id));
2054 *downstream_fragment = sink.new_fragment.clone();
2057 }
2058 }
2059 assert!(remaining_fragment.is_empty());
2060 }
2061
2062 let complete_graph = match &job_type {
2064 StreamingJobType::Table(TableJobType::General) | StreamingJobType::Source => {
2065 CompleteStreamFragmentGraph::with_downstreams(
2066 fragment_graph,
2067 FragmentGraphDownstreamContext {
2068 original_root_fragment_id: original_root_fragment.fragment_id,
2069 downstream_fragments,
2070 },
2071 job_type,
2072 )?
2073 }
2074 StreamingJobType::Table(TableJobType::SharedCdcSource)
2075 | StreamingJobType::MaterializedView => {
2076 let upstream_root_fragments = self
2078 .metadata_manager
2079 .get_upstream_root_fragments(fragment_graph.dependent_table_ids())
2080 .await?;
2081
2082 CompleteStreamFragmentGraph::with_upstreams_and_downstreams(
2083 fragment_graph,
2084 FragmentGraphUpstreamContext {
2085 upstream_root_fragments,
2086 },
2087 FragmentGraphDownstreamContext {
2088 original_root_fragment_id: original_root_fragment.fragment_id,
2089 downstream_fragments,
2090 },
2091 job_type,
2092 )?
2093 }
2094 _ => unreachable!(),
2095 };
2096
2097 let resource_group = self
2098 .metadata_manager
2099 .get_database_resource_group(stream_job.database_id())
2100 .await?;
2101
2102 let actor_graph_builder = ActorGraphBuilder::new(complete_graph)?;
2103
2104 let ActorGraphBuildResult {
2105 graph,
2106 downstream_fragment_relations,
2107 upstream_fragment_downstreams,
2108 mut replace_upstream,
2109 } = actor_graph_builder.generate_graph()?;
2110
2111 if matches!(
2113 job_type,
2114 StreamingJobType::Source | StreamingJobType::Table(TableJobType::General)
2115 ) {
2116 assert!(upstream_fragment_downstreams.is_empty());
2117 }
2118
2119 let stream_job_fragments =
2123 StreamJobFragments::new(tmp_job_id, graph, stream_ctx, old_fragments.max_parallelism);
2124
2125 if let Some(sinks) = &auto_refresh_schema_sinks {
2126 for sink in sinks {
2127 replace_upstream
2128 .remove(&sink.new_fragment.fragment_id)
2129 .expect("should exist");
2130 }
2131 }
2132
2133 let ctx = ReplaceStreamJobContext {
2137 old_fragments,
2138 replace_upstream,
2139 upstream_fragment_downstreams,
2140 streaming_job: stream_job.clone(),
2141 database_resource_group: resource_group,
2142 tmp_id: tmp_job_id,
2143 drop_table_connector_ctx,
2144 auto_refresh_schema_sinks,
2145 streaming_job_model,
2146 };
2147
2148 Ok((
2149 ctx,
2150 StreamJobFragmentsToCreate {
2151 inner: stream_job_fragments,
2152 downstreams: downstream_fragment_relations,
2153 },
2154 ))
2155 }
2156
2157 async fn alter_name(
2158 &self,
2159 relation: alter_name_request::Object,
2160 new_name: &str,
2161 ) -> MetaResult<NotificationVersion> {
2162 let (obj_type, id): (ObjectType, ObjectId) = match relation {
2163 alter_name_request::Object::TableId(id) => (ObjectType::Table, id.into()),
2164 alter_name_request::Object::ViewId(id) => (ObjectType::View, id.into()),
2165 alter_name_request::Object::IndexId(id) => (ObjectType::Index, id.into()),
2166 alter_name_request::Object::SinkId(id) => (ObjectType::Sink, id.into()),
2167 alter_name_request::Object::SourceId(id) => (ObjectType::Source, id.into()),
2168 alter_name_request::Object::SchemaId(id) => (ObjectType::Schema, id.into()),
2169 alter_name_request::Object::DatabaseId(id) => (ObjectType::Database, id.into()),
2170 alter_name_request::Object::SubscriptionId(id) => (ObjectType::Subscription, id.into()),
2171 };
2172 self.metadata_manager
2173 .catalog_controller
2174 .alter_name(obj_type, id, new_name)
2175 .await
2176 }
2177
2178 async fn alter_swap_rename(
2179 &self,
2180 object: alter_swap_rename_request::Object,
2181 ) -> MetaResult<NotificationVersion> {
2182 let (obj_type, src_id, dst_id) = match object {
2183 alter_swap_rename_request::Object::Schema(_) => unimplemented!("schema swap"),
2184 alter_swap_rename_request::Object::Table(objs) => {
2185 let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
2186 (ObjectType::Table, src_id, dst_id)
2187 }
2188 alter_swap_rename_request::Object::View(objs) => {
2189 let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
2190 (ObjectType::View, src_id, dst_id)
2191 }
2192 alter_swap_rename_request::Object::Source(objs) => {
2193 let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
2194 (ObjectType::Source, src_id, dst_id)
2195 }
2196 alter_swap_rename_request::Object::Sink(objs) => {
2197 let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
2198 (ObjectType::Sink, src_id, dst_id)
2199 }
2200 alter_swap_rename_request::Object::Subscription(objs) => {
2201 let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
2202 (ObjectType::Subscription, src_id, dst_id)
2203 }
2204 };
2205
2206 self.metadata_manager
2207 .catalog_controller
2208 .alter_swap_rename(obj_type, src_id, dst_id)
2209 .await
2210 }
2211
2212 async fn alter_owner(
2213 &self,
2214 object: Object,
2215 owner_id: UserId,
2216 ) -> MetaResult<NotificationVersion> {
2217 let (obj_type, id): (ObjectType, ObjectId) = match object {
2218 Object::TableId(id) => (ObjectType::Table, id.into()),
2219 Object::ViewId(id) => (ObjectType::View, id.into()),
2220 Object::SourceId(id) => (ObjectType::Source, id.into()),
2221 Object::SinkId(id) => (ObjectType::Sink, id.into()),
2222 Object::SchemaId(id) => (ObjectType::Schema, id.into()),
2223 Object::DatabaseId(id) => (ObjectType::Database, id.into()),
2224 Object::SubscriptionId(id) => (ObjectType::Subscription, id.into()),
2225 Object::ConnectionId(id) => (ObjectType::Connection, id.into()),
2226 Object::FunctionId(id) => (ObjectType::Function, id.into()),
2227 Object::SecretId(id) => (ObjectType::Secret, id.into()),
2228 };
2229 self.metadata_manager
2230 .catalog_controller
2231 .alter_owner(obj_type, id, owner_id as _)
2232 .await
2233 }
2234
2235 async fn alter_set_schema(
2236 &self,
2237 object: alter_set_schema_request::Object,
2238 new_schema_id: SchemaId,
2239 ) -> MetaResult<NotificationVersion> {
2240 let (obj_type, id): (ObjectType, ObjectId) = match object {
2241 alter_set_schema_request::Object::TableId(id) => (ObjectType::Table, id.into()),
2242 alter_set_schema_request::Object::ViewId(id) => (ObjectType::View, id.into()),
2243 alter_set_schema_request::Object::SourceId(id) => (ObjectType::Source, id.into()),
2244 alter_set_schema_request::Object::SinkId(id) => (ObjectType::Sink, id.into()),
2245 alter_set_schema_request::Object::FunctionId(id) => (ObjectType::Function, id.into()),
2246 alter_set_schema_request::Object::ConnectionId(id) => {
2247 (ObjectType::Connection, id.into())
2248 }
2249 alter_set_schema_request::Object::SubscriptionId(id) => {
2250 (ObjectType::Subscription, id.into())
2251 }
2252 };
2253 self.metadata_manager
2254 .catalog_controller
2255 .alter_schema(obj_type, id, new_schema_id)
2256 .await
2257 }
2258
2259 pub async fn wait(&self, job_id: Option<JobId>) -> MetaResult<WaitVersion> {
2260 if let Some(job_id) = job_id {
2261 let database_id = self
2262 .metadata_manager
2263 .catalog_controller
2264 .get_object_database_id(job_id)
2265 .await?;
2266 let catalog_version = self
2267 .metadata_manager
2268 .wait_streaming_job_finished(database_id, job_id)
2269 .await?;
2270 let hummock_version_id = self.barrier_manager.get_hummock_version_id().await;
2271 return Ok(WaitVersion {
2272 catalog_version,
2273 hummock_version_id,
2274 });
2275 }
2276
2277 let timeout_ms = 2 * 60 * 60 * 1000;
2278 let poll_interval = Duration::from_millis(100);
2279 for _ in 0..(timeout_ms / poll_interval.as_millis() as usize) {
2280 let background_jobs = self
2281 .metadata_manager
2282 .catalog_controller
2283 .list_background_creating_jobs(true, None)
2284 .await?;
2285 if background_jobs.is_empty() {
2286 let catalog_version = self
2287 .metadata_manager
2288 .catalog_controller
2289 .notify_frontend_trivial()
2290 .await;
2291 let hummock_version_id = self.barrier_manager.get_hummock_version_id().await;
2292 return Ok(WaitVersion {
2293 catalog_version,
2294 hummock_version_id,
2295 });
2296 }
2297
2298 sleep(poll_interval).await;
2299 }
2300 Err(MetaError::cancelled(format!(
2301 "timeout after {timeout_ms}ms"
2302 )))
2303 }
2304
2305 async fn comment_on(&self, comment: Comment) -> MetaResult<NotificationVersion> {
2306 self.metadata_manager
2307 .catalog_controller
2308 .comment_on(comment)
2309 .await
2310 }
2311
2312 async fn alter_streaming_job_config(
2313 &self,
2314 job_id: JobId,
2315 entries_to_add: HashMap<String, String>,
2316 keys_to_remove: Vec<String>,
2317 ) -> MetaResult<NotificationVersion> {
2318 self.metadata_manager
2319 .catalog_controller
2320 .alter_streaming_job_config(job_id, entries_to_add, keys_to_remove)
2321 .await
2322 }
2323}
2324
2325fn report_create_object(
2326 job_id: JobId,
2327 event_name: &str,
2328 obj_type: PbTelemetryDatabaseObject,
2329 connector_name: Option<String>,
2330 attr_info: Option<jsonbb::Value>,
2331) {
2332 report_event(
2333 PbTelemetryEventStage::CreateStreamJob,
2334 event_name,
2335 job_id.as_raw_id() as _,
2336 connector_name,
2337 Some(obj_type),
2338 attr_info,
2339 );
2340}
2341
2342pub fn build_upstream_sink_info(
2343 sink_id: SinkId,
2344 original_target_columns: Vec<PbColumnCatalog>,
2345 sink_fragment_id: FragmentId,
2346 target_table: &PbTable,
2347 target_fragment_id: FragmentId,
2348) -> MetaResult<UpstreamSinkInfo> {
2349 let sink_columns = if !original_target_columns.is_empty() {
2350 original_target_columns.clone()
2351 } else {
2352 target_table.columns.clone()
2357 };
2358
2359 let sink_output_fields = sink_columns
2360 .iter()
2361 .map(|col| Field::from(col.column_desc.as_ref().unwrap()).to_prost())
2362 .collect_vec();
2363 let output_indices = (0..sink_output_fields.len())
2364 .map(|i| i as u32)
2365 .collect_vec();
2366
2367 let dist_key_indices: anyhow::Result<Vec<u32>> = try {
2368 let sink_idx_by_col_id = sink_columns
2369 .iter()
2370 .enumerate()
2371 .map(|(idx, col)| {
2372 let column_id = col.column_desc.as_ref().unwrap().column_id;
2373 (column_id, idx as u32)
2374 })
2375 .collect::<HashMap<_, _>>();
2376 target_table
2377 .distribution_key
2378 .iter()
2379 .map(|dist_idx| {
2380 let column_id = target_table.columns[*dist_idx as usize]
2381 .column_desc
2382 .as_ref()
2383 .unwrap()
2384 .column_id;
2385 let sink_idx = sink_idx_by_col_id
2386 .get(&column_id)
2387 .ok_or_else(|| anyhow::anyhow!("column id {} not found in sink", column_id))?;
2388 Ok(*sink_idx)
2389 })
2390 .collect::<anyhow::Result<Vec<_>>>()?
2391 };
2392 let dist_key_indices =
2393 dist_key_indices.map_err(|e| e.context("failed to get distribution key indices"))?;
2394 let downstream_fragment_id = target_fragment_id as _;
2395 let new_downstream_relation = DownstreamFragmentRelation {
2396 downstream_fragment_id,
2397 dispatcher_type: DispatcherType::Hash,
2398 dist_key_indices,
2399 output_mapping: PbDispatchOutputMapping::simple(output_indices),
2400 };
2401 let current_target_columns = target_table.get_columns();
2402 let project_exprs = build_select_node_list(&sink_columns, current_target_columns)?;
2403 Ok(UpstreamSinkInfo {
2404 sink_id,
2405 sink_fragment_id: sink_fragment_id as _,
2406 sink_output_fields,
2407 sink_original_target_columns: original_target_columns,
2408 project_exprs,
2409 new_sink_downstream: new_downstream_relation,
2410 })
2411}
2412
2413pub fn refill_upstream_sink_union_in_table(
2414 union_fragment_root: &mut PbStreamNode,
2415 upstream_sink_infos: &Vec<UpstreamSinkInfo>,
2416) {
2417 visit_stream_node_cont_mut(union_fragment_root, |node| {
2418 if let Some(NodeBody::UpstreamSinkUnion(upstream_sink_union)) = &mut node.node_body {
2419 let init_upstreams = upstream_sink_infos
2420 .iter()
2421 .map(|info| PbUpstreamSinkInfo {
2422 upstream_fragment_id: info.sink_fragment_id,
2423 sink_output_schema: info.sink_output_fields.clone(),
2424 project_exprs: info.project_exprs.clone(),
2425 })
2426 .collect();
2427 upstream_sink_union.init_upstreams = init_upstreams;
2428 false
2429 } else {
2430 true
2431 }
2432 });
2433}
2434
#[cfg(test)]
mod tests {
    use std::num::NonZeroUsize;

    use super::*;

    /// Shorthand for building a `NonZeroUsize` from a literal known to be non-zero.
    fn nz(value: usize) -> NonZeroUsize {
        NonZeroUsize::new(value).unwrap()
    }

    /// Both values are at or below the maximum of 8, so validation must succeed.
    #[test]
    fn test_validate_specified_parallelism_accepts_within_max() {
        let result =
            DdlController::validate_specified_parallelism(Some(nz(4)), Some(nz(8)), nz(8));
        result.unwrap();
    }

    /// A parallelism of 9 against a maximum of 8 must yield `InvalidParameter`.
    #[test]
    fn test_validate_specified_parallelism_rejects_parallelism_over_max() {
        match DdlController::validate_specified_parallelism(Some(nz(9)), None, nz(8)) {
            Err(e) => assert!(matches!(e.inner(), MetaErrorInner::InvalidParameter(_))),
            Ok(_) => panic!("expected an InvalidParameter error"),
        }
    }

    /// A backfill parallelism of 9 against a maximum of 8 must yield `InvalidParameter`.
    #[test]
    fn test_validate_specified_parallelism_rejects_backfill_parallelism_over_max() {
        match DdlController::validate_specified_parallelism(None, Some(nz(9)), nz(8)) {
            Err(e) => assert!(matches!(e.inner(), MetaErrorInner::InvalidParameter(_))),
            Ok(_) => panic!("expected an InvalidParameter error"),
        }
    }
}