diff --git a/libs/remote_storage/src/lib.rs b/libs/remote_storage/src/lib.rs
index 435364d83a..c4ffe055ec 100644
--- a/libs/remote_storage/src/lib.rs
+++ b/libs/remote_storage/src/lib.rs
@@ -112,7 +112,7 @@ impl RemotePath {
         self.0.file_name()
     }
 
-    pub fn join(&self, segment: &Utf8Path) -> Self {
+    pub fn join<P: AsRef<Utf8Path>>(&self, segment: P) -> Self {
         Self(self.0.join(segment))
     }
 
diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs
index 54c5cdf133..56ca506191 100644
--- a/pageserver/src/http/routes.rs
+++ b/pageserver/src/http/routes.rs
@@ -681,6 +681,45 @@ async fn tenant_ignore_handler(
     json_response(StatusCode::OK, ())
 }
 
+async fn tenant_duplicate_handler(
+    mut request: Request<Body>,
+    _cancel: CancellationToken,
+) -> Result<Response<Body>, ApiError> {
+    let src_tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
+
+    let request_data: TenantCreateRequest = json_request(&mut request).await?;
+    let new_tenant_id = request_data.new_tenant_id;
+    check_permission(&request, None)?;
+
+    let _timer = STORAGE_TIME_GLOBAL
+        .get_metric_with_label_values(&[StorageTimeOperation::DuplicateTenant.into()])
+        .expect("bug")
+        .start_timer();
+
+    let tenant_conf =
+        TenantConfOpt::try_from(&request_data.config).map_err(ApiError::BadRequest)?;
+
+    let state = get_state(&request);
+
+    let generation = get_request_generation(state, request_data.generation)?;
+
+    let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
+
+    mgr::duplicate_tenant(
+        state.conf,
+        tenant_conf,
+        src_tenant_id,
+        new_tenant_id,
+        generation,
+        state.tenant_resources(),
+        &ctx,
+    )
+    .instrument(info_span!("tenant_duplicate", %src_tenant_id, tenant_id = %new_tenant_id))
+    .await?;
+
+    json_response(StatusCode::CREATED, TenantCreateResponse(new_tenant_id))
+}
+
 async fn tenant_list_handler(
     request: Request<Body>,
     _cancel: CancellationToken,
@@ -1728,6 +1767,9 @@ pub fn make_router(
         .post("/v1/tenant/:tenant_id/ignore", |r| {
             api_handler(r, tenant_ignore_handler)
         })
+        .post("/v1/tenant/:tenant_id/duplicate", |r| {
+            api_handler(r, tenant_duplicate_handler)
+        })
         .get("/v1/tenant/:tenant_id/timeline/:timeline_id", |r| {
             api_handler(r, timeline_detail_handler)
         })
diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs
index 267e632312..34575d93b2 100644
--- a/pageserver/src/metrics.rs
+++ b/pageserver/src/metrics.rs
@@ -51,6 +51,9 @@ pub enum StorageTimeOperation {
 
     #[strum(serialize = "create tenant")]
     CreateTenant,
+
+    #[strum(serialize = "duplicate tenant")]
+    DuplicateTenant,
 }
 
 pub static STORAGE_TIME_SUM_PER_TIMELINE: Lazy<CounterVec> = Lazy::new(|| {
diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs
index 380b610a4c..1432173756 100644
--- a/pageserver/src/tenant/mgr.rs
+++ b/pageserver/src/tenant/mgr.rs
@@ -4,8 +4,10 @@
 use camino::{Utf8DirEntry, Utf8Path, Utf8PathBuf};
 use rand::{distributions::Alphanumeric, Rng};
 use std::collections::{hash_map, HashMap};
+use std::str::FromStr;
 use std::sync::Arc;
 use tokio::fs;
+use tokio::io::AsyncSeekExt;
 
 use anyhow::Context;
 use once_cell::sync::Lazy;
@@ -26,9 +28,11 @@
 use crate::deletion_queue::DeletionQueueClient;
 use crate::task_mgr::{self, TaskKind};
 use crate::tenant::config::{AttachmentMode, LocationConf, LocationMode, TenantConfOpt};
 use crate::tenant::delete::DeleteTenantFlow;
+use crate::tenant::span::debug_assert_current_span_has_tenant_id;
+use crate::tenant::storage_layer::{DeltaLayer, ImageLayer, LayerFileName};
 use crate::tenant::{
-    create_tenant_files, AttachMarkerMode, AttachedTenantConf, CreateTenantFilesMode, Tenant,
-    TenantState,
+    create_tenant_files, remote_timeline_client, AttachMarkerMode, AttachedTenantConf,
+    CreateTenantFilesMode, IndexPart, Tenant, TenantState,
 };
 use crate::{InitializationOrder, IGNORED_TENANT_FILE_NAME, TEMP_FILE_SUFFIX};
@@ -695,6 +699,159 @@ pub(crate) async fn create_tenant(
     }).await
 }
 
+pub(crate) async fn duplicate_tenant(
+    conf: &'static PageServerConf,
+    tenant_conf: TenantConfOpt,
+    src_tenant_id: TenantId,
+    new_tenant_id: TenantId,
+    generation: Generation,
+    resources: TenantSharedResources,
+    ctx: &RequestContext,
+) -> Result<(), TenantMapInsertError> {
+    debug_assert_current_span_has_tenant_id();
+
+    // TODO: would be nice to use tenant_map_insert here, but, we're not ready to create a Tenant object yet
+    let tempdir = path_with_suffix_extension(
+        conf.tenants_path().join(&new_tenant_id.to_string()),
+        &format!("duplication.{TEMP_FILE_SUFFIX}"),
+    );
+    tokio::fs::remove_dir_all(&tempdir)
+        .await
+        .or_else(|e| match e.kind() {
+            std::io::ErrorKind::NotFound => Ok(()),
+            _ => Err(e),
+        })
+        .context("pre-run clean up tempdir")?;
+
+    tokio::fs::create_dir(&tempdir)
+        .await
+        .context("create tempdir")?;
+
+    // Copy the tenant's data in S3
+    let remote_storage = resources
+        .remote_storage
+        .as_ref()
+        .context("only works with remote storage")?;
+
+    let remote_src_timelines =
+        remote_timeline_client::list_remote_timelines(remote_storage, src_tenant_id)
+            .await
+            .context("list src timelines")?;
+
+    info!(?remote_src_timelines, "got src timelines");
+
+    for timeline_id in remote_src_timelines {
+        async {
+            let tempdir = tempdir.join(&timeline_id.to_string());
+
+            tokio::fs::create_dir(&tempdir)
+                .await
+                .context("create tempdir for timeline")?;
+
+            let remote_src_tl =
+                remote_timeline_client::remote_timeline_path(&src_tenant_id, &timeline_id);
+            let remote_dst_tl =
+                remote_timeline_client::remote_timeline_path(&new_tenant_id, &timeline_id);
+
+            let object_names = remote_storage
+                .list_prefixes(Some(&remote_src_tl))
+                .await
+                .context("list timeline remote prefix")?;
+
+            for name in object_names {
+                async {
+                    let name = name.object_name().context(
+                        "list_prefixes return values should always have object_name()=Some",
+                    )?;
+                    let remote_src_obj = remote_src_tl.join(name);
+                    let remote_dst_obj = remote_dst_tl.join(name);
+
+                    let tmp_obj_filepath = tempdir.join(name);
+                    let mut tmp_obj_file = tokio::fs::OpenOptions::new()
+                        .read(true)
+                        .write(true)
+                        .create_new(true)
+                        .open(&tmp_obj_filepath)
+                        .await
+                        .context("create temp file")?;
+                    let mut tmp_dl = remote_storage
+                        .download(&remote_src_obj)
+                        .await
+                        .context("start download")?;
+                    let tmp_obj_size =
+                        tokio::io::copy(&mut tmp_dl.download_stream, &mut tmp_obj_file)
+                            .await
+                            .context("do the download")?;
+
+                    if name == IndexPart::FILE_NAME {
+                        // needs no patching
+                    } else {
+                        let name = LayerFileName::from_str(name).map_err(|e: String| {
+                            anyhow::anyhow!("unknown key in timeline s3 prefix: {name:?}: {e}")
+                        })?;
+                        match name {
+                            LayerFileName::Image(_) => {
+                                ImageLayer::rewrite_tenant_timeline(
+                                    &tmp_obj_filepath,
+                                    new_tenant_id,
+                                    timeline_id, /* leave as is */
+                                    ctx,
+                                )
+                                .await
+                                .context("rewrite tenant timeline")?;
+                            }
+                            LayerFileName::Delta(_) => {
+                                DeltaLayer::rewrite_tenant_timeline(
+                                    &tmp_obj_filepath,
+                                    new_tenant_id,
+                                    timeline_id, /* leave as is */
+                                    ctx,
+                                )
+                                .await
+                                .context("rewrite tenant timeline")?;
+                            }
+                        }
+                    }
+
+                    info!(?remote_dst_obj, "uploading");
+
+                    tmp_obj_file
+                        .seek(std::io::SeekFrom::Start(0))
+                        .await
+                        .context("seek tmp file to beginning for upload")?;
+                    remote_storage
+                        .upload(
+                            tmp_obj_file,
+                            tmp_obj_size as usize,
+                            &remote_dst_obj,
+                            tmp_dl.metadata,
+                        )
+                        .await
+                        .context("upload modified")?;
+
+                    tokio::fs::remove_file(tmp_obj_filepath)
+                        .await
+                        .context("remove temp file")?;
+
+                    anyhow::Ok(())
+                }
+                .instrument(info_span!("copy object", object_name=?name))
+                .await
+                .context("copy object")?;
+            }
+            anyhow::Ok(())
+        }
+        .instrument(info_span!("copy_timeline", timeline_id=%timeline_id))
+        .await?;
+    }
+
+    tokio::fs::remove_dir_all(&tempdir)
+        .await
+        .context("post-run clean up tempdir")?;
+
+    attach_tenant(conf, new_tenant_id, generation, tenant_conf, resources, ctx).await
+}
+
 #[derive(Debug, thiserror::Error)]
 pub(crate) enum SetNewTenantConfigError {
     #[error(transparent)]
diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs
index 55fb491b65..a06494b8da 100644
--- a/pageserver/src/tenant/storage_layer/delta_layer.rs
+++ b/pageserver/src/tenant/storage_layer/delta_layer.rs
@@ -844,6 +844,49 @@ impl Drop for DeltaLayerWriter {
     }
 }
 
+impl DeltaLayer {
+    /// Assume the file at `path` is corrupt if this function returns with an error.
+    pub(crate) async fn rewrite_tenant_timeline(
+        path: &Utf8Path,
+        new_tenant: TenantId,
+        new_timeline: TimelineId,
+        ctx: &RequestContext,
+    ) -> anyhow::Result<()> {
+        let file = VirtualFile::open_with_options(
+            path,
+            &*std::fs::OpenOptions::new().read(true).write(true),
+        )
+        .await
+        .with_context(|| format!("Failed to open file '{}'", path))?;
+        let file = FileBlockReader::new(file);
+        let summary_blk = file.read_blk(0, ctx).await?;
+        let actual_summary = Summary::des_prefix(summary_blk.as_ref())?;
+        let mut file = file.file;
+        if actual_summary.magic != DELTA_FILE_MAGIC {
+            bail!("File '{}' is not a delta layer", path);
+        }
+        let new_summary = Summary {
+            tenant_id: new_tenant,
+            timeline_id: new_timeline,
+            ..actual_summary
+        };
+
+        let mut buf = smallvec::SmallVec::<[u8; PAGE_SZ]>::new();
+        Summary::ser_into(&new_summary, &mut buf)?;
+        if buf.spilled() {
+            // The code in ImageLayerWriterInner just warn!()s for this.
+            // It should probably error out as well.
+            anyhow::bail!(
+                "Used more than one page size for summary buffer: {}",
+                buf.len()
+            );
+        }
+        file.seek(SeekFrom::Start(0)).await?;
+        file.write_all(&buf).await?;
+        Ok(())
+    }
+}
+
 impl DeltaLayerInner {
     pub(super) async fn load(
         path: &Utf8Path,
diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs
index 94138a0786..412523f5a1 100644
--- a/pageserver/src/tenant/storage_layer/image_layer.rs
+++ b/pageserver/src/tenant/storage_layer/image_layer.rs
@@ -436,6 +436,49 @@ impl ImageLayer {
     }
 }
 
+impl ImageLayer {
+    /// Assume the file at `path` is corrupt if this function returns with an error.
+    pub(crate) async fn rewrite_tenant_timeline(
+        path: &Utf8Path,
+        new_tenant: TenantId,
+        new_timeline: TimelineId,
+        ctx: &RequestContext,
+    ) -> anyhow::Result<()> {
+        let file = VirtualFile::open_with_options(
+            path,
+            &*std::fs::OpenOptions::new().read(true).write(true),
+        )
+        .await
+        .with_context(|| format!("Failed to open file '{}'", path))?;
+        let file = FileBlockReader::new(file);
+        let summary_blk = file.read_blk(0, ctx).await?;
+        let actual_summary = Summary::des_prefix(summary_blk.as_ref())?;
+        let mut file = file.file;
+        if actual_summary.magic != IMAGE_FILE_MAGIC {
+            bail!("File '{}' is not an image layer", path);
+        }
+        let new_summary = Summary {
+            tenant_id: new_tenant,
+            timeline_id: new_timeline,
+            ..actual_summary
+        };
+
+        let mut buf = smallvec::SmallVec::<[u8; PAGE_SZ]>::new();
+        Summary::ser_into(&new_summary, &mut buf)?;
+        if buf.spilled() {
+            // The code in ImageLayerWriterInner just warn!()s for this.
+            // It should probably error out as well.
+            anyhow::bail!(
+                "Used more than one page size for summary buffer: {}",
+                buf.len()
+            );
+        }
+        file.seek(SeekFrom::Start(0)).await?;
+        file.write_all(&buf).await?;
+        Ok(())
+    }
+}
+
 impl ImageLayerInner {
     pub(super) async fn load(
         path: &Utf8Path,
diff --git a/test_runner/fixtures/pageserver/http.py b/test_runner/fixtures/pageserver/http.py
index d2c3715c52..aa70a0ca06 100644
--- a/test_runner/fixtures/pageserver/http.py
+++ b/test_runner/fixtures/pageserver/http.py
@@ -215,6 +215,25 @@ class PageserverHttpClient(requests.Session):
         assert isinstance(new_tenant_id, str)
         return TenantId(new_tenant_id)
 
+    def tenant_duplicate(
+        self, src_tenant_id: TenantId, new_tenant_id: TenantId, conf: Optional[Dict[str, Any]] = None
+    ) -> TenantId:
+        if conf is not None:
+            assert "new_tenant_id" not in conf.keys()
+        res = self.post(
+            f"http://localhost:{self.port}/v1/tenant/{src_tenant_id}/duplicate",
+            json={
+                "new_tenant_id": str(new_tenant_id),
+                **(conf or {}),
+            },
+        )
+        self.verbose_error(res)
+        if res.status_code == 409:
+            raise Exception(f"could not duplicate tenant: already exists for id {new_tenant_id}")
+        new_tenant_id = res.json()
+        assert isinstance(new_tenant_id, str)
+        return TenantId(new_tenant_id)
+
     def tenant_attach(
         self,
         tenant_id: TenantId,
diff --git a/test_runner/regress/test_tenant_duplicate.py b/test_runner/regress/test_tenant_duplicate.py
new file mode 100644
index 0000000000..372ddfbf9c
--- /dev/null
+++ b/test_runner/regress/test_tenant_duplicate.py
@@ -0,0 +1,53 @@
+from fixtures.neon_fixtures import (
+    NeonEnvBuilder,
+    last_flush_lsn_upload,
+)
+from fixtures.remote_storage import (
+    RemoteStorageKind,
+)
+from fixtures.types import TenantId
+from fixtures.log_helper import log
+
+
+def test_tenant_duplicate(
+    neon_env_builder: NeonEnvBuilder,
+):
+    neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
+
+    env = neon_env_builder.init_start()
+
+    with env.endpoints.create_start("main", tenant_id=env.initial_tenant) as ep_main:
+        ep_main.safe_psql("CREATE TABLE foo (i int);")
+        ep_main.safe_psql("INSERT INTO foo VALUES (1), (2), (3);")
+        last_flush_lsn = last_flush_lsn_upload(
+            env, ep_main, env.initial_tenant, env.initial_timeline
+        )
+
+    new_tenant_id = TenantId.generate()
+    # timeline id remains unchanged with tenant_duplicate
+    # TODO: implement a remapping scheme so timeline ids remain globally unique
+    new_timeline_id = env.initial_timeline
+
+    log.info(f"Duplicate tenant/timeline will be: {new_tenant_id}/{new_timeline_id}")
+
+    ps_http = env.pageserver.http_client()
+
+    ps_http.tenant_duplicate(env.initial_tenant, new_tenant_id)
+
+    ps_http.tenant_delete(env.initial_tenant)
+
+    env.neon_cli.map_branch("duplicate", new_tenant_id, new_timeline_id)
+
+    # start read-only replica and validate
+    with env.endpoints.create_start(
+        "duplicate", tenant_id=new_tenant_id, lsn=last_flush_lsn
+    ) as ep_dup:
+        with ep_dup.connect() as conn:
+            with conn.cursor() as cur:
+                cur.execute("SELECT * FROM foo ORDER BY i;")
+                assert cur.fetchall() == [(1,), (2,), (3,)]
+
+    # ensure restarting PS works
+    env.pageserver.stop()
+    env.pageserver.start()
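
Reviewer note: a minimal sketch of driving the new endpoint from a test, using the tenant_duplicate() fixture method this PR adds (it POSTs to /v1/tenant/:tenant_id/duplicate and returns the new tenant id on 201 CREATED). The `env` fixture and assertions below are illustrative, not part of the change:

    from fixtures.types import TenantId

    ps_http = env.pageserver.http_client()
    new_tenant_id = TenantId.generate()
    # Copies every timeline object of the source tenant in S3, rewrites each
    # layer file's summary block to the new tenant id, then attaches the new
    # tenant. Timeline ids are carried over unchanged (see the TODO above).
    returned_id = ps_http.tenant_duplicate(env.initial_tenant, new_tenant_id)
    assert returned_id == new_tenant_id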