use std::collections::{BTreeMap, HashMap, HashSet};
use std::num::NonZeroUsize;

use anyhow::anyhow;
use indexmap::IndexMap;
use itertools::Itertools;
use risingwave_common::catalog::{
    ColumnCatalog, FragmentTypeFlag, FragmentTypeMask, ICEBERG_SINK_PREFIX, ICEBERG_SOURCE_PREFIX,
    RISINGWAVE_ICEBERG_ROW_ID, ROW_ID_COLUMN_NAME, max_column_id,
};
use risingwave_common::config::DefaultParallelism;
use risingwave_common::hash::VnodeCountCompat;
use risingwave_common::id::JobId;
use risingwave_common::secret::LocalSecretManager;
use risingwave_common::system_param::AdaptiveParallelismStrategy;
use risingwave_common::system_param::adaptive_parallelism_strategy::parse_strategy;
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_common::util::stream_graph_visitor::{
    visit_stream_node_body, visit_stream_node_mut,
};
use risingwave_common::{bail, current_cluster_version};
use risingwave_connector::allow_alter_on_fly_fields::check_sink_allow_alter_on_fly_fields;
use risingwave_connector::connector_common::validate_connection;
use risingwave_connector::error::ConnectorError;
use risingwave_connector::sink::file_sink::fs::FsSink;
use risingwave_connector::sink::{CONNECTOR_TYPE_KEY, SinkError};
use risingwave_connector::source::{
    ConnectorProperties, UPSTREAM_SOURCE_KEY, pb_connection_type_to_connection_type,
};
use risingwave_connector::{WithOptionsSecResolved, WithPropertiesExt, match_sink_name_str};
use risingwave_meta_model::object::ObjectType;
use risingwave_meta_model::prelude::{StreamingJob as StreamingJobModel, *};
use risingwave_meta_model::refresh_job::RefreshState;
use risingwave_meta_model::streaming_job::BackfillOrders;
use risingwave_meta_model::table::TableType;
use risingwave_meta_model::user_privilege::Action;
use risingwave_meta_model::*;
use risingwave_pb::catalog::table::PbEngine;
use risingwave_pb::catalog::{PbConnection, PbCreateType, PbTable};
use risingwave_pb::ddl_service::streaming_job_resource_type;
use risingwave_pb::meta::alter_connector_props_request::AlterIcebergTableIds;
use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
use risingwave_pb::meta::object::PbObjectInfo;
use risingwave_pb::meta::subscribe_response::{
    Info as NotificationInfo, Info, Operation as NotificationOperation, Operation,
};
use risingwave_pb::meta::{ObjectDependency as PbObjectDependency, PbObject, PbObjectGroup};
use risingwave_pb::plan_common::PbColumnCatalog;
use risingwave_pb::plan_common::source_refresh_mode::{
    RefreshMode, SourceRefreshModeFullReload, SourceRefreshModeStreaming,
};
use risingwave_pb::secret::PbSecretRef;
use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use risingwave_pb::stream_plan::{PbSinkLogStoreType, PbStreamNode};
use risingwave_pb::user::PbUserInfo;
use risingwave_sqlparser::ast::{Engine, SqlOption, Statement};
use risingwave_sqlparser::parser::{Parser, ParserError};
use sea_orm::ActiveValue::Set;
use sea_orm::sea_query::{Expr, Query, SimpleExpr};
use sea_orm::{
    ActiveModelTrait, ColumnTrait, DatabaseTransaction, EntityTrait, IntoActiveModel, JoinType,
    NotSet, PaginatorTrait, QueryFilter, QuerySelect, RelationTrait, TransactionTrait,
};
use thiserror_ext::AsReport;

use super::rename::IndexItemRewriter;
use crate::barrier::Command;
use crate::controller::ObjectModel;
use crate::controller::catalog::{CatalogController, DropTableConnectorContext};
use crate::controller::fragment::FragmentTypeMaskExt;
use crate::controller::utils::{
    PartialObject, build_object_group_for_delete, check_if_belongs_to_iceberg_table,
    check_relation_name_duplicate, check_sink_into_table_cycle, ensure_job_not_canceled,
    ensure_object_id, ensure_user_id, fetch_target_fragments, get_internal_tables_by_id,
    get_table_columns, grant_default_privileges_automatically, insert_fragment_relations,
    list_object_dependencies_by_object_id, list_user_info_by_ids,
    try_get_iceberg_table_by_downstream_sink,
};
use crate::error::MetaErrorInner;
use crate::manager::{NotificationVersion, StreamingJob, StreamingJobType};
use crate::model::{
    FragmentDownstreamRelation, FragmentReplaceUpstream, StreamContext, StreamJobFragments,
    StreamJobFragmentsToCreate,
};
use crate::stream::SplitAssignment;
use crate::{MetaError, MetaResult};

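/// Pending update for the source fragments of jobs that depend on a source whose
/// connector properties are being altered: the affected job ids, the new `WITH`
/// properties, the resolved secret references, and whether the source is shared.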
#[derive(Debug)]
struct DependentSourceFragmentUpdate {
    job_ids: Vec<JobId>,
    with_properties: BTreeMap<String, String>,
    secret_refs: BTreeMap<String, PbSecretRef>,
    is_shared_source: bool,
}

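/// Settings snapshotted from the original streaming job, carried over to the
/// temporary job object that is created when replacing (altering) the job.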
#[derive(Debug)]
struct ReplaceOriginalJobInfo {
    max_parallelism: i32,
    timezone: Option<String>,
    config_override: Option<String>,
    adaptive_parallelism_strategy: Option<String>,
    parallelism: StreamingParallelism,
    specific_resource_group: Option<String>,
}

impl ReplaceOriginalJobInfo {
    fn resolved_parallelism(
        &self,
        specified_parallelism: Option<&NonZeroUsize>,
    ) -> StreamingParallelism {
        specified_parallelism
            .map(|n| StreamingParallelism::Fixed(n.get() as _))
            .unwrap_or_else(|| self.parallelism.clone())
    }

    fn stream_context(&self, ctx_override: Option<&StreamContext>) -> StreamContext {
        StreamContext {
            timezone: ctx_override
                .and_then(|ctx| ctx.timezone.clone())
                .or_else(|| self.timezone.clone()),
            config_override: self.config_override.clone().unwrap_or_default().into(),
        }
    }

    fn resource_type(&self) -> streaming_job_resource_type::ResourceType {
        match &self.specific_resource_group {
            Some(group) => {
                streaming_job_resource_type::ResourceType::SpecificResourceGroup(group.clone())
            }
            None => streaming_job_resource_type::ResourceType::Regular(true),
        }
    }
}

impl From<streaming_job::Model> for ReplaceOriginalJobInfo {
    fn from(model: streaming_job::Model) -> Self {
        Self {
            max_parallelism: model.max_parallelism,
            timezone: model.timezone,
            config_override: model.config_override,
            adaptive_parallelism_strategy: model.adaptive_parallelism_strategy,
            parallelism: model.parallelism,
            specific_resource_group: model.specific_resource_group,
        }
    }
}

impl CatalogController {
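    /// Creates the `object` row and the corresponding `streaming_job` row (in
    /// `Initial` status) for a new streaming job within the given transaction,
    /// and returns the persisted model.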
    #[expect(clippy::too_many_arguments)]
    pub async fn create_streaming_job_obj(
        txn: &DatabaseTransaction,
        obj_type: ObjectType,
        owner_id: UserId,
        database_id: Option<DatabaseId>,
        schema_id: Option<SchemaId>,
        create_type: PbCreateType,
        ctx: StreamContext,
        adaptive_parallelism_strategy: Option<AdaptiveParallelismStrategy>,
        streaming_parallelism: StreamingParallelism,
        max_parallelism: usize,
        resource_type: streaming_job_resource_type::ResourceType,
        backfill_parallelism: Option<StreamingParallelism>,
        backfill_adaptive_parallelism_strategy: Option<AdaptiveParallelismStrategy>,
    ) -> MetaResult<streaming_job::Model> {
        let obj = Self::create_object(txn, obj_type, owner_id, database_id, schema_id).await?;
        let job_id = obj.oid.as_job_id();
        let specific_resource_group = resource_type.resource_group();
        let is_serverless_backfill = matches!(
            &resource_type,
            streaming_job_resource_type::ResourceType::ServerlessBackfillResourceGroup(_)
        );
        let model = streaming_job::Model {
            job_id,
            job_status: JobStatus::Initial,
            create_type: create_type.into(),
            timezone: ctx.timezone,
            config_override: Some(ctx.config_override.to_string()),
            adaptive_parallelism_strategy: adaptive_parallelism_strategy
                .as_ref()
                .map(ToString::to_string),
            parallelism: streaming_parallelism,
            backfill_parallelism,
            backfill_adaptive_parallelism_strategy: backfill_adaptive_parallelism_strategy
                .as_ref()
                .map(ToString::to_string),
            backfill_orders: None,
            max_parallelism: max_parallelism as _,
            specific_resource_group,
            is_serverless_backfill,
        };
        let job = model.clone().into_active_model();
        StreamingJobModel::insert(job).exec(txn).await?;

        Ok(model)
    }

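    /// Creates the catalog entries for a new streaming job: the job object itself
    /// plus the relation row it produces (table, sink, index, or source). Resolves
    /// the streaming parallelism from the specified value or the cluster default,
    /// checks for name duplication, rejects the creation if a dependent table is
    /// still being altered, and records all object dependencies.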
    #[await_tree::instrument]
    pub async fn create_job_catalog(
        &self,
        streaming_job: &mut StreamingJob,
        ctx: &StreamContext,
        parallelism: &Option<Parallelism>,
        max_parallelism: usize,
        mut dependencies: HashSet<ObjectId>,
        resource_type: streaming_job_resource_type::ResourceType,
        backfill_parallelism: &Option<Parallelism>,
        adaptive_parallelism_strategy: Option<AdaptiveParallelismStrategy>,
        backfill_adaptive_parallelism_strategy: Option<AdaptiveParallelismStrategy>,
    ) -> MetaResult<streaming_job::Model> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;
        let create_type = streaming_job.create_type();

        let streaming_parallelism = match (parallelism, self.env.opts.default_parallelism) {
            (None, DefaultParallelism::Full) => StreamingParallelism::Adaptive,
            (None, DefaultParallelism::Default(n)) => StreamingParallelism::Fixed(n.get()),
            (Some(n), _) => StreamingParallelism::Fixed(n.parallelism as _),
        };
        let backfill_parallelism = backfill_parallelism
            .as_ref()
            .map(|p| StreamingParallelism::Fixed(p.parallelism as _))
            .or_else(|| {
                backfill_adaptive_parallelism_strategy
                    .as_ref()
                    .map(|_| StreamingParallelism::Adaptive)
            });

        ensure_user_id(streaming_job.owner() as _, &txn).await?;
        ensure_object_id(ObjectType::Database, streaming_job.database_id(), &txn).await?;
        ensure_object_id(ObjectType::Schema, streaming_job.schema_id(), &txn).await?;
        check_relation_name_duplicate(
            &streaming_job.name(),
            streaming_job.database_id(),
            streaming_job.schema_id(),
            &txn,
        )
        .await?;

        if !dependencies.is_empty() {
            let altering_cnt = ObjectDependency::find()
                .join(
                    JoinType::InnerJoin,
                    object_dependency::Relation::Object1.def(),
                )
                .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
                .filter(
                    object_dependency::Column::Oid
                        .is_in(dependencies.clone())
                        .and(object::Column::ObjType.eq(ObjectType::Table))
                        .and(streaming_job::Column::JobStatus.ne(JobStatus::Created))
                        .and(
                            object::Column::Oid.not_in_subquery(
                                Query::select()
                                    .column(table::Column::TableId)
                                    .from(Table)
                                    .to_owned(),
                            ),
                        ),
                )
                .count(&txn)
                .await?;
            if altering_cnt != 0 {
                return Err(MetaError::permission_denied(
                    "some dependent relations are being altered",
                ));
            }
        }

        let streaming_job_model = match streaming_job {
            StreamingJob::MaterializedView(table) => {
                let streaming_job_model = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Table,
                    table.owner as _,
                    Some(table.database_id),
                    Some(table.schema_id),
                    create_type,
                    ctx.clone(),
                    adaptive_parallelism_strategy,
                    streaming_parallelism,
                    max_parallelism,
                    resource_type,
                    backfill_parallelism.clone(),
                    backfill_adaptive_parallelism_strategy,
                )
                .await?;
                table.id = streaming_job_model.job_id.as_mv_table_id();
                let table_model: table::ActiveModel = table.clone().into();
                Table::insert(table_model).exec(&txn).await?;
                streaming_job_model
            }
            StreamingJob::Sink(sink) => {
                if let Some(target_table_id) = sink.target_table
                    && check_sink_into_table_cycle(
                        target_table_id.into(),
                        dependencies.iter().cloned().collect(),
                        &txn,
                    )
                    .await?
                {
                    bail!("Creating such a sink will result in circular dependency.");
                }

                let streaming_job_model = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Sink,
                    sink.owner as _,
                    Some(sink.database_id),
                    Some(sink.schema_id),
                    create_type,
                    ctx.clone(),
                    adaptive_parallelism_strategy,
                    streaming_parallelism,
                    max_parallelism,
                    resource_type,
                    backfill_parallelism.clone(),
                    backfill_adaptive_parallelism_strategy,
                )
                .await?;
                sink.id = streaming_job_model.job_id.as_sink_id();
                let sink_model: sink::ActiveModel = sink.clone().into();
                Sink::insert(sink_model).exec(&txn).await?;
                streaming_job_model
            }
            StreamingJob::Table(src, table, _) => {
                let streaming_job_model = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Table,
                    table.owner as _,
                    Some(table.database_id),
                    Some(table.schema_id),
                    create_type,
                    ctx.clone(),
                    adaptive_parallelism_strategy,
                    streaming_parallelism,
                    max_parallelism,
                    resource_type,
                    backfill_parallelism.clone(),
                    backfill_adaptive_parallelism_strategy,
                )
                .await?;
                let job_id = streaming_job_model.job_id;
                table.id = job_id.as_mv_table_id();
                if let Some(src) = src {
                    let src_obj = Self::create_object(
                        &txn,
                        ObjectType::Source,
                        src.owner as _,
                        Some(src.database_id),
                        Some(src.schema_id),
                    )
                    .await?;
                    src.id = src_obj.oid.as_source_id();
                    src.optional_associated_table_id = Some(job_id.as_mv_table_id().into());
                    table.optional_associated_source_id = Some(src_obj.oid.as_source_id().into());
                    let source: source::ActiveModel = src.clone().into();
                    Source::insert(source).exec(&txn).await?;
                }
                let table_model: table::ActiveModel = table.clone().into();
                Table::insert(table_model).exec(&txn).await?;

                if table.refreshable {
                    let trigger_interval_secs = src
                        .as_ref()
                        .and_then(|source_catalog| source_catalog.refresh_mode)
                        .and_then(
                            |source_refresh_mode| match source_refresh_mode.refresh_mode {
                                Some(RefreshMode::FullReload(SourceRefreshModeFullReload {
                                    refresh_interval_sec,
                                })) => refresh_interval_sec,
                                Some(RefreshMode::Streaming(SourceRefreshModeStreaming {})) => None,
                                None => None,
                            },
                        );

                    RefreshJob::insert(refresh_job::ActiveModel {
                        table_id: Set(table.id),
                        last_trigger_time: Set(None),
                        trigger_interval_secs: Set(trigger_interval_secs),
                        current_status: Set(RefreshState::Idle),
                        last_success_time: Set(None),
                    })
                    .exec(&txn)
                    .await?;
                }
                streaming_job_model
            }
            StreamingJob::Index(index, table) => {
                ensure_object_id(ObjectType::Table, index.primary_table_id, &txn).await?;
                let streaming_job_model = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Index,
                    index.owner as _,
                    Some(index.database_id),
                    Some(index.schema_id),
                    create_type,
                    ctx.clone(),
                    adaptive_parallelism_strategy,
                    streaming_parallelism,
                    max_parallelism,
                    resource_type,
                    backfill_parallelism.clone(),
                    backfill_adaptive_parallelism_strategy,
                )
                .await?;
                let job_id = streaming_job_model.job_id;
                index.id = job_id.as_index_id();
                index.index_table_id = job_id.as_mv_table_id();
                table.id = job_id.as_mv_table_id();

                ObjectDependency::insert(object_dependency::ActiveModel {
                    oid: Set(index.primary_table_id.into()),
                    used_by: Set(table.id.into()),
                    ..Default::default()
                })
                .exec(&txn)
                .await?;

                let table_model: table::ActiveModel = table.clone().into();
                Table::insert(table_model).exec(&txn).await?;
                let index_model: index::ActiveModel = index.clone().into();
                Index::insert(index_model).exec(&txn).await?;
                streaming_job_model
            }
            StreamingJob::Source(src) => {
                let streaming_job_model = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Source,
                    src.owner as _,
                    Some(src.database_id),
                    Some(src.schema_id),
                    create_type,
                    ctx.clone(),
                    adaptive_parallelism_strategy,
                    streaming_parallelism,
                    max_parallelism,
                    resource_type,
                    backfill_parallelism.clone(),
                    backfill_adaptive_parallelism_strategy,
                )
                .await?;
                src.id = streaming_job_model.job_id.as_shared_source_id();
                let source_model: source::ActiveModel = src.clone().into();
                Source::insert(source_model).exec(&txn).await?;
                streaming_job_model
            }
        };

        dependencies.extend(
            streaming_job
                .dependent_secret_ids()?
                .into_iter()
                .map(|id| id.as_object_id()),
        );
        dependencies.extend(
            streaming_job
                .dependent_connection_ids()?
                .into_iter()
                .map(|id| id.as_object_id()),
        );

        if !dependencies.is_empty() {
            ObjectDependency::insert_many(dependencies.into_iter().map(|oid| {
                object_dependency::ActiveModel {
                    oid: Set(oid),
                    used_by: Set(streaming_job.id().as_object_id()),
                    ..Default::default()
                }
            }))
            .exec(&txn)
            .await?;
        }

        txn.commit().await?;

        Ok(streaming_job_model)
    }

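    /// Allocates real catalog ids for the internal (state) tables of a streaming
    /// job and persists them, returning the mapping from the placeholder ids in
    /// the plan to the newly assigned table ids.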
    pub async fn create_internal_table_catalog(
        &self,
        job: &StreamingJob,
        mut incomplete_internal_tables: Vec<PbTable>,
    ) -> MetaResult<HashMap<TableId, TableId>> {
        let job_id = job.id();
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        ensure_job_not_canceled(job_id, &txn).await?;

        let mut table_id_map = HashMap::new();
        for table in &mut incomplete_internal_tables {
            let table_id = Self::create_object(
                &txn,
                ObjectType::Table,
                table.owner as _,
                Some(table.database_id),
                Some(table.schema_id),
            )
            .await?
            .oid
            .as_table_id();
            table_id_map.insert(table.id, table_id);
            table.id = table_id;
            table.job_id = Some(job_id);

            let table_model = table::ActiveModel {
                table_id: Set(table_id),
                belongs_to_job_id: Set(Some(job_id)),
                fragment_id: NotSet,
                ..table.clone().into()
            };
            Table::insert(table_model).exec(&txn).await?;
        }
        txn.commit().await?;

        Ok(table_id_map)
    }

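    /// Persists the fragments of a streaming job to create; a thin wrapper around
    /// [`Self::prepare_streaming_job`].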
    pub async fn prepare_stream_job_fragments(
        &self,
        stream_job_fragments: &StreamJobFragmentsToCreate,
        streaming_job: &StreamingJob,
        for_replace: bool,
        backfill_orders: Option<BackfillOrders>,
    ) -> MetaResult<()> {
        self.prepare_streaming_job(
            stream_job_fragments.stream_job_id(),
            || stream_job_fragments.fragments.values(),
            &stream_job_fragments.downstreams,
            for_replace,
            Some(streaming_job),
            backfill_orders,
        )
        .await
    }

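    /// Inserts the fragment rows and fragment relations of a streaming job, binds
    /// each state table to its fragment, and, for jobs that should be visible
    /// while still creating, notifies the frontend of the new objects. When
    /// `for_replace` is true the job belongs to a replacement, so table catalogs
    /// are left untouched.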
    #[await_tree::instrument(
        "prepare_streaming_job_for_{}",
        if for_replace { "replace" } else { "create" }
    )]
    pub async fn prepare_streaming_job<'a, I: Iterator<Item = &'a crate::model::Fragment> + 'a>(
        &self,
        job_id: JobId,
        get_fragments: impl Fn() -> I + 'a,
        downstreams: &FragmentDownstreamRelation,
        for_replace: bool,
        creating_streaming_job: Option<&'a StreamingJob>,
        backfill_orders: Option<BackfillOrders>,
    ) -> MetaResult<()> {
        let fragments = Self::prepare_fragment_models_from_fragments(job_id, get_fragments())?;

        let inner = self.inner.write().await;

        let need_notify = creating_streaming_job
            .map(|job| job.should_notify_creating())
            .unwrap_or(false);
        let definition = creating_streaming_job.map(|job| job.definition());

        let mut objects_to_notify = vec![];
        let txn = inner.db.begin().await?;

        ensure_job_not_canceled(job_id, &txn).await?;

        if let Some(backfill_orders) = backfill_orders {
            let job = streaming_job::ActiveModel {
                job_id: Set(job_id),
                backfill_orders: Set(Some(backfill_orders)),
                ..Default::default()
            };
            StreamingJobModel::update(job).exec(&txn).await?;
        }

        let state_table_ids = fragments
            .iter()
            .flat_map(|fragment| fragment.state_table_ids.inner_ref().clone())
            .collect_vec();

        let inserted_fragment_ids: Vec<crate::model::FragmentId> = fragments
            .iter()
            .map(|f| f.fragment_id as crate::model::FragmentId)
            .collect();

        if !fragments.is_empty() {
            let fragment_models = fragments
                .into_iter()
                .map(|fragment| fragment.into_active_model())
                .collect_vec();
            Fragment::insert_many(fragment_models).exec(&txn).await?;
        }

        if !for_replace {
            let all_tables = StreamJobFragments::collect_tables(get_fragments());
            for state_table_id in state_table_ids {
                let table = all_tables
                    .get(&state_table_id)
                    .unwrap_or_else(|| panic!("table {} not found", state_table_id));
                assert_eq!(table.id, state_table_id);
                let vnode_count = table.vnode_count();

                Table::update(table::ActiveModel {
                    table_id: Set(state_table_id as _),
                    fragment_id: Set(Some(table.fragment_id)),
                    vnode_count: Set(vnode_count as _),
                    ..Default::default()
                })
                .exec(&txn)
                .await?;

                if need_notify {
                    let mut table = table.clone();
                    if cfg!(not(debug_assertions)) && table.id == job_id.as_raw_id() {
                        table.definition = definition.clone().unwrap();
                    }
                    objects_to_notify.push(PbObject {
                        object_info: Some(PbObjectInfo::Table(table)),
                    });
                }
            }
        }

        if need_notify {
            match creating_streaming_job.unwrap() {
                StreamingJob::Table(Some(source), ..) => {
                    objects_to_notify.push(PbObject {
                        object_info: Some(PbObjectInfo::Source(source.clone())),
                    });
                }
                StreamingJob::Sink(sink) => {
                    objects_to_notify.push(PbObject {
                        object_info: Some(PbObjectInfo::Sink(sink.clone())),
                    });
                }
                StreamingJob::Index(index, _) => {
                    objects_to_notify.push(PbObject {
                        object_info: Some(PbObjectInfo::Index(index.clone())),
                    });
                }
                _ => {}
            }
        }

        insert_fragment_relations(&txn, downstreams).await?;

        if !for_replace {
            if let Some(StreamingJob::Table(_, table, _)) = creating_streaming_job {
                Table::update(table::ActiveModel {
                    table_id: Set(table.id),
                    dml_fragment_id: Set(table.dml_fragment_id),
                    ..Default::default()
                })
                .exec(&txn)
                .await?;
            }
        }

        txn.commit().await?;

        self.env
            .notification_manager()
            .notify_serving_fragment_mapping_update(inserted_fragment_ids);

        if !objects_to_notify.is_empty() {
            self.notify_frontend(
                Operation::Add,
                Info::ObjectGroup(PbObjectGroup {
                    objects: objects_to_notify,
                    dependencies: vec![],
                }),
            )
            .await;
        }

        Ok(())
    }

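    /// Builds the barrier [`Command`] that drops a streaming job being cancelled,
    /// including the sink-to-target fragment mapping if the job is a sink into a
    /// table.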
    pub async fn build_cancel_command(
        &self,
        table_fragments: &StreamJobFragments,
    ) -> MetaResult<Command> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let dropped_sink_fragment_with_target =
            if let Some(sink_fragment) = table_fragments.sink_fragment() {
                let sink_fragment_id = sink_fragment.fragment_id as FragmentId;
                let sink_target_fragment = fetch_target_fragments(&txn, [sink_fragment_id]).await?;
                sink_target_fragment
                    .get(&sink_fragment_id)
                    .map(|target_fragments| {
                        let target_fragment_id = *target_fragments
                            .first()
                            .expect("sink should have at least one downstream fragment");
                        (sink_fragment_id, target_fragment_id)
                    })
            } else {
                None
            };

        Ok(Command::DropStreamingJobs {
            streaming_job_ids: HashSet::from_iter([table_fragments.stream_job_id()]),
            unregistered_state_table_ids: table_fragments.all_table_ids().collect(),
            dropped_sink_fragment_by_targets: dropped_sink_fragment_with_target
                .into_iter()
                .map(|(sink, target)| (target as _, vec![sink as _]))
                .collect(),
        })
    }

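    /// Tries to abort a creating streaming job and clean up its catalog entries.
    /// Returns whether the job was actually aborted (a background job still in
    /// creating status is kept unless cancelled) together with its database id.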
    #[await_tree::instrument]
    pub async fn try_abort_creating_streaming_job(
        &self,
        mut job_id: JobId,
        is_cancelled: bool,
    ) -> MetaResult<(bool, Option<DatabaseId>)> {
        let mut inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        let obj = Object::find_by_id(job_id).one(&txn).await?;
        let Some(obj) = obj else {
            tracing::warn!(
                id = %job_id,
                "streaming job not found when aborting creating, might be cancelled already or cleaned by recovery"
            );
            return Ok((true, None));
        };
        let database_id = obj
            .database_id
            .ok_or_else(|| anyhow!("obj has no database id: {:?}", obj))?;
        let streaming_job = streaming_job::Entity::find_by_id(job_id).one(&txn).await?;

        if !is_cancelled && let Some(streaming_job) = &streaming_job {
            assert_ne!(streaming_job.job_status, JobStatus::Created);
            if streaming_job.create_type == CreateType::Background
                && streaming_job.job_status == JobStatus::Creating
            {
                if (obj.obj_type == ObjectType::Table || obj.obj_type == ObjectType::Sink)
                    && check_if_belongs_to_iceberg_table(&txn, job_id).await?
                {
                    // Fall through to abort: background jobs that belong to an
                    // iceberg table are cleaned up here as well.
                } else {
                    tracing::warn!(
                        id = %job_id,
                        "streaming job is created in background and still in creating status"
                    );
                    return Ok((false, Some(database_id)));
                }
            }
        }

        let original_job_id = job_id;
        let original_obj_type = obj.obj_type;

        let iceberg_table_id =
            try_get_iceberg_table_by_downstream_sink(&txn, job_id.as_sink_id()).await?;
        if let Some(iceberg_table_id) = iceberg_table_id {
            let internal_tables = get_internal_tables_by_id(job_id, &txn).await?;
            Object::delete_many()
                .filter(
                    object::Column::Oid
                        .eq(job_id)
                        .or(object::Column::Oid.is_in(internal_tables)),
                )
                .exec(&txn)
                .await?;
            job_id = iceberg_table_id.as_job_id();
        };

        let internal_table_ids = get_internal_tables_by_id(job_id, &txn).await?;

        let mut objs = vec![];
        let table_obj = Table::find_by_id(job_id.as_mv_table_id()).one(&txn).await?;

        let mut need_notify =
            streaming_job.is_some_and(|job| job.create_type == CreateType::Background);
        if !need_notify {
            if let Some(table) = &table_obj {
                need_notify = table.table_type == TableType::MaterializedView;
            } else if original_obj_type == ObjectType::Sink {
                need_notify = true;
            }
        }

        if is_cancelled {
            let dropped_tables = Table::find()
                .find_also_related(Object)
                .filter(
                    table::Column::TableId.is_in(
                        internal_table_ids
                            .iter()
                            .cloned()
                            .chain(table_obj.iter().map(|t| t.table_id as _)),
                    ),
                )
                .all(&txn)
                .await?
                .into_iter()
                .map(|(table, obj)| PbTable::from(ObjectModel(table, obj.unwrap(), None)));
            inner
                .dropped_tables
                .extend(dropped_tables.map(|t| (t.id, t)));
        }

        if need_notify {
            if original_obj_type == ObjectType::Sink && original_job_id != job_id {
                let orig_obj: Option<PartialObject> = Object::find_by_id(original_job_id)
                    .select_only()
                    .columns([
                        object::Column::Oid,
                        object::Column::ObjType,
                        object::Column::SchemaId,
                        object::Column::DatabaseId,
                    ])
                    .into_partial_model()
                    .one(&txn)
                    .await?;
                if let Some(orig_obj) = orig_obj {
                    objs.push(orig_obj);
                }
            }

            let obj: Option<PartialObject> = Object::find_by_id(job_id)
                .select_only()
                .columns([
                    object::Column::Oid,
                    object::Column::ObjType,
                    object::Column::SchemaId,
                    object::Column::DatabaseId,
                ])
                .into_partial_model()
                .one(&txn)
                .await?;
            let obj =
                obj.ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;
            objs.push(obj);
            let internal_table_objs: Vec<PartialObject> = Object::find()
                .select_only()
                .columns([
                    object::Column::Oid,
                    object::Column::ObjType,
                    object::Column::SchemaId,
                    object::Column::DatabaseId,
                ])
                .join(JoinType::InnerJoin, object::Relation::Table.def())
                .filter(table::Column::BelongsToJobId.eq(job_id))
                .into_partial_model()
                .all(&txn)
                .await?;
            objs.extend(internal_table_objs);
        }

        let abort_fragment_ids: Vec<FragmentId> = Fragment::find()
            .select_only()
            .column(fragment::Column::FragmentId)
            .filter(fragment::Column::JobId.eq(job_id))
            .into_tuple()
            .all(&txn)
            .await?;

        Object::delete_by_id(job_id).exec(&txn).await?;
        if !internal_table_ids.is_empty() {
            Object::delete_many()
                .filter(object::Column::Oid.is_in(internal_table_ids))
                .exec(&txn)
                .await?;
        }
        if let Some(t) = &table_obj
            && let Some(source_id) = t.optional_associated_source_id
        {
            Object::delete_by_id(source_id).exec(&txn).await?;
        }

        let err = if is_cancelled {
            MetaError::cancelled(format!("streaming job {job_id} is cancelled"))
        } else {
            MetaError::catalog_id_not_found("stream job", format!("streaming job {job_id} failed"))
        };
        let abort_reason = format!("streaming job aborted {}", err.as_report());
        for tx in inner
            .creating_table_finish_notifier
            .get_mut(&database_id)
            .map(|creating_tables| creating_tables.remove(&job_id).into_iter())
            .into_iter()
            .flatten()
            .flatten()
        {
            let _ = tx.send(Err(abort_reason.clone()));
        }
        txn.commit().await?;

        self.env
            .notification_manager()
            .notify_serving_fragment_mapping_delete(
                abort_fragment_ids.iter().map(|id| *id as _).collect(),
            );

        if !objs.is_empty() {
            self.notify_frontend(Operation::Delete, build_object_group_for_delete(objs))
                .await;
        }
        Ok((true, Some(database_id)))
    }

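    /// Called after the job's first barrier is collected: inserts newly
    /// established fragment relations, moves the job to `Creating` status, and
    /// records the initial split assignment if one is provided.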
    #[await_tree::instrument]
    pub async fn post_collect_job_fragments(
        &self,
        job_id: JobId,
        upstream_fragment_new_downstreams: &FragmentDownstreamRelation,
        new_sink_downstream: Option<FragmentDownstreamRelation>,
        split_assignment: Option<&SplitAssignment>,
    ) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        insert_fragment_relations(&txn, upstream_fragment_new_downstreams).await?;

        if let Some(new_downstream) = new_sink_downstream {
            insert_fragment_relations(&txn, &new_downstream).await?;
        }

        StreamingJobModel::update(streaming_job::ActiveModel {
            job_id: Set(job_id),
            job_status: Set(JobStatus::Creating),
            ..Default::default()
        })
        .exec(&txn)
        .await?;

        if let Some(split_assignment) = split_assignment {
            let fragment_splits = split_assignment
                .iter()
                .map(|(fragment_id, splits)| {
                    (
                        *fragment_id as _,
                        splits.values().flatten().cloned().collect_vec(),
                    )
                })
                .collect();

            self.update_fragment_splits(&txn, &fragment_splits).await?;
        }

        txn.commit().await?;

        Ok(())
    }

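    /// Creates the temporary job object used while replacing an existing
    /// streaming job, inheriting parallelism, stream context, and resource
    /// settings from the original job. Fails if the job is referenced by a
    /// creating job or the expected max parallelism does not match.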
    pub async fn create_job_catalog_for_replace(
        &self,
        streaming_job: &StreamingJob,
        ctx: Option<&StreamContext>,
        specified_parallelism: Option<&NonZeroUsize>,
        expected_original_max_parallelism: Option<usize>,
    ) -> MetaResult<streaming_job::Model> {
        let id = streaming_job.id();
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        streaming_job.verify_version_for_replace(&txn).await?;

        let referring_cnt = ObjectDependency::find()
            .join(
                JoinType::InnerJoin,
                object_dependency::Relation::Object1.def(),
            )
            .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
            .filter(
                object_dependency::Column::Oid
                    .eq(id)
                    .and(object::Column::ObjType.eq(ObjectType::Table))
                    .and(streaming_job::Column::JobStatus.ne(JobStatus::Created)),
            )
            .count(&txn)
            .await?;
        if referring_cnt != 0 {
            return Err(MetaError::permission_denied(
                "job is being altered or referenced by some creating jobs",
            ));
        }

        let original_job = StreamingJobModel::find_by_id(id)
            .one(&txn)
            .await?
            .map(ReplaceOriginalJobInfo::from)
            .ok_or_else(|| MetaError::catalog_id_not_found(streaming_job.job_type_str(), id))?;

        if let Some(max_parallelism) = expected_original_max_parallelism
            && original_job.max_parallelism != max_parallelism as i32
        {
            bail!(
                "cannot use a different max parallelism \
                 when replacing streaming job, \
                 original: {}, new: {}",
                original_job.max_parallelism,
                max_parallelism
            );
        }

        let parallelism = original_job.resolved_parallelism(specified_parallelism);
        let ctx = original_job.stream_context(ctx);
        let adaptive_parallelism_strategy = original_job
            .adaptive_parallelism_strategy
            .as_deref()
            .map(|s| parse_strategy(s).expect("strategy should be validated before persisting"));
        let resource_type = original_job.resource_type();

        let tmp_model = Self::create_streaming_job_obj(
            &txn,
            streaming_job.object_type(),
            streaming_job.owner() as _,
            Some(streaming_job.database_id() as _),
            Some(streaming_job.schema_id() as _),
            streaming_job.create_type(),
            ctx,
            adaptive_parallelism_strategy,
            parallelism,
            original_job.max_parallelism as _,
            resource_type,
            None,
            None,
        )
        .await?;

        ObjectDependency::insert(object_dependency::ActiveModel {
            oid: Set(id.as_object_id()),
            used_by: Set(tmp_model.job_id.as_object_id()),
            ..Default::default()
        })
        .exec(&txn)
        .await?;

        txn.commit().await?;

        Ok(tmp_model)
    }

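    /// Marks a streaming job as `Created`, notifies the frontend of the finished
    /// objects, and wakes up all waiters. Jobs that belong to an iceberg table
    /// are skipped here and finished by a separate manual operation.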
    pub async fn finish_streaming_job(&self, job_id: JobId) -> MetaResult<()> {
        let mut inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        if check_if_belongs_to_iceberg_table(&txn, job_id).await? {
            tracing::info!(
                "streaming job {} is for iceberg table, wait for manual finish operation",
                job_id
            );
            return Ok(());
        }

        let (notification_op, objects, updated_user_info, dependencies) =
            self.finish_streaming_job_inner(&txn, job_id).await?;

        txn.commit().await?;

        let mut version = self
            .notify_frontend(
                notification_op,
                NotificationInfo::ObjectGroup(PbObjectGroup {
                    objects,
                    dependencies,
                }),
            )
            .await;

        if !updated_user_info.is_empty() {
            version = self.notify_users_update(updated_user_info).await;
        }

        inner
            .creating_table_finish_notifier
            .values_mut()
            .for_each(|creating_tables| {
                if let Some(txs) = creating_tables.remove(&job_id) {
                    for tx in txs {
                        let _ = tx.send(Ok(version));
                    }
                }
            });

        Ok(())
    }

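    /// Transactional part of [`Self::finish_streaming_job`]: stamps the creation
    /// time, flips the job status to `Created`, grants privileges where needed,
    /// and collects the objects, user info, and dependencies to notify.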
    pub async fn finish_streaming_job_inner(
        &self,
        txn: &DatabaseTransaction,
        job_id: JobId,
    ) -> MetaResult<(
        Operation,
        Vec<risingwave_pb::meta::Object>,
        Vec<PbUserInfo>,
        Vec<PbObjectDependency>,
    )> {
        let job_type = Object::find_by_id(job_id)
            .select_only()
            .column(object::Column::ObjType)
            .into_tuple()
            .one(txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;

        let create_type: CreateType = StreamingJobModel::find_by_id(job_id)
            .select_only()
            .column(streaming_job::Column::CreateType)
            .into_tuple()
            .one(txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;

        let res = Object::update_many()
            .col_expr(object::Column::CreatedAt, Expr::current_timestamp().into())
            .col_expr(
                object::Column::CreatedAtClusterVersion,
                current_cluster_version().into(),
            )
            .filter(object::Column::Oid.eq(job_id))
            .exec(txn)
            .await?;
        if res.rows_affected == 0 {
            return Err(MetaError::catalog_id_not_found("streaming job", job_id));
        }

        let job = streaming_job::ActiveModel {
            job_id: Set(job_id),
            job_status: Set(JobStatus::Created),
            ..Default::default()
        };
        let streaming_job = Some(job.update(txn).await?);

        let internal_table_objs = Table::find()
            .find_also_related(Object)
            .filter(table::Column::BelongsToJobId.eq(job_id))
            .all(txn)
            .await?;
        let mut objects = internal_table_objs
            .iter()
            .map(|(table, obj)| PbObject {
                object_info: Some(PbObjectInfo::Table(
                    ObjectModel(table.clone(), obj.clone().unwrap(), streaming_job.clone()).into(),
                )),
            })
            .collect_vec();
        let mut notification_op = if create_type == CreateType::Background {
            NotificationOperation::Update
        } else {
            NotificationOperation::Add
        };
        let mut updated_user_info = vec![];
        let mut need_grant_default_privileges = true;

        match job_type {
            ObjectType::Table => {
                let (table, obj) = Table::find_by_id(job_id.as_mv_table_id())
                    .find_also_related(Object)
                    .one(txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("table", job_id))?;
                if table.table_type == TableType::MaterializedView {
                    notification_op = NotificationOperation::Update;
                }

                if let Some(source_id) = table.optional_associated_source_id {
                    let (src, obj) = Source::find_by_id(source_id)
                        .find_also_related(Object)
                        .one(txn)
                        .await?
                        .ok_or_else(|| MetaError::catalog_id_not_found("source", source_id))?;
                    objects.push(PbObject {
                        object_info: Some(PbObjectInfo::Source(
                            ObjectModel(src, obj.unwrap(), None).into(),
                        )),
                    });
                }
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Table(
                        ObjectModel(table, obj.unwrap(), streaming_job).into(),
                    )),
                });
            }
            ObjectType::Sink => {
                let (sink, obj) = Sink::find_by_id(job_id.as_sink_id())
                    .find_also_related(Object)
                    .one(txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("sink", job_id))?;
                if sink.name.starts_with(ICEBERG_SINK_PREFIX) {
                    need_grant_default_privileges = false;
                }
                notification_op = NotificationOperation::Update;
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Sink(
                        ObjectModel(sink, obj.unwrap(), streaming_job).into(),
                    )),
                });
            }
            ObjectType::Index => {
                need_grant_default_privileges = false;
                let (index, obj) = Index::find_by_id(job_id.as_index_id())
                    .find_also_related(Object)
                    .one(txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("index", job_id))?;
                {
                    let (table, obj) = Table::find_by_id(index.index_table_id)
                        .find_also_related(Object)
                        .one(txn)
                        .await?
                        .ok_or_else(|| {
                            MetaError::catalog_id_not_found("table", index.index_table_id)
                        })?;
                    objects.push(PbObject {
                        object_info: Some(PbObjectInfo::Table(
                            ObjectModel(table, obj.unwrap(), streaming_job.clone()).into(),
                        )),
                    });
                }

                let primary_table_privileges = UserPrivilege::find()
                    .filter(
                        user_privilege::Column::Oid
                            .eq(index.primary_table_id)
                            .and(user_privilege::Column::Action.eq(Action::Select)),
                    )
                    .all(txn)
                    .await?;
                if !primary_table_privileges.is_empty() {
                    let index_state_table_ids: Vec<TableId> = Table::find()
                        .select_only()
                        .column(table::Column::TableId)
                        .filter(
                            table::Column::BelongsToJobId
                                .eq(job_id)
                                .or(table::Column::TableId.eq(index.index_table_id)),
                        )
                        .into_tuple()
                        .all(txn)
                        .await?;
                    let mut new_privileges = vec![];
                    for privilege in &primary_table_privileges {
                        for state_table_id in &index_state_table_ids {
                            new_privileges.push(user_privilege::ActiveModel {
                                id: Default::default(),
                                oid: Set(state_table_id.as_object_id()),
                                user_id: Set(privilege.user_id),
                                action: Set(Action::Select),
                                dependent_id: Set(privilege.dependent_id),
                                granted_by: Set(privilege.granted_by),
                                with_grant_option: Set(privilege.with_grant_option),
                            });
                        }
                    }
                    UserPrivilege::insert_many(new_privileges).exec(txn).await?;

                    updated_user_info = list_user_info_by_ids(
                        primary_table_privileges.into_iter().map(|p| p.user_id),
                        txn,
                    )
                    .await?;
                }

                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Index(
                        ObjectModel(index, obj.unwrap(), streaming_job).into(),
                    )),
                });
            }
            ObjectType::Source => {
                let (source, obj) = Source::find_by_id(job_id.as_shared_source_id())
                    .find_also_related(Object)
                    .one(txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("source", job_id))?;
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Source(
                        ObjectModel(source, obj.unwrap(), None).into(),
                    )),
                });
            }
            _ => unreachable!("invalid job type: {:?}", job_type),
        }

        if need_grant_default_privileges {
            updated_user_info = grant_default_privileges_automatically(txn, job_id).await?;
        }

        let dependencies =
            list_object_dependencies_by_object_id(txn, job_id.as_object_id()).await?;

        Ok((notification_op, objects, updated_user_info, dependencies))
    }

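    /// Finalizes a replacement: swaps the new fragments in for the old ones,
    /// updates the affected catalogs, and sends the corresponding notifications
    /// to the frontend.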
    pub async fn finish_replace_streaming_job(
        &self,
        tmp_id: JobId,
        streaming_job: StreamingJob,
        replace_upstream: FragmentReplaceUpstream,
        sink_into_table_context: SinkIntoTableContext,
        drop_table_connector_ctx: Option<&DropTableConnectorContext>,
        auto_refresh_schema_sinks: Option<Vec<FinishAutoRefreshSchemaSinkContext>>,
    ) -> MetaResult<NotificationVersion> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        let (objects, delete_notification_objs, old_fragment_ids, new_fragment_ids) =
            Self::finish_replace_streaming_job_inner(
                tmp_id,
                replace_upstream,
                sink_into_table_context,
                &txn,
                streaming_job,
                drop_table_connector_ctx,
                auto_refresh_schema_sinks,
            )
            .await?;

        txn.commit().await?;

        let notification_manager = self.env.notification_manager();
        notification_manager.notify_serving_fragment_mapping_delete(
            old_fragment_ids.iter().map(|id| *id as _).collect(),
        );
        notification_manager.notify_serving_fragment_mapping_update(
            new_fragment_ids.iter().map(|id| *id as _).collect(),
        );

        let mut version = self
            .notify_frontend(
                NotificationOperation::Update,
                NotificationInfo::ObjectGroup(PbObjectGroup {
                    objects,
                    dependencies: vec![],
                }),
            )
            .await;

        if let Some((user_infos, to_drop_objects)) = delete_notification_objs {
            self.notify_users_update(user_infos).await;
            version = self
                .notify_frontend(
                    NotificationOperation::Delete,
                    build_object_group_for_delete(to_drop_objects),
                )
                .await;
        }

        Ok(version)
    }

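    /// Recomputes the column list of an iceberg source after its backing table's
    /// schema changed: existing columns are kept by name, new columns get fresh
    /// column ids, iceberg hidden columns are preserved, and the row id column is
    /// re-located in the new list.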
    fn update_iceberg_source_columns(
        original_source_columns: &[ColumnCatalog],
        original_row_id_index: Option<usize>,
        new_table_columns: &[ColumnCatalog],
    ) -> (Vec<ColumnCatalog>, Option<usize>) {
        let row_id_column_name = original_row_id_index
            .and_then(|idx| original_source_columns.get(idx))
            .map(|col| col.name().to_owned());
        let mut next_column_id = max_column_id(original_source_columns).next();
        let existing_columns: HashMap<String, ColumnCatalog> = original_source_columns
            .iter()
            .cloned()
            .map(|col| (col.name().to_owned(), col))
            .collect();

        let mut new_columns = Vec::new();
        for table_col in new_table_columns
            .iter()
            .filter(|col| !col.is_rw_sys_column())
        {
            let mut source_col_name = table_col.name().to_owned();
            if source_col_name == ROW_ID_COLUMN_NAME {
                source_col_name = RISINGWAVE_ICEBERG_ROW_ID.to_owned();
            }

            if let Some(existing) = existing_columns.get(&source_col_name) {
                new_columns.push(existing.clone());
            } else {
                let mut new_col = table_col.clone();
                new_col.column_desc.name = source_col_name;
                new_col.column_desc.column_id = next_column_id;
                next_column_id = next_column_id.next();
                new_columns.push(new_col);
            }
        }

        let mut seen_names: HashSet<String> = new_columns
            .iter()
            .map(|col| col.name().to_owned())
            .collect();
        for col in original_source_columns
            .iter()
            .filter(|col| col.is_iceberg_hidden_column())
        {
            if seen_names.insert(col.name().to_owned()) {
                new_columns.push(col.clone());
            }
        }

        let new_row_id_index = row_id_column_name
            .as_ref()
            .and_then(|name| new_columns.iter().position(|col| col.name() == name));

        (new_columns, new_row_id_index)
    }

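    /// Transactional part of [`Self::finish_replace_streaming_job`]: re-binds
    /// state tables and fragments from the temporary job to the original one,
    /// rewrites `Merge` upstreams, refreshes index items and dependent sink
    /// catalogs, and returns the objects to notify along with the old and new
    /// fragment ids.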
    pub async fn finish_replace_streaming_job_inner(
        tmp_id: JobId,
        replace_upstream: FragmentReplaceUpstream,
        SinkIntoTableContext {
            updated_sink_catalogs,
        }: SinkIntoTableContext,
        txn: &DatabaseTransaction,
        streaming_job: StreamingJob,
        drop_table_connector_ctx: Option<&DropTableConnectorContext>,
        auto_refresh_schema_sinks: Option<Vec<FinishAutoRefreshSchemaSinkContext>>,
    ) -> MetaResult<(
        Vec<PbObject>,
        Option<(Vec<PbUserInfo>, Vec<PartialObject>)>,
        Vec<FragmentId>,
        Vec<FragmentId>,
    )> {
        let original_job_id = streaming_job.id();
        let job_type = streaming_job.job_type();

        let old_fragment_ids: Vec<FragmentId> = Fragment::find()
            .select_only()
            .column(fragment::Column::FragmentId)
            .filter(fragment::Column::JobId.eq(original_job_id))
            .into_tuple()
            .all(txn)
            .await?;
        let new_fragment_ids: Vec<FragmentId> = Fragment::find()
            .select_only()
            .column(fragment::Column::FragmentId)
            .filter(fragment::Column::JobId.eq(tmp_id))
            .into_tuple()
            .all(txn)
            .await?;

        let mut index_item_rewriter = None;
        let mut updated_iceberg_source_id: Option<SourceId> = None;

        match streaming_job {
            StreamingJob::Table(_, table, _table_job_type) => {
                let original_column_catalogs =
                    get_table_columns(txn, original_job_id.as_mv_table_id()).await?;
                let schema_changed = original_column_catalogs.to_protobuf() != table.columns;
                let is_iceberg = table
                    .engine
                    .and_then(|engine| PbEngine::try_from(engine).ok())
                    == Some(PbEngine::Iceberg);
                if is_iceberg && schema_changed {
                    let iceberg_source_name = format!("{}{}", ICEBERG_SOURCE_PREFIX, table.name);
                    let source = Source::find()
                        .inner_join(Object)
                        .filter(
                            object::Column::DatabaseId
                                .eq(table.database_id)
                                .and(object::Column::SchemaId.eq(table.schema_id))
                                .and(source::Column::Name.eq(&iceberg_source_name)),
                        )
                        .one(txn)
                        .await?
                        .ok_or_else(|| {
                            MetaError::catalog_id_not_found("source", iceberg_source_name)
                        })?;

                    let source_id = source.source_id;
                    let source_version = source.version;
                    let source_columns: Vec<ColumnCatalog> = source
                        .columns
                        .to_protobuf()
                        .into_iter()
                        .map(ColumnCatalog::from)
                        .collect();
                    let source_row_id_index = source
                        .row_id_index
                        .and_then(|idx| usize::try_from(idx).ok());
                    let table_columns: Vec<ColumnCatalog> = table
                        .columns
                        .iter()
                        .cloned()
                        .map(ColumnCatalog::from)
                        .collect();
                    let (updated_columns, updated_row_id_index) =
                        Self::update_iceberg_source_columns(
                            &source_columns,
                            source_row_id_index,
                            &table_columns,
                        );
                    let updated_columns: Vec<PbColumnCatalog> = updated_columns
                        .into_iter()
                        .map(|col| col.to_protobuf())
                        .collect();

                    let mut source_active = source.into_active_model();
                    source_active.columns = Set(ColumnCatalogArray::from(updated_columns));
                    source_active.row_id_index = Set(updated_row_id_index.map(|idx| idx as i32));
                    source_active.version = Set(source_version + 1);
                    source_active.update(txn).await?;
                    updated_iceberg_source_id = Some(source_id);
                }

                index_item_rewriter = Some({
                    let original_columns = original_column_catalogs
                        .to_protobuf()
                        .into_iter()
                        .map(|c| c.column_desc.unwrap())
                        .collect_vec();
                    let new_columns = table
                        .columns
                        .iter()
                        .map(|c| c.column_desc.clone().unwrap())
                        .collect_vec();

                    IndexItemRewriter {
                        original_columns,
                        new_columns,
                    }
                });

                for sink_id in updated_sink_catalogs {
                    Sink::update(sink::ActiveModel {
                        sink_id: Set(sink_id as _),
                        original_target_columns: Set(Some(original_column_catalogs.clone())),
                        ..Default::default()
                    })
                    .exec(txn)
                    .await?;
                }
                let mut table = table::ActiveModel::from(table);
                if let Some(drop_table_connector_ctx) = drop_table_connector_ctx
                    && drop_table_connector_ctx.to_change_streaming_job_id == original_job_id
                {
                    table.optional_associated_source_id = Set(None);
                }

                Table::update(table).exec(txn).await?;
            }
            StreamingJob::Source(source) => {
                let source = source::ActiveModel::from(source);
                Source::update(source).exec(txn).await?;
            }
            StreamingJob::MaterializedView(table) => {
                let table = table::ActiveModel::from(table);
                Table::update(table).exec(txn).await?;
            }
            _ => unreachable!(
                "invalid streaming job type: {:?}",
                streaming_job.job_type_str()
            ),
        }

        async fn finish_fragments(
            txn: &DatabaseTransaction,
            tmp_id: JobId,
            original_job_id: JobId,
            replace_upstream: FragmentReplaceUpstream,
        ) -> MetaResult<()> {
            let fragment_info: Vec<(FragmentId, I32Array)> = Fragment::find()
                .select_only()
                .columns([
                    fragment::Column::FragmentId,
                    fragment::Column::StateTableIds,
                ])
                .filter(fragment::Column::JobId.eq(tmp_id))
                .into_tuple()
                .all(txn)
                .await?;
            for (fragment_id, state_table_ids) in fragment_info {
                for state_table_id in state_table_ids.into_inner() {
                    let state_table_id = TableId::new(state_table_id as _);
                    Table::update(table::ActiveModel {
                        table_id: Set(state_table_id),
                        fragment_id: Set(Some(fragment_id)),
                        ..Default::default()
                    })
                    .exec(txn)
                    .await?;
                }
            }

            Fragment::delete_many()
                .filter(fragment::Column::JobId.eq(original_job_id))
                .exec(txn)
                .await?;
            Fragment::update_many()
                .col_expr(fragment::Column::JobId, SimpleExpr::from(original_job_id))
                .filter(fragment::Column::JobId.eq(tmp_id))
                .exec(txn)
                .await?;

            for (fragment_id, fragment_replace_map) in replace_upstream {
                let (fragment_id, mut stream_node) =
                    Fragment::find_by_id(fragment_id as FragmentId)
                        .select_only()
                        .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
                        .into_tuple::<(FragmentId, StreamNode)>()
                        .one(txn)
                        .await?
                        .map(|(id, node)| (id, node.to_protobuf()))
                        .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;

                visit_stream_node_mut(&mut stream_node, |body| {
                    if let PbNodeBody::Merge(m) = body
                        && let Some(new_fragment_id) =
                            fragment_replace_map.get(&m.upstream_fragment_id)
                    {
                        m.upstream_fragment_id = *new_fragment_id;
                    }
                });
                Fragment::update(fragment::ActiveModel {
                    fragment_id: Set(fragment_id),
                    stream_node: Set(StreamNode::from(&stream_node)),
                    ..Default::default()
                })
                .exec(txn)
                .await?;
            }

            Object::delete_by_id(tmp_id).exec(txn).await?;

            Ok(())
        }

        finish_fragments(txn, tmp_id, original_job_id, replace_upstream).await?;

        let mut objects = vec![];
        match job_type {
            StreamingJobType::Table(_) | StreamingJobType::MaterializedView => {
                let (table, table_obj) = Table::find_by_id(original_job_id.as_mv_table_id())
                    .find_also_related(Object)
                    .one(txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("object", original_job_id))?;
                let streaming_job = streaming_job::Entity::find_by_id(table.job_id())
                    .one(txn)
                    .await?;
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Table(
                        ObjectModel(table, table_obj.unwrap(), streaming_job).into(),
                    )),
                })
            }
            StreamingJobType::Source => {
                let (source, source_obj) =
                    Source::find_by_id(original_job_id.as_shared_source_id())
                        .find_also_related(Object)
                        .one(txn)
                        .await?
                        .ok_or_else(|| {
                            MetaError::catalog_id_not_found("object", original_job_id)
                        })?;
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Source(
                        ObjectModel(source, source_obj.unwrap(), None).into(),
                    )),
                })
            }
            _ => unreachable!("invalid streaming job type for replace: {:?}", job_type),
        }

        if let Some(source_id) = updated_iceberg_source_id {
            let (source, source_obj) = Source::find_by_id(source_id)
                .find_also_related(Object)
                .one(txn)
                .await?
                .ok_or_else(|| MetaError::catalog_id_not_found("source", source_id))?;
            objects.push(PbObject {
                object_info: Some(PbObjectInfo::Source(
                    ObjectModel(source, source_obj.unwrap(), None).into(),
                )),
            });
        }

        if let Some(expr_rewriter) = index_item_rewriter {
            let index_items: Vec<(IndexId, ExprNodeArray)> = Index::find()
                .select_only()
                .columns([index::Column::IndexId, index::Column::IndexItems])
                .filter(index::Column::PrimaryTableId.eq(original_job_id))
                .into_tuple()
                .all(txn)
                .await?;
            for (index_id, nodes) in index_items {
                let mut pb_nodes = nodes.to_protobuf();
                pb_nodes
                    .iter_mut()
                    .for_each(|x| expr_rewriter.rewrite_expr(x));
                let index = index::ActiveModel {
                    index_id: Set(index_id),
                    index_items: Set(pb_nodes.into()),
                    ..Default::default()
                }
                .update(txn)
                .await?;
                let (index_obj, streaming_job) = Object::find_by_id(index.index_id)
                    .find_also_related(streaming_job::Entity)
                    .one(txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("object", index.index_id))?;
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Index(
                        ObjectModel(index, index_obj, streaming_job).into(),
                    )),
                });
            }
        }

        if let Some(sinks) = auto_refresh_schema_sinks {
            for finish_sink_context in sinks {
                finish_fragments(
                    txn,
                    finish_sink_context.tmp_sink_id.as_job_id(),
                    finish_sink_context.original_sink_id.as_job_id(),
                    Default::default(),
                )
                .await?;
                let (mut sink, sink_obj) = Sink::find_by_id(finish_sink_context.original_sink_id)
                    .find_also_related(Object)
                    .one(txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("sink", original_job_id))?;
                let sink_streaming_job =
                    streaming_job::Entity::find_by_id(sink.sink_id.as_job_id())
                        .one(txn)
                        .await?;
                let columns = ColumnCatalogArray::from(finish_sink_context.columns);
                Sink::update(sink::ActiveModel {
                    sink_id: Set(finish_sink_context.original_sink_id),
                    columns: Set(columns.clone()),
                    ..Default::default()
                })
                .exec(txn)
                .await?;
                sink.columns = columns;
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Sink(
                        ObjectModel(sink, sink_obj.unwrap(), sink_streaming_job.clone()).into(),
                    )),
                });
                if let Some(new_log_store_table) = finish_sink_context.new_log_store_table {
                    let log_store_table_id = new_log_store_table.id;
                    let new_log_store_table_columns: ColumnCatalogArray =
                        new_log_store_table.columns.clone().into();
                    let new_log_store_table_value_indices =
                        new_log_store_table.value_indices.clone();
                    let (mut table, table_obj) = Table::find_by_id(log_store_table_id)
                        .find_also_related(Object)
                        .one(txn)
                        .await?
                        .ok_or_else(|| MetaError::catalog_id_not_found("table", original_job_id))?;
                    Table::update(table::ActiveModel {
                        table_id: Set(log_store_table_id),
                        columns: Set(new_log_store_table_columns.clone()),
                        value_indices: Set(new_log_store_table_value_indices.clone().into()),
                        ..Default::default()
                    })
                    .exec(txn)
                    .await?;
                    table.columns = new_log_store_table_columns;
                    table.value_indices = new_log_store_table_value_indices.into();
                    objects.push(PbObject {
                        object_info: Some(PbObjectInfo::Table(
                            ObjectModel(table, table_obj.unwrap(), sink_streaming_job.clone())
                                .into(),
                        )),
                    });
                }
            }
        }

        let mut notification_objs: Option<(Vec<PbUserInfo>, Vec<PartialObject>)> = None;
        if let Some(drop_table_connector_ctx) = drop_table_connector_ctx {
            notification_objs =
                Some(Self::drop_table_associated_source(txn, drop_table_connector_ctx).await?);
        }

        Ok((
            objects,
            notification_objs,
            old_fragment_ids,
            new_fragment_ids,
        ))
    }

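    /// Aborts a replacement by deleting the temporary job object (and any
    /// temporary sinks) and withdrawing their fragments from the serving
    /// mapping.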
    pub async fn try_abort_replacing_streaming_job(
        &self,
        tmp_job_id: JobId,
        tmp_sink_ids: Option<Vec<ObjectId>>,
    ) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        let mut all_job_ids: Vec<ObjectId> = vec![tmp_job_id.into()];
        if let Some(ref sink_ids) = tmp_sink_ids {
            all_job_ids.extend(sink_ids.iter().copied());
        }
        let abort_fragment_ids: Vec<FragmentId> = Fragment::find()
            .select_only()
            .column(fragment::Column::FragmentId)
            .filter(fragment::Column::JobId.is_in(all_job_ids))
            .into_tuple()
            .all(&txn)
            .await?;

        Object::delete_by_id(tmp_job_id).exec(&txn).await?;
        if let Some(tmp_sink_ids) = tmp_sink_ids {
            for tmp_sink_id in tmp_sink_ids {
                Object::delete_by_id(tmp_sink_id).exec(&txn).await?;
            }
        }
        txn.commit().await?;

        self.env
            .notification_manager()
            .notify_serving_fragment_mapping_delete(
                abort_fragment_ids.iter().map(|id| *id as _).collect(),
            );

        Ok(())
    }

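    /// Applies a new rate limit to the source with `source_id`, persisting it both on the
    /// source catalog and on every `Source` (and, for fs connectors, `StreamFsFetch`) node
    /// in the fragments of the jobs that consume this source. Returns the affected job and
    /// fragment ids so the caller can drive the runtime mutation.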
    pub async fn update_source_rate_limit_by_source_id(
        &self,
        source_id: SourceId,
        rate_limit: Option<u32>,
    ) -> MetaResult<(HashSet<JobId>, HashSet<FragmentId>)> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        {
            let active_source = source::ActiveModel {
                source_id: Set(source_id),
                rate_limit: Set(rate_limit.map(|v| v as i32)),
                ..Default::default()
            };
            Source::update(active_source).exec(&txn).await?;
        }

        let (source, obj) = Source::find_by_id(source_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| {
                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
            })?;

        let is_fs_source = source.with_properties.inner_ref().is_new_fs_connector();
        let streaming_job_ids: Vec<JobId> =
            if let Some(table_id) = source.optional_associated_table_id {
                vec![table_id.as_job_id()]
            } else if let Some(source_info) = &source.source_info
                && source_info.to_protobuf().is_shared()
            {
                vec![source_id.as_share_source_job_id()]
            } else {
                ObjectDependency::find()
                    .select_only()
                    .column(object_dependency::Column::UsedBy)
                    .filter(object_dependency::Column::Oid.eq(source_id))
                    .into_tuple()
                    .all(&txn)
                    .await?
            };

        if streaming_job_ids.is_empty() {
            return Err(MetaError::invalid_parameter(format!(
                "source id {source_id} not used by any streaming job"
            )));
        }

        let fragments: Vec<(FragmentId, JobId, i32, StreamNode)> = Fragment::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::JobId,
                fragment::Column::FragmentTypeMask,
                fragment::Column::StreamNode,
            ])
            .filter(fragment::Column::JobId.is_in(streaming_job_ids))
            .into_tuple()
            .all(&txn)
            .await?;
        let mut fragments = fragments
            .into_iter()
            .map(|(id, job_id, mask, stream_node)| {
                (
                    id,
                    job_id,
                    FragmentTypeMask::from(mask as u32),
                    stream_node.to_protobuf(),
                )
            })
            .collect_vec();

        fragments.retain_mut(|(_, _, fragment_type_mask, stream_node)| {
            let mut found = false;
            if fragment_type_mask.contains(FragmentTypeFlag::Source) {
                visit_stream_node_mut(stream_node, |node| {
                    if let PbNodeBody::Source(node) = node
                        && let Some(node_inner) = &mut node.source_inner
                        && node_inner.source_id == source_id
                    {
                        node_inner.rate_limit = rate_limit;
                        found = true;
                    }
                });
            }
            if is_fs_source {
                visit_stream_node_mut(stream_node, |node| {
                    if let PbNodeBody::StreamFsFetch(node) = node {
                        fragment_type_mask.add(FragmentTypeFlag::FsFetch);
                        if let Some(node_inner) = &mut node.node_inner
                            && node_inner.source_id == source_id
                        {
                            node_inner.rate_limit = rate_limit;
                            found = true;
                        }
                    }
                });
            }
            found
        });

        assert!(
            !fragments.is_empty(),
            "source id should be used by at least one fragment"
        );

        let (fragment_ids, job_ids) = fragments
            .iter()
            .map(|(fragment_id, job_id, _, _)| (*fragment_id, *job_id))
            .unzip();

        for (fragment_id, _, fragment_type_mask, stream_node) in fragments {
            Fragment::update(fragment::ActiveModel {
                fragment_id: Set(fragment_id),
                fragment_type_mask: Set(fragment_type_mask.into()),
                stream_node: Set(StreamNode::from(&stream_node)),
                ..Default::default()
            })
            .exec(&txn)
            .await?;
        }

        txn.commit().await?;

        let relation_info = PbObjectInfo::Source(ObjectModel(source, obj.unwrap(), None).into());
        let _version = self
            .notify_frontend(
                NotificationOperation::Update,
                NotificationInfo::ObjectGroup(PbObjectGroup {
                    objects: vec![PbObject {
                        object_info: Some(relation_info),
                    }],
                    dependencies: vec![],
                }),
            )
            .await;

        Ok((job_ids, fragment_ids))
    }

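    /// Generic helper: loads all fragments of `job_id`, lets `fragments_mutation_fn` mutate
    /// each stream node (returning whether the fragment was touched), persists the touched
    /// fragments, and returns their ids. Errors with `err_msg` if nothing matched.
    ///
    /// Sketch of a caller-side closure (the node variant is only illustrative):
    ///
    /// ```ignore
    /// let touched = controller
    ///     .mutate_fragments_by_job_id(
    ///         job_id,
    ///         |_mask, stream_node| {
    ///             let mut found = false;
    ///             visit_stream_node_mut(stream_node, |node| {
    ///                 if let PbNodeBody::Dml(node) = node {
    ///                     node.rate_limit = Some(1000);
    ///                     found = true;
    ///                 }
    ///             });
    ///             Ok(found)
    ///         },
    ///         "dml node not found",
    ///     )
    ///     .await?;
    /// ```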
    pub async fn mutate_fragments_by_job_id(
        &self,
        job_id: JobId,
        mut fragments_mutation_fn: impl FnMut(FragmentTypeMask, &mut PbStreamNode) -> MetaResult<bool>,
        err_msg: &'static str,
    ) -> MetaResult<HashSet<FragmentId>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::FragmentTypeMask,
                fragment::Column::StreamNode,
            ])
            .filter(fragment::Column::JobId.eq(job_id))
            .into_tuple()
            .all(&txn)
            .await?;
        let mut fragments = fragments
            .into_iter()
            .map(|(id, mask, stream_node)| {
                (id, FragmentTypeMask::from(mask), stream_node.to_protobuf())
            })
            .collect_vec();

        let fragments = fragments
            .iter_mut()
            .map(|(_, fragment_type_mask, stream_node)| {
                fragments_mutation_fn(*fragment_type_mask, stream_node)
            })
            .collect::<MetaResult<Vec<bool>>>()?
            .into_iter()
            .zip_eq_debug(std::mem::take(&mut fragments))
            .filter_map(|(keep, fragment)| if keep { Some(fragment) } else { None })
            .collect::<Vec<_>>();

        if fragments.is_empty() {
            return Err(MetaError::invalid_parameter(format!(
                "job id {job_id}: {}",
                err_msg
            )));
        }

        let fragment_ids: HashSet<FragmentId> = fragments.iter().map(|(id, _, _)| *id).collect();
        for (id, _, stream_node) in fragments {
            Fragment::update(fragment::ActiveModel {
                fragment_id: Set(id),
                stream_node: Set(StreamNode::from(&stream_node)),
                ..Default::default()
            })
            .exec(&txn)
            .await?;
        }

        txn.commit().await?;

        Ok(fragment_ids)
    }

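    /// Single-fragment variant of [`Self::mutate_fragments_by_job_id`]: applies
    /// `fragment_mutation_fn` to the stream node of `fragment_id` and persists the result,
    /// or fails with `err_msg` if the closure reports no match.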
    async fn mutate_fragment_by_fragment_id(
        &self,
        fragment_id: FragmentId,
        mut fragment_mutation_fn: impl FnMut(FragmentTypeMask, &mut PbStreamNode) -> bool,
        err_msg: &'static str,
    ) -> MetaResult<()> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let (fragment_type_mask, stream_node): (i32, StreamNode) =
            Fragment::find_by_id(fragment_id)
                .select_only()
                .columns([
                    fragment::Column::FragmentTypeMask,
                    fragment::Column::StreamNode,
                ])
                .into_tuple()
                .one(&txn)
                .await?
                .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
        let mut pb_stream_node = stream_node.to_protobuf();
        let fragment_type_mask = FragmentTypeMask::from(fragment_type_mask);

        if !fragment_mutation_fn(fragment_type_mask, &mut pb_stream_node) {
            return Err(MetaError::invalid_parameter(format!(
                "fragment id {fragment_id}: {}",
                err_msg
            )));
        }

        // Persist the mutated stream node (not the original one read from the table),
        // otherwise the mutation above would be silently dropped.
        Fragment::update(fragment::ActiveModel {
            fragment_id: Set(fragment_id),
            stream_node: Set(StreamNode::from(&pb_stream_node)),
            ..Default::default()
        })
        .exec(&txn)
        .await?;

        txn.commit().await?;

        Ok(())
    }

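    /// Persists the (possibly cleared) backfill orders of a streaming job, refusing to
    /// touch jobs that have already been canceled.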
    pub async fn update_backfill_orders_by_job_id(
        &self,
        job_id: JobId,
        backfill_orders: Option<BackfillOrders>,
    ) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        ensure_job_not_canceled(job_id, &txn).await?;

        streaming_job::ActiveModel {
            job_id: Set(job_id),
            backfill_orders: Set(backfill_orders),
            ..Default::default()
        }
        .update(&txn)
        .await?;

        txn.commit().await?;

        Ok(())
    }

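    /// Updates the backfill rate limit on all backfill-capable nodes (`StreamScan`,
    /// `StreamCdcScan`, `SourceBackfill`) of the given job.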
    pub async fn update_backfill_rate_limit_by_job_id(
        &self,
        job_id: JobId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashSet<FragmentId>> {
        let update_backfill_rate_limit =
            |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
                let mut found = false;
                if fragment_type_mask
                    .contains_any(FragmentTypeFlag::backfill_rate_limit_fragments())
                {
                    visit_stream_node_mut(stream_node, |node| match node {
                        PbNodeBody::StreamCdcScan(node) => {
                            node.rate_limit = rate_limit;
                            found = true;
                        }
                        PbNodeBody::StreamScan(node) => {
                            node.rate_limit = rate_limit;
                            found = true;
                        }
                        PbNodeBody::SourceBackfill(node) => {
                            node.rate_limit = rate_limit;
                            found = true;
                        }
                        _ => {}
                    });
                }
                Ok(found)
            };

        self.mutate_fragments_by_job_id(
            job_id,
            update_backfill_rate_limit,
            "stream scan node or source node not found",
        )
        .await
    }

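    /// Updates the sink rate limit of the given sink job. Only decoupled sinks (kv log
    /// store) support a rate limit, so other log store types are rejected.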
    pub async fn update_sink_rate_limit_by_job_id(
        &self,
        sink_id: SinkId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashSet<FragmentId>> {
        let update_sink_rate_limit =
            |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
                let mut found = Ok(false);
                if fragment_type_mask.contains_any(FragmentTypeFlag::sink_rate_limit_fragments()) {
                    visit_stream_node_mut(stream_node, |node| {
                        if let PbNodeBody::Sink(node) = node {
                            if node.log_store_type != PbSinkLogStoreType::KvLogStore as i32 {
                                found = Err(MetaError::invalid_parameter(
                                    "sink rate limit is only supported for kv log store, please SET sink_decouple = TRUE before CREATE SINK",
                                ));
                                return;
                            }
                            node.rate_limit = rate_limit;
                            found = Ok(true);
                        }
                    });
                }
                found
            };

        self.mutate_fragments_by_job_id(
            sink_id.as_job_id(),
            update_sink_rate_limit,
            "sink node not found",
        )
        .await
    }

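    /// Updates the DML rate limit on all `Dml` nodes of the given job.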
    pub async fn update_dml_rate_limit_by_job_id(
        &self,
        job_id: JobId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashSet<FragmentId>> {
        let update_dml_rate_limit =
            |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
                let mut found = false;
                if fragment_type_mask.contains_any(FragmentTypeFlag::dml_rate_limit_fragments()) {
                    visit_stream_node_mut(stream_node, |node| {
                        if let PbNodeBody::Dml(node) = node {
                            node.rate_limit = rate_limit;
                            found = true;
                        }
                    });
                }
                Ok(found)
            };

        self.mutate_fragments_by_job_id(job_id, update_dml_rate_limit, "dml node not found")
            .await
    }

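    /// Alters the WITH properties / secret refs of a source in place: validates the new
    /// props, rewrites the stored `CREATE SOURCE`/`CREATE TABLE` definition, adjusts secret
    /// dependencies, and pushes the new properties into every source fragment. Returns the
    /// resolved options so the caller can propagate them to running actors.
    ///
    /// A minimal call sketch, assuming a hypothetical source whose connector allows this
    /// key to be altered on the fly and with no secret changes:
    ///
    /// ```ignore
    /// let resolved = controller
    ///     .update_source_props_by_source_id(
    ///         source_id,
    ///         BTreeMap::from([("properties.sync.call.timeout".into(), "5s".into())]),
    ///         BTreeMap::new(),
    ///         false, // keep the allow-alter-on-fly check
    ///     )
    ///     .await?;
    /// ```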
    pub async fn update_source_props_by_source_id(
        &self,
        source_id: SourceId,
        alter_props: BTreeMap<String, String>,
        alter_secret_refs: BTreeMap<String, PbSecretRef>,
        skip_alter_on_fly_check: bool,
    ) -> MetaResult<WithOptionsSecResolved> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let (source, _obj) = Source::find_by_id(source_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| {
                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
            })?;
        let connector = source.with_properties.0.get_connector().unwrap();
        let is_shared_source = source.is_shared();

        let mut dep_source_job_ids: Vec<JobId> = Vec::new();
        if !is_shared_source {
            dep_source_job_ids = ObjectDependency::find()
                .select_only()
                .column(object_dependency::Column::UsedBy)
                .filter(object_dependency::Column::Oid.eq(source_id))
                .into_tuple()
                .all(&txn)
                .await?;
        }

        if let Some(new_connector) = alter_props.get(UPSTREAM_SOURCE_KEY)
            && new_connector != &connector
        {
            return Err(MetaError::invalid_parameter(format!(
                "Cannot change connector type from '{}' to '{}'. Drop and recreate the source instead.",
                connector, new_connector
            )));
        }

        if !skip_alter_on_fly_check {
            let prop_keys: Vec<String> = alter_props
                .keys()
                .chain(alter_secret_refs.keys())
                .cloned()
                .collect();
            risingwave_connector::allow_alter_on_fly_fields::check_source_allow_alter_on_fly_fields(
                &connector, &prop_keys,
            )?;
        }

        let mut options_with_secret = WithOptionsSecResolved::new(
            source.with_properties.0.clone(),
            source
                .secret_ref
                .map(|secret_ref| secret_ref.to_protobuf())
                .unwrap_or_default(),
        );
        let (to_add_secret_dep, to_remove_secret_dep) =
            options_with_secret.handle_update(alter_props, alter_secret_refs)?;

        tracing::info!(
            "applying new properties to source: source_id={}, options_with_secret={:?}",
            source_id,
            options_with_secret
        );
        let _ = ConnectorProperties::extract(options_with_secret.clone(), true)?;

        let mut associate_table_id = None;
        let mut preferred_id = source_id.as_object_id();
        let rewrite_sql = {
            let definition = source.definition.clone();

            let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
                .map_err(|e| {
                    MetaError::from(MetaErrorInner::Connector(ConnectorError::from(
                        anyhow!(e).context("Failed to parse source definition SQL"),
                    )))
                })?
                .try_into()
                .unwrap();

            async fn format_with_option_secret_resolved(
                txn: &DatabaseTransaction,
                options_with_secret: &WithOptionsSecResolved,
            ) -> MetaResult<Vec<SqlOption>> {
                let mut options = Vec::new();
                for (k, v) in options_with_secret.as_plaintext() {
                    let sql_option = SqlOption::try_from((k, &format!("'{}'", v)))
                        .map_err(|e| MetaError::invalid_parameter(e.to_report_string()))?;
                    options.push(sql_option);
                }
                for (k, v) in options_with_secret.as_secret() {
                    if let Some(secret_model) = Secret::find_by_id(v.secret_id).one(txn).await? {
                        let sql_option =
                            SqlOption::try_from((k, &format!("SECRET {}", secret_model.name)))
                                .map_err(|e| MetaError::invalid_parameter(e.to_report_string()))?;
                        options.push(sql_option);
                    } else {
                        return Err(MetaError::catalog_id_not_found("secret", v.secret_id));
                    }
                }
                Ok(options)
            }

            match &mut stmt {
                Statement::CreateSource { stmt } => {
                    stmt.with_properties.0 =
                        format_with_option_secret_resolved(&txn, &options_with_secret).await?;
                }
                Statement::CreateTable { with_options, .. } => {
                    *with_options =
                        format_with_option_secret_resolved(&txn, &options_with_secret).await?;
                    associate_table_id = source.optional_associated_table_id;
                    preferred_id = associate_table_id.unwrap().as_object_id();
                }
                _ => unreachable!(),
            }

            stmt.to_string()
        };

        {
            if !to_add_secret_dep.is_empty() {
                ObjectDependency::insert_many(to_add_secret_dep.into_iter().map(|secret_id| {
                    object_dependency::ActiveModel {
                        oid: Set(secret_id.into()),
                        used_by: Set(preferred_id),
                        ..Default::default()
                    }
                }))
                .exec(&txn)
                .await?;
            }
            if !to_remove_secret_dep.is_empty() {
                let _ = ObjectDependency::delete_many()
                    .filter(
                        object_dependency::Column::Oid
                            .is_in(to_remove_secret_dep)
                            .and(object_dependency::Column::UsedBy.eq(preferred_id)),
                    )
                    .exec(&txn)
                    .await?;
            }
        }

        let active_source_model = source::ActiveModel {
            source_id: Set(source_id),
            definition: Set(rewrite_sql.clone()),
            with_properties: Set(options_with_secret.as_plaintext().clone().into()),
            secret_ref: Set((!options_with_secret.as_secret().is_empty())
                .then(|| SecretRef::from(options_with_secret.as_secret().clone()))),
            ..Default::default()
        };
        Source::update(active_source_model).exec(&txn).await?;

        if let Some(associate_table_id) = associate_table_id {
            let active_table_model = table::ActiveModel {
                table_id: Set(associate_table_id),
                definition: Set(rewrite_sql),
                ..Default::default()
            };
            Table::update(active_table_model).exec(&txn).await?;
        }

        let to_check_job_ids = vec![if let Some(associate_table_id) = associate_table_id {
            associate_table_id.as_job_id()
        } else {
            source_id.as_share_source_job_id()
        }]
        .into_iter()
        .chain(dep_source_job_ids.into_iter())
        .collect_vec();

        update_connector_props_fragments(
            &txn,
            to_check_job_ids,
            FragmentTypeFlag::Source,
            |node, found| {
                if let PbNodeBody::Source(node) = node
                    && let Some(source_inner) = &mut node.source_inner
                {
                    source_inner.with_properties = options_with_secret.as_plaintext().clone();
                    source_inner.secret_refs = options_with_secret.as_secret().clone();
                    *found = true;
                }
            },
            is_shared_source,
        )
        .await?;

        let mut to_update_objs = Vec::with_capacity(2);
        let (source, obj) = Source::find_by_id(source_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| {
                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
            })?;
        to_update_objs.push(PbObject {
            object_info: Some(PbObjectInfo::Source(
                ObjectModel(source, obj.unwrap(), None).into(),
            )),
        });

        if let Some(associate_table_id) = associate_table_id {
            let (table, obj) = Table::find_by_id(associate_table_id)
                .find_also_related(Object)
                .one(&txn)
                .await?
                .ok_or_else(|| MetaError::catalog_id_not_found("table", associate_table_id))?;
            let streaming_job = streaming_job::Entity::find_by_id(table.job_id())
                .one(&txn)
                .await?;
            to_update_objs.push(PbObject {
                object_info: Some(PbObjectInfo::Table(
                    ObjectModel(table, obj.unwrap(), streaming_job).into(),
                )),
            });
        }

        txn.commit().await?;

        self.notify_frontend(
            NotificationOperation::Update,
            NotificationInfo::ObjectGroup(PbObjectGroup {
                objects: to_update_objs,
                dependencies: vec![],
            }),
        )
        .await;

        Ok(options_with_secret)
    }

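    /// Alters the WITH properties of a sink: validates them against the connector's
    /// allow-alter-on-fly list, rewrites the stored `CREATE SINK` definition, and pushes
    /// the merged config into the sink fragments. Returns the applied props.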
    pub async fn update_sink_props_by_sink_id(
        &self,
        sink_id: SinkId,
        props: BTreeMap<String, String>,
    ) -> MetaResult<HashMap<String, String>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let (sink, _obj) = Sink::find_by_id(sink_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
        validate_sink_props(&sink, &props)?;
        let definition = sink.definition.clone();
        let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
            .map_err(|e| SinkError::Config(anyhow!(e)))?
            .try_into()
            .unwrap();
        if let Statement::CreateSink { stmt } = &mut stmt {
            update_stmt_with_props(&mut stmt.with_properties.0, &props)?;
        } else {
            panic!("definition is not a create sink statement")
        }
        let mut new_config = sink.properties.clone().into_inner();
        new_config.extend(props.clone());

        let definition = stmt.to_string();
        let active_sink = sink::ActiveModel {
            sink_id: Set(sink_id),
            properties: Set(risingwave_meta_model::Property(new_config.clone())),
            definition: Set(definition),
            ..Default::default()
        };
        Sink::update(active_sink).exec(&txn).await?;

        update_sink_fragment_props(&txn, sink_id, new_config).await?;
        let (sink, obj) = Sink::find_by_id(sink_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
        let streaming_job = streaming_job::Entity::find_by_id(sink.sink_id.as_job_id())
            .one(&txn)
            .await?;
        txn.commit().await?;
        let relation_infos = vec![PbObject {
            object_info: Some(PbObjectInfo::Sink(
                ObjectModel(sink, obj.unwrap(), streaming_job).into(),
            )),
        }];

        let _version = self
            .notify_frontend(
                NotificationOperation::Update,
                NotificationInfo::ObjectGroup(PbObjectGroup {
                    objects: relation_infos,
                    dependencies: vec![],
                }),
            )
            .await;

        Ok(props.into_iter().collect())
    }

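    /// Variant of [`Self::update_sink_props_by_sink_id`] for iceberg-engine tables, where
    /// the table, its hidden sink, and its hidden source share one definition: all three
    /// catalogs are rewritten together. The sink and source ids must be provided via
    /// `PbExtraOptions::AlterIcebergTableIds`.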
    pub async fn update_iceberg_table_props_by_table_id(
        &self,
        table_id: TableId,
        props: BTreeMap<String, String>,
        alter_iceberg_table_props: Option<
            risingwave_pb::meta::alter_connector_props_request::PbExtraOptions,
        >,
    ) -> MetaResult<(HashMap<String, String>, SinkId)> {
        let risingwave_pb::meta::alter_connector_props_request::PbExtraOptions::AlterIcebergTableIds(
            AlterIcebergTableIds { sink_id, source_id },
        ) = alter_iceberg_table_props.ok_or_else(|| {
            MetaError::invalid_parameter("alter_iceberg_table_props is required")
        })?;
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let (sink, _obj) = Sink::find_by_id(sink_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
        validate_sink_props(&sink, &props)?;

        let definition = sink.definition.clone();
        let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
            .map_err(|e| SinkError::Config(anyhow!(e)))?
            .try_into()
            .unwrap();
        if let Statement::CreateTable {
            with_options,
            engine,
            ..
        } = &mut stmt
        {
            if !matches!(engine, Engine::Iceberg) {
                return Err(SinkError::Config(anyhow!(
                    "only iceberg table can be altered as sink"
                ))
                .into());
            }
            update_stmt_with_props(with_options, &props)?;
        } else {
            panic!("definition is not a create iceberg table statement")
        }
        let mut new_config = sink.properties.clone().into_inner();
        new_config.extend(props.clone());

        let definition = stmt.to_string();
        let active_sink = sink::ActiveModel {
            sink_id: Set(sink_id),
            properties: Set(risingwave_meta_model::Property(new_config.clone())),
            definition: Set(definition.clone()),
            ..Default::default()
        };
        let active_source = source::ActiveModel {
            source_id: Set(source_id),
            definition: Set(definition.clone()),
            ..Default::default()
        };
        let active_table = table::ActiveModel {
            table_id: Set(table_id),
            definition: Set(definition),
            ..Default::default()
        };
        Sink::update(active_sink).exec(&txn).await?;
        Source::update(active_source).exec(&txn).await?;
        Table::update(active_table).exec(&txn).await?;

        update_sink_fragment_props(&txn, sink_id, new_config).await?;

        let (sink, sink_obj) = Sink::find_by_id(sink_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
        let sink_streaming_job = streaming_job::Entity::find_by_id(sink.sink_id.as_job_id())
            .one(&txn)
            .await?;
        let (source, source_obj) = Source::find_by_id(source_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| {
                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
            })?;
        let (table, table_obj) = Table::find_by_id(table_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Table.as_str(), table_id))?;
        let table_streaming_job = streaming_job::Entity::find_by_id(table.job_id())
            .one(&txn)
            .await?;
        txn.commit().await?;
        let relation_infos = vec![
            PbObject {
                object_info: Some(PbObjectInfo::Sink(
                    ObjectModel(sink, sink_obj.unwrap(), sink_streaming_job).into(),
                )),
            },
            PbObject {
                object_info: Some(PbObjectInfo::Source(
                    ObjectModel(source, source_obj.unwrap(), None).into(),
                )),
            },
            PbObject {
                object_info: Some(PbObjectInfo::Table(
                    ObjectModel(table, table_obj.unwrap(), table_streaming_job).into(),
                )),
            },
        ];
        let _version = self
            .notify_frontend(
                NotificationOperation::Update,
                NotificationInfo::ObjectGroup(PbObjectGroup {
                    objects: relation_infos,
                    dependencies: vec![],
                }),
            )
            .await;

        Ok((props.into_iter().collect(), sink_id))
    }

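    /// Alters a connection's properties/secret refs and cascades the change to every
    /// source and sink that references the connection, keeping catalogs, secret
    /// dependencies, and fragment-level connector props in sync. Returns the resolved
    /// connection options plus the per-source and per-sink props (with secrets filled in
    /// for sources) for runtime propagation.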
    pub async fn update_connection_and_dependent_objects_props(
        &self,
        connection_id: ConnectionId,
        alter_props: BTreeMap<String, String>,
        alter_secret_refs: BTreeMap<String, PbSecretRef>,
    ) -> MetaResult<(
        WithOptionsSecResolved,
        Vec<(SourceId, HashMap<String, String>)>,
        Vec<(SinkId, HashMap<String, String>)>,
    )> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let dependent_sources: Vec<SourceId> = Source::find()
            .select_only()
            .column(source::Column::SourceId)
            .filter(source::Column::ConnectionId.eq(connection_id))
            .into_tuple()
            .all(&txn)
            .await?;

        let dependent_sinks: Vec<SinkId> = Sink::find()
            .select_only()
            .column(sink::Column::SinkId)
            .filter(sink::Column::ConnectionId.eq(connection_id))
            .into_tuple()
            .all(&txn)
            .await?;

        let (connection_catalog, _obj) = Connection::find_by_id(connection_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| {
                MetaError::catalog_id_not_found(ObjectType::Connection.as_str(), connection_id)
            })?;

        let prop_keys: Vec<String> = alter_props
            .keys()
            .chain(alter_secret_refs.keys())
            .cloned()
            .collect();

        let connection_type_str = pb_connection_type_to_connection_type(
            &connection_catalog.params.to_protobuf().connection_type(),
        )
        .ok_or_else(|| MetaError::invalid_parameter("Unspecified connection type"))?;

        risingwave_connector::allow_alter_on_fly_fields::check_connection_allow_alter_on_fly_fields(
            connection_type_str, &prop_keys,
        )?;

        let connection_pb = connection_catalog.params.to_protobuf();
        let mut connection_options_with_secret = WithOptionsSecResolved::new(
            connection_pb.properties.into_iter().collect(),
            connection_pb.secret_refs.into_iter().collect(),
        );

        let (to_add_secret_dep, to_remove_secret_dep) = connection_options_with_secret
            .handle_update(alter_props.clone(), alter_secret_refs.clone())?;

        tracing::debug!(
            "applying new properties to connection and dependents: connection_id={}, sources={:?}, sinks={:?}",
            connection_id,
            dependent_sources,
            dependent_sinks
        );

        {
            let conn_params_pb = risingwave_pb::catalog::ConnectionParams {
                connection_type: connection_pb.connection_type,
                properties: connection_options_with_secret
                    .as_plaintext()
                    .clone()
                    .into_iter()
                    .collect(),
                secret_refs: connection_options_with_secret
                    .as_secret()
                    .clone()
                    .into_iter()
                    .collect(),
            };
            let connection = PbConnection {
                id: connection_id as _,
                info: Some(risingwave_pb::catalog::connection::Info::ConnectionParams(
                    conn_params_pb,
                )),
                ..Default::default()
            };
            validate_connection(&connection).await?;
        }

        if !to_add_secret_dep.is_empty() {
            ObjectDependency::insert_many(to_add_secret_dep.into_iter().map(|secret_id| {
                object_dependency::ActiveModel {
                    oid: Set(secret_id.into()),
                    used_by: Set(connection_id.as_object_id()),
                    ..Default::default()
                }
            }))
            .exec(&txn)
            .await?;
        }
        if !to_remove_secret_dep.is_empty() {
            let _ = ObjectDependency::delete_many()
                .filter(
                    object_dependency::Column::Oid
                        .is_in(to_remove_secret_dep)
                        .and(object_dependency::Column::UsedBy.eq(connection_id.as_object_id())),
                )
                .exec(&txn)
                .await?;
        }

        let updated_connection_params = risingwave_pb::catalog::ConnectionParams {
            connection_type: connection_pb.connection_type,
            properties: connection_options_with_secret
                .as_plaintext()
                .clone()
                .into_iter()
                .collect(),
            secret_refs: connection_options_with_secret
                .as_secret()
                .clone()
                .into_iter()
                .collect(),
        };
        let active_connection_model = connection::ActiveModel {
            connection_id: Set(connection_id),
            params: Set(ConnectionParams::from(&updated_connection_params)),
            ..Default::default()
        };
        Connection::update(active_connection_model)
            .exec(&txn)
            .await?;

        let mut updated_sources_with_props: Vec<(SourceId, HashMap<String, String>)> = Vec::new();

        if !dependent_sources.is_empty() {
            let sources_with_objs = Source::find()
                .find_also_related(Object)
                .filter(source::Column::SourceId.is_in(dependent_sources.iter().cloned()))
                .all(&txn)
                .await?;

            let mut source_updates = Vec::new();
            let mut fragment_updates: Vec<DependentSourceFragmentUpdate> = Vec::new();

            for (source, _obj) in sources_with_objs {
                let source_id = source.source_id;

                let mut source_options_with_secret = WithOptionsSecResolved::new(
                    source.with_properties.0.clone(),
                    source
                        .secret_ref
                        .clone()
                        .map(|secret_ref| secret_ref.to_protobuf())
                        .unwrap_or_default(),
                );
                let (source_to_add_secret_dep, source_to_remove_secret_dep) =
                    source_options_with_secret
                        .handle_update(alter_props.clone(), alter_secret_refs.clone())?;

                let _ = ConnectorProperties::extract(source_options_with_secret.clone(), true)?;

                let source_used_by_id = source
                    .optional_associated_table_id
                    .map(|table_id| table_id.as_object_id())
                    .unwrap_or_else(|| source_id.as_object_id());
                if !source_to_add_secret_dep.is_empty() {
                    ObjectDependency::insert_many(source_to_add_secret_dep.into_iter().map(
                        |secret_id| object_dependency::ActiveModel {
                            oid: Set(secret_id.into()),
                            used_by: Set(source_used_by_id),
                            ..Default::default()
                        },
                    ))
                    .exec(&txn)
                    .await?;
                }
                if !source_to_remove_secret_dep.is_empty() {
                    let _ = ObjectDependency::delete_many()
                        .filter(
                            object_dependency::Column::Oid
                                .is_in(source_to_remove_secret_dep)
                                .and(object_dependency::Column::UsedBy.eq(source_used_by_id)),
                        )
                        .exec(&txn)
                        .await?;
                }

                let active_source = source::ActiveModel {
                    source_id: Set(source_id),
                    with_properties: Set(Property(
                        source_options_with_secret.as_plaintext().clone(),
                    )),
                    secret_ref: Set((!source_options_with_secret.as_secret().is_empty()).then(
                        || {
                            risingwave_meta_model::SecretRef::from(
                                source_options_with_secret.as_secret().clone(),
                            )
                        },
                    )),
                    ..Default::default()
                };
                source_updates.push(active_source);

                let is_shared_source = source.is_shared();
                let mut dep_source_job_ids: Vec<JobId> = Vec::new();
                if !is_shared_source {
                    dep_source_job_ids = ObjectDependency::find()
                        .select_only()
                        .column(object_dependency::Column::UsedBy)
                        .filter(object_dependency::Column::Oid.eq(source_id))
                        .into_tuple()
                        .all(&txn)
                        .await?;
                }

                let base_job_id =
                    if let Some(associate_table_id) = source.optional_associated_table_id {
                        associate_table_id.as_job_id()
                    } else {
                        source_id.as_share_source_job_id()
                    };
                let job_ids = vec![base_job_id]
                    .into_iter()
                    .chain(dep_source_job_ids.into_iter())
                    .collect_vec();

                fragment_updates.push(DependentSourceFragmentUpdate {
                    job_ids,
                    with_properties: source_options_with_secret.as_plaintext().clone(),
                    secret_refs: source_options_with_secret.as_secret().clone(),
                    is_shared_source,
                });

                let complete_source_props = LocalSecretManager::global()
                    .fill_secrets(
                        source_options_with_secret.as_plaintext().clone(),
                        source_options_with_secret.as_secret().clone(),
                    )
                    .map_err(MetaError::from)?
                    .into_iter()
                    .collect::<HashMap<String, String>>();
                updated_sources_with_props.push((source_id, complete_source_props));
            }

            for source_update in source_updates {
                Source::update(source_update).exec(&txn).await?;
            }

            for DependentSourceFragmentUpdate {
                job_ids,
                with_properties,
                secret_refs,
                is_shared_source,
            } in fragment_updates
            {
                update_connector_props_fragments(
                    &txn,
                    job_ids,
                    FragmentTypeFlag::Source,
                    |node, found| {
                        if let PbNodeBody::Source(node) = node
                            && let Some(source_inner) = &mut node.source_inner
                        {
                            source_inner.with_properties = with_properties.clone();
                            source_inner.secret_refs = secret_refs.clone();
                            *found = true;
                        }
                    },
                    is_shared_source,
                )
                .await?;
            }
        }

        let mut updated_sinks_with_props: Vec<(SinkId, HashMap<String, String>)> = Vec::new();

        if !dependent_sinks.is_empty() {
            let sinks_with_objs = Sink::find()
                .find_also_related(Object)
                .filter(sink::Column::SinkId.is_in(dependent_sinks.iter().cloned()))
                .all(&txn)
                .await?;

            let mut sink_updates = Vec::new();
            let mut sink_fragment_updates = Vec::new();

            for (sink, _obj) in sinks_with_objs {
                let sink_id = sink.sink_id;

                match sink.properties.inner_ref().get(CONNECTOR_TYPE_KEY) {
                    Some(connector) => {
                        let connector_type = connector.to_lowercase();
                        check_sink_allow_alter_on_fly_fields(&connector_type, &prop_keys)
                            .map_err(|e| SinkError::Config(anyhow!(e)))?;

                        match_sink_name_str!(
                            connector_type.as_str(),
                            SinkType,
                            {
                                let mut new_sink_props = sink.properties.0.clone();
                                new_sink_props.extend(alter_props.clone());
                                SinkType::validate_alter_config(&new_sink_props)
                            },
                            |sink: &str| Err(SinkError::Config(anyhow!(
                                "unsupported sink type {}",
                                sink
                            )))
                        )?
                    }
                    None => {
                        return Err(SinkError::Config(anyhow!(
                            "connector not specified when alter sink"
                        ))
                        .into());
                    }
                };

                let mut new_sink_props = sink.properties.0.clone();
                new_sink_props.extend(alter_props.clone());

                let active_sink = sink::ActiveModel {
                    sink_id: Set(sink_id),
                    properties: Set(risingwave_meta_model::Property(new_sink_props.clone())),
                    ..Default::default()
                };
                sink_updates.push(active_sink);

                sink_fragment_updates.push((sink_id, new_sink_props.clone()));

                let complete_sink_props: HashMap<String, String> =
                    new_sink_props.into_iter().collect();
                updated_sinks_with_props.push((sink_id, complete_sink_props));
            }

            for sink_update in sink_updates {
                Sink::update(sink_update).exec(&txn).await?;
            }

            for (sink_id, new_sink_props) in sink_fragment_updates {
                update_connector_props_fragments(
                    &txn,
                    vec![sink_id.as_job_id()],
                    FragmentTypeFlag::Sink,
                    |node, found| {
                        if let PbNodeBody::Sink(node) = node
                            && let Some(sink_desc) = &mut node.sink_desc
                            && sink_desc.id == sink_id.as_raw_id()
                        {
                            sink_desc.properties = new_sink_props.clone();
                            *found = true;
                        }
                    },
                    true,
                )
                .await?;
            }
        }

        let mut updated_objects = Vec::new();

        let (connection, obj) = Connection::find_by_id(connection_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| {
                MetaError::catalog_id_not_found(ObjectType::Connection.as_str(), connection_id)
            })?;
        updated_objects.push(PbObject {
            object_info: Some(PbObjectInfo::Connection(
                ObjectModel(connection, obj.unwrap(), None).into(),
            )),
        });

        for source_id in &dependent_sources {
            let (source, obj) = Source::find_by_id(*source_id)
                .find_also_related(Object)
                .one(&txn)
                .await?
                .ok_or_else(|| {
                    MetaError::catalog_id_not_found(ObjectType::Source.as_str(), *source_id)
                })?;
            updated_objects.push(PbObject {
                object_info: Some(PbObjectInfo::Source(
                    ObjectModel(source, obj.unwrap(), None).into(),
                )),
            });
        }

        for sink_id in &dependent_sinks {
            let (sink, obj) = Sink::find_by_id(*sink_id)
                .find_also_related(Object)
                .one(&txn)
                .await?
                .ok_or_else(|| {
                    MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), *sink_id)
                })?;
            let streaming_job = streaming_job::Entity::find_by_id(sink.sink_id.as_job_id())
                .one(&txn)
                .await?;
            updated_objects.push(PbObject {
                object_info: Some(PbObjectInfo::Sink(
                    ObjectModel(sink, obj.unwrap(), streaming_job).into(),
                )),
            });
        }

        txn.commit().await?;

        if !updated_objects.is_empty() {
            self.notify_frontend(
                NotificationOperation::Update,
                NotificationInfo::ObjectGroup(PbObjectGroup {
                    objects: updated_objects,
                    dependencies: vec![],
                }),
            )
            .await;
        }

        Ok((
            connection_options_with_secret,
            updated_sources_with_props,
            updated_sinks_with_props,
        ))
    }

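    /// Updates the rate limit on a single fragment, covering every rate-limitable node
    /// kind (DML, sink, backfill, and source nodes).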
    pub async fn update_fragment_rate_limit_by_fragment_id(
        &self,
        fragment_id: FragmentId,
        rate_limit: Option<u32>,
    ) -> MetaResult<()> {
        let update_rate_limit = |fragment_type_mask: FragmentTypeMask,
                                 stream_node: &mut PbStreamNode| {
            let mut found = false;
            if fragment_type_mask.contains_any(
                FragmentTypeFlag::dml_rate_limit_fragments()
                    .chain(FragmentTypeFlag::sink_rate_limit_fragments())
                    .chain(FragmentTypeFlag::backfill_rate_limit_fragments())
                    .chain(FragmentTypeFlag::source_rate_limit_fragments()),
            ) {
                visit_stream_node_mut(stream_node, |node| {
                    if let PbNodeBody::Dml(node) = node {
                        node.rate_limit = rate_limit;
                        found = true;
                    }
                    if let PbNodeBody::Sink(node) = node {
                        node.rate_limit = rate_limit;
                        found = true;
                    }
                    if let PbNodeBody::StreamCdcScan(node) = node {
                        node.rate_limit = rate_limit;
                        found = true;
                    }
                    if let PbNodeBody::StreamScan(node) = node {
                        node.rate_limit = rate_limit;
                        found = true;
                    }
                    if let PbNodeBody::SourceBackfill(node) = node {
                        node.rate_limit = rate_limit;
                        found = true;
                    }
                });
            }
            found
        };
        self.mutate_fragment_by_fragment_id(fragment_id, update_rate_limit, "fragment not found")
            .await
    }

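    /// Lists the currently configured rate limits of all rate-limitable nodes, one entry
    /// per (fragment, node) pair that has a limit set.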
    pub async fn list_rate_limits(&self) -> MetaResult<Vec<RateLimitInfo>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let fragments: Vec<(FragmentId, JobId, i32, StreamNode)> = Fragment::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::JobId,
                fragment::Column::FragmentTypeMask,
                fragment::Column::StreamNode,
            ])
            .filter(FragmentTypeMask::intersects_any(
                FragmentTypeFlag::rate_limit_fragments(),
            ))
            .into_tuple()
            .all(&txn)
            .await?;

        let mut rate_limits = Vec::new();
        for (fragment_id, job_id, fragment_type_mask, stream_node) in fragments {
            let stream_node = stream_node.to_protobuf();
            visit_stream_node_body(&stream_node, |node| {
                let mut rate_limit = None;
                let mut node_name = None;

                match node {
                    PbNodeBody::Source(node) => {
                        if let Some(node_inner) = &node.source_inner {
                            rate_limit = node_inner.rate_limit;
                            node_name = Some("SOURCE");
                        }
                    }
                    PbNodeBody::StreamFsFetch(node) => {
                        if let Some(node_inner) = &node.node_inner {
                            rate_limit = node_inner.rate_limit;
                            node_name = Some("FS_FETCH");
                        }
                    }
                    PbNodeBody::SourceBackfill(node) => {
                        rate_limit = node.rate_limit;
                        node_name = Some("SOURCE_BACKFILL");
                    }
                    PbNodeBody::StreamScan(node) => {
                        rate_limit = node.rate_limit;
                        node_name = Some("STREAM_SCAN");
                    }
                    PbNodeBody::StreamCdcScan(node) => {
                        rate_limit = node.rate_limit;
                        node_name = Some("STREAM_CDC_SCAN");
                    }
                    PbNodeBody::Sink(node) => {
                        rate_limit = node.rate_limit;
                        node_name = Some("SINK");
                    }
                    _ => {}
                }

                if let Some(rate_limit) = rate_limit {
                    rate_limits.push(RateLimitInfo {
                        fragment_id,
                        job_id,
                        fragment_type_mask: fragment_type_mask as u32,
                        rate_limit,
                        node_name: node_name.unwrap().to_owned(),
                    });
                }
            });
        }

        Ok(rate_limits)
    }
}

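/// Checks that `props` may be altered on the fly for this sink's connector and that the
/// merged config still validates for that connector.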
fn validate_sink_props(sink: &sink::Model, props: &BTreeMap<String, String>) -> MetaResult<()> {
    match sink.properties.inner_ref().get(CONNECTOR_TYPE_KEY) {
        Some(connector) => {
            let connector_type = connector.to_lowercase();
            let field_names: Vec<String> = props.keys().cloned().collect();
            check_sink_allow_alter_on_fly_fields(&connector_type, &field_names)
                .map_err(|e| SinkError::Config(anyhow!(e)))?;

            match_sink_name_str!(
                connector_type.as_str(),
                SinkType,
                {
                    let mut new_props = sink.properties.0.clone();
                    new_props.extend(props.clone());
                    SinkType::validate_alter_config(&new_props)
                },
                |sink: &str| Err(SinkError::Config(anyhow!("unsupported sink type {}", sink)))
            )?
        }
        None => {
            return Err(
                SinkError::Config(anyhow!("connector not specified when alter sink")).into(),
            );
        }
    };
    Ok(())
}

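/// Upserts `props` into a statement's WITH clause, preserving the original option order
/// and overwriting options that share a name.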
fn update_stmt_with_props(
    with_properties: &mut Vec<SqlOption>,
    props: &BTreeMap<String, String>,
) -> MetaResult<()> {
    let mut new_sql_options = with_properties
        .iter()
        .map(|sql_option| (&sql_option.name, sql_option))
        .collect::<IndexMap<_, _>>();
    let add_sql_options = props
        .iter()
        .map(|(k, v)| SqlOption::try_from((k, v)))
        .collect::<Result<Vec<SqlOption>, ParserError>>()
        .map_err(|e| SinkError::Config(anyhow!(e)))?;
    new_sql_options.extend(
        add_sql_options
            .iter()
            .map(|sql_option| (&sql_option.name, sql_option)),
    );
    *with_properties = new_sql_options.into_values().cloned().collect();
    Ok(())
}

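/// Merges `props` into the `SinkDesc` of every sink fragment belonging to `sink_id` and
/// persists the rewritten stream nodes.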
async fn update_sink_fragment_props(
    txn: &DatabaseTransaction,
    sink_id: SinkId,
    props: BTreeMap<String, String>,
) -> MetaResult<()> {
    let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
        .select_only()
        .columns([
            fragment::Column::FragmentId,
            fragment::Column::FragmentTypeMask,
            fragment::Column::StreamNode,
        ])
        .filter(fragment::Column::JobId.eq(sink_id))
        .into_tuple()
        .all(txn)
        .await?;
    let fragments = fragments
        .into_iter()
        .filter(|(_, fragment_type_mask, _)| {
            *fragment_type_mask & FragmentTypeFlag::Sink as i32 != 0
        })
        .filter_map(|(id, _, stream_node)| {
            let mut stream_node = stream_node.to_protobuf();
            let mut found = false;
            visit_stream_node_mut(&mut stream_node, |node| {
                if let PbNodeBody::Sink(node) = node
                    && let Some(sink_desc) = &mut node.sink_desc
                    && sink_desc.id == sink_id.as_raw_id()
                {
                    sink_desc.properties.extend(props.clone());
                    found = true;
                }
            });
            if found { Some((id, stream_node)) } else { None }
        })
        .collect_vec();
    assert!(
        !fragments.is_empty(),
        "sink id should be used by at least one fragment"
    );
    for (id, stream_node) in fragments {
        Fragment::update(fragment::ActiveModel {
            fragment_id: Set(id),
            stream_node: Set(StreamNode::from(&stream_node)),
            ..Default::default()
        })
        .exec(txn)
        .await?;
    }
    Ok(())
}

pub struct SinkIntoTableContext {
    pub updated_sink_catalogs: Vec<SinkId>,
}

pub struct FinishAutoRefreshSchemaSinkContext {
    pub tmp_sink_id: SinkId,
    pub original_sink_id: SinkId,
    pub columns: Vec<PbColumnCatalog>,
    pub new_log_store_table: Option<Box<PbTable>>,
}

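/// Shared helper for connector-prop alterations: loads the fragments of `job_ids` that
/// carry `expect_flag`, lets `alter_stream_node_fn` rewrite matching nodes, and persists
/// the fragments it touched. The emptiness assertion is skipped for a non-shared source
/// with a single job id, since such a source may legitimately have no dependent fragment.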
async fn update_connector_props_fragments<F>(
    txn: &DatabaseTransaction,
    job_ids: Vec<JobId>,
    expect_flag: FragmentTypeFlag,
    mut alter_stream_node_fn: F,
    is_shared_source: bool,
) -> MetaResult<()>
where
    F: FnMut(&mut PbNodeBody, &mut bool),
{
    let fragments: Vec<(FragmentId, StreamNode)> = Fragment::find()
        .select_only()
        .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
        .filter(
            fragment::Column::JobId
                .is_in(job_ids.clone())
                .and(FragmentTypeMask::intersects(expect_flag)),
        )
        .into_tuple()
        .all(txn)
        .await?;
    let fragments = fragments
        .into_iter()
        .filter_map(|(id, stream_node)| {
            let mut stream_node = stream_node.to_protobuf();
            let mut found = false;
            visit_stream_node_mut(&mut stream_node, |node| {
                alter_stream_node_fn(node, &mut found);
            });
            if found { Some((id, stream_node)) } else { None }
        })
        .collect_vec();
    if is_shared_source || job_ids.len() > 1 {
        assert!(
            !fragments.is_empty(),
            "job ids {:?} (type: {:?}) should be used by at least one fragment",
            job_ids,
            expect_flag
        );
    }

    for (id, stream_node) in fragments {
        Fragment::update(fragment::ActiveModel {
            fragment_id: Set(id),
            stream_node: Set(StreamNode::from(&stream_node)),
            ..Default::default()
        })
        .exec(txn)
        .await?;
    }

    Ok(())
}