1use std::cmp::Ordering;
16use std::collections::{HashMap, HashSet};
17use std::num::NonZeroUsize;
18use std::sync::Arc;
19use std::sync::atomic::AtomicU64;
20use std::time::Duration;
21
22use anyhow::{Context, anyhow};
23use await_tree::InstrumentAwait;
24use either::Either;
25use itertools::Itertools;
26use risingwave_common::catalog::{
27 AlterDatabaseParam, ColumnCatalog, ColumnId, Field, FragmentTypeFlag,
28};
29use risingwave_common::config::DefaultParallelism;
30use risingwave_common::hash::VnodeCountCompat;
31use risingwave_common::id::{JobId, TableId};
32use risingwave_common::secret::{LocalSecretManager, SecretEncryption};
33use risingwave_common::system_param::reader::SystemParamsRead;
34use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont_mut;
35use risingwave_common::{bail, bail_not_implemented};
36use risingwave_connector::WithOptionsSecResolved;
37use risingwave_connector::connector_common::validate_connection;
38use risingwave_connector::sink::SinkParam;
39use risingwave_connector::sink::iceberg::IcebergSink;
40use risingwave_connector::source::cdc::CdcScanOptions;
41use risingwave_connector::source::{
42 ConnectorProperties, SourceEnumeratorContext, UPSTREAM_SOURCE_KEY,
43};
44use risingwave_meta_model::object::ObjectType;
45use risingwave_meta_model::{
46 ConnectionId, DatabaseId, DispatcherType, FragmentId, FunctionId, IndexId, JobStatus, ObjectId,
47 SchemaId, SecretId, SinkId, SourceId, StreamingParallelism, SubscriptionId, UserId, ViewId,
48};
49use risingwave_pb::catalog::{
50 Comment, Connection, CreateType, Database, Function, PbSink, PbTable, Schema, Secret, Source,
51 Subscription, Table, View,
52};
53use risingwave_pb::common::PbActorLocation;
54use risingwave_pb::ddl_service::alter_owner_request::Object;
55use risingwave_pb::ddl_service::{
56 DdlProgress, TableJobType, WaitVersion, alter_name_request, alter_set_schema_request,
57 alter_swap_rename_request,
58};
59use risingwave_pb::meta::table_fragments::PbActorStatus;
60use risingwave_pb::stream_plan::stream_node::NodeBody;
61use risingwave_pb::stream_plan::{
62 PbDispatchOutputMapping, PbStreamFragmentGraph, PbStreamNode, PbUpstreamSinkInfo,
63 StreamFragmentGraph as StreamFragmentGraphProto,
64};
65use risingwave_pb::telemetry::{PbTelemetryDatabaseObject, PbTelemetryEventStage};
66use strum::Display;
67use thiserror_ext::AsReport;
68use tokio::sync::{OwnedSemaphorePermit, Semaphore};
69use tokio::time::sleep;
70use tracing::Instrument;
71
72use crate::barrier::BarrierManagerRef;
73use crate::controller::catalog::{DropTableConnectorContext, ReleaseContext};
74use crate::controller::cluster::StreamingClusterInfo;
75use crate::controller::streaming_job::{FinishAutoRefreshSchemaSinkContext, SinkIntoTableContext};
76use crate::controller::utils::build_select_node_list;
77use crate::error::{MetaErrorInner, bail_invalid_parameter, bail_unavailable};
78use crate::manager::sink_coordination::SinkCoordinatorManager;
79use crate::manager::{
80 IGNORED_NOTIFICATION_VERSION, LocalNotification, MetaSrvEnv, MetadataManager,
81 NotificationVersion, StreamingJob, StreamingJobType,
82};
83use crate::model::{
84 DownstreamFragmentRelation, Fragment, FragmentDownstreamRelation,
85 FragmentId as CatalogFragmentId, StreamContext, StreamJobFragments, StreamJobFragmentsToCreate,
86 TableParallelism,
87};
88use crate::stream::cdc::{
89 parallel_cdc_table_backfill_fragment, try_init_parallel_cdc_table_snapshot_splits,
90};
91use crate::stream::{
92 ActorGraphBuildResult, ActorGraphBuilder, AutoRefreshSchemaSinkContext,
93 CompleteStreamFragmentGraph, CreateStreamingJobContext, CreateStreamingJobOption,
94 FragmentGraphDownstreamContext, FragmentGraphUpstreamContext, GlobalStreamManagerRef,
95 ReplaceStreamJobContext, ReschedulePolicy, SourceChange, SourceManagerRef, StreamFragmentGraph,
96 UpstreamSinkInfo, check_sink_fragments_support_refresh_schema, create_source_worker,
97 rewrite_refresh_schema_sink_fragment, state_match, validate_sink,
98};
99use crate::telemetry::report_event;
100use crate::{MetaError, MetaResult};
101
102#[derive(PartialEq)]
103pub enum DropMode {
104 Restrict,
105 Cascade,
106}
107
108impl DropMode {
109 pub fn from_request_setting(cascade: bool) -> DropMode {
110 if cascade {
111 DropMode::Cascade
112 } else {
113 DropMode::Restrict
114 }
115 }
116}
117
118#[derive(strum::AsRefStr)]
119pub enum StreamingJobId {
120 MaterializedView(TableId),
121 Sink(SinkId),
122 Table(Option<SourceId>, TableId),
123 Index(IndexId),
124}
125
126impl std::fmt::Display for StreamingJobId {
127 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
128 write!(f, "{}", self.as_ref())?;
129 write!(f, "({})", self.id())
130 }
131}
132
133impl StreamingJobId {
134 fn id(&self) -> JobId {
135 match self {
136 StreamingJobId::MaterializedView(id) | StreamingJobId::Table(_, id) => id.as_job_id(),
137 StreamingJobId::Index(id) => id.as_job_id(),
138 StreamingJobId::Sink(id) => id.as_job_id(),
139 }
140 }
141}
142
143pub struct ReplaceStreamJobInfo {
146 pub streaming_job: StreamingJob,
147 pub fragment_graph: StreamFragmentGraphProto,
148}
149
150#[derive(Display)]
151pub enum DdlCommand {
152 CreateDatabase(Database),
153 DropDatabase(DatabaseId),
154 CreateSchema(Schema),
155 DropSchema(SchemaId, DropMode),
156 CreateNonSharedSource(Source),
157 DropSource(SourceId, DropMode),
158 CreateFunction(Function),
159 DropFunction(FunctionId, DropMode),
160 CreateView(View, HashSet<ObjectId>),
161 DropView(ViewId, DropMode),
162 CreateStreamingJob {
163 stream_job: StreamingJob,
164 fragment_graph: StreamFragmentGraphProto,
165 dependencies: HashSet<ObjectId>,
166 specific_resource_group: Option<String>, if_not_exists: bool,
168 },
169 DropStreamingJob {
170 job_id: StreamingJobId,
171 drop_mode: DropMode,
172 },
173 AlterName(alter_name_request::Object, String),
174 AlterSwapRename(alter_swap_rename_request::Object),
175 ReplaceStreamJob(ReplaceStreamJobInfo),
176 AlterNonSharedSource(Source),
177 AlterObjectOwner(Object, UserId),
178 AlterSetSchema(alter_set_schema_request::Object, SchemaId),
179 CreateConnection(Connection),
180 DropConnection(ConnectionId, DropMode),
181 CreateSecret(Secret),
182 AlterSecret(Secret),
183 DropSecret(SecretId),
184 CommentOn(Comment),
185 CreateSubscription(Subscription),
186 DropSubscription(SubscriptionId, DropMode),
187 AlterDatabaseParam(DatabaseId, AlterDatabaseParam),
188 AlterStreamingJobConfig(JobId, HashMap<String, String>, Vec<String>),
189}
190
191impl DdlCommand {
192 fn object(&self) -> Either<String, ObjectId> {
194 use Either::*;
195 match self {
196 DdlCommand::CreateDatabase(database) => Left(database.name.clone()),
197 DdlCommand::DropDatabase(id) => Right(id.as_object_id()),
198 DdlCommand::CreateSchema(schema) => Left(schema.name.clone()),
199 DdlCommand::DropSchema(id, _) => Right(id.as_object_id()),
200 DdlCommand::CreateNonSharedSource(source) => Left(source.name.clone()),
201 DdlCommand::DropSource(id, _) => Right(id.as_object_id()),
202 DdlCommand::CreateFunction(function) => Left(function.name.clone()),
203 DdlCommand::DropFunction(id, _) => Right(id.as_object_id()),
204 DdlCommand::CreateView(view, _) => Left(view.name.clone()),
205 DdlCommand::DropView(id, _) => Right(id.as_object_id()),
206 DdlCommand::CreateStreamingJob { stream_job, .. } => Left(stream_job.name()),
207 DdlCommand::DropStreamingJob { job_id, .. } => Right(job_id.id().as_object_id()),
208 DdlCommand::AlterName(object, _) => Left(format!("{object:?}")),
209 DdlCommand::AlterSwapRename(object) => Left(format!("{object:?}")),
210 DdlCommand::ReplaceStreamJob(info) => Left(info.streaming_job.name()),
211 DdlCommand::AlterNonSharedSource(source) => Left(source.name.clone()),
212 DdlCommand::AlterObjectOwner(object, _) => Left(format!("{object:?}")),
213 DdlCommand::AlterSetSchema(object, _) => Left(format!("{object:?}")),
214 DdlCommand::CreateConnection(connection) => Left(connection.name.clone()),
215 DdlCommand::DropConnection(id, _) => Right(id.as_object_id()),
216 DdlCommand::CreateSecret(secret) => Left(secret.name.clone()),
217 DdlCommand::AlterSecret(secret) => Left(secret.name.clone()),
218 DdlCommand::DropSecret(id) => Right(id.as_object_id()),
219 DdlCommand::CommentOn(comment) => Right(comment.table_id.into()),
220 DdlCommand::CreateSubscription(subscription) => Left(subscription.name.clone()),
221 DdlCommand::DropSubscription(id, _) => Right(id.as_object_id()),
222 DdlCommand::AlterDatabaseParam(id, _) => Right(id.as_object_id()),
223 DdlCommand::AlterStreamingJobConfig(job_id, _, _) => Right(job_id.as_object_id()),
224 }
225 }
226
227 fn allow_in_recovery(&self) -> bool {
228 match self {
229 DdlCommand::DropDatabase(_)
230 | DdlCommand::DropSchema(_, _)
231 | DdlCommand::DropSource(_, _)
232 | DdlCommand::DropFunction(_, _)
233 | DdlCommand::DropView(_, _)
234 | DdlCommand::DropStreamingJob { .. }
235 | DdlCommand::DropConnection(_, _)
236 | DdlCommand::DropSecret(_)
237 | DdlCommand::DropSubscription(_, _)
238 | DdlCommand::AlterName(_, _)
239 | DdlCommand::AlterObjectOwner(_, _)
240 | DdlCommand::AlterSetSchema(_, _)
241 | DdlCommand::CreateDatabase(_)
242 | DdlCommand::CreateSchema(_)
243 | DdlCommand::CreateFunction(_)
244 | DdlCommand::CreateView(_, _)
245 | DdlCommand::CreateConnection(_)
246 | DdlCommand::CommentOn(_)
247 | DdlCommand::CreateSecret(_)
248 | DdlCommand::AlterSecret(_)
249 | DdlCommand::AlterSwapRename(_)
250 | DdlCommand::AlterDatabaseParam(_, _)
251 | DdlCommand::AlterStreamingJobConfig(_, _, _) => true,
252 DdlCommand::CreateStreamingJob { .. }
253 | DdlCommand::CreateNonSharedSource(_)
254 | DdlCommand::ReplaceStreamJob(_)
255 | DdlCommand::AlterNonSharedSource(_)
256 | DdlCommand::CreateSubscription(_) => false,
257 }
258 }
259}
260
261#[derive(Clone)]
262pub struct DdlController {
263 pub(crate) env: MetaSrvEnv,
264
265 pub(crate) metadata_manager: MetadataManager,
266 pub(crate) stream_manager: GlobalStreamManagerRef,
267 pub(crate) source_manager: SourceManagerRef,
268 barrier_manager: BarrierManagerRef,
269 sink_manager: SinkCoordinatorManager,
270
271 pub(crate) creating_streaming_job_permits: Arc<CreatingStreamingJobPermit>,
273
274 seq: Arc<AtomicU64>,
276}
277
278#[derive(Clone)]
279pub struct CreatingStreamingJobPermit {
280 pub(crate) semaphore: Arc<Semaphore>,
281}
282
283impl CreatingStreamingJobPermit {
284 async fn new(env: &MetaSrvEnv) -> Self {
285 let mut permits = env
286 .system_params_reader()
287 .await
288 .max_concurrent_creating_streaming_jobs() as usize;
289 if permits == 0 {
290 permits = Semaphore::MAX_PERMITS;
292 }
293 let semaphore = Arc::new(Semaphore::new(permits));
294
295 let (local_notification_tx, mut local_notification_rx) =
296 tokio::sync::mpsc::unbounded_channel();
297 env.notification_manager()
298 .insert_local_sender(local_notification_tx);
299 let semaphore_clone = semaphore.clone();
300 tokio::spawn(async move {
301 while let Some(notification) = local_notification_rx.recv().await {
302 let LocalNotification::SystemParamsChange(p) = ¬ification else {
303 continue;
304 };
305 let mut new_permits = p.max_concurrent_creating_streaming_jobs() as usize;
306 if new_permits == 0 {
307 new_permits = Semaphore::MAX_PERMITS;
308 }
309 match permits.cmp(&new_permits) {
310 Ordering::Less => {
311 semaphore_clone.add_permits(new_permits - permits);
312 }
313 Ordering::Equal => continue,
314 Ordering::Greater => {
315 let to_release = permits - new_permits;
316 let reduced = semaphore_clone.forget_permits(to_release);
317 if reduced != to_release {
319 tracing::warn!(
320 "no enough permits to release, expected {}, but reduced {}",
321 to_release,
322 reduced
323 );
324 }
325 }
326 }
327 tracing::info!(
328 "max_concurrent_creating_streaming_jobs changed from {} to {}",
329 permits,
330 new_permits
331 );
332 permits = new_permits;
333 }
334 });
335
336 Self { semaphore }
337 }
338}
339
340impl DdlController {
341 pub async fn new(
342 env: MetaSrvEnv,
343 metadata_manager: MetadataManager,
344 stream_manager: GlobalStreamManagerRef,
345 source_manager: SourceManagerRef,
346 barrier_manager: BarrierManagerRef,
347 sink_manager: SinkCoordinatorManager,
348 ) -> Self {
349 let creating_streaming_job_permits = Arc::new(CreatingStreamingJobPermit::new(&env).await);
350 Self {
351 env,
352 metadata_manager,
353 stream_manager,
354 source_manager,
355 barrier_manager,
356 sink_manager,
357 creating_streaming_job_permits,
358 seq: Arc::new(AtomicU64::new(0)),
359 }
360 }
361
362 pub fn next_seq(&self) -> u64 {
364 self.seq.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
366 }
367
368 pub async fn run_command(&self, command: DdlCommand) -> MetaResult<Option<WaitVersion>> {
375 if !command.allow_in_recovery() {
376 self.barrier_manager.check_status_running()?;
377 }
378
379 let await_tree_key = format!("DDL Command {}", self.next_seq());
380 let await_tree_span = await_tree::span!("{command}({})", command.object());
381
382 let ctrl = self.clone();
383 let fut = async move {
384 match command {
385 DdlCommand::CreateDatabase(database) => ctrl.create_database(database).await,
386 DdlCommand::DropDatabase(database_id) => ctrl.drop_database(database_id).await,
387 DdlCommand::CreateSchema(schema) => ctrl.create_schema(schema).await,
388 DdlCommand::DropSchema(schema_id, drop_mode) => {
389 ctrl.drop_schema(schema_id, drop_mode).await
390 }
391 DdlCommand::CreateNonSharedSource(source) => {
392 ctrl.create_non_shared_source(source).await
393 }
394 DdlCommand::DropSource(source_id, drop_mode) => {
395 ctrl.drop_source(source_id, drop_mode).await
396 }
397 DdlCommand::CreateFunction(function) => ctrl.create_function(function).await,
398 DdlCommand::DropFunction(function_id, drop_mode) => {
399 ctrl.drop_function(function_id, drop_mode).await
400 }
401 DdlCommand::CreateView(view, dependencies) => {
402 ctrl.create_view(view, dependencies).await
403 }
404 DdlCommand::DropView(view_id, drop_mode) => {
405 ctrl.drop_view(view_id, drop_mode).await
406 }
407 DdlCommand::CreateStreamingJob {
408 stream_job,
409 fragment_graph,
410 dependencies,
411 specific_resource_group,
412 if_not_exists,
413 } => {
414 ctrl.create_streaming_job(
415 stream_job,
416 fragment_graph,
417 dependencies,
418 specific_resource_group,
419 if_not_exists,
420 )
421 .await
422 }
423 DdlCommand::DropStreamingJob { job_id, drop_mode } => {
424 ctrl.drop_streaming_job(job_id, drop_mode).await
425 }
426 DdlCommand::ReplaceStreamJob(ReplaceStreamJobInfo {
427 streaming_job,
428 fragment_graph,
429 }) => ctrl.replace_job(streaming_job, fragment_graph).await,
430 DdlCommand::AlterName(relation, name) => ctrl.alter_name(relation, &name).await,
431 DdlCommand::AlterObjectOwner(object, owner_id) => {
432 ctrl.alter_owner(object, owner_id).await
433 }
434 DdlCommand::AlterSetSchema(object, new_schema_id) => {
435 ctrl.alter_set_schema(object, new_schema_id).await
436 }
437 DdlCommand::CreateConnection(connection) => {
438 ctrl.create_connection(connection).await
439 }
440 DdlCommand::DropConnection(connection_id, drop_mode) => {
441 ctrl.drop_connection(connection_id, drop_mode).await
442 }
443 DdlCommand::CreateSecret(secret) => ctrl.create_secret(secret).await,
444 DdlCommand::DropSecret(secret_id) => ctrl.drop_secret(secret_id).await,
445 DdlCommand::AlterSecret(secret) => ctrl.alter_secret(secret).await,
446 DdlCommand::AlterNonSharedSource(source) => {
447 ctrl.alter_non_shared_source(source).await
448 }
449 DdlCommand::CommentOn(comment) => ctrl.comment_on(comment).await,
450 DdlCommand::CreateSubscription(subscription) => {
451 ctrl.create_subscription(subscription).await
452 }
453 DdlCommand::DropSubscription(subscription_id, drop_mode) => {
454 ctrl.drop_subscription(subscription_id, drop_mode).await
455 }
456 DdlCommand::AlterSwapRename(objects) => ctrl.alter_swap_rename(objects).await,
457 DdlCommand::AlterDatabaseParam(database_id, param) => {
458 ctrl.alter_database_param(database_id, param).await
459 }
460 DdlCommand::AlterStreamingJobConfig(job_id, entries_to_add, keys_to_remove) => {
461 ctrl.alter_streaming_job_config(job_id, entries_to_add, keys_to_remove)
462 .await
463 }
464 }
465 }
466 .in_current_span();
467 let fut = (self.env.await_tree_reg())
468 .register(await_tree_key, await_tree_span)
469 .instrument(Box::pin(fut));
470 let notification_version = tokio::spawn(fut).await.map_err(|e| anyhow!(e))??;
471 Ok(Some(WaitVersion {
472 catalog_version: notification_version,
473 hummock_version_id: self.barrier_manager.get_hummock_version_id().await.to_u64(),
474 }))
475 }
476
477 pub async fn get_ddl_progress(&self) -> MetaResult<Vec<DdlProgress>> {
478 self.barrier_manager.get_ddl_progress().await
479 }
480
481 async fn create_database(&self, database: Database) -> MetaResult<NotificationVersion> {
482 let (version, updated_db) = self
483 .metadata_manager
484 .catalog_controller
485 .create_database(database)
486 .await?;
487 self.barrier_manager
489 .update_database_barrier(
490 updated_db.database_id,
491 updated_db.barrier_interval_ms.map(|v| v as u32),
492 updated_db.checkpoint_frequency.map(|v| v as u64),
493 )
494 .await?;
495 Ok(version)
496 }
497
498 #[tracing::instrument(skip(self), level = "debug")]
499 pub async fn reschedule_streaming_job(
500 &self,
501 job_id: JobId,
502 target: ReschedulePolicy,
503 mut deferred: bool,
504 ) -> MetaResult<()> {
505 tracing::info!("altering parallelism for job {}", job_id);
506 if self.barrier_manager.check_status_running().is_err() {
507 tracing::info!(
508 "alter parallelism is set to deferred mode because the system is in recovery state"
509 );
510 deferred = true;
511 }
512
513 self.stream_manager
514 .reschedule_streaming_job(job_id, target, deferred)
515 .await
516 }
517
518 pub async fn reschedule_cdc_table_backfill(
519 &self,
520 job_id: JobId,
521 target: ReschedulePolicy,
522 ) -> MetaResult<()> {
523 tracing::info!("alter CDC table backfill parallelism");
524 if self.barrier_manager.check_status_running().is_err() {
525 return Err(anyhow::anyhow!("CDC table backfill reschedule is unavailable because the system is in recovery state").into());
526 }
527 self.stream_manager
528 .reschedule_cdc_table_backfill(job_id, target)
529 .await
530 }
531
532 pub async fn reschedule_fragments(
533 &self,
534 fragment_targets: HashMap<FragmentId, Option<StreamingParallelism>>,
535 ) -> MetaResult<()> {
536 tracing::info!(
537 "altering parallelism for fragments {:?}",
538 fragment_targets.keys()
539 );
540 let fragment_targets = fragment_targets
541 .into_iter()
542 .map(|(fragment_id, parallelism)| (fragment_id as CatalogFragmentId, parallelism))
543 .collect();
544
545 self.stream_manager
546 .reschedule_fragments(fragment_targets)
547 .await
548 }
549
550 async fn drop_database(&self, database_id: DatabaseId) -> MetaResult<NotificationVersion> {
551 self.drop_object(ObjectType::Database, database_id, DropMode::Cascade)
552 .await
553 }
554
555 async fn create_schema(&self, schema: Schema) -> MetaResult<NotificationVersion> {
556 self.metadata_manager
557 .catalog_controller
558 .create_schema(schema)
559 .await
560 }
561
562 async fn drop_schema(
563 &self,
564 schema_id: SchemaId,
565 drop_mode: DropMode,
566 ) -> MetaResult<NotificationVersion> {
567 self.drop_object(ObjectType::Schema, schema_id, drop_mode)
568 .await
569 }
570
571 async fn create_non_shared_source(&self, source: Source) -> MetaResult<NotificationVersion> {
573 let handle = create_source_worker(&source, self.source_manager.metrics.clone())
574 .await
575 .context("failed to create source worker")?;
576
577 let (source_id, version) = self
578 .metadata_manager
579 .catalog_controller
580 .create_source(source)
581 .await?;
582 self.source_manager
583 .register_source_with_handle(source_id, handle)
584 .await;
585 Ok(version)
586 }
587
588 async fn drop_source(
589 &self,
590 source_id: SourceId,
591 drop_mode: DropMode,
592 ) -> MetaResult<NotificationVersion> {
593 self.drop_object(ObjectType::Source, source_id, drop_mode)
594 .await
595 }
596
597 async fn alter_non_shared_source(&self, source: Source) -> MetaResult<NotificationVersion> {
600 self.metadata_manager
601 .catalog_controller
602 .alter_non_shared_source(source)
603 .await
604 }
605
606 async fn create_function(&self, function: Function) -> MetaResult<NotificationVersion> {
607 self.metadata_manager
608 .catalog_controller
609 .create_function(function)
610 .await
611 }
612
613 async fn drop_function(
614 &self,
615 function_id: FunctionId,
616 drop_mode: DropMode,
617 ) -> MetaResult<NotificationVersion> {
618 self.drop_object(ObjectType::Function, function_id, drop_mode)
619 .await
620 }
621
622 async fn create_view(
623 &self,
624 view: View,
625 dependencies: HashSet<ObjectId>,
626 ) -> MetaResult<NotificationVersion> {
627 self.metadata_manager
628 .catalog_controller
629 .create_view(view, dependencies)
630 .await
631 }
632
633 async fn drop_view(
634 &self,
635 view_id: ViewId,
636 drop_mode: DropMode,
637 ) -> MetaResult<NotificationVersion> {
638 self.drop_object(ObjectType::View, view_id, drop_mode).await
639 }
640
641 async fn create_connection(&self, connection: Connection) -> MetaResult<NotificationVersion> {
642 validate_connection(&connection).await?;
643 self.metadata_manager
644 .catalog_controller
645 .create_connection(connection)
646 .await
647 }
648
649 async fn drop_connection(
650 &self,
651 connection_id: ConnectionId,
652 drop_mode: DropMode,
653 ) -> MetaResult<NotificationVersion> {
654 self.drop_object(ObjectType::Connection, connection_id, drop_mode)
655 .await
656 }
657
658 async fn alter_database_param(
659 &self,
660 database_id: DatabaseId,
661 param: AlterDatabaseParam,
662 ) -> MetaResult<NotificationVersion> {
663 let (version, updated_db) = self
664 .metadata_manager
665 .catalog_controller
666 .alter_database_param(database_id, param)
667 .await?;
668 self.barrier_manager
670 .update_database_barrier(
671 database_id,
672 updated_db.barrier_interval_ms.map(|v| v as u32),
673 updated_db.checkpoint_frequency.map(|v| v as u64),
674 )
675 .await?;
676 Ok(version)
677 }
678
679 fn get_encrypted_payload(&self, secret: &Secret) -> MetaResult<Vec<u8>> {
682 let secret_store_private_key = self
683 .env
684 .opts
685 .secret_store_private_key
686 .clone()
687 .ok_or_else(|| anyhow!("secret_store_private_key is not configured"))?;
688
689 let encrypted_payload = SecretEncryption::encrypt(
690 secret_store_private_key.as_slice(),
691 secret.get_value().as_slice(),
692 )
693 .context(format!("failed to encrypt secret {}", secret.name))?;
694 Ok(encrypted_payload
695 .serialize()
696 .context(format!("failed to serialize secret {}", secret.name))?)
697 }
698
699 async fn create_secret(&self, mut secret: Secret) -> MetaResult<NotificationVersion> {
700 let secret_plain_payload = secret.value.clone();
703 let encrypted_payload = self.get_encrypted_payload(&secret)?;
704 secret.value = encrypted_payload;
705
706 self.metadata_manager
707 .catalog_controller
708 .create_secret(secret, secret_plain_payload)
709 .await
710 }
711
712 async fn drop_secret(&self, secret_id: SecretId) -> MetaResult<NotificationVersion> {
713 self.drop_object(ObjectType::Secret, secret_id, DropMode::Restrict)
714 .await
715 }
716
717 async fn alter_secret(&self, mut secret: Secret) -> MetaResult<NotificationVersion> {
718 let secret_plain_payload = secret.value.clone();
719 let encrypted_payload = self.get_encrypted_payload(&secret)?;
720 secret.value = encrypted_payload;
721 self.metadata_manager
722 .catalog_controller
723 .alter_secret(secret, secret_plain_payload)
724 .await
725 }
726
727 async fn create_subscription(
728 &self,
729 mut subscription: Subscription,
730 ) -> MetaResult<NotificationVersion> {
731 tracing::debug!("create subscription");
732 let _permit = self
733 .creating_streaming_job_permits
734 .semaphore
735 .acquire()
736 .await
737 .unwrap();
738 let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
739 self.metadata_manager
740 .catalog_controller
741 .create_subscription_catalog(&mut subscription)
742 .await?;
743 if let Err(err) = self.stream_manager.create_subscription(&subscription).await {
744 tracing::debug!(error = %err.as_report(), "failed to create subscription");
745 let _ = self
746 .metadata_manager
747 .catalog_controller
748 .try_abort_creating_subscription(subscription.id)
749 .await
750 .inspect_err(|e| {
751 tracing::error!(
752 error = %e.as_report(),
753 "failed to abort create subscription after failure"
754 );
755 });
756 return Err(err);
757 }
758
759 let version = self
760 .metadata_manager
761 .catalog_controller
762 .notify_create_subscription(subscription.id)
763 .await?;
764 tracing::debug!("finish create subscription");
765 Ok(version)
766 }
767
768 async fn drop_subscription(
769 &self,
770 subscription_id: SubscriptionId,
771 drop_mode: DropMode,
772 ) -> MetaResult<NotificationVersion> {
773 tracing::debug!("preparing drop subscription");
774 let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
775 let subscription = self
776 .metadata_manager
777 .catalog_controller
778 .get_subscription_by_id(subscription_id)
779 .await?;
780 let table_id = subscription.dependent_table_id;
781 let database_id = subscription.database_id;
782 let (_, version) = self
783 .metadata_manager
784 .catalog_controller
785 .drop_object(ObjectType::Subscription, subscription_id, drop_mode)
786 .await?;
787 self.stream_manager
788 .drop_subscription(database_id, subscription_id, table_id)
789 .await;
790 tracing::debug!("finish drop subscription");
791 Ok(version)
792 }
793
794 #[await_tree::instrument]
796 pub(crate) async fn validate_cdc_table(
797 &self,
798 table: &Table,
799 table_fragments: &StreamJobFragments,
800 ) -> MetaResult<()> {
801 let stream_scan_fragment = table_fragments
802 .fragments
803 .values()
804 .filter(|f| {
805 f.fragment_type_mask.contains(FragmentTypeFlag::StreamScan)
806 || f.fragment_type_mask
807 .contains(FragmentTypeFlag::StreamCdcScan)
808 })
809 .exactly_one()
810 .ok()
811 .with_context(|| {
812 format!(
813 "expect exactly one stream scan fragment, got: {:?}",
814 table_fragments.fragments
815 )
816 })?;
817 fn assert_parallelism(stream_scan_fragment: &Fragment, node_body: &Option<NodeBody>) {
818 if let Some(NodeBody::StreamCdcScan(node)) = node_body {
819 if let Some(o) = node.options
820 && CdcScanOptions::from_proto(&o).is_parallelized_backfill()
821 {
822 } else {
824 assert_eq!(
825 stream_scan_fragment.actors.len(),
826 1,
827 "Stream scan fragment should have only one actor"
828 );
829 }
830 }
831 }
832 let mut found_cdc_scan = false;
833 match &stream_scan_fragment.nodes.node_body {
834 Some(NodeBody::StreamCdcScan(_)) => {
835 assert_parallelism(stream_scan_fragment, &stream_scan_fragment.nodes.node_body);
836 if self
837 .validate_cdc_table_inner(&stream_scan_fragment.nodes.node_body, table.id)
838 .await?
839 {
840 found_cdc_scan = true;
841 }
842 }
843 Some(NodeBody::Project(_)) => {
845 for input in &stream_scan_fragment.nodes.input {
846 assert_parallelism(stream_scan_fragment, &input.node_body);
847 if self
848 .validate_cdc_table_inner(&input.node_body, table.id)
849 .await?
850 {
851 found_cdc_scan = true;
852 }
853 }
854 }
855 _ => {
856 bail!("Unexpected node body for stream cdc scan");
857 }
858 };
859 if !found_cdc_scan {
860 bail!("No stream cdc scan node found in stream scan fragment");
861 }
862 Ok(())
863 }
864
865 async fn validate_cdc_table_inner(
866 &self,
867 node_body: &Option<NodeBody>,
868 table_id: TableId,
869 ) -> MetaResult<bool> {
870 if let Some(NodeBody::StreamCdcScan(stream_cdc_scan)) = node_body
871 && let Some(ref cdc_table_desc) = stream_cdc_scan.cdc_table_desc
872 {
873 let options_with_secret = WithOptionsSecResolved::new(
874 cdc_table_desc.connect_properties.clone(),
875 cdc_table_desc.secret_refs.clone(),
876 );
877
878 let mut props = ConnectorProperties::extract(options_with_secret, true)?;
879 props.init_from_pb_cdc_table_desc(cdc_table_desc);
880
881 let _enumerator = props
883 .create_split_enumerator(SourceEnumeratorContext::dummy().into())
884 .await?;
885
886 tracing::debug!(?table_id, "validate cdc table success");
887 Ok(true)
888 } else {
889 Ok(false)
890 }
891 }
892
893 pub async fn validate_table_for_sink(&self, table_id: TableId) -> MetaResult<()> {
894 let migrated = self
895 .metadata_manager
896 .catalog_controller
897 .has_table_been_migrated(table_id)
898 .await?;
899 if !migrated {
900 Err(anyhow::anyhow!("Creating sink into table is not allowed for unmigrated table {}. Please migrate it first.", table_id).into())
901 } else {
902 Ok(())
903 }
904 }
905
906 #[await_tree::instrument(boxed, "create_streaming_job({streaming_job})")]
909 pub async fn create_streaming_job(
910 &self,
911 mut streaming_job: StreamingJob,
912 fragment_graph: StreamFragmentGraphProto,
913 dependencies: HashSet<ObjectId>,
914 specific_resource_group: Option<String>,
915 if_not_exists: bool,
916 ) -> MetaResult<NotificationVersion> {
917 if let StreamingJob::Sink(sink) = &streaming_job
918 && let Some(target_table) = sink.target_table
919 {
920 self.validate_table_for_sink(target_table).await?;
921 }
922 let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());
923 let check_ret = self
924 .metadata_manager
925 .catalog_controller
926 .create_job_catalog(
927 &mut streaming_job,
928 &ctx,
929 &fragment_graph.parallelism,
930 fragment_graph.max_parallelism as _,
931 dependencies,
932 specific_resource_group.clone(),
933 )
934 .await;
935 if let Err(meta_err) = check_ret {
936 if !if_not_exists {
937 return Err(meta_err);
938 }
939 return if let MetaErrorInner::Duplicated(_, _, Some(job_id)) = meta_err.inner() {
940 if streaming_job.create_type() == CreateType::Foreground {
941 let database_id = streaming_job.database_id();
942 self.metadata_manager
943 .wait_streaming_job_finished(database_id, *job_id)
944 .await
945 } else {
946 Ok(IGNORED_NOTIFICATION_VERSION)
947 }
948 } else {
949 Err(meta_err)
950 };
951 }
952 let job_id = streaming_job.id();
953 tracing::debug!(
954 id = %job_id,
955 definition = streaming_job.definition(),
956 create_type = streaming_job.create_type().as_str_name(),
957 job_type = ?streaming_job.job_type(),
958 "starting streaming job",
959 );
960 let permit = self
962 .creating_streaming_job_permits
963 .semaphore
964 .clone()
965 .acquire_owned()
966 .instrument_await("acquire_creating_streaming_job_permit")
967 .await
968 .unwrap();
969 let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
970
971 let name = streaming_job.name();
972 let definition = streaming_job.definition();
973 let source_id = match &streaming_job {
974 StreamingJob::Table(Some(src), _, _) | StreamingJob::Source(src) => Some(src.id),
975 _ => None,
976 };
977
978 match self
980 .create_streaming_job_inner(
981 ctx,
982 streaming_job,
983 fragment_graph,
984 specific_resource_group,
985 permit,
986 )
987 .await
988 {
989 Ok(version) => Ok(version),
990 Err(err) => {
991 tracing::error!(id = %job_id, error = %err.as_report(), "failed to create streaming job");
992 let event = risingwave_pb::meta::event_log::EventCreateStreamJobFail {
993 id: job_id,
994 name,
995 definition,
996 error: err.as_report().to_string(),
997 };
998 self.env.event_log_manager_ref().add_event_logs(vec![
999 risingwave_pb::meta::event_log::Event::CreateStreamJobFail(event),
1000 ]);
1001 let (aborted, _) = self
1002 .metadata_manager
1003 .catalog_controller
1004 .try_abort_creating_streaming_job(job_id, false)
1005 .await?;
1006 if aborted {
1007 tracing::warn!(id = %job_id, "aborted streaming job");
1008 if let Some(source_id) = source_id {
1010 self.source_manager
1011 .apply_source_change(SourceChange::DropSource {
1012 dropped_source_ids: vec![source_id],
1013 })
1014 .await;
1015 }
1016 }
1017 Err(err)
1018 }
1019 }
1020 }
1021
1022 #[await_tree::instrument(boxed)]
1023 async fn create_streaming_job_inner(
1024 &self,
1025 ctx: StreamContext,
1026 mut streaming_job: StreamingJob,
1027 fragment_graph: StreamFragmentGraphProto,
1028 specific_resource_group: Option<String>,
1029 permit: OwnedSemaphorePermit,
1030 ) -> MetaResult<NotificationVersion> {
1031 let mut fragment_graph =
1032 StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
1033 streaming_job.set_info_from_graph(&fragment_graph);
1034
1035 let incomplete_internal_tables = fragment_graph
1037 .incomplete_internal_tables()
1038 .into_values()
1039 .collect_vec();
1040 let table_id_map = self
1041 .metadata_manager
1042 .catalog_controller
1043 .create_internal_table_catalog(&streaming_job, incomplete_internal_tables)
1044 .await?;
1045 fragment_graph.refill_internal_table_ids(table_id_map);
1046
1047 tracing::debug!(id = %streaming_job.id(), "building streaming job");
1049 let (ctx, stream_job_fragments) = self
1050 .build_stream_job(ctx, streaming_job, fragment_graph, specific_resource_group)
1051 .await?;
1052
1053 let streaming_job = &ctx.streaming_job;
1054
1055 match streaming_job {
1056 StreamingJob::Table(None, table, TableJobType::SharedCdcSource) => {
1057 self.validate_cdc_table(table, &stream_job_fragments)
1058 .await?;
1059 }
1060 StreamingJob::Table(Some(source), ..) => {
1061 self.source_manager.register_source(source).await?;
1063 let connector_name = source
1064 .get_with_properties()
1065 .get(UPSTREAM_SOURCE_KEY)
1066 .cloned();
1067 let attr = source.info.as_ref().map(|source_info| {
1068 jsonbb::json!({
1069 "format": source_info.format().as_str_name(),
1070 "encode": source_info.row_encode().as_str_name(),
1071 })
1072 });
1073 report_create_object(
1074 streaming_job.id(),
1075 "source",
1076 PbTelemetryDatabaseObject::Source,
1077 connector_name,
1078 attr,
1079 );
1080 }
1081 StreamingJob::Sink(sink) => {
1082 if sink.auto_refresh_schema_from_table.is_some() {
1083 check_sink_fragments_support_refresh_schema(&stream_job_fragments.fragments)?
1084 }
1085 validate_sink(sink).await?;
1087 let connector_name = sink.get_properties().get(UPSTREAM_SOURCE_KEY).cloned();
1088 let attr = sink.format_desc.as_ref().map(|sink_info| {
1089 jsonbb::json!({
1090 "format": sink_info.format().as_str_name(),
1091 "encode": sink_info.encode().as_str_name(),
1092 })
1093 });
1094 report_create_object(
1095 streaming_job.id(),
1096 "sink",
1097 PbTelemetryDatabaseObject::Sink,
1098 connector_name,
1099 attr,
1100 );
1101 }
1102 StreamingJob::Source(source) => {
1103 self.source_manager.register_source(source).await?;
1105 let connector_name = source
1106 .get_with_properties()
1107 .get(UPSTREAM_SOURCE_KEY)
1108 .cloned();
1109 let attr = source.info.as_ref().map(|source_info| {
1110 jsonbb::json!({
1111 "format": source_info.format().as_str_name(),
1112 "encode": source_info.row_encode().as_str_name(),
1113 })
1114 });
1115 report_create_object(
1116 streaming_job.id(),
1117 "source",
1118 PbTelemetryDatabaseObject::Source,
1119 connector_name,
1120 attr,
1121 );
1122 }
1123 _ => {}
1124 }
1125
1126 self.metadata_manager
1127 .catalog_controller
1128 .prepare_stream_job_fragments(&stream_job_fragments, streaming_job, false)
1129 .await?;
1130
1131 let version = self
1133 .stream_manager
1134 .create_streaming_job(stream_job_fragments, ctx, permit)
1135 .await?;
1136
1137 Ok(version)
1138 }
1139
1140 pub async fn drop_object(
1142 &self,
1143 object_type: ObjectType,
1144 object_id: impl Into<ObjectId>,
1145 drop_mode: DropMode,
1146 ) -> MetaResult<NotificationVersion> {
1147 let object_id = object_id.into();
1148 let (release_ctx, version) = self
1149 .metadata_manager
1150 .catalog_controller
1151 .drop_object(object_type, object_id, drop_mode)
1152 .await?;
1153
1154 if object_type == ObjectType::Source {
1155 self.env
1156 .notification_manager_ref()
1157 .notify_local_subscribers(LocalNotification::SourceDropped(object_id));
1158 }
1159
1160 let ReleaseContext {
1161 database_id,
1162 removed_streaming_job_ids,
1163 removed_state_table_ids,
1164 removed_source_ids,
1165 removed_secret_ids: secret_ids,
1166 removed_source_fragments,
1167 removed_actors,
1168 removed_fragments,
1169 removed_sink_fragment_by_targets,
1170 removed_iceberg_table_sinks,
1171 } = release_ctx;
1172
1173 let _guard = self.source_manager.pause_tick().await;
1174 self.stream_manager
1175 .drop_streaming_jobs(
1176 database_id,
1177 removed_actors.iter().map(|id| *id as _).collect(),
1178 removed_streaming_job_ids,
1179 removed_state_table_ids,
1180 removed_fragments.iter().map(|id| *id as _).collect(),
1181 removed_sink_fragment_by_targets
1182 .into_iter()
1183 .map(|(target, sinks)| {
1184 (target as _, sinks.into_iter().map(|id| id as _).collect())
1185 })
1186 .collect(),
1187 )
1188 .await;
1189
1190 self.source_manager
1193 .apply_source_change(SourceChange::DropSource {
1194 dropped_source_ids: removed_source_ids.into_iter().map(|id| id as _).collect(),
1195 })
1196 .await;
1197
1198 let dropped_source_fragments = removed_source_fragments;
1201 self.source_manager
1202 .apply_source_change(SourceChange::DropMv {
1203 dropped_source_fragments,
1204 })
1205 .await;
1206
1207 let iceberg_sink_ids: Vec<SinkId> = removed_iceberg_table_sinks
1209 .iter()
1210 .map(|sink| sink.id)
1211 .collect();
1212
1213 for sink in removed_iceberg_table_sinks {
1214 let sink_param = SinkParam::try_from_sink_catalog(sink.into())
1215 .expect("Iceberg sink should be valid");
1216 let iceberg_sink =
1217 IcebergSink::try_from(sink_param).expect("Iceberg sink should be valid");
1218 if let Ok(iceberg_catalog) = iceberg_sink.config.create_catalog().await {
1219 let table_identifier = iceberg_sink.config.full_table_name().unwrap();
1220 tracing::info!(
1221 "dropping iceberg table {} for dropped sink",
1222 table_identifier
1223 );
1224
1225 let _ = iceberg_catalog
1226 .drop_table(&table_identifier)
1227 .await
1228 .inspect_err(|err| {
1229 tracing::error!(
1230 "failed to drop iceberg table {} during cleanup: {}",
1231 table_identifier,
1232 err.as_report()
1233 );
1234 });
1235 }
1236 }
1237
1238 if !iceberg_sink_ids.is_empty() {
1240 self.sink_manager
1241 .stop_sink_coordinator(iceberg_sink_ids)
1242 .await;
1243 }
1244
1245 for secret in secret_ids {
1247 LocalSecretManager::global().remove_secret(secret);
1248 }
1249 Ok(version)
1250 }
1251
1252 #[await_tree::instrument(boxed, "replace_streaming_job({streaming_job})")]
1254 pub async fn replace_job(
1255 &self,
1256 mut streaming_job: StreamingJob,
1257 fragment_graph: StreamFragmentGraphProto,
1258 ) -> MetaResult<NotificationVersion> {
1259 match &streaming_job {
1260 StreamingJob::Table(..)
1261 | StreamingJob::Source(..)
1262 | StreamingJob::MaterializedView(..) => {}
1263 StreamingJob::Sink(..) | StreamingJob::Index(..) => {
1264 bail_not_implemented!("schema change for {}", streaming_job.job_type_str())
1265 }
1266 }
1267
1268 let job_id = streaming_job.id();
1269
1270 let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
1271 let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());
1272
1273 let original_max_parallelism = self
1275 .metadata_manager
1276 .get_job_max_parallelism(streaming_job.id())
1277 .await?;
1278 let fragment_graph = PbStreamFragmentGraph {
1279 max_parallelism: original_max_parallelism as _,
1280 ..fragment_graph
1281 };
1282
1283 let fragment_graph = StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
1285 streaming_job.set_info_from_graph(&fragment_graph);
1286
1287 let streaming_job = streaming_job;
1289
1290 let auto_refresh_schema_sinks = if let StreamingJob::Table(_, table, _) = &streaming_job {
1291 let auto_refresh_schema_sinks = self
1292 .metadata_manager
1293 .catalog_controller
1294 .get_sink_auto_refresh_schema_from(table.id)
1295 .await?;
1296 if !auto_refresh_schema_sinks.is_empty() {
1297 let original_table_columns = self
1298 .metadata_manager
1299 .catalog_controller
1300 .get_table_columns(table.id)
1301 .await?;
1302 let mut original_table_column_ids: HashSet<_> = original_table_columns
1304 .iter()
1305 .map(|col| col.column_id())
1306 .collect();
1307 let newly_added_columns = table
1308 .columns
1309 .iter()
1310 .filter(|col| {
1311 !original_table_column_ids.remove(&ColumnId::new(
1312 col.column_desc.as_ref().unwrap().column_id as _,
1313 ))
1314 })
1315 .map(|col| ColumnCatalog::from(col.clone()))
1316 .collect_vec();
1317 if !original_table_column_ids.is_empty() {
1318 return Err(anyhow!("new table columns does not contains all original columns. new: {:?}, original: {:?}, not included: {:?}", table.columns, original_table_columns, original_table_column_ids).into());
1319 }
1320 let mut sinks = Vec::with_capacity(auto_refresh_schema_sinks.len());
1321 for sink in auto_refresh_schema_sinks {
1322 let sink_job_fragments = self
1323 .metadata_manager
1324 .get_job_fragments_by_id(sink.id.as_job_id())
1325 .await?;
1326 if sink_job_fragments.fragments.len() != 1 {
1327 return Err(anyhow!(
1328 "auto schema refresh sink must have only one fragment, but got {}",
1329 sink_job_fragments.fragments.len()
1330 )
1331 .into());
1332 }
1333 let original_sink_fragment =
1334 sink_job_fragments.fragments.into_values().next().unwrap();
1335 let (new_sink_fragment, new_schema, new_log_store_table) =
1336 rewrite_refresh_schema_sink_fragment(
1337 &original_sink_fragment,
1338 &sink,
1339 &newly_added_columns,
1340 table,
1341 fragment_graph.table_fragment_id(),
1342 self.env.id_gen_manager(),
1343 self.env.actor_id_generator(),
1344 )?;
1345
1346 assert_eq!(
1347 original_sink_fragment.actors.len(),
1348 new_sink_fragment.actors.len()
1349 );
1350 let actor_status = (0..original_sink_fragment.actors.len())
1351 .map(|i| {
1352 let worker_node_id = sink_job_fragments.actor_status
1353 [&original_sink_fragment.actors[i].actor_id]
1354 .location
1355 .as_ref()
1356 .unwrap()
1357 .worker_node_id;
1358 (
1359 new_sink_fragment.actors[i].actor_id,
1360 PbActorStatus {
1361 location: Some(PbActorLocation { worker_node_id }),
1362 },
1363 )
1364 })
1365 .collect();
1366
1367 let streaming_job = StreamingJob::Sink(sink);
1368
1369 let tmp_sink_id = self
1370 .metadata_manager
1371 .catalog_controller
1372 .create_job_catalog_for_replace(&streaming_job, None, None, None)
1373 .await?
1374 .as_sink_id();
1375 let StreamingJob::Sink(sink) = streaming_job else {
1376 unreachable!()
1377 };
1378
1379 sinks.push(AutoRefreshSchemaSinkContext {
1380 tmp_sink_id,
1381 original_sink: sink,
1382 original_fragment: original_sink_fragment,
1383 new_schema,
1384 newly_add_fields: newly_added_columns
1385 .iter()
1386 .map(|col| Field::from(&col.column_desc))
1387 .collect(),
1388 new_fragment: new_sink_fragment,
1389 new_log_store_table,
1390 actor_status,
1391 });
1392 }
1393 Some(sinks)
1394 } else {
1395 None
1396 }
1397 } else {
1398 None
1399 };
1400
1401 let tmp_id = self
1402 .metadata_manager
1403 .catalog_controller
1404 .create_job_catalog_for_replace(
1405 &streaming_job,
1406 Some(&ctx),
1407 fragment_graph.specified_parallelism().as_ref(),
1408 Some(fragment_graph.max_parallelism()),
1409 )
1410 .await?;
1411
1412 let tmp_sink_ids = auto_refresh_schema_sinks.as_ref().map(|sinks| {
1413 sinks
1414 .iter()
1415 .map(|sink| sink.tmp_sink_id.as_object_id())
1416 .collect_vec()
1417 });
1418
1419 tracing::debug!(id = %job_id, "building replace streaming job");
1420 let mut updated_sink_catalogs = vec![];
1421
1422 let mut drop_table_connector_ctx = None;
1423 let result: MetaResult<_> = try {
1424 let (mut ctx, mut stream_job_fragments) = self
1425 .build_replace_job(
1426 ctx,
1427 &streaming_job,
1428 fragment_graph,
1429 tmp_id,
1430 auto_refresh_schema_sinks,
1431 )
1432 .await?;
1433 drop_table_connector_ctx = ctx.drop_table_connector_ctx.clone();
1434 let auto_refresh_schema_sink_finish_ctx =
1435 ctx.auto_refresh_schema_sinks.as_ref().map(|sinks| {
1436 sinks
1437 .iter()
1438 .map(|sink| FinishAutoRefreshSchemaSinkContext {
1439 tmp_sink_id: sink.tmp_sink_id,
1440 original_sink_id: sink.original_sink.id,
1441 columns: sink.new_schema.clone(),
1442 new_log_store_table: sink
1443 .new_log_store_table
1444 .as_ref()
1445 .map(|table| (table.id, table.columns.clone())),
1446 })
1447 .collect()
1448 });
1449
1450 if let StreamingJob::Table(_, table, ..) = &streaming_job {
1452 let union_fragment = stream_job_fragments.inner.union_fragment_for_table();
1453 let upstream_infos = self
1454 .metadata_manager
1455 .catalog_controller
1456 .get_all_upstream_sink_infos(table, union_fragment.fragment_id as _)
1457 .await?;
1458 refill_upstream_sink_union_in_table(&mut union_fragment.nodes, &upstream_infos);
1459
1460 for upstream_info in &upstream_infos {
1461 let upstream_fragment_id = upstream_info.sink_fragment_id;
1462 ctx.upstream_fragment_downstreams
1463 .entry(upstream_fragment_id)
1464 .or_default()
1465 .push(upstream_info.new_sink_downstream.clone());
1466 if upstream_info.sink_original_target_columns.is_empty() {
1467 updated_sink_catalogs.push(upstream_info.sink_id);
1468 }
1469 }
1470 }
1471
1472 let replace_upstream = ctx.replace_upstream.clone();
1473
1474 if let Some(sinks) = &ctx.auto_refresh_schema_sinks {
1475 let empty_downstreams = FragmentDownstreamRelation::default();
1476 for sink in sinks {
1477 self.metadata_manager
1478 .catalog_controller
1479 .prepare_streaming_job(
1480 sink.tmp_sink_id.as_job_id(),
1481 || [&sink.new_fragment].into_iter(),
1482 &empty_downstreams,
1483 true,
1484 None,
1485 )
1486 .await?;
1487 }
1488 }
1489
1490 self.metadata_manager
1491 .catalog_controller
1492 .prepare_stream_job_fragments(&stream_job_fragments, &streaming_job, true)
1493 .await?;
1494
1495 self.stream_manager
1496 .replace_stream_job(stream_job_fragments, ctx)
1497 .await?;
1498 (replace_upstream, auto_refresh_schema_sink_finish_ctx)
1499 };
1500
1501 match result {
1502 Ok((replace_upstream, auto_refresh_schema_sink_finish_ctx)) => {
1503 let version = self
1504 .metadata_manager
1505 .catalog_controller
1506 .finish_replace_streaming_job(
1507 tmp_id,
1508 streaming_job,
1509 replace_upstream,
1510 SinkIntoTableContext {
1511 updated_sink_catalogs,
1512 },
1513 drop_table_connector_ctx.as_ref(),
1514 auto_refresh_schema_sink_finish_ctx,
1515 )
1516 .await?;
1517 if let Some(drop_table_connector_ctx) = &drop_table_connector_ctx {
1518 self.source_manager
1519 .apply_source_change(SourceChange::DropSource {
1520 dropped_source_ids: vec![drop_table_connector_ctx.to_remove_source_id],
1521 })
1522 .await;
1523 }
1524 Ok(version)
1525 }
1526 Err(err) => {
1527 tracing::error!(id = %job_id, error = ?err.as_report(), "failed to replace job");
1528 let _ = self.metadata_manager
1529 .catalog_controller
1530 .try_abort_replacing_streaming_job(tmp_id, tmp_sink_ids)
1531 .await.inspect_err(|err| {
1532 tracing::error!(id = %job_id, error = ?err.as_report(), "failed to abort replacing job");
1533 });
1534 Err(err)
1535 }
1536 }
1537 }
1538
1539 #[await_tree::instrument(boxed, "drop_streaming_job{}({job_id})", if let DropMode::Cascade = drop_mode { "_cascade" } else { "" }
1540 )]
1541 async fn drop_streaming_job(
1542 &self,
1543 job_id: StreamingJobId,
1544 drop_mode: DropMode,
1545 ) -> MetaResult<NotificationVersion> {
1546 let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
1547
1548 let (object_id, object_type) = match job_id {
1549 StreamingJobId::MaterializedView(id) => (id.as_object_id(), ObjectType::Table),
1550 StreamingJobId::Sink(id) => (id.as_object_id(), ObjectType::Sink),
1551 StreamingJobId::Table(_, id) => (id.as_object_id(), ObjectType::Table),
1552 StreamingJobId::Index(idx) => (idx.as_object_id(), ObjectType::Index),
1553 };
1554
1555 let job_status = self
1556 .metadata_manager
1557 .catalog_controller
1558 .get_streaming_job_status(job_id.id())
1559 .await?;
1560 let version = match job_status {
1561 JobStatus::Initial => {
1562 unreachable!(
1563 "Job with Initial status should not notify frontend and therefore should not arrive here"
1564 );
1565 }
1566 JobStatus::Creating => {
1567 self.stream_manager
1568 .cancel_streaming_jobs(vec![job_id.id()])
1569 .await?;
1570 IGNORED_NOTIFICATION_VERSION
1571 }
1572 JobStatus::Created => self.drop_object(object_type, object_id, drop_mode).await?,
1573 };
1574
1575 Ok(version)
1576 }
1577
1578 fn resolve_stream_parallelism(
1582 &self,
1583 specified: Option<NonZeroUsize>,
1584 max: NonZeroUsize,
1585 cluster_info: &StreamingClusterInfo,
1586 resource_group: String,
1587 ) -> MetaResult<NonZeroUsize> {
1588 let available = NonZeroUsize::new(cluster_info.parallelism(&resource_group));
1589 DdlController::resolve_stream_parallelism_inner(
1590 specified,
1591 max,
1592 available,
1593 &self.env.opts.default_parallelism,
1594 &resource_group,
1595 )
1596 }
1597
1598 fn resolve_stream_parallelism_inner(
1599 specified: Option<NonZeroUsize>,
1600 max: NonZeroUsize,
1601 available: Option<NonZeroUsize>,
1602 default_parallelism: &DefaultParallelism,
1603 resource_group: &str,
1604 ) -> MetaResult<NonZeroUsize> {
1605 let Some(available) = available else {
1606 bail_unavailable!(
1607 "no available slots to schedule in resource group \"{}\", \
1608 have you allocated any compute nodes within this resource group?",
1609 resource_group
1610 );
1611 };
1612
1613 if let Some(specified) = specified {
1614 if specified > max {
1615 bail_invalid_parameter!(
1616 "specified parallelism {} should not exceed max parallelism {}",
1617 specified,
1618 max,
1619 );
1620 }
1621 if specified > available {
1622 tracing::warn!(
1623 resource_group,
1624 specified_parallelism = specified.get(),
1625 available_parallelism = available.get(),
1626 "specified parallelism exceeds available slots, scheduling with specified value",
1627 );
1628 }
1629 return Ok(specified);
1630 }
1631
1632 let default_parallelism = match default_parallelism {
1634 DefaultParallelism::Full => available,
1635 DefaultParallelism::Default(num) => {
1636 if *num > available {
1637 tracing::warn!(
1638 resource_group,
1639 configured_parallelism = num.get(),
1640 available_parallelism = available.get(),
1641 "default parallelism exceeds available slots, scheduling with configured value",
1642 );
1643 }
1644 *num
1645 }
1646 };
1647
1648 if default_parallelism > max {
1649 tracing::warn!(
1650 max_parallelism = max.get(),
1651 resource_group,
1652 "default parallelism exceeds max parallelism, capping to max",
1653 );
1654 }
1655 Ok(default_parallelism.min(max))
1656 }
1657
1658 #[await_tree::instrument]
1664 pub(crate) async fn build_stream_job(
1665 &self,
1666 stream_ctx: StreamContext,
1667 mut stream_job: StreamingJob,
1668 fragment_graph: StreamFragmentGraph,
1669 specific_resource_group: Option<String>,
1670 ) -> MetaResult<(CreateStreamingJobContext, StreamJobFragmentsToCreate)> {
1671 let id = stream_job.id();
1672 let specified_parallelism = fragment_graph.specified_parallelism();
1673 let max_parallelism = NonZeroUsize::new(fragment_graph.max_parallelism()).unwrap();
1674
1675 let fragment_backfill_ordering = fragment_graph.create_fragment_backfill_ordering();
1677
1678 let (snapshot_backfill_info, cross_db_snapshot_backfill_info) =
1682 fragment_graph.collect_snapshot_backfill_info()?;
1683 assert!(
1684 snapshot_backfill_info
1685 .iter()
1686 .chain([&cross_db_snapshot_backfill_info])
1687 .flat_map(|info| info.upstream_mv_table_id_to_backfill_epoch.values())
1688 .all(|backfill_epoch| backfill_epoch.is_none()),
1689 "should not set backfill epoch when initially build the job: {:?} {:?}",
1690 snapshot_backfill_info,
1691 cross_db_snapshot_backfill_info
1692 );
1693
1694 let locality_fragment_state_table_mapping =
1695 fragment_graph.find_locality_provider_fragment_state_table_mapping();
1696
1697 self.metadata_manager
1699 .catalog_controller
1700 .validate_cross_db_snapshot_backfill(&cross_db_snapshot_backfill_info)
1701 .await?;
1702
1703 let upstream_table_ids = fragment_graph
1704 .dependent_table_ids()
1705 .iter()
1706 .filter(|id| {
1707 !cross_db_snapshot_backfill_info
1708 .upstream_mv_table_id_to_backfill_epoch
1709 .contains_key(id)
1710 })
1711 .cloned()
1712 .collect();
1713
1714 let (upstream_root_fragments, existing_actor_location) = self
1715 .metadata_manager
1716 .get_upstream_root_fragments(&upstream_table_ids)
1717 .await?;
1718
1719 if snapshot_backfill_info.is_some() {
1720 match stream_job {
1721 StreamingJob::MaterializedView(_)
1722 | StreamingJob::Sink(_)
1723 | StreamingJob::Index(_, _) => {}
1724 StreamingJob::Table(_, _, _) | StreamingJob::Source(_) => {
1725 return Err(
1726 anyhow!("snapshot_backfill not enabled for table and source").into(),
1727 );
1728 }
1729 }
1730 }
1731
1732 let upstream_actors = upstream_root_fragments
1733 .values()
1734 .map(|(fragment, _)| {
1735 (
1736 fragment.fragment_id,
1737 fragment.actors.keys().copied().collect(),
1738 )
1739 })
1740 .collect();
1741
1742 let complete_graph = CompleteStreamFragmentGraph::with_upstreams(
1743 fragment_graph,
1744 FragmentGraphUpstreamContext {
1745 upstream_root_fragments,
1746 upstream_actor_location: existing_actor_location,
1747 },
1748 (&stream_job).into(),
1749 )?;
1750 let resource_group = match specific_resource_group {
1751 None => {
1752 self.metadata_manager
1753 .get_database_resource_group(stream_job.database_id())
1754 .await?
1755 }
1756 Some(resource_group) => resource_group,
1757 };
1758
1759 let cluster_info = self.metadata_manager.get_streaming_cluster_info().await?;
1761
1762 let parallelism = self.resolve_stream_parallelism(
1763 specified_parallelism,
1764 max_parallelism,
1765 &cluster_info,
1766 resource_group.clone(),
1767 )?;
1768
1769 let parallelism = self
1770 .env
1771 .system_params_reader()
1772 .await
1773 .adaptive_parallelism_strategy()
1774 .compute_target_parallelism(parallelism.get());
1775
1776 let parallelism = NonZeroUsize::new(parallelism).expect("parallelism must be positive");
1777 let actor_graph_builder = ActorGraphBuilder::new(
1778 id,
1779 resource_group,
1780 complete_graph,
1781 cluster_info,
1782 parallelism,
1783 )?;
1784
1785 let ActorGraphBuildResult {
1786 graph,
1787 downstream_fragment_relations,
1788 building_locations,
1789 upstream_fragment_downstreams,
1790 new_no_shuffle,
1791 replace_upstream,
1792 ..
1793 } = actor_graph_builder.generate_graph(&self.env, &stream_job, stream_ctx.clone())?;
1794 assert!(replace_upstream.is_empty());
1795
1796 let table_parallelism = match (specified_parallelism, &self.env.opts.default_parallelism) {
1803 (None, DefaultParallelism::Full) => TableParallelism::Adaptive,
1804 _ => TableParallelism::Fixed(parallelism.get()),
1805 };
1806
1807 let stream_job_fragments = StreamJobFragments::new(
1808 id,
1809 graph,
1810 &building_locations.actor_locations,
1811 stream_ctx.clone(),
1812 table_parallelism,
1813 max_parallelism.get(),
1814 );
1815
1816 if let Some(mview_fragment) = stream_job_fragments.mview_fragment() {
1817 stream_job.set_table_vnode_count(mview_fragment.vnode_count());
1818 }
1819
1820 let new_upstream_sink = if let StreamingJob::Sink(sink) = &stream_job
1821 && let Ok(table_id) = sink.get_target_table()
1822 {
1823 let tables = self
1824 .metadata_manager
1825 .get_table_catalog_by_ids(&[*table_id])
1826 .await?;
1827 let target_table = tables
1828 .first()
1829 .ok_or_else(|| MetaError::catalog_id_not_found("table", *table_id))?;
1830 let sink_fragment = stream_job_fragments
1831 .sink_fragment()
1832 .ok_or_else(|| anyhow::anyhow!("sink fragment not found for sink {}", sink.id))?;
1833 let mview_fragment_id = self
1834 .metadata_manager
1835 .catalog_controller
1836 .get_mview_fragment_by_id(table_id.as_job_id())
1837 .await?;
1838 let upstream_sink_info = build_upstream_sink_info(
1839 sink,
1840 sink_fragment.fragment_id as _,
1841 target_table,
1842 mview_fragment_id,
1843 )?;
1844 Some(upstream_sink_info)
1845 } else {
1846 None
1847 };
1848
        let mut cdc_table_snapshot_splits = None;
        if let StreamingJob::Table(None, table, TableJobType::SharedCdcSource) = &stream_job
            && let Some((_, stream_cdc_scan)) =
                parallel_cdc_table_backfill_fragment(stream_job_fragments.fragments.values())
        {
            let splits = try_init_parallel_cdc_table_snapshot_splits(
                table.id,
                stream_cdc_scan.cdc_table_desc.as_ref().unwrap(),
                self.env.meta_store_ref(),
                stream_cdc_scan.options.as_ref().unwrap(),
                self.env.opts.cdc_table_split_init_insert_batch_size,
                self.env.opts.cdc_table_split_init_sleep_interval_splits,
                self.env.opts.cdc_table_split_init_sleep_duration_millis,
            )
            .await?;
            cdc_table_snapshot_splits = Some(splits);
        }

        let ctx = CreateStreamingJobContext {
            upstream_fragment_downstreams,
            new_no_shuffle,
            upstream_actors,
            building_locations,
            definition: stream_job.definition(),
            create_type: stream_job.create_type(),
            job_type: (&stream_job).into(),
            streaming_job: stream_job,
            new_upstream_sink,
            option: CreateStreamingJobOption {},
            snapshot_backfill_info,
            cross_db_snapshot_backfill_info,
            fragment_backfill_ordering,
            locality_fragment_state_table_mapping,
            cdc_table_snapshot_splits,
        };

        Ok((
            ctx,
            StreamJobFragmentsToCreate {
                inner: stream_job_fragments,
                downstreams: downstream_fragment_relations,
            },
        ))
    }

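    /// Builds the new fragments and replacement context for altering an existing streaming job
    /// (table, source, or materialized view). The new fragment graph is fitted against the old
    /// job's internal tables, keeps the old job's parallelism, and has the existing upstream and
    /// downstream fragments rewired onto it. Schema change is not yet implemented for sinks and
    /// indexes.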
    pub(crate) async fn build_replace_job(
        &self,
        stream_ctx: StreamContext,
        stream_job: &StreamingJob,
        mut fragment_graph: StreamFragmentGraph,
        tmp_job_id: JobId,
        auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,
    ) -> MetaResult<(ReplaceStreamJobContext, StreamJobFragmentsToCreate)> {
        match &stream_job {
            StreamingJob::Table(..)
            | StreamingJob::Source(..)
            | StreamingJob::MaterializedView(..) => {}
            StreamingJob::Sink(..) | StreamingJob::Index(..) => {
                bail_not_implemented!("schema change for {}", stream_job.job_type_str())
            }
        }

        let id = stream_job.id();

        let mut drop_table_associated_source_id = None;
        if let StreamingJob::Table(None, _, _) = &stream_job {
            drop_table_associated_source_id = self
                .metadata_manager
                .get_table_associated_source_id(id.as_mv_table_id())
                .await?;
        }

        let old_fragments = self.metadata_manager.get_job_fragments_by_id(id).await?;
        let old_internal_table_ids = old_fragments.internal_table_ids();

        let mut drop_table_connector_ctx = None;
        if let Some(to_remove_source_id) = drop_table_associated_source_id {
            debug_assert!(old_internal_table_ids.len() == 1);

            drop_table_connector_ctx = Some(DropTableConnectorContext {
                to_change_streaming_job_id: id,
                to_remove_state_table_id: old_internal_table_ids[0],
                to_remove_source_id,
            });
        } else if stream_job.is_materialized_view() {
            let old_fragments_upstreams = self
                .metadata_manager
                .catalog_controller
                .upstream_fragments(old_fragments.fragment_ids())
                .await?;

            let old_state_graph =
                state_match::Graph::from_existing(&old_fragments, &old_fragments_upstreams);
            let new_state_graph = state_match::Graph::from_building(&fragment_graph);
            let result = state_match::match_graph(&new_state_graph, &old_state_graph)
                .context("incompatible altering on the streaming job states")?;

            fragment_graph.fit_internal_table_ids_with_mapping(result.table_matches);
            fragment_graph.fit_snapshot_backfill_epochs(result.snapshot_backfill_epochs);
        } else {
            let old_internal_tables = self
                .metadata_manager
                .get_table_catalog_by_ids(&old_internal_table_ids)
                .await?;
            fragment_graph.fit_internal_tables_trivial(old_internal_tables)?;
        }

        let original_root_fragment = old_fragments
            .root_fragment()
            .expect("root fragment not found");

        let job_type = StreamingJobType::from(stream_job);

        let (mut downstream_fragments, mut downstream_actor_location) =
            self.metadata_manager.get_downstream_fragments(id).await?;

        if let Some(auto_refresh_schema_sinks) = &auto_refresh_schema_sinks {
            let mut remaining_fragment: HashSet<_> = auto_refresh_schema_sinks
                .iter()
                .map(|sink| sink.original_fragment.fragment_id)
                .collect();
            for (_, downstream_fragment, nodes) in &mut downstream_fragments {
                if let Some(sink) = auto_refresh_schema_sinks.iter().find(|sink| {
                    sink.original_fragment.fragment_id == downstream_fragment.fragment_id
                }) {
                    assert!(remaining_fragment.remove(&downstream_fragment.fragment_id));
                    for actor_id in downstream_fragment.actors.keys() {
                        downstream_actor_location.remove(actor_id);
                    }
                    for (actor_id, status) in &sink.actor_status {
                        downstream_actor_location
                            .insert(*actor_id, status.location.as_ref().unwrap().worker_node_id);
                    }

                    *downstream_fragment = (&sink.new_fragment_info(), stream_job.id()).into();
                    *nodes = sink.new_fragment.nodes.clone();
                }
            }
            assert!(remaining_fragment.is_empty());
        }

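        // Embed the existing downstream fragments (and, for shared-CDC-source tables and
        // materialized views, also the upstream root fragments) into the graph being built.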
        let complete_graph = match &job_type {
            StreamingJobType::Table(TableJobType::General) | StreamingJobType::Source => {
                CompleteStreamFragmentGraph::with_downstreams(
                    fragment_graph,
                    FragmentGraphDownstreamContext {
                        original_root_fragment_id: original_root_fragment.fragment_id,
                        downstream_fragments,
                        downstream_actor_location,
                    },
                    job_type,
                )?
            }
            StreamingJobType::Table(TableJobType::SharedCdcSource)
            | StreamingJobType::MaterializedView => {
                let (upstream_root_fragments, upstream_actor_location) = self
                    .metadata_manager
                    .get_upstream_root_fragments(fragment_graph.dependent_table_ids())
                    .await?;

                CompleteStreamFragmentGraph::with_upstreams_and_downstreams(
                    fragment_graph,
                    FragmentGraphUpstreamContext {
                        upstream_root_fragments,
                        upstream_actor_location,
                    },
                    FragmentGraphDownstreamContext {
                        original_root_fragment_id: original_root_fragment.fragment_id,
                        downstream_fragments,
                        downstream_actor_location,
                    },
                    job_type,
                )?
            }
            _ => unreachable!(),
        };

        let resource_group = self
            .metadata_manager
            .get_existing_job_resource_group(id)
            .await?;

        let cluster_info = self.metadata_manager.get_streaming_cluster_info().await?;

        let parallelism = NonZeroUsize::new(original_root_fragment.actors.len())
            .expect("The number of actors in the original table fragment should be greater than 0");

        let actor_graph_builder = ActorGraphBuilder::new(
            id,
            resource_group,
            complete_graph,
            cluster_info,
            parallelism,
        )?;

        let ActorGraphBuildResult {
            graph,
            downstream_fragment_relations,
            building_locations,
            upstream_fragment_downstreams,
            mut replace_upstream,
            new_no_shuffle,
            ..
        } = actor_graph_builder.generate_graph(&self.env, stream_job, stream_ctx.clone())?;

        if matches!(
            job_type,
            StreamingJobType::Source | StreamingJobType::Table(TableJobType::General)
        ) {
            assert!(upstream_fragment_downstreams.is_empty());
        }

        let stream_job_fragments = StreamJobFragments::new(
            tmp_job_id,
            graph,
            &building_locations.actor_locations,
            stream_ctx,
            old_fragments.assigned_parallelism,
            old_fragments.max_parallelism,
        );

        if let Some(sinks) = &auto_refresh_schema_sinks {
            for sink in sinks {
                replace_upstream
                    .remove(&sink.new_fragment.fragment_id)
                    .expect("should exist");
            }
        }

        let ctx = ReplaceStreamJobContext {
            old_fragments,
            replace_upstream,
            new_no_shuffle,
            upstream_fragment_downstreams,
            building_locations,
            streaming_job: stream_job.clone(),
            tmp_id: tmp_job_id,
            drop_table_connector_ctx,
            auto_refresh_schema_sinks,
        };

        Ok((
            ctx,
            StreamJobFragmentsToCreate {
                inner: stream_job_fragments,
                downstreams: downstream_fragment_relations,
            },
        ))
    }

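    /// Renames a catalog object (table, view, index, sink, source, schema, database, or
    /// subscription) via the catalog controller and returns the new notification version.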
    async fn alter_name(
        &self,
        relation: alter_name_request::Object,
        new_name: &str,
    ) -> MetaResult<NotificationVersion> {
        let (obj_type, id) = match relation {
            alter_name_request::Object::TableId(id) => (ObjectType::Table, id),
            alter_name_request::Object::ViewId(id) => (ObjectType::View, id),
            alter_name_request::Object::IndexId(id) => (ObjectType::Index, id),
            alter_name_request::Object::SinkId(id) => (ObjectType::Sink, id),
            alter_name_request::Object::SourceId(id) => (ObjectType::Source, id),
            alter_name_request::Object::SchemaId(id) => (ObjectType::Schema, id),
            alter_name_request::Object::DatabaseId(id) => (ObjectType::Database, id),
            alter_name_request::Object::SubscriptionId(id) => (ObjectType::Subscription, id),
        };
        self.metadata_manager
            .catalog_controller
            .alter_name(obj_type, id, new_name)
            .await
    }

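    /// Swaps the names of two catalog objects of the same type (table, view, source, sink, or
    /// subscription). Schema swap is not supported.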
    async fn alter_swap_rename(
        &self,
        object: alter_swap_rename_request::Object,
    ) -> MetaResult<NotificationVersion> {
        let (obj_type, src_id, dst_id) = match object {
            alter_swap_rename_request::Object::Schema(_) => unimplemented!("schema swap"),
            alter_swap_rename_request::Object::Table(objs) => {
                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
                (ObjectType::Table, src_id, dst_id)
            }
            alter_swap_rename_request::Object::View(objs) => {
                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
                (ObjectType::View, src_id, dst_id)
            }
            alter_swap_rename_request::Object::Source(objs) => {
                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
                (ObjectType::Source, src_id, dst_id)
            }
            alter_swap_rename_request::Object::Sink(objs) => {
                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
                (ObjectType::Sink, src_id, dst_id)
            }
            alter_swap_rename_request::Object::Subscription(objs) => {
                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
                (ObjectType::Subscription, src_id, dst_id)
            }
        };

        self.metadata_manager
            .catalog_controller
            .alter_swap_rename(obj_type, src_id, dst_id)
            .await
    }

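    /// Transfers ownership of a catalog object to the given user.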
    async fn alter_owner(
        &self,
        object: Object,
        owner_id: UserId,
    ) -> MetaResult<NotificationVersion> {
        let (obj_type, id) = match object {
            Object::TableId(id) => (ObjectType::Table, id),
            Object::ViewId(id) => (ObjectType::View, id),
            Object::SourceId(id) => (ObjectType::Source, id),
            Object::SinkId(id) => (ObjectType::Sink, id),
            Object::SchemaId(id) => (ObjectType::Schema, id),
            Object::DatabaseId(id) => (ObjectType::Database, id),
            Object::SubscriptionId(id) => (ObjectType::Subscription, id),
            Object::ConnectionId(id) => (ObjectType::Connection, id),
        };
        self.metadata_manager
            .catalog_controller
            .alter_owner(obj_type, id.into(), owner_id as _)
            .await
    }

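    /// Moves a catalog object into a different schema.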
    async fn alter_set_schema(
        &self,
        object: alter_set_schema_request::Object,
        new_schema_id: SchemaId,
    ) -> MetaResult<NotificationVersion> {
        let (obj_type, id) = match object {
            alter_set_schema_request::Object::TableId(id) => (ObjectType::Table, id),
            alter_set_schema_request::Object::ViewId(id) => (ObjectType::View, id),
            alter_set_schema_request::Object::SourceId(id) => (ObjectType::Source, id),
            alter_set_schema_request::Object::SinkId(id) => (ObjectType::Sink, id),
            alter_set_schema_request::Object::FunctionId(id) => (ObjectType::Function, id),
            alter_set_schema_request::Object::ConnectionId(id) => (ObjectType::Connection, id),
            alter_set_schema_request::Object::SubscriptionId(id) => (ObjectType::Subscription, id),
        };
        self.metadata_manager
            .catalog_controller
            .alter_schema(obj_type, id.into(), new_schema_id as _)
            .await
    }

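    /// Waits until no background creating streaming jobs remain, polling roughly once per
    /// millisecond and giving up with a cancellation error after about 30 minutes.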
    pub async fn wait(&self) -> MetaResult<()> {
        let timeout_ms = 30 * 60 * 1000;
        for _ in 0..timeout_ms {
            if self
                .metadata_manager
                .catalog_controller
                .list_background_creating_jobs(true, None)
                .await?
                .is_empty()
            {
                return Ok(());
            }

            sleep(Duration::from_millis(1)).await;
        }
        Err(MetaError::cancelled(format!(
            "timeout after {timeout_ms}ms"
        )))
    }

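    /// Attaches or updates a comment on a catalog object.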
    async fn comment_on(&self, comment: Comment) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .comment_on(comment)
            .await
    }

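    /// Updates a streaming job's per-job config: the given entries are added or overwritten, and
    /// the given keys are removed.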
    async fn alter_streaming_job_config(
        &self,
        job_id: JobId,
        entries_to_add: HashMap<String, String>,
        keys_to_remove: Vec<String>,
    ) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .alter_streaming_job_config(job_id, entries_to_add, keys_to_remove)
            .await
    }
}

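/// Reports a `CreateStreamJob` telemetry event for the given job, object type, and optional
/// connector name.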
fn report_create_object(
    job_id: JobId,
    event_name: &str,
    obj_type: PbTelemetryDatabaseObject,
    connector_name: Option<String>,
    attr_info: Option<jsonbb::Value>,
) {
    report_event(
        PbTelemetryEventStage::CreateStreamJob,
        event_name,
        job_id.as_raw_id() as _,
        connector_name,
        Some(obj_type),
        attr_info,
    );
}

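/// Builds the [`UpstreamSinkInfo`] for a sink that writes into a table: derives the sink's
/// output schema (falling back to the target table's columns when the sink has no recorded
/// original target columns), maps the target table's distribution key onto sink column indices,
/// and prepares the hash-dispatched downstream relation plus the projection onto the table's
/// current columns.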
pub fn build_upstream_sink_info(
    sink: &PbSink,
    sink_fragment_id: FragmentId,
    target_table: &PbTable,
    target_fragment_id: FragmentId,
) -> MetaResult<UpstreamSinkInfo> {
    let sink_columns = if !sink.original_target_columns.is_empty() {
        sink.original_target_columns.clone()
    } else {
        target_table.columns.clone()
    };

    let sink_output_fields = sink_columns
        .iter()
        .map(|col| Field::from(col.column_desc.as_ref().unwrap()).to_prost())
        .collect_vec();
    let output_indices = (0..sink_output_fields.len())
        .map(|i| i as u32)
        .collect_vec();

    let dist_key_indices: anyhow::Result<Vec<u32>> = try {
        let sink_idx_by_col_id = sink_columns
            .iter()
            .enumerate()
            .map(|(idx, col)| {
                let column_id = col.column_desc.as_ref().unwrap().column_id;
                (column_id, idx as u32)
            })
            .collect::<HashMap<_, _>>();
        target_table
            .distribution_key
            .iter()
            .map(|dist_idx| {
                let column_id = target_table.columns[*dist_idx as usize]
                    .column_desc
                    .as_ref()
                    .unwrap()
                    .column_id;
                let sink_idx = sink_idx_by_col_id
                    .get(&column_id)
                    .ok_or_else(|| anyhow::anyhow!("column id {} not found in sink", column_id))?;
                Ok(*sink_idx)
            })
            .collect::<anyhow::Result<Vec<_>>>()?
    };
    let dist_key_indices =
        dist_key_indices.map_err(|e| e.context("failed to get distribution key indices"))?;
    let downstream_fragment_id = target_fragment_id as _;
    let new_downstream_relation = DownstreamFragmentRelation {
        downstream_fragment_id,
        dispatcher_type: DispatcherType::Hash,
        dist_key_indices,
        output_mapping: PbDispatchOutputMapping::simple(output_indices),
    };
    let current_target_columns = target_table.get_columns();
    let project_exprs = build_select_node_list(&sink_columns, current_target_columns)?;
    Ok(UpstreamSinkInfo {
        sink_id: sink.id,
        sink_fragment_id: sink_fragment_id as _,
        sink_output_fields,
        sink_original_target_columns: sink.get_original_target_columns().clone(),
        project_exprs,
        new_sink_downstream: new_downstream_relation,
    })
}

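/// Rewrites the `UpstreamSinkUnion` node in a table's union fragment so that its initial
/// upstreams match the given sink infos. The traversal stops at the first `UpstreamSinkUnion`
/// node it encounters.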
pub fn refill_upstream_sink_union_in_table(
    union_fragment_root: &mut PbStreamNode,
    upstream_sink_infos: &Vec<UpstreamSinkInfo>,
) {
    visit_stream_node_cont_mut(union_fragment_root, |node| {
        if let Some(NodeBody::UpstreamSinkUnion(upstream_sink_union)) = &mut node.node_body {
            let init_upstreams = upstream_sink_infos
                .iter()
                .map(|info| PbUpstreamSinkInfo {
                    upstream_fragment_id: info.sink_fragment_id,
                    sink_output_schema: info.sink_output_fields.clone(),
                    project_exprs: info.project_exprs.clone(),
                })
                .collect();
            upstream_sink_union.init_upstreams = init_upstreams;
            false
        } else {
            true
        }
    });
}

#[cfg(test)]
mod tests {
    use std::num::NonZeroUsize;

    use super::*;

    #[test]
    fn test_specified_parallelism_exceeds_available() {
        let result = DdlController::resolve_stream_parallelism_inner(
            Some(NonZeroUsize::new(100).unwrap()),
            NonZeroUsize::new(256).unwrap(),
            Some(NonZeroUsize::new(4).unwrap()),
            &DefaultParallelism::Full,
            "default",
        )
        .unwrap();
        assert_eq!(result.get(), 100);
    }

    #[test]
    fn test_allows_default_parallelism_over_available() {
        let result = DdlController::resolve_stream_parallelism_inner(
            None,
            NonZeroUsize::new(256).unwrap(),
            Some(NonZeroUsize::new(4).unwrap()),
            &DefaultParallelism::Default(NonZeroUsize::new(50).unwrap()),
            "default",
        )
        .unwrap();
        assert_eq!(result.get(), 50);
    }

    #[test]
    fn test_full_parallelism_capped_by_max() {
        let result = DdlController::resolve_stream_parallelism_inner(
            None,
            NonZeroUsize::new(6).unwrap(),
            Some(NonZeroUsize::new(10).unwrap()),
            &DefaultParallelism::Full,
            "default",
        )
        .unwrap();
        assert_eq!(result.get(), 6);
    }

    #[test]
    fn test_no_available_slots_returns_error() {
        let result = DdlController::resolve_stream_parallelism_inner(
            None,
            NonZeroUsize::new(4).unwrap(),
            None,
            &DefaultParallelism::Full,
            "default",
        );
        assert!(matches!(
            result,
            Err(ref e) if matches!(e.inner(), MetaErrorInner::Unavailable(_))
        ));
    }

    #[test]
    fn test_specified_over_max_returns_error() {
        let result = DdlController::resolve_stream_parallelism_inner(
            Some(NonZeroUsize::new(8).unwrap()),
            NonZeroUsize::new(4).unwrap(),
            Some(NonZeroUsize::new(10).unwrap()),
            &DefaultParallelism::Full,
            "default",
        );
        assert!(matches!(
            result,
            Err(ref e) if matches!(e.inner(), MetaErrorInner::InvalidParameter(_))
        ));
    }
}