1use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet, VecDeque};
17use std::sync::Arc;
18use std::time::{Duration, Instant};
19
20use anyhow::{Context, anyhow};
21use assert_matches::assert_matches;
22use itertools::Itertools;
23use parking_lot::Mutex;
24use prometheus::HistogramTimer;
25use risingwave_common::catalog::{DatabaseId, TableId};
26use risingwave_common::metrics::LabelGuardedHistogram;
27use risingwave_hummock_sdk::HummockVersionId;
28use tokio::select;
29use tokio::sync::{oneshot, watch};
30use tokio::time::Interval;
31use tracing::{info, warn};
32
33use super::notifier::Notifier;
34use super::{Command, Scheduled};
35use crate::barrier::context::GlobalBarrierWorkerContext;
36use crate::hummock::HummockManagerRef;
37use crate::rpc::metrics::MetaMetrics;
38use crate::{MetaError, MetaResult};
39
/// Description of the next barrier to issue, as produced by `PeriodicBarriers::next_barrier`.
pub(super) struct NewBarrier {
    /// The scheduled command to carry, with its database and the notifiers to inform;
    /// `None` when the barrier was triggered by the periodic interval tick.
    pub command: Option<(DatabaseId, Command, Vec<Notifier>)>,
    /// Tracing span associated with this barrier.
    pub span: tracing::Span,
    /// Whether this barrier is a checkpoint barrier.
    pub checkpoint: bool,
}
45
/// State shared between `BarrierScheduler` and `ScheduledBarriers`.
struct Inner {
    /// Pending commands grouped per database, together with the cluster-wide status.
    queue: Mutex<ScheduledQueue>,

    /// Signalled when there may be new work to pop: when a database queue goes from empty to
    /// non-empty, or when a queue that still holds items is marked ready again.
    changed_tx: watch::Sender<()>,

    /// Metrics handle; used to create the per-database `barrier_send_latency` histogram.
    metrics: Arc<MetaMetrics>,
}
59
/// Whether a queue currently accepts newly scheduled commands.
#[derive(Debug)]
enum QueueStatus {
    /// Commands can be scheduled.
    Ready,
    /// Scheduling is rejected; the string is the reason reported to callers.
    Blocked(String),
}

impl QueueStatus {
    /// Returns `true` for [`QueueStatus::Blocked`], regardless of the reason text.
    fn is_blocked(&self) -> bool {
        match self {
            Self::Blocked(_) => true,
            Self::Ready => false,
        }
    }
}
73
/// A single command waiting in a database queue.
struct ScheduledQueueItem {
    /// The command to run with the barrier.
    command: Command,
    /// Notifiers to inform about the outcome of this command.
    notifiers: Vec<Notifier>,
    /// Timer started when the item is enqueued and observed when it is popped for dispatch.
    send_latency_timer: HistogramTimer,
    /// Tracing span created when the item was enqueued.
    span: tracing::Span,
}
80
/// A queue-like payload paired with a ready/blocked status.
struct StatusQueue<T> {
    /// The wrapped payload (a per-database queue, or the map of all database queues).
    queue: T,
    /// Whether new commands are currently accepted.
    status: QueueStatus,
}
85
/// Pending commands of one database plus the histogram used to time them.
struct DatabaseQueue {
    /// Commands in scheduling order; new items are pushed to the back and popped from the front.
    inner: VecDeque<ScheduledQueueItem>,
    /// Histogram (labelled with the database id) from which each item's latency timer is started.
    send_latency: LabelGuardedHistogram<1>,
}
90
/// Queue of scheduled commands of one database, together with that database's status.
type DatabaseScheduledQueue = StatusQueue<DatabaseQueue>;
/// All per-database queues keyed by database id, together with the cluster-wide status.
type ScheduledQueue = StatusQueue<HashMap<DatabaseId, DatabaseScheduledQueue>>;
93
94impl DatabaseScheduledQueue {
95 fn new(database_id: DatabaseId, metrics: &MetaMetrics, status: QueueStatus) -> Self {
96 Self {
97 queue: DatabaseQueue {
98 inner: Default::default(),
99 send_latency: metrics
100 .barrier_send_latency
101 .with_guarded_label_values(&[database_id.database_id.to_string().as_str()]),
102 },
103 status,
104 }
105 }
106}
107
108impl<T> StatusQueue<T> {
109 fn mark_blocked(&mut self, reason: String) {
110 self.status = QueueStatus::Blocked(reason);
111 }
112
113 fn mark_ready(&mut self) -> bool {
114 let prev_blocked = self.status.is_blocked();
115 self.status = QueueStatus::Ready;
116 prev_blocked
117 }
118
119 fn validate_item(&mut self, command: &Command) -> MetaResult<()> {
120 if let QueueStatus::Blocked(reason) = &self.status
126 && !matches!(
127 command,
128 Command::DropStreamingJobs { .. } | Command::DropSubscription { .. }
129 )
130 {
131 return Err(MetaError::unavailable(reason));
132 }
133 Ok(())
134 }
135}
136
137fn tracing_span() -> tracing::Span {
138 if tracing::Span::current().is_none() {
139 tracing::Span::none()
140 } else {
141 tracing::info_span!(
142 "barrier",
143 checkpoint = tracing::field::Empty,
144 epoch = tracing::field::Empty
145 )
146 }
147}
148
/// Producer-side handle used to enqueue commands and wait for their barriers to complete.
///
/// Cloning is cheap: clones share the same queue through `Arc<Inner>`.
#[derive(Clone)]
pub struct BarrierScheduler {
    /// Queue state shared with the paired `ScheduledBarriers`.
    inner: Arc<Inner>,

    /// Used by `flush` to read the version id once the flush barrier has completed.
    hummock_manager: HummockManagerRef,
}
158
159impl BarrierScheduler {
160 pub fn new_pair(
163 hummock_manager: HummockManagerRef,
164 metrics: Arc<MetaMetrics>,
165 ) -> (Self, ScheduledBarriers) {
166 let inner = Arc::new(Inner {
167 queue: Mutex::new(ScheduledQueue {
168 queue: Default::default(),
169 status: QueueStatus::Ready,
170 }),
171 changed_tx: watch::channel(()).0,
172 metrics,
173 });
174
175 (
176 Self {
177 inner: inner.clone(),
178 hummock_manager,
179 },
180 ScheduledBarriers { inner },
181 )
182 }
183
184 fn push(
186 &self,
187 database_id: DatabaseId,
188 scheduleds: impl IntoIterator<Item = (Command, Notifier)>,
189 ) -> MetaResult<()> {
190 let mut queue = self.inner.queue.lock();
191 let scheduleds = scheduleds.into_iter().collect_vec();
192 scheduleds
193 .iter()
194 .try_for_each(|(command, _)| queue.validate_item(command))?;
195 let queue = queue.queue.entry(database_id).or_insert_with(|| {
196 DatabaseScheduledQueue::new(database_id, &self.inner.metrics, QueueStatus::Ready)
197 });
198 scheduleds
199 .iter()
200 .try_for_each(|(command, _)| queue.validate_item(command))?;
201 for (command, notifier) in scheduleds {
202 queue.queue.inner.push_back(ScheduledQueueItem {
203 command,
204 notifiers: vec![notifier],
205 send_latency_timer: queue.queue.send_latency.start_timer(),
206 span: tracing_span(),
207 });
208 if queue.queue.inner.len() == 1 {
209 self.inner.changed_tx.send(()).ok();
210 }
211 }
212 Ok(())
213 }
214
215 pub fn try_cancel_scheduled_create(&self, database_id: DatabaseId, table_id: TableId) -> bool {
217 let queue = &mut self.inner.queue.lock();
218 let Some(queue) = queue.queue.get_mut(&database_id) else {
219 return false;
220 };
221
222 if let Some(idx) = queue.queue.inner.iter().position(|scheduled| {
223 if let Command::CreateStreamingJob { info, .. } = &scheduled.command
224 && info.stream_job_fragments.stream_job_id() == table_id
225 {
226 true
227 } else {
228 false
229 }
230 }) {
231 queue.queue.inner.remove(idx).unwrap();
232 true
233 } else {
234 false
235 }
236 }
237
238 async fn run_multiple_commands(
245 &self,
246 database_id: DatabaseId,
247 commands: Vec<Command>,
248 ) -> MetaResult<()> {
249 let mut contexts = Vec::with_capacity(commands.len());
250 let mut scheduleds = Vec::with_capacity(commands.len());
251
252 for command in commands {
253 let (started_tx, started_rx) = oneshot::channel();
254 let (collect_tx, collect_rx) = oneshot::channel();
255
256 contexts.push((started_rx, collect_rx));
257 scheduleds.push((
258 command,
259 Notifier {
260 started: Some(started_tx),
261 collected: Some(collect_tx),
262 },
263 ));
264 }
265
266 self.push(database_id, scheduleds)?;
267
268 for (injected_rx, collect_rx) in contexts {
269 tracing::trace!("waiting for injected_rx");
271 injected_rx
272 .await
273 .ok()
274 .context("failed to inject barrier")??;
275
276 tracing::trace!("waiting for collect_rx");
277 collect_rx
279 .await
280 .ok()
281 .context("failed to collect barrier")??;
282 }
283
284 Ok(())
285 }
286
287 pub async fn run_command(&self, database_id: DatabaseId, command: Command) -> MetaResult<()> {
291 tracing::trace!("run_command: {:?}", command);
292 let ret = self.run_multiple_commands(database_id, vec![command]).await;
293 tracing::trace!("run_command finished");
294 ret
295 }
296
297 pub async fn flush(&self, database_id: DatabaseId) -> MetaResult<HummockVersionId> {
299 let start = Instant::now();
300
301 tracing::debug!("start barrier flush");
302 self.run_multiple_commands(database_id, vec![Command::Flush])
303 .await?;
304
305 let elapsed = Instant::now().duration_since(start);
306 tracing::debug!("barrier flushed in {:?}", elapsed);
307
308 let version_id = self.hummock_manager.get_version_id().await;
309 Ok(version_id)
310 }
311}
312
/// Consumer-side handle of the scheduling queue; created together with a `BarrierScheduler`
/// by `BarrierScheduler::new_pair`.
pub struct ScheduledBarriers {
    /// Queue state shared with the paired `BarrierScheduler`.
    inner: Arc<Inner>,
}
317
/// Decides when the next barrier is issued and whether it is a checkpoint.
pub(super) struct PeriodicBarriers {
    /// Interval whose tick triggers a barrier without a command; it is reset whenever a
    /// scheduled command is issued instead.
    min_interval: Interval,

    /// When set, the next barrier is a checkpoint; cleared once a checkpoint barrier is issued.
    force_checkpoint: bool,

    /// Number of non-checkpoint barriers issued since the last checkpoint barrier.
    num_uncheckpointed_barrier: usize,
    /// A barrier becomes a checkpoint once
    /// `num_uncheckpointed_barrier + 1 >= checkpoint_frequency`.
    checkpoint_frequency: usize,
}
329
330impl PeriodicBarriers {
331 pub(super) fn new(min_interval: Duration, checkpoint_frequency: usize) -> PeriodicBarriers {
332 Self {
333 min_interval: tokio::time::interval(min_interval),
334 force_checkpoint: false,
335 num_uncheckpointed_barrier: 0,
336 checkpoint_frequency,
337 }
338 }
339
340 pub(super) fn set_min_interval(&mut self, min_interval: Duration) {
341 let set_new_interval = min_interval != self.min_interval.period();
342 if set_new_interval {
343 let mut min_interval = tokio::time::interval(min_interval);
344 min_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
345 self.min_interval = min_interval;
346 }
347 }
348
349 pub(super) async fn next_barrier(
350 &mut self,
351 context: &impl GlobalBarrierWorkerContext,
352 ) -> NewBarrier {
353 let checkpoint = self.try_get_checkpoint();
354 let scheduled = select! {
355 biased;
356 scheduled = context.next_scheduled() => {
357 self.min_interval.reset();
358 let checkpoint = scheduled.command.need_checkpoint() || checkpoint;
359 NewBarrier {
360 command: Some((scheduled.database_id, scheduled.command, scheduled.notifiers)),
361 span: scheduled.span,
362 checkpoint,
363 }
364 },
365 _ = self.min_interval.tick() => {
366 NewBarrier {
367 command: None,
368 span: tracing_span(),
369 checkpoint,
370 }
371 }
372 };
373 self.update_num_uncheckpointed_barrier(scheduled.checkpoint);
374 scheduled
375 }
376}
377
378impl ScheduledBarriers {
379 pub(super) async fn next_scheduled(&self) -> Scheduled {
380 'outer: loop {
381 let mut rx = self.inner.changed_tx.subscribe();
382 {
383 let mut queue = self.inner.queue.lock();
384 if queue.status.is_blocked() {
385 continue;
386 }
387 for (database_id, queue) in &mut queue.queue {
388 if queue.status.is_blocked() {
389 continue;
390 }
391 if let Some(item) = queue.queue.inner.pop_front() {
392 item.send_latency_timer.observe_duration();
393 break 'outer Scheduled {
394 database_id: *database_id,
395 command: item.command,
396 notifiers: item.notifiers,
397 span: item.span,
398 };
399 }
400 }
401 }
402 rx.changed().await.unwrap();
403 }
404 }
405}
406
/// Scope of a `ScheduledBarriers::mark_ready` call.
pub(super) enum MarkReadyOptions {
    /// Mark a single database as ready.
    Database(DatabaseId),
    /// Mark the whole cluster as ready.
    Global {
        /// Databases that stay (or become) blocked; every other known database is marked ready.
        blocked_databases: HashSet<DatabaseId>,
    },
}
413
414impl ScheduledBarriers {
415 pub(super) fn pre_apply_drop_cancel(&self, database_id: Option<DatabaseId>) -> bool {
417 self.pre_apply_drop_cancel_scheduled(database_id)
418 }
419
420 pub(super) fn abort_and_mark_blocked(
423 &self,
424 database_id: Option<DatabaseId>,
425 reason: impl Into<String>,
426 ) {
427 let mut queue = self.inner.queue.lock();
428 fn database_blocked_reason(database_id: DatabaseId, reason: &String) -> String {
429 format!("database {} unavailable {}", database_id, reason)
430 }
431 fn mark_blocked_and_notify_failed(
432 database_id: DatabaseId,
433 queue: &mut DatabaseScheduledQueue,
434 reason: &String,
435 ) {
436 let reason = database_blocked_reason(database_id, reason);
437 let err: MetaError = anyhow!("{}", reason).into();
438 queue.mark_blocked(reason);
439 while let Some(ScheduledQueueItem { notifiers, .. }) = queue.queue.inner.pop_front() {
440 notifiers
441 .into_iter()
442 .for_each(|notify| notify.notify_collection_failed(err.clone()))
443 }
444 }
445 if let Some(database_id) = database_id {
446 let reason = reason.into();
447 match queue.queue.entry(database_id) {
448 Entry::Occupied(entry) => {
449 let queue = entry.into_mut();
450 if queue.status.is_blocked() {
451 if cfg!(debug_assertions) {
452 panic!("database {} marked as blocked twice", database_id);
453 } else {
454 warn!(?database_id, "database marked as blocked twice");
455 }
456 }
457 info!(?database_id, "database marked as blocked");
458 mark_blocked_and_notify_failed(database_id, queue, &reason);
459 }
460 Entry::Vacant(entry) => {
461 entry.insert(DatabaseScheduledQueue::new(
462 database_id,
463 &self.inner.metrics,
464 QueueStatus::Blocked(database_blocked_reason(database_id, &reason)),
465 ));
466 }
467 }
468 } else {
469 let reason = reason.into();
470 if queue.status.is_blocked() {
471 if cfg!(debug_assertions) {
472 panic!("cluster marked as blocked twice");
473 } else {
474 warn!("cluster marked as blocked twice");
475 }
476 }
477 info!("cluster marked as blocked");
478 queue.mark_blocked(reason.clone());
479 for (database_id, queue) in &mut queue.queue {
480 mark_blocked_and_notify_failed(*database_id, queue, &reason);
481 }
482 }
483 }
484
485 pub(super) fn mark_ready(&self, options: MarkReadyOptions) {
487 let mut queue = self.inner.queue.lock();
488 let queue = &mut *queue;
489 match options {
490 MarkReadyOptions::Database(database_id) => {
491 info!(?database_id, "database marked as ready");
492 let database_queue = queue.queue.entry(database_id).or_insert_with(|| {
493 DatabaseScheduledQueue::new(
494 database_id,
495 &self.inner.metrics,
496 QueueStatus::Ready,
497 )
498 });
499 if !database_queue.status.is_blocked() {
500 if cfg!(debug_assertions) {
501 panic!("database {} marked as ready twice", database_id);
502 } else {
503 warn!(?database_id, "database marked as ready twice");
504 }
505 }
506 if database_queue.mark_ready()
507 && !queue.status.is_blocked()
508 && !database_queue.queue.inner.is_empty()
509 {
510 self.inner.changed_tx.send(()).ok();
511 }
512 }
513 MarkReadyOptions::Global { blocked_databases } => {
514 if !queue.status.is_blocked() {
515 if cfg!(debug_assertions) {
516 panic!("cluster marked as ready twice");
517 } else {
518 warn!("cluster marked as ready twice");
519 }
520 }
521 info!(?blocked_databases, "cluster marked as ready");
522 let prev_blocked = queue.mark_ready();
523 for database_id in &blocked_databases {
524 queue.queue.entry(*database_id).or_insert_with(|| {
525 DatabaseScheduledQueue::new(
526 *database_id,
527 &self.inner.metrics,
528 QueueStatus::Blocked(format!(
529 "database {} failed to recover in global recovery",
530 database_id
531 )),
532 )
533 });
534 }
535 for (database_id, queue) in &mut queue.queue {
536 if !blocked_databases.contains(database_id) {
537 queue.mark_ready();
538 }
539 }
540 if prev_blocked
541 && queue
542 .queue
543 .values()
544 .any(|database_queue| !database_queue.queue.inner.is_empty())
545 {
546 self.inner.changed_tx.send(()).ok();
547 }
548 }
549 }
550 }
551
552 pub(super) fn pre_apply_drop_cancel_scheduled(&self, database_id: Option<DatabaseId>) -> bool {
555 let mut queue = self.inner.queue.lock();
556 let mut applied = false;
557
558 let mut pre_apply_drop_cancel = |queue: &mut DatabaseScheduledQueue| {
559 while let Some(ScheduledQueueItem {
560 notifiers, command, ..
561 }) = queue.queue.inner.pop_front()
562 {
563 match command {
564 Command::DropStreamingJobs { .. } => {
565 applied = true;
566 }
567 Command::DropSubscription { .. } => {}
568 _ => {
569 unreachable!("only drop and cancel streaming jobs should be buffered");
570 }
571 }
572 notifiers.into_iter().for_each(|notify| {
573 notify.notify_collected();
574 });
575 }
576 };
577
578 if let Some(database_id) = database_id {
579 assert_matches!(queue.status, QueueStatus::Ready);
580 if let Some(queue) = queue.queue.get_mut(&database_id) {
581 assert_matches!(queue.status, QueueStatus::Blocked(_));
582 pre_apply_drop_cancel(queue);
583 }
584 } else {
585 assert_matches!(queue.status, QueueStatus::Blocked(_));
586 for queue in queue.queue.values_mut() {
587 pre_apply_drop_cancel(queue);
588 }
589 }
590
591 applied
592 }
593}
594
595impl PeriodicBarriers {
596 fn try_get_checkpoint(&self) -> bool {
598 self.num_uncheckpointed_barrier + 1 >= self.checkpoint_frequency || self.force_checkpoint
599 }
600
601 pub fn force_checkpoint_in_next_barrier(&mut self) {
603 self.force_checkpoint = true;
604 }
605
606 pub fn set_checkpoint_frequency(&mut self, frequency: usize) {
608 self.checkpoint_frequency = frequency;
609 }
610
611 fn update_num_uncheckpointed_barrier(&mut self, checkpoint: bool) {
613 if checkpoint {
614 self.num_uncheckpointed_barrier = 0;
615 self.force_checkpoint = false;
616 } else {
617 self.num_uncheckpointed_barrier += 1;
618 }
619 }
620}