use std::cmp::Ordering;
use std::collections::{HashMap, HashSet};
use std::num::NonZeroUsize;
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use std::time::Duration;

use anyhow::{Context, anyhow};
use await_tree::InstrumentAwait;
use either::Either;
use itertools::Itertools;
use risingwave_common::catalog::{
    AlterDatabaseParam, ColumnCatalog, ColumnId, Field, FragmentTypeFlag,
};
use risingwave_common::config::DefaultParallelism;
use risingwave_common::hash::VnodeCountCompat;
use risingwave_common::id::{JobId, TableId};
use risingwave_common::secret::{LocalSecretManager, SecretEncryption};
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont_mut;
use risingwave_common::{bail, bail_not_implemented};
use risingwave_connector::WithOptionsSecResolved;
use risingwave_connector::connector_common::validate_connection;
use risingwave_connector::sink::SinkParam;
use risingwave_connector::sink::iceberg::IcebergSink;
use risingwave_connector::source::cdc::CdcScanOptions;
use risingwave_connector::source::{
    ConnectorProperties, SourceEnumeratorContext, UPSTREAM_SOURCE_KEY,
};
use risingwave_meta_model::object::ObjectType;
use risingwave_meta_model::{
    ConnectionId, DatabaseId, DispatcherType, FragmentId, FunctionId, IndexId, JobStatus, ObjectId,
    SchemaId, SecretId, SinkId, SourceId, StreamingParallelism, SubscriptionId, UserId, ViewId,
};
use risingwave_pb::catalog::{
    Comment, Connection, CreateType, Database, Function, PbSink, PbTable, Schema, Secret, Source,
    Subscription, Table, View,
};
use risingwave_pb::common::PbActorLocation;
use risingwave_pb::ddl_service::alter_owner_request::Object;
use risingwave_pb::ddl_service::{
    DdlProgress, TableJobType, WaitVersion, alter_name_request, alter_set_schema_request,
    alter_swap_rename_request, streaming_job_resource_type,
};
use risingwave_pb::meta::table_fragments::PbActorStatus;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{
    PbDispatchOutputMapping, PbStreamFragmentGraph, PbStreamNode, PbUpstreamSinkInfo,
    StreamFragmentGraph as StreamFragmentGraphProto,
};
use risingwave_pb::telemetry::{PbTelemetryDatabaseObject, PbTelemetryEventStage};
use strum::Display;
use thiserror_ext::AsReport;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use tokio::time::sleep;
use tracing::Instrument;

use crate::barrier::{BarrierManagerRef, Command};
use crate::controller::catalog::{DropTableConnectorContext, ReleaseContext};
use crate::controller::cluster::StreamingClusterInfo;
use crate::controller::streaming_job::{FinishAutoRefreshSchemaSinkContext, SinkIntoTableContext};
use crate::controller::utils::build_select_node_list;
use crate::error::{MetaErrorInner, bail_invalid_parameter, bail_unavailable};
use crate::manager::sink_coordination::SinkCoordinatorManager;
use crate::manager::{
    IGNORED_NOTIFICATION_VERSION, LocalNotification, MetaSrvEnv, MetadataManager,
    NotificationVersion, StreamingJob, StreamingJobType,
};
use crate::model::{
    DownstreamFragmentRelation, Fragment, FragmentDownstreamRelation,
    FragmentId as CatalogFragmentId, StreamContext, StreamJobFragments, StreamJobFragmentsToCreate,
    TableParallelism,
};
use crate::stream::cdc::{
    parallel_cdc_table_backfill_fragment, try_init_parallel_cdc_table_snapshot_splits,
};
use crate::stream::{
    ActorGraphBuildResult, ActorGraphBuilder, AutoRefreshSchemaSinkContext,
    CompleteStreamFragmentGraph, CreateStreamingJobContext, CreateStreamingJobOption,
    FragmentGraphDownstreamContext, FragmentGraphUpstreamContext, GlobalStreamManagerRef,
    ReplaceStreamJobContext, ReschedulePolicy, SourceChange, SourceManagerRef, StreamFragmentGraph,
    UpstreamSinkInfo, check_sink_fragments_support_refresh_schema, create_source_worker,
    rewrite_refresh_schema_sink_fragment, state_match, validate_sink,
};
use crate::telemetry::report_event;
use crate::{MetaError, MetaResult};

#[derive(PartialEq)]
pub enum DropMode {
    Restrict,
    Cascade,
}

impl DropMode {
    pub fn from_request_setting(cascade: bool) -> DropMode {
        if cascade {
            DropMode::Cascade
        } else {
            DropMode::Restrict
        }
    }
}

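/// Identifier of a streaming job to drop, tagged with the type of the job.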
#[derive(strum::AsRefStr)]
pub enum StreamingJobId {
    MaterializedView(TableId),
    Sink(SinkId),
    Table(Option<SourceId>, TableId),
    Index(IndexId),
}

impl std::fmt::Display for StreamingJobId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.as_ref())?;
        write!(f, "({})", self.id())
    }
}

impl StreamingJobId {
    fn id(&self) -> JobId {
        match self {
            StreamingJobId::MaterializedView(id) | StreamingJobId::Table(_, id) => id.as_job_id(),
            StreamingJobId::Index(id) => id.as_job_id(),
            StreamingJobId::Sink(id) => id.as_job_id(),
        }
    }
}

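/// A streaming job together with the new fragment graph that should replace its current one.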
pub struct ReplaceStreamJobInfo {
    pub streaming_job: StreamingJob,
    pub fragment_graph: StreamFragmentGraphProto,
}

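/// All DDL commands that can be executed by the [`DdlController`].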
#[derive(Display)]
pub enum DdlCommand {
    CreateDatabase(Database),
    DropDatabase(DatabaseId),
    CreateSchema(Schema),
    DropSchema(SchemaId, DropMode),
    CreateNonSharedSource(Source),
    DropSource(SourceId, DropMode),
    ResetSource(SourceId),
    CreateFunction(Function),
    DropFunction(FunctionId, DropMode),
    CreateView(View, HashSet<ObjectId>),
    DropView(ViewId, DropMode),
    CreateStreamingJob {
        stream_job: StreamingJob,
        fragment_graph: StreamFragmentGraphProto,
        dependencies: HashSet<ObjectId>,
        resource_type: streaming_job_resource_type::ResourceType,
        if_not_exists: bool,
    },
    DropStreamingJob {
        job_id: StreamingJobId,
        drop_mode: DropMode,
    },
    AlterName(alter_name_request::Object, String),
    AlterSwapRename(alter_swap_rename_request::Object),
    ReplaceStreamJob(ReplaceStreamJobInfo),
    AlterNonSharedSource(Source),
    AlterObjectOwner(Object, UserId),
    AlterSetSchema(alter_set_schema_request::Object, SchemaId),
    CreateConnection(Connection),
    DropConnection(ConnectionId, DropMode),
    CreateSecret(Secret),
    AlterSecret(Secret),
    DropSecret(SecretId),
    CommentOn(Comment),
    CreateSubscription(Subscription),
    DropSubscription(SubscriptionId, DropMode),
    AlterDatabaseParam(DatabaseId, AlterDatabaseParam),
    AlterStreamingJobConfig(JobId, HashMap<String, String>, Vec<String>),
}

impl DdlCommand {
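    /// Returns the object this command operates on: the name for commands whose target has no
    /// id assigned yet, or the object id otherwise. Used to label the command for tracing.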
    fn object(&self) -> Either<String, ObjectId> {
        use Either::*;
        match self {
            DdlCommand::CreateDatabase(database) => Left(database.name.clone()),
            DdlCommand::DropDatabase(id) => Right(id.as_object_id()),
            DdlCommand::CreateSchema(schema) => Left(schema.name.clone()),
            DdlCommand::DropSchema(id, _) => Right(id.as_object_id()),
            DdlCommand::CreateNonSharedSource(source) => Left(source.name.clone()),
            DdlCommand::DropSource(id, _) => Right(id.as_object_id()),
            DdlCommand::ResetSource(id) => Right(id.as_object_id()),
            DdlCommand::CreateFunction(function) => Left(function.name.clone()),
            DdlCommand::DropFunction(id, _) => Right(id.as_object_id()),
            DdlCommand::CreateView(view, _) => Left(view.name.clone()),
            DdlCommand::DropView(id, _) => Right(id.as_object_id()),
            DdlCommand::CreateStreamingJob { stream_job, .. } => Left(stream_job.name()),
            DdlCommand::DropStreamingJob { job_id, .. } => Right(job_id.id().as_object_id()),
            DdlCommand::AlterName(object, _) => Left(format!("{object:?}")),
            DdlCommand::AlterSwapRename(object) => Left(format!("{object:?}")),
            DdlCommand::ReplaceStreamJob(info) => Left(info.streaming_job.name()),
            DdlCommand::AlterNonSharedSource(source) => Left(source.name.clone()),
            DdlCommand::AlterObjectOwner(object, _) => Left(format!("{object:?}")),
            DdlCommand::AlterSetSchema(object, _) => Left(format!("{object:?}")),
            DdlCommand::CreateConnection(connection) => Left(connection.name.clone()),
            DdlCommand::DropConnection(id, _) => Right(id.as_object_id()),
            DdlCommand::CreateSecret(secret) => Left(secret.name.clone()),
            DdlCommand::AlterSecret(secret) => Left(secret.name.clone()),
            DdlCommand::DropSecret(id) => Right(id.as_object_id()),
            DdlCommand::CommentOn(comment) => Right(comment.table_id.into()),
            DdlCommand::CreateSubscription(subscription) => Left(subscription.name.clone()),
            DdlCommand::DropSubscription(id, _) => Right(id.as_object_id()),
            DdlCommand::AlterDatabaseParam(id, _) => Right(id.as_object_id()),
            DdlCommand::AlterStreamingJobConfig(job_id, _, _) => Right(job_id.as_object_id()),
        }
    }

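    /// Returns whether the command may be executed while the barrier manager is still recovering.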
    fn allow_in_recovery(&self) -> bool {
        match self {
            DdlCommand::DropDatabase(_)
            | DdlCommand::DropSchema(_, _)
            | DdlCommand::DropSource(_, _)
            | DdlCommand::DropFunction(_, _)
            | DdlCommand::DropView(_, _)
            | DdlCommand::DropStreamingJob { .. }
            | DdlCommand::DropConnection(_, _)
            | DdlCommand::DropSecret(_)
            | DdlCommand::DropSubscription(_, _)
            | DdlCommand::AlterName(_, _)
            | DdlCommand::AlterObjectOwner(_, _)
            | DdlCommand::AlterSetSchema(_, _)
            | DdlCommand::CreateDatabase(_)
            | DdlCommand::CreateSchema(_)
            | DdlCommand::CreateFunction(_)
            | DdlCommand::CreateView(_, _)
            | DdlCommand::CreateConnection(_)
            | DdlCommand::CommentOn(_)
            | DdlCommand::CreateSecret(_)
            | DdlCommand::AlterSecret(_)
            | DdlCommand::AlterSwapRename(_)
            | DdlCommand::AlterDatabaseParam(_, _)
            | DdlCommand::AlterStreamingJobConfig(_, _, _) => true,
            DdlCommand::CreateStreamingJob { .. }
            | DdlCommand::CreateNonSharedSource(_)
            | DdlCommand::ReplaceStreamJob(_)
            | DdlCommand::AlterNonSharedSource(_)
            | DdlCommand::ResetSource(_)
            | DdlCommand::CreateSubscription(_) => false,
        }
    }
}

#[derive(Clone)]
pub struct DdlController {
    pub(crate) env: MetaSrvEnv,

    pub(crate) metadata_manager: MetadataManager,
    pub(crate) stream_manager: GlobalStreamManagerRef,
    pub(crate) source_manager: SourceManagerRef,
    barrier_manager: BarrierManagerRef,
    sink_manager: SinkCoordinatorManager,

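    /// Semaphore limiting how many streaming jobs may be in the creating state at once.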
    pub(crate) creating_streaming_job_permits: Arc<CreatingStreamingJobPermit>,

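    /// Monotonic sequence number used to tag each DDL command for tracing (see `next_seq`).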
    seq: Arc<AtomicU64>,
}

#[derive(Clone)]
pub struct CreatingStreamingJobPermit {
    pub(crate) semaphore: Arc<Semaphore>,
}

impl CreatingStreamingJobPermit {
    async fn new(env: &MetaSrvEnv) -> Self {
        let mut permits = env
            .system_params_reader()
            .await
            .max_concurrent_creating_streaming_jobs() as usize;
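        // A value of `0` means unlimited; fall back to the semaphore's maximum.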
        if permits == 0 {
            permits = Semaphore::MAX_PERMITS;
        }
        let semaphore = Arc::new(Semaphore::new(permits));

        let (local_notification_tx, mut local_notification_rx) =
            tokio::sync::mpsc::unbounded_channel();
        env.notification_manager()
            .insert_local_sender(local_notification_tx);
        let semaphore_clone = semaphore.clone();
        tokio::spawn(async move {
            while let Some(notification) = local_notification_rx.recv().await {
                let LocalNotification::SystemParamsChange(p) = &notification else {
                    continue;
                };
                let mut new_permits = p.max_concurrent_creating_streaming_jobs() as usize;
                if new_permits == 0 {
                    new_permits = Semaphore::MAX_PERMITS;
                }
                match permits.cmp(&new_permits) {
                    Ordering::Less => {
                        semaphore_clone.add_permits(new_permits - permits);
                    }
                    Ordering::Equal => continue,
                    Ordering::Greater => {
                        let to_release = permits - new_permits;
                        let reduced = semaphore_clone.forget_permits(to_release);
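                        // `forget_permits` removes at most the number of currently available
                        // permits, so it may reduce fewer than requested.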
                        if reduced != to_release {
                            tracing::warn!(
                                "not enough permits to release, expected {}, but only reduced {}",
                                to_release,
                                reduced
                            );
                        }
                    }
                }
                tracing::info!(
                    "max_concurrent_creating_streaming_jobs changed from {} to {}",
                    permits,
                    new_permits
                );
                permits = new_permits;
            }
        });

        Self { semaphore }
    }
}

impl DdlController {
    pub async fn new(
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        stream_manager: GlobalStreamManagerRef,
        source_manager: SourceManagerRef,
        barrier_manager: BarrierManagerRef,
        sink_manager: SinkCoordinatorManager,
    ) -> Self {
        let creating_streaming_job_permits = Arc::new(CreatingStreamingJobPermit::new(&env).await);
        Self {
            env,
            metadata_manager,
            stream_manager,
            source_manager,
            barrier_manager,
            sink_manager,
            creating_streaming_job_permits,
            seq: Arc::new(AtomicU64::new(0)),
        }
    }

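    /// Returns the next command sequence number. Relaxed ordering suffices since the value is
    /// only used as a unique tag for tracing.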
    pub fn next_seq(&self) -> u64 {
        self.seq.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
    }

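    /// Runs a DDL command. The command is executed on a freshly spawned task registered in the
    /// await-tree, so it keeps making progress even if the caller's future is dropped.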
    pub async fn run_command(&self, command: DdlCommand) -> MetaResult<Option<WaitVersion>> {
        if !command.allow_in_recovery() {
            self.barrier_manager.check_status_running()?;
        }

        let await_tree_key = format!("DDL Command {}", self.next_seq());
        let await_tree_span = await_tree::span!("{command}({})", command.object());

        let ctrl = self.clone();
        let fut = async move {
            match command {
                DdlCommand::CreateDatabase(database) => ctrl.create_database(database).await,
                DdlCommand::DropDatabase(database_id) => ctrl.drop_database(database_id).await,
                DdlCommand::CreateSchema(schema) => ctrl.create_schema(schema).await,
                DdlCommand::DropSchema(schema_id, drop_mode) => {
                    ctrl.drop_schema(schema_id, drop_mode).await
                }
                DdlCommand::CreateNonSharedSource(source) => {
                    ctrl.create_non_shared_source(source).await
                }
                DdlCommand::DropSource(source_id, drop_mode) => {
                    ctrl.drop_source(source_id, drop_mode).await
                }
                DdlCommand::ResetSource(source_id) => ctrl.reset_source(source_id).await,
                DdlCommand::CreateFunction(function) => ctrl.create_function(function).await,
                DdlCommand::DropFunction(function_id, drop_mode) => {
                    ctrl.drop_function(function_id, drop_mode).await
                }
                DdlCommand::CreateView(view, dependencies) => {
                    ctrl.create_view(view, dependencies).await
                }
                DdlCommand::DropView(view_id, drop_mode) => {
                    ctrl.drop_view(view_id, drop_mode).await
                }
                DdlCommand::CreateStreamingJob {
                    stream_job,
                    fragment_graph,
                    dependencies,
                    resource_type,
                    if_not_exists,
                } => {
                    ctrl.create_streaming_job(
                        stream_job,
                        fragment_graph,
                        dependencies,
                        resource_type,
                        if_not_exists,
                    )
                    .await
                }
                DdlCommand::DropStreamingJob { job_id, drop_mode } => {
                    ctrl.drop_streaming_job(job_id, drop_mode).await
                }
                DdlCommand::ReplaceStreamJob(ReplaceStreamJobInfo {
                    streaming_job,
                    fragment_graph,
                }) => ctrl.replace_job(streaming_job, fragment_graph).await,
                DdlCommand::AlterName(relation, name) => ctrl.alter_name(relation, &name).await,
                DdlCommand::AlterObjectOwner(object, owner_id) => {
                    ctrl.alter_owner(object, owner_id).await
                }
                DdlCommand::AlterSetSchema(object, new_schema_id) => {
                    ctrl.alter_set_schema(object, new_schema_id).await
                }
                DdlCommand::CreateConnection(connection) => {
                    ctrl.create_connection(connection).await
                }
                DdlCommand::DropConnection(connection_id, drop_mode) => {
                    ctrl.drop_connection(connection_id, drop_mode).await
                }
                DdlCommand::CreateSecret(secret) => ctrl.create_secret(secret).await,
                DdlCommand::DropSecret(secret_id) => ctrl.drop_secret(secret_id).await,
                DdlCommand::AlterSecret(secret) => ctrl.alter_secret(secret).await,
                DdlCommand::AlterNonSharedSource(source) => {
                    ctrl.alter_non_shared_source(source).await
                }
                DdlCommand::CommentOn(comment) => ctrl.comment_on(comment).await,
                DdlCommand::CreateSubscription(subscription) => {
                    ctrl.create_subscription(subscription).await
                }
                DdlCommand::DropSubscription(subscription_id, drop_mode) => {
                    ctrl.drop_subscription(subscription_id, drop_mode).await
                }
                DdlCommand::AlterSwapRename(objects) => ctrl.alter_swap_rename(objects).await,
                DdlCommand::AlterDatabaseParam(database_id, param) => {
                    ctrl.alter_database_param(database_id, param).await
                }
                DdlCommand::AlterStreamingJobConfig(job_id, entries_to_add, keys_to_remove) => {
                    ctrl.alter_streaming_job_config(job_id, entries_to_add, keys_to_remove)
                        .await
                }
            }
        }
        .in_current_span();
        let fut = (self.env.await_tree_reg())
            .register(await_tree_key, await_tree_span)
            .instrument(Box::pin(fut));
        let notification_version = tokio::spawn(fut).await.map_err(|e| anyhow!(e))??;
        Ok(Some(WaitVersion {
            catalog_version: notification_version,
            hummock_version_id: self.barrier_manager.get_hummock_version_id().await.to_u64(),
        }))
    }

    pub async fn get_ddl_progress(&self) -> MetaResult<Vec<DdlProgress>> {
        self.barrier_manager.get_ddl_progress().await
    }

    async fn create_database(&self, database: Database) -> MetaResult<NotificationVersion> {
        let (version, updated_db) = self
            .metadata_manager
            .catalog_controller
            .create_database(database)
            .await?;
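        // Propagate the new database's barrier settings to the barrier manager.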
        self.barrier_manager
            .update_database_barrier(
                updated_db.database_id,
                updated_db.barrier_interval_ms.map(|v| v as u32),
                updated_db.checkpoint_frequency.map(|v| v as u64),
            )
            .await?;
        Ok(version)
    }

    #[tracing::instrument(skip(self), level = "debug")]
    pub async fn reschedule_streaming_job(
        &self,
        job_id: JobId,
        target: ReschedulePolicy,
        mut deferred: bool,
    ) -> MetaResult<()> {
        tracing::info!("altering parallelism for job {}", job_id);
        if self.barrier_manager.check_status_running().is_err() {
            tracing::info!(
                "alter parallelism is set to deferred mode because the system is in recovery state"
            );
            deferred = true;
        }

        self.stream_manager
            .reschedule_streaming_job(job_id, target, deferred)
            .await
    }

    pub async fn reschedule_cdc_table_backfill(
        &self,
        job_id: JobId,
        target: ReschedulePolicy,
    ) -> MetaResult<()> {
        tracing::info!("alter CDC table backfill parallelism");
        if self.barrier_manager.check_status_running().is_err() {
            return Err(anyhow::anyhow!(
                "CDC table backfill reschedule is unavailable because the system is in recovery state"
            )
            .into());
        }
        self.stream_manager
            .reschedule_cdc_table_backfill(job_id, target)
            .await
    }

    pub async fn reschedule_fragments(
        &self,
        fragment_targets: HashMap<FragmentId, Option<StreamingParallelism>>,
    ) -> MetaResult<()> {
        tracing::info!(
            "altering parallelism for fragments {:?}",
            fragment_targets.keys()
        );
        let fragment_targets = fragment_targets
            .into_iter()
            .map(|(fragment_id, parallelism)| (fragment_id as CatalogFragmentId, parallelism))
            .collect();

        self.stream_manager
            .reschedule_fragments(fragment_targets)
            .await
    }

    async fn drop_database(&self, database_id: DatabaseId) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Database, database_id, DropMode::Cascade)
            .await
    }

    async fn create_schema(&self, schema: Schema) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .create_schema(schema)
            .await
    }

    async fn drop_schema(
        &self,
        schema_id: SchemaId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Schema, schema_id, drop_mode)
            .await
    }

    async fn create_non_shared_source(&self, source: Source) -> MetaResult<NotificationVersion> {
        let handle = create_source_worker(&source, self.source_manager.metrics.clone())
            .await
            .context("failed to create source worker")?;

        let (source_id, version) = self
            .metadata_manager
            .catalog_controller
            .create_source(source)
            .await?;
        self.source_manager
            .register_source_with_handle(source_id, handle)
            .await;
        Ok(version)
    }

    async fn drop_source(
        &self,
        source_id: SourceId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Source, source_id, drop_mode)
            .await
    }

    async fn reset_source(&self, source_id: SourceId) -> MetaResult<NotificationVersion> {
        tracing::info!(source_id = %source_id, "resetting CDC source offset to latest");

        let database_id = self
            .metadata_manager
            .catalog_controller
            .get_object_database_id(source_id)
            .await?;

        self.stream_manager
            .barrier_scheduler
            .run_command(database_id, Command::ResetSource { source_id })
            .await?;

        let version = self
            .metadata_manager
            .catalog_controller
            .current_notification_version()
            .await;
        Ok(version)
    }

    async fn alter_non_shared_source(&self, source: Source) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .alter_non_shared_source(source)
            .await
    }

    async fn create_function(&self, function: Function) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .create_function(function)
            .await
    }

    async fn drop_function(
        &self,
        function_id: FunctionId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Function, function_id, drop_mode)
            .await
    }

    async fn create_view(
        &self,
        view: View,
        dependencies: HashSet<ObjectId>,
    ) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .create_view(view, dependencies)
            .await
    }

    async fn drop_view(
        &self,
        view_id: ViewId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::View, view_id, drop_mode).await
    }

    async fn create_connection(&self, connection: Connection) -> MetaResult<NotificationVersion> {
        validate_connection(&connection).await?;
        self.metadata_manager
            .catalog_controller
            .create_connection(connection)
            .await
    }

    async fn drop_connection(
        &self,
        connection_id: ConnectionId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Connection, connection_id, drop_mode)
            .await
    }

    async fn alter_database_param(
        &self,
        database_id: DatabaseId,
        param: AlterDatabaseParam,
    ) -> MetaResult<NotificationVersion> {
        let (version, updated_db) = self
            .metadata_manager
            .catalog_controller
            .alter_database_param(database_id, param)
            .await?;
        self.barrier_manager
            .update_database_barrier(
                database_id,
                updated_db.barrier_interval_ms.map(|v| v as u32),
                updated_db.checkpoint_frequency.map(|v| v as u64),
            )
            .await?;
        Ok(version)
    }

    fn get_encrypted_payload(&self, secret: &Secret) -> MetaResult<Vec<u8>> {
        let secret_store_private_key = self
            .env
            .opts
            .secret_store_private_key
            .clone()
            .ok_or_else(|| anyhow!("secret_store_private_key is not configured"))?;

        let encrypted_payload = SecretEncryption::encrypt(
            secret_store_private_key.as_slice(),
            secret.get_value().as_slice(),
        )
        .context(format!("failed to encrypt secret {}", secret.name))?;
        Ok(encrypted_payload
            .serialize()
            .context(format!("failed to serialize secret {}", secret.name))?)
    }

    async fn create_secret(&self, mut secret: Secret) -> MetaResult<NotificationVersion> {
        let secret_plain_payload = secret.value.clone();
        let encrypted_payload = self.get_encrypted_payload(&secret)?;
        secret.value = encrypted_payload;

        self.metadata_manager
            .catalog_controller
            .create_secret(secret, secret_plain_payload)
            .await
    }

    async fn drop_secret(&self, secret_id: SecretId) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Secret, secret_id, DropMode::Restrict)
            .await
    }

    async fn alter_secret(&self, mut secret: Secret) -> MetaResult<NotificationVersion> {
        let secret_plain_payload = secret.value.clone();
        let encrypted_payload = self.get_encrypted_payload(&secret)?;
        secret.value = encrypted_payload;
        self.metadata_manager
            .catalog_controller
            .alter_secret(secret, secret_plain_payload)
            .await
    }

    async fn create_subscription(
        &self,
        mut subscription: Subscription,
    ) -> MetaResult<NotificationVersion> {
        tracing::debug!("create subscription");
        let _permit = self
            .creating_streaming_job_permits
            .semaphore
            .acquire()
            .await
            .unwrap();
        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
        self.metadata_manager
            .catalog_controller
            .create_subscription_catalog(&mut subscription)
            .await?;
        if let Err(err) = self.stream_manager.create_subscription(&subscription).await {
            tracing::debug!(error = %err.as_report(), "failed to create subscription");
            let _ = self
                .metadata_manager
                .catalog_controller
                .try_abort_creating_subscription(subscription.id)
                .await
                .inspect_err(|e| {
                    tracing::error!(
                        error = %e.as_report(),
                        "failed to abort create subscription after failure"
                    );
                });
            return Err(err);
        }

        let version = self
            .metadata_manager
            .catalog_controller
            .notify_create_subscription(subscription.id)
            .await?;
        tracing::debug!("finish create subscription");
        Ok(version)
    }

    async fn drop_subscription(
        &self,
        subscription_id: SubscriptionId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        tracing::debug!("preparing drop subscription");
        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
        let subscription = self
            .metadata_manager
            .catalog_controller
            .get_subscription_by_id(subscription_id)
            .await?;
        let table_id = subscription.dependent_table_id;
        let database_id = subscription.database_id;
        let (_, version) = self
            .metadata_manager
            .catalog_controller
            .drop_object(ObjectType::Subscription, subscription_id, drop_mode)
            .await?;
        self.stream_manager
            .drop_subscription(database_id, subscription_id, table_id)
            .await;
        tracing::debug!("finish drop subscription");
        Ok(version)
    }

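    /// Validates a CDC table by locating its single stream scan fragment and verifying that a
    /// split enumerator can be created from the CDC connector properties.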
    #[await_tree::instrument]
    pub(crate) async fn validate_cdc_table(
        &self,
        table: &Table,
        table_fragments: &StreamJobFragments,
    ) -> MetaResult<()> {
        let stream_scan_fragment = table_fragments
            .fragments
            .values()
            .filter(|f| {
                f.fragment_type_mask.contains(FragmentTypeFlag::StreamScan)
                    || f.fragment_type_mask
                        .contains(FragmentTypeFlag::StreamCdcScan)
            })
            .exactly_one()
            .ok()
            .with_context(|| {
                format!(
                    "expect exactly one stream scan fragment, got: {:?}",
                    table_fragments.fragments
                )
            })?;
        fn assert_parallelism(stream_scan_fragment: &Fragment, node_body: &Option<NodeBody>) {
            if let Some(NodeBody::StreamCdcScan(node)) = node_body {
                let parallelized_backfill = node
                    .options
                    .as_ref()
                    .is_some_and(|o| CdcScanOptions::from_proto(o).is_parallelized_backfill());
                if !parallelized_backfill {
                    // Without parallelized backfill, the CDC scan must run on a single actor.
                    assert_eq!(
                        stream_scan_fragment.actors.len(),
                        1,
                        "Stream scan fragment should have only one actor"
                    );
                }
            }
        }
        let mut found_cdc_scan = false;
        match &stream_scan_fragment.nodes.node_body {
            Some(NodeBody::StreamCdcScan(_)) => {
                assert_parallelism(stream_scan_fragment, &stream_scan_fragment.nodes.node_body);
                if self
                    .validate_cdc_table_inner(&stream_scan_fragment.nodes.node_body, table.id)
                    .await?
                {
                    found_cdc_scan = true;
                }
            }
            Some(NodeBody::Project(_)) => {
                for input in &stream_scan_fragment.nodes.input {
                    assert_parallelism(stream_scan_fragment, &input.node_body);
                    if self
                        .validate_cdc_table_inner(&input.node_body, table.id)
                        .await?
                    {
                        found_cdc_scan = true;
                    }
                }
            }
            _ => {
                bail!("Unexpected node body for stream cdc scan");
            }
        };
        if !found_cdc_scan {
            bail!("No stream cdc scan node found in stream scan fragment");
        }
        Ok(())
    }

    async fn validate_cdc_table_inner(
        &self,
        node_body: &Option<NodeBody>,
        table_id: TableId,
    ) -> MetaResult<bool> {
        if let Some(NodeBody::StreamCdcScan(stream_cdc_scan)) = node_body
            && let Some(ref cdc_table_desc) = stream_cdc_scan.cdc_table_desc
        {
            let options_with_secret = WithOptionsSecResolved::new(
                cdc_table_desc.connect_properties.clone(),
                cdc_table_desc.secret_refs.clone(),
            );

            let mut props = ConnectorProperties::extract(options_with_secret, true)?;
            props.init_from_pb_cdc_table_desc(cdc_table_desc);

            let _enumerator = props
                .create_split_enumerator(SourceEnumeratorContext::dummy().into())
                .await?;

            tracing::debug!(?table_id, "validate cdc table success");
            Ok(true)
        } else {
            Ok(false)
        }
    }

    pub async fn validate_table_for_sink(&self, table_id: TableId) -> MetaResult<()> {
        let migrated = self
            .metadata_manager
            .catalog_controller
            .has_table_been_migrated(table_id)
            .await?;
        if !migrated {
            Err(anyhow::anyhow!(
                "Creating sink into table is not allowed for unmigrated table {}. Please migrate it first.",
                table_id
            )
            .into())
        } else {
            Ok(())
        }
    }

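    /// Creates a streaming job (materialized view, table, sink, index, or shared source) from
    /// its fragment graph, registering the catalog first and aborting the job if creation fails.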
    #[await_tree::instrument(boxed, "create_streaming_job({streaming_job})")]
    pub async fn create_streaming_job(
        &self,
        mut streaming_job: StreamingJob,
        fragment_graph: StreamFragmentGraphProto,
        dependencies: HashSet<ObjectId>,
        resource_type: streaming_job_resource_type::ResourceType,
        if_not_exists: bool,
    ) -> MetaResult<NotificationVersion> {
        if let StreamingJob::Sink(sink) = &streaming_job
            && let Some(target_table) = sink.target_table
        {
            self.validate_table_for_sink(target_table).await?;
        }
        let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());
        let specific_resource_group = resource_type.resource_group();
        let check_ret = self
            .metadata_manager
            .catalog_controller
            .create_job_catalog(
                &mut streaming_job,
                &ctx,
                &fragment_graph.parallelism,
                fragment_graph.max_parallelism as _,
                dependencies,
                specific_resource_group,
                &fragment_graph.backfill_parallelism,
            )
            .await;
        if let Err(meta_err) = check_ret {
            if !if_not_exists {
                return Err(meta_err);
            }
            return if let MetaErrorInner::Duplicated(_, _, Some(job_id)) = meta_err.inner() {
                if streaming_job.create_type() == CreateType::Foreground {
                    let database_id = streaming_job.database_id();
                    self.metadata_manager
                        .wait_streaming_job_finished(database_id, *job_id)
                        .await
                } else {
                    Ok(IGNORED_NOTIFICATION_VERSION)
                }
            } else {
                Err(meta_err)
            };
        }
        let job_id = streaming_job.id();
        tracing::debug!(
            id = %job_id,
            definition = streaming_job.definition(),
            create_type = streaming_job.create_type().as_str_name(),
            job_type = ?streaming_job.job_type(),
            "starting streaming job",
        );
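        // Bound the number of concurrently creating streaming jobs.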
        let permit = self
            .creating_streaming_job_permits
            .semaphore
            .clone()
            .acquire_owned()
            .instrument_await("acquire_creating_streaming_job_permit")
            .await
            .unwrap();
        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;

        let name = streaming_job.name();
        let definition = streaming_job.definition();
        let source_id = match &streaming_job {
            StreamingJob::Table(Some(src), _, _) | StreamingJob::Source(src) => Some(src.id),
            _ => None,
        };

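        // Run the remaining creation steps; on failure, abort the job and roll back the catalog.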
        match self
            .create_streaming_job_inner(ctx, streaming_job, fragment_graph, resource_type, permit)
            .await
        {
            Ok(version) => Ok(version),
            Err(err) => {
                tracing::error!(id = %job_id, error = %err.as_report(), "failed to create streaming job");
                let event = risingwave_pb::meta::event_log::EventCreateStreamJobFail {
                    id: job_id,
                    name,
                    definition,
                    error: err.as_report().to_string(),
                };
                self.env.event_log_manager_ref().add_event_logs(vec![
                    risingwave_pb::meta::event_log::Event::CreateStreamJobFail(event),
                ]);
                let (aborted, _) = self
                    .metadata_manager
                    .catalog_controller
                    .try_abort_creating_streaming_job(job_id, false)
                    .await?;
                if aborted {
                    tracing::warn!(id = %job_id, "aborted streaming job");
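                    // Drop the aborted job's associated source from the source manager, if any.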
                    if let Some(source_id) = source_id {
                        self.source_manager
                            .apply_source_change(SourceChange::DropSource {
                                dropped_source_ids: vec![source_id],
                            })
                            .await;
                    }
                }
                Err(err)
            }
        }
    }

    #[await_tree::instrument(boxed)]
    async fn create_streaming_job_inner(
        &self,
        ctx: StreamContext,
        mut streaming_job: StreamingJob,
        fragment_graph: StreamFragmentGraphProto,
        resource_type: streaming_job_resource_type::ResourceType,
        permit: OwnedSemaphorePermit,
    ) -> MetaResult<NotificationVersion> {
        let mut fragment_graph =
            StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
        streaming_job.set_info_from_graph(&fragment_graph);

        let incomplete_internal_tables = fragment_graph
            .incomplete_internal_tables()
            .into_values()
            .collect_vec();
        let table_id_map = self
            .metadata_manager
            .catalog_controller
            .create_internal_table_catalog(&streaming_job, incomplete_internal_tables)
            .await?;
        fragment_graph.refill_internal_table_ids(table_id_map);

        tracing::debug!(id = %streaming_job.id(), "building streaming job");
        let (ctx, stream_job_fragments) = self
            .build_stream_job(ctx, streaming_job, fragment_graph, resource_type)
            .await?;

        let streaming_job = &ctx.streaming_job;

        match streaming_job {
            StreamingJob::Table(None, table, TableJobType::SharedCdcSource) => {
                self.validate_cdc_table(table, &stream_job_fragments)
                    .await?;
            }
            StreamingJob::Table(Some(source), ..) => {
                self.source_manager.register_source(source).await?;
                let connector_name = source
                    .get_with_properties()
                    .get(UPSTREAM_SOURCE_KEY)
                    .cloned();
                let attr = source.info.as_ref().map(|source_info| {
                    jsonbb::json!({
                        "format": source_info.format().as_str_name(),
                        "encode": source_info.row_encode().as_str_name(),
                    })
                });
                report_create_object(
                    streaming_job.id(),
                    "source",
                    PbTelemetryDatabaseObject::Source,
                    connector_name,
                    attr,
                );
            }
            StreamingJob::Sink(sink) => {
                if sink.auto_refresh_schema_from_table.is_some() {
                    check_sink_fragments_support_refresh_schema(&stream_job_fragments.fragments)?
                }
                validate_sink(sink).await?;
                let connector_name = sink.get_properties().get(UPSTREAM_SOURCE_KEY).cloned();
                let attr = sink.format_desc.as_ref().map(|sink_info| {
                    jsonbb::json!({
                        "format": sink_info.format().as_str_name(),
                        "encode": sink_info.encode().as_str_name(),
                    })
                });
                report_create_object(
                    streaming_job.id(),
                    "sink",
                    PbTelemetryDatabaseObject::Sink,
                    connector_name,
                    attr,
                );
            }
            StreamingJob::Source(source) => {
                self.source_manager.register_source(source).await?;
                let connector_name = source
                    .get_with_properties()
                    .get(UPSTREAM_SOURCE_KEY)
                    .cloned();
                let attr = source.info.as_ref().map(|source_info| {
                    jsonbb::json!({
                        "format": source_info.format().as_str_name(),
                        "encode": source_info.row_encode().as_str_name(),
                    })
                });
                report_create_object(
                    streaming_job.id(),
                    "source",
                    PbTelemetryDatabaseObject::Source,
                    connector_name,
                    attr,
                );
            }
            _ => {}
        }

        self.metadata_manager
            .catalog_controller
            .prepare_stream_job_fragments(&stream_job_fragments, streaming_job, false)
            .await?;

        let version = self
            .stream_manager
            .create_streaming_job(stream_job_fragments, ctx, permit)
            .await?;

        Ok(version)
    }

    pub async fn drop_object(
        &self,
        object_type: ObjectType,
        object_id: impl Into<ObjectId>,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        let object_id = object_id.into();
        let (release_ctx, version) = self
            .metadata_manager
            .catalog_controller
            .drop_object(object_type, object_id, drop_mode)
            .await?;

        if object_type == ObjectType::Source {
            self.env
                .notification_manager_ref()
                .notify_local_subscribers(LocalNotification::SourceDropped(object_id));
        }

        let ReleaseContext {
            database_id,
            removed_streaming_job_ids,
            removed_state_table_ids,
            removed_source_ids,
            removed_secret_ids: secret_ids,
            removed_source_fragments,
            removed_actors,
            removed_fragments,
            removed_sink_fragment_by_targets,
            removed_iceberg_table_sinks,
        } = release_ctx;

        let _guard = self.source_manager.pause_tick().await;
        self.stream_manager
            .drop_streaming_jobs(
                database_id,
                removed_actors.iter().map(|id| *id as _).collect(),
                removed_streaming_job_ids,
                removed_state_table_ids,
                removed_fragments.iter().map(|id| *id as _).collect(),
                removed_sink_fragment_by_targets
                    .into_iter()
                    .map(|(target, sinks)| {
                        (target as _, sinks.into_iter().map(|id| id as _).collect())
                    })
                    .collect(),
            )
            .await;

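        // Unregister the dropped sources from the source manager.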
        self.source_manager
            .apply_source_change(SourceChange::DropSource {
                dropped_source_ids: removed_source_ids.into_iter().map(|id| id as _).collect(),
            })
            .await;

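        // Drop the source fragments that belonged to the removed streaming jobs.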
        let dropped_source_fragments = removed_source_fragments;
        self.source_manager
            .apply_source_change(SourceChange::DropMv {
                dropped_source_fragments,
            })
            .await;

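        // Best-effort cleanup: drop the external Iceberg tables created by the removed sinks.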
        let iceberg_sink_ids: Vec<SinkId> = removed_iceberg_table_sinks
            .iter()
            .map(|sink| sink.id)
            .collect();

        for sink in removed_iceberg_table_sinks {
            let sink_param = SinkParam::try_from_sink_catalog(sink.into())
                .expect("Iceberg sink should be valid");
            let iceberg_sink =
                IcebergSink::try_from(sink_param).expect("Iceberg sink should be valid");
            if let Ok(iceberg_catalog) = iceberg_sink.config.create_catalog().await {
                let table_identifier = iceberg_sink.config.full_table_name().unwrap();
                tracing::info!(
                    "dropping iceberg table {} for dropped sink",
                    table_identifier
                );

                let _ = iceberg_catalog
                    .drop_table(&table_identifier)
                    .await
                    .inspect_err(|err| {
                        tracing::error!(
                            "failed to drop iceberg table {} during cleanup: {}",
                            table_identifier,
                            err.as_report()
                        );
                    });
            }
        }

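        // Shut down the coordinators of the dropped Iceberg sinks.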
        if !iceberg_sink_ids.is_empty() {
            self.sink_manager
                .stop_sink_coordinator(iceberg_sink_ids)
                .await;
        }

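        // Evict the dropped secrets from the local secret manager.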
        for secret in secret_ids {
            LocalSecretManager::global().remove_secret(secret);
        }
        Ok(version)
    }

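    /// Replaces an existing streaming job with a new fragment graph built from the updated
    /// definition, e.g. for schema changes. Only tables, sources, and materialized views are
    /// supported.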
    #[await_tree::instrument(boxed, "replace_streaming_job({streaming_job})")]
    pub async fn replace_job(
        &self,
        mut streaming_job: StreamingJob,
        fragment_graph: StreamFragmentGraphProto,
    ) -> MetaResult<NotificationVersion> {
        match &streaming_job {
            StreamingJob::Table(..)
            | StreamingJob::Source(..)
            | StreamingJob::MaterializedView(..) => {}
            StreamingJob::Sink(..) | StreamingJob::Index(..) => {
                bail_not_implemented!("schema change for {}", streaming_job.job_type_str())
            }
        }

        let job_id = streaming_job.id();

        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
        let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());

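        // The replacement job must keep the original job's max parallelism.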
        let original_max_parallelism = self
            .metadata_manager
            .get_job_max_parallelism(streaming_job.id())
            .await?;
        let fragment_graph = PbStreamFragmentGraph {
            max_parallelism: original_max_parallelism as _,
            ..fragment_graph
        };

        let fragment_graph = StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
        streaming_job.set_info_from_graph(&fragment_graph);

        let streaming_job = streaming_job;

        let auto_refresh_schema_sinks = if let StreamingJob::Table(_, table, _) = &streaming_job {
            let auto_refresh_schema_sinks = self
                .metadata_manager
                .catalog_controller
                .get_sink_auto_refresh_schema_from(table.id)
                .await?;
            if !auto_refresh_schema_sinks.is_empty() {
                let original_table_columns = self
                    .metadata_manager
                    .catalog_controller
                    .get_table_columns(table.id)
                    .await?;
                let mut original_table_column_ids: HashSet<_> = original_table_columns
                    .iter()
                    .map(|col| col.column_id())
                    .collect();
                let newly_added_columns = table
                    .columns
                    .iter()
                    .filter(|col| {
                        !original_table_column_ids.remove(&ColumnId::new(
                            col.column_desc.as_ref().unwrap().column_id as _,
                        ))
                    })
                    .map(|col| ColumnCatalog::from(col.clone()))
                    .collect_vec();
                if !original_table_column_ids.is_empty() {
                    return Err(anyhow!(
                        "new table columns do not contain all original columns. new: {:?}, original: {:?}, not included: {:?}",
                        table.columns,
                        original_table_columns,
                        original_table_column_ids
                    )
                    .into());
                }
                let mut sinks = Vec::with_capacity(auto_refresh_schema_sinks.len());
                for sink in auto_refresh_schema_sinks {
                    let sink_job_fragments = self
                        .metadata_manager
                        .get_job_fragments_by_id(sink.id.as_job_id())
                        .await?;
                    if sink_job_fragments.fragments.len() != 1 {
                        return Err(anyhow!(
                            "auto schema refresh sink must have only one fragment, but got {}",
                            sink_job_fragments.fragments.len()
                        )
                        .into());
                    }
                    let original_sink_fragment =
                        sink_job_fragments.fragments.into_values().next().unwrap();
                    let (new_sink_fragment, new_schema, new_log_store_table) =
                        rewrite_refresh_schema_sink_fragment(
                            &original_sink_fragment,
                            &sink,
                            &newly_added_columns,
                            table,
                            fragment_graph.table_fragment_id(),
                            self.env.id_gen_manager(),
                            self.env.actor_id_generator(),
                        )?;

                    assert_eq!(
                        original_sink_fragment.actors.len(),
                        new_sink_fragment.actors.len()
                    );
                    let actor_status = (0..original_sink_fragment.actors.len())
                        .map(|i| {
                            let worker_node_id = sink_job_fragments.actor_status
                                [&original_sink_fragment.actors[i].actor_id]
                                .location
                                .as_ref()
                                .unwrap()
                                .worker_node_id;
                            (
                                new_sink_fragment.actors[i].actor_id,
                                PbActorStatus {
                                    location: Some(PbActorLocation { worker_node_id }),
                                },
                            )
                        })
                        .collect();

                    let streaming_job = StreamingJob::Sink(sink);

                    let tmp_sink_id = self
                        .metadata_manager
                        .catalog_controller
                        .create_job_catalog_for_replace(&streaming_job, None, None, None)
                        .await?
                        .as_sink_id();
                    let StreamingJob::Sink(sink) = streaming_job else {
                        unreachable!()
                    };

                    sinks.push(AutoRefreshSchemaSinkContext {
                        tmp_sink_id,
                        original_sink: sink,
                        original_fragment: original_sink_fragment,
                        new_schema,
                        newly_add_fields: newly_added_columns
                            .iter()
                            .map(|col| Field::from(&col.column_desc))
                            .collect(),
                        new_fragment: new_sink_fragment,
                        new_log_store_table,
                        actor_status,
                    });
                }
                Some(sinks)
            } else {
                None
            }
        } else {
            None
        };

        let tmp_id = self
            .metadata_manager
            .catalog_controller
            .create_job_catalog_for_replace(
                &streaming_job,
                Some(&ctx),
                fragment_graph.specified_parallelism().as_ref(),
                Some(fragment_graph.max_parallelism()),
            )
            .await?;

        let tmp_sink_ids = auto_refresh_schema_sinks.as_ref().map(|sinks| {
            sinks
                .iter()
                .map(|sink| sink.tmp_sink_id.as_object_id())
                .collect_vec()
        });

        tracing::debug!(id = %job_id, "building replace streaming job");
        let mut updated_sink_catalogs = vec![];

        let mut drop_table_connector_ctx = None;
        let result: MetaResult<_> = try {
            let (mut ctx, mut stream_job_fragments) = self
                .build_replace_job(
                    ctx,
                    &streaming_job,
                    fragment_graph,
                    tmp_id,
                    auto_refresh_schema_sinks,
                )
                .await?;
            drop_table_connector_ctx = ctx.drop_table_connector_ctx.clone();
            let auto_refresh_schema_sink_finish_ctx =
                ctx.auto_refresh_schema_sinks.as_ref().map(|sinks| {
                    sinks
                        .iter()
                        .map(|sink| FinishAutoRefreshSchemaSinkContext {
                            tmp_sink_id: sink.tmp_sink_id,
                            original_sink_id: sink.original_sink.id,
                            columns: sink.new_schema.clone(),
                            new_log_store_table: sink
                                .new_log_store_table
                                .as_ref()
                                .map(|table| (table.id, table.columns.clone())),
                        })
                        .collect()
                });

            if let StreamingJob::Table(_, table, ..) = &streaming_job {
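                // Refill the table's union fragment with all existing sink-into-table upstreams
                // and record the new upstream-downstream relations.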
                let union_fragment = stream_job_fragments.inner.union_fragment_for_table();
                let upstream_infos = self
                    .metadata_manager
                    .catalog_controller
                    .get_all_upstream_sink_infos(table, union_fragment.fragment_id as _)
                    .await?;
                refill_upstream_sink_union_in_table(&mut union_fragment.nodes, &upstream_infos);

                for upstream_info in &upstream_infos {
                    let upstream_fragment_id = upstream_info.sink_fragment_id;
                    ctx.upstream_fragment_downstreams
                        .entry(upstream_fragment_id)
                        .or_default()
                        .push(upstream_info.new_sink_downstream.clone());
                    if upstream_info.sink_original_target_columns.is_empty() {
                        updated_sink_catalogs.push(upstream_info.sink_id);
                    }
                }
            }

            let replace_upstream = ctx.replace_upstream.clone();

            if let Some(sinks) = &ctx.auto_refresh_schema_sinks {
                let empty_downstreams = FragmentDownstreamRelation::default();
                for sink in sinks {
                    self.metadata_manager
                        .catalog_controller
                        .prepare_streaming_job(
                            sink.tmp_sink_id.as_job_id(),
                            || [&sink.new_fragment].into_iter(),
                            &empty_downstreams,
                            true,
                            None,
                        )
                        .await?;
                }
            }

            self.metadata_manager
                .catalog_controller
                .prepare_stream_job_fragments(&stream_job_fragments, &streaming_job, true)
                .await?;

            self.stream_manager
                .replace_stream_job(stream_job_fragments, ctx)
                .await?;
            (replace_upstream, auto_refresh_schema_sink_finish_ctx)
        };

        match result {
            Ok((replace_upstream, auto_refresh_schema_sink_finish_ctx)) => {
                let version = self
                    .metadata_manager
                    .catalog_controller
                    .finish_replace_streaming_job(
                        tmp_id,
                        streaming_job,
                        replace_upstream,
                        SinkIntoTableContext {
                            updated_sink_catalogs,
                        },
                        drop_table_connector_ctx.as_ref(),
                        auto_refresh_schema_sink_finish_ctx,
                    )
                    .await?;
                if let Some(drop_table_connector_ctx) = &drop_table_connector_ctx {
                    self.source_manager
                        .apply_source_change(SourceChange::DropSource {
                            dropped_source_ids: vec![drop_table_connector_ctx.to_remove_source_id],
                        })
                        .await;
                }
                Ok(version)
            }
            Err(err) => {
                tracing::error!(id = %job_id, error = ?err.as_report(), "failed to replace job");
                let _ = self
                    .metadata_manager
                    .catalog_controller
                    .try_abort_replacing_streaming_job(tmp_id, tmp_sink_ids)
                    .await
                    .inspect_err(|err| {
                        tracing::error!(id = %job_id, error = ?err.as_report(), "failed to abort replacing job");
                    });
                Err(err)
            }
        }
    }

    #[await_tree::instrument(
        boxed,
        "drop_streaming_job{}({job_id})",
        if let DropMode::Cascade = drop_mode { "_cascade" } else { "" }
    )]
    async fn drop_streaming_job(
        &self,
        job_id: StreamingJobId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;

        let (object_id, object_type) = match job_id {
            StreamingJobId::MaterializedView(id) => (id.as_object_id(), ObjectType::Table),
            StreamingJobId::Sink(id) => (id.as_object_id(), ObjectType::Sink),
            StreamingJobId::Table(_, id) => (id.as_object_id(), ObjectType::Table),
            StreamingJobId::Index(idx) => (idx.as_object_id(), ObjectType::Index),
        };

        let job_status = self
            .metadata_manager
            .catalog_controller
            .get_streaming_job_status(job_id.id())
            .await?;
        let version = match job_status {
            JobStatus::Initial => {
                unreachable!(
                    "Job with Initial status should not notify frontend and therefore should not arrive here"
                );
            }
            JobStatus::Creating => {
                self.stream_manager
                    .cancel_streaming_jobs(vec![job_id.id()])
                    .await?;
                IGNORED_NOTIFICATION_VERSION
            }
            JobStatus::Created => self.drop_object(object_type, object_id, drop_mode).await?,
        };

        Ok(version)
    }

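    /// Resolves the parallelism for a new streaming job from the specified value, the job's max
    /// parallelism, and the slots available in the target resource group.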
    fn resolve_stream_parallelism(
        &self,
        specified: Option<NonZeroUsize>,
        max: NonZeroUsize,
        cluster_info: &StreamingClusterInfo,
        resource_group: String,
    ) -> MetaResult<NonZeroUsize> {
        let available = NonZeroUsize::new(cluster_info.parallelism(&resource_group));
        DdlController::resolve_stream_parallelism_inner(
            specified,
            max,
            available,
            &self.env.opts.default_parallelism,
            &resource_group,
        )
    }

    fn resolve_stream_parallelism_inner(
        specified: Option<NonZeroUsize>,
        max: NonZeroUsize,
        available: Option<NonZeroUsize>,
        default_parallelism: &DefaultParallelism,
        resource_group: &str,
    ) -> MetaResult<NonZeroUsize> {
        let Some(available) = available else {
            bail_unavailable!(
                "no available slots to schedule in resource group \"{}\", \
                 have you allocated any compute nodes within this resource group?",
                resource_group
            );
        };

        if let Some(specified) = specified {
            if specified > max {
                bail_invalid_parameter!(
                    "specified parallelism {} should not exceed max parallelism {}",
                    specified,
                    max,
                );
            }
            if specified > available {
                tracing::warn!(
                    resource_group,
                    specified_parallelism = specified.get(),
                    available_parallelism = available.get(),
                    "specified parallelism exceeds available slots, scheduling with specified value",
                );
            }
            return Ok(specified);
        }

        let default_parallelism = match default_parallelism {
            DefaultParallelism::Full => available,
            DefaultParallelism::Default(num) => {
                if *num > available {
                    tracing::warn!(
                        resource_group,
                        configured_parallelism = num.get(),
                        available_parallelism = available.get(),
                        "default parallelism exceeds available slots, scheduling with configured value",
                    );
                }
                *num
            }
        };

        if default_parallelism > max {
            tracing::warn!(
                max_parallelism = max.get(),
                resource_group,
                "default parallelism exceeds max parallelism, capping to max",
            );
        }
        Ok(default_parallelism.min(max))
    }

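    /// Builds the complete fragment and actor graph for a new streaming job, returning the
    /// creation context together with the fragments to create.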
    #[await_tree::instrument]
    pub(crate) async fn build_stream_job(
        &self,
        stream_ctx: StreamContext,
        mut stream_job: StreamingJob,
        fragment_graph: StreamFragmentGraph,
        resource_type: streaming_job_resource_type::ResourceType,
    ) -> MetaResult<(CreateStreamingJobContext, StreamJobFragmentsToCreate)> {
        let id = stream_job.id();
        let specified_parallelism = fragment_graph.specified_parallelism();
        let specified_backfill_parallelism = fragment_graph.specified_backfill_parallelism();
        let max_parallelism = NonZeroUsize::new(fragment_graph.max_parallelism()).unwrap();

        let fragment_backfill_ordering = fragment_graph.create_fragment_backfill_ordering();

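        // Collect snapshot-backfill info, keeping cross-database upstreams separate; backfill
        // epochs must not be set yet at build time.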
1703 let (snapshot_backfill_info, cross_db_snapshot_backfill_info) =
1707 fragment_graph.collect_snapshot_backfill_info()?;
1708 assert!(
1709 snapshot_backfill_info
1710 .iter()
1711 .chain([&cross_db_snapshot_backfill_info])
1712 .flat_map(|info| info.upstream_mv_table_id_to_backfill_epoch.values())
1713 .all(|backfill_epoch| backfill_epoch.is_none()),
1714 "should not set backfill epoch when initially build the job: {:?} {:?}",
1715 snapshot_backfill_info,
1716 cross_db_snapshot_backfill_info
1717 );
1718
1719 let locality_fragment_state_table_mapping =
1720 fragment_graph.find_locality_provider_fragment_state_table_mapping();
1721
1722 self.metadata_manager
1724 .catalog_controller
1725 .validate_cross_db_snapshot_backfill(&cross_db_snapshot_backfill_info)
1726 .await?;
1727
1728 let upstream_table_ids = fragment_graph
1729 .dependent_table_ids()
1730 .iter()
1731 .filter(|id| {
1732 !cross_db_snapshot_backfill_info
1733 .upstream_mv_table_id_to_backfill_epoch
1734 .contains_key(id)
1735 })
1736 .cloned()
1737 .collect();
1738
1739 let (upstream_root_fragments, existing_actor_location) = self
1740 .metadata_manager
1741 .get_upstream_root_fragments(&upstream_table_ids)
1742 .await?;
1743
1744 if snapshot_backfill_info.is_some() {
1745 match stream_job {
1746 StreamingJob::MaterializedView(_)
1747 | StreamingJob::Sink(_)
1748 | StreamingJob::Index(_, _) => {}
1749 StreamingJob::Table(_, _, _) | StreamingJob::Source(_) => {
1750 return Err(
1751 anyhow!("snapshot_backfill not enabled for table and source").into(),
1752 );
1753 }
1754 }
1755 }
1756
1757 let upstream_actors = upstream_root_fragments
1758 .values()
1759 .map(|(fragment, _)| {
1760 (
1761 fragment.fragment_id,
1762 fragment.actors.keys().copied().collect(),
1763 )
1764 })
1765 .collect();
1766
1767 let complete_graph = CompleteStreamFragmentGraph::with_upstreams(
1768 fragment_graph,
1769 FragmentGraphUpstreamContext {
1770 upstream_root_fragments,
1771 upstream_actor_location: existing_actor_location,
1772 },
1773 (&stream_job).into(),
1774 )?;
1775 let resource_group = if let Some(group) = resource_type.resource_group() {
1776 group
1777 } else {
1778 self.metadata_manager
1779 .get_database_resource_group(stream_job.database_id())
1780 .await?
1781 };
1782 let is_serverless_backfill = matches!(
1783 &resource_type,
1784 streaming_job_resource_type::ResourceType::ServerlessBackfillResourceGroup(_)
1785 );
1786
1787 let cluster_info = self.metadata_manager.get_streaming_cluster_info().await?;
1789
1790 let initial_parallelism = specified_backfill_parallelism.or(specified_parallelism);
1791 let parallelism = self.resolve_stream_parallelism(
1792 initial_parallelism,
1793 max_parallelism,
1794 &cluster_info,
1795 resource_group.clone(),
1796 )?;
1797
1798 let parallelism = self
1799 .env
1800 .system_params_reader()
1801 .await
1802 .adaptive_parallelism_strategy()
1803 .compute_target_parallelism(parallelism.get());
1804
1805 let parallelism = NonZeroUsize::new(parallelism).expect("parallelism must be positive");
1806 let actor_graph_builder = ActorGraphBuilder::new(
1807 id,
1808 resource_group,
1809 complete_graph,
1810 cluster_info,
1811 parallelism,
1812 )?;
1813
1814 let ActorGraphBuildResult {
1815 graph,
1816 downstream_fragment_relations,
1817 building_locations,
1818 upstream_fragment_downstreams,
1819 new_no_shuffle,
1820 replace_upstream,
1821 ..
1822 } = actor_graph_builder.generate_graph(&self.env, &stream_job, stream_ctx.clone())?;
1823 assert!(replace_upstream.is_empty());
1824
1825 let table_parallelism = match (specified_parallelism, &self.env.opts.default_parallelism) {
1832 (None, DefaultParallelism::Full) => TableParallelism::Adaptive,
1833 _ => TableParallelism::Fixed(parallelism.get()),
1834 };
1835
1836 let stream_job_fragments = StreamJobFragments::new(
1837 id,
1838 graph,
1839 &building_locations.actor_locations,
1840 stream_ctx.clone(),
1841 table_parallelism,
1842 max_parallelism.get(),
1843 );
1844
1845 if let Some(mview_fragment) = stream_job_fragments.mview_fragment() {
1846 stream_job.set_table_vnode_count(mview_fragment.vnode_count());
1847 }
1848
        let new_upstream_sink = if let StreamingJob::Sink(sink) = &stream_job
            && let Ok(table_id) = sink.get_target_table()
        {
            let tables = self
                .metadata_manager
                .get_table_catalog_by_ids(&[*table_id])
                .await?;
            let target_table = tables
                .first()
                .ok_or_else(|| MetaError::catalog_id_not_found("table", *table_id))?;
            let sink_fragment = stream_job_fragments
                .sink_fragment()
                .ok_or_else(|| anyhow::anyhow!("sink fragment not found for sink {}", sink.id))?;
            let mview_fragment_id = self
                .metadata_manager
                .catalog_controller
                .get_mview_fragment_by_id(table_id.as_job_id())
                .await?;
            let upstream_sink_info = build_upstream_sink_info(
                sink,
                sink_fragment.fragment_id as _,
                target_table,
                mview_fragment_id,
            )?;
            Some(upstream_sink_info)
        } else {
            None
        };

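        // For a CDC table backfilled in parallel, initialize the snapshot splits up front
        // and carry them in the create context.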
        let mut cdc_table_snapshot_splits = None;
        if let StreamingJob::Table(None, table, TableJobType::SharedCdcSource) = &stream_job
            && let Some((_, stream_cdc_scan)) =
                parallel_cdc_table_backfill_fragment(stream_job_fragments.fragments.values())
        {
            let splits = try_init_parallel_cdc_table_snapshot_splits(
                table.id,
                stream_cdc_scan.cdc_table_desc.as_ref().unwrap(),
                self.env.meta_store_ref(),
                stream_cdc_scan.options.as_ref().unwrap(),
                self.env.opts.cdc_table_split_init_insert_batch_size,
                self.env.opts.cdc_table_split_init_sleep_interval_splits,
                self.env.opts.cdc_table_split_init_sleep_duration_millis,
            )
            .await?;
            cdc_table_snapshot_splits = Some(splits);
        }

        let ctx = CreateStreamingJobContext {
            upstream_fragment_downstreams,
            new_no_shuffle,
            upstream_actors,
            building_locations,
            definition: stream_job.definition(),
            create_type: stream_job.create_type(),
            job_type: (&stream_job).into(),
            streaming_job: stream_job,
            new_upstream_sink,
            option: CreateStreamingJobOption {},
            snapshot_backfill_info,
            cross_db_snapshot_backfill_info,
            fragment_backfill_ordering,
            locality_fragment_state_table_mapping,
            cdc_table_snapshot_splits,
            is_serverless_backfill,
        };

        Ok((
            ctx,
            StreamJobFragmentsToCreate {
                inner: stream_job_fragments,
                downstreams: downstream_fragment_relations,
            },
        ))
    }

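    /// Builds the fragments and [`ReplaceStreamJobContext`] for replacing an existing
    /// streaming job in place, e.g. for a schema change. The new fragments are created
    /// under `tmp_job_id` and take over the old job's internal tables, parallelism, and
    /// downstream connections.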
    pub(crate) async fn build_replace_job(
        &self,
        stream_ctx: StreamContext,
        stream_job: &StreamingJob,
        mut fragment_graph: StreamFragmentGraph,
        tmp_job_id: JobId,
        auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,
    ) -> MetaResult<(ReplaceStreamJobContext, StreamJobFragmentsToCreate)> {
        match &stream_job {
            StreamingJob::Table(..)
            | StreamingJob::Source(..)
            | StreamingJob::MaterializedView(..) => {}
            StreamingJob::Sink(..) | StreamingJob::Index(..) => {
                bail_not_implemented!("schema change for {}", stream_job.job_type_str())
            }
        }

        let id = stream_job.id();

        let mut drop_table_associated_source_id = None;
        if let StreamingJob::Table(None, _, _) = &stream_job {
            drop_table_associated_source_id = self
                .metadata_manager
                .get_table_associated_source_id(id.as_mv_table_id())
                .await?;
        }

        let old_fragments = self.metadata_manager.get_job_fragments_by_id(id).await?;
        let old_internal_table_ids = old_fragments.internal_table_ids();

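        // Fit the new fragment graph onto the old job's state: when the table's associated
        // source connector is being dropped, record it for cleanup; for materialized views,
        // structurally match old and new state tables; otherwise carry the internal tables
        // over one-to-one.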
        let mut drop_table_connector_ctx = None;
        if let Some(to_remove_source_id) = drop_table_associated_source_id {
            debug_assert!(old_internal_table_ids.len() == 1);

            drop_table_connector_ctx = Some(DropTableConnectorContext {
                to_change_streaming_job_id: id,
                to_remove_state_table_id: old_internal_table_ids[0],
                to_remove_source_id,
            });
        } else if stream_job.is_materialized_view() {
            let old_fragments_upstreams = self
                .metadata_manager
                .catalog_controller
                .upstream_fragments(old_fragments.fragment_ids())
                .await?;

            let old_state_graph =
                state_match::Graph::from_existing(&old_fragments, &old_fragments_upstreams);
            let new_state_graph = state_match::Graph::from_building(&fragment_graph);
            let result = state_match::match_graph(&new_state_graph, &old_state_graph)
                .context("incompatible altering on the streaming job states")?;

            fragment_graph.fit_internal_table_ids_with_mapping(result.table_matches);
            fragment_graph.fit_snapshot_backfill_epochs(result.snapshot_backfill_epochs);
        } else {
            let old_internal_tables = self
                .metadata_manager
                .get_table_catalog_by_ids(&old_internal_table_ids)
                .await?;
            fragment_graph.fit_internal_tables_trivial(old_internal_tables)?;
        }

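        // Locate the old job's root fragment and its downstream fragments; downstream jobs
        // must be re-pointed at the replacement.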
        let original_root_fragment = old_fragments
            .root_fragment()
            .expect("root fragment not found");

        let job_type = StreamingJobType::from(stream_job);

        let (mut downstream_fragments, mut downstream_actor_location) =
            self.metadata_manager.get_downstream_fragments(id).await?;

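        // For sinks whose schema is auto-refreshed together with this job, swap their old
        // fragments (and actor locations) for the newly built ones in the downstream view.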
        if let Some(auto_refresh_schema_sinks) = &auto_refresh_schema_sinks {
            let mut remaining_fragment: HashSet<_> = auto_refresh_schema_sinks
                .iter()
                .map(|sink| sink.original_fragment.fragment_id)
                .collect();
            for (_, downstream_fragment, nodes) in &mut downstream_fragments {
                if let Some(sink) = auto_refresh_schema_sinks.iter().find(|sink| {
                    sink.original_fragment.fragment_id == downstream_fragment.fragment_id
                }) {
                    assert!(remaining_fragment.remove(&downstream_fragment.fragment_id));
                    for actor_id in downstream_fragment.actors.keys() {
                        downstream_actor_location.remove(actor_id);
                    }
                    for (actor_id, status) in &sink.actor_status {
                        downstream_actor_location
                            .insert(*actor_id, status.location.as_ref().unwrap().worker_node_id);
                    }

                    *downstream_fragment = (&sink.new_fragment_info(), stream_job.id()).into();
                    *nodes = sink.new_fragment.nodes.clone();
                }
            }
            assert!(remaining_fragment.is_empty());
        }

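        // Tables and sources only have downstreams to reconnect; shared-CDC tables and
        // materialized views additionally depend on upstream root fragments.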
        let complete_graph = match &job_type {
            StreamingJobType::Table(TableJobType::General) | StreamingJobType::Source => {
                CompleteStreamFragmentGraph::with_downstreams(
                    fragment_graph,
                    FragmentGraphDownstreamContext {
                        original_root_fragment_id: original_root_fragment.fragment_id,
                        downstream_fragments,
                        downstream_actor_location,
                    },
                    job_type,
                )?
            }
            StreamingJobType::Table(TableJobType::SharedCdcSource)
            | StreamingJobType::MaterializedView => {
                let (upstream_root_fragments, upstream_actor_location) = self
                    .metadata_manager
                    .get_upstream_root_fragments(fragment_graph.dependent_table_ids())
                    .await?;

                CompleteStreamFragmentGraph::with_upstreams_and_downstreams(
                    fragment_graph,
                    FragmentGraphUpstreamContext {
                        upstream_root_fragments,
                        upstream_actor_location,
                    },
                    FragmentGraphDownstreamContext {
                        original_root_fragment_id: original_root_fragment.fragment_id,
                        downstream_fragments,
                        downstream_actor_location,
                    },
                    job_type,
                )?
            }
            _ => unreachable!(),
        };

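        // A replacement inherits the original job's resource group and runs at the same
        // parallelism as the old root fragment.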
        let resource_group = self
            .metadata_manager
            .get_existing_job_resource_group(id)
            .await?;

        let cluster_info = self.metadata_manager.get_streaming_cluster_info().await?;

        let parallelism = NonZeroUsize::new(original_root_fragment.actors.len())
            .expect("The number of actors in the original table fragment should be greater than 0");

        let actor_graph_builder = ActorGraphBuilder::new(
            id,
            resource_group,
            complete_graph,
            cluster_info,
            parallelism,
        )?;

        let ActorGraphBuildResult {
            graph,
            downstream_fragment_relations,
            building_locations,
            upstream_fragment_downstreams,
            mut replace_upstream,
            new_no_shuffle,
            ..
        } = actor_graph_builder.generate_graph(&self.env, stream_job, stream_ctx.clone())?;

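        // Plain tables and sources have no upstream jobs, so nothing should have been
        // registered as upstream-fragment downstreams.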
        if matches!(
            job_type,
            StreamingJobType::Source | StreamingJobType::Table(TableJobType::General)
        ) {
            assert!(upstream_fragment_downstreams.is_empty());
        }

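        // Build the replacement fragments under the temporary job id, preserving the old
        // job's assigned and max parallelism.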
        let stream_job_fragments = StreamJobFragments::new(
            tmp_job_id,
            graph,
            &building_locations.actor_locations,
            stream_ctx,
            old_fragments.assigned_parallelism,
            old_fragments.max_parallelism,
        );

        if let Some(sinks) = &auto_refresh_schema_sinks {
            for sink in sinks {
                replace_upstream
                    .remove(&sink.new_fragment.fragment_id)
                    .expect("should exist");
            }
        }

        let ctx = ReplaceStreamJobContext {
            old_fragments,
            replace_upstream,
            new_no_shuffle,
            upstream_fragment_downstreams,
            building_locations,
            streaming_job: stream_job.clone(),
            tmp_id: tmp_job_id,
            drop_table_connector_ctx,
            auto_refresh_schema_sinks,
        };

        Ok((
            ctx,
            StreamJobFragmentsToCreate {
                inner: stream_job_fragments,
                downstreams: downstream_fragment_relations,
            },
        ))
    }

    async fn alter_name(
        &self,
        relation: alter_name_request::Object,
        new_name: &str,
    ) -> MetaResult<NotificationVersion> {
        let (obj_type, id) = match relation {
            alter_name_request::Object::TableId(id) => (ObjectType::Table, id),
            alter_name_request::Object::ViewId(id) => (ObjectType::View, id),
            alter_name_request::Object::IndexId(id) => (ObjectType::Index, id),
            alter_name_request::Object::SinkId(id) => (ObjectType::Sink, id),
            alter_name_request::Object::SourceId(id) => (ObjectType::Source, id),
            alter_name_request::Object::SchemaId(id) => (ObjectType::Schema, id),
            alter_name_request::Object::DatabaseId(id) => (ObjectType::Database, id),
            alter_name_request::Object::SubscriptionId(id) => (ObjectType::Subscription, id),
        };
        self.metadata_manager
            .catalog_controller
            .alter_name(obj_type, id, new_name)
            .await
    }

    async fn alter_swap_rename(
        &self,
        object: alter_swap_rename_request::Object,
    ) -> MetaResult<NotificationVersion> {
        let (obj_type, src_id, dst_id) = match object {
            alter_swap_rename_request::Object::Schema(_) => unimplemented!("schema swap"),
            alter_swap_rename_request::Object::Table(objs) => {
                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
                (ObjectType::Table, src_id, dst_id)
            }
            alter_swap_rename_request::Object::View(objs) => {
                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
                (ObjectType::View, src_id, dst_id)
            }
            alter_swap_rename_request::Object::Source(objs) => {
                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
                (ObjectType::Source, src_id, dst_id)
            }
            alter_swap_rename_request::Object::Sink(objs) => {
                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
                (ObjectType::Sink, src_id, dst_id)
            }
            alter_swap_rename_request::Object::Subscription(objs) => {
                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
                (ObjectType::Subscription, src_id, dst_id)
            }
        };

        self.metadata_manager
            .catalog_controller
            .alter_swap_rename(obj_type, src_id, dst_id)
            .await
    }

    async fn alter_owner(
        &self,
        object: Object,
        owner_id: UserId,
    ) -> MetaResult<NotificationVersion> {
        let (obj_type, id) = match object {
            Object::TableId(id) => (ObjectType::Table, id),
            Object::ViewId(id) => (ObjectType::View, id),
            Object::SourceId(id) => (ObjectType::Source, id),
            Object::SinkId(id) => (ObjectType::Sink, id),
            Object::SchemaId(id) => (ObjectType::Schema, id),
            Object::DatabaseId(id) => (ObjectType::Database, id),
            Object::SubscriptionId(id) => (ObjectType::Subscription, id),
            Object::ConnectionId(id) => (ObjectType::Connection, id),
        };
        self.metadata_manager
            .catalog_controller
            .alter_owner(obj_type, id.into(), owner_id as _)
            .await
    }

    async fn alter_set_schema(
        &self,
        object: alter_set_schema_request::Object,
        new_schema_id: SchemaId,
    ) -> MetaResult<NotificationVersion> {
        let (obj_type, id) = match object {
            alter_set_schema_request::Object::TableId(id) => (ObjectType::Table, id),
            alter_set_schema_request::Object::ViewId(id) => (ObjectType::View, id),
            alter_set_schema_request::Object::SourceId(id) => (ObjectType::Source, id),
            alter_set_schema_request::Object::SinkId(id) => (ObjectType::Sink, id),
            alter_set_schema_request::Object::FunctionId(id) => (ObjectType::Function, id),
            alter_set_schema_request::Object::ConnectionId(id) => (ObjectType::Connection, id),
            alter_set_schema_request::Object::SubscriptionId(id) => (ObjectType::Subscription, id),
        };
        self.metadata_manager
            .catalog_controller
            .alter_schema(obj_type, id.into(), new_schema_id as _)
            .await
    }

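    /// Waits until there is no background creating streaming job left, sleeping 1ms between
    /// polls of the catalog, and fails with a cancellation error after `timeout_ms` iterations
    /// (a 30-minute budget, not counting query time).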
    pub async fn wait(&self) -> MetaResult<()> {
        let timeout_ms = 30 * 60 * 1000;
        for _ in 0..timeout_ms {
            if self
                .metadata_manager
                .catalog_controller
                .list_background_creating_jobs(true, None)
                .await?
                .is_empty()
            {
                return Ok(());
            }

            sleep(Duration::from_millis(1)).await;
        }
        Err(MetaError::cancelled(format!(
            "timeout after {timeout_ms}ms"
        )))
    }

    async fn comment_on(&self, comment: Comment) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .comment_on(comment)
            .await
    }

    async fn alter_streaming_job_config(
        &self,
        job_id: JobId,
        entries_to_add: HashMap<String, String>,
        keys_to_remove: Vec<String>,
    ) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .alter_streaming_job_config(job_id, entries_to_add, keys_to_remove)
            .await
    }
}

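/// Reports a telemetry event for the creation of a streaming object.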
fn report_create_object(
    job_id: JobId,
    event_name: &str,
    obj_type: PbTelemetryDatabaseObject,
    connector_name: Option<String>,
    attr_info: Option<jsonbb::Value>,
) {
    report_event(
        PbTelemetryEventStage::CreateStreamJob,
        event_name,
        job_id.as_raw_id() as _,
        connector_name,
        Some(obj_type),
        attr_info,
    );
}

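/// Builds the [`UpstreamSinkInfo`] that wires a sink into its target table: the sink's
/// output schema, a hash dispatcher keyed on the table's distribution key (resolved by
/// column id against the sink's columns), and project expressions that adapt the sink's
/// output to the table's current columns.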
pub fn build_upstream_sink_info(
    sink: &PbSink,
    sink_fragment_id: FragmentId,
    target_table: &PbTable,
    target_fragment_id: FragmentId,
) -> MetaResult<UpstreamSinkInfo> {
    let sink_columns = if !sink.original_target_columns.is_empty() {
        sink.original_target_columns.clone()
    } else {
        target_table.columns.clone()
    };

    let sink_output_fields = sink_columns
        .iter()
        .map(|col| Field::from(col.column_desc.as_ref().unwrap()).to_prost())
        .collect_vec();
    let output_indices = (0..sink_output_fields.len())
        .map(|i| i as u32)
        .collect_vec();

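    // Resolve each distribution-key column of the target table to its position in the
    // sink's output, matching by column id rather than by index, since the sink may have
    // been created against an older set of target columns.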
    let dist_key_indices: anyhow::Result<Vec<u32>> = try {
        let sink_idx_by_col_id = sink_columns
            .iter()
            .enumerate()
            .map(|(idx, col)| {
                let column_id = col.column_desc.as_ref().unwrap().column_id;
                (column_id, idx as u32)
            })
            .collect::<HashMap<_, _>>();
        target_table
            .distribution_key
            .iter()
            .map(|dist_idx| {
                let column_id = target_table.columns[*dist_idx as usize]
                    .column_desc
                    .as_ref()
                    .unwrap()
                    .column_id;
                let sink_idx = sink_idx_by_col_id
                    .get(&column_id)
                    .ok_or_else(|| anyhow::anyhow!("column id {} not found in sink", column_id))?;
                Ok(*sink_idx)
            })
            .collect::<anyhow::Result<Vec<_>>>()?
    };
    let dist_key_indices =
        dist_key_indices.map_err(|e| e.context("failed to get distribution key indices"))?;
    let downstream_fragment_id = target_fragment_id as _;
    let new_downstream_relation = DownstreamFragmentRelation {
        downstream_fragment_id,
        dispatcher_type: DispatcherType::Hash,
        dist_key_indices,
        output_mapping: PbDispatchOutputMapping::simple(output_indices),
    };
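    // Project expressions adapt rows in the sink's (possibly stale) output schema to the
    // table's current column layout.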
    let current_target_columns = target_table.get_columns();
    let project_exprs = build_select_node_list(&sink_columns, current_target_columns)?;
    Ok(UpstreamSinkInfo {
        sink_id: sink.id,
        sink_fragment_id: sink_fragment_id as _,
        sink_output_fields,
        sink_original_target_columns: sink.get_original_target_columns().clone(),
        project_exprs,
        new_sink_downstream: new_downstream_relation,
    })
}

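/// Rewrites the `UpstreamSinkUnion` node inside a table's union fragment so that it lists
/// the given upstream sinks, stopping the traversal once the node has been updated.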
pub fn refill_upstream_sink_union_in_table(
    union_fragment_root: &mut PbStreamNode,
    upstream_sink_infos: &[UpstreamSinkInfo],
) {
    visit_stream_node_cont_mut(union_fragment_root, |node| {
        if let Some(NodeBody::UpstreamSinkUnion(upstream_sink_union)) = &mut node.node_body {
            let init_upstreams = upstream_sink_infos
                .iter()
                .map(|info| PbUpstreamSinkInfo {
                    upstream_fragment_id: info.sink_fragment_id,
                    sink_output_schema: info.sink_output_fields.clone(),
                    project_exprs: info.project_exprs.clone(),
                })
                .collect();
            upstream_sink_union.init_upstreams = init_upstreams;
            false
        } else {
            true
        }
    });
}

#[cfg(test)]
mod tests {
    use std::num::NonZeroUsize;

    use super::*;

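    // Argument order below: specified parallelism, max parallelism, available parallelism,
    // default-parallelism strategy, resource group (parameter roles inferred from the cases).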
    #[test]
    fn test_specified_parallelism_exceeds_available() {
        let result = DdlController::resolve_stream_parallelism_inner(
            Some(NonZeroUsize::new(100).unwrap()),
            NonZeroUsize::new(256).unwrap(),
            Some(NonZeroUsize::new(4).unwrap()),
            &DefaultParallelism::Full,
            "default",
        )
        .unwrap();
        assert_eq!(result.get(), 100);
    }

    #[test]
    fn test_allows_default_parallelism_over_available() {
        let result = DdlController::resolve_stream_parallelism_inner(
            None,
            NonZeroUsize::new(256).unwrap(),
            Some(NonZeroUsize::new(4).unwrap()),
            &DefaultParallelism::Default(NonZeroUsize::new(50).unwrap()),
            "default",
        )
        .unwrap();
        assert_eq!(result.get(), 50);
    }

    #[test]
    fn test_full_parallelism_capped_by_max() {
        let result = DdlController::resolve_stream_parallelism_inner(
            None,
            NonZeroUsize::new(6).unwrap(),
            Some(NonZeroUsize::new(10).unwrap()),
            &DefaultParallelism::Full,
            "default",
        )
        .unwrap();
        assert_eq!(result.get(), 6);
    }

    #[test]
    fn test_no_available_slots_returns_error() {
        let result = DdlController::resolve_stream_parallelism_inner(
            None,
            NonZeroUsize::new(4).unwrap(),
            None,
            &DefaultParallelism::Full,
            "default",
        );
        assert!(matches!(
            result,
            Err(ref e) if matches!(e.inner(), MetaErrorInner::Unavailable(_))
        ));
    }

    #[test]
    fn test_specified_over_max_returns_error() {
        let result = DdlController::resolve_stream_parallelism_inner(
            Some(NonZeroUsize::new(8).unwrap()),
            NonZeroUsize::new(4).unwrap(),
            Some(NonZeroUsize::new(10).unwrap()),
            &DefaultParallelism::Full,
            "default",
        );
        assert!(matches!(
            result,
            Err(ref e) if matches!(e.inner(), MetaErrorInner::InvalidParameter(_))
        ));
    }
}