use std::cmp::Ordering;
use std::collections::HashSet;
use std::num::NonZeroUsize;
use std::sync::Arc;
use std::time::Duration;

use anyhow::{Context, anyhow};
use itertools::Itertools;
use risingwave_common::config::DefaultParallelism;
use risingwave_common::hash::VnodeCountCompat;
use risingwave_common::secret::{LocalSecretManager, SecretEncryption};
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_common::util::stream_graph_visitor::{
    visit_stream_node, visit_stream_node_cont_mut,
};
use risingwave_common::{bail, bail_not_implemented, must_match};
use risingwave_connector::WithOptionsSecResolved;
use risingwave_connector::connector_common::validate_connection;
use risingwave_connector::source::{
    ConnectorProperties, SourceEnumeratorContext, UPSTREAM_SOURCE_KEY,
};
use risingwave_meta_model::exactly_once_iceberg_sink::{Column, Entity};
use risingwave_meta_model::object::ObjectType;
use risingwave_meta_model::{
    ConnectionId, DatabaseId, DispatcherType, FunctionId, IndexId, ObjectId, SchemaId, SecretId,
    SinkId, SourceId, SubscriptionId, TableId, UserId, ViewId,
};
use risingwave_pb::catalog::{
    Comment, Connection, CreateType, Database, Function, PbSink, Schema, Secret, Sink, Source,
    Subscription, Table, View,
};
use risingwave_pb::ddl_service::alter_owner_request::Object;
use risingwave_pb::ddl_service::{
    DdlProgress, TableJobType, WaitVersion, alter_name_request, alter_set_schema_request,
    alter_swap_rename_request,
};
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{
    FragmentTypeFlag, MergeNode, PbDispatcherType, PbStreamFragmentGraph,
    StreamFragmentGraph as StreamFragmentGraphProto,
};
use risingwave_pb::telemetry::{PbTelemetryDatabaseObject, PbTelemetryEventStage};
use sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter};
use thiserror_ext::AsReport;
use tokio::sync::{Semaphore, oneshot};
use tokio::time::sleep;
use tracing::Instrument;

use crate::barrier::BarrierManagerRef;
use crate::controller::catalog::{DropTableConnectorContext, ReleaseContext};
use crate::controller::cluster::StreamingClusterInfo;
use crate::controller::streaming_job::SinkIntoTableContext;
use crate::error::{bail_invalid_parameter, bail_unavailable};
use crate::manager::{
    IGNORED_NOTIFICATION_VERSION, LocalNotification, MetaSrvEnv, MetadataManager,
    NotificationVersion, StreamingJob, StreamingJobType,
};
use crate::model::{
    DownstreamFragmentRelation, Fragment, StreamContext, StreamJobFragments,
    StreamJobFragmentsToCreate, TableParallelism,
};
use crate::stream::{
    ActorGraphBuildResult, ActorGraphBuilder, CompleteStreamFragmentGraph,
    CreateStreamingJobContext, CreateStreamingJobOption, GlobalStreamManagerRef,
    JobRescheduleTarget, ReplaceStreamJobContext, SourceChange, SourceManagerRef,
    StreamFragmentGraph, create_source_worker, validate_sink,
};
use crate::telemetry::report_event;
use crate::{MetaError, MetaResult};

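/// Whether a `DROP` statement cascades to dependent objects (`CASCADE`) or fails
/// when dependents exist (`RESTRICT`).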
#[derive(PartialEq)]
pub enum DropMode {
    Restrict,
    Cascade,
}

impl DropMode {
    pub fn from_request_setting(cascade: bool) -> DropMode {
        if cascade {
            DropMode::Cascade
        } else {
            DropMode::Restrict
        }
    }
}

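/// Identifier of a streaming job that can be dropped, tagged with the job's kind.
/// For `Table`, the optional `SourceId` refers to the table's associated source, if any.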
pub enum StreamingJobId {
    MaterializedView(TableId),
    Sink(SinkId),
    Table(Option<SourceId>, TableId),
    Index(IndexId),
}

impl StreamingJobId {
    #[allow(dead_code)]
    fn id(&self) -> TableId {
        match self {
            StreamingJobId::MaterializedView(id)
            | StreamingJobId::Sink(id)
            | StreamingJobId::Table(_, id)
            | StreamingJobId::Index(id) => *id,
        }
    }
}

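/// Everything needed to replace an existing streaming job in place: the new job
/// definition, its fragment graph, and an optional mapping from old to new columns.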
pub struct ReplaceStreamJobInfo {
    pub streaming_job: StreamingJob,
    pub fragment_graph: StreamFragmentGraphProto,
    pub col_index_mapping: Option<ColIndexMapping>,
}

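/// A DDL command to be executed by [`DdlController::run_command`].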
pub enum DdlCommand {
    CreateDatabase(Database),
    DropDatabase(DatabaseId),
    CreateSchema(Schema),
    DropSchema(SchemaId, DropMode),
    CreateNonSharedSource(Source),
    DropSource(SourceId, DropMode),
    CreateFunction(Function),
    DropFunction(FunctionId),
    CreateView(View),
    DropView(ViewId, DropMode),
    CreateStreamingJob(
        StreamingJob,
        StreamFragmentGraphProto,
        CreateType,
        Option<ReplaceStreamJobInfo>,
        HashSet<ObjectId>,
        Option<String>, // specific resource group
    ),
    DropStreamingJob(StreamingJobId, DropMode, Option<ReplaceStreamJobInfo>),
    AlterName(alter_name_request::Object, String),
    AlterSwapRename(alter_swap_rename_request::Object),
    ReplaceStreamJob(ReplaceStreamJobInfo),
    AlterNonSharedSource(Source),
    AlterObjectOwner(Object, UserId),
    AlterSetSchema(alter_set_schema_request::Object, SchemaId),
    CreateConnection(Connection),
    DropConnection(ConnectionId),
    CreateSecret(Secret),
    AlterSecret(Secret),
    DropSecret(SecretId),
    CommentOn(Comment),
    CreateSubscription(Subscription),
    DropSubscription(SubscriptionId, DropMode),
}

impl DdlCommand {
    /// Returns whether the command can be executed while the cluster is still
    /// recovering; commands that create or replace streaming jobs must wait until
    /// recovery completes.
    fn allow_in_recovery(&self) -> bool {
        match self {
            DdlCommand::DropDatabase(_)
            | DdlCommand::DropSchema(_, _)
            | DdlCommand::DropSource(_, _)
            | DdlCommand::DropFunction(_)
            | DdlCommand::DropView(_, _)
            | DdlCommand::DropStreamingJob(_, _, _)
            | DdlCommand::DropConnection(_)
            | DdlCommand::DropSecret(_)
            | DdlCommand::DropSubscription(_, _)
            | DdlCommand::AlterName(_, _)
            | DdlCommand::AlterObjectOwner(_, _)
            | DdlCommand::AlterSetSchema(_, _)
            | DdlCommand::CreateDatabase(_)
            | DdlCommand::CreateSchema(_)
            | DdlCommand::CreateFunction(_)
            | DdlCommand::CreateView(_)
            | DdlCommand::CreateConnection(_)
            | DdlCommand::CommentOn(_)
            | DdlCommand::CreateSecret(_)
            | DdlCommand::AlterSecret(_)
            | DdlCommand::AlterSwapRename(_) => true,
            DdlCommand::CreateStreamingJob(_, _, _, _, _, _)
            | DdlCommand::CreateNonSharedSource(_)
            | DdlCommand::ReplaceStreamJob(_)
            | DdlCommand::AlterNonSharedSource(_)
            | DdlCommand::CreateSubscription(_) => false,
        }
    }
}

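/// The entry point for all DDL commands on the meta node. It updates the catalog
/// through the `MetadataManager` and drives streaming graph changes through the
/// `GlobalStreamManager`.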
#[derive(Clone)]
pub struct DdlController {
    pub(crate) env: MetaSrvEnv,

    pub(crate) metadata_manager: MetadataManager,
    pub(crate) stream_manager: GlobalStreamManagerRef,
    pub(crate) source_manager: SourceManagerRef,
    barrier_manager: BarrierManagerRef,

    pub(crate) creating_streaming_job_permits: Arc<CreatingStreamingJobPermit>,
}

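/// A semaphore bounding how many streaming jobs may be in the creating state at
/// once, sized by the `max_concurrent_creating_streaming_jobs` system parameter
/// (a value of 0 means unlimited).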
#[derive(Clone)]
pub struct CreatingStreamingJobPermit {
    pub(crate) semaphore: Arc<Semaphore>,
}

impl CreatingStreamingJobPermit {
    async fn new(env: &MetaSrvEnv) -> Self {
        let mut permits = env
            .system_params_reader()
            .await
            .max_concurrent_creating_streaming_jobs() as usize;
        if permits == 0 {
            // A value of 0 means no limit.
            permits = Semaphore::MAX_PERMITS;
        }
        let semaphore = Arc::new(Semaphore::new(permits));

        // Adjust the permit count whenever the system parameter changes.
        let (local_notification_tx, mut local_notification_rx) =
            tokio::sync::mpsc::unbounded_channel();
        env.notification_manager()
            .insert_local_sender(local_notification_tx)
            .await;
        let semaphore_clone = semaphore.clone();
        tokio::spawn(async move {
            while let Some(notification) = local_notification_rx.recv().await {
                let LocalNotification::SystemParamsChange(p) = &notification else {
                    continue;
                };
                let mut new_permits = p.max_concurrent_creating_streaming_jobs() as usize;
                if new_permits == 0 {
                    new_permits = Semaphore::MAX_PERMITS;
                }
                match permits.cmp(&new_permits) {
                    Ordering::Less => {
                        semaphore_clone.add_permits(new_permits - permits);
                    }
                    Ordering::Equal => continue,
                    Ordering::Greater => {
                        semaphore_clone
                            .acquire_many((permits - new_permits) as u32)
                            .await
                            .unwrap()
                            .forget();
                    }
                }
                tracing::info!(
                    "max_concurrent_creating_streaming_jobs changed from {} to {}",
                    permits,
                    new_permits
                );
                permits = new_permits;
            }
        });

        Self { semaphore }
    }
}

impl DdlController {
    pub async fn new(
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        stream_manager: GlobalStreamManagerRef,
        source_manager: SourceManagerRef,
        barrier_manager: BarrierManagerRef,
    ) -> Self {
        let creating_streaming_job_permits = Arc::new(CreatingStreamingJobPermit::new(&env).await);
        Self {
            env,
            metadata_manager,
            stream_manager,
            source_manager,
            barrier_manager,
            creating_streaming_job_permits,
        }
    }

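    /// Runs a DDL command to completion and returns the versions a frontend should
    /// wait for. Commands that would build new streaming graphs are rejected while
    /// the cluster is recovering. The command runs on a spawned task so it keeps
    /// making progress even if the calling RPC is dropped.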
    pub async fn run_command(&self, command: DdlCommand) -> MetaResult<Option<WaitVersion>> {
        if !command.allow_in_recovery() {
            self.barrier_manager.check_status_running()?;
        }
        let ctrl = self.clone();
        let fut = async move {
            match command {
                DdlCommand::CreateDatabase(database) => ctrl.create_database(database).await,
                DdlCommand::DropDatabase(database_id) => ctrl.drop_database(database_id).await,
                DdlCommand::CreateSchema(schema) => ctrl.create_schema(schema).await,
                DdlCommand::DropSchema(schema_id, drop_mode) => {
                    ctrl.drop_schema(schema_id, drop_mode).await
                }
                DdlCommand::CreateNonSharedSource(source) => {
                    ctrl.create_non_shared_source(source).await
                }
                DdlCommand::DropSource(source_id, drop_mode) => {
                    ctrl.drop_source(source_id, drop_mode).await
                }
                DdlCommand::CreateFunction(function) => ctrl.create_function(function).await,
                DdlCommand::DropFunction(function_id) => ctrl.drop_function(function_id).await,
                DdlCommand::CreateView(view) => ctrl.create_view(view).await,
                DdlCommand::DropView(view_id, drop_mode) => {
                    ctrl.drop_view(view_id, drop_mode).await
                }
                DdlCommand::CreateStreamingJob(
                    stream_job,
                    fragment_graph,
                    _create_type,
                    affected_table_replace_info,
                    dependencies,
                    specific_resource_group,
                ) => {
                    ctrl.create_streaming_job(
                        stream_job,
                        fragment_graph,
                        affected_table_replace_info,
                        dependencies,
                        specific_resource_group,
                    )
                    .await
                }
                DdlCommand::DropStreamingJob(job_id, drop_mode, target_replace_info) => {
                    ctrl.drop_streaming_job(job_id, drop_mode, target_replace_info)
                        .await
                }
                DdlCommand::ReplaceStreamJob(ReplaceStreamJobInfo {
                    streaming_job,
                    fragment_graph,
                    col_index_mapping,
                }) => {
                    ctrl.replace_job(streaming_job, fragment_graph, col_index_mapping)
                        .await
                }
                DdlCommand::AlterName(relation, name) => ctrl.alter_name(relation, &name).await,
                DdlCommand::AlterObjectOwner(object, owner_id) => {
                    ctrl.alter_owner(object, owner_id).await
                }
                DdlCommand::AlterSetSchema(object, new_schema_id) => {
                    ctrl.alter_set_schema(object, new_schema_id).await
                }
                DdlCommand::CreateConnection(connection) => {
                    ctrl.create_connection(connection).await
                }
                DdlCommand::DropConnection(connection_id) => {
                    ctrl.drop_connection(connection_id).await
                }
                DdlCommand::CreateSecret(secret) => ctrl.create_secret(secret).await,
                DdlCommand::DropSecret(secret_id) => ctrl.drop_secret(secret_id).await,
                DdlCommand::AlterSecret(secret) => ctrl.alter_secret(secret).await,
                DdlCommand::AlterNonSharedSource(source) => {
                    ctrl.alter_non_shared_source(source).await
                }
                DdlCommand::CommentOn(comment) => ctrl.comment_on(comment).await,
                DdlCommand::CreateSubscription(subscription) => {
                    ctrl.create_subscription(subscription).await
                }
                DdlCommand::DropSubscription(subscription_id, drop_mode) => {
                    ctrl.drop_subscription(subscription_id, drop_mode).await
                }
                DdlCommand::AlterSwapRename(objects) => ctrl.alter_swap_rename(objects).await,
            }
        }
        .in_current_span();
        let notification_version = tokio::spawn(fut).await.map_err(|e| anyhow!(e))??;
        Ok(Some(WaitVersion {
            catalog_version: notification_version,
            hummock_version_id: self.barrier_manager.get_hummock_version_id().await.to_u64(),
        }))
    }

    pub async fn get_ddl_progress(&self) -> MetaResult<Vec<DdlProgress>> {
        self.barrier_manager.get_ddl_progress().await
    }

    async fn create_database(&self, database: Database) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .create_database(database)
            .await
    }

    #[tracing::instrument(skip(self), level = "debug")]
    pub async fn reschedule_streaming_job(
        &self,
        job_id: u32,
        target: JobRescheduleTarget,
        mut deferred: bool,
    ) -> MetaResult<()> {
        tracing::info!("alter parallelism");
        if self.barrier_manager.check_status_running().is_err() {
            tracing::info!(
                "alter parallelism is set to deferred mode because the system is in recovery state"
            );
            deferred = true;
        }

        self.stream_manager
            .reschedule_streaming_job(job_id, target, deferred)
            .await
    }

    async fn drop_database(&self, database_id: DatabaseId) -> MetaResult<NotificationVersion> {
        self.drop_object(
            ObjectType::Database,
            database_id as _,
            DropMode::Cascade,
            None,
        )
        .await
    }

    async fn create_schema(&self, schema: Schema) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .create_schema(schema)
            .await
    }

    async fn drop_schema(
        &self,
        schema_id: SchemaId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Schema, schema_id as _, drop_mode, None)
            .await
    }

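    /// Creates a non-shared source. The source worker is spawned before the catalog
    /// is touched, so a misconfigured connector fails the command early; the worker
    /// is then registered under the newly allocated source id.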
    async fn create_non_shared_source(&self, source: Source) -> MetaResult<NotificationVersion> {
        let handle = create_source_worker(&source, self.source_manager.metrics.clone())
            .await
            .context("failed to create source worker")?;

        let (source_id, version) = self
            .metadata_manager
            .catalog_controller
            .create_source(source)
            .await?;
        self.source_manager
            .register_source_with_handle(source_id, handle)
            .await;
        Ok(version)
    }

    async fn drop_source(
        &self,
        source_id: SourceId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Source, source_id as _, drop_mode, None)
            .await
    }

    async fn alter_non_shared_source(&self, source: Source) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .alter_non_shared_source(source)
            .await
    }

    async fn create_function(&self, function: Function) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .create_function(function)
            .await
    }

    async fn drop_function(&self, function_id: FunctionId) -> MetaResult<NotificationVersion> {
        self.drop_object(
            ObjectType::Function,
            function_id as _,
            DropMode::Restrict,
            None,
        )
        .await
    }

    async fn create_view(&self, view: View) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .create_view(view)
            .await
    }

    async fn drop_view(
        &self,
        view_id: ViewId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::View, view_id as _, drop_mode, None)
            .await
    }

    async fn create_connection(&self, connection: Connection) -> MetaResult<NotificationVersion> {
        validate_connection(&connection).await?;
        self.metadata_manager
            .catalog_controller
            .create_connection(connection)
            .await
    }

    async fn drop_connection(
        &self,
        connection_id: ConnectionId,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(
            ObjectType::Connection,
            connection_id as _,
            DropMode::Restrict,
            None,
        )
        .await
    }

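    /// Encrypts a secret's payload with the meta node's configured
    /// `secret_store_private_key` and returns the serialized ciphertext.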
    fn get_encrypted_payload(&self, secret: &Secret) -> MetaResult<Vec<u8>> {
        let secret_store_private_key = self
            .env
            .opts
            .secret_store_private_key
            .clone()
            .ok_or_else(|| anyhow!("secret_store_private_key is not configured"))?;

        let encrypted_payload = SecretEncryption::encrypt(
            secret_store_private_key.as_slice(),
            secret.get_value().as_slice(),
        )
        .context(format!("failed to encrypt secret {}", secret.name))?;
        Ok(encrypted_payload
            .serialize()
            .context(format!("failed to serialize secret {}", secret.name))?)
    }

    async fn create_secret(&self, mut secret: Secret) -> MetaResult<NotificationVersion> {
        // Only the encrypted payload is persisted in the catalog; the plaintext copy
        // is passed alongside separately.
        let secret_plain_payload = secret.value.clone();
        let encrypted_payload = self.get_encrypted_payload(&secret)?;
        secret.value = encrypted_payload;

        self.metadata_manager
            .catalog_controller
            .create_secret(secret, secret_plain_payload)
            .await
    }

    async fn drop_secret(&self, secret_id: SecretId) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Secret, secret_id as _, DropMode::Restrict, None)
            .await
    }

    async fn alter_secret(&self, mut secret: Secret) -> MetaResult<NotificationVersion> {
        let secret_plain_payload = secret.value.clone();
        let encrypted_payload = self.get_encrypted_payload(&secret)?;
        secret.value = encrypted_payload;
        self.metadata_manager
            .catalog_controller
            .alter_secret(secret, secret_plain_payload)
            .await
    }

    async fn create_subscription(
        &self,
        mut subscription: Subscription,
    ) -> MetaResult<NotificationVersion> {
        tracing::debug!("create subscription");
        let _permit = self
            .creating_streaming_job_permits
            .semaphore
            .acquire()
            .await
            .unwrap();
        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
        self.metadata_manager
            .catalog_controller
            .create_subscription_catalog(&mut subscription)
            .await?;
        self.stream_manager
            .create_subscription(&subscription)
            .await
            .inspect_err(|e| {
                tracing::debug!(error = %e.as_report(), "cancel create subscription");
            })?;

        let version = self
            .metadata_manager
            .catalog_controller
            .notify_create_subscription(subscription.id)
            .await?;
        tracing::debug!("finish create subscription");
        Ok(version)
    }

    async fn drop_subscription(
        &self,
        subscription_id: SubscriptionId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        tracing::debug!("preparing drop subscription");
        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
        let subscription = self
            .metadata_manager
            .catalog_controller
            .get_subscription_by_id(subscription_id)
            .await?;
        let table_id = subscription.dependent_table_id;
        let database_id = subscription.database_id.into();
        let (_, version) = self
            .metadata_manager
            .catalog_controller
            .drop_object(ObjectType::Subscription, subscription_id as _, drop_mode)
            .await?;
        self.stream_manager
            .drop_subscription(database_id, subscription_id as _, table_id)
            .await;
        tracing::debug!("finish drop subscription");
        Ok(version)
    }

    /// Validates a CDC table's stream graph: there must be exactly one stream scan
    /// fragment, and its CDC scan must be able to reach the upstream database.
    pub(crate) async fn validate_cdc_table(
        table: &Table,
        table_fragments: &StreamJobFragments,
    ) -> MetaResult<()> {
        let stream_scan_fragment = table_fragments
            .fragments
            .values()
            .filter(|f| f.fragment_type_mask & FragmentTypeFlag::StreamScan as u32 != 0)
            .exactly_one()
            .ok()
            .with_context(|| {
                format!(
                    "expect exactly one stream scan fragment, got: {:?}",
                    table_fragments.fragments
                )
            })?;

        assert_eq!(
            stream_scan_fragment.actors.len(),
            1,
            "Stream scan fragment should have only one actor"
        );
        let mut found_cdc_scan = false;
        match &stream_scan_fragment.nodes.node_body {
            Some(NodeBody::StreamCdcScan(_)) => {
                if Self::validate_cdc_table_inner(&stream_scan_fragment.nodes.node_body, table.id)
                    .await?
                {
                    found_cdc_scan = true;
                }
            }
            // The CDC scan may be wrapped in a `Project` node; look one level down.
            Some(NodeBody::Project(_)) => {
                for input in &stream_scan_fragment.nodes.input {
                    if Self::validate_cdc_table_inner(&input.node_body, table.id).await? {
                        found_cdc_scan = true;
                    }
                }
            }
            _ => {
                bail!("Unexpected node body for stream cdc scan");
            }
        };
        if !found_cdc_scan {
            bail!("No stream cdc scan node found in stream scan fragment");
        }
        Ok(())
    }


    async fn validate_cdc_table_inner(
        node_body: &Option<NodeBody>,
        table_id: u32,
    ) -> MetaResult<bool> {
        if let Some(NodeBody::StreamCdcScan(stream_cdc_scan)) = node_body
            && let Some(ref cdc_table_desc) = stream_cdc_scan.cdc_table_desc
        {
            let options_with_secret = WithOptionsSecResolved::new(
                cdc_table_desc.connect_properties.clone(),
                cdc_table_desc.secret_refs.clone(),
            );

            let mut props = ConnectorProperties::extract(options_with_secret, true)?;
            props.init_from_pb_cdc_table_desc(cdc_table_desc);

            // Creating a split enumerator validates the connection to the upstream database.
            let _enumerator = props
                .create_split_enumerator(SourceEnumeratorContext::dummy().into())
                .await?;

            tracing::debug!(?table_id, "validate cdc table success");
            Ok(true)
        } else {
            Ok(false)
        }
    }

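    /// Builds the replace-table job used when a sink into a table is created or
    /// dropped: every incoming sink of the table must end up wired into the table's
    /// union fragment. `creating_sink_table_fragments` supplies the fragments of a
    /// sink being created; `dropping_sink_id` excludes a sink being dropped.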
    pub(crate) async fn inject_replace_table_job_for_table_sink(
        &self,
        tmp_id: u32,
        mgr: &MetadataManager,
        stream_ctx: StreamContext,
        sink: Option<&Sink>,
        creating_sink_table_fragments: Option<&StreamJobFragments>,
        dropping_sink_id: Option<SinkId>,
        streaming_job: &StreamingJob,
        fragment_graph: StreamFragmentGraph,
    ) -> MetaResult<(ReplaceStreamJobContext, StreamJobFragmentsToCreate)> {
        let (mut replace_table_ctx, mut stream_job_fragments) = self
            .build_replace_job(stream_ctx, streaming_job, fragment_graph, tmp_id as _)
            .await?;

        let target_table = streaming_job.table().unwrap();

        if let Some(creating_sink_table_fragments) = creating_sink_table_fragments {
            let sink_fragment = creating_sink_table_fragments.sink_fragment().unwrap();
            let sink = sink.expect("sink not found");
            Self::inject_replace_table_plan_for_sink(
                sink.id,
                &sink_fragment,
                target_table,
                &mut replace_table_ctx,
                stream_job_fragments.inner.union_fragment_for_table(),
                None,
            );
        }

        let [table_catalog]: [_; 1] = mgr
            .get_table_catalog_by_ids(vec![target_table.id])
            .await?
            .try_into()
            .expect("Target table should exist in sink into table");

        // Re-inject the plans of all sinks that already write into the table,
        // skipping the one being dropped (if any).
        {
            let catalogs = mgr
                .get_sink_catalog_by_ids(&table_catalog.incoming_sinks)
                .await?;

            for sink in catalogs {
                let sink_id = sink.id;

                if let Some(dropping_sink_id) = dropping_sink_id
                    && sink_id == (dropping_sink_id as u32)
                {
                    continue;
                };

                let sink_table_fragments = mgr
                    .get_job_fragments_by_id(&risingwave_common::catalog::TableId::new(sink_id))
                    .await?;

                let sink_fragment = sink_table_fragments.sink_fragment().unwrap();

                Self::inject_replace_table_plan_for_sink(
                    sink_id,
                    &sink_fragment,
                    target_table,
                    &mut replace_table_ctx,
                    stream_job_fragments.inner.union_fragment_for_table(),
                    Some(&sink.unique_identity()),
                );
            }
        }

        // Sanity check: every merge node in the new fragments must have a matching
        // upstream-downstream relation, either to an external fragment or within the
        // new graph itself.
        for fragment in stream_job_fragments.fragments.values() {
            visit_stream_node(&fragment.nodes, |node| {
                if let NodeBody::Merge(merge_node) = node {
                    let upstream_fragment_id = merge_node.upstream_fragment_id;
                    if let Some(external_upstream_fragment_downstreams) = replace_table_ctx
                        .upstream_fragment_downstreams
                        .get(&upstream_fragment_id)
                    {
                        let mut upstream_fragment_downstreams =
                            external_upstream_fragment_downstreams.iter().filter(
                                |downstream| {
                                    downstream.downstream_fragment_id == fragment.fragment_id
                                },
                            );
                        assert!(
                            upstream_fragment_downstreams.next().is_some(),
                            "All the mergers for the union should have been fully assigned beforehand."
                        );
                    } else {
                        let mut upstream_fragment_downstreams = stream_job_fragments
                            .downstreams
                            .get(&upstream_fragment_id)
                            .into_iter()
                            .flatten()
                            .filter(|downstream| {
                                downstream.downstream_fragment_id == fragment.fragment_id
                            });
                        assert!(
                            upstream_fragment_downstreams.next().is_some(),
                            "All the mergers for the union should have been fully assigned beforehand."
                        );
                    }
                }
            });
        }

        Ok((replace_table_ctx, stream_job_fragments))
    }

    /// Rewrites the table's union fragment so that one of its merge inputs receives
    /// the given sink fragment's output, and records the corresponding dispatcher
    /// relation from the sink fragment to the union fragment.
    pub(crate) fn inject_replace_table_plan_for_sink(
        sink_id: u32,
        sink_fragment: &Fragment,
        table: &Table,
        replace_table_ctx: &mut ReplaceStreamJobContext,
        union_fragment: &mut Fragment,
        unique_identity: Option<&str>,
    ) {
        let sink_fields = sink_fragment.nodes.fields.clone();

        let output_indices = sink_fields
            .iter()
            .enumerate()
            .map(|(idx, _)| idx as _)
            .collect_vec();

        let dist_key_indices = table.distribution_key.iter().map(|i| *i as _).collect_vec();

        let sink_fragment_downstreams = replace_table_ctx
            .upstream_fragment_downstreams
            .entry(sink_fragment.fragment_id)
            .or_default();

        sink_fragment_downstreams.push(DownstreamFragmentRelation {
            downstream_fragment_id: union_fragment.fragment_id,
            dispatcher_type: DispatcherType::Hash,
            dist_key_indices: dist_key_indices.clone(),
            output_indices: output_indices.clone(),
        });

        let upstream_fragment_id = sink_fragment.fragment_id;

        visit_stream_node_cont_mut(&mut union_fragment.nodes, |node| {
            if let Some(NodeBody::Union(_)) = &mut node.node_body {
                for input_project_node in &mut node.input {
                    if let Some(NodeBody::Project(_)) = &mut input_project_node.node_body {
                        let merge_stream_node =
                            input_project_node.input.iter_mut().exactly_one().unwrap();

                        // Only rewrite the merge input reserved for this sink.
                        if input_project_node.identity.as_str()
                            != unique_identity
                                .unwrap_or(PbSink::UNIQUE_IDENTITY_FOR_CREATING_TABLE_SINK)
                        {
                            continue;
                        }

                        if let Some(NodeBody::Merge(merge_node)) =
                            &mut merge_stream_node.node_body
                        {
                            merge_stream_node.identity =
                                format!("MergeExecutor(from sink {})", sink_id);

                            input_project_node.identity =
                                format!("ProjectExecutor(from sink {})", sink_id);

                            **merge_node = {
                                #[expect(deprecated)]
                                MergeNode {
                                    upstream_actor_id: vec![],
                                    upstream_fragment_id,
                                    upstream_dispatcher_type: PbDispatcherType::Hash as _,
                                    fields: sink_fields.to_vec(),
                                }
                            };

                            merge_stream_node.fields = sink_fields.to_vec();

                            return false;
                        }
                    }
                }
            }
            true
        });
    }

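    /// Creates a streaming job (table, materialized view, sink, index, or source):
    /// persists the job catalog, acquires a creation permit, then builds and deploys
    /// the fragment graph. On failure, the job catalog is aborted and any registered
    /// source is cleaned up.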
    pub async fn create_streaming_job(
        &self,
        mut streaming_job: StreamingJob,
        fragment_graph: StreamFragmentGraphProto,
        affected_table_replace_info: Option<ReplaceStreamJobInfo>,
        dependencies: HashSet<ObjectId>,
        specific_resource_group: Option<String>,
    ) -> MetaResult<NotificationVersion> {
        let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());
        self.metadata_manager
            .catalog_controller
            .create_job_catalog(
                &mut streaming_job,
                &ctx,
                &fragment_graph.parallelism,
                fragment_graph.max_parallelism as _,
                dependencies,
                specific_resource_group.clone(),
            )
            .await?;
        let job_id = streaming_job.id();

        tracing::debug!(
            id = job_id,
            definition = streaming_job.definition(),
            create_type = streaming_job.create_type().as_str_name(),
            job_type = ?streaming_job.job_type(),
            "starting streaming job",
        );
        let _permit = self
            .creating_streaming_job_permits
            .semaphore
            .acquire()
            .await
            .unwrap();
        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;

        let name = streaming_job.name();
        let definition = streaming_job.definition();
        let source_id = match &streaming_job {
            StreamingJob::Table(Some(src), _, _) | StreamingJob::Source(src) => Some(src.id),
            _ => None,
        };

        match self
            .create_streaming_job_inner(
                ctx,
                streaming_job,
                fragment_graph,
                affected_table_replace_info,
                specific_resource_group,
            )
            .await
        {
            Ok(version) => Ok(version),
            Err(err) => {
                tracing::error!(id = job_id, error = %err.as_report(), "failed to create streaming job");
                let event = risingwave_pb::meta::event_log::EventCreateStreamJobFail {
                    id: job_id,
                    name,
                    definition,
                    error: err.as_report().to_string(),
                };
                self.env.event_log_manager_ref().add_event_logs(vec![
                    risingwave_pb::meta::event_log::Event::CreateStreamJobFail(event),
                ]);
                let (aborted, _) = self
                    .metadata_manager
                    .catalog_controller
                    .try_abort_creating_streaming_job(job_id as _, false)
                    .await?;
                if aborted {
                    tracing::warn!(id = job_id, "aborted streaming job");
                    // If the aborted job registered a source, unregister it as well.
                    if let Some(source_id) = source_id {
                        self.source_manager
                            .apply_source_change(SourceChange::DropSource {
                                dropped_source_ids: vec![source_id as SourceId],
                            })
                            .await;
                    }
                }
                Err(err)
            }
        }
    }

    async fn create_streaming_job_inner(
        &self,
        ctx: StreamContext,
        mut streaming_job: StreamingJob,
        fragment_graph: StreamFragmentGraphProto,
        affected_table_replace_info: Option<ReplaceStreamJobInfo>,
        specific_resource_group: Option<String>,
    ) -> MetaResult<NotificationVersion> {
        let mut fragment_graph =
            StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
        streaming_job.set_info_from_graph(&fragment_graph);

        // Allocate catalog ids for the internal tables and fill them back into the graph.
        let incomplete_internal_tables = fragment_graph
            .incomplete_internal_tables()
            .into_values()
            .collect_vec();
        let table_id_map = self
            .metadata_manager
            .catalog_controller
            .create_internal_table_catalog(&streaming_job, incomplete_internal_tables)
            .await?;
        fragment_graph.refill_internal_table_ids(table_id_map);

        let affected_table_replace_info = match affected_table_replace_info {
            Some(replace_table_info) => {
                assert!(
                    specific_resource_group.is_none(),
                    "specific_resource_group is not supported for replace table (alter column or sink into table)"
                );

                let ReplaceStreamJobInfo {
                    mut streaming_job,
                    fragment_graph,
                    ..
                } = replace_table_info;

                // The max parallelism of the replaced job must stay unchanged.
                let original_max_parallelism = self
                    .metadata_manager
                    .get_job_max_parallelism(streaming_job.id().into())
                    .await?;
                let fragment_graph = PbStreamFragmentGraph {
                    max_parallelism: original_max_parallelism as _,
                    ..fragment_graph
                };

                let fragment_graph =
                    StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
                streaming_job.set_info_from_graph(&fragment_graph);
                let streaming_job = streaming_job;

                Some((streaming_job, fragment_graph))
            }
            None => None,
        };

        tracing::debug!(id = streaming_job.id(), "building streaming job");
        let (ctx, stream_job_fragments) = self
            .build_stream_job(
                ctx,
                streaming_job,
                fragment_graph,
                affected_table_replace_info,
                specific_resource_group,
            )
            .await?;

        let streaming_job = &ctx.streaming_job;

        // Connector-specific validation and telemetry.
        match streaming_job {
            StreamingJob::Table(None, table, TableJobType::SharedCdcSource) => {
                Self::validate_cdc_table(table, &stream_job_fragments).await?;
            }
            StreamingJob::Table(Some(source), ..) => {
                self.source_manager.register_source(source).await?;
                let connector_name = source
                    .get_with_properties()
                    .get(UPSTREAM_SOURCE_KEY)
                    .cloned();
                let attr = source.info.as_ref().map(|source_info| {
                    jsonbb::json!({
                        "format": source_info.format().as_str_name(),
                        "encode": source_info.row_encode().as_str_name(),
                    })
                });
                report_create_object(
                    streaming_job.id(),
                    "source",
                    PbTelemetryDatabaseObject::Source,
                    connector_name,
                    attr,
                );
            }
            StreamingJob::Sink(sink, _) => {
                // Validate the sink against the external system before it starts.
                validate_sink(sink).await?;
                let connector_name = sink.get_properties().get(UPSTREAM_SOURCE_KEY).cloned();
                let attr = sink.format_desc.as_ref().map(|sink_info| {
                    jsonbb::json!({
                        "format": sink_info.format().as_str_name(),
                        "encode": sink_info.encode().as_str_name(),
                    })
                });
                report_create_object(
                    streaming_job.id(),
                    "sink",
                    PbTelemetryDatabaseObject::Sink,
                    connector_name,
                    attr,
                );
            }
            StreamingJob::Source(source) => {
                self.source_manager.register_source(source).await?;
                let connector_name = source
                    .get_with_properties()
                    .get(UPSTREAM_SOURCE_KEY)
                    .cloned();
                let attr = source.info.as_ref().map(|source_info| {
                    jsonbb::json!({
                        "format": source_info.format().as_str_name(),
                        "encode": source_info.row_encode().as_str_name(),
                    })
                });
                report_create_object(
                    streaming_job.id(),
                    "source",
                    PbTelemetryDatabaseObject::Source,
                    connector_name,
                    attr,
                );
            }
            _ => {}
        }

        self.metadata_manager
            .catalog_controller
            .prepare_streaming_job(&stream_job_fragments, streaming_job, false)
            .await?;

        let stream_job_id = streaming_job.id();
        match (streaming_job.create_type(), &streaming_job) {
            // Foreground jobs block until creation completes. Sinks with background
            // create type currently take the same blocking path.
            (CreateType::Unspecified, _)
            | (CreateType::Foreground, _)
            | (CreateType::Background, StreamingJob::Sink(_, _)) => {
                let version = self
                    .stream_manager
                    .create_streaming_job(stream_job_fragments, ctx, None)
                    .await?;
                Ok(version)
            }
            (CreateType::Background, _) => {
                // Background jobs run on a detached task; we only wait until the job
                // has been successfully started.
                let ctrl = self.clone();
                let (tx, rx) = oneshot::channel();
                let fut = async move {
                    let _ = ctrl
                        .stream_manager
                        .create_streaming_job(stream_job_fragments, ctx, Some(tx))
                        .await
                        .inspect_err(|err| {
                            tracing::error!(id = stream_job_id, error = ?err.as_report(), "failed to create background streaming job");
                        });
                };
                tokio::spawn(fut);
                rx.await.map_err(|_| {
                    anyhow!(
                        "failed to receive create streaming job result of job: {}",
                        stream_job_id
                    )
                })??;
                Ok(IGNORED_NOTIFICATION_VERSION)
            }
        }
    }

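    /// Drops a catalog object, cascading per `drop_mode`, and tears down the
    /// streaming jobs, sources, and secrets it releases. When a sink into a table is
    /// dropped, `target_replace_info` carries the table job that is replaced to
    /// detach the sink from the table's union fragment.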
    pub async fn drop_object(
        &self,
        object_type: ObjectType,
        object_id: ObjectId,
        drop_mode: DropMode,
        target_replace_info: Option<ReplaceStreamJobInfo>,
    ) -> MetaResult<NotificationVersion> {
        let (release_ctx, mut version) = self
            .metadata_manager
            .catalog_controller
            .drop_object(object_type, object_id, drop_mode)
            .await?;

        if let Some(replace_table_info) = target_replace_info {
            let stream_ctx =
                StreamContext::from_protobuf(replace_table_info.fragment_graph.get_ctx().unwrap());

            let ReplaceStreamJobInfo {
                mut streaming_job,
                fragment_graph,
                ..
            } = replace_table_info;

            let sink_id = if let ObjectType::Sink = object_type {
                object_id as _
            } else {
                panic!("additional replace table event only occurs when dropping sink into table")
            };

            // The max parallelism of the replaced job must stay unchanged.
            let original_max_parallelism = self
                .metadata_manager
                .get_job_max_parallelism(streaming_job.id().into())
                .await?;
            let fragment_graph = PbStreamFragmentGraph {
                max_parallelism: original_max_parallelism as _,
                ..fragment_graph
            };

            let fragment_graph =
                StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
            streaming_job.set_info_from_graph(&fragment_graph);
            let streaming_job = streaming_job;

            streaming_job.table().expect("should be table job");

            tracing::debug!(id = streaming_job.id(), "replacing table for dropped sink");
            let tmp_id = self
                .metadata_manager
                .catalog_controller
                .create_job_catalog_for_replace(
                    &streaming_job,
                    &stream_ctx,
                    &fragment_graph.specified_parallelism(),
                    fragment_graph.max_parallelism(),
                )
                .await? as u32;

            let (ctx, stream_job_fragments) = self
                .inject_replace_table_job_for_table_sink(
                    tmp_id,
                    &self.metadata_manager,
                    stream_ctx,
                    None,
                    None,
                    Some(sink_id),
                    &streaming_job,
                    fragment_graph,
                )
                .await?;

            let result: MetaResult<_> = try {
                let replace_upstream = ctx.replace_upstream.clone();

                self.metadata_manager
                    .catalog_controller
                    .prepare_streaming_job(&stream_job_fragments, &streaming_job, true)
                    .await?;

                self.stream_manager
                    .replace_stream_job(stream_job_fragments, ctx)
                    .await?;

                replace_upstream
            };

            version = match result {
                Ok(replace_upstream) => {
                    let version = self
                        .metadata_manager
                        .catalog_controller
                        .finish_replace_streaming_job(
                            tmp_id as _,
                            streaming_job,
                            replace_upstream,
                            None,
                            SinkIntoTableContext {
                                creating_sink_id: None,
                                dropping_sink_id: Some(sink_id),
                                updated_sink_catalogs: vec![],
                            },
                            None, // drop_table_connector_ctx
                        )
                        .await?;
                    Ok(version)
                }
                Err(err) => {
                    tracing::error!(id = object_id, error = ?err.as_report(), "failed to replace table");
                    let _ = self.metadata_manager
                        .catalog_controller
                        .try_abort_replacing_streaming_job(tmp_id as _)
                        .await
                        .inspect_err(|err| {
                            tracing::error!(id = object_id, error = ?err.as_report(), "failed to abort replacing table");
                        });
                    Err(err)
                }
            }?;
        }

        let ReleaseContext {
            database_id,
            removed_streaming_job_ids,
            removed_state_table_ids,
            removed_source_ids,
            removed_secret_ids: secret_ids,
            removed_source_fragments,
            removed_actors,
            removed_fragments,
        } = release_ctx;

        let _guard = self.source_manager.pause_tick().await;
        self.stream_manager
            .drop_streaming_jobs(
                risingwave_common::catalog::DatabaseId::new(database_id as _),
                removed_actors.iter().map(|id| *id as _).collect(),
                removed_streaming_job_ids,
                removed_state_table_ids,
                removed_fragments.iter().map(|id| *id as _).collect(),
            )
            .await;

        // Unregister the dropped sources from the source manager.
        self.source_manager
            .apply_source_change(SourceChange::DropSource {
                dropped_source_ids: removed_source_ids.into_iter().map(|id| id as _).collect(),
            })
            .await;

        // Unregister the source fragments and actors of the dropped streaming jobs.
        let dropped_source_fragments = removed_source_fragments
            .into_iter()
            .map(|(source_id, fragments)| {
                (
                    source_id,
                    fragments.into_iter().map(|id| id as u32).collect(),
                )
            })
            .collect();
        let dropped_actors = removed_actors.iter().map(|id| *id as _).collect();
        self.source_manager
            .apply_source_change(SourceChange::DropMv {
                dropped_source_fragments,
                dropped_actors,
            })
            .await;

        // Remove the dropped secrets from the local secret manager.
        for secret in secret_ids {
            LocalSecretManager::global().remove_secret(secret as _);
        }
        Ok(version)
    }

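    /// Replaces the plan of an existing table or source job, e.g. for `ALTER TABLE
    /// ... ADD COLUMN`. The new fragments are created under a temporary job id and
    /// swapped in only if deployment succeeds; otherwise the temporary catalog is
    /// aborted.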
    pub async fn replace_job(
        &self,
        mut streaming_job: StreamingJob,
        fragment_graph: StreamFragmentGraphProto,
        col_index_mapping: Option<ColIndexMapping>,
    ) -> MetaResult<NotificationVersion> {
        match &mut streaming_job {
            StreamingJob::Table(..) | StreamingJob::Source(..) => {}
            StreamingJob::MaterializedView(..)
            | StreamingJob::Sink(..)
            | StreamingJob::Index(..) => {
                bail_not_implemented!("schema change for {}", streaming_job.job_type_str())
            }
        }

        let job_id = streaming_job.id();

        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
        let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());

        // The max parallelism of the replaced job must stay unchanged.
        let original_max_parallelism = self
            .metadata_manager
            .get_job_max_parallelism(streaming_job.id().into())
            .await?;
        let fragment_graph = PbStreamFragmentGraph {
            max_parallelism: original_max_parallelism as _,
            ..fragment_graph
        };

        let fragment_graph = StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
        streaming_job.set_info_from_graph(&fragment_graph);

        let streaming_job = streaming_job;

        let tmp_id = self
            .metadata_manager
            .catalog_controller
            .create_job_catalog_for_replace(
                &streaming_job,
                &ctx,
                &fragment_graph.specified_parallelism(),
                fragment_graph.max_parallelism(),
            )
            .await?;

        tracing::debug!(id = job_id, "building replace streaming job");
        let mut updated_sink_catalogs = vec![];

        let mut drop_table_connector_ctx = None;
        let result: MetaResult<_> = try {
            let (mut ctx, mut stream_job_fragments) = self
                .build_replace_job(ctx, &streaming_job, fragment_graph, tmp_id as _)
                .await?;
            drop_table_connector_ctx = ctx.drop_table_connector_ctx.clone();

            // Re-inject the plans of all sinks that write into the replaced table.
            if let StreamingJob::Table(_, table, ..) = &streaming_job {
                let catalogs = self
                    .metadata_manager
                    .get_sink_catalog_by_ids(&table.incoming_sinks)
                    .await?;

                for sink in catalogs {
                    let sink_id = &sink.id;

                    let sink_table_fragments = self
                        .metadata_manager
                        .get_job_fragments_by_id(&risingwave_common::catalog::TableId::new(
                            *sink_id,
                        ))
                        .await?;

                    let sink_fragment = sink_table_fragments.sink_fragment().unwrap();

                    Self::inject_replace_table_plan_for_sink(
                        *sink_id,
                        &sink_fragment,
                        table,
                        &mut ctx,
                        stream_job_fragments.inner.union_fragment_for_table(),
                        Some(&sink.unique_identity()),
                    );

                    if sink.original_target_columns.is_empty() {
                        updated_sink_catalogs.push(sink.id as _);
                    }
                }
            }

            let replace_upstream = ctx.replace_upstream.clone();

            self.metadata_manager
                .catalog_controller
                .prepare_streaming_job(&stream_job_fragments, &streaming_job, true)
                .await?;

            self.stream_manager
                .replace_stream_job(stream_job_fragments, ctx)
                .await?;
            replace_upstream
        };

        match result {
            Ok(replace_upstream) => {
                let version = self
                    .metadata_manager
                    .catalog_controller
                    .finish_replace_streaming_job(
                        tmp_id,
                        streaming_job,
                        replace_upstream,
                        col_index_mapping,
                        SinkIntoTableContext {
                            creating_sink_id: None,
                            dropping_sink_id: None,
                            updated_sink_catalogs,
                        },
                        drop_table_connector_ctx.as_ref(),
                    )
                    .await?;
                if let Some(drop_table_connector_ctx) = &drop_table_connector_ctx {
                    self.source_manager
                        .apply_source_change(SourceChange::DropSource {
                            dropped_source_ids: vec![drop_table_connector_ctx.to_remove_source_id],
                        })
                        .await;
                }
                Ok(version)
            }
            Err(err) => {
                tracing::error!(id = job_id, error = ?err.as_report(), "failed to replace job");
                let _ = self.metadata_manager
                    .catalog_controller
                    .try_abort_replacing_streaming_job(tmp_id)
                    .await
                    .inspect_err(|err| {
                        tracing::error!(id = job_id, error = ?err.as_report(), "failed to abort replacing job");
                    });
                Err(err)
            }
        }
    }

    async fn drop_streaming_job(
        &self,
        job_id: StreamingJobId,
        drop_mode: DropMode,
        target_replace_info: Option<ReplaceStreamJobInfo>,
    ) -> MetaResult<NotificationVersion> {
        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;

        let (object_id, object_type) = match job_id {
            StreamingJobId::MaterializedView(id) => (id as _, ObjectType::Table),
            StreamingJobId::Sink(id) => (id as _, ObjectType::Sink),
            StreamingJobId::Table(_, id) => (id as _, ObjectType::Table),
            StreamingJobId::Index(idx) => (idx as _, ObjectType::Index),
        };

        let version = self
            .drop_object(object_type, object_id, drop_mode, target_replace_info)
            .await?;
        #[cfg(not(madsim))]
        if let StreamingJobId::Sink(sink_id) = job_id {
            // Clean up the bookkeeping rows that exactly-once Iceberg sinks keep in
            // the meta store.
            let db = self.env.meta_store_ref().conn.clone();
            clean_all_rows_by_sink_id(&db, sink_id).await?;
        }
        Ok(version)
    }

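    /// Resolves the parallelism a new streaming job should start with, based on the
    /// user-specified value, the job's max parallelism, and the slots available in
    /// the target resource group. Falls back to the configured default parallelism
    /// when none is specified.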
    fn resolve_stream_parallelism(
        &self,
        specified: Option<NonZeroUsize>,
        max: NonZeroUsize,
        cluster_info: &StreamingClusterInfo,
        resource_group: String,
    ) -> MetaResult<NonZeroUsize> {
        let available = cluster_info.parallelism(&resource_group);
        let Some(available) = NonZeroUsize::new(available) else {
            bail_unavailable!(
                "no available slots to schedule in resource group \"{}\", \
                 have you allocated any compute nodes within this resource group?",
                resource_group
            );
        };

        if let Some(specified) = specified {
            if specified > max {
                bail_invalid_parameter!(
                    "specified parallelism {} should not exceed max parallelism {}",
                    specified,
                    max,
                );
            }
            if specified > available {
                bail_unavailable!(
                    "insufficient parallelism to schedule in resource group \"{}\", \
                     required: {}, available: {}",
                    resource_group,
                    specified,
                    available,
                );
            }
            Ok(specified)
        } else {
            // No parallelism specified: fall back to the configured default.
            let default_parallelism = match self.env.opts.default_parallelism {
                DefaultParallelism::Full => available,
                DefaultParallelism::Default(num) => {
                    if num > available {
                        bail_unavailable!(
                            "insufficient parallelism to schedule in resource group \"{}\", \
                             required: {}, available: {}",
                            resource_group,
                            num,
                            available,
                        );
                    }
                    num
                }
            };

            if default_parallelism > max {
                tracing::warn!(
                    max_parallelism = max.get(),
                    resource_group,
                    "default parallelism exceeds the max parallelism, using max parallelism instead",
                );
            }
            Ok(default_parallelism.min(max))
        }
    }

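    /// Builds a complete execution graph for a new streaming job: resolves upstream
    /// fragments, schedules actors onto the cluster, and assembles the
    /// `CreateStreamingJobContext` consumed by the stream manager. Also prepares the
    /// replace-table plan when the job is a sink into an existing table.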
    pub(crate) async fn build_stream_job(
        &self,
        stream_ctx: StreamContext,
        mut stream_job: StreamingJob,
        fragment_graph: StreamFragmentGraph,
        affected_table_replace_info: Option<(StreamingJob, StreamFragmentGraph)>,
        specific_resource_group: Option<String>,
    ) -> MetaResult<(CreateStreamingJobContext, StreamJobFragmentsToCreate)> {
        let id = stream_job.id();
        let specified_parallelism = fragment_graph.specified_parallelism();
        let expr_context = stream_ctx.to_expr_context();
        let max_parallelism = NonZeroUsize::new(fragment_graph.max_parallelism()).unwrap();

        let fragment_backfill_ordering = fragment_graph.create_fragment_backfill_ordering();

        let (snapshot_backfill_info, cross_db_snapshot_backfill_info) =
            fragment_graph.collect_snapshot_backfill_info()?;
        assert!(
            snapshot_backfill_info
                .iter()
                .chain([&cross_db_snapshot_backfill_info])
                .flat_map(|info| info.upstream_mv_table_id_to_backfill_epoch.values())
                .all(|backfill_epoch| backfill_epoch.is_none()),
            "should not set backfill epoch when initially build the job: {:?} {:?}",
            snapshot_backfill_info,
            cross_db_snapshot_backfill_info
        );

        self.metadata_manager
            .catalog_controller
            .validate_cross_db_snapshot_backfill(&cross_db_snapshot_backfill_info)
            .await?;

        // Upstreams backfilled across databases are resolved separately.
        let upstream_table_ids = fragment_graph
            .dependent_table_ids()
            .iter()
            .filter(|id| {
                !cross_db_snapshot_backfill_info
                    .upstream_mv_table_id_to_backfill_epoch
                    .contains_key(id)
            })
            .cloned()
            .collect();

        let (upstream_root_fragments, existing_actor_location) = self
            .metadata_manager
            .get_upstream_root_fragments(&upstream_table_ids)
            .await?;

        if snapshot_backfill_info.is_some() {
            match stream_job {
                StreamingJob::MaterializedView(_)
                | StreamingJob::Sink(_, _)
                | StreamingJob::Index(_, _) => {}
                StreamingJob::Table(_, _, _) | StreamingJob::Source(_) => {
                    return Err(
                        anyhow!("snapshot_backfill not enabled for table and source").into(),
                    );
                }
            }
        }

        let upstream_actors = upstream_root_fragments
            .values()
            .map(|fragment| {
                (
                    fragment.fragment_id,
                    fragment.actors.iter().map(|actor| actor.actor_id).collect(),
                )
            })
            .collect();

        let complete_graph = CompleteStreamFragmentGraph::with_upstreams(
            fragment_graph,
            upstream_root_fragments,
            existing_actor_location,
            (&stream_job).into(),
        )?;

        let resource_group = match specific_resource_group {
            None => {
                self.metadata_manager
                    .get_database_resource_group(stream_job.database_id() as ObjectId)
                    .await?
            }
            Some(resource_group) => resource_group,
        };

        let cluster_info = self.metadata_manager.get_streaming_cluster_info().await?;

        let parallelism = self.resolve_stream_parallelism(
            specified_parallelism,
            max_parallelism,
            &cluster_info,
            resource_group.clone(),
        )?;

        let parallelism = self
            .env
            .system_params_reader()
            .await
            .adaptive_parallelism_strategy()
            .compute_target_parallelism(parallelism.get());

        let parallelism = NonZeroUsize::new(parallelism).expect("parallelism must be positive");
        let actor_graph_builder = ActorGraphBuilder::new(
            id,
            resource_group,
            complete_graph,
            cluster_info,
            parallelism,
        )?;

        let ActorGraphBuildResult {
            graph,
            downstream_fragment_relations,
            building_locations,
            existing_locations,
            upstream_fragment_downstreams,
            new_no_shuffle,
            replace_upstream,
            ..
        } = actor_graph_builder.generate_graph(&self.env, &stream_job, expr_context)?;
        assert!(replace_upstream.is_empty());

        // A job with no user-specified parallelism scales adaptively when the default
        // parallelism is `Full`; otherwise the resolved parallelism is pinned.
        let table_parallelism = match (specified_parallelism, &self.env.opts.default_parallelism) {
            (None, DefaultParallelism::Full) => TableParallelism::Adaptive,
            _ => TableParallelism::Fixed(parallelism.get()),
        };

        let stream_job_fragments = StreamJobFragments::new(
            id.into(),
            graph,
            &building_locations.actor_locations,
            stream_ctx.clone(),
            table_parallelism,
            max_parallelism.get(),
        );
        let internal_tables = stream_job_fragments.internal_tables();

        if let Some(mview_fragment) = stream_job_fragments.mview_fragment() {
            stream_job.set_table_vnode_count(mview_fragment.vnode_count());
        }

        let replace_table_job_info = match affected_table_replace_info {
            Some((table_stream_job, fragment_graph)) => {
                if snapshot_backfill_info.is_some() {
                    return Err(anyhow!(
                        "snapshot backfill should not have replace table info: {table_stream_job:?}"
                    )
                    .into());
                }
                let StreamingJob::Sink(sink, target_table) = &mut stream_job else {
                    bail!("additional replace table event only occurs when sinking into table");
                };

                table_stream_job.table().expect("should be table job");
                let tmp_id = self
                    .metadata_manager
                    .catalog_controller
                    .create_job_catalog_for_replace(
                        &table_stream_job,
                        &stream_ctx,
                        &fragment_graph.specified_parallelism(),
                        fragment_graph.max_parallelism(),
                    )
                    .await? as u32;

                let (context, table_fragments) = self
                    .inject_replace_table_job_for_table_sink(
                        tmp_id,
                        &self.metadata_manager,
                        stream_ctx,
                        Some(sink),
                        Some(&stream_job_fragments),
                        None,
                        &table_stream_job,
                        fragment_graph,
                    )
                    .await?;
                // Record the target table (and its optional associated source) on the
                // sink job.
                must_match!(&table_stream_job, StreamingJob::Table(source, table, _) => {
                    *target_table = Some((table.clone(), source.clone()));
                });

                Some((table_stream_job, context, table_fragments))
            }
            None => None,
        };

        let ctx = CreateStreamingJobContext {
            upstream_fragment_downstreams,
            new_no_shuffle,
            upstream_actors,
            internal_tables,
            building_locations,
            existing_locations,
            definition: stream_job.definition(),
            mv_table_id: stream_job.mv_table(),
            create_type: stream_job.create_type(),
            job_type: (&stream_job).into(),
            streaming_job: stream_job,
            replace_table_job_info,
            option: CreateStreamingJobOption {},
            snapshot_backfill_info,
            cross_db_snapshot_backfill_info,
            fragment_backfill_ordering,
        };

        Ok((
            ctx,
            StreamJobFragmentsToCreate {
                inner: stream_job_fragments,
                downstreams: downstream_fragment_relations,
            },
        ))
    }

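    /// Builds the execution graph that replaces an existing job's fragments,
    /// reusing the old job's parallelism and internal tables. `tmp_job_id` is the
    /// temporary catalog id the new fragments are created under until the swap is
    /// finalized.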
    pub(crate) async fn build_replace_job(
        &self,
        stream_ctx: StreamContext,
        stream_job: &StreamingJob,
        mut fragment_graph: StreamFragmentGraph,
        tmp_job_id: TableId,
    ) -> MetaResult<(ReplaceStreamJobContext, StreamJobFragmentsToCreate)> {
        match &stream_job {
            StreamingJob::Table(..) | StreamingJob::Source(..) => {}
            StreamingJob::MaterializedView(..)
            | StreamingJob::Sink(..)
            | StreamingJob::Index(..) => {
                bail_not_implemented!("schema change for {}", stream_job.job_type_str())
            }
        }

        let id = stream_job.id();
        let expr_context = stream_ctx.to_expr_context();

        // If the new table job has no associated source but the old one did, the
        // connector is being dropped from the table.
        let mut drop_table_associated_source_id = None;
        if let StreamingJob::Table(None, _, _) = &stream_job {
            drop_table_associated_source_id = self
                .metadata_manager
                .get_table_associated_source_id(id as _)
                .await?;
        }

        let old_fragments = self
            .metadata_manager
            .get_job_fragments_by_id(&id.into())
            .await?;
        let old_internal_table_ids = old_fragments.internal_table_ids();

        let mut drop_table_connector_ctx = None;
        if drop_table_associated_source_id.is_some() {
            // Such a table has exactly one internal table: the state table of its
            // associated source.
            debug_assert!(old_internal_table_ids.len() == 1);

            drop_table_connector_ctx = Some(DropTableConnectorContext {
                to_change_streaming_job_id: id as i32,
                to_remove_state_table_id: old_internal_table_ids[0] as i32,
                to_remove_source_id: drop_table_associated_source_id.unwrap(),
            });
        } else {
            let old_internal_tables = self
                .metadata_manager
                .get_table_catalog_by_ids(old_internal_table_ids)
                .await?;
            fragment_graph.fit_internal_table_ids(old_internal_tables)?;
        }

        // The new fragments inherit the downstreams of the old root fragment.
        let original_root_fragment = old_fragments
            .root_fragment()
            .expect("root fragment not found");

        let job_type = StreamingJobType::from(stream_job);

        let (downstream_fragments, downstream_actor_location) =
            self.metadata_manager.get_downstream_fragments(id).await?;

        let complete_graph = match &job_type {
            StreamingJobType::Table(TableJobType::General) | StreamingJobType::Source => {
                CompleteStreamFragmentGraph::with_downstreams(
                    fragment_graph,
                    original_root_fragment.fragment_id,
                    downstream_fragments,
                    downstream_actor_location,
                    job_type,
                )?
            }
            StreamingJobType::Table(TableJobType::SharedCdcSource) => {
                // CDC tables also depend on their upstream shared source fragment.
                let (upstream_root_fragments, upstream_actor_location) = self
                    .metadata_manager
                    .get_upstream_root_fragments(fragment_graph.dependent_table_ids())
                    .await?;

                CompleteStreamFragmentGraph::with_upstreams_and_downstreams(
                    fragment_graph,
                    upstream_root_fragments,
                    upstream_actor_location,
                    original_root_fragment.fragment_id,
                    downstream_fragments,
                    downstream_actor_location,
                    job_type,
                )?
            }
            _ => unreachable!(),
        };

        let resource_group = self
            .metadata_manager
            .get_existing_job_resource_group(id as ObjectId)
            .await?;

        let cluster_info = self.metadata_manager.get_streaming_cluster_info().await?;

        // Keep the parallelism of the old job.
        let parallelism = NonZeroUsize::new(original_root_fragment.actors.len())
            .expect("The number of actors in the original table fragment should be greater than 0");

        let actor_graph_builder = ActorGraphBuilder::new(
            id,
            resource_group,
            complete_graph,
            cluster_info,
            parallelism,
        )?;

        let ActorGraphBuildResult {
            graph,
            downstream_fragment_relations,
            building_locations,
            existing_locations,
            upstream_fragment_downstreams,
            replace_upstream,
            new_no_shuffle,
            ..
        } = actor_graph_builder.generate_graph(&self.env, stream_job, expr_context)?;

        if matches!(
            job_type,
            StreamingJobType::Source | StreamingJobType::Table(TableJobType::General)
        ) {
            assert!(upstream_fragment_downstreams.is_empty());
        }

        // The fragments live under the temporary job id until the swap is finalized.
        let stream_job_fragments = StreamJobFragments::new(
            (tmp_job_id as u32).into(),
            graph,
            &building_locations.actor_locations,
            stream_ctx,
            old_fragments.assigned_parallelism,
            old_fragments.max_parallelism,
        );

        let ctx = ReplaceStreamJobContext {
            old_fragments,
            replace_upstream,
            new_no_shuffle,
            upstream_fragment_downstreams,
            building_locations,
            existing_locations,
            streaming_job: stream_job.clone(),
            tmp_id: tmp_job_id as _,
            drop_table_connector_ctx,
        };

        Ok((
            ctx,
            StreamJobFragmentsToCreate {
                inner: stream_job_fragments,
                downstreams: downstream_fragment_relations,
            },
        ))
    }

    async fn alter_name(
        &self,
        relation: alter_name_request::Object,
        new_name: &str,
    ) -> MetaResult<NotificationVersion> {
        let (obj_type, id) = match relation {
            alter_name_request::Object::TableId(id) => (ObjectType::Table, id as ObjectId),
            alter_name_request::Object::ViewId(id) => (ObjectType::View, id as ObjectId),
            alter_name_request::Object::IndexId(id) => (ObjectType::Index, id as ObjectId),
            alter_name_request::Object::SinkId(id) => (ObjectType::Sink, id as ObjectId),
            alter_name_request::Object::SourceId(id) => (ObjectType::Source, id as ObjectId),
            alter_name_request::Object::SchemaId(id) => (ObjectType::Schema, id as ObjectId),
            alter_name_request::Object::DatabaseId(id) => (ObjectType::Database, id as ObjectId),
            alter_name_request::Object::SubscriptionId(id) => {
                (ObjectType::Subscription, id as ObjectId)
            }
        };
        self.metadata_manager
            .catalog_controller
            .alter_name(obj_type, id, new_name)
            .await
    }

    async fn alter_swap_rename(
        &self,
        object: alter_swap_rename_request::Object,
    ) -> MetaResult<NotificationVersion> {
        let (obj_type, src_id, dst_id) = match object {
            alter_swap_rename_request::Object::Schema(_) => unimplemented!("schema swap"),
            alter_swap_rename_request::Object::Table(objs) => {
                let (src_id, dst_id) = (
                    objs.src_object_id as ObjectId,
                    objs.dst_object_id as ObjectId,
                );
                (ObjectType::Table, src_id, dst_id)
            }
            alter_swap_rename_request::Object::View(objs) => {
                let (src_id, dst_id) = (
                    objs.src_object_id as ObjectId,
                    objs.dst_object_id as ObjectId,
                );
                (ObjectType::View, src_id, dst_id)
            }
            alter_swap_rename_request::Object::Source(objs) => {
                let (src_id, dst_id) = (
                    objs.src_object_id as ObjectId,
                    objs.dst_object_id as ObjectId,
                );
                (ObjectType::Source, src_id, dst_id)
            }
            alter_swap_rename_request::Object::Sink(objs) => {
                let (src_id, dst_id) = (
                    objs.src_object_id as ObjectId,
                    objs.dst_object_id as ObjectId,
                );
                (ObjectType::Sink, src_id, dst_id)
            }
            alter_swap_rename_request::Object::Subscription(objs) => {
                let (src_id, dst_id) = (
                    objs.src_object_id as ObjectId,
                    objs.dst_object_id as ObjectId,
                );
                (ObjectType::Subscription, src_id, dst_id)
            }
        };

        self.metadata_manager
            .catalog_controller
            .alter_swap_rename(obj_type, src_id, dst_id)
            .await
    }

    async fn alter_owner(
        &self,
        object: Object,
        owner_id: UserId,
    ) -> MetaResult<NotificationVersion> {
        let (obj_type, id) = match object {
            Object::TableId(id) => (ObjectType::Table, id as ObjectId),
            Object::ViewId(id) => (ObjectType::View, id as ObjectId),
            Object::SourceId(id) => (ObjectType::Source, id as ObjectId),
            Object::SinkId(id) => (ObjectType::Sink, id as ObjectId),
            Object::SchemaId(id) => (ObjectType::Schema, id as ObjectId),
            Object::DatabaseId(id) => (ObjectType::Database, id as ObjectId),
            Object::SubscriptionId(id) => (ObjectType::Subscription, id as ObjectId),
            Object::ConnectionId(id) => (ObjectType::Connection, id as ObjectId),
        };
        self.metadata_manager
            .catalog_controller
            .alter_owner(obj_type, id, owner_id as _)
            .await
    }

    async fn alter_set_schema(
        &self,
        object: alter_set_schema_request::Object,
        new_schema_id: SchemaId,
    ) -> MetaResult<NotificationVersion> {
        let (obj_type, id) = match object {
            alter_set_schema_request::Object::TableId(id) => (ObjectType::Table, id as ObjectId),
            alter_set_schema_request::Object::ViewId(id) => (ObjectType::View, id as ObjectId),
            alter_set_schema_request::Object::SourceId(id) => (ObjectType::Source, id as ObjectId),
            alter_set_schema_request::Object::SinkId(id) => (ObjectType::Sink, id as ObjectId),
            alter_set_schema_request::Object::FunctionId(id) => {
                (ObjectType::Function, id as ObjectId)
            }
            alter_set_schema_request::Object::ConnectionId(id) => {
                (ObjectType::Connection, id as ObjectId)
            }
            alter_set_schema_request::Object::SubscriptionId(id) => {
                (ObjectType::Subscription, id as ObjectId)
            }
        };
        self.metadata_manager
            .catalog_controller
            .alter_schema(obj_type, id, new_schema_id as _)
            .await
    }

    /// Blocks until no materialized view is being created in the background, or
    /// until a timeout (nominally 30 minutes) elapses.
    pub async fn wait(&self) -> MetaResult<()> {
        let timeout_ms = 30 * 60 * 1000;
        // Poll roughly once per millisecond, up to `timeout_ms` iterations.
        for _ in 0..timeout_ms {
            if self
                .metadata_manager
                .catalog_controller
                .list_background_creating_mviews(true)
                .await?
                .is_empty()
            {
                return Ok(());
            }

            sleep(Duration::from_millis(1)).await;
        }
        Err(MetaError::cancelled(format!(
            "timeout after {timeout_ms}ms"
        )))
    }

    async fn comment_on(&self, comment: Comment) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .comment_on(comment)
            .await
    }
}

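/// Reports a telemetry event for the creation of a connector-backed object
/// (source or sink).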
fn report_create_object(
    catalog_id: u32,
    event_name: &str,
    obj_type: PbTelemetryDatabaseObject,
    connector_name: Option<String>,
    attr_info: Option<jsonbb::Value>,
) {
    report_event(
        PbTelemetryEventStage::CreateStreamJob,
        event_name,
        catalog_id.into(),
        connector_name,
        Some(obj_type),
        attr_info,
    );
}

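/// Deletes all rows belonging to the given sink from the exactly-once Iceberg
/// sink system table.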
async fn clean_all_rows_by_sink_id(db: &DatabaseConnection, sink_id: i32) -> MetaResult<()> {
    match Entity::delete_many()
        .filter(Column::SinkId.eq(sink_id))
        .exec(db)
        .await
    {
        Ok(result) => {
            let deleted_count = result.rows_affected;

            tracing::info!(
                "Deleted {} items for sink_id = {} in iceberg exactly once system table.",
                deleted_count,
                sink_id
            );
            Ok(())
        }
        Err(e) => {
            tracing::error!(
                "Error deleting records for sink_id = {} from iceberg exactly once system table: {:?}",
                sink_id,
                e.as_report()
            );
            Err(e.into())
        }
    }
}