use std::cmp::Ordering;
use std::collections::{HashMap, HashSet};
use std::num::NonZeroUsize;
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use std::time::Duration;

use anyhow::{Context, anyhow};
use await_tree::InstrumentAwait;
use either::Either;
use itertools::Itertools;
use risingwave_common::catalog::{
    AlterDatabaseParam, ColumnCatalog, ColumnId, Field, FragmentTypeFlag,
};
use risingwave_common::config::DefaultParallelism;
use risingwave_common::hash::VnodeCountCompat;
use risingwave_common::id::{JobId, TableId};
use risingwave_common::secret::{LocalSecretManager, SecretEncryption};
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont_mut;
use risingwave_common::{bail, bail_not_implemented};
use risingwave_connector::WithOptionsSecResolved;
use risingwave_connector::connector_common::validate_connection;
use risingwave_connector::sink::SinkParam;
use risingwave_connector::sink::iceberg::IcebergSink;
use risingwave_connector::source::cdc::CdcScanOptions;
use risingwave_connector::source::{
    ConnectorProperties, SourceEnumeratorContext, UPSTREAM_SOURCE_KEY,
};
use risingwave_meta_model::object::ObjectType;
use risingwave_meta_model::{
    ConnectionId, DatabaseId, DispatcherType, FragmentId, FunctionId, IndexId, JobStatus, ObjectId,
    SchemaId, SecretId, SinkId, SourceId, StreamingParallelism, SubscriptionId, UserId, ViewId,
};
use risingwave_pb::catalog::{
    Comment, Connection, CreateType, Database, Function, PbTable, Schema, Secret, Source,
    Subscription, Table, View,
};
use risingwave_pb::common::PbActorLocation;
use risingwave_pb::ddl_service::alter_owner_request::Object;
use risingwave_pb::ddl_service::{
    DdlProgress, TableJobType, WaitVersion, alter_name_request, alter_set_schema_request,
    alter_swap_rename_request, streaming_job_resource_type,
};
use risingwave_pb::meta::table_fragments::PbActorStatus;
use risingwave_pb::plan_common::PbColumnCatalog;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{
    PbDispatchOutputMapping, PbStreamFragmentGraph, PbStreamNode, PbUpstreamSinkInfo,
    StreamFragmentGraph as StreamFragmentGraphProto,
};
use risingwave_pb::telemetry::{PbTelemetryDatabaseObject, PbTelemetryEventStage};
use strum::Display;
use thiserror_ext::AsReport;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use tokio::time::sleep;
use tracing::Instrument;

use crate::barrier::{BarrierManagerRef, Command};
use crate::controller::catalog::{DropTableConnectorContext, ReleaseContext};
use crate::controller::cluster::StreamingClusterInfo;
use crate::controller::streaming_job::{FinishAutoRefreshSchemaSinkContext, SinkIntoTableContext};
use crate::controller::utils::build_select_node_list;
use crate::error::{MetaErrorInner, bail_invalid_parameter, bail_unavailable};
use crate::manager::iceberg_compaction::IcebergCompactionManagerRef;
use crate::manager::sink_coordination::SinkCoordinatorManager;
use crate::manager::{
    IGNORED_NOTIFICATION_VERSION, LocalNotification, MetaSrvEnv, MetadataManager,
    NotificationVersion, StreamingJob, StreamingJobType,
};
use crate::model::{
    DownstreamFragmentRelation, Fragment, FragmentDownstreamRelation,
    FragmentId as CatalogFragmentId, StreamContext, StreamJobFragments, StreamJobFragmentsToCreate,
    TableParallelism,
};
use crate::stream::cdc::{
    parallel_cdc_table_backfill_fragment, try_init_parallel_cdc_table_snapshot_splits,
};
use crate::stream::{
    ActorGraphBuildResult, ActorGraphBuilder, AutoRefreshSchemaSinkContext,
    CompleteStreamFragmentGraph, CreateStreamingJobContext, CreateStreamingJobOption,
    FragmentGraphDownstreamContext, FragmentGraphUpstreamContext, GlobalStreamManagerRef,
    ReplaceStreamJobContext, ReschedulePolicy, SourceChange, SourceManagerRef, StreamFragmentGraph,
    UpstreamSinkInfo, check_sink_fragments_support_refresh_schema, create_source_worker,
    rewrite_refresh_schema_sink_fragment, state_match, validate_sink,
};
use crate::telemetry::report_event;
use crate::{MetaError, MetaResult};

#[derive(PartialEq)]
pub enum DropMode {
    Restrict,
    Cascade,
}

impl DropMode {
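    /// Maps the `cascade` flag of a drop request to a [`DropMode`].
    ///
    /// ```ignore
    /// // Illustrative only; `DropMode` is internal to the meta crate.
    /// assert!(matches!(DropMode::from_request_setting(true), DropMode::Cascade));
    /// assert!(matches!(DropMode::from_request_setting(false), DropMode::Restrict));
    /// ```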
    pub fn from_request_setting(cascade: bool) -> DropMode {
        if cascade {
            DropMode::Cascade
        } else {
            DropMode::Restrict
        }
    }
}

#[derive(strum::AsRefStr)]
pub enum StreamingJobId {
    MaterializedView(TableId),
    Sink(SinkId),
    Table(Option<SourceId>, TableId),
    Index(IndexId),
}

impl std::fmt::Display for StreamingJobId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.as_ref())?;
        write!(f, "({})", self.id())
    }
}

impl StreamingJobId {
    fn id(&self) -> JobId {
        match self {
            StreamingJobId::MaterializedView(id) | StreamingJobId::Table(_, id) => id.as_job_id(),
            StreamingJobId::Index(id) => id.as_job_id(),
            StreamingJobId::Sink(id) => id.as_job_id(),
        }
    }
}

pub struct ReplaceStreamJobInfo {
    pub streaming_job: StreamingJob,
    pub fragment_graph: StreamFragmentGraphProto,
}

#[derive(Display)]
pub enum DdlCommand {
    CreateDatabase(Database),
    DropDatabase(DatabaseId),
    CreateSchema(Schema),
    DropSchema(SchemaId, DropMode),
    CreateNonSharedSource(Source),
    DropSource(SourceId, DropMode),
    ResetSource(SourceId),
    CreateFunction(Function),
    DropFunction(FunctionId, DropMode),
    CreateView(View, HashSet<ObjectId>),
    DropView(ViewId, DropMode),
    CreateStreamingJob {
        stream_job: StreamingJob,
        fragment_graph: StreamFragmentGraphProto,
        dependencies: HashSet<ObjectId>,
        resource_type: streaming_job_resource_type::ResourceType,
        if_not_exists: bool,
    },
    DropStreamingJob {
        job_id: StreamingJobId,
        drop_mode: DropMode,
    },
    AlterName(alter_name_request::Object, String),
    AlterSwapRename(alter_swap_rename_request::Object),
    ReplaceStreamJob(ReplaceStreamJobInfo),
    AlterNonSharedSource(Source),
    AlterObjectOwner(Object, UserId),
    AlterSetSchema(alter_set_schema_request::Object, SchemaId),
    CreateConnection(Connection),
    DropConnection(ConnectionId, DropMode),
    CreateSecret(Secret),
    AlterSecret(Secret),
    DropSecret(SecretId),
    CommentOn(Comment),
    CreateSubscription(Subscription),
    DropSubscription(SubscriptionId, DropMode),
    AlterDatabaseParam(DatabaseId, AlterDatabaseParam),
    AlterStreamingJobConfig(JobId, HashMap<String, String>, Vec<String>),
}

impl DdlCommand {
    fn object(&self) -> Either<String, ObjectId> {
        use Either::*;
        match self {
            DdlCommand::CreateDatabase(database) => Left(database.name.clone()),
            DdlCommand::DropDatabase(id) => Right(id.as_object_id()),
            DdlCommand::CreateSchema(schema) => Left(schema.name.clone()),
            DdlCommand::DropSchema(id, _) => Right(id.as_object_id()),
            DdlCommand::CreateNonSharedSource(source) => Left(source.name.clone()),
            DdlCommand::DropSource(id, _) => Right(id.as_object_id()),
            DdlCommand::ResetSource(id) => Right(id.as_object_id()),
            DdlCommand::CreateFunction(function) => Left(function.name.clone()),
            DdlCommand::DropFunction(id, _) => Right(id.as_object_id()),
            DdlCommand::CreateView(view, _) => Left(view.name.clone()),
            DdlCommand::DropView(id, _) => Right(id.as_object_id()),
            DdlCommand::CreateStreamingJob { stream_job, .. } => Left(stream_job.name()),
            DdlCommand::DropStreamingJob { job_id, .. } => Right(job_id.id().as_object_id()),
            DdlCommand::AlterName(object, _) => Left(format!("{object:?}")),
            DdlCommand::AlterSwapRename(object) => Left(format!("{object:?}")),
            DdlCommand::ReplaceStreamJob(info) => Left(info.streaming_job.name()),
            DdlCommand::AlterNonSharedSource(source) => Left(source.name.clone()),
            DdlCommand::AlterObjectOwner(object, _) => Left(format!("{object:?}")),
            DdlCommand::AlterSetSchema(object, _) => Left(format!("{object:?}")),
            DdlCommand::CreateConnection(connection) => Left(connection.name.clone()),
            DdlCommand::DropConnection(id, _) => Right(id.as_object_id()),
            DdlCommand::CreateSecret(secret) => Left(secret.name.clone()),
            DdlCommand::AlterSecret(secret) => Left(secret.name.clone()),
            DdlCommand::DropSecret(id) => Right(id.as_object_id()),
            DdlCommand::CommentOn(comment) => Right(comment.table_id.into()),
            DdlCommand::CreateSubscription(subscription) => Left(subscription.name.clone()),
            DdlCommand::DropSubscription(id, _) => Right(id.as_object_id()),
            DdlCommand::AlterDatabaseParam(id, _) => Right(id.as_object_id()),
            DdlCommand::AlterStreamingJobConfig(job_id, _, _) => Right(job_id.as_object_id()),
        }
    }

    fn allow_in_recovery(&self) -> bool {
        match self {
            DdlCommand::DropDatabase(_)
            | DdlCommand::DropSchema(_, _)
            | DdlCommand::DropSource(_, _)
            | DdlCommand::DropFunction(_, _)
            | DdlCommand::DropView(_, _)
            | DdlCommand::DropStreamingJob { .. }
            | DdlCommand::DropConnection(_, _)
            | DdlCommand::DropSecret(_)
            | DdlCommand::DropSubscription(_, _)
            | DdlCommand::AlterName(_, _)
            | DdlCommand::AlterObjectOwner(_, _)
            | DdlCommand::AlterSetSchema(_, _)
            | DdlCommand::CreateDatabase(_)
            | DdlCommand::CreateSchema(_)
            | DdlCommand::CreateFunction(_)
            | DdlCommand::CreateView(_, _)
            | DdlCommand::CreateConnection(_)
            | DdlCommand::CommentOn(_)
            | DdlCommand::CreateSecret(_)
            | DdlCommand::AlterSecret(_)
            | DdlCommand::AlterSwapRename(_)
            | DdlCommand::AlterDatabaseParam(_, _)
            | DdlCommand::AlterStreamingJobConfig(_, _, _) => true,
            DdlCommand::CreateStreamingJob { .. }
            | DdlCommand::CreateNonSharedSource(_)
            | DdlCommand::ReplaceStreamJob(_)
            | DdlCommand::AlterNonSharedSource(_)
            | DdlCommand::ResetSource(_)
            | DdlCommand::CreateSubscription(_) => false,
        }
    }
}

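/// Entry point for handling DDL requests on the meta node. It persists catalog changes via the
/// [`MetadataManager`] and coordinates the corresponding streaming-job changes through the
/// stream, barrier, source and sink managers it holds.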
#[derive(Clone)]
pub struct DdlController {
    pub(crate) env: MetaSrvEnv,

    pub(crate) metadata_manager: MetadataManager,
    pub(crate) stream_manager: GlobalStreamManagerRef,
    pub(crate) source_manager: SourceManagerRef,
    barrier_manager: BarrierManagerRef,
    sink_manager: SinkCoordinatorManager,
    iceberg_compaction_manager: IcebergCompactionManagerRef,

    pub(crate) creating_streaming_job_permits: Arc<CreatingStreamingJobPermit>,

    // Sequence number of DDL commands, used to key await-tree registrations.
    seq: Arc<AtomicU64>,
}

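/// Limits how many streaming jobs may be created concurrently. The permit count follows the
/// `max_concurrent_creating_streaming_jobs` system parameter (where `0` means unlimited) and is
/// adjusted in the background whenever that parameter changes.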
#[derive(Clone)]
pub struct CreatingStreamingJobPermit {
    pub(crate) semaphore: Arc<Semaphore>,
}

impl CreatingStreamingJobPermit {
    async fn new(env: &MetaSrvEnv) -> Self {
        let mut permits = env
            .system_params_reader()
            .await
            .max_concurrent_creating_streaming_jobs() as usize;
        if permits == 0 {
            // `max_concurrent_creating_streaming_jobs` being 0 means unlimited.
            permits = Semaphore::MAX_PERMITS;
        }
        let semaphore = Arc::new(Semaphore::new(permits));

        let (local_notification_tx, mut local_notification_rx) =
            tokio::sync::mpsc::unbounded_channel();
        env.notification_manager()
            .insert_local_sender(local_notification_tx);
        let semaphore_clone = semaphore.clone();
        tokio::spawn(async move {
            while let Some(notification) = local_notification_rx.recv().await {
                let LocalNotification::SystemParamsChange(p) = &notification else {
                    continue;
                };
                let mut new_permits = p.max_concurrent_creating_streaming_jobs() as usize;
                if new_permits == 0 {
                    new_permits = Semaphore::MAX_PERMITS;
                }
                match permits.cmp(&new_permits) {
                    Ordering::Less => {
                        semaphore_clone.add_permits(new_permits - permits);
                    }
                    Ordering::Equal => continue,
                    Ordering::Greater => {
                        let to_release = permits - new_permits;
                        let reduced = semaphore_clone.forget_permits(to_release);
                        if reduced != to_release {
                            tracing::warn!(
                                "not enough permits to release, expected {}, but reduced {}",
                                to_release,
                                reduced
                            );
                        }
                    }
                }
                tracing::info!(
                    "max_concurrent_creating_streaming_jobs changed from {} to {}",
                    permits,
                    new_permits
                );
                permits = new_permits;
            }
        });

        Self { semaphore }
    }
}

impl DdlController {
    pub async fn new(
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        stream_manager: GlobalStreamManagerRef,
        source_manager: SourceManagerRef,
        barrier_manager: BarrierManagerRef,
        sink_manager: SinkCoordinatorManager,
        iceberg_compaction_manager: IcebergCompactionManagerRef,
    ) -> Self {
        let creating_streaming_job_permits = Arc::new(CreatingStreamingJobPermit::new(&env).await);
        Self {
            env,
            metadata_manager,
            stream_manager,
            source_manager,
            barrier_manager,
            sink_manager,
            iceberg_compaction_manager,
            creating_streaming_job_permits,
            seq: Arc::new(AtomicU64::new(0)),
        }
    }

    pub fn next_seq(&self) -> u64 {
        self.seq.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
    }

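    /// Runs a [`DdlCommand`] on a spawned task so that its progress is not tied to the caller's
    /// lifetime, and returns the catalog notification version (plus the current hummock version
    /// id) that the frontend can wait on. Commands that are not allowed during recovery are
    /// rejected while the barrier manager is not running.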
    pub async fn run_command(&self, command: DdlCommand) -> MetaResult<Option<WaitVersion>> {
        if !command.allow_in_recovery() {
            self.barrier_manager.check_status_running()?;
        }

        let await_tree_key = format!("DDL Command {}", self.next_seq());
        let await_tree_span = await_tree::span!("{command}({})", command.object());

        let ctrl = self.clone();
        let fut = async move {
            match command {
                DdlCommand::CreateDatabase(database) => ctrl.create_database(database).await,
                DdlCommand::DropDatabase(database_id) => ctrl.drop_database(database_id).await,
                DdlCommand::CreateSchema(schema) => ctrl.create_schema(schema).await,
                DdlCommand::DropSchema(schema_id, drop_mode) => {
                    ctrl.drop_schema(schema_id, drop_mode).await
                }
                DdlCommand::CreateNonSharedSource(source) => {
                    ctrl.create_non_shared_source(source).await
                }
                DdlCommand::DropSource(source_id, drop_mode) => {
                    ctrl.drop_source(source_id, drop_mode).await
                }
                DdlCommand::ResetSource(source_id) => ctrl.reset_source(source_id).await,
                DdlCommand::CreateFunction(function) => ctrl.create_function(function).await,
                DdlCommand::DropFunction(function_id, drop_mode) => {
                    ctrl.drop_function(function_id, drop_mode).await
                }
                DdlCommand::CreateView(view, dependencies) => {
                    ctrl.create_view(view, dependencies).await
                }
                DdlCommand::DropView(view_id, drop_mode) => {
                    ctrl.drop_view(view_id, drop_mode).await
                }
                DdlCommand::CreateStreamingJob {
                    stream_job,
                    fragment_graph,
                    dependencies,
                    resource_type,
                    if_not_exists,
                } => {
                    ctrl.create_streaming_job(
                        stream_job,
                        fragment_graph,
                        dependencies,
                        resource_type,
                        if_not_exists,
                    )
                    .await
                }
                DdlCommand::DropStreamingJob { job_id, drop_mode } => {
                    ctrl.drop_streaming_job(job_id, drop_mode).await
                }
                DdlCommand::ReplaceStreamJob(ReplaceStreamJobInfo {
                    streaming_job,
                    fragment_graph,
                }) => ctrl.replace_job(streaming_job, fragment_graph).await,
                DdlCommand::AlterName(relation, name) => ctrl.alter_name(relation, &name).await,
                DdlCommand::AlterObjectOwner(object, owner_id) => {
                    ctrl.alter_owner(object, owner_id).await
                }
                DdlCommand::AlterSetSchema(object, new_schema_id) => {
                    ctrl.alter_set_schema(object, new_schema_id).await
                }
                DdlCommand::CreateConnection(connection) => {
                    ctrl.create_connection(connection).await
                }
                DdlCommand::DropConnection(connection_id, drop_mode) => {
                    ctrl.drop_connection(connection_id, drop_mode).await
                }
                DdlCommand::CreateSecret(secret) => ctrl.create_secret(secret).await,
                DdlCommand::DropSecret(secret_id) => ctrl.drop_secret(secret_id).await,
                DdlCommand::AlterSecret(secret) => ctrl.alter_secret(secret).await,
                DdlCommand::AlterNonSharedSource(source) => {
                    ctrl.alter_non_shared_source(source).await
                }
                DdlCommand::CommentOn(comment) => ctrl.comment_on(comment).await,
                DdlCommand::CreateSubscription(subscription) => {
                    ctrl.create_subscription(subscription).await
                }
                DdlCommand::DropSubscription(subscription_id, drop_mode) => {
                    ctrl.drop_subscription(subscription_id, drop_mode).await
                }
                DdlCommand::AlterSwapRename(objects) => ctrl.alter_swap_rename(objects).await,
                DdlCommand::AlterDatabaseParam(database_id, param) => {
                    ctrl.alter_database_param(database_id, param).await
                }
                DdlCommand::AlterStreamingJobConfig(job_id, entries_to_add, keys_to_remove) => {
                    ctrl.alter_streaming_job_config(job_id, entries_to_add, keys_to_remove)
                        .await
                }
            }
        }
        .in_current_span();
        let fut = (self.env.await_tree_reg())
            .register(await_tree_key, await_tree_span)
            .instrument(Box::pin(fut));
        let notification_version = tokio::spawn(fut).await.map_err(|e| anyhow!(e))??;
        Ok(Some(WaitVersion {
            catalog_version: notification_version,
            hummock_version_id: self.barrier_manager.get_hummock_version_id().await,
        }))
    }

    pub async fn get_ddl_progress(&self) -> MetaResult<Vec<DdlProgress>> {
        self.barrier_manager.get_ddl_progress().await
    }

    async fn create_database(&self, database: Database) -> MetaResult<NotificationVersion> {
        let (version, updated_db) = self
            .metadata_manager
            .catalog_controller
            .create_database(database)
            .await?;
        self.barrier_manager
            .update_database_barrier(
                updated_db.database_id,
                updated_db.barrier_interval_ms.map(|v| v as u32),
                updated_db.checkpoint_frequency.map(|v| v as u64),
            )
            .await?;
        Ok(version)
    }

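    /// Alters the parallelism of a streaming job. If the cluster is still recovering, the
    /// request is switched to deferred mode instead of failing.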
    #[tracing::instrument(skip(self), level = "debug")]
    pub async fn reschedule_streaming_job(
        &self,
        job_id: JobId,
        target: ReschedulePolicy,
        mut deferred: bool,
    ) -> MetaResult<()> {
        tracing::info!("altering parallelism for job {}", job_id);
        if self.barrier_manager.check_status_running().is_err() {
            tracing::info!(
                "alter parallelism is set to deferred mode because the system is in recovery state"
            );
            deferred = true;
        }

        self.stream_manager
            .reschedule_streaming_job(job_id, target, deferred)
            .await
    }

    pub async fn reschedule_cdc_table_backfill(
        &self,
        job_id: JobId,
        target: ReschedulePolicy,
    ) -> MetaResult<()> {
        tracing::info!("alter CDC table backfill parallelism");
        if self.barrier_manager.check_status_running().is_err() {
            return Err(anyhow::anyhow!(
                "CDC table backfill reschedule is unavailable because the system is in recovery state"
            )
            .into());
        }
        self.stream_manager
            .reschedule_cdc_table_backfill(job_id, target)
            .await
    }

    pub async fn reschedule_fragments(
        &self,
        fragment_targets: HashMap<FragmentId, Option<StreamingParallelism>>,
    ) -> MetaResult<()> {
        tracing::info!(
            "altering parallelism for fragments {:?}",
            fragment_targets.keys()
        );
        let fragment_targets = fragment_targets
            .into_iter()
            .map(|(fragment_id, parallelism)| (fragment_id as CatalogFragmentId, parallelism))
            .collect();

        self.stream_manager
            .reschedule_fragments(fragment_targets)
            .await
    }

    async fn drop_database(&self, database_id: DatabaseId) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Database, database_id, DropMode::Cascade)
            .await
    }

    async fn create_schema(&self, schema: Schema) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .create_schema(schema)
            .await
    }

    async fn drop_schema(
        &self,
        schema_id: SchemaId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Schema, schema_id, drop_mode)
            .await
    }

    async fn create_non_shared_source(&self, source: Source) -> MetaResult<NotificationVersion> {
        let handle = create_source_worker(&source, self.source_manager.metrics.clone())
            .await
            .context("failed to create source worker")?;

        let (source_id, version) = self
            .metadata_manager
            .catalog_controller
            .create_source(source)
            .await?;
        self.source_manager
            .register_source_with_handle(source_id, handle)
            .await;
        Ok(version)
    }

    async fn drop_source(
        &self,
        source_id: SourceId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Source, source_id, drop_mode)
            .await
    }

    async fn reset_source(&self, source_id: SourceId) -> MetaResult<NotificationVersion> {
        tracing::info!(source_id = %source_id, "resetting CDC source offset to latest");

        let database_id = self
            .metadata_manager
            .catalog_controller
            .get_object_database_id(source_id)
            .await?;

        self.stream_manager
            .barrier_scheduler
            .run_command(database_id, Command::ResetSource { source_id })
            .await?;

        let version = self
            .metadata_manager
            .catalog_controller
            .current_notification_version()
            .await;
        Ok(version)
    }

    async fn alter_non_shared_source(&self, source: Source) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .alter_non_shared_source(source)
            .await
    }

    async fn create_function(&self, function: Function) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .create_function(function)
            .await
    }

    async fn drop_function(
        &self,
        function_id: FunctionId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Function, function_id, drop_mode)
            .await
    }

    async fn create_view(
        &self,
        view: View,
        dependencies: HashSet<ObjectId>,
    ) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .create_view(view, dependencies)
            .await
    }

    async fn drop_view(
        &self,
        view_id: ViewId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::View, view_id, drop_mode).await
    }

    async fn create_connection(&self, connection: Connection) -> MetaResult<NotificationVersion> {
        validate_connection(&connection).await?;
        self.metadata_manager
            .catalog_controller
            .create_connection(connection)
            .await
    }

    async fn drop_connection(
        &self,
        connection_id: ConnectionId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Connection, connection_id, drop_mode)
            .await
    }

    async fn alter_database_param(
        &self,
        database_id: DatabaseId,
        param: AlterDatabaseParam,
    ) -> MetaResult<NotificationVersion> {
        let (version, updated_db) = self
            .metadata_manager
            .catalog_controller
            .alter_database_param(database_id, param)
            .await?;
        self.barrier_manager
            .update_database_barrier(
                database_id,
                updated_db.barrier_interval_ms.map(|v| v as u32),
                updated_db.checkpoint_frequency.map(|v| v as u64),
            )
            .await?;
        Ok(version)
    }

    fn get_encrypted_payload(&self, secret: &Secret) -> MetaResult<Vec<u8>> {
        let secret_store_private_key = self
            .env
            .opts
            .secret_store_private_key
            .clone()
            .ok_or_else(|| anyhow!("secret_store_private_key is not configured"))?;

        let encrypted_payload = SecretEncryption::encrypt(
            secret_store_private_key.as_slice(),
            secret.get_value().as_slice(),
        )
        .context(format!("failed to encrypt secret {}", secret.name))?;
        Ok(encrypted_payload
            .serialize()
            .context(format!("failed to serialize secret {}", secret.name))?)
    }

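    /// Encrypts the secret value with the configured `secret_store_private_key` before
    /// persisting it; the plaintext copy is handed to the catalog controller separately.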
    async fn create_secret(&self, mut secret: Secret) -> MetaResult<NotificationVersion> {
        let secret_plain_payload = secret.value.clone();
        let encrypted_payload = self.get_encrypted_payload(&secret)?;
        secret.value = encrypted_payload;

        self.metadata_manager
            .catalog_controller
            .create_secret(secret, secret_plain_payload)
            .await
    }

    async fn drop_secret(&self, secret_id: SecretId) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Secret, secret_id, DropMode::Restrict)
            .await
    }

    async fn alter_secret(&self, mut secret: Secret) -> MetaResult<NotificationVersion> {
        let secret_plain_payload = secret.value.clone();
        let encrypted_payload = self.get_encrypted_payload(&secret)?;
        secret.value = encrypted_payload;
        self.metadata_manager
            .catalog_controller
            .alter_secret(secret, secret_plain_payload)
            .await
    }

    async fn create_subscription(
        &self,
        mut subscription: Subscription,
    ) -> MetaResult<NotificationVersion> {
        tracing::debug!("create subscription");
        let _permit = self
            .creating_streaming_job_permits
            .semaphore
            .acquire()
            .await
            .unwrap();
        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
        self.metadata_manager
            .catalog_controller
            .create_subscription_catalog(&mut subscription)
            .await?;
        if let Err(err) = self.stream_manager.create_subscription(&subscription).await {
            tracing::debug!(error = %err.as_report(), "failed to create subscription");
            let _ = self
                .metadata_manager
                .catalog_controller
                .try_abort_creating_subscription(subscription.id)
                .await
                .inspect_err(|e| {
                    tracing::error!(
                        error = %e.as_report(),
                        "failed to abort create subscription after failure"
                    );
                });
            return Err(err);
        }

        let version = self
            .metadata_manager
            .catalog_controller
            .notify_create_subscription(subscription.id)
            .await?;
        tracing::debug!("finish create subscription");
        Ok(version)
    }

    async fn drop_subscription(
        &self,
        subscription_id: SubscriptionId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        tracing::debug!("preparing drop subscription");
        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
        let subscription = self
            .metadata_manager
            .catalog_controller
            .get_subscription_by_id(subscription_id)
            .await?;
        let table_id = subscription.dependent_table_id;
        let database_id = subscription.database_id;
        let (_, version) = self
            .metadata_manager
            .catalog_controller
            .drop_object(ObjectType::Subscription, subscription_id, drop_mode)
            .await?;
        self.stream_manager
            .drop_subscription(database_id, subscription_id, table_id)
            .await;
        tracing::debug!("finish drop subscription");
        Ok(version)
    }

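    /// Validates a CDC table job before creation: the job must contain exactly one stream scan
    /// fragment, and its `StreamCdcScan` node must be able to build a split enumerator against
    /// the upstream database described by the CDC table descriptor.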
    #[await_tree::instrument]
    pub(crate) async fn validate_cdc_table(
        &self,
        table: &Table,
        table_fragments: &StreamJobFragments,
    ) -> MetaResult<()> {
        let stream_scan_fragment = table_fragments
            .fragments
            .values()
            .filter(|f| {
                f.fragment_type_mask.contains(FragmentTypeFlag::StreamScan)
                    || f.fragment_type_mask
                        .contains(FragmentTypeFlag::StreamCdcScan)
            })
            .exactly_one()
            .ok()
            .with_context(|| {
                format!(
                    "expect exactly one stream scan fragment, got: {:?}",
                    table_fragments.fragments
                )
            })?;
        fn assert_parallelism(stream_scan_fragment: &Fragment, node_body: &Option<NodeBody>) {
            if let Some(NodeBody::StreamCdcScan(node)) = node_body {
                if let Some(o) = node.options
                    && CdcScanOptions::from_proto(&o).is_parallelized_backfill()
                {
                    // Parallelized backfill may use more than one actor.
                } else {
                    assert_eq!(
                        stream_scan_fragment.actors.len(),
                        1,
                        "Stream scan fragment should have only one actor"
                    );
                }
            }
        }
        let mut found_cdc_scan = false;
        match &stream_scan_fragment.nodes.node_body {
            Some(NodeBody::StreamCdcScan(_)) => {
                assert_parallelism(stream_scan_fragment, &stream_scan_fragment.nodes.node_body);
                if self
                    .validate_cdc_table_inner(&stream_scan_fragment.nodes.node_body, table.id)
                    .await?
                {
                    found_cdc_scan = true;
                }
            }
            // When the fragment is a `Project`, the `StreamCdcScan` nodes are among its inputs.
            Some(NodeBody::Project(_)) => {
                for input in &stream_scan_fragment.nodes.input {
                    assert_parallelism(stream_scan_fragment, &input.node_body);
                    if self
                        .validate_cdc_table_inner(&input.node_body, table.id)
                        .await?
                    {
                        found_cdc_scan = true;
                    }
                }
            }
            _ => {
                bail!("Unexpected node body for stream cdc scan");
            }
        };
        if !found_cdc_scan {
            bail!("No stream cdc scan node found in stream scan fragment");
        }
        Ok(())
    }

    async fn validate_cdc_table_inner(
        &self,
        node_body: &Option<NodeBody>,
        table_id: TableId,
    ) -> MetaResult<bool> {
        if let Some(NodeBody::StreamCdcScan(stream_cdc_scan)) = node_body
            && let Some(ref cdc_table_desc) = stream_cdc_scan.cdc_table_desc
        {
            let options_with_secret = WithOptionsSecResolved::new(
                cdc_table_desc.connect_properties.clone(),
                cdc_table_desc.secret_refs.clone(),
            );

            let mut props = ConnectorProperties::extract(options_with_secret, true)?;
            props.init_from_pb_cdc_table_desc(cdc_table_desc);

            let _enumerator = props
                .create_split_enumerator(SourceEnumeratorContext::dummy().into())
                .await?;

            tracing::debug!(?table_id, "validate cdc table success");
            Ok(true)
        } else {
            Ok(false)
        }
    }

    pub async fn validate_table_for_sink(&self, table_id: TableId) -> MetaResult<()> {
        let migrated = self
            .metadata_manager
            .catalog_controller
            .has_table_been_migrated(table_id)
            .await?;
        if !migrated {
            Err(anyhow::anyhow!(
                "Creating sink into table is not allowed for unmigrated table {}. Please migrate it first.",
                table_id
            )
            .into())
        } else {
            Ok(())
        }
    }

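    /// Creates a streaming job (materialized view, table, sink, index or source): registers the
    /// job catalog, builds the fragment and actor graphs while holding a creating-job permit,
    /// and hands the result to the stream manager. On failure, the job catalog is aborted and
    /// any source registered for the job is dropped again.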
    #[await_tree::instrument(boxed, "create_streaming_job({streaming_job})")]
    pub async fn create_streaming_job(
        &self,
        mut streaming_job: StreamingJob,
        fragment_graph: StreamFragmentGraphProto,
        dependencies: HashSet<ObjectId>,
        resource_type: streaming_job_resource_type::ResourceType,
        if_not_exists: bool,
    ) -> MetaResult<NotificationVersion> {
        if let StreamingJob::Sink(sink) = &streaming_job
            && let Some(target_table) = sink.target_table
        {
            self.validate_table_for_sink(target_table).await?;
        }
        let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());
        let check_ret = self
            .metadata_manager
            .catalog_controller
            .create_job_catalog(
                &mut streaming_job,
                &ctx,
                &fragment_graph.parallelism,
                fragment_graph.max_parallelism as _,
                dependencies,
                resource_type.clone(),
                &fragment_graph.backfill_parallelism,
            )
            .await;
        if let Err(meta_err) = check_ret {
            if !if_not_exists {
                return Err(meta_err);
            }
            return if let MetaErrorInner::Duplicated(_, _, Some(job_id)) = meta_err.inner() {
                if streaming_job.create_type() == CreateType::Foreground {
                    let database_id = streaming_job.database_id();
                    self.metadata_manager
                        .wait_streaming_job_finished(database_id, *job_id)
                        .await
                } else {
                    Ok(IGNORED_NOTIFICATION_VERSION)
                }
            } else {
                Err(meta_err)
            };
        }
        let job_id = streaming_job.id();
        tracing::debug!(
            id = %job_id,
            definition = streaming_job.definition(),
            create_type = streaming_job.create_type().as_str_name(),
            job_type = ?streaming_job.job_type(),
            "starting streaming job",
        );
        let permit = self
            .creating_streaming_job_permits
            .semaphore
            .clone()
            .acquire_owned()
            .instrument_await("acquire_creating_streaming_job_permit")
            .await
            .unwrap();
        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;

        let name = streaming_job.name();
        let definition = streaming_job.definition();
        let source_id = match &streaming_job {
            StreamingJob::Table(Some(src), _, _) | StreamingJob::Source(src) => Some(src.id),
            _ => None,
        };

        match self
            .create_streaming_job_inner(ctx, streaming_job, fragment_graph, resource_type, permit)
            .await
        {
            Ok(version) => Ok(version),
            Err(err) => {
                tracing::error!(id = %job_id, error = %err.as_report(), "failed to create streaming job");
                let event = risingwave_pb::meta::event_log::EventCreateStreamJobFail {
                    id: job_id,
                    name,
                    definition,
                    error: err.as_report().to_string(),
                };
                self.env.event_log_manager_ref().add_event_logs(vec![
                    risingwave_pb::meta::event_log::Event::CreateStreamJobFail(event),
                ]);
                let (aborted, _) = self
                    .metadata_manager
                    .catalog_controller
                    .try_abort_creating_streaming_job(job_id, false)
                    .await?;
                if aborted {
                    tracing::warn!(id = %job_id, "aborted streaming job");
                    if let Some(source_id) = source_id {
                        self.source_manager
                            .apply_source_change(SourceChange::DropSource {
                                dropped_source_ids: vec![source_id],
                            })
                            .await;
                    }
                }
                Err(err)
            }
        }
    }

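    /// Second phase of [`Self::create_streaming_job`]: builds the fragment graph and internal
    /// table catalogs, performs per-job-type validation (CDC tables, sources, sinks), persists
    /// the fragments, and finally asks the stream manager to bring the job up.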
    #[await_tree::instrument(boxed)]
    async fn create_streaming_job_inner(
        &self,
        ctx: StreamContext,
        mut streaming_job: StreamingJob,
        fragment_graph: StreamFragmentGraphProto,
        resource_type: streaming_job_resource_type::ResourceType,
        permit: OwnedSemaphorePermit,
    ) -> MetaResult<NotificationVersion> {
        let mut fragment_graph =
            StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
        streaming_job.set_info_from_graph(&fragment_graph);

        let incomplete_internal_tables = fragment_graph
            .incomplete_internal_tables()
            .into_values()
            .collect_vec();
        let table_id_map = self
            .metadata_manager
            .catalog_controller
            .create_internal_table_catalog(&streaming_job, incomplete_internal_tables)
            .await?;
        fragment_graph.refill_internal_table_ids(table_id_map);

        tracing::debug!(id = %streaming_job.id(), "building streaming job");
        let (ctx, stream_job_fragments) = self
            .build_stream_job(ctx, streaming_job, fragment_graph, resource_type)
            .await?;

        let streaming_job = &ctx.streaming_job;

        match streaming_job {
            StreamingJob::Table(None, table, TableJobType::SharedCdcSource) => {
                self.validate_cdc_table(table, &stream_job_fragments)
                    .await?;
            }
            StreamingJob::Table(Some(source), ..) => {
                self.source_manager.register_source(source).await?;
                let connector_name = source
                    .get_with_properties()
                    .get(UPSTREAM_SOURCE_KEY)
                    .cloned();
                let attr = source.info.as_ref().map(|source_info| {
                    jsonbb::json!({
                        "format": source_info.format().as_str_name(),
                        "encode": source_info.row_encode().as_str_name(),
                    })
                });
                report_create_object(
                    streaming_job.id(),
                    "source",
                    PbTelemetryDatabaseObject::Source,
                    connector_name,
                    attr,
                );
            }
            StreamingJob::Sink(sink) => {
                if sink.auto_refresh_schema_from_table.is_some() {
                    check_sink_fragments_support_refresh_schema(&stream_job_fragments.fragments)?
                }
                validate_sink(sink).await?;
                let connector_name = sink.get_properties().get(UPSTREAM_SOURCE_KEY).cloned();
                let attr = sink.format_desc.as_ref().map(|sink_info| {
                    jsonbb::json!({
                        "format": sink_info.format().as_str_name(),
                        "encode": sink_info.encode().as_str_name(),
                    })
                });
                report_create_object(
                    streaming_job.id(),
                    "sink",
                    PbTelemetryDatabaseObject::Sink,
                    connector_name,
                    attr,
                );
            }
            StreamingJob::Source(source) => {
                self.source_manager.register_source(source).await?;
                let connector_name = source
                    .get_with_properties()
                    .get(UPSTREAM_SOURCE_KEY)
                    .cloned();
                let attr = source.info.as_ref().map(|source_info| {
                    jsonbb::json!({
                        "format": source_info.format().as_str_name(),
                        "encode": source_info.row_encode().as_str_name(),
                    })
                });
                report_create_object(
                    streaming_job.id(),
                    "source",
                    PbTelemetryDatabaseObject::Source,
                    connector_name,
                    attr,
                );
            }
            _ => {}
        }

        let backfill_orders = ctx.fragment_backfill_ordering.to_meta_model();
        self.metadata_manager
            .catalog_controller
            .prepare_stream_job_fragments(
                &stream_job_fragments,
                streaming_job,
                false,
                Some(backfill_orders),
            )
            .await?;

        let version = self
            .stream_manager
            .create_streaming_job(stream_job_fragments, ctx, permit)
            .await?;

        Ok(version)
    }

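    /// Drops a catalog object (and, with [`DropMode::Cascade`], its dependents), then cleans up
    /// everything listed in the returned [`ReleaseContext`]: streaming jobs and actors, source
    /// workers, iceberg table sinks and their coordinators, and cached secrets.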
    pub async fn drop_object(
        &self,
        object_type: ObjectType,
        object_id: impl Into<ObjectId>,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        let object_id = object_id.into();
        let (release_ctx, version) = self
            .metadata_manager
            .catalog_controller
            .drop_object(object_type, object_id, drop_mode)
            .await?;

        if object_type == ObjectType::Source {
            self.env
                .notification_manager_ref()
                .notify_local_subscribers(LocalNotification::SourceDropped(object_id));
        }

        let ReleaseContext {
            database_id,
            removed_streaming_job_ids,
            removed_state_table_ids,
            removed_source_ids,
            removed_secret_ids: secret_ids,
            removed_source_fragments,
            removed_actors,
            removed_fragments,
            removed_sink_fragment_by_targets,
            removed_iceberg_table_sinks,
        } = release_ctx;

        let _guard = self.source_manager.pause_tick().await;
        self.stream_manager
            .drop_streaming_jobs(
                database_id,
                removed_actors.iter().map(|id| *id as _).collect(),
                removed_streaming_job_ids,
                removed_state_table_ids,
                removed_fragments.iter().map(|id| *id as _).collect(),
                removed_sink_fragment_by_targets
                    .into_iter()
                    .map(|(target, sinks)| {
                        (target as _, sinks.into_iter().map(|id| id as _).collect())
                    })
                    .collect(),
            )
            .await;

        self.source_manager
            .apply_source_change(SourceChange::DropSource {
                dropped_source_ids: removed_source_ids.into_iter().map(|id| id as _).collect(),
            })
            .await;

        let dropped_source_fragments = removed_source_fragments;
        self.source_manager
            .apply_source_change(SourceChange::DropMv {
                dropped_source_fragments,
            })
            .await;

        let iceberg_sink_ids: Vec<SinkId> = removed_iceberg_table_sinks
            .iter()
            .map(|sink| sink.id)
            .collect();

        for sink in removed_iceberg_table_sinks {
            let sink_param = SinkParam::try_from_sink_catalog(sink.into())
                .expect("Iceberg sink should be valid");
            let iceberg_sink =
                IcebergSink::try_from(sink_param).expect("Iceberg sink should be valid");
            if let Ok(iceberg_catalog) = iceberg_sink.config.create_catalog().await {
                let table_identifier = iceberg_sink.config.full_table_name().unwrap();
                tracing::info!(
                    "dropping iceberg table {} for dropped sink",
                    table_identifier
                );

                let _ = iceberg_catalog
                    .drop_table(&table_identifier)
                    .await
                    .inspect_err(|err| {
                        tracing::error!(
                            "failed to drop iceberg table {} during cleanup: {}",
                            table_identifier,
                            err.as_report()
                        );
                    });
            }
        }

        if !iceberg_sink_ids.is_empty() {
            self.sink_manager
                .stop_sink_coordinator(iceberg_sink_ids.clone())
                .await;

            for sink_id in iceberg_sink_ids {
                self.iceberg_compaction_manager
                    .clear_iceberg_commits_by_sink_id(sink_id);
            }
        }

        for secret in secret_ids {
            LocalSecretManager::global().remove_secret(secret);
        }
        Ok(version)
    }

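    /// Replaces the definition of an existing streaming job (e.g. for `ALTER`-style schema
    /// changes on tables, sources and materialized views): a temporary job catalog is created,
    /// the new fragments are built and swapped in, and auto-refresh-schema sinks on a table are
    /// rewritten along with it. The temporary catalogs are aborted if any step fails.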
    #[await_tree::instrument(boxed, "replace_streaming_job({streaming_job})")]
    pub async fn replace_job(
        &self,
        mut streaming_job: StreamingJob,
        fragment_graph: StreamFragmentGraphProto,
    ) -> MetaResult<NotificationVersion> {
        match &streaming_job {
            StreamingJob::Table(..)
            | StreamingJob::Source(..)
            | StreamingJob::MaterializedView(..) => {}
            StreamingJob::Sink(..) | StreamingJob::Index(..) => {
                bail_not_implemented!("schema change for {}", streaming_job.job_type_str())
            }
        }

        let job_id = streaming_job.id();

        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
        let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());

        // Keep the max parallelism unchanged when replacing the job.
        let original_max_parallelism = self
            .metadata_manager
            .get_job_max_parallelism(streaming_job.id())
            .await?;
        let fragment_graph = PbStreamFragmentGraph {
            max_parallelism: original_max_parallelism as _,
            ..fragment_graph
        };

        let fragment_graph =
            StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
        streaming_job.set_info_from_graph(&fragment_graph);

        // Make it immutable from here on.
        let streaming_job = streaming_job;

        let auto_refresh_schema_sinks = if let StreamingJob::Table(_, table, _) = &streaming_job {
            let auto_refresh_schema_sinks = self
                .metadata_manager
                .catalog_controller
                .get_sink_auto_refresh_schema_from(table.id)
                .await?;
            if !auto_refresh_schema_sinks.is_empty() {
                let original_table_columns = self
                    .metadata_manager
                    .catalog_controller
                    .get_table_columns(table.id)
                    .await?;
                let mut original_table_column_ids: HashSet<_> = original_table_columns
                    .iter()
                    .map(|col| col.column_id())
                    .collect();
                let newly_added_columns = table
                    .columns
                    .iter()
                    .filter(|col| {
                        !original_table_column_ids.remove(&ColumnId::new(
                            col.column_desc.as_ref().unwrap().column_id as _,
                        ))
                    })
                    .map(|col| ColumnCatalog::from(col.clone()))
                    .collect_vec();
                if !original_table_column_ids.is_empty() {
                    return Err(anyhow!(
                        "new table columns do not contain all original columns. new: {:?}, original: {:?}, not included: {:?}",
                        table.columns,
                        original_table_columns,
                        original_table_column_ids
                    )
                    .into());
                }
                let mut sinks = Vec::with_capacity(auto_refresh_schema_sinks.len());
                for sink in auto_refresh_schema_sinks {
                    let sink_job_fragments = self
                        .metadata_manager
                        .get_job_fragments_by_id(sink.id.as_job_id())
                        .await?;
                    if sink_job_fragments.fragments.len() != 1 {
                        return Err(anyhow!(
                            "auto schema refresh sink must have only one fragment, but got {}",
                            sink_job_fragments.fragments.len()
                        )
                        .into());
                    }
                    let original_sink_fragment =
                        sink_job_fragments.fragments.into_values().next().unwrap();
                    let (new_sink_fragment, new_schema, new_log_store_table) =
                        rewrite_refresh_schema_sink_fragment(
                            &original_sink_fragment,
                            &sink,
                            &newly_added_columns,
                            table,
                            fragment_graph.table_fragment_id(),
                            self.env.id_gen_manager(),
                            self.env.actor_id_generator(),
                        )?;

                    assert_eq!(
                        original_sink_fragment.actors.len(),
                        new_sink_fragment.actors.len()
                    );
                    let actor_status = (0..original_sink_fragment.actors.len())
                        .map(|i| {
                            let worker_node_id = sink_job_fragments.actor_status
                                [&original_sink_fragment.actors[i].actor_id]
                                .location
                                .as_ref()
                                .unwrap()
                                .worker_node_id;
                            (
                                new_sink_fragment.actors[i].actor_id,
                                PbActorStatus {
                                    location: Some(PbActorLocation { worker_node_id }),
                                },
                            )
                        })
                        .collect();

                    let streaming_job = StreamingJob::Sink(sink);

                    let tmp_sink_id = self
                        .metadata_manager
                        .catalog_controller
                        .create_job_catalog_for_replace(&streaming_job, None, None, None)
                        .await?
                        .as_sink_id();
                    let StreamingJob::Sink(sink) = streaming_job else {
                        unreachable!()
                    };

                    sinks.push(AutoRefreshSchemaSinkContext {
                        tmp_sink_id,
                        original_sink: sink,
                        original_fragment: original_sink_fragment,
                        new_schema,
                        newly_add_fields: newly_added_columns
                            .iter()
                            .map(|col| Field::from(&col.column_desc))
                            .collect(),
                        new_fragment: new_sink_fragment,
                        new_log_store_table,
                        actor_status,
                    });
                }
                Some(sinks)
            } else {
                None
            }
        } else {
            None
        };

        let tmp_id = self
            .metadata_manager
            .catalog_controller
            .create_job_catalog_for_replace(
                &streaming_job,
                Some(&ctx),
                fragment_graph.specified_parallelism().as_ref(),
                Some(fragment_graph.max_parallelism()),
            )
            .await?;

        let tmp_sink_ids = auto_refresh_schema_sinks.as_ref().map(|sinks| {
            sinks
                .iter()
                .map(|sink| sink.tmp_sink_id.as_object_id())
                .collect_vec()
        });

        tracing::debug!(id = %job_id, "building replace streaming job");
        let mut updated_sink_catalogs = vec![];

        let mut drop_table_connector_ctx = None;
        let result: MetaResult<_> = try {
            let (mut ctx, mut stream_job_fragments) = self
                .build_replace_job(
                    ctx,
                    &streaming_job,
                    fragment_graph,
                    tmp_id,
                    auto_refresh_schema_sinks,
                )
                .await?;
            drop_table_connector_ctx = ctx.drop_table_connector_ctx.clone();
            let auto_refresh_schema_sink_finish_ctx =
                ctx.auto_refresh_schema_sinks.as_ref().map(|sinks| {
                    sinks
                        .iter()
                        .map(|sink| FinishAutoRefreshSchemaSinkContext {
                            tmp_sink_id: sink.tmp_sink_id,
                            original_sink_id: sink.original_sink.id,
                            columns: sink.new_schema.clone(),
                            new_log_store_table: sink
                                .new_log_store_table
                                .as_ref()
                                .map(|table| (table.id, table.columns.clone())),
                        })
                        .collect()
                });

            if let StreamingJob::Table(_, table, ..) = &streaming_job {
                let union_fragment = stream_job_fragments.inner.union_fragment_for_table();
                let upstream_infos = self
                    .metadata_manager
                    .catalog_controller
                    .get_all_upstream_sink_infos(table, union_fragment.fragment_id as _)
                    .await?;
                refill_upstream_sink_union_in_table(&mut union_fragment.nodes, &upstream_infos);

                for upstream_info in &upstream_infos {
                    let upstream_fragment_id = upstream_info.sink_fragment_id;
                    ctx.upstream_fragment_downstreams
                        .entry(upstream_fragment_id)
                        .or_default()
                        .push(upstream_info.new_sink_downstream.clone());
                    if upstream_info.sink_original_target_columns.is_empty() {
                        updated_sink_catalogs.push(upstream_info.sink_id);
                    }
                }
            }

            let replace_upstream = ctx.replace_upstream.clone();

            if let Some(sinks) = &ctx.auto_refresh_schema_sinks {
                let empty_downstreams = FragmentDownstreamRelation::default();
                for sink in sinks {
                    self.metadata_manager
                        .catalog_controller
                        .prepare_streaming_job(
                            sink.tmp_sink_id.as_job_id(),
                            || [&sink.new_fragment].into_iter(),
                            &empty_downstreams,
                            true,
                            None,
                            None,
                        )
                        .await?;
                }
            }

            self.metadata_manager
                .catalog_controller
                .prepare_stream_job_fragments(&stream_job_fragments, &streaming_job, true, None)
                .await?;

            self.stream_manager
                .replace_stream_job(stream_job_fragments, ctx)
                .await?;
            (replace_upstream, auto_refresh_schema_sink_finish_ctx)
        };

        match result {
            Ok((replace_upstream, auto_refresh_schema_sink_finish_ctx)) => {
                let version = self
                    .metadata_manager
                    .catalog_controller
                    .finish_replace_streaming_job(
                        tmp_id,
                        streaming_job,
                        replace_upstream,
                        SinkIntoTableContext {
                            updated_sink_catalogs,
                        },
                        drop_table_connector_ctx.as_ref(),
                        auto_refresh_schema_sink_finish_ctx,
                    )
                    .await?;
                if let Some(drop_table_connector_ctx) = &drop_table_connector_ctx {
                    self.source_manager
                        .apply_source_change(SourceChange::DropSource {
                            dropped_source_ids: vec![drop_table_connector_ctx.to_remove_source_id],
                        })
                        .await;
                }
                Ok(version)
            }
            Err(err) => {
                tracing::error!(id = %job_id, error = ?err.as_report(), "failed to replace job");
                let _ = self
                    .metadata_manager
                    .catalog_controller
                    .try_abort_replacing_streaming_job(tmp_id, tmp_sink_ids)
                    .await
                    .inspect_err(|err| {
                        tracing::error!(id = %job_id, error = ?err.as_report(), "failed to abort replacing job");
                    });
                Err(err)
            }
        }
    }

    #[await_tree::instrument(boxed, "drop_streaming_job{}({job_id})", if let DropMode::Cascade = drop_mode { "_cascade" } else { "" }
    )]
    async fn drop_streaming_job(
        &self,
        job_id: StreamingJobId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;

        let (object_id, object_type) = match job_id {
            StreamingJobId::MaterializedView(id) => (id.as_object_id(), ObjectType::Table),
            StreamingJobId::Sink(id) => (id.as_object_id(), ObjectType::Sink),
            StreamingJobId::Table(_, id) => (id.as_object_id(), ObjectType::Table),
            StreamingJobId::Index(idx) => (idx.as_object_id(), ObjectType::Index),
        };

        let job_status = self
            .metadata_manager
            .catalog_controller
            .get_streaming_job_status(job_id.id())
            .await?;
        let version = match job_status {
            JobStatus::Initial => {
                unreachable!(
                    "Job with Initial status should not notify frontend and therefore should not arrive here"
                );
            }
            JobStatus::Creating => {
                self.stream_manager
                    .cancel_streaming_jobs(vec![job_id.id()])
                    .await?;
                IGNORED_NOTIFICATION_VERSION
            }
            JobStatus::Created => self.drop_object(object_type, object_id, drop_mode).await?,
        };

        Ok(version)
    }

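    /// Resolves the parallelism for a new streaming job from the specified parallelism, the
    /// job's max parallelism, the slots available in the target resource group, and the
    /// configured default parallelism.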
    fn resolve_stream_parallelism(
        &self,
        specified: Option<NonZeroUsize>,
        max: NonZeroUsize,
        cluster_info: &StreamingClusterInfo,
        resource_group: String,
    ) -> MetaResult<NonZeroUsize> {
        let available = NonZeroUsize::new(cluster_info.parallelism(&resource_group));
        DdlController::resolve_stream_parallelism_inner(
            specified,
            max,
            available,
            &self.env.opts.default_parallelism,
            &resource_group,
        )
    }

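    /// Resolution rules, in order: missing compute nodes in the resource group are an error; a
    /// user-specified parallelism wins as long as it does not exceed `max` (it may exceed the
    /// currently `available` slots, with a warning); otherwise the configured default is used
    /// (`Full` meaning all available slots), capped at `max`.
    ///
    /// ```ignore
    /// // Illustrative only: with `DefaultParallelism::Full`, no specified parallelism,
    /// // max = 4 and 16 available slots, the result is capped at 4.
    /// let parallelism = DdlController::resolve_stream_parallelism_inner(
    ///     None,
    ///     NonZeroUsize::new(4).unwrap(),
    ///     NonZeroUsize::new(16),
    ///     &DefaultParallelism::Full,
    ///     "default",
    /// )?;
    /// assert_eq!(parallelism.get(), 4);
    /// ```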
    fn resolve_stream_parallelism_inner(
        specified: Option<NonZeroUsize>,
        max: NonZeroUsize,
        available: Option<NonZeroUsize>,
        default_parallelism: &DefaultParallelism,
        resource_group: &str,
    ) -> MetaResult<NonZeroUsize> {
        let Some(available) = available else {
            bail_unavailable!(
                "no available slots to schedule in resource group \"{}\", \
                 have you allocated any compute nodes within this resource group?",
                resource_group
            );
        };

        if let Some(specified) = specified {
            if specified > max {
                bail_invalid_parameter!(
                    "specified parallelism {} should not exceed max parallelism {}",
                    specified,
                    max,
                );
            }
            if specified > available {
                tracing::warn!(
                    resource_group,
                    specified_parallelism = specified.get(),
                    available_parallelism = available.get(),
                    "specified parallelism exceeds available slots, scheduling with specified value",
                );
            }
            return Ok(specified);
        }

        let default_parallelism = match default_parallelism {
            DefaultParallelism::Full => available,
            DefaultParallelism::Default(num) => {
                if *num > available {
                    tracing::warn!(
                        resource_group,
                        configured_parallelism = num.get(),
                        available_parallelism = available.get(),
                        "default parallelism exceeds available slots, scheduling with configured value",
                    );
                }
                *num
            }
        };

        if default_parallelism > max {
            tracing::warn!(
                max_parallelism = max.get(),
                resource_group,
                "default parallelism exceeds max parallelism, capping to max",
            );
        }
        Ok(default_parallelism.min(max))
    }

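    /// Builds the [`CreateStreamingJobContext`] and fragments for a new streaming job: resolves
    /// upstream fragments, resource group and parallelism, runs the actor graph builder, and
    /// prepares CDC snapshot splits and sink-into-table metadata where applicable.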
1698 #[await_tree::instrument]
1704 pub(crate) async fn build_stream_job(
1705 &self,
1706 stream_ctx: StreamContext,
1707 mut stream_job: StreamingJob,
1708 fragment_graph: StreamFragmentGraph,
1709 resource_type: streaming_job_resource_type::ResourceType,
1710 ) -> MetaResult<(CreateStreamingJobContext, StreamJobFragmentsToCreate)> {
1711 let id = stream_job.id();
1712 let specified_parallelism = fragment_graph.specified_parallelism();
1713 let specified_backfill_parallelism = fragment_graph.specified_backfill_parallelism();
1714 let max_parallelism = NonZeroUsize::new(fragment_graph.max_parallelism()).unwrap();
1715
1716 let fragment_backfill_ordering = fragment_graph.create_fragment_backfill_ordering();
1718
1719 let (snapshot_backfill_info, cross_db_snapshot_backfill_info) =
1723 fragment_graph.collect_snapshot_backfill_info()?;
1724 assert!(
1725 snapshot_backfill_info
1726 .iter()
1727 .chain([&cross_db_snapshot_backfill_info])
1728 .flat_map(|info| info.upstream_mv_table_id_to_backfill_epoch.values())
1729 .all(|backfill_epoch| backfill_epoch.is_none()),
1730 "should not set backfill epoch when initially build the job: {:?} {:?}",
1731 snapshot_backfill_info,
1732 cross_db_snapshot_backfill_info
1733 );
1734
1735 let locality_fragment_state_table_mapping =
1736 fragment_graph.find_locality_provider_fragment_state_table_mapping();
1737
1738 self.metadata_manager
1740 .catalog_controller
1741 .validate_cross_db_snapshot_backfill(&cross_db_snapshot_backfill_info)
1742 .await?;
1743
1744 let upstream_table_ids = fragment_graph
1745 .dependent_table_ids()
1746 .iter()
1747 .filter(|id| {
1748 !cross_db_snapshot_backfill_info
1749 .upstream_mv_table_id_to_backfill_epoch
1750 .contains_key(*id)
1751 })
1752 .cloned()
1753 .collect();
1754
1755 let (upstream_root_fragments, existing_actor_location) = self
1756 .metadata_manager
1757 .get_upstream_root_fragments(&upstream_table_ids)
1758 .await?;
1759
1760 if snapshot_backfill_info.is_some() {
1761 match stream_job {
1762 StreamingJob::MaterializedView(_)
1763 | StreamingJob::Sink(_)
1764 | StreamingJob::Index(_, _) => {}
1765 StreamingJob::Table(_, _, _) | StreamingJob::Source(_) => {
1766 return Err(
1767 anyhow!("snapshot_backfill not enabled for table and source").into(),
1768 );
1769 }
1770 }
1771 }
1772
1773 let upstream_actors = upstream_root_fragments
1774 .values()
1775 .map(|(fragment, _)| {
1776 (
1777 fragment.fragment_id,
1778 fragment.actors.keys().copied().collect(),
1779 )
1780 })
1781 .collect();
1782
1783 let complete_graph = CompleteStreamFragmentGraph::with_upstreams(
1784 fragment_graph,
1785 FragmentGraphUpstreamContext {
1786 upstream_root_fragments,
1787 upstream_actor_location: existing_actor_location,
1788 },
1789 (&stream_job).into(),
1790 )?;
1791 let resource_group = if let Some(group) = resource_type.resource_group() {
1792 group
1793 } else {
1794 self.metadata_manager
1795 .get_database_resource_group(stream_job.database_id())
1796 .await?
1797 };
1798 let is_serverless_backfill = matches!(
1799 &resource_type,
1800 streaming_job_resource_type::ResourceType::ServerlessBackfillResourceGroup(_)
1801 );
1802
1803 let cluster_info = self.metadata_manager.get_streaming_cluster_info().await?;
1805
1806 let initial_parallelism = specified_backfill_parallelism.or(specified_parallelism);
1807 let parallelism = self.resolve_stream_parallelism(
1808 initial_parallelism,
1809 max_parallelism,
1810 &cluster_info,
1811 resource_group.clone(),
1812 )?;
1813
1814 let parallelism = if initial_parallelism.is_some() {
1815 parallelism.get()
1816 } else {
1817 let adaptive_strategy = match stream_ctx.adaptive_parallelism_strategy {
1819 Some(strategy) => strategy,
1820 None => self
1821 .env
1822 .system_params_reader()
1823 .await
1824 .adaptive_parallelism_strategy(),
1825 };
1826 adaptive_strategy.compute_target_parallelism(parallelism.get())
1827 };
1828
1829 let parallelism = NonZeroUsize::new(parallelism).expect("parallelism must be positive");
1830 let actor_graph_builder = ActorGraphBuilder::new(
1831 id,
1832 resource_group,
1833 complete_graph,
1834 cluster_info,
1835 parallelism,
1836 )?;
1837
1838 let ActorGraphBuildResult {
1839 graph,
1840 downstream_fragment_relations,
1841 building_locations,
1842 upstream_fragment_downstreams,
1843 new_no_shuffle,
1844 replace_upstream,
1845 ..
1846 } = actor_graph_builder.generate_graph(&self.env, &stream_job, stream_ctx.clone())?;
1847 assert!(replace_upstream.is_empty());
1848
1849 let table_parallelism = match (specified_parallelism, &self.env.opts.default_parallelism) {
1856 (None, DefaultParallelism::Full) => TableParallelism::Adaptive,
1857 _ => TableParallelism::Fixed(parallelism.get()),
1858 };
1859
1860 let stream_job_fragments = StreamJobFragments::new(
1861 id,
1862 graph,
1863 &building_locations.actor_locations,
1864 stream_ctx.clone(),
1865 table_parallelism,
1866 max_parallelism.get(),
1867 );
1868
1869 if let Some(mview_fragment) = stream_job_fragments.mview_fragment() {
1870 stream_job.set_table_vnode_count(mview_fragment.vnode_count());
1871 }
1872
        let new_upstream_sink = if let StreamingJob::Sink(sink) = &stream_job
            && let Ok(table_id) = sink.get_target_table()
        {
            let tables = self
                .metadata_manager
                .get_table_catalog_by_ids(&[*table_id])
                .await?;
            let target_table = tables
                .first()
                .ok_or_else(|| MetaError::catalog_id_not_found("table", *table_id))?;
            let sink_fragment = stream_job_fragments
                .sink_fragment()
                .ok_or_else(|| anyhow::anyhow!("sink fragment not found for sink {}", sink.id))?;
            let mview_fragment_id = self
                .metadata_manager
                .catalog_controller
                .get_mview_fragment_by_id(table_id.as_job_id())
                .await?;
            let upstream_sink_info = build_upstream_sink_info(
                sink.id,
                sink.original_target_columns.clone(),
                sink_fragment.fragment_id as _,
                target_table,
                mview_fragment_id,
            )?;
            Some(upstream_sink_info)
        } else {
            None
        };

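        // For a parallel backfill of a table on a shared CDC source, initialize the table's
        // snapshot splits via the meta store before the job is created.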
        let mut cdc_table_snapshot_splits = None;
        if let StreamingJob::Table(None, table, TableJobType::SharedCdcSource) = &stream_job
            && let Some((_, stream_cdc_scan)) =
                parallel_cdc_table_backfill_fragment(stream_job_fragments.fragments.values())
        {
            let splits = try_init_parallel_cdc_table_snapshot_splits(
                table.id,
                stream_cdc_scan.cdc_table_desc.as_ref().unwrap(),
                self.env.meta_store_ref(),
                stream_cdc_scan.options.as_ref().unwrap(),
                self.env.opts.cdc_table_split_init_insert_batch_size,
                self.env.opts.cdc_table_split_init_sleep_interval_splits,
                self.env.opts.cdc_table_split_init_sleep_duration_millis,
            )
            .await?;
            cdc_table_snapshot_splits = Some(splits);
        }

        let ctx = CreateStreamingJobContext {
            upstream_fragment_downstreams,
            new_no_shuffle,
            upstream_actors,
            building_locations,
            definition: stream_job.definition(),
            create_type: stream_job.create_type(),
            job_type: (&stream_job).into(),
            streaming_job: stream_job,
            new_upstream_sink,
            option: CreateStreamingJobOption {},
            snapshot_backfill_info,
            cross_db_snapshot_backfill_info,
            fragment_backfill_ordering,
            locality_fragment_state_table_mapping,
            cdc_table_snapshot_splits,
            is_serverless_backfill,
        };

        Ok((
            ctx,
            StreamJobFragmentsToCreate {
                inner: stream_job_fragments,
                downstreams: downstream_fragment_relations,
            },
        ))
    }

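    /// Builds the fragment graph and [`ReplaceStreamJobContext`] for replacing an existing
    /// streaming job (e.g. a schema change), fitting the new graph onto the old job's state
    /// tables and reusing its parallelism settings.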
    pub(crate) async fn build_replace_job(
        &self,
        stream_ctx: StreamContext,
        stream_job: &StreamingJob,
        mut fragment_graph: StreamFragmentGraph,
        tmp_job_id: JobId,
        auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,
    ) -> MetaResult<(ReplaceStreamJobContext, StreamJobFragmentsToCreate)> {
        match &stream_job {
            StreamingJob::Table(..)
            | StreamingJob::Source(..)
            | StreamingJob::MaterializedView(..) => {}
            StreamingJob::Sink(..) | StreamingJob::Index(..) => {
                bail_not_implemented!("schema change for {}", stream_job.job_type_str())
            }
        }

        let id = stream_job.id();

        let mut drop_table_associated_source_id = None;
        if let StreamingJob::Table(None, _, _) = &stream_job {
            drop_table_associated_source_id = self
                .metadata_manager
                .get_table_associated_source_id(id.as_mv_table_id())
                .await?;
        }

        let old_fragments = self.metadata_manager.get_job_fragments_by_id(id).await?;
        let old_internal_table_ids = old_fragments.internal_table_ids();

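        // Fit the new fragment graph onto the old job's state: record the source and state table
        // to drop when the table's associated source connector is being removed, run a full state
        // match for materialized views, and map internal tables one-to-one otherwise.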
        let mut drop_table_connector_ctx = None;
        if let Some(to_remove_source_id) = drop_table_associated_source_id {
            debug_assert!(old_internal_table_ids.len() == 1);

            drop_table_connector_ctx = Some(DropTableConnectorContext {
                to_change_streaming_job_id: id,
                to_remove_state_table_id: old_internal_table_ids[0],
                to_remove_source_id,
            });
        } else if stream_job.is_materialized_view() {
            let old_fragments_upstreams = self
                .metadata_manager
                .catalog_controller
                .upstream_fragments(old_fragments.fragment_ids())
                .await?;

            let old_state_graph =
                state_match::Graph::from_existing(&old_fragments, &old_fragments_upstreams);
            let new_state_graph = state_match::Graph::from_building(&fragment_graph);
            let result = state_match::match_graph(&new_state_graph, &old_state_graph)
                .context("incompatible altering on the streaming job states")?;

            fragment_graph.fit_internal_table_ids_with_mapping(result.table_matches);
            fragment_graph.fit_snapshot_backfill_epochs(result.snapshot_backfill_epochs);
        } else {
            let old_internal_tables = self
                .metadata_manager
                .get_table_catalog_by_ids(&old_internal_table_ids)
                .await?;
            fragment_graph.fit_internal_tables_trivial(old_internal_tables)?;
        }

        let original_root_fragment = old_fragments
            .root_fragment()
            .expect("root fragment not found");

        let job_type = StreamingJobType::from(stream_job);

        let (mut downstream_fragments, mut downstream_actor_location) =
            self.metadata_manager.get_downstream_fragments(id).await?;

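        // When downstream sinks are auto-refreshed together with this schema change, replace
        // their original fragments and actor locations with the newly built sink fragments.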
        if let Some(auto_refresh_schema_sinks) = &auto_refresh_schema_sinks {
            let mut remaining_fragment: HashSet<_> = auto_refresh_schema_sinks
                .iter()
                .map(|sink| sink.original_fragment.fragment_id)
                .collect();
            for (_, downstream_fragment, nodes) in &mut downstream_fragments {
                if let Some(sink) = auto_refresh_schema_sinks.iter().find(|sink| {
                    sink.original_fragment.fragment_id == downstream_fragment.fragment_id
                }) {
                    assert!(remaining_fragment.remove(&downstream_fragment.fragment_id));
                    for actor_id in downstream_fragment.actors.keys() {
                        downstream_actor_location.remove(actor_id);
                    }
                    for (actor_id, status) in &sink.actor_status {
                        downstream_actor_location
                            .insert(*actor_id, status.location.as_ref().unwrap().worker_node_id);
                    }

                    *downstream_fragment = (&sink.new_fragment_info(), stream_job.id()).into();
                    *nodes = sink.new_fragment.nodes.clone();
                }
            }
            assert!(remaining_fragment.is_empty());
        }

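        // Build the complete fragment graph: tables and sources only need their downstream
        // fragments, while shared CDC source tables and materialized views also need their
        // upstream root fragments.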
        let complete_graph = match &job_type {
            StreamingJobType::Table(TableJobType::General) | StreamingJobType::Source => {
                CompleteStreamFragmentGraph::with_downstreams(
                    fragment_graph,
                    FragmentGraphDownstreamContext {
                        original_root_fragment_id: original_root_fragment.fragment_id,
                        downstream_fragments,
                        downstream_actor_location,
                    },
                    job_type,
                )?
            }
            StreamingJobType::Table(TableJobType::SharedCdcSource)
            | StreamingJobType::MaterializedView => {
                let (upstream_root_fragments, upstream_actor_location) = self
                    .metadata_manager
                    .get_upstream_root_fragments(fragment_graph.dependent_table_ids())
                    .await?;

                CompleteStreamFragmentGraph::with_upstreams_and_downstreams(
                    fragment_graph,
                    FragmentGraphUpstreamContext {
                        upstream_root_fragments,
                        upstream_actor_location,
                    },
                    FragmentGraphDownstreamContext {
                        original_root_fragment_id: original_root_fragment.fragment_id,
                        downstream_fragments,
                        downstream_actor_location,
                    },
                    job_type,
                )?
            }
            _ => unreachable!(),
        };

        let resource_group = self
            .metadata_manager
            .get_existing_job_resource_group(id)
            .await?;

        let cluster_info = self.metadata_manager.get_streaming_cluster_info().await?;

        let parallelism = NonZeroUsize::new(original_root_fragment.actors.len())
            .expect("The number of actors in the original table fragment should be greater than 0");

        let actor_graph_builder = ActorGraphBuilder::new(
            id,
            resource_group,
            complete_graph,
            cluster_info,
            parallelism,
        )?;

        let ActorGraphBuildResult {
            graph,
            downstream_fragment_relations,
            building_locations,
            upstream_fragment_downstreams,
            mut replace_upstream,
            new_no_shuffle,
            ..
        } = actor_graph_builder.generate_graph(&self.env, stream_job, stream_ctx.clone())?;

        if matches!(
            job_type,
            StreamingJobType::Source | StreamingJobType::Table(TableJobType::General)
        ) {
            assert!(upstream_fragment_downstreams.is_empty());
        }

        let stream_job_fragments = StreamJobFragments::new(
            tmp_job_id,
            graph,
            &building_locations.actor_locations,
            stream_ctx,
            old_fragments.assigned_parallelism,
            old_fragments.max_parallelism,
        );

        if let Some(sinks) = &auto_refresh_schema_sinks {
            for sink in sinks {
                replace_upstream
                    .remove(&sink.new_fragment.fragment_id)
                    .expect("should exist");
            }
        }

        let ctx = ReplaceStreamJobContext {
            old_fragments,
            replace_upstream,
            new_no_shuffle,
            upstream_fragment_downstreams,
            building_locations,
            streaming_job: stream_job.clone(),
            tmp_id: tmp_job_id,
            drop_table_connector_ctx,
            auto_refresh_schema_sinks,
        };

        Ok((
            ctx,
            StreamJobFragmentsToCreate {
                inner: stream_job_fragments,
                downstreams: downstream_fragment_relations,
            },
        ))
    }

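    /// Renames the catalog object identified by the request to `new_name`.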
    async fn alter_name(
        &self,
        relation: alter_name_request::Object,
        new_name: &str,
    ) -> MetaResult<NotificationVersion> {
        let (obj_type, id) = match relation {
            alter_name_request::Object::TableId(id) => (ObjectType::Table, id),
            alter_name_request::Object::ViewId(id) => (ObjectType::View, id),
            alter_name_request::Object::IndexId(id) => (ObjectType::Index, id),
            alter_name_request::Object::SinkId(id) => (ObjectType::Sink, id),
            alter_name_request::Object::SourceId(id) => (ObjectType::Source, id),
            alter_name_request::Object::SchemaId(id) => (ObjectType::Schema, id),
            alter_name_request::Object::DatabaseId(id) => (ObjectType::Database, id),
            alter_name_request::Object::SubscriptionId(id) => (ObjectType::Subscription, id),
        };
        self.metadata_manager
            .catalog_controller
            .alter_name(obj_type, id, new_name)
            .await
    }

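    /// Swaps the names of two catalog objects of the same type. Schema swap is not supported yet.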
    async fn alter_swap_rename(
        &self,
        object: alter_swap_rename_request::Object,
    ) -> MetaResult<NotificationVersion> {
        let (obj_type, src_id, dst_id) = match object {
            alter_swap_rename_request::Object::Schema(_) => unimplemented!("schema swap"),
            alter_swap_rename_request::Object::Table(objs) => {
                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
                (ObjectType::Table, src_id, dst_id)
            }
            alter_swap_rename_request::Object::View(objs) => {
                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
                (ObjectType::View, src_id, dst_id)
            }
            alter_swap_rename_request::Object::Source(objs) => {
                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
                (ObjectType::Source, src_id, dst_id)
            }
            alter_swap_rename_request::Object::Sink(objs) => {
                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
                (ObjectType::Sink, src_id, dst_id)
            }
            alter_swap_rename_request::Object::Subscription(objs) => {
                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
                (ObjectType::Subscription, src_id, dst_id)
            }
        };

        self.metadata_manager
            .catalog_controller
            .alter_swap_rename(obj_type, src_id, dst_id)
            .await
    }

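    /// Transfers ownership of a catalog object to the user identified by `owner_id`.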
    async fn alter_owner(
        &self,
        object: Object,
        owner_id: UserId,
    ) -> MetaResult<NotificationVersion> {
        let (obj_type, id) = match object {
            Object::TableId(id) => (ObjectType::Table, id),
            Object::ViewId(id) => (ObjectType::View, id),
            Object::SourceId(id) => (ObjectType::Source, id),
            Object::SinkId(id) => (ObjectType::Sink, id),
            Object::SchemaId(id) => (ObjectType::Schema, id),
            Object::DatabaseId(id) => (ObjectType::Database, id),
            Object::SubscriptionId(id) => (ObjectType::Subscription, id),
            Object::ConnectionId(id) => (ObjectType::Connection, id),
            Object::FunctionId(id) => (ObjectType::Function, id),
            Object::SecretId(id) => (ObjectType::Secret, id),
        };
        self.metadata_manager
            .catalog_controller
            .alter_owner(obj_type, id.into(), owner_id as _)
            .await
    }

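    /// Moves a catalog object into the schema identified by `new_schema_id`.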
    async fn alter_set_schema(
        &self,
        object: alter_set_schema_request::Object,
        new_schema_id: SchemaId,
    ) -> MetaResult<NotificationVersion> {
        let (obj_type, id) = match object {
            alter_set_schema_request::Object::TableId(id) => (ObjectType::Table, id),
            alter_set_schema_request::Object::ViewId(id) => (ObjectType::View, id),
            alter_set_schema_request::Object::SourceId(id) => (ObjectType::Source, id),
            alter_set_schema_request::Object::SinkId(id) => (ObjectType::Sink, id),
            alter_set_schema_request::Object::FunctionId(id) => (ObjectType::Function, id),
            alter_set_schema_request::Object::ConnectionId(id) => (ObjectType::Connection, id),
            alter_set_schema_request::Object::SubscriptionId(id) => (ObjectType::Subscription, id),
        };
        self.metadata_manager
            .catalog_controller
            .alter_schema(obj_type, id.into(), new_schema_id as _)
            .await
    }

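    /// Waits until no background creating streaming job is left, polling roughly once per
    /// millisecond with an overall timeout of 30 minutes.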
    pub async fn wait(&self) -> MetaResult<()> {
        let timeout_ms = 30 * 60 * 1000;
        for _ in 0..timeout_ms {
            if self
                .metadata_manager
                .catalog_controller
                .list_background_creating_jobs(true, None)
                .await?
                .is_empty()
            {
                return Ok(());
            }

            sleep(Duration::from_millis(1)).await;
        }
        Err(MetaError::cancelled(format!(
            "timeout after {timeout_ms}ms"
        )))
    }

    async fn comment_on(&self, comment: Comment) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .comment_on(comment)
            .await
    }

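    /// Updates a streaming job's per-job config: adds the given entries and removes the given
    /// keys.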
    async fn alter_streaming_job_config(
        &self,
        job_id: JobId,
        entries_to_add: HashMap<String, String>,
        keys_to_remove: Vec<String>,
    ) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .alter_streaming_job_config(job_id, entries_to_add, keys_to_remove)
            .await
    }
}

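/// Reports a telemetry event for a newly created streaming object.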
fn report_create_object(
    job_id: JobId,
    event_name: &str,
    obj_type: PbTelemetryDatabaseObject,
    connector_name: Option<String>,
    attr_info: Option<jsonbb::Value>,
) {
    report_event(
        PbTelemetryEventStage::CreateStreamJob,
        event_name,
        job_id.as_raw_id() as _,
        connector_name,
        Some(obj_type),
        attr_info,
    );
}

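/// Builds the [`UpstreamSinkInfo`] that connects a sink fragment to its target table: the sink's
/// output schema, the hash distribution key expressed in terms of the sink's output columns, and
/// the projection from the sink's columns onto the table's current columns.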
pub fn build_upstream_sink_info(
    sink_id: SinkId,
    original_target_columns: Vec<PbColumnCatalog>,
    sink_fragment_id: FragmentId,
    target_table: &PbTable,
    target_fragment_id: FragmentId,
) -> MetaResult<UpstreamSinkInfo> {
    let sink_columns = if !original_target_columns.is_empty() {
        original_target_columns.clone()
    } else {
        target_table.columns.clone()
    };

    let sink_output_fields = sink_columns
        .iter()
        .map(|col| Field::from(col.column_desc.as_ref().unwrap()).to_prost())
        .collect_vec();
    let output_indices = (0..sink_output_fields.len())
        .map(|i| i as u32)
        .collect_vec();

    let dist_key_indices: anyhow::Result<Vec<u32>> = try {
        let sink_idx_by_col_id = sink_columns
            .iter()
            .enumerate()
            .map(|(idx, col)| {
                let column_id = col.column_desc.as_ref().unwrap().column_id;
                (column_id, idx as u32)
            })
            .collect::<HashMap<_, _>>();
        target_table
            .distribution_key
            .iter()
            .map(|dist_idx| {
                let column_id = target_table.columns[*dist_idx as usize]
                    .column_desc
                    .as_ref()
                    .unwrap()
                    .column_id;
                let sink_idx = sink_idx_by_col_id
                    .get(&column_id)
                    .ok_or_else(|| anyhow::anyhow!("column id {} not found in sink", column_id))?;
                Ok(*sink_idx)
            })
            .collect::<anyhow::Result<Vec<_>>>()?
    };
    let dist_key_indices =
        dist_key_indices.map_err(|e| e.context("failed to get distribution key indices"))?;
    let downstream_fragment_id = target_fragment_id as _;
    let new_downstream_relation = DownstreamFragmentRelation {
        downstream_fragment_id,
        dispatcher_type: DispatcherType::Hash,
        dist_key_indices,
        output_mapping: PbDispatchOutputMapping::simple(output_indices),
    };
    let current_target_columns = target_table.get_columns();
    let project_exprs = build_select_node_list(&sink_columns, current_target_columns)?;
    Ok(UpstreamSinkInfo {
        sink_id,
        sink_fragment_id: sink_fragment_id as _,
        sink_output_fields,
        sink_original_target_columns: original_target_columns,
        project_exprs,
        new_sink_downstream: new_downstream_relation,
    })
}

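/// Rewrites the `UpstreamSinkUnion` node in the table's union fragment so that its initial
/// upstreams reflect the given sink infos.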
pub fn refill_upstream_sink_union_in_table(
    union_fragment_root: &mut PbStreamNode,
    upstream_sink_infos: &Vec<UpstreamSinkInfo>,
) {
    visit_stream_node_cont_mut(union_fragment_root, |node| {
        if let Some(NodeBody::UpstreamSinkUnion(upstream_sink_union)) = &mut node.node_body {
            let init_upstreams = upstream_sink_infos
                .iter()
                .map(|info| PbUpstreamSinkInfo {
                    upstream_fragment_id: info.sink_fragment_id,
                    sink_output_schema: info.sink_output_fields.clone(),
                    project_exprs: info.project_exprs.clone(),
                })
                .collect();
            upstream_sink_union.init_upstreams = init_upstreams;
            false
        } else {
            true
        }
    });
}

#[cfg(test)]
mod tests {
    use std::num::NonZeroUsize;

    use super::*;

    #[test]
    fn test_specified_parallelism_exceeds_available() {
        let result = DdlController::resolve_stream_parallelism_inner(
            Some(NonZeroUsize::new(100).unwrap()),
            NonZeroUsize::new(256).unwrap(),
            Some(NonZeroUsize::new(4).unwrap()),
            &DefaultParallelism::Full,
            "default",
        )
        .unwrap();
        assert_eq!(result.get(), 100);
    }

    #[test]
    fn test_allows_default_parallelism_over_available() {
        let result = DdlController::resolve_stream_parallelism_inner(
            None,
            NonZeroUsize::new(256).unwrap(),
            Some(NonZeroUsize::new(4).unwrap()),
            &DefaultParallelism::Default(NonZeroUsize::new(50).unwrap()),
            "default",
        )
        .unwrap();
        assert_eq!(result.get(), 50);
    }

    #[test]
    fn test_full_parallelism_capped_by_max() {
        let result = DdlController::resolve_stream_parallelism_inner(
            None,
            NonZeroUsize::new(6).unwrap(),
            Some(NonZeroUsize::new(10).unwrap()),
            &DefaultParallelism::Full,
            "default",
        )
        .unwrap();
        assert_eq!(result.get(), 6);
    }

    #[test]
    fn test_no_available_slots_returns_error() {
        let result = DdlController::resolve_stream_parallelism_inner(
            None,
            NonZeroUsize::new(4).unwrap(),
            None,
            &DefaultParallelism::Full,
            "default",
        );
        assert!(matches!(
            result,
            Err(ref e) if matches!(e.inner(), MetaErrorInner::Unavailable(_))
        ));
    }

    #[test]
    fn test_specified_over_max_returns_error() {
        let result = DdlController::resolve_stream_parallelism_inner(
            Some(NonZeroUsize::new(8).unwrap()),
            NonZeroUsize::new(4).unwrap(),
            Some(NonZeroUsize::new(10).unwrap()),
            &DefaultParallelism::Full,
            "default",
        );
        assert!(matches!(
            result,
            Err(ref e) if matches!(e.inner(), MetaErrorInner::InvalidParameter(_))
        ));
    }
}