1use std::cmp::Ordering;
16use std::collections::{HashMap, HashSet};
17use std::num::NonZeroUsize;
18use std::sync::Arc;
19use std::sync::atomic::AtomicU64;
20use std::time::Duration;
21
22use anyhow::{Context, anyhow};
23use await_tree::InstrumentAwait;
24use either::Either;
25use itertools::Itertools;
26use risingwave_common::catalog::{
27 AlterDatabaseParam, ColumnCatalog, ColumnId, Field, FragmentTypeFlag,
28};
29use risingwave_common::config::DefaultParallelism;
30use risingwave_common::hash::VnodeCountCompat;
31use risingwave_common::id::{JobId, TableId};
32use risingwave_common::secret::{LocalSecretManager, SecretEncryption};
33use risingwave_common::system_param::reader::SystemParamsRead;
34use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont_mut;
35use risingwave_common::{bail, bail_not_implemented};
36use risingwave_connector::WithOptionsSecResolved;
37use risingwave_connector::connector_common::validate_connection;
38use risingwave_connector::sink::SinkParam;
39use risingwave_connector::sink::iceberg::IcebergSink;
40use risingwave_connector::source::cdc::CdcScanOptions;
41use risingwave_connector::source::{
42 ConnectorProperties, SourceEnumeratorContext, UPSTREAM_SOURCE_KEY,
43};
44use risingwave_meta_model::object::ObjectType;
45use risingwave_meta_model::{
46 ConnectionId, DatabaseId, DispatcherType, FragmentId, FunctionId, IndexId, JobStatus, ObjectId,
47 SchemaId, SecretId, SinkId, SourceId, StreamingParallelism, SubscriptionId, UserId, ViewId,
48};
49use risingwave_pb::catalog::{
50 Comment, Connection, CreateType, Database, Function, PbSink, PbTable, Schema, Secret, Source,
51 Subscription, Table, View,
52};
53use risingwave_pb::common::PbActorLocation;
54use risingwave_pb::ddl_service::alter_owner_request::Object;
55use risingwave_pb::ddl_service::{
56 DdlProgress, TableJobType, WaitVersion, alter_name_request, alter_set_schema_request,
57 alter_swap_rename_request,
58};
59use risingwave_pb::meta::table_fragments::PbActorStatus;
60use risingwave_pb::stream_plan::stream_node::NodeBody;
61use risingwave_pb::stream_plan::{
62 PbDispatchOutputMapping, PbStreamFragmentGraph, PbStreamNode, PbUpstreamSinkInfo,
63 StreamFragmentGraph as StreamFragmentGraphProto,
64};
65use risingwave_pb::telemetry::{PbTelemetryDatabaseObject, PbTelemetryEventStage};
66use strum::Display;
67use thiserror_ext::AsReport;
68use tokio::sync::{OwnedSemaphorePermit, Semaphore};
69use tokio::time::sleep;
70use tracing::Instrument;
71
72use crate::barrier::{BarrierManagerRef, Command};
73use crate::controller::catalog::{DropTableConnectorContext, ReleaseContext};
74use crate::controller::cluster::StreamingClusterInfo;
75use crate::controller::streaming_job::{FinishAutoRefreshSchemaSinkContext, SinkIntoTableContext};
76use crate::controller::utils::build_select_node_list;
77use crate::error::{MetaErrorInner, bail_invalid_parameter, bail_unavailable};
78use crate::manager::sink_coordination::SinkCoordinatorManager;
79use crate::manager::{
80 IGNORED_NOTIFICATION_VERSION, LocalNotification, MetaSrvEnv, MetadataManager,
81 NotificationVersion, StreamingJob, StreamingJobType,
82};
83use crate::model::{
84 DownstreamFragmentRelation, Fragment, FragmentDownstreamRelation,
85 FragmentId as CatalogFragmentId, StreamContext, StreamJobFragments, StreamJobFragmentsToCreate,
86 TableParallelism,
87};
88use crate::stream::cdc::{
89 parallel_cdc_table_backfill_fragment, try_init_parallel_cdc_table_snapshot_splits,
90};
91use crate::stream::{
92 ActorGraphBuildResult, ActorGraphBuilder, AutoRefreshSchemaSinkContext,
93 CompleteStreamFragmentGraph, CreateStreamingJobContext, CreateStreamingJobOption,
94 FragmentGraphDownstreamContext, FragmentGraphUpstreamContext, GlobalStreamManagerRef,
95 ReplaceStreamJobContext, ReschedulePolicy, SourceChange, SourceManagerRef, StreamFragmentGraph,
96 UpstreamSinkInfo, check_sink_fragments_support_refresh_schema, create_source_worker,
97 rewrite_refresh_schema_sink_fragment, state_match, validate_sink,
98};
99use crate::telemetry::report_event;
100use crate::{MetaError, MetaResult};
101
102#[derive(PartialEq)]
103pub enum DropMode {
104 Restrict,
105 Cascade,
106}
107
108impl DropMode {
109 pub fn from_request_setting(cascade: bool) -> DropMode {
110 if cascade {
111 DropMode::Cascade
112 } else {
113 DropMode::Restrict
114 }
115 }
116}
117
118#[derive(strum::AsRefStr)]
119pub enum StreamingJobId {
120 MaterializedView(TableId),
121 Sink(SinkId),
122 Table(Option<SourceId>, TableId),
123 Index(IndexId),
124}
125
126impl std::fmt::Display for StreamingJobId {
127 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
128 write!(f, "{}", self.as_ref())?;
129 write!(f, "({})", self.id())
130 }
131}
132
133impl StreamingJobId {
134 fn id(&self) -> JobId {
135 match self {
136 StreamingJobId::MaterializedView(id) | StreamingJobId::Table(_, id) => id.as_job_id(),
137 StreamingJobId::Index(id) => id.as_job_id(),
138 StreamingJobId::Sink(id) => id.as_job_id(),
139 }
140 }
141}
142
143pub struct ReplaceStreamJobInfo {
146 pub streaming_job: StreamingJob,
147 pub fragment_graph: StreamFragmentGraphProto,
148}
149
150#[derive(Display)]
151pub enum DdlCommand {
152 CreateDatabase(Database),
153 DropDatabase(DatabaseId),
154 CreateSchema(Schema),
155 DropSchema(SchemaId, DropMode),
156 CreateNonSharedSource(Source),
157 DropSource(SourceId, DropMode),
158 ResetSource(SourceId),
159 CreateFunction(Function),
160 DropFunction(FunctionId, DropMode),
161 CreateView(View, HashSet<ObjectId>),
162 DropView(ViewId, DropMode),
163 CreateStreamingJob {
164 stream_job: StreamingJob,
165 fragment_graph: StreamFragmentGraphProto,
166 dependencies: HashSet<ObjectId>,
167 specific_resource_group: Option<String>, if_not_exists: bool,
169 },
170 DropStreamingJob {
171 job_id: StreamingJobId,
172 drop_mode: DropMode,
173 },
174 AlterName(alter_name_request::Object, String),
175 AlterSwapRename(alter_swap_rename_request::Object),
176 ReplaceStreamJob(ReplaceStreamJobInfo),
177 AlterNonSharedSource(Source),
178 AlterObjectOwner(Object, UserId),
179 AlterSetSchema(alter_set_schema_request::Object, SchemaId),
180 CreateConnection(Connection),
181 DropConnection(ConnectionId, DropMode),
182 CreateSecret(Secret),
183 AlterSecret(Secret),
184 DropSecret(SecretId),
185 CommentOn(Comment),
186 CreateSubscription(Subscription),
187 DropSubscription(SubscriptionId, DropMode),
188 AlterDatabaseParam(DatabaseId, AlterDatabaseParam),
189 AlterStreamingJobConfig(JobId, HashMap<String, String>, Vec<String>),
190}
191
192impl DdlCommand {
193 fn object(&self) -> Either<String, ObjectId> {
195 use Either::*;
196 match self {
197 DdlCommand::CreateDatabase(database) => Left(database.name.clone()),
198 DdlCommand::DropDatabase(id) => Right(id.as_object_id()),
199 DdlCommand::CreateSchema(schema) => Left(schema.name.clone()),
200 DdlCommand::DropSchema(id, _) => Right(id.as_object_id()),
201 DdlCommand::CreateNonSharedSource(source) => Left(source.name.clone()),
202 DdlCommand::DropSource(id, _) => Right(id.as_object_id()),
203 DdlCommand::ResetSource(id) => Right(id.as_object_id()),
204 DdlCommand::CreateFunction(function) => Left(function.name.clone()),
205 DdlCommand::DropFunction(id, _) => Right(id.as_object_id()),
206 DdlCommand::CreateView(view, _) => Left(view.name.clone()),
207 DdlCommand::DropView(id, _) => Right(id.as_object_id()),
208 DdlCommand::CreateStreamingJob { stream_job, .. } => Left(stream_job.name()),
209 DdlCommand::DropStreamingJob { job_id, .. } => Right(job_id.id().as_object_id()),
210 DdlCommand::AlterName(object, _) => Left(format!("{object:?}")),
211 DdlCommand::AlterSwapRename(object) => Left(format!("{object:?}")),
212 DdlCommand::ReplaceStreamJob(info) => Left(info.streaming_job.name()),
213 DdlCommand::AlterNonSharedSource(source) => Left(source.name.clone()),
214 DdlCommand::AlterObjectOwner(object, _) => Left(format!("{object:?}")),
215 DdlCommand::AlterSetSchema(object, _) => Left(format!("{object:?}")),
216 DdlCommand::CreateConnection(connection) => Left(connection.name.clone()),
217 DdlCommand::DropConnection(id, _) => Right(id.as_object_id()),
218 DdlCommand::CreateSecret(secret) => Left(secret.name.clone()),
219 DdlCommand::AlterSecret(secret) => Left(secret.name.clone()),
220 DdlCommand::DropSecret(id) => Right(id.as_object_id()),
221 DdlCommand::CommentOn(comment) => Right(comment.table_id.into()),
222 DdlCommand::CreateSubscription(subscription) => Left(subscription.name.clone()),
223 DdlCommand::DropSubscription(id, _) => Right(id.as_object_id()),
224 DdlCommand::AlterDatabaseParam(id, _) => Right(id.as_object_id()),
225 DdlCommand::AlterStreamingJobConfig(job_id, _, _) => Right(job_id.as_object_id()),
226 }
227 }
228
229 fn allow_in_recovery(&self) -> bool {
230 match self {
231 DdlCommand::DropDatabase(_)
232 | DdlCommand::DropSchema(_, _)
233 | DdlCommand::DropSource(_, _)
234 | DdlCommand::DropFunction(_, _)
235 | DdlCommand::DropView(_, _)
236 | DdlCommand::DropStreamingJob { .. }
237 | DdlCommand::DropConnection(_, _)
238 | DdlCommand::DropSecret(_)
239 | DdlCommand::DropSubscription(_, _)
240 | DdlCommand::AlterName(_, _)
241 | DdlCommand::AlterObjectOwner(_, _)
242 | DdlCommand::AlterSetSchema(_, _)
243 | DdlCommand::CreateDatabase(_)
244 | DdlCommand::CreateSchema(_)
245 | DdlCommand::CreateFunction(_)
246 | DdlCommand::CreateView(_, _)
247 | DdlCommand::CreateConnection(_)
248 | DdlCommand::CommentOn(_)
249 | DdlCommand::CreateSecret(_)
250 | DdlCommand::AlterSecret(_)
251 | DdlCommand::AlterSwapRename(_)
252 | DdlCommand::AlterDatabaseParam(_, _)
253 | DdlCommand::AlterStreamingJobConfig(_, _, _) => true,
254 DdlCommand::CreateStreamingJob { .. }
255 | DdlCommand::CreateNonSharedSource(_)
256 | DdlCommand::ReplaceStreamJob(_)
257 | DdlCommand::AlterNonSharedSource(_)
258 | DdlCommand::ResetSource(_)
259 | DdlCommand::CreateSubscription(_) => false,
260 }
261 }
262}
263
264#[derive(Clone)]
265pub struct DdlController {
266 pub(crate) env: MetaSrvEnv,
267
268 pub(crate) metadata_manager: MetadataManager,
269 pub(crate) stream_manager: GlobalStreamManagerRef,
270 pub(crate) source_manager: SourceManagerRef,
271 barrier_manager: BarrierManagerRef,
272 sink_manager: SinkCoordinatorManager,
273
274 pub(crate) creating_streaming_job_permits: Arc<CreatingStreamingJobPermit>,
276
277 seq: Arc<AtomicU64>,
279}
280
281#[derive(Clone)]
282pub struct CreatingStreamingJobPermit {
283 pub(crate) semaphore: Arc<Semaphore>,
284}
285
286impl CreatingStreamingJobPermit {
287 async fn new(env: &MetaSrvEnv) -> Self {
288 let mut permits = env
289 .system_params_reader()
290 .await
291 .max_concurrent_creating_streaming_jobs() as usize;
292 if permits == 0 {
293 permits = Semaphore::MAX_PERMITS;
295 }
296 let semaphore = Arc::new(Semaphore::new(permits));
297
298 let (local_notification_tx, mut local_notification_rx) =
299 tokio::sync::mpsc::unbounded_channel();
300 env.notification_manager()
301 .insert_local_sender(local_notification_tx);
302 let semaphore_clone = semaphore.clone();
303 tokio::spawn(async move {
304 while let Some(notification) = local_notification_rx.recv().await {
305 let LocalNotification::SystemParamsChange(p) = ¬ification else {
306 continue;
307 };
308 let mut new_permits = p.max_concurrent_creating_streaming_jobs() as usize;
309 if new_permits == 0 {
310 new_permits = Semaphore::MAX_PERMITS;
311 }
312 match permits.cmp(&new_permits) {
313 Ordering::Less => {
314 semaphore_clone.add_permits(new_permits - permits);
315 }
316 Ordering::Equal => continue,
317 Ordering::Greater => {
318 let to_release = permits - new_permits;
319 let reduced = semaphore_clone.forget_permits(to_release);
320 if reduced != to_release {
322 tracing::warn!(
323 "no enough permits to release, expected {}, but reduced {}",
324 to_release,
325 reduced
326 );
327 }
328 }
329 }
330 tracing::info!(
331 "max_concurrent_creating_streaming_jobs changed from {} to {}",
332 permits,
333 new_permits
334 );
335 permits = new_permits;
336 }
337 });
338
339 Self { semaphore }
340 }
341}
342
343impl DdlController {
344 pub async fn new(
345 env: MetaSrvEnv,
346 metadata_manager: MetadataManager,
347 stream_manager: GlobalStreamManagerRef,
348 source_manager: SourceManagerRef,
349 barrier_manager: BarrierManagerRef,
350 sink_manager: SinkCoordinatorManager,
351 ) -> Self {
352 let creating_streaming_job_permits = Arc::new(CreatingStreamingJobPermit::new(&env).await);
353 Self {
354 env,
355 metadata_manager,
356 stream_manager,
357 source_manager,
358 barrier_manager,
359 sink_manager,
360 creating_streaming_job_permits,
361 seq: Arc::new(AtomicU64::new(0)),
362 }
363 }
364
365 pub fn next_seq(&self) -> u64 {
367 self.seq.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
369 }
370
371 pub async fn run_command(&self, command: DdlCommand) -> MetaResult<Option<WaitVersion>> {
378 if !command.allow_in_recovery() {
379 self.barrier_manager.check_status_running()?;
380 }
381
382 let await_tree_key = format!("DDL Command {}", self.next_seq());
383 let await_tree_span = await_tree::span!("{command}({})", command.object());
384
385 let ctrl = self.clone();
386 let fut = async move {
387 match command {
388 DdlCommand::CreateDatabase(database) => ctrl.create_database(database).await,
389 DdlCommand::DropDatabase(database_id) => ctrl.drop_database(database_id).await,
390 DdlCommand::CreateSchema(schema) => ctrl.create_schema(schema).await,
391 DdlCommand::DropSchema(schema_id, drop_mode) => {
392 ctrl.drop_schema(schema_id, drop_mode).await
393 }
394 DdlCommand::CreateNonSharedSource(source) => {
395 ctrl.create_non_shared_source(source).await
396 }
397 DdlCommand::DropSource(source_id, drop_mode) => {
398 ctrl.drop_source(source_id, drop_mode).await
399 }
400 DdlCommand::ResetSource(source_id) => ctrl.reset_source(source_id).await,
401 DdlCommand::CreateFunction(function) => ctrl.create_function(function).await,
402 DdlCommand::DropFunction(function_id, drop_mode) => {
403 ctrl.drop_function(function_id, drop_mode).await
404 }
405 DdlCommand::CreateView(view, dependencies) => {
406 ctrl.create_view(view, dependencies).await
407 }
408 DdlCommand::DropView(view_id, drop_mode) => {
409 ctrl.drop_view(view_id, drop_mode).await
410 }
411 DdlCommand::CreateStreamingJob {
412 stream_job,
413 fragment_graph,
414 dependencies,
415 specific_resource_group,
416 if_not_exists,
417 } => {
418 ctrl.create_streaming_job(
419 stream_job,
420 fragment_graph,
421 dependencies,
422 specific_resource_group,
423 if_not_exists,
424 )
425 .await
426 }
427 DdlCommand::DropStreamingJob { job_id, drop_mode } => {
428 ctrl.drop_streaming_job(job_id, drop_mode).await
429 }
430 DdlCommand::ReplaceStreamJob(ReplaceStreamJobInfo {
431 streaming_job,
432 fragment_graph,
433 }) => ctrl.replace_job(streaming_job, fragment_graph).await,
434 DdlCommand::AlterName(relation, name) => ctrl.alter_name(relation, &name).await,
435 DdlCommand::AlterObjectOwner(object, owner_id) => {
436 ctrl.alter_owner(object, owner_id).await
437 }
438 DdlCommand::AlterSetSchema(object, new_schema_id) => {
439 ctrl.alter_set_schema(object, new_schema_id).await
440 }
441 DdlCommand::CreateConnection(connection) => {
442 ctrl.create_connection(connection).await
443 }
444 DdlCommand::DropConnection(connection_id, drop_mode) => {
445 ctrl.drop_connection(connection_id, drop_mode).await
446 }
447 DdlCommand::CreateSecret(secret) => ctrl.create_secret(secret).await,
448 DdlCommand::DropSecret(secret_id) => ctrl.drop_secret(secret_id).await,
449 DdlCommand::AlterSecret(secret) => ctrl.alter_secret(secret).await,
450 DdlCommand::AlterNonSharedSource(source) => {
451 ctrl.alter_non_shared_source(source).await
452 }
453 DdlCommand::CommentOn(comment) => ctrl.comment_on(comment).await,
454 DdlCommand::CreateSubscription(subscription) => {
455 ctrl.create_subscription(subscription).await
456 }
457 DdlCommand::DropSubscription(subscription_id, drop_mode) => {
458 ctrl.drop_subscription(subscription_id, drop_mode).await
459 }
460 DdlCommand::AlterSwapRename(objects) => ctrl.alter_swap_rename(objects).await,
461 DdlCommand::AlterDatabaseParam(database_id, param) => {
462 ctrl.alter_database_param(database_id, param).await
463 }
464 DdlCommand::AlterStreamingJobConfig(job_id, entries_to_add, keys_to_remove) => {
465 ctrl.alter_streaming_job_config(job_id, entries_to_add, keys_to_remove)
466 .await
467 }
468 }
469 }
470 .in_current_span();
471 let fut = (self.env.await_tree_reg())
472 .register(await_tree_key, await_tree_span)
473 .instrument(Box::pin(fut));
474 let notification_version = tokio::spawn(fut).await.map_err(|e| anyhow!(e))??;
475 Ok(Some(WaitVersion {
476 catalog_version: notification_version,
477 hummock_version_id: self.barrier_manager.get_hummock_version_id().await.to_u64(),
478 }))
479 }
480
481 pub async fn get_ddl_progress(&self) -> MetaResult<Vec<DdlProgress>> {
482 self.barrier_manager.get_ddl_progress().await
483 }
484
485 async fn create_database(&self, database: Database) -> MetaResult<NotificationVersion> {
486 let (version, updated_db) = self
487 .metadata_manager
488 .catalog_controller
489 .create_database(database)
490 .await?;
491 self.barrier_manager
493 .update_database_barrier(
494 updated_db.database_id,
495 updated_db.barrier_interval_ms.map(|v| v as u32),
496 updated_db.checkpoint_frequency.map(|v| v as u64),
497 )
498 .await?;
499 Ok(version)
500 }
501
502 #[tracing::instrument(skip(self), level = "debug")]
503 pub async fn reschedule_streaming_job(
504 &self,
505 job_id: JobId,
506 target: ReschedulePolicy,
507 mut deferred: bool,
508 ) -> MetaResult<()> {
509 tracing::info!("altering parallelism for job {}", job_id);
510 if self.barrier_manager.check_status_running().is_err() {
511 tracing::info!(
512 "alter parallelism is set to deferred mode because the system is in recovery state"
513 );
514 deferred = true;
515 }
516
517 self.stream_manager
518 .reschedule_streaming_job(job_id, target, deferred)
519 .await
520 }
521
522 pub async fn reschedule_cdc_table_backfill(
523 &self,
524 job_id: JobId,
525 target: ReschedulePolicy,
526 ) -> MetaResult<()> {
527 tracing::info!("alter CDC table backfill parallelism");
528 if self.barrier_manager.check_status_running().is_err() {
529 return Err(anyhow::anyhow!("CDC table backfill reschedule is unavailable because the system is in recovery state").into());
530 }
531 self.stream_manager
532 .reschedule_cdc_table_backfill(job_id, target)
533 .await
534 }
535
536 pub async fn reschedule_fragments(
537 &self,
538 fragment_targets: HashMap<FragmentId, Option<StreamingParallelism>>,
539 ) -> MetaResult<()> {
540 tracing::info!(
541 "altering parallelism for fragments {:?}",
542 fragment_targets.keys()
543 );
544 let fragment_targets = fragment_targets
545 .into_iter()
546 .map(|(fragment_id, parallelism)| (fragment_id as CatalogFragmentId, parallelism))
547 .collect();
548
549 self.stream_manager
550 .reschedule_fragments(fragment_targets)
551 .await
552 }
553
554 async fn drop_database(&self, database_id: DatabaseId) -> MetaResult<NotificationVersion> {
555 self.drop_object(ObjectType::Database, database_id, DropMode::Cascade)
556 .await
557 }
558
559 async fn create_schema(&self, schema: Schema) -> MetaResult<NotificationVersion> {
560 self.metadata_manager
561 .catalog_controller
562 .create_schema(schema)
563 .await
564 }
565
566 async fn drop_schema(
567 &self,
568 schema_id: SchemaId,
569 drop_mode: DropMode,
570 ) -> MetaResult<NotificationVersion> {
571 self.drop_object(ObjectType::Schema, schema_id, drop_mode)
572 .await
573 }
574
575 async fn create_non_shared_source(&self, source: Source) -> MetaResult<NotificationVersion> {
577 let handle = create_source_worker(&source, self.source_manager.metrics.clone())
578 .await
579 .context("failed to create source worker")?;
580
581 let (source_id, version) = self
582 .metadata_manager
583 .catalog_controller
584 .create_source(source)
585 .await?;
586 self.source_manager
587 .register_source_with_handle(source_id, handle)
588 .await;
589 Ok(version)
590 }
591
592 async fn drop_source(
593 &self,
594 source_id: SourceId,
595 drop_mode: DropMode,
596 ) -> MetaResult<NotificationVersion> {
597 self.drop_object(ObjectType::Source, source_id, drop_mode)
598 .await
599 }
600
601 async fn reset_source(&self, source_id: SourceId) -> MetaResult<NotificationVersion> {
602 tracing::info!(source_id = %source_id, "resetting CDC source offset to latest");
603
604 let database_id = self
606 .metadata_manager
607 .catalog_controller
608 .get_object_database_id(source_id)
609 .await?;
610
611 self.stream_manager
612 .barrier_scheduler
613 .run_command(database_id, Command::ResetSource { source_id })
614 .await?;
615
616 let version = self
618 .metadata_manager
619 .catalog_controller
620 .current_notification_version()
621 .await;
622 Ok(version)
623 }
624
625 async fn alter_non_shared_source(&self, source: Source) -> MetaResult<NotificationVersion> {
628 self.metadata_manager
629 .catalog_controller
630 .alter_non_shared_source(source)
631 .await
632 }
633
634 async fn create_function(&self, function: Function) -> MetaResult<NotificationVersion> {
635 self.metadata_manager
636 .catalog_controller
637 .create_function(function)
638 .await
639 }
640
641 async fn drop_function(
642 &self,
643 function_id: FunctionId,
644 drop_mode: DropMode,
645 ) -> MetaResult<NotificationVersion> {
646 self.drop_object(ObjectType::Function, function_id, drop_mode)
647 .await
648 }
649
650 async fn create_view(
651 &self,
652 view: View,
653 dependencies: HashSet<ObjectId>,
654 ) -> MetaResult<NotificationVersion> {
655 self.metadata_manager
656 .catalog_controller
657 .create_view(view, dependencies)
658 .await
659 }
660
661 async fn drop_view(
662 &self,
663 view_id: ViewId,
664 drop_mode: DropMode,
665 ) -> MetaResult<NotificationVersion> {
666 self.drop_object(ObjectType::View, view_id, drop_mode).await
667 }
668
669 async fn create_connection(&self, connection: Connection) -> MetaResult<NotificationVersion> {
670 validate_connection(&connection).await?;
671 self.metadata_manager
672 .catalog_controller
673 .create_connection(connection)
674 .await
675 }
676
677 async fn drop_connection(
678 &self,
679 connection_id: ConnectionId,
680 drop_mode: DropMode,
681 ) -> MetaResult<NotificationVersion> {
682 self.drop_object(ObjectType::Connection, connection_id, drop_mode)
683 .await
684 }
685
686 async fn alter_database_param(
687 &self,
688 database_id: DatabaseId,
689 param: AlterDatabaseParam,
690 ) -> MetaResult<NotificationVersion> {
691 let (version, updated_db) = self
692 .metadata_manager
693 .catalog_controller
694 .alter_database_param(database_id, param)
695 .await?;
696 self.barrier_manager
698 .update_database_barrier(
699 database_id,
700 updated_db.barrier_interval_ms.map(|v| v as u32),
701 updated_db.checkpoint_frequency.map(|v| v as u64),
702 )
703 .await?;
704 Ok(version)
705 }
706
707 fn get_encrypted_payload(&self, secret: &Secret) -> MetaResult<Vec<u8>> {
710 let secret_store_private_key = self
711 .env
712 .opts
713 .secret_store_private_key
714 .clone()
715 .ok_or_else(|| anyhow!("secret_store_private_key is not configured"))?;
716
717 let encrypted_payload = SecretEncryption::encrypt(
718 secret_store_private_key.as_slice(),
719 secret.get_value().as_slice(),
720 )
721 .context(format!("failed to encrypt secret {}", secret.name))?;
722 Ok(encrypted_payload
723 .serialize()
724 .context(format!("failed to serialize secret {}", secret.name))?)
725 }
726
727 async fn create_secret(&self, mut secret: Secret) -> MetaResult<NotificationVersion> {
728 let secret_plain_payload = secret.value.clone();
731 let encrypted_payload = self.get_encrypted_payload(&secret)?;
732 secret.value = encrypted_payload;
733
734 self.metadata_manager
735 .catalog_controller
736 .create_secret(secret, secret_plain_payload)
737 .await
738 }
739
740 async fn drop_secret(&self, secret_id: SecretId) -> MetaResult<NotificationVersion> {
741 self.drop_object(ObjectType::Secret, secret_id, DropMode::Restrict)
742 .await
743 }
744
745 async fn alter_secret(&self, mut secret: Secret) -> MetaResult<NotificationVersion> {
746 let secret_plain_payload = secret.value.clone();
747 let encrypted_payload = self.get_encrypted_payload(&secret)?;
748 secret.value = encrypted_payload;
749 self.metadata_manager
750 .catalog_controller
751 .alter_secret(secret, secret_plain_payload)
752 .await
753 }
754
755 async fn create_subscription(
756 &self,
757 mut subscription: Subscription,
758 ) -> MetaResult<NotificationVersion> {
759 tracing::debug!("create subscription");
760 let _permit = self
761 .creating_streaming_job_permits
762 .semaphore
763 .acquire()
764 .await
765 .unwrap();
766 let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
767 self.metadata_manager
768 .catalog_controller
769 .create_subscription_catalog(&mut subscription)
770 .await?;
771 if let Err(err) = self.stream_manager.create_subscription(&subscription).await {
772 tracing::debug!(error = %err.as_report(), "failed to create subscription");
773 let _ = self
774 .metadata_manager
775 .catalog_controller
776 .try_abort_creating_subscription(subscription.id)
777 .await
778 .inspect_err(|e| {
779 tracing::error!(
780 error = %e.as_report(),
781 "failed to abort create subscription after failure"
782 );
783 });
784 return Err(err);
785 }
786
787 let version = self
788 .metadata_manager
789 .catalog_controller
790 .notify_create_subscription(subscription.id)
791 .await?;
792 tracing::debug!("finish create subscription");
793 Ok(version)
794 }
795
796 async fn drop_subscription(
797 &self,
798 subscription_id: SubscriptionId,
799 drop_mode: DropMode,
800 ) -> MetaResult<NotificationVersion> {
801 tracing::debug!("preparing drop subscription");
802 let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
803 let subscription = self
804 .metadata_manager
805 .catalog_controller
806 .get_subscription_by_id(subscription_id)
807 .await?;
808 let table_id = subscription.dependent_table_id;
809 let database_id = subscription.database_id;
810 let (_, version) = self
811 .metadata_manager
812 .catalog_controller
813 .drop_object(ObjectType::Subscription, subscription_id, drop_mode)
814 .await?;
815 self.stream_manager
816 .drop_subscription(database_id, subscription_id, table_id)
817 .await;
818 tracing::debug!("finish drop subscription");
819 Ok(version)
820 }
821
822 #[await_tree::instrument]
824 pub(crate) async fn validate_cdc_table(
825 &self,
826 table: &Table,
827 table_fragments: &StreamJobFragments,
828 ) -> MetaResult<()> {
829 let stream_scan_fragment = table_fragments
830 .fragments
831 .values()
832 .filter(|f| {
833 f.fragment_type_mask.contains(FragmentTypeFlag::StreamScan)
834 || f.fragment_type_mask
835 .contains(FragmentTypeFlag::StreamCdcScan)
836 })
837 .exactly_one()
838 .ok()
839 .with_context(|| {
840 format!(
841 "expect exactly one stream scan fragment, got: {:?}",
842 table_fragments.fragments
843 )
844 })?;
845 fn assert_parallelism(stream_scan_fragment: &Fragment, node_body: &Option<NodeBody>) {
846 if let Some(NodeBody::StreamCdcScan(node)) = node_body {
847 if let Some(o) = node.options
848 && CdcScanOptions::from_proto(&o).is_parallelized_backfill()
849 {
850 } else {
852 assert_eq!(
853 stream_scan_fragment.actors.len(),
854 1,
855 "Stream scan fragment should have only one actor"
856 );
857 }
858 }
859 }
860 let mut found_cdc_scan = false;
861 match &stream_scan_fragment.nodes.node_body {
862 Some(NodeBody::StreamCdcScan(_)) => {
863 assert_parallelism(stream_scan_fragment, &stream_scan_fragment.nodes.node_body);
864 if self
865 .validate_cdc_table_inner(&stream_scan_fragment.nodes.node_body, table.id)
866 .await?
867 {
868 found_cdc_scan = true;
869 }
870 }
871 Some(NodeBody::Project(_)) => {
873 for input in &stream_scan_fragment.nodes.input {
874 assert_parallelism(stream_scan_fragment, &input.node_body);
875 if self
876 .validate_cdc_table_inner(&input.node_body, table.id)
877 .await?
878 {
879 found_cdc_scan = true;
880 }
881 }
882 }
883 _ => {
884 bail!("Unexpected node body for stream cdc scan");
885 }
886 };
887 if !found_cdc_scan {
888 bail!("No stream cdc scan node found in stream scan fragment");
889 }
890 Ok(())
891 }
892
893 async fn validate_cdc_table_inner(
894 &self,
895 node_body: &Option<NodeBody>,
896 table_id: TableId,
897 ) -> MetaResult<bool> {
898 if let Some(NodeBody::StreamCdcScan(stream_cdc_scan)) = node_body
899 && let Some(ref cdc_table_desc) = stream_cdc_scan.cdc_table_desc
900 {
901 let options_with_secret = WithOptionsSecResolved::new(
902 cdc_table_desc.connect_properties.clone(),
903 cdc_table_desc.secret_refs.clone(),
904 );
905
906 let mut props = ConnectorProperties::extract(options_with_secret, true)?;
907 props.init_from_pb_cdc_table_desc(cdc_table_desc);
908
909 let _enumerator = props
911 .create_split_enumerator(SourceEnumeratorContext::dummy().into())
912 .await?;
913
914 tracing::debug!(?table_id, "validate cdc table success");
915 Ok(true)
916 } else {
917 Ok(false)
918 }
919 }
920
921 pub async fn validate_table_for_sink(&self, table_id: TableId) -> MetaResult<()> {
922 let migrated = self
923 .metadata_manager
924 .catalog_controller
925 .has_table_been_migrated(table_id)
926 .await?;
927 if !migrated {
928 Err(anyhow::anyhow!("Creating sink into table is not allowed for unmigrated table {}. Please migrate it first.", table_id).into())
929 } else {
930 Ok(())
931 }
932 }
933
934 #[await_tree::instrument(boxed, "create_streaming_job({streaming_job})")]
937 pub async fn create_streaming_job(
938 &self,
939 mut streaming_job: StreamingJob,
940 fragment_graph: StreamFragmentGraphProto,
941 dependencies: HashSet<ObjectId>,
942 specific_resource_group: Option<String>,
943 if_not_exists: bool,
944 ) -> MetaResult<NotificationVersion> {
945 if let StreamingJob::Sink(sink) = &streaming_job
946 && let Some(target_table) = sink.target_table
947 {
948 self.validate_table_for_sink(target_table).await?;
949 }
950 let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());
951 let check_ret = self
952 .metadata_manager
953 .catalog_controller
954 .create_job_catalog(
955 &mut streaming_job,
956 &ctx,
957 &fragment_graph.parallelism,
958 fragment_graph.max_parallelism as _,
959 dependencies,
960 specific_resource_group.clone(),
961 &fragment_graph.backfill_parallelism,
962 )
963 .await;
964 if let Err(meta_err) = check_ret {
965 if !if_not_exists {
966 return Err(meta_err);
967 }
968 return if let MetaErrorInner::Duplicated(_, _, Some(job_id)) = meta_err.inner() {
969 if streaming_job.create_type() == CreateType::Foreground {
970 let database_id = streaming_job.database_id();
971 self.metadata_manager
972 .wait_streaming_job_finished(database_id, *job_id)
973 .await
974 } else {
975 Ok(IGNORED_NOTIFICATION_VERSION)
976 }
977 } else {
978 Err(meta_err)
979 };
980 }
981 let job_id = streaming_job.id();
982 tracing::debug!(
983 id = %job_id,
984 definition = streaming_job.definition(),
985 create_type = streaming_job.create_type().as_str_name(),
986 job_type = ?streaming_job.job_type(),
987 "starting streaming job",
988 );
989 let permit = self
991 .creating_streaming_job_permits
992 .semaphore
993 .clone()
994 .acquire_owned()
995 .instrument_await("acquire_creating_streaming_job_permit")
996 .await
997 .unwrap();
998 let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
999
1000 let name = streaming_job.name();
1001 let definition = streaming_job.definition();
1002 let source_id = match &streaming_job {
1003 StreamingJob::Table(Some(src), _, _) | StreamingJob::Source(src) => Some(src.id),
1004 _ => None,
1005 };
1006
1007 match self
1009 .create_streaming_job_inner(
1010 ctx,
1011 streaming_job,
1012 fragment_graph,
1013 specific_resource_group,
1014 permit,
1015 )
1016 .await
1017 {
1018 Ok(version) => Ok(version),
1019 Err(err) => {
1020 tracing::error!(id = %job_id, error = %err.as_report(), "failed to create streaming job");
1021 let event = risingwave_pb::meta::event_log::EventCreateStreamJobFail {
1022 id: job_id,
1023 name,
1024 definition,
1025 error: err.as_report().to_string(),
1026 };
1027 self.env.event_log_manager_ref().add_event_logs(vec![
1028 risingwave_pb::meta::event_log::Event::CreateStreamJobFail(event),
1029 ]);
1030 let (aborted, _) = self
1031 .metadata_manager
1032 .catalog_controller
1033 .try_abort_creating_streaming_job(job_id, false)
1034 .await?;
1035 if aborted {
1036 tracing::warn!(id = %job_id, "aborted streaming job");
1037 if let Some(source_id) = source_id {
1039 self.source_manager
1040 .apply_source_change(SourceChange::DropSource {
1041 dropped_source_ids: vec![source_id],
1042 })
1043 .await;
1044 }
1045 }
1046 Err(err)
1047 }
1048 }
1049 }
1050
1051 #[await_tree::instrument(boxed)]
1052 async fn create_streaming_job_inner(
1053 &self,
1054 ctx: StreamContext,
1055 mut streaming_job: StreamingJob,
1056 fragment_graph: StreamFragmentGraphProto,
1057 specific_resource_group: Option<String>,
1058 permit: OwnedSemaphorePermit,
1059 ) -> MetaResult<NotificationVersion> {
1060 let mut fragment_graph =
1061 StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
1062 streaming_job.set_info_from_graph(&fragment_graph);
1063
1064 let incomplete_internal_tables = fragment_graph
1066 .incomplete_internal_tables()
1067 .into_values()
1068 .collect_vec();
1069 let table_id_map = self
1070 .metadata_manager
1071 .catalog_controller
1072 .create_internal_table_catalog(&streaming_job, incomplete_internal_tables)
1073 .await?;
1074 fragment_graph.refill_internal_table_ids(table_id_map);
1075
1076 tracing::debug!(id = %streaming_job.id(), "building streaming job");
1078 let (ctx, stream_job_fragments) = self
1079 .build_stream_job(ctx, streaming_job, fragment_graph, specific_resource_group)
1080 .await?;
1081
1082 let streaming_job = &ctx.streaming_job;
1083
1084 match streaming_job {
1085 StreamingJob::Table(None, table, TableJobType::SharedCdcSource) => {
1086 self.validate_cdc_table(table, &stream_job_fragments)
1087 .await?;
1088 }
1089 StreamingJob::Table(Some(source), ..) => {
1090 self.source_manager.register_source(source).await?;
1092 let connector_name = source
1093 .get_with_properties()
1094 .get(UPSTREAM_SOURCE_KEY)
1095 .cloned();
1096 let attr = source.info.as_ref().map(|source_info| {
1097 jsonbb::json!({
1098 "format": source_info.format().as_str_name(),
1099 "encode": source_info.row_encode().as_str_name(),
1100 })
1101 });
1102 report_create_object(
1103 streaming_job.id(),
1104 "source",
1105 PbTelemetryDatabaseObject::Source,
1106 connector_name,
1107 attr,
1108 );
1109 }
1110 StreamingJob::Sink(sink) => {
1111 if sink.auto_refresh_schema_from_table.is_some() {
1112 check_sink_fragments_support_refresh_schema(&stream_job_fragments.fragments)?
1113 }
1114 validate_sink(sink).await?;
1116 let connector_name = sink.get_properties().get(UPSTREAM_SOURCE_KEY).cloned();
1117 let attr = sink.format_desc.as_ref().map(|sink_info| {
1118 jsonbb::json!({
1119 "format": sink_info.format().as_str_name(),
1120 "encode": sink_info.encode().as_str_name(),
1121 })
1122 });
1123 report_create_object(
1124 streaming_job.id(),
1125 "sink",
1126 PbTelemetryDatabaseObject::Sink,
1127 connector_name,
1128 attr,
1129 );
1130 }
1131 StreamingJob::Source(source) => {
1132 self.source_manager.register_source(source).await?;
1134 let connector_name = source
1135 .get_with_properties()
1136 .get(UPSTREAM_SOURCE_KEY)
1137 .cloned();
1138 let attr = source.info.as_ref().map(|source_info| {
1139 jsonbb::json!({
1140 "format": source_info.format().as_str_name(),
1141 "encode": source_info.row_encode().as_str_name(),
1142 })
1143 });
1144 report_create_object(
1145 streaming_job.id(),
1146 "source",
1147 PbTelemetryDatabaseObject::Source,
1148 connector_name,
1149 attr,
1150 );
1151 }
1152 _ => {}
1153 }
1154
1155 self.metadata_manager
1156 .catalog_controller
1157 .prepare_stream_job_fragments(&stream_job_fragments, streaming_job, false)
1158 .await?;
1159
1160 let version = self
1162 .stream_manager
1163 .create_streaming_job(stream_job_fragments, ctx, permit)
1164 .await?;
1165
1166 Ok(version)
1167 }
1168
1169 pub async fn drop_object(
1171 &self,
1172 object_type: ObjectType,
1173 object_id: impl Into<ObjectId>,
1174 drop_mode: DropMode,
1175 ) -> MetaResult<NotificationVersion> {
1176 let object_id = object_id.into();
1177 let (release_ctx, version) = self
1178 .metadata_manager
1179 .catalog_controller
1180 .drop_object(object_type, object_id, drop_mode)
1181 .await?;
1182
1183 if object_type == ObjectType::Source {
1184 self.env
1185 .notification_manager_ref()
1186 .notify_local_subscribers(LocalNotification::SourceDropped(object_id));
1187 }
1188
1189 let ReleaseContext {
1190 database_id,
1191 removed_streaming_job_ids,
1192 removed_state_table_ids,
1193 removed_source_ids,
1194 removed_secret_ids: secret_ids,
1195 removed_source_fragments,
1196 removed_actors,
1197 removed_fragments,
1198 removed_sink_fragment_by_targets,
1199 removed_iceberg_table_sinks,
1200 } = release_ctx;
1201
1202 let _guard = self.source_manager.pause_tick().await;
1203 self.stream_manager
1204 .drop_streaming_jobs(
1205 database_id,
1206 removed_actors.iter().map(|id| *id as _).collect(),
1207 removed_streaming_job_ids,
1208 removed_state_table_ids,
1209 removed_fragments.iter().map(|id| *id as _).collect(),
1210 removed_sink_fragment_by_targets
1211 .into_iter()
1212 .map(|(target, sinks)| {
1213 (target as _, sinks.into_iter().map(|id| id as _).collect())
1214 })
1215 .collect(),
1216 )
1217 .await;
1218
1219 self.source_manager
1222 .apply_source_change(SourceChange::DropSource {
1223 dropped_source_ids: removed_source_ids.into_iter().map(|id| id as _).collect(),
1224 })
1225 .await;
1226
1227 let dropped_source_fragments = removed_source_fragments;
1230 self.source_manager
1231 .apply_source_change(SourceChange::DropMv {
1232 dropped_source_fragments,
1233 })
1234 .await;
1235
1236 let iceberg_sink_ids: Vec<SinkId> = removed_iceberg_table_sinks
1238 .iter()
1239 .map(|sink| sink.id)
1240 .collect();
1241
1242 for sink in removed_iceberg_table_sinks {
1243 let sink_param = SinkParam::try_from_sink_catalog(sink.into())
1244 .expect("Iceberg sink should be valid");
1245 let iceberg_sink =
1246 IcebergSink::try_from(sink_param).expect("Iceberg sink should be valid");
1247 if let Ok(iceberg_catalog) = iceberg_sink.config.create_catalog().await {
1248 let table_identifier = iceberg_sink.config.full_table_name().unwrap();
1249 tracing::info!(
1250 "dropping iceberg table {} for dropped sink",
1251 table_identifier
1252 );
1253
1254 let _ = iceberg_catalog
1255 .drop_table(&table_identifier)
1256 .await
1257 .inspect_err(|err| {
1258 tracing::error!(
1259 "failed to drop iceberg table {} during cleanup: {}",
1260 table_identifier,
1261 err.as_report()
1262 );
1263 });
1264 }
1265 }
1266
1267 if !iceberg_sink_ids.is_empty() {
1269 self.sink_manager
1270 .stop_sink_coordinator(iceberg_sink_ids)
1271 .await;
1272 }
1273
1274 for secret in secret_ids {
1276 LocalSecretManager::global().remove_secret(secret);
1277 }
1278 Ok(version)
1279 }
1280
1281 #[await_tree::instrument(boxed, "replace_streaming_job({streaming_job})")]
1283 pub async fn replace_job(
1284 &self,
1285 mut streaming_job: StreamingJob,
1286 fragment_graph: StreamFragmentGraphProto,
1287 ) -> MetaResult<NotificationVersion> {
1288 match &streaming_job {
1289 StreamingJob::Table(..)
1290 | StreamingJob::Source(..)
1291 | StreamingJob::MaterializedView(..) => {}
1292 StreamingJob::Sink(..) | StreamingJob::Index(..) => {
1293 bail_not_implemented!("schema change for {}", streaming_job.job_type_str())
1294 }
1295 }
1296
1297 let job_id = streaming_job.id();
1298
1299 let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
1300 let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());
1301
1302 let original_max_parallelism = self
1304 .metadata_manager
1305 .get_job_max_parallelism(streaming_job.id())
1306 .await?;
1307 let fragment_graph = PbStreamFragmentGraph {
1308 max_parallelism: original_max_parallelism as _,
1309 ..fragment_graph
1310 };
1311
1312 let fragment_graph = StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
1314 streaming_job.set_info_from_graph(&fragment_graph);
1315
1316 let streaming_job = streaming_job;
1318
1319 let auto_refresh_schema_sinks = if let StreamingJob::Table(_, table, _) = &streaming_job {
1320 let auto_refresh_schema_sinks = self
1321 .metadata_manager
1322 .catalog_controller
1323 .get_sink_auto_refresh_schema_from(table.id)
1324 .await?;
1325 if !auto_refresh_schema_sinks.is_empty() {
1326 let original_table_columns = self
1327 .metadata_manager
1328 .catalog_controller
1329 .get_table_columns(table.id)
1330 .await?;
1331 let mut original_table_column_ids: HashSet<_> = original_table_columns
1333 .iter()
1334 .map(|col| col.column_id())
1335 .collect();
1336 let newly_added_columns = table
1337 .columns
1338 .iter()
1339 .filter(|col| {
1340 !original_table_column_ids.remove(&ColumnId::new(
1341 col.column_desc.as_ref().unwrap().column_id as _,
1342 ))
1343 })
1344 .map(|col| ColumnCatalog::from(col.clone()))
1345 .collect_vec();
1346 if !original_table_column_ids.is_empty() {
1347 return Err(anyhow!("new table columns does not contains all original columns. new: {:?}, original: {:?}, not included: {:?}", table.columns, original_table_columns, original_table_column_ids).into());
1348 }
1349 let mut sinks = Vec::with_capacity(auto_refresh_schema_sinks.len());
1350 for sink in auto_refresh_schema_sinks {
1351 let sink_job_fragments = self
1352 .metadata_manager
1353 .get_job_fragments_by_id(sink.id.as_job_id())
1354 .await?;
1355 if sink_job_fragments.fragments.len() != 1 {
1356 return Err(anyhow!(
1357 "auto schema refresh sink must have only one fragment, but got {}",
1358 sink_job_fragments.fragments.len()
1359 )
1360 .into());
1361 }
1362 let original_sink_fragment =
1363 sink_job_fragments.fragments.into_values().next().unwrap();
1364 let (new_sink_fragment, new_schema, new_log_store_table) =
1365 rewrite_refresh_schema_sink_fragment(
1366 &original_sink_fragment,
1367 &sink,
1368 &newly_added_columns,
1369 table,
1370 fragment_graph.table_fragment_id(),
1371 self.env.id_gen_manager(),
1372 self.env.actor_id_generator(),
1373 )?;
1374
1375 assert_eq!(
1376 original_sink_fragment.actors.len(),
1377 new_sink_fragment.actors.len()
1378 );
1379 let actor_status = (0..original_sink_fragment.actors.len())
1380 .map(|i| {
1381 let worker_node_id = sink_job_fragments.actor_status
1382 [&original_sink_fragment.actors[i].actor_id]
1383 .location
1384 .as_ref()
1385 .unwrap()
1386 .worker_node_id;
1387 (
1388 new_sink_fragment.actors[i].actor_id,
1389 PbActorStatus {
1390 location: Some(PbActorLocation { worker_node_id }),
1391 },
1392 )
1393 })
1394 .collect();
1395
1396 let streaming_job = StreamingJob::Sink(sink);
1397
1398 let tmp_sink_id = self
1399 .metadata_manager
1400 .catalog_controller
1401 .create_job_catalog_for_replace(&streaming_job, None, None, None)
1402 .await?
1403 .as_sink_id();
1404 let StreamingJob::Sink(sink) = streaming_job else {
1405 unreachable!()
1406 };
1407
1408 sinks.push(AutoRefreshSchemaSinkContext {
1409 tmp_sink_id,
1410 original_sink: sink,
1411 original_fragment: original_sink_fragment,
1412 new_schema,
1413 newly_add_fields: newly_added_columns
1414 .iter()
1415 .map(|col| Field::from(&col.column_desc))
1416 .collect(),
1417 new_fragment: new_sink_fragment,
1418 new_log_store_table,
1419 actor_status,
1420 });
1421 }
1422 Some(sinks)
1423 } else {
1424 None
1425 }
1426 } else {
1427 None
1428 };
1429
1430 let tmp_id = self
1431 .metadata_manager
1432 .catalog_controller
1433 .create_job_catalog_for_replace(
1434 &streaming_job,
1435 Some(&ctx),
1436 fragment_graph.specified_parallelism().as_ref(),
1437 Some(fragment_graph.max_parallelism()),
1438 )
1439 .await?;
1440
1441 let tmp_sink_ids = auto_refresh_schema_sinks.as_ref().map(|sinks| {
1442 sinks
1443 .iter()
1444 .map(|sink| sink.tmp_sink_id.as_object_id())
1445 .collect_vec()
1446 });
1447
1448 tracing::debug!(id = %job_id, "building replace streaming job");
1449 let mut updated_sink_catalogs = vec![];
1450
1451 let mut drop_table_connector_ctx = None;
1452 let result: MetaResult<_> = try {
1453 let (mut ctx, mut stream_job_fragments) = self
1454 .build_replace_job(
1455 ctx,
1456 &streaming_job,
1457 fragment_graph,
1458 tmp_id,
1459 auto_refresh_schema_sinks,
1460 )
1461 .await?;
1462 drop_table_connector_ctx = ctx.drop_table_connector_ctx.clone();
1463 let auto_refresh_schema_sink_finish_ctx =
1464 ctx.auto_refresh_schema_sinks.as_ref().map(|sinks| {
1465 sinks
1466 .iter()
1467 .map(|sink| FinishAutoRefreshSchemaSinkContext {
1468 tmp_sink_id: sink.tmp_sink_id,
1469 original_sink_id: sink.original_sink.id,
1470 columns: sink.new_schema.clone(),
1471 new_log_store_table: sink
1472 .new_log_store_table
1473 .as_ref()
1474 .map(|table| (table.id, table.columns.clone())),
1475 })
1476 .collect()
1477 });
1478
1479 if let StreamingJob::Table(_, table, ..) = &streaming_job {
1481 let union_fragment = stream_job_fragments.inner.union_fragment_for_table();
1482 let upstream_infos = self
1483 .metadata_manager
1484 .catalog_controller
1485 .get_all_upstream_sink_infos(table, union_fragment.fragment_id as _)
1486 .await?;
1487 refill_upstream_sink_union_in_table(&mut union_fragment.nodes, &upstream_infos);
1488
1489 for upstream_info in &upstream_infos {
1490 let upstream_fragment_id = upstream_info.sink_fragment_id;
1491 ctx.upstream_fragment_downstreams
1492 .entry(upstream_fragment_id)
1493 .or_default()
1494 .push(upstream_info.new_sink_downstream.clone());
1495 if upstream_info.sink_original_target_columns.is_empty() {
1496 updated_sink_catalogs.push(upstream_info.sink_id);
1497 }
1498 }
1499 }
1500
1501 let replace_upstream = ctx.replace_upstream.clone();
1502
1503 if let Some(sinks) = &ctx.auto_refresh_schema_sinks {
1504 let empty_downstreams = FragmentDownstreamRelation::default();
1505 for sink in sinks {
1506 self.metadata_manager
1507 .catalog_controller
1508 .prepare_streaming_job(
1509 sink.tmp_sink_id.as_job_id(),
1510 || [&sink.new_fragment].into_iter(),
1511 &empty_downstreams,
1512 true,
1513 None,
1514 )
1515 .await?;
1516 }
1517 }
1518
1519 self.metadata_manager
1520 .catalog_controller
1521 .prepare_stream_job_fragments(&stream_job_fragments, &streaming_job, true)
1522 .await?;
1523
1524 self.stream_manager
1525 .replace_stream_job(stream_job_fragments, ctx)
1526 .await?;
1527 (replace_upstream, auto_refresh_schema_sink_finish_ctx)
1528 };
1529
1530 match result {
1531 Ok((replace_upstream, auto_refresh_schema_sink_finish_ctx)) => {
1532 let version = self
1533 .metadata_manager
1534 .catalog_controller
1535 .finish_replace_streaming_job(
1536 tmp_id,
1537 streaming_job,
1538 replace_upstream,
1539 SinkIntoTableContext {
1540 updated_sink_catalogs,
1541 },
1542 drop_table_connector_ctx.as_ref(),
1543 auto_refresh_schema_sink_finish_ctx,
1544 )
1545 .await?;
1546 if let Some(drop_table_connector_ctx) = &drop_table_connector_ctx {
1547 self.source_manager
1548 .apply_source_change(SourceChange::DropSource {
1549 dropped_source_ids: vec![drop_table_connector_ctx.to_remove_source_id],
1550 })
1551 .await;
1552 }
1553 Ok(version)
1554 }
1555 Err(err) => {
1556 tracing::error!(id = %job_id, error = ?err.as_report(), "failed to replace job");
1557 let _ = self.metadata_manager
1558 .catalog_controller
1559 .try_abort_replacing_streaming_job(tmp_id, tmp_sink_ids)
1560 .await.inspect_err(|err| {
1561 tracing::error!(id = %job_id, error = ?err.as_report(), "failed to abort replacing job");
1562 });
1563 Err(err)
1564 }
1565 }
1566 }
1567
1568 #[await_tree::instrument(boxed, "drop_streaming_job{}({job_id})", if let DropMode::Cascade = drop_mode { "_cascade" } else { "" }
1569 )]
1570 async fn drop_streaming_job(
1571 &self,
1572 job_id: StreamingJobId,
1573 drop_mode: DropMode,
1574 ) -> MetaResult<NotificationVersion> {
1575 let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
1576
1577 let (object_id, object_type) = match job_id {
1578 StreamingJobId::MaterializedView(id) => (id.as_object_id(), ObjectType::Table),
1579 StreamingJobId::Sink(id) => (id.as_object_id(), ObjectType::Sink),
1580 StreamingJobId::Table(_, id) => (id.as_object_id(), ObjectType::Table),
1581 StreamingJobId::Index(idx) => (idx.as_object_id(), ObjectType::Index),
1582 };
1583
1584 let job_status = self
1585 .metadata_manager
1586 .catalog_controller
1587 .get_streaming_job_status(job_id.id())
1588 .await?;
1589 let version = match job_status {
1590 JobStatus::Initial => {
1591 unreachable!(
1592 "Job with Initial status should not notify frontend and therefore should not arrive here"
1593 );
1594 }
1595 JobStatus::Creating => {
1596 self.stream_manager
1597 .cancel_streaming_jobs(vec![job_id.id()])
1598 .await?;
1599 IGNORED_NOTIFICATION_VERSION
1600 }
1601 JobStatus::Created => self.drop_object(object_type, object_id, drop_mode).await?,
1602 };
1603
1604 Ok(version)
1605 }
1606
1607 fn resolve_stream_parallelism(
1611 &self,
1612 specified: Option<NonZeroUsize>,
1613 max: NonZeroUsize,
1614 cluster_info: &StreamingClusterInfo,
1615 resource_group: String,
1616 ) -> MetaResult<NonZeroUsize> {
1617 let available = NonZeroUsize::new(cluster_info.parallelism(&resource_group));
1618 DdlController::resolve_stream_parallelism_inner(
1619 specified,
1620 max,
1621 available,
1622 &self.env.opts.default_parallelism,
1623 &resource_group,
1624 )
1625 }
1626
1627 fn resolve_stream_parallelism_inner(
1628 specified: Option<NonZeroUsize>,
1629 max: NonZeroUsize,
1630 available: Option<NonZeroUsize>,
1631 default_parallelism: &DefaultParallelism,
1632 resource_group: &str,
1633 ) -> MetaResult<NonZeroUsize> {
1634 let Some(available) = available else {
1635 bail_unavailable!(
1636 "no available slots to schedule in resource group \"{}\", \
1637 have you allocated any compute nodes within this resource group?",
1638 resource_group
1639 );
1640 };
1641
1642 if let Some(specified) = specified {
1643 if specified > max {
1644 bail_invalid_parameter!(
1645 "specified parallelism {} should not exceed max parallelism {}",
1646 specified,
1647 max,
1648 );
1649 }
1650 if specified > available {
1651 tracing::warn!(
1652 resource_group,
1653 specified_parallelism = specified.get(),
1654 available_parallelism = available.get(),
1655 "specified parallelism exceeds available slots, scheduling with specified value",
1656 );
1657 }
1658 return Ok(specified);
1659 }
1660
1661 let default_parallelism = match default_parallelism {
1663 DefaultParallelism::Full => available,
1664 DefaultParallelism::Default(num) => {
1665 if *num > available {
1666 tracing::warn!(
1667 resource_group,
1668 configured_parallelism = num.get(),
1669 available_parallelism = available.get(),
1670 "default parallelism exceeds available slots, scheduling with configured value",
1671 );
1672 }
1673 *num
1674 }
1675 };
1676
1677 if default_parallelism > max {
1678 tracing::warn!(
1679 max_parallelism = max.get(),
1680 resource_group,
1681 "default parallelism exceeds max parallelism, capping to max",
1682 );
1683 }
1684 Ok(default_parallelism.min(max))
1685 }
1686
1687 #[await_tree::instrument]
1693 pub(crate) async fn build_stream_job(
1694 &self,
1695 stream_ctx: StreamContext,
1696 mut stream_job: StreamingJob,
1697 fragment_graph: StreamFragmentGraph,
1698 specific_resource_group: Option<String>,
1699 ) -> MetaResult<(CreateStreamingJobContext, StreamJobFragmentsToCreate)> {
1700 let id = stream_job.id();
1701 let specified_parallelism = fragment_graph.specified_parallelism();
1702 let specified_backfill_parallelism = fragment_graph.specified_backfill_parallelism();
1703 let max_parallelism = NonZeroUsize::new(fragment_graph.max_parallelism()).unwrap();
1704
1705 let fragment_backfill_ordering = fragment_graph.create_fragment_backfill_ordering();
1707
1708 let (snapshot_backfill_info, cross_db_snapshot_backfill_info) =
1712 fragment_graph.collect_snapshot_backfill_info()?;
1713 assert!(
1714 snapshot_backfill_info
1715 .iter()
1716 .chain([&cross_db_snapshot_backfill_info])
1717 .flat_map(|info| info.upstream_mv_table_id_to_backfill_epoch.values())
1718 .all(|backfill_epoch| backfill_epoch.is_none()),
1719 "should not set backfill epoch when initially build the job: {:?} {:?}",
1720 snapshot_backfill_info,
1721 cross_db_snapshot_backfill_info
1722 );
1723
1724 let locality_fragment_state_table_mapping =
1725 fragment_graph.find_locality_provider_fragment_state_table_mapping();
1726
1727 self.metadata_manager
1729 .catalog_controller
1730 .validate_cross_db_snapshot_backfill(&cross_db_snapshot_backfill_info)
1731 .await?;
1732
1733 let upstream_table_ids = fragment_graph
1734 .dependent_table_ids()
1735 .iter()
1736 .filter(|id| {
1737 !cross_db_snapshot_backfill_info
1738 .upstream_mv_table_id_to_backfill_epoch
1739 .contains_key(id)
1740 })
1741 .cloned()
1742 .collect();
1743
1744 let (upstream_root_fragments, existing_actor_location) = self
1745 .metadata_manager
1746 .get_upstream_root_fragments(&upstream_table_ids)
1747 .await?;
1748
1749 if snapshot_backfill_info.is_some() {
1750 match stream_job {
1751 StreamingJob::MaterializedView(_)
1752 | StreamingJob::Sink(_)
1753 | StreamingJob::Index(_, _) => {}
1754 StreamingJob::Table(_, _, _) | StreamingJob::Source(_) => {
1755 return Err(
1756 anyhow!("snapshot_backfill not enabled for table and source").into(),
1757 );
1758 }
1759 }
1760 }
1761
1762 let upstream_actors = upstream_root_fragments
1763 .values()
1764 .map(|(fragment, _)| {
1765 (
1766 fragment.fragment_id,
1767 fragment.actors.keys().copied().collect(),
1768 )
1769 })
1770 .collect();
1771
1772 let complete_graph = CompleteStreamFragmentGraph::with_upstreams(
1773 fragment_graph,
1774 FragmentGraphUpstreamContext {
1775 upstream_root_fragments,
1776 upstream_actor_location: existing_actor_location,
1777 },
1778 (&stream_job).into(),
1779 )?;
1780 let resource_group = match specific_resource_group {
1781 None => {
1782 self.metadata_manager
1783 .get_database_resource_group(stream_job.database_id())
1784 .await?
1785 }
1786 Some(resource_group) => resource_group,
1787 };
1788
1789 let cluster_info = self.metadata_manager.get_streaming_cluster_info().await?;
1791
1792 let initial_parallelism = specified_backfill_parallelism.or(specified_parallelism);
1793 let parallelism = self.resolve_stream_parallelism(
1794 initial_parallelism,
1795 max_parallelism,
1796 &cluster_info,
1797 resource_group.clone(),
1798 )?;
1799
1800 let parallelism = self
1801 .env
1802 .system_params_reader()
1803 .await
1804 .adaptive_parallelism_strategy()
1805 .compute_target_parallelism(parallelism.get());
1806
1807 let parallelism = NonZeroUsize::new(parallelism).expect("parallelism must be positive");
1808 let actor_graph_builder = ActorGraphBuilder::new(
1809 id,
1810 resource_group,
1811 complete_graph,
1812 cluster_info,
1813 parallelism,
1814 )?;
1815
1816 let ActorGraphBuildResult {
1817 graph,
1818 downstream_fragment_relations,
1819 building_locations,
1820 upstream_fragment_downstreams,
1821 new_no_shuffle,
1822 replace_upstream,
1823 ..
1824 } = actor_graph_builder.generate_graph(&self.env, &stream_job, stream_ctx.clone())?;
1825 assert!(replace_upstream.is_empty());
1826
1827 let table_parallelism = match (specified_parallelism, &self.env.opts.default_parallelism) {
1834 (None, DefaultParallelism::Full) => TableParallelism::Adaptive,
1835 _ => TableParallelism::Fixed(parallelism.get()),
1836 };
1837
1838 let stream_job_fragments = StreamJobFragments::new(
1839 id,
1840 graph,
1841 &building_locations.actor_locations,
1842 stream_ctx.clone(),
1843 table_parallelism,
1844 max_parallelism.get(),
1845 );
1846
1847 if let Some(mview_fragment) = stream_job_fragments.mview_fragment() {
1848 stream_job.set_table_vnode_count(mview_fragment.vnode_count());
1849 }
1850
1851 let new_upstream_sink = if let StreamingJob::Sink(sink) = &stream_job
1852 && let Ok(table_id) = sink.get_target_table()
1853 {
1854 let tables = self
1855 .metadata_manager
1856 .get_table_catalog_by_ids(&[*table_id])
1857 .await?;
1858 let target_table = tables
1859 .first()
1860 .ok_or_else(|| MetaError::catalog_id_not_found("table", *table_id))?;
1861 let sink_fragment = stream_job_fragments
1862 .sink_fragment()
1863 .ok_or_else(|| anyhow::anyhow!("sink fragment not found for sink {}", sink.id))?;
1864 let mview_fragment_id = self
1865 .metadata_manager
1866 .catalog_controller
1867 .get_mview_fragment_by_id(table_id.as_job_id())
1868 .await?;
1869 let upstream_sink_info = build_upstream_sink_info(
1870 sink,
1871 sink_fragment.fragment_id as _,
1872 target_table,
1873 mview_fragment_id,
1874 )?;
1875 Some(upstream_sink_info)
1876 } else {
1877 None
1878 };
1879
        let mut cdc_table_snapshot_splits = None;
        if let StreamingJob::Table(None, table, TableJobType::SharedCdcSource) = &stream_job
            && let Some((_, stream_cdc_scan)) =
                parallel_cdc_table_backfill_fragment(stream_job_fragments.fragments.values())
        {
            {
                let splits = try_init_parallel_cdc_table_snapshot_splits(
                    table.id,
                    stream_cdc_scan.cdc_table_desc.as_ref().unwrap(),
                    self.env.meta_store_ref(),
                    stream_cdc_scan.options.as_ref().unwrap(),
                    self.env.opts.cdc_table_split_init_insert_batch_size,
                    self.env.opts.cdc_table_split_init_sleep_interval_splits,
                    self.env.opts.cdc_table_split_init_sleep_duration_millis,
                )
                .await?;
                cdc_table_snapshot_splits = Some(splits);
            }
        }

        let ctx = CreateStreamingJobContext {
            upstream_fragment_downstreams,
            new_no_shuffle,
            upstream_actors,
            building_locations,
            definition: stream_job.definition(),
            create_type: stream_job.create_type(),
            job_type: (&stream_job).into(),
            streaming_job: stream_job,
            new_upstream_sink,
            option: CreateStreamingJobOption {},
            snapshot_backfill_info,
            cross_db_snapshot_backfill_info,
            fragment_backfill_ordering,
            locality_fragment_state_table_mapping,
            cdc_table_snapshot_splits,
        };

        Ok((
            ctx,
            StreamJobFragmentsToCreate {
                inner: stream_job_fragments,
                downstreams: downstream_fragment_relations,
            },
        ))
    }

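    /// Builds the replacement fragments and context for altering an existing streaming job
    /// (table, source or materialized view), reusing the old job's parallelism and re-wiring its
    /// upstream and downstream relations. Schema changes for sinks and indexes are rejected as
    /// not implemented.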
    pub(crate) async fn build_replace_job(
        &self,
        stream_ctx: StreamContext,
        stream_job: &StreamingJob,
        mut fragment_graph: StreamFragmentGraph,
        tmp_job_id: JobId,
        auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,
    ) -> MetaResult<(ReplaceStreamJobContext, StreamJobFragmentsToCreate)> {
        match &stream_job {
            StreamingJob::Table(..)
            | StreamingJob::Source(..)
            | StreamingJob::MaterializedView(..) => {}
            StreamingJob::Sink(..) | StreamingJob::Index(..) => {
                bail_not_implemented!("schema change for {}", stream_job.job_type_str())
            }
        }

        let id = stream_job.id();

        let mut drop_table_associated_source_id = None;
        if let StreamingJob::Table(None, _, _) = &stream_job {
            drop_table_associated_source_id = self
                .metadata_manager
                .get_table_associated_source_id(id.as_mv_table_id())
                .await?;
        }

        let old_fragments = self.metadata_manager.get_job_fragments_by_id(id).await?;
        let old_internal_table_ids = old_fragments.internal_table_ids();

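        // Carry the old internal state over to the new graph: a table dropping its associated
        // source records the state table and source to remove, a materialized view is matched
        // state-by-state against the old graph, and any other job reuses its internal tables
        // trivially.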
        let mut drop_table_connector_ctx = None;
        if let Some(to_remove_source_id) = drop_table_associated_source_id {
            debug_assert!(old_internal_table_ids.len() == 1);

            drop_table_connector_ctx = Some(DropTableConnectorContext {
                to_change_streaming_job_id: id,
                to_remove_state_table_id: old_internal_table_ids[0],
                to_remove_source_id,
            });
        } else if stream_job.is_materialized_view() {
            let old_fragments_upstreams = self
                .metadata_manager
                .catalog_controller
                .upstream_fragments(old_fragments.fragment_ids())
                .await?;

            let old_state_graph =
                state_match::Graph::from_existing(&old_fragments, &old_fragments_upstreams);
            let new_state_graph = state_match::Graph::from_building(&fragment_graph);
            let result = state_match::match_graph(&new_state_graph, &old_state_graph)
                .context("incompatible altering on the streaming job states")?;

            fragment_graph.fit_internal_table_ids_with_mapping(result.table_matches);
            fragment_graph.fit_snapshot_backfill_epochs(result.snapshot_backfill_epochs);
        } else {
            let old_internal_tables = self
                .metadata_manager
                .get_table_catalog_by_ids(&old_internal_table_ids)
                .await?;
            fragment_graph.fit_internal_tables_trivial(old_internal_tables)?;
        }

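        // Downstream jobs keep consuming from the root fragment of the job being replaced, so
        // its id and the current downstream edges are needed to rebuild the graph.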
        let original_root_fragment = old_fragments
            .root_fragment()
            .expect("root fragment not found");

        let job_type = StreamingJobType::from(stream_job);

        let (mut downstream_fragments, mut downstream_actor_location) =
            self.metadata_manager.get_downstream_fragments(id).await?;

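        // If sinks are auto-refreshed together with this job, substitute their newly built
        // fragments (and actor locations) into the downstream view before assembling the graph.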
        if let Some(auto_refresh_schema_sinks) = &auto_refresh_schema_sinks {
            let mut remaining_fragment: HashSet<_> = auto_refresh_schema_sinks
                .iter()
                .map(|sink| sink.original_fragment.fragment_id)
                .collect();
            for (_, downstream_fragment, nodes) in &mut downstream_fragments {
                if let Some(sink) = auto_refresh_schema_sinks.iter().find(|sink| {
                    sink.original_fragment.fragment_id == downstream_fragment.fragment_id
                }) {
                    assert!(remaining_fragment.remove(&downstream_fragment.fragment_id));
                    for actor_id in downstream_fragment.actors.keys() {
                        downstream_actor_location.remove(actor_id);
                    }
                    for (actor_id, status) in &sink.actor_status {
                        downstream_actor_location
                            .insert(*actor_id, status.location.as_ref().unwrap().worker_node_id);
                    }

                    *downstream_fragment = (&sink.new_fragment_info(), stream_job.id()).into();
                    *nodes = sink.new_fragment.nodes.clone();
                }
            }
            assert!(remaining_fragment.is_empty());
        }

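        // Tables and sources only carry downstream edges, while shared-CDC tables and
        // materialized views also have upstream root fragments to connect to.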
        let complete_graph = match &job_type {
            StreamingJobType::Table(TableJobType::General) | StreamingJobType::Source => {
                CompleteStreamFragmentGraph::with_downstreams(
                    fragment_graph,
                    FragmentGraphDownstreamContext {
                        original_root_fragment_id: original_root_fragment.fragment_id,
                        downstream_fragments,
                        downstream_actor_location,
                    },
                    job_type,
                )?
            }
            StreamingJobType::Table(TableJobType::SharedCdcSource)
            | StreamingJobType::MaterializedView => {
                let (upstream_root_fragments, upstream_actor_location) = self
                    .metadata_manager
                    .get_upstream_root_fragments(fragment_graph.dependent_table_ids())
                    .await?;

                CompleteStreamFragmentGraph::with_upstreams_and_downstreams(
                    fragment_graph,
                    FragmentGraphUpstreamContext {
                        upstream_root_fragments,
                        upstream_actor_location,
                    },
                    FragmentGraphDownstreamContext {
                        original_root_fragment_id: original_root_fragment.fragment_id,
                        downstream_fragments,
                        downstream_actor_location,
                    },
                    job_type,
                )?
            }
            _ => unreachable!(),
        };

        let resource_group = self
            .metadata_manager
            .get_existing_job_resource_group(id)
            .await?;

        let cluster_info = self.metadata_manager.get_streaming_cluster_info().await?;

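        // A replacement keeps the original job's parallelism: the new actor graph is built with
        // exactly as many actors as the old root fragment currently has.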
        let parallelism = NonZeroUsize::new(original_root_fragment.actors.len())
            .expect("The number of actors in the original table fragment should be greater than 0");

        let actor_graph_builder = ActorGraphBuilder::new(
            id,
            resource_group,
            complete_graph,
            cluster_info,
            parallelism,
        )?;

        let ActorGraphBuildResult {
            graph,
            downstream_fragment_relations,
            building_locations,
            upstream_fragment_downstreams,
            mut replace_upstream,
            new_no_shuffle,
            ..
        } = actor_graph_builder.generate_graph(&self.env, stream_job, stream_ctx.clone())?;

        if matches!(
            job_type,
            StreamingJobType::Source | StreamingJobType::Table(TableJobType::General)
        ) {
            assert!(upstream_fragment_downstreams.is_empty());
        }

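        // Register the replacement fragments under the temporary job id, inheriting the old
        // job's assigned and maximum parallelism.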
        let stream_job_fragments = StreamJobFragments::new(
            tmp_job_id,
            graph,
            &building_locations.actor_locations,
            stream_ctx,
            old_fragments.assigned_parallelism,
            old_fragments.max_parallelism,
        );

        if let Some(sinks) = &auto_refresh_schema_sinks {
            for sink in sinks {
                replace_upstream
                    .remove(&sink.new_fragment.fragment_id)
                    .expect("should exist");
            }
        }

        let ctx = ReplaceStreamJobContext {
            old_fragments,
            replace_upstream,
            new_no_shuffle,
            upstream_fragment_downstreams,
            building_locations,
            streaming_job: stream_job.clone(),
            tmp_id: tmp_job_id,
            drop_table_connector_ctx,
            auto_refresh_schema_sinks,
        };

        Ok((
            ctx,
            StreamJobFragmentsToCreate {
                inner: stream_job_fragments,
                downstreams: downstream_fragment_relations,
            },
        ))
    }

    async fn alter_name(
        &self,
        relation: alter_name_request::Object,
        new_name: &str,
    ) -> MetaResult<NotificationVersion> {
        let (obj_type, id) = match relation {
            alter_name_request::Object::TableId(id) => (ObjectType::Table, id),
            alter_name_request::Object::ViewId(id) => (ObjectType::View, id),
            alter_name_request::Object::IndexId(id) => (ObjectType::Index, id),
            alter_name_request::Object::SinkId(id) => (ObjectType::Sink, id),
            alter_name_request::Object::SourceId(id) => (ObjectType::Source, id),
            alter_name_request::Object::SchemaId(id) => (ObjectType::Schema, id),
            alter_name_request::Object::DatabaseId(id) => (ObjectType::Database, id),
            alter_name_request::Object::SubscriptionId(id) => (ObjectType::Subscription, id),
        };
        self.metadata_manager
            .catalog_controller
            .alter_name(obj_type, id, new_name)
            .await
    }

    async fn alter_swap_rename(
        &self,
        object: alter_swap_rename_request::Object,
    ) -> MetaResult<NotificationVersion> {
        let (obj_type, src_id, dst_id) = match object {
            alter_swap_rename_request::Object::Schema(_) => unimplemented!("schema swap"),
            alter_swap_rename_request::Object::Table(objs) => {
                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
                (ObjectType::Table, src_id, dst_id)
            }
            alter_swap_rename_request::Object::View(objs) => {
                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
                (ObjectType::View, src_id, dst_id)
            }
            alter_swap_rename_request::Object::Source(objs) => {
                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
                (ObjectType::Source, src_id, dst_id)
            }
            alter_swap_rename_request::Object::Sink(objs) => {
                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
                (ObjectType::Sink, src_id, dst_id)
            }
            alter_swap_rename_request::Object::Subscription(objs) => {
                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
                (ObjectType::Subscription, src_id, dst_id)
            }
        };

        self.metadata_manager
            .catalog_controller
            .alter_swap_rename(obj_type, src_id, dst_id)
            .await
    }

    async fn alter_owner(
        &self,
        object: Object,
        owner_id: UserId,
    ) -> MetaResult<NotificationVersion> {
        let (obj_type, id) = match object {
            Object::TableId(id) => (ObjectType::Table, id),
            Object::ViewId(id) => (ObjectType::View, id),
            Object::SourceId(id) => (ObjectType::Source, id),
            Object::SinkId(id) => (ObjectType::Sink, id),
            Object::SchemaId(id) => (ObjectType::Schema, id),
            Object::DatabaseId(id) => (ObjectType::Database, id),
            Object::SubscriptionId(id) => (ObjectType::Subscription, id),
            Object::ConnectionId(id) => (ObjectType::Connection, id),
        };
        self.metadata_manager
            .catalog_controller
            .alter_owner(obj_type, id.into(), owner_id as _)
            .await
    }

    async fn alter_set_schema(
        &self,
        object: alter_set_schema_request::Object,
        new_schema_id: SchemaId,
    ) -> MetaResult<NotificationVersion> {
        let (obj_type, id) = match object {
            alter_set_schema_request::Object::TableId(id) => (ObjectType::Table, id),
            alter_set_schema_request::Object::ViewId(id) => (ObjectType::View, id),
            alter_set_schema_request::Object::SourceId(id) => (ObjectType::Source, id),
            alter_set_schema_request::Object::SinkId(id) => (ObjectType::Sink, id),
            alter_set_schema_request::Object::FunctionId(id) => (ObjectType::Function, id),
            alter_set_schema_request::Object::ConnectionId(id) => (ObjectType::Connection, id),
            alter_set_schema_request::Object::SubscriptionId(id) => (ObjectType::Subscription, id),
        };
        self.metadata_manager
            .catalog_controller
            .alter_schema(obj_type, id.into(), new_schema_id as _)
            .await
    }

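    /// Waits for all background creating streaming jobs to finish, polling the catalog up to
    /// `timeout_ms` times with a 1 ms sleep between polls, and errors out on timeout.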
    pub async fn wait(&self) -> MetaResult<()> {
        let timeout_ms = 30 * 60 * 1000;
        for _ in 0..timeout_ms {
            if self
                .metadata_manager
                .catalog_controller
                .list_background_creating_jobs(true, None)
                .await?
                .is_empty()
            {
                return Ok(());
            }

            sleep(Duration::from_millis(1)).await;
        }
        Err(MetaError::cancelled(format!(
            "timeout after {timeout_ms}ms"
        )))
    }

    async fn comment_on(&self, comment: Comment) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .comment_on(comment)
            .await
    }

    async fn alter_streaming_job_config(
        &self,
        job_id: JobId,
        entries_to_add: HashMap<String, String>,
        keys_to_remove: Vec<String>,
    ) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .alter_streaming_job_config(job_id, entries_to_add, keys_to_remove)
            .await
    }
}

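/// Reports a telemetry event for a newly created streaming job object.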
fn report_create_object(
    job_id: JobId,
    event_name: &str,
    obj_type: PbTelemetryDatabaseObject,
    connector_name: Option<String>,
    attr_info: Option<jsonbb::Value>,
) {
    report_event(
        PbTelemetryEventStage::CreateStreamJob,
        event_name,
        job_id.as_raw_id() as _,
        connector_name,
        Some(obj_type),
        attr_info,
    );
}

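/// Computes everything needed to connect a sink to the union fragment of its target table: the
/// sink's output schema, a hash dispatcher keyed by the target table's distribution key
/// (re-mapped into the sink's output columns), and projection expressions that align the sink
/// output with the table's current columns.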
pub fn build_upstream_sink_info(
    sink: &PbSink,
    sink_fragment_id: FragmentId,
    target_table: &PbTable,
    target_fragment_id: FragmentId,
) -> MetaResult<UpstreamSinkInfo> {
    let sink_columns = if !sink.original_target_columns.is_empty() {
        sink.original_target_columns.clone()
    } else {
        target_table.columns.clone()
    };

    let sink_output_fields = sink_columns
        .iter()
        .map(|col| Field::from(col.column_desc.as_ref().unwrap()).to_prost())
        .collect_vec();
    let output_indices = (0..sink_output_fields.len())
        .map(|i| i as u32)
        .collect_vec();

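    // Re-map each distribution-key column of the target table to its index in the sink's output
    // by column id; a missing column means the output cannot be hash-dispatched and is an error.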
    let dist_key_indices: anyhow::Result<Vec<u32>> = try {
        let sink_idx_by_col_id = sink_columns
            .iter()
            .enumerate()
            .map(|(idx, col)| {
                let column_id = col.column_desc.as_ref().unwrap().column_id;
                (column_id, idx as u32)
            })
            .collect::<HashMap<_, _>>();
        target_table
            .distribution_key
            .iter()
            .map(|dist_idx| {
                let column_id = target_table.columns[*dist_idx as usize]
                    .column_desc
                    .as_ref()
                    .unwrap()
                    .column_id;
                let sink_idx = sink_idx_by_col_id
                    .get(&column_id)
                    .ok_or_else(|| anyhow::anyhow!("column id {} not found in sink", column_id))?;
                Ok(*sink_idx)
            })
            .collect::<anyhow::Result<Vec<_>>>()?
    };
    let dist_key_indices =
        dist_key_indices.map_err(|e| e.context("failed to get distribution key indices"))?;
    let downstream_fragment_id = target_fragment_id as _;
    let new_downstream_relation = DownstreamFragmentRelation {
        downstream_fragment_id,
        dispatcher_type: DispatcherType::Hash,
        dist_key_indices,
        output_mapping: PbDispatchOutputMapping::simple(output_indices),
    };
    let current_target_columns = target_table.get_columns();
    let project_exprs = build_select_node_list(&sink_columns, current_target_columns)?;
    Ok(UpstreamSinkInfo {
        sink_id: sink.id,
        sink_fragment_id: sink_fragment_id as _,
        sink_output_fields,
        sink_original_target_columns: sink.get_original_target_columns().clone(),
        project_exprs,
        new_sink_downstream: new_downstream_relation,
    })
}

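/// Rewrites the `UpstreamSinkUnion` node under the given union fragment root so that its initial
/// upstreams list the provided sinks (upstream fragment id, output schema and projection
/// expressions).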
pub fn refill_upstream_sink_union_in_table(
    union_fragment_root: &mut PbStreamNode,
    upstream_sink_infos: &Vec<UpstreamSinkInfo>,
) {
    visit_stream_node_cont_mut(union_fragment_root, |node| {
        if let Some(NodeBody::UpstreamSinkUnion(upstream_sink_union)) = &mut node.node_body {
            let init_upstreams = upstream_sink_infos
                .iter()
                .map(|info| PbUpstreamSinkInfo {
                    upstream_fragment_id: info.sink_fragment_id,
                    sink_output_schema: info.sink_output_fields.clone(),
                    project_exprs: info.project_exprs.clone(),
                })
                .collect();
            upstream_sink_union.init_upstreams = init_upstreams;
            false
        } else {
            true
        }
    });
}

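// Unit tests for `resolve_stream_parallelism_inner`: specified, default and full parallelism,
// plus the error cases for missing worker slots and exceeding the maximum parallelism.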
#[cfg(test)]
mod tests {
    use std::num::NonZeroUsize;

    use super::*;

    #[test]
    fn test_specified_parallelism_exceeds_available() {
        let result = DdlController::resolve_stream_parallelism_inner(
            Some(NonZeroUsize::new(100).unwrap()),
            NonZeroUsize::new(256).unwrap(),
            Some(NonZeroUsize::new(4).unwrap()),
            &DefaultParallelism::Full,
            "default",
        )
        .unwrap();
        assert_eq!(result.get(), 100);
    }

    #[test]
    fn test_allows_default_parallelism_over_available() {
        let result = DdlController::resolve_stream_parallelism_inner(
            None,
            NonZeroUsize::new(256).unwrap(),
            Some(NonZeroUsize::new(4).unwrap()),
            &DefaultParallelism::Default(NonZeroUsize::new(50).unwrap()),
            "default",
        )
        .unwrap();
        assert_eq!(result.get(), 50);
    }

    #[test]
    fn test_full_parallelism_capped_by_max() {
        let result = DdlController::resolve_stream_parallelism_inner(
            None,
            NonZeroUsize::new(6).unwrap(),
            Some(NonZeroUsize::new(10).unwrap()),
            &DefaultParallelism::Full,
            "default",
        )
        .unwrap();
        assert_eq!(result.get(), 6);
    }

    #[test]
    fn test_no_available_slots_returns_error() {
        let result = DdlController::resolve_stream_parallelism_inner(
            None,
            NonZeroUsize::new(4).unwrap(),
            None,
            &DefaultParallelism::Full,
            "default",
        );
        assert!(matches!(
            result,
            Err(ref e) if matches!(e.inner(), MetaErrorInner::Unavailable(_))
        ));
    }

    #[test]
    fn test_specified_over_max_returns_error() {
        let result = DdlController::resolve_stream_parallelism_inner(
            Some(NonZeroUsize::new(8).unwrap()),
            NonZeroUsize::new(4).unwrap(),
            Some(NonZeroUsize::new(10).unwrap()),
            &DefaultParallelism::Full,
            "default",
        );
        assert!(matches!(
            result,
            Err(ref e) if matches!(e.inner(), MetaErrorInner::InvalidParameter(_))
        ));
    }
}