1use std::collections::{HashMap, HashSet};
16use std::ops::{BitAnd, BitOrAssign};
17
18use itertools::Itertools;
19use risingwave_common::bitmap::Bitmap;
20use risingwave_connector::source::{SplitImpl, SplitMetaData};
21use risingwave_meta_model::actor::ActorStatus;
22use risingwave_meta_model::fragment::DistributionType;
23use risingwave_meta_model::prelude::{
24 Actor, Fragment, FragmentRelation, Sink, Source, StreamingJob, Table,
25};
26use risingwave_meta_model::{
27 ConnectorSplits, DispatcherType, FragmentId, ObjectId, VnodeBitmap, actor, fragment,
28 fragment_relation, sink, source, streaming_job, table,
29};
30use risingwave_meta_model_migration::{
31 Alias, CommonTableExpression, Expr, IntoColumnRef, QueryStatementBuilder, SelectStatement,
32 UnionType, WithClause, WithQuery,
33};
34use risingwave_pb::stream_plan::PbDispatcher;
35use sea_orm::{
36 ColumnTrait, ConnectionTrait, DbErr, DerivePartialModel, EntityTrait, FromQueryResult,
37 QueryFilter, QuerySelect, Statement, TransactionTrait,
38};
39
40use crate::controller::catalog::CatalogController;
41use crate::controller::utils::{get_existing_job_resource_group, get_fragment_actor_dispatchers};
42use crate::model::ActorId;
43use crate::{MetaError, MetaResult};
44
45pub fn construct_no_shuffle_upstream_traverse_query(fragment_ids: Vec<FragmentId>) -> WithQuery {
61 construct_no_shuffle_traverse_query_helper(fragment_ids, NoShuffleResolveDirection::Upstream)
62}
63
64pub fn construct_no_shuffle_downstream_traverse_query(fragment_ids: Vec<FragmentId>) -> WithQuery {
65 construct_no_shuffle_traverse_query_helper(fragment_ids, NoShuffleResolveDirection::Downstream)
66}
67
/// Direction in which `construct_no_shuffle_traverse_query_helper` walks `NoShuffle` relations.
enum NoShuffleResolveDirection {
    /// Start from relations whose target fragment is in the input set and keep following
    /// relations whose target is the source of an already collected relation.
    Upstream,
    /// Start from relations whose source fragment is in the input set and keep following
    /// relations whose source is the target of an already collected relation.
    Downstream,
}
72
73fn construct_no_shuffle_traverse_query_helper(
74 fragment_ids: Vec<FragmentId>,
75 direction: NoShuffleResolveDirection,
76) -> WithQuery {
77 let cte_alias = Alias::new("shuffle_deps");
78
79 let (cte_ref_column, compared_column) = match direction {
84 NoShuffleResolveDirection::Upstream => (
85 (
86 cte_alias.clone(),
87 fragment_relation::Column::SourceFragmentId,
88 )
89 .into_column_ref(),
90 (
91 FragmentRelation,
92 fragment_relation::Column::TargetFragmentId,
93 )
94 .into_column_ref(),
95 ),
96 NoShuffleResolveDirection::Downstream => (
97 (
98 cte_alias.clone(),
99 fragment_relation::Column::TargetFragmentId,
100 )
101 .into_column_ref(),
102 (
103 FragmentRelation,
104 fragment_relation::Column::SourceFragmentId,
105 )
106 .into_column_ref(),
107 ),
108 };
109
110 let mut base_query = SelectStatement::new()
111 .column((
112 FragmentRelation,
113 fragment_relation::Column::SourceFragmentId,
114 ))
115 .column((FragmentRelation, fragment_relation::Column::DispatcherType))
116 .column((
117 FragmentRelation,
118 fragment_relation::Column::TargetFragmentId,
119 ))
120 .distinct()
121 .from(FragmentRelation)
122 .and_where(
123 Expr::col((FragmentRelation, fragment_relation::Column::DispatcherType))
124 .eq(DispatcherType::NoShuffle),
125 )
126 .and_where(Expr::col(compared_column.clone()).is_in(fragment_ids))
127 .to_owned();
128
129 let cte_referencing = SelectStatement::new()
130 .column((
131 FragmentRelation,
132 fragment_relation::Column::SourceFragmentId,
133 ))
134 .column((FragmentRelation, fragment_relation::Column::DispatcherType))
135 .column((
136 FragmentRelation,
137 fragment_relation::Column::TargetFragmentId,
138 ))
139 .from(FragmentRelation)
142 .inner_join(
143 cte_alias.clone(),
144 Expr::col(cte_ref_column).eq(Expr::col(compared_column)),
145 )
146 .and_where(
147 Expr::col((FragmentRelation, fragment_relation::Column::DispatcherType))
148 .eq(DispatcherType::NoShuffle),
149 )
150 .to_owned();
151
152 let mut common_table_expr = CommonTableExpression::new();
153 common_table_expr
154 .query(base_query.union(UnionType::All, cte_referencing).to_owned())
155 .column(fragment_relation::Column::SourceFragmentId)
156 .column(fragment_relation::Column::DispatcherType)
157 .column(fragment_relation::Column::TargetFragmentId)
158 .table_name(cte_alias.clone());
159
160 SelectStatement::new()
161 .column(fragment_relation::Column::SourceFragmentId)
162 .column(fragment_relation::Column::DispatcherType)
163 .column(fragment_relation::Column::TargetFragmentId)
164 .distinct()
165 .from(cte_alias)
166 .to_owned()
167 .with(
168 WithClause::new()
169 .recursive(true)
170 .cte(common_table_expr)
171 .to_owned(),
172 )
173}
174
/// Everything `resolve_working_set_for_reschedule_helper` loads for a set of fragments: the
/// fragments themselves, their actors and dispatchers, the relations between fragments, and
/// the streaming jobs that own them.
#[derive(Debug, Clone)]
pub struct RescheduleWorkingSet {
    /// Fragment models keyed by fragment id.
    pub fragments: HashMap<FragmentId, fragment::Model>,
    /// Actor models keyed by actor id.
    pub actors: HashMap<ActorId, actor::Model>,
    /// Dispatchers of each actor, keyed by actor id.
    pub actor_dispatchers: HashMap<ActorId, Vec<PbDispatcher>>,

    /// For each source fragment, its target fragments and the dispatcher type of the relation.
    pub fragment_downstreams: HashMap<FragmentId, HashMap<FragmentId, DispatcherType>>,
    /// For each target fragment, its source fragments and the dispatcher type of the relation.
    pub fragment_upstreams: HashMap<FragmentId, HashMap<FragmentId, DispatcherType>>,

    /// Resource group of every job that owns one of `fragments`, keyed by job id.
    pub job_resource_groups: HashMap<ObjectId, String>,
    /// Streaming job model plus its resolved definition string (empty when none was found),
    /// keyed by job id.
    pub related_jobs: HashMap<ObjectId, (streaming_job::Model, String)>,
}
187
188async fn resolve_no_shuffle_query<C>(
189 txn: &C,
190 query: WithQuery,
191) -> MetaResult<Vec<(FragmentId, DispatcherType, FragmentId)>>
192where
193 C: ConnectionTrait,
194{
195 let (sql, values) = query.build_any(&*txn.get_database_backend().get_query_builder());
196
197 let result = txn
198 .query_all(Statement::from_sql_and_values(
199 txn.get_database_backend(),
200 sql,
201 values,
202 ))
203 .await?
204 .into_iter()
205 .map(|res| res.try_get_many_by_index())
206 .collect::<Result<Vec<(FragmentId, DispatcherType, FragmentId)>, DbErr>>()
207 .map_err(MetaError::from)?;
208
209 Ok(result)
210}
211
212pub(crate) async fn resolve_streaming_job_definition<C>(
213 txn: &C,
214 job_ids: &HashSet<ObjectId>,
215) -> MetaResult<HashMap<ObjectId, String>>
216where
217 C: ConnectionTrait,
218{
219 let job_ids = job_ids.iter().cloned().collect_vec();
220
221 let common_job_definitions: Vec<(ObjectId, String)> = Table::find()
223 .select_only()
224 .columns([
225 table::Column::TableId,
226 #[cfg(not(debug_assertions))]
227 table::Column::Name,
228 #[cfg(debug_assertions)]
229 table::Column::Definition,
230 ])
231 .filter(table::Column::TableId.is_in(job_ids.clone()))
232 .into_tuple()
233 .all(txn)
234 .await?;
235
236 let sink_definitions: Vec<(ObjectId, String)> = Sink::find()
237 .select_only()
238 .columns([
239 sink::Column::SinkId,
240 #[cfg(not(debug_assertions))]
241 sink::Column::Name,
242 #[cfg(debug_assertions)]
243 sink::Column::Definition,
244 ])
245 .filter(sink::Column::SinkId.is_in(job_ids.clone()))
246 .into_tuple()
247 .all(txn)
248 .await?;
249
250 let source_definitions: Vec<(ObjectId, String)> = Source::find()
251 .select_only()
252 .columns([
253 source::Column::SourceId,
254 #[cfg(not(debug_assertions))]
255 source::Column::Name,
256 #[cfg(debug_assertions)]
257 source::Column::Definition,
258 ])
259 .filter(source::Column::SourceId.is_in(job_ids.clone()))
260 .into_tuple()
261 .all(txn)
262 .await?;
263
264 let definitions: HashMap<ObjectId, String> = common_job_definitions
265 .into_iter()
266 .chain(sink_definitions.into_iter())
267 .chain(source_definitions.into_iter())
268 .collect();
269
270 Ok(definitions)
271}
272
273impl CatalogController {
274 pub async fn resolve_working_set_for_reschedule_fragments(
275 &self,
276 fragment_ids: Vec<FragmentId>,
277 ) -> MetaResult<RescheduleWorkingSet> {
278 let inner = self.inner.read().await;
279 self.resolve_working_set_for_reschedule_helper(&inner.db, fragment_ids)
280 .await
281 }
282
283 pub async fn resolve_working_set_for_reschedule_tables(
284 &self,
285 table_ids: Vec<ObjectId>,
286 ) -> MetaResult<RescheduleWorkingSet> {
287 let inner = self.inner.read().await;
288 let txn = inner.db.begin().await?;
289
290 let fragment_ids: Vec<FragmentId> = Fragment::find()
291 .select_only()
292 .column(fragment::Column::FragmentId)
293 .filter(fragment::Column::JobId.is_in(table_ids))
294 .into_tuple()
295 .all(&txn)
296 .await?;
297
298 self.resolve_working_set_for_reschedule_helper(&txn, fragment_ids)
299 .await
300 }
301
302 pub async fn resolve_working_set_for_reschedule_helper<C>(
303 &self,
304 txn: &C,
305 fragment_ids: Vec<FragmentId>,
306 ) -> MetaResult<RescheduleWorkingSet>
307 where
308 C: ConnectionTrait,
309 {
310 let no_shuffle_related_upstream_fragment_ids = resolve_no_shuffle_query(
312 txn,
313 construct_no_shuffle_upstream_traverse_query(fragment_ids.clone()),
314 )
315 .await?;
316
317 let no_shuffle_related_downstream_fragment_ids = resolve_no_shuffle_query(
319 txn,
320 construct_no_shuffle_downstream_traverse_query(
321 no_shuffle_related_upstream_fragment_ids
322 .iter()
323 .map(|(src, _, _)| *src)
324 .chain(fragment_ids.iter().cloned())
325 .unique()
326 .collect(),
327 ),
328 )
329 .await?;
330
331 let extended_fragment_ids: HashSet<_> = no_shuffle_related_upstream_fragment_ids
333 .iter()
334 .chain(no_shuffle_related_downstream_fragment_ids.iter())
335 .flat_map(|(src, _, dst)| [*src, *dst])
336 .chain(fragment_ids.iter().cloned())
337 .collect();
338
339 let query = FragmentRelation::find()
340 .select_only()
341 .column(fragment_relation::Column::SourceFragmentId)
342 .column(fragment_relation::Column::DispatcherType)
343 .column(fragment_relation::Column::TargetFragmentId)
344 .distinct();
345
346 let upstream_fragments: Vec<(FragmentId, DispatcherType, FragmentId)> = query
348 .clone()
349 .filter(
350 fragment_relation::Column::TargetFragmentId.is_in(extended_fragment_ids.clone()),
351 )
352 .into_tuple()
353 .all(txn)
354 .await?;
355
356 let downstream_fragments: Vec<(FragmentId, DispatcherType, FragmentId)> = query
358 .clone()
359 .filter(
360 fragment_relation::Column::SourceFragmentId.is_in(extended_fragment_ids.clone()),
361 )
362 .into_tuple()
363 .all(txn)
364 .await?;
365
366 let all_fragment_relations: HashSet<_> = no_shuffle_related_upstream_fragment_ids
367 .into_iter()
368 .chain(no_shuffle_related_downstream_fragment_ids.into_iter())
369 .chain(upstream_fragments.into_iter())
370 .chain(downstream_fragments.into_iter())
371 .collect();
372
373 let mut fragment_upstreams: HashMap<FragmentId, HashMap<FragmentId, DispatcherType>> =
374 HashMap::new();
375 let mut fragment_downstreams: HashMap<FragmentId, HashMap<FragmentId, DispatcherType>> =
376 HashMap::new();
377
378 for (src, dispatcher_type, dst) in &all_fragment_relations {
379 fragment_upstreams
380 .entry(*dst)
381 .or_default()
382 .insert(*src, *dispatcher_type);
383 fragment_downstreams
384 .entry(*src)
385 .or_default()
386 .insert(*dst, *dispatcher_type);
387 }
388
389 let all_fragment_ids: HashSet<_> = all_fragment_relations
390 .iter()
391 .flat_map(|(src, _, dst)| [*src, *dst])
392 .chain(extended_fragment_ids.into_iter())
393 .collect();
394
395 let fragments: Vec<_> = Fragment::find()
396 .filter(fragment::Column::FragmentId.is_in(all_fragment_ids.clone()))
397 .all(txn)
398 .await?;
399
400 let actors: Vec<_> = Actor::find()
401 .filter(actor::Column::FragmentId.is_in(all_fragment_ids.clone()))
402 .all(txn)
403 .await?;
404
405 let actors: HashMap<_, _> = actors
406 .into_iter()
407 .map(|actor| (actor.actor_id as _, actor))
408 .collect();
409
410 let fragments: HashMap<FragmentId, _> = fragments
411 .into_iter()
412 .map(|fragment| (fragment.fragment_id, fragment))
413 .collect();
414
415 let related_job_ids: HashSet<_> =
416 fragments.values().map(|fragment| fragment.job_id).collect();
417
418 let mut job_resource_groups = HashMap::new();
419 for &job_id in &related_job_ids {
420 let resource_group = get_existing_job_resource_group(txn, job_id).await?;
421 job_resource_groups.insert(job_id, resource_group);
422 }
423
424 let related_job_definitions =
425 resolve_streaming_job_definition(txn, &related_job_ids).await?;
426
427 let related_jobs = StreamingJob::find()
428 .filter(streaming_job::Column::JobId.is_in(related_job_ids))
429 .all(txn)
430 .await?;
431
432 let related_jobs = related_jobs
433 .into_iter()
434 .map(|job| {
435 let job_id = job.job_id;
436 (
437 job_id,
438 (
439 job,
440 related_job_definitions
441 .get(&job_id)
442 .cloned()
443 .unwrap_or("".to_owned()),
444 ),
445 )
446 })
447 .collect();
448
449 let fragment_actor_dispatchers = get_fragment_actor_dispatchers(
450 txn,
451 fragments
452 .keys()
453 .map(|fragment_id| *fragment_id as _)
454 .collect(),
455 )
456 .await?;
457
458 Ok(RescheduleWorkingSet {
459 fragments,
460 actors,
461 actor_dispatchers: fragment_actor_dispatchers.into_values().flatten().collect(),
462 fragment_downstreams,
463 fragment_upstreams,
464 job_resource_groups,
465 related_jobs,
466 })
467 }
468}
469
/// Integrity assertion for use inside a loop body.
///
/// When the condition is false, logs the message at error level, sets the given flag to
/// `true` and `continue`s the innermost enclosing loop. The message expression is only
/// evaluated on failure.
macro_rules! crit_check_in_loop {
    ($failed:expr, $cond:expr, $msg:expr) => {
        if !$cond {
            tracing::error!("Integrity check failed: {}", $msg);
            $failed = true;
            continue;
        }
    };
}
479
480impl CatalogController {
481 pub async fn integrity_check(&self) -> MetaResult<()> {
482 let inner = self.inner.read().await;
483 let txn = inner.db.begin().await?;
484 Self::graph_check(&txn).await
485 }
486
487 pub async fn graph_check<C>(txn: &C) -> MetaResult<()>
489 where
490 C: ConnectionTrait,
491 {
492 #[derive(Clone, DerivePartialModel, FromQueryResult)]
493 #[sea_orm(entity = "Fragment")]
494 pub struct PartialFragment {
495 pub fragment_id: FragmentId,
496 pub distribution_type: DistributionType,
497 pub vnode_count: i32,
498 }
499
500 #[derive(Clone, DerivePartialModel, FromQueryResult)]
501 #[sea_orm(entity = "Actor")]
502 pub struct PartialActor {
503 pub actor_id: risingwave_meta_model::ActorId,
504 pub fragment_id: FragmentId,
505 pub status: ActorStatus,
506 pub splits: Option<ConnectorSplits>,
507 pub vnode_bitmap: Option<VnodeBitmap>,
508 }
509
510 let mut flag = false;
511
512 let fragments: Vec<PartialFragment> =
513 Fragment::find().into_partial_model().all(txn).await?;
514
515 let fragment_map: HashMap<_, _> = fragments
516 .into_iter()
517 .map(|fragment| (fragment.fragment_id, fragment))
518 .collect();
519
520 let actors: Vec<PartialActor> = Actor::find().into_partial_model().all(txn).await?;
521
522 let mut fragment_actors = HashMap::new();
523 for actor in &actors {
524 fragment_actors
525 .entry(actor.fragment_id)
526 .or_insert(HashSet::new())
527 .insert(actor.actor_id);
528 }
529
530 let actor_map: HashMap<_, _> = actors
531 .into_iter()
532 .map(|actor| (actor.actor_id, actor))
533 .collect();
534
535 for (fragment_id, actor_ids) in &fragment_actors {
536 crit_check_in_loop!(
537 flag,
538 fragment_map.contains_key(fragment_id),
539 format!("Fragment {fragment_id} has actors {actor_ids:?} which does not exist",)
540 );
541
542 let mut split_map = HashMap::new();
543 for actor_id in actor_ids {
544 let actor = &actor_map[actor_id];
545
546 if let Some(splits) = &actor.splits {
547 for split in splits.to_protobuf().splits {
548 let Ok(split_impl) = SplitImpl::try_from(&split) else {
549 continue;
550 };
551
552 let dup_split_actor = split_map.insert(split_impl.id(), actor_id);
553 crit_check_in_loop!(
554 flag,
555 dup_split_actor.is_none(),
556 format!(
557 "Fragment {fragment_id} actor {actor_id} has duplicate split {split:?} from actor {dup_split_actor:?}",
558 )
559 );
560 }
561 }
562 }
563
564 let fragment = &fragment_map[fragment_id];
565
566 match fragment.distribution_type {
567 DistributionType::Single => {
568 crit_check_in_loop!(
569 flag,
570 actor_ids.len() == 1,
571 format!(
572 "Fragment {fragment_id} has more than one actors {actor_ids:?} for single distribution type",
573 )
574 );
575
576 let actor_id = actor_ids.iter().exactly_one().unwrap();
577 let actor = &actor_map[actor_id];
578
579 crit_check_in_loop!(
580 flag,
581 actor.vnode_bitmap.is_none(),
582 format!(
583 "Fragment {fragment_id} actor {actor_id} has vnode_bitmap set for single distribution type",
584 )
585 );
586 }
587 DistributionType::Hash => {
588 crit_check_in_loop!(
589 flag,
590 !actor_ids.is_empty(),
591 format!(
592 "Fragment {fragment_id} has less than one actors {actor_ids:?} for hash distribution type",
593 )
594 );
595
596 let fragment_vnode_count = fragment.vnode_count as usize;
597
598 let mut result_bitmap = Bitmap::zeros(fragment_vnode_count);
599
600 for actor_id in actor_ids {
601 let actor = &actor_map[actor_id];
602
603 crit_check_in_loop!(
604 flag,
605 actor.vnode_bitmap.is_some(),
606 format!(
607 "Fragment {fragment_id} actor {actor_id} has no vnode_bitmap set for hash distribution type",
608 )
609 );
610
611 let bitmap =
612 Bitmap::from(actor.vnode_bitmap.as_ref().unwrap().to_protobuf());
613
614 crit_check_in_loop!(
615 flag,
616 result_bitmap.clone().bitand(&bitmap).count_ones() == 0,
617 format!(
618 "Fragment {fragment_id} actor {actor_id} has duplicate vnode_bitmap with other actor for hash distribution type, actor bitmap {bitmap:?}, other all bitmap {result_bitmap:?}",
619 )
620 );
621
622 result_bitmap.bitor_assign(&bitmap);
623 }
624
625 crit_check_in_loop!(
626 flag,
627 result_bitmap.all(),
628 format!(
629 "Fragment {fragment_id} has incomplete vnode_bitmap for hash distribution type",
630 )
631 );
632
633 let discovered_vnode_count = result_bitmap.count_ones();
634
635 crit_check_in_loop!(
636 flag,
637 discovered_vnode_count == fragment_vnode_count,
638 format!(
639 "Fragment {fragment_id} has different vnode_count {fragment_vnode_count} with discovered vnode count {discovered_vnode_count} for hash distribution type",
640 )
641 );
642 }
643 }
644 }
645
646 for PartialActor {
647 actor_id, status, ..
648 } in actor_map.values()
649 {
650 crit_check_in_loop!(
651 flag,
652 *status == ActorStatus::Running,
653 format!("Actor {actor_id} has status {status:?} which is not Running",)
654 );
655 }
656
657 if flag {
658 return Err(MetaError::integrity_check_failed());
659 }
660
661 Ok(())
662 }
663}