use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet, VecDeque};
use std::sync::Arc;

use anyhow::{Context, anyhow};
use assert_matches::assert_matches;
use await_tree::InstrumentAwait;
use itertools::Itertools;
use parking_lot::Mutex;
use prometheus::HistogramTimer;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::id::JobId;
use risingwave_common::metrics::LabelGuardedHistogram;
use risingwave_hummock_sdk::HummockVersionId;
use risingwave_pb::catalog::Database;
use rw_futures_util::pending_on_none;
use tokio::select;
use tokio::sync::{oneshot, watch};
use tokio::time::{Duration, Instant};
use tokio_stream::wrappers::IntervalStream;
use tokio_stream::{StreamExt, StreamMap};
use tracing::{info, warn};

use super::notifier::Notifier;
use super::{Command, Scheduled};
use crate::barrier::context::GlobalBarrierWorkerContext;
use crate::hummock::HummockManagerRef;
use crate::rpc::metrics::{GLOBAL_META_METRICS, MetaMetrics};
use crate::{MetaError, MetaResult};

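/// A barrier to be injected next, together with the optional command it carries.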
pub(super) struct NewBarrier {
    pub database_id: DatabaseId,
    pub command: Option<(Command, Vec<Notifier>)>,
    pub span: tracing::Span,
    pub checkpoint: bool,
}

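/// Shared state behind [`BarrierScheduler`] and [`ScheduledBarriers`]: the scheduling queue, a
/// watch channel to wake up the consumer, and metrics.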
struct Inner {
    queue: Mutex<ScheduledQueue>,

    /// Subscribers of this watch channel are notified when `queue` becomes non-empty.
    changed_tx: watch::Sender<()>,

    /// Used for recording the send latency of each scheduled barrier.
    metrics: Arc<MetaMetrics>,
}

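/// Whether a queue is ready to accept newly scheduled commands, or blocked for a reason.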
#[derive(Debug)]
enum QueueStatus {
    /// The queue is ready to accept new commands.
    Ready,
    /// The queue is blocked for the given reason and rejects new commands.
    Blocked(String),
}

impl QueueStatus {
    fn is_blocked(&self) -> bool {
        matches!(self, Self::Blocked(_))
    }
}

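/// A scheduled command together with its notifiers and tracing/metric context.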
struct ScheduledQueueItem {
    command: Command,
    notifiers: Vec<Notifier>,
    send_latency_timer: HistogramTimer,
    span: tracing::Span,
}

struct StatusQueue<T> {
    queue: T,
    status: QueueStatus,
}

struct DatabaseQueue {
    inner: VecDeque<ScheduledQueueItem>,
    send_latency: LabelGuardedHistogram,
}

type DatabaseScheduledQueue = StatusQueue<DatabaseQueue>;
type ScheduledQueue = StatusQueue<HashMap<DatabaseId, DatabaseScheduledQueue>>;

impl DatabaseScheduledQueue {
    fn new(database_id: DatabaseId, metrics: &MetaMetrics, status: QueueStatus) -> Self {
        Self {
            queue: DatabaseQueue {
                inner: Default::default(),
                send_latency: metrics
                    .barrier_send_latency
                    .with_guarded_label_values(&[database_id.to_string().as_str()]),
            },
            status,
        }
    }
}

impl<T> StatusQueue<T> {
    fn mark_blocked(&mut self, reason: String) {
        self.status = QueueStatus::Blocked(reason);
    }

    fn mark_ready(&mut self) -> bool {
        let prev_blocked = self.status.is_blocked();
        self.status = QueueStatus::Ready;
        prev_blocked
    }

    fn validate_item(&mut self, command: &Command) -> MetaResult<()> {
        // When the queue is blocked, only `DropStreamingJobs` and `DropSubscription` commands are
        // allowed to pass through; everything else is rejected as unavailable.
        if let QueueStatus::Blocked(reason) = &self.status
            && !matches!(
                command,
                Command::DropStreamingJobs { .. } | Command::DropSubscription { .. }
            )
        {
            return Err(MetaError::unavailable(reason));
        }
        Ok(())
    }
}

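/// Creates a tracing span for a new barrier, or a disabled span if there is no current span to
/// attach to.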
fn tracing_span() -> tracing::Span {
    if tracing::Span::current().is_none() {
        tracing::Span::none()
    } else {
        tracing::info_span!(
            "barrier",
            checkpoint = tracing::field::Empty,
            epoch = tracing::field::Empty
        )
    }
}

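/// The sender side of the barrier scheduling queue. Used to schedule commands and wait for the
/// corresponding barriers to be collected.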
#[derive(Clone)]
pub struct BarrierScheduler {
    inner: Arc<Inner>,

    /// Used for getting the latest Hummock version after a `Flush` command is collected.
    hummock_manager: HummockManagerRef,
}

impl BarrierScheduler {
    /// Creates a new pair of [`BarrierScheduler`] (producer) and [`ScheduledBarriers`] (consumer)
    /// sharing the same queue.
    pub fn new_pair(
        hummock_manager: HummockManagerRef,
        metrics: Arc<MetaMetrics>,
    ) -> (Self, ScheduledBarriers) {
        let inner = Arc::new(Inner {
            queue: Mutex::new(ScheduledQueue {
                queue: Default::default(),
                status: QueueStatus::Ready,
            }),
            changed_tx: watch::channel(()).0,
            metrics,
        });

        (
            Self {
                inner: inner.clone(),
                hummock_manager,
            },
            ScheduledBarriers { inner },
        )
    }

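    /// Pushes scheduled commands into the per-database queue, after validating that neither the
    /// global queue nor the database queue is blocked. Wakes up the consumer when the database
    /// queue becomes non-empty.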
    fn push(
        &self,
        database_id: DatabaseId,
        scheduleds: impl IntoIterator<Item = (Command, Notifier)>,
    ) -> MetaResult<()> {
        let mut queue = self.inner.queue.lock();
        let scheduleds = scheduleds.into_iter().collect_vec();
        scheduleds
            .iter()
            .try_for_each(|(command, _)| queue.validate_item(command))?;
        let queue = queue.queue.entry(database_id).or_insert_with(|| {
            DatabaseScheduledQueue::new(database_id, &self.inner.metrics, QueueStatus::Ready)
        });
        scheduleds
            .iter()
            .try_for_each(|(command, _)| queue.validate_item(command))?;
        for (command, notifier) in scheduleds {
            queue.queue.inner.push_back(ScheduledQueueItem {
                command,
                notifiers: vec![notifier],
                send_latency_timer: queue.queue.send_latency.start_timer(),
                span: tracing_span(),
            });
            if queue.queue.inner.len() == 1 {
                self.inner.changed_tx.send(()).ok();
            }
        }
        Ok(())
    }

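    /// Tries to cancel a scheduled `CreateStreamingJob` command for the given job before it is
    /// picked up by the barrier worker. Returns `true` if such a command was found and removed.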
    pub fn try_cancel_scheduled_create(&self, database_id: DatabaseId, job_id: JobId) -> bool {
        let queue = &mut self.inner.queue.lock();
        let Some(queue) = queue.queue.get_mut(&database_id) else {
            return false;
        };

        if let Some(idx) = queue.queue.inner.iter().position(|scheduled| {
            if let Command::CreateStreamingJob { info, .. } = &scheduled.command
                && info.stream_job_fragments.stream_job_id() == job_id
            {
                true
            } else {
                false
            }
        }) {
            queue.queue.inner.remove(idx).unwrap();
            true
        } else {
            false
        }
    }

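    /// Runs multiple commands in order and returns only after all of them are injected and
    /// collected. Errors from injection or collection are propagated to the caller.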
    #[await_tree::instrument("run_commands({})", commands.iter().join(", "))]
    async fn run_multiple_commands(
        &self,
        database_id: DatabaseId,
        commands: Vec<Command>,
    ) -> MetaResult<()> {
        let mut contexts = Vec::with_capacity(commands.len());
        let mut scheduleds = Vec::with_capacity(commands.len());

        for command in commands {
            let (started_tx, started_rx) = oneshot::channel();
            let (collect_tx, collect_rx) = oneshot::channel();

            contexts.push((started_rx, collect_rx));
            scheduleds.push((
                command,
                Notifier {
                    started: Some(started_tx),
                    collected: Some(collect_tx),
                },
            ));
        }

        self.push(database_id, scheduleds)?;

        for (injected_rx, collect_rx) in contexts {
            tracing::trace!("waiting for injected_rx");
            // Wait for this command to be injected into a barrier.
            injected_rx
                .instrument_await("wait_injected")
                .await
                .ok()
                .context("failed to inject barrier")??;

            tracing::trace!("waiting for collect_rx");
            // Wait for the barrier carrying this command to be collected, propagating any error.
            collect_rx
                .instrument_await("wait_collected")
                .await
                .ok()
                .context("failed to collect barrier")??;
        }

        Ok(())
    }

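    /// Runs a single command and returns after it is both injected and collected.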
    pub async fn run_command(&self, database_id: DatabaseId, command: Command) -> MetaResult<()> {
        tracing::trace!("run_command: {:?}", command);
        let ret = self.run_multiple_commands(database_id, vec![command]).await;
        tracing::trace!("run_command finished");
        ret
    }

    /// Schedules a command without waiting for it to be injected or collected.
    pub fn run_command_no_wait(&self, database_id: DatabaseId, command: Command) -> MetaResult<()> {
        tracing::trace!("run_command_no_wait: {:?}", command);
        self.push(database_id, vec![(command, Notifier::default())])
    }

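    /// Schedules a `Flush` command, waits for it to be collected, and then returns the current
    /// Hummock version id.
    ///
    /// A minimal usage sketch (the `barrier_scheduler` handle and `database_id` are assumed to be
    /// available from the surrounding meta-service context):
    ///
    /// ```ignore
    /// let version_id = barrier_scheduler.flush(database_id).await?;
    /// tracing::debug!(?version_id, "barrier flushed");
    /// ```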
    pub async fn flush(&self, database_id: DatabaseId) -> MetaResult<HummockVersionId> {
        let start = Instant::now();

        tracing::debug!("start barrier flush");
        self.run_multiple_commands(database_id, vec![Command::Flush])
            .await?;

        let elapsed = Instant::now().duration_since(start);
        tracing::debug!("barrier flushed in {:?}", elapsed);

        let version_id = self.hummock_manager.get_version_id().await;
        Ok(version_id)
    }
}

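/// The receiver side of the barrier scheduling queue, consumed by the barrier worker.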
pub struct ScheduledBarriers {
    inner: Arc<Inner>,
}

/// Per-database barrier interval and checkpoint configuration.
#[derive(Debug)]
pub struct DatabaseBarrierState {
    /// Database-level barrier interval override; falls back to the system default when `None`.
    barrier_interval: Option<Duration>,
    /// Database-level checkpoint frequency override; falls back to the system default when `None`.
    checkpoint_frequency: Option<u64>,
    /// The number of non-checkpoint barriers since the last checkpoint barrier.
    num_uncheckpointed_barrier: u64,
}

impl DatabaseBarrierState {
    fn new(barrier_interval_ms: Option<u32>, checkpoint_frequency: Option<u64>) -> Self {
        Self {
            barrier_interval: barrier_interval_ms.map(|ms| Duration::from_millis(ms as u64)),
            checkpoint_frequency,
            num_uncheckpointed_barrier: 0,
        }
    }
}

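/// Schedules periodic barriers per database and decides whether each new barrier should be a
/// checkpoint, based on per-database or system-wide intervals and checkpoint frequencies.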
#[derive(Default, Debug)]
pub struct PeriodicBarriers {
    /// System-wide default barrier interval.
    sys_barrier_interval: Duration,
    /// System-wide default checkpoint frequency.
    sys_checkpoint_frequency: u64,
    /// Per-database barrier state.
    databases: HashMap<DatabaseId, DatabaseBarrierState>,
    /// Per-database interval timers driving periodic barriers.
    timer_streams: StreamMap<DatabaseId, IntervalStream>,
    /// Databases whose next barrier must be a checkpoint.
    force_checkpoint_databases: HashSet<DatabaseId>,
}

impl PeriodicBarriers {
    pub(super) fn new(
        sys_barrier_interval: Duration,
        sys_checkpoint_frequency: u64,
        database_infos: Vec<Database>,
    ) -> Self {
        let mut databases = HashMap::with_capacity(database_infos.len());
        let mut timer_streams = StreamMap::with_capacity(database_infos.len());
        database_infos.into_iter().for_each(|database| {
            let database_id: DatabaseId = database.id;
            let barrier_interval_ms = database.barrier_interval_ms;
            let checkpoint_frequency = database.checkpoint_frequency;
            databases.insert(
                database_id,
                DatabaseBarrierState::new(barrier_interval_ms, checkpoint_frequency),
            );
            let duration = if let Some(ms) = barrier_interval_ms {
                Duration::from_millis(ms as u64)
            } else {
                sys_barrier_interval
            };

            let interval_stream = Self::new_interval_stream(duration, &database_id);
            timer_streams.insert(database_id, interval_stream);
        });
        Self {
            sys_barrier_interval,
            sys_checkpoint_frequency,
            databases,
            timer_streams,
            force_checkpoint_databases: Default::default(),
        }
    }

    /// Creates an interval stream for a database and reports the configured interval to metrics.
    fn new_interval_stream(duration: Duration, database_id: &DatabaseId) -> IntervalStream {
        GLOBAL_META_METRICS
            .barrier_interval_by_database
            .with_label_values(&[&database_id.to_string()])
            .set(duration.as_millis_f64());
        let mut interval = tokio::time::interval(duration);
        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
        IntervalStream::new(interval)
    }

    /// Updates the system-wide default barrier interval and rebuilds the timers of databases that
    /// do not have their own interval override.
    pub(super) fn set_sys_barrier_interval(&mut self, duration: Duration) {
        if self.sys_barrier_interval == duration {
            return;
        }
        self.sys_barrier_interval = duration;
        for (db_id, db_state) in &mut self.databases {
            // Only databases without a database-level override follow the system default.
            if db_state.barrier_interval.is_none() {
                let interval_stream = Self::new_interval_stream(duration, db_id);
                self.timer_streams.insert(*db_id, interval_stream);
            }
        }
    }

    /// Updates the system-wide default checkpoint frequency and resets the uncheckpointed-barrier
    /// counters of databases that do not have their own frequency override.
    pub fn set_sys_checkpoint_frequency(&mut self, frequency: u64) {
        if self.sys_checkpoint_frequency == frequency {
            return;
        }
        self.sys_checkpoint_frequency = frequency;
        for db_state in self.databases.values_mut() {
            if db_state.checkpoint_frequency.is_none() {
                db_state.num_uncheckpointed_barrier = 0;
            }
        }
    }

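    /// Applies a per-database barrier configuration change: updates or inserts the database
    /// state, resets its uncheckpointed-barrier counter, and rebuilds its interval timer with the
    /// new (or system default) interval.
    ///
    /// A minimal sketch of a caller applying a change (the values are illustrative assumptions):
    ///
    /// ```ignore
    /// periodic.update_database_barrier(database_id, Some(500), Some(10));
    /// ```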
    pub(super) fn update_database_barrier(
        &mut self,
        database_id: DatabaseId,
        barrier_interval_ms: Option<u32>,
        checkpoint_frequency: Option<u64>,
    ) {
        match self.databases.entry(database_id) {
            Entry::Occupied(mut entry) => {
                let db_state = entry.get_mut();
                db_state.barrier_interval =
                    barrier_interval_ms.map(|ms| Duration::from_millis(ms as u64));
                db_state.checkpoint_frequency = checkpoint_frequency;
                db_state.num_uncheckpointed_barrier = 0;
            }
            Entry::Vacant(entry) => {
                entry.insert(DatabaseBarrierState::new(
                    barrier_interval_ms,
                    checkpoint_frequency,
                ));
            }
        }

        let duration = if let Some(ms) = barrier_interval_ms {
            Duration::from_millis(ms as u64)
        } else {
            self.sys_barrier_interval
        };

        let interval_stream = Self::new_interval_stream(duration, &database_id);
        self.timer_streams.insert(database_id, interval_stream);
    }

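    /// Forces the next barrier of the given database to be a checkpoint barrier.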
    pub fn force_checkpoint_in_next_barrier(&mut self, database_id: DatabaseId) {
        if self.databases.contains_key(&database_id) {
            self.force_checkpoint_databases.insert(database_id);
        } else {
            warn!(
                ?database_id,
                "force checkpoint in next barrier for non-existing database"
            );
        }
    }

    fn reset_database_timer(&mut self, database_id: DatabaseId) {
        assert!(
            self.databases.contains_key(&database_id),
            "database {} not found in scheduled barriers",
            database_id
        );
        assert!(
            self.timer_streams.contains_key(&database_id),
            "timer stream for database {} not found in scheduled barriers",
            database_id
        );
        // Scan for the target database's timer and reset it, so its next tick is delayed by a
        // full interval from now.
        for (db_id, timer_stream) in self.timer_streams.iter_mut() {
            if *db_id == database_id {
                timer_stream.as_mut().reset();
            }
        }
    }

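    /// Returns the next barrier to inject: either a forced checkpoint, a scheduled command, or a
    /// periodic barrier triggered by a database timer, in that order of priority.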
    #[await_tree::instrument]
    pub(super) async fn next_barrier(
        &mut self,
        context: &impl GlobalBarrierWorkerContext,
    ) -> NewBarrier {
        let force_checkpoint_database = self.force_checkpoint_databases.drain().next();
        let new_barrier = if let Some(database_id) = force_checkpoint_database {
            self.reset_database_timer(database_id);
            NewBarrier {
                database_id,
                command: None,
                span: tracing_span(),
                checkpoint: true,
            }
        } else {
            select! {
                biased;
                scheduled = context.next_scheduled() => {
                    let database_id = scheduled.database_id;
                    self.reset_database_timer(database_id);
                    let checkpoint = scheduled.command.need_checkpoint() || self.try_get_checkpoint(database_id);
                    NewBarrier {
                        database_id: scheduled.database_id,
                        command: Some((scheduled.command, scheduled.notifiers)),
                        span: scheduled.span,
                        checkpoint,
                    }
                },
                // A database timer fired without any scheduled command: inject a periodic barrier.
                (database_id, _instant) = pending_on_none(self.timer_streams.next()) => {
                    let checkpoint = self.try_get_checkpoint(database_id);
                    NewBarrier {
                        database_id,
                        command: None,
                        span: tracing_span(),
                        checkpoint,
                    }
                }
            }
        };
        self.update_num_uncheckpointed_barrier(new_barrier.database_id, new_barrier.checkpoint);

        new_barrier
    }

    /// Whether the next barrier of the database should be a checkpoint, based on its (or the
    /// system-wide) checkpoint frequency.
    fn try_get_checkpoint(&self, database_id: DatabaseId) -> bool {
        let db_state = self.databases.get(&database_id).unwrap();
        let checkpoint_frequency = db_state
            .checkpoint_frequency
            .unwrap_or(self.sys_checkpoint_frequency);
        db_state.num_uncheckpointed_barrier + 1 >= checkpoint_frequency
    }

    /// Resets or bumps the uncheckpointed-barrier counter of the database after a barrier is issued.
    fn update_num_uncheckpointed_barrier(&mut self, database_id: DatabaseId, checkpoint: bool) {
        let db_state = self.databases.get_mut(&database_id).unwrap();
        if checkpoint {
            db_state.num_uncheckpointed_barrier = 0;
        } else {
            db_state.num_uncheckpointed_barrier += 1;
        }
    }
}

impl ScheduledBarriers {
    /// Waits for the next scheduled command from any non-blocked database queue.
    pub(super) async fn next_scheduled(&self) -> Scheduled {
        'outer: loop {
            let mut rx = self.inner.changed_tx.subscribe();
            {
                let mut queue = self.inner.queue.lock();
                if queue.status.is_blocked() {
                    continue;
                }
                for (database_id, queue) in &mut queue.queue {
                    if queue.status.is_blocked() {
                        continue;
                    }
                    if let Some(item) = queue.queue.inner.pop_front() {
                        item.send_latency_timer.observe_duration();
                        break 'outer Scheduled {
                            database_id: *database_id,
                            command: item.command,
                            notifiers: item.notifiers,
                            span: item.span,
                        };
                    }
                }
            }
            rx.changed().await.unwrap();
        }
    }
}

pub(super) enum MarkReadyOptions {
    Database(DatabaseId),
    Global {
        blocked_databases: HashSet<DatabaseId>,
    },
}

impl ScheduledBarriers {
    /// Pre-applies the buffered drop and cancel commands of a database (or of all databases) and
    /// returns the state table ids unregistered by those drops.
    pub(super) fn pre_apply_drop_cancel(&self, database_id: Option<DatabaseId>) -> Vec<TableId> {
        self.pre_apply_drop_cancel_scheduled(database_id)
    }

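    /// Marks the given database queue (or the whole cluster) as blocked, aborts all queued
    /// commands, and notifies their callers of the failure with the given reason.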
    pub(super) fn abort_and_mark_blocked(
        &self,
        database_id: Option<DatabaseId>,
        reason: impl Into<String>,
    ) {
        let mut queue = self.inner.queue.lock();
        fn database_blocked_reason(database_id: DatabaseId, reason: &String) -> String {
            format!("database {} unavailable {}", database_id, reason)
        }
        fn mark_blocked_and_notify_failed(
            database_id: DatabaseId,
            queue: &mut DatabaseScheduledQueue,
            reason: &String,
        ) {
            let reason = database_blocked_reason(database_id, reason);
            let err: MetaError = anyhow!("{}", reason).into();
            queue.mark_blocked(reason);
            while let Some(ScheduledQueueItem { notifiers, .. }) = queue.queue.inner.pop_front() {
                notifiers
                    .into_iter()
                    .for_each(|notify| notify.notify_collection_failed(err.clone()))
            }
        }
        if let Some(database_id) = database_id {
            let reason = reason.into();
            match queue.queue.entry(database_id) {
                Entry::Occupied(entry) => {
                    let queue = entry.into_mut();
                    if queue.status.is_blocked() {
                        if cfg!(debug_assertions) {
                            panic!("database {} marked as blocked twice", database_id);
                        } else {
                            warn!(?database_id, "database marked as blocked twice");
                        }
                    }
                    info!(?database_id, "database marked as blocked");
                    mark_blocked_and_notify_failed(database_id, queue, &reason);
                }
                Entry::Vacant(entry) => {
                    entry.insert(DatabaseScheduledQueue::new(
                        database_id,
                        &self.inner.metrics,
                        QueueStatus::Blocked(database_blocked_reason(database_id, &reason)),
                    ));
                }
            }
        } else {
            let reason = reason.into();
            if queue.status.is_blocked() {
                if cfg!(debug_assertions) {
                    panic!("cluster marked as blocked twice");
                } else {
                    warn!("cluster marked as blocked twice");
                }
            }
            info!("cluster marked as blocked");
            queue.mark_blocked(reason.clone());
            for (database_id, queue) in &mut queue.queue {
                mark_blocked_and_notify_failed(*database_id, queue, &reason);
            }
        }
    }

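    /// Marks the given database queue, or the whole cluster except the still-blocked databases,
    /// as ready again and wakes up the consumer if there are pending commands.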
    pub(super) fn mark_ready(&self, options: MarkReadyOptions) {
        let mut queue = self.inner.queue.lock();
        let queue = &mut *queue;
        match options {
            MarkReadyOptions::Database(database_id) => {
                info!(?database_id, "database marked as ready");
                let database_queue = queue.queue.entry(database_id).or_insert_with(|| {
                    DatabaseScheduledQueue::new(
                        database_id,
                        &self.inner.metrics,
                        QueueStatus::Ready,
                    )
                });
                if !database_queue.status.is_blocked() {
                    if cfg!(debug_assertions) {
                        panic!("database {} marked as ready twice", database_id);
                    } else {
                        warn!(?database_id, "database marked as ready twice");
                    }
                }
                if database_queue.mark_ready()
                    && !queue.status.is_blocked()
                    && !database_queue.queue.inner.is_empty()
                {
                    self.inner.changed_tx.send(()).ok();
                }
            }
            MarkReadyOptions::Global { blocked_databases } => {
                if !queue.status.is_blocked() {
                    if cfg!(debug_assertions) {
                        panic!("cluster marked as ready twice");
                    } else {
                        warn!("cluster marked as ready twice");
                    }
                }
                info!(?blocked_databases, "cluster marked as ready");
                let prev_blocked = queue.mark_ready();
                for database_id in &blocked_databases {
                    queue.queue.entry(*database_id).or_insert_with(|| {
                        DatabaseScheduledQueue::new(
                            *database_id,
                            &self.inner.metrics,
                            QueueStatus::Blocked(format!(
                                "database {} failed to recover in global recovery",
                                database_id
                            )),
                        )
                    });
                }
                for (database_id, queue) in &mut queue.queue {
                    if !blocked_databases.contains(database_id) {
                        queue.mark_ready();
                    }
                }
                if prev_blocked
                    && queue
                        .queue
                        .values()
                        .any(|database_queue| !database_queue.queue.inner.is_empty())
                {
                    self.inner.changed_tx.send(()).ok();
                }
            }
        }
    }

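    /// Drains the buffered drop and cancel commands (the only commands allowed while a queue is
    /// blocked), notifies their callers as collected, and returns the state table ids
    /// unregistered by the drops. Typically called during recovery while the queue is blocked.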
    pub(super) fn pre_apply_drop_cancel_scheduled(
        &self,
        database_id: Option<DatabaseId>,
    ) -> Vec<TableId> {
        let mut queue = self.inner.queue.lock();
        let mut dropped_tables = vec![];

        let mut pre_apply_drop_cancel = |queue: &mut DatabaseScheduledQueue| {
            while let Some(ScheduledQueueItem {
                notifiers, command, ..
            }) = queue.queue.inner.pop_front()
            {
                match command {
                    Command::DropStreamingJobs {
                        unregistered_state_table_ids,
                        ..
                    } => {
                        dropped_tables.extend(unregistered_state_table_ids);
                    }
                    Command::DropSubscription { .. } => {}
                    _ => {
                        unreachable!("only drop and cancel streaming jobs should be buffered");
                    }
                }
                notifiers.into_iter().for_each(|notify| {
                    notify.notify_collected();
                });
            }
        };

        if let Some(database_id) = database_id {
            assert_matches!(queue.status, QueueStatus::Ready);
            if let Some(queue) = queue.queue.get_mut(&database_id) {
                assert_matches!(queue.status, QueueStatus::Blocked(_));
                pre_apply_drop_cancel(queue);
            }
        } else {
            assert_matches!(queue.status, QueueStatus::Blocked(_));
            for queue in queue.queue.values_mut() {
                pre_apply_drop_cancel(queue);
            }
        }

        dropped_tables
    }
}

#[cfg(test)]
mod tests {
    use futures::FutureExt;

    use super::*;

    fn create_test_database(
        id: u32,
        barrier_interval_ms: Option<u32>,
        checkpoint_frequency: Option<u64>,
    ) -> Database {
        Database {
            id: id.into(),
            name: format!("test_db_{}", id),
            barrier_interval_ms,
            checkpoint_frequency,
            ..Default::default()
        }
    }

    struct MockGlobalBarrierWorkerContext {
        /// Scheduled commands are fed into this channel by the tests.
        scheduled_rx: tokio::sync::Mutex<tokio::sync::mpsc::UnboundedReceiver<Scheduled>>,
    }

    impl MockGlobalBarrierWorkerContext {
        fn new() -> (Self, tokio::sync::mpsc::UnboundedSender<Scheduled>) {
            let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
            (
                Self {
                    scheduled_rx: tokio::sync::Mutex::new(rx),
                },
                tx,
            )
        }
    }

    impl GlobalBarrierWorkerContext for MockGlobalBarrierWorkerContext {
        async fn next_scheduled(&self) -> Scheduled {
            self.scheduled_rx.lock().await.recv().await.unwrap()
        }

        async fn commit_epoch(
            &self,
            _commit_info: crate::hummock::CommitEpochInfo,
        ) -> MetaResult<risingwave_pb::hummock::HummockVersionStats> {
            unimplemented!()
        }

        fn abort_and_mark_blocked(
            &self,
            _database_id: Option<DatabaseId>,
            _recovery_reason: crate::barrier::RecoveryReason,
        ) {
            unimplemented!()
        }

        fn mark_ready(&self, _options: MarkReadyOptions) {
            unimplemented!()
        }

        async fn post_collect_command<'a>(
            &'a self,
            _command: &'a crate::barrier::command::CommandContext,
        ) -> MetaResult<()> {
            unimplemented!()
        }

        async fn notify_creating_job_failed(&self, _database_id: Option<DatabaseId>, _err: String) {
            unimplemented!()
        }

        async fn finish_creating_job(
            &self,
            _job: crate::barrier::progress::TrackingJob,
        ) -> MetaResult<()> {
            unimplemented!()
        }

        async fn new_control_stream(
            &self,
            _node: &risingwave_pb::common::WorkerNode,
            _init_request: &risingwave_pb::stream_service::streaming_control_stream_request::PbInitRequest,
        ) -> MetaResult<risingwave_rpc_client::StreamingControlHandle> {
            unimplemented!()
        }

        async fn reload_runtime_info(
            &self,
        ) -> MetaResult<crate::barrier::BarrierWorkerRuntimeInfoSnapshot> {
            unimplemented!()
        }

        async fn reload_database_runtime_info(
            &self,
            _database_id: DatabaseId,
        ) -> MetaResult<Option<crate::barrier::DatabaseRuntimeInfoSnapshot>> {
            unimplemented!()
        }

        async fn handle_list_finished_source_ids(
            &self,
            _list_finished_source_ids: Vec<
                risingwave_pb::stream_service::barrier_complete_response::PbListFinishedSource,
            >,
        ) -> MetaResult<()> {
            unimplemented!()
        }

        async fn handle_load_finished_source_ids(
            &self,
            _load_finished_source_ids: Vec<
                risingwave_pb::stream_service::barrier_complete_response::PbLoadFinishedSource,
            >,
        ) -> MetaResult<()> {
            unimplemented!()
        }

        async fn finish_cdc_table_backfill(&self, _job_id: JobId) -> MetaResult<()> {
            unimplemented!()
        }

        async fn handle_refresh_finished_table_ids(
            &self,
            _refresh_finished_table_ids: Vec<JobId>,
        ) -> MetaResult<()> {
            unimplemented!()
        }
    }

    #[tokio::test(start_paused = true)]
    async fn test_next_barrier_with_different_intervals() {
        let databases = vec![
            create_test_database(1, Some(50), Some(2)),   // 50ms interval, checkpoint every 2
            create_test_database(2, Some(100), Some(3)),  // 100ms interval, checkpoint every 3
            create_test_database(3, None, Some(5)),       // system default interval
        ];

        let mut periodic = PeriodicBarriers::new(
            Duration::from_millis(200), // sys_barrier_interval
            10,                         // sys_checkpoint_frequency
            databases,
        );

        let (context, _tx) = MockGlobalBarrierWorkerContext::new();

        // The first tick of every database timer fires immediately.
        for _ in 0..3 {
            let barrier = periodic.next_barrier(&context).await;
            assert!(barrier.command.is_none());
            assert!(!barrier.checkpoint);
        }

        let start_time = Instant::now();
        let barrier = periodic.next_barrier(&context).await;
        let mut elapsed = start_time.elapsed();

        // Database 1 has the shortest interval (50ms) and a checkpoint frequency of 2, so its
        // second barrier is a checkpoint.
        assert_eq!(barrier.database_id, DatabaseId::from(1));
        assert!(barrier.command.is_none());
        assert!(barrier.checkpoint);
        assert_eq!(
            elapsed,
            Duration::from_millis(50),
            "Elapsed time exceeded: {:?}",
            elapsed
        );

        // The checkpoint should have reset the counter of database 1.
        let db1_id = DatabaseId::from(1);
        let db1_state = periodic.databases.get_mut(&db1_id).unwrap();
        assert_eq!(db1_state.num_uncheckpointed_barrier, 0);

        for _ in 0..2 {
            let barrier = periodic.next_barrier(&context).await;
            assert!(barrier.command.is_none());
            assert!(!barrier.checkpoint);
        }

        elapsed = start_time.elapsed();

        assert_eq!(
            elapsed,
            Duration::from_millis(100),
            "Elapsed time exceeded: {:?}",
            elapsed
        );
    }

    #[tokio::test]
    async fn test_next_barrier_with_scheduled_command() {
        let databases = vec![create_test_database(1, Some(1000), Some(2))];

        let mut periodic = PeriodicBarriers::new(Duration::from_millis(1000), 10, databases);

        let (context, tx) = MockGlobalBarrierWorkerContext::new();

        // Consume the initial immediate barrier of the database timer.
        periodic.next_barrier(&context).await;

        let scheduled_command = Scheduled {
            database_id: DatabaseId::from(1),
            command: Command::Flush,
            notifiers: vec![],
            span: tracing::Span::none(),
        };

        // Deliver the scheduled command after a short delay.
        let tx_clone = tx.clone();
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(10)).await;
            tx_clone.send(scheduled_command).unwrap();
        });

        let barrier = periodic.next_barrier(&context).await;

        // The scheduled command should arrive before the 1000ms periodic timer fires.
        assert!(barrier.command.is_some());
        assert_eq!(barrier.database_id, DatabaseId::from(1));

        if let Some((command, _)) = barrier.command {
            assert!(matches!(command, Command::Flush));
        }
    }

    #[tokio::test(start_paused = true)]
    async fn test_next_barrier_multiple_databases_timing() {
        let databases = vec![
            create_test_database(1, Some(30), Some(10)),  // 30ms interval
            create_test_database(2, Some(100), Some(10)), // 100ms interval
        ];

        let mut periodic = PeriodicBarriers::new(Duration::from_millis(500), 10, databases);

        let (context, _tx) = MockGlobalBarrierWorkerContext::new();

        // Consume the initial immediate barriers of both databases.
        for _ in 0..2 {
            periodic.next_barrier(&context).await;
        }

        let mut barrier_counts = HashMap::new();

        // Collect the next 5 periodic barriers.
        let mut barriers = Vec::new();
        for _ in 0..5 {
            let barrier = periodic.next_barrier(&context).await;
            barriers.push(barrier);
        }

        for barrier in barriers {
            *barrier_counts.entry(barrier.database_id).or_insert(0) += 1;
        }

        // Within ~120ms, database 1 (30ms) ticks 4 times while database 2 (100ms) ticks once.
        let db1_count = barrier_counts.get(&DatabaseId::from(1)).unwrap_or(&0);
        let db2_count = barrier_counts.get(&DatabaseId::from(2)).unwrap_or(&0);

        assert_eq!(*db1_count, 4);
        assert_eq!(*db2_count, 1);
    }

    #[tokio::test]
    async fn test_next_barrier_force_checkpoint() {
        let databases = vec![create_test_database(1, Some(100), Some(10))];

        let mut periodic = PeriodicBarriers::new(Duration::from_millis(100), 10, databases);

        let (context, _tx) = MockGlobalBarrierWorkerContext::new();

        // Force the next barrier of database 1 to be a checkpoint.
        periodic.force_checkpoint_in_next_barrier(DatabaseId::from(1));

        // The forced checkpoint is returned immediately, without waiting for any timer.
        let barrier = periodic.next_barrier(&context).now_or_never().unwrap();

        assert!(barrier.checkpoint);
        assert_eq!(barrier.database_id, DatabaseId::from(1));
        assert!(barrier.command.is_none());
    }

    #[tokio::test]
    async fn test_next_barrier_checkpoint_frequency() {
        // Checkpoint every 2 barriers.
        let databases = vec![create_test_database(1, Some(50), Some(2))];

        let mut periodic = PeriodicBarriers::new(Duration::from_millis(50), 10, databases);

        let (context, _tx) = MockGlobalBarrierWorkerContext::new();

        // First barrier: not a checkpoint.
        let barrier1 = periodic.next_barrier(&context).await;
        assert!(!barrier1.checkpoint);

        // Second barrier: checkpoint frequency of 2 reached.
        let barrier2 = periodic.next_barrier(&context).await;
        assert!(barrier2.checkpoint);

        // Third barrier: the counter was reset, so not a checkpoint again.
        let barrier3 = periodic.next_barrier(&context).await;
        assert!(!barrier3.checkpoint);
    }

    #[tokio::test]
    async fn test_update_database_barrier() {
        let databases = vec![create_test_database(1, Some(1000), Some(10))];

        let mut periodic = PeriodicBarriers::new(Duration::from_millis(500), 20, databases);

        let database_id = DatabaseId::new(1);

        // Update the configuration of an existing database.
        periodic.update_database_barrier(database_id, Some(2000), Some(15));

        let db_state = periodic.databases.get(&database_id).unwrap();
        assert_eq!(db_state.barrier_interval, Some(Duration::from_millis(2000)));
        assert_eq!(db_state.checkpoint_frequency, Some(15));
        assert_eq!(db_state.num_uncheckpointed_barrier, 0);
        assert!(!periodic.force_checkpoint_databases.contains(&database_id));

        // Updating an unknown database inserts a new entry that falls back to the system defaults.
        periodic.update_database_barrier(DatabaseId::from(2), None, None);

        assert!(periodic.databases.contains_key(&DatabaseId::from(2)));
        let db2_state = periodic.databases.get(&DatabaseId::from(2)).unwrap();
        assert_eq!(db2_state.barrier_interval, None);
        assert_eq!(db2_state.checkpoint_frequency, None);
    }
}