1use std::cmp::Ordering;
16use std::collections::{HashMap, HashSet};
17use std::num::NonZeroUsize;
18use std::sync::Arc;
19use std::sync::atomic::AtomicU64;
20use std::time::Duration;
21
22use anyhow::{Context, anyhow};
23use await_tree::{InstrumentAwait, span};
24use either::Either;
25use itertools::Itertools;
26use risingwave_common::catalog::{AlterDatabaseParam, ColumnCatalog, ColumnId, FragmentTypeFlag};
27use risingwave_common::config::DefaultParallelism;
28use risingwave_common::hash::VnodeCountCompat;
29use risingwave_common::secret::{LocalSecretManager, SecretEncryption};
30use risingwave_common::system_param::reader::SystemParamsRead;
31use risingwave_common::util::stream_graph_visitor::{
32 visit_stream_node, visit_stream_node_body, visit_stream_node_cont_mut,
33};
34use risingwave_common::{bail, bail_not_implemented, must_match};
35use risingwave_connector::WithOptionsSecResolved;
36use risingwave_connector::connector_common::validate_connection;
37use risingwave_connector::source::cdc::CdcScanOptions;
38use risingwave_connector::source::{
39 ConnectorProperties, SourceEnumeratorContext, UPSTREAM_SOURCE_KEY,
40};
41use risingwave_meta_model::exactly_once_iceberg_sink::{Column, Entity};
42use risingwave_meta_model::object::ObjectType;
43use risingwave_meta_model::{
44 ConnectionId, DatabaseId, DispatcherType, FunctionId, IndexId, ObjectId, SchemaId, SecretId,
45 SinkId, SourceId, SubscriptionId, TableId, UserId, ViewId, WorkerId,
46};
47use risingwave_pb::catalog::{
48 Comment, Connection, CreateType, Database, Function, PbSink, Schema, Secret, Sink, Source,
49 Subscription, Table, View,
50};
51use risingwave_pb::common::PbActorLocation;
52use risingwave_pb::ddl_service::alter_owner_request::Object;
53use risingwave_pb::ddl_service::{
54 DdlProgress, TableJobType, WaitVersion, alter_name_request, alter_set_schema_request,
55 alter_swap_rename_request,
56};
57use risingwave_pb::meta::table_fragments::PbActorStatus;
58use risingwave_pb::meta::table_fragments::actor_status::PbActorState;
59use risingwave_pb::stream_plan::stream_node::NodeBody;
60use risingwave_pb::stream_plan::{
61 MergeNode, PbDispatchOutputMapping, PbDispatcherType, PbStreamFragmentGraph,
62 StreamFragmentGraph as StreamFragmentGraphProto,
63};
64use risingwave_pb::telemetry::{PbTelemetryDatabaseObject, PbTelemetryEventStage};
65use sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter};
66use strum::Display;
67use thiserror_ext::AsReport;
68use tokio::sync::{OwnedSemaphorePermit, Semaphore, oneshot};
69use tokio::time::sleep;
70use tracing::Instrument;
71
72use crate::barrier::BarrierManagerRef;
73use crate::controller::catalog::{DropTableConnectorContext, ReleaseContext};
74use crate::controller::cluster::StreamingClusterInfo;
75use crate::controller::streaming_job::{FinishAutoRefreshSchemaSinkContext, SinkIntoTableContext};
76use crate::error::{MetaErrorInner, bail_invalid_parameter, bail_unavailable};
77use crate::manager::{
78 IGNORED_NOTIFICATION_VERSION, LocalNotification, MetaSrvEnv, MetadataManager,
79 NotificationVersion, StreamingJob, StreamingJobType,
80};
81use crate::model::{
82 DownstreamFragmentRelation, Fragment, FragmentDownstreamRelation, StreamContext,
83 StreamJobFragments, StreamJobFragmentsToCreate, TableParallelism,
84};
85use crate::stream::cdc::{
86 is_parallelized_backfill_enabled, try_init_parallel_cdc_table_snapshot_splits,
87};
88use crate::stream::{
89 ActorGraphBuildResult, ActorGraphBuilder, AutoRefreshSchemaSinkContext,
90 CompleteStreamFragmentGraph, CreateStreamingJobContext, CreateStreamingJobOption,
91 GlobalStreamManagerRef, JobRescheduleTarget, ReplaceStreamJobContext, SourceChange,
92 SourceManagerRef, StreamFragmentGraph, check_sink_fragments_support_refresh_schema,
93 create_source_worker, rewrite_refresh_schema_sink_fragment, state_match, validate_sink,
94};
95use crate::telemetry::report_event;
96use crate::{MetaError, MetaResult};
97
98#[derive(PartialEq)]
99pub enum DropMode {
100 Restrict,
101 Cascade,
102}
103
104impl DropMode {
105 pub fn from_request_setting(cascade: bool) -> DropMode {
106 if cascade {
107 DropMode::Cascade
108 } else {
109 DropMode::Restrict
110 }
111 }
112}
113
114#[derive(strum::AsRefStr)]
115pub enum StreamingJobId {
116 MaterializedView(TableId),
117 Sink(SinkId),
118 Table(Option<SourceId>, TableId),
119 Index(IndexId),
120}
121
122impl std::fmt::Display for StreamingJobId {
123 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
124 write!(f, "{}", self.as_ref())?;
125 write!(f, "({})", self.id())
126 }
127}
128
129impl StreamingJobId {
130 #[allow(dead_code)]
131 fn id(&self) -> TableId {
132 match self {
133 StreamingJobId::MaterializedView(id)
134 | StreamingJobId::Sink(id)
135 | StreamingJobId::Table(_, id)
136 | StreamingJobId::Index(id) => *id,
137 }
138 }
139}
140
141pub struct ReplaceStreamJobInfo {
144 pub streaming_job: StreamingJob,
145 pub fragment_graph: StreamFragmentGraphProto,
146}
147
148#[derive(Display)]
149pub enum DdlCommand {
150 CreateDatabase(Database),
151 DropDatabase(DatabaseId),
152 CreateSchema(Schema),
153 DropSchema(SchemaId, DropMode),
154 CreateNonSharedSource(Source),
155 DropSource(SourceId, DropMode),
156 CreateFunction(Function),
157 DropFunction(FunctionId),
158 CreateView(View, HashSet<ObjectId>),
159 DropView(ViewId, DropMode),
160 CreateStreamingJob {
161 stream_job: StreamingJob,
162 fragment_graph: StreamFragmentGraphProto,
163 affected_table_replace_info: Option<ReplaceStreamJobInfo>,
164 dependencies: HashSet<ObjectId>,
165 specific_resource_group: Option<String>, if_not_exists: bool,
167 },
168 DropStreamingJob {
169 job_id: StreamingJobId,
170 drop_mode: DropMode,
171 target_replace_info: Option<ReplaceStreamJobInfo>,
172 },
173 AlterName(alter_name_request::Object, String),
174 AlterSwapRename(alter_swap_rename_request::Object),
175 ReplaceStreamJob(ReplaceStreamJobInfo),
176 AlterNonSharedSource(Source),
177 AlterObjectOwner(Object, UserId),
178 AlterSetSchema(alter_set_schema_request::Object, SchemaId),
179 CreateConnection(Connection),
180 DropConnection(ConnectionId, DropMode),
181 CreateSecret(Secret),
182 AlterSecret(Secret),
183 DropSecret(SecretId),
184 CommentOn(Comment),
185 CreateSubscription(Subscription),
186 DropSubscription(SubscriptionId, DropMode),
187 AlterDatabaseParam(DatabaseId, AlterDatabaseParam),
188}
189
190impl DdlCommand {
191 fn object(&self) -> Either<String, ObjectId> {
193 use Either::*;
194 match self {
195 DdlCommand::CreateDatabase(database) => Left(database.name.clone()),
196 DdlCommand::DropDatabase(id) => Right(*id),
197 DdlCommand::CreateSchema(schema) => Left(schema.name.clone()),
198 DdlCommand::DropSchema(id, _) => Right(*id),
199 DdlCommand::CreateNonSharedSource(source) => Left(source.name.clone()),
200 DdlCommand::DropSource(id, _) => Right(*id),
201 DdlCommand::CreateFunction(function) => Left(function.name.clone()),
202 DdlCommand::DropFunction(id) => Right(*id),
203 DdlCommand::CreateView(view, _) => Left(view.name.clone()),
204 DdlCommand::DropView(id, _) => Right(*id),
205 DdlCommand::CreateStreamingJob { stream_job, .. } => Left(stream_job.name()),
206 DdlCommand::DropStreamingJob { job_id, .. } => Right(job_id.id()),
207 DdlCommand::AlterName(object, _) => Left(format!("{object:?}")),
208 DdlCommand::AlterSwapRename(object) => Left(format!("{object:?}")),
209 DdlCommand::ReplaceStreamJob(info) => Left(info.streaming_job.name()),
210 DdlCommand::AlterNonSharedSource(source) => Left(source.name.clone()),
211 DdlCommand::AlterObjectOwner(object, _) => Left(format!("{object:?}")),
212 DdlCommand::AlterSetSchema(object, _) => Left(format!("{object:?}")),
213 DdlCommand::CreateConnection(connection) => Left(connection.name.clone()),
214 DdlCommand::DropConnection(id, _) => Right(*id),
215 DdlCommand::CreateSecret(secret) => Left(secret.name.clone()),
216 DdlCommand::AlterSecret(secret) => Left(secret.name.clone()),
217 DdlCommand::DropSecret(id) => Right(*id),
218 DdlCommand::CommentOn(comment) => Right(comment.table_id as _),
219 DdlCommand::CreateSubscription(subscription) => Left(subscription.name.clone()),
220 DdlCommand::DropSubscription(id, _) => Right(*id),
221 DdlCommand::AlterDatabaseParam(id, _) => Right(*id),
222 }
223 }
224
225 fn allow_in_recovery(&self) -> bool {
226 match self {
227 DdlCommand::DropDatabase(_)
228 | DdlCommand::DropSchema(_, _)
229 | DdlCommand::DropSource(_, _)
230 | DdlCommand::DropFunction(_)
231 | DdlCommand::DropView(_, _)
232 | DdlCommand::DropStreamingJob { .. }
233 | DdlCommand::DropConnection(_, _)
234 | DdlCommand::DropSecret(_)
235 | DdlCommand::DropSubscription(_, _)
236 | DdlCommand::AlterName(_, _)
237 | DdlCommand::AlterObjectOwner(_, _)
238 | DdlCommand::AlterSetSchema(_, _)
239 | DdlCommand::CreateDatabase(_)
240 | DdlCommand::CreateSchema(_)
241 | DdlCommand::CreateFunction(_)
242 | DdlCommand::CreateView(_, _)
243 | DdlCommand::CreateConnection(_)
244 | DdlCommand::CommentOn(_)
245 | DdlCommand::CreateSecret(_)
246 | DdlCommand::AlterSecret(_)
247 | DdlCommand::AlterSwapRename(_)
248 | DdlCommand::AlterDatabaseParam(_, _) => true,
249 DdlCommand::CreateStreamingJob { .. }
250 | DdlCommand::CreateNonSharedSource(_)
251 | DdlCommand::ReplaceStreamJob(_)
252 | DdlCommand::AlterNonSharedSource(_)
253 | DdlCommand::CreateSubscription(_) => false,
254 }
255 }
256}
257
258#[derive(Clone)]
259pub struct DdlController {
260 pub(crate) env: MetaSrvEnv,
261
262 pub(crate) metadata_manager: MetadataManager,
263 pub(crate) stream_manager: GlobalStreamManagerRef,
264 pub(crate) source_manager: SourceManagerRef,
265 barrier_manager: BarrierManagerRef,
266
267 pub(crate) creating_streaming_job_permits: Arc<CreatingStreamingJobPermit>,
269
270 seq: Arc<AtomicU64>,
272}
273
274#[derive(Clone)]
275pub struct CreatingStreamingJobPermit {
276 pub(crate) semaphore: Arc<Semaphore>,
277}
278
279impl CreatingStreamingJobPermit {
280 async fn new(env: &MetaSrvEnv) -> Self {
281 let mut permits = env
282 .system_params_reader()
283 .await
284 .max_concurrent_creating_streaming_jobs() as usize;
285 if permits == 0 {
286 permits = Semaphore::MAX_PERMITS;
288 }
289 let semaphore = Arc::new(Semaphore::new(permits));
290
291 let (local_notification_tx, mut local_notification_rx) =
292 tokio::sync::mpsc::unbounded_channel();
293 env.notification_manager()
294 .insert_local_sender(local_notification_tx);
295 let semaphore_clone = semaphore.clone();
296 tokio::spawn(async move {
297 while let Some(notification) = local_notification_rx.recv().await {
298 let LocalNotification::SystemParamsChange(p) = ¬ification else {
299 continue;
300 };
301 let mut new_permits = p.max_concurrent_creating_streaming_jobs() as usize;
302 if new_permits == 0 {
303 new_permits = Semaphore::MAX_PERMITS;
304 }
305 match permits.cmp(&new_permits) {
306 Ordering::Less => {
307 semaphore_clone.add_permits(new_permits - permits);
308 }
309 Ordering::Equal => continue,
310 Ordering::Greater => {
311 let to_release = permits - new_permits;
312 let reduced = semaphore_clone.forget_permits(to_release);
313 if reduced != to_release {
315 tracing::warn!(
316 "no enough permits to release, expected {}, but reduced {}",
317 to_release,
318 reduced
319 );
320 }
321 }
322 }
323 tracing::info!(
324 "max_concurrent_creating_streaming_jobs changed from {} to {}",
325 permits,
326 new_permits
327 );
328 permits = new_permits;
329 }
330 });
331
332 Self { semaphore }
333 }
334}
335
336impl DdlController {
337 pub async fn new(
338 env: MetaSrvEnv,
339 metadata_manager: MetadataManager,
340 stream_manager: GlobalStreamManagerRef,
341 source_manager: SourceManagerRef,
342 barrier_manager: BarrierManagerRef,
343 ) -> Self {
344 let creating_streaming_job_permits = Arc::new(CreatingStreamingJobPermit::new(&env).await);
345 Self {
346 env,
347 metadata_manager,
348 stream_manager,
349 source_manager,
350 barrier_manager,
351 creating_streaming_job_permits,
352 seq: Arc::new(AtomicU64::new(0)),
353 }
354 }
355
356 pub fn next_seq(&self) -> u64 {
358 self.seq.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
360 }
361
362 pub async fn run_command(&self, command: DdlCommand) -> MetaResult<Option<WaitVersion>> {
369 if !command.allow_in_recovery() {
370 self.barrier_manager.check_status_running()?;
371 }
372
373 let await_tree_key = format!("DDL Command {}", self.next_seq());
374 let await_tree_span = await_tree::span!("{command}({})", command.object());
375
376 let ctrl = self.clone();
377 let fut = async move {
378 match command {
379 DdlCommand::CreateDatabase(database) => ctrl.create_database(database).await,
380 DdlCommand::DropDatabase(database_id) => ctrl.drop_database(database_id).await,
381 DdlCommand::CreateSchema(schema) => ctrl.create_schema(schema).await,
382 DdlCommand::DropSchema(schema_id, drop_mode) => {
383 ctrl.drop_schema(schema_id, drop_mode).await
384 }
385 DdlCommand::CreateNonSharedSource(source) => {
386 ctrl.create_non_shared_source(source).await
387 }
388 DdlCommand::DropSource(source_id, drop_mode) => {
389 ctrl.drop_source(source_id, drop_mode).await
390 }
391 DdlCommand::CreateFunction(function) => ctrl.create_function(function).await,
392 DdlCommand::DropFunction(function_id) => ctrl.drop_function(function_id).await,
393 DdlCommand::CreateView(view, dependencies) => {
394 ctrl.create_view(view, dependencies).await
395 }
396 DdlCommand::DropView(view_id, drop_mode) => {
397 ctrl.drop_view(view_id, drop_mode).await
398 }
399 DdlCommand::CreateStreamingJob {
400 stream_job,
401 fragment_graph,
402 affected_table_replace_info,
403 dependencies,
404 specific_resource_group,
405 if_not_exists,
406 } => {
407 ctrl.create_streaming_job(
408 stream_job,
409 fragment_graph,
410 affected_table_replace_info,
411 dependencies,
412 specific_resource_group,
413 if_not_exists,
414 )
415 .await
416 }
417 DdlCommand::DropStreamingJob {
418 job_id,
419 drop_mode,
420 target_replace_info,
421 } => {
422 ctrl.drop_streaming_job(job_id, drop_mode, target_replace_info)
423 .await
424 }
425 DdlCommand::ReplaceStreamJob(ReplaceStreamJobInfo {
426 streaming_job,
427 fragment_graph,
428 }) => ctrl.replace_job(streaming_job, fragment_graph).await,
429 DdlCommand::AlterName(relation, name) => ctrl.alter_name(relation, &name).await,
430 DdlCommand::AlterObjectOwner(object, owner_id) => {
431 ctrl.alter_owner(object, owner_id).await
432 }
433 DdlCommand::AlterSetSchema(object, new_schema_id) => {
434 ctrl.alter_set_schema(object, new_schema_id).await
435 }
436 DdlCommand::CreateConnection(connection) => {
437 ctrl.create_connection(connection).await
438 }
439 DdlCommand::DropConnection(connection_id, drop_mode) => {
440 ctrl.drop_connection(connection_id, drop_mode).await
441 }
442 DdlCommand::CreateSecret(secret) => ctrl.create_secret(secret).await,
443 DdlCommand::DropSecret(secret_id) => ctrl.drop_secret(secret_id).await,
444 DdlCommand::AlterSecret(secret) => ctrl.alter_secret(secret).await,
445 DdlCommand::AlterNonSharedSource(source) => {
446 ctrl.alter_non_shared_source(source).await
447 }
448 DdlCommand::CommentOn(comment) => ctrl.comment_on(comment).await,
449 DdlCommand::CreateSubscription(subscription) => {
450 ctrl.create_subscription(subscription).await
451 }
452 DdlCommand::DropSubscription(subscription_id, drop_mode) => {
453 ctrl.drop_subscription(subscription_id, drop_mode).await
454 }
455 DdlCommand::AlterSwapRename(objects) => ctrl.alter_swap_rename(objects).await,
456 DdlCommand::AlterDatabaseParam(database_id, param) => {
457 ctrl.alter_database_param(database_id, param).await
458 }
459 }
460 }
461 .in_current_span();
462 let fut = (self.env.await_tree_reg())
463 .register(await_tree_key, await_tree_span)
464 .instrument(Box::pin(fut));
465 let notification_version = tokio::spawn(fut).await.map_err(|e| anyhow!(e))??;
466 Ok(Some(WaitVersion {
467 catalog_version: notification_version,
468 hummock_version_id: self.barrier_manager.get_hummock_version_id().await.to_u64(),
469 }))
470 }
471
472 pub async fn get_ddl_progress(&self) -> MetaResult<Vec<DdlProgress>> {
473 self.barrier_manager.get_ddl_progress().await
474 }
475
476 async fn create_database(&self, database: Database) -> MetaResult<NotificationVersion> {
477 let (version, updated_db) = self
478 .metadata_manager
479 .catalog_controller
480 .create_database(database)
481 .await?;
482 self.barrier_manager
484 .update_database_barrier(
485 updated_db.database_id,
486 updated_db.barrier_interval_ms.map(|v| v as u32),
487 updated_db.checkpoint_frequency.map(|v| v as u64),
488 )
489 .await?;
490 Ok(version)
491 }
492
493 #[tracing::instrument(skip(self), level = "debug")]
494 pub async fn reschedule_streaming_job(
495 &self,
496 job_id: u32,
497 target: JobRescheduleTarget,
498 mut deferred: bool,
499 ) -> MetaResult<()> {
500 tracing::info!("alter parallelism");
501 if self.barrier_manager.check_status_running().is_err() {
502 tracing::info!(
503 "alter parallelism is set to deferred mode because the system is in recovery state"
504 );
505 deferred = true;
506 }
507
508 self.stream_manager
509 .reschedule_streaming_job(job_id, target, deferred)
510 .await
511 }
512
513 pub async fn reschedule_cdc_table_backfill(
514 &self,
515 job_id: u32,
516 target: JobRescheduleTarget,
517 ) -> MetaResult<()> {
518 tracing::info!("alter CDC table backfill parallelism");
519 if self.barrier_manager.check_status_running().is_err() {
520 return Err(anyhow::anyhow!("CDC table backfill reschedule is unavailable because the system is in recovery state").into());
521 }
522 self.stream_manager
523 .reschedule_cdc_table_backfill(job_id, target)
524 .await
525 }
526
527 async fn drop_database(&self, database_id: DatabaseId) -> MetaResult<NotificationVersion> {
528 self.drop_object(
529 ObjectType::Database,
530 database_id as _,
531 DropMode::Cascade,
532 None,
533 )
534 .await
535 }
536
537 async fn create_schema(&self, schema: Schema) -> MetaResult<NotificationVersion> {
538 self.metadata_manager
539 .catalog_controller
540 .create_schema(schema)
541 .await
542 }
543
544 async fn drop_schema(
545 &self,
546 schema_id: SchemaId,
547 drop_mode: DropMode,
548 ) -> MetaResult<NotificationVersion> {
549 self.drop_object(ObjectType::Schema, schema_id as _, drop_mode, None)
550 .await
551 }
552
553 async fn create_non_shared_source(&self, source: Source) -> MetaResult<NotificationVersion> {
555 let handle = create_source_worker(&source, self.source_manager.metrics.clone())
556 .await
557 .context("failed to create source worker")?;
558
559 let (source_id, version) = self
560 .metadata_manager
561 .catalog_controller
562 .create_source(source)
563 .await?;
564 self.source_manager
565 .register_source_with_handle(source_id, handle)
566 .await;
567 Ok(version)
568 }
569
570 async fn drop_source(
571 &self,
572 source_id: SourceId,
573 drop_mode: DropMode,
574 ) -> MetaResult<NotificationVersion> {
575 self.drop_object(ObjectType::Source, source_id as _, drop_mode, None)
576 .await
577 }
578
579 async fn alter_non_shared_source(&self, source: Source) -> MetaResult<NotificationVersion> {
582 self.metadata_manager
583 .catalog_controller
584 .alter_non_shared_source(source)
585 .await
586 }
587
588 async fn create_function(&self, function: Function) -> MetaResult<NotificationVersion> {
589 self.metadata_manager
590 .catalog_controller
591 .create_function(function)
592 .await
593 }
594
595 async fn drop_function(&self, function_id: FunctionId) -> MetaResult<NotificationVersion> {
596 self.drop_object(
597 ObjectType::Function,
598 function_id as _,
599 DropMode::Restrict,
600 None,
601 )
602 .await
603 }
604
605 async fn create_view(
606 &self,
607 view: View,
608 dependencies: HashSet<ObjectId>,
609 ) -> MetaResult<NotificationVersion> {
610 self.metadata_manager
611 .catalog_controller
612 .create_view(view, dependencies)
613 .await
614 }
615
616 async fn drop_view(
617 &self,
618 view_id: ViewId,
619 drop_mode: DropMode,
620 ) -> MetaResult<NotificationVersion> {
621 self.drop_object(ObjectType::View, view_id as _, drop_mode, None)
622 .await
623 }
624
625 async fn create_connection(&self, connection: Connection) -> MetaResult<NotificationVersion> {
626 validate_connection(&connection).await?;
627 self.metadata_manager
628 .catalog_controller
629 .create_connection(connection)
630 .await
631 }
632
633 async fn drop_connection(
634 &self,
635 connection_id: ConnectionId,
636 drop_mode: DropMode,
637 ) -> MetaResult<NotificationVersion> {
638 self.drop_object(ObjectType::Connection, connection_id as _, drop_mode, None)
639 .await
640 }
641
642 async fn alter_database_param(
643 &self,
644 database_id: DatabaseId,
645 param: AlterDatabaseParam,
646 ) -> MetaResult<NotificationVersion> {
647 let (version, updated_db) = self
648 .metadata_manager
649 .catalog_controller
650 .alter_database_param(database_id, param)
651 .await?;
652 self.barrier_manager
654 .update_database_barrier(
655 database_id,
656 updated_db.barrier_interval_ms.map(|v| v as u32),
657 updated_db.checkpoint_frequency.map(|v| v as u64),
658 )
659 .await?;
660 Ok(version)
661 }
662
663 fn get_encrypted_payload(&self, secret: &Secret) -> MetaResult<Vec<u8>> {
666 let secret_store_private_key = self
667 .env
668 .opts
669 .secret_store_private_key
670 .clone()
671 .ok_or_else(|| anyhow!("secret_store_private_key is not configured"))?;
672
673 let encrypted_payload = SecretEncryption::encrypt(
674 secret_store_private_key.as_slice(),
675 secret.get_value().as_slice(),
676 )
677 .context(format!("failed to encrypt secret {}", secret.name))?;
678 Ok(encrypted_payload
679 .serialize()
680 .context(format!("failed to serialize secret {}", secret.name))?)
681 }
682
683 async fn create_secret(&self, mut secret: Secret) -> MetaResult<NotificationVersion> {
684 let secret_plain_payload = secret.value.clone();
687 let encrypted_payload = self.get_encrypted_payload(&secret)?;
688 secret.value = encrypted_payload;
689
690 self.metadata_manager
691 .catalog_controller
692 .create_secret(secret, secret_plain_payload)
693 .await
694 }
695
696 async fn drop_secret(&self, secret_id: SecretId) -> MetaResult<NotificationVersion> {
697 self.drop_object(ObjectType::Secret, secret_id as _, DropMode::Restrict, None)
698 .await
699 }
700
701 async fn alter_secret(&self, mut secret: Secret) -> MetaResult<NotificationVersion> {
702 let secret_plain_payload = secret.value.clone();
703 let encrypted_payload = self.get_encrypted_payload(&secret)?;
704 secret.value = encrypted_payload;
705 self.metadata_manager
706 .catalog_controller
707 .alter_secret(secret, secret_plain_payload)
708 .await
709 }
710
711 async fn create_subscription(
712 &self,
713 mut subscription: Subscription,
714 ) -> MetaResult<NotificationVersion> {
715 tracing::debug!("create subscription");
716 let _permit = self
717 .creating_streaming_job_permits
718 .semaphore
719 .acquire()
720 .await
721 .unwrap();
722 let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
723 self.metadata_manager
724 .catalog_controller
725 .create_subscription_catalog(&mut subscription)
726 .await?;
727 if let Err(err) = self.stream_manager.create_subscription(&subscription).await {
728 tracing::debug!(error = %err.as_report(), "failed to create subscription");
729 let _ = self
730 .metadata_manager
731 .catalog_controller
732 .try_abort_creating_subscription(subscription.id as _)
733 .await
734 .inspect_err(|e| {
735 tracing::error!(
736 error = %e.as_report(),
737 "failed to abort create subscription after failure"
738 );
739 });
740 return Err(err);
741 }
742
743 let version = self
744 .metadata_manager
745 .catalog_controller
746 .notify_create_subscription(subscription.id)
747 .await?;
748 tracing::debug!("finish create subscription");
749 Ok(version)
750 }
751
752 async fn drop_subscription(
753 &self,
754 subscription_id: SubscriptionId,
755 drop_mode: DropMode,
756 ) -> MetaResult<NotificationVersion> {
757 tracing::debug!("preparing drop subscription");
758 let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
759 let subscription = self
760 .metadata_manager
761 .catalog_controller
762 .get_subscription_by_id(subscription_id)
763 .await?;
764 let table_id = subscription.dependent_table_id;
765 let database_id = subscription.database_id.into();
766 let (_, version) = self
767 .metadata_manager
768 .catalog_controller
769 .drop_object(ObjectType::Subscription, subscription_id as _, drop_mode)
770 .await?;
771 self.stream_manager
772 .drop_subscription(database_id, subscription_id as _, table_id)
773 .await;
774 tracing::debug!("finish drop subscription");
775 Ok(version)
776 }
777
778 #[await_tree::instrument]
780 pub(crate) async fn validate_cdc_table(
781 &self,
782 table: &Table,
783 table_fragments: &StreamJobFragments,
784 ) -> MetaResult<()> {
785 let stream_scan_fragment = table_fragments
786 .fragments
787 .values()
788 .filter(|f| {
789 f.fragment_type_mask.contains(FragmentTypeFlag::StreamScan)
790 || f.fragment_type_mask
791 .contains(FragmentTypeFlag::StreamCdcScan)
792 })
793 .exactly_one()
794 .ok()
795 .with_context(|| {
796 format!(
797 "expect exactly one stream scan fragment, got: {:?}",
798 table_fragments.fragments
799 )
800 })?;
801 fn assert_parallelism(stream_scan_fragment: &Fragment, node_body: &Option<NodeBody>) {
802 if let Some(NodeBody::StreamCdcScan(node)) = node_body {
803 if let Some(o) = node.options
804 && CdcScanOptions::from_proto(&o).is_parallelized_backfill()
805 {
806 } else {
808 assert_eq!(
809 stream_scan_fragment.actors.len(),
810 1,
811 "Stream scan fragment should have only one actor"
812 );
813 }
814 }
815 }
816 let mut found_cdc_scan = false;
817 match &stream_scan_fragment.nodes.node_body {
818 Some(NodeBody::StreamCdcScan(_)) => {
819 assert_parallelism(stream_scan_fragment, &stream_scan_fragment.nodes.node_body);
820 if self
821 .validate_cdc_table_inner(&stream_scan_fragment.nodes.node_body, table.id)
822 .await?
823 {
824 found_cdc_scan = true;
825 }
826 }
827 Some(NodeBody::Project(_)) => {
829 for input in &stream_scan_fragment.nodes.input {
830 assert_parallelism(stream_scan_fragment, &input.node_body);
831 if self
832 .validate_cdc_table_inner(&input.node_body, table.id)
833 .await?
834 {
835 found_cdc_scan = true;
836 }
837 }
838 }
839 _ => {
840 bail!("Unexpected node body for stream cdc scan");
841 }
842 };
843 if !found_cdc_scan {
844 bail!("No stream cdc scan node found in stream scan fragment");
845 }
846 Ok(())
847 }
848
849 async fn validate_cdc_table_inner(
850 &self,
851 node_body: &Option<NodeBody>,
852 table_id: u32,
853 ) -> MetaResult<bool> {
854 let meta_store = self.env.meta_store_ref();
855 if let Some(NodeBody::StreamCdcScan(stream_cdc_scan)) = node_body
856 && let Some(ref cdc_table_desc) = stream_cdc_scan.cdc_table_desc
857 {
858 let options_with_secret = WithOptionsSecResolved::new(
859 cdc_table_desc.connect_properties.clone(),
860 cdc_table_desc.secret_refs.clone(),
861 );
862
863 let mut props = ConnectorProperties::extract(options_with_secret, true)?;
864 props.init_from_pb_cdc_table_desc(cdc_table_desc);
865
866 let _enumerator = props
868 .create_split_enumerator(SourceEnumeratorContext::dummy().into())
869 .await?;
870
871 if is_parallelized_backfill_enabled(stream_cdc_scan) {
872 try_init_parallel_cdc_table_snapshot_splits(
874 table_id,
875 cdc_table_desc,
876 meta_store,
877 &stream_cdc_scan.options,
878 self.env.opts.cdc_table_split_init_insert_batch_size,
879 self.env.opts.cdc_table_split_init_sleep_interval_splits,
880 self.env.opts.cdc_table_split_init_sleep_duration_millis,
881 )
882 .await?;
883 }
884
885 tracing::debug!(?table_id, "validate cdc table success");
886 Ok(true)
887 } else {
888 Ok(false)
889 }
890 }
891
892 pub(crate) async fn inject_replace_table_job_for_table_sink(
896 &self,
897 tmp_id: u32,
898 mgr: &MetadataManager,
899 stream_ctx: StreamContext,
900 sink: Option<&Sink>,
901 creating_sink_table_fragments: Option<&StreamJobFragments>,
902 dropping_sink_id: Option<SinkId>,
903 streaming_job: &StreamingJob,
904 fragment_graph: StreamFragmentGraph,
905 ) -> MetaResult<(ReplaceStreamJobContext, StreamJobFragmentsToCreate)> {
906 let (mut replace_table_ctx, mut stream_job_fragments) = self
907 .build_replace_job(stream_ctx, streaming_job, fragment_graph, tmp_id as _, None)
908 .await?;
909
910 let target_table = streaming_job.table().unwrap();
911
912 if let Some(creating_sink_table_fragments) = creating_sink_table_fragments {
913 let sink_fragment = creating_sink_table_fragments.sink_fragment().unwrap();
914 let sink = sink.expect("sink not found");
915 Self::inject_replace_table_plan_for_sink(
916 sink.id,
917 &sink_fragment,
918 target_table,
919 &mut replace_table_ctx,
920 stream_job_fragments.inner.union_fragment_for_table(),
921 None,
922 );
923 }
924
925 let [table_catalog]: [_; 1] = mgr
926 .get_table_catalog_by_ids(vec![target_table.id])
927 .await?
928 .try_into()
929 .expect("Target table should exist in sink into table");
930
931 {
932 let catalogs = mgr
933 .get_sink_catalog_by_ids(&table_catalog.incoming_sinks)
934 .await?;
935
936 for sink in catalogs {
937 let sink_id = sink.id;
938
939 if let Some(dropping_sink_id) = dropping_sink_id
940 && sink_id == (dropping_sink_id as u32)
941 {
942 continue;
943 };
944
945 let sink_table_fragments = mgr
946 .get_job_fragments_by_id(&risingwave_common::catalog::TableId::new(sink_id))
947 .await?;
948
949 let sink_fragment = sink_table_fragments.sink_fragment().unwrap();
950
951 Self::inject_replace_table_plan_for_sink(
952 sink_id,
953 &sink_fragment,
954 target_table,
955 &mut replace_table_ctx,
956 stream_job_fragments.inner.union_fragment_for_table(),
957 Some(&sink.unique_identity()),
958 );
959 }
960 }
961
962 for fragment in stream_job_fragments.fragments.values() {
964 {
965 {
966 visit_stream_node_body(&fragment.nodes, |node| {
967 if let NodeBody::Merge(merge_node) = node {
968 let upstream_fragment_id = merge_node.upstream_fragment_id;
969 if let Some(external_upstream_fragment_downstreams) = replace_table_ctx
970 .upstream_fragment_downstreams
971 .get(&upstream_fragment_id)
972 {
973 let mut upstream_fragment_downstreams =
974 external_upstream_fragment_downstreams.iter().filter(
975 |downstream| {
976 downstream.downstream_fragment_id
977 == fragment.fragment_id
978 },
979 );
980 assert!(
981 upstream_fragment_downstreams.next().is_some(),
982 "All the mergers for the union should have been fully assigned beforehand."
983 );
984 } else {
985 let mut upstream_fragment_downstreams = stream_job_fragments
986 .downstreams
987 .get(&upstream_fragment_id)
988 .into_iter()
989 .flatten()
990 .filter(|downstream| {
991 downstream.downstream_fragment_id == fragment.fragment_id
992 });
993 assert!(
994 upstream_fragment_downstreams.next().is_some(),
995 "All the mergers for the union should have been fully assigned beforehand."
996 );
997 }
998 }
999 });
1000 }
1001 }
1002 }
1003
1004 Ok((replace_table_ctx, stream_job_fragments))
1005 }
1006
1007 pub(crate) fn inject_replace_table_plan_for_sink(
1008 sink_id: u32,
1009 sink_fragment: &Fragment,
1010 table: &Table,
1011 replace_table_ctx: &mut ReplaceStreamJobContext,
1012 union_fragment: &mut Fragment,
1013 unique_identity: Option<&str>,
1014 ) {
1015 let sink_fields = sink_fragment.nodes.fields.clone();
1016
1017 let output_indices = sink_fields
1018 .iter()
1019 .enumerate()
1020 .map(|(idx, _)| idx as _)
1021 .collect_vec();
1022
1023 let dist_key_indices = table.distribution_key.iter().map(|i| *i as _).collect_vec();
1024
1025 let sink_fragment_downstreams = replace_table_ctx
1026 .upstream_fragment_downstreams
1027 .entry(sink_fragment.fragment_id)
1028 .or_default();
1029
1030 {
1031 sink_fragment_downstreams.push(DownstreamFragmentRelation {
1032 downstream_fragment_id: union_fragment.fragment_id,
1033 dispatcher_type: DispatcherType::Hash,
1034 dist_key_indices: dist_key_indices.clone(),
1035 output_mapping: PbDispatchOutputMapping::simple(output_indices),
1036 });
1037 }
1038
1039 let upstream_fragment_id = sink_fragment.fragment_id;
1040
1041 let mut max_operator_id = 0;
1042
1043 visit_stream_node(&union_fragment.nodes, |node| {
1044 max_operator_id = max_operator_id.max(node.operator_id);
1045 });
1046
1047 {
1048 {
1049 visit_stream_node_cont_mut(&mut union_fragment.nodes, |node| {
1050 if let Some(NodeBody::Union(_)) = &mut node.node_body {
1051 for input_project_node in &mut node.input {
1052 if let Some(NodeBody::Project(_)) = &mut input_project_node.node_body {
1053 let merge_stream_node =
1054 input_project_node.input.iter_mut().exactly_one().unwrap();
1055
1056 if input_project_node.identity.as_str()
1058 != unique_identity
1059 .unwrap_or(PbSink::UNIQUE_IDENTITY_FOR_CREATING_TABLE_SINK)
1060 {
1061 continue;
1062 }
1063
1064 if let Some(NodeBody::Merge(merge_node)) =
1065 &mut merge_stream_node.node_body
1066 {
1067 {
1068 merge_stream_node.identity =
1069 format!("MergeExecutor(from sink {})", sink_id);
1070 merge_stream_node.operator_id = max_operator_id + 1;
1071
1072 input_project_node.identity =
1073 format!("ProjectExecutor(from sink {})", sink_id);
1074 input_project_node.operator_id = max_operator_id + 2;
1075 }
1076
1077 **merge_node = {
1078 MergeNode {
1079 upstream_fragment_id,
1080 upstream_dispatcher_type: PbDispatcherType::Hash as _,
1081 ..Default::default()
1082 }
1083 };
1084
1085 merge_stream_node.fields = sink_fields.to_vec();
1086
1087 return false;
1088 }
1089 }
1090 }
1091 }
1092 true
1093 });
1094 }
1095 }
1096 }
1097
1098 #[await_tree::instrument(boxed, "create_streaming_job({streaming_job})")]
1101 pub async fn create_streaming_job(
1102 &self,
1103 mut streaming_job: StreamingJob,
1104 fragment_graph: StreamFragmentGraphProto,
1105 affected_table_replace_info: Option<ReplaceStreamJobInfo>,
1106 dependencies: HashSet<ObjectId>,
1107 specific_resource_group: Option<String>,
1108 if_not_exists: bool,
1109 ) -> MetaResult<NotificationVersion> {
1110 let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());
1111 let check_ret = self
1112 .metadata_manager
1113 .catalog_controller
1114 .create_job_catalog(
1115 &mut streaming_job,
1116 &ctx,
1117 &fragment_graph.parallelism,
1118 fragment_graph.max_parallelism as _,
1119 dependencies,
1120 specific_resource_group.clone(),
1121 )
1122 .await;
1123 if let Err(meta_err) = check_ret {
1124 if !if_not_exists {
1125 return Err(meta_err);
1126 }
1127 return if let MetaErrorInner::Duplicated(_, _, Some(job_id)) = meta_err.inner() {
1128 if streaming_job.create_type() == CreateType::Foreground {
1129 let database_id = streaming_job.database_id();
1130 self.metadata_manager
1131 .wait_streaming_job_finished(database_id.into(), *job_id)
1132 .await
1133 } else {
1134 Ok(IGNORED_NOTIFICATION_VERSION)
1135 }
1136 } else {
1137 Err(meta_err)
1138 };
1139 }
1140 let job_id = streaming_job.id();
1141 tracing::debug!(
1142 id = job_id,
1143 definition = streaming_job.definition(),
1144 create_type = streaming_job.create_type().as_str_name(),
1145 job_type = ?streaming_job.job_type(),
1146 "starting streaming job",
1147 );
1148 let permit = self
1150 .creating_streaming_job_permits
1151 .semaphore
1152 .clone()
1153 .acquire_owned()
1154 .instrument_await("acquire_creating_streaming_job_permit")
1155 .await
1156 .unwrap();
1157 let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
1158
1159 let name = streaming_job.name();
1160 let definition = streaming_job.definition();
1161 let source_id = match &streaming_job {
1162 StreamingJob::Table(Some(src), _, _) | StreamingJob::Source(src) => Some(src.id),
1163 _ => None,
1164 };
1165
1166 match self
1168 .create_streaming_job_inner(
1169 ctx,
1170 streaming_job,
1171 fragment_graph,
1172 affected_table_replace_info,
1173 specific_resource_group,
1174 permit,
1175 )
1176 .await
1177 {
1178 Ok(version) => Ok(version),
1179 Err(err) => {
1180 tracing::error!(id = job_id, error = %err.as_report(), "failed to create streaming job");
1181 let event = risingwave_pb::meta::event_log::EventCreateStreamJobFail {
1182 id: job_id,
1183 name,
1184 definition,
1185 error: err.as_report().to_string(),
1186 };
1187 self.env.event_log_manager_ref().add_event_logs(vec![
1188 risingwave_pb::meta::event_log::Event::CreateStreamJobFail(event),
1189 ]);
1190 let (aborted, _) = self
1191 .metadata_manager
1192 .catalog_controller
1193 .try_abort_creating_streaming_job(job_id as _, false)
1194 .await?;
1195 if aborted {
1196 tracing::warn!(id = job_id, "aborted streaming job");
1197 if let Some(source_id) = source_id {
1199 self.source_manager
1200 .apply_source_change(SourceChange::DropSource {
1201 dropped_source_ids: vec![source_id as SourceId],
1202 })
1203 .await;
1204 }
1205 }
1206 Err(err)
1207 }
1208 }
1209 }
1210
1211 #[await_tree::instrument(boxed)]
1212 async fn create_streaming_job_inner(
1213 &self,
1214 ctx: StreamContext,
1215 mut streaming_job: StreamingJob,
1216 fragment_graph: StreamFragmentGraphProto,
1217 affected_table_replace_info: Option<ReplaceStreamJobInfo>,
1218 specific_resource_group: Option<String>,
1219 permit: OwnedSemaphorePermit,
1220 ) -> MetaResult<NotificationVersion> {
1221 let mut fragment_graph =
1222 StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
1223 streaming_job.set_info_from_graph(&fragment_graph);
1224
1225 let incomplete_internal_tables = fragment_graph
1227 .incomplete_internal_tables()
1228 .into_values()
1229 .collect_vec();
1230 let table_id_map = self
1231 .metadata_manager
1232 .catalog_controller
1233 .create_internal_table_catalog(&streaming_job, incomplete_internal_tables)
1234 .await?;
1235 fragment_graph.refill_internal_table_ids(table_id_map);
1236
1237 let affected_table_replace_info = match affected_table_replace_info {
1238 Some(replace_table_info) => {
1239 assert!(
1240 specific_resource_group.is_none(),
1241 "specific_resource_group is not supported for replace table (alter column or sink into table)"
1242 );
1243
1244 let ReplaceStreamJobInfo {
1245 mut streaming_job,
1246 fragment_graph,
1247 ..
1248 } = replace_table_info;
1249
1250 let original_max_parallelism = self
1252 .metadata_manager
1253 .get_job_max_parallelism(streaming_job.id().into())
1254 .await?;
1255 let fragment_graph = PbStreamFragmentGraph {
1256 max_parallelism: original_max_parallelism as _,
1257 ..fragment_graph
1258 };
1259
1260 let fragment_graph =
1261 StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
1262 streaming_job.set_info_from_graph(&fragment_graph);
1263 let streaming_job = streaming_job;
1264
1265 Some((streaming_job, fragment_graph))
1266 }
1267 None => None,
1268 };
1269
1270 tracing::debug!(id = streaming_job.id(), "building streaming job");
1272 let (ctx, stream_job_fragments) = self
1273 .build_stream_job(
1274 ctx,
1275 streaming_job,
1276 fragment_graph,
1277 affected_table_replace_info,
1278 specific_resource_group,
1279 )
1280 .await?;
1281
1282 let streaming_job = &ctx.streaming_job;
1283
1284 match streaming_job {
1285 StreamingJob::Table(None, table, TableJobType::SharedCdcSource) => {
1286 self.validate_cdc_table(table, &stream_job_fragments)
1287 .await?;
1288 }
1289 StreamingJob::Table(Some(source), ..) => {
1290 self.source_manager.register_source(source).await?;
1292 let connector_name = source
1293 .get_with_properties()
1294 .get(UPSTREAM_SOURCE_KEY)
1295 .cloned();
1296 let attr = source.info.as_ref().map(|source_info| {
1297 jsonbb::json!({
1298 "format": source_info.format().as_str_name(),
1299 "encode": source_info.row_encode().as_str_name(),
1300 })
1301 });
1302 report_create_object(
1303 streaming_job.id(),
1304 "source",
1305 PbTelemetryDatabaseObject::Source,
1306 connector_name,
1307 attr,
1308 );
1309 }
1310 StreamingJob::Sink(sink, _) => {
1311 if sink.auto_refresh_schema_from_table.is_some() {
1312 check_sink_fragments_support_refresh_schema(&stream_job_fragments.fragments)?
1313 }
1314 validate_sink(sink).await?;
1316 let connector_name = sink.get_properties().get(UPSTREAM_SOURCE_KEY).cloned();
1317 let attr = sink.format_desc.as_ref().map(|sink_info| {
1318 jsonbb::json!({
1319 "format": sink_info.format().as_str_name(),
1320 "encode": sink_info.encode().as_str_name(),
1321 })
1322 });
1323 report_create_object(
1324 streaming_job.id(),
1325 "sink",
1326 PbTelemetryDatabaseObject::Sink,
1327 connector_name,
1328 attr,
1329 );
1330 }
1331 StreamingJob::Source(source) => {
1332 self.source_manager.register_source(source).await?;
1334 let connector_name = source
1335 .get_with_properties()
1336 .get(UPSTREAM_SOURCE_KEY)
1337 .cloned();
1338 let attr = source.info.as_ref().map(|source_info| {
1339 jsonbb::json!({
1340 "format": source_info.format().as_str_name(),
1341 "encode": source_info.row_encode().as_str_name(),
1342 })
1343 });
1344 report_create_object(
1345 streaming_job.id(),
1346 "source",
1347 PbTelemetryDatabaseObject::Source,
1348 connector_name,
1349 attr,
1350 );
1351 }
1352 _ => {}
1353 }
1354
1355 self.metadata_manager
1356 .catalog_controller
1357 .prepare_stream_job_fragments(&stream_job_fragments, streaming_job, false)
1358 .await?;
1359
1360 let stream_job_id = streaming_job.id();
1362 match streaming_job.create_type() {
1363 CreateType::Unspecified | CreateType::Foreground => {
1364 let version = self
1365 .stream_manager
1366 .create_streaming_job(stream_job_fragments, ctx, None)
1367 .await?;
1368 Ok(version)
1369 }
1370 CreateType::Background => {
1371 let await_tree_key = format!("Background DDL Worker ({})", stream_job_id);
1372 let await_tree_span =
1373 span!("{:?}({})", streaming_job.job_type(), streaming_job.name());
1374
1375 let ctrl = self.clone();
1376 let (tx, rx) = oneshot::channel();
1377 let fut = async move {
1378 let _ = ctrl
1379 .stream_manager
1380 .create_streaming_job(stream_job_fragments, ctx, Some(tx))
1381 .await
1382 .inspect_err(|err| {
1383 tracing::error!(id = stream_job_id, error = ?err.as_report(), "failed to create background streaming job");
1384 });
1385 drop(permit);
1387 };
1388
1389 let fut = (self.env.await_tree_reg())
1390 .register(await_tree_key, await_tree_span)
1391 .instrument(fut);
1392 tokio::spawn(fut);
1393
1394 rx.instrument_await("wait_background_streaming_job_creation_started")
1395 .await
1396 .map_err(|_| {
1397 anyhow!(
1398 "failed to receive create streaming job result of job: {}",
1399 stream_job_id
1400 )
1401 })??;
1402 Ok(IGNORED_NOTIFICATION_VERSION)
1403 }
1404 }
1405 }
1406
1407 pub async fn drop_object(
1409 &self,
1410 object_type: ObjectType,
1411 object_id: ObjectId,
1412 drop_mode: DropMode,
1413 target_replace_info: Option<ReplaceStreamJobInfo>,
1414 ) -> MetaResult<NotificationVersion> {
1415 let (release_ctx, mut version) = self
1416 .metadata_manager
1417 .catalog_controller
1418 .drop_object(object_type, object_id, drop_mode)
1419 .await?;
1420
1421 if let Some(replace_table_info) = target_replace_info {
1422 let stream_ctx =
1423 StreamContext::from_protobuf(replace_table_info.fragment_graph.get_ctx().unwrap());
1424
1425 let ReplaceStreamJobInfo {
1426 mut streaming_job,
1427 fragment_graph,
1428 ..
1429 } = replace_table_info;
1430
1431 let sink_id = if let ObjectType::Sink = object_type {
1432 object_id as _
1433 } else {
1434 panic!("additional replace table event only occurs when dropping sink into table")
1435 };
1436
1437 let original_max_parallelism = self
1439 .metadata_manager
1440 .get_job_max_parallelism(streaming_job.id().into())
1441 .await?;
1442 let fragment_graph = PbStreamFragmentGraph {
1443 max_parallelism: original_max_parallelism as _,
1444 ..fragment_graph
1445 };
1446
1447 let fragment_graph =
1448 StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
1449 streaming_job.set_info_from_graph(&fragment_graph);
1450 let streaming_job = streaming_job;
1451
1452 streaming_job.table().expect("should be table job");
1453
1454 tracing::debug!(id = streaming_job.id(), "replacing table for dropped sink");
1455 let tmp_id = self
1456 .metadata_manager
1457 .catalog_controller
1458 .create_job_catalog_for_replace(
1459 &streaming_job,
1460 Some(&stream_ctx),
1461 fragment_graph.specified_parallelism().as_ref(),
1462 Some(fragment_graph.max_parallelism()),
1463 )
1464 .await? as u32;
1465
1466 let (ctx, stream_job_fragments) = self
1467 .inject_replace_table_job_for_table_sink(
1468 tmp_id,
1469 &self.metadata_manager,
1470 stream_ctx,
1471 None,
1472 None,
1473 Some(sink_id),
1474 &streaming_job,
1475 fragment_graph,
1476 )
1477 .await?;
1478
1479 let result: MetaResult<_> = try {
1480 let replace_upstream = ctx.replace_upstream.clone();
1481
1482 self.metadata_manager
1483 .catalog_controller
1484 .prepare_stream_job_fragments(&stream_job_fragments, &streaming_job, true)
1485 .await?;
1486
1487 self.stream_manager
1488 .replace_stream_job(stream_job_fragments, ctx)
1489 .await?;
1490
1491 replace_upstream
1492 };
1493
1494 version = match result {
1495 Ok(replace_upstream) => {
1496 let version = self
1497 .metadata_manager
1498 .catalog_controller
1499 .finish_replace_streaming_job(
1500 tmp_id as _,
1501 streaming_job,
1502 replace_upstream,
1503 SinkIntoTableContext {
1504 creating_sink_id: None,
1505 dropping_sink_id: Some(sink_id),
1506 updated_sink_catalogs: vec![],
1507 },
1508 None, None, )
1511 .await?;
1512 Ok(version)
1513 }
1514 Err(err) => {
1515 tracing::error!(id = object_id, error = ?err.as_report(), "failed to replace table");
1516 let _ = self.metadata_manager
1517 .catalog_controller
1518 .try_abort_replacing_streaming_job(tmp_id as _, None)
1519 .await
1520 .inspect_err(|err| {
1521 tracing::error!(id = object_id, error = ?err.as_report(), "failed to abort replacing table");
1522 });
1523 Err(err)
1524 }
1525 }?;
1526 }
1527
1528 let ReleaseContext {
1529 database_id,
1530 removed_streaming_job_ids,
1531 removed_state_table_ids,
1532 removed_source_ids,
1533 removed_secret_ids: secret_ids,
1534 removed_source_fragments,
1535 removed_actors,
1536 removed_fragments,
1537 } = release_ctx;
1538
1539 let _guard = self.source_manager.pause_tick().await;
1540 self.stream_manager
1541 .drop_streaming_jobs(
1542 risingwave_common::catalog::DatabaseId::new(database_id as _),
1543 removed_actors.iter().map(|id| *id as _).collect(),
1544 removed_streaming_job_ids,
1545 removed_state_table_ids,
1546 removed_fragments.iter().map(|id| *id as _).collect(),
1547 )
1548 .await;
1549
1550 self.source_manager
1553 .apply_source_change(SourceChange::DropSource {
1554 dropped_source_ids: removed_source_ids.into_iter().map(|id| id as _).collect(),
1555 })
1556 .await;
1557
1558 let dropped_source_fragments = removed_source_fragments
1561 .into_iter()
1562 .map(|(source_id, fragments)| {
1563 (
1564 source_id,
1565 fragments.into_iter().map(|id| id as u32).collect(),
1566 )
1567 })
1568 .collect();
1569 let dropped_actors = removed_actors.iter().map(|id| *id as _).collect();
1570 self.source_manager
1571 .apply_source_change(SourceChange::DropMv {
1572 dropped_source_fragments,
1573 dropped_actors,
1574 })
1575 .await;
1576
1577 for secret in secret_ids {
1579 LocalSecretManager::global().remove_secret(secret as _);
1580 }
1581 Ok(version)
1582 }
1583
1584 #[await_tree::instrument(boxed, "replace_streaming_job({streaming_job})")]
1586 pub async fn replace_job(
1587 &self,
1588 mut streaming_job: StreamingJob,
1589 fragment_graph: StreamFragmentGraphProto,
1590 ) -> MetaResult<NotificationVersion> {
1591 match &streaming_job {
1592 StreamingJob::Table(..)
1593 | StreamingJob::Source(..)
1594 | StreamingJob::MaterializedView(..) => {}
1595 StreamingJob::Sink(..) | StreamingJob::Index(..) => {
1596 bail_not_implemented!("schema change for {}", streaming_job.job_type_str())
1597 }
1598 }
1599
1600 let job_id = streaming_job.id();
1601
1602 let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
1603 let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());
1604
1605 let original_max_parallelism = self
1607 .metadata_manager
1608 .get_job_max_parallelism(streaming_job.id().into())
1609 .await?;
1610 let fragment_graph = PbStreamFragmentGraph {
1611 max_parallelism: original_max_parallelism as _,
1612 ..fragment_graph
1613 };
1614
1615 let fragment_graph = StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
1617 streaming_job.set_info_from_graph(&fragment_graph);
1618
1619 let streaming_job = streaming_job;
1621
1622 let auto_refresh_schema_sinks = if let StreamingJob::Table(_, table, _) = &streaming_job {
1623 let auto_refresh_schema_sinks = self
1624 .metadata_manager
1625 .catalog_controller
1626 .get_sink_auto_refresh_schema_from(table.id as _)
1627 .await?;
1628 if !auto_refresh_schema_sinks.is_empty() {
1629 let original_table_columns = self
1630 .metadata_manager
1631 .catalog_controller
1632 .get_table_columns(table.id as _)
1633 .await?;
1634 let mut original_table_column_ids: HashSet<_> = original_table_columns
1636 .iter()
1637 .map(|col| col.column_id())
1638 .collect();
1639 let newly_added_columns = table
1640 .columns
1641 .iter()
1642 .filter(|col| {
1643 !original_table_column_ids.remove(&ColumnId::new(
1644 col.column_desc.as_ref().unwrap().column_id as _,
1645 ))
1646 })
1647 .map(|col| ColumnCatalog::from(col.clone()))
1648 .collect_vec();
1649 if !original_table_column_ids.is_empty() {
1650 return Err(anyhow!("new table columns does not contains all original columns. new: {:?}, original: {:?}, not included: {:?}", table.columns, original_table_columns, original_table_column_ids).into());
1651 }
1652 let mut sinks = Vec::with_capacity(auto_refresh_schema_sinks.len());
1653 for sink in auto_refresh_schema_sinks {
1654 let sink_job_fragments = self
1655 .metadata_manager
1656 .get_job_fragments_by_id(&risingwave_common::catalog::TableId::new(sink.id))
1657 .await?;
1658 if sink_job_fragments.fragments.len() != 1 {
1659 return Err(anyhow!(
1660 "auto schema refresh sink must have only one fragment, but got {}",
1661 sink_job_fragments.fragments.len()
1662 )
1663 .into());
1664 }
1665 let original_sink_fragment =
1666 sink_job_fragments.fragments.into_values().next().unwrap();
1667 let (new_sink_fragment, new_sink_columns, new_log_store_table) =
1668 rewrite_refresh_schema_sink_fragment(
1669 &original_sink_fragment,
1670 &sink,
1671 &newly_added_columns,
1672 table,
1673 fragment_graph.table_fragment_id(),
1674 self.env.id_gen_manager(),
1675 )?;
1676
1677 assert_eq!(
1678 original_sink_fragment.actors.len(),
1679 new_sink_fragment.actors.len()
1680 );
1681 let actor_status = (0..original_sink_fragment.actors.len())
1682 .map(|i| {
1683 let worker_node_id = sink_job_fragments.actor_status
1684 [&original_sink_fragment.actors[i].actor_id]
1685 .location
1686 .as_ref()
1687 .unwrap()
1688 .worker_node_id;
1689 (
1690 new_sink_fragment.actors[i].actor_id,
1691 PbActorStatus {
1692 location: Some(PbActorLocation { worker_node_id }),
1693 state: PbActorState::Inactive as _,
1694 },
1695 )
1696 })
1697 .collect();
1698
1699 let streaming_job = StreamingJob::Sink(sink, None);
1700
1701 let tmp_sink_id = self
1702 .metadata_manager
1703 .catalog_controller
1704 .create_job_catalog_for_replace(&streaming_job, None, None, None)
1705 .await?;
1706 let StreamingJob::Sink(sink, _) = streaming_job else {
1707 unreachable!()
1708 };
1709
1710 sinks.push(AutoRefreshSchemaSinkContext {
1711 tmp_sink_id,
1712 original_sink: sink,
1713 original_fragment: original_sink_fragment,
1714 new_columns: new_sink_columns,
1715 new_fragment: new_sink_fragment,
1716 new_log_store_table,
1717 actor_status,
1718 });
1719 }
1720 Some(sinks)
1721 } else {
1722 None
1723 }
1724 } else {
1725 None
1726 };
1727
1728 let tmp_id = self
1729 .metadata_manager
1730 .catalog_controller
1731 .create_job_catalog_for_replace(
1732 &streaming_job,
1733 Some(&ctx),
1734 fragment_graph.specified_parallelism().as_ref(),
1735 Some(fragment_graph.max_parallelism()),
1736 )
1737 .await?;
1738
1739 let tmp_sink_ids = auto_refresh_schema_sinks
1740 .as_ref()
1741 .map(|sinks| sinks.iter().map(|sink| sink.tmp_sink_id).collect_vec());
1742
1743 tracing::debug!(id = job_id, "building replace streaming job");
1744 let mut updated_sink_catalogs = vec![];
1745
1746 let mut drop_table_connector_ctx = None;
1747 let result: MetaResult<_> = try {
1748 let (mut ctx, mut stream_job_fragments) = self
1749 .build_replace_job(
1750 ctx,
1751 &streaming_job,
1752 fragment_graph,
1753 tmp_id as _,
1754 auto_refresh_schema_sinks,
1755 )
1756 .await?;
1757 drop_table_connector_ctx = ctx.drop_table_connector_ctx.clone();
1758 let auto_refresh_schema_sink_finish_ctx =
1759 ctx.auto_refresh_schema_sinks.as_ref().map(|sinks| {
1760 sinks
1761 .iter()
1762 .map(|sink| FinishAutoRefreshSchemaSinkContext {
1763 tmp_sink_id: sink.tmp_sink_id,
1764 original_sink_id: sink.original_sink.id as _,
1765 columns: sink.new_columns.clone(),
1766 new_log_store_table: sink
1767 .new_log_store_table
1768 .as_ref()
1769 .map(|table| (table.id as _, table.columns.clone())),
1770 })
1771 .collect()
1772 });
1773
1774 if let StreamingJob::Table(_, table, ..) = &streaming_job {
1776 let catalogs = self
1777 .metadata_manager
1778 .get_sink_catalog_by_ids(&table.incoming_sinks)
1779 .await?;
1780
1781 for sink in catalogs {
1782 let sink_id = &sink.id;
1783
1784 let sink_table_fragments = self
1785 .metadata_manager
1786 .get_job_fragments_by_id(&risingwave_common::catalog::TableId::new(
1787 *sink_id,
1788 ))
1789 .await?;
1790
1791 let sink_fragment = sink_table_fragments.sink_fragment().unwrap();
1792
1793 Self::inject_replace_table_plan_for_sink(
1794 *sink_id,
1795 &sink_fragment,
1796 table,
1797 &mut ctx,
1798 stream_job_fragments.inner.union_fragment_for_table(),
1799 Some(&sink.unique_identity()),
1800 );
1801
1802 if sink.original_target_columns.is_empty() {
1803 updated_sink_catalogs.push(sink.id as _);
1804 }
1805 }
1806 }
1807
1808 let replace_upstream = ctx.replace_upstream.clone();
1809
1810 if let Some(sinks) = &ctx.auto_refresh_schema_sinks {
1811 let empty_actor_splits = HashMap::new();
1812 let empty_downstreams = FragmentDownstreamRelation::default();
1813 for sink in sinks {
1814 self.metadata_manager
1815 .catalog_controller
1816 .prepare_streaming_job(
1817 sink.tmp_sink_id,
1818 || [&sink.new_fragment].into_iter(),
1819 &sink.actor_status,
1820 &empty_actor_splits,
1821 &empty_downstreams,
1822 false,
1823 sink.original_sink.definition.clone(),
1824 true,
1825 Some(&sink.original_sink),
1826 None,
1827 )
1828 .await?;
1829 }
1830 }
1831
1832 self.metadata_manager
1833 .catalog_controller
1834 .prepare_stream_job_fragments(&stream_job_fragments, &streaming_job, true)
1835 .await?;
1836
1837 self.stream_manager
1838 .replace_stream_job(stream_job_fragments, ctx)
1839 .await?;
1840 (replace_upstream, auto_refresh_schema_sink_finish_ctx)
1841 };
1842
1843 match result {
1844 Ok((replace_upstream, auto_refresh_schema_sink_finish_ctx)) => {
1845 let version = self
1846 .metadata_manager
1847 .catalog_controller
1848 .finish_replace_streaming_job(
1849 tmp_id,
1850 streaming_job,
1851 replace_upstream,
1852 SinkIntoTableContext {
1853 creating_sink_id: None,
1854 dropping_sink_id: None,
1855 updated_sink_catalogs,
1856 },
1857 drop_table_connector_ctx.as_ref(),
1858 auto_refresh_schema_sink_finish_ctx,
1859 )
1860 .await?;
1861 if let Some(drop_table_connector_ctx) = &drop_table_connector_ctx {
1862 self.source_manager
1863 .apply_source_change(SourceChange::DropSource {
1864 dropped_source_ids: vec![drop_table_connector_ctx.to_remove_source_id],
1865 })
1866 .await;
1867 }
1868 Ok(version)
1869 }
1870 Err(err) => {
1871 tracing::error!(id = job_id, error = ?err.as_report(), "failed to replace job");
1872 let _ = self.metadata_manager
1873 .catalog_controller
1874 .try_abort_replacing_streaming_job(tmp_id, tmp_sink_ids)
1875 .await.inspect_err(|err| {
1876 tracing::error!(id = job_id, error = ?err.as_report(), "failed to abort replacing job");
1877 });
1878 Err(err)
1879 }
1880 }
1881 }
1882
1883 #[await_tree::instrument(boxed, "drop_streaming_job{}({job_id})", if let DropMode::Cascade = drop_mode { "_cascade" } else { "" }
1884 )]
1885 async fn drop_streaming_job(
1886 &self,
1887 job_id: StreamingJobId,
1888 drop_mode: DropMode,
1889 target_replace_info: Option<ReplaceStreamJobInfo>,
1890 ) -> MetaResult<NotificationVersion> {
1891 let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
1892
1893 let (object_id, object_type) = match job_id {
1894 StreamingJobId::MaterializedView(id) => (id as _, ObjectType::Table),
1895 StreamingJobId::Sink(id) => (id as _, ObjectType::Sink),
1896 StreamingJobId::Table(_, id) => (id as _, ObjectType::Table),
1897 StreamingJobId::Index(idx) => (idx as _, ObjectType::Index),
1898 };
1899
1900 let version = self
1901 .drop_object(object_type, object_id, drop_mode, target_replace_info)
1902 .await?;
1903 #[cfg(not(madsim))]
1904 if let StreamingJobId::Sink(sink_id) = job_id {
1905 let db = self.env.meta_store_ref().conn.clone();
1908 clean_all_rows_by_sink_id(&db, sink_id).await?;
1909 }
1910 Ok(version)
1911 }
1912
1913 fn resolve_stream_parallelism(
1917 &self,
1918 specified: Option<NonZeroUsize>,
1919 max: NonZeroUsize,
1920 cluster_info: &StreamingClusterInfo,
1921 resource_group: String,
1922 ) -> MetaResult<NonZeroUsize> {
1923 let available = cluster_info.parallelism(&resource_group);
1924 let Some(available) = NonZeroUsize::new(available) else {
1925 bail_unavailable!(
1926 "no available slots to schedule in resource group \"{}\", \
1927 have you allocated any compute nodes within this resource group?",
1928 resource_group
1929 );
1930 };
1931
1932 if let Some(specified) = specified {
1933 if specified > max {
1934 bail_invalid_parameter!(
1935 "specified parallelism {} should not exceed max parallelism {}",
1936 specified,
1937 max,
1938 );
1939 }
1940 if specified > available {
1941 bail_unavailable!(
1942 "insufficient parallelism to schedule in resource group \"{}\", \
1943 required: {}, available: {}",
1944 resource_group,
1945 specified,
1946 available,
1947 );
1948 }
1949 Ok(specified)
1950 } else {
1951 let default_parallelism = match self.env.opts.default_parallelism {
1953 DefaultParallelism::Full => available,
1954 DefaultParallelism::Default(num) => {
1955 if num > available {
1956 bail_unavailable!(
1957 "insufficient parallelism to schedule in resource group \"{}\", \
1958 required: {}, available: {}",
1959 resource_group,
1960 num,
1961 available,
1962 );
1963 }
1964 num
1965 }
1966 };
1967
1968 if default_parallelism > max {
1969 tracing::warn!(
1970 max_parallelism = max.get(),
1971 resource_group,
1972 "too many parallelism available, use max parallelism instead",
1973 );
1974 }
1975 Ok(default_parallelism.min(max))
1976 }
1977 }
1978
1979 #[await_tree::instrument]
1985 pub(crate) async fn build_stream_job(
1986 &self,
1987 stream_ctx: StreamContext,
1988 mut stream_job: StreamingJob,
1989 fragment_graph: StreamFragmentGraph,
1990 affected_table_replace_info: Option<(StreamingJob, StreamFragmentGraph)>,
1991 specific_resource_group: Option<String>,
1992 ) -> MetaResult<(CreateStreamingJobContext, StreamJobFragmentsToCreate)> {
1993 let id = stream_job.id();
1994 let specified_parallelism = fragment_graph.specified_parallelism();
1995 let expr_context = stream_ctx.to_expr_context();
1996 let max_parallelism = NonZeroUsize::new(fragment_graph.max_parallelism()).unwrap();
1997
1998 let fragment_backfill_ordering = fragment_graph.create_fragment_backfill_ordering();
2000
2001 let (snapshot_backfill_info, cross_db_snapshot_backfill_info) =
2005 fragment_graph.collect_snapshot_backfill_info()?;
2006 assert!(
2007 snapshot_backfill_info
2008 .iter()
2009 .chain([&cross_db_snapshot_backfill_info])
2010 .flat_map(|info| info.upstream_mv_table_id_to_backfill_epoch.values())
2011 .all(|backfill_epoch| backfill_epoch.is_none()),
2012 "should not set backfill epoch when initially build the job: {:?} {:?}",
2013 snapshot_backfill_info,
2014 cross_db_snapshot_backfill_info
2015 );
2016
2017 self.metadata_manager
2019 .catalog_controller
2020 .validate_cross_db_snapshot_backfill(&cross_db_snapshot_backfill_info)
2021 .await?;
2022
2023 let upstream_table_ids = fragment_graph
2024 .dependent_table_ids()
2025 .iter()
2026 .filter(|id| {
2027 !cross_db_snapshot_backfill_info
2028 .upstream_mv_table_id_to_backfill_epoch
2029 .contains_key(id)
2030 })
2031 .cloned()
2032 .collect();
2033
2034 let (upstream_root_fragments, existing_actor_location) = self
2035 .metadata_manager
2036 .get_upstream_root_fragments(&upstream_table_ids)
2037 .await?;
2038
2039 if snapshot_backfill_info.is_some() {
2040 match stream_job {
2041 StreamingJob::MaterializedView(_)
2042 | StreamingJob::Sink(_, _)
2043 | StreamingJob::Index(_, _) => {}
2044 StreamingJob::Table(_, _, _) | StreamingJob::Source(_) => {
2045 return Err(
2046 anyhow!("snapshot_backfill not enabled for table and source").into(),
2047 );
2048 }
2049 }
2050 }
2051
2052 let upstream_actors = upstream_root_fragments
2053 .values()
2054 .map(|fragment| {
2055 (
2056 fragment.fragment_id,
2057 fragment.actors.iter().map(|actor| actor.actor_id).collect(),
2058 )
2059 })
2060 .collect();
2061
2062 let complete_graph = CompleteStreamFragmentGraph::with_upstreams(
2063 fragment_graph,
2064 upstream_root_fragments,
2065 existing_actor_location,
2066 (&stream_job).into(),
2067 )?;
2068
2069 let resource_group = match specific_resource_group {
2070 None => {
2071 self.metadata_manager
2072 .get_database_resource_group(stream_job.database_id() as ObjectId)
2073 .await?
2074 }
2075 Some(resource_group) => resource_group,
2076 };
2077
2078 let cluster_info = self.metadata_manager.get_streaming_cluster_info().await?;
2080
2081 let parallelism = self.resolve_stream_parallelism(
2082 specified_parallelism,
2083 max_parallelism,
2084 &cluster_info,
2085 resource_group.clone(),
2086 )?;
2087
2088 let parallelism = self
2089 .env
2090 .system_params_reader()
2091 .await
2092 .adaptive_parallelism_strategy()
2093 .compute_target_parallelism(parallelism.get());
2094
2095 let parallelism = NonZeroUsize::new(parallelism).expect("parallelism must be positive");
2096 let actor_graph_builder = ActorGraphBuilder::new(
2097 id,
2098 resource_group,
2099 complete_graph,
2100 cluster_info,
2101 parallelism,
2102 )?;
2103
2104 let ActorGraphBuildResult {
2105 graph,
2106 downstream_fragment_relations,
2107 building_locations,
2108 upstream_fragment_downstreams,
2109 new_no_shuffle,
2110 replace_upstream,
2111 ..
2112 } = actor_graph_builder.generate_graph(&self.env, &stream_job, expr_context)?;
2113 assert!(replace_upstream.is_empty());
2114
2115 let table_parallelism = match (specified_parallelism, &self.env.opts.default_parallelism) {
2122 (None, DefaultParallelism::Full) => TableParallelism::Adaptive,
2123 _ => TableParallelism::Fixed(parallelism.get()),
2124 };
2125
2126 let stream_job_fragments = StreamJobFragments::new(
2127 id.into(),
2128 graph,
2129 &building_locations.actor_locations,
2130 stream_ctx.clone(),
2131 table_parallelism,
2132 max_parallelism.get(),
2133 );
2134
2135 if let Some(mview_fragment) = stream_job_fragments.mview_fragment() {
2136 stream_job.set_table_vnode_count(mview_fragment.vnode_count());
2137 }
2138
2139 let replace_table_job_info = match affected_table_replace_info {
2140 Some((table_stream_job, fragment_graph)) => {
2141 if snapshot_backfill_info.is_some() {
2142 return Err(anyhow!(
2143 "snapshot backfill should not have replace table info: {table_stream_job:?}"
2144 )
2145 .into());
2146 }
2147 let StreamingJob::Sink(sink, target_table) = &mut stream_job else {
2148 bail!("additional replace table event only occurs when sinking into table");
2149 };
2150
2151 table_stream_job.table().expect("should be table job");
2152 let tmp_id = self
2153 .metadata_manager
2154 .catalog_controller
2155 .create_job_catalog_for_replace(
2156 &table_stream_job,
2157 Some(&stream_ctx),
2158 fragment_graph.specified_parallelism().as_ref(),
2159 Some(fragment_graph.max_parallelism()),
2160 )
2161 .await? as u32;
2162
2163 let (context, table_fragments) = self
2164 .inject_replace_table_job_for_table_sink(
2165 tmp_id,
2166 &self.metadata_manager,
2167 stream_ctx,
2168 Some(sink),
2169 Some(&stream_job_fragments),
2170 None,
2171 &table_stream_job,
2172 fragment_graph,
2173 )
2174 .await?;
2175 must_match!(&table_stream_job, StreamingJob::Table(source, table, _) => {
2179 *target_table = Some((table.clone(), source.clone()));
2181 });
2182
2183 Some((table_stream_job, context, table_fragments))
2184 }
2185 None => None,
2186 };
2187
2188 let ctx = CreateStreamingJobContext {
2189 upstream_fragment_downstreams,
2190 new_no_shuffle,
2191 upstream_actors,
2192 building_locations,
2193 definition: stream_job.definition(),
2194 mv_table_id: stream_job.mv_table(),
2195 create_type: stream_job.create_type(),
2196 job_type: (&stream_job).into(),
2197 streaming_job: stream_job,
2198 replace_table_job_info,
2199 option: CreateStreamingJobOption {},
2200 snapshot_backfill_info,
2201 cross_db_snapshot_backfill_info,
2202 fragment_backfill_ordering,
2203 };
2204
2205 Ok((
2206 ctx,
2207 StreamJobFragmentsToCreate {
2208 inner: stream_job_fragments,
2209 downstreams: downstream_fragment_relations,
2210 },
2211 ))
2212 }
2213
2214 pub(crate) async fn build_replace_job(
2220 &self,
2221 stream_ctx: StreamContext,
2222 stream_job: &StreamingJob,
2223 mut fragment_graph: StreamFragmentGraph,
2224 tmp_job_id: TableId,
2225 auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,
2226 ) -> MetaResult<(ReplaceStreamJobContext, StreamJobFragmentsToCreate)> {
2227 match &stream_job {
2228 StreamingJob::Table(..)
2229 | StreamingJob::Source(..)
2230 | StreamingJob::MaterializedView(..) => {}
2231 StreamingJob::Sink(..) | StreamingJob::Index(..) => {
2232 bail_not_implemented!("schema change for {}", stream_job.job_type_str())
2233 }
2234 }
2235
2236 let id = stream_job.id();
2237 let expr_context = stream_ctx.to_expr_context();
2238
2239 let mut drop_table_associated_source_id = None;
2241 if let StreamingJob::Table(None, _, _) = &stream_job {
2242 drop_table_associated_source_id = self
2243 .metadata_manager
2244 .get_table_associated_source_id(id as _)
2245 .await?;
2246 }
2247
2248 let old_fragments = self
2249 .metadata_manager
2250 .get_job_fragments_by_id(&id.into())
2251 .await?;
2252 let old_internal_table_ids = old_fragments.internal_table_ids();
2253
2254 let mut drop_table_connector_ctx = None;
2256 if let Some(to_remove_source_id) = drop_table_associated_source_id {
2257 debug_assert!(old_internal_table_ids.len() == 1);
2259
2260 drop_table_connector_ctx = Some(DropTableConnectorContext {
2261 to_change_streaming_job_id: id as i32,
2264 to_remove_state_table_id: old_internal_table_ids[0] as i32, to_remove_source_id,
2266 });
2267 } else if stream_job.is_materialized_view() {
2268 let old_fragments_upstreams = self
2271 .metadata_manager
2272 .catalog_controller
2273 .upstream_fragments(old_fragments.fragment_ids())
2274 .await?;
2275
2276 let old_state_graph =
2277 state_match::Graph::from_existing(&old_fragments, &old_fragments_upstreams);
2278 let new_state_graph = state_match::Graph::from_building(&fragment_graph);
2279 let mapping =
2280 state_match::match_graph_internal_tables(&new_state_graph, &old_state_graph)
2281 .context("incompatible altering on the streaming job states")?;
2282
2283 fragment_graph.fit_internal_table_ids_with_mapping(mapping);
2284 } else {
2285 let old_internal_tables = self
2288 .metadata_manager
2289 .get_table_catalog_by_ids(old_internal_table_ids)
2290 .await?;
2291 fragment_graph.fit_internal_tables_trivial(old_internal_tables)?;
2292 }
2293
2294 let original_root_fragment = old_fragments
2297 .root_fragment()
2298 .expect("root fragment not found");
2299
2300 let job_type = StreamingJobType::from(stream_job);
2301
2302 let (mut downstream_fragments, mut downstream_actor_location) =
2304 self.metadata_manager.get_downstream_fragments(id).await?;
2305
2306 if let Some(auto_refresh_schema_sinks) = &auto_refresh_schema_sinks {
2307 let mut remaining_fragment: HashSet<_> = auto_refresh_schema_sinks
2308 .iter()
2309 .map(|sink| sink.original_fragment.fragment_id)
2310 .collect();
2311 for (_, downstream_fragment) in &mut downstream_fragments {
2312 if let Some(sink) = auto_refresh_schema_sinks.iter().find(|sink| {
2313 sink.original_fragment.fragment_id == downstream_fragment.fragment_id
2314 }) {
2315 assert!(remaining_fragment.remove(&downstream_fragment.fragment_id));
2316 for actor in &downstream_fragment.actors {
2317 downstream_actor_location.remove(&actor.actor_id);
2318 }
2319 for (actor_id, status) in &sink.actor_status {
2320 downstream_actor_location.insert(
2321 *actor_id,
2322 status.location.as_ref().unwrap().worker_node_id as WorkerId,
2323 );
2324 }
2325 *downstream_fragment = sink.new_fragment.clone();
2326 }
2327 }
2328 assert!(remaining_fragment.is_empty());
2329 }
2330
2331 let complete_graph = match &job_type {
2333 StreamingJobType::Table(TableJobType::General) | StreamingJobType::Source => {
2334 CompleteStreamFragmentGraph::with_downstreams(
2335 fragment_graph,
2336 original_root_fragment.fragment_id,
2337 downstream_fragments,
2338 downstream_actor_location,
2339 job_type,
2340 )?
2341 }
2342 StreamingJobType::Table(TableJobType::SharedCdcSource)
2343 | StreamingJobType::MaterializedView => {
2344 let (upstream_root_fragments, upstream_actor_location) = self
2346 .metadata_manager
2347 .get_upstream_root_fragments(fragment_graph.dependent_table_ids())
2348 .await?;
2349
2350 CompleteStreamFragmentGraph::with_upstreams_and_downstreams(
2351 fragment_graph,
2352 upstream_root_fragments,
2353 upstream_actor_location,
2354 original_root_fragment.fragment_id,
2355 downstream_fragments,
2356 downstream_actor_location,
2357 job_type,
2358 )?
2359 }
2360 _ => unreachable!(),
2361 };
2362
2363 let resource_group = self
2364 .metadata_manager
2365 .get_existing_job_resource_group(id as ObjectId)
2366 .await?;
2367
2368 let cluster_info = self.metadata_manager.get_streaming_cluster_info().await?;
2370
2371 let parallelism = NonZeroUsize::new(original_root_fragment.actors.len())
2374 .expect("The number of actors in the original table fragment should be greater than 0");
2375
2376 let actor_graph_builder = ActorGraphBuilder::new(
2377 id,
2378 resource_group,
2379 complete_graph,
2380 cluster_info,
2381 parallelism,
2382 )?;
2383
2384 let ActorGraphBuildResult {
2385 graph,
2386 downstream_fragment_relations,
2387 building_locations,
2388 upstream_fragment_downstreams,
2389 mut replace_upstream,
2390 new_no_shuffle,
2391 ..
2392 } = actor_graph_builder.generate_graph(&self.env, stream_job, expr_context)?;
2393
2394 if matches!(
2396 job_type,
2397 StreamingJobType::Source | StreamingJobType::Table(TableJobType::General)
2398 ) {
2399 assert!(upstream_fragment_downstreams.is_empty());
2400 }
2401
2402 let stream_job_fragments = StreamJobFragments::new(
2406 (tmp_job_id as u32).into(),
2407 graph,
2408 &building_locations.actor_locations,
2409 stream_ctx,
2410 old_fragments.assigned_parallelism,
2411 old_fragments.max_parallelism,
2412 );
2413
2414 if let Some(sinks) = &auto_refresh_schema_sinks {
2415 for sink in sinks {
2416 replace_upstream
2417 .remove(&sink.new_fragment.fragment_id)
2418 .expect("should exist");
2419 }
2420 }
2421
2422 let ctx = ReplaceStreamJobContext {
2426 old_fragments,
2427 replace_upstream,
2428 new_no_shuffle,
2429 upstream_fragment_downstreams,
2430 building_locations,
2431 streaming_job: stream_job.clone(),
2432 tmp_id: tmp_job_id as _,
2433 drop_table_connector_ctx,
2434 auto_refresh_schema_sinks,
2435 };
2436
2437 Ok((
2438 ctx,
2439 StreamJobFragmentsToCreate {
2440 inner: stream_job_fragments,
2441 downstreams: downstream_fragment_relations,
2442 },
2443 ))
2444 }
2445
2446 async fn alter_name(
2447 &self,
2448 relation: alter_name_request::Object,
2449 new_name: &str,
2450 ) -> MetaResult<NotificationVersion> {
2451 let (obj_type, id) = match relation {
2452 alter_name_request::Object::TableId(id) => (ObjectType::Table, id as ObjectId),
2453 alter_name_request::Object::ViewId(id) => (ObjectType::View, id as ObjectId),
2454 alter_name_request::Object::IndexId(id) => (ObjectType::Index, id as ObjectId),
2455 alter_name_request::Object::SinkId(id) => (ObjectType::Sink, id as ObjectId),
2456 alter_name_request::Object::SourceId(id) => (ObjectType::Source, id as ObjectId),
2457 alter_name_request::Object::SchemaId(id) => (ObjectType::Schema, id as ObjectId),
2458 alter_name_request::Object::DatabaseId(id) => (ObjectType::Database, id as ObjectId),
2459 alter_name_request::Object::SubscriptionId(id) => {
2460 (ObjectType::Subscription, id as ObjectId)
2461 }
2462 };
2463 self.metadata_manager
2464 .catalog_controller
2465 .alter_name(obj_type, id, new_name)
2466 .await
2467 }
2468
2469 async fn alter_swap_rename(
2470 &self,
2471 object: alter_swap_rename_request::Object,
2472 ) -> MetaResult<NotificationVersion> {
2473 let (obj_type, src_id, dst_id) = match object {
2474 alter_swap_rename_request::Object::Schema(_) => unimplemented!("schema swap"),
2475 alter_swap_rename_request::Object::Table(objs) => {
2476 let (src_id, dst_id) = (
2477 objs.src_object_id as ObjectId,
2478 objs.dst_object_id as ObjectId,
2479 );
2480 (ObjectType::Table, src_id, dst_id)
2481 }
2482 alter_swap_rename_request::Object::View(objs) => {
2483 let (src_id, dst_id) = (
2484 objs.src_object_id as ObjectId,
2485 objs.dst_object_id as ObjectId,
2486 );
2487 (ObjectType::View, src_id, dst_id)
2488 }
2489 alter_swap_rename_request::Object::Source(objs) => {
2490 let (src_id, dst_id) = (
2491 objs.src_object_id as ObjectId,
2492 objs.dst_object_id as ObjectId,
2493 );
2494 (ObjectType::Source, src_id, dst_id)
2495 }
2496 alter_swap_rename_request::Object::Sink(objs) => {
2497 let (src_id, dst_id) = (
2498 objs.src_object_id as ObjectId,
2499 objs.dst_object_id as ObjectId,
2500 );
2501 (ObjectType::Sink, src_id, dst_id)
2502 }
2503 alter_swap_rename_request::Object::Subscription(objs) => {
2504 let (src_id, dst_id) = (
2505 objs.src_object_id as ObjectId,
2506 objs.dst_object_id as ObjectId,
2507 );
2508 (ObjectType::Subscription, src_id, dst_id)
2509 }
2510 };
2511
2512 self.metadata_manager
2513 .catalog_controller
2514 .alter_swap_rename(obj_type, src_id, dst_id)
2515 .await
2516 }
2517
2518 async fn alter_owner(
2519 &self,
2520 object: Object,
2521 owner_id: UserId,
2522 ) -> MetaResult<NotificationVersion> {
2523 let (obj_type, id) = match object {
2524 Object::TableId(id) => (ObjectType::Table, id as ObjectId),
2525 Object::ViewId(id) => (ObjectType::View, id as ObjectId),
2526 Object::SourceId(id) => (ObjectType::Source, id as ObjectId),
2527 Object::SinkId(id) => (ObjectType::Sink, id as ObjectId),
2528 Object::SchemaId(id) => (ObjectType::Schema, id as ObjectId),
2529 Object::DatabaseId(id) => (ObjectType::Database, id as ObjectId),
2530 Object::SubscriptionId(id) => (ObjectType::Subscription, id as ObjectId),
2531 Object::ConnectionId(id) => (ObjectType::Connection, id as ObjectId),
2532 };
2533 self.metadata_manager
2534 .catalog_controller
2535 .alter_owner(obj_type, id, owner_id as _)
2536 .await
2537 }
2538
2539 async fn alter_set_schema(
2540 &self,
2541 object: alter_set_schema_request::Object,
2542 new_schema_id: SchemaId,
2543 ) -> MetaResult<NotificationVersion> {
2544 let (obj_type, id) = match object {
2545 alter_set_schema_request::Object::TableId(id) => (ObjectType::Table, id as ObjectId),
2546 alter_set_schema_request::Object::ViewId(id) => (ObjectType::View, id as ObjectId),
2547 alter_set_schema_request::Object::SourceId(id) => (ObjectType::Source, id as ObjectId),
2548 alter_set_schema_request::Object::SinkId(id) => (ObjectType::Sink, id as ObjectId),
2549 alter_set_schema_request::Object::FunctionId(id) => {
2550 (ObjectType::Function, id as ObjectId)
2551 }
2552 alter_set_schema_request::Object::ConnectionId(id) => {
2553 (ObjectType::Connection, id as ObjectId)
2554 }
2555 alter_set_schema_request::Object::SubscriptionId(id) => {
2556 (ObjectType::Subscription, id as ObjectId)
2557 }
2558 };
2559 self.metadata_manager
2560 .catalog_controller
2561 .alter_schema(obj_type, id, new_schema_id as _)
2562 .await
2563 }
2564
2565 pub async fn wait(&self) -> MetaResult<()> {
2566 let timeout_ms = 30 * 60 * 1000;
2567 for _ in 0..timeout_ms {
2568 if self
2569 .metadata_manager
2570 .catalog_controller
2571 .list_background_creating_jobs(true)
2572 .await?
2573 .is_empty()
2574 {
2575 return Ok(());
2576 }
2577
2578 sleep(Duration::from_millis(1)).await;
2579 }
2580 Err(MetaError::cancelled(format!(
2581 "timeout after {timeout_ms}ms"
2582 )))
2583 }
2584
2585 async fn comment_on(&self, comment: Comment) -> MetaResult<NotificationVersion> {
2586 self.metadata_manager
2587 .catalog_controller
2588 .comment_on(comment)
2589 .await
2590 }
2591}
2592
2593fn report_create_object(
2594 catalog_id: u32,
2595 event_name: &str,
2596 obj_type: PbTelemetryDatabaseObject,
2597 connector_name: Option<String>,
2598 attr_info: Option<jsonbb::Value>,
2599) {
2600 report_event(
2601 PbTelemetryEventStage::CreateStreamJob,
2602 event_name,
2603 catalog_id.into(),
2604 connector_name,
2605 Some(obj_type),
2606 attr_info,
2607 );
2608}
2609
2610async fn clean_all_rows_by_sink_id(db: &DatabaseConnection, sink_id: i32) -> MetaResult<()> {
2611 match Entity::delete_many()
2612 .filter(Column::SinkId.eq(sink_id))
2613 .exec(db)
2614 .await
2615 {
2616 Ok(result) => {
2617 let deleted_count = result.rows_affected;
2618
2619 tracing::info!(
2620 "Deleted {} items for sink_id = {} in iceberg exactly once system table.",
2621 deleted_count,
2622 sink_id
2623 );
2624 Ok(())
2625 }
2626 Err(e) => {
2627 tracing::error!(
2628 "Error deleting records for sink_id = {} from iceberg exactly once system table: {:?}",
2629 sink_id,
2630 e.as_report()
2631 );
2632 Err(e.into())
2633 }
2634 }
2635}