risingwave_stream/from_proto/
now.rs1use anyhow::Context;
16use risingwave_common::types::{DataType, Datum};
17use risingwave_common::util::value_encoding::DatumFromProtoExt;
18use risingwave_pb::stream_plan::now_node::PbMode as PbNowMode;
19use risingwave_pb::stream_plan::{NowNode, PbNowModeGenerateSeries};
20use risingwave_storage::StateStore;
21
22use super::ExecutorBuilder;
23use crate::common::table::state_table::StateTable;
24use crate::error::StreamResult;
25use crate::executor::{Executor, NowExecutor, NowMode};
26use crate::task::ExecutorParams;
27
/// Stateless builder type that implements [`ExecutorBuilder`] for [`NowNode`]
/// plan nodes, producing a [`NowExecutor`].
pub struct NowExecutorBuilder;
29
30impl ExecutorBuilder for NowExecutorBuilder {
31 type Node = NowNode;
32
33 async fn new_boxed_executor(
34 params: ExecutorParams,
35 node: &NowNode,
36 store: impl StateStore,
37 ) -> StreamResult<Executor> {
38 let barrier_receiver = params
39 .local_barrier_manager
40 .subscribe_barrier(params.actor_context.id);
41
42 let mode = if let Ok(pb_mode) = node.get_mode() {
43 match pb_mode {
44 PbNowMode::UpdateCurrent(_) => NowMode::UpdateCurrent,
45 PbNowMode::GenerateSeries(PbNowModeGenerateSeries {
46 start_timestamp,
47 interval,
48 }) => {
49 let start_timestamp = Datum::from_protobuf(
50 start_timestamp.as_ref().unwrap(),
51 &DataType::Timestamptz,
52 )
53 .context("`start_timestamp` field is not decodable")?
54 .context("`start_timestamp` field should not be NULL")?
55 .into_timestamptz();
56 let interval =
57 Datum::from_protobuf(interval.as_ref().unwrap(), &DataType::Interval)
58 .context("`interval` field is not decodable")?
59 .context("`interval` field should not be NULL")?
60 .into_interval();
61 NowMode::GenerateSeries {
62 start_timestamp,
63 interval,
64 }
65 }
66 }
67 } else {
68 NowMode::UpdateCurrent
70 };
71
72 let state_table =
73 StateTable::from_table_catalog(node.get_state_table()?, store, None).await;
74
75 let exec = NowExecutor::new(
76 params.info.schema.data_types(),
77 mode,
78 params.eval_error_report,
79 barrier_receiver,
80 state_table,
81 );
82 Ok((params.info, exec).into())
83 }
84}