risingwave_frontend/optimizer/plan_node/
plan_base.rsuse educe::Educe;
use super::generic::GenericPlanNode;
use super::*;
use crate::optimizer::property::Distribution;
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct NoExtra;
mod physical_common {
use super::*;
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct PhysicalCommonExtra {
pub dist: Distribution,
}
pub trait GetPhysicalCommon {
fn physical(&self) -> &PhysicalCommonExtra;
fn physical_mut(&mut self) -> &mut PhysicalCommonExtra;
}
}
use physical_common::*;
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct StreamExtra {
physical: PhysicalCommonExtra,
append_only: bool,
emit_on_window_close: bool,
watermark_columns: FixedBitSet,
columns_monotonicity: MonotonicityMap,
}
impl GetPhysicalCommon for StreamExtra {
fn physical(&self) -> &PhysicalCommonExtra {
&self.physical
}
fn physical_mut(&mut self) -> &mut PhysicalCommonExtra {
&mut self.physical
}
}
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct BatchExtra {
physical: PhysicalCommonExtra,
order: Order,
}
impl GetPhysicalCommon for BatchExtra {
fn physical(&self) -> &PhysicalCommonExtra {
&self.physical
}
fn physical_mut(&mut self) -> &mut PhysicalCommonExtra {
&mut self.physical
}
}
#[derive(Educe)]
#[educe(PartialEq, Eq, Hash, Clone, Debug)]
pub struct PlanBase<C: ConventionMarker> {
#[educe(PartialEq(ignore), Hash(ignore))]
id: PlanNodeId,
#[educe(PartialEq(ignore), Hash(ignore))]
ctx: OptimizerContextRef,
schema: Schema,
stream_key: Option<Vec<usize>>,
functional_dependency: FunctionalDependencySet,
extra: C::Extra,
}
impl<C: ConventionMarker> generic::GenericPlanRef for PlanBase<C> {
fn id(&self) -> PlanNodeId {
self.id
}
fn schema(&self) -> &Schema {
&self.schema
}
fn stream_key(&self) -> Option<&[usize]> {
self.stream_key.as_deref()
}
fn ctx(&self) -> OptimizerContextRef {
self.ctx.clone()
}
fn functional_dependency(&self) -> &FunctionalDependencySet {
&self.functional_dependency
}
}
impl<C: ConventionMarker> generic::PhysicalPlanRef for PlanBase<C>
where
C::Extra: GetPhysicalCommon,
{
fn distribution(&self) -> &Distribution {
&self.extra.physical().dist
}
}
impl stream::StreamPlanRef for PlanBase<Stream> {
fn append_only(&self) -> bool {
self.extra.append_only
}
fn emit_on_window_close(&self) -> bool {
self.extra.emit_on_window_close
}
fn watermark_columns(&self) -> &FixedBitSet {
&self.extra.watermark_columns
}
fn columns_monotonicity(&self) -> &MonotonicityMap {
&self.extra.columns_monotonicity
}
}
impl batch::BatchPlanRef for PlanBase<Batch> {
fn order(&self) -> &Order {
&self.extra.order
}
}
impl<C: ConventionMarker> PlanBase<C> {
pub fn clone_with_new_plan_id(&self) -> Self {
let mut new = self.clone();
new.id = self.ctx().next_plan_node_id();
new
}
}
impl PlanBase<Logical> {
pub fn new_logical(
ctx: OptimizerContextRef,
schema: Schema,
stream_key: Option<Vec<usize>>,
functional_dependency: FunctionalDependencySet,
) -> Self {
let id = ctx.next_plan_node_id();
Self {
id,
ctx,
schema,
stream_key,
functional_dependency,
extra: NoExtra,
}
}
pub fn new_logical_with_core(core: &impl GenericPlanNode) -> Self {
Self::new_logical(
core.ctx(),
core.schema(),
core.stream_key(),
core.functional_dependency(),
)
}
}
impl PlanBase<Stream> {
pub fn new_stream(
ctx: OptimizerContextRef,
schema: Schema,
stream_key: Option<Vec<usize>>,
functional_dependency: FunctionalDependencySet,
dist: Distribution,
append_only: bool,
emit_on_window_close: bool,
watermark_columns: FixedBitSet,
columns_monotonicity: MonotonicityMap,
) -> Self {
let id = ctx.next_plan_node_id();
assert_eq!(watermark_columns.len(), schema.len());
Self {
id,
ctx,
schema,
stream_key,
functional_dependency,
extra: StreamExtra {
physical: PhysicalCommonExtra { dist },
append_only,
emit_on_window_close,
watermark_columns,
columns_monotonicity,
},
}
}
pub fn new_stream_with_core(
core: &impl GenericPlanNode,
dist: Distribution,
append_only: bool,
emit_on_window_close: bool,
watermark_columns: FixedBitSet,
columns_monotonicity: MonotonicityMap,
) -> Self {
Self::new_stream(
core.ctx(),
core.schema(),
core.stream_key(),
core.functional_dependency(),
dist,
append_only,
emit_on_window_close,
watermark_columns,
columns_monotonicity,
)
}
}
impl PlanBase<Batch> {
pub fn new_batch(
ctx: OptimizerContextRef,
schema: Schema,
dist: Distribution,
order: Order,
) -> Self {
let id = ctx.next_plan_node_id();
let functional_dependency = FunctionalDependencySet::new(schema.len());
Self {
id,
ctx,
schema,
stream_key: None,
functional_dependency,
extra: BatchExtra {
physical: PhysicalCommonExtra { dist },
order,
},
}
}
pub fn new_batch_with_core(
core: &impl GenericPlanNode,
dist: Distribution,
order: Order,
) -> Self {
Self::new_batch(core.ctx(), core.schema(), dist, order)
}
}
impl<C: ConventionMarker> PlanBase<C>
where
C::Extra: GetPhysicalCommon,
{
pub fn clone_with_new_distribution(&self, dist: Distribution) -> Self {
let mut new = self.clone();
new.extra.physical_mut().dist = dist;
new
}
}
#[cfg(test)]
impl<C: ConventionMarker> PlanBase<C> {
pub fn functional_dependency_mut(&mut self) -> &mut FunctionalDependencySet {
&mut self.functional_dependency
}
}
#[derive(PartialEq, Eq, Hash, Clone, Copy, Debug, enum_as_inner::EnumAsInner)]
pub enum PlanBaseRef<'a> {
Logical(&'a PlanBase<Logical>),
Stream(&'a PlanBase<Stream>),
Batch(&'a PlanBase<Batch>),
}
impl PlanBaseRef<'_> {
pub fn convention(self) -> Convention {
match self {
PlanBaseRef::Logical(_) => Convention::Logical,
PlanBaseRef::Stream(_) => Convention::Stream,
PlanBaseRef::Batch(_) => Convention::Batch,
}
}
}
macro_rules! dispatch_plan_base {
($self:ident, [$($convention:ident),+ $(,)?], $method:expr) => {
match $self {
$(
PlanBaseRef::$convention(plan) => $method(plan),
)+
#[allow(unreachable_patterns)]
_ => unreachable!("calling `{}` on a plan node of `{:?}`", stringify!($method), $self.convention()),
}
}
}
impl<'a> PlanBaseRef<'a> {
pub(super) fn schema(self) -> &'a Schema {
dispatch_plan_base!(self, [Logical, Stream, Batch], GenericPlanRef::schema)
}
pub(super) fn stream_key(self) -> Option<&'a [usize]> {
dispatch_plan_base!(self, [Logical, Stream, Batch], GenericPlanRef::stream_key)
}
pub(super) fn functional_dependency(self) -> &'a FunctionalDependencySet {
dispatch_plan_base!(
self,
[Logical, Stream, Batch],
GenericPlanRef::functional_dependency
)
}
pub(super) fn distribution(self) -> &'a Distribution {
dispatch_plan_base!(self, [Stream, Batch], PhysicalPlanRef::distribution)
}
pub(super) fn watermark_columns(self) -> &'a FixedBitSet {
dispatch_plan_base!(self, [Stream], StreamPlanRef::watermark_columns)
}
pub(super) fn columns_monotonicity(self) -> &'a MonotonicityMap {
dispatch_plan_base!(self, [Stream], StreamPlanRef::columns_monotonicity)
}
pub(super) fn order(self) -> &'a Order {
dispatch_plan_base!(self, [Batch], BatchPlanRef::order)
}
}
impl GenericPlanRef for PlanBaseRef<'_> {
fn id(&self) -> PlanNodeId {
dispatch_plan_base!(self, [Logical, Stream, Batch], GenericPlanRef::id)
}
fn schema(&self) -> &Schema {
(*self).schema()
}
fn stream_key(&self) -> Option<&[usize]> {
(*self).stream_key()
}
fn functional_dependency(&self) -> &FunctionalDependencySet {
(*self).functional_dependency()
}
fn ctx(&self) -> OptimizerContextRef {
dispatch_plan_base!(self, [Logical, Stream, Batch], GenericPlanRef::ctx)
}
}
impl PhysicalPlanRef for PlanBaseRef<'_> {
fn distribution(&self) -> &Distribution {
(*self).distribution()
}
}
impl StreamPlanRef for PlanBaseRef<'_> {
fn append_only(&self) -> bool {
dispatch_plan_base!(self, [Stream], StreamPlanRef::append_only)
}
fn emit_on_window_close(&self) -> bool {
dispatch_plan_base!(self, [Stream], StreamPlanRef::emit_on_window_close)
}
fn watermark_columns(&self) -> &FixedBitSet {
(*self).watermark_columns()
}
fn columns_monotonicity(&self) -> &MonotonicityMap {
(*self).columns_monotonicity()
}
}
impl BatchPlanRef for PlanBaseRef<'_> {
fn order(&self) -> &Order {
(*self).order()
}
}