risingwave_hummock_sdk/
compact_task.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap, HashSet};
16use std::mem::size_of;
17
18use itertools::Itertools;
19use risingwave_common::catalog::TableId;
20use risingwave_pb::hummock::compact_task::{PbTaskStatus, PbTaskType, TaskStatus, TaskType};
21use risingwave_pb::hummock::subscribe_compaction_event_request::PbReportTask;
22use risingwave_pb::hummock::{
23    LevelType, PbCompactTask, PbKeyRange, PbTableOption, PbTableSchema, PbTableStats,
24    PbValidationTask,
25};
26use risingwave_pb::id::WorkerId;
27
28use crate::compaction_group::StateTableId;
29use crate::key_range::KeyRange;
30use crate::level::InputLevel;
31use crate::sstable_info::SstableInfo;
32use crate::table_watermark::{TableWatermarks, WatermarkSerdeType};
33use crate::{CompactionGroupId, HummockSstableObjectId};
34
35#[derive(Clone, PartialEq, Default, Debug)]
36pub struct CompactTask {
37    /// SSTs to be compacted, which will be removed from LSM after compaction
38    pub input_ssts: Vec<InputLevel>,
39    /// In ideal case, the compaction will generate `splits.len()` tables which have key range
40    /// corresponding to that in `splits`, respectively
41    pub splits: Vec<KeyRange>,
42    /// compaction output, which will be added to `target_level` of LSM after compaction
43    pub sorted_output_ssts: Vec<SstableInfo>,
44    /// task id assigned by hummock storage service
45    pub task_id: u64,
46    /// compaction output will be added to `target_level` of LSM after compaction
47    pub target_level: u32,
48    pub gc_delete_keys: bool,
49    /// Lbase in LSM
50    pub base_level: u32,
51    pub task_status: PbTaskStatus,
52    /// compaction group the task belongs to.
53    pub compaction_group_id: CompactionGroupId,
54    /// compaction group id when the compaction task is created
55    pub compaction_group_version_id: u64,
56    /// `existing_table_ids` for compaction drop key
57    pub existing_table_ids: Vec<TableId>,
58    pub compression_algorithm: u32,
59    pub target_file_size: u64,
60    pub compaction_filter_mask: u32,
61    pub table_options: BTreeMap<TableId, PbTableOption>,
62    pub current_epoch_time: u64,
63    pub target_sub_level_id: u64,
64    /// Identifies whether the task is `space_reclaim`, if the `compact_task_type` increases, it will be refactored to enum
65    pub task_type: PbTaskType,
66    /// Deprecated. use `table_vnode_partition` instead;
67    pub split_by_state_table: bool,
68    /// Compaction needs to cut the state table every time 1/weight of vnodes in the table have been processed.
69    /// Deprecated. use `table_vnode_partition` instead;
70    pub split_weight_by_vnode: u32,
71    pub table_vnode_partition: BTreeMap<TableId, u32>,
72    /// The table watermark of any table id. In compaction we only use the table watermarks on safe epoch,
73    /// so we only need to include the table watermarks on safe epoch to reduce the size of metadata.
74    pub pk_prefix_table_watermarks: BTreeMap<TableId, TableWatermarks>,
75
76    pub non_pk_prefix_table_watermarks: BTreeMap<TableId, TableWatermarks>,
77    pub value_table_watermarks: BTreeMap<TableId, TableWatermarks>,
78
79    pub table_schemas: BTreeMap<TableId, PbTableSchema>,
80
81    pub max_sub_compaction: u32,
82
83    pub max_kv_count_for_xor16: Option<u64>,
84
85    pub max_vnode_key_range_bytes: Option<u64>,
86}
87
88impl CompactTask {
89    pub fn estimated_encode_len(&self) -> usize {
90        self.input_ssts
91            .iter()
92            .map(|input_level| input_level.estimated_encode_len())
93            .sum::<usize>()
94            + self
95                .splits
96                .iter()
97                .map(|split| split.left.len() + split.right.len() + size_of::<bool>())
98                .sum::<usize>()
99            + size_of::<u64>()
100            + self
101                .sorted_output_ssts
102                .iter()
103                .map(|sst| sst.estimated_encode_len())
104                .sum::<usize>()
105            + size_of::<u64>()
106            + size_of::<u32>()
107            + size_of::<bool>()
108            + size_of::<u32>()
109            + size_of::<i32>()
110            + size_of::<u64>()
111            + self.existing_table_ids.len() * size_of::<u32>()
112            + size_of::<u32>()
113            + size_of::<u64>()
114            + size_of::<u32>()
115            + self.table_options.len() * size_of::<u64>()
116            + size_of::<u64>()
117            + size_of::<u64>()
118            + size_of::<i32>()
119            + size_of::<bool>()
120            + size_of::<u32>()
121            + self.table_vnode_partition.len() * size_of::<u64>()
122            + self
123                .pk_prefix_table_watermarks
124                .values()
125                .map(|table_watermark| size_of::<u32>() + table_watermark.estimated_encode_len())
126                .sum::<usize>()
127            + self
128                .non_pk_prefix_table_watermarks
129                .values()
130                .map(|table_watermark| size_of::<u32>() + table_watermark.estimated_encode_len())
131                .sum::<usize>()
132            + self
133                .max_vnode_key_range_bytes
134                .map(|_| size_of::<u32>())
135                .unwrap_or_default()
136            + self
137                .value_table_watermarks
138                .values()
139                .map(|table_watermark| size_of::<u32>() + table_watermark.estimated_encode_len())
140                .sum::<usize>()
141    }
142
143    pub fn is_trivial_move_task(&self) -> bool {
144        if self.task_type != TaskType::Dynamic && self.task_type != TaskType::Emergency {
145            return false;
146        }
147
148        if self.input_ssts.len() != 2 || self.input_ssts[0].level_type != LevelType::Nonoverlapping
149        {
150            return false;
151        }
152
153        // it may be a manual compaction task
154        if self.input_ssts[0].level_idx == self.input_ssts[1].level_idx
155            && self.input_ssts[0].level_idx > 0
156        {
157            return false;
158        }
159
160        if self.input_ssts[1].level_idx == self.target_level
161            && self.input_ssts[1].table_infos.is_empty()
162        {
163            return true;
164        }
165
166        false
167    }
168
169    pub fn is_trivial_reclaim(&self) -> bool {
170        // Currently all VnodeWatermark tasks are trivial reclaim.
171        if self.task_type == TaskType::VnodeWatermark {
172            return true;
173        }
174        let exist_table_ids =
175            HashSet::<TableId>::from_iter(self.existing_table_ids.iter().copied());
176        self.input_ssts.iter().all(|level| {
177            level.table_infos.iter().all(|sst| {
178                sst.table_ids
179                    .iter()
180                    .all(|table_id| !exist_table_ids.contains(table_id))
181            })
182        })
183    }
184}
185
186impl CompactTask {
187    // The compact task may need to reclaim key with TTL
188    pub fn contains_ttl(&self) -> bool {
189        self.table_options
190            .iter()
191            .any(|(_, table_option)| table_option.retention_seconds.is_some_and(|ttl| ttl > 0))
192    }
193
194    // The compact task may need to reclaim key with range tombstone
195    pub fn contains_range_tombstone(&self) -> bool {
196        self.input_ssts
197            .iter()
198            .flat_map(|level| level.table_infos.iter())
199            .any(|sst| sst.range_tombstone_count > 0)
200    }
201
202    // The compact task may need to reclaim key with split sst
203    pub fn contains_split_sst(&self) -> bool {
204        self.input_ssts
205            .iter()
206            .flat_map(|level| level.table_infos.iter())
207            .any(|sst| sst.sst_id.as_raw_id() != sst.object_id.as_raw_id())
208    }
209
210    pub fn get_table_ids_from_input_ssts(&self) -> impl Iterator<Item = StateTableId> + use<> {
211        self.input_ssts
212            .iter()
213            .flat_map(|level| level.table_infos.iter())
214            .flat_map(|sst| sst.table_ids.clone())
215            .sorted()
216            .dedup()
217    }
218
219    // filter the table-id that in existing_table_ids with the table-id in compact-task
220    pub fn build_compact_table_ids(&self) -> Vec<StateTableId> {
221        let existing_table_ids: HashSet<TableId> =
222            HashSet::from_iter(self.existing_table_ids.clone());
223        self.get_table_ids_from_input_ssts()
224            .filter(|table_id| existing_table_ids.contains(table_id))
225            .collect()
226    }
227
228    pub fn is_expired(&self, compaction_group_version_id_expected: u64) -> bool {
229        is_compaction_task_expired(
230            self.compaction_group_version_id,
231            compaction_group_version_id_expected,
232        )
233    }
234
235    /// Determines whether to use block-based filter for this compaction task.
236    /// Returns true if the total key count exceeds the configured threshold.
237    pub fn should_use_block_based_filter(&self) -> bool {
238        let kv_count = self
239            .input_ssts
240            .iter()
241            .flat_map(|level| level.table_infos.iter())
242            .map(|sst| sst.total_key_count)
243            .sum::<u64>();
244
245        crate::filter_utils::is_kv_count_too_large_for_xor16(kv_count, self.max_kv_count_for_xor16)
246    }
247
248    /// Returns the effective vnode key-range hint limit (in bytes) for this compaction task.
249    ///
250    /// The hint is only meaningful when all input SSTs belong to a single table.
251    pub fn effective_max_vnode_key_range_bytes(&self) -> Option<usize> {
252        let limit = self.max_vnode_key_range_bytes.filter(|&v| v > 0)? as usize;
253        (self.build_compact_table_ids().len() == 1).then_some(limit)
254    }
255}
256
257fn split_watermark_serde_types(
258    pb_compact_task: &PbCompactTask,
259) -> (
260    BTreeMap<TableId, TableWatermarks>,
261    BTreeMap<TableId, TableWatermarks>,
262    BTreeMap<TableId, TableWatermarks>,
263) {
264    let mut pk_prefix_table_watermarks = BTreeMap::default();
265    let mut non_pk_prefix_table_watermarks = BTreeMap::default();
266    let mut value_table_watermarks = BTreeMap::default();
267    for (table_id, pbwatermark) in &pb_compact_task.table_watermarks {
268        let watermark = TableWatermarks::from(pbwatermark);
269        match watermark.watermark_type {
270            WatermarkSerdeType::PkPrefix => {
271                pk_prefix_table_watermarks.insert(*table_id, watermark);
272            }
273            WatermarkSerdeType::NonPkPrefix => {
274                non_pk_prefix_table_watermarks.insert(*table_id, watermark);
275            }
276            WatermarkSerdeType::Value => {
277                value_table_watermarks.insert(*table_id, watermark);
278            }
279        }
280    }
281    (
282        pk_prefix_table_watermarks,
283        non_pk_prefix_table_watermarks,
284        value_table_watermarks,
285    )
286}
287
288pub fn is_compaction_task_expired(
289    compaction_group_version_id_in_task: u64,
290    compaction_group_version_id_expected: u64,
291) -> bool {
292    compaction_group_version_id_in_task != compaction_group_version_id_expected
293}
294
295impl From<PbCompactTask> for CompactTask {
296    fn from(pb_compact_task: PbCompactTask) -> Self {
297        let (pk_prefix_table_watermarks, non_pk_prefix_table_watermarks, value_table_watermarks) =
298            split_watermark_serde_types(&pb_compact_task);
299        #[expect(deprecated)]
300        Self {
301            input_ssts: pb_compact_task
302                .input_ssts
303                .into_iter()
304                .map(InputLevel::from)
305                .collect_vec(),
306            splits: pb_compact_task
307                .splits
308                .into_iter()
309                .map(|pb_keyrange| KeyRange {
310                    left: pb_keyrange.left.into(),
311                    right: pb_keyrange.right.into(),
312                    right_exclusive: pb_keyrange.right_exclusive,
313                })
314                .collect_vec(),
315            sorted_output_ssts: pb_compact_task
316                .sorted_output_ssts
317                .into_iter()
318                .map(SstableInfo::from)
319                .collect_vec(),
320            task_id: pb_compact_task.task_id,
321            target_level: pb_compact_task.target_level,
322            gc_delete_keys: pb_compact_task.gc_delete_keys,
323            base_level: pb_compact_task.base_level,
324            task_status: TaskStatus::try_from(pb_compact_task.task_status).unwrap(),
325            compaction_group_id: pb_compact_task.compaction_group_id,
326            existing_table_ids: pb_compact_task.existing_table_ids.clone(),
327            compression_algorithm: pb_compact_task.compression_algorithm,
328            target_file_size: pb_compact_task.target_file_size,
329            compaction_filter_mask: pb_compact_task.compaction_filter_mask,
330            table_options: pb_compact_task
331                .table_options
332                .iter()
333                .map(|(table_id, v)| (*table_id, *v))
334                .collect(),
335            current_epoch_time: pb_compact_task.current_epoch_time,
336            target_sub_level_id: pb_compact_task.target_sub_level_id,
337            task_type: PbTaskType::try_from(pb_compact_task.task_type).unwrap(),
338            split_by_state_table: pb_compact_task.split_by_state_table,
339            split_weight_by_vnode: pb_compact_task.split_weight_by_vnode,
340            table_vnode_partition: pb_compact_task
341                .table_vnode_partition
342                .iter()
343                .map(|(table_id, v)| (*table_id, *v))
344                .collect(),
345            pk_prefix_table_watermarks,
346            non_pk_prefix_table_watermarks,
347            value_table_watermarks,
348            table_schemas: pb_compact_task
349                .table_schemas
350                .iter()
351                .map(|(table_id, v)| (*table_id, v.clone()))
352                .collect(),
353            max_sub_compaction: pb_compact_task.max_sub_compaction,
354            compaction_group_version_id: pb_compact_task.compaction_group_version_id,
355            max_kv_count_for_xor16: pb_compact_task.max_kv_count_for_xor16,
356            max_vnode_key_range_bytes: pb_compact_task.max_vnode_key_range_bytes,
357        }
358    }
359}
360
361impl From<&PbCompactTask> for CompactTask {
362    fn from(pb_compact_task: &PbCompactTask) -> Self {
363        let (pk_prefix_table_watermarks, non_pk_prefix_table_watermarks, value_table_watermarks) =
364            split_watermark_serde_types(pb_compact_task);
365        #[expect(deprecated)]
366        Self {
367            input_ssts: pb_compact_task
368                .input_ssts
369                .iter()
370                .map(InputLevel::from)
371                .collect_vec(),
372            splits: pb_compact_task
373                .splits
374                .iter()
375                .map(|pb_keyrange| KeyRange {
376                    left: pb_keyrange.left.clone().into(),
377                    right: pb_keyrange.right.clone().into(),
378                    right_exclusive: pb_keyrange.right_exclusive,
379                })
380                .collect_vec(),
381            sorted_output_ssts: pb_compact_task
382                .sorted_output_ssts
383                .iter()
384                .map(SstableInfo::from)
385                .collect_vec(),
386            task_id: pb_compact_task.task_id,
387            target_level: pb_compact_task.target_level,
388            gc_delete_keys: pb_compact_task.gc_delete_keys,
389            base_level: pb_compact_task.base_level,
390            task_status: TaskStatus::try_from(pb_compact_task.task_status).unwrap(),
391            compaction_group_id: pb_compact_task.compaction_group_id,
392            existing_table_ids: pb_compact_task.existing_table_ids.clone(),
393            compression_algorithm: pb_compact_task.compression_algorithm,
394            target_file_size: pb_compact_task.target_file_size,
395            compaction_filter_mask: pb_compact_task.compaction_filter_mask,
396            table_options: pb_compact_task
397                .table_options
398                .iter()
399                .map(|(table_id, v)| (*table_id, *v))
400                .collect(),
401            current_epoch_time: pb_compact_task.current_epoch_time,
402            target_sub_level_id: pb_compact_task.target_sub_level_id,
403            task_type: PbTaskType::try_from(pb_compact_task.task_type).unwrap(),
404            split_by_state_table: pb_compact_task.split_by_state_table,
405            split_weight_by_vnode: pb_compact_task.split_weight_by_vnode,
406            table_vnode_partition: pb_compact_task
407                .table_vnode_partition
408                .iter()
409                .map(|(table_id, v)| (*table_id, *v))
410                .collect(),
411            pk_prefix_table_watermarks,
412            non_pk_prefix_table_watermarks,
413            value_table_watermarks,
414            table_schemas: pb_compact_task
415                .table_schemas
416                .iter()
417                .map(|(table_id, v)| (*table_id, v.clone()))
418                .collect(),
419            max_sub_compaction: pb_compact_task.max_sub_compaction,
420            compaction_group_version_id: pb_compact_task.compaction_group_version_id,
421            max_kv_count_for_xor16: pb_compact_task.max_kv_count_for_xor16,
422            max_vnode_key_range_bytes: pb_compact_task.max_vnode_key_range_bytes,
423        }
424    }
425}
426
427impl From<CompactTask> for PbCompactTask {
428    fn from(compact_task: CompactTask) -> Self {
429        #[expect(deprecated)]
430        Self {
431            input_ssts: compact_task
432                .input_ssts
433                .into_iter()
434                .map(|input_level| input_level.into())
435                .collect_vec(),
436            splits: compact_task
437                .splits
438                .into_iter()
439                .map(|keyrange| PbKeyRange {
440                    left: keyrange.left.into(),
441                    right: keyrange.right.into(),
442                    right_exclusive: keyrange.right_exclusive,
443                })
444                .collect_vec(),
445            sorted_output_ssts: compact_task
446                .sorted_output_ssts
447                .into_iter()
448                .map(|sst| sst.into())
449                .collect_vec(),
450            task_id: compact_task.task_id,
451            target_level: compact_task.target_level,
452            gc_delete_keys: compact_task.gc_delete_keys,
453            base_level: compact_task.base_level,
454            task_status: compact_task.task_status.into(),
455            compaction_group_id: compact_task.compaction_group_id,
456            existing_table_ids: compact_task.existing_table_ids.clone(),
457            compression_algorithm: compact_task.compression_algorithm,
458            target_file_size: compact_task.target_file_size,
459            compaction_filter_mask: compact_task.compaction_filter_mask,
460            table_options: compact_task.table_options.clone(),
461            current_epoch_time: compact_task.current_epoch_time,
462            target_sub_level_id: compact_task.target_sub_level_id,
463            task_type: compact_task.task_type.into(),
464            split_weight_by_vnode: compact_task.split_weight_by_vnode,
465            table_vnode_partition: compact_task.table_vnode_partition.clone(),
466            table_watermarks: compact_task
467                .pk_prefix_table_watermarks
468                .into_iter()
469                .chain(compact_task.non_pk_prefix_table_watermarks)
470                .chain(compact_task.value_table_watermarks)
471                .map(|(table_id, table_watermark)| (table_id, table_watermark.into()))
472                .collect(),
473            split_by_state_table: compact_task.split_by_state_table,
474            table_schemas: compact_task.table_schemas.clone(),
475            max_sub_compaction: compact_task.max_sub_compaction,
476            compaction_group_version_id: compact_task.compaction_group_version_id,
477            max_kv_count_for_xor16: compact_task.max_kv_count_for_xor16,
478            max_vnode_key_range_bytes: compact_task.max_vnode_key_range_bytes,
479        }
480    }
481}
482
483impl From<&CompactTask> for PbCompactTask {
484    fn from(compact_task: &CompactTask) -> Self {
485        #[expect(deprecated)]
486        Self {
487            input_ssts: compact_task
488                .input_ssts
489                .iter()
490                .map(|input_level| input_level.into())
491                .collect_vec(),
492            splits: compact_task
493                .splits
494                .iter()
495                .map(|keyrange| PbKeyRange {
496                    left: keyrange.left.to_vec(),
497                    right: keyrange.right.to_vec(),
498                    right_exclusive: keyrange.right_exclusive,
499                })
500                .collect_vec(),
501            sorted_output_ssts: compact_task
502                .sorted_output_ssts
503                .iter()
504                .map(|sst| sst.into())
505                .collect_vec(),
506            task_id: compact_task.task_id,
507            target_level: compact_task.target_level,
508            gc_delete_keys: compact_task.gc_delete_keys,
509            base_level: compact_task.base_level,
510            task_status: compact_task.task_status.into(),
511            compaction_group_id: compact_task.compaction_group_id,
512            existing_table_ids: compact_task.existing_table_ids.clone(),
513            compression_algorithm: compact_task.compression_algorithm,
514            target_file_size: compact_task.target_file_size,
515            compaction_filter_mask: compact_task.compaction_filter_mask,
516            table_options: compact_task.table_options.clone(),
517            current_epoch_time: compact_task.current_epoch_time,
518            target_sub_level_id: compact_task.target_sub_level_id,
519            task_type: compact_task.task_type.into(),
520            split_weight_by_vnode: compact_task.split_weight_by_vnode,
521            table_vnode_partition: compact_task.table_vnode_partition.clone(),
522            table_watermarks: compact_task
523                .pk_prefix_table_watermarks
524                .iter()
525                .chain(compact_task.non_pk_prefix_table_watermarks.iter())
526                .chain(compact_task.value_table_watermarks.iter())
527                .map(|(table_id, table_watermark)| (*table_id, table_watermark.into()))
528                .collect(),
529            split_by_state_table: compact_task.split_by_state_table,
530            table_schemas: compact_task.table_schemas.clone(),
531            max_sub_compaction: compact_task.max_sub_compaction,
532            compaction_group_version_id: compact_task.compaction_group_version_id,
533            max_kv_count_for_xor16: compact_task.max_kv_count_for_xor16,
534            max_vnode_key_range_bytes: compact_task.max_vnode_key_range_bytes,
535        }
536    }
537}
538
539#[derive(Clone, PartialEq, Default)]
540pub struct ValidationTask {
541    pub sst_infos: Vec<SstableInfo>,
542    pub sst_id_to_worker_id: HashMap<HummockSstableObjectId, WorkerId>,
543}
544
545impl From<PbValidationTask> for ValidationTask {
546    fn from(pb_validation_task: PbValidationTask) -> Self {
547        Self {
548            sst_infos: pb_validation_task
549                .sst_infos
550                .into_iter()
551                .map(SstableInfo::from)
552                .collect_vec(),
553            sst_id_to_worker_id: pb_validation_task.sst_id_to_worker_id.into_iter().collect(),
554        }
555    }
556}
557
558impl From<ValidationTask> for PbValidationTask {
559    fn from(validation_task: ValidationTask) -> Self {
560        Self {
561            sst_infos: validation_task
562                .sst_infos
563                .into_iter()
564                .map(|sst| sst.into())
565                .collect_vec(),
566            sst_id_to_worker_id: validation_task.sst_id_to_worker_id,
567        }
568    }
569}
570
571impl ValidationTask {
572    pub fn estimated_encode_len(&self) -> usize {
573        self.sst_infos
574            .iter()
575            .map(|sst| sst.estimated_encode_len())
576            .sum::<usize>()
577            + self.sst_id_to_worker_id.len() * (size_of::<u64>() + size_of::<u32>())
578            + size_of::<u64>()
579    }
580}
581
582#[derive(Clone, PartialEq, Default, Debug)]
583pub struct ReportTask {
584    pub table_stats_change: HashMap<TableId, PbTableStats>,
585    pub task_id: u64,
586    pub task_status: TaskStatus,
587    pub sorted_output_ssts: Vec<SstableInfo>,
588    pub object_timestamps: HashMap<HummockSstableObjectId, u64>,
589}
590
591impl From<PbReportTask> for ReportTask {
592    fn from(value: PbReportTask) -> Self {
593        Self {
594            table_stats_change: value.table_stats_change.clone(),
595            task_id: value.task_id,
596            task_status: PbTaskStatus::try_from(value.task_status).unwrap(),
597            sorted_output_ssts: value
598                .sorted_output_ssts
599                .into_iter()
600                .map(SstableInfo::from)
601                .collect_vec(),
602            object_timestamps: value.object_timestamps,
603        }
604    }
605}
606
607impl From<ReportTask> for PbReportTask {
608    fn from(value: ReportTask) -> Self {
609        Self {
610            table_stats_change: value.table_stats_change.clone(),
611            task_id: value.task_id,
612            task_status: value.task_status.into(),
613            sorted_output_ssts: value
614                .sorted_output_ssts
615                .into_iter()
616                .map(|sst| sst.into())
617                .collect_vec(),
618            object_timestamps: value.object_timestamps,
619        }
620    }
621}