risingwave_ctl/cmd_impl/hummock/
migrate_legacy_object.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::time::Instant;
16
17use anyhow::anyhow;
18use futures::StreamExt;
19use futures::future::try_join_all;
20use risingwave_common::config::ObjectStoreConfig;
21use risingwave_hummock_sdk::{OBJECT_SUFFIX, get_object_id_from_path, get_sst_data_path};
22use risingwave_object_store::object::object_metrics::ObjectStoreMetrics;
23use risingwave_object_store::object::prefix::opendal_engine::get_object_prefix;
24use risingwave_object_store::object::{
25    ObjectStoreImpl, OpendalObjectStore, build_remote_object_store,
26};
27
28pub async fn migrate_legacy_object(
29    url: String,
30    source_dir: String,
31    target_dir: String,
32    concurrency: u32,
33) -> anyhow::Result<()> {
34    let source_dir = source_dir.trim_end_matches('/');
35    let target_dir = target_dir.trim_end_matches('/');
36    println!("Normalized source_dir: {source_dir}.");
37    println!("Normalized target_dir: {target_dir}.");
38    if source_dir.is_empty() || target_dir.is_empty() {
39        return Err(anyhow!("the source_dir and target_dir must not be empty"));
40    }
41    if target_dir.starts_with(source_dir) {
42        return Err(anyhow!("the target_dir must not include source_dir"));
43    }
44    let mut config = ObjectStoreConfig::default();
45    config.s3.developer.use_opendal = true;
46    let store = build_remote_object_store(
47        &url,
48        ObjectStoreMetrics::unused().into(),
49        "migrate_legacy_object",
50        config.into(),
51    )
52    .await;
53    let ObjectStoreImpl::Opendal(opendal) = store else {
54        return Err(anyhow!("OpenDAL is required"));
55    };
56    let mut iter = opendal.list(source_dir, None, None).await?;
57    let mut count = 0;
58    println!("Migration is started: from {source_dir} to {target_dir}.");
59    let mut from_to = Vec::with_capacity(concurrency as usize);
60    let timer = Instant::now();
61    while let Some(object) = iter.next().await {
62        let object = object?;
63        if !object.key.ends_with(OBJECT_SUFFIX) {
64            let legacy_path = object.key;
65            assert_eq!(
66                &legacy_path[..source_dir.len()],
67                source_dir,
68                "{legacy_path} versus {source_dir}"
69            );
70            if legacy_path.ends_with('/') {
71                tracing::warn!(legacy_path, "skip directory");
72                continue;
73            }
74            let new_path = format!("{}{}", target_dir, &legacy_path[source_dir.len()..]);
75            from_to.push((legacy_path, new_path));
76        } else {
77            let object_id = get_object_id_from_path(&object.key);
78            let legacy_prefix = get_object_prefix(object_id, false);
79            let legacy_path = get_sst_data_path(&legacy_prefix, source_dir, object_id);
80            if object.key != legacy_path {
81                return Err(anyhow!(format!(
82                    "the source object store does not appear to be legacy: {} versus {}",
83                    object.key, legacy_path
84                )));
85            }
86            let new_path =
87                get_sst_data_path(&get_object_prefix(object_id, true), target_dir, object_id);
88            from_to.push((legacy_path, new_path));
89        }
90        count += 1;
91        if from_to.len() >= concurrency as usize {
92            copy(std::mem::take(&mut from_to).into_iter(), opendal.inner()).await?;
93        }
94    }
95    if !from_to.is_empty() {
96        copy(from_to.into_iter(), opendal.inner()).await?;
97    }
98    let cost = timer.elapsed();
99    println!(
100        "Migration is finished in {} seconds. {count} objects have been migrated from {source_dir} to {target_dir}.",
101        cost.as_secs()
102    );
103    Ok(())
104}
105
106async fn copy(
107    from_to: impl Iterator<Item = (String, String)>,
108    opendal: &OpendalObjectStore,
109) -> anyhow::Result<()> {
110    let futures = from_to.map(|(from_path, to_path)| async move {
111        println!("From {from_path} to {to_path}");
112        opendal.copy(&from_path, &to_path).await
113    });
114    try_join_all(futures).await?;
115    Ok(())
116}