use std::cmp::Ordering;
use std::collections::{HashMap, HashSet};
use std::num::NonZeroUsize;
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use std::time::Duration;

use anyhow::{Context, anyhow};
use await_tree::InstrumentAwait;
use either::Either;
use itertools::Itertools;
use risingwave_common::catalog::{
    AlterDatabaseParam, ColumnCatalog, ColumnId, Field, FragmentTypeFlag,
};
use risingwave_common::config::DefaultParallelism;
use risingwave_common::hash::VnodeCountCompat;
use risingwave_common::id::{JobId, TableId};
use risingwave_common::secret::{LocalSecretManager, SecretEncryption};
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont_mut;
use risingwave_common::{bail, bail_not_implemented};
use risingwave_connector::WithOptionsSecResolved;
use risingwave_connector::connector_common::validate_connection;
use risingwave_connector::sink::SinkParam;
use risingwave_connector::sink::iceberg::IcebergSink;
use risingwave_connector::source::cdc::CdcScanOptions;
use risingwave_connector::source::{
    ConnectorProperties, SourceEnumeratorContext, UPSTREAM_SOURCE_KEY,
};
use risingwave_meta_model::object::ObjectType;
use risingwave_meta_model::{
    ConnectionId, DatabaseId, DispatcherType, FragmentId, FunctionId, IndexId, JobStatus, ObjectId,
    SchemaId, SecretId, SinkId, SourceId, StreamingParallelism, SubscriptionId, UserId, ViewId,
};
use risingwave_pb::catalog::{
    Comment, Connection, CreateType, Database, Function, PbSink, PbTable, Schema, Secret, Source,
    Subscription, Table, View,
};
use risingwave_pb::common::PbActorLocation;
use risingwave_pb::ddl_service::alter_owner_request::Object;
use risingwave_pb::ddl_service::{
    DdlProgress, TableJobType, WaitVersion, alter_name_request, alter_set_schema_request,
    alter_swap_rename_request,
};
use risingwave_pb::meta::table_fragments::PbActorStatus;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{
    PbDispatchOutputMapping, PbStreamFragmentGraph, PbStreamNode, PbUpstreamSinkInfo,
    StreamFragmentGraph as StreamFragmentGraphProto,
};
use risingwave_pb::telemetry::{PbTelemetryDatabaseObject, PbTelemetryEventStage};
use strum::Display;
use thiserror_ext::AsReport;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use tokio::time::sleep;
use tracing::Instrument;

use crate::barrier::BarrierManagerRef;
use crate::controller::catalog::{DropTableConnectorContext, ReleaseContext};
use crate::controller::cluster::StreamingClusterInfo;
use crate::controller::streaming_job::{FinishAutoRefreshSchemaSinkContext, SinkIntoTableContext};
use crate::controller::utils::build_select_node_list;
use crate::error::{MetaErrorInner, bail_invalid_parameter, bail_unavailable};
use crate::manager::{
    IGNORED_NOTIFICATION_VERSION, LocalNotification, MetaSrvEnv, MetadataManager,
    NotificationVersion, StreamingJob, StreamingJobType,
};
use crate::model::{
    DownstreamFragmentRelation, Fragment, FragmentDownstreamRelation,
    FragmentId as CatalogFragmentId, StreamContext, StreamJobFragments, StreamJobFragmentsToCreate,
    TableParallelism,
};
use crate::stream::cdc::{
    parallel_cdc_table_backfill_fragment, try_init_parallel_cdc_table_snapshot_splits,
};
use crate::stream::{
    ActorGraphBuildResult, ActorGraphBuilder, AutoRefreshSchemaSinkContext,
    CompleteStreamFragmentGraph, CreateStreamingJobContext, CreateStreamingJobOption,
    FragmentGraphDownstreamContext, FragmentGraphUpstreamContext, GlobalStreamManagerRef,
    ReplaceStreamJobContext, ReschedulePolicy, SourceChange, SourceManagerRef, StreamFragmentGraph,
    UpstreamSinkInfo, check_sink_fragments_support_refresh_schema, create_source_worker,
    rewrite_refresh_schema_sink_fragment, state_match, validate_sink,
};
use crate::telemetry::report_event;
use crate::{MetaError, MetaResult};

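/// Whether a `DROP` statement cascades to dependent objects (`Cascade`) or is rejected when
/// dependents still exist (`Restrict`).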
#[derive(PartialEq)]
pub enum DropMode {
    Restrict,
    Cascade,
}

impl DropMode {
    pub fn from_request_setting(cascade: bool) -> DropMode {
        if cascade {
            DropMode::Cascade
        } else {
            DropMode::Restrict
        }
    }
}

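/// Identifier of a streaming job targeted by a drop command, tagged with the kind of job.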
#[derive(strum::AsRefStr)]
pub enum StreamingJobId {
    MaterializedView(TableId),
    Sink(SinkId),
    Table(Option<SourceId>, TableId),
    Index(IndexId),
}

impl std::fmt::Display for StreamingJobId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.as_ref())?;
        write!(f, "({})", self.id())
    }
}

impl StreamingJobId {
    fn id(&self) -> JobId {
        match self {
            StreamingJobId::MaterializedView(id) | StreamingJobId::Table(_, id) => id.as_job_id(),
            StreamingJobId::Index(id) => id.as_job_id(),
            StreamingJobId::Sink(id) => id.as_job_id(),
        }
    }
}

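/// Arguments for replacing an existing streaming job with a newly planned fragment graph.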
pub struct ReplaceStreamJobInfo {
    pub streaming_job: StreamingJob,
    pub fragment_graph: StreamFragmentGraphProto,
}

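/// A DDL command to be executed by [`DdlController::run_command`].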
#[derive(Display)]
pub enum DdlCommand {
    CreateDatabase(Database),
    DropDatabase(DatabaseId),
    CreateSchema(Schema),
    DropSchema(SchemaId, DropMode),
    CreateNonSharedSource(Source),
    DropSource(SourceId, DropMode),
    CreateFunction(Function),
    DropFunction(FunctionId, DropMode),
    CreateView(View, HashSet<ObjectId>),
    DropView(ViewId, DropMode),
    CreateStreamingJob {
        stream_job: StreamingJob,
        fragment_graph: StreamFragmentGraphProto,
        dependencies: HashSet<ObjectId>,
        specific_resource_group: Option<String>,
        if_not_exists: bool,
    },
    DropStreamingJob {
        job_id: StreamingJobId,
        drop_mode: DropMode,
    },
    AlterName(alter_name_request::Object, String),
    AlterSwapRename(alter_swap_rename_request::Object),
    ReplaceStreamJob(ReplaceStreamJobInfo),
    AlterNonSharedSource(Source),
    AlterObjectOwner(Object, UserId),
    AlterSetSchema(alter_set_schema_request::Object, SchemaId),
    CreateConnection(Connection),
    DropConnection(ConnectionId, DropMode),
    CreateSecret(Secret),
    AlterSecret(Secret),
    DropSecret(SecretId),
    CommentOn(Comment),
    CreateSubscription(Subscription),
    DropSubscription(SubscriptionId, DropMode),
    AlterDatabaseParam(DatabaseId, AlterDatabaseParam),
    AlterStreamingJobConfig(JobId, HashMap<String, String>, Vec<String>),
}

impl DdlCommand {
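    /// Returns the object affected by this command: the object name for `CREATE`-like commands
    /// (the object has no id yet), or the object id for the others. Used to label tracing and
    /// await-tree spans in [`DdlController::run_command`].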
    fn object(&self) -> Either<String, ObjectId> {
        use Either::*;
        match self {
            DdlCommand::CreateDatabase(database) => Left(database.name.clone()),
            DdlCommand::DropDatabase(id) => Right(id.as_object_id()),
            DdlCommand::CreateSchema(schema) => Left(schema.name.clone()),
            DdlCommand::DropSchema(id, _) => Right(id.as_object_id()),
            DdlCommand::CreateNonSharedSource(source) => Left(source.name.clone()),
            DdlCommand::DropSource(id, _) => Right(id.as_object_id()),
            DdlCommand::CreateFunction(function) => Left(function.name.clone()),
            DdlCommand::DropFunction(id, _) => Right(id.as_object_id()),
            DdlCommand::CreateView(view, _) => Left(view.name.clone()),
            DdlCommand::DropView(id, _) => Right(id.as_object_id()),
            DdlCommand::CreateStreamingJob { stream_job, .. } => Left(stream_job.name()),
            DdlCommand::DropStreamingJob { job_id, .. } => Right(job_id.id().as_object_id()),
            DdlCommand::AlterName(object, _) => Left(format!("{object:?}")),
            DdlCommand::AlterSwapRename(object) => Left(format!("{object:?}")),
            DdlCommand::ReplaceStreamJob(info) => Left(info.streaming_job.name()),
            DdlCommand::AlterNonSharedSource(source) => Left(source.name.clone()),
            DdlCommand::AlterObjectOwner(object, _) => Left(format!("{object:?}")),
            DdlCommand::AlterSetSchema(object, _) => Left(format!("{object:?}")),
            DdlCommand::CreateConnection(connection) => Left(connection.name.clone()),
            DdlCommand::DropConnection(id, _) => Right(id.as_object_id()),
            DdlCommand::CreateSecret(secret) => Left(secret.name.clone()),
            DdlCommand::AlterSecret(secret) => Left(secret.name.clone()),
            DdlCommand::DropSecret(id) => Right(id.as_object_id()),
            DdlCommand::CommentOn(comment) => Right(comment.table_id.into()),
            DdlCommand::CreateSubscription(subscription) => Left(subscription.name.clone()),
            DdlCommand::DropSubscription(id, _) => Right(id.as_object_id()),
            DdlCommand::AlterDatabaseParam(id, _) => Right(id.as_object_id()),
            DdlCommand::AlterStreamingJobConfig(job_id, _, _) => Right(job_id.as_object_id()),
        }
    }

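    /// Whether the command is allowed to run while the cluster is still recovering.
    /// [`DdlController::run_command`] rejects the command otherwise.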
    fn allow_in_recovery(&self) -> bool {
        match self {
            DdlCommand::DropDatabase(_)
            | DdlCommand::DropSchema(_, _)
            | DdlCommand::DropSource(_, _)
            | DdlCommand::DropFunction(_, _)
            | DdlCommand::DropView(_, _)
            | DdlCommand::DropStreamingJob { .. }
            | DdlCommand::DropConnection(_, _)
            | DdlCommand::DropSecret(_)
            | DdlCommand::DropSubscription(_, _)
            | DdlCommand::AlterName(_, _)
            | DdlCommand::AlterObjectOwner(_, _)
            | DdlCommand::AlterSetSchema(_, _)
            | DdlCommand::CreateDatabase(_)
            | DdlCommand::CreateSchema(_)
            | DdlCommand::CreateFunction(_)
            | DdlCommand::CreateView(_, _)
            | DdlCommand::CreateConnection(_)
            | DdlCommand::CommentOn(_)
            | DdlCommand::CreateSecret(_)
            | DdlCommand::AlterSecret(_)
            | DdlCommand::AlterSwapRename(_)
            | DdlCommand::AlterDatabaseParam(_, _)
            | DdlCommand::AlterStreamingJobConfig(_, _, _) => true,
            DdlCommand::CreateStreamingJob { .. }
            | DdlCommand::CreateNonSharedSource(_)
            | DdlCommand::ReplaceStreamJob(_)
            | DdlCommand::AlterNonSharedSource(_)
            | DdlCommand::CreateSubscription(_) => false,
        }
    }
}

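/// The controller that drives DDL commands on the meta node: it maintains catalogs through the
/// metadata manager, builds and replaces streaming job graphs, and coordinates with the barrier,
/// source and stream managers.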
#[derive(Clone)]
pub struct DdlController {
    pub(crate) env: MetaSrvEnv,

    pub(crate) metadata_manager: MetadataManager,
    pub(crate) stream_manager: GlobalStreamManagerRef,
    pub(crate) source_manager: SourceManagerRef,
    barrier_manager: BarrierManagerRef,

    pub(crate) creating_streaming_job_permits: Arc<CreatingStreamingJobPermit>,

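    /// Monotonically increasing sequence number used to tag DDL commands (e.g. await-tree keys).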
    seq: Arc<AtomicU64>,
}

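/// Semaphore limiting how many streaming jobs may be created concurrently. It is sized by the
/// `max_concurrent_creating_streaming_jobs` system parameter (0 means effectively unlimited)
/// and resized whenever that parameter changes.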
#[derive(Clone)]
pub struct CreatingStreamingJobPermit {
    pub(crate) semaphore: Arc<Semaphore>,
}

impl CreatingStreamingJobPermit {
    async fn new(env: &MetaSrvEnv) -> Self {
        let mut permits = env
            .system_params_reader()
            .await
            .max_concurrent_creating_streaming_jobs() as usize;
        if permits == 0 {
            permits = Semaphore::MAX_PERMITS;
        }
        let semaphore = Arc::new(Semaphore::new(permits));

        let (local_notification_tx, mut local_notification_rx) =
            tokio::sync::mpsc::unbounded_channel();
        env.notification_manager()
            .insert_local_sender(local_notification_tx);
        let semaphore_clone = semaphore.clone();
        tokio::spawn(async move {
            while let Some(notification) = local_notification_rx.recv().await {
                let LocalNotification::SystemParamsChange(p) = &notification else {
                    continue;
                };
                let mut new_permits = p.max_concurrent_creating_streaming_jobs() as usize;
                if new_permits == 0 {
                    new_permits = Semaphore::MAX_PERMITS;
                }
                match permits.cmp(&new_permits) {
                    Ordering::Less => {
                        semaphore_clone.add_permits(new_permits - permits);
                    }
                    Ordering::Equal => continue,
                    Ordering::Greater => {
                        let to_release = permits - new_permits;
                        let reduced = semaphore_clone.forget_permits(to_release);
                        if reduced != to_release {
                            tracing::warn!(
                                "not enough permits to release, expected {}, but reduced {}",
                                to_release,
                                reduced
                            );
                        }
                    }
                }
                tracing::info!(
                    "max_concurrent_creating_streaming_jobs changed from {} to {}",
                    permits,
                    new_permits
                );
                permits = new_permits;
            }
        });

        Self { semaphore }
    }
}

impl DdlController {
    pub async fn new(
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        stream_manager: GlobalStreamManagerRef,
        source_manager: SourceManagerRef,
        barrier_manager: BarrierManagerRef,
    ) -> Self {
        let creating_streaming_job_permits = Arc::new(CreatingStreamingJobPermit::new(&env).await);
        Self {
            env,
            metadata_manager,
            stream_manager,
            source_manager,
            barrier_manager,
            creating_streaming_job_permits,
            seq: Arc::new(AtomicU64::new(0)),
        }
    }

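    /// Returns the next sequence number used to label a DDL command.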
    pub fn next_seq(&self) -> u64 {
        self.seq.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
    }

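    /// Runs a [`DdlCommand`] to completion and returns the catalog/hummock versions the frontend
    /// should wait for, or an error if the command is not allowed in the current state.
    ///
    /// A minimal usage sketch (assuming `controller` is a `DdlController` and `database` is a
    /// `Database` proto to create):
    ///
    /// ```ignore
    /// let wait_version = controller
    ///     .run_command(DdlCommand::CreateDatabase(database))
    ///     .await?;
    /// ```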
    pub async fn run_command(&self, command: DdlCommand) -> MetaResult<Option<WaitVersion>> {
        if !command.allow_in_recovery() {
            self.barrier_manager.check_status_running()?;
        }

        let await_tree_key = format!("DDL Command {}", self.next_seq());
        let await_tree_span = await_tree::span!("{command}({})", command.object());

        let ctrl = self.clone();
        let fut = async move {
            match command {
                DdlCommand::CreateDatabase(database) => ctrl.create_database(database).await,
                DdlCommand::DropDatabase(database_id) => ctrl.drop_database(database_id).await,
                DdlCommand::CreateSchema(schema) => ctrl.create_schema(schema).await,
                DdlCommand::DropSchema(schema_id, drop_mode) => {
                    ctrl.drop_schema(schema_id, drop_mode).await
                }
                DdlCommand::CreateNonSharedSource(source) => {
                    ctrl.create_non_shared_source(source).await
                }
                DdlCommand::DropSource(source_id, drop_mode) => {
                    ctrl.drop_source(source_id, drop_mode).await
                }
                DdlCommand::CreateFunction(function) => ctrl.create_function(function).await,
                DdlCommand::DropFunction(function_id, drop_mode) => {
                    ctrl.drop_function(function_id, drop_mode).await
                }
                DdlCommand::CreateView(view, dependencies) => {
                    ctrl.create_view(view, dependencies).await
                }
                DdlCommand::DropView(view_id, drop_mode) => {
                    ctrl.drop_view(view_id, drop_mode).await
                }
                DdlCommand::CreateStreamingJob {
                    stream_job,
                    fragment_graph,
                    dependencies,
                    specific_resource_group,
                    if_not_exists,
                } => {
                    ctrl.create_streaming_job(
                        stream_job,
                        fragment_graph,
                        dependencies,
                        specific_resource_group,
                        if_not_exists,
                    )
                    .await
                }
                DdlCommand::DropStreamingJob { job_id, drop_mode } => {
                    ctrl.drop_streaming_job(job_id, drop_mode).await
                }
                DdlCommand::ReplaceStreamJob(ReplaceStreamJobInfo {
                    streaming_job,
                    fragment_graph,
                }) => ctrl.replace_job(streaming_job, fragment_graph).await,
                DdlCommand::AlterName(relation, name) => ctrl.alter_name(relation, &name).await,
                DdlCommand::AlterObjectOwner(object, owner_id) => {
                    ctrl.alter_owner(object, owner_id).await
                }
                DdlCommand::AlterSetSchema(object, new_schema_id) => {
                    ctrl.alter_set_schema(object, new_schema_id).await
                }
                DdlCommand::CreateConnection(connection) => {
                    ctrl.create_connection(connection).await
                }
                DdlCommand::DropConnection(connection_id, drop_mode) => {
                    ctrl.drop_connection(connection_id, drop_mode).await
                }
                DdlCommand::CreateSecret(secret) => ctrl.create_secret(secret).await,
                DdlCommand::DropSecret(secret_id) => ctrl.drop_secret(secret_id).await,
                DdlCommand::AlterSecret(secret) => ctrl.alter_secret(secret).await,
                DdlCommand::AlterNonSharedSource(source) => {
                    ctrl.alter_non_shared_source(source).await
                }
                DdlCommand::CommentOn(comment) => ctrl.comment_on(comment).await,
                DdlCommand::CreateSubscription(subscription) => {
                    ctrl.create_subscription(subscription).await
                }
                DdlCommand::DropSubscription(subscription_id, drop_mode) => {
                    ctrl.drop_subscription(subscription_id, drop_mode).await
                }
                DdlCommand::AlterSwapRename(objects) => ctrl.alter_swap_rename(objects).await,
                DdlCommand::AlterDatabaseParam(database_id, param) => {
                    ctrl.alter_database_param(database_id, param).await
                }
                DdlCommand::AlterStreamingJobConfig(job_id, entries_to_add, keys_to_remove) => {
                    ctrl.alter_streaming_job_config(job_id, entries_to_add, keys_to_remove)
                        .await
                }
            }
        }
        .in_current_span();
        let fut = (self.env.await_tree_reg())
            .register(await_tree_key, await_tree_span)
            .instrument(Box::pin(fut));
        let notification_version = tokio::spawn(fut).await.map_err(|e| anyhow!(e))??;
        Ok(Some(WaitVersion {
            catalog_version: notification_version,
            hummock_version_id: self.barrier_manager.get_hummock_version_id().await.to_u64(),
        }))
    }

    pub async fn get_ddl_progress(&self) -> MetaResult<Vec<DdlProgress>> {
        self.barrier_manager.get_ddl_progress().await
    }

    async fn create_database(&self, database: Database) -> MetaResult<NotificationVersion> {
        let (version, updated_db) = self
            .metadata_manager
            .catalog_controller
            .create_database(database)
            .await?;
        self.barrier_manager
            .update_database_barrier(
                updated_db.database_id,
                updated_db.barrier_interval_ms.map(|v| v as u32),
                updated_db.checkpoint_frequency.map(|v| v as u64),
            )
            .await?;
        Ok(version)
    }

    #[tracing::instrument(skip(self), level = "debug")]
    pub async fn reschedule_streaming_job(
        &self,
        job_id: JobId,
        target: ReschedulePolicy,
        mut deferred: bool,
    ) -> MetaResult<()> {
        tracing::info!("altering parallelism for job {}", job_id);
        if self.barrier_manager.check_status_running().is_err() {
            tracing::info!(
                "alter parallelism is set to deferred mode because the system is in recovery state"
            );
            deferred = true;
        }

        self.stream_manager
            .reschedule_streaming_job(job_id, target, deferred)
            .await
    }

    pub async fn reschedule_cdc_table_backfill(
        &self,
        job_id: JobId,
        target: ReschedulePolicy,
    ) -> MetaResult<()> {
        tracing::info!("alter CDC table backfill parallelism");
        if self.barrier_manager.check_status_running().is_err() {
            return Err(anyhow::anyhow!(
                "CDC table backfill reschedule is unavailable because the system is in recovery state"
            )
            .into());
        }
        self.stream_manager
            .reschedule_cdc_table_backfill(job_id, target)
            .await
    }

    pub async fn reschedule_fragments(
        &self,
        fragment_targets: HashMap<FragmentId, Option<StreamingParallelism>>,
    ) -> MetaResult<()> {
        tracing::info!(
            "altering parallelism for fragments {:?}",
            fragment_targets.keys()
        );
        let fragment_targets = fragment_targets
            .into_iter()
            .map(|(fragment_id, parallelism)| (fragment_id as CatalogFragmentId, parallelism))
            .collect();

        self.stream_manager
            .reschedule_fragments(fragment_targets)
            .await
    }

    async fn drop_database(&self, database_id: DatabaseId) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Database, database_id, DropMode::Cascade)
            .await
    }

    async fn create_schema(&self, schema: Schema) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .create_schema(schema)
            .await
    }

    async fn drop_schema(
        &self,
        schema_id: SchemaId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Schema, schema_id, drop_mode)
            .await
    }

    async fn create_non_shared_source(&self, source: Source) -> MetaResult<NotificationVersion> {
        let handle = create_source_worker(&source, self.source_manager.metrics.clone())
            .await
            .context("failed to create source worker")?;

        let (source_id, version) = self
            .metadata_manager
            .catalog_controller
            .create_source(source)
            .await?;
        self.source_manager
            .register_source_with_handle(source_id, handle)
            .await;
        Ok(version)
    }

    async fn drop_source(
        &self,
        source_id: SourceId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Source, source_id, drop_mode)
            .await
    }

    async fn alter_non_shared_source(&self, source: Source) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .alter_non_shared_source(source)
            .await
    }

    async fn create_function(&self, function: Function) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .create_function(function)
            .await
    }

    async fn drop_function(
        &self,
        function_id: FunctionId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Function, function_id, drop_mode)
            .await
    }

    async fn create_view(
        &self,
        view: View,
        dependencies: HashSet<ObjectId>,
    ) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .create_view(view, dependencies)
            .await
    }

    async fn drop_view(
        &self,
        view_id: ViewId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::View, view_id, drop_mode).await
    }

    async fn create_connection(&self, connection: Connection) -> MetaResult<NotificationVersion> {
        validate_connection(&connection).await?;
        self.metadata_manager
            .catalog_controller
            .create_connection(connection)
            .await
    }

    async fn drop_connection(
        &self,
        connection_id: ConnectionId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Connection, connection_id, drop_mode)
            .await
    }

    async fn alter_database_param(
        &self,
        database_id: DatabaseId,
        param: AlterDatabaseParam,
    ) -> MetaResult<NotificationVersion> {
        let (version, updated_db) = self
            .metadata_manager
            .catalog_controller
            .alter_database_param(database_id, param)
            .await?;
        self.barrier_manager
            .update_database_barrier(
                database_id,
                updated_db.barrier_interval_ms.map(|v| v as u32),
                updated_db.checkpoint_frequency.map(|v| v as u64),
            )
            .await?;
        Ok(version)
    }

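    /// Encrypts the secret value with the configured `secret_store_private_key` and returns the
    /// serialized ciphertext; fails if the key is not configured.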
    fn get_encrypted_payload(&self, secret: &Secret) -> MetaResult<Vec<u8>> {
        let secret_store_private_key = self
            .env
            .opts
            .secret_store_private_key
            .clone()
            .ok_or_else(|| anyhow!("secret_store_private_key is not configured"))?;

        let encrypted_payload = SecretEncryption::encrypt(
            secret_store_private_key.as_slice(),
            secret.get_value().as_slice(),
        )
        .context(format!("failed to encrypt secret {}", secret.name))?;
        Ok(encrypted_payload
            .serialize()
            .context(format!("failed to serialize secret {}", secret.name))?)
    }

    async fn create_secret(&self, mut secret: Secret) -> MetaResult<NotificationVersion> {
        let secret_plain_payload = secret.value.clone();
        let encrypted_payload = self.get_encrypted_payload(&secret)?;
        secret.value = encrypted_payload;

        self.metadata_manager
            .catalog_controller
            .create_secret(secret, secret_plain_payload)
            .await
    }

    async fn drop_secret(&self, secret_id: SecretId) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Secret, secret_id, DropMode::Restrict)
            .await
    }

    async fn alter_secret(&self, mut secret: Secret) -> MetaResult<NotificationVersion> {
        let secret_plain_payload = secret.value.clone();
        let encrypted_payload = self.get_encrypted_payload(&secret)?;
        secret.value = encrypted_payload;
        self.metadata_manager
            .catalog_controller
            .alter_secret(secret, secret_plain_payload)
            .await
    }

    async fn create_subscription(
        &self,
        mut subscription: Subscription,
    ) -> MetaResult<NotificationVersion> {
        tracing::debug!("create subscription");
        let _permit = self
            .creating_streaming_job_permits
            .semaphore
            .acquire()
            .await
            .unwrap();
        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
        self.metadata_manager
            .catalog_controller
            .create_subscription_catalog(&mut subscription)
            .await?;
        if let Err(err) = self.stream_manager.create_subscription(&subscription).await {
            tracing::debug!(error = %err.as_report(), "failed to create subscription");
            let _ = self
                .metadata_manager
                .catalog_controller
                .try_abort_creating_subscription(subscription.id)
                .await
                .inspect_err(|e| {
                    tracing::error!(
                        error = %e.as_report(),
                        "failed to abort create subscription after failure"
                    );
                });
            return Err(err);
        }

        let version = self
            .metadata_manager
            .catalog_controller
            .notify_create_subscription(subscription.id)
            .await?;
        tracing::debug!("finish create subscription");
        Ok(version)
    }

    async fn drop_subscription(
        &self,
        subscription_id: SubscriptionId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        tracing::debug!("preparing drop subscription");
        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
        let subscription = self
            .metadata_manager
            .catalog_controller
            .get_subscription_by_id(subscription_id)
            .await?;
        let table_id = subscription.dependent_table_id;
        let database_id = subscription.database_id;
        let (_, version) = self
            .metadata_manager
            .catalog_controller
            .drop_object(ObjectType::Subscription, subscription_id, drop_mode)
            .await?;
        self.stream_manager
            .drop_subscription(database_id, subscription_id, table_id)
            .await;
        tracing::debug!("finish drop subscription");
        Ok(version)
    }
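
    /// Validates that a shared-CDC-source table has exactly one stream scan fragment and that a
    /// split enumerator can be created from its CDC table description.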
    #[await_tree::instrument]
    pub(crate) async fn validate_cdc_table(
        &self,
        table: &Table,
        table_fragments: &StreamJobFragments,
    ) -> MetaResult<()> {
        let stream_scan_fragment = table_fragments
            .fragments
            .values()
            .filter(|f| {
                f.fragment_type_mask.contains(FragmentTypeFlag::StreamScan)
                    || f.fragment_type_mask
                        .contains(FragmentTypeFlag::StreamCdcScan)
            })
            .exactly_one()
            .ok()
            .with_context(|| {
                format!(
                    "expect exactly one stream scan fragment, got: {:?}",
                    table_fragments.fragments
                )
            })?;
        fn assert_parallelism(stream_scan_fragment: &Fragment, node_body: &Option<NodeBody>) {
            if let Some(NodeBody::StreamCdcScan(node)) = node_body {
                if let Some(o) = node.options
                    && CdcScanOptions::from_proto(&o).is_parallelized_backfill()
                {
                    // Parallelized CDC backfill is allowed to run with multiple actors.
                } else {
                    assert_eq!(
                        stream_scan_fragment.actors.len(),
                        1,
                        "Stream scan fragment should have only one actor"
                    );
                }
            }
        }
        let mut found_cdc_scan = false;
        match &stream_scan_fragment.nodes.node_body {
            Some(NodeBody::StreamCdcScan(_)) => {
                assert_parallelism(stream_scan_fragment, &stream_scan_fragment.nodes.node_body);
                if self
                    .validate_cdc_table_inner(&stream_scan_fragment.nodes.node_body, table.id)
                    .await?
                {
                    found_cdc_scan = true;
                }
            }
            Some(NodeBody::Project(_)) => {
                for input in &stream_scan_fragment.nodes.input {
                    assert_parallelism(stream_scan_fragment, &input.node_body);
                    if self
                        .validate_cdc_table_inner(&input.node_body, table.id)
                        .await?
                    {
                        found_cdc_scan = true;
                    }
                }
            }
            _ => {
                bail!("Unexpected node body for stream cdc scan");
            }
        };
        if !found_cdc_scan {
            bail!("No stream cdc scan node found in stream scan fragment");
        }
        Ok(())
    }

    async fn validate_cdc_table_inner(
        &self,
        node_body: &Option<NodeBody>,
        table_id: TableId,
    ) -> MetaResult<bool> {
        if let Some(NodeBody::StreamCdcScan(stream_cdc_scan)) = node_body
            && let Some(ref cdc_table_desc) = stream_cdc_scan.cdc_table_desc
        {
            let options_with_secret = WithOptionsSecResolved::new(
                cdc_table_desc.connect_properties.clone(),
                cdc_table_desc.secret_refs.clone(),
            );

            let mut props = ConnectorProperties::extract(options_with_secret, true)?;
            props.init_from_pb_cdc_table_desc(cdc_table_desc);

            let _enumerator = props
                .create_split_enumerator(SourceEnumeratorContext::dummy().into())
                .await?;

            tracing::debug!(?table_id, "validate cdc table success");
            Ok(true)
        } else {
            Ok(false)
        }
    }

    pub async fn validate_table_for_sink(&self, table_id: TableId) -> MetaResult<()> {
        let migrated = self
            .metadata_manager
            .catalog_controller
            .has_table_been_migrated(table_id)
            .await?;
        if !migrated {
            Err(anyhow::anyhow!(
                "Creating sink into table is not allowed for unmigrated table {}. Please migrate it first.",
                table_id
            )
            .into())
        } else {
            Ok(())
        }
    }

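    /// Creates a streaming job (materialized view, table, sink, index or shared source): registers
    /// the job catalog, acquires a creation permit, builds the fragment graph and hands it to the
    /// stream manager, aborting the catalog entry on failure.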
    #[await_tree::instrument(boxed, "create_streaming_job({streaming_job})")]
    pub async fn create_streaming_job(
        &self,
        mut streaming_job: StreamingJob,
        fragment_graph: StreamFragmentGraphProto,
        dependencies: HashSet<ObjectId>,
        specific_resource_group: Option<String>,
        if_not_exists: bool,
    ) -> MetaResult<NotificationVersion> {
        if let StreamingJob::Sink(sink) = &streaming_job
            && let Some(target_table) = sink.target_table
        {
            self.validate_table_for_sink(target_table).await?;
        }
        let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());
        let check_ret = self
            .metadata_manager
            .catalog_controller
            .create_job_catalog(
                &mut streaming_job,
                &ctx,
                &fragment_graph.parallelism,
                fragment_graph.max_parallelism as _,
                dependencies,
                specific_resource_group.clone(),
            )
            .await;
        if let Err(meta_err) = check_ret {
            if !if_not_exists {
                return Err(meta_err);
            }
            return if let MetaErrorInner::Duplicated(_, _, Some(job_id)) = meta_err.inner() {
                if streaming_job.create_type() == CreateType::Foreground {
                    let database_id = streaming_job.database_id();
                    self.metadata_manager
                        .wait_streaming_job_finished(database_id, *job_id)
                        .await
                } else {
                    Ok(IGNORED_NOTIFICATION_VERSION)
                }
            } else {
                Err(meta_err)
            };
        }
        let job_id = streaming_job.id();
        tracing::debug!(
            id = %job_id,
            definition = streaming_job.definition(),
            create_type = streaming_job.create_type().as_str_name(),
            job_type = ?streaming_job.job_type(),
            "starting streaming job",
        );
        let permit = self
            .creating_streaming_job_permits
            .semaphore
            .clone()
            .acquire_owned()
            .instrument_await("acquire_creating_streaming_job_permit")
            .await
            .unwrap();
        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;

        let name = streaming_job.name();
        let definition = streaming_job.definition();
        let source_id = match &streaming_job {
            StreamingJob::Table(Some(src), _, _) | StreamingJob::Source(src) => Some(src.id),
            _ => None,
        };

        match self
            .create_streaming_job_inner(
                ctx,
                streaming_job,
                fragment_graph,
                specific_resource_group,
                permit,
            )
            .await
        {
            Ok(version) => Ok(version),
            Err(err) => {
                tracing::error!(id = %job_id, error = %err.as_report(), "failed to create streaming job");
                let event = risingwave_pb::meta::event_log::EventCreateStreamJobFail {
                    id: job_id,
                    name,
                    definition,
                    error: err.as_report().to_string(),
                };
                self.env.event_log_manager_ref().add_event_logs(vec![
                    risingwave_pb::meta::event_log::Event::CreateStreamJobFail(event),
                ]);
                let (aborted, _) = self
                    .metadata_manager
                    .catalog_controller
                    .try_abort_creating_streaming_job(job_id, false)
                    .await?;
                if aborted {
                    tracing::warn!(id = %job_id, "aborted streaming job");
                    if let Some(source_id) = source_id {
                        self.source_manager
                            .apply_source_change(SourceChange::DropSource {
                                dropped_source_ids: vec![source_id],
                            })
                            .await;
                    }
                }
                Err(err)
            }
        }
    }

    #[await_tree::instrument(boxed)]
    async fn create_streaming_job_inner(
        &self,
        ctx: StreamContext,
        mut streaming_job: StreamingJob,
        fragment_graph: StreamFragmentGraphProto,
        specific_resource_group: Option<String>,
        permit: OwnedSemaphorePermit,
    ) -> MetaResult<NotificationVersion> {
        let mut fragment_graph =
            StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
        streaming_job.set_info_from_graph(&fragment_graph);

        let incomplete_internal_tables = fragment_graph
            .incomplete_internal_tables()
            .into_values()
            .collect_vec();
        let table_id_map = self
            .metadata_manager
            .catalog_controller
            .create_internal_table_catalog(&streaming_job, incomplete_internal_tables)
            .await?;
        fragment_graph.refill_internal_table_ids(table_id_map);

        tracing::debug!(id = %streaming_job.id(), "building streaming job");
        let (ctx, stream_job_fragments) = self
            .build_stream_job(ctx, streaming_job, fragment_graph, specific_resource_group)
            .await?;

        let streaming_job = &ctx.streaming_job;

        match streaming_job {
            StreamingJob::Table(None, table, TableJobType::SharedCdcSource) => {
                self.validate_cdc_table(table, &stream_job_fragments)
                    .await?;
            }
            StreamingJob::Table(Some(source), ..) => {
                self.source_manager.register_source(source).await?;
                let connector_name = source
                    .get_with_properties()
                    .get(UPSTREAM_SOURCE_KEY)
                    .cloned();
                let attr = source.info.as_ref().map(|source_info| {
                    jsonbb::json!({
                        "format": source_info.format().as_str_name(),
                        "encode": source_info.row_encode().as_str_name(),
                    })
                });
                report_create_object(
                    streaming_job.id(),
                    "source",
                    PbTelemetryDatabaseObject::Source,
                    connector_name,
                    attr,
                );
            }
            StreamingJob::Sink(sink) => {
                if sink.auto_refresh_schema_from_table.is_some() {
                    check_sink_fragments_support_refresh_schema(&stream_job_fragments.fragments)?
                }
                validate_sink(sink).await?;
                let connector_name = sink.get_properties().get(UPSTREAM_SOURCE_KEY).cloned();
                let attr = sink.format_desc.as_ref().map(|sink_info| {
                    jsonbb::json!({
                        "format": sink_info.format().as_str_name(),
                        "encode": sink_info.encode().as_str_name(),
                    })
                });
                report_create_object(
                    streaming_job.id(),
                    "sink",
                    PbTelemetryDatabaseObject::Sink,
                    connector_name,
                    attr,
                );
            }
            StreamingJob::Source(source) => {
                self.source_manager.register_source(source).await?;
                let connector_name = source
                    .get_with_properties()
                    .get(UPSTREAM_SOURCE_KEY)
                    .cloned();
                let attr = source.info.as_ref().map(|source_info| {
                    jsonbb::json!({
                        "format": source_info.format().as_str_name(),
                        "encode": source_info.row_encode().as_str_name(),
                    })
                });
                report_create_object(
                    streaming_job.id(),
                    "source",
                    PbTelemetryDatabaseObject::Source,
                    connector_name,
                    attr,
                );
            }
            _ => {}
        }

        self.metadata_manager
            .catalog_controller
            .prepare_stream_job_fragments(&stream_job_fragments, streaming_job, false)
            .await?;

        let version = self
            .stream_manager
            .create_streaming_job(stream_job_fragments, ctx, permit)
            .await?;

        Ok(version)
    }

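    /// Drops a catalog object and releases everything that goes with it: streaming jobs, state
    /// tables, sources and their fragments, Iceberg tables of auto-created sinks, and secrets.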
    pub async fn drop_object(
        &self,
        object_type: ObjectType,
        object_id: impl Into<ObjectId>,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        let object_id = object_id.into();
        let (release_ctx, version) = self
            .metadata_manager
            .catalog_controller
            .drop_object(object_type, object_id, drop_mode)
            .await?;

        if object_type == ObjectType::Source {
            self.env
                .notification_manager_ref()
                .notify_local_subscribers(LocalNotification::SourceDropped(object_id));
        }

        let ReleaseContext {
            database_id,
            removed_streaming_job_ids,
            removed_state_table_ids,
            removed_source_ids,
            removed_secret_ids: secret_ids,
            removed_source_fragments,
            removed_actors,
            removed_fragments,
            removed_sink_fragment_by_targets,
            removed_iceberg_table_sinks,
        } = release_ctx;

        let _guard = self.source_manager.pause_tick().await;
        self.stream_manager
            .drop_streaming_jobs(
                database_id,
                removed_actors.iter().map(|id| *id as _).collect(),
                removed_streaming_job_ids,
                removed_state_table_ids,
                removed_fragments.iter().map(|id| *id as _).collect(),
                removed_sink_fragment_by_targets
                    .into_iter()
                    .map(|(target, sinks)| {
                        (target as _, sinks.into_iter().map(|id| id as _).collect())
                    })
                    .collect(),
            )
            .await;

        self.source_manager
            .apply_source_change(SourceChange::DropSource {
                dropped_source_ids: removed_source_ids.into_iter().map(|id| id as _).collect(),
            })
            .await;

        let dropped_source_fragments = removed_source_fragments;
        self.source_manager
            .apply_source_change(SourceChange::DropMv {
                dropped_source_fragments,
            })
            .await;

        for sink in removed_iceberg_table_sinks {
            let sink_param = SinkParam::try_from_sink_catalog(sink.into())
                .expect("Iceberg sink should be valid");
            let iceberg_sink =
                IcebergSink::try_from(sink_param).expect("Iceberg sink should be valid");
            if let Ok(iceberg_catalog) = iceberg_sink.config.create_catalog().await {
                let table_identifier = iceberg_sink.config.full_table_name().unwrap();
                tracing::info!(
                    "dropping iceberg table {} for dropped sink",
                    table_identifier
                );

                let _ = iceberg_catalog
                    .drop_table(&table_identifier)
                    .await
                    .inspect_err(|err| {
                        tracing::error!(
                            "failed to drop iceberg table {} during cleanup: {}",
                            table_identifier,
                            err.as_report()
                        );
                    });
            }
        }

        for secret in secret_ids {
            LocalSecretManager::global().remove_secret(secret);
        }
        Ok(version)
    }

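    /// Replaces an existing streaming job (e.g. for schema changes) by building a new fragment
    /// graph under a temporary job id and finishing or aborting the replacement depending on the
    /// result.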
    #[await_tree::instrument(boxed, "replace_streaming_job({streaming_job})")]
    pub async fn replace_job(
        &self,
        mut streaming_job: StreamingJob,
        fragment_graph: StreamFragmentGraphProto,
    ) -> MetaResult<NotificationVersion> {
        match &streaming_job {
            StreamingJob::Table(..)
            | StreamingJob::Source(..)
            | StreamingJob::MaterializedView(..) => {}
            StreamingJob::Sink(..) | StreamingJob::Index(..) => {
                bail_not_implemented!("schema change for {}", streaming_job.job_type_str())
            }
        }

        let job_id = streaming_job.id();

        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
        let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());

        // Keep the max parallelism unchanged when replacing the job.
        let original_max_parallelism = self
            .metadata_manager
            .get_job_max_parallelism(streaming_job.id())
            .await?;
        let fragment_graph = PbStreamFragmentGraph {
            max_parallelism: original_max_parallelism as _,
            ..fragment_graph
        };

        let fragment_graph = StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
        streaming_job.set_info_from_graph(&fragment_graph);

        // Shadow as immutable from here on.
        let streaming_job = streaming_job;

        let auto_refresh_schema_sinks = if let StreamingJob::Table(_, table, _) = &streaming_job {
            let auto_refresh_schema_sinks = self
                .metadata_manager
                .catalog_controller
                .get_sink_auto_refresh_schema_from(table.id)
                .await?;
            if !auto_refresh_schema_sinks.is_empty() {
                let original_table_columns = self
                    .metadata_manager
                    .catalog_controller
                    .get_table_columns(table.id)
                    .await?;
                let mut original_table_column_ids: HashSet<_> = original_table_columns
                    .iter()
                    .map(|col| col.column_id())
                    .collect();
                let newly_added_columns = table
                    .columns
                    .iter()
                    .filter(|col| {
                        !original_table_column_ids.remove(&ColumnId::new(
                            col.column_desc.as_ref().unwrap().column_id as _,
                        ))
                    })
                    .map(|col| ColumnCatalog::from(col.clone()))
                    .collect_vec();
                if !original_table_column_ids.is_empty() {
                    return Err(anyhow!(
                        "new table columns do not contain all original columns. new: {:?}, original: {:?}, not included: {:?}",
                        table.columns,
                        original_table_columns,
                        original_table_column_ids
                    )
                    .into());
                }
                let mut sinks = Vec::with_capacity(auto_refresh_schema_sinks.len());
                for sink in auto_refresh_schema_sinks {
                    let sink_job_fragments = self
                        .metadata_manager
                        .get_job_fragments_by_id(sink.id.as_job_id())
                        .await?;
                    if sink_job_fragments.fragments.len() != 1 {
                        return Err(anyhow!(
                            "auto schema refresh sink must have only one fragment, but got {}",
                            sink_job_fragments.fragments.len()
                        )
                        .into());
                    }
                    let original_sink_fragment =
                        sink_job_fragments.fragments.into_values().next().unwrap();
                    let (new_sink_fragment, new_schema, new_log_store_table) =
                        rewrite_refresh_schema_sink_fragment(
                            &original_sink_fragment,
                            &sink,
                            &newly_added_columns,
                            table,
                            fragment_graph.table_fragment_id(),
                            self.env.id_gen_manager(),
                            self.env.actor_id_generator(),
                        )?;

                    assert_eq!(
                        original_sink_fragment.actors.len(),
                        new_sink_fragment.actors.len()
                    );
                    let actor_status = (0..original_sink_fragment.actors.len())
                        .map(|i| {
                            let worker_node_id = sink_job_fragments.actor_status
                                [&original_sink_fragment.actors[i].actor_id]
                                .location
                                .as_ref()
                                .unwrap()
                                .worker_node_id;
                            (
                                new_sink_fragment.actors[i].actor_id,
                                PbActorStatus {
                                    location: Some(PbActorLocation { worker_node_id }),
                                },
                            )
                        })
                        .collect();

                    let streaming_job = StreamingJob::Sink(sink);

                    let tmp_sink_id = self
                        .metadata_manager
                        .catalog_controller
                        .create_job_catalog_for_replace(&streaming_job, None, None, None)
                        .await?
                        .as_sink_id();
                    let StreamingJob::Sink(sink) = streaming_job else {
                        unreachable!()
                    };

                    sinks.push(AutoRefreshSchemaSinkContext {
                        tmp_sink_id,
                        original_sink: sink,
                        original_fragment: original_sink_fragment,
                        new_schema,
                        newly_add_fields: newly_added_columns
                            .iter()
                            .map(|col| Field::from(&col.column_desc))
                            .collect(),
                        new_fragment: new_sink_fragment,
                        new_log_store_table,
                        actor_status,
                    });
                }
                Some(sinks)
            } else {
                None
            }
        } else {
            None
        };

        let tmp_id = self
            .metadata_manager
            .catalog_controller
            .create_job_catalog_for_replace(
                &streaming_job,
                Some(&ctx),
                fragment_graph.specified_parallelism().as_ref(),
                Some(fragment_graph.max_parallelism()),
            )
            .await?;

        let tmp_sink_ids = auto_refresh_schema_sinks.as_ref().map(|sinks| {
            sinks
                .iter()
                .map(|sink| sink.tmp_sink_id.as_object_id())
                .collect_vec()
        });

        tracing::debug!(id = %job_id, "building replace streaming job");
        let mut updated_sink_catalogs = vec![];

        let mut drop_table_connector_ctx = None;
        let result: MetaResult<_> = try {
            let (mut ctx, mut stream_job_fragments) = self
                .build_replace_job(
                    ctx,
                    &streaming_job,
                    fragment_graph,
                    tmp_id,
                    auto_refresh_schema_sinks,
                )
                .await?;
            drop_table_connector_ctx = ctx.drop_table_connector_ctx.clone();
            let auto_refresh_schema_sink_finish_ctx =
                ctx.auto_refresh_schema_sinks.as_ref().map(|sinks| {
                    sinks
                        .iter()
                        .map(|sink| FinishAutoRefreshSchemaSinkContext {
                            tmp_sink_id: sink.tmp_sink_id,
                            original_sink_id: sink.original_sink.id,
                            columns: sink.new_schema.clone(),
                            new_log_store_table: sink
                                .new_log_store_table
                                .as_ref()
                                .map(|table| (table.id, table.columns.clone())),
                        })
                        .collect()
                });

            if let StreamingJob::Table(_, table, ..) = &streaming_job {
                let union_fragment = stream_job_fragments.inner.union_fragment_for_table();
                let upstream_infos = self
                    .metadata_manager
                    .catalog_controller
                    .get_all_upstream_sink_infos(table, union_fragment.fragment_id as _)
                    .await?;
                refill_upstream_sink_union_in_table(&mut union_fragment.nodes, &upstream_infos);

                for upstream_info in &upstream_infos {
                    let upstream_fragment_id = upstream_info.sink_fragment_id;
                    ctx.upstream_fragment_downstreams
                        .entry(upstream_fragment_id)
                        .or_default()
                        .push(upstream_info.new_sink_downstream.clone());
                    if upstream_info.sink_original_target_columns.is_empty() {
                        updated_sink_catalogs.push(upstream_info.sink_id);
                    }
                }
            }

            let replace_upstream = ctx.replace_upstream.clone();

            if let Some(sinks) = &ctx.auto_refresh_schema_sinks {
                let empty_downstreams = FragmentDownstreamRelation::default();
                for sink in sinks {
                    self.metadata_manager
                        .catalog_controller
                        .prepare_streaming_job(
                            sink.tmp_sink_id.as_job_id(),
                            || [&sink.new_fragment].into_iter(),
                            &empty_downstreams,
                            true,
                            None,
                        )
                        .await?;
                }
            }

            self.metadata_manager
                .catalog_controller
                .prepare_stream_job_fragments(&stream_job_fragments, &streaming_job, true)
                .await?;

            self.stream_manager
                .replace_stream_job(stream_job_fragments, ctx)
                .await?;
            (replace_upstream, auto_refresh_schema_sink_finish_ctx)
        };

        match result {
            Ok((replace_upstream, auto_refresh_schema_sink_finish_ctx)) => {
                let version = self
                    .metadata_manager
                    .catalog_controller
                    .finish_replace_streaming_job(
                        tmp_id,
                        streaming_job,
                        replace_upstream,
                        SinkIntoTableContext {
                            updated_sink_catalogs,
                        },
                        drop_table_connector_ctx.as_ref(),
                        auto_refresh_schema_sink_finish_ctx,
                    )
                    .await?;
                if let Some(drop_table_connector_ctx) = &drop_table_connector_ctx {
                    self.source_manager
                        .apply_source_change(SourceChange::DropSource {
                            dropped_source_ids: vec![drop_table_connector_ctx.to_remove_source_id],
                        })
                        .await;
                }
                Ok(version)
            }
            Err(err) => {
                tracing::error!(id = %job_id, error = ?err.as_report(), "failed to replace job");
                let _ = self.metadata_manager
                    .catalog_controller
                    .try_abort_replacing_streaming_job(tmp_id, tmp_sink_ids)
                    .await.inspect_err(|err| {
                        tracing::error!(id = %job_id, error = ?err.as_report(), "failed to abort replacing job");
                    });
                Err(err)
            }
        }
    }

    #[await_tree::instrument(boxed, "drop_streaming_job{}({job_id})", if let DropMode::Cascade = drop_mode { "_cascade" } else { "" }
    )]
    async fn drop_streaming_job(
        &self,
        job_id: StreamingJobId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;

        let (object_id, object_type) = match job_id {
            StreamingJobId::MaterializedView(id) => (id.as_object_id(), ObjectType::Table),
            StreamingJobId::Sink(id) => (id.as_object_id(), ObjectType::Sink),
            StreamingJobId::Table(_, id) => (id.as_object_id(), ObjectType::Table),
            StreamingJobId::Index(idx) => (idx.as_object_id(), ObjectType::Index),
        };

        let job_status = self
            .metadata_manager
            .catalog_controller
            .get_streaming_job_status(job_id.id())
            .await?;
        let version = match job_status {
            JobStatus::Initial => {
                unreachable!(
                    "Job with Initial status should not notify frontend and therefore should not arrive here"
                );
            }
            JobStatus::Creating => {
                self.stream_manager
                    .cancel_streaming_jobs(vec![job_id.id()])
                    .await?;
                IGNORED_NOTIFICATION_VERSION
            }
            JobStatus::Created => self.drop_object(object_type, object_id, drop_mode).await?,
        };

        Ok(version)
    }

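    /// Resolves the parallelism to schedule a new streaming job with, based on the user-specified
    /// value, the job's max parallelism, the slots available in the resource group, and the
    /// configured default parallelism.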
    fn resolve_stream_parallelism(
        &self,
        specified: Option<NonZeroUsize>,
        max: NonZeroUsize,
        cluster_info: &StreamingClusterInfo,
        resource_group: String,
    ) -> MetaResult<NonZeroUsize> {
        let available = NonZeroUsize::new(cluster_info.parallelism(&resource_group));
        DdlController::resolve_stream_parallelism_inner(
            specified,
            max,
            available,
            &self.env.opts.default_parallelism,
            &resource_group,
        )
    }

    fn resolve_stream_parallelism_inner(
        specified: Option<NonZeroUsize>,
        max: NonZeroUsize,
        available: Option<NonZeroUsize>,
        default_parallelism: &DefaultParallelism,
        resource_group: &str,
    ) -> MetaResult<NonZeroUsize> {
        let Some(available) = available else {
            bail_unavailable!(
                "no available slots to schedule in resource group \"{}\", \
                 have you allocated any compute nodes within this resource group?",
                resource_group
            );
        };

        if let Some(specified) = specified {
            if specified > max {
                bail_invalid_parameter!(
                    "specified parallelism {} should not exceed max parallelism {}",
                    specified,
                    max,
                );
            }
            if specified > available {
                tracing::warn!(
                    resource_group,
                    specified_parallelism = specified.get(),
                    available_parallelism = available.get(),
                    "specified parallelism exceeds available slots, scheduling with specified value",
                );
            }
            return Ok(specified);
        }

        let default_parallelism = match default_parallelism {
            DefaultParallelism::Full => available,
            DefaultParallelism::Default(num) => {
                if *num > available {
                    tracing::warn!(
                        resource_group,
                        configured_parallelism = num.get(),
                        available_parallelism = available.get(),
                        "default parallelism exceeds available slots, scheduling with configured value",
                    );
                }
                *num
            }
        };

        if default_parallelism > max {
            tracing::warn!(
                max_parallelism = max.get(),
                resource_group,
                "default parallelism exceeds max parallelism, capping to max",
            );
        }
        Ok(default_parallelism.min(max))
    }

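    /// Builds a streaming job from its fragment graph: resolves upstream fragments and
    /// parallelism, generates the actor graph, and returns the creation context together with the
    /// fragments to create.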
    #[await_tree::instrument]
    pub(crate) async fn build_stream_job(
        &self,
        stream_ctx: StreamContext,
        mut stream_job: StreamingJob,
        fragment_graph: StreamFragmentGraph,
        specific_resource_group: Option<String>,
    ) -> MetaResult<(CreateStreamingJobContext, StreamJobFragmentsToCreate)> {
        let id = stream_job.id();
        let specified_parallelism = fragment_graph.specified_parallelism();
        let max_parallelism = NonZeroUsize::new(fragment_graph.max_parallelism()).unwrap();

        let fragment_backfill_ordering = fragment_graph.create_fragment_backfill_ordering();

        let (snapshot_backfill_info, cross_db_snapshot_backfill_info) =
            fragment_graph.collect_snapshot_backfill_info()?;
        assert!(
            snapshot_backfill_info
                .iter()
                .chain([&cross_db_snapshot_backfill_info])
                .flat_map(|info| info.upstream_mv_table_id_to_backfill_epoch.values())
                .all(|backfill_epoch| backfill_epoch.is_none()),
            "should not set backfill epoch when initially build the job: {:?} {:?}",
            snapshot_backfill_info,
            cross_db_snapshot_backfill_info
        );

        let locality_fragment_state_table_mapping =
            fragment_graph.find_locality_provider_fragment_state_table_mapping();

        self.metadata_manager
            .catalog_controller
            .validate_cross_db_snapshot_backfill(&cross_db_snapshot_backfill_info)
            .await?;

        let upstream_table_ids = fragment_graph
            .dependent_table_ids()
            .iter()
            .filter(|id| {
                !cross_db_snapshot_backfill_info
                    .upstream_mv_table_id_to_backfill_epoch
                    .contains_key(id)
            })
            .cloned()
            .collect();

        let (upstream_root_fragments, existing_actor_location) = self
            .metadata_manager
            .get_upstream_root_fragments(&upstream_table_ids)
            .await?;

        if snapshot_backfill_info.is_some() {
            match stream_job {
                StreamingJob::MaterializedView(_)
                | StreamingJob::Sink(_)
                | StreamingJob::Index(_, _) => {}
                StreamingJob::Table(_, _, _) | StreamingJob::Source(_) => {
                    return Err(
                        anyhow!("snapshot_backfill not enabled for table and source").into(),
                    );
                }
            }
        }

        let upstream_actors = upstream_root_fragments
            .values()
            .map(|(fragment, _)| {
                (
                    fragment.fragment_id,
                    fragment.actors.keys().copied().collect(),
                )
            })
            .collect();

        let complete_graph = CompleteStreamFragmentGraph::with_upstreams(
            fragment_graph,
            FragmentGraphUpstreamContext {
                upstream_root_fragments,
                upstream_actor_location: existing_actor_location,
            },
            (&stream_job).into(),
        )?;
        let resource_group = match specific_resource_group {
            None => {
                self.metadata_manager
                    .get_database_resource_group(stream_job.database_id())
                    .await?
            }
            Some(resource_group) => resource_group,
        };

        let cluster_info = self.metadata_manager.get_streaming_cluster_info().await?;

        let parallelism = self.resolve_stream_parallelism(
            specified_parallelism,
            max_parallelism,
            &cluster_info,
            resource_group.clone(),
        )?;

        let parallelism = self
            .env
            .system_params_reader()
            .await
            .adaptive_parallelism_strategy()
            .compute_target_parallelism(parallelism.get());

        let parallelism = NonZeroUsize::new(parallelism).expect("parallelism must be positive");
        let actor_graph_builder = ActorGraphBuilder::new(
            id,
            resource_group,
            complete_graph,
            cluster_info,
            parallelism,
        )?;

        let ActorGraphBuildResult {
            graph,
            downstream_fragment_relations,
            building_locations,
            upstream_fragment_downstreams,
            new_no_shuffle,
            replace_upstream,
            ..
        } = actor_graph_builder.generate_graph(&self.env, &stream_job, stream_ctx.clone())?;
        assert!(replace_upstream.is_empty());

        let table_parallelism = match (specified_parallelism, &self.env.opts.default_parallelism) {
            (None, DefaultParallelism::Full) => TableParallelism::Adaptive,
            _ => TableParallelism::Fixed(parallelism.get()),
        };

        let stream_job_fragments = StreamJobFragments::new(
            id,
            graph,
            &building_locations.actor_locations,
            stream_ctx.clone(),
            table_parallelism,
            max_parallelism.get(),
        );

        if let Some(mview_fragment) = stream_job_fragments.mview_fragment() {
            stream_job.set_table_vnode_count(mview_fragment.vnode_count());
        }

        let new_upstream_sink = if let StreamingJob::Sink(sink) = &stream_job
            && let Ok(table_id) = sink.get_target_table()
        {
            let tables = self
                .metadata_manager
                .get_table_catalog_by_ids(&[*table_id])
                .await?;
            let target_table = tables
                .first()
                .ok_or_else(|| MetaError::catalog_id_not_found("table", *table_id))?;
            let sink_fragment = stream_job_fragments
                .sink_fragment()
                .ok_or_else(|| anyhow::anyhow!("sink fragment not found for sink {}", sink.id))?;
            let mview_fragment_id = self
                .metadata_manager
                .catalog_controller
                .get_mview_fragment_by_id(table_id.as_job_id())
                .await?;
            let upstream_sink_info = build_upstream_sink_info(
                sink,
                sink_fragment.fragment_id as _,
                target_table,
                mview_fragment_id,
            )?;
            Some(upstream_sink_info)
        } else {
            None
        };

        let mut cdc_table_snapshot_splits = None;
        if let StreamingJob::Table(None, table, TableJobType::SharedCdcSource) = &stream_job
            && let Some((_, stream_cdc_scan)) =
                parallel_cdc_table_backfill_fragment(stream_job_fragments.fragments.values())
        {
            {
                let splits = try_init_parallel_cdc_table_snapshot_splits(
                    table.id,
                    stream_cdc_scan.cdc_table_desc.as_ref().unwrap(),
                    self.env.meta_store_ref(),
                    stream_cdc_scan.options.as_ref().unwrap(),
                    self.env.opts.cdc_table_split_init_insert_batch_size,
                    self.env.opts.cdc_table_split_init_sleep_interval_splits,
                    self.env.opts.cdc_table_split_init_sleep_duration_millis,
                )
                .await?;
                cdc_table_snapshot_splits = Some(splits);
            }
        }

        let ctx = CreateStreamingJobContext {
            upstream_fragment_downstreams,
            new_no_shuffle,
            upstream_actors,
            building_locations,
            definition: stream_job.definition(),
            create_type: stream_job.create_type(),
            job_type: (&stream_job).into(),
            streaming_job: stream_job,
            new_upstream_sink,
            option: CreateStreamingJobOption {},
            snapshot_backfill_info,
            cross_db_snapshot_backfill_info,
            fragment_backfill_ordering,
            locality_fragment_state_table_mapping,
            cdc_table_snapshot_splits,
        };

        Ok((
            ctx,
            StreamJobFragmentsToCreate {
                inner: stream_job_fragments,
                downstreams: downstream_fragment_relations,
            },
        ))
    }

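    /// Builds the replacement fragments for an existing streaming job: fits internal tables from
    /// the old job, resolves downstream fragments, and returns the replace context together with
    /// the fragments to create.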
    pub(crate) async fn build_replace_job(
        &self,
        stream_ctx: StreamContext,
        stream_job: &StreamingJob,
        mut fragment_graph: StreamFragmentGraph,
        tmp_job_id: JobId,
        auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,
    ) -> MetaResult<(ReplaceStreamJobContext, StreamJobFragmentsToCreate)> {
        match &stream_job {
            StreamingJob::Table(..)
            | StreamingJob::Source(..)
            | StreamingJob::MaterializedView(..) => {}
            StreamingJob::Sink(..) | StreamingJob::Index(..) => {
                bail_not_implemented!("schema change for {}", stream_job.job_type_str())
            }
        }

        let id = stream_job.id();

        let mut drop_table_associated_source_id = None;
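        // A table replaced without a connector may previously have had one; look up the old
        // associated source (if any) so it can be dropped together with its state table below.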
        if let StreamingJob::Table(None, _, _) = &stream_job {
            drop_table_associated_source_id = self
                .metadata_manager
                .get_table_associated_source_id(id.as_mv_table_id())
                .await?;
        }

        let old_fragments = self.metadata_manager.get_job_fragments_by_id(id).await?;
        let old_internal_table_ids = old_fragments.internal_table_ids();

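        // Carry state over from the old job: drop the connector-backed source state when the
        // connector is removed, structurally match internal tables for materialized views, and
        // reuse the old internal tables one-to-one for everything else.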
        let mut drop_table_connector_ctx = None;
        if let Some(to_remove_source_id) = drop_table_associated_source_id {
            debug_assert!(old_internal_table_ids.len() == 1);

            drop_table_connector_ctx = Some(DropTableConnectorContext {
                to_change_streaming_job_id: id,
                to_remove_state_table_id: old_internal_table_ids[0],
                to_remove_source_id,
            });
        } else if stream_job.is_materialized_view() {
            let old_fragments_upstreams = self
                .metadata_manager
                .catalog_controller
                .upstream_fragments(old_fragments.fragment_ids())
                .await?;

            let old_state_graph =
                state_match::Graph::from_existing(&old_fragments, &old_fragments_upstreams);
            let new_state_graph = state_match::Graph::from_building(&fragment_graph);
            let result = state_match::match_graph(&new_state_graph, &old_state_graph)
                .context("incompatible altering on the streaming job states")?;

            fragment_graph.fit_internal_table_ids_with_mapping(result.table_matches);
            fragment_graph.fit_snapshot_backfill_epochs(result.snapshot_backfill_epochs);
        } else {
            let old_internal_tables = self
                .metadata_manager
                .get_table_catalog_by_ids(&old_internal_table_ids)
                .await?;
            fragment_graph.fit_internal_tables_trivial(old_internal_tables)?;
        }

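        // The old job's root fragment anchors the replacement: its downstream fragments are
        // re-attached to the new graph, and its actor count decides the parallelism of the
        // newly built fragments.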
        let original_root_fragment = old_fragments
            .root_fragment()
            .expect("root fragment not found");

        let job_type = StreamingJobType::from(stream_job);

        let (mut downstream_fragments, mut downstream_actor_location) =
            self.metadata_manager.get_downstream_fragments(id).await?;

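        // When sinks are auto-refreshed alongside the table, replace their old downstream
        // fragment infos with the newly built sink fragments and fix up actor locations.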
        if let Some(auto_refresh_schema_sinks) = &auto_refresh_schema_sinks {
            let mut remaining_fragment: HashSet<_> = auto_refresh_schema_sinks
                .iter()
                .map(|sink| sink.original_fragment.fragment_id)
                .collect();
            for (_, downstream_fragment, nodes) in &mut downstream_fragments {
                if let Some(sink) = auto_refresh_schema_sinks.iter().find(|sink| {
                    sink.original_fragment.fragment_id == downstream_fragment.fragment_id
                }) {
                    assert!(remaining_fragment.remove(&downstream_fragment.fragment_id));
                    for actor_id in downstream_fragment.actors.keys() {
                        downstream_actor_location.remove(actor_id);
                    }
                    for (actor_id, status) in &sink.actor_status {
                        downstream_actor_location
                            .insert(*actor_id, status.location.as_ref().unwrap().worker_node_id);
                    }

                    *downstream_fragment = (&sink.new_fragment_info(), stream_job.id()).into();
                    *nodes = sink.new_fragment.nodes.clone();
                }
            }
            assert!(remaining_fragment.is_empty());
        }

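        // Embed the new fragment graph into the existing topology: tables and sources only have
        // downstream fragments to re-attach, while shared-CDC tables and materialized views also
        // need their upstream root fragments.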
        let complete_graph = match &job_type {
            StreamingJobType::Table(TableJobType::General) | StreamingJobType::Source => {
                CompleteStreamFragmentGraph::with_downstreams(
                    fragment_graph,
                    FragmentGraphDownstreamContext {
                        original_root_fragment_id: original_root_fragment.fragment_id,
                        downstream_fragments,
                        downstream_actor_location,
                    },
                    job_type,
                )?
            }
            StreamingJobType::Table(TableJobType::SharedCdcSource)
            | StreamingJobType::MaterializedView => {
                let (upstream_root_fragments, upstream_actor_location) = self
                    .metadata_manager
                    .get_upstream_root_fragments(fragment_graph.dependent_table_ids())
                    .await?;

                CompleteStreamFragmentGraph::with_upstreams_and_downstreams(
                    fragment_graph,
                    FragmentGraphUpstreamContext {
                        upstream_root_fragments,
                        upstream_actor_location,
                    },
                    FragmentGraphDownstreamContext {
                        original_root_fragment_id: original_root_fragment.fragment_id,
                        downstream_fragments,
                        downstream_actor_location,
                    },
                    job_type,
                )?
            }
            _ => unreachable!(),
        };

        let resource_group = self
            .metadata_manager
            .get_existing_job_resource_group(id)
            .await?;

        let cluster_info = self.metadata_manager.get_streaming_cluster_info().await?;

        let parallelism = NonZeroUsize::new(original_root_fragment.actors.len())
            .expect("The number of actors in the original root fragment should be greater than 0");

        let actor_graph_builder = ActorGraphBuilder::new(
            id,
            resource_group,
            complete_graph,
            cluster_info,
            parallelism,
        )?;

        let ActorGraphBuildResult {
            graph,
            downstream_fragment_relations,
            building_locations,
            upstream_fragment_downstreams,
            mut replace_upstream,
            new_no_shuffle,
            ..
        } = actor_graph_builder.generate_graph(&self.env, stream_job, stream_ctx.clone())?;

        if matches!(
            job_type,
            StreamingJobType::Source | StreamingJobType::Table(TableJobType::General)
        ) {
            assert!(upstream_fragment_downstreams.is_empty());
        }

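        // Materialize the new fragments under the temporary job id, keeping the old job's
        // assigned parallelism and max parallelism.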
        let stream_job_fragments = StreamJobFragments::new(
            tmp_job_id,
            graph,
            &building_locations.actor_locations,
            stream_ctx,
            old_fragments.assigned_parallelism,
            old_fragments.max_parallelism,
        );

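        // Fragments of auto-refreshed sinks have their upstreams rewired through
        // `auto_refresh_schema_sinks`, so take them out of the generic `replace_upstream` map.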
        if let Some(sinks) = &auto_refresh_schema_sinks {
            for sink in sinks {
                replace_upstream
                    .remove(&sink.new_fragment.fragment_id)
                    .expect("should exist");
            }
        }

        let ctx = ReplaceStreamJobContext {
            old_fragments,
            replace_upstream,
            new_no_shuffle,
            upstream_fragment_downstreams,
            building_locations,
            streaming_job: stream_job.clone(),
            tmp_id: tmp_job_id,
            drop_table_connector_ctx,
            auto_refresh_schema_sinks,
        };

        Ok((
            ctx,
            StreamJobFragmentsToCreate {
                inner: stream_job_fragments,
                downstreams: downstream_fragment_relations,
            },
        ))
    }

    async fn alter_name(
        &self,
        relation: alter_name_request::Object,
        new_name: &str,
    ) -> MetaResult<NotificationVersion> {
        let (obj_type, id) = match relation {
            alter_name_request::Object::TableId(id) => (ObjectType::Table, id),
            alter_name_request::Object::ViewId(id) => (ObjectType::View, id),
            alter_name_request::Object::IndexId(id) => (ObjectType::Index, id),
            alter_name_request::Object::SinkId(id) => (ObjectType::Sink, id),
            alter_name_request::Object::SourceId(id) => (ObjectType::Source, id),
            alter_name_request::Object::SchemaId(id) => (ObjectType::Schema, id),
            alter_name_request::Object::DatabaseId(id) => (ObjectType::Database, id),
            alter_name_request::Object::SubscriptionId(id) => (ObjectType::Subscription, id),
        };
        self.metadata_manager
            .catalog_controller
            .alter_name(obj_type, id, new_name)
            .await
    }

    async fn alter_swap_rename(
        &self,
        object: alter_swap_rename_request::Object,
    ) -> MetaResult<NotificationVersion> {
        let (obj_type, src_id, dst_id) = match object {
            alter_swap_rename_request::Object::Schema(_) => unimplemented!("schema swap"),
            alter_swap_rename_request::Object::Table(objs) => {
                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
                (ObjectType::Table, src_id, dst_id)
            }
            alter_swap_rename_request::Object::View(objs) => {
                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
                (ObjectType::View, src_id, dst_id)
            }
            alter_swap_rename_request::Object::Source(objs) => {
                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
                (ObjectType::Source, src_id, dst_id)
            }
            alter_swap_rename_request::Object::Sink(objs) => {
                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
                (ObjectType::Sink, src_id, dst_id)
            }
            alter_swap_rename_request::Object::Subscription(objs) => {
                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
                (ObjectType::Subscription, src_id, dst_id)
            }
        };

        self.metadata_manager
            .catalog_controller
            .alter_swap_rename(obj_type, src_id, dst_id)
            .await
    }

    async fn alter_owner(
        &self,
        object: Object,
        owner_id: UserId,
    ) -> MetaResult<NotificationVersion> {
        let (obj_type, id) = match object {
            Object::TableId(id) => (ObjectType::Table, id),
            Object::ViewId(id) => (ObjectType::View, id),
            Object::SourceId(id) => (ObjectType::Source, id),
            Object::SinkId(id) => (ObjectType::Sink, id),
            Object::SchemaId(id) => (ObjectType::Schema, id),
            Object::DatabaseId(id) => (ObjectType::Database, id),
            Object::SubscriptionId(id) => (ObjectType::Subscription, id),
            Object::ConnectionId(id) => (ObjectType::Connection, id),
        };
        self.metadata_manager
            .catalog_controller
            .alter_owner(obj_type, id.into(), owner_id as _)
            .await
    }

    async fn alter_set_schema(
        &self,
        object: alter_set_schema_request::Object,
        new_schema_id: SchemaId,
    ) -> MetaResult<NotificationVersion> {
        let (obj_type, id) = match object {
            alter_set_schema_request::Object::TableId(id) => (ObjectType::Table, id),
            alter_set_schema_request::Object::ViewId(id) => (ObjectType::View, id),
            alter_set_schema_request::Object::SourceId(id) => (ObjectType::Source, id),
            alter_set_schema_request::Object::SinkId(id) => (ObjectType::Sink, id),
            alter_set_schema_request::Object::FunctionId(id) => (ObjectType::Function, id),
            alter_set_schema_request::Object::ConnectionId(id) => (ObjectType::Connection, id),
            alter_set_schema_request::Object::SubscriptionId(id) => (ObjectType::Subscription, id),
        };
        self.metadata_manager
            .catalog_controller
            .alter_schema(obj_type, id.into(), new_schema_id as _)
            .await
    }

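    /// Waits until there are no background creating streaming jobs left, checking roughly once
    /// per millisecond and giving up with a timeout error after about 30 minutes.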
    pub async fn wait(&self) -> MetaResult<()> {
        let timeout_ms = 30 * 60 * 1000;
        for _ in 0..timeout_ms {
            if self
                .metadata_manager
                .catalog_controller
                .list_background_creating_jobs(true, None)
                .await?
                .is_empty()
            {
                return Ok(());
            }

            sleep(Duration::from_millis(1)).await;
        }
        Err(MetaError::cancelled(format!(
            "timeout after {timeout_ms}ms"
        )))
    }

    async fn comment_on(&self, comment: Comment) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .comment_on(comment)
            .await
    }

    async fn alter_streaming_job_config(
        &self,
        job_id: JobId,
        entries_to_add: HashMap<String, String>,
        keys_to_remove: Vec<String>,
    ) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .alter_streaming_job_config(job_id, entries_to_add, keys_to_remove)
            .await
    }
}

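/// Reports a `CreateStreamJob` telemetry event for the given job, object type, and optional
/// connector / attribute info.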
fn report_create_object(
    job_id: JobId,
    event_name: &str,
    obj_type: PbTelemetryDatabaseObject,
    connector_name: Option<String>,
    attr_info: Option<jsonbb::Value>,
) {
    report_event(
        PbTelemetryEventStage::CreateStreamJob,
        event_name,
        job_id.as_raw_id() as _,
        connector_name,
        Some(obj_type),
        attr_info,
    );
}

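/// Builds the [`UpstreamSinkInfo`] for a sink that writes into a table: a hash-dispatch
/// relation towards the table's materialize fragment, with the distribution key and output
/// mapping derived by matching column ids between the sink schema and the target table.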
pub fn build_upstream_sink_info(
    sink: &PbSink,
    sink_fragment_id: FragmentId,
    target_table: &PbTable,
    target_fragment_id: FragmentId,
) -> MetaResult<UpstreamSinkInfo> {
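    // Prefer the sink's recorded original target columns; fall back to the target table's
    // current columns when none are recorded.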
    let sink_columns = if !sink.original_target_columns.is_empty() {
        sink.original_target_columns.clone()
    } else {
        target_table.columns.clone()
    };

    let sink_output_fields = sink_columns
        .iter()
        .map(|col| Field::from(col.column_desc.as_ref().unwrap()).to_prost())
        .collect_vec();
    let output_indices = (0..sink_output_fields.len())
        .map(|i| i as u32)
        .collect_vec();

    let dist_key_indices: anyhow::Result<Vec<u32>> = try {
        let sink_idx_by_col_id = sink_columns
            .iter()
            .enumerate()
            .map(|(idx, col)| {
                let column_id = col.column_desc.as_ref().unwrap().column_id;
                (column_id, idx as u32)
            })
            .collect::<HashMap<_, _>>();
        target_table
            .distribution_key
            .iter()
            .map(|dist_idx| {
                let column_id = target_table.columns[*dist_idx as usize]
                    .column_desc
                    .as_ref()
                    .unwrap()
                    .column_id;
                let sink_idx = sink_idx_by_col_id
                    .get(&column_id)
                    .ok_or_else(|| anyhow::anyhow!("column id {} not found in sink", column_id))?;
                Ok(*sink_idx)
            })
            .collect::<anyhow::Result<Vec<_>>>()?
    };
    let dist_key_indices =
        dist_key_indices.map_err(|e| e.context("failed to get distribution key indices"))?;
    let downstream_fragment_id = target_fragment_id as _;
    let new_downstream_relation = DownstreamFragmentRelation {
        downstream_fragment_id,
        dispatcher_type: DispatcherType::Hash,
        dist_key_indices,
        output_mapping: PbDispatchOutputMapping::simple(output_indices),
    };
    let current_target_columns = target_table.get_columns();
    let project_exprs = build_select_node_list(&sink_columns, current_target_columns)?;
    Ok(UpstreamSinkInfo {
        sink_id: sink.id,
        sink_fragment_id: sink_fragment_id as _,
        sink_output_fields,
        sink_original_target_columns: sink.get_original_target_columns().clone(),
        project_exprs,
        new_sink_downstream: new_downstream_relation,
    })
}

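/// Rewrites the `UpstreamSinkUnion` node inside a table's union fragment so that it lists the
/// given upstream sinks as its initial upstreams.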
pub fn refill_upstream_sink_union_in_table(
    union_fragment_root: &mut PbStreamNode,
    upstream_sink_infos: &Vec<UpstreamSinkInfo>,
) {
    visit_stream_node_cont_mut(union_fragment_root, |node| {
        if let Some(NodeBody::UpstreamSinkUnion(upstream_sink_union)) = &mut node.node_body {
            let init_upstreams = upstream_sink_infos
                .iter()
                .map(|info| PbUpstreamSinkInfo {
                    upstream_fragment_id: info.sink_fragment_id,
                    sink_output_schema: info.sink_output_fields.clone(),
                    project_exprs: info.project_exprs.clone(),
                })
                .collect();
            upstream_sink_union.init_upstreams = init_upstreams;
            false
        } else {
            true
        }
    });
}

#[cfg(test)]
mod tests {
    use std::num::NonZeroUsize;

    use super::*;

    #[test]
    fn test_specified_parallelism_exceeds_available() {
        let result = DdlController::resolve_stream_parallelism_inner(
            Some(NonZeroUsize::new(100).unwrap()),
            NonZeroUsize::new(256).unwrap(),
            Some(NonZeroUsize::new(4).unwrap()),
            &DefaultParallelism::Full,
            "default",
        )
        .unwrap();
        assert_eq!(result.get(), 100);
    }

    #[test]
    fn test_allows_default_parallelism_over_available() {
        let result = DdlController::resolve_stream_parallelism_inner(
            None,
            NonZeroUsize::new(256).unwrap(),
            Some(NonZeroUsize::new(4).unwrap()),
            &DefaultParallelism::Default(NonZeroUsize::new(50).unwrap()),
            "default",
        )
        .unwrap();
        assert_eq!(result.get(), 50);
    }

    #[test]
    fn test_full_parallelism_capped_by_max() {
        let result = DdlController::resolve_stream_parallelism_inner(
            None,
            NonZeroUsize::new(6).unwrap(),
            Some(NonZeroUsize::new(10).unwrap()),
            &DefaultParallelism::Full,
            "default",
        )
        .unwrap();
        assert_eq!(result.get(), 6);
    }

    #[test]
    fn test_no_available_slots_returns_error() {
        let result = DdlController::resolve_stream_parallelism_inner(
            None,
            NonZeroUsize::new(4).unwrap(),
            None,
            &DefaultParallelism::Full,
            "default",
        );
        assert!(matches!(
            result,
            Err(ref e) if matches!(e.inner(), MetaErrorInner::Unavailable(_))
        ));
    }

    #[test]
    fn test_specified_over_max_returns_error() {
        let result = DdlController::resolve_stream_parallelism_inner(
            Some(NonZeroUsize::new(8).unwrap()),
            NonZeroUsize::new(4).unwrap(),
            Some(NonZeroUsize::new(10).unwrap()),
            &DefaultParallelism::Full,
            "default",
        );
        assert!(matches!(
            result,
            Err(ref e) if matches!(e.inner(), MetaErrorInner::InvalidParameter(_))
        ));
    }
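
    // Additional sketch of the expected behavior, assuming (as
    // `test_specified_parallelism_exceeds_available` already suggests) that an explicitly
    // specified parallelism within `max_parallelism` is returned unchanged regardless of the
    // currently available slots.
    #[test]
    fn test_specified_parallelism_within_bounds() {
        let result = DdlController::resolve_stream_parallelism_inner(
            Some(NonZeroUsize::new(2).unwrap()),
            NonZeroUsize::new(256).unwrap(),
            Some(NonZeroUsize::new(4).unwrap()),
            &DefaultParallelism::Full,
            "default",
        )
        .unwrap();
        assert_eq!(result.get(), 2);
    }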
}