API to duplicate a tenant

Christian Schwarz
2023-10-26 16:30:11 +00:00
parent bb8531d920
commit c5f58ef3f7
8 changed files with 364 additions and 3 deletions

View File

@@ -112,7 +112,7 @@ impl RemotePath {
         self.0.file_name()
     }

-    pub fn join(&self, segment: &Utf8Path) -> Self {
+    pub fn join<P: AsRef<Utf8Path>>(&self, segment: P) -> Self {
         Self(self.0.join(segment))
     }
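With the AsRef bound, callers can now pass a plain &str as well as a &Utf8Path. A minimal sketch of the two call forms (hypothetical call sites, not part of this commit):

use camino::Utf8Path;

fn demo(base: &RemotePath) {
    // Both compile with the generic signature; previously the &str form
    // needed an explicit Utf8Path::new() at the caller.
    let _by_str = base.join("index_part.json");
    let _by_path = base.join(Utf8Path::new("index_part.json"));
}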

View File

@@ -681,6 +681,45 @@ async fn tenant_ignore_handler(
    json_response(StatusCode::OK, ())
}

async fn tenant_duplicate_handler(
    mut request: Request<Body>,
    _cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
    let src_tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
    let request_data: TenantCreateRequest = json_request(&mut request).await?;
    let new_tenant_id = request_data.new_tenant_id;
    check_permission(&request, None)?;

    let _timer = STORAGE_TIME_GLOBAL
        .get_metric_with_label_values(&[StorageTimeOperation::DuplicateTenant.into()])
        .expect("bug")
        .start_timer();

    let tenant_conf =
        TenantConfOpt::try_from(&request_data.config).map_err(ApiError::BadRequest)?;

    let state = get_state(&request);
    let generation = get_request_generation(state, request_data.generation)?;
    let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);

    mgr::duplicate_tenant(
        state.conf,
        tenant_conf,
        src_tenant_id,
        new_tenant_id,
        generation,
        state.tenant_resources(),
        &ctx,
    )
    .instrument(info_span!("tenant_duplicate", %src_tenant_id, tenant_id = %new_tenant_id))
    .await?;

    json_response(StatusCode::CREATED, TenantCreateResponse(new_tenant_id))
}

async fn tenant_list_handler(
    request: Request<Body>,
    _cancel: CancellationToken,
@@ -1728,6 +1767,9 @@ pub fn make_router(
        .post("/v1/tenant/:tenant_id/ignore", |r| {
            api_handler(r, tenant_ignore_handler)
        })
        .post("/v1/tenant/:tenant_id/duplicate", |r| {
            api_handler(r, tenant_duplicate_handler)
        })
        .get("/v1/tenant/:tenant_id/timeline/:timeline_id", |r| {
            api_handler(r, timeline_detail_handler)
        })
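Since the handler deserializes the body as a TenantCreateRequest, a client posts the new tenant's id plus optional tenant config overrides. A minimal sketch of such a body (illustrative only; the override field is an example, not mandated by this commit):

use serde_json::json;

// Payload shape for POST /v1/tenant/:tenant_id/duplicate, inferred from the
// handler above and the Python client further down. On success the endpoint
// replies 201 Created with the new tenant id.
fn example_duplicate_body(new_tenant_id: &str) -> serde_json::Value {
    json!({
        "new_tenant_id": new_tenant_id,
        // hypothetical override: any field TenantConfOpt accepts can ride along
        "checkpoint_distance": 1048576,
    })
}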

View File

@@ -51,6 +51,9 @@ pub enum StorageTimeOperation {
    #[strum(serialize = "create tenant")]
    CreateTenant,

    #[strum(serialize = "duplicate tenant")]
    DuplicateTenant,
}

pub static STORAGE_TIME_SUM_PER_TIMELINE: Lazy<CounterVec> = Lazy::new(|| {
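The handler above turns this variant into a metric label via strum's serialize string ("duplicate tenant") and times the whole request. A self-contained sketch of that timer pattern with the prometheus crate (the demo metric name is made up, not the pageserver's):

use once_cell::sync::Lazy;
use prometheus::{register_histogram_vec, HistogramVec};

static DEMO_STORAGE_TIME: Lazy<HistogramVec> = Lazy::new(|| {
    register_histogram_vec!(
        "demo_storage_time_seconds",
        "Demo: time spent on storage operations",
        &["operation"]
    )
    .expect("failed to register metric")
});

fn timed_section() {
    // strum's #[strum(serialize = "...")] string is what `.into()` yields
    // in the handler; the timer observes elapsed seconds when dropped.
    let _timer = DEMO_STORAGE_TIME
        .with_label_values(&["duplicate tenant"])
        .start_timer();
    // ... do the storage work here ...
}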

View File

@@ -4,8 +4,10 @@
use camino::{Utf8DirEntry, Utf8Path, Utf8PathBuf};
use rand::{distributions::Alphanumeric, Rng};
use std::collections::{hash_map, HashMap};
+use std::str::FromStr;
use std::sync::Arc;
use tokio::fs;
+use tokio::io::AsyncSeekExt;

use anyhow::Context;
use once_cell::sync::Lazy;
@@ -26,9 +28,11 @@ use crate::deletion_queue::DeletionQueueClient;
use crate::task_mgr::{self, TaskKind};
use crate::tenant::config::{AttachmentMode, LocationConf, LocationMode, TenantConfOpt};
use crate::tenant::delete::DeleteTenantFlow;
+use crate::tenant::span::debug_assert_current_span_has_tenant_id;
use crate::tenant::storage_layer::{DeltaLayer, ImageLayer, LayerFileName};
use crate::tenant::{
-    create_tenant_files, AttachMarkerMode, AttachedTenantConf, CreateTenantFilesMode, Tenant,
-    TenantState,
+    create_tenant_files, remote_timeline_client, AttachMarkerMode, AttachedTenantConf,
+    CreateTenantFilesMode, IndexPart, Tenant, TenantState,
};
use crate::{InitializationOrder, IGNORED_TENANT_FILE_NAME, TEMP_FILE_SUFFIX};
@@ -695,6 +699,159 @@ pub(crate) async fn create_tenant(
    }).await
}

pub(crate) async fn duplicate_tenant(
    conf: &'static PageServerConf,
    tenant_conf: TenantConfOpt,
    src_tenant_id: TenantId,
    new_tenant_id: TenantId,
    generation: Generation,
    resources: TenantSharedResources,
    ctx: &RequestContext,
) -> Result<(), TenantMapInsertError> {
    debug_assert_current_span_has_tenant_id();

    // TODO: would be nice to use tenant_map_insert here, but we're not ready to create a Tenant object yet
    let tempdir = path_with_suffix_extension(
        conf.tenants_path().join(&new_tenant_id.to_string()),
        &format!("duplication.{TEMP_FILE_SUFFIX}"),
    );
    tokio::fs::remove_dir_all(&tempdir)
        .await
        .or_else(|e| match e.kind() {
            std::io::ErrorKind::NotFound => Ok(()),
            _ => Err(e),
        })
        .context("pre-run clean up tempdir")?;
    tokio::fs::create_dir(&tempdir)
        .await
        .context("create tempdir")?;

    // Copy the tenant's data in S3
    let remote_storage = resources
        .remote_storage
        .as_ref()
        .context("only works with remote storage")?;
    let remote_src_timelines =
        remote_timeline_client::list_remote_timelines(remote_storage, src_tenant_id)
            .await
            .context("list src timelines")?;
    info!(?remote_src_timelines, "got src timelines");

    for timeline_id in remote_src_timelines {
        async {
            let tempdir = tempdir.join(&timeline_id.to_string());
            tokio::fs::create_dir(&tempdir)
                .await
                .context("create tempdir for timeline")?;
            let remote_src_tl =
                remote_timeline_client::remote_timeline_path(&src_tenant_id, &timeline_id);
            let remote_dst_tl =
                remote_timeline_client::remote_timeline_path(&new_tenant_id, &timeline_id);
            let object_names = remote_storage
                .list_prefixes(Some(&remote_src_tl))
                .await
                .context("list timeline remote prefix")?;
            for name in object_names {
                async {
                    let name = name.object_name().context(
                        "list_prefixes return values should always have object_name()=Some",
                    )?;
                    let remote_src_obj = remote_src_tl.join(name);
                    let remote_dst_obj = remote_dst_tl.join(name);
                    let tmp_obj_filepath = tempdir.join(name);
                    let mut tmp_obj_file = tokio::fs::OpenOptions::new()
                        .read(true)
                        .write(true)
                        .create_new(true)
                        .open(&tmp_obj_filepath)
                        .await
                        .context("create temp file")?;
                    let mut tmp_dl = remote_storage
                        .download(&remote_src_obj)
                        .await
                        .context("start download")?;
                    let tmp_obj_size =
                        tokio::io::copy(&mut tmp_dl.download_stream, &mut tmp_obj_file)
                            .await
                            .context("do the download")?;

                    if name == IndexPart::FILE_NAME {
                        // needs no patching
                    } else {
                        let name = LayerFileName::from_str(name).map_err(|e: String| {
                            anyhow::anyhow!("unknown key in timeline s3 prefix: {name:?}: {e}")
                        })?;
                        match name {
                            LayerFileName::Image(_) => {
                                ImageLayer::rewrite_tenant_timeline(
                                    &tmp_obj_filepath,
                                    new_tenant_id,
                                    timeline_id, /* leave as is */
                                    ctx,
                                )
                                .await
                                .context("rewrite tenant timeline")?;
                            }
                            LayerFileName::Delta(_) => {
                                DeltaLayer::rewrite_tenant_timeline(
                                    &tmp_obj_filepath,
                                    new_tenant_id,
                                    timeline_id, /* leave as is */
                                    ctx,
                                )
                                .await
                                .context("rewrite tenant timeline")?;
                            }
                        }
                    }

                    info!(?remote_dst_obj, "uploading");
                    tmp_obj_file
                        .seek(std::io::SeekFrom::Start(0))
                        .await
                        .context("seek tmp file to beginning for upload")?;
                    remote_storage
                        .upload(
                            tmp_obj_file,
                            tmp_obj_size as usize,
                            &remote_dst_obj,
                            tmp_dl.metadata,
                        )
                        .await
                        .context("upload modified")?;
                    tokio::fs::remove_file(tmp_obj_filepath)
                        .await
                        .context("remove temp file")?;
                    anyhow::Ok(())
                }
                .instrument(info_span!("copy object", object_name=?name))
                .await
                .context("copy object")?;
            }
            anyhow::Ok(())
        }
        .instrument(info_span!("copy_timeline", timeline_id=%timeline_id))
        .await?;
    }

    tokio::fs::remove_dir_all(&tempdir)
        .await
        .context("post-run clean up tempdir")?;

    attach_tenant(conf, new_tenant_id, generation, tenant_conf, resources, ctx).await
}
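The pre-run remove_dir_all above treats a missing tempdir as success, so cleanup is idempotent across retries. Factored out, that pattern looks like this (a sketch with a hypothetical helper name, not something this commit adds):

use std::io;

use camino::Utf8Path;

/// Hypothetical helper: recursively remove a directory, mapping NotFound
/// to success so cleanup can run unconditionally before a retry.
async fn remove_dir_all_if_exists(path: &Utf8Path) -> io::Result<()> {
    match tokio::fs::remove_dir_all(path).await {
        Ok(()) => Ok(()),
        Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(()),
        Err(e) => Err(e),
    }
}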
#[derive(Debug, thiserror::Error)]
pub(crate) enum SetNewTenantConfigError {
    #[error(transparent)]

View File

@@ -844,6 +844,49 @@ impl Drop for DeltaLayerWriter {
    }
}

impl DeltaLayer {
    /// Assume the file at `path` is corrupt if this function returns with an error.
    pub(crate) async fn rewrite_tenant_timeline(
        path: &Utf8Path,
        new_tenant: TenantId,
        new_timeline: TimelineId,
        ctx: &RequestContext,
    ) -> anyhow::Result<()> {
        let file = VirtualFile::open_with_options(
            path,
            &*std::fs::OpenOptions::new().read(true).write(true),
        )
        .await
        .with_context(|| format!("Failed to open file '{}'", path))?;
        let file = FileBlockReader::new(file);
        let summary_blk = file.read_blk(0, ctx).await?;
        let actual_summary = Summary::des_prefix(summary_blk.as_ref())?;
        let mut file = file.file;

        if actual_summary.magic != DELTA_FILE_MAGIC {
            bail!("File '{}' is not a delta layer", path);
        }

        let new_summary = Summary {
            tenant_id: new_tenant,
            timeline_id: new_timeline,
            ..actual_summary
        };

        let mut buf = smallvec::SmallVec::<[u8; PAGE_SZ]>::new();
        Summary::ser_into(&new_summary, &mut buf)?;
        if buf.spilled() {
            // The code in ImageLayerWriterInner just warn!()s for this.
            // It should probably error out as well.
            anyhow::bail!(
                "Used more than one page size for summary buffer: {}",
                buf.len()
            );
        }
        file.seek(SeekFrom::Start(0)).await?;
        file.write_all(&buf).await?;
        Ok(())
    }
}
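This in-place rewrite relies on the tenant and timeline ids appearing only in the Summary serialized at offset 0, so patching that one page re-homes the entire layer file. A self-contained toy showing the same header-patch trick (made-up byte layout, not the pageserver's Summary format):

use std::fs::OpenOptions;
use std::io::{Read, Seek, SeekFrom, Write};

// Toy layout: 2-byte magic, then a 16-byte id field. The real layer file
// serializes a Summary struct into its first PAGE_SZ bytes instead.
fn patch_header(path: &str, new_id: [u8; 16]) -> std::io::Result<()> {
    let mut f = OpenOptions::new().read(true).write(true).open(path)?;
    let mut magic = [0u8; 2];
    f.read_exact(&mut magic)?;
    // Verify the file type before touching it, mirroring the magic check above.
    assert_eq!(magic, *b"ZL", "not a toy layer file");
    // Overwrite the id in place; everything past the header stays untouched.
    f.seek(SeekFrom::Start(2))?;
    f.write_all(&new_id)?;
    Ok(())
}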
impl DeltaLayerInner {
    pub(super) async fn load(
        path: &Utf8Path,

View File

@@ -436,6 +436,49 @@ impl ImageLayer {
    }
}

impl ImageLayer {
    /// Assume the file at `path` is corrupt if this function returns with an error.
    pub(crate) async fn rewrite_tenant_timeline(
        path: &Utf8Path,
        new_tenant: TenantId,
        new_timeline: TimelineId,
        ctx: &RequestContext,
    ) -> anyhow::Result<()> {
        let file = VirtualFile::open_with_options(
            path,
            &*std::fs::OpenOptions::new().read(true).write(true),
        )
        .await
        .with_context(|| format!("Failed to open file '{}'", path))?;
        let file = FileBlockReader::new(file);
        let summary_blk = file.read_blk(0, ctx).await?;
        let actual_summary = Summary::des_prefix(summary_blk.as_ref())?;
        let mut file = file.file;

        if actual_summary.magic != IMAGE_FILE_MAGIC {
            bail!("File '{}' is not an image layer", path);
        }

        let new_summary = Summary {
            tenant_id: new_tenant,
            timeline_id: new_timeline,
            ..actual_summary
        };

        let mut buf = smallvec::SmallVec::<[u8; PAGE_SZ]>::new();
        Summary::ser_into(&new_summary, &mut buf)?;
        if buf.spilled() {
            // The code in ImageLayerWriterInner just warn!()s for this.
            // It should probably error out as well.
            anyhow::bail!(
                "Used more than one page size for summary buffer: {}",
                buf.len()
            );
        }
        file.seek(SeekFrom::Start(0)).await?;
        file.write_all(&buf).await?;
        Ok(())
    }
}
impl ImageLayerInner {
    pub(super) async fn load(
        path: &Utf8Path,

View File

@@ -215,6 +215,25 @@ class PageserverHttpClient(requests.Session):
        assert isinstance(new_tenant_id, str)
        return TenantId(new_tenant_id)

    def tenant_duplicate(
        self, src_tenant_id: TenantId, new_tenant_id: TenantId, conf: Optional[Dict[str, Any]] = None
    ) -> TenantId:
        if conf is not None:
            assert "new_tenant_id" not in conf.keys()

        res = self.post(
            f"http://localhost:{self.port}/v1/tenant/{src_tenant_id}/duplicate",
            json={
                "new_tenant_id": str(new_tenant_id),
                **(conf or {}),
            },
        )
        self.verbose_error(res)
        if res.status_code == 409:
            raise Exception(f"could not duplicate tenant: already exists for id {new_tenant_id}")
        new_tenant_id = res.json()
        assert isinstance(new_tenant_id, str)
        return TenantId(new_tenant_id)

    def tenant_attach(
        self,
        tenant_id: TenantId,

View File

@@ -0,0 +1,54 @@
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
    NeonEnvBuilder,
    last_flush_lsn_upload,
)
from fixtures.remote_storage import (
    RemoteStorageKind,
)
from fixtures.types import TenantId


def test_tenant_duplicate(
    neon_env_builder: NeonEnvBuilder,
):
    neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
    env = neon_env_builder.init_start()

    with env.endpoints.create_start("main", tenant_id=env.initial_tenant) as ep_main:
        ep_main.safe_psql("CREATE TABLE foo (i int);")
        ep_main.safe_psql("INSERT INTO foo VALUES (1), (2), (3);")
        last_flush_lsn = last_flush_lsn_upload(
            env, ep_main, env.initial_tenant, env.initial_timeline
        )

    new_tenant_id = TenantId.generate()
    # timeline id remains unchanged with tenant_duplicate
    # TODO: implement a remapping scheme so timeline ids remain globally unique
    new_timeline_id = env.initial_timeline
    log.info(f"Duplicate tenant/timeline will be: {new_tenant_id}/{new_timeline_id}")

    ps_http = env.pageserver.http_client()
    ps_http.tenant_duplicate(env.initial_tenant, new_tenant_id)
    ps_http.tenant_delete(env.initial_tenant)

    env.neon_cli.map_branch("duplicate", new_tenant_id, new_timeline_id)

    # start a read-only replica and validate the data
    with env.endpoints.create_start(
        "duplicate", tenant_id=new_tenant_id, lsn=last_flush_lsn
    ) as ep_dup:
        with ep_dup.connect() as conn:
            with conn.cursor() as cur:
                cur.execute("SELECT * FROM foo ORDER BY i;")
                assert cur.fetchall() == [(1,), (2,), (3,)]

    # ensure restarting the pageserver works
    env.pageserver.stop()
    env.pageserver.start()