risingwave_stream/from_proto/
now.rs1use anyhow::Context;
16use risingwave_common::system_param::reader::SystemParamsRead;
17use risingwave_common::types::{DataType, Datum};
18use risingwave_common::util::value_encoding::DatumFromProtoExt;
19use risingwave_pb::stream_plan::now_node::PbMode as PbNowMode;
20use risingwave_pb::stream_plan::{NowNode, PbNowModeGenerateSeries};
21use risingwave_storage::StateStore;
22
23use super::ExecutorBuilder;
24use crate::common::table::state_table::StateTableBuilder;
25use crate::error::StreamResult;
26use crate::executor::{Executor, NowExecutor, NowMode};
27use crate::task::ExecutorParams;
28
/// Stateless builder that turns a `NowNode` plan node into a `NowExecutor`
/// through its [`ExecutorBuilder`] implementation.
pub struct NowExecutorBuilder;
30
31impl ExecutorBuilder for NowExecutorBuilder {
32 type Node = NowNode;
33
34 async fn new_boxed_executor(
35 params: ExecutorParams,
36 node: &NowNode,
37 store: impl StateStore,
38 ) -> StreamResult<Executor> {
39 let barrier_receiver = params
40 .local_barrier_manager
41 .subscribe_barrier(params.actor_context.id);
42
43 let mode = if let Ok(pb_mode) = node.get_mode() {
44 match pb_mode {
45 PbNowMode::UpdateCurrent(_) => NowMode::UpdateCurrent,
46 PbNowMode::GenerateSeries(PbNowModeGenerateSeries {
47 start_timestamp,
48 interval,
49 }) => {
50 let start_timestamp = Datum::from_protobuf(
51 start_timestamp.as_ref().unwrap(),
52 &DataType::Timestamptz,
53 )
54 .context("`start_timestamp` field is not decodable")?
55 .context("`start_timestamp` field should not be NULL")?
56 .into_timestamptz();
57 let interval =
58 Datum::from_protobuf(interval.as_ref().unwrap(), &DataType::Interval)
59 .context("`interval` field is not decodable")?
60 .context("`interval` field should not be NULL")?
61 .into_interval();
62 NowMode::GenerateSeries {
63 start_timestamp,
64 interval,
65 }
66 }
67 }
68 } else {
69 NowMode::UpdateCurrent
71 };
72
73 let state_table = StateTableBuilder::new(node.get_state_table()?, store, None)
74 .enable_preload_all_rows_by_config(¶ms.actor_context.streaming_config)
75 .build()
76 .await;
77 let barrier_interval_ms = params
78 .env
79 .system_params_manager_ref()
80 .get_params()
81 .load()
82 .barrier_interval_ms();
83 let progress_ratio = params.env.config().developer.now_progress_ratio;
84 let exec = NowExecutor::new(
85 params.info.schema.data_types(),
86 mode,
87 params.eval_error_report,
88 barrier_receiver,
89 state_table,
90 progress_ratio,
91 barrier_interval_ms,
92 );
93 Ok((params.info, exec).into())
94 }
95}