1use std::cmp::Ordering;
16use std::collections::{HashMap, HashSet};
17use std::num::NonZeroUsize;
18use std::sync::Arc;
19use std::sync::atomic::AtomicU64;
20use std::time::Duration;
21
22use anyhow::{Context, anyhow};
23use await_tree::InstrumentAwait;
24use either::Either;
25use itertools::Itertools;
26use risingwave_common::catalog::{
27 AlterDatabaseParam, ColumnCatalog, ColumnId, Field, FragmentTypeFlag,
28};
29use risingwave_common::config::DefaultParallelism;
30use risingwave_common::hash::VnodeCountCompat;
31use risingwave_common::id::{JobId, TableId};
32use risingwave_common::secret::{LocalSecretManager, SecretEncryption};
33use risingwave_common::system_param::reader::SystemParamsRead;
34use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont_mut;
35use risingwave_common::{bail, bail_not_implemented};
36use risingwave_connector::WithOptionsSecResolved;
37use risingwave_connector::connector_common::validate_connection;
38use risingwave_connector::sink::SinkParam;
39use risingwave_connector::sink::iceberg::IcebergSink;
40use risingwave_connector::source::cdc::CdcScanOptions;
41use risingwave_connector::source::{
42 ConnectorProperties, SourceEnumeratorContext, UPSTREAM_SOURCE_KEY,
43};
44use risingwave_meta_model::object::ObjectType;
45use risingwave_meta_model::{
46 ConnectionId, DatabaseId, DispatcherType, FragmentId, FunctionId, IndexId, JobStatus, ObjectId,
47 SchemaId, SecretId, SinkId, SourceId, StreamingParallelism, SubscriptionId, UserId, ViewId,
48};
49use risingwave_pb::catalog::{
50 Comment, Connection, CreateType, Database, Function, PbTable, Schema, Secret, Source,
51 Subscription, Table, View,
52};
53use risingwave_pb::common::PbActorLocation;
54use risingwave_pb::ddl_service::alter_owner_request::Object;
55use risingwave_pb::ddl_service::{
56 DdlProgress, TableJobType, WaitVersion, alter_name_request, alter_set_schema_request,
57 alter_swap_rename_request, streaming_job_resource_type,
58};
59use risingwave_pb::meta::table_fragments::PbActorStatus;
60use risingwave_pb::plan_common::PbColumnCatalog;
61use risingwave_pb::stream_plan::stream_node::NodeBody;
62use risingwave_pb::stream_plan::{
63 PbDispatchOutputMapping, PbStreamFragmentGraph, PbStreamNode, PbUpstreamSinkInfo,
64 StreamFragmentGraph as StreamFragmentGraphProto,
65};
66use risingwave_pb::telemetry::{PbTelemetryDatabaseObject, PbTelemetryEventStage};
67use strum::Display;
68use thiserror_ext::AsReport;
69use tokio::sync::{OwnedSemaphorePermit, Semaphore};
70use tokio::time::sleep;
71use tracing::Instrument;
72
73use crate::barrier::{BarrierManagerRef, Command};
74use crate::controller::catalog::{DropTableConnectorContext, ReleaseContext};
75use crate::controller::cluster::StreamingClusterInfo;
76use crate::controller::streaming_job::{FinishAutoRefreshSchemaSinkContext, SinkIntoTableContext};
77use crate::controller::utils::build_select_node_list;
78use crate::error::{MetaErrorInner, bail_invalid_parameter, bail_unavailable};
79use crate::manager::sink_coordination::SinkCoordinatorManager;
80use crate::manager::{
81 IGNORED_NOTIFICATION_VERSION, LocalNotification, MetaSrvEnv, MetadataManager,
82 NotificationVersion, StreamingJob, StreamingJobType,
83};
84use crate::model::{
85 DownstreamFragmentRelation, Fragment, FragmentDownstreamRelation,
86 FragmentId as CatalogFragmentId, StreamContext, StreamJobFragments, StreamJobFragmentsToCreate,
87 TableParallelism,
88};
89use crate::stream::cdc::{
90 parallel_cdc_table_backfill_fragment, try_init_parallel_cdc_table_snapshot_splits,
91};
92use crate::stream::{
93 ActorGraphBuildResult, ActorGraphBuilder, AutoRefreshSchemaSinkContext,
94 CompleteStreamFragmentGraph, CreateStreamingJobContext, CreateStreamingJobOption,
95 FragmentGraphDownstreamContext, FragmentGraphUpstreamContext, GlobalStreamManagerRef,
96 ReplaceStreamJobContext, ReschedulePolicy, SourceChange, SourceManagerRef, StreamFragmentGraph,
97 UpstreamSinkInfo, check_sink_fragments_support_refresh_schema, create_source_worker,
98 rewrite_refresh_schema_sink_fragment, state_match, validate_sink,
99};
100use crate::telemetry::report_event;
101use crate::{MetaError, MetaResult};
102
103#[derive(PartialEq)]
104pub enum DropMode {
105 Restrict,
106 Cascade,
107}
108
109impl DropMode {
110 pub fn from_request_setting(cascade: bool) -> DropMode {
111 if cascade {
112 DropMode::Cascade
113 } else {
114 DropMode::Restrict
115 }
116 }
117}
118
119#[derive(strum::AsRefStr)]
120pub enum StreamingJobId {
121 MaterializedView(TableId),
122 Sink(SinkId),
123 Table(Option<SourceId>, TableId),
124 Index(IndexId),
125}
126
127impl std::fmt::Display for StreamingJobId {
128 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
129 write!(f, "{}", self.as_ref())?;
130 write!(f, "({})", self.id())
131 }
132}
133
134impl StreamingJobId {
135 fn id(&self) -> JobId {
136 match self {
137 StreamingJobId::MaterializedView(id) | StreamingJobId::Table(_, id) => id.as_job_id(),
138 StreamingJobId::Index(id) => id.as_job_id(),
139 StreamingJobId::Sink(id) => id.as_job_id(),
140 }
141 }
142}
143
144pub struct ReplaceStreamJobInfo {
147 pub streaming_job: StreamingJob,
148 pub fragment_graph: StreamFragmentGraphProto,
149}
150
151#[derive(Display)]
152pub enum DdlCommand {
153 CreateDatabase(Database),
154 DropDatabase(DatabaseId),
155 CreateSchema(Schema),
156 DropSchema(SchemaId, DropMode),
157 CreateNonSharedSource(Source),
158 DropSource(SourceId, DropMode),
159 ResetSource(SourceId),
160 CreateFunction(Function),
161 DropFunction(FunctionId, DropMode),
162 CreateView(View, HashSet<ObjectId>),
163 DropView(ViewId, DropMode),
164 CreateStreamingJob {
165 stream_job: StreamingJob,
166 fragment_graph: StreamFragmentGraphProto,
167 dependencies: HashSet<ObjectId>,
168 resource_type: streaming_job_resource_type::ResourceType,
169 if_not_exists: bool,
170 },
171 DropStreamingJob {
172 job_id: StreamingJobId,
173 drop_mode: DropMode,
174 },
175 AlterName(alter_name_request::Object, String),
176 AlterSwapRename(alter_swap_rename_request::Object),
177 ReplaceStreamJob(ReplaceStreamJobInfo),
178 AlterNonSharedSource(Source),
179 AlterObjectOwner(Object, UserId),
180 AlterSetSchema(alter_set_schema_request::Object, SchemaId),
181 CreateConnection(Connection),
182 DropConnection(ConnectionId, DropMode),
183 CreateSecret(Secret),
184 AlterSecret(Secret),
185 DropSecret(SecretId),
186 CommentOn(Comment),
187 CreateSubscription(Subscription),
188 DropSubscription(SubscriptionId, DropMode),
189 AlterDatabaseParam(DatabaseId, AlterDatabaseParam),
190 AlterStreamingJobConfig(JobId, HashMap<String, String>, Vec<String>),
191}
192
193impl DdlCommand {
194 fn object(&self) -> Either<String, ObjectId> {
196 use Either::*;
197 match self {
198 DdlCommand::CreateDatabase(database) => Left(database.name.clone()),
199 DdlCommand::DropDatabase(id) => Right(id.as_object_id()),
200 DdlCommand::CreateSchema(schema) => Left(schema.name.clone()),
201 DdlCommand::DropSchema(id, _) => Right(id.as_object_id()),
202 DdlCommand::CreateNonSharedSource(source) => Left(source.name.clone()),
203 DdlCommand::DropSource(id, _) => Right(id.as_object_id()),
204 DdlCommand::ResetSource(id) => Right(id.as_object_id()),
205 DdlCommand::CreateFunction(function) => Left(function.name.clone()),
206 DdlCommand::DropFunction(id, _) => Right(id.as_object_id()),
207 DdlCommand::CreateView(view, _) => Left(view.name.clone()),
208 DdlCommand::DropView(id, _) => Right(id.as_object_id()),
209 DdlCommand::CreateStreamingJob { stream_job, .. } => Left(stream_job.name()),
210 DdlCommand::DropStreamingJob { job_id, .. } => Right(job_id.id().as_object_id()),
211 DdlCommand::AlterName(object, _) => Left(format!("{object:?}")),
212 DdlCommand::AlterSwapRename(object) => Left(format!("{object:?}")),
213 DdlCommand::ReplaceStreamJob(info) => Left(info.streaming_job.name()),
214 DdlCommand::AlterNonSharedSource(source) => Left(source.name.clone()),
215 DdlCommand::AlterObjectOwner(object, _) => Left(format!("{object:?}")),
216 DdlCommand::AlterSetSchema(object, _) => Left(format!("{object:?}")),
217 DdlCommand::CreateConnection(connection) => Left(connection.name.clone()),
218 DdlCommand::DropConnection(id, _) => Right(id.as_object_id()),
219 DdlCommand::CreateSecret(secret) => Left(secret.name.clone()),
220 DdlCommand::AlterSecret(secret) => Left(secret.name.clone()),
221 DdlCommand::DropSecret(id) => Right(id.as_object_id()),
222 DdlCommand::CommentOn(comment) => Right(comment.table_id.into()),
223 DdlCommand::CreateSubscription(subscription) => Left(subscription.name.clone()),
224 DdlCommand::DropSubscription(id, _) => Right(id.as_object_id()),
225 DdlCommand::AlterDatabaseParam(id, _) => Right(id.as_object_id()),
226 DdlCommand::AlterStreamingJobConfig(job_id, _, _) => Right(job_id.as_object_id()),
227 }
228 }
229
230 fn allow_in_recovery(&self) -> bool {
231 match self {
232 DdlCommand::DropDatabase(_)
233 | DdlCommand::DropSchema(_, _)
234 | DdlCommand::DropSource(_, _)
235 | DdlCommand::DropFunction(_, _)
236 | DdlCommand::DropView(_, _)
237 | DdlCommand::DropStreamingJob { .. }
238 | DdlCommand::DropConnection(_, _)
239 | DdlCommand::DropSecret(_)
240 | DdlCommand::DropSubscription(_, _)
241 | DdlCommand::AlterName(_, _)
242 | DdlCommand::AlterObjectOwner(_, _)
243 | DdlCommand::AlterSetSchema(_, _)
244 | DdlCommand::CreateDatabase(_)
245 | DdlCommand::CreateSchema(_)
246 | DdlCommand::CreateFunction(_)
247 | DdlCommand::CreateView(_, _)
248 | DdlCommand::CreateConnection(_)
249 | DdlCommand::CommentOn(_)
250 | DdlCommand::CreateSecret(_)
251 | DdlCommand::AlterSecret(_)
252 | DdlCommand::AlterSwapRename(_)
253 | DdlCommand::AlterDatabaseParam(_, _)
254 | DdlCommand::AlterStreamingJobConfig(_, _, _) => true,
255 DdlCommand::CreateStreamingJob { .. }
256 | DdlCommand::CreateNonSharedSource(_)
257 | DdlCommand::ReplaceStreamJob(_)
258 | DdlCommand::AlterNonSharedSource(_)
259 | DdlCommand::ResetSource(_)
260 | DdlCommand::CreateSubscription(_) => false,
261 }
262 }
263}
264
265#[derive(Clone)]
266pub struct DdlController {
267 pub(crate) env: MetaSrvEnv,
268
269 pub(crate) metadata_manager: MetadataManager,
270 pub(crate) stream_manager: GlobalStreamManagerRef,
271 pub(crate) source_manager: SourceManagerRef,
272 barrier_manager: BarrierManagerRef,
273 sink_manager: SinkCoordinatorManager,
274
275 pub(crate) creating_streaming_job_permits: Arc<CreatingStreamingJobPermit>,
277
278 seq: Arc<AtomicU64>,
280}
281
282#[derive(Clone)]
283pub struct CreatingStreamingJobPermit {
284 pub(crate) semaphore: Arc<Semaphore>,
285}
286
287impl CreatingStreamingJobPermit {
288 async fn new(env: &MetaSrvEnv) -> Self {
289 let mut permits = env
290 .system_params_reader()
291 .await
292 .max_concurrent_creating_streaming_jobs() as usize;
293 if permits == 0 {
294 permits = Semaphore::MAX_PERMITS;
296 }
297 let semaphore = Arc::new(Semaphore::new(permits));
298
299 let (local_notification_tx, mut local_notification_rx) =
300 tokio::sync::mpsc::unbounded_channel();
301 env.notification_manager()
302 .insert_local_sender(local_notification_tx);
303 let semaphore_clone = semaphore.clone();
304 tokio::spawn(async move {
305 while let Some(notification) = local_notification_rx.recv().await {
306 let LocalNotification::SystemParamsChange(p) = ¬ification else {
307 continue;
308 };
309 let mut new_permits = p.max_concurrent_creating_streaming_jobs() as usize;
310 if new_permits == 0 {
311 new_permits = Semaphore::MAX_PERMITS;
312 }
313 match permits.cmp(&new_permits) {
314 Ordering::Less => {
315 semaphore_clone.add_permits(new_permits - permits);
316 }
317 Ordering::Equal => continue,
318 Ordering::Greater => {
319 let to_release = permits - new_permits;
320 let reduced = semaphore_clone.forget_permits(to_release);
321 if reduced != to_release {
323 tracing::warn!(
324 "no enough permits to release, expected {}, but reduced {}",
325 to_release,
326 reduced
327 );
328 }
329 }
330 }
331 tracing::info!(
332 "max_concurrent_creating_streaming_jobs changed from {} to {}",
333 permits,
334 new_permits
335 );
336 permits = new_permits;
337 }
338 });
339
340 Self { semaphore }
341 }
342}
343
344impl DdlController {
345 pub async fn new(
346 env: MetaSrvEnv,
347 metadata_manager: MetadataManager,
348 stream_manager: GlobalStreamManagerRef,
349 source_manager: SourceManagerRef,
350 barrier_manager: BarrierManagerRef,
351 sink_manager: SinkCoordinatorManager,
352 ) -> Self {
353 let creating_streaming_job_permits = Arc::new(CreatingStreamingJobPermit::new(&env).await);
354 Self {
355 env,
356 metadata_manager,
357 stream_manager,
358 source_manager,
359 barrier_manager,
360 sink_manager,
361 creating_streaming_job_permits,
362 seq: Arc::new(AtomicU64::new(0)),
363 }
364 }
365
366 pub fn next_seq(&self) -> u64 {
368 self.seq.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
370 }
371
372 pub async fn run_command(&self, command: DdlCommand) -> MetaResult<Option<WaitVersion>> {
379 if !command.allow_in_recovery() {
380 self.barrier_manager.check_status_running()?;
381 }
382
383 let await_tree_key = format!("DDL Command {}", self.next_seq());
384 let await_tree_span = await_tree::span!("{command}({})", command.object());
385
386 let ctrl = self.clone();
387 let fut = async move {
388 match command {
389 DdlCommand::CreateDatabase(database) => ctrl.create_database(database).await,
390 DdlCommand::DropDatabase(database_id) => ctrl.drop_database(database_id).await,
391 DdlCommand::CreateSchema(schema) => ctrl.create_schema(schema).await,
392 DdlCommand::DropSchema(schema_id, drop_mode) => {
393 ctrl.drop_schema(schema_id, drop_mode).await
394 }
395 DdlCommand::CreateNonSharedSource(source) => {
396 ctrl.create_non_shared_source(source).await
397 }
398 DdlCommand::DropSource(source_id, drop_mode) => {
399 ctrl.drop_source(source_id, drop_mode).await
400 }
401 DdlCommand::ResetSource(source_id) => ctrl.reset_source(source_id).await,
402 DdlCommand::CreateFunction(function) => ctrl.create_function(function).await,
403 DdlCommand::DropFunction(function_id, drop_mode) => {
404 ctrl.drop_function(function_id, drop_mode).await
405 }
406 DdlCommand::CreateView(view, dependencies) => {
407 ctrl.create_view(view, dependencies).await
408 }
409 DdlCommand::DropView(view_id, drop_mode) => {
410 ctrl.drop_view(view_id, drop_mode).await
411 }
412 DdlCommand::CreateStreamingJob {
413 stream_job,
414 fragment_graph,
415 dependencies,
416 resource_type,
417 if_not_exists,
418 } => {
419 ctrl.create_streaming_job(
420 stream_job,
421 fragment_graph,
422 dependencies,
423 resource_type,
424 if_not_exists,
425 )
426 .await
427 }
428 DdlCommand::DropStreamingJob { job_id, drop_mode } => {
429 ctrl.drop_streaming_job(job_id, drop_mode).await
430 }
431 DdlCommand::ReplaceStreamJob(ReplaceStreamJobInfo {
432 streaming_job,
433 fragment_graph,
434 }) => ctrl.replace_job(streaming_job, fragment_graph).await,
435 DdlCommand::AlterName(relation, name) => ctrl.alter_name(relation, &name).await,
436 DdlCommand::AlterObjectOwner(object, owner_id) => {
437 ctrl.alter_owner(object, owner_id).await
438 }
439 DdlCommand::AlterSetSchema(object, new_schema_id) => {
440 ctrl.alter_set_schema(object, new_schema_id).await
441 }
442 DdlCommand::CreateConnection(connection) => {
443 ctrl.create_connection(connection).await
444 }
445 DdlCommand::DropConnection(connection_id, drop_mode) => {
446 ctrl.drop_connection(connection_id, drop_mode).await
447 }
448 DdlCommand::CreateSecret(secret) => ctrl.create_secret(secret).await,
449 DdlCommand::DropSecret(secret_id) => ctrl.drop_secret(secret_id).await,
450 DdlCommand::AlterSecret(secret) => ctrl.alter_secret(secret).await,
451 DdlCommand::AlterNonSharedSource(source) => {
452 ctrl.alter_non_shared_source(source).await
453 }
454 DdlCommand::CommentOn(comment) => ctrl.comment_on(comment).await,
455 DdlCommand::CreateSubscription(subscription) => {
456 ctrl.create_subscription(subscription).await
457 }
458 DdlCommand::DropSubscription(subscription_id, drop_mode) => {
459 ctrl.drop_subscription(subscription_id, drop_mode).await
460 }
461 DdlCommand::AlterSwapRename(objects) => ctrl.alter_swap_rename(objects).await,
462 DdlCommand::AlterDatabaseParam(database_id, param) => {
463 ctrl.alter_database_param(database_id, param).await
464 }
465 DdlCommand::AlterStreamingJobConfig(job_id, entries_to_add, keys_to_remove) => {
466 ctrl.alter_streaming_job_config(job_id, entries_to_add, keys_to_remove)
467 .await
468 }
469 }
470 }
471 .in_current_span();
472 let fut = (self.env.await_tree_reg())
473 .register(await_tree_key, await_tree_span)
474 .instrument(Box::pin(fut));
475 let notification_version = tokio::spawn(fut).await.map_err(|e| anyhow!(e))??;
476 Ok(Some(WaitVersion {
477 catalog_version: notification_version,
478 hummock_version_id: self.barrier_manager.get_hummock_version_id().await.to_u64(),
479 }))
480 }
481
482 pub async fn get_ddl_progress(&self) -> MetaResult<Vec<DdlProgress>> {
483 self.barrier_manager.get_ddl_progress().await
484 }
485
486 async fn create_database(&self, database: Database) -> MetaResult<NotificationVersion> {
487 let (version, updated_db) = self
488 .metadata_manager
489 .catalog_controller
490 .create_database(database)
491 .await?;
492 self.barrier_manager
494 .update_database_barrier(
495 updated_db.database_id,
496 updated_db.barrier_interval_ms.map(|v| v as u32),
497 updated_db.checkpoint_frequency.map(|v| v as u64),
498 )
499 .await?;
500 Ok(version)
501 }
502
503 #[tracing::instrument(skip(self), level = "debug")]
504 pub async fn reschedule_streaming_job(
505 &self,
506 job_id: JobId,
507 target: ReschedulePolicy,
508 mut deferred: bool,
509 ) -> MetaResult<()> {
510 tracing::info!("altering parallelism for job {}", job_id);
511 if self.barrier_manager.check_status_running().is_err() {
512 tracing::info!(
513 "alter parallelism is set to deferred mode because the system is in recovery state"
514 );
515 deferred = true;
516 }
517
518 self.stream_manager
519 .reschedule_streaming_job(job_id, target, deferred)
520 .await
521 }
522
523 pub async fn reschedule_cdc_table_backfill(
524 &self,
525 job_id: JobId,
526 target: ReschedulePolicy,
527 ) -> MetaResult<()> {
528 tracing::info!("alter CDC table backfill parallelism");
529 if self.barrier_manager.check_status_running().is_err() {
530 return Err(anyhow::anyhow!("CDC table backfill reschedule is unavailable because the system is in recovery state").into());
531 }
532 self.stream_manager
533 .reschedule_cdc_table_backfill(job_id, target)
534 .await
535 }
536
537 pub async fn reschedule_fragments(
538 &self,
539 fragment_targets: HashMap<FragmentId, Option<StreamingParallelism>>,
540 ) -> MetaResult<()> {
541 tracing::info!(
542 "altering parallelism for fragments {:?}",
543 fragment_targets.keys()
544 );
545 let fragment_targets = fragment_targets
546 .into_iter()
547 .map(|(fragment_id, parallelism)| (fragment_id as CatalogFragmentId, parallelism))
548 .collect();
549
550 self.stream_manager
551 .reschedule_fragments(fragment_targets)
552 .await
553 }
554
555 async fn drop_database(&self, database_id: DatabaseId) -> MetaResult<NotificationVersion> {
556 self.drop_object(ObjectType::Database, database_id, DropMode::Cascade)
557 .await
558 }
559
560 async fn create_schema(&self, schema: Schema) -> MetaResult<NotificationVersion> {
561 self.metadata_manager
562 .catalog_controller
563 .create_schema(schema)
564 .await
565 }
566
567 async fn drop_schema(
568 &self,
569 schema_id: SchemaId,
570 drop_mode: DropMode,
571 ) -> MetaResult<NotificationVersion> {
572 self.drop_object(ObjectType::Schema, schema_id, drop_mode)
573 .await
574 }
575
576 async fn create_non_shared_source(&self, source: Source) -> MetaResult<NotificationVersion> {
578 let handle = create_source_worker(&source, self.source_manager.metrics.clone())
579 .await
580 .context("failed to create source worker")?;
581
582 let (source_id, version) = self
583 .metadata_manager
584 .catalog_controller
585 .create_source(source)
586 .await?;
587 self.source_manager
588 .register_source_with_handle(source_id, handle)
589 .await;
590 Ok(version)
591 }
592
593 async fn drop_source(
594 &self,
595 source_id: SourceId,
596 drop_mode: DropMode,
597 ) -> MetaResult<NotificationVersion> {
598 self.drop_object(ObjectType::Source, source_id, drop_mode)
599 .await
600 }
601
602 async fn reset_source(&self, source_id: SourceId) -> MetaResult<NotificationVersion> {
603 tracing::info!(source_id = %source_id, "resetting CDC source offset to latest");
604
605 let database_id = self
607 .metadata_manager
608 .catalog_controller
609 .get_object_database_id(source_id)
610 .await?;
611
612 self.stream_manager
613 .barrier_scheduler
614 .run_command(database_id, Command::ResetSource { source_id })
615 .await?;
616
617 let version = self
619 .metadata_manager
620 .catalog_controller
621 .current_notification_version()
622 .await;
623 Ok(version)
624 }
625
626 async fn alter_non_shared_source(&self, source: Source) -> MetaResult<NotificationVersion> {
629 self.metadata_manager
630 .catalog_controller
631 .alter_non_shared_source(source)
632 .await
633 }
634
635 async fn create_function(&self, function: Function) -> MetaResult<NotificationVersion> {
636 self.metadata_manager
637 .catalog_controller
638 .create_function(function)
639 .await
640 }
641
642 async fn drop_function(
643 &self,
644 function_id: FunctionId,
645 drop_mode: DropMode,
646 ) -> MetaResult<NotificationVersion> {
647 self.drop_object(ObjectType::Function, function_id, drop_mode)
648 .await
649 }
650
651 async fn create_view(
652 &self,
653 view: View,
654 dependencies: HashSet<ObjectId>,
655 ) -> MetaResult<NotificationVersion> {
656 self.metadata_manager
657 .catalog_controller
658 .create_view(view, dependencies)
659 .await
660 }
661
662 async fn drop_view(
663 &self,
664 view_id: ViewId,
665 drop_mode: DropMode,
666 ) -> MetaResult<NotificationVersion> {
667 self.drop_object(ObjectType::View, view_id, drop_mode).await
668 }
669
670 async fn create_connection(&self, connection: Connection) -> MetaResult<NotificationVersion> {
671 validate_connection(&connection).await?;
672 self.metadata_manager
673 .catalog_controller
674 .create_connection(connection)
675 .await
676 }
677
678 async fn drop_connection(
679 &self,
680 connection_id: ConnectionId,
681 drop_mode: DropMode,
682 ) -> MetaResult<NotificationVersion> {
683 self.drop_object(ObjectType::Connection, connection_id, drop_mode)
684 .await
685 }
686
687 async fn alter_database_param(
688 &self,
689 database_id: DatabaseId,
690 param: AlterDatabaseParam,
691 ) -> MetaResult<NotificationVersion> {
692 let (version, updated_db) = self
693 .metadata_manager
694 .catalog_controller
695 .alter_database_param(database_id, param)
696 .await?;
697 self.barrier_manager
699 .update_database_barrier(
700 database_id,
701 updated_db.barrier_interval_ms.map(|v| v as u32),
702 updated_db.checkpoint_frequency.map(|v| v as u64),
703 )
704 .await?;
705 Ok(version)
706 }
707
708 fn get_encrypted_payload(&self, secret: &Secret) -> MetaResult<Vec<u8>> {
711 let secret_store_private_key = self
712 .env
713 .opts
714 .secret_store_private_key
715 .clone()
716 .ok_or_else(|| anyhow!("secret_store_private_key is not configured"))?;
717
718 let encrypted_payload = SecretEncryption::encrypt(
719 secret_store_private_key.as_slice(),
720 secret.get_value().as_slice(),
721 )
722 .context(format!("failed to encrypt secret {}", secret.name))?;
723 Ok(encrypted_payload
724 .serialize()
725 .context(format!("failed to serialize secret {}", secret.name))?)
726 }
727
728 async fn create_secret(&self, mut secret: Secret) -> MetaResult<NotificationVersion> {
729 let secret_plain_payload = secret.value.clone();
732 let encrypted_payload = self.get_encrypted_payload(&secret)?;
733 secret.value = encrypted_payload;
734
735 self.metadata_manager
736 .catalog_controller
737 .create_secret(secret, secret_plain_payload)
738 .await
739 }
740
741 async fn drop_secret(&self, secret_id: SecretId) -> MetaResult<NotificationVersion> {
742 self.drop_object(ObjectType::Secret, secret_id, DropMode::Restrict)
743 .await
744 }
745
746 async fn alter_secret(&self, mut secret: Secret) -> MetaResult<NotificationVersion> {
747 let secret_plain_payload = secret.value.clone();
748 let encrypted_payload = self.get_encrypted_payload(&secret)?;
749 secret.value = encrypted_payload;
750 self.metadata_manager
751 .catalog_controller
752 .alter_secret(secret, secret_plain_payload)
753 .await
754 }
755
756 async fn create_subscription(
757 &self,
758 mut subscription: Subscription,
759 ) -> MetaResult<NotificationVersion> {
760 tracing::debug!("create subscription");
761 let _permit = self
762 .creating_streaming_job_permits
763 .semaphore
764 .acquire()
765 .await
766 .unwrap();
767 let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
768 self.metadata_manager
769 .catalog_controller
770 .create_subscription_catalog(&mut subscription)
771 .await?;
772 if let Err(err) = self.stream_manager.create_subscription(&subscription).await {
773 tracing::debug!(error = %err.as_report(), "failed to create subscription");
774 let _ = self
775 .metadata_manager
776 .catalog_controller
777 .try_abort_creating_subscription(subscription.id)
778 .await
779 .inspect_err(|e| {
780 tracing::error!(
781 error = %e.as_report(),
782 "failed to abort create subscription after failure"
783 );
784 });
785 return Err(err);
786 }
787
788 let version = self
789 .metadata_manager
790 .catalog_controller
791 .notify_create_subscription(subscription.id)
792 .await?;
793 tracing::debug!("finish create subscription");
794 Ok(version)
795 }
796
797 async fn drop_subscription(
798 &self,
799 subscription_id: SubscriptionId,
800 drop_mode: DropMode,
801 ) -> MetaResult<NotificationVersion> {
802 tracing::debug!("preparing drop subscription");
803 let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
804 let subscription = self
805 .metadata_manager
806 .catalog_controller
807 .get_subscription_by_id(subscription_id)
808 .await?;
809 let table_id = subscription.dependent_table_id;
810 let database_id = subscription.database_id;
811 let (_, version) = self
812 .metadata_manager
813 .catalog_controller
814 .drop_object(ObjectType::Subscription, subscription_id, drop_mode)
815 .await?;
816 self.stream_manager
817 .drop_subscription(database_id, subscription_id, table_id)
818 .await;
819 tracing::debug!("finish drop subscription");
820 Ok(version)
821 }
822
823 #[await_tree::instrument]
825 pub(crate) async fn validate_cdc_table(
826 &self,
827 table: &Table,
828 table_fragments: &StreamJobFragments,
829 ) -> MetaResult<()> {
830 let stream_scan_fragment = table_fragments
831 .fragments
832 .values()
833 .filter(|f| {
834 f.fragment_type_mask.contains(FragmentTypeFlag::StreamScan)
835 || f.fragment_type_mask
836 .contains(FragmentTypeFlag::StreamCdcScan)
837 })
838 .exactly_one()
839 .ok()
840 .with_context(|| {
841 format!(
842 "expect exactly one stream scan fragment, got: {:?}",
843 table_fragments.fragments
844 )
845 })?;
846 fn assert_parallelism(stream_scan_fragment: &Fragment, node_body: &Option<NodeBody>) {
847 if let Some(NodeBody::StreamCdcScan(node)) = node_body {
848 if let Some(o) = node.options
849 && CdcScanOptions::from_proto(&o).is_parallelized_backfill()
850 {
851 } else {
853 assert_eq!(
854 stream_scan_fragment.actors.len(),
855 1,
856 "Stream scan fragment should have only one actor"
857 );
858 }
859 }
860 }
861 let mut found_cdc_scan = false;
862 match &stream_scan_fragment.nodes.node_body {
863 Some(NodeBody::StreamCdcScan(_)) => {
864 assert_parallelism(stream_scan_fragment, &stream_scan_fragment.nodes.node_body);
865 if self
866 .validate_cdc_table_inner(&stream_scan_fragment.nodes.node_body, table.id)
867 .await?
868 {
869 found_cdc_scan = true;
870 }
871 }
872 Some(NodeBody::Project(_)) => {
874 for input in &stream_scan_fragment.nodes.input {
875 assert_parallelism(stream_scan_fragment, &input.node_body);
876 if self
877 .validate_cdc_table_inner(&input.node_body, table.id)
878 .await?
879 {
880 found_cdc_scan = true;
881 }
882 }
883 }
884 _ => {
885 bail!("Unexpected node body for stream cdc scan");
886 }
887 };
888 if !found_cdc_scan {
889 bail!("No stream cdc scan node found in stream scan fragment");
890 }
891 Ok(())
892 }
893
894 async fn validate_cdc_table_inner(
895 &self,
896 node_body: &Option<NodeBody>,
897 table_id: TableId,
898 ) -> MetaResult<bool> {
899 if let Some(NodeBody::StreamCdcScan(stream_cdc_scan)) = node_body
900 && let Some(ref cdc_table_desc) = stream_cdc_scan.cdc_table_desc
901 {
902 let options_with_secret = WithOptionsSecResolved::new(
903 cdc_table_desc.connect_properties.clone(),
904 cdc_table_desc.secret_refs.clone(),
905 );
906
907 let mut props = ConnectorProperties::extract(options_with_secret, true)?;
908 props.init_from_pb_cdc_table_desc(cdc_table_desc);
909
910 let _enumerator = props
912 .create_split_enumerator(SourceEnumeratorContext::dummy().into())
913 .await?;
914
915 tracing::debug!(?table_id, "validate cdc table success");
916 Ok(true)
917 } else {
918 Ok(false)
919 }
920 }
921
922 pub async fn validate_table_for_sink(&self, table_id: TableId) -> MetaResult<()> {
923 let migrated = self
924 .metadata_manager
925 .catalog_controller
926 .has_table_been_migrated(table_id)
927 .await?;
928 if !migrated {
929 Err(anyhow::anyhow!("Creating sink into table is not allowed for unmigrated table {}. Please migrate it first.", table_id).into())
930 } else {
931 Ok(())
932 }
933 }
934
935 #[await_tree::instrument(boxed, "create_streaming_job({streaming_job})")]
938 pub async fn create_streaming_job(
939 &self,
940 mut streaming_job: StreamingJob,
941 fragment_graph: StreamFragmentGraphProto,
942 dependencies: HashSet<ObjectId>,
943 resource_type: streaming_job_resource_type::ResourceType,
944 if_not_exists: bool,
945 ) -> MetaResult<NotificationVersion> {
946 if let StreamingJob::Sink(sink) = &streaming_job
947 && let Some(target_table) = sink.target_table
948 {
949 self.validate_table_for_sink(target_table).await?;
950 }
951 let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());
952 let check_ret = self
953 .metadata_manager
954 .catalog_controller
955 .create_job_catalog(
956 &mut streaming_job,
957 &ctx,
958 &fragment_graph.parallelism,
959 fragment_graph.max_parallelism as _,
960 dependencies,
961 resource_type.clone(),
962 &fragment_graph.backfill_parallelism,
963 )
964 .await;
965 if let Err(meta_err) = check_ret {
966 if !if_not_exists {
967 return Err(meta_err);
968 }
969 return if let MetaErrorInner::Duplicated(_, _, Some(job_id)) = meta_err.inner() {
970 if streaming_job.create_type() == CreateType::Foreground {
971 let database_id = streaming_job.database_id();
972 self.metadata_manager
973 .wait_streaming_job_finished(database_id, *job_id)
974 .await
975 } else {
976 Ok(IGNORED_NOTIFICATION_VERSION)
977 }
978 } else {
979 Err(meta_err)
980 };
981 }
982 let job_id = streaming_job.id();
983 tracing::debug!(
984 id = %job_id,
985 definition = streaming_job.definition(),
986 create_type = streaming_job.create_type().as_str_name(),
987 job_type = ?streaming_job.job_type(),
988 "starting streaming job",
989 );
990 let permit = self
992 .creating_streaming_job_permits
993 .semaphore
994 .clone()
995 .acquire_owned()
996 .instrument_await("acquire_creating_streaming_job_permit")
997 .await
998 .unwrap();
999 let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
1000
1001 let name = streaming_job.name();
1002 let definition = streaming_job.definition();
1003 let source_id = match &streaming_job {
1004 StreamingJob::Table(Some(src), _, _) | StreamingJob::Source(src) => Some(src.id),
1005 _ => None,
1006 };
1007
1008 match self
1010 .create_streaming_job_inner(ctx, streaming_job, fragment_graph, resource_type, permit)
1011 .await
1012 {
1013 Ok(version) => Ok(version),
1014 Err(err) => {
1015 tracing::error!(id = %job_id, error = %err.as_report(), "failed to create streaming job");
1016 let event = risingwave_pb::meta::event_log::EventCreateStreamJobFail {
1017 id: job_id,
1018 name,
1019 definition,
1020 error: err.as_report().to_string(),
1021 };
1022 self.env.event_log_manager_ref().add_event_logs(vec![
1023 risingwave_pb::meta::event_log::Event::CreateStreamJobFail(event),
1024 ]);
1025 let (aborted, _) = self
1026 .metadata_manager
1027 .catalog_controller
1028 .try_abort_creating_streaming_job(job_id, false)
1029 .await?;
1030 if aborted {
1031 tracing::warn!(id = %job_id, "aborted streaming job");
1032 if let Some(source_id) = source_id {
1034 self.source_manager
1035 .apply_source_change(SourceChange::DropSource {
1036 dropped_source_ids: vec![source_id],
1037 })
1038 .await;
1039 }
1040 }
1041 Err(err)
1042 }
1043 }
1044 }
1045
1046 #[await_tree::instrument(boxed)]
1047 async fn create_streaming_job_inner(
1048 &self,
1049 ctx: StreamContext,
1050 mut streaming_job: StreamingJob,
1051 fragment_graph: StreamFragmentGraphProto,
1052 resource_type: streaming_job_resource_type::ResourceType,
1053 permit: OwnedSemaphorePermit,
1054 ) -> MetaResult<NotificationVersion> {
1055 let mut fragment_graph =
1056 StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
1057 streaming_job.set_info_from_graph(&fragment_graph);
1058
1059 let incomplete_internal_tables = fragment_graph
1061 .incomplete_internal_tables()
1062 .into_values()
1063 .collect_vec();
1064 let table_id_map = self
1065 .metadata_manager
1066 .catalog_controller
1067 .create_internal_table_catalog(&streaming_job, incomplete_internal_tables)
1068 .await?;
1069 fragment_graph.refill_internal_table_ids(table_id_map);
1070
1071 tracing::debug!(id = %streaming_job.id(), "building streaming job");
1073 let (ctx, stream_job_fragments) = self
1074 .build_stream_job(ctx, streaming_job, fragment_graph, resource_type)
1075 .await?;
1076
1077 let streaming_job = &ctx.streaming_job;
1078
1079 match streaming_job {
1080 StreamingJob::Table(None, table, TableJobType::SharedCdcSource) => {
1081 self.validate_cdc_table(table, &stream_job_fragments)
1082 .await?;
1083 }
1084 StreamingJob::Table(Some(source), ..) => {
1085 self.source_manager.register_source(source).await?;
1087 let connector_name = source
1088 .get_with_properties()
1089 .get(UPSTREAM_SOURCE_KEY)
1090 .cloned();
1091 let attr = source.info.as_ref().map(|source_info| {
1092 jsonbb::json!({
1093 "format": source_info.format().as_str_name(),
1094 "encode": source_info.row_encode().as_str_name(),
1095 })
1096 });
1097 report_create_object(
1098 streaming_job.id(),
1099 "source",
1100 PbTelemetryDatabaseObject::Source,
1101 connector_name,
1102 attr,
1103 );
1104 }
1105 StreamingJob::Sink(sink) => {
1106 if sink.auto_refresh_schema_from_table.is_some() {
1107 check_sink_fragments_support_refresh_schema(&stream_job_fragments.fragments)?
1108 }
1109 validate_sink(sink).await?;
1111 let connector_name = sink.get_properties().get(UPSTREAM_SOURCE_KEY).cloned();
1112 let attr = sink.format_desc.as_ref().map(|sink_info| {
1113 jsonbb::json!({
1114 "format": sink_info.format().as_str_name(),
1115 "encode": sink_info.encode().as_str_name(),
1116 })
1117 });
1118 report_create_object(
1119 streaming_job.id(),
1120 "sink",
1121 PbTelemetryDatabaseObject::Sink,
1122 connector_name,
1123 attr,
1124 );
1125 }
1126 StreamingJob::Source(source) => {
1127 self.source_manager.register_source(source).await?;
1129 let connector_name = source
1130 .get_with_properties()
1131 .get(UPSTREAM_SOURCE_KEY)
1132 .cloned();
1133 let attr = source.info.as_ref().map(|source_info| {
1134 jsonbb::json!({
1135 "format": source_info.format().as_str_name(),
1136 "encode": source_info.row_encode().as_str_name(),
1137 })
1138 });
1139 report_create_object(
1140 streaming_job.id(),
1141 "source",
1142 PbTelemetryDatabaseObject::Source,
1143 connector_name,
1144 attr,
1145 );
1146 }
1147 _ => {}
1148 }
1149
1150 let backfill_orders = ctx.fragment_backfill_ordering.clone().into();
1151 self.metadata_manager
1152 .catalog_controller
1153 .prepare_stream_job_fragments(
1154 &stream_job_fragments,
1155 streaming_job,
1156 false,
1157 Some(backfill_orders),
1158 )
1159 .await?;
1160
1161 let version = self
1163 .stream_manager
1164 .create_streaming_job(stream_job_fragments, ctx, permit)
1165 .await?;
1166
1167 Ok(version)
1168 }
1169
1170 pub async fn drop_object(
1172 &self,
1173 object_type: ObjectType,
1174 object_id: impl Into<ObjectId>,
1175 drop_mode: DropMode,
1176 ) -> MetaResult<NotificationVersion> {
1177 let object_id = object_id.into();
1178 let (release_ctx, version) = self
1179 .metadata_manager
1180 .catalog_controller
1181 .drop_object(object_type, object_id, drop_mode)
1182 .await?;
1183
1184 if object_type == ObjectType::Source {
1185 self.env
1186 .notification_manager_ref()
1187 .notify_local_subscribers(LocalNotification::SourceDropped(object_id));
1188 }
1189
1190 let ReleaseContext {
1191 database_id,
1192 removed_streaming_job_ids,
1193 removed_state_table_ids,
1194 removed_source_ids,
1195 removed_secret_ids: secret_ids,
1196 removed_source_fragments,
1197 removed_actors,
1198 removed_fragments,
1199 removed_sink_fragment_by_targets,
1200 removed_iceberg_table_sinks,
1201 } = release_ctx;
1202
1203 let _guard = self.source_manager.pause_tick().await;
1204 self.stream_manager
1205 .drop_streaming_jobs(
1206 database_id,
1207 removed_actors.iter().map(|id| *id as _).collect(),
1208 removed_streaming_job_ids,
1209 removed_state_table_ids,
1210 removed_fragments.iter().map(|id| *id as _).collect(),
1211 removed_sink_fragment_by_targets
1212 .into_iter()
1213 .map(|(target, sinks)| {
1214 (target as _, sinks.into_iter().map(|id| id as _).collect())
1215 })
1216 .collect(),
1217 )
1218 .await;
1219
1220 self.source_manager
1223 .apply_source_change(SourceChange::DropSource {
1224 dropped_source_ids: removed_source_ids.into_iter().map(|id| id as _).collect(),
1225 })
1226 .await;
1227
1228 let dropped_source_fragments = removed_source_fragments;
1231 self.source_manager
1232 .apply_source_change(SourceChange::DropMv {
1233 dropped_source_fragments,
1234 })
1235 .await;
1236
1237 let iceberg_sink_ids: Vec<SinkId> = removed_iceberg_table_sinks
1239 .iter()
1240 .map(|sink| sink.id)
1241 .collect();
1242
1243 for sink in removed_iceberg_table_sinks {
1244 let sink_param = SinkParam::try_from_sink_catalog(sink.into())
1245 .expect("Iceberg sink should be valid");
1246 let iceberg_sink =
1247 IcebergSink::try_from(sink_param).expect("Iceberg sink should be valid");
1248 if let Ok(iceberg_catalog) = iceberg_sink.config.create_catalog().await {
1249 let table_identifier = iceberg_sink.config.full_table_name().unwrap();
1250 tracing::info!(
1251 "dropping iceberg table {} for dropped sink",
1252 table_identifier
1253 );
1254
1255 let _ = iceberg_catalog
1256 .drop_table(&table_identifier)
1257 .await
1258 .inspect_err(|err| {
1259 tracing::error!(
1260 "failed to drop iceberg table {} during cleanup: {}",
1261 table_identifier,
1262 err.as_report()
1263 );
1264 });
1265 }
1266 }
1267
1268 if !iceberg_sink_ids.is_empty() {
1270 self.sink_manager
1271 .stop_sink_coordinator(iceberg_sink_ids)
1272 .await;
1273 }
1274
1275 for secret in secret_ids {
1277 LocalSecretManager::global().remove_secret(secret);
1278 }
1279 Ok(version)
1280 }
1281
1282 #[await_tree::instrument(boxed, "replace_streaming_job({streaming_job})")]
1284 pub async fn replace_job(
1285 &self,
1286 mut streaming_job: StreamingJob,
1287 fragment_graph: StreamFragmentGraphProto,
1288 ) -> MetaResult<NotificationVersion> {
1289 match &streaming_job {
1290 StreamingJob::Table(..)
1291 | StreamingJob::Source(..)
1292 | StreamingJob::MaterializedView(..) => {}
1293 StreamingJob::Sink(..) | StreamingJob::Index(..) => {
1294 bail_not_implemented!("schema change for {}", streaming_job.job_type_str())
1295 }
1296 }
1297
1298 let job_id = streaming_job.id();
1299
1300 let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
1301 let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());
1302
1303 let original_max_parallelism = self
1305 .metadata_manager
1306 .get_job_max_parallelism(streaming_job.id())
1307 .await?;
1308 let fragment_graph = PbStreamFragmentGraph {
1309 max_parallelism: original_max_parallelism as _,
1310 ..fragment_graph
1311 };
1312
1313 let fragment_graph = StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
1315 streaming_job.set_info_from_graph(&fragment_graph);
1316
1317 let streaming_job = streaming_job;
1319
1320 let auto_refresh_schema_sinks = if let StreamingJob::Table(_, table, _) = &streaming_job {
1321 let auto_refresh_schema_sinks = self
1322 .metadata_manager
1323 .catalog_controller
1324 .get_sink_auto_refresh_schema_from(table.id)
1325 .await?;
1326 if !auto_refresh_schema_sinks.is_empty() {
1327 let original_table_columns = self
1328 .metadata_manager
1329 .catalog_controller
1330 .get_table_columns(table.id)
1331 .await?;
1332 let mut original_table_column_ids: HashSet<_> = original_table_columns
1334 .iter()
1335 .map(|col| col.column_id())
1336 .collect();
1337 let newly_added_columns = table
1338 .columns
1339 .iter()
1340 .filter(|col| {
1341 !original_table_column_ids.remove(&ColumnId::new(
1342 col.column_desc.as_ref().unwrap().column_id as _,
1343 ))
1344 })
1345 .map(|col| ColumnCatalog::from(col.clone()))
1346 .collect_vec();
1347 if !original_table_column_ids.is_empty() {
1348 return Err(anyhow!("new table columns does not contains all original columns. new: {:?}, original: {:?}, not included: {:?}", table.columns, original_table_columns, original_table_column_ids).into());
1349 }
1350 let mut sinks = Vec::with_capacity(auto_refresh_schema_sinks.len());
1351 for sink in auto_refresh_schema_sinks {
1352 let sink_job_fragments = self
1353 .metadata_manager
1354 .get_job_fragments_by_id(sink.id.as_job_id())
1355 .await?;
1356 if sink_job_fragments.fragments.len() != 1 {
1357 return Err(anyhow!(
1358 "auto schema refresh sink must have only one fragment, but got {}",
1359 sink_job_fragments.fragments.len()
1360 )
1361 .into());
1362 }
1363 let original_sink_fragment =
1364 sink_job_fragments.fragments.into_values().next().unwrap();
1365 let (new_sink_fragment, new_schema, new_log_store_table) =
1366 rewrite_refresh_schema_sink_fragment(
1367 &original_sink_fragment,
1368 &sink,
1369 &newly_added_columns,
1370 table,
1371 fragment_graph.table_fragment_id(),
1372 self.env.id_gen_manager(),
1373 self.env.actor_id_generator(),
1374 )?;
1375
1376 assert_eq!(
1377 original_sink_fragment.actors.len(),
1378 new_sink_fragment.actors.len()
1379 );
1380 let actor_status = (0..original_sink_fragment.actors.len())
1381 .map(|i| {
1382 let worker_node_id = sink_job_fragments.actor_status
1383 [&original_sink_fragment.actors[i].actor_id]
1384 .location
1385 .as_ref()
1386 .unwrap()
1387 .worker_node_id;
1388 (
1389 new_sink_fragment.actors[i].actor_id,
1390 PbActorStatus {
1391 location: Some(PbActorLocation { worker_node_id }),
1392 },
1393 )
1394 })
1395 .collect();
1396
1397 let streaming_job = StreamingJob::Sink(sink);
1398
1399 let tmp_sink_id = self
1400 .metadata_manager
1401 .catalog_controller
1402 .create_job_catalog_for_replace(&streaming_job, None, None, None)
1403 .await?
1404 .as_sink_id();
1405 let StreamingJob::Sink(sink) = streaming_job else {
1406 unreachable!()
1407 };
1408
1409 sinks.push(AutoRefreshSchemaSinkContext {
1410 tmp_sink_id,
1411 original_sink: sink,
1412 original_fragment: original_sink_fragment,
1413 new_schema,
1414 newly_add_fields: newly_added_columns
1415 .iter()
1416 .map(|col| Field::from(&col.column_desc))
1417 .collect(),
1418 new_fragment: new_sink_fragment,
1419 new_log_store_table,
1420 actor_status,
1421 });
1422 }
1423 Some(sinks)
1424 } else {
1425 None
1426 }
1427 } else {
1428 None
1429 };
1430
1431 let tmp_id = self
1432 .metadata_manager
1433 .catalog_controller
1434 .create_job_catalog_for_replace(
1435 &streaming_job,
1436 Some(&ctx),
1437 fragment_graph.specified_parallelism().as_ref(),
1438 Some(fragment_graph.max_parallelism()),
1439 )
1440 .await?;
1441
1442 let tmp_sink_ids = auto_refresh_schema_sinks.as_ref().map(|sinks| {
1443 sinks
1444 .iter()
1445 .map(|sink| sink.tmp_sink_id.as_object_id())
1446 .collect_vec()
1447 });
1448
1449 tracing::debug!(id = %job_id, "building replace streaming job");
1450 let mut updated_sink_catalogs = vec![];
1451
1452 let mut drop_table_connector_ctx = None;
1453 let result: MetaResult<_> = try {
1454 let (mut ctx, mut stream_job_fragments) = self
1455 .build_replace_job(
1456 ctx,
1457 &streaming_job,
1458 fragment_graph,
1459 tmp_id,
1460 auto_refresh_schema_sinks,
1461 )
1462 .await?;
1463 drop_table_connector_ctx = ctx.drop_table_connector_ctx.clone();
1464 let auto_refresh_schema_sink_finish_ctx =
1465 ctx.auto_refresh_schema_sinks.as_ref().map(|sinks| {
1466 sinks
1467 .iter()
1468 .map(|sink| FinishAutoRefreshSchemaSinkContext {
1469 tmp_sink_id: sink.tmp_sink_id,
1470 original_sink_id: sink.original_sink.id,
1471 columns: sink.new_schema.clone(),
1472 new_log_store_table: sink
1473 .new_log_store_table
1474 .as_ref()
1475 .map(|table| (table.id, table.columns.clone())),
1476 })
1477 .collect()
1478 });
1479
1480 if let StreamingJob::Table(_, table, ..) = &streaming_job {
1482 let union_fragment = stream_job_fragments.inner.union_fragment_for_table();
1483 let upstream_infos = self
1484 .metadata_manager
1485 .catalog_controller
1486 .get_all_upstream_sink_infos(table, union_fragment.fragment_id as _)
1487 .await?;
1488 refill_upstream_sink_union_in_table(&mut union_fragment.nodes, &upstream_infos);
1489
1490 for upstream_info in &upstream_infos {
1491 let upstream_fragment_id = upstream_info.sink_fragment_id;
1492 ctx.upstream_fragment_downstreams
1493 .entry(upstream_fragment_id)
1494 .or_default()
1495 .push(upstream_info.new_sink_downstream.clone());
1496 if upstream_info.sink_original_target_columns.is_empty() {
1497 updated_sink_catalogs.push(upstream_info.sink_id);
1498 }
1499 }
1500 }
1501
1502 let replace_upstream = ctx.replace_upstream.clone();
1503
1504 if let Some(sinks) = &ctx.auto_refresh_schema_sinks {
1505 let empty_downstreams = FragmentDownstreamRelation::default();
1506 for sink in sinks {
1507 self.metadata_manager
1508 .catalog_controller
1509 .prepare_streaming_job(
1510 sink.tmp_sink_id.as_job_id(),
1511 || [&sink.new_fragment].into_iter(),
1512 &empty_downstreams,
1513 true,
1514 None,
1515 None,
1516 )
1517 .await?;
1518 }
1519 }
1520
1521 self.metadata_manager
1522 .catalog_controller
1523 .prepare_stream_job_fragments(&stream_job_fragments, &streaming_job, true, None)
1524 .await?;
1525
1526 self.stream_manager
1527 .replace_stream_job(stream_job_fragments, ctx)
1528 .await?;
1529 (replace_upstream, auto_refresh_schema_sink_finish_ctx)
1530 };
1531
1532 match result {
1533 Ok((replace_upstream, auto_refresh_schema_sink_finish_ctx)) => {
1534 let version = self
1535 .metadata_manager
1536 .catalog_controller
1537 .finish_replace_streaming_job(
1538 tmp_id,
1539 streaming_job,
1540 replace_upstream,
1541 SinkIntoTableContext {
1542 updated_sink_catalogs,
1543 },
1544 drop_table_connector_ctx.as_ref(),
1545 auto_refresh_schema_sink_finish_ctx,
1546 )
1547 .await?;
1548 if let Some(drop_table_connector_ctx) = &drop_table_connector_ctx {
1549 self.source_manager
1550 .apply_source_change(SourceChange::DropSource {
1551 dropped_source_ids: vec![drop_table_connector_ctx.to_remove_source_id],
1552 })
1553 .await;
1554 }
1555 Ok(version)
1556 }
1557 Err(err) => {
1558 tracing::error!(id = %job_id, error = ?err.as_report(), "failed to replace job");
1559 let _ = self.metadata_manager
1560 .catalog_controller
1561 .try_abort_replacing_streaming_job(tmp_id, tmp_sink_ids)
1562 .await.inspect_err(|err| {
1563 tracing::error!(id = %job_id, error = ?err.as_report(), "failed to abort replacing job");
1564 });
1565 Err(err)
1566 }
1567 }
1568 }
1569
1570 #[await_tree::instrument(boxed, "drop_streaming_job{}({job_id})", if let DropMode::Cascade = drop_mode { "_cascade" } else { "" }
1571 )]
1572 async fn drop_streaming_job(
1573 &self,
1574 job_id: StreamingJobId,
1575 drop_mode: DropMode,
1576 ) -> MetaResult<NotificationVersion> {
1577 let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
1578
1579 let (object_id, object_type) = match job_id {
1580 StreamingJobId::MaterializedView(id) => (id.as_object_id(), ObjectType::Table),
1581 StreamingJobId::Sink(id) => (id.as_object_id(), ObjectType::Sink),
1582 StreamingJobId::Table(_, id) => (id.as_object_id(), ObjectType::Table),
1583 StreamingJobId::Index(idx) => (idx.as_object_id(), ObjectType::Index),
1584 };
1585
1586 let job_status = self
1587 .metadata_manager
1588 .catalog_controller
1589 .get_streaming_job_status(job_id.id())
1590 .await?;
1591 let version = match job_status {
1592 JobStatus::Initial => {
1593 unreachable!(
1594 "Job with Initial status should not notify frontend and therefore should not arrive here"
1595 );
1596 }
1597 JobStatus::Creating => {
1598 self.stream_manager
1599 .cancel_streaming_jobs(vec![job_id.id()])
1600 .await?;
1601 IGNORED_NOTIFICATION_VERSION
1602 }
1603 JobStatus::Created => self.drop_object(object_type, object_id, drop_mode).await?,
1604 };
1605
1606 Ok(version)
1607 }
1608
1609 fn resolve_stream_parallelism(
1613 &self,
1614 specified: Option<NonZeroUsize>,
1615 max: NonZeroUsize,
1616 cluster_info: &StreamingClusterInfo,
1617 resource_group: String,
1618 ) -> MetaResult<NonZeroUsize> {
1619 let available = NonZeroUsize::new(cluster_info.parallelism(&resource_group));
1620 DdlController::resolve_stream_parallelism_inner(
1621 specified,
1622 max,
1623 available,
1624 &self.env.opts.default_parallelism,
1625 &resource_group,
1626 )
1627 }
1628
1629 fn resolve_stream_parallelism_inner(
1630 specified: Option<NonZeroUsize>,
1631 max: NonZeroUsize,
1632 available: Option<NonZeroUsize>,
1633 default_parallelism: &DefaultParallelism,
1634 resource_group: &str,
1635 ) -> MetaResult<NonZeroUsize> {
1636 let Some(available) = available else {
1637 bail_unavailable!(
1638 "no available slots to schedule in resource group \"{}\", \
1639 have you allocated any compute nodes within this resource group?",
1640 resource_group
1641 );
1642 };
1643
1644 if let Some(specified) = specified {
1645 if specified > max {
1646 bail_invalid_parameter!(
1647 "specified parallelism {} should not exceed max parallelism {}",
1648 specified,
1649 max,
1650 );
1651 }
1652 if specified > available {
1653 tracing::warn!(
1654 resource_group,
1655 specified_parallelism = specified.get(),
1656 available_parallelism = available.get(),
1657 "specified parallelism exceeds available slots, scheduling with specified value",
1658 );
1659 }
1660 return Ok(specified);
1661 }
1662
1663 let default_parallelism = match default_parallelism {
1665 DefaultParallelism::Full => available,
1666 DefaultParallelism::Default(num) => {
1667 if *num > available {
1668 tracing::warn!(
1669 resource_group,
1670 configured_parallelism = num.get(),
1671 available_parallelism = available.get(),
1672 "default parallelism exceeds available slots, scheduling with configured value",
1673 );
1674 }
1675 *num
1676 }
1677 };
1678
1679 if default_parallelism > max {
1680 tracing::warn!(
1681 max_parallelism = max.get(),
1682 resource_group,
1683 "default parallelism exceeds max parallelism, capping to max",
1684 );
1685 }
1686 Ok(default_parallelism.min(max))
1687 }
1688
1689 #[await_tree::instrument]
1695 pub(crate) async fn build_stream_job(
1696 &self,
1697 stream_ctx: StreamContext,
1698 mut stream_job: StreamingJob,
1699 fragment_graph: StreamFragmentGraph,
1700 resource_type: streaming_job_resource_type::ResourceType,
1701 ) -> MetaResult<(CreateStreamingJobContext, StreamJobFragmentsToCreate)> {
1702 let id = stream_job.id();
1703 let specified_parallelism = fragment_graph.specified_parallelism();
1704 let specified_backfill_parallelism = fragment_graph.specified_backfill_parallelism();
1705 let max_parallelism = NonZeroUsize::new(fragment_graph.max_parallelism()).unwrap();
1706
1707 let fragment_backfill_ordering = fragment_graph.create_fragment_backfill_ordering();
1709
1710 let (snapshot_backfill_info, cross_db_snapshot_backfill_info) =
1714 fragment_graph.collect_snapshot_backfill_info()?;
1715 assert!(
1716 snapshot_backfill_info
1717 .iter()
1718 .chain([&cross_db_snapshot_backfill_info])
1719 .flat_map(|info| info.upstream_mv_table_id_to_backfill_epoch.values())
1720 .all(|backfill_epoch| backfill_epoch.is_none()),
1721 "should not set backfill epoch when initially build the job: {:?} {:?}",
1722 snapshot_backfill_info,
1723 cross_db_snapshot_backfill_info
1724 );
1725
1726 let locality_fragment_state_table_mapping =
1727 fragment_graph.find_locality_provider_fragment_state_table_mapping();
1728
1729 self.metadata_manager
1731 .catalog_controller
1732 .validate_cross_db_snapshot_backfill(&cross_db_snapshot_backfill_info)
1733 .await?;
1734
1735 let upstream_table_ids = fragment_graph
1736 .dependent_table_ids()
1737 .iter()
1738 .filter(|id| {
1739 !cross_db_snapshot_backfill_info
1740 .upstream_mv_table_id_to_backfill_epoch
1741 .contains_key(id)
1742 })
1743 .cloned()
1744 .collect();
1745
1746 let (upstream_root_fragments, existing_actor_location) = self
1747 .metadata_manager
1748 .get_upstream_root_fragments(&upstream_table_ids)
1749 .await?;
1750
        if snapshot_backfill_info.is_some() {
            match stream_job {
                StreamingJob::MaterializedView(_)
                | StreamingJob::Sink(_)
                | StreamingJob::Index(_, _) => {}
                StreamingJob::Table(_, _, _) | StreamingJob::Source(_) => {
                    return Err(
                        anyhow!("snapshot_backfill not enabled for table and source").into(),
                    );
                }
            }
        }

        let upstream_actors = upstream_root_fragments
            .values()
            .map(|(fragment, _)| {
                (
                    fragment.fragment_id,
                    fragment.actors.keys().copied().collect(),
                )
            })
            .collect();

        let complete_graph = CompleteStreamFragmentGraph::with_upstreams(
            fragment_graph,
            FragmentGraphUpstreamContext {
                upstream_root_fragments,
                upstream_actor_location: existing_actor_location,
            },
            (&stream_job).into(),
        )?;

        let resource_group = if let Some(group) = resource_type.resource_group() {
            group
        } else {
            self.metadata_manager
                .get_database_resource_group(stream_job.database_id())
                .await?
        };
        let is_serverless_backfill = matches!(
            &resource_type,
            streaming_job_resource_type::ResourceType::ServerlessBackfillResourceGroup(_)
        );

        let cluster_info = self.metadata_manager.get_streaming_cluster_info().await?;

        // Resolve the job's parallelism, then let the adaptive strategy compute the actual
        // target parallelism. A specified backfill parallelism takes precedence.
        let initial_parallelism = specified_backfill_parallelism.or(specified_parallelism);
        let parallelism = self.resolve_stream_parallelism(
            initial_parallelism,
            max_parallelism,
            &cluster_info,
            resource_group.clone(),
        )?;

        let parallelism = self
            .env
            .system_params_reader()
            .await
            .adaptive_parallelism_strategy()
            .compute_target_parallelism(parallelism.get());

        let parallelism = NonZeroUsize::new(parallelism).expect("parallelism must be positive");
        let actor_graph_builder = ActorGraphBuilder::new(
            id,
            resource_group,
            complete_graph,
            cluster_info,
            parallelism,
        )?;

        let ActorGraphBuildResult {
            graph,
            downstream_fragment_relations,
            building_locations,
            upstream_fragment_downstreams,
            new_no_shuffle,
            replace_upstream,
            ..
        } = actor_graph_builder.generate_graph(&self.env, &stream_job, stream_ctx.clone())?;
        assert!(replace_upstream.is_empty());

        let table_parallelism = match (specified_parallelism, &self.env.opts.default_parallelism) {
            (None, DefaultParallelism::Full) => TableParallelism::Adaptive,
            _ => TableParallelism::Fixed(parallelism.get()),
        };

        let stream_job_fragments = StreamJobFragments::new(
            id,
            graph,
            &building_locations.actor_locations,
            stream_ctx.clone(),
            table_parallelism,
            max_parallelism.get(),
        );

        if let Some(mview_fragment) = stream_job_fragments.mview_fragment() {
            stream_job.set_table_vnode_count(mview_fragment.vnode_count());
        }

        // For a sink-into-table job, prepare the info needed to connect the new sink fragment
        // to the target table's materialize fragment.
        let new_upstream_sink = if let StreamingJob::Sink(sink) = &stream_job
            && let Ok(table_id) = sink.get_target_table()
        {
            let tables = self
                .metadata_manager
                .get_table_catalog_by_ids(&[*table_id])
                .await?;
            let target_table = tables
                .first()
                .ok_or_else(|| MetaError::catalog_id_not_found("table", *table_id))?;
            let sink_fragment = stream_job_fragments
                .sink_fragment()
                .ok_or_else(|| anyhow::anyhow!("sink fragment not found for sink {}", sink.id))?;
            let mview_fragment_id = self
                .metadata_manager
                .catalog_controller
                .get_mview_fragment_by_id(table_id.as_job_id())
                .await?;
            let upstream_sink_info = build_upstream_sink_info(
                sink.id,
                sink.original_target_columns.clone(),
                sink_fragment.fragment_id as _,
                target_table,
                mview_fragment_id,
            )?;
            Some(upstream_sink_info)
        } else {
            None
        };

        // For a CDC table backfilled in parallel, pre-compute the snapshot splits.
        let mut cdc_table_snapshot_splits = None;
        if let StreamingJob::Table(None, table, TableJobType::SharedCdcSource) = &stream_job
            && let Some((_, stream_cdc_scan)) =
                parallel_cdc_table_backfill_fragment(stream_job_fragments.fragments.values())
        {
            let splits = try_init_parallel_cdc_table_snapshot_splits(
                table.id,
                stream_cdc_scan.cdc_table_desc.as_ref().unwrap(),
                self.env.meta_store_ref(),
                stream_cdc_scan.options.as_ref().unwrap(),
                self.env.opts.cdc_table_split_init_insert_batch_size,
                self.env.opts.cdc_table_split_init_sleep_interval_splits,
                self.env.opts.cdc_table_split_init_sleep_duration_millis,
            )
            .await?;
            cdc_table_snapshot_splits = Some(splits);
        }

        let ctx = CreateStreamingJobContext {
            upstream_fragment_downstreams,
            new_no_shuffle,
            upstream_actors,
            building_locations,
            definition: stream_job.definition(),
            create_type: stream_job.create_type(),
            job_type: (&stream_job).into(),
            streaming_job: stream_job,
            new_upstream_sink,
            option: CreateStreamingJobOption {},
            snapshot_backfill_info,
            cross_db_snapshot_backfill_info,
            fragment_backfill_ordering,
            locality_fragment_state_table_mapping,
            cdc_table_snapshot_splits,
            is_serverless_backfill,
        };

        Ok((
            ctx,
            StreamJobFragmentsToCreate {
                inner: stream_job_fragments,
                downstreams: downstream_fragment_relations,
            },
        ))
    }

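    /// Builds the new actor graph and [`ReplaceStreamJobContext`] for replacing an existing
    /// streaming job (table, source, or materialized view), reusing the old job's parallelism
    /// and rewiring its existing downstream fragments to the new fragments.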
    pub(crate) async fn build_replace_job(
        &self,
        stream_ctx: StreamContext,
        stream_job: &StreamingJob,
        mut fragment_graph: StreamFragmentGraph,
        tmp_job_id: JobId,
        auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,
    ) -> MetaResult<(ReplaceStreamJobContext, StreamJobFragmentsToCreate)> {
        match &stream_job {
            StreamingJob::Table(..)
            | StreamingJob::Source(..)
            | StreamingJob::MaterializedView(..) => {}
            StreamingJob::Sink(..) | StreamingJob::Index(..) => {
                bail_not_implemented!("schema change for {}", stream_job.job_type_str())
            }
        }

        let id = stream_job.id();

        let mut drop_table_associated_source_id = None;
        if let StreamingJob::Table(None, _, _) = &stream_job {
            drop_table_associated_source_id = self
                .metadata_manager
                .get_table_associated_source_id(id.as_mv_table_id())
                .await?;
        }

        let old_fragments = self.metadata_manager.get_job_fragments_by_id(id).await?;
        let old_internal_table_ids = old_fragments.internal_table_ids();

        let mut drop_table_connector_ctx = None;
        if let Some(to_remove_source_id) = drop_table_associated_source_id {
            // The replaced table no longer has an associated source: drop the old source
            // and its state table.
            debug_assert!(old_internal_table_ids.len() == 1);

            drop_table_connector_ctx = Some(DropTableConnectorContext {
                to_change_streaming_job_id: id,
                to_remove_state_table_id: old_internal_table_ids[0],
                to_remove_source_id,
            });
        } else if stream_job.is_materialized_view() {
            // For materialized views, match the old and new state graphs so that internal
            // state tables can be carried over to the new fragments.
            let old_fragments_upstreams = self
                .metadata_manager
                .catalog_controller
                .upstream_fragments(old_fragments.fragment_ids())
                .await?;

            let old_state_graph =
                state_match::Graph::from_existing(&old_fragments, &old_fragments_upstreams);
            let new_state_graph = state_match::Graph::from_building(&fragment_graph);
            let result = state_match::match_graph(&new_state_graph, &old_state_graph)
                .context("incompatible altering on the streaming job states")?;

            fragment_graph.fit_internal_table_ids_with_mapping(result.table_matches);
            fragment_graph.fit_snapshot_backfill_epochs(result.snapshot_backfill_epochs);
        } else {
            let old_internal_tables = self
                .metadata_manager
                .get_table_catalog_by_ids(&old_internal_table_ids)
                .await?;
            fragment_graph.fit_internal_tables_trivial(old_internal_tables)?;
        }

        let original_root_fragment = old_fragments
            .root_fragment()
            .expect("root fragment not found");

        let job_type = StreamingJobType::from(stream_job);

        let (mut downstream_fragments, mut downstream_actor_location) =
            self.metadata_manager.get_downstream_fragments(id).await?;

        // Replace downstream sink fragments that are being auto-refreshed with their
        // rebuilt counterparts.
        if let Some(auto_refresh_schema_sinks) = &auto_refresh_schema_sinks {
            let mut remaining_fragment: HashSet<_> = auto_refresh_schema_sinks
                .iter()
                .map(|sink| sink.original_fragment.fragment_id)
                .collect();
            for (_, downstream_fragment, nodes) in &mut downstream_fragments {
                if let Some(sink) = auto_refresh_schema_sinks.iter().find(|sink| {
                    sink.original_fragment.fragment_id == downstream_fragment.fragment_id
                }) {
                    assert!(remaining_fragment.remove(&downstream_fragment.fragment_id));
                    for actor_id in downstream_fragment.actors.keys() {
                        downstream_actor_location.remove(actor_id);
                    }
                    for (actor_id, status) in &sink.actor_status {
                        downstream_actor_location
                            .insert(*actor_id, status.location.as_ref().unwrap().worker_node_id);
                    }

                    *downstream_fragment = (&sink.new_fragment_info(), stream_job.id()).into();
                    *nodes = sink.new_fragment.nodes.clone();
                }
            }
            assert!(remaining_fragment.is_empty());
        }

        let complete_graph = match &job_type {
            StreamingJobType::Table(TableJobType::General) | StreamingJobType::Source => {
                CompleteStreamFragmentGraph::with_downstreams(
                    fragment_graph,
                    FragmentGraphDownstreamContext {
                        original_root_fragment_id: original_root_fragment.fragment_id,
                        downstream_fragments,
                        downstream_actor_location,
                    },
                    job_type,
                )?
            }
            StreamingJobType::Table(TableJobType::SharedCdcSource)
            | StreamingJobType::MaterializedView => {
                let (upstream_root_fragments, upstream_actor_location) = self
                    .metadata_manager
                    .get_upstream_root_fragments(fragment_graph.dependent_table_ids())
                    .await?;

                CompleteStreamFragmentGraph::with_upstreams_and_downstreams(
                    fragment_graph,
                    FragmentGraphUpstreamContext {
                        upstream_root_fragments,
                        upstream_actor_location,
                    },
                    FragmentGraphDownstreamContext {
                        original_root_fragment_id: original_root_fragment.fragment_id,
                        downstream_fragments,
                        downstream_actor_location,
                    },
                    job_type,
                )?
            }
            _ => unreachable!(),
        };

        let resource_group = self
            .metadata_manager
            .get_existing_job_resource_group(id)
            .await?;

        let cluster_info = self.metadata_manager.get_streaming_cluster_info().await?;

        let parallelism = NonZeroUsize::new(original_root_fragment.actors.len())
            .expect("The number of actors in the original table fragment should be greater than 0");

        let actor_graph_builder = ActorGraphBuilder::new(
            id,
            resource_group,
            complete_graph,
            cluster_info,
            parallelism,
        )?;

        let ActorGraphBuildResult {
            graph,
            downstream_fragment_relations,
            building_locations,
            upstream_fragment_downstreams,
            mut replace_upstream,
            new_no_shuffle,
            ..
        } = actor_graph_builder.generate_graph(&self.env, stream_job, stream_ctx.clone())?;

        if matches!(
            job_type,
            StreamingJobType::Source | StreamingJobType::Table(TableJobType::General)
        ) {
            assert!(upstream_fragment_downstreams.is_empty());
        }

        let stream_job_fragments = StreamJobFragments::new(
            tmp_job_id,
            graph,
            &building_locations.actor_locations,
            stream_ctx,
            old_fragments.assigned_parallelism,
            old_fragments.max_parallelism,
        );

        if let Some(sinks) = &auto_refresh_schema_sinks {
            for sink in sinks {
                replace_upstream
                    .remove(&sink.new_fragment.fragment_id)
                    .expect("should exist");
            }
        }

        let ctx = ReplaceStreamJobContext {
            old_fragments,
            replace_upstream,
            new_no_shuffle,
            upstream_fragment_downstreams,
            building_locations,
            streaming_job: stream_job.clone(),
            tmp_id: tmp_job_id,
            drop_table_connector_ctx,
            auto_refresh_schema_sinks,
        };

        Ok((
            ctx,
            StreamJobFragmentsToCreate {
                inner: stream_job_fragments,
                downstreams: downstream_fragment_relations,
            },
        ))
    }

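    /// Renames a catalog object (table, view, index, sink, source, schema, database, or
    /// subscription) by delegating to the catalog controller.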
    async fn alter_name(
        &self,
        relation: alter_name_request::Object,
        new_name: &str,
    ) -> MetaResult<NotificationVersion> {
        let (obj_type, id) = match relation {
            alter_name_request::Object::TableId(id) => (ObjectType::Table, id),
            alter_name_request::Object::ViewId(id) => (ObjectType::View, id),
            alter_name_request::Object::IndexId(id) => (ObjectType::Index, id),
            alter_name_request::Object::SinkId(id) => (ObjectType::Sink, id),
            alter_name_request::Object::SourceId(id) => (ObjectType::Source, id),
            alter_name_request::Object::SchemaId(id) => (ObjectType::Schema, id),
            alter_name_request::Object::DatabaseId(id) => (ObjectType::Database, id),
            alter_name_request::Object::SubscriptionId(id) => (ObjectType::Subscription, id),
        };
        self.metadata_manager
            .catalog_controller
            .alter_name(obj_type, id, new_name)
            .await
    }

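    /// Swaps the names of two catalog objects of the same type; schema swapping is not
    /// implemented yet.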
    async fn alter_swap_rename(
        &self,
        object: alter_swap_rename_request::Object,
    ) -> MetaResult<NotificationVersion> {
        let (obj_type, src_id, dst_id) = match object {
            alter_swap_rename_request::Object::Schema(_) => unimplemented!("schema swap"),
            alter_swap_rename_request::Object::Table(objs) => {
                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
                (ObjectType::Table, src_id, dst_id)
            }
            alter_swap_rename_request::Object::View(objs) => {
                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
                (ObjectType::View, src_id, dst_id)
            }
            alter_swap_rename_request::Object::Source(objs) => {
                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
                (ObjectType::Source, src_id, dst_id)
            }
            alter_swap_rename_request::Object::Sink(objs) => {
                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
                (ObjectType::Sink, src_id, dst_id)
            }
            alter_swap_rename_request::Object::Subscription(objs) => {
                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
                (ObjectType::Subscription, src_id, dst_id)
            }
        };

        self.metadata_manager
            .catalog_controller
            .alter_swap_rename(obj_type, src_id, dst_id)
            .await
    }

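    /// Transfers ownership of a catalog object to the given user.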
    async fn alter_owner(
        &self,
        object: Object,
        owner_id: UserId,
    ) -> MetaResult<NotificationVersion> {
        let (obj_type, id) = match object {
            Object::TableId(id) => (ObjectType::Table, id),
            Object::ViewId(id) => (ObjectType::View, id),
            Object::SourceId(id) => (ObjectType::Source, id),
            Object::SinkId(id) => (ObjectType::Sink, id),
            Object::SchemaId(id) => (ObjectType::Schema, id),
            Object::DatabaseId(id) => (ObjectType::Database, id),
            Object::SubscriptionId(id) => (ObjectType::Subscription, id),
            Object::ConnectionId(id) => (ObjectType::Connection, id),
        };
        self.metadata_manager
            .catalog_controller
            .alter_owner(obj_type, id.into(), owner_id as _)
            .await
    }

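    /// Moves a catalog object into a different schema.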
    async fn alter_set_schema(
        &self,
        object: alter_set_schema_request::Object,
        new_schema_id: SchemaId,
    ) -> MetaResult<NotificationVersion> {
        let (obj_type, id) = match object {
            alter_set_schema_request::Object::TableId(id) => (ObjectType::Table, id),
            alter_set_schema_request::Object::ViewId(id) => (ObjectType::View, id),
            alter_set_schema_request::Object::SourceId(id) => (ObjectType::Source, id),
            alter_set_schema_request::Object::SinkId(id) => (ObjectType::Sink, id),
            alter_set_schema_request::Object::FunctionId(id) => (ObjectType::Function, id),
            alter_set_schema_request::Object::ConnectionId(id) => (ObjectType::Connection, id),
            alter_set_schema_request::Object::SubscriptionId(id) => (ObjectType::Subscription, id),
        };
        self.metadata_manager
            .catalog_controller
            .alter_schema(obj_type, id.into(), new_schema_id as _)
            .await
    }

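    /// Waits until there are no background creating jobs left, polling roughly every
    /// millisecond and timing out after 30 minutes.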
    pub async fn wait(&self) -> MetaResult<()> {
        let timeout_ms = 30 * 60 * 1000;
        for _ in 0..timeout_ms {
            if self
                .metadata_manager
                .catalog_controller
                .list_background_creating_jobs(true, None)
                .await?
                .is_empty()
            {
                return Ok(());
            }

            sleep(Duration::from_millis(1)).await;
        }
        Err(MetaError::cancelled(format!(
            "timeout after {timeout_ms}ms"
        )))
    }

    async fn comment_on(&self, comment: Comment) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .comment_on(comment)
            .await
    }

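    /// Adds and removes entries in a streaming job's config via the catalog controller.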
    async fn alter_streaming_job_config(
        &self,
        job_id: JobId,
        entries_to_add: HashMap<String, String>,
        keys_to_remove: Vec<String>,
    ) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .alter_streaming_job_config(job_id, entries_to_add, keys_to_remove)
            .await
    }
}

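/// Reports a `CreateStreamJob` telemetry event for the given job and database object type.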
fn report_create_object(
    job_id: JobId,
    event_name: &str,
    obj_type: PbTelemetryDatabaseObject,
    connector_name: Option<String>,
    attr_info: Option<jsonbb::Value>,
) {
    report_event(
        PbTelemetryEventStage::CreateStreamJob,
        event_name,
        job_id.as_raw_id() as _,
        connector_name,
        Some(obj_type),
        attr_info,
    );
}

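/// Builds the [`UpstreamSinkInfo`] that connects a sink fragment to its target table's
/// materialize fragment: it derives the sink's output schema, maps the table's distribution
/// key onto the sink's output columns for hash dispatching, and builds the projection onto
/// the table's current columns.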
pub fn build_upstream_sink_info(
    sink_id: SinkId,
    original_target_columns: Vec<PbColumnCatalog>,
    sink_fragment_id: FragmentId,
    target_table: &PbTable,
    target_fragment_id: FragmentId,
) -> MetaResult<UpstreamSinkInfo> {
    // Prefer the columns recorded when the sink was created; fall back to the target
    // table's current columns if none were recorded.
    let sink_columns = if !original_target_columns.is_empty() {
        original_target_columns.clone()
    } else {
        target_table.columns.clone()
    };

    let sink_output_fields = sink_columns
        .iter()
        .map(|col| Field::from(col.column_desc.as_ref().unwrap()).to_prost())
        .collect_vec();
    let output_indices = (0..sink_output_fields.len())
        .map(|i| i as u32)
        .collect_vec();

    // Map the target table's distribution key onto the sink's output columns by column id.
    let dist_key_indices: anyhow::Result<Vec<u32>> = try {
        let sink_idx_by_col_id = sink_columns
            .iter()
            .enumerate()
            .map(|(idx, col)| {
                let column_id = col.column_desc.as_ref().unwrap().column_id;
                (column_id, idx as u32)
            })
            .collect::<HashMap<_, _>>();
        target_table
            .distribution_key
            .iter()
            .map(|dist_idx| {
                let column_id = target_table.columns[*dist_idx as usize]
                    .column_desc
                    .as_ref()
                    .unwrap()
                    .column_id;
                let sink_idx = sink_idx_by_col_id
                    .get(&column_id)
                    .ok_or_else(|| anyhow::anyhow!("column id {} not found in sink", column_id))?;
                Ok(*sink_idx)
            })
            .collect::<anyhow::Result<Vec<_>>>()?
    };
    let dist_key_indices =
        dist_key_indices.map_err(|e| e.context("failed to get distribution key indices"))?;
    let downstream_fragment_id = target_fragment_id as _;
    let new_downstream_relation = DownstreamFragmentRelation {
        downstream_fragment_id,
        dispatcher_type: DispatcherType::Hash,
        dist_key_indices,
        output_mapping: PbDispatchOutputMapping::simple(output_indices),
    };
    // Build the projection from the sink's output schema to the table's current columns.
    let current_target_columns = target_table.get_columns();
    let project_exprs = build_select_node_list(&sink_columns, current_target_columns)?;
    Ok(UpstreamSinkInfo {
        sink_id,
        sink_fragment_id: sink_fragment_id as _,
        sink_output_fields,
        sink_original_target_columns: original_target_columns,
        project_exprs,
        new_sink_downstream: new_downstream_relation,
    })
}

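/// Fills the `UpstreamSinkUnion` node inside the table's union fragment with the initial
/// upstreams described by `upstream_sink_infos`, stopping the traversal once the node is found.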
pub fn refill_upstream_sink_union_in_table(
    union_fragment_root: &mut PbStreamNode,
    upstream_sink_infos: &Vec<UpstreamSinkInfo>,
) {
    visit_stream_node_cont_mut(union_fragment_root, |node| {
        if let Some(NodeBody::UpstreamSinkUnion(upstream_sink_union)) = &mut node.node_body {
            let init_upstreams = upstream_sink_infos
                .iter()
                .map(|info| PbUpstreamSinkInfo {
                    upstream_fragment_id: info.sink_fragment_id,
                    sink_output_schema: info.sink_output_fields.clone(),
                    project_exprs: info.project_exprs.clone(),
                })
                .collect();
            upstream_sink_union.init_upstreams = init_upstreams;
            false
        } else {
            true
        }
    });
}

#[cfg(test)]
mod tests {
    use std::num::NonZeroUsize;

    use super::*;

    #[test]
    fn test_specified_parallelism_exceeds_available() {
        let result = DdlController::resolve_stream_parallelism_inner(
            Some(NonZeroUsize::new(100).unwrap()),
            NonZeroUsize::new(256).unwrap(),
            Some(NonZeroUsize::new(4).unwrap()),
            &DefaultParallelism::Full,
            "default",
        )
        .unwrap();
        assert_eq!(result.get(), 100);
    }

    #[test]
    fn test_allows_default_parallelism_over_available() {
        let result = DdlController::resolve_stream_parallelism_inner(
            None,
            NonZeroUsize::new(256).unwrap(),
            Some(NonZeroUsize::new(4).unwrap()),
            &DefaultParallelism::Default(NonZeroUsize::new(50).unwrap()),
            "default",
        )
        .unwrap();
        assert_eq!(result.get(), 50);
    }

    #[test]
    fn test_full_parallelism_capped_by_max() {
        let result = DdlController::resolve_stream_parallelism_inner(
            None,
            NonZeroUsize::new(6).unwrap(),
            Some(NonZeroUsize::new(10).unwrap()),
            &DefaultParallelism::Full,
            "default",
        )
        .unwrap();
        assert_eq!(result.get(), 6);
    }

    #[test]
    fn test_no_available_slots_returns_error() {
        let result = DdlController::resolve_stream_parallelism_inner(
            None,
            NonZeroUsize::new(4).unwrap(),
            None,
            &DefaultParallelism::Full,
            "default",
        );
        assert!(matches!(
            result,
            Err(ref e) if matches!(e.inner(), MetaErrorInner::Unavailable(_))
        ));
    }

    #[test]
    fn test_specified_over_max_returns_error() {
        let result = DdlController::resolve_stream_parallelism_inner(
            Some(NonZeroUsize::new(8).unwrap()),
            NonZeroUsize::new(4).unwrap(),
            Some(NonZeroUsize::new(10).unwrap()),
            &DefaultParallelism::Full,
            "default",
        );
        assert!(matches!(
            result,
            Err(ref e) if matches!(e.inner(), MetaErrorInner::InvalidParameter(_))
        ));
    }
}