// risingwave_frontend/optimizer/plan_node/batch_vector_search.rs
use pretty_xmlish::XmlNode;
16use risingwave_pb::batch_plan::PbVectorIndexNearestNode;
17use risingwave_pb::batch_plan::plan_node::NodeBody;
18
19use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
20use crate::optimizer::plan_node::generic::{PhysicalPlanRef, VectorIndexLookupJoin};
21use crate::optimizer::plan_node::utils::{Distill, childless_record, to_batch_query_epoch};
22use crate::optimizer::plan_node::{
23 Batch, BatchPlanRef as PlanRef, BatchPlanRef, ExprRewritable, PlanBase, PlanTreeNodeUnary,
24 ToDistributedBatch, ToLocalBatch, TryToBatchPb,
25};
26use crate::optimizer::property::Order;
27use crate::scheduler::SchedulerResult;
28
29#[derive(Debug, Clone, PartialEq, Eq, Hash)]
30pub struct BatchVectorSearch {
31 pub base: PlanBase<Batch>,
32 pub core: VectorIndexLookupJoin<BatchPlanRef>,
33}
34
35impl BatchVectorSearch {
36 pub(super) fn with_core(core: VectorIndexLookupJoin<BatchPlanRef>) -> Self {
37 let order = Order::any();
39 let base = PlanBase::new_batch_with_core(&core, core.input.distribution().clone(), order);
40 Self { base, core }
41 }
42}
43
44impl Distill for BatchVectorSearch {
45 fn distill<'a>(&self) -> XmlNode<'a> {
46 let fields = self.core.distill();
47 childless_record("BatchVectorSearch", fields)
48 }
49}
50
51impl PlanTreeNodeUnary<Batch> for BatchVectorSearch {
52 fn input(&self) -> crate::PlanRef<Batch> {
53 self.core.input.clone()
54 }
55
56 fn clone_with_input(&self, input: crate::PlanRef<Batch>) -> Self {
57 let mut core = self.core.clone();
58 core.input = input;
59 Self::with_core(core)
60 }
61}
62
63impl_plan_tree_node_for_unary!(Batch, BatchVectorSearch);
64
65impl TryToBatchPb for BatchVectorSearch {
66 fn try_to_batch_prost_body(&self) -> SchedulerResult<NodeBody> {
67 Ok(NodeBody::VectorIndexNearest(PbVectorIndexNearestNode {
68 reader_desc: Some(self.core.to_reader_desc()),
69 vector_column_idx: self.core.vector_column_idx as _,
70 query_epoch: to_batch_query_epoch(&self.core.as_of)?,
71 }))
72 }
73}
74
75impl ToLocalBatch for BatchVectorSearch {
76 fn to_local(&self) -> crate::error::Result<PlanRef> {
77 let mut core = self.core.clone();
78 core.input = core.input.to_local()?;
79 Ok(Self::with_core(core).into())
80 }
81}
82
83impl ToDistributedBatch for BatchVectorSearch {
84 fn to_distributed(&self) -> crate::error::Result<PlanRef> {
85 let mut core = self.core.clone();
86 core.input = core.input.to_distributed()?;
87 Ok(Self::with_core(core).into())
88 }
89}
90
91impl ExprVisitable for BatchVectorSearch {}
92
93impl ExprRewritable<Batch> for BatchVectorSearch {}