risingwave_ctl/cmd_impl/hummock/
migrate_legacy_object.rs1use std::time::Instant;
16
17use anyhow::anyhow;
18use futures::StreamExt;
19use futures::future::try_join_all;
20use risingwave_common::config::ObjectStoreConfig;
21use risingwave_hummock_sdk::{OBJECT_SUFFIX, get_object_id_from_path, get_sst_data_path};
22use risingwave_object_store::object::object_metrics::ObjectStoreMetrics;
23use risingwave_object_store::object::prefix::opendal_engine::get_object_prefix;
24use risingwave_object_store::object::{
25 ObjectStoreImpl, OpendalObjectStore, build_remote_object_store,
26};
27
28pub async fn migrate_legacy_object(
29 url: String,
30 source_dir: String,
31 target_dir: String,
32 concurrency: u32,
33) -> anyhow::Result<()> {
34 let source_dir = source_dir.trim_end_matches('/');
35 let target_dir = target_dir.trim_end_matches('/');
36 println!("Normalized source_dir: {source_dir}.");
37 println!("Normalized target_dir: {target_dir}.");
38 if source_dir.is_empty() || target_dir.is_empty() {
39 return Err(anyhow!("the source_dir and target_dir must not be empty"));
40 }
41 if target_dir.starts_with(source_dir) {
42 return Err(anyhow!("the target_dir must not include source_dir"));
43 }
44 let mut config = ObjectStoreConfig::default();
45 config.s3.developer.use_opendal = true;
46 let store = build_remote_object_store(
47 &url,
48 ObjectStoreMetrics::unused().into(),
49 "migrate_legacy_object",
50 config.into(),
51 )
52 .await;
53 let ObjectStoreImpl::Opendal(opendal) = store else {
54 return Err(anyhow!("OpenDAL is required"));
55 };
56 let mut iter = opendal.list(source_dir, None, None).await?;
57 let mut count = 0;
58 println!("Migration is started: from {source_dir} to {target_dir}.");
59 let mut from_to = Vec::with_capacity(concurrency as usize);
60 let timer = Instant::now();
61 while let Some(object) = iter.next().await {
62 let object = object?;
63 if !object.key.ends_with(OBJECT_SUFFIX) {
64 let legacy_path = object.key;
65 assert_eq!(
66 &legacy_path[..source_dir.len()],
67 source_dir,
68 "{legacy_path} versus {source_dir}"
69 );
70 if legacy_path.ends_with('/') {
71 tracing::warn!(legacy_path, "skip directory");
72 continue;
73 }
74 let new_path = format!("{}{}", target_dir, &legacy_path[source_dir.len()..]);
75 from_to.push((legacy_path, new_path));
76 } else {
77 let object_id = get_object_id_from_path(&object.key);
78 let legacy_prefix = get_object_prefix(object_id, false);
79 let legacy_path = get_sst_data_path(&legacy_prefix, source_dir, object_id);
80 if object.key != legacy_path {
81 return Err(anyhow!(format!(
82 "the source object store does not appear to be legacy: {} versus {}",
83 object.key, legacy_path
84 )));
85 }
86 let new_path =
87 get_sst_data_path(&get_object_prefix(object_id, true), target_dir, object_id);
88 from_to.push((legacy_path, new_path));
89 }
90 count += 1;
91 if from_to.len() >= concurrency as usize {
92 copy(std::mem::take(&mut from_to).into_iter(), opendal.inner()).await?;
93 }
94 }
95 if !from_to.is_empty() {
96 copy(from_to.into_iter(), opendal.inner()).await?;
97 }
98 let cost = timer.elapsed();
99 println!(
100 "Migration is finished in {} seconds. {count} objects have been migrated from {source_dir} to {target_dir}.",
101 cost.as_secs()
102 );
103 Ok(())
104}
105
106async fn copy(
107 from_to: impl Iterator<Item = (String, String)>,
108 opendal: &OpendalObjectStore,
109) -> anyhow::Result<()> {
110 let futures = from_to.map(|(from_path, to_path)| async move {
111 println!("From {from_path} to {to_path}");
112 opendal.copy(&from_path, &to_path).await
113 });
114 try_join_all(futures).await?;
115 Ok(())
116}