risingwave_stream/executor/
now.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use itertools::Itertools;
16use risingwave_common::array::Op;
17use risingwave_common::row;
18use risingwave_common::types::{DefaultOrdered, Interval, Timestamptz, ToDatumRef};
19use risingwave_expr::capture_context;
20use risingwave_expr::expr::{
21    EvalErrorReport, ExpressionBoxExt, InputRefExpression, LiteralExpression, NonStrictExpression,
22    build_func_non_strict,
23};
24use risingwave_expr::expr_context::TIME_ZONE;
25use tokio::sync::mpsc::UnboundedReceiver;
26use tokio_stream::wrappers::UnboundedReceiverStream;
27
28use crate::executor::prelude::*;
29use crate::task::ActorEvalErrorReport;
30
31pub struct NowExecutor<S: StateStore> {
32    data_types: Vec<DataType>,
33
34    mode: NowMode,
35    eval_error_report: ActorEvalErrorReport,
36
37    /// Receiver of barrier channel.
38    barrier_receiver: UnboundedReceiver<Barrier>,
39
40    state_table: StateTable<S>,
41
42    progress_ratio: Option<f32>,
43
44    barrier_interval_ms: u32,
45}
46
47pub enum NowMode {
48    /// Emit current timestamp on startup, update it on barrier.
49    UpdateCurrent,
50    /// Generate a series of timestamps starting from `start_timestamp` with `interval`.
51    /// Keep generating new timestamps on barrier.
52    GenerateSeries {
53        start_timestamp: Timestamptz,
54        interval: Interval,
55    },
56}
57
58enum ModeVars {
59    UpdateCurrent,
60    GenerateSeries {
61        chunk_builder: StreamChunkBuilder,
62        add_interval_expr: NonStrictExpression,
63    },
64}
65
66impl<S: StateStore> NowExecutor<S> {
67    pub fn new(
68        data_types: Vec<DataType>,
69        mode: NowMode,
70        eval_error_report: ActorEvalErrorReport,
71        barrier_receiver: UnboundedReceiver<Barrier>,
72        state_table: StateTable<S>,
73        progress_ratio: Option<f32>,
74        barrier_interval_ms: u32,
75    ) -> Self {
76        Self {
77            data_types,
78            mode,
79            eval_error_report,
80            barrier_receiver,
81            state_table,
82            progress_ratio,
83            barrier_interval_ms,
84        }
85    }
86
87    #[try_stream(ok = Message, error = StreamExecutorError)]
88    async fn execute_inner(self) {
89        let Self {
90            data_types,
91            mode,
92            eval_error_report,
93            barrier_receiver,
94            mut state_table,
95            progress_ratio,
96            barrier_interval_ms,
97        } = self;
98
99        info!(
100            "NowExecutor started. progress_ratio: {:?}, barrier_interval_ms: {:?}",
101            progress_ratio, barrier_interval_ms
102        );
103
104        let max_chunk_size = crate::config::chunk_size();
105
106        // Whether the executor is paused.
107        let mut paused = false;
108        // The last timestamp **sent** to the downstream.
109        let mut last_timestamp_datum: Datum = None;
110
111        // Whether the first barrier is handled and `last_timestamp` is initialized.
112        let mut initialized = false;
113
114        let mut mode_vars = match &mode {
115            NowMode::UpdateCurrent => ModeVars::UpdateCurrent,
116            NowMode::GenerateSeries { interval, .. } => {
117                // in most cases there won't be more than one row except for the first time
118                let chunk_builder = StreamChunkBuilder::unlimited(data_types.clone(), Some(1));
119                let add_interval_expr =
120                    build_add_interval_expr_captured(*interval, eval_error_report)?;
121                ModeVars::GenerateSeries {
122                    chunk_builder,
123                    add_interval_expr,
124                }
125            }
126        };
127
128        const MAX_MERGE_BARRIER_SIZE: usize = 64;
129
130        #[for_await]
131        for barriers in
132            UnboundedReceiverStream::new(barrier_receiver).ready_chunks(MAX_MERGE_BARRIER_SIZE)
133        {
134            let mut curr_timestamp_datum: Datum = None;
135            if barriers.len() > 1 {
136                warn!(
137                    "handle multiple barriers at once in now executor: {}",
138                    barriers.len()
139                );
140            }
141            for barrier in barriers {
142                let curr_epoch = barrier.get_curr_epoch();
143                let new_timestamp = curr_epoch.as_timestamptz();
144                let pause_mutation =
145                    barrier
146                        .mutation
147                        .as_deref()
148                        .and_then(|mutation| match mutation {
149                            Mutation::Pause => Some(true),
150                            Mutation::Resume => Some(false),
151                            _ => None,
152                        });
153
154                if !initialized {
155                    let first_epoch = barrier.epoch;
156                    let is_pause_on_startup = barrier.is_pause_on_startup();
157                    yield Message::Barrier(barrier);
158                    // Handle the initial barrier.
159                    state_table.init_epoch(first_epoch).await?;
160                    last_timestamp_datum = state_table.get_from_one_value_table().await?;
161                    paused = is_pause_on_startup;
162                    initialized = true;
163                } else {
164                    state_table
165                        .commit_assert_no_update_vnode_bitmap(barrier.epoch)
166                        .await?;
167                    yield Message::Barrier(barrier);
168                }
169
170                // Extract timestamp from the current epoch.
171                if let Some(datum) = &last_timestamp_datum
172                    && let Some(progress_ratio) = progress_ratio
173                    && progress_ratio > 1.0
174                {
175                    let last_timestamp = datum.as_timestamptz();
176                    // curr_timestamp = min(last_timestamp + barrier_interval * progress_ratio, timestamp from epoch)
177                    // to avoid having a big gap between the last timestamp and the current timestamp,
178                    // which may cause excessive changes in downstream dynamic filter
179                    let progress_timestamp = last_timestamp
180                        .timestamp_millis()
181                        .checked_add((barrier_interval_ms as f32 * progress_ratio).ceil() as i64)
182                        .expect("progress_timestamp is out of i64 range");
183                    let adjusted_timestamp = if progress_timestamp
184                        < new_timestamp.timestamp_millis()
185                    {
186                        debug!(
187                            "adjusted next now timestamp from {} to {}. curr_epoch: {}, barrier_interval_ms: {}, progress_ratio: {}",
188                            new_timestamp.timestamp_millis(),
189                            progress_timestamp,
190                            curr_epoch,
191                            barrier_interval_ms,
192                            progress_ratio
193                        );
194                        Timestamptz::from_millis(progress_timestamp)
195                            .expect("progress_timestamp is out of timestamptz range")
196                    } else {
197                        new_timestamp
198                    };
199                    curr_timestamp_datum = Some(adjusted_timestamp.into());
200                } else {
201                    curr_timestamp_datum = Some(new_timestamp.into());
202                }
203
204                // Update paused state.
205                if let Some(pause_mutation) = pause_mutation {
206                    paused = pause_mutation;
207                }
208            }
209
210            // Do not yield any messages if paused.
211            if paused {
212                continue;
213            }
214
215            match (&mode, &mut mode_vars) {
216                (NowMode::UpdateCurrent, ModeVars::UpdateCurrent) => {
217                    let chunk = if last_timestamp_datum.is_some() {
218                        let last_row = row::once(&last_timestamp_datum);
219                        let row = row::once(&curr_timestamp_datum);
220                        state_table.update(last_row, row);
221
222                        StreamChunk::from_rows(
223                            &[(Op::Delete, last_row), (Op::Insert, row)],
224                            &data_types,
225                        )
226                    } else {
227                        let row = row::once(&curr_timestamp_datum);
228                        state_table.insert(row);
229
230                        StreamChunk::from_rows(&[(Op::Insert, row)], &data_types)
231                    };
232
233                    yield Message::Chunk(chunk);
234                    last_timestamp_datum.clone_from(&curr_timestamp_datum)
235                }
236                (
237                    &NowMode::GenerateSeries {
238                        start_timestamp, ..
239                    },
240                    &mut ModeVars::GenerateSeries {
241                        ref mut chunk_builder,
242                        ref add_interval_expr,
243                    },
244                ) => {
245                    if last_timestamp_datum.is_none() {
246                        // We haven't emit any timestamp yet. Let's emit the first one and populate the state table.
247                        let first = Some(start_timestamp.into());
248                        let first_row = row::once(&first);
249                        let _ = chunk_builder.append_row(Op::Insert, first_row);
250                        state_table.insert(first_row);
251                        last_timestamp_datum = first;
252                    }
253
254                    // Now let's step through the timestamps from the last timestamp to the current timestamp.
255                    // We use `last_row` as a temporary cursor to track the progress, and won't touch `last_timestamp`
256                    // until the end of the loop, so that `last_timestamp` is always synced with the state table.
257                    let mut last_row = OwnedRow::new(vec![last_timestamp_datum.clone()]);
258
259                    loop {
260                        if chunk_builder.size() >= max_chunk_size {
261                            // Manually yield the chunk when size exceeds the limit. We don't want to use chunk builder
262                            // with limited size here because the initial capacity can be too large for most cases.
263                            // Basically only the first several chunks can potentially exceed the `max_chunk_size`.
264                            if let Some(chunk) = chunk_builder.take() {
265                                yield Message::Chunk(chunk);
266                            }
267                        }
268
269                        let next = add_interval_expr.eval_row_infallible(&last_row).await;
270                        if DefaultOrdered(next.to_datum_ref())
271                            > DefaultOrdered(curr_timestamp_datum.to_datum_ref())
272                        {
273                            // We only increase the timestamp to the current timestamp.
274                            break;
275                        }
276
277                        let next_row = OwnedRow::new(vec![next]);
278                        let _ = chunk_builder.append_row(Op::Insert, &next_row);
279                        last_row = next_row;
280                    }
281
282                    if let Some(chunk) = chunk_builder.take() {
283                        yield Message::Chunk(chunk);
284                    }
285
286                    // Update the last timestamp.
287                    state_table.update(row::once(&last_timestamp_datum), &last_row);
288                    last_timestamp_datum = last_row
289                        .into_inner()
290                        .into_vec()
291                        .into_iter()
292                        .exactly_one()
293                        .unwrap();
294                }
295                _ => unreachable!(),
296            }
297
298            yield Message::Watermark(Watermark::new(
299                0,
300                DataType::Timestamptz,
301                curr_timestamp_datum.unwrap(),
302            ));
303        }
304    }
305}
306
307impl<S: StateStore> Execute for NowExecutor<S> {
308    fn execute(self: Box<Self>) -> BoxedMessageStream {
309        self.execute_inner().boxed()
310    }
311}
312
313#[capture_context(TIME_ZONE)]
314pub fn build_add_interval_expr(
315    time_zone: &str,
316    interval: Interval,
317    eval_error_report: impl EvalErrorReport + 'static,
318) -> risingwave_expr::Result<NonStrictExpression> {
319    let timestamptz_input = InputRefExpression::new(DataType::Timestamptz, 0);
320    let interval = LiteralExpression::new(DataType::Interval, Some(interval.into()));
321    let time_zone = LiteralExpression::new(DataType::Varchar, Some(time_zone.into()));
322
323    use risingwave_pb::expr::expr_node::PbType as PbExprType;
324    build_func_non_strict(
325        PbExprType::AddWithTimeZone,
326        DataType::Timestamptz,
327        vec![
328            timestamptz_input.boxed(),
329            interval.boxed(),
330            time_zone.boxed(),
331        ],
332        eval_error_report,
333    )
334}
335
336#[cfg(test)]
337mod tests {
338    use risingwave_common::catalog::{ColumnDesc, ColumnId, TableId};
339    use risingwave_common::test_prelude::StreamChunkTestExt;
340    use risingwave_common::types::test_utils::IntervalTestExt;
341    use risingwave_common::util::epoch::test_epoch;
342    use risingwave_storage::memory::MemoryStateStore;
343    use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};
344
345    use super::*;
346    use crate::common::table::test_utils::gen_pbtable;
347    use crate::executor::test_utils::StreamExecutorTestExt;
348
349    #[tokio::test]
350    async fn test_now() -> StreamExecutorResult<()> {
351        let state_store = create_state_store();
352        let (tx, mut now) = create_executor(NowMode::UpdateCurrent, &state_store).await;
353
354        // Init barrier
355        tx.send(Barrier::new_test_barrier(test_epoch(1))).unwrap();
356
357        // Consume the barrier
358        now.next_unwrap_ready_barrier()?;
359
360        // Consume the data chunk
361        let chunk_msg = now.next_unwrap_ready_chunk()?;
362
363        assert_eq!(
364            chunk_msg.compact(),
365            StreamChunk::from_pretty(
366                " TZ
367                + 2021-04-01T00:00:00.001Z"
368            )
369        );
370
371        // Consume the watermark
372        let watermark = now.next_unwrap_ready_watermark()?;
373
374        assert_eq!(
375            watermark,
376            Watermark::new(
377                0,
378                DataType::Timestamptz,
379                ScalarImpl::Timestamptz("2021-04-01T00:00:00.001Z".parse().unwrap())
380            )
381        );
382
383        tx.send(Barrier::with_prev_epoch_for_test(
384            test_epoch(2),
385            test_epoch(1),
386        ))
387        .unwrap();
388
389        // Consume the barrier
390        now.next_unwrap_ready_barrier()?;
391
392        // Consume the data chunk
393        let chunk_msg = now.next_unwrap_ready_chunk()?;
394
395        assert_eq!(
396            chunk_msg.compact(),
397            StreamChunk::from_pretty(
398                " TZ
399                - 2021-04-01T00:00:00.001Z
400                + 2021-04-01T00:00:00.002Z"
401            )
402        );
403
404        // Consume the watermark
405        let watermark = now.next_unwrap_ready_watermark()?;
406
407        assert_eq!(
408            watermark,
409            Watermark::new(
410                0,
411                DataType::Timestamptz,
412                ScalarImpl::Timestamptz("2021-04-01T00:00:00.002Z".parse().unwrap())
413            )
414        );
415
416        // No more messages until the next barrier
417        now.next_unwrap_pending();
418
419        // Recovery
420        drop((tx, now));
421        let (tx, mut now) = create_executor(NowMode::UpdateCurrent, &state_store).await;
422        tx.send(Barrier::with_prev_epoch_for_test(
423            test_epoch(3),
424            test_epoch(1),
425        ))
426        .unwrap();
427
428        // Consume the barrier
429        now.next_unwrap_ready_barrier()?;
430
431        // Consume the data chunk
432        let chunk_msg = now.next_unwrap_ready_chunk()?;
433        assert_eq!(
434            chunk_msg.compact(),
435            // the last chunk was not checkpointed so the deleted old value should be `001`
436            StreamChunk::from_pretty(
437                " TZ
438                - 2021-04-01T00:00:00.001Z
439                + 2021-04-01T00:00:00.003Z"
440            )
441        );
442
443        // Consume the watermark
444        let watermark = now.next_unwrap_ready_watermark()?;
445
446        assert_eq!(
447            watermark,
448            Watermark::new(
449                0,
450                DataType::Timestamptz,
451                ScalarImpl::Timestamptz("2021-04-01T00:00:00.003Z".parse().unwrap())
452            )
453        );
454
455        // Recovery with paused
456        drop((tx, now));
457        let (tx, mut now) = create_executor(NowMode::UpdateCurrent, &state_store).await;
458        tx.send(
459            Barrier::with_prev_epoch_for_test(test_epoch(4), test_epoch(1))
460                .with_mutation(Mutation::Pause),
461        )
462        .unwrap();
463
464        // Consume the barrier
465        now.next_unwrap_ready_barrier()?;
466
467        // There should be no messages until `Resume`
468        now.next_unwrap_pending();
469
470        // Resume barrier
471        tx.send(
472            Barrier::with_prev_epoch_for_test(test_epoch(5), test_epoch(4))
473                .with_mutation(Mutation::Resume),
474        )
475        .unwrap();
476
477        // Consume the barrier
478        now.next_unwrap_ready_barrier()?;
479
480        // Consume the data chunk
481        let chunk_msg = now.next_unwrap_ready_chunk()?;
482        assert_eq!(
483            chunk_msg.compact(),
484            StreamChunk::from_pretty(
485                " TZ
486                - 2021-04-01T00:00:00.001Z
487                + 2021-04-01T00:00:00.005Z"
488            )
489        );
490
491        // Consume the watermark
492        let watermark = now.next_unwrap_ready_watermark()?;
493
494        assert_eq!(
495            watermark,
496            Watermark::new(
497                0,
498                DataType::Timestamptz,
499                ScalarImpl::Timestamptz("2021-04-01T00:00:00.005Z".parse().unwrap())
500            )
501        );
502
503        Ok(())
504    }
505
506    #[tokio::test]
507    async fn test_now_start_with_paused() -> StreamExecutorResult<()> {
508        let state_store = create_state_store();
509        let (tx, mut now) = create_executor(NowMode::UpdateCurrent, &state_store).await;
510
511        // Init barrier
512        tx.send(Barrier::new_test_barrier(test_epoch(1)).with_mutation(Mutation::Pause))
513            .unwrap();
514
515        // Consume the barrier
516        now.next_unwrap_ready_barrier()?;
517
518        // There should be no messages until `Resume`
519        now.next_unwrap_pending();
520
521        // Resume barrier
522        tx.send(
523            Barrier::with_prev_epoch_for_test(test_epoch(2), test_epoch(1))
524                .with_mutation(Mutation::Resume),
525        )
526        .unwrap();
527
528        // Consume the barrier
529        now.next_unwrap_ready_barrier()?;
530
531        // Consume the data chunk
532        let chunk_msg = now.next_unwrap_ready_chunk()?;
533
534        assert_eq!(
535            chunk_msg.compact(),
536            StreamChunk::from_pretty(
537                " TZ
538                + 2021-04-01T00:00:00.002Z" // <- the timestamp is extracted from the current epoch
539            )
540        );
541
542        // Consume the watermark
543        let watermark = now.next_unwrap_ready_watermark()?;
544
545        assert_eq!(
546            watermark,
547            Watermark::new(
548                0,
549                DataType::Timestamptz,
550                ScalarImpl::Timestamptz("2021-04-01T00:00:00.002Z".parse().unwrap())
551            )
552        );
553
554        // No more messages until the next barrier
555        now.next_unwrap_pending();
556
557        Ok(())
558    }
559
560    #[tokio::test]
561    async fn test_now_generate_series() -> StreamExecutorResult<()> {
562        TIME_ZONE::scope("UTC".to_owned(), test_now_generate_series_inner()).await
563    }
564
565    #[tokio::test]
566    async fn test_now_with_progress_ratio() -> StreamExecutorResult<()> {
567        let state_store = create_state_store();
568        let progress_ratio = Some(2.0);
569        let (tx, mut now) = create_executor_with_progress_ratio(
570            NowMode::UpdateCurrent,
571            &state_store,
572            progress_ratio,
573        )
574        .await;
575
576        // Init barrier at epoch 1 (timestamp 2021-04-01T00:00:00.001Z)
577        tx.send(Barrier::new_test_barrier(test_epoch(1))).unwrap();
578
579        // Consume the barrier
580        now.next_unwrap_ready_barrier()?;
581
582        // Consume the data chunk
583        let chunk_msg = now.next_unwrap_ready_chunk()?;
584
585        assert_eq!(
586            chunk_msg.compact(),
587            StreamChunk::from_pretty(
588                " TZ
589                + 2021-04-01T00:00:00.001Z"
590            )
591        );
592
593        // Consume the watermark
594        let watermark = now.next_unwrap_ready_watermark()?;
595
596        assert_eq!(
597            watermark,
598            Watermark::new(
599                0,
600                DataType::Timestamptz,
601                ScalarImpl::Timestamptz("2021-04-01T00:00:00.001Z".parse().unwrap())
602            )
603        );
604
605        // Send next barrier at epoch 5000 (timestamp 2021-04-01T00:00:00.005Z)
606        // With progress_ratio = 2.0 and barrier_interval_ms = 1000,
607        // adjusted timestamp should be: 1 + (1000 * 2.0) = 2001ms = 2021-04-01T00:00:02.001Z
608        // Since 2001 < 5000, the adjusted timestamp should be used
609        tx.send(Barrier::with_prev_epoch_for_test(
610            test_epoch(5000),
611            test_epoch(1),
612        ))
613        .unwrap();
614
615        // Consume the barrier
616        now.next_unwrap_ready_barrier()?;
617
618        // Consume the data chunk - should show adjusted timestamp
619        let chunk_msg = now.next_unwrap_ready_chunk()?;
620
621        assert_eq!(
622            chunk_msg.compact(),
623            StreamChunk::from_pretty(
624                " TZ
625                - 2021-04-01T00:00:00.001Z
626                + 2021-04-01T00:00:02.001Z" // adjusted timestamp
627            )
628        );
629
630        // Consume the watermark
631        let watermark = now.next_unwrap_ready_watermark()?;
632
633        assert_eq!(
634            watermark,
635            Watermark::new(
636                0,
637                DataType::Timestamptz,
638                ScalarImpl::Timestamptz("2021-04-01T00:00:02.001Z".parse().unwrap())
639            )
640        );
641
642        // Send another barrier at epoch 10000 (timestamp 2021-04-01T00:00:00.010Z)
643        // With progress_ratio = 2.0, adjusted timestamp should be: 2001 + (1000 * 2.0) = 4001ms
644        // Since 4001 < 10000, the adjusted timestamp should be used again
645        tx.send(Barrier::with_prev_epoch_for_test(
646            test_epoch(10000),
647            test_epoch(5000),
648        ))
649        .unwrap();
650
651        // Consume the barrier
652        now.next_unwrap_ready_barrier()?;
653
654        // Consume the data chunk
655        let chunk_msg = now.next_unwrap_ready_chunk()?;
656
657        assert_eq!(
658            chunk_msg.compact(),
659            StreamChunk::from_pretty(
660                " TZ
661                - 2021-04-01T00:00:02.001Z
662                + 2021-04-01T00:00:04.001Z" // adjusted timestamp
663            )
664        );
665
666        // Consume the watermark
667        let watermark = now.next_unwrap_ready_watermark()?;
668
669        assert_eq!(
670            watermark,
671            Watermark::new(
672                0,
673                DataType::Timestamptz,
674                ScalarImpl::Timestamptz("2021-04-01T00:00:04.001Z".parse().unwrap())
675            )
676        );
677
678        // Send another barrier at epoch 15 (timestamp 2021-04-01T00:00:00.015Z)
679        // With progress_ratio = 2.0, adjusted timestamp should be: 4001 + (1000 * 2.0) = 6001ms
680        // Since 6001 < 15, the adjusted timestamp should be used
681        tx.send(Barrier::with_prev_epoch_for_test(
682            test_epoch(15000),
683            test_epoch(10000),
684        ))
685        .unwrap();
686
687        // Consume the barrier
688        now.next_unwrap_ready_barrier()?;
689
690        // Consume the data chunk
691        let chunk_msg = now.next_unwrap_ready_chunk()?;
692
693        assert_eq!(
694            chunk_msg.compact(),
695            StreamChunk::from_pretty(
696                " TZ
697                - 2021-04-01T00:00:04.001Z
698                + 2021-04-01T00:00:06.001Z" // adjusted timestamp
699            )
700        );
701
702        // Consume the watermark
703        let watermark = now.next_unwrap_ready_watermark()?;
704
705        assert_eq!(
706            watermark,
707            Watermark::new(
708                0,
709                DataType::Timestamptz,
710                ScalarImpl::Timestamptz("2021-04-01T00:00:06.001Z".parse().unwrap())
711            )
712        );
713
714        // Now send a barrier at epoch 20 (timestamp 2021-04-01T00:00:00.020Z)
715        // With progress_ratio = 2.0, adjusted timestamp should be: 6001 + (1000 * 2.0) = 8001ms
716        // Since 8001 < 20, the adjusted timestamp should be used
717        tx.send(Barrier::with_prev_epoch_for_test(
718            test_epoch(20000),
719            test_epoch(15000),
720        ))
721        .unwrap();
722
723        // Consume the barrier
724        now.next_unwrap_ready_barrier()?;
725
726        // Consume the data chunk
727        let chunk_msg = now.next_unwrap_ready_chunk()?;
728
729        assert_eq!(
730            chunk_msg.compact(),
731            StreamChunk::from_pretty(
732                " TZ
733                - 2021-04-01T00:00:06.001Z
734                + 2021-04-01T00:00:08.001Z" // adjusted timestamp
735            )
736        );
737
738        // Consume the watermark
739        let watermark = now.next_unwrap_ready_watermark()?;
740
741        assert_eq!(
742            watermark,
743            Watermark::new(
744                0,
745                DataType::Timestamptz,
746                ScalarImpl::Timestamptz("2021-04-01T00:00:08.001Z".parse().unwrap())
747            )
748        );
749
750        // Test case where epoch timestamp is smaller than adjusted timestamp
751        // Send barrier at epoch 25 (timestamp 2021-04-01T00:00:00.025Z)
752        // Adjusted timestamp would be: 8001 + (1000 * 2.0) = 10001ms = 2021-04-01T00:00:10.001Z
753        // Since 10001 < 25, use adjusted timestamp
754        tx.send(Barrier::with_prev_epoch_for_test(
755            test_epoch(25000),
756            test_epoch(20000),
757        ))
758        .unwrap();
759
760        // Consume the barrier
761        now.next_unwrap_ready_barrier()?;
762
763        // Consume the data chunk
764        let chunk_msg = now.next_unwrap_ready_chunk()?;
765
766        assert_eq!(
767            chunk_msg.compact(),
768            StreamChunk::from_pretty(
769                " TZ
770                - 2021-04-01T00:00:08.001Z
771                + 2021-04-01T00:00:10.001Z" // adjusted timestamp
772            )
773        );
774
775        // Consume the watermark
776        let watermark = now.next_unwrap_ready_watermark()?;
777
778        assert_eq!(
779            watermark,
780            Watermark::new(
781                0,
782                DataType::Timestamptz,
783                ScalarImpl::Timestamptz("2021-04-01T00:00:10.001Z".parse().unwrap())
784            )
785        );
786
787        // Finally test when epoch timestamp is larger than adjusted timestamp
788        // Send barrier at epoch 30 (timestamp 2021-04-01T00:00:00.030Z)
789        // Adjusted timestamp would be: 10001 + (1000 * 2.0) = 12001ms = 2021-04-01T00:00:12.001Z
790        // Since 12001 < 30, use adjusted timestamp
791        tx.send(Barrier::with_prev_epoch_for_test(
792            test_epoch(30000),
793            test_epoch(25000),
794        ))
795        .unwrap();
796
797        // Consume the barrier
798        now.next_unwrap_ready_barrier()?;
799
800        // Consume the data chunk
801        let chunk_msg = now.next_unwrap_ready_chunk()?;
802
803        assert_eq!(
804            chunk_msg.compact(),
805            StreamChunk::from_pretty(
806                " TZ
807                - 2021-04-01T00:00:10.001Z
808                + 2021-04-01T00:00:12.001Z" // adjusted timestamp
809            )
810        );
811
812        // Consume the watermark
813        let watermark = now.next_unwrap_ready_watermark()?;
814
815        assert_eq!(
816            watermark,
817            Watermark::new(
818                0,
819                DataType::Timestamptz,
820                ScalarImpl::Timestamptz("2021-04-01T00:00:12.001Z".parse().unwrap())
821            )
822        );
823
824        Ok(())
825    }
826
827    async fn test_now_generate_series_inner() -> StreamExecutorResult<()> {
828        let start_timestamp = Timestamptz::from_secs(1617235190).unwrap(); // 2021-03-31 23:59:50 UTC
829        let interval = Interval::from_millis(1000); // 1s interval
830
831        let state_store = create_state_store();
832        let (tx, mut now) = create_executor(
833            NowMode::GenerateSeries {
834                start_timestamp,
835                interval,
836            },
837            &state_store,
838        )
839        .await;
840
841        // Init barrier
842        tx.send(Barrier::new_test_barrier(test_epoch(1000)))
843            .unwrap();
844        now.next_unwrap_ready_barrier()?;
845
846        // Initial timestamps
847        let chunk = now.next_unwrap_ready_chunk()?;
848        assert_eq!(chunk.cardinality(), 12); // seconds from 23:59:50 to 00:00:01 (inclusive)
849
850        assert_eq!(
851            now.next_unwrap_ready_watermark()?,
852            Watermark::new(
853                0,
854                DataType::Timestamptz,
855                ScalarImpl::Timestamptz("2021-04-01T00:00:01.000Z".parse().unwrap())
856            )
857        );
858
859        tx.send(Barrier::with_prev_epoch_for_test(
860            test_epoch(2000),
861            test_epoch(1000),
862        ))
863        .unwrap();
864        tx.send(Barrier::with_prev_epoch_for_test(
865            test_epoch(3000),
866            test_epoch(2000),
867        ))
868        .unwrap();
869
870        now.next_unwrap_ready_barrier()?;
871        now.next_unwrap_ready_barrier()?;
872
873        let chunk = now.next_unwrap_ready_chunk()?;
874        assert_eq!(
875            chunk.compact(),
876            StreamChunk::from_pretty(
877                " TZ
878                + 2021-04-01T00:00:02.000Z
879                + 2021-04-01T00:00:03.000Z"
880            )
881        );
882
883        let watermark = now.next_unwrap_ready_watermark()?;
884        assert_eq!(
885            watermark,
886            Watermark::new(
887                0,
888                DataType::Timestamptz,
889                ScalarImpl::Timestamptz("2021-04-01T00:00:03.000Z".parse().unwrap())
890            )
891        );
892
893        // Recovery
894        drop((tx, now));
895        let (tx, mut now) = create_executor(
896            NowMode::GenerateSeries {
897                start_timestamp,
898                interval,
899            },
900            &state_store,
901        )
902        .await;
903
904        tx.send(Barrier::with_prev_epoch_for_test(
905            test_epoch(4000),
906            test_epoch(2000),
907        ))
908        .unwrap();
909
910        now.next_unwrap_ready_barrier()?;
911
912        let chunk = now.next_unwrap_ready_chunk()?;
913        assert_eq!(
914            chunk.compact(),
915            StreamChunk::from_pretty(
916                " TZ
917                + 2021-04-01T00:00:02.000Z
918                + 2021-04-01T00:00:03.000Z
919                + 2021-04-01T00:00:04.000Z"
920            )
921        );
922
923        let watermark = now.next_unwrap_ready_watermark()?;
924        assert_eq!(
925            watermark,
926            Watermark::new(
927                0,
928                DataType::Timestamptz,
929                ScalarImpl::Timestamptz("2021-04-01T00:00:04.000Z".parse().unwrap())
930            )
931        );
932
933        Ok(())
934    }
935
936    fn create_state_store() -> MemoryStateStore {
937        MemoryStateStore::new()
938    }
939
940    async fn create_executor_with_progress_ratio(
941        mode: NowMode,
942        state_store: &MemoryStateStore,
943        progress_ratio: Option<f32>,
944    ) -> (UnboundedSender<Barrier>, BoxedMessageStream) {
945        let table_id = TableId::new(1);
946        let column_descs = vec![ColumnDesc::unnamed(ColumnId::new(0), DataType::Timestamptz)];
947        let state_table = StateTable::from_table_catalog(
948            &gen_pbtable(table_id, column_descs, vec![], vec![], 0),
949            state_store.clone(),
950            None,
951        )
952        .await;
953
954        let (sender, barrier_receiver) = unbounded_channel();
955
956        let eval_error_report = ActorEvalErrorReport {
957            actor_context: ActorContext::for_test(123),
958            identity: "NowExecutor".into(),
959        };
960        let barrier_interval_ms = 1000;
961        let now_executor = NowExecutor::new(
962            vec![DataType::Timestamptz],
963            mode,
964            eval_error_report,
965            barrier_receiver,
966            state_table,
967            progress_ratio,
968            barrier_interval_ms,
969        );
970        (sender, now_executor.boxed().execute())
971    }
972
973    async fn create_executor(
974        mode: NowMode,
975        state_store: &MemoryStateStore,
976    ) -> (UnboundedSender<Barrier>, BoxedMessageStream) {
977        create_executor_with_progress_ratio(mode, state_store, None).await
978    }
979}