1use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet, VecDeque};
17use std::sync::Arc;
18use std::time::{Duration, Instant};
19
20use anyhow::{Context, anyhow};
21use assert_matches::assert_matches;
22use await_tree::InstrumentAwait;
23use itertools::Itertools;
24use parking_lot::Mutex;
25use prometheus::HistogramTimer;
26use risingwave_common::catalog::{DatabaseId, TableId};
27use risingwave_common::id::JobId;
28use risingwave_common::metrics::LabelGuardedHistogram;
29use risingwave_hummock_sdk::HummockVersionId;
30use risingwave_pb::catalog::Database;
31use rw_futures_util::pending_on_none;
32use tokio::select;
33use tokio::sync::{oneshot, watch};
34use tokio_stream::wrappers::IntervalStream;
35use tokio_stream::{StreamExt, StreamMap};
36use tracing::{info, warn};
37
38use super::notifier::Notifier;
39use super::{Command, Scheduled};
40use crate::barrier::context::GlobalBarrierWorkerContext;
41use crate::hummock::HummockManagerRef;
42use crate::rpc::metrics::{GLOBAL_META_METRICS, MetaMetrics};
43use crate::{MetaError, MetaResult};
44
45pub(super) struct NewBarrier {
46 pub database_id: DatabaseId,
47 pub command: Option<(Command, Vec<Notifier>)>,
48 pub span: tracing::Span,
49 pub checkpoint: bool,
50}
51
52struct Inner {
57 queue: Mutex<ScheduledQueue>,
58
59 changed_tx: watch::Sender<()>,
61
62 metrics: Arc<MetaMetrics>,
64}
65
/// Whether a queue currently accepts and yields commands.
#[derive(Debug)]
enum QueueStatus {
    /// Commands may be enqueued and dequeued normally.
    Ready,
    /// The queue is unavailable; the string explains why and is returned to callers whose
    /// commands are rejected.
    Blocked(String),
}

impl QueueStatus {
    /// Returns `true` for [`QueueStatus::Blocked`], `false` for [`QueueStatus::Ready`].
    fn is_blocked(&self) -> bool {
        match self {
            Self::Ready => false,
            Self::Blocked(_) => true,
        }
    }
}
79
80struct ScheduledQueueItem {
81 command: Command,
82 notifiers: Vec<Notifier>,
83 send_latency_timer: HistogramTimer,
84 span: tracing::Span,
85}
86
87struct StatusQueue<T> {
88 queue: T,
89 status: QueueStatus,
90}
91
92struct DatabaseQueue {
93 inner: VecDeque<ScheduledQueueItem>,
94 send_latency: LabelGuardedHistogram,
95}
96
97type DatabaseScheduledQueue = StatusQueue<DatabaseQueue>;
98type ScheduledQueue = StatusQueue<HashMap<DatabaseId, DatabaseScheduledQueue>>;
99
100impl DatabaseScheduledQueue {
101 fn new(database_id: DatabaseId, metrics: &MetaMetrics, status: QueueStatus) -> Self {
102 Self {
103 queue: DatabaseQueue {
104 inner: Default::default(),
105 send_latency: metrics
106 .barrier_send_latency
107 .with_guarded_label_values(&[database_id.to_string().as_str()]),
108 },
109 status,
110 }
111 }
112}
113
114impl<T> StatusQueue<T> {
115 fn mark_blocked(&mut self, reason: String) {
116 self.status = QueueStatus::Blocked(reason);
117 }
118
119 fn mark_ready(&mut self) -> bool {
120 let prev_blocked = self.status.is_blocked();
121 self.status = QueueStatus::Ready;
122 prev_blocked
123 }
124
125 fn validate_item(&mut self, command: &Command) -> MetaResult<()> {
126 if let QueueStatus::Blocked(reason) = &self.status
132 && !matches!(
133 command,
134 Command::DropStreamingJobs { .. } | Command::DropSubscription { .. }
135 )
136 {
137 return Err(MetaError::unavailable(reason));
138 }
139 Ok(())
140 }
141}
142
143fn tracing_span() -> tracing::Span {
144 if tracing::Span::current().is_none() {
145 tracing::Span::none()
146 } else {
147 tracing::info_span!(
148 "barrier",
149 checkpoint = tracing::field::Empty,
150 epoch = tracing::field::Empty
151 )
152 }
153}
154
155#[derive(Clone)]
158pub struct BarrierScheduler {
159 inner: Arc<Inner>,
160
161 hummock_manager: HummockManagerRef,
163}
164
165impl BarrierScheduler {
166 pub fn new_pair(
169 hummock_manager: HummockManagerRef,
170 metrics: Arc<MetaMetrics>,
171 ) -> (Self, ScheduledBarriers) {
172 let inner = Arc::new(Inner {
173 queue: Mutex::new(ScheduledQueue {
174 queue: Default::default(),
175 status: QueueStatus::Ready,
176 }),
177 changed_tx: watch::channel(()).0,
178 metrics,
179 });
180
181 (
182 Self {
183 inner: inner.clone(),
184 hummock_manager,
185 },
186 ScheduledBarriers { inner },
187 )
188 }
189
190 fn push(
192 &self,
193 database_id: DatabaseId,
194 scheduleds: impl IntoIterator<Item = (Command, Notifier)>,
195 ) -> MetaResult<()> {
196 let mut queue = self.inner.queue.lock();
197 let scheduleds = scheduleds.into_iter().collect_vec();
198 scheduleds
199 .iter()
200 .try_for_each(|(command, _)| queue.validate_item(command))?;
201 let queue = queue.queue.entry(database_id).or_insert_with(|| {
202 DatabaseScheduledQueue::new(database_id, &self.inner.metrics, QueueStatus::Ready)
203 });
204 scheduleds
205 .iter()
206 .try_for_each(|(command, _)| queue.validate_item(command))?;
207 for (command, notifier) in scheduleds {
208 queue.queue.inner.push_back(ScheduledQueueItem {
209 command,
210 notifiers: vec![notifier],
211 send_latency_timer: queue.queue.send_latency.start_timer(),
212 span: tracing_span(),
213 });
214 if queue.queue.inner.len() == 1 {
215 self.inner.changed_tx.send(()).ok();
216 }
217 }
218 Ok(())
219 }
220
221 pub fn try_cancel_scheduled_create(&self, database_id: DatabaseId, job_id: JobId) -> bool {
223 let queue = &mut self.inner.queue.lock();
224 let Some(queue) = queue.queue.get_mut(&database_id) else {
225 return false;
226 };
227
228 if let Some(idx) = queue.queue.inner.iter().position(|scheduled| {
229 if let Command::CreateStreamingJob { info, .. } = &scheduled.command
230 && info.stream_job_fragments.stream_job_id() == job_id
231 {
232 true
233 } else {
234 false
235 }
236 }) {
237 queue.queue.inner.remove(idx).unwrap();
238 true
239 } else {
240 false
241 }
242 }
243
244 #[await_tree::instrument("run_commands({})", commands.iter().join(", "))]
251 async fn run_multiple_commands(
252 &self,
253 database_id: DatabaseId,
254 commands: Vec<Command>,
255 ) -> MetaResult<()> {
256 let mut contexts = Vec::with_capacity(commands.len());
257 let mut scheduleds = Vec::with_capacity(commands.len());
258
259 for command in commands {
260 let (started_tx, started_rx) = oneshot::channel();
261 let (collect_tx, collect_rx) = oneshot::channel();
262
263 contexts.push((started_rx, collect_rx));
264 scheduleds.push((
265 command,
266 Notifier {
267 started: Some(started_tx),
268 collected: Some(collect_tx),
269 },
270 ));
271 }
272
273 self.push(database_id, scheduleds)?;
274
275 for (injected_rx, collect_rx) in contexts {
276 tracing::trace!("waiting for injected_rx");
278 injected_rx
279 .instrument_await("wait_injected")
280 .await
281 .ok()
282 .context("failed to inject barrier")??;
283
284 tracing::trace!("waiting for collect_rx");
285 collect_rx
287 .instrument_await("wait_collected")
288 .await
289 .ok()
290 .context("failed to collect barrier")??;
291 }
292
293 Ok(())
294 }
295
296 pub async fn run_command(&self, database_id: DatabaseId, command: Command) -> MetaResult<()> {
300 tracing::trace!("run_command: {:?}", command);
301 let ret = self.run_multiple_commands(database_id, vec![command]).await;
302 tracing::trace!("run_command finished");
303 ret
304 }
305
306 pub fn run_command_no_wait(&self, database_id: DatabaseId, command: Command) -> MetaResult<()> {
308 tracing::trace!("run_command_no_wait: {:?}", command);
309 self.push(database_id, vec![(command, Notifier::default())])
310 }
311
312 pub async fn flush(&self, database_id: DatabaseId) -> MetaResult<HummockVersionId> {
314 let start = Instant::now();
315
316 tracing::debug!("start barrier flush");
317 self.run_multiple_commands(database_id, vec![Command::Flush])
318 .await?;
319
320 let elapsed = Instant::now().duration_since(start);
321 tracing::debug!("barrier flushed in {:?}", elapsed);
322
323 let version_id = self.hummock_manager.get_version_id().await;
324 Ok(version_id)
325 }
326}
327
328pub struct ScheduledBarriers {
330 inner: Arc<Inner>,
331}
332
/// Per-database periodic-barrier configuration and checkpoint bookkeeping.
#[derive(Debug)]
pub struct DatabaseBarrierState {
    /// Database-specific barrier interval; `None` falls back to the system-wide interval.
    pub barrier_interval: Option<Duration>,
    /// Database-specific checkpoint frequency; `None` falls back to the system-wide frequency.
    pub checkpoint_frequency: Option<u64>,
    /// When set, the next barrier for this database is made a checkpoint barrier.
    pub force_checkpoint: bool,
    /// Number of barriers issued since the last checkpoint barrier.
    pub num_uncheckpointed_barrier: u64,
}

impl DatabaseBarrierState {
    /// Creates state with the given overrides, a zeroed barrier counter and no forced
    /// checkpoint pending.
    fn new(barrier_interval_ms: Option<u32>, checkpoint_frequency: Option<u64>) -> Self {
        let barrier_interval = barrier_interval_ms
            .map(u64::from)
            .map(Duration::from_millis);
        Self {
            barrier_interval,
            checkpoint_frequency,
            force_checkpoint: false,
            num_uncheckpointed_barrier: 0,
        }
    }
}
354
355#[derive(Default, Debug)]
357pub struct PeriodicBarriers {
358 sys_barrier_interval: Duration,
360 sys_checkpoint_frequency: u64,
361 databases: HashMap<DatabaseId, DatabaseBarrierState>,
363 timer_streams: StreamMap<DatabaseId, IntervalStream>,
366}
367
368impl PeriodicBarriers {
369 pub(super) fn new(
370 sys_barrier_interval: Duration,
371 sys_checkpoint_frequency: u64,
372 database_infos: Vec<Database>,
373 ) -> Self {
374 let mut databases = HashMap::with_capacity(database_infos.len());
375 let mut timer_streams = StreamMap::with_capacity(database_infos.len());
376 database_infos.into_iter().for_each(|database| {
377 let database_id: DatabaseId = database.id;
378 let barrier_interval_ms = database.barrier_interval_ms;
379 let checkpoint_frequency = database.checkpoint_frequency;
380 databases.insert(
381 database_id,
382 DatabaseBarrierState::new(barrier_interval_ms, checkpoint_frequency),
383 );
384 let duration = if let Some(ms) = barrier_interval_ms {
385 Duration::from_millis(ms as u64)
386 } else {
387 sys_barrier_interval
388 };
389
390 let interval_stream = Self::new_interval_stream(duration, &database_id);
392 timer_streams.insert(database_id, interval_stream);
393 });
394 Self {
395 sys_barrier_interval,
396 sys_checkpoint_frequency,
397 databases,
398 timer_streams,
399 }
400 }
401
402 fn new_interval_stream(duration: Duration, database_id: &DatabaseId) -> IntervalStream {
404 GLOBAL_META_METRICS
405 .barrier_interval_by_database
406 .with_label_values(&[&database_id.to_string()])
407 .set(duration.as_millis_f64());
408 let mut interval = tokio::time::interval(duration);
409 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
410 IntervalStream::new(interval)
411 }
412
413 pub(super) fn set_sys_barrier_interval(&mut self, duration: Duration) {
415 if self.sys_barrier_interval == duration {
416 return;
417 }
418 self.sys_barrier_interval = duration;
419 for (db_id, db_state) in &mut self.databases {
421 if db_state.barrier_interval.is_none() {
422 let interval_stream = Self::new_interval_stream(duration, db_id);
423 self.timer_streams.insert(*db_id, interval_stream);
424 }
425 }
426 }
427
428 pub fn set_sys_checkpoint_frequency(&mut self, frequency: u64) {
430 if self.sys_checkpoint_frequency == frequency {
431 return;
432 }
433 self.sys_checkpoint_frequency = frequency;
434 for db_state in self.databases.values_mut() {
436 if db_state.checkpoint_frequency.is_none() {
437 db_state.num_uncheckpointed_barrier = 0;
438 db_state.force_checkpoint = false;
439 }
440 }
441 }
442
443 pub(super) fn update_database_barrier(
444 &mut self,
445 database_id: DatabaseId,
446 barrier_interval_ms: Option<u32>,
447 checkpoint_frequency: Option<u64>,
448 ) {
449 match self.databases.entry(database_id) {
450 Entry::Occupied(mut entry) => {
451 let db_state = entry.get_mut();
452 db_state.barrier_interval =
453 barrier_interval_ms.map(|ms| Duration::from_millis(ms as u64));
454 db_state.checkpoint_frequency = checkpoint_frequency;
455 db_state.num_uncheckpointed_barrier = 0;
457 db_state.force_checkpoint = false;
458 }
459 Entry::Vacant(entry) => {
460 entry.insert(DatabaseBarrierState::new(
461 barrier_interval_ms,
462 checkpoint_frequency,
463 ));
464 }
465 }
466
467 let duration = if let Some(ms) = barrier_interval_ms {
469 Duration::from_millis(ms as u64)
470 } else {
471 self.sys_barrier_interval
472 };
473
474 let interval_stream = Self::new_interval_stream(duration, &database_id);
475 self.timer_streams.insert(database_id, interval_stream);
476 }
477
478 pub fn force_checkpoint_in_next_barrier(&mut self, database_id: DatabaseId) {
480 if let Some(db_state) = self.databases.get_mut(&database_id) {
481 db_state.force_checkpoint = true;
482 } else {
483 warn!(
484 ?database_id,
485 "force checkpoint in next barrier for non-existing database"
486 );
487 }
488 }
489
490 #[await_tree::instrument]
491 pub(super) async fn next_barrier(
492 &mut self,
493 context: &impl GlobalBarrierWorkerContext,
494 ) -> NewBarrier {
495 let new_barrier = select! {
496 biased;
497 scheduled = context.next_scheduled() => {
498 let database_id = scheduled.database_id;
499 assert!(self.databases.contains_key(&database_id), "database {} not found in periodic barriers", database_id);
501 assert!(self.timer_streams.contains_key(&database_id), "timer stream for database {} not found in periodic barriers", database_id);
502 for (db_id, timer_stream) in self.timer_streams.iter_mut() {
504 if *db_id == database_id {
505 timer_stream.as_mut().reset();
506 }
507 }
508 let checkpoint = scheduled.command.need_checkpoint() || self.try_get_checkpoint(database_id);
509 NewBarrier {
510 database_id: scheduled.database_id,
511 command: Some((scheduled.command, scheduled.notifiers)),
512 span: scheduled.span,
513 checkpoint,
514 }
515 },
516 next_timer = pending_on_none(self.timer_streams.next()) => {
519 let (database_id, _instant) = next_timer;
520 let checkpoint = self.try_get_checkpoint(database_id);
521 NewBarrier {
522 database_id,
523 command: None,
524 span: tracing_span(),
525 checkpoint,
526 }
527 }
528 };
529 self.update_num_uncheckpointed_barrier(new_barrier.database_id, new_barrier.checkpoint);
530
531 new_barrier
532 }
533
534 fn try_get_checkpoint(&self, database_id: DatabaseId) -> bool {
536 let db_state = self.databases.get(&database_id).unwrap();
537 let checkpoint_frequency = db_state
538 .checkpoint_frequency
539 .unwrap_or(self.sys_checkpoint_frequency);
540 db_state.num_uncheckpointed_barrier + 1 >= checkpoint_frequency || db_state.force_checkpoint
541 }
542
543 fn update_num_uncheckpointed_barrier(&mut self, database_id: DatabaseId, checkpoint: bool) {
545 let db_state = self.databases.get_mut(&database_id).unwrap();
546 if checkpoint {
547 db_state.num_uncheckpointed_barrier = 0;
548 db_state.force_checkpoint = false;
549 } else {
550 db_state.num_uncheckpointed_barrier += 1;
551 }
552 }
553}
554
555impl ScheduledBarriers {
556 pub(super) async fn next_scheduled(&self) -> Scheduled {
557 'outer: loop {
558 let mut rx = self.inner.changed_tx.subscribe();
559 {
560 let mut queue = self.inner.queue.lock();
561 if queue.status.is_blocked() {
562 continue;
563 }
564 for (database_id, queue) in &mut queue.queue {
565 if queue.status.is_blocked() {
566 continue;
567 }
568 if let Some(item) = queue.queue.inner.pop_front() {
569 item.send_latency_timer.observe_duration();
570 break 'outer Scheduled {
571 database_id: *database_id,
572 command: item.command,
573 notifiers: item.notifiers,
574 span: item.span,
575 };
576 }
577 }
578 }
579 rx.changed().await.unwrap();
580 }
581 }
582}
583
584pub(super) enum MarkReadyOptions {
585 Database(DatabaseId),
586 Global {
587 blocked_databases: HashSet<DatabaseId>,
588 },
589}
590
591impl ScheduledBarriers {
592 pub(super) fn pre_apply_drop_cancel(&self, database_id: Option<DatabaseId>) -> Vec<TableId> {
594 self.pre_apply_drop_cancel_scheduled(database_id)
595 }
596
597 pub(super) fn abort_and_mark_blocked(
600 &self,
601 database_id: Option<DatabaseId>,
602 reason: impl Into<String>,
603 ) {
604 let mut queue = self.inner.queue.lock();
605 fn database_blocked_reason(database_id: DatabaseId, reason: &String) -> String {
606 format!("database {} unavailable {}", database_id, reason)
607 }
608 fn mark_blocked_and_notify_failed(
609 database_id: DatabaseId,
610 queue: &mut DatabaseScheduledQueue,
611 reason: &String,
612 ) {
613 let reason = database_blocked_reason(database_id, reason);
614 let err: MetaError = anyhow!("{}", reason).into();
615 queue.mark_blocked(reason);
616 while let Some(ScheduledQueueItem { notifiers, .. }) = queue.queue.inner.pop_front() {
617 notifiers
618 .into_iter()
619 .for_each(|notify| notify.notify_collection_failed(err.clone()))
620 }
621 }
622 if let Some(database_id) = database_id {
623 let reason = reason.into();
624 match queue.queue.entry(database_id) {
625 Entry::Occupied(entry) => {
626 let queue = entry.into_mut();
627 if queue.status.is_blocked() {
628 if cfg!(debug_assertions) {
629 panic!("database {} marked as blocked twice", database_id);
630 } else {
631 warn!(?database_id, "database marked as blocked twice");
632 }
633 }
634 info!(?database_id, "database marked as blocked");
635 mark_blocked_and_notify_failed(database_id, queue, &reason);
636 }
637 Entry::Vacant(entry) => {
638 entry.insert(DatabaseScheduledQueue::new(
639 database_id,
640 &self.inner.metrics,
641 QueueStatus::Blocked(database_blocked_reason(database_id, &reason)),
642 ));
643 }
644 }
645 } else {
646 let reason = reason.into();
647 if queue.status.is_blocked() {
648 if cfg!(debug_assertions) {
649 panic!("cluster marked as blocked twice");
650 } else {
651 warn!("cluster marked as blocked twice");
652 }
653 }
654 info!("cluster marked as blocked");
655 queue.mark_blocked(reason.clone());
656 for (database_id, queue) in &mut queue.queue {
657 mark_blocked_and_notify_failed(*database_id, queue, &reason);
658 }
659 }
660 }
661
662 pub(super) fn mark_ready(&self, options: MarkReadyOptions) {
664 let mut queue = self.inner.queue.lock();
665 let queue = &mut *queue;
666 match options {
667 MarkReadyOptions::Database(database_id) => {
668 info!(?database_id, "database marked as ready");
669 let database_queue = queue.queue.entry(database_id).or_insert_with(|| {
670 DatabaseScheduledQueue::new(
671 database_id,
672 &self.inner.metrics,
673 QueueStatus::Ready,
674 )
675 });
676 if !database_queue.status.is_blocked() {
677 if cfg!(debug_assertions) {
678 panic!("database {} marked as ready twice", database_id);
679 } else {
680 warn!(?database_id, "database marked as ready twice");
681 }
682 }
683 if database_queue.mark_ready()
684 && !queue.status.is_blocked()
685 && !database_queue.queue.inner.is_empty()
686 {
687 self.inner.changed_tx.send(()).ok();
688 }
689 }
690 MarkReadyOptions::Global { blocked_databases } => {
691 if !queue.status.is_blocked() {
692 if cfg!(debug_assertions) {
693 panic!("cluster marked as ready twice");
694 } else {
695 warn!("cluster marked as ready twice");
696 }
697 }
698 info!(?blocked_databases, "cluster marked as ready");
699 let prev_blocked = queue.mark_ready();
700 for database_id in &blocked_databases {
701 queue.queue.entry(*database_id).or_insert_with(|| {
702 DatabaseScheduledQueue::new(
703 *database_id,
704 &self.inner.metrics,
705 QueueStatus::Blocked(format!(
706 "database {} failed to recover in global recovery",
707 database_id
708 )),
709 )
710 });
711 }
712 for (database_id, queue) in &mut queue.queue {
713 if !blocked_databases.contains(database_id) {
714 queue.mark_ready();
715 }
716 }
717 if prev_blocked
718 && queue
719 .queue
720 .values()
721 .any(|database_queue| !database_queue.queue.inner.is_empty())
722 {
723 self.inner.changed_tx.send(()).ok();
724 }
725 }
726 }
727 }
728
729 pub(super) fn pre_apply_drop_cancel_scheduled(
732 &self,
733 database_id: Option<DatabaseId>,
734 ) -> Vec<TableId> {
735 let mut queue = self.inner.queue.lock();
736 let mut dropped_tables = vec![];
737
738 let mut pre_apply_drop_cancel = |queue: &mut DatabaseScheduledQueue| {
739 while let Some(ScheduledQueueItem {
740 notifiers, command, ..
741 }) = queue.queue.inner.pop_front()
742 {
743 match command {
744 Command::DropStreamingJobs {
745 unregistered_state_table_ids,
746 ..
747 } => {
748 dropped_tables.extend(unregistered_state_table_ids);
749 }
750 Command::DropSubscription { .. } => {}
751 _ => {
752 unreachable!("only drop and cancel streaming jobs should be buffered");
753 }
754 }
755 notifiers.into_iter().for_each(|notify| {
756 notify.notify_collected();
757 });
758 }
759 };
760
761 if let Some(database_id) = database_id {
762 assert_matches!(queue.status, QueueStatus::Ready);
763 if let Some(queue) = queue.queue.get_mut(&database_id) {
764 assert_matches!(queue.status, QueueStatus::Blocked(_));
765 pre_apply_drop_cancel(queue);
766 }
767 } else {
768 assert_matches!(queue.status, QueueStatus::Blocked(_));
769 for queue in queue.queue.values_mut() {
770 pre_apply_drop_cancel(queue);
771 }
772 }
773
774 dropped_tables
775 }
776}
777
/// Tests for `PeriodicBarriers::next_barrier` timing and checkpoint decisions.
///
/// NOTE(review): these tests rely on real timer intervals (tens of milliseconds), so their
/// outcomes depend on wall-clock scheduling.
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `Database` catalog entry with the given id and optional per-database barrier
    /// interval / checkpoint frequency overrides; all other fields use their defaults.
    fn create_test_database(
        id: u32,
        barrier_interval_ms: Option<u32>,
        checkpoint_frequency: Option<u64>,
    ) -> Database {
        Database {
            id: id.into(),
            name: format!("test_db_{}", id),
            barrier_interval_ms,
            checkpoint_frequency,
            ..Default::default()
        }
    }

    /// Test double for `GlobalBarrierWorkerContext`.
    ///
    /// `next_scheduled` yields whatever the test sends through the paired sender. Every other
    /// method is `unimplemented!()`, because `PeriodicBarriers::next_barrier` only calls
    /// `next_scheduled`.
    struct MockGlobalBarrierWorkerContext {
        // Behind an async mutex because `next_scheduled` only receives `&self`.
        scheduled_rx: tokio::sync::Mutex<tokio::sync::mpsc::UnboundedReceiver<Scheduled>>,
    }

    impl MockGlobalBarrierWorkerContext {
        /// Returns the mock together with the sender used to feed it scheduled commands.
        fn new() -> (Self, tokio::sync::mpsc::UnboundedSender<Scheduled>) {
            let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
            (
                Self {
                    scheduled_rx: tokio::sync::Mutex::new(rx),
                },
                tx,
            )
        }
    }

    impl GlobalBarrierWorkerContext for MockGlobalBarrierWorkerContext {
        // Pending until the test sends a command; panics if the sender is dropped while waiting.
        async fn next_scheduled(&self) -> Scheduled {
            self.scheduled_rx.lock().await.recv().await.unwrap()
        }

        async fn commit_epoch(
            &self,
            _commit_info: crate::hummock::CommitEpochInfo,
        ) -> MetaResult<risingwave_pb::hummock::HummockVersionStats> {
            unimplemented!()
        }

        fn abort_and_mark_blocked(
            &self,
            _database_id: Option<DatabaseId>,
            _recovery_reason: crate::barrier::RecoveryReason,
        ) {
            unimplemented!()
        }

        fn mark_ready(&self, _options: MarkReadyOptions) {
            unimplemented!()
        }

        async fn post_collect_command<'a>(
            &'a self,
            _command: &'a crate::barrier::command::CommandContext,
        ) -> MetaResult<()> {
            unimplemented!()
        }

        async fn notify_creating_job_failed(&self, _database_id: Option<DatabaseId>, _err: String) {
            unimplemented!()
        }

        async fn finish_creating_job(
            &self,
            _job: crate::barrier::progress::TrackingJob,
        ) -> MetaResult<()> {
            unimplemented!()
        }

        async fn new_control_stream(
            &self,
            _node: &risingwave_pb::common::WorkerNode,
            _init_request: &risingwave_pb::stream_service::streaming_control_stream_request::PbInitRequest,
        ) -> MetaResult<risingwave_rpc_client::StreamingControlHandle> {
            unimplemented!()
        }

        async fn reload_runtime_info(
            &self,
        ) -> MetaResult<crate::barrier::BarrierWorkerRuntimeInfoSnapshot> {
            unimplemented!()
        }

        async fn reload_database_runtime_info(
            &self,
            _database_id: DatabaseId,
        ) -> MetaResult<Option<crate::barrier::DatabaseRuntimeInfoSnapshot>> {
            unimplemented!()
        }

        async fn handle_list_finished_source_ids(
            &self,
            _list_finished_source_ids: Vec<u32>,
        ) -> MetaResult<()> {
            unimplemented!()
        }

        async fn handle_load_finished_source_ids(
            &self,
            _load_finished_source_ids: Vec<u32>,
        ) -> MetaResult<()> {
            unimplemented!()
        }

        async fn finish_cdc_table_backfill(&self, _job_id: JobId) -> MetaResult<()> {
            unimplemented!()
        }

        async fn handle_refresh_finished_table_ids(
            &self,
            _refresh_finished_table_ids: Vec<JobId>,
        ) -> MetaResult<()> {
            unimplemented!()
        }
    }

    /// Databases with different intervals and frequencies get independent timers and counters.
    #[tokio::test]
    async fn test_next_barrier_with_different_intervals() {
        // db1: 50ms, checkpoint every 2; db2: 100ms, every 3; db3: inherits the 200ms system
        // interval, every 5.
        let databases = vec![
            create_test_database(1, Some(50), Some(2)),
            create_test_database(2, Some(100), Some(3)),
            create_test_database(3, None, Some(5)),
        ];

        let mut periodic = PeriodicBarriers::new(
            Duration::from_millis(200),
            10,
            databases,
        );

        let (context, _tx) = MockGlobalBarrierWorkerContext::new();

        // Expected to be one initial tick per database (assumes the first tick of a
        // `tokio::time::interval` completes immediately). No database has reached its
        // checkpoint frequency yet.
        for _ in 0..3 {
            let barrier = periodic.next_barrier(&context).await;
            assert!(barrier.command.is_none());
            assert!(!barrier.checkpoint);
        }

        let start_time = Instant::now();
        let barrier = periodic.next_barrier(&context).await;
        // NOTE(review): measured but never asserted on.
        let _elapsed = start_time.elapsed();

        // db1 has the shortest interval. With one uncheckpointed barrier already counted,
        // 1 + 1 >= 2 makes this a checkpoint, which resets db1's counter.
        assert_eq!(barrier.database_id, DatabaseId::from(1));
        assert!(barrier.command.is_none());
        assert!(barrier.checkpoint);
        let db1_id = DatabaseId::from(1);
        let db1_state = periodic.databases.get_mut(&db1_id).unwrap();
        assert_eq!(db1_state.num_uncheckpointed_barrier, 0);
    }

    /// A scheduled command is turned into a barrier carrying that command.
    #[tokio::test]
    async fn test_next_barrier_with_scheduled_command() {
        let databases = vec![
            create_test_database(1, Some(1000), Some(2)),
        ];

        let mut periodic = PeriodicBarriers::new(Duration::from_millis(1000), 10, databases);

        let (context, tx) = MockGlobalBarrierWorkerContext::new();

        // Consume the initial timer barrier so the next one must come from the command.
        periodic.next_barrier(&context).await;

        let scheduled_command = Scheduled {
            database_id: DatabaseId::from(1),
            command: Command::Flush,
            notifiers: vec![],
            span: tracing::Span::none(),
        };

        // Deliver the command after a short delay, well before the 1000ms timer fires.
        let tx_clone = tx.clone();
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(10)).await;
            tx_clone.send(scheduled_command).unwrap();
        });

        let barrier = periodic.next_barrier(&context).await;

        assert!(barrier.command.is_some());
        assert_eq!(barrier.database_id, DatabaseId::from(1));

        if let Some((command, _)) = barrier.command {
            assert!(matches!(command, Command::Flush));
        }
    }

    /// A database with a shorter interval gets at least as many barriers as a slower one.
    #[tokio::test]
    async fn test_next_barrier_multiple_databases_timing() {
        let databases = vec![
            create_test_database(1, Some(30), Some(10)),
            create_test_database(2, Some(100), Some(10)),
        ];

        let mut periodic = PeriodicBarriers::new(Duration::from_millis(500), 10, databases);

        let (context, _tx) = MockGlobalBarrierWorkerContext::new();

        // Skip the initial barrier of each database.
        for _ in 0..2 {
            periodic.next_barrier(&context).await;
        }

        let mut barrier_counts = HashMap::new();

        let mut barriers = Vec::new();
        for _ in 0..5 {
            let barrier = periodic.next_barrier(&context).await;
            barriers.push(barrier);
        }

        for barrier in barriers {
            *barrier_counts.entry(barrier.database_id).or_insert(0) += 1;
        }

        let db1_count = barrier_counts.get(&DatabaseId::from(1)).unwrap_or(&0);
        let db2_count = barrier_counts.get(&DatabaseId::from(2)).unwrap_or(&0);

        // db1 ticks every 30ms versus db2's 100ms.
        assert!(*db1_count >= *db2_count);
    }

    /// `force_checkpoint_in_next_barrier` makes the next barrier a checkpoint regardless of
    /// the frequency (10 here).
    #[tokio::test]
    async fn test_next_barrier_force_checkpoint() {
        let databases = vec![create_test_database(1, Some(100), Some(10))];

        let mut periodic = PeriodicBarriers::new(Duration::from_millis(100), 10, databases);

        let (context, _tx) = MockGlobalBarrierWorkerContext::new();

        periodic.force_checkpoint_in_next_barrier(DatabaseId::from(1));

        let barrier = periodic.next_barrier(&context).await;

        assert!(barrier.checkpoint);
        assert_eq!(barrier.database_id, DatabaseId::from(1));
        assert!(barrier.command.is_none());
    }

    /// With frequency 2 the pattern is: uncheckpointed, checkpoint, then uncheckpointed again
    /// after the counter resets.
    #[tokio::test]
    async fn test_next_barrier_checkpoint_frequency() {
        let databases = vec![create_test_database(1, Some(50), Some(2))];
        let mut periodic = PeriodicBarriers::new(Duration::from_millis(50), 10, databases);

        let (context, _tx) = MockGlobalBarrierWorkerContext::new();

        // Counter 0: 0 + 1 < 2.
        let barrier1 = periodic.next_barrier(&context).await;
        assert!(!barrier1.checkpoint);

        // Counter 1: 1 + 1 >= 2.
        let barrier2 = periodic.next_barrier(&context).await;
        assert!(barrier2.checkpoint);

        // Counter was reset by the checkpoint.
        let barrier3 = periodic.next_barrier(&context).await;
        assert!(!barrier3.checkpoint);
    }

    /// `update_database_barrier` reconfigures known databases and registers unknown ones.
    #[tokio::test]
    async fn test_update_database_barrier() {
        let databases = vec![create_test_database(1, Some(1000), Some(10))];

        let mut periodic = PeriodicBarriers::new(Duration::from_millis(500), 20, databases);

        // Existing database: overrides replaced, bookkeeping reset.
        periodic.update_database_barrier(DatabaseId::from(1), Some(2000), Some(15));

        let db_state = periodic.databases.get(&DatabaseId::from(1)).unwrap();
        assert_eq!(db_state.barrier_interval, Some(Duration::from_millis(2000)));
        assert_eq!(db_state.checkpoint_frequency, Some(15));
        assert_eq!(db_state.num_uncheckpointed_barrier, 0);
        assert!(!db_state.force_checkpoint);

        // Unknown database: registered without overrides.
        periodic.update_database_barrier(DatabaseId::from(2), None, None);

        assert!(periodic.databases.contains_key(&DatabaseId::from(2)));
        let db2_state = periodic.databases.get(&DatabaseId::from(2)).unwrap();
        assert_eq!(db2_state.barrier_interval, None);
        assert_eq!(db2_state.checkpoint_frequency, None);
    }
}