// risingwave_meta/barrier/checkpoint/state.rs

use std::assert_matches::assert_matches;
use std::collections::{HashMap, HashSet};
use std::mem::take;

use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::id::JobId;
use risingwave_common::util::epoch::Epoch;
use tracing::warn;

use crate::barrier::info::{
    BarrierInfo, InflightDatabaseInfo, InflightStreamingJobInfo, SharedActorInfos, SubscriberType,
};
use crate::barrier::{BarrierKind, Command, CreateStreamingJobType, TracedEpoch};
29pub(crate) struct BarrierWorkerState {
31    in_flight_prev_epoch: TracedEpoch,
36
37    pending_non_checkpoint_barriers: Vec<u64>,
39
40    pub(super) inflight_graph_info: InflightDatabaseInfo,
42
43    is_paused: bool,
45}
46
47impl BarrierWorkerState {
48    pub(super) fn new(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
49        Self {
50            in_flight_prev_epoch: TracedEpoch::new(Epoch::now()),
51            pending_non_checkpoint_barriers: vec![],
52            inflight_graph_info: InflightDatabaseInfo::empty(database_id, shared_actor_infos),
53            is_paused: false,
54        }
55    }
56
57    pub fn recovery(
58        database_id: DatabaseId,
59        shared_actor_infos: SharedActorInfos,
60        in_flight_prev_epoch: TracedEpoch,
61        jobs: impl Iterator<Item = InflightStreamingJobInfo>,
62        is_paused: bool,
63    ) -> Self {
64        Self {
65            in_flight_prev_epoch,
66            pending_non_checkpoint_barriers: vec![],
67            inflight_graph_info: InflightDatabaseInfo::recover(
68                database_id,
69                jobs,
70                shared_actor_infos,
71            ),
72            is_paused,
73        }
74    }
75
76    pub fn is_paused(&self) -> bool {
77        self.is_paused
78    }
79
80    fn set_is_paused(&mut self, is_paused: bool) {
81        if self.is_paused != is_paused {
82            tracing::info!(
83                currently_paused = self.is_paused,
84                newly_paused = is_paused,
85                "update paused state"
86            );
87            self.is_paused = is_paused;
88        }
89    }
90
91    pub fn in_flight_prev_epoch(&self) -> &TracedEpoch {
92        &self.in_flight_prev_epoch
93    }
94
95    pub fn next_barrier_info(
97        &mut self,
98        command: Option<&Command>,
99        is_checkpoint: bool,
100        curr_epoch: TracedEpoch,
101    ) -> Option<BarrierInfo> {
102        if self.inflight_graph_info.is_empty()
103            && !matches!(&command, Some(Command::CreateStreamingJob { .. }))
104        {
105            return None;
106        };
107        assert!(
108            self.in_flight_prev_epoch.value() < curr_epoch.value(),
109            "curr epoch regress. {} > {}",
110            self.in_flight_prev_epoch.value(),
111            curr_epoch.value()
112        );
113        let prev_epoch = self.in_flight_prev_epoch.clone();
114        self.in_flight_prev_epoch = curr_epoch.clone();
115        self.pending_non_checkpoint_barriers
116            .push(prev_epoch.value().0);
117        let kind = if is_checkpoint {
118            let epochs = take(&mut self.pending_non_checkpoint_barriers);
119            BarrierKind::Checkpoint(epochs)
120        } else {
121            BarrierKind::Barrier
122        };
123        Some(BarrierInfo {
124            prev_epoch,
125            curr_epoch,
126            kind,
127        })
128    }
129
130    pub fn apply_command(
135        &mut self,
136        command: Option<&Command>,
137    ) -> (
138        InflightDatabaseInfo,
139        HashMap<TableId, u64>,
140        HashSet<TableId>,
141        HashSet<JobId>,
142        bool,
143    ) {
144        let fragment_changes = if let Some(Command::CreateStreamingJob {
146            job_type: CreateStreamingJobType::SnapshotBackfill(_),
147            ..
148        }) = command
149        {
150            None
151        } else if let Some((new_job_id, fragment_changes)) =
152            command.and_then(Command::fragment_changes)
153        {
154            self.inflight_graph_info
155                .pre_apply(new_job_id, &fragment_changes);
156            Some(fragment_changes)
157        } else {
158            None
159        };
160
161        match &command {
162            Some(Command::CreateSubscription {
163                subscription_id,
164                upstream_mv_table_id,
165                retention_second,
166            }) => {
167                self.inflight_graph_info.register_subscriber(
168                    upstream_mv_table_id.as_job_id(),
169                    *subscription_id,
170                    SubscriberType::Subscription(*retention_second),
171                );
172            }
173            Some(Command::CreateStreamingJob {
174                info,
175                job_type: CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info),
176                ..
177            }) => {
178                for upstream_mv_table_id in snapshot_backfill_info
179                    .upstream_mv_table_id_to_backfill_epoch
180                    .keys()
181                {
182                    self.inflight_graph_info.register_subscriber(
183                        upstream_mv_table_id.as_job_id(),
184                        info.streaming_job.id().as_raw_id(),
185                        SubscriberType::SnapshotBackfill,
186                    );
187                }
188            }
189            _ => {}
190        };
191
192        let info = self.inflight_graph_info.clone();
193
194        if let Some(fragment_changes) = fragment_changes {
195            self.inflight_graph_info.post_apply(&fragment_changes);
196        }
197
198        let mut table_ids_to_commit: HashSet<_> = info.existing_table_ids().collect();
199        let mut jobs_to_wait = HashSet::new();
200        if let Some(Command::MergeSnapshotBackfillStreamingJobs(jobs_to_merge)) = command {
201            for (table_id, (_, graph_info)) in jobs_to_merge {
202                jobs_to_wait.insert(*table_id);
203                table_ids_to_commit.extend(graph_info.existing_table_ids());
204                self.inflight_graph_info.add_existing(graph_info.clone());
205            }
206        }
207
208        match &command {
209            Some(Command::DropSubscription {
210                subscription_id,
211                upstream_mv_table_id,
212            }) => {
213                if self
214                    .inflight_graph_info
215                    .unregister_subscriber(upstream_mv_table_id.as_job_id(), *subscription_id)
216                    .is_none()
217                {
218                    warn!(subscription_id, %upstream_mv_table_id, "no subscription to drop");
219                }
220            }
221            Some(Command::MergeSnapshotBackfillStreamingJobs(snapshot_backfill_jobs)) => {
222                for (snapshot_backfill_job_id, (upstream_mv_table_ids, _)) in snapshot_backfill_jobs
223                {
224                    for upstream_mv_table_id in upstream_mv_table_ids {
225                        assert_matches!(
226                            self.inflight_graph_info.unregister_subscriber(
227                                upstream_mv_table_id.as_job_id(),
228                                snapshot_backfill_job_id.as_raw_id()
229                            ),
230                            Some(SubscriberType::SnapshotBackfill)
231                        );
232                    }
233                }
234            }
235            _ => {}
236        }
237
238        let prev_is_paused = self.is_paused();
239        let curr_is_paused = match command {
240            Some(Command::Pause) => true,
241            Some(Command::Resume) => false,
242            _ => prev_is_paused,
243        };
244        self.set_is_paused(curr_is_paused);
245
246        (
247            info,
248            self.inflight_graph_info.max_subscription_retention(),
249            table_ids_to_commit,
250            jobs_to_wait,
251            prev_is_paused,
252        )
253    }
254}