risingwave_meta/barrier/schedule.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet, VecDeque};
use std::sync::Arc;
use std::time::{Duration, Instant};

use anyhow::{Context, anyhow};
use assert_matches::assert_matches;
use await_tree::InstrumentAwait;
use itertools::Itertools;
use parking_lot::Mutex;
use prometheus::HistogramTimer;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::id::JobId;
use risingwave_common::metrics::LabelGuardedHistogram;
use risingwave_hummock_sdk::HummockVersionId;
use risingwave_pb::catalog::Database;
use rw_futures_util::pending_on_none;
use tokio::select;
use tokio::sync::{oneshot, watch};
use tokio_stream::wrappers::IntervalStream;
use tokio_stream::{StreamExt, StreamMap};
use tracing::{info, warn};

use super::notifier::Notifier;
use super::{Command, Scheduled};
use crate::barrier::context::GlobalBarrierWorkerContext;
use crate::hummock::HummockManagerRef;
use crate::rpc::metrics::{GLOBAL_META_METRICS, MetaMetrics};
use crate::{MetaError, MetaResult};

pub(super) struct NewBarrier {
    pub database_id: DatabaseId,
    pub command: Option<(Command, Vec<Notifier>)>,
    pub span: tracing::Span,
    pub checkpoint: bool,
}

/// A queue for scheduling barriers.
///
/// We manually implement one here instead of using channels since we may need to update the front
/// of the queue to add some notifiers for instant flushes.
struct Inner {
    queue: Mutex<ScheduledQueue>,

    /// When `queue` transitions from empty to non-empty, all subscribers of this watcher are
    /// notified.
    changed_tx: watch::Sender<()>,

    /// Used for recording the send latency of each barrier.
    metrics: Arc<MetaMetrics>,
}

#[derive(Debug)]
enum QueueStatus {
    /// The queue is ready to accept new commands.
    Ready,
    /// The queue is blocked from accepting new commands, with the given reason.
    Blocked(String),
}

impl QueueStatus {
    fn is_blocked(&self) -> bool {
        matches!(self, Self::Blocked(_))
    }
}

struct ScheduledQueueItem {
    command: Command,
    notifiers: Vec<Notifier>,
    send_latency_timer: HistogramTimer,
    span: tracing::Span,
}

struct StatusQueue<T> {
    queue: T,
    status: QueueStatus,
}

struct DatabaseQueue {
    inner: VecDeque<ScheduledQueueItem>,
    send_latency: LabelGuardedHistogram,
}

type DatabaseScheduledQueue = StatusQueue<DatabaseQueue>;
type ScheduledQueue = StatusQueue<HashMap<DatabaseId, DatabaseScheduledQueue>>;

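// Commands must pass two status gates before being scheduled: the cluster-wide status on the
// outer `ScheduledQueue` and the per-database status on each `DatabaseScheduledQueue`; see
// `BarrierScheduler::push`.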
impl DatabaseScheduledQueue {
    fn new(database_id: DatabaseId, metrics: &MetaMetrics, status: QueueStatus) -> Self {
        Self {
            queue: DatabaseQueue {
                inner: Default::default(),
                send_latency: metrics
                    .barrier_send_latency
                    .with_guarded_label_values(&[database_id.to_string().as_str()]),
            },
            status,
        }
    }
}

impl<T> StatusQueue<T> {
    fn mark_blocked(&mut self, reason: String) {
        self.status = QueueStatus::Blocked(reason);
    }

    fn mark_ready(&mut self) -> bool {
        let prev_blocked = self.status.is_blocked();
        self.status = QueueStatus::Ready;
        prev_blocked
    }

    fn validate_item(&mut self, command: &Command) -> MetaResult<()> {
        // No command may be scheduled while the queue is blocked, except for dropping streaming
        // jobs. Dropping streaming jobs is still allowed while the cluster is under recovery, so
        // we buffer the drop command and execute it once the cluster is ready to clean up.
        // TODO: this is just a workaround to allow dropping streaming jobs when the cluster is
        // under recovery; we need to refine it when catalog and streaming metadata can be handled
        // in a transactional way.
        if let QueueStatus::Blocked(reason) = &self.status
            && !matches!(
                command,
                Command::DropStreamingJobs { .. } | Command::DropSubscription { .. }
            )
        {
            return Err(MetaError::unavailable(reason));
        }
        Ok(())
    }
}

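/// Create the tracing span for a barrier. When there is no active span (e.g. tracing is not
/// enabled for the current context), a disabled span is returned instead of creating a new one.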
fn tracing_span() -> tracing::Span {
    if tracing::Span::current().is_none() {
        tracing::Span::none()
    } else {
        tracing::info_span!(
            "barrier",
            checkpoint = tracing::field::Empty,
            epoch = tracing::field::Empty
        )
    }
}

/// The sender side of the barrier scheduling queue.
/// Can be cloned and held by other managers to schedule and run barriers.
#[derive(Clone)]
pub struct BarrierScheduler {
    inner: Arc<Inner>,

    /// Used for getting the latest snapshot after `FLUSH`.
    hummock_manager: HummockManagerRef,
}

impl BarrierScheduler {
    /// Create a pair of [`BarrierScheduler`] and [`ScheduledBarriers`], for scheduling barriers
    /// from different managers, and executing them in the barrier manager, respectively.
    pub fn new_pair(
        hummock_manager: HummockManagerRef,
        metrics: Arc<MetaMetrics>,
    ) -> (Self, ScheduledBarriers) {
        let inner = Arc::new(Inner {
            queue: Mutex::new(ScheduledQueue {
                queue: Default::default(),
                status: QueueStatus::Ready,
            }),
            changed_tx: watch::channel(()).0,
            metrics,
        });

        (
            Self {
                inner: inner.clone(),
                hummock_manager,
            },
            ScheduledBarriers { inner },
        )
    }

    /// Push a scheduled barrier into the queue.
    fn push(
        &self,
        database_id: DatabaseId,
        scheduleds: impl IntoIterator<Item = (Command, Notifier)>,
    ) -> MetaResult<()> {
        let mut queue = self.inner.queue.lock();
        let scheduleds = scheduleds.into_iter().collect_vec();
        scheduleds
            .iter()
            .try_for_each(|(command, _)| queue.validate_item(command))?;
        let queue = queue.queue.entry(database_id).or_insert_with(|| {
            DatabaseScheduledQueue::new(database_id, &self.inner.metrics, QueueStatus::Ready)
        });
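        // Validate again, now against this database's own status; the check above only covered
        // the cluster-wide status.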
        scheduleds
            .iter()
            .try_for_each(|(command, _)| queue.validate_item(command))?;
        for (command, notifier) in scheduleds {
            queue.queue.inner.push_back(ScheduledQueueItem {
                command,
                notifiers: vec![notifier],
                send_latency_timer: queue.queue.send_latency.start_timer(),
                span: tracing_span(),
            });
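            // Wake up `next_scheduled` on the empty -> non-empty transition.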
            if queue.queue.inner.len() == 1 {
                self.inner.changed_tx.send(()).ok();
            }
        }
        Ok(())
    }

    /// Try to cancel a scheduled command that creates a streaming job. Returns `true` if such a
    /// command was queued and has been cancelled.
    pub fn try_cancel_scheduled_create(&self, database_id: DatabaseId, job_id: JobId) -> bool {
        let queue = &mut self.inner.queue.lock();
        let Some(queue) = queue.queue.get_mut(&database_id) else {
            return false;
        };

        if let Some(idx) = queue.queue.inner.iter().position(|scheduled| {
            matches!(
                &scheduled.command,
                Command::CreateStreamingJob { info, .. }
                    if info.stream_job_fragments.stream_job_id() == job_id
            )
        }) {
            queue.queue.inner.remove(idx).unwrap();
            true
        } else {
            false
        }
    }

    /// Run multiple commands and return when they're all completely finished (i.e., collected).
    /// It's ensured that the commands are executed consecutively.
    ///
    /// TODO: atomicity of multiple commands is not guaranteed.
    #[await_tree::instrument("run_commands({})", commands.iter().join(", "))]
    async fn run_multiple_commands(
        &self,
        database_id: DatabaseId,
        commands: Vec<Command>,
    ) -> MetaResult<()> {
        let mut contexts = Vec::with_capacity(commands.len());
        let mut scheduleds = Vec::with_capacity(commands.len());

        for command in commands {
            let (started_tx, started_rx) = oneshot::channel();
            let (collect_tx, collect_rx) = oneshot::channel();

            contexts.push((started_rx, collect_rx));
            scheduleds.push((
                command,
                Notifier {
                    started: Some(started_tx),
                    collected: Some(collect_tx),
                },
            ));
        }

        self.push(database_id, scheduleds)?;

        for (injected_rx, collect_rx) in contexts {
            // Wait for this command to be injected, and record the result.
            tracing::trace!("waiting for injected_rx");
            injected_rx
                .instrument_await("wait_injected")
                .await
                .ok()
                .context("failed to inject barrier")??;

            tracing::trace!("waiting for collect_rx");
            // Throw the error if it occurs when collecting this barrier.
            collect_rx
                .instrument_await("wait_collected")
                .await
                .ok()
                .context("failed to collect barrier")??;
        }

        Ok(())
    }

    /// Run a command and return when it's completely finished (i.e., collected).
    pub async fn run_command(&self, database_id: DatabaseId, command: Command) -> MetaResult<()> {
        tracing::trace!("run_command: {:?}", command);
        let ret = self.run_multiple_commands(database_id, vec![command]).await;
        tracing::trace!("run_command finished");
        ret
    }

    /// Schedule a command without waiting for it to be executed.
    pub fn run_command_no_wait(&self, database_id: DatabaseId, command: Command) -> MetaResult<()> {
        tracing::trace!("run_command_no_wait: {:?}", command);
        self.push(database_id, vec![(command, Notifier::default())])
    }

    /// Flush by waiting for the next barrier to be collected, then return the latest Hummock
    /// version id.
    pub async fn flush(&self, database_id: DatabaseId) -> MetaResult<HummockVersionId> {
        let start = Instant::now();

        tracing::debug!("start barrier flush");
        self.run_multiple_commands(database_id, vec![Command::Flush])
            .await?;

        let elapsed = start.elapsed();
        tracing::debug!("barrier flushed in {:?}", elapsed);

        let version_id = self.hummock_manager.get_version_id().await;
        Ok(version_id)
    }
}

/// The receiver side of the barrier scheduling queue.
pub struct ScheduledBarriers {
    inner: Arc<Inner>,
}

/// State specific to each database for barrier generation.
#[derive(Debug)]
pub struct DatabaseBarrierState {
    pub barrier_interval: Option<Duration>,
    pub checkpoint_frequency: Option<u64>,
    // Force a checkpoint in the next barrier.
    pub force_checkpoint: bool,
    // The number of barriers (checkpoint = false) since the last barrier (checkpoint = true).
    pub num_uncheckpointed_barrier: u64,
}

impl DatabaseBarrierState {
    fn new(barrier_interval_ms: Option<u32>, checkpoint_frequency: Option<u64>) -> Self {
        Self {
            barrier_interval: barrier_interval_ms.map(|ms| Duration::from_millis(ms as u64)),
            checkpoint_frequency,
            force_checkpoint: false,
            num_uncheckpointed_barrier: 0,
        }
    }
}

/// Held by the [`crate::barrier::worker::GlobalBarrierWorker`] to drive periodic barrier
/// generation for each database.
#[derive(Default, Debug)]
pub struct PeriodicBarriers {
    /// Default system params for barrier interval and checkpoint frequency.
    sys_barrier_interval: Duration,
    sys_checkpoint_frequency: u64,
    /// Per-database state.
    databases: HashMap<DatabaseId, DatabaseBarrierState>,
    /// Holds an `IntervalStream` for each database, keyed by `DatabaseId`.
    /// `StreamMap` will yield `(DatabaseId, Instant)` when a timer ticks.
    timer_streams: StreamMap<DatabaseId, IntervalStream>,
}

impl PeriodicBarriers {
    pub(super) fn new(
        sys_barrier_interval: Duration,
        sys_checkpoint_frequency: u64,
        database_infos: Vec<Database>,
    ) -> Self {
        let mut databases = HashMap::with_capacity(database_infos.len());
        let mut timer_streams = StreamMap::with_capacity(database_infos.len());
        database_infos.into_iter().for_each(|database| {
            let database_id: DatabaseId = database.id;
            let barrier_interval_ms = database.barrier_interval_ms;
            let checkpoint_frequency = database.checkpoint_frequency;
            databases.insert(
                database_id,
                DatabaseBarrierState::new(barrier_interval_ms, checkpoint_frequency),
            );
            let duration = if let Some(ms) = barrier_interval_ms {
                Duration::from_millis(ms as u64)
            } else {
                sys_barrier_interval
            };

            // Create an `IntervalStream` for the database with the specified interval.
            let interval_stream = Self::new_interval_stream(duration, &database_id);
            timer_streams.insert(database_id, interval_stream);
        });
        Self {
            sys_barrier_interval,
            sys_checkpoint_frequency,
            databases,
            timer_streams,
        }
    }

    // Create a new interval stream with the specified duration.
    fn new_interval_stream(duration: Duration, database_id: &DatabaseId) -> IntervalStream {
        GLOBAL_META_METRICS
            .barrier_interval_by_database
            .with_label_values(&[&database_id.to_string()])
            .set(duration.as_millis_f64());
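        // Delay missed ticks instead of bursting them, so that a temporarily stalled barrier
        // loop does not emit a flurry of catch-up barriers.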
        let mut interval = tokio::time::interval(duration);
        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
        IntervalStream::new(interval)
    }

    /// Update the system barrier interval.
    pub(super) fn set_sys_barrier_interval(&mut self, duration: Duration) {
        if self.sys_barrier_interval == duration {
            return;
        }
        self.sys_barrier_interval = duration;
        // Reset the `IntervalStream` for all databases that use the default param.
        for (db_id, db_state) in &mut self.databases {
            if db_state.barrier_interval.is_none() {
                let interval_stream = Self::new_interval_stream(duration, db_id);
                self.timer_streams.insert(*db_id, interval_stream);
            }
        }
    }

    /// Update the system checkpoint frequency.
    pub fn set_sys_checkpoint_frequency(&mut self, frequency: u64) {
        if self.sys_checkpoint_frequency == frequency {
            return;
        }
        self.sys_checkpoint_frequency = frequency;
        // Reset the checkpoint state for all databases that use the default param.
        for db_state in self.databases.values_mut() {
            if db_state.checkpoint_frequency.is_none() {
                db_state.num_uncheckpointed_barrier = 0;
                db_state.force_checkpoint = false;
            }
        }
    }

    pub(super) fn update_database_barrier(
        &mut self,
        database_id: DatabaseId,
        barrier_interval_ms: Option<u32>,
        checkpoint_frequency: Option<u64>,
    ) {
        match self.databases.entry(database_id) {
            Entry::Occupied(mut entry) => {
                let db_state = entry.get_mut();
                db_state.barrier_interval =
                    barrier_interval_ms.map(|ms| Duration::from_millis(ms as u64));
                db_state.checkpoint_frequency = checkpoint_frequency;
                // Reset the checkpoint state, since the barrier interval or checkpoint frequency
                // has changed.
                db_state.num_uncheckpointed_barrier = 0;
                db_state.force_checkpoint = false;
            }
            Entry::Vacant(entry) => {
                entry.insert(DatabaseBarrierState::new(
                    barrier_interval_ms,
                    checkpoint_frequency,
                ));
            }
        }

        // Create or replace the timer stream with the new interval.
        let duration = if let Some(ms) = barrier_interval_ms {
            Duration::from_millis(ms as u64)
        } else {
            self.sys_barrier_interval
        };

        let interval_stream = Self::new_interval_stream(duration, &database_id);
        self.timer_streams.insert(database_id, interval_stream);
    }

    /// Force the `checkpoint` flag of the next barrier to be true.
    pub fn force_checkpoint_in_next_barrier(&mut self, database_id: DatabaseId) {
        if let Some(db_state) = self.databases.get_mut(&database_id) {
            db_state.force_checkpoint = true;
        } else {
            warn!(
                ?database_id,
                "force checkpoint in next barrier for non-existing database"
            );
        }
    }

    #[await_tree::instrument]
    pub(super) async fn next_barrier(
        &mut self,
        context: &impl GlobalBarrierWorkerContext,
    ) -> NewBarrier {
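        // `biased` makes the select below always poll scheduled commands before periodic timer
        // ticks, so commands take priority.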
        let new_barrier = select! {
            biased;
            scheduled = context.next_scheduled() => {
                let database_id = scheduled.database_id;
                // Check that the database exists.
                assert!(self.databases.contains_key(&database_id), "database {} not found in periodic barriers", database_id);
                assert!(self.timer_streams.contains_key(&database_id), "timer stream for database {} not found in periodic barriers", database_id);
                // The new command triggers a barrier by itself, so reset the periodic timer of
                // this database.
                for (db_id, timer_stream) in self.timer_streams.iter_mut() {
                    if *db_id == database_id {
                        timer_stream.as_mut().reset();
                    }
                }
                let checkpoint = scheduled.command.need_checkpoint() || self.try_get_checkpoint(database_id);
                NewBarrier {
                    database_id: scheduled.database_id,
                    command: Some((scheduled.command, scheduled.notifiers)),
                    span: scheduled.span,
                    checkpoint,
                }
            },
            // If there is no database, there is no timer to tick, so we only wait for commands.
            // Normally the stream will not return `None`, because there is always at least one
            // database.
            next_timer = pending_on_none(self.timer_streams.next()) => {
                let (database_id, _instant) = next_timer;
                let checkpoint = self.try_get_checkpoint(database_id);
                NewBarrier {
                    database_id,
                    command: None,
                    span: tracing_span(),
                    checkpoint,
                }
            }
        };
        self.update_num_uncheckpointed_barrier(new_barrier.database_id, new_barrier.checkpoint);

        new_barrier
    }

    /// Whether the next barrier should be a checkpoint (`checkpoint = true`).
    fn try_get_checkpoint(&self, database_id: DatabaseId) -> bool {
        let db_state = self.databases.get(&database_id).unwrap();
        let checkpoint_frequency = db_state
            .checkpoint_frequency
            .unwrap_or(self.sys_checkpoint_frequency);
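        // `num_uncheckpointed_barrier` counts barriers already injected since the last
        // checkpoint, so `+ 1` accounts for the barrier about to be injected.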
        db_state.num_uncheckpointed_barrier + 1 >= checkpoint_frequency || db_state.force_checkpoint
    }

    /// Update `num_uncheckpointed_barrier` according to whether the issued barrier is a
    /// checkpoint.
    fn update_num_uncheckpointed_barrier(&mut self, database_id: DatabaseId, checkpoint: bool) {
        let db_state = self.databases.get_mut(&database_id).unwrap();
        if checkpoint {
            db_state.num_uncheckpointed_barrier = 0;
            db_state.force_checkpoint = false;
        } else {
            db_state.num_uncheckpointed_barrier += 1;
        }
    }
}

impl ScheduledBarriers {
    pub(super) async fn next_scheduled(&self) -> Scheduled {
        'outer: loop {
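            // Subscribe before checking the queue, so that a notification sent between the check
            // and the `changed()` await below is not lost.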
            let mut rx = self.inner.changed_tx.subscribe();
            {
                let mut queue = self.inner.queue.lock();
                if queue.status.is_blocked() {
                    continue;
                }
                for (database_id, queue) in &mut queue.queue {
                    if queue.status.is_blocked() {
                        continue;
                    }
                    if let Some(item) = queue.queue.inner.pop_front() {
                        item.send_latency_timer.observe_duration();
                        break 'outer Scheduled {
                            database_id: *database_id,
                            command: item.command,
                            notifiers: item.notifiers,
                            span: item.span,
                        };
                    }
                }
            }
            rx.changed().await.unwrap();
        }
    }
}

pub(super) enum MarkReadyOptions {
    Database(DatabaseId),
    Global {
        blocked_databases: HashSet<DatabaseId>,
    },
}

impl ScheduledBarriers {
    /// Pre-apply buffered drop and cancel commands, and return all dropped state tables, if any.
    pub(super) fn pre_apply_drop_cancel(&self, database_id: Option<DatabaseId>) -> Vec<TableId> {
        self.pre_apply_drop_cancel_scheduled(database_id)
    }

    /// Mark the command scheduler as blocked, abort all queued scheduled commands, and notify
    /// their subscribers with the given reason.
    pub(super) fn abort_and_mark_blocked(
        &self,
        database_id: Option<DatabaseId>,
        reason: impl Into<String>,
    ) {
        let mut queue = self.inner.queue.lock();
        fn database_blocked_reason(database_id: DatabaseId, reason: &str) -> String {
            format!("database {} unavailable: {}", database_id, reason)
        }
        fn mark_blocked_and_notify_failed(
            database_id: DatabaseId,
            queue: &mut DatabaseScheduledQueue,
            reason: &str,
        ) {
            let reason = database_blocked_reason(database_id, reason);
            let err: MetaError = anyhow!("{}", reason).into();
            queue.mark_blocked(reason);
            while let Some(ScheduledQueueItem { notifiers, .. }) = queue.queue.inner.pop_front() {
                notifiers
                    .into_iter()
                    .for_each(|notify| notify.notify_collection_failed(err.clone()))
            }
        }
        if let Some(database_id) = database_id {
            let reason = reason.into();
            match queue.queue.entry(database_id) {
                Entry::Occupied(entry) => {
                    let queue = entry.into_mut();
                    if queue.status.is_blocked() {
                        if cfg!(debug_assertions) {
                            panic!("database {} marked as blocked twice", database_id);
                        } else {
                            warn!(?database_id, "database marked as blocked twice");
                        }
                    }
                    info!(?database_id, "database marked as blocked");
                    mark_blocked_and_notify_failed(database_id, queue, &reason);
                }
                Entry::Vacant(entry) => {
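                    // No queue exists for this database yet: create one that is already blocked,
                    // so commands scheduled later are rejected until it is marked ready again.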
                    entry.insert(DatabaseScheduledQueue::new(
                        database_id,
                        &self.inner.metrics,
                        QueueStatus::Blocked(database_blocked_reason(database_id, &reason)),
                    ));
                }
            }
        } else {
            let reason = reason.into();
            if queue.status.is_blocked() {
                if cfg!(debug_assertions) {
                    panic!("cluster marked as blocked twice");
                } else {
                    warn!("cluster marked as blocked twice");
                }
            }
            info!("cluster marked as blocked");
            queue.mark_blocked(reason.clone());
            for (database_id, queue) in &mut queue.queue {
                mark_blocked_and_notify_failed(*database_id, queue, &reason);
            }
        }
    }

    /// Mark the command scheduler as ready to accept new commands.
    pub(super) fn mark_ready(&self, options: MarkReadyOptions) {
        let mut queue = self.inner.queue.lock();
        let queue = &mut *queue;
        match options {
            MarkReadyOptions::Database(database_id) => {
                info!(?database_id, "database marked as ready");
                let database_queue = queue.queue.entry(database_id).or_insert_with(|| {
                    DatabaseScheduledQueue::new(
                        database_id,
                        &self.inner.metrics,
                        QueueStatus::Ready,
                    )
                });
                if !database_queue.status.is_blocked() {
                    if cfg!(debug_assertions) {
                        panic!("database {} marked as ready twice", database_id);
                    } else {
                        warn!(?database_id, "database marked as ready twice");
                    }
                }
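                // If this database still has buffered commands and the cluster itself is not
                // blocked, wake up `next_scheduled`.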
                if database_queue.mark_ready()
                    && !queue.status.is_blocked()
                    && !database_queue.queue.inner.is_empty()
                {
                    self.inner.changed_tx.send(()).ok();
                }
            }
            MarkReadyOptions::Global { blocked_databases } => {
                if !queue.status.is_blocked() {
                    if cfg!(debug_assertions) {
                        panic!("cluster marked as ready twice");
                    } else {
                        warn!("cluster marked as ready twice");
                    }
                }
                info!(?blocked_databases, "cluster marked as ready");
                let prev_blocked = queue.mark_ready();
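                // Databases that failed to recover stay blocked: make sure each of them has a
                // queue carrying the blocked status.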
                for database_id in &blocked_databases {
                    queue.queue.entry(*database_id).or_insert_with(|| {
                        DatabaseScheduledQueue::new(
                            *database_id,
                            &self.inner.metrics,
                            QueueStatus::Blocked(format!(
                                "database {} failed to recover in global recovery",
                                database_id
                            )),
                        )
                    });
                }
                for (database_id, queue) in &mut queue.queue {
                    if !blocked_databases.contains(database_id) {
                        queue.mark_ready();
                    }
                }
                if prev_blocked
                    && queue
                        .queue
                        .values()
                        .any(|database_queue| !database_queue.queue.inner.is_empty())
                {
                    self.inner.changed_tx.send(()).ok();
                }
            }
        }
    }

    /// Try to pre-apply buffered drop and cancel commands, returning all dropped state tables,
    /// if any. Should only be called during recovery.
    pub(super) fn pre_apply_drop_cancel_scheduled(
        &self,
        database_id: Option<DatabaseId>,
    ) -> Vec<TableId> {
        let mut queue = self.inner.queue.lock();
        let mut dropped_tables = vec![];

        let mut pre_apply_drop_cancel = |queue: &mut DatabaseScheduledQueue| {
            while let Some(ScheduledQueueItem {
                notifiers, command, ..
            }) = queue.queue.inner.pop_front()
            {
                match command {
                    Command::DropStreamingJobs {
                        unregistered_state_table_ids,
                        ..
                    } => {
                        dropped_tables.extend(unregistered_state_table_ids);
                    }
                    Command::DropSubscription { .. } => {}
                    _ => {
                        unreachable!("only drop and cancel streaming jobs should be buffered");
                    }
                }
                notifiers.into_iter().for_each(|notify| {
                    notify.notify_collected();
                });
            }
        };

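        // Database-level recovery expects the cluster to be ready and only the recovering
        // database blocked; global recovery expects the whole cluster to be blocked.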
        if let Some(database_id) = database_id {
            assert_matches!(queue.status, QueueStatus::Ready);
            if let Some(queue) = queue.queue.get_mut(&database_id) {
                assert_matches!(queue.status, QueueStatus::Blocked(_));
                pre_apply_drop_cancel(queue);
            }
        } else {
            assert_matches!(queue.status, QueueStatus::Blocked(_));
            for queue in queue.queue.values_mut() {
                pre_apply_drop_cancel(queue);
            }
        }

        dropped_tables
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    fn create_test_database(
        id: u32,
        barrier_interval_ms: Option<u32>,
        checkpoint_frequency: Option<u64>,
    ) -> Database {
        Database {
            id: id.into(),
            name: format!("test_db_{}", id),
            barrier_interval_ms,
            checkpoint_frequency,
            ..Default::default()
        }
    }

    // Mock context for testing `next_barrier`.
    struct MockGlobalBarrierWorkerContext {
        scheduled_rx: tokio::sync::Mutex<tokio::sync::mpsc::UnboundedReceiver<Scheduled>>,
    }

    impl MockGlobalBarrierWorkerContext {
        fn new() -> (Self, tokio::sync::mpsc::UnboundedSender<Scheduled>) {
            let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
            (
                Self {
                    scheduled_rx: tokio::sync::Mutex::new(rx),
                },
                tx,
            )
        }
    }

    impl GlobalBarrierWorkerContext for MockGlobalBarrierWorkerContext {
        async fn next_scheduled(&self) -> Scheduled {
            self.scheduled_rx.lock().await.recv().await.unwrap()
        }

        async fn commit_epoch(
            &self,
            _commit_info: crate::hummock::CommitEpochInfo,
        ) -> MetaResult<risingwave_pb::hummock::HummockVersionStats> {
            unimplemented!()
        }

        fn abort_and_mark_blocked(
            &self,
            _database_id: Option<DatabaseId>,
            _recovery_reason: crate::barrier::RecoveryReason,
        ) {
            unimplemented!()
        }

        fn mark_ready(&self, _options: MarkReadyOptions) {
            unimplemented!()
        }

        async fn post_collect_command<'a>(
            &'a self,
            _command: &'a crate::barrier::command::CommandContext,
        ) -> MetaResult<()> {
            unimplemented!()
        }

        async fn notify_creating_job_failed(&self, _database_id: Option<DatabaseId>, _err: String) {
            unimplemented!()
        }

        async fn finish_creating_job(
            &self,
            _job: crate::barrier::progress::TrackingJob,
        ) -> MetaResult<()> {
            unimplemented!()
        }

        async fn new_control_stream(
            &self,
            _node: &risingwave_pb::common::WorkerNode,
            _init_request: &risingwave_pb::stream_service::streaming_control_stream_request::PbInitRequest,
        ) -> MetaResult<risingwave_rpc_client::StreamingControlHandle> {
            unimplemented!()
        }

        async fn reload_runtime_info(
            &self,
        ) -> MetaResult<crate::barrier::BarrierWorkerRuntimeInfoSnapshot> {
            unimplemented!()
        }

        async fn reload_database_runtime_info(
            &self,
            _database_id: DatabaseId,
        ) -> MetaResult<Option<crate::barrier::DatabaseRuntimeInfoSnapshot>> {
            unimplemented!()
        }

        async fn handle_list_finished_source_ids(
            &self,
            _list_finished_source_ids: Vec<u32>,
        ) -> MetaResult<()> {
            unimplemented!()
        }

        async fn handle_load_finished_source_ids(
            &self,
            _load_finished_source_ids: Vec<u32>,
        ) -> MetaResult<()> {
            unimplemented!()
        }

        async fn finish_cdc_table_backfill(&self, _job_id: JobId) -> MetaResult<()> {
            unimplemented!()
        }

        async fn handle_refresh_finished_table_ids(
            &self,
            _refresh_finished_table_ids: Vec<JobId>,
        ) -> MetaResult<()> {
            unimplemented!()
        }
    }

    #[tokio::test]
    async fn test_next_barrier_with_different_intervals() {
        // Create databases with different intervals.
        let databases = vec![
            create_test_database(1, Some(50), Some(2)), // 50ms interval, checkpoint every 2
            create_test_database(2, Some(100), Some(3)), // 100ms interval, checkpoint every 3
            create_test_database(3, None, Some(5)), /* Use system default (200ms), checkpoint every 5 */
        ];

        let mut periodic = PeriodicBarriers::new(
            Duration::from_millis(200), // System default
            10,                         // System checkpoint frequency
            databases,
        );

        let (context, _tx) = MockGlobalBarrierWorkerContext::new();

        // The first tick of each timer fires immediately, so consume one barrier per database
        // first.
        for _ in 0..3 {
            let barrier = periodic.next_barrier(&context).await;
            assert!(barrier.command.is_none()); // Should be a periodic barrier, not a scheduled command
            assert!(!barrier.checkpoint); // First barrier shouldn't be a checkpoint
        }

        // Since we have 3 databases with intervals 50ms, 100ms, and 200ms,
        // the next barrier should come from database 1 (50ms interval).
        let start_time = Instant::now();
        let barrier = periodic.next_barrier(&context).await;
        let _elapsed = start_time.elapsed();

        // Verify the barrier properties.
        assert_eq!(barrier.database_id, DatabaseId::from(1));
        assert!(barrier.command.is_none()); // Should be a periodic barrier, not a scheduled command
        assert!(barrier.checkpoint); // Second barrier should be a checkpoint for database 1
        // TODO(zyx): unstable in ci, temporarily commented out
        // assert!(
        //     elapsed <= Duration::from_millis(100),
        //     "Elapsed time exceeded: {:?}",
        //     elapsed
        // ); // Should be around 50ms

        // Verify that the checkpoint frequency works.
        let db1_id = DatabaseId::from(1);
        let db1_state = periodic.databases.get_mut(&db1_id).unwrap();
        assert_eq!(db1_state.num_uncheckpointed_barrier, 0); // Should reset after checkpoint
    }

    #[tokio::test]
    async fn test_next_barrier_with_scheduled_command() {
        let databases = vec![
            create_test_database(1, Some(1000), Some(2)), // Long interval to avoid interference
        ];

        let mut periodic = PeriodicBarriers::new(Duration::from_millis(1000), 10, databases);

        let (context, tx) = MockGlobalBarrierWorkerContext::new();

        // Skip the first barrier to let the timers start.
        periodic.next_barrier(&context).await;

        // Schedule a command.
        let scheduled_command = Scheduled {
            database_id: DatabaseId::from(1),
            command: Command::Flush,
            notifiers: vec![],
            span: tracing::Span::none(),
        };

        // Send the scheduled command in the background.
        let tx_clone = tx.clone();
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(10)).await;
            tx_clone.send(scheduled_command).unwrap();
        });

        let barrier = periodic.next_barrier(&context).await;

        // Should return the scheduled command.
        assert!(barrier.command.is_some());
        assert_eq!(barrier.database_id, DatabaseId::from(1));

        if let Some((command, _)) = barrier.command {
            assert!(matches!(command, Command::Flush));
        }
    }

    #[tokio::test]
    async fn test_next_barrier_multiple_databases_timing() {
        let databases = vec![
            create_test_database(1, Some(30), Some(10)), // Fast interval
            create_test_database(2, Some(100), Some(10)), // Slower interval
        ];

        let mut periodic = PeriodicBarriers::new(Duration::from_millis(500), 10, databases);

        let (context, _tx) = MockGlobalBarrierWorkerContext::new();

        // Skip the first 2 barriers to let the timers start.
        for _ in 0..2 {
            periodic.next_barrier(&context).await;
        }

        let mut barrier_counts = HashMap::new();

        // Collect barriers for a short period.
        let mut barriers = Vec::new();
        for _ in 0..5 {
            let barrier = periodic.next_barrier(&context).await;
            barriers.push(barrier);
        }

        // Count barriers per database.
        for barrier in barriers {
            *barrier_counts.entry(barrier.database_id).or_insert(0) += 1;
        }

        // Database 1 (30ms interval) should have more barriers than database 2 (100ms interval).
        let db1_count = barrier_counts.get(&DatabaseId::from(1)).unwrap_or(&0);
        let db2_count = barrier_counts.get(&DatabaseId::from(2)).unwrap_or(&0);

        // Due to timing, db1 should generally have more barriers, but allow for some variance.
        assert!(*db1_count >= *db2_count);
    }

    #[tokio::test]
    async fn test_next_barrier_force_checkpoint() {
        let databases = vec![create_test_database(1, Some(100), Some(10))];

        let mut periodic = PeriodicBarriers::new(Duration::from_millis(100), 10, databases);

        let (context, _tx) = MockGlobalBarrierWorkerContext::new();

        // Force a checkpoint for the next barrier.
        periodic.force_checkpoint_in_next_barrier(DatabaseId::from(1));

        let barrier = periodic.next_barrier(&context).await;

        // Should be a checkpoint barrier due to force_checkpoint.
        assert!(barrier.checkpoint);
        assert_eq!(barrier.database_id, DatabaseId::from(1));
        assert!(barrier.command.is_none());
    }

    #[tokio::test]
    async fn test_next_barrier_checkpoint_frequency() {
        let databases = vec![create_test_database(1, Some(50), Some(2))]; // Checkpoint every 2 barriers

        let mut periodic = PeriodicBarriers::new(Duration::from_millis(50), 10, databases);

        let (context, _tx) = MockGlobalBarrierWorkerContext::new();

        // First barrier - should not be a checkpoint.
        let barrier1 = periodic.next_barrier(&context).await;
        assert!(!barrier1.checkpoint);

        // Second barrier - should be a checkpoint (frequency = 2).
        let barrier2 = periodic.next_barrier(&context).await;
        assert!(barrier2.checkpoint);

        // Third barrier - should not be a checkpoint (counter reset).
        let barrier3 = periodic.next_barrier(&context).await;
        assert!(!barrier3.checkpoint);
    }

    #[tokio::test]
    async fn test_update_database_barrier() {
        let databases = vec![create_test_database(1, Some(1000), Some(10))];

        let mut periodic = PeriodicBarriers::new(Duration::from_millis(500), 20, databases);

        // Update an existing database.
        periodic.update_database_barrier(DatabaseId::from(1), Some(2000), Some(15));

        let db_state = periodic.databases.get(&DatabaseId::from(1)).unwrap();
        assert_eq!(db_state.barrier_interval, Some(Duration::from_millis(2000)));
        assert_eq!(db_state.checkpoint_frequency, Some(15));
        assert_eq!(db_state.num_uncheckpointed_barrier, 0);
        assert!(!db_state.force_checkpoint);

        // Add a new database.
        periodic.update_database_barrier(DatabaseId::from(2), None, None);

        assert!(periodic.databases.contains_key(&DatabaseId::from(2)));
        let db2_state = periodic.databases.get(&DatabaseId::from(2)).unwrap();
        assert_eq!(db2_state.barrier_interval, None);
        assert_eq!(db2_state.checkpoint_frequency, None);
    }
}
1089}