use std::collections::{BTreeMap, HashMap, HashSet};
use std::num::NonZeroUsize;

use anyhow::anyhow;
use indexmap::IndexMap;
use itertools::Itertools;
use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
use risingwave_common::config::DefaultParallelism;
use risingwave_common::hash::VnodeCountCompat;
use risingwave_common::id::JobId;
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_common::util::stream_graph_visitor::{
    visit_stream_node_body, visit_stream_node_mut,
};
use risingwave_common::{bail, current_cluster_version};
use risingwave_connector::allow_alter_on_fly_fields::check_sink_allow_alter_on_fly_fields;
use risingwave_connector::error::ConnectorError;
use risingwave_connector::sink::file_sink::fs::FsSink;
use risingwave_connector::sink::{CONNECTOR_TYPE_KEY, SinkError};
use risingwave_connector::source::ConnectorProperties;
use risingwave_connector::{WithOptionsSecResolved, WithPropertiesExt, match_sink_name_str};
use risingwave_meta_model::object::ObjectType;
use risingwave_meta_model::prelude::{StreamingJob as StreamingJobModel, *};
use risingwave_meta_model::table::TableType;
use risingwave_meta_model::user_privilege::Action;
use risingwave_meta_model::*;
use risingwave_pb::catalog::source::PbOptionalAssociatedTableId;
use risingwave_pb::catalog::table::PbOptionalAssociatedSourceId;
use risingwave_pb::catalog::{PbCreateType, PbTable};
use risingwave_pb::meta::alter_connector_props_request::AlterIcebergTableIds;
use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
use risingwave_pb::meta::object::PbObjectInfo;
use risingwave_pb::meta::subscribe_response::{
    Info as NotificationInfo, Info, Operation as NotificationOperation, Operation,
};
use risingwave_pb::meta::{PbObject, PbObjectGroup};
use risingwave_pb::plan_common::PbColumnCatalog;
use risingwave_pb::secret::PbSecretRef;
use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use risingwave_pb::stream_plan::{PbSinkLogStoreType, PbStreamNode};
use risingwave_pb::user::PbUserInfo;
use risingwave_sqlparser::ast::{Engine, SqlOption, Statement};
use risingwave_sqlparser::parser::{Parser, ParserError};
use sea_orm::ActiveValue::Set;
use sea_orm::sea_query::{Expr, Query, SimpleExpr};
use sea_orm::{
    ActiveModelTrait, ColumnTrait, DatabaseTransaction, EntityTrait, IntoActiveModel, JoinType,
    ModelTrait, NotSet, PaginatorTrait, QueryFilter, QuerySelect, RelationTrait, TransactionTrait,
};
use thiserror_ext::AsReport;

use super::rename::IndexItemRewriter;
use crate::barrier::{Command, SharedFragmentInfo};
use crate::controller::ObjectModel;
use crate::controller::catalog::{CatalogController, DropTableConnectorContext};
use crate::controller::fragment::FragmentTypeMaskExt;
use crate::controller::utils::{
    PartialObject, build_object_group_for_delete, check_relation_name_duplicate,
    check_sink_into_table_cycle, ensure_job_not_canceled, ensure_object_id, ensure_user_id,
    fetch_target_fragments, get_internal_tables_by_id, get_table_columns,
    grant_default_privileges_automatically, insert_fragment_relations, list_user_info_by_ids,
};
use crate::error::MetaErrorInner;
use crate::manager::{NotificationVersion, StreamingJob, StreamingJobType};
use crate::model::{
    FragmentDownstreamRelation, FragmentReplaceUpstream, StreamContext, StreamJobFragments,
    StreamJobFragmentsToCreate,
};
use crate::stream::SplitAssignment;
use crate::{MetaError, MetaResult};

impl CatalogController {
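    /// Create the catalog object and a `streaming_job` row in `Initial` status inside the
    /// given transaction, returning the id of the new job.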
88 pub async fn create_streaming_job_obj(
89 txn: &DatabaseTransaction,
90 obj_type: ObjectType,
91 owner_id: UserId,
92 database_id: Option<DatabaseId>,
93 schema_id: Option<SchemaId>,
94 create_type: PbCreateType,
95 timezone: Option<String>,
96 streaming_parallelism: StreamingParallelism,
97 max_parallelism: usize,
        specific_resource_group: Option<String>,
    ) -> MetaResult<JobId> {
100 let obj = Self::create_object(txn, obj_type, owner_id, database_id, schema_id).await?;
101 let job_id = (obj.oid as u32).into();
102 let job = streaming_job::ActiveModel {
103 job_id: Set(job_id),
104 job_status: Set(JobStatus::Initial),
105 create_type: Set(create_type.into()),
106 timezone: Set(timezone),
107 parallelism: Set(streaming_parallelism),
108 max_parallelism: Set(max_parallelism as _),
109 specific_resource_group: Set(specific_resource_group),
110 };
111 job.insert(txn).await?;
112
113 Ok(job_id)
114 }
115
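    /// Create catalog entries for a new streaming job: validate the owner, database, schema
    /// and name, resolve the streaming parallelism, insert the job-specific catalog
    /// (table / sink / index / source), and record its object dependencies.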
116 #[await_tree::instrument]
122 pub async fn create_job_catalog(
123 &self,
124 streaming_job: &mut StreamingJob,
125 ctx: &StreamContext,
126 parallelism: &Option<Parallelism>,
127 max_parallelism: usize,
128 mut dependencies: HashSet<ObjectId>,
129 specific_resource_group: Option<String>,
130 ) -> MetaResult<()> {
131 let inner = self.inner.write().await;
132 let txn = inner.db.begin().await?;
133 let create_type = streaming_job.create_type();
134
135 let streaming_parallelism = match (parallelism, self.env.opts.default_parallelism) {
136 (None, DefaultParallelism::Full) => StreamingParallelism::Adaptive,
137 (None, DefaultParallelism::Default(n)) => StreamingParallelism::Fixed(n.get()),
138 (Some(n), _) => StreamingParallelism::Fixed(n.parallelism as _),
139 };
140
141 ensure_user_id(streaming_job.owner() as _, &txn).await?;
142 ensure_object_id(
143 ObjectType::Database,
144 streaming_job.database_id().as_raw_id() as _,
145 &txn,
146 )
147 .await?;
148 ensure_object_id(
149 ObjectType::Schema,
150 streaming_job.schema_id().as_raw_id() as _,
151 &txn,
152 )
153 .await?;
154 check_relation_name_duplicate(
155 &streaming_job.name(),
156 streaming_job.database_id(),
157 streaming_job.schema_id(),
158 &txn,
159 )
160 .await?;
161
162 if !dependencies.is_empty() {
164 let altering_cnt = ObjectDependency::find()
165 .join(
166 JoinType::InnerJoin,
167 object_dependency::Relation::Object1.def(),
168 )
169 .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
170 .filter(
171 object_dependency::Column::Oid
172 .is_in(dependencies.clone())
173 .and(object::Column::ObjType.eq(ObjectType::Table))
174 .and(streaming_job::Column::JobStatus.ne(JobStatus::Created))
175 .and(
176 object::Column::Oid.not_in_subquery(
178 Query::select()
179 .column(table::Column::TableId)
180 .from(Table)
181 .to_owned(),
182 ),
183 ),
184 )
185 .count(&txn)
186 .await?;
187 if altering_cnt != 0 {
188 return Err(MetaError::permission_denied(
189 "some dependent relations are being altered",
190 ));
191 }
192 }
193
194 match streaming_job {
195 StreamingJob::MaterializedView(table) => {
196 let job_id = Self::create_streaming_job_obj(
197 &txn,
198 ObjectType::Table,
199 table.owner as _,
200 Some(table.database_id),
201 Some(table.schema_id),
202 create_type,
203 ctx.timezone.clone(),
204 streaming_parallelism,
205 max_parallelism,
206 specific_resource_group,
207 )
208 .await?;
209 table.id = job_id.as_mv_table_id();
210 let table_model: table::ActiveModel = table.clone().into();
211 Table::insert(table_model).exec(&txn).await?;
212 }
213 StreamingJob::Sink(sink) => {
214 if let Some(target_table_id) = sink.target_table
215 && check_sink_into_table_cycle(
216 target_table_id.as_raw_id() as ObjectId,
217 dependencies.iter().cloned().collect(),
218 &txn,
219 )
220 .await?
221 {
222 bail!("Creating such a sink will result in circular dependency.");
223 }
224
225 let job_id = Self::create_streaming_job_obj(
226 &txn,
227 ObjectType::Sink,
228 sink.owner as _,
229 Some(sink.database_id),
230 Some(sink.schema_id),
231 create_type,
232 ctx.timezone.clone(),
233 streaming_parallelism,
234 max_parallelism,
235 specific_resource_group,
236 )
237 .await?;
238 sink.id = job_id.as_raw_id() as _;
239 let sink_model: sink::ActiveModel = sink.clone().into();
240 Sink::insert(sink_model).exec(&txn).await?;
241 }
242 StreamingJob::Table(src, table, _) => {
243 let job_id = Self::create_streaming_job_obj(
244 &txn,
245 ObjectType::Table,
246 table.owner as _,
247 Some(table.database_id),
248 Some(table.schema_id),
249 create_type,
250 ctx.timezone.clone(),
251 streaming_parallelism,
252 max_parallelism,
253 specific_resource_group,
254 )
255 .await?;
256 table.id = job_id.as_mv_table_id();
257 if let Some(src) = src {
258 let src_obj = Self::create_object(
259 &txn,
260 ObjectType::Source,
261 src.owner as _,
262 Some(src.database_id),
263 Some(src.schema_id),
264 )
265 .await?;
266 src.id = src_obj.oid as _;
267 src.optional_associated_table_id = Some(
268 PbOptionalAssociatedTableId::AssociatedTableId(job_id.as_raw_id() as _),
269 );
270 table.optional_associated_source_id = Some(
271 PbOptionalAssociatedSourceId::AssociatedSourceId(src_obj.oid as _),
272 );
273 let source: source::ActiveModel = src.clone().into();
274 Source::insert(source).exec(&txn).await?;
275 }
276 let table_model: table::ActiveModel = table.clone().into();
277 Table::insert(table_model).exec(&txn).await?;
278 }
279 StreamingJob::Index(index, table) => {
280 ensure_object_id(
281 ObjectType::Table,
282 index.primary_table_id.as_raw_id() as ObjectId,
283 &txn,
284 )
285 .await?;
286 let job_id = Self::create_streaming_job_obj(
287 &txn,
288 ObjectType::Index,
289 index.owner as _,
290 Some(index.database_id),
291 Some(index.schema_id),
292 create_type,
293 ctx.timezone.clone(),
294 streaming_parallelism,
295 max_parallelism,
296 specific_resource_group,
297 )
298 .await?;
299 index.id = job_id.as_raw_id();
301 index.index_table_id = job_id.as_mv_table_id();
302 table.id = job_id.as_mv_table_id();
303
304 ObjectDependency::insert(object_dependency::ActiveModel {
305 oid: Set(index.primary_table_id.as_raw_id() as ObjectId),
306 used_by: Set(table.id.as_raw_id() as ObjectId),
307 ..Default::default()
308 })
309 .exec(&txn)
310 .await?;
311
312 let table_model: table::ActiveModel = table.clone().into();
313 Table::insert(table_model).exec(&txn).await?;
314 let index_model: index::ActiveModel = index.clone().into();
315 Index::insert(index_model).exec(&txn).await?;
316 }
317 StreamingJob::Source(src) => {
318 let job_id = Self::create_streaming_job_obj(
319 &txn,
320 ObjectType::Source,
321 src.owner as _,
322 Some(src.database_id),
323 Some(src.schema_id),
324 create_type,
325 ctx.timezone.clone(),
326 streaming_parallelism,
327 max_parallelism,
328 specific_resource_group,
329 )
330 .await?;
331 src.id = job_id.as_raw_id();
332 let source_model: source::ActiveModel = src.clone().into();
333 Source::insert(source_model).exec(&txn).await?;
334 }
335 }
336
337 dependencies.extend(
339 streaming_job
340 .dependent_secret_ids()?
341 .into_iter()
342 .map(|secret_id| secret_id as ObjectId),
343 );
344 dependencies.extend(
346 streaming_job
347 .dependent_connection_ids()?
348 .into_iter()
349 .map(|conn_id| conn_id as ObjectId),
350 );
351
352 if !dependencies.is_empty() {
354 ObjectDependency::insert_many(dependencies.into_iter().map(|oid| {
355 object_dependency::ActiveModel {
356 oid: Set(oid),
357 used_by: Set(streaming_job.id().as_raw_id() as _),
358 ..Default::default()
359 }
360 }))
361 .exec(&txn)
362 .await?;
363 }
364
365 txn.commit().await?;
366
367 Ok(())
368 }
369
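    /// Allocate catalog ids for the internal (state) tables of a creating streaming job and
    /// insert them, returning a map from the placeholder ids in the plan to the newly
    /// allocated table ids.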
370 pub async fn create_internal_table_catalog(
378 &self,
379 job: &StreamingJob,
380 mut incomplete_internal_tables: Vec<PbTable>,
381 ) -> MetaResult<HashMap<TableId, u32>> {
382 let job_id = job.id();
383 let inner = self.inner.write().await;
384 let txn = inner.db.begin().await?;
385
386 ensure_job_not_canceled(job_id, &txn).await?;
388
389 let mut table_id_map = HashMap::new();
390 for table in &mut incomplete_internal_tables {
391 let table_id = Self::create_object(
392 &txn,
393 ObjectType::Table,
394 table.owner as _,
395 Some(table.database_id),
396 Some(table.schema_id),
397 )
398 .await?
399 .oid;
400 table_id_map.insert(table.id, table_id as u32);
401 table.id = (table_id as u32).into();
402 table.job_id = Some(job_id);
403
404 let table_model = table::ActiveModel {
405 table_id: Set((table_id as u32).into()),
406 belongs_to_job_id: Set(Some(job_id)),
407 fragment_id: NotSet,
408 ..table.clone().into()
409 };
410 Table::insert(table_model).exec(&txn).await?;
411 }
412 txn.commit().await?;
413
414 Ok(table_id_map)
415 }
416
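    /// Persist the fragments (and their downstream relations) of a creating streaming job.
    /// Thin wrapper around [`Self::prepare_streaming_job`].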
417 pub async fn prepare_stream_job_fragments(
418 &self,
419 stream_job_fragments: &StreamJobFragmentsToCreate,
420 streaming_job: &StreamingJob,
421 for_replace: bool,
422 ) -> MetaResult<()> {
423 self.prepare_streaming_job(
424 stream_job_fragments.stream_job_id(),
425 || stream_job_fragments.fragments.values(),
426 &stream_job_fragments.downstreams,
427 for_replace,
428 Some(streaming_job),
429 )
430 .await
431 }
432
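    /// Insert the fragment models and fragment relations of a job into the catalog, bind
    /// state tables to their fragments (when not replacing an existing job), and notify the
    /// frontend of objects that should become visible while the job is still creating.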
    #[await_tree::instrument(
        "prepare_streaming_job_for_{}",
        if for_replace { "replace" } else { "create" }
    )]
439 pub async fn prepare_streaming_job<'a, I: Iterator<Item = &'a crate::model::Fragment> + 'a>(
440 &self,
441 job_id: JobId,
442 get_fragments: impl Fn() -> I + 'a,
443 downstreams: &FragmentDownstreamRelation,
444 for_replace: bool,
445 creating_streaming_job: Option<&'a StreamingJob>,
446 ) -> MetaResult<()> {
447 let fragments = Self::prepare_fragment_models_from_fragments(job_id, get_fragments())?;
448
449 let inner = self.inner.write().await;
450
451 let need_notify = creating_streaming_job
452 .map(|job| job.should_notify_creating())
453 .unwrap_or(false);
454 let definition = creating_streaming_job.map(|job| job.definition());
455
456 let mut objects_to_notify = vec![];
457 let txn = inner.db.begin().await?;
458
459 ensure_job_not_canceled(job_id, &txn).await?;
461
462 for fragment in fragments {
463 let fragment_id = fragment.fragment_id;
464 let state_table_ids = fragment.state_table_ids.inner_ref().clone();
465
466 let fragment = fragment.into_active_model();
467 Fragment::insert(fragment).exec(&txn).await?;
468
469 if !for_replace {
472 let all_tables = StreamJobFragments::collect_tables(get_fragments());
473 for state_table_id in state_table_ids {
474 let table = all_tables
478 .get(&state_table_id)
479 .unwrap_or_else(|| panic!("table {} not found", state_table_id));
480 assert_eq!(table.id, state_table_id.as_raw_id());
481 assert_eq!(table.fragment_id, fragment_id as u32);
482 let vnode_count = table.vnode_count();
483
484 table::ActiveModel {
485 table_id: Set(state_table_id as _),
486 fragment_id: Set(Some(fragment_id)),
487 vnode_count: Set(vnode_count as _),
488 ..Default::default()
489 }
490 .update(&txn)
491 .await?;
492
493 if need_notify {
494 let mut table = table.clone();
495 if cfg!(not(debug_assertions)) && table.id == job_id.as_raw_id() {
497 table.definition = definition.clone().unwrap();
498 }
499 objects_to_notify.push(PbObject {
500 object_info: Some(PbObjectInfo::Table(table)),
501 });
502 }
503 }
504 }
505 }
506
507 if need_notify {
509 match creating_streaming_job.unwrap() {
510 StreamingJob::Sink(sink) => {
511 objects_to_notify.push(PbObject {
512 object_info: Some(PbObjectInfo::Sink(sink.clone())),
513 });
514 }
515 StreamingJob::Index(index, _) => {
516 objects_to_notify.push(PbObject {
517 object_info: Some(PbObjectInfo::Index(index.clone())),
518 });
519 }
520 _ => {}
521 }
522 }
523
524 insert_fragment_relations(&txn, downstreams).await?;
525
526 if !for_replace {
527 if let Some(StreamingJob::Table(_, table, _)) = creating_streaming_job {
529 Table::update(table::ActiveModel {
530 table_id: Set(table.id),
531 dml_fragment_id: Set(table.dml_fragment_id.map(|id| id as _)),
532 ..Default::default()
533 })
534 .exec(&txn)
535 .await?;
536 }
537 }
538
539 txn.commit().await?;
540
541 if !objects_to_notify.is_empty() {
545 self.notify_frontend(
546 Operation::Add,
547 Info::ObjectGroup(PbObjectGroup {
548 objects: objects_to_notify,
549 }),
550 )
551 .await;
552 }
553
554 Ok(())
555 }
556
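    /// Build the [`Command::DropStreamingJobs`] used to cancel a creating streaming job,
    /// including the sink-to-target-fragment mapping for sink-into-table jobs.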
557 pub async fn build_cancel_command(
560 &self,
561 table_fragments: &StreamJobFragments,
562 ) -> MetaResult<Command> {
563 let inner = self.inner.read().await;
564 let txn = inner.db.begin().await?;
565
566 let dropped_sink_fragment_with_target =
567 if let Some(sink_fragment) = table_fragments.sink_fragment() {
568 let sink_fragment_id = sink_fragment.fragment_id as FragmentId;
569 let sink_target_fragment = fetch_target_fragments(&txn, [sink_fragment_id]).await?;
570 sink_target_fragment
571 .get(&sink_fragment_id)
572 .map(|target_fragments| {
573 let target_fragment_id = *target_fragments
574 .first()
575 .expect("sink should have at least one downstream fragment");
576 (sink_fragment_id, target_fragment_id)
577 })
578 } else {
579 None
580 };
581
582 Ok(Command::DropStreamingJobs {
583 streaming_job_ids: HashSet::from_iter([table_fragments.stream_job_id()]),
584 actors: table_fragments.actor_ids(),
585 unregistered_state_table_ids: table_fragments.all_table_ids().collect(),
586 unregistered_fragment_ids: table_fragments.fragment_ids().collect(),
587 dropped_sink_fragment_by_targets: dropped_sink_fragment_with_target
588 .into_iter()
589 .map(|(sink, target)| (target as _, vec![sink as _]))
590 .collect(),
591 })
592 }
593
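    /// Try to abort a streaming job that is still being created. Returns whether the catalog
    /// was actually cleaned up, together with the job's database id. A background job that is
    /// still in creating status is kept unless it is explicitly cancelled.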
594 #[await_tree::instrument]
598 pub async fn try_abort_creating_streaming_job(
599 &self,
600 job_id: JobId,
601 is_cancelled: bool,
602 ) -> MetaResult<(bool, Option<DatabaseId>)> {
603 let mut inner = self.inner.write().await;
604 let txn = inner.db.begin().await?;
605
606 let obj = Object::find_by_id(job_id.as_raw_id() as ObjectId)
607 .one(&txn)
608 .await?;
609 let Some(obj) = obj else {
610 tracing::warn!(
611 id = %job_id,
612 "streaming job not found when aborting creating, might be cancelled already or cleaned by recovery"
613 );
614 return Ok((true, None));
615 };
616 let database_id = obj
617 .database_id
618 .ok_or_else(|| anyhow!("obj has no database id: {:?}", obj))?;
619 let streaming_job = streaming_job::Entity::find_by_id(job_id).one(&txn).await?;
620
621 if !is_cancelled && let Some(streaming_job) = &streaming_job {
622 assert_ne!(streaming_job.job_status, JobStatus::Created);
623 if streaming_job.create_type == CreateType::Background
624 && streaming_job.job_status == JobStatus::Creating
625 {
626 tracing::warn!(
628 id = %job_id,
629 "streaming job is created in background and still in creating status"
630 );
631 return Ok((false, Some(database_id)));
632 }
633 }
634
635 let internal_table_ids = get_internal_tables_by_id(job_id, &txn).await?;
636
637 let mut objs = vec![];
639 let table_obj = Table::find_by_id(job_id.as_mv_table_id()).one(&txn).await?;
640
641 let mut need_notify =
642 streaming_job.is_some_and(|job| job.create_type == CreateType::Background);
643 if !need_notify {
644 if let Some(table) = &table_obj {
646 need_notify = table.table_type == TableType::MaterializedView;
647 }
648 }
649
650 if is_cancelled {
651 let dropped_tables = Table::find()
652 .find_also_related(Object)
653 .filter(
654 table::Column::TableId.is_in(
655 internal_table_ids
656 .iter()
657 .cloned()
658 .chain(table_obj.iter().map(|t| t.table_id as _)),
659 ),
660 )
661 .all(&txn)
662 .await?
663 .into_iter()
664 .map(|(table, obj)| PbTable::from(ObjectModel(table, obj.unwrap())));
665 inner
666 .dropped_tables
667 .extend(dropped_tables.map(|t| (t.id, t)));
668 }
669
670 if need_notify {
671 let obj: Option<PartialObject> = Object::find_by_id(job_id.as_raw_id() as ObjectId)
672 .select_only()
673 .columns([
674 object::Column::Oid,
675 object::Column::ObjType,
676 object::Column::SchemaId,
677 object::Column::DatabaseId,
678 ])
679 .into_partial_model()
680 .one(&txn)
681 .await?;
682 let obj =
683 obj.ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;
684 objs.push(obj);
685 let internal_table_objs: Vec<PartialObject> = Object::find()
686 .select_only()
687 .columns([
688 object::Column::Oid,
689 object::Column::ObjType,
690 object::Column::SchemaId,
691 object::Column::DatabaseId,
692 ])
693 .join(JoinType::InnerJoin, object::Relation::Table.def())
694 .filter(table::Column::BelongsToJobId.eq(job_id))
695 .into_partial_model()
696 .all(&txn)
697 .await?;
698 objs.extend(internal_table_objs);
699 }
700
701 if table_obj.is_none()
703 && let Some(Some(target_table_id)) = Sink::find_by_id(job_id.as_raw_id() as ObjectId)
704 .select_only()
705 .column(sink::Column::TargetTable)
706 .into_tuple::<Option<TableId>>()
707 .one(&txn)
708 .await?
709 {
710 let tmp_id: Option<ObjectId> = ObjectDependency::find()
711 .select_only()
712 .column(object_dependency::Column::UsedBy)
713 .join(
714 JoinType::InnerJoin,
715 object_dependency::Relation::Object1.def(),
716 )
717 .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
718 .filter(
719 object_dependency::Column::Oid
720 .eq(target_table_id)
721 .and(object::Column::ObjType.eq(ObjectType::Table))
722 .and(streaming_job::Column::JobStatus.ne(JobStatus::Created)),
723 )
724 .into_tuple()
725 .one(&txn)
726 .await?;
727 if let Some(tmp_id) = tmp_id {
728 tracing::warn!(
729 id = tmp_id,
730 "aborting temp streaming job for sink into table"
731 );
732
733 Object::delete_by_id(tmp_id).exec(&txn).await?;
734 }
735 }
736
737 Object::delete_by_id(job_id.as_raw_id() as ObjectId)
738 .exec(&txn)
739 .await?;
740 if !internal_table_ids.is_empty() {
741 Object::delete_many()
742 .filter(object::Column::Oid.is_in(internal_table_ids))
743 .exec(&txn)
744 .await?;
745 }
746 if let Some(t) = &table_obj
747 && let Some(source_id) = t.optional_associated_source_id
748 {
749 Object::delete_by_id(source_id).exec(&txn).await?;
750 }
751
752 let err = if is_cancelled {
753 MetaError::cancelled(format!("streaming job {job_id} is cancelled"))
754 } else {
755 MetaError::catalog_id_not_found("stream job", format!("streaming job {job_id} failed"))
756 };
757 let abort_reason = format!("streaming job aborted {}", err.as_report());
758 for tx in inner
759 .creating_table_finish_notifier
760 .get_mut(&database_id)
761 .map(|creating_tables| creating_tables.remove(&job_id).into_iter())
762 .into_iter()
763 .flatten()
764 .flatten()
765 {
766 let _ = tx.send(Err(abort_reason.clone()));
767 }
768 txn.commit().await?;
769
770 if !objs.is_empty() {
771 self.notify_frontend(Operation::Delete, build_object_group_for_delete(objs))
774 .await;
775 }
776 Ok((true, Some(database_id)))
777 }
778
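    /// Called after the job's barrier is collected: persist the new fragment relations, move
    /// the job to `Creating` status, and record the initial source split assignment.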
779 #[await_tree::instrument]
780 pub async fn post_collect_job_fragments(
781 &self,
782 job_id: JobId,
783 upstream_fragment_new_downstreams: &FragmentDownstreamRelation,
784 new_sink_downstream: Option<FragmentDownstreamRelation>,
785 split_assignment: Option<&SplitAssignment>,
786 ) -> MetaResult<()> {
787 let inner = self.inner.write().await;
788 let txn = inner.db.begin().await?;
789
790 insert_fragment_relations(&txn, upstream_fragment_new_downstreams).await?;
791
792 if let Some(new_downstream) = new_sink_downstream {
793 insert_fragment_relations(&txn, &new_downstream).await?;
794 }
795
796 streaming_job::ActiveModel {
798 job_id: Set(job_id),
799 job_status: Set(JobStatus::Creating),
800 ..Default::default()
801 }
802 .update(&txn)
803 .await?;
804
805 if let Some(split_assignment) = split_assignment {
806 let fragment_splits = split_assignment
807 .iter()
808 .map(|(fragment_id, splits)| {
809 (
810 *fragment_id as _,
811 splits.values().flatten().cloned().collect_vec(),
812 )
813 })
814 .collect();
815
816 self.update_fragment_splits(&txn, &fragment_splits).await?;
817 }
818
819 txn.commit().await?;
820
821 Ok(())
822 }
823
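    /// Create a temporary streaming job object used to replace an existing job (e.g. for
    /// `ALTER`), inheriting the original max parallelism and timezone, and recording a
    /// dependency from the original job to the new object.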
824 pub async fn create_job_catalog_for_replace(
825 &self,
826 streaming_job: &StreamingJob,
827 ctx: Option<&StreamContext>,
828 specified_parallelism: Option<&NonZeroUsize>,
829 expected_original_max_parallelism: Option<usize>,
830 ) -> MetaResult<JobId> {
831 let id = streaming_job.id();
832 let inner = self.inner.write().await;
833 let txn = inner.db.begin().await?;
834
835 streaming_job.verify_version_for_replace(&txn).await?;
837 let referring_cnt = ObjectDependency::find()
839 .join(
840 JoinType::InnerJoin,
841 object_dependency::Relation::Object1.def(),
842 )
843 .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
844 .filter(
845 object_dependency::Column::Oid
846 .eq(id.as_raw_id() as ObjectId)
847 .and(object::Column::ObjType.eq(ObjectType::Table))
848 .and(streaming_job::Column::JobStatus.ne(JobStatus::Created)),
849 )
850 .count(&txn)
851 .await?;
852 if referring_cnt != 0 {
853 return Err(MetaError::permission_denied(
854 "job is being altered or referenced by some creating jobs",
855 ));
856 }
857
858 let (original_max_parallelism, original_timezone): (i32, Option<String>) =
860 StreamingJobModel::find_by_id(id)
861 .select_only()
862 .column(streaming_job::Column::MaxParallelism)
863 .column(streaming_job::Column::Timezone)
864 .into_tuple()
865 .one(&txn)
866 .await?
867 .ok_or_else(|| MetaError::catalog_id_not_found(streaming_job.job_type_str(), id))?;
868
869 if let Some(max_parallelism) = expected_original_max_parallelism
870 && original_max_parallelism != max_parallelism as i32
871 {
872 bail!(
875 "cannot use a different max parallelism \
876 when replacing streaming job, \
877 original: {}, new: {}",
878 original_max_parallelism,
879 max_parallelism
880 );
881 }
882
883 let parallelism = match specified_parallelism {
884 None => StreamingParallelism::Adaptive,
885 Some(n) => StreamingParallelism::Fixed(n.get() as _),
886 };
887 let timezone = ctx
888 .map(|ctx| ctx.timezone.clone())
889 .unwrap_or(original_timezone);
890
891 let new_obj_id = Self::create_streaming_job_obj(
893 &txn,
894 streaming_job.object_type(),
895 streaming_job.owner() as _,
896 Some(streaming_job.database_id() as _),
897 Some(streaming_job.schema_id() as _),
898 streaming_job.create_type(),
899 timezone,
900 parallelism,
901 original_max_parallelism as _,
902 None,
903 )
904 .await?;
905
906 ObjectDependency::insert(object_dependency::ActiveModel {
908 oid: Set(id.as_raw_id() as _),
909 used_by: Set(new_obj_id.as_raw_id() as _),
910 ..Default::default()
911 })
912 .exec(&txn)
913 .await?;
914
915 txn.commit().await?;
916
917 Ok(new_obj_id)
918 }
919
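    /// Mark a streaming job as `Created`: stamp its creation time, propagate privileges where
    /// needed (e.g. index state tables), notify the frontend with the finalized catalogs, and
    /// wake up any waiters registered for the job.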
920 pub async fn finish_streaming_job(&self, job_id: JobId) -> MetaResult<()> {
922 let mut inner = self.inner.write().await;
923 let txn = inner.db.begin().await?;
924
925 let job_type = Object::find_by_id(job_id.as_raw_id() as ObjectId)
926 .select_only()
927 .column(object::Column::ObjType)
928 .into_tuple()
929 .one(&txn)
930 .await?
931 .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;
932
933 let create_type: CreateType = StreamingJobModel::find_by_id(job_id)
934 .select_only()
935 .column(streaming_job::Column::CreateType)
936 .into_tuple()
937 .one(&txn)
938 .await?
939 .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;
940
941 let res = Object::update_many()
943 .col_expr(object::Column::CreatedAt, Expr::current_timestamp().into())
944 .col_expr(
945 object::Column::CreatedAtClusterVersion,
946 current_cluster_version().into(),
947 )
948 .filter(object::Column::Oid.eq(job_id))
949 .exec(&txn)
950 .await?;
951 if res.rows_affected == 0 {
952 return Err(MetaError::catalog_id_not_found("streaming job", job_id));
953 }
954
955 let job = streaming_job::ActiveModel {
957 job_id: Set(job_id),
958 job_status: Set(JobStatus::Created),
959 ..Default::default()
960 };
961 job.update(&txn).await?;
962
963 let internal_table_objs = Table::find()
965 .find_also_related(Object)
966 .filter(table::Column::BelongsToJobId.eq(job_id))
967 .all(&txn)
968 .await?;
969 let mut objects = internal_table_objs
970 .iter()
971 .map(|(table, obj)| PbObject {
972 object_info: Some(PbObjectInfo::Table(
973 ObjectModel(table.clone(), obj.clone().unwrap()).into(),
974 )),
975 })
976 .collect_vec();
977 let mut notification_op = if create_type == CreateType::Background {
978 NotificationOperation::Update
979 } else {
980 NotificationOperation::Add
981 };
982 let mut updated_user_info = vec![];
983
984 match job_type {
985 ObjectType::Table => {
986 let (table, obj) = Table::find_by_id(job_id.as_mv_table_id())
987 .find_also_related(Object)
988 .one(&txn)
989 .await?
990 .ok_or_else(|| MetaError::catalog_id_not_found("table", job_id))?;
991 if table.table_type == TableType::MaterializedView {
992 notification_op = NotificationOperation::Update;
993 }
994
995 if let Some(source_id) = table.optional_associated_source_id {
996 let (src, obj) = Source::find_by_id(source_id)
997 .find_also_related(Object)
998 .one(&txn)
999 .await?
1000 .ok_or_else(|| MetaError::catalog_id_not_found("source", source_id))?;
1001 objects.push(PbObject {
1002 object_info: Some(PbObjectInfo::Source(
1003 ObjectModel(src, obj.unwrap()).into(),
1004 )),
1005 });
1006 }
1007 objects.push(PbObject {
1008 object_info: Some(PbObjectInfo::Table(ObjectModel(table, obj.unwrap()).into())),
1009 });
1010 }
1011 ObjectType::Sink => {
1012 let (sink, obj) = Sink::find_by_id(job_id.as_raw_id() as ObjectId)
1013 .find_also_related(Object)
1014 .one(&txn)
1015 .await?
1016 .ok_or_else(|| MetaError::catalog_id_not_found("sink", job_id))?;
1017 objects.push(PbObject {
1018 object_info: Some(PbObjectInfo::Sink(ObjectModel(sink, obj.unwrap()).into())),
1019 });
1020 }
1021 ObjectType::Index => {
1022 let (index, obj) = Index::find_by_id(job_id.as_raw_id() as ObjectId)
1023 .find_also_related(Object)
1024 .one(&txn)
1025 .await?
1026 .ok_or_else(|| MetaError::catalog_id_not_found("index", job_id))?;
1027 {
1028 let (table, obj) = Table::find_by_id(index.index_table_id)
1029 .find_also_related(Object)
1030 .one(&txn)
1031 .await?
1032 .ok_or_else(|| {
1033 MetaError::catalog_id_not_found("table", index.index_table_id)
1034 })?;
1035 objects.push(PbObject {
1036 object_info: Some(PbObjectInfo::Table(
1037 ObjectModel(table, obj.unwrap()).into(),
1038 )),
1039 });
1040 }
1041
1042 let primary_table_privileges = UserPrivilege::find()
1045 .filter(
1046 user_privilege::Column::Oid
1047 .eq(index.primary_table_id)
1048 .and(user_privilege::Column::Action.eq(Action::Select)),
1049 )
1050 .all(&txn)
1051 .await?;
1052 if !primary_table_privileges.is_empty() {
1053 let index_state_table_ids: Vec<TableId> = Table::find()
1054 .select_only()
1055 .column(table::Column::TableId)
1056 .filter(
1057 table::Column::BelongsToJobId
1058 .eq(job_id)
1059 .or(table::Column::TableId.eq(index.index_table_id)),
1060 )
1061 .into_tuple()
1062 .all(&txn)
1063 .await?;
1064 let mut new_privileges = vec![];
1065 for privilege in &primary_table_privileges {
1066 for state_table_id in &index_state_table_ids {
1067 new_privileges.push(user_privilege::ActiveModel {
1068 id: Default::default(),
1069 oid: Set(state_table_id.as_raw_id() as ObjectId),
1070 user_id: Set(privilege.user_id),
1071 action: Set(Action::Select),
1072 dependent_id: Set(privilege.dependent_id),
1073 granted_by: Set(privilege.granted_by),
1074 with_grant_option: Set(privilege.with_grant_option),
1075 });
1076 }
1077 }
1078 UserPrivilege::insert_many(new_privileges)
1079 .exec(&txn)
1080 .await?;
1081
1082 updated_user_info = list_user_info_by_ids(
1083 primary_table_privileges.into_iter().map(|p| p.user_id),
1084 &txn,
1085 )
1086 .await?;
1087 }
1088
1089 objects.push(PbObject {
1090 object_info: Some(PbObjectInfo::Index(ObjectModel(index, obj.unwrap()).into())),
1091 });
1092 }
1093 ObjectType::Source => {
1094 let (source, obj) = Source::find_by_id(job_id.as_raw_id() as ObjectId)
1095 .find_also_related(Object)
1096 .one(&txn)
1097 .await?
1098 .ok_or_else(|| MetaError::catalog_id_not_found("source", job_id))?;
1099 objects.push(PbObject {
1100 object_info: Some(PbObjectInfo::Source(
1101 ObjectModel(source, obj.unwrap()).into(),
1102 )),
1103 });
1104 }
1105 _ => unreachable!("invalid job type: {:?}", job_type),
1106 }
1107
1108 if job_type != ObjectType::Index {
1109 updated_user_info =
1110 grant_default_privileges_automatically(&txn, job_id.as_raw_id() as ObjectId)
1111 .await?;
1112 }
1113 txn.commit().await?;
1114
1115 let mut version = self
1116 .notify_frontend(
1117 notification_op,
1118 NotificationInfo::ObjectGroup(PbObjectGroup { objects }),
1119 )
1120 .await;
1121
1122 if !updated_user_info.is_empty() {
1124 version = self.notify_users_update(updated_user_info).await;
1125 }
1126
1127 inner
1128 .creating_table_finish_notifier
1129 .values_mut()
1130 .for_each(|creating_tables| {
1131 if let Some(txs) = creating_tables.remove(&job_id) {
1132 for tx in txs {
1133 let _ = tx.send(Ok(version));
1134 }
1135 }
1136 });
1137
1138 Ok(())
1139 }
1140
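    /// Finish replacing a streaming job: apply the catalog changes in a transaction and send
    /// the corresponding update / delete notifications to the frontend.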
1141 pub async fn finish_replace_streaming_job(
1142 &self,
1143 tmp_id: JobId,
1144 streaming_job: StreamingJob,
1145 replace_upstream: FragmentReplaceUpstream,
1146 sink_into_table_context: SinkIntoTableContext,
1147 drop_table_connector_ctx: Option<&DropTableConnectorContext>,
1148 auto_refresh_schema_sinks: Option<Vec<FinishAutoRefreshSchemaSinkContext>>,
1149 ) -> MetaResult<NotificationVersion> {
1150 let inner = self.inner.write().await;
1151 let txn = inner.db.begin().await?;
1152
1153 let (objects, delete_notification_objs) = Self::finish_replace_streaming_job_inner(
1154 tmp_id,
1155 replace_upstream,
1156 sink_into_table_context,
1157 &txn,
1158 streaming_job,
1159 drop_table_connector_ctx,
1160 auto_refresh_schema_sinks,
1161 )
1162 .await?;
1163
1164 txn.commit().await?;
1165
1166 let mut version = self
1167 .notify_frontend(
1168 NotificationOperation::Update,
1169 NotificationInfo::ObjectGroup(PbObjectGroup { objects }),
1170 )
1171 .await;
1172
1173 if let Some((user_infos, to_drop_objects)) = delete_notification_objs {
1174 self.notify_users_update(user_infos).await;
1175 version = self
1176 .notify_frontend(
1177 NotificationOperation::Delete,
1178 build_object_group_for_delete(to_drop_objects),
1179 )
1180 .await;
1181 }
1182
1183 Ok(version)
1184 }
1185
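    /// Transactional body of [`Self::finish_replace_streaming_job`]: update the job catalog,
    /// swap the fragments of the temporary job into the original one, rewrite dependent index
    /// items, and collect the objects to notify.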
1186 pub async fn finish_replace_streaming_job_inner(
1187 tmp_id: JobId,
1188 replace_upstream: FragmentReplaceUpstream,
1189 SinkIntoTableContext {
1190 updated_sink_catalogs,
1191 }: SinkIntoTableContext,
1192 txn: &DatabaseTransaction,
1193 streaming_job: StreamingJob,
1194 drop_table_connector_ctx: Option<&DropTableConnectorContext>,
1195 auto_refresh_schema_sinks: Option<Vec<FinishAutoRefreshSchemaSinkContext>>,
1196 ) -> MetaResult<(Vec<PbObject>, Option<(Vec<PbUserInfo>, Vec<PartialObject>)>)> {
1197 let original_job_id = streaming_job.id();
1198 let job_type = streaming_job.job_type();
1199
1200 let mut index_item_rewriter = None;
1201
1202 match streaming_job {
1204 StreamingJob::Table(_source, table, _table_job_type) => {
1205 let original_column_catalogs =
1208 get_table_columns(txn, original_job_id.as_mv_table_id()).await?;
1209
1210 index_item_rewriter = Some({
1211 let original_columns = original_column_catalogs
1212 .to_protobuf()
1213 .into_iter()
1214 .map(|c| c.column_desc.unwrap())
1215 .collect_vec();
1216 let new_columns = table
1217 .columns
1218 .iter()
1219 .map(|c| c.column_desc.clone().unwrap())
1220 .collect_vec();
1221
1222 IndexItemRewriter {
1223 original_columns,
1224 new_columns,
1225 }
1226 });
1227
1228 for sink_id in updated_sink_catalogs {
1230 sink::ActiveModel {
1231 sink_id: Set(sink_id as _),
1232 original_target_columns: Set(Some(original_column_catalogs.clone())),
1233 ..Default::default()
1234 }
1235 .update(txn)
1236 .await?;
1237 }
1238 let mut table = table::ActiveModel::from(table);
1240 if let Some(drop_table_connector_ctx) = drop_table_connector_ctx
1241 && drop_table_connector_ctx.to_change_streaming_job_id == original_job_id
1242 {
1243 table.optional_associated_source_id = Set(None);
1245 }
1246
1247 table.update(txn).await?;
1248 }
1249 StreamingJob::Source(source) => {
1250 let source = source::ActiveModel::from(source);
1252 source.update(txn).await?;
1253 }
1254 StreamingJob::MaterializedView(table) => {
1255 let table = table::ActiveModel::from(table);
1257 table.update(txn).await?;
1258 }
1259 _ => unreachable!(
1260 "invalid streaming job type: {:?}",
1261 streaming_job.job_type_str()
1262 ),
1263 }
1264
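        /// Rebind state tables to the new fragments, replace the original job's fragments
        /// with those of the temporary job, rewrite upstream merge edges, and drop the
        /// temporary object.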
1265 async fn finish_fragments(
1266 txn: &DatabaseTransaction,
1267 tmp_id: JobId,
1268 original_job_id: JobId,
1269 replace_upstream: FragmentReplaceUpstream,
1270 ) -> MetaResult<()> {
1271 let fragment_info: Vec<(FragmentId, I32Array)> = Fragment::find()
1275 .select_only()
1276 .columns([
1277 fragment::Column::FragmentId,
1278 fragment::Column::StateTableIds,
1279 ])
1280 .filter(fragment::Column::JobId.eq(tmp_id))
1281 .into_tuple()
1282 .all(txn)
1283 .await?;
1284 for (fragment_id, state_table_ids) in fragment_info {
1285 for state_table_id in state_table_ids.into_inner() {
1286 let state_table_id = TableId::new(state_table_id as _);
1287 table::ActiveModel {
1288 table_id: Set(state_table_id),
1289 fragment_id: Set(Some(fragment_id)),
1290 ..Default::default()
1292 }
1293 .update(txn)
1294 .await?;
1295 }
1296 }
1297
1298 Fragment::delete_many()
1300 .filter(fragment::Column::JobId.eq(original_job_id))
1301 .exec(txn)
1302 .await?;
1303 Fragment::update_many()
1304 .col_expr(fragment::Column::JobId, SimpleExpr::from(original_job_id))
1305 .filter(fragment::Column::JobId.eq(tmp_id))
1306 .exec(txn)
1307 .await?;
1308
1309 for (fragment_id, fragment_replace_map) in replace_upstream {
1312 let (fragment_id, mut stream_node) =
1313 Fragment::find_by_id(fragment_id as FragmentId)
1314 .select_only()
1315 .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
1316 .into_tuple::<(FragmentId, StreamNode)>()
1317 .one(txn)
1318 .await?
1319 .map(|(id, node)| (id, node.to_protobuf()))
1320 .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
1321
1322 visit_stream_node_mut(&mut stream_node, |body| {
1323 if let PbNodeBody::Merge(m) = body
1324 && let Some(new_fragment_id) =
1325 fragment_replace_map.get(&m.upstream_fragment_id)
1326 {
1327 m.upstream_fragment_id = *new_fragment_id;
1328 }
1329 });
1330 fragment::ActiveModel {
1331 fragment_id: Set(fragment_id),
1332 stream_node: Set(StreamNode::from(&stream_node)),
1333 ..Default::default()
1334 }
1335 .update(txn)
1336 .await?;
1337 }
1338
1339 Object::delete_by_id(tmp_id.as_raw_id() as ObjectId)
1341 .exec(txn)
1342 .await?;
1343
1344 Ok(())
1345 }
1346
1347 finish_fragments(txn, tmp_id, original_job_id, replace_upstream).await?;
1348
1349 let mut objects = vec![];
1351 match job_type {
1352 StreamingJobType::Table(_) | StreamingJobType::MaterializedView => {
1353 let (table, table_obj) = Table::find_by_id(original_job_id.as_mv_table_id())
1354 .find_also_related(Object)
1355 .one(txn)
1356 .await?
1357 .ok_or_else(|| MetaError::catalog_id_not_found("object", original_job_id))?;
1358 objects.push(PbObject {
1359 object_info: Some(PbObjectInfo::Table(
1360 ObjectModel(table, table_obj.unwrap()).into(),
1361 )),
1362 })
1363 }
1364 StreamingJobType::Source => {
1365 let (source, source_obj) =
1366 Source::find_by_id(original_job_id.as_raw_id() as ObjectId)
1367 .find_also_related(Object)
1368 .one(txn)
1369 .await?
1370 .ok_or_else(|| {
1371 MetaError::catalog_id_not_found("object", original_job_id)
1372 })?;
1373 objects.push(PbObject {
1374 object_info: Some(PbObjectInfo::Source(
1375 ObjectModel(source, source_obj.unwrap()).into(),
1376 )),
1377 })
1378 }
1379 _ => unreachable!("invalid streaming job type for replace: {:?}", job_type),
1380 }
1381
1382 if let Some(expr_rewriter) = index_item_rewriter {
1383 let index_items: Vec<(IndexId, ExprNodeArray)> = Index::find()
1384 .select_only()
1385 .columns([index::Column::IndexId, index::Column::IndexItems])
1386 .filter(index::Column::PrimaryTableId.eq(original_job_id))
1387 .into_tuple()
1388 .all(txn)
1389 .await?;
1390 for (index_id, nodes) in index_items {
1391 let mut pb_nodes = nodes.to_protobuf();
1392 pb_nodes
1393 .iter_mut()
1394 .for_each(|x| expr_rewriter.rewrite_expr(x));
1395 let index = index::ActiveModel {
1396 index_id: Set(index_id),
1397 index_items: Set(pb_nodes.into()),
1398 ..Default::default()
1399 }
1400 .update(txn)
1401 .await?;
1402 let index_obj = index
1403 .find_related(Object)
1404 .one(txn)
1405 .await?
1406 .ok_or_else(|| MetaError::catalog_id_not_found("object", index.index_id))?;
1407 objects.push(PbObject {
1408 object_info: Some(PbObjectInfo::Index(ObjectModel(index, index_obj).into())),
1409 });
1410 }
1411 }
1412
1413 if let Some(sinks) = auto_refresh_schema_sinks {
1414 for finish_sink_context in sinks {
1415 finish_fragments(
1416 txn,
1417 finish_sink_context.tmp_sink_id,
1418 JobId::new(finish_sink_context.original_sink_id as _),
1419 Default::default(),
1420 )
1421 .await?;
1422 let (mut sink, sink_obj) = Sink::find_by_id(finish_sink_context.original_sink_id)
1423 .find_also_related(Object)
1424 .one(txn)
1425 .await?
1426 .ok_or_else(|| MetaError::catalog_id_not_found("sink", original_job_id))?;
1427 let columns = ColumnCatalogArray::from(finish_sink_context.columns);
1428 Sink::update(sink::ActiveModel {
1429 sink_id: Set(finish_sink_context.original_sink_id),
1430 columns: Set(columns.clone()),
1431 ..Default::default()
1432 })
1433 .exec(txn)
1434 .await?;
1435 sink.columns = columns;
1436 objects.push(PbObject {
1437 object_info: Some(PbObjectInfo::Sink(
1438 ObjectModel(sink, sink_obj.unwrap()).into(),
1439 )),
1440 });
1441 if let Some((log_store_table_id, new_log_store_table_columns)) =
1442 finish_sink_context.new_log_store_table
1443 {
1444 let new_log_store_table_columns: ColumnCatalogArray =
1445 new_log_store_table_columns.into();
1446 let (mut table, table_obj) = Table::find_by_id(log_store_table_id)
1447 .find_also_related(Object)
1448 .one(txn)
1449 .await?
1450 .ok_or_else(|| MetaError::catalog_id_not_found("table", original_job_id))?;
1451 Table::update(table::ActiveModel {
1452 table_id: Set(log_store_table_id),
1453 columns: Set(new_log_store_table_columns.clone()),
1454 ..Default::default()
1455 })
1456 .exec(txn)
1457 .await?;
1458 table.columns = new_log_store_table_columns;
1459 objects.push(PbObject {
1460 object_info: Some(PbObjectInfo::Table(
1461 ObjectModel(table, table_obj.unwrap()).into(),
1462 )),
1463 });
1464 }
1465 }
1466 }
1467
1468 let mut notification_objs: Option<(Vec<PbUserInfo>, Vec<PartialObject>)> = None;
1469 if let Some(drop_table_connector_ctx) = drop_table_connector_ctx {
1470 notification_objs =
1471 Some(Self::drop_table_associated_source(txn, drop_table_connector_ctx).await?);
1472 }
1473
1474 Ok((objects, notification_objs))
1475 }
1476
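    /// Abort a replace-job operation by dropping the temporary job object (and any temporary
    /// sink objects created for auto-refresh-schema sinks).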
1477 pub async fn try_abort_replacing_streaming_job(
1479 &self,
1480 tmp_job_id: JobId,
1481 tmp_sink_ids: Option<Vec<ObjectId>>,
1482 ) -> MetaResult<()> {
1483 let inner = self.inner.write().await;
1484 let txn = inner.db.begin().await?;
1485 Object::delete_by_id(tmp_job_id.as_raw_id() as ObjectId)
1486 .exec(&txn)
1487 .await?;
1488 if let Some(tmp_sink_ids) = tmp_sink_ids {
1489 for tmp_sink_id in tmp_sink_ids {
1490 Object::delete_by_id(tmp_sink_id).exec(&txn).await?;
1491 }
1492 }
1493 txn.commit().await?;
1494 Ok(())
1495 }
1496
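    /// Update the rate limit of a source and patch the source (and fs fetch) nodes of every
    /// fragment that reads from it, returning the affected fragments and their actors.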
1497 pub async fn update_source_rate_limit_by_source_id(
1500 &self,
1501 source_id: SourceId,
1502 rate_limit: Option<u32>,
1503 ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1504 let inner = self.inner.read().await;
1505 let txn = inner.db.begin().await?;
1506
1507 {
1508 let active_source = source::ActiveModel {
1509 source_id: Set(source_id),
1510 rate_limit: Set(rate_limit.map(|v| v as i32)),
1511 ..Default::default()
1512 };
1513 active_source.update(&txn).await?;
1514 }
1515
1516 let (source, obj) = Source::find_by_id(source_id)
1517 .find_also_related(Object)
1518 .one(&txn)
1519 .await?
1520 .ok_or_else(|| {
1521 MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
1522 })?;
1523
1524 let is_fs_source = source.with_properties.inner_ref().is_new_fs_connector();
1525 let streaming_job_ids: Vec<JobId> =
1526 if let Some(table_id) = source.optional_associated_table_id {
1527 vec![table_id.as_job_id()]
1528 } else if let Some(source_info) = &source.source_info
1529 && source_info.to_protobuf().is_shared()
1530 {
1531 vec![JobId::new(source_id as _)]
1532 } else {
1533 ObjectDependency::find()
1534 .select_only()
1535 .column(object_dependency::Column::UsedBy)
1536 .filter(object_dependency::Column::Oid.eq(source_id))
1537 .into_tuple()
1538 .all(&txn)
1539 .await?
1540 };
1541
1542 if streaming_job_ids.is_empty() {
1543 return Err(MetaError::invalid_parameter(format!(
1544 "source id {source_id} not used by any streaming job"
1545 )));
1546 }
1547
1548 let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
1549 .select_only()
1550 .columns([
1551 fragment::Column::FragmentId,
1552 fragment::Column::FragmentTypeMask,
1553 fragment::Column::StreamNode,
1554 ])
1555 .filter(fragment::Column::JobId.is_in(streaming_job_ids))
1556 .into_tuple()
1557 .all(&txn)
1558 .await?;
1559 let mut fragments = fragments
1560 .into_iter()
1561 .map(|(id, mask, stream_node)| {
1562 (
1563 id,
1564 FragmentTypeMask::from(mask as u32),
1565 stream_node.to_protobuf(),
1566 )
1567 })
1568 .collect_vec();
1569
1570 fragments.retain_mut(|(_, fragment_type_mask, stream_node)| {
1571 let mut found = false;
1572 if fragment_type_mask.contains(FragmentTypeFlag::Source) {
1573 visit_stream_node_mut(stream_node, |node| {
1574 if let PbNodeBody::Source(node) = node
1575 && let Some(node_inner) = &mut node.source_inner
1576 && node_inner.source_id == source_id as u32
1577 {
1578 node_inner.rate_limit = rate_limit;
1579 found = true;
1580 }
1581 });
1582 }
1583 if is_fs_source {
1584 visit_stream_node_mut(stream_node, |node| {
1587 if let PbNodeBody::StreamFsFetch(node) = node {
1588 fragment_type_mask.add(FragmentTypeFlag::FsFetch);
1589 if let Some(node_inner) = &mut node.node_inner
1590 && node_inner.source_id == source_id as u32
1591 {
1592 node_inner.rate_limit = rate_limit;
1593 found = true;
1594 }
1595 }
1596 });
1597 }
1598 found
1599 });
1600
1601 assert!(
1602 !fragments.is_empty(),
1603 "source id should be used by at least one fragment"
1604 );
1605 let fragment_ids = fragments.iter().map(|(id, _, _)| *id as u32).collect_vec();
1606
1607 for (id, fragment_type_mask, stream_node) in fragments {
1608 fragment::ActiveModel {
1609 fragment_id: Set(id),
1610 fragment_type_mask: Set(fragment_type_mask.into()),
1611 stream_node: Set(StreamNode::from(&stream_node)),
1612 ..Default::default()
1613 }
1614 .update(&txn)
1615 .await?;
1616 }
1617
1618 txn.commit().await?;
1619
1620 let fragment_actors = self.get_fragment_actors_from_running_info(fragment_ids.into_iter());
1621
1622 let relation_info = PbObjectInfo::Source(ObjectModel(source, obj.unwrap()).into());
1623 let _version = self
1624 .notify_frontend(
1625 NotificationOperation::Update,
1626 NotificationInfo::ObjectGroup(PbObjectGroup {
1627 objects: vec![PbObject {
1628 object_info: Some(relation_info),
1629 }],
1630 }),
1631 )
1632 .await;
1633
1634 Ok(fragment_actors)
1635 }
1636
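    /// Look up the running actors of the given fragments from the in-memory shared actor
    /// info.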
1637 fn get_fragment_actors_from_running_info(
1638 &self,
1639 fragment_ids: impl Iterator<Item = u32>,
1640 ) -> HashMap<FragmentId, Vec<ActorId>> {
1641 let mut fragment_actors: HashMap<FragmentId, Vec<ActorId>> = HashMap::new();
1642
1643 let info = self.env.shared_actor_infos().read_guard();
1644
1645 for fragment_id in fragment_ids {
1646 let SharedFragmentInfo { actors, .. } = info.get_fragment(fragment_id as _).unwrap();
1647 fragment_actors
1648 .entry(fragment_id as _)
1649 .or_default()
1650 .extend(actors.keys().map(|actor| *actor as i32));
1651 }
1652
1653 fragment_actors
1654 }
1655
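    /// Apply `fragments_mutation_fn` to all fragments of a job and persist the fragments it
    /// modifies, returning the affected fragments and their actors. Fails with `err_msg` if
    /// no fragment is modified.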
1656 pub async fn mutate_fragments_by_job_id(
1659 &self,
1660 job_id: JobId,
1661 mut fragments_mutation_fn: impl FnMut(FragmentTypeMask, &mut PbStreamNode) -> MetaResult<bool>,
1663 err_msg: &'static str,
1665 ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1666 let inner = self.inner.read().await;
1667 let txn = inner.db.begin().await?;
1668
1669 let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
1670 .select_only()
1671 .columns([
1672 fragment::Column::FragmentId,
1673 fragment::Column::FragmentTypeMask,
1674 fragment::Column::StreamNode,
1675 ])
1676 .filter(fragment::Column::JobId.eq(job_id))
1677 .into_tuple()
1678 .all(&txn)
1679 .await?;
1680 let mut fragments = fragments
1681 .into_iter()
1682 .map(|(id, mask, stream_node)| {
1683 (id, FragmentTypeMask::from(mask), stream_node.to_protobuf())
1684 })
1685 .collect_vec();
1686
1687 let fragments = fragments
1688 .iter_mut()
1689 .map(|(_, fragment_type_mask, stream_node)| {
1690 fragments_mutation_fn(*fragment_type_mask, stream_node)
1691 })
1692 .collect::<MetaResult<Vec<bool>>>()?
1693 .into_iter()
1694 .zip_eq_debug(std::mem::take(&mut fragments))
1695 .filter_map(|(keep, fragment)| if keep { Some(fragment) } else { None })
1696 .collect::<Vec<_>>();
1697
1698 if fragments.is_empty() {
1699 return Err(MetaError::invalid_parameter(format!(
1700 "job id {job_id}: {}",
1701 err_msg
1702 )));
1703 }
1704
1705 let fragment_ids: HashSet<u32> = fragments.iter().map(|(id, _, _)| *id as u32).collect();
1706 for (id, _, stream_node) in fragments {
1707 fragment::ActiveModel {
1708 fragment_id: Set(id),
1709 stream_node: Set(StreamNode::from(&stream_node)),
1710 ..Default::default()
1711 }
1712 .update(&txn)
1713 .await?;
1714 }
1715
1716 txn.commit().await?;
1717
1718 let fragment_actors =
1719 self.get_fragment_actors_from_running_info(fragment_ids.iter().copied());
1720
1721 Ok(fragment_actors)
1722 }
1723
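    /// Apply `fragment_mutation_fn` to a single fragment and persist the change, failing with
    /// `err_msg` if the mutation does not apply.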
1724 async fn mutate_fragment_by_fragment_id(
1725 &self,
1726 fragment_id: FragmentId,
1727 mut fragment_mutation_fn: impl FnMut(FragmentTypeMask, &mut PbStreamNode) -> bool,
1728 err_msg: &'static str,
1729 ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1730 let inner = self.inner.read().await;
1731 let txn = inner.db.begin().await?;
1732
1733 let (fragment_type_mask, stream_node): (i32, StreamNode) =
1734 Fragment::find_by_id(fragment_id)
1735 .select_only()
1736 .columns([
1737 fragment::Column::FragmentTypeMask,
1738 fragment::Column::StreamNode,
1739 ])
1740 .into_tuple()
1741 .one(&txn)
1742 .await?
1743 .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
1744 let mut pb_stream_node = stream_node.to_protobuf();
1745 let fragment_type_mask = FragmentTypeMask::from(fragment_type_mask);
1746
1747 if !fragment_mutation_fn(fragment_type_mask, &mut pb_stream_node) {
1748 return Err(MetaError::invalid_parameter(format!(
1749 "fragment id {fragment_id}: {}",
1750 err_msg
1751 )));
1752 }
1753
        // Persist the mutated stream node rather than the unmodified one read from the catalog.
        fragment::ActiveModel {
            fragment_id: Set(fragment_id),
            stream_node: Set(StreamNode::from(&pb_stream_node)),
            ..Default::default()
        }
        .update(&txn)
        .await?;
1761
1762 let fragment_actors =
1763 self.get_fragment_actors_from_running_info(std::iter::once(fragment_id as u32));
1764
1765 txn.commit().await?;
1766
1767 Ok(fragment_actors)
1768 }
1769
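    /// Update the backfill rate limit on all backfill-related nodes (stream scan, CDC scan,
    /// source backfill, sink) of the given job.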
1770 pub async fn update_backfill_rate_limit_by_job_id(
1773 &self,
1774 job_id: JobId,
1775 rate_limit: Option<u32>,
1776 ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1777 let update_backfill_rate_limit =
1778 |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
1779 let mut found = false;
1780 if fragment_type_mask
1781 .contains_any(FragmentTypeFlag::backfill_rate_limit_fragments())
1782 {
1783 visit_stream_node_mut(stream_node, |node| match node {
1784 PbNodeBody::StreamCdcScan(node) => {
1785 node.rate_limit = rate_limit;
1786 found = true;
1787 }
1788 PbNodeBody::StreamScan(node) => {
1789 node.rate_limit = rate_limit;
1790 found = true;
1791 }
1792 PbNodeBody::SourceBackfill(node) => {
1793 node.rate_limit = rate_limit;
1794 found = true;
1795 }
1796 PbNodeBody::Sink(node) => {
1797 node.rate_limit = rate_limit;
1798 found = true;
1799 }
1800 _ => {}
1801 });
1802 }
1803 Ok(found)
1804 };
1805
1806 self.mutate_fragments_by_job_id(
1807 job_id,
1808 update_backfill_rate_limit,
1809 "stream scan node or source node not found",
1810 )
1811 .await
1812 }
1813
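    /// Update the rate limit of a sink job; only supported when the sink uses the kv log
    /// store (i.e. created with `sink_decouple` enabled).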
1814 pub async fn update_sink_rate_limit_by_job_id(
1817 &self,
1818 sink_id: SinkId,
1819 rate_limit: Option<u32>,
1820 ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1821 let update_sink_rate_limit =
1822 |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
1823 let mut found = Ok(false);
1824 if fragment_type_mask.contains_any(FragmentTypeFlag::sink_rate_limit_fragments()) {
1825 visit_stream_node_mut(stream_node, |node| {
1826 if let PbNodeBody::Sink(node) = node {
1827 if node.log_store_type != PbSinkLogStoreType::KvLogStore as i32 {
1828 found = Err(MetaError::invalid_parameter(
1829 "sink rate limit is only supported for kv log store, please SET sink_decouple = TRUE before CREATE SINK",
1830 ));
1831 return;
1832 }
1833 node.rate_limit = rate_limit;
1834 found = Ok(true);
1835 }
1836 });
1837 }
1838 found
1839 };
1840
1841 self.mutate_fragments_by_job_id(
1842 JobId::new(sink_id as _),
1843 update_sink_rate_limit,
1844 "sink node not found",
1845 )
1846 .await
1847 }
1848
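    /// Update the rate limit of the DML nodes of the given job.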
1849 pub async fn update_dml_rate_limit_by_job_id(
1850 &self,
1851 job_id: JobId,
1852 rate_limit: Option<u32>,
1853 ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1854 let update_dml_rate_limit =
1855 |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
1856 let mut found = false;
1857 if fragment_type_mask.contains_any(FragmentTypeFlag::dml_rate_limit_fragments()) {
1858 visit_stream_node_mut(stream_node, |node| {
1859 if let PbNodeBody::Dml(node) = node {
1860 node.rate_limit = rate_limit;
1861 found = true;
1862 }
1863 });
1864 }
1865 Ok(found)
1866 };
1867
1868 self.mutate_fragments_by_job_id(job_id, update_dml_rate_limit, "dml node not found")
1869 .await
1870 }
1871
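    /// Alter the connector properties and secret refs of a source: validate the new
    /// properties, rewrite the stored `CREATE` definition, update secret dependencies, patch
    /// the source nodes of affected fragments, and notify the frontend.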
1872 pub async fn update_source_props_by_source_id(
1873 &self,
1874 source_id: SourceId,
1875 alter_props: BTreeMap<String, String>,
1876 alter_secret_refs: BTreeMap<String, PbSecretRef>,
1877 ) -> MetaResult<WithOptionsSecResolved> {
1878 let inner = self.inner.read().await;
1879 let txn = inner.db.begin().await?;
1880
1881 let (source, _obj) = Source::find_by_id(source_id)
1882 .find_also_related(Object)
1883 .one(&txn)
1884 .await?
1885 .ok_or_else(|| {
1886 MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
1887 })?;
1888 let connector = source.with_properties.0.get_connector().unwrap();
1889 let is_shared_source = source.is_shared();
1890
1891 let mut dep_source_job_ids: Vec<JobId> = Vec::new();
1892 if !is_shared_source {
1893 dep_source_job_ids = ObjectDependency::find()
1895 .select_only()
1896 .column(object_dependency::Column::UsedBy)
1897 .filter(object_dependency::Column::Oid.eq(source_id))
1898 .into_tuple()
1899 .all(&txn)
1900 .await?;
1901 }
1902
1903 let prop_keys: Vec<String> = alter_props
1905 .keys()
1906 .chain(alter_secret_refs.keys())
1907 .cloned()
1908 .collect();
1909 risingwave_connector::allow_alter_on_fly_fields::check_source_allow_alter_on_fly_fields(
1910 &connector, &prop_keys,
1911 )?;
1912
1913 let mut options_with_secret = WithOptionsSecResolved::new(
1914 source.with_properties.0.clone(),
1915 source
1916 .secret_ref
1917 .map(|secret_ref| secret_ref.to_protobuf())
1918 .unwrap_or_default(),
1919 );
1920 let (to_add_secret_dep, to_remove_secret_dep) =
1921 options_with_secret.handle_update(alter_props, alter_secret_refs)?;
1922
1923 tracing::info!(
1924 "applying new properties to source: source_id={}, options_with_secret={:?}",
1925 source_id,
1926 options_with_secret
1927 );
1928 let _ = ConnectorProperties::extract(options_with_secret.clone(), true)?;
1930 let mut associate_table_id = None;
1933
1934 let mut preferred_id: i32 = source_id;
1938 let rewrite_sql = {
1939 let definition = source.definition.clone();
1940
1941 let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
1942 .map_err(|e| {
1943 MetaError::from(MetaErrorInner::Connector(ConnectorError::from(
1944 anyhow!(e).context("Failed to parse source definition SQL"),
1945 )))
1946 })?
1947 .try_into()
1948 .unwrap();
1949
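            /// Render resolved `WITH` options back into `SqlOption`s, re-expanding secret
            /// references into `SECRET <name>` by looking up the secret catalog.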
1950 async fn format_with_option_secret_resolved(
1964 txn: &DatabaseTransaction,
1965 options_with_secret: &WithOptionsSecResolved,
1966 ) -> MetaResult<Vec<SqlOption>> {
1967 let mut options = Vec::new();
1968 for (k, v) in options_with_secret.as_plaintext() {
1969 let sql_option = SqlOption::try_from((k, &format!("'{}'", v)))
1970 .map_err(|e| MetaError::invalid_parameter(e.to_report_string()))?;
1971 options.push(sql_option);
1972 }
1973 for (k, v) in options_with_secret.as_secret() {
1974 if let Some(secret_model) =
1975 Secret::find_by_id(v.secret_id as i32).one(txn).await?
1976 {
1977 let sql_option =
1978 SqlOption::try_from((k, &format!("SECRET {}", secret_model.name)))
1979 .map_err(|e| MetaError::invalid_parameter(e.to_report_string()))?;
1980 options.push(sql_option);
1981 } else {
1982 return Err(MetaError::catalog_id_not_found("secret", v.secret_id));
1983 }
1984 }
1985 Ok(options)
1986 }
1987
1988 match &mut stmt {
1989 Statement::CreateSource { stmt } => {
1990 stmt.with_properties.0 =
1991 format_with_option_secret_resolved(&txn, &options_with_secret).await?;
1992 }
1993 Statement::CreateTable { with_options, .. } => {
1994 *with_options =
1995 format_with_option_secret_resolved(&txn, &options_with_secret).await?;
1996 associate_table_id = source.optional_associated_table_id;
1997 preferred_id = associate_table_id.unwrap().as_raw_id() as _;
1998 }
1999 _ => unreachable!(),
2000 }
2001
2002 stmt.to_string()
2003 };
2004
2005 {
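// Keep secret object dependencies in sync with the new secret references.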
2006 if !to_add_secret_dep.is_empty() {
2008 ObjectDependency::insert_many(to_add_secret_dep.into_iter().map(|secret_id| {
2009 object_dependency::ActiveModel {
2010 oid: Set(secret_id as _),
2011 used_by: Set(preferred_id as _),
2012 ..Default::default()
2013 }
2014 }))
2015 .exec(&txn)
2016 .await?;
2017 }
2018 if !to_remove_secret_dep.is_empty() {
2019 let _ = ObjectDependency::delete_many()
2021 .filter(
2022 object_dependency::Column::Oid
2023 .is_in(to_remove_secret_dep)
2024 .and(
2025 object_dependency::Column::UsedBy.eq::<ObjectId>(preferred_id as _),
2026 ),
2027 )
2028 .exec(&txn)
2029 .await?;
2030 }
2031 }
2032
2033 let active_source_model = source::ActiveModel {
2034 source_id: Set(source_id),
2035 definition: Set(rewrite_sql.clone()),
2036 with_properties: Set(options_with_secret.as_plaintext().clone().into()),
2037 secret_ref: Set((!options_with_secret.as_secret().is_empty())
2038 .then(|| SecretRef::from(options_with_secret.as_secret().clone()))),
2039 ..Default::default()
2040 };
2041 active_source_model.update(&txn).await?;
2042
2043 if let Some(associate_table_id) = associate_table_id {
2044 let active_table_model = table::ActiveModel {
2046 table_id: Set(associate_table_id),
2047 definition: Set(rewrite_sql),
2048 ..Default::default()
2049 };
2050 active_table_model.update(&txn).await?;
2051 }
2052
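// Jobs whose fragments need rewriting: the associated table job (or the source job
// itself), plus any jobs that depend on a non-shared source.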
2053 let to_check_job_ids = vec![if let Some(associate_table_id) = associate_table_id {
2054 associate_table_id.as_job_id()
2056 } else {
2057 JobId::new(source_id as _)
2058 }]
2059 .into_iter()
2060 .chain(dep_source_job_ids.into_iter())
2061 .collect_vec();
2062
2063 update_connector_props_fragments(
2065 &txn,
2066 to_check_job_ids,
2067 FragmentTypeFlag::Source,
2068 |node, found| {
2069 if let PbNodeBody::Source(node) = node
2070 && let Some(source_inner) = &mut node.source_inner
2071 {
2072 source_inner.with_properties = options_with_secret.as_plaintext().clone();
2073 source_inner.secret_refs = options_with_secret.as_secret().clone();
2074 *found = true;
2075 }
2076 },
2077 is_shared_source,
2078 )
2079 .await?;
2080
2081 let mut to_update_objs = Vec::with_capacity(2);
2082 let (source, obj) = Source::find_by_id(source_id)
2083 .find_also_related(Object)
2084 .one(&txn)
2085 .await?
2086 .ok_or_else(|| {
2087 MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
2088 })?;
2089 to_update_objs.push(PbObject {
2090 object_info: Some(PbObjectInfo::Source(
2091 ObjectModel(source, obj.unwrap()).into(),
2092 )),
2093 });
2094
2095 if let Some(associate_table_id) = associate_table_id {
2096 let (table, obj) = Table::find_by_id(associate_table_id)
2097 .find_also_related(Object)
2098 .one(&txn)
2099 .await?
2100 .ok_or_else(|| MetaError::catalog_id_not_found("table", associate_table_id))?;
2101 to_update_objs.push(PbObject {
2102 object_info: Some(PbObjectInfo::Table(ObjectModel(table, obj.unwrap()).into())),
2103 });
2104 }
2105
2106 txn.commit().await?;
2107
2108 self.notify_frontend(
2109 NotificationOperation::Update,
2110 NotificationInfo::ObjectGroup(PbObjectGroup {
2111 objects: to_update_objs,
2112 }),
2113 )
2114 .await;
2115
2116 Ok(options_with_secret)
2117 }
2118
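/// Applies altered properties to a sink.
///
/// Validates that the fields can be altered on the fly for the sink's connector,
/// rewrites the stored `CREATE SINK` definition, persists the merged properties,
/// updates the sink descriptors in the sink's fragments, and notifies frontends of
/// the updated sink object. Returns the applied properties.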
2119 pub async fn update_sink_props_by_sink_id(
2120 &self,
2121 sink_id: SinkId,
2122 props: BTreeMap<String, String>,
2123 ) -> MetaResult<HashMap<String, String>> {
2124 let inner = self.inner.read().await;
2125 let txn = inner.db.begin().await?;
2126
2127 let (sink, _obj) = Sink::find_by_id(sink_id)
2128 .find_also_related(Object)
2129 .one(&txn)
2130 .await?
2131 .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
2132 validate_sink_props(&sink, &props)?;
2133 let definition = sink.definition.clone();
2134 let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
2135 .map_err(|e| SinkError::Config(anyhow!(e)))?
2136 .try_into()
2137 .unwrap();
2138 if let Statement::CreateSink { stmt } = &mut stmt {
2139 update_stmt_with_props(&mut stmt.with_properties.0, &props)?;
2140 } else {
2141 panic!("definition is not a create sink statement")
2142 }
2143 let mut new_config = sink.properties.clone().into_inner();
2144 new_config.extend(props.clone());
2145
2146 let definition = stmt.to_string();
2147 let active_sink = sink::ActiveModel {
2148 sink_id: Set(sink_id),
2149 properties: Set(risingwave_meta_model::Property(new_config.clone())),
2150 definition: Set(definition),
2151 ..Default::default()
2152 };
2153 active_sink.update(&txn).await?;
2154
2155 update_sink_fragment_props(&txn, sink_id, new_config).await?;
2156 let (sink, obj) = Sink::find_by_id(sink_id)
2157 .find_also_related(Object)
2158 .one(&txn)
2159 .await?
2160 .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
2161 txn.commit().await?;
2162 let relation_infos = vec![PbObject {
2163 object_info: Some(PbObjectInfo::Sink(ObjectModel(sink, obj.unwrap()).into())),
2164 }];
2165
2166 let _version = self
2167 .notify_frontend(
2168 NotificationOperation::Update,
2169 NotificationInfo::ObjectGroup(PbObjectGroup {
2170 objects: relation_infos,
2171 }),
2172 )
2173 .await;
2174
2175 Ok(props.into_iter().collect())
2176 }
2177
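/// Applies altered properties to a table using the iceberg engine.
///
/// The table shares one definition with its paired sink and source (identified by
/// `AlterIcebergTableIds`), so the rewritten `CREATE TABLE` statement and merged
/// properties are persisted to all three catalog entries, the sink fragments are
/// patched, and frontends are notified for the sink, source, and table objects.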
2178 pub async fn update_iceberg_table_props_by_table_id(
2179 &self,
2180 table_id: TableId,
2181 props: BTreeMap<String, String>,
2182 alter_iceberg_table_props: Option<
2183 risingwave_pb::meta::alter_connector_props_request::PbExtraOptions,
2184 >,
2185 ) -> MetaResult<(HashMap<String, String>, u32)> {
let risingwave_pb::meta::alter_connector_props_request::PbExtraOptions::AlterIcebergTableIds(
    AlterIcebergTableIds { sink_id, source_id },
) = alter_iceberg_table_props
    .ok_or_else(|| MetaError::invalid_parameter("alter_iceberg_table_props is required"))?;
2188 let inner = self.inner.read().await;
2189 let txn = inner.db.begin().await?;
2190
2191 let (sink, _obj) = Sink::find_by_id(sink_id)
2192 .find_also_related(Object)
2193 .one(&txn)
2194 .await?
2195 .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
2196 validate_sink_props(&sink, &props)?;
2197
2198 let definition = sink.definition.clone();
2199 let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
2200 .map_err(|e| SinkError::Config(anyhow!(e)))?
2201 .try_into()
2202 .unwrap();
2203 if let Statement::CreateTable {
2204 with_options,
2205 engine,
2206 ..
2207 } = &mut stmt
2208 {
2209 if !matches!(engine, Engine::Iceberg) {
2210 return Err(SinkError::Config(anyhow!(
"only an iceberg table can be altered as a sink"
2212 ))
2213 .into());
2214 }
2215 update_stmt_with_props(with_options, &props)?;
2216 } else {
2217 panic!("definition is not a create iceberg table statement")
2218 }
2219 let mut new_config = sink.properties.clone().into_inner();
2220 new_config.extend(props.clone());
2221
2222 let definition = stmt.to_string();
2223 let active_sink = sink::ActiveModel {
2224 sink_id: Set(sink_id),
2225 properties: Set(risingwave_meta_model::Property(new_config.clone())),
2226 definition: Set(definition.clone()),
2227 ..Default::default()
2228 };
2229 let active_source = source::ActiveModel {
2230 source_id: Set(source_id),
2231 definition: Set(definition.clone()),
2232 ..Default::default()
2233 };
2234 let active_table = table::ActiveModel {
2235 table_id: Set(table_id),
2236 definition: Set(definition),
2237 ..Default::default()
2238 };
2239 active_sink.update(&txn).await?;
2240 active_source.update(&txn).await?;
2241 active_table.update(&txn).await?;
2242
2243 update_sink_fragment_props(&txn, sink_id, new_config).await?;
2244
2245 let (sink, sink_obj) = Sink::find_by_id(sink_id)
2246 .find_also_related(Object)
2247 .one(&txn)
2248 .await?
2249 .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
2250 let (source, source_obj) = Source::find_by_id(source_id)
2251 .find_also_related(Object)
2252 .one(&txn)
2253 .await?
2254 .ok_or_else(|| {
2255 MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
2256 })?;
2257 let (table, table_obj) = Table::find_by_id(table_id)
2258 .find_also_related(Object)
2259 .one(&txn)
2260 .await?
2261 .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Table.as_str(), table_id))?;
2262 txn.commit().await?;
2263 let relation_infos = vec![
2264 PbObject {
2265 object_info: Some(PbObjectInfo::Sink(
2266 ObjectModel(sink, sink_obj.unwrap()).into(),
2267 )),
2268 },
2269 PbObject {
2270 object_info: Some(PbObjectInfo::Source(
2271 ObjectModel(source, source_obj.unwrap()).into(),
2272 )),
2273 },
2274 PbObject {
2275 object_info: Some(PbObjectInfo::Table(
2276 ObjectModel(table, table_obj.unwrap()).into(),
2277 )),
2278 },
2279 ];
2280 let _version = self
2281 .notify_frontend(
2282 NotificationOperation::Update,
2283 NotificationInfo::ObjectGroup(PbObjectGroup {
2284 objects: relation_infos,
2285 }),
2286 )
2287 .await;
2288
2289 Ok((props.into_iter().collect(), sink_id as u32))
2290 }
2291
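/// Updates the rate limit of a single fragment, writing the new value into any DML,
/// Sink, StreamScan, StreamCdcScan, or SourceBackfill node in its stream node.
/// Returns the actors of the updated fragment, keyed by fragment id.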
2292 pub async fn update_fragment_rate_limit_by_fragment_id(
2293 &self,
2294 fragment_id: FragmentId,
2295 rate_limit: Option<u32>,
2296 ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
2297 let update_rate_limit = |fragment_type_mask: FragmentTypeMask,
2298 stream_node: &mut PbStreamNode| {
2299 let mut found = false;
2300 if fragment_type_mask.contains_any(
2301 FragmentTypeFlag::dml_rate_limit_fragments()
2302 .chain(FragmentTypeFlag::sink_rate_limit_fragments())
2303 .chain(FragmentTypeFlag::backfill_rate_limit_fragments())
2304 .chain(FragmentTypeFlag::source_rate_limit_fragments()),
2305 ) {
2306 visit_stream_node_mut(stream_node, |node| {
2307 if let PbNodeBody::Dml(node) = node {
2308 node.rate_limit = rate_limit;
2309 found = true;
2310 }
2311 if let PbNodeBody::Sink(node) = node {
2312 node.rate_limit = rate_limit;
2313 found = true;
2314 }
2315 if let PbNodeBody::StreamCdcScan(node) = node {
2316 node.rate_limit = rate_limit;
2317 found = true;
2318 }
2319 if let PbNodeBody::StreamScan(node) = node {
2320 node.rate_limit = rate_limit;
2321 found = true;
2322 }
2323 if let PbNodeBody::SourceBackfill(node) = node {
2324 node.rate_limit = rate_limit;
2325 found = true;
2326 }
2327 });
2328 }
2329 found
2330 };
2331 self.mutate_fragment_by_fragment_id(fragment_id, update_rate_limit, "fragment not found")
2332 .await
2333 }
2334
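/// Lists the rate limits currently configured in stream nodes, scanning every fragment
/// whose type mask intersects a rate-limit-capable fragment type.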
2335 pub async fn list_rate_limits(&self) -> MetaResult<Vec<RateLimitInfo>> {
2338 let inner = self.inner.read().await;
2339 let txn = inner.db.begin().await?;
2340
2341 let fragments: Vec<(FragmentId, JobId, i32, StreamNode)> = Fragment::find()
2342 .select_only()
2343 .columns([
2344 fragment::Column::FragmentId,
2345 fragment::Column::JobId,
2346 fragment::Column::FragmentTypeMask,
2347 fragment::Column::StreamNode,
2348 ])
2349 .filter(FragmentTypeMask::intersects_any(
2350 FragmentTypeFlag::rate_limit_fragments(),
2351 ))
2352 .into_tuple()
2353 .all(&txn)
2354 .await?;
2355
2356 let mut rate_limits = Vec::new();
2357 for (fragment_id, job_id, fragment_type_mask, stream_node) in fragments {
2358 let stream_node = stream_node.to_protobuf();
2359 visit_stream_node_body(&stream_node, |node| {
2360 let mut rate_limit = None;
2361 let mut node_name = None;
2362
2363 match node {
2364 PbNodeBody::Source(node) => {
2366 if let Some(node_inner) = &node.source_inner {
2367 rate_limit = node_inner.rate_limit;
2368 node_name = Some("SOURCE");
2369 }
2370 }
2371 PbNodeBody::StreamFsFetch(node) => {
2372 if let Some(node_inner) = &node.node_inner {
2373 rate_limit = node_inner.rate_limit;
2374 node_name = Some("FS_FETCH");
2375 }
2376 }
2377 PbNodeBody::SourceBackfill(node) => {
2379 rate_limit = node.rate_limit;
2380 node_name = Some("SOURCE_BACKFILL");
2381 }
2382 PbNodeBody::StreamScan(node) => {
2383 rate_limit = node.rate_limit;
2384 node_name = Some("STREAM_SCAN");
2385 }
2386 PbNodeBody::StreamCdcScan(node) => {
2387 rate_limit = node.rate_limit;
2388 node_name = Some("STREAM_CDC_SCAN");
2389 }
2390 PbNodeBody::Sink(node) => {
2391 rate_limit = node.rate_limit;
2392 node_name = Some("SINK");
2393 }
2394 _ => {}
2395 }
2396
2397 if let Some(rate_limit) = rate_limit {
2398 rate_limits.push(RateLimitInfo {
2399 fragment_id: fragment_id as u32,
2400 job_id,
2401 fragment_type_mask: fragment_type_mask as u32,
2402 rate_limit,
2403 node_name: node_name.unwrap().to_owned(),
2404 });
2405 }
2406 });
2407 }
2408
2409 Ok(rate_limits)
2410 }
2411}
2412
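/// Checks that the given properties are allowed to be altered on the fly for the
/// sink's connector and that the merged configuration still validates for that
/// connector type.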
2413fn validate_sink_props(sink: &sink::Model, props: &BTreeMap<String, String>) -> MetaResult<()> {
2414 match sink.properties.inner_ref().get(CONNECTOR_TYPE_KEY) {
2416 Some(connector) => {
2417 let connector_type = connector.to_lowercase();
2418 let field_names: Vec<String> = props.keys().cloned().collect();
2419 check_sink_allow_alter_on_fly_fields(&connector_type, &field_names)
2420 .map_err(|e| SinkError::Config(anyhow!(e)))?;
2421
2422 match_sink_name_str!(
2423 connector_type.as_str(),
2424 SinkType,
2425 {
2426 let mut new_props = sink.properties.0.clone();
2427 new_props.extend(props.clone());
2428 SinkType::validate_alter_config(&new_props)
2429 },
2430 |sink: &str| Err(SinkError::Config(anyhow!("unsupported sink type {}", sink)))
2431 )?
2432 }
2433 None => {
2434 return Err(
SinkError::Config(anyhow!("connector not specified when altering sink")).into(),
2436 );
2437 }
2438 };
2439 Ok(())
2440}
2441
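/// Merges the altered properties into a statement's `WITH (...)` options, overriding
/// options with the same name in place and appending new ones at the end.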
2442fn update_stmt_with_props(
2443 with_properties: &mut Vec<SqlOption>,
2444 props: &BTreeMap<String, String>,
2445) -> MetaResult<()> {
2446 let mut new_sql_options = with_properties
2447 .iter()
2448 .map(|sql_option| (&sql_option.name, sql_option))
2449 .collect::<IndexMap<_, _>>();
2450 let add_sql_options = props
2451 .iter()
2452 .map(|(k, v)| SqlOption::try_from((k, v)))
2453 .collect::<Result<Vec<SqlOption>, ParserError>>()
2454 .map_err(|e| SinkError::Config(anyhow!(e)))?;
2455 new_sql_options.extend(
2456 add_sql_options
2457 .iter()
2458 .map(|sql_option| (&sql_option.name, sql_option)),
2459 );
2460 *with_properties = new_sql_options.into_values().cloned().collect();
2461 Ok(())
2462}
2463
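/// Rewrites the sink descriptors embedded in the sink job's `Sink` fragments so that
/// their properties match the newly persisted configuration.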
2464async fn update_sink_fragment_props(
2465 txn: &DatabaseTransaction,
2466 sink_id: SinkId,
2467 props: BTreeMap<String, String>,
2468) -> MetaResult<()> {
2469 let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
2470 .select_only()
2471 .columns([
2472 fragment::Column::FragmentId,
2473 fragment::Column::FragmentTypeMask,
2474 fragment::Column::StreamNode,
2475 ])
2476 .filter(fragment::Column::JobId.eq(sink_id))
2477 .into_tuple()
2478 .all(txn)
2479 .await?;
2480 let fragments = fragments
2481 .into_iter()
2482 .filter(|(_, fragment_type_mask, _)| {
2483 *fragment_type_mask & FragmentTypeFlag::Sink as i32 != 0
2484 })
2485 .filter_map(|(id, _, stream_node)| {
2486 let mut stream_node = stream_node.to_protobuf();
2487 let mut found = false;
2488 visit_stream_node_mut(&mut stream_node, |node| {
2489 if let PbNodeBody::Sink(node) = node
2490 && let Some(sink_desc) = &mut node.sink_desc
2491 && sink_desc.id == sink_id as u32
2492 {
2493 sink_desc.properties.extend(props.clone());
2494 found = true;
2495 }
2496 });
2497 if found { Some((id, stream_node)) } else { None }
2498 })
2499 .collect_vec();
2500 assert!(
2501 !fragments.is_empty(),
2502 "sink id should be used by at least one fragment"
2503 );
2504 for (id, stream_node) in fragments {
2505 fragment::ActiveModel {
2506 fragment_id: Set(id),
2507 stream_node: Set(StreamNode::from(&stream_node)),
2508 ..Default::default()
2509 }
2510 .update(txn)
2511 .await?;
2512 }
2513 Ok(())
2514}
2515
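/// Context describing a sink-into-table change; `updated_sink_catalogs` holds the
/// sinks whose catalogs were updated as part of the operation.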
2516pub struct SinkIntoTableContext {
2517 pub updated_sink_catalogs: Vec<SinkId>,
2520}
2521
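/// Context for finishing an auto-refresh-schema sink replacement: the temporary sink
/// job, the original sink it replaces, the new column catalog, and (if any) the new
/// log store table with its columns.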
2522pub struct FinishAutoRefreshSchemaSinkContext {
2523 pub tmp_sink_id: JobId,
2524 pub original_sink_id: ObjectId,
2525 pub columns: Vec<PbColumnCatalog>,
2526 pub new_log_store_table: Option<(TableId, Vec<PbColumnCatalog>)>,
2527}
2528
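/// Applies `alter_stream_node_fn` to the stream nodes of all fragments that belong to
/// `job_ids` and carry `expect_flag`, persisting every fragment the closure marks as
/// changed via its `found` flag.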
2529async fn update_connector_props_fragments<F>(
2530 txn: &DatabaseTransaction,
2531 job_ids: Vec<JobId>,
2532 expect_flag: FragmentTypeFlag,
2533 mut alter_stream_node_fn: F,
2534 is_shared_source: bool,
2535) -> MetaResult<()>
2536where
2537 F: FnMut(&mut PbNodeBody, &mut bool),
2538{
2539 let fragments: Vec<(FragmentId, StreamNode)> = Fragment::find()
2540 .select_only()
2541 .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
2542 .filter(
2543 fragment::Column::JobId
2544 .is_in(job_ids.clone())
2545 .and(FragmentTypeMask::intersects(expect_flag)),
2546 )
2547 .into_tuple()
2548 .all(txn)
2549 .await?;
2550 let fragments = fragments
2551 .into_iter()
2552 .filter_map(|(id, stream_node)| {
2553 let mut stream_node = stream_node.to_protobuf();
2554 let mut found = false;
2555 visit_stream_node_mut(&mut stream_node, |node| {
2556 alter_stream_node_fn(node, &mut found);
2557 });
2558 if found { Some((id, stream_node)) } else { None }
2559 })
2560 .collect_vec();
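// A non-shared source without dependent jobs may have no fragment to rewrite, so the
// assertion only applies when a match is expected (shared source, or multiple jobs).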
2561 if is_shared_source || job_ids.len() > 1 {
2562 assert!(
2566 !fragments.is_empty(),
2567 "job ids {:?} (type: {:?}) should be used by at least one fragment",
2568 job_ids,
2569 expect_flag
2570 );
2571 }
2572
2573 for (id, stream_node) in fragments {
2574 fragment::ActiveModel {
2575 fragment_id: Set(id),
2576 stream_node: Set(StreamNode::from(&stream_node)),
2577 ..Default::default()
2578 }
2579 .update(txn)
2580 .await?;
2581 }
2582
2583 Ok(())
2584}