// risingwave_frontend/optimizer/plan_node/batch_vector_search.rs
use pretty_xmlish::{Pretty, XmlNode};
16use risingwave_common::catalog::{ColumnDesc, Field, Schema};
17use risingwave_common::types::{DataType, StructType};
18use risingwave_pb::batch_plan::PbVectorIndexNearestNode;
19use risingwave_pb::batch_plan::plan_node::NodeBody;
20use risingwave_pb::common::PbDistanceType;
21
22use crate::OptimizerContextRef;
23use crate::catalog::TableId;
24use crate::expr::{ExprDisplay, ExprImpl, InputRef};
25use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
26use crate::optimizer::plan_node::generic::{GenericPlanNode, GenericPlanRef, PhysicalPlanRef};
27use crate::optimizer::plan_node::utils::{Distill, childless_record};
28use crate::optimizer::plan_node::{
29 Batch, BatchPlanRef as PlanRef, BatchPlanRef, ExprRewritable, PlanBase, PlanTreeNodeUnary,
30 ToBatchPb, ToDistributedBatch, ToLocalBatch,
31};
32use crate::optimizer::property::{FunctionalDependencySet, Order};
33
34#[derive(Debug, Clone, educe::Educe)]
35#[educe(Hash, PartialEq, Eq)]
36pub struct BatchVectorSearchCore {
37 pub input: BatchPlanRef,
38 pub top_n: u64,
39 pub distance_type: PbDistanceType,
40 pub index_name: String,
41 pub index_table_id: TableId,
42 pub info_column_desc: Vec<ColumnDesc>,
43 pub vector_column_idx: usize,
44 pub hnsw_ef_search: Option<usize>,
45 #[educe(Hash(ignore), Eq(ignore))]
46 pub ctx: OptimizerContextRef,
47}
48
49impl GenericPlanNode for BatchVectorSearchCore {
50 fn functional_dependency(&self) -> FunctionalDependencySet {
51 self.input.functional_dependency().clone()
54 }
55
56 fn schema(&self) -> Schema {
57 let mut schema = self.input.schema().clone();
58 schema.fields.push(Field::new(
59 "vector_info",
60 DataType::list(
61 StructType::new(
62 self.info_column_desc
63 .iter()
64 .map(|col| (col.name.clone(), col.data_type.clone()))
65 .chain([("__distance".to_owned(), DataType::Float64)]),
66 )
67 .into(),
68 ),
69 ));
70 schema
71 }
72
73 fn stream_key(&self) -> Option<Vec<usize>> {
74 None
75 }
76
77 fn ctx(&self) -> OptimizerContextRef {
78 self.ctx.clone()
79 }
80}
81
82#[derive(Debug, Clone, PartialEq, Eq, Hash)]
83pub struct BatchVectorSearch {
84 pub base: PlanBase<Batch>,
85 pub core: BatchVectorSearchCore,
86}
87
88impl BatchVectorSearch {
89 pub(super) fn with_core(core: BatchVectorSearchCore) -> Self {
90 let order = Order::any();
92 let base = PlanBase::new_batch_with_core(&core, core.input.distribution().clone(), order);
93 Self { base, core }
94 }
95}
96
97impl Distill for BatchVectorSearch {
98 fn distill<'a>(&self) -> XmlNode<'a> {
99 let mut fields = vec![
100 (
101 "schema",
102 Pretty::Array(self.schema().fields.iter().map(Pretty::debug).collect()),
103 ),
104 ("top_n", Pretty::debug(&self.core.top_n)),
105 ("distance_type", Pretty::debug(&self.core.distance_type)),
106 ("index_name", Pretty::debug(&self.core.index_name)),
107 (
108 "vector",
109 Pretty::debug(&ExprDisplay {
110 expr: &ExprImpl::InputRef(
111 InputRef::new(
112 self.core.vector_column_idx,
113 self.core.input.schema()[self.core.vector_column_idx].data_type(),
114 )
115 .into(),
116 ),
117 input_schema: self.core.input.schema(),
118 }),
119 ),
120 ];
121 if let Some(hnsw_ef_search) = self.core.hnsw_ef_search {
122 fields.push(("hnsw_ef_search", Pretty::debug(&hnsw_ef_search)));
123 }
124 childless_record("BatchVectorSearch", fields)
125 }
126}
127
128impl PlanTreeNodeUnary<Batch> for BatchVectorSearch {
129 fn input(&self) -> crate::PlanRef<Batch> {
130 self.core.input.clone()
131 }
132
133 fn clone_with_input(&self, input: crate::PlanRef<Batch>) -> Self {
134 let mut core = self.core.clone();
135 core.input = input;
136 Self::with_core(core)
137 }
138}
139
140impl_plan_tree_node_for_unary!(Batch, BatchVectorSearch);
141
142impl ToBatchPb for BatchVectorSearch {
143 fn to_batch_prost_body(&self) -> NodeBody {
144 NodeBody::VectorIndexNearest(PbVectorIndexNearestNode {
145 table_id: self.core.index_table_id.table_id,
146 info_column_desc: self
147 .core
148 .info_column_desc
149 .iter()
150 .map(|col| col.to_protobuf())
151 .collect(),
152 vector_column_idx: self.core.vector_column_idx as _,
153 top_n: self.core.top_n as _,
154 distance_type: self.core.distance_type as _,
155 hnsw_ef_search: self.core.hnsw_ef_search.unwrap_or(0) as _,
156 })
157 }
158}
159
160impl ToLocalBatch for BatchVectorSearch {
161 fn to_local(&self) -> crate::error::Result<PlanRef> {
162 let mut core = self.core.clone();
163 core.input = core.input.to_local()?;
164 Ok(Self::with_core(core).into())
165 }
166}
167
168impl ToDistributedBatch for BatchVectorSearch {
169 fn to_distributed(&self) -> crate::error::Result<PlanRef> {
170 let mut core = self.core.clone();
171 core.input = core.input.to_distributed()?;
172 Ok(Self::with_core(core).into())
173 }
174}
175
176impl ExprVisitable for BatchVectorSearch {}
177
178impl ExprRewritable<Batch> for BatchVectorSearch {}