risingwave_meta/controller/scale.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{HashMap, HashSet};
use std::ops::{BitAnd, BitOrAssign};

use itertools::Itertools;
use risingwave_common::bitmap::Bitmap;
use risingwave_connector::source::{SplitImpl, SplitMetaData};
use risingwave_meta_model::actor::ActorStatus;
use risingwave_meta_model::actor_dispatcher::DispatcherType;
use risingwave_meta_model::fragment::DistributionType;
use risingwave_meta_model::prelude::{
    Actor, Fragment, FragmentRelation, Sink, Source, StreamingJob, Table,
};
use risingwave_meta_model::{
    ConnectorSplits, FragmentId, ObjectId, VnodeBitmap, actor, fragment, fragment_relation, sink,
    source, streaming_job, table,
};
use risingwave_meta_model_migration::{
    Alias, CommonTableExpression, Expr, IntoColumnRef, QueryStatementBuilder, SelectStatement,
    UnionType, WithClause, WithQuery,
};
use risingwave_pb::stream_plan::PbDispatcher;
use sea_orm::{
    ColumnTrait, ConnectionTrait, DbErr, DerivePartialModel, EntityTrait, FromQueryResult,
    QueryFilter, QuerySelect, Statement, TransactionTrait,
};

use crate::controller::catalog::CatalogController;
use crate::controller::utils::{get_existing_job_resource_group, get_fragment_actor_dispatchers};
use crate::model::ActorId;
use crate::{MetaError, MetaResult};

/// Constructs a query using a recursive CTE to find the `NO_SHUFFLE` upstream relation graph for the target fragments.
///
/// # Examples
///
/// ```
/// use risingwave_meta::controller::scale::construct_no_shuffle_upstream_traverse_query;
/// use sea_orm::sea_query::*;
/// use sea_orm::*;
///
/// let query = construct_no_shuffle_upstream_traverse_query(vec![2, 3]);
///
/// assert_eq!(query.to_string(MysqlQueryBuilder), r#"WITH RECURSIVE `shuffle_deps` (`source_fragment_id`, `dispatcher_type`, `target_fragment_id`) AS (SELECT DISTINCT `fragment_relation`.`source_fragment_id`, `fragment_relation`.`dispatcher_type`, `fragment_relation`.`target_fragment_id` FROM `fragment_relation` WHERE `fragment_relation`.`dispatcher_type` = 'NO_SHUFFLE' AND `fragment_relation`.`target_fragment_id` IN (2, 3) UNION ALL (SELECT `fragment_relation`.`source_fragment_id`, `fragment_relation`.`dispatcher_type`, `fragment_relation`.`target_fragment_id` FROM `fragment_relation` INNER JOIN `shuffle_deps` ON `shuffle_deps`.`source_fragment_id` = `fragment_relation`.`target_fragment_id` WHERE `fragment_relation`.`dispatcher_type` = 'NO_SHUFFLE')) SELECT DISTINCT `source_fragment_id`, `dispatcher_type`, `target_fragment_id` FROM `shuffle_deps`"#);
/// assert_eq!(query.to_string(PostgresQueryBuilder), r#"WITH RECURSIVE "shuffle_deps" ("source_fragment_id", "dispatcher_type", "target_fragment_id") AS (SELECT DISTINCT "fragment_relation"."source_fragment_id", "fragment_relation"."dispatcher_type", "fragment_relation"."target_fragment_id" FROM "fragment_relation" WHERE "fragment_relation"."dispatcher_type" = 'NO_SHUFFLE' AND "fragment_relation"."target_fragment_id" IN (2, 3) UNION ALL (SELECT "fragment_relation"."source_fragment_id", "fragment_relation"."dispatcher_type", "fragment_relation"."target_fragment_id" FROM "fragment_relation" INNER JOIN "shuffle_deps" ON "shuffle_deps"."source_fragment_id" = "fragment_relation"."target_fragment_id" WHERE "fragment_relation"."dispatcher_type" = 'NO_SHUFFLE')) SELECT DISTINCT "source_fragment_id", "dispatcher_type", "target_fragment_id" FROM "shuffle_deps""#);
/// assert_eq!(query.to_string(SqliteQueryBuilder), r#"WITH RECURSIVE "shuffle_deps" ("source_fragment_id", "dispatcher_type", "target_fragment_id") AS (SELECT DISTINCT "fragment_relation"."source_fragment_id", "fragment_relation"."dispatcher_type", "fragment_relation"."target_fragment_id" FROM "fragment_relation" WHERE "fragment_relation"."dispatcher_type" = 'NO_SHUFFLE' AND "fragment_relation"."target_fragment_id" IN (2, 3) UNION ALL SELECT "fragment_relation"."source_fragment_id", "fragment_relation"."dispatcher_type", "fragment_relation"."target_fragment_id" FROM "fragment_relation" INNER JOIN "shuffle_deps" ON "shuffle_deps"."source_fragment_id" = "fragment_relation"."target_fragment_id" WHERE "fragment_relation"."dispatcher_type" = 'NO_SHUFFLE') SELECT DISTINCT "source_fragment_id", "dispatcher_type", "target_fragment_id" FROM "shuffle_deps""#);
/// ```
pub fn construct_no_shuffle_upstream_traverse_query(fragment_ids: Vec<FragmentId>) -> WithQuery {
    construct_no_shuffle_traverse_query_helper(fragment_ids, NoShuffleResolveDirection::Upstream)
}

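/// Like [`construct_no_shuffle_upstream_traverse_query`], but traverses in the opposite
/// direction: it constructs a recursive CTE query that finds the `NO_SHUFFLE` downstream
/// relation graph for the target fragments.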
pub fn construct_no_shuffle_downstream_traverse_query(fragment_ids: Vec<FragmentId>) -> WithQuery {
    construct_no_shuffle_traverse_query_helper(fragment_ids, NoShuffleResolveDirection::Downstream)
}

enum NoShuffleResolveDirection {
    Upstream,
    Downstream,
}

fn construct_no_shuffle_traverse_query_helper(
    fragment_ids: Vec<FragmentId>,
    direction: NoShuffleResolveDirection,
) -> WithQuery {
    let cte_alias = Alias::new("shuffle_deps");

    // When traversing upstream, each step joins the CTE's `source_fragment_id` against the
    // relation's `target_fragment_id`, so we keep walking from a fragment to its upstreams.
    // When traversing downstream, the roles are swapped: the CTE's `target_fragment_id` is
    // joined against the relation's `source_fragment_id`.
    let (cte_ref_column, compared_column) = match direction {
        NoShuffleResolveDirection::Upstream => (
            (
                cte_alias.clone(),
                fragment_relation::Column::SourceFragmentId,
            )
                .into_column_ref(),
            (
                FragmentRelation,
                fragment_relation::Column::TargetFragmentId,
            )
                .into_column_ref(),
        ),
        NoShuffleResolveDirection::Downstream => (
            (
                cte_alias.clone(),
                fragment_relation::Column::TargetFragmentId,
            )
                .into_column_ref(),
            (
                FragmentRelation,
                fragment_relation::Column::SourceFragmentId,
            )
                .into_column_ref(),
        ),
    };

    let mut base_query = SelectStatement::new()
        .column((
            FragmentRelation,
            fragment_relation::Column::SourceFragmentId,
        ))
        .column((FragmentRelation, fragment_relation::Column::DispatcherType))
        .column((
            FragmentRelation,
            fragment_relation::Column::TargetFragmentId,
        ))
        .distinct()
        .from(FragmentRelation)
        .and_where(
            Expr::col((FragmentRelation, fragment_relation::Column::DispatcherType))
                .eq(DispatcherType::NoShuffle),
        )
        .and_where(Expr::col(compared_column.clone()).is_in(fragment_ids.clone()))
        .to_owned();

    let cte_referencing = SelectStatement::new()
        .column((
            FragmentRelation,
            fragment_relation::Column::SourceFragmentId,
        ))
        .column((FragmentRelation, fragment_relation::Column::DispatcherType))
        .column((
            FragmentRelation,
            fragment_relation::Column::TargetFragmentId,
        ))
        // NOTE: Uncomment me once MySQL supports DISTINCT in the recursive block of CTE.
        //.distinct()
        .from(FragmentRelation)
        .inner_join(
            cte_alias.clone(),
            Expr::col(cte_ref_column).eq(Expr::col(compared_column)),
        )
        .and_where(
            Expr::col((FragmentRelation, fragment_relation::Column::DispatcherType))
                .eq(DispatcherType::NoShuffle),
        )
        .to_owned();

    let common_table_expr = CommonTableExpression::new()
        .query(base_query.union(UnionType::All, cte_referencing).to_owned())
        .column(fragment_relation::Column::SourceFragmentId)
        .column(fragment_relation::Column::DispatcherType)
        .column(fragment_relation::Column::TargetFragmentId)
        .table_name(cte_alias.clone())
        .to_owned();

    SelectStatement::new()
        .column(fragment_relation::Column::SourceFragmentId)
        .column(fragment_relation::Column::DispatcherType)
        .column(fragment_relation::Column::TargetFragmentId)
        .distinct()
        .from(cte_alias.clone())
        .to_owned()
        .with(
            WithClause::new()
                .recursive(true)
                .cte(common_table_expr)
                .to_owned(),
        )
        .to_owned()
}

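/// Everything needed to plan a reschedule for a set of fragments: the fragments themselves,
/// their actors and dispatchers, the fragment-level upstream/downstream edges, and the
/// resource groups and definitions of the streaming jobs that own them.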
#[derive(Debug, Clone)]
pub struct RescheduleWorkingSet {
    pub fragments: HashMap<FragmentId, fragment::Model>,
    pub actors: HashMap<ActorId, actor::Model>,
    pub actor_dispatchers: HashMap<ActorId, Vec<PbDispatcher>>,

    pub fragment_downstreams: HashMap<FragmentId, HashMap<FragmentId, DispatcherType>>,
    pub fragment_upstreams: HashMap<FragmentId, HashMap<FragmentId, DispatcherType>>,

    pub job_resource_groups: HashMap<ObjectId, String>,
    pub related_jobs: HashMap<ObjectId, (streaming_job::Model, String)>,
}

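/// Executes a traversal query built by the constructors above and collects the resulting
/// `(source_fragment_id, dispatcher_type, target_fragment_id)` rows.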
async fn resolve_no_shuffle_query<C>(
    txn: &C,
    query: WithQuery,
) -> MetaResult<Vec<(FragmentId, DispatcherType, FragmentId)>>
where
    C: ConnectionTrait,
{
    let (sql, values) = query.build_any(&*txn.get_database_backend().get_query_builder());

    let result = txn
        .query_all(Statement::from_sql_and_values(
            txn.get_database_backend(),
            sql,
            values,
        ))
        .await?
        .into_iter()
        .map(|res| res.try_get_many_by_index())
        .collect::<Result<Vec<(FragmentId, DispatcherType, FragmentId)>, DbErr>>()
        .map_err(MetaError::from)?;

    Ok(result)
}

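/// Looks up a human-readable definition for each given streaming job id across the table,
/// sink, and source catalogs. In debug builds the full SQL definition is fetched; in builds
/// without debug assertions only the object name is used.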
pub(crate) async fn resolve_streaming_job_definition<C>(
    txn: &C,
    job_ids: &HashSet<ObjectId>,
) -> MetaResult<HashMap<ObjectId, String>>
where
    C: ConnectionTrait,
{
    let job_ids = job_ids.iter().cloned().collect_vec();

    // Covers tables, materialized views, and indexes.
    let common_job_definitions: Vec<(ObjectId, String)> = Table::find()
        .select_only()
        .columns([
            table::Column::TableId,
            #[cfg(not(debug_assertions))]
            table::Column::Name,
            #[cfg(debug_assertions)]
            table::Column::Definition,
        ])
        .filter(table::Column::TableId.is_in(job_ids.clone()))
        .into_tuple()
        .all(txn)
        .await?;

    let sink_definitions: Vec<(ObjectId, String)> = Sink::find()
        .select_only()
        .columns([
            sink::Column::SinkId,
            #[cfg(not(debug_assertions))]
            sink::Column::Name,
            #[cfg(debug_assertions)]
            sink::Column::Definition,
        ])
        .filter(sink::Column::SinkId.is_in(job_ids.clone()))
        .into_tuple()
        .all(txn)
        .await?;

    let source_definitions: Vec<(ObjectId, String)> = Source::find()
        .select_only()
        .columns([
            source::Column::SourceId,
            #[cfg(not(debug_assertions))]
            source::Column::Name,
            #[cfg(debug_assertions)]
            source::Column::Definition,
        ])
        .filter(source::Column::SourceId.is_in(job_ids.clone()))
        .into_tuple()
        .all(txn)
        .await?;

    let definitions: HashMap<ObjectId, String> = common_job_definitions
        .into_iter()
        .chain(sink_definitions.into_iter())
        .chain(source_definitions.into_iter())
        .collect();

    Ok(definitions)
}

impl CatalogController {
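    /// Resolves the [`RescheduleWorkingSet`] for the given fragment ids.
    ///
    /// The snippet below is an illustrative sketch only (not compiled as a doctest); it assumes
    /// a `CatalogController` handle named `controller` and some placeholder fragment ids:
    ///
    /// ```ignore
    /// let working_set = controller
    ///     .resolve_working_set_for_reschedule_fragments(vec![1, 2, 3])
    ///     .await?;
    /// for (fragment_id, downstreams) in &working_set.fragment_downstreams {
    ///     // plan the reschedule of `fragment_id` together with its downstream fragments
    /// }
    /// ```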
    pub async fn resolve_working_set_for_reschedule_fragments(
        &self,
        fragment_ids: Vec<FragmentId>,
    ) -> MetaResult<RescheduleWorkingSet> {
        let inner = self.inner.read().await;
        self.resolve_working_set_for_reschedule_helper(&inner.db, fragment_ids)
            .await
    }

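    /// Like [`Self::resolve_working_set_for_reschedule_fragments`], but starts from the owning
    /// job (table) ids: it first resolves all fragments belonging to those jobs and then builds
    /// the working set for them.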
    pub async fn resolve_working_set_for_reschedule_tables(
        &self,
        table_ids: Vec<ObjectId>,
    ) -> MetaResult<RescheduleWorkingSet> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let fragment_ids: Vec<FragmentId> = Fragment::find()
            .select_only()
            .column(fragment::Column::FragmentId)
            .filter(fragment::Column::JobId.is_in(table_ids))
            .into_tuple()
            .all(&txn)
            .await?;

        self.resolve_working_set_for_reschedule_helper(&txn, fragment_ids)
            .await
    }

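    /// Builds the [`RescheduleWorkingSet`] for the given fragments, roughly in these steps:
    /// 1. Resolve the multi-layer `NO_SHUFFLE` upstream closure of the target fragments.
    /// 2. From that closure plus the targets, resolve the multi-layer `NO_SHUFFLE` downstream closure.
    /// 3. Fetch the single-layer upstream/downstream relations (of all dispatcher types) of every
    ///    fragment touched so far.
    /// 4. Load the involved fragments, actors, and actor dispatchers, plus the resource groups and
    ///    definitions of the related streaming jobs.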
    pub async fn resolve_working_set_for_reschedule_helper<C>(
        &self,
        txn: &C,
        fragment_ids: Vec<FragmentId>,
    ) -> MetaResult<RescheduleWorkingSet>
    where
        C: ConnectionTrait,
    {
        // NO_SHUFFLE related multi-layer upstream fragments
        let no_shuffle_related_upstream_fragment_ids = resolve_no_shuffle_query(
            txn,
            construct_no_shuffle_upstream_traverse_query(fragment_ids.clone()),
        )
        .await?;

        // NO_SHUFFLE related multi-layer downstream fragments
        let no_shuffle_related_downstream_fragment_ids = resolve_no_shuffle_query(
            txn,
            construct_no_shuffle_downstream_traverse_query(
                no_shuffle_related_upstream_fragment_ids
                    .iter()
                    .map(|(src, _, _)| *src)
                    .chain(fragment_ids.iter().cloned())
                    .unique()
                    .collect(),
            ),
        )
        .await?;

        // Collect every fragment touched by the NO_SHUFFLE closure (plus the original targets);
        // we still need their one-hop relations of all other dispatcher types, i.e. the edges
        // at the leaves of the NO_SHUFFLE dependency tree.
        let extended_fragment_ids: HashSet<_> = no_shuffle_related_upstream_fragment_ids
            .iter()
            .chain(no_shuffle_related_downstream_fragment_ids.iter())
            .flat_map(|(src, _, dst)| [*src, *dst])
            .chain(fragment_ids.iter().cloned())
            .collect();

        let query = FragmentRelation::find()
            .select_only()
            .column(fragment_relation::Column::SourceFragmentId)
            .column(fragment_relation::Column::DispatcherType)
            .column(fragment_relation::Column::TargetFragmentId)
            .distinct();

        // single-layer upstream fragment ids
        let upstream_fragments: Vec<(FragmentId, DispatcherType, FragmentId)> = query
            .clone()
            .filter(
                fragment_relation::Column::TargetFragmentId.is_in(extended_fragment_ids.clone()),
            )
            .into_tuple()
            .all(txn)
            .await?;

        // single-layer downstream fragment ids
        let downstream_fragments: Vec<(FragmentId, DispatcherType, FragmentId)> = query
            .clone()
            .filter(
                fragment_relation::Column::SourceFragmentId.is_in(extended_fragment_ids.clone()),
            )
            .into_tuple()
            .all(txn)
            .await?;

        let all_fragment_relations: HashSet<_> = no_shuffle_related_upstream_fragment_ids
            .into_iter()
            .chain(no_shuffle_related_downstream_fragment_ids.into_iter())
            .chain(upstream_fragments.into_iter())
            .chain(downstream_fragments.into_iter())
            .collect();

        let mut fragment_upstreams: HashMap<FragmentId, HashMap<FragmentId, DispatcherType>> =
            HashMap::new();
        let mut fragment_downstreams: HashMap<FragmentId, HashMap<FragmentId, DispatcherType>> =
            HashMap::new();

        for (src, dispatcher_type, dst) in &all_fragment_relations {
            fragment_upstreams
                .entry(*dst)
                .or_default()
                .insert(*src, *dispatcher_type);
            fragment_downstreams
                .entry(*src)
                .or_default()
                .insert(*dst, *dispatcher_type);
        }

        let all_fragment_ids: HashSet<_> = all_fragment_relations
            .iter()
            .flat_map(|(src, _, dst)| [*src, *dst])
            .chain(extended_fragment_ids.into_iter())
            .collect();

        let fragments: Vec<_> = Fragment::find()
            .filter(fragment::Column::FragmentId.is_in(all_fragment_ids.clone()))
            .all(txn)
            .await?;

        let actors: Vec<_> = Actor::find()
            .filter(actor::Column::FragmentId.is_in(all_fragment_ids.clone()))
            .all(txn)
            .await?;

        let actors: HashMap<_, _> = actors
            .into_iter()
            .map(|actor| (actor.actor_id as _, actor))
            .collect();

        let fragments: HashMap<FragmentId, _> = fragments
            .into_iter()
            .map(|fragment| (fragment.fragment_id, fragment))
            .collect();

        let related_job_ids: HashSet<_> =
            fragments.values().map(|fragment| fragment.job_id).collect();

        let mut job_resource_groups = HashMap::new();
        for &job_id in &related_job_ids {
            let resource_group = get_existing_job_resource_group(txn, job_id).await?;
            job_resource_groups.insert(job_id, resource_group);
        }

        let related_job_definitions =
            resolve_streaming_job_definition(txn, &related_job_ids).await?;

        let related_jobs = StreamingJob::find()
            .filter(streaming_job::Column::JobId.is_in(related_job_ids))
            .all(txn)
            .await?;

        let related_jobs = related_jobs
            .into_iter()
            .map(|job| {
                let job_id = job.job_id;
                (
                    job_id,
                    (
                        job,
                        related_job_definitions
                            .get(&job_id)
                            .cloned()
                            .unwrap_or("".to_owned()),
                    ),
                )
            })
            .collect();

        let fragment_actor_dispatchers = get_fragment_actor_dispatchers(
            txn,
            fragments
                .keys()
                .map(|fragment_id| *fragment_id as _)
                .collect(),
        )
        .await?;

        Ok(RescheduleWorkingSet {
            fragments,
            actors,
            actor_dispatchers: fragment_actor_dispatchers.into_values().flatten().collect(),
            fragment_downstreams,
            fragment_upstreams,
            job_resource_groups,
            related_jobs,
        })
    }
}

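/// Records a failed integrity check: logs the message, marks the overall check as failed via
/// `$flag`, and `continue`s the enclosing loop so the remaining checks still run.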
macro_rules! crit_check_in_loop {
    ($flag:expr, $condition:expr, $message:expr) => {
        if !$condition {
            tracing::error!("Integrity check failed: {}", $message);
            $flag = true;
            continue;
        }
    };
}

impl CatalogController {
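    /// Runs [`Self::graph_check`] against the current catalog state, under a read lock and
    /// within a fresh transaction.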
    pub async fn integrity_check(&self) -> MetaResult<()> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;
        Self::graph_check(&txn).await
    }

    /// Performs integrity checks on the `Actor` and `Fragment` tables:
    /// actor-to-fragment mapping, source split assignments, actor status, and vnode bitmap coverage.
    pub async fn graph_check<C>(txn: &C) -> MetaResult<()>
    where
        C: ConnectionTrait,
    {
        #[derive(Clone, DerivePartialModel, FromQueryResult)]
        #[sea_orm(entity = "Fragment")]
        pub struct PartialFragment {
            pub fragment_id: FragmentId,
            pub distribution_type: DistributionType,
            pub vnode_count: i32,
        }

        #[derive(Clone, DerivePartialModel, FromQueryResult)]
        #[sea_orm(entity = "Actor")]
        pub struct PartialActor {
            pub actor_id: risingwave_meta_model::ActorId,
            pub fragment_id: FragmentId,
            pub status: ActorStatus,
            pub splits: Option<ConnectorSplits>,
            pub vnode_bitmap: Option<VnodeBitmap>,
        }

        let mut flag = false;

        let fragments: Vec<PartialFragment> =
            Fragment::find().into_partial_model().all(txn).await?;

        let fragment_map: HashMap<_, _> = fragments
            .into_iter()
            .map(|fragment| (fragment.fragment_id, fragment))
            .collect();

        let actors: Vec<PartialActor> = Actor::find().into_partial_model().all(txn).await?;

        let mut fragment_actors = HashMap::new();
        for actor in &actors {
            fragment_actors
                .entry(actor.fragment_id)
                .or_insert(HashSet::new())
                .insert(actor.actor_id);
        }

        let actor_map: HashMap<_, _> = actors
            .into_iter()
            .map(|actor| (actor.actor_id, actor))
            .collect();

        for (fragment_id, actor_ids) in &fragment_actors {
            crit_check_in_loop!(
                flag,
                fragment_map.contains_key(fragment_id),
                format!("Fragment {fragment_id} referenced by actors {actor_ids:?} does not exist")
            );

            let mut split_map = HashMap::new();
            for actor_id in actor_ids {
                let actor = &actor_map[actor_id];

                if let Some(splits) = &actor.splits {
                    for split in splits.to_protobuf().splits {
                        let Ok(split_impl) = SplitImpl::try_from(&split) else {
                            continue;
                        };

                        let dup_split_actor = split_map.insert(split_impl.id(), actor_id);
                        crit_check_in_loop!(
                            flag,
                            dup_split_actor.is_none(),
                            format!(
                                "Fragment {fragment_id} actor {actor_id} has duplicate split {split:?}, already assigned to actor {dup_split_actor:?}",
                            )
                        );
                    }
                }
            }

            let fragment = &fragment_map[fragment_id];

            match fragment.distribution_type {
                DistributionType::Single => {
                    crit_check_in_loop!(
                        flag,
                        actor_ids.len() == 1,
                        format!(
                            "Fragment {fragment_id} should have exactly one actor for single distribution type, but has {actor_ids:?}",
                        )
                    );

                    let actor_id = actor_ids.iter().exactly_one().unwrap();
                    let actor = &actor_map[actor_id];

                    crit_check_in_loop!(
                        flag,
                        actor.vnode_bitmap.is_none(),
                        format!(
                            "Fragment {fragment_id} actor {actor_id} has vnode_bitmap set for single distribution type",
                        )
                    );
                }
                DistributionType::Hash => {
                    crit_check_in_loop!(
                        flag,
                        !actor_ids.is_empty(),
                        format!(
                            "Fragment {fragment_id} has no actors for hash distribution type",
                        )
                    );

                    let fragment_vnode_count = fragment.vnode_count as usize;

                    let mut result_bitmap = Bitmap::zeros(fragment_vnode_count);

                    for actor_id in actor_ids {
                        let actor = &actor_map[actor_id];

                        crit_check_in_loop!(
                            flag,
                            actor.vnode_bitmap.is_some(),
                            format!(
                                "Fragment {fragment_id} actor {actor_id} has no vnode_bitmap set for hash distribution type",
                            )
                        );

                        let bitmap =
                            Bitmap::from(actor.vnode_bitmap.as_ref().unwrap().to_protobuf());

                        crit_check_in_loop!(
                            flag,
                            result_bitmap.clone().bitand(&bitmap).count_ones() == 0,
                            format!(
                                "Fragment {fragment_id} actor {actor_id} has overlapping vnode_bitmap with another actor for hash distribution type, actor bitmap {bitmap:?}, accumulated bitmap {result_bitmap:?}",
                            )
                        );

                        result_bitmap.bitor_assign(&bitmap);
                    }

                    crit_check_in_loop!(
                        flag,
                        result_bitmap.all(),
                        format!(
                            "Fragment {fragment_id} has incomplete vnode_bitmap for hash distribution type",
                        )
                    );

                    let discovered_vnode_count = result_bitmap.count_ones();

                    crit_check_in_loop!(
                        flag,
                        discovered_vnode_count == fragment_vnode_count,
                        format!(
                            "Fragment {fragment_id} has vnode_count {fragment_vnode_count} which does not match discovered vnode count {discovered_vnode_count} for hash distribution type",
                        )
                    );
                }
            }
        }

        for PartialActor {
            actor_id, status, ..
        } in actor_map.values()
        {
            crit_check_in_loop!(
                flag,
                *status == ActorStatus::Running,
                format!("Actor {actor_id} has status {status:?} which is not Running",)
            );
        }

        if flag {
            return Err(MetaError::integrity_check_failed());
        }

        Ok(())
    }
}