use std::collections::{BTreeMap, HashMap, HashSet};
use std::num::NonZeroUsize;

use anyhow::anyhow;
use indexmap::IndexMap;
use itertools::Itertools;
use risingwave_common::catalog::{
    ColumnCatalog, FragmentTypeFlag, FragmentTypeMask, ICEBERG_SINK_PREFIX, ICEBERG_SOURCE_PREFIX,
    RISINGWAVE_ICEBERG_ROW_ID, ROW_ID_COLUMN_NAME, max_column_id,
};
use risingwave_common::config::DefaultParallelism;
use risingwave_common::hash::VnodeCountCompat;
use risingwave_common::id::JobId;
use risingwave_common::secret::LocalSecretManager;
use risingwave_common::system_param::adaptive_parallelism_strategy::parse_strategy;
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_common::util::stream_graph_visitor::{
    visit_stream_node_body, visit_stream_node_mut,
};
use risingwave_common::{bail, current_cluster_version};
use risingwave_connector::allow_alter_on_fly_fields::check_sink_allow_alter_on_fly_fields;
use risingwave_connector::connector_common::validate_connection;
use risingwave_connector::error::ConnectorError;
use risingwave_connector::sink::file_sink::fs::FsSink;
use risingwave_connector::sink::{CONNECTOR_TYPE_KEY, SinkError};
use risingwave_connector::source::{
    ConnectorProperties, UPSTREAM_SOURCE_KEY, pb_connection_type_to_connection_type,
};
use risingwave_connector::{WithOptionsSecResolved, WithPropertiesExt, match_sink_name_str};
use risingwave_meta_model::object::ObjectType;
use risingwave_meta_model::prelude::{StreamingJob as StreamingJobModel, *};
use risingwave_meta_model::refresh_job::RefreshState;
use risingwave_meta_model::streaming_job::BackfillOrders;
use risingwave_meta_model::table::TableType;
use risingwave_meta_model::user_privilege::Action;
use risingwave_meta_model::*;
use risingwave_pb::catalog::table::PbEngine;
use risingwave_pb::catalog::{PbConnection, PbCreateType, PbTable};
use risingwave_pb::ddl_service::streaming_job_resource_type;
use risingwave_pb::meta::alter_connector_props_request::AlterIcebergTableIds;
use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
use risingwave_pb::meta::object::PbObjectInfo;
use risingwave_pb::meta::subscribe_response::{
    Info as NotificationInfo, Info, Operation as NotificationOperation, Operation,
};
use risingwave_pb::meta::{ObjectDependency as PbObjectDependency, PbObject, PbObjectGroup};
use risingwave_pb::plan_common::PbColumnCatalog;
use risingwave_pb::plan_common::source_refresh_mode::{
    RefreshMode, SourceRefreshModeFullReload, SourceRefreshModeStreaming,
};
use risingwave_pb::secret::PbSecretRef;
use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use risingwave_pb::stream_plan::{PbSinkLogStoreType, PbStreamNode};
use risingwave_pb::user::PbUserInfo;
use risingwave_sqlparser::ast::{Engine, SqlOption, Statement};
use risingwave_sqlparser::parser::{Parser, ParserError};
use sea_orm::ActiveValue::Set;
use sea_orm::sea_query::{Expr, Query, SimpleExpr};
use sea_orm::{
    ActiveModelTrait, ColumnTrait, DatabaseTransaction, EntityTrait, IntoActiveModel, JoinType,
    NotSet, PaginatorTrait, QueryFilter, QuerySelect, RelationTrait, TransactionTrait,
};
use thiserror_ext::AsReport;

use super::rename::IndexItemRewriter;
use crate::barrier::Command;
use crate::controller::ObjectModel;
use crate::controller::catalog::{CatalogController, DropTableConnectorContext};
use crate::controller::fragment::FragmentTypeMaskExt;
use crate::controller::utils::{
    PartialObject, build_object_group_for_delete, check_if_belongs_to_iceberg_table,
    check_relation_name_duplicate, check_sink_into_table_cycle, ensure_job_not_canceled,
    ensure_object_id, ensure_user_id, fetch_target_fragments, get_internal_tables_by_id,
    get_table_columns, grant_default_privileges_automatically, insert_fragment_relations,
    list_object_dependencies_by_object_id, list_user_info_by_ids,
    try_get_iceberg_table_by_downstream_sink,
};
use crate::error::MetaErrorInner;
use crate::manager::{NotificationVersion, StreamingJob, StreamingJobType};
use crate::model::{
    FragmentDownstreamRelation, FragmentReplaceUpstream, StreamContext, StreamJobFragments,
    StreamJobFragmentsToCreate,
};
use crate::stream::SplitAssignment;
use crate::{MetaError, MetaResult};

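/// A pending update to the source fragments of jobs that depend on an altered source or
/// connection: the jobs to touch, the resolved properties and secret references to apply, and
/// whether the source is a shared one.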
#[derive(Debug)]
struct DependentSourceFragmentUpdate {
    job_ids: Vec<JobId>,
    with_properties: BTreeMap<String, String>,
    secret_refs: BTreeMap<String, PbSecretRef>,
    is_shared_source: bool,
}

#[derive(Debug)]
struct ReplaceOriginalJobInfo {
    max_parallelism: i32,
    timezone: Option<String>,
    config_override: Option<String>,
    adaptive_parallelism_strategy: Option<String>,
    parallelism: StreamingParallelism,
    specific_resource_group: Option<String>,
}

impl ReplaceOriginalJobInfo {
    fn resolved_parallelism(
        &self,
        specified_parallelism: Option<&NonZeroUsize>,
    ) -> StreamingParallelism {
        specified_parallelism
            .map(|n| StreamingParallelism::Fixed(n.get() as _))
            .unwrap_or_else(|| self.parallelism.clone())
    }

    fn stream_context(&self, ctx_override: Option<&StreamContext>) -> StreamContext {
        StreamContext {
            timezone: ctx_override
                .and_then(|ctx| ctx.timezone.clone())
                .or_else(|| self.timezone.clone()),
            config_override: self.config_override.clone().unwrap_or_default().into(),
            adaptive_parallelism_strategy: self.adaptive_parallelism_strategy.as_deref().map(|s| {
                parse_strategy(s).expect("strategy should be validated before persisting")
            }),
        }
    }

    fn resource_type(&self) -> streaming_job_resource_type::ResourceType {
        match &self.specific_resource_group {
            Some(group) => {
                streaming_job_resource_type::ResourceType::SpecificResourceGroup(group.clone())
            }
            None => streaming_job_resource_type::ResourceType::Regular(true),
        }
    }
}

impl From<streaming_job::Model> for ReplaceOriginalJobInfo {
    fn from(model: streaming_job::Model) -> Self {
        Self {
            max_parallelism: model.max_parallelism,
            timezone: model.timezone,
            config_override: model.config_override,
            adaptive_parallelism_strategy: model.adaptive_parallelism_strategy,
            parallelism: model.parallelism,
            specific_resource_group: model.specific_resource_group,
        }
    }
}

impl CatalogController {
    #[allow(clippy::too_many_arguments)]
    pub async fn create_streaming_job_obj(
        txn: &DatabaseTransaction,
        obj_type: ObjectType,
        owner_id: UserId,
        database_id: Option<DatabaseId>,
        schema_id: Option<SchemaId>,
        create_type: PbCreateType,
        ctx: StreamContext,
        streaming_parallelism: StreamingParallelism,
        max_parallelism: usize,
        resource_type: streaming_job_resource_type::ResourceType,
        backfill_parallelism: Option<StreamingParallelism>,
    ) -> MetaResult<streaming_job::Model> {
        let obj = Self::create_object(txn, obj_type, owner_id, database_id, schema_id).await?;
        let job_id = obj.oid.as_job_id();
        let specific_resource_group = resource_type.resource_group();
        let is_serverless_backfill = matches!(
            &resource_type,
            streaming_job_resource_type::ResourceType::ServerlessBackfillResourceGroup(_)
        );
        let model = streaming_job::Model {
            job_id,
            job_status: JobStatus::Initial,
            create_type: create_type.into(),
            timezone: ctx.timezone,
            config_override: Some(ctx.config_override.to_string()),
            adaptive_parallelism_strategy: ctx
                .adaptive_parallelism_strategy
                .as_ref()
                .map(ToString::to_string),
            parallelism: streaming_parallelism,
            backfill_parallelism,
            backfill_orders: None,
            max_parallelism: max_parallelism as _,
            specific_resource_group,
            is_serverless_backfill,
        };
        let job = model.clone().into_active_model();
        StreamingJobModel::insert(job).exec(txn).await?;

        Ok(model)
    }

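    /// Create the catalog entries for a new streaming job in a single transaction: validate the
    /// owner, database and schema, check for name duplication and for dependent relations that
    /// are still being altered, then persist the job together with its table/sink/index/source
    /// catalog and its object dependencies.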
    #[await_tree::instrument]
    pub async fn create_job_catalog(
        &self,
        streaming_job: &mut StreamingJob,
        ctx: &StreamContext,
        parallelism: &Option<Parallelism>,
        max_parallelism: usize,
        mut dependencies: HashSet<ObjectId>,
        resource_type: streaming_job_resource_type::ResourceType,
        backfill_parallelism: &Option<Parallelism>,
    ) -> MetaResult<streaming_job::Model> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;
        let create_type = streaming_job.create_type();

        let streaming_parallelism = match (parallelism, self.env.opts.default_parallelism) {
            (None, DefaultParallelism::Full) => StreamingParallelism::Adaptive,
            (None, DefaultParallelism::Default(n)) => StreamingParallelism::Fixed(n.get()),
            (Some(n), _) => StreamingParallelism::Fixed(n.parallelism as _),
        };
        let backfill_parallelism = backfill_parallelism
            .as_ref()
            .map(|p| StreamingParallelism::Fixed(p.parallelism as _));

        ensure_user_id(streaming_job.owner() as _, &txn).await?;
        ensure_object_id(ObjectType::Database, streaming_job.database_id(), &txn).await?;
        ensure_object_id(ObjectType::Schema, streaming_job.schema_id(), &txn).await?;
        check_relation_name_duplicate(
            &streaming_job.name(),
            streaming_job.database_id(),
            streaming_job.schema_id(),
            &txn,
        )
        .await?;

        if !dependencies.is_empty() {
            // Reject the creation if any dependent table is still being altered.
            let altering_cnt = ObjectDependency::find()
                .join(
                    JoinType::InnerJoin,
                    object_dependency::Relation::Object1.def(),
                )
                .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
                .filter(
                    object_dependency::Column::Oid
                        .is_in(dependencies.clone())
                        .and(object::Column::ObjType.eq(ObjectType::Table))
                        .and(streaming_job::Column::JobStatus.ne(JobStatus::Created))
                        .and(
                            object::Column::Oid.not_in_subquery(
                                Query::select()
                                    .column(table::Column::TableId)
                                    .from(Table)
                                    .to_owned(),
                            ),
                        ),
                )
                .count(&txn)
                .await?;
            if altering_cnt != 0 {
                return Err(MetaError::permission_denied(
                    "some dependent relations are being altered",
                ));
            }
        }

        let streaming_job_model = match streaming_job {
            StreamingJob::MaterializedView(table) => {
                let streaming_job_model = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Table,
                    table.owner as _,
                    Some(table.database_id),
                    Some(table.schema_id),
                    create_type,
                    ctx.clone(),
                    streaming_parallelism,
                    max_parallelism,
                    resource_type,
                    backfill_parallelism.clone(),
                )
                .await?;
                table.id = streaming_job_model.job_id.as_mv_table_id();
                let table_model: table::ActiveModel = table.clone().into();
                Table::insert(table_model).exec(&txn).await?;
                streaming_job_model
            }
            StreamingJob::Sink(sink) => {
                if let Some(target_table_id) = sink.target_table
                    && check_sink_into_table_cycle(
                        target_table_id.into(),
                        dependencies.iter().cloned().collect(),
                        &txn,
                    )
                    .await?
                {
                    bail!("Creating such a sink will result in circular dependency.");
                }

                let streaming_job_model = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Sink,
                    sink.owner as _,
                    Some(sink.database_id),
                    Some(sink.schema_id),
                    create_type,
                    ctx.clone(),
                    streaming_parallelism,
                    max_parallelism,
                    resource_type,
                    backfill_parallelism.clone(),
                )
                .await?;
                sink.id = streaming_job_model.job_id.as_sink_id();
                let sink_model: sink::ActiveModel = sink.clone().into();
                Sink::insert(sink_model).exec(&txn).await?;
                streaming_job_model
            }
            StreamingJob::Table(src, table, _) => {
                let streaming_job_model = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Table,
                    table.owner as _,
                    Some(table.database_id),
                    Some(table.schema_id),
                    create_type,
                    ctx.clone(),
                    streaming_parallelism,
                    max_parallelism,
                    resource_type,
                    backfill_parallelism.clone(),
                )
                .await?;
                let job_id = streaming_job_model.job_id;
                table.id = job_id.as_mv_table_id();
                if let Some(src) = src {
                    let src_obj = Self::create_object(
                        &txn,
                        ObjectType::Source,
                        src.owner as _,
                        Some(src.database_id),
                        Some(src.schema_id),
                    )
                    .await?;
                    src.id = src_obj.oid.as_source_id();
                    src.optional_associated_table_id = Some(job_id.as_mv_table_id().into());
                    table.optional_associated_source_id = Some(src_obj.oid.as_source_id().into());
                    let source: source::ActiveModel = src.clone().into();
                    Source::insert(source).exec(&txn).await?;
                }
                let table_model: table::ActiveModel = table.clone().into();
                Table::insert(table_model).exec(&txn).await?;

                if table.refreshable {
                    let trigger_interval_secs = src
                        .as_ref()
                        .and_then(|source_catalog| source_catalog.refresh_mode)
                        .and_then(
                            |source_refresh_mode| match source_refresh_mode.refresh_mode {
                                Some(RefreshMode::FullReload(SourceRefreshModeFullReload {
                                    refresh_interval_sec,
                                })) => refresh_interval_sec,
                                Some(RefreshMode::Streaming(SourceRefreshModeStreaming {})) => None,
                                None => None,
                            },
                        );

                    RefreshJob::insert(refresh_job::ActiveModel {
                        table_id: Set(table.id),
                        last_trigger_time: Set(None),
                        trigger_interval_secs: Set(trigger_interval_secs),
                        current_status: Set(RefreshState::Idle),
                        last_success_time: Set(None),
                    })
                    .exec(&txn)
                    .await?;
                }
                streaming_job_model
            }
            StreamingJob::Index(index, table) => {
                ensure_object_id(ObjectType::Table, index.primary_table_id, &txn).await?;
                let streaming_job_model = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Index,
                    index.owner as _,
                    Some(index.database_id),
                    Some(index.schema_id),
                    create_type,
                    ctx.clone(),
                    streaming_parallelism,
                    max_parallelism,
                    resource_type,
                    backfill_parallelism.clone(),
                )
                .await?;
                let job_id = streaming_job_model.job_id;
                index.id = job_id.as_index_id();
                index.index_table_id = job_id.as_mv_table_id();
                table.id = job_id.as_mv_table_id();

                ObjectDependency::insert(object_dependency::ActiveModel {
                    oid: Set(index.primary_table_id.into()),
                    used_by: Set(table.id.into()),
                    ..Default::default()
                })
                .exec(&txn)
                .await?;

                let table_model: table::ActiveModel = table.clone().into();
                Table::insert(table_model).exec(&txn).await?;
                let index_model: index::ActiveModel = index.clone().into();
                Index::insert(index_model).exec(&txn).await?;
                streaming_job_model
            }
            StreamingJob::Source(src) => {
                let streaming_job_model = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Source,
                    src.owner as _,
                    Some(src.database_id),
                    Some(src.schema_id),
                    create_type,
                    ctx.clone(),
                    streaming_parallelism,
                    max_parallelism,
                    resource_type,
                    backfill_parallelism.clone(),
                )
                .await?;
                src.id = streaming_job_model.job_id.as_shared_source_id();
                let source_model: source::ActiveModel = src.clone().into();
                Source::insert(source_model).exec(&txn).await?;
                streaming_job_model
            }
        };

        // Also record dependencies on the secrets and connections used by the job.
        dependencies.extend(
            streaming_job
                .dependent_secret_ids()?
                .into_iter()
                .map(|id| id.as_object_id()),
        );
        dependencies.extend(
            streaming_job
                .dependent_connection_ids()?
                .into_iter()
                .map(|id| id.as_object_id()),
        );

        if !dependencies.is_empty() {
            ObjectDependency::insert_many(dependencies.into_iter().map(|oid| {
                object_dependency::ActiveModel {
                    oid: Set(oid),
                    used_by: Set(streaming_job.id().as_object_id()),
                    ..Default::default()
                }
            }))
            .exec(&txn)
            .await?;
        }

        txn.commit().await?;

        Ok(streaming_job_model)
    }

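    /// Create the catalog entries for the internal (state) tables of a streaming job. Returns a
    /// mapping from the placeholder table ids in the fragment graph to the newly allocated ids.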
    pub async fn create_internal_table_catalog(
        &self,
        job: &StreamingJob,
        mut incomplete_internal_tables: Vec<PbTable>,
    ) -> MetaResult<HashMap<TableId, TableId>> {
        let job_id = job.id();
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        // The job may have been canceled concurrently; bail out early in that case.
        ensure_job_not_canceled(job_id, &txn).await?;

        let mut table_id_map = HashMap::new();
        for table in &mut incomplete_internal_tables {
            let table_id = Self::create_object(
                &txn,
                ObjectType::Table,
                table.owner as _,
                Some(table.database_id),
                Some(table.schema_id),
            )
            .await?
            .oid
            .as_table_id();
            table_id_map.insert(table.id, table_id);
            table.id = table_id;
            table.job_id = Some(job_id);

            let table_model = table::ActiveModel {
                table_id: Set(table_id),
                belongs_to_job_id: Set(Some(job_id)),
                fragment_id: NotSet,
                ..table.clone().into()
            };
            Table::insert(table_model).exec(&txn).await?;
        }
        txn.commit().await?;

        Ok(table_id_map)
    }

    pub async fn prepare_stream_job_fragments(
        &self,
        stream_job_fragments: &StreamJobFragmentsToCreate,
        streaming_job: &StreamingJob,
        for_replace: bool,
        backfill_orders: Option<BackfillOrders>,
    ) -> MetaResult<()> {
        self.prepare_streaming_job(
            stream_job_fragments.stream_job_id(),
            || stream_job_fragments.fragments.values(),
            &stream_job_fragments.downstreams,
            for_replace,
            Some(streaming_job),
            backfill_orders,
        )
        .await
    }

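    /// Persist the fragment catalogs of a (new or replacement) streaming job, bind its state
    /// tables to their fragments, and optionally broadcast the creating catalogs to the frontend.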
    #[await_tree::instrument("prepare_streaming_job_for_{}", if for_replace { "replace" } else { "create" })]
    pub async fn prepare_streaming_job<'a, I: Iterator<Item = &'a crate::model::Fragment> + 'a>(
        &self,
        job_id: JobId,
        get_fragments: impl Fn() -> I + 'a,
        downstreams: &FragmentDownstreamRelation,
        for_replace: bool,
        creating_streaming_job: Option<&'a StreamingJob>,
        backfill_orders: Option<BackfillOrders>,
    ) -> MetaResult<()> {
        let fragments = Self::prepare_fragment_models_from_fragments(job_id, get_fragments())?;

        let inner = self.inner.write().await;

        let need_notify = creating_streaming_job
            .map(|job| job.should_notify_creating())
            .unwrap_or(false);
        let definition = creating_streaming_job.map(|job| job.definition());

        let mut objects_to_notify = vec![];
        let txn = inner.db.begin().await?;

        // The job may have been canceled concurrently; bail out early in that case.
        ensure_job_not_canceled(job_id, &txn).await?;

        if let Some(backfill_orders) = backfill_orders {
            let job = streaming_job::ActiveModel {
                job_id: Set(job_id),
                backfill_orders: Set(Some(backfill_orders)),
                ..Default::default()
            };
            StreamingJobModel::update(job).exec(&txn).await?;
        }

        let state_table_ids = fragments
            .iter()
            .flat_map(|fragment| fragment.state_table_ids.inner_ref().clone())
            .collect_vec();

        let inserted_fragment_ids: Vec<crate::model::FragmentId> = fragments
            .iter()
            .map(|f| f.fragment_id as crate::model::FragmentId)
            .collect();

        if !fragments.is_empty() {
            let fragment_models = fragments
                .into_iter()
                .map(|fragment| fragment.into_active_model())
                .collect_vec();
            Fragment::insert_many(fragment_models).exec(&txn).await?;
        }

        if !for_replace {
            // Fill in the fragment id and vnode count for each state table of the job.
            let all_tables = StreamJobFragments::collect_tables(get_fragments());
            for state_table_id in state_table_ids {
                let table = all_tables
                    .get(&state_table_id)
                    .unwrap_or_else(|| panic!("table {} not found", state_table_id));
                assert_eq!(table.id, state_table_id);
                let vnode_count = table.vnode_count();

                Table::update(table::ActiveModel {
                    table_id: Set(state_table_id as _),
                    fragment_id: Set(Some(table.fragment_id)),
                    vnode_count: Set(vnode_count as _),
                    ..Default::default()
                })
                .exec(&txn)
                .await?;

                if need_notify {
                    let mut table = table.clone();
                    if cfg!(not(debug_assertions)) && table.id == job_id.as_raw_id() {
                        // Fill the job definition into the catalog of the job's own table
                        // before broadcasting it to the frontend.
                        table.definition = definition.clone().unwrap();
                    }
                    objects_to_notify.push(PbObject {
                        object_info: Some(PbObjectInfo::Table(table)),
                    });
                }
            }
        }

        if need_notify {
            // Also notify the catalog of the creating job itself.
            match creating_streaming_job.unwrap() {
                StreamingJob::Table(Some(source), ..) => {
                    objects_to_notify.push(PbObject {
                        object_info: Some(PbObjectInfo::Source(source.clone())),
                    });
                }
                StreamingJob::Sink(sink) => {
                    objects_to_notify.push(PbObject {
                        object_info: Some(PbObjectInfo::Sink(sink.clone())),
                    });
                }
                StreamingJob::Index(index, _) => {
                    objects_to_notify.push(PbObject {
                        object_info: Some(PbObjectInfo::Index(index.clone())),
                    });
                }
                _ => {}
            }
        }

        insert_fragment_relations(&txn, downstreams).await?;

        if !for_replace {
            // Record the DML fragment id for tables.
            if let Some(StreamingJob::Table(_, table, _)) = creating_streaming_job {
                Table::update(table::ActiveModel {
                    table_id: Set(table.id),
                    dml_fragment_id: Set(table.dml_fragment_id),
                    ..Default::default()
                })
                .exec(&txn)
                .await?;
            }
        }

        txn.commit().await?;

        self.env
            .notification_manager()
            .notify_serving_fragment_mapping_update(inserted_fragment_ids);

        if !objects_to_notify.is_empty() {
            self.notify_frontend(
                Operation::Add,
                Info::ObjectGroup(PbObjectGroup {
                    objects: objects_to_notify,
                    dependencies: vec![],
                }),
            )
            .await;
        }

        Ok(())
    }

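    /// Build the barrier command that drops a creating streaming job, including the mapping from
    /// the sink fragment to its target fragment for sink-into-table jobs.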
    pub async fn build_cancel_command(
        &self,
        table_fragments: &StreamJobFragments,
    ) -> MetaResult<Command> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let dropped_sink_fragment_with_target =
            if let Some(sink_fragment) = table_fragments.sink_fragment() {
                let sink_fragment_id = sink_fragment.fragment_id as FragmentId;
                let sink_target_fragment = fetch_target_fragments(&txn, [sink_fragment_id]).await?;
                sink_target_fragment
                    .get(&sink_fragment_id)
                    .map(|target_fragments| {
                        let target_fragment_id = *target_fragments
                            .first()
                            .expect("sink should have at least one downstream fragment");
                        (sink_fragment_id, target_fragment_id)
                    })
            } else {
                None
            };

        Ok(Command::DropStreamingJobs {
            streaming_job_ids: HashSet::from_iter([table_fragments.stream_job_id()]),
            unregistered_state_table_ids: table_fragments.all_table_ids().collect(),
            unregistered_fragment_ids: table_fragments.fragment_ids().collect(),
            dropped_sink_fragment_by_targets: dropped_sink_fragment_with_target
                .into_iter()
                .map(|(sink, target)| (target as _, vec![sink as _]))
                .collect(),
        })
    }

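    /// Abort a streaming job that is still being created. Returns whether the job was actually
    /// aborted (a background job already in `Creating` status is left running unless it belongs
    /// to an iceberg table), together with the database id of the job, if any.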
    #[await_tree::instrument]
    pub async fn try_abort_creating_streaming_job(
        &self,
        mut job_id: JobId,
        is_cancelled: bool,
    ) -> MetaResult<(bool, Option<DatabaseId>)> {
        let mut inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        let obj = Object::find_by_id(job_id).one(&txn).await?;
        let Some(obj) = obj else {
            tracing::warn!(
                id = %job_id,
                "streaming job not found when aborting creating, might be cancelled already or cleaned by recovery"
            );
            return Ok((true, None));
        };
        let database_id = obj
            .database_id
            .ok_or_else(|| anyhow!("obj has no database id: {:?}", obj))?;
        let streaming_job = streaming_job::Entity::find_by_id(job_id).one(&txn).await?;

        if !is_cancelled && let Some(streaming_job) = &streaming_job {
            assert_ne!(streaming_job.job_status, JobStatus::Created);
            if streaming_job.create_type == CreateType::Background
                && streaming_job.job_status == JobStatus::Creating
            {
                if (obj.obj_type == ObjectType::Table || obj.obj_type == ObjectType::Sink)
                    && check_if_belongs_to_iceberg_table(&txn, job_id).await?
                {
                    // Jobs belonging to an iceberg table are aborted even when created in
                    // background, so fall through here.
                } else {
                    // The job is created in background and still in creating status; leave it
                    // running instead of aborting it.
                    tracing::warn!(
                        id = %job_id,
                        "streaming job is created in background and still in creating status"
                    );
                    return Ok((false, Some(database_id)));
                }
            }
        }

        // Remember the original job before possibly redirecting to the iceberg table job.
        let original_job_id = job_id;
        let original_obj_type = obj.obj_type;

        let iceberg_table_id =
            try_get_iceberg_table_by_downstream_sink(&txn, job_id.as_sink_id()).await?;
        if let Some(iceberg_table_id) = iceberg_table_id {
            // For the sink of an iceberg table, drop the sink job and its internal tables
            // first, then abort the iceberg table job instead.
            let internal_tables = get_internal_tables_by_id(job_id, &txn).await?;
            Object::delete_many()
                .filter(
                    object::Column::Oid
                        .eq(job_id)
                        .or(object::Column::Oid.is_in(internal_tables)),
                )
                .exec(&txn)
                .await?;
            job_id = iceberg_table_id.as_job_id();
        };

        let internal_table_ids = get_internal_tables_by_id(job_id, &txn).await?;

        let mut objs = vec![];
        let table_obj = Table::find_by_id(job_id.as_mv_table_id()).one(&txn).await?;

        let mut need_notify =
            streaming_job.is_some_and(|job| job.create_type == CreateType::Background);
        if !need_notify {
            if let Some(table) = &table_obj {
                need_notify = table.table_type == TableType::MaterializedView;
            } else if original_obj_type == ObjectType::Sink {
                need_notify = true;
            }
        }

        if is_cancelled {
            let dropped_tables = Table::find()
                .find_also_related(Object)
                .filter(
                    table::Column::TableId.is_in(
                        internal_table_ids
                            .iter()
                            .cloned()
                            .chain(table_obj.iter().map(|t| t.table_id as _)),
                    ),
                )
                .all(&txn)
                .await?
                .into_iter()
                .map(|(table, obj)| PbTable::from(ObjectModel(table, obj.unwrap(), None)));
            inner
                .dropped_tables
                .extend(dropped_tables.map(|t| (t.id, t)));
        }

        if need_notify {
            // When the abort was redirected to the iceberg table job, also notify the deletion
            // of the original sink object.
            if original_obj_type == ObjectType::Sink && original_job_id != job_id {
                let orig_obj: Option<PartialObject> = Object::find_by_id(original_job_id)
                    .select_only()
                    .columns([
                        object::Column::Oid,
                        object::Column::ObjType,
                        object::Column::SchemaId,
                        object::Column::DatabaseId,
                    ])
                    .into_partial_model()
                    .one(&txn)
                    .await?;
                if let Some(orig_obj) = orig_obj {
                    objs.push(orig_obj);
                }
            }

            let obj: Option<PartialObject> = Object::find_by_id(job_id)
                .select_only()
                .columns([
                    object::Column::Oid,
                    object::Column::ObjType,
                    object::Column::SchemaId,
                    object::Column::DatabaseId,
                ])
                .into_partial_model()
                .one(&txn)
                .await?;
            let obj =
                obj.ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;
            objs.push(obj);
            let internal_table_objs: Vec<PartialObject> = Object::find()
                .select_only()
                .columns([
                    object::Column::Oid,
                    object::Column::ObjType,
                    object::Column::SchemaId,
                    object::Column::DatabaseId,
                ])
                .join(JoinType::InnerJoin, object::Relation::Table.def())
                .filter(table::Column::BelongsToJobId.eq(job_id))
                .into_partial_model()
                .all(&txn)
                .await?;
            objs.extend(internal_table_objs);
        }

        let abort_fragment_ids: Vec<FragmentId> = Fragment::find()
            .select_only()
            .column(fragment::Column::FragmentId)
            .filter(fragment::Column::JobId.eq(job_id))
            .into_tuple()
            .all(&txn)
            .await?;

        Object::delete_by_id(job_id).exec(&txn).await?;
        if !internal_table_ids.is_empty() {
            Object::delete_many()
                .filter(object::Column::Oid.is_in(internal_table_ids))
                .exec(&txn)
                .await?;
        }
        if let Some(t) = &table_obj
            && let Some(source_id) = t.optional_associated_source_id
        {
            Object::delete_by_id(source_id).exec(&txn).await?;
        }

        let err = if is_cancelled {
            MetaError::cancelled(format!("streaming job {job_id} is cancelled"))
        } else {
            MetaError::catalog_id_not_found("stream job", format!("streaming job {job_id} failed"))
        };
        let abort_reason = format!("streaming job aborted {}", err.as_report());
        for tx in inner
            .creating_table_finish_notifier
            .get_mut(&database_id)
            .map(|creating_tables| creating_tables.remove(&job_id).into_iter())
            .into_iter()
            .flatten()
            .flatten()
        {
            let _ = tx.send(Err(abort_reason.clone()));
        }
        txn.commit().await?;

        self.env
            .notification_manager()
            .notify_serving_fragment_mapping_delete(
                abort_fragment_ids.iter().map(|id| *id as _).collect(),
            );

        if !objs.is_empty() {
            self.notify_frontend(Operation::Delete, build_object_group_for_delete(objs))
                .await;
        }
        Ok((true, Some(database_id)))
    }

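    /// After the fragments of a job have been collected by the barrier, persist the new fragment
    /// relations and split assignments and advance the job status to `Creating`.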
    #[await_tree::instrument]
    pub async fn post_collect_job_fragments(
        &self,
        job_id: JobId,
        upstream_fragment_new_downstreams: &FragmentDownstreamRelation,
        new_sink_downstream: Option<FragmentDownstreamRelation>,
        split_assignment: Option<&SplitAssignment>,
    ) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        insert_fragment_relations(&txn, upstream_fragment_new_downstreams).await?;

        if let Some(new_downstream) = new_sink_downstream {
            insert_fragment_relations(&txn, &new_downstream).await?;
        }

        // Mark the job as `Creating` now that its fragments have been collected.
        StreamingJobModel::update(streaming_job::ActiveModel {
            job_id: Set(job_id),
            job_status: Set(JobStatus::Creating),
            ..Default::default()
        })
        .exec(&txn)
        .await?;

        if let Some(split_assignment) = split_assignment {
            let fragment_splits = split_assignment
                .iter()
                .map(|(fragment_id, splits)| {
                    (
                        *fragment_id as _,
                        splits.values().flatten().cloned().collect_vec(),
                    )
                })
                .collect();

            self.update_fragment_splits(&txn, &fragment_splits).await?;
        }

        txn.commit().await?;

        Ok(())
    }

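    /// Create a temporary streaming job catalog used to replace an existing one (e.g. for
    /// `ALTER TABLE`). The new job inherits the stream context, parallelism and resource group
    /// of the original job unless explicitly overridden.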
    pub async fn create_job_catalog_for_replace(
        &self,
        streaming_job: &StreamingJob,
        ctx: Option<&StreamContext>,
        specified_parallelism: Option<&NonZeroUsize>,
        expected_original_max_parallelism: Option<usize>,
    ) -> MetaResult<streaming_job::Model> {
        let id = streaming_job.id();
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        streaming_job.verify_version_for_replace(&txn).await?;
        // Check whether the job is referenced by any jobs that are still being created.
        let referring_cnt = ObjectDependency::find()
            .join(
                JoinType::InnerJoin,
                object_dependency::Relation::Object1.def(),
            )
            .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
            .filter(
                object_dependency::Column::Oid
                    .eq(id)
                    .and(object::Column::ObjType.eq(ObjectType::Table))
                    .and(streaming_job::Column::JobStatus.ne(JobStatus::Created)),
            )
            .count(&txn)
            .await?;
        if referring_cnt != 0 {
            return Err(MetaError::permission_denied(
                "job is being altered or referenced by some creating jobs",
            ));
        }

        let original_job = StreamingJobModel::find_by_id(id)
            .one(&txn)
            .await?
            .map(ReplaceOriginalJobInfo::from)
            .ok_or_else(|| MetaError::catalog_id_not_found(streaming_job.job_type_str(), id))?;

        if let Some(max_parallelism) = expected_original_max_parallelism
            && original_job.max_parallelism != max_parallelism as i32
        {
            bail!(
                "cannot use a different max parallelism \
                 when replacing streaming job, \
                 original: {}, new: {}",
                original_job.max_parallelism,
                max_parallelism
            );
        }

        let parallelism = original_job.resolved_parallelism(specified_parallelism);
        let ctx = original_job.stream_context(ctx);
        let resource_type = original_job.resource_type();

        let tmp_model = Self::create_streaming_job_obj(
            &txn,
            streaming_job.object_type(),
            streaming_job.owner() as _,
            Some(streaming_job.database_id() as _),
            Some(streaming_job.schema_id() as _),
            streaming_job.create_type(),
            ctx,
            parallelism,
            original_job.max_parallelism as _,
            resource_type,
            // No dedicated backfill parallelism for the replacement job.
            None,
        )
        .await?;

        // Record the dependency of the temporary job on the original one.
        ObjectDependency::insert(object_dependency::ActiveModel {
            oid: Set(id.as_object_id()),
            used_by: Set(tmp_model.job_id.as_object_id()),
            ..Default::default()
        })
        .exec(&txn)
        .await?;

        txn.commit().await?;

        Ok(tmp_model)
    }

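    /// Mark a creating streaming job as `Created`, notify the frontend of the finished catalogs
    /// and wake up any waiters registered for the job.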
    pub async fn finish_streaming_job(&self, job_id: JobId) -> MetaResult<()> {
        let mut inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        if check_if_belongs_to_iceberg_table(&txn, job_id).await? {
            tracing::info!(
                "streaming job {} is for iceberg table, wait for manual finish operation",
                job_id
            );
            return Ok(());
        }

        let (notification_op, objects, updated_user_info, dependencies) =
            self.finish_streaming_job_inner(&txn, job_id).await?;

        txn.commit().await?;

        let mut version = self
            .notify_frontend(
                notification_op,
                NotificationInfo::ObjectGroup(PbObjectGroup {
                    objects,
                    dependencies,
                }),
            )
            .await;

        if !updated_user_info.is_empty() {
            version = self.notify_users_update(updated_user_info).await;
        }

        inner
            .creating_table_finish_notifier
            .values_mut()
            .for_each(|creating_tables| {
                if let Some(txs) = creating_tables.remove(&job_id) {
                    for tx in txs {
                        let _ = tx.send(Ok(version));
                    }
                }
            });

        Ok(())
    }

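    /// Transactional part of [`Self::finish_streaming_job`]: update the job status and creation
    /// metadata, collect the catalogs to broadcast, and grant default privileges. Returns the
    /// notification operation, the objects and dependencies to notify, and the users whose
    /// privileges were updated.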
    pub async fn finish_streaming_job_inner(
        &self,
        txn: &DatabaseTransaction,
        job_id: JobId,
    ) -> MetaResult<(
        Operation,
        Vec<risingwave_pb::meta::Object>,
        Vec<PbUserInfo>,
        Vec<PbObjectDependency>,
    )> {
        let job_type = Object::find_by_id(job_id)
            .select_only()
            .column(object::Column::ObjType)
            .into_tuple()
            .one(txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;

        let create_type: CreateType = StreamingJobModel::find_by_id(job_id)
            .select_only()
            .column(streaming_job::Column::CreateType)
            .into_tuple()
            .one(txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;

        // Update `created_at` to the current timestamp and record the cluster version.
        let res = Object::update_many()
            .col_expr(object::Column::CreatedAt, Expr::current_timestamp().into())
            .col_expr(
                object::Column::CreatedAtClusterVersion,
                current_cluster_version().into(),
            )
            .filter(object::Column::Oid.eq(job_id))
            .exec(txn)
            .await?;
        if res.rows_affected == 0 {
            return Err(MetaError::catalog_id_not_found("streaming job", job_id));
        }

        // Mark the job itself as `Created`.
        let job = streaming_job::ActiveModel {
            job_id: Set(job_id),
            job_status: Set(JobStatus::Created),
            ..Default::default()
        };
        let streaming_job = Some(job.update(txn).await?);

        let internal_table_objs = Table::find()
            .find_also_related(Object)
            .filter(table::Column::BelongsToJobId.eq(job_id))
            .all(txn)
            .await?;
        let mut objects = internal_table_objs
            .iter()
            .map(|(table, obj)| PbObject {
                object_info: Some(PbObjectInfo::Table(
                    ObjectModel(table.clone(), obj.clone().unwrap(), streaming_job.clone()).into(),
                )),
            })
            .collect_vec();
        let mut notification_op = if create_type == CreateType::Background {
            NotificationOperation::Update
        } else {
            NotificationOperation::Add
        };
        let mut updated_user_info = vec![];
        let mut need_grant_default_privileges = true;

        match job_type {
            ObjectType::Table => {
                let (table, obj) = Table::find_by_id(job_id.as_mv_table_id())
                    .find_also_related(Object)
                    .one(txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("table", job_id))?;
                if table.table_type == TableType::MaterializedView {
                    notification_op = NotificationOperation::Update;
                }

                if let Some(source_id) = table.optional_associated_source_id {
                    let (src, obj) = Source::find_by_id(source_id)
                        .find_also_related(Object)
                        .one(txn)
                        .await?
                        .ok_or_else(|| MetaError::catalog_id_not_found("source", source_id))?;
                    objects.push(PbObject {
                        object_info: Some(PbObjectInfo::Source(
                            ObjectModel(src, obj.unwrap(), None).into(),
                        )),
                    });
                }
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Table(
                        ObjectModel(table, obj.unwrap(), streaming_job).into(),
                    )),
                });
            }
            ObjectType::Sink => {
                let (sink, obj) = Sink::find_by_id(job_id.as_sink_id())
                    .find_also_related(Object)
                    .one(txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("sink", job_id))?;
                if sink.name.starts_with(ICEBERG_SINK_PREFIX) {
                    need_grant_default_privileges = false;
                }
                // The sink catalog has already been broadcast when the job was prepared, so
                // always notify with an update here.
                notification_op = NotificationOperation::Update;
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Sink(
                        ObjectModel(sink, obj.unwrap(), streaming_job).into(),
                    )),
                });
            }
            ObjectType::Index => {
                need_grant_default_privileges = false;
                let (index, obj) = Index::find_by_id(job_id.as_index_id())
                    .find_also_related(Object)
                    .one(txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("index", job_id))?;
                {
                    let (table, obj) = Table::find_by_id(index.index_table_id)
                        .find_also_related(Object)
                        .one(txn)
                        .await?
                        .ok_or_else(|| {
                            MetaError::catalog_id_not_found("table", index.index_table_id)
                        })?;
                    objects.push(PbObject {
                        object_info: Some(PbObjectInfo::Table(
                            ObjectModel(table, obj.unwrap(), streaming_job.clone()).into(),
                        )),
                    });
                }

                // If `SELECT` on the primary table has been granted to some users, grant it on
                // the index (state) tables as well.
                let primary_table_privileges = UserPrivilege::find()
                    .filter(
                        user_privilege::Column::Oid
                            .eq(index.primary_table_id)
                            .and(user_privilege::Column::Action.eq(Action::Select)),
                    )
                    .all(txn)
                    .await?;
                if !primary_table_privileges.is_empty() {
                    let index_state_table_ids: Vec<TableId> = Table::find()
                        .select_only()
                        .column(table::Column::TableId)
                        .filter(
                            table::Column::BelongsToJobId
                                .eq(job_id)
                                .or(table::Column::TableId.eq(index.index_table_id)),
                        )
                        .into_tuple()
                        .all(txn)
                        .await?;
                    let mut new_privileges = vec![];
                    for privilege in &primary_table_privileges {
                        for state_table_id in &index_state_table_ids {
                            new_privileges.push(user_privilege::ActiveModel {
                                id: Default::default(),
                                oid: Set(state_table_id.as_object_id()),
                                user_id: Set(privilege.user_id),
                                action: Set(Action::Select),
                                dependent_id: Set(privilege.dependent_id),
                                granted_by: Set(privilege.granted_by),
                                with_grant_option: Set(privilege.with_grant_option),
                            });
                        }
                    }
                    UserPrivilege::insert_many(new_privileges).exec(txn).await?;

                    updated_user_info = list_user_info_by_ids(
                        primary_table_privileges.into_iter().map(|p| p.user_id),
                        txn,
                    )
                    .await?;
                }

                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Index(
                        ObjectModel(index, obj.unwrap(), streaming_job).into(),
                    )),
                });
            }
            ObjectType::Source => {
                let (source, obj) = Source::find_by_id(job_id.as_shared_source_id())
                    .find_also_related(Object)
                    .one(txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("source", job_id))?;
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Source(
                        ObjectModel(source, obj.unwrap(), None).into(),
                    )),
                });
            }
            _ => unreachable!("invalid job type: {:?}", job_type),
        }

        if need_grant_default_privileges {
            updated_user_info = grant_default_privileges_automatically(txn, job_id).await?;
        }

        let dependencies =
            list_object_dependencies_by_object_id(txn, job_id.as_object_id()).await?;

        Ok((notification_op, objects, updated_user_info, dependencies))
    }

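    /// Finish replacing a streaming job with the catalogs built under the temporary job
    /// `tmp_id`, then notify the frontend and the serving fragment mappings of the swap.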
    pub async fn finish_replace_streaming_job(
        &self,
        tmp_id: JobId,
        streaming_job: StreamingJob,
        replace_upstream: FragmentReplaceUpstream,
        sink_into_table_context: SinkIntoTableContext,
        drop_table_connector_ctx: Option<&DropTableConnectorContext>,
        auto_refresh_schema_sinks: Option<Vec<FinishAutoRefreshSchemaSinkContext>>,
    ) -> MetaResult<NotificationVersion> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        let (objects, delete_notification_objs, old_fragment_ids, new_fragment_ids) =
            Self::finish_replace_streaming_job_inner(
                tmp_id,
                replace_upstream,
                sink_into_table_context,
                &txn,
                streaming_job,
                drop_table_connector_ctx,
                auto_refresh_schema_sinks,
            )
            .await?;

        txn.commit().await?;

        let notification_manager = self.env.notification_manager();
        notification_manager.notify_serving_fragment_mapping_delete(
            old_fragment_ids.iter().map(|id| *id as _).collect(),
        );
        notification_manager.notify_serving_fragment_mapping_update(
            new_fragment_ids.iter().map(|id| *id as _).collect(),
        );

        let mut version = self
            .notify_frontend(
                NotificationOperation::Update,
                NotificationInfo::ObjectGroup(PbObjectGroup {
                    objects,
                    dependencies: vec![],
                }),
            )
            .await;

        if let Some((user_infos, to_drop_objects)) = delete_notification_objs {
            self.notify_users_update(user_infos).await;
            version = self
                .notify_frontend(
                    NotificationOperation::Delete,
                    build_object_group_for_delete(to_drop_objects),
                )
                .await;
        }

        Ok(version)
    }

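    /// Recompute the column catalog of an iceberg source after the schema of its backing table
    /// changed: columns that already exist keep their ids, new table columns get fresh ids, and
    /// iceberg hidden columns are preserved. Returns the new columns and the new row-id index.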
    fn update_iceberg_source_columns(
        original_source_columns: &[ColumnCatalog],
        original_row_id_index: Option<usize>,
        new_table_columns: &[ColumnCatalog],
    ) -> (Vec<ColumnCatalog>, Option<usize>) {
        let row_id_column_name = original_row_id_index
            .and_then(|idx| original_source_columns.get(idx))
            .map(|col| col.name().to_owned());
        let mut next_column_id = max_column_id(original_source_columns).next();
        let existing_columns: HashMap<String, ColumnCatalog> = original_source_columns
            .iter()
            .cloned()
            .map(|col| (col.name().to_owned(), col))
            .collect();

        let mut new_columns = Vec::new();
        for table_col in new_table_columns
            .iter()
            .filter(|col| !col.is_rw_sys_column())
        {
            let mut source_col_name = table_col.name().to_owned();
            if source_col_name == ROW_ID_COLUMN_NAME {
                source_col_name = RISINGWAVE_ICEBERG_ROW_ID.to_owned();
            }

            if let Some(existing) = existing_columns.get(&source_col_name) {
                new_columns.push(existing.clone());
            } else {
                let mut new_col = table_col.clone();
                new_col.column_desc.name = source_col_name;
                new_col.column_desc.column_id = next_column_id;
                next_column_id = next_column_id.next();
                new_columns.push(new_col);
            }
        }

        let mut seen_names: HashSet<String> = new_columns
            .iter()
            .map(|col| col.name().to_owned())
            .collect();
        for col in original_source_columns
            .iter()
            .filter(|col| col.is_iceberg_hidden_column())
        {
            if seen_names.insert(col.name().to_owned()) {
                new_columns.push(col.clone());
            }
        }

        let new_row_id_index = row_id_column_name
            .as_ref()
            .and_then(|name| new_columns.iter().position(|col| col.name() == name));

        (new_columns, new_row_id_index)
    }

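    /// Transactional part of [`Self::finish_replace_streaming_job`]: swap the fragments of the
    /// temporary job into the original job, rewrite dependent merge nodes and index items, and
    /// collect the catalogs to notify. Returns the updated objects, the objects to drop (when
    /// the table connector is dropped), and the old and new fragment ids.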
    pub async fn finish_replace_streaming_job_inner(
        tmp_id: JobId,
        replace_upstream: FragmentReplaceUpstream,
        SinkIntoTableContext {
            updated_sink_catalogs,
        }: SinkIntoTableContext,
        txn: &DatabaseTransaction,
        streaming_job: StreamingJob,
        drop_table_connector_ctx: Option<&DropTableConnectorContext>,
        auto_refresh_schema_sinks: Option<Vec<FinishAutoRefreshSchemaSinkContext>>,
    ) -> MetaResult<(
        Vec<PbObject>,
        Option<(Vec<PbUserInfo>, Vec<PartialObject>)>,
        Vec<FragmentId>,
        Vec<FragmentId>,
    )> {
        let original_job_id = streaming_job.id();
        let job_type = streaming_job.job_type();

        let old_fragment_ids: Vec<FragmentId> = Fragment::find()
            .select_only()
            .column(fragment::Column::FragmentId)
            .filter(fragment::Column::JobId.eq(original_job_id))
            .into_tuple()
            .all(txn)
            .await?;
        let new_fragment_ids: Vec<FragmentId> = Fragment::find()
            .select_only()
            .column(fragment::Column::FragmentId)
            .filter(fragment::Column::JobId.eq(tmp_id))
            .into_tuple()
            .all(txn)
            .await?;

        let mut index_item_rewriter = None;
        let mut updated_iceberg_source_id: Option<SourceId> = None;

        match streaming_job {
            StreamingJob::Table(_, table, _table_job_type) => {
                let original_column_catalogs =
                    get_table_columns(txn, original_job_id.as_mv_table_id()).await?;
                let schema_changed = original_column_catalogs.to_protobuf() != table.columns;
                let is_iceberg = table
                    .engine
                    .and_then(|engine| PbEngine::try_from(engine).ok())
                    == Some(PbEngine::Iceberg);
                if is_iceberg && schema_changed {
                    // Keep the column catalog of the iceberg source in sync with the new table
                    // schema.
                    let iceberg_source_name = format!("{}{}", ICEBERG_SOURCE_PREFIX, table.name);
                    let source = Source::find()
                        .inner_join(Object)
                        .filter(
                            object::Column::DatabaseId
                                .eq(table.database_id)
                                .and(object::Column::SchemaId.eq(table.schema_id))
                                .and(source::Column::Name.eq(&iceberg_source_name)),
                        )
                        .one(txn)
                        .await?
                        .ok_or_else(|| {
                            MetaError::catalog_id_not_found("source", iceberg_source_name)
                        })?;

                    let source_id = source.source_id;
                    let source_version = source.version;
                    let source_columns: Vec<ColumnCatalog> = source
                        .columns
                        .to_protobuf()
                        .into_iter()
                        .map(ColumnCatalog::from)
                        .collect();
                    let source_row_id_index = source
                        .row_id_index
                        .and_then(|idx| usize::try_from(idx).ok());
                    let table_columns: Vec<ColumnCatalog> = table
                        .columns
                        .iter()
                        .cloned()
                        .map(ColumnCatalog::from)
                        .collect();
                    let (updated_columns, updated_row_id_index) =
                        Self::update_iceberg_source_columns(
                            &source_columns,
                            source_row_id_index,
                            &table_columns,
                        );
                    let updated_columns: Vec<PbColumnCatalog> = updated_columns
                        .into_iter()
                        .map(|col| col.to_protobuf())
                        .collect();

                    let mut source_active = source.into_active_model();
                    source_active.columns = Set(ColumnCatalogArray::from(updated_columns));
                    source_active.row_id_index = Set(updated_row_id_index.map(|idx| idx as i32));
                    source_active.version = Set(source_version + 1);
                    source_active.update(txn).await?;
                    updated_iceberg_source_id = Some(source_id);
                }

                index_item_rewriter = Some({
                    let original_columns = original_column_catalogs
                        .to_protobuf()
                        .into_iter()
                        .map(|c| c.column_desc.unwrap())
                        .collect_vec();
                    let new_columns = table
                        .columns
                        .iter()
                        .map(|c| c.column_desc.clone().unwrap())
                        .collect_vec();

                    IndexItemRewriter {
                        original_columns,
                        new_columns,
                    }
                });

                // Record the original target columns of the sinks into this table.
                for sink_id in updated_sink_catalogs {
                    Sink::update(sink::ActiveModel {
                        sink_id: Set(sink_id as _),
                        original_target_columns: Set(Some(original_column_catalogs.clone())),
                        ..Default::default()
                    })
                    .exec(txn)
                    .await?;
                }
                let mut table = table::ActiveModel::from(table);
                if let Some(drop_table_connector_ctx) = drop_table_connector_ctx
                    && drop_table_connector_ctx.to_change_streaming_job_id == original_job_id
                {
                    // Reset the associated source id since the table connector is dropped.
                    table.optional_associated_source_id = Set(None);
                }

                Table::update(table).exec(txn).await?;
            }
            StreamingJob::Source(source) => {
                let source = source::ActiveModel::from(source);
                Source::update(source).exec(txn).await?;
            }
            StreamingJob::MaterializedView(table) => {
                let table = table::ActiveModel::from(table);
                Table::update(table).exec(txn).await?;
            }
            _ => unreachable!(
                "invalid streaming job type: {:?}",
                streaming_job.job_type_str()
            ),
        }

        async fn finish_fragments(
            txn: &DatabaseTransaction,
            tmp_id: JobId,
            original_job_id: JobId,
            replace_upstream: FragmentReplaceUpstream,
        ) -> MetaResult<()> {
            // Point the state tables of the new fragments at their fragment ids.
            let fragment_info: Vec<(FragmentId, I32Array)> = Fragment::find()
                .select_only()
                .columns([
                    fragment::Column::FragmentId,
                    fragment::Column::StateTableIds,
                ])
                .filter(fragment::Column::JobId.eq(tmp_id))
                .into_tuple()
                .all(txn)
                .await?;
            for (fragment_id, state_table_ids) in fragment_info {
                for state_table_id in state_table_ids.into_inner() {
                    let state_table_id = TableId::new(state_table_id as _);
                    Table::update(table::ActiveModel {
                        table_id: Set(state_table_id),
                        fragment_id: Set(Some(fragment_id)),
                        ..Default::default()
                    })
                    .exec(txn)
                    .await?;
                }
            }

            // Drop the fragments of the original job and attach the new ones to it.
            Fragment::delete_many()
                .filter(fragment::Column::JobId.eq(original_job_id))
                .exec(txn)
                .await?;
            Fragment::update_many()
                .col_expr(fragment::Column::JobId, SimpleExpr::from(original_job_id))
                .filter(fragment::Column::JobId.eq(tmp_id))
                .exec(txn)
                .await?;

            // Rewrite the `Merge` nodes of downstream fragments to point at the new upstream
            // fragments.
            for (fragment_id, fragment_replace_map) in replace_upstream {
                let (fragment_id, mut stream_node) =
                    Fragment::find_by_id(fragment_id as FragmentId)
                        .select_only()
                        .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
                        .into_tuple::<(FragmentId, StreamNode)>()
                        .one(txn)
                        .await?
                        .map(|(id, node)| (id, node.to_protobuf()))
                        .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;

                visit_stream_node_mut(&mut stream_node, |body| {
                    if let PbNodeBody::Merge(m) = body
                        && let Some(new_fragment_id) =
                            fragment_replace_map.get(&m.upstream_fragment_id)
                    {
                        m.upstream_fragment_id = *new_fragment_id;
                    }
                });
                Fragment::update(fragment::ActiveModel {
                    fragment_id: Set(fragment_id),
                    stream_node: Set(StreamNode::from(&stream_node)),
                    ..Default::default()
                })
                .exec(txn)
                .await?;
            }

            Object::delete_by_id(tmp_id).exec(txn).await?;

            Ok(())
        }

        finish_fragments(txn, tmp_id, original_job_id, replace_upstream).await?;

        let mut objects = vec![];
        match job_type {
            StreamingJobType::Table(_) | StreamingJobType::MaterializedView => {
                let (table, table_obj) = Table::find_by_id(original_job_id.as_mv_table_id())
                    .find_also_related(Object)
                    .one(txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("object", original_job_id))?;
                let streaming_job = streaming_job::Entity::find_by_id(table.job_id())
                    .one(txn)
                    .await?;
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Table(
                        ObjectModel(table, table_obj.unwrap(), streaming_job).into(),
                    )),
                })
            }
            StreamingJobType::Source => {
                let (source, source_obj) =
                    Source::find_by_id(original_job_id.as_shared_source_id())
                        .find_also_related(Object)
                        .one(txn)
                        .await?
                        .ok_or_else(|| {
                            MetaError::catalog_id_not_found("object", original_job_id)
                        })?;
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Source(
                        ObjectModel(source, source_obj.unwrap(), None).into(),
                    )),
                })
            }
            _ => unreachable!("invalid streaming job type for replace: {:?}", job_type),
        }

        if let Some(source_id) = updated_iceberg_source_id {
            let (source, source_obj) = Source::find_by_id(source_id)
                .find_also_related(Object)
                .one(txn)
                .await?
                .ok_or_else(|| MetaError::catalog_id_not_found("source", source_id))?;
            objects.push(PbObject {
                object_info: Some(PbObjectInfo::Source(
                    ObjectModel(source, source_obj.unwrap(), None).into(),
                )),
            });
        }

        if let Some(expr_rewriter) = index_item_rewriter {
            let index_items: Vec<(IndexId, ExprNodeArray)> = Index::find()
                .select_only()
                .columns([index::Column::IndexId, index::Column::IndexItems])
                .filter(index::Column::PrimaryTableId.eq(original_job_id))
                .into_tuple()
                .all(txn)
                .await?;
            for (index_id, nodes) in index_items {
                let mut pb_nodes = nodes.to_protobuf();
                pb_nodes
                    .iter_mut()
                    .for_each(|x| expr_rewriter.rewrite_expr(x));
                let index = index::ActiveModel {
                    index_id: Set(index_id),
                    index_items: Set(pb_nodes.into()),
                    ..Default::default()
                }
                .update(txn)
                .await?;
                let (index_obj, streaming_job) = Object::find_by_id(index.index_id)
                    .find_also_related(streaming_job::Entity)
                    .one(txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("object", index.index_id))?;
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Index(
                        ObjectModel(index, index_obj, streaming_job).into(),
                    )),
                });
            }
        }

        if let Some(sinks) = auto_refresh_schema_sinks {
            for finish_sink_context in sinks {
                finish_fragments(
                    txn,
                    finish_sink_context.tmp_sink_id.as_job_id(),
                    finish_sink_context.original_sink_id.as_job_id(),
                    Default::default(),
                )
                .await?;
                let (mut sink, sink_obj) = Sink::find_by_id(finish_sink_context.original_sink_id)
                    .find_also_related(Object)
                    .one(txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("sink", original_job_id))?;
                let sink_streaming_job =
                    streaming_job::Entity::find_by_id(sink.sink_id.as_job_id())
                        .one(txn)
                        .await?;
                let columns = ColumnCatalogArray::from(finish_sink_context.columns);
                Sink::update(sink::ActiveModel {
                    sink_id: Set(finish_sink_context.original_sink_id),
                    columns: Set(columns.clone()),
                    ..Default::default()
                })
                .exec(txn)
                .await?;
                sink.columns = columns;
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Sink(
                        ObjectModel(sink, sink_obj.unwrap(), sink_streaming_job.clone()).into(),
                    )),
                });
                if let Some((log_store_table_id, new_log_store_table_columns)) =
                    finish_sink_context.new_log_store_table
                {
                    let new_log_store_table_columns: ColumnCatalogArray =
                        new_log_store_table_columns.into();
                    let (mut table, table_obj) = Table::find_by_id(log_store_table_id)
                        .find_also_related(Object)
                        .one(txn)
                        .await?
                        .ok_or_else(|| MetaError::catalog_id_not_found("table", original_job_id))?;
                    Table::update(table::ActiveModel {
                        table_id: Set(log_store_table_id),
                        columns: Set(new_log_store_table_columns.clone()),
                        ..Default::default()
                    })
                    .exec(txn)
                    .await?;
                    table.columns = new_log_store_table_columns;
                    objects.push(PbObject {
                        object_info: Some(PbObjectInfo::Table(
                            ObjectModel(table, table_obj.unwrap(), sink_streaming_job.clone())
                                .into(),
                        )),
                    });
                }
            }
        }

        let mut notification_objs: Option<(Vec<PbUserInfo>, Vec<PartialObject>)> = None;
        if let Some(drop_table_connector_ctx) = drop_table_connector_ctx {
            notification_objs =
                Some(Self::drop_table_associated_source(txn, drop_table_connector_ctx).await?);
        }

        Ok((
            objects,
            notification_objs,
            old_fragment_ids,
            new_fragment_ids,
        ))
    }

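    /// Abort a replace operation by dropping the temporary job (and any temporary sinks created
    /// for auto schema refresh), then revoke the serving mappings of their fragments.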
    pub async fn try_abort_replacing_streaming_job(
        &self,
        tmp_job_id: JobId,
        tmp_sink_ids: Option<Vec<ObjectId>>,
    ) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        let mut all_job_ids: Vec<ObjectId> = vec![tmp_job_id.into()];
        if let Some(ref sink_ids) = tmp_sink_ids {
            all_job_ids.extend(sink_ids.iter().copied());
        }
        let abort_fragment_ids: Vec<FragmentId> = Fragment::find()
            .select_only()
            .column(fragment::Column::FragmentId)
            .filter(fragment::Column::JobId.is_in(all_job_ids))
            .into_tuple()
            .all(&txn)
            .await?;

        Object::delete_by_id(tmp_job_id).exec(&txn).await?;
        if let Some(tmp_sink_ids) = tmp_sink_ids {
            for tmp_sink_id in tmp_sink_ids {
                Object::delete_by_id(tmp_sink_id).exec(&txn).await?;
            }
        }
        txn.commit().await?;

        self.env
            .notification_manager()
            .notify_serving_fragment_mapping_delete(
                abort_fragment_ids.iter().map(|id| *id as _).collect(),
            );

        Ok(())
    }

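    /// Update the rate limit of a source and propagate it to every fragment that reads from it
    /// (including `StreamFsFetch` fragments of file-system sources). Returns the ids of the
    /// affected jobs and fragments so the new value can be applied to running actors.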
    pub async fn update_source_rate_limit_by_source_id(
        &self,
        source_id: SourceId,
        rate_limit: Option<u32>,
    ) -> MetaResult<(HashSet<JobId>, HashSet<FragmentId>)> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        {
            let active_source = source::ActiveModel {
                source_id: Set(source_id),
                rate_limit: Set(rate_limit.map(|v| v as i32)),
                ..Default::default()
            };
            Source::update(active_source).exec(&txn).await?;
        }

        let (source, obj) = Source::find_by_id(source_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| {
                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
            })?;

        let is_fs_source = source.with_properties.inner_ref().is_new_fs_connector();
        let streaming_job_ids: Vec<JobId> =
            if let Some(table_id) = source.optional_associated_table_id {
                vec![table_id.as_job_id()]
            } else if let Some(source_info) = &source.source_info
                && source_info.to_protobuf().is_shared()
            {
                vec![source_id.as_share_source_job_id()]
            } else {
                ObjectDependency::find()
                    .select_only()
                    .column(object_dependency::Column::UsedBy)
                    .filter(object_dependency::Column::Oid.eq(source_id))
                    .into_tuple()
                    .all(&txn)
                    .await?
            };

        if streaming_job_ids.is_empty() {
            return Err(MetaError::invalid_parameter(format!(
                "source id {source_id} not used by any streaming job"
            )));
        }

        let fragments: Vec<(FragmentId, JobId, i32, StreamNode)> = Fragment::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::JobId,
                fragment::Column::FragmentTypeMask,
                fragment::Column::StreamNode,
            ])
            .filter(fragment::Column::JobId.is_in(streaming_job_ids))
            .into_tuple()
            .all(&txn)
            .await?;
        let mut fragments = fragments
            .into_iter()
            .map(|(id, job_id, mask, stream_node)| {
                (
                    id,
                    job_id,
                    FragmentTypeMask::from(mask as u32),
                    stream_node.to_protobuf(),
                )
            })
            .collect_vec();

        fragments.retain_mut(|(_, _, fragment_type_mask, stream_node)| {
            let mut found = false;
            if fragment_type_mask.contains(FragmentTypeFlag::Source) {
                visit_stream_node_mut(stream_node, |node| {
                    if let PbNodeBody::Source(node) = node
                        && let Some(node_inner) = &mut node.source_inner
                        && node_inner.source_id == source_id
                    {
                        node_inner.rate_limit = rate_limit;
                        found = true;
                    }
                });
            }
            if is_fs_source {
                visit_stream_node_mut(stream_node, |node| {
                    if let PbNodeBody::StreamFsFetch(node) = node {
                        fragment_type_mask.add(FragmentTypeFlag::FsFetch);
                        if let Some(node_inner) = &mut node.node_inner
                            && node_inner.source_id == source_id
                        {
                            node_inner.rate_limit = rate_limit;
                            found = true;
                        }
                    }
                });
            }
            found
        });

        assert!(
            !fragments.is_empty(),
            "source id should be used by at least one fragment"
        );

        let (fragment_ids, job_ids) = fragments
            .iter()
            .map(|(fragment_id, job_id, _, _)| (*fragment_id, *job_id))
            .unzip();

        for (fragment_id, _, fragment_type_mask, stream_node) in fragments {
            Fragment::update(fragment::ActiveModel {
                fragment_id: Set(fragment_id),
                fragment_type_mask: Set(fragment_type_mask.into()),
                stream_node: Set(StreamNode::from(&stream_node)),
                ..Default::default()
            })
            .exec(&txn)
            .await?;
        }

        txn.commit().await?;

        let relation_info = PbObjectInfo::Source(ObjectModel(source, obj.unwrap(), None).into());
        let _version = self
            .notify_frontend(
                NotificationOperation::Update,
                NotificationInfo::ObjectGroup(PbObjectGroup {
                    objects: vec![PbObject {
                        object_info: Some(relation_info),
                    }],
                    dependencies: vec![],
                }),
            )
            .await;

        Ok((job_ids, fragment_ids))
    }

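    /// Applies `fragments_mutation_fn` to the stream node of every fragment of a job and
    /// persists the fragments for which the closure returned `Ok(true)`; errors with
    /// `err_msg` when no fragment matched. This is the shared machinery behind the
    /// per-job rate-limit setters below.
    ///
    /// A minimal sketch of a custom mutation (the closure is a hypothetical example):
    ///
    /// ```ignore
    /// let fragment_ids = controller
    ///     .mutate_fragments_by_job_id(
    ///         job_id,
    ///         |_mask, stream_node| {
    ///             let mut found = false;
    ///             visit_stream_node_mut(stream_node, |node| {
    ///                 if let PbNodeBody::Dml(node) = node {
    ///                     node.rate_limit = Some(500);
    ///                     found = true;
    ///                 }
    ///             });
    ///             Ok(found)
    ///         },
    ///         "dml node not found",
    ///     )
    ///     .await?;
    /// ```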
    pub async fn mutate_fragments_by_job_id(
        &self,
        job_id: JobId,
        mut fragments_mutation_fn: impl FnMut(FragmentTypeMask, &mut PbStreamNode) -> MetaResult<bool>,
        err_msg: &'static str,
    ) -> MetaResult<HashSet<FragmentId>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::FragmentTypeMask,
                fragment::Column::StreamNode,
            ])
            .filter(fragment::Column::JobId.eq(job_id))
            .into_tuple()
            .all(&txn)
            .await?;
        let mut fragments = fragments
            .into_iter()
            .map(|(id, mask, stream_node)| {
                (id, FragmentTypeMask::from(mask), stream_node.to_protobuf())
            })
            .collect_vec();

        let fragments = fragments
            .iter_mut()
            .map(|(_, fragment_type_mask, stream_node)| {
                fragments_mutation_fn(*fragment_type_mask, stream_node)
            })
            .collect::<MetaResult<Vec<bool>>>()?
            .into_iter()
            .zip_eq_debug(std::mem::take(&mut fragments))
            .filter_map(|(keep, fragment)| if keep { Some(fragment) } else { None })
            .collect::<Vec<_>>();

        if fragments.is_empty() {
            return Err(MetaError::invalid_parameter(format!(
                "job id {job_id}: {}",
                err_msg
            )));
        }

        let fragment_ids: HashSet<FragmentId> = fragments.iter().map(|(id, _, _)| *id).collect();
        for (id, _, stream_node) in fragments {
            Fragment::update(fragment::ActiveModel {
                fragment_id: Set(id),
                stream_node: Set(StreamNode::from(&stream_node)),
                ..Default::default()
            })
            .exec(&txn)
            .await?;
        }

        txn.commit().await?;

        Ok(fragment_ids)
    }

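    /// Single-fragment counterpart of [`Self::mutate_fragments_by_job_id`]: loads one
    /// fragment by id, lets `fragment_mutation_fn` rewrite its stream node, and persists
    /// the result; errors with `err_msg` when the closure reports no match.
    ///
    /// Sketch of a call from inside this controller (hypothetical closure):
    ///
    /// ```ignore
    /// self.mutate_fragment_by_fragment_id(
    ///     fragment_id,
    ///     |_mask, stream_node| {
    ///         // ... edit `stream_node` in place, return whether anything matched ...
    ///         true
    ///     },
    ///     "no matching node",
    /// )
    /// .await?;
    /// ```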
    async fn mutate_fragment_by_fragment_id(
        &self,
        fragment_id: FragmentId,
        mut fragment_mutation_fn: impl FnMut(FragmentTypeMask, &mut PbStreamNode) -> bool,
        err_msg: &'static str,
    ) -> MetaResult<()> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let (fragment_type_mask, stream_node): (i32, StreamNode) =
            Fragment::find_by_id(fragment_id)
                .select_only()
                .columns([
                    fragment::Column::FragmentTypeMask,
                    fragment::Column::StreamNode,
                ])
                .into_tuple()
                .one(&txn)
                .await?
                .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
        let mut pb_stream_node = stream_node.to_protobuf();
        let fragment_type_mask = FragmentTypeMask::from(fragment_type_mask);

        if !fragment_mutation_fn(fragment_type_mask, &mut pb_stream_node) {
            return Err(MetaError::invalid_parameter(format!(
                "fragment id {fragment_id}: {}",
                err_msg
            )));
        }

        // Persist the mutated node, not the original snapshot loaded above.
        Fragment::update(fragment::ActiveModel {
            fragment_id: Set(fragment_id),
            stream_node: Set(StreamNode::from(&pb_stream_node)),
            ..Default::default()
        })
        .exec(&txn)
        .await?;

        txn.commit().await?;

        Ok(())
    }

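    /// Overwrites the persisted backfill orders of a streaming job (`None` clears them),
    /// refusing to touch jobs that have already been cancelled.
    ///
    /// Sketch (assuming a previously built `BackfillOrders` value):
    ///
    /// ```ignore
    /// controller.update_backfill_orders_by_job_id(job_id, Some(orders)).await?;
    /// ```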
    pub async fn update_backfill_orders_by_job_id(
        &self,
        job_id: JobId,
        backfill_orders: Option<BackfillOrders>,
    ) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        ensure_job_not_canceled(job_id, &txn).await?;

        streaming_job::ActiveModel {
            job_id: Set(job_id),
            backfill_orders: Set(backfill_orders),
            ..Default::default()
        }
        .update(&txn)
        .await?;

        txn.commit().await?;

        Ok(())
    }

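    /// Sets the backfill rate limit on every backfill-capable node of a job
    /// (`StreamCdcScan`, `StreamScan`, and `SourceBackfill`), returning the ids of the
    /// fragments that were rewritten.
    ///
    /// Sketch (hypothetical id; `None` removes the limit):
    ///
    /// ```ignore
    /// let fragment_ids = controller
    ///     .update_backfill_rate_limit_by_job_id(job_id, Some(4096))
    ///     .await?;
    /// ```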
    pub async fn update_backfill_rate_limit_by_job_id(
        &self,
        job_id: JobId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashSet<FragmentId>> {
        let update_backfill_rate_limit =
            |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
                let mut found = false;
                if fragment_type_mask
                    .contains_any(FragmentTypeFlag::backfill_rate_limit_fragments())
                {
                    visit_stream_node_mut(stream_node, |node| match node {
                        PbNodeBody::StreamCdcScan(node) => {
                            node.rate_limit = rate_limit;
                            found = true;
                        }
                        PbNodeBody::StreamScan(node) => {
                            node.rate_limit = rate_limit;
                            found = true;
                        }
                        PbNodeBody::SourceBackfill(node) => {
                            node.rate_limit = rate_limit;
                            found = true;
                        }
                        _ => {}
                    });
                }
                Ok(found)
            };

        self.mutate_fragments_by_job_id(
            job_id,
            update_backfill_rate_limit,
            "stream scan node or source node not found",
        )
        .await
    }

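    /// Sets the rate limit on the `Sink` node of a sink job. Only decoupled sinks
    /// backed by a kv log store support this, so other sinks are rejected with a hint
    /// to enable sink decoupling.
    ///
    /// Sketch (hypothetical sink id):
    ///
    /// ```ignore
    /// let fragment_ids = controller
    ///     .update_sink_rate_limit_by_job_id(sink_id, Some(2048))
    ///     .await?;
    /// ```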
    pub async fn update_sink_rate_limit_by_job_id(
        &self,
        sink_id: SinkId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashSet<FragmentId>> {
        let update_sink_rate_limit =
            |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
                let mut found = Ok(false);
                if fragment_type_mask.contains_any(FragmentTypeFlag::sink_rate_limit_fragments()) {
                    visit_stream_node_mut(stream_node, |node| {
                        if let PbNodeBody::Sink(node) = node {
                            if node.log_store_type != PbSinkLogStoreType::KvLogStore as i32 {
                                found = Err(MetaError::invalid_parameter(
                                    "sink rate limit is only supported for kv log store, please SET sink_decouple = TRUE before CREATE SINK",
                                ));
                                return;
                            }
                            node.rate_limit = rate_limit;
                            found = Ok(true);
                        }
                    });
                }
                found
            };

        self.mutate_fragments_by_job_id(
            sink_id.as_job_id(),
            update_sink_rate_limit,
            "sink node not found",
        )
        .await
    }

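    /// Sets the DML rate limit on every `Dml` node of a job, returning the rewritten
    /// fragment ids.
    ///
    /// Sketch:
    ///
    /// ```ignore
    /// let fragment_ids = controller.update_dml_rate_limit_by_job_id(job_id, None).await?;
    /// ```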
    pub async fn update_dml_rate_limit_by_job_id(
        &self,
        job_id: JobId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashSet<FragmentId>> {
        let update_dml_rate_limit =
            |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
                let mut found = false;
                if fragment_type_mask.contains_any(FragmentTypeFlag::dml_rate_limit_fragments()) {
                    visit_stream_node_mut(stream_node, |node| {
                        if let PbNodeBody::Dml(node) = node {
                            node.rate_limit = rate_limit;
                            found = true;
                        }
                    });
                }
                Ok(found)
            };

        self.mutate_fragments_by_job_id(job_id, update_dml_rate_limit, "dml node not found")
            .await
    }

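    /// Alters the connector properties (and secret references) of a source in place:
    /// validates the change, rewrites the stored `CREATE SOURCE` / `CREATE TABLE`
    /// definition, adjusts secret dependencies, and pushes the new properties into all
    /// affected source fragments before notifying frontends. Changing the connector
    /// type itself is rejected. Returns the fully resolved options.
    ///
    /// Sketch (the property key is a hypothetical example; no secret changes):
    ///
    /// ```ignore
    /// let resolved = controller
    ///     .update_source_props_by_source_id(
    ///         source_id,
    ///         BTreeMap::from([("properties.fetch.max.bytes".to_owned(), "1048576".to_owned())]),
    ///         BTreeMap::new(),
    ///         false,
    ///     )
    ///     .await?;
    /// ```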
    pub async fn update_source_props_by_source_id(
        &self,
        source_id: SourceId,
        alter_props: BTreeMap<String, String>,
        alter_secret_refs: BTreeMap<String, PbSecretRef>,
        skip_alter_on_fly_check: bool,
    ) -> MetaResult<WithOptionsSecResolved> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let (source, _obj) = Source::find_by_id(source_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| {
                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
            })?;
        let connector = source.with_properties.0.get_connector().unwrap();
        let is_shared_source = source.is_shared();

        let mut dep_source_job_ids: Vec<JobId> = Vec::new();
        if !is_shared_source {
            dep_source_job_ids = ObjectDependency::find()
                .select_only()
                .column(object_dependency::Column::UsedBy)
                .filter(object_dependency::Column::Oid.eq(source_id))
                .into_tuple()
                .all(&txn)
                .await?;
        }

        if let Some(new_connector) = alter_props.get(UPSTREAM_SOURCE_KEY)
            && new_connector != &connector
        {
            return Err(MetaError::invalid_parameter(format!(
                "Cannot change connector type from '{}' to '{}'. Drop and recreate the source instead.",
                connector, new_connector
            )));
        }

        if !skip_alter_on_fly_check {
            let prop_keys: Vec<String> = alter_props
                .keys()
                .chain(alter_secret_refs.keys())
                .cloned()
                .collect();
            risingwave_connector::allow_alter_on_fly_fields::check_source_allow_alter_on_fly_fields(
                &connector, &prop_keys,
            )?;
        }

        let mut options_with_secret = WithOptionsSecResolved::new(
            source.with_properties.0.clone(),
            source
                .secret_ref
                .map(|secret_ref| secret_ref.to_protobuf())
                .unwrap_or_default(),
        );
        let (to_add_secret_dep, to_remove_secret_dep) =
            options_with_secret.handle_update(alter_props, alter_secret_refs)?;

        tracing::info!(
            "applying new properties to source: source_id={}, options_with_secret={:?}",
            source_id,
            options_with_secret
        );
        // Validate that the merged options still parse into connector properties.
        let _ = ConnectorProperties::extract(options_with_secret.clone(), true)?;

        let mut associate_table_id = None;
        // For a table with a connector, secret dependencies are attached to the table
        // object; otherwise to the source object itself.
        let mut preferred_id = source_id.as_object_id();
        let rewrite_sql = {
            let definition = source.definition.clone();

            let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
                .map_err(|e| {
                    MetaError::from(MetaErrorInner::Connector(ConnectorError::from(
                        anyhow!(e).context("Failed to parse source definition SQL"),
                    )))
                })?
                .try_into()
                .unwrap();

            async fn format_with_option_secret_resolved(
                txn: &DatabaseTransaction,
                options_with_secret: &WithOptionsSecResolved,
            ) -> MetaResult<Vec<SqlOption>> {
                let mut options = Vec::new();
                for (k, v) in options_with_secret.as_plaintext() {
                    let sql_option = SqlOption::try_from((k, &format!("'{}'", v)))
                        .map_err(|e| MetaError::invalid_parameter(e.to_report_string()))?;
                    options.push(sql_option);
                }
                for (k, v) in options_with_secret.as_secret() {
                    if let Some(secret_model) = Secret::find_by_id(v.secret_id).one(txn).await? {
                        let sql_option =
                            SqlOption::try_from((k, &format!("SECRET {}", secret_model.name)))
                                .map_err(|e| MetaError::invalid_parameter(e.to_report_string()))?;
                        options.push(sql_option);
                    } else {
                        return Err(MetaError::catalog_id_not_found("secret", v.secret_id));
                    }
                }
                Ok(options)
            }

            match &mut stmt {
                Statement::CreateSource { stmt } => {
                    stmt.with_properties.0 =
                        format_with_option_secret_resolved(&txn, &options_with_secret).await?;
                }
                Statement::CreateTable { with_options, .. } => {
                    *with_options =
                        format_with_option_secret_resolved(&txn, &options_with_secret).await?;
                    associate_table_id = source.optional_associated_table_id;
                    preferred_id = associate_table_id.unwrap().as_object_id();
                }
                _ => unreachable!(),
            }

            stmt.to_string()
        };

        {
            if !to_add_secret_dep.is_empty() {
                ObjectDependency::insert_many(to_add_secret_dep.into_iter().map(|secret_id| {
                    object_dependency::ActiveModel {
                        oid: Set(secret_id.into()),
                        used_by: Set(preferred_id),
                        ..Default::default()
                    }
                }))
                .exec(&txn)
                .await?;
            }
            if !to_remove_secret_dep.is_empty() {
                let _ = ObjectDependency::delete_many()
                    .filter(
                        object_dependency::Column::Oid
                            .is_in(to_remove_secret_dep)
                            .and(object_dependency::Column::UsedBy.eq(preferred_id)),
                    )
                    .exec(&txn)
                    .await?;
            }
        }

        let active_source_model = source::ActiveModel {
            source_id: Set(source_id),
            definition: Set(rewrite_sql.clone()),
            with_properties: Set(options_with_secret.as_plaintext().clone().into()),
            secret_ref: Set((!options_with_secret.as_secret().is_empty())
                .then(|| SecretRef::from(options_with_secret.as_secret().clone()))),
            ..Default::default()
        };
        Source::update(active_source_model).exec(&txn).await?;

        if let Some(associate_table_id) = associate_table_id {
            let active_table_model = table::ActiveModel {
                table_id: Set(associate_table_id),
                definition: Set(rewrite_sql),
                ..Default::default()
            };
            Table::update(active_table_model).exec(&txn).await?;
        }

        let to_check_job_ids = vec![if let Some(associate_table_id) = associate_table_id {
            associate_table_id.as_job_id()
        } else {
            source_id.as_share_source_job_id()
        }]
        .into_iter()
        .chain(dep_source_job_ids.into_iter())
        .collect_vec();

        update_connector_props_fragments(
            &txn,
            to_check_job_ids,
            FragmentTypeFlag::Source,
            |node, found| {
                if let PbNodeBody::Source(node) = node
                    && let Some(source_inner) = &mut node.source_inner
                {
                    source_inner.with_properties = options_with_secret.as_plaintext().clone();
                    source_inner.secret_refs = options_with_secret.as_secret().clone();
                    *found = true;
                }
            },
            is_shared_source,
        )
        .await?;

        let mut to_update_objs = Vec::with_capacity(2);
        let (source, obj) = Source::find_by_id(source_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| {
                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
            })?;
        to_update_objs.push(PbObject {
            object_info: Some(PbObjectInfo::Source(
                ObjectModel(source, obj.unwrap(), None).into(),
            )),
        });

        if let Some(associate_table_id) = associate_table_id {
            let (table, obj) = Table::find_by_id(associate_table_id)
                .find_also_related(Object)
                .one(&txn)
                .await?
                .ok_or_else(|| MetaError::catalog_id_not_found("table", associate_table_id))?;
            let streaming_job = streaming_job::Entity::find_by_id(table.job_id())
                .one(&txn)
                .await?;
            to_update_objs.push(PbObject {
                object_info: Some(PbObjectInfo::Table(
                    ObjectModel(table, obj.unwrap(), streaming_job).into(),
                )),
            });
        }

        txn.commit().await?;

        self.notify_frontend(
            NotificationOperation::Update,
            NotificationInfo::ObjectGroup(PbObjectGroup {
                objects: to_update_objs,
                dependencies: vec![],
            }),
        )
        .await;

        Ok(options_with_secret)
    }

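    /// Alters the connector properties of a sink: validates the new values against the
    /// sink's connector, rewrites the stored `CREATE SINK` definition, updates the sink
    /// fragments' `SinkDesc`, and notifies frontends. Returns the applied property map.
    ///
    /// Sketch (the property key is a hypothetical example):
    ///
    /// ```ignore
    /// let applied = controller
    ///     .update_sink_props_by_sink_id(
    ///         sink_id,
    ///         BTreeMap::from([("max_retry_num".to_owned(), "5".to_owned())]),
    ///     )
    ///     .await?;
    /// ```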
    pub async fn update_sink_props_by_sink_id(
        &self,
        sink_id: SinkId,
        props: BTreeMap<String, String>,
    ) -> MetaResult<HashMap<String, String>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let (sink, _obj) = Sink::find_by_id(sink_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
        validate_sink_props(&sink, &props)?;
        let definition = sink.definition.clone();
        let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
            .map_err(|e| SinkError::Config(anyhow!(e)))?
            .try_into()
            .unwrap();
        if let Statement::CreateSink { stmt } = &mut stmt {
            update_stmt_with_props(&mut stmt.with_properties.0, &props)?;
        } else {
            panic!("definition is not a create sink statement")
        }
        let mut new_config = sink.properties.clone().into_inner();
        new_config.extend(props.clone());

        let definition = stmt.to_string();
        let active_sink = sink::ActiveModel {
            sink_id: Set(sink_id),
            properties: Set(risingwave_meta_model::Property(new_config.clone())),
            definition: Set(definition),
            ..Default::default()
        };
        Sink::update(active_sink).exec(&txn).await?;

        update_sink_fragment_props(&txn, sink_id, new_config).await?;
        let (sink, obj) = Sink::find_by_id(sink_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
        let streaming_job = streaming_job::Entity::find_by_id(sink.sink_id.as_job_id())
            .one(&txn)
            .await?;
        txn.commit().await?;
        let relation_infos = vec![PbObject {
            object_info: Some(PbObjectInfo::Sink(
                ObjectModel(sink, obj.unwrap(), streaming_job).into(),
            )),
        }];

        let _version = self
            .notify_frontend(
                NotificationOperation::Update,
                NotificationInfo::ObjectGroup(PbObjectGroup {
                    objects: relation_infos,
                    dependencies: vec![],
                }),
            )
            .await;

        Ok(props.into_iter().collect())
    }

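    /// Iceberg-table variant of [`Self::update_sink_props_by_sink_id`]: an iceberg table
    /// is backed by a hidden sink/source pair, so the new properties are written to the
    /// sink, and the rewritten definition to the sink, source, and table catalogs alike.
    /// The caller must pass the hidden sink and source ids via the request's extra
    /// options.
    ///
    /// Sketch (hypothetical ids wired through the request proto):
    ///
    /// ```ignore
    /// let extra = PbExtraOptions::AlterIcebergTableIds(AlterIcebergTableIds {
    ///     sink_id: hidden_sink_id,
    ///     source_id: hidden_source_id,
    /// });
    /// let (applied, sink_id) = controller
    ///     .update_iceberg_table_props_by_table_id(table_id, props, Some(extra))
    ///     .await?;
    /// ```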
    pub async fn update_iceberg_table_props_by_table_id(
        &self,
        table_id: TableId,
        props: BTreeMap<String, String>,
        alter_iceberg_table_props: Option<
            risingwave_pb::meta::alter_connector_props_request::PbExtraOptions,
        >,
    ) -> MetaResult<(HashMap<String, String>, SinkId)> {
        let risingwave_pb::meta::alter_connector_props_request::PbExtraOptions::AlterIcebergTableIds(
            AlterIcebergTableIds { sink_id, source_id },
        ) = alter_iceberg_table_props
            .ok_or_else(|| MetaError::invalid_parameter("alter_iceberg_table_props is required"))?;
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let (sink, _obj) = Sink::find_by_id(sink_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
        validate_sink_props(&sink, &props)?;

        let definition = sink.definition.clone();
        let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
            .map_err(|e| SinkError::Config(anyhow!(e)))?
            .try_into()
            .unwrap();
        if let Statement::CreateTable {
            with_options,
            engine,
            ..
        } = &mut stmt
        {
            if !matches!(engine, Engine::Iceberg) {
                return Err(SinkError::Config(anyhow!(
                    "only iceberg table can be altered as sink"
                ))
                .into());
            }
            update_stmt_with_props(with_options, &props)?;
        } else {
            panic!("definition is not a create iceberg table statement")
        }
        let mut new_config = sink.properties.clone().into_inner();
        new_config.extend(props.clone());

        let definition = stmt.to_string();
        let active_sink = sink::ActiveModel {
            sink_id: Set(sink_id),
            properties: Set(risingwave_meta_model::Property(new_config.clone())),
            definition: Set(definition.clone()),
            ..Default::default()
        };
        let active_source = source::ActiveModel {
            source_id: Set(source_id),
            definition: Set(definition.clone()),
            ..Default::default()
        };
        let active_table = table::ActiveModel {
            table_id: Set(table_id),
            definition: Set(definition),
            ..Default::default()
        };
        Sink::update(active_sink).exec(&txn).await?;
        Source::update(active_source).exec(&txn).await?;
        Table::update(active_table).exec(&txn).await?;

        update_sink_fragment_props(&txn, sink_id, new_config).await?;

        let (sink, sink_obj) = Sink::find_by_id(sink_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
        let sink_streaming_job = streaming_job::Entity::find_by_id(sink.sink_id.as_job_id())
            .one(&txn)
            .await?;
        let (source, source_obj) = Source::find_by_id(source_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| {
                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
            })?;
        let (table, table_obj) = Table::find_by_id(table_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Table.as_str(), table_id))?;
        let table_streaming_job = streaming_job::Entity::find_by_id(table.job_id())
            .one(&txn)
            .await?;
        txn.commit().await?;
        let relation_infos = vec![
            PbObject {
                object_info: Some(PbObjectInfo::Sink(
                    ObjectModel(sink, sink_obj.unwrap(), sink_streaming_job).into(),
                )),
            },
            PbObject {
                object_info: Some(PbObjectInfo::Source(
                    ObjectModel(source, source_obj.unwrap(), None).into(),
                )),
            },
            PbObject {
                object_info: Some(PbObjectInfo::Table(
                    ObjectModel(table, table_obj.unwrap(), table_streaming_job).into(),
                )),
            },
        ];
        let _version = self
            .notify_frontend(
                NotificationOperation::Update,
                NotificationInfo::ObjectGroup(PbObjectGroup {
                    objects: relation_infos,
                    dependencies: vec![],
                }),
            )
            .await;

        Ok((props.into_iter().collect(), sink_id))
    }

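    /// Alters the properties/secret refs of a connection and cascades the change to
    /// every source and sink that references it: the merged options are validated once
    /// at the connection level, then re-resolved per dependent object and pushed into
    /// its catalog row and fragments. Returns the resolved connection options plus the
    /// fully resolved (secrets filled) properties of each updated source and sink.
    ///
    /// Sketch (the property key is a hypothetical example; no secret changes):
    ///
    /// ```ignore
    /// let (conn_opts, sources, sinks) = controller
    ///     .update_connection_and_dependent_objects_props(
    ///         connection_id,
    ///         BTreeMap::from([("properties.request.timeout.ms".to_owned(), "30000".to_owned())]),
    ///         BTreeMap::new(),
    ///     )
    ///     .await?;
    /// ```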
    pub async fn update_connection_and_dependent_objects_props(
        &self,
        connection_id: ConnectionId,
        alter_props: BTreeMap<String, String>,
        alter_secret_refs: BTreeMap<String, PbSecretRef>,
    ) -> MetaResult<(
        WithOptionsSecResolved,
        Vec<(SourceId, HashMap<String, String>)>,
        Vec<(SinkId, HashMap<String, String>)>,
    )> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let dependent_sources: Vec<SourceId> = Source::find()
            .select_only()
            .column(source::Column::SourceId)
            .filter(source::Column::ConnectionId.eq(connection_id))
            .into_tuple()
            .all(&txn)
            .await?;

        let dependent_sinks: Vec<SinkId> = Sink::find()
            .select_only()
            .column(sink::Column::SinkId)
            .filter(sink::Column::ConnectionId.eq(connection_id))
            .into_tuple()
            .all(&txn)
            .await?;

        let (connection_catalog, _obj) = Connection::find_by_id(connection_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| {
                MetaError::catalog_id_not_found(ObjectType::Connection.as_str(), connection_id)
            })?;

        let prop_keys: Vec<String> = alter_props
            .keys()
            .chain(alter_secret_refs.keys())
            .cloned()
            .collect();

        let connection_type_str = pb_connection_type_to_connection_type(
            &connection_catalog.params.to_protobuf().connection_type(),
        )
        .ok_or_else(|| MetaError::invalid_parameter("Unspecified connection type"))?;

        risingwave_connector::allow_alter_on_fly_fields::check_connection_allow_alter_on_fly_fields(
            connection_type_str, &prop_keys,
        )?;

        let connection_pb = connection_catalog.params.to_protobuf();
        let mut connection_options_with_secret = WithOptionsSecResolved::new(
            connection_pb.properties.into_iter().collect(),
            connection_pb.secret_refs.into_iter().collect(),
        );

        let (to_add_secret_dep, to_remove_secret_dep) = connection_options_with_secret
            .handle_update(alter_props.clone(), alter_secret_refs.clone())?;

        tracing::debug!(
            "applying new properties to connection and dependents: connection_id={}, sources={:?}, sinks={:?}",
            connection_id,
            dependent_sources,
            dependent_sinks
        );

        {
            let conn_params_pb = risingwave_pb::catalog::ConnectionParams {
                connection_type: connection_pb.connection_type,
                properties: connection_options_with_secret
                    .as_plaintext()
                    .clone()
                    .into_iter()
                    .collect(),
                secret_refs: connection_options_with_secret
                    .as_secret()
                    .clone()
                    .into_iter()
                    .collect(),
            };
            let connection = PbConnection {
                id: connection_id as _,
                info: Some(risingwave_pb::catalog::connection::Info::ConnectionParams(
                    conn_params_pb,
                )),
                ..Default::default()
            };
            validate_connection(&connection).await?;
        }

        if !to_add_secret_dep.is_empty() {
            ObjectDependency::insert_many(to_add_secret_dep.into_iter().map(|secret_id| {
                object_dependency::ActiveModel {
                    oid: Set(secret_id.into()),
                    used_by: Set(connection_id.as_object_id()),
                    ..Default::default()
                }
            }))
            .exec(&txn)
            .await?;
        }
        if !to_remove_secret_dep.is_empty() {
            let _ = ObjectDependency::delete_many()
                .filter(
                    object_dependency::Column::Oid
                        .is_in(to_remove_secret_dep)
                        .and(object_dependency::Column::UsedBy.eq(connection_id.as_object_id())),
                )
                .exec(&txn)
                .await?;
        }

        let updated_connection_params = risingwave_pb::catalog::ConnectionParams {
            connection_type: connection_pb.connection_type,
            properties: connection_options_with_secret
                .as_plaintext()
                .clone()
                .into_iter()
                .collect(),
            secret_refs: connection_options_with_secret
                .as_secret()
                .clone()
                .into_iter()
                .collect(),
        };
        let active_connection_model = connection::ActiveModel {
            connection_id: Set(connection_id),
            params: Set(ConnectionParams::from(&updated_connection_params)),
            ..Default::default()
        };
        Connection::update(active_connection_model)
            .exec(&txn)
            .await?;

        let mut updated_sources_with_props: Vec<(SourceId, HashMap<String, String>)> = Vec::new();

        if !dependent_sources.is_empty() {
            let sources_with_objs = Source::find()
                .find_also_related(Object)
                .filter(source::Column::SourceId.is_in(dependent_sources.iter().cloned()))
                .all(&txn)
                .await?;

            let mut source_updates = Vec::new();
            let mut fragment_updates: Vec<DependentSourceFragmentUpdate> = Vec::new();

            for (source, _obj) in sources_with_objs {
                let source_id = source.source_id;

                let mut source_options_with_secret = WithOptionsSecResolved::new(
                    source.with_properties.0.clone(),
                    source
                        .secret_ref
                        .clone()
                        .map(|secret_ref| secret_ref.to_protobuf())
                        .unwrap_or_default(),
                );
                let (_source_to_add_secret_dep, _source_to_remove_secret_dep) =
                    source_options_with_secret
                        .handle_update(alter_props.clone(), alter_secret_refs.clone())?;

                // Validate that the merged options still parse into connector properties.
                let _ = ConnectorProperties::extract(source_options_with_secret.clone(), true)?;

                let active_source = source::ActiveModel {
                    source_id: Set(source_id),
                    with_properties: Set(Property(
                        source_options_with_secret.as_plaintext().clone(),
                    )),
                    secret_ref: Set((!source_options_with_secret.as_secret().is_empty()).then(
                        || {
                            risingwave_meta_model::SecretRef::from(
                                source_options_with_secret.as_secret().clone(),
                            )
                        },
                    )),
                    ..Default::default()
                };
                source_updates.push(active_source);

                let is_shared_source = source.is_shared();
                let mut dep_source_job_ids: Vec<JobId> = Vec::new();
                if !is_shared_source {
                    dep_source_job_ids = ObjectDependency::find()
                        .select_only()
                        .column(object_dependency::Column::UsedBy)
                        .filter(object_dependency::Column::Oid.eq(source_id))
                        .into_tuple()
                        .all(&txn)
                        .await?;
                }

                let base_job_id =
                    if let Some(associate_table_id) = source.optional_associated_table_id {
                        associate_table_id.as_job_id()
                    } else {
                        source_id.as_share_source_job_id()
                    };
                let job_ids = vec![base_job_id]
                    .into_iter()
                    .chain(dep_source_job_ids.into_iter())
                    .collect_vec();

                fragment_updates.push(DependentSourceFragmentUpdate {
                    job_ids,
                    with_properties: source_options_with_secret.as_plaintext().clone(),
                    secret_refs: source_options_with_secret.as_secret().clone(),
                    is_shared_source,
                });

                let complete_source_props = LocalSecretManager::global()
                    .fill_secrets(
                        source_options_with_secret.as_plaintext().clone(),
                        source_options_with_secret.as_secret().clone(),
                    )
                    .map_err(MetaError::from)?
                    .into_iter()
                    .collect::<HashMap<String, String>>();
                updated_sources_with_props.push((source_id, complete_source_props));
            }

            for source_update in source_updates {
                Source::update(source_update).exec(&txn).await?;
            }

            for DependentSourceFragmentUpdate {
                job_ids,
                with_properties,
                secret_refs,
                is_shared_source,
            } in fragment_updates
            {
                update_connector_props_fragments(
                    &txn,
                    job_ids,
                    FragmentTypeFlag::Source,
                    |node, found| {
                        if let PbNodeBody::Source(node) = node
                            && let Some(source_inner) = &mut node.source_inner
                        {
                            source_inner.with_properties = with_properties.clone();
                            source_inner.secret_refs = secret_refs.clone();
                            *found = true;
                        }
                    },
                    is_shared_source,
                )
                .await?;
            }
        }

        let mut updated_sinks_with_props: Vec<(SinkId, HashMap<String, String>)> = Vec::new();

        if !dependent_sinks.is_empty() {
            let sinks_with_objs = Sink::find()
                .find_also_related(Object)
                .filter(sink::Column::SinkId.is_in(dependent_sinks.iter().cloned()))
                .all(&txn)
                .await?;

            let mut sink_updates = Vec::new();
            let mut sink_fragment_updates = Vec::new();

            for (sink, _obj) in sinks_with_objs {
                let sink_id = sink.sink_id;

                match sink.properties.inner_ref().get(CONNECTOR_TYPE_KEY) {
                    Some(connector) => {
                        let connector_type = connector.to_lowercase();
                        check_sink_allow_alter_on_fly_fields(&connector_type, &prop_keys)
                            .map_err(|e| SinkError::Config(anyhow!(e)))?;

                        match_sink_name_str!(
                            connector_type.as_str(),
                            SinkType,
                            {
                                let mut new_sink_props = sink.properties.0.clone();
                                new_sink_props.extend(alter_props.clone());
                                SinkType::validate_alter_config(&new_sink_props)
                            },
                            |sink: &str| Err(SinkError::Config(anyhow!(
                                "unsupported sink type {}",
                                sink
                            )))
                        )?
                    }
                    None => {
                        return Err(SinkError::Config(anyhow!(
                            "connector not specified when alter sink"
                        ))
                        .into());
                    }
                };

                let mut new_sink_props = sink.properties.0.clone();
                new_sink_props.extend(alter_props.clone());

                let active_sink = sink::ActiveModel {
                    sink_id: Set(sink_id),
                    properties: Set(risingwave_meta_model::Property(new_sink_props.clone())),
                    ..Default::default()
                };
                sink_updates.push(active_sink);

                sink_fragment_updates.push((sink_id, new_sink_props.clone()));

                let complete_sink_props: HashMap<String, String> =
                    new_sink_props.into_iter().collect();
                updated_sinks_with_props.push((sink_id, complete_sink_props));
            }

            for sink_update in sink_updates {
                Sink::update(sink_update).exec(&txn).await?;
            }

            for (sink_id, new_sink_props) in sink_fragment_updates {
                update_connector_props_fragments(
                    &txn,
                    vec![sink_id.as_job_id()],
                    FragmentTypeFlag::Sink,
                    |node, found| {
                        if let PbNodeBody::Sink(node) = node
                            && let Some(sink_desc) = &mut node.sink_desc
                            && sink_desc.id == sink_id.as_raw_id()
                        {
                            sink_desc.properties = new_sink_props.clone();
                            *found = true;
                        }
                    },
                    true,
                )
                .await?;
            }
        }

        let mut updated_objects = Vec::new();

        let (connection, obj) = Connection::find_by_id(connection_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| {
                MetaError::catalog_id_not_found(ObjectType::Connection.as_str(), connection_id)
            })?;
        updated_objects.push(PbObject {
            object_info: Some(PbObjectInfo::Connection(
                ObjectModel(connection, obj.unwrap(), None).into(),
            )),
        });

        for source_id in &dependent_sources {
            let (source, obj) = Source::find_by_id(*source_id)
                .find_also_related(Object)
                .one(&txn)
                .await?
                .ok_or_else(|| {
                    MetaError::catalog_id_not_found(ObjectType::Source.as_str(), *source_id)
                })?;
            updated_objects.push(PbObject {
                object_info: Some(PbObjectInfo::Source(
                    ObjectModel(source, obj.unwrap(), None).into(),
                )),
            });
        }

        for sink_id in &dependent_sinks {
            let (sink, obj) = Sink::find_by_id(*sink_id)
                .find_also_related(Object)
                .one(&txn)
                .await?
                .ok_or_else(|| {
                    MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), *sink_id)
                })?;
            let streaming_job = streaming_job::Entity::find_by_id(sink.sink_id.as_job_id())
                .one(&txn)
                .await?;
            updated_objects.push(PbObject {
                object_info: Some(PbObjectInfo::Sink(
                    ObjectModel(sink, obj.unwrap(), streaming_job).into(),
                )),
            });
        }

        txn.commit().await?;

        if !updated_objects.is_empty() {
            self.notify_frontend(
                NotificationOperation::Update,
                NotificationInfo::ObjectGroup(PbObjectGroup {
                    objects: updated_objects,
                    dependencies: vec![],
                }),
            )
            .await;
        }

        Ok((
            connection_options_with_secret,
            updated_sources_with_props,
            updated_sinks_with_props,
        ))
    }

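    /// Sets the rate limit on a single fragment, whichever rate-limitable node it
    /// contains (DML, sink, backfill, or source variants).
    ///
    /// Sketch (hypothetical fragment id):
    ///
    /// ```ignore
    /// controller
    ///     .update_fragment_rate_limit_by_fragment_id(fragment_id, Some(1024))
    ///     .await?;
    /// ```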
    pub async fn update_fragment_rate_limit_by_fragment_id(
        &self,
        fragment_id: FragmentId,
        rate_limit: Option<u32>,
    ) -> MetaResult<()> {
        let update_rate_limit = |fragment_type_mask: FragmentTypeMask,
                                 stream_node: &mut PbStreamNode| {
            let mut found = false;
            if fragment_type_mask.contains_any(
                FragmentTypeFlag::dml_rate_limit_fragments()
                    .chain(FragmentTypeFlag::sink_rate_limit_fragments())
                    .chain(FragmentTypeFlag::backfill_rate_limit_fragments())
                    .chain(FragmentTypeFlag::source_rate_limit_fragments()),
            ) {
                visit_stream_node_mut(stream_node, |node| {
                    if let PbNodeBody::Dml(node) = node {
                        node.rate_limit = rate_limit;
                        found = true;
                    }
                    if let PbNodeBody::Sink(node) = node {
                        node.rate_limit = rate_limit;
                        found = true;
                    }
                    if let PbNodeBody::StreamCdcScan(node) = node {
                        node.rate_limit = rate_limit;
                        found = true;
                    }
                    if let PbNodeBody::StreamScan(node) = node {
                        node.rate_limit = rate_limit;
                        found = true;
                    }
                    if let PbNodeBody::SourceBackfill(node) = node {
                        node.rate_limit = rate_limit;
                        found = true;
                    }
                });
            }
            found
        };
        self.mutate_fragment_by_fragment_id(fragment_id, update_rate_limit, "fragment not found")
            .await
    }

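    /// Lists the currently configured rate limit of every rate-limitable node across
    /// all fragments, one `RateLimitInfo` per node that actually has a limit set.
    ///
    /// Sketch:
    ///
    /// ```ignore
    /// for info in controller.list_rate_limits().await? {
    ///     println!("fragment {} ({}): {}", info.fragment_id, info.node_name, info.rate_limit);
    /// }
    /// ```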
    pub async fn list_rate_limits(&self) -> MetaResult<Vec<RateLimitInfo>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let fragments: Vec<(FragmentId, JobId, i32, StreamNode)> = Fragment::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::JobId,
                fragment::Column::FragmentTypeMask,
                fragment::Column::StreamNode,
            ])
            .filter(FragmentTypeMask::intersects_any(
                FragmentTypeFlag::rate_limit_fragments(),
            ))
            .into_tuple()
            .all(&txn)
            .await?;

        let mut rate_limits = Vec::new();
        for (fragment_id, job_id, fragment_type_mask, stream_node) in fragments {
            let stream_node = stream_node.to_protobuf();
            visit_stream_node_body(&stream_node, |node| {
                let mut rate_limit = None;
                let mut node_name = None;

                match node {
                    PbNodeBody::Source(node) => {
                        if let Some(node_inner) = &node.source_inner {
                            rate_limit = node_inner.rate_limit;
                            node_name = Some("SOURCE");
                        }
                    }
                    PbNodeBody::StreamFsFetch(node) => {
                        if let Some(node_inner) = &node.node_inner {
                            rate_limit = node_inner.rate_limit;
                            node_name = Some("FS_FETCH");
                        }
                    }
                    PbNodeBody::SourceBackfill(node) => {
                        rate_limit = node.rate_limit;
                        node_name = Some("SOURCE_BACKFILL");
                    }
                    PbNodeBody::StreamScan(node) => {
                        rate_limit = node.rate_limit;
                        node_name = Some("STREAM_SCAN");
                    }
                    PbNodeBody::StreamCdcScan(node) => {
                        rate_limit = node.rate_limit;
                        node_name = Some("STREAM_CDC_SCAN");
                    }
                    PbNodeBody::Sink(node) => {
                        rate_limit = node.rate_limit;
                        node_name = Some("SINK");
                    }
                    _ => {}
                }

                if let Some(rate_limit) = rate_limit {
                    rate_limits.push(RateLimitInfo {
                        fragment_id,
                        job_id,
                        fragment_type_mask: fragment_type_mask as u32,
                        rate_limit,
                        node_name: node_name.unwrap().to_owned(),
                    });
                }
            });
        }

        Ok(rate_limits)
    }
}

fn validate_sink_props(sink: &sink::Model, props: &BTreeMap<String, String>) -> MetaResult<()> {
    match sink.properties.inner_ref().get(CONNECTOR_TYPE_KEY) {
        Some(connector) => {
            let connector_type = connector.to_lowercase();
            let field_names: Vec<String> = props.keys().cloned().collect();
            check_sink_allow_alter_on_fly_fields(&connector_type, &field_names)
                .map_err(|e| SinkError::Config(anyhow!(e)))?;

            match_sink_name_str!(
                connector_type.as_str(),
                SinkType,
                {
                    let mut new_props = sink.properties.0.clone();
                    new_props.extend(props.clone());
                    SinkType::validate_alter_config(&new_props)
                },
                |sink: &str| Err(SinkError::Config(anyhow!("unsupported sink type {}", sink)))
            )?
        }
        None => {
            return Err(
                SinkError::Config(anyhow!("connector not specified when alter sink")).into(),
            );
        }
    };
    Ok(())
}

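/// Merges `props` into a statement's `WITH (...)` options, overwriting options that
/// already exist (matched by name) and appending the rest, while preserving the
/// original option order.
///
/// Sketch (hypothetical statement; `stmt` is a parsed `CREATE SINK`):
///
/// ```ignore
/// if let Statement::CreateSink { stmt } = &mut stmt {
///     update_stmt_with_props(
///         &mut stmt.with_properties.0,
///         &BTreeMap::from([("max_retry_num".to_owned(), "5".to_owned())]),
///     )?;
/// }
/// ```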
fn update_stmt_with_props(
    with_properties: &mut Vec<SqlOption>,
    props: &BTreeMap<String, String>,
) -> MetaResult<()> {
    let mut new_sql_options = with_properties
        .iter()
        .map(|sql_option| (&sql_option.name, sql_option))
        .collect::<IndexMap<_, _>>();
    let add_sql_options = props
        .iter()
        .map(|(k, v)| SqlOption::try_from((k, v)))
        .collect::<Result<Vec<SqlOption>, ParserError>>()
        .map_err(|e| SinkError::Config(anyhow!(e)))?;
    new_sql_options.extend(
        add_sql_options
            .iter()
            .map(|sql_option| (&sql_option.name, sql_option)),
    );
    *with_properties = new_sql_options.into_values().cloned().collect();
    Ok(())
}

async fn update_sink_fragment_props(
    txn: &DatabaseTransaction,
    sink_id: SinkId,
    props: BTreeMap<String, String>,
) -> MetaResult<()> {
    let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
        .select_only()
        .columns([
            fragment::Column::FragmentId,
            fragment::Column::FragmentTypeMask,
            fragment::Column::StreamNode,
        ])
        .filter(fragment::Column::JobId.eq(sink_id))
        .into_tuple()
        .all(txn)
        .await?;
    let fragments = fragments
        .into_iter()
        .filter(|(_, fragment_type_mask, _)| {
            *fragment_type_mask & FragmentTypeFlag::Sink as i32 != 0
        })
        .filter_map(|(id, _, stream_node)| {
            let mut stream_node = stream_node.to_protobuf();
            let mut found = false;
            visit_stream_node_mut(&mut stream_node, |node| {
                if let PbNodeBody::Sink(node) = node
                    && let Some(sink_desc) = &mut node.sink_desc
                    && sink_desc.id == sink_id
                {
                    sink_desc.properties.extend(props.clone());
                    found = true;
                }
            });
            if found { Some((id, stream_node)) } else { None }
        })
        .collect_vec();
    assert!(
        !fragments.is_empty(),
        "sink id should be used by at least one fragment"
    );
    for (id, stream_node) in fragments {
        Fragment::update(fragment::ActiveModel {
            fragment_id: Set(id),
            stream_node: Set(StreamNode::from(&stream_node)),
            ..Default::default()
        })
        .exec(txn)
        .await?;
    }
    Ok(())
}

pub struct SinkIntoTableContext {
    pub updated_sink_catalogs: Vec<SinkId>,
}

pub struct FinishAutoRefreshSchemaSinkContext {
    pub tmp_sink_id: SinkId,
    pub original_sink_id: SinkId,
    pub columns: Vec<PbColumnCatalog>,
    pub new_log_store_table: Option<(TableId, Vec<PbColumnCatalog>)>,
}

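/// Rewrites the stream nodes of every fragment of `job_ids` that carries `expect_flag`,
/// letting `alter_stream_node_fn` edit each node body and set `found` when it applied.
/// Only fragments where the closure reported a change are persisted. For shared sources
/// (or when several jobs are involved) at least one fragment must match, since the
/// connector properties would otherwise silently stay stale.
///
/// Sketch (mirrors the source-property path above; `new_props` is hypothetical):
///
/// ```ignore
/// update_connector_props_fragments(
///     &txn,
///     job_ids,
///     FragmentTypeFlag::Source,
///     |node, found| {
///         if let PbNodeBody::Source(node) = node
///             && let Some(inner) = &mut node.source_inner
///         {
///             inner.with_properties = new_props.clone();
///             *found = true;
///         }
///     },
///     is_shared_source,
/// )
/// .await?;
/// ```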
async fn update_connector_props_fragments<F>(
    txn: &DatabaseTransaction,
    job_ids: Vec<JobId>,
    expect_flag: FragmentTypeFlag,
    mut alter_stream_node_fn: F,
    is_shared_source: bool,
) -> MetaResult<()>
where
    F: FnMut(&mut PbNodeBody, &mut bool),
{
    let fragments: Vec<(FragmentId, StreamNode)> = Fragment::find()
        .select_only()
        .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
        .filter(
            fragment::Column::JobId
                .is_in(job_ids.clone())
                .and(FragmentTypeMask::intersects(expect_flag)),
        )
        .into_tuple()
        .all(txn)
        .await?;
    let fragments = fragments
        .into_iter()
        .filter_map(|(id, stream_node)| {
            let mut stream_node = stream_node.to_protobuf();
            let mut found = false;
            visit_stream_node_mut(&mut stream_node, |node| {
                alter_stream_node_fn(node, &mut found);
            });
            if found { Some((id, stream_node)) } else { None }
        })
        .collect_vec();
    if is_shared_source || job_ids.len() > 1 {
        assert!(
            !fragments.is_empty(),
            "job ids {:?} (type: {:?}) should be used by at least one fragment",
            job_ids,
            expect_flag
        );
    }

    for (id, stream_node) in fragments {
        Fragment::update(fragment::ActiveModel {
            fragment_id: Set(id),
            stream_node: Set(StreamNode::from(&stream_node)),
            ..Default::default()
        })
        .exec(txn)
        .await?;
    }

    Ok(())
}