use std::collections::{BTreeMap, HashMap, HashSet};
use std::num::NonZeroUsize;

use anyhow::anyhow;
use indexmap::IndexMap;
use itertools::Itertools;
use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
use risingwave_common::config::DefaultParallelism;
use risingwave_common::hash::VnodeCountCompat;
use risingwave_common::util::stream_graph_visitor::{
    visit_stream_node_body, visit_stream_node_mut,
};
use risingwave_common::{bail, current_cluster_version};
use risingwave_connector::allow_alter_on_fly_fields::check_sink_allow_alter_on_fly_fields;
use risingwave_connector::error::ConnectorError;
use risingwave_connector::sink::file_sink::fs::FsSink;
use risingwave_connector::sink::{CONNECTOR_TYPE_KEY, SinkError};
use risingwave_connector::source::ConnectorProperties;
use risingwave_connector::{WithOptionsSecResolved, WithPropertiesExt, match_sink_name_str};
use risingwave_meta_model::actor::ActorStatus;
use risingwave_meta_model::object::ObjectType;
use risingwave_meta_model::prelude::{StreamingJob as StreamingJobModel, *};
use risingwave_meta_model::table::TableType;
use risingwave_meta_model::user_privilege::Action;
use risingwave_meta_model::*;
use risingwave_pb::catalog::source::PbOptionalAssociatedTableId;
use risingwave_pb::catalog::table::PbOptionalAssociatedSourceId;
use risingwave_pb::catalog::{PbCreateType, PbTable};
use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
use risingwave_pb::meta::object::PbObjectInfo;
use risingwave_pb::meta::subscribe_response::{
    Info as NotificationInfo, Info, Operation as NotificationOperation, Operation,
};
use risingwave_pb::meta::{PbFragmentWorkerSlotMapping, PbObject, PbObjectGroup};
use risingwave_pb::secret::PbSecretRef;
use risingwave_pb::source::{PbConnectorSplit, PbConnectorSplits};
use risingwave_pb::stream_plan::PbStreamNode;
use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use risingwave_pb::user::PbUserInfo;
use risingwave_sqlparser::ast::{SqlOption, Statement};
use risingwave_sqlparser::parser::{Parser, ParserError};
use sea_orm::ActiveValue::Set;
use sea_orm::sea_query::{BinOper, Expr, Query, SimpleExpr};
use sea_orm::{
    ActiveEnum, ActiveModelTrait, ColumnTrait, DatabaseTransaction, EntityTrait, IntoActiveModel,
    IntoSimpleExpr, JoinType, ModelTrait, NotSet, PaginatorTrait, QueryFilter, QuerySelect,
    RelationTrait, TransactionTrait,
};
use thiserror_ext::AsReport;

use super::rename::IndexItemRewriter;
use crate::barrier::{ReplaceStreamJobPlan, Reschedule};
use crate::controller::ObjectModel;
use crate::controller::catalog::{CatalogController, DropTableConnectorContext};
use crate::controller::utils::{
    PartialObject, build_object_group_for_delete, check_relation_name_duplicate,
    check_sink_into_table_cycle, ensure_object_id, ensure_user_id, get_fragment_actor_ids,
    get_fragment_mappings, get_internal_tables_by_id, grant_default_privileges_automatically,
    insert_fragment_relations, list_user_info_by_ids, rebuild_fragment_mapping_from_actors,
};
use crate::error::MetaErrorInner;
use crate::manager::{NotificationVersion, StreamingJob, StreamingJobType};
use crate::model::{
    FragmentDownstreamRelation, FragmentReplaceUpstream, StreamActor, StreamContext,
    StreamJobFragmentsToCreate, TableParallelism,
};
use crate::stream::{JobReschedulePostUpdates, SplitAssignment};
use crate::{MetaError, MetaResult};

impl CatalogController {
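    /// Insert the `object` and `streaming_job` rows for a new streaming job within the given
    /// transaction, with the job status initialized to `Initial`. Returns the allocated
    /// object id.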
    pub async fn create_streaming_job_obj(
        txn: &DatabaseTransaction,
        obj_type: ObjectType,
        owner_id: UserId,
        database_id: Option<DatabaseId>,
        schema_id: Option<SchemaId>,
        create_type: PbCreateType,
        ctx: &StreamContext,
        streaming_parallelism: StreamingParallelism,
        max_parallelism: usize,
        specific_resource_group: Option<String>,
    ) -> MetaResult<ObjectId> {
        let obj = Self::create_object(txn, obj_type, owner_id, database_id, schema_id).await?;
        let job = streaming_job::ActiveModel {
            job_id: Set(obj.oid),
            job_status: Set(JobStatus::Initial),
            create_type: Set(create_type.into()),
            timezone: Set(ctx.timezone.clone()),
            parallelism: Set(streaming_parallelism),
            max_parallelism: Set(max_parallelism as _),
            specific_resource_group: Set(specific_resource_group),
        };
        job.insert(txn).await?;

        Ok(obj.oid)
    }

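    /// Create all catalog entries for a new streaming job in a single transaction: resolve
    /// the job's parallelism, validate its owner, database, schema, and name uniqueness,
    /// reject dependencies that are still being altered, insert the job-specific rows
    /// (table / sink / source / index), and record object dependencies (including secrets
    /// and connections).
    ///
    /// A minimal sketch of the parallelism resolution performed below (hypothetical values,
    /// mirroring the `match` at the top of the function):
    ///
    /// ```ignore
    /// // CREATE ... WITH (parallelism = 4)          => StreamingParallelism::Fixed(4)
    /// // no parallelism, default_parallelism = FULL => StreamingParallelism::Adaptive
    /// // no parallelism, default_parallelism = 2    => StreamingParallelism::Fixed(2)
    /// ```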
    #[await_tree::instrument]
    pub async fn create_job_catalog(
        &self,
        streaming_job: &mut StreamingJob,
        ctx: &StreamContext,
        parallelism: &Option<Parallelism>,
        max_parallelism: usize,
        mut dependencies: HashSet<ObjectId>,
        specific_resource_group: Option<String>,
    ) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;
        let create_type = streaming_job.create_type();

        let streaming_parallelism = match (parallelism, self.env.opts.default_parallelism) {
            (None, DefaultParallelism::Full) => StreamingParallelism::Adaptive,
            (None, DefaultParallelism::Default(n)) => StreamingParallelism::Fixed(n.get()),
            (Some(n), _) => StreamingParallelism::Fixed(n.parallelism as _),
        };

        ensure_user_id(streaming_job.owner() as _, &txn).await?;
        ensure_object_id(ObjectType::Database, streaming_job.database_id() as _, &txn).await?;
        ensure_object_id(ObjectType::Schema, streaming_job.schema_id() as _, &txn).await?;
        check_relation_name_duplicate(
            &streaming_job.name(),
            streaming_job.database_id() as _,
            streaming_job.schema_id() as _,
            &txn,
        )
        .await?;

        if !dependencies.is_empty() {
            // Check whether any dependent relation is being altered: a temporary job used for
            // `ALTER` is a `Table`-typed object whose streaming job is not yet `Created` and
            // which has no corresponding row in the `table` catalog.
            let altering_cnt = ObjectDependency::find()
                .join(
                    JoinType::InnerJoin,
                    object_dependency::Relation::Object1.def(),
                )
                .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
                .filter(
                    object_dependency::Column::Oid
                        .is_in(dependencies.clone())
                        .and(object::Column::ObjType.eq(ObjectType::Table))
                        .and(streaming_job::Column::JobStatus.ne(JobStatus::Created))
                        .and(
                            // i.e. the referring object has no real `table` row.
                            object::Column::Oid.not_in_subquery(
                                Query::select()
                                    .column(table::Column::TableId)
                                    .from(Table)
                                    .to_owned(),
                            ),
                        ),
                )
                .count(&txn)
                .await?;
            if altering_cnt != 0 {
                return Err(MetaError::permission_denied(
                    "some dependent relations are being altered",
                ));
            }
        }

        match streaming_job {
            StreamingJob::MaterializedView(table) => {
                let job_id = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Table,
                    table.owner as _,
                    Some(table.database_id as _),
                    Some(table.schema_id as _),
                    create_type,
                    ctx,
                    streaming_parallelism,
                    max_parallelism,
                    specific_resource_group,
                )
                .await?;
                table.id = job_id as _;
                let table_model: table::ActiveModel = table.clone().into();
                Table::insert(table_model).exec(&txn).await?;
            }
            StreamingJob::Sink(sink, _) => {
                if let Some(target_table_id) = sink.target_table
                    && check_sink_into_table_cycle(
                        target_table_id as ObjectId,
                        dependencies.iter().cloned().collect(),
                        &txn,
                    )
                    .await?
                {
                    bail!("Creating such a sink will result in circular dependency.");
                }

                let job_id = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Sink,
                    sink.owner as _,
                    Some(sink.database_id as _),
                    Some(sink.schema_id as _),
                    create_type,
                    ctx,
                    streaming_parallelism,
                    max_parallelism,
                    specific_resource_group,
                )
                .await?;
                sink.id = job_id as _;
                let sink_model: sink::ActiveModel = sink.clone().into();
                Sink::insert(sink_model).exec(&txn).await?;
            }
            StreamingJob::Table(src, table, _) => {
                let job_id = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Table,
                    table.owner as _,
                    Some(table.database_id as _),
                    Some(table.schema_id as _),
                    create_type,
                    ctx,
                    streaming_parallelism,
                    max_parallelism,
                    specific_resource_group,
                )
                .await?;
                table.id = job_id as _;
                if let Some(src) = src {
                    let src_obj = Self::create_object(
                        &txn,
                        ObjectType::Source,
                        src.owner as _,
                        Some(src.database_id as _),
                        Some(src.schema_id as _),
                    )
                    .await?;
                    src.id = src_obj.oid as _;
                    src.optional_associated_table_id =
                        Some(PbOptionalAssociatedTableId::AssociatedTableId(job_id as _));
                    table.optional_associated_source_id = Some(
                        PbOptionalAssociatedSourceId::AssociatedSourceId(src_obj.oid as _),
                    );
                    let source: source::ActiveModel = src.clone().into();
                    Source::insert(source).exec(&txn).await?;
                }
                let table_model: table::ActiveModel = table.clone().into();
                Table::insert(table_model).exec(&txn).await?;
            }
            StreamingJob::Index(index, table) => {
                ensure_object_id(ObjectType::Table, index.primary_table_id as _, &txn).await?;
                let job_id = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Index,
                    index.owner as _,
                    Some(index.database_id as _),
                    Some(index.schema_id as _),
                    create_type,
                    ctx,
                    streaming_parallelism,
                    max_parallelism,
                    specific_resource_group,
                )
                .await?;
                // The index and its index table share the same job id.
                index.id = job_id as _;
                index.index_table_id = job_id as _;
                table.id = job_id as _;

                ObjectDependency::insert(object_dependency::ActiveModel {
                    oid: Set(index.primary_table_id as _),
                    used_by: Set(table.id as _),
                    ..Default::default()
                })
                .exec(&txn)
                .await?;

                let table_model: table::ActiveModel = table.clone().into();
                Table::insert(table_model).exec(&txn).await?;
                let index_model: index::ActiveModel = index.clone().into();
                Index::insert(index_model).exec(&txn).await?;
            }
            StreamingJob::Source(src) => {
                let job_id = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Source,
                    src.owner as _,
                    Some(src.database_id as _),
                    Some(src.schema_id as _),
                    create_type,
                    ctx,
                    streaming_parallelism,
                    max_parallelism,
                    specific_resource_group,
                )
                .await?;
                src.id = job_id as _;
                let source_model: source::ActiveModel = src.clone().into();
                Source::insert(source_model).exec(&txn).await?;
            }
        }

        // Collect dependent secrets.
        dependencies.extend(
            streaming_job
                .dependent_secret_ids()?
                .into_iter()
                .map(|secret_id| secret_id as ObjectId),
        );
        // Collect dependent connections.
        dependencies.extend(
            streaming_job
                .dependent_connection_ids()?
                .into_iter()
                .map(|conn_id| conn_id as ObjectId),
        );

        // Record object dependencies.
        if !dependencies.is_empty() {
            ObjectDependency::insert_many(dependencies.into_iter().map(|oid| {
                object_dependency::ActiveModel {
                    oid: Set(oid),
                    used_by: Set(streaming_job.id() as _),
                    ..Default::default()
                }
            }))
            .exec(&txn)
            .await?;
        }

        txn.commit().await?;

        Ok(())
    }

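    /// Create catalog entries for the internal (state) tables of a streaming job.
    ///
    /// The input tables are "incomplete" in that their ids are still the placeholder ids
    /// assigned during planning; this method allocates real object ids for them and returns
    /// the mapping from placeholder id to allocated id.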
    pub async fn create_internal_table_catalog(
        &self,
        job: &StreamingJob,
        mut incomplete_internal_tables: Vec<PbTable>,
    ) -> MetaResult<HashMap<u32, u32>> {
        let job_id = job.id() as ObjectId;
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;
        let mut table_id_map = HashMap::new();
        for table in &mut incomplete_internal_tables {
            let table_id = Self::create_object(
                &txn,
                ObjectType::Table,
                table.owner as _,
                Some(table.database_id as _),
                Some(table.schema_id as _),
            )
            .await?
            .oid;
            table_id_map.insert(table.id, table_id as u32);
            table.id = table_id as _;
            table.job_id = Some(job_id as _);

            let table_model = table::ActiveModel {
                table_id: Set(table_id as _),
                belongs_to_job_id: Set(Some(job_id)),
                fragment_id: NotSet,
                ..table.clone().into()
            };
            Table::insert(table_model).exec(&txn).await?;
        }
        txn.commit().await?;

        Ok(table_id_map)
    }

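    /// Persist the fragments and actors of a built streaming job.
    ///
    /// When `for_replace` is false, this also binds the job's state tables to their
    /// fragments; when it is true (replacing an existing job), table bindings are left
    /// untouched and fixed up later in [`Self::finish_replace_streaming_job`].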
    #[await_tree::instrument("prepare_streaming_job_for_{}", if for_replace { "replace" } else { "create" })]
    pub async fn prepare_streaming_job(
        &self,
        stream_job_fragments: &StreamJobFragmentsToCreate,
        streaming_job: &StreamingJob,
        for_replace: bool,
    ) -> MetaResult<()> {
        let is_materialized_view = streaming_job.is_materialized_view();
        let fragment_actors =
            Self::extract_fragment_and_actors_from_fragments(stream_job_fragments)?;
        let mut all_tables = stream_job_fragments.all_tables();
        let inner = self.inner.write().await;

        let mut objects = vec![];
        let txn = inner.db.begin().await?;

        // Add fragments.
        let (fragments, actors): (Vec<_>, Vec<_>) = fragment_actors.into_iter().unzip();
        for fragment in fragments {
            let fragment_id = fragment.fragment_id;
            let state_table_ids = fragment.state_table_ids.inner_ref().clone();

            let fragment = fragment.into_active_model();
            Fragment::insert(fragment).exec(&txn).await?;

            // `fragment_id` and `vnode_count` of state tables are placeholders before this
            // point; fill them in now. This is skipped when replacing, where the rebinding
            // happens on job finish instead.
            if !for_replace {
                for state_table_id in state_table_ids {
                    let table = all_tables
                        .get_mut(&(state_table_id as u32))
                        .unwrap_or_else(|| panic!("table {} not found", state_table_id));
                    assert_eq!(table.id, state_table_id as u32);
                    assert_eq!(table.fragment_id, fragment_id as u32);
                    let vnode_count = table.vnode_count();

                    table::ActiveModel {
                        table_id: Set(state_table_id as _),
                        fragment_id: Set(Some(fragment_id)),
                        vnode_count: Set(vnode_count as _),
                        ..Default::default()
                    }
                    .update(&txn)
                    .await?;

                    if is_materialized_view {
                        if cfg!(not(debug_assertions)) && table.id == streaming_job.id() {
                            // Replace the table's definition with the job's definition
                            // (release builds only).
                            table.definition = streaming_job.definition();
                        }
                        objects.push(PbObject {
                            object_info: Some(PbObjectInfo::Table(table.clone())),
                        });
                    }
                }
            }
        }

        insert_fragment_relations(&txn, &stream_job_fragments.downstreams).await?;

        // Add actors.
        for actors in actors {
            for actor in actors {
                let actor = actor.into_active_model();
                Actor::insert(actor).exec(&txn).await?;
            }
        }

        if !for_replace {
            // Update the dml fragment id.
            if let StreamingJob::Table(_, table, ..) = streaming_job {
                Table::update(table::ActiveModel {
                    table_id: Set(table.id as _),
                    dml_fragment_id: Set(table.dml_fragment_id.map(|id| id as _)),
                    ..Default::default()
                })
                .exec(&txn)
                .await?;
            }
        }

        txn.commit().await?;

        if !objects.is_empty() {
            assert!(is_materialized_view);
            self.notify_frontend(Operation::Add, Info::ObjectGroup(PbObjectGroup { objects }))
                .await;
        }

        Ok(())
    }

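    /// Abort a streaming job that is still being created, cleaning up its catalog entries
    /// (the job object, internal tables, and any associated source) and notifying waiters.
    ///
    /// Returns `(true, database_id)` if the cleanup happened (or the job was already gone),
    /// and `(false, database_id)` if the job is a background job that is still creating and
    /// should be left to recovery instead.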
    #[await_tree::instrument]
    pub async fn try_abort_creating_streaming_job(
        &self,
        job_id: ObjectId,
        is_cancelled: bool,
    ) -> MetaResult<(bool, Option<DatabaseId>)> {
        let mut inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        let obj = Object::find_by_id(job_id).one(&txn).await?;
        let Some(obj) = obj else {
            tracing::warn!(
                id = job_id,
                "streaming job not found when aborting creating, might be cleaned by recovery"
            );
            return Ok((true, None));
        };
        let database_id = obj
            .database_id
            .ok_or_else(|| anyhow!("obj has no database id: {:?}", obj))?;

        if !is_cancelled {
            let streaming_job = streaming_job::Entity::find_by_id(job_id).one(&txn).await?;
            if let Some(streaming_job) = streaming_job {
                assert_ne!(streaming_job.job_status, JobStatus::Created);
                if streaming_job.create_type == CreateType::Background
                    && streaming_job.job_status == JobStatus::Creating
                {
                    // A background job still in creating status is left for recovery to handle.
                    tracing::warn!(
                        id = job_id,
                        "streaming job is created in background and still in creating status"
                    );
                    return Ok((false, Some(database_id)));
                }
            }
        }

        let internal_table_ids = get_internal_tables_by_id(job_id, &txn).await?;

        // Collect the objects to notify as deleted if the job is a materialized view.
        let table_obj = Table::find_by_id(job_id).one(&txn).await?;
        let mut objs = vec![];
        if let Some(table) = &table_obj
            && table.table_type == TableType::MaterializedView
        {
            let obj: Option<PartialObject> = Object::find_by_id(job_id)
                .select_only()
                .columns([
                    object::Column::Oid,
                    object::Column::ObjType,
                    object::Column::SchemaId,
                    object::Column::DatabaseId,
                ])
                .into_partial_model()
                .one(&txn)
                .await?;
            let obj =
                obj.ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;
            objs.push(obj);
            let internal_table_objs: Vec<PartialObject> = Object::find()
                .select_only()
                .columns([
                    object::Column::Oid,
                    object::Column::ObjType,
                    object::Column::SchemaId,
                    object::Column::DatabaseId,
                ])
                .join(JoinType::InnerJoin, object::Relation::Table.def())
                .filter(table::Column::BelongsToJobId.eq(job_id))
                .into_partial_model()
                .all(&txn)
                .await?;
            objs.extend(internal_table_objs);
        }

        // If this is a sink into a table, also abort the temporary replace job of the
        // target table.
        if table_obj.is_none()
            && let Some(Some(target_table_id)) = Sink::find_by_id(job_id)
                .select_only()
                .column(sink::Column::TargetTable)
                .into_tuple::<Option<TableId>>()
                .one(&txn)
                .await?
        {
            let tmp_id: Option<ObjectId> = ObjectDependency::find()
                .select_only()
                .column(object_dependency::Column::UsedBy)
                .join(
                    JoinType::InnerJoin,
                    object_dependency::Relation::Object1.def(),
                )
                .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
                .filter(
                    object_dependency::Column::Oid
                        .eq(target_table_id)
                        .and(object::Column::ObjType.eq(ObjectType::Table))
                        .and(streaming_job::Column::JobStatus.ne(JobStatus::Created)),
                )
                .into_tuple()
                .one(&txn)
                .await?;
            if let Some(tmp_id) = tmp_id {
                tracing::warn!(
                    id = tmp_id,
                    "aborting temp streaming job for sink into table"
                );
                Object::delete_by_id(tmp_id).exec(&txn).await?;
            }
        }

        Object::delete_by_id(job_id).exec(&txn).await?;
        if !internal_table_ids.is_empty() {
            Object::delete_many()
                .filter(object::Column::Oid.is_in(internal_table_ids))
                .exec(&txn)
                .await?;
        }
        if let Some(t) = &table_obj
            && let Some(source_id) = t.optional_associated_source_id
        {
            // Delete the associated source object as well.
            Object::delete_by_id(source_id).exec(&txn).await?;
        }

        let err = if is_cancelled {
            MetaError::cancelled(format!("streaming job {job_id} is cancelled"))
        } else {
            MetaError::catalog_id_not_found("stream job", format!("streaming job {job_id} failed"))
        };
        let abort_reason = format!("streaming job aborted {}", err.as_report());
        for tx in inner
            .creating_table_finish_notifier
            .get_mut(&database_id)
            .map(|creating_tables| creating_tables.remove(&job_id).into_iter())
            .into_iter()
            .flatten()
            .flatten()
        {
            let _ = tx.send(Err(abort_reason.clone()));
        }
        txn.commit().await?;

        if !objs.is_empty() {
            // The frontend has already been notified about these objects,
            // so notify it to delete them here.
            self.notify_frontend(Operation::Delete, build_object_group_for_delete(objs))
                .await;
        }
        Ok((true, Some(database_id)))
    }

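    /// Mark a creating job's collected actors as running and record their split assignments.
    /// Thin wrapper around [`Self::post_collect_job_fragments_inner`] with `is_mv = false`.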
    #[await_tree::instrument]
    pub async fn post_collect_job_fragments(
        &self,
        job_id: ObjectId,
        actor_ids: Vec<crate::model::ActorId>,
        upstream_fragment_new_downstreams: &FragmentDownstreamRelation,
        split_assignment: &SplitAssignment,
    ) -> MetaResult<()> {
        self.post_collect_job_fragments_inner(
            job_id,
            actor_ids,
            upstream_fragment_new_downstreams,
            split_assignment,
            false,
        )
        .await
    }

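    /// After the barrier creating a job's actors has been collected: mark the actors as
    /// running, persist their connector split assignments, record the new fragment
    /// relations, move the job to `Creating`, and (for materialized views) broadcast the
    /// fragment worker-slot mappings.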
    pub async fn post_collect_job_fragments_inner(
        &self,
        job_id: ObjectId,
        actor_ids: Vec<crate::model::ActorId>,
        upstream_fragment_new_downstreams: &FragmentDownstreamRelation,
        split_assignment: &SplitAssignment,
        is_mv: bool,
    ) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        Actor::update_many()
            .col_expr(
                actor::Column::Status,
                SimpleExpr::from(ActorStatus::Running.into_value()),
            )
            .filter(
                actor::Column::ActorId
                    .is_in(actor_ids.into_iter().map(|id| id as ActorId).collect_vec()),
            )
            .exec(&txn)
            .await?;

        for splits in split_assignment.values() {
            for (actor_id, splits) in splits {
                let splits = splits.iter().map(PbConnectorSplit::from).collect_vec();
                let connector_splits = &PbConnectorSplits { splits };
                actor::ActiveModel {
                    actor_id: Set(*actor_id as _),
                    splits: Set(Some(connector_splits.into())),
                    ..Default::default()
                }
                .update(&txn)
                .await?;
            }
        }

        insert_fragment_relations(&txn, upstream_fragment_new_downstreams).await?;

        // Mark the job as `Creating`.
        streaming_job::ActiveModel {
            job_id: Set(job_id),
            job_status: Set(JobStatus::Creating),
            ..Default::default()
        }
        .update(&txn)
        .await?;

        let fragment_mapping = if is_mv {
            get_fragment_mappings(&txn, job_id as _).await?
        } else {
            vec![]
        };

        txn.commit().await?;
        self.notify_fragment_mapping(NotificationOperation::Add, fragment_mapping)
            .await;

        Ok(())
    }

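    /// Create a temporary job object for replacing an existing streaming job (e.g.
    /// `ALTER TABLE`), verifying that the job is not referenced by creating jobs and that
    /// the max parallelism stays unchanged. Returns the id of the temporary object.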
    pub async fn create_job_catalog_for_replace(
        &self,
        streaming_job: &StreamingJob,
        ctx: &StreamContext,
        specified_parallelism: &Option<NonZeroUsize>,
        max_parallelism: usize,
    ) -> MetaResult<ObjectId> {
        let id = streaming_job.id();
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        streaming_job.verify_version_for_replace(&txn).await?;
        // Reject the replacement if the job is referenced by other creating jobs or is
        // already being altered.
        let referring_cnt = ObjectDependency::find()
            .join(
                JoinType::InnerJoin,
                object_dependency::Relation::Object1.def(),
            )
            .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
            .filter(
                object_dependency::Column::Oid
                    .eq(id as ObjectId)
                    .and(object::Column::ObjType.eq(ObjectType::Table))
                    .and(streaming_job::Column::JobStatus.ne(JobStatus::Created)),
            )
            .count(&txn)
            .await?;
        if referring_cnt != 0 {
            return Err(MetaError::permission_denied(
                "job is being altered or referenced by some creating jobs",
            ));
        }

        let original_max_parallelism: i32 = StreamingJobModel::find_by_id(id as ObjectId)
            .select_only()
            .column(streaming_job::Column::MaxParallelism)
            .into_tuple()
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found(streaming_job.job_type_str(), id))?;

        if original_max_parallelism != max_parallelism as i32 {
            // The max parallelism of a streaming job is fixed at creation; replacing the
            // job must keep it unchanged.
            bail!(
                "cannot use a different max parallelism \
                 when replacing streaming job, \
                 original: {}, new: {}",
                original_max_parallelism,
                max_parallelism
            );
        }

        let parallelism = match specified_parallelism {
            None => StreamingParallelism::Adaptive,
            Some(n) => StreamingParallelism::Fixed(n.get() as _),
        };

        // Create the streaming job object for the replacement.
        let new_obj_id = Self::create_streaming_job_obj(
            &txn,
            streaming_job.object_type(),
            streaming_job.owner() as _,
            Some(streaming_job.database_id() as _),
            Some(streaming_job.schema_id() as _),
            streaming_job.create_type(),
            ctx,
            parallelism,
            max_parallelism,
            None,
        )
        .await?;

        // Record the dependency of the temporary job on the original one.
        ObjectDependency::insert(object_dependency::ActiveModel {
            oid: Set(id as _),
            used_by: Set(new_obj_id as _),
            ..Default::default()
        })
        .exec(&txn)
        .await?;

        txn.commit().await?;

        Ok(new_obj_id)
    }

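    /// Mark a streaming job as `Created` once it finishes creation: stamp the creation
    /// timestamp and cluster version, notify the frontend about the job and its internal
    /// tables, grant default privileges, and resolve any pending "sink into table" replace
    /// plan carried in `replace_stream_job_info`.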
    pub async fn finish_streaming_job(
        &self,
        job_id: ObjectId,
        replace_stream_job_info: Option<ReplaceStreamJobPlan>,
    ) -> MetaResult<()> {
        let mut inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        let job_type = Object::find_by_id(job_id)
            .select_only()
            .column(object::Column::ObjType)
            .into_tuple()
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;

        // Update `created_at` to the time the job actually finished creating, and stamp the
        // current cluster version.
        let res = Object::update_many()
            .col_expr(object::Column::CreatedAt, Expr::current_timestamp().into())
            .col_expr(
                object::Column::CreatedAtClusterVersion,
                current_cluster_version().into(),
            )
            .filter(object::Column::Oid.eq(job_id))
            .exec(&txn)
            .await?;
        if res.rows_affected == 0 {
            return Err(MetaError::catalog_id_not_found("streaming job", job_id));
        }

        // Mark the job as `Created`.
        let job = streaming_job::ActiveModel {
            job_id: Set(job_id),
            job_status: Set(JobStatus::Created),
            ..Default::default()
        };
        job.update(&txn).await?;

        // Collect the internal tables to notify the frontend about.
        let internal_table_objs = Table::find()
            .find_also_related(Object)
            .filter(table::Column::BelongsToJobId.eq(job_id))
            .all(&txn)
            .await?;
        let mut objects = internal_table_objs
            .iter()
            .map(|(table, obj)| PbObject {
                object_info: Some(PbObjectInfo::Table(
                    ObjectModel(table.clone(), obj.clone().unwrap()).into(),
                )),
            })
            .collect_vec();
        let mut notification_op = NotificationOperation::Add;
        let mut updated_user_info = vec![];

        match job_type {
            ObjectType::Table => {
                let (table, obj) = Table::find_by_id(job_id)
                    .find_also_related(Object)
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("table", job_id))?;
                if table.table_type == TableType::MaterializedView {
                    notification_op = NotificationOperation::Update;
                }

                if let Some(source_id) = table.optional_associated_source_id {
                    let (src, obj) = Source::find_by_id(source_id)
                        .find_also_related(Object)
                        .one(&txn)
                        .await?
                        .ok_or_else(|| MetaError::catalog_id_not_found("source", source_id))?;
                    objects.push(PbObject {
                        object_info: Some(PbObjectInfo::Source(
                            ObjectModel(src, obj.unwrap()).into(),
                        )),
                    });
                }
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Table(ObjectModel(table, obj.unwrap()).into())),
                });
            }
            ObjectType::Sink => {
                let (sink, obj) = Sink::find_by_id(job_id)
                    .find_also_related(Object)
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("sink", job_id))?;
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Sink(ObjectModel(sink, obj.unwrap()).into())),
                });
            }
            ObjectType::Index => {
                let (index, obj) = Index::find_by_id(job_id)
                    .find_also_related(Object)
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("index", job_id))?;
                {
                    let (table, obj) = Table::find_by_id(index.index_table_id)
                        .find_also_related(Object)
                        .one(&txn)
                        .await?
                        .ok_or_else(|| {
                            MetaError::catalog_id_not_found("table", index.index_table_id)
                        })?;
                    objects.push(PbObject {
                        object_info: Some(PbObjectInfo::Table(
                            ObjectModel(table, obj.unwrap()).into(),
                        )),
                    });
                }

                // Users with `SELECT` privilege on the primary table inherit the same
                // privilege on the index and its state tables.
                let primary_table_privileges = UserPrivilege::find()
                    .filter(
                        user_privilege::Column::Oid
                            .eq(index.primary_table_id)
                            .and(user_privilege::Column::Action.eq(Action::Select)),
                    )
                    .all(&txn)
                    .await?;
                if !primary_table_privileges.is_empty() {
                    let index_state_table_ids: Vec<TableId> = Table::find()
                        .select_only()
                        .column(table::Column::TableId)
                        .filter(
                            table::Column::BelongsToJobId
                                .eq(job_id)
                                .or(table::Column::TableId.eq(index.index_table_id)),
                        )
                        .into_tuple()
                        .all(&txn)
                        .await?;
                    let mut new_privileges = vec![];
                    for privilege in &primary_table_privileges {
                        for state_table_id in &index_state_table_ids {
                            new_privileges.push(user_privilege::ActiveModel {
                                id: Default::default(),
                                oid: Set(*state_table_id),
                                user_id: Set(privilege.user_id),
                                action: Set(Action::Select),
                                dependent_id: Set(privilege.dependent_id),
                                granted_by: Set(privilege.granted_by),
                                with_grant_option: Set(privilege.with_grant_option),
                            });
                        }
                    }
                    UserPrivilege::insert_many(new_privileges)
                        .exec(&txn)
                        .await?;

                    updated_user_info = list_user_info_by_ids(
                        primary_table_privileges.into_iter().map(|p| p.user_id),
                        &txn,
                    )
                    .await?;
                }

                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Index(ObjectModel(index, obj.unwrap()).into())),
                });
            }
            ObjectType::Source => {
                let (source, obj) = Source::find_by_id(job_id)
                    .find_also_related(Object)
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("source", job_id))?;
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Source(
                        ObjectModel(source, obj.unwrap()).into(),
                    )),
                });
            }
            _ => unreachable!("invalid job type: {:?}", job_type),
        }

        let fragment_mapping = get_fragment_mappings(&txn, job_id).await?;

        let replace_table_mapping_update = match replace_stream_job_info {
            Some(ReplaceStreamJobPlan {
                streaming_job,
                replace_upstream,
                tmp_id,
                ..
            }) => {
                let incoming_sink_id = job_id;

                let (relations, fragment_mapping, _) = Self::finish_replace_streaming_job_inner(
                    tmp_id as ObjectId,
                    replace_upstream,
                    SinkIntoTableContext {
                        creating_sink_id: Some(incoming_sink_id as _),
                        dropping_sink_id: None,
                        updated_sink_catalogs: vec![],
                    },
                    &txn,
                    streaming_job,
                    None,
                )
                .await?;

                Some((relations, fragment_mapping))
            }
            None => None,
        };

        // Grant default privileges automatically. Indexes already inherit their privileges
        // from the primary table above.
        if job_type != ObjectType::Index {
            updated_user_info = grant_default_privileges_automatically(&txn, job_id).await?;
        }
        txn.commit().await?;

        self.notify_fragment_mapping(NotificationOperation::Add, fragment_mapping)
            .await;

        let mut version = self
            .notify_frontend(
                notification_op,
                NotificationInfo::ObjectGroup(PbObjectGroup { objects }),
            )
            .await;

        if !updated_user_info.is_empty() {
            // Notify users whose privileges were updated.
            version = self.notify_users_update(updated_user_info).await;
        }

        if let Some((objects, fragment_mapping)) = replace_table_mapping_update {
            self.notify_fragment_mapping(NotificationOperation::Add, fragment_mapping)
                .await;
            version = self
                .notify_frontend(
                    NotificationOperation::Update,
                    NotificationInfo::ObjectGroup(PbObjectGroup { objects }),
                )
                .await;
        }
        inner
            .creating_table_finish_notifier
            .values_mut()
            .for_each(|creating_tables| {
                if let Some(txs) = creating_tables.remove(&job_id) {
                    for tx in txs {
                        let _ = tx.send(Ok(version));
                    }
                }
            });

        Ok(())
    }

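    /// Finish replacing a streaming job with the fragments built under the temporary job
    /// `tmp_id`, then notify the frontend of the updated catalogs and of any objects dropped
    /// along the way (e.g. when dropping a table's associated source connector).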
    pub async fn finish_replace_streaming_job(
        &self,
        tmp_id: ObjectId,
        streaming_job: StreamingJob,
        replace_upstream: FragmentReplaceUpstream,
        sink_into_table_context: SinkIntoTableContext,
        drop_table_connector_ctx: Option<&DropTableConnectorContext>,
    ) -> MetaResult<NotificationVersion> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        let (objects, fragment_mapping, delete_notification_objs) =
            Self::finish_replace_streaming_job_inner(
                tmp_id,
                replace_upstream,
                sink_into_table_context,
                &txn,
                streaming_job,
                drop_table_connector_ctx,
            )
            .await?;

        txn.commit().await?;

        self.notify_fragment_mapping(NotificationOperation::Add, fragment_mapping)
            .await;
        let mut version = self
            .notify_frontend(
                NotificationOperation::Update,
                NotificationInfo::ObjectGroup(PbObjectGroup { objects }),
            )
            .await;

        if let Some((user_infos, to_drop_objects)) = delete_notification_objs {
            self.notify_users_update(user_infos).await;
            version = self
                .notify_frontend(
                    NotificationOperation::Delete,
                    build_object_group_for_delete(to_drop_objects),
                )
                .await;
        }

        Ok(version)
    }

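    /// Transaction body of [`Self::finish_replace_streaming_job`]: swap the fragments built
    /// under the temporary job into the original job id, rebind state tables, rewrite the
    /// `Merge` executors of downstream fragments to the new upstream fragment ids, rewrite
    /// index expressions if the column set changed, and drop the temporary object.
    ///
    /// Returns the updated catalog objects to notify, the new fragment worker-slot mappings,
    /// and optionally the user infos and objects to notify as deleted.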
    pub async fn finish_replace_streaming_job_inner(
        tmp_id: ObjectId,
        replace_upstream: FragmentReplaceUpstream,
        SinkIntoTableContext {
            creating_sink_id,
            dropping_sink_id,
            updated_sink_catalogs,
        }: SinkIntoTableContext,
        txn: &DatabaseTransaction,
        streaming_job: StreamingJob,
        drop_table_connector_ctx: Option<&DropTableConnectorContext>,
    ) -> MetaResult<(
        Vec<PbObject>,
        Vec<PbFragmentWorkerSlotMapping>,
        Option<(Vec<PbUserInfo>, Vec<PartialObject>)>,
    )> {
        let original_job_id = streaming_job.id() as ObjectId;
        let job_type = streaming_job.job_type();

        let mut index_item_rewriter = None;

        // Update the job catalog.
        match streaming_job {
            StreamingJob::Table(_source, table, _table_job_type) => {
                // Expressions of indexes on this table refer to columns of the original
                // schema, so they must be rewritten against the new columns.
                let original_column_catalogs = Table::find_by_id(original_job_id)
                    .select_only()
                    .columns([table::Column::Columns])
                    .into_tuple::<ColumnCatalogArray>()
                    .one(txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("table", original_job_id))?;

                index_item_rewriter = Some({
                    let original_columns = original_column_catalogs
                        .to_protobuf()
                        .into_iter()
                        .map(|c| c.column_desc.unwrap())
                        .collect_vec();
                    let new_columns = table
                        .columns
                        .iter()
                        .map(|c| c.column_desc.clone().unwrap())
                        .collect_vec();

                    IndexItemRewriter {
                        original_columns,
                        new_columns,
                    }
                });

                // Remember the original column catalogs for sinks created against the old
                // table schema.
                for sink_id in updated_sink_catalogs {
                    sink::ActiveModel {
                        sink_id: Set(sink_id as _),
                        original_target_columns: Set(Some(original_column_catalogs.clone())),
                        ..Default::default()
                    }
                    .update(txn)
                    .await?;
                }
                // Update the table catalog with the new one, adjusting its incoming sinks.
                let mut table = table::ActiveModel::from(table);
                let mut incoming_sinks = table.incoming_sinks.as_ref().inner_ref().clone();
                if let Some(sink_id) = creating_sink_id {
                    debug_assert!(!incoming_sinks.contains(&sink_id));
                    incoming_sinks.push(sink_id as _);
                }
                if let Some(drop_table_connector_ctx) = drop_table_connector_ctx
                    && drop_table_connector_ctx.to_change_streaming_job_id == original_job_id
                {
                    // Dropping the table's connector: detach the associated source.
                    table.optional_associated_source_id = Set(None);
                }

                if let Some(sink_id) = dropping_sink_id {
                    let drained = incoming_sinks
                        .extract_if(.., |id| *id == sink_id)
                        .collect_vec();
                    debug_assert_eq!(drained, vec![sink_id]);
                }

                table.incoming_sinks = Set(incoming_sinks.into());
                table.update(txn).await?;
            }
            StreamingJob::Source(source) => {
                // Update the source catalog with the new one.
                let source = source::ActiveModel::from(source);
                source.update(txn).await?;
            }
            StreamingJob::MaterializedView(table) => {
                // Update the table catalog with the new one.
                let table = table::ActiveModel::from(table);
                table.update(txn).await?;
            }
            _ => unreachable!(
                "invalid streaming job type: {:?}",
                streaming_job.job_type_str()
            ),
        }

        // Rebind the state tables of the fragments built under `tmp_id`.
        let fragment_info: Vec<(FragmentId, I32Array)> = Fragment::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::StateTableIds,
            ])
            .filter(fragment::Column::JobId.eq(tmp_id))
            .into_tuple()
            .all(txn)
            .await?;
        for (fragment_id, state_table_ids) in fragment_info {
            for state_table_id in state_table_ids.into_inner() {
                table::ActiveModel {
                    table_id: Set(state_table_id as _),
                    fragment_id: Set(Some(fragment_id)),
                    // No need to update `vnode_count`, which must remain the same.
                    ..Default::default()
                }
                .update(txn)
                .await?;
            }
        }

        // Drop the original job's fragments and move the new ones under its id.
        Fragment::delete_many()
            .filter(fragment::Column::JobId.eq(original_job_id))
            .exec(txn)
            .await?;
        Fragment::update_many()
            .col_expr(fragment::Column::JobId, SimpleExpr::from(original_job_id))
            .filter(fragment::Column::JobId.eq(tmp_id))
            .exec(txn)
            .await?;

        // Rewrite the `Merge` executors of downstream fragments to point to the new
        // upstream fragments.
        for (fragment_id, fragment_replace_map) in replace_upstream {
            let (fragment_id, mut stream_node) = Fragment::find_by_id(fragment_id as FragmentId)
                .select_only()
                .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
                .into_tuple::<(FragmentId, StreamNode)>()
                .one(txn)
                .await?
                .map(|(id, node)| (id, node.to_protobuf()))
                .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;

            visit_stream_node_mut(&mut stream_node, |body| {
                if let PbNodeBody::Merge(m) = body
                    && let Some(new_fragment_id) = fragment_replace_map.get(&m.upstream_fragment_id)
                {
                    m.upstream_fragment_id = *new_fragment_id;
                }
            });
            fragment::ActiveModel {
                fragment_id: Set(fragment_id),
                stream_node: Set(StreamNode::from(&stream_node)),
                ..Default::default()
            }
            .update(txn)
            .await?;
        }

        // Drop the temporary job object.
        Object::delete_by_id(tmp_id).exec(txn).await?;

        // Collect the updated objects to notify the frontend about.
        let mut objects = vec![];
        match job_type {
            StreamingJobType::Table(_) | StreamingJobType::MaterializedView => {
                let (table, table_obj) = Table::find_by_id(original_job_id)
                    .find_also_related(Object)
                    .one(txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("object", original_job_id))?;
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Table(
                        ObjectModel(table, table_obj.unwrap()).into(),
                    )),
                })
            }
            StreamingJobType::Source => {
                let (source, source_obj) = Source::find_by_id(original_job_id)
                    .find_also_related(Object)
                    .one(txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("object", original_job_id))?;
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Source(
                        ObjectModel(source, source_obj.unwrap()).into(),
                    )),
                })
            }
            _ => unreachable!("invalid streaming job type for replace: {:?}", job_type),
        }

        if let Some(expr_rewriter) = index_item_rewriter {
            let index_items: Vec<(IndexId, ExprNodeArray)> = Index::find()
                .select_only()
                .columns([index::Column::IndexId, index::Column::IndexItems])
                .filter(index::Column::PrimaryTableId.eq(original_job_id))
                .into_tuple()
                .all(txn)
                .await?;
            for (index_id, nodes) in index_items {
                let mut pb_nodes = nodes.to_protobuf();
                pb_nodes
                    .iter_mut()
                    .for_each(|x| expr_rewriter.rewrite_expr(x));
                let index = index::ActiveModel {
                    index_id: Set(index_id),
                    index_items: Set(pb_nodes.into()),
                    ..Default::default()
                }
                .update(txn)
                .await?;
                let index_obj = index
                    .find_related(Object)
                    .one(txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("object", index.index_id))?;
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Index(ObjectModel(index, index_obj).into())),
                });
            }
        }

        let fragment_mapping: Vec<_> = get_fragment_mappings(txn, original_job_id as _).await?;

        let mut notification_objs: Option<(Vec<PbUserInfo>, Vec<PartialObject>)> = None;
        if let Some(drop_table_connector_ctx) = drop_table_connector_ctx {
            notification_objs =
                Some(Self::drop_table_associated_source(txn, drop_table_connector_ctx).await?);
        }

        Ok((objects, fragment_mapping, notification_objs))
    }

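    /// Abort an in-progress replace by deleting its temporary job object.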
    pub async fn try_abort_replacing_streaming_job(&self, tmp_job_id: ObjectId) -> MetaResult<()> {
        let inner = self.inner.write().await;
        Object::delete_by_id(tmp_job_id).exec(&inner.db).await?;
        Ok(())
    }

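    /// Edit the `rate_limit` of the `Source` (and, for fs sources, `StreamFsFetch`) nodes in
    /// all fragments of the jobs using `source_id`, returning the affected fragment to
    /// actor-ids mapping so the new limit can be applied to running actors.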
    pub async fn update_source_rate_limit_by_source_id(
        &self,
        source_id: SourceId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        {
            let active_source = source::ActiveModel {
                source_id: Set(source_id),
                rate_limit: Set(rate_limit.map(|v| v as i32)),
                ..Default::default()
            };
            active_source.update(&txn).await?;
        }

        let (source, obj) = Source::find_by_id(source_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| {
                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
            })?;

        let is_fs_source = source.with_properties.inner_ref().is_new_fs_connector();
        let streaming_job_ids: Vec<ObjectId> =
            if let Some(table_id) = source.optional_associated_table_id {
                vec![table_id]
            } else if let Some(source_info) = &source.source_info
                && source_info.to_protobuf().is_shared()
            {
                vec![source_id]
            } else {
                ObjectDependency::find()
                    .select_only()
                    .column(object_dependency::Column::UsedBy)
                    .filter(object_dependency::Column::Oid.eq(source_id))
                    .into_tuple()
                    .all(&txn)
                    .await?
            };

        if streaming_job_ids.is_empty() {
            return Err(MetaError::invalid_parameter(format!(
                "source id {source_id} not used by any streaming job"
            )));
        }

        let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::FragmentTypeMask,
                fragment::Column::StreamNode,
            ])
            .filter(fragment::Column::JobId.is_in(streaming_job_ids))
            .into_tuple()
            .all(&txn)
            .await?;
        let mut fragments = fragments
            .into_iter()
            .map(|(id, mask, stream_node)| {
                (
                    id,
                    FragmentTypeMask::from(mask as u32),
                    stream_node.to_protobuf(),
                )
            })
            .collect_vec();

        fragments.retain_mut(|(_, fragment_type_mask, stream_node)| {
            let mut found = false;
            if fragment_type_mask.contains(FragmentTypeFlag::Source) {
                visit_stream_node_mut(stream_node, |node| {
                    if let PbNodeBody::Source(node) = node
                        && let Some(node_inner) = &mut node.source_inner
                        && node_inner.source_id == source_id as u32
                    {
                        node_inner.rate_limit = rate_limit;
                        found = true;
                    }
                });
            }
            if is_fs_source {
                // An fs source may also have a `StreamFsFetch` node. Fragments created by
                // older versions may lack the `FsFetch` flag, so visit the stream node
                // unconditionally and backfill the flag when the node is found.
                visit_stream_node_mut(stream_node, |node| {
                    if let PbNodeBody::StreamFsFetch(node) = node {
                        fragment_type_mask.add(FragmentTypeFlag::FsFetch);
                        if let Some(node_inner) = &mut node.node_inner
                            && node_inner.source_id == source_id as u32
                        {
                            node_inner.rate_limit = rate_limit;
                            found = true;
                        }
                    }
                });
            }
            found
        });

        assert!(
            !fragments.is_empty(),
            "source id should be used by at least one fragment"
        );
        let fragment_ids = fragments.iter().map(|(id, _, _)| *id).collect_vec();
        for (id, fragment_type_mask, stream_node) in fragments {
            fragment::ActiveModel {
                fragment_id: Set(id),
                fragment_type_mask: Set(fragment_type_mask.into()),
                stream_node: Set(StreamNode::from(&stream_node)),
                ..Default::default()
            }
            .update(&txn)
            .await?;
        }
        let fragment_actors = get_fragment_actor_ids(&txn, fragment_ids).await?;

        txn.commit().await?;

        let relation_info = PbObjectInfo::Source(ObjectModel(source, obj.unwrap()).into());
        let _version = self
            .notify_frontend(
                NotificationOperation::Update,
                NotificationInfo::ObjectGroup(PbObjectGroup {
                    objects: vec![PbObject {
                        object_info: Some(relation_info),
                    }],
                }),
            )
            .await;

        Ok(fragment_actors)
    }

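    /// Apply `fragments_mutation_fn` to the stream node of every fragment of the given job,
    /// persist the fragments for which it returns `true`, and return the affected
    /// fragment-to-actors mapping. Fails with `err_msg` if no fragment matched.
    ///
    /// A minimal sketch of a caller (hypothetical closure and value, mirroring the
    /// rate-limit helpers below):
    ///
    /// ```ignore
    /// let mutation = |_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
    ///     let mut found = false;
    ///     visit_stream_node_mut(stream_node, |node| {
    ///         if let PbNodeBody::Dml(node) = node {
    ///             node.rate_limit = Some(1000); // hypothetical new value
    ///             found = true;
    ///         }
    ///     });
    ///     found
    /// };
    /// self.mutate_fragments_by_job_id(job_id, mutation, "dml node not found").await?;
    /// ```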
    pub async fn mutate_fragments_by_job_id(
        &self,
        job_id: ObjectId,
        // returns true if the mutation is applied
        mut fragments_mutation_fn: impl FnMut(FragmentTypeMask, &mut PbStreamNode) -> bool,
        // error message when no relevant fragments is found
        err_msg: &'static str,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::FragmentTypeMask,
                fragment::Column::StreamNode,
            ])
            .filter(fragment::Column::JobId.eq(job_id))
            .into_tuple()
            .all(&txn)
            .await?;
        let mut fragments = fragments
            .into_iter()
            .map(|(id, mask, stream_node)| {
                (id, FragmentTypeMask::from(mask), stream_node.to_protobuf())
            })
            .collect_vec();

        fragments.retain_mut(|(_, fragment_type_mask, stream_node)| {
            fragments_mutation_fn(*fragment_type_mask, stream_node)
        });
        if fragments.is_empty() {
            return Err(MetaError::invalid_parameter(format!(
                "job id {job_id}: {}",
                err_msg
            )));
        }

        let fragment_ids = fragments.iter().map(|(id, _, _)| *id).collect_vec();
        for (id, _, stream_node) in fragments {
            fragment::ActiveModel {
                fragment_id: Set(id),
                stream_node: Set(StreamNode::from(&stream_node)),
                ..Default::default()
            }
            .update(&txn)
            .await?;
        }
        let fragment_actors = get_fragment_actor_ids(&txn, fragment_ids).await?;

        txn.commit().await?;

        Ok(fragment_actors)
    }

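    /// Single-fragment variant of [`Self::mutate_fragments_by_job_id`]: apply
    /// `fragment_mutation_fn` to one fragment's stream node and persist it, returning the
    /// affected fragment-to-actors mapping.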
    async fn mutate_fragment_by_fragment_id(
        &self,
        fragment_id: FragmentId,
        mut fragment_mutation_fn: impl FnMut(FragmentTypeMask, &mut PbStreamNode) -> bool,
        err_msg: &'static str,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let (fragment_type_mask, stream_node): (i32, StreamNode) =
            Fragment::find_by_id(fragment_id)
                .select_only()
                .columns([
                    fragment::Column::FragmentTypeMask,
                    fragment::Column::StreamNode,
                ])
                .into_tuple()
                .one(&txn)
                .await?
                .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
        let mut pb_stream_node = stream_node.to_protobuf();
        let fragment_type_mask = FragmentTypeMask::from(fragment_type_mask);

        if !fragment_mutation_fn(fragment_type_mask, &mut pb_stream_node) {
            return Err(MetaError::invalid_parameter(format!(
                "fragment id {fragment_id}: {}",
                err_msg
            )));
        }

        // Persist the mutated stream node, not the one originally fetched.
        fragment::ActiveModel {
            fragment_id: Set(fragment_id),
            stream_node: Set(StreamNode::from(&pb_stream_node)),
            ..Default::default()
        }
        .update(&txn)
        .await?;

        let fragment_actors = get_fragment_actor_ids(&txn, vec![fragment_id]).await?;

        txn.commit().await?;

        Ok(fragment_actors)
    }

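    /// Edit the `rate_limit` of every backfill-capable node (`StreamCdcScan`, `StreamScan`,
    /// `SourceBackfill`, `Sink`) in the given job's fragments, returning the affected
    /// fragment-to-actors mapping.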
    pub async fn update_backfill_rate_limit_by_job_id(
        &self,
        job_id: ObjectId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let update_backfill_rate_limit =
            |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
                let mut found = false;
                if fragment_type_mask
                    .contains_any(FragmentTypeFlag::backfill_rate_limit_fragments())
                {
                    visit_stream_node_mut(stream_node, |node| match node {
                        PbNodeBody::StreamCdcScan(node) => {
                            node.rate_limit = rate_limit;
                            found = true;
                        }
                        PbNodeBody::StreamScan(node) => {
                            node.rate_limit = rate_limit;
                            found = true;
                        }
                        PbNodeBody::SourceBackfill(node) => {
                            node.rate_limit = rate_limit;
                            found = true;
                        }
                        PbNodeBody::Sink(node) => {
                            node.rate_limit = rate_limit;
                            found = true;
                        }
                        _ => {}
                    });
                }
                found
            };

        self.mutate_fragments_by_job_id(
            job_id,
            update_backfill_rate_limit,
            "stream scan node or source node not found",
        )
        .await
    }

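    /// Edit the `rate_limit` of the `Sink` nodes in the given job's fragments, returning
    /// the affected fragment-to-actors mapping.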
    pub async fn update_sink_rate_limit_by_job_id(
        &self,
        job_id: ObjectId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let update_sink_rate_limit =
            |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
                let mut found = false;
                if fragment_type_mask.contains_any(FragmentTypeFlag::sink_rate_limit_fragments()) {
                    visit_stream_node_mut(stream_node, |node| {
                        if let PbNodeBody::Sink(node) = node {
                            node.rate_limit = rate_limit;
                            found = true;
                        }
                    });
                }
                found
            };

        self.mutate_fragments_by_job_id(job_id, update_sink_rate_limit, "sink node not found")
            .await
    }

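    /// Edit the `rate_limit` of the `Dml` nodes in the given job's fragments, returning the
    /// affected fragment-to-actors mapping.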
    pub async fn update_dml_rate_limit_by_job_id(
        &self,
        job_id: ObjectId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let update_dml_rate_limit =
            |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
                let mut found = false;
                if fragment_type_mask.contains_any(FragmentTypeFlag::dml_rate_limit_fragments()) {
                    visit_stream_node_mut(stream_node, |node| {
                        if let PbNodeBody::Dml(node) = node {
                            node.rate_limit = rate_limit;
                            found = true;
                        }
                    });
                }
                found
            };

        self.mutate_fragments_by_job_id(job_id, update_dml_rate_limit, "dml node not found")
            .await
    }

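    /// Alter the connector properties (and secret references) of a source on the fly:
    /// validate that the changed fields allow in-place alteration, rewrite the stored
    /// `CREATE SOURCE` / `CREATE TABLE` definition, update secret dependencies, patch the
    /// `Source` nodes of the affected fragments, and notify the frontend. Returns the fully
    /// resolved options now applied.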
    pub async fn update_source_props_by_source_id(
        &self,
        source_id: SourceId,
        alter_props: BTreeMap<String, String>,
        alter_secret_refs: BTreeMap<String, PbSecretRef>,
    ) -> MetaResult<WithOptionsSecResolved> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let (source, _obj) = Source::find_by_id(source_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| {
                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
            })?;
        let connector = source.with_properties.0.get_connector().unwrap();

        // Ensure all changed fields are alterable on the fly for this connector.
        let prop_keys: Vec<String> = alter_props
            .keys()
            .chain(alter_secret_refs.keys())
            .cloned()
            .collect();
        risingwave_connector::allow_alter_on_fly_fields::check_source_allow_alter_on_fly_fields(
            &connector, &prop_keys,
        )?;

        let mut options_with_secret = WithOptionsSecResolved::new(
            source.with_properties.0.clone(),
            source
                .secret_ref
                .map(|secret_ref| secret_ref.to_protobuf())
                .unwrap_or_default(),
        );
        let (to_add_secret_dep, to_remove_secret_dep) =
            options_with_secret.handle_update(alter_props, alter_secret_refs)?;

        tracing::info!(
            "applying new properties to source: source_id={}, options_with_secret={:?}",
            source_id,
            options_with_secret
        );
        // Validate the merged options by extracting them into connector properties.
        let _ = ConnectorProperties::extract(options_with_secret.clone(), true)?;

        // For a source with an associated table (`CREATE TABLE ... WITH (connector = ...)`),
        // the table's definition is rewritten as well, and secret dependencies are recorded
        // on the table object rather than the source object.
        let mut associate_table_id = None;

        let mut preferred_id: i32 = source_id;
        let rewrite_sql = {
            let definition = source.definition.clone();

            let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
                .map_err(|e| {
                    MetaError::from(MetaErrorInner::Connector(ConnectorError::from(
                        anyhow!(e).context("Failed to parse source definition SQL"),
                    )))
                })?
                .try_into()
                .unwrap();

            // Rebuild the `WITH` options from the resolved options: plaintext values are
            // quoted back as literals, secret-backed values as `SECRET <name>` clauses.
            async fn format_with_option_secret_resolved(
                txn: &DatabaseTransaction,
                options_with_secret: &WithOptionsSecResolved,
            ) -> MetaResult<Vec<SqlOption>> {
                let mut options = Vec::new();
                for (k, v) in options_with_secret.as_plaintext() {
                    let sql_option = SqlOption::try_from((k, &format!("'{}'", v)))
                        .map_err(|e| MetaError::invalid_parameter(e.to_report_string()))?;
                    options.push(sql_option);
                }
                for (k, v) in options_with_secret.as_secret() {
                    if let Some(secret_model) =
                        Secret::find_by_id(v.secret_id as i32).one(txn).await?
                    {
                        let sql_option =
                            SqlOption::try_from((k, &format!("SECRET {}", secret_model.name)))
                                .map_err(|e| MetaError::invalid_parameter(e.to_report_string()))?;
                        options.push(sql_option);
                    } else {
                        return Err(MetaError::catalog_id_not_found("secret", v.secret_id));
                    }
                }
                Ok(options)
            }

            match &mut stmt {
                Statement::CreateSource { stmt } => {
                    stmt.with_properties.0 =
                        format_with_option_secret_resolved(&txn, &options_with_secret).await?;
                }
                Statement::CreateTable { with_options, .. } => {
                    *with_options =
                        format_with_option_secret_resolved(&txn, &options_with_secret).await?;
                    associate_table_id = source.optional_associated_table_id;
                    preferred_id = associate_table_id.unwrap();
                }
                _ => unreachable!(),
            }

            stmt.to_string()
        };

        // Update secret object dependencies.
        {
            if !to_add_secret_dep.is_empty() {
                ObjectDependency::insert_many(to_add_secret_dep.into_iter().map(|secret_id| {
                    object_dependency::ActiveModel {
                        oid: Set(secret_id as _),
                        used_by: Set(preferred_id as _),
                        ..Default::default()
                    }
                }))
                .exec(&txn)
                .await?;
            }
            if !to_remove_secret_dep.is_empty() {
                let _ = ObjectDependency::delete_many()
                    .filter(
                        object_dependency::Column::Oid
                            .is_in(to_remove_secret_dep)
                            .and(
                                object_dependency::Column::UsedBy.eq::<ObjectId>(preferred_id as _),
                            ),
                    )
                    .exec(&txn)
                    .await?;
            }
        }

        let active_source_model = source::ActiveModel {
            source_id: Set(source_id),
            definition: Set(rewrite_sql.clone()),
            with_properties: Set(options_with_secret.as_plaintext().clone().into()),
            secret_ref: Set((!options_with_secret.as_secret().is_empty())
                .then(|| SecretRef::from(options_with_secret.as_secret().clone()))),
            ..Default::default()
        };
        active_source_model.update(&txn).await?;

        if let Some(associate_table_id) = associate_table_id {
            // Keep the associated table's definition in sync with the source.
            let active_table_model = table::ActiveModel {
                table_id: Set(associate_table_id),
                definition: Set(rewrite_sql),
                ..Default::default()
            };
            active_table_model.update(&txn).await?;
        }

        // Patch the `Source` nodes in the fragments of the owning streaming job.
        update_connector_props_fragments(
            &txn,
            if let Some(associate_table_id) = associate_table_id {
                // For a table with connector, the fragments belong to the table job.
                associate_table_id
            } else {
                source_id
            },
            FragmentTypeFlag::Source,
            |node, found| {
                if let PbNodeBody::Source(node) = node
                    && let Some(source_inner) = &mut node.source_inner
                {
                    source_inner.with_properties = options_with_secret.as_plaintext().clone();
                    source_inner.secret_refs = options_with_secret.as_secret().clone();
                    *found = true;
                }
            },
        )
        .await?;

        let mut to_update_objs = Vec::with_capacity(2);
        let (source, obj) = Source::find_by_id(source_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| {
                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
            })?;
        to_update_objs.push(PbObject {
            object_info: Some(PbObjectInfo::Source(
                ObjectModel(source, obj.unwrap()).into(),
            )),
        });

        if let Some(associate_table_id) = associate_table_id {
            let (table, obj) = Table::find_by_id(associate_table_id)
                .find_also_related(Object)
                .one(&txn)
                .await?
                .ok_or_else(|| MetaError::catalog_id_not_found("table", associate_table_id))?;
            to_update_objs.push(PbObject {
                object_info: Some(PbObjectInfo::Table(ObjectModel(table, obj.unwrap()).into())),
            });
        }

        txn.commit().await?;

        self.notify_frontend(
            NotificationOperation::Update,
            NotificationInfo::ObjectGroup(PbObjectGroup {
                objects: to_update_objs,
            }),
        )
        .await;

        Ok(options_with_secret)
    }

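    /// Alter the properties of a sink on the fly: validate that the changed fields are
    /// alterable for the sink's connector, rewrite the stored `CREATE SINK` definition,
    /// patch the `SinkDesc` in the sink's fragments, and notify the frontend. Returns the
    /// merged property map now in effect.
    ///
    /// A minimal usage sketch (hypothetical property key and value):
    ///
    /// ```ignore
    /// let mut props = BTreeMap::new();
    /// props.insert("max_batch_rows".to_owned(), "1024".to_owned()); // hypothetical key
    /// let applied = controller.update_sink_props_by_sink_id(sink_id, props).await?;
    /// ```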
1930 pub async fn update_sink_props_by_sink_id(
1931 &self,
1932 sink_id: SinkId,
1933 props: BTreeMap<String, String>,
1934 ) -> MetaResult<HashMap<String, String>> {
1935 let inner = self.inner.read().await;
1936 let txn = inner.db.begin().await?;
1937
1938 let (sink, _obj) = Sink::find_by_id(sink_id)
1939 .find_also_related(Object)
1940 .one(&txn)
1941 .await?
1942 .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
1943
1944 match sink.properties.inner_ref().get(CONNECTOR_TYPE_KEY) {
1946 Some(connector) => {
1947 let connector_type = connector.to_lowercase();
1948 let field_names: Vec<String> = props.keys().cloned().collect();
1949 check_sink_allow_alter_on_fly_fields(&connector_type, &field_names)
1950 .map_err(|e| SinkError::Config(anyhow!(e)))?;
1951
1952 match_sink_name_str!(
1953 connector_type.as_str(),
1954 SinkType,
1955 {
1956 let mut new_props = sink.properties.0.clone();
1957 new_props.extend(props.clone());
1958 SinkType::validate_alter_config(&new_props)
1959 },
1960 |sink: &str| Err(SinkError::Config(anyhow!("unsupported sink type {}", sink)))
1961 )?
1962 }
1963 None => {
1964 return Err(
1965 SinkError::Config(anyhow!("connector not specified when alter sink")).into(),
1966 );
1967 }
1968 };
        let definition = sink.definition.clone();
        let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
            .map_err(|e| SinkError::Config(anyhow!(e)))?
            .try_into()
            .unwrap();
        if let Statement::CreateSink { stmt } = &mut stmt {
            let mut new_sql_options = stmt
                .with_properties
                .0
                .iter()
                .map(|sql_option| (&sql_option.name, sql_option))
                .collect::<IndexMap<_, _>>();
            let add_sql_options = props
                .iter()
                .map(|(k, v)| SqlOption::try_from((k, v)))
                .collect::<Result<Vec<SqlOption>, ParserError>>()
                .map_err(|e| SinkError::Config(anyhow!(e)))?;
            new_sql_options.extend(
                add_sql_options
                    .iter()
                    .map(|sql_option| (&sql_option.name, sql_option)),
            );
            stmt.with_properties.0 = new_sql_options.into_values().cloned().collect();
        } else {
            panic!("sink definition is not a create sink statement")
        }
        let mut new_config = sink.properties.clone().into_inner();
        new_config.extend(props);

        let active_sink = sink::ActiveModel {
            sink_id: Set(sink_id),
            properties: Set(risingwave_meta_model::Property(new_config.clone())),
            definition: Set(stmt.to_string()),
            ..Default::default()
        };
        active_sink.update(&txn).await?;

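        // Propagate the new properties into the `SinkDesc` embedded in the stream node of every
        // sink fragment of this job, so the persisted fragment graph reflects the new config.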
        let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::FragmentTypeMask,
                fragment::Column::StreamNode,
            ])
            .filter(fragment::Column::JobId.eq(sink_id))
            .into_tuple()
            .all(&txn)
            .await?;
        let fragments = fragments
            .into_iter()
            .filter(|(_, fragment_type_mask, _)| {
                FragmentTypeMask::from(*fragment_type_mask).contains(FragmentTypeFlag::Sink)
            })
            .filter_map(|(id, _, stream_node)| {
                let mut stream_node = stream_node.to_protobuf();
                let mut found = false;
                visit_stream_node_mut(&mut stream_node, |node| {
                    if let PbNodeBody::Sink(node) = node
                        && let Some(sink_desc) = &mut node.sink_desc
                        && sink_desc.id == sink_id as u32
                    {
                        sink_desc.properties = new_config.clone();
                        found = true;
                    }
                });
                if found { Some((id, stream_node)) } else { None }
            })
            .collect_vec();
        assert!(
            !fragments.is_empty(),
            "sink id should be used by at least one fragment"
        );
        for (id, stream_node) in fragments {
            fragment::ActiveModel {
                fragment_id: Set(id),
                stream_node: Set(StreamNode::from(&stream_node)),
                ..Default::default()
            }
            .update(&txn)
            .await?;
        }

        let (sink, obj) = Sink::find_by_id(sink_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;

        txn.commit().await?;

        let relation_info = PbObjectInfo::Sink(ObjectModel(sink, obj.unwrap()).into());
        let _version = self
            .notify_frontend(
                NotificationOperation::Update,
                NotificationInfo::ObjectGroup(PbObjectGroup {
                    objects: vec![PbObject {
                        object_info: Some(relation_info),
                    }],
                }),
            )
            .await;

        Ok(new_config.into_iter().collect())
    }

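    /// Sets the rate limit on every rate-limitable node (DML, sink, stream scan, CDC scan, and
    /// source backfill) inside the given fragment's stream node, returning the affected actors
    /// keyed by fragment id.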
    pub async fn update_fragment_rate_limit_by_fragment_id(
        &self,
        fragment_id: FragmentId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let update_rate_limit = |fragment_type_mask: FragmentTypeMask,
                                 stream_node: &mut PbStreamNode| {
            let mut found = false;
            if fragment_type_mask.contains_any(
                FragmentTypeFlag::dml_rate_limit_fragments()
                    .chain(FragmentTypeFlag::sink_rate_limit_fragments())
                    .chain(FragmentTypeFlag::backfill_rate_limit_fragments())
                    .chain(FragmentTypeFlag::source_rate_limit_fragments()),
            ) {
                visit_stream_node_mut(stream_node, |node| {
                    if let PbNodeBody::Dml(node) = node {
                        node.rate_limit = rate_limit;
                        found = true;
                    }
                    if let PbNodeBody::Sink(node) = node {
                        node.rate_limit = rate_limit;
                        found = true;
                    }
                    if let PbNodeBody::StreamCdcScan(node) = node {
                        node.rate_limit = rate_limit;
                        found = true;
                    }
                    if let PbNodeBody::StreamScan(node) = node {
                        node.rate_limit = rate_limit;
                        found = true;
                    }
                    if let PbNodeBody::SourceBackfill(node) = node {
                        node.rate_limit = rate_limit;
                        found = true;
                    }
                });
            }
            found
        };
        self.mutate_fragment_by_fragment_id(fragment_id, update_rate_limit, "fragment not found")
            .await
    }

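    /// Persists the outcome of a reschedule: deletes removed actors, inserts newly created ones,
    /// applies vnode bitmap and split updates, refreshes fragment worker-slot mappings, and
    /// records the updated parallelism / resource group of each affected job.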
    pub async fn post_apply_reschedules(
        &self,
        reschedules: HashMap<FragmentId, Reschedule>,
        post_updates: &JobReschedulePostUpdates,
    ) -> MetaResult<()> {
        let new_created_actors: HashSet<_> = reschedules
            .values()
            .flat_map(|reschedule| {
                reschedule
                    .added_actors
                    .values()
                    .flatten()
                    .map(|actor_id| *actor_id as ActorId)
            })
            .collect();

        let inner = self.inner.write().await;

        let txn = inner.db.begin().await?;

        let mut fragment_mapping_to_notify = vec![];

        for (
            fragment_id,
            Reschedule {
                removed_actors,
                vnode_bitmap_updates,
                actor_splits,
                newly_created_actors,
                ..
            },
        ) in reschedules
        {
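            // Delete the actors removed by this reschedule.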
            Actor::delete_many()
                .filter(
                    actor::Column::ActorId
                        .is_in(removed_actors.iter().map(|id| *id as ActorId).collect_vec()),
                )
                .exec(&txn)
                .await?;

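            // Insert the newly created actors, together with their initial splits and vnode
            // bitmaps.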
            for (
                (
                    StreamActor {
                        actor_id,
                        fragment_id,
                        vnode_bitmap,
                        expr_context,
                        ..
                    },
                    _,
                ),
                worker_id,
            ) in newly_created_actors.into_values()
            {
                let splits = actor_splits
                    .get(&actor_id)
                    .map(|splits| splits.iter().map(PbConnectorSplit::from).collect_vec());

                Actor::insert(actor::ActiveModel {
                    actor_id: Set(actor_id as _),
                    fragment_id: Set(fragment_id as _),
                    status: Set(ActorStatus::Running),
                    splits: Set(splits.map(|splits| (&PbConnectorSplits { splits }).into())),
                    worker_id: Set(worker_id),
                    upstream_actor_ids: Set(Default::default()),
                    vnode_bitmap: Set(vnode_bitmap
                        .as_ref()
                        .map(|bitmap| (&bitmap.to_protobuf()).into())),
                    expr_context: Set(expr_context.as_ref().unwrap().into()),
                })
                .exec(&txn)
                .await?;
            }

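            // Apply vnode bitmap updates to the surviving actors.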
            for (actor_id, bitmap) in vnode_bitmap_updates {
                let actor = Actor::find_by_id(actor_id as ActorId)
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("actor", actor_id))?;

                let mut actor = actor.into_active_model();
                actor.vnode_bitmap = Set(Some((&bitmap.to_protobuf()).into()));
                actor.update(&txn).await?;
            }

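            // Update the splits of pre-existing actors; splits of newly created actors were
            // already set on insertion above.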
            for (actor_id, splits) in actor_splits {
                if new_created_actors.contains(&(actor_id as ActorId)) {
                    continue;
                }

                let actor = Actor::find_by_id(actor_id as ActorId)
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("actor", actor_id))?;

                let mut actor = actor.into_active_model();
                let splits = splits.iter().map(PbConnectorSplit::from).collect_vec();
                actor.splits = Set(Some((&PbConnectorSplits { splits }).into()));
                actor.update(&txn).await?;
            }

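            // Rebuild the fragment's worker-slot mapping from its (now updated) actors, to be
            // broadcast after the transaction commits.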
            let fragment = Fragment::find_by_id(fragment_id)
                .one(&txn)
                .await?
                .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;

            let job_actors = fragment
                .find_related(Actor)
                .all(&txn)
                .await?
                .into_iter()
                .map(|actor| {
                    (
                        fragment_id,
                        fragment.distribution_type,
                        actor.actor_id,
                        actor.vnode_bitmap,
                        actor.worker_id,
                        actor.status,
                    )
                })
                .collect_vec();

            fragment_mapping_to_notify.extend(rebuild_fragment_mapping_from_actors(job_actors));
        }

        let JobReschedulePostUpdates {
            parallelism_updates,
            resource_group_updates,
        } = post_updates;

        for (table_id, parallelism) in parallelism_updates {
            let mut streaming_job = StreamingJobModel::find_by_id(table_id.table_id() as ObjectId)
                .one(&txn)
                .await?
                .ok_or_else(|| MetaError::catalog_id_not_found("table", table_id))?
                .into_active_model();

            streaming_job.parallelism = Set(match parallelism {
                TableParallelism::Adaptive => StreamingParallelism::Adaptive,
                TableParallelism::Fixed(n) => StreamingParallelism::Fixed(*n as _),
                TableParallelism::Custom => StreamingParallelism::Custom,
            });

            if let Some(resource_group) =
                resource_group_updates.get(&(table_id.table_id() as ObjectId))
            {
                streaming_job.specific_resource_group = Set(resource_group.to_owned());
            }

            streaming_job.update(&txn).await?;
        }

        txn.commit().await?;
        self.notify_fragment_mapping(Operation::Update, fragment_mapping_to_notify)
            .await;

        Ok(())
    }

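    /// Lists the rate limits currently configured on source, FS fetch, source backfill, stream
    /// scan, CDC scan, and sink nodes across all fragments.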
    pub async fn list_rate_limits(&self) -> MetaResult<Vec<RateLimitInfo>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let fragments: Vec<(FragmentId, ObjectId, i32, StreamNode)> = Fragment::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::JobId,
                fragment::Column::FragmentTypeMask,
                fragment::Column::StreamNode,
            ])
            .filter(fragment_type_mask_intersects(FragmentTypeFlag::raw_flag(
                FragmentTypeFlag::rate_limit_fragments(),
            ) as _))
            .into_tuple()
            .all(&txn)
            .await?;

        let mut rate_limits = Vec::new();
        for (fragment_id, job_id, fragment_type_mask, stream_node) in fragments {
            let stream_node = stream_node.to_protobuf();
            visit_stream_node_body(&stream_node, |node| {
                let mut rate_limit = None;
                let mut node_name = None;

                match node {
                    PbNodeBody::Source(node) => {
                        if let Some(node_inner) = &node.source_inner {
                            rate_limit = node_inner.rate_limit;
                            node_name = Some("SOURCE");
                        }
                    }
                    PbNodeBody::StreamFsFetch(node) => {
                        if let Some(node_inner) = &node.node_inner {
                            rate_limit = node_inner.rate_limit;
                            node_name = Some("FS_FETCH");
                        }
                    }
                    PbNodeBody::SourceBackfill(node) => {
                        rate_limit = node.rate_limit;
                        node_name = Some("SOURCE_BACKFILL");
                    }
                    PbNodeBody::StreamScan(node) => {
                        rate_limit = node.rate_limit;
                        node_name = Some("STREAM_SCAN");
                    }
                    PbNodeBody::StreamCdcScan(node) => {
                        rate_limit = node.rate_limit;
                        node_name = Some("STREAM_CDC_SCAN");
                    }
                    PbNodeBody::Sink(node) => {
                        rate_limit = node.rate_limit;
                        node_name = Some("SINK");
                    }
                    _ => {}
                }

                if let Some(rate_limit) = rate_limit {
                    rate_limits.push(RateLimitInfo {
                        fragment_id: fragment_id as u32,
                        job_id: job_id as u32,
                        fragment_type_mask: fragment_type_mask as u32,
                        rate_limit,
                        node_name: node_name.unwrap().to_owned(),
                    });
                }
            });
        }

        Ok(rate_limits)
    }
}

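/// Builds the SQL expression `column & value != 0`, i.e. whether the bitflag column shares any
/// set bit with `value`.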
fn bitflag_intersects(column: SimpleExpr, value: i32) -> SimpleExpr {
    column
        .binary(BinOper::Custom("&"), value)
        .binary(BinOper::NotEqual, 0)
}

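/// Filter for fragments whose `fragment_type_mask` intersects the given flag bits.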
fn fragment_type_mask_intersects(value: i32) -> SimpleExpr {
    bitflag_intersects(fragment::Column::FragmentTypeMask.into_simple_expr(), value)
}

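/// Bookkeeping for sink-into-table DDL: which sink is being created or dropped, and which
/// existing sink catalogs must be refreshed.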
pub struct SinkIntoTableContext {
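    /// For creating a sink into a table, this is `Some`; otherwise `None`.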
    pub creating_sink_id: Option<SinkId>,
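    /// For dropping a sink into a table, this is `Some`; otherwise `None`.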
    pub dropping_sink_id: Option<SinkId>,
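    /// For altering a table (e.g. adding a column), the IDs of the existing sinks into it whose
    /// catalogs need to be updated.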
    pub updated_sink_catalogs: Vec<SinkId>,
}

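/// Rewrites the stream nodes of all fragments of `job_id` that carry `expect_flag`, applying
/// `alter_stream_node_fn` to each node body and persisting the fragments for which it reported
/// a change.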
async fn update_connector_props_fragments<F>(
    txn: &DatabaseTransaction,
    job_id: i32,
    expect_flag: FragmentTypeFlag,
    mut alter_stream_node_fn: F,
) -> MetaResult<()>
where
    F: FnMut(&mut PbNodeBody, &mut bool),
{
    let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
        .select_only()
        .columns([
            fragment::Column::FragmentId,
            fragment::Column::FragmentTypeMask,
            fragment::Column::StreamNode,
        ])
        .filter(fragment::Column::JobId.eq(job_id))
        .into_tuple()
        .all(txn)
        .await?;
    let fragments = fragments
        .into_iter()
        .filter(|(_, fragment_type_mask, _)| *fragment_type_mask & expect_flag as i32 != 0)
        .filter_map(|(id, _, stream_node)| {
            let mut stream_node = stream_node.to_protobuf();
            let mut found = false;
            visit_stream_node_mut(&mut stream_node, |node| {
                alter_stream_node_fn(node, &mut found);
            });
            if found { Some((id, stream_node)) } else { None }
        })
        .collect_vec();
    assert!(
        !fragments.is_empty(),
        "job {} should have at least one fragment with type {:?}",
        job_id,
        expect_flag
    );

    for (id, stream_node) in fragments {
        fragment::ActiveModel {
            fragment_id: Set(id),
            stream_node: Set(StreamNode::from(&stream_node)),
            ..Default::default()
        }
        .update(txn)
        .await?;
    }

    Ok(())
}