risingwave_meta/barrier/schedule.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet, VecDeque};
use std::sync::Arc;
use std::time::{Duration, Instant};

use anyhow::{Context, anyhow};
use assert_matches::assert_matches;
use await_tree::InstrumentAwait;
use itertools::Itertools;
use parking_lot::Mutex;
use prometheus::HistogramTimer;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::metrics::LabelGuardedHistogram;
use risingwave_hummock_sdk::HummockVersionId;
use risingwave_pb::catalog::Database;
use rw_futures_util::pending_on_none;
use tokio::select;
use tokio::sync::{oneshot, watch};
use tokio_stream::wrappers::IntervalStream;
use tokio_stream::{StreamExt, StreamMap};
use tracing::{info, warn};

use super::notifier::Notifier;
use super::{Command, Scheduled};
use crate::barrier::context::GlobalBarrierWorkerContext;
use crate::hummock::HummockManagerRef;
use crate::rpc::metrics::MetaMetrics;
use crate::{MetaError, MetaResult};

pub(super) struct NewBarrier {
    pub database_id: DatabaseId,
    pub command: Option<(Command, Vec<Notifier>)>,
    pub span: tracing::Span,
    pub checkpoint: bool,
}

/// A queue for scheduling barriers.
///
/// We manually implement one here instead of using channels since we may need to update the front
/// of the queue to add some notifiers for instant flushes.
struct Inner {
    queue: Mutex<ScheduledQueue>,

    /// When `queue` is not empty anymore, all subscribers of this watcher will be notified.
    changed_tx: watch::Sender<()>,

    /// Used for recording send latency of each barrier.
    metrics: Arc<MetaMetrics>,
}

#[derive(Debug)]
enum QueueStatus {
    /// The queue is ready to accept new commands.
    Ready,
    /// The queue is blocked from accepting new commands, for the given reason.
    Blocked(String),
}

impl QueueStatus {
    fn is_blocked(&self) -> bool {
        matches!(self, Self::Blocked(_))
    }
}

struct ScheduledQueueItem {
    command: Command,
    notifiers: Vec<Notifier>,
    send_latency_timer: HistogramTimer,
    span: tracing::Span,
}

struct StatusQueue<T> {
    queue: T,
    status: QueueStatus,
}

struct DatabaseQueue {
    inner: VecDeque<ScheduledQueueItem>,
    send_latency: LabelGuardedHistogram,
}

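// The scheduled queue is two-level: `ScheduledQueue` carries an overall status that guards the
// whole queue, and each `DatabaseScheduledQueue` inside it carries its own per-database status.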
type DatabaseScheduledQueue = StatusQueue<DatabaseQueue>;
type ScheduledQueue = StatusQueue<HashMap<DatabaseId, DatabaseScheduledQueue>>;

impl DatabaseScheduledQueue {
    fn new(database_id: DatabaseId, metrics: &MetaMetrics, status: QueueStatus) -> Self {
        Self {
            queue: DatabaseQueue {
                inner: Default::default(),
                send_latency: metrics
                    .barrier_send_latency
                    .with_guarded_label_values(&[database_id.database_id.to_string().as_str()]),
            },
            status,
        }
    }
}

impl<T> StatusQueue<T> {
    fn mark_blocked(&mut self, reason: String) {
        self.status = QueueStatus::Blocked(reason);
    }

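    /// Marks the queue as ready, returning whether it was previously blocked.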
    fn mark_ready(&mut self) -> bool {
        let prev_blocked = self.status.is_blocked();
        self.status = QueueStatus::Ready;
        prev_blocked
    }

    fn validate_item(&mut self, command: &Command) -> MetaResult<()> {
        // We don't allow any command to be scheduled when the queue is blocked, except for dropping
        // streaming jobs. This is because dropping streaming jobs is still allowed while the cluster
        // is under recovery, so we have to buffer the drop command and execute it once the cluster
        // is ready to clean it up.
        // TODO: this is just a workaround to allow dropping streaming jobs when the cluster is under
        // recovery; we need to refine it when catalog and streaming metadata can be handled in a
        // transactional way.
        if let QueueStatus::Blocked(reason) = &self.status
            && !matches!(
                command,
                Command::DropStreamingJobs { .. } | Command::DropSubscription { .. }
            )
        {
            return Err(MetaError::unavailable(reason));
        }
        Ok(())
    }
}

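/// Creates a tracing span for a scheduled barrier, or a no-op span when the current context is
/// not being traced, so that we don't create orphan spans.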
fn tracing_span() -> tracing::Span {
    if tracing::Span::current().is_none() {
        tracing::Span::none()
    } else {
        tracing::info_span!(
            "barrier",
            checkpoint = tracing::field::Empty,
            epoch = tracing::field::Empty
        )
    }
}

/// The sender side of the barrier scheduling queue.
/// Can be cloned and held by other managers to schedule and run barriers.
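///
/// # Example (sketch; assumes `hummock_manager`, `metrics`, and a `database_id` are at hand)
///
/// ```ignore
/// let (scheduler, scheduled_barriers) = BarrierScheduler::new_pair(hummock_manager, metrics);
/// // The scheduler side enqueues commands without waiting...
/// scheduler.run_command_no_wait(database_id, Command::Flush)?;
/// // ...while the barrier worker side dequeues them.
/// let scheduled = scheduled_barriers.next_scheduled().await;
/// ```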
#[derive(Clone)]
pub struct BarrierScheduler {
    inner: Arc<Inner>,

    /// Used for getting the latest snapshot after `FLUSH`.
    hummock_manager: HummockManagerRef,
}

impl BarrierScheduler {
    /// Create a pair of [`BarrierScheduler`] and [`ScheduledBarriers`], for scheduling barriers
    /// from different managers, and executing them in the barrier manager, respectively.
    pub fn new_pair(
        hummock_manager: HummockManagerRef,
        metrics: Arc<MetaMetrics>,
    ) -> (Self, ScheduledBarriers) {
        let inner = Arc::new(Inner {
            queue: Mutex::new(ScheduledQueue {
                queue: Default::default(),
                status: QueueStatus::Ready,
            }),
            changed_tx: watch::channel(()).0,
            metrics,
        });

        (
            Self {
                inner: inner.clone(),
                hummock_manager,
            },
            ScheduledBarriers { inner },
        )
    }

    /// Push scheduled barriers into the queue.
    fn push(
        &self,
        database_id: DatabaseId,
        scheduleds: impl IntoIterator<Item = (Command, Notifier)>,
    ) -> MetaResult<()> {
        let mut queue = self.inner.queue.lock();
        let scheduleds = scheduleds.into_iter().collect_vec();
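        // Validate against the overall queue status first...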
        scheduleds
            .iter()
            .try_for_each(|(command, _)| queue.validate_item(command))?;
        let queue = queue.queue.entry(database_id).or_insert_with(|| {
            DatabaseScheduledQueue::new(database_id, &self.inner.metrics, QueueStatus::Ready)
        });
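        // ...and then against the per-database queue status.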
        scheduleds
            .iter()
            .try_for_each(|(command, _)| queue.validate_item(command))?;
        for (command, notifier) in scheduleds {
            queue.queue.inner.push_back(ScheduledQueueItem {
                command,
                notifiers: vec![notifier],
                send_latency_timer: queue.queue.send_latency.start_timer(),
                span: tracing_span(),
            });
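            // The queue just became non-empty: wake up the waiting `next_scheduled`.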
            if queue.queue.inner.len() == 1 {
                self.inner.changed_tx.send(()).ok();
            }
        }
        Ok(())
    }

    /// Try to cancel a scheduled command that creates the given streaming job. Returns true if
    /// such a command existed and was cancelled.
    pub fn try_cancel_scheduled_create(&self, database_id: DatabaseId, table_id: TableId) -> bool {
        let queue = &mut self.inner.queue.lock();
        let Some(queue) = queue.queue.get_mut(&database_id) else {
            return false;
        };

        if let Some(idx) = queue.queue.inner.iter().position(|scheduled| {
            matches!(
                &scheduled.command,
                Command::CreateStreamingJob { info, .. }
                    if info.stream_job_fragments.stream_job_id() == table_id
            )
        }) {
            queue.queue.inner.remove(idx).unwrap();
            true
        } else {
            false
        }
    }

    /// Run multiple commands and return when they're all completely finished (i.e., collected).
    /// It's ensured that the commands are executed continuously.
    ///
    /// TODO: atomicity of multiple commands is not guaranteed.
    #[await_tree::instrument("run_commands({})", commands.iter().join(", "))]
    async fn run_multiple_commands(
        &self,
        database_id: DatabaseId,
        commands: Vec<Command>,
    ) -> MetaResult<()> {
        let mut contexts = Vec::with_capacity(commands.len());
        let mut scheduleds = Vec::with_capacity(commands.len());

        for command in commands {
            let (started_tx, started_rx) = oneshot::channel();
            let (collect_tx, collect_rx) = oneshot::channel();

            contexts.push((started_rx, collect_rx));
            scheduleds.push((
                command,
                Notifier {
                    started: Some(started_tx),
                    collected: Some(collect_tx),
                },
            ));
        }

        self.push(database_id, scheduleds)?;

        for (injected_rx, collect_rx) in contexts {
            // Wait for this command to be injected, and check the result.
            tracing::trace!("waiting for injected_rx");
            injected_rx
                .instrument_await("wait_injected")
                .await
                .ok()
                .context("failed to inject barrier")??;

            tracing::trace!("waiting for collect_rx");
            // Throw the error if it occurs when collecting this barrier.
            collect_rx
                .instrument_await("wait_collected")
                .await
                .ok()
                .context("failed to collect barrier")??;
        }

        Ok(())
    }

    /// Run a command and return when it's completely finished (i.e., collected).
    pub async fn run_command(&self, database_id: DatabaseId, command: Command) -> MetaResult<()> {
        tracing::trace!("run_command: {:?}", command);
        let ret = self.run_multiple_commands(database_id, vec![command]).await;
        tracing::trace!("run_command finished");
        ret
    }

    /// Schedule a command without waiting for it to be executed.
    pub fn run_command_no_wait(&self, database_id: DatabaseId, command: Command) -> MetaResult<()> {
        tracing::trace!("run_command_no_wait: {:?}", command);
        self.push(database_id, vec![(command, Notifier::default())])
    }

    /// Flush means waiting for the next barrier to collect, then returning the latest hummock
    /// version id.
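    ///
    /// # Example (sketch)
    ///
    /// ```ignore
    /// // Backs the SQL `FLUSH` statement: schedule a `Flush` command, wait for the
    /// // barrier to be collected, then read the committed version id.
    /// let version_id = scheduler.flush(database_id).await?;
    /// ```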
    pub async fn flush(&self, database_id: DatabaseId) -> MetaResult<HummockVersionId> {
        let start = Instant::now();

        tracing::debug!("start barrier flush");
        self.run_multiple_commands(database_id, vec![Command::Flush])
            .await?;

        let elapsed = start.elapsed();
        tracing::debug!("barrier flushed in {:?}", elapsed);

        let version_id = self.hummock_manager.get_version_id().await;
        Ok(version_id)
    }
}

/// The receiver side of the barrier scheduling queue.
pub struct ScheduledBarriers {
    inner: Arc<Inner>,
}

/// State specific to each database for barrier generation.
#[derive(Debug)]
pub struct DatabaseBarrierState {
    pub barrier_interval: Option<Duration>,
    pub checkpoint_frequency: Option<u64>,
    // Force a checkpoint in the next barrier.
    pub force_checkpoint: bool,
    // The number of barriers (checkpoint = false) since the last checkpoint barrier (checkpoint = true).
    pub num_uncheckpointed_barrier: u64,
}

impl DatabaseBarrierState {
    fn new(barrier_interval_ms: Option<u32>, checkpoint_frequency: Option<u64>) -> Self {
        Self {
            barrier_interval: barrier_interval_ms.map(|ms| Duration::from_millis(ms as u64)),
            checkpoint_frequency,
            force_checkpoint: false,
            num_uncheckpointed_barrier: 0,
        }
    }
}

/// Held by the [`crate::barrier::worker::GlobalBarrierWorker`] to decide when to inject barriers,
/// either periodically or for scheduled commands.
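///
/// # Example (sketch; `context` implements [`GlobalBarrierWorkerContext`])
///
/// ```ignore
/// let mut periodic = PeriodicBarriers::new(sys_barrier_interval, sys_checkpoint_frequency, databases);
/// loop {
///     // Yields either a scheduled command or a periodic, timer-driven barrier.
///     let NewBarrier { database_id, command, checkpoint, .. } =
///         periodic.next_barrier(&context).await;
///     // ... inject a barrier for `database_id` ...
/// }
/// ```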
#[derive(Default, Debug)]
pub struct PeriodicBarriers {
    /// Default system params for barrier interval and checkpoint frequency.
    sys_barrier_interval: Duration,
    sys_checkpoint_frequency: u64,
    /// Per-database state.
    databases: HashMap<DatabaseId, DatabaseBarrierState>,
    /// Holds an `IntervalStream` for each database, keyed by `DatabaseId`.
    /// The `StreamMap` will yield `(DatabaseId, Instant)` when a timer ticks.
    timer_streams: StreamMap<DatabaseId, IntervalStream>,
}

impl PeriodicBarriers {
    pub(super) fn new(
        sys_barrier_interval: Duration,
        sys_checkpoint_frequency: u64,
        database_infos: Vec<Database>,
    ) -> Self {
        let mut databases = HashMap::with_capacity(database_infos.len());
        let mut timer_streams = StreamMap::with_capacity(database_infos.len());
        database_infos.into_iter().for_each(|database| {
            let database_id: DatabaseId = database.id.into();
            let barrier_interval_ms = database.barrier_interval_ms;
            let checkpoint_frequency = database.checkpoint_frequency;
            databases.insert(
                database_id,
                DatabaseBarrierState::new(barrier_interval_ms, checkpoint_frequency),
            );
            let duration = if let Some(ms) = barrier_interval_ms {
                Duration::from_millis(ms as u64)
            } else {
                sys_barrier_interval
            };
            // Create an `IntervalStream` for the database with the specified interval.
            let interval_stream = Self::new_interval_stream(duration);
            timer_streams.insert(database_id, interval_stream);
        });
        Self {
            sys_barrier_interval,
            sys_checkpoint_frequency,
            databases,
            timer_streams,
        }
    }

    // Create a new interval stream with the specified duration.
    fn new_interval_stream(duration: Duration) -> IntervalStream {
        let mut interval = tokio::time::interval(duration);
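        // Delay missed ticks instead of bursting, so that a stalled worker doesn't
        // inject a flood of barriers when it catches up.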
        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
        IntervalStream::new(interval)
    }

    /// Update the system barrier interval.
    pub(super) fn set_sys_barrier_interval(&mut self, duration: Duration) {
        if self.sys_barrier_interval == duration {
            return;
        }
        self.sys_barrier_interval = duration;
        // Reset the `IntervalStream` for all databases that use the default param.
        for (db_id, db_state) in &mut self.databases {
            if db_state.barrier_interval.is_none() {
                let interval_stream = Self::new_interval_stream(duration);
                self.timer_streams.insert(*db_id, interval_stream);
            }
        }
    }

    /// Update the system checkpoint frequency.
    pub fn set_sys_checkpoint_frequency(&mut self, frequency: u64) {
        if self.sys_checkpoint_frequency == frequency {
            return;
        }
        self.sys_checkpoint_frequency = frequency;
        // Reset the checkpoint state for all databases that use the default param.
        for db_state in self.databases.values_mut() {
            if db_state.checkpoint_frequency.is_none() {
                db_state.num_uncheckpointed_barrier = 0;
                db_state.force_checkpoint = false;
            }
        }
    }

    pub(super) fn update_database_barrier(
        &mut self,
        database_id: DatabaseId,
        barrier_interval_ms: Option<u32>,
        checkpoint_frequency: Option<u64>,
    ) {
        match self.databases.entry(database_id) {
            Entry::Occupied(mut entry) => {
                let db_state = entry.get_mut();
                db_state.barrier_interval =
                    barrier_interval_ms.map(|ms| Duration::from_millis(ms as u64));
                db_state.checkpoint_frequency = checkpoint_frequency;
                // Reset the `num_uncheckpointed_barrier` since the barrier interval or checkpoint frequency has changed.
                db_state.num_uncheckpointed_barrier = 0;
                db_state.force_checkpoint = false;
            }
            Entry::Vacant(entry) => {
                entry.insert(DatabaseBarrierState::new(
                    barrier_interval_ms,
                    checkpoint_frequency,
                ));
            }
        }

        // Create or replace the timer stream with the new interval, falling back to the
        // system default if the database doesn't specify one.
        let duration = if let Some(ms) = barrier_interval_ms {
            Duration::from_millis(ms as u64)
        } else {
            self.sys_barrier_interval
        };
        let interval_stream = Self::new_interval_stream(duration);
        self.timer_streams.insert(database_id, interval_stream);
    }

    /// Force the next barrier to be a checkpoint (i.e. `checkpoint = true`).
    pub fn force_checkpoint_in_next_barrier(&mut self, database_id: DatabaseId) {
        if let Some(db_state) = self.databases.get_mut(&database_id) {
            db_state.force_checkpoint = true;
        } else {
            warn!(
                ?database_id,
                "force checkpoint in next barrier for non-existing database"
            );
        }
    }

    #[await_tree::instrument]
    pub(super) async fn next_barrier(
        &mut self,
        context: &impl GlobalBarrierWorkerContext,
    ) -> NewBarrier {
        let new_barrier = select! {
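            // `biased` polls the branches in order, so an explicitly scheduled
            // command takes priority over a periodic timer tick.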
            biased;
            scheduled = context.next_scheduled() => {
                let database_id = scheduled.database_id;
                // Check that the database exists.
                assert!(self.databases.contains_key(&database_id), "database {} not found in periodic barriers", database_id);
                assert!(self.timer_streams.contains_key(&database_id), "timer stream for database {} not found in periodic barriers", database_id);
                // The new command will trigger a barrier anyway, so reset the timer of the specific database.
                for (db_id, timer_stream) in self.timer_streams.iter_mut() {
                    if *db_id == database_id {
                        timer_stream.as_mut().reset();
                    }
                }
                let checkpoint = scheduled.command.need_checkpoint() || self.try_get_checkpoint(database_id);
                NewBarrier {
                    database_id: scheduled.database_id,
                    command: Some((scheduled.command, scheduled.notifiers)),
                    span: scheduled.span,
                    checkpoint,
                }
            },
            // If there is no database, we won't wait for `Interval`, but only wait for commands.
            // Normally this doesn't return `None`, because there is always at least one database.
            next_timer = pending_on_none(self.timer_streams.next()) => {
                let (database_id, _instant) = next_timer;
                let checkpoint = self.try_get_checkpoint(database_id);
                NewBarrier {
                    database_id,
                    command: None,
                    span: tracing_span(),
                    checkpoint,
                }
            }
        };
        self.update_num_uncheckpointed_barrier(new_barrier.database_id, new_barrier.checkpoint);

        new_barrier
    }

    /// Whether the next barrier should be a checkpoint barrier (`checkpoint = true`).
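    ///
    /// For example, with an effective `checkpoint_frequency` of 3, barriers are injected as
    /// `false, false, true, false, false, true, ...`, unless `force_checkpoint` short-circuits
    /// the counter.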
    fn try_get_checkpoint(&self, database_id: DatabaseId) -> bool {
        let db_state = self.databases.get(&database_id).unwrap();
        let checkpoint_frequency = db_state
            .checkpoint_frequency
            .unwrap_or(self.sys_checkpoint_frequency);
        db_state.num_uncheckpointed_barrier + 1 >= checkpoint_frequency || db_state.force_checkpoint
    }

    /// Update the `num_uncheckpointed_barrier` after a barrier of the given kind is issued.
    fn update_num_uncheckpointed_barrier(&mut self, database_id: DatabaseId, checkpoint: bool) {
        let db_state = self.databases.get_mut(&database_id).unwrap();
        if checkpoint {
            db_state.num_uncheckpointed_barrier = 0;
            db_state.force_checkpoint = false;
        } else {
            db_state.num_uncheckpointed_barrier += 1;
        }
    }
}

impl ScheduledBarriers {
    pub(super) async fn next_scheduled(&self) -> Scheduled {
        'outer: loop {
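            // Subscribe *before* inspecting the queue, so that a notification sent
            // between the check below and `rx.changed().await` is not missed.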
            let mut rx = self.inner.changed_tx.subscribe();
            {
                let mut queue = self.inner.queue.lock();
                if queue.status.is_blocked() {
                    continue;
                }
                for (database_id, queue) in &mut queue.queue {
                    if queue.status.is_blocked() {
                        continue;
                    }
                    if let Some(item) = queue.queue.inner.pop_front() {
                        item.send_latency_timer.observe_duration();
                        break 'outer Scheduled {
                            database_id: *database_id,
                            command: item.command,
                            notifiers: item.notifiers,
                            span: item.span,
                        };
                    }
                }
            }
            rx.changed().await.unwrap();
        }
    }
}

pub(super) enum MarkReadyOptions {
    Database(DatabaseId),
    Global {
        blocked_databases: HashSet<DatabaseId>,
    },
}

impl ScheduledBarriers {
    /// Pre-apply buffered drop and cancel commands; returns true if any drop command was applied.
    pub(super) fn pre_apply_drop_cancel(&self, database_id: Option<DatabaseId>) -> bool {
        self.pre_apply_drop_cancel_scheduled(database_id)
    }

    /// Mark the command scheduler as blocked, abort all queued scheduled commands, and notify
    /// their subscribers with the specific reason.
    pub(super) fn abort_and_mark_blocked(
        &self,
        database_id: Option<DatabaseId>,
        reason: impl Into<String>,
    ) {
        let mut queue = self.inner.queue.lock();
        fn database_blocked_reason(database_id: DatabaseId, reason: &str) -> String {
            format!("database {} unavailable: {}", database_id, reason)
        }
        fn mark_blocked_and_notify_failed(
            database_id: DatabaseId,
            queue: &mut DatabaseScheduledQueue,
            reason: &str,
        ) {
            let reason = database_blocked_reason(database_id, reason);
            let err: MetaError = anyhow!("{}", reason).into();
            queue.mark_blocked(reason);
            while let Some(ScheduledQueueItem { notifiers, .. }) = queue.queue.inner.pop_front() {
                notifiers
                    .into_iter()
                    .for_each(|notify| notify.notify_collection_failed(err.clone()))
            }
        }
        if let Some(database_id) = database_id {
            let reason = reason.into();
            match queue.queue.entry(database_id) {
                Entry::Occupied(entry) => {
                    let queue = entry.into_mut();
                    if queue.status.is_blocked() {
                        if cfg!(debug_assertions) {
                            panic!("database {} marked as blocked twice", database_id);
                        } else {
                            warn!(?database_id, "database marked as blocked twice");
                        }
                    }
                    info!(?database_id, "database marked as blocked");
                    mark_blocked_and_notify_failed(database_id, queue, &reason);
                }
                Entry::Vacant(entry) => {
                    entry.insert(DatabaseScheduledQueue::new(
                        database_id,
                        &self.inner.metrics,
                        QueueStatus::Blocked(database_blocked_reason(database_id, &reason)),
                    ));
                }
            }
        } else {
            let reason = reason.into();
            if queue.status.is_blocked() {
                if cfg!(debug_assertions) {
                    panic!("cluster marked as blocked twice");
                } else {
                    warn!("cluster marked as blocked twice");
                }
            }
            info!("cluster marked as blocked");
            queue.mark_blocked(reason.clone());
            for (database_id, queue) in &mut queue.queue {
                mark_blocked_and_notify_failed(*database_id, queue, &reason);
            }
        }
    }

    /// Mark the command scheduler as ready to accept new commands.
    pub(super) fn mark_ready(&self, options: MarkReadyOptions) {
        let mut queue = self.inner.queue.lock();
        let queue = &mut *queue;
        match options {
            MarkReadyOptions::Database(database_id) => {
                info!(?database_id, "database marked as ready");
                let database_queue = queue.queue.entry(database_id).or_insert_with(|| {
                    DatabaseScheduledQueue::new(
                        database_id,
                        &self.inner.metrics,
                        QueueStatus::Ready,
                    )
                });
                if !database_queue.status.is_blocked() {
                    if cfg!(debug_assertions) {
                        panic!("database {} marked as ready twice", database_id);
                    } else {
                        warn!(?database_id, "database marked as ready twice");
                    }
                }
                if database_queue.mark_ready()
                    && !queue.status.is_blocked()
                    && !database_queue.queue.inner.is_empty()
                {
                    self.inner.changed_tx.send(()).ok();
                }
            }
            MarkReadyOptions::Global { blocked_databases } => {
                if !queue.status.is_blocked() {
                    if cfg!(debug_assertions) {
                        panic!("cluster marked as ready twice");
                    } else {
                        warn!("cluster marked as ready twice");
                    }
                }
                info!(?blocked_databases, "cluster marked as ready");
                let prev_blocked = queue.mark_ready();
                for database_id in &blocked_databases {
                    queue.queue.entry(*database_id).or_insert_with(|| {
                        DatabaseScheduledQueue::new(
                            *database_id,
                            &self.inner.metrics,
                            QueueStatus::Blocked(format!(
                                "database {} failed to recover in global recovery",
                                database_id
                            )),
                        )
                    });
                }
                for (database_id, queue) in &mut queue.queue {
                    if !blocked_databases.contains(database_id) {
                        queue.mark_ready();
                    }
                }
                if prev_blocked
                    && queue
                        .queue
                        .values()
                        .any(|database_queue| !database_queue.queue.inner.is_empty())
                {
                    self.inner.changed_tx.send(()).ok();
                }
            }
        }
    }

    /// Try to pre-apply buffered drop and cancel commands; returns whether any drop command was
    /// applied. It should only be called during recovery.
    pub(super) fn pre_apply_drop_cancel_scheduled(&self, database_id: Option<DatabaseId>) -> bool {
        let mut queue = self.inner.queue.lock();
        let mut applied = false;

        let mut pre_apply_drop_cancel = |queue: &mut DatabaseScheduledQueue| {
            while let Some(ScheduledQueueItem {
                notifiers, command, ..
            }) = queue.queue.inner.pop_front()
            {
                match command {
                    Command::DropStreamingJobs { .. } => {
                        applied = true;
                    }
                    Command::DropSubscription { .. } => {}
                    _ => {
                        unreachable!("only drop and cancel streaming jobs should be buffered");
                    }
                }
                notifiers.into_iter().for_each(|notify| {
                    notify.notify_collected();
                });
            }
        };

        if let Some(database_id) = database_id {
            assert_matches!(queue.status, QueueStatus::Ready);
            if let Some(queue) = queue.queue.get_mut(&database_id) {
                assert_matches!(queue.status, QueueStatus::Blocked(_));
                pre_apply_drop_cancel(queue);
            }
        } else {
            assert_matches!(queue.status, QueueStatus::Blocked(_));
            for queue in queue.queue.values_mut() {
                pre_apply_drop_cancel(queue);
            }
        }

        applied
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    fn create_test_database(
        id: u32,
        barrier_interval_ms: Option<u32>,
        checkpoint_frequency: Option<u64>,
    ) -> Database {
        Database {
            id,
            name: format!("test_db_{}", id),
            barrier_interval_ms,
            checkpoint_frequency,
            ..Default::default()
        }
    }

    // Mock context for testing `next_barrier`.
    struct MockGlobalBarrierWorkerContext {
        scheduled_rx: tokio::sync::Mutex<tokio::sync::mpsc::UnboundedReceiver<Scheduled>>,
    }

    impl MockGlobalBarrierWorkerContext {
        fn new() -> (Self, tokio::sync::mpsc::UnboundedSender<Scheduled>) {
            let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
            (
                Self {
                    scheduled_rx: tokio::sync::Mutex::new(rx),
                },
                tx,
            )
        }
    }

    impl GlobalBarrierWorkerContext for MockGlobalBarrierWorkerContext {
        async fn next_scheduled(&self) -> Scheduled {
            self.scheduled_rx.lock().await.recv().await.unwrap()
        }

        async fn commit_epoch(
            &self,
            _commit_info: crate::hummock::CommitEpochInfo,
        ) -> MetaResult<risingwave_pb::hummock::HummockVersionStats> {
            unimplemented!()
        }

        fn abort_and_mark_blocked(
            &self,
            _database_id: Option<DatabaseId>,
            _recovery_reason: crate::barrier::RecoveryReason,
        ) {
            unimplemented!()
        }

        fn mark_ready(&self, _options: MarkReadyOptions) {
            unimplemented!()
        }

        async fn post_collect_command<'a>(
            &'a self,
            _command: &'a crate::barrier::command::CommandContext,
        ) -> MetaResult<()> {
            unimplemented!()
        }

        async fn notify_creating_job_failed(&self, _database_id: Option<DatabaseId>, _err: String) {
            unimplemented!()
        }

        async fn finish_creating_job(
            &self,
            _job: crate::barrier::progress::TrackingJob,
        ) -> MetaResult<()> {
            unimplemented!()
        }

        async fn new_control_stream(
            &self,
            _node: &risingwave_pb::common::WorkerNode,
            _init_request: &risingwave_pb::stream_service::streaming_control_stream_request::PbInitRequest,
        ) -> MetaResult<risingwave_rpc_client::StreamingControlHandle> {
            unimplemented!()
        }

        async fn reload_runtime_info(
            &self,
        ) -> MetaResult<crate::barrier::BarrierWorkerRuntimeInfoSnapshot> {
            unimplemented!()
        }

        async fn reload_database_runtime_info(
            &self,
            _database_id: DatabaseId,
        ) -> MetaResult<Option<crate::barrier::DatabaseRuntimeInfoSnapshot>> {
            unimplemented!()
        }

        async fn handle_load_finished_source_ids(
            &self,
            _load_finished_source_ids: Vec<u32>,
        ) -> MetaResult<()> {
            unimplemented!()
        }
    }

    #[tokio::test]
    async fn test_next_barrier_with_different_intervals() {
        // Create databases with different intervals.
        let databases = vec![
            create_test_database(1, Some(50), Some(2)), // 50ms interval, checkpoint every 2
            create_test_database(2, Some(100), Some(3)), // 100ms interval, checkpoint every 3
            create_test_database(3, None, Some(5)), /* Use system default (200ms), checkpoint every 5 */
        ];

        let mut periodic = PeriodicBarriers::new(
            Duration::from_millis(200), // System default
            10,                         // System checkpoint frequency
            databases,
        );

        let (context, _tx) = MockGlobalBarrierWorkerContext::new();

        // Call `next_barrier` once for each database, because the first tick is returned immediately.
        for _ in 0..3 {
            let barrier = periodic.next_barrier(&context).await;
            assert!(barrier.command.is_none()); // Should be a periodic barrier, not a scheduled command
            assert!(!barrier.checkpoint); // First barrier shouldn't be a checkpoint
        }

        // Since we have 3 databases with intervals 50ms, 100ms, and 200ms,
        // the next barrier should come from database 1 (50ms interval).
        let start_time = Instant::now();
        let barrier = periodic.next_barrier(&context).await;
        let elapsed = start_time.elapsed();

        // Verify the barrier properties.
        assert_eq!(barrier.database_id, DatabaseId::from(1));
        assert!(barrier.command.is_none()); // Should be a periodic barrier, not a scheduled command
        assert!(barrier.checkpoint); // Second barrier should be a checkpoint for database 1
        assert!(
            elapsed <= Duration::from_millis(100),
            "Elapsed time exceeded: {:?}",
            elapsed
        ); // Should be around 50ms

        // Verify that the checkpoint frequency works.
        let db1_id = DatabaseId::from(1);
        let db1_state = periodic.databases.get_mut(&db1_id).unwrap();
        assert_eq!(db1_state.num_uncheckpointed_barrier, 0); // Should reset after checkpoint
    }

    #[tokio::test]
    async fn test_next_barrier_with_scheduled_command() {
        let databases = vec![
            create_test_database(1, Some(1000), Some(2)), // Long interval to avoid interference
        ];

        let mut periodic = PeriodicBarriers::new(Duration::from_millis(1000), 10, databases);

        let (context, tx) = MockGlobalBarrierWorkerContext::new();

        // Skip the first barrier to let the timers start
        periodic.next_barrier(&context).await;

        // Schedule a command
        let scheduled_command = Scheduled {
            database_id: DatabaseId::from(1),
            command: Command::Flush,
            notifiers: vec![],
            span: tracing::Span::none(),
        };

        // Send the scheduled command in the background
        let tx_clone = tx.clone();
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(10)).await;
            tx_clone.send(scheduled_command).unwrap();
        });

        let barrier = periodic.next_barrier(&context).await;

        // Should return the scheduled command
        assert!(barrier.command.is_some());
        assert_eq!(barrier.database_id, DatabaseId::from(1));

        if let Some((command, _)) = barrier.command {
            assert!(matches!(command, Command::Flush));
        }
    }

    #[tokio::test]
    async fn test_next_barrier_multiple_databases_timing() {
        let databases = vec![
            create_test_database(1, Some(30), Some(10)), // Fast interval
            create_test_database(2, Some(100), Some(10)), // Slower interval
        ];

        let mut periodic = PeriodicBarriers::new(Duration::from_millis(500), 10, databases);

        let (context, _tx) = MockGlobalBarrierWorkerContext::new();

        // Skip the first 2 barriers to let the timers start
        for _ in 0..2 {
            periodic.next_barrier(&context).await;
        }

        let mut barrier_counts = HashMap::new();

        // Collect barriers for a short period
        let mut barriers = Vec::new();
        for _ in 0..5 {
            let barrier = periodic.next_barrier(&context).await;
            barriers.push(barrier);
        }

        // Count barriers per database
        for barrier in barriers {
            *barrier_counts.entry(barrier.database_id).or_insert(0) += 1;
        }

        // Database 1 (30ms interval) should have more barriers than database 2 (100ms interval)
        let db1_count = barrier_counts.get(&DatabaseId::from(1)).unwrap_or(&0);
        let db2_count = barrier_counts.get(&DatabaseId::from(2)).unwrap_or(&0);

        // Due to timing, db1 should generally have more barriers, but allow for some variance
        assert!(*db1_count >= *db2_count);
    }

    #[tokio::test]
    async fn test_next_barrier_force_checkpoint() {
        let databases = vec![create_test_database(1, Some(100), Some(10))];

        let mut periodic = PeriodicBarriers::new(Duration::from_millis(100), 10, databases);

        let (context, _tx) = MockGlobalBarrierWorkerContext::new();

        // Force a checkpoint for the next barrier
        periodic.force_checkpoint_in_next_barrier(DatabaseId::from(1));

        let barrier = periodic.next_barrier(&context).await;

        // Should be a checkpoint barrier due to force_checkpoint
        assert!(barrier.checkpoint);
        assert_eq!(barrier.database_id, DatabaseId::from(1));
        assert!(barrier.command.is_none());
    }

    #[tokio::test]
    async fn test_next_barrier_checkpoint_frequency() {
        let databases = vec![create_test_database(1, Some(50), Some(2))]; // Checkpoint every 2 barriers

        let mut periodic = PeriodicBarriers::new(Duration::from_millis(50), 10, databases);

        let (context, _tx) = MockGlobalBarrierWorkerContext::new();

        // First barrier - should not be a checkpoint
        let barrier1 = periodic.next_barrier(&context).await;
        assert!(!barrier1.checkpoint);

        // Second barrier - should be a checkpoint (frequency = 2)
        let barrier2 = periodic.next_barrier(&context).await;
        assert!(barrier2.checkpoint);

        // Third barrier - should not be a checkpoint (counter reset)
        let barrier3 = periodic.next_barrier(&context).await;
        assert!(!barrier3.checkpoint);
    }

    #[tokio::test]
    async fn test_update_database_barrier() {
        let databases = vec![create_test_database(1, Some(1000), Some(10))];

        let mut periodic = PeriodicBarriers::new(Duration::from_millis(500), 20, databases);

        // Update an existing database
        periodic.update_database_barrier(DatabaseId::from(1), Some(2000), Some(15));

        let db_state = periodic.databases.get(&DatabaseId::from(1)).unwrap();
        assert_eq!(db_state.barrier_interval, Some(Duration::from_millis(2000)));
        assert_eq!(db_state.checkpoint_frequency, Some(15));
        assert_eq!(db_state.num_uncheckpointed_barrier, 0);
        assert!(!db_state.force_checkpoint);

        // Add a new database
        periodic.update_database_barrier(DatabaseId::from(2), None, None);

        assert!(periodic.databases.contains_key(&DatabaseId::from(2)));
        let db2_state = periodic.databases.get(&DatabaseId::from(2)).unwrap();
        assert_eq!(db2_state.barrier_interval, None);
        assert_eq!(db2_state.checkpoint_frequency, None);
    }
1057}