use std::cmp::{Ordering, max, min};
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};

use anyhow::{Context, anyhow};
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::id::JobId;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
use risingwave_hummock_sdk::version::HummockVersion;
use risingwave_meta_model::SinkId;
use risingwave_pb::catalog::table::PbTableType;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use thiserror_ext::AsReport;
use tracing::{info, warn};

use super::BarrierWorkerRuntimeInfoSnapshot;
use crate::MetaResult;
use crate::barrier::DatabaseRuntimeInfoSnapshot;
use crate::barrier::context::GlobalBarrierWorkerContextImpl;
use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
use crate::manager::ActiveStreamingWorkerNodes;
use crate::model::{ActorId, FragmentId, StreamActor};
use crate::rpc::ddl_controller::refill_upstream_sink_union_in_table;
use crate::stream::cdc::reload_cdc_table_snapshot_splits;
use crate::stream::{SourceChange, StreamFragmentGraph};

impl GlobalBarrierWorkerContextImpl {
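    /// Clean up streaming jobs left in a "dirty" creating state by a previous failure: drop
    /// dirty subscriptions and creating jobs, reset all refresh jobs to idle, and notify the
    /// source manager to drop the sources associated with the cleaned-up jobs.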
    async fn clean_dirty_streaming_jobs(&self, database_id: Option<DatabaseId>) -> MetaResult<()> {
        self.metadata_manager
            .catalog_controller
            .clean_dirty_subscription(database_id)
            .await?;
        let dirty_associated_source_ids = self
            .metadata_manager
            .catalog_controller
            .clean_dirty_creating_jobs(database_id)
            .await?;
        self.metadata_manager
            .reset_all_refresh_jobs_to_idle()
            .await?;

        self.source_manager
            .apply_source_change(SourceChange::DropSource {
                dropped_source_ids: dirty_associated_source_ids,
            })
            .await;

        Ok(())
    }

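    /// Stop the sink coordinators of the given database, or reset the whole sink manager when
    /// no database is specified.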
    async fn reset_sink_coordinator(&self, database_id: Option<DatabaseId>) -> MetaResult<()> {
        if let Some(database_id) = database_id {
            let sink_ids = self
                .metadata_manager
                .catalog_controller
                .list_sink_ids(Some(database_id))
                .await?;
            self.sink_manager.stop_sink_coordinator(sink_ids).await;
        } else {
            self.sink_manager.reset().await;
        }
        Ok(())
    }

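    /// For every sink with dirty pending state, look up the committed epoch of its first state
    /// table in the current hummock version and use it to abort the sink's pending epochs in
    /// the catalog.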
    async fn abort_dirty_pending_sink_state(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<()> {
        let pending_sinks: HashSet<SinkId> = self
            .metadata_manager
            .catalog_controller
            .list_all_pending_sinks(database_id)
            .await?;

        if pending_sinks.is_empty() {
            return Ok(());
        }

        let sink_with_state_tables: HashMap<SinkId, Vec<TableId>> = self
            .metadata_manager
            .catalog_controller
            .fetch_sink_with_state_table_ids(pending_sinks)
            .await?;

        let mut sink_committed_epoch: HashMap<SinkId, u64> = HashMap::new();

        for (sink_id, table_ids) in sink_with_state_tables {
            let Some(table_id) = table_ids.first() else {
                return Err(anyhow!("no state table id in sink: {}", sink_id).into());
            };

            self.hummock_manager
                .on_current_version(|version| -> MetaResult<()> {
                    if let Some(committed_epoch) = version.table_committed_epoch(*table_id) {
                        assert!(
                            sink_committed_epoch
                                .insert(sink_id, committed_epoch)
                                .is_none()
                        );
                        Ok(())
                    } else {
                        Err(anyhow!("cannot get committed epoch on table {}.", table_id).into())
                    }
                })
                .await?;
        }

        self.metadata_manager
            .catalog_controller
            .abort_pending_sink_epochs(sink_committed_epoch)
            .await?;

        Ok(())
    }

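    /// Purge stale state tables from hummock, keeping only the given set of inflight state
    /// table ids.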
    async fn purge_state_table_from_hummock(
        &self,
        all_state_table_ids: &HashSet<TableId>,
    ) -> MetaResult<()> {
        self.hummock_manager.purge(all_state_table_ids).await?;
        Ok(())
    }

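    /// List background creating jobs, returning a map from job id to its definition.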
    async fn list_background_job_progress(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<HashMap<JobId, String>> {
        let mgr = &self.metadata_manager;
        let job_info = mgr
            .catalog_controller
            .list_background_creating_jobs(false, database_id)
            .await?;

        Ok(job_info
            .into_iter()
            .map(|(job_id, definition, _init_at)| (job_id, definition))
            .collect())
    }

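    /// Load the inflight fragment info of all actors, grouped by database and job. When a
    /// `database_id` is given, only that database is loaded and the loaded database id is
    /// asserted to match it.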
    async fn resolve_database_info(
        &self,
        database_id: Option<DatabaseId>,
        worker_nodes: &ActiveStreamingWorkerNodes,
    ) -> MetaResult<HashMap<DatabaseId, HashMap<JobId, HashMap<FragmentId, InflightFragmentInfo>>>>
    {
        let all_actor_infos = self
            .metadata_manager
            .catalog_controller
            .load_all_actors_dynamic(database_id, worker_nodes)
            .await?;

        Ok(all_actor_infos
            .into_iter()
            .map(|(loaded_database_id, job_fragment_infos)| {
                if let Some(database_id) = database_id {
                    assert_eq!(database_id, loaded_database_id);
                }
                (
                    loaded_database_id,
                    job_fragment_infos
                        .into_iter()
                        .map(|(job_id, fragment_infos)| {
                            (
                                job_id,
                                fragment_infos
                                    .into_iter()
                                    .map(|(fragment_id, info)| (fragment_id as _, info))
                                    .collect(),
                            )
                        })
                        .collect(),
                )
            })
            .collect())
    }

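    /// Resolve, from the current hummock version, the committed epoch of every state table,
    /// plus the change-log epochs that snapshot-backfill background jobs still need from their
    /// upstream tables (from the downstream committed epoch up to the upstream committed
    /// epoch).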
    #[expect(clippy::type_complexity)]
    fn resolve_hummock_version_epochs(
        background_jobs: impl Iterator<Item = (JobId, &HashMap<FragmentId, InflightFragmentInfo>)>,
        version: &HummockVersion,
    ) -> MetaResult<(
        HashMap<TableId, u64>,
        HashMap<TableId, Vec<(Vec<u64>, u64)>>,
    )> {
        let table_committed_epoch: HashMap<_, _> = version
            .state_table_info
            .info()
            .iter()
            .map(|(table_id, info)| (*table_id, info.committed_epoch))
            .collect();
        let get_table_committed_epoch = |table_id| -> anyhow::Result<u64> {
            Ok(*table_committed_epoch
                .get(&table_id)
                .ok_or_else(|| anyhow!("cannot get committed epoch on table {}.", table_id))?)
        };
        let mut min_downstream_committed_epochs = HashMap::new();
        for (job_id, fragments) in background_jobs {
            let job_committed_epoch = {
                let mut table_id_iter =
                    InflightFragmentInfo::existing_table_ids(fragments.values());
                let Some(first_table_id) = table_id_iter.next() else {
                    bail!("job {} has no state table", job_id);
                };
                let job_committed_epoch = get_table_committed_epoch(first_table_id)?;
                for table_id in table_id_iter {
                    let table_committed_epoch = get_table_committed_epoch(table_id)?;
                    if job_committed_epoch != table_committed_epoch {
                        bail!(
                            "table {} has committed epoch {} different from other table {} with committed epoch {} in job {}",
                            first_table_id,
                            job_committed_epoch,
                            table_id,
                            table_committed_epoch,
                            job_id
                        );
                    }
                }

                job_committed_epoch
            };
            if let (Some(snapshot_backfill_info), _) =
                StreamFragmentGraph::collect_snapshot_backfill_info_impl(
                    fragments
                        .values()
                        .map(|fragment| (&fragment.nodes, fragment.fragment_type_mask)),
                )?
            {
                for (upstream_table, snapshot_epoch) in
                    snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
                {
                    let snapshot_epoch = snapshot_epoch.ok_or_else(|| {
                        anyhow!(
                            "recovered snapshot backfill job {} has not filled snapshot epoch to upstream {}",
                            job_id, upstream_table
                        )
                    })?;
                    let pinned_epoch = max(snapshot_epoch, job_committed_epoch);
                    match min_downstream_committed_epochs.entry(upstream_table) {
                        Entry::Occupied(entry) => {
                            let prev_min_epoch = entry.into_mut();
                            *prev_min_epoch = min(*prev_min_epoch, pinned_epoch);
                        }
                        Entry::Vacant(entry) => {
                            entry.insert(pinned_epoch);
                        }
                    }
                }
            }
        }
        let mut log_epochs = HashMap::new();
        for (upstream_table_id, downstream_committed_epoch) in min_downstream_committed_epochs {
            let upstream_committed_epoch = get_table_committed_epoch(upstream_table_id)?;
            match upstream_committed_epoch.cmp(&downstream_committed_epoch) {
                Ordering::Less => {
                    bail!(
                        "downstream epoch {} later than upstream epoch {} of table {}",
                        downstream_committed_epoch,
                        upstream_committed_epoch,
                        upstream_table_id
                    );
                }
                Ordering::Equal => {
                    continue;
                }
                Ordering::Greater => {
                    if let Some(table_change_log) = version.table_change_log.get(&upstream_table_id)
                    {
                        let epochs = table_change_log
                            .filter_epoch((downstream_committed_epoch, upstream_committed_epoch))
                            .map(|epoch_log| {
                                (
                                    epoch_log.non_checkpoint_epochs.clone(),
                                    epoch_log.checkpoint_epoch,
                                )
                            })
                            .collect_vec();
                        let first_epochs = epochs.first();
                        if let Some((_, first_checkpoint_epoch)) = &first_epochs
                            && *first_checkpoint_epoch == downstream_committed_epoch
                        {
                            // The first log entry's checkpoint epoch matches the downstream
                            // committed epoch, as expected.
                        } else {
                            bail!(
                                "resolved first log epoch {:?} on table {} not matched with downstream committed epoch {}",
                                epochs,
                                upstream_table_id,
                                downstream_committed_epoch
                            );
                        }
                        log_epochs
                            .try_insert(upstream_table_id, epochs)
                            .expect("non-duplicated");
                    } else {
                        bail!(
                            "upstream table {} on epoch {} has lagged downstream on epoch {} but no table change log",
                            upstream_table_id,
                            upstream_committed_epoch,
                            downstream_committed_epoch
                        );
                    }
                }
            }
        }
        Ok((table_committed_epoch, log_epochs))
    }

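    /// For each user-created table that contains an `UpstreamSinkUnion` fragment, re-resolve
    /// the upstream sink infos from the catalog and refill them into the fragment's stream
    /// node, so that sinks created into the table are restored after recovery.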
    async fn recovery_table_with_upstream_sinks(
        &self,
        inflight_jobs: &mut HashMap<
            DatabaseId,
            HashMap<JobId, HashMap<FragmentId, InflightFragmentInfo>>,
        >,
    ) -> MetaResult<()> {
        let mut jobs = inflight_jobs.values_mut().try_fold(
            HashMap::new(),
            |mut acc, table_map| -> MetaResult<_> {
                for (job_id, job) in table_map {
                    if acc.insert(*job_id, job).is_some() {
                        return Err(anyhow::anyhow!("Duplicate job id found: {}", job_id).into());
                    }
                }
                Ok(acc)
            },
        )?;
        let tables = self
            .metadata_manager
            .catalog_controller
            .get_user_created_table_by_ids(jobs.keys().copied())
            .await?;
        for table in tables {
            assert_eq!(table.table_type(), PbTableType::Table);
            let fragment_infos = jobs.get_mut(&table.id.as_job_id()).unwrap();
            let mut target_fragment_id = None;
            for fragment in fragment_infos.values() {
                let mut is_target_fragment = false;
                visit_stream_node_cont(&fragment.nodes, |node| {
                    if let Some(PbNodeBody::UpstreamSinkUnion(_)) = node.node_body {
                        is_target_fragment = true;
                        false
                    } else {
                        true
                    }
                });
                if is_target_fragment {
                    target_fragment_id = Some(fragment.fragment_id);
                    break;
                }
            }
            let Some(target_fragment_id) = target_fragment_id else {
                tracing::debug!(
                    "The table {} created by old versions has not yet been migrated, so sinks cannot be created or dropped on this table.",
                    table.id
                );
                continue;
            };
            let target_fragment = fragment_infos.get_mut(&target_fragment_id).unwrap();
            let upstream_infos = self
                .metadata_manager
                .catalog_controller
                .get_all_upstream_sink_infos(&table, target_fragment_id as _)
                .await?;
            refill_upstream_sink_union_in_table(&mut target_fragment.nodes, &upstream_infos);
        }

        Ok(())
    }

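    /// Reload the full runtime info snapshot for global recovery: clean up dirty state, resolve
    /// the inflight job and fragment info of all databases, and collect the committed epochs,
    /// stream actors, fragment relations, source splits and background job progress needed to
    /// rebuild the barrier worker state.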
    pub(super) async fn reload_runtime_info_impl(
        &self,
    ) -> MetaResult<BarrierWorkerRuntimeInfoSnapshot> {
        self.clean_dirty_streaming_jobs(None)
            .await
            .context("clean dirty streaming jobs")?;

        self.reset_sink_coordinator(None)
            .await
            .context("reset sink coordinator")?;
        self.abort_dirty_pending_sink_state(None)
            .await
            .context("abort dirty pending sink state")?;

        tracing::info!("recovering background job progress");
        let background_jobs = self
            .list_background_job_progress(None)
            .await
            .context("recover background job progress should not fail")?;

        tracing::info!("recovered background job progress");

        let _ = self.scheduled_barriers.pre_apply_drop_cancel(None);
        self.metadata_manager
            .catalog_controller
            .cleanup_dropped_tables()
            .await;

        let active_streaming_nodes =
            ActiveStreamingWorkerNodes::new_snapshot(self.metadata_manager.clone()).await?;

        let background_streaming_jobs = background_jobs.keys().cloned().collect_vec();

        tracing::info!(
            "background streaming jobs: {:?} total {}",
            background_streaming_jobs,
            background_streaming_jobs.len()
        );

        let unreschedulable_jobs = {
            let mut unreschedulable_jobs = HashSet::new();

            for job_id in background_streaming_jobs {
                let scan_types = self
                    .metadata_manager
                    .get_job_backfill_scan_types(job_id)
                    .await?;

                if scan_types
                    .values()
                    .any(|scan_type| !scan_type.is_reschedulable())
                {
                    unreschedulable_jobs.insert(job_id);
                }
            }

            unreschedulable_jobs
        };

        if !unreschedulable_jobs.is_empty() {
            tracing::info!(
                "unreschedulable background jobs: {:?}",
                unreschedulable_jobs
            );
        }

        let mut info = if unreschedulable_jobs.is_empty() {
            info!("trigger offline scaling");
            self.resolve_database_info(None, &active_streaming_nodes)
                .await
                .inspect_err(|err| {
                    warn!(error = %err.as_report(), "resolve actor info failed");
                })?
        } else {
            bail!(
                "Recovery for unreschedulable background jobs is not yet implemented. \
                This path is triggered when the following jobs have at least one scan type that is not reschedulable: {:?}.",
                unreschedulable_jobs
            );
        };

        let dropped_table_ids = self.scheduled_barriers.pre_apply_drop_cancel(None);
        if !dropped_table_ids.is_empty() {
            self.metadata_manager
                .catalog_controller
                .complete_dropped_tables(dropped_table_ids)
                .await;
            info = self
                .resolve_database_info(None, &active_streaming_nodes)
                .await
                .inspect_err(|err| {
                    warn!(error = %err.as_report(), "resolve actor info failed");
                })?
        }

        self.recovery_table_with_upstream_sinks(&mut info).await?;

        let info = info;

        self.purge_state_table_from_hummock(
            &info
                .values()
                .flatten()
                .flat_map(|(_, fragments)| {
                    InflightFragmentInfo::existing_table_ids(fragments.values())
                })
                .collect(),
        )
        .await
        .context("purge state table from hummock")?;

        let (state_table_committed_epochs, state_table_log_epochs) = self
            .hummock_manager
            .on_current_version(|version| {
                Self::resolve_hummock_version_epochs(
                    info.values().flat_map(|jobs| {
                        jobs.iter().filter_map(|(job_id, job)| {
                            background_jobs
                                .contains_key(job_id)
                                .then_some((*job_id, job))
                        })
                    }),
                    version,
                )
            })
            .await?;

        let mv_depended_subscriptions = self
            .metadata_manager
            .get_mv_depended_subscriptions(None)
            .await?;

        let stream_actors = self.load_stream_actors(&info).await?;

        let fragment_relations = self
            .metadata_manager
            .catalog_controller
            .get_fragment_downstream_relations(
                info.values()
                    .flatten()
                    .flat_map(|(_, job)| job.keys())
                    .map(|fragment_id| *fragment_id as _)
                    .collect(),
            )
            .await?;

        let background_jobs = {
            let mut background_jobs = self
                .list_background_job_progress(None)
                .await
                .context("recover background job progress should not fail")?;
            info.values()
                .flatten()
                .filter_map(|(job_id, _)| {
                    background_jobs
                        .remove(job_id)
                        .map(|definition| (*job_id, definition))
                })
                .collect()
        };

        let database_infos = self
            .metadata_manager
            .catalog_controller
            .list_databases()
            .await?;

        let mut source_splits = HashMap::new();
        for (_, fragment_infos) in info.values().flatten() {
            for fragment in fragment_infos.values() {
                for (actor_id, info) in &fragment.actors {
                    source_splits.insert(*actor_id, info.splits.clone());
                }
            }
        }

        let cdc_table_snapshot_splits =
            reload_cdc_table_snapshot_splits(&self.env.meta_store_ref().conn, None).await?;

        Ok(BarrierWorkerRuntimeInfoSnapshot {
            active_streaming_nodes,
            database_job_infos: info,
            state_table_committed_epochs,
            state_table_log_epochs,
            mv_depended_subscriptions,
            stream_actors,
            fragment_relations,
            source_splits,
            background_jobs,
            hummock_version_stats: self.hummock_manager.get_version_stats().await,
            database_infos,
            cdc_table_snapshot_splits,
        })
    }

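    /// Reload the runtime info snapshot of a single database for per-database recovery.
    /// Returns `None` if the database has no inflight jobs to recover.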
    pub(super) async fn reload_database_runtime_info_impl(
        &self,
        database_id: DatabaseId,
    ) -> MetaResult<Option<DatabaseRuntimeInfoSnapshot>> {
        self.clean_dirty_streaming_jobs(Some(database_id))
            .await
            .context("clean dirty streaming jobs")?;

        self.reset_sink_coordinator(Some(database_id))
            .await
            .context("reset sink coordinator")?;
        self.abort_dirty_pending_sink_state(Some(database_id))
            .await
            .context("abort dirty pending sink state")?;

        tracing::info!(
            ?database_id,
            "recovering background job progress of database"
        );

        let background_jobs = self
            .list_background_job_progress(Some(database_id))
            .await
            .context("recover background job progress of database should not fail")?;
        tracing::info!(?database_id, "recovered background job progress");

        let dropped_table_ids = self
            .scheduled_barriers
            .pre_apply_drop_cancel(Some(database_id));
        self.metadata_manager
            .catalog_controller
            .complete_dropped_tables(dropped_table_ids)
            .await;

        let active_streaming_nodes =
            ActiveStreamingWorkerNodes::new_snapshot(self.metadata_manager.clone()).await?;

        let mut all_info = self
            .resolve_database_info(Some(database_id), &active_streaming_nodes)
            .await
            .inspect_err(|err| {
                warn!(error = %err.as_report(), "resolve actor info failed");
            })?;

        let mut database_info = all_info
            .remove(&database_id)
            .map_or_else(HashMap::new, |table_map| {
                HashMap::from([(database_id, table_map)])
            });

        self.recovery_table_with_upstream_sinks(&mut database_info)
            .await?;

        assert!(database_info.len() <= 1);

        let stream_actors = self.load_stream_actors(&database_info).await?;

        let Some(info) = database_info
            .into_iter()
            .next()
            .map(|(loaded_database_id, info)| {
                assert_eq!(loaded_database_id, database_id);
                info
            })
        else {
            return Ok(None);
        };

        let (state_table_committed_epochs, state_table_log_epochs) = self
            .hummock_manager
            .on_current_version(|version| {
                Self::resolve_hummock_version_epochs(
                    background_jobs
                        .keys()
                        .map(|job_id| (*job_id, &info[job_id])),
                    version,
                )
            })
            .await?;

        let mv_depended_subscriptions = self
            .metadata_manager
            .get_mv_depended_subscriptions(Some(database_id))
            .await?;

        let fragment_relations = self
            .metadata_manager
            .catalog_controller
            .get_fragment_downstream_relations(
                info.values()
                    .flatten()
                    .map(|(fragment_id, _)| *fragment_id as _)
                    .collect(),
            )
            .await?;

        let mut source_splits = HashMap::new();
        for (_, fragment) in info.values().flatten() {
            for (actor_id, info) in &fragment.actors {
                source_splits.insert(*actor_id, info.splits.clone());
            }
        }

        let cdc_table_snapshot_splits =
            reload_cdc_table_snapshot_splits(&self.env.meta_store_ref().conn, Some(database_id))
                .await?;

        self.refresh_manager
            .remove_trackers_by_database(database_id);

        Ok(Some(DatabaseRuntimeInfoSnapshot {
            job_infos: info,
            state_table_committed_epochs,
            state_table_log_epochs,
            mv_depended_subscriptions,
            stream_actors,
            fragment_relations,
            source_splits,
            background_jobs,
            cdc_table_snapshot_splits,
        }))
    }

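    /// Build the `StreamActor` descriptors for all actors in the given inflight info, using the
    /// per-job extra info (definition, expression context, config override) loaded from the
    /// catalog.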
    async fn load_stream_actors(
        &self,
        all_info: &HashMap<DatabaseId, HashMap<JobId, HashMap<FragmentId, InflightFragmentInfo>>>,
    ) -> MetaResult<HashMap<ActorId, StreamActor>> {
        let job_ids = all_info
            .values()
            .flat_map(|jobs| jobs.keys().copied())
            .collect_vec();

        let job_extra_info = self
            .metadata_manager
            .catalog_controller
            .get_streaming_job_extra_info(job_ids)
            .await?;

        let mut stream_actors = HashMap::new();

        for (job_id, streaming_info) in all_info.values().flatten() {
            let extra_info = job_extra_info
                .get(job_id)
                .cloned()
                .ok_or_else(|| anyhow!("no streaming job info for {}", job_id))?;
            let expr_context = extra_info.stream_context().to_expr_context();
            let job_definition = extra_info.job_definition;
            let config_override = extra_info.config_override;

            for (fragment_id, fragment_infos) in streaming_info {
                for (actor_id, InflightActorInfo { vnode_bitmap, .. }) in &fragment_infos.actors {
                    stream_actors.insert(
                        *actor_id,
                        StreamActor {
                            actor_id: *actor_id as _,
                            fragment_id: *fragment_id,
                            vnode_bitmap: vnode_bitmap.clone(),
                            mview_definition: job_definition.clone(),
                            expr_context: Some(expr_context.clone()),
                            config_override: config_override.clone(),
                        },
                    );
                }
            }
        }
        Ok(stream_actors)
    }
}