1use itertools::Itertools;
16use risingwave_common::array::Op;
17use risingwave_common::row;
18use risingwave_common::types::{DefaultOrdered, Interval, Timestamptz, ToDatumRef};
19use risingwave_expr::capture_context;
20use risingwave_expr::expr::{
21 EvalErrorReport, ExpressionBoxExt, InputRefExpression, LiteralExpression, NonStrictExpression,
22 build_func_non_strict,
23};
24use risingwave_expr::expr_context::TIME_ZONE;
25use tokio::sync::mpsc::UnboundedReceiver;
26use tokio_stream::wrappers::UnboundedReceiverStream;
27
28use crate::executor::prelude::*;
29use crate::task::ActorEvalErrorReport;
30
/// Stream executor that turns incoming barriers into timestamp rows.
///
/// The timestamp of each barrier's current epoch drives the output; what is
/// emitted for it depends on [`NowMode`].
pub struct NowExecutor<S: StateStore> {
    /// Column types of the emitted chunks (also handed to the chunk builder).
    data_types: Vec<DataType>,

    /// Selects how timestamps are emitted on each barrier batch.
    mode: NowMode,
    /// Error reporter passed to the non-strict "add interval" expression that
    /// is built when `mode` is `GenerateSeries`.
    eval_error_report: ActorEvalErrorReport,

    /// Receiving half of the channel on which barriers arrive.
    barrier_receiver: UnboundedReceiver<Barrier>,

    /// State table holding the last timestamp; it is read back through
    /// `get_from_one_value_table` when the first barrier is handled.
    state_table: StateTable<S>,
}
42
43pub enum NowMode {
44 UpdateCurrent,
46 GenerateSeries {
49 start_timestamp: Timestamptz,
50 interval: Interval,
51 },
52}
53
/// Per-mode working state used while the executor runs.
///
/// Its variants mirror those of `NowMode`; the two are matched together as a
/// pair on every barrier batch.
enum ModeVars {
    /// `UpdateCurrent` needs no extra state.
    UpdateCurrent,
    /// Working state for `GenerateSeries`.
    GenerateSeries {
        /// Accumulates the generated rows until they are emitted as a chunk.
        chunk_builder: StreamChunkBuilder,
        /// Expression evaluated on the previous row to obtain the next
        /// timestamp of the series.
        add_interval_expr: NonStrictExpression,
    },
}
61
impl<S: StateStore> NowExecutor<S> {
    /// Creates a new executor; every argument is stored unchanged in the
    /// field of the same name.
    pub fn new(
        data_types: Vec<DataType>,
        mode: NowMode,
        eval_error_report: ActorEvalErrorReport,
        barrier_receiver: UnboundedReceiver<Barrier>,
        state_table: StateTable<S>,
    ) -> Self {
        Self {
            data_types,
            mode,
            eval_error_report,
            barrier_receiver,
            state_table,
        }
    }

    /// Main loop of the executor.
    ///
    /// For every batch of barriers it forwards each barrier, and then, unless
    /// the executor is paused, emits the data chunk(s) required by `mode`,
    /// followed by a watermark carrying the timestamp of the last barrier in
    /// the batch.
    ///
    /// Errors from building the interval expression or from the state table
    /// are propagated through the stream via `?`.
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute_inner(self) {
        let Self {
            data_types,
            mode,
            eval_error_report,
            barrier_receiver,
            mut state_table,
        } = self;

        let max_chunk_size = crate::config::chunk_size();

        // While `true`, barriers are still forwarded but no chunk or watermark
        // is emitted.
        let mut paused = false;
        // The timestamp most recently written to the state table. Restored
        // from the table on the first barrier; `None` means no state yet.
        let mut last_timestamp: Datum = None;

        // Becomes `true` once the first barrier has been handled and
        // `last_timestamp` has been loaded from the state table.
        let mut initialized = false;

        let mut mode_vars = match &mode {
            NowMode::UpdateCurrent => ModeVars::UpdateCurrent,
            NowMode::GenerateSeries { interval, .. } => {
                // NOTE(review): `Some(1)` looks like an initial capacity hint
                // for the builder — confirm against `StreamChunkBuilder`.
                let chunk_builder = StreamChunkBuilder::unlimited(data_types.clone(), Some(1));
                let add_interval_expr =
                    build_add_interval_expr_captured(*interval, eval_error_report)?;
                ModeVars::GenerateSeries {
                    chunk_builder,
                    add_interval_expr,
                }
            }
        };

        // Upper bound on how many queued barriers are handled as one batch.
        const MAX_MERGE_BARRIER_SIZE: usize = 64;

        #[for_await]
        for barriers in
            UnboundedReceiverStream::new(barrier_receiver).ready_chunks(MAX_MERGE_BARRIER_SIZE)
        {
            // Timestamp of the last barrier in this batch; set in the loop below.
            let mut curr_timestamp = None;
            if barriers.len() > 1 {
                warn!(
                    "handle multiple barriers at once in now executor: {}",
                    barriers.len()
                );
            }
            for barrier in barriers {
                // Read everything needed from the barrier up front, because the
                // barrier itself is moved into the yielded message below.
                let new_timestamp = Some(barrier.get_curr_epoch().as_scalar());
                let pause_mutation =
                    barrier
                        .mutation
                        .as_deref()
                        .and_then(|mutation| match mutation {
                            Mutation::Pause => Some(true),
                            Mutation::Resume => Some(false),
                            _ => None,
                        });

                if !initialized {
                    // First barrier: forward it first, then initialise the
                    // state table and load any previously stored timestamp.
                    let first_epoch = barrier.epoch;
                    let is_pause_on_startup = barrier.is_pause_on_startup();
                    yield Message::Barrier(barrier);
                    state_table.init_epoch(first_epoch).await?;
                    last_timestamp = state_table.get_from_one_value_table().await?;
                    paused = is_pause_on_startup;
                    initialized = true;
                } else {
                    // Later barriers: commit the state table for this epoch
                    // before forwarding the barrier.
                    state_table
                        .commit_assert_no_update_vnode_bitmap(barrier.epoch)
                        .await?;
                    yield Message::Barrier(barrier);
                }

                // Later barriers in the batch overwrite earlier ones, so only
                // the last barrier's timestamp is used for emission.
                curr_timestamp = new_timestamp;

                // An explicit `Pause`/`Resume` mutation overrides the current
                // paused flag (including the startup value set above).
                if let Some(pause_mutation) = pause_mutation {
                    paused = pause_mutation;
                }
            }

            // While paused, nothing besides the barriers is emitted.
            if paused {
                continue;
            }

            match (&mode, &mut mode_vars) {
                (NowMode::UpdateCurrent, ModeVars::UpdateCurrent) => {
                    let chunk = if last_timestamp.is_some() {
                        // A previous value exists: replace it in the state
                        // table and emit it as a delete + insert pair.
                        let last_row = row::once(&last_timestamp);
                        let row = row::once(&curr_timestamp);
                        state_table.update(last_row, row);

                        StreamChunk::from_rows(
                            &[(Op::Delete, last_row), (Op::Insert, row)],
                            &data_types,
                        )
                    } else {
                        // No previous value: plain insert.
                        let row = row::once(&curr_timestamp);
                        state_table.insert(row);

                        StreamChunk::from_rows(&[(Op::Insert, row)], &data_types)
                    };

                    yield Message::Chunk(chunk);
                    last_timestamp.clone_from(&curr_timestamp)
                }
                (
                    &NowMode::GenerateSeries {
                        start_timestamp, ..
                    },
                    &mut ModeVars::GenerateSeries {
                        ref mut chunk_builder,
                        ref add_interval_expr,
                    },
                ) => {
                    if last_timestamp.is_none() {
                        // Nothing stored yet: the series starts with
                        // `start_timestamp`, which is both queued for emission
                        // and inserted into the state table.
                        let first = Some(start_timestamp.into());
                        // The builder's return value is discarded here;
                        // chunks are taken out explicitly with `take()` below.
                        let first_row = row::once(&first);
                        let _ = chunk_builder.append_row(Op::Insert, first_row);
                        state_table.insert(first_row);
                        last_timestamp = first;
                    }

                    // `last_row` is the cursor that walks the series.
                    // `last_timestamp` is left untouched until after the loop,
                    // so it still names the row currently in the state table
                    // when `update` is called.
                    let mut last_row = OwnedRow::new(vec![last_timestamp.clone()]);

                    loop {
                        // Flush manually once enough rows have accumulated,
                        // since the builder itself is unlimited.
                        if chunk_builder.size() >= max_chunk_size {
                            if let Some(chunk) = chunk_builder.take() {
                                yield Message::Chunk(chunk);
                            }
                        }

                        let next = add_interval_expr.eval_row_infallible(&last_row).await;
                        if DefaultOrdered(next.to_datum_ref())
                            > DefaultOrdered(curr_timestamp.to_datum_ref())
                        {
                            // Stop at the first timestamp beyond the current
                            // barrier's timestamp; equality is still emitted.
                            break;
                        }

                        let next_row = OwnedRow::new(vec![next]);
                        let _ = chunk_builder.append_row(Op::Insert, &next_row);
                        last_row = next_row;
                    }

                    // Emit whatever is left in the builder.
                    if let Some(chunk) = chunk_builder.take() {
                        yield Message::Chunk(chunk);
                    }

                    // Persist the cursor, then adopt its single datum as the
                    // new `last_timestamp`.
                    state_table.update(row::once(&last_timestamp), &last_row);
                    last_timestamp = last_row
                        .into_inner()
                        .into_vec()
                        .into_iter()
                        .exactly_one()
                        .unwrap();
                }
                // `mode_vars` is derived from `mode` above, so the variants
                // always pair up.
                _ => unreachable!(),
            }

            // NOTE(review): the `unwrap` relies on every batch containing at
            // least one barrier, and `0` is presumably the column index —
            // confirm against `ready_chunks` and `Watermark::new`.
            yield Message::Watermark(Watermark::new(
                0,
                DataType::Timestamptz,
                curr_timestamp.unwrap(),
            ));
        }
    }
}
259
260impl<S: StateStore> Execute for NowExecutor<S> {
261 fn execute(self: Box<Self>) -> BoxedMessageStream {
262 self.execute_inner().boxed()
263 }
264}
265
266#[capture_context(TIME_ZONE)]
267pub fn build_add_interval_expr(
268 time_zone: &str,
269 interval: Interval,
270 eval_error_report: impl EvalErrorReport + 'static,
271) -> risingwave_expr::Result<NonStrictExpression> {
272 let timestamptz_input = InputRefExpression::new(DataType::Timestamptz, 0);
273 let interval = LiteralExpression::new(DataType::Interval, Some(interval.into()));
274 let time_zone = LiteralExpression::new(DataType::Varchar, Some(time_zone.into()));
275
276 use risingwave_pb::expr::expr_node::PbType as PbExprType;
277 build_func_non_strict(
278 PbExprType::AddWithTimeZone,
279 DataType::Timestamptz,
280 vec![
281 timestamptz_input.boxed(),
282 interval.boxed(),
283 time_zone.boxed(),
284 ],
285 eval_error_report,
286 )
287}
288
#[cfg(test)]
mod tests {
    use risingwave_common::catalog::{ColumnDesc, ColumnId, TableId};
    use risingwave_common::test_prelude::StreamChunkTestExt;
    use risingwave_common::types::test_utils::IntervalTestExt;
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_storage::memory::MemoryStateStore;
    use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};

    use super::*;
    use crate::common::table::test_utils::gen_pbtable;
    use crate::executor::test_utils::StreamExecutorTestExt;

    /// `UpdateCurrent` mode: first insert, subsequent updates, recovery from
    /// the shared state store, and recovery while paused.
    #[tokio::test]
    async fn test_now() -> StreamExecutorResult<()> {
        let state_store = create_state_store();
        let (tx, mut now) = create_executor(NowMode::UpdateCurrent, &state_store).await;

        // Send the initial barrier.
        tx.send(Barrier::new_test_barrier(test_epoch(1))).unwrap();

        // The barrier is forwarded first.
        now.next_unwrap_ready_barrier()?;

        // Then the data chunk: nothing was stored before, so it is a plain insert.
        let chunk_msg = now.next_unwrap_ready_chunk()?;

        assert_eq!(
            chunk_msg.compact(),
            StreamChunk::from_pretty(
                " TZ
                + 2021-04-01T00:00:00.001Z"
            )
        );

        // Finally the watermark carrying the same timestamp.
        let watermark = now.next_unwrap_ready_watermark()?;

        assert_eq!(
            watermark,
            Watermark::new(
                0,
                DataType::Timestamptz,
                ScalarImpl::Timestamptz("2021-04-01T00:00:00.001Z".parse().unwrap())
            )
        );

        // Second barrier.
        tx.send(Barrier::with_prev_epoch_for_test(
            test_epoch(2),
            test_epoch(1),
        ))
        .unwrap();

        now.next_unwrap_ready_barrier()?;

        // The previous value is now replaced: delete + insert.
        let chunk_msg = now.next_unwrap_ready_chunk()?;

        assert_eq!(
            chunk_msg.compact(),
            StreamChunk::from_pretty(
                " TZ
                - 2021-04-01T00:00:00.001Z
                + 2021-04-01T00:00:00.002Z"
            )
        );

        let watermark = now.next_unwrap_ready_watermark()?;

        assert_eq!(
            watermark,
            Watermark::new(
                0,
                DataType::Timestamptz,
                ScalarImpl::Timestamptz("2021-04-01T00:00:00.002Z".parse().unwrap())
            )
        );

        // Nothing more is produced until another barrier arrives.
        now.next_unwrap_pending();

        // Recovery: recreate the executor on top of the same state store.
        drop((tx, now));
        let (tx, mut now) = create_executor(NowMode::UpdateCurrent, &state_store).await;
        tx.send(Barrier::with_prev_epoch_for_test(
            test_epoch(3),
            test_epoch(1),
        ))
        .unwrap();

        now.next_unwrap_ready_barrier()?;

        let chunk_msg = now.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk_msg.compact(),
            StreamChunk::from_pretty(
                // The restored value is `.001`, not `.002`: the barrier's
                // previous epoch is 1, and presumably the write made for
                // epoch 2 was never committed.
                " TZ
                - 2021-04-01T00:00:00.001Z
                + 2021-04-01T00:00:00.003Z"
            )
        );

        let watermark = now.next_unwrap_ready_watermark()?;

        assert_eq!(
            watermark,
            Watermark::new(
                0,
                DataType::Timestamptz,
                ScalarImpl::Timestamptz("2021-04-01T00:00:00.003Z".parse().unwrap())
            )
        );

        // Recovery again, this time with a `Pause` mutation on the first barrier.
        drop((tx, now));
        let (tx, mut now) = create_executor(NowMode::UpdateCurrent, &state_store).await;
        tx.send(
            Barrier::with_prev_epoch_for_test(test_epoch(4), test_epoch(1))
                .with_mutation(Mutation::Pause),
        )
        .unwrap();

        // The barrier itself is still forwarded...
        now.next_unwrap_ready_barrier()?;

        // ...but no chunk or watermark follows while paused.
        now.next_unwrap_pending();

        // Resume.
        tx.send(
            Barrier::with_prev_epoch_for_test(test_epoch(5), test_epoch(4))
                .with_mutation(Mutation::Resume),
        )
        .unwrap();

        now.next_unwrap_ready_barrier()?;

        // Emission resumes with the timestamp of the resume barrier.
        let chunk_msg = now.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk_msg.compact(),
            StreamChunk::from_pretty(
                " TZ
                - 2021-04-01T00:00:00.001Z
                + 2021-04-01T00:00:00.005Z"
            )
        );

        let watermark = now.next_unwrap_ready_watermark()?;

        assert_eq!(
            watermark,
            Watermark::new(
                0,
                DataType::Timestamptz,
                ScalarImpl::Timestamptz("2021-04-01T00:00:00.005Z".parse().unwrap())
            )
        );

        Ok(())
    }

    /// `UpdateCurrent` mode when the very first barrier carries `Pause`:
    /// nothing is emitted until the `Resume` barrier.
    #[tokio::test]
    async fn test_now_start_with_paused() -> StreamExecutorResult<()> {
        let state_store = create_state_store();
        let (tx, mut now) = create_executor(NowMode::UpdateCurrent, &state_store).await;

        // Initial barrier, paused from the start.
        tx.send(Barrier::new_test_barrier(test_epoch(1)).with_mutation(Mutation::Pause))
            .unwrap();

        now.next_unwrap_ready_barrier()?;

        // No data while paused.
        now.next_unwrap_pending();

        // Resume.
        tx.send(
            Barrier::with_prev_epoch_for_test(test_epoch(2), test_epoch(1))
                .with_mutation(Mutation::Resume),
        )
        .unwrap();

        now.next_unwrap_ready_barrier()?;

        let chunk_msg = now.next_unwrap_ready_chunk()?;

        // The first emitted value is the resume barrier's timestamp; the
        // paused epoch 1 produced no row.
        assert_eq!(
            chunk_msg.compact(),
            StreamChunk::from_pretty(
                " TZ
                + 2021-04-01T00:00:00.002Z"
            )
        );

        let watermark = now.next_unwrap_ready_watermark()?;

        assert_eq!(
            watermark,
            Watermark::new(
                0,
                DataType::Timestamptz,
                ScalarImpl::Timestamptz("2021-04-01T00:00:00.002Z".parse().unwrap())
            )
        );

        // Nothing more until the next barrier.
        now.next_unwrap_pending();

        Ok(())
    }

    /// Runs the `GenerateSeries` test with the `TIME_ZONE` context set to UTC,
    /// which the captured interval expression reads.
    #[tokio::test]
    async fn test_now_generate_series() -> StreamExecutorResult<()> {
        TIME_ZONE::scope("UTC".to_owned(), test_now_generate_series_inner()).await
    }

    /// `GenerateSeries` mode: initial back-fill from `start_timestamp`,
    /// merged barriers, and recovery from the shared state store.
    async fn test_now_generate_series_inner() -> StreamExecutorResult<()> {
        // 2021-03-31 23:59:50 UTC, i.e. ten seconds before the epoch base
        // used by the assertions below.
        let start_timestamp = Timestamptz::from_secs(1617235190).unwrap();
        // One-second step.
        let interval = Interval::from_millis(1000);

        let state_store = create_state_store();
        let (tx, mut now) = create_executor(
            NowMode::GenerateSeries {
                start_timestamp,
                interval,
            },
            &state_store,
        )
        .await;

        // Initial barrier.
        tx.send(Barrier::new_test_barrier(test_epoch(1000)))
            .unwrap();
        now.next_unwrap_ready_barrier()?;

        let chunk = now.next_unwrap_ready_chunk()?;
        // One row per second from 23:59:50 through 00:00:01, both inclusive.
        assert_eq!(chunk.cardinality(), 12);

        assert_eq!(
            now.next_unwrap_ready_watermark()?,
            Watermark::new(
                0,
                DataType::Timestamptz,
                ScalarImpl::Timestamptz("2021-04-01T00:00:01.000Z".parse().unwrap())
            )
        );

        // Two barriers sent back to back are handled as one batch: both are
        // forwarded, followed by a single chunk and a single watermark.
        tx.send(Barrier::with_prev_epoch_for_test(
            test_epoch(2000),
            test_epoch(1000),
        ))
        .unwrap();
        tx.send(Barrier::with_prev_epoch_for_test(
            test_epoch(3000),
            test_epoch(2000),
        ))
        .unwrap();

        now.next_unwrap_ready_barrier()?;
        now.next_unwrap_ready_barrier()?;

        let chunk = now.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk.compact(),
            StreamChunk::from_pretty(
                " TZ
                + 2021-04-01T00:00:02.000Z
                + 2021-04-01T00:00:03.000Z"
            )
        );

        let watermark = now.next_unwrap_ready_watermark()?;
        assert_eq!(
            watermark,
            Watermark::new(
                0,
                DataType::Timestamptz,
                ScalarImpl::Timestamptz("2021-04-01T00:00:03.000Z".parse().unwrap())
            )
        );

        // Recovery: recreate the executor on top of the same state store.
        drop((tx, now));
        let (tx, mut now) = create_executor(
            NowMode::GenerateSeries {
                start_timestamp,
                interval,
            },
            &state_store,
        )
        .await;

        tx.send(Barrier::with_prev_epoch_for_test(
            test_epoch(4000),
            test_epoch(2000),
        ))
        .unwrap();

        now.next_unwrap_ready_barrier()?;

        // The series continues from 00:00:01, so `02` and `03` are generated
        // again together with the new `04`.
        let chunk = now.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk.compact(),
            StreamChunk::from_pretty(
                " TZ
                + 2021-04-01T00:00:02.000Z
                + 2021-04-01T00:00:03.000Z
                + 2021-04-01T00:00:04.000Z"
            )
        );

        let watermark = now.next_unwrap_ready_watermark()?;
        assert_eq!(
            watermark,
            Watermark::new(
                0,
                DataType::Timestamptz,
                ScalarImpl::Timestamptz("2021-04-01T00:00:04.000Z".parse().unwrap())
            )
        );

        Ok(())
    }

    /// Creates a fresh in-memory state store; tests share one instance across
    /// executor restarts to simulate recovery.
    fn create_state_store() -> MemoryStateStore {
        MemoryStateStore::new()
    }

    /// Builds a `NowExecutor` in the given `mode` on top of `state_store` and
    /// returns the barrier sender together with the executor's output stream.
    async fn create_executor(
        mode: NowMode,
        state_store: &MemoryStateStore,
    ) -> (UnboundedSender<Barrier>, BoxedMessageStream) {
        // State table with a single unnamed `Timestamptz` column.
        let table_id = TableId::new(1);
        let column_descs = vec![ColumnDesc::unnamed(ColumnId::new(0), DataType::Timestamptz)];
        let state_table = StateTable::from_table_catalog(
            &gen_pbtable(table_id, column_descs, vec![], vec![], 0),
            state_store.clone(),
            None,
        )
        .await;

        let (sender, barrier_receiver) = unbounded_channel();

        let eval_error_report = ActorEvalErrorReport {
            actor_context: ActorContext::for_test(123),
            identity: "NowExecutor".into(),
        };
        let now_executor = NowExecutor::new(
            vec![DataType::Timestamptz],
            mode,
            eval_error_report,
            barrier_receiver,
            state_table,
        );
        (sender, now_executor.boxed().execute())
    }
}