1use std::collections::{HashMap, HashSet};
16use std::ops::{BitAnd, BitOrAssign};
17
18use itertools::Itertools;
19use risingwave_common::bitmap::Bitmap;
20use risingwave_connector::source::{SplitImpl, SplitMetaData};
21use risingwave_meta_model::actor::ActorStatus;
22use risingwave_meta_model::actor_dispatcher::DispatcherType;
23use risingwave_meta_model::fragment::DistributionType;
24use risingwave_meta_model::prelude::{
25 Actor, Fragment, FragmentRelation, Sink, Source, StreamingJob, Table,
26};
27use risingwave_meta_model::{
28 ConnectorSplits, FragmentId, ObjectId, VnodeBitmap, actor, fragment, fragment_relation, sink,
29 source, streaming_job, table,
30};
31use risingwave_meta_model_migration::{
32 Alias, CommonTableExpression, Expr, IntoColumnRef, QueryStatementBuilder, SelectStatement,
33 UnionType, WithClause, WithQuery,
34};
35use risingwave_pb::stream_plan::PbDispatcher;
36use sea_orm::{
37 ColumnTrait, ConnectionTrait, DbErr, DerivePartialModel, EntityTrait, FromQueryResult,
38 QueryFilter, QuerySelect, Statement, TransactionTrait,
39};
40
41use crate::controller::catalog::CatalogController;
42use crate::controller::utils::{get_existing_job_resource_group, get_fragment_actor_dispatchers};
43use crate::model::ActorId;
44use crate::{MetaError, MetaResult};
45
46pub fn construct_no_shuffle_upstream_traverse_query(fragment_ids: Vec<FragmentId>) -> WithQuery {
62 construct_no_shuffle_traverse_query_helper(fragment_ids, NoShuffleResolveDirection::Upstream)
63}
64
65pub fn construct_no_shuffle_downstream_traverse_query(fragment_ids: Vec<FragmentId>) -> WithQuery {
66 construct_no_shuffle_traverse_query_helper(fragment_ids, NoShuffleResolveDirection::Downstream)
67}
68
/// Direction in which `construct_no_shuffle_traverse_query_helper` walks the
/// `NoShuffle` relations of the fragment graph.
enum NoShuffleResolveDirection {
    /// Seed on relations whose *target* fragment is one of the requested
    /// fragments, then keep following relations whose target equals an
    /// already-collected relation's source fragment.
    Upstream,
    /// Seed on relations whose *source* fragment is one of the requested
    /// fragments, then keep following relations whose source equals an
    /// already-collected relation's target fragment.
    Downstream,
}
73
74fn construct_no_shuffle_traverse_query_helper(
75 fragment_ids: Vec<FragmentId>,
76 direction: NoShuffleResolveDirection,
77) -> WithQuery {
78 let cte_alias = Alias::new("shuffle_deps");
79
80 let (cte_ref_column, compared_column) = match direction {
85 NoShuffleResolveDirection::Upstream => (
86 (
87 cte_alias.clone(),
88 fragment_relation::Column::SourceFragmentId,
89 )
90 .into_column_ref(),
91 (
92 FragmentRelation,
93 fragment_relation::Column::TargetFragmentId,
94 )
95 .into_column_ref(),
96 ),
97 NoShuffleResolveDirection::Downstream => (
98 (
99 cte_alias.clone(),
100 fragment_relation::Column::TargetFragmentId,
101 )
102 .into_column_ref(),
103 (
104 FragmentRelation,
105 fragment_relation::Column::SourceFragmentId,
106 )
107 .into_column_ref(),
108 ),
109 };
110
111 let mut base_query = SelectStatement::new()
112 .column((
113 FragmentRelation,
114 fragment_relation::Column::SourceFragmentId,
115 ))
116 .column((FragmentRelation, fragment_relation::Column::DispatcherType))
117 .column((
118 FragmentRelation,
119 fragment_relation::Column::TargetFragmentId,
120 ))
121 .distinct()
122 .from(FragmentRelation)
123 .and_where(
124 Expr::col((FragmentRelation, fragment_relation::Column::DispatcherType))
125 .eq(DispatcherType::NoShuffle),
126 )
127 .and_where(Expr::col(compared_column.clone()).is_in(fragment_ids.clone()))
128 .to_owned();
129
130 let cte_referencing = SelectStatement::new()
131 .column((
132 FragmentRelation,
133 fragment_relation::Column::SourceFragmentId,
134 ))
135 .column((FragmentRelation, fragment_relation::Column::DispatcherType))
136 .column((
137 FragmentRelation,
138 fragment_relation::Column::TargetFragmentId,
139 ))
140 .from(FragmentRelation)
143 .inner_join(
144 cte_alias.clone(),
145 Expr::col(cte_ref_column).eq(Expr::col(compared_column)),
146 )
147 .and_where(
148 Expr::col((FragmentRelation, fragment_relation::Column::DispatcherType))
149 .eq(DispatcherType::NoShuffle),
150 )
151 .to_owned();
152
153 let common_table_expr = CommonTableExpression::new()
154 .query(base_query.union(UnionType::All, cte_referencing).to_owned())
155 .column(fragment_relation::Column::SourceFragmentId)
156 .column(fragment_relation::Column::DispatcherType)
157 .column(fragment_relation::Column::TargetFragmentId)
158 .table_name(cte_alias.clone())
159 .to_owned();
160
161 SelectStatement::new()
162 .column(fragment_relation::Column::SourceFragmentId)
163 .column(fragment_relation::Column::DispatcherType)
164 .column(fragment_relation::Column::TargetFragmentId)
165 .distinct()
166 .from(cte_alias.clone())
167 .to_owned()
168 .with(
169 WithClause::new()
170 .recursive(true)
171 .cte(common_table_expr)
172 .to_owned(),
173 )
174 .to_owned()
175}
176
/// Snapshot of the catalog data gathered by
/// `CatalogController::resolve_working_set_for_reschedule_helper` for a set of
/// fragments and the fragments related to them.
#[derive(Debug, Clone)]
pub struct RescheduleWorkingSet {
    /// Fragment models keyed by their fragment id.
    pub fragments: HashMap<FragmentId, fragment::Model>,
    /// Actor models keyed by their actor id.
    pub actors: HashMap<ActorId, actor::Model>,
    /// Dispatchers keyed by actor id, flattened from the per-fragment result
    /// of `get_fragment_actor_dispatchers`.
    pub actor_dispatchers: HashMap<ActorId, Vec<PbDispatcher>>,

    /// For each source fragment: its target fragments and the dispatcher type
    /// of the relation.
    pub fragment_downstreams: HashMap<FragmentId, HashMap<FragmentId, DispatcherType>>,
    /// For each target fragment: its source fragments and the dispatcher type
    /// of the relation.
    pub fragment_upstreams: HashMap<FragmentId, HashMap<FragmentId, DispatcherType>>,

    /// Resource group of every job that owns at least one collected fragment,
    /// as returned by `get_existing_job_resource_group`.
    pub job_resource_groups: HashMap<ObjectId, String>,
    /// Streaming job model plus its definition string, keyed by job id. The
    /// string is empty when no definition was found for the job.
    pub related_jobs: HashMap<ObjectId, (streaming_job::Model, String)>,
}
189
190async fn resolve_no_shuffle_query<C>(
191 txn: &C,
192 query: WithQuery,
193) -> MetaResult<Vec<(FragmentId, DispatcherType, FragmentId)>>
194where
195 C: ConnectionTrait,
196{
197 let (sql, values) = query.build_any(&*txn.get_database_backend().get_query_builder());
198
199 let result = txn
200 .query_all(Statement::from_sql_and_values(
201 txn.get_database_backend(),
202 sql,
203 values,
204 ))
205 .await?
206 .into_iter()
207 .map(|res| res.try_get_many_by_index())
208 .collect::<Result<Vec<(FragmentId, DispatcherType, FragmentId)>, DbErr>>()
209 .map_err(MetaError::from)?;
210
211 Ok(result)
212}
213
214pub(crate) async fn resolve_streaming_job_definition<C>(
215 txn: &C,
216 job_ids: &HashSet<ObjectId>,
217) -> MetaResult<HashMap<ObjectId, String>>
218where
219 C: ConnectionTrait,
220{
221 let job_ids = job_ids.iter().cloned().collect_vec();
222
223 let common_job_definitions: Vec<(ObjectId, String)> = Table::find()
225 .select_only()
226 .columns([
227 table::Column::TableId,
228 #[cfg(not(debug_assertions))]
229 table::Column::Name,
230 #[cfg(debug_assertions)]
231 table::Column::Definition,
232 ])
233 .filter(table::Column::TableId.is_in(job_ids.clone()))
234 .into_tuple()
235 .all(txn)
236 .await?;
237
238 let sink_definitions: Vec<(ObjectId, String)> = Sink::find()
239 .select_only()
240 .columns([
241 sink::Column::SinkId,
242 #[cfg(not(debug_assertions))]
243 sink::Column::Name,
244 #[cfg(debug_assertions)]
245 sink::Column::Definition,
246 ])
247 .filter(sink::Column::SinkId.is_in(job_ids.clone()))
248 .into_tuple()
249 .all(txn)
250 .await?;
251
252 let source_definitions: Vec<(ObjectId, String)> = Source::find()
253 .select_only()
254 .columns([
255 source::Column::SourceId,
256 #[cfg(not(debug_assertions))]
257 source::Column::Name,
258 #[cfg(debug_assertions)]
259 source::Column::Definition,
260 ])
261 .filter(source::Column::SourceId.is_in(job_ids.clone()))
262 .into_tuple()
263 .all(txn)
264 .await?;
265
266 let definitions: HashMap<ObjectId, String> = common_job_definitions
267 .into_iter()
268 .chain(sink_definitions.into_iter())
269 .chain(source_definitions.into_iter())
270 .collect();
271
272 Ok(definitions)
273}
274
275impl CatalogController {
276 pub async fn resolve_working_set_for_reschedule_fragments(
277 &self,
278 fragment_ids: Vec<FragmentId>,
279 ) -> MetaResult<RescheduleWorkingSet> {
280 let inner = self.inner.read().await;
281 self.resolve_working_set_for_reschedule_helper(&inner.db, fragment_ids)
282 .await
283 }
284
285 pub async fn resolve_working_set_for_reschedule_tables(
286 &self,
287 table_ids: Vec<ObjectId>,
288 ) -> MetaResult<RescheduleWorkingSet> {
289 let inner = self.inner.read().await;
290 let txn = inner.db.begin().await?;
291
292 let fragment_ids: Vec<FragmentId> = Fragment::find()
293 .select_only()
294 .column(fragment::Column::FragmentId)
295 .filter(fragment::Column::JobId.is_in(table_ids))
296 .into_tuple()
297 .all(&txn)
298 .await?;
299
300 self.resolve_working_set_for_reschedule_helper(&txn, fragment_ids)
301 .await
302 }
303
304 pub async fn resolve_working_set_for_reschedule_helper<C>(
305 &self,
306 txn: &C,
307 fragment_ids: Vec<FragmentId>,
308 ) -> MetaResult<RescheduleWorkingSet>
309 where
310 C: ConnectionTrait,
311 {
312 let no_shuffle_related_upstream_fragment_ids = resolve_no_shuffle_query(
314 txn,
315 construct_no_shuffle_upstream_traverse_query(fragment_ids.clone()),
316 )
317 .await?;
318
319 let no_shuffle_related_downstream_fragment_ids = resolve_no_shuffle_query(
321 txn,
322 construct_no_shuffle_downstream_traverse_query(
323 no_shuffle_related_upstream_fragment_ids
324 .iter()
325 .map(|(src, _, _)| *src)
326 .chain(fragment_ids.iter().cloned())
327 .unique()
328 .collect(),
329 ),
330 )
331 .await?;
332
333 let extended_fragment_ids: HashSet<_> = no_shuffle_related_upstream_fragment_ids
335 .iter()
336 .chain(no_shuffle_related_downstream_fragment_ids.iter())
337 .flat_map(|(src, _, dst)| [*src, *dst])
338 .chain(fragment_ids.iter().cloned())
339 .collect();
340
341 let query = FragmentRelation::find()
342 .select_only()
343 .column(fragment_relation::Column::SourceFragmentId)
344 .column(fragment_relation::Column::DispatcherType)
345 .column(fragment_relation::Column::TargetFragmentId)
346 .distinct();
347
348 let upstream_fragments: Vec<(FragmentId, DispatcherType, FragmentId)> = query
350 .clone()
351 .filter(
352 fragment_relation::Column::TargetFragmentId.is_in(extended_fragment_ids.clone()),
353 )
354 .into_tuple()
355 .all(txn)
356 .await?;
357
358 let downstream_fragments: Vec<(FragmentId, DispatcherType, FragmentId)> = query
360 .clone()
361 .filter(
362 fragment_relation::Column::SourceFragmentId.is_in(extended_fragment_ids.clone()),
363 )
364 .into_tuple()
365 .all(txn)
366 .await?;
367
368 let all_fragment_relations: HashSet<_> = no_shuffle_related_upstream_fragment_ids
369 .into_iter()
370 .chain(no_shuffle_related_downstream_fragment_ids.into_iter())
371 .chain(upstream_fragments.into_iter())
372 .chain(downstream_fragments.into_iter())
373 .collect();
374
375 let mut fragment_upstreams: HashMap<FragmentId, HashMap<FragmentId, DispatcherType>> =
376 HashMap::new();
377 let mut fragment_downstreams: HashMap<FragmentId, HashMap<FragmentId, DispatcherType>> =
378 HashMap::new();
379
380 for (src, dispatcher_type, dst) in &all_fragment_relations {
381 fragment_upstreams
382 .entry(*dst)
383 .or_default()
384 .insert(*src, *dispatcher_type);
385 fragment_downstreams
386 .entry(*src)
387 .or_default()
388 .insert(*dst, *dispatcher_type);
389 }
390
391 let all_fragment_ids: HashSet<_> = all_fragment_relations
392 .iter()
393 .flat_map(|(src, _, dst)| [*src, *dst])
394 .chain(extended_fragment_ids.into_iter())
395 .collect();
396
397 let fragments: Vec<_> = Fragment::find()
398 .filter(fragment::Column::FragmentId.is_in(all_fragment_ids.clone()))
399 .all(txn)
400 .await?;
401
402 let actors: Vec<_> = Actor::find()
403 .filter(actor::Column::FragmentId.is_in(all_fragment_ids.clone()))
404 .all(txn)
405 .await?;
406
407 let actors: HashMap<_, _> = actors
408 .into_iter()
409 .map(|actor| (actor.actor_id as _, actor))
410 .collect();
411
412 let fragments: HashMap<FragmentId, _> = fragments
413 .into_iter()
414 .map(|fragment| (fragment.fragment_id, fragment))
415 .collect();
416
417 let related_job_ids: HashSet<_> =
418 fragments.values().map(|fragment| fragment.job_id).collect();
419
420 let mut job_resource_groups = HashMap::new();
421 for &job_id in &related_job_ids {
422 let resource_group = get_existing_job_resource_group(txn, job_id).await?;
423 job_resource_groups.insert(job_id, resource_group);
424 }
425
426 let related_job_definitions =
427 resolve_streaming_job_definition(txn, &related_job_ids).await?;
428
429 let related_jobs = StreamingJob::find()
430 .filter(streaming_job::Column::JobId.is_in(related_job_ids))
431 .all(txn)
432 .await?;
433
434 let related_jobs = related_jobs
435 .into_iter()
436 .map(|job| {
437 let job_id = job.job_id;
438 (
439 job_id,
440 (
441 job,
442 related_job_definitions
443 .get(&job_id)
444 .cloned()
445 .unwrap_or("".to_owned()),
446 ),
447 )
448 })
449 .collect();
450
451 let fragment_actor_dispatchers = get_fragment_actor_dispatchers(
452 txn,
453 fragments
454 .keys()
455 .map(|fragment_id| *fragment_id as _)
456 .collect(),
457 )
458 .await?;
459
460 Ok(RescheduleWorkingSet {
461 fragments,
462 actors,
463 actor_dispatchers: fragment_actor_dispatchers.into_values().flatten().collect(),
464 fragment_downstreams,
465 fragment_upstreams,
466 job_resource_groups,
467 related_jobs,
468 })
469 }
470}
471
/// Integrity-check helper for use inside a loop body.
///
/// When `$condition` evaluates to `false`, logs `$message` at error level with
/// an "Integrity check failed" prefix, sets `$flag` to `true`, and `continue`s.
/// `$message` is only evaluated on failure.
///
/// Note that `continue` applies to the innermost loop enclosing the macro
/// invocation, so inside a nested loop only the current inner iteration is
/// skipped.
macro_rules! crit_check_in_loop {
    ($flag:expr, $condition:expr, $message:expr) => {
        if !$condition {
            tracing::error!("Integrity check failed: {}", $message);
            $flag = true;
            continue;
        }
    };
}
481
482impl CatalogController {
483 pub async fn integrity_check(&self) -> MetaResult<()> {
484 let inner = self.inner.read().await;
485 let txn = inner.db.begin().await?;
486 Self::graph_check(&txn).await
487 }
488
489 pub async fn graph_check<C>(txn: &C) -> MetaResult<()>
491 where
492 C: ConnectionTrait,
493 {
494 #[derive(Clone, DerivePartialModel, FromQueryResult)]
495 #[sea_orm(entity = "Fragment")]
496 pub struct PartialFragment {
497 pub fragment_id: FragmentId,
498 pub distribution_type: DistributionType,
499 pub vnode_count: i32,
500 }
501
502 #[derive(Clone, DerivePartialModel, FromQueryResult)]
503 #[sea_orm(entity = "Actor")]
504 pub struct PartialActor {
505 pub actor_id: risingwave_meta_model::ActorId,
506 pub fragment_id: FragmentId,
507 pub status: ActorStatus,
508 pub splits: Option<ConnectorSplits>,
509 pub vnode_bitmap: Option<VnodeBitmap>,
510 }
511
512 let mut flag = false;
513
514 let fragments: Vec<PartialFragment> =
515 Fragment::find().into_partial_model().all(txn).await?;
516
517 let fragment_map: HashMap<_, _> = fragments
518 .into_iter()
519 .map(|fragment| (fragment.fragment_id, fragment))
520 .collect();
521
522 let actors: Vec<PartialActor> = Actor::find().into_partial_model().all(txn).await?;
523
524 let mut fragment_actors = HashMap::new();
525 for actor in &actors {
526 fragment_actors
527 .entry(actor.fragment_id)
528 .or_insert(HashSet::new())
529 .insert(actor.actor_id);
530 }
531
532 let actor_map: HashMap<_, _> = actors
533 .into_iter()
534 .map(|actor| (actor.actor_id, actor))
535 .collect();
536
537 for (fragment_id, actor_ids) in &fragment_actors {
538 crit_check_in_loop!(
539 flag,
540 fragment_map.contains_key(fragment_id),
541 format!("Fragment {fragment_id} has actors {actor_ids:?} which does not exist",)
542 );
543
544 let mut split_map = HashMap::new();
545 for actor_id in actor_ids {
546 let actor = &actor_map[actor_id];
547
548 if let Some(splits) = &actor.splits {
549 for split in splits.to_protobuf().splits {
550 let Ok(split_impl) = SplitImpl::try_from(&split) else {
551 continue;
552 };
553
554 let dup_split_actor = split_map.insert(split_impl.id(), actor_id);
555 crit_check_in_loop!(
556 flag,
557 dup_split_actor.is_none(),
558 format!(
559 "Fragment {fragment_id} actor {actor_id} has duplicate split {split:?} from actor {dup_split_actor:?}",
560 )
561 );
562 }
563 }
564 }
565
566 let fragment = &fragment_map[fragment_id];
567
568 match fragment.distribution_type {
569 DistributionType::Single => {
570 crit_check_in_loop!(
571 flag,
572 actor_ids.len() == 1,
573 format!(
574 "Fragment {fragment_id} has more than one actors {actor_ids:?} for single distribution type",
575 )
576 );
577
578 let actor_id = actor_ids.iter().exactly_one().unwrap();
579 let actor = &actor_map[actor_id];
580
581 crit_check_in_loop!(
582 flag,
583 actor.vnode_bitmap.is_none(),
584 format!(
585 "Fragment {fragment_id} actor {actor_id} has vnode_bitmap set for single distribution type",
586 )
587 );
588 }
589 DistributionType::Hash => {
590 crit_check_in_loop!(
591 flag,
592 !actor_ids.is_empty(),
593 format!(
594 "Fragment {fragment_id} has less than one actors {actor_ids:?} for hash distribution type",
595 )
596 );
597
598 let fragment_vnode_count = fragment.vnode_count as usize;
599
600 let mut result_bitmap = Bitmap::zeros(fragment_vnode_count);
601
602 for actor_id in actor_ids {
603 let actor = &actor_map[actor_id];
604
605 crit_check_in_loop!(
606 flag,
607 actor.vnode_bitmap.is_some(),
608 format!(
609 "Fragment {fragment_id} actor {actor_id} has no vnode_bitmap set for hash distribution type",
610 )
611 );
612
613 let bitmap =
614 Bitmap::from(actor.vnode_bitmap.as_ref().unwrap().to_protobuf());
615
616 crit_check_in_loop!(
617 flag,
618 result_bitmap.clone().bitand(&bitmap).count_ones() == 0,
619 format!(
620 "Fragment {fragment_id} actor {actor_id} has duplicate vnode_bitmap with other actor for hash distribution type, actor bitmap {bitmap:?}, other all bitmap {result_bitmap:?}",
621 )
622 );
623
624 result_bitmap.bitor_assign(&bitmap);
625 }
626
627 crit_check_in_loop!(
628 flag,
629 result_bitmap.all(),
630 format!(
631 "Fragment {fragment_id} has incomplete vnode_bitmap for hash distribution type",
632 )
633 );
634
635 let discovered_vnode_count = result_bitmap.count_ones();
636
637 crit_check_in_loop!(
638 flag,
639 discovered_vnode_count == fragment_vnode_count,
640 format!(
641 "Fragment {fragment_id} has different vnode_count {fragment_vnode_count} with discovered vnode count {discovered_vnode_count} for hash distribution type",
642 )
643 );
644 }
645 }
646 }
647
648 for PartialActor {
649 actor_id, status, ..
650 } in actor_map.values()
651 {
652 crit_check_in_loop!(
653 flag,
654 *status == ActorStatus::Running,
655 format!("Actor {actor_id} has status {status:?} which is not Running",)
656 );
657 }
658
659 if flag {
660 return Err(MetaError::integrity_check_failed());
661 }
662
663 Ok(())
664 }
665}