risingwave_meta/barrier/
schedule.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet, VecDeque};
17use std::sync::Arc;
18use std::time::{Duration, Instant};
19
20use anyhow::{Context, anyhow};
21use assert_matches::assert_matches;
22use await_tree::InstrumentAwait;
23use itertools::Itertools;
24use parking_lot::Mutex;
25use prometheus::HistogramTimer;
26use risingwave_common::catalog::{DatabaseId, TableId};
27use risingwave_common::metrics::LabelGuardedHistogram;
28use risingwave_hummock_sdk::HummockVersionId;
29use risingwave_pb::catalog::Database;
30use rw_futures_util::pending_on_none;
31use tokio::select;
32use tokio::sync::{oneshot, watch};
33use tokio_stream::wrappers::IntervalStream;
34use tokio_stream::{StreamExt, StreamMap};
35use tracing::{info, warn};
36
37use super::notifier::Notifier;
38use super::{Command, Scheduled};
39use crate::barrier::context::GlobalBarrierWorkerContext;
40use crate::hummock::HummockManagerRef;
41use crate::rpc::metrics::MetaMetrics;
42use crate::{MetaError, MetaResult};
43
44pub(super) struct NewBarrier {
45    pub database_id: DatabaseId,
46    pub command: Option<(Command, Vec<Notifier>)>,
47    pub span: tracing::Span,
48    pub checkpoint: bool,
49}
50
51/// A queue for scheduling barriers.
52///
53/// We manually implement one here instead of using channels since we may need to update the front
54/// of the queue to add some notifiers for instant flushes.
55struct Inner {
56    queue: Mutex<ScheduledQueue>,
57
58    /// When `queue` is not empty anymore, all subscribers of this watcher will be notified.
59    changed_tx: watch::Sender<()>,
60
61    /// Used for recording send latency of each barrier.
62    metrics: Arc<MetaMetrics>,
63}
64
/// Whether a queue currently accepts new commands.
#[derive(Debug)]
enum QueueStatus {
    /// New commands are accepted.
    Ready,
    /// New commands are rejected; carries the human-readable reason reported to callers.
    Blocked(String),
}

impl QueueStatus {
    /// Returns `true` for [`QueueStatus::Blocked`], `false` for [`QueueStatus::Ready`].
    fn is_blocked(&self) -> bool {
        match self {
            Self::Ready => false,
            Self::Blocked(_) => true,
        }
    }
}
78
79struct ScheduledQueueItem {
80    command: Command,
81    notifiers: Vec<Notifier>,
82    send_latency_timer: HistogramTimer,
83    span: tracing::Span,
84}
85
86struct StatusQueue<T> {
87    queue: T,
88    status: QueueStatus,
89}
90
91struct DatabaseQueue {
92    inner: VecDeque<ScheduledQueueItem>,
93    send_latency: LabelGuardedHistogram,
94}
95
96type DatabaseScheduledQueue = StatusQueue<DatabaseQueue>;
97type ScheduledQueue = StatusQueue<HashMap<DatabaseId, DatabaseScheduledQueue>>;
98
99impl DatabaseScheduledQueue {
100    fn new(database_id: DatabaseId, metrics: &MetaMetrics, status: QueueStatus) -> Self {
101        Self {
102            queue: DatabaseQueue {
103                inner: Default::default(),
104                send_latency: metrics
105                    .barrier_send_latency
106                    .with_guarded_label_values(&[database_id.database_id.to_string().as_str()]),
107            },
108            status,
109        }
110    }
111}
112
113impl<T> StatusQueue<T> {
114    fn mark_blocked(&mut self, reason: String) {
115        self.status = QueueStatus::Blocked(reason);
116    }
117
118    fn mark_ready(&mut self) -> bool {
119        let prev_blocked = self.status.is_blocked();
120        self.status = QueueStatus::Ready;
121        prev_blocked
122    }
123
124    fn validate_item(&mut self, command: &Command) -> MetaResult<()> {
125        // We don't allow any command to be scheduled when the queue is blocked, except for dropping streaming jobs.
126        // Because we allow dropping streaming jobs when the cluster is under recovery, so we have to buffer the drop
127        // command and execute it when the cluster is ready to clean up it.
128        // TODO: this is just a workaround to allow dropping streaming jobs when the cluster is under recovery,
129        // we need to refine it when catalog and streaming metadata can be handled in a transactional way.
130        if let QueueStatus::Blocked(reason) = &self.status
131            && !matches!(
132                command,
133                Command::DropStreamingJobs { .. } | Command::DropSubscription { .. }
134            )
135        {
136            return Err(MetaError::unavailable(reason));
137        }
138        Ok(())
139    }
140}
141
142fn tracing_span() -> tracing::Span {
143    if tracing::Span::current().is_none() {
144        tracing::Span::none()
145    } else {
146        tracing::info_span!(
147            "barrier",
148            checkpoint = tracing::field::Empty,
149            epoch = tracing::field::Empty
150        )
151    }
152}
153
154/// The sender side of the barrier scheduling queue.
155/// Can be cloned and held by other managers to schedule and run barriers.
156#[derive(Clone)]
157pub struct BarrierScheduler {
158    inner: Arc<Inner>,
159
160    /// Used for getting the latest snapshot after `FLUSH`.
161    hummock_manager: HummockManagerRef,
162}
163
164impl BarrierScheduler {
165    /// Create a pair of [`BarrierScheduler`] and [`ScheduledBarriers`], for scheduling barriers
166    /// from different managers, and executing them in the barrier manager, respectively.
167    pub fn new_pair(
168        hummock_manager: HummockManagerRef,
169        metrics: Arc<MetaMetrics>,
170    ) -> (Self, ScheduledBarriers) {
171        let inner = Arc::new(Inner {
172            queue: Mutex::new(ScheduledQueue {
173                queue: Default::default(),
174                status: QueueStatus::Ready,
175            }),
176            changed_tx: watch::channel(()).0,
177            metrics,
178        });
179
180        (
181            Self {
182                inner: inner.clone(),
183                hummock_manager,
184            },
185            ScheduledBarriers { inner },
186        )
187    }
188
189    /// Push a scheduled barrier into the queue.
190    fn push(
191        &self,
192        database_id: DatabaseId,
193        scheduleds: impl IntoIterator<Item = (Command, Notifier)>,
194    ) -> MetaResult<()> {
195        let mut queue = self.inner.queue.lock();
196        let scheduleds = scheduleds.into_iter().collect_vec();
197        scheduleds
198            .iter()
199            .try_for_each(|(command, _)| queue.validate_item(command))?;
200        let queue = queue.queue.entry(database_id).or_insert_with(|| {
201            DatabaseScheduledQueue::new(database_id, &self.inner.metrics, QueueStatus::Ready)
202        });
203        scheduleds
204            .iter()
205            .try_for_each(|(command, _)| queue.validate_item(command))?;
206        for (command, notifier) in scheduleds {
207            queue.queue.inner.push_back(ScheduledQueueItem {
208                command,
209                notifiers: vec![notifier],
210                send_latency_timer: queue.queue.send_latency.start_timer(),
211                span: tracing_span(),
212            });
213            if queue.queue.inner.len() == 1 {
214                self.inner.changed_tx.send(()).ok();
215            }
216        }
217        Ok(())
218    }
219
220    /// Try to cancel scheduled cmd for create streaming job, return true if the command exists previously and get cancelled.
221    pub fn try_cancel_scheduled_create(&self, database_id: DatabaseId, table_id: TableId) -> bool {
222        let queue = &mut self.inner.queue.lock();
223        let Some(queue) = queue.queue.get_mut(&database_id) else {
224            return false;
225        };
226
227        if let Some(idx) = queue.queue.inner.iter().position(|scheduled| {
228            if let Command::CreateStreamingJob { info, .. } = &scheduled.command
229                && info.stream_job_fragments.stream_job_id() == table_id
230            {
231                true
232            } else {
233                false
234            }
235        }) {
236            queue.queue.inner.remove(idx).unwrap();
237            true
238        } else {
239            false
240        }
241    }
242
243    /// Run multiple commands and return when they're all completely finished (i.e., collected). It's ensured that
244    /// multiple commands are executed continuously.
245    ///
246    /// Returns the barrier info of each command.
247    ///
248    /// TODO: atomicity of multiple commands is not guaranteed.
249    #[await_tree::instrument("run_commands({})", commands.iter().join(", "))]
250    async fn run_multiple_commands(
251        &self,
252        database_id: DatabaseId,
253        commands: Vec<Command>,
254    ) -> MetaResult<()> {
255        let mut contexts = Vec::with_capacity(commands.len());
256        let mut scheduleds = Vec::with_capacity(commands.len());
257
258        for command in commands {
259            let (started_tx, started_rx) = oneshot::channel();
260            let (collect_tx, collect_rx) = oneshot::channel();
261
262            contexts.push((started_rx, collect_rx));
263            scheduleds.push((
264                command,
265                Notifier {
266                    started: Some(started_tx),
267                    collected: Some(collect_tx),
268                },
269            ));
270        }
271
272        self.push(database_id, scheduleds)?;
273
274        for (injected_rx, collect_rx) in contexts {
275            // Wait for this command to be injected, and record the result.
276            tracing::trace!("waiting for injected_rx");
277            injected_rx
278                .instrument_await("wait_injected")
279                .await
280                .ok()
281                .context("failed to inject barrier")??;
282
283            tracing::trace!("waiting for collect_rx");
284            // Throw the error if it occurs when collecting this barrier.
285            collect_rx
286                .instrument_await("wait_collected")
287                .await
288                .ok()
289                .context("failed to collect barrier")??;
290        }
291
292        Ok(())
293    }
294
295    /// Run a command and return when it's completely finished (i.e., collected).
296    ///
297    /// Returns the barrier info of the actual command.
298    pub async fn run_command(&self, database_id: DatabaseId, command: Command) -> MetaResult<()> {
299        tracing::trace!("run_command: {:?}", command);
300        let ret = self.run_multiple_commands(database_id, vec![command]).await;
301        tracing::trace!("run_command finished");
302        ret
303    }
304
305    /// Flush means waiting for the next barrier to collect.
306    pub async fn flush(&self, database_id: DatabaseId) -> MetaResult<HummockVersionId> {
307        let start = Instant::now();
308
309        tracing::debug!("start barrier flush");
310        self.run_multiple_commands(database_id, vec![Command::Flush])
311            .await?;
312
313        let elapsed = Instant::now().duration_since(start);
314        tracing::debug!("barrier flushed in {:?}", elapsed);
315
316        let version_id = self.hummock_manager.get_version_id().await;
317        Ok(version_id)
318    }
319}
320
321/// The receiver side of the barrier scheduling queue.
322pub struct ScheduledBarriers {
323    inner: Arc<Inner>,
324}
325
/// Per-database barrier configuration and checkpoint bookkeeping.
#[derive(Debug)]
pub struct DatabaseBarrierState {
    /// Database-specific barrier interval; `None` falls back to the system default.
    pub barrier_interval: Option<Duration>,
    /// Database-specific checkpoint frequency; `None` falls back to the system default.
    pub checkpoint_frequency: Option<u64>,
    /// When set, the next barrier of this database is forced to be a checkpoint.
    pub force_checkpoint: bool,
    /// Number of non-checkpoint barriers emitted since the last checkpoint barrier.
    pub num_uncheckpointed_barrier: u64,
}

impl DatabaseBarrierState {
    /// Builds the initial state from the database's optional overrides, with no forced
    /// checkpoint pending and a zeroed barrier counter.
    fn new(barrier_interval_ms: Option<u32>, checkpoint_frequency: Option<u64>) -> Self {
        let barrier_interval = barrier_interval_ms
            .map(u64::from)
            .map(Duration::from_millis);
        Self {
            barrier_interval,
            checkpoint_frequency,
            force_checkpoint: false,
            num_uncheckpointed_barrier: 0,
        }
    }
}
347
348/// Held by the [`crate::barrier::worker::GlobalBarrierWorker`] to execute these commands.
349#[derive(Default, Debug)]
350pub struct PeriodicBarriers {
351    /// Default system params for barrier interval and checkpoint frequency.
352    sys_barrier_interval: Duration,
353    sys_checkpoint_frequency: u64,
354    /// Per-database state.
355    databases: HashMap<DatabaseId, DatabaseBarrierState>,
356    /// Holds `IntervalStream` for each database, keyed by `DatabaseId`.
357    /// `StreamMap` will yield `(DatabaseId, Instant)` when a timer ticks.
358    timer_streams: StreamMap<DatabaseId, IntervalStream>,
359}
360
361impl PeriodicBarriers {
362    pub(super) fn new(
363        sys_barrier_interval: Duration,
364        sys_checkpoint_frequency: u64,
365        database_infos: Vec<Database>,
366    ) -> Self {
367        let mut databases = HashMap::with_capacity(database_infos.len());
368        let mut timer_streams = StreamMap::with_capacity(database_infos.len());
369        database_infos.into_iter().for_each(|database| {
370            let database_id: DatabaseId = database.id.into();
371            let barrier_interval_ms = database.barrier_interval_ms;
372            let checkpoint_frequency = database.checkpoint_frequency;
373            databases.insert(
374                database_id,
375                DatabaseBarrierState::new(barrier_interval_ms, checkpoint_frequency),
376            );
377            let duration = if let Some(ms) = barrier_interval_ms {
378                Duration::from_millis(ms as u64)
379            } else {
380                sys_barrier_interval
381            };
382            // Create an `IntervalStream` for the database with the specified interval.
383            let interval_stream = Self::new_interval_stream(duration);
384            timer_streams.insert(database_id, interval_stream);
385        });
386        Self {
387            sys_barrier_interval,
388            sys_checkpoint_frequency,
389            databases,
390            timer_streams,
391        }
392    }
393
394    // Create a new interval stream with the specified duration.
395    fn new_interval_stream(duration: Duration) -> IntervalStream {
396        let mut interval = tokio::time::interval(duration);
397        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
398        IntervalStream::new(interval)
399    }
400
401    /// Update the system barrier interval.
402    pub(super) fn set_sys_barrier_interval(&mut self, duration: Duration) {
403        if self.sys_barrier_interval == duration {
404            return;
405        }
406        self.sys_barrier_interval = duration;
407        // Reset the `IntervalStream` for all databases that use default param.
408        for (db_id, db_state) in &mut self.databases {
409            if db_state.barrier_interval.is_none() {
410                let interval_stream = Self::new_interval_stream(duration);
411                self.timer_streams.insert(*db_id, interval_stream);
412            }
413        }
414    }
415
416    /// Update the system checkpoint frequency.
417    pub fn set_sys_checkpoint_frequency(&mut self, frequency: u64) {
418        if self.sys_checkpoint_frequency == frequency {
419            return;
420        }
421        self.sys_checkpoint_frequency = frequency;
422        // Reset the `num_uncheckpointed_barrier` for all databases that use default param.
423        for db_state in self.databases.values_mut() {
424            if db_state.checkpoint_frequency.is_none() {
425                db_state.num_uncheckpointed_barrier = 0;
426                db_state.force_checkpoint = false;
427            }
428        }
429    }
430
431    pub(super) fn update_database_barrier(
432        &mut self,
433        database_id: DatabaseId,
434        barrier_interval_ms: Option<u32>,
435        checkpoint_frequency: Option<u64>,
436    ) {
437        match self.databases.entry(database_id) {
438            Entry::Occupied(mut entry) => {
439                let db_state = entry.get_mut();
440                db_state.barrier_interval =
441                    barrier_interval_ms.map(|ms| Duration::from_millis(ms as u64));
442                db_state.checkpoint_frequency = checkpoint_frequency;
443                // Reset the `num_uncheckpointed_barrier` since the barrier interval or checkpoint frequency is changed.
444                db_state.num_uncheckpointed_barrier = 0;
445                db_state.force_checkpoint = false;
446            }
447            Entry::Vacant(entry) => {
448                entry.insert(DatabaseBarrierState::new(
449                    barrier_interval_ms,
450                    checkpoint_frequency,
451                ));
452            }
453        }
454
455        // If the database already has a timer stream, reset it with the new interval.
456        let duration = if let Some(ms) = barrier_interval_ms {
457            Duration::from_millis(ms as u64)
458        } else {
459            self.sys_barrier_interval
460        };
461        let interval_stream = Self::new_interval_stream(duration);
462        self.timer_streams.insert(database_id, interval_stream);
463    }
464
465    /// Make the `checkpoint` of the next barrier must be true.
466    pub fn force_checkpoint_in_next_barrier(&mut self, database_id: DatabaseId) {
467        if let Some(db_state) = self.databases.get_mut(&database_id) {
468            db_state.force_checkpoint = true;
469        } else {
470            warn!(
471                ?database_id,
472                "force checkpoint in next barrier for non-existing database"
473            );
474        }
475    }
476
477    #[await_tree::instrument]
478    pub(super) async fn next_barrier(
479        &mut self,
480        context: &impl GlobalBarrierWorkerContext,
481    ) -> NewBarrier {
482        let new_barrier = select! {
483            biased;
484            scheduled = context.next_scheduled() => {
485                let database_id = scheduled.database_id;
486                // Check if the database exists.
487                assert!(self.databases.contains_key(&database_id), "database {} not found in periodic barriers", database_id);
488                assert!(self.timer_streams.contains_key(&database_id), "timer stream for database {} not found in periodic barriers", database_id);
489                // New command will trigger the barriers, so reset the timer for the specific database.
490                for (db_id, timer_stream) in self.timer_streams.iter_mut() {
491                    if *db_id == database_id {
492                        timer_stream.as_mut().reset();
493                    }
494                }
495                let checkpoint = scheduled.command.need_checkpoint() || self.try_get_checkpoint(database_id);
496                NewBarrier {
497                    database_id: scheduled.database_id,
498                    command: Some((scheduled.command, scheduled.notifiers)),
499                    span: scheduled.span,
500                    checkpoint,
501                }
502            },
503            // If there is no database, we won't wait for `Interval`, but only wait for command.
504            // Normally it will not return None, because there is always at least one database.
505            next_timer = pending_on_none(self.timer_streams.next()) => {
506                let (database_id, _instant) = next_timer;
507                let checkpoint = self.try_get_checkpoint(database_id);
508                NewBarrier {
509                    database_id,
510                    command: None,
511                    span: tracing_span(),
512                    checkpoint,
513                }
514            }
515        };
516        self.update_num_uncheckpointed_barrier(new_barrier.database_id, new_barrier.checkpoint);
517
518        new_barrier
519    }
520
521    /// Whether the barrier(checkpoint = true) should be injected.
522    fn try_get_checkpoint(&self, database_id: DatabaseId) -> bool {
523        let db_state = self.databases.get(&database_id).unwrap();
524        let checkpoint_frequency = db_state
525            .checkpoint_frequency
526            .unwrap_or(self.sys_checkpoint_frequency);
527        db_state.num_uncheckpointed_barrier + 1 >= checkpoint_frequency || db_state.force_checkpoint
528    }
529
530    /// Update the `num_uncheckpointed_barrier`
531    fn update_num_uncheckpointed_barrier(&mut self, database_id: DatabaseId, checkpoint: bool) {
532        let db_state = self.databases.get_mut(&database_id).unwrap();
533        if checkpoint {
534            db_state.num_uncheckpointed_barrier = 0;
535            db_state.force_checkpoint = false;
536        } else {
537            db_state.num_uncheckpointed_barrier += 1;
538        }
539    }
540}
541
542impl ScheduledBarriers {
543    pub(super) async fn next_scheduled(&self) -> Scheduled {
544        'outer: loop {
545            let mut rx = self.inner.changed_tx.subscribe();
546            {
547                let mut queue = self.inner.queue.lock();
548                if queue.status.is_blocked() {
549                    continue;
550                }
551                for (database_id, queue) in &mut queue.queue {
552                    if queue.status.is_blocked() {
553                        continue;
554                    }
555                    if let Some(item) = queue.queue.inner.pop_front() {
556                        item.send_latency_timer.observe_duration();
557                        break 'outer Scheduled {
558                            database_id: *database_id,
559                            command: item.command,
560                            notifiers: item.notifiers,
561                            span: item.span,
562                        };
563                    }
564                }
565            }
566            rx.changed().await.unwrap();
567        }
568    }
569}
570
571pub(super) enum MarkReadyOptions {
572    Database(DatabaseId),
573    Global {
574        blocked_databases: HashSet<DatabaseId>,
575    },
576}
577
578impl ScheduledBarriers {
579    /// Pre buffered drop and cancel command, return true if any.
580    pub(super) fn pre_apply_drop_cancel(&self, database_id: Option<DatabaseId>) -> bool {
581        self.pre_apply_drop_cancel_scheduled(database_id)
582    }
583
584    /// Mark command scheduler as blocked and abort all queued scheduled command and notify with
585    /// specific reason.
586    pub(super) fn abort_and_mark_blocked(
587        &self,
588        database_id: Option<DatabaseId>,
589        reason: impl Into<String>,
590    ) {
591        let mut queue = self.inner.queue.lock();
592        fn database_blocked_reason(database_id: DatabaseId, reason: &String) -> String {
593            format!("database {} unavailable {}", database_id, reason)
594        }
595        fn mark_blocked_and_notify_failed(
596            database_id: DatabaseId,
597            queue: &mut DatabaseScheduledQueue,
598            reason: &String,
599        ) {
600            let reason = database_blocked_reason(database_id, reason);
601            let err: MetaError = anyhow!("{}", reason).into();
602            queue.mark_blocked(reason);
603            while let Some(ScheduledQueueItem { notifiers, .. }) = queue.queue.inner.pop_front() {
604                notifiers
605                    .into_iter()
606                    .for_each(|notify| notify.notify_collection_failed(err.clone()))
607            }
608        }
609        if let Some(database_id) = database_id {
610            let reason = reason.into();
611            match queue.queue.entry(database_id) {
612                Entry::Occupied(entry) => {
613                    let queue = entry.into_mut();
614                    if queue.status.is_blocked() {
615                        if cfg!(debug_assertions) {
616                            panic!("database {} marked as blocked twice", database_id);
617                        } else {
618                            warn!(?database_id, "database marked as blocked twice");
619                        }
620                    }
621                    info!(?database_id, "database marked as blocked");
622                    mark_blocked_and_notify_failed(database_id, queue, &reason);
623                }
624                Entry::Vacant(entry) => {
625                    entry.insert(DatabaseScheduledQueue::new(
626                        database_id,
627                        &self.inner.metrics,
628                        QueueStatus::Blocked(database_blocked_reason(database_id, &reason)),
629                    ));
630                }
631            }
632        } else {
633            let reason = reason.into();
634            if queue.status.is_blocked() {
635                if cfg!(debug_assertions) {
636                    panic!("cluster marked as blocked twice");
637                } else {
638                    warn!("cluster marked as blocked twice");
639                }
640            }
641            info!("cluster marked as blocked");
642            queue.mark_blocked(reason.clone());
643            for (database_id, queue) in &mut queue.queue {
644                mark_blocked_and_notify_failed(*database_id, queue, &reason);
645            }
646        }
647    }
648
649    /// Mark command scheduler as ready to accept new command.
650    pub(super) fn mark_ready(&self, options: MarkReadyOptions) {
651        let mut queue = self.inner.queue.lock();
652        let queue = &mut *queue;
653        match options {
654            MarkReadyOptions::Database(database_id) => {
655                info!(?database_id, "database marked as ready");
656                let database_queue = queue.queue.entry(database_id).or_insert_with(|| {
657                    DatabaseScheduledQueue::new(
658                        database_id,
659                        &self.inner.metrics,
660                        QueueStatus::Ready,
661                    )
662                });
663                if !database_queue.status.is_blocked() {
664                    if cfg!(debug_assertions) {
665                        panic!("database {} marked as ready twice", database_id);
666                    } else {
667                        warn!(?database_id, "database marked as ready twice");
668                    }
669                }
670                if database_queue.mark_ready()
671                    && !queue.status.is_blocked()
672                    && !database_queue.queue.inner.is_empty()
673                {
674                    self.inner.changed_tx.send(()).ok();
675                }
676            }
677            MarkReadyOptions::Global { blocked_databases } => {
678                if !queue.status.is_blocked() {
679                    if cfg!(debug_assertions) {
680                        panic!("cluster marked as ready twice");
681                    } else {
682                        warn!("cluster marked as ready twice");
683                    }
684                }
685                info!(?blocked_databases, "cluster marked as ready");
686                let prev_blocked = queue.mark_ready();
687                for database_id in &blocked_databases {
688                    queue.queue.entry(*database_id).or_insert_with(|| {
689                        DatabaseScheduledQueue::new(
690                            *database_id,
691                            &self.inner.metrics,
692                            QueueStatus::Blocked(format!(
693                                "database {} failed to recover in global recovery",
694                                database_id
695                            )),
696                        )
697                    });
698                }
699                for (database_id, queue) in &mut queue.queue {
700                    if !blocked_databases.contains(database_id) {
701                        queue.mark_ready();
702                    }
703                }
704                if prev_blocked
705                    && queue
706                        .queue
707                        .values()
708                        .any(|database_queue| !database_queue.queue.inner.is_empty())
709                {
710                    self.inner.changed_tx.send(()).ok();
711                }
712            }
713        }
714    }
715
716    /// Try to pre apply drop and cancel scheduled command and return them if any.
717    /// It should only be called in recovery.
718    pub(super) fn pre_apply_drop_cancel_scheduled(&self, database_id: Option<DatabaseId>) -> bool {
719        let mut queue = self.inner.queue.lock();
720        let mut applied = false;
721
722        let mut pre_apply_drop_cancel = |queue: &mut DatabaseScheduledQueue| {
723            while let Some(ScheduledQueueItem {
724                notifiers, command, ..
725            }) = queue.queue.inner.pop_front()
726            {
727                match command {
728                    Command::DropStreamingJobs { .. } => {
729                        applied = true;
730                    }
731                    Command::DropSubscription { .. } => {}
732                    _ => {
733                        unreachable!("only drop and cancel streaming jobs should be buffered");
734                    }
735                }
736                notifiers.into_iter().for_each(|notify| {
737                    notify.notify_collected();
738                });
739            }
740        };
741
742        if let Some(database_id) = database_id {
743            assert_matches!(queue.status, QueueStatus::Ready);
744            if let Some(queue) = queue.queue.get_mut(&database_id) {
745                assert_matches!(queue.status, QueueStatus::Blocked(_));
746                pre_apply_drop_cancel(queue);
747            }
748        } else {
749            assert_matches!(queue.status, QueueStatus::Blocked(_));
750            for queue in queue.queue.values_mut() {
751                pre_apply_drop_cancel(queue);
752            }
753        }
754
755        applied
756    }
757}
758
759#[cfg(test)]
760mod tests {
761    use super::*;
762
763    fn create_test_database(
764        id: u32,
765        barrier_interval_ms: Option<u32>,
766        checkpoint_frequency: Option<u64>,
767    ) -> Database {
768        Database {
769            id,
770            name: format!("test_db_{}", id),
771            barrier_interval_ms,
772            checkpoint_frequency,
773            ..Default::default()
774        }
775    }
776
777    // Mock context for testing next_barrier
778    struct MockGlobalBarrierWorkerContext {
779        scheduled_rx: tokio::sync::Mutex<tokio::sync::mpsc::UnboundedReceiver<Scheduled>>,
780    }
781
782    impl MockGlobalBarrierWorkerContext {
783        fn new() -> (Self, tokio::sync::mpsc::UnboundedSender<Scheduled>) {
784            let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
785            (
786                Self {
787                    scheduled_rx: tokio::sync::Mutex::new(rx),
788                },
789                tx,
790            )
791        }
792    }
793
    // Only `next_scheduled` is functional; it forwards what the test sends through the paired
    // `UnboundedSender`. Every other trait method is a stub that panics via `unimplemented!()`,
    // so a test that accidentally reaches one of them fails loudly.
    impl GlobalBarrierWorkerContext for MockGlobalBarrierWorkerContext {
        async fn next_scheduled(&self) -> Scheduled {
            // `recv` yields `None` once all senders are dropped and the channel is drained;
            // the `unwrap` then panics.
            self.scheduled_rx.lock().await.recv().await.unwrap()
        }

        async fn commit_epoch(
            &self,
            _commit_info: crate::hummock::CommitEpochInfo,
        ) -> MetaResult<risingwave_pb::hummock::HummockVersionStats> {
            unimplemented!()
        }

        fn abort_and_mark_blocked(
            &self,
            _database_id: Option<DatabaseId>,
            _recovery_reason: crate::barrier::RecoveryReason,
        ) {
            unimplemented!()
        }

        fn mark_ready(&self, _options: MarkReadyOptions) {
            unimplemented!()
        }

        async fn post_collect_command<'a>(
            &'a self,
            _command: &'a crate::barrier::command::CommandContext,
        ) -> MetaResult<()> {
            unimplemented!()
        }

        async fn notify_creating_job_failed(&self, _database_id: Option<DatabaseId>, _err: String) {
            unimplemented!()
        }

        async fn finish_creating_job(
            &self,
            _job: crate::barrier::progress::TrackingJob,
        ) -> MetaResult<()> {
            unimplemented!()
        }

        async fn new_control_stream(
            &self,
            _node: &risingwave_pb::common::WorkerNode,
            _init_request: &risingwave_pb::stream_service::streaming_control_stream_request::PbInitRequest,
        ) -> MetaResult<risingwave_rpc_client::StreamingControlHandle> {
            unimplemented!()
        }

        async fn reload_runtime_info(
            &self,
        ) -> MetaResult<crate::barrier::BarrierWorkerRuntimeInfoSnapshot> {
            unimplemented!()
        }

        async fn reload_database_runtime_info(
            &self,
            _database_id: DatabaseId,
        ) -> MetaResult<Option<crate::barrier::DatabaseRuntimeInfoSnapshot>> {
            unimplemented!()
        }
    }
857
858    #[tokio::test]
859    async fn test_next_barrier_with_different_intervals() {
860        // Create databases with different intervals
861        let databases = vec![
862            create_test_database(1, Some(50), Some(2)), // 50ms interval, checkpoint every 2
863            create_test_database(2, Some(100), Some(3)), // 100ms interval, checkpoint every 3
864            create_test_database(3, None, Some(5)), /* Use system default (200ms), checkpoint every 5 */
865        ];
866
867        let mut periodic = PeriodicBarriers::new(
868            Duration::from_millis(200), // System default
869            10,                         // System checkpoint frequency
870            databases,
871        );
872
873        let (context, _tx) = MockGlobalBarrierWorkerContext::new();
874
875        // Call next_barrier for each database once, because the first tick is returned immediately
876        for _ in 0..3 {
877            let barrier = periodic.next_barrier(&context).await;
878            assert!(barrier.command.is_none()); // Should be a periodic barrier, not a scheduled command
879            assert!(!barrier.checkpoint); // First barrier shouldn't be a checkpoint
880        }
881
882        // Since we have 3 databases with intervals 50ms, 100ms, and 200ms,
883        // the first barrier should come from database 1 (50ms interval)
884        let start_time = Instant::now();
885        let barrier = periodic.next_barrier(&context).await;
886        let elapsed = start_time.elapsed();
887
888        // Verify the barrier properties
889        assert_eq!(barrier.database_id, DatabaseId::from(1));
890        assert!(barrier.command.is_none()); // Should be a periodic barrier, not a scheduled command
891        assert!(barrier.checkpoint); // Second barrier should be checkpoint for database 1
892        assert!(
893            elapsed <= Duration::from_millis(100),
894            "Elapsed time exceeded: {:?}",
895            elapsed
896        ); // Should be around 50ms
897
898        // Verify that the checkpoint frequency works
899        let db1_id = DatabaseId::from(1);
900        let db1_state = periodic.databases.get_mut(&db1_id).unwrap();
901        assert_eq!(db1_state.num_uncheckpointed_barrier, 0); // Should reset after checkpoint
902    }
903
904    #[tokio::test]
905    async fn test_next_barrier_with_scheduled_command() {
906        let databases = vec![
907            create_test_database(1, Some(1000), Some(2)), // Long interval to avoid interference
908        ];
909
910        let mut periodic = PeriodicBarriers::new(Duration::from_millis(1000), 10, databases);
911
912        let (context, tx) = MockGlobalBarrierWorkerContext::new();
913
914        // Skip the first barrier to let the timers start
915        periodic.next_barrier(&context).await;
916
917        // Schedule a command
918        let scheduled_command = Scheduled {
919            database_id: DatabaseId::from(1),
920            command: Command::Flush,
921            notifiers: vec![],
922            span: tracing::Span::none(),
923        };
924
925        // Send scheduled command in background
926        let tx_clone = tx.clone();
927        tokio::spawn(async move {
928            tokio::time::sleep(Duration::from_millis(10)).await;
929            tx_clone.send(scheduled_command).unwrap();
930        });
931
932        let barrier = periodic.next_barrier(&context).await;
933
934        // Should return the scheduled command
935        assert!(barrier.command.is_some());
936        assert_eq!(barrier.database_id, DatabaseId::from(1));
937
938        if let Some((command, _)) = barrier.command {
939            assert!(matches!(command, Command::Flush));
940        }
941    }
942
943    #[tokio::test]
944    async fn test_next_barrier_multiple_databases_timing() {
945        let databases = vec![
946            create_test_database(1, Some(30), Some(10)), // Fast interval
947            create_test_database(2, Some(100), Some(10)), // Slower interval
948        ];
949
950        let mut periodic = PeriodicBarriers::new(Duration::from_millis(500), 10, databases);
951
952        let (context, _tx) = MockGlobalBarrierWorkerContext::new();
953
954        // Skip first 2 barriers to let the timers start
955        for _ in 0..2 {
956            periodic.next_barrier(&context).await;
957        }
958
959        let mut barrier_counts = HashMap::new();
960
961        // Collect barriers for a short period
962        let mut barriers = Vec::new();
963        for _ in 0..5 {
964            let barrier = periodic.next_barrier(&context).await;
965            barriers.push(barrier);
966        }
967
968        // Count barriers per database
969        for barrier in barriers {
970            *barrier_counts.entry(barrier.database_id).or_insert(0) += 1;
971        }
972
973        // Database 1 (30ms interval) should have more barriers than database 2 (100ms interval)
974        let db1_count = barrier_counts.get(&DatabaseId::from(1)).unwrap_or(&0);
975        let db2_count = barrier_counts.get(&DatabaseId::from(2)).unwrap_or(&0);
976
977        // Due to timing, db1 should generally have more barriers, but allow for some variance
978        assert!(*db1_count >= *db2_count);
979    }
980
981    #[tokio::test]
982    async fn test_next_barrier_force_checkpoint() {
983        let databases = vec![create_test_database(1, Some(100), Some(10))];
984
985        let mut periodic = PeriodicBarriers::new(Duration::from_millis(100), 10, databases);
986
987        let (context, _tx) = MockGlobalBarrierWorkerContext::new();
988
989        // Force checkpoint for next barrier
990        periodic.force_checkpoint_in_next_barrier(DatabaseId::from(1));
991
992        let barrier = periodic.next_barrier(&context).await;
993
994        // Should be a checkpoint barrier due to force_checkpoint
995        assert!(barrier.checkpoint);
996        assert_eq!(barrier.database_id, DatabaseId::from(1));
997        assert!(barrier.command.is_none());
998    }
999
1000    #[tokio::test]
1001    async fn test_next_barrier_checkpoint_frequency() {
1002        let databases = vec![create_test_database(1, Some(50), Some(2))]; // Checkpoint every 2 barriers
1003
1004        let mut periodic = PeriodicBarriers::new(Duration::from_millis(50), 10, databases);
1005
1006        let (context, _tx) = MockGlobalBarrierWorkerContext::new();
1007
1008        // First barrier - should not be checkpoint
1009        let barrier1 = periodic.next_barrier(&context).await;
1010        assert!(!barrier1.checkpoint);
1011
1012        // Second barrier - should be checkpoint (frequency = 2)
1013        let barrier2 = periodic.next_barrier(&context).await;
1014        assert!(barrier2.checkpoint);
1015
1016        // Third barrier - should not be checkpoint (counter reset)
1017        let barrier3 = periodic.next_barrier(&context).await;
1018        assert!(!barrier3.checkpoint);
1019    }
1020
1021    #[tokio::test]
1022    async fn test_update_database_barrier() {
1023        let databases = vec![create_test_database(1, Some(1000), Some(10))];
1024
1025        let mut periodic = PeriodicBarriers::new(Duration::from_millis(500), 20, databases);
1026
1027        // Update existing database
1028        periodic.update_database_barrier(DatabaseId::from(1), Some(2000), Some(15));
1029
1030        let db_state = periodic.databases.get(&DatabaseId::from(1)).unwrap();
1031        assert_eq!(db_state.barrier_interval, Some(Duration::from_millis(2000)));
1032        assert_eq!(db_state.checkpoint_frequency, Some(15));
1033        assert_eq!(db_state.num_uncheckpointed_barrier, 0);
1034        assert!(!db_state.force_checkpoint);
1035
1036        // Add new database
1037        periodic.update_database_barrier(DatabaseId::from(2), None, None);
1038
1039        assert!(periodic.databases.contains_key(&DatabaseId::from(2)));
1040        let db2_state = periodic.databases.get(&DatabaseId::from(2)).unwrap();
1041        assert_eq!(db2_state.barrier_interval, None);
1042        assert_eq!(db2_state.checkpoint_frequency, None);
1043    }
1044}