Skip to main content

risingwave_stream/executor/
now.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use itertools::Itertools;
16use risingwave_common::array::Op;
17use risingwave_common::row;
18use risingwave_common::types::{DefaultOrdered, Interval, Timestamptz, ToDatumRef};
19use risingwave_expr::capture_context;
20use risingwave_expr::expr::{
21    EvalErrorReport, ExpressionBoxExt, InputRefExpression, LiteralExpression, NonStrictExpression,
22    build_func_non_strict,
23};
24use risingwave_expr::expr_context::TIME_ZONE;
25use tokio::sync::mpsc::UnboundedReceiver;
26use tokio_stream::wrappers::UnboundedReceiverStream;
27
28use crate::executor::prelude::*;
29use crate::task::ActorEvalErrorReport;
30
/// Stream executor that emits timestamps derived from the epochs of the barriers it receives.
///
/// Barriers arrive on `barrier_receiver` and are forwarded downstream. The timestamp that was
/// last sent downstream is kept in `state_table` and read back when the first barrier is handled.
pub struct NowExecutor<S: StateStore> {
    /// Data types of the output columns, used when building output chunks.
    data_types: Vec<DataType>,

    /// How timestamps are produced; see [`NowMode`].
    mode: NowMode,
    /// Error reporter handed to the interval-addition expression that is built for
    /// [`NowMode::GenerateSeries`].
    eval_error_report: ActorEvalErrorReport,

    /// Receiver of barrier channel.
    barrier_receiver: UnboundedReceiver<Barrier>,

    /// State table holding the last timestamp sent downstream.
    state_table: StateTable<S>,

    /// When `Some` and greater than `1.0`, and a previous timestamp exists, the new timestamp is
    /// capped at `barrier_interval_ms * progress_ratio` milliseconds past the previous one.
    /// Otherwise the timestamp of the barrier's epoch is used as is.
    progress_ratio: Option<f32>,

    /// Number of milliseconds that is scaled by `progress_ratio`.
    /// NOTE(review): presumably the configured barrier interval — confirm against the caller.
    barrier_interval_ms: u32,
}
46
/// Strategy used by [`NowExecutor`] to turn barrier epochs into output rows.
pub enum NowMode {
    /// Emit current timestamp on startup, update it on barrier.
    UpdateCurrent,
    /// Generate a series of timestamps starting from `start_timestamp` with `interval`.
    /// Keep generating new timestamps on barrier.
    GenerateSeries {
        /// First timestamp of the series; emitted when no timestamp has been persisted yet.
        start_timestamp: Timestamptz,
        /// Step between two consecutive timestamps of the series.
        interval: Interval,
    },
}
57
/// Working state that accompanies a [`NowMode`] while the executor runs. It is built once at the
/// start of `execute_inner`, with the variant matching the mode.
enum ModeVars {
    /// `NowMode::UpdateCurrent` needs no extra state.
    UpdateCurrent,
    /// State used by `NowMode::GenerateSeries`.
    GenerateSeries {
        /// Accumulates the generated rows before they are yielded as chunks.
        chunk_builder: StreamChunkBuilder,
        /// Expression evaluated on the previous timestamp row to obtain the next timestamp of the
        /// series.
        add_interval_expr: NonStrictExpression,
    },
}
65
66impl<S: StateStore> NowExecutor<S> {
67    pub fn new(
68        data_types: Vec<DataType>,
69        mode: NowMode,
70        eval_error_report: ActorEvalErrorReport,
71        barrier_receiver: UnboundedReceiver<Barrier>,
72        state_table: StateTable<S>,
73        progress_ratio: Option<f32>,
74        barrier_interval_ms: u32,
75    ) -> Self {
76        Self {
77            data_types,
78            mode,
79            eval_error_report,
80            barrier_receiver,
81            state_table,
82            progress_ratio,
83            barrier_interval_ms,
84        }
85    }
86
    /// Runs the executor as a stream of messages.
    ///
    /// Barriers are read from `barrier_receiver` in batches of at most `MAX_MERGE_BARRIER_SIZE`
    /// and every barrier is forwarded downstream. After each batch, unless the executor is
    /// paused, the timestamp computed for the last barrier of the batch is turned into a data
    /// chunk (whose shape depends on the mode) followed by a watermark on column 0.
    ///
    /// The first barrier initializes the state table and loads the last persisted timestamp;
    /// every later barrier commits the state table before being forwarded.
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute_inner(self) {
        let Self {
            data_types,
            mode,
            eval_error_report,
            barrier_receiver,
            mut state_table,
            progress_ratio,
            barrier_interval_ms,
        } = self;

        info!(
            "NowExecutor started. progress_ratio: {:?}, barrier_interval_ms: {:?}",
            progress_ratio, barrier_interval_ms
        );

        let max_chunk_size = crate::config::chunk_size();

        // Whether the executor is paused.
        let mut paused = false;
        // The last timestamp **sent** to the downstream.
        let mut last_timestamp_datum: Datum = None;

        // Whether the first barrier is handled and `last_timestamp_datum` is initialized.
        let mut initialized = false;

        // Per-mode working state. Only `GenerateSeries` needs any: a chunk builder and the
        // expression that steps a timestamp forward by `interval`.
        let mut mode_vars = match &mode {
            NowMode::UpdateCurrent => ModeVars::UpdateCurrent,
            NowMode::GenerateSeries { interval, .. } => {
                // in most cases there won't be more than one row except for the first time
                let chunk_builder = StreamChunkBuilder::unlimited(data_types.clone(), Some(1));
                let add_interval_expr =
                    build_add_interval_expr_captured(*interval, eval_error_report)?;
                ModeVars::GenerateSeries {
                    chunk_builder,
                    add_interval_expr,
                }
            }
        };

        // Upper bound on the number of barriers handled in one iteration of the loop below.
        const MAX_MERGE_BARRIER_SIZE: usize = 64;

        #[for_await]
        for barriers in
            UnboundedReceiverStream::new(barrier_receiver).ready_chunks(MAX_MERGE_BARRIER_SIZE)
        {
            // Overwritten for every barrier of the batch, so after the inner loop it holds the
            // timestamp computed for the last barrier.
            let mut curr_timestamp_datum: Datum = None;
            if barriers.len() > 1 {
                warn!(
                    "handle multiple barriers at once in now executor: {}",
                    barriers.len()
                );
            }
            for barrier in barriers {
                let curr_epoch = barrier.get_curr_epoch();
                let new_timestamp = curr_epoch.as_timestamptz();
                // `Some(true)` / `Some(false)` if this barrier pauses / resumes the executor.
                // Extracted up front because the barrier is moved when it is yielded below.
                let pause_mutation =
                    barrier
                        .mutation
                        .as_deref()
                        .and_then(|mutation| match mutation {
                            Mutation::Pause => Some(true),
                            Mutation::Resume => Some(false),
                            _ => None,
                        });

                if !initialized {
                    let first_epoch = barrier.epoch;
                    let is_pause_on_startup = barrier.is_pause_on_startup();
                    yield Message::Barrier(barrier);
                    // Handle the initial barrier.
                    state_table.init_epoch(first_epoch).await?;
                    last_timestamp_datum = state_table.get_from_one_value_table().await?;
                    paused = is_pause_on_startup;
                    initialized = true;
                } else {
                    // Commit the state table for this barrier before forwarding it.
                    state_table
                        .commit_assert_no_update_vnode_bitmap(barrier.epoch)
                        .await?;
                    yield Message::Barrier(barrier);
                }

                // Extract timestamp from the current epoch. The progress cap only applies when a
                // previous timestamp exists and `progress_ratio` is greater than 1.0.
                if let Some(datum) = &last_timestamp_datum
                    && let Some(progress_ratio) = progress_ratio
                    && progress_ratio > 1.0
                {
                    let last_timestamp = datum.as_timestamptz();
                    // curr_timestamp = min(last_timestamp + barrier_interval * progress_ratio, timestamp from epoch)
                    // to avoid having a big gap between the last timestamp and the current timestamp,
                    // which may cause excessive changes in downstream dynamic filter
                    let progress_timestamp = last_timestamp
                        .timestamp_millis()
                        .checked_add((barrier_interval_ms as f32 * progress_ratio).ceil() as i64)
                        .expect("progress_timestamp is out of i64 range");
                    let adjusted_timestamp = if progress_timestamp
                        < new_timestamp.timestamp_millis()
                    {
                        debug!(
                            "adjusted next now timestamp from {} to {}. curr_epoch: {}, barrier_interval_ms: {}, progress_ratio: {}",
                            new_timestamp.timestamp_millis(),
                            progress_timestamp,
                            curr_epoch,
                            barrier_interval_ms,
                            progress_ratio
                        );
                        Timestamptz::from_millis(progress_timestamp)
                            .expect("progress_timestamp is out of timestamptz range")
                    } else {
                        new_timestamp
                    };
                    curr_timestamp_datum = Some(adjusted_timestamp.into());
                } else {
                    curr_timestamp_datum = Some(new_timestamp.into());
                }

                // Update paused state.
                if let Some(pause_mutation) = pause_mutation {
                    paused = pause_mutation;
                }
            }

            // Do not yield any messages if paused.
            if paused {
                continue;
            }

            match (&mode, &mut mode_vars) {
                (NowMode::UpdateCurrent, ModeVars::UpdateCurrent) => {
                    // Replace the previous timestamp with the current one, or insert the current
                    // one if nothing has been sent yet. The state table gets the same change.
                    let chunk = if last_timestamp_datum.is_some() {
                        let last_row = row::once(&last_timestamp_datum);
                        let row = row::once(&curr_timestamp_datum);
                        state_table.update(last_row, row);

                        StreamChunk::from_rows(
                            &[(Op::Delete, last_row), (Op::Insert, row)],
                            &data_types,
                        )
                    } else {
                        let row = row::once(&curr_timestamp_datum);
                        state_table.insert(row);

                        StreamChunk::from_rows(&[(Op::Insert, row)], &data_types)
                    };

                    yield Message::Chunk(chunk);
                    last_timestamp_datum.clone_from(&curr_timestamp_datum)
                }
                (
                    &NowMode::GenerateSeries {
                        start_timestamp, ..
                    },
                    &mut ModeVars::GenerateSeries {
                        ref mut chunk_builder,
                        ref add_interval_expr,
                    },
                ) => {
                    if last_timestamp_datum.is_none() {
                        // We haven't emitted any timestamp yet. Let's emit the first one and populate the state table.
                        let first = Some(start_timestamp.into());
                        let first_row = row::once(&first);
                        let _ = chunk_builder.append_row(Op::Insert, first_row);
                        state_table.insert(first_row);
                        last_timestamp_datum = first;
                    }

                    // Now let's step through the timestamps from the last timestamp to the current timestamp.
                    // We use `last_row` as a temporary cursor to track the progress, and won't touch
                    // `last_timestamp_datum` until the end of the loop, so that it is always synced
                    // with the state table.
                    let mut last_row = OwnedRow::new(vec![last_timestamp_datum.clone()]);

                    loop {
                        if chunk_builder.size() >= max_chunk_size {
                            // Manually yield the chunk when size exceeds the limit. We don't want to use chunk builder
                            // with limited size here because the initial capacity can be too large for most cases.
                            // Basically only the first several chunks can potentially exceed the `max_chunk_size`.
                            if let Some(chunk) = chunk_builder.take() {
                                yield Message::Chunk(chunk);
                            }
                        }

                        let next = add_interval_expr.eval_row_infallible(&last_row).await;
                        if DefaultOrdered(next.to_datum_ref())
                            > DefaultOrdered(curr_timestamp_datum.to_datum_ref())
                        {
                            // We only increase the timestamp to the current timestamp.
                            break;
                        }

                        let next_row = OwnedRow::new(vec![next]);
                        let _ = chunk_builder.append_row(Op::Insert, &next_row);
                        last_row = next_row;
                    }

                    // Flush whatever is left in the builder.
                    if let Some(chunk) = chunk_builder.take() {
                        yield Message::Chunk(chunk);
                    }

                    // Update the last timestamp.
                    state_table.update(row::once(&last_timestamp_datum), &last_row);
                    // `last_row` was built from a one-element vector, so exactly one datum comes out.
                    last_timestamp_datum =
                        Itertools::exactly_one(last_row.into_inner().into_vec().into_iter())
                            .unwrap();
                }
                // `mode_vars` was built from `mode` above, so the variants always match.
                _ => unreachable!(),
            }

            // `curr_timestamp_datum` was assigned for every barrier of the batch, so it is `Some`
            // here (`ready_chunks` is expected to never produce an empty batch).
            yield Message::Watermark(Watermark::new(
                0,
                DataType::Timestamptz,
                curr_timestamp_datum.unwrap(),
            ));
        }
    }
302}
303
304impl<S: StateStore> Execute for NowExecutor<S> {
305    fn execute(self: Box<Self>) -> BoxedMessageStream {
306        self.execute_inner().boxed()
307    }
308}
309
310#[capture_context(TIME_ZONE)]
311pub fn build_add_interval_expr(
312    time_zone: &str,
313    interval: Interval,
314    eval_error_report: impl EvalErrorReport + 'static,
315) -> risingwave_expr::Result<NonStrictExpression> {
316    let timestamptz_input = InputRefExpression::new(DataType::Timestamptz, 0);
317    let interval = LiteralExpression::new(DataType::Interval, Some(interval.into()));
318    let time_zone = LiteralExpression::new(DataType::Varchar, Some(time_zone.into()));
319
320    use risingwave_pb::expr::expr_node::PbType as PbExprType;
321    build_func_non_strict(
322        PbExprType::AddWithTimeZone,
323        DataType::Timestamptz,
324        vec![
325            timestamptz_input.boxed(),
326            interval.boxed(),
327            time_zone.boxed(),
328        ],
329        eval_error_report,
330    )
331}
332
333#[cfg(test)]
334mod tests {
335    use risingwave_common::catalog::{ColumnDesc, ColumnId, TableId};
336    use risingwave_common::test_prelude::StreamChunkTestExt;
337    use risingwave_common::types::test_utils::IntervalTestExt;
338    use risingwave_common::util::epoch::test_epoch;
339    use risingwave_storage::memory::MemoryStateStore;
340    use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};
341
342    use super::*;
343    use crate::common::table::test_utils::gen_pbtable;
344    use crate::executor::test_utils::StreamExecutorTestExt;
345
    /// Exercises `NowMode::UpdateCurrent` end to end: the initial insert, an update on the next
    /// barrier, recovery from the shared state store, and recovery with a `Pause` mutation on the
    /// first barrier followed by a `Resume`.
    #[tokio::test]
    async fn test_now() -> StreamExecutorResult<()> {
        let state_store = create_state_store();
        let (tx, mut now) = create_executor(NowMode::UpdateCurrent, &state_store).await;

        // Init barrier
        tx.send(Barrier::new_test_barrier(test_epoch(1))).unwrap();

        // Consume the barrier
        now.next_unwrap_ready_barrier()?;

        // Consume the data chunk: nothing was sent before, so it is a plain insert
        let chunk_msg = now.next_unwrap_ready_chunk()?;

        assert_eq!(
            chunk_msg.compact_vis(),
            StreamChunk::from_pretty(
                " TZ
                + 2021-04-01T00:00:00.001Z"
            )
        );

        // Consume the watermark
        let watermark = now.next_unwrap_ready_watermark()?;

        assert_eq!(
            watermark,
            Watermark::new(
                0,
                DataType::Timestamptz,
                ScalarImpl::Timestamptz("2021-04-01T00:00:00.001Z".parse().unwrap())
            )
        );

        tx.send(Barrier::with_prev_epoch_for_test(
            test_epoch(2),
            test_epoch(1),
        ))
        .unwrap();

        // Consume the barrier
        now.next_unwrap_ready_barrier()?;

        // Consume the data chunk: the previous timestamp is replaced by the new one
        let chunk_msg = now.next_unwrap_ready_chunk()?;

        assert_eq!(
            chunk_msg.compact_vis(),
            StreamChunk::from_pretty(
                " TZ
                - 2021-04-01T00:00:00.001Z
                + 2021-04-01T00:00:00.002Z"
            )
        );

        // Consume the watermark
        let watermark = now.next_unwrap_ready_watermark()?;

        assert_eq!(
            watermark,
            Watermark::new(
                0,
                DataType::Timestamptz,
                ScalarImpl::Timestamptz("2021-04-01T00:00:00.002Z".parse().unwrap())
            )
        );

        // No more messages until the next barrier
        now.next_unwrap_pending();

        // Recovery: a fresh executor on top of the same state store
        drop((tx, now));
        let (tx, mut now) = create_executor(NowMode::UpdateCurrent, &state_store).await;
        tx.send(Barrier::with_prev_epoch_for_test(
            test_epoch(3),
            test_epoch(1),
        ))
        .unwrap();

        // Consume the barrier
        now.next_unwrap_ready_barrier()?;

        // Consume the data chunk
        let chunk_msg = now.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk_msg.compact_vis(),
            // the last chunk was not checkpointed so the deleted old value should be `001`
            StreamChunk::from_pretty(
                " TZ
                - 2021-04-01T00:00:00.001Z
                + 2021-04-01T00:00:00.003Z"
            )
        );

        // Consume the watermark
        let watermark = now.next_unwrap_ready_watermark()?;

        assert_eq!(
            watermark,
            Watermark::new(
                0,
                DataType::Timestamptz,
                ScalarImpl::Timestamptz("2021-04-01T00:00:00.003Z".parse().unwrap())
            )
        );

        // Recovery with paused
        drop((tx, now));
        let (tx, mut now) = create_executor(NowMode::UpdateCurrent, &state_store).await;
        tx.send(
            Barrier::with_prev_epoch_for_test(test_epoch(4), test_epoch(1))
                .with_mutation(Mutation::Pause),
        )
        .unwrap();

        // Consume the barrier
        now.next_unwrap_ready_barrier()?;

        // There should be no messages until `Resume`
        now.next_unwrap_pending();

        // Resume barrier
        tx.send(
            Barrier::with_prev_epoch_for_test(test_epoch(5), test_epoch(4))
                .with_mutation(Mutation::Resume),
        )
        .unwrap();

        // Consume the barrier
        now.next_unwrap_ready_barrier()?;

        // Consume the data chunk: the deleted value is again `001`, the value read back from the
        // state store, and the inserted value comes from the resume barrier's epoch
        let chunk_msg = now.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk_msg.compact_vis(),
            StreamChunk::from_pretty(
                " TZ
                - 2021-04-01T00:00:00.001Z
                + 2021-04-01T00:00:00.005Z"
            )
        );

        // Consume the watermark
        let watermark = now.next_unwrap_ready_watermark()?;

        assert_eq!(
            watermark,
            Watermark::new(
                0,
                DataType::Timestamptz,
                ScalarImpl::Timestamptz("2021-04-01T00:00:00.005Z".parse().unwrap())
            )
        );

        Ok(())
    }
502
    /// A `Pause` mutation on the very first barrier must suppress all output until a `Resume`
    /// barrier arrives; the first emitted timestamp then comes from the resume barrier's epoch.
    #[tokio::test]
    async fn test_now_start_with_paused() -> StreamExecutorResult<()> {
        let state_store = create_state_store();
        let (tx, mut now) = create_executor(NowMode::UpdateCurrent, &state_store).await;

        // Init barrier
        tx.send(Barrier::new_test_barrier(test_epoch(1)).with_mutation(Mutation::Pause))
            .unwrap();

        // Consume the barrier
        now.next_unwrap_ready_barrier()?;

        // There should be no messages until `Resume`
        now.next_unwrap_pending();

        // Resume barrier
        tx.send(
            Barrier::with_prev_epoch_for_test(test_epoch(2), test_epoch(1))
                .with_mutation(Mutation::Resume),
        )
        .unwrap();

        // Consume the barrier
        now.next_unwrap_ready_barrier()?;

        // Consume the data chunk
        let chunk_msg = now.next_unwrap_ready_chunk()?;

        assert_eq!(
            chunk_msg.compact_vis(),
            StreamChunk::from_pretty(
                " TZ
                + 2021-04-01T00:00:00.002Z" // <- the timestamp is extracted from the current epoch
            )
        );

        // Consume the watermark
        let watermark = now.next_unwrap_ready_watermark()?;

        assert_eq!(
            watermark,
            Watermark::new(
                0,
                DataType::Timestamptz,
                ScalarImpl::Timestamptz("2021-04-01T00:00:00.002Z".parse().unwrap())
            )
        );

        // No more messages until the next barrier
        now.next_unwrap_pending();

        Ok(())
    }
556
557    #[tokio::test]
558    async fn test_now_generate_series() -> StreamExecutorResult<()> {
559        TIME_ZONE::scope("UTC".to_owned(), test_now_generate_series_inner()).await
560    }
561
    /// Exercises the `progress_ratio` cap in `NowMode::UpdateCurrent` with `progress_ratio = 2.0`
    /// and `barrier_interval_ms = 1000` (the value set by `create_executor_with_progress_ratio`):
    /// each barrier may advance the emitted timestamp by at most 2000ms past the previous one.
    ///
    /// NOTE(review): every barrier sent after the first one has an epoch timestamp that is ahead
    /// of the capped value, so only the "adjusted" path is exercised. The path where a previous
    /// timestamp exists but the epoch timestamp is used unchanged is not covered by this test.
    #[tokio::test]
    async fn test_now_with_progress_ratio() -> StreamExecutorResult<()> {
        let state_store = create_state_store();
        let progress_ratio = Some(2.0);
        let (tx, mut now) = create_executor_with_progress_ratio(
            NowMode::UpdateCurrent,
            &state_store,
            progress_ratio,
        )
        .await;

        // Init barrier at epoch 1 (timestamp 2021-04-01T00:00:00.001Z)
        tx.send(Barrier::new_test_barrier(test_epoch(1))).unwrap();

        // Consume the barrier
        now.next_unwrap_ready_barrier()?;

        // Consume the data chunk - no previous timestamp yet, so the epoch timestamp is used as is
        let chunk_msg = now.next_unwrap_ready_chunk()?;

        assert_eq!(
            chunk_msg.compact_vis(),
            StreamChunk::from_pretty(
                " TZ
                + 2021-04-01T00:00:00.001Z"
            )
        );

        // Consume the watermark
        let watermark = now.next_unwrap_ready_watermark()?;

        assert_eq!(
            watermark,
            Watermark::new(
                0,
                DataType::Timestamptz,
                ScalarImpl::Timestamptz("2021-04-01T00:00:00.001Z".parse().unwrap())
            )
        );

        // Send next barrier at epoch 5000 (timestamp 2021-04-01T00:00:05.000Z)
        // With progress_ratio = 2.0 and barrier_interval_ms = 1000,
        // adjusted timestamp should be: 1 + (1000 * 2.0) = 2001ms = 2021-04-01T00:00:02.001Z
        // Since 2001 < 5000, the adjusted timestamp should be used
        tx.send(Barrier::with_prev_epoch_for_test(
            test_epoch(5000),
            test_epoch(1),
        ))
        .unwrap();

        // Consume the barrier
        now.next_unwrap_ready_barrier()?;

        // Consume the data chunk - should show adjusted timestamp
        let chunk_msg = now.next_unwrap_ready_chunk()?;

        assert_eq!(
            chunk_msg.compact_vis(),
            StreamChunk::from_pretty(
                " TZ
                - 2021-04-01T00:00:00.001Z
                + 2021-04-01T00:00:02.001Z" // adjusted timestamp
            )
        );

        // Consume the watermark
        let watermark = now.next_unwrap_ready_watermark()?;

        assert_eq!(
            watermark,
            Watermark::new(
                0,
                DataType::Timestamptz,
                ScalarImpl::Timestamptz("2021-04-01T00:00:02.001Z".parse().unwrap())
            )
        );

        // Send another barrier at epoch 10000 (timestamp 2021-04-01T00:00:10.000Z)
        // With progress_ratio = 2.0, adjusted timestamp should be: 2001 + (1000 * 2.0) = 4001ms
        // Since 4001 < 10000, the adjusted timestamp should be used again
        tx.send(Barrier::with_prev_epoch_for_test(
            test_epoch(10000),
            test_epoch(5000),
        ))
        .unwrap();

        // Consume the barrier
        now.next_unwrap_ready_barrier()?;

        // Consume the data chunk
        let chunk_msg = now.next_unwrap_ready_chunk()?;

        assert_eq!(
            chunk_msg.compact_vis(),
            StreamChunk::from_pretty(
                " TZ
                - 2021-04-01T00:00:02.001Z
                + 2021-04-01T00:00:04.001Z" // adjusted timestamp
            )
        );

        // Consume the watermark
        let watermark = now.next_unwrap_ready_watermark()?;

        assert_eq!(
            watermark,
            Watermark::new(
                0,
                DataType::Timestamptz,
                ScalarImpl::Timestamptz("2021-04-01T00:00:04.001Z".parse().unwrap())
            )
        );

        // Send another barrier at epoch 15000 (timestamp 2021-04-01T00:00:15.000Z)
        // With progress_ratio = 2.0, adjusted timestamp should be: 4001 + (1000 * 2.0) = 6001ms
        // Since 6001 < 15000, the adjusted timestamp should be used
        tx.send(Barrier::with_prev_epoch_for_test(
            test_epoch(15000),
            test_epoch(10000),
        ))
        .unwrap();

        // Consume the barrier
        now.next_unwrap_ready_barrier()?;

        // Consume the data chunk
        let chunk_msg = now.next_unwrap_ready_chunk()?;

        assert_eq!(
            chunk_msg.compact_vis(),
            StreamChunk::from_pretty(
                " TZ
                - 2021-04-01T00:00:04.001Z
                + 2021-04-01T00:00:06.001Z" // adjusted timestamp
            )
        );

        // Consume the watermark
        let watermark = now.next_unwrap_ready_watermark()?;

        assert_eq!(
            watermark,
            Watermark::new(
                0,
                DataType::Timestamptz,
                ScalarImpl::Timestamptz("2021-04-01T00:00:06.001Z".parse().unwrap())
            )
        );

        // Now send a barrier at epoch 20000 (timestamp 2021-04-01T00:00:20.000Z)
        // With progress_ratio = 2.0, adjusted timestamp should be: 6001 + (1000 * 2.0) = 8001ms
        // Since 8001 < 20000, the adjusted timestamp should be used
        tx.send(Barrier::with_prev_epoch_for_test(
            test_epoch(20000),
            test_epoch(15000),
        ))
        .unwrap();

        // Consume the barrier
        now.next_unwrap_ready_barrier()?;

        // Consume the data chunk
        let chunk_msg = now.next_unwrap_ready_chunk()?;

        assert_eq!(
            chunk_msg.compact_vis(),
            StreamChunk::from_pretty(
                " TZ
                - 2021-04-01T00:00:06.001Z
                + 2021-04-01T00:00:08.001Z" // adjusted timestamp
            )
        );

        // Consume the watermark
        let watermark = now.next_unwrap_ready_watermark()?;

        assert_eq!(
            watermark,
            Watermark::new(
                0,
                DataType::Timestamptz,
                ScalarImpl::Timestamptz("2021-04-01T00:00:08.001Z".parse().unwrap())
            )
        );

        // The epoch timestamp is still ahead of the adjusted timestamp
        // Send barrier at epoch 25000 (timestamp 2021-04-01T00:00:25.000Z)
        // Adjusted timestamp would be: 8001 + (1000 * 2.0) = 10001ms = 2021-04-01T00:00:10.001Z
        // Since 10001 < 25000, use adjusted timestamp
        tx.send(Barrier::with_prev_epoch_for_test(
            test_epoch(25000),
            test_epoch(20000),
        ))
        .unwrap();

        // Consume the barrier
        now.next_unwrap_ready_barrier()?;

        // Consume the data chunk
        let chunk_msg = now.next_unwrap_ready_chunk()?;

        assert_eq!(
            chunk_msg.compact_vis(),
            StreamChunk::from_pretty(
                " TZ
                - 2021-04-01T00:00:08.001Z
                + 2021-04-01T00:00:10.001Z" // adjusted timestamp
            )
        );

        // Consume the watermark
        let watermark = now.next_unwrap_ready_watermark()?;

        assert_eq!(
            watermark,
            Watermark::new(
                0,
                DataType::Timestamptz,
                ScalarImpl::Timestamptz("2021-04-01T00:00:10.001Z".parse().unwrap())
            )
        );

        // Finally, one more barrier whose epoch timestamp is larger than the adjusted timestamp
        // Send barrier at epoch 30000 (timestamp 2021-04-01T00:00:30.000Z)
        // Adjusted timestamp would be: 10001 + (1000 * 2.0) = 12001ms = 2021-04-01T00:00:12.001Z
        // Since 12001 < 30000, use adjusted timestamp
        tx.send(Barrier::with_prev_epoch_for_test(
            test_epoch(30000),
            test_epoch(25000),
        ))
        .unwrap();

        // Consume the barrier
        now.next_unwrap_ready_barrier()?;

        // Consume the data chunk
        let chunk_msg = now.next_unwrap_ready_chunk()?;

        assert_eq!(
            chunk_msg.compact_vis(),
            StreamChunk::from_pretty(
                " TZ
                - 2021-04-01T00:00:10.001Z
                + 2021-04-01T00:00:12.001Z" // adjusted timestamp
            )
        );

        // Consume the watermark
        let watermark = now.next_unwrap_ready_watermark()?;

        assert_eq!(
            watermark,
            Watermark::new(
                0,
                DataType::Timestamptz,
                ScalarImpl::Timestamptz("2021-04-01T00:00:12.001Z".parse().unwrap())
            )
        );

        Ok(())
    }
823
    /// Body of `test_now_generate_series`, which runs it with the `TIME_ZONE` context set.
    ///
    /// Covers `NowMode::GenerateSeries` with a 1s interval: the initial back-fill from
    /// `start_timestamp`, a batch of two barriers handled together, and recovery from the shared
    /// state store.
    async fn test_now_generate_series_inner() -> StreamExecutorResult<()> {
        let start_timestamp = Timestamptz::from_secs(1617235190).unwrap(); // 2021-03-31 23:59:50 UTC
        let interval = Interval::from_millis(1000); // 1s interval

        let state_store = create_state_store();
        let (tx, mut now) = create_executor(
            NowMode::GenerateSeries {
                start_timestamp,
                interval,
            },
            &state_store,
        )
        .await;

        // Init barrier
        tx.send(Barrier::new_test_barrier(test_epoch(1000)))
            .unwrap();
        now.next_unwrap_ready_barrier()?;

        // Initial timestamps
        let chunk = now.next_unwrap_ready_chunk()?;
        assert_eq!(chunk.cardinality(), 12); // seconds from 23:59:50 to 00:00:01 (inclusive)

        assert_eq!(
            now.next_unwrap_ready_watermark()?,
            Watermark::new(
                0,
                DataType::Timestamptz,
                ScalarImpl::Timestamptz("2021-04-01T00:00:01.000Z".parse().unwrap())
            )
        );

        // Send two barriers back to back; the assertions below expect both barriers to be
        // forwarded before a single chunk that covers them.
        tx.send(Barrier::with_prev_epoch_for_test(
            test_epoch(2000),
            test_epoch(1000),
        ))
        .unwrap();
        tx.send(Barrier::with_prev_epoch_for_test(
            test_epoch(3000),
            test_epoch(2000),
        ))
        .unwrap();

        now.next_unwrap_ready_barrier()?;
        now.next_unwrap_ready_barrier()?;

        let chunk = now.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk.compact_vis(),
            StreamChunk::from_pretty(
                " TZ
                + 2021-04-01T00:00:02.000Z
                + 2021-04-01T00:00:03.000Z"
            )
        );

        let watermark = now.next_unwrap_ready_watermark()?;
        assert_eq!(
            watermark,
            Watermark::new(
                0,
                DataType::Timestamptz,
                ScalarImpl::Timestamptz("2021-04-01T00:00:03.000Z".parse().unwrap())
            )
        );

        // Recovery
        drop((tx, now));
        let (tx, mut now) = create_executor(
            NowMode::GenerateSeries {
                start_timestamp,
                interval,
            },
            &state_store,
        )
        .await;

        tx.send(Barrier::with_prev_epoch_for_test(
            test_epoch(4000),
            test_epoch(2000),
        ))
        .unwrap();

        now.next_unwrap_ready_barrier()?;

        // The state update to `00:00:03` was written after the last commit, so the recovered
        // executor resumes the series right after `00:00:01` and emits `02` and `03` again.
        let chunk = now.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk.compact_vis(),
            StreamChunk::from_pretty(
                " TZ
                + 2021-04-01T00:00:02.000Z
                + 2021-04-01T00:00:03.000Z
                + 2021-04-01T00:00:04.000Z"
            )
        );

        let watermark = now.next_unwrap_ready_watermark()?;
        assert_eq!(
            watermark,
            Watermark::new(
                0,
                DataType::Timestamptz,
                ScalarImpl::Timestamptz("2021-04-01T00:00:04.000Z".parse().unwrap())
            )
        );

        Ok(())
    }
932
    /// Creates the in-memory state store used by the tests. The tests pass the same store to
    /// several executors in turn to simulate recovery.
    fn create_state_store() -> MemoryStateStore {
        MemoryStateStore::new()
    }
936
937    async fn create_executor_with_progress_ratio(
938        mode: NowMode,
939        state_store: &MemoryStateStore,
940        progress_ratio: Option<f32>,
941    ) -> (UnboundedSender<Barrier>, BoxedMessageStream) {
942        let table_id = TableId::new(1);
943        let column_descs = vec![ColumnDesc::unnamed(ColumnId::new(0), DataType::Timestamptz)];
944        let state_table = StateTable::from_table_catalog(
945            &gen_pbtable(table_id, column_descs, vec![], vec![], 0),
946            state_store.clone(),
947            None,
948        )
949        .await;
950
951        let (sender, barrier_receiver) = unbounded_channel();
952
953        let eval_error_report = ActorEvalErrorReport {
954            actor_context: ActorContext::for_test(123),
955            identity: "NowExecutor".into(),
956        };
957        let barrier_interval_ms = 1000;
958        let now_executor = NowExecutor::new(
959            vec![DataType::Timestamptz],
960            mode,
961            eval_error_report,
962            barrier_receiver,
963            state_table,
964            progress_ratio,
965            barrier_interval_ms,
966        );
967        (sender, now_executor.boxed().execute())
968    }
969
970    async fn create_executor(
971        mode: NowMode,
972        state_store: &MemoryStateStore,
973    ) -> (UnboundedSender<Barrier>, BoxedMessageStream) {
974        create_executor_with_progress_ratio(mode, state_store, None).await
975    }
976}