1use std::cmp::{Ordering, max, min};
16use std::collections::hash_map::Entry;
17use std::collections::{HashMap, HashSet};
18use std::sync::atomic::AtomicU32;
19
20use anyhow::{Context, anyhow};
21use itertools::Itertools;
22use risingwave_common::bail;
23use risingwave_common::catalog::{DatabaseId, TableId};
24use risingwave_common::id::JobId;
25use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
26use risingwave_connector::source::SplitImpl;
27use risingwave_hummock_sdk::change_log::TableChangeLogs;
28use risingwave_hummock_sdk::version::HummockVersion;
29use risingwave_meta_model::SinkId;
30use risingwave_pb::stream_plan::stream_node::PbNodeBody;
31use sea_orm::TransactionTrait;
32use thiserror_ext::AsReport;
33use tracing::{info, warn};
34
35use super::BarrierWorkerRuntimeInfoSnapshot;
36use crate::MetaResult;
37use crate::barrier::DatabaseRuntimeInfoSnapshot;
38use crate::barrier::checkpoint::{
39 BatchRefreshJobCheckpointControl, BatchRefreshLogicalFragments, BatchRefreshRenderResult,
40};
41use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
42use crate::barrier::progress::TrackingJob;
43use crate::barrier::rpc::to_partial_graph_id;
44use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
45use crate::controller::scale::{
46 FragmentRenderMap, LoadedFragment, LoadedFragmentContext, RenderedGraph,
47 render_actor_assignments,
48};
49use crate::controller::utils::StreamingJobExtraInfo;
50use crate::manager::ActiveStreamingWorkerNodes;
51use crate::model::{ActorId, FragmentDownstreamRelation, FragmentId, StreamActor};
52use crate::rpc::ddl_controller::refill_upstream_sink_union_in_table;
53use crate::stream::cdc::reload_cdc_table_snapshot_splits;
54use crate::stream::{
55 SourceChange, StreamFragmentGraph, UpstreamSinkInfo, cleanup_dropped_streaming_jobs,
56};
57
/// Information needed to re-fill the `UpstreamSinkUnion` node of a table job during recovery.
///
/// Built by `load_recovery_context` for every job that has a fragment containing an
/// `UpstreamSinkUnion` node, and consumed by `recovery_table_with_upstream_sinks`.
#[derive(Debug)]
pub(crate) struct UpstreamSinkRecoveryInfo {
    /// The fragment of the job whose stream node tree contains the `UpstreamSinkUnion` node.
    target_fragment_id: FragmentId,
    /// Upstream sink infos loaded from the catalog; passed to
    /// `refill_upstream_sink_union_in_table` to rebuild the union node.
    upstream_infos: Vec<UpstreamSinkInfo>,
}
63
/// Catalog data loaded (within a single transaction) for recovering streaming jobs.
#[derive(Debug)]
pub struct LoadedRecoveryContext {
    /// Fragments, streaming job models and related metadata loaded for recovery.
    pub fragment_context: LoadedFragmentContext,
    /// Per-job extra info (definition, config override, stream context, ...).
    pub job_extra_info: HashMap<JobId, StreamingJobExtraInfo>,
    /// Per-job info for re-filling `UpstreamSinkUnion` nodes; only contains jobs that have one.
    pub upstream_sink_recovery: HashMap<JobId, UpstreamSinkRecoveryInfo>,
    /// Downstream relations of all loaded fragments.
    pub fragment_relations: FragmentDownstreamRelation,
}
71
72impl LoadedRecoveryContext {
73 fn empty(fragment_context: LoadedFragmentContext) -> Self {
74 Self {
75 fragment_context,
76 job_extra_info: HashMap::new(),
77 upstream_sink_recovery: HashMap::new(),
78 fragment_relations: FragmentDownstreamRelation::default(),
79 }
80 }
81}
82
/// Runtime info of one database rendered from a [`LoadedRecoveryContext`] by
/// `render_runtime_info`.
pub struct RenderedDatabaseRuntimeInfo {
    /// Inflight fragment infos, keyed by job id and then fragment id (batch-refresh jobs excluded).
    pub job_infos: HashMap<JobId, HashMap<FragmentId, InflightFragmentInfo>>,
    /// Stream actors built for every actor in `job_infos`.
    pub stream_actors: HashMap<ActorId, StreamActor>,
    /// Source splits assigned to every actor in `job_infos`.
    pub source_splits: HashMap<ActorId, Vec<SplitImpl>>,
    /// Render results of batch-refresh jobs, which are rendered separately.
    pub batch_refresh: HashMap<JobId, BatchRefreshRenderResult>,
}
90
91pub fn render_runtime_info(
92 actor_id_generator: &AtomicU32,
93 worker_nodes: &ActiveStreamingWorkerNodes,
94 recovery_context: &LoadedRecoveryContext,
95 database_id: DatabaseId,
96) -> MetaResult<Option<RenderedDatabaseRuntimeInfo>> {
97 let Some(mut per_database_context) =
98 recovery_context.fragment_context.for_database(database_id)
99 else {
100 return Ok(None);
101 };
102
103 assert!(!per_database_context.is_empty());
104
105 let batch_refresh_job_ids: HashSet<JobId> = per_database_context
108 .job_map
109 .iter()
110 .filter(|(_, model)| model.refresh_interval_sec.is_some())
111 .map(|(job_id, _)| *job_id)
112 .collect();
113
114 let mut batch_refresh_logical = HashMap::new();
115 if !batch_refresh_job_ids.is_empty() {
116 let batch_refresh_fragment_ids: HashSet<FragmentId> = batch_refresh_job_ids
117 .iter()
118 .flat_map(|job_id| {
119 per_database_context
120 .job_fragments
121 .get(job_id)
122 .unwrap()
123 .keys()
124 .copied()
125 })
126 .collect();
127
128 for &job_id in &batch_refresh_job_ids {
129 let fragments = per_database_context.job_fragments.remove(&job_id).unwrap();
130 let downstreams = fragments
131 .keys()
132 .filter_map(|fid| {
133 recovery_context
134 .fragment_relations
135 .get(fid)
136 .map(|r| (*fid, r.clone()))
137 })
138 .collect();
139 batch_refresh_logical.insert(
140 job_id,
141 BatchRefreshLogicalFragments {
142 fragments,
143 downstreams,
144 },
145 );
146 }
147
148 per_database_context.ensembles.retain(|ensemble| {
150 !ensemble
151 .component_fragments()
152 .all(|fid| batch_refresh_fragment_ids.contains(&fid))
153 });
154 }
155
156 let mut batch_refresh = HashMap::new();
158 for (job_id, logical) in batch_refresh_logical {
159 let extra = recovery_context
160 .job_extra_info
161 .get(&job_id)
162 .expect("should have extra info");
163 let streaming_job_model = per_database_context
164 .job_map
165 .get(&job_id)
166 .expect("should have streaming job model");
167 let database_model = &per_database_context.database_map[&database_id];
168 let partial_graph_id = to_partial_graph_id(database_id, Some(job_id));
169
170 let render_result = BatchRefreshJobCheckpointControl::render_actors_and_build_job_info(
171 &logical.fragments,
172 &logical.downstreams,
173 &extra.job_definition,
174 actor_id_generator,
175 worker_nodes.current(),
176 &database_model.resource_group,
177 streaming_job_model,
178 partial_graph_id,
179 )?;
180
181 batch_refresh.insert(job_id, render_result);
182 }
183
184 if per_database_context.ensembles.is_empty() {
186 return Ok(Some(RenderedDatabaseRuntimeInfo {
187 job_infos: HashMap::new(),
188 stream_actors: HashMap::new(),
189 source_splits: HashMap::new(),
190 batch_refresh,
191 }));
192 }
193
194 let RenderedGraph { mut fragments, .. } = render_actor_assignments(
195 actor_id_generator,
196 worker_nodes.current(),
197 &per_database_context,
198 )?;
199
200 let single_database = match fragments.remove(&database_id) {
201 Some(info) => info,
202 None => return Ok(None),
203 };
204
205 let mut database_map = HashMap::from([(database_id, single_database)]);
206 recovery_table_with_upstream_sinks(
207 &mut database_map,
208 &recovery_context.upstream_sink_recovery,
209 )?;
210 let stream_actors = build_stream_actors(&database_map, &recovery_context.job_extra_info)?;
211
212 let job_infos = database_map
213 .remove(&database_id)
214 .expect("database entry must exist");
215
216 let mut source_splits = HashMap::new();
217 for fragment_infos in job_infos.values() {
218 for fragment in fragment_infos.values() {
219 for (actor_id, info) in &fragment.actors {
220 source_splits.insert(*actor_id, info.splits.clone());
221 }
222 }
223 }
224
225 Ok(Some(RenderedDatabaseRuntimeInfo {
226 job_infos,
227 stream_actors,
228 source_splits,
229 batch_refresh,
230 }))
231}
232
233fn recovery_table_with_upstream_sinks(
238 inflight_jobs: &mut FragmentRenderMap,
239 upstream_sink_recovery: &HashMap<JobId, UpstreamSinkRecoveryInfo>,
240) -> MetaResult<()> {
241 if upstream_sink_recovery.is_empty() {
242 return Ok(());
243 }
244
245 let mut seen_jobs = HashSet::new();
246
247 for jobs in inflight_jobs.values_mut() {
248 for (job_id, fragments) in jobs {
249 if !seen_jobs.insert(*job_id) {
250 return Err(anyhow::anyhow!("Duplicate job id found: {}", job_id).into());
251 }
252
253 if let Some(recovery) = upstream_sink_recovery.get(job_id) {
254 if let Some(target_fragment) = fragments.get_mut(&recovery.target_fragment_id) {
255 refill_upstream_sink_union_in_table(
256 &mut target_fragment.nodes,
257 &recovery.upstream_infos,
258 );
259 } else {
260 return Err(anyhow::anyhow!(
261 "target fragment {} not found for upstream sink recovery of job {}",
262 recovery.target_fragment_id,
263 job_id
264 )
265 .into());
266 }
267 }
268 }
269 }
270
271 Ok(())
272}
273
274fn build_stream_actors(
280 all_info: &FragmentRenderMap,
281 job_extra_info: &HashMap<JobId, StreamingJobExtraInfo>,
282) -> MetaResult<HashMap<ActorId, StreamActor>> {
283 let mut stream_actors = HashMap::new();
284
285 for (job_id, streaming_info) in all_info.values().flatten() {
286 let extra_info = job_extra_info
287 .get(job_id)
288 .cloned()
289 .ok_or_else(|| anyhow!("no streaming job info for {}", job_id))?;
290 let expr_context = extra_info.stream_context().to_expr_context();
291 let job_definition = extra_info.job_definition;
292 let config_override = extra_info.config_override;
293
294 for (fragment_id, fragment_infos) in streaming_info {
295 for (actor_id, InflightActorInfo { vnode_bitmap, .. }) in &fragment_infos.actors {
296 stream_actors.insert(
297 *actor_id,
298 StreamActor {
299 actor_id: *actor_id,
300 fragment_id: *fragment_id,
301 vnode_bitmap: vnode_bitmap.clone(),
302 mview_definition: job_definition.clone(),
303 expr_context: Some(expr_context.clone()),
304 config_override: config_override.clone(),
305 },
306 );
307 }
308 }
309 }
310 Ok(stream_actors)
311}
312
313impl GlobalBarrierWorkerContextImpl {
314 fn resolve_job_committed_epoch(
315 job_id: JobId,
316 fragments: &HashMap<FragmentId, LoadedFragment>,
317 state_table_committed_epochs: &HashMap<TableId, u64>,
318 ) -> MetaResult<u64> {
319 let mut table_id_iter = fragments
320 .values()
321 .flat_map(|fragment| fragment.state_table_ids.iter().copied());
322 let Some(first_table_id) = table_id_iter.next() else {
323 bail!("job {} has no state table", job_id);
324 };
325 let committed_epoch = *state_table_committed_epochs
326 .get(&first_table_id)
327 .ok_or_else(|| anyhow!("cannot get committed epoch on table {}.", first_table_id))?;
328 for table_id in table_id_iter {
329 let table_committed_epoch = *state_table_committed_epochs
330 .get(&table_id)
331 .ok_or_else(|| anyhow!("cannot get committed epoch on table {}.", table_id))?;
332 if committed_epoch != table_committed_epoch {
333 bail!(
334 "table {} has committed epoch {} different to other table {} with committed epoch {} in job {}",
335 first_table_id,
336 committed_epoch,
337 table_id,
338 table_committed_epoch,
339 job_id
340 );
341 }
342 }
343
344 Ok(committed_epoch)
345 }
346
347 async fn finish_completed_batch_refresh_background_jobs(
348 &self,
349 recovery_context: &LoadedRecoveryContext,
350 state_table_committed_epochs: &HashMap<TableId, u64>,
351 background_jobs: &mut HashSet<JobId>,
352 ) -> MetaResult<()> {
353 let background_job_ids = background_jobs.iter().copied().collect_vec();
354 for job_id in background_job_ids {
355 let Some(job) = recovery_context.fragment_context.job_map.get(&job_id) else {
356 continue;
357 };
358 if job.refresh_interval_sec.is_none() {
359 continue;
360 }
361 let Some(fragments) = recovery_context.fragment_context.job_fragments.get(&job_id)
362 else {
363 continue;
364 };
365
366 let committed_epoch =
367 Self::resolve_job_committed_epoch(job_id, fragments, state_table_committed_epochs)?;
368 let snapshot_backfill_info = StreamFragmentGraph::collect_snapshot_backfill_info_impl(
369 fragments
370 .values()
371 .map(|fragment| (&fragment.nodes, fragment.fragment_type_mask)),
372 )?
373 .0
374 .ok_or_else(|| anyhow!("batch refresh job {} has no snapshot backfill info", job_id))?;
375 let snapshot_epoch = snapshot_backfill_info
376 .upstream_mv_table_id_to_backfill_epoch
377 .values()
378 .find_map(|e| *e)
379 .unwrap_or(committed_epoch);
380 if committed_epoch < snapshot_epoch {
381 continue;
382 }
383
384 info!(
385 %job_id,
386 committed_epoch,
387 snapshot_epoch,
388 "finish completed batch refresh background job during recovery"
389 );
390 self.finish_creating_job(TrackingJob::recovered_from_fragment_nodes(
391 job_id,
392 fragments
393 .iter()
394 .map(|(fragment_id, fragment)| (*fragment_id, &fragment.nodes)),
395 ))
396 .await?;
397 background_jobs.remove(&job_id);
398 }
399
400 Ok(())
401 }
402
403 async fn apply_pre_applied_drop_cancel(
404 &self,
405 database_id: Option<DatabaseId>,
406 ) -> MetaResult<bool> {
407 let drop_cancel = self.scheduled_barriers.pre_apply_drop_cancel(database_id);
408 let has_drop_streaming_jobs = !drop_cancel.streaming_job_ids.is_empty();
409 cleanup_dropped_streaming_jobs(
410 &self.refresh_manager,
411 &self.hummock_manager,
412 &self.metadata_manager,
413 drop_cancel.streaming_job_ids,
414 drop_cancel.dropped_state_table_ids,
415 "drop_streaming_jobs",
416 )
417 .await?;
418 Ok(has_drop_streaming_jobs)
419 }
420
421 async fn clean_dirty_streaming_jobs(&self, database_id: Option<DatabaseId>) -> MetaResult<()> {
423 self.metadata_manager
424 .catalog_controller
425 .clean_dirty_subscription(database_id)
426 .await?;
427
428 let cleaned_dirty_jobs = self
429 .metadata_manager
430 .catalog_controller
431 .clean_dirty_creating_jobs(database_id)
432 .await?;
433 if database_id.is_some() {
434 cleanup_dropped_streaming_jobs(
437 &self.refresh_manager,
438 &self.hummock_manager,
439 &self.metadata_manager,
440 cleaned_dirty_jobs.streaming_job_ids,
441 cleaned_dirty_jobs.dropped_table_ids,
442 "clean_dirty_creating_jobs",
443 )
444 .await?;
445 }
446 self.metadata_manager
447 .reset_all_refresh_jobs_to_idle()
448 .await?;
449
450 self.source_manager
452 .apply_source_change(SourceChange::DropSource {
453 dropped_source_ids: cleaned_dirty_jobs.source_ids,
454 })
455 .await;
456
457 Ok(())
458 }
459
460 async fn reset_sink_coordinator(&self, database_id: Option<DatabaseId>) -> MetaResult<()> {
461 if let Some(database_id) = database_id {
462 let sink_ids = self
463 .metadata_manager
464 .catalog_controller
465 .list_sink_ids(Some(database_id))
466 .await?;
467 self.sink_manager.stop_sink_coordinator(sink_ids).await;
468 } else {
469 self.sink_manager.reset().await;
470 }
471 Ok(())
472 }
473
474 async fn abort_dirty_pending_sink_state(
475 &self,
476 database_id: Option<DatabaseId>,
477 ) -> MetaResult<()> {
478 let pending_sinks: HashSet<SinkId> = self
479 .metadata_manager
480 .catalog_controller
481 .list_all_pending_sinks(database_id)
482 .await?;
483
484 if pending_sinks.is_empty() {
485 return Ok(());
486 }
487
488 let sink_with_state_tables: HashMap<SinkId, Vec<TableId>> = self
489 .metadata_manager
490 .catalog_controller
491 .fetch_sink_with_state_table_ids(pending_sinks)
492 .await?;
493
494 let mut sink_committed_epoch: HashMap<SinkId, u64> = HashMap::new();
495
496 for (sink_id, table_ids) in sink_with_state_tables {
497 let Some(table_id) = table_ids.first() else {
498 return Err(anyhow!("no state table id in sink: {}", sink_id).into());
499 };
500
501 self.hummock_manager
502 .on_current_version(|version| -> MetaResult<()> {
503 if let Some(committed_epoch) = version.table_committed_epoch(*table_id) {
504 assert!(
505 sink_committed_epoch
506 .insert(sink_id, committed_epoch)
507 .is_none()
508 );
509 Ok(())
510 } else {
511 Err(anyhow!("cannot get committed epoch on table {}.", table_id).into())
512 }
513 })
514 .await?;
515 }
516
517 self.metadata_manager
518 .catalog_controller
519 .abort_pending_sink_epochs(sink_committed_epoch)
520 .await?;
521
522 Ok(())
523 }
524
525 async fn purge_state_table_from_hummock(
526 &self,
527 all_state_table_ids: &HashSet<TableId>,
528 ) -> MetaResult<()> {
529 self.hummock_manager.purge(all_state_table_ids).await?;
530 Ok(())
531 }
532
533 async fn list_background_job_progress(
534 &self,
535 database_id: Option<DatabaseId>,
536 ) -> MetaResult<HashSet<JobId>> {
537 let mgr = &self.metadata_manager;
538 mgr.catalog_controller
539 .list_background_creating_jobs(false, database_id)
540 .await
541 }
542
543 async fn load_recovery_context(
544 &self,
545 database_id: Option<DatabaseId>,
546 ) -> MetaResult<LoadedRecoveryContext> {
547 let inner = self
548 .metadata_manager
549 .catalog_controller
550 .get_inner_read_guard()
551 .await;
552 let txn = inner.db.begin().await?;
553
554 let fragment_context = self
555 .metadata_manager
556 .catalog_controller
557 .load_fragment_context_in_txn(&txn, database_id)
558 .await
559 .inspect_err(|err| {
560 warn!(error = %err.as_report(), "load fragment context failed");
561 })?;
562
563 if fragment_context.is_empty() {
564 return Ok(LoadedRecoveryContext::empty(fragment_context));
565 }
566
567 let job_ids = fragment_context.job_map.keys().copied().collect_vec();
568 let job_extra_info = self
569 .metadata_manager
570 .catalog_controller
571 .get_streaming_job_extra_info_in_txn(&txn, job_ids)
572 .await?;
573
574 let mut upstream_targets = HashMap::new();
575 for fragment in fragment_context
576 .job_fragments
577 .values()
578 .flat_map(|fragments| fragments.values())
579 {
580 let mut has_upstream_union = false;
581 visit_stream_node_cont(&fragment.nodes, |node| {
582 if let Some(PbNodeBody::UpstreamSinkUnion(_)) = node.node_body {
583 has_upstream_union = true;
584 false
585 } else {
586 true
587 }
588 });
589
590 if has_upstream_union
591 && let Some(previous) =
592 upstream_targets.insert(fragment.job_id, fragment.fragment_id)
593 {
594 bail!(
595 "multiple upstream sink union fragments found for job {}, fragment {}, kept {}",
596 fragment.job_id,
597 fragment.fragment_id,
598 previous
599 );
600 }
601 }
602
603 let mut upstream_sink_recovery = HashMap::new();
604 if !upstream_targets.is_empty() {
605 let tables = self
606 .metadata_manager
607 .catalog_controller
608 .get_user_created_table_by_ids_in_txn(&txn, upstream_targets.keys().copied())
609 .await?;
610
611 for table in tables {
612 let job_id = table.id.as_job_id();
613 let Some(target_fragment_id) = upstream_targets.get(&job_id) else {
614 tracing::debug!(
616 job_id = %job_id,
617 "upstream sink union target fragment not found for table"
618 );
619 continue;
620 };
621
622 let upstream_infos = self
623 .metadata_manager
624 .catalog_controller
625 .get_all_upstream_sink_infos_in_txn(&txn, &table, *target_fragment_id as _)
626 .await?;
627
628 upstream_sink_recovery.insert(
629 job_id,
630 UpstreamSinkRecoveryInfo {
631 target_fragment_id: *target_fragment_id,
632 upstream_infos,
633 },
634 );
635 }
636 }
637
638 let fragment_relations = self
639 .metadata_manager
640 .catalog_controller
641 .get_fragment_downstream_relations_in_txn(
642 &txn,
643 fragment_context
644 .job_fragments
645 .values()
646 .flat_map(|fragments| fragments.keys().copied())
647 .collect_vec(),
648 )
649 .await?;
650
651 Ok(LoadedRecoveryContext {
652 fragment_context,
653 job_extra_info,
654 upstream_sink_recovery,
655 fragment_relations,
656 })
657 }
658
659 #[expect(clippy::type_complexity)]
660 fn resolve_hummock_version_epochs(
661 background_jobs: impl Iterator<Item = (JobId, &HashMap<FragmentId, LoadedFragment>)>,
662 version: &HummockVersion,
663 table_change_log: &TableChangeLogs,
664 ) -> MetaResult<(
665 HashMap<TableId, u64>,
666 HashMap<TableId, Vec<(Vec<u64>, u64)>>,
667 )> {
668 let table_committed_epoch: HashMap<_, _> = version
669 .state_table_info
670 .info()
671 .iter()
672 .map(|(table_id, info)| (*table_id, info.committed_epoch))
673 .collect();
674 let get_table_committed_epoch = |table_id| -> anyhow::Result<u64> {
675 Ok(*table_committed_epoch
676 .get(&table_id)
677 .ok_or_else(|| anyhow!("cannot get committed epoch on table {}.", table_id))?)
678 };
679 let mut min_downstream_committed_epochs = HashMap::new();
680 for (job_id, fragments) in background_jobs {
681 let job_committed_epoch =
682 Self::resolve_job_committed_epoch(job_id, fragments, &table_committed_epoch)?;
683 if let (Some(snapshot_backfill_info), _) =
684 StreamFragmentGraph::collect_snapshot_backfill_info_impl(
685 fragments
686 .values()
687 .map(|fragment| (&fragment.nodes, fragment.fragment_type_mask)),
688 )?
689 {
690 for (upstream_table, snapshot_epoch) in
691 snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
692 {
693 let snapshot_epoch = snapshot_epoch.ok_or_else(|| {
694 anyhow!(
695 "recovered snapshot backfill job {} has not filled snapshot epoch to upstream {}",
696 job_id, upstream_table
697 )
698 })?;
699 let pinned_epoch = max(snapshot_epoch, job_committed_epoch);
700 match min_downstream_committed_epochs.entry(upstream_table) {
701 Entry::Occupied(entry) => {
702 let prev_min_epoch = entry.into_mut();
703 *prev_min_epoch = min(*prev_min_epoch, pinned_epoch);
704 }
705 Entry::Vacant(entry) => {
706 entry.insert(pinned_epoch);
707 }
708 }
709 }
710 }
711 }
712 let mut log_epochs = HashMap::new();
713 for (upstream_table_id, downstream_committed_epoch) in min_downstream_committed_epochs {
714 let upstream_committed_epoch = get_table_committed_epoch(upstream_table_id)?;
715 match upstream_committed_epoch.cmp(&downstream_committed_epoch) {
716 Ordering::Less => {
717 bail!(
718 "downstream epoch {} later than upstream epoch {} of table {}",
719 downstream_committed_epoch,
720 upstream_committed_epoch,
721 upstream_table_id
722 );
723 }
724 Ordering::Equal => {
725 continue;
726 }
727 Ordering::Greater => {
728 if let Some(table_change_log) = table_change_log.get(&upstream_table_id) {
729 let epochs = table_change_log
730 .filter_epoch((downstream_committed_epoch, upstream_committed_epoch))
731 .map(|epoch_log| {
732 (
733 epoch_log.non_checkpoint_epochs.clone(),
734 epoch_log.checkpoint_epoch,
735 )
736 })
737 .collect_vec();
738 let first_epochs = epochs.first();
739 if let Some((_, first_checkpoint_epoch)) = &first_epochs
740 && *first_checkpoint_epoch == downstream_committed_epoch
741 {
742 } else {
743 bail!(
744 "resolved first log epoch {:?} on table {} not matched with downstream committed epoch {}",
745 epochs,
746 upstream_table_id,
747 downstream_committed_epoch
748 );
749 }
750 log_epochs
751 .try_insert(upstream_table_id, epochs)
752 .expect("non-duplicated");
753 } else {
754 bail!(
755 "upstream table {} on epoch {} has lagged downstream on epoch {} but no table change log",
756 upstream_table_id,
757 upstream_committed_epoch,
758 downstream_committed_epoch
759 );
760 }
761 }
762 }
763 }
764 Ok((table_committed_epoch, log_epochs))
765 }
766
767 pub(super) async fn reload_runtime_info_impl(
768 &self,
769 ) -> MetaResult<BarrierWorkerRuntimeInfoSnapshot> {
770 {
771 {
772 {
773 self.clean_dirty_streaming_jobs(None)
774 .await
775 .context("clean dirty streaming jobs")?;
776
777 self.reset_sink_coordinator(None)
778 .await
779 .context("reset sink coordinator")?;
780 self.abort_dirty_pending_sink_state(None)
781 .await
782 .context("abort dirty pending sink state")?;
783
784 tracing::info!("recovering background job progress");
786 let mut initial_background_jobs = self
787 .list_background_job_progress(None)
788 .await
789 .context("recover background job progress should not fail")?;
790
791 tracing::info!("recovered background job progress");
792
793 let _ = self.apply_pre_applied_drop_cancel(None).await?;
795 self.metadata_manager
796 .catalog_controller
797 .cleanup_dropped_tables()
798 .await;
799
800 let active_streaming_nodes =
801 ActiveStreamingWorkerNodes::new_snapshot(self.metadata_manager.clone())
802 .await?;
803
804 let background_streaming_jobs =
805 initial_background_jobs.iter().cloned().collect_vec();
806
807 tracing::info!(
808 "background streaming jobs: {:?} total {}",
809 background_streaming_jobs,
810 background_streaming_jobs.len()
811 );
812
813 let unreschedulable_jobs = {
814 let mut unreschedulable_jobs = HashSet::new();
815
816 for job_id in background_streaming_jobs {
817 let scan_types = self
818 .metadata_manager
819 .get_job_backfill_scan_types(job_id)
820 .await?;
821
822 if scan_types
823 .values()
824 .any(|scan_type| !scan_type.is_reschedulable(false))
825 {
826 unreschedulable_jobs.insert(job_id);
827 }
828 }
829
830 unreschedulable_jobs
831 };
832
833 if !unreschedulable_jobs.is_empty() {
834 info!(
835 "unreschedulable background jobs: {:?}",
836 unreschedulable_jobs
837 );
838 }
839
840 if !unreschedulable_jobs.is_empty() {
844 bail!(
845 "Recovery for unreschedulable background jobs is not yet implemented. \
846 This path is triggered when the following jobs have at least one scan type that is not reschedulable: {:?}.",
847 unreschedulable_jobs
848 );
849 }
850
851 let mut recovery_context = self.load_recovery_context(None).await?;
852 if self.apply_pre_applied_drop_cancel(None).await? {
853 recovery_context = self.load_recovery_context(None).await?;
854 }
855
856 self.purge_state_table_from_hummock(
857 &recovery_context
858 .fragment_context
859 .job_fragments
860 .values()
861 .flat_map(|fragments| fragments.values())
862 .flat_map(|fragment| fragment.state_table_ids.iter().copied())
863 .collect(),
864 )
865 .await
866 .context("purge state table from hummock")?;
867
868 let (state_table_committed_epochs, state_table_log_epochs) = self
869 .hummock_manager
870 .on_current_version_and_table_change_log(|version, table_change_log| {
871 Self::resolve_hummock_version_epochs(
872 recovery_context
873 .fragment_context
874 .job_fragments
875 .iter()
876 .filter_map(|(job_id, job)| {
877 initial_background_jobs
878 .contains(job_id)
879 .then_some((*job_id, job))
880 }),
881 version,
882 table_change_log,
883 )
884 })
885 .await?;
886
887 self.finish_completed_batch_refresh_background_jobs(
888 &recovery_context,
889 &state_table_committed_epochs,
890 &mut initial_background_jobs,
891 )
892 .await?;
893
894 let mv_depended_subscriptions = self
895 .metadata_manager
896 .get_mv_depended_subscriptions(None)
897 .await?;
898
899 let background_jobs = {
901 let mut refreshed_background_jobs = self
902 .list_background_job_progress(None)
903 .await
904 .context("recover background job progress should not fail")?;
905 recovery_context
906 .fragment_context
907 .job_map
908 .keys()
909 .filter_map(|job_id| {
910 refreshed_background_jobs.remove(job_id).then_some(*job_id)
911 })
912 .collect()
913 };
914
915 let database_infos = self
916 .metadata_manager
917 .catalog_controller
918 .list_databases()
919 .await?;
920
921 let cdc_table_snapshot_splits =
922 reload_cdc_table_snapshot_splits(&self.env.meta_store_ref().conn, None)
923 .await?;
924
925 Ok(BarrierWorkerRuntimeInfoSnapshot {
926 active_streaming_nodes,
927 recovery_context,
928 state_table_committed_epochs,
929 state_table_log_epochs,
930 mv_depended_subscriptions,
931 background_jobs,
932 hummock_version_stats: self.hummock_manager.get_version_stats().await,
933 database_infos,
934 cdc_table_snapshot_splits,
935 })
936 }
937 }
938 }
939 }
940
941 pub(super) async fn reload_database_runtime_info_impl(
942 &self,
943 database_id: DatabaseId,
944 ) -> MetaResult<DatabaseRuntimeInfoSnapshot> {
945 self.clean_dirty_streaming_jobs(Some(database_id))
946 .await
947 .context("clean dirty streaming jobs")?;
948
949 self.reset_sink_coordinator(Some(database_id))
950 .await
951 .context("reset sink coordinator")?;
952 self.abort_dirty_pending_sink_state(Some(database_id))
953 .await
954 .context("abort dirty pending sink state")?;
955
956 tracing::info!(
958 ?database_id,
959 "recovering background job progress of database"
960 );
961
962 let mut background_jobs = self
963 .list_background_job_progress(Some(database_id))
964 .await
965 .context("recover background job progress of database should not fail")?;
966 tracing::info!(?database_id, "recovered background job progress");
967
968 let _ = self
970 .apply_pre_applied_drop_cancel(Some(database_id))
971 .await?;
972
973 let recovery_context = self.load_recovery_context(Some(database_id)).await?;
974
975 let missing_background_jobs = background_jobs
976 .iter()
977 .filter(|job_id| {
978 !recovery_context
979 .fragment_context
980 .job_map
981 .contains_key(*job_id)
982 })
983 .copied()
984 .collect_vec();
985 if !missing_background_jobs.is_empty() {
986 warn!(
987 database_id = %database_id,
988 missing_job_ids = ?missing_background_jobs,
989 "background jobs missing in rendered info"
990 );
991 }
992
993 let (state_table_committed_epochs, state_table_log_epochs) = self
994 .hummock_manager
995 .on_current_version_and_table_change_log(|version, table_change_log| {
996 Self::resolve_hummock_version_epochs(
997 background_jobs.iter().filter_map(|job_id| {
998 recovery_context
999 .fragment_context
1000 .job_fragments
1001 .get(job_id)
1002 .map(|job| (*job_id, job))
1003 }),
1004 version,
1005 table_change_log,
1006 )
1007 })
1008 .await?;
1009
1010 self.finish_completed_batch_refresh_background_jobs(
1011 &recovery_context,
1012 &state_table_committed_epochs,
1013 &mut background_jobs,
1014 )
1015 .await?;
1016
1017 let mv_depended_subscriptions = self
1018 .metadata_manager
1019 .get_mv_depended_subscriptions(Some(database_id))
1020 .await?;
1021
1022 let cdc_table_snapshot_splits =
1023 reload_cdc_table_snapshot_splits(&self.env.meta_store_ref().conn, Some(database_id))
1024 .await?;
1025
1026 self.refresh_manager
1027 .remove_trackers_by_database(database_id);
1028
1029 Ok(DatabaseRuntimeInfoSnapshot {
1030 recovery_context,
1031 state_table_committed_epochs,
1032 state_table_log_epochs,
1033 mv_depended_subscriptions,
1034 background_jobs,
1035 cdc_table_snapshot_splits,
1036 })
1037 }
1038}
1039
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use risingwave_common::catalog::FragmentTypeMask;
    use risingwave_common::id::WorkerId;
    use risingwave_meta_model::DispatcherType;
    use risingwave_meta_model::fragment::DistributionType;
    use risingwave_pb::stream_plan::stream_node::PbNodeBody;
    use risingwave_pb::stream_plan::{
        PbDispatchOutputMapping, PbStreamNode, UpstreamSinkUnionNode as PbUpstreamSinkUnionNode,
    };

    use super::*;
    use crate::controller::fragment::InflightActorInfo;
    use crate::model::DownstreamFragmentRelation;
    use crate::stream::UpstreamSinkInfo;

    /// `recovery_table_with_upstream_sinks` should re-fill the empty `UpstreamSinkUnion` node of
    /// the target fragment, adding one init upstream per recovered sink, pointing at the sink
    /// fragment.
    #[test]
    fn test_recovery_table_with_upstream_sinks_updates_union_node() {
        let database_id = DatabaseId::new(1);
        let job_id = JobId::new(10);
        let fragment_id = FragmentId::new(100);
        let sink_fragment_id = FragmentId::new(200);

        // A single fragment whose root node is an `UpstreamSinkUnion` with no upstreams yet.
        let mut inflight_jobs: FragmentRenderMap = HashMap::new();
        let fragment = InflightFragmentInfo {
            fragment_id,
            distribution_type: DistributionType::Hash,
            fragment_type_mask: FragmentTypeMask::empty(),
            vnode_count: 1,
            nodes: PbStreamNode {
                node_body: Some(PbNodeBody::UpstreamSinkUnion(Box::new(
                    PbUpstreamSinkUnionNode {
                        init_upstreams: vec![],
                    },
                ))),
                ..Default::default()
            },
            actors: HashMap::new(),
            state_table_ids: HashSet::new(),
        };

        inflight_jobs
            .entry(database_id)
            .or_default()
            .entry(job_id)
            .or_default()
            .insert(fragment_id, fragment);

        // One upstream sink targeting the fragment above.
        let upstream_sink_recovery = HashMap::from([(
            job_id,
            UpstreamSinkRecoveryInfo {
                target_fragment_id: fragment_id,
                upstream_infos: vec![UpstreamSinkInfo {
                    sink_id: SinkId::new(1),
                    sink_fragment_id,
                    sink_output_fields: vec![],
                    sink_original_target_columns: vec![],
                    project_exprs: vec![],
                    new_sink_downstream: DownstreamFragmentRelation {
                        downstream_fragment_id: FragmentId::new(300),
                        dispatcher_type: DispatcherType::Hash,
                        dist_key_indices: vec![],
                        output_mapping: PbDispatchOutputMapping::default(),
                    },
                }],
            },
        )]);

        recovery_table_with_upstream_sinks(&mut inflight_jobs, &upstream_sink_recovery).unwrap();

        let updated = inflight_jobs
            .get(&database_id)
            .unwrap()
            .get(&job_id)
            .unwrap()
            .get(&fragment_id)
            .unwrap();

        let PbNodeBody::UpstreamSinkUnion(updated_union) =
            updated.nodes.node_body.as_ref().unwrap()
        else {
            panic!("expected upstream sink union node");
        };

        assert_eq!(updated_union.init_upstreams.len(), 1);
        assert_eq!(
            updated_union.init_upstreams[0].upstream_fragment_id,
            sink_fragment_id.as_raw_id()
        );
    }

    /// `build_stream_actors` should take the definition, config override and timezone (through the
    /// expression context) of each actor from the preloaded job extra info.
    #[test]
    fn test_build_stream_actors_uses_preloaded_extra_info() {
        let database_id = DatabaseId::new(2);
        let job_id = JobId::new(20);
        let fragment_id = FragmentId::new(120);
        let actor_id = ActorId::new(500);

        // One job with one fragment holding a single actor.
        let mut inflight_jobs: FragmentRenderMap = HashMap::new();
        inflight_jobs
            .entry(database_id)
            .or_default()
            .entry(job_id)
            .or_default()
            .insert(
                fragment_id,
                InflightFragmentInfo {
                    fragment_id,
                    distribution_type: DistributionType::Hash,
                    fragment_type_mask: FragmentTypeMask::empty(),
                    vnode_count: 1,
                    nodes: PbStreamNode::default(),
                    actors: HashMap::from([(
                        actor_id,
                        InflightActorInfo {
                            worker_id: WorkerId::new(1),
                            vnode_bitmap: None,
                            splits: vec![],
                        },
                    )]),
                    state_table_ids: HashSet::new(),
                },
            );

        let job_extra_info = HashMap::from([(
            job_id,
            StreamingJobExtraInfo {
                timezone: Some("UTC".to_owned()),
                config_override: "cfg".into(),
                job_definition: "definition".to_owned(),
                backfill_orders: None,
                refresh_interval_sec: None,
            },
        )]);

        let stream_actors = build_stream_actors(&inflight_jobs, &job_extra_info).unwrap();

        let actor = stream_actors.get(&actor_id).unwrap();
        assert_eq!(actor.actor_id, actor_id);
        assert_eq!(actor.fragment_id, fragment_id);
        assert_eq!(actor.mview_definition, "definition");
        assert_eq!(&*actor.config_override, "cfg");
        let expr_ctx = actor.expr_context.as_ref().unwrap();
        assert_eq!(expr_ctx.time_zone, "UTC");
    }
}