use std::cmp::Ordering;
use std::collections::{HashMap, HashSet};
use std::num::NonZeroUsize;
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use std::time::Duration;

use anyhow::{Context, anyhow};
use await_tree::InstrumentAwait;
use either::Either;
use itertools::Itertools;
use risingwave_common::catalog::{
    AlterDatabaseParam, ColumnCatalog, ColumnId, Field, FragmentTypeFlag,
};
use risingwave_common::config::DefaultParallelism;
use risingwave_common::hash::VnodeCountCompat;
use risingwave_common::id::{JobId, TableId};
use risingwave_common::secret::{LocalSecretManager, SecretEncryption};
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont_mut;
use risingwave_common::{bail, bail_not_implemented};
use risingwave_connector::WithOptionsSecResolved;
use risingwave_connector::connector_common::validate_connection;
use risingwave_connector::sink::SinkParam;
use risingwave_connector::sink::iceberg::IcebergSink;
use risingwave_connector::source::cdc::CdcScanOptions;
use risingwave_connector::source::{
    ConnectorProperties, SourceEnumeratorContext, UPSTREAM_SOURCE_KEY,
};
use risingwave_meta_model::object::ObjectType;
use risingwave_meta_model::{
    ConnectionId, DatabaseId, DispatcherType, FragmentId, FunctionId, IndexId, JobStatus, ObjectId,
    SchemaId, SecretId, SinkId, SourceId, StreamingParallelism, SubscriptionId, UserId, ViewId,
};
use risingwave_pb::catalog::{
    Comment, Connection, CreateType, Database, Function, PbTable, Schema, Secret, Source,
    Subscription, Table, View,
};
use risingwave_pb::common::PbActorLocation;
use risingwave_pb::ddl_service::alter_owner_request::Object;
use risingwave_pb::ddl_service::{
    DdlProgress, TableJobType, WaitVersion, alter_name_request, alter_set_schema_request,
    alter_swap_rename_request, streaming_job_resource_type,
};
use risingwave_pb::meta::table_fragments::PbActorStatus;
use risingwave_pb::plan_common::PbColumnCatalog;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{
    PbDispatchOutputMapping, PbStreamFragmentGraph, PbStreamNode, PbUpstreamSinkInfo,
    StreamFragmentGraph as StreamFragmentGraphProto,
};
use risingwave_pb::telemetry::{PbTelemetryDatabaseObject, PbTelemetryEventStage};
use strum::Display;
use thiserror_ext::AsReport;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use tokio::time::sleep;
use tracing::Instrument;

use crate::barrier::{BarrierManagerRef, Command};
use crate::controller::catalog::{DropTableConnectorContext, ReleaseContext};
use crate::controller::cluster::StreamingClusterInfo;
use crate::controller::streaming_job::{FinishAutoRefreshSchemaSinkContext, SinkIntoTableContext};
use crate::controller::utils::build_select_node_list;
use crate::error::{MetaErrorInner, bail_invalid_parameter, bail_unavailable};
use crate::manager::sink_coordination::SinkCoordinatorManager;
use crate::manager::{
    IGNORED_NOTIFICATION_VERSION, LocalNotification, MetaSrvEnv, MetadataManager,
    NotificationVersion, StreamingJob, StreamingJobType,
};
use crate::model::{
    DownstreamFragmentRelation, Fragment, FragmentDownstreamRelation,
    FragmentId as CatalogFragmentId, StreamContext, StreamJobFragments, StreamJobFragmentsToCreate,
    TableParallelism,
};
use crate::stream::cdc::{
    parallel_cdc_table_backfill_fragment, try_init_parallel_cdc_table_snapshot_splits,
};
use crate::stream::{
    ActorGraphBuildResult, ActorGraphBuilder, AutoRefreshSchemaSinkContext,
    CompleteStreamFragmentGraph, CreateStreamingJobContext, CreateStreamingJobOption,
    FragmentGraphDownstreamContext, FragmentGraphUpstreamContext, GlobalStreamManagerRef,
    ReplaceStreamJobContext, ReschedulePolicy, SourceChange, SourceManagerRef, StreamFragmentGraph,
    UpstreamSinkInfo, check_sink_fragments_support_refresh_schema, create_source_worker,
    rewrite_refresh_schema_sink_fragment, state_match, validate_sink,
};
use crate::telemetry::report_event;
use crate::{MetaError, MetaResult};
102
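/// Behavior when dropping an object that still has dependents: `Restrict` fails the request,
/// while `Cascade` drops the dependent objects as well.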
103#[derive(PartialEq)]
104pub enum DropMode {
105 Restrict,
106 Cascade,
107}
108
109impl DropMode {
110 pub fn from_request_setting(cascade: bool) -> DropMode {
111 if cascade {
112 DropMode::Cascade
113 } else {
114 DropMode::Restrict
115 }
116 }
117}
118
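/// Identifies the streaming job targeted by a [`DdlCommand::DropStreamingJob`] request, tagged
/// with the kind of catalog object it refers to.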
119#[derive(strum::AsRefStr)]
120pub enum StreamingJobId {
121 MaterializedView(TableId),
122 Sink(SinkId),
123 Table(Option<SourceId>, TableId),
124 Index(IndexId),
125}
126
127impl std::fmt::Display for StreamingJobId {
128 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
129 write!(f, "{}", self.as_ref())?;
130 write!(f, "({})", self.id())
131 }
132}
133
134impl StreamingJobId {
135 fn id(&self) -> JobId {
136 match self {
137 StreamingJobId::MaterializedView(id) | StreamingJobId::Table(_, id) => id.as_job_id(),
138 StreamingJobId::Index(id) => id.as_job_id(),
139 StreamingJobId::Sink(id) => id.as_job_id(),
140 }
141 }
142}
143
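/// The streaming job to be replaced, together with the new fragment graph that replaces it.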
144pub struct ReplaceStreamJobInfo {
147 pub streaming_job: StreamingJob,
148 pub fragment_graph: StreamFragmentGraphProto,
149}
150
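/// A DDL command handled by [`DdlController::run_command`].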
151#[derive(Display)]
152pub enum DdlCommand {
153 CreateDatabase(Database),
154 DropDatabase(DatabaseId),
155 CreateSchema(Schema),
156 DropSchema(SchemaId, DropMode),
157 CreateNonSharedSource(Source),
158 DropSource(SourceId, DropMode),
159 ResetSource(SourceId),
160 CreateFunction(Function),
161 DropFunction(FunctionId, DropMode),
162 CreateView(View, HashSet<ObjectId>),
163 DropView(ViewId, DropMode),
164 CreateStreamingJob {
165 stream_job: StreamingJob,
166 fragment_graph: StreamFragmentGraphProto,
167 dependencies: HashSet<ObjectId>,
168 resource_type: streaming_job_resource_type::ResourceType,
169 if_not_exists: bool,
170 },
171 DropStreamingJob {
172 job_id: StreamingJobId,
173 drop_mode: DropMode,
174 },
175 AlterName(alter_name_request::Object, String),
176 AlterSwapRename(alter_swap_rename_request::Object),
177 ReplaceStreamJob(ReplaceStreamJobInfo),
178 AlterNonSharedSource(Source),
179 AlterObjectOwner(Object, UserId),
180 AlterSetSchema(alter_set_schema_request::Object, SchemaId),
181 CreateConnection(Connection),
182 DropConnection(ConnectionId, DropMode),
183 CreateSecret(Secret),
184 AlterSecret(Secret),
185 DropSecret(SecretId),
186 CommentOn(Comment),
187 CreateSubscription(Subscription),
188 DropSubscription(SubscriptionId, DropMode),
189 AlterDatabaseParam(DatabaseId, AlterDatabaseParam),
190 AlterStreamingJobConfig(JobId, HashMap<String, String>, Vec<String>),
191}
192
193impl DdlCommand {
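    /// Returns the object's name (for create commands) or id (for drop/alter commands), used to
    /// label the await-tree span of the command.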
194 fn object(&self) -> Either<String, ObjectId> {
196 use Either::*;
197 match self {
198 DdlCommand::CreateDatabase(database) => Left(database.name.clone()),
199 DdlCommand::DropDatabase(id) => Right(id.as_object_id()),
200 DdlCommand::CreateSchema(schema) => Left(schema.name.clone()),
201 DdlCommand::DropSchema(id, _) => Right(id.as_object_id()),
202 DdlCommand::CreateNonSharedSource(source) => Left(source.name.clone()),
203 DdlCommand::DropSource(id, _) => Right(id.as_object_id()),
204 DdlCommand::ResetSource(id) => Right(id.as_object_id()),
205 DdlCommand::CreateFunction(function) => Left(function.name.clone()),
206 DdlCommand::DropFunction(id, _) => Right(id.as_object_id()),
207 DdlCommand::CreateView(view, _) => Left(view.name.clone()),
208 DdlCommand::DropView(id, _) => Right(id.as_object_id()),
209 DdlCommand::CreateStreamingJob { stream_job, .. } => Left(stream_job.name()),
210 DdlCommand::DropStreamingJob { job_id, .. } => Right(job_id.id().as_object_id()),
211 DdlCommand::AlterName(object, _) => Left(format!("{object:?}")),
212 DdlCommand::AlterSwapRename(object) => Left(format!("{object:?}")),
213 DdlCommand::ReplaceStreamJob(info) => Left(info.streaming_job.name()),
214 DdlCommand::AlterNonSharedSource(source) => Left(source.name.clone()),
215 DdlCommand::AlterObjectOwner(object, _) => Left(format!("{object:?}")),
216 DdlCommand::AlterSetSchema(object, _) => Left(format!("{object:?}")),
217 DdlCommand::CreateConnection(connection) => Left(connection.name.clone()),
218 DdlCommand::DropConnection(id, _) => Right(id.as_object_id()),
219 DdlCommand::CreateSecret(secret) => Left(secret.name.clone()),
220 DdlCommand::AlterSecret(secret) => Left(secret.name.clone()),
221 DdlCommand::DropSecret(id) => Right(id.as_object_id()),
222 DdlCommand::CommentOn(comment) => Right(comment.table_id.into()),
223 DdlCommand::CreateSubscription(subscription) => Left(subscription.name.clone()),
224 DdlCommand::DropSubscription(id, _) => Right(id.as_object_id()),
225 DdlCommand::AlterDatabaseParam(id, _) => Right(id.as_object_id()),
226 DdlCommand::AlterStreamingJobConfig(job_id, _, _) => Right(job_id.as_object_id()),
227 }
228 }
229
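    /// Whether the command may run while the cluster is still recovering. Commands that must
    /// interact with the running stream graph (e.g. creating or replacing streaming jobs) are
    /// rejected until recovery completes.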
230 fn allow_in_recovery(&self) -> bool {
231 match self {
232 DdlCommand::DropDatabase(_)
233 | DdlCommand::DropSchema(_, _)
234 | DdlCommand::DropSource(_, _)
235 | DdlCommand::DropFunction(_, _)
236 | DdlCommand::DropView(_, _)
237 | DdlCommand::DropStreamingJob { .. }
238 | DdlCommand::DropConnection(_, _)
239 | DdlCommand::DropSecret(_)
240 | DdlCommand::DropSubscription(_, _)
241 | DdlCommand::AlterName(_, _)
242 | DdlCommand::AlterObjectOwner(_, _)
243 | DdlCommand::AlterSetSchema(_, _)
244 | DdlCommand::CreateDatabase(_)
245 | DdlCommand::CreateSchema(_)
246 | DdlCommand::CreateFunction(_)
247 | DdlCommand::CreateView(_, _)
248 | DdlCommand::CreateConnection(_)
249 | DdlCommand::CommentOn(_)
250 | DdlCommand::CreateSecret(_)
251 | DdlCommand::AlterSecret(_)
252 | DdlCommand::AlterSwapRename(_)
253 | DdlCommand::AlterDatabaseParam(_, _)
254 | DdlCommand::AlterStreamingJobConfig(_, _, _) => true,
255 DdlCommand::CreateStreamingJob { .. }
256 | DdlCommand::CreateNonSharedSource(_)
257 | DdlCommand::ReplaceStreamJob(_)
258 | DdlCommand::AlterNonSharedSource(_)
259 | DdlCommand::ResetSource(_)
260 | DdlCommand::CreateSubscription(_) => false,
261 }
262 }
263}
264
265#[derive(Clone)]
266pub struct DdlController {
267 pub(crate) env: MetaSrvEnv,
268
269 pub(crate) metadata_manager: MetadataManager,
270 pub(crate) stream_manager: GlobalStreamManagerRef,
271 pub(crate) source_manager: SourceManagerRef,
272 barrier_manager: BarrierManagerRef,
273 sink_manager: SinkCoordinatorManager,
274
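    /// Limits how many streaming jobs may be in the creating state at the same time; see
    /// [`CreatingStreamingJobPermit`].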
275 pub(crate) creating_streaming_job_permits: Arc<CreatingStreamingJobPermit>,
277
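    /// Monotonically increasing sequence number used to build unique await-tree keys for DDL commands.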
278 seq: Arc<AtomicU64>,
280}
281
282#[derive(Clone)]
283pub struct CreatingStreamingJobPermit {
284 pub(crate) semaphore: Arc<Semaphore>,
285}
286
287impl CreatingStreamingJobPermit {
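    /// Initializes the semaphore from the `max_concurrent_creating_streaming_jobs` system
    /// parameter (0 is treated as unlimited) and spawns a task that resizes it whenever the
    /// parameter changes.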
288 async fn new(env: &MetaSrvEnv) -> Self {
289 let mut permits = env
290 .system_params_reader()
291 .await
292 .max_concurrent_creating_streaming_jobs() as usize;
293 if permits == 0 {
294 permits = Semaphore::MAX_PERMITS;
296 }
297 let semaphore = Arc::new(Semaphore::new(permits));
298
299 let (local_notification_tx, mut local_notification_rx) =
300 tokio::sync::mpsc::unbounded_channel();
301 env.notification_manager()
302 .insert_local_sender(local_notification_tx);
303 let semaphore_clone = semaphore.clone();
304 tokio::spawn(async move {
305 while let Some(notification) = local_notification_rx.recv().await {
306 let LocalNotification::SystemParamsChange(p) = ¬ification else {
307 continue;
308 };
309 let mut new_permits = p.max_concurrent_creating_streaming_jobs() as usize;
310 if new_permits == 0 {
311 new_permits = Semaphore::MAX_PERMITS;
312 }
313 match permits.cmp(&new_permits) {
314 Ordering::Less => {
315 semaphore_clone.add_permits(new_permits - permits);
316 }
317 Ordering::Equal => continue,
318 Ordering::Greater => {
319 let to_release = permits - new_permits;
320 let reduced = semaphore_clone.forget_permits(to_release);
321 if reduced != to_release {
                            tracing::warn!(
                                "not enough permits to release, expected {}, but reduced {}",
                                to_release,
                                reduced
                            );
328 }
329 }
330 }
331 tracing::info!(
332 "max_concurrent_creating_streaming_jobs changed from {} to {}",
333 permits,
334 new_permits
335 );
336 permits = new_permits;
337 }
338 });
339
340 Self { semaphore }
341 }
342}
343
344impl DdlController {
345 pub async fn new(
346 env: MetaSrvEnv,
347 metadata_manager: MetadataManager,
348 stream_manager: GlobalStreamManagerRef,
349 source_manager: SourceManagerRef,
350 barrier_manager: BarrierManagerRef,
351 sink_manager: SinkCoordinatorManager,
352 ) -> Self {
353 let creating_streaming_job_permits = Arc::new(CreatingStreamingJobPermit::new(&env).await);
354 Self {
355 env,
356 metadata_manager,
357 stream_manager,
358 source_manager,
359 barrier_manager,
360 sink_manager,
361 creating_streaming_job_permits,
362 seq: Arc::new(AtomicU64::new(0)),
363 }
364 }
365
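    /// Returns the next command sequence number, used to key the command in the await-tree registry.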
366 pub fn next_seq(&self) -> u64 {
368 self.seq.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
370 }
371
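    /// Runs a DDL command to completion on a dedicated tokio task.
    ///
    /// Commands that are not allowed during recovery are rejected while the barrier manager is
    /// not running. On success, returns the catalog notification version and hummock version the
    /// frontend should wait for before observing the result.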
372 pub async fn run_command(&self, command: DdlCommand) -> MetaResult<Option<WaitVersion>> {
379 if !command.allow_in_recovery() {
380 self.barrier_manager.check_status_running()?;
381 }
382
383 let await_tree_key = format!("DDL Command {}", self.next_seq());
384 let await_tree_span = await_tree::span!("{command}({})", command.object());
385
386 let ctrl = self.clone();
387 let fut = async move {
388 match command {
389 DdlCommand::CreateDatabase(database) => ctrl.create_database(database).await,
390 DdlCommand::DropDatabase(database_id) => ctrl.drop_database(database_id).await,
391 DdlCommand::CreateSchema(schema) => ctrl.create_schema(schema).await,
392 DdlCommand::DropSchema(schema_id, drop_mode) => {
393 ctrl.drop_schema(schema_id, drop_mode).await
394 }
395 DdlCommand::CreateNonSharedSource(source) => {
396 ctrl.create_non_shared_source(source).await
397 }
398 DdlCommand::DropSource(source_id, drop_mode) => {
399 ctrl.drop_source(source_id, drop_mode).await
400 }
401 DdlCommand::ResetSource(source_id) => ctrl.reset_source(source_id).await,
402 DdlCommand::CreateFunction(function) => ctrl.create_function(function).await,
403 DdlCommand::DropFunction(function_id, drop_mode) => {
404 ctrl.drop_function(function_id, drop_mode).await
405 }
406 DdlCommand::CreateView(view, dependencies) => {
407 ctrl.create_view(view, dependencies).await
408 }
409 DdlCommand::DropView(view_id, drop_mode) => {
410 ctrl.drop_view(view_id, drop_mode).await
411 }
412 DdlCommand::CreateStreamingJob {
413 stream_job,
414 fragment_graph,
415 dependencies,
416 resource_type,
417 if_not_exists,
418 } => {
419 ctrl.create_streaming_job(
420 stream_job,
421 fragment_graph,
422 dependencies,
423 resource_type,
424 if_not_exists,
425 )
426 .await
427 }
428 DdlCommand::DropStreamingJob { job_id, drop_mode } => {
429 ctrl.drop_streaming_job(job_id, drop_mode).await
430 }
431 DdlCommand::ReplaceStreamJob(ReplaceStreamJobInfo {
432 streaming_job,
433 fragment_graph,
434 }) => ctrl.replace_job(streaming_job, fragment_graph).await,
435 DdlCommand::AlterName(relation, name) => ctrl.alter_name(relation, &name).await,
436 DdlCommand::AlterObjectOwner(object, owner_id) => {
437 ctrl.alter_owner(object, owner_id).await
438 }
439 DdlCommand::AlterSetSchema(object, new_schema_id) => {
440 ctrl.alter_set_schema(object, new_schema_id).await
441 }
442 DdlCommand::CreateConnection(connection) => {
443 ctrl.create_connection(connection).await
444 }
445 DdlCommand::DropConnection(connection_id, drop_mode) => {
446 ctrl.drop_connection(connection_id, drop_mode).await
447 }
448 DdlCommand::CreateSecret(secret) => ctrl.create_secret(secret).await,
449 DdlCommand::DropSecret(secret_id) => ctrl.drop_secret(secret_id).await,
450 DdlCommand::AlterSecret(secret) => ctrl.alter_secret(secret).await,
451 DdlCommand::AlterNonSharedSource(source) => {
452 ctrl.alter_non_shared_source(source).await
453 }
454 DdlCommand::CommentOn(comment) => ctrl.comment_on(comment).await,
455 DdlCommand::CreateSubscription(subscription) => {
456 ctrl.create_subscription(subscription).await
457 }
458 DdlCommand::DropSubscription(subscription_id, drop_mode) => {
459 ctrl.drop_subscription(subscription_id, drop_mode).await
460 }
461 DdlCommand::AlterSwapRename(objects) => ctrl.alter_swap_rename(objects).await,
462 DdlCommand::AlterDatabaseParam(database_id, param) => {
463 ctrl.alter_database_param(database_id, param).await
464 }
465 DdlCommand::AlterStreamingJobConfig(job_id, entries_to_add, keys_to_remove) => {
466 ctrl.alter_streaming_job_config(job_id, entries_to_add, keys_to_remove)
467 .await
468 }
469 }
470 }
471 .in_current_span();
472 let fut = (self.env.await_tree_reg())
473 .register(await_tree_key, await_tree_span)
474 .instrument(Box::pin(fut));
475 let notification_version = tokio::spawn(fut).await.map_err(|e| anyhow!(e))??;
476 Ok(Some(WaitVersion {
477 catalog_version: notification_version,
478 hummock_version_id: self.barrier_manager.get_hummock_version_id().await.to_u64(),
479 }))
480 }
481
482 pub async fn get_ddl_progress(&self) -> MetaResult<Vec<DdlProgress>> {
483 self.barrier_manager.get_ddl_progress().await
484 }
485
486 async fn create_database(&self, database: Database) -> MetaResult<NotificationVersion> {
487 let (version, updated_db) = self
488 .metadata_manager
489 .catalog_controller
490 .create_database(database)
491 .await?;
492 self.barrier_manager
494 .update_database_barrier(
495 updated_db.database_id,
496 updated_db.barrier_interval_ms.map(|v| v as u32),
497 updated_db.checkpoint_frequency.map(|v| v as u64),
498 )
499 .await?;
500 Ok(version)
501 }
502
503 #[tracing::instrument(skip(self), level = "debug")]
504 pub async fn reschedule_streaming_job(
505 &self,
506 job_id: JobId,
507 target: ReschedulePolicy,
508 mut deferred: bool,
509 ) -> MetaResult<()> {
510 tracing::info!("altering parallelism for job {}", job_id);
511 if self.barrier_manager.check_status_running().is_err() {
512 tracing::info!(
513 "alter parallelism is set to deferred mode because the system is in recovery state"
514 );
515 deferred = true;
516 }
517
518 self.stream_manager
519 .reschedule_streaming_job(job_id, target, deferred)
520 .await
521 }
522
523 pub async fn reschedule_cdc_table_backfill(
524 &self,
525 job_id: JobId,
526 target: ReschedulePolicy,
527 ) -> MetaResult<()> {
528 tracing::info!("alter CDC table backfill parallelism");
529 if self.barrier_manager.check_status_running().is_err() {
530 return Err(anyhow::anyhow!("CDC table backfill reschedule is unavailable because the system is in recovery state").into());
531 }
532 self.stream_manager
533 .reschedule_cdc_table_backfill(job_id, target)
534 .await
535 }
536
537 pub async fn reschedule_fragments(
538 &self,
539 fragment_targets: HashMap<FragmentId, Option<StreamingParallelism>>,
540 ) -> MetaResult<()> {
541 tracing::info!(
542 "altering parallelism for fragments {:?}",
543 fragment_targets.keys()
544 );
545 let fragment_targets = fragment_targets
546 .into_iter()
547 .map(|(fragment_id, parallelism)| (fragment_id as CatalogFragmentId, parallelism))
548 .collect();
549
550 self.stream_manager
551 .reschedule_fragments(fragment_targets)
552 .await
553 }
554
555 async fn drop_database(&self, database_id: DatabaseId) -> MetaResult<NotificationVersion> {
556 self.drop_object(ObjectType::Database, database_id, DropMode::Cascade)
557 .await
558 }
559
560 async fn create_schema(&self, schema: Schema) -> MetaResult<NotificationVersion> {
561 self.metadata_manager
562 .catalog_controller
563 .create_schema(schema)
564 .await
565 }
566
567 async fn drop_schema(
568 &self,
569 schema_id: SchemaId,
570 drop_mode: DropMode,
571 ) -> MetaResult<NotificationVersion> {
572 self.drop_object(ObjectType::Schema, schema_id, drop_mode)
573 .await
574 }
575
576 async fn create_non_shared_source(&self, source: Source) -> MetaResult<NotificationVersion> {
578 let handle = create_source_worker(&source, self.source_manager.metrics.clone())
579 .await
580 .context("failed to create source worker")?;
581
582 let (source_id, version) = self
583 .metadata_manager
584 .catalog_controller
585 .create_source(source)
586 .await?;
587 self.source_manager
588 .register_source_with_handle(source_id, handle)
589 .await;
590 Ok(version)
591 }
592
593 async fn drop_source(
594 &self,
595 source_id: SourceId,
596 drop_mode: DropMode,
597 ) -> MetaResult<NotificationVersion> {
598 self.drop_object(ObjectType::Source, source_id, drop_mode)
599 .await
600 }
601
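    /// Resets a CDC source's offset to the latest by sending a `ResetSource` barrier command to
    /// the database the source belongs to.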
602 async fn reset_source(&self, source_id: SourceId) -> MetaResult<NotificationVersion> {
603 tracing::info!(source_id = %source_id, "resetting CDC source offset to latest");
604
605 let database_id = self
607 .metadata_manager
608 .catalog_controller
609 .get_object_database_id(source_id)
610 .await?;
611
612 self.stream_manager
613 .barrier_scheduler
614 .run_command(database_id, Command::ResetSource { source_id })
615 .await?;
616
617 let version = self
619 .metadata_manager
620 .catalog_controller
621 .current_notification_version()
622 .await;
623 Ok(version)
624 }
625
626 async fn alter_non_shared_source(&self, source: Source) -> MetaResult<NotificationVersion> {
629 self.metadata_manager
630 .catalog_controller
631 .alter_non_shared_source(source)
632 .await
633 }
634
635 async fn create_function(&self, function: Function) -> MetaResult<NotificationVersion> {
636 self.metadata_manager
637 .catalog_controller
638 .create_function(function)
639 .await
640 }
641
642 async fn drop_function(
643 &self,
644 function_id: FunctionId,
645 drop_mode: DropMode,
646 ) -> MetaResult<NotificationVersion> {
647 self.drop_object(ObjectType::Function, function_id, drop_mode)
648 .await
649 }
650
651 async fn create_view(
652 &self,
653 view: View,
654 dependencies: HashSet<ObjectId>,
655 ) -> MetaResult<NotificationVersion> {
656 self.metadata_manager
657 .catalog_controller
658 .create_view(view, dependencies)
659 .await
660 }
661
662 async fn drop_view(
663 &self,
664 view_id: ViewId,
665 drop_mode: DropMode,
666 ) -> MetaResult<NotificationVersion> {
667 self.drop_object(ObjectType::View, view_id, drop_mode).await
668 }
669
670 async fn create_connection(&self, connection: Connection) -> MetaResult<NotificationVersion> {
671 validate_connection(&connection).await?;
672 self.metadata_manager
673 .catalog_controller
674 .create_connection(connection)
675 .await
676 }
677
678 async fn drop_connection(
679 &self,
680 connection_id: ConnectionId,
681 drop_mode: DropMode,
682 ) -> MetaResult<NotificationVersion> {
683 self.drop_object(ObjectType::Connection, connection_id, drop_mode)
684 .await
685 }
686
687 async fn alter_database_param(
688 &self,
689 database_id: DatabaseId,
690 param: AlterDatabaseParam,
691 ) -> MetaResult<NotificationVersion> {
692 let (version, updated_db) = self
693 .metadata_manager
694 .catalog_controller
695 .alter_database_param(database_id, param)
696 .await?;
697 self.barrier_manager
699 .update_database_barrier(
700 database_id,
701 updated_db.barrier_interval_ms.map(|v| v as u32),
702 updated_db.checkpoint_frequency.map(|v| v as u64),
703 )
704 .await?;
705 Ok(version)
706 }
707
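    /// Encrypts the secret's value with the configured `secret_store_private_key` and returns
    /// the serialized ciphertext.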
708 fn get_encrypted_payload(&self, secret: &Secret) -> MetaResult<Vec<u8>> {
711 let secret_store_private_key = self
712 .env
713 .opts
714 .secret_store_private_key
715 .clone()
716 .ok_or_else(|| anyhow!("secret_store_private_key is not configured"))?;
717
718 let encrypted_payload = SecretEncryption::encrypt(
719 secret_store_private_key.as_slice(),
720 secret.get_value().as_slice(),
721 )
722 .context(format!("failed to encrypt secret {}", secret.name))?;
723 Ok(encrypted_payload
724 .serialize()
725 .context(format!("failed to serialize secret {}", secret.name))?)
726 }
727
728 async fn create_secret(&self, mut secret: Secret) -> MetaResult<NotificationVersion> {
729 let secret_plain_payload = secret.value.clone();
732 let encrypted_payload = self.get_encrypted_payload(&secret)?;
733 secret.value = encrypted_payload;
734
735 self.metadata_manager
736 .catalog_controller
737 .create_secret(secret, secret_plain_payload)
738 .await
739 }
740
741 async fn drop_secret(&self, secret_id: SecretId) -> MetaResult<NotificationVersion> {
742 self.drop_object(ObjectType::Secret, secret_id, DropMode::Restrict)
743 .await
744 }
745
746 async fn alter_secret(&self, mut secret: Secret) -> MetaResult<NotificationVersion> {
747 let secret_plain_payload = secret.value.clone();
748 let encrypted_payload = self.get_encrypted_payload(&secret)?;
749 secret.value = encrypted_payload;
750 self.metadata_manager
751 .catalog_controller
752 .alter_secret(secret, secret_plain_payload)
753 .await
754 }
755
756 async fn create_subscription(
757 &self,
758 mut subscription: Subscription,
759 ) -> MetaResult<NotificationVersion> {
760 tracing::debug!("create subscription");
761 let _permit = self
762 .creating_streaming_job_permits
763 .semaphore
764 .acquire()
765 .await
766 .unwrap();
767 let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
768 self.metadata_manager
769 .catalog_controller
770 .create_subscription_catalog(&mut subscription)
771 .await?;
772 if let Err(err) = self.stream_manager.create_subscription(&subscription).await {
773 tracing::debug!(error = %err.as_report(), "failed to create subscription");
774 let _ = self
775 .metadata_manager
776 .catalog_controller
777 .try_abort_creating_subscription(subscription.id)
778 .await
779 .inspect_err(|e| {
780 tracing::error!(
781 error = %e.as_report(),
782 "failed to abort create subscription after failure"
783 );
784 });
785 return Err(err);
786 }
787
788 let version = self
789 .metadata_manager
790 .catalog_controller
791 .notify_create_subscription(subscription.id)
792 .await?;
793 tracing::debug!("finish create subscription");
794 Ok(version)
795 }
796
797 async fn drop_subscription(
798 &self,
799 subscription_id: SubscriptionId,
800 drop_mode: DropMode,
801 ) -> MetaResult<NotificationVersion> {
802 tracing::debug!("preparing drop subscription");
803 let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
804 let subscription = self
805 .metadata_manager
806 .catalog_controller
807 .get_subscription_by_id(subscription_id)
808 .await?;
809 let table_id = subscription.dependent_table_id;
810 let database_id = subscription.database_id;
811 let (_, version) = self
812 .metadata_manager
813 .catalog_controller
814 .drop_object(ObjectType::Subscription, subscription_id, drop_mode)
815 .await?;
816 self.stream_manager
817 .drop_subscription(database_id, subscription_id, table_id)
818 .await;
819 tracing::debug!("finish drop subscription");
820 Ok(version)
821 }
822
823 #[await_tree::instrument]
825 pub(crate) async fn validate_cdc_table(
826 &self,
827 table: &Table,
828 table_fragments: &StreamJobFragments,
829 ) -> MetaResult<()> {
830 let stream_scan_fragment = table_fragments
831 .fragments
832 .values()
833 .filter(|f| {
834 f.fragment_type_mask.contains(FragmentTypeFlag::StreamScan)
835 || f.fragment_type_mask
836 .contains(FragmentTypeFlag::StreamCdcScan)
837 })
838 .exactly_one()
839 .ok()
840 .with_context(|| {
841 format!(
842 "expect exactly one stream scan fragment, got: {:?}",
843 table_fragments.fragments
844 )
845 })?;
846 fn assert_parallelism(stream_scan_fragment: &Fragment, node_body: &Option<NodeBody>) {
847 if let Some(NodeBody::StreamCdcScan(node)) = node_body {
848 if let Some(o) = node.options
849 && CdcScanOptions::from_proto(&o).is_parallelized_backfill()
850 {
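                    // Parallelized CDC backfill may be scheduled with more than one actor, so no
                    // single-actor assertion is needed here.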
851 } else {
853 assert_eq!(
854 stream_scan_fragment.actors.len(),
855 1,
856 "Stream scan fragment should have only one actor"
857 );
858 }
859 }
860 }
861 let mut found_cdc_scan = false;
862 match &stream_scan_fragment.nodes.node_body {
863 Some(NodeBody::StreamCdcScan(_)) => {
864 assert_parallelism(stream_scan_fragment, &stream_scan_fragment.nodes.node_body);
865 if self
866 .validate_cdc_table_inner(&stream_scan_fragment.nodes.node_body, table.id)
867 .await?
868 {
869 found_cdc_scan = true;
870 }
871 }
872 Some(NodeBody::Project(_)) => {
874 for input in &stream_scan_fragment.nodes.input {
875 assert_parallelism(stream_scan_fragment, &input.node_body);
876 if self
877 .validate_cdc_table_inner(&input.node_body, table.id)
878 .await?
879 {
880 found_cdc_scan = true;
881 }
882 }
883 }
884 _ => {
885 bail!("Unexpected node body for stream cdc scan");
886 }
887 };
888 if !found_cdc_scan {
889 bail!("No stream cdc scan node found in stream scan fragment");
890 }
891 Ok(())
892 }
893
894 async fn validate_cdc_table_inner(
895 &self,
896 node_body: &Option<NodeBody>,
897 table_id: TableId,
898 ) -> MetaResult<bool> {
899 if let Some(NodeBody::StreamCdcScan(stream_cdc_scan)) = node_body
900 && let Some(ref cdc_table_desc) = stream_cdc_scan.cdc_table_desc
901 {
902 let options_with_secret = WithOptionsSecResolved::new(
903 cdc_table_desc.connect_properties.clone(),
904 cdc_table_desc.secret_refs.clone(),
905 );
906
907 let mut props = ConnectorProperties::extract(options_with_secret, true)?;
908 props.init_from_pb_cdc_table_desc(cdc_table_desc);
909
910 let _enumerator = props
912 .create_split_enumerator(SourceEnumeratorContext::dummy().into())
913 .await?;
914
915 tracing::debug!(?table_id, "validate cdc table success");
916 Ok(true)
917 } else {
918 Ok(false)
919 }
920 }
921
922 pub async fn validate_table_for_sink(&self, table_id: TableId) -> MetaResult<()> {
923 let migrated = self
924 .metadata_manager
925 .catalog_controller
926 .has_table_been_migrated(table_id)
927 .await?;
928 if !migrated {
929 Err(anyhow::anyhow!("Creating sink into table is not allowed for unmigrated table {}. Please migrate it first.", table_id).into())
930 } else {
931 Ok(())
932 }
933 }
934
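    /// Creates the catalog entries for a streaming job, builds its fragments and actors, and
    /// launches it through the stream manager. On failure, the creating job is aborted and any
    /// source registered for it is dropped again.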
935 #[await_tree::instrument(boxed, "create_streaming_job({streaming_job})")]
938 pub async fn create_streaming_job(
939 &self,
940 mut streaming_job: StreamingJob,
941 fragment_graph: StreamFragmentGraphProto,
942 dependencies: HashSet<ObjectId>,
943 resource_type: streaming_job_resource_type::ResourceType,
944 if_not_exists: bool,
945 ) -> MetaResult<NotificationVersion> {
946 if let StreamingJob::Sink(sink) = &streaming_job
947 && let Some(target_table) = sink.target_table
948 {
949 self.validate_table_for_sink(target_table).await?;
950 }
951 let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());
952 let check_ret = self
953 .metadata_manager
954 .catalog_controller
955 .create_job_catalog(
956 &mut streaming_job,
957 &ctx,
958 &fragment_graph.parallelism,
959 fragment_graph.max_parallelism as _,
960 dependencies,
961 resource_type.clone(),
962 &fragment_graph.backfill_parallelism,
963 )
964 .await;
965 if let Err(meta_err) = check_ret {
966 if !if_not_exists {
967 return Err(meta_err);
968 }
969 return if let MetaErrorInner::Duplicated(_, _, Some(job_id)) = meta_err.inner() {
970 if streaming_job.create_type() == CreateType::Foreground {
971 let database_id = streaming_job.database_id();
972 self.metadata_manager
973 .wait_streaming_job_finished(database_id, *job_id)
974 .await
975 } else {
976 Ok(IGNORED_NOTIFICATION_VERSION)
977 }
978 } else {
979 Err(meta_err)
980 };
981 }
982 let job_id = streaming_job.id();
983 tracing::debug!(
984 id = %job_id,
985 definition = streaming_job.definition(),
986 create_type = streaming_job.create_type().as_str_name(),
987 job_type = ?streaming_job.job_type(),
988 "starting streaming job",
989 );
990 let permit = self
992 .creating_streaming_job_permits
993 .semaphore
994 .clone()
995 .acquire_owned()
996 .instrument_await("acquire_creating_streaming_job_permit")
997 .await
998 .unwrap();
999 let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
1000
1001 let name = streaming_job.name();
1002 let definition = streaming_job.definition();
1003 let source_id = match &streaming_job {
1004 StreamingJob::Table(Some(src), _, _) | StreamingJob::Source(src) => Some(src.id),
1005 _ => None,
1006 };
1007
1008 match self
1010 .create_streaming_job_inner(ctx, streaming_job, fragment_graph, resource_type, permit)
1011 .await
1012 {
1013 Ok(version) => Ok(version),
1014 Err(err) => {
1015 tracing::error!(id = %job_id, error = %err.as_report(), "failed to create streaming job");
1016 let event = risingwave_pb::meta::event_log::EventCreateStreamJobFail {
1017 id: job_id,
1018 name,
1019 definition,
1020 error: err.as_report().to_string(),
1021 };
1022 self.env.event_log_manager_ref().add_event_logs(vec![
1023 risingwave_pb::meta::event_log::Event::CreateStreamJobFail(event),
1024 ]);
1025 let (aborted, _) = self
1026 .metadata_manager
1027 .catalog_controller
1028 .try_abort_creating_streaming_job(job_id, false)
1029 .await?;
1030 if aborted {
1031 tracing::warn!(id = %job_id, "aborted streaming job");
1032 if let Some(source_id) = source_id {
1034 self.source_manager
1035 .apply_source_change(SourceChange::DropSource {
1036 dropped_source_ids: vec![source_id],
1037 })
1038 .await;
1039 }
1040 }
1041 Err(err)
1042 }
1043 }
1044 }
1045
1046 #[await_tree::instrument(boxed)]
1047 async fn create_streaming_job_inner(
1048 &self,
1049 ctx: StreamContext,
1050 mut streaming_job: StreamingJob,
1051 fragment_graph: StreamFragmentGraphProto,
1052 resource_type: streaming_job_resource_type::ResourceType,
1053 permit: OwnedSemaphorePermit,
1054 ) -> MetaResult<NotificationVersion> {
1055 let mut fragment_graph =
1056 StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
1057 streaming_job.set_info_from_graph(&fragment_graph);
1058
1059 let incomplete_internal_tables = fragment_graph
1061 .incomplete_internal_tables()
1062 .into_values()
1063 .collect_vec();
1064 let table_id_map = self
1065 .metadata_manager
1066 .catalog_controller
1067 .create_internal_table_catalog(&streaming_job, incomplete_internal_tables)
1068 .await?;
1069 fragment_graph.refill_internal_table_ids(table_id_map);
1070
1071 tracing::debug!(id = %streaming_job.id(), "building streaming job");
1073 let (ctx, stream_job_fragments) = self
1074 .build_stream_job(ctx, streaming_job, fragment_graph, resource_type)
1075 .await?;
1076
1077 let streaming_job = &ctx.streaming_job;
1078
1079 match streaming_job {
1080 StreamingJob::Table(None, table, TableJobType::SharedCdcSource) => {
1081 self.validate_cdc_table(table, &stream_job_fragments)
1082 .await?;
1083 }
1084 StreamingJob::Table(Some(source), ..) => {
1085 self.source_manager.register_source(source).await?;
1087 let connector_name = source
1088 .get_with_properties()
1089 .get(UPSTREAM_SOURCE_KEY)
1090 .cloned();
1091 let attr = source.info.as_ref().map(|source_info| {
1092 jsonbb::json!({
1093 "format": source_info.format().as_str_name(),
1094 "encode": source_info.row_encode().as_str_name(),
1095 })
1096 });
1097 report_create_object(
1098 streaming_job.id(),
1099 "source",
1100 PbTelemetryDatabaseObject::Source,
1101 connector_name,
1102 attr,
1103 );
1104 }
1105 StreamingJob::Sink(sink) => {
1106 if sink.auto_refresh_schema_from_table.is_some() {
1107 check_sink_fragments_support_refresh_schema(&stream_job_fragments.fragments)?
1108 }
1109 validate_sink(sink).await?;
1111 let connector_name = sink.get_properties().get(UPSTREAM_SOURCE_KEY).cloned();
1112 let attr = sink.format_desc.as_ref().map(|sink_info| {
1113 jsonbb::json!({
1114 "format": sink_info.format().as_str_name(),
1115 "encode": sink_info.encode().as_str_name(),
1116 })
1117 });
1118 report_create_object(
1119 streaming_job.id(),
1120 "sink",
1121 PbTelemetryDatabaseObject::Sink,
1122 connector_name,
1123 attr,
1124 );
1125 }
1126 StreamingJob::Source(source) => {
1127 self.source_manager.register_source(source).await?;
1129 let connector_name = source
1130 .get_with_properties()
1131 .get(UPSTREAM_SOURCE_KEY)
1132 .cloned();
1133 let attr = source.info.as_ref().map(|source_info| {
1134 jsonbb::json!({
1135 "format": source_info.format().as_str_name(),
1136 "encode": source_info.row_encode().as_str_name(),
1137 })
1138 });
1139 report_create_object(
1140 streaming_job.id(),
1141 "source",
1142 PbTelemetryDatabaseObject::Source,
1143 connector_name,
1144 attr,
1145 );
1146 }
1147 _ => {}
1148 }
1149
1150 let backfill_orders = ctx.fragment_backfill_ordering.clone().into();
1151 self.metadata_manager
1152 .catalog_controller
1153 .prepare_stream_job_fragments(
1154 &stream_job_fragments,
1155 streaming_job,
1156 false,
1157 Some(backfill_orders),
1158 )
1159 .await?;
1160
1161 let version = self
1163 .stream_manager
1164 .create_streaming_job(stream_job_fragments, ctx, permit)
1165 .await?;
1166
1167 Ok(version)
1168 }
1169
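    /// Drops a catalog object and everything released along with it: the affected streaming jobs
    /// and state tables, source workers, iceberg table sinks, and secrets.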
1170 pub async fn drop_object(
1172 &self,
1173 object_type: ObjectType,
1174 object_id: impl Into<ObjectId>,
1175 drop_mode: DropMode,
1176 ) -> MetaResult<NotificationVersion> {
1177 let object_id = object_id.into();
1178 let (release_ctx, version) = self
1179 .metadata_manager
1180 .catalog_controller
1181 .drop_object(object_type, object_id, drop_mode)
1182 .await?;
1183
1184 if object_type == ObjectType::Source {
1185 self.env
1186 .notification_manager_ref()
1187 .notify_local_subscribers(LocalNotification::SourceDropped(object_id));
1188 }
1189
1190 let ReleaseContext {
1191 database_id,
1192 removed_streaming_job_ids,
1193 removed_state_table_ids,
1194 removed_source_ids,
1195 removed_secret_ids: secret_ids,
1196 removed_source_fragments,
1197 removed_actors,
1198 removed_fragments,
1199 removed_sink_fragment_by_targets,
1200 removed_iceberg_table_sinks,
1201 } = release_ctx;
1202
1203 let _guard = self.source_manager.pause_tick().await;
1204 self.stream_manager
1205 .drop_streaming_jobs(
1206 database_id,
1207 removed_actors.iter().map(|id| *id as _).collect(),
1208 removed_streaming_job_ids,
1209 removed_state_table_ids,
1210 removed_fragments.iter().map(|id| *id as _).collect(),
1211 removed_sink_fragment_by_targets
1212 .into_iter()
1213 .map(|(target, sinks)| {
1214 (target as _, sinks.into_iter().map(|id| id as _).collect())
1215 })
1216 .collect(),
1217 )
1218 .await;
1219
1220 self.source_manager
1223 .apply_source_change(SourceChange::DropSource {
1224 dropped_source_ids: removed_source_ids.into_iter().map(|id| id as _).collect(),
1225 })
1226 .await;
1227
1228 let dropped_source_fragments = removed_source_fragments;
1231 self.source_manager
1232 .apply_source_change(SourceChange::DropMv {
1233 dropped_source_fragments,
1234 })
1235 .await;
1236
1237 let iceberg_sink_ids: Vec<SinkId> = removed_iceberg_table_sinks
1239 .iter()
1240 .map(|sink| sink.id)
1241 .collect();
1242
1243 for sink in removed_iceberg_table_sinks {
1244 let sink_param = SinkParam::try_from_sink_catalog(sink.into())
1245 .expect("Iceberg sink should be valid");
1246 let iceberg_sink =
1247 IcebergSink::try_from(sink_param).expect("Iceberg sink should be valid");
1248 if let Ok(iceberg_catalog) = iceberg_sink.config.create_catalog().await {
1249 let table_identifier = iceberg_sink.config.full_table_name().unwrap();
1250 tracing::info!(
1251 "dropping iceberg table {} for dropped sink",
1252 table_identifier
1253 );
1254
1255 let _ = iceberg_catalog
1256 .drop_table(&table_identifier)
1257 .await
1258 .inspect_err(|err| {
1259 tracing::error!(
1260 "failed to drop iceberg table {} during cleanup: {}",
1261 table_identifier,
1262 err.as_report()
1263 );
1264 });
1265 }
1266 }
1267
1268 if !iceberg_sink_ids.is_empty() {
1270 self.sink_manager
1271 .stop_sink_coordinator(iceberg_sink_ids)
1272 .await;
1273 }
1274
1275 for secret in secret_ids {
1277 LocalSecretManager::global().remove_secret(secret);
1278 }
1279 Ok(version)
1280 }
1281
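    /// Replaces an existing table, source, or materialized view job with a new fragment graph
    /// (e.g. for schema changes), rewriting any sinks that auto-refresh their schema from the
    /// table.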
1282 #[await_tree::instrument(boxed, "replace_streaming_job({streaming_job})")]
1284 pub async fn replace_job(
1285 &self,
1286 mut streaming_job: StreamingJob,
1287 fragment_graph: StreamFragmentGraphProto,
1288 ) -> MetaResult<NotificationVersion> {
1289 match &streaming_job {
1290 StreamingJob::Table(..)
1291 | StreamingJob::Source(..)
1292 | StreamingJob::MaterializedView(..) => {}
1293 StreamingJob::Sink(..) | StreamingJob::Index(..) => {
1294 bail_not_implemented!("schema change for {}", streaming_job.job_type_str())
1295 }
1296 }
1297
1298 let job_id = streaming_job.id();
1299
1300 let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
1301 let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());
1302
1303 let original_max_parallelism = self
1305 .metadata_manager
1306 .get_job_max_parallelism(streaming_job.id())
1307 .await?;
1308 let fragment_graph = PbStreamFragmentGraph {
1309 max_parallelism: original_max_parallelism as _,
1310 ..fragment_graph
1311 };
1312
1313 let fragment_graph = StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
1315 streaming_job.set_info_from_graph(&fragment_graph);
1316
1317 let streaming_job = streaming_job;
1319
1320 let auto_refresh_schema_sinks = if let StreamingJob::Table(_, table, _) = &streaming_job {
1321 let auto_refresh_schema_sinks = self
1322 .metadata_manager
1323 .catalog_controller
1324 .get_sink_auto_refresh_schema_from(table.id)
1325 .await?;
1326 if !auto_refresh_schema_sinks.is_empty() {
1327 let original_table_columns = self
1328 .metadata_manager
1329 .catalog_controller
1330 .get_table_columns(table.id)
1331 .await?;
1332 let mut original_table_column_ids: HashSet<_> = original_table_columns
1334 .iter()
1335 .map(|col| col.column_id())
1336 .collect();
1337 let newly_added_columns = table
1338 .columns
1339 .iter()
1340 .filter(|col| {
1341 !original_table_column_ids.remove(&ColumnId::new(
1342 col.column_desc.as_ref().unwrap().column_id as _,
1343 ))
1344 })
1345 .map(|col| ColumnCatalog::from(col.clone()))
1346 .collect_vec();
1347 if !original_table_column_ids.is_empty() {
                    return Err(anyhow!(
                        "new table columns do not contain all original columns. new: {:?}, original: {:?}, not included: {:?}",
                        table.columns,
                        original_table_columns,
                        original_table_column_ids
                    )
                    .into());
1349 }
1350 let mut sinks = Vec::with_capacity(auto_refresh_schema_sinks.len());
1351 for sink in auto_refresh_schema_sinks {
1352 let sink_job_fragments = self
1353 .metadata_manager
1354 .get_job_fragments_by_id(sink.id.as_job_id())
1355 .await?;
1356 if sink_job_fragments.fragments.len() != 1 {
1357 return Err(anyhow!(
1358 "auto schema refresh sink must have only one fragment, but got {}",
1359 sink_job_fragments.fragments.len()
1360 )
1361 .into());
1362 }
1363 let original_sink_fragment =
1364 sink_job_fragments.fragments.into_values().next().unwrap();
1365 let (new_sink_fragment, new_schema, new_log_store_table) =
1366 rewrite_refresh_schema_sink_fragment(
1367 &original_sink_fragment,
1368 &sink,
1369 &newly_added_columns,
1370 table,
1371 fragment_graph.table_fragment_id(),
1372 self.env.id_gen_manager(),
1373 self.env.actor_id_generator(),
1374 )?;
1375
1376 assert_eq!(
1377 original_sink_fragment.actors.len(),
1378 new_sink_fragment.actors.len()
1379 );
1380 let actor_status = (0..original_sink_fragment.actors.len())
1381 .map(|i| {
1382 let worker_node_id = sink_job_fragments.actor_status
1383 [&original_sink_fragment.actors[i].actor_id]
1384 .location
1385 .as_ref()
1386 .unwrap()
1387 .worker_node_id;
1388 (
1389 new_sink_fragment.actors[i].actor_id,
1390 PbActorStatus {
1391 location: Some(PbActorLocation { worker_node_id }),
1392 },
1393 )
1394 })
1395 .collect();
1396
1397 let streaming_job = StreamingJob::Sink(sink);
1398
1399 let tmp_sink_id = self
1400 .metadata_manager
1401 .catalog_controller
1402 .create_job_catalog_for_replace(&streaming_job, None, None, None)
1403 .await?
1404 .as_sink_id();
1405 let StreamingJob::Sink(sink) = streaming_job else {
1406 unreachable!()
1407 };
1408
1409 sinks.push(AutoRefreshSchemaSinkContext {
1410 tmp_sink_id,
1411 original_sink: sink,
1412 original_fragment: original_sink_fragment,
1413 new_schema,
1414 newly_add_fields: newly_added_columns
1415 .iter()
1416 .map(|col| Field::from(&col.column_desc))
1417 .collect(),
1418 new_fragment: new_sink_fragment,
1419 new_log_store_table,
1420 actor_status,
1421 });
1422 }
1423 Some(sinks)
1424 } else {
1425 None
1426 }
1427 } else {
1428 None
1429 };
1430
1431 let tmp_id = self
1432 .metadata_manager
1433 .catalog_controller
1434 .create_job_catalog_for_replace(
1435 &streaming_job,
1436 Some(&ctx),
1437 fragment_graph.specified_parallelism().as_ref(),
1438 Some(fragment_graph.max_parallelism()),
1439 )
1440 .await?;
1441
1442 let tmp_sink_ids = auto_refresh_schema_sinks.as_ref().map(|sinks| {
1443 sinks
1444 .iter()
1445 .map(|sink| sink.tmp_sink_id.as_object_id())
1446 .collect_vec()
1447 });
1448
1449 tracing::debug!(id = %job_id, "building replace streaming job");
1450 let mut updated_sink_catalogs = vec![];
1451
1452 let mut drop_table_connector_ctx = None;
1453 let result: MetaResult<_> = try {
1454 let (mut ctx, mut stream_job_fragments) = self
1455 .build_replace_job(
1456 ctx,
1457 &streaming_job,
1458 fragment_graph,
1459 tmp_id,
1460 auto_refresh_schema_sinks,
1461 )
1462 .await?;
1463 drop_table_connector_ctx = ctx.drop_table_connector_ctx.clone();
1464 let auto_refresh_schema_sink_finish_ctx =
1465 ctx.auto_refresh_schema_sinks.as_ref().map(|sinks| {
1466 sinks
1467 .iter()
1468 .map(|sink| FinishAutoRefreshSchemaSinkContext {
1469 tmp_sink_id: sink.tmp_sink_id,
1470 original_sink_id: sink.original_sink.id,
1471 columns: sink.new_schema.clone(),
1472 new_log_store_table: sink
1473 .new_log_store_table
1474 .as_ref()
1475 .map(|table| (table.id, table.columns.clone())),
1476 })
1477 .collect()
1478 });
1479
1480 if let StreamingJob::Table(_, table, ..) = &streaming_job {
1482 let union_fragment = stream_job_fragments.inner.union_fragment_for_table();
1483 let upstream_infos = self
1484 .metadata_manager
1485 .catalog_controller
1486 .get_all_upstream_sink_infos(table, union_fragment.fragment_id as _)
1487 .await?;
1488 refill_upstream_sink_union_in_table(&mut union_fragment.nodes, &upstream_infos);
1489
1490 for upstream_info in &upstream_infos {
1491 let upstream_fragment_id = upstream_info.sink_fragment_id;
1492 ctx.upstream_fragment_downstreams
1493 .entry(upstream_fragment_id)
1494 .or_default()
1495 .push(upstream_info.new_sink_downstream.clone());
1496 if upstream_info.sink_original_target_columns.is_empty() {
1497 updated_sink_catalogs.push(upstream_info.sink_id);
1498 }
1499 }
1500 }
1501
1502 let replace_upstream = ctx.replace_upstream.clone();
1503
1504 if let Some(sinks) = &ctx.auto_refresh_schema_sinks {
1505 let empty_downstreams = FragmentDownstreamRelation::default();
1506 for sink in sinks {
1507 self.metadata_manager
1508 .catalog_controller
1509 .prepare_streaming_job(
1510 sink.tmp_sink_id.as_job_id(),
1511 || [&sink.new_fragment].into_iter(),
1512 &empty_downstreams,
1513 true,
1514 None,
1515 None,
1516 )
1517 .await?;
1518 }
1519 }
1520
1521 self.metadata_manager
1522 .catalog_controller
1523 .prepare_stream_job_fragments(&stream_job_fragments, &streaming_job, true, None)
1524 .await?;
1525
1526 self.stream_manager
1527 .replace_stream_job(stream_job_fragments, ctx)
1528 .await?;
1529 (replace_upstream, auto_refresh_schema_sink_finish_ctx)
1530 };
1531
1532 match result {
1533 Ok((replace_upstream, auto_refresh_schema_sink_finish_ctx)) => {
1534 let version = self
1535 .metadata_manager
1536 .catalog_controller
1537 .finish_replace_streaming_job(
1538 tmp_id,
1539 streaming_job,
1540 replace_upstream,
1541 SinkIntoTableContext {
1542 updated_sink_catalogs,
1543 },
1544 drop_table_connector_ctx.as_ref(),
1545 auto_refresh_schema_sink_finish_ctx,
1546 )
1547 .await?;
1548 if let Some(drop_table_connector_ctx) = &drop_table_connector_ctx {
1549 self.source_manager
1550 .apply_source_change(SourceChange::DropSource {
1551 dropped_source_ids: vec![drop_table_connector_ctx.to_remove_source_id],
1552 })
1553 .await;
1554 }
1555 Ok(version)
1556 }
1557 Err(err) => {
1558 tracing::error!(id = %job_id, error = ?err.as_report(), "failed to replace job");
1559 let _ = self.metadata_manager
1560 .catalog_controller
1561 .try_abort_replacing_streaming_job(tmp_id, tmp_sink_ids)
1562 .await.inspect_err(|err| {
1563 tracing::error!(id = %job_id, error = ?err.as_report(), "failed to abort replacing job");
1564 });
1565 Err(err)
1566 }
1567 }
1568 }
1569
1570 #[await_tree::instrument(boxed, "drop_streaming_job{}({job_id})", if let DropMode::Cascade = drop_mode { "_cascade" } else { "" }
1571 )]
1572 async fn drop_streaming_job(
1573 &self,
1574 job_id: StreamingJobId,
1575 drop_mode: DropMode,
1576 ) -> MetaResult<NotificationVersion> {
1577 let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
1578
1579 let (object_id, object_type) = match job_id {
1580 StreamingJobId::MaterializedView(id) => (id.as_object_id(), ObjectType::Table),
1581 StreamingJobId::Sink(id) => (id.as_object_id(), ObjectType::Sink),
1582 StreamingJobId::Table(_, id) => (id.as_object_id(), ObjectType::Table),
1583 StreamingJobId::Index(idx) => (idx.as_object_id(), ObjectType::Index),
1584 };
1585
1586 let job_status = self
1587 .metadata_manager
1588 .catalog_controller
1589 .get_streaming_job_status(job_id.id())
1590 .await?;
1591 let version = match job_status {
1592 JobStatus::Initial => {
1593 unreachable!(
1594 "Job with Initial status should not notify frontend and therefore should not arrive here"
1595 );
1596 }
1597 JobStatus::Creating => {
1598 self.stream_manager
1599 .cancel_streaming_jobs(vec![job_id.id()])
1600 .await?;
1601 IGNORED_NOTIFICATION_VERSION
1602 }
1603 JobStatus::Created => self.drop_object(object_type, object_id, drop_mode).await?,
1604 };
1605
1606 Ok(version)
1607 }
1608
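    /// Resolves the parallelism a streaming job should be scheduled with, based on the specified
    /// value, the job's max parallelism, and the parallelism available in the resource group.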
1609 fn resolve_stream_parallelism(
1613 &self,
1614 specified: Option<NonZeroUsize>,
1615 max: NonZeroUsize,
1616 cluster_info: &StreamingClusterInfo,
1617 resource_group: String,
1618 ) -> MetaResult<NonZeroUsize> {
1619 let available = NonZeroUsize::new(cluster_info.parallelism(&resource_group));
1620 DdlController::resolve_stream_parallelism_inner(
1621 specified,
1622 max,
1623 available,
1624 &self.env.opts.default_parallelism,
1625 &resource_group,
1626 )
1627 }
1628
1629 fn resolve_stream_parallelism_inner(
1630 specified: Option<NonZeroUsize>,
1631 max: NonZeroUsize,
1632 available: Option<NonZeroUsize>,
1633 default_parallelism: &DefaultParallelism,
1634 resource_group: &str,
1635 ) -> MetaResult<NonZeroUsize> {
1636 let Some(available) = available else {
1637 bail_unavailable!(
1638 "no available slots to schedule in resource group \"{}\", \
1639 have you allocated any compute nodes within this resource group?",
1640 resource_group
1641 );
1642 };
1643
1644 if let Some(specified) = specified {
1645 if specified > max {
1646 bail_invalid_parameter!(
1647 "specified parallelism {} should not exceed max parallelism {}",
1648 specified,
1649 max,
1650 );
1651 }
1652 if specified > available {
1653 tracing::warn!(
1654 resource_group,
1655 specified_parallelism = specified.get(),
1656 available_parallelism = available.get(),
1657 "specified parallelism exceeds available slots, scheduling with specified value",
1658 );
1659 }
1660 return Ok(specified);
1661 }
1662
1663 let default_parallelism = match default_parallelism {
1665 DefaultParallelism::Full => available,
1666 DefaultParallelism::Default(num) => {
1667 if *num > available {
1668 tracing::warn!(
1669 resource_group,
1670 configured_parallelism = num.get(),
1671 available_parallelism = available.get(),
1672 "default parallelism exceeds available slots, scheduling with configured value",
1673 );
1674 }
1675 *num
1676 }
1677 };
1678
1679 if default_parallelism > max {
1680 tracing::warn!(
1681 max_parallelism = max.get(),
1682 resource_group,
1683 "default parallelism exceeds max parallelism, capping to max",
1684 );
1685 }
1686 Ok(default_parallelism.min(max))
1687 }
1688
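    /// Builds the actor graph and [`CreateStreamingJobContext`] for a new streaming job:
    /// resolves upstream fragments, parallelism, backfill ordering, and CDC snapshot splits.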
1689 #[await_tree::instrument]
1695 pub(crate) async fn build_stream_job(
1696 &self,
1697 stream_ctx: StreamContext,
1698 mut stream_job: StreamingJob,
1699 fragment_graph: StreamFragmentGraph,
1700 resource_type: streaming_job_resource_type::ResourceType,
1701 ) -> MetaResult<(CreateStreamingJobContext, StreamJobFragmentsToCreate)> {
1702 let id = stream_job.id();
1703 let specified_parallelism = fragment_graph.specified_parallelism();
1704 let specified_backfill_parallelism = fragment_graph.specified_backfill_parallelism();
1705 let max_parallelism = NonZeroUsize::new(fragment_graph.max_parallelism()).unwrap();
1706
1707 let fragment_backfill_ordering = fragment_graph.create_fragment_backfill_ordering();
1709
1710 let (snapshot_backfill_info, cross_db_snapshot_backfill_info) =
1714 fragment_graph.collect_snapshot_backfill_info()?;
1715 assert!(
1716 snapshot_backfill_info
1717 .iter()
1718 .chain([&cross_db_snapshot_backfill_info])
1719 .flat_map(|info| info.upstream_mv_table_id_to_backfill_epoch.values())
1720 .all(|backfill_epoch| backfill_epoch.is_none()),
            "should not set backfill epoch when initially building the job: {:?} {:?}",
1722 snapshot_backfill_info,
1723 cross_db_snapshot_backfill_info
1724 );
1725
1726 let locality_fragment_state_table_mapping =
1727 fragment_graph.find_locality_provider_fragment_state_table_mapping();
1728
1729 self.metadata_manager
1731 .catalog_controller
1732 .validate_cross_db_snapshot_backfill(&cross_db_snapshot_backfill_info)
1733 .await?;
1734
1735 let upstream_table_ids = fragment_graph
1736 .dependent_table_ids()
1737 .iter()
1738 .filter(|id| {
1739 !cross_db_snapshot_backfill_info
1740 .upstream_mv_table_id_to_backfill_epoch
1741 .contains_key(id)
1742 })
1743 .cloned()
1744 .collect();
1745
1746 let (upstream_root_fragments, existing_actor_location) = self
1747 .metadata_manager
1748 .get_upstream_root_fragments(&upstream_table_ids)
1749 .await?;
1750
1751 if snapshot_backfill_info.is_some() {
1752 match stream_job {
1753 StreamingJob::MaterializedView(_)
1754 | StreamingJob::Sink(_)
1755 | StreamingJob::Index(_, _) => {}
1756 StreamingJob::Table(_, _, _) | StreamingJob::Source(_) => {
1757 return Err(
1758 anyhow!("snapshot_backfill not enabled for table and source").into(),
1759 );
1760 }
1761 }
1762 }
1763
1764 let upstream_actors = upstream_root_fragments
1765 .values()
1766 .map(|(fragment, _)| {
1767 (
1768 fragment.fragment_id,
1769 fragment.actors.keys().copied().collect(),
1770 )
1771 })
1772 .collect();
1773
1774 let complete_graph = CompleteStreamFragmentGraph::with_upstreams(
1775 fragment_graph,
1776 FragmentGraphUpstreamContext {
1777 upstream_root_fragments,
1778 upstream_actor_location: existing_actor_location,
1779 },
1780 (&stream_job).into(),
1781 )?;
1782 let resource_group = if let Some(group) = resource_type.resource_group() {
1783 group
1784 } else {
1785 self.metadata_manager
1786 .get_database_resource_group(stream_job.database_id())
1787 .await?
1788 };
1789 let is_serverless_backfill = matches!(
1790 &resource_type,
1791 streaming_job_resource_type::ResourceType::ServerlessBackfillResourceGroup(_)
1792 );
1793
1794 let cluster_info = self.metadata_manager.get_streaming_cluster_info().await?;
1796
1797 let initial_parallelism = specified_backfill_parallelism.or(specified_parallelism);
1798 let parallelism = self.resolve_stream_parallelism(
1799 initial_parallelism,
1800 max_parallelism,
1801 &cluster_info,
1802 resource_group.clone(),
1803 )?;
1804
1805 let parallelism = if initial_parallelism.is_some() {
1806 parallelism.get()
1807 } else {
1808 let adaptive_strategy = match stream_ctx.adaptive_parallelism_strategy {
1810 Some(strategy) => strategy,
1811 None => self
1812 .env
1813 .system_params_reader()
1814 .await
1815 .adaptive_parallelism_strategy(),
1816 };
1817 adaptive_strategy.compute_target_parallelism(parallelism.get())
1818 };
1819
1820 let parallelism = NonZeroUsize::new(parallelism).expect("parallelism must be positive");
1821 let actor_graph_builder = ActorGraphBuilder::new(
1822 id,
1823 resource_group,
1824 complete_graph,
1825 cluster_info,
1826 parallelism,
1827 )?;
1828
1829 let ActorGraphBuildResult {
1830 graph,
1831 downstream_fragment_relations,
1832 building_locations,
1833 upstream_fragment_downstreams,
1834 new_no_shuffle,
1835 replace_upstream,
1836 ..
1837 } = actor_graph_builder.generate_graph(&self.env, &stream_job, stream_ctx.clone())?;
1838 assert!(replace_upstream.is_empty());
1839
1840 let table_parallelism = match (specified_parallelism, &self.env.opts.default_parallelism) {
1847 (None, DefaultParallelism::Full) => TableParallelism::Adaptive,
1848 _ => TableParallelism::Fixed(parallelism.get()),
1849 };
1850
1851 let stream_job_fragments = StreamJobFragments::new(
1852 id,
1853 graph,
1854 &building_locations.actor_locations,
1855 stream_ctx.clone(),
1856 table_parallelism,
1857 max_parallelism.get(),
1858 );
1859
1860 if let Some(mview_fragment) = stream_job_fragments.mview_fragment() {
1861 stream_job.set_table_vnode_count(mview_fragment.vnode_count());
1862 }
1863
1864 let new_upstream_sink = if let StreamingJob::Sink(sink) = &stream_job
1865 && let Ok(table_id) = sink.get_target_table()
1866 {
1867 let tables = self
1868 .metadata_manager
1869 .get_table_catalog_by_ids(&[*table_id])
1870 .await?;
1871 let target_table = tables
1872 .first()
1873 .ok_or_else(|| MetaError::catalog_id_not_found("table", *table_id))?;
1874 let sink_fragment = stream_job_fragments
1875 .sink_fragment()
1876 .ok_or_else(|| anyhow::anyhow!("sink fragment not found for sink {}", sink.id))?;
1877 let mview_fragment_id = self
1878 .metadata_manager
1879 .catalog_controller
1880 .get_mview_fragment_by_id(table_id.as_job_id())
1881 .await?;
1882 let upstream_sink_info = build_upstream_sink_info(
1883 sink.id,
1884 sink.original_target_columns.clone(),
1885 sink_fragment.fragment_id as _,
1886 target_table,
1887 mview_fragment_id,
1888 )?;
1889 Some(upstream_sink_info)
1890 } else {
1891 None
1892 };
1893
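// For a table on a shared CDC source that uses parallel backfill, compute and record the
// snapshot splits of the CDC table before the job is created.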
1894 let mut cdc_table_snapshot_splits = None;
1895 if let StreamingJob::Table(None, table, TableJobType::SharedCdcSource) = &stream_job
1896 && let Some((_, stream_cdc_scan)) =
1897 parallel_cdc_table_backfill_fragment(stream_job_fragments.fragments.values())
1898 {
1899 {
1900 let splits = try_init_parallel_cdc_table_snapshot_splits(
1902 table.id,
1903 stream_cdc_scan.cdc_table_desc.as_ref().unwrap(),
1904 self.env.meta_store_ref(),
1905 stream_cdc_scan.options.as_ref().unwrap(),
1906 self.env.opts.cdc_table_split_init_insert_batch_size,
1907 self.env.opts.cdc_table_split_init_sleep_interval_splits,
1908 self.env.opts.cdc_table_split_init_sleep_duration_millis,
1909 )
1910 .await?;
1911 cdc_table_snapshot_splits = Some(splits);
1912 }
1913 }
1914
1915 let ctx = CreateStreamingJobContext {
1916 upstream_fragment_downstreams,
1917 new_no_shuffle,
1918 upstream_actors,
1919 building_locations,
1920 definition: stream_job.definition(),
1921 create_type: stream_job.create_type(),
1922 job_type: (&stream_job).into(),
1923 streaming_job: stream_job,
1924 new_upstream_sink,
1925 option: CreateStreamingJobOption {},
1926 snapshot_backfill_info,
1927 cross_db_snapshot_backfill_info,
1928 fragment_backfill_ordering,
1929 locality_fragment_state_table_mapping,
1930 cdc_table_snapshot_splits,
1931 is_serverless_backfill,
1932 };
1933
1934 Ok((
1935 ctx,
1936 StreamJobFragmentsToCreate {
1937 inner: stream_job_fragments,
1938 downstreams: downstream_fragment_relations,
1939 },
1940 ))
1941 }
1942
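/// Builds the new fragments and [`ReplaceStreamJobContext`] for replacing an existing
/// streaming job in place (the schema-change path for tables, sources and materialized
/// views; sinks and indexes are rejected as not implemented).
///
/// The old job's internal state tables are matched onto the new fragment graph, its
/// assigned and max parallelism are reused, and existing downstream (and, where
/// required, upstream) fragments are re-attached to the rebuilt graph.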
1943 pub(crate) async fn build_replace_job(
1949 &self,
1950 stream_ctx: StreamContext,
1951 stream_job: &StreamingJob,
1952 mut fragment_graph: StreamFragmentGraph,
1953 tmp_job_id: JobId,
1954 auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,
1955 ) -> MetaResult<(ReplaceStreamJobContext, StreamJobFragmentsToCreate)> {
1956 match &stream_job {
1957 StreamingJob::Table(..)
1958 | StreamingJob::Source(..)
1959 | StreamingJob::MaterializedView(..) => {}
1960 StreamingJob::Sink(..) | StreamingJob::Index(..) => {
1961 bail_not_implemented!("schema change for {}", stream_job.job_type_str())
1962 }
1963 }
1964
1965 let id = stream_job.id();
1966
1967 let mut drop_table_associated_source_id = None;
1969 if let StreamingJob::Table(None, _, _) = &stream_job {
1970 drop_table_associated_source_id = self
1971 .metadata_manager
1972 .get_table_associated_source_id(id.as_mv_table_id())
1973 .await?;
1974 }
1975
1976 let old_fragments = self.metadata_manager.get_job_fragments_by_id(id).await?;
1977 let old_internal_table_ids = old_fragments.internal_table_ids();
1978
1979 let mut drop_table_connector_ctx = None;
1981 if let Some(to_remove_source_id) = drop_table_associated_source_id {
1982 debug_assert!(old_internal_table_ids.len() == 1);
1984
1985 drop_table_connector_ctx = Some(DropTableConnectorContext {
1986 to_change_streaming_job_id: id,
1989 to_remove_state_table_id: old_internal_table_ids[0],
1990 to_remove_source_id,
1991 });
1992 } else if stream_job.is_materialized_view() {
1993 let old_fragments_upstreams = self
1996 .metadata_manager
1997 .catalog_controller
1998 .upstream_fragments(old_fragments.fragment_ids())
1999 .await?;
2000
2001 let old_state_graph =
2002 state_match::Graph::from_existing(&old_fragments, &old_fragments_upstreams);
2003 let new_state_graph = state_match::Graph::from_building(&fragment_graph);
2004 let result = state_match::match_graph(&new_state_graph, &old_state_graph)
2005 .context("incompatible altering on the streaming job states")?;
2006
2007 fragment_graph.fit_internal_table_ids_with_mapping(result.table_matches);
2008 fragment_graph.fit_snapshot_backfill_epochs(result.snapshot_backfill_epochs);
2009 } else {
2010 let old_internal_tables = self
2013 .metadata_manager
2014 .get_table_catalog_by_ids(&old_internal_table_ids)
2015 .await?;
2016 fragment_graph.fit_internal_tables_trivial(old_internal_tables)?;
2017 }
2018
2019 let original_root_fragment = old_fragments
2022 .root_fragment()
2023 .expect("root fragment not found");
2024
2025 let job_type = StreamingJobType::from(stream_job);
2026
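// Fetch the fragments currently consuming this job's output. If sinks are having their
// schemas auto-refreshed as part of this replacement, substitute their rebuilt fragments
// and actor locations for the old ones.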
2027 let (mut downstream_fragments, mut downstream_actor_location) =
2029 self.metadata_manager.get_downstream_fragments(id).await?;
2030
2031 if let Some(auto_refresh_schema_sinks) = &auto_refresh_schema_sinks {
2032 let mut remaining_fragment: HashSet<_> = auto_refresh_schema_sinks
2033 .iter()
2034 .map(|sink| sink.original_fragment.fragment_id)
2035 .collect();
2036 for (_, downstream_fragment, nodes) in &mut downstream_fragments {
2037 if let Some(sink) = auto_refresh_schema_sinks.iter().find(|sink| {
2038 sink.original_fragment.fragment_id == downstream_fragment.fragment_id
2039 }) {
2040 assert!(remaining_fragment.remove(&downstream_fragment.fragment_id));
2041 for actor_id in downstream_fragment.actors.keys() {
2042 downstream_actor_location.remove(actor_id);
2043 }
2044 for (actor_id, status) in &sink.actor_status {
2045 downstream_actor_location
2046 .insert(*actor_id, status.location.as_ref().unwrap().worker_node_id);
2047 }
2048
2049 *downstream_fragment = (&sink.new_fragment_info(), stream_job.id()).into();
2050 *nodes = sink.new_fragment.nodes.clone();
2051 }
2052 }
2053 assert!(remaining_fragment.is_empty());
2054 }
2055
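// Plain tables and sources only need their downstream fragments re-attached; shared-CDC
// tables and materialized views additionally depend on upstream root fragments.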
2056 let complete_graph = match &job_type {
2058 StreamingJobType::Table(TableJobType::General) | StreamingJobType::Source => {
2059 CompleteStreamFragmentGraph::with_downstreams(
2060 fragment_graph,
2061 FragmentGraphDownstreamContext {
2062 original_root_fragment_id: original_root_fragment.fragment_id,
2063 downstream_fragments,
2064 downstream_actor_location,
2065 },
2066 job_type,
2067 )?
2068 }
2069 StreamingJobType::Table(TableJobType::SharedCdcSource)
2070 | StreamingJobType::MaterializedView => {
2071 let (upstream_root_fragments, upstream_actor_location) = self
2073 .metadata_manager
2074 .get_upstream_root_fragments(fragment_graph.dependent_table_ids())
2075 .await?;
2076
2077 CompleteStreamFragmentGraph::with_upstreams_and_downstreams(
2078 fragment_graph,
2079 FragmentGraphUpstreamContext {
2080 upstream_root_fragments,
2081 upstream_actor_location,
2082 },
2083 FragmentGraphDownstreamContext {
2084 original_root_fragment_id: original_root_fragment.fragment_id,
2085 downstream_fragments,
2086 downstream_actor_location,
2087 },
2088 job_type,
2089 )?
2090 }
2091 _ => unreachable!(),
2092 };
2093
2094 let resource_group = self
2095 .metadata_manager
2096 .get_existing_job_resource_group(id)
2097 .await?;
2098
2099 let cluster_info = self.metadata_manager.get_streaming_cluster_info().await?;
2101
2102 let parallelism = NonZeroUsize::new(original_root_fragment.actors.len())
2105 .expect("The number of actors in the original table fragment should be greater than 0");
2106
2107 let actor_graph_builder = ActorGraphBuilder::new(
2108 id,
2109 resource_group,
2110 complete_graph,
2111 cluster_info,
2112 parallelism,
2113 )?;
2114
2115 let ActorGraphBuildResult {
2116 graph,
2117 downstream_fragment_relations,
2118 building_locations,
2119 upstream_fragment_downstreams,
2120 mut replace_upstream,
2121 new_no_shuffle,
2122 ..
2123 } = actor_graph_builder.generate_graph(&self.env, stream_job, stream_ctx.clone())?;
2124
2125 if matches!(
2127 job_type,
2128 StreamingJobType::Source | StreamingJobType::Table(TableJobType::General)
2129 ) {
2130 assert!(upstream_fragment_downstreams.is_empty());
2131 }
2132
2133 let stream_job_fragments = StreamJobFragments::new(
2137 tmp_job_id,
2138 graph,
2139 &building_locations.actor_locations,
2140 stream_ctx,
2141 old_fragments.assigned_parallelism,
2142 old_fragments.max_parallelism,
2143 );
2144
2145 if let Some(sinks) = &auto_refresh_schema_sinks {
2146 for sink in sinks {
2147 replace_upstream
2148 .remove(&sink.new_fragment.fragment_id)
2149 .expect("should exist");
2150 }
2151 }
2152
2153 let ctx = ReplaceStreamJobContext {
2157 old_fragments,
2158 replace_upstream,
2159 new_no_shuffle,
2160 upstream_fragment_downstreams,
2161 building_locations,
2162 streaming_job: stream_job.clone(),
2163 tmp_id: tmp_job_id,
2164 drop_table_connector_ctx,
2165 auto_refresh_schema_sinks,
2166 };
2167
2168 Ok((
2169 ctx,
2170 StreamJobFragmentsToCreate {
2171 inner: stream_job_fragments,
2172 downstreams: downstream_fragment_relations,
2173 },
2174 ))
2175 }
2176
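/// Renames a catalog object, dispatching on the object type carried in the request.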
2177 async fn alter_name(
2178 &self,
2179 relation: alter_name_request::Object,
2180 new_name: &str,
2181 ) -> MetaResult<NotificationVersion> {
2182 let (obj_type, id) = match relation {
2183 alter_name_request::Object::TableId(id) => (ObjectType::Table, id),
2184 alter_name_request::Object::ViewId(id) => (ObjectType::View, id),
2185 alter_name_request::Object::IndexId(id) => (ObjectType::Index, id),
2186 alter_name_request::Object::SinkId(id) => (ObjectType::Sink, id),
2187 alter_name_request::Object::SourceId(id) => (ObjectType::Source, id),
2188 alter_name_request::Object::SchemaId(id) => (ObjectType::Schema, id),
2189 alter_name_request::Object::DatabaseId(id) => (ObjectType::Database, id),
2190 alter_name_request::Object::SubscriptionId(id) => (ObjectType::Subscription, id),
2191 };
2192 self.metadata_manager
2193 .catalog_controller
2194 .alter_name(obj_type, id, new_name)
2195 .await
2196 }
2197
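/// Swaps the names of two catalog objects of the same type. Schema swapping is not
/// implemented yet.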
2198 async fn alter_swap_rename(
2199 &self,
2200 object: alter_swap_rename_request::Object,
2201 ) -> MetaResult<NotificationVersion> {
2202 let (obj_type, src_id, dst_id) = match object {
2203 alter_swap_rename_request::Object::Schema(_) => unimplemented!("schema swap"),
2204 alter_swap_rename_request::Object::Table(objs) => {
2205 let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
2206 (ObjectType::Table, src_id, dst_id)
2207 }
2208 alter_swap_rename_request::Object::View(objs) => {
2209 let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
2210 (ObjectType::View, src_id, dst_id)
2211 }
2212 alter_swap_rename_request::Object::Source(objs) => {
2213 let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
2214 (ObjectType::Source, src_id, dst_id)
2215 }
2216 alter_swap_rename_request::Object::Sink(objs) => {
2217 let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
2218 (ObjectType::Sink, src_id, dst_id)
2219 }
2220 alter_swap_rename_request::Object::Subscription(objs) => {
2221 let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
2222 (ObjectType::Subscription, src_id, dst_id)
2223 }
2224 };
2225
2226 self.metadata_manager
2227 .catalog_controller
2228 .alter_swap_rename(obj_type, src_id, dst_id)
2229 .await
2230 }
2231
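/// Changes the owner of a catalog object to the given user.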
2232 async fn alter_owner(
2233 &self,
2234 object: Object,
2235 owner_id: UserId,
2236 ) -> MetaResult<NotificationVersion> {
2237 let (obj_type, id) = match object {
2238 Object::TableId(id) => (ObjectType::Table, id),
2239 Object::ViewId(id) => (ObjectType::View, id),
2240 Object::SourceId(id) => (ObjectType::Source, id),
2241 Object::SinkId(id) => (ObjectType::Sink, id),
2242 Object::SchemaId(id) => (ObjectType::Schema, id),
2243 Object::DatabaseId(id) => (ObjectType::Database, id),
2244 Object::SubscriptionId(id) => (ObjectType::Subscription, id),
2245 Object::ConnectionId(id) => (ObjectType::Connection, id),
2246 Object::FunctionId(id) => (ObjectType::Function, id),
2247 Object::SecretId(id) => (ObjectType::Secret, id),
2248 };
2249 self.metadata_manager
2250 .catalog_controller
2251 .alter_owner(obj_type, id.into(), owner_id as _)
2252 .await
2253 }
2254
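/// Moves a catalog object into another schema.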
2255 async fn alter_set_schema(
2256 &self,
2257 object: alter_set_schema_request::Object,
2258 new_schema_id: SchemaId,
2259 ) -> MetaResult<NotificationVersion> {
2260 let (obj_type, id) = match object {
2261 alter_set_schema_request::Object::TableId(id) => (ObjectType::Table, id),
2262 alter_set_schema_request::Object::ViewId(id) => (ObjectType::View, id),
2263 alter_set_schema_request::Object::SourceId(id) => (ObjectType::Source, id),
2264 alter_set_schema_request::Object::SinkId(id) => (ObjectType::Sink, id),
2265 alter_set_schema_request::Object::FunctionId(id) => (ObjectType::Function, id),
2266 alter_set_schema_request::Object::ConnectionId(id) => (ObjectType::Connection, id),
2267 alter_set_schema_request::Object::SubscriptionId(id) => (ObjectType::Subscription, id),
2268 };
2269 self.metadata_manager
2270 .catalog_controller
2271 .alter_schema(obj_type, id.into(), new_schema_id as _)
2272 .await
2273 }
2274
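/// Blocks until no streaming job is still being created in the background, sleeping 1ms
/// between checks and returning a cancellation error once the attempt budget (nominally
/// 30 minutes) is exhausted.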
2275 pub async fn wait(&self) -> MetaResult<()> {
2276 let timeout_ms = 30 * 60 * 1000;
2277 for _ in 0..timeout_ms {
2278 if self
2279 .metadata_manager
2280 .catalog_controller
2281 .list_background_creating_jobs(true, None)
2282 .await?
2283 .is_empty()
2284 {
2285 return Ok(());
2286 }
2287
2288 sleep(Duration::from_millis(1)).await;
2289 }
2290 Err(MetaError::cancelled(format!(
2291 "timeout after {timeout_ms}ms"
2292 )))
2293 }
2294
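/// Persists a `COMMENT ON` for the given object via the catalog controller.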
2295 async fn comment_on(&self, comment: Comment) -> MetaResult<NotificationVersion> {
2296 self.metadata_manager
2297 .catalog_controller
2298 .comment_on(comment)
2299 .await
2300 }
2301
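/// Adds and removes per-job config entries for a streaming job.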
2302 async fn alter_streaming_job_config(
2303 &self,
2304 job_id: JobId,
2305 entries_to_add: HashMap<String, String>,
2306 keys_to_remove: Vec<String>,
2307 ) -> MetaResult<NotificationVersion> {
2308 self.metadata_manager
2309 .catalog_controller
2310 .alter_streaming_job_config(job_id, entries_to_add, keys_to_remove)
2311 .await
2312 }
2313}
2314
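/// Reports a telemetry event for the creation of a streaming-job-related object.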
2315fn report_create_object(
2316 job_id: JobId,
2317 event_name: &str,
2318 obj_type: PbTelemetryDatabaseObject,
2319 connector_name: Option<String>,
2320 attr_info: Option<jsonbb::Value>,
2321) {
2322 report_event(
2323 PbTelemetryEventStage::CreateStreamJob,
2324 event_name,
2325 job_id.as_raw_id() as _,
2326 connector_name,
2327 Some(obj_type),
2328 attr_info,
2329 );
2330}
2331
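/// Builds the [`UpstreamSinkInfo`] used to attach a sink that writes into a table to the
/// table's union fragment: the sink's output schema, a hash dispatcher whose distribution
/// keys are remapped from the target table onto the sink's columns, and the projection
/// that aligns the sink's output with the table's current columns.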
2332pub fn build_upstream_sink_info(
2333 sink_id: SinkId,
2334 original_target_columns: Vec<PbColumnCatalog>,
2335 sink_fragment_id: FragmentId,
2336 target_table: &PbTable,
2337 target_fragment_id: FragmentId,
2338) -> MetaResult<UpstreamSinkInfo> {
2339 let sink_columns = if !original_target_columns.is_empty() {
2340 original_target_columns.clone()
2341 } else {
2342 target_table.columns.clone()
2347 };
2348
2349 let sink_output_fields = sink_columns
2350 .iter()
2351 .map(|col| Field::from(col.column_desc.as_ref().unwrap()).to_prost())
2352 .collect_vec();
2353 let output_indices = (0..sink_output_fields.len())
2354 .map(|i| i as u32)
2355 .collect_vec();
2356
2357 let dist_key_indices: anyhow::Result<Vec<u32>> = try {
2358 let sink_idx_by_col_id = sink_columns
2359 .iter()
2360 .enumerate()
2361 .map(|(idx, col)| {
2362 let column_id = col.column_desc.as_ref().unwrap().column_id;
2363 (column_id, idx as u32)
2364 })
2365 .collect::<HashMap<_, _>>();
2366 target_table
2367 .distribution_key
2368 .iter()
2369 .map(|dist_idx| {
2370 let column_id = target_table.columns[*dist_idx as usize]
2371 .column_desc
2372 .as_ref()
2373 .unwrap()
2374 .column_id;
2375 let sink_idx = sink_idx_by_col_id
2376 .get(&column_id)
2377 .ok_or_else(|| anyhow::anyhow!("column id {} not found in sink", column_id))?;
2378 Ok(*sink_idx)
2379 })
2380 .collect::<anyhow::Result<Vec<_>>>()?
2381 };
2382 let dist_key_indices =
2383 dist_key_indices.map_err(|e| e.context("failed to get distribution key indices"))?;
2384 let downstream_fragment_id = target_fragment_id as _;
2385 let new_downstream_relation = DownstreamFragmentRelation {
2386 downstream_fragment_id,
2387 dispatcher_type: DispatcherType::Hash,
2388 dist_key_indices,
2389 output_mapping: PbDispatchOutputMapping::simple(output_indices),
2390 };
2391 let current_target_columns = target_table.get_columns();
2392 let project_exprs = build_select_node_list(&sink_columns, current_target_columns)?;
2393 Ok(UpstreamSinkInfo {
2394 sink_id,
2395 sink_fragment_id: sink_fragment_id as _,
2396 sink_output_fields,
2397 sink_original_target_columns: original_target_columns,
2398 project_exprs,
2399 new_sink_downstream: new_downstream_relation,
2400 })
2401}
2402
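/// Rewrites the `UpstreamSinkUnion` node inside the table's fragment so that its initial
/// upstreams reflect the given sinks; traversal stops at the first matching node.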
2403pub fn refill_upstream_sink_union_in_table(
2404 union_fragment_root: &mut PbStreamNode,
2405 upstream_sink_infos: &Vec<UpstreamSinkInfo>,
2406) {
2407 visit_stream_node_cont_mut(union_fragment_root, |node| {
2408 if let Some(NodeBody::UpstreamSinkUnion(upstream_sink_union)) = &mut node.node_body {
2409 let init_upstreams = upstream_sink_infos
2410 .iter()
2411 .map(|info| PbUpstreamSinkInfo {
2412 upstream_fragment_id: info.sink_fragment_id,
2413 sink_output_schema: info.sink_output_fields.clone(),
2414 project_exprs: info.project_exprs.clone(),
2415 })
2416 .collect();
2417 upstream_sink_union.init_upstreams = init_upstreams;
2418 false
2419 } else {
2420 true
2421 }
2422 });
2423}
2424
2425#[cfg(test)]
2426mod tests {
2427 use std::num::NonZeroUsize;
2428
2429 use super::*;
2430
2431 #[test]
2432 fn test_specified_parallelism_exceeds_available() {
2433 let result = DdlController::resolve_stream_parallelism_inner(
2434 Some(NonZeroUsize::new(100).unwrap()),
2435 NonZeroUsize::new(256).unwrap(),
2436 Some(NonZeroUsize::new(4).unwrap()),
2437 &DefaultParallelism::Full,
2438 "default",
2439 )
2440 .unwrap();
2441 assert_eq!(result.get(), 100);
2442 }
2443
2444 #[test]
2445 fn test_allows_default_parallelism_over_available() {
2446 let result = DdlController::resolve_stream_parallelism_inner(
2447 None,
2448 NonZeroUsize::new(256).unwrap(),
2449 Some(NonZeroUsize::new(4).unwrap()),
2450 &DefaultParallelism::Default(NonZeroUsize::new(50).unwrap()),
2451 "default",
2452 )
2453 .unwrap();
2454 assert_eq!(result.get(), 50);
2455 }
2456
2457 #[test]
2458 fn test_full_parallelism_capped_by_max() {
2459 let result = DdlController::resolve_stream_parallelism_inner(
2460 None,
2461 NonZeroUsize::new(6).unwrap(),
2462 Some(NonZeroUsize::new(10).unwrap()),
2463 &DefaultParallelism::Full,
2464 "default",
2465 )
2466 .unwrap();
2467 assert_eq!(result.get(), 6);
2468 }
2469
2470 #[test]
2471 fn test_no_available_slots_returns_error() {
2472 let result = DdlController::resolve_stream_parallelism_inner(
2473 None,
2474 NonZeroUsize::new(4).unwrap(),
2475 None,
2476 &DefaultParallelism::Full,
2477 "default",
2478 );
2479 assert!(matches!(
2480 result,
2481 Err(ref e) if matches!(e.inner(), MetaErrorInner::Unavailable(_))
2482 ));
2483 }
2484
2485 #[test]
2486 fn test_specified_over_max_returns_error() {
2487 let result = DdlController::resolve_stream_parallelism_inner(
2488 Some(NonZeroUsize::new(8).unwrap()),
2489 NonZeroUsize::new(4).unwrap(),
2490 Some(NonZeroUsize::new(10).unwrap()),
2491 &DefaultParallelism::Full,
2492 "default",
2493 );
2494 assert!(matches!(
2495 result,
2496 Err(ref e) if matches!(e.inner(), MetaErrorInner::InvalidParameter(_))
2497 ));
2498 }
2499}