// risingwave_meta/barrier/checkpoint/state.rs
use std::collections::HashSet;
use std::mem::take;
use risingwave_common::catalog::TableId;
use risingwave_common::util::epoch::Epoch;
use risingwave_pb::meta::PausedReason;
use crate::barrier::info::{BarrierInfo, InflightDatabaseInfo, InflightSubscriptionInfo};
use crate::barrier::{BarrierKind, Command, CreateStreamingJobType, TracedEpoch};
use crate::controller::fragment::InflightFragmentInfo;
/// Mutable state kept between barriers: the epoch of the most recently issued
/// barrier, the in-flight graph and subscription info, and the pause state.
pub(crate) struct BarrierWorkerState {
/// Epoch the next barrier will use as its `prev_epoch`. `next_barrier_info`
/// overwrites it with the `curr_epoch` of every barrier it produces.
in_flight_prev_epoch: TracedEpoch,
/// `prev_epoch` values of the barriers produced since the last checkpoint
/// barrier. Drained into `BarrierKind::Checkpoint` when a checkpoint barrier
/// is produced.
pending_non_checkpoint_barriers: Vec<u64>,
/// In-flight graph info; `apply_command` applies each command's fragment
/// changes to it and merges snapshot-backfill jobs into it.
pub(super) inflight_graph_info: InflightDatabaseInfo,
/// In-flight subscription info; `apply_command` runs `pre_apply` and
/// `post_apply` on it for every command.
pub(super) inflight_subscription_info: InflightSubscriptionInfo,
/// Pause reason currently recorded (`None` presumably means "not paused" —
/// confirm against `PausedReason`). Recomputed by `apply_command`.
paused_reason: Option<PausedReason>,
}
impl BarrierWorkerState {
/// Builds a fresh state: the in-flight epoch starts at `Epoch::now()`, no
/// barriers are pending, the graph info is empty, the subscription info takes
/// its default value and no pause reason is set.
pub(super) fn new() -> Self {
    let in_flight_prev_epoch = TracedEpoch::new(Epoch::now());
    let inflight_graph_info = InflightDatabaseInfo::empty();
    let inflight_subscription_info = InflightSubscriptionInfo::default();
    Self {
        in_flight_prev_epoch,
        pending_non_checkpoint_barriers: Vec::new(),
        inflight_graph_info,
        inflight_subscription_info,
        paused_reason: None,
    }
}
/// Rebuilds the state from caller-supplied values.
///
/// All four arguments are stored as given; the list of pending
/// non-checkpoint barriers always starts out empty.
pub fn recovery(
    in_flight_prev_epoch: TracedEpoch,
    inflight_graph_info: InflightDatabaseInfo,
    inflight_subscription_info: InflightSubscriptionInfo,
    paused_reason: Option<PausedReason>,
) -> Self {
    let pending_non_checkpoint_barriers = Vec::new();
    Self {
        in_flight_prev_epoch,
        pending_non_checkpoint_barriers,
        inflight_graph_info,
        inflight_subscription_info,
        paused_reason,
    }
}
pub fn paused_reason(&self) -> Option<PausedReason> {
self.paused_reason
}
/// Records `paused_reason` as the new pause state.
///
/// Does nothing when the value is unchanged; otherwise logs the transition
/// (old and new value) before storing the new one.
fn set_paused_reason(&mut self, paused_reason: Option<PausedReason>) {
    if self.paused_reason == paused_reason {
        return;
    }
    tracing::info!(current = ?self.paused_reason, new = ?paused_reason, "update paused state");
    self.paused_reason = paused_reason;
}
pub fn in_flight_prev_epoch(&self) -> &TracedEpoch {
&self.in_flight_prev_epoch
}
/// Produces the [`BarrierInfo`] for the next barrier and advances the
/// in-flight epoch to `curr_epoch`.
///
/// Returns `None`, leaving the state untouched, when the in-flight graph info
/// is empty and `command` is not a `Command::CreateStreamingJob`.
///
/// Otherwise the previous in-flight epoch becomes the barrier's `prev_epoch`
/// and is appended to the pending list. When `is_checkpoint` is true the whole
/// pending list (including the epoch just appended) is moved into
/// `BarrierKind::Checkpoint`; otherwise the barrier kind is
/// `BarrierKind::Barrier` and the list keeps accumulating.
///
/// # Panics
///
/// Panics if `curr_epoch` is not strictly greater than the current in-flight
/// prev epoch.
pub fn next_barrier_info(
    &mut self,
    command: Option<&Command>,
    is_checkpoint: bool,
    curr_epoch: TracedEpoch,
) -> Option<BarrierInfo> {
    if self.inflight_graph_info.is_empty()
        && !matches!(command, Some(Command::CreateStreamingJob { .. }))
    {
        return None;
    }
    // Epochs must strictly increase, so equality is a violation as well; the
    // message therefore reports `>=` rather than `>`.
    assert!(
        self.in_flight_prev_epoch.value() < curr_epoch.value(),
        "curr epoch regress. {} >= {}",
        self.in_flight_prev_epoch.value(),
        curr_epoch.value()
    );
    let prev_epoch = self.in_flight_prev_epoch.clone();
    self.in_flight_prev_epoch = curr_epoch.clone();
    // Every barrier records its prev epoch; a checkpoint then drains the list.
    self.pending_non_checkpoint_barriers
        .push(prev_epoch.value().0);
    let kind = if is_checkpoint {
        let epochs = take(&mut self.pending_non_checkpoint_barriers);
        BarrierKind::Checkpoint(epochs)
    } else {
        BarrierKind::Barrier
    };
    Some(BarrierInfo {
        prev_epoch,
        curr_epoch,
        kind,
    })
}
/// Applies `command` to the in-flight graph, subscription and pause state.
///
/// Returns, in order:
/// 1. a snapshot of the graph info taken after `pre_apply` and before
///    `post_apply` of the command's fragment changes,
/// 2. a snapshot of the subscription info taken at the same point,
/// 3. the table ids to commit: those existing in the graph snapshot plus those
///    of every job listed in a `MergeSnapshotBackfillStreamingJobs` command,
/// 4. the table ids of the jobs being merged by such a command,
/// 5. the pause reason that was in effect before this command.
pub fn apply_command(
    &mut self,
    command: Option<&Command>,
) -> (
    InflightDatabaseInfo,
    InflightSubscriptionInfo,
    HashSet<TableId>,
    HashSet<TableId>,
    Option<PausedReason>,
) {
    // Fragment changes are not looked up at all for a snapshot-backfill job
    // creation command.
    let creates_snapshot_backfill_job = matches!(
        command,
        Some(Command::CreateStreamingJob {
            job_type: CreateStreamingJobType::SnapshotBackfill(_),
            ..
        })
    );
    let fragment_changes = if creates_snapshot_backfill_job {
        None
    } else {
        command.and_then(Command::fragment_changes)
    };
    if let Some(changes) = &fragment_changes {
        self.inflight_graph_info.pre_apply(changes);
    }
    if let Some(cmd) = command {
        self.inflight_subscription_info.pre_apply(cmd);
    }

    // Snapshots are taken between the pre- and post-apply phases.
    let info = self.inflight_graph_info.clone();
    let subscription_info = self.inflight_subscription_info.clone();

    if let Some(changes) = fragment_changes {
        self.inflight_graph_info.post_apply(&changes);
    }

    let mut table_ids_to_commit = info.existing_table_ids().collect::<HashSet<_>>();
    let mut jobs_to_wait = HashSet::new();
    if let Some(Command::MergeSnapshotBackfillStreamingJobs(jobs_to_merge)) = command {
        for (job_table_id, (_, job_graph_info)) in jobs_to_merge {
            jobs_to_wait.insert(*job_table_id);
            table_ids_to_commit.extend(InflightFragmentInfo::existing_table_ids(
                job_graph_info.fragment_infos(),
            ));
            // Merged jobs join the live graph info only after the snapshot.
            self.inflight_graph_info.extend(job_graph_info.clone());
        }
    }

    if let Some(cmd) = command {
        self.inflight_subscription_info.post_apply(cmd);
    }

    let prev_paused_reason = self.paused_reason;
    self.set_paused_reason(Command::next_paused_reason(command, prev_paused_reason));

    (
        info,
        subscription_info,
        table_ids_to_commit,
        jobs_to_wait,
        prev_paused_reason,
    )
}
}