mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-04 14:00:38 +00:00
feat(pagebench): add aux file bench (#7746)
part of https://github.com/neondatabase/neon/issues/7462 ## Summary of changes This pull request adds two APIs to the pageserver management API: list_aux_files and ingest_aux_files. The aux file pagebench is intended to be used on an empty timeline because the data do not go through the safekeeper. LSNs are advanced by 8 for each ingestion, to avoid invariant checks inside the pageserver. For now, I only care about space amplification / read amplification, so the bench is designed in a very simple way: ingest 10000 files, and I will manually dump the layer map to analyze. --------- Signed-off-by: Alex Chi Z <chi@neon.tech>
This commit is contained in:
@@ -841,6 +841,16 @@ pub struct DownloadRemoteLayersTaskSpawnRequest {
|
||||
pub max_concurrent_downloads: NonZeroUsize,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct IngestAuxFilesRequest {
|
||||
pub aux_files: HashMap<String, String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct ListAuxFilesRequest {
|
||||
pub lsn: Lsn,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
pub struct DownloadRemoteLayersTaskInfo {
|
||||
pub task_id: String,
|
||||
|
||||
@@ -1,8 +1,12 @@
|
||||
use std::collections::HashMap;
|
||||
|
||||
use bytes::Bytes;
|
||||
use pageserver_api::{models::*, shard::TenantShardId};
|
||||
use reqwest::{IntoUrl, Method, StatusCode};
|
||||
use utils::{
|
||||
http::error::HttpErrorBody,
|
||||
id::{TenantId, TimelineId},
|
||||
lsn::Lsn,
|
||||
};
|
||||
|
||||
pub mod util;
|
||||
@@ -561,4 +565,57 @@ impl Client {
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn ingest_aux_files(
|
||||
&self,
|
||||
tenant_shard_id: TenantShardId,
|
||||
timeline_id: TimelineId,
|
||||
aux_files: HashMap<String, String>,
|
||||
) -> Result<bool> {
|
||||
let uri = format!(
|
||||
"{}/v1/tenant/{}/timeline/{}/ingest_aux_files",
|
||||
self.mgmt_api_endpoint, tenant_shard_id, timeline_id
|
||||
);
|
||||
let resp = self
|
||||
.request_noerror(Method::POST, &uri, IngestAuxFilesRequest { aux_files })
|
||||
.await?;
|
||||
match resp.status() {
|
||||
StatusCode::OK => Ok(true),
|
||||
status => Err(match resp.json::<HttpErrorBody>().await {
|
||||
Ok(HttpErrorBody { msg }) => Error::ApiError(status, msg),
|
||||
Err(_) => {
|
||||
Error::ReceiveErrorBody(format!("Http error ({}) at {}.", status.as_u16(), uri))
|
||||
}
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn list_aux_files(
|
||||
&self,
|
||||
tenant_shard_id: TenantShardId,
|
||||
timeline_id: TimelineId,
|
||||
lsn: Lsn,
|
||||
) -> Result<HashMap<String, Bytes>> {
|
||||
let uri = format!(
|
||||
"{}/v1/tenant/{}/timeline/{}/list_aux_files",
|
||||
self.mgmt_api_endpoint, tenant_shard_id, timeline_id
|
||||
);
|
||||
let resp = self
|
||||
.request_noerror(Method::POST, &uri, ListAuxFilesRequest { lsn })
|
||||
.await?;
|
||||
match resp.status() {
|
||||
StatusCode::OK => {
|
||||
let resp: HashMap<String, Bytes> = resp.json().await.map_err(|e| {
|
||||
Error::ApiError(StatusCode::INTERNAL_SERVER_ERROR, format!("{e}"))
|
||||
})?;
|
||||
Ok(resp)
|
||||
}
|
||||
status => Err(match resp.json::<HttpErrorBody>().await {
|
||||
Ok(HttpErrorBody { msg }) => Error::ApiError(status, msg),
|
||||
Err(_) => {
|
||||
Error::ReceiveErrorBody(format!("Http error ({}) at {}.", status.as_u16(), uri))
|
||||
}
|
||||
}),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
98
pageserver/pagebench/src/cmd/aux_files.rs
Normal file
98
pageserver/pagebench/src/cmd/aux_files.rs
Normal file
@@ -0,0 +1,98 @@
|
||||
use pageserver_api::models::{AuxFilePolicy, TenantConfig, TenantConfigRequest};
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use utils::id::TenantTimelineId;
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
/// Ingest aux files into the pageserver.
|
||||
#[derive(clap::Parser)]
|
||||
pub(crate) struct Args {
|
||||
#[clap(long, default_value = "http://localhost:9898")]
|
||||
mgmt_api_endpoint: String,
|
||||
#[clap(long, default_value = "postgres://postgres@localhost:64000")]
|
||||
page_service_connstring: String,
|
||||
#[clap(long)]
|
||||
pageserver_jwt: Option<String>,
|
||||
|
||||
targets: Option<Vec<TenantTimelineId>>,
|
||||
}
|
||||
|
||||
pub(crate) fn main(args: Args) -> anyhow::Result<()> {
|
||||
let rt = tokio::runtime::Builder::new_multi_thread()
|
||||
.enable_all()
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
let main_task = rt.spawn(main_impl(args));
|
||||
rt.block_on(main_task).unwrap()
|
||||
}
|
||||
|
||||
async fn main_impl(args: Args) -> anyhow::Result<()> {
|
||||
let args: &'static Args = Box::leak(Box::new(args));
|
||||
|
||||
let mgmt_api_client = Arc::new(pageserver_client::mgmt_api::Client::new(
|
||||
args.mgmt_api_endpoint.clone(),
|
||||
args.pageserver_jwt.as_deref(),
|
||||
));
|
||||
|
||||
// discover targets
|
||||
let timelines: Vec<TenantTimelineId> = crate::util::cli::targets::discover(
|
||||
&mgmt_api_client,
|
||||
crate::util::cli::targets::Spec {
|
||||
limit_to_first_n_targets: None,
|
||||
targets: {
|
||||
if let Some(targets) = &args.targets {
|
||||
if targets.len() != 1 {
|
||||
anyhow::bail!("must specify exactly one target");
|
||||
}
|
||||
Some(targets.clone())
|
||||
} else {
|
||||
None
|
||||
}
|
||||
},
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
|
||||
let timeline = timelines[0];
|
||||
let tenant_shard_id = TenantShardId::unsharded(timeline.tenant_id);
|
||||
let timeline_id = timeline.timeline_id;
|
||||
|
||||
println!("operating on timeline {}", timeline);
|
||||
|
||||
mgmt_api_client
|
||||
.tenant_config(&TenantConfigRequest {
|
||||
tenant_id: timeline.tenant_id,
|
||||
config: TenantConfig {
|
||||
switch_aux_file_policy: Some(AuxFilePolicy::V2),
|
||||
..Default::default()
|
||||
},
|
||||
})
|
||||
.await?;
|
||||
|
||||
for batch in 0..100 {
|
||||
let items = (0..100)
|
||||
.map(|id| {
|
||||
(
|
||||
format!("pg_logical/mappings/{:03}.{:03}", batch, id),
|
||||
format!("{:08}", id),
|
||||
)
|
||||
})
|
||||
.collect::<HashMap<_, _>>();
|
||||
let file_cnt = items.len();
|
||||
mgmt_api_client
|
||||
.ingest_aux_files(tenant_shard_id, timeline_id, items)
|
||||
.await?;
|
||||
println!("ingested {file_cnt} files");
|
||||
}
|
||||
|
||||
let files = mgmt_api_client
|
||||
.list_aux_files(tenant_shard_id, timeline_id, Lsn(Lsn::MAX.0 - 1))
|
||||
.await?;
|
||||
|
||||
println!("{} files found", files.len());
|
||||
|
||||
anyhow::Ok(())
|
||||
}
|
||||
@@ -14,6 +14,7 @@ mod util {
|
||||
|
||||
/// The pagebench CLI sub-commands, dispatched in [`main`] below.
|
||||
mod cmd {
|
||||
pub(super) mod aux_files;
|
||||
pub(super) mod basebackup;
|
||||
pub(super) mod getpage_latest_lsn;
|
||||
pub(super) mod ondemand_download_churn;
|
||||
@@ -27,6 +28,7 @@ enum Args {
|
||||
GetPageLatestLsn(cmd::getpage_latest_lsn::Args),
|
||||
TriggerInitialSizeCalculation(cmd::trigger_initial_size_calculation::Args),
|
||||
OndemandDownloadChurn(cmd::ondemand_download_churn::Args),
|
||||
AuxFiles(cmd::aux_files::Args),
|
||||
}
|
||||
|
||||
fn main() {
|
||||
@@ -46,6 +48,7 @@ fn main() {
|
||||
cmd::trigger_initial_size_calculation::main(args)
|
||||
}
|
||||
Args::OndemandDownloadChurn(args) => cmd::ondemand_download_churn::main(args),
|
||||
Args::AuxFiles(args) => cmd::aux_files::main(args),
|
||||
}
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
@@ -16,6 +16,8 @@ use hyper::header;
|
||||
use hyper::StatusCode;
|
||||
use hyper::{Body, Request, Response, Uri};
|
||||
use metrics::launch_timestamp::LaunchTimestamp;
|
||||
use pageserver_api::models::IngestAuxFilesRequest;
|
||||
use pageserver_api::models::ListAuxFilesRequest;
|
||||
use pageserver_api::models::LocationConfig;
|
||||
use pageserver_api::models::LocationConfigListResponse;
|
||||
use pageserver_api::models::ShardParameters;
|
||||
@@ -2331,6 +2333,71 @@ async fn get_utilization(
|
||||
.map_err(ApiError::InternalServerError)
|
||||
}
|
||||
|
||||
async fn list_aux_files(
|
||||
mut request: Request<Body>,
|
||||
_cancel: CancellationToken,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
|
||||
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
|
||||
let body: ListAuxFilesRequest = json_request(&mut request).await?;
|
||||
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
|
||||
|
||||
let state = get_state(&request);
|
||||
|
||||
let timeline =
|
||||
active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id)
|
||||
.await?;
|
||||
|
||||
let process = || async move {
|
||||
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
|
||||
let files = timeline.list_aux_files(body.lsn, &ctx).await?;
|
||||
Ok::<_, anyhow::Error>(files)
|
||||
};
|
||||
|
||||
match process().await {
|
||||
Ok(st) => json_response(StatusCode::OK, st),
|
||||
Err(err) => json_response(
|
||||
StatusCode::INTERNAL_SERVER_ERROR,
|
||||
ApiError::InternalServerError(err).to_string(),
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
async fn ingest_aux_files(
|
||||
mut request: Request<Body>,
|
||||
_cancel: CancellationToken,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
|
||||
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
|
||||
let body: IngestAuxFilesRequest = json_request(&mut request).await?;
|
||||
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
|
||||
|
||||
let state = get_state(&request);
|
||||
|
||||
let timeline =
|
||||
active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id)
|
||||
.await?;
|
||||
|
||||
let process = || async move {
|
||||
let mut modification = timeline.begin_modification(Lsn(
|
||||
timeline.get_last_record_lsn().0 + 8
|
||||
) /* advance LSN by 8 */);
|
||||
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
|
||||
for (fname, content) in body.aux_files {
|
||||
modification
|
||||
.put_file(&fname, content.as_bytes(), &ctx)
|
||||
.await?;
|
||||
}
|
||||
modification.commit(&ctx).await?;
|
||||
Ok::<_, anyhow::Error>(())
|
||||
};
|
||||
|
||||
match process().await {
|
||||
Ok(st) => json_response(StatusCode::OK, st),
|
||||
Err(err) => Err(ApiError::InternalServerError(err)),
|
||||
}
|
||||
}
|
||||
|
||||
/// Report on the largest tenants on this pageserver, for the storage controller to identify
|
||||
/// candidates for splitting
|
||||
async fn post_top_tenants(
|
||||
@@ -2708,6 +2775,14 @@ pub fn make_router(
|
||||
)
|
||||
.put("/v1/io_engine", |r| api_handler(r, put_io_engine_handler))
|
||||
.get("/v1/utilization", |r| api_handler(r, get_utilization))
|
||||
.post(
|
||||
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/ingest_aux_files",
|
||||
|r| testing_api_handler("ingest_aux_files", r, ingest_aux_files),
|
||||
)
|
||||
.post(
|
||||
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/list_aux_files",
|
||||
|r| testing_api_handler("list_aux_files", r, list_aux_files),
|
||||
)
|
||||
.post("/v1/top_tenants", |r| api_handler(r, post_top_tenants))
|
||||
.any(handler_404))
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user