use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet, VecDeque};
use std::sync::Arc;
use std::time::{Duration, Instant};

use anyhow::{Context, anyhow};
use assert_matches::assert_matches;
use await_tree::InstrumentAwait;
use itertools::Itertools;
use parking_lot::Mutex;
use prometheus::HistogramTimer;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::metrics::LabelGuardedHistogram;
use risingwave_hummock_sdk::HummockVersionId;
use risingwave_pb::catalog::Database;
use rw_futures_util::pending_on_none;
use tokio::select;
use tokio::sync::{oneshot, watch};
use tokio_stream::wrappers::IntervalStream;
use tokio_stream::{StreamExt, StreamMap};
use tracing::{info, warn};

use super::notifier::Notifier;
use super::{Command, Scheduled};
use crate::barrier::context::GlobalBarrierWorkerContext;
use crate::hummock::HummockManagerRef;
use crate::rpc::metrics::MetaMetrics;
use crate::{MetaError, MetaResult};

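/// The next barrier to inject, produced either from a scheduled command or from a
/// per-database periodic timer tick.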
pub(super) struct NewBarrier {
    pub database_id: DatabaseId,
    pub command: Option<(Command, Vec<Notifier>)>,
    pub span: tracing::Span,
    pub checkpoint: bool,
}

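/// State shared between [`BarrierScheduler`] and [`ScheduledBarriers`]: the scheduling
/// queue, a watch channel used to wake up the queue consumer, and metrics.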
struct Inner {
    queue: Mutex<ScheduledQueue>,

    changed_tx: watch::Sender<()>,

    metrics: Arc<MetaMetrics>,
}

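/// Whether a queue currently accepts newly scheduled commands. A blocked queue records the
/// reason and rejects everything except drop/cancel-style commands (see `validate_item`).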
#[derive(Debug)]
enum QueueStatus {
    Ready,
    Blocked(String),
}

impl QueueStatus {
    fn is_blocked(&self) -> bool {
        matches!(self, Self::Blocked(_))
    }
}

struct ScheduledQueueItem {
    command: Command,
    notifiers: Vec<Notifier>,
    send_latency_timer: HistogramTimer,
    span: tracing::Span,
}

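/// A queue combined with its [`QueueStatus`]; used both for a single database's queue and
/// for the map of all per-database queues.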
struct StatusQueue<T> {
    queue: T,
    status: QueueStatus,
}

struct DatabaseQueue {
    inner: VecDeque<ScheduledQueueItem>,
    send_latency: LabelGuardedHistogram,
}

type DatabaseScheduledQueue = StatusQueue<DatabaseQueue>;
type ScheduledQueue = StatusQueue<HashMap<DatabaseId, DatabaseScheduledQueue>>;

impl DatabaseScheduledQueue {
    fn new(database_id: DatabaseId, metrics: &MetaMetrics, status: QueueStatus) -> Self {
        Self {
            queue: DatabaseQueue {
                inner: Default::default(),
                send_latency: metrics
                    .barrier_send_latency
                    .with_guarded_label_values(&[database_id.database_id.to_string().as_str()]),
            },
            status,
        }
    }
}

impl<T> StatusQueue<T> {
    fn mark_blocked(&mut self, reason: String) {
        self.status = QueueStatus::Blocked(reason);
    }

    fn mark_ready(&mut self) -> bool {
        let prev_blocked = self.status.is_blocked();
        self.status = QueueStatus::Ready;
        prev_blocked
    }

    fn validate_item(&mut self, command: &Command) -> MetaResult<()> {
        if let QueueStatus::Blocked(reason) = &self.status
            && !matches!(
                command,
                Command::DropStreamingJobs { .. } | Command::DropSubscription { .. }
            )
        {
            return Err(MetaError::unavailable(reason));
        }
        Ok(())
    }
}

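/// Creates a span for a newly scheduled barrier, or a no-op span when the current span is
/// disabled; the `checkpoint` and `epoch` fields are filled in later.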
fn tracing_span() -> tracing::Span {
    if tracing::Span::current().is_none() {
        tracing::Span::none()
    } else {
        tracing::info_span!(
            "barrier",
            checkpoint = tracing::field::Empty,
            epoch = tracing::field::Empty
        )
    }
}

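/// Handle for scheduling barrier commands: pushes commands into the shared queue and waits
/// for the corresponding barriers to be started and collected.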
#[derive(Clone)]
pub struct BarrierScheduler {
    inner: Arc<Inner>,

    hummock_manager: HummockManagerRef,
}

impl BarrierScheduler {
    pub fn new_pair(
        hummock_manager: HummockManagerRef,
        metrics: Arc<MetaMetrics>,
    ) -> (Self, ScheduledBarriers) {
        let inner = Arc::new(Inner {
            queue: Mutex::new(ScheduledQueue {
                queue: Default::default(),
                status: QueueStatus::Ready,
            }),
            changed_tx: watch::channel(()).0,
            metrics,
        });

        (
            Self {
                inner: inner.clone(),
                hummock_manager,
            },
            ScheduledBarriers { inner },
        )
    }

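    /// Appends the commands to the queue of the given database, after checking that neither the
    /// global queue nor the database queue is blocked. Wakes up the consumer when the database
    /// queue turns non-empty.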
    fn push(
        &self,
        database_id: DatabaseId,
        scheduleds: impl IntoIterator<Item = (Command, Notifier)>,
    ) -> MetaResult<()> {
        let mut queue = self.inner.queue.lock();
        let scheduleds = scheduleds.into_iter().collect_vec();
        scheduleds
            .iter()
            .try_for_each(|(command, _)| queue.validate_item(command))?;
        let queue = queue.queue.entry(database_id).or_insert_with(|| {
            DatabaseScheduledQueue::new(database_id, &self.inner.metrics, QueueStatus::Ready)
        });
        scheduleds
            .iter()
            .try_for_each(|(command, _)| queue.validate_item(command))?;
        for (command, notifier) in scheduleds {
            queue.queue.inner.push_back(ScheduledQueueItem {
                command,
                notifiers: vec![notifier],
                send_latency_timer: queue.queue.send_latency.start_timer(),
                span: tracing_span(),
            });
            if queue.queue.inner.len() == 1 {
                self.inner.changed_tx.send(()).ok();
            }
        }
        Ok(())
    }

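    /// Attempts to remove a still-queued `CreateStreamingJob` command whose job id equals
    /// `table_id`. Returns `true` if such a command was found and removed before being scheduled.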
    pub fn try_cancel_scheduled_create(&self, database_id: DatabaseId, table_id: TableId) -> bool {
        let queue = &mut self.inner.queue.lock();
        let Some(queue) = queue.queue.get_mut(&database_id) else {
            return false;
        };

        if let Some(idx) = queue.queue.inner.iter().position(|scheduled| {
            if let Command::CreateStreamingJob { info, .. } = &scheduled.command
                && info.stream_job_fragments.stream_job_id() == table_id
            {
                true
            } else {
                false
            }
        }) {
            queue.queue.inner.remove(idx).unwrap();
            true
        } else {
            false
        }
    }

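    /// Schedules the given commands on the database and waits, for each of them in order, until
    /// the barrier is started and then collected.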
    #[await_tree::instrument("run_commands({})", commands.iter().join(", "))]
    async fn run_multiple_commands(
        &self,
        database_id: DatabaseId,
        commands: Vec<Command>,
    ) -> MetaResult<()> {
        let mut contexts = Vec::with_capacity(commands.len());
        let mut scheduleds = Vec::with_capacity(commands.len());

        for command in commands {
            let (started_tx, started_rx) = oneshot::channel();
            let (collect_tx, collect_rx) = oneshot::channel();

            contexts.push((started_rx, collect_rx));
            scheduleds.push((
                command,
                Notifier {
                    started: Some(started_tx),
                    collected: Some(collect_tx),
                },
            ));
        }

        self.push(database_id, scheduleds)?;

        for (injected_rx, collect_rx) in contexts {
            tracing::trace!("waiting for injected_rx");
            injected_rx
                .instrument_await("wait_injected")
                .await
                .ok()
                .context("failed to inject barrier")??;

            tracing::trace!("waiting for collect_rx");
            collect_rx
                .instrument_await("wait_collected")
                .await
                .ok()
                .context("failed to collect barrier")??;
        }

        Ok(())
    }

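    /// Schedules a single command and waits for its barrier to complete.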
    pub async fn run_command(&self, database_id: DatabaseId, command: Command) -> MetaResult<()> {
        tracing::trace!("run_command: {:?}", command);
        let ret = self.run_multiple_commands(database_id, vec![command]).await;
        tracing::trace!("run_command finished");
        ret
    }

    pub fn run_command_no_wait(&self, database_id: DatabaseId, command: Command) -> MetaResult<()> {
        tracing::trace!("run_command_no_wait: {:?}", command);
        self.push(database_id, vec![(command, Notifier::default())])
    }

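    /// Schedules a `Flush` command, waits for it to complete, and returns the current hummock
    /// version id afterwards.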
    pub async fn flush(&self, database_id: DatabaseId) -> MetaResult<HummockVersionId> {
        let start = Instant::now();

        tracing::debug!("start barrier flush");
        self.run_multiple_commands(database_id, vec![Command::Flush])
            .await?;

        let elapsed = Instant::now().duration_since(start);
        tracing::debug!("barrier flushed in {:?}", elapsed);

        let version_id = self.hummock_manager.get_version_id().await;
        Ok(version_id)
    }
}

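/// Consumer-side handle of the scheduling queue: pops scheduled commands and marks queues
/// blocked or ready, e.g. around recovery.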
pub struct ScheduledBarriers {
    inner: Arc<Inner>,
}

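/// Per-database barrier interval/checkpoint settings and checkpoint bookkeeping. `None` for
/// interval or frequency means the system-level default applies.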
#[derive(Debug)]
pub struct DatabaseBarrierState {
    pub barrier_interval: Option<Duration>,
    pub checkpoint_frequency: Option<u64>,
    pub force_checkpoint: bool,
    pub num_uncheckpointed_barrier: u64,
}

impl DatabaseBarrierState {
    fn new(barrier_interval_ms: Option<u32>, checkpoint_frequency: Option<u64>) -> Self {
        Self {
            barrier_interval: barrier_interval_ms.map(|ms| Duration::from_millis(ms as u64)),
            checkpoint_frequency,
            force_checkpoint: false,
            num_uncheckpointed_barrier: 0,
        }
    }
}

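/// Issues periodic barriers: keeps one interval timer per database plus the per-database state
/// used to decide when a barrier should also be a checkpoint.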
#[derive(Default, Debug)]
pub struct PeriodicBarriers {
    sys_barrier_interval: Duration,
    sys_checkpoint_frequency: u64,
    databases: HashMap<DatabaseId, DatabaseBarrierState>,
    timer_streams: StreamMap<DatabaseId, IntervalStream>,
}

impl PeriodicBarriers {
    pub(super) fn new(
        sys_barrier_interval: Duration,
        sys_checkpoint_frequency: u64,
        database_infos: Vec<Database>,
    ) -> Self {
        let mut databases = HashMap::with_capacity(database_infos.len());
        let mut timer_streams = StreamMap::with_capacity(database_infos.len());
        database_infos.into_iter().for_each(|database| {
            let database_id: DatabaseId = database.id.into();
            let barrier_interval_ms = database.barrier_interval_ms;
            let checkpoint_frequency = database.checkpoint_frequency;
            databases.insert(
                database_id,
                DatabaseBarrierState::new(barrier_interval_ms, checkpoint_frequency),
            );
            let duration = if let Some(ms) = barrier_interval_ms {
                Duration::from_millis(ms as u64)
            } else {
                sys_barrier_interval
            };
            let interval_stream = Self::new_interval_stream(duration);
            timer_streams.insert(database_id, interval_stream);
        });
        Self {
            sys_barrier_interval,
            sys_checkpoint_frequency,
            databases,
            timer_streams,
        }
    }

    fn new_interval_stream(duration: Duration) -> IntervalStream {
        let mut interval = tokio::time::interval(duration);
        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
        IntervalStream::new(interval)
    }

    pub(super) fn set_sys_barrier_interval(&mut self, duration: Duration) {
        if self.sys_barrier_interval == duration {
            return;
        }
        self.sys_barrier_interval = duration;
        for (db_id, db_state) in &mut self.databases {
            if db_state.barrier_interval.is_none() {
                let interval_stream = Self::new_interval_stream(duration);
                self.timer_streams.insert(*db_id, interval_stream);
            }
        }
    }

    pub fn set_sys_checkpoint_frequency(&mut self, frequency: u64) {
        if self.sys_checkpoint_frequency == frequency {
            return;
        }
        self.sys_checkpoint_frequency = frequency;
        for db_state in self.databases.values_mut() {
            if db_state.checkpoint_frequency.is_none() {
                db_state.num_uncheckpointed_barrier = 0;
                db_state.force_checkpoint = false;
            }
        }
    }

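    /// Applies new per-database barrier settings (falling back to the system defaults when
    /// `None`), resets the checkpoint bookkeeping, and replaces the database's timer.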
    pub(super) fn update_database_barrier(
        &mut self,
        database_id: DatabaseId,
        barrier_interval_ms: Option<u32>,
        checkpoint_frequency: Option<u64>,
    ) {
        match self.databases.entry(database_id) {
            Entry::Occupied(mut entry) => {
                let db_state = entry.get_mut();
                db_state.barrier_interval =
                    barrier_interval_ms.map(|ms| Duration::from_millis(ms as u64));
                db_state.checkpoint_frequency = checkpoint_frequency;
                db_state.num_uncheckpointed_barrier = 0;
                db_state.force_checkpoint = false;
            }
            Entry::Vacant(entry) => {
                entry.insert(DatabaseBarrierState::new(
                    barrier_interval_ms,
                    checkpoint_frequency,
                ));
            }
        }

        let duration = if let Some(ms) = barrier_interval_ms {
            Duration::from_millis(ms as u64)
        } else {
            self.sys_barrier_interval
        };
        let interval_stream = Self::new_interval_stream(duration);
        self.timer_streams.insert(database_id, interval_stream);
    }

    pub fn force_checkpoint_in_next_barrier(&mut self, database_id: DatabaseId) {
        if let Some(db_state) = self.databases.get_mut(&database_id) {
            db_state.force_checkpoint = true;
        } else {
            warn!(
                ?database_id,
                "force checkpoint in next barrier for non-existing database"
            );
        }
    }

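    /// Waits for the next barrier to issue: either a scheduled command (which resets that
    /// database's timer) or a periodic timer tick, and decides whether it should be a checkpoint.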
    #[await_tree::instrument]
    pub(super) async fn next_barrier(
        &mut self,
        context: &impl GlobalBarrierWorkerContext,
    ) -> NewBarrier {
        let new_barrier = select! {
            biased;
            scheduled = context.next_scheduled() => {
                let database_id = scheduled.database_id;
                assert!(self.databases.contains_key(&database_id), "database {} not found in periodic barriers", database_id);
                assert!(self.timer_streams.contains_key(&database_id), "timer stream for database {} not found in periodic barriers", database_id);
                for (db_id, timer_stream) in self.timer_streams.iter_mut() {
                    if *db_id == database_id {
                        timer_stream.as_mut().reset();
                    }
                }
                let checkpoint = scheduled.command.need_checkpoint() || self.try_get_checkpoint(database_id);
                NewBarrier {
                    database_id: scheduled.database_id,
                    command: Some((scheduled.command, scheduled.notifiers)),
                    span: scheduled.span,
                    checkpoint,
                }
            },
            next_timer = pending_on_none(self.timer_streams.next()) => {
                let (database_id, _instant) = next_timer;
                let checkpoint = self.try_get_checkpoint(database_id);
                NewBarrier {
                    database_id,
                    command: None,
                    span: tracing_span(),
                    checkpoint,
                }
            }
        };
        self.update_num_uncheckpointed_barrier(new_barrier.database_id, new_barrier.checkpoint);

        new_barrier
    }

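    /// Whether the next barrier of the database should be a checkpoint, based on the number of
    /// uncheckpointed barriers and the force-checkpoint flag.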
    fn try_get_checkpoint(&self, database_id: DatabaseId) -> bool {
        let db_state = self.databases.get(&database_id).unwrap();
        let checkpoint_frequency = db_state
            .checkpoint_frequency
            .unwrap_or(self.sys_checkpoint_frequency);
        db_state.num_uncheckpointed_barrier + 1 >= checkpoint_frequency || db_state.force_checkpoint
    }

    fn update_num_uncheckpointed_barrier(&mut self, database_id: DatabaseId, checkpoint: bool) {
        let db_state = self.databases.get_mut(&database_id).unwrap();
        if checkpoint {
            db_state.num_uncheckpointed_barrier = 0;
            db_state.force_checkpoint = false;
        } else {
            db_state.num_uncheckpointed_barrier += 1;
        }
    }
}

impl ScheduledBarriers {
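    /// Waits until some non-blocked database queue has a scheduled command and pops the front
    /// item.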
    pub(super) async fn next_scheduled(&self) -> Scheduled {
        'outer: loop {
            let mut rx = self.inner.changed_tx.subscribe();
            {
                let mut queue = self.inner.queue.lock();
                if queue.status.is_blocked() {
                    continue;
                }
                for (database_id, queue) in &mut queue.queue {
                    if queue.status.is_blocked() {
                        continue;
                    }
                    if let Some(item) = queue.queue.inner.pop_front() {
                        item.send_latency_timer.observe_duration();
                        break 'outer Scheduled {
                            database_id: *database_id,
                            command: item.command,
                            notifiers: item.notifiers,
                            span: item.span,
                        };
                    }
                }
            }
            rx.changed().await.unwrap();
        }
    }
}

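/// Scope of a `mark_ready` call: a single database, or the whole cluster together with the set
/// of databases that stay blocked because they failed to recover.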
pub(super) enum MarkReadyOptions {
    Database(DatabaseId),
    Global {
        blocked_databases: HashSet<DatabaseId>,
    },
}

impl ScheduledBarriers {
    pub(super) fn pre_apply_drop_cancel(&self, database_id: Option<DatabaseId>) -> bool {
        self.pre_apply_drop_cancel_scheduled(database_id)
    }

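    /// Marks the database queue (or all queues when `database_id` is `None`) as blocked and
    /// notifies all commands still buffered in it as failed.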
    pub(super) fn abort_and_mark_blocked(
        &self,
        database_id: Option<DatabaseId>,
        reason: impl Into<String>,
    ) {
        let mut queue = self.inner.queue.lock();
        fn database_blocked_reason(database_id: DatabaseId, reason: &String) -> String {
            format!("database {} unavailable {}", database_id, reason)
        }
        fn mark_blocked_and_notify_failed(
            database_id: DatabaseId,
            queue: &mut DatabaseScheduledQueue,
            reason: &String,
        ) {
            let reason = database_blocked_reason(database_id, reason);
            let err: MetaError = anyhow!("{}", reason).into();
            queue.mark_blocked(reason);
            while let Some(ScheduledQueueItem { notifiers, .. }) = queue.queue.inner.pop_front() {
                notifiers
                    .into_iter()
                    .for_each(|notify| notify.notify_collection_failed(err.clone()))
            }
        }
        if let Some(database_id) = database_id {
            let reason = reason.into();
            match queue.queue.entry(database_id) {
                Entry::Occupied(entry) => {
                    let queue = entry.into_mut();
                    if queue.status.is_blocked() {
                        if cfg!(debug_assertions) {
                            panic!("database {} marked as blocked twice", database_id);
                        } else {
                            warn!(?database_id, "database marked as blocked twice");
                        }
                    }
                    info!(?database_id, "database marked as blocked");
                    mark_blocked_and_notify_failed(database_id, queue, &reason);
                }
                Entry::Vacant(entry) => {
                    entry.insert(DatabaseScheduledQueue::new(
                        database_id,
                        &self.inner.metrics,
                        QueueStatus::Blocked(database_blocked_reason(database_id, &reason)),
                    ));
                }
            }
        } else {
            let reason = reason.into();
            if queue.status.is_blocked() {
                if cfg!(debug_assertions) {
                    panic!("cluster marked as blocked twice");
                } else {
                    warn!("cluster marked as blocked twice");
                }
            }
            info!("cluster marked as blocked");
            queue.mark_blocked(reason.clone());
            for (database_id, queue) in &mut queue.queue {
                mark_blocked_and_notify_failed(*database_id, queue, &reason);
            }
        }
    }

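    /// Marks queues as ready again and wakes up the consumer if a newly unblocked queue already
    /// holds pending commands.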
    pub(super) fn mark_ready(&self, options: MarkReadyOptions) {
        let mut queue = self.inner.queue.lock();
        let queue = &mut *queue;
        match options {
            MarkReadyOptions::Database(database_id) => {
                info!(?database_id, "database marked as ready");
                let database_queue = queue.queue.entry(database_id).or_insert_with(|| {
                    DatabaseScheduledQueue::new(
                        database_id,
                        &self.inner.metrics,
                        QueueStatus::Ready,
                    )
                });
                if !database_queue.status.is_blocked() {
                    if cfg!(debug_assertions) {
                        panic!("database {} marked as ready twice", database_id);
                    } else {
                        warn!(?database_id, "database marked as ready twice");
                    }
                }
                if database_queue.mark_ready()
                    && !queue.status.is_blocked()
                    && !database_queue.queue.inner.is_empty()
                {
                    self.inner.changed_tx.send(()).ok();
                }
            }
            MarkReadyOptions::Global { blocked_databases } => {
                if !queue.status.is_blocked() {
                    if cfg!(debug_assertions) {
                        panic!("cluster marked as ready twice");
                    } else {
                        warn!("cluster marked as ready twice");
                    }
                }
                info!(?blocked_databases, "cluster marked as ready");
                let prev_blocked = queue.mark_ready();
                for database_id in &blocked_databases {
                    queue.queue.entry(*database_id).or_insert_with(|| {
                        DatabaseScheduledQueue::new(
                            *database_id,
                            &self.inner.metrics,
                            QueueStatus::Blocked(format!(
                                "database {} failed to recover in global recovery",
                                database_id
                            )),
                        )
                    });
                }
                for (database_id, queue) in &mut queue.queue {
                    if !blocked_databases.contains(database_id) {
                        queue.mark_ready();
                    }
                }
                if prev_blocked
                    && queue
                        .queue
                        .values()
                        .any(|database_queue| !database_queue.queue.inner.is_empty())
                {
                    self.inner.changed_tx.send(()).ok();
                }
            }
        }
    }

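    /// Drains the buffered drop/cancel commands of the blocked queue(s) and notifies them as
    /// collected. Returns whether any `DropStreamingJobs` command was drained.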
    pub(super) fn pre_apply_drop_cancel_scheduled(&self, database_id: Option<DatabaseId>) -> bool {
        let mut queue = self.inner.queue.lock();
        let mut applied = false;

        let mut pre_apply_drop_cancel = |queue: &mut DatabaseScheduledQueue| {
            while let Some(ScheduledQueueItem {
                notifiers, command, ..
            }) = queue.queue.inner.pop_front()
            {
                match command {
                    Command::DropStreamingJobs { .. } => {
                        applied = true;
                    }
                    Command::DropSubscription { .. } => {}
                    _ => {
                        unreachable!("only drop and cancel streaming jobs should be buffered");
                    }
                }
                notifiers.into_iter().for_each(|notify| {
                    notify.notify_collected();
                });
            }
        };

        if let Some(database_id) = database_id {
            assert_matches!(queue.status, QueueStatus::Ready);
            if let Some(queue) = queue.queue.get_mut(&database_id) {
                assert_matches!(queue.status, QueueStatus::Blocked(_));
                pre_apply_drop_cancel(queue);
            }
        } else {
            assert_matches!(queue.status, QueueStatus::Blocked(_));
            for queue in queue.queue.values_mut() {
                pre_apply_drop_cancel(queue);
            }
        }

        applied
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    fn create_test_database(
        id: u32,
        barrier_interval_ms: Option<u32>,
        checkpoint_frequency: Option<u64>,
    ) -> Database {
        Database {
            id,
            name: format!("test_db_{}", id),
            barrier_interval_ms,
            checkpoint_frequency,
            ..Default::default()
        }
    }

    struct MockGlobalBarrierWorkerContext {
        scheduled_rx: tokio::sync::Mutex<tokio::sync::mpsc::UnboundedReceiver<Scheduled>>,
    }

    impl MockGlobalBarrierWorkerContext {
        fn new() -> (Self, tokio::sync::mpsc::UnboundedSender<Scheduled>) {
            let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
            (
                Self {
                    scheduled_rx: tokio::sync::Mutex::new(rx),
                },
                tx,
            )
        }
    }

    impl GlobalBarrierWorkerContext for MockGlobalBarrierWorkerContext {
        async fn next_scheduled(&self) -> Scheduled {
            self.scheduled_rx.lock().await.recv().await.unwrap()
        }

        async fn commit_epoch(
            &self,
            _commit_info: crate::hummock::CommitEpochInfo,
        ) -> MetaResult<risingwave_pb::hummock::HummockVersionStats> {
            unimplemented!()
        }

        fn abort_and_mark_blocked(
            &self,
            _database_id: Option<DatabaseId>,
            _recovery_reason: crate::barrier::RecoveryReason,
        ) {
            unimplemented!()
        }

        fn mark_ready(&self, _options: MarkReadyOptions) {
            unimplemented!()
        }

        async fn post_collect_command<'a>(
            &'a self,
            _command: &'a crate::barrier::command::CommandContext,
        ) -> MetaResult<()> {
            unimplemented!()
        }

        async fn notify_creating_job_failed(&self, _database_id: Option<DatabaseId>, _err: String) {
            unimplemented!()
        }

        async fn finish_creating_job(
            &self,
            _job: crate::barrier::progress::TrackingJob,
        ) -> MetaResult<()> {
            unimplemented!()
        }

        async fn new_control_stream(
            &self,
            _node: &risingwave_pb::common::WorkerNode,
            _init_request: &risingwave_pb::stream_service::streaming_control_stream_request::PbInitRequest,
        ) -> MetaResult<risingwave_rpc_client::StreamingControlHandle> {
            unimplemented!()
        }

        async fn reload_runtime_info(
            &self,
        ) -> MetaResult<crate::barrier::BarrierWorkerRuntimeInfoSnapshot> {
            unimplemented!()
        }

        async fn reload_database_runtime_info(
            &self,
            _database_id: DatabaseId,
        ) -> MetaResult<Option<crate::barrier::DatabaseRuntimeInfoSnapshot>> {
            unimplemented!()
        }

        async fn handle_load_finished_source_ids(
            &self,
            _load_finished_source_ids: Vec<u32>,
        ) -> MetaResult<()> {
            unimplemented!()
        }
    }

    #[tokio::test]
    async fn test_next_barrier_with_different_intervals() {
        let databases = vec![
            create_test_database(1, Some(50), Some(2)),
            create_test_database(2, Some(100), Some(3)),
            create_test_database(3, None, Some(5)),
        ];

        let mut periodic = PeriodicBarriers::new(
            Duration::from_millis(200),
            10,
            databases,
        );

        let (context, _tx) = MockGlobalBarrierWorkerContext::new();

        for _ in 0..3 {
            let barrier = periodic.next_barrier(&context).await;
            assert!(barrier.command.is_none());
            assert!(!barrier.checkpoint);
        }

        let start_time = Instant::now();
        let barrier = periodic.next_barrier(&context).await;
        let elapsed = start_time.elapsed();

        assert_eq!(barrier.database_id, DatabaseId::from(1));
        assert!(barrier.command.is_none());
        assert!(barrier.checkpoint);
        assert!(
            elapsed <= Duration::from_millis(100),
            "Elapsed time exceeded: {:?}",
            elapsed
        );

        let db1_id = DatabaseId::from(1);
        let db1_state = periodic.databases.get_mut(&db1_id).unwrap();
        assert_eq!(db1_state.num_uncheckpointed_barrier, 0);
    }

    #[tokio::test]
    async fn test_next_barrier_with_scheduled_command() {
        let databases = vec![create_test_database(1, Some(1000), Some(2))];

        let mut periodic = PeriodicBarriers::new(Duration::from_millis(1000), 10, databases);

        let (context, tx) = MockGlobalBarrierWorkerContext::new();

        periodic.next_barrier(&context).await;

        let scheduled_command = Scheduled {
            database_id: DatabaseId::from(1),
            command: Command::Flush,
            notifiers: vec![],
            span: tracing::Span::none(),
        };

        let tx_clone = tx.clone();
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(10)).await;
            tx_clone.send(scheduled_command).unwrap();
        });

        let barrier = periodic.next_barrier(&context).await;

        assert!(barrier.command.is_some());
        assert_eq!(barrier.database_id, DatabaseId::from(1));

        if let Some((command, _)) = barrier.command {
            assert!(matches!(command, Command::Flush));
        }
    }

    #[tokio::test]
    async fn test_next_barrier_multiple_databases_timing() {
        let databases = vec![
            create_test_database(1, Some(30), Some(10)),
            create_test_database(2, Some(100), Some(10)),
        ];

        let mut periodic = PeriodicBarriers::new(Duration::from_millis(500), 10, databases);

        let (context, _tx) = MockGlobalBarrierWorkerContext::new();

        for _ in 0..2 {
            periodic.next_barrier(&context).await;
        }

        let mut barrier_counts = HashMap::new();

        let mut barriers = Vec::new();
        for _ in 0..5 {
            let barrier = periodic.next_barrier(&context).await;
            barriers.push(barrier);
        }

        for barrier in barriers {
            *barrier_counts.entry(barrier.database_id).or_insert(0) += 1;
        }

        let db1_count = barrier_counts.get(&DatabaseId::from(1)).unwrap_or(&0);
        let db2_count = barrier_counts.get(&DatabaseId::from(2)).unwrap_or(&0);

        assert!(*db1_count >= *db2_count);
    }

    #[tokio::test]
    async fn test_next_barrier_force_checkpoint() {
        let databases = vec![create_test_database(1, Some(100), Some(10))];

        let mut periodic = PeriodicBarriers::new(Duration::from_millis(100), 10, databases);

        let (context, _tx) = MockGlobalBarrierWorkerContext::new();

        periodic.force_checkpoint_in_next_barrier(DatabaseId::from(1));

        let barrier = periodic.next_barrier(&context).await;

        assert!(barrier.checkpoint);
        assert_eq!(barrier.database_id, DatabaseId::from(1));
        assert!(barrier.command.is_none());
    }

    #[tokio::test]
    async fn test_next_barrier_checkpoint_frequency() {
        let databases = vec![create_test_database(1, Some(50), Some(2))];

        let mut periodic = PeriodicBarriers::new(Duration::from_millis(50), 10, databases);

        let (context, _tx) = MockGlobalBarrierWorkerContext::new();

        let barrier1 = periodic.next_barrier(&context).await;
        assert!(!barrier1.checkpoint);

        let barrier2 = periodic.next_barrier(&context).await;
        assert!(barrier2.checkpoint);

        let barrier3 = periodic.next_barrier(&context).await;
        assert!(!barrier3.checkpoint);
    }

    #[tokio::test]
    async fn test_update_database_barrier() {
        let databases = vec![create_test_database(1, Some(1000), Some(10))];

        let mut periodic = PeriodicBarriers::new(Duration::from_millis(500), 20, databases);

        periodic.update_database_barrier(DatabaseId::from(1), Some(2000), Some(15));

        let db_state = periodic.databases.get(&DatabaseId::from(1)).unwrap();
        assert_eq!(db_state.barrier_interval, Some(Duration::from_millis(2000)));
        assert_eq!(db_state.checkpoint_frequency, Some(15));
        assert_eq!(db_state.num_uncheckpointed_barrier, 0);
        assert!(!db_state.force_checkpoint);

        periodic.update_database_barrier(DatabaseId::from(2), None, None);

        assert!(periodic.databases.contains_key(&DatabaseId::from(2)));
        let db2_state = periodic.databases.get(&DatabaseId::from(2)).unwrap();
        assert_eq!(db2_state.barrier_interval, None);
        assert_eq!(db2_state.checkpoint_frequency, None);
    }
}