1use std::collections::{BTreeMap, HashMap, HashSet};
16use std::num::NonZeroUsize;
17
18use anyhow::anyhow;
19use indexmap::IndexMap;
20use itertools::Itertools;
21use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask, ICEBERG_SINK_PREFIX};
22use risingwave_common::config::DefaultParallelism;
23use risingwave_common::hash::VnodeCountCompat;
24use risingwave_common::id::JobId;
25use risingwave_common::secret::LocalSecretManager;
26use risingwave_common::system_param::adaptive_parallelism_strategy::parse_strategy;
27use risingwave_common::util::iter_util::ZipEqDebug;
28use risingwave_common::util::stream_graph_visitor::{
29 visit_stream_node_body, visit_stream_node_mut,
30};
31use risingwave_common::{bail, current_cluster_version};
32use risingwave_connector::allow_alter_on_fly_fields::check_sink_allow_alter_on_fly_fields;
33use risingwave_connector::connector_common::validate_connection;
34use risingwave_connector::error::ConnectorError;
35use risingwave_connector::sink::file_sink::fs::FsSink;
36use risingwave_connector::sink::{CONNECTOR_TYPE_KEY, SinkError};
37use risingwave_connector::source::{ConnectorProperties, pb_connection_type_to_connection_type};
38use risingwave_connector::{WithOptionsSecResolved, WithPropertiesExt, match_sink_name_str};
39use risingwave_meta_model::object::ObjectType;
40use risingwave_meta_model::prelude::{StreamingJob as StreamingJobModel, *};
41use risingwave_meta_model::refresh_job::RefreshState;
42use risingwave_meta_model::streaming_job::BackfillOrders;
43use risingwave_meta_model::table::TableType;
44use risingwave_meta_model::user_privilege::Action;
45use risingwave_meta_model::*;
46use risingwave_pb::catalog::{PbConnection, PbCreateType, PbTable};
47use risingwave_pb::ddl_service::streaming_job_resource_type;
48use risingwave_pb::meta::alter_connector_props_request::AlterIcebergTableIds;
49use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
50use risingwave_pb::meta::object::PbObjectInfo;
51use risingwave_pb::meta::subscribe_response::{
52 Info as NotificationInfo, Info, Operation as NotificationOperation, Operation,
53};
54use risingwave_pb::meta::{PbObject, PbObjectGroup};
55use risingwave_pb::plan_common::PbColumnCatalog;
56use risingwave_pb::plan_common::source_refresh_mode::{
57 RefreshMode, SourceRefreshModeFullReload, SourceRefreshModeStreaming,
58};
59use risingwave_pb::secret::PbSecretRef;
60use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism;
61use risingwave_pb::stream_plan::stream_node::PbNodeBody;
62use risingwave_pb::stream_plan::{PbSinkLogStoreType, PbStreamNode};
63use risingwave_pb::user::PbUserInfo;
64use risingwave_sqlparser::ast::{Engine, SqlOption, Statement};
65use risingwave_sqlparser::parser::{Parser, ParserError};
66use sea_orm::ActiveValue::Set;
67use sea_orm::sea_query::{Expr, Query, SimpleExpr};
68use sea_orm::{
69 ActiveModelTrait, ColumnTrait, DatabaseTransaction, EntityTrait, IntoActiveModel, JoinType,
70 NotSet, PaginatorTrait, QueryFilter, QuerySelect, RelationTrait, TransactionTrait,
71};
72use thiserror_ext::AsReport;
73
74use super::rename::IndexItemRewriter;
75use crate::barrier::Command;
76use crate::controller::ObjectModel;
77use crate::controller::catalog::{CatalogController, DropTableConnectorContext};
78use crate::controller::fragment::FragmentTypeMaskExt;
79use crate::controller::utils::{
80 PartialObject, build_object_group_for_delete, check_if_belongs_to_iceberg_table,
81 check_relation_name_duplicate, check_sink_into_table_cycle, ensure_job_not_canceled,
82 ensure_object_id, ensure_user_id, fetch_target_fragments, get_internal_tables_by_id,
83 get_table_columns, grant_default_privileges_automatically, insert_fragment_relations,
84 list_user_info_by_ids, try_get_iceberg_table_by_downstream_sink,
85};
86use crate::error::MetaErrorInner;
87use crate::manager::{NotificationVersion, StreamingJob, StreamingJobType};
88use crate::model::{
89 FragmentDownstreamRelation, FragmentReplaceUpstream, StreamContext, StreamJobFragments,
90 StreamJobFragmentsToCreate,
91};
92use crate::stream::SplitAssignment;
93use crate::{MetaError, MetaResult};
94
/// Bundle of connector options, secret references and the streaming jobs they relate to.
///
/// NOTE(review): nothing in the visible part of this file constructs or reads this struct,
/// so the per-field notes below are inferred from names and types only — confirm against
/// the call sites before relying on them.
#[derive(Debug)]
struct DependentSourceFragmentUpdate {
    // Presumably the ids of the jobs whose source fragments need updating — TODO confirm.
    job_ids: Vec<JobId>,
    // Presumably the connector `WITH` options, keyed by option name — TODO confirm.
    with_properties: BTreeMap<String, String>,
    // Presumably the secret references, keyed by option name — TODO confirm.
    secret_refs: BTreeMap<String, PbSecretRef>,
    // Presumably marks whether the underlying source is a shared source — TODO confirm.
    is_shared_source: bool,
}
106
107impl CatalogController {
108 #[allow(clippy::too_many_arguments)]
109 pub async fn create_streaming_job_obj(
110 txn: &DatabaseTransaction,
111 obj_type: ObjectType,
112 owner_id: UserId,
113 database_id: Option<DatabaseId>,
114 schema_id: Option<SchemaId>,
115 create_type: PbCreateType,
116 ctx: StreamContext,
117 streaming_parallelism: StreamingParallelism,
118 max_parallelism: usize,
119 resource_type: streaming_job_resource_type::ResourceType,
120 backfill_parallelism: Option<StreamingParallelism>,
121 ) -> MetaResult<JobId> {
122 let obj = Self::create_object(txn, obj_type, owner_id, database_id, schema_id).await?;
123 let job_id = obj.oid.as_job_id();
124 let specific_resource_group = resource_type.resource_group();
125 let is_serverless_backfill = matches!(
126 &resource_type,
127 streaming_job_resource_type::ResourceType::ServerlessBackfillResourceGroup(_)
128 );
129 let job = streaming_job::ActiveModel {
130 job_id: Set(job_id),
131 job_status: Set(JobStatus::Initial),
132 create_type: Set(create_type.into()),
133 timezone: Set(ctx.timezone),
134 config_override: Set(Some(ctx.config_override.to_string())),
135 adaptive_parallelism_strategy: Set(ctx
136 .adaptive_parallelism_strategy
137 .as_ref()
138 .map(ToString::to_string)),
139 parallelism: Set(streaming_parallelism),
140 backfill_parallelism: Set(backfill_parallelism),
141 backfill_orders: Set(None),
142 max_parallelism: Set(max_parallelism as _),
143 specific_resource_group: Set(specific_resource_group),
144 is_serverless_backfill: Set(is_serverless_backfill),
145 };
146 StreamingJobModel::insert(job).exec(txn).await?;
147
148 Ok(job_id)
149 }
150
/// Persists the catalog entries of a new streaming job inside one transaction.
///
/// Steps, in order:
/// 1. Resolve the parallelism to store (explicit value, or the configured default).
/// 2. Validate the owner, database and schema, and that the relation name is unused.
/// 3. Reject the request when any dependency is referenced by an unfinished table-typed
///    job that has no row in the `table` catalog.
/// 4. Create the job object and the variant-specific catalog rows, writing the newly
///    allocated ids back into `streaming_job`.
/// 5. Record object dependencies, including the secrets and connections the job uses.
///
/// # Errors
///
/// Returns a permission-denied error when step 3 finds a match, bails when a sink into a
/// table would form a cycle, and propagates validation and database errors.
#[await_tree::instrument]
pub async fn create_job_catalog(
    &self,
    streaming_job: &mut StreamingJob,
    ctx: &StreamContext,
    parallelism: &Option<Parallelism>,
    max_parallelism: usize,
    mut dependencies: HashSet<ObjectId>,
    resource_type: streaming_job_resource_type::ResourceType,
    backfill_parallelism: &Option<Parallelism>,
) -> MetaResult<()> {
    let inner = self.inner.write().await;
    let txn = inner.db.begin().await?;
    let create_type = streaming_job.create_type();

    // An explicit parallelism always wins; otherwise fall back to the configured default.
    let streaming_parallelism = match (parallelism, self.env.opts.default_parallelism) {
        (None, DefaultParallelism::Full) => StreamingParallelism::Adaptive,
        (None, DefaultParallelism::Default(n)) => StreamingParallelism::Fixed(n.get()),
        (Some(n), _) => StreamingParallelism::Fixed(n.parallelism as _),
    };
    // A backfill parallelism, when given, is always stored as a fixed value.
    let backfill_parallelism = backfill_parallelism
        .as_ref()
        .map(|p| StreamingParallelism::Fixed(p.parallelism as _));

    ensure_user_id(streaming_job.owner() as _, &txn).await?;
    ensure_object_id(ObjectType::Database, streaming_job.database_id(), &txn).await?;
    ensure_object_id(ObjectType::Schema, streaming_job.schema_id(), &txn).await?;
    check_relation_name_duplicate(
        &streaming_job.name(),
        streaming_job.database_id(),
        streaming_job.schema_id(),
        &txn,
    )
    .await?;

    if !dependencies.is_empty() {
        // Count dependency rows on any of `dependencies` whose joined object is a
        // table-typed streaming job that is not yet `Created` and has no `table` row.
        // NOTE(review): presumably these are in-flight replacement (alter) jobs — confirm.
        let altering_cnt = ObjectDependency::find()
            .join(
                JoinType::InnerJoin,
                object_dependency::Relation::Object1.def(),
            )
            .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
            .filter(
                object_dependency::Column::Oid
                    .is_in(dependencies.clone())
                    .and(object::Column::ObjType.eq(ObjectType::Table))
                    .and(streaming_job::Column::JobStatus.ne(JobStatus::Created))
                    .and(
                        object::Column::Oid.not_in_subquery(
                            Query::select()
                                .column(table::Column::TableId)
                                .from(Table)
                                .to_owned(),
                        ),
                    ),
            )
            .count(&txn)
            .await?;
        if altering_cnt != 0 {
            return Err(MetaError::permission_denied(
                "some dependent relations are being altered",
            ));
        }
    }

    match streaming_job {
        StreamingJob::MaterializedView(table) => {
            let job_id = Self::create_streaming_job_obj(
                &txn,
                ObjectType::Table,
                table.owner as _,
                Some(table.database_id),
                Some(table.schema_id),
                create_type,
                ctx.clone(),
                streaming_parallelism,
                max_parallelism,
                resource_type,
                backfill_parallelism.clone(),
            )
            .await?;
            // The table of a materialized view shares its id with the job.
            table.id = job_id.as_mv_table_id();
            let table_model: table::ActiveModel = table.clone().into();
            Table::insert(table_model).exec(&txn).await?;
        }
        StreamingJob::Sink(sink) => {
            // A sink into a table must not close a dependency cycle.
            if let Some(target_table_id) = sink.target_table
                && check_sink_into_table_cycle(
                    target_table_id.into(),
                    dependencies.iter().cloned().collect(),
                    &txn,
                )
                .await?
            {
                bail!("Creating such a sink will result in circular dependency.");
            }

            let job_id = Self::create_streaming_job_obj(
                &txn,
                ObjectType::Sink,
                sink.owner as _,
                Some(sink.database_id),
                Some(sink.schema_id),
                create_type,
                ctx.clone(),
                streaming_parallelism,
                max_parallelism,
                resource_type,
                backfill_parallelism.clone(),
            )
            .await?;
            sink.id = job_id.as_sink_id();
            let sink_model: sink::ActiveModel = sink.clone().into();
            Sink::insert(sink_model).exec(&txn).await?;
        }
        StreamingJob::Table(src, table, _) => {
            let job_id = Self::create_streaming_job_obj(
                &txn,
                ObjectType::Table,
                table.owner as _,
                Some(table.database_id),
                Some(table.schema_id),
                create_type,
                ctx.clone(),
                streaming_parallelism,
                max_parallelism,
                resource_type,
                backfill_parallelism.clone(),
            )
            .await?;
            table.id = job_id.as_mv_table_id();
            if let Some(src) = src {
                // The associated source gets its own object; source and table are then
                // cross-linked through their `optional_associated_*` fields.
                let src_obj = Self::create_object(
                    &txn,
                    ObjectType::Source,
                    src.owner as _,
                    Some(src.database_id),
                    Some(src.schema_id),
                )
                .await?;
                src.id = src_obj.oid.as_source_id();
                src.optional_associated_table_id = Some(job_id.as_mv_table_id().into());
                table.optional_associated_source_id = Some(src_obj.oid.as_source_id().into());
                let source: source::ActiveModel = src.clone().into();
                Source::insert(source).exec(&txn).await?;
            }
            let table_model: table::ActiveModel = table.clone().into();
            Table::insert(table_model).exec(&txn).await?;

            if table.refreshable {
                // Only a full-reload refresh mode carries a trigger interval; streaming
                // mode and an unset mode both yield `None`.
                let trigger_interval_secs = src
                    .as_ref()
                    .and_then(|source_catalog| source_catalog.refresh_mode)
                    .and_then(
                        |source_refresh_mode| match source_refresh_mode.refresh_mode {
                            Some(RefreshMode::FullReload(SourceRefreshModeFullReload {
                                refresh_interval_sec,
                            })) => refresh_interval_sec,
                            Some(RefreshMode::Streaming(SourceRefreshModeStreaming {})) => None,
                            None => None,
                        },
                    );

                // Register the refresh job in the idle state with no trigger history.
                RefreshJob::insert(refresh_job::ActiveModel {
                    table_id: Set(table.id),
                    last_trigger_time: Set(None),
                    trigger_interval_secs: Set(trigger_interval_secs),
                    current_status: Set(RefreshState::Idle),
                    last_success_time: Set(None),
                })
                .exec(&txn)
                .await?;
            }
        }
        StreamingJob::Index(index, table) => {
            ensure_object_id(ObjectType::Table, index.primary_table_id, &txn).await?;
            let job_id = Self::create_streaming_job_obj(
                &txn,
                ObjectType::Index,
                index.owner as _,
                Some(index.database_id),
                Some(index.schema_id),
                create_type,
                ctx.clone(),
                streaming_parallelism,
                max_parallelism,
                resource_type,
                backfill_parallelism.clone(),
            )
            .await?;
            // The index, its index table and the job all share one id.
            index.id = job_id.as_index_id();
            index.index_table_id = job_id.as_mv_table_id();
            table.id = job_id.as_mv_table_id();

            // Record that the index table depends on its primary table.
            ObjectDependency::insert(object_dependency::ActiveModel {
                oid: Set(index.primary_table_id.into()),
                used_by: Set(table.id.into()),
                ..Default::default()
            })
            .exec(&txn)
            .await?;

            let table_model: table::ActiveModel = table.clone().into();
            Table::insert(table_model).exec(&txn).await?;
            let index_model: index::ActiveModel = index.clone().into();
            Index::insert(index_model).exec(&txn).await?;
        }
        StreamingJob::Source(src) => {
            let job_id = Self::create_streaming_job_obj(
                &txn,
                ObjectType::Source,
                src.owner as _,
                Some(src.database_id),
                Some(src.schema_id),
                create_type,
                ctx.clone(),
                streaming_parallelism,
                max_parallelism,
                resource_type,
                backfill_parallelism.clone(),
            )
            .await?;
            src.id = job_id.as_shared_source_id();
            let source_model: source::ActiveModel = src.clone().into();
            Source::insert(source_model).exec(&txn).await?;
        }
    }

    // Secrets used by the job count as dependencies as well.
    dependencies.extend(
        streaming_job
            .dependent_secret_ids()?
            .into_iter()
            .map(|id| id.as_object_id()),
    );
    // So do the connections used by the job.
    dependencies.extend(
        streaming_job
            .dependent_connection_ids()?
            .into_iter()
            .map(|id| id.as_object_id()),
    );

    if !dependencies.is_empty() {
        // One dependency row per referenced object, all used by the new job. The job id
        // is read here, after the match above has back-filled it.
        ObjectDependency::insert_many(dependencies.into_iter().map(|oid| {
            object_dependency::ActiveModel {
                oid: Set(oid),
                used_by: Set(streaming_job.id().as_object_id()),
                ..Default::default()
            }
        }))
        .exec(&txn)
        .await?;
    }

    txn.commit().await?;

    Ok(())
}
419
420 pub async fn create_internal_table_catalog(
428 &self,
429 job: &StreamingJob,
430 mut incomplete_internal_tables: Vec<PbTable>,
431 ) -> MetaResult<HashMap<TableId, TableId>> {
432 let job_id = job.id();
433 let inner = self.inner.write().await;
434 let txn = inner.db.begin().await?;
435
436 ensure_job_not_canceled(job_id, &txn).await?;
438
439 let mut table_id_map = HashMap::new();
440 for table in &mut incomplete_internal_tables {
441 let table_id = Self::create_object(
442 &txn,
443 ObjectType::Table,
444 table.owner as _,
445 Some(table.database_id),
446 Some(table.schema_id),
447 )
448 .await?
449 .oid
450 .as_table_id();
451 table_id_map.insert(table.id, table_id);
452 table.id = table_id;
453 table.job_id = Some(job_id);
454
455 let table_model = table::ActiveModel {
456 table_id: Set(table_id),
457 belongs_to_job_id: Set(Some(job_id)),
458 fragment_id: NotSet,
459 ..table.clone().into()
460 };
461 Table::insert(table_model).exec(&txn).await?;
462 }
463 txn.commit().await?;
464
465 Ok(table_id_map)
466 }
467
468 pub async fn prepare_stream_job_fragments(
469 &self,
470 stream_job_fragments: &StreamJobFragmentsToCreate,
471 streaming_job: &StreamingJob,
472 for_replace: bool,
473 backfill_orders: Option<BackfillOrders>,
474 ) -> MetaResult<()> {
475 self.prepare_streaming_job(
476 stream_job_fragments.stream_job_id(),
477 || stream_job_fragments.fragments.values(),
478 &stream_job_fragments.downstreams,
479 for_replace,
480 Some(streaming_job),
481 backfill_orders,
482 )
483 .await
484 }
485
486 #[await_tree::instrument("prepare_streaming_job_for_{}", if for_replace { "replace" } else { "create" }
491 )]
492 pub async fn prepare_streaming_job<'a, I: Iterator<Item = &'a crate::model::Fragment> + 'a>(
493 &self,
494 job_id: JobId,
495 get_fragments: impl Fn() -> I + 'a,
496 downstreams: &FragmentDownstreamRelation,
497 for_replace: bool,
498 creating_streaming_job: Option<&'a StreamingJob>,
499 backfill_orders: Option<BackfillOrders>,
500 ) -> MetaResult<()> {
501 let fragments = Self::prepare_fragment_models_from_fragments(job_id, get_fragments())?;
502
503 let inner = self.inner.write().await;
504
505 let need_notify = creating_streaming_job
506 .map(|job| job.should_notify_creating())
507 .unwrap_or(false);
508 let definition = creating_streaming_job.map(|job| job.definition());
509
510 let mut objects_to_notify = vec![];
511 let txn = inner.db.begin().await?;
512
513 ensure_job_not_canceled(job_id, &txn).await?;
515
516 if let Some(backfill_orders) = backfill_orders {
517 let job = streaming_job::ActiveModel {
518 job_id: Set(job_id),
519 backfill_orders: Set(Some(backfill_orders)),
520 ..Default::default()
521 };
522 StreamingJobModel::update(job).exec(&txn).await?;
523 }
524
525 let state_table_ids = fragments
526 .iter()
527 .flat_map(|fragment| fragment.state_table_ids.inner_ref().clone())
528 .collect_vec();
529
530 if !fragments.is_empty() {
531 let fragment_models = fragments
532 .into_iter()
533 .map(|fragment| fragment.into_active_model())
534 .collect_vec();
535 Fragment::insert_many(fragment_models).exec(&txn).await?;
536 }
537
538 if !for_replace {
541 let all_tables = StreamJobFragments::collect_tables(get_fragments());
542 for state_table_id in state_table_ids {
543 let table = all_tables
547 .get(&state_table_id)
548 .unwrap_or_else(|| panic!("table {} not found", state_table_id));
549 assert_eq!(table.id, state_table_id);
550 let vnode_count = table.vnode_count();
551
552 Table::update(table::ActiveModel {
553 table_id: Set(state_table_id as _),
554 fragment_id: Set(Some(table.fragment_id)),
555 vnode_count: Set(vnode_count as _),
556 ..Default::default()
557 })
558 .exec(&txn)
559 .await?;
560
561 if need_notify {
562 let mut table = table.clone();
563 if cfg!(not(debug_assertions)) && table.id == job_id.as_raw_id() {
565 table.definition = definition.clone().unwrap();
566 }
567 objects_to_notify.push(PbObject {
568 object_info: Some(PbObjectInfo::Table(table)),
569 });
570 }
571 }
572 }
573
574 if need_notify {
576 match creating_streaming_job.unwrap() {
577 StreamingJob::Table(Some(source), ..) => {
578 objects_to_notify.push(PbObject {
579 object_info: Some(PbObjectInfo::Source(source.clone())),
580 });
581 }
582 StreamingJob::Sink(sink) => {
583 objects_to_notify.push(PbObject {
584 object_info: Some(PbObjectInfo::Sink(sink.clone())),
585 });
586 }
587 StreamingJob::Index(index, _) => {
588 objects_to_notify.push(PbObject {
589 object_info: Some(PbObjectInfo::Index(index.clone())),
590 });
591 }
592 _ => {}
593 }
594 }
595
596 insert_fragment_relations(&txn, downstreams).await?;
597
598 if !for_replace {
599 if let Some(StreamingJob::Table(_, table, _)) = creating_streaming_job {
601 Table::update(table::ActiveModel {
602 table_id: Set(table.id),
603 dml_fragment_id: Set(table.dml_fragment_id),
604 ..Default::default()
605 })
606 .exec(&txn)
607 .await?;
608 }
609 }
610
611 txn.commit().await?;
612
613 if !objects_to_notify.is_empty() {
617 self.notify_frontend(
618 Operation::Add,
619 Info::ObjectGroup(PbObjectGroup {
620 objects: objects_to_notify,
621 }),
622 )
623 .await;
624 }
625
626 Ok(())
627 }
628
629 pub async fn build_cancel_command(
632 &self,
633 table_fragments: &StreamJobFragments,
634 ) -> MetaResult<Command> {
635 let inner = self.inner.read().await;
636 let txn = inner.db.begin().await?;
637
638 let dropped_sink_fragment_with_target =
639 if let Some(sink_fragment) = table_fragments.sink_fragment() {
640 let sink_fragment_id = sink_fragment.fragment_id as FragmentId;
641 let sink_target_fragment = fetch_target_fragments(&txn, [sink_fragment_id]).await?;
642 sink_target_fragment
643 .get(&sink_fragment_id)
644 .map(|target_fragments| {
645 let target_fragment_id = *target_fragments
646 .first()
647 .expect("sink should have at least one downstream fragment");
648 (sink_fragment_id, target_fragment_id)
649 })
650 } else {
651 None
652 };
653
654 Ok(Command::DropStreamingJobs {
655 streaming_job_ids: HashSet::from_iter([table_fragments.stream_job_id()]),
656 actors: table_fragments.actor_ids().collect(),
657 unregistered_state_table_ids: table_fragments.all_table_ids().collect(),
658 unregistered_fragment_ids: table_fragments.fragment_ids().collect(),
659 dropped_sink_fragment_by_targets: dropped_sink_fragment_with_target
660 .into_iter()
661 .map(|(sink, target)| (target as _, vec![sink as _]))
662 .collect(),
663 })
664 }
665
/// Tries to remove the catalog entries of a streaming job that is still being created.
///
/// Returns `(aborted, database_id)`:
/// * `(true, None)` when the job object no longer exists;
/// * `(false, Some(db))` when the job is a background job in `Creating` status, the call
///   is not a cancellation, and the job does not belong to an iceberg table — nothing is
///   removed in this case;
/// * `(true, Some(db))` after the job, its internal tables and any associated source have
///   been deleted, pending waiters were sent an error and the frontend was notified.
///
/// # Panics
///
/// Panics when `is_cancelled` is false and the job is already in `Created` status.
///
/// # Errors
///
/// Propagates database errors, and fails when the object has no database id.
#[await_tree::instrument]
pub async fn try_abort_creating_streaming_job(
    &self,
    mut job_id: JobId,
    is_cancelled: bool,
) -> MetaResult<(bool, Option<DatabaseId>)> {
    let mut inner = self.inner.write().await;
    let txn = inner.db.begin().await?;

    let obj = Object::find_by_id(job_id).one(&txn).await?;
    let Some(obj) = obj else {
        tracing::warn!(
            id = %job_id,
            "streaming job not found when aborting creating, might be cancelled already or cleaned by recovery"
        );
        return Ok((true, None));
    };
    let database_id = obj
        .database_id
        .ok_or_else(|| anyhow!("obj has no database id: {:?}", obj))?;
    let streaming_job = streaming_job::Entity::find_by_id(job_id).one(&txn).await?;

    if !is_cancelled && let Some(streaming_job) = &streaming_job {
        assert_ne!(streaming_job.job_status, JobStatus::Created);
        if streaming_job.create_type == CreateType::Background
            && streaming_job.job_status == JobStatus::Creating
        {
            if (obj.obj_type == ObjectType::Table || obj.obj_type == ObjectType::Sink)
                && check_if_belongs_to_iceberg_table(&txn, job_id).await?
            {
                // Jobs belonging to an iceberg table fall through and are cleaned below.
            } else {
                // Any other background job in `Creating` status is left untouched.
                tracing::warn!(
                    id = %job_id,
                    "streaming job is created in background and still in creating status"
                );
                return Ok((false, Some(database_id)));
            }
        }
    }

    // Remember the job as requested: `job_id` may be redirected to an iceberg table below.
    let original_job_id = job_id;
    let original_obj_type = obj.obj_type;

    let iceberg_table_id =
        try_get_iceberg_table_by_downstream_sink(&txn, job_id.as_sink_id()).await?;
    if let Some(iceberg_table_id) = iceberg_table_id {
        // The job is a sink feeding an iceberg table: delete the sink and its internal
        // tables right away, then continue the cleanup with the iceberg table's job.
        let internal_tables = get_internal_tables_by_id(job_id, &txn).await?;
        Object::delete_many()
            .filter(
                object::Column::Oid
                    .eq(job_id)
                    .or(object::Column::Oid.is_in(internal_tables)),
            )
            .exec(&txn)
            .await?;
        job_id = iceberg_table_id.as_job_id();
    };

    let internal_table_ids = get_internal_tables_by_id(job_id, &txn).await?;

    let mut objs = vec![];
    // `None` when the (possibly redirected) job has no row in the `table` catalog.
    let table_obj = Table::find_by_id(job_id.as_mv_table_id()).one(&txn).await?;

    // Notify for background jobs, materialized views, and sinks that have no table row.
    let mut need_notify =
        streaming_job.is_some_and(|job| job.create_type == CreateType::Background);
    if !need_notify {
        if let Some(table) = &table_obj {
            need_notify = table.table_type == TableType::MaterializedView;
        } else if original_obj_type == ObjectType::Sink {
            need_notify = true;
        }
    }

    if is_cancelled {
        // On cancellation, record the tables about to be deleted in `inner.dropped_tables`.
        let dropped_tables = Table::find()
            .find_also_related(Object)
            .filter(
                table::Column::TableId.is_in(
                    internal_table_ids
                        .iter()
                        .cloned()
                        .chain(table_obj.iter().map(|t| t.table_id as _)),
                ),
            )
            .all(&txn)
            .await?
            .into_iter()
            .map(|(table, obj)| PbTable::from(ObjectModel(table, obj.unwrap(), None)));
        inner
            .dropped_tables
            .extend(dropped_tables.map(|t| (t.id, t)));
    }

    if need_notify {
        if original_obj_type == ObjectType::Sink && original_job_id != job_id {
            // NOTE(review): `job_id` is only rewritten in the iceberg branch above, which
            // has already deleted the object with `original_job_id` in this transaction.
            // This lookup therefore looks like it can never find a row, so the original
            // sink is never added to the delete notification — confirm whether the lookup
            // should run before that deletion.
            let orig_obj: Option<PartialObject> = Object::find_by_id(original_job_id)
                .select_only()
                .columns([
                    object::Column::Oid,
                    object::Column::ObjType,
                    object::Column::SchemaId,
                    object::Column::DatabaseId,
                ])
                .into_partial_model()
                .one(&txn)
                .await?;
            if let Some(orig_obj) = orig_obj {
                objs.push(orig_obj);
            }
        }

        // The job object itself must still exist at this point.
        let obj: Option<PartialObject> = Object::find_by_id(job_id)
            .select_only()
            .columns([
                object::Column::Oid,
                object::Column::ObjType,
                object::Column::SchemaId,
                object::Column::DatabaseId,
            ])
            .into_partial_model()
            .one(&txn)
            .await?;
        let obj =
            obj.ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;
        objs.push(obj);
        // Plus every internal table that belongs to the job.
        let internal_table_objs: Vec<PartialObject> = Object::find()
            .select_only()
            .columns([
                object::Column::Oid,
                object::Column::ObjType,
                object::Column::SchemaId,
                object::Column::DatabaseId,
            ])
            .join(JoinType::InnerJoin, object::Relation::Table.def())
            .filter(table::Column::BelongsToJobId.eq(job_id))
            .into_partial_model()
            .all(&txn)
            .await?;
        objs.extend(internal_table_objs);
    }

    // The job has no table row but is a sink with a target table (sink into table).
    if table_obj.is_none()
        && let Some(Some(target_table_id)) = Sink::find_by_id(job_id.as_sink_id())
            .select_only()
            .column(sink::Column::TargetTable)
            .into_tuple::<Option<TableId>>()
            .one(&txn)
            .await?
    {
        // Find a table-typed, not-yet-created streaming job that depends on the target
        // table, and delete it as well.
        let tmp_id: Option<ObjectId> = ObjectDependency::find()
            .select_only()
            .column(object_dependency::Column::UsedBy)
            .join(
                JoinType::InnerJoin,
                object_dependency::Relation::Object1.def(),
            )
            .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
            .filter(
                object_dependency::Column::Oid
                    .eq(target_table_id)
                    .and(object::Column::ObjType.eq(ObjectType::Table))
                    .and(streaming_job::Column::JobStatus.ne(JobStatus::Created)),
            )
            .into_tuple()
            .one(&txn)
            .await?;
        if let Some(tmp_id) = tmp_id {
            tracing::warn!(
                id = %tmp_id,
                "aborting temp streaming job for sink into table"
            );

            Object::delete_by_id(tmp_id).exec(&txn).await?;
        }
    }

    // Delete the job object, its internal tables and the associated source, if any.
    Object::delete_by_id(job_id).exec(&txn).await?;
    if !internal_table_ids.is_empty() {
        Object::delete_many()
            .filter(object::Column::Oid.is_in(internal_table_ids))
            .exec(&txn)
            .await?;
    }
    if let Some(t) = &table_obj
        && let Some(source_id) = t.optional_associated_source_id
    {
        Object::delete_by_id(source_id).exec(&txn).await?;
    }

    let err = if is_cancelled {
        MetaError::cancelled(format!("streaming job {job_id} is cancelled"))
    } else {
        MetaError::catalog_id_not_found("stream job", format!("streaming job {job_id} failed"))
    };
    let abort_reason = format!("streaming job aborted {}", err.as_report());
    // Send the abort reason to every waiter registered for this job in this database.
    // Send failures are ignored.
    for tx in inner
        .creating_table_finish_notifier
        .get_mut(&database_id)
        .map(|creating_tables| creating_tables.remove(&job_id).into_iter())
        .into_iter()
        .flatten()
        .flatten()
    {
        let _ = tx.send(Err(abort_reason.clone()));
    }
    txn.commit().await?;

    // Notify the frontend only after the deletion has been committed.
    if !objs.is_empty() {
        self.notify_frontend(Operation::Delete, build_object_group_for_delete(objs))
            .await;
    }
    Ok((true, Some(database_id)))
}
893
894 #[await_tree::instrument]
895 pub async fn post_collect_job_fragments(
896 &self,
897 job_id: JobId,
898 upstream_fragment_new_downstreams: &FragmentDownstreamRelation,
899 new_sink_downstream: Option<FragmentDownstreamRelation>,
900 split_assignment: Option<&SplitAssignment>,
901 ) -> MetaResult<()> {
902 let inner = self.inner.write().await;
903 let txn = inner.db.begin().await?;
904
905 insert_fragment_relations(&txn, upstream_fragment_new_downstreams).await?;
906
907 if let Some(new_downstream) = new_sink_downstream {
908 insert_fragment_relations(&txn, &new_downstream).await?;
909 }
910
911 StreamingJobModel::update(streaming_job::ActiveModel {
913 job_id: Set(job_id),
914 job_status: Set(JobStatus::Creating),
915 ..Default::default()
916 })
917 .exec(&txn)
918 .await?;
919
920 if let Some(split_assignment) = split_assignment {
921 let fragment_splits = split_assignment
922 .iter()
923 .map(|(fragment_id, splits)| {
924 (
925 *fragment_id as _,
926 splits.values().flatten().cloned().collect_vec(),
927 )
928 })
929 .collect();
930
931 self.update_fragment_splits(&txn, &fragment_splits).await?;
932 }
933
934 txn.commit().await?;
935
936 Ok(())
937 }
938
939 pub async fn create_job_catalog_for_replace(
940 &self,
941 streaming_job: &StreamingJob,
942 ctx: Option<&StreamContext>,
943 specified_parallelism: Option<&NonZeroUsize>,
944 expected_original_max_parallelism: Option<usize>,
945 ) -> MetaResult<JobId> {
946 let id = streaming_job.id();
947 let inner = self.inner.write().await;
948 let txn = inner.db.begin().await?;
949
950 streaming_job.verify_version_for_replace(&txn).await?;
952 let referring_cnt = ObjectDependency::find()
954 .join(
955 JoinType::InnerJoin,
956 object_dependency::Relation::Object1.def(),
957 )
958 .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
959 .filter(
960 object_dependency::Column::Oid
961 .eq(id)
962 .and(object::Column::ObjType.eq(ObjectType::Table))
963 .and(streaming_job::Column::JobStatus.ne(JobStatus::Created)),
964 )
965 .count(&txn)
966 .await?;
967 if referring_cnt != 0 {
968 return Err(MetaError::permission_denied(
969 "job is being altered or referenced by some creating jobs",
970 ));
971 }
972
973 let (
975 original_max_parallelism,
976 original_timezone,
977 original_config_override,
978 original_adaptive_strategy,
979 ): (i32, Option<String>, Option<String>, Option<String>) =
980 StreamingJobModel::find_by_id(id)
981 .select_only()
982 .column(streaming_job::Column::MaxParallelism)
983 .column(streaming_job::Column::Timezone)
984 .column(streaming_job::Column::ConfigOverride)
985 .column(streaming_job::Column::AdaptiveParallelismStrategy)
986 .into_tuple()
987 .one(&txn)
988 .await?
989 .ok_or_else(|| MetaError::catalog_id_not_found(streaming_job.job_type_str(), id))?;
990
991 if let Some(max_parallelism) = expected_original_max_parallelism
992 && original_max_parallelism != max_parallelism as i32
993 {
994 bail!(
997 "cannot use a different max parallelism \
998 when replacing streaming job, \
999 original: {}, new: {}",
1000 original_max_parallelism,
1001 max_parallelism
1002 );
1003 }
1004
1005 let parallelism = match specified_parallelism {
1006 None => StreamingParallelism::Adaptive,
1007 Some(n) => StreamingParallelism::Fixed(n.get() as _),
1008 };
1009
1010 let ctx = StreamContext {
1011 timezone: ctx
1012 .map(|ctx| ctx.timezone.clone())
1013 .unwrap_or(original_timezone),
1014 config_override: original_config_override.unwrap_or_default().into(),
1017 adaptive_parallelism_strategy: original_adaptive_strategy.as_deref().map(|s| {
1018 parse_strategy(s).expect("strategy should be validated before persisting")
1019 }),
1020 };
1021
1022 let new_obj_id = Self::create_streaming_job_obj(
1024 &txn,
1025 streaming_job.object_type(),
1026 streaming_job.owner() as _,
1027 Some(streaming_job.database_id() as _),
1028 Some(streaming_job.schema_id() as _),
1029 streaming_job.create_type(),
1030 ctx,
1031 parallelism,
1032 original_max_parallelism as _,
1033 streaming_job_resource_type::ResourceType::Regular(true),
1034 None,
1035 )
1036 .await?;
1037
1038 ObjectDependency::insert(object_dependency::ActiveModel {
1040 oid: Set(id.as_object_id()),
1041 used_by: Set(new_obj_id.as_object_id()),
1042 ..Default::default()
1043 })
1044 .exec(&txn)
1045 .await?;
1046
1047 txn.commit().await?;
1048
1049 Ok(new_obj_id)
1050 }
1051
1052 pub async fn finish_streaming_job(&self, job_id: JobId) -> MetaResult<()> {
1054 let mut inner = self.inner.write().await;
1055 let txn = inner.db.begin().await?;
1056
1057 if check_if_belongs_to_iceberg_table(&txn, job_id).await? {
1059 tracing::info!(
1060 "streaming job {} is for iceberg table, wait for manual finish operation",
1061 job_id
1062 );
1063 return Ok(());
1064 }
1065
1066 let (notification_op, objects, updated_user_info) =
1067 Self::finish_streaming_job_inner(&txn, job_id).await?;
1068
1069 txn.commit().await?;
1070
1071 let mut version = self
1072 .notify_frontend(
1073 notification_op,
1074 NotificationInfo::ObjectGroup(PbObjectGroup { objects }),
1075 )
1076 .await;
1077
1078 if !updated_user_info.is_empty() {
1080 version = self.notify_users_update(updated_user_info).await;
1081 }
1082
1083 inner
1084 .creating_table_finish_notifier
1085 .values_mut()
1086 .for_each(|creating_tables| {
1087 if let Some(txs) = creating_tables.remove(&job_id) {
1088 for tx in txs {
1089 let _ = tx.send(Ok(version));
1090 }
1091 }
1092 });
1093
1094 Ok(())
1095 }
1096
1097 pub async fn finish_streaming_job_inner(
1099 txn: &DatabaseTransaction,
1100 job_id: JobId,
1101 ) -> MetaResult<(Operation, Vec<risingwave_pb::meta::Object>, Vec<PbUserInfo>)> {
1102 let job_type = Object::find_by_id(job_id)
1103 .select_only()
1104 .column(object::Column::ObjType)
1105 .into_tuple()
1106 .one(txn)
1107 .await?
1108 .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;
1109
1110 let create_type: CreateType = StreamingJobModel::find_by_id(job_id)
1111 .select_only()
1112 .column(streaming_job::Column::CreateType)
1113 .into_tuple()
1114 .one(txn)
1115 .await?
1116 .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;
1117
1118 let res = Object::update_many()
1120 .col_expr(object::Column::CreatedAt, Expr::current_timestamp().into())
1121 .col_expr(
1122 object::Column::CreatedAtClusterVersion,
1123 current_cluster_version().into(),
1124 )
1125 .filter(object::Column::Oid.eq(job_id))
1126 .exec(txn)
1127 .await?;
1128 if res.rows_affected == 0 {
1129 return Err(MetaError::catalog_id_not_found("streaming job", job_id));
1130 }
1131
1132 let job = streaming_job::ActiveModel {
1134 job_id: Set(job_id),
1135 job_status: Set(JobStatus::Created),
1136 ..Default::default()
1137 };
1138 let streaming_job = Some(job.update(txn).await?);
1139
1140 let internal_table_objs = Table::find()
1142 .find_also_related(Object)
1143 .filter(table::Column::BelongsToJobId.eq(job_id))
1144 .all(txn)
1145 .await?;
1146 let mut objects = internal_table_objs
1147 .iter()
1148 .map(|(table, obj)| PbObject {
1149 object_info: Some(PbObjectInfo::Table(
1150 ObjectModel(table.clone(), obj.clone().unwrap(), streaming_job.clone()).into(),
1151 )),
1152 })
1153 .collect_vec();
1154 let mut notification_op = if create_type == CreateType::Background {
1155 NotificationOperation::Update
1156 } else {
1157 NotificationOperation::Add
1158 };
1159 let mut updated_user_info = vec![];
1160 let mut need_grant_default_privileges = true;
1161
1162 match job_type {
1163 ObjectType::Table => {
1164 let (table, obj) = Table::find_by_id(job_id.as_mv_table_id())
1165 .find_also_related(Object)
1166 .one(txn)
1167 .await?
1168 .ok_or_else(|| MetaError::catalog_id_not_found("table", job_id))?;
1169 if table.table_type == TableType::MaterializedView {
1170 notification_op = NotificationOperation::Update;
1171 }
1172
1173 if let Some(source_id) = table.optional_associated_source_id {
1174 let (src, obj) = Source::find_by_id(source_id)
1175 .find_also_related(Object)
1176 .one(txn)
1177 .await?
1178 .ok_or_else(|| MetaError::catalog_id_not_found("source", source_id))?;
1179 objects.push(PbObject {
1180 object_info: Some(PbObjectInfo::Source(
1181 ObjectModel(src, obj.unwrap(), None).into(),
1182 )),
1183 });
1184 }
1185 objects.push(PbObject {
1186 object_info: Some(PbObjectInfo::Table(
1187 ObjectModel(table, obj.unwrap(), streaming_job).into(),
1188 )),
1189 });
1190 }
1191 ObjectType::Sink => {
1192 let (sink, obj) = Sink::find_by_id(job_id.as_sink_id())
1193 .find_also_related(Object)
1194 .one(txn)
1195 .await?
1196 .ok_or_else(|| MetaError::catalog_id_not_found("sink", job_id))?;
1197 if sink.name.starts_with(ICEBERG_SINK_PREFIX) {
1198 need_grant_default_privileges = false;
1199 }
1200 notification_op = NotificationOperation::Update;
1203 objects.push(PbObject {
1204 object_info: Some(PbObjectInfo::Sink(
1205 ObjectModel(sink, obj.unwrap(), streaming_job).into(),
1206 )),
1207 });
1208 }
1209 ObjectType::Index => {
1210 need_grant_default_privileges = false;
1211 let (index, obj) = Index::find_by_id(job_id.as_index_id())
1212 .find_also_related(Object)
1213 .one(txn)
1214 .await?
1215 .ok_or_else(|| MetaError::catalog_id_not_found("index", job_id))?;
1216 {
1217 let (table, obj) = Table::find_by_id(index.index_table_id)
1218 .find_also_related(Object)
1219 .one(txn)
1220 .await?
1221 .ok_or_else(|| {
1222 MetaError::catalog_id_not_found("table", index.index_table_id)
1223 })?;
1224 objects.push(PbObject {
1225 object_info: Some(PbObjectInfo::Table(
1226 ObjectModel(table, obj.unwrap(), streaming_job.clone()).into(),
1227 )),
1228 });
1229 }
1230
1231 let primary_table_privileges = UserPrivilege::find()
1234 .filter(
1235 user_privilege::Column::Oid
1236 .eq(index.primary_table_id)
1237 .and(user_privilege::Column::Action.eq(Action::Select)),
1238 )
1239 .all(txn)
1240 .await?;
1241 if !primary_table_privileges.is_empty() {
1242 let index_state_table_ids: Vec<TableId> = Table::find()
1243 .select_only()
1244 .column(table::Column::TableId)
1245 .filter(
1246 table::Column::BelongsToJobId
1247 .eq(job_id)
1248 .or(table::Column::TableId.eq(index.index_table_id)),
1249 )
1250 .into_tuple()
1251 .all(txn)
1252 .await?;
1253 let mut new_privileges = vec![];
1254 for privilege in &primary_table_privileges {
1255 for state_table_id in &index_state_table_ids {
1256 new_privileges.push(user_privilege::ActiveModel {
1257 id: Default::default(),
1258 oid: Set(state_table_id.as_object_id()),
1259 user_id: Set(privilege.user_id),
1260 action: Set(Action::Select),
1261 dependent_id: Set(privilege.dependent_id),
1262 granted_by: Set(privilege.granted_by),
1263 with_grant_option: Set(privilege.with_grant_option),
1264 });
1265 }
1266 }
1267 UserPrivilege::insert_many(new_privileges).exec(txn).await?;
1268
1269 updated_user_info = list_user_info_by_ids(
1270 primary_table_privileges.into_iter().map(|p| p.user_id),
1271 txn,
1272 )
1273 .await?;
1274 }
1275
1276 objects.push(PbObject {
1277 object_info: Some(PbObjectInfo::Index(
1278 ObjectModel(index, obj.unwrap(), streaming_job).into(),
1279 )),
1280 });
1281 }
1282 ObjectType::Source => {
1283 let (source, obj) = Source::find_by_id(job_id.as_shared_source_id())
1284 .find_also_related(Object)
1285 .one(txn)
1286 .await?
1287 .ok_or_else(|| MetaError::catalog_id_not_found("source", job_id))?;
1288 objects.push(PbObject {
1289 object_info: Some(PbObjectInfo::Source(
1290 ObjectModel(source, obj.unwrap(), None).into(),
1291 )),
1292 });
1293 }
1294 _ => unreachable!("invalid job type: {:?}", job_type),
1295 }
1296
1297 if need_grant_default_privileges {
1298 updated_user_info = grant_default_privileges_automatically(txn, job_id).await?;
1299 }
1300
1301 Ok((notification_op, objects, updated_user_info))
1302 }
1303
1304 pub async fn finish_replace_streaming_job(
1305 &self,
1306 tmp_id: JobId,
1307 streaming_job: StreamingJob,
1308 replace_upstream: FragmentReplaceUpstream,
1309 sink_into_table_context: SinkIntoTableContext,
1310 drop_table_connector_ctx: Option<&DropTableConnectorContext>,
1311 auto_refresh_schema_sinks: Option<Vec<FinishAutoRefreshSchemaSinkContext>>,
1312 ) -> MetaResult<NotificationVersion> {
1313 let inner = self.inner.write().await;
1314 let txn = inner.db.begin().await?;
1315
1316 let (objects, delete_notification_objs) = Self::finish_replace_streaming_job_inner(
1317 tmp_id,
1318 replace_upstream,
1319 sink_into_table_context,
1320 &txn,
1321 streaming_job,
1322 drop_table_connector_ctx,
1323 auto_refresh_schema_sinks,
1324 )
1325 .await?;
1326
1327 txn.commit().await?;
1328
1329 let mut version = self
1330 .notify_frontend(
1331 NotificationOperation::Update,
1332 NotificationInfo::ObjectGroup(PbObjectGroup { objects }),
1333 )
1334 .await;
1335
1336 if let Some((user_infos, to_drop_objects)) = delete_notification_objs {
1337 self.notify_users_update(user_infos).await;
1338 version = self
1339 .notify_frontend(
1340 NotificationOperation::Delete,
1341 build_object_group_for_delete(to_drop_objects),
1342 )
1343 .await;
1344 }
1345
1346 Ok(version)
1347 }
1348
1349 pub async fn finish_replace_streaming_job_inner(
1350 tmp_id: JobId,
1351 replace_upstream: FragmentReplaceUpstream,
1352 SinkIntoTableContext {
1353 updated_sink_catalogs,
1354 }: SinkIntoTableContext,
1355 txn: &DatabaseTransaction,
1356 streaming_job: StreamingJob,
1357 drop_table_connector_ctx: Option<&DropTableConnectorContext>,
1358 auto_refresh_schema_sinks: Option<Vec<FinishAutoRefreshSchemaSinkContext>>,
1359 ) -> MetaResult<(Vec<PbObject>, Option<(Vec<PbUserInfo>, Vec<PartialObject>)>)> {
1360 let original_job_id = streaming_job.id();
1361 let job_type = streaming_job.job_type();
1362
1363 let mut index_item_rewriter = None;
1364
1365 match streaming_job {
1367 StreamingJob::Table(_source, table, _table_job_type) => {
1368 let original_column_catalogs =
1371 get_table_columns(txn, original_job_id.as_mv_table_id()).await?;
1372
1373 index_item_rewriter = Some({
1374 let original_columns = original_column_catalogs
1375 .to_protobuf()
1376 .into_iter()
1377 .map(|c| c.column_desc.unwrap())
1378 .collect_vec();
1379 let new_columns = table
1380 .columns
1381 .iter()
1382 .map(|c| c.column_desc.clone().unwrap())
1383 .collect_vec();
1384
1385 IndexItemRewriter {
1386 original_columns,
1387 new_columns,
1388 }
1389 });
1390
1391 for sink_id in updated_sink_catalogs {
1393 Sink::update(sink::ActiveModel {
1394 sink_id: Set(sink_id as _),
1395 original_target_columns: Set(Some(original_column_catalogs.clone())),
1396 ..Default::default()
1397 })
1398 .exec(txn)
1399 .await?;
1400 }
1401 let mut table = table::ActiveModel::from(table);
1403 if let Some(drop_table_connector_ctx) = drop_table_connector_ctx
1404 && drop_table_connector_ctx.to_change_streaming_job_id == original_job_id
1405 {
1406 table.optional_associated_source_id = Set(None);
1408 }
1409
1410 Table::update(table).exec(txn).await?;
1411 }
1412 StreamingJob::Source(source) => {
1413 let source = source::ActiveModel::from(source);
1415 Source::update(source).exec(txn).await?;
1416 }
1417 StreamingJob::MaterializedView(table) => {
1418 let table = table::ActiveModel::from(table);
1420 Table::update(table).exec(txn).await?;
1421 }
1422 _ => unreachable!(
1423 "invalid streaming job type: {:?}",
1424 streaming_job.job_type_str()
1425 ),
1426 }
1427
1428 async fn finish_fragments(
1429 txn: &DatabaseTransaction,
1430 tmp_id: JobId,
1431 original_job_id: JobId,
1432 replace_upstream: FragmentReplaceUpstream,
1433 ) -> MetaResult<()> {
1434 let fragment_info: Vec<(FragmentId, I32Array)> = Fragment::find()
1438 .select_only()
1439 .columns([
1440 fragment::Column::FragmentId,
1441 fragment::Column::StateTableIds,
1442 ])
1443 .filter(fragment::Column::JobId.eq(tmp_id))
1444 .into_tuple()
1445 .all(txn)
1446 .await?;
1447 for (fragment_id, state_table_ids) in fragment_info {
1448 for state_table_id in state_table_ids.into_inner() {
1449 let state_table_id = TableId::new(state_table_id as _);
1450 Table::update(table::ActiveModel {
1451 table_id: Set(state_table_id),
1452 fragment_id: Set(Some(fragment_id)),
1453 ..Default::default()
1455 })
1456 .exec(txn)
1457 .await?;
1458 }
1459 }
1460
1461 Fragment::delete_many()
1463 .filter(fragment::Column::JobId.eq(original_job_id))
1464 .exec(txn)
1465 .await?;
1466 Fragment::update_many()
1467 .col_expr(fragment::Column::JobId, SimpleExpr::from(original_job_id))
1468 .filter(fragment::Column::JobId.eq(tmp_id))
1469 .exec(txn)
1470 .await?;
1471
1472 for (fragment_id, fragment_replace_map) in replace_upstream {
1475 let (fragment_id, mut stream_node) =
1476 Fragment::find_by_id(fragment_id as FragmentId)
1477 .select_only()
1478 .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
1479 .into_tuple::<(FragmentId, StreamNode)>()
1480 .one(txn)
1481 .await?
1482 .map(|(id, node)| (id, node.to_protobuf()))
1483 .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
1484
1485 visit_stream_node_mut(&mut stream_node, |body| {
1486 if let PbNodeBody::Merge(m) = body
1487 && let Some(new_fragment_id) =
1488 fragment_replace_map.get(&m.upstream_fragment_id)
1489 {
1490 m.upstream_fragment_id = *new_fragment_id;
1491 }
1492 });
1493 Fragment::update(fragment::ActiveModel {
1494 fragment_id: Set(fragment_id),
1495 stream_node: Set(StreamNode::from(&stream_node)),
1496 ..Default::default()
1497 })
1498 .exec(txn)
1499 .await?;
1500 }
1501
1502 Object::delete_by_id(tmp_id).exec(txn).await?;
1504
1505 Ok(())
1506 }
1507
1508 finish_fragments(txn, tmp_id, original_job_id, replace_upstream).await?;
1509
1510 let mut objects = vec![];
1512 match job_type {
1513 StreamingJobType::Table(_) | StreamingJobType::MaterializedView => {
1514 let (table, table_obj) = Table::find_by_id(original_job_id.as_mv_table_id())
1515 .find_also_related(Object)
1516 .one(txn)
1517 .await?
1518 .ok_or_else(|| MetaError::catalog_id_not_found("object", original_job_id))?;
1519 let streaming_job = streaming_job::Entity::find_by_id(table.job_id())
1520 .one(txn)
1521 .await?;
1522 objects.push(PbObject {
1523 object_info: Some(PbObjectInfo::Table(
1524 ObjectModel(table, table_obj.unwrap(), streaming_job).into(),
1525 )),
1526 })
1527 }
1528 StreamingJobType::Source => {
1529 let (source, source_obj) =
1530 Source::find_by_id(original_job_id.as_shared_source_id())
1531 .find_also_related(Object)
1532 .one(txn)
1533 .await?
1534 .ok_or_else(|| {
1535 MetaError::catalog_id_not_found("object", original_job_id)
1536 })?;
1537 objects.push(PbObject {
1538 object_info: Some(PbObjectInfo::Source(
1539 ObjectModel(source, source_obj.unwrap(), None).into(),
1540 )),
1541 })
1542 }
1543 _ => unreachable!("invalid streaming job type for replace: {:?}", job_type),
1544 }
1545
1546 if let Some(expr_rewriter) = index_item_rewriter {
1547 let index_items: Vec<(IndexId, ExprNodeArray)> = Index::find()
1548 .select_only()
1549 .columns([index::Column::IndexId, index::Column::IndexItems])
1550 .filter(index::Column::PrimaryTableId.eq(original_job_id))
1551 .into_tuple()
1552 .all(txn)
1553 .await?;
1554 for (index_id, nodes) in index_items {
1555 let mut pb_nodes = nodes.to_protobuf();
1556 pb_nodes
1557 .iter_mut()
1558 .for_each(|x| expr_rewriter.rewrite_expr(x));
1559 let index = index::ActiveModel {
1560 index_id: Set(index_id),
1561 index_items: Set(pb_nodes.into()),
1562 ..Default::default()
1563 }
1564 .update(txn)
1565 .await?;
1566 let (index_obj, streaming_job) = Object::find_by_id(index.index_id)
1567 .find_also_related(streaming_job::Entity)
1568 .one(txn)
1569 .await?
1570 .ok_or_else(|| MetaError::catalog_id_not_found("object", index.index_id))?;
1571 objects.push(PbObject {
1572 object_info: Some(PbObjectInfo::Index(
1573 ObjectModel(index, index_obj, streaming_job).into(),
1574 )),
1575 });
1576 }
1577 }
1578
1579 if let Some(sinks) = auto_refresh_schema_sinks {
1580 for finish_sink_context in sinks {
1581 finish_fragments(
1582 txn,
1583 finish_sink_context.tmp_sink_id.as_job_id(),
1584 finish_sink_context.original_sink_id.as_job_id(),
1585 Default::default(),
1586 )
1587 .await?;
1588 let (mut sink, sink_obj) = Sink::find_by_id(finish_sink_context.original_sink_id)
1589 .find_also_related(Object)
1590 .one(txn)
1591 .await?
1592 .ok_or_else(|| MetaError::catalog_id_not_found("sink", original_job_id))?;
1593 let sink_streaming_job =
1594 streaming_job::Entity::find_by_id(sink.sink_id.as_job_id())
1595 .one(txn)
1596 .await?;
1597 let columns = ColumnCatalogArray::from(finish_sink_context.columns);
1598 Sink::update(sink::ActiveModel {
1599 sink_id: Set(finish_sink_context.original_sink_id),
1600 columns: Set(columns.clone()),
1601 ..Default::default()
1602 })
1603 .exec(txn)
1604 .await?;
1605 sink.columns = columns;
1606 objects.push(PbObject {
1607 object_info: Some(PbObjectInfo::Sink(
1608 ObjectModel(sink, sink_obj.unwrap(), sink_streaming_job.clone()).into(),
1609 )),
1610 });
1611 if let Some((log_store_table_id, new_log_store_table_columns)) =
1612 finish_sink_context.new_log_store_table
1613 {
1614 let new_log_store_table_columns: ColumnCatalogArray =
1615 new_log_store_table_columns.into();
1616 let (mut table, table_obj) = Table::find_by_id(log_store_table_id)
1617 .find_also_related(Object)
1618 .one(txn)
1619 .await?
1620 .ok_or_else(|| MetaError::catalog_id_not_found("table", original_job_id))?;
1621 Table::update(table::ActiveModel {
1622 table_id: Set(log_store_table_id),
1623 columns: Set(new_log_store_table_columns.clone()),
1624 ..Default::default()
1625 })
1626 .exec(txn)
1627 .await?;
1628 table.columns = new_log_store_table_columns;
1629 objects.push(PbObject {
1630 object_info: Some(PbObjectInfo::Table(
1631 ObjectModel(table, table_obj.unwrap(), sink_streaming_job.clone())
1632 .into(),
1633 )),
1634 });
1635 }
1636 }
1637 }
1638
1639 let mut notification_objs: Option<(Vec<PbUserInfo>, Vec<PartialObject>)> = None;
1640 if let Some(drop_table_connector_ctx) = drop_table_connector_ctx {
1641 notification_objs =
1642 Some(Self::drop_table_associated_source(txn, drop_table_connector_ctx).await?);
1643 }
1644
1645 Ok((objects, notification_objs))
1646 }
1647
1648 pub async fn try_abort_replacing_streaming_job(
1650 &self,
1651 tmp_job_id: JobId,
1652 tmp_sink_ids: Option<Vec<ObjectId>>,
1653 ) -> MetaResult<()> {
1654 let inner = self.inner.write().await;
1655 let txn = inner.db.begin().await?;
1656 Object::delete_by_id(tmp_job_id).exec(&txn).await?;
1657 if let Some(tmp_sink_ids) = tmp_sink_ids {
1658 for tmp_sink_id in tmp_sink_ids {
1659 Object::delete_by_id(tmp_sink_id).exec(&txn).await?;
1660 }
1661 }
1662 txn.commit().await?;
1663 Ok(())
1664 }
1665
1666 pub async fn update_source_rate_limit_by_source_id(
1669 &self,
1670 source_id: SourceId,
1671 rate_limit: Option<u32>,
1672 ) -> MetaResult<(HashSet<JobId>, HashSet<FragmentId>)> {
1673 let inner = self.inner.read().await;
1674 let txn = inner.db.begin().await?;
1675
1676 {
1677 let active_source = source::ActiveModel {
1678 source_id: Set(source_id),
1679 rate_limit: Set(rate_limit.map(|v| v as i32)),
1680 ..Default::default()
1681 };
1682 Source::update(active_source).exec(&txn).await?;
1683 }
1684
1685 let (source, obj) = Source::find_by_id(source_id)
1686 .find_also_related(Object)
1687 .one(&txn)
1688 .await?
1689 .ok_or_else(|| {
1690 MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
1691 })?;
1692
1693 let is_fs_source = source.with_properties.inner_ref().is_new_fs_connector();
1694 let streaming_job_ids: Vec<JobId> =
1695 if let Some(table_id) = source.optional_associated_table_id {
1696 vec![table_id.as_job_id()]
1697 } else if let Some(source_info) = &source.source_info
1698 && source_info.to_protobuf().is_shared()
1699 {
1700 vec![source_id.as_share_source_job_id()]
1701 } else {
1702 ObjectDependency::find()
1703 .select_only()
1704 .column(object_dependency::Column::UsedBy)
1705 .filter(object_dependency::Column::Oid.eq(source_id))
1706 .into_tuple()
1707 .all(&txn)
1708 .await?
1709 };
1710
1711 if streaming_job_ids.is_empty() {
1712 return Err(MetaError::invalid_parameter(format!(
1713 "source id {source_id} not used by any streaming job"
1714 )));
1715 }
1716
1717 let fragments: Vec<(FragmentId, JobId, i32, StreamNode)> = Fragment::find()
1718 .select_only()
1719 .columns([
1720 fragment::Column::FragmentId,
1721 fragment::Column::JobId,
1722 fragment::Column::FragmentTypeMask,
1723 fragment::Column::StreamNode,
1724 ])
1725 .filter(fragment::Column::JobId.is_in(streaming_job_ids))
1726 .into_tuple()
1727 .all(&txn)
1728 .await?;
1729 let mut fragments = fragments
1730 .into_iter()
1731 .map(|(id, job_id, mask, stream_node)| {
1732 (
1733 id,
1734 job_id,
1735 FragmentTypeMask::from(mask as u32),
1736 stream_node.to_protobuf(),
1737 )
1738 })
1739 .collect_vec();
1740
1741 fragments.retain_mut(|(_, _, fragment_type_mask, stream_node)| {
1742 let mut found = false;
1743 if fragment_type_mask.contains(FragmentTypeFlag::Source) {
1744 visit_stream_node_mut(stream_node, |node| {
1745 if let PbNodeBody::Source(node) = node
1746 && let Some(node_inner) = &mut node.source_inner
1747 && node_inner.source_id == source_id
1748 {
1749 node_inner.rate_limit = rate_limit;
1750 found = true;
1751 }
1752 });
1753 }
1754 if is_fs_source {
1755 visit_stream_node_mut(stream_node, |node| {
1758 if let PbNodeBody::StreamFsFetch(node) = node {
1759 fragment_type_mask.add(FragmentTypeFlag::FsFetch);
1760 if let Some(node_inner) = &mut node.node_inner
1761 && node_inner.source_id == source_id
1762 {
1763 node_inner.rate_limit = rate_limit;
1764 found = true;
1765 }
1766 }
1767 });
1768 }
1769 found
1770 });
1771
1772 assert!(
1773 !fragments.is_empty(),
1774 "source id should be used by at least one fragment"
1775 );
1776
1777 let (fragment_ids, job_ids) = fragments
1778 .iter()
1779 .map(|(framgnet_id, job_id, _, _)| (framgnet_id, job_id))
1780 .unzip();
1781
1782 for (fragment_id, _, fragment_type_mask, stream_node) in fragments {
1783 Fragment::update(fragment::ActiveModel {
1784 fragment_id: Set(fragment_id),
1785 fragment_type_mask: Set(fragment_type_mask.into()),
1786 stream_node: Set(StreamNode::from(&stream_node)),
1787 ..Default::default()
1788 })
1789 .exec(&txn)
1790 .await?;
1791 }
1792
1793 txn.commit().await?;
1794
1795 let relation_info = PbObjectInfo::Source(ObjectModel(source, obj.unwrap(), None).into());
1796 let _version = self
1797 .notify_frontend(
1798 NotificationOperation::Update,
1799 NotificationInfo::ObjectGroup(PbObjectGroup {
1800 objects: vec![PbObject {
1801 object_info: Some(relation_info),
1802 }],
1803 }),
1804 )
1805 .await;
1806
1807 Ok((job_ids, fragment_ids))
1808 }
1809
1810 pub async fn mutate_fragments_by_job_id(
1813 &self,
1814 job_id: JobId,
1815 mut fragments_mutation_fn: impl FnMut(FragmentTypeMask, &mut PbStreamNode) -> MetaResult<bool>,
1817 err_msg: &'static str,
1819 ) -> MetaResult<HashSet<FragmentId>> {
1820 let inner = self.inner.read().await;
1821 let txn = inner.db.begin().await?;
1822
1823 let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
1824 .select_only()
1825 .columns([
1826 fragment::Column::FragmentId,
1827 fragment::Column::FragmentTypeMask,
1828 fragment::Column::StreamNode,
1829 ])
1830 .filter(fragment::Column::JobId.eq(job_id))
1831 .into_tuple()
1832 .all(&txn)
1833 .await?;
1834 let mut fragments = fragments
1835 .into_iter()
1836 .map(|(id, mask, stream_node)| {
1837 (id, FragmentTypeMask::from(mask), stream_node.to_protobuf())
1838 })
1839 .collect_vec();
1840
1841 let fragments = fragments
1842 .iter_mut()
1843 .map(|(_, fragment_type_mask, stream_node)| {
1844 fragments_mutation_fn(*fragment_type_mask, stream_node)
1845 })
1846 .collect::<MetaResult<Vec<bool>>>()?
1847 .into_iter()
1848 .zip_eq_debug(std::mem::take(&mut fragments))
1849 .filter_map(|(keep, fragment)| if keep { Some(fragment) } else { None })
1850 .collect::<Vec<_>>();
1851
1852 if fragments.is_empty() {
1853 return Err(MetaError::invalid_parameter(format!(
1854 "job id {job_id}: {}",
1855 err_msg
1856 )));
1857 }
1858
1859 let fragment_ids: HashSet<FragmentId> = fragments.iter().map(|(id, _, _)| *id).collect();
1860 for (id, _, stream_node) in fragments {
1861 Fragment::update(fragment::ActiveModel {
1862 fragment_id: Set(id),
1863 stream_node: Set(StreamNode::from(&stream_node)),
1864 ..Default::default()
1865 })
1866 .exec(&txn)
1867 .await?;
1868 }
1869
1870 txn.commit().await?;
1871
1872 Ok(fragment_ids)
1873 }
1874
1875 async fn mutate_fragment_by_fragment_id(
1876 &self,
1877 fragment_id: FragmentId,
1878 mut fragment_mutation_fn: impl FnMut(FragmentTypeMask, &mut PbStreamNode) -> bool,
1879 err_msg: &'static str,
1880 ) -> MetaResult<()> {
1881 let inner = self.inner.read().await;
1882 let txn = inner.db.begin().await?;
1883
1884 let (fragment_type_mask, stream_node): (i32, StreamNode) =
1885 Fragment::find_by_id(fragment_id)
1886 .select_only()
1887 .columns([
1888 fragment::Column::FragmentTypeMask,
1889 fragment::Column::StreamNode,
1890 ])
1891 .into_tuple()
1892 .one(&txn)
1893 .await?
1894 .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
1895 let mut pb_stream_node = stream_node.to_protobuf();
1896 let fragment_type_mask = FragmentTypeMask::from(fragment_type_mask);
1897
1898 if !fragment_mutation_fn(fragment_type_mask, &mut pb_stream_node) {
1899 return Err(MetaError::invalid_parameter(format!(
1900 "fragment id {fragment_id}: {}",
1901 err_msg
1902 )));
1903 }
1904
1905 Fragment::update(fragment::ActiveModel {
1906 fragment_id: Set(fragment_id),
1907 stream_node: Set(stream_node),
1908 ..Default::default()
1909 })
1910 .exec(&txn)
1911 .await?;
1912
1913 txn.commit().await?;
1914
1915 Ok(())
1916 }
1917
1918 pub async fn update_backfill_orders_by_job_id(
1919 &self,
1920 job_id: JobId,
1921 backfill_orders: Option<BackfillOrders>,
1922 ) -> MetaResult<()> {
1923 let inner = self.inner.write().await;
1924 let txn = inner.db.begin().await?;
1925
1926 ensure_job_not_canceled(job_id, &txn).await?;
1927
1928 streaming_job::ActiveModel {
1929 job_id: Set(job_id),
1930 backfill_orders: Set(backfill_orders),
1931 ..Default::default()
1932 }
1933 .update(&txn)
1934 .await?;
1935
1936 txn.commit().await?;
1937
1938 Ok(())
1939 }
1940
1941 pub async fn update_backfill_rate_limit_by_job_id(
1944 &self,
1945 job_id: JobId,
1946 rate_limit: Option<u32>,
1947 ) -> MetaResult<HashSet<FragmentId>> {
1948 let update_backfill_rate_limit =
1949 |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
1950 let mut found = false;
1951 if fragment_type_mask
1952 .contains_any(FragmentTypeFlag::backfill_rate_limit_fragments())
1953 {
1954 visit_stream_node_mut(stream_node, |node| match node {
1955 PbNodeBody::StreamCdcScan(node) => {
1956 node.rate_limit = rate_limit;
1957 found = true;
1958 }
1959 PbNodeBody::StreamScan(node) => {
1960 node.rate_limit = rate_limit;
1961 found = true;
1962 }
1963 PbNodeBody::SourceBackfill(node) => {
1964 node.rate_limit = rate_limit;
1965 found = true;
1966 }
1967 _ => {}
1968 });
1969 }
1970 Ok(found)
1971 };
1972
1973 self.mutate_fragments_by_job_id(
1974 job_id,
1975 update_backfill_rate_limit,
1976 "stream scan node or source node not found",
1977 )
1978 .await
1979 }
1980
1981 pub async fn update_sink_rate_limit_by_job_id(
1984 &self,
1985 sink_id: SinkId,
1986 rate_limit: Option<u32>,
1987 ) -> MetaResult<HashSet<FragmentId>> {
1988 let update_sink_rate_limit =
1989 |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
1990 let mut found = Ok(false);
1991 if fragment_type_mask.contains_any(FragmentTypeFlag::sink_rate_limit_fragments()) {
1992 visit_stream_node_mut(stream_node, |node| {
1993 if let PbNodeBody::Sink(node) = node {
1994 if node.log_store_type != PbSinkLogStoreType::KvLogStore as i32 {
1995 found = Err(MetaError::invalid_parameter(
1996 "sink rate limit is only supported for kv log store, please SET sink_decouple = TRUE before CREATE SINK",
1997 ));
1998 return;
1999 }
2000 node.rate_limit = rate_limit;
2001 found = Ok(true);
2002 }
2003 });
2004 }
2005 found
2006 };
2007
2008 self.mutate_fragments_by_job_id(
2009 sink_id.as_job_id(),
2010 update_sink_rate_limit,
2011 "sink node not found",
2012 )
2013 .await
2014 }
2015
2016 pub async fn update_dml_rate_limit_by_job_id(
2017 &self,
2018 job_id: JobId,
2019 rate_limit: Option<u32>,
2020 ) -> MetaResult<HashSet<FragmentId>> {
2021 let update_dml_rate_limit =
2022 |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
2023 let mut found = false;
2024 if fragment_type_mask.contains_any(FragmentTypeFlag::dml_rate_limit_fragments()) {
2025 visit_stream_node_mut(stream_node, |node| {
2026 if let PbNodeBody::Dml(node) = node {
2027 node.rate_limit = rate_limit;
2028 found = true;
2029 }
2030 });
2031 }
2032 Ok(found)
2033 };
2034
2035 self.mutate_fragments_by_job_id(job_id, update_dml_rate_limit, "dml node not found")
2036 .await
2037 }
2038
2039 pub async fn update_source_props_by_source_id(
2040 &self,
2041 source_id: SourceId,
2042 alter_props: BTreeMap<String, String>,
2043 alter_secret_refs: BTreeMap<String, PbSecretRef>,
2044 ) -> MetaResult<WithOptionsSecResolved> {
2045 let inner = self.inner.read().await;
2046 let txn = inner.db.begin().await?;
2047
2048 let (source, _obj) = Source::find_by_id(source_id)
2049 .find_also_related(Object)
2050 .one(&txn)
2051 .await?
2052 .ok_or_else(|| {
2053 MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
2054 })?;
2055 let connector = source.with_properties.0.get_connector().unwrap();
2056 let is_shared_source = source.is_shared();
2057
2058 let mut dep_source_job_ids: Vec<JobId> = Vec::new();
2059 if !is_shared_source {
2060 dep_source_job_ids = ObjectDependency::find()
2062 .select_only()
2063 .column(object_dependency::Column::UsedBy)
2064 .filter(object_dependency::Column::Oid.eq(source_id))
2065 .into_tuple()
2066 .all(&txn)
2067 .await?;
2068 }
2069
2070 let prop_keys: Vec<String> = alter_props
2072 .keys()
2073 .chain(alter_secret_refs.keys())
2074 .cloned()
2075 .collect();
2076 risingwave_connector::allow_alter_on_fly_fields::check_source_allow_alter_on_fly_fields(
2077 &connector, &prop_keys,
2078 )?;
2079
2080 let mut options_with_secret = WithOptionsSecResolved::new(
2081 source.with_properties.0.clone(),
2082 source
2083 .secret_ref
2084 .map(|secret_ref| secret_ref.to_protobuf())
2085 .unwrap_or_default(),
2086 );
2087 let (to_add_secret_dep, to_remove_secret_dep) =
2088 options_with_secret.handle_update(alter_props, alter_secret_refs)?;
2089
2090 tracing::info!(
2091 "applying new properties to source: source_id={}, options_with_secret={:?}",
2092 source_id,
2093 options_with_secret
2094 );
2095 let _ = ConnectorProperties::extract(options_with_secret.clone(), true)?;
2097 let mut associate_table_id = None;
2100
2101 let mut preferred_id = source_id.as_object_id();
2105 let rewrite_sql = {
2106 let definition = source.definition.clone();
2107
2108 let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
2109 .map_err(|e| {
2110 MetaError::from(MetaErrorInner::Connector(ConnectorError::from(
2111 anyhow!(e).context("Failed to parse source definition SQL"),
2112 )))
2113 })?
2114 .try_into()
2115 .unwrap();
2116
2117 async fn format_with_option_secret_resolved(
2131 txn: &DatabaseTransaction,
2132 options_with_secret: &WithOptionsSecResolved,
2133 ) -> MetaResult<Vec<SqlOption>> {
2134 let mut options = Vec::new();
2135 for (k, v) in options_with_secret.as_plaintext() {
2136 let sql_option = SqlOption::try_from((k, &format!("'{}'", v)))
2137 .map_err(|e| MetaError::invalid_parameter(e.to_report_string()))?;
2138 options.push(sql_option);
2139 }
2140 for (k, v) in options_with_secret.as_secret() {
2141 if let Some(secret_model) = Secret::find_by_id(v.secret_id).one(txn).await? {
2142 let sql_option =
2143 SqlOption::try_from((k, &format!("SECRET {}", secret_model.name)))
2144 .map_err(|e| MetaError::invalid_parameter(e.to_report_string()))?;
2145 options.push(sql_option);
2146 } else {
2147 return Err(MetaError::catalog_id_not_found("secret", v.secret_id));
2148 }
2149 }
2150 Ok(options)
2151 }
2152
2153 match &mut stmt {
2154 Statement::CreateSource { stmt } => {
2155 stmt.with_properties.0 =
2156 format_with_option_secret_resolved(&txn, &options_with_secret).await?;
2157 }
2158 Statement::CreateTable { with_options, .. } => {
2159 *with_options =
2160 format_with_option_secret_resolved(&txn, &options_with_secret).await?;
2161 associate_table_id = source.optional_associated_table_id;
2162 preferred_id = associate_table_id.unwrap().as_object_id();
2163 }
2164 _ => unreachable!(),
2165 }
2166
2167 stmt.to_string()
2168 };
2169
2170 {
2171 if !to_add_secret_dep.is_empty() {
2173 ObjectDependency::insert_many(to_add_secret_dep.into_iter().map(|secret_id| {
2174 object_dependency::ActiveModel {
2175 oid: Set(secret_id.into()),
2176 used_by: Set(preferred_id),
2177 ..Default::default()
2178 }
2179 }))
2180 .exec(&txn)
2181 .await?;
2182 }
2183 if !to_remove_secret_dep.is_empty() {
2184 let _ = ObjectDependency::delete_many()
2186 .filter(
2187 object_dependency::Column::Oid
2188 .is_in(to_remove_secret_dep)
2189 .and(object_dependency::Column::UsedBy.eq(preferred_id)),
2190 )
2191 .exec(&txn)
2192 .await?;
2193 }
2194 }
2195
2196 let active_source_model = source::ActiveModel {
2197 source_id: Set(source_id),
2198 definition: Set(rewrite_sql.clone()),
2199 with_properties: Set(options_with_secret.as_plaintext().clone().into()),
2200 secret_ref: Set((!options_with_secret.as_secret().is_empty())
2201 .then(|| SecretRef::from(options_with_secret.as_secret().clone()))),
2202 ..Default::default()
2203 };
2204 Source::update(active_source_model).exec(&txn).await?;
2205
2206 if let Some(associate_table_id) = associate_table_id {
2207 let active_table_model = table::ActiveModel {
2209 table_id: Set(associate_table_id),
2210 definition: Set(rewrite_sql),
2211 ..Default::default()
2212 };
2213 Table::update(active_table_model).exec(&txn).await?;
2214 }
2215
2216 let to_check_job_ids = vec![if let Some(associate_table_id) = associate_table_id {
2217 associate_table_id.as_job_id()
2219 } else {
2220 source_id.as_share_source_job_id()
2221 }]
2222 .into_iter()
2223 .chain(dep_source_job_ids.into_iter())
2224 .collect_vec();
2225
2226 update_connector_props_fragments(
2228 &txn,
2229 to_check_job_ids,
2230 FragmentTypeFlag::Source,
2231 |node, found| {
2232 if let PbNodeBody::Source(node) = node
2233 && let Some(source_inner) = &mut node.source_inner
2234 {
2235 source_inner.with_properties = options_with_secret.as_plaintext().clone();
2236 source_inner.secret_refs = options_with_secret.as_secret().clone();
2237 *found = true;
2238 }
2239 },
2240 is_shared_source,
2241 )
2242 .await?;
2243
2244 let mut to_update_objs = Vec::with_capacity(2);
2245 let (source, obj) = Source::find_by_id(source_id)
2246 .find_also_related(Object)
2247 .one(&txn)
2248 .await?
2249 .ok_or_else(|| {
2250 MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
2251 })?;
2252 to_update_objs.push(PbObject {
2253 object_info: Some(PbObjectInfo::Source(
2254 ObjectModel(source, obj.unwrap(), None).into(),
2255 )),
2256 });
2257
2258 if let Some(associate_table_id) = associate_table_id {
2259 let (table, obj) = Table::find_by_id(associate_table_id)
2260 .find_also_related(Object)
2261 .one(&txn)
2262 .await?
2263 .ok_or_else(|| MetaError::catalog_id_not_found("table", associate_table_id))?;
2264 let streaming_job = streaming_job::Entity::find_by_id(table.job_id())
2265 .one(&txn)
2266 .await?;
2267 to_update_objs.push(PbObject {
2268 object_info: Some(PbObjectInfo::Table(
2269 ObjectModel(table, obj.unwrap(), streaming_job).into(),
2270 )),
2271 });
2272 }
2273
2274 txn.commit().await?;
2275
2276 self.notify_frontend(
2277 NotificationOperation::Update,
2278 NotificationInfo::ObjectGroup(PbObjectGroup {
2279 objects: to_update_objs,
2280 }),
2281 )
2282 .await;
2283
2284 Ok(options_with_secret)
2285 }
2286
2287 pub async fn update_sink_props_by_sink_id(
2288 &self,
2289 sink_id: SinkId,
2290 props: BTreeMap<String, String>,
2291 ) -> MetaResult<HashMap<String, String>> {
2292 let inner = self.inner.read().await;
2293 let txn = inner.db.begin().await?;
2294
2295 let (sink, _obj) = Sink::find_by_id(sink_id)
2296 .find_also_related(Object)
2297 .one(&txn)
2298 .await?
2299 .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
2300 validate_sink_props(&sink, &props)?;
2301 let definition = sink.definition.clone();
2302 let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
2303 .map_err(|e| SinkError::Config(anyhow!(e)))?
2304 .try_into()
2305 .unwrap();
2306 if let Statement::CreateSink { stmt } = &mut stmt {
2307 update_stmt_with_props(&mut stmt.with_properties.0, &props)?;
2308 } else {
2309 panic!("definition is not a create sink statement")
2310 }
2311 let mut new_config = sink.properties.clone().into_inner();
2312 new_config.extend(props.clone());
2313
2314 let definition = stmt.to_string();
2315 let active_sink = sink::ActiveModel {
2316 sink_id: Set(sink_id),
2317 properties: Set(risingwave_meta_model::Property(new_config.clone())),
2318 definition: Set(definition),
2319 ..Default::default()
2320 };
2321 Sink::update(active_sink).exec(&txn).await?;
2322
2323 update_sink_fragment_props(&txn, sink_id, new_config).await?;
2324 let (sink, obj) = Sink::find_by_id(sink_id)
2325 .find_also_related(Object)
2326 .one(&txn)
2327 .await?
2328 .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
2329 let streaming_job = streaming_job::Entity::find_by_id(sink.sink_id.as_job_id())
2330 .one(&txn)
2331 .await?;
2332 txn.commit().await?;
2333 let relation_infos = vec![PbObject {
2334 object_info: Some(PbObjectInfo::Sink(
2335 ObjectModel(sink, obj.unwrap(), streaming_job).into(),
2336 )),
2337 }];
2338
2339 let _version = self
2340 .notify_frontend(
2341 NotificationOperation::Update,
2342 NotificationInfo::ObjectGroup(PbObjectGroup {
2343 objects: relation_infos,
2344 }),
2345 )
2346 .await;
2347
2348 Ok(props.into_iter().collect())
2349 }
2350
2351 pub async fn update_iceberg_table_props_by_table_id(
2352 &self,
2353 table_id: TableId,
2354 props: BTreeMap<String, String>,
2355 alter_iceberg_table_props: Option<
2356 risingwave_pb::meta::alter_connector_props_request::PbExtraOptions,
2357 >,
2358 ) -> MetaResult<(HashMap<String, String>, SinkId)> {
2359 let risingwave_pb::meta::alter_connector_props_request::PbExtraOptions::AlterIcebergTableIds(AlterIcebergTableIds { sink_id, source_id }) = alter_iceberg_table_props.
2360 ok_or_else(|| MetaError::invalid_parameter("alter_iceberg_table_props is required"))?;
2361 let inner = self.inner.read().await;
2362 let txn = inner.db.begin().await?;
2363
2364 let (sink, _obj) = Sink::find_by_id(sink_id)
2365 .find_also_related(Object)
2366 .one(&txn)
2367 .await?
2368 .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
2369 validate_sink_props(&sink, &props)?;
2370
2371 let definition = sink.definition.clone();
2372 let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
2373 .map_err(|e| SinkError::Config(anyhow!(e)))?
2374 .try_into()
2375 .unwrap();
2376 if let Statement::CreateTable {
2377 with_options,
2378 engine,
2379 ..
2380 } = &mut stmt
2381 {
2382 if !matches!(engine, Engine::Iceberg) {
2383 return Err(SinkError::Config(anyhow!(
2384 "only iceberg table can be altered as sink"
2385 ))
2386 .into());
2387 }
2388 update_stmt_with_props(with_options, &props)?;
2389 } else {
2390 panic!("definition is not a create iceberg table statement")
2391 }
2392 let mut new_config = sink.properties.clone().into_inner();
2393 new_config.extend(props.clone());
2394
2395 let definition = stmt.to_string();
2396 let active_sink = sink::ActiveModel {
2397 sink_id: Set(sink_id),
2398 properties: Set(risingwave_meta_model::Property(new_config.clone())),
2399 definition: Set(definition.clone()),
2400 ..Default::default()
2401 };
2402 let active_source = source::ActiveModel {
2403 source_id: Set(source_id),
2404 definition: Set(definition.clone()),
2405 ..Default::default()
2406 };
2407 let active_table = table::ActiveModel {
2408 table_id: Set(table_id),
2409 definition: Set(definition),
2410 ..Default::default()
2411 };
2412 Sink::update(active_sink).exec(&txn).await?;
2413 Source::update(active_source).exec(&txn).await?;
2414 Table::update(active_table).exec(&txn).await?;
2415
2416 update_sink_fragment_props(&txn, sink_id, new_config).await?;
2417
2418 let (sink, sink_obj) = Sink::find_by_id(sink_id)
2419 .find_also_related(Object)
2420 .one(&txn)
2421 .await?
2422 .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
2423 let sink_streaming_job = streaming_job::Entity::find_by_id(sink.sink_id.as_job_id())
2424 .one(&txn)
2425 .await?;
2426 let (source, source_obj) = Source::find_by_id(source_id)
2427 .find_also_related(Object)
2428 .one(&txn)
2429 .await?
2430 .ok_or_else(|| {
2431 MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
2432 })?;
2433 let (table, table_obj) = Table::find_by_id(table_id)
2434 .find_also_related(Object)
2435 .one(&txn)
2436 .await?
2437 .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Table.as_str(), table_id))?;
2438 let table_streaming_job = streaming_job::Entity::find_by_id(table.job_id())
2439 .one(&txn)
2440 .await?;
2441 txn.commit().await?;
2442 let relation_infos = vec![
2443 PbObject {
2444 object_info: Some(PbObjectInfo::Sink(
2445 ObjectModel(sink, sink_obj.unwrap(), sink_streaming_job).into(),
2446 )),
2447 },
2448 PbObject {
2449 object_info: Some(PbObjectInfo::Source(
2450 ObjectModel(source, source_obj.unwrap(), None).into(),
2451 )),
2452 },
2453 PbObject {
2454 object_info: Some(PbObjectInfo::Table(
2455 ObjectModel(table, table_obj.unwrap(), table_streaming_job).into(),
2456 )),
2457 },
2458 ];
2459 let _version = self
2460 .notify_frontend(
2461 NotificationOperation::Update,
2462 NotificationInfo::ObjectGroup(PbObjectGroup {
2463 objects: relation_infos,
2464 }),
2465 )
2466 .await;
2467
2468 Ok((props.into_iter().collect(), sink_id))
2469 }
2470
/// Alters the properties of connection `connection_id` and propagates the change to every
/// source and sink row whose `connection_id` column references it, in one transaction.
///
/// `alter_props` holds plaintext property changes and `alter_secret_refs` holds secret
/// reference changes; the keys of both are checked against the connection's on-the-fly
/// alterable fields.
///
/// Returns a tuple of:
/// - the connection's resolved options after the update,
/// - `(source id, properties)` for each dependent source, with secrets filled in through
///   `LocalSecretManager`,
/// - `(sink id, properties)` for each dependent sink (plaintext properties only).
///
/// After the commit, the connection and all dependent sources and sinks are sent to the
/// frontend in a single `ObjectGroup` update.
pub async fn update_connection_and_dependent_objects_props(
    &self,
    connection_id: ConnectionId,
    alter_props: BTreeMap<String, String>,
    alter_secret_refs: BTreeMap<String, PbSecretRef>,
) -> MetaResult<(
    WithOptionsSecResolved, Vec<(SourceId, HashMap<String, String>)>, Vec<(SinkId, HashMap<String, String>)>, )> {
    let inner = self.inner.read().await;
    let txn = inner.db.begin().await?;

    // Ids of all sources that reference this connection.
    let dependent_sources: Vec<SourceId> = Source::find()
        .select_only()
        .column(source::Column::SourceId)
        .filter(source::Column::ConnectionId.eq(connection_id))
        .into_tuple()
        .all(&txn)
        .await?;

    // Ids of all sinks that reference this connection.
    let dependent_sinks: Vec<SinkId> = Sink::find()
        .select_only()
        .column(sink::Column::SinkId)
        .filter(sink::Column::ConnectionId.eq(connection_id))
        .into_tuple()
        .all(&txn)
        .await?;

    let (connection_catalog, _obj) = Connection::find_by_id(connection_id)
        .find_also_related(Object)
        .one(&txn)
        .await?
        .ok_or_else(|| {
            MetaError::catalog_id_not_found(ObjectType::Connection.as_str(), connection_id)
        })?;

    // Every key being altered, whether plaintext or secret-backed.
    let prop_keys: Vec<String> = alter_props
        .keys()
        .chain(alter_secret_refs.keys())
        .cloned()
        .collect();

    let connection_type_str = pb_connection_type_to_connection_type(
        &connection_catalog.params.to_protobuf().connection_type(),
    )
    .ok_or_else(|| MetaError::invalid_parameter("Unspecified connection type"))?;

    // Reject keys that this connection type does not allow to be altered on the fly.
    risingwave_connector::allow_alter_on_fly_fields::check_connection_allow_alter_on_fly_fields(
        connection_type_str, &prop_keys,
    )?;

    let connection_pb = connection_catalog.params.to_protobuf();
    let mut connection_options_with_secret = WithOptionsSecResolved::new(
        connection_pb.properties.into_iter().collect(),
        connection_pb.secret_refs.into_iter().collect(),
    );

    // Apply the change in memory; the two returned collections are the secret ids whose
    // dependency rows are inserted / deleted further below.
    let (to_add_secret_dep, to_remove_secret_dep) = connection_options_with_secret
        .handle_update(alter_props.clone(), alter_secret_refs.clone())?;

    tracing::debug!(
        "applying new properties to connection and dependents: connection_id={}, sources={:?}, sinks={:?}",
        connection_id,
        dependent_sources,
        dependent_sinks
    );

    // Validate the updated connection before any write is issued on the transaction.
    {
        let conn_params_pb = risingwave_pb::catalog::ConnectionParams {
            connection_type: connection_pb.connection_type,
            properties: connection_options_with_secret
                .as_plaintext()
                .clone()
                .into_iter()
                .collect(),
            secret_refs: connection_options_with_secret
                .as_secret()
                .clone()
                .into_iter()
                .collect(),
        };
        let connection = PbConnection {
            id: connection_id as _,
            info: Some(risingwave_pb::catalog::connection::Info::ConnectionParams(
                conn_params_pb,
            )),
            ..Default::default()
        };
        validate_connection(&connection).await?;
    }

    // Keep the connection's secret dependency rows in sync with the new options.
    if !to_add_secret_dep.is_empty() {
        ObjectDependency::insert_many(to_add_secret_dep.into_iter().map(|secret_id| {
            object_dependency::ActiveModel {
                oid: Set(secret_id.into()),
                used_by: Set(connection_id.as_object_id()),
                ..Default::default()
            }
        }))
        .exec(&txn)
        .await?;
    }
    if !to_remove_secret_dep.is_empty() {
        let _ = ObjectDependency::delete_many()
            .filter(
                object_dependency::Column::Oid
                    .is_in(to_remove_secret_dep)
                    .and(object_dependency::Column::UsedBy.eq(connection_id.as_object_id())),
            )
            .exec(&txn)
            .await?;
    }

    // Persist the updated connection parameters (same content as the value validated above).
    let updated_connection_params = risingwave_pb::catalog::ConnectionParams {
        connection_type: connection_pb.connection_type,
        properties: connection_options_with_secret
            .as_plaintext()
            .clone()
            .into_iter()
            .collect(),
        secret_refs: connection_options_with_secret
            .as_secret()
            .clone()
            .into_iter()
            .collect(),
    };
    let active_connection_model = connection::ActiveModel {
        connection_id: Set(connection_id),
        params: Set(ConnectionParams::from(&updated_connection_params)),
        ..Default::default()
    };
    Connection::update(active_connection_model)
        .exec(&txn)
        .await?;

    // ---- Dependent sources ----
    let mut updated_sources_with_props: Vec<(SourceId, HashMap<String, String>)> = Vec::new();

    if !dependent_sources.is_empty() {
        let sources_with_objs = Source::find()
            .find_also_related(Object)
            .filter(source::Column::SourceId.is_in(dependent_sources.iter().cloned()))
            .all(&txn)
            .await?;

        // Row updates and fragment updates are queued first and executed only after every
        // source has been processed without error.
        let mut source_updates = Vec::new();
        let mut fragment_updates: Vec<DependentSourceFragmentUpdate> = Vec::new();

        for (source, _obj) in sources_with_objs {
            let source_id = source.source_id;

            let mut source_options_with_secret = WithOptionsSecResolved::new(
                source.with_properties.0.clone(),
                source
                    .secret_ref
                    .clone()
                    .map(|secret_ref| secret_ref.to_protobuf())
                    .unwrap_or_default(),
            );
            // The secret dependency changes computed for the source are discarded here.
            // NOTE(review): presumably the dependency is tracked on the connection instead —
            // confirm.
            let (_source_to_add_secret_dep, _source_to_remove_secret_dep) =
                source_options_with_secret
                    .handle_update(alter_props.clone(), alter_secret_refs.clone())?;

            // The extracted value is dropped; the call is only used for its error.
            // NOTE(review): presumably validates the merged source properties — confirm.
            let _ = ConnectorProperties::extract(source_options_with_secret.clone(), true)?;

            let active_source = source::ActiveModel {
                source_id: Set(source_id),
                with_properties: Set(Property(
                    source_options_with_secret.as_plaintext().clone(),
                )),
                secret_ref: Set((!source_options_with_secret.as_secret().is_empty()).then(
                    || {
                        risingwave_meta_model::SecretRef::from(
                            source_options_with_secret.as_secret().clone(),
                        )
                    },
                )),
                ..Default::default()
            };
            source_updates.push(active_source);

            // For a non-shared source, also collect the objects recorded as depending on it,
            // so that their fragments are patched as well.
            let is_shared_source = source.is_shared();
            let mut dep_source_job_ids: Vec<JobId> = Vec::new();
            if !is_shared_source {
                dep_source_job_ids = ObjectDependency::find()
                    .select_only()
                    .column(object_dependency::Column::UsedBy)
                    .filter(object_dependency::Column::Oid.eq(source_id))
                    .into_tuple()
                    .all(&txn)
                    .await?;
            }

            // The associated table's job when the source has one, otherwise the source's own
            // job id.
            let base_job_id =
                if let Some(associate_table_id) = source.optional_associated_table_id {
                    associate_table_id.as_job_id()
                } else {
                    source_id.as_share_source_job_id()
                };
            let job_ids = vec![base_job_id]
                .into_iter()
                .chain(dep_source_job_ids.into_iter())
                .collect_vec();

            fragment_updates.push(DependentSourceFragmentUpdate {
                job_ids,
                with_properties: source_options_with_secret.as_plaintext().clone(),
                secret_refs: source_options_with_secret.as_secret().clone(),
                is_shared_source,
            });

            // Properties reported back to the caller have their secrets filled in.
            let complete_source_props = LocalSecretManager::global()
                .fill_secrets(
                    source_options_with_secret.as_plaintext().clone(),
                    source_options_with_secret.as_secret().clone(),
                )
                .map_err(MetaError::from)?
                .into_iter()
                .collect::<HashMap<String, String>>();
            updated_sources_with_props.push((source_id, complete_source_props));
        }

        for source_update in source_updates {
            Source::update(source_update).exec(&txn).await?;
        }

        // Overwrite properties and secret refs on the Source nodes of the affected fragments.
        for DependentSourceFragmentUpdate {
            job_ids,
            with_properties,
            secret_refs,
            is_shared_source,
        } in fragment_updates
        {
            update_connector_props_fragments(
                &txn,
                job_ids,
                FragmentTypeFlag::Source,
                |node, found| {
                    if let PbNodeBody::Source(node) = node
                        && let Some(source_inner) = &mut node.source_inner
                    {
                        source_inner.with_properties = with_properties.clone();
                        source_inner.secret_refs = secret_refs.clone();
                        *found = true;
                    }
                },
                is_shared_source,
            )
            .await?;
        }
    }

    // ---- Dependent sinks ----
    let mut updated_sinks_with_props: Vec<(SinkId, HashMap<String, String>)> = Vec::new();

    if !dependent_sinks.is_empty() {
        let sinks_with_objs = Sink::find()
            .find_also_related(Object)
            .filter(sink::Column::SinkId.is_in(dependent_sinks.iter().cloned()))
            .all(&txn)
            .await?;

        // As with sources: queue everything, then execute once all sinks validated.
        let mut sink_updates = Vec::new();
        let mut sink_fragment_updates = Vec::new();

        for (sink, _obj) in sinks_with_objs {
            let sink_id = sink.sink_id;

            // Check the altered keys and the merged config against the sink's connector.
            match sink.properties.inner_ref().get(CONNECTOR_TYPE_KEY) {
                Some(connector) => {
                    let connector_type = connector.to_lowercase();
                    check_sink_allow_alter_on_fly_fields(&connector_type, &prop_keys)
                        .map_err(|e| SinkError::Config(anyhow!(e)))?;

                    match_sink_name_str!(
                        connector_type.as_str(),
                        SinkType,
                        {
                            let mut new_sink_props = sink.properties.0.clone();
                            new_sink_props.extend(alter_props.clone());
                            SinkType::validate_alter_config(&new_sink_props)
                        },
                        |sink: &str| Err(SinkError::Config(anyhow!(
                            "unsupported sink type {}",
                            sink
                        )))
                    )?
                }
                None => {
                    return Err(SinkError::Config(anyhow!(
                        "connector not specified when alter sink"
                    ))
                    .into());
                }
            };

            // Only `alter_props` is merged into the sink properties; `alter_secret_refs` is
            // not applied to sinks in this function.
            // NOTE(review): confirm that skipping secret refs for sinks is intended.
            let mut new_sink_props = sink.properties.0.clone();
            new_sink_props.extend(alter_props.clone());

            let active_sink = sink::ActiveModel {
                sink_id: Set(sink_id),
                properties: Set(risingwave_meta_model::Property(new_sink_props.clone())),
                ..Default::default()
            };
            sink_updates.push(active_sink);

            sink_fragment_updates.push((sink_id, new_sink_props.clone()));

            let complete_sink_props: HashMap<String, String> =
                new_sink_props.into_iter().collect();
            updated_sinks_with_props.push((sink_id, complete_sink_props));
        }

        for sink_update in sink_updates {
            Sink::update(sink_update).exec(&txn).await?;
        }

        for (sink_id, new_sink_props) in sink_fragment_updates {
            update_connector_props_fragments(
                &txn,
                vec![sink_id.as_job_id()],
                FragmentTypeFlag::Sink,
                |node, found| {
                    if let PbNodeBody::Sink(node) = node
                        && let Some(sink_desc) = &mut node.sink_desc
                        && sink_desc.id == sink_id.as_raw_id()
                    {
                        sink_desc.properties = new_sink_props.clone();
                        *found = true;
                    }
                },
                // Passing `true` makes the helper assert that a matching fragment exists.
                true,
            )
            .await?;
        }
    }

    // ---- Collect refreshed catalogs for the frontend notification ----
    let mut updated_objects = Vec::new();

    let (connection, obj) = Connection::find_by_id(connection_id)
        .find_also_related(Object)
        .one(&txn)
        .await?
        .ok_or_else(|| {
            MetaError::catalog_id_not_found(ObjectType::Connection.as_str(), connection_id)
        })?;
    updated_objects.push(PbObject {
        object_info: Some(PbObjectInfo::Connection(
            ObjectModel(connection, obj.unwrap(), None).into(),
        )),
    });

    for source_id in &dependent_sources {
        let (source, obj) = Source::find_by_id(*source_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| {
                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), *source_id)
            })?;
        updated_objects.push(PbObject {
            object_info: Some(PbObjectInfo::Source(
                ObjectModel(source, obj.unwrap(), None).into(),
            )),
        });
    }

    for sink_id in &dependent_sinks {
        let (sink, obj) = Sink::find_by_id(*sink_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| {
                MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), *sink_id)
            })?;
        let streaming_job = streaming_job::Entity::find_by_id(sink.sink_id.as_job_id())
            .one(&txn)
            .await?;
        updated_objects.push(PbObject {
            object_info: Some(PbObjectInfo::Sink(
                ObjectModel(sink, obj.unwrap(), streaming_job).into(),
            )),
        });
    }

    txn.commit().await?;

    // `updated_objects` always holds at least the connection pushed above.
    if !updated_objects.is_empty() {
        self.notify_frontend(
            NotificationOperation::Update,
            NotificationInfo::ObjectGroup(PbObjectGroup {
                objects: updated_objects,
            }),
        )
        .await;
    }

    Ok((
        connection_options_with_secret,
        updated_sources_with_props,
        updated_sinks_with_props,
    ))
}
2906
2907 pub async fn update_fragment_rate_limit_by_fragment_id(
2908 &self,
2909 fragment_id: FragmentId,
2910 rate_limit: Option<u32>,
2911 ) -> MetaResult<()> {
2912 let update_rate_limit = |fragment_type_mask: FragmentTypeMask,
2913 stream_node: &mut PbStreamNode| {
2914 let mut found = false;
2915 if fragment_type_mask.contains_any(
2916 FragmentTypeFlag::dml_rate_limit_fragments()
2917 .chain(FragmentTypeFlag::sink_rate_limit_fragments())
2918 .chain(FragmentTypeFlag::backfill_rate_limit_fragments())
2919 .chain(FragmentTypeFlag::source_rate_limit_fragments()),
2920 ) {
2921 visit_stream_node_mut(stream_node, |node| {
2922 if let PbNodeBody::Dml(node) = node {
2923 node.rate_limit = rate_limit;
2924 found = true;
2925 }
2926 if let PbNodeBody::Sink(node) = node {
2927 node.rate_limit = rate_limit;
2928 found = true;
2929 }
2930 if let PbNodeBody::StreamCdcScan(node) = node {
2931 node.rate_limit = rate_limit;
2932 found = true;
2933 }
2934 if let PbNodeBody::StreamScan(node) = node {
2935 node.rate_limit = rate_limit;
2936 found = true;
2937 }
2938 if let PbNodeBody::SourceBackfill(node) = node {
2939 node.rate_limit = rate_limit;
2940 found = true;
2941 }
2942 });
2943 }
2944 found
2945 };
2946 self.mutate_fragment_by_fragment_id(fragment_id, update_rate_limit, "fragment not found")
2947 .await
2948 }
2949
2950 pub async fn list_rate_limits(&self) -> MetaResult<Vec<RateLimitInfo>> {
2953 let inner = self.inner.read().await;
2954 let txn = inner.db.begin().await?;
2955
2956 let fragments: Vec<(FragmentId, JobId, i32, StreamNode)> = Fragment::find()
2957 .select_only()
2958 .columns([
2959 fragment::Column::FragmentId,
2960 fragment::Column::JobId,
2961 fragment::Column::FragmentTypeMask,
2962 fragment::Column::StreamNode,
2963 ])
2964 .filter(FragmentTypeMask::intersects_any(
2965 FragmentTypeFlag::rate_limit_fragments(),
2966 ))
2967 .into_tuple()
2968 .all(&txn)
2969 .await?;
2970
2971 let mut rate_limits = Vec::new();
2972 for (fragment_id, job_id, fragment_type_mask, stream_node) in fragments {
2973 let stream_node = stream_node.to_protobuf();
2974 visit_stream_node_body(&stream_node, |node| {
2975 let mut rate_limit = None;
2976 let mut node_name = None;
2977
2978 match node {
2979 PbNodeBody::Source(node) => {
2981 if let Some(node_inner) = &node.source_inner {
2982 rate_limit = node_inner.rate_limit;
2983 node_name = Some("SOURCE");
2984 }
2985 }
2986 PbNodeBody::StreamFsFetch(node) => {
2987 if let Some(node_inner) = &node.node_inner {
2988 rate_limit = node_inner.rate_limit;
2989 node_name = Some("FS_FETCH");
2990 }
2991 }
2992 PbNodeBody::SourceBackfill(node) => {
2994 rate_limit = node.rate_limit;
2995 node_name = Some("SOURCE_BACKFILL");
2996 }
2997 PbNodeBody::StreamScan(node) => {
2998 rate_limit = node.rate_limit;
2999 node_name = Some("STREAM_SCAN");
3000 }
3001 PbNodeBody::StreamCdcScan(node) => {
3002 rate_limit = node.rate_limit;
3003 node_name = Some("STREAM_CDC_SCAN");
3004 }
3005 PbNodeBody::Sink(node) => {
3006 rate_limit = node.rate_limit;
3007 node_name = Some("SINK");
3008 }
3009 _ => {}
3010 }
3011
3012 if let Some(rate_limit) = rate_limit {
3013 rate_limits.push(RateLimitInfo {
3014 fragment_id,
3015 job_id,
3016 fragment_type_mask: fragment_type_mask as u32,
3017 rate_limit,
3018 node_name: node_name.unwrap().to_owned(),
3019 });
3020 }
3021 });
3022 }
3023
3024 Ok(rate_limits)
3025 }
3026}
3027
3028fn validate_sink_props(sink: &sink::Model, props: &BTreeMap<String, String>) -> MetaResult<()> {
3029 match sink.properties.inner_ref().get(CONNECTOR_TYPE_KEY) {
3031 Some(connector) => {
3032 let connector_type = connector.to_lowercase();
3033 let field_names: Vec<String> = props.keys().cloned().collect();
3034 check_sink_allow_alter_on_fly_fields(&connector_type, &field_names)
3035 .map_err(|e| SinkError::Config(anyhow!(e)))?;
3036
3037 match_sink_name_str!(
3038 connector_type.as_str(),
3039 SinkType,
3040 {
3041 let mut new_props = sink.properties.0.clone();
3042 new_props.extend(props.clone());
3043 SinkType::validate_alter_config(&new_props)
3044 },
3045 |sink: &str| Err(SinkError::Config(anyhow!("unsupported sink type {}", sink)))
3046 )?
3047 }
3048 None => {
3049 return Err(
3050 SinkError::Config(anyhow!("connector not specified when alter sink")).into(),
3051 );
3052 }
3053 };
3054 Ok(())
3055}
3056
3057fn update_stmt_with_props(
3058 with_properties: &mut Vec<SqlOption>,
3059 props: &BTreeMap<String, String>,
3060) -> MetaResult<()> {
3061 let mut new_sql_options = with_properties
3062 .iter()
3063 .map(|sql_option| (&sql_option.name, sql_option))
3064 .collect::<IndexMap<_, _>>();
3065 let add_sql_options = props
3066 .iter()
3067 .map(|(k, v)| SqlOption::try_from((k, v)))
3068 .collect::<Result<Vec<SqlOption>, ParserError>>()
3069 .map_err(|e| SinkError::Config(anyhow!(e)))?;
3070 new_sql_options.extend(
3071 add_sql_options
3072 .iter()
3073 .map(|sql_option| (&sql_option.name, sql_option)),
3074 );
3075 *with_properties = new_sql_options.into_values().cloned().collect();
3076 Ok(())
3077}
3078
3079async fn update_sink_fragment_props(
3080 txn: &DatabaseTransaction,
3081 sink_id: SinkId,
3082 props: BTreeMap<String, String>,
3083) -> MetaResult<()> {
3084 let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
3085 .select_only()
3086 .columns([
3087 fragment::Column::FragmentId,
3088 fragment::Column::FragmentTypeMask,
3089 fragment::Column::StreamNode,
3090 ])
3091 .filter(fragment::Column::JobId.eq(sink_id))
3092 .into_tuple()
3093 .all(txn)
3094 .await?;
3095 let fragments = fragments
3096 .into_iter()
3097 .filter(|(_, fragment_type_mask, _)| {
3098 *fragment_type_mask & FragmentTypeFlag::Sink as i32 != 0
3099 })
3100 .filter_map(|(id, _, stream_node)| {
3101 let mut stream_node = stream_node.to_protobuf();
3102 let mut found = false;
3103 visit_stream_node_mut(&mut stream_node, |node| {
3104 if let PbNodeBody::Sink(node) = node
3105 && let Some(sink_desc) = &mut node.sink_desc
3106 && sink_desc.id == sink_id
3107 {
3108 sink_desc.properties.extend(props.clone());
3109 found = true;
3110 }
3111 });
3112 if found { Some((id, stream_node)) } else { None }
3113 })
3114 .collect_vec();
3115 assert!(
3116 !fragments.is_empty(),
3117 "sink id should be used by at least one fragment"
3118 );
3119 for (id, stream_node) in fragments {
3120 Fragment::update(fragment::ActiveModel {
3121 fragment_id: Set(id),
3122 stream_node: Set(StreamNode::from(&stream_node)),
3123 ..Default::default()
3124 })
3125 .exec(txn)
3126 .await?;
3127 }
3128 Ok(())
3129}
3130
/// Context holding a list of sink ids.
///
/// NOTE(review): the name suggests it is passed along when handling sinks that write into
/// a table — confirm against the call sites, which are not visible here.
pub struct SinkIntoTableContext {
    /// Ids of sinks; presumably those whose catalogs were (or must be) updated — TODO confirm.
    pub updated_sink_catalogs: Vec<SinkId>,
}
3136
/// Data bundle for finishing an auto-refresh-schema sink.
///
/// NOTE(review): field semantics below are inferred from the field names only — confirm
/// against the code that constructs and consumes this struct.
pub struct FinishAutoRefreshSchemaSinkContext {
    /// Id of a temporary sink; presumably the replacement created for the schema change.
    pub tmp_sink_id: SinkId,
    /// Id of the original sink.
    pub original_sink_id: SinkId,
    /// Column catalogs; presumably the sink's columns after the refresh — TODO confirm.
    pub columns: Vec<PbColumnCatalog>,
    /// Optional `(table id, columns)` pair; presumably describes a new log store table.
    pub new_log_store_table: Option<(TableId, Vec<PbColumnCatalog>)>,
}
3143
3144async fn update_connector_props_fragments<F>(
3145 txn: &DatabaseTransaction,
3146 job_ids: Vec<JobId>,
3147 expect_flag: FragmentTypeFlag,
3148 mut alter_stream_node_fn: F,
3149 is_shared_source: bool,
3150) -> MetaResult<()>
3151where
3152 F: FnMut(&mut PbNodeBody, &mut bool),
3153{
3154 let fragments: Vec<(FragmentId, StreamNode)> = Fragment::find()
3155 .select_only()
3156 .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
3157 .filter(
3158 fragment::Column::JobId
3159 .is_in(job_ids.clone())
3160 .and(FragmentTypeMask::intersects(expect_flag)),
3161 )
3162 .into_tuple()
3163 .all(txn)
3164 .await?;
3165 let fragments = fragments
3166 .into_iter()
3167 .filter_map(|(id, stream_node)| {
3168 let mut stream_node = stream_node.to_protobuf();
3169 let mut found = false;
3170 visit_stream_node_mut(&mut stream_node, |node| {
3171 alter_stream_node_fn(node, &mut found);
3172 });
3173 if found { Some((id, stream_node)) } else { None }
3174 })
3175 .collect_vec();
3176 if is_shared_source || job_ids.len() > 1 {
3177 assert!(
3181 !fragments.is_empty(),
3182 "job ids {:?} (type: {:?}) should be used by at least one fragment",
3183 job_ids,
3184 expect_flag
3185 );
3186 }
3187
3188 for (id, stream_node) in fragments {
3189 Fragment::update(fragment::ActiveModel {
3190 fragment_id: Set(id),
3191 stream_node: Set(StreamNode::from(&stream_node)),
3192 ..Default::default()
3193 })
3194 .exec(txn)
3195 .await?;
3196 }
3197
3198 Ok(())
3199}