risingwave_stream/from_proto/
vector_index_lookup_join.rs1use risingwave_common::types::DataType;
16use risingwave_pb::stream_plan::VectorIndexLookupJoinNode;
17use risingwave_storage::StateStore;
18use risingwave_storage::table::batch_table::VectorIndexReader;
19
20use crate::error::StreamResult;
21use crate::executor::{Executor, VectorIndexLookupJoinExecutor};
22use crate::from_proto::ExecutorBuilder;
23use crate::task::ExecutorParams;
24
25pub struct VectorIndexLookupJoinBuilder;
26
27impl ExecutorBuilder for VectorIndexLookupJoinBuilder {
28 type Node = VectorIndexLookupJoinNode;
29
30 async fn new_boxed_executor(
31 params: ExecutorParams,
32 node: &Self::Node,
33 store: impl StateStore,
34 ) -> StreamResult<Executor> {
35 let [input]: [_; 1] = params.input.try_into().unwrap();
36
37 let reader = VectorIndexReader::new(node.reader_desc.as_ref().unwrap(), store);
38
39 assert!(
40 params
41 .info
42 .schema
43 .fields
44 .last()
45 .expect("non-empty")
46 .data_type
47 .equals_datatype(&DataType::Struct(reader.info_struct_type().clone()).list())
48 );
49
50 let executor =
51 VectorIndexLookupJoinExecutor::new(input, reader, node.vector_column_idx as _);
52 Ok(Executor::new(params.info, Box::new(executor)))
53 }
54}