risingwave_stream/executor/
now.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use itertools::Itertools;
16use risingwave_common::array::Op;
17use risingwave_common::row;
18use risingwave_common::types::{DefaultOrdered, Interval, Timestamptz, ToDatumRef};
19use risingwave_expr::capture_context;
20use risingwave_expr::expr::{
21    EvalErrorReport, ExpressionBoxExt, InputRefExpression, LiteralExpression, NonStrictExpression,
22    build_func_non_strict,
23};
24use risingwave_expr::expr_context::TIME_ZONE;
25use tokio::sync::mpsc::UnboundedReceiver;
26use tokio_stream::wrappers::UnboundedReceiverStream;
27
28use crate::executor::prelude::*;
29use crate::task::ActorEvalErrorReport;
30
/// Executor that produces timestamp rows driven purely by barriers.
///
/// It holds no input executor: everything it yields is derived from the barriers received on
/// `barrier_receiver`, according to `mode`.
pub struct NowExecutor<S: StateStore> {
    /// Column types of the emitted chunks.
    data_types: Vec<DataType>,

    /// How timestamps are produced; see [`NowMode`].
    mode: NowMode,
    /// Error reporter handed to the interval-addition expression in `GenerateSeries` mode.
    eval_error_report: ActorEvalErrorReport,

    /// Receiver of barrier channel.
    barrier_receiver: UnboundedReceiver<Barrier>,

    /// One-value state table persisting the last timestamp sent downstream.
    state_table: StateTable<S>,
}
42
43pub enum NowMode {
44    /// Emit current timestamp on startup, update it on barrier.
45    UpdateCurrent,
46    /// Generate a series of timestamps starting from `start_timestamp` with `interval`.
47    /// Keep generating new timestamps on barrier.
48    GenerateSeries {
49        start_timestamp: Timestamptz,
50        interval: Interval,
51    },
52}
53
/// Per-mode runtime state built once at the start of `execute_inner`; its variants mirror
/// [`NowMode`].
enum ModeVars {
    /// `UpdateCurrent` needs no extra state.
    UpdateCurrent,
    /// Runtime state for `GenerateSeries`.
    GenerateSeries {
        /// Accumulates generated timestamp rows until they are yielded as a chunk.
        chunk_builder: StreamChunkBuilder,
        /// Expression evaluated on a one-column row holding the previous timestamp to obtain
        /// the next one (built by `build_add_interval_expr_captured`).
        add_interval_expr: NonStrictExpression,
    },
}
61
impl<S: StateStore> NowExecutor<S> {
    /// Creates a `NowExecutor`; this only stores the arguments.
    ///
    /// * `data_types` - column types of the emitted chunks.
    /// * `mode` - how timestamps are produced, see [`NowMode`].
    /// * `eval_error_report` - error reporter for the interval-addition expression.
    /// * `barrier_receiver` - channel on which barriers arrive.
    /// * `state_table` - one-value table persisting the last emitted timestamp.
    pub fn new(
        data_types: Vec<DataType>,
        mode: NowMode,
        eval_error_report: ActorEvalErrorReport,
        barrier_receiver: UnboundedReceiver<Barrier>,
        state_table: StateTable<S>,
    ) -> Self {
        Self {
            data_types,
            mode,
            eval_error_report,
            barrier_receiver,
            state_table,
        }
    }

    /// Main loop: turns the barrier channel into barriers, timestamp chunks and watermarks.
    ///
    /// For every batch of barriers taken from the channel (at most `MAX_MERGE_BARRIER_SIZE`):
    /// 1. each barrier is forwarded downstream; the first barrier ever seen initializes the
    ///    state table and restores `last_timestamp`, later ones commit the state table first;
    /// 2. if the executor is paused after the batch, nothing else is emitted;
    /// 3. otherwise chunks are emitted according to the mode, using the timestamp of the last
    ///    barrier in the batch (`GenerateSeries` may emit no chunk if no step is due);
    /// 4. a watermark on column 0 carrying that timestamp is emitted.
    ///
    /// # Errors
    ///
    /// Yields an error if building the interval expression fails or if a state table
    /// operation (`init_epoch`, read, commit) fails.
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute_inner(self) {
        let Self {
            data_types,
            mode,
            eval_error_report,
            barrier_receiver,
            mut state_table,
        } = self;

        let max_chunk_size = crate::config::chunk_size();

        // Whether the executor is paused.
        let mut paused = false;
        // The last timestamp **sent** to the downstream.
        let mut last_timestamp: Datum = None;

        // Whether the first barrier is handled and `last_timestamp` is initialized.
        let mut initialized = false;

        // Build the per-mode runtime state once, up front.
        let mut mode_vars = match &mode {
            NowMode::UpdateCurrent => ModeVars::UpdateCurrent,
            NowMode::GenerateSeries { interval, .. } => {
                // in most cases there won't be more than one row except for the first time
                let chunk_builder = StreamChunkBuilder::unlimited(data_types.clone(), Some(1));
                let add_interval_expr =
                    build_add_interval_expr_captured(*interval, eval_error_report)?;
                ModeVars::GenerateSeries {
                    chunk_builder,
                    add_interval_expr,
                }
            }
        };

        // Upper bound on the number of barriers handled in one batch.
        const MAX_MERGE_BARRIER_SIZE: usize = 64;

        #[for_await]
        for barriers in
            UnboundedReceiverStream::new(barrier_receiver).ready_chunks(MAX_MERGE_BARRIER_SIZE)
        {
            // Timestamp of the most recent barrier of this batch; overwritten per barrier.
            let mut curr_timestamp = None;
            if barriers.len() > 1 {
                warn!(
                    "handle multiple barriers at once in now executor: {}",
                    barriers.len()
                );
            }
            for barrier in barriers {
                // Read everything needed from the barrier before it is moved into the
                // yielded message below.
                let new_timestamp = Some(barrier.get_curr_epoch().as_scalar());
                let pause_mutation =
                    barrier
                        .mutation
                        .as_deref()
                        .and_then(|mutation| match mutation {
                            Mutation::Pause => Some(true),
                            Mutation::Resume => Some(false),
                            _ => None,
                        });

                if !initialized {
                    let first_epoch = barrier.epoch;
                    let is_pause_on_startup = barrier.is_pause_on_startup();
                    // The first barrier is forwarded before the state table is initialized.
                    yield Message::Barrier(barrier);
                    // Handle the initial barrier.
                    state_table.init_epoch(first_epoch).await?;
                    // Restore the previously persisted timestamp, if any.
                    last_timestamp = state_table.get_from_one_value_table().await?;
                    paused = is_pause_on_startup;
                    initialized = true;
                } else {
                    // Later barriers: commit the pending state first, then forward the barrier.
                    state_table
                        .commit_assert_no_update_vnode_bitmap(barrier.epoch)
                        .await?;
                    yield Message::Barrier(barrier);
                }

                // Extract timestamp from the current epoch.
                curr_timestamp = new_timestamp;

                // Update paused state.
                if let Some(pause_mutation) = pause_mutation {
                    paused = pause_mutation;
                }
            }

            // Do not yield any messages if paused.
            if paused {
                continue;
            }

            match (&mode, &mut mode_vars) {
                (NowMode::UpdateCurrent, ModeVars::UpdateCurrent) => {
                    // Replace the previous timestamp (if any) with the current one, both in
                    // the state table and in the emitted chunk.
                    let chunk = if last_timestamp.is_some() {
                        let last_row = row::once(&last_timestamp);
                        let row = row::once(&curr_timestamp);
                        state_table.update(last_row, row);

                        StreamChunk::from_rows(
                            &[(Op::Delete, last_row), (Op::Insert, row)],
                            &data_types,
                        )
                    } else {
                        // Nothing emitted or persisted yet: plain insert.
                        let row = row::once(&curr_timestamp);
                        state_table.insert(row);

                        StreamChunk::from_rows(&[(Op::Insert, row)], &data_types)
                    };

                    yield Message::Chunk(chunk);
                    last_timestamp.clone_from(&curr_timestamp)
                }
                (
                    &NowMode::GenerateSeries {
                        start_timestamp, ..
                    },
                    &mut ModeVars::GenerateSeries {
                        ref mut chunk_builder,
                        ref add_interval_expr,
                    },
                ) => {
                    if last_timestamp.is_none() {
                        // We haven't emitted any timestamp yet. Let's emit the first one and populate the state table.
                        let first = Some(start_timestamp.into());
                        let first_row = row::once(&first);
                        let _ = chunk_builder.append_row(Op::Insert, first_row);
                        state_table.insert(first_row);
                        last_timestamp = first;
                    }

                    // Now let's step through the timestamps from the last timestamp to the current timestamp.
                    // We use `last_row` as a temporary cursor to track the progress, and won't touch `last_timestamp`
                    // until the end of the loop, so that `last_timestamp` is always synced with the state table.
                    let mut last_row = OwnedRow::new(vec![last_timestamp.clone()]);

                    loop {
                        if chunk_builder.size() >= max_chunk_size {
                            // Manually yield the chunk when size exceeds the limit. We don't want to use chunk builder
                            // with limited size here because the initial capacity can be too large for most cases.
                            // Basically only the first several chunks can potentially exceed the `max_chunk_size`.
                            if let Some(chunk) = chunk_builder.take() {
                                yield Message::Chunk(chunk);
                            }
                        }

                        // Candidate next timestamp: cursor + interval.
                        let next = add_interval_expr.eval_row_infallible(&last_row).await;
                        if DefaultOrdered(next.to_datum_ref())
                            > DefaultOrdered(curr_timestamp.to_datum_ref())
                        {
                            // We only increase the timestamp to the current timestamp.
                            break;
                        }

                        let next_row = OwnedRow::new(vec![next]);
                        let _ = chunk_builder.append_row(Op::Insert, &next_row);
                        last_row = next_row;
                    }

                    // Flush whatever is left in the builder (may be nothing).
                    if let Some(chunk) = chunk_builder.take() {
                        yield Message::Chunk(chunk);
                    }

                    // Update the last timestamp.
                    state_table.update(row::once(&last_timestamp), &last_row);
                    // `last_row` was built with exactly one column, so `exactly_one` holds.
                    last_timestamp = last_row
                        .into_inner()
                        .into_vec()
                        .into_iter()
                        .exactly_one()
                        .unwrap();
                }
                // `mode_vars` was derived from `mode` above, so the variants always pair up.
                _ => unreachable!(),
            }

            // `curr_timestamp` was set by the barrier loop above; this relies on the batch
            // containing at least one barrier.
            yield Message::Watermark(Watermark::new(
                0,
                DataType::Timestamptz,
                curr_timestamp.unwrap(),
            ));
        }
    }
}
259
260impl<S: StateStore> Execute for NowExecutor<S> {
261    fn execute(self: Box<Self>) -> BoxedMessageStream {
262        self.execute_inner().boxed()
263    }
264}
265
266#[capture_context(TIME_ZONE)]
267pub fn build_add_interval_expr(
268    time_zone: &str,
269    interval: Interval,
270    eval_error_report: impl EvalErrorReport + 'static,
271) -> risingwave_expr::Result<NonStrictExpression> {
272    let timestamptz_input = InputRefExpression::new(DataType::Timestamptz, 0);
273    let interval = LiteralExpression::new(DataType::Interval, Some(interval.into()));
274    let time_zone = LiteralExpression::new(DataType::Varchar, Some(time_zone.into()));
275
276    use risingwave_pb::expr::expr_node::PbType as PbExprType;
277    build_func_non_strict(
278        PbExprType::AddWithTimeZone,
279        DataType::Timestamptz,
280        vec![
281            timestamptz_input.boxed(),
282            interval.boxed(),
283            time_zone.boxed(),
284        ],
285        eval_error_report,
286    )
287}
288
#[cfg(test)]
mod tests {
    use risingwave_common::catalog::{ColumnDesc, ColumnId, TableId};
    use risingwave_common::test_prelude::StreamChunkTestExt;
    use risingwave_common::types::test_utils::IntervalTestExt;
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_storage::memory::MemoryStateStore;
    use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};

    use super::*;
    use crate::common::table::test_utils::gen_pbtable;
    use crate::executor::test_utils::StreamExecutorTestExt;

    /// `UpdateCurrent` mode end to end: initial insert, update on the next barrier, recovery
    /// from the same state store, and recovery with a `Pause` mutation followed by `Resume`.
    #[tokio::test]
    async fn test_now() -> StreamExecutorResult<()> {
        let state_store = create_state_store();
        let (tx, mut now) = create_executor(NowMode::UpdateCurrent, &state_store).await;

        // Init barrier
        tx.send(Barrier::new_test_barrier(test_epoch(1))).unwrap();

        // Consume the barrier
        now.next_unwrap_ready_barrier()?;

        // Consume the data chunk
        let chunk_msg = now.next_unwrap_ready_chunk()?;

        assert_eq!(
            chunk_msg.compact(),
            StreamChunk::from_pretty(
                " TZ
                + 2021-04-01T00:00:00.001Z"
            )
        );

        // Consume the watermark
        let watermark = now.next_unwrap_ready_watermark()?;

        assert_eq!(
            watermark,
            Watermark::new(
                0,
                DataType::Timestamptz,
                ScalarImpl::Timestamptz("2021-04-01T00:00:00.001Z".parse().unwrap())
            )
        );

        tx.send(Barrier::with_prev_epoch_for_test(
            test_epoch(2),
            test_epoch(1),
        ))
        .unwrap();

        // Consume the barrier
        now.next_unwrap_ready_barrier()?;

        // Consume the data chunk
        let chunk_msg = now.next_unwrap_ready_chunk()?;

        assert_eq!(
            chunk_msg.compact(),
            StreamChunk::from_pretty(
                " TZ
                - 2021-04-01T00:00:00.001Z
                + 2021-04-01T00:00:00.002Z"
            )
        );

        // Consume the watermark
        let watermark = now.next_unwrap_ready_watermark()?;

        assert_eq!(
            watermark,
            Watermark::new(
                0,
                DataType::Timestamptz,
                ScalarImpl::Timestamptz("2021-04-01T00:00:00.002Z".parse().unwrap())
            )
        );

        // No more messages until the next barrier
        now.next_unwrap_pending();

        // Recovery
        drop((tx, now));
        let (tx, mut now) = create_executor(NowMode::UpdateCurrent, &state_store).await;
        tx.send(Barrier::with_prev_epoch_for_test(
            test_epoch(3),
            test_epoch(1),
        ))
        .unwrap();

        // Consume the barrier
        now.next_unwrap_ready_barrier()?;

        // Consume the data chunk
        let chunk_msg = now.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk_msg.compact(),
            // the last chunk was not checkpointed so the deleted old value should be `001`
            StreamChunk::from_pretty(
                " TZ
                - 2021-04-01T00:00:00.001Z
                + 2021-04-01T00:00:00.003Z"
            )
        );

        // Consume the watermark
        let watermark = now.next_unwrap_ready_watermark()?;

        assert_eq!(
            watermark,
            Watermark::new(
                0,
                DataType::Timestamptz,
                ScalarImpl::Timestamptz("2021-04-01T00:00:00.003Z".parse().unwrap())
            )
        );

        // Recovery with paused
        drop((tx, now));
        let (tx, mut now) = create_executor(NowMode::UpdateCurrent, &state_store).await;
        tx.send(
            Barrier::with_prev_epoch_for_test(test_epoch(4), test_epoch(1))
                .with_mutation(Mutation::Pause),
        )
        .unwrap();

        // Consume the barrier
        now.next_unwrap_ready_barrier()?;

        // There should be no messages until `Resume`
        now.next_unwrap_pending();

        // Resume barrier
        tx.send(
            Barrier::with_prev_epoch_for_test(test_epoch(5), test_epoch(4))
                .with_mutation(Mutation::Resume),
        )
        .unwrap();

        // Consume the barrier
        now.next_unwrap_ready_barrier()?;

        // Consume the data chunk
        let chunk_msg = now.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk_msg.compact(),
            StreamChunk::from_pretty(
                " TZ
                - 2021-04-01T00:00:00.001Z
                + 2021-04-01T00:00:00.005Z"
            )
        );

        // Consume the watermark
        let watermark = now.next_unwrap_ready_watermark()?;

        assert_eq!(
            watermark,
            Watermark::new(
                0,
                DataType::Timestamptz,
                ScalarImpl::Timestamptz("2021-04-01T00:00:00.005Z".parse().unwrap())
            )
        );

        Ok(())
    }

    /// `UpdateCurrent` mode when the very first barrier carries `Pause`: nothing but the
    /// barrier is emitted until a `Resume` barrier arrives, whose epoch provides the timestamp.
    #[tokio::test]
    async fn test_now_start_with_paused() -> StreamExecutorResult<()> {
        let state_store = create_state_store();
        let (tx, mut now) = create_executor(NowMode::UpdateCurrent, &state_store).await;

        // Init barrier
        tx.send(Barrier::new_test_barrier(test_epoch(1)).with_mutation(Mutation::Pause))
            .unwrap();

        // Consume the barrier
        now.next_unwrap_ready_barrier()?;

        // There should be no messages until `Resume`
        now.next_unwrap_pending();

        // Resume barrier
        tx.send(
            Barrier::with_prev_epoch_for_test(test_epoch(2), test_epoch(1))
                .with_mutation(Mutation::Resume),
        )
        .unwrap();

        // Consume the barrier
        now.next_unwrap_ready_barrier()?;

        // Consume the data chunk
        let chunk_msg = now.next_unwrap_ready_chunk()?;

        assert_eq!(
            chunk_msg.compact(),
            StreamChunk::from_pretty(
                " TZ
                + 2021-04-01T00:00:00.002Z" // <- the timestamp is extracted from the current epoch
            )
        );

        // Consume the watermark
        let watermark = now.next_unwrap_ready_watermark()?;

        assert_eq!(
            watermark,
            Watermark::new(
                0,
                DataType::Timestamptz,
                ScalarImpl::Timestamptz("2021-04-01T00:00:00.002Z".parse().unwrap())
            )
        );

        // No more messages until the next barrier
        now.next_unwrap_pending();

        Ok(())
    }

    /// Runs the `GenerateSeries` scenario inside a `TIME_ZONE` scope set to `"UTC"`.
    #[tokio::test]
    async fn test_now_generate_series() -> StreamExecutorResult<()> {
        TIME_ZONE::scope("UTC".to_owned(), test_now_generate_series_inner()).await
    }

    /// `GenerateSeries` mode with a 1s interval: initial back-fill from `start_timestamp`,
    /// two barriers handled as one batch, then recovery from the same state store.
    async fn test_now_generate_series_inner() -> StreamExecutorResult<()> {
        let start_timestamp = Timestamptz::from_secs(1617235190).unwrap(); // 2021-03-31 23:59:50 UTC
        let interval = Interval::from_millis(1000); // 1s interval

        let state_store = create_state_store();
        let (tx, mut now) = create_executor(
            NowMode::GenerateSeries {
                start_timestamp,
                interval,
            },
            &state_store,
        )
        .await;

        // Init barrier
        tx.send(Barrier::new_test_barrier(test_epoch(1000)))
            .unwrap();
        now.next_unwrap_ready_barrier()?;

        // Initial timestamps
        let chunk = now.next_unwrap_ready_chunk()?;
        assert_eq!(chunk.cardinality(), 12); // seconds from 23:59:50 to 00:00:01 (inclusive)

        assert_eq!(
            now.next_unwrap_ready_watermark()?,
            Watermark::new(
                0,
                DataType::Timestamptz,
                ScalarImpl::Timestamptz("2021-04-01T00:00:01.000Z".parse().unwrap())
            )
        );

        // Two barriers sent back to back: both are forwarded before a single chunk follows.
        tx.send(Barrier::with_prev_epoch_for_test(
            test_epoch(2000),
            test_epoch(1000),
        ))
        .unwrap();
        tx.send(Barrier::with_prev_epoch_for_test(
            test_epoch(3000),
            test_epoch(2000),
        ))
        .unwrap();

        now.next_unwrap_ready_barrier()?;
        now.next_unwrap_ready_barrier()?;

        let chunk = now.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk.compact(),
            StreamChunk::from_pretty(
                " TZ
                + 2021-04-01T00:00:02.000Z
                + 2021-04-01T00:00:03.000Z"
            )
        );

        let watermark = now.next_unwrap_ready_watermark()?;
        assert_eq!(
            watermark,
            Watermark::new(
                0,
                DataType::Timestamptz,
                ScalarImpl::Timestamptz("2021-04-01T00:00:03.000Z".parse().unwrap())
            )
        );

        // Recovery
        drop((tx, now));
        let (tx, mut now) = create_executor(
            NowMode::GenerateSeries {
                start_timestamp,
                interval,
            },
            &state_store,
        )
        .await;

        tx.send(Barrier::with_prev_epoch_for_test(
            test_epoch(4000),
            test_epoch(2000),
        ))
        .unwrap();

        now.next_unwrap_ready_barrier()?;

        // The series resumes after `00:00:01`, so `02` and `03` are emitted again.
        let chunk = now.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk.compact(),
            StreamChunk::from_pretty(
                " TZ
                + 2021-04-01T00:00:02.000Z
                + 2021-04-01T00:00:03.000Z
                + 2021-04-01T00:00:04.000Z"
            )
        );

        let watermark = now.next_unwrap_ready_watermark()?;
        assert_eq!(
            watermark,
            Watermark::new(
                0,
                DataType::Timestamptz,
                ScalarImpl::Timestamptz("2021-04-01T00:00:04.000Z".parse().unwrap())
            )
        );

        Ok(())
    }

    /// Creates a fresh in-memory state store; tests reuse one store across executor
    /// re-creations to simulate recovery.
    fn create_state_store() -> MemoryStateStore {
        MemoryStateStore::new()
    }

    /// Builds a `NowExecutor` in the given `mode` over a one-column `Timestamptz` state table
    /// (table id 1) backed by `state_store`.
    ///
    /// Returns the barrier sender together with the executor's message stream.
    async fn create_executor(
        mode: NowMode,
        state_store: &MemoryStateStore,
    ) -> (UnboundedSender<Barrier>, BoxedMessageStream) {
        let table_id = TableId::new(1);
        let column_descs = vec![ColumnDesc::unnamed(ColumnId::new(0), DataType::Timestamptz)];
        let state_table = StateTable::from_table_catalog(
            &gen_pbtable(table_id, column_descs, vec![], vec![], 0),
            state_store.clone(),
            None,
        )
        .await;

        let (sender, barrier_receiver) = unbounded_channel();

        let eval_error_report = ActorEvalErrorReport {
            actor_context: ActorContext::for_test(123),
            identity: "NowExecutor".into(),
        };
        let now_executor = NowExecutor::new(
            vec![DataType::Timestamptz],
            mode,
            eval_error_report,
            barrier_receiver,
            state_table,
        );
        (sender, now_executor.boxed().execute())
    }
}