risingwave_frontend/optimizer/plan_node/
batch_vector_search.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use pretty_xmlish::XmlNode;
16use risingwave_pb::batch_plan::PbVectorIndexNearestNode;
17use risingwave_pb::batch_plan::plan_node::NodeBody;
18
19use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
20use crate::optimizer::plan_node::generic::{PhysicalPlanRef, VectorIndexLookupJoin};
21use crate::optimizer::plan_node::utils::{Distill, childless_record, to_batch_query_epoch};
22use crate::optimizer::plan_node::{
23    Batch, BatchPlanRef as PlanRef, BatchPlanRef, ExprRewritable, PlanBase, PlanTreeNodeUnary,
24    ToDistributedBatch, ToLocalBatch, TryToBatchPb,
25};
26use crate::optimizer::property::Order;
27use crate::scheduler::SchedulerResult;
28
29#[derive(Debug, Clone, PartialEq, Eq, Hash)]
30pub struct BatchVectorSearch {
31    pub base: PlanBase<Batch>,
32    pub core: VectorIndexLookupJoin<BatchPlanRef>,
33}
34
35impl BatchVectorSearch {
36    pub(super) fn with_core(core: VectorIndexLookupJoin<BatchPlanRef>) -> Self {
37        // TODO: support specifying order in nested struct to avoid unnecessary sort
38        let order = Order::any();
39        let base = PlanBase::new_batch_with_core(&core, core.input.distribution().clone(), order);
40        Self { base, core }
41    }
42}
43
44impl Distill for BatchVectorSearch {
45    fn distill<'a>(&self) -> XmlNode<'a> {
46        let fields = self.core.distill();
47        childless_record("BatchVectorSearch", fields)
48    }
49}
50
51impl PlanTreeNodeUnary<Batch> for BatchVectorSearch {
52    fn input(&self) -> crate::PlanRef<Batch> {
53        self.core.input.clone()
54    }
55
56    fn clone_with_input(&self, input: crate::PlanRef<Batch>) -> Self {
57        let mut core = self.core.clone();
58        core.input = input;
59        Self::with_core(core)
60    }
61}
62
63impl_plan_tree_node_for_unary!(Batch, BatchVectorSearch);
64
65impl TryToBatchPb for BatchVectorSearch {
66    fn try_to_batch_prost_body(&self) -> SchedulerResult<NodeBody> {
67        Ok(NodeBody::VectorIndexNearest(PbVectorIndexNearestNode {
68            reader_desc: Some(self.core.to_reader_desc()),
69            vector_column_idx: self.core.vector_column_idx as _,
70            query_epoch: to_batch_query_epoch(&self.core.as_of)?,
71        }))
72    }
73}
74
75impl ToLocalBatch for BatchVectorSearch {
76    fn to_local(&self) -> crate::error::Result<PlanRef> {
77        let mut core = self.core.clone();
78        core.input = core.input.to_local()?;
79        Ok(Self::with_core(core).into())
80    }
81}
82
83impl ToDistributedBatch for BatchVectorSearch {
84    fn to_distributed(&self) -> crate::error::Result<PlanRef> {
85        let mut core = self.core.clone();
86        core.input = core.input.to_distributed()?;
87        Ok(Self::with_core(core).into())
88    }
89}
90
91impl ExprVisitable for BatchVectorSearch {}
92
93impl ExprRewritable<Batch> for BatchVectorSearch {}