use std::collections::hash_map::Entry;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::mem::take;
use anyhow::anyhow;
use fail::fail_point;
use prometheus::HistogramTimer;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_meta_model::WorkerId;
use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::meta::PausedReason;
use risingwave_pb::stream_service::BarrierCompleteResponse;
use tracing::{debug, warn};
use crate::barrier::checkpoint::creating_job::{CompleteJobType, CreatingStreamingJobControl};
use crate::barrier::checkpoint::state::BarrierWorkerState;
use crate::barrier::command::CommandContext;
use crate::barrier::complete_task::{BarrierCompleteOutput, CompleteBarrierTask};
use crate::barrier::info::{BarrierInfo, InflightStreamingJobInfo};
use crate::barrier::notifier::Notifier;
use crate::barrier::progress::{CreateMviewProgressTracker, TrackingCommand, TrackingJob};
use crate::barrier::rpc::{from_partial_graph_id, ControlStreamManager};
use crate::barrier::schedule::{NewBarrier, PeriodicBarriers};
use crate::barrier::utils::collect_creating_job_commit_epoch_info;
use crate::barrier::{
BarrierKind, Command, CreateStreamingJobCommandInfo, CreateStreamingJobType,
InflightSubscriptionInfo, SnapshotBackfillInfo, TracedEpoch,
};
use crate::manager::ActiveStreamingWorkerNodes;
use crate::rpc::metrics::GLOBAL_META_METRICS;
use crate::{MetaError, MetaResult};
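/// Top-level controller that drives checkpointing for all databases: it fans
/// barrier events out to the per-database [`DatabaseCheckpointControl`]s and
/// keeps the latest Hummock version stats reported by completed checkpoints.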
#[derive(Default)]
pub(crate) struct CheckpointControl {
databases: HashMap<DatabaseId, DatabaseCheckpointControl>,
hummock_version_stats: HummockVersionStats,
}
impl CheckpointControl {
pub(crate) fn new(
databases: HashMap<DatabaseId, DatabaseCheckpointControl>,
hummock_version_stats: HummockVersionStats,
) -> Self {
Self {
databases,
hummock_version_stats,
}
}
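    /// Called when a complete-barrier task finishes: records the new Hummock
    /// version stats and acknowledges the completed epochs to each database.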
pub(crate) fn ack_completed(&mut self, output: BarrierCompleteOutput) {
self.hummock_version_stats = output.hummock_version_stats;
for (database_id, (command_prev_epoch, creating_job_epochs)) in output.epochs_to_ack {
self.databases
.get_mut(&database_id)
.expect("should exist")
.ack_completed(command_prev_epoch, creating_job_epochs);
}
}
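    /// Polls every database for barriers that are fully collected and ready to
    /// commit, merging them into a single [`CompleteBarrierTask`], if any.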
pub(crate) fn next_complete_barrier_task(
&mut self,
mut context: Option<(&mut PeriodicBarriers, &mut ControlStreamManager)>,
) -> Option<CompleteBarrierTask> {
let mut task = None;
for database in self.databases.values_mut() {
let context = context.as_mut().map(|(s, c)| (&mut **s, &mut **c));
database.next_complete_barrier_task(&mut task, context, &self.hummock_version_stats);
}
task
}
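    /// Routes a `BarrierCompleteResponse` to the database owning the partial
    /// graph it was collected from.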
pub(crate) fn barrier_collected(
&mut self,
resp: BarrierCompleteResponse,
control_stream_manager: &mut ControlStreamManager,
) -> MetaResult<()> {
let database_id = from_partial_graph_id(resp.partial_graph_id).0;
self.databases
.get_mut(&database_id)
.expect("should exist")
.barrier_collected(resp, control_stream_manager)
}
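    /// A new barrier may be injected only if every database is below the
    /// in-flight barrier limit and none of them asks to pause injection.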
pub(crate) fn can_inject_barrier(&self, in_flight_barrier_nums: usize) -> bool {
self.databases
.values()
.all(|database| database.can_inject_barrier(in_flight_barrier_nums))
}
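    /// The largest in-flight `prev_epoch` across all databases, from which the
    /// next barrier's `curr_epoch` is derived.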
pub(crate) fn max_prev_epoch(&self) -> Option<TracedEpoch> {
self.databases
.values()
.map(|database| database.state.in_flight_prev_epoch())
.max_by_key(|epoch| epoch.value())
.cloned()
}
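    /// Handles a scheduled barrier: derives the next epoch from the maximum
    /// in-flight `prev_epoch`, injects the barrier into every database, and
    /// attaches the scheduled command (if any) to its target database.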
pub(crate) fn handle_new_barrier(
&mut self,
new_barrier: NewBarrier,
control_stream_manager: &mut ControlStreamManager,
active_streaming_nodes: &ActiveStreamingWorkerNodes,
) -> MetaResult<()> {
let NewBarrier {
command,
span,
checkpoint,
} = new_barrier;
if let Some((database_id, command, notifiers)) = command {
let max_prev_epoch = self.max_prev_epoch();
let (database, max_prev_epoch) = match self.databases.entry(database_id) {
Entry::Occupied(entry) => (
entry.into_mut(),
                max_prev_epoch.expect("should exist when some database exists"),
),
Entry::Vacant(entry) => match &command {
Command::CreateStreamingJob {
job_type: CreateStreamingJobType::Normal,
..
} => {
let new_database = DatabaseCheckpointControl::new(database_id);
let max_prev_epoch = if let Some(max_prev_epoch) = max_prev_epoch {
if max_prev_epoch.value()
< new_database.state.in_flight_prev_epoch().value()
{
new_database.state.in_flight_prev_epoch().clone()
} else {
max_prev_epoch
}
} else {
new_database.state.in_flight_prev_epoch().clone()
};
control_stream_manager.add_partial_graph(database_id, None)?;
(entry.insert(new_database), max_prev_epoch)
}
Command::Flush
| Command::Pause(PausedReason::Manual)
| Command::Resume(PausedReason::Manual) => {
for mut notifier in notifiers {
notifier.notify_started();
notifier.notify_collected();
}
warn!(?command, "skip command for empty database");
return Ok(());
}
                _ => {
                    panic!("new database graph info can only be created for a normal creating streaming job, but got command: {} {:?}", database_id, command)
                }
},
};
let curr_epoch = max_prev_epoch.next();
database.handle_new_barrier(
Some((command, notifiers)),
checkpoint,
span.clone(),
control_stream_manager,
active_streaming_nodes,
&self.hummock_version_stats,
curr_epoch.clone(),
)?;
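            // Every other database advances to the same epoch with a plain
            // barrier that carries no command.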
for database in self.databases.values_mut() {
if database.database_id == database_id {
continue;
}
database.handle_new_barrier(
None,
checkpoint,
span.clone(),
control_stream_manager,
active_streaming_nodes,
&self.hummock_version_stats,
curr_epoch.clone(),
)?;
}
} else {
let Some(max_prev_epoch) = self.max_prev_epoch() else {
assert!(self.databases.is_empty());
return Ok(());
};
let curr_epoch = max_prev_epoch.next();
for database in self.databases.values_mut() {
database.handle_new_barrier(
None,
checkpoint,
span.clone(),
control_stream_manager,
active_streaming_nodes,
&self.hummock_version_stats,
curr_epoch.clone(),
)?;
}
}
Ok(())
}
pub(crate) fn update_barrier_nums_metrics(&self) {
self.databases
.values()
.for_each(|database| database.update_barrier_nums_metrics());
}
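    /// Aggregates DDL progress from the create-mview trackers and the
    /// snapshot-backfill jobs of all databases, keyed by job table id.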
pub(crate) fn gen_ddl_progress(&self) -> HashMap<u32, DdlProgress> {
let mut progress = HashMap::new();
for database_checkpoint_control in self.databases.values() {
progress.extend(
database_checkpoint_control
.create_mview_tracker
.gen_ddl_progress(),
);
for creating_job in database_checkpoint_control
.creating_streaming_job_controls
.values()
{
            progress.insert(
                creating_job
                    .info
                    .stream_job_fragments
                    .stream_job_id()
                    .table_id,
                creating_job.gen_ddl_progress(),
            );
}
}
progress
}
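    /// Whether an error on `worker_id` affects some database: a queued barrier
    /// still waits on the worker, or the worker runs part of an inflight graph
    /// or of a creating streaming job.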
pub(crate) fn is_failed_at_worker_err(&self, worker_id: WorkerId) -> bool {
for database_checkpoint_control in self.databases.values() {
let failed_barrier =
database_checkpoint_control.barrier_wait_collect_from_worker(worker_id as _);
if failed_barrier.is_some()
|| database_checkpoint_control
.state
.inflight_graph_info
.contains_worker(worker_id as _)
|| database_checkpoint_control
.creating_streaming_job_controls
.values()
.any(|job| job.is_wait_on_worker(worker_id))
{
return true;
}
}
false
}
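    /// Fails all queued barriers with `err` and aborts all tracked
    /// create-mview jobs, so that recovery starts from a clean state.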
pub(crate) fn clear_on_err(&mut self, err: &MetaError) {
        for node in self
            .databases
            .values_mut()
            .flat_map(|database| take(&mut database.command_ctx_queue).into_values())
        {
for notifier in node.notifiers {
notifier.notify_failed(err.clone());
}
node.enqueue_time.observe_duration();
}
self.databases
.values_mut()
.for_each(|database| database.create_mview_tracker.abort_all());
}
pub(crate) fn subscriptions(
&self,
) -> impl Iterator<Item = (DatabaseId, &InflightSubscriptionInfo)> + '_ {
self.databases.iter().map(|(database_id, database)| {
(*database_id, &database.state.inflight_subscription_info)
})
}
}
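/// Per-database barrier state machine: tracks queued in-flight barriers, the
/// barrier currently being committed, and the streaming jobs being created
/// with snapshot backfill in this database.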
pub(crate) struct DatabaseCheckpointControl {
database_id: DatabaseId,
state: BarrierWorkerState,
    /// Queued barriers in epoch order, keyed by `prev_epoch`.
    command_ctx_queue: BTreeMap<u64, EpochNode>,
    /// The barrier currently being committed, as
    /// `(prev_epoch, should_pause_inject_barrier)`.
    completing_barrier: Option<(u64, bool)>,
    /// Streaming jobs being created with snapshot backfill, each running as
    /// its own partial graph until merged back into the upstream graph.
    creating_streaming_job_controls: HashMap<TableId, CreatingStreamingJobControl>,
    create_mview_tracker: CreateMviewProgressTracker,
}
impl DatabaseCheckpointControl {
fn new(database_id: DatabaseId) -> Self {
Self {
database_id,
state: BarrierWorkerState::new(),
command_ctx_queue: Default::default(),
completing_barrier: None,
creating_streaming_job_controls: Default::default(),
create_mview_tracker: Default::default(),
}
}
pub(crate) fn recovery(
database_id: DatabaseId,
create_mview_tracker: CreateMviewProgressTracker,
state: BarrierWorkerState,
) -> Self {
Self {
database_id,
state,
command_ctx_queue: Default::default(),
completing_barrier: None,
creating_streaming_job_controls: Default::default(),
create_mview_tracker,
}
}
    fn total_command_num(&self) -> usize {
        self.command_ctx_queue.len() + usize::from(self.completing_barrier.is_some())
    }
fn update_barrier_nums_metrics(&self) {
let database_id_str = self.database_id.database_id.to_string();
GLOBAL_META_METRICS
.in_flight_barrier_nums
.with_label_values(&[&database_id_str])
.set(
self.command_ctx_queue
.values()
.filter(|x| x.state.is_inflight())
.count() as i64,
);
GLOBAL_META_METRICS
.all_barrier_nums
.with_label_values(&[&database_id_str])
.set(self.total_command_num() as i64);
}
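    /// Returns the snapshot-backfill jobs whose backfill has caught up and
    /// that should be merged back into the upstream graph, or `None` if there
    /// is no such job.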
fn jobs_to_merge(
&self,
) -> Option<HashMap<TableId, (SnapshotBackfillInfo, InflightStreamingJobInfo)>> {
let mut table_ids_to_merge = HashMap::new();
for (table_id, creating_streaming_job) in &self.creating_streaming_job_controls {
if let Some(graph_info) = creating_streaming_job.should_merge_to_upstream() {
table_ids_to_merge.insert(
*table_id,
(
creating_streaming_job.snapshot_backfill_info.clone(),
graph_info,
),
);
}
}
if table_ids_to_merge.is_empty() {
None
} else {
Some(table_ids_to_merge)
}
}
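    /// Enqueues an injected barrier, asserting that its `prev_epoch` chains
    /// onto the `curr_epoch` of the previously enqueued barrier.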
fn enqueue_command(
&mut self,
command_ctx: CommandContext,
notifiers: Vec<Notifier>,
node_to_collect: HashSet<WorkerId>,
creating_jobs_to_wait: HashSet<TableId>,
) {
let timer = GLOBAL_META_METRICS.barrier_latency.start_timer();
if let Some((_, node)) = self.command_ctx_queue.last_key_value() {
assert_eq!(
command_ctx.barrier_info.prev_epoch.value(),
node.command_ctx.barrier_info.curr_epoch.value()
);
}
tracing::trace!(
prev_epoch = command_ctx.barrier_info.prev_epoch(),
?creating_jobs_to_wait,
"enqueue command"
);
self.command_ctx_queue.insert(
command_ctx.barrier_info.prev_epoch(),
EpochNode {
enqueue_time: timer,
state: BarrierEpochState {
node_to_collect,
resps: vec![],
creating_jobs_to_wait,
finished_jobs: HashMap::new(),
},
command_ctx,
notifiers,
},
);
}
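    /// Records a collect response: either clears the worker from the matching
    /// queued barrier, or forwards the response to the owning creating
    /// streaming job.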
fn barrier_collected(
&mut self,
resp: BarrierCompleteResponse,
control_stream_manager: &mut ControlStreamManager,
) -> MetaResult<()> {
let worker_id = resp.worker_id;
let prev_epoch = resp.epoch;
tracing::trace!(
worker_id,
prev_epoch,
partial_graph_id = resp.partial_graph_id,
"barrier collected"
);
let (database_id, creating_job_id) = from_partial_graph_id(resp.partial_graph_id);
assert_eq!(database_id, self.database_id);
match creating_job_id {
None => {
if let Some(node) = self.command_ctx_queue.get_mut(&prev_epoch) {
assert!(node.state.node_to_collect.remove(&(worker_id as _)));
node.state.resps.push(resp);
                } else {
                    panic!(
                        "collected barrier for unknown prev_epoch {} from worker {}",
                        prev_epoch, worker_id
                    );
                }
}
Some(creating_job_id) => {
self.creating_streaming_job_controls
.get_mut(&creating_job_id)
.expect("should exist")
.collect(prev_epoch, worker_id as _, resp, control_stream_manager)?;
}
}
Ok(())
}
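    /// Injection is allowed while the number of in-flight barriers stays below
    /// `in_flight_barrier_nums` and the latest command (queued or completing)
    /// does not ask to pause injection.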
fn can_inject_barrier(&self, in_flight_barrier_nums: usize) -> bool {
let in_flight_not_full = self
.command_ctx_queue
.values()
.filter(|x| x.state.is_inflight())
.count()
< in_flight_barrier_nums;
let should_pause = self
.command_ctx_queue
.last_key_value()
.and_then(|(_, x)| {
x.command_ctx
.command
.as_ref()
.map(Command::should_pause_inject_barrier)
})
.or(self
.completing_barrier
.map(|(_, should_pause)| should_pause))
.unwrap_or(false);
debug_assert_eq!(
self.command_ctx_queue
.values()
.filter_map(|node| {
node.command_ctx
.command
.as_ref()
.map(Command::should_pause_inject_barrier)
})
.chain(
self.completing_barrier
.map(|(_, should_pause)| should_pause)
.into_iter()
)
.any(|should_pause| should_pause),
should_pause
);
in_flight_not_full && !should_pause
}
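    /// Returns the earliest queued barrier that still waits to be collected
    /// from `worker_id`, if any.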
pub(crate) fn barrier_wait_collect_from_worker(
&self,
worker_id: WorkerId,
) -> Option<&BarrierInfo> {
for epoch_node in self.command_ctx_queue.values() {
if epoch_node.state.node_to_collect.contains(&worker_id) {
return Some(&epoch_node.command_ctx.barrier_info);
}
}
None
}
}
impl DatabaseCheckpointControl {
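    /// Returns, per creating job, the upstream log epoch pinned by its
    /// snapshot backfill together with the upstream mv tables it reads.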
fn collect_backfill_pinned_upstream_log_epoch(
&self,
) -> HashMap<TableId, (u64, HashSet<TableId>)> {
self.creating_streaming_job_controls
.iter()
.filter_map(|(table_id, creating_job)| {
creating_job
.pinned_upstream_log_epoch()
.map(|progress_epoch| {
(
*table_id,
(
progress_epoch,
creating_job
.snapshot_backfill_info
.upstream_mv_table_ids
.clone(),
),
)
})
})
.collect()
}
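    /// Collects work that is ready to commit into `task`: epochs of creating
    /// jobs that commit independently, and the earliest fully collected
    /// checkpoint barrier of the main graph. At most one main-graph barrier
    /// commits at a time, tracked by `completing_barrier`.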
fn next_complete_barrier_task(
&mut self,
task: &mut Option<CompleteBarrierTask>,
mut context: Option<(&mut PeriodicBarriers, &mut ControlStreamManager)>,
hummock_version_stats: &HummockVersionStats,
) {
let mut creating_jobs_task = vec![];
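        // Gather epochs of creating jobs that are ready to commit; jobs whose
        // backfill has fully finished are instead folded into the main-graph
        // barrier they finish at.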
{
let mut finished_jobs = Vec::new();
let min_upstream_inflight_barrier = self
.command_ctx_queue
.first_key_value()
.map(|(epoch, _)| *epoch);
for (table_id, job) in &mut self.creating_streaming_job_controls {
if let Some((epoch, resps, status)) =
job.start_completing(min_upstream_inflight_barrier)
{
let is_first_time = match status {
CompleteJobType::First => true,
CompleteJobType::Normal => false,
CompleteJobType::Finished => {
finished_jobs.push((*table_id, epoch, resps));
continue;
}
};
creating_jobs_task.push((*table_id, epoch, resps, is_first_time));
}
}
if !finished_jobs.is_empty()
&& let Some((_, control_stream_manager)) = &mut context
{
control_stream_manager.remove_partial_graph(
self.database_id,
finished_jobs
.iter()
.map(|(table_id, _, _)| *table_id)
.collect(),
);
}
for (table_id, epoch, resps) in finished_jobs {
let epoch_state = &mut self
.command_ctx_queue
.get_mut(&epoch)
.expect("should exist")
.state;
assert!(epoch_state.creating_jobs_to_wait.remove(&table_id));
debug!(epoch, ?table_id, "finish creating job");
let creating_streaming_job = self
.creating_streaming_job_controls
.remove(&table_id)
.expect("should exist");
assert!(creating_streaming_job.is_finished());
assert!(epoch_state
.finished_jobs
.insert(table_id, (creating_streaming_job.info, resps))
.is_none());
}
}
assert!(self.completing_barrier.is_none());
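        // Drain fully collected barriers from the front of the queue.
        // Non-checkpoint barriers only notify their subscribers; the first
        // checkpoint barrier becomes the completing barrier and stops the loop.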
while let Some((_, EpochNode { state, .. })) = self.command_ctx_queue.first_key_value()
&& !state.is_inflight()
{
{
let (_, mut node) = self.command_ctx_queue.pop_first().expect("non-empty");
assert!(node.state.creating_jobs_to_wait.is_empty());
assert!(node.state.node_to_collect.is_empty());
let mut finished_jobs = self.create_mview_tracker.apply_collected_command(
node.command_ctx.command.as_ref(),
&node.command_ctx.barrier_info,
&node.state.resps,
hummock_version_stats,
);
if !node.command_ctx.barrier_info.kind.is_checkpoint() {
assert!(finished_jobs.is_empty());
node.notifiers.into_iter().for_each(|notifier| {
notifier.notify_collected();
});
if let Some((scheduled_barriers, _)) = &mut context
&& self.create_mview_tracker.has_pending_finished_jobs()
&& self
.command_ctx_queue
.values()
.all(|node| !node.command_ctx.barrier_info.kind.is_checkpoint())
{
scheduled_barriers.force_checkpoint_in_next_barrier();
}
continue;
}
node.state
.finished_jobs
.drain()
.for_each(|(_, (info, resps))| {
node.state.resps.extend(resps);
finished_jobs.push(TrackingJob::New(TrackingCommand {
info,
replace_table_info: None,
}));
});
let task = task.get_or_insert_default();
node.command_ctx.collect_commit_epoch_info(
&mut task.commit_info,
take(&mut node.state.resps),
self.collect_backfill_pinned_upstream_log_epoch(),
);
self.completing_barrier = Some((
node.command_ctx.barrier_info.prev_epoch(),
node.command_ctx
.command
.as_ref()
.map(|c| c.should_pause_inject_barrier())
.unwrap_or(false),
));
task.finished_jobs.extend(finished_jobs);
task.notifiers.extend(node.notifiers);
task.epoch_infos
.try_insert(
self.database_id,
(Some((node.command_ctx, node.enqueue_time)), vec![]),
)
.expect("non duplicate");
break;
}
}
if !creating_jobs_task.is_empty() {
let task = task.get_or_insert_default();
for (table_id, epoch, resps, is_first_time) in creating_jobs_task {
collect_creating_job_commit_epoch_info(
&mut task.commit_info,
epoch,
resps,
self.creating_streaming_job_controls[&table_id]
.info
.stream_job_fragments
.all_table_ids()
.map(TableId::new),
is_first_time,
);
let (_, creating_job_epochs) =
task.epoch_infos.entry(self.database_id).or_default();
creating_job_epochs.push((table_id, epoch));
}
}
}
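    /// Acknowledges a finished commit: clears `completing_barrier`, asserting
    /// that it matches the committed epoch, and forwards per-job epochs to the
    /// creating streaming jobs.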
fn ack_completed(
&mut self,
command_prev_epoch: Option<u64>,
creating_job_epochs: Vec<(TableId, u64)>,
) {
        assert_eq!(
            self.completing_barrier
                .take()
                .map(|(prev_epoch, _)| prev_epoch),
            command_prev_epoch
        );
        for (table_id, epoch) in creating_job_epochs {
            self.creating_streaming_job_controls
                .get_mut(&table_id)
                .expect("should exist")
                .ack_completed(epoch);
        }
}
}
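/// A barrier queued in `command_ctx_queue`: its collection state, the command
/// it carries, and the notifiers to wake once it completes.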
struct EpochNode {
    /// Timer started on enqueue; observed when the barrier completes, to
    /// record barrier latency.
    enqueue_time: HistogramTimer,
    /// Collection state of the barrier across workers and creating jobs.
    state: BarrierEpochState,
    /// Context of the command carried by this barrier.
    command_ctx: CommandContext,
    /// Notifiers to wake when the barrier is collected or completed.
    notifiers: Vec<Notifier>,
}
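/// Collection state of a single barrier.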
#[derive(Debug)]
struct BarrierEpochState {
    /// Workers that have not yet collected this barrier.
    node_to_collect: HashSet<WorkerId>,
    /// Responses collected so far.
    resps: Vec<BarrierCompleteResponse>,
    /// Creating snapshot-backfill jobs that this barrier still waits on.
    creating_jobs_to_wait: HashSet<TableId>,
    /// Creating jobs that finished at this barrier, committed together with it.
    finished_jobs: HashMap<TableId, (CreateStreamingJobCommandInfo, Vec<BarrierCompleteResponse>)>,
}
impl BarrierEpochState {
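    /// A barrier stays in-flight until all workers and all waited-on creating
    /// jobs have collected it.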
fn is_inflight(&self) -> bool {
!self.node_to_collect.is_empty() || !self.creating_jobs_to_wait.is_empty()
}
}
impl DatabaseCheckpointControl {
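    /// Injects the next barrier for this database: validates the command
    /// against ongoing snapshot-backfill jobs, applies it to the worker state,
    /// forwards it to the creating jobs, and enqueues the resulting
    /// `CommandContext`.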
fn handle_new_barrier(
&mut self,
command: Option<(Command, Vec<Notifier>)>,
checkpoint: bool,
span: tracing::Span,
control_stream_manager: &mut ControlStreamManager,
active_streaming_nodes: &ActiveStreamingWorkerNodes,
hummock_version_stats: &HummockVersionStats,
curr_epoch: TracedEpoch,
) -> MetaResult<()> {
let (mut command, mut notifiers) = if let Some((command, notifiers)) = command {
(Some(command), notifiers)
} else {
(None, vec![])
};
for table_to_cancel in command
.as_ref()
.map(Command::tables_to_drop)
.into_iter()
.flatten()
{
if self
.creating_streaming_job_controls
.contains_key(&table_to_cancel)
{
warn!(
table_id = table_to_cancel.table_id,
"ignore cancel command on creating streaming job"
);
            for notifier in notifiers {
                notifier.notify_start_failed(
                    anyhow!("cannot cancel creating streaming job; the job will keep creating until it is created or recovery is triggered").into(),
                );
            }
return Ok(());
}
}
if let Some(Command::RescheduleFragment { .. }) = &command {
if !self.creating_streaming_job_controls.is_empty() {
warn!("ignore reschedule when creating streaming job with snapshot backfill");
for notifier in notifiers {
notifier.notify_start_failed(
anyhow!(
"cannot reschedule when creating streaming job with snapshot backfill",
)
.into(),
);
}
return Ok(());
}
}
let Some(barrier_info) =
self.state
.next_barrier_info(command.as_ref(), checkpoint, curr_epoch)
else {
for mut notifier in notifiers {
notifier.notify_started();
notifier.notify_collected();
}
return Ok(());
};
if let Some(Command::CreateStreamingJob {
job_type: CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info),
info,
}) = &command
{
if self.state.paused_reason().is_some() {
warn!("cannot create streaming job with snapshot backfill when paused");
for notifier in notifiers {
notifier.notify_start_failed(
anyhow!("cannot create streaming job with snapshot backfill when paused",)
.into(),
);
}
return Ok(());
}
let mutation = command
.as_ref()
.expect("checked Some")
.to_mutation(None)
.expect("should have some mutation in `CreateStreamingJob` command");
let job_id = info.stream_job_fragments.stream_job_id();
control_stream_manager.add_partial_graph(self.database_id, Some(job_id))?;
self.creating_streaming_job_controls.insert(
job_id,
CreatingStreamingJobControl::new(
info.clone(),
snapshot_backfill_info.clone(),
barrier_info.prev_epoch(),
hummock_version_stats,
mutation,
),
);
}
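        // On a checkpoint barrier that carries no command, opportunistically
        // merge snapshot-backfill jobs that have caught up with the upstream.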
if let (BarrierKind::Checkpoint(_), None) = (&barrier_info.kind, &command)
&& let Some(jobs_to_merge) = self.jobs_to_merge()
{
command = Some(Command::MergeSnapshotBackfillStreamingJobs(jobs_to_merge));
}
let command = command;
let (
pre_applied_graph_info,
pre_applied_subscription_info,
table_ids_to_commit,
jobs_to_wait,
prev_paused_reason,
) = self.state.apply_command(command.as_ref());
barrier_info.prev_epoch.span().in_scope(|| {
tracing::info!(target: "rw_tracing", epoch = barrier_info.curr_epoch.value().0, "new barrier enqueued");
});
span.record("epoch", barrier_info.curr_epoch.value().0);
        for creating_job in self.creating_streaming_job_controls.values_mut() {
creating_job.on_new_command(control_stream_manager, command.as_ref(), &barrier_info)?;
}
let node_to_collect = match control_stream_manager.inject_command_ctx_barrier(
self.database_id,
command.as_ref(),
&barrier_info,
prev_paused_reason,
&pre_applied_graph_info,
&self.state.inflight_graph_info,
) {
Ok(node_to_collect) => node_to_collect,
Err(err) => {
for notifier in notifiers {
notifier.notify_failed(err.clone());
}
fail_point!("inject_barrier_err_success");
return Err(err);
}
};
notifiers.iter_mut().for_each(|n| n.notify_started());
let command_ctx = CommandContext::new(
active_streaming_nodes.current().clone(),
barrier_info,
pre_applied_subscription_info,
table_ids_to_commit.clone(),
command,
span,
);
self.enqueue_command(command_ctx, notifiers, node_to_collect, jobs_to_wait);
Ok(())
}
}