risingwave_meta/barrier/
schedule.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet, VecDeque};
17use std::sync::Arc;
18
19use anyhow::{Context, anyhow};
20use assert_matches::assert_matches;
21use await_tree::InstrumentAwait;
22use itertools::Itertools;
23use parking_lot::Mutex;
24use prometheus::HistogramTimer;
25use risingwave_common::catalog::{DatabaseId, TableId};
26use risingwave_common::id::JobId;
27use risingwave_common::metrics::LabelGuardedHistogram;
28use risingwave_hummock_sdk::HummockVersionId;
29use risingwave_pb::catalog::Database;
30use rw_futures_util::pending_on_none;
31use tokio::select;
32use tokio::sync::{oneshot, watch};
33use tokio::time::{Duration, Instant};
34use tokio_stream::wrappers::IntervalStream;
35use tokio_stream::{StreamExt, StreamMap};
36use tracing::{info, warn};
37
38use super::notifier::Notifier;
39use super::{Command, Scheduled};
40use crate::barrier::context::GlobalBarrierWorkerContext;
41use crate::hummock::HummockManagerRef;
42use crate::rpc::metrics::{GLOBAL_META_METRICS, MetaMetrics};
43use crate::{MetaError, MetaResult};
44
45pub(super) struct NewBarrier {
46    pub database_id: DatabaseId,
47    pub command: Option<(Command, Vec<Notifier>)>,
48    pub span: tracing::Span,
49    pub checkpoint: bool,
50}
51
52/// A queue for scheduling barriers.
53///
54/// We manually implement one here instead of using channels since we may need to update the front
55/// of the queue to add some notifiers for instant flushes.
56struct Inner {
57    queue: Mutex<ScheduledQueue>,
58
59    /// When `queue` is not empty anymore, all subscribers of this watcher will be notified.
60    changed_tx: watch::Sender<()>,
61
62    /// Used for recording send latency of each barrier.
63    metrics: Arc<MetaMetrics>,
64}
65
/// Whether a queue currently accepts new commands.
#[derive(Debug)]
enum QueueStatus {
    /// New commands are accepted.
    Ready,
    /// New commands are rejected (drop commands excepted, see `validate_item`); the payload is
    /// the human-readable reason reported to the rejected caller.
    Blocked(String),
}

impl QueueStatus {
    /// Returns `true` iff the status is [`QueueStatus::Blocked`].
    fn is_blocked(&self) -> bool {
        match self {
            Self::Ready => false,
            Self::Blocked(_) => true,
        }
    }
}
79
80struct ScheduledQueueItem {
81    command: Command,
82    notifiers: Vec<Notifier>,
83    send_latency_timer: HistogramTimer,
84    span: tracing::Span,
85}
86
87struct StatusQueue<T> {
88    queue: T,
89    status: QueueStatus,
90}
91
92struct DatabaseQueue {
93    inner: VecDeque<ScheduledQueueItem>,
94    send_latency: LabelGuardedHistogram,
95}
96
97type DatabaseScheduledQueue = StatusQueue<DatabaseQueue>;
98type ScheduledQueue = StatusQueue<HashMap<DatabaseId, DatabaseScheduledQueue>>;
99
100impl DatabaseScheduledQueue {
101    fn new(database_id: DatabaseId, metrics: &MetaMetrics, status: QueueStatus) -> Self {
102        Self {
103            queue: DatabaseQueue {
104                inner: Default::default(),
105                send_latency: metrics
106                    .barrier_send_latency
107                    .with_guarded_label_values(&[database_id.to_string().as_str()]),
108            },
109            status,
110        }
111    }
112}
113
114impl<T> StatusQueue<T> {
115    fn mark_blocked(&mut self, reason: String) {
116        self.status = QueueStatus::Blocked(reason);
117    }
118
119    fn mark_ready(&mut self) -> bool {
120        let prev_blocked = self.status.is_blocked();
121        self.status = QueueStatus::Ready;
122        prev_blocked
123    }
124
125    fn validate_item(&mut self, command: &Command) -> MetaResult<()> {
126        // We don't allow any command to be scheduled when the queue is blocked, except for dropping streaming jobs.
127        // Because we allow dropping streaming jobs when the cluster is under recovery, so we have to buffer the drop
128        // command and execute it when the cluster is ready to clean up it.
129        // TODO: this is just a workaround to allow dropping streaming jobs when the cluster is under recovery,
130        // we need to refine it when catalog and streaming metadata can be handled in a transactional way.
131        if let QueueStatus::Blocked(reason) = &self.status
132            && !matches!(
133                command,
134                Command::DropStreamingJobs { .. } | Command::DropSubscription { .. }
135            )
136        {
137            return Err(MetaError::unavailable(reason));
138        }
139        Ok(())
140    }
141}
142
143fn tracing_span() -> tracing::Span {
144    if tracing::Span::current().is_none() {
145        tracing::Span::none()
146    } else {
147        tracing::info_span!(
148            "barrier",
149            checkpoint = tracing::field::Empty,
150            epoch = tracing::field::Empty
151        )
152    }
153}
154
155/// The sender side of the barrier scheduling queue.
156/// Can be cloned and held by other managers to schedule and run barriers.
157#[derive(Clone)]
158pub struct BarrierScheduler {
159    inner: Arc<Inner>,
160
161    /// Used for getting the latest snapshot after `FLUSH`.
162    hummock_manager: HummockManagerRef,
163}
164
165impl BarrierScheduler {
166    /// Create a pair of [`BarrierScheduler`] and [`ScheduledBarriers`], for scheduling barriers
167    /// from different managers, and executing them in the barrier manager, respectively.
168    pub fn new_pair(
169        hummock_manager: HummockManagerRef,
170        metrics: Arc<MetaMetrics>,
171    ) -> (Self, ScheduledBarriers) {
172        let inner = Arc::new(Inner {
173            queue: Mutex::new(ScheduledQueue {
174                queue: Default::default(),
175                status: QueueStatus::Ready,
176            }),
177            changed_tx: watch::channel(()).0,
178            metrics,
179        });
180
181        (
182            Self {
183                inner: inner.clone(),
184                hummock_manager,
185            },
186            ScheduledBarriers { inner },
187        )
188    }
189
190    /// Push a scheduled barrier into the queue.
191    fn push(
192        &self,
193        database_id: DatabaseId,
194        scheduleds: impl IntoIterator<Item = (Command, Notifier)>,
195    ) -> MetaResult<()> {
196        let mut queue = self.inner.queue.lock();
197        let scheduleds = scheduleds.into_iter().collect_vec();
198        scheduleds
199            .iter()
200            .try_for_each(|(command, _)| queue.validate_item(command))?;
201        let queue = queue.queue.entry(database_id).or_insert_with(|| {
202            DatabaseScheduledQueue::new(database_id, &self.inner.metrics, QueueStatus::Ready)
203        });
204        scheduleds
205            .iter()
206            .try_for_each(|(command, _)| queue.validate_item(command))?;
207        for (command, notifier) in scheduleds {
208            queue.queue.inner.push_back(ScheduledQueueItem {
209                command,
210                notifiers: vec![notifier],
211                send_latency_timer: queue.queue.send_latency.start_timer(),
212                span: tracing_span(),
213            });
214            if queue.queue.inner.len() == 1 {
215                self.inner.changed_tx.send(()).ok();
216            }
217        }
218        Ok(())
219    }
220
221    /// Try to cancel scheduled cmd for create streaming job, return true if the command exists previously and get cancelled.
222    pub fn try_cancel_scheduled_create(&self, database_id: DatabaseId, job_id: JobId) -> bool {
223        let queue = &mut self.inner.queue.lock();
224        let Some(queue) = queue.queue.get_mut(&database_id) else {
225            return false;
226        };
227
228        if let Some(idx) = queue.queue.inner.iter().position(|scheduled| {
229            if let Command::CreateStreamingJob { info, .. } = &scheduled.command
230                && info.stream_job_fragments.stream_job_id() == job_id
231            {
232                true
233            } else {
234                false
235            }
236        }) {
237            queue.queue.inner.remove(idx).unwrap();
238            true
239        } else {
240            false
241        }
242    }
243
244    /// Run multiple commands and return when they're all completely finished (i.e., collected). It's ensured that
245    /// multiple commands are executed continuously.
246    ///
247    /// Returns the barrier info of each command.
248    ///
249    /// TODO: atomicity of multiple commands is not guaranteed.
250    #[await_tree::instrument("run_commands({})", commands.iter().join(", "))]
251    async fn run_multiple_commands(
252        &self,
253        database_id: DatabaseId,
254        commands: Vec<Command>,
255    ) -> MetaResult<()> {
256        let mut contexts = Vec::with_capacity(commands.len());
257        let mut scheduleds = Vec::with_capacity(commands.len());
258
259        for command in commands {
260            let (started_tx, started_rx) = oneshot::channel();
261            let (collect_tx, collect_rx) = oneshot::channel();
262
263            contexts.push((started_rx, collect_rx));
264            scheduleds.push((
265                command,
266                Notifier {
267                    started: Some(started_tx),
268                    collected: Some(collect_tx),
269                },
270            ));
271        }
272
273        self.push(database_id, scheduleds)?;
274
275        for (injected_rx, collect_rx) in contexts {
276            // Wait for this command to be injected, and record the result.
277            tracing::trace!("waiting for injected_rx");
278            injected_rx
279                .instrument_await("wait_injected")
280                .await
281                .ok()
282                .context("failed to inject barrier")??;
283
284            tracing::trace!("waiting for collect_rx");
285            // Throw the error if it occurs when collecting this barrier.
286            collect_rx
287                .instrument_await("wait_collected")
288                .await
289                .ok()
290                .context("failed to collect barrier")??;
291        }
292
293        Ok(())
294    }
295
296    /// Run a command and return when it's completely finished (i.e., collected).
297    ///
298    /// Returns the barrier info of the actual command.
299    pub async fn run_command(&self, database_id: DatabaseId, command: Command) -> MetaResult<()> {
300        tracing::trace!("run_command: {:?}", command);
301        let ret = self.run_multiple_commands(database_id, vec![command]).await;
302        tracing::trace!("run_command finished");
303        ret
304    }
305
306    /// Schedule a command without waiting for it to be executed.
307    pub fn run_command_no_wait(&self, database_id: DatabaseId, command: Command) -> MetaResult<()> {
308        tracing::trace!("run_command_no_wait: {:?}", command);
309        self.push(database_id, vec![(command, Notifier::default())])
310    }
311
312    /// Flush means waiting for the next barrier to collect.
313    pub async fn flush(&self, database_id: DatabaseId) -> MetaResult<HummockVersionId> {
314        let start = Instant::now();
315
316        tracing::debug!("start barrier flush");
317        self.run_multiple_commands(database_id, vec![Command::Flush])
318            .await?;
319
320        let elapsed = Instant::now().duration_since(start);
321        tracing::debug!("barrier flushed in {:?}", elapsed);
322
323        let version_id = self.hummock_manager.get_version_id().await;
324        Ok(version_id)
325    }
326}
327
328/// The receiver side of the barrier scheduling queue.
329pub struct ScheduledBarriers {
330    inner: Arc<Inner>,
331}
332
/// State specific to each database for barrier generation.
///
/// `None` in `barrier_interval` / `checkpoint_frequency` means the system-wide default applies.
#[derive(Debug)]
pub struct DatabaseBarrierState {
    barrier_interval: Option<Duration>,
    checkpoint_frequency: Option<u64>,
    /// Number of non-checkpoint barriers injected since the last checkpoint barrier.
    num_uncheckpointed_barrier: u64,
}

impl DatabaseBarrierState {
    /// Builds the state from the per-database overrides, starting with no uncheckpointed
    /// barriers.
    fn new(barrier_interval_ms: Option<u32>, checkpoint_frequency: Option<u64>) -> Self {
        let barrier_interval = barrier_interval_ms
            .map(u64::from)
            .map(Duration::from_millis);
        Self {
            num_uncheckpointed_barrier: 0,
            checkpoint_frequency,
            barrier_interval,
        }
    }
}
351
352/// Held by the [`crate::barrier::worker::GlobalBarrierWorker`] to execute these commands.
353#[derive(Default, Debug)]
354pub struct PeriodicBarriers {
355    /// Default system params for barrier interval and checkpoint frequency.
356    sys_barrier_interval: Duration,
357    sys_checkpoint_frequency: u64,
358    /// Per-database state.
359    databases: HashMap<DatabaseId, DatabaseBarrierState>,
360    /// Holds `IntervalStream` for each database, keyed by `DatabaseId`.
361    /// `StreamMap` will yield `(DatabaseId, Instant)` when a timer ticks.
362    timer_streams: StreamMap<DatabaseId, IntervalStream>,
363    force_checkpoint_databases: HashSet<DatabaseId>,
364}
365
366impl PeriodicBarriers {
367    pub(super) fn new(
368        sys_barrier_interval: Duration,
369        sys_checkpoint_frequency: u64,
370        database_infos: Vec<Database>,
371    ) -> Self {
372        let mut databases = HashMap::with_capacity(database_infos.len());
373        let mut timer_streams = StreamMap::with_capacity(database_infos.len());
374        database_infos.into_iter().for_each(|database| {
375            let database_id: DatabaseId = database.id;
376            let barrier_interval_ms = database.barrier_interval_ms;
377            let checkpoint_frequency = database.checkpoint_frequency;
378            databases.insert(
379                database_id,
380                DatabaseBarrierState::new(barrier_interval_ms, checkpoint_frequency),
381            );
382            let duration = if let Some(ms) = barrier_interval_ms {
383                Duration::from_millis(ms as u64)
384            } else {
385                sys_barrier_interval
386            };
387
388            // Create an `IntervalStream` for the database with the specified interval.
389            let interval_stream = Self::new_interval_stream(duration, &database_id);
390            timer_streams.insert(database_id, interval_stream);
391        });
392        Self {
393            sys_barrier_interval,
394            sys_checkpoint_frequency,
395            databases,
396            timer_streams,
397            force_checkpoint_databases: Default::default(),
398        }
399    }
400
401    // Create a new interval stream with the specified duration.
402    fn new_interval_stream(duration: Duration, database_id: &DatabaseId) -> IntervalStream {
403        GLOBAL_META_METRICS
404            .barrier_interval_by_database
405            .with_label_values(&[&database_id.to_string()])
406            .set(duration.as_millis_f64());
407        let mut interval = tokio::time::interval(duration);
408        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
409        IntervalStream::new(interval)
410    }
411
412    /// Update the system barrier interval.
413    pub(super) fn set_sys_barrier_interval(&mut self, duration: Duration) {
414        if self.sys_barrier_interval == duration {
415            return;
416        }
417        self.sys_barrier_interval = duration;
418        // Reset the `IntervalStream` for all databases that use default param.
419        for (db_id, db_state) in &mut self.databases {
420            if db_state.barrier_interval.is_none() {
421                let interval_stream = Self::new_interval_stream(duration, db_id);
422                self.timer_streams.insert(*db_id, interval_stream);
423            }
424        }
425    }
426
427    /// Update the system checkpoint frequency.
428    pub fn set_sys_checkpoint_frequency(&mut self, frequency: u64) {
429        if self.sys_checkpoint_frequency == frequency {
430            return;
431        }
432        self.sys_checkpoint_frequency = frequency;
433        // Reset the `num_uncheckpointed_barrier` for all databases that use default param.
434        for db_state in self.databases.values_mut() {
435            if db_state.checkpoint_frequency.is_none() {
436                db_state.num_uncheckpointed_barrier = 0;
437            }
438        }
439    }
440
441    pub(super) fn update_database_barrier(
442        &mut self,
443        database_id: DatabaseId,
444        barrier_interval_ms: Option<u32>,
445        checkpoint_frequency: Option<u64>,
446    ) {
447        match self.databases.entry(database_id) {
448            Entry::Occupied(mut entry) => {
449                let db_state = entry.get_mut();
450                db_state.barrier_interval =
451                    barrier_interval_ms.map(|ms| Duration::from_millis(ms as u64));
452                db_state.checkpoint_frequency = checkpoint_frequency;
453                // Reset the `num_uncheckpointed_barrier` since the barrier interval or checkpoint frequency is changed.
454                db_state.num_uncheckpointed_barrier = 0;
455            }
456            Entry::Vacant(entry) => {
457                entry.insert(DatabaseBarrierState::new(
458                    barrier_interval_ms,
459                    checkpoint_frequency,
460                ));
461            }
462        }
463
464        // If the database already has a timer stream, reset it with the new interval.
465        let duration = if let Some(ms) = barrier_interval_ms {
466            Duration::from_millis(ms as u64)
467        } else {
468            self.sys_barrier_interval
469        };
470
471        let interval_stream = Self::new_interval_stream(duration, &database_id);
472        self.timer_streams.insert(database_id, interval_stream);
473    }
474
475    /// Make the `checkpoint` of the next barrier must be true.
476    pub fn force_checkpoint_in_next_barrier(&mut self, database_id: DatabaseId) {
477        if self.databases.contains_key(&database_id) {
478            self.force_checkpoint_databases.insert(database_id);
479        } else {
480            warn!(
481                ?database_id,
482                "force checkpoint in next barrier for non-existing database"
483            );
484        }
485    }
486
487    fn reset_database_timer(&mut self, database_id: DatabaseId) {
488        // Check if the database exists.
489        assert!(
490            self.databases.contains_key(&database_id),
491            "database {} not found in scheduled barriers",
492            database_id
493        );
494        assert!(
495            self.timer_streams.contains_key(&database_id),
496            "timer stream for database {} not found in scheduled barriers",
497            database_id
498        );
499        // New command will trigger the barriers, so reset the timer for the specific database.
500        for (db_id, timer_stream) in self.timer_streams.iter_mut() {
501            if *db_id == database_id {
502                timer_stream.as_mut().reset();
503            }
504        }
505    }
506
507    #[await_tree::instrument]
508    pub(super) async fn next_barrier(
509        &mut self,
510        context: &impl GlobalBarrierWorkerContext,
511    ) -> NewBarrier {
512        let force_checkpoint_database = self.force_checkpoint_databases.drain().next();
513        let new_barrier = if let Some(database_id) = force_checkpoint_database {
514            self.reset_database_timer(database_id);
515            NewBarrier {
516                database_id,
517                command: None,
518                span: tracing_span(),
519                checkpoint: true,
520            }
521        } else {
522            select! {
523                biased;
524                scheduled = context.next_scheduled() => {
525                    let database_id = scheduled.database_id;
526                    self.reset_database_timer(database_id);
527                    let checkpoint = scheduled.command.need_checkpoint() || self.try_get_checkpoint(database_id);
528                    NewBarrier {
529                        database_id: scheduled.database_id,
530                        command: Some((scheduled.command, scheduled.notifiers)),
531                        span: scheduled.span,
532                        checkpoint,
533                    }
534                },
535                // If there is no database, we won't wait for `Interval`, but only wait for command.
536                // Normally it will not return None, because there is always at least one database.
537                (database_id, _instant) = pending_on_none(self.timer_streams.next()) => {
538                    let checkpoint = self.try_get_checkpoint(database_id);
539                    NewBarrier {
540                        database_id,
541                        command: None,
542                        span: tracing_span(),
543                        checkpoint,
544                    }
545                }
546            }
547        };
548        self.update_num_uncheckpointed_barrier(new_barrier.database_id, new_barrier.checkpoint);
549
550        new_barrier
551    }
552
553    /// Whether the barrier(checkpoint = true) should be injected.
554    fn try_get_checkpoint(&self, database_id: DatabaseId) -> bool {
555        let db_state = self.databases.get(&database_id).unwrap();
556        let checkpoint_frequency = db_state
557            .checkpoint_frequency
558            .unwrap_or(self.sys_checkpoint_frequency);
559        db_state.num_uncheckpointed_barrier + 1 >= checkpoint_frequency
560    }
561
562    /// Update the `num_uncheckpointed_barrier`
563    fn update_num_uncheckpointed_barrier(&mut self, database_id: DatabaseId, checkpoint: bool) {
564        let db_state = self.databases.get_mut(&database_id).unwrap();
565        if checkpoint {
566            db_state.num_uncheckpointed_barrier = 0;
567        } else {
568            db_state.num_uncheckpointed_barrier += 1;
569        }
570    }
571}
572
573impl ScheduledBarriers {
574    pub(super) async fn next_scheduled(&self) -> Scheduled {
575        'outer: loop {
576            let mut rx = self.inner.changed_tx.subscribe();
577            {
578                let mut queue = self.inner.queue.lock();
579                if queue.status.is_blocked() {
580                    continue;
581                }
582                for (database_id, queue) in &mut queue.queue {
583                    if queue.status.is_blocked() {
584                        continue;
585                    }
586                    if let Some(item) = queue.queue.inner.pop_front() {
587                        item.send_latency_timer.observe_duration();
588                        break 'outer Scheduled {
589                            database_id: *database_id,
590                            command: item.command,
591                            notifiers: item.notifiers,
592                            span: item.span,
593                        };
594                    }
595                }
596            }
597            rx.changed().await.unwrap();
598        }
599    }
600}
601
602pub(super) enum MarkReadyOptions {
603    Database(DatabaseId),
604    Global {
605        blocked_databases: HashSet<DatabaseId>,
606    },
607}
608
609impl ScheduledBarriers {
610    /// Pre buffered drop and cancel command, return all dropped state tables if any.
611    pub(super) fn pre_apply_drop_cancel(&self, database_id: Option<DatabaseId>) -> Vec<TableId> {
612        self.pre_apply_drop_cancel_scheduled(database_id)
613    }
614
615    /// Mark command scheduler as blocked and abort all queued scheduled command and notify with
616    /// specific reason.
617    pub(super) fn abort_and_mark_blocked(
618        &self,
619        database_id: Option<DatabaseId>,
620        reason: impl Into<String>,
621    ) {
622        let mut queue = self.inner.queue.lock();
623        fn database_blocked_reason(database_id: DatabaseId, reason: &String) -> String {
624            format!("database {} unavailable {}", database_id, reason)
625        }
626        fn mark_blocked_and_notify_failed(
627            database_id: DatabaseId,
628            queue: &mut DatabaseScheduledQueue,
629            reason: &String,
630        ) {
631            let reason = database_blocked_reason(database_id, reason);
632            let err: MetaError = anyhow!("{}", reason).into();
633            queue.mark_blocked(reason);
634            while let Some(ScheduledQueueItem { notifiers, .. }) = queue.queue.inner.pop_front() {
635                notifiers
636                    .into_iter()
637                    .for_each(|notify| notify.notify_collection_failed(err.clone()))
638            }
639        }
640        if let Some(database_id) = database_id {
641            let reason = reason.into();
642            match queue.queue.entry(database_id) {
643                Entry::Occupied(entry) => {
644                    let queue = entry.into_mut();
645                    if queue.status.is_blocked() {
646                        if cfg!(debug_assertions) {
647                            panic!("database {} marked as blocked twice", database_id);
648                        } else {
649                            warn!(?database_id, "database marked as blocked twice");
650                        }
651                    }
652                    info!(?database_id, "database marked as blocked");
653                    mark_blocked_and_notify_failed(database_id, queue, &reason);
654                }
655                Entry::Vacant(entry) => {
656                    entry.insert(DatabaseScheduledQueue::new(
657                        database_id,
658                        &self.inner.metrics,
659                        QueueStatus::Blocked(database_blocked_reason(database_id, &reason)),
660                    ));
661                }
662            }
663        } else {
664            let reason = reason.into();
665            if queue.status.is_blocked() {
666                if cfg!(debug_assertions) {
667                    panic!("cluster marked as blocked twice");
668                } else {
669                    warn!("cluster marked as blocked twice");
670                }
671            }
672            info!("cluster marked as blocked");
673            queue.mark_blocked(reason.clone());
674            for (database_id, queue) in &mut queue.queue {
675                mark_blocked_and_notify_failed(*database_id, queue, &reason);
676            }
677        }
678    }
679
680    /// Mark command scheduler as ready to accept new command.
681    pub(super) fn mark_ready(&self, options: MarkReadyOptions) {
682        let mut queue = self.inner.queue.lock();
683        let queue = &mut *queue;
684        match options {
685            MarkReadyOptions::Database(database_id) => {
686                info!(?database_id, "database marked as ready");
687                let database_queue = queue.queue.entry(database_id).or_insert_with(|| {
688                    DatabaseScheduledQueue::new(
689                        database_id,
690                        &self.inner.metrics,
691                        QueueStatus::Ready,
692                    )
693                });
694                if !database_queue.status.is_blocked() {
695                    if cfg!(debug_assertions) {
696                        panic!("database {} marked as ready twice", database_id);
697                    } else {
698                        warn!(?database_id, "database marked as ready twice");
699                    }
700                }
701                if database_queue.mark_ready()
702                    && !queue.status.is_blocked()
703                    && !database_queue.queue.inner.is_empty()
704                {
705                    self.inner.changed_tx.send(()).ok();
706                }
707            }
708            MarkReadyOptions::Global { blocked_databases } => {
709                if !queue.status.is_blocked() {
710                    if cfg!(debug_assertions) {
711                        panic!("cluster marked as ready twice");
712                    } else {
713                        warn!("cluster marked as ready twice");
714                    }
715                }
716                info!(?blocked_databases, "cluster marked as ready");
717                let prev_blocked = queue.mark_ready();
718                for database_id in &blocked_databases {
719                    queue.queue.entry(*database_id).or_insert_with(|| {
720                        DatabaseScheduledQueue::new(
721                            *database_id,
722                            &self.inner.metrics,
723                            QueueStatus::Blocked(format!(
724                                "database {} failed to recover in global recovery",
725                                database_id
726                            )),
727                        )
728                    });
729                }
730                for (database_id, queue) in &mut queue.queue {
731                    if !blocked_databases.contains(database_id) {
732                        queue.mark_ready();
733                    }
734                }
735                if prev_blocked
736                    && queue
737                        .queue
738                        .values()
739                        .any(|database_queue| !database_queue.queue.inner.is_empty())
740                {
741                    self.inner.changed_tx.send(()).ok();
742                }
743            }
744        }
745    }
746
747    /// Try to pre apply drop and cancel scheduled command and return all dropped state tables if any.
748    /// It should only be called in recovery.
749    pub(super) fn pre_apply_drop_cancel_scheduled(
750        &self,
751        database_id: Option<DatabaseId>,
752    ) -> Vec<TableId> {
753        let mut queue = self.inner.queue.lock();
754        let mut dropped_tables = vec![];
755
756        let mut pre_apply_drop_cancel = |queue: &mut DatabaseScheduledQueue| {
757            while let Some(ScheduledQueueItem {
758                notifiers, command, ..
759            }) = queue.queue.inner.pop_front()
760            {
761                match command {
762                    Command::DropStreamingJobs {
763                        unregistered_state_table_ids,
764                        ..
765                    } => {
766                        dropped_tables.extend(unregistered_state_table_ids);
767                    }
768                    Command::DropSubscription { .. } => {}
769                    _ => {
770                        unreachable!("only drop and cancel streaming jobs should be buffered");
771                    }
772                }
773                notifiers.into_iter().for_each(|notify| {
774                    notify.notify_collected();
775                });
776            }
777        };
778
779        if let Some(database_id) = database_id {
780            assert_matches!(queue.status, QueueStatus::Ready);
781            if let Some(queue) = queue.queue.get_mut(&database_id) {
782                assert_matches!(queue.status, QueueStatus::Blocked(_));
783                pre_apply_drop_cancel(queue);
784            }
785        } else {
786            assert_matches!(queue.status, QueueStatus::Blocked(_));
787            for queue in queue.queue.values_mut() {
788                pre_apply_drop_cancel(queue);
789            }
790        }
791
792        dropped_tables
793    }
794}
795
#[cfg(test)]
mod tests {
    // Unit tests for `PeriodicBarriers`: per-database barrier intervals, checkpoint
    // frequency, forced checkpoints, scheduled commands and runtime reconfiguration.
    // Timing-sensitive tests run on tokio's paused clock so elapsed times are exact.
    use futures::FutureExt;

    use super::*;

    /// Build a catalog `Database` with the given id and optional per-database barrier
    /// interval (ms) and checkpoint frequency; other fields take their defaults.
    fn create_test_database(
        id: u32,
        barrier_interval_ms: Option<u32>,
        checkpoint_frequency: Option<u64>,
    ) -> Database {
        Database {
            id: id.into(),
            name: format!("test_db_{}", id),
            barrier_interval_ms,
            checkpoint_frequency,
            ..Default::default()
        }
    }

    /// Mock context for testing `next_barrier`.
    ///
    /// Only `next_scheduled` is implemented: it yields whatever `Scheduled` commands are
    /// sent through the sender returned by `new`. Every other method is unimplemented and
    /// must not be reached by these tests.
    struct MockGlobalBarrierWorkerContext {
        scheduled_rx: tokio::sync::Mutex<tokio::sync::mpsc::UnboundedReceiver<Scheduled>>,
    }

    impl MockGlobalBarrierWorkerContext {
        /// Create the mock together with the sender used to inject scheduled commands.
        /// Keep the sender alive: `next_scheduled` unwraps `recv()`, which returns `None`
        /// once every sender has been dropped.
        fn new() -> (Self, tokio::sync::mpsc::UnboundedSender<Scheduled>) {
            let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
            (
                Self {
                    scheduled_rx: tokio::sync::Mutex::new(rx),
                },
                tx,
            )
        }
    }

    impl GlobalBarrierWorkerContext for MockGlobalBarrierWorkerContext {
        async fn next_scheduled(&self) -> Scheduled {
            self.scheduled_rx.lock().await.recv().await.unwrap()
        }

        async fn commit_epoch(
            &self,
            _commit_info: crate::hummock::CommitEpochInfo,
        ) -> MetaResult<risingwave_pb::hummock::HummockVersionStats> {
            unimplemented!()
        }

        fn abort_and_mark_blocked(
            &self,
            _database_id: Option<DatabaseId>,
            _recovery_reason: crate::barrier::RecoveryReason,
        ) {
            unimplemented!()
        }

        fn mark_ready(&self, _options: MarkReadyOptions) {
            unimplemented!()
        }

        async fn post_collect_command<'a>(
            &'a self,
            _command: &'a crate::barrier::command::CommandContext,
        ) -> MetaResult<()> {
            unimplemented!()
        }

        async fn notify_creating_job_failed(&self, _database_id: Option<DatabaseId>, _err: String) {
            unimplemented!()
        }

        async fn finish_creating_job(
            &self,
            _job: crate::barrier::progress::TrackingJob,
        ) -> MetaResult<()> {
            unimplemented!()
        }

        async fn new_control_stream(
            &self,
            _node: &risingwave_pb::common::WorkerNode,
            _init_request: &risingwave_pb::stream_service::streaming_control_stream_request::PbInitRequest,
        ) -> MetaResult<risingwave_rpc_client::StreamingControlHandle> {
            unimplemented!()
        }

        async fn reload_runtime_info(
            &self,
        ) -> MetaResult<crate::barrier::BarrierWorkerRuntimeInfoSnapshot> {
            unimplemented!()
        }

        async fn reload_database_runtime_info(
            &self,
            _database_id: DatabaseId,
        ) -> MetaResult<Option<crate::barrier::DatabaseRuntimeInfoSnapshot>> {
            unimplemented!()
        }

        async fn handle_list_finished_source_ids(
            &self,
            _list_finished_source_ids: Vec<
                risingwave_pb::stream_service::barrier_complete_response::PbListFinishedSource,
            >,
        ) -> MetaResult<()> {
            unimplemented!()
        }

        async fn handle_load_finished_source_ids(
            &self,
            _load_finished_source_ids: Vec<
                risingwave_pb::stream_service::barrier_complete_response::PbLoadFinishedSource,
            >,
        ) -> MetaResult<()> {
            unimplemented!()
        }

        async fn finish_cdc_table_backfill(&self, _job_id: JobId) -> MetaResult<()> {
            unimplemented!()
        }

        async fn handle_refresh_finished_table_ids(
            &self,
            _refresh_finished_table_ids: Vec<JobId>,
        ) -> MetaResult<()> {
            unimplemented!()
        }
    }

    #[tokio::test(start_paused = true)]
    async fn test_next_barrier_with_different_intervals() {
        // Create databases with different intervals
        let databases = vec![
            create_test_database(1, Some(50), Some(2)), // 50ms interval, checkpoint every 2
            create_test_database(2, Some(100), Some(3)), // 100ms interval, checkpoint every 3
            create_test_database(3, None, Some(5)), /* Use system default (200ms), checkpoint every 5 */
        ];

        let mut periodic = PeriodicBarriers::new(
            Duration::from_millis(200), // System default
            10,                         // System checkpoint frequency
            databases,
        );

        let (context, _tx) = MockGlobalBarrierWorkerContext::new();

        // Call next_barrier once per database, because the first tick of every interval is
        // returned immediately.
        for _ in 0..3 {
            let barrier = periodic.next_barrier(&context).await;
            assert!(barrier.command.is_none()); // Should be a periodic barrier, not a scheduled command
            assert!(!barrier.checkpoint); // First barrier shouldn't be a checkpoint
        }

        // Since we have 3 databases with intervals 50ms, 100ms, and 200ms,
        // the next barrier should come from database 1 (50ms interval).
        let start_time = Instant::now();
        let barrier = periodic.next_barrier(&context).await;
        let elapsed = start_time.elapsed();

        // Verify the barrier properties
        assert_eq!(barrier.database_id, DatabaseId::from(1));
        assert!(barrier.command.is_none()); // Should be a periodic barrier, not a scheduled command
        assert!(barrier.checkpoint); // Second barrier should be checkpoint for database 1
        // Use tokio's time pause mechanism, so it will be exactly 50ms here.
        assert_eq!(
            elapsed,
            Duration::from_millis(50),
            "unexpected elapsed time: {:?}",
            elapsed
        );

        // Verify that the checkpoint frequency works: the counter resets after a checkpoint.
        let db1_state = periodic.databases.get(&DatabaseId::from(1)).unwrap();
        assert_eq!(db1_state.num_uncheckpointed_barrier, 0);

        // At 100ms both database 1 (3rd tick) and database 2 (2nd tick) fire; neither has
        // reached its checkpoint frequency yet.
        for _ in 0..2 {
            let barrier = periodic.next_barrier(&context).await;
            assert!(barrier.command.is_none()); // Should be a periodic barrier, not a scheduled command
            assert!(!barrier.checkpoint); // Next two barriers shouldn't be checkpoints
        }

        let elapsed = start_time.elapsed();

        assert_eq!(
            elapsed,
            Duration::from_millis(100),
            "unexpected elapsed time: {:?}",
            elapsed
        );
    }

    #[tokio::test(start_paused = true)]
    async fn test_next_barrier_with_scheduled_command() {
        let databases = vec![
            create_test_database(1, Some(1000), Some(2)), // Long interval to avoid interference
        ];

        let mut periodic = PeriodicBarriers::new(Duration::from_millis(1000), 10, databases);

        let (context, tx) = MockGlobalBarrierWorkerContext::new();

        // Skip the first barrier to let the timers start
        periodic.next_barrier(&context).await;

        // Schedule a command
        let scheduled_command = Scheduled {
            database_id: DatabaseId::from(1),
            command: Command::Flush,
            notifiers: vec![],
            span: tracing::Span::none(),
        };

        // Send the scheduled command from a background task well before the next 1000ms
        // periodic tick (virtual time, since the clock is paused).
        let tx_clone = tx.clone();
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(10)).await;
            tx_clone.send(scheduled_command).unwrap();
        });

        let barrier = periodic.next_barrier(&context).await;

        // Should return the scheduled command
        assert_eq!(barrier.database_id, DatabaseId::from(1));
        let (command, _) = barrier
            .command
            .expect("barrier should carry the scheduled command");
        assert!(matches!(command, Command::Flush));
    }

    #[tokio::test(start_paused = true)]
    async fn test_next_barrier_multiple_databases_timing() {
        let databases = vec![
            create_test_database(1, Some(30), Some(10)), // Fast interval
            create_test_database(2, Some(100), Some(10)), // Slower interval
        ];

        let mut periodic = PeriodicBarriers::new(Duration::from_millis(500), 10, databases);

        let (context, _tx) = MockGlobalBarrierWorkerContext::new();

        // Skip first 2 barriers (one immediate tick per database) to let the timers start
        for _ in 0..2 {
            periodic.next_barrier(&context).await;
        }

        // Count the next 5 barriers per database.
        let mut barrier_counts: HashMap<DatabaseId, usize> = HashMap::new();
        for _ in 0..5 {
            let barrier = periodic.next_barrier(&context).await;
            *barrier_counts.entry(barrier.database_id).or_default() += 1;
        }

        let db1_count = barrier_counts.get(&DatabaseId::from(1)).unwrap_or(&0);
        let db2_count = barrier_counts.get(&DatabaseId::from(2)).unwrap_or(&0);

        // With the paused clock the schedule is deterministic: database 1 (30ms) ticks at
        // 30/60/90/120ms and database 2 (100ms) once at 100ms.
        assert_eq!(*db1_count, 4);
        assert_eq!(*db2_count, 1);
    }

    #[tokio::test]
    async fn test_next_barrier_force_checkpoint() {
        let databases = vec![create_test_database(1, Some(100), Some(10))];

        let mut periodic = PeriodicBarriers::new(Duration::from_millis(100), 10, databases);

        let (context, _tx) = MockGlobalBarrierWorkerContext::new();

        // Force checkpoint for next barrier
        periodic.force_checkpoint_in_next_barrier(DatabaseId::from(1));

        // The first tick is immediate, so the barrier must be ready without awaiting.
        let barrier = periodic.next_barrier(&context).now_or_never().unwrap();

        // Should be a checkpoint barrier due to force_checkpoint
        assert!(barrier.checkpoint);
        assert_eq!(barrier.database_id, DatabaseId::from(1));
        assert!(barrier.command.is_none());
    }

    #[tokio::test(start_paused = true)]
    async fn test_next_barrier_checkpoint_frequency() {
        let databases = vec![create_test_database(1, Some(50), Some(2))]; // Checkpoint every 2 barriers

        let mut periodic = PeriodicBarriers::new(Duration::from_millis(50), 10, databases);

        let (context, _tx) = MockGlobalBarrierWorkerContext::new();

        // First barrier - should not be checkpoint
        let barrier1 = periodic.next_barrier(&context).await;
        assert!(!barrier1.checkpoint);

        // Second barrier - should be checkpoint (frequency = 2)
        let barrier2 = periodic.next_barrier(&context).await;
        assert!(barrier2.checkpoint);

        // Third barrier - should not be checkpoint (counter reset)
        let barrier3 = periodic.next_barrier(&context).await;
        assert!(!barrier3.checkpoint);
    }

    #[tokio::test]
    async fn test_update_database_barrier() {
        let databases = vec![create_test_database(1, Some(1000), Some(10))];

        let mut periodic = PeriodicBarriers::new(Duration::from_millis(500), 20, databases);

        let database_id = DatabaseId::from(1);

        // Update existing database: new settings apply and the counter is reset.
        periodic.update_database_barrier(database_id, Some(2000), Some(15));

        let db_state = periodic.databases.get(&database_id).unwrap();
        assert_eq!(db_state.barrier_interval, Some(Duration::from_millis(2000)));
        assert_eq!(db_state.checkpoint_frequency, Some(15));
        assert_eq!(db_state.num_uncheckpointed_barrier, 0);
        assert!(!periodic.force_checkpoint_databases.contains(&database_id));

        // Add new database: unspecified settings stay `None` (fall back to system defaults).
        periodic.update_database_barrier(DatabaseId::from(2), None, None);

        assert!(periodic.databases.contains_key(&DatabaseId::from(2)));
        let db2_state = periodic.databases.get(&DatabaseId::from(2)).unwrap();
        assert_eq!(db2_state.barrier_interval, None);
        assert_eq!(db2_state.checkpoint_frequency, None);
    }
}