risingwave_batch_executors/executor/vector_index_nearest.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use anyhow::anyhow;
16use futures::pin_mut;
17use futures::prelude::stream::StreamExt;
18use futures_async_stream::try_stream;
19use futures_util::TryStreamExt;
20use risingwave_common::array::DataChunk;
21use risingwave_common::catalog::{Field, Schema};
22use risingwave_common::types::DataType;
23use risingwave_pb::batch_plan::plan_node::NodeBody;
24use risingwave_pb::common::BatchQueryEpoch;
25use risingwave_storage::table::batch_table::VectorIndexReader;
26use risingwave_storage::{StateStore, dispatch_state_store};
27
28use super::{BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder};
29use crate::error::{BatchError, Result};
30
/// Batch executor that expands every chunk produced by its single input using a read snapshot
/// of a vector index.
///
/// The output schema is the input schema with one extra trailing `vector_info` column
/// (assembled by `VectorIndexNearestExecutorBuilder`).
pub struct VectorIndexNearestExecutor<S: StateStore> {
    /// Name reported through `Executor::identity`.
    identity: String,
    /// Output schema: the input's columns followed by the `vector_info` column.
    schema: Schema,

    /// Child executor whose chunks are expanded.
    input: BoxedExecutor,
    /// Epoch passed to `VectorIndexReader::new_snapshot` when execution starts.
    query_epoch: BatchQueryEpoch,
    /// Column index forwarded to `query_expand_chunk` for every input chunk — presumably the
    /// input column holding the query vectors (TODO confirm).
    vector_column_idx: usize,

    /// Reader over the vector index, backed by state store `S`.
    reader: VectorIndexReader<S>,
}
41
/// Builder that turns a `VectorIndexNearest` plan node into a [`VectorIndexNearestExecutor`].
pub struct VectorIndexNearestExecutorBuilder {}
43
44impl BoxedExecutorBuilder for VectorIndexNearestExecutorBuilder {
45    async fn new_boxed_executor(
46        source: &ExecutorBuilder<'_>,
47        inputs: Vec<BoxedExecutor>,
48    ) -> Result<BoxedExecutor> {
49        ensure!(
50            inputs.len() == 1,
51            "VectorIndexNearest should have an input executor!"
52        );
53        let [input]: [_; 1] = inputs.try_into().unwrap();
54        let vector_index_nearest_node = try_match_expand!(
55            source.plan_node().get_node_body().unwrap(),
56            NodeBody::VectorIndexNearest
57        )?;
58
59        dispatch_state_store!(source.context().state_store(), state_store, {
60            let reader = VectorIndexReader::new(
61                vector_index_nearest_node.reader_desc.as_ref().unwrap(),
62                state_store,
63            );
64
65            let mut schema = input.schema().clone();
66            schema.fields.push(Field::new(
67                "vector_info",
68                DataType::list(DataType::Struct(reader.info_struct_type().clone())),
69            ));
70
71            Ok(Box::new(VectorIndexNearestExecutor {
72                identity: source.plan_node().get_identity().clone(),
73                schema,
74                query_epoch: vector_index_nearest_node.query_epoch.ok_or_else(|| {
75                    anyhow!("vector_index_query not set in distributed lookup join")
76                })?,
77                vector_column_idx: vector_index_nearest_node.vector_column_idx as usize,
78                input,
79                reader,
80            }))
81        })
82    }
83}
84
85impl<S: StateStore> Executor for VectorIndexNearestExecutor<S> {
86    fn schema(&self) -> &Schema {
87        &self.schema
88    }
89
90    fn identity(&self) -> &str {
91        &self.identity
92    }
93
94    fn execute(self: Box<Self>) -> BoxedDataChunkStream {
95        self.do_execute().boxed()
96    }
97}
98
99impl<S: StateStore> VectorIndexNearestExecutor<S> {
100    #[try_stream(ok = DataChunk, error = BatchError)]
101    async fn do_execute(self: Box<Self>) {
102        let Self {
103            query_epoch,
104            input,
105            vector_column_idx,
106            ..
107        } = *self;
108
109        let input = input.execute();
110        pin_mut!(input);
111
112        let read_snapshot = self.reader.new_snapshot(query_epoch.into()).await?;
113
114        while let Some(chunk) = input.try_next().await? {
115            yield read_snapshot
116                .query_expand_chunk(chunk, vector_column_idx)
117                .await?;
118        }
119    }
120}