1use std::collections::{BTreeMap, HashMap, HashSet};
16use std::num::NonZeroUsize;
17
18use anyhow::anyhow;
19use indexmap::IndexMap;
20use itertools::Itertools;
21use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask, ICEBERG_SINK_PREFIX};
22use risingwave_common::config::DefaultParallelism;
23use risingwave_common::hash::VnodeCountCompat;
24use risingwave_common::id::JobId;
25use risingwave_common::secret::LocalSecretManager;
26use risingwave_common::util::iter_util::ZipEqDebug;
27use risingwave_common::util::stream_graph_visitor::{
28 visit_stream_node_body, visit_stream_node_mut,
29};
30use risingwave_common::{bail, current_cluster_version};
31use risingwave_connector::allow_alter_on_fly_fields::check_sink_allow_alter_on_fly_fields;
32use risingwave_connector::connector_common::validate_connection;
33use risingwave_connector::error::ConnectorError;
34use risingwave_connector::sink::file_sink::fs::FsSink;
35use risingwave_connector::sink::{CONNECTOR_TYPE_KEY, SinkError};
36use risingwave_connector::source::{ConnectorProperties, pb_connection_type_to_connection_type};
37use risingwave_connector::{WithOptionsSecResolved, WithPropertiesExt, match_sink_name_str};
38use risingwave_meta_model::object::ObjectType;
39use risingwave_meta_model::prelude::{StreamingJob as StreamingJobModel, *};
40use risingwave_meta_model::refresh_job::RefreshState;
41use risingwave_meta_model::streaming_job::BackfillOrders;
42use risingwave_meta_model::table::TableType;
43use risingwave_meta_model::user_privilege::Action;
44use risingwave_meta_model::*;
45use risingwave_pb::catalog::{PbConnection, PbCreateType, PbTable};
46use risingwave_pb::ddl_service::streaming_job_resource_type;
47use risingwave_pb::meta::alter_connector_props_request::AlterIcebergTableIds;
48use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
49use risingwave_pb::meta::object::PbObjectInfo;
50use risingwave_pb::meta::subscribe_response::{
51 Info as NotificationInfo, Info, Operation as NotificationOperation, Operation,
52};
53use risingwave_pb::meta::{PbObject, PbObjectGroup};
54use risingwave_pb::plan_common::PbColumnCatalog;
55use risingwave_pb::plan_common::source_refresh_mode::{
56 RefreshMode, SourceRefreshModeFullReload, SourceRefreshModeStreaming,
57};
58use risingwave_pb::secret::PbSecretRef;
59use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism;
60use risingwave_pb::stream_plan::stream_node::PbNodeBody;
61use risingwave_pb::stream_plan::{PbSinkLogStoreType, PbStreamNode};
62use risingwave_pb::user::PbUserInfo;
63use risingwave_sqlparser::ast::{Engine, SqlOption, Statement};
64use risingwave_sqlparser::parser::{Parser, ParserError};
65use sea_orm::ActiveValue::Set;
66use sea_orm::sea_query::{Expr, Query, SimpleExpr};
67use sea_orm::{
68 ActiveModelTrait, ColumnTrait, DatabaseTransaction, EntityTrait, IntoActiveModel, JoinType,
69 NotSet, PaginatorTrait, QueryFilter, QuerySelect, RelationTrait, TransactionTrait,
70};
71use thiserror_ext::AsReport;
72
73use super::rename::IndexItemRewriter;
74use crate::barrier::Command;
75use crate::controller::ObjectModel;
76use crate::controller::catalog::{CatalogController, DropTableConnectorContext};
77use crate::controller::fragment::FragmentTypeMaskExt;
78use crate::controller::utils::{
79 PartialObject, build_object_group_for_delete, check_if_belongs_to_iceberg_table,
80 check_relation_name_duplicate, check_sink_into_table_cycle, ensure_job_not_canceled,
81 ensure_object_id, ensure_user_id, fetch_target_fragments, get_internal_tables_by_id,
82 get_table_columns, grant_default_privileges_automatically, insert_fragment_relations,
83 list_user_info_by_ids, try_get_iceberg_table_by_downstream_sink,
84};
85use crate::error::MetaErrorInner;
86use crate::manager::{NotificationVersion, StreamingJob, StreamingJobType};
87use crate::model::{
88 FragmentDownstreamRelation, FragmentReplaceUpstream, StreamContext, StreamJobFragments,
89 StreamJobFragmentsToCreate,
90};
91use crate::stream::SplitAssignment;
92use crate::{MetaError, MetaResult};
93
94#[derive(Debug)]
99struct DependentSourceFragmentUpdate {
100 job_ids: Vec<JobId>,
101 with_properties: BTreeMap<String, String>,
102 secret_refs: BTreeMap<String, PbSecretRef>,
103 is_shared_source: bool,
104}
105
106impl CatalogController {
107 #[allow(clippy::too_many_arguments)]
108 pub async fn create_streaming_job_obj(
109 txn: &DatabaseTransaction,
110 obj_type: ObjectType,
111 owner_id: UserId,
112 database_id: Option<DatabaseId>,
113 schema_id: Option<SchemaId>,
114 create_type: PbCreateType,
115 ctx: StreamContext,
116 streaming_parallelism: StreamingParallelism,
117 max_parallelism: usize,
118 resource_type: streaming_job_resource_type::ResourceType,
119 backfill_parallelism: Option<StreamingParallelism>,
120 ) -> MetaResult<JobId> {
121 let obj = Self::create_object(txn, obj_type, owner_id, database_id, schema_id).await?;
122 let job_id = obj.oid.as_job_id();
123 let specific_resource_group = resource_type.resource_group();
124 let is_serverless_backfill = matches!(
125 &resource_type,
126 streaming_job_resource_type::ResourceType::ServerlessBackfillResourceGroup(_)
127 );
128 let job = streaming_job::ActiveModel {
129 job_id: Set(job_id),
130 job_status: Set(JobStatus::Initial),
131 create_type: Set(create_type.into()),
132 timezone: Set(ctx.timezone),
133 config_override: Set(Some(ctx.config_override.to_string())),
134 parallelism: Set(streaming_parallelism),
135 backfill_parallelism: Set(backfill_parallelism),
136 backfill_orders: Set(None),
137 max_parallelism: Set(max_parallelism as _),
138 specific_resource_group: Set(specific_resource_group),
139 is_serverless_backfill: Set(is_serverless_backfill),
140 };
141 job.insert(txn).await?;
142
143 Ok(job_id)
144 }
145
146 #[await_tree::instrument]
152 pub async fn create_job_catalog(
153 &self,
154 streaming_job: &mut StreamingJob,
155 ctx: &StreamContext,
156 parallelism: &Option<Parallelism>,
157 max_parallelism: usize,
158 mut dependencies: HashSet<ObjectId>,
159 resource_type: streaming_job_resource_type::ResourceType,
160 backfill_parallelism: &Option<Parallelism>,
161 ) -> MetaResult<()> {
162 let inner = self.inner.write().await;
163 let txn = inner.db.begin().await?;
164 let create_type = streaming_job.create_type();
165
166 let streaming_parallelism = match (parallelism, self.env.opts.default_parallelism) {
167 (None, DefaultParallelism::Full) => StreamingParallelism::Adaptive,
168 (None, DefaultParallelism::Default(n)) => StreamingParallelism::Fixed(n.get()),
169 (Some(n), _) => StreamingParallelism::Fixed(n.parallelism as _),
170 };
171 let backfill_parallelism = backfill_parallelism
172 .as_ref()
173 .map(|p| StreamingParallelism::Fixed(p.parallelism as _));
174
175 ensure_user_id(streaming_job.owner() as _, &txn).await?;
176 ensure_object_id(ObjectType::Database, streaming_job.database_id(), &txn).await?;
177 ensure_object_id(ObjectType::Schema, streaming_job.schema_id(), &txn).await?;
178 check_relation_name_duplicate(
179 &streaming_job.name(),
180 streaming_job.database_id(),
181 streaming_job.schema_id(),
182 &txn,
183 )
184 .await?;
185
186 if !dependencies.is_empty() {
188 let altering_cnt = ObjectDependency::find()
189 .join(
190 JoinType::InnerJoin,
191 object_dependency::Relation::Object1.def(),
192 )
193 .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
194 .filter(
195 object_dependency::Column::Oid
196 .is_in(dependencies.clone())
197 .and(object::Column::ObjType.eq(ObjectType::Table))
198 .and(streaming_job::Column::JobStatus.ne(JobStatus::Created))
199 .and(
200 object::Column::Oid.not_in_subquery(
202 Query::select()
203 .column(table::Column::TableId)
204 .from(Table)
205 .to_owned(),
206 ),
207 ),
208 )
209 .count(&txn)
210 .await?;
211 if altering_cnt != 0 {
212 return Err(MetaError::permission_denied(
213 "some dependent relations are being altered",
214 ));
215 }
216 }
217
218 match streaming_job {
219 StreamingJob::MaterializedView(table) => {
220 let job_id = Self::create_streaming_job_obj(
221 &txn,
222 ObjectType::Table,
223 table.owner as _,
224 Some(table.database_id),
225 Some(table.schema_id),
226 create_type,
227 ctx.clone(),
228 streaming_parallelism,
229 max_parallelism,
230 resource_type,
231 backfill_parallelism.clone(),
232 )
233 .await?;
234 table.id = job_id.as_mv_table_id();
235 let table_model: table::ActiveModel = table.clone().into();
236 Table::insert(table_model).exec(&txn).await?;
237 }
238 StreamingJob::Sink(sink) => {
239 if let Some(target_table_id) = sink.target_table
240 && check_sink_into_table_cycle(
241 target_table_id.into(),
242 dependencies.iter().cloned().collect(),
243 &txn,
244 )
245 .await?
246 {
247 bail!("Creating such a sink will result in circular dependency.");
248 }
249
250 let job_id = Self::create_streaming_job_obj(
251 &txn,
252 ObjectType::Sink,
253 sink.owner as _,
254 Some(sink.database_id),
255 Some(sink.schema_id),
256 create_type,
257 ctx.clone(),
258 streaming_parallelism,
259 max_parallelism,
260 resource_type,
261 backfill_parallelism.clone(),
262 )
263 .await?;
264 sink.id = job_id.as_sink_id();
265 let sink_model: sink::ActiveModel = sink.clone().into();
266 Sink::insert(sink_model).exec(&txn).await?;
267 }
268 StreamingJob::Table(src, table, _) => {
269 let job_id = Self::create_streaming_job_obj(
270 &txn,
271 ObjectType::Table,
272 table.owner as _,
273 Some(table.database_id),
274 Some(table.schema_id),
275 create_type,
276 ctx.clone(),
277 streaming_parallelism,
278 max_parallelism,
279 resource_type,
280 backfill_parallelism.clone(),
281 )
282 .await?;
283 table.id = job_id.as_mv_table_id();
284 if let Some(src) = src {
285 let src_obj = Self::create_object(
286 &txn,
287 ObjectType::Source,
288 src.owner as _,
289 Some(src.database_id),
290 Some(src.schema_id),
291 )
292 .await?;
293 src.id = src_obj.oid.as_source_id();
294 src.optional_associated_table_id = Some(job_id.as_mv_table_id().into());
295 table.optional_associated_source_id = Some(src_obj.oid.as_source_id().into());
296 let source: source::ActiveModel = src.clone().into();
297 Source::insert(source).exec(&txn).await?;
298 }
299 let table_model: table::ActiveModel = table.clone().into();
300 Table::insert(table_model).exec(&txn).await?;
301
302 if table.refreshable {
303 let trigger_interval_secs = src
304 .as_ref()
305 .and_then(|source_catalog| source_catalog.refresh_mode)
306 .and_then(
307 |source_refresh_mode| match source_refresh_mode.refresh_mode {
308 Some(RefreshMode::FullReload(SourceRefreshModeFullReload {
309 refresh_interval_sec,
310 })) => refresh_interval_sec,
311 Some(RefreshMode::Streaming(SourceRefreshModeStreaming {})) => None,
312 None => None,
313 },
314 );
315
316 RefreshJob::insert(refresh_job::ActiveModel {
317 table_id: Set(table.id),
318 last_trigger_time: Set(None),
319 trigger_interval_secs: Set(trigger_interval_secs),
320 current_status: Set(RefreshState::Idle),
321 last_success_time: Set(None),
322 })
323 .exec(&txn)
324 .await?;
325 }
326 }
327 StreamingJob::Index(index, table) => {
328 ensure_object_id(ObjectType::Table, index.primary_table_id, &txn).await?;
329 let job_id = Self::create_streaming_job_obj(
330 &txn,
331 ObjectType::Index,
332 index.owner as _,
333 Some(index.database_id),
334 Some(index.schema_id),
335 create_type,
336 ctx.clone(),
337 streaming_parallelism,
338 max_parallelism,
339 resource_type,
340 backfill_parallelism.clone(),
341 )
342 .await?;
343 index.id = job_id.as_index_id();
345 index.index_table_id = job_id.as_mv_table_id();
346 table.id = job_id.as_mv_table_id();
347
348 ObjectDependency::insert(object_dependency::ActiveModel {
349 oid: Set(index.primary_table_id.into()),
350 used_by: Set(table.id.into()),
351 ..Default::default()
352 })
353 .exec(&txn)
354 .await?;
355
356 let table_model: table::ActiveModel = table.clone().into();
357 Table::insert(table_model).exec(&txn).await?;
358 let index_model: index::ActiveModel = index.clone().into();
359 Index::insert(index_model).exec(&txn).await?;
360 }
361 StreamingJob::Source(src) => {
362 let job_id = Self::create_streaming_job_obj(
363 &txn,
364 ObjectType::Source,
365 src.owner as _,
366 Some(src.database_id),
367 Some(src.schema_id),
368 create_type,
369 ctx.clone(),
370 streaming_parallelism,
371 max_parallelism,
372 resource_type,
373 backfill_parallelism.clone(),
374 )
375 .await?;
376 src.id = job_id.as_shared_source_id();
377 let source_model: source::ActiveModel = src.clone().into();
378 Source::insert(source_model).exec(&txn).await?;
379 }
380 }
381
382 dependencies.extend(
384 streaming_job
385 .dependent_secret_ids()?
386 .into_iter()
387 .map(|id| id.as_object_id()),
388 );
389 dependencies.extend(
391 streaming_job
392 .dependent_connection_ids()?
393 .into_iter()
394 .map(|id| id.as_object_id()),
395 );
396
397 if !dependencies.is_empty() {
399 ObjectDependency::insert_many(dependencies.into_iter().map(|oid| {
400 object_dependency::ActiveModel {
401 oid: Set(oid),
402 used_by: Set(streaming_job.id().as_object_id()),
403 ..Default::default()
404 }
405 }))
406 .exec(&txn)
407 .await?;
408 }
409
410 txn.commit().await?;
411
412 Ok(())
413 }
414
415 pub async fn create_internal_table_catalog(
423 &self,
424 job: &StreamingJob,
425 mut incomplete_internal_tables: Vec<PbTable>,
426 ) -> MetaResult<HashMap<TableId, TableId>> {
427 let job_id = job.id();
428 let inner = self.inner.write().await;
429 let txn = inner.db.begin().await?;
430
431 ensure_job_not_canceled(job_id, &txn).await?;
433
434 let mut table_id_map = HashMap::new();
435 for table in &mut incomplete_internal_tables {
436 let table_id = Self::create_object(
437 &txn,
438 ObjectType::Table,
439 table.owner as _,
440 Some(table.database_id),
441 Some(table.schema_id),
442 )
443 .await?
444 .oid
445 .as_table_id();
446 table_id_map.insert(table.id, table_id);
447 table.id = table_id;
448 table.job_id = Some(job_id);
449
450 let table_model = table::ActiveModel {
451 table_id: Set(table_id),
452 belongs_to_job_id: Set(Some(job_id)),
453 fragment_id: NotSet,
454 ..table.clone().into()
455 };
456 Table::insert(table_model).exec(&txn).await?;
457 }
458 txn.commit().await?;
459
460 Ok(table_id_map)
461 }
462
463 pub async fn prepare_stream_job_fragments(
464 &self,
465 stream_job_fragments: &StreamJobFragmentsToCreate,
466 streaming_job: &StreamingJob,
467 for_replace: bool,
468 backfill_orders: Option<BackfillOrders>,
469 ) -> MetaResult<()> {
470 self.prepare_streaming_job(
471 stream_job_fragments.stream_job_id(),
472 || stream_job_fragments.fragments.values(),
473 &stream_job_fragments.downstreams,
474 for_replace,
475 Some(streaming_job),
476 backfill_orders,
477 )
478 .await
479 }
480
481 #[await_tree::instrument("prepare_streaming_job_for_{}", if for_replace { "replace" } else { "create" }
486 )]
487 pub async fn prepare_streaming_job<'a, I: Iterator<Item = &'a crate::model::Fragment> + 'a>(
488 &self,
489 job_id: JobId,
490 get_fragments: impl Fn() -> I + 'a,
491 downstreams: &FragmentDownstreamRelation,
492 for_replace: bool,
493 creating_streaming_job: Option<&'a StreamingJob>,
494 backfill_orders: Option<BackfillOrders>,
495 ) -> MetaResult<()> {
496 let fragments = Self::prepare_fragment_models_from_fragments(job_id, get_fragments())?;
497
498 let inner = self.inner.write().await;
499
500 let need_notify = creating_streaming_job
501 .map(|job| job.should_notify_creating())
502 .unwrap_or(false);
503 let definition = creating_streaming_job.map(|job| job.definition());
504
505 let mut objects_to_notify = vec![];
506 let txn = inner.db.begin().await?;
507
508 ensure_job_not_canceled(job_id, &txn).await?;
510
511 if let Some(backfill_orders) = backfill_orders {
512 let job = streaming_job::ActiveModel {
513 job_id: Set(job_id),
514 backfill_orders: Set(Some(backfill_orders)),
515 ..Default::default()
516 };
517 job.update(&txn).await?;
518 }
519
520 let state_table_ids = fragments
521 .iter()
522 .flat_map(|fragment| fragment.state_table_ids.inner_ref().clone())
523 .collect_vec();
524
525 if !fragments.is_empty() {
526 let fragment_models = fragments
527 .into_iter()
528 .map(|fragment| fragment.into_active_model())
529 .collect_vec();
530 Fragment::insert_many(fragment_models).exec(&txn).await?;
531 }
532
533 if !for_replace {
536 let all_tables = StreamJobFragments::collect_tables(get_fragments());
537 for state_table_id in state_table_ids {
538 let table = all_tables
542 .get(&state_table_id)
543 .unwrap_or_else(|| panic!("table {} not found", state_table_id));
544 assert_eq!(table.id, state_table_id);
545 let vnode_count = table.vnode_count();
546
547 table::ActiveModel {
548 table_id: Set(state_table_id as _),
549 fragment_id: Set(Some(table.fragment_id)),
550 vnode_count: Set(vnode_count as _),
551 ..Default::default()
552 }
553 .update(&txn)
554 .await?;
555
556 if need_notify {
557 let mut table = table.clone();
558 if cfg!(not(debug_assertions)) && table.id == job_id.as_raw_id() {
560 table.definition = definition.clone().unwrap();
561 }
562 objects_to_notify.push(PbObject {
563 object_info: Some(PbObjectInfo::Table(table)),
564 });
565 }
566 }
567 }
568
569 if need_notify {
571 match creating_streaming_job.unwrap() {
572 StreamingJob::Table(Some(source), ..) => {
573 objects_to_notify.push(PbObject {
574 object_info: Some(PbObjectInfo::Source(source.clone())),
575 });
576 }
577 StreamingJob::Sink(sink) => {
578 objects_to_notify.push(PbObject {
579 object_info: Some(PbObjectInfo::Sink(sink.clone())),
580 });
581 }
582 StreamingJob::Index(index, _) => {
583 objects_to_notify.push(PbObject {
584 object_info: Some(PbObjectInfo::Index(index.clone())),
585 });
586 }
587 _ => {}
588 }
589 }
590
591 insert_fragment_relations(&txn, downstreams).await?;
592
593 if !for_replace {
594 if let Some(StreamingJob::Table(_, table, _)) = creating_streaming_job {
596 Table::update(table::ActiveModel {
597 table_id: Set(table.id),
598 dml_fragment_id: Set(table.dml_fragment_id),
599 ..Default::default()
600 })
601 .exec(&txn)
602 .await?;
603 }
604 }
605
606 txn.commit().await?;
607
608 if !objects_to_notify.is_empty() {
612 self.notify_frontend(
613 Operation::Add,
614 Info::ObjectGroup(PbObjectGroup {
615 objects: objects_to_notify,
616 }),
617 )
618 .await;
619 }
620
621 Ok(())
622 }
623
624 pub async fn build_cancel_command(
627 &self,
628 table_fragments: &StreamJobFragments,
629 ) -> MetaResult<Command> {
630 let inner = self.inner.read().await;
631 let txn = inner.db.begin().await?;
632
633 let dropped_sink_fragment_with_target =
634 if let Some(sink_fragment) = table_fragments.sink_fragment() {
635 let sink_fragment_id = sink_fragment.fragment_id as FragmentId;
636 let sink_target_fragment = fetch_target_fragments(&txn, [sink_fragment_id]).await?;
637 sink_target_fragment
638 .get(&sink_fragment_id)
639 .map(|target_fragments| {
640 let target_fragment_id = *target_fragments
641 .first()
642 .expect("sink should have at least one downstream fragment");
643 (sink_fragment_id, target_fragment_id)
644 })
645 } else {
646 None
647 };
648
649 Ok(Command::DropStreamingJobs {
650 streaming_job_ids: HashSet::from_iter([table_fragments.stream_job_id()]),
651 actors: table_fragments.actor_ids().collect(),
652 unregistered_state_table_ids: table_fragments.all_table_ids().collect(),
653 unregistered_fragment_ids: table_fragments.fragment_ids().collect(),
654 dropped_sink_fragment_by_targets: dropped_sink_fragment_with_target
655 .into_iter()
656 .map(|(sink, target)| (target as _, vec![sink as _]))
657 .collect(),
658 })
659 }
660
661 #[await_tree::instrument]
665 pub async fn try_abort_creating_streaming_job(
666 &self,
667 mut job_id: JobId,
668 is_cancelled: bool,
669 ) -> MetaResult<(bool, Option<DatabaseId>)> {
670 let mut inner = self.inner.write().await;
671 let txn = inner.db.begin().await?;
672
673 let obj = Object::find_by_id(job_id).one(&txn).await?;
674 let Some(obj) = obj else {
675 tracing::warn!(
676 id = %job_id,
677 "streaming job not found when aborting creating, might be cancelled already or cleaned by recovery"
678 );
679 return Ok((true, None));
680 };
681 let database_id = obj
682 .database_id
683 .ok_or_else(|| anyhow!("obj has no database id: {:?}", obj))?;
684 let streaming_job = streaming_job::Entity::find_by_id(job_id).one(&txn).await?;
685
686 if !is_cancelled && let Some(streaming_job) = &streaming_job {
687 assert_ne!(streaming_job.job_status, JobStatus::Created);
688 if streaming_job.create_type == CreateType::Background
689 && streaming_job.job_status == JobStatus::Creating
690 {
691 if (obj.obj_type == ObjectType::Table || obj.obj_type == ObjectType::Sink)
692 && check_if_belongs_to_iceberg_table(&txn, job_id).await?
693 {
694 } else {
696 tracing::warn!(
698 id = %job_id,
699 "streaming job is created in background and still in creating status"
700 );
701 return Ok((false, Some(database_id)));
702 }
703 }
704 }
705
706 let original_job_id = job_id;
708 let original_obj_type = obj.obj_type;
709
710 let iceberg_table_id =
711 try_get_iceberg_table_by_downstream_sink(&txn, job_id.as_sink_id()).await?;
712 if let Some(iceberg_table_id) = iceberg_table_id {
713 let internal_tables = get_internal_tables_by_id(job_id, &txn).await?;
716 Object::delete_many()
717 .filter(
718 object::Column::Oid
719 .eq(job_id)
720 .or(object::Column::Oid.is_in(internal_tables)),
721 )
722 .exec(&txn)
723 .await?;
724 job_id = iceberg_table_id.as_job_id();
725 };
726
727 let internal_table_ids = get_internal_tables_by_id(job_id, &txn).await?;
728
729 let mut objs = vec![];
731 let table_obj = Table::find_by_id(job_id.as_mv_table_id()).one(&txn).await?;
732
733 let mut need_notify =
734 streaming_job.is_some_and(|job| job.create_type == CreateType::Background);
735 if !need_notify {
736 if let Some(table) = &table_obj {
737 need_notify = table.table_type == TableType::MaterializedView;
738 } else if original_obj_type == ObjectType::Sink {
739 need_notify = true;
740 }
741 }
742
743 if is_cancelled {
744 let dropped_tables = Table::find()
745 .find_also_related(Object)
746 .filter(
747 table::Column::TableId.is_in(
748 internal_table_ids
749 .iter()
750 .cloned()
751 .chain(table_obj.iter().map(|t| t.table_id as _)),
752 ),
753 )
754 .all(&txn)
755 .await?
756 .into_iter()
757 .map(|(table, obj)| PbTable::from(ObjectModel(table, obj.unwrap(), None)));
758 inner
759 .dropped_tables
760 .extend(dropped_tables.map(|t| (t.id, t)));
761 }
762
763 if need_notify {
764 if original_obj_type == ObjectType::Sink && original_job_id != job_id {
767 let orig_obj: Option<PartialObject> = Object::find_by_id(original_job_id)
768 .select_only()
769 .columns([
770 object::Column::Oid,
771 object::Column::ObjType,
772 object::Column::SchemaId,
773 object::Column::DatabaseId,
774 ])
775 .into_partial_model()
776 .one(&txn)
777 .await?;
778 if let Some(orig_obj) = orig_obj {
779 objs.push(orig_obj);
780 }
781 }
782
783 let obj: Option<PartialObject> = Object::find_by_id(job_id)
784 .select_only()
785 .columns([
786 object::Column::Oid,
787 object::Column::ObjType,
788 object::Column::SchemaId,
789 object::Column::DatabaseId,
790 ])
791 .into_partial_model()
792 .one(&txn)
793 .await?;
794 let obj =
795 obj.ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;
796 objs.push(obj);
797 let internal_table_objs: Vec<PartialObject> = Object::find()
798 .select_only()
799 .columns([
800 object::Column::Oid,
801 object::Column::ObjType,
802 object::Column::SchemaId,
803 object::Column::DatabaseId,
804 ])
805 .join(JoinType::InnerJoin, object::Relation::Table.def())
806 .filter(table::Column::BelongsToJobId.eq(job_id))
807 .into_partial_model()
808 .all(&txn)
809 .await?;
810 objs.extend(internal_table_objs);
811 }
812
813 if table_obj.is_none()
815 && let Some(Some(target_table_id)) = Sink::find_by_id(job_id.as_sink_id())
816 .select_only()
817 .column(sink::Column::TargetTable)
818 .into_tuple::<Option<TableId>>()
819 .one(&txn)
820 .await?
821 {
822 let tmp_id: Option<ObjectId> = ObjectDependency::find()
823 .select_only()
824 .column(object_dependency::Column::UsedBy)
825 .join(
826 JoinType::InnerJoin,
827 object_dependency::Relation::Object1.def(),
828 )
829 .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
830 .filter(
831 object_dependency::Column::Oid
832 .eq(target_table_id)
833 .and(object::Column::ObjType.eq(ObjectType::Table))
834 .and(streaming_job::Column::JobStatus.ne(JobStatus::Created)),
835 )
836 .into_tuple()
837 .one(&txn)
838 .await?;
839 if let Some(tmp_id) = tmp_id {
840 tracing::warn!(
841 id = %tmp_id,
842 "aborting temp streaming job for sink into table"
843 );
844
845 Object::delete_by_id(tmp_id).exec(&txn).await?;
846 }
847 }
848
849 Object::delete_by_id(job_id).exec(&txn).await?;
850 if !internal_table_ids.is_empty() {
851 Object::delete_many()
852 .filter(object::Column::Oid.is_in(internal_table_ids))
853 .exec(&txn)
854 .await?;
855 }
856 if let Some(t) = &table_obj
857 && let Some(source_id) = t.optional_associated_source_id
858 {
859 Object::delete_by_id(source_id).exec(&txn).await?;
860 }
861
862 let err = if is_cancelled {
863 MetaError::cancelled(format!("streaming job {job_id} is cancelled"))
864 } else {
865 MetaError::catalog_id_not_found("stream job", format!("streaming job {job_id} failed"))
866 };
867 let abort_reason = format!("streaming job aborted {}", err.as_report());
868 for tx in inner
869 .creating_table_finish_notifier
870 .get_mut(&database_id)
871 .map(|creating_tables| creating_tables.remove(&job_id).into_iter())
872 .into_iter()
873 .flatten()
874 .flatten()
875 {
876 let _ = tx.send(Err(abort_reason.clone()));
877 }
878 txn.commit().await?;
879
880 if !objs.is_empty() {
881 self.notify_frontend(Operation::Delete, build_object_group_for_delete(objs))
884 .await;
885 }
886 Ok((true, Some(database_id)))
887 }
888
889 #[await_tree::instrument]
890 pub async fn post_collect_job_fragments(
891 &self,
892 job_id: JobId,
893 upstream_fragment_new_downstreams: &FragmentDownstreamRelation,
894 new_sink_downstream: Option<FragmentDownstreamRelation>,
895 split_assignment: Option<&SplitAssignment>,
896 ) -> MetaResult<()> {
897 let inner = self.inner.write().await;
898 let txn = inner.db.begin().await?;
899
900 insert_fragment_relations(&txn, upstream_fragment_new_downstreams).await?;
901
902 if let Some(new_downstream) = new_sink_downstream {
903 insert_fragment_relations(&txn, &new_downstream).await?;
904 }
905
906 streaming_job::ActiveModel {
908 job_id: Set(job_id),
909 job_status: Set(JobStatus::Creating),
910 ..Default::default()
911 }
912 .update(&txn)
913 .await?;
914
915 if let Some(split_assignment) = split_assignment {
916 let fragment_splits = split_assignment
917 .iter()
918 .map(|(fragment_id, splits)| {
919 (
920 *fragment_id as _,
921 splits.values().flatten().cloned().collect_vec(),
922 )
923 })
924 .collect();
925
926 self.update_fragment_splits(&txn, &fragment_splits).await?;
927 }
928
929 txn.commit().await?;
930
931 Ok(())
932 }
933
934 pub async fn create_job_catalog_for_replace(
935 &self,
936 streaming_job: &StreamingJob,
937 ctx: Option<&StreamContext>,
938 specified_parallelism: Option<&NonZeroUsize>,
939 expected_original_max_parallelism: Option<usize>,
940 ) -> MetaResult<JobId> {
941 let id = streaming_job.id();
942 let inner = self.inner.write().await;
943 let txn = inner.db.begin().await?;
944
945 streaming_job.verify_version_for_replace(&txn).await?;
947 let referring_cnt = ObjectDependency::find()
949 .join(
950 JoinType::InnerJoin,
951 object_dependency::Relation::Object1.def(),
952 )
953 .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
954 .filter(
955 object_dependency::Column::Oid
956 .eq(id)
957 .and(object::Column::ObjType.eq(ObjectType::Table))
958 .and(streaming_job::Column::JobStatus.ne(JobStatus::Created)),
959 )
960 .count(&txn)
961 .await?;
962 if referring_cnt != 0 {
963 return Err(MetaError::permission_denied(
964 "job is being altered or referenced by some creating jobs",
965 ));
966 }
967
968 let (original_max_parallelism, original_timezone, original_config_override): (
970 i32,
971 Option<String>,
972 Option<String>,
973 ) = StreamingJobModel::find_by_id(id)
974 .select_only()
975 .column(streaming_job::Column::MaxParallelism)
976 .column(streaming_job::Column::Timezone)
977 .column(streaming_job::Column::ConfigOverride)
978 .into_tuple()
979 .one(&txn)
980 .await?
981 .ok_or_else(|| MetaError::catalog_id_not_found(streaming_job.job_type_str(), id))?;
982
983 if let Some(max_parallelism) = expected_original_max_parallelism
984 && original_max_parallelism != max_parallelism as i32
985 {
986 bail!(
989 "cannot use a different max parallelism \
990 when replacing streaming job, \
991 original: {}, new: {}",
992 original_max_parallelism,
993 max_parallelism
994 );
995 }
996
997 let parallelism = match specified_parallelism {
998 None => StreamingParallelism::Adaptive,
999 Some(n) => StreamingParallelism::Fixed(n.get() as _),
1000 };
1001
1002 let ctx = StreamContext {
1003 timezone: ctx
1004 .map(|ctx| ctx.timezone.clone())
1005 .unwrap_or(original_timezone),
1006 config_override: original_config_override.unwrap_or_default().into(),
1009 };
1010
1011 let new_obj_id = Self::create_streaming_job_obj(
1013 &txn,
1014 streaming_job.object_type(),
1015 streaming_job.owner() as _,
1016 Some(streaming_job.database_id() as _),
1017 Some(streaming_job.schema_id() as _),
1018 streaming_job.create_type(),
1019 ctx,
1020 parallelism,
1021 original_max_parallelism as _,
1022 streaming_job_resource_type::ResourceType::Regular(true),
1023 None,
1024 )
1025 .await?;
1026
1027 ObjectDependency::insert(object_dependency::ActiveModel {
1029 oid: Set(id.as_object_id()),
1030 used_by: Set(new_obj_id.as_object_id()),
1031 ..Default::default()
1032 })
1033 .exec(&txn)
1034 .await?;
1035
1036 txn.commit().await?;
1037
1038 Ok(new_obj_id)
1039 }
1040
1041 pub async fn finish_streaming_job(&self, job_id: JobId) -> MetaResult<()> {
1043 let mut inner = self.inner.write().await;
1044 let txn = inner.db.begin().await?;
1045
1046 if check_if_belongs_to_iceberg_table(&txn, job_id).await? {
1048 tracing::info!(
1049 "streaming job {} is for iceberg table, wait for manual finish operation",
1050 job_id
1051 );
1052 return Ok(());
1053 }
1054
1055 let (notification_op, objects, updated_user_info) =
1056 Self::finish_streaming_job_inner(&txn, job_id).await?;
1057
1058 txn.commit().await?;
1059
1060 let mut version = self
1061 .notify_frontend(
1062 notification_op,
1063 NotificationInfo::ObjectGroup(PbObjectGroup { objects }),
1064 )
1065 .await;
1066
1067 if !updated_user_info.is_empty() {
1069 version = self.notify_users_update(updated_user_info).await;
1070 }
1071
1072 inner
1073 .creating_table_finish_notifier
1074 .values_mut()
1075 .for_each(|creating_tables| {
1076 if let Some(txs) = creating_tables.remove(&job_id) {
1077 for tx in txs {
1078 let _ = tx.send(Ok(version));
1079 }
1080 }
1081 });
1082
1083 Ok(())
1084 }
1085
1086 pub async fn finish_streaming_job_inner(
1088 txn: &DatabaseTransaction,
1089 job_id: JobId,
1090 ) -> MetaResult<(Operation, Vec<risingwave_pb::meta::Object>, Vec<PbUserInfo>)> {
1091 let job_type = Object::find_by_id(job_id)
1092 .select_only()
1093 .column(object::Column::ObjType)
1094 .into_tuple()
1095 .one(txn)
1096 .await?
1097 .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;
1098
1099 let create_type: CreateType = StreamingJobModel::find_by_id(job_id)
1100 .select_only()
1101 .column(streaming_job::Column::CreateType)
1102 .into_tuple()
1103 .one(txn)
1104 .await?
1105 .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;
1106
1107 let res = Object::update_many()
1109 .col_expr(object::Column::CreatedAt, Expr::current_timestamp().into())
1110 .col_expr(
1111 object::Column::CreatedAtClusterVersion,
1112 current_cluster_version().into(),
1113 )
1114 .filter(object::Column::Oid.eq(job_id))
1115 .exec(txn)
1116 .await?;
1117 if res.rows_affected == 0 {
1118 return Err(MetaError::catalog_id_not_found("streaming job", job_id));
1119 }
1120
1121 let job = streaming_job::ActiveModel {
1123 job_id: Set(job_id),
1124 job_status: Set(JobStatus::Created),
1125 ..Default::default()
1126 };
1127 let streaming_job = Some(job.update(txn).await?);
1128
1129 let internal_table_objs = Table::find()
1131 .find_also_related(Object)
1132 .filter(table::Column::BelongsToJobId.eq(job_id))
1133 .all(txn)
1134 .await?;
1135 let mut objects = internal_table_objs
1136 .iter()
1137 .map(|(table, obj)| PbObject {
1138 object_info: Some(PbObjectInfo::Table(
1139 ObjectModel(table.clone(), obj.clone().unwrap(), streaming_job.clone()).into(),
1140 )),
1141 })
1142 .collect_vec();
1143 let mut notification_op = if create_type == CreateType::Background {
1144 NotificationOperation::Update
1145 } else {
1146 NotificationOperation::Add
1147 };
1148 let mut updated_user_info = vec![];
1149 let mut need_grant_default_privileges = true;
1150
1151 match job_type {
1152 ObjectType::Table => {
1153 let (table, obj) = Table::find_by_id(job_id.as_mv_table_id())
1154 .find_also_related(Object)
1155 .one(txn)
1156 .await?
1157 .ok_or_else(|| MetaError::catalog_id_not_found("table", job_id))?;
1158 if table.table_type == TableType::MaterializedView {
1159 notification_op = NotificationOperation::Update;
1160 }
1161
1162 if let Some(source_id) = table.optional_associated_source_id {
1163 let (src, obj) = Source::find_by_id(source_id)
1164 .find_also_related(Object)
1165 .one(txn)
1166 .await?
1167 .ok_or_else(|| MetaError::catalog_id_not_found("source", source_id))?;
1168 objects.push(PbObject {
1169 object_info: Some(PbObjectInfo::Source(
1170 ObjectModel(src, obj.unwrap(), None).into(),
1171 )),
1172 });
1173 }
1174 objects.push(PbObject {
1175 object_info: Some(PbObjectInfo::Table(
1176 ObjectModel(table, obj.unwrap(), streaming_job.clone()).into(),
1177 )),
1178 });
1179 }
1180 ObjectType::Sink => {
1181 let (sink, obj) = Sink::find_by_id(job_id.as_sink_id())
1182 .find_also_related(Object)
1183 .one(txn)
1184 .await?
1185 .ok_or_else(|| MetaError::catalog_id_not_found("sink", job_id))?;
1186 if sink.name.starts_with(ICEBERG_SINK_PREFIX) {
1187 need_grant_default_privileges = false;
1188 }
1189 notification_op = NotificationOperation::Update;
1192 objects.push(PbObject {
1193 object_info: Some(PbObjectInfo::Sink(
1194 ObjectModel(sink, obj.unwrap(), streaming_job.clone()).into(),
1195 )),
1196 });
1197 }
1198 ObjectType::Index => {
1199 need_grant_default_privileges = false;
1200 let (index, obj) = Index::find_by_id(job_id.as_index_id())
1201 .find_also_related(Object)
1202 .one(txn)
1203 .await?
1204 .ok_or_else(|| MetaError::catalog_id_not_found("index", job_id))?;
1205 {
1206 let (table, obj) = Table::find_by_id(index.index_table_id)
1207 .find_also_related(Object)
1208 .one(txn)
1209 .await?
1210 .ok_or_else(|| {
1211 MetaError::catalog_id_not_found("table", index.index_table_id)
1212 })?;
1213 objects.push(PbObject {
1214 object_info: Some(PbObjectInfo::Table(
1215 ObjectModel(table, obj.unwrap(), streaming_job.clone()).into(),
1216 )),
1217 });
1218 }
1219
1220 let primary_table_privileges = UserPrivilege::find()
1223 .filter(
1224 user_privilege::Column::Oid
1225 .eq(index.primary_table_id)
1226 .and(user_privilege::Column::Action.eq(Action::Select)),
1227 )
1228 .all(txn)
1229 .await?;
1230 if !primary_table_privileges.is_empty() {
1231 let index_state_table_ids: Vec<TableId> = Table::find()
1232 .select_only()
1233 .column(table::Column::TableId)
1234 .filter(
1235 table::Column::BelongsToJobId
1236 .eq(job_id)
1237 .or(table::Column::TableId.eq(index.index_table_id)),
1238 )
1239 .into_tuple()
1240 .all(txn)
1241 .await?;
1242 let mut new_privileges = vec![];
1243 for privilege in &primary_table_privileges {
1244 for state_table_id in &index_state_table_ids {
1245 new_privileges.push(user_privilege::ActiveModel {
1246 id: Default::default(),
1247 oid: Set(state_table_id.as_object_id()),
1248 user_id: Set(privilege.user_id),
1249 action: Set(Action::Select),
1250 dependent_id: Set(privilege.dependent_id),
1251 granted_by: Set(privilege.granted_by),
1252 with_grant_option: Set(privilege.with_grant_option),
1253 });
1254 }
1255 }
1256 UserPrivilege::insert_many(new_privileges).exec(txn).await?;
1257
1258 updated_user_info = list_user_info_by_ids(
1259 primary_table_privileges.into_iter().map(|p| p.user_id),
1260 txn,
1261 )
1262 .await?;
1263 }
1264
1265 objects.push(PbObject {
1266 object_info: Some(PbObjectInfo::Index(
1267 ObjectModel(index, obj.unwrap(), streaming_job.clone()).into(),
1268 )),
1269 });
1270 }
1271 ObjectType::Source => {
1272 let (source, obj) = Source::find_by_id(job_id.as_shared_source_id())
1273 .find_also_related(Object)
1274 .one(txn)
1275 .await?
1276 .ok_or_else(|| MetaError::catalog_id_not_found("source", job_id))?;
1277 objects.push(PbObject {
1278 object_info: Some(PbObjectInfo::Source(
1279 ObjectModel(source, obj.unwrap(), None).into(),
1280 )),
1281 });
1282 }
1283 _ => unreachable!("invalid job type: {:?}", job_type),
1284 }
1285
1286 if need_grant_default_privileges {
1287 updated_user_info = grant_default_privileges_automatically(txn, job_id).await?;
1288 }
1289
1290 Ok((notification_op, objects, updated_user_info))
1291 }
1292
1293 pub async fn finish_replace_streaming_job(
1294 &self,
1295 tmp_id: JobId,
1296 streaming_job: StreamingJob,
1297 replace_upstream: FragmentReplaceUpstream,
1298 sink_into_table_context: SinkIntoTableContext,
1299 drop_table_connector_ctx: Option<&DropTableConnectorContext>,
1300 auto_refresh_schema_sinks: Option<Vec<FinishAutoRefreshSchemaSinkContext>>,
1301 ) -> MetaResult<NotificationVersion> {
1302 let inner = self.inner.write().await;
1303 let txn = inner.db.begin().await?;
1304
1305 let (objects, delete_notification_objs) = Self::finish_replace_streaming_job_inner(
1306 tmp_id,
1307 replace_upstream,
1308 sink_into_table_context,
1309 &txn,
1310 streaming_job,
1311 drop_table_connector_ctx,
1312 auto_refresh_schema_sinks,
1313 )
1314 .await?;
1315
1316 txn.commit().await?;
1317
1318 let mut version = self
1319 .notify_frontend(
1320 NotificationOperation::Update,
1321 NotificationInfo::ObjectGroup(PbObjectGroup { objects }),
1322 )
1323 .await;
1324
1325 if let Some((user_infos, to_drop_objects)) = delete_notification_objs {
1326 self.notify_users_update(user_infos).await;
1327 version = self
1328 .notify_frontend(
1329 NotificationOperation::Delete,
1330 build_object_group_for_delete(to_drop_objects),
1331 )
1332 .await;
1333 }
1334
1335 Ok(version)
1336 }
1337
1338 pub async fn finish_replace_streaming_job_inner(
1339 tmp_id: JobId,
1340 replace_upstream: FragmentReplaceUpstream,
1341 SinkIntoTableContext {
1342 updated_sink_catalogs,
1343 }: SinkIntoTableContext,
1344 txn: &DatabaseTransaction,
1345 streaming_job: StreamingJob,
1346 drop_table_connector_ctx: Option<&DropTableConnectorContext>,
1347 auto_refresh_schema_sinks: Option<Vec<FinishAutoRefreshSchemaSinkContext>>,
1348 ) -> MetaResult<(Vec<PbObject>, Option<(Vec<PbUserInfo>, Vec<PartialObject>)>)> {
1349 let original_job_id = streaming_job.id();
1350 let job_type = streaming_job.job_type();
1351
1352 let mut index_item_rewriter = None;
1353
1354 match streaming_job {
1356 StreamingJob::Table(_source, table, _table_job_type) => {
1357 let original_column_catalogs =
1360 get_table_columns(txn, original_job_id.as_mv_table_id()).await?;
1361
1362 index_item_rewriter = Some({
1363 let original_columns = original_column_catalogs
1364 .to_protobuf()
1365 .into_iter()
1366 .map(|c| c.column_desc.unwrap())
1367 .collect_vec();
1368 let new_columns = table
1369 .columns
1370 .iter()
1371 .map(|c| c.column_desc.clone().unwrap())
1372 .collect_vec();
1373
1374 IndexItemRewriter {
1375 original_columns,
1376 new_columns,
1377 }
1378 });
1379
1380 for sink_id in updated_sink_catalogs {
1382 sink::ActiveModel {
1383 sink_id: Set(sink_id as _),
1384 original_target_columns: Set(Some(original_column_catalogs.clone())),
1385 ..Default::default()
1386 }
1387 .update(txn)
1388 .await?;
1389 }
1390 let mut table = table::ActiveModel::from(table);
1392 if let Some(drop_table_connector_ctx) = drop_table_connector_ctx
1393 && drop_table_connector_ctx.to_change_streaming_job_id == original_job_id
1394 {
1395 table.optional_associated_source_id = Set(None);
1397 }
1398
1399 table.update(txn).await?;
1400 }
1401 StreamingJob::Source(source) => {
1402 let source = source::ActiveModel::from(source);
1404 source.update(txn).await?;
1405 }
1406 StreamingJob::MaterializedView(table) => {
1407 let table = table::ActiveModel::from(table);
1409 table.update(txn).await?;
1410 }
1411 _ => unreachable!(
1412 "invalid streaming job type: {:?}",
1413 streaming_job.job_type_str()
1414 ),
1415 }
1416
1417 async fn finish_fragments(
1418 txn: &DatabaseTransaction,
1419 tmp_id: JobId,
1420 original_job_id: JobId,
1421 replace_upstream: FragmentReplaceUpstream,
1422 ) -> MetaResult<()> {
1423 let fragment_info: Vec<(FragmentId, I32Array)> = Fragment::find()
1427 .select_only()
1428 .columns([
1429 fragment::Column::FragmentId,
1430 fragment::Column::StateTableIds,
1431 ])
1432 .filter(fragment::Column::JobId.eq(tmp_id))
1433 .into_tuple()
1434 .all(txn)
1435 .await?;
1436 for (fragment_id, state_table_ids) in fragment_info {
1437 for state_table_id in state_table_ids.into_inner() {
1438 let state_table_id = TableId::new(state_table_id as _);
1439 table::ActiveModel {
1440 table_id: Set(state_table_id),
1441 fragment_id: Set(Some(fragment_id)),
1442 ..Default::default()
1444 }
1445 .update(txn)
1446 .await?;
1447 }
1448 }
1449
1450 Fragment::delete_many()
1452 .filter(fragment::Column::JobId.eq(original_job_id))
1453 .exec(txn)
1454 .await?;
1455 Fragment::update_many()
1456 .col_expr(fragment::Column::JobId, SimpleExpr::from(original_job_id))
1457 .filter(fragment::Column::JobId.eq(tmp_id))
1458 .exec(txn)
1459 .await?;
1460
1461 for (fragment_id, fragment_replace_map) in replace_upstream {
1464 let (fragment_id, mut stream_node) =
1465 Fragment::find_by_id(fragment_id as FragmentId)
1466 .select_only()
1467 .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
1468 .into_tuple::<(FragmentId, StreamNode)>()
1469 .one(txn)
1470 .await?
1471 .map(|(id, node)| (id, node.to_protobuf()))
1472 .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
1473
1474 visit_stream_node_mut(&mut stream_node, |body| {
1475 if let PbNodeBody::Merge(m) = body
1476 && let Some(new_fragment_id) =
1477 fragment_replace_map.get(&m.upstream_fragment_id)
1478 {
1479 m.upstream_fragment_id = *new_fragment_id;
1480 }
1481 });
1482 fragment::ActiveModel {
1483 fragment_id: Set(fragment_id),
1484 stream_node: Set(StreamNode::from(&stream_node)),
1485 ..Default::default()
1486 }
1487 .update(txn)
1488 .await?;
1489 }
1490
1491 Object::delete_by_id(tmp_id).exec(txn).await?;
1493
1494 Ok(())
1495 }
1496
1497 finish_fragments(txn, tmp_id, original_job_id, replace_upstream).await?;
1498
1499 let mut objects = vec![];
1501 match job_type {
1502 StreamingJobType::Table(_) | StreamingJobType::MaterializedView => {
1503 let (table, table_obj) = Table::find_by_id(original_job_id.as_mv_table_id())
1504 .find_also_related(Object)
1505 .one(txn)
1506 .await?
1507 .ok_or_else(|| MetaError::catalog_id_not_found("object", original_job_id))?;
1508 let streaming_job = streaming_job::Entity::find_by_id(table.job_id())
1509 .one(txn)
1510 .await?;
1511 objects.push(PbObject {
1512 object_info: Some(PbObjectInfo::Table(
1513 ObjectModel(table, table_obj.unwrap(), streaming_job).into(),
1514 )),
1515 })
1516 }
1517 StreamingJobType::Source => {
1518 let (source, source_obj) =
1519 Source::find_by_id(original_job_id.as_shared_source_id())
1520 .find_also_related(Object)
1521 .one(txn)
1522 .await?
1523 .ok_or_else(|| {
1524 MetaError::catalog_id_not_found("object", original_job_id)
1525 })?;
1526 objects.push(PbObject {
1527 object_info: Some(PbObjectInfo::Source(
1528 ObjectModel(source, source_obj.unwrap(), None).into(),
1529 )),
1530 })
1531 }
1532 _ => unreachable!("invalid streaming job type for replace: {:?}", job_type),
1533 }
1534
1535 if let Some(expr_rewriter) = index_item_rewriter {
1536 let index_items: Vec<(IndexId, ExprNodeArray)> = Index::find()
1537 .select_only()
1538 .columns([index::Column::IndexId, index::Column::IndexItems])
1539 .filter(index::Column::PrimaryTableId.eq(original_job_id))
1540 .into_tuple()
1541 .all(txn)
1542 .await?;
1543 for (index_id, nodes) in index_items {
1544 let mut pb_nodes = nodes.to_protobuf();
1545 pb_nodes
1546 .iter_mut()
1547 .for_each(|x| expr_rewriter.rewrite_expr(x));
1548 let index = index::ActiveModel {
1549 index_id: Set(index_id),
1550 index_items: Set(pb_nodes.into()),
1551 ..Default::default()
1552 }
1553 .update(txn)
1554 .await?;
1555 let (index_obj, streaming_job) = Object::find_by_id(index.index_id)
1556 .find_also_related(streaming_job::Entity)
1557 .one(txn)
1558 .await?
1559 .ok_or_else(|| MetaError::catalog_id_not_found("object", index.index_id))?;
1560 objects.push(PbObject {
1561 object_info: Some(PbObjectInfo::Index(
1562 ObjectModel(index, index_obj, streaming_job).into(),
1563 )),
1564 });
1565 }
1566 }
1567
1568 if let Some(sinks) = auto_refresh_schema_sinks {
1569 for finish_sink_context in sinks {
1570 finish_fragments(
1571 txn,
1572 finish_sink_context.tmp_sink_id.as_job_id(),
1573 finish_sink_context.original_sink_id.as_job_id(),
1574 Default::default(),
1575 )
1576 .await?;
1577 let (mut sink, sink_obj) = Sink::find_by_id(finish_sink_context.original_sink_id)
1578 .find_also_related(Object)
1579 .one(txn)
1580 .await?
1581 .ok_or_else(|| MetaError::catalog_id_not_found("sink", original_job_id))?;
1582 let sink_streaming_job =
1583 streaming_job::Entity::find_by_id(sink.sink_id.as_job_id())
1584 .one(txn)
1585 .await?;
1586 let columns = ColumnCatalogArray::from(finish_sink_context.columns);
1587 Sink::update(sink::ActiveModel {
1588 sink_id: Set(finish_sink_context.original_sink_id),
1589 columns: Set(columns.clone()),
1590 ..Default::default()
1591 })
1592 .exec(txn)
1593 .await?;
1594 sink.columns = columns;
1595 objects.push(PbObject {
1596 object_info: Some(PbObjectInfo::Sink(
1597 ObjectModel(sink, sink_obj.unwrap(), sink_streaming_job.clone()).into(),
1598 )),
1599 });
1600 if let Some((log_store_table_id, new_log_store_table_columns)) =
1601 finish_sink_context.new_log_store_table
1602 {
1603 let new_log_store_table_columns: ColumnCatalogArray =
1604 new_log_store_table_columns.into();
1605 let (mut table, table_obj) = Table::find_by_id(log_store_table_id)
1606 .find_also_related(Object)
1607 .one(txn)
1608 .await?
1609 .ok_or_else(|| MetaError::catalog_id_not_found("table", original_job_id))?;
1610 Table::update(table::ActiveModel {
1611 table_id: Set(log_store_table_id),
1612 columns: Set(new_log_store_table_columns.clone()),
1613 ..Default::default()
1614 })
1615 .exec(txn)
1616 .await?;
1617 table.columns = new_log_store_table_columns;
1618 objects.push(PbObject {
1619 object_info: Some(PbObjectInfo::Table(
1620 ObjectModel(table, table_obj.unwrap(), sink_streaming_job.clone())
1621 .into(),
1622 )),
1623 });
1624 }
1625 }
1626 }
1627
1628 let mut notification_objs: Option<(Vec<PbUserInfo>, Vec<PartialObject>)> = None;
1629 if let Some(drop_table_connector_ctx) = drop_table_connector_ctx {
1630 notification_objs =
1631 Some(Self::drop_table_associated_source(txn, drop_table_connector_ctx).await?);
1632 }
1633
1634 Ok((objects, notification_objs))
1635 }
1636
1637 pub async fn try_abort_replacing_streaming_job(
1639 &self,
1640 tmp_job_id: JobId,
1641 tmp_sink_ids: Option<Vec<ObjectId>>,
1642 ) -> MetaResult<()> {
1643 let inner = self.inner.write().await;
1644 let txn = inner.db.begin().await?;
1645 Object::delete_by_id(tmp_job_id).exec(&txn).await?;
1646 if let Some(tmp_sink_ids) = tmp_sink_ids {
1647 for tmp_sink_id in tmp_sink_ids {
1648 Object::delete_by_id(tmp_sink_id).exec(&txn).await?;
1649 }
1650 }
1651 txn.commit().await?;
1652 Ok(())
1653 }
1654
1655 pub async fn update_source_rate_limit_by_source_id(
1658 &self,
1659 source_id: SourceId,
1660 rate_limit: Option<u32>,
1661 ) -> MetaResult<(HashSet<JobId>, HashSet<FragmentId>)> {
1662 let inner = self.inner.read().await;
1663 let txn = inner.db.begin().await?;
1664
1665 {
1666 let active_source = source::ActiveModel {
1667 source_id: Set(source_id),
1668 rate_limit: Set(rate_limit.map(|v| v as i32)),
1669 ..Default::default()
1670 };
1671 active_source.update(&txn).await?;
1672 }
1673
1674 let (source, obj) = Source::find_by_id(source_id)
1675 .find_also_related(Object)
1676 .one(&txn)
1677 .await?
1678 .ok_or_else(|| {
1679 MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
1680 })?;
1681
1682 let is_fs_source = source.with_properties.inner_ref().is_new_fs_connector();
1683 let streaming_job_ids: Vec<JobId> =
1684 if let Some(table_id) = source.optional_associated_table_id {
1685 vec![table_id.as_job_id()]
1686 } else if let Some(source_info) = &source.source_info
1687 && source_info.to_protobuf().is_shared()
1688 {
1689 vec![source_id.as_share_source_job_id()]
1690 } else {
1691 ObjectDependency::find()
1692 .select_only()
1693 .column(object_dependency::Column::UsedBy)
1694 .filter(object_dependency::Column::Oid.eq(source_id))
1695 .into_tuple()
1696 .all(&txn)
1697 .await?
1698 };
1699
1700 if streaming_job_ids.is_empty() {
1701 return Err(MetaError::invalid_parameter(format!(
1702 "source id {source_id} not used by any streaming job"
1703 )));
1704 }
1705
1706 let fragments: Vec<(FragmentId, JobId, i32, StreamNode)> = Fragment::find()
1707 .select_only()
1708 .columns([
1709 fragment::Column::FragmentId,
1710 fragment::Column::JobId,
1711 fragment::Column::FragmentTypeMask,
1712 fragment::Column::StreamNode,
1713 ])
1714 .filter(fragment::Column::JobId.is_in(streaming_job_ids))
1715 .into_tuple()
1716 .all(&txn)
1717 .await?;
1718 let mut fragments = fragments
1719 .into_iter()
1720 .map(|(id, job_id, mask, stream_node)| {
1721 (
1722 id,
1723 job_id,
1724 FragmentTypeMask::from(mask as u32),
1725 stream_node.to_protobuf(),
1726 )
1727 })
1728 .collect_vec();
1729
1730 fragments.retain_mut(|(_, _, fragment_type_mask, stream_node)| {
1731 let mut found = false;
1732 if fragment_type_mask.contains(FragmentTypeFlag::Source) {
1733 visit_stream_node_mut(stream_node, |node| {
1734 if let PbNodeBody::Source(node) = node
1735 && let Some(node_inner) = &mut node.source_inner
1736 && node_inner.source_id == source_id
1737 {
1738 node_inner.rate_limit = rate_limit;
1739 found = true;
1740 }
1741 });
1742 }
1743 if is_fs_source {
1744 visit_stream_node_mut(stream_node, |node| {
1747 if let PbNodeBody::StreamFsFetch(node) = node {
1748 fragment_type_mask.add(FragmentTypeFlag::FsFetch);
1749 if let Some(node_inner) = &mut node.node_inner
1750 && node_inner.source_id == source_id
1751 {
1752 node_inner.rate_limit = rate_limit;
1753 found = true;
1754 }
1755 }
1756 });
1757 }
1758 found
1759 });
1760
1761 assert!(
1762 !fragments.is_empty(),
1763 "source id should be used by at least one fragment"
1764 );
1765
1766 let (fragment_ids, job_ids) = fragments
1767 .iter()
1768 .map(|(framgnet_id, job_id, _, _)| (framgnet_id, job_id))
1769 .unzip();
1770
1771 for (fragment_id, _, fragment_type_mask, stream_node) in fragments {
1772 fragment::ActiveModel {
1773 fragment_id: Set(fragment_id),
1774 fragment_type_mask: Set(fragment_type_mask.into()),
1775 stream_node: Set(StreamNode::from(&stream_node)),
1776 ..Default::default()
1777 }
1778 .update(&txn)
1779 .await?;
1780 }
1781
1782 txn.commit().await?;
1783
1784 let relation_info = PbObjectInfo::Source(ObjectModel(source, obj.unwrap(), None).into());
1785 let _version = self
1786 .notify_frontend(
1787 NotificationOperation::Update,
1788 NotificationInfo::ObjectGroup(PbObjectGroup {
1789 objects: vec![PbObject {
1790 object_info: Some(relation_info),
1791 }],
1792 }),
1793 )
1794 .await;
1795
1796 Ok((job_ids, fragment_ids))
1797 }
1798
1799 pub async fn mutate_fragments_by_job_id(
1802 &self,
1803 job_id: JobId,
1804 mut fragments_mutation_fn: impl FnMut(FragmentTypeMask, &mut PbStreamNode) -> MetaResult<bool>,
1806 err_msg: &'static str,
1808 ) -> MetaResult<HashSet<FragmentId>> {
1809 let inner = self.inner.read().await;
1810 let txn = inner.db.begin().await?;
1811
1812 let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
1813 .select_only()
1814 .columns([
1815 fragment::Column::FragmentId,
1816 fragment::Column::FragmentTypeMask,
1817 fragment::Column::StreamNode,
1818 ])
1819 .filter(fragment::Column::JobId.eq(job_id))
1820 .into_tuple()
1821 .all(&txn)
1822 .await?;
1823 let mut fragments = fragments
1824 .into_iter()
1825 .map(|(id, mask, stream_node)| {
1826 (id, FragmentTypeMask::from(mask), stream_node.to_protobuf())
1827 })
1828 .collect_vec();
1829
1830 let fragments = fragments
1831 .iter_mut()
1832 .map(|(_, fragment_type_mask, stream_node)| {
1833 fragments_mutation_fn(*fragment_type_mask, stream_node)
1834 })
1835 .collect::<MetaResult<Vec<bool>>>()?
1836 .into_iter()
1837 .zip_eq_debug(std::mem::take(&mut fragments))
1838 .filter_map(|(keep, fragment)| if keep { Some(fragment) } else { None })
1839 .collect::<Vec<_>>();
1840
1841 if fragments.is_empty() {
1842 return Err(MetaError::invalid_parameter(format!(
1843 "job id {job_id}: {}",
1844 err_msg
1845 )));
1846 }
1847
1848 let fragment_ids: HashSet<FragmentId> = fragments.iter().map(|(id, _, _)| *id).collect();
1849 for (id, _, stream_node) in fragments {
1850 fragment::ActiveModel {
1851 fragment_id: Set(id),
1852 stream_node: Set(StreamNode::from(&stream_node)),
1853 ..Default::default()
1854 }
1855 .update(&txn)
1856 .await?;
1857 }
1858
1859 txn.commit().await?;
1860
1861 Ok(fragment_ids)
1862 }
1863
1864 async fn mutate_fragment_by_fragment_id(
1865 &self,
1866 fragment_id: FragmentId,
1867 mut fragment_mutation_fn: impl FnMut(FragmentTypeMask, &mut PbStreamNode) -> bool,
1868 err_msg: &'static str,
1869 ) -> MetaResult<()> {
1870 let inner = self.inner.read().await;
1871 let txn = inner.db.begin().await?;
1872
1873 let (fragment_type_mask, stream_node): (i32, StreamNode) =
1874 Fragment::find_by_id(fragment_id)
1875 .select_only()
1876 .columns([
1877 fragment::Column::FragmentTypeMask,
1878 fragment::Column::StreamNode,
1879 ])
1880 .into_tuple()
1881 .one(&txn)
1882 .await?
1883 .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
1884 let mut pb_stream_node = stream_node.to_protobuf();
1885 let fragment_type_mask = FragmentTypeMask::from(fragment_type_mask);
1886
1887 if !fragment_mutation_fn(fragment_type_mask, &mut pb_stream_node) {
1888 return Err(MetaError::invalid_parameter(format!(
1889 "fragment id {fragment_id}: {}",
1890 err_msg
1891 )));
1892 }
1893
1894 fragment::ActiveModel {
1895 fragment_id: Set(fragment_id),
1896 stream_node: Set(stream_node),
1897 ..Default::default()
1898 }
1899 .update(&txn)
1900 .await?;
1901
1902 txn.commit().await?;
1903
1904 Ok(())
1905 }
1906
1907 pub async fn update_backfill_rate_limit_by_job_id(
1910 &self,
1911 job_id: JobId,
1912 rate_limit: Option<u32>,
1913 ) -> MetaResult<HashSet<FragmentId>> {
1914 let update_backfill_rate_limit =
1915 |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
1916 let mut found = false;
1917 if fragment_type_mask
1918 .contains_any(FragmentTypeFlag::backfill_rate_limit_fragments())
1919 {
1920 visit_stream_node_mut(stream_node, |node| match node {
1921 PbNodeBody::StreamCdcScan(node) => {
1922 node.rate_limit = rate_limit;
1923 found = true;
1924 }
1925 PbNodeBody::StreamScan(node) => {
1926 node.rate_limit = rate_limit;
1927 found = true;
1928 }
1929 PbNodeBody::SourceBackfill(node) => {
1930 node.rate_limit = rate_limit;
1931 found = true;
1932 }
1933 _ => {}
1934 });
1935 }
1936 Ok(found)
1937 };
1938
1939 self.mutate_fragments_by_job_id(
1940 job_id,
1941 update_backfill_rate_limit,
1942 "stream scan node or source node not found",
1943 )
1944 .await
1945 }
1946
1947 pub async fn update_sink_rate_limit_by_job_id(
1950 &self,
1951 sink_id: SinkId,
1952 rate_limit: Option<u32>,
1953 ) -> MetaResult<HashSet<FragmentId>> {
1954 let update_sink_rate_limit =
1955 |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
1956 let mut found = Ok(false);
1957 if fragment_type_mask.contains_any(FragmentTypeFlag::sink_rate_limit_fragments()) {
1958 visit_stream_node_mut(stream_node, |node| {
1959 if let PbNodeBody::Sink(node) = node {
1960 if node.log_store_type != PbSinkLogStoreType::KvLogStore as i32 {
1961 found = Err(MetaError::invalid_parameter(
1962 "sink rate limit is only supported for kv log store, please SET sink_decouple = TRUE before CREATE SINK",
1963 ));
1964 return;
1965 }
1966 node.rate_limit = rate_limit;
1967 found = Ok(true);
1968 }
1969 });
1970 }
1971 found
1972 };
1973
1974 self.mutate_fragments_by_job_id(
1975 sink_id.as_job_id(),
1976 update_sink_rate_limit,
1977 "sink node not found",
1978 )
1979 .await
1980 }
1981
1982 pub async fn update_dml_rate_limit_by_job_id(
1983 &self,
1984 job_id: JobId,
1985 rate_limit: Option<u32>,
1986 ) -> MetaResult<HashSet<FragmentId>> {
1987 let update_dml_rate_limit =
1988 |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
1989 let mut found = false;
1990 if fragment_type_mask.contains_any(FragmentTypeFlag::dml_rate_limit_fragments()) {
1991 visit_stream_node_mut(stream_node, |node| {
1992 if let PbNodeBody::Dml(node) = node {
1993 node.rate_limit = rate_limit;
1994 found = true;
1995 }
1996 });
1997 }
1998 Ok(found)
1999 };
2000
2001 self.mutate_fragments_by_job_id(job_id, update_dml_rate_limit, "dml node not found")
2002 .await
2003 }
2004
2005 pub async fn update_source_props_by_source_id(
2006 &self,
2007 source_id: SourceId,
2008 alter_props: BTreeMap<String, String>,
2009 alter_secret_refs: BTreeMap<String, PbSecretRef>,
2010 ) -> MetaResult<WithOptionsSecResolved> {
2011 let inner = self.inner.read().await;
2012 let txn = inner.db.begin().await?;
2013
2014 let (source, _obj) = Source::find_by_id(source_id)
2015 .find_also_related(Object)
2016 .one(&txn)
2017 .await?
2018 .ok_or_else(|| {
2019 MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
2020 })?;
2021 let connector = source.with_properties.0.get_connector().unwrap();
2022 let is_shared_source = source.is_shared();
2023
2024 let mut dep_source_job_ids: Vec<JobId> = Vec::new();
2025 if !is_shared_source {
2026 dep_source_job_ids = ObjectDependency::find()
2028 .select_only()
2029 .column(object_dependency::Column::UsedBy)
2030 .filter(object_dependency::Column::Oid.eq(source_id))
2031 .into_tuple()
2032 .all(&txn)
2033 .await?;
2034 }
2035
2036 let prop_keys: Vec<String> = alter_props
2038 .keys()
2039 .chain(alter_secret_refs.keys())
2040 .cloned()
2041 .collect();
2042 risingwave_connector::allow_alter_on_fly_fields::check_source_allow_alter_on_fly_fields(
2043 &connector, &prop_keys,
2044 )?;
2045
2046 let mut options_with_secret = WithOptionsSecResolved::new(
2047 source.with_properties.0.clone(),
2048 source
2049 .secret_ref
2050 .map(|secret_ref| secret_ref.to_protobuf())
2051 .unwrap_or_default(),
2052 );
2053 let (to_add_secret_dep, to_remove_secret_dep) =
2054 options_with_secret.handle_update(alter_props, alter_secret_refs)?;
2055
2056 tracing::info!(
2057 "applying new properties to source: source_id={}, options_with_secret={:?}",
2058 source_id,
2059 options_with_secret
2060 );
2061 let _ = ConnectorProperties::extract(options_with_secret.clone(), true)?;
2063 let mut associate_table_id = None;
2066
2067 let mut preferred_id = source_id.as_object_id();
2071 let rewrite_sql = {
2072 let definition = source.definition.clone();
2073
2074 let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
2075 .map_err(|e| {
2076 MetaError::from(MetaErrorInner::Connector(ConnectorError::from(
2077 anyhow!(e).context("Failed to parse source definition SQL"),
2078 )))
2079 })?
2080 .try_into()
2081 .unwrap();
2082
2083 async fn format_with_option_secret_resolved(
2097 txn: &DatabaseTransaction,
2098 options_with_secret: &WithOptionsSecResolved,
2099 ) -> MetaResult<Vec<SqlOption>> {
2100 let mut options = Vec::new();
2101 for (k, v) in options_with_secret.as_plaintext() {
2102 let sql_option = SqlOption::try_from((k, &format!("'{}'", v)))
2103 .map_err(|e| MetaError::invalid_parameter(e.to_report_string()))?;
2104 options.push(sql_option);
2105 }
2106 for (k, v) in options_with_secret.as_secret() {
2107 if let Some(secret_model) = Secret::find_by_id(v.secret_id).one(txn).await? {
2108 let sql_option =
2109 SqlOption::try_from((k, &format!("SECRET {}", secret_model.name)))
2110 .map_err(|e| MetaError::invalid_parameter(e.to_report_string()))?;
2111 options.push(sql_option);
2112 } else {
2113 return Err(MetaError::catalog_id_not_found("secret", v.secret_id));
2114 }
2115 }
2116 Ok(options)
2117 }
2118
2119 match &mut stmt {
2120 Statement::CreateSource { stmt } => {
2121 stmt.with_properties.0 =
2122 format_with_option_secret_resolved(&txn, &options_with_secret).await?;
2123 }
2124 Statement::CreateTable { with_options, .. } => {
2125 *with_options =
2126 format_with_option_secret_resolved(&txn, &options_with_secret).await?;
2127 associate_table_id = source.optional_associated_table_id;
2128 preferred_id = associate_table_id.unwrap().as_object_id();
2129 }
2130 _ => unreachable!(),
2131 }
2132
2133 stmt.to_string()
2134 };
2135
2136 {
2137 if !to_add_secret_dep.is_empty() {
2139 ObjectDependency::insert_many(to_add_secret_dep.into_iter().map(|secret_id| {
2140 object_dependency::ActiveModel {
2141 oid: Set(secret_id.into()),
2142 used_by: Set(preferred_id),
2143 ..Default::default()
2144 }
2145 }))
2146 .exec(&txn)
2147 .await?;
2148 }
2149 if !to_remove_secret_dep.is_empty() {
2150 let _ = ObjectDependency::delete_many()
2152 .filter(
2153 object_dependency::Column::Oid
2154 .is_in(to_remove_secret_dep)
2155 .and(object_dependency::Column::UsedBy.eq(preferred_id)),
2156 )
2157 .exec(&txn)
2158 .await?;
2159 }
2160 }
2161
2162 let active_source_model = source::ActiveModel {
2163 source_id: Set(source_id),
2164 definition: Set(rewrite_sql.clone()),
2165 with_properties: Set(options_with_secret.as_plaintext().clone().into()),
2166 secret_ref: Set((!options_with_secret.as_secret().is_empty())
2167 .then(|| SecretRef::from(options_with_secret.as_secret().clone()))),
2168 ..Default::default()
2169 };
2170 active_source_model.update(&txn).await?;
2171
2172 if let Some(associate_table_id) = associate_table_id {
2173 let active_table_model = table::ActiveModel {
2175 table_id: Set(associate_table_id),
2176 definition: Set(rewrite_sql),
2177 ..Default::default()
2178 };
2179 active_table_model.update(&txn).await?;
2180 }
2181
2182 let to_check_job_ids = vec![if let Some(associate_table_id) = associate_table_id {
2183 associate_table_id.as_job_id()
2185 } else {
2186 source_id.as_share_source_job_id()
2187 }]
2188 .into_iter()
2189 .chain(dep_source_job_ids.into_iter())
2190 .collect_vec();
2191
2192 update_connector_props_fragments(
2194 &txn,
2195 to_check_job_ids,
2196 FragmentTypeFlag::Source,
2197 |node, found| {
2198 if let PbNodeBody::Source(node) = node
2199 && let Some(source_inner) = &mut node.source_inner
2200 {
2201 source_inner.with_properties = options_with_secret.as_plaintext().clone();
2202 source_inner.secret_refs = options_with_secret.as_secret().clone();
2203 *found = true;
2204 }
2205 },
2206 is_shared_source,
2207 )
2208 .await?;
2209
2210 let mut to_update_objs = Vec::with_capacity(2);
2211 let (source, obj) = Source::find_by_id(source_id)
2212 .find_also_related(Object)
2213 .one(&txn)
2214 .await?
2215 .ok_or_else(|| {
2216 MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
2217 })?;
2218 to_update_objs.push(PbObject {
2219 object_info: Some(PbObjectInfo::Source(
2220 ObjectModel(source, obj.unwrap(), None).into(),
2221 )),
2222 });
2223
2224 if let Some(associate_table_id) = associate_table_id {
2225 let (table, obj) = Table::find_by_id(associate_table_id)
2226 .find_also_related(Object)
2227 .one(&txn)
2228 .await?
2229 .ok_or_else(|| MetaError::catalog_id_not_found("table", associate_table_id))?;
2230 let streaming_job = streaming_job::Entity::find_by_id(table.job_id())
2231 .one(&txn)
2232 .await?;
2233 to_update_objs.push(PbObject {
2234 object_info: Some(PbObjectInfo::Table(
2235 ObjectModel(table, obj.unwrap(), streaming_job).into(),
2236 )),
2237 });
2238 }
2239
2240 txn.commit().await?;
2241
2242 self.notify_frontend(
2243 NotificationOperation::Update,
2244 NotificationInfo::ObjectGroup(PbObjectGroup {
2245 objects: to_update_objs,
2246 }),
2247 )
2248 .await;
2249
2250 Ok(options_with_secret)
2251 }
2252
2253 pub async fn update_sink_props_by_sink_id(
2254 &self,
2255 sink_id: SinkId,
2256 props: BTreeMap<String, String>,
2257 ) -> MetaResult<HashMap<String, String>> {
2258 let inner = self.inner.read().await;
2259 let txn = inner.db.begin().await?;
2260
2261 let (sink, _obj) = Sink::find_by_id(sink_id)
2262 .find_also_related(Object)
2263 .one(&txn)
2264 .await?
2265 .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
2266 validate_sink_props(&sink, &props)?;
2267 let definition = sink.definition.clone();
2268 let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
2269 .map_err(|e| SinkError::Config(anyhow!(e)))?
2270 .try_into()
2271 .unwrap();
2272 if let Statement::CreateSink { stmt } = &mut stmt {
2273 update_stmt_with_props(&mut stmt.with_properties.0, &props)?;
2274 } else {
2275 panic!("definition is not a create sink statement")
2276 }
2277 let mut new_config = sink.properties.clone().into_inner();
2278 new_config.extend(props.clone());
2279
2280 let definition = stmt.to_string();
2281 let active_sink = sink::ActiveModel {
2282 sink_id: Set(sink_id),
2283 properties: Set(risingwave_meta_model::Property(new_config.clone())),
2284 definition: Set(definition),
2285 ..Default::default()
2286 };
2287 active_sink.update(&txn).await?;
2288
2289 update_sink_fragment_props(&txn, sink_id, new_config).await?;
2290 let (sink, obj) = Sink::find_by_id(sink_id)
2291 .find_also_related(Object)
2292 .one(&txn)
2293 .await?
2294 .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
2295 let streaming_job = streaming_job::Entity::find_by_id(sink.sink_id.as_job_id())
2296 .one(&txn)
2297 .await?;
2298 txn.commit().await?;
2299 let relation_infos = vec![PbObject {
2300 object_info: Some(PbObjectInfo::Sink(
2301 ObjectModel(sink, obj.unwrap(), streaming_job).into(),
2302 )),
2303 }];
2304
2305 let _version = self
2306 .notify_frontend(
2307 NotificationOperation::Update,
2308 NotificationInfo::ObjectGroup(PbObjectGroup {
2309 objects: relation_infos,
2310 }),
2311 )
2312 .await;
2313
2314 Ok(props.into_iter().collect())
2315 }
2316
2317 pub async fn update_iceberg_table_props_by_table_id(
2318 &self,
2319 table_id: TableId,
2320 props: BTreeMap<String, String>,
2321 alter_iceberg_table_props: Option<
2322 risingwave_pb::meta::alter_connector_props_request::PbExtraOptions,
2323 >,
2324 ) -> MetaResult<(HashMap<String, String>, SinkId)> {
2325 let risingwave_pb::meta::alter_connector_props_request::PbExtraOptions::AlterIcebergTableIds(AlterIcebergTableIds { sink_id, source_id }) = alter_iceberg_table_props.
2326 ok_or_else(|| MetaError::invalid_parameter("alter_iceberg_table_props is required"))?;
2327 let inner = self.inner.read().await;
2328 let txn = inner.db.begin().await?;
2329
2330 let (sink, _obj) = Sink::find_by_id(sink_id)
2331 .find_also_related(Object)
2332 .one(&txn)
2333 .await?
2334 .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
2335 validate_sink_props(&sink, &props)?;
2336
2337 let definition = sink.definition.clone();
2338 let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
2339 .map_err(|e| SinkError::Config(anyhow!(e)))?
2340 .try_into()
2341 .unwrap();
2342 if let Statement::CreateTable {
2343 with_options,
2344 engine,
2345 ..
2346 } = &mut stmt
2347 {
2348 if !matches!(engine, Engine::Iceberg) {
2349 return Err(SinkError::Config(anyhow!(
2350 "only iceberg table can be altered as sink"
2351 ))
2352 .into());
2353 }
2354 update_stmt_with_props(with_options, &props)?;
2355 } else {
2356 panic!("definition is not a create iceberg table statement")
2357 }
2358 let mut new_config = sink.properties.clone().into_inner();
2359 new_config.extend(props.clone());
2360
2361 let definition = stmt.to_string();
2362 let active_sink = sink::ActiveModel {
2363 sink_id: Set(sink_id),
2364 properties: Set(risingwave_meta_model::Property(new_config.clone())),
2365 definition: Set(definition.clone()),
2366 ..Default::default()
2367 };
2368 let active_source = source::ActiveModel {
2369 source_id: Set(source_id),
2370 definition: Set(definition.clone()),
2371 ..Default::default()
2372 };
2373 let active_table = table::ActiveModel {
2374 table_id: Set(table_id),
2375 definition: Set(definition),
2376 ..Default::default()
2377 };
2378 active_sink.update(&txn).await?;
2379 active_source.update(&txn).await?;
2380 active_table.update(&txn).await?;
2381
2382 update_sink_fragment_props(&txn, sink_id, new_config).await?;
2383
2384 let (sink, sink_obj) = Sink::find_by_id(sink_id)
2385 .find_also_related(Object)
2386 .one(&txn)
2387 .await?
2388 .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
2389 let sink_streaming_job = streaming_job::Entity::find_by_id(sink.sink_id.as_job_id())
2390 .one(&txn)
2391 .await?;
2392 let (source, source_obj) = Source::find_by_id(source_id)
2393 .find_also_related(Object)
2394 .one(&txn)
2395 .await?
2396 .ok_or_else(|| {
2397 MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
2398 })?;
2399 let (table, table_obj) = Table::find_by_id(table_id)
2400 .find_also_related(Object)
2401 .one(&txn)
2402 .await?
2403 .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Table.as_str(), table_id))?;
2404 let table_streaming_job = streaming_job::Entity::find_by_id(table.job_id())
2405 .one(&txn)
2406 .await?;
2407 txn.commit().await?;
2408 let relation_infos = vec![
2409 PbObject {
2410 object_info: Some(PbObjectInfo::Sink(
2411 ObjectModel(sink, sink_obj.unwrap(), sink_streaming_job).into(),
2412 )),
2413 },
2414 PbObject {
2415 object_info: Some(PbObjectInfo::Source(
2416 ObjectModel(source, source_obj.unwrap(), None).into(),
2417 )),
2418 },
2419 PbObject {
2420 object_info: Some(PbObjectInfo::Table(
2421 ObjectModel(table, table_obj.unwrap(), table_streaming_job).into(),
2422 )),
2423 },
2424 ];
2425 let _version = self
2426 .notify_frontend(
2427 NotificationOperation::Update,
2428 NotificationInfo::ObjectGroup(PbObjectGroup {
2429 objects: relation_infos,
2430 }),
2431 )
2432 .await;
2433
2434 Ok((props.into_iter().collect(), sink_id))
2435 }
2436
2437 pub async fn update_connection_and_dependent_objects_props(
2439 &self,
2440 connection_id: ConnectionId,
2441 alter_props: BTreeMap<String, String>,
2442 alter_secret_refs: BTreeMap<String, PbSecretRef>,
2443 ) -> MetaResult<(
2444 WithOptionsSecResolved, Vec<(SourceId, HashMap<String, String>)>, Vec<(SinkId, HashMap<String, String>)>, )> {
2448 let inner = self.inner.read().await;
2449 let txn = inner.db.begin().await?;
2450
2451 let dependent_sources: Vec<SourceId> = Source::find()
2453 .select_only()
2454 .column(source::Column::SourceId)
2455 .filter(source::Column::ConnectionId.eq(connection_id))
2456 .into_tuple()
2457 .all(&txn)
2458 .await?;
2459
2460 let dependent_sinks: Vec<SinkId> = Sink::find()
2461 .select_only()
2462 .column(sink::Column::SinkId)
2463 .filter(sink::Column::ConnectionId.eq(connection_id))
2464 .into_tuple()
2465 .all(&txn)
2466 .await?;
2467
2468 let (connection_catalog, _obj) = Connection::find_by_id(connection_id)
2469 .find_also_related(Object)
2470 .one(&txn)
2471 .await?
2472 .ok_or_else(|| {
2473 MetaError::catalog_id_not_found(ObjectType::Connection.as_str(), connection_id)
2474 })?;
2475
2476 let prop_keys: Vec<String> = alter_props
2478 .keys()
2479 .chain(alter_secret_refs.keys())
2480 .cloned()
2481 .collect();
2482
2483 let connection_type_str = pb_connection_type_to_connection_type(
2485 &connection_catalog.params.to_protobuf().connection_type(),
2486 )
2487 .ok_or_else(|| MetaError::invalid_parameter("Unspecified connection type"))?;
2488
2489 risingwave_connector::allow_alter_on_fly_fields::check_connection_allow_alter_on_fly_fields(
2490 connection_type_str, &prop_keys,
2491 )?;
2492
2493 let connection_pb = connection_catalog.params.to_protobuf();
2494 let mut connection_options_with_secret = WithOptionsSecResolved::new(
2495 connection_pb.properties.into_iter().collect(),
2496 connection_pb.secret_refs.into_iter().collect(),
2497 );
2498
2499 let (to_add_secret_dep, to_remove_secret_dep) = connection_options_with_secret
2500 .handle_update(alter_props.clone(), alter_secret_refs.clone())?;
2501
2502 tracing::debug!(
2503 "applying new properties to connection and dependents: connection_id={}, sources={:?}, sinks={:?}",
2504 connection_id,
2505 dependent_sources,
2506 dependent_sinks
2507 );
2508
2509 {
2511 let conn_params_pb = risingwave_pb::catalog::ConnectionParams {
2512 connection_type: connection_pb.connection_type,
2513 properties: connection_options_with_secret
2514 .as_plaintext()
2515 .clone()
2516 .into_iter()
2517 .collect(),
2518 secret_refs: connection_options_with_secret
2519 .as_secret()
2520 .clone()
2521 .into_iter()
2522 .collect(),
2523 };
2524 let connection = PbConnection {
2525 id: connection_id as _,
2526 info: Some(risingwave_pb::catalog::connection::Info::ConnectionParams(
2527 conn_params_pb,
2528 )),
2529 ..Default::default()
2530 };
2531 validate_connection(&connection).await?;
2532 }
2533
2534 if !to_add_secret_dep.is_empty() {
2536 ObjectDependency::insert_many(to_add_secret_dep.into_iter().map(|secret_id| {
2537 object_dependency::ActiveModel {
2538 oid: Set(secret_id.into()),
2539 used_by: Set(connection_id.as_object_id()),
2540 ..Default::default()
2541 }
2542 }))
2543 .exec(&txn)
2544 .await?;
2545 }
2546 if !to_remove_secret_dep.is_empty() {
2547 let _ = ObjectDependency::delete_many()
2548 .filter(
2549 object_dependency::Column::Oid
2550 .is_in(to_remove_secret_dep)
2551 .and(object_dependency::Column::UsedBy.eq(connection_id.as_object_id())),
2552 )
2553 .exec(&txn)
2554 .await?;
2555 }
2556
2557 let updated_connection_params = risingwave_pb::catalog::ConnectionParams {
2559 connection_type: connection_pb.connection_type,
2560 properties: connection_options_with_secret
2561 .as_plaintext()
2562 .clone()
2563 .into_iter()
2564 .collect(),
2565 secret_refs: connection_options_with_secret
2566 .as_secret()
2567 .clone()
2568 .into_iter()
2569 .collect(),
2570 };
2571 let active_connection_model = connection::ActiveModel {
2572 connection_id: Set(connection_id),
2573 params: Set(ConnectionParams::from(&updated_connection_params)),
2574 ..Default::default()
2575 };
2576 active_connection_model.update(&txn).await?;
2577
2578 let mut updated_sources_with_props: Vec<(SourceId, HashMap<String, String>)> = Vec::new();
2580
2581 if !dependent_sources.is_empty() {
2582 let sources_with_objs = Source::find()
2584 .find_also_related(Object)
2585 .filter(source::Column::SourceId.is_in(dependent_sources.iter().cloned()))
2586 .all(&txn)
2587 .await?;
2588
2589 let mut source_updates = Vec::new();
2591 let mut fragment_updates: Vec<DependentSourceFragmentUpdate> = Vec::new();
2592
2593 for (source, _obj) in sources_with_objs {
2594 let source_id = source.source_id;
2595
2596 let mut source_options_with_secret = WithOptionsSecResolved::new(
2597 source.with_properties.0.clone(),
2598 source
2599 .secret_ref
2600 .clone()
2601 .map(|secret_ref| secret_ref.to_protobuf())
2602 .unwrap_or_default(),
2603 );
2604 let (_source_to_add_secret_dep, _source_to_remove_secret_dep) =
2605 source_options_with_secret
2606 .handle_update(alter_props.clone(), alter_secret_refs.clone())?;
2607
2608 let _ = ConnectorProperties::extract(source_options_with_secret.clone(), true)?;
2610
2611 let active_source = source::ActiveModel {
2613 source_id: Set(source_id),
2614 with_properties: Set(Property(
2615 source_options_with_secret.as_plaintext().clone(),
2616 )),
2617 secret_ref: Set((!source_options_with_secret.as_secret().is_empty()).then(
2618 || {
2619 risingwave_meta_model::SecretRef::from(
2620 source_options_with_secret.as_secret().clone(),
2621 )
2622 },
2623 )),
2624 ..Default::default()
2625 };
2626 source_updates.push(active_source);
2627
2628 let is_shared_source = source.is_shared();
2633 let mut dep_source_job_ids: Vec<JobId> = Vec::new();
2634 if !is_shared_source {
2635 dep_source_job_ids = ObjectDependency::find()
2636 .select_only()
2637 .column(object_dependency::Column::UsedBy)
2638 .filter(object_dependency::Column::Oid.eq(source_id))
2639 .into_tuple()
2640 .all(&txn)
2641 .await?;
2642 }
2643
2644 let base_job_id =
2645 if let Some(associate_table_id) = source.optional_associated_table_id {
2646 associate_table_id.as_job_id()
2647 } else {
2648 source_id.as_share_source_job_id()
2649 };
2650 let job_ids = vec![base_job_id]
2651 .into_iter()
2652 .chain(dep_source_job_ids.into_iter())
2653 .collect_vec();
2654
2655 fragment_updates.push(DependentSourceFragmentUpdate {
2656 job_ids,
2657 with_properties: source_options_with_secret.as_plaintext().clone(),
2658 secret_refs: source_options_with_secret.as_secret().clone(),
2659 is_shared_source,
2660 });
2661
2662 let complete_source_props = LocalSecretManager::global()
2664 .fill_secrets(
2665 source_options_with_secret.as_plaintext().clone(),
2666 source_options_with_secret.as_secret().clone(),
2667 )
2668 .map_err(MetaError::from)?
2669 .into_iter()
2670 .collect::<HashMap<String, String>>();
2671 updated_sources_with_props.push((source_id, complete_source_props));
2672 }
2673
2674 for source_update in source_updates {
2675 source_update.update(&txn).await?;
2676 }
2677
2678 for DependentSourceFragmentUpdate {
2680 job_ids,
2681 with_properties,
2682 secret_refs,
2683 is_shared_source,
2684 } in fragment_updates
2685 {
2686 update_connector_props_fragments(
2687 &txn,
2688 job_ids,
2689 FragmentTypeFlag::Source,
2690 |node, found| {
2691 if let PbNodeBody::Source(node) = node
2692 && let Some(source_inner) = &mut node.source_inner
2693 {
2694 source_inner.with_properties = with_properties.clone();
2695 source_inner.secret_refs = secret_refs.clone();
2696 *found = true;
2697 }
2698 },
2699 is_shared_source,
2700 )
2701 .await?;
2702 }
2703 }
2704
2705 let mut updated_sinks_with_props: Vec<(SinkId, HashMap<String, String>)> = Vec::new();
2707
2708 if !dependent_sinks.is_empty() {
2709 let sinks_with_objs = Sink::find()
2711 .find_also_related(Object)
2712 .filter(sink::Column::SinkId.is_in(dependent_sinks.iter().cloned()))
2713 .all(&txn)
2714 .await?;
2715
2716 let mut sink_updates = Vec::new();
2718 let mut sink_fragment_updates = Vec::new();
2719
2720 for (sink, _obj) in sinks_with_objs {
2721 let sink_id = sink.sink_id;
2722
2723 match sink.properties.inner_ref().get(CONNECTOR_TYPE_KEY) {
2725 Some(connector) => {
2726 let connector_type = connector.to_lowercase();
2727 check_sink_allow_alter_on_fly_fields(&connector_type, &prop_keys)
2728 .map_err(|e| SinkError::Config(anyhow!(e)))?;
2729
2730 match_sink_name_str!(
2731 connector_type.as_str(),
2732 SinkType,
2733 {
2734 let mut new_sink_props = sink.properties.0.clone();
2735 new_sink_props.extend(alter_props.clone());
2736 SinkType::validate_alter_config(&new_sink_props)
2737 },
2738 |sink: &str| Err(SinkError::Config(anyhow!(
2739 "unsupported sink type {}",
2740 sink
2741 )))
2742 )?
2743 }
2744 None => {
2745 return Err(SinkError::Config(anyhow!(
2746 "connector not specified when alter sink"
2747 ))
2748 .into());
2749 }
2750 };
2751
2752 let mut new_sink_props = sink.properties.0.clone();
2753 new_sink_props.extend(alter_props.clone());
2754
2755 let active_sink = sink::ActiveModel {
2757 sink_id: Set(sink_id),
2758 properties: Set(risingwave_meta_model::Property(new_sink_props.clone())),
2759 ..Default::default()
2760 };
2761 sink_updates.push(active_sink);
2762
2763 sink_fragment_updates.push((sink_id, new_sink_props.clone()));
2765
2766 let complete_sink_props: HashMap<String, String> =
2768 new_sink_props.into_iter().collect();
2769 updated_sinks_with_props.push((sink_id, complete_sink_props));
2770 }
2771
2772 for sink_update in sink_updates {
2774 sink_update.update(&txn).await?;
2775 }
2776
2777 for (sink_id, new_sink_props) in sink_fragment_updates {
2779 update_connector_props_fragments(
2780 &txn,
2781 vec![sink_id.as_job_id()],
2782 FragmentTypeFlag::Sink,
2783 |node, found| {
2784 if let PbNodeBody::Sink(node) = node
2785 && let Some(sink_desc) = &mut node.sink_desc
2786 && sink_desc.id == sink_id.as_raw_id()
2787 {
2788 sink_desc.properties = new_sink_props.clone();
2789 *found = true;
2790 }
2791 },
2792 true,
2793 )
2794 .await?;
2795 }
2796 }
2797
2798 let mut updated_objects = Vec::new();
2800
2801 let (connection, obj) = Connection::find_by_id(connection_id)
2803 .find_also_related(Object)
2804 .one(&txn)
2805 .await?
2806 .ok_or_else(|| {
2807 MetaError::catalog_id_not_found(ObjectType::Connection.as_str(), connection_id)
2808 })?;
2809 updated_objects.push(PbObject {
2810 object_info: Some(PbObjectInfo::Connection(
2811 ObjectModel(connection, obj.unwrap(), None).into(),
2812 )),
2813 });
2814
2815 for source_id in &dependent_sources {
2817 let (source, obj) = Source::find_by_id(*source_id)
2818 .find_also_related(Object)
2819 .one(&txn)
2820 .await?
2821 .ok_or_else(|| {
2822 MetaError::catalog_id_not_found(ObjectType::Source.as_str(), *source_id)
2823 })?;
2824 updated_objects.push(PbObject {
2825 object_info: Some(PbObjectInfo::Source(
2826 ObjectModel(source, obj.unwrap(), None).into(),
2827 )),
2828 });
2829 }
2830
2831 for sink_id in &dependent_sinks {
2833 let (sink, obj) = Sink::find_by_id(*sink_id)
2834 .find_also_related(Object)
2835 .one(&txn)
2836 .await?
2837 .ok_or_else(|| {
2838 MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), *sink_id)
2839 })?;
2840 let streaming_job = streaming_job::Entity::find_by_id(sink.sink_id.as_job_id())
2841 .one(&txn)
2842 .await?;
2843 updated_objects.push(PbObject {
2844 object_info: Some(PbObjectInfo::Sink(
2845 ObjectModel(sink, obj.unwrap(), streaming_job).into(),
2846 )),
2847 });
2848 }
2849
2850 txn.commit().await?;
2852
2853 if !updated_objects.is_empty() {
2855 self.notify_frontend(
2856 NotificationOperation::Update,
2857 NotificationInfo::ObjectGroup(PbObjectGroup {
2858 objects: updated_objects,
2859 }),
2860 )
2861 .await;
2862 }
2863
2864 Ok((
2865 connection_options_with_secret,
2866 updated_sources_with_props,
2867 updated_sinks_with_props,
2868 ))
2869 }
2870
2871 pub async fn update_fragment_rate_limit_by_fragment_id(
2872 &self,
2873 fragment_id: FragmentId,
2874 rate_limit: Option<u32>,
2875 ) -> MetaResult<()> {
2876 let update_rate_limit = |fragment_type_mask: FragmentTypeMask,
2877 stream_node: &mut PbStreamNode| {
2878 let mut found = false;
2879 if fragment_type_mask.contains_any(
2880 FragmentTypeFlag::dml_rate_limit_fragments()
2881 .chain(FragmentTypeFlag::sink_rate_limit_fragments())
2882 .chain(FragmentTypeFlag::backfill_rate_limit_fragments())
2883 .chain(FragmentTypeFlag::source_rate_limit_fragments()),
2884 ) {
2885 visit_stream_node_mut(stream_node, |node| {
2886 if let PbNodeBody::Dml(node) = node {
2887 node.rate_limit = rate_limit;
2888 found = true;
2889 }
2890 if let PbNodeBody::Sink(node) = node {
2891 node.rate_limit = rate_limit;
2892 found = true;
2893 }
2894 if let PbNodeBody::StreamCdcScan(node) = node {
2895 node.rate_limit = rate_limit;
2896 found = true;
2897 }
2898 if let PbNodeBody::StreamScan(node) = node {
2899 node.rate_limit = rate_limit;
2900 found = true;
2901 }
2902 if let PbNodeBody::SourceBackfill(node) = node {
2903 node.rate_limit = rate_limit;
2904 found = true;
2905 }
2906 });
2907 }
2908 found
2909 };
2910 self.mutate_fragment_by_fragment_id(fragment_id, update_rate_limit, "fragment not found")
2911 .await
2912 }
2913
2914 pub async fn list_rate_limits(&self) -> MetaResult<Vec<RateLimitInfo>> {
2917 let inner = self.inner.read().await;
2918 let txn = inner.db.begin().await?;
2919
2920 let fragments: Vec<(FragmentId, JobId, i32, StreamNode)> = Fragment::find()
2921 .select_only()
2922 .columns([
2923 fragment::Column::FragmentId,
2924 fragment::Column::JobId,
2925 fragment::Column::FragmentTypeMask,
2926 fragment::Column::StreamNode,
2927 ])
2928 .filter(FragmentTypeMask::intersects_any(
2929 FragmentTypeFlag::rate_limit_fragments(),
2930 ))
2931 .into_tuple()
2932 .all(&txn)
2933 .await?;
2934
2935 let mut rate_limits = Vec::new();
2936 for (fragment_id, job_id, fragment_type_mask, stream_node) in fragments {
2937 let stream_node = stream_node.to_protobuf();
2938 visit_stream_node_body(&stream_node, |node| {
2939 let mut rate_limit = None;
2940 let mut node_name = None;
2941
2942 match node {
2943 PbNodeBody::Source(node) => {
2945 if let Some(node_inner) = &node.source_inner {
2946 rate_limit = node_inner.rate_limit;
2947 node_name = Some("SOURCE");
2948 }
2949 }
2950 PbNodeBody::StreamFsFetch(node) => {
2951 if let Some(node_inner) = &node.node_inner {
2952 rate_limit = node_inner.rate_limit;
2953 node_name = Some("FS_FETCH");
2954 }
2955 }
2956 PbNodeBody::SourceBackfill(node) => {
2958 rate_limit = node.rate_limit;
2959 node_name = Some("SOURCE_BACKFILL");
2960 }
2961 PbNodeBody::StreamScan(node) => {
2962 rate_limit = node.rate_limit;
2963 node_name = Some("STREAM_SCAN");
2964 }
2965 PbNodeBody::StreamCdcScan(node) => {
2966 rate_limit = node.rate_limit;
2967 node_name = Some("STREAM_CDC_SCAN");
2968 }
2969 PbNodeBody::Sink(node) => {
2970 rate_limit = node.rate_limit;
2971 node_name = Some("SINK");
2972 }
2973 _ => {}
2974 }
2975
2976 if let Some(rate_limit) = rate_limit {
2977 rate_limits.push(RateLimitInfo {
2978 fragment_id,
2979 job_id,
2980 fragment_type_mask: fragment_type_mask as u32,
2981 rate_limit,
2982 node_name: node_name.unwrap().to_owned(),
2983 });
2984 }
2985 });
2986 }
2987
2988 Ok(rate_limits)
2989 }
2990}
2991
2992fn validate_sink_props(sink: &sink::Model, props: &BTreeMap<String, String>) -> MetaResult<()> {
2993 match sink.properties.inner_ref().get(CONNECTOR_TYPE_KEY) {
2995 Some(connector) => {
2996 let connector_type = connector.to_lowercase();
2997 let field_names: Vec<String> = props.keys().cloned().collect();
2998 check_sink_allow_alter_on_fly_fields(&connector_type, &field_names)
2999 .map_err(|e| SinkError::Config(anyhow!(e)))?;
3000
3001 match_sink_name_str!(
3002 connector_type.as_str(),
3003 SinkType,
3004 {
3005 let mut new_props = sink.properties.0.clone();
3006 new_props.extend(props.clone());
3007 SinkType::validate_alter_config(&new_props)
3008 },
3009 |sink: &str| Err(SinkError::Config(anyhow!("unsupported sink type {}", sink)))
3010 )?
3011 }
3012 None => {
3013 return Err(
3014 SinkError::Config(anyhow!("connector not specified when alter sink")).into(),
3015 );
3016 }
3017 };
3018 Ok(())
3019}
3020
3021fn update_stmt_with_props(
3022 with_properties: &mut Vec<SqlOption>,
3023 props: &BTreeMap<String, String>,
3024) -> MetaResult<()> {
3025 let mut new_sql_options = with_properties
3026 .iter()
3027 .map(|sql_option| (&sql_option.name, sql_option))
3028 .collect::<IndexMap<_, _>>();
3029 let add_sql_options = props
3030 .iter()
3031 .map(|(k, v)| SqlOption::try_from((k, v)))
3032 .collect::<Result<Vec<SqlOption>, ParserError>>()
3033 .map_err(|e| SinkError::Config(anyhow!(e)))?;
3034 new_sql_options.extend(
3035 add_sql_options
3036 .iter()
3037 .map(|sql_option| (&sql_option.name, sql_option)),
3038 );
3039 *with_properties = new_sql_options.into_values().cloned().collect();
3040 Ok(())
3041}
3042
3043async fn update_sink_fragment_props(
3044 txn: &DatabaseTransaction,
3045 sink_id: SinkId,
3046 props: BTreeMap<String, String>,
3047) -> MetaResult<()> {
3048 let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
3049 .select_only()
3050 .columns([
3051 fragment::Column::FragmentId,
3052 fragment::Column::FragmentTypeMask,
3053 fragment::Column::StreamNode,
3054 ])
3055 .filter(fragment::Column::JobId.eq(sink_id))
3056 .into_tuple()
3057 .all(txn)
3058 .await?;
3059 let fragments = fragments
3060 .into_iter()
3061 .filter(|(_, fragment_type_mask, _)| {
3062 *fragment_type_mask & FragmentTypeFlag::Sink as i32 != 0
3063 })
3064 .filter_map(|(id, _, stream_node)| {
3065 let mut stream_node = stream_node.to_protobuf();
3066 let mut found = false;
3067 visit_stream_node_mut(&mut stream_node, |node| {
3068 if let PbNodeBody::Sink(node) = node
3069 && let Some(sink_desc) = &mut node.sink_desc
3070 && sink_desc.id == sink_id
3071 {
3072 sink_desc.properties.extend(props.clone());
3073 found = true;
3074 }
3075 });
3076 if found { Some((id, stream_node)) } else { None }
3077 })
3078 .collect_vec();
3079 assert!(
3080 !fragments.is_empty(),
3081 "sink id should be used by at least one fragment"
3082 );
3083 for (id, stream_node) in fragments {
3084 fragment::ActiveModel {
3085 fragment_id: Set(id),
3086 stream_node: Set(StreamNode::from(&stream_node)),
3087 ..Default::default()
3088 }
3089 .update(txn)
3090 .await?;
3091 }
3092 Ok(())
3093}
3094
3095pub struct SinkIntoTableContext {
3096 pub updated_sink_catalogs: Vec<SinkId>,
3099}
3100
3101pub struct FinishAutoRefreshSchemaSinkContext {
3102 pub tmp_sink_id: SinkId,
3103 pub original_sink_id: SinkId,
3104 pub columns: Vec<PbColumnCatalog>,
3105 pub new_log_store_table: Option<(TableId, Vec<PbColumnCatalog>)>,
3106}
3107
3108async fn update_connector_props_fragments<F>(
3109 txn: &DatabaseTransaction,
3110 job_ids: Vec<JobId>,
3111 expect_flag: FragmentTypeFlag,
3112 mut alter_stream_node_fn: F,
3113 is_shared_source: bool,
3114) -> MetaResult<()>
3115where
3116 F: FnMut(&mut PbNodeBody, &mut bool),
3117{
3118 let fragments: Vec<(FragmentId, StreamNode)> = Fragment::find()
3119 .select_only()
3120 .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
3121 .filter(
3122 fragment::Column::JobId
3123 .is_in(job_ids.clone())
3124 .and(FragmentTypeMask::intersects(expect_flag)),
3125 )
3126 .into_tuple()
3127 .all(txn)
3128 .await?;
3129 let fragments = fragments
3130 .into_iter()
3131 .filter_map(|(id, stream_node)| {
3132 let mut stream_node = stream_node.to_protobuf();
3133 let mut found = false;
3134 visit_stream_node_mut(&mut stream_node, |node| {
3135 alter_stream_node_fn(node, &mut found);
3136 });
3137 if found { Some((id, stream_node)) } else { None }
3138 })
3139 .collect_vec();
3140 if is_shared_source || job_ids.len() > 1 {
3141 assert!(
3145 !fragments.is_empty(),
3146 "job ids {:?} (type: {:?}) should be used by at least one fragment",
3147 job_ids,
3148 expect_flag
3149 );
3150 }
3151
3152 for (id, stream_node) in fragments {
3153 fragment::ActiveModel {
3154 fragment_id: Set(id),
3155 stream_node: Set(StreamNode::from(&stream_node)),
3156 ..Default::default()
3157 }
3158 .update(txn)
3159 .await?;
3160 }
3161
3162 Ok(())
3163}