// risingwave_frontend/optimizer/plan_node/batch_vector_search.rs


// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15use pretty_xmlish::{Pretty, XmlNode};
16use risingwave_common::catalog::{ColumnDesc, Field, Schema};
17use risingwave_common::types::{DataType, StructType};
18use risingwave_pb::batch_plan::PbVectorIndexNearestNode;
19use risingwave_pb::batch_plan::plan_node::NodeBody;
20use risingwave_pb::common::PbDistanceType;
21
22use crate::OptimizerContextRef;
23use crate::catalog::TableId;
24use crate::expr::{ExprDisplay, ExprImpl, InputRef};
25use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
26use crate::optimizer::plan_node::generic::{GenericPlanNode, GenericPlanRef, PhysicalPlanRef};
27use crate::optimizer::plan_node::utils::{Distill, childless_record};
28use crate::optimizer::plan_node::{
29    Batch, BatchPlanRef as PlanRef, BatchPlanRef, ExprRewritable, PlanBase, PlanTreeNodeUnary,
30    ToBatchPb, ToDistributedBatch, ToLocalBatch,
31};
32use crate::optimizer::property::{FunctionalDependencySet, Order};
33
34#[derive(Debug, Clone, educe::Educe)]
35#[educe(Hash, PartialEq, Eq)]
36pub struct BatchVectorSearchCore {
37    pub input: BatchPlanRef,
38    pub top_n: u64,
39    pub distance_type: PbDistanceType,
40    pub index_name: String,
41    pub index_table_id: TableId,
42    pub info_column_desc: Vec<ColumnDesc>,
43    pub vector_column_idx: usize,
44    pub hnsw_ef_search: Option<usize>,
45    #[educe(Hash(ignore), Eq(ignore))]
46    pub ctx: OptimizerContextRef,
47}
48
49impl GenericPlanNode for BatchVectorSearchCore {
50    fn functional_dependency(&self) -> FunctionalDependencySet {
51        // FunctionalDependencySet::new(self.info_column_desc.len())
52        // TODO: include dependency derived from info columns
53        self.input.functional_dependency().clone()
54    }
55
56    fn schema(&self) -> Schema {
57        let mut schema = self.input.schema().clone();
58        schema.fields.push(Field::new(
59            "vector_info",
60            DataType::list(
61                StructType::new(
62                    self.info_column_desc
63                        .iter()
64                        .map(|col| (col.name.clone(), col.data_type.clone()))
65                        .chain([("__distance".to_owned(), DataType::Float64)]),
66                )
67                .into(),
68            ),
69        ));
70        schema
71    }
72
73    fn stream_key(&self) -> Option<Vec<usize>> {
74        None
75    }
76
77    fn ctx(&self) -> OptimizerContextRef {
78        self.ctx.clone()
79    }
80}
81
82#[derive(Debug, Clone, PartialEq, Eq, Hash)]
83pub struct BatchVectorSearch {
84    pub base: PlanBase<Batch>,
85    pub core: BatchVectorSearchCore,
86}
87
88impl BatchVectorSearch {
89    pub(super) fn with_core(core: BatchVectorSearchCore) -> Self {
90        // TODO: support specifying order in nested struct to avoid unnecessary sort
91        let order = Order::any();
92        let base = PlanBase::new_batch_with_core(&core, core.input.distribution().clone(), order);
93        Self { base, core }
94    }
95}
96
97impl Distill for BatchVectorSearch {
98    fn distill<'a>(&self) -> XmlNode<'a> {
99        let mut fields = vec![
100            (
101                "schema",
102                Pretty::Array(self.schema().fields.iter().map(Pretty::debug).collect()),
103            ),
104            ("top_n", Pretty::debug(&self.core.top_n)),
105            ("distance_type", Pretty::debug(&self.core.distance_type)),
106            ("index_name", Pretty::debug(&self.core.index_name)),
107            (
108                "vector",
109                Pretty::debug(&ExprDisplay {
110                    expr: &ExprImpl::InputRef(
111                        InputRef::new(
112                            self.core.vector_column_idx,
113                            self.core.input.schema()[self.core.vector_column_idx].data_type(),
114                        )
115                        .into(),
116                    ),
117                    input_schema: self.core.input.schema(),
118                }),
119            ),
120        ];
121        if let Some(hnsw_ef_search) = self.core.hnsw_ef_search {
122            fields.push(("hnsw_ef_search", Pretty::debug(&hnsw_ef_search)));
123        }
124        childless_record("BatchVectorSearch", fields)
125    }
126}
127
128impl PlanTreeNodeUnary<Batch> for BatchVectorSearch {
129    fn input(&self) -> crate::PlanRef<Batch> {
130        self.core.input.clone()
131    }
132
133    fn clone_with_input(&self, input: crate::PlanRef<Batch>) -> Self {
134        let mut core = self.core.clone();
135        core.input = input;
136        Self::with_core(core)
137    }
138}
139
140impl_plan_tree_node_for_unary!(Batch, BatchVectorSearch);
141
142impl ToBatchPb for BatchVectorSearch {
143    fn to_batch_prost_body(&self) -> NodeBody {
144        NodeBody::VectorIndexNearest(PbVectorIndexNearestNode {
145            table_id: self.core.index_table_id.table_id,
146            info_column_desc: self
147                .core
148                .info_column_desc
149                .iter()
150                .map(|col| col.to_protobuf())
151                .collect(),
152            vector_column_idx: self.core.vector_column_idx as _,
153            top_n: self.core.top_n as _,
154            distance_type: self.core.distance_type as _,
155            hnsw_ef_search: self.core.hnsw_ef_search.unwrap_or(0) as _,
156        })
157    }
158}
159
160impl ToLocalBatch for BatchVectorSearch {
161    fn to_local(&self) -> crate::error::Result<PlanRef> {
162        let mut core = self.core.clone();
163        core.input = core.input.to_local()?;
164        Ok(Self::with_core(core).into())
165    }
166}
167
168impl ToDistributedBatch for BatchVectorSearch {
169    fn to_distributed(&self) -> crate::error::Result<PlanRef> {
170        let mut core = self.core.clone();
171        core.input = core.input.to_distributed()?;
172        Ok(Self::with_core(core).into())
173    }
174}
175
176impl ExprVisitable for BatchVectorSearch {}
177
178impl ExprRewritable<Batch> for BatchVectorSearch {}