diff --git a/Cargo.lock b/Cargo.lock
index d79837a831..bc63cb0442 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3657,6 +3657,7 @@ dependencies = [
"const_format",
"crc32c",
"fs2",
+ "futures",
"git-version",
"hex",
"humantime",
@@ -3671,6 +3672,7 @@ dependencies = [
"pq_proto",
"regex",
"remote_storage",
+ "reqwest",
"safekeeper_api",
"serde",
"serde_json",
diff --git a/safekeeper/Cargo.toml b/safekeeper/Cargo.toml
index 00cd111da5..b6e8497809 100644
--- a/safekeeper/Cargo.toml
+++ b/safekeeper/Cargo.toml
@@ -19,11 +19,13 @@ git-version.workspace = true
hex.workspace = true
humantime.workspace = true
hyper.workspace = true
+futures.workspace = true
once_cell.workspace = true
parking_lot.workspace = true
postgres.workspace = true
postgres-protocol.workspace = true
regex.workspace = true
+reqwest = { workspace = true, features = ["json"] }
serde.workspace = true
serde_json.workspace = true
serde_with.workspace = true
@@ -33,6 +35,7 @@ tokio = { workspace = true, features = ["fs"] }
tokio-io-timeout.workspace = true
tokio-postgres.workspace = true
toml_edit.workspace = true
+tempfile.workspace = true
tracing.workspace = true
url.workspace = true
metrics.workspace = true
@@ -45,6 +48,3 @@ storage_broker.workspace = true
utils.workspace = true
workspace_hack.workspace = true
-
-[dev-dependencies]
-tempfile.workspace = true
diff --git a/safekeeper/src/http/routes.rs b/safekeeper/src/http/routes.rs
index eeb08d2733..a498d868af 100644
--- a/safekeeper/src/http/routes.rs
+++ b/safekeeper/src/http/routes.rs
@@ -11,11 +11,13 @@ use std::str::FromStr;
use std::sync::Arc;
use storage_broker::proto::SafekeeperTimelineInfo;
use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
+use tokio::fs::File;
+use tokio::io::AsyncReadExt;
use tokio::task::JoinError;
-use crate::debug_dump;
use crate::safekeeper::ServerInfo;
use crate::safekeeper::Term;
+use crate::{debug_dump, pull_timeline};
use crate::timelines_global_map::TimelineDeleteForceResult;
use crate::GlobalTimelines;
@@ -177,6 +179,49 @@ async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response
+async fn timeline_pull_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
+ check_permission(&request, None)?;
+
+ let data: pull_timeline::Request = json_request(&mut request).await?;
+
+ let resp = pull_timeline::handle_request(data)
+ .await
+ .map_err(ApiError::InternalServerError)?;
+ json_response(StatusCode::OK, resp)
+}
+
+/// Download a file from the timeline directory.
+// TODO: figure out a better way to copy files between safekeepers
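+// Example (hypothetical ids):
+//   GET /v1/tenant/<tenant_id>/timeline/<timeline_id>/file/safekeeper.control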
+async fn timeline_files_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
+ let ttid = TenantTimelineId::new(
+ parse_request_param(&request, "tenant_id")?,
+ parse_request_param(&request, "timeline_id")?,
+ );
+ check_permission(&request, Some(ttid.tenant_id))?;
+
+ let filename: String = parse_request_param(&request, "filename")?;
+
+ let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
+
+ let filepath = tli.timeline_dir.join(filename);
+ let mut file = File::open(&filepath)
+ .await
+ .map_err(|e| ApiError::InternalServerError(e.into()))?;
+
+ let mut content = Vec::new();
+ // TODO: don't store files in memory
+ file.read_to_end(&mut content)
+ .await
+ .map_err(|e| ApiError::InternalServerError(e.into()))?;
+
+ Response::builder()
+ .status(StatusCode::OK)
+ .header("Content-Type", "application/octet-stream")
+ .body(Body::from(content))
+ .map_err(|e| ApiError::InternalServerError(e.into()))
+}
+
/// Deactivates the timeline and removes its data directory.
async fn timeline_delete_force_handler(
mut request: Request,
@@ -353,6 +398,11 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError>
timeline_delete_force_handler,
)
.delete("/v1/tenant/:tenant_id", tenant_delete_force_handler)
+ .post("/v1/pull_timeline", timeline_pull_handler)
+ .get(
+ "/v1/tenant/:tenant_id/timeline/:timeline_id/file/:filename",
+ timeline_files_handler,
+ )
// for tests
.post(
"/v1/record_safekeeper_info/:tenant_id/:timeline_id",
diff --git a/safekeeper/src/lib.rs b/safekeeper/src/lib.rs
index 2c28c5218d..ff621fdbc0 100644
--- a/safekeeper/src/lib.rs
+++ b/safekeeper/src/lib.rs
@@ -15,6 +15,7 @@ pub mod handler;
pub mod http;
pub mod json_ctrl;
pub mod metrics;
+pub mod pull_timeline;
pub mod receive_wal;
pub mod remove_wal;
pub mod safekeeper;
diff --git a/safekeeper/src/pull_timeline.rs b/safekeeper/src/pull_timeline.rs
new file mode 100644
index 0000000000..344b760fd3
--- /dev/null
+++ b/safekeeper/src/pull_timeline.rs
@@ -0,0 +1,240 @@
+use serde::{Deserialize, Serialize};
+
+use anyhow::{bail, Context, Result};
+use tokio::io::AsyncWriteExt;
+use tracing::info;
+use utils::id::{TenantId, TenantTimelineId, TimelineId};
+
+use serde_with::{serde_as, DisplayFromStr};
+
+use crate::{
+ control_file, debug_dump,
+ http::routes::TimelineStatus,
+ wal_storage::{self, Storage},
+ GlobalTimelines,
+};
+
+/// Request to pull a timeline from peer safekeepers.
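+///
+/// An example request body (ids and hosts here are hypothetical):
+///
+/// ```json
+/// {
+///     "tenant_id": "ee6016d2e65a4a5aa2766b3d92546e9d",
+///     "timeline_id": "4b6b8d2c3c4f8c1d2e3f4a5b6c7d8e9f",
+///     "http_hosts": ["http://safekeeper-1:7676", "http://safekeeper-2:7676"]
+/// }
+/// ```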
+#[serde_as]
+#[derive(Debug, Serialize, Deserialize)]
+pub struct Request {
+ #[serde_as(as = "DisplayFromStr")]
+ pub tenant_id: TenantId,
+ #[serde_as(as = "DisplayFromStr")]
+ pub timeline_id: TimelineId,
+    pub http_hosts: Vec<String>,
+}
+
+#[derive(Debug, Serialize)]
+pub struct Response {
+ // Donor safekeeper host
+ pub safekeeper_host: String,
+ // TODO: add more fields?
+}
+
+/// Find the most advanced safekeeper and pull timeline from it.
+pub async fn handle_request(request: Request) -> Result<Response> {
+ let existing_tli = GlobalTimelines::get(TenantTimelineId::new(
+ request.tenant_id,
+ request.timeline_id,
+ ));
+ if existing_tli.is_ok() {
+ bail!("Timeline {} already exists", request.timeline_id);
+ }
+
+ let client = reqwest::Client::new();
+ let http_hosts = request.http_hosts.clone();
+
+ // Send request to /v1/tenant/:tenant_id/timeline/:timeline_id
+ let responses = futures::future::join_all(http_hosts.iter().map(|url| {
+ let url = format!(
+ "{}/v1/tenant/{}/timeline/{}",
+ url, request.tenant_id, request.timeline_id
+ );
+ client.get(url).send()
+ }))
+ .await;
+
+ let mut statuses = Vec::new();
+ for (i, response) in responses.into_iter().enumerate() {
+ let response = response.context(format!("Failed to get status from {}", http_hosts[i]))?;
+ let status: crate::http::routes::TimelineStatus = response.json().await?;
+ statuses.push((status, i));
+ }
+
+ // Find the most advanced safekeeper
+ // TODO: current logic may be wrong, fix it later
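+    // Tuples compare lexicographically, so this picks the highest epoch first,
+    // then the highest flush_lsn, then the highest commit_lsn. The unwrap()
+    // below assumes that http_hosts is non-empty.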
+ let (status, i) = statuses
+ .into_iter()
+ .max_by_key(|(status, _)| {
+ (
+ status.acceptor_state.epoch,
+ status.flush_lsn,
+ status.commit_lsn,
+ )
+ })
+ .unwrap();
+ let safekeeper_host = http_hosts[i].clone();
+
+ assert!(status.tenant_id == request.tenant_id);
+ assert!(status.timeline_id == request.timeline_id);
+
+ pull_timeline(status, safekeeper_host).await
+}
+
+async fn pull_timeline(status: TimelineStatus, host: String) -> Result<Response> {
+ let ttid = TenantTimelineId::new(status.tenant_id, status.timeline_id);
+ info!(
+ "Pulling timeline {} from safekeeper {}, commit_lsn={}, flush_lsn={}, term={}, epoch={}",
+ ttid,
+ host,
+ status.commit_lsn,
+ status.flush_lsn,
+ status.acceptor_state.term,
+ status.acceptor_state.epoch
+ );
+
+ let conf = &GlobalTimelines::get_global_config();
+
+ let client = reqwest::Client::new();
+    // TODO: don't use debug_dump; it should be used only in tests.
+    // This is a proof of concept; we should figure out a way
+    // to use scp without implementing it manually.
+
+    // For now we implement our own scp over HTTP.
+    // First, fetch the list of files from the donor safekeeper.
+ let dump: debug_dump::Response = client
+ .get(format!(
+ "{}/v1/debug_dump?dump_all=true&tenant_id={}&timeline_id={}",
+ host, status.tenant_id, status.timeline_id
+ ))
+ .send()
+ .await?
+ .json()
+ .await?;
+
+ if dump.timelines.len() != 1 {
+ bail!(
+ "Expected to fetch single timeline, got {} timelines",
+ dump.timelines.len()
+ );
+ }
+
+ let timeline = dump.timelines.into_iter().next().unwrap();
+ let disk_content = timeline.disk_content.ok_or(anyhow::anyhow!(
+ "Timeline {} doesn't have disk content",
+ ttid
+ ))?;
+
+ let mut filenames = disk_content
+ .files
+ .iter()
+ .map(|file| file.name.clone())
+        .collect::<Vec<String>>();
+
+    // Sort the filenames to make sure we pull the files in the correct order.
+    // After sorting, we should have:
+    // - 000000010000000000000001
+    // - ...
+    // - 000000010000000000000002.partial
+    // - safekeeper.control
+ filenames.sort();
+
+ // safekeeper.control should be the first file, so we need to move it to the beginning
+ let control_file_index = filenames
+ .iter()
+ .position(|name| name == "safekeeper.control")
+ .ok_or(anyhow::anyhow!("safekeeper.control not found"))?;
+ filenames.remove(control_file_index);
+ filenames.insert(0, "safekeeper.control".to_string());
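+    // The resulting download order is safekeeper.control first, then WAL
+    // segments in ascending order.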
+
+ info!(
+ "Downloading {} files from safekeeper {}",
+ filenames.len(),
+ host
+ );
+
+    // Create a temp directory for the new timeline. It needs to be
+    // located on the same filesystem as the rest of the timelines.
+
+    // conf.workdir is usually /storage/safekeeper/data;
+    // we keep temp directories next to it, in /storage/safekeeper/tmp.
+ let temp_base = conf
+ .workdir
+ .parent()
+ .ok_or(anyhow::anyhow!("workdir has no parent"))?
+ .join("tmp");
+
+ tokio::fs::create_dir_all(&temp_base).await?;
+
+ let tli_dir = tempfile::Builder::new()
+ .suffix("_temptli")
+ .prefix(&format!("{}_{}_", ttid.tenant_id, ttid.timeline_id))
+ .tempdir_in(temp_base)?;
+ let tli_dir_path = tli_dir.path().to_owned();
+
+    // Note: some time passes between fetching the list of files and fetching
+    // the files themselves. Some files may be removed from the safekeeper in
+    // the meantime, in which case this function fails and should be retried
+    // by the caller.
+ for filename in filenames {
+ let file_path = tli_dir_path.join(&filename);
+ // /v1/tenant/:tenant_id/timeline/:timeline_id/file/:filename
+ let http_url = format!(
+ "{}/v1/tenant/{}/timeline/{}/file/{}",
+ host, status.tenant_id, status.timeline_id, filename
+ );
+
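+        // Stream the response body to disk chunk by chunk instead of
+        // buffering each file in memory.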
+ let mut file = tokio::fs::File::create(&file_path).await?;
+ let mut response = client.get(&http_url).send().await?;
+ while let Some(chunk) = response.chunk().await? {
+ file.write_all(&chunk).await?;
+ }
+ }
+
+ // TODO: fsync?
+
+    // Create the timeline from the temp directory and verify that it's correct.
+
+ let control_path = tli_dir_path.join("safekeeper.control");
+
+ let control_store = control_file::FileStorage::load_control_file(control_path)?;
+ if control_store.server.wal_seg_size == 0 {
+ bail!("wal_seg_size is not set");
+ }
+
+ let wal_store =
+ wal_storage::PhysicalStorage::new(&ttid, tli_dir_path.clone(), conf, &control_store)?;
+
+ let commit_lsn = status.commit_lsn;
+ let flush_lsn = wal_store.flush_lsn();
+
+ info!(
+ "Finished downloading timeline {}, commit_lsn={}, flush_lsn={}",
+ ttid, commit_lsn, flush_lsn
+ );
+ assert!(status.commit_lsn <= status.flush_lsn);
+
+ // Move timeline dir to the correct location
+ let timeline_path = conf.timeline_dir(&ttid);
+
+ info!(
+ "Moving timeline {} from {} to {}",
+ ttid,
+ tli_dir_path.display(),
+ timeline_path.display()
+ );
+ tokio::fs::create_dir_all(conf.tenant_dir(&ttid.tenant_id)).await?;
+ tokio::fs::rename(tli_dir_path, &timeline_path).await?;
+
+ let tli = GlobalTimelines::load_timeline(ttid).context("Failed to load timeline after copy")?;
+
+ info!(
+ "Loaded timeline {}, flush_lsn={}",
+ ttid,
+ tli.get_flush_lsn()
+ );
+
+ Ok(Response {
+ safekeeper_host: host,
+ })
+}
diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs
index 64ca6967df..2dbf215998 100644
--- a/safekeeper/src/timeline.rs
+++ b/safekeeper/src/timeline.rs
@@ -129,7 +129,8 @@ impl SharedState {
// We don't want to write anything to disk, because we may have existing timeline there.
// These functions should not change anything on disk.
let control_store = control_file::FileStorage::create_new(ttid, conf, state)?;
- let wal_store = wal_storage::PhysicalStorage::new(ttid, conf, &control_store)?;
+ let wal_store =
+ wal_storage::PhysicalStorage::new(ttid, conf.timeline_dir(ttid), conf, &control_store)?;
let sk = SafeKeeper::new(control_store, wal_store, conf.my_id)?;
Ok(Self {
@@ -149,7 +150,8 @@ impl SharedState {
bail!(TimelineError::UninitializedWalSegSize(*ttid));
}
- let wal_store = wal_storage::PhysicalStorage::new(ttid, conf, &control_store)?;
+ let wal_store =
+ wal_storage::PhysicalStorage::new(ttid, conf.timeline_dir(ttid), conf, &control_store)?;
Ok(Self {
sk: SafeKeeper::new(control_store, wal_store, conf.my_id)?,
diff --git a/safekeeper/src/timelines_global_map.rs b/safekeeper/src/timelines_global_map.rs
index 868ee97645..41809794dc 100644
--- a/safekeeper/src/timelines_global_map.rs
+++ b/safekeeper/src/timelines_global_map.rs
@@ -159,6 +159,26 @@ impl GlobalTimelines {
Ok(())
}
+    /// Load a timeline from disk into memory.
+    pub fn load_timeline(ttid: TenantTimelineId) -> Result<Arc<Timeline>> {
+ let (conf, wal_backup_launcher_tx) = TIMELINES_STATE.lock().unwrap().get_dependencies();
+
+ match Timeline::load_timeline(conf, ttid, wal_backup_launcher_tx) {
+ Ok(timeline) => {
+ let tli = Arc::new(timeline);
+ // TODO: prevent concurrent timeline creation/loading
+ TIMELINES_STATE
+ .lock()
+ .unwrap()
+ .timelines
+ .insert(ttid, tli.clone());
+ Ok(tli)
+ }
+ // If we can't load a timeline, it's bad. Caller will figure it out.
+ Err(e) => bail!("failed to load timeline {}, reason: {:?}", ttid, e),
+ }
+ }
+
/// Get the number of timelines in the map.
pub fn timelines_count() -> usize {
TIMELINES_STATE.lock().unwrap().timelines.len()
diff --git a/safekeeper/src/wal_storage.rs b/safekeeper/src/wal_storage.rs
index 54e27714ea..5ef22b2f6a 100644
--- a/safekeeper/src/wal_storage.rs
+++ b/safekeeper/src/wal_storage.rs
@@ -112,10 +112,10 @@ impl PhysicalStorage {
/// the disk. Otherwise, all LSNs are set to zero.
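+    ///
+    /// `timeline_dir` is passed in explicitly so that callers can point the
+    /// storage at a non-default directory (pull_timeline downloads into a
+    /// temp dir before moving the timeline into place).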
pub fn new(
ttid: &TenantTimelineId,
+ timeline_dir: PathBuf,
conf: &SafeKeeperConf,
state: &SafeKeeperState,
) -> Result<PhysicalStorage> {
- let timeline_dir = conf.timeline_dir(ttid);
let wal_seg_size = state.server.wal_seg_size as usize;
// Find out where stored WAL ends, starting at commit_lsn which is a
diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py
index af7571cc4d..79b2e5b290 100644
--- a/test_runner/fixtures/neon_fixtures.py
+++ b/test_runner/fixtures/neon_fixtures.py
@@ -2661,6 +2661,13 @@ class SafekeeperHttpClient(requests.Session):
assert isinstance(res_json, dict)
return res_json
+ def pull_timeline(self, body: Dict[str, Any]) -> Dict[str, Any]:
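+        """Pull a timeline from one of the given safekeepers onto this one.
+
+        `body` is the JSON request: "tenant_id" and "timeline_id" as strings,
+        plus "http_hosts", a list of safekeeper HTTP endpoints to pull from.
+        """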
+ res = self.post(f"http://localhost:{self.port}/v1/pull_timeline", json=body)
+ res.raise_for_status()
+ res_json = res.json()
+ assert isinstance(res_json, dict)
+ return res_json
+
def timeline_create(
self, tenant_id: TenantId, timeline_id: TimelineId, pg_version: int, commit_lsn: Lsn
):
diff --git a/test_runner/regress/test_wal_acceptor.py b/test_runner/regress/test_wal_acceptor.py
index e8cfa4f318..fed5f325ca 100644
--- a/test_runner/regress/test_wal_acceptor.py
+++ b/test_runner/regress/test_wal_acceptor.py
@@ -1254,3 +1254,98 @@ def test_delete_force(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
with closing(endpoint_other.connect()) as conn:
with conn.cursor() as cur:
cur.execute("INSERT INTO t (key) VALUES (123)")
+
+
+def test_pull_timeline(neon_env_builder: NeonEnvBuilder):
+ def safekeepers_guc(env: NeonEnv, sk_names: List[int]) -> str:
+ return ",".join([f"localhost:{sk.port.pg}" for sk in env.safekeepers if sk.id in sk_names])
+
+ def execute_payload(endpoint: Endpoint):
+ with closing(endpoint.connect()) as conn:
+ with conn.cursor() as cur:
+ # we rely upon autocommit after each statement
+ # as waiting for acceptors happens there
+ cur.execute("CREATE TABLE IF NOT EXISTS t(key int, value text)")
+ cur.execute("INSERT INTO t VALUES (0, 'something')")
+ sum_before = query_scalar(cur, "SELECT SUM(key) FROM t")
+
+ cur.execute("INSERT INTO t SELECT generate_series(1,100000), 'payload'")
+ sum_after = query_scalar(cur, "SELECT SUM(key) FROM t")
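+            # sum(1..100000) == 100000 * 100001 / 2 == 5000050000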
+ assert sum_after == sum_before + 5000050000
+
+ def show_statuses(safekeepers: List[Safekeeper], tenant_id: TenantId, timeline_id: TimelineId):
+ for sk in safekeepers:
+ http_cli = sk.http_client()
+ try:
+ status = http_cli.timeline_status(tenant_id, timeline_id)
+ log.info(f"Safekeeper {sk.id} status: {status}")
+ except Exception as e:
+ log.info(f"Safekeeper {sk.id} status error: {e}")
+
+ neon_env_builder.num_safekeepers = 4
+ env = neon_env_builder.init_start()
+ env.neon_cli.create_branch("test_pull_timeline")
+
+ log.info("Use only first 3 safekeepers")
+ env.safekeepers[3].stop()
+ active_safekeepers = [1, 2, 3]
+ endpoint = env.endpoints.create("test_pull_timeline")
+ endpoint.adjust_for_safekeepers(safekeepers_guc(env, active_safekeepers))
+ endpoint.start()
+
+ # learn neon timeline from compute
+ tenant_id = TenantId(endpoint.safe_psql("show neon.tenant_id")[0][0])
+ timeline_id = TimelineId(endpoint.safe_psql("show neon.timeline_id")[0][0])
+
+ execute_payload(endpoint)
+ show_statuses(env.safekeepers, tenant_id, timeline_id)
+
+ log.info("Kill safekeeper 2, continue with payload")
+ env.safekeepers[1].stop(immediate=True)
+ execute_payload(endpoint)
+
+ log.info("Initialize new safekeeper 4, pull data from 1 & 3")
+ env.safekeepers[3].start()
+
+ res = (
+ env.safekeepers[3]
+ .http_client()
+ .pull_timeline(
+ {
+ "tenant_id": str(tenant_id),
+ "timeline_id": str(timeline_id),
+ "http_hosts": [
+ f"http://localhost:{env.safekeepers[0].port.http}",
+ f"http://localhost:{env.safekeepers[2].port.http}",
+ ],
+ }
+ )
+ )
+ log.info("Finished pulling timeline")
+ log.info(res)
+
+ show_statuses(env.safekeepers, tenant_id, timeline_id)
+
+ log.info("Restarting compute with new config to verify that it works")
+ active_safekeepers = [1, 3, 4]
+
+ endpoint.stop_and_destroy().create("test_pull_timeline")
+ endpoint.adjust_for_safekeepers(safekeepers_guc(env, active_safekeepers))
+ endpoint.start()
+
+ execute_payload(endpoint)
+ show_statuses(env.safekeepers, tenant_id, timeline_id)
+
+ log.info("Stop sk1 (simulate failure) and use only quorum of sk3 and sk4")
+ env.safekeepers[0].stop(immediate=True)
+ execute_payload(endpoint)
+ show_statuses(env.safekeepers, tenant_id, timeline_id)
+
+    log.info("Restart sk4 and use quorum of sk1 and sk4")
+ env.safekeepers[3].stop()
+ env.safekeepers[2].stop()
+ env.safekeepers[0].start()
+ env.safekeepers[3].start()
+
+ execute_payload(endpoint)
+ show_statuses(env.safekeepers, tenant_id, timeline_id)