risingwave_meta/barrier/checkpoint/
state.rs1use std::collections::HashSet;
16use std::mem::take;
17
18use risingwave_common::catalog::{DatabaseId, TableId};
19use risingwave_common::util::epoch::Epoch;
20
21use crate::barrier::info::{
22 BarrierInfo, InflightDatabaseInfo, InflightStreamingJobInfo, InflightSubscriptionInfo,
23 SharedActorInfos,
24};
25use crate::barrier::{BarrierKind, Command, CreateStreamingJobType, TracedEpoch};
26
27pub(crate) struct BarrierWorkerState {
29 in_flight_prev_epoch: TracedEpoch,
34
35 pending_non_checkpoint_barriers: Vec<u64>,
37
38 pub(super) inflight_graph_info: InflightDatabaseInfo,
40
41 pub(super) inflight_subscription_info: InflightSubscriptionInfo,
42
43 is_paused: bool,
45}
46
47impl BarrierWorkerState {
48 pub(super) fn new(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
49 Self {
50 in_flight_prev_epoch: TracedEpoch::new(Epoch::now()),
51 pending_non_checkpoint_barriers: vec![],
52 inflight_graph_info: InflightDatabaseInfo::empty(database_id, shared_actor_infos),
53 inflight_subscription_info: InflightSubscriptionInfo::default(),
54 is_paused: false,
55 }
56 }
57
58 pub fn recovery(
59 database_id: DatabaseId,
60 shared_actor_infos: SharedActorInfos,
61 in_flight_prev_epoch: TracedEpoch,
62 jobs: impl Iterator<Item = InflightStreamingJobInfo>,
63 inflight_subscription_info: InflightSubscriptionInfo,
64 is_paused: bool,
65 ) -> Self {
66 Self {
67 in_flight_prev_epoch,
68 pending_non_checkpoint_barriers: vec![],
69 inflight_graph_info: InflightDatabaseInfo::recover(
70 database_id,
71 jobs,
72 shared_actor_infos,
73 ),
74 inflight_subscription_info,
75 is_paused,
76 }
77 }
78
79 pub fn is_paused(&self) -> bool {
80 self.is_paused
81 }
82
83 fn set_is_paused(&mut self, is_paused: bool) {
84 if self.is_paused != is_paused {
85 tracing::info!(
86 currently_paused = self.is_paused,
87 newly_paused = is_paused,
88 "update paused state"
89 );
90 self.is_paused = is_paused;
91 }
92 }
93
94 pub fn in_flight_prev_epoch(&self) -> &TracedEpoch {
95 &self.in_flight_prev_epoch
96 }
97
98 pub fn next_barrier_info(
100 &mut self,
101 command: Option<&Command>,
102 is_checkpoint: bool,
103 curr_epoch: TracedEpoch,
104 ) -> Option<BarrierInfo> {
105 if self.inflight_graph_info.is_empty()
106 && !matches!(&command, Some(Command::CreateStreamingJob { .. }))
107 {
108 return None;
109 };
110 assert!(
111 self.in_flight_prev_epoch.value() < curr_epoch.value(),
112 "curr epoch regress. {} > {}",
113 self.in_flight_prev_epoch.value(),
114 curr_epoch.value()
115 );
116 let prev_epoch = self.in_flight_prev_epoch.clone();
117 self.in_flight_prev_epoch = curr_epoch.clone();
118 self.pending_non_checkpoint_barriers
119 .push(prev_epoch.value().0);
120 let kind = if is_checkpoint {
121 let epochs = take(&mut self.pending_non_checkpoint_barriers);
122 BarrierKind::Checkpoint(epochs)
123 } else {
124 BarrierKind::Barrier
125 };
126 Some(BarrierInfo {
127 prev_epoch,
128 curr_epoch,
129 kind,
130 })
131 }
132
133 pub fn apply_command(
138 &mut self,
139 command: Option<&Command>,
140 ) -> (
141 InflightDatabaseInfo,
142 InflightSubscriptionInfo,
143 HashSet<TableId>,
144 HashSet<TableId>,
145 bool,
146 ) {
147 let fragment_changes = if let Some(Command::CreateStreamingJob {
149 job_type: CreateStreamingJobType::SnapshotBackfill(_),
150 ..
151 }) = command
152 {
153 None
154 } else if let Some(fragment_changes) = command.and_then(Command::fragment_changes) {
155 self.inflight_graph_info.pre_apply(&fragment_changes);
156 Some(fragment_changes)
157 } else {
158 None
159 };
160 if let Some(command) = &command {
161 self.inflight_subscription_info.pre_apply(command);
162 }
163
164 let info = self.inflight_graph_info.clone();
165 let subscription_info = self.inflight_subscription_info.clone();
166
167 if let Some(fragment_changes) = fragment_changes {
168 self.inflight_graph_info.post_apply(&fragment_changes);
169 }
170
171 let mut table_ids_to_commit: HashSet<_> = info.existing_table_ids().collect();
172 let mut jobs_to_wait = HashSet::new();
173 if let Some(Command::MergeSnapshotBackfillStreamingJobs(jobs_to_merge)) = command {
174 for (table_id, (_, graph_info)) in jobs_to_merge {
175 jobs_to_wait.insert(*table_id);
176 table_ids_to_commit.extend(graph_info.existing_table_ids());
177 self.inflight_graph_info.add_existing(graph_info.clone());
178 }
179 }
180
181 if let Some(command) = command {
182 self.inflight_subscription_info.post_apply(command);
183 }
184
185 let prev_is_paused = self.is_paused();
186 let curr_is_paused = match command {
187 Some(Command::Pause) => true,
188 Some(Command::Resume) => false,
189 _ => prev_is_paused,
190 };
191 self.set_is_paused(curr_is_paused);
192
193 (
194 info,
195 subscription_info,
196 table_ids_to_commit,
197 jobs_to_wait,
198 prev_is_paused,
199 )
200 }
201}