risingwave_frontend/optimizer/plan_node/
stream_now.rsuse fixedbitset::FixedBitSet;
use pretty_xmlish::XmlNode;
use risingwave_common::types::Datum;
use risingwave_common::util::value_encoding::DatumToProtoExt;
use risingwave_pb::stream_plan::now_node::PbMode as PbNowMode;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{PbNowModeGenerateSeries, PbNowModeUpdateCurrent, PbNowNode};
use super::generic::Mode;
use super::stream::prelude::*;
use super::utils::{childless_record, Distill, TableCatalogBuilder};
use super::{generic, ExprRewritable, PlanBase, StreamNode};
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::plan_node::utils::column_names_pretty;
use crate::optimizer::property::{Distribution, Monotonicity, MonotonicityMap};
use crate::stream_fragmenter::BuildFragmentGraphState;
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamNow {
pub base: PlanBase<Stream>,
core: generic::Now,
}
impl StreamNow {
pub fn new(core: generic::Now) -> Self {
let mut watermark_columns = FixedBitSet::with_capacity(1);
watermark_columns.set(0, true);
let mut columns_monotonicity = MonotonicityMap::new();
columns_monotonicity.insert(0, Monotonicity::NonDecreasing);
let base = PlanBase::new_stream_with_core(
&core,
Distribution::Single,
core.mode.is_generate_series(), core.mode.is_generate_series(), watermark_columns,
columns_monotonicity,
);
Self { base, core }
}
}
impl Distill for StreamNow {
fn distill<'a>(&self) -> XmlNode<'a> {
let vec = if self.base.ctx().is_explain_verbose() {
vec![("output", column_names_pretty(self.schema()))]
} else {
vec![]
};
childless_record("StreamNow", vec)
}
}
impl_plan_tree_node_for_leaf! { StreamNow }
impl StreamNode for StreamNow {
fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> NodeBody {
let schema = self.base.schema();
let dist_keys = self.base.distribution().dist_column_indices().to_vec();
let mut internal_table_catalog_builder = TableCatalogBuilder::default();
schema.fields().iter().for_each(|field| {
internal_table_catalog_builder.add_column(field);
});
let table_catalog = internal_table_catalog_builder
.build(dist_keys, 0)
.with_id(state.gen_table_id_wrapped());
NodeBody::Now(PbNowNode {
state_table: Some(table_catalog.to_internal_table_prost()),
mode: Some(match &self.core.mode {
Mode::UpdateCurrent => PbNowMode::UpdateCurrent(PbNowModeUpdateCurrent {}),
Mode::GenerateSeries {
start_timestamp,
interval,
} => PbNowMode::GenerateSeries(PbNowModeGenerateSeries {
start_timestamp: Some(Datum::Some((*start_timestamp).into()).to_protobuf()),
interval: Some(Datum::Some((*interval).into()).to_protobuf()),
}),
}),
})
}
}
impl ExprRewritable for StreamNow {}
impl ExprVisitable for StreamNow {}