risingwave_stream/from_proto/
now.rsuse anyhow::Context;
use risingwave_common::types::{DataType, Datum};
use risingwave_common::util::value_encoding::DatumFromProtoExt;
use risingwave_pb::stream_plan::now_node::PbMode as PbNowMode;
use risingwave_pb::stream_plan::{NowNode, PbNowModeGenerateSeries};
use risingwave_storage::StateStore;
use super::ExecutorBuilder;
use crate::common::table::state_table::StateTable;
use crate::error::StreamResult;
use crate::executor::{Executor, NowExecutor, NowMode};
use crate::task::ExecutorParams;
pub struct NowExecutorBuilder;
impl ExecutorBuilder for NowExecutorBuilder {
type Node = NowNode;
async fn new_boxed_executor(
params: ExecutorParams,
node: &NowNode,
store: impl StateStore,
) -> StreamResult<Executor> {
let barrier_receiver = params
.shared_context
.local_barrier_manager
.subscribe_barrier(params.actor_context.id);
let mode = if let Ok(pb_mode) = node.get_mode() {
match pb_mode {
PbNowMode::UpdateCurrent(_) => NowMode::UpdateCurrent,
PbNowMode::GenerateSeries(PbNowModeGenerateSeries {
start_timestamp,
interval,
}) => {
let start_timestamp = Datum::from_protobuf(
start_timestamp.as_ref().unwrap(),
&DataType::Timestamptz,
)
.context("`start_timestamp` field is not decodable")?
.context("`start_timestamp` field should not be NULL")?
.into_timestamptz();
let interval =
Datum::from_protobuf(interval.as_ref().unwrap(), &DataType::Interval)
.context("`interval` field is not decodable")?
.context("`interval` field should not be NULL")?
.into_interval();
NowMode::GenerateSeries {
start_timestamp,
interval,
}
}
}
} else {
NowMode::UpdateCurrent
};
let state_table =
StateTable::from_table_catalog(node.get_state_table()?, store, None).await;
let exec = NowExecutor::new(
params.info.schema.data_types(),
mode,
params.eval_error_report,
barrier_receiver,
state_table,
);
Ok((params.info, exec).into())
}
}