1use itertools::Itertools;
16use risingwave_common::array::Op;
17use risingwave_common::row;
18use risingwave_common::types::{DefaultOrdered, Interval, Timestamptz, ToDatumRef};
19use risingwave_expr::capture_context;
20use risingwave_expr::expr::{
21 EvalErrorReport, ExpressionBoxExt, InputRefExpression, LiteralExpression, NonStrictExpression,
22 build_func_non_strict,
23};
24use risingwave_expr::expr_context::TIME_ZONE;
25use tokio::sync::mpsc::UnboundedReceiver;
26use tokio_stream::wrappers::UnboundedReceiverStream;
27
28use crate::executor::prelude::*;
29use crate::task::ActorEvalErrorReport;
30
31pub struct NowExecutor<S: StateStore> {
32 data_types: Vec<DataType>,
33
34 mode: NowMode,
35 eval_error_report: ActorEvalErrorReport,
36
37 barrier_receiver: UnboundedReceiver<Barrier>,
39
40 state_table: StateTable<S>,
41
42 progress_ratio: Option<f32>,
43
44 barrier_interval_ms: u32,
45}
46
47pub enum NowMode {
48 UpdateCurrent,
50 GenerateSeries {
53 start_timestamp: Timestamptz,
54 interval: Interval,
55 },
56}
57
58enum ModeVars {
59 UpdateCurrent,
60 GenerateSeries {
61 chunk_builder: StreamChunkBuilder,
62 add_interval_expr: NonStrictExpression,
63 },
64}
65
66impl<S: StateStore> NowExecutor<S> {
67 pub fn new(
68 data_types: Vec<DataType>,
69 mode: NowMode,
70 eval_error_report: ActorEvalErrorReport,
71 barrier_receiver: UnboundedReceiver<Barrier>,
72 state_table: StateTable<S>,
73 progress_ratio: Option<f32>,
74 barrier_interval_ms: u32,
75 ) -> Self {
76 Self {
77 data_types,
78 mode,
79 eval_error_report,
80 barrier_receiver,
81 state_table,
82 progress_ratio,
83 barrier_interval_ms,
84 }
85 }
86
87 #[try_stream(ok = Message, error = StreamExecutorError)]
88 async fn execute_inner(self) {
89 let Self {
90 data_types,
91 mode,
92 eval_error_report,
93 barrier_receiver,
94 mut state_table,
95 progress_ratio,
96 barrier_interval_ms,
97 } = self;
98
99 info!(
100 "NowExecutor started. progress_ratio: {:?}, barrier_interval_ms: {:?}",
101 progress_ratio, barrier_interval_ms
102 );
103
104 let max_chunk_size = crate::config::chunk_size();
105
106 let mut paused = false;
108 let mut last_timestamp_datum: Datum = None;
110
111 let mut initialized = false;
113
114 let mut mode_vars = match &mode {
115 NowMode::UpdateCurrent => ModeVars::UpdateCurrent,
116 NowMode::GenerateSeries { interval, .. } => {
117 let chunk_builder = StreamChunkBuilder::unlimited(data_types.clone(), Some(1));
119 let add_interval_expr =
120 build_add_interval_expr_captured(*interval, eval_error_report)?;
121 ModeVars::GenerateSeries {
122 chunk_builder,
123 add_interval_expr,
124 }
125 }
126 };
127
128 const MAX_MERGE_BARRIER_SIZE: usize = 64;
129
130 #[for_await]
131 for barriers in
132 UnboundedReceiverStream::new(barrier_receiver).ready_chunks(MAX_MERGE_BARRIER_SIZE)
133 {
134 let mut curr_timestamp_datum: Datum = None;
135 if barriers.len() > 1 {
136 warn!(
137 "handle multiple barriers at once in now executor: {}",
138 barriers.len()
139 );
140 }
141 for barrier in barriers {
142 let curr_epoch = barrier.get_curr_epoch();
143 let new_timestamp = curr_epoch.as_timestamptz();
144 let pause_mutation =
145 barrier
146 .mutation
147 .as_deref()
148 .and_then(|mutation| match mutation {
149 Mutation::Pause => Some(true),
150 Mutation::Resume => Some(false),
151 _ => None,
152 });
153
154 if !initialized {
155 let first_epoch = barrier.epoch;
156 let is_pause_on_startup = barrier.is_pause_on_startup();
157 yield Message::Barrier(barrier);
158 state_table.init_epoch(first_epoch).await?;
160 last_timestamp_datum = state_table.get_from_one_value_table().await?;
161 paused = is_pause_on_startup;
162 initialized = true;
163 } else {
164 state_table
165 .commit_assert_no_update_vnode_bitmap(barrier.epoch)
166 .await?;
167 yield Message::Barrier(barrier);
168 }
169
170 if let Some(datum) = &last_timestamp_datum
172 && let Some(progress_ratio) = progress_ratio
173 && progress_ratio > 1.0
174 {
175 let last_timestamp = datum.as_timestamptz();
176 let progress_timestamp = last_timestamp
180 .timestamp_millis()
181 .checked_add((barrier_interval_ms as f32 * progress_ratio).ceil() as i64)
182 .expect("progress_timestamp is out of i64 range");
183 let adjusted_timestamp = if progress_timestamp
184 < new_timestamp.timestamp_millis()
185 {
186 debug!(
187 "adjusted next now timestamp from {} to {}. curr_epoch: {}, barrier_interval_ms: {}, progress_ratio: {}",
188 new_timestamp.timestamp_millis(),
189 progress_timestamp,
190 curr_epoch,
191 barrier_interval_ms,
192 progress_ratio
193 );
194 Timestamptz::from_millis(progress_timestamp)
195 .expect("progress_timestamp is out of timestamptz range")
196 } else {
197 new_timestamp
198 };
199 curr_timestamp_datum = Some(adjusted_timestamp.into());
200 } else {
201 curr_timestamp_datum = Some(new_timestamp.into());
202 }
203
204 if let Some(pause_mutation) = pause_mutation {
206 paused = pause_mutation;
207 }
208 }
209
210 if paused {
212 continue;
213 }
214
215 match (&mode, &mut mode_vars) {
216 (NowMode::UpdateCurrent, ModeVars::UpdateCurrent) => {
217 let chunk = if last_timestamp_datum.is_some() {
218 let last_row = row::once(&last_timestamp_datum);
219 let row = row::once(&curr_timestamp_datum);
220 state_table.update(last_row, row);
221
222 StreamChunk::from_rows(
223 &[(Op::Delete, last_row), (Op::Insert, row)],
224 &data_types,
225 )
226 } else {
227 let row = row::once(&curr_timestamp_datum);
228 state_table.insert(row);
229
230 StreamChunk::from_rows(&[(Op::Insert, row)], &data_types)
231 };
232
233 yield Message::Chunk(chunk);
234 last_timestamp_datum.clone_from(&curr_timestamp_datum)
235 }
236 (
237 &NowMode::GenerateSeries {
238 start_timestamp, ..
239 },
240 &mut ModeVars::GenerateSeries {
241 ref mut chunk_builder,
242 ref add_interval_expr,
243 },
244 ) => {
245 if last_timestamp_datum.is_none() {
246 let first = Some(start_timestamp.into());
248 let first_row = row::once(&first);
249 let _ = chunk_builder.append_row(Op::Insert, first_row);
250 state_table.insert(first_row);
251 last_timestamp_datum = first;
252 }
253
254 let mut last_row = OwnedRow::new(vec![last_timestamp_datum.clone()]);
258
259 loop {
260 if chunk_builder.size() >= max_chunk_size {
261 if let Some(chunk) = chunk_builder.take() {
265 yield Message::Chunk(chunk);
266 }
267 }
268
269 let next = add_interval_expr.eval_row_infallible(&last_row).await;
270 if DefaultOrdered(next.to_datum_ref())
271 > DefaultOrdered(curr_timestamp_datum.to_datum_ref())
272 {
273 break;
275 }
276
277 let next_row = OwnedRow::new(vec![next]);
278 let _ = chunk_builder.append_row(Op::Insert, &next_row);
279 last_row = next_row;
280 }
281
282 if let Some(chunk) = chunk_builder.take() {
283 yield Message::Chunk(chunk);
284 }
285
286 state_table.update(row::once(&last_timestamp_datum), &last_row);
288 last_timestamp_datum =
289 Itertools::exactly_one(last_row.into_inner().into_vec().into_iter())
290 .unwrap();
291 }
292 _ => unreachable!(),
293 }
294
295 yield Message::Watermark(Watermark::new(
296 0,
297 DataType::Timestamptz,
298 curr_timestamp_datum.unwrap(),
299 ));
300 }
301 }
302}
303
304impl<S: StateStore> Execute for NowExecutor<S> {
305 fn execute(self: Box<Self>) -> BoxedMessageStream {
306 self.execute_inner().boxed()
307 }
308}
309
310#[capture_context(TIME_ZONE)]
311pub fn build_add_interval_expr(
312 time_zone: &str,
313 interval: Interval,
314 eval_error_report: impl EvalErrorReport + 'static,
315) -> risingwave_expr::Result<NonStrictExpression> {
316 let timestamptz_input = InputRefExpression::new(DataType::Timestamptz, 0);
317 let interval = LiteralExpression::new(DataType::Interval, Some(interval.into()));
318 let time_zone = LiteralExpression::new(DataType::Varchar, Some(time_zone.into()));
319
320 use risingwave_pb::expr::expr_node::PbType as PbExprType;
321 build_func_non_strict(
322 PbExprType::AddWithTimeZone,
323 DataType::Timestamptz,
324 vec![
325 timestamptz_input.boxed(),
326 interval.boxed(),
327 time_zone.boxed(),
328 ],
329 eval_error_report,
330 )
331}
332
333#[cfg(test)]
334mod tests {
335 use risingwave_common::catalog::{ColumnDesc, ColumnId, TableId};
336 use risingwave_common::test_prelude::StreamChunkTestExt;
337 use risingwave_common::types::test_utils::IntervalTestExt;
338 use risingwave_common::util::epoch::test_epoch;
339 use risingwave_storage::memory::MemoryStateStore;
340 use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};
341
342 use super::*;
343 use crate::common::table::test_utils::gen_pbtable;
344 use crate::executor::test_utils::StreamExecutorTestExt;
345
346 #[tokio::test]
347 async fn test_now() -> StreamExecutorResult<()> {
348 let state_store = create_state_store();
349 let (tx, mut now) = create_executor(NowMode::UpdateCurrent, &state_store).await;
350
351 tx.send(Barrier::new_test_barrier(test_epoch(1))).unwrap();
353
354 now.next_unwrap_ready_barrier()?;
356
357 let chunk_msg = now.next_unwrap_ready_chunk()?;
359
360 assert_eq!(
361 chunk_msg.compact_vis(),
362 StreamChunk::from_pretty(
363 " TZ
364 + 2021-04-01T00:00:00.001Z"
365 )
366 );
367
368 let watermark = now.next_unwrap_ready_watermark()?;
370
371 assert_eq!(
372 watermark,
373 Watermark::new(
374 0,
375 DataType::Timestamptz,
376 ScalarImpl::Timestamptz("2021-04-01T00:00:00.001Z".parse().unwrap())
377 )
378 );
379
380 tx.send(Barrier::with_prev_epoch_for_test(
381 test_epoch(2),
382 test_epoch(1),
383 ))
384 .unwrap();
385
386 now.next_unwrap_ready_barrier()?;
388
389 let chunk_msg = now.next_unwrap_ready_chunk()?;
391
392 assert_eq!(
393 chunk_msg.compact_vis(),
394 StreamChunk::from_pretty(
395 " TZ
396 - 2021-04-01T00:00:00.001Z
397 + 2021-04-01T00:00:00.002Z"
398 )
399 );
400
401 let watermark = now.next_unwrap_ready_watermark()?;
403
404 assert_eq!(
405 watermark,
406 Watermark::new(
407 0,
408 DataType::Timestamptz,
409 ScalarImpl::Timestamptz("2021-04-01T00:00:00.002Z".parse().unwrap())
410 )
411 );
412
413 now.next_unwrap_pending();
415
416 drop((tx, now));
418 let (tx, mut now) = create_executor(NowMode::UpdateCurrent, &state_store).await;
419 tx.send(Barrier::with_prev_epoch_for_test(
420 test_epoch(3),
421 test_epoch(1),
422 ))
423 .unwrap();
424
425 now.next_unwrap_ready_barrier()?;
427
428 let chunk_msg = now.next_unwrap_ready_chunk()?;
430 assert_eq!(
431 chunk_msg.compact_vis(),
432 StreamChunk::from_pretty(
434 " TZ
435 - 2021-04-01T00:00:00.001Z
436 + 2021-04-01T00:00:00.003Z"
437 )
438 );
439
440 let watermark = now.next_unwrap_ready_watermark()?;
442
443 assert_eq!(
444 watermark,
445 Watermark::new(
446 0,
447 DataType::Timestamptz,
448 ScalarImpl::Timestamptz("2021-04-01T00:00:00.003Z".parse().unwrap())
449 )
450 );
451
452 drop((tx, now));
454 let (tx, mut now) = create_executor(NowMode::UpdateCurrent, &state_store).await;
455 tx.send(
456 Barrier::with_prev_epoch_for_test(test_epoch(4), test_epoch(1))
457 .with_mutation(Mutation::Pause),
458 )
459 .unwrap();
460
461 now.next_unwrap_ready_barrier()?;
463
464 now.next_unwrap_pending();
466
467 tx.send(
469 Barrier::with_prev_epoch_for_test(test_epoch(5), test_epoch(4))
470 .with_mutation(Mutation::Resume),
471 )
472 .unwrap();
473
474 now.next_unwrap_ready_barrier()?;
476
477 let chunk_msg = now.next_unwrap_ready_chunk()?;
479 assert_eq!(
480 chunk_msg.compact_vis(),
481 StreamChunk::from_pretty(
482 " TZ
483 - 2021-04-01T00:00:00.001Z
484 + 2021-04-01T00:00:00.005Z"
485 )
486 );
487
488 let watermark = now.next_unwrap_ready_watermark()?;
490
491 assert_eq!(
492 watermark,
493 Watermark::new(
494 0,
495 DataType::Timestamptz,
496 ScalarImpl::Timestamptz("2021-04-01T00:00:00.005Z".parse().unwrap())
497 )
498 );
499
500 Ok(())
501 }
502
503 #[tokio::test]
504 async fn test_now_start_with_paused() -> StreamExecutorResult<()> {
505 let state_store = create_state_store();
506 let (tx, mut now) = create_executor(NowMode::UpdateCurrent, &state_store).await;
507
508 tx.send(Barrier::new_test_barrier(test_epoch(1)).with_mutation(Mutation::Pause))
510 .unwrap();
511
512 now.next_unwrap_ready_barrier()?;
514
515 now.next_unwrap_pending();
517
518 tx.send(
520 Barrier::with_prev_epoch_for_test(test_epoch(2), test_epoch(1))
521 .with_mutation(Mutation::Resume),
522 )
523 .unwrap();
524
525 now.next_unwrap_ready_barrier()?;
527
528 let chunk_msg = now.next_unwrap_ready_chunk()?;
530
531 assert_eq!(
532 chunk_msg.compact_vis(),
533 StreamChunk::from_pretty(
534 " TZ
535 + 2021-04-01T00:00:00.002Z" )
537 );
538
539 let watermark = now.next_unwrap_ready_watermark()?;
541
542 assert_eq!(
543 watermark,
544 Watermark::new(
545 0,
546 DataType::Timestamptz,
547 ScalarImpl::Timestamptz("2021-04-01T00:00:00.002Z".parse().unwrap())
548 )
549 );
550
551 now.next_unwrap_pending();
553
554 Ok(())
555 }
556
557 #[tokio::test]
558 async fn test_now_generate_series() -> StreamExecutorResult<()> {
559 TIME_ZONE::scope("UTC".to_owned(), test_now_generate_series_inner()).await
560 }
561
562 #[tokio::test]
563 async fn test_now_with_progress_ratio() -> StreamExecutorResult<()> {
564 let state_store = create_state_store();
565 let progress_ratio = Some(2.0);
566 let (tx, mut now) = create_executor_with_progress_ratio(
567 NowMode::UpdateCurrent,
568 &state_store,
569 progress_ratio,
570 )
571 .await;
572
573 tx.send(Barrier::new_test_barrier(test_epoch(1))).unwrap();
575
576 now.next_unwrap_ready_barrier()?;
578
579 let chunk_msg = now.next_unwrap_ready_chunk()?;
581
582 assert_eq!(
583 chunk_msg.compact_vis(),
584 StreamChunk::from_pretty(
585 " TZ
586 + 2021-04-01T00:00:00.001Z"
587 )
588 );
589
590 let watermark = now.next_unwrap_ready_watermark()?;
592
593 assert_eq!(
594 watermark,
595 Watermark::new(
596 0,
597 DataType::Timestamptz,
598 ScalarImpl::Timestamptz("2021-04-01T00:00:00.001Z".parse().unwrap())
599 )
600 );
601
602 tx.send(Barrier::with_prev_epoch_for_test(
607 test_epoch(5000),
608 test_epoch(1),
609 ))
610 .unwrap();
611
612 now.next_unwrap_ready_barrier()?;
614
615 let chunk_msg = now.next_unwrap_ready_chunk()?;
617
618 assert_eq!(
619 chunk_msg.compact_vis(),
620 StreamChunk::from_pretty(
621 " TZ
622 - 2021-04-01T00:00:00.001Z
623 + 2021-04-01T00:00:02.001Z" )
625 );
626
627 let watermark = now.next_unwrap_ready_watermark()?;
629
630 assert_eq!(
631 watermark,
632 Watermark::new(
633 0,
634 DataType::Timestamptz,
635 ScalarImpl::Timestamptz("2021-04-01T00:00:02.001Z".parse().unwrap())
636 )
637 );
638
639 tx.send(Barrier::with_prev_epoch_for_test(
643 test_epoch(10000),
644 test_epoch(5000),
645 ))
646 .unwrap();
647
648 now.next_unwrap_ready_barrier()?;
650
651 let chunk_msg = now.next_unwrap_ready_chunk()?;
653
654 assert_eq!(
655 chunk_msg.compact_vis(),
656 StreamChunk::from_pretty(
657 " TZ
658 - 2021-04-01T00:00:02.001Z
659 + 2021-04-01T00:00:04.001Z" )
661 );
662
663 let watermark = now.next_unwrap_ready_watermark()?;
665
666 assert_eq!(
667 watermark,
668 Watermark::new(
669 0,
670 DataType::Timestamptz,
671 ScalarImpl::Timestamptz("2021-04-01T00:00:04.001Z".parse().unwrap())
672 )
673 );
674
675 tx.send(Barrier::with_prev_epoch_for_test(
679 test_epoch(15000),
680 test_epoch(10000),
681 ))
682 .unwrap();
683
684 now.next_unwrap_ready_barrier()?;
686
687 let chunk_msg = now.next_unwrap_ready_chunk()?;
689
690 assert_eq!(
691 chunk_msg.compact_vis(),
692 StreamChunk::from_pretty(
693 " TZ
694 - 2021-04-01T00:00:04.001Z
695 + 2021-04-01T00:00:06.001Z" )
697 );
698
699 let watermark = now.next_unwrap_ready_watermark()?;
701
702 assert_eq!(
703 watermark,
704 Watermark::new(
705 0,
706 DataType::Timestamptz,
707 ScalarImpl::Timestamptz("2021-04-01T00:00:06.001Z".parse().unwrap())
708 )
709 );
710
711 tx.send(Barrier::with_prev_epoch_for_test(
715 test_epoch(20000),
716 test_epoch(15000),
717 ))
718 .unwrap();
719
720 now.next_unwrap_ready_barrier()?;
722
723 let chunk_msg = now.next_unwrap_ready_chunk()?;
725
726 assert_eq!(
727 chunk_msg.compact_vis(),
728 StreamChunk::from_pretty(
729 " TZ
730 - 2021-04-01T00:00:06.001Z
731 + 2021-04-01T00:00:08.001Z" )
733 );
734
735 let watermark = now.next_unwrap_ready_watermark()?;
737
738 assert_eq!(
739 watermark,
740 Watermark::new(
741 0,
742 DataType::Timestamptz,
743 ScalarImpl::Timestamptz("2021-04-01T00:00:08.001Z".parse().unwrap())
744 )
745 );
746
747 tx.send(Barrier::with_prev_epoch_for_test(
752 test_epoch(25000),
753 test_epoch(20000),
754 ))
755 .unwrap();
756
757 now.next_unwrap_ready_barrier()?;
759
760 let chunk_msg = now.next_unwrap_ready_chunk()?;
762
763 assert_eq!(
764 chunk_msg.compact_vis(),
765 StreamChunk::from_pretty(
766 " TZ
767 - 2021-04-01T00:00:08.001Z
768 + 2021-04-01T00:00:10.001Z" )
770 );
771
772 let watermark = now.next_unwrap_ready_watermark()?;
774
775 assert_eq!(
776 watermark,
777 Watermark::new(
778 0,
779 DataType::Timestamptz,
780 ScalarImpl::Timestamptz("2021-04-01T00:00:10.001Z".parse().unwrap())
781 )
782 );
783
784 tx.send(Barrier::with_prev_epoch_for_test(
789 test_epoch(30000),
790 test_epoch(25000),
791 ))
792 .unwrap();
793
794 now.next_unwrap_ready_barrier()?;
796
797 let chunk_msg = now.next_unwrap_ready_chunk()?;
799
800 assert_eq!(
801 chunk_msg.compact_vis(),
802 StreamChunk::from_pretty(
803 " TZ
804 - 2021-04-01T00:00:10.001Z
805 + 2021-04-01T00:00:12.001Z" )
807 );
808
809 let watermark = now.next_unwrap_ready_watermark()?;
811
812 assert_eq!(
813 watermark,
814 Watermark::new(
815 0,
816 DataType::Timestamptz,
817 ScalarImpl::Timestamptz("2021-04-01T00:00:12.001Z".parse().unwrap())
818 )
819 );
820
821 Ok(())
822 }
823
824 async fn test_now_generate_series_inner() -> StreamExecutorResult<()> {
825 let start_timestamp = Timestamptz::from_secs(1617235190).unwrap(); let interval = Interval::from_millis(1000); let state_store = create_state_store();
829 let (tx, mut now) = create_executor(
830 NowMode::GenerateSeries {
831 start_timestamp,
832 interval,
833 },
834 &state_store,
835 )
836 .await;
837
838 tx.send(Barrier::new_test_barrier(test_epoch(1000)))
840 .unwrap();
841 now.next_unwrap_ready_barrier()?;
842
843 let chunk = now.next_unwrap_ready_chunk()?;
845 assert_eq!(chunk.cardinality(), 12); assert_eq!(
848 now.next_unwrap_ready_watermark()?,
849 Watermark::new(
850 0,
851 DataType::Timestamptz,
852 ScalarImpl::Timestamptz("2021-04-01T00:00:01.000Z".parse().unwrap())
853 )
854 );
855
856 tx.send(Barrier::with_prev_epoch_for_test(
857 test_epoch(2000),
858 test_epoch(1000),
859 ))
860 .unwrap();
861 tx.send(Barrier::with_prev_epoch_for_test(
862 test_epoch(3000),
863 test_epoch(2000),
864 ))
865 .unwrap();
866
867 now.next_unwrap_ready_barrier()?;
868 now.next_unwrap_ready_barrier()?;
869
870 let chunk = now.next_unwrap_ready_chunk()?;
871 assert_eq!(
872 chunk.compact_vis(),
873 StreamChunk::from_pretty(
874 " TZ
875 + 2021-04-01T00:00:02.000Z
876 + 2021-04-01T00:00:03.000Z"
877 )
878 );
879
880 let watermark = now.next_unwrap_ready_watermark()?;
881 assert_eq!(
882 watermark,
883 Watermark::new(
884 0,
885 DataType::Timestamptz,
886 ScalarImpl::Timestamptz("2021-04-01T00:00:03.000Z".parse().unwrap())
887 )
888 );
889
890 drop((tx, now));
892 let (tx, mut now) = create_executor(
893 NowMode::GenerateSeries {
894 start_timestamp,
895 interval,
896 },
897 &state_store,
898 )
899 .await;
900
901 tx.send(Barrier::with_prev_epoch_for_test(
902 test_epoch(4000),
903 test_epoch(2000),
904 ))
905 .unwrap();
906
907 now.next_unwrap_ready_barrier()?;
908
909 let chunk = now.next_unwrap_ready_chunk()?;
910 assert_eq!(
911 chunk.compact_vis(),
912 StreamChunk::from_pretty(
913 " TZ
914 + 2021-04-01T00:00:02.000Z
915 + 2021-04-01T00:00:03.000Z
916 + 2021-04-01T00:00:04.000Z"
917 )
918 );
919
920 let watermark = now.next_unwrap_ready_watermark()?;
921 assert_eq!(
922 watermark,
923 Watermark::new(
924 0,
925 DataType::Timestamptz,
926 ScalarImpl::Timestamptz("2021-04-01T00:00:04.000Z".parse().unwrap())
927 )
928 );
929
930 Ok(())
931 }
932
933 fn create_state_store() -> MemoryStateStore {
934 MemoryStateStore::new()
935 }
936
937 async fn create_executor_with_progress_ratio(
938 mode: NowMode,
939 state_store: &MemoryStateStore,
940 progress_ratio: Option<f32>,
941 ) -> (UnboundedSender<Barrier>, BoxedMessageStream) {
942 let table_id = TableId::new(1);
943 let column_descs = vec![ColumnDesc::unnamed(ColumnId::new(0), DataType::Timestamptz)];
944 let state_table = StateTable::from_table_catalog(
945 &gen_pbtable(table_id, column_descs, vec![], vec![], 0),
946 state_store.clone(),
947 None,
948 )
949 .await;
950
951 let (sender, barrier_receiver) = unbounded_channel();
952
953 let eval_error_report = ActorEvalErrorReport {
954 actor_context: ActorContext::for_test(123),
955 identity: "NowExecutor".into(),
956 };
957 let barrier_interval_ms = 1000;
958 let now_executor = NowExecutor::new(
959 vec![DataType::Timestamptz],
960 mode,
961 eval_error_report,
962 barrier_receiver,
963 state_table,
964 progress_ratio,
965 barrier_interval_ms,
966 );
967 (sender, now_executor.boxed().execute())
968 }
969
970 async fn create_executor(
971 mode: NowMode,
972 state_store: &MemoryStateStore,
973 ) -> (UnboundedSender<Barrier>, BoxedMessageStream) {
974 create_executor_with_progress_ratio(mode, state_store, None).await
975 }
976}