mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-08 14:02:55 +00:00
chore(pageserver): add force aux file policy switch handler (#7842)
For existing users, we want to allow doing a force switch for their aux file policy. Part of #7462 --------- Signed-off-by: Alex Chi Z <chi@neon.tech>
This commit is contained in:
@@ -16,6 +16,7 @@ use hyper::header;
|
||||
use hyper::StatusCode;
|
||||
use hyper::{Body, Request, Response, Uri};
|
||||
use metrics::launch_timestamp::LaunchTimestamp;
|
||||
use pageserver_api::models::AuxFilePolicy;
|
||||
use pageserver_api::models::IngestAuxFilesRequest;
|
||||
use pageserver_api::models::ListAuxFilesRequest;
|
||||
use pageserver_api::models::LocationConfig;
|
||||
@@ -2307,6 +2308,31 @@ async fn post_tracing_event_handler(
|
||||
json_response(StatusCode::OK, ())
|
||||
}
|
||||
|
||||
async fn force_aux_policy_switch_handler(
|
||||
mut r: Request<Body>,
|
||||
_cancel: CancellationToken,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
check_permission(&r, None)?;
|
||||
let tenant_shard_id: TenantShardId = parse_request_param(&r, "tenant_shard_id")?;
|
||||
let timeline_id: TimelineId = parse_request_param(&r, "timeline_id")?;
|
||||
let policy: AuxFilePolicy = json_request(&mut r).await?;
|
||||
|
||||
let state = get_state(&r);
|
||||
|
||||
let tenant = state
|
||||
.tenant_manager
|
||||
.get_attached_tenant_shard(tenant_shard_id)?;
|
||||
tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;
|
||||
let timeline =
|
||||
active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id)
|
||||
.await?;
|
||||
timeline
|
||||
.do_switch_aux_policy(policy)
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
|
||||
json_response(StatusCode::OK, ())
|
||||
}
|
||||
|
||||
async fn put_io_engine_handler(
|
||||
mut r: Request<Body>,
|
||||
_cancel: CancellationToken,
|
||||
@@ -2384,19 +2410,9 @@ async fn list_aux_files(
|
||||
active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id)
|
||||
.await?;
|
||||
|
||||
let process = || async move {
|
||||
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
|
||||
let files = timeline.list_aux_files(body.lsn, &ctx).await?;
|
||||
Ok::<_, anyhow::Error>(files)
|
||||
};
|
||||
|
||||
match process().await {
|
||||
Ok(st) => json_response(StatusCode::OK, st),
|
||||
Err(err) => json_response(
|
||||
StatusCode::INTERNAL_SERVER_ERROR,
|
||||
ApiError::InternalServerError(err).to_string(),
|
||||
),
|
||||
}
|
||||
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
|
||||
let files = timeline.list_aux_files(body.lsn, &ctx).await?;
|
||||
json_response(StatusCode::OK, files)
|
||||
}
|
||||
|
||||
async fn ingest_aux_files(
|
||||
@@ -2414,24 +2430,22 @@ async fn ingest_aux_files(
|
||||
active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id)
|
||||
.await?;
|
||||
|
||||
let process = || async move {
|
||||
let mut modification = timeline.begin_modification(Lsn(
|
||||
timeline.get_last_record_lsn().0 + 8
|
||||
) /* advance LSN by 8 */);
|
||||
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
|
||||
for (fname, content) in body.aux_files {
|
||||
modification
|
||||
.put_file(&fname, content.as_bytes(), &ctx)
|
||||
.await?;
|
||||
}
|
||||
modification.commit(&ctx).await?;
|
||||
Ok::<_, anyhow::Error>(())
|
||||
};
|
||||
|
||||
match process().await {
|
||||
Ok(st) => json_response(StatusCode::OK, st),
|
||||
Err(err) => Err(ApiError::InternalServerError(err)),
|
||||
let mut modification = timeline.begin_modification(
|
||||
Lsn(timeline.get_last_record_lsn().0 + 8), /* advance LSN by 8 */
|
||||
);
|
||||
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
|
||||
for (fname, content) in body.aux_files {
|
||||
modification
|
||||
.put_file(&fname, content.as_bytes(), &ctx)
|
||||
.await
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
}
|
||||
modification
|
||||
.commit(&ctx)
|
||||
.await
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
|
||||
json_response(StatusCode::OK, ())
|
||||
}
|
||||
|
||||
/// Report on the largest tenants on this pageserver, for the storage controller to identify
|
||||
@@ -2814,6 +2828,10 @@ pub fn make_router(
|
||||
|r| api_handler(r, timeline_collect_keyspace),
|
||||
)
|
||||
.put("/v1/io_engine", |r| api_handler(r, put_io_engine_handler))
|
||||
.put(
|
||||
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/force_aux_policy_switch",
|
||||
|r| api_handler(r, force_aux_policy_switch_handler),
|
||||
)
|
||||
.get("/v1/utilization", |r| api_handler(r, get_utilization))
|
||||
.post(
|
||||
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/ingest_aux_files",
|
||||
|
||||
@@ -1489,14 +1489,7 @@ impl<'a> DatadirModification<'a> {
|
||||
if aux_files_key_v1.is_empty() {
|
||||
None
|
||||
} else {
|
||||
self.tline
|
||||
.last_aux_file_policy
|
||||
.store(Some(AuxFilePolicy::V1));
|
||||
self.tline
|
||||
.remote_client
|
||||
.schedule_index_upload_for_aux_file_policy_update(Some(
|
||||
AuxFilePolicy::V1,
|
||||
))?;
|
||||
self.tline.do_switch_aux_policy(AuxFilePolicy::V1)?;
|
||||
Some(AuxFilePolicy::V1)
|
||||
}
|
||||
} else {
|
||||
@@ -1504,10 +1497,7 @@ impl<'a> DatadirModification<'a> {
|
||||
};
|
||||
|
||||
if AuxFilePolicy::is_valid_migration_path(current_policy, switch_policy) {
|
||||
self.tline.last_aux_file_policy.store(Some(switch_policy));
|
||||
self.tline
|
||||
.remote_client
|
||||
.schedule_index_upload_for_aux_file_policy_update(Some(switch_policy))?;
|
||||
self.tline.do_switch_aux_policy(switch_policy)?;
|
||||
info!(current=?current_policy, next=?switch_policy, "switching aux file policy");
|
||||
switch_policy
|
||||
} else {
|
||||
|
||||
@@ -5999,6 +5999,67 @@ mod tests {
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn aux_file_policy_force_switch() {
|
||||
let mut harness = TenantHarness::create("aux_file_policy_force_switch").unwrap();
|
||||
harness.tenant_conf.switch_aux_file_policy = AuxFilePolicy::V1;
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
|
||||
let mut lsn = Lsn(0x08);
|
||||
|
||||
let tline: Arc<Timeline> = tenant
|
||||
.create_test_timeline(TIMELINE_ID, lsn, DEFAULT_PG_VERSION, &ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(
|
||||
tline.last_aux_file_policy.load(),
|
||||
None,
|
||||
"no aux file is written so it should be unset"
|
||||
);
|
||||
|
||||
{
|
||||
lsn += 8;
|
||||
let mut modification = tline.begin_modification(lsn);
|
||||
modification
|
||||
.put_file("pg_logical/mappings/test1", b"first", &ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
modification.commit(&ctx).await.unwrap();
|
||||
}
|
||||
|
||||
tline.do_switch_aux_policy(AuxFilePolicy::V2).unwrap();
|
||||
|
||||
assert_eq!(
|
||||
tline.last_aux_file_policy.load(),
|
||||
Some(AuxFilePolicy::V2),
|
||||
"dirty index_part.json reflected state is yet to be updated"
|
||||
);
|
||||
|
||||
// lose all data from v1
|
||||
let files = tline.list_aux_files(lsn, &ctx).await.unwrap();
|
||||
assert_eq!(files.get("pg_logical/mappings/test1"), None);
|
||||
|
||||
{
|
||||
lsn += 8;
|
||||
let mut modification = tline.begin_modification(lsn);
|
||||
modification
|
||||
.put_file("pg_logical/mappings/test2", b"second", &ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
modification.commit(&ctx).await.unwrap();
|
||||
}
|
||||
|
||||
// read data ingested in v2
|
||||
let files = tline.list_aux_files(lsn, &ctx).await.unwrap();
|
||||
assert_eq!(
|
||||
files.get("pg_logical/mappings/test2"),
|
||||
Some(&bytes::Bytes::from_static(b"second"))
|
||||
);
|
||||
// lose all data from v1
|
||||
assert_eq!(files.get("pg_logical/mappings/test1"), None);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn aux_file_policy_auto_detect() {
|
||||
let mut harness = TenantHarness::create("aux_file_policy_auto_detect").unwrap();
|
||||
|
||||
@@ -4606,6 +4606,14 @@ impl Timeline {
|
||||
) -> Result<Vec<TimelineId>, anyhow::Error> {
|
||||
detach_ancestor::complete(self, tenant, prepared, ctx).await
|
||||
}
|
||||
|
||||
/// Switch aux file policy and schedule upload to the index part.
|
||||
pub(crate) fn do_switch_aux_policy(&self, policy: AuxFilePolicy) -> anyhow::Result<()> {
|
||||
self.last_aux_file_policy.store(Some(policy));
|
||||
self.remote_client
|
||||
.schedule_index_upload_for_aux_file_policy_update(Some(policy))?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Top-level failure to compact.
|
||||
|
||||
Reference in New Issue
Block a user