diff --git a/Cargo.lock b/Cargo.lock
index 93efbadd79..4dd195a895 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4475,6 +4475,7 @@ dependencies = [
  "serde",
  "serde_json",
  "serde_with",
+ "sha2",
  "signal-hook",
  "storage_broker",
  "thiserror",
diff --git a/libs/remote_storage/src/azure_blob.rs b/libs/remote_storage/src/azure_blob.rs
index 7ea1103eb2..18cf5d97ba 100644
--- a/libs/remote_storage/src/azure_blob.rs
+++ b/libs/remote_storage/src/azure_blob.rs
@@ -322,6 +322,12 @@ impl RemoteStorage for AzureBlobStorage {
         }
         Ok(())
     }
+
+    async fn copy(&self, _from: &RemotePath, _to: &RemotePath) -> anyhow::Result<()> {
+        Err(anyhow::anyhow!(
+            "copy for azure blob storage is not implemented"
+        ))
+    }
 }
 
 pin_project_lite::pin_project! {
diff --git a/libs/remote_storage/src/lib.rs b/libs/remote_storage/src/lib.rs
index 3e408e3119..942d0016b0 100644
--- a/libs/remote_storage/src/lib.rs
+++ b/libs/remote_storage/src/lib.rs
@@ -207,6 +207,9 @@ pub trait RemoteStorage: Send + Sync + 'static {
     async fn delete(&self, path: &RemotePath) -> anyhow::Result<()>;
 
     async fn delete_objects<'a>(&self, paths: &'a [RemotePath]) -> anyhow::Result<()>;
+
+    /// Copy a remote object inside a bucket from one path to another.
+    async fn copy(&self, from: &RemotePath, to: &RemotePath) -> anyhow::Result<()>;
 }
 
 pub type DownloadStream = Pin<Box<dyn Stream<Item = std::io::Result<Bytes>> + Unpin + Send + Sync>>;
@@ -374,6 +377,15 @@ impl GenericRemoteStorage {
             Self::Unreliable(s) => s.delete_objects(paths).await,
         }
     }
+
+    pub async fn copy_object(&self, from: &RemotePath, to: &RemotePath) -> anyhow::Result<()> {
+        match self {
+            Self::LocalFs(s) => s.copy(from, to).await,
+            Self::AwsS3(s) => s.copy(from, to).await,
+            Self::AzureBlob(s) => s.copy(from, to).await,
+            Self::Unreliable(s) => s.copy(from, to).await,
+        }
+    }
 }
 
 impl GenericRemoteStorage {
@@ -660,6 +672,7 @@ impl ConcurrencyLimiter {
             RequestKind::Put => &self.write,
             RequestKind::List => &self.read,
             RequestKind::Delete => &self.write,
+            RequestKind::Copy => &self.write,
         }
     }
 
diff --git a/libs/remote_storage/src/local_fs.rs b/libs/remote_storage/src/local_fs.rs
index d1e7d325b9..bf8b6b5dde 100644
--- a/libs/remote_storage/src/local_fs.rs
+++ b/libs/remote_storage/src/local_fs.rs
@@ -409,6 +409,20 @@ impl RemoteStorage for LocalFs {
         }
         Ok(())
     }
+
+    async fn copy(&self, from: &RemotePath, to: &RemotePath) -> anyhow::Result<()> {
+        let from_path = from.with_base(&self.storage_root);
+        let to_path = to.with_base(&self.storage_root);
+        create_target_directory(&to_path).await?;
+        fs::copy(&from_path, &to_path).await.with_context(|| {
+            format!(
+                "Failed to copy file from '{from_path}' to '{to_path}'",
+                from_path = from_path,
+                to_path = to_path
+            )
+        })?;
+        Ok(())
+    }
 }
 
 fn storage_metadata_path(original_path: &Utf8Path) -> Utf8PathBuf {
diff --git a/libs/remote_storage/src/s3_bucket.rs b/libs/remote_storage/src/s3_bucket.rs
index 0f95458ad1..d7b41edaaf 100644
--- a/libs/remote_storage/src/s3_bucket.rs
+++ b/libs/remote_storage/src/s3_bucket.rs
@@ -493,6 +493,38 @@ impl RemoteStorage for S3Bucket {
         Ok(())
     }
 
+    async fn copy(&self, from: &RemotePath, to: &RemotePath) -> anyhow::Result<()> {
+        let kind = RequestKind::Copy;
+        let _guard = self.permit(kind).await;
+
+        let started_at = start_measuring_requests(kind);
+
+        // we need to specify bucket_name as a prefix
+        let copy_source = format!(
+            "{}/{}",
+            self.bucket_name,
+            self.relative_path_to_s3_object(from)
+        );
+
+        let res = self
+            .client
+            .copy_object()
+            .bucket(self.bucket_name.clone())
+            .key(self.relative_path_to_s3_object(to))
+            .copy_source(copy_source)
+            .send()
+            .await;
+
+        let started_at = ScopeGuard::into_inner(started_at);
+        metrics::BUCKET_METRICS
+            .req_seconds
+            .observe_elapsed(kind, &res, started_at);
+
+        res?;
+
+        Ok(())
+    }
+
     async fn download(&self, from: &RemotePath) -> Result<Download, DownloadError> {
         // if prefix is not none then download file `prefix/from`
         // if prefix is none then download file `from`
diff --git a/libs/remote_storage/src/s3_bucket/metrics.rs b/libs/remote_storage/src/s3_bucket/metrics.rs
index ea11edafa5..21dde14906 100644
--- a/libs/remote_storage/src/s3_bucket/metrics.rs
+++ b/libs/remote_storage/src/s3_bucket/metrics.rs
@@ -11,6 +11,7 @@ pub(crate) enum RequestKind {
     Put = 1,
     Delete = 2,
     List = 3,
+    Copy = 4,
 }
 
 use RequestKind::*;
@@ -22,6 +23,7 @@ impl RequestKind {
             Put => "put_object",
             Delete => "delete_object",
             List => "list_objects",
+            Copy => "copy_object",
         }
     }
     const fn as_index(&self) -> usize {
@@ -29,7 +31,7 @@ impl RequestKind {
     }
 }
 
-pub(super) struct RequestTyped<C>([C; 4]);
+pub(super) struct RequestTyped<C>([C; 5]);
 
 impl<C> RequestTyped<C> {
     pub(super) fn get(&self, kind: RequestKind) -> &C {
@@ -38,8 +40,8 @@ impl<C> RequestTyped<C> {
     fn build_with(mut f: impl FnMut(RequestKind) -> C) -> Self {
         use RequestKind::*;
-        let mut it = [Get, Put, Delete, List].into_iter();
-        let arr = std::array::from_fn::<C, 4, _>(|index| {
+        let mut it = [Get, Put, Delete, List, Copy].into_iter();
+        let arr = std::array::from_fn::<C, 5, _>(|index| {
             let next = it.next().unwrap();
             assert_eq!(index, next.as_index());
             f(next)
diff --git a/libs/remote_storage/src/simulate_failures.rs b/libs/remote_storage/src/simulate_failures.rs
index 802b0db7f5..7f5adcea30 100644
--- a/libs/remote_storage/src/simulate_failures.rs
+++ b/libs/remote_storage/src/simulate_failures.rs
@@ -162,4 +162,11 @@ impl RemoteStorage for UnreliableWrapper {
         }
         Ok(())
     }
+
+    async fn copy(&self, from: &RemotePath, to: &RemotePath) -> anyhow::Result<()> {
+        // copy is equivalent to download + upload
+        self.attempt(RemoteOp::Download(from.clone()))?;
+        self.attempt(RemoteOp::Upload(to.clone()))?;
+        self.inner.copy_object(from, to).await
+    }
 }
diff --git a/libs/safekeeper_api/src/models.rs b/libs/safekeeper_api/src/models.rs
index 786712deb1..ce5a1e411e 100644
--- a/libs/safekeeper_api/src/models.rs
+++ b/libs/safekeeper_api/src/models.rs
@@ -51,3 +51,9 @@ pub struct SkTimelineInfo {
     #[serde(default)]
     pub http_connstr: Option<String>,
 }
+
+#[derive(Debug, Clone, Deserialize, Serialize)]
+pub struct TimelineCopyRequest {
+    pub target_timeline_id: TimelineId,
+    pub until_lsn: Lsn,
+}
diff --git a/safekeeper/Cargo.toml b/safekeeper/Cargo.toml
index 4015c27933..364cad7892 100644
--- a/safekeeper/Cargo.toml
+++ b/safekeeper/Cargo.toml
@@ -54,6 +54,7 @@ postgres_ffi.workspace = true
 pq_proto.workspace = true
 remote_storage.workspace = true
 safekeeper_api.workspace = true
+sha2.workspace = true
 sd-notify.workspace = true
 storage_broker.workspace = true
 tokio-stream.workspace = true
diff --git a/safekeeper/src/control_file.rs b/safekeeper/src/control_file.rs
index 7aadd67ac6..591bfea182 100644
--- a/safekeeper/src/control_file.rs
+++ b/safekeeper/src/control_file.rs
@@ -66,12 +66,10 @@ impl FileStorage {
     /// Create file storage for a new timeline, but don't persist it yet.
     pub fn create_new(
-        ttid: &TenantTimelineId,
+        timeline_dir: Utf8PathBuf,
         conf: &SafeKeeperConf,
         state: SafeKeeperState,
     ) -> Result<FileStorage> {
-        let timeline_dir = conf.timeline_dir(ttid);
-
         let store = FileStorage {
             timeline_dir,
             conf: conf.clone(),
@@ -277,7 +275,8 @@ mod test {
             .await
             .expect("failed to create timeline dir");
         let state = SafeKeeperState::empty();
-        let storage = FileStorage::create_new(ttid, conf, state.clone())?;
+        let timeline_dir = conf.timeline_dir(ttid);
+        let storage = FileStorage::create_new(timeline_dir, conf, state.clone())?;
         Ok((storage, state))
     }
diff --git a/safekeeper/src/copy_timeline.rs b/safekeeper/src/copy_timeline.rs
new file mode 100644
index 0000000000..ef88eb27e3
--- /dev/null
+++ b/safekeeper/src/copy_timeline.rs
@@ -0,0 +1,250 @@
+use std::sync::Arc;
+
+use anyhow::{bail, Result};
+use camino::Utf8PathBuf;
+
+use postgres_ffi::{MAX_SEND_SIZE, WAL_SEGMENT_SIZE};
+use tokio::{
+    fs::OpenOptions,
+    io::{AsyncSeekExt, AsyncWriteExt},
+};
+use tracing::{info, warn};
+use utils::{id::TenantTimelineId, lsn::Lsn};
+
+use crate::{
+    control_file::{FileStorage, Storage},
+    pull_timeline::{create_temp_timeline_dir, load_temp_timeline, validate_temp_timeline},
+    safekeeper::SafeKeeperState,
+    timeline::{Timeline, TimelineError},
+    wal_backup::copy_s3_segments,
+    wal_storage::{wal_file_paths, WalReader},
+    GlobalTimelines, SafeKeeperConf,
+};
+
+// we don't want to have more than 10 segments on disk after copy, because they take space
+const MAX_BACKUP_LAG: u64 = 10 * WAL_SEGMENT_SIZE as u64;
+
+pub struct Request {
+    pub source: Arc<Timeline>,
+    pub until_lsn: Lsn,
+    pub destination_ttid: TenantTimelineId,
+}
+
+pub async fn handle_request(request: Request) -> Result<()> {
+    // TODO: request.until_lsn MUST be a valid LSN, and we cannot check it :(
+    // if LSN will point to the middle of a WAL record, timeline will be in "broken" state
+
+    match GlobalTimelines::get(request.destination_ttid) {
+        // timeline already exists. would be good to check that this timeline is the copy
+        // of the source timeline, but it isn't obvious how to do that
+        Ok(_) => return Ok(()),
+        // timeline not found, we are going to create it
+        Err(TimelineError::NotFound(_)) => {}
+        // error, probably timeline was deleted
+        res => {
+            res?;
+        }
+    }
+
+    let conf = &GlobalTimelines::get_global_config();
+    let ttid = request.destination_ttid;
+
+    let (_tmp_dir, tli_dir_path) = create_temp_timeline_dir(conf, ttid).await?;
+
+    let (mem_state, state) = request.source.get_state().await;
+    let start_lsn = state.timeline_start_lsn;
+    if start_lsn == Lsn::INVALID {
+        bail!("timeline is not initialized");
+    }
+    let backup_lsn = mem_state.backup_lsn;
+
+    {
+        let commit_lsn = mem_state.commit_lsn;
+        let flush_lsn = request.source.get_flush_lsn().await;
+
+        info!(
+            "collected info about source timeline: start_lsn={}, backup_lsn={}, commit_lsn={}, flush_lsn={}",
+            start_lsn, backup_lsn, commit_lsn, flush_lsn
+        );
+
+        assert!(backup_lsn >= start_lsn);
+        assert!(commit_lsn >= start_lsn);
+        assert!(flush_lsn >= start_lsn);
+
+        if request.until_lsn > flush_lsn {
+            bail!("requested LSN is beyond the end of the timeline");
+        }
+        if request.until_lsn < start_lsn {
+            bail!("requested LSN is before the start of the timeline");
+        }
+
+        if request.until_lsn > commit_lsn {
+            warn!("copy_timeline WAL is not fully committed");
+        }
+
+        if backup_lsn < request.until_lsn && request.until_lsn.0 - backup_lsn.0 > MAX_BACKUP_LAG {
+            // we have a lot of segments that are not backed up. we can try to wait here until
+            // segments will be backed up to remote storage, but it's not clear how long to wait
+            bail!("too many segments are not backed up");
+        }
+    }
+
+    let wal_seg_size = state.server.wal_seg_size as usize;
+    if wal_seg_size == 0 {
+        bail!("wal_seg_size is not set");
+    }
+
+    let first_segment = start_lsn.segment_number(wal_seg_size);
+    let last_segment = request.until_lsn.segment_number(wal_seg_size);
+
+    let new_backup_lsn = {
+        // we can't have new backup_lsn greater than existing backup_lsn or start of the last segment
+        let max_backup_lsn = backup_lsn.min(Lsn(last_segment * wal_seg_size as u64));
+
+        if max_backup_lsn <= start_lsn {
+            // probably we are starting from the first segment, which was not backed up yet.
+            // note that start_lsn can be in the middle of the segment
+            start_lsn
+        } else {
+            // we have some segments backed up, so we will assume all WAL below max_backup_lsn is backed up
+            assert!(max_backup_lsn.segment_offset(wal_seg_size) == 0);
+            max_backup_lsn
+        }
+    };
+
+    // all previous segments will be copied inside S3
+    let first_ondisk_segment = new_backup_lsn.segment_number(wal_seg_size);
+    assert!(first_ondisk_segment <= last_segment);
+    assert!(first_ondisk_segment >= first_segment);
+
+    copy_s3_segments(
+        wal_seg_size,
+        &request.source.ttid,
+        &request.destination_ttid,
+        first_segment,
+        first_ondisk_segment,
+    )
+    .await?;
+
+    copy_disk_segments(
+        conf,
+        &state,
+        wal_seg_size,
+        &request.source.ttid,
+        new_backup_lsn,
+        request.until_lsn,
+        &tli_dir_path,
+    )
+    .await?;
+
+    let mut new_state = SafeKeeperState::new(
+        &request.destination_ttid,
+        state.server.clone(),
+        vec![],
+        request.until_lsn,
+        start_lsn,
+    );
+    new_state.timeline_start_lsn = start_lsn;
+    new_state.peer_horizon_lsn = request.until_lsn;
+    new_state.backup_lsn = new_backup_lsn;
+
+    let mut file_storage = FileStorage::create_new(tli_dir_path.clone(), conf, new_state.clone())?;
+    file_storage.persist(&new_state).await?;
+
+    // now we have a ready timeline in a temp directory
+    validate_temp_timeline(conf, request.destination_ttid, &tli_dir_path).await?;
+    load_temp_timeline(conf, request.destination_ttid, &tli_dir_path).await?;
+
+    Ok(())
+}
+
+async fn copy_disk_segments(
+    conf: &SafeKeeperConf,
+    persisted_state: &SafeKeeperState,
+    wal_seg_size: usize,
+    source_ttid: &TenantTimelineId,
+    start_lsn: Lsn,
+    end_lsn: Lsn,
+    tli_dir_path: &Utf8PathBuf,
+) -> Result<()> {
+    let mut wal_reader = WalReader::new(
+        conf.workdir.clone(),
+        conf.timeline_dir(source_ttid),
+        persisted_state,
+        start_lsn,
+        true,
+    )?;
+
+    let mut buf = [0u8; MAX_SEND_SIZE];
+
+    let first_segment = start_lsn.segment_number(wal_seg_size);
+    let last_segment = end_lsn.segment_number(wal_seg_size);
+
+    for segment in first_segment..=last_segment {
+        let segment_start = segment * wal_seg_size as u64;
+        let segment_end = segment_start + wal_seg_size as u64;
+
+        let copy_start = segment_start.max(start_lsn.0);
+        let copy_end = segment_end.min(end_lsn.0);
+
+        let copy_start = copy_start - segment_start;
+        let copy_end = copy_end - segment_start;
+
+        let wal_file_path = {
+            let (normal, partial) = wal_file_paths(tli_dir_path, segment, wal_seg_size)?;
+
+            if segment == last_segment {
+                partial
+            } else {
+                normal
+            }
+        };
+
+        write_segment(
+            &mut buf,
+            &wal_file_path,
+            wal_seg_size as u64,
+            copy_start,
+            copy_end,
+            &mut wal_reader,
+        )
+        .await?;
+    }
+
+    Ok(())
+}
+
+async fn write_segment(
+    buf: &mut [u8],
+    file_path: &Utf8PathBuf,
+    wal_seg_size: u64,
+    from: u64,
+    to: u64,
+    reader: &mut WalReader,
+) -> Result<()> {
+    assert!(from <= to);
+    assert!(to <= wal_seg_size);
+
+    let mut file = OpenOptions::new()
+        .create(true)
+        .write(true)
+        .open(&file_path)
+        .await?;
+
+    // maybe fill with zeros, as in wal_storage.rs?
+    file.set_len(wal_seg_size).await?;
+    file.seek(std::io::SeekFrom::Start(from)).await?;
+
+    let mut bytes_left = to - from;
+    while bytes_left > 0 {
+        let len = bytes_left as usize;
+        let len = len.min(buf.len());
+        let len = reader.read(&mut buf[..len]).await?;
+        file.write_all(&buf[..len]).await?;
+        bytes_left -= len as u64;
+    }
+
+    file.flush().await?;
+    file.sync_all().await?;
+    Ok(())
+}
diff --git a/safekeeper/src/debug_dump.rs b/safekeeper/src/debug_dump.rs
index daf9255ecb..c9ff1afdea 100644
--- a/safekeeper/src/debug_dump.rs
+++ b/safekeeper/src/debug_dump.rs
@@ -7,13 +7,16 @@
 use std::io::Read;
 use std::path::PathBuf;
 use std::sync::Arc;
 
+use anyhow::bail;
 use anyhow::Result;
 use camino::Utf8Path;
 use chrono::{DateTime, Utc};
 use postgres_ffi::XLogSegNo;
+use postgres_ffi::MAX_SEND_SIZE;
 use serde::Deserialize;
 use serde::Serialize;
+use sha2::{Digest, Sha256};
 use utils::id::NodeId;
 use utils::id::TenantTimelineId;
 use utils::id::{TenantId, TimelineId};
@@ -25,6 +28,7 @@ use crate::safekeeper::TermHistory;
 use crate::SafeKeeperConf;
 
 use crate::send_wal::WalSenderState;
+use crate::wal_storage::WalReader;
 use crate::GlobalTimelines;
 
 /// Various filters that influence the resulting JSON output.
@@ -300,3 +304,56 @@ fn build_config(config: SafeKeeperConf) -> Config {
         wal_backup_enabled: config.wal_backup_enabled,
     }
 }
+
+#[derive(Debug, Clone, Deserialize, Serialize)]
+pub struct TimelineDigestRequest {
+    pub from_lsn: Lsn,
+    pub until_lsn: Lsn,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub struct TimelineDigest {
+    pub sha256: String,
+}
+
+pub async fn calculate_digest(
+    tli: &Arc<Timeline>,
+    request: TimelineDigestRequest,
+) -> Result<TimelineDigest> {
+    if request.from_lsn > request.until_lsn {
+        bail!("from_lsn is greater than until_lsn");
+    }
+
+    let conf = GlobalTimelines::get_global_config();
+    let (_, persisted_state) = tli.get_state().await;
+
+    if persisted_state.timeline_start_lsn > request.from_lsn {
+        bail!("requested LSN is before the start of the timeline");
+    }
+
+    let mut wal_reader = WalReader::new(
+        conf.workdir.clone(),
+        tli.timeline_dir.clone(),
+        &persisted_state,
+        request.from_lsn,
+        true,
+    )?;
+
+    let mut hasher = Sha256::new();
+    let mut buf = [0u8; MAX_SEND_SIZE];
+
+    let mut bytes_left = (request.until_lsn.0 - request.from_lsn.0) as usize;
+    while bytes_left > 0 {
+        let bytes_to_read = std::cmp::min(buf.len(), bytes_left);
+        let bytes_read = wal_reader.read(&mut buf[..bytes_to_read]).await?;
+        if bytes_read == 0 {
+            bail!("wal_reader.read returned 0 bytes");
+        }
+        hasher.update(&buf[..bytes_read]);
+        bytes_left -= bytes_read;
+    }
+
+    let digest = hasher.finalize();
+    let digest = hex::encode(digest);
+    Ok(TimelineDigest { sha256: digest })
+}
diff --git a/safekeeper/src/http/routes.rs b/safekeeper/src/http/routes.rs
index 25a3334e63..5283ea19c1 100644
--- a/safekeeper/src/http/routes.rs
+++ b/safekeeper/src/http/routes.rs
@@ -2,7 +2,7 @@
 use hyper::{Body, Request, Response, StatusCode, Uri};
 use once_cell::sync::Lazy;
 use postgres_ffi::WAL_SEGMENT_SIZE;
-use safekeeper_api::models::SkTimelineInfo;
+use safekeeper_api::models::{SkTimelineInfo, TimelineCopyRequest};
 use serde::{Deserialize, Serialize};
 use std::collections::{HashMap, HashSet};
 use std::fmt;
@@ -14,19 +14,21 @@
 use tokio::fs::File;
 use tokio::io::AsyncReadExt;
 use tokio_util::sync::CancellationToken;
 use utils::failpoint_support::failpoints_handler;
+use utils::http::request::parse_query_param;
 
 use std::io::Write as _;
 use tokio::sync::mpsc;
 use tokio_stream::wrappers::ReceiverStream;
-use tracing::info_span;
+use tracing::{info_span, Instrument};
 use utils::http::endpoint::{request_span, ChannelWriter};
 
+use crate::debug_dump::TimelineDigestRequest;
 use crate::receive_wal::WalReceiverState;
 use crate::safekeeper::Term;
 use crate::safekeeper::{ServerInfo, TermLsn};
 use crate::send_wal::WalSenderState;
 use crate::timeline::PeerInfo;
-use crate::{debug_dump, pull_timeline};
+use crate::{copy_timeline, debug_dump, pull_timeline};
 use crate::timelines_global_map::TimelineDeleteForceResult;
 use crate::GlobalTimelines;
@@ -204,6 +206,56 @@ async fn timeline_pull_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
 }
 
+async fn timeline_copy_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
+    check_permission(&request, None)?;
+
+    let request_data: TimelineCopyRequest = json_request(&mut request).await?;
+    let ttid = TenantTimelineId::new(
+        parse_request_param(&request, "tenant_id")?,
+        parse_request_param(&request, "source_timeline_id")?,
+    );
+
+    let source = GlobalTimelines::get(ttid)?;
+
+    copy_timeline::handle_request(copy_timeline::Request {
+        source,
+        until_lsn: request_data.until_lsn,
+        destination_ttid: TenantTimelineId::new(ttid.tenant_id, request_data.target_timeline_id),
+    })
+    .instrument(info_span!("copy_timeline", from=%ttid, to=%request_data.target_timeline_id, until_lsn=%request_data.until_lsn))
+    .await
+    .map_err(ApiError::InternalServerError)?;
+
+    json_response(StatusCode::OK, ())
+}
+
+async fn timeline_digest_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
+    let ttid = TenantTimelineId::new(
+        parse_request_param(&request, "tenant_id")?,
+        parse_request_param(&request, "timeline_id")?,
+    );
+    check_permission(&request, Some(ttid.tenant_id))?;
+
+    let from_lsn: Option<Lsn> = parse_query_param(&request, "from_lsn")?;
+    let until_lsn: Option<Lsn> = parse_query_param(&request, "until_lsn")?;
+
+    let request = TimelineDigestRequest {
+        from_lsn: from_lsn.ok_or(ApiError::BadRequest(anyhow::anyhow!(
+            "from_lsn is required"
+        )))?,
+        until_lsn: until_lsn.ok_or(ApiError::BadRequest(anyhow::anyhow!(
+            "until_lsn is required"
+        )))?,
+    };
+
+    let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
+
+    let response = debug_dump::calculate_digest(&tli, request)
+        .await
+        .map_err(ApiError::InternalServerError)?;
+    json_response(StatusCode::OK, response)
+}
+
 /// Download a file from the timeline directory.
 // TODO: figure out a better way to copy files between safekeepers
 async fn timeline_files_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
@@ -472,11 +524,18 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError>
             "/v1/tenant/:tenant_id/timeline/:timeline_id/file/:filename",
             |r| request_span(r, timeline_files_handler),
         )
+        .post(
+            "/v1/tenant/:tenant_id/timeline/:source_timeline_id/copy",
+            |r| request_span(r, timeline_copy_handler),
+        )
         // for tests
         .post("/v1/record_safekeeper_info/:tenant_id/:timeline_id", |r| {
            request_span(r, record_safekeeper_info)
         })
         .get("/v1/debug_dump", |r| request_span(r, dump_debug_handler))
+        .get("/v1/tenant/:tenant_id/timeline/:timeline_id/digest", |r| {
+            request_span(r, timeline_digest_handler)
+        })
 }
 
 #[cfg(test)]
diff --git a/safekeeper/src/lib.rs b/safekeeper/src/lib.rs
index 3a086f1f54..fc5f99eb00 100644
--- a/safekeeper/src/lib.rs
+++ b/safekeeper/src/lib.rs
@@ -16,6 +16,7 @@ mod auth;
 pub mod broker;
 pub mod control_file;
 pub mod control_file_upgrade;
+pub mod copy_timeline;
 pub mod debug_dump;
 pub mod handler;
 pub mod http;
diff --git a/safekeeper/src/pull_timeline.rs b/safekeeper/src/pull_timeline.rs
index ad3a18a536..93b51f32c0 100644
--- a/safekeeper/src/pull_timeline.rs
+++ b/safekeeper/src/pull_timeline.rs
@@ -1,16 +1,24 @@
+use std::sync::Arc;
+
+use camino::Utf8PathBuf;
+use camino_tempfile::Utf8TempDir;
 use chrono::{DateTime, Utc};
 use serde::{Deserialize, Serialize};
 
 use anyhow::{bail, Context, Result};
 use tokio::io::AsyncWriteExt;
 use tracing::info;
-use utils::id::{TenantId, TenantTimelineId, TimelineId};
+use utils::{
+    id::{TenantId, TenantTimelineId, TimelineId},
+    lsn::Lsn,
+};
 
 use crate::{
     control_file, debug_dump,
     http::routes::TimelineStatus,
+    timeline::{Timeline, TimelineError},
     wal_storage::{self, Storage},
-    GlobalTimelines,
+    GlobalTimelines, SafeKeeperConf,
 };
 
 /// Info about timeline on safekeeper ready for reporting.
@@ -91,7 +99,7 @@ pub async fn handle_request(request: Request) -> Result<Response> {
 async fn pull_timeline(status: TimelineStatus, host: String) -> Result<Response> {
     let ttid = TenantTimelineId::new(status.tenant_id, status.timeline_id);
     info!(
-        "Pulling timeline {} from safekeeper {}, commit_lsn={}, flush_lsn={}, term={}, epoch={}",
+        "pulling timeline {} from safekeeper {}, commit_lsn={}, flush_lsn={}, term={}, epoch={}",
         ttid,
         host,
         status.commit_lsn,
@@ -121,14 +129,14 @@ async fn pull_timeline(status: TimelineStatus, host: String) -> Result<Response> {
     if dump.timelines.len() != 1 {
         bail!(
-            "Expected to fetch single timeline, got {} timelines",
+            "expected to fetch single timeline, got {} timelines",
             dump.timelines.len()
         );
     }
 
     let timeline = dump.timelines.into_iter().next().unwrap();
     let disk_content = timeline.disk_content.ok_or(anyhow::anyhow!(
-        "Timeline {} doesn't have disk content",
+        "timeline {} doesn't have disk content",
         ttid
     ))?;
 
@@ -155,29 +163,12 @@ async fn pull_timeline(status: TimelineStatus, host: String) -> Result<Response> {
     filenames.insert(0, "safekeeper.control".to_string());
 
     info!(
-        "Downloading {} files from safekeeper {}",
+        "downloading {} files from safekeeper {}",
         filenames.len(),
         host
     );
 
-    // Creating temp directory for a new timeline. It needs to be
-    // located on the same filesystem as the rest of the timelines.
-
-    // conf.workdir is usually /storage/safekeeper/data
-    // will try to transform it into /storage/safekeeper/tmp
-    let temp_base = conf
-        .workdir
-        .parent()
-        .ok_or(anyhow::anyhow!("workdir has no parent"))?
- .join("tmp"); - - tokio::fs::create_dir_all(&temp_base).await?; - - let tli_dir = camino_tempfile::Builder::new() - .suffix("_temptli") - .prefix(&format!("{}_{}_", ttid.tenant_id, ttid.timeline_id)) - .tempdir_in(temp_base)?; - let tli_dir_path = tli_dir.path().to_path_buf(); + let (_tmp_dir, tli_dir_path) = create_temp_timeline_dir(conf, ttid).await?; // Note: some time happens between fetching list of files and fetching files themselves. // It's possible that some files will be removed from safekeeper and we will fail to fetch them. @@ -201,47 +192,105 @@ async fn pull_timeline(status: TimelineStatus, host: String) -> Result // TODO: fsync? // Let's create timeline from temp directory and verify that it's correct + let (commit_lsn, flush_lsn) = validate_temp_timeline(conf, ttid, &tli_dir_path).await?; + info!( + "finished downloading timeline {}, commit_lsn={}, flush_lsn={}", + ttid, commit_lsn, flush_lsn + ); + assert!(status.commit_lsn <= status.flush_lsn); - let control_path = tli_dir_path.join("safekeeper.control"); + // Finally, load the timeline. + let _tli = load_temp_timeline(conf, ttid, &tli_dir_path).await?; + + Ok(Response { + safekeeper_host: host, + }) +} + +/// Create temp directory for a new timeline. It needs to be located on the same +/// filesystem as the rest of the timelines. It will be automatically deleted when +/// Utf8TempDir goes out of scope. +pub async fn create_temp_timeline_dir( + conf: &SafeKeeperConf, + ttid: TenantTimelineId, +) -> Result<(Utf8TempDir, Utf8PathBuf)> { + // conf.workdir is usually /storage/safekeeper/data + // will try to transform it into /storage/safekeeper/tmp + let temp_base = conf + .workdir + .parent() + .ok_or(anyhow::anyhow!("workdir has no parent"))? + .join("tmp"); + + tokio::fs::create_dir_all(&temp_base).await?; + + let tli_dir = camino_tempfile::Builder::new() + .suffix("_temptli") + .prefix(&format!("{}_{}_", ttid.tenant_id, ttid.timeline_id)) + .tempdir_in(temp_base)?; + + let tli_dir_path = tli_dir.path().to_path_buf(); + + Ok((tli_dir, tli_dir_path)) +} + +/// Do basic validation of a temp timeline, before moving it to the global map. +pub async fn validate_temp_timeline( + conf: &SafeKeeperConf, + ttid: TenantTimelineId, + path: &Utf8PathBuf, +) -> Result<(Lsn, Lsn)> { + let control_path = path.join("safekeeper.control"); let control_store = control_file::FileStorage::load_control_file(control_path)?; if control_store.server.wal_seg_size == 0 { bail!("wal_seg_size is not set"); } - let wal_store = - wal_storage::PhysicalStorage::new(&ttid, tli_dir_path.clone(), conf, &control_store)?; + let wal_store = wal_storage::PhysicalStorage::new(&ttid, path.clone(), conf, &control_store)?; - let commit_lsn = status.commit_lsn; + let commit_lsn = control_store.commit_lsn; let flush_lsn = wal_store.flush_lsn(); - info!( - "Finished downloading timeline {}, commit_lsn={}, flush_lsn={}", - ttid, commit_lsn, flush_lsn - ); - assert!(status.commit_lsn <= status.flush_lsn); + Ok((commit_lsn, flush_lsn)) +} + +/// Move timeline from a temp directory to the main storage, and load it to the global map. +/// This operation is done under a lock to prevent bugs if several concurrent requests are +/// trying to load the same timeline. Note that it doesn't guard against creating the +/// timeline with the same ttid, but no one should be doing this anyway. 
+pub async fn load_temp_timeline(
+    conf: &SafeKeeperConf,
+    ttid: TenantTimelineId,
+    tmp_path: &Utf8PathBuf,
+) -> Result<Arc<Timeline>> {
+    // Take a lock to prevent concurrent loadings
+    let load_lock = GlobalTimelines::loading_lock().await;
+    let guard = load_lock.lock().await;
+
+    if !matches!(GlobalTimelines::get(ttid), Err(TimelineError::NotFound(_))) {
+        bail!("timeline already exists, cannot overwrite it")
+    }
 
     // Move timeline dir to the correct location
     let timeline_path = conf.timeline_dir(&ttid);
     info!(
-        "Moving timeline {} from {} to {}",
-        ttid, tli_dir_path, timeline_path
+        "moving timeline {} from {} to {}",
+        ttid, tmp_path, timeline_path
     );
     tokio::fs::create_dir_all(conf.tenant_dir(&ttid.tenant_id)).await?;
-    tokio::fs::rename(tli_dir_path, &timeline_path).await?;
+    tokio::fs::rename(tmp_path, &timeline_path).await?;
 
-    let tli = GlobalTimelines::load_timeline(ttid)
+    let tli = GlobalTimelines::load_timeline(&guard, ttid)
         .await
         .context("Failed to load timeline after copy")?;
     info!(
-        "Loaded timeline {}, flush_lsn={}",
+        "loaded timeline {}, flush_lsn={}",
         ttid,
         tli.get_flush_lsn().await
     );
 
-    Ok(Response {
-        safekeeper_host: host,
-    })
+    Ok(tli)
 }
diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs
index bdc9088138..2f284abe8c 100644
--- a/safekeeper/src/timeline.rs
+++ b/safekeeper/src/timeline.rs
@@ -141,7 +141,8 @@ impl SharedState {
         // We don't want to write anything to disk, because we may have existing timeline there.
         // These functions should not change anything on disk.
-        let control_store = control_file::FileStorage::create_new(ttid, conf, state)?;
+        let timeline_dir = conf.timeline_dir(ttid);
+        let control_store = control_file::FileStorage::create_new(timeline_dir, conf, state)?;
         let wal_store =
             wal_storage::PhysicalStorage::new(ttid, conf.timeline_dir(ttid), conf, &control_store)?;
         let sk = SafeKeeper::new(control_store, wal_store, conf.my_id)?;
diff --git a/safekeeper/src/timelines_global_map.rs b/safekeeper/src/timelines_global_map.rs
index cbb3342e40..92ac5ba66d 100644
--- a/safekeeper/src/timelines_global_map.rs
+++ b/safekeeper/src/timelines_global_map.rs
@@ -21,8 +21,12 @@ struct GlobalTimelinesState {
     timelines: HashMap<TenantTimelineId, Arc<Timeline>>,
     wal_backup_launcher_tx: Option<Sender<TenantTimelineId>>,
     conf: Option<SafeKeeperConf>,
+    load_lock: Arc<tokio::sync::Mutex<TimelineLoadLock>>,
 }
 
+// Used to prevent concurrent timeline loading.
+pub struct TimelineLoadLock;
+
 impl GlobalTimelinesState {
     /// Get configuration, which must be set once during init.
     fn get_conf(&self) -> &SafeKeeperConf {
@@ -63,6 +67,7 @@ static TIMELINES_STATE: Lazy<Mutex<GlobalTimelinesState>> = Lazy::new(|| {
         timelines: HashMap::new(),
         wal_backup_launcher_tx: None,
         conf: None,
+        load_lock: Arc::new(tokio::sync::Mutex::new(TimelineLoadLock)),
     })
 });
@@ -174,8 +179,16 @@ impl GlobalTimelines {
         Ok(())
     }
 
+    /// Take a lock for timeline loading.
+    pub async fn loading_lock() -> Arc<tokio::sync::Mutex<TimelineLoadLock>> {
+        TIMELINES_STATE.lock().unwrap().load_lock.clone()
+    }
+
     /// Load timeline from disk to the memory.
-    pub async fn load_timeline(ttid: TenantTimelineId) -> Result<Arc<Timeline>> {
+    pub async fn load_timeline<'a>(
+        _guard: &tokio::sync::MutexGuard<'a, TimelineLoadLock>,
+        ttid: TenantTimelineId,
+    ) -> Result<Arc<Timeline>> {
         let (conf, wal_backup_launcher_tx) = TIMELINES_STATE.lock().unwrap().get_dependencies();
 
         match Timeline::load_timeline(&conf, ttid, wal_backup_launcher_tx) {
diff --git a/safekeeper/src/wal_backup.rs b/safekeeper/src/wal_backup.rs
index c99bbc7d61..e4499eaf50 100644
--- a/safekeeper/src/wal_backup.rs
+++ b/safekeeper/src/wal_backup.rs
@@ -7,7 +7,7 @@ use tokio::task::JoinHandle;
 use utils::id::NodeId;
 
 use std::cmp::min;
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
 use std::pin::Pin;
 use std::sync::Arc;
 use std::time::Duration;
@@ -531,3 +531,62 @@ pub async fn read_object(
     Ok(Box::pin(reader))
 }
+
+/// Copy segments from one timeline to another. Used in copy_timeline.
+pub async fn copy_s3_segments(
+    wal_seg_size: usize,
+    src_ttid: &TenantTimelineId,
+    dst_ttid: &TenantTimelineId,
+    from_segment: XLogSegNo,
+    to_segment: XLogSegNo,
+) -> Result<()> {
+    const SEGMENTS_PROGRESS_REPORT_INTERVAL: u64 = 1024;
+
+    let storage = REMOTE_STORAGE
+        .get()
+        .expect("failed to get remote storage")
+        .as_ref()
+        .unwrap();
+
+    let relative_dst_path =
+        Utf8Path::new(&dst_ttid.tenant_id.to_string()).join(dst_ttid.timeline_id.to_string());
+
+    let remote_path = RemotePath::new(&relative_dst_path)?;
+
+    let files = storage.list_files(Some(&remote_path)).await?;
+    let uploaded_segments = &files
+        .iter()
+        .filter_map(|file| file.object_name().map(ToOwned::to_owned))
+        .collect::<HashSet<_>>();
+
+    debug!(
+        "these segments have already been uploaded: {:?}",
+        uploaded_segments
+    );
+
+    let relative_src_path =
+        Utf8Path::new(&src_ttid.tenant_id.to_string()).join(src_ttid.timeline_id.to_string());
+
+    for segno in from_segment..to_segment {
+        if segno % SEGMENTS_PROGRESS_REPORT_INTERVAL == 0 {
+            info!("copied all segments from {} until {}", from_segment, segno);
+        }
+
+        let segment_name = XLogFileName(PG_TLI, segno, wal_seg_size);
+        if uploaded_segments.contains(&segment_name) {
+            continue;
+        }
+        debug!("copying segment {}", segment_name);
+
+        let from = RemotePath::new(&relative_src_path.join(&segment_name))?;
+        let to = RemotePath::new(&relative_dst_path.join(&segment_name))?;
+
+        storage.copy_object(&from, &to).await?;
+    }
+
+    info!(
+        "finished copying segments from {} until {}",
+        from_segment, to_segment
+    );
+    Ok(())
+}
diff --git a/safekeeper/src/wal_storage.rs b/safekeeper/src/wal_storage.rs
index e7538f805c..8d138c701f 100644
--- a/safekeeper/src/wal_storage.rs
+++ b/safekeeper/src/wal_storage.rs
@@ -728,7 +728,7 @@ async fn write_zeroes(file: &mut File, mut count: usize) -> Result<()> {
 }
 
 /// Helper returning full path to WAL segment file and its .partial brother.
-fn wal_file_paths(
+pub fn wal_file_paths(
     timeline_dir: &Utf8Path,
     segno: XLogSegNo,
     wal_seg_size: usize,
diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py
index 5b1a8ba27d..f33e17a76a 100644
--- a/test_runner/fixtures/neon_fixtures.py
+++ b/test_runner/fixtures/neon_fixtures.py
@@ -3032,6 +3032,28 @@ class SafekeeperHttpClient(requests.Session):
         assert isinstance(res_json, dict)
         return res_json
 
+    def copy_timeline(self, tenant_id: TenantId, timeline_id: TimelineId, body: Dict[str, Any]):
+        res = self.post(
+            f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/copy",
+            json=body,
+        )
+        res.raise_for_status()
+
+    def timeline_digest(
+        self, tenant_id: TenantId, timeline_id: TimelineId, from_lsn: Lsn, until_lsn: Lsn
+    ) -> Dict[str, Any]:
+        res = self.get(
+            f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/digest",
+            params={
+                "from_lsn": str(from_lsn),
+                "until_lsn": str(until_lsn),
+            },
+        )
+        res.raise_for_status()
+        res_json = res.json()
+        assert isinstance(res_json, dict)
+        return res_json
+
     def timeline_create(
         self,
         tenant_id: TenantId,
diff --git a/test_runner/regress/test_wal_acceptor.py b/test_runner/regress/test_wal_acceptor.py
index 4dfc883f4c..b4ce633531 100644
--- a/test_runner/regress/test_wal_acceptor.py
+++ b/test_runner/regress/test_wal_acceptor.py
@@ -1838,3 +1838,83 @@ def test_idle_reconnections(neon_env_builder: NeonEnvBuilder):
     assert final_stats.get("START_REPLICATION", 0) >= 1
     # walproposer should connect to each safekeeper at least once
     assert final_stats.get("START_WAL_PUSH", 0) >= 3
+
+
+@pytest.mark.parametrize("insert_rows", [0, 100, 100000, 500000])
+def test_timeline_copy(neon_env_builder: NeonEnvBuilder, insert_rows: int):
+    target_percents = [10, 50, 90, 100]
+
+    neon_env_builder.num_safekeepers = 3
+    # we need remote storage that supports copy_object S3 API
+    neon_env_builder.enable_safekeeper_remote_storage(RemoteStorageKind.MOCK_S3)
+    env = neon_env_builder.init_start()
+
+    tenant_id = env.initial_tenant
+    timeline_id = env.initial_timeline
+
+    endpoint = env.endpoints.create_start("main")
+
+    lsns = []
+
+    def remember_lsn():
+        lsn = Lsn(endpoint.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
+        lsns.append(lsn)
+        return lsn
+
+    # remember LSN right after timeline creation
+    lsn = remember_lsn()
+    log.info(f"LSN after timeline creation: {lsn}")
+
+    endpoint.safe_psql("create table t(key int, value text)")
+
+    timeline_status = env.safekeepers[0].http_client().timeline_status(tenant_id, timeline_id)
+    timeline_start_lsn = timeline_status.timeline_start_lsn
+    log.info(f"Timeline start LSN: {timeline_start_lsn}")
+
+    current_percent = 0.0
+    for new_percent in target_percents:
+        new_rows = insert_rows * (new_percent - current_percent) / 100
+        current_percent = new_percent
+
+        if new_rows == 0:
+            continue
+
+        endpoint.safe_psql(
+            f"insert into t select generate_series(1, {new_rows}), repeat('payload!', 10)"
+        )
+
+        # remember LSN right after reaching new_percent
+        lsn = remember_lsn()
+        log.info(f"LSN after inserting {new_rows} rows: {lsn}")
+
+    # TODO: would be also good to test cases where not all segments are uploaded to S3
+
+    for lsn in lsns:
+        new_timeline_id = TimelineId.generate()
+        log.info(f"Copying branch for LSN {lsn}, to timeline {new_timeline_id}")
+
+        orig_digest = (
+            env.safekeepers[0]
+            .http_client()
+            .timeline_digest(tenant_id, timeline_id, timeline_start_lsn, lsn)
+        )
+        log.info(f"Original digest: {orig_digest}")
+
+        for sk in env.safekeepers:
+            sk.http_client().copy_timeline(
+                tenant_id,
+                timeline_id,
+                {
+                    "target_timeline_id": str(new_timeline_id),
+                    "until_lsn": str(lsn),
+                },
+            )
+
+            new_digest = sk.http_client().timeline_digest(
+                tenant_id, new_timeline_id, timeline_start_lsn, lsn
+            )
+            log.info(f"Digest after timeline copy on safekeeper {sk.id}: {new_digest}")
+
+            assert orig_digest == new_digest
+
+    # TODO: test timelines can start after copy