risingwave_frontend/optimizer/plan_node/
stream_vector_index_lookup_join.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use pretty_xmlish::XmlNode;
16use risingwave_common::bail;
17use risingwave_pb::plan_common::PbVectorIndexReaderDesc;
18use risingwave_pb::stream_plan::PbVectorIndexLookupJoinNode;
19
20use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
21use crate::optimizer::plan_node::generic::{PhysicalPlanRef, VectorIndexLookupJoin};
22use crate::optimizer::plan_node::stream::StreamPlanNodeMetadata;
23use crate::optimizer::plan_node::utils::{Distill, childless_record};
24use crate::optimizer::plan_node::{
25    ExprRewritable, PlanBase, PlanTreeNodeUnary, Stream, StreamNode, StreamPlanRef,
26};
27use crate::optimizer::property::StreamKind;
28use crate::stream_fragmenter::BuildFragmentGraphState;
29
/// Stream-side plan node for a vector index lookup join.
///
/// Pairs the generic [`VectorIndexLookupJoin`] description (over a stream
/// input) with the stream [`PlanBase`] derived from it. Both constructors in
/// this file require the input's stream kind to be `StreamKind::AppendOnly`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamVectorIndexLookupJoin {
    /// Stream plan base, built from `core` and the input's properties.
    pub base: PlanBase<Stream>,
    /// Generic lookup-join description, including the input plan.
    pub core: VectorIndexLookupJoin<StreamPlanRef>,
}
35
36impl StreamVectorIndexLookupJoin {
37    pub fn new(core: VectorIndexLookupJoin<StreamPlanRef>) -> crate::error::Result<Self> {
38        if core.input.stream_kind() != StreamKind::AppendOnly {
39            bail!("StreamVectorIndexLookupJoin only support append only input")
40        }
41        Ok(Self::with_core(core))
42    }
43
44    fn with_core(core: VectorIndexLookupJoin<StreamPlanRef>) -> Self {
45        assert_eq!(core.input.stream_kind(), StreamKind::AppendOnly);
46        let base = PlanBase::new_stream_with_core(
47            &core,
48            core.input.distribution().clone(),
49            core.input.stream_kind(),
50            core.input.emit_on_window_close(),
51            core.input.watermark_columns().clone(),
52            core.input.columns_monotonicity().clone(),
53        );
54        Self { base, core }
55    }
56}
57
58impl Distill for StreamVectorIndexLookupJoin {
59    fn distill<'a>(&self) -> XmlNode<'a> {
60        let fields = self.core.distill();
61        childless_record("StreamVectorIndexLookupJoin", fields)
62    }
63}
64
65impl PlanTreeNodeUnary<Stream> for StreamVectorIndexLookupJoin {
66    fn input(&self) -> crate::PlanRef<Stream> {
67        self.core.input.clone()
68    }
69
70    fn clone_with_input(&self, input: crate::PlanRef<Stream>) -> Self {
71        let mut core = self.core.clone();
72        core.input = input;
73        Self::with_core(core)
74    }
75}
76
// Invokes the crate's `impl_plan_tree_node_for_unary!` macro for this node.
// NOTE(review): presumably this generates the general plan-tree-node impl from
// the `PlanTreeNodeUnary<Stream>` impl — confirm against the macro definition.
impl_plan_tree_node_for_unary!(Stream, StreamVectorIndexLookupJoin);
78
79impl StreamNode for StreamVectorIndexLookupJoin {
80    fn to_stream_prost_body(
81        &self,
82        _state: &mut BuildFragmentGraphState,
83    ) -> risingwave_pb::stream_plan::stream_node::NodeBody {
84        risingwave_pb::stream_plan::stream_node::NodeBody::VectorIndexLookupJoin(
85            PbVectorIndexLookupJoinNode {
86                reader_desc: Some(PbVectorIndexReaderDesc {
87                    table_id: self.core.index_table_id,
88                    info_column_desc: self
89                        .core
90                        .info_column_desc
91                        .iter()
92                        .map(|col| col.to_protobuf())
93                        .collect(),
94                    top_n: self.core.top_n as _,
95                    distance_type: self.core.distance_type as _,
96                    hnsw_ef_search: self.core.hnsw_ef_search.unwrap_or(0) as _,
97                    info_output_indices: self
98                        .core
99                        .info_output_indices
100                        .iter()
101                        .map(|&idx| idx as _)
102                        .collect(),
103                    include_distance: self.core.include_distance,
104                }),
105                vector_column_idx: self.core.vector_column_idx as _,
106            }
107            .into(),
108        )
109    }
110}
111
// Empty impl: nothing is overridden here, so this node uses only what the
// `ExprVisitable` trait provides by default.
impl ExprVisitable for StreamVectorIndexLookupJoin {}
113
// Empty impl: nothing is overridden here, so this node uses only what the
// `ExprRewritable<Stream>` trait provides by default.
impl ExprRewritable<Stream> for StreamVectorIndexLookupJoin {}