find a way to duplicate a tenant in local_fs

Use the script like so, against the tenant to duplicate:

    poetry run python3 ./test_runner/duplicate_tenant.py 7ea51af32d42bfe7fb93bf5f28114d09  200 8

backup of pageserver.toml

    d =1
    pg_distrib_dir ='/home/admin/neon-main/pg_install'
    http_auth_type ='Trust'
    pg_auth_type ='Trust'
    listen_http_addr ='127.0.0.1:9898'
    listen_pg_addr ='127.0.0.1:64000'
    broker_endpoint ='http://127.0.0.1:50051/'
    #control_plane_api ='http://127.0.0.1:1234/'

    # Initial configuration file created by 'pageserver --init'
    #listen_pg_addr = '127.0.0.1:64000'
    #listen_http_addr = '127.0.0.1:9898'

    #wait_lsn_timeout = '60 s'
    #wal_redo_timeout = '60 s'

    #max_file_descriptors = 10000
    #page_cache_size = 160000

    # initial superuser role name to use when creating a new tenant
    #initial_superuser_name = 'cloud_admin'

    #broker_endpoint = 'http://127.0.0.1:50051'

    #log_format = 'plain'

    #concurrent_tenant_size_logical_size_queries = '1'

    #metric_collection_interval = '10 min'
    #cached_metric_collection_interval = '0s'
    #synthetic_size_calculation_interval = '10 min'

    #disk_usage_based_eviction = { max_usage_pct = .., min_avail_bytes = .., period = "10s"}

    #background_task_maximum_delay = '10s'

    [tenant_config]
    #checkpoint_distance = 268435456 # in bytes
    #checkpoint_timeout = 10 m
    #compaction_target_size = 134217728 # in bytes
    #compaction_period = '20 s'
    #compaction_threshold = 10

    #gc_period = '1 hr'
    #gc_horizon = 67108864
    #image_creation_threshold = 3
    #pitr_interval = '7 days'

    #min_resident_size_override = .. # in bytes
    #evictions_low_residence_duration_metric_threshold = '24 hour'
    #gc_feedback = false

    # make it determinsitic
    gc_period = '0s'
    checkpoint_timeout = '3650 day'
    compaction_period = '20 s'
    compaction_threshold = 10
    compaction_target_size = 134217728
    checkpoint_distance = 268435456
    image_creation_threshold = 3

    [remote_storage]
    local_path = '/home/admin/neon-main/bench_repo_dir/repo/remote_storage_local_fs'

remove http handler

switch to generalized rewrite_summary  & impl page_ctl subcommand to use it

WIP: change duplicate_tenant.py script to use the pagectl command

The script works but at restart, we detach the created tenants because
they're not known to the attachment service:

  Detaching tenant, control plane omitted it in re-attach response tenant_id=1e399d390e3aee6b11c701cbc716bb6c

=> figure out how to further integrate this
This commit is contained in:
Christian Schwarz
2023-10-26 16:30:11 +00:00
parent 1b81640290
commit ccb9fe9b33
16 changed files with 317 additions and 27 deletions

View File

@@ -371,6 +371,8 @@ pub struct TenantInfo {
/// If a layer is present in both local FS and S3, it counts only once.
pub current_physical_size: Option<u64>, // physical size is only included in `tenant_status` endpoint
pub attachment_status: TenantAttachmentStatus,
#[serde(skip_serializing_if = "Option::is_none")]
pub generation: Option<u32>,
}
/// This represents the output of the "timeline_detail" and "timeline_list" API calls.
@@ -515,6 +517,8 @@ pub enum HistoricLayerInfo {
lsn_end: Lsn,
remote: bool,
access_stats: LayerAccessStats,
remote_path: Option<String>,
},
Image {
layer_file_name: String,
@@ -523,6 +527,8 @@ pub enum HistoricLayerInfo {
lsn_start: Lsn,
remote: bool,
access_stats: LayerAccessStats,
remote_path: Option<String>,
},
}
@@ -862,6 +868,7 @@ mod tests {
state: TenantState::Active,
current_physical_size: Some(42),
attachment_status: TenantAttachmentStatus::Attached,
generation: None,
};
let expected_active = json!({
"id": original_active.id.to_string(),
@@ -882,6 +889,7 @@ mod tests {
},
current_physical_size: Some(42),
attachment_status: TenantAttachmentStatus::Attached,
generation: None,
};
let expected_broken = json!({
"id": original_broken.id.to_string(),

View File

@@ -81,6 +81,12 @@ impl std::fmt::Display for RemotePath {
}
}
impl From<RemotePath> for String {
fn from(val: RemotePath) -> Self {
val.0.into()
}
}
impl RemotePath {
pub fn new(relative_path: &Utf8Path) -> anyhow::Result<Self> {
anyhow::ensure!(
@@ -102,7 +108,7 @@ impl RemotePath {
self.0.file_name()
}
pub fn join(&self, segment: &Utf8Path) -> Self {
pub fn join<P: AsRef<Utf8Path>>(&self, segment: P) -> Self {
Self(self.0.join(segment))
}

View File

@@ -1,13 +1,15 @@
use std::path::{Path, PathBuf};
use anyhow::Result;
use camino::Utf8Path;
use camino::{Utf8Path, Utf8PathBuf};
use clap::Subcommand;
use pageserver::context::{DownloadBehavior, RequestContext};
use pageserver::task_mgr::TaskKind;
use pageserver::tenant::block_io::BlockCursor;
use pageserver::tenant::disk_btree::DiskBtreeReader;
use pageserver::tenant::storage_layer::delta_layer::{BlobRef, Summary};
use pageserver::tenant::storage_layer::{delta_layer, image_layer};
use pageserver::tenant::storage_layer::{DeltaLayer, ImageLayer};
use pageserver::tenant::{TENANTS_SEGMENT_NAME, TIMELINES_SEGMENT_NAME};
use pageserver::{page_cache, virtual_file};
use pageserver::{
@@ -20,6 +22,7 @@ use pageserver::{
};
use std::fs;
use utils::bin_ser::BeSer;
use utils::id::{TenantId, TimelineId};
use crate::layer_map_analyzer::parse_filename;
@@ -45,6 +48,13 @@ pub(crate) enum LayerCmd {
/// The id from list-layer command
id: usize,
},
RewriteSummary {
layer_file_path: Utf8PathBuf,
#[clap(long)]
new_tenant_id: Option<TenantId>,
#[clap(long)]
new_timeline_id: Option<TimelineId>,
},
}
async fn read_delta_file(path: impl AsRef<Path>, ctx: &RequestContext) -> Result<()> {
@@ -100,6 +110,7 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
println!("- timeline {}", timeline.file_name().to_string_lossy());
}
}
Ok(())
}
LayerCmd::ListLayer {
path,
@@ -128,6 +139,7 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
idx += 1;
}
}
Ok(())
}
LayerCmd::DumpLayer {
path,
@@ -168,7 +180,63 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
idx += 1;
}
}
Ok(())
}
LayerCmd::RewriteSummary {
layer_file_path,
new_tenant_id,
new_timeline_id,
} => {
pageserver::virtual_file::init(10);
pageserver::page_cache::init(100);
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
macro_rules! rewrite_closure {
($($summary_ty:tt)*) => {{
|summary| $($summary_ty)* {
tenant_id: new_tenant_id.unwrap_or(summary.tenant_id),
timeline_id: new_timeline_id.unwrap_or(summary.timeline_id),
..summary
}
}};
}
let res = ImageLayer::rewrite_summary(
layer_file_path,
rewrite_closure!(image_layer::Summary),
&ctx,
)
.await;
match res {
Ok(()) => {
println!("Successfully rewrote summary of image layer {layer_file_path}");
return Ok(());
}
Err(image_layer::RewriteSummaryError::MagicMismatch) => (), // fallthrough
Err(image_layer::RewriteSummaryError::Other(e)) => {
return Err(e);
}
}
let res = DeltaLayer::rewrite_summary(
layer_file_path,
rewrite_closure!(delta_layer::Summary),
&ctx,
)
.await;
match res {
Ok(()) => {
println!("Successfully rewrote summary of delta layer {layer_file_path}");
return Ok(());
}
Err(delta_layer::RewriteSummaryError::MagicMismatch) => (), // fallthrough
Err(delta_layer::RewriteSummaryError::Other(e)) => {
return Err(e);
}
}
anyhow::bail!("not an image or delta layer: {layer_file_path}");
}
}
Ok(())
}

View File

@@ -261,7 +261,7 @@ async fn calculate_synthetic_size_worker(
}
};
for (tenant_id, tenant_state) in tenants {
for (tenant_id, tenant_state, _gen) in tenants {
if tenant_state != TenantState::Active {
continue;
}

View File

@@ -197,7 +197,7 @@ pub(super) async fn collect_all_metrics(
}
};
let tenants = futures::stream::iter(tenants).filter_map(|(id, state)| async move {
let tenants = futures::stream::iter(tenants).filter_map(|(id, state, _)| async move {
if state != TenantState::Active {
None
} else {

View File

@@ -345,7 +345,7 @@ impl DeletionList {
result.extend(
timeline_layers
.into_iter()
.map(|l| timeline_remote_path.join(&Utf8PathBuf::from(l))),
.map(|l| timeline_remote_path.join(Utf8PathBuf::from(l))),
);
}
}

View File

@@ -541,7 +541,7 @@ async fn collect_eviction_candidates(
let mut candidates = Vec::new();
for (tenant_id, _state) in &tenants {
for (tenant_id, _state, _gen) in &tenants {
if cancel.is_cancelled() {
return Ok(EvictionCandidates::Cancelled);
}

View File

@@ -764,11 +764,12 @@ async fn tenant_list_handler(
ApiError::ResourceUnavailable("Tenant map is initializing or shutting down".into())
})?
.iter()
.map(|(id, state)| TenantInfo {
.map(|(id, state, gen)| TenantInfo {
id: *id,
state: state.clone(),
current_physical_size: None,
attachment_status: state.attachment_status(),
generation: (*gen).into(),
})
.collect::<Vec<TenantInfo>>();
@@ -797,6 +798,7 @@ async fn tenant_status(
state: state.clone(),
current_physical_size: Some(current_physical_size),
attachment_status: state.attachment_status(),
generation: tenant.generation().into(),
})
}
.instrument(info_span!("tenant_status_handler", %tenant_id))

View File

@@ -1715,6 +1715,10 @@ impl Tenant {
self.current_state() == TenantState::Active
}
pub fn generation(&self) -> Generation {
self.generation
}
/// Changes tenant status to active, unless shutdown was already requested.
///
/// `background_jobs_can_start` is an optional barrier set to a value during pageserver startup

View File

@@ -1397,7 +1397,8 @@ pub(crate) enum TenantMapListError {
///
/// Get list of tenants, for the mgmt API
///
pub(crate) async fn list_tenants() -> Result<Vec<(TenantId, TenantState)>, TenantMapListError> {
pub(crate) async fn list_tenants(
) -> Result<Vec<(TenantId, TenantState, Generation)>, TenantMapListError> {
let tenants = TENANTS.read().unwrap();
let m = match &*tenants {
TenantsMap::Initializing => return Err(TenantMapListError::Initializing),
@@ -1405,12 +1406,12 @@ pub(crate) async fn list_tenants() -> Result<Vec<(TenantId, TenantState)>, Tenan
};
Ok(m.iter()
.filter_map(|(id, tenant)| match tenant {
TenantSlot::Attached(tenant) => Some((id, tenant.current_state())),
TenantSlot::Attached(tenant) => Some((id, tenant.current_state(), tenant.generation())),
TenantSlot::Secondary => None,
TenantSlot::InProgress(_) => None,
})
// TODO(sharding): make callers of this function shard-aware
.map(|(k, v)| (k.tenant_id, v))
.map(|(a, b, c)| (a.tenant_id, b, c))
.collect())
}

View File

@@ -2,7 +2,7 @@
pub mod delta_layer;
mod filename;
mod image_layer;
pub mod image_layer;
mod inmemory_layer;
mod layer;
mod layer_desc;

View File

@@ -69,13 +69,13 @@ use super::{AsLayerDesc, LayerAccessStats, PersistentLayerDesc, ResidentLayer};
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct Summary {
/// Magic value to identify this as a neon delta file. Always DELTA_FILE_MAGIC.
magic: u16,
format_version: u16,
pub magic: u16,
pub format_version: u16,
tenant_id: TenantId,
timeline_id: TimelineId,
key_range: Range<Key>,
lsn_range: Range<Lsn>,
pub tenant_id: TenantId,
pub timeline_id: TimelineId,
pub key_range: Range<Key>,
pub lsn_range: Range<Lsn>,
/// Block number where the 'index' part of the file begins.
pub index_start_blk: u32,
@@ -611,6 +611,61 @@ impl Drop for DeltaLayerWriter {
}
}
#[derive(thiserror::Error, Debug)]
pub enum RewriteSummaryError {
#[error("magic mismatch")]
MagicMismatch,
#[error(transparent)]
Other(#[from] anyhow::Error),
}
impl From<std::io::Error> for RewriteSummaryError {
fn from(e: std::io::Error) -> Self {
Self::Other(anyhow::anyhow!(e))
}
}
impl DeltaLayer {
pub async fn rewrite_summary<F>(
path: &Utf8Path,
rewrite: F,
ctx: &RequestContext,
) -> Result<(), RewriteSummaryError>
where
F: Fn(Summary) -> Summary,
{
let file = VirtualFile::open_with_options(
path,
&*std::fs::OpenOptions::new().read(true).write(true),
)
.await
.with_context(|| format!("Failed to open file '{}'", path))?;
let file = FileBlockReader::new(file);
let summary_blk = file.read_blk(0, ctx).await?;
let actual_summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?;
let mut file = file.file;
if actual_summary.magic != DELTA_FILE_MAGIC {
return Err(RewriteSummaryError::MagicMismatch);
}
let new_summary = rewrite(actual_summary);
let mut buf = smallvec::SmallVec::<[u8; PAGE_SZ]>::new();
Summary::ser_into(&new_summary, &mut buf).context("serialize")?;
if buf.spilled() {
// The code in DeltaLayerWriterInner just warn!()s for this.
// It should probably error out as well.
return Err(RewriteSummaryError::Other(anyhow::anyhow!(
"Used more than one page size for summary buffer: {}",
buf.len()
)));
}
file.seek(SeekFrom::Start(0)).await?;
file.write_all(&buf).await?;
Ok(())
}
}
impl DeltaLayerInner {
/// Returns nested result following Result<Result<_, OpErr>, Critical>:
/// - inner has the success or transient failure

View File

@@ -67,20 +67,20 @@ use super::{AsLayerDesc, Layer, PersistentLayerDesc, ResidentLayer};
/// the 'index' starts at the block indicated by 'index_start_blk'
///
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub(super) struct Summary {
pub struct Summary {
/// Magic value to identify this as a neon image file. Always IMAGE_FILE_MAGIC.
magic: u16,
format_version: u16,
pub magic: u16,
pub format_version: u16,
tenant_id: TenantId,
timeline_id: TimelineId,
key_range: Range<Key>,
lsn: Lsn,
pub tenant_id: TenantId,
pub timeline_id: TimelineId,
pub key_range: Range<Key>,
pub lsn: Lsn,
/// Block number where the 'index' part of the file begins.
index_start_blk: u32,
pub index_start_blk: u32,
/// Block within the 'index', where the B-tree root page is stored
index_root_blk: u32,
pub index_root_blk: u32,
// the 'values' part starts after the summary header, on block 1.
}
@@ -296,6 +296,61 @@ impl ImageLayer {
}
}
#[derive(thiserror::Error, Debug)]
pub enum RewriteSummaryError {
#[error("magic mismatch")]
MagicMismatch,
#[error(transparent)]
Other(#[from] anyhow::Error),
}
impl From<std::io::Error> for RewriteSummaryError {
fn from(e: std::io::Error) -> Self {
Self::Other(anyhow::anyhow!(e))
}
}
impl ImageLayer {
pub async fn rewrite_summary<F>(
path: &Utf8Path,
rewrite: F,
ctx: &RequestContext,
) -> Result<(), RewriteSummaryError>
where
F: Fn(Summary) -> Summary,
{
let file = VirtualFile::open_with_options(
path,
&*std::fs::OpenOptions::new().read(true).write(true),
)
.await
.with_context(|| format!("Failed to open file '{}'", path))?;
let file = FileBlockReader::new(file);
let summary_blk = file.read_blk(0, ctx).await?;
let actual_summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?;
let mut file = file.file;
if actual_summary.magic != IMAGE_FILE_MAGIC {
return Err(RewriteSummaryError::MagicMismatch);
}
let new_summary = rewrite(actual_summary);
let mut buf = smallvec::SmallVec::<[u8; PAGE_SZ]>::new();
Summary::ser_into(&new_summary, &mut buf).context("serialize")?;
if buf.spilled() {
// The code in ImageLayerWriterInner just warn!()s for this.
// It should probably error out as well.
return Err(RewriteSummaryError::Other(anyhow::anyhow!(
"Used more than one page size for summary buffer: {}",
buf.len()
)));
}
file.seek(SeekFrom::Start(0)).await?;
file.write_all(&buf).await?;
Ok(())
}
}
impl ImageLayerInner {
/// Returns nested result following Result<Result<_, OpErr>, Critical>:
/// - inner has the success or transient failure

View File

@@ -3,6 +3,7 @@ use camino::{Utf8Path, Utf8PathBuf};
use pageserver_api::models::{
HistoricLayerInfo, LayerAccessKind, LayerResidenceEventReason, LayerResidenceStatus,
};
use remote_storage::RemotePath;
use std::ops::Range;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Weak};
@@ -305,6 +306,12 @@ impl Layer {
&self.0.path
}
/// This can return None even though it should return Some in some edge cases.
#[allow(unused)]
pub(crate) fn remote_path(&self) -> Option<RemotePath> {
self.0.remote_path()
}
pub(crate) fn metadata(&self) -> LayerFileMetadata {
self.0.metadata()
}
@@ -918,6 +925,17 @@ impl LayerInner {
}
}
/// This can return None even though it should return Some in some edge cases.
fn remote_path(&self) -> Option<RemotePath> {
let tl = self.timeline.upgrade()?; // TODO: should distinguish this case, but, accuracy doesn't matter for this field.
Some(crate::tenant::remote_timeline_client::remote_layer_path(
&tl.tenant_id,
&tl.timeline_id,
&self.desc.filename(),
self.generation,
))
}
fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo {
let layer_file_name = self.desc.filename().file_name();
@@ -937,6 +955,7 @@ impl LayerInner {
lsn_end: lsn_range.end,
remote,
access_stats,
remote_path: self.remote_path().map(|p| p.into()),
}
} else {
let lsn = self.desc.image_layer_lsn();
@@ -947,6 +966,7 @@ impl LayerInner {
lsn_start: lsn,
remote,
access_stats,
remote_path: self.remote_path().map(|p| p.into()),
}
}
}

View File

@@ -0,0 +1,69 @@
# Usage from top of repo:
# poetry run python3 ./test_runner/duplicate_tenant.py c66e2e233057f7f05563caff664ecb14 .neon/remote_storage_local_fs
import argparse
import shutil
import subprocess
import time
from pathlib import Path
from fixtures.pageserver.http import PageserverHttpClient
from fixtures.types import TenantId
parser = argparse.ArgumentParser(description="Duplicate tenant script.")
parser.add_argument("initial_tenant", type=str, help="Initial tenant")
parser.add_argument("remote_storage_local_fs_root", type=Path, help="Remote storage local fs root")
parser.add_argument("--ncopies", type=int, help="Number of copies")
parser.add_argument("--numthreads", type=int, default=1, help="Number of threads")
parser.add_argument("--port", type=int, default=9898, help="Pageserver management api port")
args = parser.parse_args()
initial_tenant = args.initial_tenant
remote_storage_local_fs_root: Path = args.remote_storage_local_fs_root
ncopies = args.ncopies
numthreads = args.numthreads
new_tenant = TenantId.generate()
print(f"New tenant: {new_tenant}")
client = PageserverHttpClient(args.port, lambda: None)
src_tenant_gen = int(client.tenant_status(initial_tenant)["generation"])
assert remote_storage_local_fs_root.is_dir(), f"{remote_storage_local_fs_root} is not a directory"
src_timelines_dir: Path = remote_storage_local_fs_root / "tenants" / initial_tenant / "timelines"
assert src_timelines_dir.is_dir(), f"{src_timelines_dir} is not a directory"
dst_timelines_dir: Path = remote_storage_local_fs_root / "tenants" / str(new_tenant) / "timelines"
dst_timelines_dir.parent.mkdir(parents=False, exist_ok=False)
dst_timelines_dir.mkdir(parents=False, exist_ok=False)
for tl in src_timelines_dir.iterdir():
src_tl_dir = src_timelines_dir / tl.name
assert src_tl_dir.is_dir(), f"{src_tl_dir} is not a directory"
dst_tl_dir = dst_timelines_dir / tl.name
dst_tl_dir.mkdir(parents=False, exist_ok=False)
for file in tl.iterdir():
shutil.copy2(file, dst_tl_dir)
if "__" in file.name:
cmd = [
"./target/debug/pagectl", # TODO: abstract this like the other binaries
"layer",
"rewrite-summary",
str(dst_tl_dir / file.name),
"--new-tenant-id",
str(new_tenant),
]
subprocess.run(cmd, check=True)
client.tenant_attach(new_tenant, generation=src_tenant_gen)
while True:
status = client.tenant_status(new_tenant)
if status["state"]["slug"] == "Active":
break
print("Waiting for tenant to be active..., is: " + status["state"]["slug"])
time.sleep(1)
print("Tenant is active: " + str(new_tenant))

View File

@@ -58,6 +58,7 @@ class HistoricLayerInfo:
lsn_start: str
lsn_end: Optional[str]
remote: bool
remote_path: Optional[str] = None
@classmethod
def from_json(cls, d: Dict[str, Any]) -> HistoricLayerInfo:
@@ -68,6 +69,7 @@ class HistoricLayerInfo:
lsn_start=d["lsn_start"],
lsn_end=d.get("lsn_end"),
remote=d["remote"],
remote_path=d.get("remote_path"),
)