use std::cmp::Ordering;
use std::collections::{HashMap, HashSet};
use std::num::NonZeroUsize;
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use std::time::Duration;

use anyhow::{Context, anyhow};
use await_tree::InstrumentAwait;
use either::Either;
use itertools::Itertools;
use risingwave_common::catalog::{
    AlterDatabaseParam, ColumnCatalog, ColumnId, Field, FragmentTypeFlag,
};
use risingwave_common::config::DefaultParallelism;
use risingwave_common::hash::VnodeCountCompat;
use risingwave_common::id::{JobId, TableId};
use risingwave_common::secret::{LocalSecretManager, SecretEncryption};
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont_mut;
use risingwave_common::{bail, bail_not_implemented};
use risingwave_connector::WithOptionsSecResolved;
use risingwave_connector::connector_common::validate_connection;
use risingwave_connector::sink::SinkParam;
use risingwave_connector::sink::iceberg::IcebergSink;
use risingwave_connector::source::cdc::CdcScanOptions;
use risingwave_connector::source::{
    ConnectorProperties, SourceEnumeratorContext, UPSTREAM_SOURCE_KEY,
};
use risingwave_meta_model::object::ObjectType;
use risingwave_meta_model::{
    ConnectionId, DatabaseId, DispatcherType, FragmentId, FunctionId, IndexId, JobStatus, ObjectId,
    SchemaId, SecretId, SinkId, SourceId, StreamingParallelism, SubscriptionId, UserId, ViewId,
};
use risingwave_pb::catalog::{
    Comment, Connection, CreateType, Database, Function, PbTable, Schema, Secret, Source,
    Subscription, Table, View,
};
use risingwave_pb::common::PbActorLocation;
use risingwave_pb::ddl_service::alter_owner_request::Object;
use risingwave_pb::ddl_service::{
    DdlProgress, TableJobType, WaitVersion, alter_name_request, alter_set_schema_request,
    alter_swap_rename_request, streaming_job_resource_type,
};
use risingwave_pb::meta::table_fragments::PbActorStatus;
use risingwave_pb::plan_common::PbColumnCatalog;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{
    PbDispatchOutputMapping, PbStreamFragmentGraph, PbStreamNode, PbUpstreamSinkInfo,
    StreamFragmentGraph as StreamFragmentGraphProto,
};
use risingwave_pb::telemetry::{PbTelemetryDatabaseObject, PbTelemetryEventStage};
use strum::Display;
use thiserror_ext::AsReport;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use tokio::time::sleep;
use tracing::Instrument;

use crate::barrier::{BarrierManagerRef, Command};
use crate::controller::catalog::{DropTableConnectorContext, ReleaseContext};
use crate::controller::cluster::StreamingClusterInfo;
use crate::controller::streaming_job::{FinishAutoRefreshSchemaSinkContext, SinkIntoTableContext};
use crate::controller::utils::build_select_node_list;
use crate::error::{MetaErrorInner, bail_invalid_parameter, bail_unavailable};
use crate::manager::iceberg_compaction::IcebergCompactionManagerRef;
use crate::manager::sink_coordination::SinkCoordinatorManager;
use crate::manager::{
    IGNORED_NOTIFICATION_VERSION, LocalNotification, MetaSrvEnv, MetadataManager,
    NotificationVersion, StreamingJob, StreamingJobType,
};
use crate::model::{
    DownstreamFragmentRelation, Fragment, FragmentDownstreamRelation,
    FragmentId as CatalogFragmentId, StreamContext, StreamJobFragments, StreamJobFragmentsToCreate,
    TableParallelism,
};
use crate::stream::cdc::{
    parallel_cdc_table_backfill_fragment, try_init_parallel_cdc_table_snapshot_splits,
};
use crate::stream::{
    ActorGraphBuildResult, ActorGraphBuilder, AutoRefreshSchemaSinkContext,
    CompleteStreamFragmentGraph, CreateStreamingJobContext, CreateStreamingJobOption,
    FragmentGraphDownstreamContext, FragmentGraphUpstreamContext, GlobalStreamManagerRef,
    ReplaceStreamJobContext, ReschedulePolicy, SourceChange, SourceManagerRef, StreamFragmentGraph,
    UpstreamSinkInfo, check_sink_fragments_support_refresh_schema, create_source_worker,
    rewrite_refresh_schema_sink_fragment, state_match, validate_sink,
};
use crate::telemetry::report_event;
use crate::{MetaError, MetaResult};

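/// How dependent objects are handled when an object is dropped: `Restrict` rejects the drop if
/// there are dependents, while `Cascade` drops them together, mirroring SQL's `RESTRICT` and
/// `CASCADE` clauses.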
#[derive(PartialEq)]
pub enum DropMode {
    Restrict,
    Cascade,
}

impl DropMode {
    pub fn from_request_setting(cascade: bool) -> DropMode {
        if cascade {
            DropMode::Cascade
        } else {
            DropMode::Restrict
        }
    }
}

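/// Identifier of a streaming job to drop, tagged with the job's type. For tables, the optional
/// [`SourceId`] is the associated source when the table was created with a connector.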
#[derive(strum::AsRefStr)]
pub enum StreamingJobId {
    MaterializedView(TableId),
    Sink(SinkId),
    Table(Option<SourceId>, TableId),
    Index(IndexId),
}

impl std::fmt::Display for StreamingJobId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.as_ref())?;
        write!(f, "({})", self.id())
    }
}

impl StreamingJobId {
    fn id(&self) -> JobId {
        match self {
            StreamingJobId::MaterializedView(id) | StreamingJobId::Table(_, id) => id.as_job_id(),
            StreamingJobId::Index(id) => id.as_job_id(),
            StreamingJobId::Sink(id) => id.as_job_id(),
        }
    }
}

pub struct ReplaceStreamJobInfo {
    pub streaming_job: StreamingJob,
    pub fragment_graph: StreamFragmentGraphProto,
}

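/// A DDL command to be executed by [`DdlController::run_command`], carrying the catalog objects
/// or ids it operates on.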
#[derive(Display)]
pub enum DdlCommand {
    CreateDatabase(Database),
    DropDatabase(DatabaseId),
    CreateSchema(Schema),
    DropSchema(SchemaId, DropMode),
    CreateNonSharedSource(Source),
    DropSource(SourceId, DropMode),
    ResetSource(SourceId),
    CreateFunction(Function),
    DropFunction(FunctionId, DropMode),
    CreateView(View, HashSet<ObjectId>),
    DropView(ViewId, DropMode),
    CreateStreamingJob {
        stream_job: StreamingJob,
        fragment_graph: StreamFragmentGraphProto,
        dependencies: HashSet<ObjectId>,
        resource_type: streaming_job_resource_type::ResourceType,
        if_not_exists: bool,
    },
    DropStreamingJob {
        job_id: StreamingJobId,
        drop_mode: DropMode,
    },
    AlterName(alter_name_request::Object, String),
    AlterSwapRename(alter_swap_rename_request::Object),
    ReplaceStreamJob(ReplaceStreamJobInfo),
    AlterNonSharedSource(Source),
    AlterObjectOwner(Object, UserId),
    AlterSetSchema(alter_set_schema_request::Object, SchemaId),
    CreateConnection(Connection),
    DropConnection(ConnectionId, DropMode),
    CreateSecret(Secret),
    AlterSecret(Secret),
    DropSecret(SecretId, DropMode),
    CommentOn(Comment),
    CreateSubscription(Subscription),
    DropSubscription(SubscriptionId, DropMode),
    AlterDatabaseParam(DatabaseId, AlterDatabaseParam),
    AlterStreamingJobConfig(JobId, HashMap<String, String>, Vec<String>),
}

impl DdlCommand {
    /// Returns the name or id of the object this command operates on, for logging and tracing.
    fn object(&self) -> Either<String, ObjectId> {
        use Either::*;
        match self {
            DdlCommand::CreateDatabase(database) => Left(database.name.clone()),
            DdlCommand::DropDatabase(id) => Right(id.as_object_id()),
            DdlCommand::CreateSchema(schema) => Left(schema.name.clone()),
            DdlCommand::DropSchema(id, _) => Right(id.as_object_id()),
            DdlCommand::CreateNonSharedSource(source) => Left(source.name.clone()),
            DdlCommand::DropSource(id, _) => Right(id.as_object_id()),
            DdlCommand::ResetSource(id) => Right(id.as_object_id()),
            DdlCommand::CreateFunction(function) => Left(function.name.clone()),
            DdlCommand::DropFunction(id, _) => Right(id.as_object_id()),
            DdlCommand::CreateView(view, _) => Left(view.name.clone()),
            DdlCommand::DropView(id, _) => Right(id.as_object_id()),
            DdlCommand::CreateStreamingJob { stream_job, .. } => Left(stream_job.name()),
            DdlCommand::DropStreamingJob { job_id, .. } => Right(job_id.id().as_object_id()),
            DdlCommand::AlterName(object, _) => Left(format!("{object:?}")),
            DdlCommand::AlterSwapRename(object) => Left(format!("{object:?}")),
            DdlCommand::ReplaceStreamJob(info) => Left(info.streaming_job.name()),
            DdlCommand::AlterNonSharedSource(source) => Left(source.name.clone()),
            DdlCommand::AlterObjectOwner(object, _) => Left(format!("{object:?}")),
            DdlCommand::AlterSetSchema(object, _) => Left(format!("{object:?}")),
            DdlCommand::CreateConnection(connection) => Left(connection.name.clone()),
            DdlCommand::DropConnection(id, _) => Right(id.as_object_id()),
            DdlCommand::CreateSecret(secret) => Left(secret.name.clone()),
            DdlCommand::AlterSecret(secret) => Left(secret.name.clone()),
            DdlCommand::DropSecret(id, _) => Right(id.as_object_id()),
            DdlCommand::CommentOn(comment) => Right(comment.table_id.into()),
            DdlCommand::CreateSubscription(subscription) => Left(subscription.name.clone()),
            DdlCommand::DropSubscription(id, _) => Right(id.as_object_id()),
            DdlCommand::AlterDatabaseParam(id, _) => Right(id.as_object_id()),
            DdlCommand::AlterStreamingJobConfig(job_id, _, _) => Right(job_id.as_object_id()),
        }
    }

    /// Whether the command is allowed to run while the cluster is still recovering.
    fn allow_in_recovery(&self) -> bool {
        match self {
            DdlCommand::DropDatabase(_)
            | DdlCommand::DropSchema(_, _)
            | DdlCommand::DropSource(_, _)
            | DdlCommand::DropFunction(_, _)
            | DdlCommand::DropView(_, _)
            | DdlCommand::DropStreamingJob { .. }
            | DdlCommand::DropConnection(_, _)
            | DdlCommand::DropSecret(_, _)
            | DdlCommand::DropSubscription(_, _)
            | DdlCommand::AlterName(_, _)
            | DdlCommand::AlterObjectOwner(_, _)
            | DdlCommand::AlterSetSchema(_, _)
            | DdlCommand::CreateDatabase(_)
            | DdlCommand::CreateSchema(_)
            | DdlCommand::CreateFunction(_)
            | DdlCommand::CreateView(_, _)
            | DdlCommand::CreateConnection(_)
            | DdlCommand::CommentOn(_)
            | DdlCommand::CreateSecret(_)
            | DdlCommand::AlterSecret(_)
            | DdlCommand::AlterSwapRename(_)
            | DdlCommand::AlterDatabaseParam(_, _)
            | DdlCommand::AlterStreamingJobConfig(_, _, _) => true,
            DdlCommand::CreateStreamingJob { .. }
            | DdlCommand::CreateNonSharedSource(_)
            | DdlCommand::ReplaceStreamJob(_)
            | DdlCommand::AlterNonSharedSource(_)
            | DdlCommand::ResetSource(_)
            | DdlCommand::CreateSubscription(_) => false,
        }
    }
}

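/// The centralized controller for DDL on the meta node. It validates commands, updates the
/// catalog through the [`MetadataManager`], and drives streaming job changes via the stream
/// manager, source manager, sink coordinator and barrier manager.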
#[derive(Clone)]
pub struct DdlController {
    pub(crate) env: MetaSrvEnv,

    pub(crate) metadata_manager: MetadataManager,
    pub(crate) stream_manager: GlobalStreamManagerRef,
    pub(crate) source_manager: SourceManagerRef,
    barrier_manager: BarrierManagerRef,
    sink_manager: SinkCoordinatorManager,
    iceberg_compaction_manager: IcebergCompactionManagerRef,

    // Permits limiting the number of concurrently creating streaming jobs.
    pub(crate) creating_streaming_job_permits: Arc<CreatingStreamingJobPermit>,

    // Monotonic sequence used to key DDL commands in the await-tree registry.
    seq: Arc<AtomicU64>,
}

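/// A semaphore bounding the number of concurrently creating streaming jobs, sized by the
/// `max_concurrent_creating_streaming_jobs` system parameter (`0` means unlimited). A background
/// task resizes the semaphore when the parameter changes at runtime.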
#[derive(Clone)]
pub struct CreatingStreamingJobPermit {
    pub(crate) semaphore: Arc<Semaphore>,
}

impl CreatingStreamingJobPermit {
    async fn new(env: &MetaSrvEnv) -> Self {
        let mut permits = env
            .system_params_reader()
            .await
            .max_concurrent_creating_streaming_jobs() as usize;
        if permits == 0 {
            // `0` means unlimited.
            permits = Semaphore::MAX_PERMITS;
        }
        let semaphore = Arc::new(Semaphore::new(permits));

        let (local_notification_tx, mut local_notification_rx) =
            tokio::sync::mpsc::unbounded_channel();
        env.notification_manager()
            .insert_local_sender(local_notification_tx);
        let semaphore_clone = semaphore.clone();
        tokio::spawn(async move {
            while let Some(notification) = local_notification_rx.recv().await {
                let LocalNotification::SystemParamsChange(p) = &notification else {
                    continue;
                };
                let mut new_permits = p.max_concurrent_creating_streaming_jobs() as usize;
                if new_permits == 0 {
                    new_permits = Semaphore::MAX_PERMITS;
                }
                match permits.cmp(&new_permits) {
                    Ordering::Less => {
                        semaphore_clone.add_permits(new_permits - permits);
                    }
                    Ordering::Equal => continue,
                    Ordering::Greater => {
                        // Permits that are currently held cannot be forgotten, so the actual
                        // reduction may be smaller than requested.
                        let to_release = permits - new_permits;
                        let reduced = semaphore_clone.forget_permits(to_release);
                        if reduced != to_release {
                            tracing::warn!(
                                "not enough permits to release, expected {}, but reduced {}",
                                to_release,
                                reduced
                            );
                        }
                    }
                }
                tracing::info!(
                    "max_concurrent_creating_streaming_jobs changed from {} to {}",
                    permits,
                    new_permits
                );
                permits = new_permits;
            }
        });

        Self { semaphore }
    }
}

impl DdlController {
    pub async fn new(
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        stream_manager: GlobalStreamManagerRef,
        source_manager: SourceManagerRef,
        barrier_manager: BarrierManagerRef,
        sink_manager: SinkCoordinatorManager,
        iceberg_compaction_manager: IcebergCompactionManagerRef,
    ) -> Self {
        let creating_streaming_job_permits = Arc::new(CreatingStreamingJobPermit::new(&env).await);
        Self {
            env,
            metadata_manager,
            stream_manager,
            source_manager,
            barrier_manager,
            sink_manager,
            iceberg_compaction_manager,
            creating_streaming_job_permits,
            seq: Arc::new(AtomicU64::new(0)),
        }
    }

    /// Returns the next sequence number for keying a DDL command.
    pub fn next_seq(&self) -> u64 {
        self.seq.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
    }

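    /// Runs a [`DdlCommand`] to completion on a spawned task that is registered in the
    /// await-tree for observability. Commands not allowed during recovery fail fast. Returns the
    /// catalog and hummock versions the caller should wait for. A sketch of a call site,
    /// assuming a constructed controller:
    ///
    /// ```ignore
    /// let version = ddl_controller
    ///     .run_command(DdlCommand::CreateDatabase(database))
    ///     .await?;
    /// ```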
    pub async fn run_command(&self, command: DdlCommand) -> MetaResult<Option<WaitVersion>> {
        if !command.allow_in_recovery() {
            self.barrier_manager.check_status_running()?;
        }

        let await_tree_key = format!("DDL Command {}", self.next_seq());
        let await_tree_span = await_tree::span!("{command}({})", command.object());

        let ctrl = self.clone();
        let fut = async move {
            match command {
                DdlCommand::CreateDatabase(database) => ctrl.create_database(database).await,
                DdlCommand::DropDatabase(database_id) => ctrl.drop_database(database_id).await,
                DdlCommand::CreateSchema(schema) => ctrl.create_schema(schema).await,
                DdlCommand::DropSchema(schema_id, drop_mode) => {
                    ctrl.drop_schema(schema_id, drop_mode).await
                }
                DdlCommand::CreateNonSharedSource(source) => {
                    ctrl.create_non_shared_source(source).await
                }
                DdlCommand::DropSource(source_id, drop_mode) => {
                    ctrl.drop_source(source_id, drop_mode).await
                }
                DdlCommand::ResetSource(source_id) => ctrl.reset_source(source_id).await,
                DdlCommand::CreateFunction(function) => ctrl.create_function(function).await,
                DdlCommand::DropFunction(function_id, drop_mode) => {
                    ctrl.drop_function(function_id, drop_mode).await
                }
                DdlCommand::CreateView(view, dependencies) => {
                    ctrl.create_view(view, dependencies).await
                }
                DdlCommand::DropView(view_id, drop_mode) => {
                    ctrl.drop_view(view_id, drop_mode).await
                }
                DdlCommand::CreateStreamingJob {
                    stream_job,
                    fragment_graph,
                    dependencies,
                    resource_type,
                    if_not_exists,
                } => {
                    ctrl.create_streaming_job(
                        stream_job,
                        fragment_graph,
                        dependencies,
                        resource_type,
                        if_not_exists,
                    )
                    .await
                }
                DdlCommand::DropStreamingJob { job_id, drop_mode } => {
                    ctrl.drop_streaming_job(job_id, drop_mode).await
                }
                DdlCommand::ReplaceStreamJob(ReplaceStreamJobInfo {
                    streaming_job,
                    fragment_graph,
                }) => ctrl.replace_job(streaming_job, fragment_graph).await,
                DdlCommand::AlterName(relation, name) => ctrl.alter_name(relation, &name).await,
                DdlCommand::AlterObjectOwner(object, owner_id) => {
                    ctrl.alter_owner(object, owner_id).await
                }
                DdlCommand::AlterSetSchema(object, new_schema_id) => {
                    ctrl.alter_set_schema(object, new_schema_id).await
                }
                DdlCommand::CreateConnection(connection) => {
                    ctrl.create_connection(connection).await
                }
                DdlCommand::DropConnection(connection_id, drop_mode) => {
                    ctrl.drop_connection(connection_id, drop_mode).await
                }
                DdlCommand::CreateSecret(secret) => ctrl.create_secret(secret).await,
                DdlCommand::DropSecret(secret_id, drop_mode) => {
                    ctrl.drop_secret(secret_id, drop_mode).await
                }
                DdlCommand::AlterSecret(secret) => ctrl.alter_secret(secret).await,
                DdlCommand::AlterNonSharedSource(source) => {
                    ctrl.alter_non_shared_source(source).await
                }
                DdlCommand::CommentOn(comment) => ctrl.comment_on(comment).await,
                DdlCommand::CreateSubscription(subscription) => {
                    ctrl.create_subscription(subscription).await
                }
                DdlCommand::DropSubscription(subscription_id, drop_mode) => {
                    ctrl.drop_subscription(subscription_id, drop_mode).await
                }
                DdlCommand::AlterSwapRename(objects) => ctrl.alter_swap_rename(objects).await,
                DdlCommand::AlterDatabaseParam(database_id, param) => {
                    ctrl.alter_database_param(database_id, param).await
                }
                DdlCommand::AlterStreamingJobConfig(job_id, entries_to_add, keys_to_remove) => {
                    ctrl.alter_streaming_job_config(job_id, entries_to_add, keys_to_remove)
                        .await
                }
            }
        }
        .in_current_span();
        let fut = (self.env.await_tree_reg())
            .register(await_tree_key, await_tree_span)
            .instrument(Box::pin(fut));
        let notification_version = tokio::spawn(fut).await.map_err(|e| anyhow!(e))??;
        Ok(Some(WaitVersion {
            catalog_version: notification_version,
            hummock_version_id: self.barrier_manager.get_hummock_version_id().await,
        }))
    }

    pub async fn get_ddl_progress(&self) -> MetaResult<Vec<DdlProgress>> {
        self.barrier_manager.get_ddl_progress().await
    }

    async fn create_database(&self, database: Database) -> MetaResult<NotificationVersion> {
        let (version, updated_db) = self
            .metadata_manager
            .catalog_controller
            .create_database(database)
            .await?;
        // Initialize the per-database barrier settings.
        self.barrier_manager
            .update_database_barrier(
                updated_db.database_id,
                updated_db.barrier_interval_ms.map(|v| v as u32),
                updated_db.checkpoint_frequency.map(|v| v as u64),
            )
            .await?;
        Ok(version)
    }

    #[tracing::instrument(skip(self), level = "debug")]
    pub async fn reschedule_streaming_job(
        &self,
        job_id: JobId,
        target: ReschedulePolicy,
        mut deferred: bool,
    ) -> MetaResult<()> {
        tracing::info!("altering parallelism for job {}", job_id);
        if self.barrier_manager.check_status_running().is_err() {
            tracing::info!(
                "alter parallelism is set to deferred mode because the system is in recovery state"
            );
            deferred = true;
        }

        self.stream_manager
            .reschedule_streaming_job(job_id, target, deferred)
            .await
    }

    pub async fn reschedule_cdc_table_backfill(
        &self,
        job_id: JobId,
        target: ReschedulePolicy,
    ) -> MetaResult<()> {
        tracing::info!("altering CDC table backfill parallelism");
        if self.barrier_manager.check_status_running().is_err() {
            return Err(anyhow::anyhow!(
                "CDC table backfill reschedule is unavailable because the system is in recovery state"
            )
            .into());
        }
        self.stream_manager
            .reschedule_cdc_table_backfill(job_id, target)
            .await
    }

    pub async fn reschedule_fragments(
        &self,
        fragment_targets: HashMap<FragmentId, Option<StreamingParallelism>>,
    ) -> MetaResult<()> {
        tracing::info!(
            "altering parallelism for fragments {:?}",
            fragment_targets.keys()
        );
        let fragment_targets = fragment_targets
            .into_iter()
            .map(|(fragment_id, parallelism)| (fragment_id as CatalogFragmentId, parallelism))
            .collect();

        self.stream_manager
            .reschedule_fragments(fragment_targets)
            .await
    }

    async fn drop_database(&self, database_id: DatabaseId) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Database, database_id, DropMode::Cascade)
            .await
    }

    async fn create_schema(&self, schema: Schema) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .create_schema(schema)
            .await
    }

    async fn drop_schema(
        &self,
        schema_id: SchemaId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Schema, schema_id, drop_mode)
            .await
    }

    async fn create_non_shared_source(&self, source: Source) -> MetaResult<NotificationVersion> {
        // Create a source worker first to validate the source connector.
        let handle = create_source_worker(&source, self.source_manager.metrics.clone())
            .await
            .context("failed to create source worker")?;

        let (source_id, version) = self
            .metadata_manager
            .catalog_controller
            .create_source(source)
            .await?;
        self.source_manager
            .register_source_with_handle(source_id, handle)
            .await;
        Ok(version)
    }

    async fn drop_source(
        &self,
        source_id: SourceId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Source, source_id, drop_mode)
            .await
    }

    async fn reset_source(&self, source_id: SourceId) -> MetaResult<NotificationVersion> {
        tracing::info!(source_id = %source_id, "resetting CDC source offset to latest");

        let database_id = self
            .metadata_manager
            .catalog_controller
            .get_object_database_id(source_id)
            .await?;

        // Reset the source offsets via a barrier command to the source's database.
        self.stream_manager
            .barrier_scheduler
            .run_command(database_id, Command::ResetSource { source_id })
            .await?;

        // No catalog change happened; send a trivial notification to obtain a version.
        let version = self
            .metadata_manager
            .catalog_controller
            .notify_frontend_trivial()
            .await;
        Ok(version)
    }

    async fn alter_non_shared_source(&self, source: Source) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .alter_non_shared_source(source)
            .await
    }

    async fn create_function(&self, function: Function) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .create_function(function)
            .await
    }

    async fn drop_function(
        &self,
        function_id: FunctionId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Function, function_id, drop_mode)
            .await
    }

    async fn create_view(
        &self,
        view: View,
        dependencies: HashSet<ObjectId>,
    ) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .create_view(view, dependencies)
            .await
    }

    async fn drop_view(
        &self,
        view_id: ViewId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::View, view_id, drop_mode).await
    }

    async fn create_connection(&self, connection: Connection) -> MetaResult<NotificationVersion> {
        validate_connection(&connection).await?;
        self.metadata_manager
            .catalog_controller
            .create_connection(connection)
            .await
    }

    async fn drop_connection(
        &self,
        connection_id: ConnectionId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Connection, connection_id, drop_mode)
            .await
    }

    async fn alter_database_param(
        &self,
        database_id: DatabaseId,
        param: AlterDatabaseParam,
    ) -> MetaResult<NotificationVersion> {
        let (version, updated_db) = self
            .metadata_manager
            .catalog_controller
            .alter_database_param(database_id, param)
            .await?;
        self.barrier_manager
            .update_database_barrier(
                database_id,
                updated_db.barrier_interval_ms.map(|v| v as u32),
                updated_db.checkpoint_frequency.map(|v| v as u64),
            )
            .await?;
        Ok(version)
    }

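    /// Encrypts the secret's plaintext value with the meta node's configured
    /// `secret_store_private_key` and returns the serialized ciphertext. Fails if the key is
    /// not configured.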
    fn get_encrypted_payload(&self, secret: &Secret) -> MetaResult<Vec<u8>> {
        let secret_store_private_key = self
            .env
            .opts
            .secret_store_private_key
            .clone()
            .ok_or_else(|| anyhow!("secret_store_private_key is not configured"))?;

        let encrypted_payload = SecretEncryption::encrypt(
            secret_store_private_key.as_slice(),
            secret.get_value().as_slice(),
        )
        .context(format!("failed to encrypt secret {}", secret.name))?;
        Ok(encrypted_payload
            .serialize()
            .context(format!("failed to serialize secret {}", secret.name))?)
    }

    async fn create_secret(&self, mut secret: Secret) -> MetaResult<NotificationVersion> {
        // Keep the plaintext aside and store only the encrypted payload in the catalog.
        let secret_plain_payload = secret.value.clone();
        let encrypted_payload = self.get_encrypted_payload(&secret)?;
        secret.value = encrypted_payload;

        self.metadata_manager
            .catalog_controller
            .create_secret(secret, secret_plain_payload)
            .await
    }

    async fn drop_secret(
        &self,
        secret_id: SecretId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Secret, secret_id, drop_mode)
            .await
    }

    async fn alter_secret(&self, mut secret: Secret) -> MetaResult<NotificationVersion> {
        let secret_plain_payload = secret.value.clone();
        let encrypted_payload = self.get_encrypted_payload(&secret)?;
        secret.value = encrypted_payload;
        self.metadata_manager
            .catalog_controller
            .alter_secret(secret, secret_plain_payload)
            .await
    }

    async fn create_subscription(
        &self,
        mut subscription: Subscription,
    ) -> MetaResult<NotificationVersion> {
        tracing::debug!("create subscription");
        let _permit = self
            .creating_streaming_job_permits
            .semaphore
            .acquire()
            .await
            .unwrap();
        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
        self.metadata_manager
            .catalog_controller
            .create_subscription_catalog(&mut subscription)
            .await?;
        if let Err(err) = self.stream_manager.create_subscription(&subscription).await {
            tracing::debug!(error = %err.as_report(), "failed to create subscription");
            let _ = self
                .metadata_manager
                .catalog_controller
                .try_abort_creating_subscription(subscription.id)
                .await
                .inspect_err(|e| {
                    tracing::error!(
                        error = %e.as_report(),
                        "failed to abort create subscription after failure"
                    );
                });
            return Err(err);
        }

        let version = self
            .metadata_manager
            .catalog_controller
            .notify_create_subscription(subscription.id)
            .await?;
        tracing::debug!("finish create subscription");
        Ok(version)
    }

    async fn drop_subscription(
        &self,
        subscription_id: SubscriptionId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        tracing::debug!("preparing drop subscription");
        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
        let subscription = self
            .metadata_manager
            .catalog_controller
            .get_subscription_by_id(subscription_id)
            .await?;
        let table_id = subscription.dependent_table_id;
        let database_id = subscription.database_id;
        let (_, version) = self
            .metadata_manager
            .catalog_controller
            .drop_object(ObjectType::Subscription, subscription_id, drop_mode)
            .await?;
        self.stream_manager
            .drop_subscription(database_id, subscription_id, table_id)
            .await;
        tracing::debug!("finish drop subscription");
        Ok(version)
    }

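    /// Validates a CDC table before creation by locating the single stream scan (or stream CDC
    /// scan) fragment and attempting to connect to the upstream database through a split
    /// enumerator.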
    #[await_tree::instrument]
    pub(crate) async fn validate_cdc_table(
        &self,
        table: &Table,
        table_fragments: &StreamJobFragments,
    ) -> MetaResult<()> {
        let stream_scan_fragment = table_fragments
            .fragments
            .values()
            .filter(|f| {
                f.fragment_type_mask.contains(FragmentTypeFlag::StreamScan)
                    || f.fragment_type_mask
                        .contains(FragmentTypeFlag::StreamCdcScan)
            })
            .exactly_one()
            .ok()
            .with_context(|| {
                format!(
                    "expect exactly one stream scan fragment, got: {:?}",
                    table_fragments.fragments
                )
            })?;

        fn assert_parallelism(stream_scan_fragment: &Fragment, node_body: &Option<NodeBody>) {
            if let Some(NodeBody::StreamCdcScan(node)) = node_body {
                if let Some(o) = &node.options
                    && CdcScanOptions::from_proto(o).is_parallelized_backfill()
                {
                    // Parallelized backfill may run with multiple actors.
                } else {
                    assert_eq!(
                        stream_scan_fragment.actors.len(),
                        1,
                        "Stream scan fragment should have only one actor"
                    );
                }
            }
        }

        let mut found_cdc_scan = false;
        match &stream_scan_fragment.nodes.node_body {
            Some(NodeBody::StreamCdcScan(_)) => {
                assert_parallelism(stream_scan_fragment, &stream_scan_fragment.nodes.node_body);
                if self
                    .validate_cdc_table_inner(&stream_scan_fragment.nodes.node_body, table.id)
                    .await?
                {
                    found_cdc_scan = true;
                }
            }
            // The CDC scan may be wrapped in a `Project` node, so check its inputs as well.
            Some(NodeBody::Project(_)) => {
                for input in &stream_scan_fragment.nodes.input {
                    assert_parallelism(stream_scan_fragment, &input.node_body);
                    if self
                        .validate_cdc_table_inner(&input.node_body, table.id)
                        .await?
                    {
                        found_cdc_scan = true;
                    }
                }
            }
            _ => {
                bail!("Unexpected node body for stream cdc scan");
            }
        };
        if !found_cdc_scan {
            bail!("No stream cdc scan node found in stream scan fragment");
        }
        Ok(())
    }

    async fn validate_cdc_table_inner(
        &self,
        node_body: &Option<NodeBody>,
        table_id: TableId,
    ) -> MetaResult<bool> {
        if let Some(NodeBody::StreamCdcScan(stream_cdc_scan)) = node_body
            && let Some(ref cdc_table_desc) = stream_cdc_scan.cdc_table_desc
        {
            let options_with_secret = WithOptionsSecResolved::new(
                cdc_table_desc.connect_properties.clone(),
                cdc_table_desc.secret_refs.clone(),
            );

            let mut props = ConnectorProperties::extract(options_with_secret, true)?;
            props.init_from_pb_cdc_table_desc(cdc_table_desc);

            // Try creating a split enumerator to validate connectivity to the upstream.
            let _enumerator = props
                .create_split_enumerator(SourceEnumeratorContext::dummy().into())
                .await?;

            tracing::debug!(?table_id, "validate cdc table success");
            Ok(true)
        } else {
            Ok(false)
        }
    }

    pub async fn validate_table_for_sink(&self, table_id: TableId) -> MetaResult<()> {
        let migrated = self
            .metadata_manager
            .catalog_controller
            .has_table_been_migrated(table_id)
            .await?;
        if !migrated {
            Err(anyhow::anyhow!(
                "Creating sink into table is not allowed for unmigrated table {}. Please migrate it first.",
                table_id
            )
            .into())
        } else {
            Ok(())
        }
    }

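    /// Creates the job catalog, builds the fragment graph into an actor graph, and hands the
    /// job to the stream manager. On failure, the creating job is aborted and any source it
    /// registered is dropped. With `if_not_exists`, a duplicate foreground job waits for the
    /// existing job to finish instead of erroring.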
    #[await_tree::instrument(boxed, "create_streaming_job({streaming_job})")]
    pub async fn create_streaming_job(
        &self,
        mut streaming_job: StreamingJob,
        fragment_graph: StreamFragmentGraphProto,
        dependencies: HashSet<ObjectId>,
        resource_type: streaming_job_resource_type::ResourceType,
        if_not_exists: bool,
    ) -> MetaResult<NotificationVersion> {
        if let StreamingJob::Sink(sink) = &streaming_job
            && let Some(target_table) = sink.target_table
        {
            self.validate_table_for_sink(target_table).await?;
        }
        let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());
        let check_ret = self
            .metadata_manager
            .catalog_controller
            .create_job_catalog(
                &mut streaming_job,
                &ctx,
                &fragment_graph.parallelism,
                fragment_graph.max_parallelism as _,
                dependencies,
                resource_type.clone(),
                &fragment_graph.backfill_parallelism,
            )
            .await;
        if let Err(meta_err) = check_ret {
            if !if_not_exists {
                return Err(meta_err);
            }
            return if let MetaErrorInner::Duplicated(_, _, Some(job_id)) = meta_err.inner() {
                if streaming_job.create_type() == CreateType::Foreground {
                    let database_id = streaming_job.database_id();
                    self.metadata_manager
                        .wait_streaming_job_finished(database_id, *job_id)
                        .await
                } else {
                    Ok(IGNORED_NOTIFICATION_VERSION)
                }
            } else {
                Err(meta_err)
            };
        }
        let job_id = streaming_job.id();
        tracing::debug!(
            id = %job_id,
            definition = streaming_job.definition(),
            create_type = streaming_job.create_type().as_str_name(),
            job_type = ?streaming_job.job_type(),
            "starting streaming job",
        );
        let permit = self
            .creating_streaming_job_permits
            .semaphore
            .clone()
            .acquire_owned()
            .instrument_await("acquire_creating_streaming_job_permit")
            .await
            .unwrap();
        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;

        let name = streaming_job.name();
        let definition = streaming_job.definition();
        let source_id = match &streaming_job {
            StreamingJob::Table(Some(src), _, _) | StreamingJob::Source(src) => Some(src.id),
            _ => None,
        };

        // Build and start the streaming job; clean up the catalog on failure.
        match self
            .create_streaming_job_inner(ctx, streaming_job, fragment_graph, resource_type, permit)
            .await
        {
            Ok(version) => Ok(version),
            Err(err) => {
                tracing::error!(id = %job_id, error = %err.as_report(), "failed to create streaming job");
                let event = risingwave_pb::meta::event_log::EventCreateStreamJobFail {
                    id: job_id,
                    name,
                    definition,
                    error: err.as_report().to_string(),
                };
                self.env.event_log_manager_ref().add_event_logs(vec![
                    risingwave_pb::meta::event_log::Event::CreateStreamJobFail(event),
                ]);
                let (aborted, _) = self
                    .metadata_manager
                    .catalog_controller
                    .try_abort_creating_streaming_job(job_id, false)
                    .await?;
                if aborted {
                    tracing::warn!(id = %job_id, "aborted streaming job");
                    // Drop the source if the job was a source or a table with a connector.
                    if let Some(source_id) = source_id {
                        self.source_manager
                            .apply_source_change(SourceChange::DropSource {
                                dropped_source_ids: vec![source_id],
                            })
                            .await;
                    }
                }
                Err(err)
            }
        }
    }

    #[await_tree::instrument(boxed)]
    async fn create_streaming_job_inner(
        &self,
        ctx: StreamContext,
        mut streaming_job: StreamingJob,
        fragment_graph: StreamFragmentGraphProto,
        resource_type: streaming_job_resource_type::ResourceType,
        permit: OwnedSemaphorePermit,
    ) -> MetaResult<NotificationVersion> {
        let mut fragment_graph =
            StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
        streaming_job.set_info_from_graph(&fragment_graph);

        // Create catalogs for the internal tables and refill their ids into the graph.
        let incomplete_internal_tables = fragment_graph
            .incomplete_internal_tables()
            .into_values()
            .collect_vec();
        let table_id_map = self
            .metadata_manager
            .catalog_controller
            .create_internal_table_catalog(&streaming_job, incomplete_internal_tables)
            .await?;
        fragment_graph.refill_internal_table_ids(table_id_map);

        tracing::debug!(id = %streaming_job.id(), "building streaming job");
        let (ctx, stream_job_fragments) = self
            .build_stream_job(ctx, streaming_job, fragment_graph, resource_type)
            .await?;

        let streaming_job = &ctx.streaming_job;

        match streaming_job {
            StreamingJob::Table(None, table, TableJobType::SharedCdcSource) => {
                self.validate_cdc_table(table, &stream_job_fragments)
                    .await?;
            }
            StreamingJob::Table(Some(source), ..) => {
                // Register the source on the source manager.
                self.source_manager.register_source(source).await?;
                let connector_name = source
                    .get_with_properties()
                    .get(UPSTREAM_SOURCE_KEY)
                    .cloned();
                let attr = source.info.as_ref().map(|source_info| {
                    jsonbb::json!({
                        "format": source_info.format().as_str_name(),
                        "encode": source_info.row_encode().as_str_name(),
                    })
                });
                report_create_object(
                    streaming_job.id(),
                    "source",
                    PbTelemetryDatabaseObject::Source,
                    connector_name,
                    attr,
                );
            }
            StreamingJob::Sink(sink) => {
                if sink.auto_refresh_schema_from_table.is_some() {
                    check_sink_fragments_support_refresh_schema(&stream_job_fragments.fragments)?
                }
                // Validate the sink against the external system before it goes online.
                validate_sink(sink).await?;
                let connector_name = sink.get_properties().get(UPSTREAM_SOURCE_KEY).cloned();
                let attr = sink.format_desc.as_ref().map(|sink_info| {
                    jsonbb::json!({
                        "format": sink_info.format().as_str_name(),
                        "encode": sink_info.encode().as_str_name(),
                    })
                });
                report_create_object(
                    streaming_job.id(),
                    "sink",
                    PbTelemetryDatabaseObject::Sink,
                    connector_name,
                    attr,
                );
            }
            StreamingJob::Source(source) => {
                self.source_manager.register_source(source).await?;
                let connector_name = source
                    .get_with_properties()
                    .get(UPSTREAM_SOURCE_KEY)
                    .cloned();
                let attr = source.info.as_ref().map(|source_info| {
                    jsonbb::json!({
                        "format": source_info.format().as_str_name(),
                        "encode": source_info.row_encode().as_str_name(),
                    })
                });
                report_create_object(
                    streaming_job.id(),
                    "source",
                    PbTelemetryDatabaseObject::Source,
                    connector_name,
                    attr,
                );
            }
            _ => {}
        }

        let backfill_orders = ctx.fragment_backfill_ordering.to_meta_model();
        self.metadata_manager
            .catalog_controller
            .prepare_stream_job_fragments(
                &stream_job_fragments,
                streaming_job,
                false,
                Some(backfill_orders),
            )
            .await?;

        let version = self
            .stream_manager
            .create_streaming_job(stream_job_fragments, ctx, permit)
            .await?;

        Ok(version)
    }

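    /// Drops an object from the catalog and releases everything it owned: streaming jobs,
    /// state tables, sources, secrets, sink coordinators and, for Iceberg table sinks, the
    /// backing Iceberg tables.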
    pub async fn drop_object(
        &self,
        object_type: ObjectType,
        object_id: impl Into<ObjectId>,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        let object_id = object_id.into();
        let (release_ctx, version) = self
            .metadata_manager
            .catalog_controller
            .drop_object(object_type, object_id, drop_mode)
            .await?;

        if object_type == ObjectType::Source {
            self.env
                .notification_manager_ref()
                .notify_local_subscribers(LocalNotification::SourceDropped(object_id));
        }

        let ReleaseContext {
            database_id,
            removed_streaming_job_ids,
            removed_state_table_ids,
            removed_source_ids,
            removed_secret_ids: secret_ids,
            removed_source_fragments,
            removed_actors,
            removed_fragments,
            removed_sink_fragment_by_targets,
            removed_iceberg_table_sinks,
        } = release_ctx;

        let _guard = self.source_manager.pause_tick().await;
        self.stream_manager
            .drop_streaming_jobs(
                database_id,
                removed_actors.iter().map(|id| *id as _).collect(),
                removed_streaming_job_ids,
                removed_state_table_ids,
                removed_fragments.iter().map(|id| *id as _).collect(),
                removed_sink_fragment_by_targets
                    .into_iter()
                    .map(|(target, sinks)| {
                        (target as _, sinks.into_iter().map(|id| id as _).collect())
                    })
                    .collect(),
            )
            .await;

        self.source_manager
            .apply_source_change(SourceChange::DropSource {
                dropped_source_ids: removed_source_ids.into_iter().map(|id| id as _).collect(),
            })
            .await;

        let dropped_source_fragments = removed_source_fragments;
        self.source_manager
            .apply_source_change(SourceChange::DropMv {
                dropped_source_fragments,
            })
            .await;

        let iceberg_sink_ids: Vec<SinkId> = removed_iceberg_table_sinks
            .iter()
            .map(|sink| sink.id)
            .collect();

        for sink in removed_iceberg_table_sinks {
            let sink_param = SinkParam::try_from_sink_catalog(sink.into())
                .expect("Iceberg sink should be valid");
            let iceberg_sink =
                IcebergSink::try_from(sink_param).expect("Iceberg sink should be valid");
            if let Ok(iceberg_catalog) = iceberg_sink.config.create_catalog().await {
                let table_identifier = iceberg_sink.config.full_table_name().unwrap();
                tracing::info!(
                    "dropping iceberg table {} for dropped sink",
                    table_identifier
                );

                let _ = iceberg_catalog
                    .drop_table(&table_identifier)
                    .await
                    .inspect_err(|err| {
                        tracing::error!(
                            "failed to drop iceberg table {} during cleanup: {}",
                            table_identifier,
                            err.as_report()
                        );
                    });
            }
        }

        if !iceberg_sink_ids.is_empty() {
            self.sink_manager
                .stop_sink_coordinator(iceberg_sink_ids.clone())
                .await;

            for sink_id in iceberg_sink_ids {
                self.iceberg_compaction_manager
                    .clear_iceberg_commits_by_sink_id(sink_id);
            }
        }

        for secret in secret_ids {
            LocalSecretManager::global().remove_secret(secret);
        }
        Ok(version)
    }

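    /// Replaces an existing streaming job (`ALTER`-style schema change) by building the new job
    /// under a temporary id and switching it in when it is ready. Tables with
    /// auto-refresh-schema sinks also get those sinks rewritten to the new schema. On failure
    /// the temporary job (and any temporary sinks) are aborted.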
    #[await_tree::instrument(boxed, "replace_streaming_job({streaming_job})")]
    pub async fn replace_job(
        &self,
        mut streaming_job: StreamingJob,
        fragment_graph: StreamFragmentGraphProto,
    ) -> MetaResult<NotificationVersion> {
        match &streaming_job {
            StreamingJob::Table(..)
            | StreamingJob::Source(..)
            | StreamingJob::MaterializedView(..) => {}
            StreamingJob::Sink(..) | StreamingJob::Index(..) => {
                bail_not_implemented!("schema change for {}", streaming_job.job_type_str())
            }
        }

        let job_id = streaming_job.id();

        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
        let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());

        // The max parallelism of a job cannot be changed by replacing it, so keep the original.
        let original_max_parallelism = self
            .metadata_manager
            .get_job_max_parallelism(streaming_job.id())
            .await?;
        let fragment_graph = PbStreamFragmentGraph {
            max_parallelism: original_max_parallelism as _,
            ..fragment_graph
        };

        let fragment_graph = StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
        streaming_job.set_info_from_graph(&fragment_graph);

        // Rebind as immutable: the job is fully configured from here on.
        let streaming_job = streaming_job;

        let auto_refresh_schema_sinks = if let StreamingJob::Table(_, table, _) = &streaming_job {
            let auto_refresh_schema_sinks = self
                .metadata_manager
                .catalog_controller
                .get_sink_auto_refresh_schema_from(table.id)
                .await?;
            if !auto_refresh_schema_sinks.is_empty() {
                let original_table_columns = self
                    .metadata_manager
                    .catalog_controller
                    .get_table_columns(table.id)
                    .await?;
                let mut original_table_column_ids: HashSet<_> = original_table_columns
                    .iter()
                    .map(|col| col.column_id())
                    .collect();
                let newly_added_columns = table
                    .columns
                    .iter()
                    .filter(|col| {
                        !original_table_column_ids.remove(&ColumnId::new(
                            col.column_desc.as_ref().unwrap().column_id as _,
                        ))
                    })
                    .map(|col| ColumnCatalog::from(col.clone()))
                    .collect_vec();
                if !original_table_column_ids.is_empty() {
                    return Err(anyhow!(
                        "new table columns do not contain all original columns. new: {:?}, original: {:?}, not included: {:?}",
                        table.columns,
                        original_table_columns,
                        original_table_column_ids
                    )
                    .into());
                }
                let mut sinks = Vec::with_capacity(auto_refresh_schema_sinks.len());
                for sink in auto_refresh_schema_sinks {
                    let sink_job_fragments = self
                        .metadata_manager
                        .get_job_fragments_by_id(sink.id.as_job_id())
                        .await?;
                    if sink_job_fragments.fragments.len() != 1 {
                        return Err(anyhow!(
                            "auto schema refresh sink must have only one fragment, but got {}",
                            sink_job_fragments.fragments.len()
                        )
                        .into());
                    }
                    let original_sink_fragment =
                        sink_job_fragments.fragments.into_values().next().unwrap();
                    let (new_sink_fragment, new_schema, new_log_store_table) =
                        rewrite_refresh_schema_sink_fragment(
                            &original_sink_fragment,
                            &sink,
                            &newly_added_columns,
                            table,
                            fragment_graph.table_fragment_id(),
                            self.env.id_gen_manager(),
                            self.env.actor_id_generator(),
                        )?;

                    assert_eq!(
                        original_sink_fragment.actors.len(),
                        new_sink_fragment.actors.len()
                    );
                    // Schedule the new sink actors on the same workers as the original ones.
                    let actor_status = (0..original_sink_fragment.actors.len())
                        .map(|i| {
                            let worker_node_id = sink_job_fragments.actor_status
                                [&original_sink_fragment.actors[i].actor_id]
                                .location
                                .as_ref()
                                .unwrap()
                                .worker_node_id;
                            (
                                new_sink_fragment.actors[i].actor_id,
                                PbActorStatus {
                                    location: Some(PbActorLocation { worker_node_id }),
                                },
                            )
                        })
                        .collect();

                    let streaming_job = StreamingJob::Sink(sink);

                    let tmp_sink_id = self
                        .metadata_manager
                        .catalog_controller
                        .create_job_catalog_for_replace(&streaming_job, None, None, None)
                        .await?
                        .as_sink_id();
                    let StreamingJob::Sink(sink) = streaming_job else {
                        unreachable!()
                    };

                    sinks.push(AutoRefreshSchemaSinkContext {
                        tmp_sink_id,
                        original_sink: sink,
                        original_fragment: original_sink_fragment,
                        new_schema,
                        newly_add_fields: newly_added_columns
                            .iter()
                            .map(|col| Field::from(&col.column_desc))
                            .collect(),
                        new_fragment: new_sink_fragment,
                        new_log_store_table,
                        actor_status,
                    });
                }
                Some(sinks)
            } else {
                None
            }
        } else {
            None
        };

        let tmp_id = self
            .metadata_manager
            .catalog_controller
            .create_job_catalog_for_replace(
                &streaming_job,
                Some(&ctx),
                fragment_graph.specified_parallelism().as_ref(),
                Some(fragment_graph.max_parallelism()),
            )
            .await?;

        let tmp_sink_ids = auto_refresh_schema_sinks.as_ref().map(|sinks| {
            sinks
                .iter()
                .map(|sink| sink.tmp_sink_id.as_object_id())
                .collect_vec()
        });

        tracing::debug!(id = %job_id, "building replace streaming job");
        let mut updated_sink_catalogs = vec![];

        let mut drop_table_connector_ctx = None;
        let result: MetaResult<_> = try {
            let (mut ctx, mut stream_job_fragments) = self
                .build_replace_job(
                    ctx,
                    &streaming_job,
                    fragment_graph,
                    tmp_id,
                    auto_refresh_schema_sinks,
                )
                .await?;
            drop_table_connector_ctx = ctx.drop_table_connector_ctx.clone();
            let auto_refresh_schema_sink_finish_ctx =
                ctx.auto_refresh_schema_sinks.as_ref().map(|sinks| {
                    sinks
                        .iter()
                        .map(|sink| FinishAutoRefreshSchemaSinkContext {
                            tmp_sink_id: sink.tmp_sink_id,
                            original_sink_id: sink.original_sink.id,
                            columns: sink.new_schema.clone(),
                            new_log_store_table: sink
                                .new_log_store_table
                                .as_ref()
                                .map(|table| (table.id, table.columns.clone())),
                        })
                        .collect()
                });

            // Handle sinks that sink into the table.
            if let StreamingJob::Table(_, table, ..) = &streaming_job {
                let union_fragment = stream_job_fragments.inner.union_fragment_for_table();
                let upstream_infos = self
                    .metadata_manager
                    .catalog_controller
                    .get_all_upstream_sink_infos(table, union_fragment.fragment_id as _)
                    .await?;
                refill_upstream_sink_union_in_table(&mut union_fragment.nodes, &upstream_infos);

                for upstream_info in &upstream_infos {
                    let upstream_fragment_id = upstream_info.sink_fragment_id;
                    ctx.upstream_fragment_downstreams
                        .entry(upstream_fragment_id)
                        .or_default()
                        .push(upstream_info.new_sink_downstream.clone());
                    if upstream_info.sink_original_target_columns.is_empty() {
                        updated_sink_catalogs.push(upstream_info.sink_id);
                    }
                }
            }

            let replace_upstream = ctx.replace_upstream.clone();

            if let Some(sinks) = &ctx.auto_refresh_schema_sinks {
                let empty_downstreams = FragmentDownstreamRelation::default();
                for sink in sinks {
                    self.metadata_manager
                        .catalog_controller
                        .prepare_streaming_job(
                            sink.tmp_sink_id.as_job_id(),
                            || [&sink.new_fragment].into_iter(),
                            &empty_downstreams,
                            true,
                            None,
                            None,
                        )
                        .await?;
                }
            }

            self.metadata_manager
                .catalog_controller
                .prepare_stream_job_fragments(&stream_job_fragments, &streaming_job, true, None)
                .await?;

            self.stream_manager
                .replace_stream_job(stream_job_fragments, ctx)
                .await?;
            (replace_upstream, auto_refresh_schema_sink_finish_ctx)
        };

        match result {
            Ok((replace_upstream, auto_refresh_schema_sink_finish_ctx)) => {
                let version = self
                    .metadata_manager
                    .catalog_controller
                    .finish_replace_streaming_job(
                        tmp_id,
                        streaming_job,
                        replace_upstream,
                        SinkIntoTableContext {
                            updated_sink_catalogs,
                        },
                        drop_table_connector_ctx.as_ref(),
                        auto_refresh_schema_sink_finish_ctx,
                    )
                    .await?;
                if let Some(drop_table_connector_ctx) = &drop_table_connector_ctx {
                    self.source_manager
                        .apply_source_change(SourceChange::DropSource {
                            dropped_source_ids: vec![drop_table_connector_ctx.to_remove_source_id],
                        })
                        .await;
                }
                Ok(version)
            }
            Err(err) => {
                tracing::error!(id = %job_id, error = ?err.as_report(), "failed to replace job");
                let _ = self
                    .metadata_manager
                    .catalog_controller
                    .try_abort_replacing_streaming_job(tmp_id, tmp_sink_ids)
                    .await
                    .inspect_err(|err| {
                        tracing::error!(id = %job_id, error = ?err.as_report(), "failed to abort replacing job");
                    });
                Err(err)
            }
        }
    }

    #[await_tree::instrument(boxed, "drop_streaming_job{}({job_id})", if let DropMode::Cascade = drop_mode { "_cascade" } else { "" })]
    async fn drop_streaming_job(
        &self,
        job_id: StreamingJobId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;

        let (object_id, object_type) = match job_id {
            StreamingJobId::MaterializedView(id) => (id.as_object_id(), ObjectType::Table),
            StreamingJobId::Sink(id) => (id.as_object_id(), ObjectType::Sink),
            StreamingJobId::Table(_, id) => (id.as_object_id(), ObjectType::Table),
            StreamingJobId::Index(idx) => (idx.as_object_id(), ObjectType::Index),
        };

        let job_status = self
            .metadata_manager
            .catalog_controller
            .get_streaming_job_status(job_id.id())
            .await?;
        let version = match job_status {
            JobStatus::Initial => {
                unreachable!(
                    "jobs with `Initial` status are never notified to the frontend, so a drop request cannot arrive here"
                );
            }
            JobStatus::Creating => {
                self.stream_manager
                    .cancel_streaming_jobs(vec![job_id.id()])
                    .await?;
                IGNORED_NOTIFICATION_VERSION
            }
            JobStatus::Created => self.drop_object(object_type, object_id, drop_mode).await?,
        };

        Ok(version)
    }

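    /// Resolves the parallelism a new streaming job should be scheduled with: a user-specified
    /// value wins (but must not exceed `max`); otherwise the configured default or the available
    /// slots of the resource group are used, capped by `max`.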
    fn resolve_stream_parallelism(
        &self,
        specified: Option<NonZeroUsize>,
        max: NonZeroUsize,
        cluster_info: &StreamingClusterInfo,
        resource_group: String,
    ) -> MetaResult<NonZeroUsize> {
        let available = NonZeroUsize::new(cluster_info.parallelism(&resource_group));
        DdlController::resolve_stream_parallelism_inner(
            specified,
            max,
            available,
            &self.env.opts.default_parallelism,
            &resource_group,
        )
    }

    fn resolve_stream_parallelism_inner(
        specified: Option<NonZeroUsize>,
        max: NonZeroUsize,
        available: Option<NonZeroUsize>,
        default_parallelism: &DefaultParallelism,
        resource_group: &str,
    ) -> MetaResult<NonZeroUsize> {
        let Some(available) = available else {
            bail_unavailable!(
                "no available slots to schedule in resource group \"{}\", \
                 have you allocated any compute nodes within this resource group?",
                resource_group
            );
        };

        if let Some(specified) = specified {
            if specified > max {
                bail_invalid_parameter!(
                    "specified parallelism {} should not exceed max parallelism {}",
                    specified,
                    max,
                );
            }
            if specified > available {
                tracing::warn!(
                    resource_group,
                    specified_parallelism = specified.get(),
                    available_parallelism = available.get(),
                    "specified parallelism exceeds available slots, scheduling with specified value",
                );
            }
            return Ok(specified);
        }

        // No explicit parallelism: fall back to the configured default, or all available slots.
        let default_parallelism = match default_parallelism {
            DefaultParallelism::Full => available,
            DefaultParallelism::Default(num) => {
                if *num > available {
                    tracing::warn!(
                        resource_group,
                        configured_parallelism = num.get(),
                        available_parallelism = available.get(),
                        "default parallelism exceeds available slots, scheduling with configured value",
                    );
                }
                *num
            }
        };

        if default_parallelism > max {
            tracing::warn!(
                max_parallelism = max.get(),
                resource_group,
                "default parallelism exceeds max parallelism, capping to max",
            );
        }
        Ok(default_parallelism.min(max))
    }

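    /// Builds a [`StreamingJob`] into fragments and actors: completes the fragment graph with
    /// its upstream root fragments, resolves parallelism and resource group, runs the actor
    /// graph builder, and assembles the [`CreateStreamingJobContext`] consumed by the stream
    /// manager.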
1704 #[await_tree::instrument]
1710 pub(crate) async fn build_stream_job(
1711 &self,
1712 stream_ctx: StreamContext,
1713 mut stream_job: StreamingJob,
1714 fragment_graph: StreamFragmentGraph,
1715 resource_type: streaming_job_resource_type::ResourceType,
1716 ) -> MetaResult<(CreateStreamingJobContext, StreamJobFragmentsToCreate)> {
1717 let id = stream_job.id();
1718 let specified_parallelism = fragment_graph.specified_parallelism();
1719 let specified_backfill_parallelism = fragment_graph.specified_backfill_parallelism();
1720 let max_parallelism = NonZeroUsize::new(fragment_graph.max_parallelism()).unwrap();
1721
1722 let fragment_backfill_ordering = fragment_graph.create_fragment_backfill_ordering();
1724
1725 let (snapshot_backfill_info, cross_db_snapshot_backfill_info) =
1729 fragment_graph.collect_snapshot_backfill_info()?;
1730 assert!(
1731 snapshot_backfill_info
1732 .iter()
1733 .chain([&cross_db_snapshot_backfill_info])
1734 .flat_map(|info| info.upstream_mv_table_id_to_backfill_epoch.values())
1735 .all(|backfill_epoch| backfill_epoch.is_none()),
1736 "should not set backfill epoch when initially build the job: {:?} {:?}",
1737 snapshot_backfill_info,
1738 cross_db_snapshot_backfill_info
1739 );
1740
1741 let locality_fragment_state_table_mapping =
1742 fragment_graph.find_locality_provider_fragment_state_table_mapping();
1743
1744 self.metadata_manager
1746 .catalog_controller
1747 .validate_cross_db_snapshot_backfill(&cross_db_snapshot_backfill_info)
1748 .await?;
1749
1750 let upstream_table_ids = fragment_graph
1751 .dependent_table_ids()
1752 .iter()
1753 .filter(|id| {
1754 !cross_db_snapshot_backfill_info
1755 .upstream_mv_table_id_to_backfill_epoch
1756 .contains_key(*id)
1757 })
1758 .cloned()
1759 .collect();
1760
1761 let (upstream_root_fragments, existing_actor_location) = self
1762 .metadata_manager
1763 .get_upstream_root_fragments(&upstream_table_ids)
1764 .await?;
1765
1766 if snapshot_backfill_info.is_some() {
1767 match stream_job {
1768 StreamingJob::MaterializedView(_)
1769 | StreamingJob::Sink(_)
1770 | StreamingJob::Index(_, _) => {}
1771 StreamingJob::Table(_, _, _) | StreamingJob::Source(_) => {
1772 return Err(
1773 anyhow!("snapshot_backfill not enabled for table and source").into(),
1774 );
1775 }
1776 }
1777 }
1778
1779 let upstream_actors = upstream_root_fragments
1780 .values()
1781 .map(|(fragment, _)| {
1782 (
1783 fragment.fragment_id,
1784 fragment.actors.keys().copied().collect(),
1785 )
1786 })
1787 .collect();
1788
1789 let complete_graph = CompleteStreamFragmentGraph::with_upstreams(
1790 fragment_graph,
1791 FragmentGraphUpstreamContext {
1792 upstream_root_fragments,
1793 upstream_actor_location: existing_actor_location,
1794 },
1795 (&stream_job).into(),
1796 )?;
1797 let resource_group = if let Some(group) = resource_type.resource_group() {
1798 group
1799 } else {
1800 self.metadata_manager
1801 .get_database_resource_group(stream_job.database_id())
1802 .await?
1803 };
1804 let is_serverless_backfill = matches!(
1805 &resource_type,
1806 streaming_job_resource_type::ResourceType::ServerlessBackfillResourceGroup(_)
1807 );
1808
1809 let cluster_info = self.metadata_manager.get_streaming_cluster_info().await?;
1811
1812 let initial_parallelism = specified_backfill_parallelism.or(specified_parallelism);
1813 let parallelism = self.resolve_stream_parallelism(
1814 initial_parallelism,
1815 max_parallelism,
1816 &cluster_info,
1817 resource_group.clone(),
1818 )?;
1819
1820 let parallelism = if initial_parallelism.is_some() {
1821 parallelism.get()
1822 } else {
1823 let adaptive_strategy = match stream_ctx.adaptive_parallelism_strategy {
1825 Some(strategy) => strategy,
1826 None => self
1827 .env
1828 .system_params_reader()
1829 .await
1830 .adaptive_parallelism_strategy(),
1831 };
1832 adaptive_strategy.compute_target_parallelism(parallelism.get())
1833 };
1834
1835 let parallelism = NonZeroUsize::new(parallelism).expect("parallelism must be positive");
1836 let actor_graph_builder = ActorGraphBuilder::new(
1837 id,
1838 resource_group,
1839 complete_graph,
1840 cluster_info,
1841 parallelism,
1842 )?;
1843
1844 let ActorGraphBuildResult {
1845 graph,
1846 downstream_fragment_relations,
1847 building_locations,
1848 upstream_fragment_downstreams,
1849 new_no_shuffle,
1850 replace_upstream,
1851 ..
1852 } = actor_graph_builder.generate_graph(&self.env, &stream_job, stream_ctx.clone())?;
1853 assert!(replace_upstream.is_empty());
1854
1855 let table_parallelism = match (specified_parallelism, &self.env.opts.default_parallelism) {
1862 (None, DefaultParallelism::Full) => TableParallelism::Adaptive,
1863 _ => TableParallelism::Fixed(parallelism.get()),
1864 };
1865
1866 let stream_job_fragments = StreamJobFragments::new(
1867 id,
1868 graph,
1869 &building_locations.actor_locations,
1870 stream_ctx.clone(),
1871 table_parallelism,
1872 max_parallelism.get(),
1873 );
1874
        if let Some(mview_fragment) = stream_job_fragments.mview_fragment() {
            stream_job.set_table_vnode_count(mview_fragment.vnode_count());
        }

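        // For a sink that writes into a table, precompute the info needed to attach
        // the sink fragment to the target table's `UpstreamSinkUnion`.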
        let new_upstream_sink = if let StreamingJob::Sink(sink) = &stream_job
            && let Ok(table_id) = sink.get_target_table()
        {
            let tables = self
                .metadata_manager
                .get_table_catalog_by_ids(&[*table_id])
                .await?;
            let target_table = tables
                .first()
                .ok_or_else(|| MetaError::catalog_id_not_found("table", *table_id))?;
            let sink_fragment = stream_job_fragments
                .sink_fragment()
                .ok_or_else(|| anyhow::anyhow!("sink fragment not found for sink {}", sink.id))?;
            let mview_fragment_id = self
                .metadata_manager
                .catalog_controller
                .get_mview_fragment_by_id(table_id.as_job_id())
                .await?;
            let upstream_sink_info = build_upstream_sink_info(
                sink.id,
                sink.original_target_columns.clone(),
                sink_fragment.fragment_id as _,
                target_table,
                mview_fragment_id,
            )?;
            Some(upstream_sink_info)
        } else {
            None
        };

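        // For a shared-CDC table with a parallelized backfill fragment, initialize
        // the table's snapshot splits before creating the job.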
        let mut cdc_table_snapshot_splits = None;
        if let StreamingJob::Table(None, table, TableJobType::SharedCdcSource) = &stream_job
            && let Some((_, stream_cdc_scan)) =
                parallel_cdc_table_backfill_fragment(stream_job_fragments.fragments.values())
        {
            let splits = try_init_parallel_cdc_table_snapshot_splits(
                table.id,
                stream_cdc_scan.cdc_table_desc.as_ref().unwrap(),
                self.env.meta_store_ref(),
                stream_cdc_scan.options.as_ref().unwrap(),
                self.env.opts.cdc_table_split_init_insert_batch_size,
                self.env.opts.cdc_table_split_init_sleep_interval_splits,
                self.env.opts.cdc_table_split_init_sleep_duration_millis,
            )
            .await?;
            cdc_table_snapshot_splits = Some(splits);
        }

        let ctx = CreateStreamingJobContext {
            upstream_fragment_downstreams,
            new_no_shuffle,
            upstream_actors,
            building_locations,
            definition: stream_job.definition(),
            create_type: stream_job.create_type(),
            job_type: (&stream_job).into(),
            streaming_job: stream_job,
            new_upstream_sink,
            option: CreateStreamingJobOption {},
            snapshot_backfill_info,
            cross_db_snapshot_backfill_info,
            fragment_backfill_ordering,
            locality_fragment_state_table_mapping,
            cdc_table_snapshot_splits,
            is_serverless_backfill,
        };

        Ok((
            ctx,
            StreamJobFragmentsToCreate {
                inner: stream_job_fragments,
                downstreams: downstream_fragment_relations,
            },
        ))
    }

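    /// Builds the new fragments and replace context for an existing streaming job,
    /// used when its definition changes. Supported for tables, sources and
    /// materialized views; sinks and indexes bail out as not implemented.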
    pub(crate) async fn build_replace_job(
        &self,
        stream_ctx: StreamContext,
        stream_job: &StreamingJob,
        mut fragment_graph: StreamFragmentGraph,
        tmp_job_id: JobId,
        auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,
    ) -> MetaResult<(ReplaceStreamJobContext, StreamJobFragmentsToCreate)> {
        match &stream_job {
            StreamingJob::Table(..)
            | StreamingJob::Source(..)
            | StreamingJob::MaterializedView(..) => {}
            StreamingJob::Sink(..) | StreamingJob::Index(..) => {
                bail_not_implemented!("schema change for {}", stream_job.job_type_str())
            }
        }

        let id = stream_job.id();

        let mut drop_table_associated_source_id = None;
        if let StreamingJob::Table(None, _, _) = &stream_job {
            drop_table_associated_source_id = self
                .metadata_manager
                .get_table_associated_source_id(id.as_mv_table_id())
                .await?;
        }

        let old_fragments = self.metadata_manager.get_job_fragments_by_id(id).await?;
        let old_internal_table_ids = old_fragments.internal_table_ids();

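        // Fit the internal tables of the new graph onto the old job's: dropping the
        // associated source is handled specially, materialized views go through a
        // full state-graph match, and everything else maps trivially.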
        let mut drop_table_connector_ctx = None;
        if let Some(to_remove_source_id) = drop_table_associated_source_id {
            debug_assert!(old_internal_table_ids.len() == 1);

            drop_table_connector_ctx = Some(DropTableConnectorContext {
                to_change_streaming_job_id: id,
                to_remove_state_table_id: old_internal_table_ids[0],
                to_remove_source_id,
            });
        } else if stream_job.is_materialized_view() {
            let old_fragments_upstreams = self
                .metadata_manager
                .catalog_controller
                .upstream_fragments(old_fragments.fragment_ids())
                .await?;

            let old_state_graph =
                state_match::Graph::from_existing(&old_fragments, &old_fragments_upstreams);
            let new_state_graph = state_match::Graph::from_building(&fragment_graph);
            let result = state_match::match_graph(&new_state_graph, &old_state_graph)
                .context("incompatible altering on the streaming job states")?;

            fragment_graph.fit_internal_table_ids_with_mapping(result.table_matches);
            fragment_graph.fit_snapshot_backfill_epochs(result.snapshot_backfill_epochs);
        } else {
            let old_internal_tables = self
                .metadata_manager
                .get_table_catalog_by_ids(&old_internal_table_ids)
                .await?;
            fragment_graph.fit_internal_tables_trivial(old_internal_tables)?;
        }

        let original_root_fragment = old_fragments
            .root_fragment()
            .expect("root fragment not found");

        let job_type = StreamingJobType::from(stream_job);

        let (mut downstream_fragments, mut downstream_actor_location) =
            self.metadata_manager.get_downstream_fragments(id).await?;

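        // If sinks are auto-refreshed together with this schema change, substitute
        // their rebuilt fragments (and actor locations) for the original downstream
        // sink fragments.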
        if let Some(auto_refresh_schema_sinks) = &auto_refresh_schema_sinks {
            let mut remaining_fragment: HashSet<_> = auto_refresh_schema_sinks
                .iter()
                .map(|sink| sink.original_fragment.fragment_id)
                .collect();
            for (_, downstream_fragment, nodes) in &mut downstream_fragments {
                if let Some(sink) = auto_refresh_schema_sinks.iter().find(|sink| {
                    sink.original_fragment.fragment_id == downstream_fragment.fragment_id
                }) {
                    assert!(remaining_fragment.remove(&downstream_fragment.fragment_id));
                    for actor_id in downstream_fragment.actors.keys() {
                        downstream_actor_location.remove(actor_id);
                    }
                    for (actor_id, status) in &sink.actor_status {
                        downstream_actor_location
                            .insert(*actor_id, status.location.as_ref().unwrap().worker_node_id);
                    }

                    *downstream_fragment = (&sink.new_fragment_info(), stream_job.id()).into();
                    *nodes = sink.new_fragment.nodes.clone();
                }
            }
            assert!(remaining_fragment.is_empty());
        }

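        // Tables and sources only need their downstreams reconnected; shared-CDC
        // tables and materialized views also have upstream root fragments to resolve.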
        let complete_graph = match &job_type {
            StreamingJobType::Table(TableJobType::General) | StreamingJobType::Source => {
                CompleteStreamFragmentGraph::with_downstreams(
                    fragment_graph,
                    FragmentGraphDownstreamContext {
                        original_root_fragment_id: original_root_fragment.fragment_id,
                        downstream_fragments,
                        downstream_actor_location,
                    },
                    job_type,
                )?
            }
            StreamingJobType::Table(TableJobType::SharedCdcSource)
            | StreamingJobType::MaterializedView => {
                let (upstream_root_fragments, upstream_actor_location) = self
                    .metadata_manager
                    .get_upstream_root_fragments(fragment_graph.dependent_table_ids())
                    .await?;

                CompleteStreamFragmentGraph::with_upstreams_and_downstreams(
                    fragment_graph,
                    FragmentGraphUpstreamContext {
                        upstream_root_fragments,
                        upstream_actor_location,
                    },
                    FragmentGraphDownstreamContext {
                        original_root_fragment_id: original_root_fragment.fragment_id,
                        downstream_fragments,
                        downstream_actor_location,
                    },
                    job_type,
                )?
            }
            _ => unreachable!(),
        };

        let resource_group = self
            .metadata_manager
            .get_existing_job_resource_group(id)
            .await?;

        let cluster_info = self.metadata_manager.get_streaming_cluster_info().await?;

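        // Keep the parallelism of the original root fragment so the replacement
        // lines up with the existing actor layout.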
        let parallelism = NonZeroUsize::new(original_root_fragment.actors.len())
            .expect("the number of actors in the original root fragment should be greater than 0");

        let actor_graph_builder = ActorGraphBuilder::new(
            id,
            resource_group,
            complete_graph,
            cluster_info,
            parallelism,
        )?;

        let ActorGraphBuildResult {
            graph,
            downstream_fragment_relations,
            building_locations,
            upstream_fragment_downstreams,
            mut replace_upstream,
            new_no_shuffle,
            ..
        } = actor_graph_builder.generate_graph(&self.env, stream_job, stream_ctx.clone())?;

        if matches!(
            job_type,
            StreamingJobType::Source | StreamingJobType::Table(TableJobType::General)
        ) {
            assert!(upstream_fragment_downstreams.is_empty());
        }

        let stream_job_fragments = StreamJobFragments::new(
            tmp_job_id,
            graph,
            &building_locations.actor_locations,
            stream_ctx,
            old_fragments.assigned_parallelism,
            old_fragments.max_parallelism,
        );

        if let Some(sinks) = &auto_refresh_schema_sinks {
            for sink in sinks {
                replace_upstream
                    .remove(&sink.new_fragment.fragment_id)
                    .expect("should exist");
            }
        }

        let ctx = ReplaceStreamJobContext {
            old_fragments,
            replace_upstream,
            new_no_shuffle,
            upstream_fragment_downstreams,
            building_locations,
            streaming_job: stream_job.clone(),
            tmp_id: tmp_job_id,
            drop_table_connector_ctx,
            auto_refresh_schema_sinks,
        };

        Ok((
            ctx,
            StreamJobFragmentsToCreate {
                inner: stream_job_fragments,
                downstreams: downstream_fragment_relations,
            },
        ))
    }

    async fn alter_name(
        &self,
        relation: alter_name_request::Object,
        new_name: &str,
    ) -> MetaResult<NotificationVersion> {
        let (obj_type, id): (ObjectType, ObjectId) = match relation {
            alter_name_request::Object::TableId(id) => (ObjectType::Table, id.into()),
            alter_name_request::Object::ViewId(id) => (ObjectType::View, id.into()),
            alter_name_request::Object::IndexId(id) => (ObjectType::Index, id.into()),
            alter_name_request::Object::SinkId(id) => (ObjectType::Sink, id.into()),
            alter_name_request::Object::SourceId(id) => (ObjectType::Source, id.into()),
            alter_name_request::Object::SchemaId(id) => (ObjectType::Schema, id.into()),
            alter_name_request::Object::DatabaseId(id) => (ObjectType::Database, id.into()),
            alter_name_request::Object::SubscriptionId(id) => (ObjectType::Subscription, id.into()),
        };
        self.metadata_manager
            .catalog_controller
            .alter_name(obj_type, id, new_name)
            .await
    }

    async fn alter_swap_rename(
        &self,
        object: alter_swap_rename_request::Object,
    ) -> MetaResult<NotificationVersion> {
        let (obj_type, src_id, dst_id) = match object {
            alter_swap_rename_request::Object::Schema(_) => unimplemented!("schema swap"),
            alter_swap_rename_request::Object::Table(objs) => {
                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
                (ObjectType::Table, src_id, dst_id)
            }
            alter_swap_rename_request::Object::View(objs) => {
                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
                (ObjectType::View, src_id, dst_id)
            }
            alter_swap_rename_request::Object::Source(objs) => {
                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
                (ObjectType::Source, src_id, dst_id)
            }
            alter_swap_rename_request::Object::Sink(objs) => {
                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
                (ObjectType::Sink, src_id, dst_id)
            }
            alter_swap_rename_request::Object::Subscription(objs) => {
                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
                (ObjectType::Subscription, src_id, dst_id)
            }
        };

        self.metadata_manager
            .catalog_controller
            .alter_swap_rename(obj_type, src_id, dst_id)
            .await
    }

    async fn alter_owner(
        &self,
        object: Object,
        owner_id: UserId,
    ) -> MetaResult<NotificationVersion> {
        let (obj_type, id): (ObjectType, ObjectId) = match object {
            Object::TableId(id) => (ObjectType::Table, id.into()),
            Object::ViewId(id) => (ObjectType::View, id.into()),
            Object::SourceId(id) => (ObjectType::Source, id.into()),
            Object::SinkId(id) => (ObjectType::Sink, id.into()),
            Object::SchemaId(id) => (ObjectType::Schema, id.into()),
            Object::DatabaseId(id) => (ObjectType::Database, id.into()),
            Object::SubscriptionId(id) => (ObjectType::Subscription, id.into()),
            Object::ConnectionId(id) => (ObjectType::Connection, id.into()),
            Object::FunctionId(id) => (ObjectType::Function, id.into()),
            Object::SecretId(id) => (ObjectType::Secret, id.into()),
        };
        self.metadata_manager
            .catalog_controller
            .alter_owner(obj_type, id, owner_id as _)
            .await
    }

    async fn alter_set_schema(
        &self,
        object: alter_set_schema_request::Object,
        new_schema_id: SchemaId,
    ) -> MetaResult<NotificationVersion> {
        let (obj_type, id): (ObjectType, ObjectId) = match object {
            alter_set_schema_request::Object::TableId(id) => (ObjectType::Table, id.into()),
            alter_set_schema_request::Object::ViewId(id) => (ObjectType::View, id.into()),
            alter_set_schema_request::Object::SourceId(id) => (ObjectType::Source, id.into()),
            alter_set_schema_request::Object::SinkId(id) => (ObjectType::Sink, id.into()),
            alter_set_schema_request::Object::FunctionId(id) => (ObjectType::Function, id.into()),
            alter_set_schema_request::Object::ConnectionId(id) => {
                (ObjectType::Connection, id.into())
            }
            alter_set_schema_request::Object::SubscriptionId(id) => {
                (ObjectType::Subscription, id.into())
            }
        };
        self.metadata_manager
            .catalog_controller
            .alter_schema(obj_type, id, new_schema_id)
            .await
    }

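    /// Waits until there are no background-creating streaming jobs left, polling
    /// roughly every millisecond and giving up after 30 minutes.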
    pub async fn wait(&self) -> MetaResult<WaitVersion> {
        let timeout_ms = 30 * 60 * 1000;
        for _ in 0..timeout_ms {
            if self
                .metadata_manager
                .catalog_controller
                .list_background_creating_jobs(true, None)
                .await?
                .is_empty()
            {
                let catalog_version = self
                    .metadata_manager
                    .catalog_controller
                    .notify_frontend_trivial()
                    .await;
                let hummock_version_id = self.barrier_manager.get_hummock_version_id().await;
                return Ok(WaitVersion {
                    catalog_version,
                    hummock_version_id,
                });
            }

            sleep(Duration::from_millis(1)).await;
        }
        Err(MetaError::cancelled(format!("timeout after {timeout_ms}ms")))
    }

    async fn comment_on(&self, comment: Comment) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .comment_on(comment)
            .await
    }

    async fn alter_streaming_job_config(
        &self,
        job_id: JobId,
        entries_to_add: HashMap<String, String>,
        keys_to_remove: Vec<String>,
    ) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .alter_streaming_job_config(job_id, entries_to_add, keys_to_remove)
            .await
    }
}

fn report_create_object(
    job_id: JobId,
    event_name: &str,
    obj_type: PbTelemetryDatabaseObject,
    connector_name: Option<String>,
    attr_info: Option<jsonbb::Value>,
) {
    report_event(
        PbTelemetryEventStage::CreateStreamJob,
        event_name,
        job_id.as_raw_id() as _,
        connector_name,
        Some(obj_type),
        attr_info,
    );
}

pub fn build_upstream_sink_info(
    sink_id: SinkId,
    original_target_columns: Vec<PbColumnCatalog>,
    sink_fragment_id: FragmentId,
    target_table: &PbTable,
    target_fragment_id: FragmentId,
) -> MetaResult<UpstreamSinkInfo> {
    let sink_columns = if !original_target_columns.is_empty() {
        original_target_columns.clone()
    } else {
        target_table.columns.clone()
    };

    let sink_output_fields = sink_columns
        .iter()
        .map(|col| Field::from(col.column_desc.as_ref().unwrap()).to_prost())
        .collect_vec();
    let output_indices = (0..sink_output_fields.len())
        .map(|i| i as u32)
        .collect_vec();

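    // Map each distribution-key column of the target table to its index in the
    // sink output, matching by column id.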
    let dist_key_indices: anyhow::Result<Vec<u32>> = try {
        let sink_idx_by_col_id = sink_columns
            .iter()
            .enumerate()
            .map(|(idx, col)| {
                let column_id = col.column_desc.as_ref().unwrap().column_id;
                (column_id, idx as u32)
            })
            .collect::<HashMap<_, _>>();
        target_table
            .distribution_key
            .iter()
            .map(|dist_idx| {
                let column_id = target_table.columns[*dist_idx as usize]
                    .column_desc
                    .as_ref()
                    .unwrap()
                    .column_id;
                let sink_idx = sink_idx_by_col_id
                    .get(&column_id)
                    .ok_or_else(|| anyhow::anyhow!("column id {} not found in sink", column_id))?;
                Ok(*sink_idx)
            })
            .collect::<anyhow::Result<Vec<_>>>()?
    };
    let dist_key_indices =
        dist_key_indices.map_err(|e| e.context("failed to get distribution key indices"))?;
    let downstream_fragment_id = target_fragment_id as _;
    let new_downstream_relation = DownstreamFragmentRelation {
        downstream_fragment_id,
        dispatcher_type: DispatcherType::Hash,
        dist_key_indices,
        output_mapping: PbDispatchOutputMapping::simple(output_indices),
    };
    let current_target_columns = target_table.get_columns();
    let project_exprs = build_select_node_list(&sink_columns, current_target_columns)?;
    Ok(UpstreamSinkInfo {
        sink_id,
        sink_fragment_id: sink_fragment_id as _,
        sink_output_fields,
        sink_original_target_columns: original_target_columns,
        project_exprs,
        new_sink_downstream: new_downstream_relation,
    })
}

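/// Refills the `UpstreamSinkUnion` node inside a table's union fragment with the
/// given upstream sink infos, stopping the traversal once the node is found.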
pub fn refill_upstream_sink_union_in_table(
    union_fragment_root: &mut PbStreamNode,
    upstream_sink_infos: &Vec<UpstreamSinkInfo>,
) {
    visit_stream_node_cont_mut(union_fragment_root, |node| {
        if let Some(NodeBody::UpstreamSinkUnion(upstream_sink_union)) = &mut node.node_body {
            let init_upstreams = upstream_sink_infos
                .iter()
                .map(|info| PbUpstreamSinkInfo {
                    upstream_fragment_id: info.sink_fragment_id,
                    sink_output_schema: info.sink_output_fields.clone(),
                    project_exprs: info.project_exprs.clone(),
                })
                .collect();
            upstream_sink_union.init_upstreams = init_upstreams;
            false
        } else {
            true
        }
    });
}

#[cfg(test)]
mod tests {
    use std::num::NonZeroUsize;

    use super::*;

    #[test]
    fn test_specified_parallelism_exceeds_available() {
        let result = DdlController::resolve_stream_parallelism_inner(
            Some(NonZeroUsize::new(100).unwrap()),
            NonZeroUsize::new(256).unwrap(),
            Some(NonZeroUsize::new(4).unwrap()),
            &DefaultParallelism::Full,
            "default",
        )
        .unwrap();
        assert_eq!(result.get(), 100);
    }

    #[test]
    fn test_allows_default_parallelism_over_available() {
        let result = DdlController::resolve_stream_parallelism_inner(
            None,
            NonZeroUsize::new(256).unwrap(),
            Some(NonZeroUsize::new(4).unwrap()),
            &DefaultParallelism::Default(NonZeroUsize::new(50).unwrap()),
            "default",
        )
        .unwrap();
        assert_eq!(result.get(), 50);
    }

    #[test]
    fn test_full_parallelism_capped_by_max() {
        let result = DdlController::resolve_stream_parallelism_inner(
            None,
            NonZeroUsize::new(6).unwrap(),
            Some(NonZeroUsize::new(10).unwrap()),
            &DefaultParallelism::Full,
            "default",
        )
        .unwrap();
        assert_eq!(result.get(), 6);
    }

    #[test]
    fn test_no_available_slots_returns_error() {
        let result = DdlController::resolve_stream_parallelism_inner(
            None,
            NonZeroUsize::new(4).unwrap(),
            None,
            &DefaultParallelism::Full,
            "default",
        );
        assert!(matches!(
            result,
            Err(ref e) if matches!(e.inner(), MetaErrorInner::Unavailable(_))
        ));
    }

    #[test]
    fn test_specified_over_max_returns_error() {
        let result = DdlController::resolve_stream_parallelism_inner(
            Some(NonZeroUsize::new(8).unwrap()),
            NonZeroUsize::new(4).unwrap(),
            Some(NonZeroUsize::new(10).unwrap()),
            &DefaultParallelism::Full,
            "default",
        );
        assert!(matches!(
            result,
            Err(ref e) if matches!(e.inner(), MetaErrorInner::InvalidParameter(_))
        ));
    }
}