risingwave_meta/controller/scale.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{HashMap, HashSet};
use std::ops::{BitAnd, BitOrAssign};

use itertools::Itertools;
use risingwave_common::bitmap::Bitmap;
use risingwave_connector::source::{SplitImpl, SplitMetaData};
use risingwave_meta_model::actor::ActorStatus;
use risingwave_meta_model::fragment::DistributionType;
use risingwave_meta_model::prelude::{
    Actor, Fragment, FragmentRelation, Sink, Source, StreamingJob, Table,
};
use risingwave_meta_model::{
    ConnectorSplits, DispatcherType, FragmentId, ObjectId, VnodeBitmap, actor, fragment,
    fragment_relation, sink, source, streaming_job, table,
};
use risingwave_meta_model_migration::{
    Alias, CommonTableExpression, Expr, IntoColumnRef, QueryStatementBuilder, SelectStatement,
    UnionType, WithClause, WithQuery,
};
use risingwave_pb::stream_plan::PbDispatcher;
use sea_orm::{
    ColumnTrait, ConnectionTrait, DbErr, DerivePartialModel, EntityTrait, FromQueryResult,
    QueryFilter, QuerySelect, Statement, TransactionTrait,
};

use crate::controller::catalog::CatalogController;
use crate::controller::utils::{get_existing_job_resource_group, get_fragment_actor_dispatchers};
use crate::model::ActorId;
use crate::{MetaError, MetaResult};

/// Constructs a query using a recursive CTE to find the `no_shuffle` upstream relation graph for the target fragments.
///
/// # Examples
///
/// ```
/// use risingwave_meta::controller::scale::construct_no_shuffle_upstream_traverse_query;
/// use sea_orm::sea_query::*;
/// use sea_orm::*;
///
/// let query = construct_no_shuffle_upstream_traverse_query(vec![2, 3]);
///
/// assert_eq!(query.to_string(MysqlQueryBuilder), r#"WITH RECURSIVE `shuffle_deps` (`source_fragment_id`, `dispatcher_type`, `target_fragment_id`) AS (SELECT DISTINCT `fragment_relation`.`source_fragment_id`, `fragment_relation`.`dispatcher_type`, `fragment_relation`.`target_fragment_id` FROM `fragment_relation` WHERE `fragment_relation`.`dispatcher_type` = 'NO_SHUFFLE' AND `fragment_relation`.`target_fragment_id` IN (2, 3) UNION ALL (SELECT `fragment_relation`.`source_fragment_id`, `fragment_relation`.`dispatcher_type`, `fragment_relation`.`target_fragment_id` FROM `fragment_relation` INNER JOIN `shuffle_deps` ON `shuffle_deps`.`source_fragment_id` = `fragment_relation`.`target_fragment_id` WHERE `fragment_relation`.`dispatcher_type` = 'NO_SHUFFLE')) SELECT DISTINCT `source_fragment_id`, `dispatcher_type`, `target_fragment_id` FROM `shuffle_deps`"#);
/// assert_eq!(query.to_string(PostgresQueryBuilder), r#"WITH RECURSIVE "shuffle_deps" ("source_fragment_id", "dispatcher_type", "target_fragment_id") AS (SELECT DISTINCT "fragment_relation"."source_fragment_id", "fragment_relation"."dispatcher_type", "fragment_relation"."target_fragment_id" FROM "fragment_relation" WHERE "fragment_relation"."dispatcher_type" = 'NO_SHUFFLE' AND "fragment_relation"."target_fragment_id" IN (2, 3) UNION ALL (SELECT "fragment_relation"."source_fragment_id", "fragment_relation"."dispatcher_type", "fragment_relation"."target_fragment_id" FROM "fragment_relation" INNER JOIN "shuffle_deps" ON "shuffle_deps"."source_fragment_id" = "fragment_relation"."target_fragment_id" WHERE "fragment_relation"."dispatcher_type" = 'NO_SHUFFLE')) SELECT DISTINCT "source_fragment_id", "dispatcher_type", "target_fragment_id" FROM "shuffle_deps""#);
/// assert_eq!(query.to_string(SqliteQueryBuilder), r#"WITH RECURSIVE "shuffle_deps" ("source_fragment_id", "dispatcher_type", "target_fragment_id") AS (SELECT DISTINCT "fragment_relation"."source_fragment_id", "fragment_relation"."dispatcher_type", "fragment_relation"."target_fragment_id" FROM "fragment_relation" WHERE "fragment_relation"."dispatcher_type" = 'NO_SHUFFLE' AND "fragment_relation"."target_fragment_id" IN (2, 3) UNION ALL SELECT "fragment_relation"."source_fragment_id", "fragment_relation"."dispatcher_type", "fragment_relation"."target_fragment_id" FROM "fragment_relation" INNER JOIN "shuffle_deps" ON "shuffle_deps"."source_fragment_id" = "fragment_relation"."target_fragment_id" WHERE "fragment_relation"."dispatcher_type" = 'NO_SHUFFLE') SELECT DISTINCT "source_fragment_id", "dispatcher_type", "target_fragment_id" FROM "shuffle_deps""#);
/// ```
pub fn construct_no_shuffle_upstream_traverse_query(fragment_ids: Vec<FragmentId>) -> WithQuery {
    construct_no_shuffle_traverse_query_helper(fragment_ids, NoShuffleResolveDirection::Upstream)
}

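/// Like [`construct_no_shuffle_upstream_traverse_query`], but traverses `NO_SHUFFLE` relations in
/// the downstream direction starting from the target fragments.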
pub fn construct_no_shuffle_downstream_traverse_query(fragment_ids: Vec<FragmentId>) -> WithQuery {
    construct_no_shuffle_traverse_query_helper(fragment_ids, NoShuffleResolveDirection::Downstream)
}

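/// Direction in which the recursive CTE follows `NO_SHUFFLE` edges in `fragment_relation`.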
enum NoShuffleResolveDirection {
    Upstream,
    Downstream,
}

fn construct_no_shuffle_traverse_query_helper(
    fragment_ids: Vec<FragmentId>,
    direction: NoShuffleResolveDirection,
) -> WithQuery {
    let cte_alias = Alias::new("shuffle_deps");

    // When traversing upstream, new rows are found by joining the CTE's `source_fragment_id`
    // against the relation's `target_fragment_id`; when traversing downstream, the CTE's
    // `target_fragment_id` is joined against the relation's `source_fragment_id`.
    let (cte_ref_column, compared_column) = match direction {
        NoShuffleResolveDirection::Upstream => (
            (
                cte_alias.clone(),
                fragment_relation::Column::SourceFragmentId,
            )
                .into_column_ref(),
            (
                FragmentRelation,
                fragment_relation::Column::TargetFragmentId,
            )
                .into_column_ref(),
        ),
        NoShuffleResolveDirection::Downstream => (
            (
                cte_alias.clone(),
                fragment_relation::Column::TargetFragmentId,
            )
                .into_column_ref(),
            (
                FragmentRelation,
                fragment_relation::Column::SourceFragmentId,
            )
                .into_column_ref(),
        ),
    };

    let mut base_query = SelectStatement::new()
        .column((
            FragmentRelation,
            fragment_relation::Column::SourceFragmentId,
        ))
        .column((FragmentRelation, fragment_relation::Column::DispatcherType))
        .column((
            FragmentRelation,
            fragment_relation::Column::TargetFragmentId,
        ))
        .distinct()
        .from(FragmentRelation)
        .and_where(
            Expr::col((FragmentRelation, fragment_relation::Column::DispatcherType))
                .eq(DispatcherType::NoShuffle),
        )
        .and_where(Expr::col(compared_column.clone()).is_in(fragment_ids.clone()))
        .to_owned();

    let cte_referencing = SelectStatement::new()
        .column((
            FragmentRelation,
            fragment_relation::Column::SourceFragmentId,
        ))
        .column((FragmentRelation, fragment_relation::Column::DispatcherType))
        .column((
            FragmentRelation,
            fragment_relation::Column::TargetFragmentId,
        ))
        // NOTE: Uncomment me once MySQL supports DISTINCT in the recursive block of a CTE.
        //.distinct()
        .from(FragmentRelation)
        .inner_join(
            cte_alias.clone(),
            Expr::col(cte_ref_column).eq(Expr::col(compared_column)),
        )
        .and_where(
            Expr::col((FragmentRelation, fragment_relation::Column::DispatcherType))
                .eq(DispatcherType::NoShuffle),
        )
        .to_owned();

    let common_table_expr = CommonTableExpression::new()
        .query(base_query.union(UnionType::All, cte_referencing).to_owned())
        .column(fragment_relation::Column::SourceFragmentId)
        .column(fragment_relation::Column::DispatcherType)
        .column(fragment_relation::Column::TargetFragmentId)
        .table_name(cte_alias.clone())
        .to_owned();

    SelectStatement::new()
        .column(fragment_relation::Column::SourceFragmentId)
        .column(fragment_relation::Column::DispatcherType)
        .column(fragment_relation::Column::TargetFragmentId)
        .distinct()
        .from(cte_alias.clone())
        .to_owned()
        .with(
            WithClause::new()
                .recursive(true)
                .cte(common_table_expr)
                .to_owned(),
        )
        .to_owned()
}

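/// A snapshot of the catalog state needed to plan a reschedule: the affected fragments with their
/// actors and dispatchers, the one-hop upstream/downstream fragment relations, and the resource
/// group and `(job, definition)` pair of every related streaming job.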
#[derive(Debug, Clone)]
pub struct RescheduleWorkingSet {
    pub fragments: HashMap<FragmentId, fragment::Model>,
    pub actors: HashMap<ActorId, actor::Model>,
    pub actor_dispatchers: HashMap<ActorId, Vec<PbDispatcher>>,

    pub fragment_downstreams: HashMap<FragmentId, HashMap<FragmentId, DispatcherType>>,
    pub fragment_upstreams: HashMap<FragmentId, HashMap<FragmentId, DispatcherType>>,

    pub job_resource_groups: HashMap<ObjectId, String>,
    pub related_jobs: HashMap<ObjectId, (streaming_job::Model, String)>,
}

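/// Executes a recursive-CTE query built by the constructors above and collects the resulting
/// `(source_fragment_id, dispatcher_type, target_fragment_id)` rows.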
async fn resolve_no_shuffle_query<C>(
    txn: &C,
    query: WithQuery,
) -> MetaResult<Vec<(FragmentId, DispatcherType, FragmentId)>>
where
    C: ConnectionTrait,
{
    let (sql, values) = query.build_any(&*txn.get_database_backend().get_query_builder());

    let result = txn
        .query_all(Statement::from_sql_and_values(
            txn.get_database_backend(),
            sql,
            values,
        ))
        .await?
        .into_iter()
        .map(|res| res.try_get_many_by_index())
        .collect::<Result<Vec<(FragmentId, DispatcherType, FragmentId)>, DbErr>>()
        .map_err(MetaError::from)?;

    Ok(result)
}

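/// Looks up a human-readable definition for each streaming job id across the `table`, `sink` and
/// `source` catalogs. Debug builds select the full SQL definition; release builds select only the
/// object name.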
pub(crate) async fn resolve_streaming_job_definition<C>(
    txn: &C,
    job_ids: &HashSet<ObjectId>,
) -> MetaResult<HashMap<ObjectId, String>>
where
    C: ConnectionTrait,
{
    let job_ids = job_ids.iter().cloned().collect_vec();

    // Covers tables, materialized views, and indexes.
    let common_job_definitions: Vec<(ObjectId, String)> = Table::find()
        .select_only()
        .columns([
            table::Column::TableId,
            #[cfg(not(debug_assertions))]
            table::Column::Name,
            #[cfg(debug_assertions)]
            table::Column::Definition,
        ])
        .filter(table::Column::TableId.is_in(job_ids.clone()))
        .into_tuple()
        .all(txn)
        .await?;

    let sink_definitions: Vec<(ObjectId, String)> = Sink::find()
        .select_only()
        .columns([
            sink::Column::SinkId,
            #[cfg(not(debug_assertions))]
            sink::Column::Name,
            #[cfg(debug_assertions)]
            sink::Column::Definition,
        ])
        .filter(sink::Column::SinkId.is_in(job_ids.clone()))
        .into_tuple()
        .all(txn)
        .await?;

    let source_definitions: Vec<(ObjectId, String)> = Source::find()
        .select_only()
        .columns([
            source::Column::SourceId,
            #[cfg(not(debug_assertions))]
            source::Column::Name,
            #[cfg(debug_assertions)]
            source::Column::Definition,
        ])
        .filter(source::Column::SourceId.is_in(job_ids.clone()))
        .into_tuple()
        .all(txn)
        .await?;

    let definitions: HashMap<ObjectId, String> = common_job_definitions
        .into_iter()
        .chain(sink_definitions.into_iter())
        .chain(source_definitions.into_iter())
        .collect();

    Ok(definitions)
}

impl CatalogController {
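    /// Resolves the reschedule working set for an explicit list of fragment ids.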
    pub async fn resolve_working_set_for_reschedule_fragments(
        &self,
        fragment_ids: Vec<FragmentId>,
    ) -> MetaResult<RescheduleWorkingSet> {
        let inner = self.inner.read().await;
        self.resolve_working_set_for_reschedule_helper(&inner.db, fragment_ids)
            .await
    }

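    /// Resolves the reschedule working set for all fragments belonging to the given streaming
    /// jobs (identified by their object ids).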
    pub async fn resolve_working_set_for_reschedule_tables(
        &self,
        table_ids: Vec<ObjectId>,
    ) -> MetaResult<RescheduleWorkingSet> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let fragment_ids: Vec<FragmentId> = Fragment::find()
            .select_only()
            .column(fragment::Column::FragmentId)
            .filter(fragment::Column::JobId.is_in(table_ids))
            .into_tuple()
            .all(&txn)
            .await?;

        self.resolve_working_set_for_reschedule_helper(&txn, fragment_ids)
            .await
    }

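    /// Shared implementation of the `resolve_working_set_for_reschedule_*` methods: expands the
    /// given fragments along `NO_SHUFFLE` relations in both directions, then loads the fragments,
    /// actors, dispatchers, fragment relations and related jobs that form the working set.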
    pub async fn resolve_working_set_for_reschedule_helper<C>(
        &self,
        txn: &C,
        fragment_ids: Vec<FragmentId>,
    ) -> MetaResult<RescheduleWorkingSet>
    where
        C: ConnectionTrait,
    {
        // NO_SHUFFLE related multi-layer upstream fragments
        let no_shuffle_related_upstream_fragment_ids = resolve_no_shuffle_query(
            txn,
            construct_no_shuffle_upstream_traverse_query(fragment_ids.clone()),
        )
        .await?;

        // NO_SHUFFLE related multi-layer downstream fragments
        let no_shuffle_related_downstream_fragment_ids = resolve_no_shuffle_query(
            txn,
            construct_no_shuffle_downstream_traverse_query(
                no_shuffle_related_upstream_fragment_ids
                    .iter()
                    .map(|(src, _, _)| *src)
                    .chain(fragment_ids.iter().cloned())
                    .unique()
                    .collect(),
            ),
        )
        .await?;

        // We need to identify dispatchers of all other types attached to the leaves of the NO_SHUFFLE dependency tree.
        let extended_fragment_ids: HashSet<_> = no_shuffle_related_upstream_fragment_ids
            .iter()
            .chain(no_shuffle_related_downstream_fragment_ids.iter())
            .flat_map(|(src, _, dst)| [*src, *dst])
            .chain(fragment_ids.iter().cloned())
            .collect();

        let query = FragmentRelation::find()
            .select_only()
            .column(fragment_relation::Column::SourceFragmentId)
            .column(fragment_relation::Column::DispatcherType)
            .column(fragment_relation::Column::TargetFragmentId)
            .distinct();

        // single-layer upstream fragment ids
        let upstream_fragments: Vec<(FragmentId, DispatcherType, FragmentId)> = query
            .clone()
            .filter(
                fragment_relation::Column::TargetFragmentId.is_in(extended_fragment_ids.clone()),
            )
            .into_tuple()
            .all(txn)
            .await?;

        // single-layer downstream fragment ids
        let downstream_fragments: Vec<(FragmentId, DispatcherType, FragmentId)> = query
            .clone()
            .filter(
                fragment_relation::Column::SourceFragmentId.is_in(extended_fragment_ids.clone()),
            )
            .into_tuple()
            .all(txn)
            .await?;

        let all_fragment_relations: HashSet<_> = no_shuffle_related_upstream_fragment_ids
            .into_iter()
            .chain(no_shuffle_related_downstream_fragment_ids.into_iter())
            .chain(upstream_fragments.into_iter())
            .chain(downstream_fragments.into_iter())
            .collect();

        let mut fragment_upstreams: HashMap<FragmentId, HashMap<FragmentId, DispatcherType>> =
            HashMap::new();
        let mut fragment_downstreams: HashMap<FragmentId, HashMap<FragmentId, DispatcherType>> =
            HashMap::new();

        for (src, dispatcher_type, dst) in &all_fragment_relations {
            fragment_upstreams
                .entry(*dst)
                .or_default()
                .insert(*src, *dispatcher_type);
            fragment_downstreams
                .entry(*src)
                .or_default()
                .insert(*dst, *dispatcher_type);
        }

        let all_fragment_ids: HashSet<_> = all_fragment_relations
            .iter()
            .flat_map(|(src, _, dst)| [*src, *dst])
            .chain(extended_fragment_ids.into_iter())
            .collect();

        let fragments: Vec<_> = Fragment::find()
            .filter(fragment::Column::FragmentId.is_in(all_fragment_ids.clone()))
            .all(txn)
            .await?;

        let actors: Vec<_> = Actor::find()
            .filter(actor::Column::FragmentId.is_in(all_fragment_ids.clone()))
            .all(txn)
            .await?;

        let actors: HashMap<_, _> = actors
            .into_iter()
            .map(|actor| (actor.actor_id as _, actor))
            .collect();

        let fragments: HashMap<FragmentId, _> = fragments
            .into_iter()
            .map(|fragment| (fragment.fragment_id, fragment))
            .collect();

        let related_job_ids: HashSet<_> =
            fragments.values().map(|fragment| fragment.job_id).collect();

        let mut job_resource_groups = HashMap::new();
        for &job_id in &related_job_ids {
            let resource_group = get_existing_job_resource_group(txn, job_id).await?;
            job_resource_groups.insert(job_id, resource_group);
        }

        let related_job_definitions =
            resolve_streaming_job_definition(txn, &related_job_ids).await?;

        let related_jobs = StreamingJob::find()
            .filter(streaming_job::Column::JobId.is_in(related_job_ids))
            .all(txn)
            .await?;

        let related_jobs = related_jobs
            .into_iter()
            .map(|job| {
                let job_id = job.job_id;
                (
                    job_id,
                    (
                        job,
                        related_job_definitions
                            .get(&job_id)
                            .cloned()
                            .unwrap_or("".to_owned()),
                    ),
                )
            })
            .collect();

        let fragment_actor_dispatchers = get_fragment_actor_dispatchers(
            txn,
            fragments
                .keys()
                .map(|fragment_id| *fragment_id as _)
                .collect(),
        )
        .await?;

        Ok(RescheduleWorkingSet {
            fragments,
            actors,
            actor_dispatchers: fragment_actor_dispatchers.into_values().flatten().collect(),
            fragment_downstreams,
            fragment_upstreams,
            job_resource_groups,
            related_jobs,
        })
    }
}

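/// If `$condition` does not hold, logs `$message`, sets `$flag` and `continue`s the enclosing
/// loop so that the remaining integrity checks still run.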
macro_rules! crit_check_in_loop {
    ($flag:expr, $condition:expr, $message:expr) => {
        if !$condition {
            tracing::error!("Integrity check failed: {}", $message);
            $flag = true;
            continue;
        }
    };
}

impl CatalogController {
    pub async fn integrity_check(&self) -> MetaResult<()> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;
        Self::graph_check(&txn).await
    }

    /// Performs integrity checks on the `Actor`, `ActorDispatcher` and `Fragment` tables.
    pub async fn graph_check<C>(txn: &C) -> MetaResult<()>
    where
        C: ConnectionTrait,
    {
        #[derive(Clone, DerivePartialModel, FromQueryResult)]
        #[sea_orm(entity = "Fragment")]
        pub struct PartialFragment {
            pub fragment_id: FragmentId,
            pub distribution_type: DistributionType,
            pub vnode_count: i32,
        }

        #[derive(Clone, DerivePartialModel, FromQueryResult)]
        #[sea_orm(entity = "Actor")]
        pub struct PartialActor {
            pub actor_id: risingwave_meta_model::ActorId,
            pub fragment_id: FragmentId,
            pub status: ActorStatus,
            pub splits: Option<ConnectorSplits>,
            pub vnode_bitmap: Option<VnodeBitmap>,
        }

        let mut flag = false;

        let fragments: Vec<PartialFragment> =
            Fragment::find().into_partial_model().all(txn).await?;

        let fragment_map: HashMap<_, _> = fragments
            .into_iter()
            .map(|fragment| (fragment.fragment_id, fragment))
            .collect();

        let actors: Vec<PartialActor> = Actor::find().into_partial_model().all(txn).await?;

        let mut fragment_actors = HashMap::new();
        for actor in &actors {
            fragment_actors
                .entry(actor.fragment_id)
                .or_insert(HashSet::new())
                .insert(actor.actor_id);
        }

        let actor_map: HashMap<_, _> = actors
            .into_iter()
            .map(|actor| (actor.actor_id, actor))
            .collect();

        for (fragment_id, actor_ids) in &fragment_actors {
            crit_check_in_loop!(
                flag,
                fragment_map.contains_key(fragment_id),
                format!("Fragment {fragment_id} has actors {actor_ids:?} but does not exist")
            );

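            // Check that no source split is assigned to more than one actor within the fragment.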
            let mut split_map = HashMap::new();
            for actor_id in actor_ids {
                let actor = &actor_map[actor_id];

                if let Some(splits) = &actor.splits {
                    for split in splits.to_protobuf().splits {
                        let Ok(split_impl) = SplitImpl::try_from(&split) else {
                            continue;
                        };

                        let dup_split_actor = split_map.insert(split_impl.id(), actor_id);
                        crit_check_in_loop!(
                            flag,
                            dup_split_actor.is_none(),
                            format!(
                                "Fragment {fragment_id} actor {actor_id} has duplicate split {split:?} from actor {dup_split_actor:?}",
                            )
                        );
                    }
                }
            }

            let fragment = &fragment_map[fragment_id];

            match fragment.distribution_type {
                DistributionType::Single => {
                    crit_check_in_loop!(
                        flag,
                        actor_ids.len() == 1,
                        format!(
                            "Fragment {fragment_id} does not have exactly one actor {actor_ids:?} for single distribution type",
                        )
                    );

                    let actor_id = actor_ids.iter().exactly_one().unwrap();
                    let actor = &actor_map[actor_id];

                    crit_check_in_loop!(
                        flag,
                        actor.vnode_bitmap.is_none(),
                        format!(
                            "Fragment {fragment_id} actor {actor_id} has vnode_bitmap set for single distribution type",
                        )
                    );
                }
                DistributionType::Hash => {
                    crit_check_in_loop!(
                        flag,
                        !actor_ids.is_empty(),
                        format!(
                            "Fragment {fragment_id} has no actors {actor_ids:?} for hash distribution type",
                        )
                    );

                    let fragment_vnode_count = fragment.vnode_count as usize;

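                    // Actors' vnode bitmaps must be pairwise disjoint and together cover all
                    // vnodes of the fragment; accumulate them to verify both properties.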
                    let mut result_bitmap = Bitmap::zeros(fragment_vnode_count);

                    for actor_id in actor_ids {
                        let actor = &actor_map[actor_id];

                        crit_check_in_loop!(
                            flag,
                            actor.vnode_bitmap.is_some(),
                            format!(
                                "Fragment {fragment_id} actor {actor_id} has no vnode_bitmap set for hash distribution type",
                            )
                        );

                        let bitmap =
                            Bitmap::from(actor.vnode_bitmap.as_ref().unwrap().to_protobuf());

                        crit_check_in_loop!(
                            flag,
                            result_bitmap.clone().bitand(&bitmap).count_ones() == 0,
                            format!(
                                "Fragment {fragment_id} actor {actor_id} has a vnode_bitmap overlapping other actors for hash distribution type, actor bitmap {bitmap:?}, accumulated bitmap {result_bitmap:?}",
                            )
                        );

                        result_bitmap.bitor_assign(&bitmap);
                    }

                    crit_check_in_loop!(
                        flag,
                        result_bitmap.all(),
                        format!(
                            "Fragment {fragment_id} has incomplete vnode_bitmap for hash distribution type",
                        )
                    );

                    let discovered_vnode_count = result_bitmap.count_ones();

                    crit_check_in_loop!(
                        flag,
                        discovered_vnode_count == fragment_vnode_count,
                        format!(
                            "Fragment {fragment_id} has vnode_count {fragment_vnode_count} which differs from the discovered vnode count {discovered_vnode_count} for hash distribution type",
                        )
                    );
                }
            }
        }

        for PartialActor {
            actor_id, status, ..
        } in actor_map.values()
        {
            crit_check_in_loop!(
                flag,
                *status == ActorStatus::Running,
                format!("Actor {actor_id} has status {status:?} which is not Running",)
            );
        }

        if flag {
            return Err(MetaError::integrity_check_failed());
        }

        Ok(())
    }
}