mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-11 15:32:56 +00:00
committed by
GitHub
parent
ec8dcc2231
commit
2ff1a5cecd
@@ -28,7 +28,7 @@ use crate::safekeeper::Term;
|
||||
use crate::safekeeper::{ServerInfo, TermLsn};
|
||||
use crate::send_wal::WalSenderState;
|
||||
use crate::timeline::PeerInfo;
|
||||
use crate::{copy_timeline, debug_dump, pull_timeline};
|
||||
use crate::{copy_timeline, debug_dump, patch_control_file, pull_timeline};
|
||||
|
||||
use crate::timelines_global_map::TimelineDeleteForceResult;
|
||||
use crate::GlobalTimelines;
|
||||
@@ -465,6 +465,26 @@ async fn dump_debug_handler(mut request: Request<Body>) -> Result<Response<Body>
|
||||
Ok(response)
|
||||
}
|
||||
|
||||
async fn patch_control_file_handler(
|
||||
mut request: Request<Body>,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
check_permission(&request, None)?;
|
||||
|
||||
let ttid = TenantTimelineId::new(
|
||||
parse_request_param(&request, "tenant_id")?,
|
||||
parse_request_param(&request, "timeline_id")?,
|
||||
);
|
||||
|
||||
let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
|
||||
|
||||
let patch_request: patch_control_file::Request = json_request(&mut request).await?;
|
||||
let response = patch_control_file::handle_request(tli, patch_request)
|
||||
.await
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
|
||||
json_response(StatusCode::OK, response)
|
||||
}
|
||||
|
||||
/// Safekeeper http router.
|
||||
pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError> {
|
||||
let mut router = endpoint::make_router();
|
||||
@@ -526,6 +546,10 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError>
|
||||
"/v1/tenant/:tenant_id/timeline/:source_timeline_id/copy",
|
||||
|r| request_span(r, timeline_copy_handler),
|
||||
)
|
||||
.patch(
|
||||
"/v1/tenant/:tenant_id/timeline/:timeline_id/control_file",
|
||||
|r| request_span(r, patch_control_file_handler),
|
||||
)
|
||||
// for tests
|
||||
.post("/v1/record_safekeeper_info/:tenant_id/:timeline_id", |r| {
|
||||
request_span(r, record_safekeeper_info)
|
||||
|
||||
@@ -22,6 +22,7 @@ pub mod handler;
|
||||
pub mod http;
|
||||
pub mod json_ctrl;
|
||||
pub mod metrics;
|
||||
pub mod patch_control_file;
|
||||
pub mod pull_timeline;
|
||||
pub mod receive_wal;
|
||||
pub mod recovery;
|
||||
|
||||
85
safekeeper/src/patch_control_file.rs
Normal file
85
safekeeper/src/patch_control_file.rs
Normal file
@@ -0,0 +1,85 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::Value;
|
||||
use tracing::info;
|
||||
|
||||
use crate::{state::TimelinePersistentState, timeline::Timeline};
|
||||
|
||||
#[derive(Deserialize, Debug, Clone)]
|
||||
pub struct Request {
|
||||
/// JSON object with fields to update
|
||||
pub updates: serde_json::Value,
|
||||
/// List of fields to apply
|
||||
pub apply_fields: Vec<String>,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub struct Response {
|
||||
pub old_control_file: TimelinePersistentState,
|
||||
pub new_control_file: TimelinePersistentState,
|
||||
}
|
||||
|
||||
/// Patch control file with given request. Will update the persistent state using
|
||||
/// fields from the request and persist the new state on disk.
|
||||
pub async fn handle_request(tli: Arc<Timeline>, request: Request) -> anyhow::Result<Response> {
|
||||
let response = tli
|
||||
.map_control_file(|state| {
|
||||
let old_control_file = state.clone();
|
||||
let new_control_file = state_apply_diff(&old_control_file, &request)?;
|
||||
|
||||
info!(
|
||||
"patching control file, old: {:?}, new: {:?}, patch: {:?}",
|
||||
old_control_file, new_control_file, request
|
||||
);
|
||||
*state = new_control_file.clone();
|
||||
|
||||
Ok(Response {
|
||||
old_control_file,
|
||||
new_control_file,
|
||||
})
|
||||
})
|
||||
.await?;
|
||||
|
||||
Ok(response)
|
||||
}
|
||||
|
||||
fn state_apply_diff(
|
||||
state: &TimelinePersistentState,
|
||||
request: &Request,
|
||||
) -> anyhow::Result<TimelinePersistentState> {
|
||||
let mut json_value = serde_json::to_value(state)?;
|
||||
|
||||
if let Value::Object(a) = &mut json_value {
|
||||
if let Value::Object(b) = &request.updates {
|
||||
json_apply_diff(a, b, &request.apply_fields)?;
|
||||
} else {
|
||||
anyhow::bail!("request.updates is not a json object")
|
||||
}
|
||||
} else {
|
||||
anyhow::bail!("TimelinePersistentState is not a json object")
|
||||
}
|
||||
|
||||
let new_state: TimelinePersistentState = serde_json::from_value(json_value)?;
|
||||
Ok(new_state)
|
||||
}
|
||||
|
||||
fn json_apply_diff(
|
||||
object: &mut serde_json::Map<String, Value>,
|
||||
updates: &serde_json::Map<String, Value>,
|
||||
apply_keys: &Vec<String>,
|
||||
) -> anyhow::Result<()> {
|
||||
for key in apply_keys {
|
||||
if let Some(new_value) = updates.get(key) {
|
||||
if let Some(existing_value) = object.get_mut(key) {
|
||||
*existing_value = new_value.clone();
|
||||
} else {
|
||||
anyhow::bail!("key not found in original object: {}", key);
|
||||
}
|
||||
} else {
|
||||
anyhow::bail!("key not found in request.updates: {}", key);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -901,6 +901,20 @@ impl Timeline {
|
||||
file_open,
|
||||
}
|
||||
}
|
||||
|
||||
/// Apply a function to the control file state and persist it.
|
||||
pub async fn map_control_file<T>(
|
||||
&self,
|
||||
f: impl FnOnce(&mut TimelinePersistentState) -> Result<T>,
|
||||
) -> Result<T> {
|
||||
let mut state = self.write_shared_state().await;
|
||||
let mut persistent_state = state.sk.state.start_change();
|
||||
// If f returns error, we abort the change and don't persist anything.
|
||||
let res = f(&mut persistent_state)?;
|
||||
// If persisting fails, we abort the change and return error.
|
||||
state.sk.state.finish_change(&persistent_state).await?;
|
||||
Ok(res)
|
||||
}
|
||||
}
|
||||
|
||||
/// Deletes directory and it's contents. Returns false if directory does not exist.
|
||||
|
||||
@@ -3443,6 +3443,24 @@ class SafekeeperHttpClient(requests.Session):
|
||||
assert isinstance(res_json, dict)
|
||||
return res_json
|
||||
|
||||
def patch_control_file(
|
||||
self,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
patch: Dict[str, Any],
|
||||
) -> Dict[str, Any]:
|
||||
res = self.patch(
|
||||
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/control_file",
|
||||
json={
|
||||
"updates": patch,
|
||||
"apply_fields": list(patch.keys()),
|
||||
},
|
||||
)
|
||||
res.raise_for_status()
|
||||
res_json = res.json()
|
||||
assert isinstance(res_json, dict)
|
||||
return res_json
|
||||
|
||||
def pull_timeline(self, body: Dict[str, Any]) -> Dict[str, Any]:
|
||||
res = self.post(f"http://localhost:{self.port}/v1/pull_timeline", json=body)
|
||||
res.raise_for_status()
|
||||
|
||||
@@ -1946,3 +1946,51 @@ def test_timeline_copy(neon_env_builder: NeonEnvBuilder, insert_rows: int):
|
||||
assert orig_digest == new_digest
|
||||
|
||||
# TODO: test timelines can start after copy
|
||||
|
||||
|
||||
def test_patch_control_file(neon_env_builder: NeonEnvBuilder):
|
||||
neon_env_builder.num_safekeepers = 1
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.initial_timeline
|
||||
|
||||
endpoint = env.endpoints.create_start("main")
|
||||
# initialize safekeeper
|
||||
endpoint.safe_psql("create table t(key int, value text)")
|
||||
|
||||
# update control file
|
||||
res = (
|
||||
env.safekeepers[0]
|
||||
.http_client()
|
||||
.patch_control_file(
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
{
|
||||
"timeline_start_lsn": "0/1",
|
||||
},
|
||||
)
|
||||
)
|
||||
|
||||
timeline_start_lsn_before = res["old_control_file"]["timeline_start_lsn"]
|
||||
timeline_start_lsn_after = res["new_control_file"]["timeline_start_lsn"]
|
||||
|
||||
log.info(f"patch_control_file response: {res}")
|
||||
log.info(
|
||||
f"updated control file timeline_start_lsn, before {timeline_start_lsn_before}, after {timeline_start_lsn_after}"
|
||||
)
|
||||
|
||||
assert timeline_start_lsn_after == "0/1"
|
||||
env.safekeepers[0].stop().start()
|
||||
|
||||
# wait/check that safekeeper is alive
|
||||
endpoint.safe_psql("insert into t values (1, 'payload')")
|
||||
|
||||
# check that timeline_start_lsn is updated
|
||||
res = (
|
||||
env.safekeepers[0]
|
||||
.http_client()
|
||||
.debug_dump({"dump_control_file": "true", "timeline_id": str(timeline_id)})
|
||||
)
|
||||
log.info(f"dump_control_file response: {res}")
|
||||
assert res["timelines"][0]["control_file"]["timeline_start_lsn"] == "0/1"
|
||||
|
||||
Reference in New Issue
Block a user