risingwave_frontend/optimizer/plan_node/
batch_vector_search.rs1use pretty_xmlish::{Pretty, XmlNode};
16use risingwave_common::catalog::{ColumnDesc, Field, Schema};
17use risingwave_common::types::{DataType, StructType};
18use risingwave_pb::batch_plan::PbVectorIndexNearestNode;
19use risingwave_pb::batch_plan::plan_node::NodeBody;
20use risingwave_pb::common::PbDistanceType;
21
22use crate::OptimizerContextRef;
23use crate::catalog::TableId;
24use crate::expr::{ExprDisplay, ExprImpl, InputRef};
25use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
26use crate::optimizer::plan_node::generic::{GenericPlanNode, GenericPlanRef};
27use crate::optimizer::plan_node::utils::{Distill, childless_record};
28use crate::optimizer::plan_node::{
29 Batch, BatchPlanRef as PlanRef, BatchPlanRef, ExprRewritable, PlanBase, PlanTreeNodeUnary,
30 ToBatchPb, ToDistributedBatch, ToLocalBatch,
31};
32use crate::optimizer::property::{Distribution, FunctionalDependencySet, Order};
33
/// Planner-side state of a batch vector search, serialized as a `VectorIndexNearest` node.
///
/// The output schema is the input schema followed by one trailing `vector_info` column
/// (a list of structs holding the `info_column_desc` columns plus `__distance`).
#[derive(Debug, Clone, educe::Educe)]
#[educe(Hash, PartialEq, Eq)]
pub struct BatchVectorSearchCore {
    /// Child plan; its column `vector_column_idx` holds the query vector.
    pub input: BatchPlanRef,
    /// Serialized as `top_n`; presumably the number of nearest entries to return — confirm.
    pub top_n: u64,
    /// Distance metric, serialized as `distance_type`.
    pub distance_type: PbDistanceType,
    /// Name of the vector index; only used for display, not serialized.
    pub index_name: String,
    /// Id of the index table; its `table_id` is serialized as `table_id`.
    pub index_table_id: TableId,
    /// Index columns returned inside each `vector_info` struct (before `__distance`).
    pub info_column_desc: Vec<ColumnDesc>,
    /// Position, in the input schema, of the column holding the query vector.
    pub vector_column_idx: usize,
    /// Optional HNSW `ef_search` setting; `None` is serialized as `0`.
    pub hnsw_ef_search: Option<usize>,
    // Excluded from hashing and equality: two cores differing only in context compare equal.
    #[educe(Hash(ignore), Eq(ignore))]
    pub ctx: OptimizerContextRef,
}
48
49impl GenericPlanNode for BatchVectorSearchCore {
50 fn functional_dependency(&self) -> FunctionalDependencySet {
51 self.input.functional_dependency().clone()
54 }
55
56 fn schema(&self) -> Schema {
57 let mut schema = self.input.schema().clone();
58 schema.fields.push(Field::new(
59 "vector_info",
60 DataType::List(
61 DataType::Struct(StructType::new(
62 self.info_column_desc
63 .iter()
64 .map(|col| (col.name.clone(), col.data_type.clone()))
65 .chain([("__distance".to_owned(), DataType::Float64)]),
66 ))
67 .into(),
68 ),
69 ));
70 schema
71 }
72
73 fn stream_key(&self) -> Option<Vec<usize>> {
74 None
75 }
76
77 fn ctx(&self) -> OptimizerContextRef {
78 self.ctx.clone()
79 }
80}
81
/// Batch plan node wrapping a [`BatchVectorSearchCore`]; serialized as `VectorIndexNearest`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BatchVectorSearch {
    /// Plan metadata built from `core`, a distribution and an order (see `with_core_inner`).
    pub base: PlanBase<Batch>,
    /// Search parameters and the input plan.
    pub core: BatchVectorSearchCore,
}
87
88impl BatchVectorSearch {
89 pub(super) fn with_core(core: BatchVectorSearchCore) -> Self {
90 Self::with_core_inner(core, Distribution::Single)
91 }
92
93 fn with_core_someshard(core: BatchVectorSearchCore) -> Self {
94 Self::with_core_inner(core, Distribution::SomeShard)
95 }
96
97 fn with_core_inner(core: BatchVectorSearchCore, distribution: Distribution) -> Self {
98 let order = Order::any();
100 let base = PlanBase::new_batch_with_core(&core, distribution, order);
101 Self { base, core }
102 }
103}
104
105impl Distill for BatchVectorSearch {
106 fn distill<'a>(&self) -> XmlNode<'a> {
107 let mut fields = vec![
108 (
109 "schema",
110 Pretty::Array(self.schema().fields.iter().map(Pretty::debug).collect()),
111 ),
112 ("top_n", Pretty::debug(&self.core.top_n)),
113 ("distance_type", Pretty::debug(&self.core.distance_type)),
114 ("index_name", Pretty::debug(&self.core.index_name)),
115 (
116 "vector",
117 Pretty::debug(&ExprDisplay {
118 expr: &ExprImpl::InputRef(
119 InputRef::new(
120 self.core.vector_column_idx,
121 self.core.input.schema()[self.core.vector_column_idx].data_type(),
122 )
123 .into(),
124 ),
125 input_schema: self.core.input.schema(),
126 }),
127 ),
128 ];
129 if let Some(hnsw_ef_search) = self.core.hnsw_ef_search {
130 fields.push(("hnsw_ef_search", Pretty::debug(&hnsw_ef_search)));
131 }
132 childless_record("BatchVectorSearch", fields)
133 }
134}
135
136impl PlanTreeNodeUnary<Batch> for BatchVectorSearch {
137 fn input(&self) -> crate::PlanRef<Batch> {
138 self.core.input.clone()
139 }
140
141 fn clone_with_input(&self, input: crate::PlanRef<Batch>) -> Self {
142 let mut core = self.core.clone();
143 core.input = input;
144 Self::with_core(core)
145 }
146}
147
// Presumably derives the generic plan-tree node impl from the `PlanTreeNodeUnary` impl above
// (macro defined elsewhere in the crate) — confirm against the macro definition.
impl_plan_tree_node_for_unary!(Batch, BatchVectorSearch);
149
150impl ToBatchPb for BatchVectorSearch {
151 fn to_batch_prost_body(&self) -> NodeBody {
152 NodeBody::VectorIndexNearest(PbVectorIndexNearestNode {
153 table_id: self.core.index_table_id.table_id,
154 info_column_desc: self
155 .core
156 .info_column_desc
157 .iter()
158 .map(|col| col.to_protobuf())
159 .collect(),
160 vector_column_idx: self.core.vector_column_idx as _,
161 top_n: self.core.top_n as _,
162 distance_type: self.core.distance_type as _,
163 hnsw_ef_search: self.core.hnsw_ef_search.unwrap_or(0) as _,
164 })
165 }
166}
167
168impl ToLocalBatch for BatchVectorSearch {
169 fn to_local(&self) -> crate::error::Result<PlanRef> {
170 Ok(Self::with_core_someshard(self.core.clone()).into())
171 }
172}
173
174impl ToDistributedBatch for BatchVectorSearch {
175 fn to_distributed(&self) -> crate::error::Result<PlanRef> {
176 Ok(Self::with_core_someshard(self.core.clone()).into())
177 }
178}
179
// No expression fields to visit: the query vector is referenced by position
// (`vector_column_idx`), so the trait's default methods are used.
impl ExprVisitable for BatchVectorSearch {}
181
// No expression fields to rewrite: the query vector is referenced by position
// (`vector_column_idx`), so the trait's default methods are used.
impl ExprRewritable<Batch> for BatchVectorSearch {}