risingwave_frontend/optimizer/plan_node/
stream_now.rs1use pretty_xmlish::XmlNode;
16use risingwave_common::types::Datum;
17use risingwave_common::util::value_encoding::DatumToProtoExt;
18use risingwave_pb::stream_plan::now_node::PbMode as PbNowMode;
19use risingwave_pb::stream_plan::stream_node::NodeBody;
20use risingwave_pb::stream_plan::{PbNowModeGenerateSeries, PbNowModeUpdateCurrent, PbNowNode};
21
22use super::generic::{GenericPlanNode, Mode};
23use super::stream::prelude::*;
24use super::utils::{Distill, TableCatalogBuilder, childless_record};
25use super::{ExprRewritable, PlanBase, StreamNode, generic};
26use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
27use crate::optimizer::plan_node::utils::column_names_pretty;
28use crate::optimizer::property::{Distribution, Monotonicity, MonotonicityMap, WatermarkColumns};
29use crate::stream_fragmenter::BuildFragmentGraphState;
30
31#[derive(Debug, Clone, PartialEq, Eq, Hash)]
32pub struct StreamNow {
33 pub base: PlanBase<Stream>,
34 core: generic::Now,
35}
36
37impl StreamNow {
38 pub fn new(core: generic::Now) -> Self {
39 let mut watermark_columns = WatermarkColumns::new();
40 watermark_columns.insert(0, core.ctx().next_watermark_group_id()); let mut columns_monotonicity = MonotonicityMap::new();
43 columns_monotonicity.insert(0, Monotonicity::NonDecreasing);
44
45 let base = PlanBase::new_stream_with_core(
46 &core,
47 Distribution::Single,
48 core.mode.is_generate_series(), core.mode.is_generate_series(), watermark_columns,
51 columns_monotonicity,
52 );
53 Self { base, core }
54 }
55}
56
57impl Distill for StreamNow {
58 fn distill<'a>(&self) -> XmlNode<'a> {
59 let vec = if self.base.ctx().is_explain_verbose() {
60 vec![("output", column_names_pretty(self.schema()))]
61 } else {
62 vec![]
63 };
64
65 childless_record("StreamNow", vec)
66 }
67}
68
69impl_plan_tree_node_for_leaf! { StreamNow }
70
71impl StreamNode for StreamNow {
72 fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> NodeBody {
73 let schema = self.base.schema();
74 let dist_keys = self.base.distribution().dist_column_indices().to_vec();
75 let mut internal_table_catalog_builder = TableCatalogBuilder::default();
76 schema.fields().iter().for_each(|field| {
77 internal_table_catalog_builder.add_column(field);
78 });
79
80 let table_catalog = internal_table_catalog_builder
81 .build(dist_keys, 0)
82 .with_id(state.gen_table_id_wrapped());
83 NodeBody::Now(Box::new(PbNowNode {
84 state_table: Some(table_catalog.to_internal_table_prost()),
85 mode: Some(match &self.core.mode {
86 Mode::UpdateCurrent => PbNowMode::UpdateCurrent(PbNowModeUpdateCurrent {}),
87 Mode::GenerateSeries {
88 start_timestamp,
89 interval,
90 } => PbNowMode::GenerateSeries(PbNowModeGenerateSeries {
91 start_timestamp: Some(Datum::Some((*start_timestamp).into()).to_protobuf()),
92 interval: Some(Datum::Some((*interval).into()).to_protobuf()),
93 }),
94 }),
95 }))
96 }
97}
98
99impl ExprRewritable for StreamNow {}
100
101impl ExprVisitable for StreamNow {}