risingwave_frontend/optimizer/plan_node/
stream_changelog.rsuse risingwave_pb::stream_plan::stream_node::PbNodeBody;
use risingwave_pb::stream_plan::ChangeLogNode;
use super::expr_visitable::ExprVisitable;
use super::stream::prelude::PhysicalPlanRef;
use super::stream::StreamPlanRef;
use super::utils::impl_distill_by_unit;
use super::{generic, ExprRewritable, PlanBase, PlanTreeNodeUnary, Stream, StreamNode};
use crate::optimizer::property::MonotonicityMap;
use crate::stream_fragmenter::BuildFragmentGraphState;
use crate::PlanRef;
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamChangeLog {
pub base: PlanBase<Stream>,
core: generic::ChangeLog<PlanRef>,
}
impl StreamChangeLog {
pub fn new(core: generic::ChangeLog<PlanRef>) -> Self {
let input = core.input.clone();
let dist = input.distribution().clone();
let input_len = input.schema().len();
let mut watermark_columns = input.watermark_columns().clone();
if core.need_op {
watermark_columns.grow(input_len + 2);
} else {
watermark_columns.grow(input_len + 1);
}
let base = PlanBase::new_stream_with_core(
&core,
dist,
true,
input.emit_on_window_close(),
watermark_columns,
MonotonicityMap::new(), );
StreamChangeLog { base, core }
}
}
impl PlanTreeNodeUnary for StreamChangeLog {
fn input(&self) -> PlanRef {
self.core.input.clone()
}
fn clone_with_input(&self, input: PlanRef) -> Self {
let mut core = self.core.clone();
core.input = input;
Self::new(core)
}
}
impl_plan_tree_node_for_unary! { StreamChangeLog }
impl_distill_by_unit!(StreamChangeLog, core, "StreamChangeLog");
impl StreamNode for StreamChangeLog {
fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
PbNodeBody::Changelog(ChangeLogNode {
need_op: self.core.need_op,
})
}
}
impl ExprRewritable for StreamChangeLog {}
impl ExprVisitable for StreamChangeLog {}