risingwave_frontend/optimizer/plan_node/
stream_vector_index_write.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::num::NonZeroU32;
16
17use itertools::Itertools;
18use pretty_xmlish::{Pretty, XmlNode};
19use risingwave_common::catalog::{
20    ColumnCatalog, ConflictBehavior, CreateType, Engine, StreamJobStatus, TableId,
21};
22use risingwave_common::hash::VnodeCount;
23use risingwave_common::id::FragmentId;
24use risingwave_common::util::iter_util::ZipEqFast;
25use risingwave_pb::catalog::PbVectorIndexInfo;
26use risingwave_pb::stream_plan::stream_node::PbNodeBody;
27
28use crate::TableCatalog;
29use crate::catalog::table_catalog::TableType;
30use crate::catalog::{DatabaseId, SchemaId};
31use crate::error::ErrorCode;
32use crate::optimizer::StreamOptimizedLogicalPlanRoot;
33use crate::optimizer::plan_node::derive::{derive_columns, derive_pk};
34use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
35use crate::optimizer::plan_node::generic::{GenericPlanRef, PhysicalPlanRef};
36use crate::optimizer::plan_node::stream::StreamPlanNodeMetadata;
37use crate::optimizer::plan_node::utils::{Distill, childless_record, plan_can_use_background_ddl};
38use crate::optimizer::plan_node::{
39    ExprRewritable, PlanBase, PlanNodeMeta, PlanTreeNodeUnary, Stream, StreamNode,
40    StreamPlanRef as PlanRef, reorganize_elements_id,
41};
42use crate::optimizer::property::{Cardinality, Distribution, Order, RequiredDist, StreamKind};
43use crate::stream_fragmenter::BuildFragmentGraphState;
44
45#[derive(Debug, Clone, PartialEq, Eq, Hash)]
46pub struct StreamVectorIndexWrite {
47    pub base: PlanBase<Stream>,
48    /// Child of Vector Index Write plan
49    input: PlanRef,
50    table: TableCatalog,
51}
52
53impl StreamVectorIndexWrite {
54    fn new(input: PlanRef, table: TableCatalog) -> crate::error::Result<Self> {
55        if input.stream_kind() != StreamKind::AppendOnly {
56            return Err(ErrorCode::NotSupported(
57                "cannot create vector index on non-append-only workflow".to_owned(),
58                "try create vector index on append only workflow".to_owned(),
59            )
60            .into());
61        }
62        let base = PlanBase::new_stream(
63            input.ctx(),
64            input.schema().clone(),
65            Some(table.stream_key()),
66            input.functional_dependency().clone(),
67            input.distribution().clone(),
68            StreamKind::AppendOnly,
69            input.emit_on_window_close(),
70            input.watermark_columns().clone(),
71            input.columns_monotonicity().clone(),
72        );
73        Ok(Self { base, input, table })
74    }
75
76    pub fn create(
77        StreamOptimizedLogicalPlanRoot {
78            plan: input,
79            required_dist: user_distributed_by,
80            required_order: user_order_by,
81            out_fields: user_cols,
82            out_names,
83            ..
84        }: StreamOptimizedLogicalPlanRoot,
85        name: String,
86        database_id: DatabaseId,
87        schema_id: SchemaId,
88        definition: String,
89        cardinality: Cardinality,
90        retention_seconds: Option<NonZeroU32>,
91        vector_index_info: PbVectorIndexInfo,
92    ) -> crate::error::Result<Self> {
93        let input = RequiredDist::PhysicalDist(Distribution::Single)
94            .streaming_enforce_if_not_satisfies(input)?;
95        // the hidden column name might refer some expr id
96        let input = reorganize_elements_id(input);
97        let columns = derive_columns(input.schema(), out_names, &user_cols)?;
98
99        let create_type = if input.ctx().session_ctx().config().background_ddl()
100            && plan_can_use_background_ddl(&input)
101        {
102            CreateType::Background
103        } else {
104            CreateType::Foreground
105        };
106
107        let table = Self::derive_table_catalog(
108            input.clone(),
109            name,
110            database_id,
111            schema_id,
112            user_distributed_by,
113            user_order_by,
114            columns,
115            definition,
116            cardinality,
117            retention_seconds,
118            create_type,
119            vector_index_info,
120        )?;
121
122        Self::new(input, table)
123    }
124
125    #[expect(clippy::too_many_arguments)]
126    fn derive_table_catalog(
127        rewritten_input: PlanRef,
128        name: String,
129        database_id: DatabaseId,
130        schema_id: SchemaId,
131        user_distributed_by: RequiredDist,
132        user_order_by: Order,
133        columns: Vec<ColumnCatalog>,
134        definition: String,
135        cardinality: Cardinality,
136        retention_seconds: Option<NonZeroU32>,
137        create_type: CreateType,
138        vector_index_info: PbVectorIndexInfo,
139    ) -> crate::error::Result<TableCatalog> {
140        let input = rewritten_input;
141
142        let value_indices = (0..columns.len()).collect_vec();
143        assert_eq!(input.distribution(), &Distribution::Single);
144        let distribution_key = vec![];
145        let append_only = input.append_only();
146        // TODO(rc): In `TableCatalog` we still use `FixedBitSet` for watermark columns, ignoring the watermark group information.
147        // We will record the watermark group information in `TableCatalog` in the future. For now, let's flatten the watermark columns.
148        let watermark_columns = input.watermark_columns().indices().collect();
149
150        let (table_pk, stream_key) = derive_pk(input, user_distributed_by, user_order_by, &columns);
151        // assert: `stream_key` is a subset of `table_pk`
152
153        // Vector index writes go through vector writer APIs instead of StateTable
154        // get/prefix-iter reads, so prefix SST filters are not consumed.
155        let read_prefix_len_hint = 0;
156        // We don't need to fill in table id for table in frontend. The id will be generated on
157        // meta catalog service.
158        Ok(TableCatalog {
159            id: TableId::placeholder(),
160            schema_id,
161            database_id,
162            associated_source_id: None,
163            name,
164            columns,
165            pk: table_pk,
166            stream_key,
167            distribution_key,
168            table_type: TableType::VectorIndex,
169            append_only,
170            owner: risingwave_common::catalog::DEFAULT_SUPER_USER_ID,
171            fragment_id: FragmentId::placeholder(),
172            dml_fragment_id: None,
173            vnode_col_index: None,
174            row_id_index: None,
175            value_indices,
176            definition,
177            conflict_behavior: ConflictBehavior::NoCheck,
178            version_column_indices: vec![],
179            read_prefix_len_hint,
180            version: None,
181            watermark_columns,
182            dist_key_in_pk: vec![],
183            cardinality,
184            created_at_epoch: None,
185            initialized_at_epoch: None,
186            create_type,
187            stream_job_status: StreamJobStatus::Creating,
188            description: None,
189            initialized_at_cluster_version: None,
190            created_at_cluster_version: None,
191            retention_seconds: retention_seconds.map(|i| i.into()),
192            cdc_table_id: None,
193            vnode_count: VnodeCount::Singleton, // will be filled in by the meta service later
194            webhook_info: None,
195            job_id: None,
196            engine: Engine::Hummock,
197            clean_watermark_index_in_pk: None,
198            clean_watermark_indices: vec![],
199            refreshable: false,
200            vector_index_info: Some(vector_index_info),
201            cdc_table_type: None,
202        })
203    }
204
205    pub(crate) fn table(&self) -> &TableCatalog {
206        &self.table
207    }
208}
209
210impl Distill for StreamVectorIndexWrite {
211    fn distill<'a>(&self) -> XmlNode<'a> {
212        let table = self.table();
213
214        let vector_column_name = Pretty::from(table.columns[0].name().to_owned());
215
216        let column_names = (table.columns.iter())
217            .map(|col| col.name_with_hidden().to_string())
218            .map(Pretty::from)
219            .collect();
220
221        let stream_key = (table.stream_key().iter())
222            .map(|&k| table.columns[k].name().to_owned())
223            .map(Pretty::from)
224            .collect();
225
226        let vec = vec![
227            ("vector_column", vector_column_name),
228            ("columns", Pretty::Array(column_names)),
229            ("stream_key", Pretty::Array(stream_key)),
230        ];
231
232        childless_record("StreamVectorIndexWrite", vec)
233    }
234}
235
236impl PlanTreeNodeUnary<Stream> for StreamVectorIndexWrite {
237    fn input(&self) -> PlanRef {
238        self.input.clone()
239    }
240
241    fn clone_with_input(&self, input: PlanRef) -> Self {
242        let new = Self::new(input, self.table().clone()).unwrap();
243        new.base
244            .schema()
245            .fields
246            .iter()
247            .zip_eq_fast(self.base.schema().fields.iter())
248            .for_each(|(a, b)| {
249                assert_eq!(a.data_type, b.data_type);
250            });
251        assert_eq!(new.plan_base().stream_key(), self.plan_base().stream_key());
252        new
253    }
254}
255
256impl_plan_tree_node_for_unary! { Stream, StreamVectorIndexWrite }
257
258impl StreamNode for StreamVectorIndexWrite {
259    fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
260        use risingwave_pb::stream_plan::*;
261
262        let table = self.table();
263        PbNodeBody::VectorIndexWrite(Box::new(VectorIndexWriteNode {
264            table: Some(table.to_prost()),
265        }))
266    }
267}
268
269impl ExprRewritable<Stream> for StreamVectorIndexWrite {}
270
271impl ExprVisitable for StreamVectorIndexWrite {}