// risingwave_frontend/optimizer/plan_node/generic/now.rs
use educe::Educe;
use enum_as_inner::EnumAsInner;
use pretty_xmlish::{Pretty, Str, XmlNode};
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::types::{DataType, Interval, Timestamptz};
use super::{DistillUnit, GenericPlanNode};
use crate::optimizer::plan_node::utils::childless_record;
use crate::optimizer::property::FunctionalDependencySet;
use crate::OptimizerContextRef;
/// Generic plan-node core for the `Now` operator.
///
/// Holds the optimizer context together with the [`Mode`] that selects how the
/// node behaves. Equality and hashing are derived through `educe` and consider
/// only `mode`; `ctx` is explicitly ignored for both.
#[derive(Debug, Clone, Educe)]
#[educe(PartialEq, Eq, Hash)]
pub struct Now {
// Excluded from `PartialEq` and `Hash` by the two attributes below, so two
// nodes with the same `mode` compare equal regardless of their context.
#[educe(PartialEq(ignore))]
#[educe(Hash(ignore))]
pub ctx: OptimizerContextRef,
/// Operating mode of this node; see [`Mode`].
pub mode: Mode,
}
/// Operating mode of a [`Now`] node.
///
/// `EnumAsInner` derives per-variant helper methods; `is_update_current()` is
/// the one used in this file.
#[derive(Debug, Clone, PartialEq, Eq, Hash, EnumAsInner)]
pub enum Mode {
/// The output column is named `"now"` and the stream key is empty.
// NOTE(review): presumably emits a single row holding the current
// timestamp that is updated in place — confirm against the executor.
UpdateCurrent,
/// The output column is named `"ts"` and is itself the stream key.
// NOTE(review): presumably emits a series of timestamps beginning at
// `start_timestamp` and stepping by `interval` — confirm against the executor.
GenerateSeries {
start_timestamp: Timestamptz,
interval: Interval,
},
}
impl GenericPlanNode for Now {
fn functional_dependency(&self) -> crate::optimizer::property::FunctionalDependencySet {
FunctionalDependencySet::new(1) }
fn schema(&self) -> risingwave_common::catalog::Schema {
Schema::new(vec![Field {
data_type: DataType::Timestamptz,
name: String::from(if self.mode.is_update_current() {
"now"
} else {
"ts"
}),
sub_fields: vec![],
type_name: String::default(),
}])
}
fn stream_key(&self) -> Option<Vec<usize>> {
match self.mode {
Mode::UpdateCurrent => Some(vec![]),
Mode::GenerateSeries { .. } => Some(vec![0]),
}
}
fn ctx(&self) -> OptimizerContextRef {
self.ctx.clone()
}
}
impl Now {
pub fn update_current(ctx: OptimizerContextRef) -> Self {
Self::new_inner(ctx, Mode::UpdateCurrent)
}
pub fn generate_series(
ctx: OptimizerContextRef,
start_timestamp: Timestamptz,
interval: Interval,
) -> Self {
Self::new_inner(
ctx,
Mode::GenerateSeries {
start_timestamp,
interval,
},
)
}
fn new_inner(ctx: OptimizerContextRef, mode: Mode) -> Self {
Self { ctx, mode }
}
}
impl DistillUnit for Now {
    /// Renders this node as a childless record called `name` with a single
    /// `"mode"` field holding the `Debug` representation of `self.mode`.
    fn distill_with_name<'a>(&self, name: impl Into<Str<'a>>) -> XmlNode<'a> {
        let mode_field = ("mode", Pretty::debug(&self.mode));
        let fields = vec![mode_field];
        childless_record(name, fields)
    }
}