use std::collections::{HashMap, HashSet};
use std::num::NonZeroUsize;

use anyhow::anyhow;
use itertools::Itertools;
use risingwave_common::config::DefaultParallelism;
use risingwave_common::hash::VnodeCountCompat;
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_common::util::stream_graph_visitor::{visit_stream_node, visit_stream_node_mut};
use risingwave_common::{bail, current_cluster_version};
use risingwave_connector::WithPropertiesExt;
use risingwave_meta_model::actor::ActorStatus;
use risingwave_meta_model::object::ObjectType;
use risingwave_meta_model::prelude::{StreamingJob as StreamingJobModel, *};
use risingwave_meta_model::table::TableType;
use risingwave_meta_model::*;
use risingwave_pb::catalog::source::PbOptionalAssociatedTableId;
use risingwave_pb::catalog::table::PbOptionalAssociatedSourceId;
use risingwave_pb::catalog::{PbCreateType, PbTable};
use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
use risingwave_pb::meta::object::PbObjectInfo;
use risingwave_pb::meta::subscribe_response::{
    Info as NotificationInfo, Info, Operation as NotificationOperation, Operation,
};
use risingwave_pb::meta::{PbFragmentWorkerSlotMapping, PbObject, PbObjectGroup};
use risingwave_pb::source::{PbConnectorSplit, PbConnectorSplits};
use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use risingwave_pb::stream_plan::{PbFragmentTypeFlag, PbStreamNode};
use risingwave_pb::user::PbUserInfo;
use sea_orm::ActiveValue::Set;
use sea_orm::sea_query::{BinOper, Expr, Query, SimpleExpr};
use sea_orm::{
    ActiveEnum, ActiveModelTrait, ColumnTrait, DatabaseTransaction, EntityTrait, IntoActiveModel,
    IntoSimpleExpr, JoinType, ModelTrait, NotSet, PaginatorTrait, QueryFilter, QuerySelect,
    RelationTrait, TransactionTrait,
};
use thiserror_ext::AsReport;

use crate::barrier::{ReplaceStreamJobPlan, Reschedule};
use crate::controller::ObjectModel;
use crate::controller::catalog::{CatalogController, DropTableConnectorContext};
use crate::controller::rename::ReplaceTableExprRewriter;
use crate::controller::utils::{
    PartialObject, build_object_group_for_delete, check_relation_name_duplicate,
    check_sink_into_table_cycle, ensure_object_id, ensure_user_id, get_fragment_actor_ids,
    get_fragment_mappings, get_internal_tables_by_id, insert_fragment_relations,
    rebuild_fragment_mapping_from_actors,
};
use crate::manager::{NotificationVersion, StreamingJob, StreamingJobType};
use crate::model::{
    FragmentDownstreamRelation, FragmentReplaceUpstream, StreamActor, StreamContext,
    StreamJobFragmentsToCreate, TableParallelism,
};
use crate::stream::{JobReschedulePostUpdates, SplitAssignment};
use crate::{MetaError, MetaResult};

impl CatalogController {
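    /// Create the catalog object and the `streaming_job` record for a new streaming job
    /// in status `Initial`, returning the allocated object id.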
    pub async fn create_streaming_job_obj(
        txn: &DatabaseTransaction,
        obj_type: ObjectType,
        owner_id: UserId,
        database_id: Option<DatabaseId>,
        schema_id: Option<SchemaId>,
        create_type: PbCreateType,
        ctx: &StreamContext,
        streaming_parallelism: StreamingParallelism,
        max_parallelism: usize,
        specific_resource_group: Option<String>,
    ) -> MetaResult<ObjectId> {
        let obj = Self::create_object(txn, obj_type, owner_id, database_id, schema_id).await?;
        let job = streaming_job::ActiveModel {
            job_id: Set(obj.oid),
            job_status: Set(JobStatus::Initial),
            create_type: Set(create_type.into()),
            timezone: Set(ctx.timezone.clone()),
            parallelism: Set(streaming_parallelism),
            max_parallelism: Set(max_parallelism as _),
            specific_resource_group: Set(specific_resource_group),
        };
        job.insert(txn).await?;

        Ok(obj.oid)
    }

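    /// Create the catalog entries for a new streaming job (materialized view, sink,
    /// table, index, or source) along with its object dependencies, all in one
    /// transaction. Fails if the relation name is duplicated or if any dependent
    /// relation is still being altered.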
    pub async fn create_job_catalog(
        &self,
        streaming_job: &mut StreamingJob,
        ctx: &StreamContext,
        parallelism: &Option<Parallelism>,
        max_parallelism: usize,
        mut dependencies: HashSet<ObjectId>,
        specific_resource_group: Option<String>,
    ) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;
        let create_type = streaming_job.create_type();

        let streaming_parallelism = match (parallelism, self.env.opts.default_parallelism) {
            (None, DefaultParallelism::Full) => StreamingParallelism::Adaptive,
            (None, DefaultParallelism::Default(n)) => StreamingParallelism::Fixed(n.get()),
            (Some(n), _) => StreamingParallelism::Fixed(n.parallelism as _),
        };

        ensure_user_id(streaming_job.owner() as _, &txn).await?;
        ensure_object_id(ObjectType::Database, streaming_job.database_id() as _, &txn).await?;
        ensure_object_id(ObjectType::Schema, streaming_job.schema_id() as _, &txn).await?;
        check_relation_name_duplicate(
            &streaming_job.name(),
            streaming_job.database_id() as _,
            streaming_job.schema_id() as _,
            &txn,
        )
        .await?;

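        // Collect the relations that the job depends on.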
        dependencies.extend(
            streaming_job
                .dependent_relations()
                .into_iter()
                .map(|id| id as ObjectId),
        );

        if !dependencies.is_empty() {
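            // Reject the job if any dependent table is being altered, i.e. has a
            // placeholder object (absent from the `table` catalog) whose streaming
            // job is not yet `Created`.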
            let altering_cnt = ObjectDependency::find()
                .join(
                    JoinType::InnerJoin,
                    object_dependency::Relation::Object1.def(),
                )
                .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
                .filter(
                    object_dependency::Column::Oid
                        .is_in(dependencies.clone())
                        .and(object::Column::ObjType.eq(ObjectType::Table))
                        .and(streaming_job::Column::JobStatus.ne(JobStatus::Created))
                        .and(
                            object::Column::Oid.not_in_subquery(
                                Query::select()
                                    .column(table::Column::TableId)
                                    .from(Table)
                                    .to_owned(),
                            ),
                        ),
                )
                .count(&txn)
                .await?;
            if altering_cnt != 0 {
                return Err(MetaError::permission_denied(
                    "some dependent relations are being altered",
                ));
            }
        }

        match streaming_job {
            StreamingJob::MaterializedView(table) => {
                let job_id = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Table,
                    table.owner as _,
                    Some(table.database_id as _),
                    Some(table.schema_id as _),
                    create_type,
                    ctx,
                    streaming_parallelism,
                    max_parallelism,
                    specific_resource_group,
                )
                .await?;
                table.id = job_id as _;
                let table_model: table::ActiveModel = table.clone().into();
                Table::insert(table_model).exec(&txn).await?;
            }
            StreamingJob::Sink(sink, _) => {
                if let Some(target_table_id) = sink.target_table {
                    if check_sink_into_table_cycle(
                        target_table_id as ObjectId,
                        dependencies.iter().cloned().collect(),
                        &txn,
                    )
                    .await?
                    {
                        bail!("Creating such a sink will result in a circular dependency.");
                    }
                }

                let job_id = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Sink,
                    sink.owner as _,
                    Some(sink.database_id as _),
                    Some(sink.schema_id as _),
                    create_type,
                    ctx,
                    streaming_parallelism,
                    max_parallelism,
                    specific_resource_group,
                )
                .await?;
                sink.id = job_id as _;
                let sink_model: sink::ActiveModel = sink.clone().into();
                Sink::insert(sink_model).exec(&txn).await?;
            }
            StreamingJob::Table(src, table, _) => {
                let job_id = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Table,
                    table.owner as _,
                    Some(table.database_id as _),
                    Some(table.schema_id as _),
                    create_type,
                    ctx,
                    streaming_parallelism,
                    max_parallelism,
                    specific_resource_group,
                )
                .await?;
                table.id = job_id as _;
                if let Some(src) = src {
                    let src_obj = Self::create_object(
                        &txn,
                        ObjectType::Source,
                        src.owner as _,
                        Some(src.database_id as _),
                        Some(src.schema_id as _),
                    )
                    .await?;
                    src.id = src_obj.oid as _;
                    src.optional_associated_table_id =
                        Some(PbOptionalAssociatedTableId::AssociatedTableId(job_id as _));
                    table.optional_associated_source_id = Some(
                        PbOptionalAssociatedSourceId::AssociatedSourceId(src_obj.oid as _),
                    );
                    let source: source::ActiveModel = src.clone().into();
                    Source::insert(source).exec(&txn).await?;
                }
                let table_model: table::ActiveModel = table.clone().into();
                Table::insert(table_model).exec(&txn).await?;
            }
            StreamingJob::Index(index, table) => {
                ensure_object_id(ObjectType::Table, index.primary_table_id as _, &txn).await?;
                let job_id = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Index,
                    index.owner as _,
                    Some(index.database_id as _),
                    Some(index.schema_id as _),
                    create_type,
                    ctx,
                    streaming_parallelism,
                    max_parallelism,
                    specific_resource_group,
                )
                .await?;
                index.id = job_id as _;
                index.index_table_id = job_id as _;
                table.id = job_id as _;

                ObjectDependency::insert(object_dependency::ActiveModel {
                    oid: Set(index.primary_table_id as _),
                    used_by: Set(table.id as _),
                    ..Default::default()
                })
                .exec(&txn)
                .await?;

                let table_model: table::ActiveModel = table.clone().into();
                Table::insert(table_model).exec(&txn).await?;
                let index_model: index::ActiveModel = index.clone().into();
                Index::insert(index_model).exec(&txn).await?;
            }
            StreamingJob::Source(src) => {
                let job_id = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Source,
                    src.owner as _,
                    Some(src.database_id as _),
                    Some(src.schema_id as _),
                    create_type,
                    ctx,
                    streaming_parallelism,
                    max_parallelism,
                    specific_resource_group,
                )
                .await?;
                src.id = job_id as _;
                let source_model: source::ActiveModel = src.clone().into();
                Source::insert(source_model).exec(&txn).await?;
            }
        }

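        // Record dependencies on the secrets and connections referenced by the job.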
        dependencies.extend(
            streaming_job
                .dependent_secret_ids()?
                .into_iter()
                .map(|secret_id| secret_id as ObjectId),
        );
        dependencies.extend(
            streaming_job
                .dependent_connection_ids()?
                .into_iter()
                .map(|conn_id| conn_id as ObjectId),
        );

        if !dependencies.is_empty() {
            ObjectDependency::insert_many(dependencies.into_iter().map(|oid| {
                object_dependency::ActiveModel {
                    oid: Set(oid),
                    used_by: Set(streaming_job.id() as _),
                    ..Default::default()
                }
            }))
            .exec(&txn)
            .await?;
        }

        txn.commit().await?;

        Ok(())
    }

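    /// Create catalog objects for the internal (state) tables of a streaming job. The
    /// input catalogs are incomplete: their ids are placeholders assigned during
    /// planning. Returns a map from each placeholder id to the object id actually
    /// allocated in the catalog.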
    pub async fn create_internal_table_catalog(
        &self,
        job: &StreamingJob,
        mut incomplete_internal_tables: Vec<PbTable>,
    ) -> MetaResult<HashMap<u32, u32>> {
        let job_id = job.id() as ObjectId;
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;
        let mut table_id_map = HashMap::new();
        for table in &mut incomplete_internal_tables {
            let table_id = Self::create_object(
                &txn,
                ObjectType::Table,
                table.owner as _,
                Some(table.database_id as _),
                Some(table.schema_id as _),
            )
            .await?
            .oid;
            table_id_map.insert(table.id, table_id as u32);
            table.id = table_id as _;
            table.job_id = Some(job_id as _);

            let table_model = table::ActiveModel {
                table_id: Set(table_id as _),
                belongs_to_job_id: Set(Some(job_id)),
                fragment_id: NotSet,
                ..table.clone().into()
            };
            Table::insert(table_model).exec(&txn).await?;
        }
        txn.commit().await?;

        Ok(table_id_map)
    }

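    /// Insert the fragments and actors of the job into the meta store, and bind the
    /// state tables to their fragments. Used both when creating a new job and when
    /// replacing an existing one (`for_replace = true`), in which case the table
    /// catalogs are left untouched here and updated when the replacement finishes.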
    pub async fn prepare_streaming_job(
        &self,
        stream_job_fragments: &StreamJobFragmentsToCreate,
        streaming_job: &StreamingJob,
        for_replace: bool,
    ) -> MetaResult<()> {
        let is_materialized_view = streaming_job.is_materialized_view();
        let fragment_actors =
            Self::extract_fragment_and_actors_from_fragments(stream_job_fragments)?;
        let mut all_tables = stream_job_fragments.all_tables();
        let inner = self.inner.write().await;

        let mut objects = vec![];
        let txn = inner.db.begin().await?;

        let (fragments, actors): (Vec<_>, Vec<_>) = fragment_actors.into_iter().unzip();
        for fragment in fragments {
            let fragment_id = fragment.fragment_id;
            let state_table_ids = fragment.state_table_ids.inner_ref().clone();

            let fragment = fragment.into_active_model();
            Fragment::insert(fragment).exec(&txn).await?;

            if !for_replace {
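                // Bind each state table to its fragment and fill in the vnode count.
                // The table's vnode count is not necessarily the fragment's, so it is
                // looked up from the table catalogs collected above.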
                for state_table_id in state_table_ids {
                    let table = all_tables
                        .get_mut(&(state_table_id as u32))
                        .unwrap_or_else(|| panic!("table {} not found", state_table_id));
                    assert_eq!(table.id, state_table_id as u32);
                    assert_eq!(table.fragment_id, fragment_id as u32);
                    table.job_id = Some(streaming_job.id());
                    let vnode_count = table.vnode_count();

                    table::ActiveModel {
                        table_id: Set(state_table_id as _),
                        fragment_id: Set(Some(fragment_id)),
                        vnode_count: Set(vnode_count as _),
                        ..Default::default()
                    }
                    .update(&txn)
                    .await?;

                    if is_materialized_view {
                        objects.push(PbObject {
                            object_info: Some(PbObjectInfo::Table(table.clone())),
                        });
                    }
                }
            }
        }

        insert_fragment_relations(&txn, &stream_job_fragments.downstreams).await?;

        for actors in actors {
            for actor in actors {
                let actor = actor.into_active_model();
                Actor::insert(actor).exec(&txn).await?;
            }
        }

        if !for_replace {
            if let StreamingJob::Table(_, table, ..) = streaming_job {
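                // Record the DML fragment id in the table catalog.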
                Table::update(table::ActiveModel {
                    table_id: Set(table.id as _),
                    dml_fragment_id: Set(table.dml_fragment_id.map(|id| id as _)),
                    ..Default::default()
                })
                .exec(&txn)
                .await?;
            }
        }

        txn.commit().await?;

        if !objects.is_empty() {
            assert!(is_materialized_view);
            self.notify_frontend(Operation::Add, Info::ObjectGroup(PbObjectGroup { objects }))
                .await;
        }

        Ok(())
    }

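    /// Try to abort a streaming job that has not finished creation, cleaning up its
    /// object, internal tables, and associated source, and notifying all waiters of the
    /// failure. Returns `false` (not aborted) for background jobs that are still in
    /// `Creating` status, together with the id of the database the job belongs to.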
    pub async fn try_abort_creating_streaming_job(
        &self,
        job_id: ObjectId,
        is_cancelled: bool,
    ) -> MetaResult<(bool, Option<DatabaseId>)> {
        let mut inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        let obj = Object::find_by_id(job_id).one(&txn).await?;
        let Some(obj) = obj else {
            tracing::warn!(
                id = job_id,
                "streaming job not found when aborting creating, might be cleaned by recovery"
            );
            return Ok((true, None));
        };
        let database_id = obj
            .database_id
            .ok_or_else(|| anyhow!("obj has no database id: {:?}", obj))?;

        if !is_cancelled {
            let streaming_job = streaming_job::Entity::find_by_id(job_id).one(&txn).await?;
            if let Some(streaming_job) = streaming_job {
                assert_ne!(streaming_job.job_status, JobStatus::Created);
                if streaming_job.create_type == CreateType::Background
                    && streaming_job.job_status == JobStatus::Creating
                {
                    tracing::warn!(
                        id = job_id,
                        "streaming job is created in background and still in creating status"
                    );
                    return Ok((false, Some(database_id)));
                }
            }
        }

        let internal_table_ids = get_internal_tables_by_id(job_id, &txn).await?;

        let table_obj = Table::find_by_id(job_id).one(&txn).await?;
        let mut objs = vec![];
        if let Some(table) = &table_obj
            && table.table_type == TableType::MaterializedView
        {
            let obj: Option<PartialObject> = Object::find_by_id(job_id)
                .select_only()
                .columns([
                    object::Column::Oid,
                    object::Column::ObjType,
                    object::Column::SchemaId,
                    object::Column::DatabaseId,
                ])
                .into_partial_model()
                .one(&txn)
                .await?;
            let obj =
                obj.ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;
            objs.push(obj);
            let internal_table_objs: Vec<PartialObject> = Object::find()
                .select_only()
                .columns([
                    object::Column::Oid,
                    object::Column::ObjType,
                    object::Column::SchemaId,
                    object::Column::DatabaseId,
                ])
                .join(JoinType::InnerJoin, object::Relation::Table.def())
                .filter(table::Column::BelongsToJobId.eq(job_id))
                .into_partial_model()
                .all(&txn)
                .await?;
            objs.extend(internal_table_objs);
        }

        Object::delete_by_id(job_id).exec(&txn).await?;
        if !internal_table_ids.is_empty() {
            Object::delete_many()
                .filter(object::Column::Oid.is_in(internal_table_ids))
                .exec(&txn)
                .await?;
        }
        if let Some(t) = &table_obj
            && let Some(source_id) = t.optional_associated_source_id
        {
            Object::delete_by_id(source_id).exec(&txn).await?;
        }

        let err = if is_cancelled {
            MetaError::cancelled(format!("streaming job {job_id} is cancelled"))
        } else {
            MetaError::catalog_id_not_found("stream job", format!("streaming job {job_id} failed"))
        };
        let abort_reason = format!("streaming job aborted {}", err.as_report());
        for tx in inner
            .creating_table_finish_notifier
            .get_mut(&database_id)
            .map(|creating_tables| creating_tables.remove(&job_id).into_iter())
            .into_iter()
            .flatten()
            .flatten()
        {
            let _ = tx.send(Err(abort_reason.clone()));
        }
        txn.commit().await?;

        if !objs.is_empty() {
            self.notify_frontend(Operation::Delete, build_object_group_for_delete(objs))
                .await;
        }
        Ok((true, Some(database_id)))
    }

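    /// After the barrier for a newly created job has been collected: mark its actors as
    /// running, persist their split assignments and the new fragment relations, and
    /// advance the job status to `Creating`.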
    pub async fn post_collect_job_fragments(
        &self,
        job_id: ObjectId,
        actor_ids: Vec<crate::model::ActorId>,
        upstream_fragment_new_downstreams: &FragmentDownstreamRelation,
        split_assignment: &SplitAssignment,
    ) -> MetaResult<()> {
        self.post_collect_job_fragments_inner(
            job_id,
            actor_ids,
            upstream_fragment_new_downstreams,
            split_assignment,
            false,
        )
        .await
    }

    pub async fn post_collect_job_fragments_inner(
        &self,
        job_id: ObjectId,
        actor_ids: Vec<crate::model::ActorId>,
        upstream_fragment_new_downstreams: &FragmentDownstreamRelation,
        split_assignment: &SplitAssignment,
        is_mv: bool,
    ) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        Actor::update_many()
            .col_expr(
                actor::Column::Status,
                SimpleExpr::from(ActorStatus::Running.into_value()),
            )
            .filter(
                actor::Column::ActorId
                    .is_in(actor_ids.into_iter().map(|id| id as ActorId).collect_vec()),
            )
            .exec(&txn)
            .await?;

        for splits in split_assignment.values() {
            for (actor_id, splits) in splits {
                let splits = splits.iter().map(PbConnectorSplit::from).collect_vec();
                let connector_splits = &PbConnectorSplits { splits };
                actor::ActiveModel {
                    actor_id: Set(*actor_id as _),
                    splits: Set(Some(connector_splits.into())),
                    ..Default::default()
                }
                .update(&txn)
                .await?;
            }
        }

        insert_fragment_relations(&txn, upstream_fragment_new_downstreams).await?;

        streaming_job::ActiveModel {
            job_id: Set(job_id),
            job_status: Set(JobStatus::Creating),
            ..Default::default()
        }
        .update(&txn)
        .await?;

        let fragment_mapping = if is_mv {
            get_fragment_mappings(&txn, job_id as _).await?
        } else {
            vec![]
        };

        txn.commit().await?;
        self.notify_fragment_mapping(NotificationOperation::Add, fragment_mapping)
            .await;

        Ok(())
    }

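    /// Create a temporary catalog object for replacing an existing streaming job
    /// (e.g. `ALTER TABLE`). The replacement must keep the original max parallelism and
    /// is rejected while the job is referenced by unfinished creating jobs. Returns the
    /// id of the temporary object, whose fragments will take over the original job once
    /// the replacement is finished.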
    pub async fn create_job_catalog_for_replace(
        &self,
        streaming_job: &StreamingJob,
        ctx: &StreamContext,
        specified_parallelism: &Option<NonZeroUsize>,
        max_parallelism: usize,
    ) -> MetaResult<ObjectId> {
        let id = streaming_job.id();
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        streaming_job.verify_version_for_replace(&txn).await?;
        let referring_cnt = ObjectDependency::find()
            .join(
                JoinType::InnerJoin,
                object_dependency::Relation::Object1.def(),
            )
            .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
            .filter(
                object_dependency::Column::Oid
                    .eq(id as ObjectId)
                    .and(object::Column::ObjType.eq(ObjectType::Table))
                    .and(streaming_job::Column::JobStatus.ne(JobStatus::Created)),
            )
            .count(&txn)
            .await?;
        if referring_cnt != 0 {
            return Err(MetaError::permission_denied(
                "job is being altered or referenced by some creating jobs",
            ));
        }

        let original_max_parallelism: i32 = StreamingJobModel::find_by_id(id as ObjectId)
            .select_only()
            .column(streaming_job::Column::MaxParallelism)
            .into_tuple()
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found(streaming_job.job_type_str(), id))?;

        if original_max_parallelism != max_parallelism as i32 {
            bail!(
                "cannot use a different max parallelism \
                 when replacing streaming job, \
                 original: {}, new: {}",
                original_max_parallelism,
                max_parallelism
            );
        }

        let parallelism = match specified_parallelism {
            None => StreamingParallelism::Adaptive,
            Some(n) => StreamingParallelism::Fixed(n.get() as _),
        };

        let new_obj_id = Self::create_streaming_job_obj(
            &txn,
            streaming_job.object_type(),
            streaming_job.owner() as _,
            Some(streaming_job.database_id() as _),
            Some(streaming_job.schema_id() as _),
            streaming_job.create_type(),
            ctx,
            parallelism,
            max_parallelism,
            None,
        )
        .await?;

        ObjectDependency::insert(object_dependency::ActiveModel {
            oid: Set(id as _),
            used_by: Set(new_obj_id as _),
            ..Default::default()
        })
        .exec(&txn)
        .await?;

        txn.commit().await?;

        Ok(new_obj_id)
    }

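    /// Mark a streaming job as `Created`, stamp its creation time and cluster version,
    /// and notify the frontend about the finished catalogs. For a sink-into-table job,
    /// this also finishes the associated table replacement.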
    pub async fn finish_streaming_job(
        &self,
        job_id: ObjectId,
        replace_stream_job_info: Option<ReplaceStreamJobPlan>,
    ) -> MetaResult<()> {
        let mut inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        let job_type = Object::find_by_id(job_id)
            .select_only()
            .column(object::Column::ObjType)
            .into_tuple()
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;

        let res = Object::update_many()
            .col_expr(object::Column::CreatedAt, Expr::current_timestamp().into())
            .col_expr(
                object::Column::CreatedAtClusterVersion,
                current_cluster_version().into(),
            )
            .filter(object::Column::Oid.eq(job_id))
            .exec(&txn)
            .await?;
        if res.rows_affected == 0 {
            return Err(MetaError::catalog_id_not_found("streaming job", job_id));
        }

        let job = streaming_job::ActiveModel {
            job_id: Set(job_id),
            job_status: Set(JobStatus::Created),
            ..Default::default()
        };
        job.update(&txn).await?;

        let internal_table_objs = Table::find()
            .find_also_related(Object)
            .filter(table::Column::BelongsToJobId.eq(job_id))
            .all(&txn)
            .await?;
        let mut objects = internal_table_objs
            .iter()
            .map(|(table, obj)| PbObject {
                object_info: Some(PbObjectInfo::Table(
                    ObjectModel(table.clone(), obj.clone().unwrap()).into(),
                )),
            })
            .collect_vec();
        let mut notification_op = NotificationOperation::Add;

        match job_type {
            ObjectType::Table => {
                let (table, obj) = Table::find_by_id(job_id)
                    .find_also_related(Object)
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("table", job_id))?;
                if table.table_type == TableType::MaterializedView {
                    notification_op = NotificationOperation::Update;
                }

                if let Some(source_id) = table.optional_associated_source_id {
                    let (src, obj) = Source::find_by_id(source_id)
                        .find_also_related(Object)
                        .one(&txn)
                        .await?
                        .ok_or_else(|| MetaError::catalog_id_not_found("source", source_id))?;
                    objects.push(PbObject {
                        object_info: Some(PbObjectInfo::Source(
                            ObjectModel(src, obj.unwrap()).into(),
                        )),
                    });
                }
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Table(ObjectModel(table, obj.unwrap()).into())),
                });
            }
            ObjectType::Sink => {
                let (sink, obj) = Sink::find_by_id(job_id)
                    .find_also_related(Object)
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("sink", job_id))?;
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Sink(ObjectModel(sink, obj.unwrap()).into())),
                });
            }
            ObjectType::Index => {
                let (index, obj) = Index::find_by_id(job_id)
                    .find_also_related(Object)
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("index", job_id))?;
                {
                    let (table, obj) = Table::find_by_id(index.index_table_id)
                        .find_also_related(Object)
                        .one(&txn)
                        .await?
                        .ok_or_else(|| {
                            MetaError::catalog_id_not_found("table", index.index_table_id)
                        })?;
                    objects.push(PbObject {
                        object_info: Some(PbObjectInfo::Table(
                            ObjectModel(table, obj.unwrap()).into(),
                        )),
                    });
                }
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Index(ObjectModel(index, obj.unwrap()).into())),
                });
            }
            ObjectType::Source => {
                let (source, obj) = Source::find_by_id(job_id)
                    .find_also_related(Object)
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("source", job_id))?;
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Source(
                        ObjectModel(source, obj.unwrap()).into(),
                    )),
                });
            }
            _ => unreachable!("invalid job type: {:?}", job_type),
        }

        let fragment_mapping = get_fragment_mappings(&txn, job_id).await?;

        let replace_table_mapping_update = match replace_stream_job_info {
            Some(ReplaceStreamJobPlan {
                streaming_job,
                replace_upstream,
                tmp_id,
                ..
            }) => {
                let incoming_sink_id = job_id;

                let (relations, fragment_mapping, _) = Self::finish_replace_streaming_job_inner(
                    tmp_id as ObjectId,
                    replace_upstream,
                    None,
                    SinkIntoTableContext {
                        creating_sink_id: Some(incoming_sink_id as _),
                        dropping_sink_id: None,
                        updated_sink_catalogs: vec![],
                    },
                    &txn,
                    streaming_job,
                    None,
                )
                .await?;

                Some((relations, fragment_mapping))
            }
            None => None,
        };

        txn.commit().await?;

        self.notify_fragment_mapping(NotificationOperation::Add, fragment_mapping)
            .await;

        let mut version = self
            .notify_frontend(
                notification_op,
                NotificationInfo::ObjectGroup(PbObjectGroup { objects }),
            )
            .await;

        if let Some((objects, fragment_mapping)) = replace_table_mapping_update {
            self.notify_fragment_mapping(NotificationOperation::Add, fragment_mapping)
                .await;
            version = self
                .notify_frontend(
                    NotificationOperation::Update,
                    NotificationInfo::ObjectGroup(PbObjectGroup { objects }),
                )
                .await;
        }
        inner
            .creating_table_finish_notifier
            .values_mut()
            .for_each(|creating_tables| {
                if let Some(txs) = creating_tables.remove(&job_id) {
                    for tx in txs {
                        let _ = tx.send(Ok(version));
                    }
                }
            });

        Ok(())
    }

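    /// Finish the replacement of a streaming job: update the catalogs, move the
    /// fragments of the temporary job over to the original one, and notify the frontend
    /// about the changes.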
    pub async fn finish_replace_streaming_job(
        &self,
        tmp_id: ObjectId,
        streaming_job: StreamingJob,
        replace_upstream: FragmentReplaceUpstream,
        col_index_mapping: Option<ColIndexMapping>,
        sink_into_table_context: SinkIntoTableContext,
        drop_table_connector_ctx: Option<&DropTableConnectorContext>,
    ) -> MetaResult<NotificationVersion> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        let (objects, fragment_mapping, delete_notification_objs) =
            Self::finish_replace_streaming_job_inner(
                tmp_id,
                replace_upstream,
                col_index_mapping,
                sink_into_table_context,
                &txn,
                streaming_job,
                drop_table_connector_ctx,
            )
            .await?;

        txn.commit().await?;

        self.notify_fragment_mapping(NotificationOperation::Add, fragment_mapping)
            .await;
        let mut version = self
            .notify_frontend(
                NotificationOperation::Update,
                NotificationInfo::ObjectGroup(PbObjectGroup { objects }),
            )
            .await;

        if let Some((user_infos, to_drop_objects)) = delete_notification_objs {
            self.notify_users_update(user_infos).await;
            version = self
                .notify_frontend(
                    NotificationOperation::Delete,
                    build_object_group_for_delete(to_drop_objects),
                )
                .await;
        }

        Ok(version)
    }

    pub async fn finish_replace_streaming_job_inner(
        tmp_id: ObjectId,
        replace_upstream: FragmentReplaceUpstream,
        col_index_mapping: Option<ColIndexMapping>,
        SinkIntoTableContext {
            creating_sink_id,
            dropping_sink_id,
            updated_sink_catalogs,
        }: SinkIntoTableContext,
        txn: &DatabaseTransaction,
        streaming_job: StreamingJob,
        drop_table_connector_ctx: Option<&DropTableConnectorContext>,
    ) -> MetaResult<(
        Vec<PbObject>,
        Vec<PbFragmentWorkerSlotMapping>,
        Option<(Vec<PbUserInfo>, Vec<PartialObject>)>,
    )> {
        let original_job_id = streaming_job.id() as ObjectId;
        let job_type = streaming_job.job_type();

        match streaming_job {
            StreamingJob::Table(_source, table, _table_job_type) => {
                let original_table_catalogs = Table::find_by_id(original_job_id)
                    .select_only()
                    .columns([table::Column::Columns])
                    .into_tuple::<ColumnCatalogArray>()
                    .one(txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("table", original_job_id))?;

                for sink_id in updated_sink_catalogs {
                    sink::ActiveModel {
                        sink_id: Set(sink_id as _),
                        original_target_columns: Set(Some(original_table_catalogs.clone())),
                        ..Default::default()
                    }
                    .update(txn)
                    .await?;
                }
                let mut table = table::ActiveModel::from(table);
                let mut incoming_sinks = table.incoming_sinks.as_ref().inner_ref().clone();
                if let Some(sink_id) = creating_sink_id {
                    debug_assert!(!incoming_sinks.contains(&{ sink_id }));
                    incoming_sinks.push(sink_id as _);
                }
                if let Some(drop_table_connector_ctx) = drop_table_connector_ctx
                    && drop_table_connector_ctx.to_change_streaming_job_id == original_job_id
                {
                    table.optional_associated_source_id = Set(None);
                }

                if let Some(sink_id) = dropping_sink_id {
                    let drained = incoming_sinks
                        .extract_if(.., |id| *id == sink_id)
                        .collect_vec();
                    debug_assert_eq!(drained, vec![sink_id]);
                }

                table.incoming_sinks = Set(incoming_sinks.into());
                table.update(txn).await?;
            }
            StreamingJob::Source(source) => {
                let source = source::ActiveModel::from(source);
                source.update(txn).await?;
            }
            _ => unreachable!(
                "invalid streaming job type: {:?}",
                streaming_job.job_type_str()
            ),
        }

        let fragment_info: Vec<(FragmentId, I32Array)> = Fragment::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::StateTableIds,
            ])
            .filter(fragment::Column::JobId.eq(tmp_id))
            .into_tuple()
            .all(txn)
            .await?;
        for (fragment_id, state_table_ids) in fragment_info {
            for state_table_id in state_table_ids.into_inner() {
                table::ActiveModel {
                    table_id: Set(state_table_id as _),
                    fragment_id: Set(Some(fragment_id)),
                    ..Default::default()
                }
                .update(txn)
                .await?;
            }
        }

        Fragment::delete_many()
            .filter(fragment::Column::JobId.eq(original_job_id))
            .exec(txn)
            .await?;
        Fragment::update_many()
            .col_expr(fragment::Column::JobId, SimpleExpr::from(original_job_id))
            .filter(fragment::Column::JobId.eq(tmp_id))
            .exec(txn)
            .await?;

        for (fragment_id, fragment_replace_map) in replace_upstream {
            let (fragment_id, mut stream_node) = Fragment::find_by_id(fragment_id as FragmentId)
                .select_only()
                .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
                .into_tuple::<(FragmentId, StreamNode)>()
                .one(txn)
                .await?
                .map(|(id, node)| (id, node.to_protobuf()))
                .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;

            visit_stream_node_mut(&mut stream_node, |body| {
                if let PbNodeBody::Merge(m) = body
                    && let Some(new_fragment_id) = fragment_replace_map.get(&m.upstream_fragment_id)
                {
                    m.upstream_fragment_id = *new_fragment_id;
                }
            });
            fragment::ActiveModel {
                fragment_id: Set(fragment_id),
                stream_node: Set(StreamNode::from(&stream_node)),
                ..Default::default()
            }
            .update(txn)
            .await?;
        }

        Object::delete_by_id(tmp_id).exec(txn).await?;

        let mut objects = vec![];
        match job_type {
            StreamingJobType::Table(_) => {
                let (table, table_obj) = Table::find_by_id(original_job_id)
                    .find_also_related(Object)
                    .one(txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("object", original_job_id))?;
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Table(
                        ObjectModel(table, table_obj.unwrap()).into(),
                    )),
                })
            }
            StreamingJobType::Source => {
                let (source, source_obj) = Source::find_by_id(original_job_id)
                    .find_also_related(Object)
                    .one(txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("object", original_job_id))?;
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Source(
                        ObjectModel(source, source_obj.unwrap()).into(),
                    )),
                })
            }
            _ => unreachable!("invalid streaming job type for replace: {:?}", job_type),
        }
        if let Some(table_col_index_mapping) = col_index_mapping {
            let expr_rewriter = ReplaceTableExprRewriter {
                table_col_index_mapping,
            };

            let index_items: Vec<(IndexId, ExprNodeArray)> = Index::find()
                .select_only()
                .columns([index::Column::IndexId, index::Column::IndexItems])
                .filter(index::Column::PrimaryTableId.eq(original_job_id))
                .into_tuple()
                .all(txn)
                .await?;
            for (index_id, nodes) in index_items {
                let mut pb_nodes = nodes.to_protobuf();
                pb_nodes
                    .iter_mut()
                    .for_each(|x| expr_rewriter.rewrite_expr(x));
                let index = index::ActiveModel {
                    index_id: Set(index_id),
                    index_items: Set(pb_nodes.into()),
                    ..Default::default()
                }
                .update(txn)
                .await?;
                let index_obj = index
                    .find_related(Object)
                    .one(txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("object", index.index_id))?;
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Index(ObjectModel(index, index_obj).into())),
                });
            }
        }

        let fragment_mapping: Vec<_> = get_fragment_mappings(txn, original_job_id as _).await?;

        let mut notification_objs: Option<(Vec<PbUserInfo>, Vec<PartialObject>)> = None;
        if let Some(drop_table_connector_ctx) = drop_table_connector_ctx {
            notification_objs =
                Some(Self::drop_table_associated_source(txn, drop_table_connector_ctx).await?);
        }

        Ok((objects, fragment_mapping, notification_objs))
    }

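    /// Abort the replacement of a streaming job by dropping the temporary object
    /// created for it.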
    pub async fn try_abort_replacing_streaming_job(&self, tmp_job_id: ObjectId) -> MetaResult<()> {
        let inner = self.inner.write().await;
        Object::delete_by_id(tmp_job_id).exec(&inner.db).await?;
        Ok(())
    }

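    /// Update the rate limit of a source, rewriting the source (and fs fetch) nodes of
    /// every fragment that consumes it. Returns the affected actors, grouped by
    /// fragment, so that the new limit can be applied to the running streaming jobs.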
    pub async fn update_source_rate_limit_by_source_id(
        &self,
        source_id: SourceId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        {
            let active_source = source::ActiveModel {
                source_id: Set(source_id),
                rate_limit: Set(rate_limit.map(|v| v as i32)),
                ..Default::default()
            };
            active_source.update(&txn).await?;
        }

        let (source, obj) = Source::find_by_id(source_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| {
                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
            })?;

        let is_fs_source = source.with_properties.inner_ref().is_new_fs_connector();
        let streaming_job_ids: Vec<ObjectId> =
            if let Some(table_id) = source.optional_associated_table_id {
                vec![table_id]
            } else if let Some(source_info) = &source.source_info
                && source_info.to_protobuf().is_shared()
            {
                vec![source_id]
            } else {
                ObjectDependency::find()
                    .select_only()
                    .column(object_dependency::Column::UsedBy)
                    .filter(object_dependency::Column::Oid.eq(source_id))
                    .into_tuple()
                    .all(&txn)
                    .await?
            };

        if streaming_job_ids.is_empty() {
            return Err(MetaError::invalid_parameter(format!(
                "source id {source_id} not used by any streaming job"
            )));
        }

        let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::FragmentTypeMask,
                fragment::Column::StreamNode,
            ])
            .filter(fragment::Column::JobId.is_in(streaming_job_ids))
            .into_tuple()
            .all(&txn)
            .await?;
        let mut fragments = fragments
            .into_iter()
            .map(|(id, mask, stream_node)| (id, mask, stream_node.to_protobuf()))
            .collect_vec();

        fragments.retain_mut(|(_, fragment_type_mask, stream_node)| {
            let mut found = false;
            if *fragment_type_mask & PbFragmentTypeFlag::Source as i32 != 0 {
                visit_stream_node_mut(stream_node, |node| {
                    if let PbNodeBody::Source(node) = node {
                        if let Some(node_inner) = &mut node.source_inner
                            && node_inner.source_id == source_id as u32
                        {
                            node_inner.rate_limit = rate_limit;
                            found = true;
                        }
                    }
                });
            }
            if is_fs_source {
                visit_stream_node_mut(stream_node, |node| {
                    if let PbNodeBody::StreamFsFetch(node) = node {
                        *fragment_type_mask |= PbFragmentTypeFlag::FsFetch as i32;
                        if let Some(node_inner) = &mut node.node_inner
                            && node_inner.source_id == source_id as u32
                        {
                            node_inner.rate_limit = rate_limit;
                            found = true;
                        }
                    }
                });
            }
            found
        });

        assert!(
            !fragments.is_empty(),
            "source id should be used by at least one fragment"
        );
        let fragment_ids = fragments.iter().map(|(id, _, _)| *id).collect_vec();
        for (id, fragment_type_mask, stream_node) in fragments {
            fragment::ActiveModel {
                fragment_id: Set(id),
                fragment_type_mask: Set(fragment_type_mask),
                stream_node: Set(StreamNode::from(&stream_node)),
                ..Default::default()
            }
            .update(&txn)
            .await?;
        }
        let fragment_actors = get_fragment_actor_ids(&txn, fragment_ids).await?;

        txn.commit().await?;

        let relation_info = PbObjectInfo::Source(ObjectModel(source, obj.unwrap()).into());
        let _version = self
            .notify_frontend(
                NotificationOperation::Update,
                NotificationInfo::ObjectGroup(PbObjectGroup {
                    objects: vec![PbObject {
                        object_info: Some(relation_info),
                    }],
                }),
            )
            .await;

        Ok(fragment_actors)
    }

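    /// Apply `fragments_mutation_fn` to the type mask and stream node of every fragment
    /// of the job, persisting only the fragments for which it returns `true`. Fails
    /// with `err_msg` if no fragment is affected. Returns the actors of the mutated
    /// fragments, grouped by fragment.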
    async fn mutate_fragments_by_job_id(
        &self,
        job_id: ObjectId,
        mut fragments_mutation_fn: impl FnMut(&mut i32, &mut PbStreamNode) -> bool,
        err_msg: &'static str,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::FragmentTypeMask,
                fragment::Column::StreamNode,
            ])
            .filter(fragment::Column::JobId.eq(job_id))
            .into_tuple()
            .all(&txn)
            .await?;
        let mut fragments = fragments
            .into_iter()
            .map(|(id, mask, stream_node)| (id, mask, stream_node.to_protobuf()))
            .collect_vec();

        fragments.retain_mut(|(_, fragment_type_mask, stream_node)| {
            fragments_mutation_fn(fragment_type_mask, stream_node)
        });
        if fragments.is_empty() {
            return Err(MetaError::invalid_parameter(format!(
                "job id {job_id}: {}",
                err_msg
            )));
        }

        let fragment_ids = fragments.iter().map(|(id, _, _)| *id).collect_vec();
        for (id, _, stream_node) in fragments {
            fragment::ActiveModel {
                fragment_id: Set(id),
                stream_node: Set(StreamNode::from(&stream_node)),
                ..Default::default()
            }
            .update(&txn)
            .await?;
        }
        let fragment_actors = get_fragment_actor_ids(&txn, fragment_ids).await?;

        txn.commit().await?;

        Ok(fragment_actors)
    }

    async fn mutate_fragment_by_fragment_id(
        &self,
        fragment_id: FragmentId,
        mut fragment_mutation_fn: impl FnMut(&mut i32, &mut PbStreamNode) -> bool,
        err_msg: &'static str,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let (mut fragment_type_mask, stream_node): (i32, StreamNode) =
            Fragment::find_by_id(fragment_id)
                .select_only()
                .columns([
                    fragment::Column::FragmentTypeMask,
                    fragment::Column::StreamNode,
                ])
                .into_tuple()
                .one(&txn)
                .await?
                .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
        let mut pb_stream_node = stream_node.to_protobuf();

        if !fragment_mutation_fn(&mut fragment_type_mask, &mut pb_stream_node) {
            return Err(MetaError::invalid_parameter(format!(
                "fragment id {fragment_id}: {}",
                err_msg
            )));
        }

        // Persist the mutated stream node and type mask; persisting the original,
        // unmutated `stream_node` here would silently discard the mutation.
        fragment::ActiveModel {
            fragment_id: Set(fragment_id),
            fragment_type_mask: Set(fragment_type_mask),
            stream_node: Set(StreamNode::from(&pb_stream_node)),
            ..Default::default()
        }
        .update(&txn)
        .await?;

        let fragment_actors = get_fragment_actor_ids(&txn, vec![fragment_id]).await?;

        txn.commit().await?;

        Ok(fragment_actors)
    }

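    /// Update the backfill rate limit of a job, covering stream scan, CDC scan, source
    /// backfill, and sink nodes in backfill-capable fragments.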
    pub async fn update_backfill_rate_limit_by_job_id(
        &self,
        job_id: ObjectId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let update_backfill_rate_limit =
            |fragment_type_mask: &mut i32, stream_node: &mut PbStreamNode| {
                let mut found = false;
                if *fragment_type_mask & PbFragmentTypeFlag::backfill_rate_limit_fragments() != 0 {
                    visit_stream_node_mut(stream_node, |node| match node {
                        PbNodeBody::StreamCdcScan(node) => {
                            node.rate_limit = rate_limit;
                            found = true;
                        }
                        PbNodeBody::StreamScan(node) => {
                            node.rate_limit = rate_limit;
                            found = true;
                        }
                        PbNodeBody::SourceBackfill(node) => {
                            node.rate_limit = rate_limit;
                            found = true;
                        }
                        PbNodeBody::Sink(node) => {
                            node.rate_limit = rate_limit;
                            found = true;
                        }
                        _ => {}
                    });
                }
                found
            };

        self.mutate_fragments_by_job_id(
            job_id,
            update_backfill_rate_limit,
            "stream scan node or source node not found",
        )
        .await
    }

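    /// Update the rate limit of the sink nodes of a job.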
    pub async fn update_sink_rate_limit_by_job_id(
        &self,
        job_id: ObjectId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let update_sink_rate_limit =
            |fragment_type_mask: &mut i32, stream_node: &mut PbStreamNode| {
                let mut found = false;
                if *fragment_type_mask & PbFragmentTypeFlag::sink_rate_limit_fragments() != 0 {
                    visit_stream_node_mut(stream_node, |node| {
                        if let PbNodeBody::Sink(node) = node {
                            node.rate_limit = rate_limit;
                            found = true;
                        }
                    });
                }
                found
            };

        self.mutate_fragments_by_job_id(job_id, update_sink_rate_limit, "sink node not found")
            .await
    }

    pub async fn update_dml_rate_limit_by_job_id(
        &self,
        job_id: ObjectId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let update_dml_rate_limit =
            |fragment_type_mask: &mut i32, stream_node: &mut PbStreamNode| {
                let mut found = false;
                if *fragment_type_mask & PbFragmentTypeFlag::dml_rate_limit_fragments() != 0 {
                    visit_stream_node_mut(stream_node, |node| {
                        if let PbNodeBody::Dml(node) = node {
                            node.rate_limit = rate_limit;
                            found = true;
                        }
                    });
                }
                found
            };

        self.mutate_fragments_by_job_id(job_id, update_dml_rate_limit, "dml node not found")
            .await
    }

    pub async fn update_fragment_rate_limit_by_fragment_id(
        &self,
        fragment_id: FragmentId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let update_rate_limit = |fragment_type_mask: &mut i32, stream_node: &mut PbStreamNode| {
            let mut found = false;
            if *fragment_type_mask & PbFragmentTypeFlag::dml_rate_limit_fragments() != 0
                || *fragment_type_mask & PbFragmentTypeFlag::sink_rate_limit_fragments() != 0
                || *fragment_type_mask & PbFragmentTypeFlag::backfill_rate_limit_fragments() != 0
                || *fragment_type_mask & PbFragmentTypeFlag::source_rate_limit_fragments() != 0
            {
                visit_stream_node_mut(stream_node, |node| {
                    if let PbNodeBody::Dml(node) = node {
                        node.rate_limit = rate_limit;
                        found = true;
                    }
                    if let PbNodeBody::Sink(node) = node {
                        node.rate_limit = rate_limit;
                        found = true;
                    }
                    if let PbNodeBody::StreamCdcScan(node) = node {
                        node.rate_limit = rate_limit;
                        found = true;
                    }
                    if let PbNodeBody::StreamScan(node) = node {
                        node.rate_limit = rate_limit;
                        found = true;
                    }
                    if let PbNodeBody::SourceBackfill(node) = node {
                        node.rate_limit = rate_limit;
                        found = true;
                    }
                });
            }
            found
        };
        self.mutate_fragment_by_fragment_id(fragment_id, update_rate_limit, "fragment not found")
            .await
    }

    pub async fn post_apply_reschedules(
        &self,
        reschedules: HashMap<FragmentId, Reschedule>,
        post_updates: &JobReschedulePostUpdates,
    ) -> MetaResult<()> {
        let new_created_actors: HashSet<_> = reschedules
            .values()
            .flat_map(|reschedule| {
                reschedule
                    .added_actors
                    .values()
                    .flatten()
                    .map(|actor_id| *actor_id as ActorId)
            })
            .collect();

        let inner = self.inner.write().await;

        let txn = inner.db.begin().await?;

        let mut fragment_mapping_to_notify = vec![];

        for (
            fragment_id,
            Reschedule {
                removed_actors,
                vnode_bitmap_updates,
                actor_splits,
                newly_created_actors,
                ..
            },
        ) in reschedules
        {
            Actor::delete_many()
                .filter(
                    actor::Column::ActorId
                        .is_in(removed_actors.iter().map(|id| *id as ActorId).collect_vec()),
                )
                .exec(&txn)
                .await?;

            for (
                (
                    StreamActor {
                        actor_id,
                        fragment_id,
                        vnode_bitmap,
                        expr_context,
                        ..
                    },
                    _,
                ),
                worker_id,
            ) in newly_created_actors.into_values()
            {
                let splits = actor_splits
                    .get(&actor_id)
                    .map(|splits| splits.iter().map(PbConnectorSplit::from).collect_vec());

                Actor::insert(actor::ActiveModel {
                    actor_id: Set(actor_id as _),
                    fragment_id: Set(fragment_id as _),
                    status: Set(ActorStatus::Running),
                    splits: Set(splits.map(|splits| (&PbConnectorSplits { splits }).into())),
                    worker_id: Set(worker_id),
                    upstream_actor_ids: Set(Default::default()),
                    vnode_bitmap: Set(vnode_bitmap
                        .as_ref()
                        .map(|bitmap| (&bitmap.to_protobuf()).into())),
                    expr_context: Set(expr_context.as_ref().unwrap().into()),
                })
                .exec(&txn)
                .await?;
            }

            for (actor_id, bitmap) in vnode_bitmap_updates {
                let actor = Actor::find_by_id(actor_id as ActorId)
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("actor", actor_id))?;

                let mut actor = actor.into_active_model();
                actor.vnode_bitmap = Set(Some((&bitmap.to_protobuf()).into()));
                actor.update(&txn).await?;
            }

            for (actor_id, splits) in actor_splits {
                if new_created_actors.contains(&(actor_id as ActorId)) {
                    continue;
                }

                let actor = Actor::find_by_id(actor_id as ActorId)
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("actor", actor_id))?;

                let mut actor = actor.into_active_model();
                let splits = splits.iter().map(PbConnectorSplit::from).collect_vec();
                actor.splits = Set(Some((&PbConnectorSplits { splits }).into()));
                actor.update(&txn).await?;
            }

            let fragment = Fragment::find_by_id(fragment_id)
                .one(&txn)
                .await?
                .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;

            let job_actors = fragment
                .find_related(Actor)
                .all(&txn)
                .await?
                .into_iter()
                .map(|actor| {
                    (
                        fragment_id,
                        fragment.distribution_type,
                        actor.actor_id,
                        actor.vnode_bitmap,
                        actor.worker_id,
                        actor.status,
                    )
                })
                .collect_vec();

            fragment_mapping_to_notify.extend(rebuild_fragment_mapping_from_actors(job_actors));
        }

        let JobReschedulePostUpdates {
            parallelism_updates,
            resource_group_updates,
        } = post_updates;

        for (table_id, parallelism) in parallelism_updates {
            let mut streaming_job = StreamingJobModel::find_by_id(table_id.table_id() as ObjectId)
                .one(&txn)
                .await?
                .ok_or_else(|| MetaError::catalog_id_not_found("table", table_id))?
                .into_active_model();

            streaming_job.parallelism = Set(match parallelism {
                TableParallelism::Adaptive => StreamingParallelism::Adaptive,
                TableParallelism::Fixed(n) => StreamingParallelism::Fixed(*n as _),
                TableParallelism::Custom => StreamingParallelism::Custom,
            });

            if let Some(resource_group) =
                resource_group_updates.get(&(table_id.table_id() as ObjectId))
            {
                streaming_job.specific_resource_group = Set(resource_group.to_owned());
            }

            streaming_job.update(&txn).await?;
        }

        txn.commit().await?;
        self.notify_fragment_mapping(Operation::Update, fragment_mapping_to_notify)
            .await;

        Ok(())
    }

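    /// List the rate limits of all fragments that contain a rate-limitable node
    /// (source, fs fetch, source backfill, stream scan, CDC scan, or sink).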
    pub async fn list_rate_limits(&self) -> MetaResult<Vec<RateLimitInfo>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let fragments: Vec<(FragmentId, ObjectId, i32, StreamNode)> = Fragment::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::JobId,
                fragment::Column::FragmentTypeMask,
                fragment::Column::StreamNode,
            ])
            .filter(fragment_type_mask_intersects(
                PbFragmentTypeFlag::rate_limit_fragments(),
            ))
            .into_tuple()
            .all(&txn)
            .await?;

        let mut rate_limits = Vec::new();
        for (fragment_id, job_id, fragment_type_mask, stream_node) in fragments {
            let stream_node = stream_node.to_protobuf();
            let mut rate_limit = None;
            let mut node_name = None;

            visit_stream_node(&stream_node, |node| {
                match node {
                    PbNodeBody::Source(node) => {
                        if let Some(node_inner) = &node.source_inner {
                            debug_assert!(
                                rate_limit.is_none(),
                                "one fragment should only have 1 rate limit node"
                            );
                            rate_limit = node_inner.rate_limit;
                            node_name = Some("SOURCE");
                        }
                    }
                    PbNodeBody::StreamFsFetch(node) => {
                        if let Some(node_inner) = &node.node_inner {
                            debug_assert!(
                                rate_limit.is_none(),
                                "one fragment should only have 1 rate limit node"
                            );
                            rate_limit = node_inner.rate_limit;
                            node_name = Some("FS_FETCH");
                        }
                    }
                    PbNodeBody::SourceBackfill(node) => {
                        debug_assert!(
                            rate_limit.is_none(),
                            "one fragment should only have 1 rate limit node"
                        );
                        rate_limit = node.rate_limit;
                        node_name = Some("SOURCE_BACKFILL");
                    }
                    PbNodeBody::StreamScan(node) => {
                        debug_assert!(
                            rate_limit.is_none(),
                            "one fragment should only have 1 rate limit node"
                        );
                        rate_limit = node.rate_limit;
                        node_name = Some("STREAM_SCAN");
                    }
                    PbNodeBody::StreamCdcScan(node) => {
                        debug_assert!(
                            rate_limit.is_none(),
                            "one fragment should only have 1 rate limit node"
                        );
                        rate_limit = node.rate_limit;
                        node_name = Some("STREAM_CDC_SCAN");
                    }
                    PbNodeBody::Sink(node) => {
                        debug_assert!(
                            rate_limit.is_none(),
                            "one fragment should only have 1 rate limit node"
                        );
                        rate_limit = node.rate_limit;
                        node_name = Some("SINK");
                    }
                    _ => {}
                }
            });

            if let Some(rate_limit) = rate_limit {
                rate_limits.push(RateLimitInfo {
                    fragment_id: fragment_id as u32,
                    job_id: job_id as u32,
                    fragment_type_mask: fragment_type_mask as u32,
                    rate_limit,
                    node_name: node_name.unwrap().to_owned(),
                });
            }
        }

        Ok(rate_limits)
    }
}

fn bitflag_intersects(column: SimpleExpr, value: i32) -> SimpleExpr {
    column
        .binary(BinOper::Custom("&"), value)
        .binary(BinOper::NotEqual, 0)
}

fn fragment_type_mask_intersects(value: i32) -> SimpleExpr {
    bitflag_intersects(fragment::Column::FragmentTypeMask.into_simple_expr(), value)
}

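/// Context describing how a "sink into table" relationship changes when a table
/// replacement is finished: the sink being created into the table, the sink being
/// dropped from it (if any), and the sinks whose `original_target_columns` must be
/// recorded from the table's columns before the replacement.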
pub struct SinkIntoTableContext {
    pub creating_sink_id: Option<SinkId>,
    pub dropping_sink_id: Option<SinkId>,
    pub updated_sink_catalogs: Vec<SinkId>,
}