1use std::collections::{BTreeMap, HashMap, HashSet};
16use std::num::NonZeroUsize;
17
18use anyhow::anyhow;
19use indexmap::IndexMap;
20use itertools::Itertools;
21use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask, ICEBERG_SINK_PREFIX};
22use risingwave_common::config::DefaultParallelism;
23use risingwave_common::hash::VnodeCountCompat;
24use risingwave_common::id::JobId;
25use risingwave_common::secret::LocalSecretManager;
26use risingwave_common::util::iter_util::ZipEqDebug;
27use risingwave_common::util::stream_graph_visitor::{
28 visit_stream_node_body, visit_stream_node_mut,
29};
30use risingwave_common::{bail, current_cluster_version};
31use risingwave_connector::allow_alter_on_fly_fields::check_sink_allow_alter_on_fly_fields;
32use risingwave_connector::connector_common::validate_connection;
33use risingwave_connector::error::ConnectorError;
34use risingwave_connector::sink::file_sink::fs::FsSink;
35use risingwave_connector::sink::{CONNECTOR_TYPE_KEY, SinkError};
36use risingwave_connector::source::{ConnectorProperties, pb_connection_type_to_connection_type};
37use risingwave_connector::{WithOptionsSecResolved, WithPropertiesExt, match_sink_name_str};
38use risingwave_meta_model::object::ObjectType;
39use risingwave_meta_model::prelude::{StreamingJob as StreamingJobModel, *};
40use risingwave_meta_model::refresh_job::RefreshState;
41use risingwave_meta_model::table::TableType;
42use risingwave_meta_model::user_privilege::Action;
43use risingwave_meta_model::*;
44use risingwave_pb::catalog::{PbConnection, PbCreateType, PbTable};
45use risingwave_pb::meta::alter_connector_props_request::AlterIcebergTableIds;
46use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
47use risingwave_pb::meta::object::PbObjectInfo;
48use risingwave_pb::meta::subscribe_response::{
49 Info as NotificationInfo, Info, Operation as NotificationOperation, Operation,
50};
51use risingwave_pb::meta::{PbObject, PbObjectGroup};
52use risingwave_pb::plan_common::PbColumnCatalog;
53use risingwave_pb::plan_common::source_refresh_mode::{
54 RefreshMode, SourceRefreshModeFullReload, SourceRefreshModeStreaming,
55};
56use risingwave_pb::secret::PbSecretRef;
57use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism;
58use risingwave_pb::stream_plan::stream_node::PbNodeBody;
59use risingwave_pb::stream_plan::{PbSinkLogStoreType, PbStreamNode};
60use risingwave_pb::user::PbUserInfo;
61use risingwave_sqlparser::ast::{Engine, SqlOption, Statement};
62use risingwave_sqlparser::parser::{Parser, ParserError};
63use sea_orm::ActiveValue::Set;
64use sea_orm::sea_query::{Expr, Query, SimpleExpr};
65use sea_orm::{
66 ActiveModelTrait, ColumnTrait, DatabaseTransaction, EntityTrait, IntoActiveModel, JoinType,
67 ModelTrait, NotSet, PaginatorTrait, QueryFilter, QuerySelect, RelationTrait, TransactionTrait,
68};
69use thiserror_ext::AsReport;
70
71use super::rename::IndexItemRewriter;
72use crate::barrier::{Command, SharedFragmentInfo};
73use crate::controller::ObjectModel;
74use crate::controller::catalog::{CatalogController, DropTableConnectorContext};
75use crate::controller::fragment::FragmentTypeMaskExt;
76use crate::controller::utils::{
77 PartialObject, build_object_group_for_delete, check_if_belongs_to_iceberg_table,
78 check_relation_name_duplicate, check_sink_into_table_cycle, ensure_job_not_canceled,
79 ensure_object_id, ensure_user_id, fetch_target_fragments, get_internal_tables_by_id,
80 get_table_columns, grant_default_privileges_automatically, insert_fragment_relations,
81 list_user_info_by_ids, try_get_iceberg_table_by_downstream_sink,
82};
83use crate::error::MetaErrorInner;
84use crate::manager::{NotificationVersion, StreamingJob, StreamingJobType};
85use crate::model::{
86 FragmentDownstreamRelation, FragmentReplaceUpstream, StreamContext, StreamJobFragments,
87 StreamJobFragmentsToCreate,
88};
89use crate::stream::SplitAssignment;
90use crate::{MetaError, MetaResult};
91
92#[derive(Debug)]
97struct DependentSourceFragmentUpdate {
98 job_ids: Vec<JobId>,
99 with_properties: BTreeMap<String, String>,
100 secret_refs: BTreeMap<String, PbSecretRef>,
101 is_shared_source: bool,
102}
103
104impl CatalogController {
105 #[allow(clippy::too_many_arguments)]
106 pub async fn create_streaming_job_obj(
107 txn: &DatabaseTransaction,
108 obj_type: ObjectType,
109 owner_id: UserId,
110 database_id: Option<DatabaseId>,
111 schema_id: Option<SchemaId>,
112 create_type: PbCreateType,
113 ctx: StreamContext,
114 streaming_parallelism: StreamingParallelism,
115 max_parallelism: usize,
116 specific_resource_group: Option<String>, backfill_parallelism: Option<StreamingParallelism>,
118 ) -> MetaResult<JobId> {
119 let obj = Self::create_object(txn, obj_type, owner_id, database_id, schema_id).await?;
120 let job_id = obj.oid.as_job_id();
121 let job = streaming_job::ActiveModel {
122 job_id: Set(job_id),
123 job_status: Set(JobStatus::Initial),
124 create_type: Set(create_type.into()),
125 timezone: Set(ctx.timezone),
126 config_override: Set(Some(ctx.config_override.to_string())),
127 parallelism: Set(streaming_parallelism),
128 backfill_parallelism: Set(backfill_parallelism),
129 max_parallelism: Set(max_parallelism as _),
130 specific_resource_group: Set(specific_resource_group),
131 };
132 job.insert(txn).await?;
133
134 Ok(job_id)
135 }
136
137 #[await_tree::instrument]
143 pub async fn create_job_catalog(
144 &self,
145 streaming_job: &mut StreamingJob,
146 ctx: &StreamContext,
147 parallelism: &Option<Parallelism>,
148 max_parallelism: usize,
149 mut dependencies: HashSet<ObjectId>,
150 specific_resource_group: Option<String>,
151 backfill_parallelism: &Option<Parallelism>,
152 ) -> MetaResult<()> {
153 let inner = self.inner.write().await;
154 let txn = inner.db.begin().await?;
155 let create_type = streaming_job.create_type();
156
157 let streaming_parallelism = match (parallelism, self.env.opts.default_parallelism) {
158 (None, DefaultParallelism::Full) => StreamingParallelism::Adaptive,
159 (None, DefaultParallelism::Default(n)) => StreamingParallelism::Fixed(n.get()),
160 (Some(n), _) => StreamingParallelism::Fixed(n.parallelism as _),
161 };
162 let backfill_parallelism = backfill_parallelism
163 .as_ref()
164 .map(|p| StreamingParallelism::Fixed(p.parallelism as _));
165
166 ensure_user_id(streaming_job.owner() as _, &txn).await?;
167 ensure_object_id(ObjectType::Database, streaming_job.database_id(), &txn).await?;
168 ensure_object_id(ObjectType::Schema, streaming_job.schema_id(), &txn).await?;
169 check_relation_name_duplicate(
170 &streaming_job.name(),
171 streaming_job.database_id(),
172 streaming_job.schema_id(),
173 &txn,
174 )
175 .await?;
176
177 if !dependencies.is_empty() {
179 let altering_cnt = ObjectDependency::find()
180 .join(
181 JoinType::InnerJoin,
182 object_dependency::Relation::Object1.def(),
183 )
184 .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
185 .filter(
186 object_dependency::Column::Oid
187 .is_in(dependencies.clone())
188 .and(object::Column::ObjType.eq(ObjectType::Table))
189 .and(streaming_job::Column::JobStatus.ne(JobStatus::Created))
190 .and(
191 object::Column::Oid.not_in_subquery(
193 Query::select()
194 .column(table::Column::TableId)
195 .from(Table)
196 .to_owned(),
197 ),
198 ),
199 )
200 .count(&txn)
201 .await?;
202 if altering_cnt != 0 {
203 return Err(MetaError::permission_denied(
204 "some dependent relations are being altered",
205 ));
206 }
207 }
208
209 match streaming_job {
210 StreamingJob::MaterializedView(table) => {
211 let job_id = Self::create_streaming_job_obj(
212 &txn,
213 ObjectType::Table,
214 table.owner as _,
215 Some(table.database_id),
216 Some(table.schema_id),
217 create_type,
218 ctx.clone(),
219 streaming_parallelism,
220 max_parallelism,
221 specific_resource_group,
222 backfill_parallelism.clone(),
223 )
224 .await?;
225 table.id = job_id.as_mv_table_id();
226 let table_model: table::ActiveModel = table.clone().into();
227 Table::insert(table_model).exec(&txn).await?;
228 }
229 StreamingJob::Sink(sink) => {
230 if let Some(target_table_id) = sink.target_table
231 && check_sink_into_table_cycle(
232 target_table_id.into(),
233 dependencies.iter().cloned().collect(),
234 &txn,
235 )
236 .await?
237 {
238 bail!("Creating such a sink will result in circular dependency.");
239 }
240
241 let job_id = Self::create_streaming_job_obj(
242 &txn,
243 ObjectType::Sink,
244 sink.owner as _,
245 Some(sink.database_id),
246 Some(sink.schema_id),
247 create_type,
248 ctx.clone(),
249 streaming_parallelism,
250 max_parallelism,
251 specific_resource_group,
252 backfill_parallelism.clone(),
253 )
254 .await?;
255 sink.id = job_id.as_sink_id();
256 let sink_model: sink::ActiveModel = sink.clone().into();
257 Sink::insert(sink_model).exec(&txn).await?;
258 }
259 StreamingJob::Table(src, table, _) => {
260 let job_id = Self::create_streaming_job_obj(
261 &txn,
262 ObjectType::Table,
263 table.owner as _,
264 Some(table.database_id),
265 Some(table.schema_id),
266 create_type,
267 ctx.clone(),
268 streaming_parallelism,
269 max_parallelism,
270 specific_resource_group,
271 backfill_parallelism.clone(),
272 )
273 .await?;
274 table.id = job_id.as_mv_table_id();
275 if let Some(src) = src {
276 let src_obj = Self::create_object(
277 &txn,
278 ObjectType::Source,
279 src.owner as _,
280 Some(src.database_id),
281 Some(src.schema_id),
282 )
283 .await?;
284 src.id = src_obj.oid.as_source_id();
285 src.optional_associated_table_id = Some(job_id.as_mv_table_id().into());
286 table.optional_associated_source_id = Some(src_obj.oid.as_source_id().into());
287 let source: source::ActiveModel = src.clone().into();
288 Source::insert(source).exec(&txn).await?;
289 }
290 let table_model: table::ActiveModel = table.clone().into();
291 Table::insert(table_model).exec(&txn).await?;
292
293 if table.refreshable {
294 let trigger_interval_secs = src
295 .as_ref()
296 .and_then(|source_catalog| source_catalog.refresh_mode)
297 .and_then(
298 |source_refresh_mode| match source_refresh_mode.refresh_mode {
299 Some(RefreshMode::FullReload(SourceRefreshModeFullReload {
300 refresh_interval_sec,
301 })) => refresh_interval_sec,
302 Some(RefreshMode::Streaming(SourceRefreshModeStreaming {})) => None,
303 None => None,
304 },
305 );
306
307 RefreshJob::insert(refresh_job::ActiveModel {
308 table_id: Set(table.id),
309 last_trigger_time: Set(None),
310 trigger_interval_secs: Set(trigger_interval_secs),
311 current_status: Set(RefreshState::Idle),
312 last_success_time: Set(None),
313 })
314 .exec(&txn)
315 .await?;
316 }
317 }
318 StreamingJob::Index(index, table) => {
319 ensure_object_id(ObjectType::Table, index.primary_table_id, &txn).await?;
320 let job_id = Self::create_streaming_job_obj(
321 &txn,
322 ObjectType::Index,
323 index.owner as _,
324 Some(index.database_id),
325 Some(index.schema_id),
326 create_type,
327 ctx.clone(),
328 streaming_parallelism,
329 max_parallelism,
330 specific_resource_group,
331 backfill_parallelism.clone(),
332 )
333 .await?;
334 index.id = job_id.as_index_id();
336 index.index_table_id = job_id.as_mv_table_id();
337 table.id = job_id.as_mv_table_id();
338
339 ObjectDependency::insert(object_dependency::ActiveModel {
340 oid: Set(index.primary_table_id.into()),
341 used_by: Set(table.id.into()),
342 ..Default::default()
343 })
344 .exec(&txn)
345 .await?;
346
347 let table_model: table::ActiveModel = table.clone().into();
348 Table::insert(table_model).exec(&txn).await?;
349 let index_model: index::ActiveModel = index.clone().into();
350 Index::insert(index_model).exec(&txn).await?;
351 }
352 StreamingJob::Source(src) => {
353 let job_id = Self::create_streaming_job_obj(
354 &txn,
355 ObjectType::Source,
356 src.owner as _,
357 Some(src.database_id),
358 Some(src.schema_id),
359 create_type,
360 ctx.clone(),
361 streaming_parallelism,
362 max_parallelism,
363 specific_resource_group,
364 backfill_parallelism.clone(),
365 )
366 .await?;
367 src.id = job_id.as_shared_source_id();
368 let source_model: source::ActiveModel = src.clone().into();
369 Source::insert(source_model).exec(&txn).await?;
370 }
371 }
372
373 dependencies.extend(
375 streaming_job
376 .dependent_secret_ids()?
377 .into_iter()
378 .map(|id| id.as_object_id()),
379 );
380 dependencies.extend(
382 streaming_job
383 .dependent_connection_ids()?
384 .into_iter()
385 .map(|id| id.as_object_id()),
386 );
387
388 if !dependencies.is_empty() {
390 ObjectDependency::insert_many(dependencies.into_iter().map(|oid| {
391 object_dependency::ActiveModel {
392 oid: Set(oid),
393 used_by: Set(streaming_job.id().as_object_id()),
394 ..Default::default()
395 }
396 }))
397 .exec(&txn)
398 .await?;
399 }
400
401 txn.commit().await?;
402
403 Ok(())
404 }
405
406 pub async fn create_internal_table_catalog(
414 &self,
415 job: &StreamingJob,
416 mut incomplete_internal_tables: Vec<PbTable>,
417 ) -> MetaResult<HashMap<TableId, TableId>> {
418 let job_id = job.id();
419 let inner = self.inner.write().await;
420 let txn = inner.db.begin().await?;
421
422 ensure_job_not_canceled(job_id, &txn).await?;
424
425 let mut table_id_map = HashMap::new();
426 for table in &mut incomplete_internal_tables {
427 let table_id = Self::create_object(
428 &txn,
429 ObjectType::Table,
430 table.owner as _,
431 Some(table.database_id),
432 Some(table.schema_id),
433 )
434 .await?
435 .oid
436 .as_table_id();
437 table_id_map.insert(table.id, table_id);
438 table.id = table_id;
439 table.job_id = Some(job_id);
440
441 let table_model = table::ActiveModel {
442 table_id: Set(table_id),
443 belongs_to_job_id: Set(Some(job_id)),
444 fragment_id: NotSet,
445 ..table.clone().into()
446 };
447 Table::insert(table_model).exec(&txn).await?;
448 }
449 txn.commit().await?;
450
451 Ok(table_id_map)
452 }
453
454 pub async fn prepare_stream_job_fragments(
455 &self,
456 stream_job_fragments: &StreamJobFragmentsToCreate,
457 streaming_job: &StreamingJob,
458 for_replace: bool,
459 ) -> MetaResult<()> {
460 self.prepare_streaming_job(
461 stream_job_fragments.stream_job_id(),
462 || stream_job_fragments.fragments.values(),
463 &stream_job_fragments.downstreams,
464 for_replace,
465 Some(streaming_job),
466 )
467 .await
468 }
469
470 #[await_tree::instrument("prepare_streaming_job_for_{}", if for_replace { "replace" } else { "create" }
475 )]
476 pub async fn prepare_streaming_job<'a, I: Iterator<Item = &'a crate::model::Fragment> + 'a>(
477 &self,
478 job_id: JobId,
479 get_fragments: impl Fn() -> I + 'a,
480 downstreams: &FragmentDownstreamRelation,
481 for_replace: bool,
482 creating_streaming_job: Option<&'a StreamingJob>,
483 ) -> MetaResult<()> {
484 let fragments = Self::prepare_fragment_models_from_fragments(job_id, get_fragments())?;
485
486 let inner = self.inner.write().await;
487
488 let need_notify = creating_streaming_job
489 .map(|job| job.should_notify_creating())
490 .unwrap_or(false);
491 let definition = creating_streaming_job.map(|job| job.definition());
492
493 let mut objects_to_notify = vec![];
494 let txn = inner.db.begin().await?;
495
496 ensure_job_not_canceled(job_id, &txn).await?;
498
499 for fragment in fragments {
500 let fragment_id = fragment.fragment_id;
501 let state_table_ids = fragment.state_table_ids.inner_ref().clone();
502
503 let fragment = fragment.into_active_model();
504 Fragment::insert(fragment).exec(&txn).await?;
505
506 if !for_replace {
509 let all_tables = StreamJobFragments::collect_tables(get_fragments());
510 for state_table_id in state_table_ids {
511 let table = all_tables
515 .get(&state_table_id)
516 .unwrap_or_else(|| panic!("table {} not found", state_table_id));
517 assert_eq!(table.id, state_table_id);
518 assert_eq!(table.fragment_id, fragment_id);
519 let vnode_count = table.vnode_count();
520
521 table::ActiveModel {
522 table_id: Set(state_table_id as _),
523 fragment_id: Set(Some(fragment_id)),
524 vnode_count: Set(vnode_count as _),
525 ..Default::default()
526 }
527 .update(&txn)
528 .await?;
529
530 if need_notify {
531 let mut table = table.clone();
532 if cfg!(not(debug_assertions)) && table.id == job_id.as_raw_id() {
534 table.definition = definition.clone().unwrap();
535 }
536 objects_to_notify.push(PbObject {
537 object_info: Some(PbObjectInfo::Table(table)),
538 });
539 }
540 }
541 }
542 }
543
544 if need_notify {
546 match creating_streaming_job.unwrap() {
547 StreamingJob::Table(Some(source), ..) => {
548 objects_to_notify.push(PbObject {
549 object_info: Some(PbObjectInfo::Source(source.clone())),
550 });
551 }
552 StreamingJob::Sink(sink) => {
553 objects_to_notify.push(PbObject {
554 object_info: Some(PbObjectInfo::Sink(sink.clone())),
555 });
556 }
557 StreamingJob::Index(index, _) => {
558 objects_to_notify.push(PbObject {
559 object_info: Some(PbObjectInfo::Index(index.clone())),
560 });
561 }
562 _ => {}
563 }
564 }
565
566 insert_fragment_relations(&txn, downstreams).await?;
567
568 if !for_replace {
569 if let Some(StreamingJob::Table(_, table, _)) = creating_streaming_job {
571 Table::update(table::ActiveModel {
572 table_id: Set(table.id),
573 dml_fragment_id: Set(table.dml_fragment_id),
574 ..Default::default()
575 })
576 .exec(&txn)
577 .await?;
578 }
579 }
580
581 txn.commit().await?;
582
583 if !objects_to_notify.is_empty() {
587 self.notify_frontend(
588 Operation::Add,
589 Info::ObjectGroup(PbObjectGroup {
590 objects: objects_to_notify,
591 }),
592 )
593 .await;
594 }
595
596 Ok(())
597 }
598
599 pub async fn build_cancel_command(
602 &self,
603 table_fragments: &StreamJobFragments,
604 ) -> MetaResult<Command> {
605 let inner = self.inner.read().await;
606 let txn = inner.db.begin().await?;
607
608 let dropped_sink_fragment_with_target =
609 if let Some(sink_fragment) = table_fragments.sink_fragment() {
610 let sink_fragment_id = sink_fragment.fragment_id as FragmentId;
611 let sink_target_fragment = fetch_target_fragments(&txn, [sink_fragment_id]).await?;
612 sink_target_fragment
613 .get(&sink_fragment_id)
614 .map(|target_fragments| {
615 let target_fragment_id = *target_fragments
616 .first()
617 .expect("sink should have at least one downstream fragment");
618 (sink_fragment_id, target_fragment_id)
619 })
620 } else {
621 None
622 };
623
624 Ok(Command::DropStreamingJobs {
625 streaming_job_ids: HashSet::from_iter([table_fragments.stream_job_id()]),
626 actors: table_fragments.actor_ids().collect(),
627 unregistered_state_table_ids: table_fragments.all_table_ids().collect(),
628 unregistered_fragment_ids: table_fragments.fragment_ids().collect(),
629 dropped_sink_fragment_by_targets: dropped_sink_fragment_with_target
630 .into_iter()
631 .map(|(sink, target)| (target as _, vec![sink as _]))
632 .collect(),
633 })
634 }
635
636 #[await_tree::instrument]
640 pub async fn try_abort_creating_streaming_job(
641 &self,
642 mut job_id: JobId,
643 is_cancelled: bool,
644 ) -> MetaResult<(bool, Option<DatabaseId>)> {
645 let mut inner = self.inner.write().await;
646 let txn = inner.db.begin().await?;
647
648 let obj = Object::find_by_id(job_id).one(&txn).await?;
649 let Some(obj) = obj else {
650 tracing::warn!(
651 id = %job_id,
652 "streaming job not found when aborting creating, might be cancelled already or cleaned by recovery"
653 );
654 return Ok((true, None));
655 };
656 let database_id = obj
657 .database_id
658 .ok_or_else(|| anyhow!("obj has no database id: {:?}", obj))?;
659 let streaming_job = streaming_job::Entity::find_by_id(job_id).one(&txn).await?;
660
661 if !is_cancelled && let Some(streaming_job) = &streaming_job {
662 assert_ne!(streaming_job.job_status, JobStatus::Created);
663 if streaming_job.create_type == CreateType::Background
664 && streaming_job.job_status == JobStatus::Creating
665 {
666 if (obj.obj_type == ObjectType::Table || obj.obj_type == ObjectType::Sink)
667 && check_if_belongs_to_iceberg_table(&txn, job_id).await?
668 {
669 } else {
671 tracing::warn!(
673 id = %job_id,
674 "streaming job is created in background and still in creating status"
675 );
676 return Ok((false, Some(database_id)));
677 }
678 }
679 }
680
681 let original_job_id = job_id;
683 let original_obj_type = obj.obj_type;
684
685 let iceberg_table_id =
686 try_get_iceberg_table_by_downstream_sink(&txn, job_id.as_sink_id()).await?;
687 if let Some(iceberg_table_id) = iceberg_table_id {
688 let internal_tables = get_internal_tables_by_id(job_id, &txn).await?;
691 Object::delete_many()
692 .filter(
693 object::Column::Oid
694 .eq(job_id)
695 .or(object::Column::Oid.is_in(internal_tables)),
696 )
697 .exec(&txn)
698 .await?;
699 job_id = iceberg_table_id.as_job_id();
700 };
701
702 let internal_table_ids = get_internal_tables_by_id(job_id, &txn).await?;
703
704 let mut objs = vec![];
706 let table_obj = Table::find_by_id(job_id.as_mv_table_id()).one(&txn).await?;
707
708 let mut need_notify =
709 streaming_job.is_some_and(|job| job.create_type == CreateType::Background);
710 if !need_notify {
711 if let Some(table) = &table_obj {
712 need_notify = table.table_type == TableType::MaterializedView;
713 } else if original_obj_type == ObjectType::Sink {
714 need_notify = true;
715 }
716 }
717
718 if is_cancelled {
719 let dropped_tables = Table::find()
720 .find_also_related(Object)
721 .filter(
722 table::Column::TableId.is_in(
723 internal_table_ids
724 .iter()
725 .cloned()
726 .chain(table_obj.iter().map(|t| t.table_id as _)),
727 ),
728 )
729 .all(&txn)
730 .await?
731 .into_iter()
732 .map(|(table, obj)| PbTable::from(ObjectModel(table, obj.unwrap())));
733 inner
734 .dropped_tables
735 .extend(dropped_tables.map(|t| (t.id, t)));
736 }
737
738 if need_notify {
739 if original_obj_type == ObjectType::Sink && original_job_id != job_id {
742 let orig_obj: Option<PartialObject> = Object::find_by_id(original_job_id)
743 .select_only()
744 .columns([
745 object::Column::Oid,
746 object::Column::ObjType,
747 object::Column::SchemaId,
748 object::Column::DatabaseId,
749 ])
750 .into_partial_model()
751 .one(&txn)
752 .await?;
753 if let Some(orig_obj) = orig_obj {
754 objs.push(orig_obj);
755 }
756 }
757
758 let obj: Option<PartialObject> = Object::find_by_id(job_id)
759 .select_only()
760 .columns([
761 object::Column::Oid,
762 object::Column::ObjType,
763 object::Column::SchemaId,
764 object::Column::DatabaseId,
765 ])
766 .into_partial_model()
767 .one(&txn)
768 .await?;
769 let obj =
770 obj.ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;
771 objs.push(obj);
772 let internal_table_objs: Vec<PartialObject> = Object::find()
773 .select_only()
774 .columns([
775 object::Column::Oid,
776 object::Column::ObjType,
777 object::Column::SchemaId,
778 object::Column::DatabaseId,
779 ])
780 .join(JoinType::InnerJoin, object::Relation::Table.def())
781 .filter(table::Column::BelongsToJobId.eq(job_id))
782 .into_partial_model()
783 .all(&txn)
784 .await?;
785 objs.extend(internal_table_objs);
786 }
787
788 if table_obj.is_none()
790 && let Some(Some(target_table_id)) = Sink::find_by_id(job_id.as_sink_id())
791 .select_only()
792 .column(sink::Column::TargetTable)
793 .into_tuple::<Option<TableId>>()
794 .one(&txn)
795 .await?
796 {
797 let tmp_id: Option<ObjectId> = ObjectDependency::find()
798 .select_only()
799 .column(object_dependency::Column::UsedBy)
800 .join(
801 JoinType::InnerJoin,
802 object_dependency::Relation::Object1.def(),
803 )
804 .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
805 .filter(
806 object_dependency::Column::Oid
807 .eq(target_table_id)
808 .and(object::Column::ObjType.eq(ObjectType::Table))
809 .and(streaming_job::Column::JobStatus.ne(JobStatus::Created)),
810 )
811 .into_tuple()
812 .one(&txn)
813 .await?;
814 if let Some(tmp_id) = tmp_id {
815 tracing::warn!(
816 id = %tmp_id,
817 "aborting temp streaming job for sink into table"
818 );
819
820 Object::delete_by_id(tmp_id).exec(&txn).await?;
821 }
822 }
823
824 Object::delete_by_id(job_id).exec(&txn).await?;
825 if !internal_table_ids.is_empty() {
826 Object::delete_many()
827 .filter(object::Column::Oid.is_in(internal_table_ids))
828 .exec(&txn)
829 .await?;
830 }
831 if let Some(t) = &table_obj
832 && let Some(source_id) = t.optional_associated_source_id
833 {
834 Object::delete_by_id(source_id).exec(&txn).await?;
835 }
836
837 let err = if is_cancelled {
838 MetaError::cancelled(format!("streaming job {job_id} is cancelled"))
839 } else {
840 MetaError::catalog_id_not_found("stream job", format!("streaming job {job_id} failed"))
841 };
842 let abort_reason = format!("streaming job aborted {}", err.as_report());
843 for tx in inner
844 .creating_table_finish_notifier
845 .get_mut(&database_id)
846 .map(|creating_tables| creating_tables.remove(&job_id).into_iter())
847 .into_iter()
848 .flatten()
849 .flatten()
850 {
851 let _ = tx.send(Err(abort_reason.clone()));
852 }
853 txn.commit().await?;
854
855 if !objs.is_empty() {
856 self.notify_frontend(Operation::Delete, build_object_group_for_delete(objs))
859 .await;
860 }
861 Ok((true, Some(database_id)))
862 }
863
864 #[await_tree::instrument]
865 pub async fn post_collect_job_fragments(
866 &self,
867 job_id: JobId,
868 upstream_fragment_new_downstreams: &FragmentDownstreamRelation,
869 new_sink_downstream: Option<FragmentDownstreamRelation>,
870 split_assignment: Option<&SplitAssignment>,
871 ) -> MetaResult<()> {
872 let inner = self.inner.write().await;
873 let txn = inner.db.begin().await?;
874
875 insert_fragment_relations(&txn, upstream_fragment_new_downstreams).await?;
876
877 if let Some(new_downstream) = new_sink_downstream {
878 insert_fragment_relations(&txn, &new_downstream).await?;
879 }
880
881 streaming_job::ActiveModel {
883 job_id: Set(job_id),
884 job_status: Set(JobStatus::Creating),
885 ..Default::default()
886 }
887 .update(&txn)
888 .await?;
889
890 if let Some(split_assignment) = split_assignment {
891 let fragment_splits = split_assignment
892 .iter()
893 .map(|(fragment_id, splits)| {
894 (
895 *fragment_id as _,
896 splits.values().flatten().cloned().collect_vec(),
897 )
898 })
899 .collect();
900
901 self.update_fragment_splits(&txn, &fragment_splits).await?;
902 }
903
904 txn.commit().await?;
905
906 Ok(())
907 }
908
909 pub async fn create_job_catalog_for_replace(
910 &self,
911 streaming_job: &StreamingJob,
912 ctx: Option<&StreamContext>,
913 specified_parallelism: Option<&NonZeroUsize>,
914 expected_original_max_parallelism: Option<usize>,
915 ) -> MetaResult<JobId> {
916 let id = streaming_job.id();
917 let inner = self.inner.write().await;
918 let txn = inner.db.begin().await?;
919
920 streaming_job.verify_version_for_replace(&txn).await?;
922 let referring_cnt = ObjectDependency::find()
924 .join(
925 JoinType::InnerJoin,
926 object_dependency::Relation::Object1.def(),
927 )
928 .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
929 .filter(
930 object_dependency::Column::Oid
931 .eq(id)
932 .and(object::Column::ObjType.eq(ObjectType::Table))
933 .and(streaming_job::Column::JobStatus.ne(JobStatus::Created)),
934 )
935 .count(&txn)
936 .await?;
937 if referring_cnt != 0 {
938 return Err(MetaError::permission_denied(
939 "job is being altered or referenced by some creating jobs",
940 ));
941 }
942
943 let (original_max_parallelism, original_timezone, original_config_override): (
945 i32,
946 Option<String>,
947 Option<String>,
948 ) = StreamingJobModel::find_by_id(id)
949 .select_only()
950 .column(streaming_job::Column::MaxParallelism)
951 .column(streaming_job::Column::Timezone)
952 .column(streaming_job::Column::ConfigOverride)
953 .into_tuple()
954 .one(&txn)
955 .await?
956 .ok_or_else(|| MetaError::catalog_id_not_found(streaming_job.job_type_str(), id))?;
957
958 if let Some(max_parallelism) = expected_original_max_parallelism
959 && original_max_parallelism != max_parallelism as i32
960 {
961 bail!(
964 "cannot use a different max parallelism \
965 when replacing streaming job, \
966 original: {}, new: {}",
967 original_max_parallelism,
968 max_parallelism
969 );
970 }
971
972 let parallelism = match specified_parallelism {
973 None => StreamingParallelism::Adaptive,
974 Some(n) => StreamingParallelism::Fixed(n.get() as _),
975 };
976
977 let ctx = StreamContext {
978 timezone: ctx
979 .map(|ctx| ctx.timezone.clone())
980 .unwrap_or(original_timezone),
981 config_override: original_config_override.unwrap_or_default().into(),
984 };
985
986 let new_obj_id = Self::create_streaming_job_obj(
988 &txn,
989 streaming_job.object_type(),
990 streaming_job.owner() as _,
991 Some(streaming_job.database_id() as _),
992 Some(streaming_job.schema_id() as _),
993 streaming_job.create_type(),
994 ctx,
995 parallelism,
996 original_max_parallelism as _,
997 None,
998 None,
999 )
1000 .await?;
1001
1002 ObjectDependency::insert(object_dependency::ActiveModel {
1004 oid: Set(id.as_object_id()),
1005 used_by: Set(new_obj_id.as_object_id()),
1006 ..Default::default()
1007 })
1008 .exec(&txn)
1009 .await?;
1010
1011 txn.commit().await?;
1012
1013 Ok(new_obj_id)
1014 }
1015
1016 pub async fn finish_streaming_job(&self, job_id: JobId) -> MetaResult<()> {
1018 let mut inner = self.inner.write().await;
1019 let txn = inner.db.begin().await?;
1020
1021 if check_if_belongs_to_iceberg_table(&txn, job_id).await? {
1023 tracing::info!(
1024 "streaming job {} is for iceberg table, wait for manual finish operation",
1025 job_id
1026 );
1027 return Ok(());
1028 }
1029
1030 let (notification_op, objects, updated_user_info) =
1031 Self::finish_streaming_job_inner(&txn, job_id).await?;
1032
1033 txn.commit().await?;
1034
1035 let mut version = self
1036 .notify_frontend(
1037 notification_op,
1038 NotificationInfo::ObjectGroup(PbObjectGroup { objects }),
1039 )
1040 .await;
1041
1042 if !updated_user_info.is_empty() {
1044 version = self.notify_users_update(updated_user_info).await;
1045 }
1046
1047 inner
1048 .creating_table_finish_notifier
1049 .values_mut()
1050 .for_each(|creating_tables| {
1051 if let Some(txs) = creating_tables.remove(&job_id) {
1052 for tx in txs {
1053 let _ = tx.send(Ok(version));
1054 }
1055 }
1056 });
1057
1058 Ok(())
1059 }
1060
1061 pub async fn finish_streaming_job_inner(
1063 txn: &DatabaseTransaction,
1064 job_id: JobId,
1065 ) -> MetaResult<(Operation, Vec<risingwave_pb::meta::Object>, Vec<PbUserInfo>)> {
1066 let job_type = Object::find_by_id(job_id)
1067 .select_only()
1068 .column(object::Column::ObjType)
1069 .into_tuple()
1070 .one(txn)
1071 .await?
1072 .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;
1073
1074 let create_type: CreateType = StreamingJobModel::find_by_id(job_id)
1075 .select_only()
1076 .column(streaming_job::Column::CreateType)
1077 .into_tuple()
1078 .one(txn)
1079 .await?
1080 .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;
1081
1082 let res = Object::update_many()
1084 .col_expr(object::Column::CreatedAt, Expr::current_timestamp().into())
1085 .col_expr(
1086 object::Column::CreatedAtClusterVersion,
1087 current_cluster_version().into(),
1088 )
1089 .filter(object::Column::Oid.eq(job_id))
1090 .exec(txn)
1091 .await?;
1092 if res.rows_affected == 0 {
1093 return Err(MetaError::catalog_id_not_found("streaming job", job_id));
1094 }
1095
1096 let job = streaming_job::ActiveModel {
1098 job_id: Set(job_id),
1099 job_status: Set(JobStatus::Created),
1100 ..Default::default()
1101 };
1102 job.update(txn).await?;
1103
1104 let internal_table_objs = Table::find()
1106 .find_also_related(Object)
1107 .filter(table::Column::BelongsToJobId.eq(job_id))
1108 .all(txn)
1109 .await?;
1110 let mut objects = internal_table_objs
1111 .iter()
1112 .map(|(table, obj)| PbObject {
1113 object_info: Some(PbObjectInfo::Table(
1114 ObjectModel(table.clone(), obj.clone().unwrap()).into(),
1115 )),
1116 })
1117 .collect_vec();
1118 let mut notification_op = if create_type == CreateType::Background {
1119 NotificationOperation::Update
1120 } else {
1121 NotificationOperation::Add
1122 };
1123 let mut updated_user_info = vec![];
1124 let mut need_grant_default_privileges = true;
1125
1126 match job_type {
1127 ObjectType::Table => {
1128 let (table, obj) = Table::find_by_id(job_id.as_mv_table_id())
1129 .find_also_related(Object)
1130 .one(txn)
1131 .await?
1132 .ok_or_else(|| MetaError::catalog_id_not_found("table", job_id))?;
1133 if table.table_type == TableType::MaterializedView {
1134 notification_op = NotificationOperation::Update;
1135 }
1136
1137 if let Some(source_id) = table.optional_associated_source_id {
1138 let (src, obj) = Source::find_by_id(source_id)
1139 .find_also_related(Object)
1140 .one(txn)
1141 .await?
1142 .ok_or_else(|| MetaError::catalog_id_not_found("source", source_id))?;
1143 objects.push(PbObject {
1144 object_info: Some(PbObjectInfo::Source(
1145 ObjectModel(src, obj.unwrap()).into(),
1146 )),
1147 });
1148 }
1149 objects.push(PbObject {
1150 object_info: Some(PbObjectInfo::Table(ObjectModel(table, obj.unwrap()).into())),
1151 });
1152 }
1153 ObjectType::Sink => {
1154 let (sink, obj) = Sink::find_by_id(job_id.as_sink_id())
1155 .find_also_related(Object)
1156 .one(txn)
1157 .await?
1158 .ok_or_else(|| MetaError::catalog_id_not_found("sink", job_id))?;
1159 if sink.name.starts_with(ICEBERG_SINK_PREFIX) {
1160 need_grant_default_privileges = false;
1161 }
1162 notification_op = NotificationOperation::Update;
1165 objects.push(PbObject {
1166 object_info: Some(PbObjectInfo::Sink(ObjectModel(sink, obj.unwrap()).into())),
1167 });
1168 }
1169 ObjectType::Index => {
1170 need_grant_default_privileges = false;
1171 let (index, obj) = Index::find_by_id(job_id.as_index_id())
1172 .find_also_related(Object)
1173 .one(txn)
1174 .await?
1175 .ok_or_else(|| MetaError::catalog_id_not_found("index", job_id))?;
1176 {
1177 let (table, obj) = Table::find_by_id(index.index_table_id)
1178 .find_also_related(Object)
1179 .one(txn)
1180 .await?
1181 .ok_or_else(|| {
1182 MetaError::catalog_id_not_found("table", index.index_table_id)
1183 })?;
1184 objects.push(PbObject {
1185 object_info: Some(PbObjectInfo::Table(
1186 ObjectModel(table, obj.unwrap()).into(),
1187 )),
1188 });
1189 }
1190
1191 let primary_table_privileges = UserPrivilege::find()
1194 .filter(
1195 user_privilege::Column::Oid
1196 .eq(index.primary_table_id)
1197 .and(user_privilege::Column::Action.eq(Action::Select)),
1198 )
1199 .all(txn)
1200 .await?;
1201 if !primary_table_privileges.is_empty() {
1202 let index_state_table_ids: Vec<TableId> = Table::find()
1203 .select_only()
1204 .column(table::Column::TableId)
1205 .filter(
1206 table::Column::BelongsToJobId
1207 .eq(job_id)
1208 .or(table::Column::TableId.eq(index.index_table_id)),
1209 )
1210 .into_tuple()
1211 .all(txn)
1212 .await?;
1213 let mut new_privileges = vec![];
1214 for privilege in &primary_table_privileges {
1215 for state_table_id in &index_state_table_ids {
1216 new_privileges.push(user_privilege::ActiveModel {
1217 id: Default::default(),
1218 oid: Set(state_table_id.as_object_id()),
1219 user_id: Set(privilege.user_id),
1220 action: Set(Action::Select),
1221 dependent_id: Set(privilege.dependent_id),
1222 granted_by: Set(privilege.granted_by),
1223 with_grant_option: Set(privilege.with_grant_option),
1224 });
1225 }
1226 }
1227 UserPrivilege::insert_many(new_privileges).exec(txn).await?;
1228
1229 updated_user_info = list_user_info_by_ids(
1230 primary_table_privileges.into_iter().map(|p| p.user_id),
1231 txn,
1232 )
1233 .await?;
1234 }
1235
1236 objects.push(PbObject {
1237 object_info: Some(PbObjectInfo::Index(ObjectModel(index, obj.unwrap()).into())),
1238 });
1239 }
1240 ObjectType::Source => {
1241 let (source, obj) = Source::find_by_id(job_id.as_shared_source_id())
1242 .find_also_related(Object)
1243 .one(txn)
1244 .await?
1245 .ok_or_else(|| MetaError::catalog_id_not_found("source", job_id))?;
1246 objects.push(PbObject {
1247 object_info: Some(PbObjectInfo::Source(
1248 ObjectModel(source, obj.unwrap()).into(),
1249 )),
1250 });
1251 }
1252 _ => unreachable!("invalid job type: {:?}", job_type),
1253 }
1254
1255 if need_grant_default_privileges {
1256 updated_user_info = grant_default_privileges_automatically(txn, job_id).await?;
1257 }
1258
1259 Ok((notification_op, objects, updated_user_info))
1260 }
1261
1262 pub async fn finish_replace_streaming_job(
1263 &self,
1264 tmp_id: JobId,
1265 streaming_job: StreamingJob,
1266 replace_upstream: FragmentReplaceUpstream,
1267 sink_into_table_context: SinkIntoTableContext,
1268 drop_table_connector_ctx: Option<&DropTableConnectorContext>,
1269 auto_refresh_schema_sinks: Option<Vec<FinishAutoRefreshSchemaSinkContext>>,
1270 ) -> MetaResult<NotificationVersion> {
1271 let inner = self.inner.write().await;
1272 let txn = inner.db.begin().await?;
1273
1274 let (objects, delete_notification_objs) = Self::finish_replace_streaming_job_inner(
1275 tmp_id,
1276 replace_upstream,
1277 sink_into_table_context,
1278 &txn,
1279 streaming_job,
1280 drop_table_connector_ctx,
1281 auto_refresh_schema_sinks,
1282 )
1283 .await?;
1284
1285 txn.commit().await?;
1286
1287 let mut version = self
1288 .notify_frontend(
1289 NotificationOperation::Update,
1290 NotificationInfo::ObjectGroup(PbObjectGroup { objects }),
1291 )
1292 .await;
1293
1294 if let Some((user_infos, to_drop_objects)) = delete_notification_objs {
1295 self.notify_users_update(user_infos).await;
1296 version = self
1297 .notify_frontend(
1298 NotificationOperation::Delete,
1299 build_object_group_for_delete(to_drop_objects),
1300 )
1301 .await;
1302 }
1303
1304 Ok(version)
1305 }
1306
1307 pub async fn finish_replace_streaming_job_inner(
1308 tmp_id: JobId,
1309 replace_upstream: FragmentReplaceUpstream,
1310 SinkIntoTableContext {
1311 updated_sink_catalogs,
1312 }: SinkIntoTableContext,
1313 txn: &DatabaseTransaction,
1314 streaming_job: StreamingJob,
1315 drop_table_connector_ctx: Option<&DropTableConnectorContext>,
1316 auto_refresh_schema_sinks: Option<Vec<FinishAutoRefreshSchemaSinkContext>>,
1317 ) -> MetaResult<(Vec<PbObject>, Option<(Vec<PbUserInfo>, Vec<PartialObject>)>)> {
1318 let original_job_id = streaming_job.id();
1319 let job_type = streaming_job.job_type();
1320
1321 let mut index_item_rewriter = None;
1322
1323 match streaming_job {
1325 StreamingJob::Table(_source, table, _table_job_type) => {
1326 let original_column_catalogs =
1329 get_table_columns(txn, original_job_id.as_mv_table_id()).await?;
1330
1331 index_item_rewriter = Some({
1332 let original_columns = original_column_catalogs
1333 .to_protobuf()
1334 .into_iter()
1335 .map(|c| c.column_desc.unwrap())
1336 .collect_vec();
1337 let new_columns = table
1338 .columns
1339 .iter()
1340 .map(|c| c.column_desc.clone().unwrap())
1341 .collect_vec();
1342
1343 IndexItemRewriter {
1344 original_columns,
1345 new_columns,
1346 }
1347 });
1348
1349 for sink_id in updated_sink_catalogs {
1351 sink::ActiveModel {
1352 sink_id: Set(sink_id as _),
1353 original_target_columns: Set(Some(original_column_catalogs.clone())),
1354 ..Default::default()
1355 }
1356 .update(txn)
1357 .await?;
1358 }
1359 let mut table = table::ActiveModel::from(table);
1361 if let Some(drop_table_connector_ctx) = drop_table_connector_ctx
1362 && drop_table_connector_ctx.to_change_streaming_job_id == original_job_id
1363 {
1364 table.optional_associated_source_id = Set(None);
1366 }
1367
1368 table.update(txn).await?;
1369 }
1370 StreamingJob::Source(source) => {
1371 let source = source::ActiveModel::from(source);
1373 source.update(txn).await?;
1374 }
1375 StreamingJob::MaterializedView(table) => {
1376 let table = table::ActiveModel::from(table);
1378 table.update(txn).await?;
1379 }
1380 _ => unreachable!(
1381 "invalid streaming job type: {:?}",
1382 streaming_job.job_type_str()
1383 ),
1384 }
1385
1386 async fn finish_fragments(
1387 txn: &DatabaseTransaction,
1388 tmp_id: JobId,
1389 original_job_id: JobId,
1390 replace_upstream: FragmentReplaceUpstream,
1391 ) -> MetaResult<()> {
1392 let fragment_info: Vec<(FragmentId, I32Array)> = Fragment::find()
1396 .select_only()
1397 .columns([
1398 fragment::Column::FragmentId,
1399 fragment::Column::StateTableIds,
1400 ])
1401 .filter(fragment::Column::JobId.eq(tmp_id))
1402 .into_tuple()
1403 .all(txn)
1404 .await?;
1405 for (fragment_id, state_table_ids) in fragment_info {
1406 for state_table_id in state_table_ids.into_inner() {
1407 let state_table_id = TableId::new(state_table_id as _);
1408 table::ActiveModel {
1409 table_id: Set(state_table_id),
1410 fragment_id: Set(Some(fragment_id)),
1411 ..Default::default()
1413 }
1414 .update(txn)
1415 .await?;
1416 }
1417 }
1418
1419 Fragment::delete_many()
1421 .filter(fragment::Column::JobId.eq(original_job_id))
1422 .exec(txn)
1423 .await?;
1424 Fragment::update_many()
1425 .col_expr(fragment::Column::JobId, SimpleExpr::from(original_job_id))
1426 .filter(fragment::Column::JobId.eq(tmp_id))
1427 .exec(txn)
1428 .await?;
1429
1430 for (fragment_id, fragment_replace_map) in replace_upstream {
1433 let (fragment_id, mut stream_node) =
1434 Fragment::find_by_id(fragment_id as FragmentId)
1435 .select_only()
1436 .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
1437 .into_tuple::<(FragmentId, StreamNode)>()
1438 .one(txn)
1439 .await?
1440 .map(|(id, node)| (id, node.to_protobuf()))
1441 .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
1442
1443 visit_stream_node_mut(&mut stream_node, |body| {
1444 if let PbNodeBody::Merge(m) = body
1445 && let Some(new_fragment_id) =
1446 fragment_replace_map.get(&m.upstream_fragment_id)
1447 {
1448 m.upstream_fragment_id = *new_fragment_id;
1449 }
1450 });
1451 fragment::ActiveModel {
1452 fragment_id: Set(fragment_id),
1453 stream_node: Set(StreamNode::from(&stream_node)),
1454 ..Default::default()
1455 }
1456 .update(txn)
1457 .await?;
1458 }
1459
1460 Object::delete_by_id(tmp_id).exec(txn).await?;
1462
1463 Ok(())
1464 }
1465
1466 finish_fragments(txn, tmp_id, original_job_id, replace_upstream).await?;
1467
1468 let mut objects = vec![];
1470 match job_type {
1471 StreamingJobType::Table(_) | StreamingJobType::MaterializedView => {
1472 let (table, table_obj) = Table::find_by_id(original_job_id.as_mv_table_id())
1473 .find_also_related(Object)
1474 .one(txn)
1475 .await?
1476 .ok_or_else(|| MetaError::catalog_id_not_found("object", original_job_id))?;
1477 objects.push(PbObject {
1478 object_info: Some(PbObjectInfo::Table(
1479 ObjectModel(table, table_obj.unwrap()).into(),
1480 )),
1481 })
1482 }
1483 StreamingJobType::Source => {
1484 let (source, source_obj) =
1485 Source::find_by_id(original_job_id.as_shared_source_id())
1486 .find_also_related(Object)
1487 .one(txn)
1488 .await?
1489 .ok_or_else(|| {
1490 MetaError::catalog_id_not_found("object", original_job_id)
1491 })?;
1492 objects.push(PbObject {
1493 object_info: Some(PbObjectInfo::Source(
1494 ObjectModel(source, source_obj.unwrap()).into(),
1495 )),
1496 })
1497 }
1498 _ => unreachable!("invalid streaming job type for replace: {:?}", job_type),
1499 }
1500
1501 if let Some(expr_rewriter) = index_item_rewriter {
1502 let index_items: Vec<(IndexId, ExprNodeArray)> = Index::find()
1503 .select_only()
1504 .columns([index::Column::IndexId, index::Column::IndexItems])
1505 .filter(index::Column::PrimaryTableId.eq(original_job_id))
1506 .into_tuple()
1507 .all(txn)
1508 .await?;
1509 for (index_id, nodes) in index_items {
1510 let mut pb_nodes = nodes.to_protobuf();
1511 pb_nodes
1512 .iter_mut()
1513 .for_each(|x| expr_rewriter.rewrite_expr(x));
1514 let index = index::ActiveModel {
1515 index_id: Set(index_id),
1516 index_items: Set(pb_nodes.into()),
1517 ..Default::default()
1518 }
1519 .update(txn)
1520 .await?;
1521 let index_obj = index
1522 .find_related(Object)
1523 .one(txn)
1524 .await?
1525 .ok_or_else(|| MetaError::catalog_id_not_found("object", index.index_id))?;
1526 objects.push(PbObject {
1527 object_info: Some(PbObjectInfo::Index(ObjectModel(index, index_obj).into())),
1528 });
1529 }
1530 }
1531
1532 if let Some(sinks) = auto_refresh_schema_sinks {
1533 for finish_sink_context in sinks {
1534 finish_fragments(
1535 txn,
1536 finish_sink_context.tmp_sink_id.as_job_id(),
1537 finish_sink_context.original_sink_id.as_job_id(),
1538 Default::default(),
1539 )
1540 .await?;
1541 let (mut sink, sink_obj) = Sink::find_by_id(finish_sink_context.original_sink_id)
1542 .find_also_related(Object)
1543 .one(txn)
1544 .await?
1545 .ok_or_else(|| MetaError::catalog_id_not_found("sink", original_job_id))?;
1546 let columns = ColumnCatalogArray::from(finish_sink_context.columns);
1547 Sink::update(sink::ActiveModel {
1548 sink_id: Set(finish_sink_context.original_sink_id),
1549 columns: Set(columns.clone()),
1550 ..Default::default()
1551 })
1552 .exec(txn)
1553 .await?;
1554 sink.columns = columns;
1555 objects.push(PbObject {
1556 object_info: Some(PbObjectInfo::Sink(
1557 ObjectModel(sink, sink_obj.unwrap()).into(),
1558 )),
1559 });
1560 if let Some((log_store_table_id, new_log_store_table_columns)) =
1561 finish_sink_context.new_log_store_table
1562 {
1563 let new_log_store_table_columns: ColumnCatalogArray =
1564 new_log_store_table_columns.into();
1565 let (mut table, table_obj) = Table::find_by_id(log_store_table_id)
1566 .find_also_related(Object)
1567 .one(txn)
1568 .await?
1569 .ok_or_else(|| MetaError::catalog_id_not_found("table", original_job_id))?;
1570 Table::update(table::ActiveModel {
1571 table_id: Set(log_store_table_id),
1572 columns: Set(new_log_store_table_columns.clone()),
1573 ..Default::default()
1574 })
1575 .exec(txn)
1576 .await?;
1577 table.columns = new_log_store_table_columns;
1578 objects.push(PbObject {
1579 object_info: Some(PbObjectInfo::Table(
1580 ObjectModel(table, table_obj.unwrap()).into(),
1581 )),
1582 });
1583 }
1584 }
1585 }
1586
1587 let mut notification_objs: Option<(Vec<PbUserInfo>, Vec<PartialObject>)> = None;
1588 if let Some(drop_table_connector_ctx) = drop_table_connector_ctx {
1589 notification_objs =
1590 Some(Self::drop_table_associated_source(txn, drop_table_connector_ctx).await?);
1591 }
1592
1593 Ok((objects, notification_objs))
1594 }
1595
1596 pub async fn try_abort_replacing_streaming_job(
1598 &self,
1599 tmp_job_id: JobId,
1600 tmp_sink_ids: Option<Vec<ObjectId>>,
1601 ) -> MetaResult<()> {
1602 let inner = self.inner.write().await;
1603 let txn = inner.db.begin().await?;
1604 Object::delete_by_id(tmp_job_id).exec(&txn).await?;
1605 if let Some(tmp_sink_ids) = tmp_sink_ids {
1606 for tmp_sink_id in tmp_sink_ids {
1607 Object::delete_by_id(tmp_sink_id).exec(&txn).await?;
1608 }
1609 }
1610 txn.commit().await?;
1611 Ok(())
1612 }
1613
1614 pub async fn update_source_rate_limit_by_source_id(
1617 &self,
1618 source_id: SourceId,
1619 rate_limit: Option<u32>,
1620 ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1621 let inner = self.inner.read().await;
1622 let txn = inner.db.begin().await?;
1623
1624 {
1625 let active_source = source::ActiveModel {
1626 source_id: Set(source_id),
1627 rate_limit: Set(rate_limit.map(|v| v as i32)),
1628 ..Default::default()
1629 };
1630 active_source.update(&txn).await?;
1631 }
1632
1633 let (source, obj) = Source::find_by_id(source_id)
1634 .find_also_related(Object)
1635 .one(&txn)
1636 .await?
1637 .ok_or_else(|| {
1638 MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
1639 })?;
1640
1641 let is_fs_source = source.with_properties.inner_ref().is_new_fs_connector();
1642 let streaming_job_ids: Vec<JobId> =
1643 if let Some(table_id) = source.optional_associated_table_id {
1644 vec![table_id.as_job_id()]
1645 } else if let Some(source_info) = &source.source_info
1646 && source_info.to_protobuf().is_shared()
1647 {
1648 vec![source_id.as_share_source_job_id()]
1649 } else {
1650 ObjectDependency::find()
1651 .select_only()
1652 .column(object_dependency::Column::UsedBy)
1653 .filter(object_dependency::Column::Oid.eq(source_id))
1654 .into_tuple()
1655 .all(&txn)
1656 .await?
1657 };
1658
1659 if streaming_job_ids.is_empty() {
1660 return Err(MetaError::invalid_parameter(format!(
1661 "source id {source_id} not used by any streaming job"
1662 )));
1663 }
1664
1665 let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
1666 .select_only()
1667 .columns([
1668 fragment::Column::FragmentId,
1669 fragment::Column::FragmentTypeMask,
1670 fragment::Column::StreamNode,
1671 ])
1672 .filter(fragment::Column::JobId.is_in(streaming_job_ids))
1673 .into_tuple()
1674 .all(&txn)
1675 .await?;
1676 let mut fragments = fragments
1677 .into_iter()
1678 .map(|(id, mask, stream_node)| {
1679 (
1680 id,
1681 FragmentTypeMask::from(mask as u32),
1682 stream_node.to_protobuf(),
1683 )
1684 })
1685 .collect_vec();
1686
1687 fragments.retain_mut(|(_, fragment_type_mask, stream_node)| {
1688 let mut found = false;
1689 if fragment_type_mask.contains(FragmentTypeFlag::Source) {
1690 visit_stream_node_mut(stream_node, |node| {
1691 if let PbNodeBody::Source(node) = node
1692 && let Some(node_inner) = &mut node.source_inner
1693 && node_inner.source_id == source_id
1694 {
1695 node_inner.rate_limit = rate_limit;
1696 found = true;
1697 }
1698 });
1699 }
1700 if is_fs_source {
1701 visit_stream_node_mut(stream_node, |node| {
1704 if let PbNodeBody::StreamFsFetch(node) = node {
1705 fragment_type_mask.add(FragmentTypeFlag::FsFetch);
1706 if let Some(node_inner) = &mut node.node_inner
1707 && node_inner.source_id == source_id
1708 {
1709 node_inner.rate_limit = rate_limit;
1710 found = true;
1711 }
1712 }
1713 });
1714 }
1715 found
1716 });
1717
1718 assert!(
1719 !fragments.is_empty(),
1720 "source id should be used by at least one fragment"
1721 );
1722 let fragment_ids = fragments.iter().map(|(id, _, _)| *id).collect_vec();
1723
1724 for (id, fragment_type_mask, stream_node) in fragments {
1725 fragment::ActiveModel {
1726 fragment_id: Set(id),
1727 fragment_type_mask: Set(fragment_type_mask.into()),
1728 stream_node: Set(StreamNode::from(&stream_node)),
1729 ..Default::default()
1730 }
1731 .update(&txn)
1732 .await?;
1733 }
1734
1735 txn.commit().await?;
1736
1737 let fragment_actors = self.get_fragment_actors_from_running_info(fragment_ids.into_iter());
1738
1739 let relation_info = PbObjectInfo::Source(ObjectModel(source, obj.unwrap()).into());
1740 let _version = self
1741 .notify_frontend(
1742 NotificationOperation::Update,
1743 NotificationInfo::ObjectGroup(PbObjectGroup {
1744 objects: vec![PbObject {
1745 object_info: Some(relation_info),
1746 }],
1747 }),
1748 )
1749 .await;
1750
1751 Ok(fragment_actors)
1752 }
1753
1754 fn get_fragment_actors_from_running_info(
1755 &self,
1756 fragment_ids: impl Iterator<Item = FragmentId>,
1757 ) -> HashMap<FragmentId, Vec<ActorId>> {
1758 let mut fragment_actors: HashMap<FragmentId, Vec<ActorId>> = HashMap::new();
1759
1760 let info = self.env.shared_actor_infos().read_guard();
1761
1762 for fragment_id in fragment_ids {
1763 let SharedFragmentInfo { actors, .. } = info.get_fragment(fragment_id).unwrap();
1764 fragment_actors
1765 .entry(fragment_id as _)
1766 .or_default()
1767 .extend(actors.keys().copied());
1768 }
1769
1770 fragment_actors
1771 }
1772
1773 pub async fn mutate_fragments_by_job_id(
1776 &self,
1777 job_id: JobId,
1778 mut fragments_mutation_fn: impl FnMut(FragmentTypeMask, &mut PbStreamNode) -> MetaResult<bool>,
1780 err_msg: &'static str,
1782 ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1783 let inner = self.inner.read().await;
1784 let txn = inner.db.begin().await?;
1785
1786 let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
1787 .select_only()
1788 .columns([
1789 fragment::Column::FragmentId,
1790 fragment::Column::FragmentTypeMask,
1791 fragment::Column::StreamNode,
1792 ])
1793 .filter(fragment::Column::JobId.eq(job_id))
1794 .into_tuple()
1795 .all(&txn)
1796 .await?;
1797 let mut fragments = fragments
1798 .into_iter()
1799 .map(|(id, mask, stream_node)| {
1800 (id, FragmentTypeMask::from(mask), stream_node.to_protobuf())
1801 })
1802 .collect_vec();
1803
1804 let fragments = fragments
1805 .iter_mut()
1806 .map(|(_, fragment_type_mask, stream_node)| {
1807 fragments_mutation_fn(*fragment_type_mask, stream_node)
1808 })
1809 .collect::<MetaResult<Vec<bool>>>()?
1810 .into_iter()
1811 .zip_eq_debug(std::mem::take(&mut fragments))
1812 .filter_map(|(keep, fragment)| if keep { Some(fragment) } else { None })
1813 .collect::<Vec<_>>();
1814
1815 if fragments.is_empty() {
1816 return Err(MetaError::invalid_parameter(format!(
1817 "job id {job_id}: {}",
1818 err_msg
1819 )));
1820 }
1821
1822 let fragment_ids: HashSet<FragmentId> = fragments.iter().map(|(id, _, _)| *id).collect();
1823 for (id, _, stream_node) in fragments {
1824 fragment::ActiveModel {
1825 fragment_id: Set(id),
1826 stream_node: Set(StreamNode::from(&stream_node)),
1827 ..Default::default()
1828 }
1829 .update(&txn)
1830 .await?;
1831 }
1832
1833 txn.commit().await?;
1834
1835 let fragment_actors =
1836 self.get_fragment_actors_from_running_info(fragment_ids.iter().copied());
1837
1838 Ok(fragment_actors)
1839 }
1840
1841 async fn mutate_fragment_by_fragment_id(
1842 &self,
1843 fragment_id: FragmentId,
1844 mut fragment_mutation_fn: impl FnMut(FragmentTypeMask, &mut PbStreamNode) -> bool,
1845 err_msg: &'static str,
1846 ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1847 let inner = self.inner.read().await;
1848 let txn = inner.db.begin().await?;
1849
1850 let (fragment_type_mask, stream_node): (i32, StreamNode) =
1851 Fragment::find_by_id(fragment_id)
1852 .select_only()
1853 .columns([
1854 fragment::Column::FragmentTypeMask,
1855 fragment::Column::StreamNode,
1856 ])
1857 .into_tuple()
1858 .one(&txn)
1859 .await?
1860 .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
1861 let mut pb_stream_node = stream_node.to_protobuf();
1862 let fragment_type_mask = FragmentTypeMask::from(fragment_type_mask);
1863
1864 if !fragment_mutation_fn(fragment_type_mask, &mut pb_stream_node) {
1865 return Err(MetaError::invalid_parameter(format!(
1866 "fragment id {fragment_id}: {}",
1867 err_msg
1868 )));
1869 }
1870
1871 fragment::ActiveModel {
1872 fragment_id: Set(fragment_id),
1873 stream_node: Set(stream_node),
1874 ..Default::default()
1875 }
1876 .update(&txn)
1877 .await?;
1878
1879 let fragment_actors =
1880 self.get_fragment_actors_from_running_info(std::iter::once(fragment_id));
1881
1882 txn.commit().await?;
1883
1884 Ok(fragment_actors)
1885 }
1886
1887 pub async fn update_backfill_rate_limit_by_job_id(
1890 &self,
1891 job_id: JobId,
1892 rate_limit: Option<u32>,
1893 ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1894 let update_backfill_rate_limit =
1895 |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
1896 let mut found = false;
1897 if fragment_type_mask
1898 .contains_any(FragmentTypeFlag::backfill_rate_limit_fragments())
1899 {
1900 visit_stream_node_mut(stream_node, |node| match node {
1901 PbNodeBody::StreamCdcScan(node) => {
1902 node.rate_limit = rate_limit;
1903 found = true;
1904 }
1905 PbNodeBody::StreamScan(node) => {
1906 node.rate_limit = rate_limit;
1907 found = true;
1908 }
1909 PbNodeBody::SourceBackfill(node) => {
1910 node.rate_limit = rate_limit;
1911 found = true;
1912 }
1913 PbNodeBody::Sink(node) => {
1914 node.rate_limit = rate_limit;
1915 found = true;
1916 }
1917 _ => {}
1918 });
1919 }
1920 Ok(found)
1921 };
1922
1923 self.mutate_fragments_by_job_id(
1924 job_id,
1925 update_backfill_rate_limit,
1926 "stream scan node or source node not found",
1927 )
1928 .await
1929 }
1930
1931 pub async fn update_sink_rate_limit_by_job_id(
1934 &self,
1935 sink_id: SinkId,
1936 rate_limit: Option<u32>,
1937 ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1938 let update_sink_rate_limit =
1939 |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
1940 let mut found = Ok(false);
1941 if fragment_type_mask.contains_any(FragmentTypeFlag::sink_rate_limit_fragments()) {
1942 visit_stream_node_mut(stream_node, |node| {
1943 if let PbNodeBody::Sink(node) = node {
1944 if node.log_store_type != PbSinkLogStoreType::KvLogStore as i32 {
1945 found = Err(MetaError::invalid_parameter(
1946 "sink rate limit is only supported for kv log store, please SET sink_decouple = TRUE before CREATE SINK",
1947 ));
1948 return;
1949 }
1950 node.rate_limit = rate_limit;
1951 found = Ok(true);
1952 }
1953 });
1954 }
1955 found
1956 };
1957
1958 self.mutate_fragments_by_job_id(
1959 sink_id.as_job_id(),
1960 update_sink_rate_limit,
1961 "sink node not found",
1962 )
1963 .await
1964 }
1965
1966 pub async fn update_dml_rate_limit_by_job_id(
1967 &self,
1968 job_id: JobId,
1969 rate_limit: Option<u32>,
1970 ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1971 let update_dml_rate_limit =
1972 |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
1973 let mut found = false;
1974 if fragment_type_mask.contains_any(FragmentTypeFlag::dml_rate_limit_fragments()) {
1975 visit_stream_node_mut(stream_node, |node| {
1976 if let PbNodeBody::Dml(node) = node {
1977 node.rate_limit = rate_limit;
1978 found = true;
1979 }
1980 });
1981 }
1982 Ok(found)
1983 };
1984
1985 self.mutate_fragments_by_job_id(job_id, update_dml_rate_limit, "dml node not found")
1986 .await
1987 }
1988
1989 pub async fn update_source_props_by_source_id(
1990 &self,
1991 source_id: SourceId,
1992 alter_props: BTreeMap<String, String>,
1993 alter_secret_refs: BTreeMap<String, PbSecretRef>,
1994 ) -> MetaResult<WithOptionsSecResolved> {
1995 let inner = self.inner.read().await;
1996 let txn = inner.db.begin().await?;
1997
1998 let (source, _obj) = Source::find_by_id(source_id)
1999 .find_also_related(Object)
2000 .one(&txn)
2001 .await?
2002 .ok_or_else(|| {
2003 MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
2004 })?;
2005 let connector = source.with_properties.0.get_connector().unwrap();
2006 let is_shared_source = source.is_shared();
2007
2008 let mut dep_source_job_ids: Vec<JobId> = Vec::new();
2009 if !is_shared_source {
2010 dep_source_job_ids = ObjectDependency::find()
2012 .select_only()
2013 .column(object_dependency::Column::UsedBy)
2014 .filter(object_dependency::Column::Oid.eq(source_id))
2015 .into_tuple()
2016 .all(&txn)
2017 .await?;
2018 }
2019
2020 let prop_keys: Vec<String> = alter_props
2022 .keys()
2023 .chain(alter_secret_refs.keys())
2024 .cloned()
2025 .collect();
2026 risingwave_connector::allow_alter_on_fly_fields::check_source_allow_alter_on_fly_fields(
2027 &connector, &prop_keys,
2028 )?;
2029
2030 let mut options_with_secret = WithOptionsSecResolved::new(
2031 source.with_properties.0.clone(),
2032 source
2033 .secret_ref
2034 .map(|secret_ref| secret_ref.to_protobuf())
2035 .unwrap_or_default(),
2036 );
2037 let (to_add_secret_dep, to_remove_secret_dep) =
2038 options_with_secret.handle_update(alter_props, alter_secret_refs)?;
2039
2040 tracing::info!(
2041 "applying new properties to source: source_id={}, options_with_secret={:?}",
2042 source_id,
2043 options_with_secret
2044 );
2045 let _ = ConnectorProperties::extract(options_with_secret.clone(), true)?;
2047 let mut associate_table_id = None;
2050
2051 let mut preferred_id = source_id.as_object_id();
2055 let rewrite_sql = {
2056 let definition = source.definition.clone();
2057
2058 let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
2059 .map_err(|e| {
2060 MetaError::from(MetaErrorInner::Connector(ConnectorError::from(
2061 anyhow!(e).context("Failed to parse source definition SQL"),
2062 )))
2063 })?
2064 .try_into()
2065 .unwrap();
2066
2067 async fn format_with_option_secret_resolved(
2081 txn: &DatabaseTransaction,
2082 options_with_secret: &WithOptionsSecResolved,
2083 ) -> MetaResult<Vec<SqlOption>> {
2084 let mut options = Vec::new();
2085 for (k, v) in options_with_secret.as_plaintext() {
2086 let sql_option = SqlOption::try_from((k, &format!("'{}'", v)))
2087 .map_err(|e| MetaError::invalid_parameter(e.to_report_string()))?;
2088 options.push(sql_option);
2089 }
2090 for (k, v) in options_with_secret.as_secret() {
2091 if let Some(secret_model) = Secret::find_by_id(v.secret_id).one(txn).await? {
2092 let sql_option =
2093 SqlOption::try_from((k, &format!("SECRET {}", secret_model.name)))
2094 .map_err(|e| MetaError::invalid_parameter(e.to_report_string()))?;
2095 options.push(sql_option);
2096 } else {
2097 return Err(MetaError::catalog_id_not_found("secret", v.secret_id));
2098 }
2099 }
2100 Ok(options)
2101 }
2102
2103 match &mut stmt {
2104 Statement::CreateSource { stmt } => {
2105 stmt.with_properties.0 =
2106 format_with_option_secret_resolved(&txn, &options_with_secret).await?;
2107 }
2108 Statement::CreateTable { with_options, .. } => {
2109 *with_options =
2110 format_with_option_secret_resolved(&txn, &options_with_secret).await?;
2111 associate_table_id = source.optional_associated_table_id;
2112 preferred_id = associate_table_id.unwrap().as_object_id();
2113 }
2114 _ => unreachable!(),
2115 }
2116
2117 stmt.to_string()
2118 };
2119
2120 {
2121 if !to_add_secret_dep.is_empty() {
2123 ObjectDependency::insert_many(to_add_secret_dep.into_iter().map(|secret_id| {
2124 object_dependency::ActiveModel {
2125 oid: Set(secret_id.into()),
2126 used_by: Set(preferred_id),
2127 ..Default::default()
2128 }
2129 }))
2130 .exec(&txn)
2131 .await?;
2132 }
2133 if !to_remove_secret_dep.is_empty() {
2134 let _ = ObjectDependency::delete_many()
2136 .filter(
2137 object_dependency::Column::Oid
2138 .is_in(to_remove_secret_dep)
2139 .and(object_dependency::Column::UsedBy.eq(preferred_id)),
2140 )
2141 .exec(&txn)
2142 .await?;
2143 }
2144 }
2145
2146 let active_source_model = source::ActiveModel {
2147 source_id: Set(source_id),
2148 definition: Set(rewrite_sql.clone()),
2149 with_properties: Set(options_with_secret.as_plaintext().clone().into()),
2150 secret_ref: Set((!options_with_secret.as_secret().is_empty())
2151 .then(|| SecretRef::from(options_with_secret.as_secret().clone()))),
2152 ..Default::default()
2153 };
2154 active_source_model.update(&txn).await?;
2155
2156 if let Some(associate_table_id) = associate_table_id {
2157 let active_table_model = table::ActiveModel {
2159 table_id: Set(associate_table_id),
2160 definition: Set(rewrite_sql),
2161 ..Default::default()
2162 };
2163 active_table_model.update(&txn).await?;
2164 }
2165
2166 let to_check_job_ids = vec![if let Some(associate_table_id) = associate_table_id {
2167 associate_table_id.as_job_id()
2169 } else {
2170 source_id.as_share_source_job_id()
2171 }]
2172 .into_iter()
2173 .chain(dep_source_job_ids.into_iter())
2174 .collect_vec();
2175
2176 update_connector_props_fragments(
2178 &txn,
2179 to_check_job_ids,
2180 FragmentTypeFlag::Source,
2181 |node, found| {
2182 if let PbNodeBody::Source(node) = node
2183 && let Some(source_inner) = &mut node.source_inner
2184 {
2185 source_inner.with_properties = options_with_secret.as_plaintext().clone();
2186 source_inner.secret_refs = options_with_secret.as_secret().clone();
2187 *found = true;
2188 }
2189 },
2190 is_shared_source,
2191 )
2192 .await?;
2193
2194 let mut to_update_objs = Vec::with_capacity(2);
2195 let (source, obj) = Source::find_by_id(source_id)
2196 .find_also_related(Object)
2197 .one(&txn)
2198 .await?
2199 .ok_or_else(|| {
2200 MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
2201 })?;
2202 to_update_objs.push(PbObject {
2203 object_info: Some(PbObjectInfo::Source(
2204 ObjectModel(source, obj.unwrap()).into(),
2205 )),
2206 });
2207
2208 if let Some(associate_table_id) = associate_table_id {
2209 let (table, obj) = Table::find_by_id(associate_table_id)
2210 .find_also_related(Object)
2211 .one(&txn)
2212 .await?
2213 .ok_or_else(|| MetaError::catalog_id_not_found("table", associate_table_id))?;
2214 to_update_objs.push(PbObject {
2215 object_info: Some(PbObjectInfo::Table(ObjectModel(table, obj.unwrap()).into())),
2216 });
2217 }
2218
2219 txn.commit().await?;
2220
2221 self.notify_frontend(
2222 NotificationOperation::Update,
2223 NotificationInfo::ObjectGroup(PbObjectGroup {
2224 objects: to_update_objs,
2225 }),
2226 )
2227 .await;
2228
2229 Ok(options_with_secret)
2230 }
2231
2232 pub async fn update_sink_props_by_sink_id(
2233 &self,
2234 sink_id: SinkId,
2235 props: BTreeMap<String, String>,
2236 ) -> MetaResult<HashMap<String, String>> {
2237 let inner = self.inner.read().await;
2238 let txn = inner.db.begin().await?;
2239
2240 let (sink, _obj) = Sink::find_by_id(sink_id)
2241 .find_also_related(Object)
2242 .one(&txn)
2243 .await?
2244 .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
2245 validate_sink_props(&sink, &props)?;
2246 let definition = sink.definition.clone();
2247 let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
2248 .map_err(|e| SinkError::Config(anyhow!(e)))?
2249 .try_into()
2250 .unwrap();
2251 if let Statement::CreateSink { stmt } = &mut stmt {
2252 update_stmt_with_props(&mut stmt.with_properties.0, &props)?;
2253 } else {
2254 panic!("definition is not a create sink statement")
2255 }
2256 let mut new_config = sink.properties.clone().into_inner();
2257 new_config.extend(props.clone());
2258
2259 let definition = stmt.to_string();
2260 let active_sink = sink::ActiveModel {
2261 sink_id: Set(sink_id),
2262 properties: Set(risingwave_meta_model::Property(new_config.clone())),
2263 definition: Set(definition),
2264 ..Default::default()
2265 };
2266 active_sink.update(&txn).await?;
2267
2268 update_sink_fragment_props(&txn, sink_id, new_config).await?;
2269 let (sink, obj) = Sink::find_by_id(sink_id)
2270 .find_also_related(Object)
2271 .one(&txn)
2272 .await?
2273 .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
2274 txn.commit().await?;
2275 let relation_infos = vec![PbObject {
2276 object_info: Some(PbObjectInfo::Sink(ObjectModel(sink, obj.unwrap()).into())),
2277 }];
2278
2279 let _version = self
2280 .notify_frontend(
2281 NotificationOperation::Update,
2282 NotificationInfo::ObjectGroup(PbObjectGroup {
2283 objects: relation_infos,
2284 }),
2285 )
2286 .await;
2287
2288 Ok(props.into_iter().collect())
2289 }
2290
2291 pub async fn update_iceberg_table_props_by_table_id(
2292 &self,
2293 table_id: TableId,
2294 props: BTreeMap<String, String>,
2295 alter_iceberg_table_props: Option<
2296 risingwave_pb::meta::alter_connector_props_request::PbExtraOptions,
2297 >,
2298 ) -> MetaResult<(HashMap<String, String>, SinkId)> {
2299 let risingwave_pb::meta::alter_connector_props_request::PbExtraOptions::AlterIcebergTableIds(AlterIcebergTableIds { sink_id, source_id }) = alter_iceberg_table_props.
2300 ok_or_else(|| MetaError::invalid_parameter("alter_iceberg_table_props is required"))?;
2301 let inner = self.inner.read().await;
2302 let txn = inner.db.begin().await?;
2303
2304 let (sink, _obj) = Sink::find_by_id(sink_id)
2305 .find_also_related(Object)
2306 .one(&txn)
2307 .await?
2308 .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
2309 validate_sink_props(&sink, &props)?;
2310
2311 let definition = sink.definition.clone();
2312 let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
2313 .map_err(|e| SinkError::Config(anyhow!(e)))?
2314 .try_into()
2315 .unwrap();
2316 if let Statement::CreateTable {
2317 with_options,
2318 engine,
2319 ..
2320 } = &mut stmt
2321 {
2322 if !matches!(engine, Engine::Iceberg) {
2323 return Err(SinkError::Config(anyhow!(
2324 "only iceberg table can be altered as sink"
2325 ))
2326 .into());
2327 }
2328 update_stmt_with_props(with_options, &props)?;
2329 } else {
2330 panic!("definition is not a create iceberg table statement")
2331 }
2332 let mut new_config = sink.properties.clone().into_inner();
2333 new_config.extend(props.clone());
2334
2335 let definition = stmt.to_string();
2336 let active_sink = sink::ActiveModel {
2337 sink_id: Set(sink_id),
2338 properties: Set(risingwave_meta_model::Property(new_config.clone())),
2339 definition: Set(definition.clone()),
2340 ..Default::default()
2341 };
2342 let active_source = source::ActiveModel {
2343 source_id: Set(source_id),
2344 definition: Set(definition.clone()),
2345 ..Default::default()
2346 };
2347 let active_table = table::ActiveModel {
2348 table_id: Set(table_id),
2349 definition: Set(definition),
2350 ..Default::default()
2351 };
2352 active_sink.update(&txn).await?;
2353 active_source.update(&txn).await?;
2354 active_table.update(&txn).await?;
2355
2356 update_sink_fragment_props(&txn, sink_id, new_config).await?;
2357
2358 let (sink, sink_obj) = Sink::find_by_id(sink_id)
2359 .find_also_related(Object)
2360 .one(&txn)
2361 .await?
2362 .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
2363 let (source, source_obj) = Source::find_by_id(source_id)
2364 .find_also_related(Object)
2365 .one(&txn)
2366 .await?
2367 .ok_or_else(|| {
2368 MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
2369 })?;
2370 let (table, table_obj) = Table::find_by_id(table_id)
2371 .find_also_related(Object)
2372 .one(&txn)
2373 .await?
2374 .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Table.as_str(), table_id))?;
2375 txn.commit().await?;
2376 let relation_infos = vec![
2377 PbObject {
2378 object_info: Some(PbObjectInfo::Sink(
2379 ObjectModel(sink, sink_obj.unwrap()).into(),
2380 )),
2381 },
2382 PbObject {
2383 object_info: Some(PbObjectInfo::Source(
2384 ObjectModel(source, source_obj.unwrap()).into(),
2385 )),
2386 },
2387 PbObject {
2388 object_info: Some(PbObjectInfo::Table(
2389 ObjectModel(table, table_obj.unwrap()).into(),
2390 )),
2391 },
2392 ];
2393 let _version = self
2394 .notify_frontend(
2395 NotificationOperation::Update,
2396 NotificationInfo::ObjectGroup(PbObjectGroup {
2397 objects: relation_infos,
2398 }),
2399 )
2400 .await;
2401
2402 Ok((props.into_iter().collect(), sink_id))
2403 }
2404
2405 pub async fn update_connection_and_dependent_objects_props(
2407 &self,
2408 connection_id: ConnectionId,
2409 alter_props: BTreeMap<String, String>,
2410 alter_secret_refs: BTreeMap<String, PbSecretRef>,
2411 ) -> MetaResult<(
2412 WithOptionsSecResolved, Vec<(SourceId, HashMap<String, String>)>, Vec<(SinkId, HashMap<String, String>)>, )> {
2416 let inner = self.inner.read().await;
2417 let txn = inner.db.begin().await?;
2418
2419 let dependent_sources: Vec<SourceId> = Source::find()
2421 .select_only()
2422 .column(source::Column::SourceId)
2423 .filter(source::Column::ConnectionId.eq(connection_id))
2424 .into_tuple()
2425 .all(&txn)
2426 .await?;
2427
2428 let dependent_sinks: Vec<SinkId> = Sink::find()
2429 .select_only()
2430 .column(sink::Column::SinkId)
2431 .filter(sink::Column::ConnectionId.eq(connection_id))
2432 .into_tuple()
2433 .all(&txn)
2434 .await?;
2435
2436 let (connection_catalog, _obj) = Connection::find_by_id(connection_id)
2437 .find_also_related(Object)
2438 .one(&txn)
2439 .await?
2440 .ok_or_else(|| {
2441 MetaError::catalog_id_not_found(ObjectType::Connection.as_str(), connection_id)
2442 })?;
2443
2444 let prop_keys: Vec<String> = alter_props
2446 .keys()
2447 .chain(alter_secret_refs.keys())
2448 .cloned()
2449 .collect();
2450
2451 let connection_type_str = pb_connection_type_to_connection_type(
2453 &connection_catalog.params.to_protobuf().connection_type(),
2454 )
2455 .ok_or_else(|| MetaError::invalid_parameter("Unspecified connection type"))?;
2456
2457 risingwave_connector::allow_alter_on_fly_fields::check_connection_allow_alter_on_fly_fields(
2458 connection_type_str, &prop_keys,
2459 )?;
2460
2461 let connection_pb = connection_catalog.params.to_protobuf();
2462 let mut connection_options_with_secret = WithOptionsSecResolved::new(
2463 connection_pb.properties.into_iter().collect(),
2464 connection_pb.secret_refs.into_iter().collect(),
2465 );
2466
2467 let (to_add_secret_dep, to_remove_secret_dep) = connection_options_with_secret
2468 .handle_update(alter_props.clone(), alter_secret_refs.clone())?;
2469
2470 tracing::debug!(
2471 "applying new properties to connection and dependents: connection_id={}, sources={:?}, sinks={:?}",
2472 connection_id,
2473 dependent_sources,
2474 dependent_sinks
2475 );
2476
2477 {
2479 let conn_params_pb = risingwave_pb::catalog::ConnectionParams {
2480 connection_type: connection_pb.connection_type,
2481 properties: connection_options_with_secret
2482 .as_plaintext()
2483 .clone()
2484 .into_iter()
2485 .collect(),
2486 secret_refs: connection_options_with_secret
2487 .as_secret()
2488 .clone()
2489 .into_iter()
2490 .collect(),
2491 };
2492 let connection = PbConnection {
2493 id: connection_id as _,
2494 info: Some(risingwave_pb::catalog::connection::Info::ConnectionParams(
2495 conn_params_pb,
2496 )),
2497 ..Default::default()
2498 };
2499 validate_connection(&connection).await?;
2500 }
2501
2502 if !to_add_secret_dep.is_empty() {
2504 ObjectDependency::insert_many(to_add_secret_dep.into_iter().map(|secret_id| {
2505 object_dependency::ActiveModel {
2506 oid: Set(secret_id.into()),
2507 used_by: Set(connection_id.as_object_id()),
2508 ..Default::default()
2509 }
2510 }))
2511 .exec(&txn)
2512 .await?;
2513 }
2514 if !to_remove_secret_dep.is_empty() {
2515 let _ = ObjectDependency::delete_many()
2516 .filter(
2517 object_dependency::Column::Oid
2518 .is_in(to_remove_secret_dep)
2519 .and(object_dependency::Column::UsedBy.eq(connection_id.as_object_id())),
2520 )
2521 .exec(&txn)
2522 .await?;
2523 }
2524
2525 let updated_connection_params = risingwave_pb::catalog::ConnectionParams {
2527 connection_type: connection_pb.connection_type,
2528 properties: connection_options_with_secret
2529 .as_plaintext()
2530 .clone()
2531 .into_iter()
2532 .collect(),
2533 secret_refs: connection_options_with_secret
2534 .as_secret()
2535 .clone()
2536 .into_iter()
2537 .collect(),
2538 };
2539 let active_connection_model = connection::ActiveModel {
2540 connection_id: Set(connection_id),
2541 params: Set(ConnectionParams::from(&updated_connection_params)),
2542 ..Default::default()
2543 };
2544 active_connection_model.update(&txn).await?;
2545
2546 let mut updated_sources_with_props: Vec<(SourceId, HashMap<String, String>)> = Vec::new();
2548
2549 if !dependent_sources.is_empty() {
2550 let sources_with_objs = Source::find()
2552 .find_also_related(Object)
2553 .filter(source::Column::SourceId.is_in(dependent_sources.iter().cloned()))
2554 .all(&txn)
2555 .await?;
2556
2557 let mut source_updates = Vec::new();
2559 let mut fragment_updates: Vec<DependentSourceFragmentUpdate> = Vec::new();
2560
2561 for (source, _obj) in sources_with_objs {
2562 let source_id = source.source_id;
2563
2564 let mut source_options_with_secret = WithOptionsSecResolved::new(
2565 source.with_properties.0.clone(),
2566 source
2567 .secret_ref
2568 .clone()
2569 .map(|secret_ref| secret_ref.to_protobuf())
2570 .unwrap_or_default(),
2571 );
2572 let (_source_to_add_secret_dep, _source_to_remove_secret_dep) =
2573 source_options_with_secret
2574 .handle_update(alter_props.clone(), alter_secret_refs.clone())?;
2575
2576 let _ = ConnectorProperties::extract(source_options_with_secret.clone(), true)?;
2578
2579 let active_source = source::ActiveModel {
2581 source_id: Set(source_id),
2582 with_properties: Set(Property(
2583 source_options_with_secret.as_plaintext().clone(),
2584 )),
2585 secret_ref: Set((!source_options_with_secret.as_secret().is_empty()).then(
2586 || {
2587 risingwave_meta_model::SecretRef::from(
2588 source_options_with_secret.as_secret().clone(),
2589 )
2590 },
2591 )),
2592 ..Default::default()
2593 };
2594 source_updates.push(active_source);
2595
2596 let is_shared_source = source.is_shared();
2601 let mut dep_source_job_ids: Vec<JobId> = Vec::new();
2602 if !is_shared_source {
2603 dep_source_job_ids = ObjectDependency::find()
2604 .select_only()
2605 .column(object_dependency::Column::UsedBy)
2606 .filter(object_dependency::Column::Oid.eq(source_id))
2607 .into_tuple()
2608 .all(&txn)
2609 .await?;
2610 }
2611
2612 let base_job_id =
2613 if let Some(associate_table_id) = source.optional_associated_table_id {
2614 associate_table_id.as_job_id()
2615 } else {
2616 source_id.as_share_source_job_id()
2617 };
2618 let job_ids = vec![base_job_id]
2619 .into_iter()
2620 .chain(dep_source_job_ids.into_iter())
2621 .collect_vec();
2622
2623 fragment_updates.push(DependentSourceFragmentUpdate {
2624 job_ids,
2625 with_properties: source_options_with_secret.as_plaintext().clone(),
2626 secret_refs: source_options_with_secret.as_secret().clone(),
2627 is_shared_source,
2628 });
2629
2630 let complete_source_props = LocalSecretManager::global()
2632 .fill_secrets(
2633 source_options_with_secret.as_plaintext().clone(),
2634 source_options_with_secret.as_secret().clone(),
2635 )
2636 .map_err(MetaError::from)?
2637 .into_iter()
2638 .collect::<HashMap<String, String>>();
2639 updated_sources_with_props.push((source_id, complete_source_props));
2640 }
2641
2642 for source_update in source_updates {
2643 source_update.update(&txn).await?;
2644 }
2645
2646 for DependentSourceFragmentUpdate {
2648 job_ids,
2649 with_properties,
2650 secret_refs,
2651 is_shared_source,
2652 } in fragment_updates
2653 {
2654 update_connector_props_fragments(
2655 &txn,
2656 job_ids,
2657 FragmentTypeFlag::Source,
2658 |node, found| {
2659 if let PbNodeBody::Source(node) = node
2660 && let Some(source_inner) = &mut node.source_inner
2661 {
2662 source_inner.with_properties = with_properties.clone();
2663 source_inner.secret_refs = secret_refs.clone();
2664 *found = true;
2665 }
2666 },
2667 is_shared_source,
2668 )
2669 .await?;
2670 }
2671 }
2672
2673 let mut updated_sinks_with_props: Vec<(SinkId, HashMap<String, String>)> = Vec::new();
2675
2676 if !dependent_sinks.is_empty() {
2677 let sinks_with_objs = Sink::find()
2679 .find_also_related(Object)
2680 .filter(sink::Column::SinkId.is_in(dependent_sinks.iter().cloned()))
2681 .all(&txn)
2682 .await?;
2683
2684 let mut sink_updates = Vec::new();
2686 let mut sink_fragment_updates = Vec::new();
2687
2688 for (sink, _obj) in sinks_with_objs {
2689 let sink_id = sink.sink_id;
2690
2691 match sink.properties.inner_ref().get(CONNECTOR_TYPE_KEY) {
2693 Some(connector) => {
2694 let connector_type = connector.to_lowercase();
2695 check_sink_allow_alter_on_fly_fields(&connector_type, &prop_keys)
2696 .map_err(|e| SinkError::Config(anyhow!(e)))?;
2697
2698 match_sink_name_str!(
2699 connector_type.as_str(),
2700 SinkType,
2701 {
2702 let mut new_sink_props = sink.properties.0.clone();
2703 new_sink_props.extend(alter_props.clone());
2704 SinkType::validate_alter_config(&new_sink_props)
2705 },
2706 |sink: &str| Err(SinkError::Config(anyhow!(
2707 "unsupported sink type {}",
2708 sink
2709 )))
2710 )?
2711 }
2712 None => {
2713 return Err(SinkError::Config(anyhow!(
2714 "connector not specified when alter sink"
2715 ))
2716 .into());
2717 }
2718 };
2719
2720 let mut new_sink_props = sink.properties.0.clone();
2721 new_sink_props.extend(alter_props.clone());
2722
2723 let active_sink = sink::ActiveModel {
2725 sink_id: Set(sink_id),
2726 properties: Set(risingwave_meta_model::Property(new_sink_props.clone())),
2727 ..Default::default()
2728 };
2729 sink_updates.push(active_sink);
2730
2731 sink_fragment_updates.push((sink_id, new_sink_props.clone()));
2733
2734 let complete_sink_props: HashMap<String, String> =
2736 new_sink_props.into_iter().collect();
2737 updated_sinks_with_props.push((sink_id, complete_sink_props));
2738 }
2739
2740 for sink_update in sink_updates {
2742 sink_update.update(&txn).await?;
2743 }
2744
2745 for (sink_id, new_sink_props) in sink_fragment_updates {
2747 update_connector_props_fragments(
2748 &txn,
2749 vec![sink_id.as_job_id()],
2750 FragmentTypeFlag::Sink,
2751 |node, found| {
2752 if let PbNodeBody::Sink(node) = node
2753 && let Some(sink_desc) = &mut node.sink_desc
2754 && sink_desc.id == sink_id.as_raw_id()
2755 {
2756 sink_desc.properties = new_sink_props.clone();
2757 *found = true;
2758 }
2759 },
2760 true,
2761 )
2762 .await?;
2763 }
2764 }
2765
2766 let mut updated_objects = Vec::new();
2768
2769 let (connection, obj) = Connection::find_by_id(connection_id)
2771 .find_also_related(Object)
2772 .one(&txn)
2773 .await?
2774 .ok_or_else(|| {
2775 MetaError::catalog_id_not_found(ObjectType::Connection.as_str(), connection_id)
2776 })?;
2777 updated_objects.push(PbObject {
2778 object_info: Some(PbObjectInfo::Connection(
2779 ObjectModel(connection, obj.unwrap()).into(),
2780 )),
2781 });
2782
2783 for source_id in &dependent_sources {
2785 let (source, obj) = Source::find_by_id(*source_id)
2786 .find_also_related(Object)
2787 .one(&txn)
2788 .await?
2789 .ok_or_else(|| {
2790 MetaError::catalog_id_not_found(ObjectType::Source.as_str(), *source_id)
2791 })?;
2792 updated_objects.push(PbObject {
2793 object_info: Some(PbObjectInfo::Source(
2794 ObjectModel(source, obj.unwrap()).into(),
2795 )),
2796 });
2797 }
2798
2799 for sink_id in &dependent_sinks {
2801 let (sink, obj) = Sink::find_by_id(*sink_id)
2802 .find_also_related(Object)
2803 .one(&txn)
2804 .await?
2805 .ok_or_else(|| {
2806 MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), *sink_id)
2807 })?;
2808 updated_objects.push(PbObject {
2809 object_info: Some(PbObjectInfo::Sink(ObjectModel(sink, obj.unwrap()).into())),
2810 });
2811 }
2812
2813 txn.commit().await?;
2815
2816 if !updated_objects.is_empty() {
2818 self.notify_frontend(
2819 NotificationOperation::Update,
2820 NotificationInfo::ObjectGroup(PbObjectGroup {
2821 objects: updated_objects,
2822 }),
2823 )
2824 .await;
2825 }
2826
2827 Ok((
2828 connection_options_with_secret,
2829 updated_sources_with_props,
2830 updated_sinks_with_props,
2831 ))
2832 }
2833
2834 pub async fn update_fragment_rate_limit_by_fragment_id(
2835 &self,
2836 fragment_id: FragmentId,
2837 rate_limit: Option<u32>,
2838 ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
2839 let update_rate_limit = |fragment_type_mask: FragmentTypeMask,
2840 stream_node: &mut PbStreamNode| {
2841 let mut found = false;
2842 if fragment_type_mask.contains_any(
2843 FragmentTypeFlag::dml_rate_limit_fragments()
2844 .chain(FragmentTypeFlag::sink_rate_limit_fragments())
2845 .chain(FragmentTypeFlag::backfill_rate_limit_fragments())
2846 .chain(FragmentTypeFlag::source_rate_limit_fragments()),
2847 ) {
2848 visit_stream_node_mut(stream_node, |node| {
2849 if let PbNodeBody::Dml(node) = node {
2850 node.rate_limit = rate_limit;
2851 found = true;
2852 }
2853 if let PbNodeBody::Sink(node) = node {
2854 node.rate_limit = rate_limit;
2855 found = true;
2856 }
2857 if let PbNodeBody::StreamCdcScan(node) = node {
2858 node.rate_limit = rate_limit;
2859 found = true;
2860 }
2861 if let PbNodeBody::StreamScan(node) = node {
2862 node.rate_limit = rate_limit;
2863 found = true;
2864 }
2865 if let PbNodeBody::SourceBackfill(node) = node {
2866 node.rate_limit = rate_limit;
2867 found = true;
2868 }
2869 });
2870 }
2871 found
2872 };
2873 self.mutate_fragment_by_fragment_id(fragment_id, update_rate_limit, "fragment not found")
2874 .await
2875 }
2876
2877 pub async fn list_rate_limits(&self) -> MetaResult<Vec<RateLimitInfo>> {
2880 let inner = self.inner.read().await;
2881 let txn = inner.db.begin().await?;
2882
2883 let fragments: Vec<(FragmentId, JobId, i32, StreamNode)> = Fragment::find()
2884 .select_only()
2885 .columns([
2886 fragment::Column::FragmentId,
2887 fragment::Column::JobId,
2888 fragment::Column::FragmentTypeMask,
2889 fragment::Column::StreamNode,
2890 ])
2891 .filter(FragmentTypeMask::intersects_any(
2892 FragmentTypeFlag::rate_limit_fragments(),
2893 ))
2894 .into_tuple()
2895 .all(&txn)
2896 .await?;
2897
2898 let mut rate_limits = Vec::new();
2899 for (fragment_id, job_id, fragment_type_mask, stream_node) in fragments {
2900 let stream_node = stream_node.to_protobuf();
2901 visit_stream_node_body(&stream_node, |node| {
2902 let mut rate_limit = None;
2903 let mut node_name = None;
2904
2905 match node {
2906 PbNodeBody::Source(node) => {
2908 if let Some(node_inner) = &node.source_inner {
2909 rate_limit = node_inner.rate_limit;
2910 node_name = Some("SOURCE");
2911 }
2912 }
2913 PbNodeBody::StreamFsFetch(node) => {
2914 if let Some(node_inner) = &node.node_inner {
2915 rate_limit = node_inner.rate_limit;
2916 node_name = Some("FS_FETCH");
2917 }
2918 }
2919 PbNodeBody::SourceBackfill(node) => {
2921 rate_limit = node.rate_limit;
2922 node_name = Some("SOURCE_BACKFILL");
2923 }
2924 PbNodeBody::StreamScan(node) => {
2925 rate_limit = node.rate_limit;
2926 node_name = Some("STREAM_SCAN");
2927 }
2928 PbNodeBody::StreamCdcScan(node) => {
2929 rate_limit = node.rate_limit;
2930 node_name = Some("STREAM_CDC_SCAN");
2931 }
2932 PbNodeBody::Sink(node) => {
2933 rate_limit = node.rate_limit;
2934 node_name = Some("SINK");
2935 }
2936 _ => {}
2937 }
2938
2939 if let Some(rate_limit) = rate_limit {
2940 rate_limits.push(RateLimitInfo {
2941 fragment_id,
2942 job_id,
2943 fragment_type_mask: fragment_type_mask as u32,
2944 rate_limit,
2945 node_name: node_name.unwrap().to_owned(),
2946 });
2947 }
2948 });
2949 }
2950
2951 Ok(rate_limits)
2952 }
2953}
2954
2955fn validate_sink_props(sink: &sink::Model, props: &BTreeMap<String, String>) -> MetaResult<()> {
2956 match sink.properties.inner_ref().get(CONNECTOR_TYPE_KEY) {
2958 Some(connector) => {
2959 let connector_type = connector.to_lowercase();
2960 let field_names: Vec<String> = props.keys().cloned().collect();
2961 check_sink_allow_alter_on_fly_fields(&connector_type, &field_names)
2962 .map_err(|e| SinkError::Config(anyhow!(e)))?;
2963
2964 match_sink_name_str!(
2965 connector_type.as_str(),
2966 SinkType,
2967 {
2968 let mut new_props = sink.properties.0.clone();
2969 new_props.extend(props.clone());
2970 SinkType::validate_alter_config(&new_props)
2971 },
2972 |sink: &str| Err(SinkError::Config(anyhow!("unsupported sink type {}", sink)))
2973 )?
2974 }
2975 None => {
2976 return Err(
2977 SinkError::Config(anyhow!("connector not specified when alter sink")).into(),
2978 );
2979 }
2980 };
2981 Ok(())
2982}
2983
2984fn update_stmt_with_props(
2985 with_properties: &mut Vec<SqlOption>,
2986 props: &BTreeMap<String, String>,
2987) -> MetaResult<()> {
2988 let mut new_sql_options = with_properties
2989 .iter()
2990 .map(|sql_option| (&sql_option.name, sql_option))
2991 .collect::<IndexMap<_, _>>();
2992 let add_sql_options = props
2993 .iter()
2994 .map(|(k, v)| SqlOption::try_from((k, v)))
2995 .collect::<Result<Vec<SqlOption>, ParserError>>()
2996 .map_err(|e| SinkError::Config(anyhow!(e)))?;
2997 new_sql_options.extend(
2998 add_sql_options
2999 .iter()
3000 .map(|sql_option| (&sql_option.name, sql_option)),
3001 );
3002 *with_properties = new_sql_options.into_values().cloned().collect();
3003 Ok(())
3004}
3005
3006async fn update_sink_fragment_props(
3007 txn: &DatabaseTransaction,
3008 sink_id: SinkId,
3009 props: BTreeMap<String, String>,
3010) -> MetaResult<()> {
3011 let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
3012 .select_only()
3013 .columns([
3014 fragment::Column::FragmentId,
3015 fragment::Column::FragmentTypeMask,
3016 fragment::Column::StreamNode,
3017 ])
3018 .filter(fragment::Column::JobId.eq(sink_id))
3019 .into_tuple()
3020 .all(txn)
3021 .await?;
3022 let fragments = fragments
3023 .into_iter()
3024 .filter(|(_, fragment_type_mask, _)| {
3025 *fragment_type_mask & FragmentTypeFlag::Sink as i32 != 0
3026 })
3027 .filter_map(|(id, _, stream_node)| {
3028 let mut stream_node = stream_node.to_protobuf();
3029 let mut found = false;
3030 visit_stream_node_mut(&mut stream_node, |node| {
3031 if let PbNodeBody::Sink(node) = node
3032 && let Some(sink_desc) = &mut node.sink_desc
3033 && sink_desc.id == sink_id
3034 {
3035 sink_desc.properties.extend(props.clone());
3036 found = true;
3037 }
3038 });
3039 if found { Some((id, stream_node)) } else { None }
3040 })
3041 .collect_vec();
3042 assert!(
3043 !fragments.is_empty(),
3044 "sink id should be used by at least one fragment"
3045 );
3046 for (id, stream_node) in fragments {
3047 fragment::ActiveModel {
3048 fragment_id: Set(id),
3049 stream_node: Set(StreamNode::from(&stream_node)),
3050 ..Default::default()
3051 }
3052 .update(txn)
3053 .await?;
3054 }
3055 Ok(())
3056}
3057
3058pub struct SinkIntoTableContext {
3059 pub updated_sink_catalogs: Vec<SinkId>,
3062}
3063
3064pub struct FinishAutoRefreshSchemaSinkContext {
3065 pub tmp_sink_id: SinkId,
3066 pub original_sink_id: SinkId,
3067 pub columns: Vec<PbColumnCatalog>,
3068 pub new_log_store_table: Option<(TableId, Vec<PbColumnCatalog>)>,
3069}
3070
3071async fn update_connector_props_fragments<F>(
3072 txn: &DatabaseTransaction,
3073 job_ids: Vec<JobId>,
3074 expect_flag: FragmentTypeFlag,
3075 mut alter_stream_node_fn: F,
3076 is_shared_source: bool,
3077) -> MetaResult<()>
3078where
3079 F: FnMut(&mut PbNodeBody, &mut bool),
3080{
3081 let fragments: Vec<(FragmentId, StreamNode)> = Fragment::find()
3082 .select_only()
3083 .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
3084 .filter(
3085 fragment::Column::JobId
3086 .is_in(job_ids.clone())
3087 .and(FragmentTypeMask::intersects(expect_flag)),
3088 )
3089 .into_tuple()
3090 .all(txn)
3091 .await?;
3092 let fragments = fragments
3093 .into_iter()
3094 .filter_map(|(id, stream_node)| {
3095 let mut stream_node = stream_node.to_protobuf();
3096 let mut found = false;
3097 visit_stream_node_mut(&mut stream_node, |node| {
3098 alter_stream_node_fn(node, &mut found);
3099 });
3100 if found { Some((id, stream_node)) } else { None }
3101 })
3102 .collect_vec();
3103 if is_shared_source || job_ids.len() > 1 {
3104 assert!(
3108 !fragments.is_empty(),
3109 "job ids {:?} (type: {:?}) should be used by at least one fragment",
3110 job_ids,
3111 expect_flag
3112 );
3113 }
3114
3115 for (id, stream_node) in fragments {
3116 fragment::ActiveModel {
3117 fragment_id: Set(id),
3118 stream_node: Set(StreamNode::from(&stream_node)),
3119 ..Default::default()
3120 }
3121 .update(txn)
3122 .await?;
3123 }
3124
3125 Ok(())
3126}