risingwave_stream/from_proto/
now.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use anyhow::Context;
16use risingwave_common::system_param::reader::SystemParamsRead;
17use risingwave_common::types::{DataType, Datum};
18use risingwave_common::util::value_encoding::DatumFromProtoExt;
19use risingwave_pb::stream_plan::now_node::PbMode as PbNowMode;
20use risingwave_pb::stream_plan::{NowNode, PbNowModeGenerateSeries};
21use risingwave_storage::StateStore;
22
23use super::ExecutorBuilder;
24use crate::common::table::state_table::StateTableBuilder;
25use crate::error::StreamResult;
26use crate::executor::{Executor, NowExecutor, NowMode};
27use crate::task::ExecutorParams;
28
29pub struct NowExecutorBuilder;
30
31impl ExecutorBuilder for NowExecutorBuilder {
32    type Node = NowNode;
33
34    async fn new_boxed_executor(
35        params: ExecutorParams,
36        node: &NowNode,
37        store: impl StateStore,
38    ) -> StreamResult<Executor> {
39        let barrier_receiver = params
40            .local_barrier_manager
41            .subscribe_barrier(params.actor_context.id);
42
43        let mode = if let Ok(pb_mode) = node.get_mode() {
44            match pb_mode {
45                PbNowMode::UpdateCurrent(_) => NowMode::UpdateCurrent,
46                PbNowMode::GenerateSeries(PbNowModeGenerateSeries {
47                    start_timestamp,
48                    interval,
49                }) => {
50                    let start_timestamp = Datum::from_protobuf(
51                        start_timestamp.as_ref().unwrap(),
52                        &DataType::Timestamptz,
53                    )
54                    .context("`start_timestamp` field is not decodable")?
55                    .context("`start_timestamp` field should not be NULL")?
56                    .into_timestamptz();
57                    let interval =
58                        Datum::from_protobuf(interval.as_ref().unwrap(), &DataType::Interval)
59                            .context("`interval` field is not decodable")?
60                            .context("`interval` field should not be NULL")?
61                            .into_interval();
62                    NowMode::GenerateSeries {
63                        start_timestamp,
64                        interval,
65                    }
66                }
67            }
68        } else {
69            // default to `UpdateCurrent` for backward-compatibility
70            NowMode::UpdateCurrent
71        };
72
73        let state_table = StateTableBuilder::new(node.get_state_table()?, store, None)
74            .enable_preload_all_rows_by_config(&params.actor_context.streaming_config)
75            .build()
76            .await;
77        let barrier_interval_ms = params
78            .env
79            .system_params_manager_ref()
80            .get_params()
81            .load()
82            .barrier_interval_ms();
83        let progress_ratio = params.env.config().developer.now_progress_ratio;
84        let exec = NowExecutor::new(
85            params.info.schema.data_types(),
86            mode,
87            params.eval_error_report,
88            barrier_receiver,
89            state_table,
90            progress_ratio,
91            barrier_interval_ms,
92        );
93        Ok((params.info, exec).into())
94    }
95}