use std::collections::VecDeque;
use std::iter::once;
use std::sync::Arc;
use std::time::{Duration, Instant};
use anyhow::{anyhow, Context};
use assert_matches::assert_matches;
use parking_lot::Mutex;
use prometheus::HistogramTimer;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_hummock_sdk::HummockVersionId;
use risingwave_pb::meta::PausedReason;
use tokio::select;
use tokio::sync::{oneshot, watch};
use tokio::time::Interval;
use super::notifier::Notifier;
use super::{Command, Scheduled};
use crate::barrier::context::GlobalBarrierWorkerContext;
use crate::hummock::HummockManagerRef;
use crate::rpc::metrics::MetaMetrics;
use crate::{MetaError, MetaResult};
/// A barrier to be injected next, as returned by `PeriodicBarriers::next_barrier`.
pub(super) struct NewBarrier {
    /// The dequeued command with its target database and notifiers; `None` when the barrier
    /// was produced by the periodic interval tick rather than by a scheduled command.
    pub command: Option<(DatabaseId, Command, Vec<Notifier>)>,
    /// Tracing span for this barrier: the scheduled item's span, or a freshly created one for
    /// periodic barriers.
    pub span: tracing::Span,
    /// Whether this barrier should be a checkpoint barrier.
    pub checkpoint: bool,
}
/// State shared through an `Arc` between the producer [`BarrierScheduler`] and the consumer
/// [`ScheduledBarriers`] created together by `BarrierScheduler::new_pair`.
struct Inner {
    /// Pending commands plus the status that decides whether new commands are accepted.
    queue: Mutex<ScheduledQueue>,
    /// Signalled when a push makes the queue non-empty; consumers wait on receivers obtained
    /// via `subscribe`.
    changed_tx: watch::Sender<()>,
    /// Meta metrics; `barrier_send_latency` is timed for every scheduled item.
    metrics: Arc<MetaMetrics>,
}
/// Whether the scheduled queue currently accepts new commands.
#[derive(Debug)]
enum QueueStatus {
    /// Every command is accepted.
    Ready,
    /// Only `DropStreamingJobs` and `DropSubscription` are accepted; any other command is
    /// rejected with `MetaError::unavailable` carrying this reason.
    Blocked(String),
}
/// A command waiting in the scheduled queue.
struct ScheduledQueueItem {
    /// Database the command targets.
    database_id: DatabaseId,
    /// The command to carry on a barrier.
    command: Command,
    /// Handed to the barrier worker together with the command. Items that never leave the
    /// queue normally are notified by `abort_and_mark_blocked` (as failed) or by
    /// `pre_apply_drop_cancel_scheduled` (as collected).
    notifiers: Vec<Notifier>,
    /// Started in `Inner::new_scheduled`; observed when the item is popped in
    /// `ScheduledBarriers::next_scheduled`.
    send_latency_timer: HistogramTimer,
    /// Tracing span created when the item was scheduled.
    span: tracing::Span,
}
/// FIFO of scheduled commands together with the status gating new pushes.
pub(super) struct ScheduledQueue {
    /// Pending items; popped from the front by `ScheduledBarriers::next_scheduled`.
    queue: VecDeque<ScheduledQueueItem>,
    /// Decides whether `push_back` accepts a new item.
    status: QueueStatus,
}
impl ScheduledQueue {
    /// Creates an empty queue in the [`QueueStatus::Ready`] state.
    fn new() -> Self {
        Self {
            status: QueueStatus::Ready,
            queue: VecDeque::default(),
        }
    }

    /// Rejects subsequent non-drop pushes with `reason`. Items already queued are kept.
    fn mark_blocked(&mut self, reason: String) {
        self.status = QueueStatus::Blocked(reason);
    }

    /// Accepts every command again.
    fn mark_ready(&mut self) {
        self.status = QueueStatus::Ready;
    }

    /// Number of queued items.
    fn len(&self) -> usize {
        self.queue.len()
    }

    /// Appends `scheduled` to the back of the queue.
    ///
    /// # Errors
    /// While the queue is [`QueueStatus::Blocked`], any command other than
    /// `DropStreamingJobs` / `DropSubscription` is rejected with
    /// `MetaError::unavailable(reason)` and is not enqueued.
    fn push_back(&mut self, scheduled: ScheduledQueueItem) -> MetaResult<()> {
        // Drop commands are the only kinds buffered while blocked; they are the only kinds
        // `ScheduledBarriers::pre_apply_drop_cancel_scheduled` expects to find.
        let is_drop = matches!(
            scheduled.command,
            Command::DropStreamingJobs { .. } | Command::DropSubscription { .. }
        );
        match &self.status {
            QueueStatus::Blocked(reason) if !is_drop => Err(MetaError::unavailable(reason)),
            _ => {
                self.queue.push_back(scheduled);
                Ok(())
            }
        }
    }
}
/// Creates the tracing span attached to a barrier.
///
/// If `Span::current()` reports `is_none()`, a disabled span is returned. Otherwise an
/// info-level span named `"barrier"` is created with `checkpoint` and `epoch` fields declared
/// but left empty (presumably recorded later by the barrier worker — confirm at call sites).
fn tracing_span() -> tracing::Span {
    let inside_span = !tracing::Span::current().is_none();
    if !inside_span {
        return tracing::Span::none();
    }
    tracing::info_span!(
        "barrier",
        checkpoint = tracing::field::Empty,
        epoch = tracing::field::Empty
    )
}
impl Inner {
    /// Wraps `command` into a queue item targeting `database_id`.
    ///
    /// Creates the item's tracing span and starts its `barrier_send_latency` timer. The timer
    /// is observed in `ScheduledBarriers::next_scheduled` when the item is popped there.
    fn new_scheduled(
        &self,
        database_id: DatabaseId,
        command: Command,
        notifiers: impl IntoIterator<Item = Notifier>,
    ) -> ScheduledQueueItem {
        let span = tracing_span();
        let notifiers: Vec<Notifier> = notifiers.into_iter().collect();
        let send_latency_timer = self.metrics.barrier_send_latency.start_timer();
        ScheduledQueueItem {
            database_id,
            command,
            notifiers,
            send_latency_timer,
            span,
        }
    }
}
/// Producer-side handle for scheduling commands to be carried by barriers.
///
/// All clones share the same queue through `Arc<Inner>`.
#[derive(Clone)]
pub struct BarrierScheduler {
    /// Queue and wakeup channel shared with the paired [`ScheduledBarriers`].
    inner: Arc<Inner>,
    /// Queried by `flush` for the Hummock version id after the flush barrier is collected.
    hummock_manager: HummockManagerRef,
}
impl BarrierScheduler {
    /// Creates a connected producer/consumer pair over one shared command queue.
    ///
    /// The returned [`BarrierScheduler`] enqueues commands and the returned
    /// [`ScheduledBarriers`] dequeues them; both hold the same `Arc<Inner>`.
    pub fn new_pair(
        hummock_manager: HummockManagerRef,
        metrics: Arc<MetaMetrics>,
    ) -> (Self, ScheduledBarriers) {
        // Only the sender is retained; consumers obtain receivers later via `subscribe`.
        let (changed_tx, _) = watch::channel(());
        let shared = Arc::new(Inner {
            queue: Mutex::new(ScheduledQueue::new()),
            changed_tx,
            metrics,
        });
        let consumer = ScheduledBarriers {
            inner: Arc::clone(&shared),
        };
        let producer = Self {
            inner: shared,
            hummock_manager,
        };
        (producer, consumer)
    }

    /// Enqueues `scheduleds` in order while holding the queue lock.
    ///
    /// # Errors
    /// Returns the first error from `ScheduledQueue::push_back`. Items enqueued before the
    /// failing one stay in the queue.
    fn push(&self, scheduleds: impl IntoIterator<Item = ScheduledQueueItem>) -> MetaResult<()> {
        let mut queue = self.inner.queue.lock();
        scheduleds
            .into_iter()
            .try_for_each(|scheduled| -> MetaResult<()> {
                queue.push_back(scheduled)?;
                // The consumer only waits after observing an empty queue (see
                // `next_scheduled`), so a wakeup is sent on the empty -> non-empty transition.
                if queue.len() == 1 {
                    let _ = self.inner.changed_tx.send(());
                }
                Ok(())
            })
    }

    /// Removes the first queued `CreateStreamingJob` whose job id equals `table_id`.
    ///
    /// Returns `true` if such a command was found and removed. The removed item (and its
    /// notifiers) is dropped rather than explicitly notified.
    pub fn try_cancel_scheduled_create(&self, table_id: TableId) -> bool {
        let mut queue = self.inner.queue.lock();
        let position = queue.queue.iter().position(|scheduled| {
            matches!(
                &scheduled.command,
                Command::CreateStreamingJob { info, .. }
                    if info.stream_job_fragments.stream_job_id() == table_id
            )
        });
        let Some(idx) = position else {
            return false;
        };
        queue.queue.remove(idx).unwrap();
        true
    }

    /// Enqueues `commands` as one batch, then waits for each of them, in order, to be injected
    /// and collected.
    ///
    /// # Errors
    /// Fails if the batch cannot be enqueued, if a notifier channel is closed before it
    /// reports ("failed to inject barrier" / "failed to collect barrier"), or with the error
    /// reported through the channel.
    async fn run_multiple_commands(
        &self,
        database_id: DatabaseId,
        commands: Vec<Command>,
    ) -> MetaResult<()> {
        // Give each command a single notifier and keep its receiving ends, in command order.
        let (receivers, scheduleds): (Vec<_>, Vec<_>) = commands
            .into_iter()
            .map(|command| {
                let (started_tx, started_rx) = oneshot::channel();
                let (collect_tx, collect_rx) = oneshot::channel();
                let notifier = Notifier {
                    started: Some(started_tx),
                    collected: Some(collect_tx),
                };
                let scheduled = self.inner.new_scheduled(database_id, command, once(notifier));
                ((started_rx, collect_rx), scheduled)
            })
            .unzip();

        self.push(scheduleds)?;

        for (injected_rx, collect_rx) in receivers {
            tracing::trace!("waiting for injected_rx");
            injected_rx
                .await
                .ok()
                .context("failed to inject barrier")??;
            tracing::trace!("waiting for collect_rx");
            collect_rx
                .await
                .ok()
                .context("failed to collect barrier")??;
        }
        Ok(())
    }

    /// Runs `command` between a pause and a resume (both `PausedReason::ConfigChange`).
    ///
    /// The three commands are enqueued as one batch and awaited in order.
    pub async fn run_config_change_command_with_pause(
        &self,
        database_id: DatabaseId,
        command: Command,
    ) -> MetaResult<()> {
        let commands = vec![
            Command::pause(PausedReason::ConfigChange),
            command,
            Command::resume(PausedReason::ConfigChange),
        ];
        self.run_multiple_commands(database_id, commands).await
    }

    /// Schedules a single `command` and waits until it is injected and collected.
    pub async fn run_command(&self, database_id: DatabaseId, command: Command) -> MetaResult<()> {
        tracing::trace!("run_command: {:?}", command);
        let result = self.run_multiple_commands(database_id, vec![command]).await;
        tracing::trace!("run_command finished");
        result
    }

    /// Schedules a `Command::Flush` barrier, waits for it to be collected, and then returns
    /// the Hummock version id read from the hummock manager.
    pub async fn flush(&self, database_id: DatabaseId) -> MetaResult<HummockVersionId> {
        let start = Instant::now();
        tracing::debug!("start barrier flush");
        self.run_multiple_commands(database_id, vec![Command::Flush])
            .await?;
        tracing::debug!("barrier flushed in {:?}", start.elapsed());
        Ok(self.hummock_manager.get_version_id().await)
    }
}
/// Consumer-side handle that pops commands scheduled through [`BarrierScheduler`].
pub struct ScheduledBarriers {
    /// Queue and wakeup channel shared with the paired [`BarrierScheduler`].
    inner: Arc<Inner>,
}
/// Decides when the next barrier is produced and whether it is a checkpoint.
pub(super) struct PeriodicBarriers {
    /// Ticks to produce a barrier when no command arrives; reset whenever a command is
    /// dequeued in `next_barrier`.
    min_interval: Interval,
    /// Set by `force_checkpoint_in_next_barrier`; cleared once a checkpoint barrier is
    /// produced.
    force_checkpoint: bool,
    /// Barriers produced since the last checkpoint barrier.
    num_uncheckpointed_barrier: usize,
    /// A barrier is a checkpoint once `num_uncheckpointed_barrier + 1` reaches this value.
    checkpoint_frequency: usize,
}
impl PeriodicBarriers {
pub(super) fn new(min_interval: Duration, checkpoint_frequency: usize) -> PeriodicBarriers {
Self {
min_interval: tokio::time::interval(min_interval),
force_checkpoint: false,
num_uncheckpointed_barrier: 0,
checkpoint_frequency,
}
}
pub(super) fn set_min_interval(&mut self, min_interval: Duration) {
let set_new_interval = min_interval != self.min_interval.period();
if set_new_interval {
let mut min_interval = tokio::time::interval(min_interval);
min_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
self.min_interval = min_interval;
}
}
pub(super) async fn next_barrier(
&mut self,
context: &impl GlobalBarrierWorkerContext,
) -> NewBarrier {
let checkpoint = self.try_get_checkpoint();
let scheduled = select! {
biased;
scheduled = context.next_scheduled() => {
self.min_interval.reset();
let checkpoint = scheduled.command.need_checkpoint() || checkpoint;
NewBarrier {
command: Some((scheduled.database_id, scheduled.command, scheduled.notifiers)),
span: scheduled.span,
checkpoint,
}
},
_ = self.min_interval.tick() => {
NewBarrier {
command: None,
span: tracing_span(),
checkpoint,
}
}
};
self.update_num_uncheckpointed_barrier(scheduled.checkpoint);
scheduled
}
}
impl ScheduledBarriers {
pub(super) async fn next_scheduled(&self) -> Scheduled {
loop {
let mut rx = self.inner.changed_tx.subscribe();
{
let mut queue = self.inner.queue.lock();
if let Some(item) = queue.queue.pop_front() {
item.send_latency_timer.observe_duration();
break Scheduled {
database_id: item.database_id,
command: item.command,
notifiers: item.notifiers,
span: item.span,
};
}
}
rx.changed().await.unwrap();
}
}
}
impl ScheduledBarriers {
    /// Shorthand for [`Self::pre_apply_drop_cancel_scheduled`].
    pub(super) fn pre_apply_drop_cancel(&self) -> bool {
        self.pre_apply_drop_cancel_scheduled()
    }

    /// Blocks the queue with `reason` and fails every command currently queued.
    ///
    /// Until [`Self::mark_ready`] is called, `push_back` rejects all non-drop commands with
    /// `reason`. Every notifier of every removed item is told that collection failed, with an
    /// error built from `reason`.
    pub(super) fn abort_and_mark_blocked(&self, reason: impl Into<String> + Copy) {
        let mut queue = self.inner.queue.lock();
        queue.mark_blocked(reason.into());
        for aborted in std::iter::from_fn(|| queue.queue.pop_front()) {
            for notify in aborted.notifiers {
                notify.notify_collection_failed(anyhow!(reason.into()).into());
            }
        }
    }

    /// Re-opens the queue so that every command is accepted again.
    pub(super) fn mark_ready(&self) {
        self.inner.queue.lock().mark_ready();
    }

    /// Drains the commands left in the blocked queue and reports each of them as collected.
    ///
    /// While blocked, `push_back` only accepts `DropStreamingJobs` and `DropSubscription`.
    /// Returns `true` if at least one `DropStreamingJobs` command was drained.
    ///
    /// # Panics
    /// If the queue is not blocked, or if a drained command is of any other kind.
    pub(super) fn pre_apply_drop_cancel_scheduled(&self) -> bool {
        let mut queue = self.inner.queue.lock();
        assert_matches!(queue.status, QueueStatus::Blocked(_));
        let mut dropped_streaming_jobs = false;
        for ScheduledQueueItem {
            notifiers, command, ..
        } in std::iter::from_fn(|| queue.queue.pop_front())
        {
            match command {
                Command::DropStreamingJobs { .. } => dropped_streaming_jobs = true,
                Command::DropSubscription { .. } => {}
                _ => unreachable!("only drop and cancel streaming jobs should be buffered"),
            }
            for notify in notifiers {
                notify.notify_collected();
            }
        }
        dropped_streaming_jobs
    }
}
impl PeriodicBarriers {
    /// Returns whether the next barrier should be a checkpoint by the periodic rule.
    ///
    /// True once `num_uncheckpointed_barrier + 1` reaches `checkpoint_frequency`, or when a
    /// checkpoint has been forced.
    fn try_get_checkpoint(&self) -> bool {
        let frequency_reached = self.num_uncheckpointed_barrier + 1 >= self.checkpoint_frequency;
        frequency_reached || self.force_checkpoint
    }

    /// Requests that the next barrier be a checkpoint regardless of the frequency.
    pub fn force_checkpoint_in_next_barrier(&mut self) {
        self.force_checkpoint = true;
    }

    /// Sets the checkpoint frequency used by `try_get_checkpoint`.
    pub fn set_checkpoint_frequency(&mut self, frequency: usize) {
        self.checkpoint_frequency = frequency;
    }

    /// Updates the barrier counter after a barrier has been produced.
    ///
    /// A checkpoint barrier resets the counter and clears any forced checkpoint; any other
    /// barrier increments the counter.
    fn update_num_uncheckpointed_barrier(&mut self, checkpoint: bool) {
        if !checkpoint {
            self.num_uncheckpointed_barrier += 1;
            return;
        }
        self.num_uncheckpointed_barrier = 0;
        self.force_checkpoint = false;
    }
}