// risingwave_frontend/optimizer/plan_node/stream_vector_index_lookup_join.rs
use pretty_xmlish::XmlNode;
16use risingwave_common::bail;
17use risingwave_pb::plan_common::PbVectorIndexReaderDesc;
18use risingwave_pb::stream_plan::PbVectorIndexLookupJoinNode;
19
20use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
21use crate::optimizer::plan_node::generic::{PhysicalPlanRef, VectorIndexLookupJoin};
22use crate::optimizer::plan_node::stream::StreamPlanNodeMetadata;
23use crate::optimizer::plan_node::utils::{Distill, childless_record};
24use crate::optimizer::plan_node::{
25 ExprRewritable, PlanBase, PlanTreeNodeUnary, Stream, StreamNode, StreamPlanRef,
26};
27use crate::optimizer::property::StreamKind;
28use crate::stream_fragmenter::BuildFragmentGraphState;
29
30#[derive(Debug, Clone, PartialEq, Eq, Hash)]
31pub struct StreamVectorIndexLookupJoin {
32 pub base: PlanBase<Stream>,
33 pub core: VectorIndexLookupJoin<StreamPlanRef>,
34}
35
36impl StreamVectorIndexLookupJoin {
37 pub fn new(core: VectorIndexLookupJoin<StreamPlanRef>) -> crate::error::Result<Self> {
38 if core.input.stream_kind() != StreamKind::AppendOnly {
39 bail!("StreamVectorIndexLookupJoin only support append only input")
40 }
41 Ok(Self::with_core(core))
42 }
43
44 fn with_core(core: VectorIndexLookupJoin<StreamPlanRef>) -> Self {
45 assert_eq!(core.input.stream_kind(), StreamKind::AppendOnly);
46 let base = PlanBase::new_stream_with_core(
47 &core,
48 core.input.distribution().clone(),
49 core.input.stream_kind(),
50 core.input.emit_on_window_close(),
51 core.input.watermark_columns().clone(),
52 core.input.columns_monotonicity().clone(),
53 );
54 Self { base, core }
55 }
56}
57
58impl Distill for StreamVectorIndexLookupJoin {
59 fn distill<'a>(&self) -> XmlNode<'a> {
60 let fields = self.core.distill();
61 childless_record("StreamVectorIndexLookupJoin", fields)
62 }
63}
64
65impl PlanTreeNodeUnary<Stream> for StreamVectorIndexLookupJoin {
66 fn input(&self) -> crate::PlanRef<Stream> {
67 self.core.input.clone()
68 }
69
70 fn clone_with_input(&self, input: crate::PlanRef<Stream>) -> Self {
71 let mut core = self.core.clone();
72 core.input = input;
73 Self::with_core(core)
74 }
75}
76
77impl_plan_tree_node_for_unary!(Stream, StreamVectorIndexLookupJoin);
78
79impl StreamNode for StreamVectorIndexLookupJoin {
80 fn to_stream_prost_body(
81 &self,
82 _state: &mut BuildFragmentGraphState,
83 ) -> risingwave_pb::stream_plan::stream_node::NodeBody {
84 risingwave_pb::stream_plan::stream_node::NodeBody::VectorIndexLookupJoin(
85 PbVectorIndexLookupJoinNode {
86 reader_desc: Some(PbVectorIndexReaderDesc {
87 table_id: self.core.index_table_id,
88 info_column_desc: self
89 .core
90 .info_column_desc
91 .iter()
92 .map(|col| col.to_protobuf())
93 .collect(),
94 top_n: self.core.top_n as _,
95 distance_type: self.core.distance_type as _,
96 hnsw_ef_search: self.core.hnsw_ef_search.unwrap_or(0) as _,
97 info_output_indices: self
98 .core
99 .info_output_indices
100 .iter()
101 .map(|&idx| idx as _)
102 .collect(),
103 include_distance: self.core.include_distance,
104 }),
105 vector_column_idx: self.core.vector_column_idx as _,
106 }
107 .into(),
108 )
109 }
110}
111
112impl ExprVisitable for StreamVectorIndexLookupJoin {}
113
114impl ExprRewritable<Stream> for StreamVectorIndexLookupJoin {}