risingwave_frontend/optimizer/plan_node/
stream_now.rs1use pretty_xmlish::XmlNode;
16use risingwave_common::types::Datum;
17use risingwave_common::util::value_encoding::DatumToProtoExt;
18use risingwave_pb::stream_plan::now_node::PbMode as PbNowMode;
19use risingwave_pb::stream_plan::stream_node::NodeBody;
20use risingwave_pb::stream_plan::{PbNowModeGenerateSeries, PbNowModeUpdateCurrent, PbNowNode};
21
22use super::generic::{GenericPlanNode, Mode};
23use super::stream::prelude::*;
24use super::utils::{Distill, TableCatalogBuilder, childless_record};
25use super::{ExprRewritable, PlanBase, StreamNode, generic};
26use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
27use crate::optimizer::plan_node::utils::column_names_pretty;
28use crate::optimizer::property::{Distribution, Monotonicity, MonotonicityMap, WatermarkColumns};
29use crate::stream_fragmenter::BuildFragmentGraphState;
30
31#[derive(Debug, Clone, PartialEq, Eq, Hash)]
32pub struct StreamNow {
33 pub base: PlanBase<Stream>,
34 core: generic::Now,
35}
36
37impl StreamNow {
38 pub fn new(core: generic::Now) -> Self {
39 let mut watermark_columns = WatermarkColumns::new();
40 watermark_columns.insert(0, core.ctx().next_watermark_group_id()); let mut columns_monotonicity = MonotonicityMap::new();
43 columns_monotonicity.insert(0, Monotonicity::NonDecreasing);
44
45 let base = PlanBase::new_stream_with_core(
46 &core,
47 Distribution::Single,
48 if core.mode.is_generate_series() {
49 StreamKind::AppendOnly
50 } else {
51 StreamKind::Retract
52 },
53 core.mode.is_generate_series(), watermark_columns,
55 columns_monotonicity,
56 );
57 Self { base, core }
58 }
59}
60
61impl Distill for StreamNow {
62 fn distill<'a>(&self) -> XmlNode<'a> {
63 let vec = if self.base.ctx().is_explain_verbose() {
64 vec![("output", column_names_pretty(self.schema()))]
65 } else {
66 vec![]
67 };
68
69 childless_record("StreamNow", vec)
70 }
71}
72
73impl_plan_tree_node_for_leaf! { Stream, StreamNow }
74
75impl StreamNode for StreamNow {
76 fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> NodeBody {
77 let schema = self.base.schema();
78 let mut internal_table_catalog_builder = TableCatalogBuilder::default();
79 schema.fields().iter().for_each(|field| {
80 internal_table_catalog_builder.add_column(field);
81 });
82
83 let table_catalog = internal_table_catalog_builder
85 .build(vec![], 0)
86 .with_id(state.gen_table_id_wrapped());
87
88 NodeBody::Now(Box::new(PbNowNode {
89 state_table: Some(table_catalog.to_internal_table_prost()),
90 mode: Some(match &self.core.mode {
91 Mode::UpdateCurrent => PbNowMode::UpdateCurrent(PbNowModeUpdateCurrent {}),
92 Mode::GenerateSeries {
93 start_timestamp,
94 interval,
95 } => PbNowMode::GenerateSeries(PbNowModeGenerateSeries {
96 start_timestamp: Some(Datum::Some((*start_timestamp).into()).to_protobuf()),
97 interval: Some(Datum::Some((*interval).into()).to_protobuf()),
98 }),
99 }),
100 }))
101 }
102}
103
104impl ExprRewritable<Stream> for StreamNow {}
105
106impl ExprVisitable for StreamNow {}