use std::cmp::Ordering;
use std::collections::{HashMap, HashSet};
use std::num::NonZeroUsize;
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use std::time::Duration;

use anyhow::{Context, anyhow};
use await_tree::InstrumentAwait;
use either::Either;
use itertools::Itertools;
use risingwave_common::catalog::{
    AlterDatabaseParam, ColumnCatalog, ColumnId, Field, FragmentTypeFlag,
};
use risingwave_common::config::DefaultParallelism;
use risingwave_common::hash::VnodeCountCompat;
use risingwave_common::id::{JobId, TableId};
use risingwave_common::secret::{LocalSecretManager, SecretEncryption};
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont_mut;
use risingwave_common::{bail, bail_not_implemented};
use risingwave_connector::WithOptionsSecResolved;
use risingwave_connector::connector_common::validate_connection;
use risingwave_connector::sink::SinkParam;
use risingwave_connector::sink::iceberg::IcebergSink;
use risingwave_connector::source::cdc::CdcScanOptions;
use risingwave_connector::source::{
    ConnectorProperties, SourceEnumeratorContext, UPSTREAM_SOURCE_KEY,
};
use risingwave_meta_model::object::ObjectType;
use risingwave_meta_model::{
    ConnectionId, DatabaseId, DispatcherType, FragmentId, FunctionId, IndexId, JobStatus, ObjectId,
    SchemaId, SecretId, SinkId, SourceId, StreamingParallelism, SubscriptionId, UserId, ViewId,
};
use risingwave_pb::catalog::{
    Comment, Connection, CreateType, Database, Function, PbTable, Schema, Secret, Source,
    Subscription, Table, View,
};
use risingwave_pb::common::PbActorLocation;
use risingwave_pb::ddl_service::alter_owner_request::Object;
use risingwave_pb::ddl_service::{
    DdlProgress, TableJobType, WaitVersion, alter_name_request, alter_set_schema_request,
    alter_swap_rename_request, streaming_job_resource_type,
};
use risingwave_pb::meta::table_fragments::PbActorStatus;
use risingwave_pb::plan_common::PbColumnCatalog;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{
    PbDispatchOutputMapping, PbStreamFragmentGraph, PbStreamNode, PbUpstreamSinkInfo,
    StreamFragmentGraph as StreamFragmentGraphProto,
};
use risingwave_pb::telemetry::{PbTelemetryDatabaseObject, PbTelemetryEventStage};
use strum::Display;
use thiserror_ext::AsReport;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use tokio::time::sleep;
use tracing::Instrument;

use crate::barrier::{BarrierManagerRef, Command};
use crate::controller::catalog::{DropTableConnectorContext, ReleaseContext};
use crate::controller::cluster::StreamingClusterInfo;
use crate::controller::streaming_job::{FinishAutoRefreshSchemaSinkContext, SinkIntoTableContext};
use crate::controller::utils::build_select_node_list;
use crate::error::{MetaErrorInner, bail_invalid_parameter, bail_unavailable};
use crate::manager::iceberg_compaction::IcebergCompactionManagerRef;
use crate::manager::sink_coordination::SinkCoordinatorManager;
use crate::manager::{
    IGNORED_NOTIFICATION_VERSION, LocalNotification, MetaSrvEnv, MetadataManager,
    NotificationVersion, StreamingJob, StreamingJobType,
};
use crate::model::{
    DownstreamFragmentRelation, Fragment, FragmentDownstreamRelation,
    FragmentId as CatalogFragmentId, StreamContext, StreamJobFragments, StreamJobFragmentsToCreate,
    TableParallelism,
};
use crate::stream::cdc::{
    parallel_cdc_table_backfill_fragment, try_init_parallel_cdc_table_snapshot_splits,
};
use crate::stream::{
    ActorGraphBuildResult, ActorGraphBuilder, AutoRefreshSchemaSinkContext,
    CompleteStreamFragmentGraph, CreateStreamingJobContext, CreateStreamingJobOption,
    FragmentGraphDownstreamContext, FragmentGraphUpstreamContext, GlobalStreamManagerRef,
    ReplaceStreamJobContext, ReschedulePolicy, SourceChange, SourceManagerRef, StreamFragmentGraph,
    UpstreamSinkInfo, check_sink_fragments_support_refresh_schema, create_source_worker,
    rewrite_refresh_schema_sink_fragment, state_match, validate_sink,
};
use crate::telemetry::report_event;
use crate::{MetaError, MetaResult};

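/// Drop behavior when the object still has dependents, mirroring SQL's
/// `RESTRICT`/`CASCADE` drop options.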
#[derive(PartialEq)]
pub enum DropMode {
    Restrict,
    Cascade,
}

impl DropMode {
    pub fn from_request_setting(cascade: bool) -> DropMode {
        if cascade {
            DropMode::Cascade
        } else {
            DropMode::Restrict
        }
    }
}

#[derive(strum::AsRefStr)]
pub enum StreamingJobId {
    MaterializedView(TableId),
    Sink(SinkId),
    Table(Option<SourceId>, TableId),
    Index(IndexId),
}

impl std::fmt::Display for StreamingJobId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.as_ref())?;
        write!(f, "({})", self.id())
    }
}

impl StreamingJobId {
    fn id(&self) -> JobId {
        match self {
            StreamingJobId::MaterializedView(id) | StreamingJobId::Table(_, id) => id.as_job_id(),
            StreamingJobId::Index(id) => id.as_job_id(),
            StreamingJobId::Sink(id) => id.as_job_id(),
        }
    }
}

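/// A streaming job to be replaced, together with the new fragment graph to build it from.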
pub struct ReplaceStreamJobInfo {
    pub streaming_job: StreamingJob,
    pub fragment_graph: StreamFragmentGraphProto,
}

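/// A DDL command handled by [`DdlController::run_command`].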
#[derive(Display)]
pub enum DdlCommand {
    CreateDatabase(Database),
    DropDatabase(DatabaseId),
    CreateSchema(Schema),
    DropSchema(SchemaId, DropMode),
    CreateNonSharedSource(Source),
    DropSource(SourceId, DropMode),
    ResetSource(SourceId),
    CreateFunction(Function),
    DropFunction(FunctionId, DropMode),
    CreateView(View, HashSet<ObjectId>),
    DropView(ViewId, DropMode),
    CreateStreamingJob {
        stream_job: StreamingJob,
        fragment_graph: StreamFragmentGraphProto,
        dependencies: HashSet<ObjectId>,
        resource_type: streaming_job_resource_type::ResourceType,
        if_not_exists: bool,
    },
    DropStreamingJob {
        job_id: StreamingJobId,
        drop_mode: DropMode,
    },
    AlterName(alter_name_request::Object, String),
    AlterSwapRename(alter_swap_rename_request::Object),
    ReplaceStreamJob(ReplaceStreamJobInfo),
    AlterNonSharedSource(Source),
    AlterObjectOwner(Object, UserId),
    AlterSetSchema(alter_set_schema_request::Object, SchemaId),
    CreateConnection(Connection),
    DropConnection(ConnectionId, DropMode),
    CreateSecret(Secret),
    AlterSecret(Secret),
    DropSecret(SecretId, DropMode),
    CommentOn(Comment),
    CreateSubscription(Subscription),
    DropSubscription(SubscriptionId, DropMode),
    AlterSubscriptionRetention {
        subscription_id: SubscriptionId,
        retention_seconds: u64,
        definition: String,
    },
    AlterDatabaseParam(DatabaseId, AlterDatabaseParam),
    AlterStreamingJobConfig(JobId, HashMap<String, String>, Vec<String>),
}

impl DdlCommand {
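    /// Returns the object name or id that this command operates on, used to build the
    /// await-tree span in [`DdlController::run_command`].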
    fn object(&self) -> Either<String, ObjectId> {
        use Either::*;
        match self {
            DdlCommand::CreateDatabase(database) => Left(database.name.clone()),
            DdlCommand::DropDatabase(id) => Right(id.as_object_id()),
            DdlCommand::CreateSchema(schema) => Left(schema.name.clone()),
            DdlCommand::DropSchema(id, _) => Right(id.as_object_id()),
            DdlCommand::CreateNonSharedSource(source) => Left(source.name.clone()),
            DdlCommand::DropSource(id, _) => Right(id.as_object_id()),
            DdlCommand::ResetSource(id) => Right(id.as_object_id()),
            DdlCommand::CreateFunction(function) => Left(function.name.clone()),
            DdlCommand::DropFunction(id, _) => Right(id.as_object_id()),
            DdlCommand::CreateView(view, _) => Left(view.name.clone()),
            DdlCommand::DropView(id, _) => Right(id.as_object_id()),
            DdlCommand::CreateStreamingJob { stream_job, .. } => Left(stream_job.name()),
            DdlCommand::DropStreamingJob { job_id, .. } => Right(job_id.id().as_object_id()),
            DdlCommand::AlterName(object, _) => Left(format!("{object:?}")),
            DdlCommand::AlterSwapRename(object) => Left(format!("{object:?}")),
            DdlCommand::ReplaceStreamJob(info) => Left(info.streaming_job.name()),
            DdlCommand::AlterNonSharedSource(source) => Left(source.name.clone()),
            DdlCommand::AlterObjectOwner(object, _) => Left(format!("{object:?}")),
            DdlCommand::AlterSetSchema(object, _) => Left(format!("{object:?}")),
            DdlCommand::CreateConnection(connection) => Left(connection.name.clone()),
            DdlCommand::DropConnection(id, _) => Right(id.as_object_id()),
            DdlCommand::CreateSecret(secret) => Left(secret.name.clone()),
            DdlCommand::AlterSecret(secret) => Left(secret.name.clone()),
            DdlCommand::DropSecret(id, _) => Right(id.as_object_id()),
            DdlCommand::CommentOn(comment) => Right(comment.table_id.into()),
            DdlCommand::CreateSubscription(subscription) => Left(subscription.name.clone()),
            DdlCommand::DropSubscription(id, _) => Right(id.as_object_id()),
            DdlCommand::AlterSubscriptionRetention {
                subscription_id, ..
            } => Right(subscription_id.as_object_id()),
            DdlCommand::AlterDatabaseParam(id, _) => Right(id.as_object_id()),
            DdlCommand::AlterStreamingJobConfig(job_id, _, _) => Right(job_id.as_object_id()),
        }
    }

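    /// Returns whether this command is allowed while the cluster is recovering. Commands that
    /// must build or alter streaming graphs require the barrier manager to be running.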
    fn allow_in_recovery(&self) -> bool {
        match self {
            DdlCommand::DropDatabase(_)
            | DdlCommand::DropSchema(_, _)
            | DdlCommand::DropSource(_, _)
            | DdlCommand::DropFunction(_, _)
            | DdlCommand::DropView(_, _)
            | DdlCommand::DropStreamingJob { .. }
            | DdlCommand::DropConnection(_, _)
            | DdlCommand::DropSecret(_, _)
            | DdlCommand::DropSubscription(_, _)
            | DdlCommand::AlterName(_, _)
            | DdlCommand::AlterObjectOwner(_, _)
            | DdlCommand::AlterSetSchema(_, _)
            | DdlCommand::CreateDatabase(_)
            | DdlCommand::CreateSchema(_)
            | DdlCommand::CreateFunction(_)
            | DdlCommand::CreateView(_, _)
            | DdlCommand::CreateConnection(_)
            | DdlCommand::CommentOn(_)
            | DdlCommand::CreateSecret(_)
            | DdlCommand::AlterSecret(_)
            | DdlCommand::AlterSwapRename(_)
            | DdlCommand::AlterDatabaseParam(_, _)
            | DdlCommand::AlterStreamingJobConfig(_, _, _)
            | DdlCommand::AlterSubscriptionRetention { .. } => true,
            DdlCommand::CreateStreamingJob { .. }
            | DdlCommand::CreateNonSharedSource(_)
            | DdlCommand::ReplaceStreamJob(_)
            | DdlCommand::AlterNonSharedSource(_)
            | DdlCommand::ResetSource(_)
            | DdlCommand::CreateSubscription(_) => false,
        }
    }
}

#[derive(Clone)]
pub struct DdlController {
    pub(crate) env: MetaSrvEnv,

    pub(crate) metadata_manager: MetadataManager,
    pub(crate) stream_manager: GlobalStreamManagerRef,
    pub(crate) source_manager: SourceManagerRef,
    barrier_manager: BarrierManagerRef,
    sink_manager: SinkCoordinatorManager,
    iceberg_compaction_manager: IcebergCompactionManagerRef,

    pub(crate) creating_streaming_job_permits: Arc<CreatingStreamingJobPermit>,

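    /// Sequence number counter for DDL commands, see [`Self::next_seq`].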
    seq: Arc<AtomicU64>,
}

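/// Limits the number of concurrently creating streaming jobs, controlled by the
/// `max_concurrent_creating_streaming_jobs` system parameter (`0` means unlimited).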
#[derive(Clone)]
pub struct CreatingStreamingJobPermit {
    pub(crate) semaphore: Arc<Semaphore>,
}

impl CreatingStreamingJobPermit {
    async fn new(env: &MetaSrvEnv) -> Self {
        let mut permits = env
            .system_params_reader()
            .await
            .max_concurrent_creating_streaming_jobs() as usize;
        if permits == 0 {
            // A value of `0` means no limit: fall back to the semaphore's maximum.
            permits = Semaphore::MAX_PERMITS;
        }
        let semaphore = Arc::new(Semaphore::new(permits));

        let (local_notification_tx, mut local_notification_rx) =
            tokio::sync::mpsc::unbounded_channel();
        env.notification_manager()
            .insert_local_sender(local_notification_tx);
        let semaphore_clone = semaphore.clone();
        // Adjust the permits on the fly whenever the system parameter changes.
        tokio::spawn(async move {
            while let Some(notification) = local_notification_rx.recv().await {
                let LocalNotification::SystemParamsChange(p) = &notification else {
                    continue;
                };
                let mut new_permits = p.max_concurrent_creating_streaming_jobs() as usize;
                if new_permits == 0 {
                    new_permits = Semaphore::MAX_PERMITS;
                }
                match permits.cmp(&new_permits) {
                    Ordering::Less => {
                        semaphore_clone.add_permits(new_permits - permits);
                    }
                    Ordering::Equal => continue,
                    Ordering::Greater => {
                        let to_release = permits - new_permits;
                        let reduced = semaphore_clone.forget_permits(to_release);
                        if reduced != to_release {
                            tracing::warn!(
                                "not enough permits to release, expected {}, but reduced {}",
                                to_release,
                                reduced
                            );
                        }
                    }
                }
                tracing::info!(
                    "max_concurrent_creating_streaming_jobs changed from {} to {}",
                    permits,
                    new_permits
                );
                permits = new_permits;
            }
        });

        Self { semaphore }
    }
}

impl DdlController {
    pub async fn new(
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        stream_manager: GlobalStreamManagerRef,
        source_manager: SourceManagerRef,
        barrier_manager: BarrierManagerRef,
        sink_manager: SinkCoordinatorManager,
        iceberg_compaction_manager: IcebergCompactionManagerRef,
    ) -> Self {
        let creating_streaming_job_permits = Arc::new(CreatingStreamingJobPermit::new(&env).await);
        Self {
            env,
            metadata_manager,
            stream_manager,
            source_manager,
            barrier_manager,
            sink_manager,
            iceberg_compaction_manager,
            creating_streaming_job_permits,
            seq: Arc::new(AtomicU64::new(0)),
        }
    }

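    /// Returns the next sequence number for a DDL command, used to build a unique await-tree key.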
    pub fn next_seq(&self) -> u64 {
        self.seq.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
    }

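    /// Runs a DDL command to completion. The command is executed on a spawned task, so it is
    /// not cancelled even if the RPC that issued it is interrupted.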
    pub async fn run_command(&self, command: DdlCommand) -> MetaResult<Option<WaitVersion>> {
        if !command.allow_in_recovery() {
            self.barrier_manager.check_status_running()?;
        }

        let await_tree_key = format!("DDL Command {}", self.next_seq());
        let await_tree_span = await_tree::span!("{command}({})", command.object());

        let ctrl = self.clone();
        let fut = async move {
            match command {
                DdlCommand::CreateDatabase(database) => ctrl.create_database(database).await,
                DdlCommand::DropDatabase(database_id) => ctrl.drop_database(database_id).await,
                DdlCommand::CreateSchema(schema) => ctrl.create_schema(schema).await,
                DdlCommand::DropSchema(schema_id, drop_mode) => {
                    ctrl.drop_schema(schema_id, drop_mode).await
                }
                DdlCommand::CreateNonSharedSource(source) => {
                    ctrl.create_non_shared_source(source).await
                }
                DdlCommand::DropSource(source_id, drop_mode) => {
                    ctrl.drop_source(source_id, drop_mode).await
                }
                DdlCommand::ResetSource(source_id) => ctrl.reset_source(source_id).await,
                DdlCommand::CreateFunction(function) => ctrl.create_function(function).await,
                DdlCommand::DropFunction(function_id, drop_mode) => {
                    ctrl.drop_function(function_id, drop_mode).await
                }
                DdlCommand::CreateView(view, dependencies) => {
                    ctrl.create_view(view, dependencies).await
                }
                DdlCommand::DropView(view_id, drop_mode) => {
                    ctrl.drop_view(view_id, drop_mode).await
                }
                DdlCommand::CreateStreamingJob {
                    stream_job,
                    fragment_graph,
                    dependencies,
                    resource_type,
                    if_not_exists,
                } => {
                    ctrl.create_streaming_job(
                        stream_job,
                        fragment_graph,
                        dependencies,
                        resource_type,
                        if_not_exists,
                    )
                    .await
                }
                DdlCommand::DropStreamingJob { job_id, drop_mode } => {
                    ctrl.drop_streaming_job(job_id, drop_mode).await
                }
                DdlCommand::ReplaceStreamJob(ReplaceStreamJobInfo {
                    streaming_job,
                    fragment_graph,
                }) => ctrl.replace_job(streaming_job, fragment_graph).await,
                DdlCommand::AlterName(relation, name) => ctrl.alter_name(relation, &name).await,
                DdlCommand::AlterObjectOwner(object, owner_id) => {
                    ctrl.alter_owner(object, owner_id).await
                }
                DdlCommand::AlterSetSchema(object, new_schema_id) => {
                    ctrl.alter_set_schema(object, new_schema_id).await
                }
                DdlCommand::CreateConnection(connection) => {
                    ctrl.create_connection(connection).await
                }
                DdlCommand::DropConnection(connection_id, drop_mode) => {
                    ctrl.drop_connection(connection_id, drop_mode).await
                }
                DdlCommand::CreateSecret(secret) => ctrl.create_secret(secret).await,
                DdlCommand::DropSecret(secret_id, drop_mode) => {
                    ctrl.drop_secret(secret_id, drop_mode).await
                }
                DdlCommand::AlterSecret(secret) => ctrl.alter_secret(secret).await,
                DdlCommand::AlterNonSharedSource(source) => {
                    ctrl.alter_non_shared_source(source).await
                }
                DdlCommand::CommentOn(comment) => ctrl.comment_on(comment).await,
                DdlCommand::CreateSubscription(subscription) => {
                    ctrl.create_subscription(subscription).await
                }
                DdlCommand::DropSubscription(subscription_id, drop_mode) => {
                    ctrl.drop_subscription(subscription_id, drop_mode).await
                }
                DdlCommand::AlterSubscriptionRetention {
                    subscription_id,
                    retention_seconds,
                    definition,
                } => {
                    ctrl.alter_subscription_retention(
                        subscription_id,
                        retention_seconds,
                        definition,
                    )
                    .await
                }
                DdlCommand::AlterSwapRename(objects) => ctrl.alter_swap_rename(objects).await,
                DdlCommand::AlterDatabaseParam(database_id, param) => {
                    ctrl.alter_database_param(database_id, param).await
                }
                DdlCommand::AlterStreamingJobConfig(job_id, entries_to_add, keys_to_remove) => {
                    ctrl.alter_streaming_job_config(job_id, entries_to_add, keys_to_remove)
                        .await
                }
            }
        }
        .in_current_span();
        let fut = (self.env.await_tree_reg())
            .register(await_tree_key, await_tree_span)
            .instrument(Box::pin(fut));
        let notification_version = tokio::spawn(fut).await.map_err(|e| anyhow!(e))??;
        Ok(Some(WaitVersion {
            catalog_version: notification_version,
            hummock_version_id: self.barrier_manager.get_hummock_version_id().await,
        }))
    }

    pub async fn get_ddl_progress(&self) -> MetaResult<Vec<DdlProgress>> {
        self.barrier_manager.get_ddl_progress().await
    }

    async fn create_database(&self, database: Database) -> MetaResult<NotificationVersion> {
        let (version, updated_db) = self
            .metadata_manager
            .catalog_controller
            .create_database(database)
            .await?;
        // Sync the new database's barrier settings to the barrier manager.
        self.barrier_manager
            .update_database_barrier(
                updated_db.database_id,
                updated_db.barrier_interval_ms.map(|v| v as u32),
                updated_db.checkpoint_frequency.map(|v| v as u64),
            )
            .await?;
        Ok(version)
    }

    #[tracing::instrument(skip(self), level = "debug")]
    pub async fn reschedule_streaming_job(
        &self,
        job_id: JobId,
        target: ReschedulePolicy,
        mut deferred: bool,
    ) -> MetaResult<()> {
        tracing::info!("altering parallelism for job {}", job_id);
        if self.barrier_manager.check_status_running().is_err() {
            tracing::info!(
                "alter parallelism is set to deferred mode because the system is in recovery state"
            );
            deferred = true;
        }

        self.stream_manager
            .reschedule_streaming_job(job_id, target, deferred)
            .await
    }

    pub async fn reschedule_streaming_job_backfill_parallelism(
        &self,
        job_id: JobId,
        parallelism: Option<StreamingParallelism>,
        mut deferred: bool,
    ) -> MetaResult<()> {
        tracing::info!("altering backfill parallelism for job {}", job_id);
        if self.barrier_manager.check_status_running().is_err() {
            tracing::info!(
                "alter backfill parallelism is set to deferred mode because the system is in recovery state"
            );
            deferred = true;
        }

        self.stream_manager
            .reschedule_streaming_job_backfill_parallelism(job_id, parallelism, deferred)
            .await
    }

    pub async fn reschedule_cdc_table_backfill(
        &self,
        job_id: JobId,
        target: ReschedulePolicy,
    ) -> MetaResult<()> {
        tracing::info!("alter CDC table backfill parallelism");
        if self.barrier_manager.check_status_running().is_err() {
            return Err(anyhow::anyhow!(
                "CDC table backfill reschedule is unavailable because the system is in recovery state"
            )
            .into());
        }
        self.stream_manager
            .reschedule_cdc_table_backfill(job_id, target)
            .await
    }

    pub async fn reschedule_fragments(
        &self,
        fragment_targets: HashMap<FragmentId, Option<StreamingParallelism>>,
    ) -> MetaResult<()> {
        tracing::info!(
            "altering parallelism for fragments {:?}",
            fragment_targets.keys()
        );
        let fragment_targets = fragment_targets
            .into_iter()
            .map(|(fragment_id, parallelism)| (fragment_id as CatalogFragmentId, parallelism))
            .collect();

        self.stream_manager
            .reschedule_fragments(fragment_targets)
            .await
    }

    async fn drop_database(&self, database_id: DatabaseId) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Database, database_id, DropMode::Cascade)
            .await
    }

    async fn create_schema(&self, schema: Schema) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .create_schema(schema)
            .await
    }

    async fn drop_schema(
        &self,
        schema_id: SchemaId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Schema, schema_id, drop_mode)
            .await
    }

    async fn create_non_shared_source(&self, source: Source) -> MetaResult<NotificationVersion> {
        // Create a source worker first, which also validates the connector properties
        // before the source catalog is persisted.
        let handle = create_source_worker(&source, self.source_manager.metrics.clone())
            .await
            .context("failed to create source worker")?;

        let (source_id, version) = self
            .metadata_manager
            .catalog_controller
            .create_source(source)
            .await?;
        self.source_manager
            .register_source_with_handle(source_id, handle)
            .await;
        Ok(version)
    }

    async fn drop_source(
        &self,
        source_id: SourceId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Source, source_id, drop_mode)
            .await
    }

    async fn reset_source(&self, source_id: SourceId) -> MetaResult<NotificationVersion> {
        tracing::info!(source_id = %source_id, "resetting CDC source offset to latest");

        let database_id = self
            .metadata_manager
            .catalog_controller
            .get_object_database_id(source_id)
            .await?;

        // Run the reset through the barrier scheduler so it is applied consistently.
        self.stream_manager
            .barrier_scheduler
            .run_command(database_id, Command::ResetSource { source_id })
            .await?;

        // The catalog itself is unchanged; just bump a version to notify the frontend.
        let version = self
            .metadata_manager
            .catalog_controller
            .notify_frontend_trivial()
            .await;
        Ok(version)
    }

    async fn alter_non_shared_source(&self, source: Source) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .alter_non_shared_source(source)
            .await
    }

    async fn create_function(&self, function: Function) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .create_function(function)
            .await
    }

    async fn drop_function(
        &self,
        function_id: FunctionId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Function, function_id, drop_mode)
            .await
    }

    async fn create_view(
        &self,
        view: View,
        dependencies: HashSet<ObjectId>,
    ) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .create_view(view, dependencies)
            .await
    }

    async fn drop_view(
        &self,
        view_id: ViewId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::View, view_id, drop_mode).await
    }

    async fn create_connection(&self, connection: Connection) -> MetaResult<NotificationVersion> {
        validate_connection(&connection).await?;
        self.metadata_manager
            .catalog_controller
            .create_connection(connection)
            .await
    }

    async fn drop_connection(
        &self,
        connection_id: ConnectionId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Connection, connection_id, drop_mode)
            .await
    }

    async fn alter_database_param(
        &self,
        database_id: DatabaseId,
        param: AlterDatabaseParam,
    ) -> MetaResult<NotificationVersion> {
        let (version, updated_db) = self
            .metadata_manager
            .catalog_controller
            .alter_database_param(database_id, param)
            .await?;
        self.barrier_manager
            .update_database_barrier(
                database_id,
                updated_db.barrier_interval_ms.map(|v| v as u32),
                updated_db.checkpoint_frequency.map(|v| v as u64),
            )
            .await?;
        Ok(version)
    }

    fn get_encrypted_payload(&self, secret: &Secret) -> MetaResult<Vec<u8>> {
        let secret_store_private_key = self
            .env
            .opts
            .secret_store_private_key
            .clone()
            .ok_or_else(|| anyhow!("secret_store_private_key is not configured"))?;

        let encrypted_payload = SecretEncryption::encrypt(
            secret_store_private_key.as_slice(),
            secret.get_value().as_slice(),
        )
        .context(format!("failed to encrypt secret {}", secret.name))?;
        Ok(encrypted_payload
            .serialize()
            .context(format!("failed to serialize secret {}", secret.name))?)
    }

    async fn create_secret(&self, mut secret: Secret) -> MetaResult<NotificationVersion> {
        // The secret is persisted encrypted, while the plain payload is kept for in-memory use.
        let secret_plain_payload = secret.value.clone();
        let encrypted_payload = self.get_encrypted_payload(&secret)?;
        secret.value = encrypted_payload;

        self.metadata_manager
            .catalog_controller
            .create_secret(secret, secret_plain_payload)
            .await
    }

    async fn drop_secret(
        &self,
        secret_id: SecretId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Secret, secret_id, drop_mode)
            .await
    }

    async fn alter_secret(&self, mut secret: Secret) -> MetaResult<NotificationVersion> {
        let secret_plain_payload = secret.value.clone();
        let encrypted_payload = self.get_encrypted_payload(&secret)?;
        secret.value = encrypted_payload;
        self.metadata_manager
            .catalog_controller
            .alter_secret(secret, secret_plain_payload)
            .await
    }

    async fn create_subscription(
        &self,
        mut subscription: Subscription,
    ) -> MetaResult<NotificationVersion> {
        tracing::debug!("create subscription");
        let _permit = self
            .creating_streaming_job_permits
            .semaphore
            .acquire()
            .await
            .unwrap();
        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
        self.metadata_manager
            .catalog_controller
            .create_subscription_catalog(&mut subscription)
            .await?;
        if let Err(err) = self.stream_manager.create_subscription(&subscription).await {
            tracing::debug!(error = %err.as_report(), "failed to create subscription");
            let _ = self
                .metadata_manager
                .catalog_controller
                .try_abort_creating_subscription(subscription.id)
                .await
                .inspect_err(|e| {
                    tracing::error!(
                        error = %e.as_report(),
                        "failed to abort create subscription after failure"
                    );
                });
            return Err(err);
        }

        let version = self
            .metadata_manager
            .catalog_controller
            .notify_create_subscription(subscription.id)
            .await?;
        tracing::debug!("finish create subscription");
        Ok(version)
    }

    async fn drop_subscription(
        &self,
        subscription_id: SubscriptionId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        tracing::debug!("preparing drop subscription");
        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
        let subscription = self
            .metadata_manager
            .catalog_controller
            .get_subscription_by_id(subscription_id)
            .await?;
        let table_id = subscription.dependent_table_id;
        let database_id = subscription.database_id;
        let (_, version) = self
            .metadata_manager
            .catalog_controller
            .drop_object(ObjectType::Subscription, subscription_id, drop_mode)
            .await?;
        self.stream_manager
            .drop_subscription(database_id, subscription_id, table_id)
            .await;
        tracing::debug!("finish drop subscription");
        Ok(version)
    }

    async fn alter_subscription_retention(
        &self,
        subscription_id: SubscriptionId,
        retention_seconds: u64,
        definition: String,
    ) -> MetaResult<NotificationVersion> {
        tracing::debug!("alter subscription retention");
        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
        let (version, subscription) = self
            .metadata_manager
            .catalog_controller
            .alter_subscription_retention(subscription_id, retention_seconds, definition)
            .await?;
        self.stream_manager
            .alter_subscription_retention(
                subscription.database_id,
                subscription.id,
                subscription.dependent_table_id,
                subscription.retention_seconds,
            )
            .await?;
        tracing::debug!("finish alter subscription retention");
        Ok(version)
    }

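    /// Validates a CDC table before creation: locates the unique stream scan fragment, checks
    /// its parallelism, and verifies that the CDC connector is reachable with the resolved
    /// properties.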
    #[await_tree::instrument]
    pub(crate) async fn validate_cdc_table(
        &self,
        table: &Table,
        table_fragments: &StreamJobFragments,
    ) -> MetaResult<()> {
        let stream_scan_fragment = table_fragments
            .fragments
            .values()
            .filter(|f| {
                f.fragment_type_mask.contains(FragmentTypeFlag::StreamScan)
                    || f.fragment_type_mask
                        .contains(FragmentTypeFlag::StreamCdcScan)
            })
            .exactly_one()
            .ok()
            .with_context(|| {
                format!(
                    "expect exactly one stream scan fragment, got: {:?}",
                    table_fragments.fragments
                )
            })?;
        fn assert_parallelism(stream_scan_fragment: &Fragment, node_body: &Option<NodeBody>) {
            if let Some(NodeBody::StreamCdcScan(node)) = node_body {
                if let Some(o) = &node.options
                    && CdcScanOptions::from_proto(o).is_parallelized_backfill()
                {
                    // Parallelized backfill is allowed to run with multiple actors.
                } else {
                    assert_eq!(
                        stream_scan_fragment.actors.len(),
                        1,
                        "Stream scan fragment should have only one actor"
                    );
                }
            }
        }
        let mut found_cdc_scan = false;
        match &stream_scan_fragment.nodes.node_body {
            Some(NodeBody::StreamCdcScan(_)) => {
                assert_parallelism(stream_scan_fragment, &stream_scan_fragment.nodes.node_body);
                if self
                    .validate_cdc_table_inner(&stream_scan_fragment.nodes.node_body, table.id)
                    .await?
                {
                    found_cdc_scan = true;
                }
            }
            // When the CDC scan is wrapped in a Project node, check all of its inputs.
            Some(NodeBody::Project(_)) => {
                for input in &stream_scan_fragment.nodes.input {
                    assert_parallelism(stream_scan_fragment, &input.node_body);
                    if self
                        .validate_cdc_table_inner(&input.node_body, table.id)
                        .await?
                    {
                        found_cdc_scan = true;
                    }
                }
            }
            _ => {
                bail!("Unexpected node body for stream cdc scan");
            }
        };
        if !found_cdc_scan {
            bail!("No stream cdc scan node found in stream scan fragment");
        }
        Ok(())
    }

    async fn validate_cdc_table_inner(
        &self,
        node_body: &Option<NodeBody>,
        table_id: TableId,
    ) -> MetaResult<bool> {
        if let Some(NodeBody::StreamCdcScan(stream_cdc_scan)) = node_body
            && let Some(ref cdc_table_desc) = stream_cdc_scan.cdc_table_desc
        {
            let options_with_secret = WithOptionsSecResolved::new(
                cdc_table_desc.connect_properties.clone(),
                cdc_table_desc.secret_refs.clone(),
            );

            let mut props = ConnectorProperties::extract(options_with_secret, true)?;
            props.init_from_pb_cdc_table_desc(cdc_table_desc);

            // Try to create a split enumerator to validate connectivity upfront.
            let _enumerator = props
                .create_split_enumerator(SourceEnumeratorContext::dummy().into())
                .await?;

            tracing::debug!(?table_id, "validate cdc table success");
            Ok(true)
        } else {
            Ok(false)
        }
    }

    pub async fn validate_table_for_sink(&self, table_id: TableId) -> MetaResult<()> {
        let migrated = self
            .metadata_manager
            .catalog_controller
            .has_table_been_migrated(table_id)
            .await?;
        if !migrated {
            Err(anyhow::anyhow!(
                "Creating a sink into the unmigrated table {} is not allowed. Please migrate it first.",
                table_id
            )
            .into())
        } else {
            Ok(())
        }
    }

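    /// Creates a streaming job end to end: persists the job catalog (tolerating duplicates if
    /// `if_not_exists` is set), acquires a creating-job permit, builds and validates the
    /// fragment graph, and hands the job over to the stream manager. On failure, the creating
    /// job is aborted and any registered source is cleaned up.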
    #[await_tree::instrument(boxed, "create_streaming_job({streaming_job})")]
    pub async fn create_streaming_job(
        &self,
        mut streaming_job: StreamingJob,
        fragment_graph: StreamFragmentGraphProto,
        dependencies: HashSet<ObjectId>,
        resource_type: streaming_job_resource_type::ResourceType,
        if_not_exists: bool,
    ) -> MetaResult<NotificationVersion> {
        if let StreamingJob::Sink(sink) = &streaming_job
            && let Some(target_table) = sink.target_table
        {
            self.validate_table_for_sink(target_table).await?;
        }
        let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());
        let check_ret = self
            .metadata_manager
            .catalog_controller
            .create_job_catalog(
                &mut streaming_job,
                &ctx,
                &fragment_graph.parallelism,
                fragment_graph.max_parallelism as _,
                dependencies,
                resource_type.clone(),
                &fragment_graph.backfill_parallelism,
            )
            .await;
        if let Err(meta_err) = check_ret {
            if !if_not_exists {
                return Err(meta_err);
            }
            // `IF NOT EXISTS`: tolerate the duplicate; for foreground jobs, wait for the
            // existing job to finish before returning.
            return if let MetaErrorInner::Duplicated(_, _, Some(job_id)) = meta_err.inner() {
                if streaming_job.create_type() == CreateType::Foreground {
                    let database_id = streaming_job.database_id();
                    self.metadata_manager
                        .wait_streaming_job_finished(database_id, *job_id)
                        .await
                } else {
                    Ok(IGNORED_NOTIFICATION_VERSION)
                }
            } else {
                Err(meta_err)
            };
        }
        let job_id = streaming_job.id();
        tracing::debug!(
            id = %job_id,
            definition = streaming_job.definition(),
            create_type = streaming_job.create_type().as_str_name(),
            job_type = ?streaming_job.job_type(),
            "starting streaming job",
        );
        let permit = self
            .creating_streaming_job_permits
            .semaphore
            .clone()
            .acquire_owned()
            .instrument_await("acquire_creating_streaming_job_permit")
            .await
            .unwrap();
        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;

        let name = streaming_job.name();
        let definition = streaming_job.definition();
        let source_id = match &streaming_job {
            StreamingJob::Table(Some(src), _, _) | StreamingJob::Source(src) => Some(src.id),
            _ => None,
        };

        match self
            .create_streaming_job_inner(ctx, streaming_job, fragment_graph, resource_type, permit)
            .await
        {
            Ok(version) => Ok(version),
            Err(err) => {
                tracing::error!(id = %job_id, error = %err.as_report(), "failed to create streaming job");
                let event = risingwave_pb::meta::event_log::EventCreateStreamJobFail {
                    id: job_id,
                    name,
                    definition,
                    error: err.as_report().to_string(),
                };
                self.env.event_log_manager_ref().add_event_logs(vec![
                    risingwave_pb::meta::event_log::Event::CreateStreamJobFail(event),
                ]);
                let (aborted, _) = self
                    .metadata_manager
                    .catalog_controller
                    .try_abort_creating_streaming_job(job_id, false)
                    .await?;
                if aborted {
                    tracing::warn!(id = %job_id, "aborted streaming job");
                    // Clean up the source manager's state for the associated source, if any.
                    if let Some(source_id) = source_id {
                        self.source_manager
                            .apply_source_change(SourceChange::DropSource {
                                dropped_source_ids: vec![source_id],
                            })
                            .await;
                    }
                }
                Err(err)
            }
        }
    }

    #[await_tree::instrument(boxed)]
    async fn create_streaming_job_inner(
        &self,
        ctx: StreamContext,
        mut streaming_job: StreamingJob,
        fragment_graph: StreamFragmentGraphProto,
        resource_type: streaming_job_resource_type::ResourceType,
        permit: OwnedSemaphorePermit,
    ) -> MetaResult<NotificationVersion> {
        let mut fragment_graph =
            StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
        streaming_job.set_info_from_graph(&fragment_graph);

        // Create the internal table catalogs and refill their ids into the graph.
        let incomplete_internal_tables = fragment_graph
            .incomplete_internal_tables()
            .into_values()
            .collect_vec();
        let table_id_map = self
            .metadata_manager
            .catalog_controller
            .create_internal_table_catalog(&streaming_job, incomplete_internal_tables)
            .await?;
        fragment_graph.refill_internal_table_ids(table_id_map);

        tracing::debug!(id = %streaming_job.id(), "building streaming job");
        let (ctx, stream_job_fragments) = self
            .build_stream_job(ctx, streaming_job, fragment_graph, resource_type)
            .await?;

        let streaming_job = &ctx.streaming_job;

        match streaming_job {
            StreamingJob::Table(None, table, TableJobType::SharedCdcSource) => {
                self.validate_cdc_table(table, &stream_job_fragments)
                    .await?;
            }
            StreamingJob::Table(Some(source), ..) => {
                // Register the source on the source manager.
                self.source_manager.register_source(source).await?;
                let connector_name = source
                    .get_with_properties()
                    .get(UPSTREAM_SOURCE_KEY)
                    .cloned();
                let attr = source.info.as_ref().map(|source_info| {
                    jsonbb::json!({
                        "format": source_info.format().as_str_name(),
                        "encode": source_info.row_encode().as_str_name(),
                    })
                });
                report_create_object(
                    streaming_job.id(),
                    "source",
                    PbTelemetryDatabaseObject::Source,
                    connector_name,
                    attr,
                );
            }
            StreamingJob::Sink(sink) => {
                if sink.auto_refresh_schema_from_table.is_some() {
                    check_sink_fragments_support_refresh_schema(&stream_job_fragments.fragments)?
                }
                // Validate the sink before it is created.
                validate_sink(sink).await?;
                let connector_name = sink.get_properties().get(UPSTREAM_SOURCE_KEY).cloned();
                let attr = sink.format_desc.as_ref().map(|sink_info| {
                    jsonbb::json!({
                        "format": sink_info.format().as_str_name(),
                        "encode": sink_info.encode().as_str_name(),
                    })
                });
                report_create_object(
                    streaming_job.id(),
                    "sink",
                    PbTelemetryDatabaseObject::Sink,
                    connector_name,
                    attr,
                );
            }
            StreamingJob::Source(source) => {
                self.source_manager.register_source(source).await?;
                let connector_name = source
                    .get_with_properties()
                    .get(UPSTREAM_SOURCE_KEY)
                    .cloned();
                let attr = source.info.as_ref().map(|source_info| {
                    jsonbb::json!({
                        "format": source_info.format().as_str_name(),
                        "encode": source_info.row_encode().as_str_name(),
                    })
                });
                report_create_object(
                    streaming_job.id(),
                    "source",
                    PbTelemetryDatabaseObject::Source,
                    connector_name,
                    attr,
                );
            }
            _ => {}
        }

        let backfill_orders = ctx.fragment_backfill_ordering.to_meta_model();
        self.metadata_manager
            .catalog_controller
            .prepare_stream_job_fragments(
                &stream_job_fragments,
                streaming_job,
                false,
                Some(backfill_orders),
            )
            .await?;

        let version = self
            .stream_manager
            .create_streaming_job(stream_job_fragments, ctx, permit)
            .await?;

        Ok(version)
    }

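    /// Drops an object of any type and releases everything recorded in the returned
    /// [`ReleaseContext`]: streaming jobs, source workers, iceberg table sinks, and secrets.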
    pub async fn drop_object(
        &self,
        object_type: ObjectType,
        object_id: impl Into<ObjectId>,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        let object_id = object_id.into();
        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
        let _source_tick_pause_guard = self.source_manager.pause_tick().await;

        let (release_ctx, version) = self
            .metadata_manager
            .catalog_controller
            .drop_object(object_type, object_id, drop_mode)
            .await?;

        if object_type == ObjectType::Source {
            self.env
                .notification_manager_ref()
                .notify_local_subscribers(LocalNotification::SourceDropped(object_id));
        }

        let ReleaseContext {
            database_id,
            removed_streaming_job_ids,
            removed_state_table_ids,
            removed_source_ids,
            removed_secret_ids: secret_ids,
            removed_source_fragments,
            removed_actors,
            removed_fragments,
            removed_sink_fragment_by_targets,
            removed_iceberg_table_sinks,
        } = release_ctx;

        self.stream_manager
            .drop_streaming_jobs(
                database_id,
                removed_actors.iter().map(|id| *id as _).collect(),
                removed_streaming_job_ids,
                removed_state_table_ids,
                removed_fragments.iter().map(|id| *id as _).collect(),
                removed_sink_fragment_by_targets
                    .into_iter()
                    .map(|(target, sinks)| {
                        (target as _, sinks.into_iter().map(|id| id as _).collect())
                    })
                    .collect(),
            )
            .await;

        // Unregister the dropped sources from the source manager.
        self.source_manager
            .apply_source_change(SourceChange::DropSource {
                dropped_source_ids: removed_source_ids.into_iter().map(|id| id as _).collect(),
            })
            .await;

        // Unregister the source fragments that belonged to the dropped streaming jobs.
        let dropped_source_fragments = removed_source_fragments;
        self.source_manager
            .apply_source_change(SourceChange::DropMv {
                dropped_source_fragments,
            })
            .await;

        // For dropped sinks into iceberg tables, also drop the backing iceberg table.
        let iceberg_sink_ids: Vec<SinkId> = removed_iceberg_table_sinks
            .iter()
            .map(|sink| sink.id)
            .collect();

        for sink in removed_iceberg_table_sinks {
            let sink_param = SinkParam::try_from_sink_catalog(sink.into())
                .expect("Iceberg sink should be valid");
            let iceberg_sink =
                IcebergSink::try_from(sink_param).expect("Iceberg sink should be valid");
            if let Ok(iceberg_catalog) = iceberg_sink.config.create_catalog().await {
                let table_identifier = iceberg_sink.config.full_table_name().unwrap();
                tracing::info!(
                    "dropping iceberg table {} for dropped sink",
                    table_identifier
                );

                let _ = iceberg_catalog
                    .drop_table(&table_identifier)
                    .await
                    .inspect_err(|err| {
                        tracing::error!(
                            "failed to drop iceberg table {} during cleanup: {}",
                            table_identifier,
                            err.as_report()
                        );
                    });
            }
        }

        if !iceberg_sink_ids.is_empty() {
            // Stop sink coordinators and clear pending iceberg commits for the dropped sinks.
            self.sink_manager
                .stop_sink_coordinator(iceberg_sink_ids.clone())
                .await;

            for sink_id in iceberg_sink_ids {
                self.iceberg_compaction_manager
                    .clear_iceberg_commits_by_sink_id(sink_id);
            }
        }

        for secret in secret_ids {
            LocalSecretManager::global().remove_secret(secret);
        }
        Ok(version)
    }

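    /// Replaces the fragment graph of an existing streaming job, e.g. for schema changes on
    /// tables and sources. The new graph is built under a temporary job id and swapped in on
    /// success; on failure the temporary job is aborted.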
    #[await_tree::instrument(boxed, "replace_streaming_job({streaming_job})")]
    pub async fn replace_job(
        &self,
        mut streaming_job: StreamingJob,
        fragment_graph: StreamFragmentGraphProto,
    ) -> MetaResult<NotificationVersion> {
        match &streaming_job {
            StreamingJob::Table(..)
            | StreamingJob::Source(..)
            | StreamingJob::MaterializedView(..) => {}
            StreamingJob::Sink(..) | StreamingJob::Index(..) => {
                bail_not_implemented!("schema change for {}", streaming_job.job_type_str())
            }
        }

        let job_id = streaming_job.id();

        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
        let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());

        // The max parallelism must stay unchanged when replacing the job.
        let original_max_parallelism = self
            .metadata_manager
            .get_job_max_parallelism(streaming_job.id())
            .await?;
        let fragment_graph = PbStreamFragmentGraph {
            max_parallelism: original_max_parallelism as _,
            ..fragment_graph
        };

        let fragment_graph = StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
        streaming_job.set_info_from_graph(&fragment_graph);

        // Make the streaming job immutable from now on.
        let streaming_job = streaming_job;

        let auto_refresh_schema_sinks = if let StreamingJob::Table(_, table, _) = &streaming_job {
            let auto_refresh_schema_sinks = self
                .metadata_manager
                .catalog_controller
                .get_sink_auto_refresh_schema_from(table.id)
                .await?;
            if !auto_refresh_schema_sinks.is_empty() {
                let original_table_columns = self
                    .metadata_manager
                    .catalog_controller
                    .get_table_columns(table.id)
                    .await?;
                let mut original_table_column_ids: HashSet<_> = original_table_columns
                    .iter()
                    .map(|col| col.column_id())
                    .collect();
                let newly_added_columns = table
                    .columns
                    .iter()
                    .filter(|col| {
                        !original_table_column_ids.remove(&ColumnId::new(
                            col.column_desc.as_ref().unwrap().column_id as _,
                        ))
                    })
                    .map(|col| ColumnCatalog::from(col.clone()))
                    .collect_vec();
                if !original_table_column_ids.is_empty() {
                    return Err(anyhow!(
                        "new table columns do not contain all original columns. new: {:?}, original: {:?}, not included: {:?}",
                        table.columns,
                        original_table_columns,
                        original_table_column_ids
                    )
                    .into());
                }
                let mut sinks = Vec::with_capacity(auto_refresh_schema_sinks.len());
                for sink in auto_refresh_schema_sinks {
                    let sink_job_fragments = self
                        .metadata_manager
                        .get_job_fragments_by_id(sink.id.as_job_id())
                        .await?;
                    if sink_job_fragments.fragments.len() != 1 {
                        return Err(anyhow!(
                            "auto schema refresh sink must have only one fragment, but got {}",
                            sink_job_fragments.fragments.len()
                        )
                        .into());
                    }
                    let original_sink_fragment =
                        sink_job_fragments.fragments.into_values().next().unwrap();
                    let (new_sink_fragment, new_schema, new_log_store_table) =
                        rewrite_refresh_schema_sink_fragment(
                            &original_sink_fragment,
                            &sink,
                            &newly_added_columns,
                            table,
                            fragment_graph.table_fragment_id(),
                            self.env.id_gen_manager(),
                            self.env.actor_id_generator(),
                        )?;

                    assert_eq!(
                        original_sink_fragment.actors.len(),
                        new_sink_fragment.actors.len()
                    );
                    // Schedule each new sink actor on the same worker as the original actor.
                    let actor_status = (0..original_sink_fragment.actors.len())
                        .map(|i| {
                            let worker_node_id = sink_job_fragments.actor_status
                                [&original_sink_fragment.actors[i].actor_id]
                                .location
                                .as_ref()
                                .unwrap()
                                .worker_node_id;
                            (
                                new_sink_fragment.actors[i].actor_id,
                                PbActorStatus {
                                    location: Some(PbActorLocation { worker_node_id }),
                                },
                            )
                        })
                        .collect();

                    let streaming_job = StreamingJob::Sink(sink);

                    let tmp_sink_id = self
                        .metadata_manager
                        .catalog_controller
                        .create_job_catalog_for_replace(&streaming_job, None, None, None)
                        .await?
                        .as_sink_id();
                    let StreamingJob::Sink(sink) = streaming_job else {
                        unreachable!()
                    };

                    sinks.push(AutoRefreshSchemaSinkContext {
                        tmp_sink_id,
                        original_sink: sink,
                        original_fragment: original_sink_fragment,
                        new_schema,
                        newly_add_fields: newly_added_columns
                            .iter()
                            .map(|col| Field::from(&col.column_desc))
                            .collect(),
                        new_fragment: new_sink_fragment,
                        new_log_store_table,
                        actor_status,
                    });
                }
                Some(sinks)
            } else {
                None
            }
        } else {
            None
        };

        let tmp_id = self
            .metadata_manager
            .catalog_controller
            .create_job_catalog_for_replace(
                &streaming_job,
                Some(&ctx),
                fragment_graph.specified_parallelism().as_ref(),
                Some(fragment_graph.max_parallelism()),
            )
            .await?;

        let tmp_sink_ids = auto_refresh_schema_sinks.as_ref().map(|sinks| {
            sinks
                .iter()
                .map(|sink| sink.tmp_sink_id.as_object_id())
                .collect_vec()
        });

        tracing::debug!(id = %job_id, "building replace streaming job");
        let mut updated_sink_catalogs = vec![];

        let mut drop_table_connector_ctx = None;
        let result: MetaResult<_> = try {
            let (mut ctx, mut stream_job_fragments) = self
                .build_replace_job(
                    ctx,
                    &streaming_job,
                    fragment_graph,
                    tmp_id,
                    auto_refresh_schema_sinks,
                )
                .await?;
            drop_table_connector_ctx = ctx.drop_table_connector_ctx.clone();
            let auto_refresh_schema_sink_finish_ctx =
                ctx.auto_refresh_schema_sinks.as_ref().map(|sinks| {
                    sinks
                        .iter()
                        .map(|sink| FinishAutoRefreshSchemaSinkContext {
                            tmp_sink_id: sink.tmp_sink_id,
                            original_sink_id: sink.original_sink.id,
                            columns: sink.new_schema.clone(),
                            new_log_store_table: sink
                                .new_log_store_table
                                .as_ref()
                                .map(|table| (table.id, table.columns.clone())),
                        })
                        .collect()
                });

            if let StreamingJob::Table(_, table, ..) = &streaming_job {
                let union_fragment = stream_job_fragments.inner.union_fragment_for_table();
                let upstream_infos = self
                    .metadata_manager
                    .catalog_controller
                    .get_all_upstream_sink_infos(table, union_fragment.fragment_id as _)
                    .await?;
                refill_upstream_sink_union_in_table(&mut union_fragment.nodes, &upstream_infos);

                for upstream_info in &upstream_infos {
                    let upstream_fragment_id = upstream_info.sink_fragment_id;
                    ctx.upstream_fragment_downstreams
                        .entry(upstream_fragment_id)
                        .or_default()
                        .push(upstream_info.new_sink_downstream.clone());
                    if upstream_info.sink_original_target_columns.is_empty() {
                        updated_sink_catalogs.push(upstream_info.sink_id);
                    }
                }
            }

            let replace_upstream = ctx.replace_upstream.clone();

            if let Some(sinks) = &ctx.auto_refresh_schema_sinks {
                let empty_downstreams = FragmentDownstreamRelation::default();
                for sink in sinks {
                    self.metadata_manager
                        .catalog_controller
                        .prepare_streaming_job(
                            sink.tmp_sink_id.as_job_id(),
                            || [&sink.new_fragment].into_iter(),
                            &empty_downstreams,
                            true,
                            None,
                            None,
                        )
                        .await?;
                }
            }

            self.metadata_manager
                .catalog_controller
                .prepare_stream_job_fragments(&stream_job_fragments, &streaming_job, true, None)
                .await?;

            self.stream_manager
                .replace_stream_job(stream_job_fragments, ctx)
                .await?;
            (replace_upstream, auto_refresh_schema_sink_finish_ctx)
        };

        match result {
            Ok((replace_upstream, auto_refresh_schema_sink_finish_ctx)) => {
                let version = self
                    .metadata_manager
                    .catalog_controller
                    .finish_replace_streaming_job(
                        tmp_id,
                        streaming_job,
                        replace_upstream,
                        SinkIntoTableContext {
                            updated_sink_catalogs,
                        },
                        drop_table_connector_ctx.as_ref(),
                        auto_refresh_schema_sink_finish_ctx,
                    )
                    .await?;
                if let Some(drop_table_connector_ctx) = &drop_table_connector_ctx {
                    self.source_manager
                        .apply_source_change(SourceChange::DropSource {
                            dropped_source_ids: vec![drop_table_connector_ctx.to_remove_source_id],
                        })
                        .await;
                }
                Ok(version)
            }
            Err(err) => {
                tracing::error!(id = %job_id, error = ?err.as_report(), "failed to replace job");
                let _ = self
                    .metadata_manager
                    .catalog_controller
                    .try_abort_replacing_streaming_job(tmp_id, tmp_sink_ids)
                    .await
                    .inspect_err(|err| {
                        tracing::error!(id = %job_id, error = ?err.as_report(), "failed to abort replacing job");
                    });
                Err(err)
            }
        }
    }

    #[await_tree::instrument(boxed, "drop_streaming_job{}({job_id})", if let DropMode::Cascade = drop_mode { "_cascade" } else { "" })]
    async fn drop_streaming_job(
        &self,
        job_id: StreamingJobId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        let (object_id, object_type) = match job_id {
            StreamingJobId::MaterializedView(id) => (id.as_object_id(), ObjectType::Table),
            StreamingJobId::Sink(id) => (id.as_object_id(), ObjectType::Sink),
            StreamingJobId::Table(_, id) => (id.as_object_id(), ObjectType::Table),
            StreamingJobId::Index(idx) => (idx.as_object_id(), ObjectType::Index),
        };

        let job_status = self
            .metadata_manager
            .catalog_controller
            .get_streaming_job_status(job_id.id())
            .await?;
        let version = match job_status {
            JobStatus::Initial => {
                unreachable!(
                    "Job with Initial status should not notify frontend and therefore should not arrive here"
                );
            }
            JobStatus::Creating => {
                self.stream_manager
                    .cancel_streaming_jobs(vec![job_id.id()])
                    .await?;
                IGNORED_NOTIFICATION_VERSION
            }
            JobStatus::Created => self.drop_object(object_type, object_id, drop_mode).await?,
        };

        Ok(version)
    }

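    /// Resolves the parallelism to schedule a streaming job with, from the user-specified
    /// value, the configured default, the slots available in the resource group, and the
    /// job's max parallelism.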
    fn resolve_stream_parallelism(
        &self,
        specified: Option<NonZeroUsize>,
        max: NonZeroUsize,
        cluster_info: &StreamingClusterInfo,
        resource_group: String,
    ) -> MetaResult<NonZeroUsize> {
        let available = NonZeroUsize::new(cluster_info.parallelism(&resource_group));
        DdlController::resolve_stream_parallelism_inner(
            specified,
            max,
            available,
            &self.env.opts.default_parallelism,
            &resource_group,
        )
    }

    fn resolve_stream_parallelism_inner(
        specified: Option<NonZeroUsize>,
        max: NonZeroUsize,
        available: Option<NonZeroUsize>,
        default_parallelism: &DefaultParallelism,
        resource_group: &str,
    ) -> MetaResult<NonZeroUsize> {
        let Some(available) = available else {
            bail_unavailable!(
                "no available slots to schedule in resource group \"{}\", \
                 have you allocated any compute nodes within this resource group?",
                resource_group
            );
        };

        if let Some(specified) = specified {
            if specified > max {
                bail_invalid_parameter!(
                    "specified parallelism {} should not exceed max parallelism {}",
                    specified,
                    max,
                );
            }
            if specified > available {
                tracing::warn!(
                    resource_group,
                    specified_parallelism = specified.get(),
                    available_parallelism = available.get(),
                    "specified parallelism exceeds available slots, scheduling with specified value",
                );
            }
            return Ok(specified);
        }

        let default_parallelism = match default_parallelism {
            DefaultParallelism::Full => available,
            DefaultParallelism::Default(num) => {
                if *num > available {
                    tracing::warn!(
                        resource_group,
                        configured_parallelism = num.get(),
                        available_parallelism = available.get(),
                        "default parallelism exceeds available slots, scheduling with configured value",
                    );
                }
                *num
            }
        };

        if default_parallelism > max {
            tracing::warn!(
                max_parallelism = max.get(),
                resource_group,
                "default parallelism exceeds max parallelism, capping to max",
            );
        }
        Ok(default_parallelism.min(max))
    }

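    /// Builds a streaming job: resolves its upstream fragments, determines the resource group
    /// and parallelism, and generates the actor graph, returning the context and fragments
    /// needed to create the job.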
1771 #[await_tree::instrument]
1777 pub(crate) async fn build_stream_job(
1778 &self,
1779 stream_ctx: StreamContext,
1780 mut stream_job: StreamingJob,
1781 fragment_graph: StreamFragmentGraph,
1782 resource_type: streaming_job_resource_type::ResourceType,
1783 ) -> MetaResult<(CreateStreamingJobContext, StreamJobFragmentsToCreate)> {
1784 let id = stream_job.id();
1785 let specified_parallelism = fragment_graph.specified_parallelism();
1786 let specified_backfill_parallelism = fragment_graph.specified_backfill_parallelism();
1787 let max_parallelism = NonZeroUsize::new(fragment_graph.max_parallelism()).unwrap();
1788
1789 let fragment_backfill_ordering = fragment_graph.create_fragment_backfill_ordering();
1791
1792 let (snapshot_backfill_info, cross_db_snapshot_backfill_info) =
1796 fragment_graph.collect_snapshot_backfill_info()?;
1797 assert!(
1798 snapshot_backfill_info
1799 .iter()
1800 .chain([&cross_db_snapshot_backfill_info])
1801 .flat_map(|info| info.upstream_mv_table_id_to_backfill_epoch.values())
1802 .all(|backfill_epoch| backfill_epoch.is_none()),
1803 "should not set backfill epoch when initially build the job: {:?} {:?}",
1804 snapshot_backfill_info,
1805 cross_db_snapshot_backfill_info
1806 );
1807
1808 let locality_fragment_state_table_mapping =
1809 fragment_graph.find_locality_provider_fragment_state_table_mapping();
1810
1811 self.metadata_manager
1813 .catalog_controller
1814 .validate_cross_db_snapshot_backfill(&cross_db_snapshot_backfill_info)
1815 .await?;
1816
1817 let upstream_table_ids = fragment_graph
1818 .dependent_table_ids()
1819 .iter()
1820 .filter(|id| {
1821 !cross_db_snapshot_backfill_info
1822 .upstream_mv_table_id_to_backfill_epoch
1823 .contains_key(*id)
1824 })
1825 .cloned()
1826 .collect();
1827
1828 let (upstream_root_fragments, existing_actor_location) = self
1829 .metadata_manager
1830 .get_upstream_root_fragments(&upstream_table_ids)
1831 .await?;
1832
        if snapshot_backfill_info.is_some() {
            match stream_job {
                StreamingJob::MaterializedView(_)
                | StreamingJob::Sink(_)
                | StreamingJob::Index(_, _) => {}
                StreamingJob::Table(_, _, _) | StreamingJob::Source(_) => {
                    return Err(
                        anyhow!("snapshot_backfill not enabled for table and source").into(),
                    );
                }
            }
        }

        let upstream_actors = upstream_root_fragments
            .values()
            .map(|(fragment, _)| {
                (
                    fragment.fragment_id,
                    fragment.actors.keys().copied().collect(),
                )
            })
            .collect();

        let complete_graph = CompleteStreamFragmentGraph::with_upstreams(
            fragment_graph,
            FragmentGraphUpstreamContext {
                upstream_root_fragments,
                upstream_actor_location: existing_actor_location,
            },
            (&stream_job).into(),
        )?;
        let resource_group = if let Some(group) = resource_type.resource_group() {
            group
        } else {
            self.metadata_manager
                .get_database_resource_group(stream_job.database_id())
                .await?
        };
        let is_serverless_backfill = matches!(
            &resource_type,
            streaming_job_resource_type::ResourceType::ServerlessBackfillResourceGroup(_)
        );

        let cluster_info = self.metadata_manager.get_streaming_cluster_info().await?;

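        // A parallelism specified for backfill takes precedence over the job-level one.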
        let initial_parallelism = specified_backfill_parallelism.or(specified_parallelism);
        let parallelism = self.resolve_stream_parallelism(
            initial_parallelism,
            max_parallelism,
            &cluster_info,
            resource_group.clone(),
        )?;

        let parallelism = if initial_parallelism.is_some() {
            parallelism.get()
        } else {
            let adaptive_strategy = match stream_ctx.adaptive_parallelism_strategy {
                Some(strategy) => strategy,
                None => self
                    .env
                    .system_params_reader()
                    .await
                    .adaptive_parallelism_strategy(),
            };
            adaptive_strategy.compute_target_parallelism(parallelism.get())
        };

        let parallelism = NonZeroUsize::new(parallelism).expect("parallelism must be positive");
        let actor_graph_builder = ActorGraphBuilder::new(
            id,
            resource_group,
            complete_graph,
            cluster_info,
            parallelism,
        )?;

        let ActorGraphBuildResult {
            graph,
            downstream_fragment_relations,
            building_locations,
            upstream_fragment_downstreams,
            new_no_shuffle,
            replace_upstream,
            ..
        } = actor_graph_builder.generate_graph(&self.env, &stream_job, stream_ctx.clone())?;
        assert!(replace_upstream.is_empty());

        let table_parallelism = match (specified_parallelism, &self.env.opts.default_parallelism) {
            (None, DefaultParallelism::Full) => TableParallelism::Adaptive,
            _ => TableParallelism::Fixed(parallelism.get()),
        };

        let stream_job_fragments = StreamJobFragments::new(
            id,
            graph,
            &building_locations.actor_locations,
            stream_ctx.clone(),
            table_parallelism,
            max_parallelism.get(),
        );

        if let Some(mview_fragment) = stream_job_fragments.mview_fragment() {
            stream_job.set_table_vnode_count(mview_fragment.vnode_count());
        }

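        // For a sink into an existing table, prepare the info needed to union the sink's
        // output into the target table's fragment.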
        let new_upstream_sink = if let StreamingJob::Sink(sink) = &stream_job
            && let Ok(table_id) = sink.get_target_table()
        {
            let tables = self
                .metadata_manager
                .get_table_catalog_by_ids(&[*table_id])
                .await?;
            let target_table = tables
                .first()
                .ok_or_else(|| MetaError::catalog_id_not_found("table", *table_id))?;
            let sink_fragment = stream_job_fragments
                .sink_fragment()
                .ok_or_else(|| anyhow::anyhow!("sink fragment not found for sink {}", sink.id))?;
            let mview_fragment_id = self
                .metadata_manager
                .catalog_controller
                .get_mview_fragment_by_id(table_id.as_job_id())
                .await?;
            let upstream_sink_info = build_upstream_sink_info(
                sink.id,
                sink.original_target_columns.clone(),
                sink_fragment.fragment_id as _,
                target_table,
                mview_fragment_id,
            )?;
            Some(upstream_sink_info)
        } else {
            None
        };

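        // For tables backed by a shared CDC source, pre-compute the snapshot splits used
        // by the parallelized CDC backfill.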
        let mut cdc_table_snapshot_splits = None;
        if let StreamingJob::Table(None, table, TableJobType::SharedCdcSource) = &stream_job
            && let Some((_, stream_cdc_scan)) =
                parallel_cdc_table_backfill_fragment(stream_job_fragments.fragments.values())
        {
            let splits = try_init_parallel_cdc_table_snapshot_splits(
                table.id,
                stream_cdc_scan.cdc_table_desc.as_ref().unwrap(),
                self.env.meta_store_ref(),
                stream_cdc_scan.options.as_ref().unwrap(),
                self.env.opts.cdc_table_split_init_insert_batch_size,
                self.env.opts.cdc_table_split_init_sleep_interval_splits,
                self.env.opts.cdc_table_split_init_sleep_duration_millis,
            )
            .await?;
            cdc_table_snapshot_splits = Some(splits);
        }

        let ctx = CreateStreamingJobContext {
            upstream_fragment_downstreams,
            new_no_shuffle,
            upstream_actors,
            building_locations,
            definition: stream_job.definition(),
            create_type: stream_job.create_type(),
            job_type: (&stream_job).into(),
            streaming_job: stream_job,
            new_upstream_sink,
            option: CreateStreamingJobOption {},
            snapshot_backfill_info,
            cross_db_snapshot_backfill_info,
            fragment_backfill_ordering,
            locality_fragment_state_table_mapping,
            cdc_table_snapshot_splits,
            is_serverless_backfill,
        };

        Ok((
            ctx,
            StreamJobFragmentsToCreate {
                inner: stream_job_fragments,
                downstreams: downstream_fragment_relations,
            },
        ))
    }

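    /// Builds a new fragment graph that replaces an existing streaming job (e.g. for a
    /// schema change), matching the old job's internal state tables and rewiring its
    /// upstream and downstream fragments.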
    pub(crate) async fn build_replace_job(
        &self,
        stream_ctx: StreamContext,
        stream_job: &StreamingJob,
        mut fragment_graph: StreamFragmentGraph,
        tmp_job_id: JobId,
        auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,
    ) -> MetaResult<(ReplaceStreamJobContext, StreamJobFragmentsToCreate)> {
        match &stream_job {
            StreamingJob::Table(..)
            | StreamingJob::Source(..)
            | StreamingJob::MaterializedView(..) => {}
            StreamingJob::Sink(..) | StreamingJob::Index(..) => {
                bail_not_implemented!("schema change for {}", stream_job.job_type_str())
            }
        }

        let id = stream_job.id();

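        // If the new table definition no longer has an associated source, the old
        // connector (and its state table) must be dropped alongside the replacement.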
        let mut drop_table_associated_source_id = None;
        if let StreamingJob::Table(None, _, _) = &stream_job {
            drop_table_associated_source_id = self
                .metadata_manager
                .get_table_associated_source_id(id.as_mv_table_id())
                .await?;
        }

        let old_fragments = self.metadata_manager.get_job_fragments_by_id(id).await?;
        let old_internal_table_ids = old_fragments.internal_table_ids();

        let mut drop_table_connector_ctx = None;
        if let Some(to_remove_source_id) = drop_table_associated_source_id {
            debug_assert!(old_internal_table_ids.len() == 1);

            drop_table_connector_ctx = Some(DropTableConnectorContext {
                to_change_streaming_job_id: id,
                to_remove_state_table_id: old_internal_table_ids[0],
                to_remove_source_id,
            });
        } else if stream_job.is_materialized_view() {
            let old_fragments_upstreams = self
                .metadata_manager
                .catalog_controller
                .upstream_fragments(old_fragments.fragment_ids())
                .await?;

            let old_state_graph =
                state_match::Graph::from_existing(&old_fragments, &old_fragments_upstreams);
            let new_state_graph = state_match::Graph::from_building(&fragment_graph);
            let result = state_match::match_graph(&new_state_graph, &old_state_graph)
                .context("incompatible altering on the streaming job states")?;

            fragment_graph.fit_internal_table_ids_with_mapping(result.table_matches);
            fragment_graph.fit_snapshot_backfill_epochs(result.snapshot_backfill_epochs);
        } else {
            let old_internal_tables = self
                .metadata_manager
                .get_table_catalog_by_ids(&old_internal_table_ids)
                .await?;
            fragment_graph.fit_internal_tables_trivial(old_internal_tables)?;
        }

        let original_root_fragment = old_fragments
            .root_fragment()
            .expect("root fragment not found");

        let job_type = StreamingJobType::from(stream_job);

        let (mut downstream_fragments, mut downstream_actor_location) =
            self.metadata_manager.get_downstream_fragments(id).await?;

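        // Replace the fragments of sinks whose schema is auto-refreshed together with
        // this job, and patch the actor locations accordingly.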
        if let Some(auto_refresh_schema_sinks) = &auto_refresh_schema_sinks {
            let mut remaining_fragment: HashSet<_> = auto_refresh_schema_sinks
                .iter()
                .map(|sink| sink.original_fragment.fragment_id)
                .collect();
            for (_, downstream_fragment, nodes) in &mut downstream_fragments {
                if let Some(sink) = auto_refresh_schema_sinks.iter().find(|sink| {
                    sink.original_fragment.fragment_id == downstream_fragment.fragment_id
                }) {
                    assert!(remaining_fragment.remove(&downstream_fragment.fragment_id));
                    for actor_id in downstream_fragment.actors.keys() {
                        downstream_actor_location.remove(actor_id);
                    }
                    for (actor_id, status) in &sink.actor_status {
                        downstream_actor_location
                            .insert(*actor_id, status.location.as_ref().unwrap().worker_node_id);
                    }

                    *downstream_fragment = (&sink.new_fragment_info(), stream_job.id()).into();
                    *nodes = sink.new_fragment.nodes.clone();
                }
            }
            assert!(remaining_fragment.is_empty());
        }

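        // Tables and sources only need their downstreams rewired; CDC tables and
        // materialized views additionally depend on upstream root fragments.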
        let complete_graph = match &job_type {
            StreamingJobType::Table(TableJobType::General) | StreamingJobType::Source => {
                CompleteStreamFragmentGraph::with_downstreams(
                    fragment_graph,
                    FragmentGraphDownstreamContext {
                        original_root_fragment_id: original_root_fragment.fragment_id,
                        downstream_fragments,
                        downstream_actor_location,
                    },
                    job_type,
                )?
            }
            StreamingJobType::Table(TableJobType::SharedCdcSource)
            | StreamingJobType::MaterializedView => {
                let (upstream_root_fragments, upstream_actor_location) = self
                    .metadata_manager
                    .get_upstream_root_fragments(fragment_graph.dependent_table_ids())
                    .await?;

                CompleteStreamFragmentGraph::with_upstreams_and_downstreams(
                    fragment_graph,
                    FragmentGraphUpstreamContext {
                        upstream_root_fragments,
                        upstream_actor_location,
                    },
                    FragmentGraphDownstreamContext {
                        original_root_fragment_id: original_root_fragment.fragment_id,
                        downstream_fragments,
                        downstream_actor_location,
                    },
                    job_type,
                )?
            }
            _ => unreachable!(),
        };

        let resource_group = self
            .metadata_manager
            .get_existing_job_resource_group(id)
            .await?;

        let cluster_info = self.metadata_manager.get_streaming_cluster_info().await?;

        let parallelism = NonZeroUsize::new(original_root_fragment.actors.len())
            .expect("the number of actors in the original root fragment should be greater than 0");

        let actor_graph_builder = ActorGraphBuilder::new(
            id,
            resource_group,
            complete_graph,
            cluster_info,
            parallelism,
        )?;

        let ActorGraphBuildResult {
            graph,
            downstream_fragment_relations,
            building_locations,
            upstream_fragment_downstreams,
            mut replace_upstream,
            new_no_shuffle,
            ..
        } = actor_graph_builder.generate_graph(&self.env, stream_job, stream_ctx.clone())?;

        if matches!(
            job_type,
            StreamingJobType::Source | StreamingJobType::Table(TableJobType::General)
        ) {
            assert!(upstream_fragment_downstreams.is_empty());
        }

        let stream_job_fragments = StreamJobFragments::new(
            tmp_job_id,
            graph,
            &building_locations.actor_locations,
            stream_ctx,
            old_fragments.assigned_parallelism,
            old_fragments.max_parallelism,
        );

        if let Some(sinks) = &auto_refresh_schema_sinks {
            for sink in sinks {
                replace_upstream
                    .remove(&sink.new_fragment.fragment_id)
                    .expect("should exist");
            }
        }

        let ctx = ReplaceStreamJobContext {
            old_fragments,
            replace_upstream,
            new_no_shuffle,
            upstream_fragment_downstreams,
            building_locations,
            streaming_job: stream_job.clone(),
            tmp_id: tmp_job_id,
            drop_table_connector_ctx,
            auto_refresh_schema_sinks,
        };

        Ok((
            ctx,
            StreamJobFragmentsToCreate {
                inner: stream_job_fragments,
                downstreams: downstream_fragment_relations,
            },
        ))
    }

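    /// Renames a catalog object through the catalog controller.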
    async fn alter_name(
        &self,
        relation: alter_name_request::Object,
        new_name: &str,
    ) -> MetaResult<NotificationVersion> {
        let (obj_type, id): (ObjectType, ObjectId) = match relation {
            alter_name_request::Object::TableId(id) => (ObjectType::Table, id.into()),
            alter_name_request::Object::ViewId(id) => (ObjectType::View, id.into()),
            alter_name_request::Object::IndexId(id) => (ObjectType::Index, id.into()),
            alter_name_request::Object::SinkId(id) => (ObjectType::Sink, id.into()),
            alter_name_request::Object::SourceId(id) => (ObjectType::Source, id.into()),
            alter_name_request::Object::SchemaId(id) => (ObjectType::Schema, id.into()),
            alter_name_request::Object::DatabaseId(id) => (ObjectType::Database, id.into()),
            alter_name_request::Object::SubscriptionId(id) => (ObjectType::Subscription, id.into()),
        };
        self.metadata_manager
            .catalog_controller
            .alter_name(obj_type, id, new_name)
            .await
    }

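    /// Swaps the names of two catalog objects of the same type (schema swap is not
    /// yet implemented).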
    async fn alter_swap_rename(
        &self,
        object: alter_swap_rename_request::Object,
    ) -> MetaResult<NotificationVersion> {
        let (obj_type, src_id, dst_id) = match object {
            alter_swap_rename_request::Object::Schema(_) => unimplemented!("schema swap"),
            alter_swap_rename_request::Object::Table(objs) => {
                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
                (ObjectType::Table, src_id, dst_id)
            }
            alter_swap_rename_request::Object::View(objs) => {
                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
                (ObjectType::View, src_id, dst_id)
            }
            alter_swap_rename_request::Object::Source(objs) => {
                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
                (ObjectType::Source, src_id, dst_id)
            }
            alter_swap_rename_request::Object::Sink(objs) => {
                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
                (ObjectType::Sink, src_id, dst_id)
            }
            alter_swap_rename_request::Object::Subscription(objs) => {
                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
                (ObjectType::Subscription, src_id, dst_id)
            }
        };

        self.metadata_manager
            .catalog_controller
            .alter_swap_rename(obj_type, src_id, dst_id)
            .await
    }

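    /// Transfers ownership of a catalog object to the given user.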
    async fn alter_owner(
        &self,
        object: Object,
        owner_id: UserId,
    ) -> MetaResult<NotificationVersion> {
        let (obj_type, id): (ObjectType, ObjectId) = match object {
            Object::TableId(id) => (ObjectType::Table, id.into()),
            Object::ViewId(id) => (ObjectType::View, id.into()),
            Object::SourceId(id) => (ObjectType::Source, id.into()),
            Object::SinkId(id) => (ObjectType::Sink, id.into()),
            Object::SchemaId(id) => (ObjectType::Schema, id.into()),
            Object::DatabaseId(id) => (ObjectType::Database, id.into()),
            Object::SubscriptionId(id) => (ObjectType::Subscription, id.into()),
            Object::ConnectionId(id) => (ObjectType::Connection, id.into()),
            Object::FunctionId(id) => (ObjectType::Function, id.into()),
            Object::SecretId(id) => (ObjectType::Secret, id.into()),
        };
        self.metadata_manager
            .catalog_controller
            .alter_owner(obj_type, id, owner_id as _)
            .await
    }

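    /// Moves a catalog object into another schema.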
    async fn alter_set_schema(
        &self,
        object: alter_set_schema_request::Object,
        new_schema_id: SchemaId,
    ) -> MetaResult<NotificationVersion> {
        let (obj_type, id): (ObjectType, ObjectId) = match object {
            alter_set_schema_request::Object::TableId(id) => (ObjectType::Table, id.into()),
            alter_set_schema_request::Object::ViewId(id) => (ObjectType::View, id.into()),
            alter_set_schema_request::Object::SourceId(id) => (ObjectType::Source, id.into()),
            alter_set_schema_request::Object::SinkId(id) => (ObjectType::Sink, id.into()),
            alter_set_schema_request::Object::FunctionId(id) => (ObjectType::Function, id.into()),
            alter_set_schema_request::Object::ConnectionId(id) => {
                (ObjectType::Connection, id.into())
            }
            alter_set_schema_request::Object::SubscriptionId(id) => {
                (ObjectType::Subscription, id.into())
            }
        };
        self.metadata_manager
            .catalog_controller
            .alter_schema(obj_type, id, new_schema_id)
            .await
    }

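    /// Waits until all background creating jobs have finished, polling roughly once per
    /// millisecond for up to 30 minutes, then returns the catalog and hummock versions
    /// for the frontend to wait on.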
    pub async fn wait(&self) -> MetaResult<WaitVersion> {
        let timeout_ms = 30 * 60 * 1000;
        for _ in 0..timeout_ms {
            if self
                .metadata_manager
                .catalog_controller
                .list_background_creating_jobs(true, None)
                .await?
                .is_empty()
            {
                let catalog_version = self
                    .metadata_manager
                    .catalog_controller
                    .notify_frontend_trivial()
                    .await;
                let hummock_version_id = self.barrier_manager.get_hummock_version_id().await;
                return Ok(WaitVersion {
                    catalog_version,
                    hummock_version_id,
                });
            }

            sleep(Duration::from_millis(1)).await;
        }
        Err(MetaError::cancelled(format!(
            "timeout after {timeout_ms}ms"
        )))
    }

    async fn comment_on(&self, comment: Comment) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .comment_on(comment)
            .await
    }

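    /// Adds and/or removes entries in a streaming job's per-job config.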
    async fn alter_streaming_job_config(
        &self,
        job_id: JobId,
        entries_to_add: HashMap<String, String>,
        keys_to_remove: Vec<String>,
    ) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .alter_streaming_job_config(job_id, entries_to_add, keys_to_remove)
            .await
    }
}

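/// Reports a telemetry event for the creation of a streaming object.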
fn report_create_object(
    job_id: JobId,
    event_name: &str,
    obj_type: PbTelemetryDatabaseObject,
    connector_name: Option<String>,
    attr_info: Option<jsonbb::Value>,
) {
    report_event(
        PbTelemetryEventStage::CreateStreamJob,
        event_name,
        job_id.as_raw_id() as _,
        connector_name,
        Some(obj_type),
        attr_info,
    );
}

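/// Builds the [`UpstreamSinkInfo`] for a sink that writes into a table: derives the
/// sink's output schema from its original target columns (falling back to the current
/// table columns), maps the table's distribution key onto sink output indices, and
/// prepares the hash dispatch relation towards the table's fragment.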
pub fn build_upstream_sink_info(
    sink_id: SinkId,
    original_target_columns: Vec<PbColumnCatalog>,
    sink_fragment_id: FragmentId,
    target_table: &PbTable,
    target_fragment_id: FragmentId,
) -> MetaResult<UpstreamSinkInfo> {
    let sink_columns = if !original_target_columns.is_empty() {
        original_target_columns.clone()
    } else {
        target_table.columns.clone()
    };

    let sink_output_fields = sink_columns
        .iter()
        .map(|col| Field::from(col.column_desc.as_ref().unwrap()).to_prost())
        .collect_vec();
    let output_indices = (0..sink_output_fields.len())
        .map(|i| i as u32)
        .collect_vec();

    let dist_key_indices: anyhow::Result<Vec<u32>> = try {
        let sink_idx_by_col_id = sink_columns
            .iter()
            .enumerate()
            .map(|(idx, col)| {
                let column_id = col.column_desc.as_ref().unwrap().column_id;
                (column_id, idx as u32)
            })
            .collect::<HashMap<_, _>>();
        target_table
            .distribution_key
            .iter()
            .map(|dist_idx| {
                let column_id = target_table.columns[*dist_idx as usize]
                    .column_desc
                    .as_ref()
                    .unwrap()
                    .column_id;
                let sink_idx = sink_idx_by_col_id
                    .get(&column_id)
                    .ok_or_else(|| anyhow::anyhow!("column id {} not found in sink", column_id))?;
                Ok(*sink_idx)
            })
            .collect::<anyhow::Result<Vec<_>>>()?
    };
    let dist_key_indices =
        dist_key_indices.map_err(|e| e.context("failed to get distribution key indices"))?;
    let downstream_fragment_id = target_fragment_id as _;
    let new_downstream_relation = DownstreamFragmentRelation {
        downstream_fragment_id,
        dispatcher_type: DispatcherType::Hash,
        dist_key_indices,
        output_mapping: PbDispatchOutputMapping::simple(output_indices),
    };
    let current_target_columns = target_table.get_columns();
    let project_exprs = build_select_node_list(&sink_columns, current_target_columns)?;
    Ok(UpstreamSinkInfo {
        sink_id,
        sink_fragment_id: sink_fragment_id as _,
        sink_output_fields,
        sink_original_target_columns: original_target_columns,
        project_exprs,
        new_sink_downstream: new_downstream_relation,
    })
}

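/// Rewrites the `UpstreamSinkUnion` node in a table's union fragment so that its
/// initial upstreams reflect the given sinks. Returning `false` from the visitor
/// stops the traversal at the matched node.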
pub fn refill_upstream_sink_union_in_table(
    union_fragment_root: &mut PbStreamNode,
    upstream_sink_infos: &[UpstreamSinkInfo],
) {
    visit_stream_node_cont_mut(union_fragment_root, |node| {
        if let Some(NodeBody::UpstreamSinkUnion(upstream_sink_union)) = &mut node.node_body {
            let init_upstreams = upstream_sink_infos
                .iter()
                .map(|info| PbUpstreamSinkInfo {
                    upstream_fragment_id: info.sink_fragment_id,
                    sink_output_schema: info.sink_output_fields.clone(),
                    project_exprs: info.project_exprs.clone(),
                })
                .collect();
            upstream_sink_union.init_upstreams = init_upstreams;
            false
        } else {
            true
        }
    });
}

#[cfg(test)]
mod tests {
    use std::num::NonZeroUsize;

    use super::*;

    #[test]
    fn test_specified_parallelism_exceeds_available() {
        let result = DdlController::resolve_stream_parallelism_inner(
            Some(NonZeroUsize::new(100).unwrap()),
            NonZeroUsize::new(256).unwrap(),
            Some(NonZeroUsize::new(4).unwrap()),
            &DefaultParallelism::Full,
            "default",
        )
        .unwrap();
        assert_eq!(result.get(), 100);
    }

    #[test]
    fn test_allows_default_parallelism_over_available() {
        let result = DdlController::resolve_stream_parallelism_inner(
            None,
            NonZeroUsize::new(256).unwrap(),
            Some(NonZeroUsize::new(4).unwrap()),
            &DefaultParallelism::Default(NonZeroUsize::new(50).unwrap()),
            "default",
        )
        .unwrap();
        assert_eq!(result.get(), 50);
    }

    #[test]
    fn test_full_parallelism_capped_by_max() {
        let result = DdlController::resolve_stream_parallelism_inner(
            None,
            NonZeroUsize::new(6).unwrap(),
            Some(NonZeroUsize::new(10).unwrap()),
            &DefaultParallelism::Full,
            "default",
        )
        .unwrap();
        assert_eq!(result.get(), 6);
    }

    #[test]
    fn test_no_available_slots_returns_error() {
        let result = DdlController::resolve_stream_parallelism_inner(
            None,
            NonZeroUsize::new(4).unwrap(),
            None,
            &DefaultParallelism::Full,
            "default",
        );
        assert!(matches!(
            result,
            Err(ref e) if matches!(e.inner(), MetaErrorInner::Unavailable(_))
        ));
    }

    #[test]
    fn test_specified_over_max_returns_error() {
        let result = DdlController::resolve_stream_parallelism_inner(
            Some(NonZeroUsize::new(8).unwrap()),
            NonZeroUsize::new(4).unwrap(),
            Some(NonZeroUsize::new(10).unwrap()),
            &DefaultParallelism::Full,
            "default",
        );
        assert!(matches!(
            result,
            Err(ref e) if matches!(e.inner(), MetaErrorInner::InvalidParameter(_))
        ));
    }
}