mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-04 22:10:39 +00:00
Two separate commands
Easier to get an overview
This commit is contained in:
@@ -63,12 +63,15 @@ pub(crate) enum LayerCmd {
|
||||
#[clap(long)]
|
||||
new_timeline_id: Option<TimelineId>,
|
||||
},
|
||||
Compress {
|
||||
CompressOne {
|
||||
dest_path: Utf8PathBuf,
|
||||
layer_file_path: Option<Utf8PathBuf>,
|
||||
tenant_remote_prefix: Option<String>,
|
||||
tenant_remote_config: Option<String>,
|
||||
layers_dir: Option<Utf8PathBuf>,
|
||||
layer_file_path: Utf8PathBuf,
|
||||
},
|
||||
CompressMany {
|
||||
dest_path: Utf8PathBuf,
|
||||
tenant_remote_prefix: String,
|
||||
tenant_remote_config: String,
|
||||
layers_dir: Utf8PathBuf,
|
||||
parallelism: Option<u32>,
|
||||
},
|
||||
}
|
||||
@@ -256,9 +259,25 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
|
||||
|
||||
anyhow::bail!("not an image or delta layer: {layer_file_path}");
|
||||
}
|
||||
LayerCmd::Compress {
|
||||
LayerCmd::CompressOne {
|
||||
dest_path,
|
||||
layer_file_path,
|
||||
} => {
|
||||
pageserver::virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs);
|
||||
pageserver::page_cache::init(100);
|
||||
|
||||
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
|
||||
|
||||
let stats =
|
||||
ImageLayer::compression_statistics(dest_path, layer_file_path, &ctx).await?;
|
||||
println!(
|
||||
"Statistics: {stats:#?}\n{}",
|
||||
serde_json::to_string(&stats).unwrap()
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
LayerCmd::CompressMany {
|
||||
dest_path,
|
||||
tenant_remote_prefix,
|
||||
tenant_remote_config,
|
||||
layers_dir,
|
||||
@@ -267,103 +286,83 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
|
||||
pageserver::virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs);
|
||||
pageserver::page_cache::init(100);
|
||||
|
||||
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
|
||||
let toml_document = toml_edit::Document::from_str(tenant_remote_config)?;
|
||||
let toml_item = toml_document
|
||||
.get("remote_storage")
|
||||
.expect("need remote_storage");
|
||||
let config = RemoteStorageConfig::from_toml(toml_item)?.expect("incomplete config");
|
||||
let storage = remote_storage::GenericRemoteStorage::from_config(&config)?;
|
||||
let storage = Arc::new(storage);
|
||||
|
||||
if let Some(layer_file_path) = layer_file_path {
|
||||
let stats =
|
||||
ImageLayer::compression_statistics(dest_path, layer_file_path, &ctx).await?;
|
||||
println!(
|
||||
"Statistics: {stats:#?}\n{}",
|
||||
serde_json::to_string(&stats).unwrap()
|
||||
);
|
||||
} else if let (
|
||||
Some(tenant_remote_prefix),
|
||||
Some(tenant_remote_config),
|
||||
Some(layers_dir),
|
||||
) = (tenant_remote_prefix, tenant_remote_config, layers_dir)
|
||||
{
|
||||
let toml_document = toml_edit::Document::from_str(tenant_remote_config)?;
|
||||
let toml_item = toml_document
|
||||
.get("remote_storage")
|
||||
.expect("need remote_storage");
|
||||
let config = RemoteStorageConfig::from_toml(toml_item)?.expect("incomplete config");
|
||||
let storage = remote_storage::GenericRemoteStorage::from_config(&config)?;
|
||||
let storage = Arc::new(storage);
|
||||
let cancel = CancellationToken::new();
|
||||
let path = RemotePath::from_string(tenant_remote_prefix)?;
|
||||
let max_files = NonZeroU32::new(16000);
|
||||
let files_list = storage
|
||||
.list(Some(&path), ListingMode::NoDelimiter, max_files, &cancel)
|
||||
.await?;
|
||||
|
||||
let cancel = CancellationToken::new();
|
||||
let path = RemotePath::from_string(tenant_remote_prefix)?;
|
||||
let max_files = NonZeroU32::new(16000);
|
||||
let files_list = storage
|
||||
.list(Some(&path), ListingMode::NoDelimiter, max_files, &cancel)
|
||||
.await?;
|
||||
let semaphore = Arc::new(Semaphore::new(parallelism.unwrap_or(1) as usize));
|
||||
|
||||
let semaphore = Arc::new(Semaphore::new(parallelism.unwrap_or(1) as usize));
|
||||
|
||||
let mut tasks = JoinSet::new();
|
||||
for file_key in files_list.keys.iter() {
|
||||
let Some(file_name) = file_key.object_name() else {
|
||||
continue;
|
||||
};
|
||||
let Ok(LayerName::Image(_layer_file_name)) = LayerName::from_str(file_name)
|
||||
else {
|
||||
// Skipping because it's either not a layer or a delta layer
|
||||
continue;
|
||||
};
|
||||
let json_file_path = layers_dir.join(format!("{file_name}.json"));
|
||||
if tokio::fs::try_exists(&json_file_path).await? {
|
||||
// If we have already created a report for the layer, skip it.
|
||||
continue;
|
||||
}
|
||||
let local_layer_path = layers_dir.join(file_name);
|
||||
async fn stats(
|
||||
semaphore: Arc<Semaphore>,
|
||||
local_layer_path: Utf8PathBuf,
|
||||
json_file_path: Utf8PathBuf,
|
||||
dest_path: Utf8PathBuf,
|
||||
storage: Arc<GenericRemoteStorage>,
|
||||
file_key: RemotePath,
|
||||
) -> Result<Vec<(Option<ImageCompressionAlgorithm>, u64)>, anyhow::Error>
|
||||
{
|
||||
let _permit = semaphore.acquire().await?;
|
||||
let cancel = CancellationToken::new();
|
||||
let download = storage.download(&file_key, &cancel).await?;
|
||||
let mut dest_layer_file =
|
||||
tokio::fs::File::create(&local_layer_path).await?;
|
||||
let mut body = tokio_util::io::StreamReader::new(download.download_stream);
|
||||
let _size = tokio::io::copy_buf(&mut body, &mut dest_layer_file).await?;
|
||||
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
|
||||
let stats =
|
||||
ImageLayer::compression_statistics(&dest_path, &local_layer_path, &ctx)
|
||||
.await?;
|
||||
|
||||
let stats_str = serde_json::to_string(&stats).unwrap();
|
||||
tokio::fs::write(json_file_path, stats_str).await?;
|
||||
|
||||
tokio::fs::remove_file(&local_layer_path).await?;
|
||||
Ok(stats)
|
||||
}
|
||||
let semaphore = semaphore.clone();
|
||||
let file_key = file_key.to_owned();
|
||||
let storage = storage.clone();
|
||||
let dest_path = dest_path.to_owned();
|
||||
let file_name = file_name.to_owned();
|
||||
tasks.spawn(async move {
|
||||
let stats = stats(
|
||||
semaphore,
|
||||
local_layer_path.to_owned(),
|
||||
json_file_path.to_owned(),
|
||||
dest_path,
|
||||
storage,
|
||||
file_key,
|
||||
)
|
||||
.await;
|
||||
println!("Statistics for {file_name}: {stats:#?}\n");
|
||||
});
|
||||
let mut tasks = JoinSet::new();
|
||||
for file_key in files_list.keys.iter() {
|
||||
let Some(file_name) = file_key.object_name() else {
|
||||
continue;
|
||||
};
|
||||
let Ok(LayerName::Image(_layer_file_name)) = LayerName::from_str(file_name) else {
|
||||
// Skipping because it's either not a layer or a delta layer
|
||||
continue;
|
||||
};
|
||||
let json_file_path = layers_dir.join(format!("{file_name}.json"));
|
||||
if tokio::fs::try_exists(&json_file_path).await? {
|
||||
// If we have already created a report for the layer, skip it.
|
||||
continue;
|
||||
}
|
||||
while let Some(_res) = tasks.join_next().await {}
|
||||
} else {
|
||||
anyhow::bail!("No tenant dir or remote config or layers dir specified");
|
||||
let local_layer_path = layers_dir.join(file_name);
|
||||
async fn stats(
|
||||
semaphore: Arc<Semaphore>,
|
||||
local_layer_path: Utf8PathBuf,
|
||||
json_file_path: Utf8PathBuf,
|
||||
dest_path: Utf8PathBuf,
|
||||
storage: Arc<GenericRemoteStorage>,
|
||||
file_key: RemotePath,
|
||||
) -> Result<Vec<(Option<ImageCompressionAlgorithm>, u64)>, anyhow::Error>
|
||||
{
|
||||
let _permit = semaphore.acquire().await?;
|
||||
let cancel = CancellationToken::new();
|
||||
let download = storage.download(&file_key, &cancel).await?;
|
||||
let mut dest_layer_file = tokio::fs::File::create(&local_layer_path).await?;
|
||||
let mut body = tokio_util::io::StreamReader::new(download.download_stream);
|
||||
let _size = tokio::io::copy_buf(&mut body, &mut dest_layer_file).await?;
|
||||
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
|
||||
let stats =
|
||||
ImageLayer::compression_statistics(&dest_path, &local_layer_path, &ctx)
|
||||
.await?;
|
||||
|
||||
let stats_str = serde_json::to_string(&stats).unwrap();
|
||||
tokio::fs::write(json_file_path, stats_str).await?;
|
||||
|
||||
tokio::fs::remove_file(&local_layer_path).await?;
|
||||
Ok(stats)
|
||||
}
|
||||
let semaphore = semaphore.clone();
|
||||
let file_key = file_key.to_owned();
|
||||
let storage = storage.clone();
|
||||
let dest_path = dest_path.to_owned();
|
||||
let file_name = file_name.to_owned();
|
||||
tasks.spawn(async move {
|
||||
let stats = stats(
|
||||
semaphore,
|
||||
local_layer_path.to_owned(),
|
||||
json_file_path.to_owned(),
|
||||
dest_path,
|
||||
storage,
|
||||
file_key,
|
||||
)
|
||||
.await;
|
||||
println!("Statistics for {file_name}: {stats:#?}\n");
|
||||
});
|
||||
}
|
||||
while let Some(_res) = tasks.join_next().await {}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user