From 8543485e9201da72a5be727ef55fef754ebd3009 Mon Sep 17 00:00:00 2001 From: Arthur Petukhovsky Date: Fri, 28 Apr 2023 17:20:46 +0300 Subject: [PATCH] Pull clone timeline from peer safekeepers (#4089) Add HTTP endpoint to initialize safekeeper timeline from peer safekeepers. This is useful for initializing new safekeeper to replace failed safekeeper. Not fully "correct" in all cases, but should work in most. This code is not suitable for production workloads but can be tested on staging to get started. New endpoint is separated from usual cases and should not affect anything if no one explicitly uses a new endpoint. We can rollback this commit in case of issues. --- Cargo.lock | 2 + safekeeper/Cargo.toml | 6 +- safekeeper/src/http/routes.rs | 52 ++++- safekeeper/src/lib.rs | 1 + safekeeper/src/pull_timeline.rs | 240 +++++++++++++++++++++++ safekeeper/src/timeline.rs | 6 +- safekeeper/src/timelines_global_map.rs | 20 ++ safekeeper/src/wal_storage.rs | 2 +- test_runner/fixtures/neon_fixtures.py | 7 + test_runner/regress/test_wal_acceptor.py | 95 +++++++++ 10 files changed, 424 insertions(+), 7 deletions(-) create mode 100644 safekeeper/src/pull_timeline.rs diff --git a/Cargo.lock b/Cargo.lock index d79837a831..bc63cb0442 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3657,6 +3657,7 @@ dependencies = [ "const_format", "crc32c", "fs2", + "futures", "git-version", "hex", "humantime", @@ -3671,6 +3672,7 @@ dependencies = [ "pq_proto", "regex", "remote_storage", + "reqwest", "safekeeper_api", "serde", "serde_json", diff --git a/safekeeper/Cargo.toml b/safekeeper/Cargo.toml index 00cd111da5..b6e8497809 100644 --- a/safekeeper/Cargo.toml +++ b/safekeeper/Cargo.toml @@ -19,11 +19,13 @@ git-version.workspace = true hex.workspace = true humantime.workspace = true hyper.workspace = true +futures.workspace = true once_cell.workspace = true parking_lot.workspace = true postgres.workspace = true postgres-protocol.workspace = true regex.workspace = true +reqwest = { workspace = true, features = ["json"] } serde.workspace = true serde_json.workspace = true serde_with.workspace = true @@ -33,6 +35,7 @@ tokio = { workspace = true, features = ["fs"] } tokio-io-timeout.workspace = true tokio-postgres.workspace = true toml_edit.workspace = true +tempfile.workspace = true tracing.workspace = true url.workspace = true metrics.workspace = true @@ -45,6 +48,3 @@ storage_broker.workspace = true utils.workspace = true workspace_hack.workspace = true - -[dev-dependencies] -tempfile.workspace = true diff --git a/safekeeper/src/http/routes.rs b/safekeeper/src/http/routes.rs index eeb08d2733..a498d868af 100644 --- a/safekeeper/src/http/routes.rs +++ b/safekeeper/src/http/routes.rs @@ -11,11 +11,13 @@ use std::str::FromStr; use std::sync::Arc; use storage_broker::proto::SafekeeperTimelineInfo; use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId; +use tokio::fs::File; +use tokio::io::AsyncReadExt; use tokio::task::JoinError; -use crate::debug_dump; use crate::safekeeper::ServerInfo; use crate::safekeeper::Term; +use crate::{debug_dump, pull_timeline}; use crate::timelines_global_map::TimelineDeleteForceResult; use crate::GlobalTimelines; @@ -177,6 +179,49 @@ async fn timeline_create_handler(mut request: Request) -> Result) -> Result, ApiError> { + check_permission(&request, None)?; + + let data: pull_timeline::Request = json_request(&mut request).await?; + + let resp = pull_timeline::handle_request(data) + .await + .map_err(ApiError::InternalServerError)?; + json_response(StatusCode::OK, resp) +} + +/// 
Download a file from the timeline directory. +// TODO: figure out a better way to copy files between safekeepers +async fn timeline_files_handler(request: Request) -> Result, ApiError> { + let ttid = TenantTimelineId::new( + parse_request_param(&request, "tenant_id")?, + parse_request_param(&request, "timeline_id")?, + ); + check_permission(&request, Some(ttid.tenant_id))?; + + let filename: String = parse_request_param(&request, "filename")?; + + let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?; + + let filepath = tli.timeline_dir.join(filename); + let mut file = File::open(&filepath) + .await + .map_err(|e| ApiError::InternalServerError(e.into()))?; + + let mut content = Vec::new(); + // TODO: don't store files in memory + file.read_to_end(&mut content) + .await + .map_err(|e| ApiError::InternalServerError(e.into()))?; + + Response::builder() + .status(StatusCode::OK) + .header("Content-Type", "application/octet-stream") + .body(Body::from(content)) + .map_err(|e| ApiError::InternalServerError(e.into())) +} + /// Deactivates the timeline and removes its data directory. async fn timeline_delete_force_handler( mut request: Request, @@ -353,6 +398,11 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder timeline_delete_force_handler, ) .delete("/v1/tenant/:tenant_id", tenant_delete_force_handler) + .post("/v1/pull_timeline", timeline_pull_handler) + .get( + "/v1/tenant/:tenant_id/timeline/:timeline_id/file/:filename", + timeline_files_handler, + ) // for tests .post( "/v1/record_safekeeper_info/:tenant_id/:timeline_id", diff --git a/safekeeper/src/lib.rs b/safekeeper/src/lib.rs index 2c28c5218d..ff621fdbc0 100644 --- a/safekeeper/src/lib.rs +++ b/safekeeper/src/lib.rs @@ -15,6 +15,7 @@ pub mod handler; pub mod http; pub mod json_ctrl; pub mod metrics; +pub mod pull_timeline; pub mod receive_wal; pub mod remove_wal; pub mod safekeeper; diff --git a/safekeeper/src/pull_timeline.rs b/safekeeper/src/pull_timeline.rs new file mode 100644 index 0000000000..344b760fd3 --- /dev/null +++ b/safekeeper/src/pull_timeline.rs @@ -0,0 +1,240 @@ +use serde::{Deserialize, Serialize}; + +use anyhow::{bail, Context, Result}; +use tokio::io::AsyncWriteExt; +use tracing::info; +use utils::id::{TenantId, TenantTimelineId, TimelineId}; + +use serde_with::{serde_as, DisplayFromStr}; + +use crate::{ + control_file, debug_dump, + http::routes::TimelineStatus, + wal_storage::{self, Storage}, + GlobalTimelines, +}; + +/// Info about timeline on safekeeper ready for reporting. +#[serde_as] +#[derive(Debug, Serialize, Deserialize)] +pub struct Request { + #[serde_as(as = "DisplayFromStr")] + pub tenant_id: TenantId, + #[serde_as(as = "DisplayFromStr")] + pub timeline_id: TimelineId, + pub http_hosts: Vec, +} + +#[derive(Debug, Serialize)] +pub struct Response { + // Donor safekeeper host + pub safekeeper_host: String, + // TODO: add more fields? +} + +/// Find the most advanced safekeeper and pull timeline from it. 
+pub async fn handle_request(request: Request) -> Result { + let existing_tli = GlobalTimelines::get(TenantTimelineId::new( + request.tenant_id, + request.timeline_id, + )); + if existing_tli.is_ok() { + bail!("Timeline {} already exists", request.timeline_id); + } + + let client = reqwest::Client::new(); + let http_hosts = request.http_hosts.clone(); + + // Send request to /v1/tenant/:tenant_id/timeline/:timeline_id + let responses = futures::future::join_all(http_hosts.iter().map(|url| { + let url = format!( + "{}/v1/tenant/{}/timeline/{}", + url, request.tenant_id, request.timeline_id + ); + client.get(url).send() + })) + .await; + + let mut statuses = Vec::new(); + for (i, response) in responses.into_iter().enumerate() { + let response = response.context(format!("Failed to get status from {}", http_hosts[i]))?; + let status: crate::http::routes::TimelineStatus = response.json().await?; + statuses.push((status, i)); + } + + // Find the most advanced safekeeper + // TODO: current logic may be wrong, fix it later + let (status, i) = statuses + .into_iter() + .max_by_key(|(status, _)| { + ( + status.acceptor_state.epoch, + status.flush_lsn, + status.commit_lsn, + ) + }) + .unwrap(); + let safekeeper_host = http_hosts[i].clone(); + + assert!(status.tenant_id == request.tenant_id); + assert!(status.timeline_id == request.timeline_id); + + pull_timeline(status, safekeeper_host).await +} + +async fn pull_timeline(status: TimelineStatus, host: String) -> Result { + let ttid = TenantTimelineId::new(status.tenant_id, status.timeline_id); + info!( + "Pulling timeline {} from safekeeper {}, commit_lsn={}, flush_lsn={}, term={}, epoch={}", + ttid, + host, + status.commit_lsn, + status.flush_lsn, + status.acceptor_state.term, + status.acceptor_state.epoch + ); + + let conf = &GlobalTimelines::get_global_config(); + + let client = reqwest::Client::new(); + // TODO: don't use debug dump, it should be used only in tests. + // This is a proof of concept, we should figure out a way + // to use scp without implementing it manually. + + // Implementing our own scp over HTTP. + // At first, we need to fetch list of files from safekeeper. + let dump: debug_dump::Response = client + .get(format!( + "{}/v1/debug_dump?dump_all=true&tenant_id={}&timeline_id={}", + host, status.tenant_id, status.timeline_id + )) + .send() + .await? + .json() + .await?; + + if dump.timelines.len() != 1 { + bail!( + "Expected to fetch single timeline, got {} timelines", + dump.timelines.len() + ); + } + + let timeline = dump.timelines.into_iter().next().unwrap(); + let disk_content = timeline.disk_content.ok_or(anyhow::anyhow!( + "Timeline {} doesn't have disk content", + ttid + ))?; + + let mut filenames = disk_content + .files + .iter() + .map(|file| file.name.clone()) + .collect::>(); + + // Sort filenames to make sure we pull files in correct order + // After sorting, we should have: + // - 000000010000000000000001 + // - ... + // - 000000010000000000000002.partial + // - safekeeper.control + filenames.sort(); + + // safekeeper.control should be the first file, so we need to move it to the beginning + let control_file_index = filenames + .iter() + .position(|name| name == "safekeeper.control") + .ok_or(anyhow::anyhow!("safekeeper.control not found"))?; + filenames.remove(control_file_index); + filenames.insert(0, "safekeeper.control".to_string()); + + info!( + "Downloading {} files from safekeeper {}", + filenames.len(), + host + ); + + // Creating temp directory for a new timeline. 
It needs to be + // located on the same filesystem as the rest of the timelines. + + // conf.workdir is usually /storage/safekeeper/data + // will try to transform it into /storage/safekeeper/tmp + let temp_base = conf + .workdir + .parent() + .ok_or(anyhow::anyhow!("workdir has no parent"))? + .join("tmp"); + + tokio::fs::create_dir_all(&temp_base).await?; + + let tli_dir = tempfile::Builder::new() + .suffix("_temptli") + .prefix(&format!("{}_{}_", ttid.tenant_id, ttid.timeline_id)) + .tempdir_in(temp_base)?; + let tli_dir_path = tli_dir.path().to_owned(); + + // Note: some time happens between fetching list of files and fetching files themselves. + // It's possible that some files will be removed from safekeeper and we will fail to fetch them. + // This function will fail in this case, should be retried by the caller. + for filename in filenames { + let file_path = tli_dir_path.join(&filename); + // /v1/tenant/:tenant_id/timeline/:timeline_id/file/:filename + let http_url = format!( + "{}/v1/tenant/{}/timeline/{}/file/{}", + host, status.tenant_id, status.timeline_id, filename + ); + + let mut file = tokio::fs::File::create(&file_path).await?; + let mut response = client.get(&http_url).send().await?; + while let Some(chunk) = response.chunk().await? { + file.write_all(&chunk).await?; + } + } + + // TODO: fsync? + + // Let's create timeline from temp directory and verify that it's correct + + let control_path = tli_dir_path.join("safekeeper.control"); + + let control_store = control_file::FileStorage::load_control_file(control_path)?; + if control_store.server.wal_seg_size == 0 { + bail!("wal_seg_size is not set"); + } + + let wal_store = + wal_storage::PhysicalStorage::new(&ttid, tli_dir_path.clone(), conf, &control_store)?; + + let commit_lsn = status.commit_lsn; + let flush_lsn = wal_store.flush_lsn(); + + info!( + "Finished downloading timeline {}, commit_lsn={}, flush_lsn={}", + ttid, commit_lsn, flush_lsn + ); + assert!(status.commit_lsn <= status.flush_lsn); + + // Move timeline dir to the correct location + let timeline_path = conf.timeline_dir(&ttid); + + info!( + "Moving timeline {} from {} to {}", + ttid, + tli_dir_path.display(), + timeline_path.display() + ); + tokio::fs::create_dir_all(conf.tenant_dir(&ttid.tenant_id)).await?; + tokio::fs::rename(tli_dir_path, &timeline_path).await?; + + let tli = GlobalTimelines::load_timeline(ttid).context("Failed to load timeline after copy")?; + + info!( + "Loaded timeline {}, flush_lsn={}", + ttid, + tli.get_flush_lsn() + ); + + Ok(Response { + safekeeper_host: host, + }) +} diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs index 64ca6967df..2dbf215998 100644 --- a/safekeeper/src/timeline.rs +++ b/safekeeper/src/timeline.rs @@ -129,7 +129,8 @@ impl SharedState { // We don't want to write anything to disk, because we may have existing timeline there. // These functions should not change anything on disk. 
let control_store = control_file::FileStorage::create_new(ttid, conf, state)?; - let wal_store = wal_storage::PhysicalStorage::new(ttid, conf, &control_store)?; + let wal_store = + wal_storage::PhysicalStorage::new(ttid, conf.timeline_dir(ttid), conf, &control_store)?; let sk = SafeKeeper::new(control_store, wal_store, conf.my_id)?; Ok(Self { @@ -149,7 +150,8 @@ impl SharedState { bail!(TimelineError::UninitializedWalSegSize(*ttid)); } - let wal_store = wal_storage::PhysicalStorage::new(ttid, conf, &control_store)?; + let wal_store = + wal_storage::PhysicalStorage::new(ttid, conf.timeline_dir(ttid), conf, &control_store)?; Ok(Self { sk: SafeKeeper::new(control_store, wal_store, conf.my_id)?, diff --git a/safekeeper/src/timelines_global_map.rs b/safekeeper/src/timelines_global_map.rs index 868ee97645..41809794dc 100644 --- a/safekeeper/src/timelines_global_map.rs +++ b/safekeeper/src/timelines_global_map.rs @@ -159,6 +159,26 @@ impl GlobalTimelines { Ok(()) } + /// Load timeline from disk to the memory. + pub fn load_timeline(ttid: TenantTimelineId) -> Result> { + let (conf, wal_backup_launcher_tx) = TIMELINES_STATE.lock().unwrap().get_dependencies(); + + match Timeline::load_timeline(conf, ttid, wal_backup_launcher_tx) { + Ok(timeline) => { + let tli = Arc::new(timeline); + // TODO: prevent concurrent timeline creation/loading + TIMELINES_STATE + .lock() + .unwrap() + .timelines + .insert(ttid, tli.clone()); + Ok(tli) + } + // If we can't load a timeline, it's bad. Caller will figure it out. + Err(e) => bail!("failed to load timeline {}, reason: {:?}", ttid, e), + } + } + /// Get the number of timelines in the map. pub fn timelines_count() -> usize { TIMELINES_STATE.lock().unwrap().timelines.len() diff --git a/safekeeper/src/wal_storage.rs b/safekeeper/src/wal_storage.rs index 54e27714ea..5ef22b2f6a 100644 --- a/safekeeper/src/wal_storage.rs +++ b/safekeeper/src/wal_storage.rs @@ -112,10 +112,10 @@ impl PhysicalStorage { /// the disk. Otherwise, all LSNs are set to zero. 
pub fn new( ttid: &TenantTimelineId, + timeline_dir: PathBuf, conf: &SafeKeeperConf, state: &SafeKeeperState, ) -> Result { - let timeline_dir = conf.timeline_dir(ttid); let wal_seg_size = state.server.wal_seg_size as usize; // Find out where stored WAL ends, starting at commit_lsn which is a diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index af7571cc4d..79b2e5b290 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -2661,6 +2661,13 @@ class SafekeeperHttpClient(requests.Session): assert isinstance(res_json, dict) return res_json + def pull_timeline(self, body: Dict[str, Any]) -> Dict[str, Any]: + res = self.post(f"http://localhost:{self.port}/v1/pull_timeline", json=body) + res.raise_for_status() + res_json = res.json() + assert isinstance(res_json, dict) + return res_json + def timeline_create( self, tenant_id: TenantId, timeline_id: TimelineId, pg_version: int, commit_lsn: Lsn ): diff --git a/test_runner/regress/test_wal_acceptor.py b/test_runner/regress/test_wal_acceptor.py index e8cfa4f318..fed5f325ca 100644 --- a/test_runner/regress/test_wal_acceptor.py +++ b/test_runner/regress/test_wal_acceptor.py @@ -1254,3 +1254,98 @@ def test_delete_force(neon_env_builder: NeonEnvBuilder, auth_enabled: bool): with closing(endpoint_other.connect()) as conn: with conn.cursor() as cur: cur.execute("INSERT INTO t (key) VALUES (123)") + + +def test_pull_timeline(neon_env_builder: NeonEnvBuilder): + def safekeepers_guc(env: NeonEnv, sk_names: List[int]) -> str: + return ",".join([f"localhost:{sk.port.pg}" for sk in env.safekeepers if sk.id in sk_names]) + + def execute_payload(endpoint: Endpoint): + with closing(endpoint.connect()) as conn: + with conn.cursor() as cur: + # we rely upon autocommit after each statement + # as waiting for acceptors happens there + cur.execute("CREATE TABLE IF NOT EXISTS t(key int, value text)") + cur.execute("INSERT INTO t VALUES (0, 'something')") + sum_before = query_scalar(cur, "SELECT SUM(key) FROM t") + + cur.execute("INSERT INTO t SELECT generate_series(1,100000), 'payload'") + sum_after = query_scalar(cur, "SELECT SUM(key) FROM t") + assert sum_after == sum_before + 5000050000 + + def show_statuses(safekeepers: List[Safekeeper], tenant_id: TenantId, timeline_id: TimelineId): + for sk in safekeepers: + http_cli = sk.http_client() + try: + status = http_cli.timeline_status(tenant_id, timeline_id) + log.info(f"Safekeeper {sk.id} status: {status}") + except Exception as e: + log.info(f"Safekeeper {sk.id} status error: {e}") + + neon_env_builder.num_safekeepers = 4 + env = neon_env_builder.init_start() + env.neon_cli.create_branch("test_pull_timeline") + + log.info("Use only first 3 safekeepers") + env.safekeepers[3].stop() + active_safekeepers = [1, 2, 3] + endpoint = env.endpoints.create("test_pull_timeline") + endpoint.adjust_for_safekeepers(safekeepers_guc(env, active_safekeepers)) + endpoint.start() + + # learn neon timeline from compute + tenant_id = TenantId(endpoint.safe_psql("show neon.tenant_id")[0][0]) + timeline_id = TimelineId(endpoint.safe_psql("show neon.timeline_id")[0][0]) + + execute_payload(endpoint) + show_statuses(env.safekeepers, tenant_id, timeline_id) + + log.info("Kill safekeeper 2, continue with payload") + env.safekeepers[1].stop(immediate=True) + execute_payload(endpoint) + + log.info("Initialize new safekeeper 4, pull data from 1 & 3") + env.safekeepers[3].start() + + res = ( + env.safekeepers[3] + .http_client() + .pull_timeline( + { + "tenant_id": 
str(tenant_id),
+                "timeline_id": str(timeline_id),
+                "http_hosts": [
+                    f"http://localhost:{env.safekeepers[0].port.http}",
+                    f"http://localhost:{env.safekeepers[2].port.http}",
+                ],
+            }
+        )
+    )
+    log.info("Finished pulling timeline")
+    log.info(res)
+
+    show_statuses(env.safekeepers, tenant_id, timeline_id)
+
+    log.info("Restarting compute with new config to verify that it works")
+    active_safekeepers = [1, 3, 4]
+
+    endpoint.stop_and_destroy().create("test_pull_timeline")
+    endpoint.adjust_for_safekeepers(safekeepers_guc(env, active_safekeepers))
+    endpoint.start()
+
+    execute_payload(endpoint)
+    show_statuses(env.safekeepers, tenant_id, timeline_id)
+
+    log.info("Stop sk1 (simulate failure) and use only quorum of sk3 and sk4")
+    env.safekeepers[0].stop(immediate=True)
+    execute_payload(endpoint)
+    show_statuses(env.safekeepers, tenant_id, timeline_id)
+
+    log.info("Restart sk4 and use quorum of sk1 and sk4")
+    env.safekeepers[3].stop()
+    env.safekeepers[2].stop()
+    env.safekeepers[0].start()
+    env.safekeepers[3].start()
+
+    execute_payload(endpoint)
+    show_statuses(env.safekeepers, tenant_id, timeline_id)
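
To exercise the new endpoint by hand rather than through the test fixture's SafekeeperHttpClient.pull_timeline, a minimal sketch looks like the following. The URL path, the JSON body fields (tenant_id, timeline_id, http_hosts) and the safekeeper_host response field come from this patch; the hosts, ports, IDs and the JWT below are placeholders for your own deployment, and the Authorization header is assumed to be needed only when safekeeper auth is enabled.

    # Sketch only: ask a freshly provisioned safekeeper to clone a timeline from peers.
    # All hosts, ports, IDs and the token here are placeholders, not values from this patch.
    import requests

    new_sk = "http://localhost:7676"      # HTTP address of the safekeeper being initialized
    donors = [
        "http://localhost:7677",          # peer safekeepers that still hold the timeline
        "http://localhost:7678",
    ]

    body = {
        "tenant_id": "0123456789abcdef0123456789abcdef",    # hex TenantId (placeholder)
        "timeline_id": "fedcba9876543210fedcba9876543210",  # hex TimelineId (placeholder)
        "http_hosts": donors,
    }
    headers = {"Authorization": "Bearer <safekeeper JWT>"}   # only if auth is enabled

    resp = requests.post(f"{new_sk}/v1/pull_timeline", json=body, headers=headers)
    resp.raise_for_status()
    # The response names the donor that was chosen, e.g. {"safekeeper_host": "http://localhost:7677"}
    print(resp.json())

Because the donor keeps serving writes while files are listed and then copied, a segment can disappear between the two steps; in that case the request fails and, as noted in pull_timeline.rs, the caller is expected to simply retry.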