risingwave_meta/barrier/schedule.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet, VecDeque};
use std::sync::Arc;
use std::time::{Duration, Instant};

use anyhow::{Context, anyhow};
use assert_matches::assert_matches;
use await_tree::InstrumentAwait;
use itertools::Itertools;
use parking_lot::Mutex;
use prometheus::HistogramTimer;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::metrics::LabelGuardedHistogram;
use risingwave_hummock_sdk::HummockVersionId;
use tokio::select;
use tokio::sync::{oneshot, watch};
use tokio::time::Interval;
use tracing::{info, warn};

use super::notifier::Notifier;
use super::{Command, Scheduled};
use crate::barrier::context::GlobalBarrierWorkerContext;
use crate::hummock::HummockManagerRef;
use crate::rpc::metrics::MetaMetrics;
use crate::{MetaError, MetaResult};

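/// The next barrier to inject, either carrying a scheduled command (with its notifiers) or
/// being a plain periodic barrier when `command` is `None`.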
pub(super) struct NewBarrier {
    pub command: Option<(DatabaseId, Command, Vec<Notifier>)>,
    pub span: tracing::Span,
    pub checkpoint: bool,
}

/// A queue for scheduling barriers.
///
/// We manually implement one here instead of using channels since we may need to update the front
/// of the queue to add some notifiers for instant flushes.
struct Inner {
    queue: Mutex<ScheduledQueue>,

    /// Subscribers are notified when `queue` becomes non-empty, or when a blocked queue with
    /// pending items becomes ready again.
    changed_tx: watch::Sender<()>,

    /// Used for recording send latency of each barrier.
    metrics: Arc<MetaMetrics>,
}

#[derive(Debug)]
enum QueueStatus {
    /// The queue is ready to accept new commands.
    Ready,
    /// The queue is blocked from accepting new commands, for the given reason.
    Blocked(String),
}

impl QueueStatus {
    fn is_blocked(&self) -> bool {
        matches!(self, Self::Blocked(_))
    }
}

struct ScheduledQueueItem {
    command: Command,
    notifiers: Vec<Notifier>,
    send_latency_timer: HistogramTimer,
    span: tracing::Span,
}

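/// A queue tagged with a [`QueueStatus`]. This is instantiated both for the scheduler as a
/// whole ([`ScheduledQueue`]) and for each database ([`DatabaseScheduledQueue`]).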
struct StatusQueue<T> {
    queue: T,
    status: QueueStatus,
}

struct DatabaseQueue {
    inner: VecDeque<ScheduledQueueItem>,
    send_latency: LabelGuardedHistogram,
}

type DatabaseScheduledQueue = StatusQueue<DatabaseQueue>;
type ScheduledQueue = StatusQueue<HashMap<DatabaseId, DatabaseScheduledQueue>>;

impl DatabaseScheduledQueue {
    fn new(database_id: DatabaseId, metrics: &MetaMetrics, status: QueueStatus) -> Self {
        Self {
            queue: DatabaseQueue {
                inner: Default::default(),
                send_latency: metrics
                    .barrier_send_latency
                    .with_guarded_label_values(&[database_id.database_id.to_string().as_str()]),
            },
            status,
        }
    }
}

impl<T> StatusQueue<T> {
    fn mark_blocked(&mut self, reason: String) {
        self.status = QueueStatus::Blocked(reason);
    }

    fn mark_ready(&mut self) -> bool {
        let prev_blocked = self.status.is_blocked();
        self.status = QueueStatus::Ready;
        prev_blocked
    }

    fn validate_item(&mut self, command: &Command) -> MetaResult<()> {
        // We don't allow any command to be scheduled when the queue is blocked, except for
        // dropping streaming jobs. This is because dropping streaming jobs is still allowed
        // while the cluster is under recovery, so the drop command has to be buffered and
        // executed once the cluster is ready to clean it up.
        // TODO: this is just a workaround to allow dropping streaming jobs when the cluster is
        // under recovery; refine it once catalog and streaming metadata can be handled
        // transactionally.
        if let QueueStatus::Blocked(reason) = &self.status
            && !matches!(
                command,
                Command::DropStreamingJobs { .. } | Command::DropSubscription { .. }
            )
        {
            return Err(MetaError::unavailable(reason));
        }
        Ok(())
    }
}

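/// Create the tracing span for a scheduled barrier. If there is no enabled span in the
/// current context, return a disabled span as well, so that barrier spans are only created
/// when tracing is actually being collected.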
fn tracing_span() -> tracing::Span {
    if tracing::Span::current().is_none() {
        tracing::Span::none()
    } else {
        tracing::info_span!(
            "barrier",
            checkpoint = tracing::field::Empty,
            epoch = tracing::field::Empty
        )
    }
}

/// The sender side of the barrier scheduling queue.
/// Can be cloned and held by other managers to schedule and run barriers.
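///
/// # Example
///
/// An illustrative sketch of how the pair is typically wired up (not a compiling doctest;
/// construction of the dependencies is elided):
///
/// ```ignore
/// let (scheduler, scheduled_barriers) = BarrierScheduler::new_pair(hummock_manager, metrics);
/// // Managers schedule commands and wait for the barriers to be collected...
/// scheduler.run_command(database_id, command).await?;
/// // ...while the barrier worker consumes them via `scheduled_barriers.next_scheduled().await`.
/// ```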
#[derive(Clone)]
pub struct BarrierScheduler {
    inner: Arc<Inner>,

    /// Used for getting the latest snapshot after `FLUSH`.
    hummock_manager: HummockManagerRef,
}

impl BarrierScheduler {
    /// Create a pair of [`BarrierScheduler`] and [`ScheduledBarriers`], for scheduling barriers
    /// from different managers, and executing them in the barrier manager, respectively.
    pub fn new_pair(
        hummock_manager: HummockManagerRef,
        metrics: Arc<MetaMetrics>,
    ) -> (Self, ScheduledBarriers) {
        let inner = Arc::new(Inner {
            queue: Mutex::new(ScheduledQueue {
                queue: Default::default(),
                status: QueueStatus::Ready,
            }),
            changed_tx: watch::channel(()).0,
            metrics,
        });

        (
            Self {
                inner: inner.clone(),
                hummock_manager,
            },
            ScheduledBarriers { inner },
        )
    }

    /// Push a scheduled barrier into the queue.
    fn push(
        &self,
        database_id: DatabaseId,
        scheduleds: impl IntoIterator<Item = (Command, Notifier)>,
    ) -> MetaResult<()> {
        let mut queue = self.inner.queue.lock();
        let scheduleds = scheduleds.into_iter().collect_vec();
        // Validate against the global queue status first: a blocked scheduler rejects
        // everything except drop commands.
        scheduleds
            .iter()
            .try_for_each(|(command, _)| queue.validate_item(command))?;
        let queue = queue.queue.entry(database_id).or_insert_with(|| {
            DatabaseScheduledQueue::new(database_id, &self.inner.metrics, QueueStatus::Ready)
        });
        // Then validate against the per-database queue status, which may be blocked
        // independently during per-database recovery.
        scheduleds
            .iter()
            .try_for_each(|(command, _)| queue.validate_item(command))?;
        for (command, notifier) in scheduleds {
            queue.queue.inner.push_back(ScheduledQueueItem {
                command,
                notifiers: vec![notifier],
                send_latency_timer: queue.queue.send_latency.start_timer(),
                span: tracing_span(),
            });
            // Notify the consumer only on the empty-to-non-empty transition.
            if queue.queue.inner.len() == 1 {
                self.inner.changed_tx.send(()).ok();
            }
        }
        Ok(())
    }

    /// Try to cancel a scheduled command that creates a streaming job. Returns true if such a
    /// command was queued and has been cancelled.
    pub fn try_cancel_scheduled_create(&self, database_id: DatabaseId, table_id: TableId) -> bool {
        let queue = &mut self.inner.queue.lock();
        let Some(queue) = queue.queue.get_mut(&database_id) else {
            return false;
        };

        if let Some(idx) = queue.queue.inner.iter().position(|scheduled| {
            matches!(
                &scheduled.command,
                Command::CreateStreamingJob { info, .. }
                    if info.stream_job_fragments.stream_job_id() == table_id
            )
        }) {
            let item = queue.queue.inner.remove(idx).unwrap();
            // The command is never sent, so discard the timer instead of recording a
            // meaningless send latency.
            item.send_latency_timer.stop_and_discard();
            true
        } else {
            false
        }
    }

    /// Run multiple commands and return when they're all completely finished (i.e., collected).
    /// It's ensured that the commands are scheduled consecutively, with no other command
    /// interleaved in between.
    ///
    /// TODO: atomicity of multiple commands is not guaranteed.
    #[await_tree::instrument("run_commands({})", commands.iter().join(", "))]
    async fn run_multiple_commands(
        &self,
        database_id: DatabaseId,
        commands: Vec<Command>,
    ) -> MetaResult<()> {
        let mut contexts = Vec::with_capacity(commands.len());
        let mut scheduleds = Vec::with_capacity(commands.len());

        for command in commands {
            let (started_tx, started_rx) = oneshot::channel();
            let (collect_tx, collect_rx) = oneshot::channel();

            contexts.push((started_rx, collect_rx));
            scheduleds.push((
                command,
                Notifier {
                    started: Some(started_tx),
                    collected: Some(collect_tx),
                },
            ));
        }

        self.push(database_id, scheduleds)?;

        for (injected_rx, collect_rx) in contexts {
            // Wait for this command to be injected, and record the result.
            tracing::trace!("waiting for injected_rx");
            injected_rx
                .instrument_await("wait_injected")
                .await
                .ok()
                .context("failed to inject barrier")??;

            tracing::trace!("waiting for collect_rx");
            // Throw the error if it occurs when collecting this barrier.
            collect_rx
                .instrument_await("wait_collected")
                .await
                .ok()
                .context("failed to collect barrier")??;
        }

        Ok(())
    }

    /// Run a command and return when it's completely finished (i.e., collected).
    pub async fn run_command(&self, database_id: DatabaseId, command: Command) -> MetaResult<()> {
        tracing::trace!("run_command: {:?}", command);
        let ret = self.run_multiple_commands(database_id, vec![command]).await;
        tracing::trace!("run_command finished");
        ret
    }

    /// Schedule a `Flush` command and wait for the resulting barrier to be collected, then
    /// return the latest Hummock version id.
    pub async fn flush(&self, database_id: DatabaseId) -> MetaResult<HummockVersionId> {
        let start = Instant::now();

        tracing::debug!("start barrier flush");
        self.run_multiple_commands(database_id, vec![Command::Flush])
            .await?;

        let elapsed = start.elapsed();
        tracing::debug!("barrier flushed in {:?}", elapsed);

        let version_id = self.hummock_manager.get_version_id().await;
        Ok(version_id)
    }
}

/// The receiver side of the barrier scheduling queue.
/// Held by the [`crate::barrier::worker::GlobalBarrierWorker`] to execute the scheduled commands.
pub struct ScheduledBarriers {
    inner: Arc<Inner>,
}

/// Generates periodic barriers at `min_interval` and decides whether the next barrier should
/// be a checkpoint.
pub(super) struct PeriodicBarriers {
    min_interval: Interval,

    /// Force a checkpoint in the next barrier.
    force_checkpoint: bool,

    /// The number of barriers (checkpoint = false) issued since the last checkpoint barrier
    /// (checkpoint = true).
    num_uncheckpointed_barrier: usize,
    checkpoint_frequency: usize,
}

impl PeriodicBarriers {
    pub(super) fn new(min_interval: Duration, checkpoint_frequency: usize) -> PeriodicBarriers {
        Self {
            min_interval: tokio::time::interval(min_interval),
            force_checkpoint: false,
            num_uncheckpointed_barrier: 0,
            checkpoint_frequency,
        }
    }

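    /// Update the minimum barrier interval. The timer is replaced only when the period
    /// actually changes, so an in-flight tick is not unnecessarily reset.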
    pub(super) fn set_min_interval(&mut self, min_interval: Duration) {
        if min_interval != self.min_interval.period() {
            let mut min_interval = tokio::time::interval(min_interval);
            min_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
            self.min_interval = min_interval;
        }
    }

    /// Wait for the next barrier: either the next scheduled command (preferred) or a plain
    /// periodic barrier once `min_interval` elapses.
    #[await_tree::instrument]
    pub(super) async fn next_barrier(
        &mut self,
        context: &impl GlobalBarrierWorkerContext,
    ) -> NewBarrier {
        let checkpoint = self.try_get_checkpoint();
        let scheduled = select! {
            // Prefer scheduled commands over the periodic tick.
            biased;
            scheduled = context.next_scheduled() => {
                self.min_interval.reset();
                let checkpoint = scheduled.command.need_checkpoint() || checkpoint;
                NewBarrier {
                    command: Some((scheduled.database_id, scheduled.command, scheduled.notifiers)),
                    span: scheduled.span,
                    checkpoint,
                }
            },
            _ = self.min_interval.tick() => {
                NewBarrier {
                    command: None,
                    span: tracing_span(),
                    checkpoint,
                }
            }
        };
        self.update_num_uncheckpointed_barrier(scheduled.checkpoint);
        scheduled
    }
}

impl ScheduledBarriers {
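    /// Wait until some unblocked database queue has a scheduled command, then pop and return it.
    ///
    /// Note that we subscribe to the change notification *before* checking the queues, so a
    /// push happening between the check and the `await` cannot be missed.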
    pub(super) async fn next_scheduled(&self) -> Scheduled {
        'outer: loop {
            let mut rx = self.inner.changed_tx.subscribe();
            {
                let mut queue = self.inner.queue.lock();
                // When the whole scheduler is blocked (e.g. during global recovery), fall
                // through and wait for a change instead of spinning on the lock.
                if !queue.status.is_blocked() {
                    for (database_id, queue) in &mut queue.queue {
                        if queue.status.is_blocked() {
                            continue;
                        }
                        if let Some(item) = queue.queue.inner.pop_front() {
                            item.send_latency_timer.observe_duration();
                            break 'outer Scheduled {
                                database_id: *database_id,
                                command: item.command,
                                notifiers: item.notifiers,
                                span: item.span,
                            };
                        }
                    }
                }
            }
            rx.changed().await.unwrap();
        }
    }
}

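/// The scope of a [`ScheduledBarriers::mark_ready`] call: either a single recovered database,
/// or the whole cluster after a global recovery, along with the databases that remain blocked
/// because they failed to recover.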
pub(super) enum MarkReadyOptions {
    Database(DatabaseId),
    Global {
        blocked_databases: HashSet<DatabaseId>,
    },
}

impl ScheduledBarriers {
    /// Pre-apply the buffered drop and cancel commands, returning whether any were applied.
    pub(super) fn pre_apply_drop_cancel(&self, database_id: Option<DatabaseId>) -> bool {
        self.pre_apply_drop_cancel_scheduled(database_id)
    }

    /// Mark the command scheduler as blocked, abort all queued commands, and notify their
    /// subscribers of the failure with the given reason.
    pub(super) fn abort_and_mark_blocked(
        &self,
        database_id: Option<DatabaseId>,
        reason: impl Into<String>,
    ) {
        let mut queue = self.inner.queue.lock();
        fn database_blocked_reason(database_id: DatabaseId, reason: &str) -> String {
            format!("database {} unavailable: {}", database_id, reason)
        }
        fn mark_blocked_and_notify_failed(
            database_id: DatabaseId,
            queue: &mut DatabaseScheduledQueue,
            reason: &str,
        ) {
            let reason = database_blocked_reason(database_id, reason);
            let err: MetaError = anyhow!("{}", reason).into();
            queue.mark_blocked(reason);
            while let Some(ScheduledQueueItem { notifiers, .. }) = queue.queue.inner.pop_front() {
                notifiers
                    .into_iter()
                    .for_each(|notify| notify.notify_collection_failed(err.clone()))
            }
        }
        if let Some(database_id) = database_id {
            let reason = reason.into();
            match queue.queue.entry(database_id) {
                Entry::Occupied(entry) => {
                    let queue = entry.into_mut();
                    if queue.status.is_blocked() {
                        if cfg!(debug_assertions) {
                            panic!("database {} marked as blocked twice", database_id);
                        } else {
                            warn!(?database_id, "database marked as blocked twice");
                        }
                    }
                    info!(?database_id, "database marked as blocked");
                    mark_blocked_and_notify_failed(database_id, queue, &reason);
                }
                Entry::Vacant(entry) => {
                    entry.insert(DatabaseScheduledQueue::new(
                        database_id,
                        &self.inner.metrics,
                        QueueStatus::Blocked(database_blocked_reason(database_id, &reason)),
                    ));
                }
            }
        } else {
            let reason = reason.into();
            if queue.status.is_blocked() {
                if cfg!(debug_assertions) {
                    panic!("cluster marked as blocked twice");
                } else {
                    warn!("cluster marked as blocked twice");
                }
            }
            info!("cluster marked as blocked");
            queue.mark_blocked(reason.clone());
            for (database_id, queue) in &mut queue.queue {
                mark_blocked_and_notify_failed(*database_id, queue, &reason);
            }
        }
    }

    /// Mark the command scheduler as ready to accept new commands.
    pub(super) fn mark_ready(&self, options: MarkReadyOptions) {
        let mut queue = self.inner.queue.lock();
        let queue = &mut *queue;
        match options {
            MarkReadyOptions::Database(database_id) => {
                info!(?database_id, "database marked as ready");
                let database_queue = queue.queue.entry(database_id).or_insert_with(|| {
                    DatabaseScheduledQueue::new(
                        database_id,
                        &self.inner.metrics,
                        QueueStatus::Ready,
                    )
                });
                if !database_queue.status.is_blocked() {
                    if cfg!(debug_assertions) {
                        panic!("database {} marked as ready twice", database_id);
                    } else {
                        warn!(?database_id, "database marked as ready twice");
                    }
                }
                // Wake the consumer if this database just became ready and already has
                // pending commands that can now be scheduled.
                if database_queue.mark_ready()
                    && !queue.status.is_blocked()
                    && !database_queue.queue.inner.is_empty()
                {
                    self.inner.changed_tx.send(()).ok();
                }
            }
            MarkReadyOptions::Global { blocked_databases } => {
                if !queue.status.is_blocked() {
                    if cfg!(debug_assertions) {
                        panic!("cluster marked as ready twice");
                    } else {
                        warn!("cluster marked as ready twice");
                    }
                }
                info!(?blocked_databases, "cluster marked as ready");
                let prev_blocked = queue.mark_ready();
                // Databases that failed to recover stay blocked until they recover individually.
                for database_id in &blocked_databases {
                    queue.queue.entry(*database_id).or_insert_with(|| {
                        DatabaseScheduledQueue::new(
                            *database_id,
                            &self.inner.metrics,
                            QueueStatus::Blocked(format!(
                                "database {} failed to recover in global recovery",
                                database_id
                            )),
                        )
                    });
                }
                for (database_id, queue) in &mut queue.queue {
                    if !blocked_databases.contains(database_id) {
                        queue.mark_ready();
                    }
                }
                if prev_blocked
                    && queue
                        .queue
                        .values()
                        .any(|database_queue| !database_queue.queue.inner.is_empty())
                {
                    self.inner.changed_tx.send(()).ok();
                }
            }
        }
    }

    /// Try to pre-apply the buffered drop and cancel commands, returning whether any drop was
    /// applied. All pending notifiers are notified as collected.
    /// Should only be called during recovery.
    pub(super) fn pre_apply_drop_cancel_scheduled(&self, database_id: Option<DatabaseId>) -> bool {
        let mut queue = self.inner.queue.lock();
        let mut applied = false;

        let mut pre_apply_drop_cancel = |queue: &mut DatabaseScheduledQueue| {
            while let Some(ScheduledQueueItem {
                notifiers, command, ..
            }) = queue.queue.inner.pop_front()
            {
                match command {
                    Command::DropStreamingJobs { .. } => {
                        applied = true;
                    }
                    Command::DropSubscription { .. } => {}
                    _ => {
                        unreachable!(
                            "only drop streaming jobs and drop subscription commands can be buffered"
                        );
                    }
                }
                notifiers.into_iter().for_each(|notify| {
                    notify.notify_collected();
                });
            }
        };

        if let Some(database_id) = database_id {
            assert_matches!(queue.status, QueueStatus::Ready);
            if let Some(queue) = queue.queue.get_mut(&database_id) {
                assert_matches!(queue.status, QueueStatus::Blocked(_));
                pre_apply_drop_cancel(queue);
            }
        } else {
            assert_matches!(queue.status, QueueStatus::Blocked(_));
            for queue in queue.queue.values_mut() {
                pre_apply_drop_cancel(queue);
            }
        }

        applied
    }
}

impl PeriodicBarriers {
    /// Whether the next barrier should be a checkpoint barrier (checkpoint = true).
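    ///
    /// For example, with `checkpoint_frequency = 3` the barrier sequence looks like
    /// `b, b, cp, b, b, cp, ...`, while `force_checkpoint` turns the very next barrier into a
    /// checkpoint regardless of the counter.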
    fn try_get_checkpoint(&self) -> bool {
        self.num_uncheckpointed_barrier + 1 >= self.checkpoint_frequency || self.force_checkpoint
    }

    /// Force the `checkpoint` of the next barrier to be true.
    pub fn force_checkpoint_in_next_barrier(&mut self) {
        self.force_checkpoint = true;
    }

    /// Update the `checkpoint_frequency`.
    pub fn set_checkpoint_frequency(&mut self, frequency: usize) {
        self.checkpoint_frequency = frequency;
    }

    /// Update `num_uncheckpointed_barrier`, resetting it (and `force_checkpoint`) after a
    /// checkpoint barrier.
    fn update_num_uncheckpointed_barrier(&mut self, checkpoint: bool) {
        if checkpoint {
            self.num_uncheckpointed_barrier = 0;
            self.force_checkpoint = false;
        } else {
            self.num_uncheckpointed_barrier += 1;
        }
    }
}