1use std::collections::{BTreeMap, HashMap, HashSet};
16use std::num::NonZeroUsize;
17
18use anyhow::anyhow;
19use indexmap::IndexMap;
20use itertools::Itertools;
21use risingwave_common::catalog::{
22 ColumnCatalog, FragmentTypeFlag, FragmentTypeMask, ICEBERG_SINK_PREFIX, ICEBERG_SOURCE_PREFIX,
23 RISINGWAVE_ICEBERG_ROW_ID, ROW_ID_COLUMN_NAME, max_column_id,
24};
25use risingwave_common::config::DefaultParallelism;
26use risingwave_common::hash::VnodeCountCompat;
27use risingwave_common::id::JobId;
28use risingwave_common::secret::LocalSecretManager;
29use risingwave_common::system_param::AdaptiveParallelismStrategy;
30use risingwave_common::system_param::adaptive_parallelism_strategy::parse_strategy;
31use risingwave_common::util::iter_util::ZipEqDebug;
32use risingwave_common::util::stream_graph_visitor::{
33 visit_stream_node_body, visit_stream_node_mut,
34};
35use risingwave_common::{bail, current_cluster_version};
36use risingwave_connector::allow_alter_on_fly_fields::check_sink_allow_alter_on_fly_fields;
37use risingwave_connector::connector_common::validate_connection;
38use risingwave_connector::error::ConnectorError;
39use risingwave_connector::sink::file_sink::fs::FsSink;
40use risingwave_connector::sink::{CONNECTOR_TYPE_KEY, SinkError};
41use risingwave_connector::source::{
42 ConnectorProperties, UPSTREAM_SOURCE_KEY, pb_connection_type_to_connection_type,
43};
44use risingwave_connector::{WithOptionsSecResolved, WithPropertiesExt, match_sink_name_str};
45use risingwave_meta_model::object::ObjectType;
46use risingwave_meta_model::prelude::{StreamingJob as StreamingJobModel, *};
47use risingwave_meta_model::refresh_job::RefreshState;
48use risingwave_meta_model::streaming_job::BackfillOrders;
49use risingwave_meta_model::table::TableType;
50use risingwave_meta_model::user_privilege::Action;
51use risingwave_meta_model::*;
52use risingwave_pb::catalog::table::PbEngine;
53use risingwave_pb::catalog::{PbConnection, PbCreateType, PbTable};
54use risingwave_pb::ddl_service::streaming_job_resource_type;
55use risingwave_pb::meta::alter_connector_props_request::AlterIcebergTableIds;
56use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
57use risingwave_pb::meta::object::PbObjectInfo;
58use risingwave_pb::meta::subscribe_response::{
59 Info as NotificationInfo, Info, Operation as NotificationOperation, Operation,
60};
61use risingwave_pb::meta::{ObjectDependency as PbObjectDependency, PbObject, PbObjectGroup};
62use risingwave_pb::plan_common::PbColumnCatalog;
63use risingwave_pb::plan_common::source_refresh_mode::{
64 RefreshMode, SourceRefreshModeFullReload, SourceRefreshModeStreaming,
65};
66use risingwave_pb::secret::PbSecretRef;
67use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism;
68use risingwave_pb::stream_plan::stream_node::PbNodeBody;
69use risingwave_pb::stream_plan::{PbSinkLogStoreType, PbStreamNode};
70use risingwave_pb::user::PbUserInfo;
71use risingwave_sqlparser::ast::{Engine, SqlOption, Statement};
72use risingwave_sqlparser::parser::{Parser, ParserError};
73use sea_orm::ActiveValue::Set;
74use sea_orm::sea_query::{Expr, Query, SimpleExpr};
75use sea_orm::{
76 ActiveModelTrait, ColumnTrait, DatabaseTransaction, EntityTrait, IntoActiveModel, JoinType,
77 NotSet, PaginatorTrait, QueryFilter, QuerySelect, RelationTrait, TransactionTrait,
78};
79use thiserror_ext::AsReport;
80
81use super::rename::IndexItemRewriter;
82use crate::barrier::Command;
83use crate::controller::ObjectModel;
84use crate::controller::catalog::{CatalogController, DropTableConnectorContext};
85use crate::controller::fragment::FragmentTypeMaskExt;
86use crate::controller::utils::{
87 PartialObject, build_object_group_for_delete, check_if_belongs_to_iceberg_table,
88 check_relation_name_duplicate, check_sink_into_table_cycle, ensure_job_not_canceled,
89 ensure_object_id, ensure_user_id, fetch_target_fragments, get_internal_tables_by_id,
90 get_table_columns, grant_default_privileges_automatically, insert_fragment_relations,
91 list_object_dependencies_by_object_id, list_user_info_by_ids,
92 try_get_iceberg_table_by_downstream_sink,
93};
94use crate::error::MetaErrorInner;
95use crate::manager::{NotificationVersion, StreamingJob, StreamingJobType};
96use crate::model::{
97 FragmentDownstreamRelation, FragmentReplaceUpstream, StreamContext, StreamJobFragments,
98 StreamJobFragmentsToCreate,
99};
100use crate::stream::SplitAssignment;
101use crate::{MetaError, MetaResult};
102
/// Bundle of data describing a source-fragment update for a set of jobs.
///
/// NOTE(review): this type is not used in the visible part of the file, so the
/// field descriptions below are inferred from names and types only — confirm
/// against the code that constructs and consumes it.
#[derive(Debug)]
struct DependentSourceFragmentUpdate {
    /// Ids of the jobs this update concerns (presumably the jobs whose fragments
    /// read from the source — TODO confirm).
    job_ids: Vec<JobId>,
    /// String key/value properties, kept in key order by the `BTreeMap`.
    with_properties: BTreeMap<String, String>,
    /// Secret references keyed by string (presumably by property name — verify).
    secret_refs: BTreeMap<String, PbSecretRef>,
    /// Flag distinguishing shared sources; exact semantics are defined by the consumer.
    is_shared_source: bool,
}
114
/// Settings copied out of an existing `streaming_job::Model` row.
///
/// Built through `From<streaming_job::Model>`; the helper methods on this type
/// derive the parallelism, stream context and resource type from these values.
#[derive(Debug)]
struct ReplaceOriginalJobInfo {
    /// `max_parallelism` as stored in the model row.
    max_parallelism: i32,
    /// Timezone stored with the job, used when no override timezone is supplied.
    timezone: Option<String>,
    /// Stored config override; `None` is treated as the default (empty) string.
    config_override: Option<String>,
    /// Adaptive parallelism strategy in its persisted string form.
    adaptive_parallelism_strategy: Option<String>,
    /// Parallelism stored with the job, used when none is explicitly specified.
    parallelism: StreamingParallelism,
    /// Resource group the job is pinned to, if any.
    specific_resource_group: Option<String>,
}
124
125impl ReplaceOriginalJobInfo {
126 fn resolved_parallelism(
127 &self,
128 specified_parallelism: Option<&NonZeroUsize>,
129 ) -> StreamingParallelism {
130 specified_parallelism
131 .map(|n| StreamingParallelism::Fixed(n.get() as _))
132 .unwrap_or_else(|| self.parallelism.clone())
133 }
134
135 fn stream_context(&self, ctx_override: Option<&StreamContext>) -> StreamContext {
136 StreamContext {
137 timezone: ctx_override
138 .and_then(|ctx| ctx.timezone.clone())
139 .or_else(|| self.timezone.clone()),
140 config_override: self.config_override.clone().unwrap_or_default().into(),
143 }
144 }
145
146 fn resource_type(&self) -> streaming_job_resource_type::ResourceType {
147 match &self.specific_resource_group {
148 Some(group) => {
149 streaming_job_resource_type::ResourceType::SpecificResourceGroup(group.clone())
150 }
151 None => streaming_job_resource_type::ResourceType::Regular(true),
152 }
153 }
154}
155
156impl From<streaming_job::Model> for ReplaceOriginalJobInfo {
157 fn from(model: streaming_job::Model) -> Self {
158 Self {
159 max_parallelism: model.max_parallelism,
160 timezone: model.timezone,
161 config_override: model.config_override,
162 adaptive_parallelism_strategy: model.adaptive_parallelism_strategy,
163 parallelism: model.parallelism,
164 specific_resource_group: model.specific_resource_group,
165 }
166 }
167}
168
169impl CatalogController {
170 #[expect(clippy::too_many_arguments)]
171 pub async fn create_streaming_job_obj(
172 txn: &DatabaseTransaction,
173 obj_type: ObjectType,
174 owner_id: UserId,
175 database_id: Option<DatabaseId>,
176 schema_id: Option<SchemaId>,
177 create_type: PbCreateType,
178 ctx: StreamContext,
179 adaptive_parallelism_strategy: Option<AdaptiveParallelismStrategy>,
180 streaming_parallelism: StreamingParallelism,
181 max_parallelism: usize,
182 resource_type: streaming_job_resource_type::ResourceType,
183 backfill_parallelism: Option<StreamingParallelism>,
184 backfill_adaptive_parallelism_strategy: Option<AdaptiveParallelismStrategy>,
185 refresh_interval_sec: Option<u64>,
186 ) -> MetaResult<streaming_job::Model> {
187 let obj = Self::create_object(txn, obj_type, owner_id, database_id, schema_id).await?;
188 let job_id = obj.oid.as_job_id();
189 let specific_resource_group = resource_type.resource_group();
190 let is_serverless_backfill = matches!(
191 &resource_type,
192 streaming_job_resource_type::ResourceType::ServerlessBackfillResourceGroup(_)
193 );
194 let model = streaming_job::Model {
195 job_id,
196 job_status: JobStatus::Initial,
197 create_type: create_type.into(),
198 timezone: ctx.timezone,
199 config_override: Some(ctx.config_override.to_string()),
200 adaptive_parallelism_strategy: adaptive_parallelism_strategy
201 .as_ref()
202 .map(ToString::to_string),
203 parallelism: streaming_parallelism,
204 backfill_parallelism,
205 backfill_adaptive_parallelism_strategy: backfill_adaptive_parallelism_strategy
206 .as_ref()
207 .map(ToString::to_string),
208 backfill_orders: None,
209 max_parallelism: max_parallelism as _,
210 specific_resource_group,
211 is_serverless_backfill,
212 refresh_interval_sec: refresh_interval_sec.map(|s| s as i64),
213 };
214 let job = model.clone().into_active_model();
215 StreamingJobModel::insert(job).exec(txn).await?;
216
217 Ok(model)
218 }
219
    /// Persists the catalog entries of a new streaming job in a single transaction and
    /// returns the inserted `streaming_job::Model`.
    ///
    /// In order, this:
    /// 1. resolves the job parallelism and the backfill parallelism;
    /// 2. runs the owner / database / schema checks and the relation-name duplicate check;
    /// 3. rejects the job when a dependency is being altered or has a refresh interval;
    /// 4. inserts the job row plus the catalog rows for the concrete job kind, writing
    ///    the newly allocated ids back into `streaming_job`;
    /// 5. records object dependencies, including dependent secrets and connections.
    ///
    /// Nothing is visible to other transactions until the final commit.
    #[expect(clippy::too_many_arguments)]
    #[await_tree::instrument]
    pub async fn create_job_catalog(
        &self,
        streaming_job: &mut StreamingJob,
        ctx: &StreamContext,
        parallelism: &Option<Parallelism>,
        max_parallelism: usize,
        mut dependencies: HashSet<ObjectId>,
        resource_type: streaming_job_resource_type::ResourceType,
        backfill_parallelism: &Option<Parallelism>,
        adaptive_parallelism_strategy: Option<AdaptiveParallelismStrategy>,
        backfill_adaptive_parallelism_strategy: Option<AdaptiveParallelismStrategy>,
        refresh_interval_sec: Option<u64>,
    ) -> MetaResult<streaming_job::Model> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;
        let create_type = streaming_job.create_type();

        // An explicit parallelism always wins; otherwise fall back to the configured default.
        let streaming_parallelism = match (parallelism, self.env.opts.default_parallelism) {
            (None, DefaultParallelism::Full) => StreamingParallelism::Adaptive,
            (None, DefaultParallelism::Default(n)) => StreamingParallelism::Fixed(n.get()),
            (Some(n), _) => StreamingParallelism::Fixed(n.parallelism as _),
        };
        // Backfill parallelism: fixed when given explicitly, adaptive when only a backfill
        // strategy is given, otherwise unset.
        let backfill_parallelism = backfill_parallelism
            .as_ref()
            .map(|p| StreamingParallelism::Fixed(p.parallelism as _))
            .or_else(|| {
                backfill_adaptive_parallelism_strategy
                    .as_ref()
                    .map(|_| StreamingParallelism::Adaptive)
            });

        ensure_user_id(streaming_job.owner() as _, &txn).await?;
        ensure_object_id(ObjectType::Database, streaming_job.database_id(), &txn).await?;
        ensure_object_id(ObjectType::Schema, streaming_job.schema_id(), &txn).await?;
        check_relation_name_duplicate(
            &streaming_job.name(),
            streaming_job.database_id(),
            streaming_job.schema_id(),
            &txn,
        )
        .await?;

        if !dependencies.is_empty() {
            // Count dependency edges on the requested objects whose joined object is a
            // `Table`-typed streaming job that is not yet `Created` and has no row in the
            // `table` catalog. NOTE(review): presumably these are the temporary jobs
            // created while altering/replacing a relation — confirm.
            let altering_cnt = ObjectDependency::find()
                .join(
                    JoinType::InnerJoin,
                    object_dependency::Relation::Object1.def(),
                )
                .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
                .filter(
                    object_dependency::Column::Oid
                        .is_in(dependencies.clone())
                        .and(object::Column::ObjType.eq(ObjectType::Table))
                        .and(streaming_job::Column::JobStatus.ne(JobStatus::Created))
                        .and(
                            object::Column::Oid.not_in_subquery(
                                Query::select()
                                    .column(table::Column::TableId)
                                    .from(Table)
                                    .to_owned(),
                            ),
                        ),
                )
                .count(&txn)
                .await?;
            if altering_cnt != 0 {
                return Err(MetaError::permission_denied(
                    "some dependent relations are being altered",
                ));
            }

            // Reject dependencies that are streaming jobs with a refresh interval set.
            let batch_refresh_cnt = StreamingJobModel::find()
                .filter(
                    streaming_job::Column::JobId
                        .is_in(dependencies.iter().map(|id| JobId::new(id.as_raw_id())))
                        .and(streaming_job::Column::RefreshIntervalSec.is_not_null()),
                )
                .count(&txn)
                .await?;
            if batch_refresh_cnt != 0 {
                return Err(MetaError::permission_denied(
                    "creating streaming jobs on batch refresh materialized views is not supported",
                ));
            }
        }

        // Every arm allocates the job row first, then derives the catalog ids from the
        // job id and inserts the kind-specific rows.
        let streaming_job_model = match streaming_job {
            StreamingJob::MaterializedView(table) => {
                let streaming_job_model = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Table,
                    table.owner as _,
                    Some(table.database_id),
                    Some(table.schema_id),
                    create_type,
                    ctx.clone(),
                    adaptive_parallelism_strategy,
                    streaming_parallelism,
                    max_parallelism,
                    resource_type,
                    backfill_parallelism.clone(),
                    backfill_adaptive_parallelism_strategy,
                    refresh_interval_sec,
                )
                .await?;
                table.id = streaming_job_model.job_id.as_mv_table_id();
                let table_model: table::ActiveModel = table.clone().into();
                Table::insert(table_model).exec(&txn).await?;
                streaming_job_model
            }
            StreamingJob::Sink(sink) => {
                // A sink into a table must not close a dependency cycle.
                if let Some(target_table_id) = sink.target_table
                    && check_sink_into_table_cycle(
                        target_table_id.into(),
                        dependencies.iter().cloned().collect(),
                        &txn,
                    )
                    .await?
                {
                    bail!("Creating such a sink will result in circular dependency.");
                }

                let streaming_job_model = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Sink,
                    sink.owner as _,
                    Some(sink.database_id),
                    Some(sink.schema_id),
                    create_type,
                    ctx.clone(),
                    adaptive_parallelism_strategy,
                    streaming_parallelism,
                    max_parallelism,
                    resource_type,
                    backfill_parallelism.clone(),
                    backfill_adaptive_parallelism_strategy,
                    None, // `refresh_interval_sec` is only forwarded for materialized views
                )
                .await?;
                sink.id = streaming_job_model.job_id.as_sink_id();
                let sink_model: sink::ActiveModel = sink.clone().into();
                Sink::insert(sink_model).exec(&txn).await?;
                streaming_job_model
            }
            StreamingJob::Table(src, table, _) => {
                let streaming_job_model = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Table,
                    table.owner as _,
                    Some(table.database_id),
                    Some(table.schema_id),
                    create_type,
                    ctx.clone(),
                    adaptive_parallelism_strategy,
                    streaming_parallelism,
                    max_parallelism,
                    resource_type,
                    backfill_parallelism.clone(),
                    backfill_adaptive_parallelism_strategy,
                    None, // `refresh_interval_sec` is only forwarded for materialized views
                )
                .await?;
                let job_id = streaming_job_model.job_id;
                table.id = job_id.as_mv_table_id();
                if let Some(src) = src {
                    // The associated source gets its own object; table and source are
                    // cross-linked before either row is inserted.
                    let src_obj = Self::create_object(
                        &txn,
                        ObjectType::Source,
                        src.owner as _,
                        Some(src.database_id),
                        Some(src.schema_id),
                    )
                    .await?;
                    src.id = src_obj.oid.as_source_id();
                    src.optional_associated_table_id = Some(job_id.as_mv_table_id().into());
                    table.optional_associated_source_id = Some(src_obj.oid.as_source_id().into());
                    let source: source::ActiveModel = src.clone().into();
                    Source::insert(source).exec(&txn).await?;
                }
                let table_model: table::ActiveModel = table.clone().into();
                Table::insert(table_model).exec(&txn).await?;

                if table.refreshable {
                    // Only a full-reload refresh mode carries a trigger interval; the
                    // streaming mode and an unset mode leave it empty.
                    let trigger_interval_secs = src
                        .as_ref()
                        .and_then(|source_catalog| source_catalog.refresh_mode)
                        .and_then(
                            |source_refresh_mode| match source_refresh_mode.refresh_mode {
                                Some(RefreshMode::FullReload(SourceRefreshModeFullReload {
                                    refresh_interval_sec,
                                })) => refresh_interval_sec,
                                Some(RefreshMode::Streaming(SourceRefreshModeStreaming {})) => None,
                                None => None,
                            },
                        );

                    // Register the refresh job in the idle state with no history.
                    RefreshJob::insert(refresh_job::ActiveModel {
                        table_id: Set(table.id),
                        last_trigger_time: Set(None),
                        trigger_interval_secs: Set(trigger_interval_secs),
                        current_status: Set(RefreshState::Idle),
                        last_success_time: Set(None),
                    })
                    .exec(&txn)
                    .await?;
                }
                streaming_job_model
            }
            StreamingJob::Index(index, table) => {
                ensure_object_id(ObjectType::Table, index.primary_table_id, &txn).await?;
                let streaming_job_model = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Index,
                    index.owner as _,
                    Some(index.database_id),
                    Some(index.schema_id),
                    create_type,
                    ctx.clone(),
                    adaptive_parallelism_strategy,
                    streaming_parallelism,
                    max_parallelism,
                    resource_type,
                    backfill_parallelism.clone(),
                    backfill_adaptive_parallelism_strategy,
                    None, // `refresh_interval_sec` is only forwarded for materialized views
                )
                .await?;
                let job_id = streaming_job_model.job_id;
                // The index, its index table and the table catalog all derive their id
                // from the same job id.
                index.id = job_id.as_index_id();
                index.index_table_id = job_id.as_mv_table_id();
                table.id = job_id.as_mv_table_id();

                // The index table depends on the primary table it indexes.
                ObjectDependency::insert(object_dependency::ActiveModel {
                    oid: Set(index.primary_table_id.into()),
                    used_by: Set(table.id.into()),
                    ..Default::default()
                })
                .exec(&txn)
                .await?;

                let table_model: table::ActiveModel = table.clone().into();
                Table::insert(table_model).exec(&txn).await?;
                let index_model: index::ActiveModel = index.clone().into();
                Index::insert(index_model).exec(&txn).await?;
                streaming_job_model
            }
            StreamingJob::Source(src) => {
                let streaming_job_model = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Source,
                    src.owner as _,
                    Some(src.database_id),
                    Some(src.schema_id),
                    create_type,
                    ctx.clone(),
                    adaptive_parallelism_strategy,
                    streaming_parallelism,
                    max_parallelism,
                    resource_type,
                    backfill_parallelism.clone(),
                    backfill_adaptive_parallelism_strategy,
                    None, // `refresh_interval_sec` is only forwarded for materialized views
                )
                .await?;
                src.id = streaming_job_model.job_id.as_shared_source_id();
                let source_model: source::ActiveModel = src.clone().into();
                Source::insert(source_model).exec(&txn).await?;
                streaming_job_model
            }
        };

        // Secrets referenced by the job count as dependencies too.
        dependencies.extend(
            streaming_job
                .dependent_secret_ids()?
                .into_iter()
                .map(|id| id.as_object_id()),
        );
        // So do connections referenced by the job.
        dependencies.extend(
            streaming_job
                .dependent_connection_ids()?
                .into_iter()
                .map(|id| id.as_object_id()),
        );

        if !dependencies.is_empty() {
            // One dependency row per object, all used by the new job.
            ObjectDependency::insert_many(dependencies.into_iter().map(|oid| {
                object_dependency::ActiveModel {
                    oid: Set(oid),
                    used_by: Set(streaming_job.id().as_object_id()),
                    ..Default::default()
                }
            }))
            .exec(&txn)
            .await?;
        }

        txn.commit().await?;

        Ok(streaming_job_model)
    }
535
536 pub async fn create_internal_table_catalog(
544 &self,
545 job: &StreamingJob,
546 mut incomplete_internal_tables: Vec<PbTable>,
547 ) -> MetaResult<HashMap<TableId, TableId>> {
548 let job_id = job.id();
549 let inner = self.inner.write().await;
550 let txn = inner.db.begin().await?;
551
552 ensure_job_not_canceled(job_id, &txn).await?;
554
555 let mut table_id_map = HashMap::new();
556 for table in &mut incomplete_internal_tables {
557 let table_id = Self::create_object(
558 &txn,
559 ObjectType::Table,
560 table.owner as _,
561 Some(table.database_id),
562 Some(table.schema_id),
563 )
564 .await?
565 .oid
566 .as_table_id();
567 table_id_map.insert(table.id, table_id);
568 table.id = table_id;
569 table.job_id = Some(job_id);
570
571 let table_model = table::ActiveModel {
572 table_id: Set(table_id),
573 belongs_to_job_id: Set(Some(job_id)),
574 fragment_id: NotSet,
575 ..table.clone().into()
576 };
577 Table::insert(table_model).exec(&txn).await?;
578 }
579 txn.commit().await?;
580
581 Ok(table_id_map)
582 }
583
584 pub async fn prepare_stream_job_fragments(
585 &self,
586 stream_job_fragments: &StreamJobFragmentsToCreate,
587 streaming_job: &StreamingJob,
588 for_replace: bool,
589 backfill_orders: Option<BackfillOrders>,
590 ) -> MetaResult<()> {
591 self.prepare_streaming_job(
592 stream_job_fragments.stream_job_id(),
593 || stream_job_fragments.fragments.values(),
594 &stream_job_fragments.downstreams,
595 for_replace,
596 Some(streaming_job),
597 backfill_orders,
598 )
599 .await
600 }
601
    /// Persists the fragments of a streaming job and, when required, announces the new
    /// catalog objects to the frontend.
    ///
    /// Inside one transaction this stores the optional backfill orders, inserts the
    /// fragment rows, links state tables to their fragments (creation only), inserts the
    /// downstream fragment relations and, for a created table job, records its DML
    /// fragment id. After the commit it publishes the serving fragment mapping update
    /// and, if anything was collected, an `Operation::Add` object-group notification.
    ///
    /// * `get_fragments` — produces a fresh iterator over the job's fragments; it is
    ///   called more than once.
    /// * `for_replace` — skips the state-table and DML-fragment updates.
    /// * `creating_streaming_job` — the job being created, if any; it decides whether
    ///   the frontend is notified and supplies the definition used in notifications.
    #[await_tree::instrument("prepare_streaming_job_for_{}", if for_replace { "replace" } else { "create" }
    )]
    pub async fn prepare_streaming_job<'a, I: Iterator<Item = &'a crate::model::Fragment> + 'a>(
        &self,
        job_id: JobId,
        get_fragments: impl Fn() -> I + 'a,
        downstreams: &FragmentDownstreamRelation,
        for_replace: bool,
        creating_streaming_job: Option<&'a StreamingJob>,
        backfill_orders: Option<BackfillOrders>,
    ) -> MetaResult<()> {
        // Build the fragment models before taking the write lock.
        let fragments = Self::prepare_fragment_models_from_fragments(job_id, get_fragments())?;

        let inner = self.inner.write().await;

        let need_notify = creating_streaming_job
            .map(|job| job.should_notify_creating())
            .unwrap_or(false);
        let definition = creating_streaming_job.map(|job| job.definition());

        let mut objects_to_notify = vec![];
        let txn = inner.db.begin().await?;

        // Do not write anything for a job that has already been cancelled.
        ensure_job_not_canceled(job_id, &txn).await?;

        if let Some(backfill_orders) = backfill_orders {
            let job = streaming_job::ActiveModel {
                job_id: Set(job_id),
                backfill_orders: Set(Some(backfill_orders)),
                ..Default::default()
            };
            StreamingJobModel::update(job).exec(&txn).await?;
        }

        let state_table_ids = fragments
            .iter()
            .flat_map(|fragment| fragment.state_table_ids.inner_ref().clone())
            .collect_vec();

        // Collected now because `fragments` is consumed by the insert below.
        let inserted_fragment_ids: Vec<crate::model::FragmentId> = fragments
            .iter()
            .map(|f| f.fragment_id as crate::model::FragmentId)
            .collect();

        if !fragments.is_empty() {
            let fragment_models = fragments
                .into_iter()
                .map(|fragment| fragment.into_active_model())
                .collect_vec();
            Fragment::insert_many(fragment_models).exec(&txn).await?;
        }

        if !for_replace {
            // Attach every state table to its fragment and record its vnode count.
            let all_tables = StreamJobFragments::collect_tables(get_fragments());
            for state_table_id in state_table_ids {
                // A state table id without a matching table is treated as a bug.
                let table = all_tables
                    .get(&state_table_id)
                    .unwrap_or_else(|| panic!("table {} not found", state_table_id));
                assert_eq!(table.id, state_table_id);
                let vnode_count = table.vnode_count();

                Table::update(table::ActiveModel {
                    table_id: Set(state_table_id as _),
                    fragment_id: Set(Some(table.fragment_id)),
                    vnode_count: Set(vnode_count as _),
                    ..Default::default()
                })
                .exec(&txn)
                .await?;

                if need_notify {
                    let mut table = table.clone();
                    // Release builds only: the table whose id equals the job id is
                    // notified with the job's definition. `definition` is `Some` here
                    // because `need_notify` implies a creating job was passed in.
                    if cfg!(not(debug_assertions)) && table.id == job_id.as_raw_id() {
                        table.definition = definition.clone().unwrap();
                    }
                    objects_to_notify.push(PbObject {
                        object_info: Some(PbObjectInfo::Table(table)),
                    });
                }
            }
        }

        if need_notify {
            // Besides tables, announce the source / sink / index owned by the job.
            match creating_streaming_job.unwrap() {
                StreamingJob::Table(Some(source), ..) => {
                    objects_to_notify.push(PbObject {
                        object_info: Some(PbObjectInfo::Source(source.clone())),
                    });
                }
                StreamingJob::Sink(sink) => {
                    objects_to_notify.push(PbObject {
                        object_info: Some(PbObjectInfo::Sink(sink.clone())),
                    });
                }
                StreamingJob::Index(index, _) => {
                    objects_to_notify.push(PbObject {
                        object_info: Some(PbObjectInfo::Index(index.clone())),
                    });
                }
                _ => {}
            }
        }

        insert_fragment_relations(&txn, downstreams).await?;

        if !for_replace {
            if let Some(StreamingJob::Table(_, table, _)) = creating_streaming_job {
                // Persist the DML fragment id carried by the table catalog.
                Table::update(table::ActiveModel {
                    table_id: Set(table.id),
                    dml_fragment_id: Set(table.dml_fragment_id),
                    ..Default::default()
                })
                .exec(&txn)
                .await?;
            }
        }

        txn.commit().await?;

        // Notifications are sent only after the transaction has committed.
        self.env
            .notification_manager()
            .notify_serving_fragment_mapping_update(inserted_fragment_ids);

        if !objects_to_notify.is_empty() {
            self.notify_frontend(
                Operation::Add,
                Info::ObjectGroup(PbObjectGroup {
                    objects: objects_to_notify,
                    dependencies: vec![],
                }),
            )
            .await;
        }

        Ok(())
    }
758
759 pub async fn build_cancel_command(
762 &self,
763 table_fragments: &StreamJobFragments,
764 ) -> MetaResult<Command> {
765 let inner = self.inner.read().await;
766 let txn = inner.db.begin().await?;
767
768 let dropped_sink_fragment_with_target =
769 if let Some(sink_fragment) = table_fragments.sink_fragment() {
770 let sink_fragment_id = sink_fragment.fragment_id as FragmentId;
771 let sink_target_fragment = fetch_target_fragments(&txn, [sink_fragment_id]).await?;
772 sink_target_fragment
773 .get(&sink_fragment_id)
774 .map(|target_fragments| {
775 let target_fragment_id = *target_fragments
776 .first()
777 .expect("sink should have at least one downstream fragment");
778 (sink_fragment_id, target_fragment_id)
779 })
780 } else {
781 None
782 };
783
784 Ok(Command::DropStreamingJobs {
785 streaming_job_ids: HashSet::from_iter([table_fragments.stream_job_id()]),
786 unregistered_state_table_ids: table_fragments.all_table_ids().collect(),
787 dropped_sink_fragment_by_targets: dropped_sink_fragment_with_target
788 .into_iter()
789 .map(|(sink, target)| (target as _, vec![sink as _]))
790 .collect(),
791 })
792 }
793
    /// Tries to remove the catalog entries of a streaming job that is still being created.
    ///
    /// Returns `(aborted, database_id)`:
    /// * `(true, None)` — the job object no longer exists;
    /// * `(false, Some(db))` — the job was not cancelled explicitly, is a background job
    ///   in `Creating` status and does not belong to an iceberg table, so it is left alone;
    /// * `(true, Some(db))` — the job, its internal tables and an associated source (if
    ///   any) were deleted, waiters were told about the abort, and the frontend was notified.
    ///
    /// `is_cancelled` selects the error reported to waiters and additionally records the
    /// dropped tables in `inner.dropped_tables`.
    #[await_tree::instrument]
    pub async fn try_abort_creating_streaming_job(
        &self,
        mut job_id: JobId,
        is_cancelled: bool,
    ) -> MetaResult<(bool, Option<DatabaseId>)> {
        let mut inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        let obj = Object::find_by_id(job_id).one(&txn).await?;
        let Some(obj) = obj else {
            tracing::warn!(
                id = %job_id,
                "streaming job not found when aborting creating, might be cancelled already or cleaned by recovery"
            );
            return Ok((true, None));
        };
        let database_id = obj
            .database_id
            .ok_or_else(|| anyhow!("obj has no database id: {:?}", obj))?;
        let streaming_job = streaming_job::Entity::find_by_id(job_id).one(&txn).await?;

        if !is_cancelled && let Some(streaming_job) = &streaming_job {
            // A fully created job must never reach the abort path.
            assert_ne!(streaming_job.job_status, JobStatus::Created);
            if streaming_job.create_type == CreateType::Background
                && streaming_job.job_status == JobStatus::Creating
            {
                if (obj.obj_type == ObjectType::Table || obj.obj_type == ObjectType::Sink)
                    && check_if_belongs_to_iceberg_table(&txn, job_id).await?
                {
                    // Intentionally empty: jobs belonging to an iceberg table fall
                    // through and are cleaned up below.
                } else {
                    // Any other background job in `Creating` status is kept as is.
                    tracing::warn!(
                        id = %job_id,
                        "streaming job is created in background and still in creating status"
                    );
                    return Ok((false, Some(database_id)));
                }
            }
        }

        // `job_id` may be rebound to the iceberg table's job below; keep the original.
        let original_job_id = job_id;
        let original_obj_type = obj.obj_type;

        let iceberg_table_id =
            try_get_iceberg_table_by_downstream_sink(&txn, job_id.as_sink_id()).await?;
        if let Some(iceberg_table_id) = iceberg_table_id {
            // The job is a sink feeding an iceberg table: delete the sink together with
            // its internal tables, then continue the cleanup with the table's job.
            let internal_tables = get_internal_tables_by_id(job_id, &txn).await?;
            Object::delete_many()
                .filter(
                    object::Column::Oid
                        .eq(job_id)
                        .or(object::Column::Oid.is_in(internal_tables)),
                )
                .exec(&txn)
                .await?;
            job_id = iceberg_table_id.as_job_id();
        };

        let internal_table_ids = get_internal_tables_by_id(job_id, &txn).await?;

        let mut objs = vec![];
        // `None` when the job has no table row under its own id.
        let table_obj = Table::find_by_id(job_id.as_mv_table_id()).one(&txn).await?;

        // Notify for background jobs, and otherwise for materialized views and for jobs
        // whose original object was a sink.
        let mut need_notify =
            streaming_job.is_some_and(|job| job.create_type == CreateType::Background);
        if !need_notify {
            if let Some(table) = &table_obj {
                need_notify = table.table_type == TableType::MaterializedView;
            } else if original_obj_type == ObjectType::Sink {
                need_notify = true;
            }
        }

        if is_cancelled {
            // Record the tables about to be dropped (internal tables plus the job's own
            // table) while their rows still exist.
            let dropped_tables = Table::find()
                .find_also_related(Object)
                .filter(
                    table::Column::TableId.is_in(
                        internal_table_ids
                            .iter()
                            .cloned()
                            .chain(table_obj.iter().map(|t| t.table_id as _)),
                    ),
                )
                .all(&txn)
                .await?
                .into_iter()
                .map(|(table, obj)| PbTable::from(ObjectModel(table, obj.unwrap(), None)));
            inner
                .dropped_tables
                .extend(dropped_tables.map(|t| (t.id, t)));
        }

        if need_notify {
            // Collect the objects for the delete notification before deleting them.
            if original_obj_type == ObjectType::Sink && original_job_id != job_id {
                // `job_id` was rebound above; include the original sink if its object
                // row can still be found.
                let orig_obj: Option<PartialObject> = Object::find_by_id(original_job_id)
                    .select_only()
                    .columns([
                        object::Column::Oid,
                        object::Column::ObjType,
                        object::Column::SchemaId,
                        object::Column::DatabaseId,
                    ])
                    .into_partial_model()
                    .one(&txn)
                    .await?;
                if let Some(orig_obj) = orig_obj {
                    objs.push(orig_obj);
                }
            }

            let obj: Option<PartialObject> = Object::find_by_id(job_id)
                .select_only()
                .columns([
                    object::Column::Oid,
                    object::Column::ObjType,
                    object::Column::SchemaId,
                    object::Column::DatabaseId,
                ])
                .into_partial_model()
                .one(&txn)
                .await?;
            let obj =
                obj.ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;
            objs.push(obj);
            let internal_table_objs: Vec<PartialObject> = Object::find()
                .select_only()
                .columns([
                    object::Column::Oid,
                    object::Column::ObjType,
                    object::Column::SchemaId,
                    object::Column::DatabaseId,
                ])
                .join(JoinType::InnerJoin, object::Relation::Table.def())
                .filter(table::Column::BelongsToJobId.eq(job_id))
                .into_partial_model()
                .all(&txn)
                .await?;
            objs.extend(internal_table_objs);
        }

        // Fragment ids are read before the job object is deleted; they are needed for
        // the serving-mapping notification sent after the commit.
        let abort_fragment_ids: Vec<FragmentId> = Fragment::find()
            .select_only()
            .column(fragment::Column::FragmentId)
            .filter(fragment::Column::JobId.eq(job_id))
            .into_tuple()
            .all(&txn)
            .await?;

        Object::delete_by_id(job_id).exec(&txn).await?;
        if !internal_table_ids.is_empty() {
            Object::delete_many()
                .filter(object::Column::Oid.is_in(internal_table_ids))
                .exec(&txn)
                .await?;
        }
        if let Some(t) = &table_obj
            && let Some(source_id) = t.optional_associated_source_id
        {
            Object::delete_by_id(source_id).exec(&txn).await?;
        }

        let err = if is_cancelled {
            MetaError::cancelled(format!("streaming job {job_id} is cancelled"))
        } else {
            MetaError::catalog_id_not_found("stream job", format!("streaming job {job_id} failed"))
        };
        let abort_reason = format!("streaming job aborted {}", err.as_report());
        // Wake everyone waiting for this job to finish; a closed receiver is ignored.
        // NOTE(review): waiters are notified before the transaction commits, so they
        // would see the abort even if the commit below fails — confirm this is intended.
        for tx in inner
            .creating_table_finish_notifier
            .get_mut(&database_id)
            .map(|creating_tables| creating_tables.remove(&job_id).into_iter())
            .into_iter()
            .flatten()
            .flatten()
        {
            let _ = tx.send(Err(abort_reason.clone()));
        }
        txn.commit().await?;

        // Notifications below are sent only after the transaction has committed.
        self.env
            .notification_manager()
            .notify_serving_fragment_mapping_delete(
                abort_fragment_ids.iter().map(|id| *id as _).collect(),
            );

        if !objs.is_empty() {
            self.notify_frontend(Operation::Delete, build_object_group_for_delete(objs))
                .await;
        }
        Ok((true, Some(database_id)))
    }
1001
1002 #[await_tree::instrument]
1003 pub async fn post_collect_job_fragments(
1004 &self,
1005 job_id: JobId,
1006 upstream_fragment_new_downstreams: &FragmentDownstreamRelation,
1007 new_sink_downstream: Option<FragmentDownstreamRelation>,
1008 split_assignment: Option<&SplitAssignment>,
1009 ) -> MetaResult<()> {
1010 let inner = self.inner.write().await;
1011 let txn = inner.db.begin().await?;
1012
1013 insert_fragment_relations(&txn, upstream_fragment_new_downstreams).await?;
1014
1015 if let Some(new_downstream) = new_sink_downstream {
1016 insert_fragment_relations(&txn, &new_downstream).await?;
1017 }
1018
1019 StreamingJobModel::update(streaming_job::ActiveModel {
1021 job_id: Set(job_id),
1022 job_status: Set(JobStatus::Creating),
1023 ..Default::default()
1024 })
1025 .exec(&txn)
1026 .await?;
1027
1028 if let Some(split_assignment) = split_assignment {
1029 let fragment_splits = split_assignment
1030 .iter()
1031 .map(|(fragment_id, splits)| {
1032 (
1033 *fragment_id as _,
1034 splits.values().flatten().cloned().collect_vec(),
1035 )
1036 })
1037 .collect();
1038
1039 self.update_fragment_splits(&txn, &fragment_splits).await?;
1040 }
1041
1042 txn.commit().await?;
1043
1044 Ok(())
1045 }
1046
1047 pub async fn create_job_catalog_for_replace(
1048 &self,
1049 streaming_job: &StreamingJob,
1050 ctx: Option<&StreamContext>,
1051 specified_parallelism: Option<&NonZeroUsize>,
1052 expected_original_max_parallelism: Option<usize>,
1053 ) -> MetaResult<streaming_job::Model> {
1054 let id = streaming_job.id();
1055 let inner = self.inner.write().await;
1056 let txn = inner.db.begin().await?;
1057
1058 streaming_job.verify_version_for_replace(&txn).await?;
1060 let referring_cnt = ObjectDependency::find()
1062 .join(
1063 JoinType::InnerJoin,
1064 object_dependency::Relation::Object1.def(),
1065 )
1066 .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
1067 .filter(
1068 object_dependency::Column::Oid
1069 .eq(id)
1070 .and(object::Column::ObjType.eq(ObjectType::Table))
1071 .and(streaming_job::Column::JobStatus.ne(JobStatus::Created)),
1072 )
1073 .count(&txn)
1074 .await?;
1075 if referring_cnt != 0 {
1076 return Err(MetaError::permission_denied(
1077 "job is being altered or referenced by some creating jobs",
1078 ));
1079 }
1080
1081 let batch_refresh_dep_cnt = ObjectDependency::find()
1084 .join(
1085 JoinType::InnerJoin,
1086 object_dependency::Relation::Object1.def(),
1087 )
1088 .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
1089 .filter(
1090 object_dependency::Column::Oid
1091 .eq(id)
1092 .and(streaming_job::Column::RefreshIntervalSec.is_not_null()),
1093 )
1094 .count(&txn)
1095 .await?;
1096 if batch_refresh_dep_cnt != 0 {
1097 return Err(MetaError::permission_denied(
1098 "replacing a table with dependent batch refresh materialized views is not supported",
1099 ));
1100 }
1101
1102 let original_job = StreamingJobModel::find_by_id(id)
1104 .one(&txn)
1105 .await?
1106 .map(ReplaceOriginalJobInfo::from)
1107 .ok_or_else(|| MetaError::catalog_id_not_found(streaming_job.job_type_str(), id))?;
1108
1109 if let Some(max_parallelism) = expected_original_max_parallelism
1110 && original_job.max_parallelism != max_parallelism as i32
1111 {
1112 bail!(
1115 "cannot use a different max parallelism \
1116 when replacing streaming job, \
1117 original: {}, new: {}",
1118 original_job.max_parallelism,
1119 max_parallelism
1120 );
1121 }
1122
1123 let parallelism = original_job.resolved_parallelism(specified_parallelism);
1124 let ctx = original_job.stream_context(ctx);
1125 let adaptive_parallelism_strategy = original_job
1126 .adaptive_parallelism_strategy
1127 .as_deref()
1128 .map(|s| parse_strategy(s).expect("strategy should be validated before persisting"));
1129 let resource_type = original_job.resource_type();
1130
1131 let tmp_model = Self::create_streaming_job_obj(
1133 &txn,
1134 streaming_job.object_type(),
1135 streaming_job.owner() as _,
1136 Some(streaming_job.database_id() as _),
1137 Some(streaming_job.schema_id() as _),
1138 streaming_job.create_type(),
1139 ctx,
1140 adaptive_parallelism_strategy,
1141 parallelism,
1142 original_job.max_parallelism as _,
1143 resource_type,
1144 None,
1148 None,
1149 None, )
1151 .await?;
1152
1153 ObjectDependency::insert(object_dependency::ActiveModel {
1155 oid: Set(id.as_object_id()),
1156 used_by: Set(tmp_model.job_id.as_object_id()),
1157 ..Default::default()
1158 })
1159 .exec(&txn)
1160 .await?;
1161
1162 txn.commit().await?;
1163
1164 Ok(tmp_model)
1165 }
1166
1167 pub async fn finish_streaming_job(&self, job_id: JobId) -> MetaResult<()> {
1169 let mut inner = self.inner.write().await;
1170 let txn = inner.db.begin().await?;
1171
1172 if check_if_belongs_to_iceberg_table(&txn, job_id).await? {
1174 tracing::info!(
1175 "streaming job {} is for iceberg table, wait for manual finish operation",
1176 job_id
1177 );
1178 return Ok(());
1179 }
1180
1181 let (notification_op, objects, updated_user_info, dependencies) =
1182 self.finish_streaming_job_inner(&txn, job_id).await?;
1183
1184 txn.commit().await?;
1185
1186 let mut version = self
1187 .notify_frontend(
1188 notification_op,
1189 NotificationInfo::ObjectGroup(PbObjectGroup {
1190 objects,
1191 dependencies,
1192 }),
1193 )
1194 .await;
1195
1196 if !updated_user_info.is_empty() {
1198 version = self.notify_users_update(updated_user_info).await;
1199 }
1200
1201 inner
1202 .creating_table_finish_notifier
1203 .values_mut()
1204 .for_each(|creating_tables| {
1205 if let Some(txs) = creating_tables.remove(&job_id) {
1206 for tx in txs {
1207 let _ = tx.send(Ok(version));
1208 }
1209 }
1210 });
1211
1212 Ok(())
1213 }
1214
1215 pub async fn finish_streaming_job_inner(
1217 &self,
1218 txn: &DatabaseTransaction,
1219 job_id: JobId,
1220 ) -> MetaResult<(
1221 Operation,
1222 Vec<risingwave_pb::meta::Object>,
1223 Vec<PbUserInfo>,
1224 Vec<PbObjectDependency>,
1225 )> {
1226 let job_type = Object::find_by_id(job_id)
1227 .select_only()
1228 .column(object::Column::ObjType)
1229 .into_tuple()
1230 .one(txn)
1231 .await?
1232 .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;
1233
1234 let create_type: CreateType = StreamingJobModel::find_by_id(job_id)
1235 .select_only()
1236 .column(streaming_job::Column::CreateType)
1237 .into_tuple()
1238 .one(txn)
1239 .await?
1240 .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;
1241
1242 let res = Object::update_many()
1244 .col_expr(object::Column::CreatedAt, Expr::current_timestamp().into())
1245 .col_expr(
1246 object::Column::CreatedAtClusterVersion,
1247 current_cluster_version().into(),
1248 )
1249 .filter(object::Column::Oid.eq(job_id))
1250 .exec(txn)
1251 .await?;
1252 if res.rows_affected == 0 {
1253 return Err(MetaError::catalog_id_not_found("streaming job", job_id));
1254 }
1255
1256 let job = streaming_job::ActiveModel {
1258 job_id: Set(job_id),
1259 job_status: Set(JobStatus::Created),
1260 ..Default::default()
1261 };
1262 let streaming_job = Some(job.update(txn).await?);
1263
1264 let internal_table_objs = Table::find()
1266 .find_also_related(Object)
1267 .filter(table::Column::BelongsToJobId.eq(job_id))
1268 .all(txn)
1269 .await?;
1270 let mut objects = internal_table_objs
1271 .iter()
1272 .map(|(table, obj)| PbObject {
1273 object_info: Some(PbObjectInfo::Table(
1274 ObjectModel(table.clone(), obj.clone().unwrap(), streaming_job.clone()).into(),
1275 )),
1276 })
1277 .collect_vec();
1278 let mut notification_op = if create_type == CreateType::Background {
1279 NotificationOperation::Update
1280 } else {
1281 NotificationOperation::Add
1282 };
1283 let mut updated_user_info = vec![];
1284 let mut need_grant_default_privileges = true;
1285
1286 match job_type {
1287 ObjectType::Table => {
1288 let (table, obj) = Table::find_by_id(job_id.as_mv_table_id())
1289 .find_also_related(Object)
1290 .one(txn)
1291 .await?
1292 .ok_or_else(|| MetaError::catalog_id_not_found("table", job_id))?;
1293 if table.table_type == TableType::MaterializedView {
1294 notification_op = NotificationOperation::Update;
1295 }
1296
1297 if let Some(source_id) = table.optional_associated_source_id {
1298 let (src, obj) = Source::find_by_id(source_id)
1299 .find_also_related(Object)
1300 .one(txn)
1301 .await?
1302 .ok_or_else(|| MetaError::catalog_id_not_found("source", source_id))?;
1303 objects.push(PbObject {
1304 object_info: Some(PbObjectInfo::Source(
1305 ObjectModel(src, obj.unwrap(), None).into(),
1306 )),
1307 });
1308 }
1309 objects.push(PbObject {
1310 object_info: Some(PbObjectInfo::Table(
1311 ObjectModel(table, obj.unwrap(), streaming_job).into(),
1312 )),
1313 });
1314 }
1315 ObjectType::Sink => {
1316 let (sink, obj) = Sink::find_by_id(job_id.as_sink_id())
1317 .find_also_related(Object)
1318 .one(txn)
1319 .await?
1320 .ok_or_else(|| MetaError::catalog_id_not_found("sink", job_id))?;
1321 if sink.name.starts_with(ICEBERG_SINK_PREFIX) {
1322 need_grant_default_privileges = false;
1323 }
1324 notification_op = NotificationOperation::Update;
1327 objects.push(PbObject {
1328 object_info: Some(PbObjectInfo::Sink(
1329 ObjectModel(sink, obj.unwrap(), streaming_job).into(),
1330 )),
1331 });
1332 }
1333 ObjectType::Index => {
1334 need_grant_default_privileges = false;
1335 let (index, obj) = Index::find_by_id(job_id.as_index_id())
1336 .find_also_related(Object)
1337 .one(txn)
1338 .await?
1339 .ok_or_else(|| MetaError::catalog_id_not_found("index", job_id))?;
1340 {
1341 let (table, obj) = Table::find_by_id(index.index_table_id)
1342 .find_also_related(Object)
1343 .one(txn)
1344 .await?
1345 .ok_or_else(|| {
1346 MetaError::catalog_id_not_found("table", index.index_table_id)
1347 })?;
1348 objects.push(PbObject {
1349 object_info: Some(PbObjectInfo::Table(
1350 ObjectModel(table, obj.unwrap(), streaming_job.clone()).into(),
1351 )),
1352 });
1353 }
1354
1355 let primary_table_privileges = UserPrivilege::find()
1358 .filter(
1359 user_privilege::Column::Oid
1360 .eq(index.primary_table_id)
1361 .and(user_privilege::Column::Action.eq(Action::Select)),
1362 )
1363 .all(txn)
1364 .await?;
1365 if !primary_table_privileges.is_empty() {
1366 let index_state_table_ids: Vec<TableId> = Table::find()
1367 .select_only()
1368 .column(table::Column::TableId)
1369 .filter(
1370 table::Column::BelongsToJobId
1371 .eq(job_id)
1372 .or(table::Column::TableId.eq(index.index_table_id)),
1373 )
1374 .into_tuple()
1375 .all(txn)
1376 .await?;
1377 let mut new_privileges = vec![];
1378 for privilege in &primary_table_privileges {
1379 for state_table_id in &index_state_table_ids {
1380 new_privileges.push(user_privilege::ActiveModel {
1381 id: Default::default(),
1382 oid: Set(state_table_id.as_object_id()),
1383 user_id: Set(privilege.user_id),
1384 action: Set(Action::Select),
1385 dependent_id: Set(privilege.dependent_id),
1386 granted_by: Set(privilege.granted_by),
1387 with_grant_option: Set(privilege.with_grant_option),
1388 });
1389 }
1390 }
1391 UserPrivilege::insert_many(new_privileges).exec(txn).await?;
1392
1393 updated_user_info = list_user_info_by_ids(
1394 primary_table_privileges.into_iter().map(|p| p.user_id),
1395 txn,
1396 )
1397 .await?;
1398 }
1399
1400 objects.push(PbObject {
1401 object_info: Some(PbObjectInfo::Index(
1402 ObjectModel(index, obj.unwrap(), streaming_job).into(),
1403 )),
1404 });
1405 }
1406 ObjectType::Source => {
1407 let (source, obj) = Source::find_by_id(job_id.as_shared_source_id())
1408 .find_also_related(Object)
1409 .one(txn)
1410 .await?
1411 .ok_or_else(|| MetaError::catalog_id_not_found("source", job_id))?;
1412 objects.push(PbObject {
1413 object_info: Some(PbObjectInfo::Source(
1414 ObjectModel(source, obj.unwrap(), None).into(),
1415 )),
1416 });
1417 }
1418 _ => unreachable!("invalid job type: {:?}", job_type),
1419 }
1420
1421 if need_grant_default_privileges {
1422 updated_user_info = grant_default_privileges_automatically(txn, job_id).await?;
1423 }
1424
1425 let dependencies =
1426 list_object_dependencies_by_object_id(txn, job_id.as_object_id()).await?;
1427
1428 Ok((notification_op, objects, updated_user_info, dependencies))
1429 }
1430
1431 pub async fn finish_replace_streaming_job(
1432 &self,
1433 tmp_id: JobId,
1434 streaming_job: StreamingJob,
1435 replace_upstream: FragmentReplaceUpstream,
1436 sink_into_table_context: SinkIntoTableContext,
1437 drop_table_connector_ctx: Option<&DropTableConnectorContext>,
1438 auto_refresh_schema_sinks: Option<Vec<FinishAutoRefreshSchemaSinkContext>>,
1439 ) -> MetaResult<NotificationVersion> {
1440 let inner = self.inner.write().await;
1441 let txn = inner.db.begin().await?;
1442
1443 let (objects, delete_notification_objs, old_fragment_ids, new_fragment_ids) =
1444 Self::finish_replace_streaming_job_inner(
1445 tmp_id,
1446 replace_upstream,
1447 sink_into_table_context,
1448 &txn,
1449 streaming_job,
1450 drop_table_connector_ctx,
1451 auto_refresh_schema_sinks,
1452 )
1453 .await?;
1454
1455 txn.commit().await?;
1456
1457 let notification_manager = self.env.notification_manager();
1459 notification_manager.notify_serving_fragment_mapping_delete(
1460 old_fragment_ids.iter().map(|id| *id as _).collect(),
1461 );
1462 notification_manager.notify_serving_fragment_mapping_update(
1463 new_fragment_ids.iter().map(|id| *id as _).collect(),
1464 );
1465
1466 let mut version = self
1467 .notify_frontend(
1468 NotificationOperation::Update,
1469 NotificationInfo::ObjectGroup(PbObjectGroup {
1470 objects,
1471 dependencies: vec![],
1472 }),
1473 )
1474 .await;
1475
1476 if let Some((user_infos, to_drop_objects)) = delete_notification_objs {
1477 self.notify_users_update(user_infos).await;
1478 version = self
1479 .notify_frontend(
1480 NotificationOperation::Delete,
1481 build_object_group_for_delete(to_drop_objects),
1482 )
1483 .await;
1484 }
1485
1486 Ok(version)
1487 }
1488
1489 fn update_iceberg_source_columns(
1490 original_source_columns: &[ColumnCatalog],
1491 original_row_id_index: Option<usize>,
1492 new_table_columns: &[ColumnCatalog],
1493 ) -> (Vec<ColumnCatalog>, Option<usize>) {
1494 let row_id_column_name = original_row_id_index
1495 .and_then(|idx| original_source_columns.get(idx))
1496 .map(|col| col.name().to_owned());
1497 let mut next_column_id = max_column_id(original_source_columns).next();
1498 let existing_columns: HashMap<String, ColumnCatalog> = original_source_columns
1499 .iter()
1500 .cloned()
1501 .map(|col| (col.name().to_owned(), col))
1502 .collect();
1503
1504 let mut new_columns = Vec::new();
1505 for table_col in new_table_columns
1506 .iter()
1507 .filter(|col| !col.is_rw_sys_column())
1508 {
1509 let mut source_col_name = table_col.name().to_owned();
1510 if source_col_name == ROW_ID_COLUMN_NAME {
1511 source_col_name = RISINGWAVE_ICEBERG_ROW_ID.to_owned();
1512 }
1513
1514 if let Some(existing) = existing_columns.get(&source_col_name) {
1515 new_columns.push(existing.clone());
1516 } else {
1517 let mut new_col = table_col.clone();
1518 new_col.column_desc.name = source_col_name;
1519 new_col.column_desc.column_id = next_column_id;
1520 next_column_id = next_column_id.next();
1521 new_columns.push(new_col);
1522 }
1523 }
1524
1525 let mut seen_names: HashSet<String> = new_columns
1526 .iter()
1527 .map(|col| col.name().to_owned())
1528 .collect();
1529 for col in original_source_columns
1530 .iter()
1531 .filter(|col| col.is_iceberg_hidden_column())
1532 {
1533 if seen_names.insert(col.name().to_owned()) {
1534 new_columns.push(col.clone());
1535 }
1536 }
1537
1538 let new_row_id_index = row_id_column_name
1539 .as_ref()
1540 .and_then(|name| new_columns.iter().position(|col| col.name() == name));
1541
1542 (new_columns, new_row_id_index)
1543 }
1544
1545 pub async fn finish_replace_streaming_job_inner(
1546 tmp_id: JobId,
1547 replace_upstream: FragmentReplaceUpstream,
1548 SinkIntoTableContext {
1549 updated_sink_catalogs,
1550 }: SinkIntoTableContext,
1551 txn: &DatabaseTransaction,
1552 streaming_job: StreamingJob,
1553 drop_table_connector_ctx: Option<&DropTableConnectorContext>,
1554 auto_refresh_schema_sinks: Option<Vec<FinishAutoRefreshSchemaSinkContext>>,
1555 ) -> MetaResult<(
1556 Vec<PbObject>,
1557 Option<(Vec<PbUserInfo>, Vec<PartialObject>)>,
1558 Vec<FragmentId>,
1559 Vec<FragmentId>,
1560 )> {
1561 let original_job_id = streaming_job.id();
1562 let job_type = streaming_job.job_type();
1563
1564 let old_fragment_ids: Vec<FragmentId> = Fragment::find()
1566 .select_only()
1567 .column(fragment::Column::FragmentId)
1568 .filter(fragment::Column::JobId.eq(original_job_id))
1569 .into_tuple()
1570 .all(txn)
1571 .await?;
1572 let new_fragment_ids: Vec<FragmentId> = Fragment::find()
1573 .select_only()
1574 .column(fragment::Column::FragmentId)
1575 .filter(fragment::Column::JobId.eq(tmp_id))
1576 .into_tuple()
1577 .all(txn)
1578 .await?;
1579
1580 let mut index_item_rewriter = None;
1581 let mut updated_iceberg_source_id: Option<SourceId> = None;
1582
1583 match streaming_job {
1585 StreamingJob::Table(_, table, _table_job_type) => {
1586 let original_column_catalogs =
1587 get_table_columns(txn, original_job_id.as_mv_table_id()).await?;
1588 let schema_changed = original_column_catalogs.to_protobuf() != table.columns;
1589 let is_iceberg = table
1590 .engine
1591 .and_then(|engine| PbEngine::try_from(engine).ok())
1592 == Some(PbEngine::Iceberg);
1593 if is_iceberg && schema_changed {
1594 let iceberg_source_name = format!("{}{}", ICEBERG_SOURCE_PREFIX, table.name);
1595 let source = Source::find()
1596 .inner_join(Object)
1597 .filter(
1598 object::Column::DatabaseId
1599 .eq(table.database_id)
1600 .and(object::Column::SchemaId.eq(table.schema_id))
1601 .and(source::Column::Name.eq(&iceberg_source_name)),
1602 )
1603 .one(txn)
1604 .await?
1605 .ok_or_else(|| {
1606 MetaError::catalog_id_not_found("source", iceberg_source_name)
1607 })?;
1608
1609 let source_id = source.source_id;
1610 let source_version = source.version;
1611 let source_columns: Vec<ColumnCatalog> = source
1612 .columns
1613 .to_protobuf()
1614 .into_iter()
1615 .map(ColumnCatalog::from)
1616 .collect();
1617 let source_row_id_index = source
1618 .row_id_index
1619 .and_then(|idx| usize::try_from(idx).ok());
1620 let table_columns: Vec<ColumnCatalog> = table
1621 .columns
1622 .iter()
1623 .cloned()
1624 .map(ColumnCatalog::from)
1625 .collect();
1626 let (updated_columns, updated_row_id_index) =
1627 Self::update_iceberg_source_columns(
1628 &source_columns,
1629 source_row_id_index,
1630 &table_columns,
1631 );
1632 let updated_columns: Vec<PbColumnCatalog> = updated_columns
1633 .into_iter()
1634 .map(|col| col.to_protobuf())
1635 .collect();
1636
1637 let mut source_active = source.into_active_model();
1638 source_active.columns = Set(ColumnCatalogArray::from(updated_columns));
1639 source_active.row_id_index = Set(updated_row_id_index.map(|idx| idx as i32));
1640 source_active.version = Set(source_version + 1);
1641 source_active.update(txn).await?;
1642 updated_iceberg_source_id = Some(source_id);
1643 }
1644
1645 index_item_rewriter = Some({
1646 let original_columns = original_column_catalogs
1647 .to_protobuf()
1648 .into_iter()
1649 .map(|c| c.column_desc.unwrap())
1650 .collect_vec();
1651 let new_columns = table
1652 .columns
1653 .iter()
1654 .map(|c| c.column_desc.clone().unwrap())
1655 .collect_vec();
1656
1657 IndexItemRewriter {
1658 original_columns,
1659 new_columns,
1660 }
1661 });
1662
1663 for sink_id in updated_sink_catalogs {
1665 Sink::update(sink::ActiveModel {
1666 sink_id: Set(sink_id as _),
1667 original_target_columns: Set(Some(original_column_catalogs.clone())),
1668 ..Default::default()
1669 })
1670 .exec(txn)
1671 .await?;
1672 }
1673 let mut table = table::ActiveModel::from(table);
1675 if let Some(drop_table_connector_ctx) = drop_table_connector_ctx
1676 && drop_table_connector_ctx.to_change_streaming_job_id == original_job_id
1677 {
1678 table.optional_associated_source_id = Set(None);
1680 }
1681
1682 Table::update(table).exec(txn).await?;
1683 }
1684 StreamingJob::Source(source) => {
1685 let source = source::ActiveModel::from(source);
1687 Source::update(source).exec(txn).await?;
1688 }
1689 StreamingJob::MaterializedView(table) => {
1690 let table = table::ActiveModel::from(table);
1692 Table::update(table).exec(txn).await?;
1693 }
1694 _ => unreachable!(
1695 "invalid streaming job type: {:?}",
1696 streaming_job.job_type_str()
1697 ),
1698 }
1699
1700 async fn finish_fragments(
1701 txn: &DatabaseTransaction,
1702 tmp_id: JobId,
1703 original_job_id: JobId,
1704 replace_upstream: FragmentReplaceUpstream,
1705 ) -> MetaResult<()> {
1706 let fragment_info: Vec<(FragmentId, I32Array)> = Fragment::find()
1710 .select_only()
1711 .columns([
1712 fragment::Column::FragmentId,
1713 fragment::Column::StateTableIds,
1714 ])
1715 .filter(fragment::Column::JobId.eq(tmp_id))
1716 .into_tuple()
1717 .all(txn)
1718 .await?;
1719 for (fragment_id, state_table_ids) in fragment_info {
1720 for state_table_id in state_table_ids.into_inner() {
1721 let state_table_id = TableId::new(state_table_id as _);
1722 Table::update(table::ActiveModel {
1723 table_id: Set(state_table_id),
1724 fragment_id: Set(Some(fragment_id)),
1725 ..Default::default()
1727 })
1728 .exec(txn)
1729 .await?;
1730 }
1731 }
1732
1733 Fragment::delete_many()
1735 .filter(fragment::Column::JobId.eq(original_job_id))
1736 .exec(txn)
1737 .await?;
1738 Fragment::update_many()
1739 .col_expr(fragment::Column::JobId, SimpleExpr::from(original_job_id))
1740 .filter(fragment::Column::JobId.eq(tmp_id))
1741 .exec(txn)
1742 .await?;
1743
1744 for (fragment_id, fragment_replace_map) in replace_upstream {
1747 let (fragment_id, mut stream_node) =
1748 Fragment::find_by_id(fragment_id as FragmentId)
1749 .select_only()
1750 .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
1751 .into_tuple::<(FragmentId, StreamNode)>()
1752 .one(txn)
1753 .await?
1754 .map(|(id, node)| (id, node.to_protobuf()))
1755 .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
1756
1757 visit_stream_node_mut(&mut stream_node, |body| {
1758 if let PbNodeBody::Merge(m) = body
1759 && let Some(new_fragment_id) =
1760 fragment_replace_map.get(&m.upstream_fragment_id)
1761 {
1762 m.upstream_fragment_id = *new_fragment_id;
1763 }
1764 });
1765 Fragment::update(fragment::ActiveModel {
1766 fragment_id: Set(fragment_id),
1767 stream_node: Set(StreamNode::from(&stream_node)),
1768 ..Default::default()
1769 })
1770 .exec(txn)
1771 .await?;
1772 }
1773
1774 Object::delete_by_id(tmp_id).exec(txn).await?;
1776
1777 Ok(())
1778 }
1779
1780 finish_fragments(txn, tmp_id, original_job_id, replace_upstream).await?;
1781
1782 let mut objects = vec![];
1784 match job_type {
1785 StreamingJobType::Table(_) | StreamingJobType::MaterializedView => {
1786 let (table, table_obj) = Table::find_by_id(original_job_id.as_mv_table_id())
1787 .find_also_related(Object)
1788 .one(txn)
1789 .await?
1790 .ok_or_else(|| MetaError::catalog_id_not_found("object", original_job_id))?;
1791 let streaming_job = streaming_job::Entity::find_by_id(table.job_id())
1792 .one(txn)
1793 .await?;
1794 objects.push(PbObject {
1795 object_info: Some(PbObjectInfo::Table(
1796 ObjectModel(table, table_obj.unwrap(), streaming_job).into(),
1797 )),
1798 })
1799 }
1800 StreamingJobType::Source => {
1801 let (source, source_obj) =
1802 Source::find_by_id(original_job_id.as_shared_source_id())
1803 .find_also_related(Object)
1804 .one(txn)
1805 .await?
1806 .ok_or_else(|| {
1807 MetaError::catalog_id_not_found("object", original_job_id)
1808 })?;
1809 objects.push(PbObject {
1810 object_info: Some(PbObjectInfo::Source(
1811 ObjectModel(source, source_obj.unwrap(), None).into(),
1812 )),
1813 })
1814 }
1815 _ => unreachable!("invalid streaming job type for replace: {:?}", job_type),
1816 }
1817
1818 if let Some(source_id) = updated_iceberg_source_id {
1819 let (source, source_obj) = Source::find_by_id(source_id)
1820 .find_also_related(Object)
1821 .one(txn)
1822 .await?
1823 .ok_or_else(|| MetaError::catalog_id_not_found("source", source_id))?;
1824 objects.push(PbObject {
1825 object_info: Some(PbObjectInfo::Source(
1826 ObjectModel(source, source_obj.unwrap(), None).into(),
1827 )),
1828 });
1829 }
1830
1831 if let Some(expr_rewriter) = index_item_rewriter {
1832 let index_items: Vec<(IndexId, ExprNodeArray)> = Index::find()
1833 .select_only()
1834 .columns([index::Column::IndexId, index::Column::IndexItems])
1835 .filter(index::Column::PrimaryTableId.eq(original_job_id))
1836 .into_tuple()
1837 .all(txn)
1838 .await?;
1839 for (index_id, nodes) in index_items {
1840 let mut pb_nodes = nodes.to_protobuf();
1841 pb_nodes
1842 .iter_mut()
1843 .for_each(|x| expr_rewriter.rewrite_expr(x));
1844 let index = index::ActiveModel {
1845 index_id: Set(index_id),
1846 index_items: Set(pb_nodes.into()),
1847 ..Default::default()
1848 }
1849 .update(txn)
1850 .await?;
1851 let (index_obj, streaming_job) = Object::find_by_id(index.index_id)
1852 .find_also_related(streaming_job::Entity)
1853 .one(txn)
1854 .await?
1855 .ok_or_else(|| MetaError::catalog_id_not_found("object", index.index_id))?;
1856 objects.push(PbObject {
1857 object_info: Some(PbObjectInfo::Index(
1858 ObjectModel(index, index_obj, streaming_job).into(),
1859 )),
1860 });
1861 }
1862 }
1863
1864 if let Some(sinks) = auto_refresh_schema_sinks {
1865 for finish_sink_context in sinks {
1866 finish_fragments(
1867 txn,
1868 finish_sink_context.tmp_sink_id.as_job_id(),
1869 finish_sink_context.original_sink_id.as_job_id(),
1870 Default::default(),
1871 )
1872 .await?;
1873 let (mut sink, sink_obj) = Sink::find_by_id(finish_sink_context.original_sink_id)
1874 .find_also_related(Object)
1875 .one(txn)
1876 .await?
1877 .ok_or_else(|| MetaError::catalog_id_not_found("sink", original_job_id))?;
1878 let sink_streaming_job =
1879 streaming_job::Entity::find_by_id(sink.sink_id.as_job_id())
1880 .one(txn)
1881 .await?;
1882 let columns = ColumnCatalogArray::from(finish_sink_context.columns);
1883 Sink::update(sink::ActiveModel {
1884 sink_id: Set(finish_sink_context.original_sink_id),
1885 columns: Set(columns.clone()),
1886 ..Default::default()
1887 })
1888 .exec(txn)
1889 .await?;
1890 sink.columns = columns;
1891 objects.push(PbObject {
1892 object_info: Some(PbObjectInfo::Sink(
1893 ObjectModel(sink, sink_obj.unwrap(), sink_streaming_job.clone()).into(),
1894 )),
1895 });
1896 if let Some(new_log_store_table) = finish_sink_context.new_log_store_table {
1897 let log_store_table_id = new_log_store_table.id;
1898 let new_log_store_table_columns: ColumnCatalogArray =
1899 new_log_store_table.columns.clone().into();
1900 let new_log_store_table_value_indices =
1901 new_log_store_table.value_indices.clone();
1902 let (mut table, table_obj) = Table::find_by_id(log_store_table_id)
1903 .find_also_related(Object)
1904 .one(txn)
1905 .await?
1906 .ok_or_else(|| MetaError::catalog_id_not_found("table", original_job_id))?;
1907 Table::update(table::ActiveModel {
1908 table_id: Set(log_store_table_id),
1909 columns: Set(new_log_store_table_columns.clone()),
1910 value_indices: Set(new_log_store_table_value_indices.clone().into()),
1911 ..Default::default()
1912 })
1913 .exec(txn)
1914 .await?;
1915 table.columns = new_log_store_table_columns;
1916 table.value_indices = new_log_store_table_value_indices.into();
1917 objects.push(PbObject {
1918 object_info: Some(PbObjectInfo::Table(
1919 ObjectModel(table, table_obj.unwrap(), sink_streaming_job.clone())
1920 .into(),
1921 )),
1922 });
1923 }
1924 }
1925 }
1926
1927 let mut notification_objs: Option<(Vec<PbUserInfo>, Vec<PartialObject>)> = None;
1928 if let Some(drop_table_connector_ctx) = drop_table_connector_ctx {
1929 notification_objs =
1930 Some(Self::drop_table_associated_source(txn, drop_table_connector_ctx).await?);
1931 }
1932
1933 Ok((
1934 objects,
1935 notification_objs,
1936 old_fragment_ids,
1937 new_fragment_ids,
1938 ))
1939 }
1940
1941 pub async fn try_abort_replacing_streaming_job(
1943 &self,
1944 tmp_job_id: JobId,
1945 tmp_sink_ids: Option<Vec<ObjectId>>,
1946 ) -> MetaResult<()> {
1947 let inner = self.inner.write().await;
1948 let txn = inner.db.begin().await?;
1949
1950 let mut all_job_ids: Vec<ObjectId> = vec![tmp_job_id.into()];
1952 if let Some(ref sink_ids) = tmp_sink_ids {
1953 all_job_ids.extend(sink_ids.iter().copied());
1954 }
1955 let abort_fragment_ids: Vec<FragmentId> = Fragment::find()
1956 .select_only()
1957 .column(fragment::Column::FragmentId)
1958 .filter(fragment::Column::JobId.is_in(all_job_ids))
1959 .into_tuple()
1960 .all(&txn)
1961 .await?;
1962
1963 Object::delete_by_id(tmp_job_id).exec(&txn).await?;
1964 if let Some(tmp_sink_ids) = tmp_sink_ids {
1965 for tmp_sink_id in tmp_sink_ids {
1966 Object::delete_by_id(tmp_sink_id).exec(&txn).await?;
1967 }
1968 }
1969 txn.commit().await?;
1970
1971 self.env
1973 .notification_manager()
1974 .notify_serving_fragment_mapping_delete(
1975 abort_fragment_ids.iter().map(|id| *id as _).collect(),
1976 );
1977
1978 Ok(())
1979 }
1980
1981 pub async fn update_source_rate_limit_by_source_id(
1984 &self,
1985 source_id: SourceId,
1986 rate_limit: Option<u32>,
1987 ) -> MetaResult<(HashSet<JobId>, HashSet<FragmentId>)> {
1988 let inner = self.inner.read().await;
1989 let txn = inner.db.begin().await?;
1990
1991 {
1992 let active_source = source::ActiveModel {
1993 source_id: Set(source_id),
1994 rate_limit: Set(rate_limit.map(|v| v as i32)),
1995 ..Default::default()
1996 };
1997 Source::update(active_source).exec(&txn).await?;
1998 }
1999
2000 let (source, obj) = Source::find_by_id(source_id)
2001 .find_also_related(Object)
2002 .one(&txn)
2003 .await?
2004 .ok_or_else(|| {
2005 MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
2006 })?;
2007
2008 let is_fs_source = source.with_properties.inner_ref().is_new_fs_connector();
2009 let streaming_job_ids: Vec<JobId> =
2010 if let Some(table_id) = source.optional_associated_table_id {
2011 vec![table_id.as_job_id()]
2012 } else if let Some(source_info) = &source.source_info
2013 && source_info.to_protobuf().is_shared()
2014 {
2015 vec![source_id.as_share_source_job_id()]
2016 } else {
2017 ObjectDependency::find()
2018 .select_only()
2019 .column(object_dependency::Column::UsedBy)
2020 .filter(object_dependency::Column::Oid.eq(source_id))
2021 .into_tuple()
2022 .all(&txn)
2023 .await?
2024 };
2025
2026 if streaming_job_ids.is_empty() {
2027 return Err(MetaError::invalid_parameter(format!(
2028 "source id {source_id} not used by any streaming job"
2029 )));
2030 }
2031
2032 let fragments: Vec<(FragmentId, JobId, i32, StreamNode)> = Fragment::find()
2033 .select_only()
2034 .columns([
2035 fragment::Column::FragmentId,
2036 fragment::Column::JobId,
2037 fragment::Column::FragmentTypeMask,
2038 fragment::Column::StreamNode,
2039 ])
2040 .filter(fragment::Column::JobId.is_in(streaming_job_ids))
2041 .into_tuple()
2042 .all(&txn)
2043 .await?;
2044 let mut fragments = fragments
2045 .into_iter()
2046 .map(|(id, job_id, mask, stream_node)| {
2047 (
2048 id,
2049 job_id,
2050 FragmentTypeMask::from(mask as u32),
2051 stream_node.to_protobuf(),
2052 )
2053 })
2054 .collect_vec();
2055
2056 fragments.retain_mut(|(_, _, fragment_type_mask, stream_node)| {
2057 let mut found = false;
2058 if fragment_type_mask.contains(FragmentTypeFlag::Source) {
2059 visit_stream_node_mut(stream_node, |node| {
2060 if let PbNodeBody::Source(node) = node
2061 && let Some(node_inner) = &mut node.source_inner
2062 && node_inner.source_id == source_id
2063 {
2064 node_inner.rate_limit = rate_limit;
2065 found = true;
2066 }
2067 });
2068 }
2069 if is_fs_source {
2070 visit_stream_node_mut(stream_node, |node| {
2073 if let PbNodeBody::StreamFsFetch(node) = node {
2074 fragment_type_mask.add(FragmentTypeFlag::FsFetch);
2075 if let Some(node_inner) = &mut node.node_inner
2076 && node_inner.source_id == source_id
2077 {
2078 node_inner.rate_limit = rate_limit;
2079 found = true;
2080 }
2081 }
2082 });
2083 }
2084 found
2085 });
2086
2087 assert!(
2088 !fragments.is_empty(),
2089 "source id should be used by at least one fragment"
2090 );
2091
2092 let (fragment_ids, job_ids) = fragments
2093 .iter()
2094 .map(|(framgnet_id, job_id, _, _)| (framgnet_id, job_id))
2095 .unzip();
2096
2097 for (fragment_id, _, fragment_type_mask, stream_node) in fragments {
2098 Fragment::update(fragment::ActiveModel {
2099 fragment_id: Set(fragment_id),
2100 fragment_type_mask: Set(fragment_type_mask.into()),
2101 stream_node: Set(StreamNode::from(&stream_node)),
2102 ..Default::default()
2103 })
2104 .exec(&txn)
2105 .await?;
2106 }
2107
2108 txn.commit().await?;
2109
2110 let relation_info = PbObjectInfo::Source(ObjectModel(source, obj.unwrap(), None).into());
2111 let _version = self
2112 .notify_frontend(
2113 NotificationOperation::Update,
2114 NotificationInfo::ObjectGroup(PbObjectGroup {
2115 objects: vec![PbObject {
2116 object_info: Some(relation_info),
2117 }],
2118 dependencies: vec![],
2119 }),
2120 )
2121 .await;
2122
2123 Ok((job_ids, fragment_ids))
2124 }
2125
2126 pub async fn mutate_fragments_by_job_id(
2129 &self,
2130 job_id: JobId,
2131 mut fragments_mutation_fn: impl FnMut(FragmentTypeMask, &mut PbStreamNode) -> MetaResult<bool>,
2133 err_msg: &'static str,
2135 ) -> MetaResult<HashSet<FragmentId>> {
2136 let inner = self.inner.read().await;
2137 let txn = inner.db.begin().await?;
2138
2139 let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
2140 .select_only()
2141 .columns([
2142 fragment::Column::FragmentId,
2143 fragment::Column::FragmentTypeMask,
2144 fragment::Column::StreamNode,
2145 ])
2146 .filter(fragment::Column::JobId.eq(job_id))
2147 .into_tuple()
2148 .all(&txn)
2149 .await?;
2150 let mut fragments = fragments
2151 .into_iter()
2152 .map(|(id, mask, stream_node)| {
2153 (id, FragmentTypeMask::from(mask), stream_node.to_protobuf())
2154 })
2155 .collect_vec();
2156
2157 let fragments = fragments
2158 .iter_mut()
2159 .map(|(_, fragment_type_mask, stream_node)| {
2160 fragments_mutation_fn(*fragment_type_mask, stream_node)
2161 })
2162 .collect::<MetaResult<Vec<bool>>>()?
2163 .into_iter()
2164 .zip_eq_debug(std::mem::take(&mut fragments))
2165 .filter_map(|(keep, fragment)| if keep { Some(fragment) } else { None })
2166 .collect::<Vec<_>>();
2167
2168 if fragments.is_empty() {
2169 return Err(MetaError::invalid_parameter(format!(
2170 "job id {job_id}: {}",
2171 err_msg
2172 )));
2173 }
2174
2175 let fragment_ids: HashSet<FragmentId> = fragments.iter().map(|(id, _, _)| *id).collect();
2176 for (id, _, stream_node) in fragments {
2177 Fragment::update(fragment::ActiveModel {
2178 fragment_id: Set(id),
2179 stream_node: Set(StreamNode::from(&stream_node)),
2180 ..Default::default()
2181 })
2182 .exec(&txn)
2183 .await?;
2184 }
2185
2186 txn.commit().await?;
2187
2188 Ok(fragment_ids)
2189 }
2190
2191 async fn mutate_fragment_by_fragment_id(
2192 &self,
2193 fragment_id: FragmentId,
2194 mut fragment_mutation_fn: impl FnMut(FragmentTypeMask, &mut PbStreamNode) -> bool,
2195 err_msg: &'static str,
2196 ) -> MetaResult<()> {
2197 let inner = self.inner.read().await;
2198 let txn = inner.db.begin().await?;
2199
2200 let (fragment_type_mask, stream_node): (i32, StreamNode) =
2201 Fragment::find_by_id(fragment_id)
2202 .select_only()
2203 .columns([
2204 fragment::Column::FragmentTypeMask,
2205 fragment::Column::StreamNode,
2206 ])
2207 .into_tuple()
2208 .one(&txn)
2209 .await?
2210 .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
2211 let mut pb_stream_node = stream_node.to_protobuf();
2212 let fragment_type_mask = FragmentTypeMask::from(fragment_type_mask);
2213
2214 if !fragment_mutation_fn(fragment_type_mask, &mut pb_stream_node) {
2215 return Err(MetaError::invalid_parameter(format!(
2216 "fragment id {fragment_id}: {}",
2217 err_msg
2218 )));
2219 }
2220
2221 Fragment::update(fragment::ActiveModel {
2222 fragment_id: Set(fragment_id),
2223 stream_node: Set(stream_node),
2224 ..Default::default()
2225 })
2226 .exec(&txn)
2227 .await?;
2228
2229 txn.commit().await?;
2230
2231 Ok(())
2232 }
2233
2234 pub async fn update_backfill_orders_by_job_id(
2235 &self,
2236 job_id: JobId,
2237 backfill_orders: Option<BackfillOrders>,
2238 ) -> MetaResult<()> {
2239 let inner = self.inner.write().await;
2240 let txn = inner.db.begin().await?;
2241
2242 ensure_job_not_canceled(job_id, &txn).await?;
2243
2244 streaming_job::ActiveModel {
2245 job_id: Set(job_id),
2246 backfill_orders: Set(backfill_orders),
2247 ..Default::default()
2248 }
2249 .update(&txn)
2250 .await?;
2251
2252 txn.commit().await?;
2253
2254 Ok(())
2255 }
2256
2257 pub async fn update_backfill_rate_limit_by_job_id(
2260 &self,
2261 job_id: JobId,
2262 rate_limit: Option<u32>,
2263 ) -> MetaResult<HashSet<FragmentId>> {
2264 let update_backfill_rate_limit =
2265 |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
2266 let mut found = false;
2267 if fragment_type_mask
2268 .contains_any(FragmentTypeFlag::backfill_rate_limit_fragments())
2269 {
2270 visit_stream_node_mut(stream_node, |node| match node {
2271 PbNodeBody::StreamCdcScan(node) => {
2272 node.rate_limit = rate_limit;
2273 found = true;
2274 }
2275 PbNodeBody::StreamScan(node) => {
2276 node.rate_limit = rate_limit;
2277 found = true;
2278 }
2279 PbNodeBody::SourceBackfill(node) => {
2280 node.rate_limit = rate_limit;
2281 found = true;
2282 }
2283 _ => {}
2284 });
2285 }
2286 Ok(found)
2287 };
2288
2289 self.mutate_fragments_by_job_id(
2290 job_id,
2291 update_backfill_rate_limit,
2292 "stream scan node or source node not found",
2293 )
2294 .await
2295 }
2296
2297 pub async fn update_sink_rate_limit_by_job_id(
2300 &self,
2301 sink_id: SinkId,
2302 rate_limit: Option<u32>,
2303 ) -> MetaResult<HashSet<FragmentId>> {
2304 let update_sink_rate_limit =
2305 |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
2306 let mut found = Ok(false);
2307 if fragment_type_mask.contains_any(FragmentTypeFlag::sink_rate_limit_fragments()) {
2308 visit_stream_node_mut(stream_node, |node| {
2309 if let PbNodeBody::Sink(node) = node {
2310 if node.log_store_type != PbSinkLogStoreType::KvLogStore as i32 {
2311 found = Err(MetaError::invalid_parameter(
2312 "sink rate limit is only supported for kv log store, please SET sink_decouple = TRUE before CREATE SINK",
2313 ));
2314 return;
2315 }
2316 node.rate_limit = rate_limit;
2317 found = Ok(true);
2318 }
2319 });
2320 }
2321 found
2322 };
2323
2324 self.mutate_fragments_by_job_id(
2325 sink_id.as_job_id(),
2326 update_sink_rate_limit,
2327 "sink node not found",
2328 )
2329 .await
2330 }
2331
2332 pub async fn update_dml_rate_limit_by_job_id(
2333 &self,
2334 job_id: JobId,
2335 rate_limit: Option<u32>,
2336 ) -> MetaResult<HashSet<FragmentId>> {
2337 let update_dml_rate_limit =
2338 |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
2339 let mut found = false;
2340 if fragment_type_mask.contains_any(FragmentTypeFlag::dml_rate_limit_fragments()) {
2341 visit_stream_node_mut(stream_node, |node| {
2342 if let PbNodeBody::Dml(node) = node {
2343 node.rate_limit = rate_limit;
2344 found = true;
2345 }
2346 });
2347 }
2348 Ok(found)
2349 };
2350
2351 self.mutate_fragments_by_job_id(job_id, update_dml_rate_limit, "dml node not found")
2352 .await
2353 }
2354
/// Alters the connector properties and secret references of source `source_id`.
///
/// Within one transaction this:
/// 1. loads the source and rejects an attempt to change its connector type;
/// 2. unless `skip_alter_on_fly_check` is set, checks that every altered key is allowed
///    to change on the fly for that connector;
/// 3. merges `alter_props` / `alter_secret_refs` into the current options and validates
///    the result with `ConnectorProperties::extract`;
/// 4. rewrites the stored `CREATE SOURCE` / `CREATE TABLE` definition with the new options;
/// 5. adds/removes the secret object dependencies reported by the merge;
/// 6. updates the source row (and the associated table row, if any) and the `Source`
///    nodes in the relevant fragments.
///
/// After the commit, the frontend is notified with the refreshed source (and table).
///
/// Returns the merged options (plaintext properties plus secret references).
///
/// # Panics
///
/// Panics if the stored properties have no connector, if the stored definition does not
/// parse to exactly one statement, if that statement is neither `CREATE SOURCE` nor
/// `CREATE TABLE`, or if it is `CREATE TABLE` but the source has no associated table id.
pub async fn update_source_props_by_source_id(
    &self,
    source_id: SourceId,
    alter_props: BTreeMap<String, String>,
    alter_secret_refs: BTreeMap<String, PbSecretRef>,
    skip_alter_on_fly_check: bool,
) -> MetaResult<WithOptionsSecResolved> {
    let inner = self.inner.read().await;
    let txn = inner.db.begin().await?;

    let (source, _obj) = Source::find_by_id(source_id)
        .find_also_related(Object)
        .one(&txn)
        .await?
        .ok_or_else(|| {
            MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
        })?;
    let connector = source.with_properties.0.get_connector().unwrap();
    let is_shared_source = source.is_shared();

    // For a non-shared source, also collect the ids of objects that depend on it; their
    // fragments are included in the fragment update further below.
    // NOTE(review): presumably these jobs embed their own copy of the source node — confirm.
    let mut dep_source_job_ids: Vec<JobId> = Vec::new();
    if !is_shared_source {
        dep_source_job_ids = ObjectDependency::find()
            .select_only()
            .column(object_dependency::Column::UsedBy)
            .filter(object_dependency::Column::Oid.eq(source_id))
            .into_tuple()
            .all(&txn)
            .await?;
    }

    // The connector type itself can never be altered.
    if let Some(new_connector) = alter_props.get(UPSTREAM_SOURCE_KEY)
        && new_connector != &connector
    {
        return Err(MetaError::invalid_parameter(format!(
            "Cannot change connector type from '{}' to '{}'. Drop and recreate the source instead.",
            connector, new_connector
        )));
    }

    // Both plaintext keys and secret-backed keys are subject to the on-the-fly check.
    if !skip_alter_on_fly_check {
        let prop_keys: Vec<String> = alter_props
            .keys()
            .chain(alter_secret_refs.keys())
            .cloned()
            .collect();
        risingwave_connector::allow_alter_on_fly_fields::check_source_allow_alter_on_fly_fields(
            &connector, &prop_keys,
        )?;
    }

    let mut options_with_secret = WithOptionsSecResolved::new(
        source.with_properties.0.clone(),
        source
            .secret_ref
            .map(|secret_ref| secret_ref.to_protobuf())
            .unwrap_or_default(),
    );
    // The merge reports which secret dependencies have to be added and removed.
    let (to_add_secret_dep, to_remove_secret_dep) =
        options_with_secret.handle_update(alter_props, alter_secret_refs)?;

    // NOTE(review): this logs the merged options with `{:?}` — confirm the Debug output
    // cannot expose sensitive plaintext values.
    tracing::info!(
        "applying new properties to source: source_id={}, options_with_secret={:?}",
        source_id,
        options_with_secret
    );
    // Validation only: the extracted properties are discarded.
    let _ = ConnectorProperties::extract(options_with_secret.clone(), true)?;
    let mut associate_table_id = None;

    // Object id that owns the secret dependencies: the source itself, or the associated
    // table when the definition is a `CREATE TABLE`.
    let mut preferred_id = source_id.as_object_id();
    let rewrite_sql = {
        let definition = source.definition.clone();

        let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
            .map_err(|e| {
                MetaError::from(MetaErrorInner::Connector(ConnectorError::from(
                    anyhow!(e).context("Failed to parse source definition SQL"),
                )))
            })?
            .try_into()
            .unwrap();

        // Renders the merged options as SQL `WITH` options: plaintext values become
        // quoted string literals, secret references become `SECRET <name>` with the
        // name looked up from the catalog.
        // NOTE(review): values are wrapped in single quotes without escaping — confirm
        // that `SqlOption::try_from` handles or rejects embedded quotes.
        async fn format_with_option_secret_resolved(
            txn: &DatabaseTransaction,
            options_with_secret: &WithOptionsSecResolved,
        ) -> MetaResult<Vec<SqlOption>> {
            let mut options = Vec::new();
            for (k, v) in options_with_secret.as_plaintext() {
                let sql_option = SqlOption::try_from((k, &format!("'{}'", v)))
                    .map_err(|e| MetaError::invalid_parameter(e.to_report_string()))?;
                options.push(sql_option);
            }
            for (k, v) in options_with_secret.as_secret() {
                if let Some(secret_model) = Secret::find_by_id(v.secret_id).one(txn).await? {
                    let sql_option =
                        SqlOption::try_from((k, &format!("SECRET {}", secret_model.name)))
                            .map_err(|e| MetaError::invalid_parameter(e.to_report_string()))?;
                    options.push(sql_option);
                } else {
                    return Err(MetaError::catalog_id_not_found("secret", v.secret_id));
                }
            }
            Ok(options)
        }

        match &mut stmt {
            Statement::CreateSource { stmt } => {
                stmt.with_properties.0 =
                    format_with_option_secret_resolved(&txn, &options_with_secret).await?;
            }
            Statement::CreateTable { with_options, .. } => {
                *with_options =
                    format_with_option_secret_resolved(&txn, &options_with_secret).await?;
                // A table with connector: remember the table so its row is updated too,
                // and make it the owner of the secret dependencies.
                associate_table_id = source.optional_associated_table_id;
                preferred_id = associate_table_id.unwrap().as_object_id();
            }
            _ => unreachable!(),
        }

        stmt.to_string()
    };

    {
        // Keep the secret dependencies in sync with the merged options.
        if !to_add_secret_dep.is_empty() {
            ObjectDependency::insert_many(to_add_secret_dep.into_iter().map(|secret_id| {
                object_dependency::ActiveModel {
                    oid: Set(secret_id.into()),
                    used_by: Set(preferred_id),
                    ..Default::default()
                }
            }))
            .exec(&txn)
            .await?;
        }
        if !to_remove_secret_dep.is_empty() {
            let _ = ObjectDependency::delete_many()
                .filter(
                    object_dependency::Column::Oid
                        .is_in(to_remove_secret_dep)
                        .and(object_dependency::Column::UsedBy.eq(preferred_id)),
                )
                .exec(&txn)
                .await?;
        }
    }

    // `secret_ref` is stored as `None` when no secret reference remains.
    let active_source_model = source::ActiveModel {
        source_id: Set(source_id),
        definition: Set(rewrite_sql.clone()),
        with_properties: Set(options_with_secret.as_plaintext().clone().into()),
        secret_ref: Set((!options_with_secret.as_secret().is_empty())
            .then(|| SecretRef::from(options_with_secret.as_secret().clone()))),
        ..Default::default()
    };
    Source::update(active_source_model).exec(&txn).await?;

    if let Some(associate_table_id) = associate_table_id {
        // The associated table stores the same rewritten definition.
        let active_table_model = table::ActiveModel {
            table_id: Set(associate_table_id),
            definition: Set(rewrite_sql),
            ..Default::default()
        };
        Table::update(active_table_model).exec(&txn).await?;
    }

    // Jobs whose fragments are updated: the table job or the source's own job, followed
    // by the dependent jobs collected above.
    let to_check_job_ids = vec![if let Some(associate_table_id) = associate_table_id {
        associate_table_id.as_job_id()
    } else {
        source_id.as_share_source_job_id()
    }]
    .into_iter()
    .chain(dep_source_job_ids.into_iter())
    .collect_vec();

    update_connector_props_fragments(
        &txn,
        to_check_job_ids,
        FragmentTypeFlag::Source,
        |node, found| {
            if let PbNodeBody::Source(node) = node
                && let Some(source_inner) = &mut node.source_inner
            {
                source_inner.with_properties = options_with_secret.as_plaintext().clone();
                source_inner.secret_refs = options_with_secret.as_secret().clone();
                *found = true;
            }
        },
        is_shared_source,
    )
    .await?;

    // Reload the updated rows so the notification carries the persisted state.
    let mut to_update_objs = Vec::with_capacity(2);
    let (source, obj) = Source::find_by_id(source_id)
        .find_also_related(Object)
        .one(&txn)
        .await?
        .ok_or_else(|| {
            MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
        })?;
    to_update_objs.push(PbObject {
        object_info: Some(PbObjectInfo::Source(
            ObjectModel(source, obj.unwrap(), None).into(),
        )),
    });

    if let Some(associate_table_id) = associate_table_id {
        let (table, obj) = Table::find_by_id(associate_table_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found("table", associate_table_id))?;
        let streaming_job = streaming_job::Entity::find_by_id(table.job_id())
            .one(&txn)
            .await?;
        to_update_objs.push(PbObject {
            object_info: Some(PbObjectInfo::Table(
                ObjectModel(table, obj.unwrap(), streaming_job).into(),
            )),
        });
    }

    txn.commit().await?;

    // Notify only after the transaction has been committed.
    self.notify_frontend(
        NotificationOperation::Update,
        NotificationInfo::ObjectGroup(PbObjectGroup {
            objects: to_update_objs,
            dependencies: vec![],
        }),
    )
    .await;

    Ok(options_with_secret)
}
2618
2619 pub async fn update_sink_props_by_sink_id(
2620 &self,
2621 sink_id: SinkId,
2622 props: BTreeMap<String, String>,
2623 ) -> MetaResult<HashMap<String, String>> {
2624 let inner = self.inner.read().await;
2625 let txn = inner.db.begin().await?;
2626
2627 let (sink, _obj) = Sink::find_by_id(sink_id)
2628 .find_also_related(Object)
2629 .one(&txn)
2630 .await?
2631 .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
2632 validate_sink_props(&sink, &props)?;
2633 let definition = sink.definition.clone();
2634 let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
2635 .map_err(|e| SinkError::Config(anyhow!(e)))?
2636 .try_into()
2637 .unwrap();
2638 if let Statement::CreateSink { stmt } = &mut stmt {
2639 update_stmt_with_props(&mut stmt.with_properties.0, &props)?;
2640 } else {
2641 panic!("definition is not a create sink statement")
2642 }
2643 let mut new_config = sink.properties.clone().into_inner();
2644 new_config.extend(props.clone());
2645
2646 let definition = stmt.to_string();
2647 let active_sink = sink::ActiveModel {
2648 sink_id: Set(sink_id),
2649 properties: Set(risingwave_meta_model::Property(new_config.clone())),
2650 definition: Set(definition),
2651 ..Default::default()
2652 };
2653 Sink::update(active_sink).exec(&txn).await?;
2654
2655 update_sink_fragment_props(&txn, sink_id, new_config).await?;
2656 let (sink, obj) = Sink::find_by_id(sink_id)
2657 .find_also_related(Object)
2658 .one(&txn)
2659 .await?
2660 .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
2661 let streaming_job = streaming_job::Entity::find_by_id(sink.sink_id.as_job_id())
2662 .one(&txn)
2663 .await?;
2664 txn.commit().await?;
2665 let relation_infos = vec![PbObject {
2666 object_info: Some(PbObjectInfo::Sink(
2667 ObjectModel(sink, obj.unwrap(), streaming_job).into(),
2668 )),
2669 }];
2670
2671 let _version = self
2672 .notify_frontend(
2673 NotificationOperation::Update,
2674 NotificationInfo::ObjectGroup(PbObjectGroup {
2675 objects: relation_infos,
2676 dependencies: vec![],
2677 }),
2678 )
2679 .await;
2680
2681 Ok(props.into_iter().collect())
2682 }
2683
/// Alters the properties of an Iceberg-engine table by updating its sink, source and
/// table catalog entries together.
///
/// `alter_iceberg_table_props` must be present and carries the `sink_id` and `source_id`
/// that accompany `table_id`. The definition stored on the sink is parsed as a
/// `CREATE TABLE` statement with the Iceberg engine; its `WITH` options are patched with
/// `props`, and the rewritten SQL is stored on the sink, the source and the table.
/// `props` is merged into the sink's properties and pushed to the sink's fragments.
/// The frontend is notified with all three refreshed objects after the commit.
///
/// Returns the altered properties (`props`) as a `HashMap`, together with the sink id.
///
/// # Errors
///
/// Invalid-parameter when `alter_iceberg_table_props` is `None`; a sink config error when
/// the definition fails to parse or the engine is not Iceberg; catalog-id-not-found for a
/// missing sink/source/table; plus validation and database errors.
///
/// # Panics
///
/// Panics if the definition does not parse to exactly one statement or if that
/// statement is not `CREATE TABLE`.
pub async fn update_iceberg_table_props_by_table_id(
    &self,
    table_id: TableId,
    props: BTreeMap<String, String>,
    alter_iceberg_table_props: Option<
        risingwave_pb::meta::alter_connector_props_request::PbExtraOptions,
    >,
) -> MetaResult<(HashMap<String, String>, SinkId)> {
    let risingwave_pb::meta::alter_connector_props_request::PbExtraOptions::AlterIcebergTableIds(AlterIcebergTableIds { sink_id, source_id }) = alter_iceberg_table_props.
        ok_or_else(|| MetaError::invalid_parameter("alter_iceberg_table_props is required"))?;
    let inner = self.inner.read().await;
    let txn = inner.db.begin().await?;

    let (sink, _obj) = Sink::find_by_id(sink_id)
        .find_also_related(Object)
        .one(&txn)
        .await?
        .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
    validate_sink_props(&sink, &props)?;

    // The sink's stored definition is expected to be the table's `CREATE TABLE` SQL.
    let definition = sink.definition.clone();
    let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
        .map_err(|e| SinkError::Config(anyhow!(e)))?
        .try_into()
        .unwrap();
    if let Statement::CreateTable {
        with_options,
        engine,
        ..
    } = &mut stmt
    {
        if !matches!(engine, Engine::Iceberg) {
            return Err(SinkError::Config(anyhow!(
                "only iceberg table can be altered as sink"
            ))
            .into());
        }
        update_stmt_with_props(with_options, &props)?;
    } else {
        panic!("definition is not a create iceberg table statement")
    }
    // Altered keys override the stored sink properties.
    let mut new_config = sink.properties.clone().into_inner();
    new_config.extend(props.clone());

    // The same rewritten definition is written to all three catalog entries; only the
    // sink also gets the merged properties.
    let definition = stmt.to_string();
    let active_sink = sink::ActiveModel {
        sink_id: Set(sink_id),
        properties: Set(risingwave_meta_model::Property(new_config.clone())),
        definition: Set(definition.clone()),
        ..Default::default()
    };
    let active_source = source::ActiveModel {
        source_id: Set(source_id),
        definition: Set(definition.clone()),
        ..Default::default()
    };
    let active_table = table::ActiveModel {
        table_id: Set(table_id),
        definition: Set(definition),
        ..Default::default()
    };
    Sink::update(active_sink).exec(&txn).await?;
    Source::update(active_source).exec(&txn).await?;
    Table::update(active_table).exec(&txn).await?;

    update_sink_fragment_props(&txn, sink_id, new_config).await?;

    // Reload the persisted rows (and their streaming jobs) for the notification.
    let (sink, sink_obj) = Sink::find_by_id(sink_id)
        .find_also_related(Object)
        .one(&txn)
        .await?
        .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
    let sink_streaming_job = streaming_job::Entity::find_by_id(sink.sink_id.as_job_id())
        .one(&txn)
        .await?;
    let (source, source_obj) = Source::find_by_id(source_id)
        .find_also_related(Object)
        .one(&txn)
        .await?
        .ok_or_else(|| {
            MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
        })?;
    let (table, table_obj) = Table::find_by_id(table_id)
        .find_also_related(Object)
        .one(&txn)
        .await?
        .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Table.as_str(), table_id))?;
    let table_streaming_job = streaming_job::Entity::find_by_id(table.job_id())
        .one(&txn)
        .await?;
    txn.commit().await?;
    let relation_infos = vec![
        PbObject {
            object_info: Some(PbObjectInfo::Sink(
                ObjectModel(sink, sink_obj.unwrap(), sink_streaming_job).into(),
            )),
        },
        PbObject {
            object_info: Some(PbObjectInfo::Source(
                ObjectModel(source, source_obj.unwrap(), None).into(),
            )),
        },
        PbObject {
            object_info: Some(PbObjectInfo::Table(
                ObjectModel(table, table_obj.unwrap(), table_streaming_job).into(),
            )),
        },
    ];
    // Notify only after the transaction has been committed.
    let _version = self
        .notify_frontend(
            NotificationOperation::Update,
            NotificationInfo::ObjectGroup(PbObjectGroup {
                objects: relation_infos,
                dependencies: vec![],
            }),
        )
        .await;

    Ok((props.into_iter().collect(), sink_id))
}
2804
/// Alters the properties of connection `connection_id` and propagates the change to
/// every source and sink that references the connection, all in one transaction.
///
/// Steps:
/// 1. collect the ids of sources and sinks whose `ConnectionId` is `connection_id`;
/// 2. check that every altered key may change on the fly for the connection type;
/// 3. merge `alter_props` / `alter_secret_refs` into the connection's options and
///    validate the resulting connection with `validate_connection`;
/// 4. sync the connection's secret dependencies and store the new connection params;
/// 5. for each dependent source: apply the same alteration to its options, validate,
///    sync its secret dependencies, update its row and the `Source` nodes in its fragments;
/// 6. for each dependent sink: validate the altered config, extend its properties with
///    `alter_props`, update its row and the `Sink` nodes in its fragments;
/// 7. commit, then notify the frontend with the refreshed connection, sources and sinks.
///
/// Returns the merged connection options, the per-source properties with secrets filled
/// in by `LocalSecretManager`, and the per-sink properties.
///
/// NOTE(review): unlike `update_source_props_by_source_id`, the stored SQL definitions of
/// dependent sources and sinks are not rewritten here, and `alter_secret_refs` is not
/// applied to the sink properties — confirm this is intended.
pub async fn update_connection_and_dependent_objects_props(
    &self,
    connection_id: ConnectionId,
    alter_props: BTreeMap<String, String>,
    alter_secret_refs: BTreeMap<String, PbSecretRef>,
) -> MetaResult<(
    WithOptionsSecResolved,
    Vec<(SourceId, HashMap<String, String>)>,
    Vec<(SinkId, HashMap<String, String>)>,
)> {
    let inner = self.inner.read().await;
    let txn = inner.db.begin().await?;

    // Sources that reference this connection.
    let dependent_sources: Vec<SourceId> = Source::find()
        .select_only()
        .column(source::Column::SourceId)
        .filter(source::Column::ConnectionId.eq(connection_id))
        .into_tuple()
        .all(&txn)
        .await?;

    // Sinks that reference this connection.
    let dependent_sinks: Vec<SinkId> = Sink::find()
        .select_only()
        .column(sink::Column::SinkId)
        .filter(sink::Column::ConnectionId.eq(connection_id))
        .into_tuple()
        .all(&txn)
        .await?;

    let (connection_catalog, _obj) = Connection::find_by_id(connection_id)
        .find_also_related(Object)
        .one(&txn)
        .await?
        .ok_or_else(|| {
            MetaError::catalog_id_not_found(ObjectType::Connection.as_str(), connection_id)
        })?;

    // Both plaintext keys and secret-backed keys are subject to the on-the-fly checks.
    let prop_keys: Vec<String> = alter_props
        .keys()
        .chain(alter_secret_refs.keys())
        .cloned()
        .collect();

    let connection_type_str = pb_connection_type_to_connection_type(
        &connection_catalog.params.to_protobuf().connection_type(),
    )
    .ok_or_else(|| MetaError::invalid_parameter("Unspecified connection type"))?;

    risingwave_connector::allow_alter_on_fly_fields::check_connection_allow_alter_on_fly_fields(
        connection_type_str, &prop_keys,
    )?;

    let connection_pb = connection_catalog.params.to_protobuf();
    let mut connection_options_with_secret = WithOptionsSecResolved::new(
        connection_pb.properties.into_iter().collect(),
        connection_pb.secret_refs.into_iter().collect(),
    );

    // The merge reports which secret dependencies have to be added and removed.
    let (to_add_secret_dep, to_remove_secret_dep) = connection_options_with_secret
        .handle_update(alter_props.clone(), alter_secret_refs.clone())?;

    tracing::debug!(
        "applying new properties to connection and dependents: connection_id={}, sources={:?}, sinks={:?}",
        connection_id,
        dependent_sources,
        dependent_sinks
    );

    {
        // Validate the altered connection before anything is written.
        let conn_params_pb = risingwave_pb::catalog::ConnectionParams {
            connection_type: connection_pb.connection_type,
            properties: connection_options_with_secret
                .as_plaintext()
                .clone()
                .into_iter()
                .collect(),
            secret_refs: connection_options_with_secret
                .as_secret()
                .clone()
                .into_iter()
                .collect(),
        };
        let connection = PbConnection {
            id: connection_id as _,
            info: Some(risingwave_pb::catalog::connection::Info::ConnectionParams(
                conn_params_pb,
            )),
            ..Default::default()
        };
        validate_connection(&connection).await?;
    }

    // Keep the connection's secret dependencies in sync with the merged options.
    if !to_add_secret_dep.is_empty() {
        ObjectDependency::insert_many(to_add_secret_dep.into_iter().map(|secret_id| {
            object_dependency::ActiveModel {
                oid: Set(secret_id.into()),
                used_by: Set(connection_id.as_object_id()),
                ..Default::default()
            }
        }))
        .exec(&txn)
        .await?;
    }
    if !to_remove_secret_dep.is_empty() {
        let _ = ObjectDependency::delete_many()
            .filter(
                object_dependency::Column::Oid
                    .is_in(to_remove_secret_dep)
                    .and(object_dependency::Column::UsedBy.eq(connection_id.as_object_id())),
            )
            .exec(&txn)
            .await?;
    }

    // Persist the merged connection params.
    let updated_connection_params = risingwave_pb::catalog::ConnectionParams {
        connection_type: connection_pb.connection_type,
        properties: connection_options_with_secret
            .as_plaintext()
            .clone()
            .into_iter()
            .collect(),
        secret_refs: connection_options_with_secret
            .as_secret()
            .clone()
            .into_iter()
            .collect(),
    };
    let active_connection_model = connection::ActiveModel {
        connection_id: Set(connection_id),
        params: Set(ConnectionParams::from(&updated_connection_params)),
        ..Default::default()
    };
    Connection::update(active_connection_model)
        .exec(&txn)
        .await?;

    // Returned to the caller: each dependent source with its full property map.
    let mut updated_sources_with_props: Vec<(SourceId, HashMap<String, String>)> = Vec::new();

    if !dependent_sources.is_empty() {
        let sources_with_objs = Source::find()
            .find_also_related(Object)
            .filter(source::Column::SourceId.is_in(dependent_sources.iter().cloned()))
            .all(&txn)
            .await?;

        // Row and fragment updates are queued here and executed after the loop.
        let mut source_updates = Vec::new();
        let mut fragment_updates: Vec<DependentSourceFragmentUpdate> = Vec::new();

        for (source, _obj) in sources_with_objs {
            let source_id = source.source_id;

            // Apply the same alteration to the source's own options.
            let mut source_options_with_secret = WithOptionsSecResolved::new(
                source.with_properties.0.clone(),
                source
                    .secret_ref
                    .clone()
                    .map(|secret_ref| secret_ref.to_protobuf())
                    .unwrap_or_default(),
            );
            let (source_to_add_secret_dep, source_to_remove_secret_dep) =
                source_options_with_secret
                    .handle_update(alter_props.clone(), alter_secret_refs.clone())?;

            // Validation only: the extracted properties are discarded.
            let _ = ConnectorProperties::extract(source_options_with_secret.clone(), true)?;

            // Secret dependencies are owned by the associated table when there is one,
            // otherwise by the source itself.
            let source_used_by_id = source
                .optional_associated_table_id
                .map(|table_id| table_id.as_object_id())
                .unwrap_or_else(|| source_id.as_object_id());
            if !source_to_add_secret_dep.is_empty() {
                ObjectDependency::insert_many(source_to_add_secret_dep.into_iter().map(
                    |secret_id| object_dependency::ActiveModel {
                        oid: Set(secret_id.into()),
                        used_by: Set(source_used_by_id),
                        ..Default::default()
                    },
                ))
                .exec(&txn)
                .await?;
            }
            if !source_to_remove_secret_dep.is_empty() {
                let _ = ObjectDependency::delete_many()
                    .filter(
                        object_dependency::Column::Oid
                            .is_in(source_to_remove_secret_dep)
                            .and(object_dependency::Column::UsedBy.eq(source_used_by_id)),
                    )
                    .exec(&txn)
                    .await?;
            }

            // `secret_ref` is stored as `None` when no secret reference remains.
            let active_source = source::ActiveModel {
                source_id: Set(source_id),
                with_properties: Set(Property(
                    source_options_with_secret.as_plaintext().clone(),
                )),
                secret_ref: Set((!source_options_with_secret.as_secret().is_empty()).then(
                    || {
                        risingwave_meta_model::SecretRef::from(
                            source_options_with_secret.as_secret().clone(),
                        )
                    },
                )),
                ..Default::default()
            };
            source_updates.push(active_source);

            // For a non-shared source, the objects depending on it are included in the
            // fragment update as well.
            let is_shared_source = source.is_shared();
            let mut dep_source_job_ids: Vec<JobId> = Vec::new();
            if !is_shared_source {
                dep_source_job_ids = ObjectDependency::find()
                    .select_only()
                    .column(object_dependency::Column::UsedBy)
                    .filter(object_dependency::Column::Oid.eq(source_id))
                    .into_tuple()
                    .all(&txn)
                    .await?;
            }

            let base_job_id =
                if let Some(associate_table_id) = source.optional_associated_table_id {
                    associate_table_id.as_job_id()
                } else {
                    source_id.as_share_source_job_id()
                };
            let job_ids = vec![base_job_id]
                .into_iter()
                .chain(dep_source_job_ids.into_iter())
                .collect_vec();

            fragment_updates.push(DependentSourceFragmentUpdate {
                job_ids,
                with_properties: source_options_with_secret.as_plaintext().clone(),
                secret_refs: source_options_with_secret.as_secret().clone(),
                is_shared_source,
            });

            // Fill in the secrets to build the property map returned to the caller.
            let complete_source_props = LocalSecretManager::global()
                .fill_secrets(
                    source_options_with_secret.as_plaintext().clone(),
                    source_options_with_secret.as_secret().clone(),
                )
                .map_err(MetaError::from)?
                .into_iter()
                .collect::<HashMap<String, String>>();
            updated_sources_with_props.push((source_id, complete_source_props));
        }

        for source_update in source_updates {
            Source::update(source_update).exec(&txn).await?;
        }

        // Push the new properties into the `Source` nodes of the affected fragments.
        for DependentSourceFragmentUpdate {
            job_ids,
            with_properties,
            secret_refs,
            is_shared_source,
        } in fragment_updates
        {
            update_connector_props_fragments(
                &txn,
                job_ids,
                FragmentTypeFlag::Source,
                |node, found| {
                    if let PbNodeBody::Source(node) = node
                        && let Some(source_inner) = &mut node.source_inner
                    {
                        source_inner.with_properties = with_properties.clone();
                        source_inner.secret_refs = secret_refs.clone();
                        *found = true;
                    }
                },
                is_shared_source,
            )
            .await?;
        }
    }

    // Returned to the caller: each dependent sink with its merged property map.
    let mut updated_sinks_with_props: Vec<(SinkId, HashMap<String, String>)> = Vec::new();

    if !dependent_sinks.is_empty() {
        let sinks_with_objs = Sink::find()
            .find_also_related(Object)
            .filter(sink::Column::SinkId.is_in(dependent_sinks.iter().cloned()))
            .all(&txn)
            .await?;

        // Row and fragment updates are queued here and executed after the loop.
        let mut sink_updates = Vec::new();
        let mut sink_fragment_updates = Vec::new();

        for (sink, _obj) in sinks_with_objs {
            let sink_id = sink.sink_id;

            // Validate the alteration against the sink's connector type.
            match sink.properties.inner_ref().get(CONNECTOR_TYPE_KEY) {
                Some(connector) => {
                    let connector_type = connector.to_lowercase();
                    check_sink_allow_alter_on_fly_fields(&connector_type, &prop_keys)
                        .map_err(|e| SinkError::Config(anyhow!(e)))?;

                    match_sink_name_str!(
                        connector_type.as_str(),
                        SinkType,
                        {
                            let mut new_sink_props = sink.properties.0.clone();
                            new_sink_props.extend(alter_props.clone());
                            SinkType::validate_alter_config(&new_sink_props)
                        },
                        |sink: &str| Err(SinkError::Config(anyhow!(
                            "unsupported sink type {}",
                            sink
                        )))
                    )?
                }
                None => {
                    return Err(SinkError::Config(anyhow!(
                        "connector not specified when alter sink"
                    ))
                    .into());
                }
            };

            // Altered keys override the stored sink properties.
            let mut new_sink_props = sink.properties.0.clone();
            new_sink_props.extend(alter_props.clone());

            let active_sink = sink::ActiveModel {
                sink_id: Set(sink_id),
                properties: Set(risingwave_meta_model::Property(new_sink_props.clone())),
                ..Default::default()
            };
            sink_updates.push(active_sink);

            sink_fragment_updates.push((sink_id, new_sink_props.clone()));

            let complete_sink_props: HashMap<String, String> =
                new_sink_props.into_iter().collect();
            updated_sinks_with_props.push((sink_id, complete_sink_props));
        }

        for sink_update in sink_updates {
            Sink::update(sink_update).exec(&txn).await?;
        }

        // Push the new properties into the matching `Sink` node of each sink's job.
        for (sink_id, new_sink_props) in sink_fragment_updates {
            update_connector_props_fragments(
                &txn,
                vec![sink_id.as_job_id()],
                FragmentTypeFlag::Sink,
                |node, found| {
                    if let PbNodeBody::Sink(node) = node
                        && let Some(sink_desc) = &mut node.sink_desc
                        && sink_desc.id == sink_id.as_raw_id()
                    {
                        sink_desc.properties = new_sink_props.clone();
                        *found = true;
                    }
                },
                true,
            )
            .await?;
        }
    }

    // Reload every touched object so the notification carries the persisted state.
    let mut updated_objects = Vec::new();

    let (connection, obj) = Connection::find_by_id(connection_id)
        .find_also_related(Object)
        .one(&txn)
        .await?
        .ok_or_else(|| {
            MetaError::catalog_id_not_found(ObjectType::Connection.as_str(), connection_id)
        })?;
    updated_objects.push(PbObject {
        object_info: Some(PbObjectInfo::Connection(
            ObjectModel(connection, obj.unwrap(), None).into(),
        )),
    });

    for source_id in &dependent_sources {
        let (source, obj) = Source::find_by_id(*source_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| {
                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), *source_id)
            })?;
        updated_objects.push(PbObject {
            object_info: Some(PbObjectInfo::Source(
                ObjectModel(source, obj.unwrap(), None).into(),
            )),
        });
    }

    for sink_id in &dependent_sinks {
        let (sink, obj) = Sink::find_by_id(*sink_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| {
                MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), *sink_id)
            })?;
        let streaming_job = streaming_job::Entity::find_by_id(sink.sink_id.as_job_id())
            .one(&txn)
            .await?;
        updated_objects.push(PbObject {
            object_info: Some(PbObjectInfo::Sink(
                ObjectModel(sink, obj.unwrap(), streaming_job).into(),
            )),
        });
    }

    txn.commit().await?;

    // Notify only after the transaction has been committed.
    if !updated_objects.is_empty() {
        self.notify_frontend(
            NotificationOperation::Update,
            NotificationInfo::ObjectGroup(PbObjectGroup {
                objects: updated_objects,
                dependencies: vec![],
            }),
        )
        .await;
    }

    Ok((
        connection_options_with_secret,
        updated_sources_with_props,
        updated_sinks_with_props,
    ))
}
3269
3270 pub async fn update_fragment_rate_limit_by_fragment_id(
3271 &self,
3272 fragment_id: FragmentId,
3273 rate_limit: Option<u32>,
3274 ) -> MetaResult<()> {
3275 let update_rate_limit = |fragment_type_mask: FragmentTypeMask,
3276 stream_node: &mut PbStreamNode| {
3277 let mut found = false;
3278 if fragment_type_mask.contains_any(
3279 FragmentTypeFlag::dml_rate_limit_fragments()
3280 .chain(FragmentTypeFlag::sink_rate_limit_fragments())
3281 .chain(FragmentTypeFlag::backfill_rate_limit_fragments())
3282 .chain(FragmentTypeFlag::source_rate_limit_fragments()),
3283 ) {
3284 visit_stream_node_mut(stream_node, |node| {
3285 if let PbNodeBody::Dml(node) = node {
3286 node.rate_limit = rate_limit;
3287 found = true;
3288 }
3289 if let PbNodeBody::Sink(node) = node {
3290 node.rate_limit = rate_limit;
3291 found = true;
3292 }
3293 if let PbNodeBody::StreamCdcScan(node) = node {
3294 node.rate_limit = rate_limit;
3295 found = true;
3296 }
3297 if let PbNodeBody::StreamScan(node) = node {
3298 node.rate_limit = rate_limit;
3299 found = true;
3300 }
3301 if let PbNodeBody::SourceBackfill(node) = node {
3302 node.rate_limit = rate_limit;
3303 found = true;
3304 }
3305 });
3306 }
3307 found
3308 };
3309 self.mutate_fragment_by_fragment_id(fragment_id, update_rate_limit, "fragment not found")
3310 .await
3311 }
3312
3313 pub async fn list_rate_limits(&self) -> MetaResult<Vec<RateLimitInfo>> {
3316 let inner = self.inner.read().await;
3317 let txn = inner.db.begin().await?;
3318
3319 let fragments: Vec<(FragmentId, JobId, i32, StreamNode)> = Fragment::find()
3320 .select_only()
3321 .columns([
3322 fragment::Column::FragmentId,
3323 fragment::Column::JobId,
3324 fragment::Column::FragmentTypeMask,
3325 fragment::Column::StreamNode,
3326 ])
3327 .filter(FragmentTypeMask::intersects_any(
3328 FragmentTypeFlag::rate_limit_fragments(),
3329 ))
3330 .into_tuple()
3331 .all(&txn)
3332 .await?;
3333
3334 let mut rate_limits = Vec::new();
3335 for (fragment_id, job_id, fragment_type_mask, stream_node) in fragments {
3336 let stream_node = stream_node.to_protobuf();
3337 visit_stream_node_body(&stream_node, |node| {
3338 let mut rate_limit = None;
3339 let mut node_name = None;
3340
3341 match node {
3342 PbNodeBody::Source(node) => {
3344 if let Some(node_inner) = &node.source_inner {
3345 rate_limit = node_inner.rate_limit;
3346 node_name = Some("SOURCE");
3347 }
3348 }
3349 PbNodeBody::StreamFsFetch(node) => {
3350 if let Some(node_inner) = &node.node_inner {
3351 rate_limit = node_inner.rate_limit;
3352 node_name = Some("FS_FETCH");
3353 }
3354 }
3355 PbNodeBody::SourceBackfill(node) => {
3357 rate_limit = node.rate_limit;
3358 node_name = Some("SOURCE_BACKFILL");
3359 }
3360 PbNodeBody::StreamScan(node) => {
3361 rate_limit = node.rate_limit;
3362 node_name = Some("STREAM_SCAN");
3363 }
3364 PbNodeBody::StreamCdcScan(node) => {
3365 rate_limit = node.rate_limit;
3366 node_name = Some("STREAM_CDC_SCAN");
3367 }
3368 PbNodeBody::Sink(node) => {
3369 rate_limit = node.rate_limit;
3370 node_name = Some("SINK");
3371 }
3372 _ => {}
3373 }
3374
3375 if let Some(rate_limit) = rate_limit {
3376 rate_limits.push(RateLimitInfo {
3377 fragment_id,
3378 job_id,
3379 fragment_type_mask: fragment_type_mask as u32,
3380 rate_limit,
3381 node_name: node_name.unwrap().to_owned(),
3382 });
3383 }
3384 });
3385 }
3386
3387 Ok(rate_limits)
3388 }
3389}
3390
3391fn validate_sink_props(sink: &sink::Model, props: &BTreeMap<String, String>) -> MetaResult<()> {
3392 match sink.properties.inner_ref().get(CONNECTOR_TYPE_KEY) {
3394 Some(connector) => {
3395 let connector_type = connector.to_lowercase();
3396 let field_names: Vec<String> = props.keys().cloned().collect();
3397 check_sink_allow_alter_on_fly_fields(&connector_type, &field_names)
3398 .map_err(|e| SinkError::Config(anyhow!(e)))?;
3399
3400 match_sink_name_str!(
3401 connector_type.as_str(),
3402 SinkType,
3403 {
3404 let mut new_props = sink.properties.0.clone();
3405 new_props.extend(props.clone());
3406 SinkType::validate_alter_config(&new_props)
3407 },
3408 |sink: &str| Err(SinkError::Config(anyhow!("unsupported sink type {}", sink)))
3409 )?
3410 }
3411 None => {
3412 return Err(
3413 SinkError::Config(anyhow!("connector not specified when alter sink")).into(),
3414 );
3415 }
3416 };
3417 Ok(())
3418}
3419
3420fn update_stmt_with_props(
3421 with_properties: &mut Vec<SqlOption>,
3422 props: &BTreeMap<String, String>,
3423) -> MetaResult<()> {
3424 let mut new_sql_options = with_properties
3425 .iter()
3426 .map(|sql_option| (&sql_option.name, sql_option))
3427 .collect::<IndexMap<_, _>>();
3428 let add_sql_options = props
3429 .iter()
3430 .map(|(k, v)| SqlOption::try_from((k, v)))
3431 .collect::<Result<Vec<SqlOption>, ParserError>>()
3432 .map_err(|e| SinkError::Config(anyhow!(e)))?;
3433 new_sql_options.extend(
3434 add_sql_options
3435 .iter()
3436 .map(|sql_option| (&sql_option.name, sql_option)),
3437 );
3438 *with_properties = new_sql_options.into_values().cloned().collect();
3439 Ok(())
3440}
3441
3442async fn update_sink_fragment_props(
3443 txn: &DatabaseTransaction,
3444 sink_id: SinkId,
3445 props: BTreeMap<String, String>,
3446) -> MetaResult<()> {
3447 let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
3448 .select_only()
3449 .columns([
3450 fragment::Column::FragmentId,
3451 fragment::Column::FragmentTypeMask,
3452 fragment::Column::StreamNode,
3453 ])
3454 .filter(fragment::Column::JobId.eq(sink_id))
3455 .into_tuple()
3456 .all(txn)
3457 .await?;
3458 let fragments = fragments
3459 .into_iter()
3460 .filter(|(_, fragment_type_mask, _)| {
3461 *fragment_type_mask & FragmentTypeFlag::Sink as i32 != 0
3462 })
3463 .filter_map(|(id, _, stream_node)| {
3464 let mut stream_node = stream_node.to_protobuf();
3465 let mut found = false;
3466 visit_stream_node_mut(&mut stream_node, |node| {
3467 if let PbNodeBody::Sink(node) = node
3468 && let Some(sink_desc) = &mut node.sink_desc
3469 && sink_desc.id == sink_id
3470 {
3471 sink_desc.properties.extend(props.clone());
3472 found = true;
3473 }
3474 });
3475 if found { Some((id, stream_node)) } else { None }
3476 })
3477 .collect_vec();
3478 assert!(
3479 !fragments.is_empty(),
3480 "sink id should be used by at least one fragment"
3481 );
3482 for (id, stream_node) in fragments {
3483 Fragment::update(fragment::ActiveModel {
3484 fragment_id: Set(id),
3485 stream_node: Set(StreamNode::from(&stream_node)),
3486 ..Default::default()
3487 })
3488 .exec(txn)
3489 .await?;
3490 }
3491 Ok(())
3492}
3493
/// Context object carrying a list of sink ids.
///
/// NOTE(review): judging by the names this accompanies operations on a table that sinks
/// write into; the struct is only declared here, so confirm its role against the callers.
pub struct SinkIntoTableContext {
    /// Ids of sinks; presumably the ones whose catalogs are updated — TODO confirm.
    pub updated_sink_catalogs: Vec<SinkId>,
}
3499
/// Context bundle for finishing a sink's auto schema refresh.
///
/// NOTE(review): the field notes below are inferred from names and types only; the code
/// consuming this struct is not visible here, so verify them against its users.
pub struct FinishAutoRefreshSchemaSinkContext {
    /// Id of a temporary sink; presumably the one created for the refresh — TODO confirm.
    pub tmp_sink_id: SinkId,
    /// Id of the original sink the refresh applies to (presumed).
    pub original_sink_id: SinkId,
    /// Column catalogs; presumably the sink's columns after the refresh — TODO confirm.
    pub columns: Vec<PbColumnCatalog>,
    /// Optional table catalog; by its name, a new log store table when one is needed.
    pub new_log_store_table: Option<Box<PbTable>>,
}
3506
3507async fn update_connector_props_fragments<F>(
3508 txn: &DatabaseTransaction,
3509 job_ids: Vec<JobId>,
3510 expect_flag: FragmentTypeFlag,
3511 mut alter_stream_node_fn: F,
3512 is_shared_source: bool,
3513) -> MetaResult<()>
3514where
3515 F: FnMut(&mut PbNodeBody, &mut bool),
3516{
3517 let fragments: Vec<(FragmentId, StreamNode)> = Fragment::find()
3518 .select_only()
3519 .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
3520 .filter(
3521 fragment::Column::JobId
3522 .is_in(job_ids.clone())
3523 .and(FragmentTypeMask::intersects(expect_flag)),
3524 )
3525 .into_tuple()
3526 .all(txn)
3527 .await?;
3528 let fragments = fragments
3529 .into_iter()
3530 .filter_map(|(id, stream_node)| {
3531 let mut stream_node = stream_node.to_protobuf();
3532 let mut found = false;
3533 visit_stream_node_mut(&mut stream_node, |node| {
3534 alter_stream_node_fn(node, &mut found);
3535 });
3536 if found { Some((id, stream_node)) } else { None }
3537 })
3538 .collect_vec();
3539 if is_shared_source || job_ids.len() > 1 {
3540 assert!(
3544 !fragments.is_empty(),
3545 "job ids {:?} (type: {:?}) should be used by at least one fragment",
3546 job_ids,
3547 expect_flag
3548 );
3549 }
3550
3551 for (id, stream_node) in fragments {
3552 Fragment::update(fragment::ActiveModel {
3553 fragment_id: Set(id),
3554 stream_node: Set(StreamNode::from(&stream_node)),
3555 ..Default::default()
3556 })
3557 .exec(txn)
3558 .await?;
3559 }
3560
3561 Ok(())
3562}