// risingwave_meta/barrier/complete_task.rs
use std::collections::HashMap;
use std::future::{pending, Future};
use std::mem::replace;
use std::sync::Arc;
use anyhow::Context;
use futures::future::try_join_all;
use prometheus::HistogramTimer;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::must_match;
use risingwave_pb::hummock::HummockVersionStats;
use tokio::task::JoinHandle;
use crate::barrier::checkpoint::CheckpointControl;
use crate::barrier::command::CommandContext;
use crate::barrier::context::GlobalBarrierWorkerContext;
use crate::barrier::notifier::Notifier;
use crate::barrier::progress::TrackingJob;
use crate::barrier::rpc::ControlStreamManager;
use crate::barrier::schedule::PeriodicBarriers;
use crate::hummock::CommitEpochInfo;
use crate::manager::MetaSrvEnv;
use crate::rpc::metrics::GLOBAL_META_METRICS;
use crate::{MetaError, MetaResult};
/// State of the barrier-completion task driven by `next_completed_barrier`.
pub(super) enum CompletingTask {
/// No completion task is in flight; `next_completed_barrier` may start a new one.
None,
/// A completion task has been spawned and its result has not been consumed yet.
Completing {
/// Epochs captured from the task when it was spawned, keyed by database. They are
/// handed back in `BarrierCompleteOutput` once the task succeeds.
#[expect(clippy::type_complexity)]
epochs_to_ack: HashMap<DatabaseId, (Option<u64>, Vec<(TableId, u64)>)>,
/// Handle of the spawned task running `CompleteBarrierTask::complete_barrier`.
join_handle: JoinHandle<MetaResult<HummockVersionStats>>,
},
/// The previous completion task failed or could not be joined. The stored error is not
/// read anywhere in this file, hence the `dead_code` expectation. While in this state,
/// `next_completed_barrier` neither starts a new task nor ever resolves.
#[expect(dead_code)]
Err(MetaError),
}
/// Inputs for one run of `complete_barrier`: the data to commit plus the follow-up work that
/// runs once the commit has succeeded.
#[derive(Default)]
pub(super) struct CompleteBarrierTask {
/// Passed to `GlobalBarrierWorkerContext::commit_epoch` by `complete_barrier`.
pub(super) commit_info: CommitEpochInfo,
/// Jobs handed to `GlobalBarrierWorkerContext::finish_creating_job` after a successful
/// commit.
pub(super) finished_jobs: Vec<TrackingJob>,
/// Notified via `notify_collected` on success, or `notify_collection_failed` on failure.
pub(super) notifiers: Vec<Notifier>,
/// Per-database entries. The first element is the optional command context of that
/// database together with a histogram timer that is stopped and recorded when the
/// command completes. The second element is a list of `(TableId, u64)` pairs —
/// presumably the epochs of creating jobs, judging by the `creating_job_epochs` binding
/// in `epochs_to_ack`; confirm against the producer of this task.
#[expect(clippy::type_complexity)]
pub(super) epoch_infos: HashMap<
DatabaseId,
(
Option<(CommandContext, HistogramTimer)>,
Vec<(TableId, u64)>,
),
>,
}
impl CompleteBarrierTask {
    /// Builds the per-database summary of epochs carried by this task.
    ///
    /// For every database in `epoch_infos` the result holds:
    /// - the previous epoch of the database's command context, or `None` when the entry has
    ///   no command context, and
    /// - a copy of the `(TableId, u64)` pairs stored alongside it.
    #[expect(clippy::type_complexity)]
    pub(super) fn epochs_to_ack(&self) -> HashMap<DatabaseId, (Option<u64>, Vec<(TableId, u64)>)> {
        let mut acks = HashMap::with_capacity(self.epoch_infos.len());
        for (database_id, (command_and_timer, creating_job_epochs)) in &self.epoch_infos {
            // Only the command context matters here; the timer stays untouched.
            let prev_epoch = command_and_timer
                .as_ref()
                .map(|(command_ctx, _)| command_ctx.barrier_info.prev_epoch.value().0);
            acks.insert(*database_id, (prev_epoch, creating_job_epochs.clone()));
        }
        acks
    }
}
impl CompleteBarrierTask {
    /// Commits the epoch described by this task and runs all follow-up work.
    ///
    /// The work happens in this order:
    /// 1. `context.commit_epoch` is called with `commit_info`, then
    ///    `context.post_collect_command` is awaited for every database entry that carries a
    ///    command context. The whole step is timed with `barrier_wait_commit_latency`.
    /// 2. If step 1 failed, every notifier receives the error via
    ///    `notify_collection_failed` and the error is returned.
    /// 3. Otherwise every notifier is told the barrier was collected, the finished jobs are
    ///    passed to `context.finish_creating_job` (joined with `try_join_all`), and for each
    ///    command the per-command timer is stopped, a completion event is logged and the
    ///    `last_committed_barrier_time` gauge is updated.
    ///
    /// # Errors
    ///
    /// Returns the first error from the commit, from `post_collect_command`, or from
    /// `finish_creating_job`. Note that a `finish_creating_job` failure is reported after
    /// the notifiers have already been told that collection succeeded.
    pub(super) async fn complete_barrier(
        self,
        context: &impl GlobalBarrierWorkerContext,
        env: MetaSrvEnv,
    ) -> MetaResult<HummockVersionStats> {
        // Step 1: commit and post-collect. Errors are captured rather than propagated so
        // that the notifiers can be informed below.
        let commit_outcome: MetaResult<HummockVersionStats> = try {
            let commit_timer = GLOBAL_META_METRICS
                .barrier_wait_commit_latency
                .start_timer();
            let stats = context.commit_epoch(self.commit_info).await?;
            for (command_and_timer, _) in self.epoch_infos.values() {
                if let Some((command_ctx, _)) = command_and_timer {
                    context.post_collect_command(command_ctx).await?;
                }
            }
            commit_timer.observe_duration();
            stats
        };

        // Step 2: on failure, fan the error out to every waiter and bail.
        let version_stats = match commit_outcome {
            Ok(stats) => stats,
            Err(err) => {
                self.notifiers.into_iter().for_each(|notifier| {
                    notifier.notify_collection_failed(err.clone());
                });
                return Err(err);
            }
        };

        // Step 3: success path.
        for notifier in self.notifiers {
            notifier.notify_collected();
        }

        let finishing_jobs = self
            .finished_jobs
            .into_iter()
            .map(|job| context.finish_creating_job(job));
        try_join_all(finishing_jobs).await?;

        for (command_and_timer, _) in self.epoch_infos.into_values() {
            // Entries without a command context have nothing to report.
            let Some((command_ctx, enqueue_timer)) = command_and_timer else {
                continue;
            };
            let duration_sec = enqueue_timer.stop_and_record();
            Self::report_complete_event(&env, duration_sec, &command_ctx);
            let committed_secs =
                command_ctx.barrier_info.curr_epoch.value().as_unix_secs() as i64;
            GLOBAL_META_METRICS
                .last_committed_barrier_time
                .set(committed_secs);
        }

        Ok(version_stats)
    }
}
impl CompleteBarrierTask {
    /// Appends a `BarrierComplete` entry for `command_ctx` to the event log.
    ///
    /// The epochs and barrier kind are read from `command_ctx.barrier_info`, and
    /// `duration_sec` is stored unchanged. The command is rendered with `to_string`; when
    /// the context carries no command, the literal `"barrier"` is logged instead.
    fn report_complete_event(env: &MetaSrvEnv, duration_sec: f64, command_ctx: &CommandContext) {
        use risingwave_pb::meta::event_log;

        let barrier_info = &command_ctx.barrier_info;
        let command = command_ctx
            .command
            .as_ref()
            .map_or_else(|| "barrier".to_string(), |command| command.to_string());

        let event = event_log::EventBarrierComplete {
            prev_epoch: barrier_info.prev_epoch(),
            cur_epoch: barrier_info.curr_epoch.value().0,
            duration_sec,
            command,
            barrier_kind: barrier_info.kind.as_str_name().to_string(),
        };

        env.event_log_manager_ref()
            .add_event_logs(vec![event_log::Event::BarrierComplete(event)]);
    }
}
/// Result yielded by `CompletingTask::next_completed_barrier` when a completion task
/// finishes successfully.
pub(super) struct BarrierCompleteOutput {
/// The `epochs_to_ack` map that was stored in `CompletingTask::Completing` when the task
/// was spawned, keyed by database.
#[expect(clippy::type_complexity)]
pub epochs_to_ack: HashMap<DatabaseId, (Option<u64>, Vec<(TableId, u64)>)>,
/// The version stats returned by the completed task.
pub hummock_version_stats: HummockVersionStats,
}
impl CompletingTask {
    /// Returns a future that resolves when the in-flight completion task finishes.
    ///
    /// When no task is in flight (`CompletingTask::None`), `checkpoint_control` is first
    /// asked for the next task to complete. If it provides one, the task is spawned onto
    /// the tokio runtime and `self` switches to `CompletingTask::Completing`, remembering
    /// the task's `epochs_to_ack` and its join handle.
    ///
    /// The returned future never resolves while `self` is not `Completing` (see
    /// `next_completed_barrier_inner`).
    pub(super) fn next_completed_barrier<'a>(
        &'a mut self,
        scheduled_barriers: &mut PeriodicBarriers,
        checkpoint_control: &mut CheckpointControl,
        control_stream_manager: &mut ControlStreamManager,
        context: &Arc<impl GlobalBarrierWorkerContext>,
        env: &MetaSrvEnv,
    ) -> impl Future<Output = MetaResult<BarrierCompleteOutput>> + 'a {
        // Only look for new work when idle; the other states never start a second task.
        if matches!(*self, CompletingTask::None) {
            let next_task = checkpoint_control
                .next_complete_barrier_task(Some((scheduled_barriers, control_stream_manager)));
            if let Some(task) = next_task {
                // Capture the epochs before the task is moved into the spawned future.
                let epochs_to_ack = task.epochs_to_ack();
                let (context, env) = (context.clone(), env.clone());
                let join_handle =
                    tokio::spawn(async move { task.complete_barrier(&*context, env).await });
                *self = CompletingTask::Completing {
                    epochs_to_ack,
                    join_handle,
                };
            }
        }

        self.next_completed_barrier_inner()
    }

    /// Awaits the join handle of the in-flight task and converts its result.
    ///
    /// If `self` is not `Completing`, this stays pending forever. Otherwise, once the
    /// spawned task has been joined, `self` is replaced *before* the result is propagated:
    /// with `CompletingTask::None` on success, or `CompletingTask::Err` holding a clone of
    /// the error on failure. The finished join handle is therefore never awaited again,
    /// whatever the outcome.
    ///
    /// # Errors
    ///
    /// Returns an error if the spawned task could not be joined or if the task itself
    /// returned an error.
    async fn next_completed_barrier_inner(&mut self) -> MetaResult<BarrierCompleteOutput> {
        let CompletingTask::Completing { join_handle, .. } = self else {
            return pending().await;
        };

        // First `?` handles the join failure, second one the task's own error.
        let join_result: MetaResult<_> = try {
            let task_result = join_handle
                .await
                .context("failed to join completing command")?;
            task_result?
        };

        // Leave the `Completing` state no matter how the task ended.
        let next_state = match &join_result {
            Ok(_) => CompletingTask::None,
            Err(err) => CompletingTask::Err(err.clone()),
        };
        let finished_task = replace(self, next_state);
        let hummock_version_stats = join_result?;

        must_match!(finished_task, CompletingTask::Completing {
            epochs_to_ack,
            ..
        } => {
            Ok(BarrierCompleteOutput {
                epochs_to_ack,
                hummock_version_stats,
            })
        })
    }
}