1use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet, VecDeque};
17use std::sync::Arc;
18use std::time::{Duration, Instant};
19
20use anyhow::{Context, anyhow};
21use assert_matches::assert_matches;
22use await_tree::InstrumentAwait;
23use itertools::Itertools;
24use parking_lot::Mutex;
25use prometheus::HistogramTimer;
26use risingwave_common::catalog::{DatabaseId, TableId};
27use risingwave_common::metrics::LabelGuardedHistogram;
28use risingwave_hummock_sdk::HummockVersionId;
29use risingwave_pb::catalog::Database;
30use rw_futures_util::pending_on_none;
31use tokio::select;
32use tokio::sync::{oneshot, watch};
33use tokio_stream::wrappers::IntervalStream;
34use tokio_stream::{StreamExt, StreamMap};
35use tracing::{info, warn};
36
37use super::notifier::Notifier;
38use super::{Command, Scheduled};
39use crate::barrier::context::GlobalBarrierWorkerContext;
40use crate::hummock::HummockManagerRef;
41use crate::rpc::metrics::MetaMetrics;
42use crate::{MetaError, MetaResult};
43
/// A barrier that is about to be issued, as returned by `PeriodicBarriers::next_barrier`.
pub(super) struct NewBarrier {
    /// Database the barrier belongs to.
    pub database_id: DatabaseId,
    /// The scheduled command and its notifiers. `None` when the barrier was produced by a
    /// periodic timer tick instead of a scheduled command.
    pub command: Option<(Command, Vec<Notifier>)>,
    /// Tracing span attached to this barrier.
    pub span: tracing::Span,
    /// Whether this barrier is a checkpoint barrier.
    pub checkpoint: bool,
}
50
/// State shared between `BarrierScheduler` (producer) and `ScheduledBarriers` (consumer).
struct Inner {
    /// Cluster-level status plus the per-database queues of scheduled commands.
    queue: Mutex<ScheduledQueue>,

    /// Signalled when a database queue goes from empty to non-empty, or when a blocked queue
    /// with pending items becomes ready; `ScheduledBarriers::next_scheduled` waits on it.
    changed_tx: watch::Sender<()>,

    /// Metrics registry; used in this file to create the per-database `barrier_send_latency`
    /// histogram.
    metrics: Arc<MetaMetrics>,
}
64
/// Availability of a queue of scheduled commands.
#[derive(Debug)]
enum QueueStatus {
    /// The queue accepts new commands and may hand them out.
    Ready,
    /// The queue is blocked; the payload is the reason reported to rejected callers.
    Blocked(String),
}

impl QueueStatus {
    /// Returns `true` if the status is `Blocked`, regardless of the reason.
    fn is_blocked(&self) -> bool {
        match self {
            Self::Blocked(_) => true,
            Self::Ready => false,
        }
    }
}
78
/// A command waiting in a database queue together with its bookkeeping.
struct ScheduledQueueItem {
    /// The command to run with the barrier.
    command: Command,
    /// Notifiers to signal about the progress of the command.
    notifiers: Vec<Notifier>,
    /// Timer started when the item is pushed and observed when it is popped by
    /// `ScheduledBarriers::next_scheduled`.
    send_latency_timer: HistogramTimer,
    /// Tracing span created when the item was pushed.
    span: tracing::Span,
}
85
/// A queue-like container paired with a `QueueStatus`.
struct StatusQueue<T> {
    /// The wrapped container.
    queue: T,
    /// Whether the container is currently ready or blocked.
    status: QueueStatus,
}
90
/// Pending commands of a single database.
struct DatabaseQueue {
    /// Scheduled items in FIFO order (pushed at the back, popped from the front).
    inner: VecDeque<ScheduledQueueItem>,
    /// Histogram, labeled with the database id, from which each item's latency timer is started.
    send_latency: LabelGuardedHistogram,
}
95
/// Queue of one database together with its own blocked/ready status.
type DatabaseScheduledQueue = StatusQueue<DatabaseQueue>;
/// All database queues together with the cluster-level blocked/ready status.
type ScheduledQueue = StatusQueue<HashMap<DatabaseId, DatabaseScheduledQueue>>;
98
99impl DatabaseScheduledQueue {
100 fn new(database_id: DatabaseId, metrics: &MetaMetrics, status: QueueStatus) -> Self {
101 Self {
102 queue: DatabaseQueue {
103 inner: Default::default(),
104 send_latency: metrics
105 .barrier_send_latency
106 .with_guarded_label_values(&[database_id.database_id.to_string().as_str()]),
107 },
108 status,
109 }
110 }
111}
112
113impl<T> StatusQueue<T> {
114 fn mark_blocked(&mut self, reason: String) {
115 self.status = QueueStatus::Blocked(reason);
116 }
117
118 fn mark_ready(&mut self) -> bool {
119 let prev_blocked = self.status.is_blocked();
120 self.status = QueueStatus::Ready;
121 prev_blocked
122 }
123
124 fn validate_item(&mut self, command: &Command) -> MetaResult<()> {
125 if let QueueStatus::Blocked(reason) = &self.status
131 && !matches!(
132 command,
133 Command::DropStreamingJobs { .. } | Command::DropSubscription { .. }
134 )
135 {
136 return Err(MetaError::unavailable(reason));
137 }
138 Ok(())
139 }
140}
141
142fn tracing_span() -> tracing::Span {
143 if tracing::Span::current().is_none() {
144 tracing::Span::none()
145 } else {
146 tracing::info_span!(
147 "barrier",
148 checkpoint = tracing::field::Empty,
149 epoch = tracing::field::Empty
150 )
151 }
152}
153
/// Producer-side handle used to schedule commands to be run with barriers.
///
/// Cloning is cheap: clones share the same queue through the `Arc`.
#[derive(Clone)]
pub struct BarrierScheduler {
    /// Queue state shared with `ScheduledBarriers`.
    inner: Arc<Inner>,

    /// Used by `flush` to read the Hummock version id after the flush barrier completes.
    hummock_manager: HummockManagerRef,
}
163
164impl BarrierScheduler {
165 pub fn new_pair(
168 hummock_manager: HummockManagerRef,
169 metrics: Arc<MetaMetrics>,
170 ) -> (Self, ScheduledBarriers) {
171 let inner = Arc::new(Inner {
172 queue: Mutex::new(ScheduledQueue {
173 queue: Default::default(),
174 status: QueueStatus::Ready,
175 }),
176 changed_tx: watch::channel(()).0,
177 metrics,
178 });
179
180 (
181 Self {
182 inner: inner.clone(),
183 hummock_manager,
184 },
185 ScheduledBarriers { inner },
186 )
187 }
188
189 fn push(
191 &self,
192 database_id: DatabaseId,
193 scheduleds: impl IntoIterator<Item = (Command, Notifier)>,
194 ) -> MetaResult<()> {
195 let mut queue = self.inner.queue.lock();
196 let scheduleds = scheduleds.into_iter().collect_vec();
197 scheduleds
198 .iter()
199 .try_for_each(|(command, _)| queue.validate_item(command))?;
200 let queue = queue.queue.entry(database_id).or_insert_with(|| {
201 DatabaseScheduledQueue::new(database_id, &self.inner.metrics, QueueStatus::Ready)
202 });
203 scheduleds
204 .iter()
205 .try_for_each(|(command, _)| queue.validate_item(command))?;
206 for (command, notifier) in scheduleds {
207 queue.queue.inner.push_back(ScheduledQueueItem {
208 command,
209 notifiers: vec![notifier],
210 send_latency_timer: queue.queue.send_latency.start_timer(),
211 span: tracing_span(),
212 });
213 if queue.queue.inner.len() == 1 {
214 self.inner.changed_tx.send(()).ok();
215 }
216 }
217 Ok(())
218 }
219
220 pub fn try_cancel_scheduled_create(&self, database_id: DatabaseId, table_id: TableId) -> bool {
222 let queue = &mut self.inner.queue.lock();
223 let Some(queue) = queue.queue.get_mut(&database_id) else {
224 return false;
225 };
226
227 if let Some(idx) = queue.queue.inner.iter().position(|scheduled| {
228 if let Command::CreateStreamingJob { info, .. } = &scheduled.command
229 && info.stream_job_fragments.stream_job_id() == table_id
230 {
231 true
232 } else {
233 false
234 }
235 }) {
236 queue.queue.inner.remove(idx).unwrap();
237 true
238 } else {
239 false
240 }
241 }
242
243 #[await_tree::instrument("run_commands({})", commands.iter().join(", "))]
250 async fn run_multiple_commands(
251 &self,
252 database_id: DatabaseId,
253 commands: Vec<Command>,
254 ) -> MetaResult<()> {
255 let mut contexts = Vec::with_capacity(commands.len());
256 let mut scheduleds = Vec::with_capacity(commands.len());
257
258 for command in commands {
259 let (started_tx, started_rx) = oneshot::channel();
260 let (collect_tx, collect_rx) = oneshot::channel();
261
262 contexts.push((started_rx, collect_rx));
263 scheduleds.push((
264 command,
265 Notifier {
266 started: Some(started_tx),
267 collected: Some(collect_tx),
268 },
269 ));
270 }
271
272 self.push(database_id, scheduleds)?;
273
274 for (injected_rx, collect_rx) in contexts {
275 tracing::trace!("waiting for injected_rx");
277 injected_rx
278 .instrument_await("wait_injected")
279 .await
280 .ok()
281 .context("failed to inject barrier")??;
282
283 tracing::trace!("waiting for collect_rx");
284 collect_rx
286 .instrument_await("wait_collected")
287 .await
288 .ok()
289 .context("failed to collect barrier")??;
290 }
291
292 Ok(())
293 }
294
295 pub async fn run_command(&self, database_id: DatabaseId, command: Command) -> MetaResult<()> {
299 tracing::trace!("run_command: {:?}", command);
300 let ret = self.run_multiple_commands(database_id, vec![command]).await;
301 tracing::trace!("run_command finished");
302 ret
303 }
304
305 pub async fn flush(&self, database_id: DatabaseId) -> MetaResult<HummockVersionId> {
307 let start = Instant::now();
308
309 tracing::debug!("start barrier flush");
310 self.run_multiple_commands(database_id, vec![Command::Flush])
311 .await?;
312
313 let elapsed = Instant::now().duration_since(start);
314 tracing::debug!("barrier flushed in {:?}", elapsed);
315
316 let version_id = self.hummock_manager.get_version_id().await;
317 Ok(version_id)
318 }
319}
320
/// Consumer-side handle over the queue filled by `BarrierScheduler`.
pub struct ScheduledBarriers {
    /// Queue state shared with `BarrierScheduler`.
    inner: Arc<Inner>,
}
325
/// Per-database barrier settings and checkpoint bookkeeping.
#[derive(Debug)]
pub struct DatabaseBarrierState {
    /// Database-specific barrier interval; `None` means the system-wide interval is used.
    pub barrier_interval: Option<Duration>,
    /// Database-specific checkpoint frequency; `None` means the system-wide frequency is used.
    pub checkpoint_frequency: Option<u64>,
    /// When set, the next barrier of this database is a checkpoint regardless of the frequency.
    pub force_checkpoint: bool,
    /// Number of barriers issued since the last checkpoint barrier.
    pub num_uncheckpointed_barrier: u64,
}

impl DatabaseBarrierState {
    /// Creates a fresh state with no pending forced checkpoint and a zeroed barrier counter.
    ///
    /// `barrier_interval_ms` is given in milliseconds.
    fn new(barrier_interval_ms: Option<u32>, checkpoint_frequency: Option<u64>) -> Self {
        let barrier_interval = barrier_interval_ms
            .map(u64::from)
            .map(Duration::from_millis);
        Self {
            barrier_interval,
            checkpoint_frequency,
            force_checkpoint: false,
            num_uncheckpointed_barrier: 0,
        }
    }
}
347
/// Produces the next barrier for every database, either from a scheduled command or from a
/// periodic per-database timer, and decides whether it is a checkpoint barrier.
#[derive(Default, Debug)]
pub struct PeriodicBarriers {
    /// System-wide barrier interval, used by databases without their own interval.
    sys_barrier_interval: Duration,
    /// System-wide checkpoint frequency, used by databases without their own frequency.
    sys_checkpoint_frequency: u64,
    /// Barrier state per database.
    databases: HashMap<DatabaseId, DatabaseBarrierState>,
    /// One interval timer per database; a tick yields a command-less barrier for that database.
    timer_streams: StreamMap<DatabaseId, IntervalStream>,
}
360
361impl PeriodicBarriers {
362 pub(super) fn new(
363 sys_barrier_interval: Duration,
364 sys_checkpoint_frequency: u64,
365 database_infos: Vec<Database>,
366 ) -> Self {
367 let mut databases = HashMap::with_capacity(database_infos.len());
368 let mut timer_streams = StreamMap::with_capacity(database_infos.len());
369 database_infos.into_iter().for_each(|database| {
370 let database_id: DatabaseId = database.id.into();
371 let barrier_interval_ms = database.barrier_interval_ms;
372 let checkpoint_frequency = database.checkpoint_frequency;
373 databases.insert(
374 database_id,
375 DatabaseBarrierState::new(barrier_interval_ms, checkpoint_frequency),
376 );
377 let duration = if let Some(ms) = barrier_interval_ms {
378 Duration::from_millis(ms as u64)
379 } else {
380 sys_barrier_interval
381 };
382 let interval_stream = Self::new_interval_stream(duration);
384 timer_streams.insert(database_id, interval_stream);
385 });
386 Self {
387 sys_barrier_interval,
388 sys_checkpoint_frequency,
389 databases,
390 timer_streams,
391 }
392 }
393
394 fn new_interval_stream(duration: Duration) -> IntervalStream {
396 let mut interval = tokio::time::interval(duration);
397 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
398 IntervalStream::new(interval)
399 }
400
401 pub(super) fn set_sys_barrier_interval(&mut self, duration: Duration) {
403 if self.sys_barrier_interval == duration {
404 return;
405 }
406 self.sys_barrier_interval = duration;
407 for (db_id, db_state) in &mut self.databases {
409 if db_state.barrier_interval.is_none() {
410 let interval_stream = Self::new_interval_stream(duration);
411 self.timer_streams.insert(*db_id, interval_stream);
412 }
413 }
414 }
415
416 pub fn set_sys_checkpoint_frequency(&mut self, frequency: u64) {
418 if self.sys_checkpoint_frequency == frequency {
419 return;
420 }
421 self.sys_checkpoint_frequency = frequency;
422 for db_state in self.databases.values_mut() {
424 if db_state.checkpoint_frequency.is_none() {
425 db_state.num_uncheckpointed_barrier = 0;
426 db_state.force_checkpoint = false;
427 }
428 }
429 }
430
431 pub(super) fn update_database_barrier(
432 &mut self,
433 database_id: DatabaseId,
434 barrier_interval_ms: Option<u32>,
435 checkpoint_frequency: Option<u64>,
436 ) {
437 match self.databases.entry(database_id) {
438 Entry::Occupied(mut entry) => {
439 let db_state = entry.get_mut();
440 db_state.barrier_interval =
441 barrier_interval_ms.map(|ms| Duration::from_millis(ms as u64));
442 db_state.checkpoint_frequency = checkpoint_frequency;
443 db_state.num_uncheckpointed_barrier = 0;
445 db_state.force_checkpoint = false;
446 }
447 Entry::Vacant(entry) => {
448 entry.insert(DatabaseBarrierState::new(
449 barrier_interval_ms,
450 checkpoint_frequency,
451 ));
452 }
453 }
454
455 let duration = if let Some(ms) = barrier_interval_ms {
457 Duration::from_millis(ms as u64)
458 } else {
459 self.sys_barrier_interval
460 };
461 let interval_stream = Self::new_interval_stream(duration);
462 self.timer_streams.insert(database_id, interval_stream);
463 }
464
465 pub fn force_checkpoint_in_next_barrier(&mut self, database_id: DatabaseId) {
467 if let Some(db_state) = self.databases.get_mut(&database_id) {
468 db_state.force_checkpoint = true;
469 } else {
470 warn!(
471 ?database_id,
472 "force checkpoint in next barrier for non-existing database"
473 );
474 }
475 }
476
    /// Waits for and returns the next barrier to issue.
    ///
    /// The barrier comes either from a command scheduled through `context`, or from the tick of
    /// one of the per-database timers (in which case it carries no command). The `select!` is
    /// `biased`, so the scheduled-command branch is polled before the timer branch.
    ///
    /// The checkpoint counter of the chosen database is updated before returning.
    ///
    /// # Panics
    ///
    /// Panics if a scheduled command refers to a database that has no state or no timer here.
    #[await_tree::instrument]
    pub(super) async fn next_barrier(
        &mut self,
        context: &impl GlobalBarrierWorkerContext,
    ) -> NewBarrier {
        let new_barrier = select! {
            biased;
            scheduled = context.next_scheduled() => {
                let database_id = scheduled.database_id;
                assert!(self.databases.contains_key(&database_id), "database {} not found in periodic barriers", database_id);
                assert!(self.timer_streams.contains_key(&database_id), "timer stream for database {} not found in periodic barriers", database_id);
                // Restart the timer of this database, since a barrier is being issued for it now.
                for (db_id, timer_stream) in self.timer_streams.iter_mut() {
                    if *db_id == database_id {
                        timer_stream.as_mut().reset();
                    }
                }
                // The command may demand a checkpoint on its own; otherwise the usual
                // frequency / force-checkpoint rule applies.
                let checkpoint = scheduled.command.need_checkpoint() || self.try_get_checkpoint(database_id);
                NewBarrier {
                    database_id: scheduled.database_id,
                    command: Some((scheduled.command, scheduled.notifiers)),
                    span: scheduled.span,
                    checkpoint,
                }
            },
            // NOTE(review): `pending_on_none` presumably keeps this branch pending when the
            // stream map yields `None` (no timers registered) - confirm against its definition.
            next_timer = pending_on_none(self.timer_streams.next()) => {
                let (database_id, _instant) = next_timer;
                let checkpoint = self.try_get_checkpoint(database_id);
                NewBarrier {
                    database_id,
                    command: None,
                    span: tracing_span(),
                    checkpoint,
                }
            }
        };
        self.update_num_uncheckpointed_barrier(new_barrier.database_id, new_barrier.checkpoint);

        new_barrier
    }
520
521 fn try_get_checkpoint(&self, database_id: DatabaseId) -> bool {
523 let db_state = self.databases.get(&database_id).unwrap();
524 let checkpoint_frequency = db_state
525 .checkpoint_frequency
526 .unwrap_or(self.sys_checkpoint_frequency);
527 db_state.num_uncheckpointed_barrier + 1 >= checkpoint_frequency || db_state.force_checkpoint
528 }
529
530 fn update_num_uncheckpointed_barrier(&mut self, database_id: DatabaseId, checkpoint: bool) {
532 let db_state = self.databases.get_mut(&database_id).unwrap();
533 if checkpoint {
534 db_state.num_uncheckpointed_barrier = 0;
535 db_state.force_checkpoint = false;
536 } else {
537 db_state.num_uncheckpointed_barrier += 1;
538 }
539 }
540}
541
542impl ScheduledBarriers {
543 pub(super) async fn next_scheduled(&self) -> Scheduled {
544 'outer: loop {
545 let mut rx = self.inner.changed_tx.subscribe();
546 {
547 let mut queue = self.inner.queue.lock();
548 if queue.status.is_blocked() {
549 continue;
550 }
551 for (database_id, queue) in &mut queue.queue {
552 if queue.status.is_blocked() {
553 continue;
554 }
555 if let Some(item) = queue.queue.inner.pop_front() {
556 item.send_latency_timer.observe_duration();
557 break 'outer Scheduled {
558 database_id: *database_id,
559 command: item.command,
560 notifiers: item.notifiers,
561 span: item.span,
562 };
563 }
564 }
565 }
566 rx.changed().await.unwrap();
567 }
568 }
569}
570
/// Selects what `ScheduledBarriers::mark_ready` should unblock.
pub(super) enum MarkReadyOptions {
    /// Mark the queue of a single database as ready.
    Database(DatabaseId),
    /// Mark the cluster-level queue as ready.
    Global {
        /// Databases that are not marked ready by this call; those without a queue yet get a
        /// blocked queue created for them.
        blocked_databases: HashSet<DatabaseId>,
    },
}
577
578impl ScheduledBarriers {
    /// Thin wrapper that forwards to `pre_apply_drop_cancel_scheduled` and returns its result.
    pub(super) fn pre_apply_drop_cancel(&self, database_id: Option<DatabaseId>) -> bool {
        self.pre_apply_drop_cancel_scheduled(database_id)
    }
583
584 pub(super) fn abort_and_mark_blocked(
587 &self,
588 database_id: Option<DatabaseId>,
589 reason: impl Into<String>,
590 ) {
591 let mut queue = self.inner.queue.lock();
592 fn database_blocked_reason(database_id: DatabaseId, reason: &String) -> String {
593 format!("database {} unavailable {}", database_id, reason)
594 }
595 fn mark_blocked_and_notify_failed(
596 database_id: DatabaseId,
597 queue: &mut DatabaseScheduledQueue,
598 reason: &String,
599 ) {
600 let reason = database_blocked_reason(database_id, reason);
601 let err: MetaError = anyhow!("{}", reason).into();
602 queue.mark_blocked(reason);
603 while let Some(ScheduledQueueItem { notifiers, .. }) = queue.queue.inner.pop_front() {
604 notifiers
605 .into_iter()
606 .for_each(|notify| notify.notify_collection_failed(err.clone()))
607 }
608 }
609 if let Some(database_id) = database_id {
610 let reason = reason.into();
611 match queue.queue.entry(database_id) {
612 Entry::Occupied(entry) => {
613 let queue = entry.into_mut();
614 if queue.status.is_blocked() {
615 if cfg!(debug_assertions) {
616 panic!("database {} marked as blocked twice", database_id);
617 } else {
618 warn!(?database_id, "database marked as blocked twice");
619 }
620 }
621 info!(?database_id, "database marked as blocked");
622 mark_blocked_and_notify_failed(database_id, queue, &reason);
623 }
624 Entry::Vacant(entry) => {
625 entry.insert(DatabaseScheduledQueue::new(
626 database_id,
627 &self.inner.metrics,
628 QueueStatus::Blocked(database_blocked_reason(database_id, &reason)),
629 ));
630 }
631 }
632 } else {
633 let reason = reason.into();
634 if queue.status.is_blocked() {
635 if cfg!(debug_assertions) {
636 panic!("cluster marked as blocked twice");
637 } else {
638 warn!("cluster marked as blocked twice");
639 }
640 }
641 info!("cluster marked as blocked");
642 queue.mark_blocked(reason.clone());
643 for (database_id, queue) in &mut queue.queue {
644 mark_blocked_and_notify_failed(*database_id, queue, &reason);
645 }
646 }
647 }
648
    /// Marks a database queue or the cluster-level queue as ready again.
    ///
    /// The consumer is notified only when something was actually unblocked and there are pending
    /// items to hand out.
    ///
    /// # Panics
    ///
    /// In debug builds, panics if the targeted queue is not blocked ("marked as ready twice");
    /// release builds only log a warning.
    pub(super) fn mark_ready(&self, options: MarkReadyOptions) {
        let mut queue = self.inner.queue.lock();
        // Reborrow through the guard once so that `queue.queue` and `queue.status` can be
        // borrowed independently below.
        let queue = &mut *queue;
        match options {
            MarkReadyOptions::Database(database_id) => {
                info!(?database_id, "database marked as ready");
                // A database without a queue gets a `Ready` one, which then trips the
                // "marked as ready twice" check below.
                let database_queue = queue.queue.entry(database_id).or_insert_with(|| {
                    DatabaseScheduledQueue::new(
                        database_id,
                        &self.inner.metrics,
                        QueueStatus::Ready,
                    )
                });
                if !database_queue.status.is_blocked() {
                    if cfg!(debug_assertions) {
                        panic!("database {} marked as ready twice", database_id);
                    } else {
                        warn!(?database_id, "database marked as ready twice");
                    }
                }
                // Notify only if the database was blocked, the cluster is not blocked, and there
                // is something pending in this database's queue.
                if database_queue.mark_ready()
                    && !queue.status.is_blocked()
                    && !database_queue.queue.inner.is_empty()
                {
                    self.inner.changed_tx.send(()).ok();
                }
            }
            MarkReadyOptions::Global { blocked_databases } => {
                if !queue.status.is_blocked() {
                    if cfg!(debug_assertions) {
                        panic!("cluster marked as ready twice");
                    } else {
                        warn!("cluster marked as ready twice");
                    }
                }
                info!(?blocked_databases, "cluster marked as ready");
                let prev_blocked = queue.mark_ready();
                // Databases listed as blocked that have no queue yet get a blocked one; existing
                // queues of listed databases are left untouched.
                for database_id in &blocked_databases {
                    queue.queue.entry(*database_id).or_insert_with(|| {
                        DatabaseScheduledQueue::new(
                            *database_id,
                            &self.inner.metrics,
                            QueueStatus::Blocked(format!(
                                "database {} failed to recover in global recovery",
                                database_id
                            )),
                        )
                    });
                }
                // Every database not listed as blocked becomes ready.
                for (database_id, queue) in &mut queue.queue {
                    if !blocked_databases.contains(database_id) {
                        queue.mark_ready();
                    }
                }
                // Notify if the cluster was blocked and any database queue has pending items.
                if prev_blocked
                    && queue
                        .queue
                        .values()
                        .any(|database_queue| !database_queue.queue.inner.is_empty())
                {
                    self.inner.changed_tx.send(()).ok();
                }
            }
        }
    }
715
716 pub(super) fn pre_apply_drop_cancel_scheduled(&self, database_id: Option<DatabaseId>) -> bool {
719 let mut queue = self.inner.queue.lock();
720 let mut applied = false;
721
722 let mut pre_apply_drop_cancel = |queue: &mut DatabaseScheduledQueue| {
723 while let Some(ScheduledQueueItem {
724 notifiers, command, ..
725 }) = queue.queue.inner.pop_front()
726 {
727 match command {
728 Command::DropStreamingJobs { .. } => {
729 applied = true;
730 }
731 Command::DropSubscription { .. } => {}
732 _ => {
733 unreachable!("only drop and cancel streaming jobs should be buffered");
734 }
735 }
736 notifiers.into_iter().for_each(|notify| {
737 notify.notify_collected();
738 });
739 }
740 };
741
742 if let Some(database_id) = database_id {
743 assert_matches!(queue.status, QueueStatus::Ready);
744 if let Some(queue) = queue.queue.get_mut(&database_id) {
745 assert_matches!(queue.status, QueueStatus::Blocked(_));
746 pre_apply_drop_cancel(queue);
747 }
748 } else {
749 assert_matches!(queue.status, QueueStatus::Blocked(_));
750 for queue in queue.queue.values_mut() {
751 pre_apply_drop_cancel(queue);
752 }
753 }
754
755 applied
756 }
757}
758
759#[cfg(test)]
760mod tests {
761 use super::*;
762
763 fn create_test_database(
764 id: u32,
765 barrier_interval_ms: Option<u32>,
766 checkpoint_frequency: Option<u64>,
767 ) -> Database {
768 Database {
769 id,
770 name: format!("test_db_{}", id),
771 barrier_interval_ms,
772 checkpoint_frequency,
773 ..Default::default()
774 }
775 }
776
    /// Test double for `GlobalBarrierWorkerContext` that only supports `next_scheduled`.
    struct MockGlobalBarrierWorkerContext {
        /// Receiving end of the channel through which tests inject scheduled commands. Wrapped in
        /// an async mutex because `next_scheduled` takes `&self` but `recv` needs `&mut`.
        scheduled_rx: tokio::sync::Mutex<tokio::sync::mpsc::UnboundedReceiver<Scheduled>>,
    }
781
782 impl MockGlobalBarrierWorkerContext {
783 fn new() -> (Self, tokio::sync::mpsc::UnboundedSender<Scheduled>) {
784 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
785 (
786 Self {
787 scheduled_rx: tokio::sync::Mutex::new(rx),
788 },
789 tx,
790 )
791 }
792 }
793
    /// Only `next_scheduled` is functional; every other method is `unimplemented!()` because the
    /// tests in this module never call it.
    impl GlobalBarrierWorkerContext for MockGlobalBarrierWorkerContext {
        /// Returns the next command sent by the test. Panics if the sender has been dropped.
        async fn next_scheduled(&self) -> Scheduled {
            self.scheduled_rx.lock().await.recv().await.unwrap()
        }

        async fn commit_epoch(
            &self,
            _commit_info: crate::hummock::CommitEpochInfo,
        ) -> MetaResult<risingwave_pb::hummock::HummockVersionStats> {
            unimplemented!()
        }

        fn abort_and_mark_blocked(
            &self,
            _database_id: Option<DatabaseId>,
            _recovery_reason: crate::barrier::RecoveryReason,
        ) {
            unimplemented!()
        }

        fn mark_ready(&self, _options: MarkReadyOptions) {
            unimplemented!()
        }

        async fn post_collect_command<'a>(
            &'a self,
            _command: &'a crate::barrier::command::CommandContext,
        ) -> MetaResult<()> {
            unimplemented!()
        }

        async fn notify_creating_job_failed(&self, _database_id: Option<DatabaseId>, _err: String) {
            unimplemented!()
        }

        async fn finish_creating_job(
            &self,
            _job: crate::barrier::progress::TrackingJob,
        ) -> MetaResult<()> {
            unimplemented!()
        }

        async fn new_control_stream(
            &self,
            _node: &risingwave_pb::common::WorkerNode,
            _init_request: &risingwave_pb::stream_service::streaming_control_stream_request::PbInitRequest,
        ) -> MetaResult<risingwave_rpc_client::StreamingControlHandle> {
            unimplemented!()
        }

        async fn reload_runtime_info(
            &self,
        ) -> MetaResult<crate::barrier::BarrierWorkerRuntimeInfoSnapshot> {
            unimplemented!()
        }

        async fn reload_database_runtime_info(
            &self,
            _database_id: DatabaseId,
        ) -> MetaResult<Option<crate::barrier::DatabaseRuntimeInfoSnapshot>> {
            unimplemented!()
        }
    }
857
    /// Databases with different barrier intervals each produce an initial barrier, after which
    /// the database with the shortest interval fires next and reaches its checkpoint frequency.
    #[tokio::test]
    async fn test_next_barrier_with_different_intervals() {
        let databases = vec![
            // 50ms interval, checkpoint every 2 barriers.
            create_test_database(1, Some(50), Some(2)),
            // 100ms interval, checkpoint every 3 barriers.
            create_test_database(2, Some(100), Some(3)),
            // No own interval: uses the system interval; checkpoint every 5 barriers.
            create_test_database(3, None, Some(5)),
        ];

        let mut periodic = PeriodicBarriers::new(
            // System barrier interval.
            Duration::from_millis(200),
            // System checkpoint frequency.
            10,
            databases,
        );

        let (context, _tx) = MockGlobalBarrierWorkerContext::new();

        // The first three barriers are timer barriers (no command); none of them is a checkpoint
        // since every database here has a checkpoint frequency of at least 2.
        for _ in 0..3 {
            let barrier = periodic.next_barrier(&context).await;
            assert!(barrier.command.is_none());
            assert!(!barrier.checkpoint);
        }

        let start_time = Instant::now();
        let barrier = periodic.next_barrier(&context).await;
        let elapsed = start_time.elapsed();

        // Database 1 has the shortest interval, so it fires next; this is its second barrier,
        // which reaches its checkpoint frequency of 2.
        assert_eq!(barrier.database_id, DatabaseId::from(1));
        assert!(barrier.command.is_none());
        assert!(barrier.checkpoint);
        assert!(
            elapsed <= Duration::from_millis(100),
            "Elapsed time exceeded: {:?}",
            elapsed
        );

        // The checkpoint reset the uncheckpointed-barrier counter.
        let db1_id = DatabaseId::from(1);
        let db1_state = periodic.databases.get_mut(&db1_id).unwrap();
        assert_eq!(db1_state.num_uncheckpointed_barrier, 0);
    }
903
    /// A scheduled command is returned as the next barrier well before the (long) timer fires.
    #[tokio::test]
    async fn test_next_barrier_with_scheduled_command() {
        let databases = vec![
            // 1s interval, so the timer does not fire again during the test.
            create_test_database(1, Some(1000), Some(2)),
        ];

        let mut periodic = PeriodicBarriers::new(Duration::from_millis(1000), 10, databases);

        let (context, tx) = MockGlobalBarrierWorkerContext::new();

        // Consume the first timer barrier.
        periodic.next_barrier(&context).await;

        let scheduled_command = Scheduled {
            database_id: DatabaseId::from(1),
            command: Command::Flush,
            notifiers: vec![],
            span: tracing::Span::none(),
        };

        // Send the command from another task shortly after `next_barrier` starts waiting.
        let tx_clone = tx.clone();
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(10)).await;
            tx_clone.send(scheduled_command).unwrap();
        });

        let barrier = periodic.next_barrier(&context).await;

        // The barrier carries the scheduled command for database 1.
        assert!(barrier.command.is_some());
        assert_eq!(barrier.database_id, DatabaseId::from(1));

        if let Some((command, _)) = barrier.command {
            assert!(matches!(command, Command::Flush));
        }
    }
942
943 #[tokio::test]
944 async fn test_next_barrier_multiple_databases_timing() {
945 let databases = vec![
946 create_test_database(1, Some(30), Some(10)), create_test_database(2, Some(100), Some(10)), ];
949
950 let mut periodic = PeriodicBarriers::new(Duration::from_millis(500), 10, databases);
951
952 let (context, _tx) = MockGlobalBarrierWorkerContext::new();
953
954 for _ in 0..2 {
956 periodic.next_barrier(&context).await;
957 }
958
959 let mut barrier_counts = HashMap::new();
960
961 let mut barriers = Vec::new();
963 for _ in 0..5 {
964 let barrier = periodic.next_barrier(&context).await;
965 barriers.push(barrier);
966 }
967
968 for barrier in barriers {
970 *barrier_counts.entry(barrier.database_id).or_insert(0) += 1;
971 }
972
973 let db1_count = barrier_counts.get(&DatabaseId::from(1)).unwrap_or(&0);
975 let db2_count = barrier_counts.get(&DatabaseId::from(2)).unwrap_or(&0);
976
977 assert!(*db1_count >= *db2_count);
979 }
980
981 #[tokio::test]
982 async fn test_next_barrier_force_checkpoint() {
983 let databases = vec![create_test_database(1, Some(100), Some(10))];
984
985 let mut periodic = PeriodicBarriers::new(Duration::from_millis(100), 10, databases);
986
987 let (context, _tx) = MockGlobalBarrierWorkerContext::new();
988
989 periodic.force_checkpoint_in_next_barrier(DatabaseId::from(1));
991
992 let barrier = periodic.next_barrier(&context).await;
993
994 assert!(barrier.checkpoint);
996 assert_eq!(barrier.database_id, DatabaseId::from(1));
997 assert!(barrier.command.is_none());
998 }
999
1000 #[tokio::test]
1001 async fn test_next_barrier_checkpoint_frequency() {
1002 let databases = vec![create_test_database(1, Some(50), Some(2))]; let mut periodic = PeriodicBarriers::new(Duration::from_millis(50), 10, databases);
1005
1006 let (context, _tx) = MockGlobalBarrierWorkerContext::new();
1007
1008 let barrier1 = periodic.next_barrier(&context).await;
1010 assert!(!barrier1.checkpoint);
1011
1012 let barrier2 = periodic.next_barrier(&context).await;
1014 assert!(barrier2.checkpoint);
1015
1016 let barrier3 = periodic.next_barrier(&context).await;
1018 assert!(!barrier3.checkpoint);
1019 }
1020
1021 #[tokio::test]
1022 async fn test_update_database_barrier() {
1023 let databases = vec![create_test_database(1, Some(1000), Some(10))];
1024
1025 let mut periodic = PeriodicBarriers::new(Duration::from_millis(500), 20, databases);
1026
1027 periodic.update_database_barrier(DatabaseId::from(1), Some(2000), Some(15));
1029
1030 let db_state = periodic.databases.get(&DatabaseId::from(1)).unwrap();
1031 assert_eq!(db_state.barrier_interval, Some(Duration::from_millis(2000)));
1032 assert_eq!(db_state.checkpoint_frequency, Some(15));
1033 assert_eq!(db_state.num_uncheckpointed_barrier, 0);
1034 assert!(!db_state.force_checkpoint);
1035
1036 periodic.update_database_barrier(DatabaseId::from(2), None, None);
1038
1039 assert!(periodic.databases.contains_key(&DatabaseId::from(2)));
1040 let db2_state = periodic.databases.get(&DatabaseId::from(2)).unwrap();
1041 assert_eq!(db2_state.barrier_interval, None);
1042 assert_eq!(db2_state.checkpoint_frequency, None);
1043 }
1044}