1use std::collections::{BTreeMap, HashMap, HashSet};
16use std::num::NonZeroUsize;
17
18use anyhow::anyhow;
19use indexmap::IndexMap;
20use itertools::Itertools;
21use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
22use risingwave_common::config::DefaultParallelism;
23use risingwave_common::hash::VnodeCountCompat;
24use risingwave_common::util::iter_util::ZipEqDebug;
25use risingwave_common::util::stream_graph_visitor::{
26 visit_stream_node_body, visit_stream_node_mut,
27};
28use risingwave_common::{bail, current_cluster_version};
29use risingwave_connector::allow_alter_on_fly_fields::check_sink_allow_alter_on_fly_fields;
30use risingwave_connector::error::ConnectorError;
31use risingwave_connector::sink::file_sink::fs::FsSink;
32use risingwave_connector::sink::{CONNECTOR_TYPE_KEY, SinkError};
33use risingwave_connector::source::{ConnectorProperties, SplitImpl};
34use risingwave_connector::{WithOptionsSecResolved, WithPropertiesExt, match_sink_name_str};
35use risingwave_meta_model::actor::ActorStatus;
36use risingwave_meta_model::object::ObjectType;
37use risingwave_meta_model::prelude::{StreamingJob as StreamingJobModel, *};
38use risingwave_meta_model::table::TableType;
39use risingwave_meta_model::user_privilege::Action;
40use risingwave_meta_model::*;
41use risingwave_pb::catalog::source::PbOptionalAssociatedTableId;
42use risingwave_pb::catalog::table::PbOptionalAssociatedSourceId;
43use risingwave_pb::catalog::{PbCreateType, PbSink, PbTable};
44use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
45use risingwave_pb::meta::object::PbObjectInfo;
46use risingwave_pb::meta::subscribe_response::{
47 Info as NotificationInfo, Info, Operation as NotificationOperation, Operation,
48};
49use risingwave_pb::meta::table_fragments::PbActorStatus;
50use risingwave_pb::meta::{PbObject, PbObjectGroup};
51use risingwave_pb::plan_common::PbColumnCatalog;
52use risingwave_pb::secret::PbSecretRef;
53use risingwave_pb::source::{PbConnectorSplit, PbConnectorSplits};
54use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism;
55use risingwave_pb::stream_plan::stream_node::PbNodeBody;
56use risingwave_pb::stream_plan::{PbSinkLogStoreType, PbStreamNode};
57use risingwave_pb::user::PbUserInfo;
58use risingwave_sqlparser::ast::{SqlOption, Statement};
59use risingwave_sqlparser::parser::{Parser, ParserError};
60use sea_orm::ActiveValue::Set;
61use sea_orm::sea_query::{BinOper, Expr, Query, SimpleExpr};
62use sea_orm::{
63 ActiveEnum, ActiveModelTrait, ColumnTrait, DatabaseTransaction, EntityTrait, IntoActiveModel,
64 IntoSimpleExpr, JoinType, ModelTrait, NotSet, PaginatorTrait, QueryFilter, QuerySelect,
65 RelationTrait, TransactionTrait,
66};
67use thiserror_ext::AsReport;
68
69use super::rename::IndexItemRewriter;
70use crate::barrier::{ReplaceStreamJobPlan, Reschedule};
71use crate::controller::ObjectModel;
72use crate::controller::catalog::{CatalogController, DropTableConnectorContext};
73use crate::controller::utils::{
74 PartialObject, build_object_group_for_delete, check_relation_name_duplicate,
75 check_sink_into_table_cycle, ensure_object_id, ensure_user_id, get_fragment_actor_ids,
76 get_internal_tables_by_id, get_table_columns, grant_default_privileges_automatically,
77 insert_fragment_relations, list_user_info_by_ids,
78};
79use crate::error::MetaErrorInner;
80use crate::manager::{NotificationVersion, StreamingJob, StreamingJobType};
81use crate::model::{
82 FragmentDownstreamRelation, FragmentReplaceUpstream, StreamActor, StreamContext,
83 StreamJobFragments, StreamJobFragmentsToCreate, TableParallelism,
84};
85use crate::stream::{JobReschedulePostUpdates, SplitAssignment};
86use crate::{MetaError, MetaResult};
87
88impl CatalogController {
89 pub async fn create_streaming_job_obj(
90 txn: &DatabaseTransaction,
91 obj_type: ObjectType,
92 owner_id: UserId,
93 database_id: Option<DatabaseId>,
94 schema_id: Option<SchemaId>,
95 create_type: PbCreateType,
96 timezone: Option<String>,
97 streaming_parallelism: StreamingParallelism,
98 max_parallelism: usize,
99 specific_resource_group: Option<String>, ) -> MetaResult<ObjectId> {
101 let obj = Self::create_object(txn, obj_type, owner_id, database_id, schema_id).await?;
102 let job = streaming_job::ActiveModel {
103 job_id: Set(obj.oid),
104 job_status: Set(JobStatus::Initial),
105 create_type: Set(create_type.into()),
106 timezone: Set(timezone),
107 parallelism: Set(streaming_parallelism),
108 max_parallelism: Set(max_parallelism as _),
109 specific_resource_group: Set(specific_resource_group),
110 };
111 job.insert(txn).await?;
112
113 Ok(obj.oid)
114 }
115
116 #[await_tree::instrument]
122 pub async fn create_job_catalog(
123 &self,
124 streaming_job: &mut StreamingJob,
125 ctx: &StreamContext,
126 parallelism: &Option<Parallelism>,
127 max_parallelism: usize,
128 mut dependencies: HashSet<ObjectId>,
129 specific_resource_group: Option<String>,
130 ) -> MetaResult<()> {
131 let inner = self.inner.write().await;
132 let txn = inner.db.begin().await?;
133 let create_type = streaming_job.create_type();
134
135 let streaming_parallelism = match (parallelism, self.env.opts.default_parallelism) {
136 (None, DefaultParallelism::Full) => StreamingParallelism::Adaptive,
137 (None, DefaultParallelism::Default(n)) => StreamingParallelism::Fixed(n.get()),
138 (Some(n), _) => StreamingParallelism::Fixed(n.parallelism as _),
139 };
140
141 ensure_user_id(streaming_job.owner() as _, &txn).await?;
142 ensure_object_id(ObjectType::Database, streaming_job.database_id() as _, &txn).await?;
143 ensure_object_id(ObjectType::Schema, streaming_job.schema_id() as _, &txn).await?;
144 check_relation_name_duplicate(
145 &streaming_job.name(),
146 streaming_job.database_id() as _,
147 streaming_job.schema_id() as _,
148 &txn,
149 )
150 .await?;
151
152 if !dependencies.is_empty() {
154 let altering_cnt = ObjectDependency::find()
155 .join(
156 JoinType::InnerJoin,
157 object_dependency::Relation::Object1.def(),
158 )
159 .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
160 .filter(
161 object_dependency::Column::Oid
162 .is_in(dependencies.clone())
163 .and(object::Column::ObjType.eq(ObjectType::Table))
164 .and(streaming_job::Column::JobStatus.ne(JobStatus::Created))
165 .and(
166 object::Column::Oid.not_in_subquery(
168 Query::select()
169 .column(table::Column::TableId)
170 .from(Table)
171 .to_owned(),
172 ),
173 ),
174 )
175 .count(&txn)
176 .await?;
177 if altering_cnt != 0 {
178 return Err(MetaError::permission_denied(
179 "some dependent relations are being altered",
180 ));
181 }
182 }
183
184 match streaming_job {
185 StreamingJob::MaterializedView(table) => {
186 let job_id = Self::create_streaming_job_obj(
187 &txn,
188 ObjectType::Table,
189 table.owner as _,
190 Some(table.database_id as _),
191 Some(table.schema_id as _),
192 create_type,
193 ctx.timezone.clone(),
194 streaming_parallelism,
195 max_parallelism,
196 specific_resource_group,
197 )
198 .await?;
199 table.id = job_id as _;
200 let table_model: table::ActiveModel = table.clone().into();
201 Table::insert(table_model).exec(&txn).await?;
202 }
203 StreamingJob::Sink(sink, _) => {
204 if let Some(target_table_id) = sink.target_table
205 && check_sink_into_table_cycle(
206 target_table_id as ObjectId,
207 dependencies.iter().cloned().collect(),
208 &txn,
209 )
210 .await?
211 {
212 bail!("Creating such a sink will result in circular dependency.");
213 }
214
215 let job_id = Self::create_streaming_job_obj(
216 &txn,
217 ObjectType::Sink,
218 sink.owner as _,
219 Some(sink.database_id as _),
220 Some(sink.schema_id as _),
221 create_type,
222 ctx.timezone.clone(),
223 streaming_parallelism,
224 max_parallelism,
225 specific_resource_group,
226 )
227 .await?;
228 sink.id = job_id as _;
229 let sink_model: sink::ActiveModel = sink.clone().into();
230 Sink::insert(sink_model).exec(&txn).await?;
231 }
232 StreamingJob::Table(src, table, _) => {
233 let job_id = Self::create_streaming_job_obj(
234 &txn,
235 ObjectType::Table,
236 table.owner as _,
237 Some(table.database_id as _),
238 Some(table.schema_id as _),
239 create_type,
240 ctx.timezone.clone(),
241 streaming_parallelism,
242 max_parallelism,
243 specific_resource_group,
244 )
245 .await?;
246 table.id = job_id as _;
247 if let Some(src) = src {
248 let src_obj = Self::create_object(
249 &txn,
250 ObjectType::Source,
251 src.owner as _,
252 Some(src.database_id as _),
253 Some(src.schema_id as _),
254 )
255 .await?;
256 src.id = src_obj.oid as _;
257 src.optional_associated_table_id =
258 Some(PbOptionalAssociatedTableId::AssociatedTableId(job_id as _));
259 table.optional_associated_source_id = Some(
260 PbOptionalAssociatedSourceId::AssociatedSourceId(src_obj.oid as _),
261 );
262 let source: source::ActiveModel = src.clone().into();
263 Source::insert(source).exec(&txn).await?;
264 }
265 let table_model: table::ActiveModel = table.clone().into();
266 Table::insert(table_model).exec(&txn).await?;
267 }
268 StreamingJob::Index(index, table) => {
269 ensure_object_id(ObjectType::Table, index.primary_table_id as _, &txn).await?;
270 let job_id = Self::create_streaming_job_obj(
271 &txn,
272 ObjectType::Index,
273 index.owner as _,
274 Some(index.database_id as _),
275 Some(index.schema_id as _),
276 create_type,
277 ctx.timezone.clone(),
278 streaming_parallelism,
279 max_parallelism,
280 specific_resource_group,
281 )
282 .await?;
283 index.id = job_id as _;
285 index.index_table_id = job_id as _;
286 table.id = job_id as _;
287
288 ObjectDependency::insert(object_dependency::ActiveModel {
289 oid: Set(index.primary_table_id as _),
290 used_by: Set(table.id as _),
291 ..Default::default()
292 })
293 .exec(&txn)
294 .await?;
295
296 let table_model: table::ActiveModel = table.clone().into();
297 Table::insert(table_model).exec(&txn).await?;
298 let index_model: index::ActiveModel = index.clone().into();
299 Index::insert(index_model).exec(&txn).await?;
300 }
301 StreamingJob::Source(src) => {
302 let job_id = Self::create_streaming_job_obj(
303 &txn,
304 ObjectType::Source,
305 src.owner as _,
306 Some(src.database_id as _),
307 Some(src.schema_id as _),
308 create_type,
309 ctx.timezone.clone(),
310 streaming_parallelism,
311 max_parallelism,
312 specific_resource_group,
313 )
314 .await?;
315 src.id = job_id as _;
316 let source_model: source::ActiveModel = src.clone().into();
317 Source::insert(source_model).exec(&txn).await?;
318 }
319 }
320
321 dependencies.extend(
323 streaming_job
324 .dependent_secret_ids()?
325 .into_iter()
326 .map(|secret_id| secret_id as ObjectId),
327 );
328 dependencies.extend(
330 streaming_job
331 .dependent_connection_ids()?
332 .into_iter()
333 .map(|conn_id| conn_id as ObjectId),
334 );
335
336 if !dependencies.is_empty() {
338 ObjectDependency::insert_many(dependencies.into_iter().map(|oid| {
339 object_dependency::ActiveModel {
340 oid: Set(oid),
341 used_by: Set(streaming_job.id() as _),
342 ..Default::default()
343 }
344 }))
345 .exec(&txn)
346 .await?;
347 }
348
349 txn.commit().await?;
350
351 Ok(())
352 }
353
354 pub async fn create_internal_table_catalog(
362 &self,
363 job: &StreamingJob,
364 mut incomplete_internal_tables: Vec<PbTable>,
365 ) -> MetaResult<HashMap<u32, u32>> {
366 let job_id = job.id() as ObjectId;
367 let inner = self.inner.write().await;
368 let txn = inner.db.begin().await?;
369 let mut table_id_map = HashMap::new();
370 for table in &mut incomplete_internal_tables {
371 let table_id = Self::create_object(
372 &txn,
373 ObjectType::Table,
374 table.owner as _,
375 Some(table.database_id as _),
376 Some(table.schema_id as _),
377 )
378 .await?
379 .oid;
380 table_id_map.insert(table.id, table_id as u32);
381 table.id = table_id as _;
382 table.job_id = Some(job_id as _);
383
384 let table_model = table::ActiveModel {
385 table_id: Set(table_id as _),
386 belongs_to_job_id: Set(Some(job_id)),
387 fragment_id: NotSet,
388 ..table.clone().into()
389 };
390 Table::insert(table_model).exec(&txn).await?;
391 }
392 txn.commit().await?;
393
394 Ok(table_id_map)
395 }
396
397 pub async fn prepare_stream_job_fragments(
398 &self,
399 stream_job_fragments: &StreamJobFragmentsToCreate,
400 streaming_job: &StreamingJob,
401 for_replace: bool,
402 ) -> MetaResult<()> {
403 let need_notify = streaming_job.should_notify_creating();
404 let (sink, table) = match streaming_job {
405 StreamingJob::Sink(sink, _) => (Some(sink), None),
406 StreamingJob::Table(_, table, _) => (None, Some(table)),
407 StreamingJob::Index(_, _)
408 | StreamingJob::Source(_)
409 | StreamingJob::MaterializedView(_) => (None, None),
410 };
411 self.prepare_streaming_job(
412 stream_job_fragments.stream_job_id().table_id as _,
413 || stream_job_fragments.fragments.values(),
414 &stream_job_fragments.actor_status,
415 &stream_job_fragments.actor_splits,
416 &stream_job_fragments.downstreams,
417 need_notify,
418 streaming_job.definition(),
419 for_replace,
420 sink,
421 table,
422 )
423 .await
424 }
425
426 #[expect(clippy::too_many_arguments)]
431 #[await_tree::instrument("prepare_streaming_job_for_{}", if for_replace { "replace" } else { "create" }
432 )]
433 pub async fn prepare_streaming_job<'a, I: Iterator<Item = &'a crate::model::Fragment> + 'a>(
434 &self,
435 job_id: ObjectId,
436 get_fragments: impl Fn() -> I + 'a,
437 actor_status: &BTreeMap<crate::model::ActorId, PbActorStatus>,
438 actor_splits: &HashMap<crate::model::ActorId, Vec<SplitImpl>>,
439 downstreams: &FragmentDownstreamRelation,
440 need_notify: bool,
441 definition: String,
442 for_replace: bool,
443 sink: Option<&PbSink>,
444 table: Option<&PbTable>,
445 ) -> MetaResult<()> {
446 let fragment_actors = Self::extract_fragment_and_actors_from_fragments(
447 job_id,
448 get_fragments(),
449 actor_status,
450 actor_splits,
451 )?;
452 let inner = self.inner.write().await;
453
454 let mut objects_to_notify = if need_notify { Some(vec![]) } else { None };
455 let txn = inner.db.begin().await?;
456
457 let (fragments, actors): (Vec<_>, Vec<_>) = fragment_actors.into_iter().unzip();
459 for fragment in fragments {
460 let fragment_id = fragment.fragment_id;
461 let state_table_ids = fragment.state_table_ids.inner_ref().clone();
462
463 let fragment = fragment.into_active_model();
464 Fragment::insert(fragment).exec(&txn).await?;
465
466 if !for_replace {
469 let all_tables = StreamJobFragments::collect_tables(get_fragments());
470 for state_table_id in state_table_ids {
471 let table = all_tables
475 .get(&(state_table_id as u32))
476 .unwrap_or_else(|| panic!("table {} not found", state_table_id));
477 assert_eq!(table.id, state_table_id as u32);
478 assert_eq!(table.fragment_id, fragment_id as u32);
479 let vnode_count = table.vnode_count();
480
481 table::ActiveModel {
482 table_id: Set(state_table_id as _),
483 fragment_id: Set(Some(fragment_id)),
484 vnode_count: Set(vnode_count as _),
485 ..Default::default()
486 }
487 .update(&txn)
488 .await?;
489
490 if let Some(objects) = &mut objects_to_notify {
491 let mut table = table.clone();
492 if cfg!(not(debug_assertions)) && table.id == job_id as u32 {
494 table.definition = definition.clone();
495 }
496 objects.push(PbObject {
497 object_info: Some(PbObjectInfo::Table(table)),
498 });
499 }
500 }
501 }
502 }
503
504 if let Some(objects) = &mut objects_to_notify
505 && let Some(sink) = sink
506 {
507 objects.push(PbObject {
508 object_info: Some(PbObjectInfo::Sink(sink.clone())),
509 })
510 }
511
512 insert_fragment_relations(&txn, downstreams).await?;
513
514 for actors in actors {
516 for actor in actors {
517 let actor = actor.into_active_model();
518 Actor::insert(actor).exec(&txn).await?;
519 }
520 }
521
522 if !for_replace {
523 if let Some(table) = table {
525 Table::update(table::ActiveModel {
526 table_id: Set(table.id as _),
527 dml_fragment_id: Set(table.dml_fragment_id.map(|id| id as _)),
528 ..Default::default()
529 })
530 .exec(&txn)
531 .await?;
532 }
533 }
534
535 txn.commit().await?;
536
537 if let Some(objects) = objects_to_notify
538 && !objects.is_empty()
539 {
540 self.notify_frontend(Operation::Add, Info::ObjectGroup(PbObjectGroup { objects }))
541 .await;
542 }
543
544 Ok(())
545 }
546
547 #[await_tree::instrument]
551 pub async fn try_abort_creating_streaming_job(
552 &self,
553 job_id: ObjectId,
554 is_cancelled: bool,
555 ) -> MetaResult<(bool, Option<DatabaseId>)> {
556 let mut inner = self.inner.write().await;
557 let txn = inner.db.begin().await?;
558
559 let obj = Object::find_by_id(job_id).one(&txn).await?;
560 let Some(obj) = obj else {
561 tracing::warn!(
562 id = job_id,
563 "streaming job not found when aborting creating, might be cancelled already or cleaned by recovery"
564 );
565 return Ok((true, None));
566 };
567 let database_id = obj
568 .database_id
569 .ok_or_else(|| anyhow!("obj has no database id: {:?}", obj))?;
570 let streaming_job = streaming_job::Entity::find_by_id(job_id).one(&txn).await?;
571
572 if !is_cancelled && let Some(streaming_job) = &streaming_job {
573 assert_ne!(streaming_job.job_status, JobStatus::Created);
574 if streaming_job.create_type == CreateType::Background
575 && streaming_job.job_status == JobStatus::Creating
576 {
577 tracing::warn!(
579 id = job_id,
580 "streaming job is created in background and still in creating status"
581 );
582 return Ok((false, Some(database_id)));
583 }
584 }
585
586 let internal_table_ids = get_internal_tables_by_id(job_id, &txn).await?;
587
588 let mut objs = vec![];
590 let table_obj = Table::find_by_id(job_id).one(&txn).await?;
591 let need_notify = if let Some(table) = &table_obj {
592 table.table_type == TableType::MaterializedView
594 } else {
595 streaming_job.is_some_and(|job| job.create_type == CreateType::Background)
596 };
597 if need_notify {
598 let obj: Option<PartialObject> = Object::find_by_id(job_id)
599 .select_only()
600 .columns([
601 object::Column::Oid,
602 object::Column::ObjType,
603 object::Column::SchemaId,
604 object::Column::DatabaseId,
605 ])
606 .into_partial_model()
607 .one(&txn)
608 .await?;
609 let obj =
610 obj.ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;
611 objs.push(obj);
612 let internal_table_objs: Vec<PartialObject> = Object::find()
613 .select_only()
614 .columns([
615 object::Column::Oid,
616 object::Column::ObjType,
617 object::Column::SchemaId,
618 object::Column::DatabaseId,
619 ])
620 .join(JoinType::InnerJoin, object::Relation::Table.def())
621 .filter(table::Column::BelongsToJobId.eq(job_id))
622 .into_partial_model()
623 .all(&txn)
624 .await?;
625 objs.extend(internal_table_objs);
626 }
627
628 if table_obj.is_none()
630 && let Some(Some(target_table_id)) = Sink::find_by_id(job_id)
631 .select_only()
632 .column(sink::Column::TargetTable)
633 .into_tuple::<Option<TableId>>()
634 .one(&txn)
635 .await?
636 {
637 let tmp_id: Option<ObjectId> = ObjectDependency::find()
638 .select_only()
639 .column(object_dependency::Column::UsedBy)
640 .join(
641 JoinType::InnerJoin,
642 object_dependency::Relation::Object1.def(),
643 )
644 .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
645 .filter(
646 object_dependency::Column::Oid
647 .eq(target_table_id)
648 .and(object::Column::ObjType.eq(ObjectType::Table))
649 .and(streaming_job::Column::JobStatus.ne(JobStatus::Created)),
650 )
651 .into_tuple()
652 .one(&txn)
653 .await?;
654 if let Some(tmp_id) = tmp_id {
655 tracing::warn!(
656 id = tmp_id,
657 "aborting temp streaming job for sink into table"
658 );
659 Object::delete_by_id(tmp_id).exec(&txn).await?;
660 }
661 }
662
663 Object::delete_by_id(job_id).exec(&txn).await?;
664 if !internal_table_ids.is_empty() {
665 Object::delete_many()
666 .filter(object::Column::Oid.is_in(internal_table_ids))
667 .exec(&txn)
668 .await?;
669 }
670 if let Some(t) = &table_obj
671 && let Some(source_id) = t.optional_associated_source_id
672 {
673 Object::delete_by_id(source_id).exec(&txn).await?;
674 }
675
676 let err = if is_cancelled {
677 MetaError::cancelled(format!("streaming job {job_id} is cancelled"))
678 } else {
679 MetaError::catalog_id_not_found("stream job", format!("streaming job {job_id} failed"))
680 };
681 let abort_reason = format!("streaming job aborted {}", err.as_report());
682 for tx in inner
683 .creating_table_finish_notifier
684 .get_mut(&database_id)
685 .map(|creating_tables| creating_tables.remove(&job_id).into_iter())
686 .into_iter()
687 .flatten()
688 .flatten()
689 {
690 let _ = tx.send(Err(abort_reason.clone()));
691 }
692 txn.commit().await?;
693
694 if !objs.is_empty() {
695 self.notify_frontend(Operation::Delete, build_object_group_for_delete(objs))
698 .await;
699 }
700 Ok((true, Some(database_id)))
701 }
702
703 #[await_tree::instrument]
704 pub async fn post_collect_job_fragments(
705 &self,
706 job_id: ObjectId,
707 actor_ids: Vec<crate::model::ActorId>,
708 upstream_fragment_new_downstreams: &FragmentDownstreamRelation,
709 split_assignment: &SplitAssignment,
710 replace_plan: Option<&ReplaceStreamJobPlan>,
711 ) -> MetaResult<()> {
712 let inner = self.inner.write().await;
713 let txn = inner.db.begin().await?;
714
715 let actor_ids = actor_ids
716 .into_iter()
717 .chain(
718 replace_plan
719 .iter()
720 .flat_map(|plan| plan.new_fragments.actor_ids().into_iter()),
721 )
722 .map(|id| id as ActorId)
723 .collect_vec();
724
725 Actor::update_many()
726 .col_expr(
727 actor::Column::Status,
728 SimpleExpr::from(ActorStatus::Running.into_value()),
729 )
730 .filter(actor::Column::ActorId.is_in(actor_ids))
731 .exec(&txn)
732 .await?;
733
734 for splits in split_assignment.values().chain(
735 replace_plan
736 .as_ref()
737 .map(|plan| plan.init_split_assignment.values())
738 .into_iter()
739 .flatten(),
740 ) {
741 for (actor_id, splits) in splits {
742 let splits = splits.iter().map(PbConnectorSplit::from).collect_vec();
743 let connector_splits = &PbConnectorSplits { splits };
744 actor::ActiveModel {
745 actor_id: Set(*actor_id as _),
746 splits: Set(Some(connector_splits.into())),
747 ..Default::default()
748 }
749 .update(&txn)
750 .await?;
751 }
752 }
753
754 insert_fragment_relations(&txn, upstream_fragment_new_downstreams).await?;
755 let objects = if let Some(plan) = &replace_plan {
756 insert_fragment_relations(&txn, &plan.upstream_fragment_downstreams).await?;
757
758 let incoming_sink_id = job_id;
759 let (objects, _) = Self::finish_replace_streaming_job_inner(
760 plan.tmp_id as _,
761 plan.replace_upstream.clone(),
762 SinkIntoTableContext {
763 creating_sink_id: Some(incoming_sink_id),
764 dropping_sink_id: None,
765 updated_sink_catalogs: vec![],
766 },
767 &txn,
768 plan.streaming_job.clone(),
769 None,
770 None,
771 )
772 .await?;
773 objects
774 } else {
775 vec![]
776 };
777
778 streaming_job::ActiveModel {
780 job_id: Set(job_id),
781 job_status: Set(JobStatus::Creating),
782 ..Default::default()
783 }
784 .update(&txn)
785 .await?;
786
787 txn.commit().await?;
788 if !objects.is_empty() {
789 self.notify_frontend(
790 NotificationOperation::Update,
791 NotificationInfo::ObjectGroup(PbObjectGroup { objects }),
792 )
793 .await;
794 }
795
796 Ok(())
797 }
798
799 pub async fn create_job_catalog_for_replace(
800 &self,
801 streaming_job: &StreamingJob,
802 ctx: Option<&StreamContext>,
803 specified_parallelism: Option<&NonZeroUsize>,
804 expected_original_max_parallelism: Option<usize>,
805 ) -> MetaResult<ObjectId> {
806 let id = streaming_job.id();
807 let inner = self.inner.write().await;
808 let txn = inner.db.begin().await?;
809
810 streaming_job.verify_version_for_replace(&txn).await?;
812 let referring_cnt = ObjectDependency::find()
814 .join(
815 JoinType::InnerJoin,
816 object_dependency::Relation::Object1.def(),
817 )
818 .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
819 .filter(
820 object_dependency::Column::Oid
821 .eq(id as ObjectId)
822 .and(object::Column::ObjType.eq(ObjectType::Table))
823 .and(streaming_job::Column::JobStatus.ne(JobStatus::Created)),
824 )
825 .count(&txn)
826 .await?;
827 if referring_cnt != 0 {
828 return Err(MetaError::permission_denied(
829 "job is being altered or referenced by some creating jobs",
830 ));
831 }
832
833 let (original_max_parallelism, original_timezone): (i32, Option<String>) =
835 StreamingJobModel::find_by_id(id as ObjectId)
836 .select_only()
837 .column(streaming_job::Column::MaxParallelism)
838 .column(streaming_job::Column::Timezone)
839 .into_tuple()
840 .one(&txn)
841 .await?
842 .ok_or_else(|| MetaError::catalog_id_not_found(streaming_job.job_type_str(), id))?;
843
844 if let Some(max_parallelism) = expected_original_max_parallelism
845 && original_max_parallelism != max_parallelism as i32
846 {
847 bail!(
850 "cannot use a different max parallelism \
851 when replacing streaming job, \
852 original: {}, new: {}",
853 original_max_parallelism,
854 max_parallelism
855 );
856 }
857
858 let parallelism = match specified_parallelism {
859 None => StreamingParallelism::Adaptive,
860 Some(n) => StreamingParallelism::Fixed(n.get() as _),
861 };
862 let timezone = ctx
863 .map(|ctx| ctx.timezone.clone())
864 .unwrap_or(original_timezone);
865
866 let new_obj_id = Self::create_streaming_job_obj(
868 &txn,
869 streaming_job.object_type(),
870 streaming_job.owner() as _,
871 Some(streaming_job.database_id() as _),
872 Some(streaming_job.schema_id() as _),
873 streaming_job.create_type(),
874 timezone,
875 parallelism,
876 original_max_parallelism as _,
877 None,
878 )
879 .await?;
880
881 ObjectDependency::insert(object_dependency::ActiveModel {
883 oid: Set(id as _),
884 used_by: Set(new_obj_id as _),
885 ..Default::default()
886 })
887 .exec(&txn)
888 .await?;
889
890 txn.commit().await?;
891
892 Ok(new_obj_id)
893 }
894
895 pub async fn finish_streaming_job(&self, job_id: ObjectId) -> MetaResult<()> {
897 let mut inner = self.inner.write().await;
898 let txn = inner.db.begin().await?;
899
900 let job_type = Object::find_by_id(job_id)
901 .select_only()
902 .column(object::Column::ObjType)
903 .into_tuple()
904 .one(&txn)
905 .await?
906 .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;
907
908 let create_type: CreateType = StreamingJobModel::find_by_id(job_id)
909 .select_only()
910 .column(streaming_job::Column::CreateType)
911 .into_tuple()
912 .one(&txn)
913 .await?
914 .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;
915
916 let res = Object::update_many()
918 .col_expr(object::Column::CreatedAt, Expr::current_timestamp().into())
919 .col_expr(
920 object::Column::CreatedAtClusterVersion,
921 current_cluster_version().into(),
922 )
923 .filter(object::Column::Oid.eq(job_id))
924 .exec(&txn)
925 .await?;
926 if res.rows_affected == 0 {
927 return Err(MetaError::catalog_id_not_found("streaming job", job_id));
928 }
929
930 let job = streaming_job::ActiveModel {
932 job_id: Set(job_id),
933 job_status: Set(JobStatus::Created),
934 ..Default::default()
935 };
936 job.update(&txn).await?;
937
938 let internal_table_objs = Table::find()
940 .find_also_related(Object)
941 .filter(table::Column::BelongsToJobId.eq(job_id))
942 .all(&txn)
943 .await?;
944 let mut objects = internal_table_objs
945 .iter()
946 .map(|(table, obj)| PbObject {
947 object_info: Some(PbObjectInfo::Table(
948 ObjectModel(table.clone(), obj.clone().unwrap()).into(),
949 )),
950 })
951 .collect_vec();
952 let mut notification_op = if create_type == CreateType::Background {
953 NotificationOperation::Update
954 } else {
955 NotificationOperation::Add
956 };
957 let mut updated_user_info = vec![];
958
959 match job_type {
960 ObjectType::Table => {
961 let (table, obj) = Table::find_by_id(job_id)
962 .find_also_related(Object)
963 .one(&txn)
964 .await?
965 .ok_or_else(|| MetaError::catalog_id_not_found("table", job_id))?;
966 if table.table_type == TableType::MaterializedView {
967 notification_op = NotificationOperation::Update;
968 }
969
970 if let Some(source_id) = table.optional_associated_source_id {
971 let (src, obj) = Source::find_by_id(source_id)
972 .find_also_related(Object)
973 .one(&txn)
974 .await?
975 .ok_or_else(|| MetaError::catalog_id_not_found("source", source_id))?;
976 objects.push(PbObject {
977 object_info: Some(PbObjectInfo::Source(
978 ObjectModel(src, obj.unwrap()).into(),
979 )),
980 });
981 }
982 objects.push(PbObject {
983 object_info: Some(PbObjectInfo::Table(ObjectModel(table, obj.unwrap()).into())),
984 });
985 }
986 ObjectType::Sink => {
987 let (sink, obj) = Sink::find_by_id(job_id)
988 .find_also_related(Object)
989 .one(&txn)
990 .await?
991 .ok_or_else(|| MetaError::catalog_id_not_found("sink", job_id))?;
992 objects.push(PbObject {
993 object_info: Some(PbObjectInfo::Sink(ObjectModel(sink, obj.unwrap()).into())),
994 });
995 }
996 ObjectType::Index => {
997 let (index, obj) = Index::find_by_id(job_id)
998 .find_also_related(Object)
999 .one(&txn)
1000 .await?
1001 .ok_or_else(|| MetaError::catalog_id_not_found("index", job_id))?;
1002 {
1003 let (table, obj) = Table::find_by_id(index.index_table_id)
1004 .find_also_related(Object)
1005 .one(&txn)
1006 .await?
1007 .ok_or_else(|| {
1008 MetaError::catalog_id_not_found("table", index.index_table_id)
1009 })?;
1010 objects.push(PbObject {
1011 object_info: Some(PbObjectInfo::Table(
1012 ObjectModel(table, obj.unwrap()).into(),
1013 )),
1014 });
1015 }
1016
1017 let primary_table_privileges = UserPrivilege::find()
1020 .filter(
1021 user_privilege::Column::Oid
1022 .eq(index.primary_table_id)
1023 .and(user_privilege::Column::Action.eq(Action::Select)),
1024 )
1025 .all(&txn)
1026 .await?;
1027 if !primary_table_privileges.is_empty() {
1028 let index_state_table_ids: Vec<TableId> = Table::find()
1029 .select_only()
1030 .column(table::Column::TableId)
1031 .filter(
1032 table::Column::BelongsToJobId
1033 .eq(job_id)
1034 .or(table::Column::TableId.eq(index.index_table_id)),
1035 )
1036 .into_tuple()
1037 .all(&txn)
1038 .await?;
1039 let mut new_privileges = vec![];
1040 for privilege in &primary_table_privileges {
1041 for state_table_id in &index_state_table_ids {
1042 new_privileges.push(user_privilege::ActiveModel {
1043 id: Default::default(),
1044 oid: Set(*state_table_id),
1045 user_id: Set(privilege.user_id),
1046 action: Set(Action::Select),
1047 dependent_id: Set(privilege.dependent_id),
1048 granted_by: Set(privilege.granted_by),
1049 with_grant_option: Set(privilege.with_grant_option),
1050 });
1051 }
1052 }
1053 UserPrivilege::insert_many(new_privileges)
1054 .exec(&txn)
1055 .await?;
1056
1057 updated_user_info = list_user_info_by_ids(
1058 primary_table_privileges.into_iter().map(|p| p.user_id),
1059 &txn,
1060 )
1061 .await?;
1062 }
1063
1064 objects.push(PbObject {
1065 object_info: Some(PbObjectInfo::Index(ObjectModel(index, obj.unwrap()).into())),
1066 });
1067 }
1068 ObjectType::Source => {
1069 let (source, obj) = Source::find_by_id(job_id)
1070 .find_also_related(Object)
1071 .one(&txn)
1072 .await?
1073 .ok_or_else(|| MetaError::catalog_id_not_found("source", job_id))?;
1074 objects.push(PbObject {
1075 object_info: Some(PbObjectInfo::Source(
1076 ObjectModel(source, obj.unwrap()).into(),
1077 )),
1078 });
1079 }
1080 _ => unreachable!("invalid job type: {:?}", job_type),
1081 }
1082
1083 if job_type != ObjectType::Index {
1084 updated_user_info = grant_default_privileges_automatically(&txn, job_id).await?;
1085 }
1086 txn.commit().await?;
1087
1088 let mut version = self
1089 .notify_frontend(
1090 notification_op,
1091 NotificationInfo::ObjectGroup(PbObjectGroup { objects }),
1092 )
1093 .await;
1094
1095 if !updated_user_info.is_empty() {
1097 version = self.notify_users_update(updated_user_info).await;
1098 }
1099
1100 inner
1101 .creating_table_finish_notifier
1102 .values_mut()
1103 .for_each(|creating_tables| {
1104 if let Some(txs) = creating_tables.remove(&job_id) {
1105 for tx in txs {
1106 let _ = tx.send(Ok(version));
1107 }
1108 }
1109 });
1110
1111 Ok(())
1112 }
1113
1114 pub async fn finish_replace_streaming_job(
1115 &self,
1116 tmp_id: ObjectId,
1117 streaming_job: StreamingJob,
1118 replace_upstream: FragmentReplaceUpstream,
1119 sink_into_table_context: SinkIntoTableContext,
1120 drop_table_connector_ctx: Option<&DropTableConnectorContext>,
1121 auto_refresh_schema_sinks: Option<Vec<FinishAutoRefreshSchemaSinkContext>>,
1122 ) -> MetaResult<NotificationVersion> {
1123 let inner = self.inner.write().await;
1124 let txn = inner.db.begin().await?;
1125
1126 let (objects, delete_notification_objs) = Self::finish_replace_streaming_job_inner(
1127 tmp_id,
1128 replace_upstream,
1129 sink_into_table_context,
1130 &txn,
1131 streaming_job,
1132 drop_table_connector_ctx,
1133 auto_refresh_schema_sinks,
1134 )
1135 .await?;
1136
1137 txn.commit().await?;
1138
1139 let mut version = self
1140 .notify_frontend(
1141 NotificationOperation::Update,
1142 NotificationInfo::ObjectGroup(PbObjectGroup { objects }),
1143 )
1144 .await;
1145
1146 if let Some((user_infos, to_drop_objects)) = delete_notification_objs {
1147 self.notify_users_update(user_infos).await;
1148 version = self
1149 .notify_frontend(
1150 NotificationOperation::Delete,
1151 build_object_group_for_delete(to_drop_objects),
1152 )
1153 .await;
1154 }
1155
1156 Ok(version)
1157 }
1158
1159 pub async fn finish_replace_streaming_job_inner(
1160 tmp_id: ObjectId,
1161 replace_upstream: FragmentReplaceUpstream,
1162 SinkIntoTableContext {
1163 creating_sink_id,
1164 dropping_sink_id,
1165 updated_sink_catalogs,
1166 }: SinkIntoTableContext,
1167 txn: &DatabaseTransaction,
1168 streaming_job: StreamingJob,
1169 drop_table_connector_ctx: Option<&DropTableConnectorContext>,
1170 auto_refresh_schema_sinks: Option<Vec<FinishAutoRefreshSchemaSinkContext>>,
1171 ) -> MetaResult<(Vec<PbObject>, Option<(Vec<PbUserInfo>, Vec<PartialObject>)>)> {
1172 let original_job_id = streaming_job.id() as ObjectId;
1173 let job_type = streaming_job.job_type();
1174
1175 let mut index_item_rewriter = None;
1176
1177 match streaming_job {
1179 StreamingJob::Table(_source, table, _table_job_type) => {
1180 let original_column_catalogs = get_table_columns(txn, original_job_id).await?;
1183
1184 index_item_rewriter = Some({
1185 let original_columns = original_column_catalogs
1186 .to_protobuf()
1187 .into_iter()
1188 .map(|c| c.column_desc.unwrap())
1189 .collect_vec();
1190 let new_columns = table
1191 .columns
1192 .iter()
1193 .map(|c| c.column_desc.clone().unwrap())
1194 .collect_vec();
1195
1196 IndexItemRewriter {
1197 original_columns,
1198 new_columns,
1199 }
1200 });
1201
1202 for sink_id in updated_sink_catalogs {
1204 sink::ActiveModel {
1205 sink_id: Set(sink_id as _),
1206 original_target_columns: Set(Some(original_column_catalogs.clone())),
1207 ..Default::default()
1208 }
1209 .update(txn)
1210 .await?;
1211 }
1212 let mut table = table::ActiveModel::from(table);
1214 let mut incoming_sinks = table.incoming_sinks.as_ref().inner_ref().clone();
1215 if let Some(sink_id) = creating_sink_id {
1216 debug_assert!(!incoming_sinks.contains(&{ sink_id }));
1217 incoming_sinks.push(sink_id as _);
1218 }
1219 if let Some(drop_table_connector_ctx) = drop_table_connector_ctx
1220 && drop_table_connector_ctx.to_change_streaming_job_id == original_job_id
1221 {
1222 table.optional_associated_source_id = Set(None);
1224 }
1225
1226 if let Some(sink_id) = dropping_sink_id {
1227 let _drained = incoming_sinks
1228 .extract_if(.., |id| *id == sink_id)
1229 .collect_vec();
1230 }
1233
1234 table.incoming_sinks = Set(incoming_sinks.into());
1235 table.update(txn).await?;
1236 }
1237 StreamingJob::Source(source) => {
1238 let source = source::ActiveModel::from(source);
1240 source.update(txn).await?;
1241 }
1242 StreamingJob::MaterializedView(table) => {
1243 let table = table::ActiveModel::from(table);
1245 table.update(txn).await?;
1246 }
1247 _ => unreachable!(
1248 "invalid streaming job type: {:?}",
1249 streaming_job.job_type_str()
1250 ),
1251 }
1252
1253 async fn finish_fragments(
1254 txn: &DatabaseTransaction,
1255 tmp_id: ObjectId,
1256 original_job_id: ObjectId,
1257 replace_upstream: FragmentReplaceUpstream,
1258 ) -> MetaResult<()> {
1259 let fragment_info: Vec<(FragmentId, I32Array)> = Fragment::find()
1263 .select_only()
1264 .columns([
1265 fragment::Column::FragmentId,
1266 fragment::Column::StateTableIds,
1267 ])
1268 .filter(fragment::Column::JobId.eq(tmp_id))
1269 .into_tuple()
1270 .all(txn)
1271 .await?;
1272 for (fragment_id, state_table_ids) in fragment_info {
1273 for state_table_id in state_table_ids.into_inner() {
1274 table::ActiveModel {
1275 table_id: Set(state_table_id as _),
1276 fragment_id: Set(Some(fragment_id)),
1277 ..Default::default()
1279 }
1280 .update(txn)
1281 .await?;
1282 }
1283 }
1284
1285 Fragment::delete_many()
1287 .filter(fragment::Column::JobId.eq(original_job_id))
1288 .exec(txn)
1289 .await?;
1290 Fragment::update_many()
1291 .col_expr(fragment::Column::JobId, SimpleExpr::from(original_job_id))
1292 .filter(fragment::Column::JobId.eq(tmp_id))
1293 .exec(txn)
1294 .await?;
1295
1296 for (fragment_id, fragment_replace_map) in replace_upstream {
1299 let (fragment_id, mut stream_node) =
1300 Fragment::find_by_id(fragment_id as FragmentId)
1301 .select_only()
1302 .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
1303 .into_tuple::<(FragmentId, StreamNode)>()
1304 .one(txn)
1305 .await?
1306 .map(|(id, node)| (id, node.to_protobuf()))
1307 .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
1308
1309 visit_stream_node_mut(&mut stream_node, |body| {
1310 if let PbNodeBody::Merge(m) = body
1311 && let Some(new_fragment_id) =
1312 fragment_replace_map.get(&m.upstream_fragment_id)
1313 {
1314 m.upstream_fragment_id = *new_fragment_id;
1315 }
1316 });
1317 fragment::ActiveModel {
1318 fragment_id: Set(fragment_id),
1319 stream_node: Set(StreamNode::from(&stream_node)),
1320 ..Default::default()
1321 }
1322 .update(txn)
1323 .await?;
1324 }
1325
1326 Object::delete_by_id(tmp_id).exec(txn).await?;
1328
1329 Ok(())
1330 }
1331
1332 finish_fragments(txn, tmp_id, original_job_id, replace_upstream).await?;
1333
1334 let mut objects = vec![];
1336 match job_type {
1337 StreamingJobType::Table(_) | StreamingJobType::MaterializedView => {
1338 let (table, table_obj) = Table::find_by_id(original_job_id)
1339 .find_also_related(Object)
1340 .one(txn)
1341 .await?
1342 .ok_or_else(|| MetaError::catalog_id_not_found("object", original_job_id))?;
1343 objects.push(PbObject {
1344 object_info: Some(PbObjectInfo::Table(
1345 ObjectModel(table, table_obj.unwrap()).into(),
1346 )),
1347 })
1348 }
1349 StreamingJobType::Source => {
1350 let (source, source_obj) = Source::find_by_id(original_job_id)
1351 .find_also_related(Object)
1352 .one(txn)
1353 .await?
1354 .ok_or_else(|| MetaError::catalog_id_not_found("object", original_job_id))?;
1355 objects.push(PbObject {
1356 object_info: Some(PbObjectInfo::Source(
1357 ObjectModel(source, source_obj.unwrap()).into(),
1358 )),
1359 })
1360 }
1361 _ => unreachable!("invalid streaming job type for replace: {:?}", job_type),
1362 }
1363
1364 if let Some(expr_rewriter) = index_item_rewriter {
1365 let index_items: Vec<(IndexId, ExprNodeArray)> = Index::find()
1366 .select_only()
1367 .columns([index::Column::IndexId, index::Column::IndexItems])
1368 .filter(index::Column::PrimaryTableId.eq(original_job_id))
1369 .into_tuple()
1370 .all(txn)
1371 .await?;
1372 for (index_id, nodes) in index_items {
1373 let mut pb_nodes = nodes.to_protobuf();
1374 pb_nodes
1375 .iter_mut()
1376 .for_each(|x| expr_rewriter.rewrite_expr(x));
1377 let index = index::ActiveModel {
1378 index_id: Set(index_id),
1379 index_items: Set(pb_nodes.into()),
1380 ..Default::default()
1381 }
1382 .update(txn)
1383 .await?;
1384 let index_obj = index
1385 .find_related(Object)
1386 .one(txn)
1387 .await?
1388 .ok_or_else(|| MetaError::catalog_id_not_found("object", index.index_id))?;
1389 objects.push(PbObject {
1390 object_info: Some(PbObjectInfo::Index(ObjectModel(index, index_obj).into())),
1391 });
1392 }
1393 }
1394
1395 if let Some(sinks) = auto_refresh_schema_sinks {
1396 for finish_sink_context in sinks {
1397 finish_fragments(
1398 txn,
1399 finish_sink_context.tmp_sink_id,
1400 finish_sink_context.original_sink_id,
1401 Default::default(),
1402 )
1403 .await?;
1404 let (mut sink, sink_obj) = Sink::find_by_id(finish_sink_context.original_sink_id)
1405 .find_also_related(Object)
1406 .one(txn)
1407 .await?
1408 .ok_or_else(|| MetaError::catalog_id_not_found("sink", original_job_id))?;
1409 let columns = ColumnCatalogArray::from(finish_sink_context.columns);
1410 Sink::update(sink::ActiveModel {
1411 sink_id: Set(finish_sink_context.original_sink_id),
1412 columns: Set(columns.clone()),
1413 ..Default::default()
1414 })
1415 .exec(txn)
1416 .await?;
1417 sink.columns = columns;
1418 objects.push(PbObject {
1419 object_info: Some(PbObjectInfo::Sink(
1420 ObjectModel(sink, sink_obj.unwrap()).into(),
1421 )),
1422 });
1423 if let Some((log_store_table_id, new_log_store_table_columns)) =
1424 finish_sink_context.new_log_store_table
1425 {
1426 let new_log_store_table_columns: ColumnCatalogArray =
1427 new_log_store_table_columns.into();
1428 let (mut table, table_obj) = Table::find_by_id(log_store_table_id)
1429 .find_also_related(Object)
1430 .one(txn)
1431 .await?
1432 .ok_or_else(|| MetaError::catalog_id_not_found("table", original_job_id))?;
1433 Table::update(table::ActiveModel {
1434 table_id: Set(log_store_table_id),
1435 columns: Set(new_log_store_table_columns.clone()),
1436 ..Default::default()
1437 })
1438 .exec(txn)
1439 .await?;
1440 table.columns = new_log_store_table_columns;
1441 objects.push(PbObject {
1442 object_info: Some(PbObjectInfo::Table(
1443 ObjectModel(table, table_obj.unwrap()).into(),
1444 )),
1445 });
1446 }
1447 }
1448 }
1449
1450 let mut notification_objs: Option<(Vec<PbUserInfo>, Vec<PartialObject>)> = None;
1451 if let Some(drop_table_connector_ctx) = drop_table_connector_ctx {
1452 notification_objs =
1453 Some(Self::drop_table_associated_source(txn, drop_table_connector_ctx).await?);
1454 }
1455
1456 Ok((objects, notification_objs))
1457 }
1458
1459 pub async fn try_abort_replacing_streaming_job(
1461 &self,
1462 tmp_job_id: ObjectId,
1463 tmp_sink_ids: Option<Vec<ObjectId>>,
1464 ) -> MetaResult<()> {
1465 let inner = self.inner.write().await;
1466 let txn = inner.db.begin().await?;
1467 Object::delete_by_id(tmp_job_id).exec(&txn).await?;
1468 if let Some(tmp_sink_ids) = tmp_sink_ids {
1469 for tmp_sink_id in tmp_sink_ids {
1470 Object::delete_by_id(tmp_sink_id).exec(&txn).await?;
1471 }
1472 }
1473 txn.commit().await?;
1474 Ok(())
1475 }
1476
1477 pub async fn update_source_rate_limit_by_source_id(
1480 &self,
1481 source_id: SourceId,
1482 rate_limit: Option<u32>,
1483 ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1484 let inner = self.inner.read().await;
1485 let txn = inner.db.begin().await?;
1486
1487 {
1488 let active_source = source::ActiveModel {
1489 source_id: Set(source_id),
1490 rate_limit: Set(rate_limit.map(|v| v as i32)),
1491 ..Default::default()
1492 };
1493 active_source.update(&txn).await?;
1494 }
1495
1496 let (source, obj) = Source::find_by_id(source_id)
1497 .find_also_related(Object)
1498 .one(&txn)
1499 .await?
1500 .ok_or_else(|| {
1501 MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
1502 })?;
1503
1504 let is_fs_source = source.with_properties.inner_ref().is_new_fs_connector();
1505 let streaming_job_ids: Vec<ObjectId> =
1506 if let Some(table_id) = source.optional_associated_table_id {
1507 vec![table_id]
1508 } else if let Some(source_info) = &source.source_info
1509 && source_info.to_protobuf().is_shared()
1510 {
1511 vec![source_id]
1512 } else {
1513 ObjectDependency::find()
1514 .select_only()
1515 .column(object_dependency::Column::UsedBy)
1516 .filter(object_dependency::Column::Oid.eq(source_id))
1517 .into_tuple()
1518 .all(&txn)
1519 .await?
1520 };
1521
1522 if streaming_job_ids.is_empty() {
1523 return Err(MetaError::invalid_parameter(format!(
1524 "source id {source_id} not used by any streaming job"
1525 )));
1526 }
1527
1528 let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
1529 .select_only()
1530 .columns([
1531 fragment::Column::FragmentId,
1532 fragment::Column::FragmentTypeMask,
1533 fragment::Column::StreamNode,
1534 ])
1535 .filter(fragment::Column::JobId.is_in(streaming_job_ids))
1536 .into_tuple()
1537 .all(&txn)
1538 .await?;
1539 let mut fragments = fragments
1540 .into_iter()
1541 .map(|(id, mask, stream_node)| {
1542 (
1543 id,
1544 FragmentTypeMask::from(mask as u32),
1545 stream_node.to_protobuf(),
1546 )
1547 })
1548 .collect_vec();
1549
1550 fragments.retain_mut(|(_, fragment_type_mask, stream_node)| {
1551 let mut found = false;
1552 if fragment_type_mask.contains(FragmentTypeFlag::Source) {
1553 visit_stream_node_mut(stream_node, |node| {
1554 if let PbNodeBody::Source(node) = node
1555 && let Some(node_inner) = &mut node.source_inner
1556 && node_inner.source_id == source_id as u32
1557 {
1558 node_inner.rate_limit = rate_limit;
1559 found = true;
1560 }
1561 });
1562 }
1563 if is_fs_source {
1564 visit_stream_node_mut(stream_node, |node| {
1567 if let PbNodeBody::StreamFsFetch(node) = node {
1568 fragment_type_mask.add(FragmentTypeFlag::FsFetch);
1569 if let Some(node_inner) = &mut node.node_inner
1570 && node_inner.source_id == source_id as u32
1571 {
1572 node_inner.rate_limit = rate_limit;
1573 found = true;
1574 }
1575 }
1576 });
1577 }
1578 found
1579 });
1580
1581 assert!(
1582 !fragments.is_empty(),
1583 "source id should be used by at least one fragment"
1584 );
1585 let fragment_ids = fragments.iter().map(|(id, _, _)| *id).collect_vec();
1586 for (id, fragment_type_mask, stream_node) in fragments {
1587 fragment::ActiveModel {
1588 fragment_id: Set(id),
1589 fragment_type_mask: Set(fragment_type_mask.into()),
1590 stream_node: Set(StreamNode::from(&stream_node)),
1591 ..Default::default()
1592 }
1593 .update(&txn)
1594 .await?;
1595 }
1596 let fragment_actors = get_fragment_actor_ids(&txn, fragment_ids).await?;
1597
1598 txn.commit().await?;
1599
1600 let relation_info = PbObjectInfo::Source(ObjectModel(source, obj.unwrap()).into());
1601 let _version = self
1602 .notify_frontend(
1603 NotificationOperation::Update,
1604 NotificationInfo::ObjectGroup(PbObjectGroup {
1605 objects: vec![PbObject {
1606 object_info: Some(relation_info),
1607 }],
1608 }),
1609 )
1610 .await;
1611
1612 Ok(fragment_actors)
1613 }
1614
1615 pub async fn mutate_fragments_by_job_id(
1618 &self,
1619 job_id: ObjectId,
1620 mut fragments_mutation_fn: impl FnMut(FragmentTypeMask, &mut PbStreamNode) -> MetaResult<bool>,
1622 err_msg: &'static str,
1624 ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1625 let inner = self.inner.read().await;
1626 let txn = inner.db.begin().await?;
1627
1628 let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
1629 .select_only()
1630 .columns([
1631 fragment::Column::FragmentId,
1632 fragment::Column::FragmentTypeMask,
1633 fragment::Column::StreamNode,
1634 ])
1635 .filter(fragment::Column::JobId.eq(job_id))
1636 .into_tuple()
1637 .all(&txn)
1638 .await?;
1639 let mut fragments = fragments
1640 .into_iter()
1641 .map(|(id, mask, stream_node)| {
1642 (id, FragmentTypeMask::from(mask), stream_node.to_protobuf())
1643 })
1644 .collect_vec();
1645
1646 let fragments = fragments
1647 .iter_mut()
1648 .map(|(_, fragment_type_mask, stream_node)| {
1649 fragments_mutation_fn(*fragment_type_mask, stream_node)
1650 })
1651 .collect::<MetaResult<Vec<bool>>>()?
1652 .into_iter()
1653 .zip_eq_debug(std::mem::take(&mut fragments))
1654 .filter_map(|(keep, fragment)| if keep { Some(fragment) } else { None })
1655 .collect::<Vec<_>>();
1656
1657 if fragments.is_empty() {
1658 return Err(MetaError::invalid_parameter(format!(
1659 "job id {job_id}: {}",
1660 err_msg
1661 )));
1662 }
1663
1664 let fragment_ids = fragments.iter().map(|(id, _, _)| *id).collect_vec();
1665 for (id, _, stream_node) in fragments {
1666 fragment::ActiveModel {
1667 fragment_id: Set(id),
1668 stream_node: Set(StreamNode::from(&stream_node)),
1669 ..Default::default()
1670 }
1671 .update(&txn)
1672 .await?;
1673 }
1674 let fragment_actors = get_fragment_actor_ids(&txn, fragment_ids).await?;
1675
1676 txn.commit().await?;
1677
1678 Ok(fragment_actors)
1679 }
1680
1681 async fn mutate_fragment_by_fragment_id(
1682 &self,
1683 fragment_id: FragmentId,
1684 mut fragment_mutation_fn: impl FnMut(FragmentTypeMask, &mut PbStreamNode) -> bool,
1685 err_msg: &'static str,
1686 ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1687 let inner = self.inner.read().await;
1688 let txn = inner.db.begin().await?;
1689
1690 let (fragment_type_mask, stream_node): (i32, StreamNode) =
1691 Fragment::find_by_id(fragment_id)
1692 .select_only()
1693 .columns([
1694 fragment::Column::FragmentTypeMask,
1695 fragment::Column::StreamNode,
1696 ])
1697 .into_tuple()
1698 .one(&txn)
1699 .await?
1700 .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
1701 let mut pb_stream_node = stream_node.to_protobuf();
1702 let fragment_type_mask = FragmentTypeMask::from(fragment_type_mask);
1703
1704 if !fragment_mutation_fn(fragment_type_mask, &mut pb_stream_node) {
1705 return Err(MetaError::invalid_parameter(format!(
1706 "fragment id {fragment_id}: {}",
1707 err_msg
1708 )));
1709 }
1710
1711 fragment::ActiveModel {
1712 fragment_id: Set(fragment_id),
1713 stream_node: Set(stream_node),
1714 ..Default::default()
1715 }
1716 .update(&txn)
1717 .await?;
1718
1719 let fragment_actors = get_fragment_actor_ids(&txn, vec![fragment_id]).await?;
1720
1721 txn.commit().await?;
1722
1723 Ok(fragment_actors)
1724 }
1725
1726 pub async fn update_backfill_rate_limit_by_job_id(
1729 &self,
1730 job_id: ObjectId,
1731 rate_limit: Option<u32>,
1732 ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1733 let update_backfill_rate_limit =
1734 |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
1735 let mut found = false;
1736 if fragment_type_mask
1737 .contains_any(FragmentTypeFlag::backfill_rate_limit_fragments())
1738 {
1739 visit_stream_node_mut(stream_node, |node| match node {
1740 PbNodeBody::StreamCdcScan(node) => {
1741 node.rate_limit = rate_limit;
1742 found = true;
1743 }
1744 PbNodeBody::StreamScan(node) => {
1745 node.rate_limit = rate_limit;
1746 found = true;
1747 }
1748 PbNodeBody::SourceBackfill(node) => {
1749 node.rate_limit = rate_limit;
1750 found = true;
1751 }
1752 PbNodeBody::Sink(node) => {
1753 node.rate_limit = rate_limit;
1754 found = true;
1755 }
1756 _ => {}
1757 });
1758 }
1759 Ok(found)
1760 };
1761
1762 self.mutate_fragments_by_job_id(
1763 job_id,
1764 update_backfill_rate_limit,
1765 "stream scan node or source node not found",
1766 )
1767 .await
1768 }
1769
1770 pub async fn update_sink_rate_limit_by_job_id(
1773 &self,
1774 job_id: ObjectId,
1775 rate_limit: Option<u32>,
1776 ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1777 let update_sink_rate_limit =
1778 |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
1779 let mut found = Ok(false);
1780 if fragment_type_mask.contains_any(FragmentTypeFlag::sink_rate_limit_fragments()) {
1781 visit_stream_node_mut(stream_node, |node| {
1782 if let PbNodeBody::Sink(node) = node {
1783 if node.log_store_type != PbSinkLogStoreType::KvLogStore as i32 {
1784 found = Err(MetaError::invalid_parameter(
1785 "sink rate limit is only supported for kv log store, please SET sink_decouple = TRUE before CREATE SINK",
1786 ));
1787 return;
1788 }
1789 node.rate_limit = rate_limit;
1790 found = Ok(true);
1791 }
1792 });
1793 }
1794 found
1795 };
1796
1797 self.mutate_fragments_by_job_id(job_id, update_sink_rate_limit, "sink node not found")
1798 .await
1799 }
1800
1801 pub async fn update_dml_rate_limit_by_job_id(
1802 &self,
1803 job_id: ObjectId,
1804 rate_limit: Option<u32>,
1805 ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1806 let update_dml_rate_limit =
1807 |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
1808 let mut found = false;
1809 if fragment_type_mask.contains_any(FragmentTypeFlag::dml_rate_limit_fragments()) {
1810 visit_stream_node_mut(stream_node, |node| {
1811 if let PbNodeBody::Dml(node) = node {
1812 node.rate_limit = rate_limit;
1813 found = true;
1814 }
1815 });
1816 }
1817 Ok(found)
1818 };
1819
1820 self.mutate_fragments_by_job_id(job_id, update_dml_rate_limit, "dml node not found")
1821 .await
1822 }
1823
1824 pub async fn update_source_props_by_source_id(
1825 &self,
1826 source_id: SourceId,
1827 alter_props: BTreeMap<String, String>,
1828 alter_secret_refs: BTreeMap<String, PbSecretRef>,
1829 ) -> MetaResult<WithOptionsSecResolved> {
1830 let inner = self.inner.read().await;
1831 let txn = inner.db.begin().await?;
1832
1833 let (source, _obj) = Source::find_by_id(source_id)
1834 .find_also_related(Object)
1835 .one(&txn)
1836 .await?
1837 .ok_or_else(|| {
1838 MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
1839 })?;
1840 let connector = source.with_properties.0.get_connector().unwrap();
1841
1842 let prop_keys: Vec<String> = alter_props
1844 .keys()
1845 .chain(alter_secret_refs.keys())
1846 .cloned()
1847 .collect();
1848 risingwave_connector::allow_alter_on_fly_fields::check_source_allow_alter_on_fly_fields(
1849 &connector, &prop_keys,
1850 )?;
1851
1852 let mut options_with_secret = WithOptionsSecResolved::new(
1853 source.with_properties.0.clone(),
1854 source
1855 .secret_ref
1856 .map(|secret_ref| secret_ref.to_protobuf())
1857 .unwrap_or_default(),
1858 );
1859 let (to_add_secret_dep, to_remove_secret_dep) =
1860 options_with_secret.handle_update(alter_props, alter_secret_refs)?;
1861
1862 tracing::info!(
1863 "applying new properties to source: source_id={}, options_with_secret={:?}",
1864 source_id,
1865 options_with_secret
1866 );
1867 let _ = ConnectorProperties::extract(options_with_secret.clone(), true)?;
1869 let mut associate_table_id = None;
1872
1873 let mut preferred_id: i32 = source_id;
1877 let rewrite_sql = {
1878 let definition = source.definition.clone();
1879
1880 let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
1881 .map_err(|e| {
1882 MetaError::from(MetaErrorInner::Connector(ConnectorError::from(
1883 anyhow!(e).context("Failed to parse source definition SQL"),
1884 )))
1885 })?
1886 .try_into()
1887 .unwrap();
1888
1889 async fn format_with_option_secret_resolved(
1903 txn: &DatabaseTransaction,
1904 options_with_secret: &WithOptionsSecResolved,
1905 ) -> MetaResult<Vec<SqlOption>> {
1906 let mut options = Vec::new();
1907 for (k, v) in options_with_secret.as_plaintext() {
1908 let sql_option = SqlOption::try_from((k, &format!("'{}'", v)))
1909 .map_err(|e| MetaError::invalid_parameter(e.to_report_string()))?;
1910 options.push(sql_option);
1911 }
1912 for (k, v) in options_with_secret.as_secret() {
1913 if let Some(secret_model) =
1914 Secret::find_by_id(v.secret_id as i32).one(txn).await?
1915 {
1916 let sql_option =
1917 SqlOption::try_from((k, &format!("SECRET {}", secret_model.name)))
1918 .map_err(|e| MetaError::invalid_parameter(e.to_report_string()))?;
1919 options.push(sql_option);
1920 } else {
1921 return Err(MetaError::catalog_id_not_found("secret", v.secret_id));
1922 }
1923 }
1924 Ok(options)
1925 }
1926
1927 match &mut stmt {
1928 Statement::CreateSource { stmt } => {
1929 stmt.with_properties.0 =
1930 format_with_option_secret_resolved(&txn, &options_with_secret).await?;
1931 }
1932 Statement::CreateTable { with_options, .. } => {
1933 *with_options =
1934 format_with_option_secret_resolved(&txn, &options_with_secret).await?;
1935 associate_table_id = source.optional_associated_table_id;
1936 preferred_id = associate_table_id.unwrap();
1937 }
1938 _ => unreachable!(),
1939 }
1940
1941 stmt.to_string()
1942 };
1943
1944 {
1945 if !to_add_secret_dep.is_empty() {
1947 ObjectDependency::insert_many(to_add_secret_dep.into_iter().map(|secret_id| {
1948 object_dependency::ActiveModel {
1949 oid: Set(secret_id as _),
1950 used_by: Set(preferred_id as _),
1951 ..Default::default()
1952 }
1953 }))
1954 .exec(&txn)
1955 .await?;
1956 }
1957 if !to_remove_secret_dep.is_empty() {
1958 let _ = ObjectDependency::delete_many()
1960 .filter(
1961 object_dependency::Column::Oid
1962 .is_in(to_remove_secret_dep)
1963 .and(
1964 object_dependency::Column::UsedBy.eq::<ObjectId>(preferred_id as _),
1965 ),
1966 )
1967 .exec(&txn)
1968 .await?;
1969 }
1970 }
1971
1972 let active_source_model = source::ActiveModel {
1973 source_id: Set(source_id),
1974 definition: Set(rewrite_sql.clone()),
1975 with_properties: Set(options_with_secret.as_plaintext().clone().into()),
1976 secret_ref: Set((!options_with_secret.as_secret().is_empty())
1977 .then(|| SecretRef::from(options_with_secret.as_secret().clone()))),
1978 ..Default::default()
1979 };
1980 active_source_model.update(&txn).await?;
1981
1982 if let Some(associate_table_id) = associate_table_id {
1983 let active_table_model = table::ActiveModel {
1985 table_id: Set(associate_table_id),
1986 definition: Set(rewrite_sql),
1987 ..Default::default()
1988 };
1989 active_table_model.update(&txn).await?;
1990 }
1991
1992 update_connector_props_fragments(
1994 &txn,
1995 if let Some(associate_table_id) = associate_table_id {
1996 associate_table_id
1998 } else {
1999 source_id
2000 },
2001 FragmentTypeFlag::Source,
2002 |node, found| {
2003 if let PbNodeBody::Source(node) = node
2004 && let Some(source_inner) = &mut node.source_inner
2005 {
2006 source_inner.with_properties = options_with_secret.as_plaintext().clone();
2007 source_inner.secret_refs = options_with_secret.as_secret().clone();
2008 *found = true;
2009 }
2010 },
2011 )
2012 .await?;
2013
2014 let mut to_update_objs = Vec::with_capacity(2);
2015 let (source, obj) = Source::find_by_id(source_id)
2016 .find_also_related(Object)
2017 .one(&txn)
2018 .await?
2019 .ok_or_else(|| {
2020 MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
2021 })?;
2022 to_update_objs.push(PbObject {
2023 object_info: Some(PbObjectInfo::Source(
2024 ObjectModel(source, obj.unwrap()).into(),
2025 )),
2026 });
2027
2028 if let Some(associate_table_id) = associate_table_id {
2029 let (table, obj) = Table::find_by_id(associate_table_id)
2030 .find_also_related(Object)
2031 .one(&txn)
2032 .await?
2033 .ok_or_else(|| MetaError::catalog_id_not_found("table", associate_table_id))?;
2034 to_update_objs.push(PbObject {
2035 object_info: Some(PbObjectInfo::Table(ObjectModel(table, obj.unwrap()).into())),
2036 });
2037 }
2038
2039 txn.commit().await?;
2040
2041 self.notify_frontend(
2042 NotificationOperation::Update,
2043 NotificationInfo::ObjectGroup(PbObjectGroup {
2044 objects: to_update_objs,
2045 }),
2046 )
2047 .await;
2048
2049 Ok(options_with_secret)
2050 }
2051
2052 pub async fn update_sink_props_by_sink_id(
2053 &self,
2054 sink_id: SinkId,
2055 props: BTreeMap<String, String>,
2056 ) -> MetaResult<HashMap<String, String>> {
2057 let inner = self.inner.read().await;
2058 let txn = inner.db.begin().await?;
2059
2060 let (sink, _obj) = Sink::find_by_id(sink_id)
2061 .find_also_related(Object)
2062 .one(&txn)
2063 .await?
2064 .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
2065
2066 match sink.properties.inner_ref().get(CONNECTOR_TYPE_KEY) {
2068 Some(connector) => {
2069 let connector_type = connector.to_lowercase();
2070 let field_names: Vec<String> = props.keys().cloned().collect();
2071 check_sink_allow_alter_on_fly_fields(&connector_type, &field_names)
2072 .map_err(|e| SinkError::Config(anyhow!(e)))?;
2073
2074 match_sink_name_str!(
2075 connector_type.as_str(),
2076 SinkType,
2077 {
2078 let mut new_props = sink.properties.0.clone();
2079 new_props.extend(props.clone());
2080 SinkType::validate_alter_config(&new_props)
2081 },
2082 |sink: &str| Err(SinkError::Config(anyhow!("unsupported sink type {}", sink)))
2083 )?
2084 }
2085 None => {
2086 return Err(
2087 SinkError::Config(anyhow!("connector not specified when alter sink")).into(),
2088 );
2089 }
2090 };
2091 let definition = sink.definition.clone();
2092 let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
2093 .map_err(|e| SinkError::Config(anyhow!(e)))?
2094 .try_into()
2095 .unwrap();
2096 if let Statement::CreateSink { stmt } = &mut stmt {
2097 let mut new_sql_options = stmt
2098 .with_properties
2099 .0
2100 .iter()
2101 .map(|sql_option| (&sql_option.name, sql_option))
2102 .collect::<IndexMap<_, _>>();
2103 let add_sql_options = props
2104 .iter()
2105 .map(|(k, v)| SqlOption::try_from((k, v)))
2106 .collect::<Result<Vec<SqlOption>, ParserError>>()
2107 .map_err(|e| SinkError::Config(anyhow!(e)))?;
2108 new_sql_options.extend(
2109 add_sql_options
2110 .iter()
2111 .map(|sql_option| (&sql_option.name, sql_option)),
2112 );
2113 stmt.with_properties.0 = new_sql_options.into_values().cloned().collect();
2114 } else {
2115 panic!("sink definition is not a create sink statement")
2116 }
2117 let mut new_config = sink.properties.clone().into_inner();
2118 new_config.extend(props);
2119
2120 let active_sink = sink::ActiveModel {
2121 sink_id: Set(sink_id),
2122 properties: Set(risingwave_meta_model::Property(new_config.clone())),
2123 definition: Set(stmt.to_string()),
2124 ..Default::default()
2125 };
2126 active_sink.update(&txn).await?;
2127
2128 let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
2129 .select_only()
2130 .columns([
2131 fragment::Column::FragmentId,
2132 fragment::Column::FragmentTypeMask,
2133 fragment::Column::StreamNode,
2134 ])
2135 .filter(fragment::Column::JobId.eq(sink_id))
2136 .into_tuple()
2137 .all(&txn)
2138 .await?;
2139 let fragments = fragments
2140 .into_iter()
2141 .filter(|(_, fragment_type_mask, _)| {
2142 FragmentTypeMask::from(*fragment_type_mask).contains(FragmentTypeFlag::Sink)
2143 })
2144 .filter_map(|(id, _, stream_node)| {
2145 let mut stream_node = stream_node.to_protobuf();
2146 let mut found = false;
2147 visit_stream_node_mut(&mut stream_node, |node| {
2148 if let PbNodeBody::Sink(node) = node
2149 && let Some(sink_desc) = &mut node.sink_desc
2150 && sink_desc.id == sink_id as u32
2151 {
2152 sink_desc.properties = new_config.clone();
2153 found = true;
2154 }
2155 });
2156 if found { Some((id, stream_node)) } else { None }
2157 })
2158 .collect_vec();
2159 assert!(
2160 !fragments.is_empty(),
2161 "sink id should be used by at least one fragment"
2162 );
2163 for (id, stream_node) in fragments {
2164 fragment::ActiveModel {
2165 fragment_id: Set(id),
2166 stream_node: Set(StreamNode::from(&stream_node)),
2167 ..Default::default()
2168 }
2169 .update(&txn)
2170 .await?;
2171 }
2172
2173 let (sink, obj) = Sink::find_by_id(sink_id)
2174 .find_also_related(Object)
2175 .one(&txn)
2176 .await?
2177 .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
2178
2179 txn.commit().await?;
2180
2181 let relation_info = PbObjectInfo::Sink(ObjectModel(sink, obj.unwrap()).into());
2182 let _version = self
2183 .notify_frontend(
2184 NotificationOperation::Update,
2185 NotificationInfo::ObjectGroup(PbObjectGroup {
2186 objects: vec![PbObject {
2187 object_info: Some(relation_info),
2188 }],
2189 }),
2190 )
2191 .await;
2192
2193 Ok(new_config.into_iter().collect())
2194 }
2195
2196 pub async fn update_fragment_rate_limit_by_fragment_id(
2197 &self,
2198 fragment_id: FragmentId,
2199 rate_limit: Option<u32>,
2200 ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
2201 let update_rate_limit = |fragment_type_mask: FragmentTypeMask,
2202 stream_node: &mut PbStreamNode| {
2203 let mut found = false;
2204 if fragment_type_mask.contains_any(
2205 FragmentTypeFlag::dml_rate_limit_fragments()
2206 .chain(FragmentTypeFlag::sink_rate_limit_fragments())
2207 .chain(FragmentTypeFlag::backfill_rate_limit_fragments())
2208 .chain(FragmentTypeFlag::source_rate_limit_fragments()),
2209 ) {
2210 visit_stream_node_mut(stream_node, |node| {
2211 if let PbNodeBody::Dml(node) = node {
2212 node.rate_limit = rate_limit;
2213 found = true;
2214 }
2215 if let PbNodeBody::Sink(node) = node {
2216 node.rate_limit = rate_limit;
2217 found = true;
2218 }
2219 if let PbNodeBody::StreamCdcScan(node) = node {
2220 node.rate_limit = rate_limit;
2221 found = true;
2222 }
2223 if let PbNodeBody::StreamScan(node) = node {
2224 node.rate_limit = rate_limit;
2225 found = true;
2226 }
2227 if let PbNodeBody::SourceBackfill(node) = node {
2228 node.rate_limit = rate_limit;
2229 found = true;
2230 }
2231 });
2232 }
2233 found
2234 };
2235 self.mutate_fragment_by_fragment_id(fragment_id, update_rate_limit, "fragment not found")
2236 .await
2237 }
2238
2239 pub async fn post_apply_reschedules(
2240 &self,
2241 reschedules: HashMap<FragmentId, Reschedule>,
2242 post_updates: &JobReschedulePostUpdates,
2243 ) -> MetaResult<()> {
2244 let new_created_actors: HashSet<_> = reschedules
2245 .values()
2246 .flat_map(|reschedule| {
2247 reschedule
2248 .added_actors
2249 .values()
2250 .flatten()
2251 .map(|actor_id| *actor_id as ActorId)
2252 })
2253 .collect();
2254
2255 let inner = self.inner.write().await;
2256
2257 let txn = inner.db.begin().await?;
2258
2259 for Reschedule {
2260 removed_actors,
2261 vnode_bitmap_updates,
2262 actor_splits,
2263 newly_created_actors,
2264 ..
2265 } in reschedules.into_values()
2266 {
2267 Actor::delete_many()
2269 .filter(
2270 actor::Column::ActorId
2271 .is_in(removed_actors.iter().map(|id| *id as ActorId).collect_vec()),
2272 )
2273 .exec(&txn)
2274 .await?;
2275
2276 for (
2278 (
2279 StreamActor {
2280 actor_id,
2281 fragment_id,
2282 vnode_bitmap,
2283 expr_context,
2284 ..
2285 },
2286 _,
2287 ),
2288 worker_id,
2289 ) in newly_created_actors.into_values()
2290 {
2291 let splits = actor_splits
2292 .get(&actor_id)
2293 .map(|splits| splits.iter().map(PbConnectorSplit::from).collect_vec());
2294
2295 Actor::insert(actor::ActiveModel {
2296 actor_id: Set(actor_id as _),
2297 fragment_id: Set(fragment_id as _),
2298 status: Set(ActorStatus::Running),
2299 splits: Set(splits.map(|splits| (&PbConnectorSplits { splits }).into())),
2300 worker_id: Set(worker_id),
2301 upstream_actor_ids: Set(Default::default()),
2302 vnode_bitmap: Set(vnode_bitmap
2303 .as_ref()
2304 .map(|bitmap| (&bitmap.to_protobuf()).into())),
2305 expr_context: Set(expr_context.as_ref().unwrap().into()),
2306 })
2307 .exec(&txn)
2308 .await?;
2309 }
2310
2311 for (actor_id, bitmap) in vnode_bitmap_updates {
2313 let actor = Actor::find_by_id(actor_id as ActorId)
2314 .one(&txn)
2315 .await?
2316 .ok_or_else(|| MetaError::catalog_id_not_found("actor", actor_id))?;
2317
2318 let mut actor = actor.into_active_model();
2319 actor.vnode_bitmap = Set(Some((&bitmap.to_protobuf()).into()));
2320 actor.update(&txn).await?;
2321 }
2322
2323 for (actor_id, splits) in actor_splits {
2325 if new_created_actors.contains(&(actor_id as ActorId)) {
2326 continue;
2327 }
2328
2329 let actor = Actor::find_by_id(actor_id as ActorId)
2330 .one(&txn)
2331 .await?
2332 .ok_or_else(|| MetaError::catalog_id_not_found("actor", actor_id))?;
2333
2334 let mut actor = actor.into_active_model();
2335 let splits = splits.iter().map(PbConnectorSplit::from).collect_vec();
2336 actor.splits = Set(Some((&PbConnectorSplits { splits }).into()));
2337 actor.update(&txn).await?;
2338 }
2339 }
2340
2341 let JobReschedulePostUpdates {
2342 parallelism_updates,
2343 resource_group_updates,
2344 } = post_updates;
2345
2346 for (table_id, parallelism) in parallelism_updates {
2347 let mut streaming_job = StreamingJobModel::find_by_id(table_id.table_id() as ObjectId)
2348 .one(&txn)
2349 .await?
2350 .ok_or_else(|| MetaError::catalog_id_not_found("table", table_id))?
2351 .into_active_model();
2352
2353 streaming_job.parallelism = Set(match parallelism {
2354 TableParallelism::Adaptive => StreamingParallelism::Adaptive,
2355 TableParallelism::Fixed(n) => StreamingParallelism::Fixed(*n as _),
2356 TableParallelism::Custom => StreamingParallelism::Custom,
2357 });
2358
2359 if let Some(resource_group) =
2360 resource_group_updates.get(&(table_id.table_id() as ObjectId))
2361 {
2362 streaming_job.specific_resource_group = Set(resource_group.to_owned());
2363 }
2364
2365 streaming_job.update(&txn).await?;
2366 }
2367
2368 txn.commit().await?;
2369
2370 Ok(())
2371 }
2372
2373 pub async fn list_rate_limits(&self) -> MetaResult<Vec<RateLimitInfo>> {
2376 let inner = self.inner.read().await;
2377 let txn = inner.db.begin().await?;
2378
2379 let fragments: Vec<(FragmentId, ObjectId, i32, StreamNode)> = Fragment::find()
2380 .select_only()
2381 .columns([
2382 fragment::Column::FragmentId,
2383 fragment::Column::JobId,
2384 fragment::Column::FragmentTypeMask,
2385 fragment::Column::StreamNode,
2386 ])
2387 .filter(fragment_type_mask_intersects(FragmentTypeFlag::raw_flag(
2388 FragmentTypeFlag::rate_limit_fragments(),
2389 ) as _))
2390 .into_tuple()
2391 .all(&txn)
2392 .await?;
2393
2394 let mut rate_limits = Vec::new();
2395 for (fragment_id, job_id, fragment_type_mask, stream_node) in fragments {
2396 let stream_node = stream_node.to_protobuf();
2397 visit_stream_node_body(&stream_node, |node| {
2398 let mut rate_limit = None;
2399 let mut node_name = None;
2400
2401 match node {
2402 PbNodeBody::Source(node) => {
2404 if let Some(node_inner) = &node.source_inner {
2405 rate_limit = node_inner.rate_limit;
2406 node_name = Some("SOURCE");
2407 }
2408 }
2409 PbNodeBody::StreamFsFetch(node) => {
2410 if let Some(node_inner) = &node.node_inner {
2411 rate_limit = node_inner.rate_limit;
2412 node_name = Some("FS_FETCH");
2413 }
2414 }
2415 PbNodeBody::SourceBackfill(node) => {
2417 rate_limit = node.rate_limit;
2418 node_name = Some("SOURCE_BACKFILL");
2419 }
2420 PbNodeBody::StreamScan(node) => {
2421 rate_limit = node.rate_limit;
2422 node_name = Some("STREAM_SCAN");
2423 }
2424 PbNodeBody::StreamCdcScan(node) => {
2425 rate_limit = node.rate_limit;
2426 node_name = Some("STREAM_CDC_SCAN");
2427 }
2428 PbNodeBody::Sink(node) => {
2429 rate_limit = node.rate_limit;
2430 node_name = Some("SINK");
2431 }
2432 _ => {}
2433 }
2434
2435 if let Some(rate_limit) = rate_limit {
2436 rate_limits.push(RateLimitInfo {
2437 fragment_id: fragment_id as u32,
2438 job_id: job_id as u32,
2439 fragment_type_mask: fragment_type_mask as u32,
2440 rate_limit,
2441 node_name: node_name.unwrap().to_owned(),
2442 });
2443 }
2444 });
2445 }
2446
2447 Ok(rate_limits)
2448 }
2449}
2450
2451fn bitflag_intersects(column: SimpleExpr, value: i32) -> SimpleExpr {
2452 column
2453 .binary(BinOper::Custom("&"), value)
2454 .binary(BinOper::NotEqual, 0)
2455}
2456
2457fn fragment_type_mask_intersects(value: i32) -> SimpleExpr {
2458 bitflag_intersects(fragment::Column::FragmentTypeMask.into_simple_expr(), value)
2459}
2460
2461pub struct SinkIntoTableContext {
2462 pub creating_sink_id: Option<SinkId>,
2464 pub dropping_sink_id: Option<SinkId>,
2466 pub updated_sink_catalogs: Vec<SinkId>,
2469}
2470
2471pub struct FinishAutoRefreshSchemaSinkContext {
2472 pub tmp_sink_id: ObjectId,
2473 pub original_sink_id: ObjectId,
2474 pub columns: Vec<PbColumnCatalog>,
2475 pub new_log_store_table: Option<(ObjectId, Vec<PbColumnCatalog>)>,
2476}
2477
2478async fn update_connector_props_fragments<F>(
2479 txn: &DatabaseTransaction,
2480 job_id: i32,
2481 expect_flag: FragmentTypeFlag,
2482 mut alter_stream_node_fn: F,
2483) -> MetaResult<()>
2484where
2485 F: FnMut(&mut PbNodeBody, &mut bool),
2486{
2487 let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
2488 .select_only()
2489 .columns([
2490 fragment::Column::FragmentId,
2491 fragment::Column::FragmentTypeMask,
2492 fragment::Column::StreamNode,
2493 ])
2494 .filter(fragment::Column::JobId.eq(job_id))
2495 .into_tuple()
2496 .all(txn)
2497 .await?;
2498 let fragments = fragments
2499 .into_iter()
2500 .filter(|(_, fragment_type_mask, _)| *fragment_type_mask & expect_flag as i32 != 0)
2501 .filter_map(|(id, _, stream_node)| {
2502 let mut stream_node = stream_node.to_protobuf();
2503 let mut found = false;
2504 visit_stream_node_mut(&mut stream_node, |node| {
2505 alter_stream_node_fn(node, &mut found);
2506 });
2507 if found { Some((id, stream_node)) } else { None }
2508 })
2509 .collect_vec();
2510 assert!(
2511 !fragments.is_empty(),
2512 "job {} (type: {:?}) should be used by at least one fragment",
2513 job_id,
2514 expect_flag
2515 );
2516
2517 for (id, stream_node) in fragments {
2518 fragment::ActiveModel {
2519 fragment_id: Set(id),
2520 stream_node: Set(StreamNode::from(&stream_node)),
2521 ..Default::default()
2522 }
2523 .update(txn)
2524 .await?;
2525 }
2526
2527 Ok(())
2528}