diff --git a/safekeeper/src/http/routes.rs b/safekeeper/src/http/routes.rs index 919b6b2982..a0c0c7ca4c 100644 --- a/safekeeper/src/http/routes.rs +++ b/safekeeper/src/http/routes.rs @@ -28,7 +28,7 @@ use crate::safekeeper::Term; use crate::safekeeper::{ServerInfo, TermLsn}; use crate::send_wal::WalSenderState; use crate::timeline::PeerInfo; -use crate::{copy_timeline, debug_dump, pull_timeline}; +use crate::{copy_timeline, debug_dump, patch_control_file, pull_timeline}; use crate::timelines_global_map::TimelineDeleteForceResult; use crate::GlobalTimelines; @@ -465,6 +465,26 @@ async fn dump_debug_handler(mut request: Request) -> Result Ok(response) } +async fn patch_control_file_handler( + mut request: Request, +) -> Result, ApiError> { + check_permission(&request, None)?; + + let ttid = TenantTimelineId::new( + parse_request_param(&request, "tenant_id")?, + parse_request_param(&request, "timeline_id")?, + ); + + let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?; + + let patch_request: patch_control_file::Request = json_request(&mut request).await?; + let response = patch_control_file::handle_request(tli, patch_request) + .await + .map_err(ApiError::InternalServerError)?; + + json_response(StatusCode::OK, response) +} + /// Safekeeper http router. pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder { let mut router = endpoint::make_router(); @@ -526,6 +546,10 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder "/v1/tenant/:tenant_id/timeline/:source_timeline_id/copy", |r| request_span(r, timeline_copy_handler), ) + .patch( + "/v1/tenant/:tenant_id/timeline/:timeline_id/control_file", + |r| request_span(r, patch_control_file_handler), + ) // for tests .post("/v1/record_safekeeper_info/:tenant_id/:timeline_id", |r| { request_span(r, record_safekeeper_info) diff --git a/safekeeper/src/lib.rs b/safekeeper/src/lib.rs index f18a1ec22d..27b80fcbe8 100644 --- a/safekeeper/src/lib.rs +++ b/safekeeper/src/lib.rs @@ -22,6 +22,7 @@ pub mod handler; pub mod http; pub mod json_ctrl; pub mod metrics; +pub mod patch_control_file; pub mod pull_timeline; pub mod receive_wal; pub mod recovery; diff --git a/safekeeper/src/patch_control_file.rs b/safekeeper/src/patch_control_file.rs new file mode 100644 index 0000000000..2136d1b5f7 --- /dev/null +++ b/safekeeper/src/patch_control_file.rs @@ -0,0 +1,85 @@ +use std::sync::Arc; + +use serde::{Deserialize, Serialize}; +use serde_json::Value; +use tracing::info; + +use crate::{state::TimelinePersistentState, timeline::Timeline}; + +#[derive(Deserialize, Debug, Clone)] +pub struct Request { + /// JSON object with fields to update + pub updates: serde_json::Value, + /// List of fields to apply + pub apply_fields: Vec, +} + +#[derive(Serialize)] +pub struct Response { + pub old_control_file: TimelinePersistentState, + pub new_control_file: TimelinePersistentState, +} + +/// Patch control file with given request. Will update the persistent state using +/// fields from the request and persist the new state on disk. +pub async fn handle_request(tli: Arc, request: Request) -> anyhow::Result { + let response = tli + .map_control_file(|state| { + let old_control_file = state.clone(); + let new_control_file = state_apply_diff(&old_control_file, &request)?; + + info!( + "patching control file, old: {:?}, new: {:?}, patch: {:?}", + old_control_file, new_control_file, request + ); + *state = new_control_file.clone(); + + Ok(Response { + old_control_file, + new_control_file, + }) + }) + .await?; + + Ok(response) +} + +fn state_apply_diff( + state: &TimelinePersistentState, + request: &Request, +) -> anyhow::Result { + let mut json_value = serde_json::to_value(state)?; + + if let Value::Object(a) = &mut json_value { + if let Value::Object(b) = &request.updates { + json_apply_diff(a, b, &request.apply_fields)?; + } else { + anyhow::bail!("request.updates is not a json object") + } + } else { + anyhow::bail!("TimelinePersistentState is not a json object") + } + + let new_state: TimelinePersistentState = serde_json::from_value(json_value)?; + Ok(new_state) +} + +fn json_apply_diff( + object: &mut serde_json::Map, + updates: &serde_json::Map, + apply_keys: &Vec, +) -> anyhow::Result<()> { + for key in apply_keys { + if let Some(new_value) = updates.get(key) { + if let Some(existing_value) = object.get_mut(key) { + *existing_value = new_value.clone(); + } else { + anyhow::bail!("key not found in original object: {}", key); + } + } else { + anyhow::bail!("key not found in request.updates: {}", key); + } + } + + Ok(()) +} diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs index ec7dd7d89b..730a80a583 100644 --- a/safekeeper/src/timeline.rs +++ b/safekeeper/src/timeline.rs @@ -901,6 +901,20 @@ impl Timeline { file_open, } } + + /// Apply a function to the control file state and persist it. + pub async fn map_control_file( + &self, + f: impl FnOnce(&mut TimelinePersistentState) -> Result, + ) -> Result { + let mut state = self.write_shared_state().await; + let mut persistent_state = state.sk.state.start_change(); + // If f returns error, we abort the change and don't persist anything. + let res = f(&mut persistent_state)?; + // If persisting fails, we abort the change and return error. + state.sk.state.finish_change(&persistent_state).await?; + Ok(res) + } } /// Deletes directory and it's contents. Returns false if directory does not exist. diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index bbabfeedf6..804685589f 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -3443,6 +3443,24 @@ class SafekeeperHttpClient(requests.Session): assert isinstance(res_json, dict) return res_json + def patch_control_file( + self, + tenant_id: TenantId, + timeline_id: TimelineId, + patch: Dict[str, Any], + ) -> Dict[str, Any]: + res = self.patch( + f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/control_file", + json={ + "updates": patch, + "apply_fields": list(patch.keys()), + }, + ) + res.raise_for_status() + res_json = res.json() + assert isinstance(res_json, dict) + return res_json + def pull_timeline(self, body: Dict[str, Any]) -> Dict[str, Any]: res = self.post(f"http://localhost:{self.port}/v1/pull_timeline", json=body) res.raise_for_status() diff --git a/test_runner/regress/test_wal_acceptor.py b/test_runner/regress/test_wal_acceptor.py index 2f8e69165e..dab446fcfd 100644 --- a/test_runner/regress/test_wal_acceptor.py +++ b/test_runner/regress/test_wal_acceptor.py @@ -1946,3 +1946,51 @@ def test_timeline_copy(neon_env_builder: NeonEnvBuilder, insert_rows: int): assert orig_digest == new_digest # TODO: test timelines can start after copy + + +def test_patch_control_file(neon_env_builder: NeonEnvBuilder): + neon_env_builder.num_safekeepers = 1 + env = neon_env_builder.init_start() + + tenant_id = env.initial_tenant + timeline_id = env.initial_timeline + + endpoint = env.endpoints.create_start("main") + # initialize safekeeper + endpoint.safe_psql("create table t(key int, value text)") + + # update control file + res = ( + env.safekeepers[0] + .http_client() + .patch_control_file( + tenant_id, + timeline_id, + { + "timeline_start_lsn": "0/1", + }, + ) + ) + + timeline_start_lsn_before = res["old_control_file"]["timeline_start_lsn"] + timeline_start_lsn_after = res["new_control_file"]["timeline_start_lsn"] + + log.info(f"patch_control_file response: {res}") + log.info( + f"updated control file timeline_start_lsn, before {timeline_start_lsn_before}, after {timeline_start_lsn_after}" + ) + + assert timeline_start_lsn_after == "0/1" + env.safekeepers[0].stop().start() + + # wait/check that safekeeper is alive + endpoint.safe_psql("insert into t values (1, 'payload')") + + # check that timeline_start_lsn is updated + res = ( + env.safekeepers[0] + .http_client() + .debug_dump({"dump_control_file": "true", "timeline_id": str(timeline_id)}) + ) + log.info(f"dump_control_file response: {res}") + assert res["timelines"][0]["control_file"]["timeline_start_lsn"] == "0/1"