1use itertools::Itertools;
16use risingwave_common::array::Op;
17use risingwave_common::row;
18use risingwave_common::types::{DefaultOrdered, Interval, Timestamptz, ToDatumRef};
19use risingwave_expr::capture_context;
20use risingwave_expr::expr::{
21 EvalErrorReport, ExpressionBoxExt, InputRefExpression, LiteralExpression, NonStrictExpression,
22 build_func_non_strict,
23};
24use risingwave_expr::expr_context::TIME_ZONE;
25use tokio::sync::mpsc::UnboundedReceiver;
26use tokio_stream::wrappers::UnboundedReceiverStream;
27
28use crate::executor::prelude::*;
29use crate::task::ActorEvalErrorReport;
30
/// Executor that turns incoming barriers into timestamp rows and watermarks.
///
/// The timestamp is derived from the current epoch of each barrier; what gets emitted for it
/// depends on [`NowMode`].
pub struct NowExecutor<S: StateStore> {
    /// Column types used to build the emitted chunks.
    data_types: Vec<DataType>,

    /// Selects how timestamps are emitted (single updating row vs. generated series).
    mode: NowMode,
    /// Error reporter handed to the non-strict "add interval" expression in
    /// `GenerateSeries` mode.
    eval_error_report: ActorEvalErrorReport,

    /// Channel from which the barriers driving this executor are received.
    barrier_receiver: UnboundedReceiver<Barrier>,

    /// One-value state table holding the last emitted timestamp.
    state_table: StateTable<S>,

    /// Optional cap on how far the emitted timestamp may advance per barrier, as a multiple of
    /// `barrier_interval_ms`. Only takes effect when it is `Some` and greater than `1.0`.
    progress_ratio: Option<f32>,

    /// Barrier interval in milliseconds; only used together with `progress_ratio`.
    barrier_interval_ms: u32,
}
46
/// How [`NowExecutor`] turns the current timestamp into output rows.
pub enum NowMode {
    /// Keep a single row holding the current timestamp: each update deletes the previously
    /// emitted timestamp (if any) and inserts the new one.
    UpdateCurrent,
    /// Emit an insert-only series of timestamps that starts at `start_timestamp` and advances
    /// by `interval`, extended on every update up to (and including) the current timestamp.
    GenerateSeries {
        /// First timestamp of the series, emitted when no state exists yet.
        start_timestamp: Timestamptz,
        /// Step between two consecutive timestamps of the series.
        interval: Interval,
    },
}
57
/// Per-mode working state of `execute_inner`, built once from the configured [`NowMode`].
enum ModeVars {
    /// `UpdateCurrent` needs no extra state.
    UpdateCurrent,
    /// Working state for `GenerateSeries`.
    GenerateSeries {
        /// Accumulates the generated timestamp rows until they are taken and yielded.
        chunk_builder: StreamChunkBuilder,
        /// Expression evaluated on the previous timestamp row to obtain the next timestamp
        /// of the series.
        add_interval_expr: NonStrictExpression,
    },
}
65
impl<S: StateStore> NowExecutor<S> {
    /// Creates a `NowExecutor`; every argument is stored unchanged in the field of the same
    /// name.
    pub fn new(
        data_types: Vec<DataType>,
        mode: NowMode,
        eval_error_report: ActorEvalErrorReport,
        barrier_receiver: UnboundedReceiver<Barrier>,
        state_table: StateTable<S>,
        progress_ratio: Option<f32>,
        barrier_interval_ms: u32,
    ) -> Self {
        Self {
            data_types,
            mode,
            eval_error_report,
            barrier_receiver,
            state_table,
            progress_ratio,
            barrier_interval_ms,
        }
    }

    /// Main loop of the executor.
    ///
    /// Barriers are drained from the channel in batches. Every barrier is forwarded
    /// downstream; after a batch, unless the executor is paused, the timestamp computed for
    /// the last barrier of the batch is emitted as chunk(s) according to the mode, followed by
    /// a watermark on column 0 carrying that timestamp.
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute_inner(self) {
        let Self {
            data_types,
            mode,
            eval_error_report,
            barrier_receiver,
            mut state_table,
            progress_ratio,
            barrier_interval_ms,
        } = self;

        info!(
            "NowExecutor started. progress_ratio: {:?}, barrier_interval_ms: {:?}",
            progress_ratio, barrier_interval_ms
        );

        let max_chunk_size = crate::config::chunk_size();

        // Whether emission is currently suspended (set on startup and by Pause/Resume).
        let mut paused = false;
        // The last timestamp emitted downstream; loaded from the state table on the first
        // barrier and kept in sync with it afterwards.
        let mut last_timestamp_datum: Datum = None;

        // Whether the first barrier has been handled (state table initialized and
        // `last_timestamp_datum` loaded).
        let mut initialized = false;

        let mut mode_vars = match &mode {
            NowMode::UpdateCurrent => ModeVars::UpdateCurrent,
            NowMode::GenerateSeries { interval, .. } => {
                // NOTE(review): `Some(1)` is presumably the builder's initial capacity —
                // confirm against `StreamChunkBuilder::unlimited`.
                let chunk_builder = StreamChunkBuilder::unlimited(data_types.clone(), Some(1));
                let add_interval_expr =
                    build_add_interval_expr_captured(*interval, eval_error_report)?;
                ModeVars::GenerateSeries {
                    chunk_builder,
                    add_interval_expr,
                }
            }
        };

        // Upper bound on how many already-queued barriers are handled as one batch.
        const MAX_MERGE_BARRIER_SIZE: usize = 64;

        #[for_await]
        for barriers in
            UnboundedReceiverStream::new(barrier_receiver).ready_chunks(MAX_MERGE_BARRIER_SIZE)
        {
            // Overwritten by every barrier of the batch, so the last barrier wins.
            let mut curr_timestamp_datum: Datum = None;
            if barriers.len() > 1 {
                warn!(
                    "handle multiple barriers at once in now executor: {}",
                    barriers.len()
                );
            }
            for barrier in barriers {
                let curr_epoch = barrier.get_curr_epoch();
                let new_timestamp = curr_epoch.as_timestamptz();
                // `Some(true)` for Pause, `Some(false)` for Resume, `None` for anything else.
                let pause_mutation =
                    barrier
                        .mutation
                        .as_deref()
                        .and_then(|mutation| match mutation {
                            Mutation::Pause => Some(true),
                            Mutation::Resume => Some(false),
                            _ => None,
                        });

                if !initialized {
                    // First barrier: forward it, then initialize the state table and recover
                    // the last emitted timestamp from it.
                    let first_epoch = barrier.epoch;
                    let is_pause_on_startup = barrier.is_pause_on_startup();
                    yield Message::Barrier(barrier);
                    state_table.init_epoch(first_epoch).await?;
                    last_timestamp_datum = state_table.get_from_one_value_table().await?;
                    paused = is_pause_on_startup;
                    initialized = true;
                } else {
                    // Later barriers: commit the state written so far before forwarding.
                    state_table
                        .commit_assert_no_update_vnode_bitmap(barrier.epoch)
                        .await?;
                    yield Message::Barrier(barrier);
                }

                // Pick the timestamp for this barrier. The cap below only applies when a
                // previous timestamp exists and `progress_ratio` is set and greater than 1.0.
                if let Some(datum) = &last_timestamp_datum
                    && let Some(progress_ratio) = progress_ratio
                    && progress_ratio > 1.0
                {
                    let last_timestamp = datum.as_timestamptz();
                    // Furthest allowed timestamp (in ms): the last emitted timestamp plus
                    // `ceil(barrier_interval_ms * progress_ratio)`.
                    let progress_timestamp = last_timestamp
                        .timestamp_millis()
                        .checked_add((barrier_interval_ms as f32 * progress_ratio).ceil() as i64)
                        .expect("progress_timestamp is out of i64 range");
                    // Use the smaller of the capped timestamp and the epoch timestamp.
                    let adjusted_timestamp = if progress_timestamp
                        < new_timestamp.timestamp_millis()
                    {
                        debug!(
                            "adjusted next now timestamp from {} to {}. curr_epoch: {}, barrier_interval_ms: {}, progress_ratio: {}",
                            new_timestamp.timestamp_millis(),
                            progress_timestamp,
                            curr_epoch,
                            barrier_interval_ms,
                            progress_ratio
                        );
                        Timestamptz::from_millis(progress_timestamp)
                            .expect("progress_timestamp is out of timestamptz range")
                    } else {
                        new_timestamp
                    };
                    curr_timestamp_datum = Some(adjusted_timestamp.into());
                } else {
                    curr_timestamp_datum = Some(new_timestamp.into());
                }

                // Apply Pause/Resume after the startup state, so a mutation on the barrier
                // takes precedence.
                if let Some(pause_mutation) = pause_mutation {
                    paused = pause_mutation;
                }
            }

            // Nothing is emitted while paused; `last_timestamp_datum` stays untouched.
            if paused {
                continue;
            }

            match (&mode, &mut mode_vars) {
                (NowMode::UpdateCurrent, ModeVars::UpdateCurrent) => {
                    let chunk = if last_timestamp_datum.is_some() {
                        // Replace the previously emitted timestamp with the current one.
                        let last_row = row::once(&last_timestamp_datum);
                        let row = row::once(&curr_timestamp_datum);
                        state_table.update(last_row, row);

                        StreamChunk::from_rows(
                            &[(Op::Delete, last_row), (Op::Insert, row)],
                            &data_types,
                        )
                    } else {
                        // No previous timestamp: plain insert.
                        let row = row::once(&curr_timestamp_datum);
                        state_table.insert(row);

                        StreamChunk::from_rows(&[(Op::Insert, row)], &data_types)
                    };

                    yield Message::Chunk(chunk);
                    last_timestamp_datum.clone_from(&curr_timestamp_datum)
                }
                (
                    &NowMode::GenerateSeries {
                        start_timestamp, ..
                    },
                    &mut ModeVars::GenerateSeries {
                        ref mut chunk_builder,
                        ref add_interval_expr,
                    },
                ) => {
                    if last_timestamp_datum.is_none() {
                        // Nothing emitted yet: start the series at `start_timestamp` and
                        // populate the state table with it.
                        let first = Some(start_timestamp.into());
                        let first_row = row::once(&first);
                        // NOTE(review): the returned value is ignored; presumably an unlimited
                        // builder never hands back a chunk on append — confirm.
                        let _ = chunk_builder.append_row(Op::Insert, first_row);
                        state_table.insert(first_row);
                        last_timestamp_datum = first;
                    }

                    // `last_row` is the cursor stepping through the series;
                    // `last_timestamp_datum` is only updated after the loop, together with the
                    // state table.
                    let mut last_row = OwnedRow::new(vec![last_timestamp_datum.clone()]);

                    loop {
                        // Flush manually once the builder has grown to `max_chunk_size`.
                        if chunk_builder.size() >= max_chunk_size {
                            if let Some(chunk) = chunk_builder.take() {
                                yield Message::Chunk(chunk);
                            }
                        }

                        let next = add_interval_expr.eval_row_infallible(&last_row).await;
                        // Stop before stepping past the current timestamp.
                        if DefaultOrdered(next.to_datum_ref())
                            > DefaultOrdered(curr_timestamp_datum.to_datum_ref())
                        {
                            break;
                        }

                        let next_row = OwnedRow::new(vec![next]);
                        let _ = chunk_builder.append_row(Op::Insert, &next_row);
                        last_row = next_row;
                    }

                    // Emit whatever is left in the builder.
                    if let Some(chunk) = chunk_builder.take() {
                        yield Message::Chunk(chunk);
                    }

                    // Persist the cursor as the new last emitted timestamp.
                    state_table.update(row::once(&last_timestamp_datum), &last_row);
                    last_timestamp_datum = last_row
                        .into_inner()
                        .into_vec()
                        .into_iter()
                        .exactly_one()
                        .unwrap();
                }
                // `mode_vars` is built from `mode` above, so the variants always match.
                _ => unreachable!(),
            }

            // `curr_timestamp_datum` is set by every barrier handled above.
            // NOTE(review): the `unwrap` relies on `ready_chunks` never yielding an empty
            // batch — confirm.
            yield Message::Watermark(Watermark::new(
                0,
                DataType::Timestamptz,
                curr_timestamp_datum.unwrap(),
            ));
        }
    }
}
306
307impl<S: StateStore> Execute for NowExecutor<S> {
308 fn execute(self: Box<Self>) -> BoxedMessageStream {
309 self.execute_inner().boxed()
310 }
311}
312
313#[capture_context(TIME_ZONE)]
314pub fn build_add_interval_expr(
315 time_zone: &str,
316 interval: Interval,
317 eval_error_report: impl EvalErrorReport + 'static,
318) -> risingwave_expr::Result<NonStrictExpression> {
319 let timestamptz_input = InputRefExpression::new(DataType::Timestamptz, 0);
320 let interval = LiteralExpression::new(DataType::Interval, Some(interval.into()));
321 let time_zone = LiteralExpression::new(DataType::Varchar, Some(time_zone.into()));
322
323 use risingwave_pb::expr::expr_node::PbType as PbExprType;
324 build_func_non_strict(
325 PbExprType::AddWithTimeZone,
326 DataType::Timestamptz,
327 vec![
328 timestamptz_input.boxed(),
329 interval.boxed(),
330 time_zone.boxed(),
331 ],
332 eval_error_report,
333 )
334}
335
#[cfg(test)]
mod tests {
    use risingwave_common::catalog::{ColumnDesc, ColumnId, TableId};
    use risingwave_common::test_prelude::StreamChunkTestExt;
    use risingwave_common::types::test_utils::IntervalTestExt;
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_storage::memory::MemoryStateStore;
    use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};

    use super::*;
    use crate::common::table::test_utils::gen_pbtable;
    use crate::executor::test_utils::StreamExecutorTestExt;

    /// Builds the `Timestamptz` watermark on column 0 expected for `timestamp`.
    fn expected_watermark(timestamp: &str) -> Watermark {
        let value = ScalarImpl::Timestamptz(timestamp.parse().unwrap());
        Watermark::new(0, DataType::Timestamptz, value)
    }

    /// `UpdateCurrent` mode: initial insert, update, recovery, and recovery while paused.
    #[tokio::test]
    async fn test_now() -> StreamExecutorResult<()> {
        let state_store = create_state_store();
        let (barrier_tx, mut stream) =
            create_executor(NowMode::UpdateCurrent, &state_store).await;

        // First barrier: the barrier is forwarded, then the first timestamp is inserted.
        barrier_tx
            .send(Barrier::new_test_barrier(test_epoch(1)))
            .unwrap();
        stream.next_unwrap_ready_barrier()?;

        let inserted = stream.next_unwrap_ready_chunk()?;
        assert_eq!(
            inserted.compact(),
            StreamChunk::from_pretty(
                " TZ
                + 2021-04-01T00:00:00.001Z"
            )
        );
        assert_eq!(
            stream.next_unwrap_ready_watermark()?,
            expected_watermark("2021-04-01T00:00:00.001Z")
        );

        // Second barrier: the previous timestamp is replaced by the new one.
        barrier_tx
            .send(Barrier::with_prev_epoch_for_test(
                test_epoch(2),
                test_epoch(1),
            ))
            .unwrap();
        stream.next_unwrap_ready_barrier()?;

        let updated = stream.next_unwrap_ready_chunk()?;
        assert_eq!(
            updated.compact(),
            StreamChunk::from_pretty(
                " TZ
                - 2021-04-01T00:00:00.001Z
                + 2021-04-01T00:00:00.002Z"
            )
        );
        assert_eq!(
            stream.next_unwrap_ready_watermark()?,
            expected_watermark("2021-04-01T00:00:00.002Z")
        );

        // Without further barriers nothing more is produced.
        stream.next_unwrap_pending();

        // Recreate the executor on the same state store. The expected delete below shows
        // that the recovered timestamp is still `...00.001Z`.
        drop((barrier_tx, stream));
        let (barrier_tx, mut stream) =
            create_executor(NowMode::UpdateCurrent, &state_store).await;
        barrier_tx
            .send(Barrier::with_prev_epoch_for_test(
                test_epoch(3),
                test_epoch(1),
            ))
            .unwrap();
        stream.next_unwrap_ready_barrier()?;

        let recovered = stream.next_unwrap_ready_chunk()?;
        assert_eq!(
            recovered.compact(),
            StreamChunk::from_pretty(
                " TZ
                - 2021-04-01T00:00:00.001Z
                + 2021-04-01T00:00:00.003Z"
            )
        );
        assert_eq!(
            stream.next_unwrap_ready_watermark()?,
            expected_watermark("2021-04-01T00:00:00.003Z")
        );

        // Recreate the executor once more, this time starting with a `Pause` mutation.
        drop((barrier_tx, stream));
        let (barrier_tx, mut stream) =
            create_executor(NowMode::UpdateCurrent, &state_store).await;
        barrier_tx
            .send(
                Barrier::with_prev_epoch_for_test(test_epoch(4), test_epoch(1))
                    .with_mutation(Mutation::Pause),
            )
            .unwrap();
        stream.next_unwrap_ready_barrier()?;

        // While paused only the barrier is forwarded.
        stream.next_unwrap_pending();

        // Resuming emits the update right after the barrier.
        barrier_tx
            .send(
                Barrier::with_prev_epoch_for_test(test_epoch(5), test_epoch(4))
                    .with_mutation(Mutation::Resume),
            )
            .unwrap();
        stream.next_unwrap_ready_barrier()?;

        let resumed = stream.next_unwrap_ready_chunk()?;
        assert_eq!(
            resumed.compact(),
            StreamChunk::from_pretty(
                " TZ
                - 2021-04-01T00:00:00.001Z
                + 2021-04-01T00:00:00.005Z"
            )
        );
        assert_eq!(
            stream.next_unwrap_ready_watermark()?,
            expected_watermark("2021-04-01T00:00:00.005Z")
        );

        Ok(())
    }

    /// `UpdateCurrent` mode where the very first barrier already pauses the executor.
    #[tokio::test]
    async fn test_now_start_with_paused() -> StreamExecutorResult<()> {
        let state_store = create_state_store();
        let (barrier_tx, mut stream) =
            create_executor(NowMode::UpdateCurrent, &state_store).await;

        // First barrier carries `Pause`: it is forwarded, but no timestamp follows.
        barrier_tx
            .send(Barrier::new_test_barrier(test_epoch(1)).with_mutation(Mutation::Pause))
            .unwrap();
        stream.next_unwrap_ready_barrier()?;
        stream.next_unwrap_pending();

        // `Resume`: the first timestamp ever emitted is the one of this barrier.
        barrier_tx
            .send(
                Barrier::with_prev_epoch_for_test(test_epoch(2), test_epoch(1))
                    .with_mutation(Mutation::Resume),
            )
            .unwrap();
        stream.next_unwrap_ready_barrier()?;

        let inserted = stream.next_unwrap_ready_chunk()?;
        assert_eq!(
            inserted.compact(),
            StreamChunk::from_pretty(
                " TZ
                + 2021-04-01T00:00:00.002Z"
            )
        );
        assert_eq!(
            stream.next_unwrap_ready_watermark()?,
            expected_watermark("2021-04-01T00:00:00.002Z")
        );

        stream.next_unwrap_pending();

        Ok(())
    }

    /// Runs the `GenerateSeries` scenario with the `TIME_ZONE` context set to UTC.
    #[tokio::test]
    async fn test_now_generate_series() -> StreamExecutorResult<()> {
        TIME_ZONE::scope("UTC".to_owned(), test_now_generate_series_inner()).await
    }

    /// `UpdateCurrent` mode with `progress_ratio = 2.0` and a 1000 ms barrier interval: every
    /// update may advance at most 2000 ms past the previously emitted timestamp.
    #[tokio::test]
    async fn test_now_with_progress_ratio() -> StreamExecutorResult<()> {
        let state_store = create_state_store();
        let progress_ratio = Some(2.0);
        let (barrier_tx, mut stream) = create_executor_with_progress_ratio(
            NowMode::UpdateCurrent,
            &state_store,
            progress_ratio,
        )
        .await;

        // First barrier: no previous timestamp exists, so the epoch timestamp is used as-is.
        barrier_tx
            .send(Barrier::new_test_barrier(test_epoch(1)))
            .unwrap();
        stream.next_unwrap_ready_barrier()?;

        let inserted = stream.next_unwrap_ready_chunk()?;
        assert_eq!(
            inserted.compact(),
            StreamChunk::from_pretty(
                " TZ
                + 2021-04-01T00:00:00.001Z"
            )
        );
        assert_eq!(
            stream.next_unwrap_ready_watermark()?,
            expected_watermark("2021-04-01T00:00:00.001Z")
        );

        // Epoch jumps to 5000 ms, but the timestamp is capped at 0.001 s + 2 s.
        barrier_tx
            .send(Barrier::with_prev_epoch_for_test(
                test_epoch(5000),
                test_epoch(1),
            ))
            .unwrap();
        stream.next_unwrap_ready_barrier()?;

        let capped = stream.next_unwrap_ready_chunk()?;
        assert_eq!(
            capped.compact(),
            StreamChunk::from_pretty(
                " TZ
                - 2021-04-01T00:00:00.001Z
                + 2021-04-01T00:00:02.001Z"
            )
        );
        assert_eq!(
            stream.next_unwrap_ready_watermark()?,
            expected_watermark("2021-04-01T00:00:02.001Z")
        );

        // Epoch 10000 ms: capped at 2.001 s + 2 s.
        barrier_tx
            .send(Barrier::with_prev_epoch_for_test(
                test_epoch(10000),
                test_epoch(5000),
            ))
            .unwrap();
        stream.next_unwrap_ready_barrier()?;

        let capped = stream.next_unwrap_ready_chunk()?;
        assert_eq!(
            capped.compact(),
            StreamChunk::from_pretty(
                " TZ
                - 2021-04-01T00:00:02.001Z
                + 2021-04-01T00:00:04.001Z"
            )
        );
        assert_eq!(
            stream.next_unwrap_ready_watermark()?,
            expected_watermark("2021-04-01T00:00:04.001Z")
        );

        // Epoch 15000 ms: capped at 4.001 s + 2 s.
        barrier_tx
            .send(Barrier::with_prev_epoch_for_test(
                test_epoch(15000),
                test_epoch(10000),
            ))
            .unwrap();
        stream.next_unwrap_ready_barrier()?;

        let capped = stream.next_unwrap_ready_chunk()?;
        assert_eq!(
            capped.compact(),
            StreamChunk::from_pretty(
                " TZ
                - 2021-04-01T00:00:04.001Z
                + 2021-04-01T00:00:06.001Z"
            )
        );
        assert_eq!(
            stream.next_unwrap_ready_watermark()?,
            expected_watermark("2021-04-01T00:00:06.001Z")
        );

        // Epoch 20000 ms: capped at 6.001 s + 2 s.
        barrier_tx
            .send(Barrier::with_prev_epoch_for_test(
                test_epoch(20000),
                test_epoch(15000),
            ))
            .unwrap();
        stream.next_unwrap_ready_barrier()?;

        let capped = stream.next_unwrap_ready_chunk()?;
        assert_eq!(
            capped.compact(),
            StreamChunk::from_pretty(
                " TZ
                - 2021-04-01T00:00:06.001Z
                + 2021-04-01T00:00:08.001Z"
            )
        );
        assert_eq!(
            stream.next_unwrap_ready_watermark()?,
            expected_watermark("2021-04-01T00:00:08.001Z")
        );

        // Epoch 25000 ms: capped at 8.001 s + 2 s.
        barrier_tx
            .send(Barrier::with_prev_epoch_for_test(
                test_epoch(25000),
                test_epoch(20000),
            ))
            .unwrap();
        stream.next_unwrap_ready_barrier()?;

        let capped = stream.next_unwrap_ready_chunk()?;
        assert_eq!(
            capped.compact(),
            StreamChunk::from_pretty(
                " TZ
                - 2021-04-01T00:00:08.001Z
                + 2021-04-01T00:00:10.001Z"
            )
        );
        assert_eq!(
            stream.next_unwrap_ready_watermark()?,
            expected_watermark("2021-04-01T00:00:10.001Z")
        );

        // Epoch 30000 ms: capped at 10.001 s + 2 s.
        barrier_tx
            .send(Barrier::with_prev_epoch_for_test(
                test_epoch(30000),
                test_epoch(25000),
            ))
            .unwrap();
        stream.next_unwrap_ready_barrier()?;

        let capped = stream.next_unwrap_ready_chunk()?;
        assert_eq!(
            capped.compact(),
            StreamChunk::from_pretty(
                " TZ
                - 2021-04-01T00:00:10.001Z
                + 2021-04-01T00:00:12.001Z"
            )
        );
        assert_eq!(
            stream.next_unwrap_ready_watermark()?,
            expected_watermark("2021-04-01T00:00:12.001Z")
        );

        Ok(())
    }

    /// `GenerateSeries` mode: initial backfill of the series, merged barriers, and recovery.
    async fn test_now_generate_series_inner() -> StreamExecutorResult<()> {
        // 1617235190 s is 2021-03-31T23:59:50Z, i.e. 11 s before the `00:00:01.000Z`
        // watermark asserted after the first barrier.
        let start_timestamp = Timestamptz::from_secs(1617235190).unwrap();
        let interval = Interval::from_millis(1000);

        let state_store = create_state_store();
        let (barrier_tx, mut stream) = create_executor(
            NowMode::GenerateSeries {
                start_timestamp,
                interval,
            },
            &state_store,
        )
        .await;

        // First barrier: the series from the start timestamp up to the epoch timestamp is
        // emitted at once (23:59:50 ..= 00:00:01, one row per second).
        barrier_tx
            .send(Barrier::new_test_barrier(test_epoch(1000)))
            .unwrap();
        stream.next_unwrap_ready_barrier()?;

        let backfill = stream.next_unwrap_ready_chunk()?;
        assert_eq!(backfill.cardinality(), 12);
        assert_eq!(
            stream.next_unwrap_ready_watermark()?,
            expected_watermark("2021-04-01T00:00:01.000Z")
        );

        // Two barriers queued together: both are forwarded first, then a single chunk
        // covering both steps follows.
        barrier_tx
            .send(Barrier::with_prev_epoch_for_test(
                test_epoch(2000),
                test_epoch(1000),
            ))
            .unwrap();
        barrier_tx
            .send(Barrier::with_prev_epoch_for_test(
                test_epoch(3000),
                test_epoch(2000),
            ))
            .unwrap();

        stream.next_unwrap_ready_barrier()?;
        stream.next_unwrap_ready_barrier()?;

        let merged = stream.next_unwrap_ready_chunk()?;
        assert_eq!(
            merged.compact(),
            StreamChunk::from_pretty(
                " TZ
                + 2021-04-01T00:00:02.000Z
                + 2021-04-01T00:00:03.000Z"
            )
        );
        assert_eq!(
            stream.next_unwrap_ready_watermark()?,
            expected_watermark("2021-04-01T00:00:03.000Z")
        );

        // Recreate the executor on the same state store. The expected rows show the series
        // resuming after `00:00:01.000Z`.
        drop((barrier_tx, stream));
        let (barrier_tx, mut stream) = create_executor(
            NowMode::GenerateSeries {
                start_timestamp,
                interval,
            },
            &state_store,
        )
        .await;

        barrier_tx
            .send(Barrier::with_prev_epoch_for_test(
                test_epoch(4000),
                test_epoch(2000),
            ))
            .unwrap();
        stream.next_unwrap_ready_barrier()?;

        let recovered = stream.next_unwrap_ready_chunk()?;
        assert_eq!(
            recovered.compact(),
            StreamChunk::from_pretty(
                " TZ
                + 2021-04-01T00:00:02.000Z
                + 2021-04-01T00:00:03.000Z
                + 2021-04-01T00:00:04.000Z"
            )
        );
        assert_eq!(
            stream.next_unwrap_ready_watermark()?,
            expected_watermark("2021-04-01T00:00:04.000Z")
        );

        Ok(())
    }

    /// Creates the in-memory state store shared by executors within one test.
    fn create_state_store() -> MemoryStateStore {
        MemoryStateStore::new()
    }

    /// Builds a `NowExecutor` over a single `Timestamptz` column with a 1000 ms barrier
    /// interval, returning the barrier sender and the executor's message stream.
    async fn create_executor_with_progress_ratio(
        mode: NowMode,
        state_store: &MemoryStateStore,
        progress_ratio: Option<f32>,
    ) -> (UnboundedSender<Barrier>, BoxedMessageStream) {
        let timestamp_column = ColumnDesc::unnamed(ColumnId::new(0), DataType::Timestamptz);
        let table_catalog = gen_pbtable(
            TableId::new(1),
            vec![timestamp_column],
            vec![],
            vec![],
            0,
        );
        let state_table =
            StateTable::from_table_catalog(&table_catalog, state_store.clone(), None).await;

        let (barrier_tx, barrier_rx) = unbounded_channel();

        let error_report = ActorEvalErrorReport {
            actor_context: ActorContext::for_test(123),
            identity: "NowExecutor".into(),
        };
        let barrier_interval_ms = 1000;
        let executor = NowExecutor::new(
            vec![DataType::Timestamptz],
            mode,
            error_report,
            barrier_rx,
            state_table,
            progress_ratio,
            barrier_interval_ms,
        );
        (barrier_tx, executor.boxed().execute())
    }

    /// Same as `create_executor_with_progress_ratio`, without a progress ratio.
    async fn create_executor(
        mode: NowMode,
        state_store: &MemoryStateStore,
    ) -> (UnboundedSender<Barrier>, BoxedMessageStream) {
        create_executor_with_progress_ratio(mode, state_store, None).await
    }
}