risingwave_batch_executors/executor/
vector_index_nearest.rs1use anyhow::anyhow;
16use futures::pin_mut;
17use futures::prelude::stream::StreamExt;
18use futures_async_stream::try_stream;
19use futures_util::TryStreamExt;
20use risingwave_common::array::DataChunk;
21use risingwave_common::catalog::{Field, Schema};
22use risingwave_common::types::DataType;
23use risingwave_pb::batch_plan::plan_node::NodeBody;
24use risingwave_pb::common::BatchQueryEpoch;
25use risingwave_storage::table::batch_table::VectorIndexReader;
26use risingwave_storage::{StateStore, dispatch_state_store};
27
28use super::{BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder};
29use crate::error::{BatchError, Result};
30
31pub struct VectorIndexNearestExecutor<S: StateStore> {
32 identity: String,
33 schema: Schema,
34
35 input: BoxedExecutor,
36 query_epoch: BatchQueryEpoch,
37 vector_column_idx: usize,
38
39 reader: VectorIndexReader<S>,
40}
41
/// Stateless marker type that carries the `BoxedExecutorBuilder` implementation
/// used to construct a `VectorIndexNearestExecutor`.
pub struct VectorIndexNearestExecutorBuilder {}
43
44impl BoxedExecutorBuilder for VectorIndexNearestExecutorBuilder {
45 async fn new_boxed_executor(
46 source: &ExecutorBuilder<'_>,
47 inputs: Vec<BoxedExecutor>,
48 ) -> Result<BoxedExecutor> {
49 ensure!(
50 inputs.len() == 1,
51 "VectorIndexNearest should have an input executor!"
52 );
53 let [input]: [_; 1] = inputs.try_into().unwrap();
54 let vector_index_nearest_node = try_match_expand!(
55 source.plan_node().get_node_body().unwrap(),
56 NodeBody::VectorIndexNearest
57 )?;
58
59 dispatch_state_store!(source.context().state_store(), state_store, {
60 let reader = VectorIndexReader::new(
61 vector_index_nearest_node.reader_desc.as_ref().unwrap(),
62 state_store,
63 );
64
65 let mut schema = input.schema().clone();
66 schema.fields.push(Field::new(
67 "vector_info",
68 DataType::list(DataType::Struct(reader.info_struct_type().clone())),
69 ));
70
71 Ok(Box::new(VectorIndexNearestExecutor {
72 identity: source.plan_node().get_identity().clone(),
73 schema,
74 query_epoch: vector_index_nearest_node.query_epoch.ok_or_else(|| {
75 anyhow!("vector_index_query not set in distributed lookup join")
76 })?,
77 vector_column_idx: vector_index_nearest_node.vector_column_idx as usize,
78 input,
79 reader,
80 }))
81 })
82 }
83}
84
85impl<S: StateStore> Executor for VectorIndexNearestExecutor<S> {
86 fn schema(&self) -> &Schema {
87 &self.schema
88 }
89
90 fn identity(&self) -> &str {
91 &self.identity
92 }
93
94 fn execute(self: Box<Self>) -> BoxedDataChunkStream {
95 self.do_execute().boxed()
96 }
97}
98
99impl<S: StateStore> VectorIndexNearestExecutor<S> {
100 #[try_stream(ok = DataChunk, error = BatchError)]
101 async fn do_execute(self: Box<Self>) {
102 let Self {
103 query_epoch,
104 input,
105 vector_column_idx,
106 ..
107 } = *self;
108
109 let input = input.execute();
110 pin_mut!(input);
111
112 let read_snapshot = self.reader.new_snapshot(query_epoch.into()).await?;
113
114 while let Some(chunk) = input.try_next().await? {
115 yield read_snapshot
116 .query_expand_chunk(chunk, vector_column_idx)
117 .await?;
118 }
119 }
120}