1use std::collections::hash_map::Entry;
16use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
17
18use anyhow::{Context, anyhow};
19use futures::stream::BoxStream;
20use futures::{StreamExt, TryStreamExt};
21use itertools::Itertools;
22use risingwave_common::bail;
23use risingwave_common::bitmap::Bitmap;
24use risingwave_common::hash::{VnodeCount, VnodeCountCompat, WorkerSlotId};
25use risingwave_common::util::stream_graph_visitor::{visit_stream_node, visit_stream_node_mut};
26use risingwave_connector::source::SplitImpl;
27use risingwave_meta_model::actor::ActorStatus;
28use risingwave_meta_model::actor_dispatcher::DispatcherType;
29use risingwave_meta_model::fragment::DistributionType;
30use risingwave_meta_model::object::ObjectType;
31use risingwave_meta_model::prelude::{
32 Actor, Fragment as FragmentModel, FragmentRelation, Sink, StreamingJob,
33};
34use risingwave_meta_model::{
35 ActorId, ConnectorSplits, DatabaseId, ExprContext, FragmentId, I32Array, JobStatus, ObjectId,
36 SchemaId, SinkId, SourceId, StreamNode, StreamingParallelism, TableId, VnodeBitmap, WorkerId,
37 actor, database, fragment, fragment_relation, object, sink, source, streaming_job, table,
38};
39use risingwave_meta_model_migration::{Alias, SelectStatement};
40use risingwave_pb::common::PbActorLocation;
41use risingwave_pb::meta::subscribe_response::{
42 Info as NotificationInfo, Operation as NotificationOperation,
43};
44use risingwave_pb::meta::table_fragments::actor_status::PbActorState;
45use risingwave_pb::meta::table_fragments::fragment::{
46 FragmentDistributionType, PbFragmentDistributionType,
47};
48use risingwave_pb::meta::table_fragments::{PbActorStatus, PbState};
49use risingwave_pb::meta::{FragmentWorkerSlotMapping, PbFragmentWorkerSlotMapping};
50use risingwave_pb::source::{ConnectorSplit, PbConnectorSplits};
51use risingwave_pb::stream_plan::stream_node::NodeBody;
52use risingwave_pb::stream_plan::{
53 DispatchStrategy, PbDispatcherType, PbFragmentTypeFlag, PbStreamContext, PbStreamNode,
54 PbStreamScanType, StreamScanType,
55};
56use sea_orm::ActiveValue::Set;
57use sea_orm::sea_query::Expr;
58use sea_orm::{
59 ColumnTrait, DbErr, EntityTrait, FromQueryResult, JoinType, ModelTrait, PaginatorTrait,
60 QueryFilter, QuerySelect, RelationTrait, SelectGetableTuple, Selector, TransactionTrait, Value,
61};
62use serde::{Deserialize, Serialize};
63use tracing::debug;
64
65use crate::barrier::SnapshotBackfillInfo;
66use crate::controller::catalog::{CatalogController, CatalogControllerInner};
67use crate::controller::scale::resolve_streaming_job_definition;
68use crate::controller::utils::{
69 FragmentDesc, PartialActorLocation, PartialFragmentStateTables, get_fragment_actor_dispatchers,
70 get_fragment_mappings, rebuild_fragment_mapping_from_actors,
71 resolve_no_shuffle_actor_dispatcher,
72};
73use crate::manager::LocalNotification;
74use crate::model::{
75 DownstreamFragmentRelation, Fragment, FragmentActorDispatchers, FragmentDownstreamRelation,
76 StreamActor, StreamContext, StreamJobFragments, TableParallelism,
77};
78use crate::stream::{SplitAssignment, build_actor_split_impls};
79use crate::{MetaError, MetaResult, model};
80
/// Placement of a single in-flight actor: the worker hosting it and its vnode bitmap.
#[derive(Clone, Debug)]
pub struct InflightActorInfo {
    /// Worker the actor is placed on.
    pub worker_id: WorkerId,
    /// Vnode bitmap owned by the actor, if it has one.
    pub vnode_bitmap: Option<Bitmap>,
}
87
/// In-memory view of a fragment together with its in-flight actors.
#[derive(Clone, Debug)]
pub struct InflightFragmentInfo {
    pub fragment_id: crate::model::FragmentId,
    pub distribution_type: DistributionType,
    /// Stream plan of the fragment in protobuf form.
    pub nodes: PbStreamNode,
    /// Actors of the fragment, keyed by actor id.
    pub actors: HashMap<crate::model::ActorId, InflightActorInfo>,
    /// Ids of the state tables used by the fragment.
    pub state_table_ids: HashSet<risingwave_common::catalog::TableId>,
}
96
/// Parallelism summary of one fragment.
#[derive(Clone, Debug)]
pub struct FragmentParallelismInfo {
    pub distribution_type: FragmentDistributionType,
    /// Number of actors belonging to the fragment.
    pub actor_count: usize,
    /// Vnode count stored for the fragment.
    pub vnode_count: usize,
}
103
/// Summary row describing a streaming job; serialized with camelCase field names.
#[derive(Clone, Debug, FromQueryResult, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct StreamingJobInfo {
    pub job_id: ObjectId,
    pub obj_type: ObjectType,
    /// Display name of the job (table, source or sink name).
    pub name: String,
    pub job_status: JobStatus,
    pub parallelism: StreamingParallelism,
    pub max_parallelism: i32,
    /// Resource group the job runs in.
    pub resource_group: String,
}
115
116impl CatalogControllerInner {
117 pub async fn all_running_fragment_mappings(
119 &self,
120 ) -> MetaResult<impl Iterator<Item = FragmentWorkerSlotMapping> + '_> {
121 let txn = self.db.begin().await?;
122
123 let job_ids: Vec<ObjectId> = StreamingJob::find()
124 .select_only()
125 .column(streaming_job::Column::JobId)
126 .filter(streaming_job::Column::JobStatus.eq(JobStatus::Created))
127 .into_tuple()
128 .all(&txn)
129 .await?;
130
131 let mut result = vec![];
132 for job_id in job_ids {
133 let mappings = get_fragment_mappings(&txn, job_id).await?;
134
135 result.extend(mappings.into_iter());
136 }
137
138 Ok(result.into_iter())
139 }
140}
141
142impl CatalogController {
143 pub(crate) async fn notify_fragment_mapping(
144 &self,
145 operation: NotificationOperation,
146 fragment_mappings: Vec<PbFragmentWorkerSlotMapping>,
147 ) {
148 let fragment_ids = fragment_mappings
149 .iter()
150 .map(|mapping| mapping.fragment_id)
151 .collect_vec();
152 for fragment_mapping in fragment_mappings {
154 self.env
155 .notification_manager()
156 .notify_frontend(
157 operation,
158 NotificationInfo::StreamingWorkerSlotMapping(fragment_mapping),
159 )
160 .await;
161 }
162
163 match operation {
165 NotificationOperation::Add | NotificationOperation::Update => {
166 self.env
167 .notification_manager()
168 .notify_local_subscribers(LocalNotification::FragmentMappingsUpsert(
169 fragment_ids,
170 ))
171 .await;
172 }
173 NotificationOperation::Delete => {
174 self.env
175 .notification_manager()
176 .notify_local_subscribers(LocalNotification::FragmentMappingsDelete(
177 fragment_ids,
178 ))
179 .await;
180 }
181 op => {
182 tracing::warn!("unexpected fragment mapping op: {}", op.as_str_name());
183 }
184 }
185 }
186
187 pub fn extract_fragment_and_actors_from_fragments(
188 stream_job_fragments: &StreamJobFragments,
189 ) -> MetaResult<Vec<(fragment::Model, Vec<actor::Model>)>> {
190 stream_job_fragments
191 .fragments
192 .values()
193 .map(|fragment| {
194 Self::extract_fragment_and_actors_for_new_job(
195 stream_job_fragments.stream_job_id.table_id as _,
196 fragment,
197 &stream_job_fragments.actor_status,
198 &stream_job_fragments.actor_splits,
199 )
200 })
201 .try_collect()
202 }
203
/// Converts one in-memory [`Fragment`] of a new streaming job into the `fragment::Model` and
/// the `actor::Model`s to persist for it.
///
/// Each actor's worker and state come from `actor_status`. Its connector splits, if any, come
/// from `actor_splits`.
///
/// # Errors
/// Returns an error if an actor of the fragment has no entry in `actor_status`.
///
/// # Panics
/// Panics in any of these cases:
/// - the fragment has no actors;
/// - an actor has no expression context;
/// - `get_state()` on an actor status fails;
/// - the fragment's `distribution_type` is not a valid `PbFragmentDistributionType`.
fn extract_fragment_and_actors_for_new_job(
    job_id: ObjectId,
    fragment: &Fragment,
    actor_status: &BTreeMap<crate::model::ActorId, PbActorStatus>,
    actor_splits: &HashMap<crate::model::ActorId, Vec<SplitImpl>>,
) -> MetaResult<(fragment::Model, Vec<actor::Model>)> {
    let vnode_count = fragment.vnode_count();
    let Fragment {
        fragment_id: pb_fragment_id,
        fragment_type_mask: pb_fragment_type_mask,
        distribution_type: pb_distribution_type,
        actors: pb_actors,
        state_table_ids: pb_state_table_ids,
        nodes,
        ..
    } = fragment;

    let state_table_ids = pb_state_table_ids.clone().into();

    assert!(!pb_actors.is_empty());

    // Persist a copy of the plan with the deprecated `upstream_actor_id` list of every Merge
    // node cleared.
    let stream_node = {
        let mut stream_node = nodes.clone();
        visit_stream_node_mut(&mut stream_node, |body| {
            #[expect(deprecated)]
            if let NodeBody::Merge(m) = body {
                m.upstream_actor_id = vec![];
            }
        });

        stream_node
    };

    let mut actors = vec![];

    for actor in pb_actors {
        let StreamActor {
            actor_id,
            fragment_id,
            vnode_bitmap,
            mview_definition: _,
            expr_context: pb_expr_context,
            ..
        } = actor;

        // Only actors with an entry in `actor_splits` get a `splits` value; others store `None`.
        let splits = actor_splits.get(actor_id).map(|splits| {
            ConnectorSplits::from(&PbConnectorSplits {
                splits: splits.iter().map(ConnectorSplit::from).collect(),
            })
        });
        let status = actor_status.get(actor_id).cloned().ok_or_else(|| {
            anyhow::anyhow!(
                "actor {} in fragment {} has no actor_status",
                actor_id,
                fragment_id
            )
        })?;

        let worker_id = status.worker_id() as _;

        let pb_expr_context = pb_expr_context
            .as_ref()
            .expect("no expression context found");

        // `upstream_actor_ids` is deprecated and is always stored empty.
        #[expect(deprecated)]
        actors.push(actor::Model {
            actor_id: *actor_id as _,
            fragment_id: *fragment_id as _,
            status: status.get_state().unwrap().into(),
            splits,
            worker_id,
            upstream_actor_ids: Default::default(),
            vnode_bitmap: vnode_bitmap
                .as_ref()
                .map(|bitmap| VnodeBitmap::from(&bitmap.to_protobuf())),
            expr_context: ExprContext::from(pb_expr_context),
        });
    }

    let stream_node = StreamNode::from(&stream_node);

    let distribution_type = PbFragmentDistributionType::try_from(*pb_distribution_type)
        .unwrap()
        .into();

    // `upstream_fragment_id` is deprecated and is always stored as its default value.
    #[expect(deprecated)]
    let fragment = fragment::Model {
        fragment_id: *pb_fragment_id as _,
        job_id,
        fragment_type_mask: *pb_fragment_type_mask as _,
        distribution_type,
        stream_node,
        state_table_ids,
        upstream_fragment_id: Default::default(),
        vnode_count: vnode_count as _,
    };

    Ok((fragment, actors))
}
303
304 pub fn compose_table_fragments(
305 table_id: u32,
306 state: PbState,
307 ctx: Option<PbStreamContext>,
308 fragments: Vec<(fragment::Model, Vec<actor::Model>)>,
309 parallelism: StreamingParallelism,
310 max_parallelism: usize,
311 job_definition: Option<String>,
312 ) -> MetaResult<StreamJobFragments> {
313 let mut pb_fragments = BTreeMap::new();
314 let mut pb_actor_splits = HashMap::new();
315 let mut pb_actor_status = BTreeMap::new();
316
317 for (fragment, actors) in fragments {
318 let (fragment, fragment_actor_status, fragment_actor_splits) =
319 Self::compose_fragment(fragment, actors, job_definition.clone())?;
320
321 pb_fragments.insert(fragment.fragment_id, fragment);
322
323 pb_actor_splits.extend(build_actor_split_impls(&fragment_actor_splits));
324 pb_actor_status.extend(fragment_actor_status.into_iter());
325 }
326
327 let table_fragments = StreamJobFragments {
328 stream_job_id: table_id.into(),
329 state: state as _,
330 fragments: pb_fragments,
331 actor_status: pb_actor_status,
332 actor_splits: pb_actor_splits,
333 ctx: ctx
334 .as_ref()
335 .map(StreamContext::from_protobuf)
336 .unwrap_or_default(),
337 assigned_parallelism: match parallelism {
338 StreamingParallelism::Custom => TableParallelism::Custom,
339 StreamingParallelism::Adaptive => TableParallelism::Adaptive,
340 StreamingParallelism::Fixed(n) => TableParallelism::Fixed(n as _),
341 },
342 max_parallelism,
343 };
344
345 Ok(table_fragments)
346 }
347
/// Rebuilds an in-memory [`Fragment`] from its persisted model and actor rows.
///
/// Returns three values:
/// - the fragment;
/// - the status (worker location and state) of each actor, keyed by actor id;
/// - the connector splits of each actor whose `splits` column is set.
///
/// Every actor gets `job_definition` (or an empty string) as its `mview_definition`.
///
/// # Errors
/// Returns an error if an actor's `fragment_id` differs from the fragment's id.
///
/// # Panics
/// Panics if two `Merge` nodes visited in the stream plan share the same
/// `upstream_fragment_id`.
#[allow(clippy::type_complexity)]
pub(crate) fn compose_fragment(
    fragment: fragment::Model,
    actors: Vec<actor::Model>,
    job_definition: Option<String>,
) -> MetaResult<(
    Fragment,
    HashMap<crate::model::ActorId, PbActorStatus>,
    HashMap<crate::model::ActorId, PbConnectorSplits>,
)> {
    let fragment::Model {
        fragment_id,
        fragment_type_mask,
        distribution_type,
        stream_node,
        state_table_ids,
        vnode_count,
        ..
    } = fragment;

    let stream_node = stream_node.to_protobuf();
    // Sanity check only: the set is never read afterwards. Inserting an upstream fragment id
    // that is already present trips the assertion.
    let mut upstream_fragments = HashSet::new();
    visit_stream_node(&stream_node, |body| {
        if let NodeBody::Merge(m) = body {
            assert!(
                upstream_fragments.insert(m.upstream_fragment_id),
                "non-duplicate upstream fragment"
            );
        }
    });

    let mut pb_actors = vec![];

    let mut pb_actor_status = HashMap::new();
    let mut pb_actor_splits = HashMap::new();

    for actor in actors {
        if actor.fragment_id != fragment_id {
            bail!(
                "fragment id {} from actor {} is different from fragment {}",
                actor.fragment_id,
                actor.actor_id,
                fragment_id
            )
        }

        // `fragment_id` is shadowed here by the actor's own value, which the check above
        // guarantees is equal to the fragment's id.
        let actor::Model {
            actor_id,
            fragment_id,
            status,
            worker_id,
            splits,
            vnode_bitmap,
            expr_context,
            ..
        } = actor;

        let vnode_bitmap =
            vnode_bitmap.map(|vnode_bitmap| Bitmap::from(vnode_bitmap.to_protobuf()));
        let pb_expr_context = Some(expr_context.to_protobuf());

        pb_actor_status.insert(
            actor_id as _,
            PbActorStatus {
                location: PbActorLocation::from_worker(worker_id as u32),
                state: PbActorState::from(status) as _,
            },
        );

        if let Some(splits) = splits {
            pb_actor_splits.insert(actor_id as _, splits.to_protobuf());
        }

        pb_actors.push(StreamActor {
            actor_id: actor_id as _,
            fragment_id: fragment_id as _,
            vnode_bitmap,
            mview_definition: job_definition.clone().unwrap_or("".to_owned()),
            expr_context: pb_expr_context,
        })
    }

    let pb_state_table_ids = state_table_ids.into_u32_array();
    let pb_distribution_type = PbFragmentDistributionType::from(distribution_type) as _;
    let pb_fragment = Fragment {
        fragment_id: fragment_id as _,
        fragment_type_mask: fragment_type_mask as _,
        distribution_type: pb_distribution_type,
        actors: pb_actors,
        state_table_ids: pb_state_table_ids,
        maybe_vnode_count: VnodeCount::set(vnode_count).to_protobuf(),
        nodes: stream_node,
    };

    Ok((pb_fragment, pb_actor_status, pb_actor_splits))
}
444
445 pub async fn running_fragment_parallelisms(
446 &self,
447 id_filter: Option<HashSet<FragmentId>>,
448 ) -> MetaResult<HashMap<FragmentId, FragmentParallelismInfo>> {
449 let inner = self.inner.read().await;
450
451 let query_alias = Alias::new("fragment_actor_count");
452 let count_alias = Alias::new("count");
453
454 let mut query = SelectStatement::new()
455 .column(actor::Column::FragmentId)
456 .expr_as(actor::Column::ActorId.count(), count_alias.clone())
457 .from(Actor)
458 .group_by_col(actor::Column::FragmentId)
459 .to_owned();
460
461 if let Some(id_filter) = id_filter {
462 query.cond_having(actor::Column::FragmentId.is_in(id_filter));
463 }
464
465 let outer = SelectStatement::new()
466 .column((FragmentModel, fragment::Column::FragmentId))
467 .column(count_alias)
468 .column(fragment::Column::DistributionType)
469 .column(fragment::Column::VnodeCount)
470 .from_subquery(query.to_owned(), query_alias.clone())
471 .inner_join(
472 FragmentModel,
473 Expr::col((query_alias, actor::Column::FragmentId))
474 .equals((FragmentModel, fragment::Column::FragmentId)),
475 )
476 .to_owned();
477
478 let fragment_parallelisms: Vec<(FragmentId, i64, DistributionType, i32)> =
479 Selector::<SelectGetableTuple<(FragmentId, i64, DistributionType, i32)>>::into_tuple(
480 outer.to_owned(),
481 )
482 .all(&inner.db)
483 .await?;
484
485 Ok(fragment_parallelisms
486 .into_iter()
487 .map(|(fragment_id, count, distribution_type, vnode_count)| {
488 (
489 fragment_id,
490 FragmentParallelismInfo {
491 distribution_type: distribution_type.into(),
492 actor_count: count as usize,
493 vnode_count: vnode_count as usize,
494 },
495 )
496 })
497 .collect())
498 }
499
500 pub async fn fragment_job_mapping(&self) -> MetaResult<HashMap<FragmentId, ObjectId>> {
501 let inner = self.inner.read().await;
502 let fragment_jobs: Vec<(FragmentId, ObjectId)> = FragmentModel::find()
503 .select_only()
504 .columns([fragment::Column::FragmentId, fragment::Column::JobId])
505 .into_tuple()
506 .all(&inner.db)
507 .await?;
508 Ok(fragment_jobs.into_iter().collect())
509 }
510
511 pub async fn get_fragment_job_id(
512 &self,
513 fragment_ids: Vec<FragmentId>,
514 ) -> MetaResult<Vec<ObjectId>> {
515 let inner = self.inner.read().await;
516
517 let object_ids: Vec<ObjectId> = FragmentModel::find()
518 .select_only()
519 .column(fragment::Column::JobId)
520 .filter(fragment::Column::FragmentId.is_in(fragment_ids))
521 .into_tuple()
522 .all(&inner.db)
523 .await?;
524
525 Ok(object_ids)
526 }
527
528 pub async fn list_fragment_database_ids(
529 &self,
530 select_fragment_ids: Option<Vec<FragmentId>>,
531 ) -> MetaResult<Vec<(FragmentId, DatabaseId)>> {
532 let inner = self.inner.read().await;
533 let select = FragmentModel::find()
534 .select_only()
535 .column(fragment::Column::FragmentId)
536 .column(object::Column::DatabaseId)
537 .join(JoinType::InnerJoin, fragment::Relation::Object.def());
538 let select = if let Some(select_fragment_ids) = select_fragment_ids {
539 select.filter(fragment::Column::FragmentId.is_in(select_fragment_ids))
540 } else {
541 select
542 };
543 Ok(select.into_tuple().all(&inner.db).await?)
544 }
545
546 pub async fn get_job_fragments_by_id(
547 &self,
548 job_id: ObjectId,
549 ) -> MetaResult<StreamJobFragments> {
550 let inner = self.inner.read().await;
551 let fragment_actors = FragmentModel::find()
552 .find_with_related(Actor)
553 .filter(fragment::Column::JobId.eq(job_id))
554 .all(&inner.db)
555 .await?;
556
557 let job_info = StreamingJob::find_by_id(job_id)
558 .one(&inner.db)
559 .await?
560 .ok_or_else(|| anyhow::anyhow!("job {} not found in database", job_id))?;
561
562 let job_definition = resolve_streaming_job_definition(&inner.db, &HashSet::from([job_id]))
563 .await?
564 .remove(&job_id);
565
566 Self::compose_table_fragments(
567 job_id as _,
568 job_info.job_status.into(),
569 job_info.timezone.map(|tz| PbStreamContext { timezone: tz }),
570 fragment_actors,
571 job_info.parallelism.clone(),
572 job_info.max_parallelism as _,
573 job_definition,
574 )
575 }
576
577 pub async fn get_fragment_actor_dispatchers(
578 &self,
579 fragment_ids: Vec<FragmentId>,
580 ) -> MetaResult<FragmentActorDispatchers> {
581 let inner = self.inner.read().await;
582 get_fragment_actor_dispatchers(&inner.db, fragment_ids).await
583 }
584
585 pub async fn get_fragment_downstream_relations(
586 &self,
587 fragment_ids: Vec<FragmentId>,
588 ) -> MetaResult<FragmentDownstreamRelation> {
589 let inner = self.inner.read().await;
590 let mut stream = FragmentRelation::find()
591 .filter(fragment_relation::Column::SourceFragmentId.is_in(fragment_ids))
592 .stream(&inner.db)
593 .await?;
594 let mut relations = FragmentDownstreamRelation::new();
595 while let Some(relation) = stream.try_next().await? {
596 relations
597 .entry(relation.source_fragment_id as _)
598 .or_default()
599 .push(DownstreamFragmentRelation {
600 downstream_fragment_id: relation.target_fragment_id as _,
601 dispatcher_type: relation.dispatcher_type,
602 dist_key_indices: relation.dist_key_indices.into_u32_array(),
603 output_indices: relation.output_indices.into_u32_array(),
604 });
605 }
606 Ok(relations)
607 }
608
609 pub async fn get_job_fragment_backfill_scan_type(
610 &self,
611 job_id: ObjectId,
612 ) -> MetaResult<HashMap<model::FragmentId, PbStreamScanType>> {
613 let inner = self.inner.read().await;
614 let fragments: Vec<_> = FragmentModel::find()
615 .filter(fragment::Column::JobId.eq(job_id))
616 .all(&inner.db)
617 .await?;
618
619 let mut result = HashMap::new();
620
621 for fragment::Model {
622 fragment_id,
623 stream_node,
624 ..
625 } in fragments
626 {
627 let stream_node = stream_node.to_protobuf();
628 visit_stream_node(&stream_node, |body| {
629 if let NodeBody::StreamScan(node) = body {
630 match node.stream_scan_type() {
631 StreamScanType::Unspecified => {}
632 scan_type => {
633 result.insert(fragment_id as model::FragmentId, scan_type);
634 }
635 }
636 }
637 });
638 }
639
640 Ok(result)
641 }
642
/// Lists a [`StreamingJobInfo`] summary for every streaming job.
///
/// - The name is taken from the first linked catalog entry with a non-null name, checked in
///   the order `table`, `source`, `sink`. It is `"<unknown>"` when none exists.
/// - The resource group is the job's `specific_resource_group`, falling back to the
///   database's resource group.
pub async fn list_streaming_job_infos(&self) -> MetaResult<Vec<StreamingJobInfo>> {
    let inner = self.inner.read().await;
    let job_states = StreamingJob::find()
        .select_only()
        .column(streaming_job::Column::JobId)
        // Join the job's object (for its type) and then the database (for the fallback
        // resource group below).
        .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
        .join(JoinType::InnerJoin, object::Relation::Database2.def())
        .column(object::Column::ObjType)
        // The job may be backed by a table, a source or a sink, so all three are left-joined
        // and the name is picked with nested IF NULL below.
        .join(JoinType::LeftJoin, table::Relation::Object1.def().rev())
        .join(JoinType::LeftJoin, source::Relation::Object.def().rev())
        .join(JoinType::LeftJoin, sink::Relation::Object.def().rev())
        .column_as(
            Expr::if_null(
                Expr::col((table::Entity, table::Column::Name)),
                Expr::if_null(
                    Expr::col((source::Entity, source::Column::Name)),
                    Expr::if_null(
                        Expr::col((sink::Entity, sink::Column::Name)),
                        Expr::val("<unknown>"),
                    ),
                ),
            ),
            "name",
        )
        .columns([
            streaming_job::Column::JobStatus,
            streaming_job::Column::Parallelism,
            streaming_job::Column::MaxParallelism,
        ])
        .column_as(
            Expr::if_null(
                Expr::col((
                    streaming_job::Entity,
                    streaming_job::Column::SpecificResourceGroup,
                )),
                Expr::col((database::Entity, database::Column::ResourceGroup)),
            ),
            "resource_group",
        )
        .into_model()
        .all(&inner.db)
        .await?;
    Ok(job_states)
}
687
688 pub async fn get_max_parallelism_by_id(&self, job_id: ObjectId) -> MetaResult<usize> {
689 let inner = self.inner.read().await;
690 let max_parallelism: i32 = StreamingJob::find_by_id(job_id)
691 .select_only()
692 .column(streaming_job::Column::MaxParallelism)
693 .into_tuple()
694 .one(&inner.db)
695 .await?
696 .ok_or_else(|| anyhow::anyhow!("job {} not found in database", job_id))?;
697 Ok(max_parallelism as usize)
698 }
699
700 pub async fn get_job_actor_mapping(
702 &self,
703 job_ids: Vec<ObjectId>,
704 ) -> MetaResult<HashMap<ObjectId, Vec<ActorId>>> {
705 let inner = self.inner.read().await;
706 let job_actors: Vec<(ObjectId, ActorId)> = Actor::find()
707 .select_only()
708 .column(fragment::Column::JobId)
709 .column(actor::Column::ActorId)
710 .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
711 .filter(fragment::Column::JobId.is_in(job_ids))
712 .into_tuple()
713 .all(&inner.db)
714 .await?;
715 Ok(job_actors.into_iter().into_group_map())
716 }
717
718 pub async fn get_job_internal_table_ids(&self) -> Option<Vec<(ObjectId, Vec<TableId>)>> {
720 if let Ok(inner) = self.inner.try_read() {
721 if let Ok(job_state_tables) = FragmentModel::find()
722 .select_only()
723 .columns([fragment::Column::JobId, fragment::Column::StateTableIds])
724 .into_tuple::<(ObjectId, I32Array)>()
725 .all(&inner.db)
726 .await
727 {
728 let mut job_internal_table_ids = HashMap::new();
729 for (job_id, state_table_ids) in job_state_tables {
730 job_internal_table_ids
731 .entry(job_id)
732 .or_insert_with(Vec::new)
733 .extend(state_table_ids.into_inner());
734 }
735 return Some(job_internal_table_ids.into_iter().collect());
736 }
737 }
738 None
739 }
740
741 pub async fn has_any_running_jobs(&self) -> MetaResult<bool> {
742 let inner = self.inner.read().await;
743 let count = FragmentModel::find().count(&inner.db).await?;
744 Ok(count > 0)
745 }
746
747 pub async fn worker_actor_count(&self) -> MetaResult<HashMap<WorkerId, usize>> {
748 let inner = self.inner.read().await;
749 let actor_cnt: Vec<(WorkerId, i64)> = Actor::find()
750 .select_only()
751 .column(actor::Column::WorkerId)
752 .column_as(actor::Column::ActorId.count(), "count")
753 .group_by(actor::Column::WorkerId)
754 .into_tuple()
755 .all(&inner.db)
756 .await?;
757
758 Ok(actor_cnt
759 .into_iter()
760 .map(|(worker_id, count)| (worker_id, count as usize))
761 .collect())
762 }
763
764 pub async fn table_fragments(&self) -> MetaResult<BTreeMap<ObjectId, StreamJobFragments>> {
766 let inner = self.inner.read().await;
767 let jobs = StreamingJob::find().all(&inner.db).await?;
768
769 let mut job_definition = resolve_streaming_job_definition(
770 &inner.db,
771 &HashSet::from_iter(jobs.iter().map(|job| job.job_id)),
772 )
773 .await?;
774
775 let mut table_fragments = BTreeMap::new();
776 for job in jobs {
777 let fragment_actors = FragmentModel::find()
778 .find_with_related(Actor)
779 .filter(fragment::Column::JobId.eq(job.job_id))
780 .all(&inner.db)
781 .await?;
782
783 table_fragments.insert(
784 job.job_id as ObjectId,
785 Self::compose_table_fragments(
786 job.job_id as _,
787 job.job_status.into(),
788 job.timezone.map(|tz| PbStreamContext { timezone: tz }),
789 fragment_actors,
790 job.parallelism.clone(),
791 job.max_parallelism as _,
792 job_definition.remove(&job.job_id),
793 )?,
794 );
795 }
796
797 Ok(table_fragments)
798 }
799
800 pub async fn upstream_fragments(
801 &self,
802 fragment_ids: impl Iterator<Item = crate::model::FragmentId>,
803 ) -> MetaResult<HashMap<crate::model::FragmentId, HashSet<crate::model::FragmentId>>> {
804 let inner = self.inner.read().await;
805 let mut stream = FragmentRelation::find()
806 .select_only()
807 .columns([
808 fragment_relation::Column::SourceFragmentId,
809 fragment_relation::Column::TargetFragmentId,
810 ])
811 .filter(
812 fragment_relation::Column::TargetFragmentId
813 .is_in(fragment_ids.map(|id| id as FragmentId)),
814 )
815 .into_tuple::<(FragmentId, FragmentId)>()
816 .stream(&inner.db)
817 .await?;
818 let mut upstream_fragments: HashMap<_, HashSet<_>> = HashMap::new();
819 while let Some((upstream_fragment_id, downstream_fragment_id)) = stream.try_next().await? {
820 upstream_fragments
821 .entry(downstream_fragment_id as crate::model::FragmentId)
822 .or_default()
823 .insert(upstream_fragment_id as crate::model::FragmentId);
824 }
825 Ok(upstream_fragments)
826 }
827
828 pub async fn list_actor_locations(&self) -> MetaResult<Vec<PartialActorLocation>> {
829 let inner = self.inner.read().await;
830 let actor_locations: Vec<PartialActorLocation> =
831 Actor::find().into_partial_model().all(&inner.db).await?;
832 Ok(actor_locations)
833 }
834
835 pub async fn list_actor_info(
836 &self,
837 ) -> MetaResult<Vec<(ActorId, FragmentId, ObjectId, SchemaId, ObjectType)>> {
838 let inner = self.inner.read().await;
839 let actor_locations: Vec<(ActorId, FragmentId, ObjectId, SchemaId, ObjectType)> =
840 Actor::find()
841 .join(JoinType::LeftJoin, actor::Relation::Fragment.def())
842 .join(JoinType::LeftJoin, fragment::Relation::Object.def())
843 .select_only()
844 .columns([actor::Column::ActorId, actor::Column::FragmentId])
845 .column_as(object::Column::Oid, "job_id")
846 .column_as(object::Column::SchemaId, "schema_id")
847 .column_as(object::Column::ObjType, "type")
848 .into_tuple()
849 .all(&inner.db)
850 .await?;
851 Ok(actor_locations)
852 }
853
854 pub async fn list_source_actors(&self) -> MetaResult<Vec<(ActorId, FragmentId)>> {
855 let inner = self.inner.read().await;
856
857 let source_actors: Vec<(ActorId, FragmentId)> = Actor::find()
858 .select_only()
859 .filter(actor::Column::Splits.is_not_null())
860 .columns([actor::Column::ActorId, actor::Column::FragmentId])
861 .into_tuple()
862 .all(&inner.db)
863 .await?;
864
865 Ok(source_actors)
866 }
867
868 pub async fn list_fragment_descs(&self) -> MetaResult<Vec<(FragmentDesc, Vec<FragmentId>)>> {
869 let inner = self.inner.read().await;
870 let mut result = Vec::new();
871 let fragments = FragmentModel::find()
872 .select_only()
873 .columns([
874 fragment::Column::FragmentId,
875 fragment::Column::JobId,
876 fragment::Column::FragmentTypeMask,
877 fragment::Column::DistributionType,
878 fragment::Column::StateTableIds,
879 fragment::Column::VnodeCount,
880 fragment::Column::StreamNode,
881 ])
882 .column_as(Expr::col(actor::Column::ActorId).count(), "parallelism")
883 .join(JoinType::LeftJoin, fragment::Relation::Actor.def())
884 .group_by(fragment::Column::FragmentId)
885 .into_model::<FragmentDesc>()
886 .all(&inner.db)
887 .await?;
888 for fragment in fragments {
889 let upstreams: Vec<FragmentId> = FragmentRelation::find()
890 .select_only()
891 .column(fragment_relation::Column::SourceFragmentId)
892 .filter(fragment_relation::Column::TargetFragmentId.eq(fragment.fragment_id))
893 .into_tuple()
894 .all(&inner.db)
895 .await?;
896 result.push((fragment, upstreams));
897 }
898 Ok(result)
899 }
900
901 pub async fn list_sink_actor_mapping(
902 &self,
903 ) -> MetaResult<HashMap<SinkId, (String, Vec<ActorId>)>> {
904 let inner = self.inner.read().await;
905 let sink_id_names: Vec<(SinkId, String)> = Sink::find()
906 .select_only()
907 .columns([sink::Column::SinkId, sink::Column::Name])
908 .into_tuple()
909 .all(&inner.db)
910 .await?;
911 let (sink_ids, _): (Vec<_>, Vec<_>) = sink_id_names.iter().cloned().unzip();
912 let sink_name_mapping: HashMap<SinkId, String> = sink_id_names.into_iter().collect();
913
914 let actor_with_type: Vec<(ActorId, SinkId, i32)> = Actor::find()
915 .select_only()
916 .column(actor::Column::ActorId)
917 .columns([fragment::Column::JobId, fragment::Column::FragmentTypeMask])
918 .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
919 .filter(fragment::Column::JobId.is_in(sink_ids))
920 .into_tuple()
921 .all(&inner.db)
922 .await?;
923
924 let mut sink_actor_mapping = HashMap::new();
925 for (actor_id, sink_id, type_mask) in actor_with_type {
926 if type_mask & PbFragmentTypeFlag::Sink as i32 != 0 {
927 sink_actor_mapping
928 .entry(sink_id)
929 .or_insert_with(|| (sink_name_mapping.get(&sink_id).unwrap().clone(), vec![]))
930 .1
931 .push(actor_id);
932 }
933 }
934
935 Ok(sink_actor_mapping)
936 }
937
938 pub async fn list_fragment_state_tables(&self) -> MetaResult<Vec<PartialFragmentStateTables>> {
939 let inner = self.inner.read().await;
940 let fragment_state_tables: Vec<PartialFragmentStateTables> = FragmentModel::find()
941 .select_only()
942 .columns([
943 fragment::Column::FragmentId,
944 fragment::Column::JobId,
945 fragment::Column::StateTableIds,
946 ])
947 .into_partial_model()
948 .all(&inner.db)
949 .await?;
950 Ok(fragment_state_tables)
951 }
952
/// Loads every actor whose status is `Running`, grouped as
/// `database_id -> owning object id (job) -> fragment_id -> InflightFragmentInfo`.
///
/// When `database_id` is `Some`, only actors whose owning object is in that database are
/// loaded.
///
/// # Panics
/// Panics if two rows of the same fragment disagree on its state table ids, or if the same
/// actor id appears twice within a fragment.
pub async fn load_all_actors(
    &self,
    database_id: Option<DatabaseId>,
) -> MetaResult<HashMap<DatabaseId, HashMap<TableId, HashMap<FragmentId, InflightFragmentInfo>>>>
{
    let inner = self.inner.read().await;
    let filter_condition = actor::Column::Status.eq(ActorStatus::Running);
    let filter_condition = if let Some(database_id) = database_id {
        filter_condition.and(object::Column::DatabaseId.eq(database_id))
    } else {
        filter_condition
    };
    // One row per actor, carrying its fragment's plan, state tables and distribution type,
    // plus the owning object's database and id (actor -> fragment -> object joins).
    #[expect(clippy::type_complexity)]
    let mut actor_info_stream: BoxStream<
        '_,
        Result<
            (
                ActorId,
                WorkerId,
                Option<VnodeBitmap>,
                FragmentId,
                StreamNode,
                I32Array,
                DistributionType,
                DatabaseId,
                ObjectId,
            ),
            _,
        >,
    > = Actor::find()
        .select_only()
        .column(actor::Column::ActorId)
        .column(actor::Column::WorkerId)
        .column(actor::Column::VnodeBitmap)
        .column(fragment::Column::FragmentId)
        .column(fragment::Column::StreamNode)
        .column(fragment::Column::StateTableIds)
        .column(fragment::Column::DistributionType)
        .column(object::Column::DatabaseId)
        .column(object::Column::Oid)
        .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
        .join(JoinType::InnerJoin, fragment::Relation::Object.def())
        .filter(filter_condition)
        .into_tuple()
        .stream(&inner.db)
        .await?;

    let mut database_fragment_infos: HashMap<_, HashMap<_, HashMap<_, InflightFragmentInfo>>> =
        HashMap::new();

    while let Some((
        actor_id,
        worker_id,
        vnode_bitmap,
        fragment_id,
        node,
        state_table_ids,
        distribution_type,
        database_id,
        job_id,
    )) = actor_info_stream.try_next().await?
    {
        let fragment_infos = database_fragment_infos
            .entry(database_id)
            .or_default()
            .entry(job_id)
            .or_default();
        let state_table_ids = state_table_ids.into_inner();
        let state_table_ids = state_table_ids
            .into_iter()
            .map(|table_id| risingwave_common::catalog::TableId::new(table_id as _))
            .collect();
        let actor_info = InflightActorInfo {
            worker_id,
            vnode_bitmap: vnode_bitmap.map(|bitmap| bitmap.to_protobuf().into()),
        };
        // The first row of a fragment creates its entry. Later rows of the same fragment must
        // carry identical state tables and only add a new actor.
        match fragment_infos.entry(fragment_id) {
            Entry::Occupied(mut entry) => {
                let info: &mut InflightFragmentInfo = entry.get_mut();
                assert_eq!(info.state_table_ids, state_table_ids);
                assert!(info.actors.insert(actor_id as _, actor_info).is_none());
            }
            Entry::Vacant(entry) => {
                entry.insert(InflightFragmentInfo {
                    fragment_id: fragment_id as _,
                    distribution_type,
                    nodes: node.to_protobuf(),
                    actors: HashMap::from_iter([(actor_id as _, actor_info)]),
                    state_table_ids,
                });
            }
        }
    }

    debug!(?database_fragment_infos, "reload all actors");

    Ok(database_fragment_infos)
}
1053
1054 pub async fn migrate_actors(
1055 &self,
1056 plan: HashMap<WorkerSlotId, WorkerSlotId>,
1057 ) -> MetaResult<()> {
1058 let inner = self.inner.read().await;
1059 let txn = inner.db.begin().await?;
1060
1061 let actors: Vec<(
1062 FragmentId,
1063 DistributionType,
1064 ActorId,
1065 Option<VnodeBitmap>,
1066 WorkerId,
1067 ActorStatus,
1068 )> = Actor::find()
1069 .select_only()
1070 .columns([
1071 fragment::Column::FragmentId,
1072 fragment::Column::DistributionType,
1073 ])
1074 .columns([
1075 actor::Column::ActorId,
1076 actor::Column::VnodeBitmap,
1077 actor::Column::WorkerId,
1078 actor::Column::Status,
1079 ])
1080 .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
1081 .into_tuple()
1082 .all(&txn)
1083 .await?;
1084
1085 let mut actor_locations = HashMap::new();
1086
1087 for (fragment_id, _, actor_id, _, worker_id, status) in &actors {
1088 if *status != ActorStatus::Running {
1089 tracing::warn!(
1090 "skipping actor {} in fragment {} with status {:?}",
1091 actor_id,
1092 fragment_id,
1093 status
1094 );
1095 continue;
1096 }
1097
1098 actor_locations
1099 .entry(*worker_id)
1100 .or_insert(HashMap::new())
1101 .entry(*fragment_id)
1102 .or_insert(BTreeSet::new())
1103 .insert(*actor_id);
1104 }
1105
1106 let expired_or_changed_workers: HashSet<_> =
1107 plan.keys().map(|k| k.worker_id() as WorkerId).collect();
1108
1109 let mut actor_migration_plan = HashMap::new();
1110 for (worker, fragment) in actor_locations {
1111 if expired_or_changed_workers.contains(&worker) {
1112 for (fragment_id, actors) in fragment {
1113 debug!(
1114 "worker {} expired or changed, migrating fragment {}",
1115 worker, fragment_id
1116 );
1117 let worker_slot_to_actor: HashMap<_, _> = actors
1118 .iter()
1119 .enumerate()
1120 .map(|(idx, actor_id)| {
1121 (WorkerSlotId::new(worker as _, idx as _), *actor_id)
1122 })
1123 .collect();
1124
1125 for (worker_slot, actor) in worker_slot_to_actor {
1126 if let Some(target) = plan.get(&worker_slot) {
1127 actor_migration_plan.insert(actor, target.worker_id() as WorkerId);
1128 }
1129 }
1130 }
1131 }
1132 }
1133
1134 for (actor, worker) in actor_migration_plan {
1135 Actor::update_many()
1136 .col_expr(
1137 actor::Column::WorkerId,
1138 Expr::value(Value::Int(Some(worker))),
1139 )
1140 .filter(actor::Column::ActorId.eq(actor))
1141 .exec(&txn)
1142 .await?;
1143 }
1144
1145 txn.commit().await?;
1146
1147 self.notify_fragment_mapping(
1148 NotificationOperation::Update,
1149 rebuild_fragment_mapping_from_actors(actors),
1150 )
1151 .await;
1152
1153 Ok(())
1154 }
1155
1156 pub async fn all_inuse_worker_slots(&self) -> MetaResult<HashSet<WorkerSlotId>> {
1157 let inner = self.inner.read().await;
1158
1159 let actors: Vec<(FragmentId, ActorId, WorkerId)> = Actor::find()
1160 .select_only()
1161 .columns([fragment::Column::FragmentId])
1162 .columns([actor::Column::ActorId, actor::Column::WorkerId])
1163 .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
1164 .into_tuple()
1165 .all(&inner.db)
1166 .await?;
1167
1168 let mut actor_locations = HashMap::new();
1169
1170 for (fragment_id, _, worker_id) in actors {
1171 *actor_locations
1172 .entry(worker_id)
1173 .or_insert(HashMap::new())
1174 .entry(fragment_id)
1175 .or_insert(0_usize) += 1;
1176 }
1177
1178 let mut result = HashSet::new();
1179 for (worker_id, mapping) in actor_locations {
1180 let max_fragment_len = mapping.values().max().unwrap();
1181
1182 result
1183 .extend((0..*max_fragment_len).map(|idx| WorkerSlotId::new(worker_id as u32, idx)))
1184 }
1185
1186 Ok(result)
1187 }
1188
1189 pub async fn all_node_actors(
1190 &self,
1191 include_inactive: bool,
1192 ) -> MetaResult<HashMap<WorkerId, Vec<StreamActor>>> {
1193 let inner = self.inner.read().await;
1194 let fragment_actors = if include_inactive {
1195 FragmentModel::find()
1196 .find_with_related(Actor)
1197 .all(&inner.db)
1198 .await?
1199 } else {
1200 FragmentModel::find()
1201 .find_with_related(Actor)
1202 .filter(actor::Column::Status.eq(ActorStatus::Running))
1203 .all(&inner.db)
1204 .await?
1205 };
1206
1207 let job_definitions = resolve_streaming_job_definition(
1208 &inner.db,
1209 &HashSet::from_iter(fragment_actors.iter().map(|(fragment, _)| fragment.job_id)),
1210 )
1211 .await?;
1212
1213 let mut node_actors = HashMap::new();
1214 for (fragment, actors) in fragment_actors {
1215 let job_id = fragment.job_id;
1216 let (table_fragments, actor_status, _) = Self::compose_fragment(
1217 fragment,
1218 actors,
1219 job_definitions.get(&(job_id as _)).cloned(),
1220 )?;
1221 for actor in table_fragments.actors {
1222 let node_id = actor_status[&actor.actor_id].worker_id() as WorkerId;
1223 node_actors
1224 .entry(node_id)
1225 .or_insert_with(Vec::new)
1226 .push(actor);
1227 }
1228 }
1229
1230 Ok(node_actors)
1231 }
1232
1233 pub async fn get_worker_actor_ids(
1234 &self,
1235 job_ids: Vec<ObjectId>,
1236 ) -> MetaResult<BTreeMap<WorkerId, Vec<ActorId>>> {
1237 let inner = self.inner.read().await;
1238 let actor_workers: Vec<(ActorId, WorkerId)> = Actor::find()
1239 .select_only()
1240 .columns([actor::Column::ActorId, actor::Column::WorkerId])
1241 .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
1242 .filter(fragment::Column::JobId.is_in(job_ids))
1243 .into_tuple()
1244 .all(&inner.db)
1245 .await?;
1246
1247 let mut worker_actors = BTreeMap::new();
1248 for (actor_id, worker_id) in actor_workers {
1249 worker_actors
1250 .entry(worker_id)
1251 .or_insert_with(Vec::new)
1252 .push(actor_id);
1253 }
1254
1255 Ok(worker_actors)
1256 }
1257
1258 pub async fn update_actor_splits(&self, split_assignment: &SplitAssignment) -> MetaResult<()> {
1259 let inner = self.inner.read().await;
1260 let txn = inner.db.begin().await?;
1261 for assignments in split_assignment.values() {
1262 for (actor_id, splits) in assignments {
1263 let actor_splits = splits.iter().map(Into::into).collect_vec();
1264 Actor::update(actor::ActiveModel {
1265 actor_id: Set(*actor_id as _),
1266 splits: Set(Some(ConnectorSplits::from(&PbConnectorSplits {
1267 splits: actor_splits,
1268 }))),
1269 ..Default::default()
1270 })
1271 .exec(&txn)
1272 .await
1273 .map_err(|err| {
1274 if err == DbErr::RecordNotUpdated {
1275 MetaError::catalog_id_not_found("actor_id", actor_id)
1276 } else {
1277 err.into()
1278 }
1279 })?;
1280 }
1281 }
1282 txn.commit().await?;
1283
1284 Ok(())
1285 }
1286
1287 pub async fn fill_snapshot_backfill_epoch(
1288 &self,
1289 fragment_ids: impl Iterator<Item = FragmentId>,
1290 snapshot_backfill_info: Option<&SnapshotBackfillInfo>,
1291 cross_db_snapshot_backfill_info: &SnapshotBackfillInfo,
1292 ) -> MetaResult<()> {
1293 let inner = self.inner.write().await;
1294 let txn = inner.db.begin().await?;
1295 for fragment_id in fragment_ids {
1296 let fragment = FragmentModel::find_by_id(fragment_id)
1297 .one(&txn)
1298 .await?
1299 .context(format!("fragment {} not found", fragment_id))?;
1300 let mut node = fragment.stream_node.to_protobuf();
1301 if crate::stream::fill_snapshot_backfill_epoch(
1302 &mut node,
1303 snapshot_backfill_info,
1304 cross_db_snapshot_backfill_info,
1305 )? {
1306 let node = StreamNode::from(&node);
1307 FragmentModel::update(fragment::ActiveModel {
1308 fragment_id: Set(fragment_id),
1309 stream_node: Set(node),
1310 ..Default::default()
1311 })
1312 .exec(&txn)
1313 .await?;
1314 }
1315 }
1316 txn.commit().await?;
1317 Ok(())
1318 }
1319
1320 pub async fn get_running_actors_of_fragment(
1322 &self,
1323 fragment_id: FragmentId,
1324 ) -> MetaResult<Vec<ActorId>> {
1325 let inner = self.inner.read().await;
1326 let actors: Vec<ActorId> = Actor::find()
1327 .select_only()
1328 .column(actor::Column::ActorId)
1329 .filter(actor::Column::FragmentId.eq(fragment_id))
1330 .filter(actor::Column::Status.eq(ActorStatus::Running))
1331 .into_tuple()
1332 .all(&inner.db)
1333 .await?;
1334 Ok(actors)
1335 }
1336
1337 pub async fn get_running_actors_for_source_backfill(
1340 &self,
1341 source_backfill_fragment_id: FragmentId,
1342 source_fragment_id: FragmentId,
1343 ) -> MetaResult<Vec<(ActorId, ActorId)>> {
1344 let inner = self.inner.read().await;
1345 let txn = inner.db.begin().await?;
1346
1347 let fragment_relation: DispatcherType = FragmentRelation::find()
1348 .select_only()
1349 .column(fragment_relation::Column::DispatcherType)
1350 .filter(fragment_relation::Column::SourceFragmentId.eq(source_fragment_id))
1351 .filter(fragment_relation::Column::TargetFragmentId.eq(source_backfill_fragment_id))
1352 .into_tuple()
1353 .one(&txn)
1354 .await?
1355 .ok_or_else(|| {
1356 anyhow!(
1357 "no fragment connection from source fragment {} to source backfill fragment {}",
1358 source_fragment_id,
1359 source_backfill_fragment_id
1360 )
1361 })?;
1362
1363 if fragment_relation != DispatcherType::NoShuffle {
1364 return Err(anyhow!("expect NoShuffle but get {:?}", fragment_relation).into());
1365 }
1366
1367 let load_fragment_distribution_type = |txn, fragment_id: FragmentId| async move {
1368 let result: MetaResult<DistributionType> = try {
1369 FragmentModel::find_by_id(fragment_id)
1370 .select_only()
1371 .column(fragment::Column::DistributionType)
1372 .into_tuple()
1373 .one(txn)
1374 .await?
1375 .ok_or_else(|| anyhow!("failed to find fragment: {}", fragment_id))?
1376 };
1377 result
1378 };
1379
1380 let source_backfill_distribution_type =
1381 load_fragment_distribution_type(&txn, source_backfill_fragment_id).await?;
1382 let source_distribution_type =
1383 load_fragment_distribution_type(&txn, source_fragment_id).await?;
1384
1385 let load_fragment_actor_distribution = |txn, fragment_id: FragmentId| async move {
1386 Actor::find()
1387 .select_only()
1388 .column(actor::Column::ActorId)
1389 .column(actor::Column::VnodeBitmap)
1390 .filter(actor::Column::FragmentId.eq(fragment_id))
1391 .into_tuple()
1392 .stream(txn)
1393 .await?
1394 .map(|result| {
1395 result.map(|(actor_id, vnode): (ActorId, Option<VnodeBitmap>)| {
1396 (
1397 actor_id as _,
1398 vnode.map(|bitmap| Bitmap::from(bitmap.to_protobuf())),
1399 )
1400 })
1401 })
1402 .try_collect()
1403 .await
1404 };
1405
1406 let source_backfill_actors: HashMap<crate::model::ActorId, Option<Bitmap>> =
1407 load_fragment_actor_distribution(&txn, source_backfill_fragment_id).await?;
1408
1409 let source_actors = load_fragment_actor_distribution(&txn, source_fragment_id).await?;
1410
1411 Ok(resolve_no_shuffle_actor_dispatcher(
1412 source_distribution_type,
1413 &source_actors,
1414 source_backfill_distribution_type,
1415 &source_backfill_actors,
1416 )
1417 .into_iter()
1418 .map(|(source_actor, source_backfill_actor)| {
1419 (source_backfill_actor as _, source_actor as _)
1420 })
1421 .collect())
1422 }
1423
1424 pub async fn get_actors_by_job_ids(&self, job_ids: Vec<ObjectId>) -> MetaResult<Vec<ActorId>> {
1425 let inner = self.inner.read().await;
1426 let actors: Vec<ActorId> = Actor::find()
1427 .select_only()
1428 .column(actor::Column::ActorId)
1429 .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
1430 .filter(fragment::Column::JobId.is_in(job_ids))
1431 .into_tuple()
1432 .all(&inner.db)
1433 .await?;
1434 Ok(actors)
1435 }
1436
1437 pub async fn get_root_fragments(
1450 &self,
1451 job_ids: Vec<ObjectId>,
1452 ) -> MetaResult<(HashMap<ObjectId, Fragment>, Vec<(ActorId, WorkerId)>)> {
1453 let inner = self.inner.read().await;
1454
1455 let job_definitions = resolve_streaming_job_definition(
1456 &inner.db,
1457 &HashSet::from_iter(job_ids.iter().copied()),
1458 )
1459 .await?;
1460
1461 let all_fragments = FragmentModel::find()
1462 .filter(fragment::Column::JobId.is_in(job_ids))
1463 .all(&inner.db)
1464 .await?;
1465 let mut root_fragments = HashMap::<ObjectId, fragment::Model>::new();
1467 for fragment in all_fragments {
1468 if (fragment.fragment_type_mask & PbFragmentTypeFlag::Mview as i32) != 0
1469 || (fragment.fragment_type_mask & PbFragmentTypeFlag::Sink as i32) != 0
1470 {
1471 _ = root_fragments.insert(fragment.job_id, fragment);
1472 } else if fragment.fragment_type_mask & PbFragmentTypeFlag::Source as i32 != 0 {
1473 _ = root_fragments.try_insert(fragment.job_id, fragment);
1476 }
1477 }
1478
1479 let mut root_fragments_pb = HashMap::new();
1480 for (_, fragment) in root_fragments {
1481 let actors = fragment.find_related(Actor).all(&inner.db).await?;
1482
1483 let job_id = fragment.job_id;
1484 root_fragments_pb.insert(
1485 fragment.job_id,
1486 Self::compose_fragment(
1487 fragment,
1488 actors,
1489 job_definitions.get(&(job_id as _)).cloned(),
1490 )?
1491 .0,
1492 );
1493 }
1494
1495 let actors: Vec<(ActorId, WorkerId)> = Actor::find()
1496 .select_only()
1497 .columns([actor::Column::ActorId, actor::Column::WorkerId])
1498 .into_tuple()
1499 .all(&inner.db)
1500 .await?;
1501
1502 Ok((root_fragments_pb, actors))
1503 }
1504
1505 pub async fn get_root_fragment(
1506 &self,
1507 job_id: ObjectId,
1508 ) -> MetaResult<(Fragment, Vec<(ActorId, WorkerId)>)> {
1509 let (mut root_fragments, actors) = self.get_root_fragments(vec![job_id]).await?;
1510 let root_fragment = root_fragments
1511 .remove(&job_id)
1512 .context(format!("root fragment for job {} not found", job_id))?;
1513 Ok((root_fragment, actors))
1514 }
1515
1516 pub async fn get_downstream_fragments(
1518 &self,
1519 job_id: ObjectId,
1520 ) -> MetaResult<(Vec<(DispatchStrategy, Fragment)>, Vec<(ActorId, WorkerId)>)> {
1521 let (root_fragment, actors) = self.get_root_fragment(job_id).await?;
1522
1523 let inner = self.inner.read().await;
1524 let downstream_fragment_relations: Vec<fragment_relation::Model> = FragmentRelation::find()
1525 .filter(
1526 fragment_relation::Column::SourceFragmentId
1527 .eq(root_fragment.fragment_id as FragmentId),
1528 )
1529 .all(&inner.db)
1530 .await?;
1531 let job_definition = resolve_streaming_job_definition(&inner.db, &HashSet::from([job_id]))
1532 .await?
1533 .remove(&job_id);
1534
1535 let mut downstream_fragments = vec![];
1536 for fragment_relation::Model {
1537 target_fragment_id: fragment_id,
1538 dispatcher_type,
1539 dist_key_indices,
1540 output_indices,
1541 ..
1542 } in downstream_fragment_relations
1543 {
1544 let mut fragment_actors = FragmentModel::find_by_id(fragment_id)
1545 .find_with_related(Actor)
1546 .all(&inner.db)
1547 .await?;
1548 if fragment_actors.is_empty() {
1549 bail!("No fragment found for fragment id {}", fragment_id);
1550 }
1551 assert_eq!(fragment_actors.len(), 1);
1552 let (fragment, actors) = fragment_actors.pop().unwrap();
1553 let dispatch_strategy = DispatchStrategy {
1554 r#type: PbDispatcherType::from(dispatcher_type) as _,
1555 dist_key_indices: dist_key_indices.into_u32_array(),
1556 output_indices: output_indices.into_u32_array(),
1557 };
1558 let fragment = Self::compose_fragment(fragment, actors, job_definition.clone())?.0;
1559 downstream_fragments.push((dispatch_strategy, fragment));
1560 }
1561
1562 Ok((downstream_fragments, actors))
1563 }
1564
1565 pub async fn load_source_fragment_ids(
1566 &self,
1567 ) -> MetaResult<HashMap<SourceId, BTreeSet<FragmentId>>> {
1568 let inner = self.inner.read().await;
1569 let mut fragments: Vec<(FragmentId, i32, StreamNode)> = FragmentModel::find()
1570 .select_only()
1571 .columns([
1572 fragment::Column::FragmentId,
1573 fragment::Column::FragmentTypeMask,
1574 fragment::Column::StreamNode,
1575 ])
1576 .into_tuple()
1577 .all(&inner.db)
1578 .await?;
1579 fragments.retain(|(_, mask, _)| *mask & PbFragmentTypeFlag::Source as i32 != 0);
1580
1581 let mut source_fragment_ids = HashMap::new();
1582 for (fragment_id, _, stream_node) in fragments {
1583 if let Some(source_id) = stream_node.to_protobuf().find_stream_source() {
1584 source_fragment_ids
1585 .entry(source_id as SourceId)
1586 .or_insert_with(BTreeSet::new)
1587 .insert(fragment_id);
1588 }
1589 }
1590 Ok(source_fragment_ids)
1591 }
1592
1593 pub async fn load_backfill_fragment_ids(
1594 &self,
1595 ) -> MetaResult<HashMap<SourceId, BTreeSet<(FragmentId, u32)>>> {
1596 let inner = self.inner.read().await;
1597 let mut fragments: Vec<(FragmentId, i32, StreamNode)> = FragmentModel::find()
1598 .select_only()
1599 .columns([
1600 fragment::Column::FragmentId,
1601 fragment::Column::FragmentTypeMask,
1602 fragment::Column::StreamNode,
1603 ])
1604 .into_tuple()
1605 .all(&inner.db)
1606 .await?;
1607 fragments.retain(|(_, mask, _)| *mask & PbFragmentTypeFlag::SourceScan as i32 != 0);
1608
1609 let mut source_fragment_ids = HashMap::new();
1610 for (fragment_id, _, stream_node) in fragments {
1611 if let Some((source_id, upstream_source_fragment_id)) =
1612 stream_node.to_protobuf().find_source_backfill()
1613 {
1614 source_fragment_ids
1615 .entry(source_id as SourceId)
1616 .or_insert_with(BTreeSet::new)
1617 .insert((fragment_id, upstream_source_fragment_id));
1618 }
1619 }
1620 Ok(source_fragment_ids)
1621 }
1622
1623 pub async fn load_actor_splits(&self) -> MetaResult<HashMap<ActorId, ConnectorSplits>> {
1624 let inner = self.inner.read().await;
1625 let splits: Vec<(ActorId, ConnectorSplits)> = Actor::find()
1626 .select_only()
1627 .columns([actor::Column::ActorId, actor::Column::Splits])
1628 .filter(actor::Column::Splits.is_not_null())
1629 .into_tuple()
1630 .all(&inner.db)
1631 .await?;
1632 Ok(splits.into_iter().collect())
1633 }
1634
1635 pub async fn get_actual_job_fragment_parallelism(
1637 &self,
1638 job_id: ObjectId,
1639 ) -> MetaResult<Option<usize>> {
1640 let inner = self.inner.read().await;
1641 let mut fragments: Vec<(FragmentId, i32, i64)> = FragmentModel::find()
1642 .join(JoinType::InnerJoin, fragment::Relation::Actor.def())
1643 .select_only()
1644 .columns([
1645 fragment::Column::FragmentId,
1646 fragment::Column::FragmentTypeMask,
1647 ])
1648 .column_as(actor::Column::ActorId.count(), "count")
1649 .filter(fragment::Column::JobId.eq(job_id))
1650 .group_by(fragment::Column::FragmentId)
1651 .into_tuple()
1652 .all(&inner.db)
1653 .await?;
1654
1655 fragments.retain(|(_, mask, _)| {
1656 *mask & PbFragmentTypeFlag::Mview as i32 != 0
1657 || *mask & PbFragmentTypeFlag::Sink as i32 != 0
1658 });
1659
1660 Ok(fragments
1661 .into_iter()
1662 .at_most_one()
1663 .ok()
1664 .flatten()
1665 .map(|(_, _, count)| count as usize))
1666 }
1667}
1668
#[cfg(test)]
mod tests {
    use std::collections::{BTreeMap, HashMap, HashSet};

    use itertools::Itertools;
    use risingwave_common::hash::{ActorMapping, VirtualNode, VnodeCount};
    use risingwave_common::util::iter_util::ZipEqDebug;
    use risingwave_common::util::stream_graph_visitor::visit_stream_node;
    use risingwave_meta_model::actor::ActorStatus;
    use risingwave_meta_model::fragment::DistributionType;
    use risingwave_meta_model::{
        ActorId, ConnectorSplits, ExprContext, FragmentId, I32Array, ObjectId, StreamNode, TableId,
        VnodeBitmap, actor, fragment,
    };
    use risingwave_pb::common::PbActorLocation;
    use risingwave_pb::meta::table_fragments::PbActorStatus;
    use risingwave_pb::meta::table_fragments::actor_status::PbActorState;
    use risingwave_pb::meta::table_fragments::fragment::PbFragmentDistributionType;
    use risingwave_pb::plan_common::PbExprContext;
    use risingwave_pb::source::{PbConnectorSplit, PbConnectorSplits};
    use risingwave_pb::stream_plan::stream_node::PbNodeBody;
    use risingwave_pb::stream_plan::{MergeNode, PbFragmentTypeFlag, PbStreamNode, PbUnionNode};

    use crate::MetaResult;
    use crate::controller::catalog::CatalogController;
    use crate::model::{ActorUpstreams, Fragment, FragmentActorUpstreams, StreamActor};

    const TEST_FRAGMENT_ID: FragmentId = 1;

    const TEST_UPSTREAM_FRAGMENT_ID: FragmentId = 2;

    const TEST_JOB_ID: ObjectId = 1;

    const TEST_STATE_TABLE_ID: TableId = 1000;

    /// Builds the upstreams of `actor_id`.
    ///
    /// The map has two entries: upstream actor `actor_id + 100` in `TEST_UPSTREAM_FRAGMENT_ID`,
    /// and upstream actor `actor_id + 200` in the fragment right after it.
    fn generate_upstream_actor_ids_for_actor(actor_id: u32) -> ActorUpstreams {
        BTreeMap::from([
            (
                TEST_UPSTREAM_FRAGMENT_ID as crate::model::FragmentId,
                HashSet::from_iter([actor_id + 100]),
            ),
            (
                (TEST_UPSTREAM_FRAGMENT_ID + 1) as _,
                HashSet::from_iter([actor_id + 200]),
            ),
        ])
    }

    /// Builds a `Union` node with one `Merge` input per upstream fragment of the actor.
    fn generate_merger_stream_node(actor_upstream_actor_ids: &ActorUpstreams) -> PbStreamNode {
        let merge_inputs = actor_upstream_actor_ids
            .keys()
            .map(|upstream_fragment_id| PbStreamNode {
                node_body: Some(PbNodeBody::Merge(Box::new(MergeNode {
                    upstream_fragment_id: *upstream_fragment_id as _,
                    ..Default::default()
                }))),
                ..Default::default()
            })
            .collect_vec();

        PbStreamNode {
            input: merge_inputs,
            node_body: Some(PbNodeBody::Union(PbUnionNode {})),
            ..Default::default()
        }
    }

    #[tokio::test]
    async fn test_extract_fragment() -> MetaResult<()> {
        let actor_count = 3u32;

        let upstream_actor_ids: FragmentActorUpstreams = (0..actor_count)
            .map(|actor_id| {
                let upstreams = generate_upstream_actor_ids_for_actor(actor_id);
                (actor_id as _, upstreams)
            })
            .collect();

        let actor_bitmaps = ActorMapping::new_uniform(
            (0..actor_count).map(|i| i as _),
            VirtualNode::COUNT_FOR_TEST,
        )
        .to_bitmaps();

        // Every actor shares the same plan, so any actor's upstreams describe the merger node.
        let any_upstreams = upstream_actor_ids.values().next().unwrap();
        let stream_node = generate_merger_stream_node(any_upstreams);

        let expr_context = PbExprContext {
            time_zone: String::from("America/New_York"),
            strict_mode: false,
        };
        let pb_actors = (0..actor_count)
            .map(|actor_id| StreamActor {
                actor_id: actor_id as _,
                fragment_id: TEST_FRAGMENT_ID as _,
                vnode_bitmap: actor_bitmaps.get(&actor_id).cloned(),
                mview_definition: "".to_owned(),
                expr_context: Some(expr_context.clone()),
            })
            .collect_vec();

        let pb_fragment = Fragment {
            fragment_id: TEST_FRAGMENT_ID as _,
            fragment_type_mask: PbFragmentTypeFlag::Source as _,
            distribution_type: PbFragmentDistributionType::Hash as _,
            actors: pb_actors.clone(),
            state_table_ids: vec![TEST_STATE_TABLE_ID as _],
            maybe_vnode_count: VnodeCount::for_test().to_protobuf(),
            nodes: stream_node.clone(),
        };

        let pb_actor_status = (0..actor_count)
            .map(|actor_id| {
                let status = PbActorStatus {
                    location: PbActorLocation::from_worker(0),
                    state: PbActorState::Running as _,
                };
                (actor_id, status)
            })
            .collect();

        let pb_actor_splits = Default::default();

        let (fragment, actors) = CatalogController::extract_fragment_and_actors_for_new_job(
            TEST_JOB_ID,
            &pb_fragment,
            &pb_actor_status,
            &pb_actor_splits,
        )?;

        check_fragment(fragment, pb_fragment);
        check_actors(
            actors,
            &upstream_actor_ids,
            pb_actors,
            Default::default(),
            &stream_node,
        );

        Ok(())
    }

    #[tokio::test]
    async fn test_compose_fragment() -> MetaResult<()> {
        let actor_count = 3u32;

        let upstream_actor_ids: FragmentActorUpstreams = (0..actor_count)
            .map(|actor_id| {
                let upstreams = generate_upstream_actor_ids_for_actor(actor_id);
                (actor_id as _, upstreams)
            })
            .collect();

        let mut actor_bitmaps = ActorMapping::new_uniform(
            (0..actor_count).map(|i| i as _),
            VirtualNode::COUNT_FOR_TEST,
        )
        .to_bitmaps();

        let mut actors = Vec::with_capacity(actor_count as usize);
        for actor_id in 0..actor_count {
            let actor_splits = Some(ConnectorSplits::from(&PbConnectorSplits {
                splits: vec![PbConnectorSplit {
                    split_type: "dummy".to_owned(),
                    ..Default::default()
                }],
            }));
            let vnode_bitmap = actor_bitmaps
                .remove(&actor_id)
                .map(|bitmap| bitmap.to_protobuf())
                .as_ref()
                .map(VnodeBitmap::from);

            #[expect(deprecated)]
            let model = actor::Model {
                actor_id: actor_id as ActorId,
                fragment_id: TEST_FRAGMENT_ID,
                status: ActorStatus::Running,
                splits: actor_splits,
                worker_id: 0,
                upstream_actor_ids: Default::default(),
                vnode_bitmap,
                expr_context: ExprContext::from(&PbExprContext {
                    time_zone: String::from("America/New_York"),
                    strict_mode: false,
                }),
            };
            actors.push(model);
        }

        // The fragment's plan is derived from the first actor's upstreams.
        let stream_node = {
            let template_actor_id = actors.first().unwrap().actor_id;
            let template_upstreams = upstream_actor_ids
                .get(&(template_actor_id as _))
                .unwrap();
            generate_merger_stream_node(template_upstreams)
        };

        #[expect(deprecated)]
        let fragment = fragment::Model {
            fragment_id: TEST_FRAGMENT_ID,
            job_id: TEST_JOB_ID,
            fragment_type_mask: 0,
            distribution_type: DistributionType::Hash,
            stream_node: StreamNode::from(&stream_node),
            state_table_ids: I32Array(vec![TEST_STATE_TABLE_ID]),
            upstream_fragment_id: Default::default(),
            vnode_count: VirtualNode::COUNT_FOR_TEST as _,
        };

        let (pb_fragment, pb_actor_status, pb_actor_splits) =
            CatalogController::compose_fragment(fragment.clone(), actors.clone(), None).unwrap();

        assert_eq!(pb_actor_status.len(), actor_count as usize);
        assert_eq!(pb_actor_splits.len(), actor_count as usize);

        let pb_actors = pb_fragment.actors.clone();

        check_fragment(fragment, pb_fragment);
        check_actors(
            actors,
            &upstream_actor_ids,
            pb_actors,
            pb_actor_splits,
            &stream_node,
        );

        Ok(())
    }

    /// Asserts that each catalog actor model agrees with the corresponding `StreamActor`.
    ///
    /// The two lists are matched pairwise, and their lengths are checked to be equal in debug
    /// builds. Also checks that every `Merge` node in `stream_node` names an upstream fragment
    /// of the actor.
    fn check_actors(
        actors: Vec<actor::Model>,
        actor_upstreams: &FragmentActorUpstreams,
        pb_actors: Vec<StreamActor>,
        pb_actor_splits: HashMap<u32, PbConnectorSplits>,
        stream_node: &PbStreamNode,
    ) {
        for (model, pb_actor) in actors.into_iter().zip_eq_debug(pb_actors.into_iter()) {
            let actor::Model {
                actor_id,
                fragment_id,
                status,
                splits,
                worker_id: _,
                vnode_bitmap,
                expr_context,
                ..
            } = model;
            let StreamActor {
                actor_id: pb_actor_id,
                fragment_id: pb_fragment_id,
                vnode_bitmap: pb_vnode_bitmap,
                mview_definition,
                expr_context: pb_expr_context,
                ..
            } = pb_actor;

            assert_eq!(actor_id, pb_actor_id as ActorId);
            assert_eq!(fragment_id, pb_fragment_id as FragmentId);

            let expected_bitmap = vnode_bitmap.map(|bitmap| bitmap.to_protobuf().into());
            assert_eq!(expected_bitmap, pb_vnode_bitmap);

            assert_eq!(mview_definition, "");

            visit_stream_node(stream_node, |body| {
                let PbNodeBody::Merge(merge) = body else {
                    return;
                };
                let upstreams = actor_upstreams.get(&(actor_id as _)).unwrap();
                assert!(upstreams.contains_key(&merge.upstream_fragment_id));
            });

            assert_eq!(status, ActorStatus::Running);

            let expected_splits = pb_actor_splits.get(&pb_actor_id).map(ConnectorSplits::from);
            assert_eq!(splits, expected_splits);

            assert_eq!(Some(expr_context.to_protobuf()), pb_expr_context);
        }
    }

    /// Asserts that a catalog fragment model agrees with the protobuf-side `Fragment`.
    ///
    /// The destructuring is exhaustive on purpose: adding a field to `Fragment` forces this
    /// check to be revisited.
    fn check_fragment(fragment: fragment::Model, pb_fragment: Fragment) {
        let Fragment {
            fragment_id,
            fragment_type_mask,
            distribution_type: pb_distribution_type,
            actors: _,
            state_table_ids: pb_state_table_ids,
            maybe_vnode_count: _,
            nodes,
        } = pb_fragment;

        assert_eq!(fragment_id, TEST_FRAGMENT_ID as u32);
        assert_eq!(fragment_type_mask, fragment.fragment_type_mask as u32);

        let expected_distribution = PbFragmentDistributionType::from(fragment.distribution_type);
        assert_eq!(pb_distribution_type, expected_distribution);

        let expected_state_tables = fragment.state_table_ids.into_u32_array();
        assert_eq!(pb_state_table_ids, expected_state_tables);

        assert_eq!(fragment.stream_node.to_protobuf(), nodes);
    }
}