1use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet, VecDeque};
17use std::sync::Arc;
18use std::time::{Duration, Instant};
19
20use anyhow::{Context, anyhow};
21use assert_matches::assert_matches;
22use await_tree::InstrumentAwait;
23use itertools::Itertools;
24use parking_lot::Mutex;
25use prometheus::HistogramTimer;
26use risingwave_common::catalog::{DatabaseId, TableId};
27use risingwave_common::metrics::LabelGuardedHistogram;
28use risingwave_hummock_sdk::HummockVersionId;
29use tokio::select;
30use tokio::sync::{oneshot, watch};
31use tokio::time::Interval;
32use tracing::{info, warn};
33
34use super::notifier::Notifier;
35use super::{Command, Scheduled};
36use crate::barrier::context::GlobalBarrierWorkerContext;
37use crate::hummock::HummockManagerRef;
38use crate::rpc::metrics::MetaMetrics;
39use crate::{MetaError, MetaResult};
40
41pub(super) struct NewBarrier {
42 pub command: Option<(DatabaseId, Command, Vec<Notifier>)>,
43 pub span: tracing::Span,
44 pub checkpoint: bool,
45}
46
47struct Inner {
52 queue: Mutex<ScheduledQueue>,
53
54 changed_tx: watch::Sender<()>,
56
57 metrics: Arc<MetaMetrics>,
59}
60
/// Whether a scheduling queue currently accepts new commands.
#[derive(Debug)]
enum QueueStatus {
    /// Commands may be scheduled.
    Ready,
    /// Scheduling is refused; the payload is the reason returned to the caller.
    Blocked(String),
}

impl QueueStatus {
    /// Returns `true` if the status is [`QueueStatus::Blocked`].
    fn is_blocked(&self) -> bool {
        match self {
            Self::Blocked(_) => true,
            Self::Ready => false,
        }
    }
}
74
75struct ScheduledQueueItem {
76 command: Command,
77 notifiers: Vec<Notifier>,
78 send_latency_timer: HistogramTimer,
79 span: tracing::Span,
80}
81
82struct StatusQueue<T> {
83 queue: T,
84 status: QueueStatus,
85}
86
87struct DatabaseQueue {
88 inner: VecDeque<ScheduledQueueItem>,
89 send_latency: LabelGuardedHistogram,
90}
91
92type DatabaseScheduledQueue = StatusQueue<DatabaseQueue>;
93type ScheduledQueue = StatusQueue<HashMap<DatabaseId, DatabaseScheduledQueue>>;
94
95impl DatabaseScheduledQueue {
96 fn new(database_id: DatabaseId, metrics: &MetaMetrics, status: QueueStatus) -> Self {
97 Self {
98 queue: DatabaseQueue {
99 inner: Default::default(),
100 send_latency: metrics
101 .barrier_send_latency
102 .with_guarded_label_values(&[database_id.database_id.to_string().as_str()]),
103 },
104 status,
105 }
106 }
107}
108
109impl<T> StatusQueue<T> {
110 fn mark_blocked(&mut self, reason: String) {
111 self.status = QueueStatus::Blocked(reason);
112 }
113
114 fn mark_ready(&mut self) -> bool {
115 let prev_blocked = self.status.is_blocked();
116 self.status = QueueStatus::Ready;
117 prev_blocked
118 }
119
120 fn validate_item(&mut self, command: &Command) -> MetaResult<()> {
121 if let QueueStatus::Blocked(reason) = &self.status
127 && !matches!(
128 command,
129 Command::DropStreamingJobs { .. } | Command::DropSubscription { .. }
130 )
131 {
132 return Err(MetaError::unavailable(reason));
133 }
134 Ok(())
135 }
136}
137
138fn tracing_span() -> tracing::Span {
139 if tracing::Span::current().is_none() {
140 tracing::Span::none()
141 } else {
142 tracing::info_span!(
143 "barrier",
144 checkpoint = tracing::field::Empty,
145 epoch = tracing::field::Empty
146 )
147 }
148}
149
150#[derive(Clone)]
153pub struct BarrierScheduler {
154 inner: Arc<Inner>,
155
156 hummock_manager: HummockManagerRef,
158}
159
160impl BarrierScheduler {
161 pub fn new_pair(
164 hummock_manager: HummockManagerRef,
165 metrics: Arc<MetaMetrics>,
166 ) -> (Self, ScheduledBarriers) {
167 let inner = Arc::new(Inner {
168 queue: Mutex::new(ScheduledQueue {
169 queue: Default::default(),
170 status: QueueStatus::Ready,
171 }),
172 changed_tx: watch::channel(()).0,
173 metrics,
174 });
175
176 (
177 Self {
178 inner: inner.clone(),
179 hummock_manager,
180 },
181 ScheduledBarriers { inner },
182 )
183 }
184
185 fn push(
187 &self,
188 database_id: DatabaseId,
189 scheduleds: impl IntoIterator<Item = (Command, Notifier)>,
190 ) -> MetaResult<()> {
191 let mut queue = self.inner.queue.lock();
192 let scheduleds = scheduleds.into_iter().collect_vec();
193 scheduleds
194 .iter()
195 .try_for_each(|(command, _)| queue.validate_item(command))?;
196 let queue = queue.queue.entry(database_id).or_insert_with(|| {
197 DatabaseScheduledQueue::new(database_id, &self.inner.metrics, QueueStatus::Ready)
198 });
199 scheduleds
200 .iter()
201 .try_for_each(|(command, _)| queue.validate_item(command))?;
202 for (command, notifier) in scheduleds {
203 queue.queue.inner.push_back(ScheduledQueueItem {
204 command,
205 notifiers: vec![notifier],
206 send_latency_timer: queue.queue.send_latency.start_timer(),
207 span: tracing_span(),
208 });
209 if queue.queue.inner.len() == 1 {
210 self.inner.changed_tx.send(()).ok();
211 }
212 }
213 Ok(())
214 }
215
216 pub fn try_cancel_scheduled_create(&self, database_id: DatabaseId, table_id: TableId) -> bool {
218 let queue = &mut self.inner.queue.lock();
219 let Some(queue) = queue.queue.get_mut(&database_id) else {
220 return false;
221 };
222
223 if let Some(idx) = queue.queue.inner.iter().position(|scheduled| {
224 if let Command::CreateStreamingJob { info, .. } = &scheduled.command
225 && info.stream_job_fragments.stream_job_id() == table_id
226 {
227 true
228 } else {
229 false
230 }
231 }) {
232 queue.queue.inner.remove(idx).unwrap();
233 true
234 } else {
235 false
236 }
237 }
238
239 #[await_tree::instrument("run_commands({})", commands.iter().join(", "))]
246 async fn run_multiple_commands(
247 &self,
248 database_id: DatabaseId,
249 commands: Vec<Command>,
250 ) -> MetaResult<()> {
251 let mut contexts = Vec::with_capacity(commands.len());
252 let mut scheduleds = Vec::with_capacity(commands.len());
253
254 for command in commands {
255 let (started_tx, started_rx) = oneshot::channel();
256 let (collect_tx, collect_rx) = oneshot::channel();
257
258 contexts.push((started_rx, collect_rx));
259 scheduleds.push((
260 command,
261 Notifier {
262 started: Some(started_tx),
263 collected: Some(collect_tx),
264 },
265 ));
266 }
267
268 self.push(database_id, scheduleds)?;
269
270 for (injected_rx, collect_rx) in contexts {
271 tracing::trace!("waiting for injected_rx");
273 injected_rx
274 .instrument_await("wait_injected")
275 .await
276 .ok()
277 .context("failed to inject barrier")??;
278
279 tracing::trace!("waiting for collect_rx");
280 collect_rx
282 .instrument_await("wait_collected")
283 .await
284 .ok()
285 .context("failed to collect barrier")??;
286 }
287
288 Ok(())
289 }
290
291 pub async fn run_command(&self, database_id: DatabaseId, command: Command) -> MetaResult<()> {
295 tracing::trace!("run_command: {:?}", command);
296 let ret = self.run_multiple_commands(database_id, vec![command]).await;
297 tracing::trace!("run_command finished");
298 ret
299 }
300
301 pub async fn flush(&self, database_id: DatabaseId) -> MetaResult<HummockVersionId> {
303 let start = Instant::now();
304
305 tracing::debug!("start barrier flush");
306 self.run_multiple_commands(database_id, vec![Command::Flush])
307 .await?;
308
309 let elapsed = Instant::now().duration_since(start);
310 tracing::debug!("barrier flushed in {:?}", elapsed);
311
312 let version_id = self.hummock_manager.get_version_id().await;
313 Ok(version_id)
314 }
315}
316
317pub struct ScheduledBarriers {
319 inner: Arc<Inner>,
320}
321
322pub(super) struct PeriodicBarriers {
324 min_interval: Interval,
325
326 force_checkpoint: bool,
328
329 num_uncheckpointed_barrier: usize,
331 checkpoint_frequency: usize,
332}
333
334impl PeriodicBarriers {
335 pub(super) fn new(min_interval: Duration, checkpoint_frequency: usize) -> PeriodicBarriers {
336 Self {
337 min_interval: tokio::time::interval(min_interval),
338 force_checkpoint: false,
339 num_uncheckpointed_barrier: 0,
340 checkpoint_frequency,
341 }
342 }
343
344 pub(super) fn set_min_interval(&mut self, min_interval: Duration) {
345 let set_new_interval = min_interval != self.min_interval.period();
346 if set_new_interval {
347 let mut min_interval = tokio::time::interval(min_interval);
348 min_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
349 self.min_interval = min_interval;
350 }
351 }
352
353 #[await_tree::instrument]
354 pub(super) async fn next_barrier(
355 &mut self,
356 context: &impl GlobalBarrierWorkerContext,
357 ) -> NewBarrier {
358 let checkpoint = self.try_get_checkpoint();
359 let scheduled = select! {
360 biased;
361 scheduled = context.next_scheduled() => {
362 self.min_interval.reset();
363 let checkpoint = scheduled.command.need_checkpoint() || checkpoint;
364 NewBarrier {
365 command: Some((scheduled.database_id, scheduled.command, scheduled.notifiers)),
366 span: scheduled.span,
367 checkpoint,
368 }
369 },
370 _ = self.min_interval.tick() => {
371 NewBarrier {
372 command: None,
373 span: tracing_span(),
374 checkpoint,
375 }
376 }
377 };
378 self.update_num_uncheckpointed_barrier(scheduled.checkpoint);
379 scheduled
380 }
381}
382
383impl ScheduledBarriers {
384 pub(super) async fn next_scheduled(&self) -> Scheduled {
385 'outer: loop {
386 let mut rx = self.inner.changed_tx.subscribe();
387 {
388 let mut queue = self.inner.queue.lock();
389 if queue.status.is_blocked() {
390 continue;
391 }
392 for (database_id, queue) in &mut queue.queue {
393 if queue.status.is_blocked() {
394 continue;
395 }
396 if let Some(item) = queue.queue.inner.pop_front() {
397 item.send_latency_timer.observe_duration();
398 break 'outer Scheduled {
399 database_id: *database_id,
400 command: item.command,
401 notifiers: item.notifiers,
402 span: item.span,
403 };
404 }
405 }
406 }
407 rx.changed().await.unwrap();
408 }
409 }
410}
411
412pub(super) enum MarkReadyOptions {
413 Database(DatabaseId),
414 Global {
415 blocked_databases: HashSet<DatabaseId>,
416 },
417}
418
419impl ScheduledBarriers {
420 pub(super) fn pre_apply_drop_cancel(&self, database_id: Option<DatabaseId>) -> bool {
422 self.pre_apply_drop_cancel_scheduled(database_id)
423 }
424
425 pub(super) fn abort_and_mark_blocked(
428 &self,
429 database_id: Option<DatabaseId>,
430 reason: impl Into<String>,
431 ) {
432 let mut queue = self.inner.queue.lock();
433 fn database_blocked_reason(database_id: DatabaseId, reason: &String) -> String {
434 format!("database {} unavailable {}", database_id, reason)
435 }
436 fn mark_blocked_and_notify_failed(
437 database_id: DatabaseId,
438 queue: &mut DatabaseScheduledQueue,
439 reason: &String,
440 ) {
441 let reason = database_blocked_reason(database_id, reason);
442 let err: MetaError = anyhow!("{}", reason).into();
443 queue.mark_blocked(reason);
444 while let Some(ScheduledQueueItem { notifiers, .. }) = queue.queue.inner.pop_front() {
445 notifiers
446 .into_iter()
447 .for_each(|notify| notify.notify_collection_failed(err.clone()))
448 }
449 }
450 if let Some(database_id) = database_id {
451 let reason = reason.into();
452 match queue.queue.entry(database_id) {
453 Entry::Occupied(entry) => {
454 let queue = entry.into_mut();
455 if queue.status.is_blocked() {
456 if cfg!(debug_assertions) {
457 panic!("database {} marked as blocked twice", database_id);
458 } else {
459 warn!(?database_id, "database marked as blocked twice");
460 }
461 }
462 info!(?database_id, "database marked as blocked");
463 mark_blocked_and_notify_failed(database_id, queue, &reason);
464 }
465 Entry::Vacant(entry) => {
466 entry.insert(DatabaseScheduledQueue::new(
467 database_id,
468 &self.inner.metrics,
469 QueueStatus::Blocked(database_blocked_reason(database_id, &reason)),
470 ));
471 }
472 }
473 } else {
474 let reason = reason.into();
475 if queue.status.is_blocked() {
476 if cfg!(debug_assertions) {
477 panic!("cluster marked as blocked twice");
478 } else {
479 warn!("cluster marked as blocked twice");
480 }
481 }
482 info!("cluster marked as blocked");
483 queue.mark_blocked(reason.clone());
484 for (database_id, queue) in &mut queue.queue {
485 mark_blocked_and_notify_failed(*database_id, queue, &reason);
486 }
487 }
488 }
489
490 pub(super) fn mark_ready(&self, options: MarkReadyOptions) {
492 let mut queue = self.inner.queue.lock();
493 let queue = &mut *queue;
494 match options {
495 MarkReadyOptions::Database(database_id) => {
496 info!(?database_id, "database marked as ready");
497 let database_queue = queue.queue.entry(database_id).or_insert_with(|| {
498 DatabaseScheduledQueue::new(
499 database_id,
500 &self.inner.metrics,
501 QueueStatus::Ready,
502 )
503 });
504 if !database_queue.status.is_blocked() {
505 if cfg!(debug_assertions) {
506 panic!("database {} marked as ready twice", database_id);
507 } else {
508 warn!(?database_id, "database marked as ready twice");
509 }
510 }
511 if database_queue.mark_ready()
512 && !queue.status.is_blocked()
513 && !database_queue.queue.inner.is_empty()
514 {
515 self.inner.changed_tx.send(()).ok();
516 }
517 }
518 MarkReadyOptions::Global { blocked_databases } => {
519 if !queue.status.is_blocked() {
520 if cfg!(debug_assertions) {
521 panic!("cluster marked as ready twice");
522 } else {
523 warn!("cluster marked as ready twice");
524 }
525 }
526 info!(?blocked_databases, "cluster marked as ready");
527 let prev_blocked = queue.mark_ready();
528 for database_id in &blocked_databases {
529 queue.queue.entry(*database_id).or_insert_with(|| {
530 DatabaseScheduledQueue::new(
531 *database_id,
532 &self.inner.metrics,
533 QueueStatus::Blocked(format!(
534 "database {} failed to recover in global recovery",
535 database_id
536 )),
537 )
538 });
539 }
540 for (database_id, queue) in &mut queue.queue {
541 if !blocked_databases.contains(database_id) {
542 queue.mark_ready();
543 }
544 }
545 if prev_blocked
546 && queue
547 .queue
548 .values()
549 .any(|database_queue| !database_queue.queue.inner.is_empty())
550 {
551 self.inner.changed_tx.send(()).ok();
552 }
553 }
554 }
555 }
556
557 pub(super) fn pre_apply_drop_cancel_scheduled(&self, database_id: Option<DatabaseId>) -> bool {
560 let mut queue = self.inner.queue.lock();
561 let mut applied = false;
562
563 let mut pre_apply_drop_cancel = |queue: &mut DatabaseScheduledQueue| {
564 while let Some(ScheduledQueueItem {
565 notifiers, command, ..
566 }) = queue.queue.inner.pop_front()
567 {
568 match command {
569 Command::DropStreamingJobs { .. } => {
570 applied = true;
571 }
572 Command::DropSubscription { .. } => {}
573 _ => {
574 unreachable!("only drop and cancel streaming jobs should be buffered");
575 }
576 }
577 notifiers.into_iter().for_each(|notify| {
578 notify.notify_collected();
579 });
580 }
581 };
582
583 if let Some(database_id) = database_id {
584 assert_matches!(queue.status, QueueStatus::Ready);
585 if let Some(queue) = queue.queue.get_mut(&database_id) {
586 assert_matches!(queue.status, QueueStatus::Blocked(_));
587 pre_apply_drop_cancel(queue);
588 }
589 } else {
590 assert_matches!(queue.status, QueueStatus::Blocked(_));
591 for queue in queue.queue.values_mut() {
592 pre_apply_drop_cancel(queue);
593 }
594 }
595
596 applied
597 }
598}
599
600impl PeriodicBarriers {
601 fn try_get_checkpoint(&self) -> bool {
603 self.num_uncheckpointed_barrier + 1 >= self.checkpoint_frequency || self.force_checkpoint
604 }
605
606 pub fn force_checkpoint_in_next_barrier(&mut self) {
608 self.force_checkpoint = true;
609 }
610
611 pub fn set_checkpoint_frequency(&mut self, frequency: usize) {
613 self.checkpoint_frequency = frequency;
614 }
615
616 fn update_num_uncheckpointed_barrier(&mut self, checkpoint: bool) {
618 if checkpoint {
619 self.num_uncheckpointed_barrier = 0;
620 self.force_checkpoint = false;
621 } else {
622 self.num_uncheckpointed_barrier += 1;
623 }
624 }
625}