risingwave_stream/from_proto/
now.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use anyhow::Context;
16use risingwave_common::types::{DataType, Datum};
17use risingwave_common::util::value_encoding::DatumFromProtoExt;
18use risingwave_pb::stream_plan::now_node::PbMode as PbNowMode;
19use risingwave_pb::stream_plan::{NowNode, PbNowModeGenerateSeries};
20use risingwave_storage::StateStore;
21
22use super::ExecutorBuilder;
23use crate::common::table::state_table::StateTable;
24use crate::error::StreamResult;
25use crate::executor::{Executor, NowExecutor, NowMode};
26use crate::task::ExecutorParams;
27
28pub struct NowExecutorBuilder;
29
30impl ExecutorBuilder for NowExecutorBuilder {
31    type Node = NowNode;
32
33    async fn new_boxed_executor(
34        params: ExecutorParams,
35        node: &NowNode,
36        store: impl StateStore,
37    ) -> StreamResult<Executor> {
38        let barrier_receiver = params
39            .local_barrier_manager
40            .subscribe_barrier(params.actor_context.id);
41
42        let mode = if let Ok(pb_mode) = node.get_mode() {
43            match pb_mode {
44                PbNowMode::UpdateCurrent(_) => NowMode::UpdateCurrent,
45                PbNowMode::GenerateSeries(PbNowModeGenerateSeries {
46                    start_timestamp,
47                    interval,
48                }) => {
49                    let start_timestamp = Datum::from_protobuf(
50                        start_timestamp.as_ref().unwrap(),
51                        &DataType::Timestamptz,
52                    )
53                    .context("`start_timestamp` field is not decodable")?
54                    .context("`start_timestamp` field should not be NULL")?
55                    .into_timestamptz();
56                    let interval =
57                        Datum::from_protobuf(interval.as_ref().unwrap(), &DataType::Interval)
58                            .context("`interval` field is not decodable")?
59                            .context("`interval` field should not be NULL")?
60                            .into_interval();
61                    NowMode::GenerateSeries {
62                        start_timestamp,
63                        interval,
64                    }
65                }
66            }
67        } else {
68            // default to `UpdateCurrent` for backward-compatibility
69            NowMode::UpdateCurrent
70        };
71
72        let state_table =
73            StateTable::from_table_catalog(node.get_state_table()?, store, None).await;
74
75        let exec = NowExecutor::new(
76            params.info.schema.data_types(),
77            mode,
78            params.eval_error_report,
79            barrier_receiver,
80            state_table,
81        );
82        Ok((params.info, exec).into())
83    }
84}