risingwave_meta/controller/
scale.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::ops::{BitAnd, BitOrAssign};
17
18use itertools::Itertools;
19use risingwave_common::bitmap::Bitmap;
20use risingwave_connector::source::{SplitImpl, SplitMetaData};
21use risingwave_meta_model::actor::ActorStatus;
22use risingwave_meta_model::fragment::DistributionType;
23use risingwave_meta_model::prelude::{
24    Actor, Fragment, FragmentRelation, Sink, Source, StreamingJob, Table,
25};
26use risingwave_meta_model::{
27    ConnectorSplits, DispatcherType, FragmentId, ObjectId, VnodeBitmap, actor, fragment,
28    fragment_relation, sink, source, streaming_job, table,
29};
30use risingwave_meta_model_migration::{
31    Alias, CommonTableExpression, Expr, IntoColumnRef, QueryStatementBuilder, SelectStatement,
32    UnionType, WithClause, WithQuery,
33};
34use risingwave_pb::stream_plan::PbDispatcher;
35use sea_orm::{
36    ColumnTrait, ConnectionTrait, DbErr, DerivePartialModel, EntityTrait, FromQueryResult,
37    QueryFilter, QuerySelect, Statement, TransactionTrait,
38};
39
40use crate::controller::catalog::CatalogController;
41use crate::controller::utils::{get_existing_job_resource_group, get_fragment_actor_dispatchers};
42use crate::model::ActorId;
43use crate::{MetaError, MetaResult};
44
/// Constructs a query that uses a recursive CTE to find the `no_shuffle` upstream relation graph of the target fragments.
46///
47/// # Examples
48///
49/// ```
50/// use risingwave_meta::controller::scale::construct_no_shuffle_upstream_traverse_query;
51/// use sea_orm::sea_query::*;
52/// use sea_orm::*;
53///
54/// let query = construct_no_shuffle_upstream_traverse_query(vec![2, 3]);
55///
56/// assert_eq!(query.to_string(MysqlQueryBuilder), r#"WITH RECURSIVE `shuffle_deps` (`source_fragment_id`, `dispatcher_type`, `target_fragment_id`) AS (SELECT DISTINCT `fragment_relation`.`source_fragment_id`, `fragment_relation`.`dispatcher_type`, `fragment_relation`.`target_fragment_id` FROM `fragment_relation` WHERE `fragment_relation`.`dispatcher_type` = 'NO_SHUFFLE' AND `fragment_relation`.`target_fragment_id` IN (2, 3) UNION ALL (SELECT `fragment_relation`.`source_fragment_id`, `fragment_relation`.`dispatcher_type`, `fragment_relation`.`target_fragment_id` FROM `fragment_relation` INNER JOIN `shuffle_deps` ON `shuffle_deps`.`source_fragment_id` = `fragment_relation`.`target_fragment_id` WHERE `fragment_relation`.`dispatcher_type` = 'NO_SHUFFLE')) SELECT DISTINCT `source_fragment_id`, `dispatcher_type`, `target_fragment_id` FROM `shuffle_deps`"#);
57/// assert_eq!(query.to_string(PostgresQueryBuilder), r#"WITH RECURSIVE "shuffle_deps" ("source_fragment_id", "dispatcher_type", "target_fragment_id") AS (SELECT DISTINCT "fragment_relation"."source_fragment_id", "fragment_relation"."dispatcher_type", "fragment_relation"."target_fragment_id" FROM "fragment_relation" WHERE "fragment_relation"."dispatcher_type" = 'NO_SHUFFLE' AND "fragment_relation"."target_fragment_id" IN (2, 3) UNION ALL (SELECT "fragment_relation"."source_fragment_id", "fragment_relation"."dispatcher_type", "fragment_relation"."target_fragment_id" FROM "fragment_relation" INNER JOIN "shuffle_deps" ON "shuffle_deps"."source_fragment_id" = "fragment_relation"."target_fragment_id" WHERE "fragment_relation"."dispatcher_type" = 'NO_SHUFFLE')) SELECT DISTINCT "source_fragment_id", "dispatcher_type", "target_fragment_id" FROM "shuffle_deps""#);
58/// assert_eq!(query.to_string(SqliteQueryBuilder), r#"WITH RECURSIVE "shuffle_deps" ("source_fragment_id", "dispatcher_type", "target_fragment_id") AS (SELECT DISTINCT "fragment_relation"."source_fragment_id", "fragment_relation"."dispatcher_type", "fragment_relation"."target_fragment_id" FROM "fragment_relation" WHERE "fragment_relation"."dispatcher_type" = 'NO_SHUFFLE' AND "fragment_relation"."target_fragment_id" IN (2, 3) UNION ALL SELECT "fragment_relation"."source_fragment_id", "fragment_relation"."dispatcher_type", "fragment_relation"."target_fragment_id" FROM "fragment_relation" INNER JOIN "shuffle_deps" ON "shuffle_deps"."source_fragment_id" = "fragment_relation"."target_fragment_id" WHERE "fragment_relation"."dispatcher_type" = 'NO_SHUFFLE') SELECT DISTINCT "source_fragment_id", "dispatcher_type", "target_fragment_id" FROM "shuffle_deps""#);
59/// ```
60pub fn construct_no_shuffle_upstream_traverse_query(fragment_ids: Vec<FragmentId>) -> WithQuery {
61    construct_no_shuffle_traverse_query_helper(fragment_ids, NoShuffleResolveDirection::Upstream)
62}
63
64pub fn construct_no_shuffle_downstream_traverse_query(fragment_ids: Vec<FragmentId>) -> WithQuery {
65    construct_no_shuffle_traverse_query_helper(fragment_ids, NoShuffleResolveDirection::Downstream)
66}
67
/// Direction in which the `NO_SHUFFLE` relation graph is traversed.
enum NoShuffleResolveDirection {
    /// Seed fragments are matched as relation targets; edges are followed to their sources.
    Upstream,
    /// Seed fragments are matched as relation sources; edges are followed to their targets.
    Downstream,
}
72
73fn construct_no_shuffle_traverse_query_helper(
74    fragment_ids: Vec<FragmentId>,
75    direction: NoShuffleResolveDirection,
76) -> WithQuery {
77    let cte_alias = Alias::new("shuffle_deps");
78
79    // If we need to look upwards
80    //     resolve by upstream fragment_id -> downstream fragment_id
81    // and if downwards
82    //     resolve by downstream fragment_id -> upstream fragment_id
83    let (cte_ref_column, compared_column) = match direction {
84        NoShuffleResolveDirection::Upstream => (
85            (
86                cte_alias.clone(),
87                fragment_relation::Column::SourceFragmentId,
88            )
89                .into_column_ref(),
90            (
91                FragmentRelation,
92                fragment_relation::Column::TargetFragmentId,
93            )
94                .into_column_ref(),
95        ),
96        NoShuffleResolveDirection::Downstream => (
97            (
98                cte_alias.clone(),
99                fragment_relation::Column::TargetFragmentId,
100            )
101                .into_column_ref(),
102            (
103                FragmentRelation,
104                fragment_relation::Column::SourceFragmentId,
105            )
106                .into_column_ref(),
107        ),
108    };
109
110    let mut base_query = SelectStatement::new()
111        .column((
112            FragmentRelation,
113            fragment_relation::Column::SourceFragmentId,
114        ))
115        .column((FragmentRelation, fragment_relation::Column::DispatcherType))
116        .column((
117            FragmentRelation,
118            fragment_relation::Column::TargetFragmentId,
119        ))
120        .distinct()
121        .from(FragmentRelation)
122        .and_where(
123            Expr::col((FragmentRelation, fragment_relation::Column::DispatcherType))
124                .eq(DispatcherType::NoShuffle),
125        )
126        .and_where(Expr::col(compared_column.clone()).is_in(fragment_ids))
127        .to_owned();
128
129    let cte_referencing = SelectStatement::new()
130        .column((
131            FragmentRelation,
132            fragment_relation::Column::SourceFragmentId,
133        ))
134        .column((FragmentRelation, fragment_relation::Column::DispatcherType))
135        .column((
136            FragmentRelation,
137            fragment_relation::Column::TargetFragmentId,
138        ))
139        // NOTE: Uncomment me once MySQL supports DISTINCT in the recursive block of CTE.
140        //.distinct()
141        .from(FragmentRelation)
142        .inner_join(
143            cte_alias.clone(),
144            Expr::col(cte_ref_column).eq(Expr::col(compared_column)),
145        )
146        .and_where(
147            Expr::col((FragmentRelation, fragment_relation::Column::DispatcherType))
148                .eq(DispatcherType::NoShuffle),
149        )
150        .to_owned();
151
152    let mut common_table_expr = CommonTableExpression::new();
153    common_table_expr
154        .query(base_query.union(UnionType::All, cte_referencing).to_owned())
155        .column(fragment_relation::Column::SourceFragmentId)
156        .column(fragment_relation::Column::DispatcherType)
157        .column(fragment_relation::Column::TargetFragmentId)
158        .table_name(cte_alias.clone());
159
160    SelectStatement::new()
161        .column(fragment_relation::Column::SourceFragmentId)
162        .column(fragment_relation::Column::DispatcherType)
163        .column(fragment_relation::Column::TargetFragmentId)
164        .distinct()
165        .from(cte_alias)
166        .to_owned()
167        .with(
168            WithClause::new()
169                .recursive(true)
170                .cte(common_table_expr)
171                .to_owned(),
172        )
173}
174
/// Everything loaded from the meta store that is needed to plan a reschedule.
#[derive(Debug, Clone)]
pub struct RescheduleWorkingSet {
    /// Fragment models, keyed by fragment id.
    pub fragments: HashMap<FragmentId, fragment::Model>,
    /// Actor models, keyed by actor id.
    pub actors: HashMap<ActorId, actor::Model>,
    /// Dispatchers of each actor, keyed by actor id.
    pub actor_dispatchers: HashMap<ActorId, Vec<PbDispatcher>>,

    /// For each source fragment, its downstream (target) fragments and the dispatcher type of the relation.
    pub fragment_downstreams: HashMap<FragmentId, HashMap<FragmentId, DispatcherType>>,
    /// For each target fragment, its upstream (source) fragments and the dispatcher type of the relation.
    pub fragment_upstreams: HashMap<FragmentId, HashMap<FragmentId, DispatcherType>>,

    /// Resource group of each related job, keyed by job id.
    pub job_resource_groups: HashMap<ObjectId, String>,
    /// Related streaming jobs keyed by job id, each paired with its definition string.
    pub related_jobs: HashMap<ObjectId, (streaming_job::Model, String)>,
}
187
188async fn resolve_no_shuffle_query<C>(
189    txn: &C,
190    query: WithQuery,
191) -> MetaResult<Vec<(FragmentId, DispatcherType, FragmentId)>>
192where
193    C: ConnectionTrait,
194{
195    let (sql, values) = query.build_any(&*txn.get_database_backend().get_query_builder());
196
197    let result = txn
198        .query_all(Statement::from_sql_and_values(
199            txn.get_database_backend(),
200            sql,
201            values,
202        ))
203        .await?
204        .into_iter()
205        .map(|res| res.try_get_many_by_index())
206        .collect::<Result<Vec<(FragmentId, DispatcherType, FragmentId)>, DbErr>>()
207        .map_err(MetaError::from)?;
208
209    Ok(result)
210}
211
212pub(crate) async fn resolve_streaming_job_definition<C>(
213    txn: &C,
214    job_ids: &HashSet<ObjectId>,
215) -> MetaResult<HashMap<ObjectId, String>>
216where
217    C: ConnectionTrait,
218{
219    let job_ids = job_ids.iter().cloned().collect_vec();
220
221    // including table, materialized view, index
222    let common_job_definitions: Vec<(ObjectId, String)> = Table::find()
223        .select_only()
224        .columns([
225            table::Column::TableId,
226            #[cfg(not(debug_assertions))]
227            table::Column::Name,
228            #[cfg(debug_assertions)]
229            table::Column::Definition,
230        ])
231        .filter(table::Column::TableId.is_in(job_ids.clone()))
232        .into_tuple()
233        .all(txn)
234        .await?;
235
236    let sink_definitions: Vec<(ObjectId, String)> = Sink::find()
237        .select_only()
238        .columns([
239            sink::Column::SinkId,
240            #[cfg(not(debug_assertions))]
241            sink::Column::Name,
242            #[cfg(debug_assertions)]
243            sink::Column::Definition,
244        ])
245        .filter(sink::Column::SinkId.is_in(job_ids.clone()))
246        .into_tuple()
247        .all(txn)
248        .await?;
249
250    let source_definitions: Vec<(ObjectId, String)> = Source::find()
251        .select_only()
252        .columns([
253            source::Column::SourceId,
254            #[cfg(not(debug_assertions))]
255            source::Column::Name,
256            #[cfg(debug_assertions)]
257            source::Column::Definition,
258        ])
259        .filter(source::Column::SourceId.is_in(job_ids.clone()))
260        .into_tuple()
261        .all(txn)
262        .await?;
263
264    let definitions: HashMap<ObjectId, String> = common_job_definitions
265        .into_iter()
266        .chain(sink_definitions.into_iter())
267        .chain(source_definitions.into_iter())
268        .collect();
269
270    Ok(definitions)
271}
272
273impl CatalogController {
274    pub async fn resolve_working_set_for_reschedule_fragments(
275        &self,
276        fragment_ids: Vec<FragmentId>,
277    ) -> MetaResult<RescheduleWorkingSet> {
278        let inner = self.inner.read().await;
279        self.resolve_working_set_for_reschedule_helper(&inner.db, fragment_ids)
280            .await
281    }
282
283    pub async fn resolve_working_set_for_reschedule_tables(
284        &self,
285        table_ids: Vec<ObjectId>,
286    ) -> MetaResult<RescheduleWorkingSet> {
287        let inner = self.inner.read().await;
288        let txn = inner.db.begin().await?;
289
290        let fragment_ids: Vec<FragmentId> = Fragment::find()
291            .select_only()
292            .column(fragment::Column::FragmentId)
293            .filter(fragment::Column::JobId.is_in(table_ids))
294            .into_tuple()
295            .all(&txn)
296            .await?;
297
298        self.resolve_working_set_for_reschedule_helper(&txn, fragment_ids)
299            .await
300    }
301
302    pub async fn resolve_working_set_for_reschedule_helper<C>(
303        &self,
304        txn: &C,
305        fragment_ids: Vec<FragmentId>,
306    ) -> MetaResult<RescheduleWorkingSet>
307    where
308        C: ConnectionTrait,
309    {
310        // NO_SHUFFLE related multi-layer upstream fragments
311        let no_shuffle_related_upstream_fragment_ids = resolve_no_shuffle_query(
312            txn,
313            construct_no_shuffle_upstream_traverse_query(fragment_ids.clone()),
314        )
315        .await?;
316
317        // NO_SHUFFLE related multi-layer downstream fragments
318        let no_shuffle_related_downstream_fragment_ids = resolve_no_shuffle_query(
319            txn,
320            construct_no_shuffle_downstream_traverse_query(
321                no_shuffle_related_upstream_fragment_ids
322                    .iter()
323                    .map(|(src, _, _)| *src)
324                    .chain(fragment_ids.iter().cloned())
325                    .unique()
326                    .collect(),
327            ),
328        )
329        .await?;
330
331        // We need to identify all other types of dispatchers that are Leaves in the NO_SHUFFLE dependency tree.
332        let extended_fragment_ids: HashSet<_> = no_shuffle_related_upstream_fragment_ids
333            .iter()
334            .chain(no_shuffle_related_downstream_fragment_ids.iter())
335            .flat_map(|(src, _, dst)| [*src, *dst])
336            .chain(fragment_ids.iter().cloned())
337            .collect();
338
339        let query = FragmentRelation::find()
340            .select_only()
341            .column(fragment_relation::Column::SourceFragmentId)
342            .column(fragment_relation::Column::DispatcherType)
343            .column(fragment_relation::Column::TargetFragmentId)
344            .distinct();
345
346        // single-layer upstream fragment ids
347        let upstream_fragments: Vec<(FragmentId, DispatcherType, FragmentId)> = query
348            .clone()
349            .filter(
350                fragment_relation::Column::TargetFragmentId.is_in(extended_fragment_ids.clone()),
351            )
352            .into_tuple()
353            .all(txn)
354            .await?;
355
356        // single-layer downstream fragment ids
357        let downstream_fragments: Vec<(FragmentId, DispatcherType, FragmentId)> = query
358            .clone()
359            .filter(
360                fragment_relation::Column::SourceFragmentId.is_in(extended_fragment_ids.clone()),
361            )
362            .into_tuple()
363            .all(txn)
364            .await?;
365
366        let all_fragment_relations: HashSet<_> = no_shuffle_related_upstream_fragment_ids
367            .into_iter()
368            .chain(no_shuffle_related_downstream_fragment_ids.into_iter())
369            .chain(upstream_fragments.into_iter())
370            .chain(downstream_fragments.into_iter())
371            .collect();
372
373        let mut fragment_upstreams: HashMap<FragmentId, HashMap<FragmentId, DispatcherType>> =
374            HashMap::new();
375        let mut fragment_downstreams: HashMap<FragmentId, HashMap<FragmentId, DispatcherType>> =
376            HashMap::new();
377
378        for (src, dispatcher_type, dst) in &all_fragment_relations {
379            fragment_upstreams
380                .entry(*dst)
381                .or_default()
382                .insert(*src, *dispatcher_type);
383            fragment_downstreams
384                .entry(*src)
385                .or_default()
386                .insert(*dst, *dispatcher_type);
387        }
388
389        let all_fragment_ids: HashSet<_> = all_fragment_relations
390            .iter()
391            .flat_map(|(src, _, dst)| [*src, *dst])
392            .chain(extended_fragment_ids.into_iter())
393            .collect();
394
395        let fragments: Vec<_> = Fragment::find()
396            .filter(fragment::Column::FragmentId.is_in(all_fragment_ids.clone()))
397            .all(txn)
398            .await?;
399
400        let actors: Vec<_> = Actor::find()
401            .filter(actor::Column::FragmentId.is_in(all_fragment_ids.clone()))
402            .all(txn)
403            .await?;
404
405        let actors: HashMap<_, _> = actors
406            .into_iter()
407            .map(|actor| (actor.actor_id as _, actor))
408            .collect();
409
410        let fragments: HashMap<FragmentId, _> = fragments
411            .into_iter()
412            .map(|fragment| (fragment.fragment_id, fragment))
413            .collect();
414
415        let related_job_ids: HashSet<_> =
416            fragments.values().map(|fragment| fragment.job_id).collect();
417
418        let mut job_resource_groups = HashMap::new();
419        for &job_id in &related_job_ids {
420            let resource_group = get_existing_job_resource_group(txn, job_id).await?;
421            job_resource_groups.insert(job_id, resource_group);
422        }
423
424        let related_job_definitions =
425            resolve_streaming_job_definition(txn, &related_job_ids).await?;
426
427        let related_jobs = StreamingJob::find()
428            .filter(streaming_job::Column::JobId.is_in(related_job_ids))
429            .all(txn)
430            .await?;
431
432        let related_jobs = related_jobs
433            .into_iter()
434            .map(|job| {
435                let job_id = job.job_id;
436                (
437                    job_id,
438                    (
439                        job,
440                        related_job_definitions
441                            .get(&job_id)
442                            .cloned()
443                            .unwrap_or("".to_owned()),
444                    ),
445                )
446            })
447            .collect();
448
449        let fragment_actor_dispatchers = get_fragment_actor_dispatchers(
450            txn,
451            fragments
452                .keys()
453                .map(|fragment_id| *fragment_id as _)
454                .collect(),
455        )
456        .await?;
457
458        Ok(RescheduleWorkingSet {
459            fragments,
460            actors,
461            actor_dispatchers: fragment_actor_dispatchers.into_values().flatten().collect(),
462            fragment_downstreams,
463            fragment_upstreams,
464            job_resource_groups,
465            related_jobs,
466        })
467    }
468}
469
/// Records a failed integrity check from inside a loop body.
///
/// When `$condition` evaluates to `false`, `$message` is logged through `tracing::error!`,
/// `$flag` is set to `true`, and the innermost loop enclosing the macro invocation is
/// `continue`d. When `$condition` is `true`, nothing happens and execution falls through.
macro_rules! crit_check_in_loop {
    ($flag:expr, $condition:expr, $message:expr) => {
        if !$condition {
            tracing::error!("Integrity check failed: {}", $message);
            $flag = true;
            // Skip the rest of the current iteration: later checks may rely on this condition.
            continue;
        }
    };
}
479
480impl CatalogController {
481    pub async fn integrity_check(&self) -> MetaResult<()> {
482        let inner = self.inner.read().await;
483        let txn = inner.db.begin().await?;
484        Self::graph_check(&txn).await
485    }
486
487    // Perform integrity checks on the Actor, ActorDispatcher and Fragment tables.
488    pub async fn graph_check<C>(txn: &C) -> MetaResult<()>
489    where
490        C: ConnectionTrait,
491    {
492        #[derive(Clone, DerivePartialModel, FromQueryResult)]
493        #[sea_orm(entity = "Fragment")]
494        pub struct PartialFragment {
495            pub fragment_id: FragmentId,
496            pub distribution_type: DistributionType,
497            pub vnode_count: i32,
498        }
499
500        #[derive(Clone, DerivePartialModel, FromQueryResult)]
501        #[sea_orm(entity = "Actor")]
502        pub struct PartialActor {
503            pub actor_id: risingwave_meta_model::ActorId,
504            pub fragment_id: FragmentId,
505            pub status: ActorStatus,
506            pub splits: Option<ConnectorSplits>,
507            pub vnode_bitmap: Option<VnodeBitmap>,
508        }
509
510        let mut flag = false;
511
512        let fragments: Vec<PartialFragment> =
513            Fragment::find().into_partial_model().all(txn).await?;
514
515        let fragment_map: HashMap<_, _> = fragments
516            .into_iter()
517            .map(|fragment| (fragment.fragment_id, fragment))
518            .collect();
519
520        let actors: Vec<PartialActor> = Actor::find().into_partial_model().all(txn).await?;
521
522        let mut fragment_actors = HashMap::new();
523        for actor in &actors {
524            fragment_actors
525                .entry(actor.fragment_id)
526                .or_insert(HashSet::new())
527                .insert(actor.actor_id);
528        }
529
530        let actor_map: HashMap<_, _> = actors
531            .into_iter()
532            .map(|actor| (actor.actor_id, actor))
533            .collect();
534
535        for (fragment_id, actor_ids) in &fragment_actors {
536            crit_check_in_loop!(
537                flag,
538                fragment_map.contains_key(fragment_id),
539                format!("Fragment {fragment_id} has actors {actor_ids:?} which does not exist",)
540            );
541
542            let mut split_map = HashMap::new();
543            for actor_id in actor_ids {
544                let actor = &actor_map[actor_id];
545
546                if let Some(splits) = &actor.splits {
547                    for split in splits.to_protobuf().splits {
548                        let Ok(split_impl) = SplitImpl::try_from(&split) else {
549                            continue;
550                        };
551
552                        let dup_split_actor = split_map.insert(split_impl.id(), actor_id);
553                        crit_check_in_loop!(
554                            flag,
555                            dup_split_actor.is_none(),
556                            format!(
557                                "Fragment {fragment_id} actor {actor_id} has duplicate split {split:?} from actor {dup_split_actor:?}",
558                            )
559                        );
560                    }
561                }
562            }
563
564            let fragment = &fragment_map[fragment_id];
565
566            match fragment.distribution_type {
567                DistributionType::Single => {
568                    crit_check_in_loop!(
569                        flag,
570                        actor_ids.len() == 1,
571                        format!(
572                            "Fragment {fragment_id} has more than one actors {actor_ids:?} for single distribution type",
573                        )
574                    );
575
576                    let actor_id = actor_ids.iter().exactly_one().unwrap();
577                    let actor = &actor_map[actor_id];
578
579                    crit_check_in_loop!(
580                        flag,
581                        actor.vnode_bitmap.is_none(),
582                        format!(
583                            "Fragment {fragment_id} actor {actor_id} has vnode_bitmap set for single distribution type",
584                        )
585                    );
586                }
587                DistributionType::Hash => {
588                    crit_check_in_loop!(
589                        flag,
590                        !actor_ids.is_empty(),
591                        format!(
592                            "Fragment {fragment_id} has less than one actors {actor_ids:?} for hash distribution type",
593                        )
594                    );
595
596                    let fragment_vnode_count = fragment.vnode_count as usize;
597
598                    let mut result_bitmap = Bitmap::zeros(fragment_vnode_count);
599
600                    for actor_id in actor_ids {
601                        let actor = &actor_map[actor_id];
602
603                        crit_check_in_loop!(
604                            flag,
605                            actor.vnode_bitmap.is_some(),
606                            format!(
607                                "Fragment {fragment_id} actor {actor_id} has no vnode_bitmap set for hash distribution type",
608                            )
609                        );
610
611                        let bitmap =
612                            Bitmap::from(actor.vnode_bitmap.as_ref().unwrap().to_protobuf());
613
614                        crit_check_in_loop!(
615                            flag,
616                            result_bitmap.clone().bitand(&bitmap).count_ones() == 0,
617                            format!(
618                                "Fragment {fragment_id} actor {actor_id} has duplicate vnode_bitmap with other actor for hash distribution type, actor bitmap {bitmap:?}, other all bitmap {result_bitmap:?}",
619                            )
620                        );
621
622                        result_bitmap.bitor_assign(&bitmap);
623                    }
624
625                    crit_check_in_loop!(
626                        flag,
627                        result_bitmap.all(),
628                        format!(
629                            "Fragment {fragment_id} has incomplete vnode_bitmap for hash distribution type",
630                        )
631                    );
632
633                    let discovered_vnode_count = result_bitmap.count_ones();
634
635                    crit_check_in_loop!(
636                        flag,
637                        discovered_vnode_count == fragment_vnode_count,
638                        format!(
639                            "Fragment {fragment_id} has different vnode_count {fragment_vnode_count} with discovered vnode count {discovered_vnode_count} for hash distribution type",
640                        )
641                    );
642                }
643            }
644        }
645
646        for PartialActor {
647            actor_id, status, ..
648        } in actor_map.values()
649        {
650            crit_check_in_loop!(
651                flag,
652                *status == ActorStatus::Running,
653                format!("Actor {actor_id} has status {status:?} which is not Running",)
654            );
655        }
656
657        if flag {
658            return Err(MetaError::integrity_check_failed());
659        }
660
661        Ok(())
662    }
663}