Add API for safekeeper timeline copy (#6091)

Implement an API for cloning a single timeline inside a safekeeper. Also
add an API for calculating a SHA-256 hash of WAL, which is used in tests.

The `/copy` API works by copying objects inside S3 for all but the last
segments; the last segments are copied on disk. A dedicated temporary
directory is created for the new timeline, because the copy can take a
long time, especially for large timelines. After all segment files have
been prepared, the directory is moved into the main tree and the timeline
is loaded into memory.

Some caveats:
- large timelines can take a long time to copy, because many S3 segments
need to be copied
- the caller should wait indefinitely for the HTTP call to finish and must
not close the HTTP connection (as sketched below), because closing it
aborts the copy, which is not resumed in the background
- `until_lsn` must be a valid LSN pointing at a WAL record boundary,
otherwise bad things can happen
- the API returns 200 if the specified `timeline_id` already exists, even if
it's not a copy
- each safekeeper copies the S3 segments itself, so it's better not to
call this API in parallel on different safekeepers
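
For illustration, a minimal client sketch (not part of this commit; it assumes reqwest with its json feature, serde_json, and anyhow as dependencies, and all parameter values are hypothetical). It drives the new endpoint and respects the connection caveat above; reqwest applies no request timeout by default, so the call waits as long as the copy takes:

async fn copy_timeline(
    base_url: &str,           // e.g. "http://localhost:7676" (hypothetical address)
    tenant_id: &str,
    source_timeline_id: &str,
    target_timeline_id: &str,
    until_lsn: &str,          // must be a valid LSN, e.g. "0/16B9188"
) -> anyhow::Result<()> {
    let url =
        format!("{base_url}/v1/tenant/{tenant_id}/timeline/{source_timeline_id}/copy");
    // No client-side timeout is set: dropping the connection aborts the copy
    // on the safekeeper side, and it is not resumed in the background.
    let resp = reqwest::Client::new()
        .post(url)
        .json(&serde_json::json!({
            "target_timeline_id": target_timeline_id,
            "until_lsn": until_lsn,
        }))
        .send()
        .await?;
    anyhow::ensure!(resp.status().is_success(), "copy failed: {}", resp.status());
    Ok(())
}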
Author: Arthur Petukhovsky (committed via GitHub)
Date: 2024-01-04 21:40:38 +04:00
Parent: 18e9208158
Commit: f3b5db1443
21 changed files with 727 additions and 55 deletions

Cargo.lock (generated)
View File

@@ -4475,6 +4475,7 @@ dependencies = [
"serde",
"serde_json",
"serde_with",
"sha2",
"signal-hook",
"storage_broker",
"thiserror",

View File

@@ -322,6 +322,12 @@ impl RemoteStorage for AzureBlobStorage {
}
Ok(())
}
async fn copy(&self, _from: &RemotePath, _to: &RemotePath) -> anyhow::Result<()> {
Err(anyhow::anyhow!(
"copy for azure blob storage is not implemented"
))
}
}
pin_project_lite::pin_project! {

View File

@@ -207,6 +207,9 @@ pub trait RemoteStorage: Send + Sync + 'static {
async fn delete(&self, path: &RemotePath) -> anyhow::Result<()>;
async fn delete_objects<'a>(&self, paths: &'a [RemotePath]) -> anyhow::Result<()>;
/// Copy a remote object inside a bucket from one path to another.
async fn copy(&self, from: &RemotePath, to: &RemotePath) -> anyhow::Result<()>;
}
pub type DownloadStream = Pin<Box<dyn Stream<Item = std::io::Result<Bytes>> + Unpin + Send + Sync>>;
@@ -374,6 +377,15 @@ impl GenericRemoteStorage {
Self::Unreliable(s) => s.delete_objects(paths).await,
}
}
pub async fn copy_object(&self, from: &RemotePath, to: &RemotePath) -> anyhow::Result<()> {
match self {
Self::LocalFs(s) => s.copy(from, to).await,
Self::AwsS3(s) => s.copy(from, to).await,
Self::AzureBlob(s) => s.copy(from, to).await,
Self::Unreliable(s) => s.copy(from, to).await,
}
}
}
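A usage sketch of the new generic entry point (paths are illustrative; this helper is not part of the diff). Callers go through the wrapper, which dispatches to the backend chosen by the enum variant:

async fn copy_one_object(storage: &GenericRemoteStorage) -> anyhow::Result<()> {
    // Relative paths inside the bucket/storage root; values are hypothetical.
    let from = RemotePath::new(camino::Utf8Path::new(
        "tenant_a/timeline_a/000000010000000000000002",
    ))?;
    let to = RemotePath::new(camino::Utf8Path::new(
        "tenant_a/timeline_b/000000010000000000000002",
    ))?;
    storage.copy_object(&from, &to).await
}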
impl GenericRemoteStorage {
@@ -660,6 +672,7 @@ impl ConcurrencyLimiter {
RequestKind::Put => &self.write,
RequestKind::List => &self.read,
RequestKind::Delete => &self.write,
RequestKind::Copy => &self.write,
}
}

View File

@@ -409,6 +409,20 @@ impl RemoteStorage for LocalFs {
}
Ok(())
}
async fn copy(&self, from: &RemotePath, to: &RemotePath) -> anyhow::Result<()> {
let from_path = from.with_base(&self.storage_root);
let to_path = to.with_base(&self.storage_root);
create_target_directory(&to_path).await?;
fs::copy(&from_path, &to_path).await.with_context(|| {
format!(
"Failed to copy file from '{from_path}' to '{to_path}'",
from_path = from_path,
to_path = to_path
)
})?;
Ok(())
}
}
fn storage_metadata_path(original_path: &Utf8Path) -> Utf8PathBuf {

View File

@@ -493,6 +493,38 @@ impl RemoteStorage for S3Bucket {
Ok(())
}
async fn copy(&self, from: &RemotePath, to: &RemotePath) -> anyhow::Result<()> {
let kind = RequestKind::Copy;
let _guard = self.permit(kind).await;
let started_at = start_measuring_requests(kind);
// the CopySource parameter must include the source bucket name as a prefix of the key
let copy_source = format!(
"{}/{}",
self.bucket_name,
self.relative_path_to_s3_object(from)
);
let res = self
.client
.copy_object()
.bucket(self.bucket_name.clone())
.key(self.relative_path_to_s3_object(to))
.copy_source(copy_source)
.send()
.await;
let started_at = ScopeGuard::into_inner(started_at);
metrics::BUCKET_METRICS
.req_seconds
.observe_elapsed(kind, &res, started_at);
res?;
Ok(())
}
async fn download(&self, from: &RemotePath) -> Result<Download, DownloadError> {
// if prefix is not none then download file `prefix/from`
// if prefix is none then download file `from`

View File

@@ -11,6 +11,7 @@ pub(crate) enum RequestKind {
Put = 1,
Delete = 2,
List = 3,
Copy = 4,
}
use RequestKind::*;
@@ -22,6 +23,7 @@ impl RequestKind {
Put => "put_object",
Delete => "delete_object",
List => "list_objects",
Copy => "copy_object",
}
}
const fn as_index(&self) -> usize {
@@ -29,7 +31,7 @@ impl RequestKind {
}
}
pub(super) struct RequestTyped<C>([C; 4]);
pub(super) struct RequestTyped<C>([C; 5]);
impl<C> RequestTyped<C> {
pub(super) fn get(&self, kind: RequestKind) -> &C {
@@ -38,8 +40,8 @@ impl<C> RequestTyped<C> {
fn build_with(mut f: impl FnMut(RequestKind) -> C) -> Self {
use RequestKind::*;
let mut it = [Get, Put, Delete, List].into_iter();
let arr = std::array::from_fn::<C, 4, _>(|index| {
let mut it = [Get, Put, Delete, List, Copy].into_iter();
let arr = std::array::from_fn::<C, 5, _>(|index| {
let next = it.next().unwrap();
assert_eq!(index, next.as_index());
f(next)

View File

@@ -162,4 +162,11 @@ impl RemoteStorage for UnreliableWrapper {
}
Ok(())
}
async fn copy(&self, from: &RemotePath, to: &RemotePath) -> anyhow::Result<()> {
// copy is equivalent to download + upload
self.attempt(RemoteOp::Download(from.clone()))?;
self.attempt(RemoteOp::Upload(to.clone()))?;
self.inner.copy_object(from, to).await
}
}

View File

@@ -51,3 +51,9 @@ pub struct SkTimelineInfo {
#[serde(default)]
pub http_connstr: Option<String>,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct TimelineCopyRequest {
pub target_timeline_id: TimelineId,
pub until_lsn: Lsn,
}

View File

@@ -54,6 +54,7 @@ postgres_ffi.workspace = true
pq_proto.workspace = true
remote_storage.workspace = true
safekeeper_api.workspace = true
sha2.workspace = true
sd-notify.workspace = true
storage_broker.workspace = true
tokio-stream.workspace = true

View File

@@ -66,12 +66,10 @@ impl FileStorage {
/// Create file storage for a new timeline, but don't persist it yet.
pub fn create_new(
ttid: &TenantTimelineId,
timeline_dir: Utf8PathBuf,
conf: &SafeKeeperConf,
state: SafeKeeperState,
) -> Result<FileStorage> {
let timeline_dir = conf.timeline_dir(ttid);
let store = FileStorage {
timeline_dir,
conf: conf.clone(),
@@ -277,7 +275,8 @@ mod test {
.await
.expect("failed to create timeline dir");
let state = SafeKeeperState::empty();
let storage = FileStorage::create_new(ttid, conf, state.clone())?;
let timeline_dir = conf.timeline_dir(ttid);
let storage = FileStorage::create_new(timeline_dir, conf, state.clone())?;
Ok((storage, state))
}

View File

@@ -0,0 +1,250 @@
use std::sync::Arc;
use anyhow::{bail, Result};
use camino::Utf8PathBuf;
use postgres_ffi::{MAX_SEND_SIZE, WAL_SEGMENT_SIZE};
use tokio::{
fs::OpenOptions,
io::{AsyncSeekExt, AsyncWriteExt},
};
use tracing::{info, warn};
use utils::{id::TenantTimelineId, lsn::Lsn};
use crate::{
control_file::{FileStorage, Storage},
pull_timeline::{create_temp_timeline_dir, load_temp_timeline, validate_temp_timeline},
safekeeper::SafeKeeperState,
timeline::{Timeline, TimelineError},
wal_backup::copy_s3_segments,
wal_storage::{wal_file_paths, WalReader},
GlobalTimelines, SafeKeeperConf,
};
// we don't want to have more than 10 segments on disk after copy, because they take space
const MAX_BACKUP_LAG: u64 = 10 * WAL_SEGMENT_SIZE as u64;
pub struct Request {
pub source: Arc<Timeline>,
pub until_lsn: Lsn,
pub destination_ttid: TenantTimelineId,
}
pub async fn handle_request(request: Request) -> Result<()> {
// TODO: request.until_lsn MUST be a valid LSN, and we cannot check it :(
// if the LSN points to the middle of a WAL record, the timeline will end up in a "broken" state
match GlobalTimelines::get(request.destination_ttid) {
// timeline already exists. would be good to check that this timeline is the copy
// of the source timeline, but it isn't obvious how to do that
Ok(_) => return Ok(()),
// timeline not found, we are going to create it
Err(TimelineError::NotFound(_)) => {}
// error, probably timeline was deleted
res => {
res?;
}
}
let conf = &GlobalTimelines::get_global_config();
let ttid = request.destination_ttid;
let (_tmp_dir, tli_dir_path) = create_temp_timeline_dir(conf, ttid).await?;
let (mem_state, state) = request.source.get_state().await;
let start_lsn = state.timeline_start_lsn;
if start_lsn == Lsn::INVALID {
bail!("timeline is not initialized");
}
let backup_lsn = mem_state.backup_lsn;
{
let commit_lsn = mem_state.commit_lsn;
let flush_lsn = request.source.get_flush_lsn().await;
info!(
"collected info about source timeline: start_lsn={}, backup_lsn={}, commit_lsn={}, flush_lsn={}",
start_lsn, backup_lsn, commit_lsn, flush_lsn
);
assert!(backup_lsn >= start_lsn);
assert!(commit_lsn >= start_lsn);
assert!(flush_lsn >= start_lsn);
if request.until_lsn > flush_lsn {
bail!("requested LSN is beyond the end of the timeline");
}
if request.until_lsn < start_lsn {
bail!("requested LSN is before the start of the timeline");
}
if request.until_lsn > commit_lsn {
warn!("copy_timeline WAL is not fully committed");
}
if backup_lsn < request.until_lsn && request.until_lsn.0 - backup_lsn.0 > MAX_BACKUP_LAG {
// we have a lot of segments that are not backed up. we can try to wait here until
// segments are backed up to remote storage, but it's not clear how long to wait
bail!("too many segments are not backed up");
}
}
let wal_seg_size = state.server.wal_seg_size as usize;
if wal_seg_size == 0 {
bail!("wal_seg_size is not set");
}
let first_segment = start_lsn.segment_number(wal_seg_size);
let last_segment = request.until_lsn.segment_number(wal_seg_size);
let new_backup_lsn = {
// we can't have new backup_lsn greater than existing backup_lsn or start of the last segment
let max_backup_lsn = backup_lsn.min(Lsn(last_segment * wal_seg_size as u64));
if max_backup_lsn <= start_lsn {
// probably we are starting from the first segment, which was not backed up yet.
// note that start_lsn can be in the middle of the segment
start_lsn
} else {
// we have some segments backed up, so we will assume all WAL below max_backup_lsn is backed up
assert!(max_backup_lsn.segment_offset(wal_seg_size) == 0);
max_backup_lsn
}
};
// all previous segments will be copied inside S3
let first_ondisk_segment = new_backup_lsn.segment_number(wal_seg_size);
assert!(first_ondisk_segment <= last_segment);
assert!(first_ondisk_segment >= first_segment);
copy_s3_segments(
wal_seg_size,
&request.source.ttid,
&request.destination_ttid,
first_segment,
first_ondisk_segment,
)
.await?;
copy_disk_segments(
conf,
&state,
wal_seg_size,
&request.source.ttid,
new_backup_lsn,
request.until_lsn,
&tli_dir_path,
)
.await?;
let mut new_state = SafeKeeperState::new(
&request.destination_ttid,
state.server.clone(),
vec![],
request.until_lsn,
start_lsn,
);
new_state.timeline_start_lsn = start_lsn;
new_state.peer_horizon_lsn = request.until_lsn;
new_state.backup_lsn = new_backup_lsn;
let mut file_storage = FileStorage::create_new(tli_dir_path.clone(), conf, new_state.clone())?;
file_storage.persist(&new_state).await?;
// now we have a ready timeline in a temp directory
validate_temp_timeline(conf, request.destination_ttid, &tli_dir_path).await?;
load_temp_timeline(conf, request.destination_ttid, &tli_dir_path).await?;
Ok(())
}
async fn copy_disk_segments(
conf: &SafeKeeperConf,
persisted_state: &SafeKeeperState,
wal_seg_size: usize,
source_ttid: &TenantTimelineId,
start_lsn: Lsn,
end_lsn: Lsn,
tli_dir_path: &Utf8PathBuf,
) -> Result<()> {
let mut wal_reader = WalReader::new(
conf.workdir.clone(),
conf.timeline_dir(source_ttid),
persisted_state,
start_lsn,
true,
)?;
let mut buf = [0u8; MAX_SEND_SIZE];
let first_segment = start_lsn.segment_number(wal_seg_size);
let last_segment = end_lsn.segment_number(wal_seg_size);
for segment in first_segment..=last_segment {
let segment_start = segment * wal_seg_size as u64;
let segment_end = segment_start + wal_seg_size as u64;
let copy_start = segment_start.max(start_lsn.0);
let copy_end = segment_end.min(end_lsn.0);
let copy_start = copy_start - segment_start;
let copy_end = copy_end - segment_start;
let wal_file_path = {
let (normal, partial) = wal_file_paths(tli_dir_path, segment, wal_seg_size)?;
if segment == last_segment {
partial
} else {
normal
}
};
write_segment(
&mut buf,
&wal_file_path,
wal_seg_size as u64,
copy_start,
copy_end,
&mut wal_reader,
)
.await?;
}
Ok(())
}
async fn write_segment(
buf: &mut [u8],
file_path: &Utf8PathBuf,
wal_seg_size: u64,
from: u64,
to: u64,
reader: &mut WalReader,
) -> Result<()> {
assert!(from <= to);
assert!(to <= wal_seg_size);
let mut file = OpenOptions::new()
.create(true)
.write(true)
.open(&file_path)
.await?;
// maybe fill with zeros, as in wal_storage.rs?
file.set_len(wal_seg_size).await?;
file.seek(std::io::SeekFrom::Start(from)).await?;
let mut bytes_left = to - from;
while bytes_left > 0 {
let len = bytes_left as usize;
let len = len.min(buf.len());
let len = reader.read(&mut buf[..len]).await?;
file.write_all(&buf[..len]).await?;
bytes_left -= len as u64;
}
file.flush().await?;
file.sync_all().await?;
Ok(())
}

View File

@@ -7,13 +7,16 @@ use std::io::Read;
use std::path::PathBuf;
use std::sync::Arc;
use anyhow::bail;
use anyhow::Result;
use camino::Utf8Path;
use chrono::{DateTime, Utc};
use postgres_ffi::XLogSegNo;
use postgres_ffi::MAX_SEND_SIZE;
use serde::Deserialize;
use serde::Serialize;
use sha2::{Digest, Sha256};
use utils::id::NodeId;
use utils::id::TenantTimelineId;
use utils::id::{TenantId, TimelineId};
@@ -25,6 +28,7 @@ use crate::safekeeper::TermHistory;
use crate::SafeKeeperConf;
use crate::send_wal::WalSenderState;
use crate::wal_storage::WalReader;
use crate::GlobalTimelines;
/// Various filters that influence the resulting JSON output.
@@ -300,3 +304,56 @@ fn build_config(config: SafeKeeperConf) -> Config {
wal_backup_enabled: config.wal_backup_enabled,
}
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct TimelineDigestRequest {
pub from_lsn: Lsn,
pub until_lsn: Lsn,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct TimelineDigest {
pub sha256: String,
}
pub async fn calculate_digest(
tli: &Arc<crate::timeline::Timeline>,
request: TimelineDigestRequest,
) -> Result<TimelineDigest> {
if request.from_lsn > request.until_lsn {
bail!("from_lsn is greater than until_lsn");
}
let conf = GlobalTimelines::get_global_config();
let (_, persisted_state) = tli.get_state().await;
if persisted_state.timeline_start_lsn > request.from_lsn {
bail!("requested LSN is before the start of the timeline");
}
let mut wal_reader = WalReader::new(
conf.workdir.clone(),
tli.timeline_dir.clone(),
&persisted_state,
request.from_lsn,
true,
)?;
let mut hasher = Sha256::new();
let mut buf = [0u8; MAX_SEND_SIZE];
let mut bytes_left = (request.until_lsn.0 - request.from_lsn.0) as usize;
while bytes_left > 0 {
let bytes_to_read = std::cmp::min(buf.len(), bytes_left);
let bytes_read = wal_reader.read(&mut buf[..bytes_to_read]).await?;
if bytes_read == 0 {
bail!("wal_reader.read returned 0 bytes");
}
hasher.update(&buf[..bytes_read]);
bytes_left -= bytes_read;
}
let digest = hasher.finalize();
let digest = hex::encode(digest);
Ok(TimelineDigest { sha256: digest })
}

View File

@@ -2,7 +2,7 @@ use hyper::{Body, Request, Response, StatusCode, Uri};
use once_cell::sync::Lazy;
use postgres_ffi::WAL_SEGMENT_SIZE;
use safekeeper_api::models::SkTimelineInfo;
use safekeeper_api::models::{SkTimelineInfo, TimelineCopyRequest};
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::fmt;
@@ -14,19 +14,21 @@ use tokio::fs::File;
use tokio::io::AsyncReadExt;
use tokio_util::sync::CancellationToken;
use utils::failpoint_support::failpoints_handler;
use utils::http::request::parse_query_param;
use std::io::Write as _;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tracing::info_span;
use tracing::{info_span, Instrument};
use utils::http::endpoint::{request_span, ChannelWriter};
use crate::debug_dump::TimelineDigestRequest;
use crate::receive_wal::WalReceiverState;
use crate::safekeeper::Term;
use crate::safekeeper::{ServerInfo, TermLsn};
use crate::send_wal::WalSenderState;
use crate::timeline::PeerInfo;
use crate::{debug_dump, pull_timeline};
use crate::{copy_timeline, debug_dump, pull_timeline};
use crate::timelines_global_map::TimelineDeleteForceResult;
use crate::GlobalTimelines;
@@ -204,6 +206,56 @@ async fn timeline_pull_handler(mut request: Request<Body>) -> Result<Response<Bo
json_response(StatusCode::OK, resp)
}
async fn timeline_copy_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
check_permission(&request, None)?;
let request_data: TimelineCopyRequest = json_request(&mut request).await?;
let ttid = TenantTimelineId::new(
parse_request_param(&request, "tenant_id")?,
parse_request_param(&request, "source_timeline_id")?,
);
let source = GlobalTimelines::get(ttid)?;
copy_timeline::handle_request(copy_timeline::Request {
source,
until_lsn: request_data.until_lsn,
destination_ttid: TenantTimelineId::new(ttid.tenant_id, request_data.target_timeline_id),
})
.instrument(info_span!("copy_timeline", from=%ttid, to=%request_data.target_timeline_id, until_lsn=%request_data.until_lsn))
.await
.map_err(ApiError::InternalServerError)?;
json_response(StatusCode::OK, ())
}
async fn timeline_digest_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
let ttid = TenantTimelineId::new(
parse_request_param(&request, "tenant_id")?,
parse_request_param(&request, "timeline_id")?,
);
check_permission(&request, Some(ttid.tenant_id))?;
let from_lsn: Option<Lsn> = parse_query_param(&request, "from_lsn")?;
let until_lsn: Option<Lsn> = parse_query_param(&request, "until_lsn")?;
let request = TimelineDigestRequest {
from_lsn: from_lsn.ok_or(ApiError::BadRequest(anyhow::anyhow!(
"from_lsn is required"
)))?,
until_lsn: until_lsn.ok_or(ApiError::BadRequest(anyhow::anyhow!(
"until_lsn is required"
)))?,
};
let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
let response = debug_dump::calculate_digest(&tli, request)
.await
.map_err(ApiError::InternalServerError)?;
json_response(StatusCode::OK, response)
}
/// Download a file from the timeline directory.
// TODO: figure out a better way to copy files between safekeepers
async fn timeline_files_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
@@ -472,11 +524,18 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError>
"/v1/tenant/:tenant_id/timeline/:timeline_id/file/:filename",
|r| request_span(r, timeline_files_handler),
)
.post(
"/v1/tenant/:tenant_id/timeline/:source_timeline_id/copy",
|r| request_span(r, timeline_copy_handler),
)
// for tests
.post("/v1/record_safekeeper_info/:tenant_id/:timeline_id", |r| {
request_span(r, record_safekeeper_info)
})
.get("/v1/debug_dump", |r| request_span(r, dump_debug_handler))
.get("/v1/tenant/:tenant_id/timeline/:timeline_id/digest", |r| {
request_span(r, timeline_digest_handler)
})
}
#[cfg(test)]

View File

@@ -16,6 +16,7 @@ mod auth;
pub mod broker;
pub mod control_file;
pub mod control_file_upgrade;
pub mod copy_timeline;
pub mod debug_dump;
pub mod handler;
pub mod http;

View File

@@ -1,16 +1,24 @@
use std::sync::Arc;
use camino::Utf8PathBuf;
use camino_tempfile::Utf8TempDir;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use anyhow::{bail, Context, Result};
use tokio::io::AsyncWriteExt;
use tracing::info;
use utils::id::{TenantId, TenantTimelineId, TimelineId};
use utils::{
id::{TenantId, TenantTimelineId, TimelineId},
lsn::Lsn,
};
use crate::{
control_file, debug_dump,
http::routes::TimelineStatus,
timeline::{Timeline, TimelineError},
wal_storage::{self, Storage},
GlobalTimelines,
GlobalTimelines, SafeKeeperConf,
};
/// Info about timeline on safekeeper ready for reporting.
@@ -91,7 +99,7 @@ pub async fn handle_request(request: Request) -> Result<Response> {
async fn pull_timeline(status: TimelineStatus, host: String) -> Result<Response> {
let ttid = TenantTimelineId::new(status.tenant_id, status.timeline_id);
info!(
"Pulling timeline {} from safekeeper {}, commit_lsn={}, flush_lsn={}, term={}, epoch={}",
"pulling timeline {} from safekeeper {}, commit_lsn={}, flush_lsn={}, term={}, epoch={}",
ttid,
host,
status.commit_lsn,
@@ -121,14 +129,14 @@ async fn pull_timeline(status: TimelineStatus, host: String) -> Result<Response>
if dump.timelines.len() != 1 {
bail!(
"Expected to fetch single timeline, got {} timelines",
"expected to fetch single timeline, got {} timelines",
dump.timelines.len()
);
}
let timeline = dump.timelines.into_iter().next().unwrap();
let disk_content = timeline.disk_content.ok_or(anyhow::anyhow!(
"Timeline {} doesn't have disk content",
"timeline {} doesn't have disk content",
ttid
))?;
@@ -155,29 +163,12 @@ async fn pull_timeline(status: TimelineStatus, host: String) -> Result<Response>
filenames.insert(0, "safekeeper.control".to_string());
info!(
"Downloading {} files from safekeeper {}",
"downloading {} files from safekeeper {}",
filenames.len(),
host
);
// Creating temp directory for a new timeline. It needs to be
// located on the same filesystem as the rest of the timelines.
// conf.workdir is usually /storage/safekeeper/data
// will try to transform it into /storage/safekeeper/tmp
let temp_base = conf
.workdir
.parent()
.ok_or(anyhow::anyhow!("workdir has no parent"))?
.join("tmp");
tokio::fs::create_dir_all(&temp_base).await?;
let tli_dir = camino_tempfile::Builder::new()
.suffix("_temptli")
.prefix(&format!("{}_{}_", ttid.tenant_id, ttid.timeline_id))
.tempdir_in(temp_base)?;
let tli_dir_path = tli_dir.path().to_path_buf();
let (_tmp_dir, tli_dir_path) = create_temp_timeline_dir(conf, ttid).await?;
// Note: some time passes between fetching the list of files and fetching the files themselves.
// It's possible that some files will be removed from the safekeeper and we will fail to fetch them.
@@ -201,47 +192,105 @@ async fn pull_timeline(status: TimelineStatus, host: String) -> Result<Response>
// TODO: fsync?
// Let's create timeline from temp directory and verify that it's correct
let (commit_lsn, flush_lsn) = validate_temp_timeline(conf, ttid, &tli_dir_path).await?;
info!(
"finished downloading timeline {}, commit_lsn={}, flush_lsn={}",
ttid, commit_lsn, flush_lsn
);
assert!(status.commit_lsn <= status.flush_lsn);
let control_path = tli_dir_path.join("safekeeper.control");
// Finally, load the timeline.
let _tli = load_temp_timeline(conf, ttid, &tli_dir_path).await?;
Ok(Response {
safekeeper_host: host,
})
}
/// Create temp directory for a new timeline. It needs to be located on the same
/// filesystem as the rest of the timelines. It will be automatically deleted when
/// Utf8TempDir goes out of scope.
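/// (The same-filesystem requirement matters because the final promotion into
/// the data directory is a rename, which cannot cross devices.)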
pub async fn create_temp_timeline_dir(
conf: &SafeKeeperConf,
ttid: TenantTimelineId,
) -> Result<(Utf8TempDir, Utf8PathBuf)> {
// conf.workdir is usually /storage/safekeeper/data
// will try to transform it into /storage/safekeeper/tmp
let temp_base = conf
.workdir
.parent()
.ok_or(anyhow::anyhow!("workdir has no parent"))?
.join("tmp");
tokio::fs::create_dir_all(&temp_base).await?;
let tli_dir = camino_tempfile::Builder::new()
.suffix("_temptli")
.prefix(&format!("{}_{}_", ttid.tenant_id, ttid.timeline_id))
.tempdir_in(temp_base)?;
let tli_dir_path = tli_dir.path().to_path_buf();
Ok((tli_dir, tli_dir_path))
}
/// Do basic validation of a temp timeline, before moving it to the global map.
pub async fn validate_temp_timeline(
conf: &SafeKeeperConf,
ttid: TenantTimelineId,
path: &Utf8PathBuf,
) -> Result<(Lsn, Lsn)> {
let control_path = path.join("safekeeper.control");
let control_store = control_file::FileStorage::load_control_file(control_path)?;
if control_store.server.wal_seg_size == 0 {
bail!("wal_seg_size is not set");
}
let wal_store =
wal_storage::PhysicalStorage::new(&ttid, tli_dir_path.clone(), conf, &control_store)?;
let wal_store = wal_storage::PhysicalStorage::new(&ttid, path.clone(), conf, &control_store)?;
let commit_lsn = status.commit_lsn;
let commit_lsn = control_store.commit_lsn;
let flush_lsn = wal_store.flush_lsn();
info!(
"Finished downloading timeline {}, commit_lsn={}, flush_lsn={}",
ttid, commit_lsn, flush_lsn
);
assert!(status.commit_lsn <= status.flush_lsn);
Ok((commit_lsn, flush_lsn))
}
/// Move timeline from a temp directory to the main storage, and load it to the global map.
/// This operation is done under a lock to prevent bugs if several concurrent requests are
/// trying to load the same timeline. Note that it doesn't guard against creating the
/// timeline with the same ttid, but no one should be doing this anyway.
pub async fn load_temp_timeline(
conf: &SafeKeeperConf,
ttid: TenantTimelineId,
tmp_path: &Utf8PathBuf,
) -> Result<Arc<Timeline>> {
// Take a lock to prevent concurrent loadings
let load_lock = GlobalTimelines::loading_lock().await;
let guard = load_lock.lock().await;
if !matches!(GlobalTimelines::get(ttid), Err(TimelineError::NotFound(_))) {
bail!("timeline already exists, cannot overwrite it")
}
// Move timeline dir to the correct location
let timeline_path = conf.timeline_dir(&ttid);
info!(
"Moving timeline {} from {} to {}",
ttid, tli_dir_path, timeline_path
"moving timeline {} from {} to {}",
ttid, tmp_path, timeline_path
);
tokio::fs::create_dir_all(conf.tenant_dir(&ttid.tenant_id)).await?;
tokio::fs::rename(tli_dir_path, &timeline_path).await?;
tokio::fs::rename(tmp_path, &timeline_path).await?;
let tli = GlobalTimelines::load_timeline(ttid)
let tli = GlobalTimelines::load_timeline(&guard, ttid)
.await
.context("Failed to load timeline after copy")?;
info!(
"Loaded timeline {}, flush_lsn={}",
"loaded timeline {}, flush_lsn={}",
ttid,
tli.get_flush_lsn().await
);
Ok(Response {
safekeeper_host: host,
})
Ok(tli)
}

View File

@@ -141,7 +141,8 @@ impl SharedState {
// We don't want to write anything to disk, because we may have existing timeline there.
// These functions should not change anything on disk.
let control_store = control_file::FileStorage::create_new(ttid, conf, state)?;
let timeline_dir = conf.timeline_dir(ttid);
let control_store = control_file::FileStorage::create_new(timeline_dir, conf, state)?;
let wal_store =
wal_storage::PhysicalStorage::new(ttid, conf.timeline_dir(ttid), conf, &control_store)?;
let sk = SafeKeeper::new(control_store, wal_store, conf.my_id)?;

View File

@@ -21,8 +21,12 @@ struct GlobalTimelinesState {
timelines: HashMap<TenantTimelineId, Arc<Timeline>>,
wal_backup_launcher_tx: Option<Sender<TenantTimelineId>>,
conf: Option<SafeKeeperConf>,
load_lock: Arc<tokio::sync::Mutex<TimelineLoadLock>>,
}
// Used to prevent concurrent timeline loading.
pub struct TimelineLoadLock;
impl GlobalTimelinesState {
/// Get configuration, which must be set once during init.
fn get_conf(&self) -> &SafeKeeperConf {
@@ -63,6 +67,7 @@ static TIMELINES_STATE: Lazy<Mutex<GlobalTimelinesState>> = Lazy::new(|| {
timelines: HashMap::new(),
wal_backup_launcher_tx: None,
conf: None,
load_lock: Arc::new(tokio::sync::Mutex::new(TimelineLoadLock)),
})
});
@@ -174,8 +179,16 @@ impl GlobalTimelines {
Ok(())
}
/// Take a lock for timeline loading.
pub async fn loading_lock() -> Arc<tokio::sync::Mutex<TimelineLoadLock>> {
TIMELINES_STATE.lock().unwrap().load_lock.clone()
}
/// Load timeline from disk to the memory.
pub async fn load_timeline(ttid: TenantTimelineId) -> Result<Arc<Timeline>> {
pub async fn load_timeline<'a>(
_guard: &tokio::sync::MutexGuard<'a, TimelineLoadLock>,
ttid: TenantTimelineId,
) -> Result<Arc<Timeline>> {
let (conf, wal_backup_launcher_tx) = TIMELINES_STATE.lock().unwrap().get_dependencies();
match Timeline::load_timeline(&conf, ttid, wal_backup_launcher_tx) {

View File

@@ -7,7 +7,7 @@ use tokio::task::JoinHandle;
use utils::id::NodeId;
use std::cmp::min;
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
@@ -531,3 +531,62 @@ pub async fn read_object(
Ok(Box::pin(reader))
}
/// Copy segments from one timeline to another. Used in copy_timeline.
pub async fn copy_s3_segments(
wal_seg_size: usize,
src_ttid: &TenantTimelineId,
dst_ttid: &TenantTimelineId,
from_segment: XLogSegNo,
to_segment: XLogSegNo,
) -> Result<()> {
const SEGMENTS_PROGRESS_REPORT_INTERVAL: u64 = 1024;
let storage = REMOTE_STORAGE
.get()
.expect("failed to get remote storage")
.as_ref()
.unwrap();
let relative_dst_path =
Utf8Path::new(&dst_ttid.tenant_id.to_string()).join(dst_ttid.timeline_id.to_string());
let remote_path = RemotePath::new(&relative_dst_path)?;
let files = storage.list_files(Some(&remote_path)).await?;
let uploaded_segments = &files
.iter()
.filter_map(|file| file.object_name().map(ToOwned::to_owned))
.collect::<HashSet<_>>();
debug!(
"these segments have already been uploaded: {:?}",
uploaded_segments
);
let relative_src_path =
Utf8Path::new(&src_ttid.tenant_id.to_string()).join(src_ttid.timeline_id.to_string());
for segno in from_segment..to_segment {
if segno % SEGMENTS_PROGRESS_REPORT_INTERVAL == 0 {
info!("copied all segments from {} until {}", from_segment, segno);
}
let segment_name = XLogFileName(PG_TLI, segno, wal_seg_size);
if uploaded_segments.contains(&segment_name) {
continue;
}
debug!("copying segment {}", segment_name);
let from = RemotePath::new(&relative_src_path.join(&segment_name))?;
let to = RemotePath::new(&relative_dst_path.join(&segment_name))?;
storage.copy_object(&from, &to).await?;
}
info!(
"finished copying segments from {} until {}",
from_segment, to_segment
);
Ok(())
}

View File

@@ -728,7 +728,7 @@ async fn write_zeroes(file: &mut File, mut count: usize) -> Result<()> {
}
/// Helper returning full path to WAL segment file and its .partial brother.
fn wal_file_paths(
pub fn wal_file_paths(
timeline_dir: &Utf8Path,
segno: XLogSegNo,
wal_seg_size: usize,

View File

@@ -3032,6 +3032,28 @@ class SafekeeperHttpClient(requests.Session):
assert isinstance(res_json, dict)
return res_json
def copy_timeline(self, tenant_id: TenantId, timeline_id: TimelineId, body: Dict[str, Any]):
res = self.post(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/copy",
json=body,
)
res.raise_for_status()
def timeline_digest(
self, tenant_id: TenantId, timeline_id: TimelineId, from_lsn: Lsn, until_lsn: Lsn
) -> Dict[str, Any]:
res = self.get(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/digest",
params={
"from_lsn": str(from_lsn),
"until_lsn": str(until_lsn),
},
)
res.raise_for_status()
res_json = res.json()
assert isinstance(res_json, dict)
return res_json
def timeline_create(
self,
tenant_id: TenantId,

View File

@@ -1838,3 +1838,83 @@ def test_idle_reconnections(neon_env_builder: NeonEnvBuilder):
assert final_stats.get("START_REPLICATION", 0) >= 1
# walproposer should connect to each safekeeper at least once
assert final_stats.get("START_WAL_PUSH", 0) >= 3
@pytest.mark.parametrize("insert_rows", [0, 100, 100000, 500000])
def test_timeline_copy(neon_env_builder: NeonEnvBuilder, insert_rows: int):
target_percents = [10, 50, 90, 100]
neon_env_builder.num_safekeepers = 3
# we need remote storage that supports copy_object S3 API
neon_env_builder.enable_safekeeper_remote_storage(RemoteStorageKind.MOCK_S3)
env = neon_env_builder.init_start()
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
endpoint = env.endpoints.create_start("main")
lsns = []
def remember_lsn():
lsn = Lsn(endpoint.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
lsns.append(lsn)
return lsn
# remember LSN right after timeline creation
lsn = remember_lsn()
log.info(f"LSN after timeline creation: {lsn}")
endpoint.safe_psql("create table t(key int, value text)")
timeline_status = env.safekeepers[0].http_client().timeline_status(tenant_id, timeline_id)
timeline_start_lsn = timeline_status.timeline_start_lsn
log.info(f"Timeline start LSN: {timeline_start_lsn}")
current_percent = 0.0
for new_percent in target_percents:
new_rows = insert_rows * (new_percent - current_percent) / 100
current_percent = new_percent
if new_rows == 0:
continue
endpoint.safe_psql(
f"insert into t select generate_series(1, {new_rows}), repeat('payload!', 10)"
)
# remember LSN right after reaching new_percent
lsn = remember_lsn()
log.info(f"LSN after inserting {new_rows} rows: {lsn}")
# TODO: it would also be good to test cases where not all segments are uploaded to S3
for lsn in lsns:
new_timeline_id = TimelineId.generate()
log.info(f"Copying branch for LSN {lsn}, to timeline {new_timeline_id}")
orig_digest = (
env.safekeepers[0]
.http_client()
.timeline_digest(tenant_id, timeline_id, timeline_start_lsn, lsn)
)
log.info(f"Original digest: {orig_digest}")
for sk in env.safekeepers:
sk.http_client().copy_timeline(
tenant_id,
timeline_id,
{
"target_timeline_id": str(new_timeline_id),
"until_lsn": str(lsn),
},
)
new_digest = sk.http_client().timeline_digest(
tenant_id, new_timeline_id, timeline_start_lsn, lsn
)
log.info(f"Digest after timeline copy on safekeeper {sk.id}: {new_digest}")
assert orig_digest == new_digest
# TODO: test timelines can start after copy