risingwave_meta/barrier/schedule.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet, VecDeque};
use std::sync::Arc;
use std::time::{Duration, Instant};

use anyhow::{Context, anyhow};
use assert_matches::assert_matches;
use itertools::Itertools;
use parking_lot::Mutex;
use prometheus::HistogramTimer;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::metrics::LabelGuardedHistogram;
use risingwave_hummock_sdk::HummockVersionId;
use tokio::select;
use tokio::sync::{oneshot, watch};
use tokio::time::Interval;
use tracing::{info, warn};

use super::notifier::Notifier;
use super::{Command, Scheduled};
use crate::barrier::context::GlobalBarrierWorkerContext;
use crate::hummock::HummockManagerRef;
use crate::rpc::metrics::MetaMetrics;
use crate::{MetaError, MetaResult};
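
/// The next barrier to inject, produced by [`PeriodicBarriers::next_barrier`]: either carrying a
/// scheduled command for a database, or a plain periodic barrier without a command.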
pub(super) struct NewBarrier {
    pub command: Option<(DatabaseId, Command, Vec<Notifier>)>,
    pub span: tracing::Span,
    pub checkpoint: bool,
}

/// A queue for scheduling barriers.
///
/// We manually implement one here instead of using channels since we may need to update the front
/// of the queue to add some notifiers for instant flushes.
struct Inner {
    queue: Mutex<ScheduledQueue>,

    /// When `queue` is not empty anymore, all subscribers of this watcher will be notified.
    changed_tx: watch::Sender<()>,

    /// Used for recording send latency of each barrier.
    metrics: Arc<MetaMetrics>,
}

#[derive(Debug)]
enum QueueStatus {
    /// The queue is ready to accept new commands.
    Ready,
    /// The queue is blocked from accepting new commands, for the given reason.
    Blocked(String),
}

impl QueueStatus {
    fn is_blocked(&self) -> bool {
        matches!(self, Self::Blocked(_))
    }
}
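
/// A scheduled command, together with its notifiers, send-latency timer and tracing span.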
struct ScheduledQueueItem {
    command: Command,
    notifiers: Vec<Notifier>,
    send_latency_timer: HistogramTimer,
    span: tracing::Span,
}
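
/// A queue guarded by a [`QueueStatus`]. Used both for each per-database queue and for the
/// top-level map of all database queues (see [`ScheduledQueue`]).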
struct StatusQueue<T> {
    queue: T,
    status: QueueStatus,
}

struct DatabaseQueue {
    inner: VecDeque<ScheduledQueueItem>,
    send_latency: LabelGuardedHistogram<1>,
}

type DatabaseScheduledQueue = StatusQueue<DatabaseQueue>;
type ScheduledQueue = StatusQueue<HashMap<DatabaseId, DatabaseScheduledQueue>>;

impl DatabaseScheduledQueue {
    fn new(database_id: DatabaseId, metrics: &MetaMetrics, status: QueueStatus) -> Self {
        Self {
            queue: DatabaseQueue {
                inner: Default::default(),
                send_latency: metrics
                    .barrier_send_latency
                    .with_guarded_label_values(&[database_id.database_id.to_string().as_str()]),
            },
            status,
        }
    }
}

impl<T> StatusQueue<T> {
    fn mark_blocked(&mut self, reason: String) {
        self.status = QueueStatus::Blocked(reason);
    }

    fn mark_ready(&mut self) -> bool {
        let prev_blocked = self.status.is_blocked();
        self.status = QueueStatus::Ready;
        prev_blocked
    }

    fn validate_item(&mut self, command: &Command) -> MetaResult<()> {
        // We don't allow any command to be scheduled when the queue is blocked, except for dropping
        // streaming jobs. Since we allow dropping streaming jobs while the cluster is under
        // recovery, we have to buffer the drop commands and execute them once the cluster is ready
        // to clean them up.
        // TODO: this is just a workaround to allow dropping streaming jobs when the cluster is
        // under recovery; we need to refine it when catalog and streaming metadata can be handled
        // in a transactional way.
        if let QueueStatus::Blocked(reason) = &self.status
            && !matches!(
                command,
                Command::DropStreamingJobs { .. } | Command::DropSubscription { .. }
            )
        {
            return Err(MetaError::unavailable(reason));
        }
        Ok(())
    }
}
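
/// Create a tracing span for a barrier. If tracing is disabled in the current context (the current
/// span is none), a disabled span is returned to avoid the overhead; otherwise the `checkpoint` and
/// `epoch` fields are left empty, to be recorded when the barrier is actually injected.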
fn tracing_span() -> tracing::Span {
    if tracing::Span::current().is_none() {
        tracing::Span::none()
    } else {
        tracing::info_span!(
            "barrier",
            checkpoint = tracing::field::Empty,
            epoch = tracing::field::Empty
        )
    }
}

/// The sender side of the barrier scheduling queue.
/// Can be cloned and held by other managers to schedule and run barriers.
#[derive(Clone)]
pub struct BarrierScheduler {
    inner: Arc<Inner>,

    /// Used for getting the latest snapshot after `FLUSH`.
    hummock_manager: HummockManagerRef,
}

impl BarrierScheduler {
    /// Create a pair of [`BarrierScheduler`] and [`ScheduledBarriers`], for scheduling barriers
    /// from different managers, and executing them in the barrier manager, respectively.
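    ///
    /// A minimal usage sketch (assuming `hummock_manager` and `metrics` are already constructed):
    /// ```ignore
    /// let (scheduler, scheduled_barriers) = BarrierScheduler::new_pair(hummock_manager, metrics);
    /// // `scheduler` is cloned into the managers that schedule commands, while
    /// // `scheduled_barriers` is moved into the global barrier worker that executes them.
    /// ```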
    pub fn new_pair(
        hummock_manager: HummockManagerRef,
        metrics: Arc<MetaMetrics>,
    ) -> (Self, ScheduledBarriers) {
        let inner = Arc::new(Inner {
            queue: Mutex::new(ScheduledQueue {
                queue: Default::default(),
                status: QueueStatus::Ready,
            }),
            changed_tx: watch::channel(()).0,
            metrics,
        });

        (
            Self {
                inner: inner.clone(),
                hummock_manager,
            },
            ScheduledBarriers { inner },
        )
    }

    /// Push the scheduled commands into the queue of the given database.
    fn push(
        &self,
        database_id: DatabaseId,
        scheduleds: impl IntoIterator<Item = (Command, Notifier)>,
    ) -> MetaResult<()> {
        let mut queue = self.inner.queue.lock();
        let scheduleds = scheduleds.into_iter().collect_vec();
        scheduleds
            .iter()
            .try_for_each(|(command, _)| queue.validate_item(command))?;
        let queue = queue.queue.entry(database_id).or_insert_with(|| {
            DatabaseScheduledQueue::new(database_id, &self.inner.metrics, QueueStatus::Ready)
        });
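        // The commands were validated against the cluster-level status above; validate again
        // against the database-level status, since this specific database may still be blocked
        // (e.g., while it is under recovery) even when the cluster as a whole is ready.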
        scheduleds
            .iter()
            .try_for_each(|(command, _)| queue.validate_item(command))?;
        for (command, notifier) in scheduleds {
            queue.queue.inner.push_back(ScheduledQueueItem {
                command,
                notifiers: vec![notifier],
                send_latency_timer: queue.queue.send_latency.start_timer(),
                span: tracing_span(),
            });
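            // The queue has just become non-empty: wake up subscribers waiting in
            // `ScheduledBarriers::next_scheduled`.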
            if queue.queue.inner.len() == 1 {
                self.inner.changed_tx.send(()).ok();
            }
        }
        Ok(())
    }

    /// Try to cancel a scheduled command that creates a streaming job. Returns true if such a
    /// command was queued and has been cancelled.
    pub fn try_cancel_scheduled_create(&self, database_id: DatabaseId, table_id: TableId) -> bool {
        let queue = &mut self.inner.queue.lock();
        let Some(queue) = queue.queue.get_mut(&database_id) else {
            return false;
        };

        if let Some(idx) = queue.queue.inner.iter().position(|scheduled| {
            matches!(
                &scheduled.command,
                Command::CreateStreamingJob { info, .. }
                    if info.stream_job_fragments.stream_job_id() == table_id
            )
        }) {
            queue.queue.inner.remove(idx).unwrap();
            true
        } else {
            false
        }
    }

    /// Run multiple commands and return when they're all completely finished (i.e., collected).
    /// It's ensured that the commands are scheduled consecutively.
    ///
    /// TODO: atomicity of multiple commands is not guaranteed.
    async fn run_multiple_commands(
        &self,
        database_id: DatabaseId,
        commands: Vec<Command>,
    ) -> MetaResult<()> {
        let mut contexts = Vec::with_capacity(commands.len());
        let mut scheduleds = Vec::with_capacity(commands.len());

        for command in commands {
            let (started_tx, started_rx) = oneshot::channel();
            let (collect_tx, collect_rx) = oneshot::channel();

            contexts.push((started_rx, collect_rx));
            scheduleds.push((
                command,
                Notifier {
                    started: Some(started_tx),
                    collected: Some(collect_tx),
                },
            ));
        }

        self.push(database_id, scheduleds)?;

        for (started_rx, collect_rx) in contexts {
            // Wait for this command to be injected, and throw the error if it occurs when
            // injecting this barrier.
            tracing::trace!("waiting for started_rx");
            started_rx
                .await
                .ok()
                .context("failed to inject barrier")??;

            tracing::trace!("waiting for collect_rx");
            // Throw the error if it occurs when collecting this barrier.
            collect_rx
                .await
                .ok()
                .context("failed to collect barrier")??;
        }

        Ok(())
    }

    /// Run a command and return when it's completely finished (i.e., collected).
    pub async fn run_command(&self, database_id: DatabaseId, command: Command) -> MetaResult<()> {
        tracing::trace!("run_command: {:?}", command);
        let ret = self.run_multiple_commands(database_id, vec![command]).await;
        tracing::trace!("run_command finished");
        ret
    }

    /// Flush means waiting for the next barrier to be collected, then returns the latest Hummock
    /// version id.
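    ///
    /// A usage sketch (e.g., when handling the `FLUSH` statement; `scheduler` and `database_id`
    /// are assumed to be in scope):
    /// ```ignore
    /// let version_id = scheduler.flush(database_id).await?;
    /// ```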
    pub async fn flush(&self, database_id: DatabaseId) -> MetaResult<HummockVersionId> {
        let start = Instant::now();

        tracing::debug!("start barrier flush");
        self.run_multiple_commands(database_id, vec![Command::Flush])
            .await?;

        let elapsed = start.elapsed();
        tracing::debug!("barrier flushed in {:?}", elapsed);

        let version_id = self.hummock_manager.get_version_id().await;
        Ok(version_id)
    }
}

/// The receiver side of the barrier scheduling queue.
pub struct ScheduledBarriers {
    inner: Arc<Inner>,
}

/// Held by the [`crate::barrier::worker::GlobalBarrierWorker`] to decide when to inject the next
/// barrier, either periodically or on a scheduled command.
pub(super) struct PeriodicBarriers {
    min_interval: Interval,

    /// Force a checkpoint in the next barrier.
    force_checkpoint: bool,

    /// The number of barriers (checkpoint = false) since the last checkpoint barrier (checkpoint = true).
    num_uncheckpointed_barrier: usize,
    checkpoint_frequency: usize,
}

impl PeriodicBarriers {
    pub(super) fn new(min_interval: Duration, checkpoint_frequency: usize) -> PeriodicBarriers {
        Self {
            min_interval: tokio::time::interval(min_interval),
            force_checkpoint: false,
            num_uncheckpointed_barrier: 0,
            checkpoint_frequency,
        }
    }

    pub(super) fn set_min_interval(&mut self, min_interval: Duration) {
        if min_interval != self.min_interval.period() {
            let mut min_interval = tokio::time::interval(min_interval);
            min_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
            self.min_interval = min_interval;
        }
    }
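
    /// Wait for the next barrier to inject: a scheduled command if one arrives (preferred via the
    /// `biased` select), or a plain barrier once the minimal interval has elapsed.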
    pub(super) async fn next_barrier(
        &mut self,
        context: &impl GlobalBarrierWorkerContext,
    ) -> NewBarrier {
        let checkpoint = self.try_get_checkpoint();
        let new_barrier = select! {
            biased;
            scheduled = context.next_scheduled() => {
                self.min_interval.reset();
                let checkpoint = scheduled.command.need_checkpoint() || checkpoint;
                NewBarrier {
                    command: Some((scheduled.database_id, scheduled.command, scheduled.notifiers)),
                    span: scheduled.span,
                    checkpoint,
                }
            },
            _ = self.min_interval.tick() => {
                NewBarrier {
                    command: None,
                    span: tracing_span(),
                    checkpoint,
                }
            }
        };
        self.update_num_uncheckpointed_barrier(new_barrier.checkpoint);
        new_barrier
    }
}

impl ScheduledBarriers {
    pub(super) async fn next_scheduled(&self) -> Scheduled {
        'outer: loop {
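            // Subscribe *before* scanning the queue, so that a notification sent between the scan
            // and the `changed().await` below is observed rather than lost.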
            let mut rx = self.inner.changed_tx.subscribe();
            {
                let mut queue = self.inner.queue.lock();
                if queue.status.is_blocked() {
                    continue;
                }
                for (database_id, queue) in &mut queue.queue {
                    if queue.status.is_blocked() {
                        continue;
                    }
                    if let Some(item) = queue.queue.inner.pop_front() {
                        item.send_latency_timer.observe_duration();
                        break 'outer Scheduled {
                            database_id: *database_id,
                            command: item.command,
                            notifiers: item.notifiers,
                            span: item.span,
                        };
                    }
                }
            }
            rx.changed().await.unwrap();
        }
    }
}

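/// The scope of [`ScheduledBarriers::mark_ready`]: either a single recovered database, or the whole
/// cluster together with the set of databases that failed to recover and thus remain blocked.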
pub(super) enum MarkReadyOptions {
    Database(DatabaseId),
    Global {
        blocked_databases: HashSet<DatabaseId>,
    },
}

impl ScheduledBarriers {
    /// Apply the pre-buffered drop and cancel commands. Returns true if any was applied.
    pub(super) fn pre_apply_drop_cancel(&self, database_id: Option<DatabaseId>) -> bool {
        self.pre_apply_drop_cancel_scheduled(database_id)
    }

    /// Mark the command scheduler as blocked, abort all queued scheduled commands, and notify their
    /// subscribers with the given reason.
    pub(super) fn abort_and_mark_blocked(
        &self,
        database_id: Option<DatabaseId>,
        reason: impl Into<String>,
    ) {
        let mut queue = self.inner.queue.lock();
        fn database_blocked_reason(database_id: DatabaseId, reason: &str) -> String {
            format!("database {} unavailable: {}", database_id, reason)
        }
        fn mark_blocked_and_notify_failed(
            database_id: DatabaseId,
            queue: &mut DatabaseScheduledQueue,
            reason: &str,
        ) {
            let reason = database_blocked_reason(database_id, reason);
            let err: MetaError = anyhow!("{}", reason).into();
            queue.mark_blocked(reason);
            while let Some(ScheduledQueueItem { notifiers, .. }) = queue.queue.inner.pop_front() {
                notifiers
                    .into_iter()
                    .for_each(|notify| notify.notify_collection_failed(err.clone()))
            }
        }
        if let Some(database_id) = database_id {
            let reason = reason.into();
            match queue.queue.entry(database_id) {
                Entry::Occupied(entry) => {
                    let queue = entry.into_mut();
                    if queue.status.is_blocked() {
                        if cfg!(debug_assertions) {
                            panic!("database {} marked as blocked twice", database_id);
                        } else {
                            warn!(?database_id, "database marked as blocked twice");
                        }
                    }
                    info!(?database_id, "database marked as blocked");
                    mark_blocked_and_notify_failed(database_id, queue, &reason);
                }
                Entry::Vacant(entry) => {
                    entry.insert(DatabaseScheduledQueue::new(
                        database_id,
                        &self.inner.metrics,
                        QueueStatus::Blocked(database_blocked_reason(database_id, &reason)),
                    ));
                }
            }
        } else {
            let reason = reason.into();
            if queue.status.is_blocked() {
                if cfg!(debug_assertions) {
                    panic!("cluster marked as blocked twice");
                } else {
                    warn!("cluster marked as blocked twice");
                }
            }
            info!("cluster marked as blocked");
            queue.mark_blocked(reason.clone());
            for (database_id, queue) in &mut queue.queue {
                mark_blocked_and_notify_failed(*database_id, queue, &reason);
            }
        }
    }

    /// Mark the command scheduler as ready to accept new commands.
    pub(super) fn mark_ready(&self, options: MarkReadyOptions) {
        let mut queue = self.inner.queue.lock();
        let queue = &mut *queue;
        match options {
            MarkReadyOptions::Database(database_id) => {
                info!(?database_id, "database marked as ready");
                let database_queue = queue.queue.entry(database_id).or_insert_with(|| {
                    DatabaseScheduledQueue::new(
                        database_id,
                        &self.inner.metrics,
                        QueueStatus::Ready,
                    )
                });
                if !database_queue.status.is_blocked() {
                    if cfg!(debug_assertions) {
                        panic!("database {} marked as ready twice", database_id);
                    } else {
                        warn!(?database_id, "database marked as ready twice");
                    }
                }
                if database_queue.mark_ready()
                    && !queue.status.is_blocked()
                    && !database_queue.queue.inner.is_empty()
                {
                    self.inner.changed_tx.send(()).ok();
                }
            }
            MarkReadyOptions::Global { blocked_databases } => {
                if !queue.status.is_blocked() {
                    if cfg!(debug_assertions) {
                        panic!("cluster marked as ready twice");
                    } else {
                        warn!("cluster marked as ready twice");
                    }
                }
                info!(?blocked_databases, "cluster marked as ready");
                let prev_blocked = queue.mark_ready();
                for database_id in &blocked_databases {
                    queue.queue.entry(*database_id).or_insert_with(|| {
                        DatabaseScheduledQueue::new(
                            *database_id,
                            &self.inner.metrics,
                            QueueStatus::Blocked(format!(
                                "database {} failed to recover in global recovery",
                                database_id
                            )),
                        )
                    });
                }
                for (database_id, queue) in &mut queue.queue {
                    if !blocked_databases.contains(database_id) {
                        queue.mark_ready();
                    }
                }
                if prev_blocked
                    && queue
                        .queue
                        .values()
                        .any(|database_queue| !database_queue.queue.inner.is_empty())
                {
                    self.inner.changed_tx.send(()).ok();
                }
            }
        }
    }

    /// Try to pre-apply the buffered drop and cancel commands. Returns true if any drop command
    /// was applied. It should only be called during recovery.
    pub(super) fn pre_apply_drop_cancel_scheduled(&self, database_id: Option<DatabaseId>) -> bool {
        let mut queue = self.inner.queue.lock();
        let mut applied = false;

        let mut pre_apply_drop_cancel = |queue: &mut DatabaseScheduledQueue| {
            while let Some(ScheduledQueueItem {
                notifiers, command, ..
            }) = queue.queue.inner.pop_front()
            {
                match command {
                    Command::DropStreamingJobs { .. } => {
                        applied = true;
                    }
                    Command::DropSubscription { .. } => {}
                    _ => {
                        unreachable!("only drop and cancel streaming jobs should be buffered");
                    }
                }
                notifiers.into_iter().for_each(|notify| {
                    notify.notify_collected();
                });
            }
        };

        if let Some(database_id) = database_id {
            assert_matches!(queue.status, QueueStatus::Ready);
            if let Some(queue) = queue.queue.get_mut(&database_id) {
                assert_matches!(queue.status, QueueStatus::Blocked(_));
                pre_apply_drop_cancel(queue);
            }
        } else {
            assert_matches!(queue.status, QueueStatus::Blocked(_));
            for queue in queue.queue.values_mut() {
                pre_apply_drop_cancel(queue);
            }
        }

        applied
    }
}

impl PeriodicBarriers {
    /// Whether the next barrier to inject should be a checkpoint (i.e., `checkpoint = true`).
    fn try_get_checkpoint(&self) -> bool {
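        // `+ 1` counts the barrier about to be injected: with `checkpoint_frequency == n`, every
        // n-th barrier becomes a checkpoint, unless one is forced earlier.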
        self.num_uncheckpointed_barrier + 1 >= self.checkpoint_frequency || self.force_checkpoint
    }

    /// Force the `checkpoint` of the next barrier to be `true`.
    pub fn force_checkpoint_in_next_barrier(&mut self) {
        self.force_checkpoint = true;
    }

    /// Update the `checkpoint_frequency`.
    pub fn set_checkpoint_frequency(&mut self, frequency: usize) {
        self.checkpoint_frequency = frequency;
    }

    /// Update the `num_uncheckpointed_barrier`, resetting it (and clearing `force_checkpoint`) on
    /// a checkpoint barrier.
    fn update_num_uncheckpointed_barrier(&mut self, checkpoint: bool) {
        if checkpoint {
            self.num_uncheckpointed_barrier = 0;
            self.force_checkpoint = false;
        } else {
            self.num_uncheckpointed_barrier += 1;
        }
    }
}