mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-03 05:20:38 +00:00
Compare commits
4 Commits
build_info
...
ci-run/pr-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b461841e53 | ||
|
|
5cf2b8de01 | ||
|
|
1ab0cfc8cb | ||
|
|
ca469be1cf |
3
Cargo.lock
generated
3
Cargo.lock
generated
@@ -1133,9 +1133,7 @@ dependencies = [
|
||||
"compute_api",
|
||||
"flate2",
|
||||
"futures",
|
||||
"git-version",
|
||||
"hyper",
|
||||
"metrics",
|
||||
"notify",
|
||||
"num_cpus",
|
||||
"opentelemetry",
|
||||
@@ -3013,6 +3011,7 @@ dependencies = [
|
||||
"serde_with",
|
||||
"strum",
|
||||
"strum_macros",
|
||||
"thiserror",
|
||||
"utils",
|
||||
"workspace_hack",
|
||||
]
|
||||
|
||||
@@ -12,10 +12,8 @@ cfg-if.workspace = true
|
||||
clap.workspace = true
|
||||
flate2.workspace = true
|
||||
futures.workspace = true
|
||||
git-version.workspace = true
|
||||
hyper = { workspace = true, features = ["full"] }
|
||||
notify.workspace = true
|
||||
metrics.workspace = true
|
||||
num_cpus.workspace = true
|
||||
opentelemetry.workspace = true
|
||||
postgres.workspace = true
|
||||
|
||||
@@ -57,27 +57,18 @@ use compute_tools::logger::*;
|
||||
use compute_tools::monitor::launch_monitor;
|
||||
use compute_tools::params::*;
|
||||
use compute_tools::spec::*;
|
||||
use metrics::set_build_info_metric;
|
||||
use utils::{project_build_tag, project_git_version};
|
||||
|
||||
// this is an arbitrary build tag. Fine as a default / for testing purposes
|
||||
// in-case of not-set environment var
|
||||
const BUILD_TAG_DEFAULT: &str = "latest";
|
||||
|
||||
project_git_version!(GIT_VERSION);
|
||||
project_build_tag!(BUILD_TAG);
|
||||
|
||||
fn main() -> Result<()> {
|
||||
init_tracing_and_logging(DEFAULT_LOG_LEVEL)?;
|
||||
|
||||
let build_tag = option_env!("BUILD_TAG")
|
||||
.unwrap_or(BUILD_TAG_DEFAULT)
|
||||
.to_string();
|
||||
|
||||
info!("Version: {GIT_VERSION}");
|
||||
info!("Build_tag: {BUILD_TAG}");
|
||||
|
||||
set_build_info_metric(GIT_VERSION, BUILD_TAG);
|
||||
info!("build_tag: {build_tag}");
|
||||
|
||||
let matches = cli().get_matches();
|
||||
let pgbin_default = String::from("postgres");
|
||||
|
||||
@@ -11,8 +11,7 @@ use compute_api::responses::{ComputeStatus, ComputeStatusResponse, GenericAPIErr
|
||||
|
||||
use anyhow::Result;
|
||||
use hyper::service::{make_service_fn, service_fn};
|
||||
use hyper::{header::CONTENT_TYPE, Body, Method, Request, Response, Server, StatusCode};
|
||||
use metrics::{Encoder, TextEncoder};
|
||||
use hyper::{Body, Method, Request, Response, Server, StatusCode};
|
||||
use num_cpus;
|
||||
use serde_json;
|
||||
use tokio::task;
|
||||
@@ -52,20 +51,6 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
|
||||
Response::new(Body::from(serde_json::to_string(&status_response).unwrap()))
|
||||
}
|
||||
|
||||
// prometheus metrics
|
||||
(&Method::GET, "/metrics") => {
|
||||
let mut buffer = vec![];
|
||||
let metrics = metrics::gather();
|
||||
let encoder = TextEncoder::new();
|
||||
encoder.encode(&metrics, &mut buffer).unwrap();
|
||||
|
||||
Response::builder()
|
||||
.status(StatusCode::OK)
|
||||
.header(CONTENT_TYPE, encoder.format_type())
|
||||
.body(Body::from(buffer))
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
// Startup metrics in JSON format. Keep /metrics reserved for a possible
|
||||
// future use for Prometheus metrics format.
|
||||
(&Method::GET, "/metrics.json") => {
|
||||
|
||||
@@ -14,7 +14,6 @@ use pageserver_api::models::{
|
||||
use std::collections::HashMap;
|
||||
use std::time::Duration;
|
||||
use utils::{
|
||||
generation::Generation,
|
||||
id::{TenantId, TimelineId},
|
||||
lsn::Lsn,
|
||||
};
|
||||
@@ -93,6 +92,22 @@ pub fn migrate_tenant(
|
||||
// Get a new generation
|
||||
let attachment_service = AttachmentService::from_env(env);
|
||||
|
||||
fn build_location_config(
|
||||
mode: LocationConfigMode,
|
||||
generation: Option<u32>,
|
||||
secondary_conf: Option<LocationConfigSecondary>,
|
||||
) -> LocationConfig {
|
||||
LocationConfig {
|
||||
mode,
|
||||
generation,
|
||||
secondary_conf,
|
||||
tenant_conf: TenantConfig::default(),
|
||||
shard_number: 0,
|
||||
shard_count: 0,
|
||||
shard_stripe_size: 0,
|
||||
}
|
||||
}
|
||||
|
||||
let previous = attachment_service.inspect(tenant_id)?;
|
||||
let mut baseline_lsns = None;
|
||||
if let Some((generation, origin_ps_id)) = &previous {
|
||||
@@ -101,12 +116,7 @@ pub fn migrate_tenant(
|
||||
if origin_ps_id == &dest_ps.conf.id {
|
||||
println!("🔁 Already attached to {origin_ps_id}, freshening...");
|
||||
let gen = attachment_service.attach_hook(tenant_id, dest_ps.conf.id)?;
|
||||
let dest_conf = LocationConfig {
|
||||
mode: LocationConfigMode::AttachedSingle,
|
||||
generation: gen.map(Generation::new),
|
||||
secondary_conf: None,
|
||||
tenant_conf: TenantConfig::default(),
|
||||
};
|
||||
let dest_conf = build_location_config(LocationConfigMode::AttachedSingle, gen, None);
|
||||
dest_ps.location_config(tenant_id, dest_conf)?;
|
||||
println!("✅ Migration complete");
|
||||
return Ok(());
|
||||
@@ -114,24 +124,15 @@ pub fn migrate_tenant(
|
||||
|
||||
println!("🔁 Switching origin pageserver {origin_ps_id} to stale mode");
|
||||
|
||||
let stale_conf = LocationConfig {
|
||||
mode: LocationConfigMode::AttachedStale,
|
||||
generation: Some(Generation::new(*generation)),
|
||||
secondary_conf: None,
|
||||
tenant_conf: TenantConfig::default(),
|
||||
};
|
||||
let stale_conf =
|
||||
build_location_config(LocationConfigMode::AttachedStale, Some(*generation), None);
|
||||
origin_ps.location_config(tenant_id, stale_conf)?;
|
||||
|
||||
baseline_lsns = Some(get_lsns(tenant_id, &origin_ps)?);
|
||||
}
|
||||
|
||||
let gen = attachment_service.attach_hook(tenant_id, dest_ps.conf.id)?;
|
||||
let dest_conf = LocationConfig {
|
||||
mode: LocationConfigMode::AttachedMulti,
|
||||
generation: gen.map(Generation::new),
|
||||
secondary_conf: None,
|
||||
tenant_conf: TenantConfig::default(),
|
||||
};
|
||||
let dest_conf = build_location_config(LocationConfigMode::AttachedMulti, gen, None);
|
||||
|
||||
println!("🔁 Attaching to pageserver {}", dest_ps.conf.id);
|
||||
dest_ps.location_config(tenant_id, dest_conf)?;
|
||||
@@ -170,12 +171,11 @@ pub fn migrate_tenant(
|
||||
}
|
||||
|
||||
// Downgrade to a secondary location
|
||||
let secondary_conf = LocationConfig {
|
||||
mode: LocationConfigMode::Secondary,
|
||||
generation: None,
|
||||
secondary_conf: Some(LocationConfigSecondary { warm: true }),
|
||||
tenant_conf: TenantConfig::default(),
|
||||
};
|
||||
let secondary_conf = build_location_config(
|
||||
LocationConfigMode::Secondary,
|
||||
None,
|
||||
Some(LocationConfigSecondary { warm: true }),
|
||||
);
|
||||
|
||||
println!(
|
||||
"💤 Switching to secondary mode on pageserver {}",
|
||||
@@ -188,12 +188,7 @@ pub fn migrate_tenant(
|
||||
"🔁 Switching to AttachedSingle mode on pageserver {}",
|
||||
dest_ps.conf.id
|
||||
);
|
||||
let dest_conf = LocationConfig {
|
||||
mode: LocationConfigMode::AttachedSingle,
|
||||
generation: gen.map(Generation::new),
|
||||
secondary_conf: None,
|
||||
tenant_conf: TenantConfig::default(),
|
||||
};
|
||||
let dest_conf = build_location_config(LocationConfigMode::AttachedSingle, gen, None);
|
||||
dest_ps.location_config(tenant_id, dest_conf)?;
|
||||
|
||||
println!("✅ Migration complete");
|
||||
|
||||
@@ -18,6 +18,7 @@ enum-map.workspace = true
|
||||
strum.workspace = true
|
||||
strum_macros.workspace = true
|
||||
hex.workspace = true
|
||||
thiserror.workspace = true
|
||||
|
||||
workspace_hack.workspace = true
|
||||
|
||||
|
||||
@@ -10,7 +10,6 @@ use serde_with::serde_as;
|
||||
use strum_macros;
|
||||
use utils::{
|
||||
completion,
|
||||
generation::Generation,
|
||||
history_buffer::HistoryBufferWithDropCounter,
|
||||
id::{NodeId, TenantId, TimelineId},
|
||||
lsn::Lsn,
|
||||
@@ -262,10 +261,19 @@ pub struct LocationConfig {
|
||||
pub mode: LocationConfigMode,
|
||||
/// If attaching, in what generation?
|
||||
#[serde(default)]
|
||||
pub generation: Option<Generation>,
|
||||
pub generation: Option<u32>,
|
||||
#[serde(default)]
|
||||
pub secondary_conf: Option<LocationConfigSecondary>,
|
||||
|
||||
// Shard parameters: if shard_count is nonzero, then other shard_* fields
|
||||
// must be set accurately.
|
||||
#[serde(default)]
|
||||
pub shard_number: u8,
|
||||
#[serde(default)]
|
||||
pub shard_count: u8,
|
||||
#[serde(default)]
|
||||
pub shard_stripe_size: u32,
|
||||
|
||||
// If requesting mode `Secondary`, configuration for that.
|
||||
// Custom storage configuration for the tenant, if any
|
||||
pub tenant_conf: TenantConfig,
|
||||
|
||||
@@ -2,6 +2,7 @@ use std::{ops::RangeInclusive, str::FromStr};
|
||||
|
||||
use hex::FromHex;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use thiserror;
|
||||
use utils::id::TenantId;
|
||||
|
||||
#[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Copy, Serialize, Deserialize, Debug)]
|
||||
@@ -139,6 +140,89 @@ impl From<[u8; 18]> for TenantShardId {
|
||||
}
|
||||
}
|
||||
|
||||
/// For use within the context of a particular tenant, when we need to know which
|
||||
/// shard we're dealing with, but do not need to know the full ShardIdentity (because
|
||||
/// we won't be doing any page->shard mapping), and do not need to know the fully qualified
|
||||
/// TenantShardId.
|
||||
#[derive(Eq, PartialEq, PartialOrd, Ord, Clone, Copy)]
|
||||
pub struct ShardIndex {
|
||||
pub shard_number: ShardNumber,
|
||||
pub shard_count: ShardCount,
|
||||
}
|
||||
|
||||
impl ShardIndex {
|
||||
pub fn new(number: ShardNumber, count: ShardCount) -> Self {
|
||||
Self {
|
||||
shard_number: number,
|
||||
shard_count: count,
|
||||
}
|
||||
}
|
||||
pub fn unsharded() -> Self {
|
||||
Self {
|
||||
shard_number: ShardNumber(0),
|
||||
shard_count: ShardCount(0),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn is_unsharded(&self) -> bool {
|
||||
self.shard_number == ShardNumber(0) && self.shard_count == ShardCount(0)
|
||||
}
|
||||
|
||||
/// For use in constructing remote storage paths: concatenate this with a TenantId
|
||||
/// to get a fully qualified TenantShardId.
|
||||
///
|
||||
/// Backward compat: this function returns an empty string if Self::is_unsharded, such
|
||||
/// that the legacy pre-sharding remote key format is preserved.
|
||||
pub fn get_suffix(&self) -> String {
|
||||
if self.is_unsharded() {
|
||||
"".to_string()
|
||||
} else {
|
||||
format!("-{:02x}{:02x}", self.shard_number.0, self.shard_count.0)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Display for ShardIndex {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{:02x}{:02x}", self.shard_number.0, self.shard_count.0)
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for ShardIndex {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
// Debug is the same as Display: the compact hex representation
|
||||
write!(f, "{}", self)
|
||||
}
|
||||
}
|
||||
|
||||
impl std::str::FromStr for ShardIndex {
|
||||
type Err = hex::FromHexError;
|
||||
|
||||
fn from_str(s: &str) -> Result<Self, Self::Err> {
|
||||
// Expect format: 1 byte shard number, 1 byte shard count
|
||||
if s.len() == 4 {
|
||||
let bytes = s.as_bytes();
|
||||
let mut shard_parts: [u8; 2] = [0u8; 2];
|
||||
hex::decode_to_slice(bytes, &mut shard_parts)?;
|
||||
Ok(Self {
|
||||
shard_number: ShardNumber(shard_parts[0]),
|
||||
shard_count: ShardCount(shard_parts[1]),
|
||||
})
|
||||
} else {
|
||||
Err(hex::FromHexError::InvalidStringLength)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<[u8; 2]> for ShardIndex {
|
||||
fn from(b: [u8; 2]) -> Self {
|
||||
Self {
|
||||
shard_number: ShardNumber(b[0]),
|
||||
shard_count: ShardCount(b[1]),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Serialize for TenantShardId {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
@@ -209,6 +293,151 @@ impl<'de> Deserialize<'de> for TenantShardId {
|
||||
}
|
||||
}
|
||||
|
||||
/// Stripe size in number of pages
|
||||
#[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Debug)]
|
||||
pub struct ShardStripeSize(pub u32);
|
||||
|
||||
/// Layout version: for future upgrades where we might change how the key->shard mapping works
|
||||
#[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Debug)]
|
||||
pub struct ShardLayout(u8);
|
||||
|
||||
const LAYOUT_V1: ShardLayout = ShardLayout(1);
|
||||
|
||||
/// Default stripe size in pages: 256MiB divided by 8kiB page size.
|
||||
const DEFAULT_STRIPE_SIZE: ShardStripeSize = ShardStripeSize(256 * 1024 / 8);
|
||||
|
||||
/// The ShardIdentity contains the information needed for one member of map
|
||||
/// to resolve a key to a shard, and then check whether that shard is ==self.
|
||||
#[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Debug)]
|
||||
pub struct ShardIdentity {
|
||||
pub layout: ShardLayout,
|
||||
pub number: ShardNumber,
|
||||
pub count: ShardCount,
|
||||
pub stripe_size: ShardStripeSize,
|
||||
}
|
||||
|
||||
#[derive(thiserror::Error, Debug, PartialEq, Eq)]
|
||||
pub enum ShardConfigError {
|
||||
#[error("Invalid shard count")]
|
||||
InvalidCount,
|
||||
#[error("Invalid shard number")]
|
||||
InvalidNumber,
|
||||
#[error("Invalid stripe size")]
|
||||
InvalidStripeSize,
|
||||
}
|
||||
|
||||
impl ShardIdentity {
|
||||
/// An identity with number=0 count=0 is a "none" identity, which represents legacy
|
||||
/// tenants. Modern single-shard tenants should not use this: they should
|
||||
/// have number=0 count=1.
|
||||
pub fn unsharded() -> Self {
|
||||
Self {
|
||||
number: ShardNumber(0),
|
||||
count: ShardCount(0),
|
||||
layout: LAYOUT_V1,
|
||||
stripe_size: DEFAULT_STRIPE_SIZE,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn is_unsharded(&self) -> bool {
|
||||
self.number == ShardNumber(0) && self.count == ShardCount(0)
|
||||
}
|
||||
|
||||
/// Count must be nonzero, and number must be < count. To construct
|
||||
/// the legacy case (count==0), use Self::unsharded instead.
|
||||
pub fn new(
|
||||
number: ShardNumber,
|
||||
count: ShardCount,
|
||||
stripe_size: ShardStripeSize,
|
||||
) -> Result<Self, ShardConfigError> {
|
||||
if count.0 == 0 {
|
||||
Err(ShardConfigError::InvalidCount)
|
||||
} else if number.0 > count.0 - 1 {
|
||||
Err(ShardConfigError::InvalidNumber)
|
||||
} else if stripe_size.0 == 0 {
|
||||
Err(ShardConfigError::InvalidStripeSize)
|
||||
} else {
|
||||
Ok(Self {
|
||||
number,
|
||||
count,
|
||||
layout: LAYOUT_V1,
|
||||
stripe_size,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Serialize for ShardIndex {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: serde::Serializer,
|
||||
{
|
||||
if serializer.is_human_readable() {
|
||||
serializer.collect_str(self)
|
||||
} else {
|
||||
// Binary encoding is not used in index_part.json, but is included in anticipation of
|
||||
// switching various structures (e.g. inter-process communication, remote metadata) to more
|
||||
// compact binary encodings in future.
|
||||
let mut packed: [u8; 2] = [0; 2];
|
||||
packed[0] = self.shard_number.0;
|
||||
packed[1] = self.shard_count.0;
|
||||
packed.serialize(serializer)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'de> Deserialize<'de> for ShardIndex {
|
||||
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
|
||||
where
|
||||
D: serde::Deserializer<'de>,
|
||||
{
|
||||
struct IdVisitor {
|
||||
is_human_readable_deserializer: bool,
|
||||
}
|
||||
|
||||
impl<'de> serde::de::Visitor<'de> for IdVisitor {
|
||||
type Value = ShardIndex;
|
||||
|
||||
fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||
if self.is_human_readable_deserializer {
|
||||
formatter.write_str("value in form of hex string")
|
||||
} else {
|
||||
formatter.write_str("value in form of integer array([u8; 2])")
|
||||
}
|
||||
}
|
||||
|
||||
fn visit_seq<A>(self, seq: A) -> Result<Self::Value, A::Error>
|
||||
where
|
||||
A: serde::de::SeqAccess<'de>,
|
||||
{
|
||||
let s = serde::de::value::SeqAccessDeserializer::new(seq);
|
||||
let id: [u8; 2] = Deserialize::deserialize(s)?;
|
||||
Ok(ShardIndex::from(id))
|
||||
}
|
||||
|
||||
fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
|
||||
where
|
||||
E: serde::de::Error,
|
||||
{
|
||||
ShardIndex::from_str(v).map_err(E::custom)
|
||||
}
|
||||
}
|
||||
|
||||
if deserializer.is_human_readable() {
|
||||
deserializer.deserialize_str(IdVisitor {
|
||||
is_human_readable_deserializer: true,
|
||||
})
|
||||
} else {
|
||||
deserializer.deserialize_tuple(
|
||||
2,
|
||||
IdVisitor {
|
||||
is_human_readable_deserializer: false,
|
||||
},
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::str::FromStr;
|
||||
@@ -318,4 +547,66 @@ mod tests {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn shard_identity_validation() -> Result<(), ShardConfigError> {
|
||||
// Happy cases
|
||||
ShardIdentity::new(ShardNumber(0), ShardCount(1), DEFAULT_STRIPE_SIZE)?;
|
||||
ShardIdentity::new(ShardNumber(0), ShardCount(1), ShardStripeSize(1))?;
|
||||
ShardIdentity::new(ShardNumber(254), ShardCount(255), ShardStripeSize(1))?;
|
||||
|
||||
assert_eq!(
|
||||
ShardIdentity::new(ShardNumber(0), ShardCount(0), DEFAULT_STRIPE_SIZE),
|
||||
Err(ShardConfigError::InvalidCount)
|
||||
);
|
||||
assert_eq!(
|
||||
ShardIdentity::new(ShardNumber(10), ShardCount(10), DEFAULT_STRIPE_SIZE),
|
||||
Err(ShardConfigError::InvalidNumber)
|
||||
);
|
||||
assert_eq!(
|
||||
ShardIdentity::new(ShardNumber(11), ShardCount(10), DEFAULT_STRIPE_SIZE),
|
||||
Err(ShardConfigError::InvalidNumber)
|
||||
);
|
||||
assert_eq!(
|
||||
ShardIdentity::new(ShardNumber(255), ShardCount(255), DEFAULT_STRIPE_SIZE),
|
||||
Err(ShardConfigError::InvalidNumber)
|
||||
);
|
||||
assert_eq!(
|
||||
ShardIdentity::new(ShardNumber(0), ShardCount(1), ShardStripeSize(0)),
|
||||
Err(ShardConfigError::InvalidStripeSize)
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn shard_index_human_encoding() -> Result<(), hex::FromHexError> {
|
||||
let example = ShardIndex {
|
||||
shard_number: ShardNumber(13),
|
||||
shard_count: ShardCount(17),
|
||||
};
|
||||
let expected: String = "0d11".to_string();
|
||||
let encoded = format!("{example}");
|
||||
assert_eq!(&encoded, &expected);
|
||||
|
||||
let decoded = ShardIndex::from_str(&encoded)?;
|
||||
assert_eq!(example, decoded);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn shard_index_binary_encoding() -> Result<(), hex::FromHexError> {
|
||||
let example = ShardIndex {
|
||||
shard_number: ShardNumber(13),
|
||||
shard_count: ShardCount(17),
|
||||
};
|
||||
let expected: [u8; 2] = [0x0d, 0x11];
|
||||
|
||||
let encoded = bincode::serialize(&example).unwrap();
|
||||
assert_eq!(Hex(&encoded), Hex(&expected));
|
||||
let decoded = bincode::deserialize(&encoded).unwrap();
|
||||
assert_eq!(example, decoded);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -10,6 +10,7 @@ use crate::control_plane_client::ControlPlaneGenerationsApi;
|
||||
use crate::metrics;
|
||||
use crate::tenant::remote_timeline_client::remote_layer_path;
|
||||
use crate::tenant::remote_timeline_client::remote_timeline_path;
|
||||
use crate::tenant::remote_timeline_client::LayerFileMetadata;
|
||||
use crate::virtual_file::MaybeFatalIo;
|
||||
use crate::virtual_file::VirtualFile;
|
||||
use anyhow::Context;
|
||||
@@ -509,18 +510,19 @@ impl DeletionQueueClient {
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
current_generation: Generation,
|
||||
layers: Vec<(LayerFileName, Generation)>,
|
||||
layers: Vec<(LayerFileName, LayerFileMetadata)>,
|
||||
) -> Result<(), DeletionQueueError> {
|
||||
if current_generation.is_none() {
|
||||
debug!("Enqueuing deletions in legacy mode, skipping queue");
|
||||
|
||||
let mut layer_paths = Vec::new();
|
||||
for (layer, generation) in layers {
|
||||
for (layer, meta) in layers {
|
||||
layer_paths.push(remote_layer_path(
|
||||
&tenant_id,
|
||||
&timeline_id,
|
||||
meta.shard,
|
||||
&layer,
|
||||
generation,
|
||||
meta.generation,
|
||||
));
|
||||
}
|
||||
self.push_immediate(layer_paths).await?;
|
||||
@@ -540,7 +542,7 @@ impl DeletionQueueClient {
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
current_generation: Generation,
|
||||
layers: Vec<(LayerFileName, Generation)>,
|
||||
layers: Vec<(LayerFileName, LayerFileMetadata)>,
|
||||
) -> Result<(), DeletionQueueError> {
|
||||
metrics::DELETION_QUEUE
|
||||
.keys_submitted
|
||||
@@ -751,6 +753,7 @@ impl DeletionQueue {
|
||||
mod test {
|
||||
use camino::Utf8Path;
|
||||
use hex_literal::hex;
|
||||
use pageserver_api::shard::ShardIndex;
|
||||
use std::{io::ErrorKind, time::Duration};
|
||||
use tracing::info;
|
||||
|
||||
@@ -990,6 +993,8 @@ mod test {
|
||||
// we delete, and the generation of the running Tenant.
|
||||
let layer_generation = Generation::new(0xdeadbeef);
|
||||
let now_generation = Generation::new(0xfeedbeef);
|
||||
let layer_metadata =
|
||||
LayerFileMetadata::new(0xf00, layer_generation, ShardIndex::unsharded());
|
||||
|
||||
let remote_layer_file_name_1 =
|
||||
format!("{}{}", layer_file_name_1, layer_generation.get_suffix());
|
||||
@@ -1013,7 +1018,7 @@ mod test {
|
||||
tenant_id,
|
||||
TIMELINE_ID,
|
||||
now_generation,
|
||||
[(layer_file_name_1.clone(), layer_generation)].to_vec(),
|
||||
[(layer_file_name_1.clone(), layer_metadata)].to_vec(),
|
||||
)
|
||||
.await?;
|
||||
assert_remote_files(&[&remote_layer_file_name_1], &remote_timeline_path);
|
||||
@@ -1052,6 +1057,8 @@ mod test {
|
||||
let stale_generation = latest_generation.previous();
|
||||
// Generation that our example layer file was written with
|
||||
let layer_generation = stale_generation.previous();
|
||||
let layer_metadata =
|
||||
LayerFileMetadata::new(0xf00, layer_generation, ShardIndex::unsharded());
|
||||
|
||||
ctx.set_latest_generation(latest_generation);
|
||||
|
||||
@@ -1069,7 +1076,7 @@ mod test {
|
||||
tenant_id,
|
||||
TIMELINE_ID,
|
||||
stale_generation,
|
||||
[(EXAMPLE_LAYER_NAME.clone(), layer_generation)].to_vec(),
|
||||
[(EXAMPLE_LAYER_NAME.clone(), layer_metadata.clone())].to_vec(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -1084,7 +1091,7 @@ mod test {
|
||||
tenant_id,
|
||||
TIMELINE_ID,
|
||||
latest_generation,
|
||||
[(EXAMPLE_LAYER_NAME.clone(), layer_generation)].to_vec(),
|
||||
[(EXAMPLE_LAYER_NAME.clone(), layer_metadata.clone())].to_vec(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -1111,6 +1118,8 @@ mod test {
|
||||
|
||||
let layer_generation = Generation::new(0xdeadbeef);
|
||||
let now_generation = Generation::new(0xfeedbeef);
|
||||
let layer_metadata =
|
||||
LayerFileMetadata::new(0xf00, layer_generation, ShardIndex::unsharded());
|
||||
|
||||
// Inject a deletion in the generation before generation_now: after restart,
|
||||
// this deletion should _not_ get executed (only the immediately previous
|
||||
@@ -1122,7 +1131,7 @@ mod test {
|
||||
tenant_id,
|
||||
TIMELINE_ID,
|
||||
now_generation.previous(),
|
||||
[(EXAMPLE_LAYER_NAME.clone(), layer_generation)].to_vec(),
|
||||
[(EXAMPLE_LAYER_NAME.clone(), layer_metadata.clone())].to_vec(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -1136,7 +1145,7 @@ mod test {
|
||||
tenant_id,
|
||||
TIMELINE_ID,
|
||||
now_generation,
|
||||
[(EXAMPLE_LAYER_NAME_ALT.clone(), layer_generation)].to_vec(),
|
||||
[(EXAMPLE_LAYER_NAME_ALT.clone(), layer_metadata.clone())].to_vec(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -1226,12 +1235,13 @@ pub(crate) mod mock {
|
||||
match msg {
|
||||
ListWriterQueueMessage::Delete(op) => {
|
||||
let mut objects = op.objects;
|
||||
for (layer, generation) in op.layers {
|
||||
for (layer, meta) in op.layers {
|
||||
objects.push(remote_layer_path(
|
||||
&op.tenant_id,
|
||||
&op.timeline_id,
|
||||
meta.shard,
|
||||
&layer,
|
||||
generation,
|
||||
meta.generation,
|
||||
));
|
||||
}
|
||||
|
||||
|
||||
@@ -33,6 +33,7 @@ use crate::config::PageServerConf;
|
||||
use crate::deletion_queue::TEMP_SUFFIX;
|
||||
use crate::metrics;
|
||||
use crate::tenant::remote_timeline_client::remote_layer_path;
|
||||
use crate::tenant::remote_timeline_client::LayerFileMetadata;
|
||||
use crate::tenant::storage_layer::LayerFileName;
|
||||
use crate::virtual_file::on_fatal_io_error;
|
||||
use crate::virtual_file::MaybeFatalIo;
|
||||
@@ -58,7 +59,7 @@ pub(super) struct DeletionOp {
|
||||
// `layers` and `objects` are both just lists of objects. `layers` is used if you do not
|
||||
// have a config object handy to project it to a remote key, and need the consuming worker
|
||||
// to do it for you.
|
||||
pub(super) layers: Vec<(LayerFileName, Generation)>,
|
||||
pub(super) layers: Vec<(LayerFileName, LayerFileMetadata)>,
|
||||
pub(super) objects: Vec<RemotePath>,
|
||||
|
||||
/// The _current_ generation of the Tenant attachment in which we are enqueuing
|
||||
@@ -387,12 +388,13 @@ impl ListWriter {
|
||||
);
|
||||
|
||||
let mut layer_paths = Vec::new();
|
||||
for (layer, generation) in op.layers {
|
||||
for (layer, meta) in op.layers {
|
||||
layer_paths.push(remote_layer_path(
|
||||
&op.tenant_id,
|
||||
&op.timeline_id,
|
||||
meta.shard,
|
||||
&layer,
|
||||
generation,
|
||||
meta.generation,
|
||||
));
|
||||
}
|
||||
layer_paths.extend(op.objects);
|
||||
|
||||
@@ -3468,6 +3468,7 @@ pub async fn dump_layerfile_from_path(
|
||||
pub(crate) mod harness {
|
||||
use bytes::{Bytes, BytesMut};
|
||||
use once_cell::sync::OnceCell;
|
||||
use pageserver_api::shard::ShardIndex;
|
||||
use std::fs;
|
||||
use std::sync::Arc;
|
||||
use utils::logging;
|
||||
@@ -3534,6 +3535,7 @@ pub(crate) mod harness {
|
||||
pub tenant_conf: TenantConf,
|
||||
pub tenant_id: TenantId,
|
||||
pub generation: Generation,
|
||||
pub shard: ShardIndex,
|
||||
pub remote_storage: GenericRemoteStorage,
|
||||
pub remote_fs_dir: Utf8PathBuf,
|
||||
pub deletion_queue: MockDeletionQueue,
|
||||
@@ -3593,6 +3595,7 @@ pub(crate) mod harness {
|
||||
tenant_conf,
|
||||
tenant_id,
|
||||
generation: Generation::new(0xdeadbeef),
|
||||
shard: ShardIndex::unsharded(),
|
||||
remote_storage,
|
||||
remote_fs_dir,
|
||||
deletion_queue,
|
||||
|
||||
@@ -10,6 +10,7 @@
|
||||
//!
|
||||
use anyhow::Context;
|
||||
use pageserver_api::models;
|
||||
use pageserver_api::shard::{ShardCount, ShardIdentity, ShardNumber, ShardStripeSize};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::num::NonZeroU64;
|
||||
use std::time::Duration;
|
||||
@@ -88,6 +89,14 @@ pub(crate) struct LocationConf {
|
||||
/// The location-specific part of the configuration, describes the operating
|
||||
/// mode of this pageserver for this tenant.
|
||||
pub(crate) mode: LocationMode,
|
||||
|
||||
/// The detailed shard identity. This structure is already scoped within
|
||||
/// a TenantShardId, but we need the full ShardIdentity to enable calculating
|
||||
/// key->shard mappings.
|
||||
#[serde(default = "ShardIdentity::unsharded")]
|
||||
#[serde(skip_serializing_if = "ShardIdentity::is_unsharded")]
|
||||
pub(crate) shard: ShardIdentity,
|
||||
|
||||
/// The pan-cluster tenant configuration, the same on all locations
|
||||
pub(crate) tenant_conf: TenantConfOpt,
|
||||
}
|
||||
@@ -160,6 +169,8 @@ impl LocationConf {
|
||||
generation,
|
||||
attach_mode: AttachmentMode::Single,
|
||||
}),
|
||||
// Legacy configuration loads are always from tenants created before sharding existed.
|
||||
shard: ShardIdentity::unsharded(),
|
||||
tenant_conf,
|
||||
}
|
||||
}
|
||||
@@ -187,6 +198,7 @@ impl LocationConf {
|
||||
|
||||
fn get_generation(conf: &'_ models::LocationConfig) -> Result<Generation, anyhow::Error> {
|
||||
conf.generation
|
||||
.map(Generation::new)
|
||||
.ok_or_else(|| anyhow::anyhow!("Generation must be set when attaching"))
|
||||
}
|
||||
|
||||
@@ -226,7 +238,21 @@ impl LocationConf {
|
||||
}
|
||||
};
|
||||
|
||||
Ok(Self { mode, tenant_conf })
|
||||
let shard = if conf.shard_count == 0 {
|
||||
ShardIdentity::unsharded()
|
||||
} else {
|
||||
ShardIdentity::new(
|
||||
ShardNumber(conf.shard_number),
|
||||
ShardCount(conf.shard_count),
|
||||
ShardStripeSize(conf.shard_stripe_size),
|
||||
)?
|
||||
};
|
||||
|
||||
Ok(Self {
|
||||
shard,
|
||||
mode,
|
||||
tenant_conf,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -241,6 +267,7 @@ impl Default for LocationConf {
|
||||
attach_mode: AttachmentMode::Single,
|
||||
}),
|
||||
tenant_conf: TenantConfOpt::default(),
|
||||
shard: ShardIdentity::unsharded(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -188,6 +188,7 @@ use anyhow::Context;
|
||||
use camino::Utf8Path;
|
||||
use chrono::{NaiveDateTime, Utc};
|
||||
|
||||
use pageserver_api::shard::ShardIndex;
|
||||
use scopeguard::ScopeGuard;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
pub(crate) use upload::upload_initdb_dir;
|
||||
@@ -402,6 +403,11 @@ impl RemoteTimelineClient {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) fn get_shard_index(&self) -> ShardIndex {
|
||||
// TODO: carry this on the struct
|
||||
ShardIndex::unsharded()
|
||||
}
|
||||
|
||||
pub fn remote_consistent_lsn_projected(&self) -> Option<Lsn> {
|
||||
match &mut *self.upload_queue.lock().unwrap() {
|
||||
UploadQueue::Uninitialized => None,
|
||||
@@ -465,6 +471,7 @@ impl RemoteTimelineClient {
|
||||
&self.storage_impl,
|
||||
&self.tenant_id,
|
||||
&self.timeline_id,
|
||||
self.get_shard_index(),
|
||||
self.generation,
|
||||
cancel,
|
||||
)
|
||||
@@ -657,10 +664,10 @@ impl RemoteTimelineClient {
|
||||
let mut guard = self.upload_queue.lock().unwrap();
|
||||
let upload_queue = guard.initialized_mut()?;
|
||||
|
||||
let with_generations =
|
||||
let with_metadata =
|
||||
self.schedule_unlinking_of_layers_from_index_part0(upload_queue, names.iter().cloned());
|
||||
|
||||
self.schedule_deletion_of_unlinked0(upload_queue, with_generations);
|
||||
self.schedule_deletion_of_unlinked0(upload_queue, with_metadata);
|
||||
|
||||
// Launch the tasks immediately, if possible
|
||||
self.launch_queued_tasks(upload_queue);
|
||||
@@ -695,7 +702,7 @@ impl RemoteTimelineClient {
|
||||
self: &Arc<Self>,
|
||||
upload_queue: &mut UploadQueueInitialized,
|
||||
names: I,
|
||||
) -> Vec<(LayerFileName, Generation)>
|
||||
) -> Vec<(LayerFileName, LayerFileMetadata)>
|
||||
where
|
||||
I: IntoIterator<Item = LayerFileName>,
|
||||
{
|
||||
@@ -703,16 +710,17 @@ impl RemoteTimelineClient {
|
||||
// so we don't need update it. Just serialize it.
|
||||
let metadata = upload_queue.latest_metadata.clone();
|
||||
|
||||
// Decorate our list of names with each name's generation, dropping
|
||||
// names that are unexpectedly missing from our metadata.
|
||||
let with_generations: Vec<_> = names
|
||||
// Decorate our list of names with each name's metadata, dropping
|
||||
// names that are unexpectedly missing from our metadata. This metadata
|
||||
// is later used when physically deleting layers, to construct key paths.
|
||||
let with_metadata: Vec<_> = names
|
||||
.into_iter()
|
||||
.filter_map(|name| {
|
||||
let meta = upload_queue.latest_files.remove(&name);
|
||||
|
||||
if let Some(meta) = meta {
|
||||
upload_queue.latest_files_changes_since_metadata_upload_scheduled += 1;
|
||||
Some((name, meta.generation))
|
||||
Some((name, meta))
|
||||
} else {
|
||||
// This can only happen if we forgot to to schedule the file upload
|
||||
// before scheduling the delete. Log it because it is a rare/strange
|
||||
@@ -725,9 +733,10 @@ impl RemoteTimelineClient {
|
||||
.collect();
|
||||
|
||||
#[cfg(feature = "testing")]
|
||||
for (name, gen) in &with_generations {
|
||||
if let Some(unexpected) = upload_queue.dangling_files.insert(name.to_owned(), *gen) {
|
||||
if &unexpected == gen {
|
||||
for (name, metadata) in &with_metadata {
|
||||
let gen = metadata.generation;
|
||||
if let Some(unexpected) = upload_queue.dangling_files.insert(name.to_owned(), gen) {
|
||||
if unexpected == gen {
|
||||
tracing::error!("{name} was unlinked twice with same generation");
|
||||
} else {
|
||||
tracing::error!("{name} was unlinked twice with different generations {gen:?} and {unexpected:?}");
|
||||
@@ -742,14 +751,14 @@ impl RemoteTimelineClient {
|
||||
self.schedule_index_upload(upload_queue, metadata);
|
||||
}
|
||||
|
||||
with_generations
|
||||
with_metadata
|
||||
}
|
||||
|
||||
/// Schedules deletion for layer files which have previously been unlinked from the
|
||||
/// `index_part.json` with [`Self::schedule_gc_update`] or [`Self::schedule_compaction_update`].
|
||||
pub(crate) fn schedule_deletion_of_unlinked(
|
||||
self: &Arc<Self>,
|
||||
layers: Vec<(LayerFileName, Generation)>,
|
||||
layers: Vec<(LayerFileName, LayerFileMetadata)>,
|
||||
) -> anyhow::Result<()> {
|
||||
let mut guard = self.upload_queue.lock().unwrap();
|
||||
let upload_queue = guard.initialized_mut()?;
|
||||
@@ -762,16 +771,22 @@ impl RemoteTimelineClient {
|
||||
fn schedule_deletion_of_unlinked0(
|
||||
self: &Arc<Self>,
|
||||
upload_queue: &mut UploadQueueInitialized,
|
||||
with_generations: Vec<(LayerFileName, Generation)>,
|
||||
with_metadata: Vec<(LayerFileName, LayerFileMetadata)>,
|
||||
) {
|
||||
for (name, gen) in &with_generations {
|
||||
info!("scheduling deletion of layer {}{}", name, gen.get_suffix());
|
||||
for (name, meta) in &with_metadata {
|
||||
info!(
|
||||
"scheduling deletion of layer {}{} (shard {})",
|
||||
name,
|
||||
meta.generation.get_suffix(),
|
||||
meta.shard
|
||||
);
|
||||
}
|
||||
|
||||
#[cfg(feature = "testing")]
|
||||
for (name, gen) in &with_generations {
|
||||
for (name, meta) in &with_metadata {
|
||||
let gen = meta.generation;
|
||||
match upload_queue.dangling_files.remove(name) {
|
||||
Some(same) if &same == gen => { /* expected */ }
|
||||
Some(same) if same == gen => { /* expected */ }
|
||||
Some(other) => {
|
||||
tracing::error!("{name} was unlinked with {other:?} but deleted with {gen:?}");
|
||||
}
|
||||
@@ -783,7 +798,7 @@ impl RemoteTimelineClient {
|
||||
|
||||
// schedule the actual deletions
|
||||
let op = UploadOp::Delete(Delete {
|
||||
layers: with_generations,
|
||||
layers: with_metadata,
|
||||
});
|
||||
self.calls_unfinished_metric_begin(&op);
|
||||
upload_queue.queued_operations.push_back(op);
|
||||
@@ -904,6 +919,7 @@ impl RemoteTimelineClient {
|
||||
&self.storage_impl,
|
||||
&self.tenant_id,
|
||||
&self.timeline_id,
|
||||
self.get_shard_index(),
|
||||
self.generation,
|
||||
&index_part_with_deleted_at,
|
||||
)
|
||||
@@ -962,6 +978,7 @@ impl RemoteTimelineClient {
|
||||
remote_layer_path(
|
||||
&self.tenant_id,
|
||||
&self.timeline_id,
|
||||
meta.shard,
|
||||
&file_name,
|
||||
meta.generation,
|
||||
)
|
||||
@@ -1010,7 +1027,12 @@ impl RemoteTimelineClient {
|
||||
.unwrap_or(
|
||||
// No generation-suffixed indices, assume we are dealing with
|
||||
// a legacy index.
|
||||
remote_index_path(&self.tenant_id, &self.timeline_id, Generation::none()),
|
||||
remote_index_path(
|
||||
&self.tenant_id,
|
||||
&self.timeline_id,
|
||||
self.get_shard_index(),
|
||||
Generation::none(),
|
||||
),
|
||||
);
|
||||
|
||||
let remaining_layers: Vec<RemotePath> = remaining
|
||||
@@ -1219,6 +1241,7 @@ impl RemoteTimelineClient {
|
||||
&self.storage_impl,
|
||||
&self.tenant_id,
|
||||
&self.timeline_id,
|
||||
self.get_shard_index(),
|
||||
self.generation,
|
||||
index_part,
|
||||
)
|
||||
@@ -1527,12 +1550,14 @@ pub fn remote_timeline_path(tenant_id: &TenantId, timeline_id: &TimelineId) -> R
|
||||
pub fn remote_layer_path(
|
||||
tenant_id: &TenantId,
|
||||
timeline_id: &TimelineId,
|
||||
shard: ShardIndex,
|
||||
layer_file_name: &LayerFileName,
|
||||
generation: Generation,
|
||||
) -> RemotePath {
|
||||
// Generation-aware key format
|
||||
let path = format!(
|
||||
"tenants/{tenant_id}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{0}{1}",
|
||||
"tenants/{tenant_id}{0}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{1}{2}",
|
||||
shard.get_suffix(),
|
||||
layer_file_name.file_name(),
|
||||
generation.get_suffix()
|
||||
);
|
||||
@@ -1550,10 +1575,12 @@ pub fn remote_initdb_archive_path(tenant_id: &TenantId, timeline_id: &TimelineId
|
||||
pub fn remote_index_path(
|
||||
tenant_id: &TenantId,
|
||||
timeline_id: &TimelineId,
|
||||
shard: ShardIndex,
|
||||
generation: Generation,
|
||||
) -> RemotePath {
|
||||
RemotePath::from_string(&format!(
|
||||
"tenants/{tenant_id}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{0}{1}",
|
||||
"tenants/{tenant_id}{0}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{1}{2}",
|
||||
shard.get_suffix(),
|
||||
IndexPart::FILE_NAME,
|
||||
generation.get_suffix()
|
||||
))
|
||||
@@ -1778,6 +1805,7 @@ mod tests {
|
||||
println!("remote_timeline_dir: {remote_timeline_dir}");
|
||||
|
||||
let generation = harness.generation;
|
||||
let shard = harness.shard;
|
||||
|
||||
// Create a couple of dummy files, schedule upload for them
|
||||
|
||||
@@ -1794,7 +1822,7 @@ mod tests {
|
||||
harness.conf,
|
||||
&timeline,
|
||||
name,
|
||||
LayerFileMetadata::new(contents.len() as u64, generation),
|
||||
LayerFileMetadata::new(contents.len() as u64, generation, shard),
|
||||
)
|
||||
}).collect::<Vec<_>>();
|
||||
|
||||
@@ -1943,7 +1971,7 @@ mod tests {
|
||||
harness.conf,
|
||||
&timeline,
|
||||
layer_file_name_1.clone(),
|
||||
LayerFileMetadata::new(content_1.len() as u64, harness.generation),
|
||||
LayerFileMetadata::new(content_1.len() as u64, harness.generation, harness.shard),
|
||||
);
|
||||
|
||||
#[derive(Debug, PartialEq, Clone, Copy)]
|
||||
@@ -2008,7 +2036,11 @@ mod tests {
|
||||
assert_eq!(actual_c, expected_c);
|
||||
}
|
||||
|
||||
async fn inject_index_part(test_state: &TestSetup, generation: Generation) -> IndexPart {
|
||||
async fn inject_index_part(
|
||||
test_state: &TestSetup,
|
||||
generation: Generation,
|
||||
shard: ShardIndex,
|
||||
) -> IndexPart {
|
||||
// An empty IndexPart, just sufficient to ensure deserialization will succeed
|
||||
let example_metadata = TimelineMetadata::example();
|
||||
let example_index_part = IndexPart::new(
|
||||
@@ -2029,7 +2061,13 @@ mod tests {
|
||||
std::fs::create_dir_all(remote_timeline_dir).expect("creating test dir should work");
|
||||
|
||||
let index_path = test_state.harness.remote_fs_dir.join(
|
||||
remote_index_path(&test_state.harness.tenant_id, &TIMELINE_ID, generation).get_path(),
|
||||
remote_index_path(
|
||||
&test_state.harness.tenant_id,
|
||||
&TIMELINE_ID,
|
||||
shard,
|
||||
generation,
|
||||
)
|
||||
.get_path(),
|
||||
);
|
||||
eprintln!("Writing {index_path}");
|
||||
std::fs::write(&index_path, index_part_bytes).unwrap();
|
||||
@@ -2066,7 +2104,12 @@ mod tests {
|
||||
|
||||
// Simple case: we are in generation N, load the index from generation N - 1
|
||||
let generation_n = 5;
|
||||
let injected = inject_index_part(&test_state, Generation::new(generation_n - 1)).await;
|
||||
let injected = inject_index_part(
|
||||
&test_state,
|
||||
Generation::new(generation_n - 1),
|
||||
ShardIndex::unsharded(),
|
||||
)
|
||||
.await;
|
||||
|
||||
assert_got_index_part(&test_state, Generation::new(generation_n), &injected).await;
|
||||
|
||||
@@ -2084,22 +2127,34 @@ mod tests {
|
||||
|
||||
// A generation-less IndexPart exists in the bucket, we should find it
|
||||
let generation_n = 5;
|
||||
let injected_none = inject_index_part(&test_state, Generation::none()).await;
|
||||
let injected_none =
|
||||
inject_index_part(&test_state, Generation::none(), ShardIndex::unsharded()).await;
|
||||
assert_got_index_part(&test_state, Generation::new(generation_n), &injected_none).await;
|
||||
|
||||
// If a more recent-than-none generation exists, we should prefer to load that
|
||||
let injected_1 = inject_index_part(&test_state, Generation::new(1)).await;
|
||||
let injected_1 =
|
||||
inject_index_part(&test_state, Generation::new(1), ShardIndex::unsharded()).await;
|
||||
assert_got_index_part(&test_state, Generation::new(generation_n), &injected_1).await;
|
||||
|
||||
// If a more-recent-than-me generation exists, we should ignore it.
|
||||
let _injected_10 = inject_index_part(&test_state, Generation::new(10)).await;
|
||||
let _injected_10 =
|
||||
inject_index_part(&test_state, Generation::new(10), ShardIndex::unsharded()).await;
|
||||
assert_got_index_part(&test_state, Generation::new(generation_n), &injected_1).await;
|
||||
|
||||
// If a directly previous generation exists, _and_ an index exists in my own
|
||||
// generation, I should prefer my own generation.
|
||||
let _injected_prev =
|
||||
inject_index_part(&test_state, Generation::new(generation_n - 1)).await;
|
||||
let injected_current = inject_index_part(&test_state, Generation::new(generation_n)).await;
|
||||
let _injected_prev = inject_index_part(
|
||||
&test_state,
|
||||
Generation::new(generation_n - 1),
|
||||
ShardIndex::unsharded(),
|
||||
)
|
||||
.await;
|
||||
let injected_current = inject_index_part(
|
||||
&test_state,
|
||||
Generation::new(generation_n),
|
||||
ShardIndex::unsharded(),
|
||||
)
|
||||
.await;
|
||||
assert_got_index_part(
|
||||
&test_state,
|
||||
Generation::new(generation_n),
|
||||
|
||||
@@ -9,6 +9,7 @@ use std::time::Duration;
|
||||
|
||||
use anyhow::{anyhow, Context};
|
||||
use camino::Utf8Path;
|
||||
use pageserver_api::shard::ShardIndex;
|
||||
use tokio::fs;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
@@ -53,6 +54,7 @@ pub async fn download_layer_file<'a>(
|
||||
let remote_path = remote_layer_path(
|
||||
&tenant_id,
|
||||
&timeline_id,
|
||||
layer_metadata.shard,
|
||||
layer_file_name,
|
||||
layer_metadata.generation,
|
||||
);
|
||||
@@ -213,10 +215,11 @@ async fn do_download_index_part(
|
||||
storage: &GenericRemoteStorage,
|
||||
tenant_id: &TenantId,
|
||||
timeline_id: &TimelineId,
|
||||
shard: ShardIndex,
|
||||
index_generation: Generation,
|
||||
cancel: CancellationToken,
|
||||
) -> Result<IndexPart, DownloadError> {
|
||||
let remote_path = remote_index_path(tenant_id, timeline_id, index_generation);
|
||||
let remote_path = remote_index_path(tenant_id, timeline_id, shard, index_generation);
|
||||
|
||||
let index_part_bytes = download_retry_forever(
|
||||
|| async {
|
||||
@@ -254,6 +257,7 @@ pub(super) async fn download_index_part(
|
||||
storage: &GenericRemoteStorage,
|
||||
tenant_id: &TenantId,
|
||||
timeline_id: &TimelineId,
|
||||
shard: ShardIndex,
|
||||
my_generation: Generation,
|
||||
cancel: CancellationToken,
|
||||
) -> Result<IndexPart, DownloadError> {
|
||||
@@ -261,8 +265,15 @@ pub(super) async fn download_index_part(
|
||||
|
||||
if my_generation.is_none() {
|
||||
// Operating without generations: just fetch the generation-less path
|
||||
return do_download_index_part(storage, tenant_id, timeline_id, my_generation, cancel)
|
||||
.await;
|
||||
return do_download_index_part(
|
||||
storage,
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
shard,
|
||||
my_generation,
|
||||
cancel,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
// Stale case: If we were intentionally attached in a stale generation, there may already be a remote
|
||||
@@ -273,6 +284,7 @@ pub(super) async fn download_index_part(
|
||||
storage,
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
shard,
|
||||
my_generation,
|
||||
cancel.clone(),
|
||||
)
|
||||
@@ -300,6 +312,7 @@ pub(super) async fn download_index_part(
|
||||
storage,
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
shard,
|
||||
my_generation.previous(),
|
||||
cancel.clone(),
|
||||
)
|
||||
@@ -320,8 +333,9 @@ pub(super) async fn download_index_part(
|
||||
}
|
||||
|
||||
// General case/fallback: if there is no index at my_generation or prev_generation, then list all index_part.json
|
||||
// objects, and select the highest one with a generation <= my_generation.
|
||||
let index_prefix = remote_index_path(tenant_id, timeline_id, Generation::none());
|
||||
// objects, and select the highest one with a generation <= my_generation. Constructing the prefix is equivalent
|
||||
// to constructing a full index path with no generation, because the generation is a suffix.
|
||||
let index_prefix = remote_index_path(tenant_id, timeline_id, shard, Generation::none());
|
||||
let indices = backoff::retry(
|
||||
|| async { storage.list_files(Some(&index_prefix)).await },
|
||||
|_| false,
|
||||
@@ -347,14 +361,21 @@ pub(super) async fn download_index_part(
|
||||
match max_previous_generation {
|
||||
Some(g) => {
|
||||
tracing::debug!("Found index_part in generation {g:?}");
|
||||
do_download_index_part(storage, tenant_id, timeline_id, g, cancel).await
|
||||
do_download_index_part(storage, tenant_id, timeline_id, shard, g, cancel).await
|
||||
}
|
||||
None => {
|
||||
// Migration from legacy pre-generation state: we have a generation but no prior
|
||||
// attached pageservers did. Try to load from a no-generation path.
|
||||
tracing::info!("No index_part.json* found");
|
||||
do_download_index_part(storage, tenant_id, timeline_id, Generation::none(), cancel)
|
||||
.await
|
||||
do_download_index_part(
|
||||
storage,
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
shard,
|
||||
Generation::none(),
|
||||
cancel,
|
||||
)
|
||||
.await
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,6 +12,7 @@ use crate::tenant::metadata::TimelineMetadata;
|
||||
use crate::tenant::storage_layer::LayerFileName;
|
||||
use crate::tenant::upload_queue::UploadQueueInitialized;
|
||||
use crate::tenant::Generation;
|
||||
use pageserver_api::shard::ShardIndex;
|
||||
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
@@ -25,6 +26,8 @@ pub struct LayerFileMetadata {
|
||||
file_size: u64,
|
||||
|
||||
pub(crate) generation: Generation,
|
||||
|
||||
pub(crate) shard: ShardIndex,
|
||||
}
|
||||
|
||||
impl From<&'_ IndexLayerMetadata> for LayerFileMetadata {
|
||||
@@ -32,15 +35,17 @@ impl From<&'_ IndexLayerMetadata> for LayerFileMetadata {
|
||||
LayerFileMetadata {
|
||||
file_size: other.file_size,
|
||||
generation: other.generation,
|
||||
shard: other.shard,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl LayerFileMetadata {
|
||||
pub fn new(file_size: u64, generation: Generation) -> Self {
|
||||
pub fn new(file_size: u64, generation: Generation, shard: ShardIndex) -> Self {
|
||||
LayerFileMetadata {
|
||||
file_size,
|
||||
generation,
|
||||
shard,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -161,6 +166,10 @@ pub struct IndexLayerMetadata {
|
||||
#[serde(default = "Generation::none")]
|
||||
#[serde(skip_serializing_if = "Generation::is_none")]
|
||||
pub generation: Generation,
|
||||
|
||||
#[serde(default = "ShardIndex::unsharded")]
|
||||
#[serde(skip_serializing_if = "ShardIndex::is_unsharded")]
|
||||
pub shard: ShardIndex,
|
||||
}
|
||||
|
||||
impl From<LayerFileMetadata> for IndexLayerMetadata {
|
||||
@@ -168,6 +177,7 @@ impl From<LayerFileMetadata> for IndexLayerMetadata {
|
||||
IndexLayerMetadata {
|
||||
file_size: other.file_size,
|
||||
generation: other.generation,
|
||||
shard: other.shard,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -195,13 +205,15 @@ mod tests {
|
||||
layer_metadata: HashMap::from([
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), IndexLayerMetadata {
|
||||
file_size: 25600000,
|
||||
generation: Generation::none()
|
||||
generation: Generation::none(),
|
||||
shard: ShardIndex::unsharded()
|
||||
}),
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), IndexLayerMetadata {
|
||||
// serde_json should always parse this but this might be a double with jq for
|
||||
// example.
|
||||
file_size: 9007199254741001,
|
||||
generation: Generation::none()
|
||||
generation: Generation::none(),
|
||||
shard: ShardIndex::unsharded()
|
||||
})
|
||||
]),
|
||||
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
|
||||
@@ -233,13 +245,15 @@ mod tests {
|
||||
layer_metadata: HashMap::from([
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), IndexLayerMetadata {
|
||||
file_size: 25600000,
|
||||
generation: Generation::none()
|
||||
generation: Generation::none(),
|
||||
shard: ShardIndex::unsharded()
|
||||
}),
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), IndexLayerMetadata {
|
||||
// serde_json should always parse this but this might be a double with jq for
|
||||
// example.
|
||||
file_size: 9007199254741001,
|
||||
generation: Generation::none()
|
||||
generation: Generation::none(),
|
||||
shard: ShardIndex::unsharded()
|
||||
})
|
||||
]),
|
||||
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
|
||||
@@ -272,13 +286,15 @@ mod tests {
|
||||
layer_metadata: HashMap::from([
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), IndexLayerMetadata {
|
||||
file_size: 25600000,
|
||||
generation: Generation::none()
|
||||
generation: Generation::none(),
|
||||
shard: ShardIndex::unsharded()
|
||||
}),
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), IndexLayerMetadata {
|
||||
// serde_json should always parse this but this might be a double with jq for
|
||||
// example.
|
||||
file_size: 9007199254741001,
|
||||
generation: Generation::none()
|
||||
generation: Generation::none(),
|
||||
shard: ShardIndex::unsharded()
|
||||
})
|
||||
]),
|
||||
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
|
||||
@@ -354,19 +370,21 @@ mod tests {
|
||||
layer_metadata: HashMap::from([
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), IndexLayerMetadata {
|
||||
file_size: 25600000,
|
||||
generation: Generation::none()
|
||||
generation: Generation::none(),
|
||||
shard: ShardIndex::unsharded()
|
||||
}),
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), IndexLayerMetadata {
|
||||
// serde_json should always parse this but this might be a double with jq for
|
||||
// example.
|
||||
file_size: 9007199254741001,
|
||||
generation: Generation::none()
|
||||
generation: Generation::none(),
|
||||
shard: ShardIndex::unsharded()
|
||||
})
|
||||
]),
|
||||
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
|
||||
metadata: TimelineMetadata::from_bytes(&[113,11,159,210,0,54,0,4,0,0,0,0,1,105,96,232,1,0,0,0,0,1,105,96,112,0,0,0,0,0,0,0,0,0,0,0,0,0,1,105,96,112,0,0,0,0,1,105,96,112,0,0,0,14,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]).unwrap(),
|
||||
deleted_at: Some(chrono::NaiveDateTime::parse_from_str(
|
||||
"2023-07-31T09:00:00.123000000", "%Y-%m-%dT%H:%M:%S.%f").unwrap())
|
||||
"2023-07-31T09:00:00.123000000", "%Y-%m-%dT%H:%M:%S.%f").unwrap()),
|
||||
};
|
||||
|
||||
let part = IndexPart::from_s3_bytes(example.as_bytes()).unwrap();
|
||||
|
||||
@@ -4,6 +4,7 @@ use anyhow::{bail, Context};
|
||||
use bytes::Bytes;
|
||||
use camino::Utf8Path;
|
||||
use fail::fail_point;
|
||||
use pageserver_api::shard::ShardIndex;
|
||||
use std::io::ErrorKind;
|
||||
use tokio::fs;
|
||||
|
||||
@@ -26,6 +27,7 @@ pub(super) async fn upload_index_part<'a>(
|
||||
storage: &'a GenericRemoteStorage,
|
||||
tenant_id: &TenantId,
|
||||
timeline_id: &TimelineId,
|
||||
shard: ShardIndex,
|
||||
generation: Generation,
|
||||
index_part: &'a IndexPart,
|
||||
) -> anyhow::Result<()> {
|
||||
@@ -42,7 +44,7 @@ pub(super) async fn upload_index_part<'a>(
|
||||
let index_part_size = index_part_bytes.len();
|
||||
let index_part_bytes = tokio::io::BufReader::new(std::io::Cursor::new(index_part_bytes));
|
||||
|
||||
let remote_path = remote_index_path(tenant_id, timeline_id, generation);
|
||||
let remote_path = remote_index_path(tenant_id, timeline_id, shard, generation);
|
||||
storage
|
||||
.upload_storage_object(Box::new(index_part_bytes), index_part_size, &remote_path)
|
||||
.await
|
||||
|
||||
@@ -3,6 +3,7 @@ use camino::{Utf8Path, Utf8PathBuf};
|
||||
use pageserver_api::models::{
|
||||
HistoricLayerInfo, LayerAccessKind, LayerResidenceEventReason, LayerResidenceStatus,
|
||||
};
|
||||
use pageserver_api::shard::ShardIndex;
|
||||
use std::ops::Range;
|
||||
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
||||
use std::sync::{Arc, Weak};
|
||||
@@ -96,6 +97,7 @@ impl Layer {
|
||||
desc,
|
||||
None,
|
||||
metadata.generation,
|
||||
metadata.shard,
|
||||
)));
|
||||
|
||||
debug_assert!(owner.0.needs_download_blocking().unwrap().is_some());
|
||||
@@ -136,6 +138,7 @@ impl Layer {
|
||||
desc,
|
||||
Some(inner),
|
||||
metadata.generation,
|
||||
metadata.shard,
|
||||
)
|
||||
}));
|
||||
|
||||
@@ -179,6 +182,7 @@ impl Layer {
|
||||
desc,
|
||||
Some(inner),
|
||||
timeline.generation,
|
||||
timeline.get_shard_index(),
|
||||
)
|
||||
}));
|
||||
|
||||
@@ -426,6 +430,15 @@ struct LayerInner {
|
||||
/// For loaded layers (resident or evicted) this comes from [`LayerFileMetadata::generation`],
|
||||
/// for created layers from [`Timeline::generation`].
|
||||
generation: Generation,
|
||||
|
||||
/// The shard of this Layer.
|
||||
///
|
||||
/// For layers created in this process, this will always be the [`ShardIndex`] of the
|
||||
/// current `ShardIdentity`` (TODO: add link once it's introduced).
|
||||
///
|
||||
/// For loaded layers, this may be some other value if the tenant has undergone
|
||||
/// a shard split since the layer was originally written.
|
||||
shard: ShardIndex,
|
||||
}
|
||||
|
||||
impl std::fmt::Display for LayerInner {
|
||||
@@ -459,9 +472,9 @@ impl Drop for LayerInner {
|
||||
|
||||
let path = std::mem::take(&mut self.path);
|
||||
let file_name = self.layer_desc().filename();
|
||||
let gen = self.generation;
|
||||
let file_size = self.layer_desc().file_size;
|
||||
let timeline = self.timeline.clone();
|
||||
let meta = self.metadata();
|
||||
|
||||
crate::task_mgr::BACKGROUND_RUNTIME.spawn_blocking(move || {
|
||||
let _g = span.entered();
|
||||
@@ -489,7 +502,7 @@ impl Drop for LayerInner {
|
||||
timeline.metrics.resident_physical_size_sub(file_size);
|
||||
}
|
||||
if let Some(remote_client) = timeline.remote_client.as_ref() {
|
||||
let res = remote_client.schedule_deletion_of_unlinked(vec![(file_name, gen)]);
|
||||
let res = remote_client.schedule_deletion_of_unlinked(vec![(file_name, meta)]);
|
||||
|
||||
if let Err(e) = res {
|
||||
// test_timeline_deletion_with_files_stuck_in_upload_queue is good at
|
||||
@@ -523,6 +536,7 @@ impl LayerInner {
|
||||
desc: PersistentLayerDesc,
|
||||
downloaded: Option<Arc<DownloadedLayer>>,
|
||||
generation: Generation,
|
||||
shard: ShardIndex,
|
||||
) -> Self {
|
||||
let path = conf
|
||||
.timeline_path(&timeline.tenant_id, &timeline.timeline_id)
|
||||
@@ -550,6 +564,7 @@ impl LayerInner {
|
||||
status: tokio::sync::broadcast::channel(1).0,
|
||||
consecutive_failures: AtomicUsize::new(0),
|
||||
generation,
|
||||
shard,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1077,7 +1092,7 @@ impl LayerInner {
|
||||
}
|
||||
|
||||
fn metadata(&self) -> LayerFileMetadata {
|
||||
LayerFileMetadata::new(self.desc.file_size, self.generation)
|
||||
LayerFileMetadata::new(self.desc.file_size, self.generation, self.shard)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -62,6 +62,7 @@ use crate::pgdatadir_mapping::{is_rel_fsm_block_key, is_rel_vm_block_key};
|
||||
use crate::pgdatadir_mapping::{BlockNumber, CalculateLogicalSizeError};
|
||||
use crate::tenant::config::{EvictionPolicy, TenantConfOpt};
|
||||
use pageserver_api::reltag::RelTag;
|
||||
use pageserver_api::shard::ShardIndex;
|
||||
|
||||
use postgres_connection::PgConnectionConfig;
|
||||
use postgres_ffi::to_pg_timestamp;
|
||||
@@ -1597,6 +1598,7 @@ impl Timeline {
|
||||
|
||||
// Copy to move into the task we're about to spawn
|
||||
let generation = self.generation;
|
||||
let shard = self.get_shard_index();
|
||||
let this = self.myself.upgrade().expect("&self method holds the arc");
|
||||
|
||||
let (loaded_layers, needs_cleanup, total_physical_size) = tokio::task::spawn_blocking({
|
||||
@@ -1645,6 +1647,7 @@ impl Timeline {
|
||||
index_part.as_ref(),
|
||||
disk_consistent_lsn,
|
||||
generation,
|
||||
shard,
|
||||
);
|
||||
|
||||
let mut loaded_layers = Vec::new();
|
||||
@@ -4364,6 +4367,11 @@ impl Timeline {
|
||||
resident_layers,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn get_shard_index(&self) -> ShardIndex {
|
||||
// TODO: carry this on the struct
|
||||
ShardIndex::unsharded()
|
||||
}
|
||||
}
|
||||
|
||||
type TraversalPathItem = (
|
||||
|
||||
@@ -13,6 +13,7 @@ use crate::{
|
||||
};
|
||||
use anyhow::Context;
|
||||
use camino::Utf8Path;
|
||||
use pageserver_api::shard::ShardIndex;
|
||||
use std::{collections::HashMap, str::FromStr};
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
@@ -107,6 +108,7 @@ pub(super) fn reconcile(
|
||||
index_part: Option<&IndexPart>,
|
||||
disk_consistent_lsn: Lsn,
|
||||
generation: Generation,
|
||||
shard: ShardIndex,
|
||||
) -> Vec<(LayerFileName, Result<Decision, DismissedLayer>)> {
|
||||
use Decision::*;
|
||||
|
||||
@@ -118,10 +120,13 @@ pub(super) fn reconcile(
|
||||
.map(|(name, file_size)| {
|
||||
(
|
||||
name,
|
||||
// The generation here will be corrected to match IndexPart in the merge below, unless
|
||||
// The generation and shard here will be corrected to match IndexPart in the merge below, unless
|
||||
// it is not in IndexPart, in which case using our current generation makes sense
|
||||
// because it will be uploaded in this generation.
|
||||
(Some(LayerFileMetadata::new(file_size, generation)), None),
|
||||
(
|
||||
Some(LayerFileMetadata::new(file_size, generation, shard)),
|
||||
None,
|
||||
),
|
||||
)
|
||||
})
|
||||
.collect::<Collected>();
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
use super::storage_layer::LayerFileName;
|
||||
use super::storage_layer::ResidentLayer;
|
||||
use super::Generation;
|
||||
use crate::tenant::metadata::TimelineMetadata;
|
||||
use crate::tenant::remote_timeline_client::index::IndexPart;
|
||||
use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
|
||||
@@ -15,6 +14,9 @@ use utils::lsn::AtomicLsn;
|
||||
use std::sync::atomic::AtomicU32;
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
#[cfg(feature = "testing")]
|
||||
use utils::generation::Generation;
|
||||
|
||||
// clippy warns that Uninitialized is much smaller than Initialized, which wastes
|
||||
// memory for Uninitialized variants. Doesn't matter in practice, there are not
|
||||
// that many upload queues in a running pageserver, and most of them are initialized
|
||||
@@ -232,7 +234,7 @@ pub(crate) struct UploadTask {
|
||||
/// for timeline deletion, which skips this queue and goes directly to DeletionQueue.
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct Delete {
|
||||
pub(crate) layers: Vec<(LayerFileName, Generation)>,
|
||||
pub(crate) layers: Vec<(LayerFileName, LayerFileMetadata)>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
|
||||
@@ -18,6 +18,7 @@ from datetime import datetime
|
||||
from functools import cached_property
|
||||
from itertools import chain, product
|
||||
from pathlib import Path
|
||||
from queue import SimpleQueue
|
||||
from types import TracebackType
|
||||
from typing import Any, Dict, Iterator, List, Optional, Tuple, Type, cast
|
||||
from urllib.parse import urlparse
|
||||
@@ -36,8 +37,11 @@ from _pytest.fixtures import FixtureRequest
|
||||
from psycopg2.extensions import connection as PgConnection
|
||||
from psycopg2.extensions import cursor as PgCursor
|
||||
from psycopg2.extensions import make_dsn, parse_dsn
|
||||
from pytest_httpserver import HTTPServer
|
||||
from typing_extensions import Literal
|
||||
from urllib3.util.retry import Retry
|
||||
from werkzeug.wrappers.request import Request
|
||||
from werkzeug.wrappers.response import Response
|
||||
|
||||
from fixtures.broker import NeonBroker
|
||||
from fixtures.log_helper import log
|
||||
@@ -1005,6 +1009,68 @@ def neon_env_builder(
|
||||
yield builder
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def neon_env_and_metrics_server(
|
||||
httpserver: HTTPServer,
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
httpserver_listen_address,
|
||||
) -> Tuple[NeonEnv, HTTPServer, SimpleQueue[Any]]:
|
||||
"""
|
||||
Fixture to create a Neon environment and metrics server.
|
||||
"""
|
||||
|
||||
(host, port) = httpserver_listen_address
|
||||
metric_collection_endpoint = f"http://{host}:{port}/billing/api/v1/usage_events"
|
||||
|
||||
# this should be Union[str, Tuple[List[Any], bool]], but it will make unpacking much more verbose
|
||||
uploads: SimpleQueue[Any] = SimpleQueue()
|
||||
|
||||
def metrics_handler(request: Request) -> Response:
|
||||
if request.json is None:
|
||||
return Response(status=400)
|
||||
|
||||
events = request.json["events"]
|
||||
is_last = request.headers["pageserver-metrics-last-upload-in-batch"]
|
||||
assert is_last in ["true", "false"]
|
||||
uploads.put((events, is_last == "true"))
|
||||
return Response(status=200)
|
||||
|
||||
# Require collecting metrics frequently, since we change
|
||||
# the timeline and want something to be logged about it.
|
||||
#
|
||||
# Disable time-based pitr, we will use the manual GC calls
|
||||
# to trigger remote storage operations in a controlled way
|
||||
neon_env_builder.pageserver_config_override = f"""
|
||||
metric_collection_interval="1s"
|
||||
metric_collection_endpoint="{metric_collection_endpoint}"
|
||||
cached_metric_collection_interval="0s"
|
||||
synthetic_size_calculation_interval="3s"
|
||||
"""
|
||||
|
||||
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
|
||||
|
||||
log.info(f"test_metric_collection endpoint is {metric_collection_endpoint}")
|
||||
|
||||
# mock http server that returns OK for the metrics
|
||||
httpserver.expect_request("/billing/api/v1/usage_events", method="POST").respond_with_handler(
|
||||
metrics_handler
|
||||
)
|
||||
|
||||
# spin up neon, after http server is ready
|
||||
env = neon_env_builder.init_start(initial_tenant_conf={"pitr_interval": "0 sec"})
|
||||
# httpserver is shut down before pageserver during passing run
|
||||
env.pageserver.allowed_errors.append(".*metrics endpoint refused the sent metrics*")
|
||||
# we have a fast rate of calculation, these can happen at shutdown
|
||||
env.pageserver.allowed_errors.append(
|
||||
".*synthetic_size_worker:calculate_synthetic_size.*:gather_size_inputs.*: failed to calculate logical size at .*: cancelled.*"
|
||||
)
|
||||
env.pageserver.allowed_errors.append(
|
||||
".*synthetic_size_worker: failed to calculate synthetic size for tenant .*: failed to calculate some logical_sizes"
|
||||
)
|
||||
|
||||
return (env, httpserver, uploads)
|
||||
|
||||
|
||||
@dataclass
|
||||
class PageserverPort:
|
||||
pg: int
|
||||
@@ -2640,11 +2706,6 @@ class Endpoint(PgProtocol):
|
||||
|
||||
return self
|
||||
|
||||
def get_metrics_str(self) -> str:
|
||||
request_result = requests.get(f"http://localhost:{self.http_port}/metrics")
|
||||
request_result.raise_for_status()
|
||||
return request_result.text
|
||||
|
||||
def __enter__(self) -> "Endpoint":
|
||||
return self
|
||||
|
||||
|
||||
@@ -6,15 +6,11 @@ def test_build_info_metric(neon_env_builder: NeonEnvBuilder, link_proxy: NeonPro
|
||||
neon_env_builder.num_safekeepers = 1
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
env.neon_cli.create_branch("test_build_info_metric")
|
||||
endpoint = env.endpoints.create_start("test_build_info_metric")
|
||||
|
||||
parsed_metrics = {}
|
||||
|
||||
parsed_metrics["pageserver"] = parse_metrics(env.pageserver.http_client().get_metrics_str())
|
||||
parsed_metrics["safekeeper"] = parse_metrics(env.safekeepers[0].http_client().get_metrics_str())
|
||||
parsed_metrics["proxy"] = parse_metrics(link_proxy.get_metrics())
|
||||
parsed_metrics["compute_ctl"] = parse_metrics(endpoint.get_metrics_str())
|
||||
|
||||
for _component, metrics in parsed_metrics.items():
|
||||
sample = metrics.query_one("libmetrics_build_info")
|
||||
|
||||
@@ -3,75 +3,21 @@ import time
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from queue import SimpleQueue
|
||||
from typing import Any, Dict, Set
|
||||
from typing import Any, Dict, Set, Tuple
|
||||
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import (
|
||||
NeonEnvBuilder,
|
||||
NeonEnv,
|
||||
wait_for_last_flush_lsn,
|
||||
)
|
||||
from fixtures.remote_storage import RemoteStorageKind
|
||||
from fixtures.types import TenantId, TimelineId
|
||||
from pytest_httpserver import HTTPServer
|
||||
from werkzeug.wrappers.request import Request
|
||||
from werkzeug.wrappers.response import Response
|
||||
|
||||
# TODO: collect all of the env setup *AFTER* removal of RemoteStorageKind.NOOP
|
||||
|
||||
|
||||
def test_metric_collection(
|
||||
httpserver: HTTPServer,
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
httpserver_listen_address,
|
||||
neon_env_and_metrics_server: Tuple[NeonEnv, HTTPServer, SimpleQueue[Any]]
|
||||
):
|
||||
(host, port) = httpserver_listen_address
|
||||
metric_collection_endpoint = f"http://{host}:{port}/billing/api/v1/usage_events"
|
||||
|
||||
# this should be Union[str, Tuple[List[Any], bool]], but it will make unpacking much more verbose
|
||||
uploads: SimpleQueue[Any] = SimpleQueue()
|
||||
|
||||
def metrics_handler(request: Request) -> Response:
|
||||
if request.json is None:
|
||||
return Response(status=400)
|
||||
|
||||
events = request.json["events"]
|
||||
is_last = request.headers["pageserver-metrics-last-upload-in-batch"]
|
||||
assert is_last in ["true", "false"]
|
||||
uploads.put((events, is_last == "true"))
|
||||
return Response(status=200)
|
||||
|
||||
# Require collecting metrics frequently, since we change
|
||||
# the timeline and want something to be logged about it.
|
||||
#
|
||||
# Disable time-based pitr, we will use the manual GC calls
|
||||
# to trigger remote storage operations in a controlled way
|
||||
neon_env_builder.pageserver_config_override = f"""
|
||||
metric_collection_interval="1s"
|
||||
metric_collection_endpoint="{metric_collection_endpoint}"
|
||||
cached_metric_collection_interval="0s"
|
||||
synthetic_size_calculation_interval="3s"
|
||||
"""
|
||||
|
||||
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
|
||||
|
||||
log.info(f"test_metric_collection endpoint is {metric_collection_endpoint}")
|
||||
|
||||
# mock http server that returns OK for the metrics
|
||||
httpserver.expect_request("/billing/api/v1/usage_events", method="POST").respond_with_handler(
|
||||
metrics_handler
|
||||
)
|
||||
|
||||
# spin up neon, after http server is ready
|
||||
env = neon_env_builder.init_start(initial_tenant_conf={"pitr_interval": "0 sec"})
|
||||
# httpserver is shut down before pageserver during passing run
|
||||
env.pageserver.allowed_errors.append(".*metrics endpoint refused the sent metrics*")
|
||||
# we have a fast rate of calculation, these can happen at shutdown
|
||||
env.pageserver.allowed_errors.append(
|
||||
".*synthetic_size_worker:calculate_synthetic_size.*:gather_size_inputs.*: failed to calculate logical size at .*: cancelled.*"
|
||||
)
|
||||
env.pageserver.allowed_errors.append(
|
||||
".*synthetic_size_worker: failed to calculate synthetic size for tenant .*: failed to calculate some logical_sizes"
|
||||
)
|
||||
(env, httpserver, uploads) = neon_env_and_metrics_server
|
||||
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.initial_timeline
|
||||
@@ -168,59 +114,11 @@ def test_metric_collection(
|
||||
|
||||
|
||||
def test_metric_collection_cleans_up_tempfile(
|
||||
httpserver: HTTPServer,
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
httpserver_listen_address,
|
||||
neon_env_and_metrics_server: Tuple[NeonEnv, HTTPServer, SimpleQueue[Any]]
|
||||
):
|
||||
(host, port) = httpserver_listen_address
|
||||
metric_collection_endpoint = f"http://{host}:{port}/billing/api/v1/usage_events"
|
||||
|
||||
# this should be Union[str, Tuple[List[Any], bool]], but it will make unpacking much more verbose
|
||||
uploads: SimpleQueue[Any] = SimpleQueue()
|
||||
|
||||
def metrics_handler(request: Request) -> Response:
|
||||
if request.json is None:
|
||||
return Response(status=400)
|
||||
|
||||
events = request.json["events"]
|
||||
is_last = request.headers["pageserver-metrics-last-upload-in-batch"]
|
||||
assert is_last in ["true", "false"]
|
||||
uploads.put((events, is_last == "true"))
|
||||
return Response(status=200)
|
||||
|
||||
# Require collecting metrics frequently, since we change
|
||||
# the timeline and want something to be logged about it.
|
||||
#
|
||||
# Disable time-based pitr, we will use the manual GC calls
|
||||
# to trigger remote storage operations in a controlled way
|
||||
neon_env_builder.pageserver_config_override = f"""
|
||||
metric_collection_interval="1s"
|
||||
metric_collection_endpoint="{metric_collection_endpoint}"
|
||||
cached_metric_collection_interval="0s"
|
||||
synthetic_size_calculation_interval="3s"
|
||||
"""
|
||||
|
||||
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
|
||||
|
||||
# mock http server that returns OK for the metrics
|
||||
httpserver.expect_request("/billing/api/v1/usage_events", method="POST").respond_with_handler(
|
||||
metrics_handler
|
||||
)
|
||||
|
||||
# spin up neon, after http server is ready
|
||||
env = neon_env_builder.init_start(initial_tenant_conf={"pitr_interval": "0 sec"})
|
||||
(env, httpserver, uploads) = neon_env_and_metrics_server
|
||||
pageserver_http = env.pageserver.http_client()
|
||||
|
||||
# httpserver is shut down before pageserver during passing run
|
||||
env.pageserver.allowed_errors.append(".*metrics endpoint refused the sent metrics*")
|
||||
# we have a fast rate of calculation, these can happen at shutdown
|
||||
env.pageserver.allowed_errors.append(
|
||||
".*synthetic_size_worker:calculate_synthetic_size.*:gather_size_inputs.*: failed to calculate logical size at .*: cancelled.*"
|
||||
)
|
||||
env.pageserver.allowed_errors.append(
|
||||
".*synthetic_size_worker: failed to calculate synthetic size for tenant .*: failed to calculate some logical_sizes"
|
||||
)
|
||||
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.initial_timeline
|
||||
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
|
||||
@@ -290,6 +188,8 @@ def test_metric_collection_cleans_up_tempfile(
|
||||
), "only initial tempfile should had been removed"
|
||||
assert initially.other.issuperset(later.other), "no other files should had been removed"
|
||||
|
||||
httpserver.check()
|
||||
|
||||
|
||||
@dataclass
|
||||
class PrefixPartitionedFiles:
|
||||
|
||||
Reference in New Issue
Block a user