risingwave_stream/from_proto/
vector_index_lookup_join.rs1use risingwave_common::types::DataType;
16use risingwave_pb::stream_plan::VectorIndexLookupJoinNode;
17use risingwave_storage::StateStore;
18use risingwave_storage::table::batch_table::VectorIndexReader;
19
20use crate::error::StreamResult;
21use crate::executor::{Executor, VectorIndexLookupJoinExecutor};
22use crate::from_proto::ExecutorBuilder;
23use crate::task::ExecutorParams;
24
/// Builder that turns a [`VectorIndexLookupJoinNode`] plan node into a
/// [`VectorIndexLookupJoinExecutor`] via its [`ExecutorBuilder`] implementation.
25pub struct VectorIndexLookupJoinBuilder;
26
// Ties the `VectorIndexLookupJoin` node body, whose payload is `VectorIndexLookupJoinNode`,
// to `VectorIndexLookupJoinBuilder`.
// NOTE(review): `impl_stream_node_body!` is defined outside this file; confirm exactly what
// it generates before relying on this description.
27impl_stream_node_body!(VectorIndexLookupJoin(VectorIndexLookupJoinNode) => VectorIndexLookupJoinBuilder);
28
29impl ExecutorBuilder for VectorIndexLookupJoinBuilder {
30 type Node = VectorIndexLookupJoinNode;
31
32 async fn new_boxed_executor(
33 params: ExecutorParams,
34 node: &Self::Node,
35 store: impl StateStore,
36 ) -> StreamResult<Executor> {
37 let [input]: [_; 1] = params.input.try_into().unwrap();
38
39 let reader = VectorIndexReader::new(node.reader_desc.as_ref().unwrap(), store);
40
41 assert!(
42 params
43 .info
44 .schema
45 .fields
46 .last()
47 .expect("non-empty")
48 .data_type
49 .equals_datatype(&DataType::Struct(reader.info_struct_type().clone()).list())
50 );
51
52 let executor =
53 VectorIndexLookupJoinExecutor::new(input, reader, node.vector_column_idx as _);
54 Ok(Executor::new(params.info, Box::new(executor)))
55 }
56}