1use std::collections::hash_map::Entry;
16use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
17
18use anyhow::{Context, anyhow};
19use futures::stream::BoxStream;
20use futures::{StreamExt, TryStreamExt};
21use itertools::Itertools;
22use risingwave_common::bail;
23use risingwave_common::bitmap::Bitmap;
24use risingwave_common::hash::{VnodeCount, VnodeCountCompat, WorkerSlotId};
25use risingwave_common::util::stream_graph_visitor::{visit_stream_node, visit_stream_node_mut};
26use risingwave_connector::source::SplitImpl;
27use risingwave_meta_model::actor::ActorStatus;
28use risingwave_meta_model::fragment::DistributionType;
29use risingwave_meta_model::object::ObjectType;
30use risingwave_meta_model::prelude::{
31 Actor, Fragment as FragmentModel, FragmentRelation, Sink, StreamingJob,
32};
33use risingwave_meta_model::{
34 ActorId, ConnectorSplits, DatabaseId, DispatcherType, ExprContext, FragmentId, I32Array,
35 JobStatus, ObjectId, SchemaId, SinkId, SourceId, StreamNode, StreamingParallelism, TableId,
36 VnodeBitmap, WorkerId, actor, database, fragment, fragment_relation, object, sink, source,
37 streaming_job, table,
38};
39use risingwave_meta_model_migration::{Alias, SelectStatement};
40use risingwave_pb::common::PbActorLocation;
41use risingwave_pb::meta::subscribe_response::{
42 Info as NotificationInfo, Operation as NotificationOperation,
43};
44use risingwave_pb::meta::table_fragments::actor_status::PbActorState;
45use risingwave_pb::meta::table_fragments::fragment::{
46 FragmentDistributionType, PbFragmentDistributionType,
47};
48use risingwave_pb::meta::table_fragments::{PbActorStatus, PbState};
49use risingwave_pb::meta::{FragmentWorkerSlotMapping, PbFragmentWorkerSlotMapping};
50use risingwave_pb::source::{ConnectorSplit, PbConnectorSplits};
51use risingwave_pb::stream_plan::stream_node::NodeBody;
52use risingwave_pb::stream_plan::{
53 PbDispatcherType, PbFragmentTypeFlag, PbStreamContext, PbStreamNode, PbStreamScanType,
54 StreamScanType,
55};
56use sea_orm::ActiveValue::Set;
57use sea_orm::sea_query::Expr;
58use sea_orm::{
59 ColumnTrait, DbErr, EntityTrait, FromQueryResult, JoinType, ModelTrait, PaginatorTrait,
60 QueryFilter, QuerySelect, RelationTrait, SelectGetableTuple, Selector, TransactionTrait, Value,
61};
62use serde::{Deserialize, Serialize};
63use tracing::debug;
64
65use crate::barrier::SnapshotBackfillInfo;
66use crate::controller::catalog::{CatalogController, CatalogControllerInner};
67use crate::controller::scale::resolve_streaming_job_definition;
68use crate::controller::utils::{
69 FragmentDesc, PartialActorLocation, PartialFragmentStateTables, get_fragment_actor_dispatchers,
70 get_fragment_mappings, rebuild_fragment_mapping_from_actors,
71 resolve_no_shuffle_actor_dispatcher,
72};
73use crate::manager::LocalNotification;
74use crate::model::{
75 DownstreamFragmentRelation, Fragment, FragmentActorDispatchers, FragmentDownstreamRelation,
76 StreamActor, StreamContext, StreamJobFragments, TableParallelism,
77};
78use crate::stream::{SplitAssignment, build_actor_split_impls};
79use crate::{MetaError, MetaResult, model};
80
/// Runtime placement info of a single in-flight actor.
#[derive(Clone, Debug)]
pub struct InflightActorInfo {
    /// Worker node the actor is scheduled on.
    pub worker_id: WorkerId,
    /// Vnodes owned by this actor; `None` when the actor row has no vnode
    /// bitmap (presumably singleton distribution — confirm with the schema).
    pub vnode_bitmap: Option<Bitmap>,
}
87
/// Aggregated runtime info of one in-flight fragment and the actors that
/// execute it (see `load_all_actors` for how it is populated).
#[derive(Clone, Debug)]
pub struct InflightFragmentInfo {
    pub fragment_id: crate::model::FragmentId,
    pub distribution_type: DistributionType,
    /// Decoded stream-node plan of the fragment.
    pub nodes: PbStreamNode,
    /// Actors of this fragment, keyed by actor id, with their placement.
    pub actors: HashMap<crate::model::ActorId, InflightActorInfo>,
    /// State tables owned by this fragment.
    pub state_table_ids: HashSet<risingwave_common::catalog::TableId>,
}
96
/// Parallelism-related stats of a fragment: distribution kind, number of
/// backing actors, and vnode count.
#[derive(Clone, Debug)]
pub struct FragmentParallelismInfo {
    pub distribution_type: FragmentDistributionType,
    /// Number of actors currently backing the fragment.
    pub actor_count: usize,
    pub vnode_count: usize,
}
103
/// Summary row for one streaming job, populated directly from a SQL query
/// (`FromQueryResult`) and serialized in camelCase for external consumers.
#[derive(Clone, Debug, FromQueryResult, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct StreamingJobInfo {
    pub job_id: ObjectId,
    /// Catalog object kind backing the job (table / source / sink / ...).
    pub obj_type: ObjectType,
    pub name: String,
    pub job_status: JobStatus,
    pub parallelism: StreamingParallelism,
    pub max_parallelism: i32,
    /// Effective resource group: the job-specific one, or the database's
    /// default when none is set (see `list_streaming_job_infos`).
    pub resource_group: String,
}
115
116impl CatalogControllerInner {
117 pub async fn all_running_fragment_mappings(
119 &self,
120 ) -> MetaResult<impl Iterator<Item = FragmentWorkerSlotMapping> + '_> {
121 let txn = self.db.begin().await?;
122
123 let job_ids: Vec<ObjectId> = StreamingJob::find()
124 .select_only()
125 .column(streaming_job::Column::JobId)
126 .filter(streaming_job::Column::JobStatus.eq(JobStatus::Created))
127 .into_tuple()
128 .all(&txn)
129 .await?;
130
131 let mut result = vec![];
132 for job_id in job_ids {
133 let mappings = get_fragment_mappings(&txn, job_id).await?;
134
135 result.extend(mappings.into_iter());
136 }
137
138 Ok(result.into_iter())
139 }
140}
141
142impl CatalogController {
143 pub(crate) async fn notify_fragment_mapping(
144 &self,
145 operation: NotificationOperation,
146 fragment_mappings: Vec<PbFragmentWorkerSlotMapping>,
147 ) {
148 let fragment_ids = fragment_mappings
149 .iter()
150 .map(|mapping| mapping.fragment_id)
151 .collect_vec();
152 for fragment_mapping in fragment_mappings {
154 self.env
155 .notification_manager()
156 .notify_frontend(
157 operation,
158 NotificationInfo::StreamingWorkerSlotMapping(fragment_mapping),
159 )
160 .await;
161 }
162
163 match operation {
165 NotificationOperation::Add | NotificationOperation::Update => {
166 self.env
167 .notification_manager()
168 .notify_local_subscribers(LocalNotification::FragmentMappingsUpsert(
169 fragment_ids,
170 ))
171 .await;
172 }
173 NotificationOperation::Delete => {
174 self.env
175 .notification_manager()
176 .notify_local_subscribers(LocalNotification::FragmentMappingsDelete(
177 fragment_ids,
178 ))
179 .await;
180 }
181 op => {
182 tracing::warn!("unexpected fragment mapping op: {}", op.as_str_name());
183 }
184 }
185 }
186
187 pub fn extract_fragment_and_actors_from_fragments(
188 stream_job_fragments: &StreamJobFragments,
189 ) -> MetaResult<Vec<(fragment::Model, Vec<actor::Model>)>> {
190 stream_job_fragments
191 .fragments
192 .values()
193 .map(|fragment| {
194 Self::extract_fragment_and_actors_for_new_job(
195 stream_job_fragments.stream_job_id.table_id as _,
196 fragment,
197 &stream_job_fragments.actor_status,
198 &stream_job_fragments.actor_splits,
199 )
200 })
201 .try_collect()
202 }
203
    /// Converts one in-memory [`Fragment`] (plus per-actor status and split
    /// metadata) into its database model form for a newly created job.
    ///
    /// Returns the fragment row together with one actor row per actor.
    /// Errors if any actor lacks an entry in `actor_status`; panics if the
    /// fragment has no actors or an invalid distribution type.
    fn extract_fragment_and_actors_for_new_job(
        job_id: ObjectId,
        fragment: &Fragment,
        actor_status: &BTreeMap<crate::model::ActorId, PbActorStatus>,
        actor_splits: &HashMap<crate::model::ActorId, Vec<SplitImpl>>,
    ) -> MetaResult<(fragment::Model, Vec<actor::Model>)> {
        let vnode_count = fragment.vnode_count();
        let Fragment {
            fragment_id: pb_fragment_id,
            fragment_type_mask: pb_fragment_type_mask,
            distribution_type: pb_distribution_type,
            actors: pb_actors,
            state_table_ids: pb_state_table_ids,
            nodes,
            ..
        } = fragment;

        let state_table_ids = pb_state_table_ids.clone().into();

        assert!(!pb_actors.is_empty());

        // Clear the deprecated per-actor upstream ids on merge nodes before
        // persisting the stream node.
        let stream_node = {
            let mut stream_node = nodes.clone();
            visit_stream_node_mut(&mut stream_node, |body| {
                #[expect(deprecated)]
                if let NodeBody::Merge(m) = body {
                    m.upstream_actor_id = vec![];
                }
            });

            stream_node
        };

        let mut actors = vec![];

        for actor in pb_actors {
            let StreamActor {
                actor_id,
                fragment_id,
                vnode_bitmap,
                mview_definition: _,
                expr_context: pb_expr_context,
                ..
            } = actor;

            // Serialize the actor's connector splits, if any were assigned.
            let splits = actor_splits.get(actor_id).map(|splits| {
                ConnectorSplits::from(&PbConnectorSplits {
                    splits: splits.iter().map(ConnectorSplit::from).collect(),
                })
            });
            // Every actor of a new job must have a status entry.
            let status = actor_status.get(actor_id).cloned().ok_or_else(|| {
                anyhow::anyhow!(
                    "actor {} in fragment {} has no actor_status",
                    actor_id,
                    fragment_id
                )
            })?;

            let worker_id = status.worker_id() as _;

            let pb_expr_context = pb_expr_context
                .as_ref()
                .expect("no expression context found");

            #[expect(deprecated)]
            actors.push(actor::Model {
                actor_id: *actor_id as _,
                fragment_id: *fragment_id as _,
                status: status.get_state().unwrap().into(),
                splits,
                worker_id,
                // Deprecated column, left empty for new jobs.
                upstream_actor_ids: Default::default(),
                vnode_bitmap: vnode_bitmap
                    .as_ref()
                    .map(|bitmap| VnodeBitmap::from(&bitmap.to_protobuf())),
                expr_context: ExprContext::from(pb_expr_context),
            });
        }

        let stream_node = StreamNode::from(&stream_node);

        let distribution_type = PbFragmentDistributionType::try_from(*pb_distribution_type)
            .unwrap()
            .into();

        #[expect(deprecated)]
        let fragment = fragment::Model {
            fragment_id: *pb_fragment_id as _,
            job_id,
            fragment_type_mask: *pb_fragment_type_mask as _,
            distribution_type,
            stream_node,
            state_table_ids,
            // Deprecated column, left empty for new jobs.
            upstream_fragment_id: Default::default(),
            vnode_count: vnode_count as _,
        };

        Ok((fragment, actors))
    }
303
304 pub fn compose_table_fragments(
305 table_id: u32,
306 state: PbState,
307 ctx: Option<PbStreamContext>,
308 fragments: Vec<(fragment::Model, Vec<actor::Model>)>,
309 parallelism: StreamingParallelism,
310 max_parallelism: usize,
311 job_definition: Option<String>,
312 ) -> MetaResult<StreamJobFragments> {
313 let mut pb_fragments = BTreeMap::new();
314 let mut pb_actor_splits = HashMap::new();
315 let mut pb_actor_status = BTreeMap::new();
316
317 for (fragment, actors) in fragments {
318 let (fragment, fragment_actor_status, fragment_actor_splits) =
319 Self::compose_fragment(fragment, actors, job_definition.clone())?;
320
321 pb_fragments.insert(fragment.fragment_id, fragment);
322
323 pb_actor_splits.extend(build_actor_split_impls(&fragment_actor_splits));
324 pb_actor_status.extend(fragment_actor_status.into_iter());
325 }
326
327 let table_fragments = StreamJobFragments {
328 stream_job_id: table_id.into(),
329 state: state as _,
330 fragments: pb_fragments,
331 actor_status: pb_actor_status,
332 actor_splits: pb_actor_splits,
333 ctx: ctx
334 .as_ref()
335 .map(StreamContext::from_protobuf)
336 .unwrap_or_default(),
337 assigned_parallelism: match parallelism {
338 StreamingParallelism::Custom => TableParallelism::Custom,
339 StreamingParallelism::Adaptive => TableParallelism::Adaptive,
340 StreamingParallelism::Fixed(n) => TableParallelism::Fixed(n as _),
341 },
342 max_parallelism,
343 };
344
345 Ok(table_fragments)
346 }
347
    /// Converts a fragment row and its actor rows back into the in-memory
    /// [`Fragment`] representation.
    ///
    /// Returns the fragment plus per-actor status and per-actor connector
    /// splits. Errors if an actor row references a fragment id other than
    /// the fragment row's own.
    #[allow(clippy::type_complexity)]
    pub(crate) fn compose_fragment(
        fragment: fragment::Model,
        actors: Vec<actor::Model>,
        job_definition: Option<String>,
    ) -> MetaResult<(
        Fragment,
        HashMap<crate::model::ActorId, PbActorStatus>,
        HashMap<crate::model::ActorId, PbConnectorSplits>,
    )> {
        let fragment::Model {
            fragment_id,
            fragment_type_mask,
            distribution_type,
            stream_node,
            state_table_ids,
            vnode_count,
            ..
        } = fragment;

        let stream_node = stream_node.to_protobuf();
        // Sanity check: each upstream fragment must appear in at most one
        // merge node of this fragment's plan.
        let mut upstream_fragments = HashSet::new();
        visit_stream_node(&stream_node, |body| {
            if let NodeBody::Merge(m) = body {
                assert!(
                    upstream_fragments.insert(m.upstream_fragment_id),
                    "non-duplicate upstream fragment"
                );
            }
        });

        let mut pb_actors = vec![];

        let mut pb_actor_status = HashMap::new();
        let mut pb_actor_splits = HashMap::new();

        for actor in actors {
            if actor.fragment_id != fragment_id {
                bail!(
                    "fragment id {} from actor {} is different from fragment {}",
                    actor.fragment_id,
                    actor.actor_id,
                    fragment_id
                )
            }

            let actor::Model {
                actor_id,
                fragment_id,
                status,
                worker_id,
                splits,
                vnode_bitmap,
                expr_context,
                ..
            } = actor;

            let vnode_bitmap =
                vnode_bitmap.map(|vnode_bitmap| Bitmap::from(vnode_bitmap.to_protobuf()));
            let pb_expr_context = Some(expr_context.to_protobuf());

            pb_actor_status.insert(
                actor_id as _,
                PbActorStatus {
                    location: PbActorLocation::from_worker(worker_id as u32),
                    state: PbActorState::from(status) as _,
                },
            );

            // Only actors with assigned splits get an entry.
            if let Some(splits) = splits {
                pb_actor_splits.insert(actor_id as _, splits.to_protobuf());
            }

            pb_actors.push(StreamActor {
                actor_id: actor_id as _,
                fragment_id: fragment_id as _,
                vnode_bitmap,
                // The job definition is shared by all actors of the job.
                mview_definition: job_definition.clone().unwrap_or("".to_owned()),
                expr_context: pb_expr_context,
            })
        }

        let pb_state_table_ids = state_table_ids.into_u32_array();
        let pb_distribution_type = PbFragmentDistributionType::from(distribution_type) as _;
        let pb_fragment = Fragment {
            fragment_id: fragment_id as _,
            fragment_type_mask: fragment_type_mask as _,
            distribution_type: pb_distribution_type,
            actors: pb_actors,
            state_table_ids: pb_state_table_ids,
            maybe_vnode_count: VnodeCount::set(vnode_count).to_protobuf(),
            nodes: stream_node,
        };

        Ok((pb_fragment, pb_actor_status, pb_actor_splits))
    }
444
    /// Computes per-fragment parallelism info (actor count, distribution
    /// type, vnode count) by joining an actor-count subquery against the
    /// fragment table, optionally restricted to `id_filter`.
    pub async fn running_fragment_parallelisms(
        &self,
        id_filter: Option<HashSet<FragmentId>>,
    ) -> MetaResult<HashMap<FragmentId, FragmentParallelismInfo>> {
        let inner = self.inner.read().await;

        let query_alias = Alias::new("fragment_actor_count");
        let count_alias = Alias::new("count");

        // Subquery: COUNT(actor_id) grouped by fragment id.
        let mut query = SelectStatement::new()
            .column(actor::Column::FragmentId)
            .expr_as(actor::Column::ActorId.count(), count_alias.clone())
            .from(Actor)
            .group_by_col(actor::Column::FragmentId)
            .to_owned();

        if let Some(id_filter) = id_filter {
            // NOTE(review): applied as HAVING on the grouped subquery rather
            // than WHERE; result is the same for a plain id filter.
            query.cond_having(actor::Column::FragmentId.is_in(id_filter));
        }

        // Outer query: join the per-fragment counts with fragment metadata.
        let outer = SelectStatement::new()
            .column((FragmentModel, fragment::Column::FragmentId))
            .column(count_alias)
            .column(fragment::Column::DistributionType)
            .column(fragment::Column::VnodeCount)
            .from_subquery(query.to_owned(), query_alias.clone())
            .inner_join(
                FragmentModel,
                Expr::col((query_alias, actor::Column::FragmentId))
                    .equals((FragmentModel, fragment::Column::FragmentId)),
            )
            .to_owned();

        let fragment_parallelisms: Vec<(FragmentId, i64, DistributionType, i32)> =
            Selector::<SelectGetableTuple<(FragmentId, i64, DistributionType, i32)>>::into_tuple(
                outer.to_owned(),
            )
            .all(&inner.db)
            .await?;

        Ok(fragment_parallelisms
            .into_iter()
            .map(|(fragment_id, count, distribution_type, vnode_count)| {
                (
                    fragment_id,
                    FragmentParallelismInfo {
                        distribution_type: distribution_type.into(),
                        actor_count: count as usize,
                        vnode_count: vnode_count as usize,
                    },
                )
            })
            .collect())
    }
499
500 pub async fn fragment_job_mapping(&self) -> MetaResult<HashMap<FragmentId, ObjectId>> {
501 let inner = self.inner.read().await;
502 let fragment_jobs: Vec<(FragmentId, ObjectId)> = FragmentModel::find()
503 .select_only()
504 .columns([fragment::Column::FragmentId, fragment::Column::JobId])
505 .into_tuple()
506 .all(&inner.db)
507 .await?;
508 Ok(fragment_jobs.into_iter().collect())
509 }
510
511 pub async fn get_fragment_job_id(
512 &self,
513 fragment_ids: Vec<FragmentId>,
514 ) -> MetaResult<Vec<ObjectId>> {
515 let inner = self.inner.read().await;
516
517 let object_ids: Vec<ObjectId> = FragmentModel::find()
518 .select_only()
519 .column(fragment::Column::JobId)
520 .filter(fragment::Column::FragmentId.is_in(fragment_ids))
521 .into_tuple()
522 .all(&inner.db)
523 .await?;
524
525 Ok(object_ids)
526 }
527
    /// Loads the descriptor of one fragment (including its actor count as
    /// `parallelism`) together with the ids of its upstream fragments.
    ///
    /// Returns `None` if the fragment does not exist.
    pub async fn get_fragment_desc_by_id(
        &self,
        fragment_id: FragmentId,
    ) -> MetaResult<Option<(FragmentDesc, Vec<FragmentId>)>> {
        let inner = self.inner.read().await;

        // Fragment columns plus COUNT(actor_id) aggregated as "parallelism".
        let fragment_opt = FragmentModel::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::JobId,
                fragment::Column::FragmentTypeMask,
                fragment::Column::DistributionType,
                fragment::Column::StateTableIds,
                fragment::Column::VnodeCount,
                fragment::Column::StreamNode,
            ])
            .column_as(Expr::col(actor::Column::ActorId).count(), "parallelism")
            .join(JoinType::LeftJoin, fragment::Relation::Actor.def())
            .filter(fragment::Column::FragmentId.eq(fragment_id))
            .group_by(fragment::Column::FragmentId)
            .into_model::<FragmentDesc>()
            .one(&inner.db)
            .await?;

        let Some(fragment) = fragment_opt else {
            return Ok(None);
        };

        // Upstreams are the sources of relations targeting this fragment.
        let upstreams: Vec<FragmentId> = FragmentRelation::find()
            .select_only()
            .column(fragment_relation::Column::SourceFragmentId)
            .filter(fragment_relation::Column::TargetFragmentId.eq(fragment_id))
            .into_tuple()
            .all(&inner.db)
            .await?;

        Ok(Some((fragment, upstreams)))
    }
569
570 pub async fn list_fragment_database_ids(
571 &self,
572 select_fragment_ids: Option<Vec<FragmentId>>,
573 ) -> MetaResult<Vec<(FragmentId, DatabaseId)>> {
574 let inner = self.inner.read().await;
575 let select = FragmentModel::find()
576 .select_only()
577 .column(fragment::Column::FragmentId)
578 .column(object::Column::DatabaseId)
579 .join(JoinType::InnerJoin, fragment::Relation::Object.def());
580 let select = if let Some(select_fragment_ids) = select_fragment_ids {
581 select.filter(fragment::Column::FragmentId.is_in(select_fragment_ids))
582 } else {
583 select
584 };
585 Ok(select.into_tuple().all(&inner.db).await?)
586 }
587
    /// Loads all fragments (with their actors) of one streaming job and
    /// composes them into a [`StreamJobFragments`].
    ///
    /// Errors if the job does not exist.
    pub async fn get_job_fragments_by_id(
        &self,
        job_id: ObjectId,
    ) -> MetaResult<StreamJobFragments> {
        let inner = self.inner.read().await;
        let fragment_actors = FragmentModel::find()
            .find_with_related(Actor)
            .filter(fragment::Column::JobId.eq(job_id))
            .all(&inner.db)
            .await?;

        let job_info = StreamingJob::find_by_id(job_id)
            .one(&inner.db)
            .await?
            .ok_or_else(|| anyhow::anyhow!("job {} not found in database", job_id))?;

        // Resolve the job's SQL definition for the actors' mview_definition.
        let job_definition = resolve_streaming_job_definition(&inner.db, &HashSet::from([job_id]))
            .await?
            .remove(&job_id);

        Self::compose_table_fragments(
            job_id as _,
            job_info.job_status.into(),
            job_info.timezone.map(|tz| PbStreamContext { timezone: tz }),
            fragment_actors,
            job_info.parallelism.clone(),
            job_info.max_parallelism as _,
            job_definition,
        )
    }
618
619 pub async fn get_fragment_actor_dispatchers(
620 &self,
621 fragment_ids: Vec<FragmentId>,
622 ) -> MetaResult<FragmentActorDispatchers> {
623 let inner = self.inner.read().await;
624 get_fragment_actor_dispatchers(&inner.db, fragment_ids).await
625 }
626
627 pub async fn get_fragment_downstream_relations(
628 &self,
629 fragment_ids: Vec<FragmentId>,
630 ) -> MetaResult<FragmentDownstreamRelation> {
631 let inner = self.inner.read().await;
632 let mut stream = FragmentRelation::find()
633 .filter(fragment_relation::Column::SourceFragmentId.is_in(fragment_ids))
634 .stream(&inner.db)
635 .await?;
636 let mut relations = FragmentDownstreamRelation::new();
637 while let Some(relation) = stream.try_next().await? {
638 relations
639 .entry(relation.source_fragment_id as _)
640 .or_default()
641 .push(DownstreamFragmentRelation {
642 downstream_fragment_id: relation.target_fragment_id as _,
643 dispatcher_type: relation.dispatcher_type,
644 dist_key_indices: relation.dist_key_indices.into_u32_array(),
645 output_indices: relation.output_indices.into_u32_array(),
646 });
647 }
648 Ok(relations)
649 }
650
    /// Scans the stream nodes of every fragment of `job_id` and reports, per
    /// fragment, the backfill `StreamScanType` found in its `StreamScan`
    /// nodes. Fragments without a specified stream scan are omitted.
    pub async fn get_job_fragment_backfill_scan_type(
        &self,
        job_id: ObjectId,
    ) -> MetaResult<HashMap<model::FragmentId, PbStreamScanType>> {
        let inner = self.inner.read().await;
        let fragments: Vec<_> = FragmentModel::find()
            .filter(fragment::Column::JobId.eq(job_id))
            .all(&inner.db)
            .await?;

        let mut result = HashMap::new();

        for fragment::Model {
            fragment_id,
            stream_node,
            ..
        } in fragments
        {
            let stream_node = stream_node.to_protobuf();
            visit_stream_node(&stream_node, |body| {
                if let NodeBody::StreamScan(node) = body {
                    match node.stream_scan_type() {
                        // Unspecified scan types are skipped entirely.
                        StreamScanType::Unspecified => {}
                        scan_type => {
                            result.insert(fragment_id as model::FragmentId, scan_type);
                        }
                    }
                }
            });
        }

        Ok(result)
    }
684
    /// Lists summary info for every streaming job, resolving the job's name
    /// from whichever of table/source/sink the object actually is, and its
    /// effective resource group (job-specific or database default).
    pub async fn list_streaming_job_infos(&self) -> MetaResult<Vec<StreamingJobInfo>> {
        let inner = self.inner.read().await;
        let job_states = StreamingJob::find()
            .select_only()
            .column(streaming_job::Column::JobId)
            .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
            .join(JoinType::InnerJoin, object::Relation::Database2.def())
            .column(object::Column::ObjType)
            // Left-join all three candidate catalogs; at most one matches.
            .join(JoinType::LeftJoin, table::Relation::Object1.def().rev())
            .join(JoinType::LeftJoin, source::Relation::Object.def().rev())
            .join(JoinType::LeftJoin, sink::Relation::Object.def().rev())
            // Effectively COALESCE(table.name, source.name, sink.name, '<unknown>').
            .column_as(
                Expr::if_null(
                    Expr::col((table::Entity, table::Column::Name)),
                    Expr::if_null(
                        Expr::col((source::Entity, source::Column::Name)),
                        Expr::if_null(
                            Expr::col((sink::Entity, sink::Column::Name)),
                            Expr::val("<unknown>"),
                        ),
                    ),
                ),
                "name",
            )
            .columns([
                streaming_job::Column::JobStatus,
                streaming_job::Column::Parallelism,
                streaming_job::Column::MaxParallelism,
            ])
            // Fall back to the database's resource group when the job has
            // no specific one.
            .column_as(
                Expr::if_null(
                    Expr::col((
                        streaming_job::Entity,
                        streaming_job::Column::SpecificResourceGroup,
                    )),
                    Expr::col((database::Entity, database::Column::ResourceGroup)),
                ),
                "resource_group",
            )
            .into_model()
            .all(&inner.db)
            .await?;
        Ok(job_states)
    }
729
730 pub async fn get_max_parallelism_by_id(&self, job_id: ObjectId) -> MetaResult<usize> {
731 let inner = self.inner.read().await;
732 let max_parallelism: i32 = StreamingJob::find_by_id(job_id)
733 .select_only()
734 .column(streaming_job::Column::MaxParallelism)
735 .into_tuple()
736 .one(&inner.db)
737 .await?
738 .ok_or_else(|| anyhow::anyhow!("job {} not found in database", job_id))?;
739 Ok(max_parallelism as usize)
740 }
741
742 pub async fn get_job_actor_mapping(
744 &self,
745 job_ids: Vec<ObjectId>,
746 ) -> MetaResult<HashMap<ObjectId, Vec<ActorId>>> {
747 let inner = self.inner.read().await;
748 let job_actors: Vec<(ObjectId, ActorId)> = Actor::find()
749 .select_only()
750 .column(fragment::Column::JobId)
751 .column(actor::Column::ActorId)
752 .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
753 .filter(fragment::Column::JobId.is_in(job_ids))
754 .into_tuple()
755 .all(&inner.db)
756 .await?;
757 Ok(job_actors.into_iter().into_group_map())
758 }
759
760 pub async fn get_job_internal_table_ids(&self) -> Option<Vec<(ObjectId, Vec<TableId>)>> {
762 if let Ok(inner) = self.inner.try_read() {
763 if let Ok(job_state_tables) = FragmentModel::find()
764 .select_only()
765 .columns([fragment::Column::JobId, fragment::Column::StateTableIds])
766 .into_tuple::<(ObjectId, I32Array)>()
767 .all(&inner.db)
768 .await
769 {
770 let mut job_internal_table_ids = HashMap::new();
771 for (job_id, state_table_ids) in job_state_tables {
772 job_internal_table_ids
773 .entry(job_id)
774 .or_insert_with(Vec::new)
775 .extend(state_table_ids.into_inner());
776 }
777 return Some(job_internal_table_ids.into_iter().collect());
778 }
779 }
780 None
781 }
782
783 pub async fn has_any_running_jobs(&self) -> MetaResult<bool> {
784 let inner = self.inner.read().await;
785 let count = FragmentModel::find().count(&inner.db).await?;
786 Ok(count > 0)
787 }
788
789 pub async fn worker_actor_count(&self) -> MetaResult<HashMap<WorkerId, usize>> {
790 let inner = self.inner.read().await;
791 let actor_cnt: Vec<(WorkerId, i64)> = Actor::find()
792 .select_only()
793 .column(actor::Column::WorkerId)
794 .column_as(actor::Column::ActorId.count(), "count")
795 .group_by(actor::Column::WorkerId)
796 .into_tuple()
797 .all(&inner.db)
798 .await?;
799
800 Ok(actor_cnt
801 .into_iter()
802 .map(|(worker_id, count)| (worker_id, count as usize))
803 .collect())
804 }
805
    /// Loads every streaming job together with all of its fragments and
    /// actors, keyed by job id.
    pub async fn table_fragments(&self) -> MetaResult<BTreeMap<ObjectId, StreamJobFragments>> {
        let inner = self.inner.read().await;
        let jobs = StreamingJob::find().all(&inner.db).await?;

        // Resolve all job definitions up front; entries are consumed per
        // job in the loop below.
        let mut job_definition = resolve_streaming_job_definition(
            &inner.db,
            &HashSet::from_iter(jobs.iter().map(|job| job.job_id)),
        )
        .await?;

        let mut table_fragments = BTreeMap::new();
        for job in jobs {
            let fragment_actors = FragmentModel::find()
                .find_with_related(Actor)
                .filter(fragment::Column::JobId.eq(job.job_id))
                .all(&inner.db)
                .await?;

            table_fragments.insert(
                job.job_id as ObjectId,
                Self::compose_table_fragments(
                    job.job_id as _,
                    job.job_status.into(),
                    job.timezone.map(|tz| PbStreamContext { timezone: tz }),
                    fragment_actors,
                    job.parallelism.clone(),
                    job.max_parallelism as _,
                    job_definition.remove(&job.job_id),
                )?,
            );
        }

        Ok(table_fragments)
    }
841
    /// For each given fragment, collects the set of fragments directly
    /// upstream of it (sources of relations that target it).
    pub async fn upstream_fragments(
        &self,
        fragment_ids: impl Iterator<Item = crate::model::FragmentId>,
    ) -> MetaResult<HashMap<crate::model::FragmentId, HashSet<crate::model::FragmentId>>> {
        let inner = self.inner.read().await;
        let mut stream = FragmentRelation::find()
            .select_only()
            .columns([
                fragment_relation::Column::SourceFragmentId,
                fragment_relation::Column::TargetFragmentId,
            ])
            .filter(
                fragment_relation::Column::TargetFragmentId
                    .is_in(fragment_ids.map(|id| id as FragmentId)),
            )
            .into_tuple::<(FragmentId, FragmentId)>()
            .stream(&inner.db)
            .await?;
        let mut upstream_fragments: HashMap<_, HashSet<_>> = HashMap::new();
        // Invert the (source, target) rows: group sources by target.
        while let Some((upstream_fragment_id, downstream_fragment_id)) = stream.try_next().await? {
            upstream_fragments
                .entry(downstream_fragment_id as crate::model::FragmentId)
                .or_default()
                .insert(upstream_fragment_id as crate::model::FragmentId);
        }
        Ok(upstream_fragments)
    }
869
870 pub async fn list_actor_locations(&self) -> MetaResult<Vec<PartialActorLocation>> {
871 let inner = self.inner.read().await;
872 let actor_locations: Vec<PartialActorLocation> =
873 Actor::find().into_partial_model().all(&inner.db).await?;
874 Ok(actor_locations)
875 }
876
    /// Lists every actor with its fragment, owning job, schema and object
    /// type, resolved through `actor -> fragment -> object` joins.
    pub async fn list_actor_info(
        &self,
    ) -> MetaResult<Vec<(ActorId, FragmentId, ObjectId, SchemaId, ObjectType)>> {
        let inner = self.inner.read().await;
        let actor_locations: Vec<(ActorId, FragmentId, ObjectId, SchemaId, ObjectType)> =
            Actor::find()
                .join(JoinType::LeftJoin, actor::Relation::Fragment.def())
                .join(JoinType::LeftJoin, fragment::Relation::Object.def())
                .select_only()
                .columns([actor::Column::ActorId, actor::Column::FragmentId])
                // The object's oid is the owning job's id here.
                .column_as(object::Column::Oid, "job_id")
                .column_as(object::Column::SchemaId, "schema_id")
                .column_as(object::Column::ObjType, "type")
                .into_tuple()
                .all(&inner.db)
                .await?;
        Ok(actor_locations)
    }
895
896 pub async fn list_source_actors(&self) -> MetaResult<Vec<(ActorId, FragmentId)>> {
897 let inner = self.inner.read().await;
898
899 let source_actors: Vec<(ActorId, FragmentId)> = Actor::find()
900 .select_only()
901 .filter(actor::Column::Splits.is_not_null())
902 .columns([actor::Column::ActorId, actor::Column::FragmentId])
903 .into_tuple()
904 .all(&inner.db)
905 .await?;
906
907 Ok(source_actors)
908 }
909
    /// Lists the descriptor of every fragment (with its actor count as
    /// `parallelism`), each paired with its upstream fragment ids.
    ///
    /// NOTE(review): issues one extra upstream query per fragment — O(n)
    /// round trips; presumably acceptable for a listing/admin path.
    pub async fn list_fragment_descs(&self) -> MetaResult<Vec<(FragmentDesc, Vec<FragmentId>)>> {
        let inner = self.inner.read().await;
        let mut result = Vec::new();
        let fragments = FragmentModel::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::JobId,
                fragment::Column::FragmentTypeMask,
                fragment::Column::DistributionType,
                fragment::Column::StateTableIds,
                fragment::Column::VnodeCount,
                fragment::Column::StreamNode,
            ])
            .column_as(Expr::col(actor::Column::ActorId).count(), "parallelism")
            .join(JoinType::LeftJoin, fragment::Relation::Actor.def())
            .group_by(fragment::Column::FragmentId)
            .into_model::<FragmentDesc>()
            .all(&inner.db)
            .await?;
        for fragment in fragments {
            // Upstreams: sources of relations targeting this fragment.
            let upstreams: Vec<FragmentId> = FragmentRelation::find()
                .select_only()
                .column(fragment_relation::Column::SourceFragmentId)
                .filter(fragment_relation::Column::TargetFragmentId.eq(fragment.fragment_id))
                .into_tuple()
                .all(&inner.db)
                .await?;
            result.push((fragment, upstreams));
        }
        Ok(result)
    }
942
    /// Maps each sink id to its name and the actors of its sink fragments.
    pub async fn list_sink_actor_mapping(
        &self,
    ) -> MetaResult<HashMap<SinkId, (String, Vec<ActorId>)>> {
        let inner = self.inner.read().await;
        // All sinks with their names.
        let sink_id_names: Vec<(SinkId, String)> = Sink::find()
            .select_only()
            .columns([sink::Column::SinkId, sink::Column::Name])
            .into_tuple()
            .all(&inner.db)
            .await?;
        let (sink_ids, _): (Vec<_>, Vec<_>) = sink_id_names.iter().cloned().unzip();
        let sink_name_mapping: HashMap<SinkId, String> = sink_id_names.into_iter().collect();

        // Actors of all fragments belonging to the sink jobs, with the
        // fragment type mask used to filter for actual sink fragments below.
        let actor_with_type: Vec<(ActorId, SinkId, i32)> = Actor::find()
            .select_only()
            .column(actor::Column::ActorId)
            .columns([fragment::Column::JobId, fragment::Column::FragmentTypeMask])
            .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
            .filter(fragment::Column::JobId.is_in(sink_ids))
            .into_tuple()
            .all(&inner.db)
            .await?;

        let mut sink_actor_mapping = HashMap::new();
        for (actor_id, sink_id, type_mask) in actor_with_type {
            // Keep only actors whose fragment carries the Sink type flag.
            if type_mask & PbFragmentTypeFlag::Sink as i32 != 0 {
                sink_actor_mapping
                    .entry(sink_id)
                    .or_insert_with(|| (sink_name_mapping.get(&sink_id).unwrap().clone(), vec![]))
                    .1
                    .push(actor_id);
            }
        }

        Ok(sink_actor_mapping)
    }
979
980 pub async fn list_fragment_state_tables(&self) -> MetaResult<Vec<PartialFragmentStateTables>> {
981 let inner = self.inner.read().await;
982 let fragment_state_tables: Vec<PartialFragmentStateTables> = FragmentModel::find()
983 .select_only()
984 .columns([
985 fragment::Column::FragmentId,
986 fragment::Column::JobId,
987 fragment::Column::StateTableIds,
988 ])
989 .into_partial_model()
990 .all(&inner.db)
991 .await?;
992 Ok(fragment_state_tables)
993 }
994
    /// Loads every `Running` actor (optionally restricted to one database)
    /// and groups them into per-database, per-job, per-fragment
    /// [`InflightFragmentInfo`]s.
    ///
    /// Panics if two actors of the same fragment disagree on its state
    /// tables, or if an actor id appears twice within a fragment.
    pub async fn load_all_actors(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<HashMap<DatabaseId, HashMap<TableId, HashMap<FragmentId, InflightFragmentInfo>>>>
    {
        let inner = self.inner.read().await;
        // Only running actors; optionally scoped to a single database.
        let filter_condition = actor::Column::Status.eq(ActorStatus::Running);
        let filter_condition = if let Some(database_id) = database_id {
            filter_condition.and(object::Column::DatabaseId.eq(database_id))
        } else {
            filter_condition
        };
        #[expect(clippy::type_complexity)]
        let mut actor_info_stream: BoxStream<
            '_,
            Result<
                (
                    ActorId,
                    WorkerId,
                    Option<VnodeBitmap>,
                    FragmentId,
                    StreamNode,
                    I32Array,
                    DistributionType,
                    DatabaseId,
                    ObjectId,
                ),
                _,
            >,
        > = Actor::find()
            .select_only()
            .column(actor::Column::ActorId)
            .column(actor::Column::WorkerId)
            .column(actor::Column::VnodeBitmap)
            .column(fragment::Column::FragmentId)
            .column(fragment::Column::StreamNode)
            .column(fragment::Column::StateTableIds)
            .column(fragment::Column::DistributionType)
            .column(object::Column::DatabaseId)
            .column(object::Column::Oid)
            .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
            .join(JoinType::InnerJoin, fragment::Relation::Object.def())
            .filter(filter_condition)
            .into_tuple()
            .stream(&inner.db)
            .await?;

        let mut database_fragment_infos: HashMap<_, HashMap<_, HashMap<_, InflightFragmentInfo>>> =
            HashMap::new();

        while let Some((
            actor_id,
            worker_id,
            vnode_bitmap,
            fragment_id,
            node,
            state_table_ids,
            distribution_type,
            database_id,
            job_id,
        )) = actor_info_stream.try_next().await?
        {
            // Bucket: database -> job -> fragment.
            let fragment_infos = database_fragment_infos
                .entry(database_id)
                .or_default()
                .entry(job_id)
                .or_default();
            let state_table_ids = state_table_ids.into_inner();
            let state_table_ids = state_table_ids
                .into_iter()
                .map(|table_id| risingwave_common::catalog::TableId::new(table_id as _))
                .collect();
            let actor_info = InflightActorInfo {
                worker_id,
                vnode_bitmap: vnode_bitmap.map(|bitmap| bitmap.to_protobuf().into()),
            };
            match fragment_infos.entry(fragment_id) {
                Entry::Occupied(mut entry) => {
                    // Fragment already seen: all its actors must agree on
                    // state tables, and each actor appears exactly once.
                    let info: &mut InflightFragmentInfo = entry.get_mut();
                    assert_eq!(info.state_table_ids, state_table_ids);
                    assert!(info.actors.insert(actor_id as _, actor_info).is_none());
                }
                Entry::Vacant(entry) => {
                    entry.insert(InflightFragmentInfo {
                        fragment_id: fragment_id as _,
                        distribution_type,
                        nodes: node.to_protobuf(),
                        actors: HashMap::from_iter([(actor_id as _, actor_info)]),
                        state_table_ids,
                    });
                }
            }
        }

        debug!(?database_fragment_infos, "reload all actors");

        Ok(database_fragment_infos)
    }
1095
1096 pub async fn migrate_actors(
1097 &self,
1098 plan: HashMap<WorkerSlotId, WorkerSlotId>,
1099 ) -> MetaResult<()> {
1100 let inner = self.inner.read().await;
1101 let txn = inner.db.begin().await?;
1102
1103 let actors: Vec<(
1104 FragmentId,
1105 DistributionType,
1106 ActorId,
1107 Option<VnodeBitmap>,
1108 WorkerId,
1109 ActorStatus,
1110 )> = Actor::find()
1111 .select_only()
1112 .columns([
1113 fragment::Column::FragmentId,
1114 fragment::Column::DistributionType,
1115 ])
1116 .columns([
1117 actor::Column::ActorId,
1118 actor::Column::VnodeBitmap,
1119 actor::Column::WorkerId,
1120 actor::Column::Status,
1121 ])
1122 .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
1123 .into_tuple()
1124 .all(&txn)
1125 .await?;
1126
1127 let mut actor_locations = HashMap::new();
1128
1129 for (fragment_id, _, actor_id, _, worker_id, status) in &actors {
1130 if *status != ActorStatus::Running {
1131 tracing::warn!(
1132 "skipping actor {} in fragment {} with status {:?}",
1133 actor_id,
1134 fragment_id,
1135 status
1136 );
1137 continue;
1138 }
1139
1140 actor_locations
1141 .entry(*worker_id)
1142 .or_insert(HashMap::new())
1143 .entry(*fragment_id)
1144 .or_insert(BTreeSet::new())
1145 .insert(*actor_id);
1146 }
1147
1148 let expired_or_changed_workers: HashSet<_> =
1149 plan.keys().map(|k| k.worker_id() as WorkerId).collect();
1150
1151 let mut actor_migration_plan = HashMap::new();
1152 for (worker, fragment) in actor_locations {
1153 if expired_or_changed_workers.contains(&worker) {
1154 for (fragment_id, actors) in fragment {
1155 debug!(
1156 "worker {} expired or changed, migrating fragment {}",
1157 worker, fragment_id
1158 );
1159 let worker_slot_to_actor: HashMap<_, _> = actors
1160 .iter()
1161 .enumerate()
1162 .map(|(idx, actor_id)| {
1163 (WorkerSlotId::new(worker as _, idx as _), *actor_id)
1164 })
1165 .collect();
1166
1167 for (worker_slot, actor) in worker_slot_to_actor {
1168 if let Some(target) = plan.get(&worker_slot) {
1169 actor_migration_plan.insert(actor, target.worker_id() as WorkerId);
1170 }
1171 }
1172 }
1173 }
1174 }
1175
1176 for (actor, worker) in actor_migration_plan {
1177 Actor::update_many()
1178 .col_expr(
1179 actor::Column::WorkerId,
1180 Expr::value(Value::Int(Some(worker))),
1181 )
1182 .filter(actor::Column::ActorId.eq(actor))
1183 .exec(&txn)
1184 .await?;
1185 }
1186
1187 txn.commit().await?;
1188
1189 self.notify_fragment_mapping(
1190 NotificationOperation::Update,
1191 rebuild_fragment_mapping_from_actors(actors),
1192 )
1193 .await;
1194
1195 Ok(())
1196 }
1197
1198 pub async fn all_inuse_worker_slots(&self) -> MetaResult<HashSet<WorkerSlotId>> {
1199 let inner = self.inner.read().await;
1200
1201 let actors: Vec<(FragmentId, ActorId, WorkerId)> = Actor::find()
1202 .select_only()
1203 .columns([fragment::Column::FragmentId])
1204 .columns([actor::Column::ActorId, actor::Column::WorkerId])
1205 .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
1206 .into_tuple()
1207 .all(&inner.db)
1208 .await?;
1209
1210 let mut actor_locations = HashMap::new();
1211
1212 for (fragment_id, _, worker_id) in actors {
1213 *actor_locations
1214 .entry(worker_id)
1215 .or_insert(HashMap::new())
1216 .entry(fragment_id)
1217 .or_insert(0_usize) += 1;
1218 }
1219
1220 let mut result = HashSet::new();
1221 for (worker_id, mapping) in actor_locations {
1222 let max_fragment_len = mapping.values().max().unwrap();
1223
1224 result
1225 .extend((0..*max_fragment_len).map(|idx| WorkerSlotId::new(worker_id as u32, idx)))
1226 }
1227
1228 Ok(result)
1229 }
1230
1231 pub async fn all_node_actors(
1232 &self,
1233 include_inactive: bool,
1234 ) -> MetaResult<HashMap<WorkerId, Vec<StreamActor>>> {
1235 let inner = self.inner.read().await;
1236 let fragment_actors = if include_inactive {
1237 FragmentModel::find()
1238 .find_with_related(Actor)
1239 .all(&inner.db)
1240 .await?
1241 } else {
1242 FragmentModel::find()
1243 .find_with_related(Actor)
1244 .filter(actor::Column::Status.eq(ActorStatus::Running))
1245 .all(&inner.db)
1246 .await?
1247 };
1248
1249 let job_definitions = resolve_streaming_job_definition(
1250 &inner.db,
1251 &HashSet::from_iter(fragment_actors.iter().map(|(fragment, _)| fragment.job_id)),
1252 )
1253 .await?;
1254
1255 let mut node_actors = HashMap::new();
1256 for (fragment, actors) in fragment_actors {
1257 let job_id = fragment.job_id;
1258 let (table_fragments, actor_status, _) = Self::compose_fragment(
1259 fragment,
1260 actors,
1261 job_definitions.get(&(job_id as _)).cloned(),
1262 )?;
1263 for actor in table_fragments.actors {
1264 let node_id = actor_status[&actor.actor_id].worker_id() as WorkerId;
1265 node_actors
1266 .entry(node_id)
1267 .or_insert_with(Vec::new)
1268 .push(actor);
1269 }
1270 }
1271
1272 Ok(node_actors)
1273 }
1274
1275 pub async fn get_worker_actor_ids(
1276 &self,
1277 job_ids: Vec<ObjectId>,
1278 ) -> MetaResult<BTreeMap<WorkerId, Vec<ActorId>>> {
1279 let inner = self.inner.read().await;
1280 let actor_workers: Vec<(ActorId, WorkerId)> = Actor::find()
1281 .select_only()
1282 .columns([actor::Column::ActorId, actor::Column::WorkerId])
1283 .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
1284 .filter(fragment::Column::JobId.is_in(job_ids))
1285 .into_tuple()
1286 .all(&inner.db)
1287 .await?;
1288
1289 let mut worker_actors = BTreeMap::new();
1290 for (actor_id, worker_id) in actor_workers {
1291 worker_actors
1292 .entry(worker_id)
1293 .or_insert_with(Vec::new)
1294 .push(actor_id);
1295 }
1296
1297 Ok(worker_actors)
1298 }
1299
    /// Persists the given source split assignment into each actor's `splits`
    /// column, all within a single transaction.
    ///
    /// # Errors
    /// Returns a `catalog_id_not_found` error if a referenced actor does not
    /// exist (surfaced by SeaORM as `DbErr::RecordNotUpdated`).
    pub async fn update_actor_splits(&self, split_assignment: &SplitAssignment) -> MetaResult<()> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;
        for assignments in split_assignment.values() {
            for (actor_id, splits) in assignments {
                let actor_splits = splits.iter().map(Into::into).collect_vec();
                Actor::update(actor::ActiveModel {
                    actor_id: Set(*actor_id as _),
                    splits: Set(Some(ConnectorSplits::from(&PbConnectorSplits {
                        splits: actor_splits,
                    }))),
                    ..Default::default()
                })
                .exec(&txn)
                .await
                .map_err(|err| {
                    // `update` on a missing row reports `RecordNotUpdated`;
                    // translate that into a catalog-not-found error.
                    if err == DbErr::RecordNotUpdated {
                        MetaError::catalog_id_not_found("actor_id", actor_id)
                    } else {
                        err.into()
                    }
                })?;
            }
        }
        txn.commit().await?;

        Ok(())
    }
1328
    /// Rewrites the stream node of each given fragment to fill in snapshot
    /// backfill epochs, persisting the updated plan only when it changed.
    ///
    /// `snapshot_backfill_info` covers same-database backfills (if any);
    /// `cross_db_snapshot_backfill_info` covers cross-database ones.
    pub async fn fill_snapshot_backfill_epoch(
        &self,
        fragment_ids: impl Iterator<Item = FragmentId>,
        snapshot_backfill_info: Option<&SnapshotBackfillInfo>,
        cross_db_snapshot_backfill_info: &SnapshotBackfillInfo,
    ) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;
        for fragment_id in fragment_ids {
            let fragment = FragmentModel::find_by_id(fragment_id)
                .one(&txn)
                .await?
                .context(format!("fragment {} not found", fragment_id))?;
            let mut node = fragment.stream_node.to_protobuf();
            // The helper returns true iff `node` was actually modified; only
            // then is the fragment written back.
            if crate::stream::fill_snapshot_backfill_epoch(
                &mut node,
                snapshot_backfill_info,
                cross_db_snapshot_backfill_info,
            )? {
                let node = StreamNode::from(&node);
                FragmentModel::update(fragment::ActiveModel {
                    fragment_id: Set(fragment_id),
                    stream_node: Set(node),
                    ..Default::default()
                })
                .exec(&txn)
                .await?;
            }
        }
        txn.commit().await?;
        Ok(())
    }
1361
1362 pub async fn get_running_actors_of_fragment(
1364 &self,
1365 fragment_id: FragmentId,
1366 ) -> MetaResult<Vec<ActorId>> {
1367 let inner = self.inner.read().await;
1368 let actors: Vec<ActorId> = Actor::find()
1369 .select_only()
1370 .column(actor::Column::ActorId)
1371 .filter(actor::Column::FragmentId.eq(fragment_id))
1372 .filter(actor::Column::Status.eq(ActorStatus::Running))
1373 .into_tuple()
1374 .all(&inner.db)
1375 .await?;
1376 Ok(actors)
1377 }
1378
    /// Returns `(source_backfill_actor_id, source_actor_id)` pairs describing
    /// which upstream source actor feeds each source-backfill actor.
    ///
    /// The two fragments must be connected by a `NoShuffle` dispatcher;
    /// otherwise an error is returned.
    ///
    /// NOTE(review): despite the name, the actor queries below do not filter
    /// on `Running` status — confirm whether that is intended.
    pub async fn get_running_actors_for_source_backfill(
        &self,
        source_backfill_fragment_id: FragmentId,
        source_fragment_id: FragmentId,
    ) -> MetaResult<Vec<(ActorId, ActorId)>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        // Dispatcher type of the edge: source fragment -> backfill fragment.
        let fragment_relation: DispatcherType = FragmentRelation::find()
            .select_only()
            .column(fragment_relation::Column::DispatcherType)
            .filter(fragment_relation::Column::SourceFragmentId.eq(source_fragment_id))
            .filter(fragment_relation::Column::TargetFragmentId.eq(source_backfill_fragment_id))
            .into_tuple()
            .one(&txn)
            .await?
            .ok_or_else(|| {
                anyhow!(
                    "no fragment connection from source fragment {} to source backfill fragment {}",
                    source_fragment_id,
                    source_backfill_fragment_id
                )
            })?;

        if fragment_relation != DispatcherType::NoShuffle {
            return Err(anyhow!("expect NoShuffle but get {:?}", fragment_relation).into());
        }

        // Fetches the distribution type of a single fragment.
        let load_fragment_distribution_type = |txn, fragment_id: FragmentId| async move {
            let result: MetaResult<DistributionType> = try {
                FragmentModel::find_by_id(fragment_id)
                    .select_only()
                    .column(fragment::Column::DistributionType)
                    .into_tuple()
                    .one(txn)
                    .await?
                    .ok_or_else(|| anyhow!("failed to find fragment: {}", fragment_id))?
            };
            result
        };

        let source_backfill_distribution_type =
            load_fragment_distribution_type(&txn, source_backfill_fragment_id).await?;
        let source_distribution_type =
            load_fragment_distribution_type(&txn, source_fragment_id).await?;

        // Fetches actor id -> optional vnode bitmap for all actors of a fragment.
        let load_fragment_actor_distribution = |txn, fragment_id: FragmentId| async move {
            Actor::find()
                .select_only()
                .column(actor::Column::ActorId)
                .column(actor::Column::VnodeBitmap)
                .filter(actor::Column::FragmentId.eq(fragment_id))
                .into_tuple()
                .stream(txn)
                .await?
                .map(|result| {
                    result.map(|(actor_id, vnode): (ActorId, Option<VnodeBitmap>)| {
                        (
                            actor_id as _,
                            vnode.map(|bitmap| Bitmap::from(bitmap.to_protobuf())),
                        )
                    })
                })
                .try_collect()
                .await
        };

        let source_backfill_actors: HashMap<crate::model::ActorId, Option<Bitmap>> =
            load_fragment_actor_distribution(&txn, source_backfill_fragment_id).await?;

        let source_actors = load_fragment_actor_distribution(&txn, source_fragment_id).await?;

        // Pair up the actors of the two NoShuffle-connected fragments, then
        // flip each pair to (backfill actor, source actor).
        Ok(resolve_no_shuffle_actor_dispatcher(
            source_distribution_type,
            &source_actors,
            source_backfill_distribution_type,
            &source_backfill_actors,
        )
        .into_iter()
        .map(|(source_actor, source_backfill_actor)| {
            (source_backfill_actor as _, source_actor as _)
        })
        .collect())
    }
1465
1466 pub async fn get_actors_by_job_ids(&self, job_ids: Vec<ObjectId>) -> MetaResult<Vec<ActorId>> {
1467 let inner = self.inner.read().await;
1468 let actors: Vec<ActorId> = Actor::find()
1469 .select_only()
1470 .column(actor::Column::ActorId)
1471 .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
1472 .filter(fragment::Column::JobId.is_in(job_ids))
1473 .into_tuple()
1474 .all(&inner.db)
1475 .await?;
1476 Ok(actors)
1477 }
1478
    /// Gets the root (top-most) fragment of each given streaming job.
    ///
    /// An `MView`/`Sink` fragment wins as root when present (`insert`
    /// overwrites any previously recorded `Source` fragment), otherwise a
    /// `Source` fragment is used (`try_insert` never overwrites).
    ///
    /// Also returns `(actor_id, worker_id)` pairs. NOTE(review): this final
    /// actor query is not filtered by `job_ids`, so it covers *all* actors in
    /// the cluster — confirm callers rely on that.
    pub async fn get_root_fragments(
        &self,
        job_ids: Vec<ObjectId>,
    ) -> MetaResult<(HashMap<ObjectId, Fragment>, Vec<(ActorId, WorkerId)>)> {
        let inner = self.inner.read().await;

        let job_definitions = resolve_streaming_job_definition(
            &inner.db,
            &HashSet::from_iter(job_ids.iter().copied()),
        )
        .await?;

        let all_fragments = FragmentModel::find()
            .filter(fragment::Column::JobId.is_in(job_ids))
            .all(&inner.db)
            .await?;
        // job id -> its root fragment model.
        let mut root_fragments = HashMap::<ObjectId, fragment::Model>::new();
        for fragment in all_fragments {
            if (fragment.fragment_type_mask & PbFragmentTypeFlag::Mview as i32) != 0
                || (fragment.fragment_type_mask & PbFragmentTypeFlag::Sink as i32) != 0
            {
                _ = root_fragments.insert(fragment.job_id, fragment);
            } else if fragment.fragment_type_mask & PbFragmentTypeFlag::Source as i32 != 0 {
                _ = root_fragments.try_insert(fragment.job_id, fragment);
            }
        }

        // Compose each root fragment (with its actors) into protobuf form.
        let mut root_fragments_pb = HashMap::new();
        for (_, fragment) in root_fragments {
            let actors = fragment.find_related(Actor).all(&inner.db).await?;

            let job_id = fragment.job_id;
            root_fragments_pb.insert(
                fragment.job_id,
                Self::compose_fragment(
                    fragment,
                    actors,
                    job_definitions.get(&(job_id as _)).cloned(),
                )?
                .0,
            );
        }

        let actors: Vec<(ActorId, WorkerId)> = Actor::find()
            .select_only()
            .columns([actor::Column::ActorId, actor::Column::WorkerId])
            .into_tuple()
            .all(&inner.db)
            .await?;

        Ok((root_fragments_pb, actors))
    }
1546
1547 pub async fn get_root_fragment(
1548 &self,
1549 job_id: ObjectId,
1550 ) -> MetaResult<(Fragment, Vec<(ActorId, WorkerId)>)> {
1551 let (mut root_fragments, actors) = self.get_root_fragments(vec![job_id]).await?;
1552 let root_fragment = root_fragments
1553 .remove(&job_id)
1554 .context(format!("root fragment for job {} not found", job_id))?;
1555 Ok((root_fragment, actors))
1556 }
1557
    /// Returns the fragments directly downstream of the given job's root
    /// fragment, each paired with the dispatcher type connecting them, plus
    /// the `(actor_id, worker_id)` pairs from [`Self::get_root_fragment`].
    pub async fn get_downstream_fragments(
        &self,
        job_id: ObjectId,
    ) -> MetaResult<(Vec<(PbDispatcherType, Fragment)>, Vec<(ActorId, WorkerId)>)> {
        let (root_fragment, actors) = self.get_root_fragment(job_id).await?;

        let inner = self.inner.read().await;
        // Edges whose source is the root fragment.
        let downstream_fragment_relations: Vec<fragment_relation::Model> = FragmentRelation::find()
            .filter(
                fragment_relation::Column::SourceFragmentId
                    .eq(root_fragment.fragment_id as FragmentId),
            )
            .all(&inner.db)
            .await?;
        let job_definition = resolve_streaming_job_definition(&inner.db, &HashSet::from([job_id]))
            .await?
            .remove(&job_id);

        let mut downstream_fragments = vec![];
        for fragment_relation::Model {
            target_fragment_id: fragment_id,
            dispatcher_type,
            ..
        } in downstream_fragment_relations
        {
            let mut fragment_actors = FragmentModel::find_by_id(fragment_id)
                .find_with_related(Actor)
                .all(&inner.db)
                .await?;
            if fragment_actors.is_empty() {
                bail!("No fragment found for fragment id {}", fragment_id);
            }
            // `find_by_id` yields at most one fragment (with all its actors).
            assert_eq!(fragment_actors.len(), 1);
            let (fragment, actors) = fragment_actors.pop().unwrap();
            let dispatch_type = PbDispatcherType::from(dispatcher_type);
            let fragment = Self::compose_fragment(fragment, actors, job_definition.clone())?.0;
            downstream_fragments.push((dispatch_type, fragment));
        }

        Ok((downstream_fragments, actors))
    }
1600
1601 pub async fn load_source_fragment_ids(
1602 &self,
1603 ) -> MetaResult<HashMap<SourceId, BTreeSet<FragmentId>>> {
1604 let inner = self.inner.read().await;
1605 let mut fragments: Vec<(FragmentId, i32, StreamNode)> = FragmentModel::find()
1606 .select_only()
1607 .columns([
1608 fragment::Column::FragmentId,
1609 fragment::Column::FragmentTypeMask,
1610 fragment::Column::StreamNode,
1611 ])
1612 .into_tuple()
1613 .all(&inner.db)
1614 .await?;
1615 fragments.retain(|(_, mask, _)| *mask & PbFragmentTypeFlag::Source as i32 != 0);
1616
1617 let mut source_fragment_ids = HashMap::new();
1618 for (fragment_id, _, stream_node) in fragments {
1619 if let Some(source_id) = stream_node.to_protobuf().find_stream_source() {
1620 source_fragment_ids
1621 .entry(source_id as SourceId)
1622 .or_insert_with(BTreeSet::new)
1623 .insert(fragment_id);
1624 }
1625 }
1626 Ok(source_fragment_ids)
1627 }
1628
1629 pub async fn load_backfill_fragment_ids(
1630 &self,
1631 ) -> MetaResult<HashMap<SourceId, BTreeSet<(FragmentId, u32)>>> {
1632 let inner = self.inner.read().await;
1633 let mut fragments: Vec<(FragmentId, i32, StreamNode)> = FragmentModel::find()
1634 .select_only()
1635 .columns([
1636 fragment::Column::FragmentId,
1637 fragment::Column::FragmentTypeMask,
1638 fragment::Column::StreamNode,
1639 ])
1640 .into_tuple()
1641 .all(&inner.db)
1642 .await?;
1643 fragments.retain(|(_, mask, _)| *mask & PbFragmentTypeFlag::SourceScan as i32 != 0);
1644
1645 let mut source_fragment_ids = HashMap::new();
1646 for (fragment_id, _, stream_node) in fragments {
1647 if let Some((source_id, upstream_source_fragment_id)) =
1648 stream_node.to_protobuf().find_source_backfill()
1649 {
1650 source_fragment_ids
1651 .entry(source_id as SourceId)
1652 .or_insert_with(BTreeSet::new)
1653 .insert((fragment_id, upstream_source_fragment_id));
1654 }
1655 }
1656 Ok(source_fragment_ids)
1657 }
1658
1659 pub async fn load_actor_splits(&self) -> MetaResult<HashMap<ActorId, ConnectorSplits>> {
1660 let inner = self.inner.read().await;
1661 let splits: Vec<(ActorId, ConnectorSplits)> = Actor::find()
1662 .select_only()
1663 .columns([actor::Column::ActorId, actor::Column::Splits])
1664 .filter(actor::Column::Splits.is_not_null())
1665 .into_tuple()
1666 .all(&inner.db)
1667 .await?;
1668 Ok(splits.into_iter().collect())
1669 }
1670
    /// Returns the actual parallelism (actor count) of the given job's root
    /// (`MView` or `Sink`) fragment, or `None` when no such fragment exists.
    ///
    /// NOTE(review): if more than one fragment matches the mview/sink mask,
    /// `at_most_one().ok()` silently yields `None` — confirm this is intended
    /// rather than an error.
    pub async fn get_actual_job_fragment_parallelism(
        &self,
        job_id: ObjectId,
    ) -> MetaResult<Option<usize>> {
        let inner = self.inner.read().await;
        // (fragment id, fragment type mask, actor count) per fragment of the job.
        let mut fragments: Vec<(FragmentId, i32, i64)> = FragmentModel::find()
            .join(JoinType::InnerJoin, fragment::Relation::Actor.def())
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::FragmentTypeMask,
            ])
            .column_as(actor::Column::ActorId.count(), "count")
            .filter(fragment::Column::JobId.eq(job_id))
            .group_by(fragment::Column::FragmentId)
            .into_tuple()
            .all(&inner.db)
            .await?;

        // Keep only the root (mview/sink) fragments.
        fragments.retain(|(_, mask, _)| {
            *mask & PbFragmentTypeFlag::Mview as i32 != 0
                || *mask & PbFragmentTypeFlag::Sink as i32 != 0
        });

        Ok(fragments
            .into_iter()
            .at_most_one()
            .ok()
            .flatten()
            .map(|(_, _, count)| count as usize))
    }
1703}
1704
1705#[cfg(test)]
1706mod tests {
1707 use std::collections::{BTreeMap, HashMap, HashSet};
1708
1709 use itertools::Itertools;
1710 use risingwave_common::hash::{ActorMapping, VirtualNode, VnodeCount};
1711 use risingwave_common::util::iter_util::ZipEqDebug;
1712 use risingwave_common::util::stream_graph_visitor::visit_stream_node;
1713 use risingwave_meta_model::actor::ActorStatus;
1714 use risingwave_meta_model::fragment::DistributionType;
1715 use risingwave_meta_model::{
1716 ActorId, ConnectorSplits, ExprContext, FragmentId, I32Array, ObjectId, StreamNode, TableId,
1717 VnodeBitmap, actor, fragment,
1718 };
1719 use risingwave_pb::common::PbActorLocation;
1720 use risingwave_pb::meta::table_fragments::PbActorStatus;
1721 use risingwave_pb::meta::table_fragments::actor_status::PbActorState;
1722 use risingwave_pb::meta::table_fragments::fragment::PbFragmentDistributionType;
1723 use risingwave_pb::plan_common::PbExprContext;
1724 use risingwave_pb::source::{PbConnectorSplit, PbConnectorSplits};
1725 use risingwave_pb::stream_plan::stream_node::PbNodeBody;
1726 use risingwave_pb::stream_plan::{MergeNode, PbFragmentTypeFlag, PbStreamNode, PbUnionNode};
1727
1728 use crate::MetaResult;
1729 use crate::controller::catalog::CatalogController;
1730 use crate::model::{Fragment, StreamActor};
1731
    // Upstream actor ids grouped by upstream fragment id, for a single actor.
    type ActorUpstreams = BTreeMap<crate::model::FragmentId, HashSet<crate::model::ActorId>>;

    // Per-actor upstream mapping for one fragment.
    type FragmentActorUpstreams = HashMap<crate::model::ActorId, ActorUpstreams>;

    // Fixture ids shared by the tests below.
    const TEST_FRAGMENT_ID: FragmentId = 1;

    const TEST_UPSTREAM_FRAGMENT_ID: FragmentId = 2;

    const TEST_JOB_ID: ObjectId = 1;

    const TEST_STATE_TABLE_ID: TableId = 1000;
1743
1744 fn generate_upstream_actor_ids_for_actor(actor_id: u32) -> ActorUpstreams {
1745 let mut upstream_actor_ids = BTreeMap::new();
1746 upstream_actor_ids.insert(
1747 TEST_UPSTREAM_FRAGMENT_ID as crate::model::FragmentId,
1748 HashSet::from_iter([(actor_id + 100)]),
1749 );
1750 upstream_actor_ids.insert(
1751 (TEST_UPSTREAM_FRAGMENT_ID + 1) as _,
1752 HashSet::from_iter([(actor_id + 200)]),
1753 );
1754 upstream_actor_ids
1755 }
1756
1757 fn generate_merger_stream_node(actor_upstream_actor_ids: &ActorUpstreams) -> PbStreamNode {
1758 let mut input = vec![];
1759 for upstream_fragment_id in actor_upstream_actor_ids.keys() {
1760 input.push(PbStreamNode {
1761 node_body: Some(PbNodeBody::Merge(Box::new(MergeNode {
1762 upstream_fragment_id: *upstream_fragment_id as _,
1763 ..Default::default()
1764 }))),
1765 ..Default::default()
1766 });
1767 }
1768
1769 PbStreamNode {
1770 input,
1771 node_body: Some(PbNodeBody::Union(PbUnionNode {})),
1772 ..Default::default()
1773 }
1774 }
1775
    /// Builds a protobuf `Fragment` with 3 hash-distributed actors and checks
    /// that `extract_fragment_and_actors_for_new_job` round-trips it into the
    /// model representation.
    #[tokio::test]
    async fn test_extract_fragment() -> MetaResult<()> {
        let actor_count = 3u32;
        let upstream_actor_ids: FragmentActorUpstreams = (0..actor_count)
            .map(|actor_id| {
                (
                    actor_id as _,
                    generate_upstream_actor_ids_for_actor(actor_id),
                )
            })
            .collect();

        // One vnode bitmap per actor, uniformly distributed.
        let actor_bitmaps = ActorMapping::new_uniform(
            (0..actor_count).map(|i| i as _),
            VirtualNode::COUNT_FOR_TEST,
        )
        .to_bitmaps();

        let stream_node = generate_merger_stream_node(upstream_actor_ids.values().next().unwrap());

        let pb_actors = (0..actor_count)
            .map(|actor_id| StreamActor {
                actor_id: actor_id as _,
                fragment_id: TEST_FRAGMENT_ID as _,
                vnode_bitmap: actor_bitmaps.get(&actor_id).cloned(),
                mview_definition: "".to_owned(),
                expr_context: Some(PbExprContext {
                    time_zone: String::from("America/New_York"),
                    strict_mode: false,
                }),
            })
            .collect_vec();

        let pb_fragment = Fragment {
            fragment_id: TEST_FRAGMENT_ID as _,
            fragment_type_mask: PbFragmentTypeFlag::Source as _,
            distribution_type: PbFragmentDistributionType::Hash as _,
            actors: pb_actors.clone(),
            state_table_ids: vec![TEST_STATE_TABLE_ID as _],
            maybe_vnode_count: VnodeCount::for_test().to_protobuf(),
            nodes: stream_node.clone(),
        };

        // All actors are running on worker 0.
        let pb_actor_status = (0..actor_count)
            .map(|actor_id| {
                (
                    actor_id,
                    PbActorStatus {
                        location: PbActorLocation::from_worker(0),
                        state: PbActorState::Running as _,
                    },
                )
            })
            .collect();

        let pb_actor_splits = Default::default();

        let (fragment, actors) = CatalogController::extract_fragment_and_actors_for_new_job(
            TEST_JOB_ID,
            &pb_fragment,
            &pb_actor_status,
            &pb_actor_splits,
        )?;

        check_fragment(fragment, pb_fragment);
        check_actors(
            actors,
            &upstream_actor_ids,
            pb_actors,
            Default::default(),
            &stream_node,
        );

        Ok(())
    }
1851
    /// Builds model-side fragment/actor rows (with splits and vnode bitmaps)
    /// and checks that `compose_fragment` reconstructs the protobuf form.
    #[tokio::test]
    async fn test_compose_fragment() -> MetaResult<()> {
        let actor_count = 3u32;

        let upstream_actor_ids: FragmentActorUpstreams = (0..actor_count)
            .map(|actor_id| {
                (
                    actor_id as _,
                    generate_upstream_actor_ids_for_actor(actor_id),
                )
            })
            .collect();

        let mut actor_bitmaps = ActorMapping::new_uniform(
            (0..actor_count).map(|i| i as _),
            VirtualNode::COUNT_FOR_TEST,
        )
        .to_bitmaps();

        let actors = (0..actor_count)
            .map(|actor_id| {
                // Each actor carries one dummy connector split.
                let actor_splits = Some(ConnectorSplits::from(&PbConnectorSplits {
                    splits: vec![PbConnectorSplit {
                        split_type: "dummy".to_owned(),
                        ..Default::default()
                    }],
                }));

                #[expect(deprecated)]
                actor::Model {
                    actor_id: actor_id as ActorId,
                    fragment_id: TEST_FRAGMENT_ID,
                    status: ActorStatus::Running,
                    splits: actor_splits,
                    worker_id: 0,
                    upstream_actor_ids: Default::default(),
                    vnode_bitmap: actor_bitmaps
                        .remove(&actor_id)
                        .map(|bitmap| bitmap.to_protobuf())
                        .as_ref()
                        .map(VnodeBitmap::from),
                    expr_context: ExprContext::from(&PbExprContext {
                        time_zone: String::from("America/New_York"),
                        strict_mode: false,
                    }),
                }
            })
            .collect_vec();

        // The stream plan is shared by the fragment; derive it from the first
        // actor's upstream mapping.
        let stream_node = {
            let template_actor = actors.first().cloned().unwrap();

            let template_upstream_actor_ids = upstream_actor_ids
                .get(&(template_actor.actor_id as _))
                .unwrap();

            generate_merger_stream_node(template_upstream_actor_ids)
        };

        #[expect(deprecated)]
        let fragment = fragment::Model {
            fragment_id: TEST_FRAGMENT_ID,
            job_id: TEST_JOB_ID,
            fragment_type_mask: 0,
            distribution_type: DistributionType::Hash,
            stream_node: StreamNode::from(&stream_node),
            state_table_ids: I32Array(vec![TEST_STATE_TABLE_ID]),
            upstream_fragment_id: Default::default(),
            vnode_count: VirtualNode::COUNT_FOR_TEST as _,
        };

        let (pb_fragment, pb_actor_status, pb_actor_splits) =
            CatalogController::compose_fragment(fragment.clone(), actors.clone(), None).unwrap();

        assert_eq!(pb_actor_status.len(), actor_count as usize);
        assert_eq!(pb_actor_splits.len(), actor_count as usize);

        let pb_actors = pb_fragment.actors.clone();

        check_fragment(fragment, pb_fragment);
        check_actors(
            actors,
            &upstream_actor_ids,
            pb_actors,
            pb_actor_splits,
            &stream_node,
        );

        Ok(())
    }
1942
    /// Asserts that model actors and their protobuf counterparts agree field
    /// by field (ids, bitmaps, status, splits, expression context), and that
    /// every `Merge` node in the stream plan references a known upstream
    /// fragment of the actor.
    fn check_actors(
        actors: Vec<actor::Model>,
        actor_upstreams: &FragmentActorUpstreams,
        pb_actors: Vec<StreamActor>,
        pb_actor_splits: HashMap<u32, PbConnectorSplits>,
        stream_node: &PbStreamNode,
    ) {
        for (
            actor::Model {
                actor_id,
                fragment_id,
                status,
                splits,
                worker_id: _,
                vnode_bitmap,
                expr_context,
                ..
            },
            StreamActor {
                actor_id: pb_actor_id,
                fragment_id: pb_fragment_id,
                vnode_bitmap: pb_vnode_bitmap,
                mview_definition,
                expr_context: pb_expr_context,
                ..
            },
        ) in actors.into_iter().zip_eq_debug(pb_actors.into_iter())
        {
            assert_eq!(actor_id, pb_actor_id as ActorId);
            assert_eq!(fragment_id, pb_fragment_id as FragmentId);

            assert_eq!(
                vnode_bitmap.map(|bitmap| bitmap.to_protobuf().into()),
                pb_vnode_bitmap,
            );

            assert_eq!(mview_definition, "");

            // Every merge input must reference an upstream fragment recorded
            // for this actor.
            visit_stream_node(stream_node, |body| {
                if let PbNodeBody::Merge(m) = body {
                    assert!(
                        actor_upstreams
                            .get(&(actor_id as _))
                            .unwrap()
                            .contains_key(&m.upstream_fragment_id)
                    );
                }
            });

            assert_eq!(status, ActorStatus::Running);

            assert_eq!(
                splits,
                pb_actor_splits.get(&pb_actor_id).map(ConnectorSplits::from)
            );

            assert_eq!(Some(expr_context.to_protobuf()), pb_expr_context);
        }
    }
2002
    /// Asserts that a composed protobuf `Fragment` faithfully reflects the
    /// model row it was built from.
    fn check_fragment(fragment: fragment::Model, pb_fragment: Fragment) {
        let Fragment {
            fragment_id,
            fragment_type_mask,
            distribution_type: pb_distribution_type,
            actors: _,
            state_table_ids: pb_state_table_ids,
            maybe_vnode_count: _,
            nodes,
        } = pb_fragment;

        assert_eq!(fragment_id, TEST_FRAGMENT_ID as u32);
        assert_eq!(fragment_type_mask, fragment.fragment_type_mask as u32);
        assert_eq!(
            pb_distribution_type,
            PbFragmentDistributionType::from(fragment.distribution_type)
        );

        assert_eq!(
            pb_state_table_ids,
            fragment.state_table_ids.into_u32_array()
        );
        assert_eq!(fragment.stream_node.to_protobuf(), nodes);
    }
2027}