1use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet, VecDeque};
17use std::sync::Arc;
18use std::time::{Duration, Instant};
19
20use anyhow::{Context, anyhow};
21use assert_matches::assert_matches;
22use await_tree::InstrumentAwait;
23use itertools::Itertools;
24use parking_lot::Mutex;
25use prometheus::HistogramTimer;
26use risingwave_common::catalog::{DatabaseId, TableId};
27use risingwave_common::metrics::LabelGuardedHistogram;
28use risingwave_hummock_sdk::HummockVersionId;
29use risingwave_pb::catalog::Database;
30use rw_futures_util::pending_on_none;
31use tokio::select;
32use tokio::sync::{oneshot, watch};
33use tokio_stream::wrappers::IntervalStream;
34use tokio_stream::{StreamExt, StreamMap};
35use tracing::{info, warn};
36
37use super::notifier::Notifier;
38use super::{Command, Scheduled};
39use crate::barrier::context::GlobalBarrierWorkerContext;
40use crate::hummock::HummockManagerRef;
41use crate::rpc::metrics::{GLOBAL_META_METRICS, MetaMetrics};
42use crate::{MetaError, MetaResult};
43
44pub(super) struct NewBarrier {
45    pub database_id: DatabaseId,
46    pub command: Option<(Command, Vec<Notifier>)>,
47    pub span: tracing::Span,
48    pub checkpoint: bool,
49}
50
51struct Inner {
56    queue: Mutex<ScheduledQueue>,
57
58    changed_tx: watch::Sender<()>,
60
61    metrics: Arc<MetaMetrics>,
63}
64
/// Whether a queue currently accepts and hands out commands.
#[derive(Debug)]
enum QueueStatus {
    /// Commands are accepted and may be scheduled.
    Ready,
    /// Only drop commands pass validation; the payload is the reason reported to callers.
    Blocked(String),
}

impl QueueStatus {
    /// Returns `true` for [`QueueStatus::Blocked`], `false` for [`QueueStatus::Ready`].
    fn is_blocked(&self) -> bool {
        match self {
            Self::Ready => false,
            Self::Blocked(_) => true,
        }
    }
}
78
79struct ScheduledQueueItem {
80    command: Command,
81    notifiers: Vec<Notifier>,
82    send_latency_timer: HistogramTimer,
83    span: tracing::Span,
84}
85
86struct StatusQueue<T> {
87    queue: T,
88    status: QueueStatus,
89}
90
91struct DatabaseQueue {
92    inner: VecDeque<ScheduledQueueItem>,
93    send_latency: LabelGuardedHistogram,
94}
95
96type DatabaseScheduledQueue = StatusQueue<DatabaseQueue>;
97type ScheduledQueue = StatusQueue<HashMap<DatabaseId, DatabaseScheduledQueue>>;
98
99impl DatabaseScheduledQueue {
100    fn new(database_id: DatabaseId, metrics: &MetaMetrics, status: QueueStatus) -> Self {
101        Self {
102            queue: DatabaseQueue {
103                inner: Default::default(),
104                send_latency: metrics
105                    .barrier_send_latency
106                    .with_guarded_label_values(&[database_id.database_id.to_string().as_str()]),
107            },
108            status,
109        }
110    }
111}
112
113impl<T> StatusQueue<T> {
114    fn mark_blocked(&mut self, reason: String) {
115        self.status = QueueStatus::Blocked(reason);
116    }
117
118    fn mark_ready(&mut self) -> bool {
119        let prev_blocked = self.status.is_blocked();
120        self.status = QueueStatus::Ready;
121        prev_blocked
122    }
123
124    fn validate_item(&mut self, command: &Command) -> MetaResult<()> {
125        if let QueueStatus::Blocked(reason) = &self.status
131            && !matches!(
132                command,
133                Command::DropStreamingJobs { .. } | Command::DropSubscription { .. }
134            )
135        {
136            return Err(MetaError::unavailable(reason));
137        }
138        Ok(())
139    }
140}
141
142fn tracing_span() -> tracing::Span {
143    if tracing::Span::current().is_none() {
144        tracing::Span::none()
145    } else {
146        tracing::info_span!(
147            "barrier",
148            checkpoint = tracing::field::Empty,
149            epoch = tracing::field::Empty
150        )
151    }
152}
153
154#[derive(Clone)]
157pub struct BarrierScheduler {
158    inner: Arc<Inner>,
159
160    hummock_manager: HummockManagerRef,
162}
163
164impl BarrierScheduler {
165    pub fn new_pair(
168        hummock_manager: HummockManagerRef,
169        metrics: Arc<MetaMetrics>,
170    ) -> (Self, ScheduledBarriers) {
171        let inner = Arc::new(Inner {
172            queue: Mutex::new(ScheduledQueue {
173                queue: Default::default(),
174                status: QueueStatus::Ready,
175            }),
176            changed_tx: watch::channel(()).0,
177            metrics,
178        });
179
180        (
181            Self {
182                inner: inner.clone(),
183                hummock_manager,
184            },
185            ScheduledBarriers { inner },
186        )
187    }
188
189    fn push(
191        &self,
192        database_id: DatabaseId,
193        scheduleds: impl IntoIterator<Item = (Command, Notifier)>,
194    ) -> MetaResult<()> {
195        let mut queue = self.inner.queue.lock();
196        let scheduleds = scheduleds.into_iter().collect_vec();
197        scheduleds
198            .iter()
199            .try_for_each(|(command, _)| queue.validate_item(command))?;
200        let queue = queue.queue.entry(database_id).or_insert_with(|| {
201            DatabaseScheduledQueue::new(database_id, &self.inner.metrics, QueueStatus::Ready)
202        });
203        scheduleds
204            .iter()
205            .try_for_each(|(command, _)| queue.validate_item(command))?;
206        for (command, notifier) in scheduleds {
207            queue.queue.inner.push_back(ScheduledQueueItem {
208                command,
209                notifiers: vec![notifier],
210                send_latency_timer: queue.queue.send_latency.start_timer(),
211                span: tracing_span(),
212            });
213            if queue.queue.inner.len() == 1 {
214                self.inner.changed_tx.send(()).ok();
215            }
216        }
217        Ok(())
218    }
219
220    pub fn try_cancel_scheduled_create(&self, database_id: DatabaseId, table_id: TableId) -> bool {
222        let queue = &mut self.inner.queue.lock();
223        let Some(queue) = queue.queue.get_mut(&database_id) else {
224            return false;
225        };
226
227        if let Some(idx) = queue.queue.inner.iter().position(|scheduled| {
228            if let Command::CreateStreamingJob { info, .. } = &scheduled.command
229                && info.stream_job_fragments.stream_job_id() == table_id
230            {
231                true
232            } else {
233                false
234            }
235        }) {
236            queue.queue.inner.remove(idx).unwrap();
237            true
238        } else {
239            false
240        }
241    }
242
243    #[await_tree::instrument("run_commands({})", commands.iter().join(", "))]
250    async fn run_multiple_commands(
251        &self,
252        database_id: DatabaseId,
253        commands: Vec<Command>,
254    ) -> MetaResult<()> {
255        let mut contexts = Vec::with_capacity(commands.len());
256        let mut scheduleds = Vec::with_capacity(commands.len());
257
258        for command in commands {
259            let (started_tx, started_rx) = oneshot::channel();
260            let (collect_tx, collect_rx) = oneshot::channel();
261
262            contexts.push((started_rx, collect_rx));
263            scheduleds.push((
264                command,
265                Notifier {
266                    started: Some(started_tx),
267                    collected: Some(collect_tx),
268                },
269            ));
270        }
271
272        self.push(database_id, scheduleds)?;
273
274        for (injected_rx, collect_rx) in contexts {
275            tracing::trace!("waiting for injected_rx");
277            injected_rx
278                .instrument_await("wait_injected")
279                .await
280                .ok()
281                .context("failed to inject barrier")??;
282
283            tracing::trace!("waiting for collect_rx");
284            collect_rx
286                .instrument_await("wait_collected")
287                .await
288                .ok()
289                .context("failed to collect barrier")??;
290        }
291
292        Ok(())
293    }
294
295    pub async fn run_command(&self, database_id: DatabaseId, command: Command) -> MetaResult<()> {
299        tracing::trace!("run_command: {:?}", command);
300        let ret = self.run_multiple_commands(database_id, vec![command]).await;
301        tracing::trace!("run_command finished");
302        ret
303    }
304
305    pub fn run_command_no_wait(&self, database_id: DatabaseId, command: Command) -> MetaResult<()> {
307        tracing::trace!("run_command_no_wait: {:?}", command);
308        self.push(database_id, vec![(command, Notifier::default())])
309    }
310
311    pub async fn flush(&self, database_id: DatabaseId) -> MetaResult<HummockVersionId> {
313        let start = Instant::now();
314
315        tracing::debug!("start barrier flush");
316        self.run_multiple_commands(database_id, vec![Command::Flush])
317            .await?;
318
319        let elapsed = Instant::now().duration_since(start);
320        tracing::debug!("barrier flushed in {:?}", elapsed);
321
322        let version_id = self.hummock_manager.get_version_id().await;
323        Ok(version_id)
324    }
325}
326
327pub struct ScheduledBarriers {
329    inner: Arc<Inner>,
330}
331
/// Per-database barrier settings and checkpoint bookkeeping.
#[derive(Debug)]
pub struct DatabaseBarrierState {
    /// Database-specific barrier interval; `None` means the system interval is used.
    pub barrier_interval: Option<Duration>,
    /// Database-specific checkpoint frequency; `None` means the system frequency is used.
    pub checkpoint_frequency: Option<u64>,
    /// When set, the next barrier of this database is a checkpoint.
    pub force_checkpoint: bool,
    /// Number of non-checkpoint barriers since the last checkpoint barrier.
    pub num_uncheckpointed_barrier: u64,
}

impl DatabaseBarrierState {
    /// Creates state with the given overrides, no forced checkpoint and a zeroed counter.
    fn new(barrier_interval_ms: Option<u32>, checkpoint_frequency: Option<u64>) -> Self {
        let barrier_interval = barrier_interval_ms
            .map(u64::from)
            .map(Duration::from_millis);
        Self {
            barrier_interval,
            checkpoint_frequency,
            force_checkpoint: false,
            num_uncheckpointed_barrier: 0,
        }
    }
}
353
354#[derive(Default, Debug)]
356pub struct PeriodicBarriers {
357    sys_barrier_interval: Duration,
359    sys_checkpoint_frequency: u64,
360    databases: HashMap<DatabaseId, DatabaseBarrierState>,
362    timer_streams: StreamMap<DatabaseId, IntervalStream>,
365}
366
367impl PeriodicBarriers {
368    pub(super) fn new(
369        sys_barrier_interval: Duration,
370        sys_checkpoint_frequency: u64,
371        database_infos: Vec<Database>,
372    ) -> Self {
373        let mut databases = HashMap::with_capacity(database_infos.len());
374        let mut timer_streams = StreamMap::with_capacity(database_infos.len());
375        database_infos.into_iter().for_each(|database| {
376            let database_id: DatabaseId = database.id.into();
377            let barrier_interval_ms = database.barrier_interval_ms;
378            let checkpoint_frequency = database.checkpoint_frequency;
379            databases.insert(
380                database_id,
381                DatabaseBarrierState::new(barrier_interval_ms, checkpoint_frequency),
382            );
383            let duration = if let Some(ms) = barrier_interval_ms {
384                Duration::from_millis(ms as u64)
385            } else {
386                sys_barrier_interval
387            };
388
389            let interval_stream = Self::new_interval_stream(duration, &database_id);
391            timer_streams.insert(database_id, interval_stream);
392        });
393        Self {
394            sys_barrier_interval,
395            sys_checkpoint_frequency,
396            databases,
397            timer_streams,
398        }
399    }
400
401    fn new_interval_stream(duration: Duration, database_id: &DatabaseId) -> IntervalStream {
403        GLOBAL_META_METRICS
404            .barrier_interval_by_database
405            .with_label_values(&[&database_id.to_string()])
406            .set(duration.as_millis_f64());
407        let mut interval = tokio::time::interval(duration);
408        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
409        IntervalStream::new(interval)
410    }
411
412    pub(super) fn set_sys_barrier_interval(&mut self, duration: Duration) {
414        if self.sys_barrier_interval == duration {
415            return;
416        }
417        self.sys_barrier_interval = duration;
418        for (db_id, db_state) in &mut self.databases {
420            if db_state.barrier_interval.is_none() {
421                let interval_stream = Self::new_interval_stream(duration, db_id);
422                self.timer_streams.insert(*db_id, interval_stream);
423            }
424        }
425    }
426
427    pub fn set_sys_checkpoint_frequency(&mut self, frequency: u64) {
429        if self.sys_checkpoint_frequency == frequency {
430            return;
431        }
432        self.sys_checkpoint_frequency = frequency;
433        for db_state in self.databases.values_mut() {
435            if db_state.checkpoint_frequency.is_none() {
436                db_state.num_uncheckpointed_barrier = 0;
437                db_state.force_checkpoint = false;
438            }
439        }
440    }
441
442    pub(super) fn update_database_barrier(
443        &mut self,
444        database_id: DatabaseId,
445        barrier_interval_ms: Option<u32>,
446        checkpoint_frequency: Option<u64>,
447    ) {
448        match self.databases.entry(database_id) {
449            Entry::Occupied(mut entry) => {
450                let db_state = entry.get_mut();
451                db_state.barrier_interval =
452                    barrier_interval_ms.map(|ms| Duration::from_millis(ms as u64));
453                db_state.checkpoint_frequency = checkpoint_frequency;
454                db_state.num_uncheckpointed_barrier = 0;
456                db_state.force_checkpoint = false;
457            }
458            Entry::Vacant(entry) => {
459                entry.insert(DatabaseBarrierState::new(
460                    barrier_interval_ms,
461                    checkpoint_frequency,
462                ));
463            }
464        }
465
466        let duration = if let Some(ms) = barrier_interval_ms {
468            Duration::from_millis(ms as u64)
469        } else {
470            self.sys_barrier_interval
471        };
472
473        let interval_stream = Self::new_interval_stream(duration, &database_id);
474        self.timer_streams.insert(database_id, interval_stream);
475    }
476
477    pub fn force_checkpoint_in_next_barrier(&mut self, database_id: DatabaseId) {
479        if let Some(db_state) = self.databases.get_mut(&database_id) {
480            db_state.force_checkpoint = true;
481        } else {
482            warn!(
483                ?database_id,
484                "force checkpoint in next barrier for non-existing database"
485            );
486        }
487    }
488
489    #[await_tree::instrument]
490    pub(super) async fn next_barrier(
491        &mut self,
492        context: &impl GlobalBarrierWorkerContext,
493    ) -> NewBarrier {
494        let new_barrier = select! {
495            biased;
496            scheduled = context.next_scheduled() => {
497                let database_id = scheduled.database_id;
498                assert!(self.databases.contains_key(&database_id), "database {} not found in periodic barriers", database_id);
500                assert!(self.timer_streams.contains_key(&database_id), "timer stream for database {} not found in periodic barriers", database_id);
501                for (db_id, timer_stream) in self.timer_streams.iter_mut() {
503                    if *db_id == database_id {
504                        timer_stream.as_mut().reset();
505                    }
506                }
507                let checkpoint = scheduled.command.need_checkpoint() || self.try_get_checkpoint(database_id);
508                NewBarrier {
509                    database_id: scheduled.database_id,
510                    command: Some((scheduled.command, scheduled.notifiers)),
511                    span: scheduled.span,
512                    checkpoint,
513                }
514            },
515            next_timer = pending_on_none(self.timer_streams.next()) => {
518                let (database_id, _instant) = next_timer;
519                let checkpoint = self.try_get_checkpoint(database_id);
520                NewBarrier {
521                    database_id,
522                    command: None,
523                    span: tracing_span(),
524                    checkpoint,
525                }
526            }
527        };
528        self.update_num_uncheckpointed_barrier(new_barrier.database_id, new_barrier.checkpoint);
529
530        new_barrier
531    }
532
533    fn try_get_checkpoint(&self, database_id: DatabaseId) -> bool {
535        let db_state = self.databases.get(&database_id).unwrap();
536        let checkpoint_frequency = db_state
537            .checkpoint_frequency
538            .unwrap_or(self.sys_checkpoint_frequency);
539        db_state.num_uncheckpointed_barrier + 1 >= checkpoint_frequency || db_state.force_checkpoint
540    }
541
542    fn update_num_uncheckpointed_barrier(&mut self, database_id: DatabaseId, checkpoint: bool) {
544        let db_state = self.databases.get_mut(&database_id).unwrap();
545        if checkpoint {
546            db_state.num_uncheckpointed_barrier = 0;
547            db_state.force_checkpoint = false;
548        } else {
549            db_state.num_uncheckpointed_barrier += 1;
550        }
551    }
552}
553
554impl ScheduledBarriers {
555    pub(super) async fn next_scheduled(&self) -> Scheduled {
556        'outer: loop {
557            let mut rx = self.inner.changed_tx.subscribe();
558            {
559                let mut queue = self.inner.queue.lock();
560                if queue.status.is_blocked() {
561                    continue;
562                }
563                for (database_id, queue) in &mut queue.queue {
564                    if queue.status.is_blocked() {
565                        continue;
566                    }
567                    if let Some(item) = queue.queue.inner.pop_front() {
568                        item.send_latency_timer.observe_duration();
569                        break 'outer Scheduled {
570                            database_id: *database_id,
571                            command: item.command,
572                            notifiers: item.notifiers,
573                            span: item.span,
574                        };
575                    }
576                }
577            }
578            rx.changed().await.unwrap();
579        }
580    }
581}
582
583pub(super) enum MarkReadyOptions {
584    Database(DatabaseId),
585    Global {
586        blocked_databases: HashSet<DatabaseId>,
587    },
588}
589
impl ScheduledBarriers {
    /// Drains buffered drop commands; thin wrapper over `pre_apply_drop_cancel_scheduled`.
    ///
    /// Returns the state table ids unregistered by the drained `DropStreamingJobs` commands.
    pub(super) fn pre_apply_drop_cancel(&self, database_id: Option<DatabaseId>) -> Vec<TableId> {
        self.pre_apply_drop_cancel_scheduled(database_id)
    }

    /// Marks one database (`Some`) or the whole cluster (`None`) as blocked.
    ///
    /// Every command still queued for the affected database(s) is failed. While blocked,
    /// `validate_item` rejects all commands except `DropStreamingJobs` / `DropSubscription`.
    /// Blocking an already-blocked queue panics in debug builds and only warns otherwise.
    pub(super) fn abort_and_mark_blocked(
        &self,
        database_id: Option<DatabaseId>,
        reason: impl Into<String>,
    ) {
        let mut queue = self.inner.queue.lock();
        // Builds the reason stored in a blocked database queue and reported to its callers.
        fn database_blocked_reason(database_id: DatabaseId, reason: &String) -> String {
            format!("database {} unavailable {}", database_id, reason)
        }
        // Blocks `queue` and fails the notifiers of every item still pending in it.
        fn mark_blocked_and_notify_failed(
            database_id: DatabaseId,
            queue: &mut DatabaseScheduledQueue,
            reason: &String,
        ) {
            let reason = database_blocked_reason(database_id, reason);
            let err: MetaError = anyhow!("{}", reason).into();
            queue.mark_blocked(reason);
            while let Some(ScheduledQueueItem { notifiers, .. }) = queue.queue.inner.pop_front() {
                notifiers
                    .into_iter()
                    .for_each(|notify| notify.notify_collection_failed(err.clone()))
            }
        }
        if let Some(database_id) = database_id {
            let reason = reason.into();
            match queue.queue.entry(database_id) {
                Entry::Occupied(entry) => {
                    let queue = entry.into_mut();
                    if queue.status.is_blocked() {
                        if cfg!(debug_assertions) {
                            panic!("database {} marked as blocked twice", database_id);
                        } else {
                            warn!(?database_id, "database marked as blocked twice");
                        }
                    }
                    info!(?database_id, "database marked as blocked");
                    mark_blocked_and_notify_failed(database_id, queue, &reason);
                }
                Entry::Vacant(entry) => {
                    // No queue yet, so there is nothing to fail: create it already blocked.
                    // NOTE(review): unlike the occupied branch, this path logs nothing.
                    entry.insert(DatabaseScheduledQueue::new(
                        database_id,
                        &self.inner.metrics,
                        QueueStatus::Blocked(database_blocked_reason(database_id, &reason)),
                    ));
                }
            }
        } else {
            let reason = reason.into();
            if queue.status.is_blocked() {
                if cfg!(debug_assertions) {
                    panic!("cluster marked as blocked twice");
                } else {
                    warn!("cluster marked as blocked twice");
                }
            }
            info!("cluster marked as blocked");
            queue.mark_blocked(reason.clone());
            // Also block every known database queue and fail its pending commands.
            for (database_id, queue) in &mut queue.queue {
                mark_blocked_and_notify_failed(*database_id, queue, &reason);
            }
        }
    }

    /// Unblocks a single database or the whole cluster.
    ///
    /// The consumer is woken if commands are waiting. For `Global`, every database not listed
    /// in `blocked_databases` is marked ready. Listed databases keep their blocked status, or
    /// are created blocked. Marking a ready queue ready again panics in debug builds and only
    /// warns otherwise.
    pub(super) fn mark_ready(&self, options: MarkReadyOptions) {
        let mut queue = self.inner.queue.lock();
        // Reborrow through the guard once, so that disjoint fields (`queue.queue` and
        // `queue.status`) can be borrowed at the same time below.
        let queue = &mut *queue;
        match options {
            MarkReadyOptions::Database(database_id) => {
                info!(?database_id, "database marked as ready");
                // NOTE(review): an entry created here starts Ready, so the "ready twice" check
                // below fires for a database that was never blocked.
                let database_queue = queue.queue.entry(database_id).or_insert_with(|| {
                    DatabaseScheduledQueue::new(
                        database_id,
                        &self.inner.metrics,
                        QueueStatus::Ready,
                    )
                });
                if !database_queue.status.is_blocked() {
                    if cfg!(debug_assertions) {
                        panic!("database {} marked as ready twice", database_id);
                    } else {
                        warn!(?database_id, "database marked as ready twice");
                    }
                }
                // Wake the consumer only if this unblocked a database that has queued work
                // while the cluster itself is not blocked.
                if database_queue.mark_ready()
                    && !queue.status.is_blocked()
                    && !database_queue.queue.inner.is_empty()
                {
                    self.inner.changed_tx.send(()).ok();
                }
            }
            MarkReadyOptions::Global { blocked_databases } => {
                if !queue.status.is_blocked() {
                    if cfg!(debug_assertions) {
                        panic!("cluster marked as ready twice");
                    } else {
                        warn!("cluster marked as ready twice");
                    }
                }
                info!(?blocked_databases, "cluster marked as ready");
                let prev_blocked = queue.mark_ready();
                // Databases that failed global recovery get a blocked queue if they had none.
                for database_id in &blocked_databases {
                    queue.queue.entry(*database_id).or_insert_with(|| {
                        DatabaseScheduledQueue::new(
                            *database_id,
                            &self.inner.metrics,
                            QueueStatus::Blocked(format!(
                                "database {} failed to recover in global recovery",
                                database_id
                            )),
                        )
                    });
                }
                for (database_id, queue) in &mut queue.queue {
                    if !blocked_databases.contains(database_id) {
                        queue.mark_ready();
                    }
                }
                // The check includes still-blocked queues, so the wake-up may be spurious;
                // the consumer then simply waits again.
                if prev_blocked
                    && queue
                        .queue
                        .values()
                        .any(|database_queue| !database_queue.queue.inner.is_empty())
                {
                    self.inner.changed_tx.send(()).ok();
                }
            }
        }
    }

    /// Drains buffered commands and reports each one as collected.
    ///
    /// With `Some(database_id)`, only that blocked database's queue is drained (the cluster
    /// must be ready). With `None`, every database queue is drained (the cluster must be
    /// blocked).
    ///
    /// Returns the `unregistered_state_table_ids` of the drained `DropStreamingJobs` commands.
    ///
    /// # Panics
    /// - If the cluster or database status does not match the expectations above.
    /// - If a drained command is neither `DropStreamingJobs` nor `DropSubscription`.
    pub(super) fn pre_apply_drop_cancel_scheduled(
        &self,
        database_id: Option<DatabaseId>,
    ) -> Vec<TableId> {
        let mut queue = self.inner.queue.lock();
        let mut dropped_tables = vec![];

        let mut pre_apply_drop_cancel = |queue: &mut DatabaseScheduledQueue| {
            while let Some(ScheduledQueueItem {
                notifiers, command, ..
            }) = queue.queue.inner.pop_front()
            {
                match command {
                    Command::DropStreamingJobs {
                        unregistered_state_table_ids,
                        ..
                    } => {
                        dropped_tables.extend(unregistered_state_table_ids);
                    }
                    Command::DropSubscription { .. } => {}
                    _ => {
                        unreachable!("only drop and cancel streaming jobs should be buffered");
                    }
                }
                notifiers.into_iter().for_each(|notify| {
                    notify.notify_collected();
                });
            }
        };

        if let Some(database_id) = database_id {
            assert_matches!(queue.status, QueueStatus::Ready);
            if let Some(queue) = queue.queue.get_mut(&database_id) {
                assert_matches!(queue.status, QueueStatus::Blocked(_));
                pre_apply_drop_cancel(queue);
            }
        } else {
            assert_matches!(queue.status, QueueStatus::Blocked(_));
            for queue in queue.queue.values_mut() {
                pre_apply_drop_cancel(queue);
            }
        }

        dropped_tables
    }
}
776
777#[cfg(test)]
778mod tests {
779    use super::*;
780
781    fn create_test_database(
782        id: u32,
783        barrier_interval_ms: Option<u32>,
784        checkpoint_frequency: Option<u64>,
785    ) -> Database {
786        Database {
787            id,
788            name: format!("test_db_{}", id),
789            barrier_interval_ms,
790            checkpoint_frequency,
791            ..Default::default()
792        }
793    }
794
795    struct MockGlobalBarrierWorkerContext {
797        scheduled_rx: tokio::sync::Mutex<tokio::sync::mpsc::UnboundedReceiver<Scheduled>>,
798    }
799
800    impl MockGlobalBarrierWorkerContext {
801        fn new() -> (Self, tokio::sync::mpsc::UnboundedSender<Scheduled>) {
802            let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
803            (
804                Self {
805                    scheduled_rx: tokio::sync::Mutex::new(rx),
806                },
807                tx,
808            )
809        }
810    }
811
    // Only `next_scheduled` is functional. Every other method is an `unimplemented!()` stub,
    // so a test that reaches one of them panics.
    impl GlobalBarrierWorkerContext for MockGlobalBarrierWorkerContext {
        /// Receives the next `Scheduled` sent by the test; panics once all senders are dropped.
        async fn next_scheduled(&self) -> Scheduled {
            self.scheduled_rx.lock().await.recv().await.unwrap()
        }

        async fn commit_epoch(
            &self,
            _commit_info: crate::hummock::CommitEpochInfo,
        ) -> MetaResult<risingwave_pb::hummock::HummockVersionStats> {
            unimplemented!()
        }

        fn abort_and_mark_blocked(
            &self,
            _database_id: Option<DatabaseId>,
            _recovery_reason: crate::barrier::RecoveryReason,
        ) {
            unimplemented!()
        }

        fn mark_ready(&self, _options: MarkReadyOptions) {
            unimplemented!()
        }

        async fn post_collect_command<'a>(
            &'a self,
            _command: &'a crate::barrier::command::CommandContext,
        ) -> MetaResult<()> {
            unimplemented!()
        }

        async fn notify_creating_job_failed(&self, _database_id: Option<DatabaseId>, _err: String) {
            unimplemented!()
        }

        async fn finish_creating_job(
            &self,
            _job: crate::barrier::progress::TrackingJob,
        ) -> MetaResult<()> {
            unimplemented!()
        }

        async fn new_control_stream(
            &self,
            _node: &risingwave_pb::common::WorkerNode,
            _init_request: &risingwave_pb::stream_service::streaming_control_stream_request::PbInitRequest,
        ) -> MetaResult<risingwave_rpc_client::StreamingControlHandle> {
            unimplemented!()
        }

        async fn reload_runtime_info(
            &self,
        ) -> MetaResult<crate::barrier::BarrierWorkerRuntimeInfoSnapshot> {
            unimplemented!()
        }

        async fn reload_database_runtime_info(
            &self,
            _database_id: DatabaseId,
        ) -> MetaResult<Option<crate::barrier::DatabaseRuntimeInfoSnapshot>> {
            unimplemented!()
        }

        async fn handle_list_finished_source_ids(
            &self,
            _list_finished_source_ids: Vec<u32>,
        ) -> MetaResult<()> {
            unimplemented!()
        }

        async fn handle_load_finished_source_ids(
            &self,
            _load_finished_source_ids: Vec<u32>,
        ) -> MetaResult<()> {
            unimplemented!()
        }

        async fn finish_cdc_table_backfill(&self, _job_id: TableId) -> MetaResult<()> {
            unimplemented!()
        }

        async fn handle_refresh_finished_table_ids(
            &self,
            _refresh_finished_table_ids: Vec<u32>,
        ) -> MetaResult<()> {
            unimplemented!()
        }
    }
900
901    #[tokio::test]
902    async fn test_next_barrier_with_different_intervals() {
903        let databases = vec![
905            create_test_database(1, Some(50), Some(2)), create_test_database(2, Some(100), Some(3)), create_test_database(3, None, Some(5)), ];
909
910        let mut periodic = PeriodicBarriers::new(
911            Duration::from_millis(200), 10,                         databases,
914        );
915
916        let (context, _tx) = MockGlobalBarrierWorkerContext::new();
917
918        for _ in 0..3 {
920            let barrier = periodic.next_barrier(&context).await;
921            assert!(barrier.command.is_none()); assert!(!barrier.checkpoint); }
924
925        let start_time = Instant::now();
928        let barrier = periodic.next_barrier(&context).await;
929        let _elapsed = start_time.elapsed();
930
931        assert_eq!(barrier.database_id, DatabaseId::from(1));
933        assert!(barrier.command.is_none()); assert!(barrier.checkpoint); let db1_id = DatabaseId::from(1);
944        let db1_state = periodic.databases.get_mut(&db1_id).unwrap();
945        assert_eq!(db1_state.num_uncheckpointed_barrier, 0); }
947
948    #[tokio::test]
949    async fn test_next_barrier_with_scheduled_command() {
950        let databases = vec![
951            create_test_database(1, Some(1000), Some(2)), ];
953
954        let mut periodic = PeriodicBarriers::new(Duration::from_millis(1000), 10, databases);
955
956        let (context, tx) = MockGlobalBarrierWorkerContext::new();
957
958        periodic.next_barrier(&context).await;
960
961        let scheduled_command = Scheduled {
963            database_id: DatabaseId::from(1),
964            command: Command::Flush,
965            notifiers: vec![],
966            span: tracing::Span::none(),
967        };
968
969        let tx_clone = tx.clone();
971        tokio::spawn(async move {
972            tokio::time::sleep(Duration::from_millis(10)).await;
973            tx_clone.send(scheduled_command).unwrap();
974        });
975
976        let barrier = periodic.next_barrier(&context).await;
977
978        assert!(barrier.command.is_some());
980        assert_eq!(barrier.database_id, DatabaseId::from(1));
981
982        if let Some((command, _)) = barrier.command {
983            assert!(matches!(command, Command::Flush));
984        }
985    }
986
987    #[tokio::test]
988    async fn test_next_barrier_multiple_databases_timing() {
989        let databases = vec![
990            create_test_database(1, Some(30), Some(10)), create_test_database(2, Some(100), Some(10)), ];
993
994        let mut periodic = PeriodicBarriers::new(Duration::from_millis(500), 10, databases);
995
996        let (context, _tx) = MockGlobalBarrierWorkerContext::new();
997
998        for _ in 0..2 {
1000            periodic.next_barrier(&context).await;
1001        }
1002
1003        let mut barrier_counts = HashMap::new();
1004
1005        let mut barriers = Vec::new();
1007        for _ in 0..5 {
1008            let barrier = periodic.next_barrier(&context).await;
1009            barriers.push(barrier);
1010        }
1011
1012        for barrier in barriers {
1014            *barrier_counts.entry(barrier.database_id).or_insert(0) += 1;
1015        }
1016
1017        let db1_count = barrier_counts.get(&DatabaseId::from(1)).unwrap_or(&0);
1019        let db2_count = barrier_counts.get(&DatabaseId::from(2)).unwrap_or(&0);
1020
1021        assert!(*db1_count >= *db2_count);
1023    }
1024
1025    #[tokio::test]
1026    async fn test_next_barrier_force_checkpoint() {
1027        let databases = vec![create_test_database(1, Some(100), Some(10))];
1028
1029        let mut periodic = PeriodicBarriers::new(Duration::from_millis(100), 10, databases);
1030
1031        let (context, _tx) = MockGlobalBarrierWorkerContext::new();
1032
1033        periodic.force_checkpoint_in_next_barrier(DatabaseId::from(1));
1035
1036        let barrier = periodic.next_barrier(&context).await;
1037
1038        assert!(barrier.checkpoint);
1040        assert_eq!(barrier.database_id, DatabaseId::from(1));
1041        assert!(barrier.command.is_none());
1042    }
1043
1044    #[tokio::test]
1045    async fn test_next_barrier_checkpoint_frequency() {
1046        let databases = vec![create_test_database(1, Some(50), Some(2))]; let mut periodic = PeriodicBarriers::new(Duration::from_millis(50), 10, databases);
1049
1050        let (context, _tx) = MockGlobalBarrierWorkerContext::new();
1051
1052        let barrier1 = periodic.next_barrier(&context).await;
1054        assert!(!barrier1.checkpoint);
1055
1056        let barrier2 = periodic.next_barrier(&context).await;
1058        assert!(barrier2.checkpoint);
1059
1060        let barrier3 = periodic.next_barrier(&context).await;
1062        assert!(!barrier3.checkpoint);
1063    }
1064
1065    #[tokio::test]
1066    async fn test_update_database_barrier() {
1067        let databases = vec![create_test_database(1, Some(1000), Some(10))];
1068
1069        let mut periodic = PeriodicBarriers::new(Duration::from_millis(500), 20, databases);
1070
1071        periodic.update_database_barrier(DatabaseId::from(1), Some(2000), Some(15));
1073
1074        let db_state = periodic.databases.get(&DatabaseId::from(1)).unwrap();
1075        assert_eq!(db_state.barrier_interval, Some(Duration::from_millis(2000)));
1076        assert_eq!(db_state.checkpoint_frequency, Some(15));
1077        assert_eq!(db_state.num_uncheckpointed_barrier, 0);
1078        assert!(!db_state.force_checkpoint);
1079
1080        periodic.update_database_barrier(DatabaseId::from(2), None, None);
1082
1083        assert!(periodic.databases.contains_key(&DatabaseId::from(2)));
1084        let db2_state = periodic.databases.get(&DatabaseId::from(2)).unwrap();
1085        assert_eq!(db2_state.barrier_interval, None);
1086        assert_eq!(db2_state.checkpoint_frequency, None);
1087    }
1088}