pageserver: add Generation type to Tenant, Timeline & Index

Author: John Spray
Date: 2023-08-24 14:50:35 +01:00
parent dd033d9138
commit 930de712ee
6 changed files with 94 additions and 16 deletions

Changed file 1 of 6:

@@ -85,6 +85,7 @@ pub use pageserver_api::models::TenantState;
use toml_edit;
use utils::{
crashsafe,
+ generation::Generation,
id::{TenantId, TimelineId},
lsn::{Lsn, RecordLsn},
};
@@ -178,6 +179,10 @@ pub struct Tenant {
tenant_conf: Arc<RwLock<TenantConfOpt>>,
tenant_id: TenantId,
+ // The remote storage generation, used to protect S3 objects from split-brain
+ generation: Generation,
timelines: Mutex<HashMap<TimelineId, Arc<Timeline>>>,
// This mutex prevents creation of new timelines during GC.
// Adding yet another mutex (in addition to `timelines`) is needed because holding
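For reference, the `Generation` type this commit threads through comes from `utils::generation` (see the import in the first hunk); its definition is not part of this diff. Below is a minimal sketch consistent with the three constructors the commit exercises (`new`, `none` and `broken`); treat it as an assumption, not the real definition:

// Sketch only: the real type lives in utils::generation and is not shown here.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum Generation {
    // No generation number available, e.g. call sites not yet wired up to one.
    None,
    // Sentinel used when spawning a permanently broken tenant.
    Broken,
    // A valid generation number issued for a tenant attachment.
    Valid(u32),
}

impl Generation {
    pub fn new(n: u32) -> Self {
        Generation::Valid(n)
    }
    pub fn none() -> Self {
        Generation::None
    }
    pub fn broken() -> Self {
        Generation::Broken
    }
}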
@@ -522,6 +527,7 @@ impl Tenant {
pub(crate) fn spawn_attach(
conf: &'static PageServerConf,
tenant_id: TenantId,
+ generation: Generation,
broker_client: storage_broker::BrokerClientChannel,
tenants: &'static tokio::sync::RwLock<TenantsMap>,
remote_storage: GenericRemoteStorage,
@@ -538,6 +544,7 @@ impl Tenant {
tenant_conf,
wal_redo_manager,
tenant_id,
+ generation,
Some(remote_storage.clone()),
));
@@ -851,6 +858,7 @@ impl Tenant {
TenantConfOpt::default(),
wal_redo_manager,
tenant_id,
+ Generation::broken(),
None,
))
}
@@ -868,6 +876,7 @@ impl Tenant {
pub(crate) fn spawn_load(
conf: &'static PageServerConf,
tenant_id: TenantId,
+ generation: Generation,
resources: TenantSharedResources,
init_order: Option<InitializationOrder>,
tenants: &'static tokio::sync::RwLock<TenantsMap>,
@@ -893,6 +902,7 @@ impl Tenant {
tenant_conf,
wal_redo_manager,
tenant_id,
+ generation,
remote_storage.clone(),
);
let tenant = Arc::new(tenant);
@@ -2274,6 +2284,7 @@ impl Tenant {
ancestor,
new_timeline_id,
self.tenant_id,
+ self.generation,
Arc::clone(&self.walredo_mgr),
resources,
pg_version,
@@ -2291,6 +2302,7 @@ impl Tenant {
tenant_conf: TenantConfOpt,
walredo_mgr: Arc<dyn WalRedoManager + Send + Sync>,
tenant_id: TenantId,
+ generation: Generation,
remote_storage: Option<GenericRemoteStorage>,
) -> Tenant {
let (state, mut rx) = watch::channel(state);
@@ -2349,6 +2361,7 @@ impl Tenant {
Tenant {
tenant_id,
+ generation,
conf,
// using now here is a good enough approximation to catch tenants with really long
// activation times.
@@ -3454,6 +3467,7 @@ pub mod harness {
pub conf: &'static PageServerConf,
pub tenant_conf: TenantConf,
pub tenant_id: TenantId,
+ pub generation: Generation,
}
static LOG_HANDLE: OnceCell<()> = OnceCell::new();
@@ -3495,6 +3509,7 @@ pub mod harness {
conf,
tenant_conf,
tenant_id,
+ generation: Generation::new(0xdeadbeef),
})
}
@@ -3521,6 +3536,7 @@ pub mod harness {
TenantConfOpt::from(self.tenant_conf),
walredo_mgr,
self.tenant_id,
+ self.generation,
remote_storage,
));
tenant

Changed file 2 of 6:

@@ -25,6 +25,7 @@ use crate::tenant::{create_tenant_files, CreateTenantFilesMode, Tenant, TenantSt
use crate::{InitializationOrder, IGNORED_TENANT_FILE_NAME};
use utils::fs_ext::PathExt;
+ use utils::generation::Generation;
use utils::id::{TenantId, TimelineId};
use super::delete::DeleteTenantError;
@@ -202,6 +203,7 @@ pub(crate) fn schedule_local_tenant_processing(
match Tenant::spawn_attach(
conf,
tenant_id,
+ Generation::none(),
resources.broker_client,
tenants,
remote_storage,
@@ -224,7 +226,15 @@ pub(crate) fn schedule_local_tenant_processing(
} else {
info!("tenant {tenant_id} is assumed to be loadable, starting load operation");
// Start loading the tenant into memory. It will initially be in Loading state.
- Tenant::spawn_load(conf, tenant_id, resources, init_order, tenants, ctx)
+ Tenant::spawn_load(
+     conf,
+     tenant_id,
+     Generation::none(),
+     resources,
+     init_order,
+     tenants,
+     ctx,
+ )
};
Ok(tenant)
}

Changed file 3 of 6:

@@ -1367,7 +1367,7 @@ mod tests {
context::RequestContext,
tenant::{
harness::{TenantHarness, TIMELINE_ID},
- Tenant, Timeline,
+ Generation, Tenant, Timeline,
},
DEFAULT_PG_VERSION,
};
@@ -1542,16 +1542,18 @@ mod tests {
std::fs::write(timeline_path.join(filename.file_name()), content).unwrap();
}
+ let generation = Generation::new(0xdeadbeef);
client
.schedule_layer_file_upload(
&layer_file_name_1,
- &LayerFileMetadata::new(content_1.len() as u64),
+ &LayerFileMetadata::new(content_1.len() as u64, generation),
)
.unwrap();
client
.schedule_layer_file_upload(
&layer_file_name_2,
- &LayerFileMetadata::new(content_2.len() as u64),
+ &LayerFileMetadata::new(content_2.len() as u64, generation),
)
.unwrap();
@@ -1615,7 +1617,7 @@ mod tests {
client
.schedule_layer_file_upload(
&layer_file_name_3,
- &LayerFileMetadata::new(content_3.len() as u64),
+ &LayerFileMetadata::new(content_3.len() as u64, generation),
)
.unwrap();
client
@@ -1703,12 +1705,14 @@ mod tests {
// Test
+ let generation = Generation::new(0xdeadbeef);
let init = get_bytes_started_stopped();
client
.schedule_layer_file_upload(
&layer_file_name_1,
- &LayerFileMetadata::new(content_1.len() as u64),
+ &LayerFileMetadata::new(content_1.len() as u64, generation),
)
.unwrap();

Changed file 4 of 6:

@@ -12,6 +12,7 @@ use utils::bin_ser::SerializeError;
use crate::tenant::metadata::TimelineMetadata;
use crate::tenant::storage_layer::LayerFileName;
use crate::tenant::upload_queue::UploadQueueInitialized;
+ use crate::tenant::Generation;
use utils::lsn::Lsn;
@@ -23,19 +24,26 @@ use utils::lsn::Lsn;
#[cfg_attr(test, derive(Default))]
pub struct LayerFileMetadata {
file_size: u64,
+ // Optional for backward compatibility: older data will not have a generation set
+ generation: Option<Generation>,
}
impl From<&'_ IndexLayerMetadata> for LayerFileMetadata {
fn from(other: &IndexLayerMetadata) -> Self {
LayerFileMetadata {
file_size: other.file_size,
+ generation: other.generation,
}
}
}
impl LayerFileMetadata {
- pub fn new(file_size: u64) -> Self {
-     LayerFileMetadata { file_size }
+ pub fn new(file_size: u64, generation: Generation) -> Self {
+     LayerFileMetadata {
+         file_size,
+         generation: Some(generation),
+     }
}
pub fn file_size(&self) -> u64 {
@@ -138,12 +146,17 @@ impl TryFrom<&UploadQueueInitialized> for IndexPart {
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize, Default)]
pub struct IndexLayerMetadata {
pub(super) file_size: u64,
+ #[serde(default)]
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub(super) generation: Option<Generation>,
}
impl From<&'_ LayerFileMetadata> for IndexLayerMetadata {
fn from(other: &'_ LayerFileMetadata) -> Self {
IndexLayerMetadata {
file_size: other.file_size,
+ generation: other.generation,
}
}
}
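The `#[serde(default)]` and `skip_serializing_if` attributes above are what make the new field backward compatible in both directions: an index written before this commit has no `generation` key and deserializes to `None`, and a `None` generation is omitted on serialization, so the output for pre-generation metadata is byte-for-byte unchanged. A standalone sketch of that behavior, using a simplified stand-in struct rather than the real `IndexLayerMetadata`:

use serde::{Deserialize, Serialize};

// Simplified stand-in for IndexLayerMetadata, just to demonstrate the
// backward-compatibility behavior of the serde attributes.
#[derive(Serialize, Deserialize, Debug, PartialEq)]
struct Meta {
    file_size: u64,
    #[serde(default)]
    #[serde(skip_serializing_if = "Option::is_none")]
    generation: Option<u32>,
}

fn main() {
    // Old index data without a generation key still deserializes cleanly...
    let old: Meta = serde_json::from_str(r#"{"file_size":25600000}"#).unwrap();
    assert_eq!(old.generation, None);

    // ...and a None generation is omitted on the way out, so re-serializing
    // pre-generation metadata reproduces the old wire format exactly.
    assert_eq!(serde_json::to_string(&old).unwrap(), r#"{"file_size":25600000}"#);
}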
@@ -172,11 +185,13 @@ mod tests {
layer_metadata: HashMap::from([
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), IndexLayerMetadata {
file_size: 25600000,
+ generation: None,
}),
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), IndexLayerMetadata {
// serde_json should always parse this, but with jq, for example, this might be a double.
file_size: 9007199254741001,
+ generation: None,
})
]),
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
@@ -209,11 +224,13 @@ mod tests {
layer_metadata: HashMap::from([
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), IndexLayerMetadata {
file_size: 25600000,
+ generation: None
}),
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), IndexLayerMetadata {
// serde_json should always parse this, but with jq, for example, this might be a double.
file_size: 9007199254741001,
+ generation: None
})
]),
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
@@ -247,11 +264,13 @@ mod tests {
layer_metadata: HashMap::from([
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), IndexLayerMetadata {
file_size: 25600000,
+ generation: None
}),
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), IndexLayerMetadata {
// serde_json should always parse this, but with jq, for example, this might be a double.
file_size: 9007199254741001,
+ generation: None
})
]),
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),

Changed file 5 of 6:

@@ -67,6 +67,7 @@ use postgres_connection::PgConnectionConfig;
use postgres_ffi::to_pg_timestamp;
use utils::{
completion,
+ generation::Generation,
id::{TenantId, TimelineId},
lsn::{AtomicLsn, Lsn, RecordLsn},
seqwait::SeqWait,
@@ -152,6 +153,9 @@ pub struct Timeline {
pub tenant_id: TenantId,
pub timeline_id: TimelineId,
+ // The generation of the tenant that instantiated us: this is used for safety when writing remote objects
+ generation: Generation,
pub pg_version: u32,
/// The tuple has two elements.
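For context on the comment above: this commit only threads the generation through; the actual protection mechanism lands later. One plausible shape for it, shown here purely as an illustration and not as code from this series, is to fold the writer's generation into remote object keys, so that two pageservers which both believe they own a tenant write to distinct paths instead of overwriting each other:

// Hypothetical helper, not from this commit: embeds a generation into an
// object key so writers in different generations never collide in S3.
fn remote_layer_key(layer_file_name: &str, generation: Option<u32>) -> String {
    match generation {
        // Objects written before generations existed keep their plain names.
        None => layer_file_name.to_string(),
        // e.g. "<layer name>-00000002" for generation 2
        Some(g) => format!("{layer_file_name}-{g:08x}"),
    }
}

Under such a scheme a stale writer from an older generation stays harmless: its uploads land under keys that the latest-generation index never references.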
@@ -1198,7 +1202,7 @@ impl Timeline {
Ok(delta) => Some(delta),
};
- let layer_metadata = LayerFileMetadata::new(layer_file_size);
+ let layer_metadata = LayerFileMetadata::new(layer_file_size, self.generation);
let new_remote_layer = Arc::new(match local_layer.filename() {
LayerFileName::Image(image_name) => RemoteLayer::new_img(
@@ -1376,6 +1380,7 @@ impl Timeline {
ancestor: Option<Arc<Timeline>>,
timeline_id: TimelineId,
tenant_id: TenantId,
+ generation: Generation,
walredo_mgr: Arc<dyn WalRedoManager + Send + Sync>,
resources: TimelineResources,
pg_version: u32,
@@ -1405,6 +1410,7 @@ impl Timeline {
myself: myself.clone(),
timeline_id,
tenant_id,
+ generation,
pg_version,
layers: Arc::new(tokio::sync::RwLock::new(LayerManager::create())),
wanted_image_layers: Mutex::new(None),
@@ -1614,6 +1620,9 @@ impl Timeline {
let (conf, tenant_id, timeline_id) = (self.conf, self.tenant_id, self.timeline_id);
let span = tracing::Span::current();
+ // Copy to move into the task we're about to spawn
+ let generation = self.generation;
let (loaded_layers, to_sync, total_physical_size) = tokio::task::spawn_blocking({
move || {
let _g = span.entered();
@@ -1655,8 +1664,12 @@ impl Timeline {
);
}
- let decided =
-     init::reconcile(discovered_layers, index_part.as_ref(), disk_consistent_lsn);
+ let decided = init::reconcile(
+     discovered_layers,
+     index_part.as_ref(),
+     disk_consistent_lsn,
+     generation,
+ );
let mut loaded_layers = Vec::new();
let mut needs_upload = Vec::new();
@@ -2659,7 +2672,7 @@ impl Timeline {
(
HashMap::from([(
layer.filename(),
- LayerFileMetadata::new(layer.layer_desc().file_size),
+ LayerFileMetadata::new(layer.layer_desc().file_size, self.generation),
)]),
Some(layer),
)
@@ -3055,7 +3068,10 @@ impl Timeline {
.metadata()
.with_context(|| format!("reading metadata of layer file {}", path.file_name()))?;
- layer_paths_to_upload.insert(path, LayerFileMetadata::new(metadata.len()));
+ layer_paths_to_upload.insert(
+     path,
+     LayerFileMetadata::new(metadata.len(), self.generation),
+ );
self.metrics
.resident_physical_size_gauge
@@ -3730,7 +3746,7 @@ impl Timeline {
if let Some(remote_client) = &self.remote_client {
remote_client.schedule_layer_file_upload(
&l.filename(),
- &LayerFileMetadata::new(metadata.len()),
+ &LayerFileMetadata::new(metadata.len(), self.generation),
)?;
}
@@ -3739,7 +3755,10 @@ impl Timeline {
.resident_physical_size_gauge
.add(metadata.len());
- new_layer_paths.insert(new_delta_path, LayerFileMetadata::new(metadata.len()));
+ new_layer_paths.insert(
+     new_delta_path,
+     LayerFileMetadata::new(metadata.len(), self.generation),
+ );
l.access_stats().record_residence_event(
LayerResidenceStatus::Resident,
LayerResidenceEventReason::LayerCreate,

Changed file 6 of 6:

@@ -7,6 +7,7 @@ use crate::{
index::{IndexPart, LayerFileMetadata},
},
storage_layer::LayerFileName,
+ Generation,
},
METADATA_FILE_NAME,
};
@@ -104,6 +105,7 @@ pub(super) fn reconcile(
discovered: Vec<(LayerFileName, u64)>,
index_part: Option<&IndexPart>,
disk_consistent_lsn: Lsn,
+ generation: Generation,
) -> Vec<(LayerFileName, Result<Decision, FutureLayer>)> {
use Decision::*;
@@ -112,7 +114,15 @@ pub(super) fn reconcile(
let mut discovered = discovered
.into_iter()
- .map(|(name, file_size)| (name, (Some(LayerFileMetadata::new(file_size)), None)))
+ .map(|(name, file_size)| {
+     (
+         name,
+         // The generation here will be corrected to match IndexPart in the merge
+         // below; if the layer is not in IndexPart, our current generation is the
+         // right one, because the layer will be uploaded in this generation.
+         (Some(LayerFileMetadata::new(file_size, generation)), None),
+     )
+ })
.collect::<Collected>();
// merge any index_part information, when available
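The merge that the comment above refers to sits below this hunk. Roughly, metadata recorded in `IndexPart` is overlaid on the locally synthesized entries, which is how a layer's freshly assigned local generation gets corrected back to the generation recorded in the index. A sketch, assuming `Collected` maps layer names to `(local, remote)` metadata pairs:

// Sketch of the merge step (assumed shape; the real code follows this hunk).
// Remote metadata from IndexPart, when present, takes precedence, so the
// generation recorded at upload time survives reconciliation.
if let Some(index_part) = index_part {
    for (name, remote_meta) in &index_part.layer_metadata {
        let entry = discovered.entry(name.to_owned()).or_insert((None, None));
        entry.1 = Some(LayerFileMetadata::from(remote_meta));
    }
}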