// risingwave_stream/from_proto/now.rs
use anyhow::Context;
16use risingwave_common::system_param::reader::SystemParamsRead;
17use risingwave_common::types::{DataType, Datum};
18use risingwave_common::util::value_encoding::DatumFromProtoExt;
19use risingwave_pb::stream_plan::now_node::PbMode as PbNowMode;
20use risingwave_pb::stream_plan::{NowNode, PbNowModeGenerateSeries};
21use risingwave_storage::StateStore;
22
23use super::ExecutorBuilder;
24use crate::common::table::state_table::StateTableBuilder;
25use crate::error::StreamResult;
26use crate::executor::{Executor, NowExecutor, NowMode};
27use crate::task::ExecutorParams;
28
/// Stateless builder whose [`ExecutorBuilder`] implementation turns a [`NowNode`] plan node
/// into a [`NowExecutor`].
pub struct NowExecutorBuilder;
30
// NOTE(review): `impl_stream_node_body!` is defined outside this file; presumably it ties the
// `Now` stream node body (`NowNode`) to `NowExecutorBuilder` — confirm against the macro.
impl_stream_node_body!(Now(NowNode) => NowExecutorBuilder);
32
33impl ExecutorBuilder for NowExecutorBuilder {
34 type Node = NowNode;
35
36 async fn new_boxed_executor(
37 params: ExecutorParams,
38 node: &NowNode,
39 store: impl StateStore,
40 ) -> StreamResult<Executor> {
41 let barrier_receiver = params
42 .local_barrier_manager
43 .subscribe_barrier(params.actor_context.id);
44
45 let mode = if let Ok(pb_mode) = node.get_mode() {
46 match pb_mode {
47 PbNowMode::UpdateCurrent(_) => NowMode::UpdateCurrent,
48 PbNowMode::GenerateSeries(PbNowModeGenerateSeries {
49 start_timestamp,
50 interval,
51 }) => {
52 let start_timestamp = Datum::from_protobuf(
53 start_timestamp.as_ref().unwrap(),
54 &DataType::Timestamptz,
55 )
56 .context("`start_timestamp` field is not decodable")?
57 .context("`start_timestamp` field should not be NULL")?
58 .into_timestamptz();
59 let interval =
60 Datum::from_protobuf(interval.as_ref().unwrap(), &DataType::Interval)
61 .context("`interval` field is not decodable")?
62 .context("`interval` field should not be NULL")?
63 .into_interval();
64 NowMode::GenerateSeries {
65 start_timestamp,
66 interval,
67 }
68 }
69 }
70 } else {
71 NowMode::UpdateCurrent
73 };
74
75 let state_table = StateTableBuilder::new(node.get_state_table()?, store, None)
76 .enable_preload_all_rows_by_config(¶ms.config)
77 .build()
78 .await;
79 let barrier_interval_ms = params
80 .env
81 .system_params_manager_ref()
82 .get_params()
83 .load()
84 .barrier_interval_ms();
85 let progress_ratio = params.config.developer.now_progress_ratio;
86 let exec = NowExecutor::new(
87 params.info.schema.data_types(),
88 mode,
89 params.eval_error_report,
90 barrier_receiver,
91 state_table,
92 progress_ratio,
93 barrier_interval_ms,
94 );
95 Ok((params.info, exec).into())
96 }
97}