1use std::collections::{HashMap, HashSet};
16use std::fmt::Debug;
17use std::iter;
18use std::sync::Arc;
19
20use itertools::Itertools;
21use parking_lot::RwLock;
22use risingwave_common::catalog::{ColumnDesc, TableId};
23use risingwave_common::hash::{VirtualNode, VnodeCountCompat};
24use risingwave_common::util::memcmp_encoding;
25use risingwave_common::util::row_serde::OrderedRowSerde;
26use risingwave_common::util::sort_util::OrderType;
27use risingwave_common::util::value_encoding::column_aware_row_encoding::ColumnAwareSerde;
28use risingwave_common::util::value_encoding::{BasicSerde, EitherSerde, ValueRowDeserializer};
29use risingwave_hummock_sdk::compaction_group::StateTableId;
30use risingwave_hummock_sdk::key::{TABLE_PREFIX_LEN, get_table_id};
31use risingwave_pb::catalog::Table;
32use risingwave_rpc_client::MetaClient;
33use risingwave_rpc_client::error::{Result as RpcResult, RpcError};
34use thiserror_ext::AsReport;
35
36use crate::hummock::{HummockError, HummockResult};
37use crate::row_serde::value_serde::ValueRowSerdeNew;
38
/// Maps a user key to the byte slice that is used as its filter key.
pub trait FilterKeyExtractor: Send + Sync {
    /// Returns the filter key for `user_key`.
    ///
    /// The result carries the same lifetime as `user_key`, so implementations hand
    /// back a slice of the input (or an empty slice) rather than an owned buffer.
    fn extract<'a>(&self, user_key: &'a [u8]) -> &'a [u8];
}
43
/// Enum wrapper over every concrete [`FilterKeyExtractor`] defined in this file.
///
/// Its inherent `extract` method is generated by `impl_filter_key_extractor!` and
/// forwards to the extractor wrapped by the active variant.
pub enum FilterKeyExtractorImpl {
    Schema(SchemaFilterKeyExtractor),
    FullKey(FullKeyFilterKeyExtractor),
    Dummy(DummyFilterKeyExtractor),
    Multi(MultiFilterKeyExtractor),
    FixedLength(FixedLengthFilterKeyExtractor),
}
51
52impl FilterKeyExtractorImpl {
53 pub fn from_table(table_catalog: &Table) -> Self {
54 let read_prefix_len = table_catalog.get_read_prefix_len_hint() as usize;
55
56 if read_prefix_len == 0 {
57 FilterKeyExtractorImpl::Dummy(DummyFilterKeyExtractor)
58 } else if read_prefix_len > table_catalog.get_pk().len() {
59 FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor)
62 } else {
63 FilterKeyExtractorImpl::Schema(SchemaFilterKeyExtractor::new(table_catalog))
64 }
65 }
66}
67
/// Generates the inherent `extract` method on [`FilterKeyExtractorImpl`].
///
/// The macro takes a comma-separated list of `{ Variant }` entries and emits one
/// `match` arm per entry that forwards `user_key` to the wrapped extractor.
macro_rules! impl_filter_key_extractor {
    ($( { $variant_name:ident } ),*) => {
        impl FilterKeyExtractorImpl {
            pub fn extract<'a>(&self, user_key: &'a [u8]) -> &'a [u8]{
                match self {
                    $( Self::$variant_name(inner) => inner.extract(user_key), )*
                }
            }
        }
    }

}
80
/// Invokes `$macro` with the braced list of all [`FilterKeyExtractorImpl`] variant
/// names, so the variant list lives in one place.
macro_rules! for_all_filter_key_extractor_variants {
    ($macro:ident) => {
        $macro! {
            { Schema },
            { FullKey },
            { Dummy },
            { Multi },
            { FixedLength }
        }
    };
}

// Expands to the `impl FilterKeyExtractorImpl { pub fn extract(..) }` dispatch method.
for_all_filter_key_extractor_variants! { impl_filter_key_extractor }
94
/// Extractor that uses the entire user key as the filter key.
#[derive(Default)]
pub struct FullKeyFilterKeyExtractor;

impl FilterKeyExtractor for FullKeyFilterKeyExtractor {
    /// Returns `user_key` unchanged.
    fn extract<'a>(&self, user_key: &'a [u8]) -> &'a [u8] {
        user_key
    }
}

/// Extractor that produces an empty filter key for every input.
#[derive(Default)]
pub struct DummyFilterKeyExtractor;
impl FilterKeyExtractor for DummyFilterKeyExtractor {
    /// Ignores the input and returns an empty slice.
    fn extract<'a>(&self, _user_key: &'a [u8]) -> &'a [u8] {
        &[]
    }
}
111
112#[derive(Default)]
114pub struct FixedLengthFilterKeyExtractor {
115 fixed_length: usize,
116}
117
118impl FilterKeyExtractor for FixedLengthFilterKeyExtractor {
119 fn extract<'a>(&self, user_key: &'a [u8]) -> &'a [u8] {
120 &user_key[0..self.fixed_length]
121 }
122}
123
124impl FixedLengthFilterKeyExtractor {
125 pub fn new(fixed_length: usize) -> Self {
126 Self { fixed_length }
127 }
128}
129
130pub struct SchemaFilterKeyExtractor {
133 read_prefix_len: usize,
138 deserializer: OrderedRowSerde,
139 }
142
143impl FilterKeyExtractor for SchemaFilterKeyExtractor {
144 fn extract<'a>(&self, user_key: &'a [u8]) -> &'a [u8] {
145 if user_key.len() < TABLE_PREFIX_LEN + VirtualNode::SIZE {
146 return &[];
147 }
148
149 let (_table_prefix, key) = user_key.split_at(TABLE_PREFIX_LEN);
150 let (_vnode_prefix, pk) = key.split_at(VirtualNode::SIZE);
151
152 let filter_key_len = self
156 .deserializer
157 .deserialize_prefix_len(pk, self.read_prefix_len)
158 .unwrap();
159
160 let end_position = TABLE_PREFIX_LEN + VirtualNode::SIZE + filter_key_len;
161 &user_key[TABLE_PREFIX_LEN + VirtualNode::SIZE..end_position]
162 }
163}
164
165impl SchemaFilterKeyExtractor {
166 pub fn new(table_catalog: &Table) -> Self {
167 let pk_indices: Vec<usize> = table_catalog
168 .pk
169 .iter()
170 .map(|col_order| col_order.column_index as usize)
171 .collect();
172
173 let read_prefix_len = table_catalog.get_read_prefix_len_hint() as usize;
174
175 let data_types = pk_indices
176 .iter()
177 .map(|column_idx| &table_catalog.columns[*column_idx])
178 .map(|col| ColumnDesc::from(col.column_desc.as_ref().unwrap()).data_type)
179 .collect();
180
181 let order_types: Vec<OrderType> = table_catalog
182 .pk
183 .iter()
184 .map(|col_order| OrderType::from_protobuf(col_order.get_order_type().unwrap()))
185 .collect();
186
187 Self {
188 read_prefix_len,
189 deserializer: OrderedRowSerde::new(data_types, order_types),
190 }
191 }
192}
193
194#[derive(Default)]
195pub struct MultiFilterKeyExtractor {
196 id_to_filter_key_extractor: HashMap<TableId, FilterKeyExtractorImpl>,
197}
198
199impl MultiFilterKeyExtractor {
200 pub fn register(&mut self, table_id: TableId, filter_key_extractor: FilterKeyExtractorImpl) {
201 self.id_to_filter_key_extractor
202 .insert(table_id, filter_key_extractor);
203 }
204
205 pub fn size(&self) -> usize {
206 self.id_to_filter_key_extractor.len()
207 }
208
209 pub fn get_existing_table_ids(&self) -> HashSet<TableId> {
210 self.id_to_filter_key_extractor.keys().cloned().collect()
211 }
212}
213
/// Debug output reports only the number of registered extractors, not their contents.
impl Debug for MultiFilterKeyExtractor {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "MultiFilterKeyExtractor size {} ", self.size())
    }
}
219
220impl FilterKeyExtractor for MultiFilterKeyExtractor {
221 fn extract<'a>(&self, user_key: &'a [u8]) -> &'a [u8] {
222 if user_key.len() < TABLE_PREFIX_LEN + VirtualNode::SIZE {
223 return user_key;
224 }
225
226 let table_id = get_table_id(user_key);
227 self.id_to_filter_key_extractor
228 .get(&table_id)
229 .unwrap()
230 .extract(user_key)
231 }
232}
233
/// Source of table catalogs for tables that are not already cached by
/// [`CompactionCatalogManager`].
#[async_trait::async_trait]
pub trait StateTableAccessor: Send + Sync {
    /// Looks up the catalogs of `table_ids` and returns those found, keyed by table id.
    async fn get_tables(&self, table_ids: &[TableId]) -> RpcResult<HashMap<TableId, Table>>;
}
238
/// Accessor whose `get_tables` always fails; it is the accessor used by
/// `CompactionCatalogManager::default()`.
#[derive(Default)]
pub struct FakeRemoteTableAccessor {}

/// Accessor whose `get_tables` always returns an empty map; it backs
/// `CompactionCatalogManager::new_preloaded`.
#[derive(Default)]
struct PreloadedOnlyTableAccessor {}
244
/// Accessor that fetches table catalogs through a [`MetaClient`].
pub struct RemoteTableAccessor {
    meta_client: MetaClient,
}

impl RemoteTableAccessor {
    /// Wraps `meta_client` for later catalog lookups.
    pub fn new(meta_client: MetaClient) -> Self {
        Self { meta_client }
    }
}
254
#[async_trait::async_trait]
impl StateTableAccessor for RemoteTableAccessor {
    /// Forwards the request to `MetaClient::get_tables`.
    // NOTE(review): the meaning of the `true` flag is defined by `MetaClient::get_tables`
    // and is not visible from this file — confirm before changing it.
    async fn get_tables(&self, table_ids: &[TableId]) -> RpcResult<HashMap<TableId, Table>> {
        self.meta_client.get_tables(table_ids.to_vec(), true).await
    }
}

#[async_trait::async_trait]
impl StateTableAccessor for FakeRemoteTableAccessor {
    /// Always fails with an internal RPC error: this accessor cannot fetch anything.
    async fn get_tables(&self, _table_ids: &[TableId]) -> RpcResult<HashMap<TableId, Table>> {
        Err(RpcError::Internal(anyhow::anyhow!(
            "fake accessor does not support fetch remote table"
        )))
    }
}

#[async_trait::async_trait]
impl StateTableAccessor for PreloadedOnlyTableAccessor {
    /// Always succeeds with an empty map, so tables that were not preloaded are
    /// reported as not found rather than as an error.
    async fn get_tables(&self, _table_ids: &[TableId]) -> RpcResult<HashMap<TableId, Table>> {
        Ok(HashMap::new())
    }
}
277
/// Caches table catalogs by table id and builds [`CompactionCatalogAgent`]s from
/// them, falling back to a [`StateTableAccessor`] for tables missing from the cache.
pub struct CompactionCatalogManager {
    /// Cached catalogs, guarded by a read-write lock.
    table_id_to_catalog: RwLock<HashMap<StateTableId, Table>>,
    /// Fallback used by `acquire` for table ids that are not cached.
    table_accessor: Box<dyn StateTableAccessor>,
}

impl Default for CompactionCatalogManager {
    /// Creates a manager with an empty cache and a [`FakeRemoteTableAccessor`],
    /// so any lookup that misses the cache fails.
    fn default() -> Self {
        Self::new(Box::<FakeRemoteTableAccessor>::default())
    }
}
291
292impl CompactionCatalogManager {
293 pub fn new(table_accessor: Box<dyn StateTableAccessor>) -> Self {
294 Self {
295 table_id_to_catalog: Default::default(),
296 table_accessor,
297 }
298 }
299
300 pub fn new_preloaded(table_id_to_catalog: HashMap<StateTableId, Table>) -> Self {
304 Self {
305 table_id_to_catalog: RwLock::new(table_id_to_catalog),
306 table_accessor: Box::<PreloadedOnlyTableAccessor>::default(),
307 }
308 }
309}
310
311impl CompactionCatalogManager {
312 pub fn update(&self, table_id: TableId, catalog: Table) {
314 self.table_id_to_catalog.write().insert(table_id, catalog);
315 }
316
317 pub fn sync(&self, catalog_map: HashMap<TableId, Table>) {
319 let mut guard = self.table_id_to_catalog.write();
320 guard.clear();
321 guard.extend(catalog_map);
322 }
323
324 pub fn remove(&self, table_id: TableId) {
326 self.table_id_to_catalog.write().remove(&table_id);
327 }
328
329 pub async fn acquire(
332 &self,
333 mut table_ids: Vec<StateTableId>,
334 ) -> HummockResult<CompactionCatalogAgentRef> {
335 if table_ids.is_empty() {
336 return Err(HummockError::other("table_id_set is empty"));
341 }
342
343 let mut multi_filter_key_extractor = MultiFilterKeyExtractor::default();
344 let mut table_id_to_vnode = HashMap::new();
345 let mut table_id_to_watermark_serde = HashMap::new();
346 let mut table_id_to_value_watermark_serde = HashMap::new();
347
348 {
349 let guard = self.table_id_to_catalog.read();
350 table_ids.retain(|table_id| match guard.get(table_id) {
351 Some(table_catalog) => {
352 multi_filter_key_extractor
354 .register(*table_id, FilterKeyExtractorImpl::from_table(table_catalog));
355
356 table_id_to_vnode.insert(*table_id, table_catalog.vnode_count());
358
359 table_id_to_watermark_serde
361 .insert(*table_id, build_watermark_col_serde(table_catalog));
362 table_id_to_value_watermark_serde.insert(
363 *table_id,
364 build_value_watermark_col_serde(table_catalog).map(Arc::new),
365 );
366
367 false
368 }
369
370 None => true,
371 });
372 }
373
374 if !table_ids.is_empty() {
375 let mut state_tables =
376 self.table_accessor
377 .get_tables(&table_ids)
378 .await
379 .map_err(|e| {
380 HummockError::other(format!(
381 "request rpc list_tables for meta failed: {}",
382 e.as_report()
383 ))
384 })?;
385
386 let mut guard = self.table_id_to_catalog.write();
387 for table_id in table_ids {
388 if let Some(table) = state_tables.remove(&table_id) {
389 let table_id = table.id;
390 let key_extractor = FilterKeyExtractorImpl::from_table(&table);
391 let vnode = table.vnode_count();
392 let watermark_serde = build_watermark_col_serde(&table);
393 let value_watermark_serde = build_value_watermark_col_serde(&table);
394 guard.insert(table_id, table);
395 multi_filter_key_extractor.register(table_id, key_extractor);
397
398 table_id_to_vnode.insert(table_id, vnode);
400
401 table_id_to_watermark_serde.insert(table_id, watermark_serde);
403 table_id_to_value_watermark_serde
404 .insert(table_id, value_watermark_serde.map(Arc::new));
405 }
406 }
407 }
408
409 Ok(Arc::new(CompactionCatalogAgent::new(
410 FilterKeyExtractorImpl::Multi(multi_filter_key_extractor),
411 table_id_to_vnode,
412 table_id_to_watermark_serde,
413 table_id_to_value_watermark_serde,
414 )))
415 }
416}
417
/// Per-table catalog information handed out by [`CompactionCatalogManager::acquire`].
pub struct CompactionCatalogAgent {
    /// Extractor used by [`CompactionCatalogAgent::extract`].
    filter_key_extractor_manager: FilterKeyExtractorImpl,
    /// Vnode count of each table.
    table_id_to_vnode: HashMap<StateTableId, usize>,
    /// Per table: `(pk serde, watermark column serde, watermark index in pk)`, or
    /// `None` when the table has no clean-watermark column in its pk.
    table_id_to_watermark_serde:
        HashMap<StateTableId, Option<(OrderedRowSerde, OrderedRowSerde, usize)>>,
    /// Per table: serde for a clean-watermark column that is not part of the pk,
    /// or `None` when the table has no such column.
    value_table_id_to_watermark_serde: HashMap<StateTableId, Option<ValueWatermarkColumnSerdeRef>>,
}
430
431impl CompactionCatalogAgent {
432 pub fn new(
433 filter_key_extractor_manager: FilterKeyExtractorImpl,
434 table_id_to_vnode: HashMap<StateTableId, usize>,
435 table_id_to_watermark_serde: HashMap<
436 StateTableId,
437 Option<(OrderedRowSerde, OrderedRowSerde, usize)>,
438 >,
439 value_table_id_to_watermark_serde: HashMap<
440 StateTableId,
441 Option<ValueWatermarkColumnSerdeRef>,
442 >,
443 ) -> Self {
444 Self {
445 filter_key_extractor_manager,
446 table_id_to_vnode,
447 table_id_to_watermark_serde,
448 value_table_id_to_watermark_serde,
449 }
450 }
451
452 pub fn dummy() -> Self {
453 Self {
454 filter_key_extractor_manager: FilterKeyExtractorImpl::Dummy(DummyFilterKeyExtractor),
455 table_id_to_vnode: Default::default(),
456 table_id_to_watermark_serde: Default::default(),
457 value_table_id_to_watermark_serde: Default::default(),
458 }
459 }
460
461 pub fn for_test(table_ids: Vec<impl Into<StateTableId>>) -> Arc<Self> {
462 let full_key_filter_key_extractor =
463 FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor);
464
465 let table_id_to_vnode: HashMap<TableId, usize> = table_ids
466 .into_iter()
467 .map(|table_id| (table_id.into(), VirtualNode::COUNT_FOR_TEST))
468 .collect();
469
470 let table_id_to_watermark_serde = table_id_to_vnode
471 .keys()
472 .map(|table_id| (*table_id, None))
473 .collect();
474
475 let value_table_id_to_watermark_serde = table_id_to_vnode
476 .keys()
477 .map(|table_id| (*table_id, None))
478 .collect();
479
480 Arc::new(CompactionCatalogAgent::new(
481 full_key_filter_key_extractor,
482 table_id_to_vnode,
483 table_id_to_watermark_serde,
484 value_table_id_to_watermark_serde,
485 ))
486 }
487}
488
489impl CompactionCatalogAgent {
490 pub fn extract<'a>(&self, user_key: &'a [u8]) -> &'a [u8] {
491 self.filter_key_extractor_manager.extract(user_key)
492 }
493
494 pub fn vnode_count(&self, table_id: StateTableId) -> usize {
495 *self.table_id_to_vnode.get(&table_id).unwrap_or_else(|| {
496 panic!(
497 "table_id not found {} all_table_ids {:?}",
498 table_id,
499 self.table_id_to_vnode.keys()
500 )
501 })
502 }
503
504 pub fn watermark_serde(
505 &self,
506 table_id: StateTableId,
507 ) -> Option<(OrderedRowSerde, OrderedRowSerde, usize)> {
508 self.table_id_to_watermark_serde
509 .get(&table_id)
510 .unwrap_or_else(|| {
511 panic!(
512 "table_id not found {} all_table_ids {:?}",
513 table_id,
514 self.table_id_to_watermark_serde.keys()
515 )
516 })
517 .clone()
518 }
519
520 pub fn value_watermark_serde(
521 &self,
522 table_id: StateTableId,
523 ) -> Option<ValueWatermarkColumnSerdeRef> {
524 self.value_table_id_to_watermark_serde
525 .get(&table_id)
526 .unwrap_or_else(|| {
527 panic!(
528 "table_id not found {} all_table_ids {:?}",
529 table_id,
530 self.value_table_id_to_watermark_serde.keys()
531 )
532 })
533 .clone()
534 }
535
536 pub fn table_id_to_vnode_ref(&self) -> &HashMap<StateTableId, usize> {
537 &self.table_id_to_vnode
538 }
539
540 pub fn table_ids(&self) -> impl Iterator<Item = StateTableId> + '_ {
541 self.table_id_to_vnode.keys().cloned()
542 }
543}
544
/// Shared handle to a [`CompactionCatalogManager`].
pub type CompactionCatalogManagerRef = Arc<CompactionCatalogManager>;
/// Shared handle to a [`CompactionCatalogAgent`].
pub type CompactionCatalogAgentRef = Arc<CompactionCatalogAgent>;
547
548fn build_watermark_col_serde(
549 table_catalog: &Table,
550) -> Option<(OrderedRowSerde, OrderedRowSerde, usize)> {
551 let clean_watermark_index_in_pk = table_catalog.get_clean_watermark_index_in_pk_compat();
553
554 match clean_watermark_index_in_pk {
555 None => {
556 None
559 }
560
561 Some(clean_watermark_index_in_pk) => {
562 use risingwave_common::types::DataType;
563 let table_columns: Vec<ColumnDesc> = table_catalog
564 .columns
565 .iter()
566 .map(|col| col.column_desc.as_ref().unwrap().into())
567 .collect();
568
569 let pk_data_types: Vec<DataType> = table_catalog
570 .pk
571 .iter()
572 .map(|col_order| {
573 table_columns[col_order.column_index as usize]
574 .data_type
575 .clone()
576 })
577 .collect();
578
579 let pk_order_types = table_catalog
580 .pk
581 .iter()
582 .map(|col_order| OrderType::from_protobuf(col_order.get_order_type().unwrap()))
583 .collect_vec();
584
585 assert_eq!(pk_data_types.len(), pk_order_types.len());
586 let pk_serde = OrderedRowSerde::new(pk_data_types, pk_order_types);
587 let watermark_col_serde = pk_serde.index(clean_watermark_index_in_pk).into_owned();
588 Some((pk_serde, watermark_col_serde, clean_watermark_index_in_pk))
589 }
590 }
591}
592
593fn build_value_watermark_col_serde(table_catalog: &Table) -> Option<ValueWatermarkColumnSerde> {
594 pub fn try_get_non_pk_clean_watermark_column_index(table: &Table) -> Option<usize> {
596 table
597 .get_clean_watermark_column_indices()
598 .iter()
599 .filter_map(|&col_idx| {
600 if table
601 .pk
602 .iter()
603 .any(|col_order| col_order.column_index == col_idx)
604 {
605 return None;
606 }
607 Some(col_idx as usize)
608 })
609 .at_most_one()
610 .unwrap()
611 }
612
613 let clean_watermark_index = try_get_non_pk_clean_watermark_column_index(table_catalog)?;
614 Some(ValueWatermarkColumnSerde::new(
615 table_catalog,
616 clean_watermark_index,
617 ))
618}
619
/// Decodes a value row and re-encodes its watermark column in memcomparable form.
pub struct ValueWatermarkColumnSerde {
    /// Serde used to deserialize the encoded value row.
    row_serde: EitherSerde,
    /// Position of the watermark column within the deserialized row.
    watermark_index_in_de_row: usize,
    /// Order type used when memcmp-encoding the watermark datum.
    watermark_column_mem_encoding_order: OrderType,
}

/// Shared handle to a [`ValueWatermarkColumnSerde`].
pub type ValueWatermarkColumnSerdeRef = Arc<ValueWatermarkColumnSerde>;
629
630impl ValueWatermarkColumnSerde {
631 fn new(table_catalog: &Table, clean_watermark_index: usize) -> Self {
632 let table_columns: Vec<ColumnDesc> = table_catalog
633 .columns
634 .iter()
635 .map(|col| col.column_desc.as_ref().unwrap().into())
636 .collect();
637 let pk_order_type = table_catalog
638 .pk
639 .iter()
640 .filter_map(|col_order| {
641 if col_order.column_index as usize == clean_watermark_index {
642 return Some(OrderType::from_protobuf(
643 col_order.get_order_type().unwrap(),
644 ));
645 }
646 None
647 })
648 .at_most_one()
649 .unwrap();
650 let watermark_column_mem_encoding_order = match pk_order_type {
652 Some(o) => o,
653 None => OrderType::ascending(),
655 };
656 let Some(watermark_index_in_value_indices) = table_catalog
658 .value_indices
659 .iter()
660 .position(|p| *p as usize == clean_watermark_index)
661 else {
662 panic!(
663 "Watermark index {} not found in value_indices {:?}.",
664 clean_watermark_index, table_catalog.value_indices
665 );
666 };
667 let (row_serde, watermark_index_in_de_row) = if table_catalog.version.is_none() {
668 let row_serde = BasicSerde::new(
669 Arc::from_iter(table_catalog.value_indices.iter().map(|val| *val as usize)),
670 Arc::from(table_columns.into_boxed_slice()),
671 )
672 .into();
673 (row_serde, watermark_index_in_value_indices)
674 } else {
675 let row_serde = ColumnAwareSerde::new(
676 Arc::from_iter(iter::once(clean_watermark_index)),
677 Arc::from(table_columns.into_boxed_slice()),
678 )
679 .into();
680 (row_serde, 0)
682 };
683 Self {
684 row_serde,
685 watermark_index_in_de_row,
686 watermark_column_mem_encoding_order,
687 }
688 }
689
690 pub fn deserialize(&self, encoded_bytes: &[u8]) -> HummockResult<Option<Vec<u8>>> {
691 let mut row = self
692 .row_serde
693 .deserialize(encoded_bytes)
694 .map_err(HummockError::decode_error)?;
695 if self.watermark_index_in_de_row >= row.len() {
696 return Ok(None);
698 }
699 let datum = std::mem::take(&mut row[self.watermark_index_in_de_row]);
700 let bytes = memcmp_encoding::encode_value(datum, self.watermark_column_mem_encoding_order)
702 .map_err(HummockError::encode_error)?;
703 Ok(Some(bytes.into()))
704 }
705}
706
707#[cfg(test)]
708mod tests {
709 use std::collections::{HashMap, HashSet};
710 use std::mem;
711 use std::sync::Arc;
712
713 use bytes::{BufMut, BytesMut};
714 use itertools::Itertools;
715 use risingwave_common::catalog::ColumnDesc;
716 use risingwave_common::hash::VirtualNode;
717 use risingwave_common::row::OwnedRow;
718 use risingwave_common::types::DataType;
719 use risingwave_common::types::ScalarImpl::{self};
720 use risingwave_common::util::row_serde::OrderedRowSerde;
721 use risingwave_common::util::sort_util::OrderType;
722 use risingwave_hummock_sdk::key::TABLE_PREFIX_LEN;
723 use risingwave_pb::catalog::table::{PbEngine, TableType};
724 use risingwave_pb::catalog::{PbCreateType, PbStreamJobStatus, PbTable};
725 use risingwave_pb::common::{PbColumnOrder, PbDirection, PbNullsAre, PbOrderType};
726 use risingwave_pb::plan_common::PbColumnCatalog;
727 use thiserror_ext::AsReport;
728
729 use super::{DummyFilterKeyExtractor, FilterKeyExtractor, SchemaFilterKeyExtractor};
730 use crate::compaction_catalog_manager::{
731 FilterKeyExtractorImpl, FullKeyFilterKeyExtractor, MultiFilterKeyExtractor,
732 };
    /// Big-endian bytes of an arbitrary vnode (index 233) used as the vnode part of test keys.
    const fn dummy_vnode() -> [u8; VirtualNode::SIZE] {
        VirtualNode::from_index(233).to_be_bytes()
    }
736
737 #[test]
738 fn test_default_filter_key_extractor() {
739 let dummy_filter_key_extractor = DummyFilterKeyExtractor;
740 let full_key = "full_key".as_bytes();
741 let output_key = dummy_filter_key_extractor.extract(full_key);
742
743 assert_eq!("".as_bytes(), output_key);
744
745 let full_key_filter_key_extractor = FullKeyFilterKeyExtractor;
746 let output_key = full_key_filter_key_extractor.extract(full_key);
747
748 assert_eq!(full_key, output_key);
749 }
750
    /// Builds a `PbTable` fixture with four columns (`_row_id`, `col_1`, `col_2`,
    /// `col_3`), an ascending primary key on columns 1 and 3, and a
    /// `read_prefix_len_hint` of 1.
    ///
    /// `column_count` only controls the length of `distribution_key`.
    fn build_table_with_prefix_column_num(column_count: u32) -> PbTable {
        PbTable {
            id: 0.into(),
            schema_id: 0.into(),
            database_id: 0.into(),
            name: "test".to_owned(),
            table_type: TableType::Table as i32,
            columns: vec![
                PbColumnCatalog {
                    column_desc: Some(
                        (&ColumnDesc::named("_row_id", 0.into(), DataType::Int64)).into(),
                    ),
                    is_hidden: true,
                },
                PbColumnCatalog {
                    column_desc: Some(
                        (&ColumnDesc::named("col_1", 0.into(), DataType::Int64)).into(),
                    ),
                    is_hidden: false,
                },
                PbColumnCatalog {
                    column_desc: Some(
                        (&ColumnDesc::named("col_2", 0.into(), DataType::Float64)).into(),
                    ),
                    is_hidden: false,
                },
                PbColumnCatalog {
                    column_desc: Some(
                        (&ColumnDesc::named("col_3", 0.into(), DataType::Varchar)).into(),
                    ),
                    is_hidden: false,
                },
            ],
            pk: vec![
                PbColumnOrder {
                    column_index: 1,
                    order_type: Some(PbOrderType {
                        direction: PbDirection::Ascending as _,
                        nulls_are: PbNullsAre::Largest as _,
                    }),
                },
                PbColumnOrder {
                    column_index: 3,
                    order_type: Some(PbOrderType {
                        direction: PbDirection::Ascending as _,
                        nulls_are: PbNullsAre::Largest as _,
                    }),
                },
            ],
            stream_key: vec![0],
            distribution_key: (0..column_count as i32).collect_vec(),
            optional_associated_source_id: None,
            append_only: false,
            owner: risingwave_common::catalog::DEFAULT_SUPER_USER_ID,
            retention_seconds: Some(300),
            fragment_id: 0.into(),
            dml_fragment_id: None,
            initialized_at_epoch: None,
            vnode_col_index: None,
            row_id_index: Some(0),
            value_indices: vec![0],
            definition: "".into(),
            handle_pk_conflict_behavior: 0,
            version_column_indices: vec![],
            read_prefix_len_hint: 1,
            version: None,
            watermark_indices: vec![],
            dist_key_in_pk: vec![],
            cardinality: None,
            created_at_epoch: None,
            #[expect(deprecated)]
            cleaned_by_watermark: false,
            stream_job_status: PbStreamJobStatus::Created.into(),
            create_type: PbCreateType::Foreground.into(),
            description: None,
            #[expect(deprecated)]
            incoming_sinks: Default::default(),
            initialized_at_cluster_version: None,
            created_at_cluster_version: None,
            cdc_table_id: None,
            maybe_vnode_count: None,
            webhook_info: None,
            job_id: None,
            engine: Some(PbEngine::Hummock as i32),
            #[expect(deprecated)]
            clean_watermark_index_in_pk: None,
            clean_watermark_indices: vec![],
            refreshable: false,
            vector_index_info: None,
            cdc_table_type: None,
        }
    }
843
844 #[test]
845 fn test_zero_read_prefix_len_uses_dummy_extractor() {
846 let mut prost_table = build_table_with_prefix_column_num(1);
847 prost_table.read_prefix_len_hint = 0;
848
849 let extractor = FilterKeyExtractorImpl::from_table(&prost_table);
850 let order_types: Vec<OrderType> = vec![OrderType::ascending(), OrderType::ascending()];
851 let schema = vec![DataType::Int64, DataType::Varchar];
852 let serializer = OrderedRowSerde::new(schema, order_types);
853 let row = OwnedRow::new(vec![
854 Some(ScalarImpl::Int64(100)),
855 Some(ScalarImpl::Utf8("abc".into())),
856 ]);
857 let mut row_bytes = vec![];
858 serializer.serialize(&row, &mut row_bytes);
859
860 let table_prefix = {
861 let mut buf = BytesMut::with_capacity(TABLE_PREFIX_LEN);
862 buf.put_u32(1);
863 buf.to_vec()
864 };
865 let vnode_prefix = &dummy_vnode()[..];
866 let full_key = [&table_prefix, vnode_prefix, &row_bytes].concat();
867
868 assert!(extractor.extract(&full_key).is_empty());
869 }
870
871 #[test]
872 fn test_schema_filter_key_extractor() {
873 let prost_table = build_table_with_prefix_column_num(1);
874 let schema_filter_key_extractor = SchemaFilterKeyExtractor::new(&prost_table);
875
876 let order_types: Vec<OrderType> = vec![OrderType::ascending(), OrderType::ascending()];
877 let schema = vec![DataType::Int64, DataType::Varchar];
878 let serializer = OrderedRowSerde::new(schema, order_types);
879 let row = OwnedRow::new(vec![
880 Some(ScalarImpl::Int64(100)),
881 Some(ScalarImpl::Utf8("abc".into())),
882 ]);
883 let mut row_bytes = vec![];
884 serializer.serialize(&row, &mut row_bytes);
885
886 let table_prefix = {
887 let mut buf = BytesMut::with_capacity(TABLE_PREFIX_LEN);
888 buf.put_u32(1);
889 buf.to_vec()
890 };
891
892 let vnode_prefix = &dummy_vnode()[..];
893
894 let full_key = [&table_prefix, vnode_prefix, &row_bytes].concat();
895 let output_key = schema_filter_key_extractor.extract(&full_key);
896 assert_eq!(1 + mem::size_of::<i64>(), output_key.len());
897 }
898
899 #[test]
900 fn test_multi_filter_key_extractor() {
901 let mut multi_filter_key_extractor = MultiFilterKeyExtractor::default();
902 {
903 let prost_table = build_table_with_prefix_column_num(1);
905 let schema_filter_key_extractor = SchemaFilterKeyExtractor::new(&prost_table);
906 multi_filter_key_extractor.register(
907 1.into(),
908 FilterKeyExtractorImpl::Schema(schema_filter_key_extractor),
909 );
910 let order_types: Vec<OrderType> = vec![OrderType::ascending(), OrderType::ascending()];
911 let schema = vec![DataType::Int64, DataType::Varchar];
912 let serializer = OrderedRowSerde::new(schema, order_types);
913 let row = OwnedRow::new(vec![
914 Some(ScalarImpl::Int64(100)),
915 Some(ScalarImpl::Utf8("abc".into())),
916 ]);
917 let mut row_bytes = vec![];
918 serializer.serialize(&row, &mut row_bytes);
919
920 let table_prefix = {
921 let mut buf = BytesMut::with_capacity(TABLE_PREFIX_LEN);
922 buf.put_u32(1);
923 buf.to_vec()
924 };
925
926 let vnode_prefix = &dummy_vnode()[..];
927
928 let full_key = [&table_prefix, vnode_prefix, &row_bytes].concat();
929 let output_key = multi_filter_key_extractor.extract(&full_key);
930
931 let data_types = vec![DataType::Int64];
932 let order_types = vec![OrderType::ascending()];
933 let deserializer = OrderedRowSerde::new(data_types, order_types);
934
935 let pk_prefix_len = deserializer.deserialize_prefix_len(&row_bytes, 1).unwrap();
936 assert_eq!(pk_prefix_len, output_key.len());
937 }
938
939 {
940 let prost_table = build_table_with_prefix_column_num(2);
942 let schema_filter_key_extractor = SchemaFilterKeyExtractor::new(&prost_table);
943 multi_filter_key_extractor.register(
944 2.into(),
945 FilterKeyExtractorImpl::Schema(schema_filter_key_extractor),
946 );
947 let order_types: Vec<OrderType> = vec![OrderType::ascending(), OrderType::ascending()];
948 let schema = vec![DataType::Int64, DataType::Varchar];
949 let serializer = OrderedRowSerde::new(schema, order_types);
950 let row = OwnedRow::new(vec![
951 Some(ScalarImpl::Int64(100)),
952 Some(ScalarImpl::Utf8("abc".into())),
953 ]);
954 let mut row_bytes = vec![];
955 serializer.serialize(&row, &mut row_bytes);
956
957 let table_prefix = {
958 let mut buf = BytesMut::with_capacity(TABLE_PREFIX_LEN);
959 buf.put_u32(2);
960 buf.to_vec()
961 };
962
963 let vnode_prefix = &dummy_vnode()[..];
964
965 let full_key = [&table_prefix, vnode_prefix, &row_bytes].concat();
966 let output_key = multi_filter_key_extractor.extract(&full_key);
967
968 let data_types = vec![DataType::Int64, DataType::Varchar];
969 let order_types = vec![OrderType::ascending(), OrderType::ascending()];
970 let deserializer = OrderedRowSerde::new(data_types, order_types);
971
972 let pk_prefix_len = deserializer.deserialize_prefix_len(&row_bytes, 1).unwrap();
973
974 assert_eq!(pk_prefix_len, output_key.len());
975 }
976 }
977
978 #[tokio::test]
979 async fn test_compaction_catalog_manager_exception() {
980 let compaction_catalog_manager = super::CompactionCatalogManager::default();
981
982 {
983 let ret = compaction_catalog_manager.acquire(vec![]).await;
984 assert!(ret.is_err());
985 if let Err(e) = ret {
986 assert_eq!(e.to_string(), "Other error: table_id_set is empty");
987 }
988 }
989
990 {
991 let ret = compaction_catalog_manager.acquire(vec![1.into()]).await;
993 assert!(ret.is_err());
994 if let Err(e) = ret {
995 assert_eq!(
996 e.to_string(),
997 "Other error: request rpc list_tables for meta failed: fake accessor does not support fetch remote table"
998 );
999 }
1000 }
1001 }
1002
1003 #[tokio::test]
1004 async fn test_preloaded_compaction_catalog_manager() {
1005 let mut table = build_table_with_prefix_column_num(1);
1006 table.id = 1.into();
1007 let compaction_catalog_manager = Arc::new(super::CompactionCatalogManager::new_preloaded(
1008 HashMap::from([(1.into(), table)]),
1009 ));
1010
1011 let agent = compaction_catalog_manager
1012 .acquire(vec![1.into(), 2.into()])
1013 .await
1014 .unwrap();
1015 let table_ids = agent.table_ids().collect::<HashSet<_>>();
1016
1017 assert_eq!(table_ids, HashSet::from([1.into()]));
1018
1019 let err = match crate::hummock::compactor::compactor_runner::acquire_complete_catalog_agent(
1020 &compaction_catalog_manager,
1021 vec![1.into(), 2.into()],
1022 )
1023 .await
1024 {
1025 Ok(_) => panic!("partial preloaded catalog should fail strict acquire"),
1026 Err(err) => err,
1027 };
1028
1029 assert!(
1030 err.to_report_string()
1031 .contains("some table ids are not acquired")
1032 );
1033 }
1034}