use std::collections::{HashMap, HashSet};
use std::ops::{BitAnd, BitOrAssign};

use itertools::Itertools;
use risingwave_common::bitmap::Bitmap;
use risingwave_connector::source::{SplitImpl, SplitMetaData};
use risingwave_meta_model::actor::ActorStatus;
use risingwave_meta_model::fragment::DistributionType;
use risingwave_meta_model::prelude::{
    Actor, Fragment, FragmentRelation, Sink, Source, StreamingJob, Table,
};
use risingwave_meta_model::{
    ConnectorSplits, DispatcherType, FragmentId, ObjectId, VnodeBitmap, actor, fragment,
    fragment_relation, sink, source, streaming_job, table,
};
use risingwave_meta_model_migration::{
    Alias, CommonTableExpression, Expr, IntoColumnRef, QueryStatementBuilder, SelectStatement,
    UnionType, WithClause, WithQuery,
};
use risingwave_pb::stream_plan::PbDispatcher;
use sea_orm::{
    ColumnTrait, ConnectionTrait, DbErr, DerivePartialModel, EntityTrait, FromQueryResult,
    QueryFilter, QuerySelect, Statement, TransactionTrait,
};

use crate::controller::catalog::CatalogController;
use crate::controller::utils::{get_existing_job_resource_group, get_fragment_actor_dispatchers};
use crate::model::ActorId;
use crate::{MetaError, MetaResult};

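/// Builds a recursive `WITH` query that, starting from the given `fragment_ids`, follows
/// `NO_SHUFFLE` edges in `fragment_relation` towards upstream fragments. See
/// `construct_no_shuffle_traverse_query_helper` for the shape of the generated query.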
pub fn construct_no_shuffle_upstream_traverse_query(fragment_ids: Vec<FragmentId>) -> WithQuery {
    construct_no_shuffle_traverse_query_helper(fragment_ids, NoShuffleResolveDirection::Upstream)
}

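/// Counterpart of [`construct_no_shuffle_upstream_traverse_query`] that follows `NO_SHUFFLE`
/// edges towards downstream fragments instead.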
pub fn construct_no_shuffle_downstream_traverse_query(fragment_ids: Vec<FragmentId>) -> WithQuery {
    construct_no_shuffle_traverse_query_helper(fragment_ids, NoShuffleResolveDirection::Downstream)
}

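/// Direction in which the `NO_SHUFFLE` dependency graph is traversed.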
enum NoShuffleResolveDirection {
    Upstream,
    Downstream,
}

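/// Shared helper for both traversal directions. It builds a recursive CTE named `shuffle_deps`
/// over `fragment_relation`, seeded with the given `fragment_ids` and extended step by step
/// along `NO_SHUFFLE` edges only.
///
/// The generated SQL is roughly the following (an illustrative sketch; the exact text depends
/// on the database backend and on the traversal direction, which decides which column is
/// matched against the seed set and which column is joined back to the CTE):
///
/// ```text
/// WITH RECURSIVE shuffle_deps (source_fragment_id, dispatcher_type, target_fragment_id) AS (
///     SELECT DISTINCT source_fragment_id, dispatcher_type, target_fragment_id
///     FROM fragment_relation
///     WHERE dispatcher_type = 'NO_SHUFFLE' AND <compared_column> IN (<fragment_ids>)
///   UNION ALL
///     SELECT fr.source_fragment_id, fr.dispatcher_type, fr.target_fragment_id
///     FROM fragment_relation fr
///     INNER JOIN shuffle_deps ON shuffle_deps.<cte_ref_column> = fr.<compared_column>
///     WHERE fr.dispatcher_type = 'NO_SHUFFLE'
/// )
/// SELECT DISTINCT source_fragment_id, dispatcher_type, target_fragment_id FROM shuffle_deps;
/// ```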
fn construct_no_shuffle_traverse_query_helper(
    fragment_ids: Vec<FragmentId>,
    direction: NoShuffleResolveDirection,
) -> WithQuery {
    let cte_alias = Alias::new("shuffle_deps");

    let (cte_ref_column, compared_column) = match direction {
        NoShuffleResolveDirection::Upstream => (
            (
                cte_alias.clone(),
                fragment_relation::Column::SourceFragmentId,
            )
                .into_column_ref(),
            (
                FragmentRelation,
                fragment_relation::Column::TargetFragmentId,
            )
                .into_column_ref(),
        ),
        NoShuffleResolveDirection::Downstream => (
            (
                cte_alias.clone(),
                fragment_relation::Column::TargetFragmentId,
            )
                .into_column_ref(),
            (
                FragmentRelation,
                fragment_relation::Column::SourceFragmentId,
            )
                .into_column_ref(),
        ),
    };

    let mut base_query = SelectStatement::new()
        .column((
            FragmentRelation,
            fragment_relation::Column::SourceFragmentId,
        ))
        .column((FragmentRelation, fragment_relation::Column::DispatcherType))
        .column((
            FragmentRelation,
            fragment_relation::Column::TargetFragmentId,
        ))
        .distinct()
        .from(FragmentRelation)
        .and_where(
            Expr::col((FragmentRelation, fragment_relation::Column::DispatcherType))
                .eq(DispatcherType::NoShuffle),
        )
        .and_where(Expr::col(compared_column.clone()).is_in(fragment_ids.clone()))
        .to_owned();

    let cte_referencing = SelectStatement::new()
        .column((
            FragmentRelation,
            fragment_relation::Column::SourceFragmentId,
        ))
        .column((FragmentRelation, fragment_relation::Column::DispatcherType))
        .column((
            FragmentRelation,
            fragment_relation::Column::TargetFragmentId,
        ))
        .from(FragmentRelation)
        .inner_join(
            cte_alias.clone(),
            Expr::col(cte_ref_column).eq(Expr::col(compared_column)),
        )
        .and_where(
            Expr::col((FragmentRelation, fragment_relation::Column::DispatcherType))
                .eq(DispatcherType::NoShuffle),
        )
        .to_owned();

    let common_table_expr = CommonTableExpression::new()
        .query(base_query.union(UnionType::All, cte_referencing).to_owned())
        .column(fragment_relation::Column::SourceFragmentId)
        .column(fragment_relation::Column::DispatcherType)
        .column(fragment_relation::Column::TargetFragmentId)
        .table_name(cte_alias.clone())
        .to_owned();

    SelectStatement::new()
        .column(fragment_relation::Column::SourceFragmentId)
        .column(fragment_relation::Column::DispatcherType)
        .column(fragment_relation::Column::TargetFragmentId)
        .distinct()
        .from(cte_alias.clone())
        .to_owned()
        .with(
            WithClause::new()
                .recursive(true)
                .cte(common_table_expr)
                .to_owned(),
        )
        .to_owned()
}

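/// Everything needed to plan a reschedule for a set of fragments: the fragments themselves,
/// their actors and dispatchers, the up-/downstream fragment graph restricted to the working
/// set, and the resource groups and definitions of the streaming jobs involved.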
#[derive(Debug, Clone)]
pub struct RescheduleWorkingSet {
    pub fragments: HashMap<FragmentId, fragment::Model>,
    pub actors: HashMap<ActorId, actor::Model>,
    pub actor_dispatchers: HashMap<ActorId, Vec<PbDispatcher>>,

    pub fragment_downstreams: HashMap<FragmentId, HashMap<FragmentId, DispatcherType>>,
    pub fragment_upstreams: HashMap<FragmentId, HashMap<FragmentId, DispatcherType>>,

    pub job_resource_groups: HashMap<ObjectId, String>,
    pub related_jobs: HashMap<ObjectId, (streaming_job::Model, String)>,
}

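/// Executes a traversal query built by one of the `construct_no_shuffle_*_traverse_query`
/// functions and returns the discovered `(source_fragment_id, dispatcher_type,
/// target_fragment_id)` edges.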
async fn resolve_no_shuffle_query<C>(
    txn: &C,
    query: WithQuery,
) -> MetaResult<Vec<(FragmentId, DispatcherType, FragmentId)>>
where
    C: ConnectionTrait,
{
    let (sql, values) = query.build_any(&*txn.get_database_backend().get_query_builder());

    let result = txn
        .query_all(Statement::from_sql_and_values(
            txn.get_database_backend(),
            sql,
            values,
        ))
        .await?
        .into_iter()
        .map(|res| res.try_get_many_by_index())
        .collect::<Result<Vec<(FragmentId, DispatcherType, FragmentId)>, DbErr>>()
        .map_err(MetaError::from)?;

    Ok(result)
}

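/// Looks up a human-readable description for each of the given streaming job ids across tables
/// (which also cover materialized views and indexes), sinks, and sources. Depending on the
/// build configuration, either the object's name or its full SQL definition is selected.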
pub(crate) async fn resolve_streaming_job_definition<C>(
    txn: &C,
    job_ids: &HashSet<ObjectId>,
) -> MetaResult<HashMap<ObjectId, String>>
where
    C: ConnectionTrait,
{
    let job_ids = job_ids.iter().cloned().collect_vec();

    let common_job_definitions: Vec<(ObjectId, String)> = Table::find()
        .select_only()
        .columns([
            table::Column::TableId,
            #[cfg(not(debug_assertions))]
            table::Column::Name,
            #[cfg(debug_assertions)]
            table::Column::Definition,
        ])
        .filter(table::Column::TableId.is_in(job_ids.clone()))
        .into_tuple()
        .all(txn)
        .await?;

    let sink_definitions: Vec<(ObjectId, String)> = Sink::find()
        .select_only()
        .columns([
            sink::Column::SinkId,
            #[cfg(not(debug_assertions))]
            sink::Column::Name,
            #[cfg(debug_assertions)]
            sink::Column::Definition,
        ])
        .filter(sink::Column::SinkId.is_in(job_ids.clone()))
        .into_tuple()
        .all(txn)
        .await?;

    let source_definitions: Vec<(ObjectId, String)> = Source::find()
        .select_only()
        .columns([
            source::Column::SourceId,
            #[cfg(not(debug_assertions))]
            source::Column::Name,
            #[cfg(debug_assertions)]
            source::Column::Definition,
        ])
        .filter(source::Column::SourceId.is_in(job_ids.clone()))
        .into_tuple()
        .all(txn)
        .await?;

    let definitions: HashMap<ObjectId, String> = common_job_definitions
        .into_iter()
        .chain(sink_definitions.into_iter())
        .chain(source_definitions.into_iter())
        .collect();

    Ok(definitions)
}

impl CatalogController {
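    /// Resolves the full [`RescheduleWorkingSet`] for the given fragment ids, expanding the set
    /// along `NO_SHUFFLE` dependencies in both directions.
    ///
    /// # Example (illustrative sketch, assuming a `CatalogController` handle named `controller`)
    ///
    /// ```ignore
    /// let working_set = controller
    ///     .resolve_working_set_for_reschedule_fragments(vec![1, 2, 3])
    ///     .await?;
    /// // The working set contains the requested fragments plus their NO_SHUFFLE relatives.
    /// assert!(working_set.fragments.contains_key(&1));
    /// ```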
    pub async fn resolve_working_set_for_reschedule_fragments(
        &self,
        fragment_ids: Vec<FragmentId>,
    ) -> MetaResult<RescheduleWorkingSet> {
        let inner = self.inner.read().await;
        self.resolve_working_set_for_reschedule_helper(&inner.db, fragment_ids)
            .await
    }

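    /// Same as [`Self::resolve_working_set_for_reschedule_fragments`], but starts from job
    /// (table) ids and first resolves the fragments belonging to those jobs.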
    pub async fn resolve_working_set_for_reschedule_tables(
        &self,
        table_ids: Vec<ObjectId>,
    ) -> MetaResult<RescheduleWorkingSet> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let fragment_ids: Vec<FragmentId> = Fragment::find()
            .select_only()
            .column(fragment::Column::FragmentId)
            .filter(fragment::Column::JobId.is_in(table_ids))
            .into_tuple()
            .all(&txn)
            .await?;

        self.resolve_working_set_for_reschedule_helper(&txn, fragment_ids)
            .await
    }

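    /// Core resolution logic shared by the entry points above. Roughly, it:
    /// 1. expands `fragment_ids` along `NO_SHUFFLE` edges upstream, then downstream;
    /// 2. loads all fragment relations touching the expanded set to build the up-/downstream maps;
    /// 3. loads the fragments, actors, dispatchers, and the resource groups and definitions of
    ///    the related streaming jobs.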
    pub async fn resolve_working_set_for_reschedule_helper<C>(
        &self,
        txn: &C,
        fragment_ids: Vec<FragmentId>,
    ) -> MetaResult<RescheduleWorkingSet>
    where
        C: ConnectionTrait,
    {
        let no_shuffle_related_upstream_fragment_ids = resolve_no_shuffle_query(
            txn,
            construct_no_shuffle_upstream_traverse_query(fragment_ids.clone()),
        )
        .await?;

        let no_shuffle_related_downstream_fragment_ids = resolve_no_shuffle_query(
            txn,
            construct_no_shuffle_downstream_traverse_query(
                no_shuffle_related_upstream_fragment_ids
                    .iter()
                    .map(|(src, _, _)| *src)
                    .chain(fragment_ids.iter().cloned())
                    .unique()
                    .collect(),
            ),
        )
        .await?;

        let extended_fragment_ids: HashSet<_> = no_shuffle_related_upstream_fragment_ids
            .iter()
            .chain(no_shuffle_related_downstream_fragment_ids.iter())
            .flat_map(|(src, _, dst)| [*src, *dst])
            .chain(fragment_ids.iter().cloned())
            .collect();

        let query = FragmentRelation::find()
            .select_only()
            .column(fragment_relation::Column::SourceFragmentId)
            .column(fragment_relation::Column::DispatcherType)
            .column(fragment_relation::Column::TargetFragmentId)
            .distinct();

        let upstream_fragments: Vec<(FragmentId, DispatcherType, FragmentId)> = query
            .clone()
            .filter(
                fragment_relation::Column::TargetFragmentId.is_in(extended_fragment_ids.clone()),
            )
            .into_tuple()
            .all(txn)
            .await?;

        let downstream_fragments: Vec<(FragmentId, DispatcherType, FragmentId)> = query
            .clone()
            .filter(
                fragment_relation::Column::SourceFragmentId.is_in(extended_fragment_ids.clone()),
            )
            .into_tuple()
            .all(txn)
            .await?;

        let all_fragment_relations: HashSet<_> = no_shuffle_related_upstream_fragment_ids
            .into_iter()
            .chain(no_shuffle_related_downstream_fragment_ids.into_iter())
            .chain(upstream_fragments.into_iter())
            .chain(downstream_fragments.into_iter())
            .collect();

        let mut fragment_upstreams: HashMap<FragmentId, HashMap<FragmentId, DispatcherType>> =
            HashMap::new();
        let mut fragment_downstreams: HashMap<FragmentId, HashMap<FragmentId, DispatcherType>> =
            HashMap::new();

        for (src, dispatcher_type, dst) in &all_fragment_relations {
            fragment_upstreams
                .entry(*dst)
                .or_default()
                .insert(*src, *dispatcher_type);
            fragment_downstreams
                .entry(*src)
                .or_default()
                .insert(*dst, *dispatcher_type);
        }

        let all_fragment_ids: HashSet<_> = all_fragment_relations
            .iter()
            .flat_map(|(src, _, dst)| [*src, *dst])
            .chain(extended_fragment_ids.into_iter())
            .collect();

        let fragments: Vec<_> = Fragment::find()
            .filter(fragment::Column::FragmentId.is_in(all_fragment_ids.clone()))
            .all(txn)
            .await?;

        let actors: Vec<_> = Actor::find()
            .filter(actor::Column::FragmentId.is_in(all_fragment_ids.clone()))
            .all(txn)
            .await?;

        let actors: HashMap<_, _> = actors
            .into_iter()
            .map(|actor| (actor.actor_id as _, actor))
            .collect();

        let fragments: HashMap<FragmentId, _> = fragments
            .into_iter()
            .map(|fragment| (fragment.fragment_id, fragment))
            .collect();

        let related_job_ids: HashSet<_> =
            fragments.values().map(|fragment| fragment.job_id).collect();

        let mut job_resource_groups = HashMap::new();
        for &job_id in &related_job_ids {
            let resource_group = get_existing_job_resource_group(txn, job_id).await?;
            job_resource_groups.insert(job_id, resource_group);
        }

        let related_job_definitions =
            resolve_streaming_job_definition(txn, &related_job_ids).await?;

        let related_jobs = StreamingJob::find()
            .filter(streaming_job::Column::JobId.is_in(related_job_ids))
            .all(txn)
            .await?;

        let related_jobs = related_jobs
            .into_iter()
            .map(|job| {
                let job_id = job.job_id;
                (
                    job_id,
                    (
                        job,
                        related_job_definitions
                            .get(&job_id)
                            .cloned()
                            .unwrap_or("".to_owned()),
                    ),
                )
            })
            .collect();

        let fragment_actor_dispatchers = get_fragment_actor_dispatchers(
            txn,
            fragments
                .keys()
                .map(|fragment_id| *fragment_id as _)
                .collect(),
        )
        .await?;

        Ok(RescheduleWorkingSet {
            fragments,
            actors,
            actor_dispatchers: fragment_actor_dispatchers.into_values().flatten().collect(),
            fragment_downstreams,
            fragment_upstreams,
            job_resource_groups,
            related_jobs,
        })
    }
}

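/// Logs an integrity-check failure and flags it, then `continue`s the surrounding loop so that
/// all violations are reported in one pass instead of bailing out at the first one.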
macro_rules! crit_check_in_loop {
    ($flag:expr, $condition:expr, $message:expr) => {
        if !$condition {
            tracing::error!("Integrity check failed: {}", $message);
            $flag = true;
            continue;
        }
    };
}

impl CatalogController {
    pub async fn integrity_check(&self) -> MetaResult<()> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;
        Self::graph_check(&txn).await
    }

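    /// Checks structural invariants of the fragment/actor graph stored in the meta store:
    /// every actor belongs to an existing fragment, source splits are not assigned to more than
    /// one actor within a fragment, singleton fragments have exactly one actor without a vnode
    /// bitmap, hash-distributed fragments have disjoint actor vnode bitmaps that exactly cover
    /// the fragment's vnode count, and all actors are in the `Running` state.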
    pub async fn graph_check<C>(txn: &C) -> MetaResult<()>
    where
        C: ConnectionTrait,
    {
        #[derive(Clone, DerivePartialModel, FromQueryResult)]
        #[sea_orm(entity = "Fragment")]
        pub struct PartialFragment {
            pub fragment_id: FragmentId,
            pub distribution_type: DistributionType,
            pub vnode_count: i32,
        }

        #[derive(Clone, DerivePartialModel, FromQueryResult)]
        #[sea_orm(entity = "Actor")]
        pub struct PartialActor {
            pub actor_id: risingwave_meta_model::ActorId,
            pub fragment_id: FragmentId,
            pub status: ActorStatus,
            pub splits: Option<ConnectorSplits>,
            pub vnode_bitmap: Option<VnodeBitmap>,
        }

        let mut flag = false;

        let fragments: Vec<PartialFragment> =
            Fragment::find().into_partial_model().all(txn).await?;

        let fragment_map: HashMap<_, _> = fragments
            .into_iter()
            .map(|fragment| (fragment.fragment_id, fragment))
            .collect();

        let actors: Vec<PartialActor> = Actor::find().into_partial_model().all(txn).await?;

        let mut fragment_actors = HashMap::new();
        for actor in &actors {
            fragment_actors
                .entry(actor.fragment_id)
                .or_insert(HashSet::new())
                .insert(actor.actor_id);
        }

        let actor_map: HashMap<_, _> = actors
            .into_iter()
            .map(|actor| (actor.actor_id, actor))
            .collect();

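        // Per-fragment checks: the fragment must exist, source splits must be unique within the
        // fragment, and the actor/vnode layout must match the declared distribution type.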
        for (fragment_id, actor_ids) in &fragment_actors {
            crit_check_in_loop!(
                flag,
                fragment_map.contains_key(fragment_id),
                format!(
                    "Fragment {fragment_id} has actors {actor_ids:?} but the fragment itself does not exist"
                )
            );

            let mut split_map = HashMap::new();
            for actor_id in actor_ids {
                let actor = &actor_map[actor_id];

                if let Some(splits) = &actor.splits {
                    for split in splits.to_protobuf().splits {
                        let Ok(split_impl) = SplitImpl::try_from(&split) else {
                            continue;
                        };

                        let dup_split_actor = split_map.insert(split_impl.id(), actor_id);
                        crit_check_in_loop!(
                            flag,
                            dup_split_actor.is_none(),
                            format!(
                                "Fragment {fragment_id} actor {actor_id} has duplicate split {split:?} from actor {dup_split_actor:?}"
                            )
                        );
                    }
                }
            }

            let fragment = &fragment_map[fragment_id];

            match fragment.distribution_type {
                DistributionType::Single => {
                    crit_check_in_loop!(
                        flag,
                        actor_ids.len() == 1,
                        format!(
                            "Fragment {fragment_id} should have exactly one actor for single distribution type, but has {actor_ids:?}"
                        )
                    );

                    let actor_id = actor_ids.iter().exactly_one().unwrap();
                    let actor = &actor_map[actor_id];

                    crit_check_in_loop!(
                        flag,
                        actor.vnode_bitmap.is_none(),
                        format!(
                            "Fragment {fragment_id} actor {actor_id} has vnode_bitmap set for single distribution type"
                        )
                    );
                }
                DistributionType::Hash => {
                    crit_check_in_loop!(
                        flag,
                        !actor_ids.is_empty(),
                        format!(
                            "Fragment {fragment_id} has no actors for hash distribution type"
                        )
                    );

                    let fragment_vnode_count = fragment.vnode_count as usize;

                    let mut result_bitmap = Bitmap::zeros(fragment_vnode_count);

                    for actor_id in actor_ids {
                        let actor = &actor_map[actor_id];

                        crit_check_in_loop!(
                            flag,
                            actor.vnode_bitmap.is_some(),
                            format!(
                                "Fragment {fragment_id} actor {actor_id} has no vnode_bitmap set for hash distribution type"
                            )
                        );

                        let bitmap =
                            Bitmap::from(actor.vnode_bitmap.as_ref().unwrap().to_protobuf());

                        crit_check_in_loop!(
                            flag,
                            result_bitmap.clone().bitand(&bitmap).count_ones() == 0,
                            format!(
                                "Fragment {fragment_id} actor {actor_id} has a vnode_bitmap overlapping with other actors for hash distribution type, actor bitmap {bitmap:?}, accumulated bitmap {result_bitmap:?}"
                            )
                        );

                        result_bitmap.bitor_assign(&bitmap);
                    }

                    crit_check_in_loop!(
                        flag,
                        result_bitmap.all(),
                        format!(
                            "Fragment {fragment_id} has incomplete vnode_bitmap for hash distribution type"
                        )
                    );

                    let discovered_vnode_count = result_bitmap.count_ones();

                    crit_check_in_loop!(
                        flag,
                        discovered_vnode_count == fragment_vnode_count,
                        format!(
                            "Fragment {fragment_id} declares vnode_count {fragment_vnode_count} but its actors cover {discovered_vnode_count} vnodes for hash distribution type"
                        )
                    );
                }
            }
        }

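        // Finally, every actor must be in the `Running` state.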
        for PartialActor {
            actor_id, status, ..
        } in actor_map.values()
        {
            crit_check_in_loop!(
                flag,
                *status == ActorStatus::Running,
                format!("Actor {actor_id} has status {status:?} which is not Running")
            );
        }

        if flag {
            return Err(MetaError::integrity_check_failed());
        }

        Ok(())
    }
}