Skip to main content

risingwave_stream/from_proto/
now.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use anyhow::Context;
16use risingwave_common::system_param::reader::SystemParamsRead;
17use risingwave_common::types::{DataType, Datum};
18use risingwave_common::util::value_encoding::DatumFromProtoExt;
19use risingwave_pb::stream_plan::now_node::PbMode as PbNowMode;
20use risingwave_pb::stream_plan::{NowNode, PbNowModeGenerateSeries};
21use risingwave_storage::StateStore;
22
23use super::ExecutorBuilder;
24use crate::common::table::state_table::StateTableBuilder;
25use crate::error::StreamResult;
26use crate::executor::{Executor, NowExecutor, NowMode};
27use crate::task::ExecutorParams;
28
29pub struct NowExecutorBuilder;
30
31impl_stream_node_body!(Now(NowNode) => NowExecutorBuilder);
32
33impl ExecutorBuilder for NowExecutorBuilder {
34    type Node = NowNode;
35
36    async fn new_boxed_executor(
37        params: ExecutorParams,
38        node: &NowNode,
39        store: impl StateStore,
40    ) -> StreamResult<Executor> {
41        let barrier_receiver = params
42            .local_barrier_manager
43            .subscribe_barrier(params.actor_context.id);
44
45        let mode = if let Ok(pb_mode) = node.get_mode() {
46            match pb_mode {
47                PbNowMode::UpdateCurrent(_) => NowMode::UpdateCurrent,
48                PbNowMode::GenerateSeries(PbNowModeGenerateSeries {
49                    start_timestamp,
50                    interval,
51                }) => {
52                    let start_timestamp = Datum::from_protobuf(
53                        start_timestamp.as_ref().unwrap(),
54                        &DataType::Timestamptz,
55                    )
56                    .context("`start_timestamp` field is not decodable")?
57                    .context("`start_timestamp` field should not be NULL")?
58                    .into_timestamptz();
59                    let interval =
60                        Datum::from_protobuf(interval.as_ref().unwrap(), &DataType::Interval)
61                            .context("`interval` field is not decodable")?
62                            .context("`interval` field should not be NULL")?
63                            .into_interval();
64                    NowMode::GenerateSeries {
65                        start_timestamp,
66                        interval,
67                    }
68                }
69            }
70        } else {
71            // default to `UpdateCurrent` for backward-compatibility
72            NowMode::UpdateCurrent
73        };
74
75        let state_table = StateTableBuilder::new(node.get_state_table()?, store, None)
76            .enable_preload_all_rows_by_config(&params.config)
77            .build()
78            .await;
79        let barrier_interval_ms = params
80            .env
81            .system_params_manager_ref()
82            .get_params()
83            .load()
84            .barrier_interval_ms();
85        let progress_ratio = params.config.developer.now_progress_ratio;
86        let exec = NowExecutor::new(
87            params.info.schema.data_types(),
88            mode,
89            params.eval_error_report,
90            barrier_receiver,
91            state_table,
92            progress_ratio,
93            barrier_interval_ms,
94        );
95        Ok((params.info, exec).into())
96    }
97}