risingwave_stream/from_proto/
vector_index_lookup_join.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::types::DataType;
16use risingwave_pb::stream_plan::VectorIndexLookupJoinNode;
17use risingwave_storage::StateStore;
18use risingwave_storage::table::batch_table::VectorIndexReader;
19
20use crate::error::StreamResult;
21use crate::executor::{Executor, VectorIndexLookupJoinExecutor};
22use crate::from_proto::ExecutorBuilder;
23use crate::task::ExecutorParams;
24
/// Stateless [`ExecutorBuilder`] that turns a [`VectorIndexLookupJoinNode`] into a
/// [`VectorIndexLookupJoinExecutor`].
pub struct VectorIndexLookupJoinBuilder;
26
27impl ExecutorBuilder for VectorIndexLookupJoinBuilder {
28    type Node = VectorIndexLookupJoinNode;
29
30    async fn new_boxed_executor(
31        params: ExecutorParams,
32        node: &Self::Node,
33        store: impl StateStore,
34    ) -> StreamResult<Executor> {
35        let [input]: [_; 1] = params.input.try_into().unwrap();
36
37        let reader = VectorIndexReader::new(node.reader_desc.as_ref().unwrap(), store);
38
39        assert!(
40            params
41                .info
42                .schema
43                .fields
44                .last()
45                .expect("non-empty")
46                .data_type
47                .equals_datatype(&DataType::Struct(reader.info_struct_type().clone()).list())
48        );
49
50        let executor =
51            VectorIndexLookupJoinExecutor::new(input, reader, node.vector_column_idx as _);
52        Ok(Executor::new(params.info, Box::new(executor)))
53    }
54}