use std::cmp::Ordering;
use std::collections::HashSet;
use std::num::NonZeroUsize;
use std::sync::Arc;
use std::time::Duration;

use anyhow::{Context, anyhow};
use itertools::Itertools;
use risingwave_common::config::DefaultParallelism;
use risingwave_common::hash::VnodeCountCompat;
use risingwave_common::secret::{LocalSecretManager, SecretEncryption};
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_common::util::stream_graph_visitor::{
    visit_stream_node, visit_stream_node_cont_mut,
};
use risingwave_common::{bail, bail_not_implemented, must_match};
use risingwave_connector::WithOptionsSecResolved;
use risingwave_connector::connector_common::validate_connection;
use risingwave_connector::source::{
    ConnectorProperties, SourceEnumeratorContext, UPSTREAM_SOURCE_KEY,
};
use risingwave_meta_model::actor_dispatcher::DispatcherType;
use risingwave_meta_model::exactly_once_iceberg_sink::{Column, Entity};
use risingwave_meta_model::object::ObjectType;
use risingwave_meta_model::{
    ConnectionId, DatabaseId, FunctionId, IndexId, ObjectId, SchemaId, SecretId, SinkId, SourceId,
    SubscriptionId, TableId, UserId, ViewId,
};
use risingwave_pb::catalog::{
    Comment, Connection, CreateType, Database, Function, PbSink, Schema, Secret, Sink, Source,
    Subscription, Table, View,
};
use risingwave_pb::ddl_service::alter_owner_request::Object;
use risingwave_pb::ddl_service::{
    DdlProgress, TableJobType, WaitVersion, alter_name_request, alter_set_schema_request,
    alter_swap_rename_request,
};
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{
    FragmentTypeFlag, MergeNode, PbDispatcherType, PbStreamFragmentGraph,
    StreamFragmentGraph as StreamFragmentGraphProto,
};
use risingwave_pb::telemetry::{PbTelemetryDatabaseObject, PbTelemetryEventStage};
use sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter};
use thiserror_ext::AsReport;
use tokio::sync::{Semaphore, oneshot};
use tokio::time::sleep;
use tracing::Instrument;

use crate::barrier::BarrierManagerRef;
use crate::controller::catalog::{DropTableConnectorContext, ReleaseContext};
use crate::controller::cluster::StreamingClusterInfo;
use crate::controller::streaming_job::SinkIntoTableContext;
use crate::error::{bail_invalid_parameter, bail_unavailable};
use crate::manager::{
    IGNORED_NOTIFICATION_VERSION, LocalNotification, MetaSrvEnv, MetadataManager,
    NotificationVersion, StreamingJob, StreamingJobType,
};
use crate::model::{
    DownstreamFragmentRelation, Fragment, StreamContext, StreamJobFragments,
    StreamJobFragmentsToCreate, TableParallelism,
};
use crate::stream::{
    ActorGraphBuildResult, ActorGraphBuilder, CompleteStreamFragmentGraph,
    CreateStreamingJobContext, CreateStreamingJobOption, GlobalStreamManagerRef,
    JobRescheduleTarget, ReplaceStreamJobContext, SourceChange, SourceManagerRef,
    StreamFragmentGraph, create_source_worker, validate_sink,
};
use crate::telemetry::report_event;
use crate::{MetaError, MetaResult};

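/// Behavior of a `DROP` statement: `Restrict` fails when dependent objects exist,
/// while `Cascade` drops dependent objects along with the target itself.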
#[derive(PartialEq)]
pub enum DropMode {
    Restrict,
    Cascade,
}

impl DropMode {
    pub fn from_request_setting(cascade: bool) -> DropMode {
        if cascade {
            DropMode::Cascade
        } else {
            DropMode::Restrict
        }
    }
}

pub enum StreamingJobId {
    MaterializedView(TableId),
    Sink(SinkId),
    Table(Option<SourceId>, TableId),
    Index(IndexId),
}

impl StreamingJobId {
    #[allow(dead_code)]
    fn id(&self) -> TableId {
        match self {
            StreamingJobId::MaterializedView(id)
            | StreamingJobId::Sink(id)
            | StreamingJobId::Table(_, id)
            | StreamingJobId::Index(id) => *id,
        }
    }
}

pub struct ReplaceStreamJobInfo {
    pub streaming_job: StreamingJob,
    pub fragment_graph: StreamFragmentGraphProto,
    pub col_index_mapping: Option<ColIndexMapping>,
}

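/// A DDL command together with its arguments, dispatched by
/// [`DdlController::run_command`].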
pub enum DdlCommand {
    CreateDatabase(Database),
    DropDatabase(DatabaseId),
    CreateSchema(Schema),
    DropSchema(SchemaId, DropMode),
    CreateNonSharedSource(Source),
    DropSource(SourceId, DropMode),
    CreateFunction(Function),
    DropFunction(FunctionId),
    CreateView(View),
    DropView(ViewId, DropMode),
    CreateStreamingJob(
        StreamingJob,
        StreamFragmentGraphProto,
        CreateType,
        Option<ReplaceStreamJobInfo>,
        HashSet<ObjectId>,
        Option<String>,
    ),
    DropStreamingJob(StreamingJobId, DropMode, Option<ReplaceStreamJobInfo>),
    AlterName(alter_name_request::Object, String),
    AlterSwapRename(alter_swap_rename_request::Object),
    ReplaceStreamJob(ReplaceStreamJobInfo),
    AlterNonSharedSource(Source),
    AlterObjectOwner(Object, UserId),
    AlterSetSchema(alter_set_schema_request::Object, SchemaId),
    CreateConnection(Connection),
    DropConnection(ConnectionId),
    CreateSecret(Secret),
    AlterSecret(Secret),
    DropSecret(SecretId),
    CommentOn(Comment),
    CreateSubscription(Subscription),
    DropSubscription(SubscriptionId, DropMode),
}

impl DdlCommand {
    fn allow_in_recovery(&self) -> bool {
        match self {
            DdlCommand::DropDatabase(_)
            | DdlCommand::DropSchema(_, _)
            | DdlCommand::DropSource(_, _)
            | DdlCommand::DropFunction(_)
            | DdlCommand::DropView(_, _)
            | DdlCommand::DropStreamingJob(_, _, _)
            | DdlCommand::DropConnection(_)
            | DdlCommand::DropSecret(_)
            | DdlCommand::DropSubscription(_, _)
            | DdlCommand::AlterName(_, _)
            | DdlCommand::AlterObjectOwner(_, _)
            | DdlCommand::AlterSetSchema(_, _)
            | DdlCommand::CreateDatabase(_)
            | DdlCommand::CreateSchema(_)
            | DdlCommand::CreateFunction(_)
            | DdlCommand::CreateView(_)
            | DdlCommand::CreateConnection(_)
            | DdlCommand::CommentOn(_)
            | DdlCommand::CreateSecret(_)
            | DdlCommand::AlterSecret(_)
            | DdlCommand::AlterSwapRename(_) => true,
            DdlCommand::CreateStreamingJob(_, _, _, _, _, _)
            | DdlCommand::CreateNonSharedSource(_)
            | DdlCommand::ReplaceStreamJob(_)
            | DdlCommand::AlterNonSharedSource(_)
            | DdlCommand::CreateSubscription(_) => false,
        }
    }
}

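/// Entry point of all DDL operations on the meta node. It wraps the metadata,
/// stream, source, and barrier managers and coordinates them to execute a
/// [`DdlCommand`].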
#[derive(Clone)]
pub struct DdlController {
    pub(crate) env: MetaSrvEnv,

    pub(crate) metadata_manager: MetadataManager,
    pub(crate) stream_manager: GlobalStreamManagerRef,
    pub(crate) source_manager: SourceManagerRef,
    barrier_manager: BarrierManagerRef,

    pub(crate) creating_streaming_job_permits: Arc<CreatingStreamingJobPermit>,
}

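/// A semaphore limiting how many streaming jobs may be created concurrently. The
/// permit count follows the `max_concurrent_creating_streaming_jobs` system
/// parameter; a value of `0` is treated as unlimited (`Semaphore::MAX_PERMITS`),
/// and changes to the parameter are applied on the fly.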
#[derive(Clone)]
pub struct CreatingStreamingJobPermit {
    pub(crate) semaphore: Arc<Semaphore>,
}

impl CreatingStreamingJobPermit {
    async fn new(env: &MetaSrvEnv) -> Self {
        let mut permits = env
            .system_params_reader()
            .await
            .max_concurrent_creating_streaming_jobs() as usize;
        if permits == 0 {
            permits = Semaphore::MAX_PERMITS;
        }
        let semaphore = Arc::new(Semaphore::new(permits));

        let (local_notification_tx, mut local_notification_rx) =
            tokio::sync::mpsc::unbounded_channel();
        env.notification_manager()
            .insert_local_sender(local_notification_tx)
            .await;
        let semaphore_clone = semaphore.clone();
        tokio::spawn(async move {
            while let Some(notification) = local_notification_rx.recv().await {
                let LocalNotification::SystemParamsChange(p) = &notification else {
                    continue;
                };
                let mut new_permits = p.max_concurrent_creating_streaming_jobs() as usize;
                if new_permits == 0 {
                    new_permits = Semaphore::MAX_PERMITS;
                }
                match permits.cmp(&new_permits) {
                    Ordering::Less => {
                        semaphore_clone.add_permits(new_permits - permits);
                    }
                    Ordering::Equal => continue,
                    Ordering::Greater => {
                        semaphore_clone
                            .acquire_many((permits - new_permits) as u32)
                            .await
                            .unwrap()
                            .forget();
                    }
                }
                tracing::info!(
                    "max_concurrent_creating_streaming_jobs changed from {} to {}",
                    permits,
                    new_permits
                );
                permits = new_permits;
            }
        });

        Self { semaphore }
    }
}

impl DdlController {
    pub async fn new(
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        stream_manager: GlobalStreamManagerRef,
        source_manager: SourceManagerRef,
        barrier_manager: BarrierManagerRef,
    ) -> Self {
        let creating_streaming_job_permits = Arc::new(CreatingStreamingJobPermit::new(&env).await);
        Self {
            env,
            metadata_manager,
            stream_manager,
            source_manager,
            barrier_manager,
            creating_streaming_job_permits,
        }
    }

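    /// Dispatches a [`DdlCommand`] to the corresponding handler. Commands that are
    /// not allowed during recovery are rejected early. The returned [`WaitVersion`]
    /// carries the catalog notification version and the current hummock version id,
    /// so the frontend can wait until both catalog and data changes become visible.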
    pub async fn run_command(&self, command: DdlCommand) -> MetaResult<Option<WaitVersion>> {
        if !command.allow_in_recovery() {
            self.barrier_manager.check_status_running()?;
        }
        let ctrl = self.clone();
        let fut = async move {
            match command {
                DdlCommand::CreateDatabase(database) => ctrl.create_database(database).await,
                DdlCommand::DropDatabase(database_id) => ctrl.drop_database(database_id).await,
                DdlCommand::CreateSchema(schema) => ctrl.create_schema(schema).await,
                DdlCommand::DropSchema(schema_id, drop_mode) => {
                    ctrl.drop_schema(schema_id, drop_mode).await
                }
                DdlCommand::CreateNonSharedSource(source) => {
                    ctrl.create_non_shared_source(source).await
                }
                DdlCommand::DropSource(source_id, drop_mode) => {
                    ctrl.drop_source(source_id, drop_mode).await
                }
                DdlCommand::CreateFunction(function) => ctrl.create_function(function).await,
                DdlCommand::DropFunction(function_id) => ctrl.drop_function(function_id).await,
                DdlCommand::CreateView(view) => ctrl.create_view(view).await,
                DdlCommand::DropView(view_id, drop_mode) => {
                    ctrl.drop_view(view_id, drop_mode).await
                }
                DdlCommand::CreateStreamingJob(
                    stream_job,
                    fragment_graph,
                    _create_type,
                    affected_table_replace_info,
                    dependencies,
                    specific_resource_group,
                ) => {
                    ctrl.create_streaming_job(
                        stream_job,
                        fragment_graph,
                        affected_table_replace_info,
                        dependencies,
                        specific_resource_group,
                    )
                    .await
                }
                DdlCommand::DropStreamingJob(job_id, drop_mode, target_replace_info) => {
                    ctrl.drop_streaming_job(job_id, drop_mode, target_replace_info)
                        .await
                }
                DdlCommand::ReplaceStreamJob(ReplaceStreamJobInfo {
                    streaming_job,
                    fragment_graph,
                    col_index_mapping,
                }) => {
                    ctrl.replace_job(streaming_job, fragment_graph, col_index_mapping)
                        .await
                }
                DdlCommand::AlterName(relation, name) => ctrl.alter_name(relation, &name).await,
                DdlCommand::AlterObjectOwner(object, owner_id) => {
                    ctrl.alter_owner(object, owner_id).await
                }
                DdlCommand::AlterSetSchema(object, new_schema_id) => {
                    ctrl.alter_set_schema(object, new_schema_id).await
                }
                DdlCommand::CreateConnection(connection) => {
                    ctrl.create_connection(connection).await
                }
                DdlCommand::DropConnection(connection_id) => {
                    ctrl.drop_connection(connection_id).await
                }
                DdlCommand::CreateSecret(secret) => ctrl.create_secret(secret).await,
                DdlCommand::DropSecret(secret_id) => ctrl.drop_secret(secret_id).await,
                DdlCommand::AlterSecret(secret) => ctrl.alter_secret(secret).await,
                DdlCommand::AlterNonSharedSource(source) => {
                    ctrl.alter_non_shared_source(source).await
                }
                DdlCommand::CommentOn(comment) => ctrl.comment_on(comment).await,
                DdlCommand::CreateSubscription(subscription) => {
                    ctrl.create_subscription(subscription).await
                }
                DdlCommand::DropSubscription(subscription_id, drop_mode) => {
                    ctrl.drop_subscription(subscription_id, drop_mode).await
                }
                DdlCommand::AlterSwapRename(objects) => ctrl.alter_swap_rename(objects).await,
            }
        }
        .in_current_span();
        let notification_version = tokio::spawn(fut).await.map_err(|e| anyhow!(e))??;
        Ok(Some(WaitVersion {
            catalog_version: notification_version,
            hummock_version_id: self.barrier_manager.get_hummock_version_id().await.to_u64(),
        }))
    }

    pub async fn get_ddl_progress(&self) -> MetaResult<Vec<DdlProgress>> {
        self.barrier_manager.get_ddl_progress().await
    }

    async fn create_database(&self, database: Database) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .create_database(database)
            .await
    }

    #[tracing::instrument(skip(self), level = "debug")]
    pub async fn reschedule_streaming_job(
        &self,
        job_id: u32,
        target: JobRescheduleTarget,
        mut deferred: bool,
    ) -> MetaResult<()> {
        tracing::info!("alter parallelism");
        if self.barrier_manager.check_status_running().is_err() {
            tracing::info!(
                "alter parallelism is set to deferred mode because the system is in recovery state"
            );
            deferred = true;
        }

        self.stream_manager
            .reschedule_streaming_job(job_id, target, deferred)
            .await
    }

    async fn drop_database(&self, database_id: DatabaseId) -> MetaResult<NotificationVersion> {
        self.drop_object(
            ObjectType::Database,
            database_id as _,
            DropMode::Cascade,
            None,
        )
        .await
    }

    async fn create_schema(&self, schema: Schema) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .create_schema(schema)
            .await
    }

    async fn drop_schema(
        &self,
        schema_id: SchemaId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Schema, schema_id as _, drop_mode, None)
            .await
    }

    async fn create_non_shared_source(&self, source: Source) -> MetaResult<NotificationVersion> {
        let handle = create_source_worker(&source, self.source_manager.metrics.clone())
            .await
            .context("failed to create source worker")?;

        let (source_id, version) = self
            .metadata_manager
            .catalog_controller
            .create_source(source)
            .await?;
        self.source_manager
            .register_source_with_handle(source_id, handle)
            .await;
        Ok(version)
    }

    async fn drop_source(
        &self,
        source_id: SourceId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Source, source_id as _, drop_mode, None)
            .await
    }

    async fn alter_non_shared_source(&self, source: Source) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .alter_non_shared_source(source)
            .await
    }

    async fn create_function(&self, function: Function) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .create_function(function)
            .await
    }

    async fn drop_function(&self, function_id: FunctionId) -> MetaResult<NotificationVersion> {
        self.drop_object(
            ObjectType::Function,
            function_id as _,
            DropMode::Restrict,
            None,
        )
        .await
    }

    async fn create_view(&self, view: View) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .create_view(view)
            .await
    }

    async fn drop_view(
        &self,
        view_id: ViewId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::View, view_id as _, drop_mode, None)
            .await
    }

    async fn create_connection(&self, connection: Connection) -> MetaResult<NotificationVersion> {
        validate_connection(&connection).await?;
        self.metadata_manager
            .catalog_controller
            .create_connection(connection)
            .await
    }

    async fn drop_connection(
        &self,
        connection_id: ConnectionId,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(
            ObjectType::Connection,
            connection_id as _,
            DropMode::Restrict,
            None,
        )
        .await
    }

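    /// Encrypts and serializes a secret's payload with the configured
    /// `secret_store_private_key`. Fails if the key is not configured.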
    fn get_encrypted_payload(&self, secret: &Secret) -> MetaResult<Vec<u8>> {
        let secret_store_private_key = self
            .env
            .opts
            .secret_store_private_key
            .clone()
            .ok_or_else(|| anyhow!("secret_store_private_key is not configured"))?;

        let encrypted_payload = SecretEncryption::encrypt(
            secret_store_private_key.as_slice(),
            secret.get_value().as_slice(),
        )
        .context(format!("failed to encrypt secret {}", secret.name))?;
        Ok(encrypted_payload
            .serialize()
            .context(format!("failed to serialize secret {}", secret.name))?)
    }

    async fn create_secret(&self, mut secret: Secret) -> MetaResult<NotificationVersion> {
        let secret_plain_payload = secret.value.clone();
        let encrypted_payload = self.get_encrypted_payload(&secret)?;
        secret.value = encrypted_payload;

        self.metadata_manager
            .catalog_controller
            .create_secret(secret, secret_plain_payload)
            .await
    }

    async fn drop_secret(&self, secret_id: SecretId) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Secret, secret_id as _, DropMode::Restrict, None)
            .await
    }

    async fn alter_secret(&self, mut secret: Secret) -> MetaResult<NotificationVersion> {
        let secret_plain_payload = secret.value.clone();
        let encrypted_payload = self.get_encrypted_payload(&secret)?;
        secret.value = encrypted_payload;
        self.metadata_manager
            .catalog_controller
            .alter_secret(secret, secret_plain_payload)
            .await
    }

    async fn create_subscription(
        &self,
        mut subscription: Subscription,
    ) -> MetaResult<NotificationVersion> {
        tracing::debug!("create subscription");
        let _permit = self
            .creating_streaming_job_permits
            .semaphore
            .acquire()
            .await
            .unwrap();
        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
        self.metadata_manager
            .catalog_controller
            .create_subscription_catalog(&mut subscription)
            .await?;
        self.stream_manager
            .create_subscription(&subscription)
            .await
            .inspect_err(|e| {
                tracing::debug!(error = %e.as_report(), "cancel create subscription");
            })?;

        let version = self
            .metadata_manager
            .catalog_controller
            .notify_create_subscription(subscription.id)
            .await?;
        tracing::debug!("finish create subscription");
        Ok(version)
    }

    async fn drop_subscription(
        &self,
        subscription_id: SubscriptionId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        tracing::debug!("preparing drop subscription");
        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
        let subscription = self
            .metadata_manager
            .catalog_controller
            .get_subscription_by_id(subscription_id)
            .await?;
        let table_id = subscription.dependent_table_id;
        let database_id = subscription.database_id.into();
        let (_, version) = self
            .metadata_manager
            .catalog_controller
            .drop_object(ObjectType::Subscription, subscription_id as _, drop_mode)
            .await?;
        self.stream_manager
            .drop_subscription(database_id, subscription_id as _, table_id)
            .await;
        tracing::debug!("finish drop subscription");
        Ok(version)
    }

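    /// Validates a CDC table before creation: the job must contain exactly one
    /// stream scan fragment with a single actor, and the CDC source must be
    /// reachable (checked by creating a split enumerator against the upstream
    /// external database).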
    pub(crate) async fn validate_cdc_table(
        table: &Table,
        table_fragments: &StreamJobFragments,
    ) -> MetaResult<()> {
        let stream_scan_fragment = table_fragments
            .fragments
            .values()
            .filter(|f| f.fragment_type_mask & FragmentTypeFlag::StreamScan as u32 != 0)
            .exactly_one()
            .ok()
            .with_context(|| {
                format!(
                    "expect exactly one stream scan fragment, got: {:?}",
                    table_fragments.fragments
                )
            })?;

        assert_eq!(
            stream_scan_fragment.actors.len(),
            1,
            "Stream scan fragment should have only one actor"
        );
        let mut found_cdc_scan = false;
        match &stream_scan_fragment.nodes.node_body {
            Some(NodeBody::StreamCdcScan(_)) => {
                if Self::validate_cdc_table_inner(&stream_scan_fragment.nodes.node_body, table.id)
                    .await?
                {
                    found_cdc_scan = true;
                }
            }
            // The CDC scan may be wrapped in a project node, so check its inputs too.
            Some(NodeBody::Project(_)) => {
                for input in &stream_scan_fragment.nodes.input {
                    if Self::validate_cdc_table_inner(&input.node_body, table.id).await? {
                        found_cdc_scan = true;
                    }
                }
            }
            _ => {
                bail!("Unexpected node body for stream cdc scan");
            }
        };
        if !found_cdc_scan {
            bail!("No stream cdc scan node found in stream scan fragment");
        }
        Ok(())
    }

    async fn validate_cdc_table_inner(
        node_body: &Option<NodeBody>,
        table_id: u32,
    ) -> MetaResult<bool> {
        if let Some(NodeBody::StreamCdcScan(stream_cdc_scan)) = node_body
            && let Some(ref cdc_table_desc) = stream_cdc_scan.cdc_table_desc
        {
            let options_with_secret = WithOptionsSecResolved::new(
                cdc_table_desc.connect_properties.clone(),
                cdc_table_desc.secret_refs.clone(),
            );

            let mut props = ConnectorProperties::extract(options_with_secret, true)?;
            props.init_from_pb_cdc_table_desc(cdc_table_desc);

            // Creating a split enumerator validates connectivity to the upstream
            // external database.
            let _enumerator = props
                .create_split_enumerator(SourceEnumeratorContext::dummy().into())
                .await?;

            tracing::debug!(?table_id, "validate cdc table success");
            Ok(true)
        } else {
            Ok(false)
        }
    }

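    /// Builds a replace-table job for a "sink into table" change: it rebuilds the
    /// target table's plan and re-injects the merger for every incoming sink (the
    /// one being created, if any, and all existing ones except the one being
    /// dropped).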
    pub(crate) async fn inject_replace_table_job_for_table_sink(
        &self,
        tmp_id: u32,
        mgr: &MetadataManager,
        stream_ctx: StreamContext,
        sink: Option<&Sink>,
        creating_sink_table_fragments: Option<&StreamJobFragments>,
        dropping_sink_id: Option<SinkId>,
        streaming_job: &StreamingJob,
        fragment_graph: StreamFragmentGraph,
    ) -> MetaResult<(ReplaceStreamJobContext, StreamJobFragmentsToCreate)> {
        let (mut replace_table_ctx, mut stream_job_fragments) = self
            .build_replace_job(stream_ctx, streaming_job, fragment_graph, None, tmp_id as _)
            .await?;

        let target_table = streaming_job.table().unwrap();

        if let Some(creating_sink_table_fragments) = creating_sink_table_fragments {
            let sink_fragment = creating_sink_table_fragments.sink_fragment().unwrap();
            let sink = sink.expect("sink not found");
            Self::inject_replace_table_plan_for_sink(
                sink.id,
                &sink_fragment,
                target_table,
                &mut replace_table_ctx,
                stream_job_fragments.inner.union_fragment_for_table(),
                None,
            );
        }

        let [table_catalog]: [_; 1] = mgr
            .get_table_catalog_by_ids(vec![target_table.id])
            .await?
            .try_into()
            .expect("Target table should exist in sink into table");

        assert_eq!(table_catalog.incoming_sinks, target_table.incoming_sinks);

        {
            let catalogs = mgr
                .get_sink_catalog_by_ids(&table_catalog.incoming_sinks)
                .await?;

            for sink in catalogs {
                let sink_id = sink.id;

                if let Some(dropping_sink_id) = dropping_sink_id
                    && sink_id == (dropping_sink_id as u32)
                {
                    continue;
                };

                let sink_table_fragments = mgr
                    .get_job_fragments_by_id(&risingwave_common::catalog::TableId::new(sink_id))
                    .await?;

                let sink_fragment = sink_table_fragments.sink_fragment().unwrap();

                Self::inject_replace_table_plan_for_sink(
                    sink_id,
                    &sink_fragment,
                    target_table,
                    &mut replace_table_ctx,
                    stream_job_fragments.inner.union_fragment_for_table(),
                    Some(&sink.unique_identity()),
                );
            }
        }

        // Sanity check: every merger for the union must have been assigned a
        // matching upstream-downstream relation by the injections above.
        for fragment in stream_job_fragments.fragments.values() {
            visit_stream_node(&fragment.nodes, |node| {
                if let NodeBody::Merge(merge_node) = node {
                    let upstream_fragment_id = merge_node.upstream_fragment_id;
                    if let Some(external_upstream_fragment_downstreams) = replace_table_ctx
                        .upstream_fragment_downstreams
                        .get(&upstream_fragment_id)
                    {
                        let mut upstream_fragment_downstreams =
                            external_upstream_fragment_downstreams.iter().filter(
                                |downstream| {
                                    downstream.downstream_fragment_id == fragment.fragment_id
                                },
                            );
                        assert!(
                            upstream_fragment_downstreams.next().is_some(),
                            "All the mergers for the union should have been fully assigned beforehand."
                        );
                    } else {
                        let mut upstream_fragment_downstreams = stream_job_fragments
                            .downstreams
                            .get(&upstream_fragment_id)
                            .into_iter()
                            .flatten()
                            .filter(|downstream| {
                                downstream.downstream_fragment_id == fragment.fragment_id
                            });
                        assert!(
                            upstream_fragment_downstreams.next().is_some(),
                            "All the mergers for the union should have been fully assigned beforehand."
                        );
                    }
                }
            });
        }

        Ok((replace_table_ctx, stream_job_fragments))
    }

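    /// Rewires one sink into the union fragment of the target table: registers the
    /// sink fragment as an upstream of the union fragment and patches the matching
    /// project/merge input in place.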
    pub(crate) fn inject_replace_table_plan_for_sink(
        sink_id: u32,
        sink_fragment: &Fragment,
        table: &Table,
        replace_table_ctx: &mut ReplaceStreamJobContext,
        union_fragment: &mut Fragment,
        unique_identity: Option<&str>,
    ) {
        let sink_fields = sink_fragment.nodes.fields.clone();

        let output_indices = sink_fields
            .iter()
            .enumerate()
            .map(|(idx, _)| idx as _)
            .collect_vec();

        let dist_key_indices = table.distribution_key.iter().map(|i| *i as _).collect_vec();

        let sink_fragment_downstreams = replace_table_ctx
            .upstream_fragment_downstreams
            .entry(sink_fragment.fragment_id)
            .or_default();

        sink_fragment_downstreams.push(DownstreamFragmentRelation {
            downstream_fragment_id: union_fragment.fragment_id,
            dispatcher_type: DispatcherType::Hash,
            dist_key_indices: dist_key_indices.clone(),
            output_indices: output_indices.clone(),
        });

        let upstream_fragment_id = sink_fragment.fragment_id;

        visit_stream_node_cont_mut(&mut union_fragment.nodes, |node| {
            if let Some(NodeBody::Union(_)) = &mut node.node_body {
                for input_project_node in &mut node.input {
                    if let Some(NodeBody::Project(_)) = &mut input_project_node.node_body {
                        let merge_stream_node =
                            input_project_node.input.iter_mut().exactly_one().unwrap();

                        if input_project_node.identity.as_str()
                            != unique_identity
                                .unwrap_or(PbSink::UNIQUE_IDENTITY_FOR_CREATING_TABLE_SINK)
                        {
                            continue;
                        }

                        if let Some(NodeBody::Merge(merge_node)) =
                            &mut merge_stream_node.node_body
                        {
                            merge_stream_node.identity =
                                format!("MergeExecutor(from sink {})", sink_id);

                            input_project_node.identity =
                                format!("ProjectExecutor(from sink {})", sink_id);

                            **merge_node = {
                                #[expect(deprecated)]
                                MergeNode {
                                    upstream_actor_id: vec![],
                                    upstream_fragment_id,
                                    upstream_dispatcher_type: PbDispatcherType::Hash as _,
                                    fields: sink_fields.to_vec(),
                                }
                            };

                            merge_stream_node.fields = sink_fields.to_vec();

                            return false;
                        }
                    }
                }
            }
            true
        });
    }

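    /// Creates a streaming job (materialized view, table, sink, source, or index).
    /// The job catalog is created first; on failure the job is aborted and, for
    /// jobs with an associated source, the source worker is cleaned up as well.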
    pub async fn create_streaming_job(
        &self,
        mut streaming_job: StreamingJob,
        fragment_graph: StreamFragmentGraphProto,
        affected_table_replace_info: Option<ReplaceStreamJobInfo>,
        dependencies: HashSet<ObjectId>,
        specific_resource_group: Option<String>,
    ) -> MetaResult<NotificationVersion> {
        let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());
        self.metadata_manager
            .catalog_controller
            .create_job_catalog(
                &mut streaming_job,
                &ctx,
                &fragment_graph.parallelism,
                fragment_graph.max_parallelism as _,
                dependencies,
                specific_resource_group.clone(),
            )
            .await?;
        let job_id = streaming_job.id();

        tracing::debug!(
            id = job_id,
            definition = streaming_job.definition(),
            create_type = streaming_job.create_type().as_str_name(),
            "starting streaming job",
        );
        let _permit = self
            .creating_streaming_job_permits
            .semaphore
            .acquire()
            .await
            .unwrap();
        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;

        let name = streaming_job.name();
        let definition = streaming_job.definition();
        let source_id = match &streaming_job {
            StreamingJob::Table(Some(src), _, _) | StreamingJob::Source(src) => Some(src.id),
            _ => None,
        };

        match self
            .create_streaming_job_inner(
                ctx,
                streaming_job,
                fragment_graph,
                affected_table_replace_info,
                specific_resource_group,
            )
            .await
        {
            Ok(version) => Ok(version),
            Err(err) => {
                tracing::error!(id = job_id, error = %err.as_report(), "failed to create streaming job");
                let event = risingwave_pb::meta::event_log::EventCreateStreamJobFail {
                    id: job_id,
                    name,
                    definition,
                    error: err.as_report().to_string(),
                };
                self.env.event_log_manager_ref().add_event_logs(vec![
                    risingwave_pb::meta::event_log::Event::CreateStreamJobFail(event),
                ]);
                let (aborted, _) = self
                    .metadata_manager
                    .catalog_controller
                    .try_abort_creating_streaming_job(job_id as _, false)
                    .await?;
                if aborted {
                    tracing::warn!(id = job_id, "aborted streaming job");
                    if let Some(source_id) = source_id {
                        self.source_manager
                            .apply_source_change(SourceChange::DropSource {
                                dropped_source_ids: vec![source_id as SourceId],
                            })
                            .await;
                    }
                }
                Err(err)
            }
        }
    }

    async fn create_streaming_job_inner(
        &self,
        ctx: StreamContext,
        mut streaming_job: StreamingJob,
        fragment_graph: StreamFragmentGraphProto,
        affected_table_replace_info: Option<ReplaceStreamJobInfo>,
        specific_resource_group: Option<String>,
    ) -> MetaResult<NotificationVersion> {
        let mut fragment_graph =
            StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
        streaming_job.set_info_from_graph(&fragment_graph);

        let incomplete_internal_tables = fragment_graph
            .incomplete_internal_tables()
            .into_values()
            .collect_vec();
        let table_id_map = self
            .metadata_manager
            .catalog_controller
            .create_internal_table_catalog(&streaming_job, incomplete_internal_tables)
            .await?;
        fragment_graph.refill_internal_table_ids(table_id_map);

        let affected_table_replace_info = match affected_table_replace_info {
            Some(replace_table_info) => {
                assert!(
                    specific_resource_group.is_none(),
                    "specific_resource_group is not supported for replace table (alter column or sink into table)"
                );

                let ReplaceStreamJobInfo {
                    mut streaming_job,
                    fragment_graph,
                    ..
                } = replace_table_info;

                let original_max_parallelism = self
                    .metadata_manager
                    .get_job_max_parallelism(streaming_job.id().into())
                    .await?;
                let fragment_graph = PbStreamFragmentGraph {
                    max_parallelism: original_max_parallelism as _,
                    ..fragment_graph
                };

                let fragment_graph =
                    StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
                streaming_job.set_info_from_graph(&fragment_graph);
                let streaming_job = streaming_job;

                Some((streaming_job, fragment_graph))
            }
            None => None,
        };

        tracing::debug!(id = streaming_job.id(), "building streaming job");
        let (ctx, stream_job_fragments) = self
            .build_stream_job(
                ctx,
                streaming_job,
                fragment_graph,
                affected_table_replace_info,
                specific_resource_group,
            )
            .await?;

        let streaming_job = &ctx.streaming_job;

        match streaming_job {
            StreamingJob::Table(None, table, TableJobType::SharedCdcSource) => {
                Self::validate_cdc_table(table, &stream_job_fragments).await?;
            }
            StreamingJob::Table(Some(source), ..) => {
                self.source_manager.register_source(source).await?;
                let connector_name = source
                    .get_with_properties()
                    .get(UPSTREAM_SOURCE_KEY)
                    .cloned();
                let attr = source.info.as_ref().map(|source_info| {
                    jsonbb::json!({
                        "format": source_info.format().as_str_name(),
                        "encode": source_info.row_encode().as_str_name(),
                    })
                });
                report_create_object(
                    streaming_job.id(),
                    "source",
                    PbTelemetryDatabaseObject::Source,
                    connector_name,
                    attr,
                );
            }
            StreamingJob::Sink(sink, _) => {
                validate_sink(sink).await?;
                let connector_name = sink.get_properties().get(UPSTREAM_SOURCE_KEY).cloned();
                let attr = sink.format_desc.as_ref().map(|sink_info| {
                    jsonbb::json!({
                        "format": sink_info.format().as_str_name(),
                        "encode": sink_info.encode().as_str_name(),
                    })
                });
                report_create_object(
                    streaming_job.id(),
                    "sink",
                    PbTelemetryDatabaseObject::Sink,
                    connector_name,
                    attr,
                );
            }
            StreamingJob::Source(source) => {
                self.source_manager.register_source(source).await?;
                let connector_name = source
                    .get_with_properties()
                    .get(UPSTREAM_SOURCE_KEY)
                    .cloned();
                let attr = source.info.as_ref().map(|source_info| {
                    jsonbb::json!({
                        "format": source_info.format().as_str_name(),
                        "encode": source_info.row_encode().as_str_name(),
                    })
                });
                report_create_object(
                    streaming_job.id(),
                    "source",
                    PbTelemetryDatabaseObject::Source,
                    connector_name,
                    attr,
                );
            }
            _ => {}
        }

        self.metadata_manager
            .catalog_controller
            .prepare_streaming_job(&stream_job_fragments, streaming_job, false)
            .await?;

        let stream_job_id = streaming_job.id();
        match (streaming_job.create_type(), &streaming_job) {
            (CreateType::Unspecified, _)
            | (CreateType::Foreground, _)
            | (CreateType::Background, StreamingJob::Sink(_, _)) => {
                let version = self
                    .stream_manager
                    .create_streaming_job(stream_job_fragments, ctx, None)
                    .await?;
                Ok(version)
            }
            (CreateType::Background, _) => {
                let ctrl = self.clone();
                let (tx, rx) = oneshot::channel();
                let fut = async move {
                    let _ = ctrl
                        .stream_manager
                        .create_streaming_job(stream_job_fragments, ctx, Some(tx))
                        .await
                        .inspect_err(|err| {
                            tracing::error!(id = stream_job_id, error = ?err.as_report(), "failed to create background streaming job");
                        });
                };
                tokio::spawn(fut);
                rx.await.map_err(|_| {
                    anyhow!(
                        "failed to receive create streaming job result of job: {}",
                        stream_job_id
                    )
                })??;
                Ok(IGNORED_NOTIFICATION_VERSION)
            }
        }
    }

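    /// Drops an object and releases everything it owns. For a sink created with
    /// "sink into table", `target_replace_info` triggers an additional
    /// replace-table job that detaches the sink from the target table's union
    /// fragment.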
    pub async fn drop_object(
        &self,
        object_type: ObjectType,
        object_id: ObjectId,
        drop_mode: DropMode,
        target_replace_info: Option<ReplaceStreamJobInfo>,
    ) -> MetaResult<NotificationVersion> {
        let (release_ctx, mut version) = self
            .metadata_manager
            .catalog_controller
            .drop_object(object_type, object_id, drop_mode)
            .await?;

        if let Some(replace_table_info) = target_replace_info {
            let stream_ctx =
                StreamContext::from_protobuf(replace_table_info.fragment_graph.get_ctx().unwrap());

            let ReplaceStreamJobInfo {
                mut streaming_job,
                fragment_graph,
                ..
            } = replace_table_info;

            let sink_id = if let ObjectType::Sink = object_type {
                object_id as _
            } else {
                panic!("additional replace table event only occurs when dropping sink into table")
            };

            let original_max_parallelism = self
                .metadata_manager
                .get_job_max_parallelism(streaming_job.id().into())
                .await?;
            let fragment_graph = PbStreamFragmentGraph {
                max_parallelism: original_max_parallelism as _,
                ..fragment_graph
            };

            let fragment_graph =
                StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
            streaming_job.set_info_from_graph(&fragment_graph);
            let streaming_job = streaming_job;

            streaming_job.table().expect("should be table job");

            tracing::debug!(id = streaming_job.id(), "replacing table for dropped sink");
            let tmp_id = self
                .metadata_manager
                .catalog_controller
                .create_job_catalog_for_replace(
                    &streaming_job,
                    &stream_ctx,
                    &fragment_graph.specified_parallelism(),
                    fragment_graph.max_parallelism(),
                )
                .await? as u32;

            let (ctx, stream_job_fragments) = self
                .inject_replace_table_job_for_table_sink(
                    tmp_id,
                    &self.metadata_manager,
                    stream_ctx,
                    None,
                    None,
                    Some(sink_id),
                    &streaming_job,
                    fragment_graph,
                )
                .await?;

            let result: MetaResult<_> = try {
                let replace_upstream = ctx.replace_upstream.clone();

                self.metadata_manager
                    .catalog_controller
                    .prepare_streaming_job(&stream_job_fragments, &streaming_job, true)
                    .await?;

                self.stream_manager
                    .replace_stream_job(stream_job_fragments, ctx)
                    .await?;

                replace_upstream
            };

            version = match result {
                Ok(replace_upstream) => {
                    let version = self
                        .metadata_manager
                        .catalog_controller
                        .finish_replace_streaming_job(
                            tmp_id as _,
                            streaming_job,
                            replace_upstream,
                            None,
                            SinkIntoTableContext {
                                creating_sink_id: None,
                                dropping_sink_id: Some(sink_id),
                                updated_sink_catalogs: vec![],
                            },
                            None,
                        )
                        .await?;
                    Ok(version)
                }
                Err(err) => {
                    tracing::error!(id = object_id, error = ?err.as_report(), "failed to replace table");
                    let _ = self
                        .metadata_manager
                        .catalog_controller
                        .try_abort_replacing_streaming_job(tmp_id as _)
                        .await
                        .inspect_err(|err| {
                            tracing::error!(id = object_id, error = ?err.as_report(), "failed to abort replacing table");
                        });
                    Err(err)
                }
            }?;
        }

        let ReleaseContext {
            database_id,
            removed_streaming_job_ids,
            removed_state_table_ids,
            removed_source_ids,
            removed_secret_ids: secret_ids,
            removed_source_fragments,
            removed_actors,
            removed_fragments,
        } = release_ctx;

        let _guard = self.source_manager.pause_tick().await;
        self.stream_manager
            .drop_streaming_jobs(
                risingwave_common::catalog::DatabaseId::new(database_id as _),
                removed_actors.iter().map(|id| *id as _).collect(),
                removed_streaming_job_ids,
                removed_state_table_ids,
                removed_fragments.iter().map(|id| *id as _).collect(),
            )
            .await;

        self.source_manager
            .apply_source_change(SourceChange::DropSource {
                dropped_source_ids: removed_source_ids.into_iter().map(|id| id as _).collect(),
            })
            .await;

        let dropped_source_fragments = removed_source_fragments
            .into_iter()
            .map(|(source_id, fragments)| {
                (
                    source_id,
                    fragments.into_iter().map(|id| id as u32).collect(),
                )
            })
            .collect();
        let dropped_actors = removed_actors.iter().map(|id| *id as _).collect();
        self.source_manager
            .apply_source_change(SourceChange::DropMv {
                dropped_source_fragments,
                dropped_actors,
            })
            .await;

        for secret in secret_ids {
            LocalSecretManager::global().remove_secret(secret as _);
        }
        Ok(version)
    }

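    /// Replaces an existing streaming job's plan (e.g. an `ALTER TABLE` schema
    /// change). Only tables and sources are supported; the new plan keeps the job's
    /// original max parallelism and is swapped in under a temporary job id that is
    /// either finalized or aborted.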
    pub async fn replace_job(
        &self,
        mut streaming_job: StreamingJob,
        fragment_graph: StreamFragmentGraphProto,
        col_index_mapping: Option<ColIndexMapping>,
    ) -> MetaResult<NotificationVersion> {
        match &mut streaming_job {
            StreamingJob::Table(..) | StreamingJob::Source(..) => {}
            StreamingJob::MaterializedView(..)
            | StreamingJob::Sink(..)
            | StreamingJob::Index(..) => {
                bail_not_implemented!("schema change for {}", streaming_job.job_type_str())
            }
        }

        let job_id = streaming_job.id();

        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
        let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());

        let original_max_parallelism = self
            .metadata_manager
            .get_job_max_parallelism(streaming_job.id().into())
            .await?;
        let fragment_graph = PbStreamFragmentGraph {
            max_parallelism: original_max_parallelism as _,
            ..fragment_graph
        };

        let fragment_graph = StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
        streaming_job.set_info_from_graph(&fragment_graph);

        let streaming_job = streaming_job;

        let tmp_id = self
            .metadata_manager
            .catalog_controller
            .create_job_catalog_for_replace(
                &streaming_job,
                &ctx,
                &fragment_graph.specified_parallelism(),
                fragment_graph.max_parallelism(),
            )
            .await?;

        tracing::debug!(id = job_id, "building replace streaming job");
        let mut updated_sink_catalogs = vec![];

        let mut drop_table_connector_ctx = None;
        let result: MetaResult<_> = try {
            let (mut ctx, mut stream_job_fragments) = self
                .build_replace_job(
                    ctx,
                    &streaming_job,
                    fragment_graph,
                    col_index_mapping.as_ref(),
                    tmp_id as _,
                )
                .await?;
            drop_table_connector_ctx = ctx.drop_table_connector_ctx.clone();

            if let StreamingJob::Table(_, table, ..) = &streaming_job {
                let catalogs = self
                    .metadata_manager
                    .get_sink_catalog_by_ids(&table.incoming_sinks)
                    .await?;

                for sink in catalogs {
                    let sink_id = &sink.id;

                    let sink_table_fragments = self
                        .metadata_manager
                        .get_job_fragments_by_id(&risingwave_common::catalog::TableId::new(
                            *sink_id,
                        ))
                        .await?;

                    let sink_fragment = sink_table_fragments.sink_fragment().unwrap();

                    Self::inject_replace_table_plan_for_sink(
                        *sink_id,
                        &sink_fragment,
                        table,
                        &mut ctx,
                        stream_job_fragments.inner.union_fragment_for_table(),
                        Some(&sink.unique_identity()),
                    );

                    if sink.original_target_columns.is_empty() {
                        updated_sink_catalogs.push(sink.id as _);
                    }
                }
            }

            let replace_upstream = ctx.replace_upstream.clone();

            self.metadata_manager
                .catalog_controller
                .prepare_streaming_job(&stream_job_fragments, &streaming_job, true)
                .await?;

            self.stream_manager
                .replace_stream_job(stream_job_fragments, ctx)
                .await?;
            replace_upstream
        };

        match result {
            Ok(replace_upstream) => {
                let version = self
                    .metadata_manager
                    .catalog_controller
                    .finish_replace_streaming_job(
                        tmp_id,
                        streaming_job,
                        replace_upstream,
                        col_index_mapping,
                        SinkIntoTableContext {
                            creating_sink_id: None,
                            dropping_sink_id: None,
                            updated_sink_catalogs,
                        },
                        drop_table_connector_ctx.as_ref(),
                    )
                    .await?;
                if let Some(drop_table_connector_ctx) = &drop_table_connector_ctx {
                    self.source_manager
                        .apply_source_change(SourceChange::DropSource {
                            dropped_source_ids: vec![drop_table_connector_ctx.to_remove_source_id],
                        })
                        .await;
                }
                Ok(version)
            }
            Err(err) => {
                tracing::error!(id = job_id, error = ?err.as_report(), "failed to replace job");
                let _ = self
                    .metadata_manager
                    .catalog_controller
                    .try_abort_replacing_streaming_job(tmp_id)
                    .await
                    .inspect_err(|err| {
                        tracing::error!(id = job_id, error = ?err.as_report(), "failed to abort replacing job");
                    });
                Err(err)
            }
        }
    }

    async fn drop_streaming_job(
        &self,
        job_id: StreamingJobId,
        drop_mode: DropMode,
        target_replace_info: Option<ReplaceStreamJobInfo>,
    ) -> MetaResult<NotificationVersion> {
        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;

        let (object_id, object_type) = match job_id {
            StreamingJobId::MaterializedView(id) => (id as _, ObjectType::Table),
            StreamingJobId::Sink(id) => (id as _, ObjectType::Sink),
            StreamingJobId::Table(_, id) => (id as _, ObjectType::Table),
            StreamingJobId::Index(idx) => (idx as _, ObjectType::Index),
        };

        let version = self
            .drop_object(object_type, object_id, drop_mode, target_replace_info)
            .await?;
        #[cfg(not(madsim))]
        if let StreamingJobId::Sink(sink_id) = job_id {
            let db = self.env.meta_store_ref().conn.clone();
            clean_all_rows_by_sink_id(&db, sink_id).await?;
        }
        Ok(version)
    }

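    /// Resolves the parallelism to schedule a new job with, validating the
    /// user-specified value against the job's max parallelism and the slots
    /// currently available in the resource group, and falling back to the
    /// configured default otherwise.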
    fn resolve_stream_parallelism(
        &self,
        specified: Option<NonZeroUsize>,
        max: NonZeroUsize,
        cluster_info: &StreamingClusterInfo,
        resource_group: String,
    ) -> MetaResult<NonZeroUsize> {
        let available = cluster_info.parallelism(resource_group);
        let Some(available) = NonZeroUsize::new(available) else {
            bail_unavailable!("no available slots to schedule");
        };

        if let Some(specified) = specified {
            if specified > max {
                bail_invalid_parameter!(
                    "specified parallelism {} should not exceed max parallelism {}",
                    specified,
                    max,
                );
            }
            if specified > available {
                bail_unavailable!(
                    "not enough parallelism to schedule, required: {}, available: {}",
                    specified,
                    available,
                );
            }
            Ok(specified)
        } else {
            let default_parallelism = match self.env.opts.default_parallelism {
                DefaultParallelism::Full => available,
                DefaultParallelism::Default(num) => {
                    if num > available {
                        bail_unavailable!(
                            "not enough parallelism to schedule, required: {}, available: {}",
                            num,
                            available,
                        );
                    }
                    num
                }
            };

            if default_parallelism > max {
                tracing::warn!(
                    "too many parallelism available, use max parallelism {} instead",
                    max
                );
            }
            Ok(default_parallelism.min(max))
        }
    }

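    /// Builds a [`CreateStreamingJobContext`] and the fragments for a new streaming
    /// job: resolves upstream root fragments, decides parallelism and resource
    /// group, builds the actor graph, and prepares an optional replace-table job
    /// when sinking into a table.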
    pub(crate) async fn build_stream_job(
        &self,
        stream_ctx: StreamContext,
        mut stream_job: StreamingJob,
        fragment_graph: StreamFragmentGraph,
        affected_table_replace_info: Option<(StreamingJob, StreamFragmentGraph)>,
        specific_resource_group: Option<String>,
    ) -> MetaResult<(CreateStreamingJobContext, StreamJobFragmentsToCreate)> {
        let id = stream_job.id();
        let specified_parallelism = fragment_graph.specified_parallelism();
        let expr_context = stream_ctx.to_expr_context();
        let max_parallelism = NonZeroUsize::new(fragment_graph.max_parallelism()).unwrap();

        let (snapshot_backfill_info, cross_db_snapshot_backfill_info) =
            fragment_graph.collect_snapshot_backfill_info()?;

        self.metadata_manager
            .catalog_controller
            .validate_cross_db_snapshot_backfill(&cross_db_snapshot_backfill_info)
            .await?;

        let upstream_table_ids = fragment_graph
            .dependent_table_ids()
            .iter()
            .filter(|id| {
                !cross_db_snapshot_backfill_info
                    .upstream_mv_table_id_to_backfill_epoch
                    .contains_key(id)
            })
            .cloned()
            .collect();

        let (upstream_root_fragments, existing_actor_location) = self
            .metadata_manager
            .get_upstream_root_fragments(&upstream_table_ids)
            .await?;

        if snapshot_backfill_info.is_some() {
            if stream_job.create_type() == CreateType::Background {
                return Err(anyhow!("snapshot_backfill must be used as Foreground mode").into());
            }
            match stream_job {
                StreamingJob::MaterializedView(_)
                | StreamingJob::Sink(_, _)
                | StreamingJob::Index(_, _) => {}
                StreamingJob::Table(_, _, _) | StreamingJob::Source(_) => {
                    return Err(
                        anyhow!("snapshot_backfill not enabled for table and source").into(),
                    );
                }
            }
        }

        let upstream_actors = upstream_root_fragments
            .values()
            .map(|fragment| {
                (
                    fragment.fragment_id,
                    fragment.actors.iter().map(|actor| actor.actor_id).collect(),
                )
            })
            .collect();

        let complete_graph = CompleteStreamFragmentGraph::with_upstreams(
            fragment_graph,
            upstream_root_fragments,
            existing_actor_location,
            (&stream_job).into(),
        )?;

        let resource_group = match specific_resource_group {
            None => {
                self.metadata_manager
                    .get_database_resource_group(stream_job.database_id() as ObjectId)
                    .await?
            }
            Some(resource_group) => resource_group,
        };

        let cluster_info = self.metadata_manager.get_streaming_cluster_info().await?;

        let parallelism = self.resolve_stream_parallelism(
            specified_parallelism,
            max_parallelism,
            &cluster_info,
            resource_group.clone(),
        )?;

        let parallelism = self
            .env
            .system_params_reader()
            .await
            .adaptive_parallelism_strategy()
            .compute_target_parallelism(parallelism.get());

        let parallelism = NonZeroUsize::new(parallelism).expect("parallelism must be positive");
        let actor_graph_builder = ActorGraphBuilder::new(
            id,
            resource_group,
            complete_graph,
            cluster_info,
            parallelism,
        )?;

        let ActorGraphBuildResult {
            graph,
            downstream_fragment_relations,
            building_locations,
            existing_locations,
            upstream_fragment_downstreams,
            new_no_shuffle,
            replace_upstream,
            ..
        } = actor_graph_builder.generate_graph(&self.env, &stream_job, expr_context)?;
        assert!(replace_upstream.is_empty());

        let table_parallelism = match (specified_parallelism, &self.env.opts.default_parallelism) {
            (None, DefaultParallelism::Full) => TableParallelism::Adaptive,
            _ => TableParallelism::Fixed(parallelism.get()),
        };

        let stream_job_fragments = StreamJobFragments::new(
            id.into(),
            graph,
            &building_locations.actor_locations,
            stream_ctx.clone(),
            table_parallelism,
            max_parallelism.get(),
        );
        let internal_tables = stream_job_fragments.internal_tables();

        if let Some(mview_fragment) = stream_job_fragments.mview_fragment() {
            stream_job.set_table_vnode_count(mview_fragment.vnode_count());
        }

        let replace_table_job_info = match affected_table_replace_info {
            Some((table_stream_job, fragment_graph)) => {
                if snapshot_backfill_info.is_some() {
                    return Err(anyhow!(
                        "snapshot backfill should not have replace table info: {table_stream_job:?}"
                    )
                    .into());
                }
                let StreamingJob::Sink(sink, target_table) = &mut stream_job else {
                    bail!("additional replace table event only occurs when sinking into table");
                };

                table_stream_job.table().expect("should be table job");
                let tmp_id = self
                    .metadata_manager
                    .catalog_controller
                    .create_job_catalog_for_replace(
                        &table_stream_job,
                        &stream_ctx,
                        &fragment_graph.specified_parallelism(),
                        fragment_graph.max_parallelism(),
                    )
                    .await? as u32;

                let (context, table_fragments) = self
                    .inject_replace_table_job_for_table_sink(
                        tmp_id,
                        &self.metadata_manager,
                        stream_ctx,
                        Some(sink),
                        Some(&stream_job_fragments),
                        None,
                        &table_stream_job,
                        fragment_graph,
                    )
                    .await?;

                must_match!(&table_stream_job, StreamingJob::Table(source, table, _) => {
                    *target_table = Some((table.clone(), source.clone()));
                });

                Some((table_stream_job, context, table_fragments))
            }
            None => None,
        };

        let ctx = CreateStreamingJobContext {
            upstream_fragment_downstreams,
            new_no_shuffle,
            upstream_actors,
            internal_tables,
            building_locations,
            existing_locations,
            definition: stream_job.definition(),
            mv_table_id: stream_job.mv_table(),
            create_type: stream_job.create_type(),
            job_type: (&stream_job).into(),
            streaming_job: stream_job,
            replace_table_job_info,
            option: CreateStreamingJobOption {},
            snapshot_backfill_info,
            cross_db_snapshot_backfill_info,
        };

        Ok((
            ctx,
            StreamJobFragmentsToCreate {
                inner: stream_job_fragments,
                downstreams: downstream_fragment_relations,
            },
        ))
    }

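    /// Builds a [`ReplaceStreamJobContext`] and the new fragments for replacing an
    /// existing table or source job, rewiring the old job's upstreams and
    /// downstreams onto the new plan while keeping the original parallelism.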
    pub(crate) async fn build_replace_job(
        &self,
        stream_ctx: StreamContext,
        stream_job: &StreamingJob,
        mut fragment_graph: StreamFragmentGraph,
        col_index_mapping: Option<&ColIndexMapping>,
        tmp_job_id: TableId,
    ) -> MetaResult<(ReplaceStreamJobContext, StreamJobFragmentsToCreate)> {
        match &stream_job {
            StreamingJob::Table(..) | StreamingJob::Source(..) => {}
            StreamingJob::MaterializedView(..)
            | StreamingJob::Sink(..)
            | StreamingJob::Index(..) => {
                bail_not_implemented!("schema change for {}", stream_job.job_type_str())
            }
        }

        let id = stream_job.id();
        let expr_context = stream_ctx.to_expr_context();

        let mut drop_table_associated_source_id = None;
        if let StreamingJob::Table(None, _, _) = &stream_job {
            drop_table_associated_source_id = self
                .metadata_manager
                .get_table_associated_source_id(id as _)
                .await?;
        }

        let old_fragments = self
            .metadata_manager
            .get_job_fragments_by_id(&id.into())
            .await?;
        let old_internal_table_ids = old_fragments.internal_table_ids();

        let mut drop_table_connector_ctx = None;
        if drop_table_associated_source_id.is_some() {
            debug_assert!(old_internal_table_ids.len() == 1);

            drop_table_connector_ctx = Some(DropTableConnectorContext {
                to_change_streaming_job_id: id as i32,
                to_remove_state_table_id: old_internal_table_ids[0] as i32,
                to_remove_source_id: drop_table_associated_source_id.unwrap(),
            });
        } else {
            let old_internal_tables = self
                .metadata_manager
                .get_table_catalog_by_ids(old_internal_table_ids)
                .await?;
            fragment_graph.fit_internal_table_ids(old_internal_tables)?;
        }

        let original_root_fragment = old_fragments
            .root_fragment()
            .expect("root fragment not found");

        let job_type = StreamingJobType::from(stream_job);

        let (mut downstream_fragments, downstream_actor_location) =
            self.metadata_manager.get_downstream_fragments(id).await?;
        if let Some(mapping) = &col_index_mapping {
            for (d, _f) in &mut downstream_fragments {
                *d = mapping.rewrite_dispatch_strategy(d).ok_or_else(|| {
                    MetaError::invalid_parameter(
                        "unable to drop or alter the column due to being referenced by downstream materialized views or sinks",
                    )
                })?;
            }
        }

        let complete_graph = match &job_type {
            StreamingJobType::Table(TableJobType::General) | StreamingJobType::Source => {
                CompleteStreamFragmentGraph::with_downstreams(
                    fragment_graph,
                    original_root_fragment.fragment_id,
                    downstream_fragments,
                    downstream_actor_location,
                    job_type,
                )?
            }
            StreamingJobType::Table(TableJobType::SharedCdcSource) => {
                let (upstream_root_fragments, upstream_actor_location) = self
                    .metadata_manager
                    .get_upstream_root_fragments(fragment_graph.dependent_table_ids())
                    .await?;

                CompleteStreamFragmentGraph::with_upstreams_and_downstreams(
                    fragment_graph,
                    upstream_root_fragments,
                    upstream_actor_location,
                    original_root_fragment.fragment_id,
                    downstream_fragments,
                    downstream_actor_location,
                    job_type,
                )?
            }
            _ => unreachable!(),
        };

        let resource_group = self
            .metadata_manager
            .get_existing_job_resource_group(id as ObjectId)
            .await?;

        let cluster_info = self.metadata_manager.get_streaming_cluster_info().await?;

        let parallelism = NonZeroUsize::new(original_root_fragment.actors.len())
            .expect("The number of actors in the original table fragment should be greater than 0");

        let actor_graph_builder = ActorGraphBuilder::new(
            id,
            resource_group,
            complete_graph,
            cluster_info,
            parallelism,
        )?;

        let ActorGraphBuildResult {
            graph,
            downstream_fragment_relations,
            building_locations,
            existing_locations,
            upstream_fragment_downstreams,
            replace_upstream,
            new_no_shuffle,
            ..
        } = actor_graph_builder.generate_graph(&self.env, stream_job, expr_context)?;

        if matches!(
            job_type,
            StreamingJobType::Source | StreamingJobType::Table(TableJobType::General)
        ) {
            assert!(upstream_fragment_downstreams.is_empty());
        }

        let stream_job_fragments = StreamJobFragments::new(
            (tmp_job_id as u32).into(),
            graph,
            &building_locations.actor_locations,
            stream_ctx,
            old_fragments.assigned_parallelism,
            old_fragments.max_parallelism,
        );

        let ctx = ReplaceStreamJobContext {
            old_fragments,
            replace_upstream,
            new_no_shuffle,
            upstream_fragment_downstreams,
            building_locations,
            existing_locations,
            streaming_job: stream_job.clone(),
            tmp_id: tmp_job_id as _,
            drop_table_connector_ctx,
        };

        Ok((
            ctx,
            StreamJobFragmentsToCreate {
                inner: stream_job_fragments,
                downstreams: downstream_fragment_relations,
            },
        ))
    }

    async fn alter_name(
        &self,
        relation: alter_name_request::Object,
        new_name: &str,
    ) -> MetaResult<NotificationVersion> {
        let (obj_type, id) = match relation {
            alter_name_request::Object::TableId(id) => (ObjectType::Table, id as ObjectId),
            alter_name_request::Object::ViewId(id) => (ObjectType::View, id as ObjectId),
            alter_name_request::Object::IndexId(id) => (ObjectType::Index, id as ObjectId),
            alter_name_request::Object::SinkId(id) => (ObjectType::Sink, id as ObjectId),
            alter_name_request::Object::SourceId(id) => (ObjectType::Source, id as ObjectId),
            alter_name_request::Object::SchemaId(id) => (ObjectType::Schema, id as ObjectId),
            alter_name_request::Object::DatabaseId(id) => (ObjectType::Database, id as ObjectId),
            alter_name_request::Object::SubscriptionId(id) => {
                (ObjectType::Subscription, id as ObjectId)
            }
        };
        self.metadata_manager
            .catalog_controller
            .alter_name(obj_type, id, new_name)
            .await
    }

    async fn alter_swap_rename(
        &self,
        object: alter_swap_rename_request::Object,
    ) -> MetaResult<NotificationVersion> {
        let (obj_type, src_id, dst_id) = match object {
            alter_swap_rename_request::Object::Schema(_) => unimplemented!("schema swap"),
            alter_swap_rename_request::Object::Table(objs) => {
                let (src_id, dst_id) = (
                    objs.src_object_id as ObjectId,
                    objs.dst_object_id as ObjectId,
                );
                (ObjectType::Table, src_id, dst_id)
            }
            alter_swap_rename_request::Object::View(objs) => {
                let (src_id, dst_id) = (
                    objs.src_object_id as ObjectId,
                    objs.dst_object_id as ObjectId,
                );
                (ObjectType::View, src_id, dst_id)
            }
            alter_swap_rename_request::Object::Source(objs) => {
                let (src_id, dst_id) = (
                    objs.src_object_id as ObjectId,
                    objs.dst_object_id as ObjectId,
                );
                (ObjectType::Source, src_id, dst_id)
            }
            alter_swap_rename_request::Object::Sink(objs) => {
                let (src_id, dst_id) = (
                    objs.src_object_id as ObjectId,
                    objs.dst_object_id as ObjectId,
                );
                (ObjectType::Sink, src_id, dst_id)
            }
            alter_swap_rename_request::Object::Subscription(objs) => {
                let (src_id, dst_id) = (
                    objs.src_object_id as ObjectId,
                    objs.dst_object_id as ObjectId,
                );
                (ObjectType::Subscription, src_id, dst_id)
            }
        };

        self.metadata_manager
            .catalog_controller
            .alter_swap_rename(obj_type, src_id, dst_id)
            .await
    }

    async fn alter_owner(
        &self,
        object: Object,
        owner_id: UserId,
    ) -> MetaResult<NotificationVersion> {
        let (obj_type, id) = match object {
            Object::TableId(id) => (ObjectType::Table, id as ObjectId),
            Object::ViewId(id) => (ObjectType::View, id as ObjectId),
            Object::SourceId(id) => (ObjectType::Source, id as ObjectId),
            Object::SinkId(id) => (ObjectType::Sink, id as ObjectId),
            Object::SchemaId(id) => (ObjectType::Schema, id as ObjectId),
            Object::DatabaseId(id) => (ObjectType::Database, id as ObjectId),
            Object::SubscriptionId(id) => (ObjectType::Subscription, id as ObjectId),
            Object::ConnectionId(id) => (ObjectType::Connection, id as ObjectId),
        };
        self.metadata_manager
            .catalog_controller
            .alter_owner(obj_type, id, owner_id as _)
            .await
    }

    async fn alter_set_schema(
        &self,
        object: alter_set_schema_request::Object,
        new_schema_id: SchemaId,
    ) -> MetaResult<NotificationVersion> {
        let (obj_type, id) = match object {
            alter_set_schema_request::Object::TableId(id) => (ObjectType::Table, id as ObjectId),
            alter_set_schema_request::Object::ViewId(id) => (ObjectType::View, id as ObjectId),
            alter_set_schema_request::Object::SourceId(id) => (ObjectType::Source, id as ObjectId),
            alter_set_schema_request::Object::SinkId(id) => (ObjectType::Sink, id as ObjectId),
            alter_set_schema_request::Object::FunctionId(id) => {
                (ObjectType::Function, id as ObjectId)
            }
            alter_set_schema_request::Object::ConnectionId(id) => {
                (ObjectType::Connection, id as ObjectId)
            }
            alter_set_schema_request::Object::SubscriptionId(id) => {
                (ObjectType::Subscription, id as ObjectId)
            }
        };
        self.metadata_manager
            .catalog_controller
            .alter_schema(obj_type, id, new_schema_id as _)
            .await
    }

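    /// Waits until no background materialized views are still being created,
    /// polling every millisecond for up to roughly 30 minutes.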
    pub async fn wait(&self) -> MetaResult<()> {
        let timeout_ms = 30 * 60 * 1000;
        for _ in 0..timeout_ms {
            if self
                .metadata_manager
                .catalog_controller
                .list_background_creating_mviews(true)
                .await?
                .is_empty()
            {
                return Ok(());
            }

            sleep(Duration::from_millis(1)).await;
        }
        Err(MetaError::cancelled(format!(
            "timeout after {timeout_ms}ms"
        )))
    }

    async fn comment_on(&self, comment: Comment) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .comment_on(comment)
            .await
    }
}

fn report_create_object(
    catalog_id: u32,
    event_name: &str,
    obj_type: PbTelemetryDatabaseObject,
    connector_name: Option<String>,
    attr_info: Option<jsonbb::Value>,
) {
    report_event(
        PbTelemetryEventStage::CreateStreamJob,
        event_name,
        catalog_id.into(),
        connector_name,
        Some(obj_type),
        attr_info,
    );
}

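/// Removes all bookkeeping rows of a dropped sink from the iceberg exactly-once
/// system table.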
async fn clean_all_rows_by_sink_id(db: &DatabaseConnection, sink_id: i32) -> MetaResult<()> {
    match Entity::delete_many()
        .filter(Column::SinkId.eq(sink_id))
        .exec(db)
        .await
    {
        Ok(result) => {
            let deleted_count = result.rows_affected;

            tracing::info!(
                "Deleted {} items for sink_id = {} in iceberg exactly once system table.",
                deleted_count,
                sink_id
            );
            Ok(())
        }
        Err(e) => {
            tracing::error!(
                "Error deleting records for sink_id = {} from iceberg exactly once system table: {:?}",
                sink_id,
                e.as_report()
            );
            Err(e.into())
        }
    }
}