risingwave_frontend/optimizer/plan_node/
batch_vector_search.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use pretty_xmlish::{Pretty, XmlNode};
16use risingwave_common::catalog::{ColumnDesc, Field, Schema};
17use risingwave_common::types::{DataType, StructType};
18use risingwave_pb::batch_plan::PbVectorIndexNearestNode;
19use risingwave_pb::batch_plan::plan_node::NodeBody;
20use risingwave_pb::common::PbDistanceType;
21
22use crate::OptimizerContextRef;
23use crate::catalog::TableId;
24use crate::expr::{ExprDisplay, ExprImpl, InputRef};
25use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
26use crate::optimizer::plan_node::generic::{GenericPlanNode, GenericPlanRef};
27use crate::optimizer::plan_node::utils::{Distill, childless_record};
28use crate::optimizer::plan_node::{
29    Batch, BatchPlanRef as PlanRef, BatchPlanRef, ExprRewritable, PlanBase, PlanTreeNodeUnary,
30    ToBatchPb, ToDistributedBatch, ToLocalBatch,
31};
32use crate::optimizer::property::{Distribution, FunctionalDependencySet, Order};
33
/// Search parameters of [`BatchVectorSearch`], kept separate from its physical properties.
///
/// The query vector is read from column `vector_column_idx` of `input`. The node's output keeps
/// every input column and appends one list-typed `vector_info` column. Each list element is a
/// struct made of the `info_column_desc` columns plus a `__distance` value.
///
/// `educe` is used instead of a plain `#[derive]` so that `ctx` can be excluded from hashing
/// and equality.
#[derive(Debug, Clone, educe::Educe)]
#[educe(Hash, PartialEq, Eq)]
pub struct BatchVectorSearchCore {
    /// Plan producing the rows whose vectors are searched.
    pub input: BatchPlanRef,
    /// Number of nearest entries requested; sent to the executor as `top_n`
    /// (presumably per input row — confirm against the executor).
    pub top_n: u64,
    /// Distance metric passed to the executor as `distance_type`.
    pub distance_type: PbDistanceType,
    /// Name of the vector index; only used for display in `EXPLAIN`.
    pub index_name: String,
    /// Id of the table backing the vector index; serialized as `table_id`.
    pub index_table_id: TableId,
    /// Columns fetched from the index for each match; they become the struct fields of the
    /// appended `vector_info` column (before `__distance`).
    pub info_column_desc: Vec<ColumnDesc>,
    /// Index, within `input`'s schema, of the column holding the query vector.
    pub vector_column_idx: usize,
    /// Optional HNSW `ef_search` override; `None` is serialized as `0`.
    pub hnsw_ef_search: Option<usize>,
    // The optimizer context is not part of the node's identity.
    #[educe(Hash(ignore), Eq(ignore))]
    pub ctx: OptimizerContextRef,
}
48
49impl GenericPlanNode for BatchVectorSearchCore {
50    fn functional_dependency(&self) -> FunctionalDependencySet {
51        // FunctionalDependencySet::new(self.info_column_desc.len())
52        // TODO: include dependency derived from info columns
53        self.input.functional_dependency().clone()
54    }
55
56    fn schema(&self) -> Schema {
57        let mut schema = self.input.schema().clone();
58        schema.fields.push(Field::new(
59            "vector_info",
60            DataType::List(
61                DataType::Struct(StructType::new(
62                    self.info_column_desc
63                        .iter()
64                        .map(|col| (col.name.clone(), col.data_type.clone()))
65                        .chain([("__distance".to_owned(), DataType::Float64)]),
66                ))
67                .into(),
68            ),
69        ));
70        schema
71    }
72
73    fn stream_key(&self) -> Option<Vec<usize>> {
74        None
75    }
76
77    fn ctx(&self) -> OptimizerContextRef {
78        self.ctx.clone()
79    }
80}
81
/// Batch plan node that searches a vector index using the vector column of each input row,
/// and appends the matches to the row as a `vector_info` column.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BatchVectorSearch {
    /// Batch plan base built from `core` plus the chosen distribution and order
    /// (see `with_core_inner`).
    pub base: PlanBase<Batch>,
    /// Search parameters and the input plan.
    pub core: BatchVectorSearchCore,
}
87
88impl BatchVectorSearch {
89    pub(super) fn with_core(core: BatchVectorSearchCore) -> Self {
90        Self::with_core_inner(core, Distribution::Single)
91    }
92
93    fn with_core_someshard(core: BatchVectorSearchCore) -> Self {
94        Self::with_core_inner(core, Distribution::SomeShard)
95    }
96
97    fn with_core_inner(core: BatchVectorSearchCore, distribution: Distribution) -> Self {
98        // TODO: support specifying order in nested struct to avoid unnecessary sort
99        let order = Order::any();
100        let base = PlanBase::new_batch_with_core(&core, distribution, order);
101        Self { base, core }
102    }
103}
104
105impl Distill for BatchVectorSearch {
106    fn distill<'a>(&self) -> XmlNode<'a> {
107        let mut fields = vec![
108            (
109                "schema",
110                Pretty::Array(self.schema().fields.iter().map(Pretty::debug).collect()),
111            ),
112            ("top_n", Pretty::debug(&self.core.top_n)),
113            ("distance_type", Pretty::debug(&self.core.distance_type)),
114            ("index_name", Pretty::debug(&self.core.index_name)),
115            (
116                "vector",
117                Pretty::debug(&ExprDisplay {
118                    expr: &ExprImpl::InputRef(
119                        InputRef::new(
120                            self.core.vector_column_idx,
121                            self.core.input.schema()[self.core.vector_column_idx].data_type(),
122                        )
123                        .into(),
124                    ),
125                    input_schema: self.core.input.schema(),
126                }),
127            ),
128        ];
129        if let Some(hnsw_ef_search) = self.core.hnsw_ef_search {
130            fields.push(("hnsw_ef_search", Pretty::debug(&hnsw_ef_search)));
131        }
132        childless_record("BatchVectorSearch", fields)
133    }
134}
135
136impl PlanTreeNodeUnary<Batch> for BatchVectorSearch {
137    fn input(&self) -> crate::PlanRef<Batch> {
138        self.core.input.clone()
139    }
140
141    fn clone_with_input(&self, input: crate::PlanRef<Batch>) -> Self {
142        let mut core = self.core.clone();
143        core.input = input;
144        Self::with_core(core)
145    }
146}
147
// Generates the remaining plan-tree-node boilerplate for this unary batch node; presumably it
// builds on the `PlanTreeNodeUnary` impl above — confirm against the macro definition.
impl_plan_tree_node_for_unary!(Batch, BatchVectorSearch);
149
150impl ToBatchPb for BatchVectorSearch {
151    fn to_batch_prost_body(&self) -> NodeBody {
152        NodeBody::VectorIndexNearest(PbVectorIndexNearestNode {
153            table_id: self.core.index_table_id.table_id,
154            info_column_desc: self
155                .core
156                .info_column_desc
157                .iter()
158                .map(|col| col.to_protobuf())
159                .collect(),
160            vector_column_idx: self.core.vector_column_idx as _,
161            top_n: self.core.top_n as _,
162            distance_type: self.core.distance_type as _,
163            hnsw_ef_search: self.core.hnsw_ef_search.unwrap_or(0) as _,
164        })
165    }
166}
167
168impl ToLocalBatch for BatchVectorSearch {
169    fn to_local(&self) -> crate::error::Result<PlanRef> {
170        Ok(Self::with_core_someshard(self.core.clone()).into())
171    }
172}
173
174impl ToDistributedBatch for BatchVectorSearch {
175    fn to_distributed(&self) -> crate::error::Result<PlanRef> {
176        Ok(Self::with_core_someshard(self.core.clone()).into())
177    }
178}
179
// The core holds no `ExprImpl` (the query vector is referenced by input column index), so the
// trait's default methods are used for expression visiting.
impl ExprVisitable for BatchVectorSearch {}

// Likewise there are no stored expressions to rewrite; the trait's default methods are used.
impl ExprRewritable<Batch> for BatchVectorSearch {}