mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-27 18:10:37 +00:00
Compare commits
22 Commits
vlad/storc
...
vlad/tmp/g
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
fce602ed30 | ||
|
|
fc24ba5233 | ||
|
|
e200a2b01e | ||
|
|
2a7f224306 | ||
|
|
d86ddf2b76 | ||
|
|
86d5f4ada9 | ||
|
|
089edb55e8 | ||
|
|
1302f9442a | ||
|
|
80612d2688 | ||
|
|
7f96ac3435 | ||
|
|
999fbbb2a3 | ||
|
|
d22e0b5398 | ||
|
|
58340f9dbf | ||
|
|
fcbac527b0 | ||
|
|
a5154cf990 | ||
|
|
bfe5df8c4e | ||
|
|
46927bc228 | ||
|
|
bb9c792813 | ||
|
|
126bcc3794 | ||
|
|
4c2100794b | ||
|
|
d3b892e9ad | ||
|
|
7515d0f368 |
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -5801,6 +5801,7 @@ dependencies = [
|
||||
"r2d2",
|
||||
"reqwest 0.12.4",
|
||||
"routerify",
|
||||
"scopeguard",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"strum",
|
||||
|
||||
@@ -209,6 +209,7 @@ pub enum NodeSchedulingPolicy {
|
||||
Active,
|
||||
Filling,
|
||||
Pause,
|
||||
PauseForRestart,
|
||||
Draining,
|
||||
}
|
||||
|
||||
@@ -220,6 +221,7 @@ impl FromStr for NodeSchedulingPolicy {
|
||||
"active" => Ok(Self::Active),
|
||||
"filling" => Ok(Self::Filling),
|
||||
"pause" => Ok(Self::Pause),
|
||||
"pause_for_restart" => Ok(Self::PauseForRestart),
|
||||
"draining" => Ok(Self::Draining),
|
||||
_ => Err(anyhow::anyhow!("Unknown scheduling state '{s}'")),
|
||||
}
|
||||
@@ -233,6 +235,7 @@ impl From<NodeSchedulingPolicy> for String {
|
||||
Active => "active",
|
||||
Filling => "filling",
|
||||
Pause => "pause",
|
||||
PauseForRestart => "pause_for_restart",
|
||||
Draining => "draining",
|
||||
}
|
||||
.to_string()
|
||||
|
||||
@@ -1,11 +1,6 @@
|
||||
use std::collections::HashMap;
|
||||
|
||||
use anyhow::Context;
|
||||
use camino::Utf8PathBuf;
|
||||
use pageserver::tenant::remote_timeline_client::index::LayerFileMetadata;
|
||||
use pageserver::tenant::storage_layer::LayerName;
|
||||
use pageserver::tenant::{metadata::TimelineMetadata, IndexPart};
|
||||
use utils::lsn::Lsn;
|
||||
use pageserver::tenant::IndexPart;
|
||||
|
||||
#[derive(clap::Subcommand)]
|
||||
pub(crate) enum IndexPartCmd {
|
||||
@@ -17,20 +12,7 @@ pub(crate) async fn main(cmd: &IndexPartCmd) -> anyhow::Result<()> {
|
||||
IndexPartCmd::Dump { path } => {
|
||||
let bytes = tokio::fs::read(path).await.context("read file")?;
|
||||
let des: IndexPart = IndexPart::from_s3_bytes(&bytes).context("deserialize")?;
|
||||
#[derive(serde::Serialize)]
|
||||
struct Output<'a> {
|
||||
layer_metadata: &'a HashMap<LayerName, LayerFileMetadata>,
|
||||
disk_consistent_lsn: Lsn,
|
||||
timeline_metadata: &'a TimelineMetadata,
|
||||
}
|
||||
|
||||
let output = Output {
|
||||
layer_metadata: &des.layer_metadata,
|
||||
disk_consistent_lsn: des.metadata.disk_consistent_lsn(),
|
||||
timeline_metadata: &des.metadata,
|
||||
};
|
||||
|
||||
let output = serde_json::to_string_pretty(&output).context("serialize output")?;
|
||||
let output = serde_json::to_string_pretty(&des).context("serialize output")?;
|
||||
println!("{output}");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -4044,10 +4044,12 @@ mod tests {
|
||||
use crate::DEFAULT_PG_VERSION;
|
||||
use bytes::{Bytes, BytesMut};
|
||||
use hex_literal::hex;
|
||||
use itertools::Itertools;
|
||||
use pageserver_api::key::{AUX_FILES_KEY, AUX_KEY_PREFIX, NON_INHERITED_RANGE};
|
||||
use pageserver_api::keyspace::KeySpace;
|
||||
use pageserver_api::models::{CompactionAlgorithm, CompactionAlgorithmSettings};
|
||||
use rand::{thread_rng, Rng};
|
||||
use storage_layer::PersistentLayerKey;
|
||||
use tests::storage_layer::ValuesReconstructState;
|
||||
use tests::timeline::{GetVectoredError, ShutdownMode};
|
||||
use utils::bin_ser::BeSer;
|
||||
@@ -6584,8 +6586,8 @@ mod tests {
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_metadata_tombstone_image_creation() -> anyhow::Result<()> {
|
||||
let harness = TenantHarness::create("test_metadata_tombstone_image_creation")?;
|
||||
async fn test_metadata_tombstone_image_creation() {
|
||||
let harness = TenantHarness::create("test_metadata_tombstone_image_creation").unwrap();
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
|
||||
let key0 = Key::from_hex("620000000033333333444444445500000000").unwrap();
|
||||
@@ -6613,7 +6615,8 @@ mod tests {
|
||||
vec![(Lsn(0x10), vec![(key1, test_img("metadata key 1"))])],
|
||||
Lsn(0x30),
|
||||
)
|
||||
.await?;
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let cancel = CancellationToken::new();
|
||||
|
||||
@@ -6628,23 +6631,24 @@ mod tests {
|
||||
},
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Image layers are created at last_record_lsn
|
||||
let images = tline
|
||||
.inspect_image_layers(Lsn(0x30), &ctx)
|
||||
.await?
|
||||
.await
|
||||
.unwrap()
|
||||
.into_iter()
|
||||
.filter(|(k, _)| k.is_metadata_key())
|
||||
.collect::<Vec<_>>();
|
||||
assert_eq!(images.len(), 2); // the image layer should only contain two existing keys, tombstones should be removed.
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_metadata_tombstone_empty_image_creation() -> anyhow::Result<()> {
|
||||
let harness = TenantHarness::create("test_metadata_tombstone_image_creation")?;
|
||||
async fn test_metadata_tombstone_empty_image_creation() {
|
||||
let harness =
|
||||
TenantHarness::create("test_metadata_tombstone_empty_image_creation").unwrap();
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
|
||||
let key1 = Key::from_hex("620000000033333333444444445500000001").unwrap();
|
||||
@@ -6666,7 +6670,8 @@ mod tests {
|
||||
vec![(Lsn(0x10), vec![(key1, test_img("metadata key 1"))])],
|
||||
Lsn(0x30),
|
||||
)
|
||||
.await?;
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let cancel = CancellationToken::new();
|
||||
|
||||
@@ -6681,16 +6686,174 @@ mod tests {
|
||||
},
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Image layers are created at last_record_lsn
|
||||
let images = tline
|
||||
.inspect_image_layers(Lsn(0x30), &ctx)
|
||||
.await?
|
||||
.await
|
||||
.unwrap()
|
||||
.into_iter()
|
||||
.filter(|(k, _)| k.is_metadata_key())
|
||||
.collect::<Vec<_>>();
|
||||
assert_eq!(images.len(), 0); // the image layer should not contain tombstones, or it is not created
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_simple_bottom_most_compaction() -> anyhow::Result<()> {
|
||||
let harness = TenantHarness::create("test_simple_bottom_most_compaction")?;
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
|
||||
fn get_key(id: u32) -> Key {
|
||||
// using aux key here b/c they are guaranteed to be inside `collect_keyspace`.
|
||||
let mut key = Key::from_hex("620000000033333333444444445500000000").unwrap();
|
||||
key.field6 = id;
|
||||
key
|
||||
}
|
||||
|
||||
// We create one bottom-most image layer, a delta layer D1 crossing the GC horizon, D2 below the horizon, and D3 above the horizon.
|
||||
//
|
||||
// | D1 | | D3 |
|
||||
// -| |-- gc horizon -----------------
|
||||
// | | | D2 |
|
||||
// --------- img layer ------------------
|
||||
//
|
||||
// What we should expect from this compaction is:
|
||||
// | Part of D1 | | D3 |
|
||||
// --------- img layer with D1+D2 at GC horizon------------------
|
||||
|
||||
// img layer at 0x10
|
||||
let img_layer = (0..10)
|
||||
.map(|id| (get_key(id), test_img(&format!("value {id}@0x10"))))
|
||||
.collect_vec();
|
||||
|
||||
let delta1 = vec![
|
||||
// TODO: we should test a real delta record here, which requires us to add a variant of NeonWalRecord for testing purpose.
|
||||
(
|
||||
get_key(1),
|
||||
Lsn(0x20),
|
||||
Value::Image(test_img("value 1@0x20")),
|
||||
),
|
||||
(
|
||||
get_key(2),
|
||||
Lsn(0x30),
|
||||
Value::Image(test_img("value 2@0x30")),
|
||||
),
|
||||
(
|
||||
get_key(3),
|
||||
Lsn(0x40),
|
||||
Value::Image(test_img("value 3@0x40")),
|
||||
),
|
||||
];
|
||||
let delta2 = vec![
|
||||
(
|
||||
get_key(5),
|
||||
Lsn(0x20),
|
||||
Value::Image(test_img("value 5@0x20")),
|
||||
),
|
||||
(
|
||||
get_key(6),
|
||||
Lsn(0x20),
|
||||
Value::Image(test_img("value 6@0x20")),
|
||||
),
|
||||
];
|
||||
let delta3 = vec![
|
||||
(
|
||||
get_key(8),
|
||||
Lsn(0x40),
|
||||
Value::Image(test_img("value 8@0x40")),
|
||||
),
|
||||
(
|
||||
get_key(9),
|
||||
Lsn(0x40),
|
||||
Value::Image(test_img("value 9@0x40")),
|
||||
),
|
||||
];
|
||||
|
||||
let tline = tenant
|
||||
.create_test_timeline_with_layers(
|
||||
TIMELINE_ID,
|
||||
Lsn(0x10),
|
||||
DEFAULT_PG_VERSION,
|
||||
&ctx,
|
||||
vec![delta1, delta2, delta3], // delta layers
|
||||
vec![(Lsn(0x10), img_layer)], // image layers
|
||||
Lsn(0x50),
|
||||
)
|
||||
.await?;
|
||||
{
|
||||
// Update GC info
|
||||
let mut guard = tline.gc_info.write().unwrap();
|
||||
guard.cutoffs.pitr = Lsn(0x30);
|
||||
guard.cutoffs.horizon = Lsn(0x30);
|
||||
}
|
||||
|
||||
let cancel = CancellationToken::new();
|
||||
tline.compact_with_gc(&cancel, &ctx).await.unwrap();
|
||||
|
||||
// Check if the image layer at the GC horizon contains exactly what we want
|
||||
let image_at_gc_horizon = tline
|
||||
.inspect_image_layers(Lsn(0x30), &ctx)
|
||||
.await
|
||||
.unwrap()
|
||||
.into_iter()
|
||||
.filter(|(k, _)| k.is_metadata_key())
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
assert_eq!(image_at_gc_horizon.len(), 10);
|
||||
let expected_lsn = [0x10, 0x20, 0x30, 0x10, 0x10, 0x20, 0x20, 0x10, 0x10, 0x10];
|
||||
for idx in 0..10 {
|
||||
assert_eq!(
|
||||
image_at_gc_horizon[idx],
|
||||
(
|
||||
get_key(idx as u32),
|
||||
test_img(&format!("value {idx}@{:#x}", expected_lsn[idx]))
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
// Check if old layers are removed / new layers have the expected LSN
|
||||
let mut all_layers = tline.inspect_historic_layers().await.unwrap();
|
||||
all_layers.sort_by(|k1, k2| {
|
||||
(
|
||||
k1.is_delta,
|
||||
k1.key_range.start,
|
||||
k1.key_range.end,
|
||||
k1.lsn_range.start,
|
||||
k1.lsn_range.end,
|
||||
)
|
||||
.cmp(&(
|
||||
k2.is_delta,
|
||||
k2.key_range.start,
|
||||
k2.key_range.end,
|
||||
k2.lsn_range.start,
|
||||
k2.lsn_range.end,
|
||||
))
|
||||
});
|
||||
assert_eq!(
|
||||
all_layers,
|
||||
vec![
|
||||
// Image layer at GC horizon
|
||||
PersistentLayerKey {
|
||||
key_range: Key::MIN..get_key(10),
|
||||
lsn_range: Lsn(0x30)..Lsn(0x31),
|
||||
is_delta: false
|
||||
},
|
||||
// The delta layer that is cut in the middle
|
||||
PersistentLayerKey {
|
||||
key_range: Key::MIN..get_key(9),
|
||||
lsn_range: Lsn(0x30)..Lsn(0x41),
|
||||
is_delta: true
|
||||
},
|
||||
// The delta layer we created and should not be picked for the compaction
|
||||
PersistentLayerKey {
|
||||
key_range: get_key(8)..get_key(10),
|
||||
lsn_range: Lsn(0x40)..Lsn(0x41),
|
||||
is_delta: true
|
||||
}
|
||||
]
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -1,15 +1,23 @@
|
||||
//! Every image of a certain timeline from [`crate::tenant::Tenant`]
|
||||
//! has a metadata that needs to be stored persistently.
|
||||
//! Describes the legacy, now hopefully no longer modified, per-timeline metadata stored in
|
||||
//! `index_part.json` managed by [`remote_timeline_client`]. For many tenants and their timelines,
|
||||
//! this struct and its original serialization format are still needed because they were written a
|
||||
//! long time ago.
|
||||
//!
|
||||
//! Later, the file gets used in [`remote_timeline_client`] as a part of
|
||||
//! external storage import and export operations.
|
||||
//! Instead of changing and adding versioning to this, just change [`IndexPart`] with soft json
|
||||
//! versioning.
|
||||
//!
|
||||
//! The module contains all structs and related helper methods related to timeline metadata.
|
||||
//! To clean up this module we need to migrate all index_part.json files to a later version.
|
||||
//! While doing this, we need to be mindful about s3 based recovery as well, so it might take
|
||||
//! however long we keep the old versions to be able to delete the old code. After that, we can
|
||||
//! remove everything other than [`TimelineMetadataBodyV2`], rename it as `TimelineMetadata` and
|
||||
//! move it to `index.rs`. Before doing all of this, we need to keep the structures for backwards
|
||||
//! compatibility.
|
||||
//!
|
||||
//! [`remote_timeline_client`]: super::remote_timeline_client
|
||||
//! [`IndexPart`]: super::remote_timeline_client::index::IndexPart
|
||||
|
||||
use anyhow::ensure;
|
||||
use serde::{de::Error, Deserialize, Serialize, Serializer};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use utils::bin_ser::SerializeError;
|
||||
use utils::{bin_ser::BeSer, id::TimelineId, lsn::Lsn};
|
||||
|
||||
@@ -17,17 +25,37 @@ use utils::{bin_ser::BeSer, id::TimelineId, lsn::Lsn};
|
||||
const METADATA_FORMAT_VERSION: u16 = 4;
|
||||
|
||||
/// Previous supported format versions.
|
||||
///
|
||||
/// In practice, none of these should remain, all are [`METADATA_FORMAT_VERSION`], but confirming
|
||||
/// that requires a scrubber run which is yet to be done.
|
||||
const METADATA_OLD_FORMAT_VERSION: u16 = 3;
|
||||
|
||||
/// We assume that a write of up to METADATA_MAX_SIZE bytes is atomic.
|
||||
/// When the file existed on disk we assumed that a write of up to METADATA_MAX_SIZE bytes is atomic.
|
||||
///
|
||||
/// This is the same assumption that PostgreSQL makes with the control file,
|
||||
///
|
||||
/// see PG_CONTROL_MAX_SAFE_SIZE
|
||||
const METADATA_MAX_SIZE: usize = 512;
|
||||
|
||||
/// Metadata stored on disk for each timeline
|
||||
/// Legacy metadata stored as a component of `index_part.json` per timeline.
|
||||
///
|
||||
/// The fields correspond to the values we hold in memory, in Timeline.
|
||||
/// Do not make new changes to this type or the module. In production, we have two different kinds
|
||||
/// of serializations of this type: bincode and json. Bincode version reflects what used to be
|
||||
/// stored on disk in earlier versions and does internal crc32 checksumming.
|
||||
///
|
||||
/// This type should not implement `serde::Serialize` or `serde::Deserialize` because there would
|
||||
/// be a confusion whether you want the old version ([`TimelineMetadata::from_bytes`]) or the modern
|
||||
/// as-exists in `index_part.json` ([`self::modern_serde`]).
|
||||
///
|
||||
/// ```compile_fail
|
||||
/// #[derive(serde::Serialize)]
|
||||
/// struct DoNotDoThis(pageserver::tenant::metadata::TimelineMetadata);
|
||||
/// ```
|
||||
///
|
||||
/// ```compile_fail
|
||||
/// #[derive(serde::Deserialize)]
|
||||
/// struct NeitherDoThis(pageserver::tenant::metadata::TimelineMetadata);
|
||||
/// ```
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct TimelineMetadata {
|
||||
hdr: TimelineMetadataHeader,
|
||||
@@ -40,6 +68,49 @@ struct TimelineMetadataHeader {
|
||||
size: u16, // size of serialized metadata
|
||||
format_version: u16, // metadata format version (used for compatibility checks)
|
||||
}
|
||||
|
||||
impl TryFrom<&TimelineMetadataBodyV2> for TimelineMetadataHeader {
|
||||
type Error = Crc32CalculationFailed;
|
||||
|
||||
fn try_from(value: &TimelineMetadataBodyV2) -> Result<Self, Self::Error> {
|
||||
#[derive(Default)]
|
||||
struct Crc32Sink {
|
||||
crc: u32,
|
||||
count: usize,
|
||||
}
|
||||
|
||||
impl std::io::Write for Crc32Sink {
|
||||
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
|
||||
self.crc = crc32c::crc32c_append(self.crc, buf);
|
||||
self.count += buf.len();
|
||||
Ok(buf.len())
|
||||
}
|
||||
|
||||
fn flush(&mut self) -> std::io::Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
// jump through hoops to calculate the crc32 so that TimelineMetadata::ne works
|
||||
// across serialization versions
|
||||
let mut sink = Crc32Sink::default();
|
||||
<TimelineMetadataBodyV2 as utils::bin_ser::BeSer>::ser_into(value, &mut sink)
|
||||
.map_err(Crc32CalculationFailed)?;
|
||||
|
||||
let size = METADATA_HDR_SIZE + sink.count;
|
||||
|
||||
Ok(TimelineMetadataHeader {
|
||||
checksum: sink.crc,
|
||||
size: size as u16,
|
||||
format_version: METADATA_FORMAT_VERSION,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
#[error("re-serializing for crc32 failed")]
|
||||
struct Crc32CalculationFailed(#[source] utils::bin_ser::SerializeError);
|
||||
|
||||
const METADATA_HDR_SIZE: usize = std::mem::size_of::<TimelineMetadataHeader>();
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
@@ -111,6 +182,12 @@ impl TimelineMetadata {
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) fn with_recalculated_checksum(mut self) -> anyhow::Result<Self> {
|
||||
self.hdr = TimelineMetadataHeader::try_from(&self.body)?;
|
||||
Ok(self)
|
||||
}
|
||||
|
||||
fn upgrade_timeline_metadata(metadata_bytes: &[u8]) -> anyhow::Result<Self> {
|
||||
let mut hdr = TimelineMetadataHeader::des(&metadata_bytes[0..METADATA_HDR_SIZE])?;
|
||||
|
||||
@@ -261,32 +338,8 @@ impl TimelineMetadata {
|
||||
}
|
||||
}
|
||||
|
||||
impl<'de> Deserialize<'de> for TimelineMetadata {
|
||||
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
|
||||
where
|
||||
D: serde::Deserializer<'de>,
|
||||
{
|
||||
let bytes = Vec::<u8>::deserialize(deserializer)?;
|
||||
Self::from_bytes(bytes.as_slice()).map_err(D::Error::custom)
|
||||
}
|
||||
}
|
||||
|
||||
impl Serialize for TimelineMetadata {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: Serializer,
|
||||
{
|
||||
let bytes = self.to_bytes().map_err(serde::ser::Error::custom)?;
|
||||
bytes.serialize(serializer)
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) mod modern_serde {
|
||||
use crate::tenant::metadata::METADATA_FORMAT_VERSION;
|
||||
|
||||
use super::{
|
||||
TimelineMetadata, TimelineMetadataBodyV2, TimelineMetadataHeader, METADATA_HDR_SIZE,
|
||||
};
|
||||
use super::{TimelineMetadata, TimelineMetadataBodyV2, TimelineMetadataHeader};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
pub(crate) fn deserialize<'de, D>(deserializer: D) -> Result<TimelineMetadata, D::Error>
|
||||
@@ -322,71 +375,15 @@ pub(crate) mod modern_serde {
|
||||
|
||||
let de = serde::de::value::MapAccessDeserializer::new(map);
|
||||
let body = TimelineMetadataBodyV2::deserialize(de)?;
|
||||
let hdr = TimelineMetadataHeader::try_from(&body).map_err(A::Error::custom)?;
|
||||
|
||||
// jump through hoops to calculate the crc32 so that TimelineMetadata::ne works
|
||||
// across serialization versions
|
||||
let mut sink = Crc32Sink::default();
|
||||
<TimelineMetadataBodyV2 as utils::bin_ser::BeSer>::ser_into(&body, &mut sink)
|
||||
.map_err(|e| A::Error::custom(Crc32CalculationFailed(e)))?;
|
||||
|
||||
let size = METADATA_HDR_SIZE + sink.count;
|
||||
|
||||
Ok(TimelineMetadata {
|
||||
hdr: TimelineMetadataHeader {
|
||||
checksum: sink.crc,
|
||||
size: size as u16,
|
||||
format_version: METADATA_FORMAT_VERSION,
|
||||
},
|
||||
body,
|
||||
})
|
||||
Ok(TimelineMetadata { hdr, body })
|
||||
}
|
||||
}
|
||||
|
||||
deserializer.deserialize_any(Visitor)
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
struct Crc32Sink {
|
||||
crc: u32,
|
||||
count: usize,
|
||||
}
|
||||
|
||||
impl std::io::Write for Crc32Sink {
|
||||
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
|
||||
self.crc = crc32c::crc32c_append(self.crc, buf);
|
||||
self.count += buf.len();
|
||||
Ok(buf.len())
|
||||
}
|
||||
|
||||
fn flush(&mut self) -> std::io::Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(thiserror::Error)]
|
||||
#[error("re-serializing for crc32 failed")]
|
||||
struct Crc32CalculationFailed<E>(#[source] E);
|
||||
|
||||
// this should be true for one release, after that we can change it to false
|
||||
// remember to check the IndexPart::metadata field TODO comment as well
|
||||
const LEGACY_BINCODED_BYTES: bool = true;
|
||||
|
||||
#[derive(serde::Serialize)]
|
||||
#[serde(transparent)]
|
||||
struct LegacyPaddedBytes<'a>(&'a TimelineMetadata);
|
||||
|
||||
struct JustTheBodyV2<'a>(&'a TimelineMetadata);
|
||||
|
||||
impl serde::Serialize for JustTheBodyV2<'_> {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: serde::Serializer,
|
||||
{
|
||||
// header is not needed, upon reading we've upgraded all v1 to v2
|
||||
self.0.body.serialize(serializer)
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn serialize<S>(
|
||||
metadata: &TimelineMetadata,
|
||||
serializer: S,
|
||||
@@ -394,25 +391,23 @@ pub(crate) mod modern_serde {
|
||||
where
|
||||
S: serde::Serializer,
|
||||
{
|
||||
// we cannot use TimelineMetadata::serialize for now because it'll do
|
||||
// TimelineMetadata::to_bytes
|
||||
if LEGACY_BINCODED_BYTES {
|
||||
LegacyPaddedBytes(metadata).serialize(serializer)
|
||||
} else {
|
||||
JustTheBodyV2(metadata).serialize(serializer)
|
||||
}
|
||||
// header is not needed, upon reading we've upgraded all v1 to v2
|
||||
metadata.body.serialize(serializer)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn deserializes_bytes_as_well_as_equivalent_body_v2() {
|
||||
#[derive(serde::Deserialize, serde::Serialize)]
|
||||
struct Wrapper(#[serde(deserialize_with = "deserialize")] TimelineMetadata);
|
||||
struct Wrapper(
|
||||
#[serde(deserialize_with = "deserialize", serialize_with = "serialize")]
|
||||
TimelineMetadata,
|
||||
);
|
||||
|
||||
let too_many_bytes = "[216,111,252,208,0,54,0,4,0,0,0,0,1,73,253,144,1,0,0,0,0,1,73,253,24,0,0,0,0,0,0,0,0,0,0,0,0,0,1,73,253,24,0,0,0,0,1,73,253,24,0,0,0,15,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]";
|
||||
|
||||
let wrapper_from_bytes = serde_json::from_str::<Wrapper>(too_many_bytes).unwrap();
|
||||
|
||||
let serialized = serde_json::to_value(JustTheBodyV2(&wrapper_from_bytes.0)).unwrap();
|
||||
let serialized = serde_json::to_value(&wrapper_from_bytes).unwrap();
|
||||
|
||||
assert_eq!(
|
||||
serialized,
|
||||
@@ -553,59 +548,6 @@ mod tests {
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_metadata_bincode_serde() {
|
||||
let original_metadata = TimelineMetadata::new(
|
||||
Lsn(0x200),
|
||||
Some(Lsn(0x100)),
|
||||
Some(TIMELINE_ID),
|
||||
Lsn(0),
|
||||
Lsn(0),
|
||||
Lsn(0),
|
||||
// Any version will do here, so use the default
|
||||
crate::DEFAULT_PG_VERSION,
|
||||
);
|
||||
let metadata_bytes = original_metadata
|
||||
.to_bytes()
|
||||
.expect("Cannot create bytes array from metadata");
|
||||
|
||||
let metadata_bincode_be_bytes = original_metadata
|
||||
.ser()
|
||||
.expect("Cannot serialize the metadata");
|
||||
|
||||
// 8 bytes for the length of the vector
|
||||
assert_eq!(metadata_bincode_be_bytes.len(), 8 + metadata_bytes.len());
|
||||
|
||||
let expected_bincode_bytes = {
|
||||
let mut temp = vec![];
|
||||
let len_bytes = metadata_bytes.len().to_be_bytes();
|
||||
temp.extend_from_slice(&len_bytes);
|
||||
temp.extend_from_slice(&metadata_bytes);
|
||||
temp
|
||||
};
|
||||
assert_eq!(metadata_bincode_be_bytes, expected_bincode_bytes);
|
||||
|
||||
let deserialized_metadata = TimelineMetadata::des(&metadata_bincode_be_bytes).unwrap();
|
||||
// Deserialized metadata has the metadata header, which is different from the serialized one.
|
||||
// Reference: TimelineMetaData::to_bytes()
|
||||
let expected_metadata = {
|
||||
let mut temp_metadata = original_metadata;
|
||||
let body_bytes = temp_metadata
|
||||
.body
|
||||
.ser()
|
||||
.expect("Cannot serialize the metadata body");
|
||||
let metadata_size = METADATA_HDR_SIZE + body_bytes.len();
|
||||
let hdr = TimelineMetadataHeader {
|
||||
size: metadata_size as u16,
|
||||
format_version: METADATA_FORMAT_VERSION,
|
||||
checksum: crc32c::crc32c(&body_bytes),
|
||||
};
|
||||
temp_metadata.hdr = hdr;
|
||||
temp_metadata
|
||||
};
|
||||
assert_eq!(deserialized_metadata, expected_metadata);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_metadata_bincode_serde_ensure_roundtrip() {
|
||||
let original_metadata = TimelineMetadata::new(
|
||||
@@ -619,8 +561,6 @@ mod tests {
|
||||
crate::DEFAULT_PG_VERSION,
|
||||
);
|
||||
let expected_bytes = vec![
|
||||
/* bincode length encoding bytes */
|
||||
0, 0, 0, 0, 0, 0, 2, 0, // 8 bytes for the length of the serialized vector
|
||||
/* TimelineMetadataHeader */
|
||||
4, 37, 101, 34, 0, 70, 0, 4, // checksum, size, format_version (4 + 2 + 2)
|
||||
/* TimelineMetadataBodyV2 */
|
||||
@@ -650,7 +590,7 @@ mod tests {
|
||||
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
|
||||
0, 0, 0, 0, 0, 0, 0,
|
||||
];
|
||||
let metadata_ser_bytes = original_metadata.ser().unwrap();
|
||||
let metadata_ser_bytes = original_metadata.to_bytes().unwrap();
|
||||
assert_eq!(metadata_ser_bytes, expected_bytes);
|
||||
|
||||
let expected_metadata = {
|
||||
@@ -668,7 +608,7 @@ mod tests {
|
||||
temp_metadata.hdr = hdr;
|
||||
temp_metadata
|
||||
};
|
||||
let des_metadata = TimelineMetadata::des(&metadata_ser_bytes).unwrap();
|
||||
let des_metadata = TimelineMetadata::from_bytes(&metadata_ser_bytes).unwrap();
|
||||
assert_eq!(des_metadata, expected_metadata);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -38,14 +38,17 @@ pub struct IndexPart {
|
||||
/// that latest version stores.
|
||||
pub layer_metadata: HashMap<LayerName, LayerFileMetadata>,
|
||||
|
||||
// 'disk_consistent_lsn' is a copy of the 'disk_consistent_lsn' in the metadata.
|
||||
// It's duplicated for convenience when reading the serialized structure, but is
|
||||
// private because internally we would read from metadata instead.
|
||||
/// Because of the trouble of eyeballing the legacy "metadata" field, we copied the
|
||||
/// "disk_consistent_lsn" out. After version 7 this is no longer needed, but the name cannot be
|
||||
/// reused.
|
||||
pub(super) disk_consistent_lsn: Lsn,
|
||||
|
||||
// TODO: later make this "rename" to "alias", rename field as "legacy_metadata"
|
||||
// TODO: rename as "metadata" next week, keep the alias = "metadata_bytes", bump version. Adding
|
||||
// the "alias = metadata" was forgotten in #7693, so we have to use "rename = metadata_bytes"
|
||||
// for backwards compatibility.
|
||||
#[serde(
|
||||
rename = "metadata_bytes",
|
||||
alias = "metadata",
|
||||
with = "crate::tenant::metadata::modern_serde"
|
||||
)]
|
||||
pub metadata: TimelineMetadata,
|
||||
@@ -76,10 +79,11 @@ impl IndexPart {
|
||||
/// - 4: timeline_layers is fully removed.
|
||||
/// - 5: lineage was added
|
||||
/// - 6: last_aux_file_policy is added.
|
||||
const LATEST_VERSION: usize = 6;
|
||||
/// - 7: metadata_bytes is no longer written, but still read
|
||||
const LATEST_VERSION: usize = 7;
|
||||
|
||||
// Versions we may see when reading from a bucket.
|
||||
pub const KNOWN_VERSIONS: &'static [usize] = &[1, 2, 3, 4, 5, 6];
|
||||
pub const KNOWN_VERSIONS: &'static [usize] = &[1, 2, 3, 4, 5, 6, 7];
|
||||
|
||||
pub const FILE_NAME: &'static str = "index_part.json";
|
||||
|
||||
@@ -95,7 +99,7 @@ impl IndexPart {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_version(&self) -> usize {
|
||||
pub fn version(&self) -> usize {
|
||||
self.version
|
||||
}
|
||||
|
||||
@@ -217,9 +221,9 @@ impl Lineage {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::str::FromStr;
|
||||
|
||||
use super::*;
|
||||
use std::str::FromStr;
|
||||
use utils::id::TimelineId;
|
||||
|
||||
#[test]
|
||||
fn v1_indexpart_is_parsed() {
|
||||
@@ -338,8 +342,7 @@ mod tests {
|
||||
]),
|
||||
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
|
||||
metadata: TimelineMetadata::from_bytes(&[113,11,159,210,0,54,0,4,0,0,0,0,1,105,96,232,1,0,0,0,0,1,105,96,112,0,0,0,0,0,0,0,0,0,0,0,0,0,1,105,96,112,0,0,0,0,1,105,96,112,0,0,0,14,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]).unwrap(),
|
||||
deleted_at: Some(chrono::NaiveDateTime::parse_from_str(
|
||||
"2023-07-31T09:00:00.123000000", "%Y-%m-%dT%H:%M:%S.%f").unwrap()),
|
||||
deleted_at: Some(parse_naive_datetime("2023-07-31T09:00:00.123000000")),
|
||||
lineage: Lineage::default(),
|
||||
last_aux_file_policy: None,
|
||||
};
|
||||
@@ -515,8 +518,7 @@ mod tests {
|
||||
]),
|
||||
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
|
||||
metadata: TimelineMetadata::from_bytes(&[113,11,159,210,0,54,0,4,0,0,0,0,1,105,96,232,1,0,0,0,0,1,105,96,112,0,0,0,0,0,0,0,0,0,0,0,0,0,1,105,96,112,0,0,0,0,1,105,96,112,0,0,0,14,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]).unwrap(),
|
||||
deleted_at: Some(chrono::NaiveDateTime::parse_from_str(
|
||||
"2023-07-31T09:00:00.123000000", "%Y-%m-%dT%H:%M:%S.%f").unwrap()),
|
||||
deleted_at: Some(parse_naive_datetime("2023-07-31T09:00:00.123000000")),
|
||||
lineage: Lineage {
|
||||
reparenting_history_truncated: false,
|
||||
reparenting_history: vec![TimelineId::from_str("e1bfd8c633d713d279e6fcd2bcc15b6d").unwrap()],
|
||||
@@ -529,6 +531,60 @@ mod tests {
|
||||
assert_eq!(part, expected);
|
||||
}
|
||||
|
||||
/// Checks that a version 7 `index_part.json`, whose `metadata` field is a JSON object with
/// named fields, deserializes into the expected [`IndexPart`].
#[test]
fn v7_indexpart_is_parsed() {
    let example = r#"{
"version": 7,
"layer_metadata":{
"000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9": { "file_size": 25600000 },
"000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51": { "file_size": 9007199254741001 }
},
"disk_consistent_lsn":"0/16960E8",
"metadata": {
"disk_consistent_lsn": "0/16960E8",
"prev_record_lsn": "0/1696070",
"ancestor_timeline": "e45a7f37d3ee2ff17dc14bf4f4e3f52e",
"ancestor_lsn": "0/0",
"latest_gc_cutoff_lsn": "0/1696070",
"initdb_lsn": "0/1696070",
"pg_version": 14
},
"deleted_at": "2023-07-31T09:00:00.123"
}"#;

    // Small helpers so the expected value below reads like the JSON above.
    let lsn = |s: &str| Lsn::from_str(s).unwrap();
    let layer = |file_size| LayerFileMetadata {
        file_size,
        generation: Generation::none(),
        shard: ShardIndex::unsharded(),
    };

    let expected_metadata = TimelineMetadata::new(
        lsn("0/16960E8"),
        Some(lsn("0/1696070")),
        Some(TimelineId::from_str("e45a7f37d3ee2ff17dc14bf4f4e3f52e").unwrap()),
        Lsn::INVALID,
        lsn("0/1696070"),
        lsn("0/1696070"),
        14,
    )
    .with_recalculated_checksum()
    .unwrap();

    let expected = IndexPart {
        version: 7,
        layer_metadata: HashMap::from([
            (
                "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(),
                layer(25600000),
            ),
            (
                "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(),
                layer(9007199254741001),
            ),
        ]),
        disk_consistent_lsn: lsn("0/16960E8"),
        metadata: expected_metadata,
        deleted_at: Some(parse_naive_datetime("2023-07-31T09:00:00.123000000")),
        lineage: Default::default(),
        last_aux_file_policy: Default::default(),
    };

    let parsed = IndexPart::from_s3_bytes(example.as_bytes()).unwrap();
    assert_eq!(parsed, expected);
}
|
||||
|
||||
fn parse_naive_datetime(s: &str) -> NaiveDateTime {
|
||||
chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S.%f").unwrap()
|
||||
}
|
||||
|
||||
@@ -929,6 +929,45 @@ impl DeltaLayerInner {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Reads every `(key, lsn, value)` entry of this delta layer into memory.
///
/// Test-only; should be replaced by an iterator-based interface in the future.
///
/// # Errors
/// Fails if the index stream yields an error, if a blob cannot be read, or if a blob does not
/// deserialize into a [`Value`].
#[cfg(test)]
pub(super) async fn load_key_values(
    &self,
    ctx: &RequestContext,
) -> anyhow::Result<Vec<(Key, Lsn, Value)>> {
    // One reader walks the index starting from the all-zero key...
    let index_blocks = FileBlockReader::new(&self.file, self.file_id);
    let index_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
        self.index_start_blk,
        self.index_root_blk,
        index_blocks,
    );
    let mut index_entries =
        Box::pin(self.stream_index_forwards(&index_reader, &[0; DELTA_KEY_SIZE], ctx));

    // ...while a second reader fetches the blobs that the index entries point at.
    let value_blocks = FileBlockReader::new(&self.file, self.file_id);
    let blob_cursor = value_blocks.block_cursor();

    let mut key_values = Vec::new();
    let mut blob = Vec::new();
    while let Some((key, lsn, blob_ref)) = index_entries.next().await.transpose()? {
        // TODO: dedup code with get_reconstruct_value
        // TODO: ctx handling and sharding
        blob_cursor
            .read_blob_into_buf(blob_ref.pos(), &mut blob, ctx)
            .await
            .with_context(|| {
                format!("Failed to read blob from virtual file {}", self.file.path)
            })?;
        let value = Value::des(&blob).with_context(|| {
            format!(
                "Failed to deserialize file blob from virtual file {}",
                self.file.path
            )
        })?;
        key_values.push((key, lsn, value));
    }
    Ok(key_values)
}
|
||||
|
||||
async fn plan_reads<Reader>(
|
||||
keyspace: &KeySpace,
|
||||
lsn_range: Range<Lsn>,
|
||||
|
||||
@@ -485,6 +485,34 @@ impl ImageLayerInner {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Reads every key of this image layer together with its image into memory.
///
/// Each entry is reported at the layer's own LSN (`self.lsn`) as a [`Value::Image`].
/// Test-only; should be replaced by an iterator-based interface in the future.
///
/// # Errors
/// Fails if the index stream yields an error or if a value blob cannot be read.
#[cfg(test)]
pub(super) async fn load_key_values(
    &self,
    ctx: &RequestContext,
) -> anyhow::Result<Vec<(Key, Lsn, Value)>> {
    // One reader walks the index starting from the all-zero key...
    let index_blocks = FileBlockReader::new(&self.file, self.file_id);
    let tree_reader =
        DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, &index_blocks);
    let mut index_entries = Box::pin(tree_reader.get_stream_from(&[0; KEY_SIZE], ctx));

    // ...while a second reader fetches the image blobs at the indexed offsets.
    let value_blocks = FileBlockReader::new(&self.file, self.file_id);
    let blob_cursor = value_blocks.block_cursor();

    let mut key_values = Vec::new();
    // TODO: dedup code with get_reconstruct_value
    while let Some((raw_key, offset)) = index_entries.next().await.transpose()? {
        let key = Key::from_slice(&raw_key[..KEY_SIZE]);
        // TODO: ctx handling and sharding
        let image = blob_cursor
            .read_blob(offset, ctx)
            .await
            .with_context(|| format!("failed to read value from offset {}", offset))?;
        key_values.push((key, self.lsn, Value::Image(Bytes::from(image))));
    }
    Ok(key_values)
}
|
||||
|
||||
/// Traverse the layer's index to build read operations on the overlap of the input keyspace
|
||||
/// and the keys in this layer.
|
||||
///
|
||||
|
||||
@@ -388,6 +388,23 @@ impl Layer {
|
||||
})
|
||||
}
|
||||
|
||||
/// Returns all key/values stored in the layer. Should be replaced with an iterator-based API
/// in the future.
///
/// The layer is obtained through `get_or_maybe_download(true, ..)` first; a cancelled download
/// surfaces as [`GetVectoredError::Cancelled`] and any other download failure as
/// [`GetVectoredError::Other`], both wrapped in the returned `anyhow::Error`.
#[cfg(test)]
pub(crate) async fn load_key_values(
    &self,
    ctx: &RequestContext,
) -> anyhow::Result<Vec<(Key, Lsn, crate::repository::Value)>> {
    let downloaded = match self.0.get_or_maybe_download(true, Some(ctx)).await {
        Ok(layer) => layer,
        Err(DownloadError::DownloadCancelled) => {
            return Err(GetVectoredError::Cancelled.into());
        }
        Err(other) => {
            return Err(GetVectoredError::Other(anyhow::anyhow!(other)).into());
        }
    };

    downloaded.load_key_values(&self.0, ctx).await
}
|
||||
|
||||
/// Download the layer if evicted.
|
||||
///
|
||||
/// Will not error when the layer is already downloaded.
|
||||
@@ -1757,6 +1774,20 @@ impl DownloadedLayer {
|
||||
}
|
||||
}
|
||||
|
||||
/// Loads all key/values of the downloaded layer by dispatching to the delta or image
/// implementation, depending on the layer kind.
#[cfg(test)]
async fn load_key_values(
    &self,
    owner: &Arc<LayerInner>,
    ctx: &RequestContext,
) -> anyhow::Result<Vec<(Key, Lsn, crate::repository::Value)>> {
    let kind = self.get(owner, ctx).await?;
    match kind {
        LayerKind::Delta(delta) => delta.load_key_values(ctx).await,
        LayerKind::Image(image) => image.load_key_values(ctx).await,
    }
}
|
||||
|
||||
async fn dump(&self, owner: &Arc<LayerInner>, ctx: &RequestContext) -> anyhow::Result<()> {
|
||||
use LayerKind::*;
|
||||
match self.get(owner, ctx).await? {
|
||||
|
||||
@@ -5549,6 +5549,19 @@ impl Timeline {
|
||||
all_data.sort();
|
||||
Ok(all_data)
|
||||
}
|
||||
|
||||
/// Returns the key of every historic layer currently present in the layer map.
///
/// Takes the read lock on `self.layers` for the duration of the scan.
#[cfg(test)]
pub(crate) async fn inspect_historic_layers(
    self: &Arc<Timeline>,
) -> anyhow::Result<Vec<super::storage_layer::PersistentLayerKey>> {
    let guard = self.layers.read().await;
    let keys = guard
        .layer_map()
        .iter_historic_layers()
        .map(|layer| layer.key())
        .collect();
    Ok(keys)
}
|
||||
}
|
||||
|
||||
type TraversalPathItem = (ValueReconstructResult, Lsn, TraversalId);
|
||||
|
||||
@@ -952,6 +952,178 @@ impl Timeline {
|
||||
adaptor.flush_updates().await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// An experimental compaction building block that combines compaction with garbage collection.
///
/// The current implementation picks all delta + image layers that are below or intersecting with
/// the GC horizon without considering retain_lsns. Then, it does a full compaction over all these delta
/// layers and image layers, which generates image layers on the gc horizon, drops deltas below gc horizon,
/// and creates delta layers with all deltas >= gc horizon.
///
/// If the selected layers contain no key-values at all there is nothing to rewrite: the function
/// returns `Ok(())` without creating any layer writer and without touching the layer map.
///
/// # Errors
/// Propagates failures from loading layer contents, reconstructing values, and writing or
/// finishing the new delta/image layers.
///
/// # Panics
/// Panics if the `gc_info` lock is poisoned.
#[cfg(test)]
pub(crate) async fn compact_with_gc(
    self: &Arc<Self>,
    _cancel: &CancellationToken,
    ctx: &RequestContext,
) -> Result<(), CompactionError> {
    use crate::repository::Value;
    use crate::tenant::storage_layer::ValueReconstructState;

    // Step 0: pick all delta layers + image layers below/intersect with the GC horizon.
    // The layer selection has the following properties:
    // 1. If a layer is in the selection, all layers below it are in the selection.
    // 2. Inferred from (1), for each key in the layer selection, the value can be reconstructed only with the layers in the layer selection.
    let (layer_selection, gc_cutoff) = {
        let guard = self.layers.read().await;
        let layers = guard.layer_map();
        let gc_info = self.gc_info.read().unwrap();
        let gc_cutoff = Lsn::min(gc_info.cutoffs.horizon, gc_info.cutoffs.pitr);
        let mut selected_layers = Vec::new();
        // TODO: consider retain_lsns
        drop(gc_info);
        for desc in layers.iter_historic_layers() {
            if desc.get_lsn_range().start <= gc_cutoff {
                selected_layers.push(guard.get_from_desc(&desc));
            }
        }
        (selected_layers, gc_cutoff)
    };

    // Step 1: (In the future) construct a k-merge iterator over all layers. For now, simply collect all keys + LSNs.
    let mut all_key_values = Vec::new();
    for layer in &layer_selection {
        all_key_values.extend(layer.load_key_values(ctx).await?);
    }

    /// Tie-break for entries sharing key and LSN: a WAL record sorts before an image.
    fn value_rank(value: &Value) -> u8 {
        match value {
            Value::WalRecord(_) => 0,
            Value::Image(_) => 1,
        }
    }

    // Key small to large, LSN low to high, if the same LSN has both image and delta due to the merge of delta layers and
    // image layers, make image appear later than delta.
    all_key_values.sort_by(|(k1, l1, v1), (k2, l2, v2)| {
        k1.cmp(k2)
            .then_with(|| l1.cmp(l2))
            .then_with(|| value_rank(v1).cmp(&value_rank(v2)))
    });

    // Nothing was selected (or the selected layers are empty): bail out before any writer
    // is created instead of panicking on the first/last/max lookups below.
    let (Some(first), Some(last)) = (all_key_values.first(), all_key_values.last()) else {
        return Ok(());
    };
    let first_key = first.0;
    let end_key = last.0.next();
    let max_lsn = all_key_values
        .iter()
        .map(|(_, lsn, _)| *lsn)
        .max()
        .expect("all_key_values was just checked to be non-empty")
        + 1;

    /// Take a list of images and deltas, produce an image at the GC horizon, and a list of deltas above the GC horizon.
    ///
    /// `accumulated_values` must hold the entries of a single key in ascending LSN order.
    async fn flush_accumulated_states(
        tline: &Arc<Timeline>,
        key: Key,
        accumulated_values: &[&(Key, Lsn, Value)],
        horizon: Lsn,
    ) -> anyhow::Result<(Vec<(Key, Lsn, Value)>, bytes::Bytes)> {
        let mut base_image = None;
        let mut keys_above_horizon = Vec::new();
        let mut delta_above_base_image = Vec::new();
        // We have a list of deltas/images. We want to create image layers while collecting garbage.
        // Walk from the newest entry backwards until the newest image at or below the horizon.
        for (entry_key, lsn, val) in accumulated_values.iter().rev() {
            if *lsn > horizon {
                // TODO: ensure one LSN corresponds to either delta or image instead of both
                keys_above_horizon.push((*entry_key, *lsn, val.clone()));
                continue;
            }
            match val {
                Value::Image(image) => {
                    base_image = Some((*lsn, image.clone()));
                    break;
                }
                Value::WalRecord(wal) => {
                    delta_above_base_image.push((*lsn, wal.clone()));
                }
            }
        }
        // Both lists were collected newest-first; restore ascending LSN order.
        delta_above_base_image.reverse();
        keys_above_horizon.reverse();
        let state = ValueReconstructState {
            img: base_image,
            records: delta_above_base_image,
        };
        let img = tline.reconstruct_value(key, horizon, state).await?;
        Ok((keys_above_horizon, img))
    }

    // Step 2: Produce images+deltas. TODO: ensure newly-produced delta does not overlap with other deltas.
    let mut delta_layer_writer = DeltaLayerWriter::new(
        self.conf,
        self.timeline_id,
        self.tenant_shard_id,
        first_key,
        gc_cutoff..max_lsn, // TODO: off by one?
        ctx,
    )
    .await?;
    let mut image_layer_writer = ImageLayerWriter::new(
        self.conf,
        self.timeline_id,
        self.tenant_shard_id,
        &(first_key..end_key),
        gc_cutoff,
        ctx,
    )
    .await?;

    // Data of the same key.
    let mut accumulated_values = Vec::new();
    let mut last_key = first_key;

    for item @ (key, _, _) in &all_key_values {
        if &last_key == key {
            accumulated_values.push(item);
        } else {
            // The key changed: flush everything gathered for the previous key.
            let (deltas, image) =
                flush_accumulated_states(self, last_key, &accumulated_values, gc_cutoff)
                    .await?;
            image_layer_writer.put_image(last_key, image, ctx).await?;
            for (key, lsn, val) in deltas {
                delta_layer_writer.put_value(key, lsn, val, ctx).await?;
            }
            accumulated_values.clear();
            accumulated_values.push(item);
            last_key = *key;
        }
    }
    // Flush the entries of the final key.
    let (deltas, image) =
        flush_accumulated_states(self, last_key, &accumulated_values, gc_cutoff).await?;
    image_layer_writer.put_image(last_key, image, ctx).await?;
    for (key, lsn, val) in deltas {
        delta_layer_writer.put_value(key, lsn, val, ctx).await?;
    }
    accumulated_values.clear();
    // TODO: split layers
    let delta_layer = delta_layer_writer.finish(last_key, self, ctx).await?;
    let image_layer = image_layer_writer.finish(self, ctx).await?;

    // Step 3: Place back to the layer map.
    {
        let mut guard = self.layers.write().await;
        guard.finish_gc_compaction(
            &layer_selection,
            &[delta_layer.clone(), image_layer.clone()],
            &self.metrics,
        )
    };
    Ok(())
}
|
||||
}
|
||||
|
||||
struct TimelineAdaptor {
|
||||
|
||||
@@ -226,6 +226,18 @@ impl LayerManager {
|
||||
updates.flush();
|
||||
}
|
||||
|
||||
/// Called when a GC-compaction is completed.
///
/// Forwards `compact_from`, `compact_to` and `metrics` unchanged to `finish_compact_l0`.
|
||||
#[cfg(test)]
|
||||
pub(crate) fn finish_gc_compaction(
|
||||
&mut self,
|
||||
compact_from: &[Layer],
|
||||
compact_to: &[ResidentLayer],
|
||||
metrics: &TimelineMetrics,
|
||||
) {
|
||||
// We can simply reuse compact l0 logic. Use a different function name to indicate a different type of layer map modification.
|
||||
self.finish_compact_l0(compact_from, compact_to, metrics)
|
||||
}
|
||||
|
||||
/// Called when compaction is completed.
|
||||
pub(crate) fn rewrite_layers(
|
||||
&mut self,
|
||||
|
||||
@@ -78,17 +78,16 @@ pub(crate) fn branch_cleanup_and_check_errors(
|
||||
index_part_generation: _index_part_generation,
|
||||
s3_layers: _s3_layers,
|
||||
} => {
|
||||
if !IndexPart::KNOWN_VERSIONS.contains(&index_part.get_version()) {
|
||||
result.errors.push(format!(
|
||||
"index_part.json version: {}",
|
||||
index_part.get_version()
|
||||
))
|
||||
if !IndexPart::KNOWN_VERSIONS.contains(&index_part.version()) {
|
||||
result
|
||||
.errors
|
||||
.push(format!("index_part.json version: {}", index_part.version()))
|
||||
}
|
||||
|
||||
if &index_part.get_version() != IndexPart::KNOWN_VERSIONS.last().unwrap() {
|
||||
if &index_part.version() != IndexPart::KNOWN_VERSIONS.last().unwrap() {
|
||||
result.warnings.push(format!(
|
||||
"index_part.json version is not latest: {}",
|
||||
index_part.get_version()
|
||||
index_part.version()
|
||||
))
|
||||
}
|
||||
|
||||
|
||||
@@ -125,7 +125,7 @@ impl MetadataSummary {
|
||||
{
|
||||
*self
|
||||
.indices_by_version
|
||||
.entry(index_part.get_version())
|
||||
.entry(index_part.version())
|
||||
.or_insert(0) += 1;
|
||||
|
||||
if let Err(e) = self.update_histograms(index_part) {
|
||||
|
||||
@@ -40,6 +40,7 @@ tokio.workspace = true
|
||||
tokio-util.workspace = true
|
||||
tracing.workspace = true
|
||||
measured.workspace = true
|
||||
scopeguard.workspace = true
|
||||
strum.workspace = true
|
||||
strum_macros.workspace = true
|
||||
|
||||
|
||||
57
storage_controller/src/background_node_operations.rs
Normal file
57
storage_controller/src/background_node_operations.rs
Normal file
@@ -0,0 +1,57 @@
|
||||
use std::{borrow::Cow, fmt::Debug, fmt::Display};
|
||||
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use utils::id::NodeId;
|
||||
|
||||
pub(crate) const MAX_RECONCILES_PER_OPERATION: usize = 10;
|
||||
|
||||
#[derive(Copy, Clone)]
|
||||
pub(crate) struct Drain {
|
||||
pub(crate) node_id: NodeId,
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone)]
|
||||
pub(crate) struct Fill {
|
||||
pub(crate) node_id: NodeId,
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone)]
|
||||
pub(crate) enum Operation {
|
||||
Drain(Drain),
|
||||
Fill(Fill),
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub(crate) enum OperationError {
|
||||
#[error("Node state changed during operation: {0}")]
|
||||
NodeStateChanged(Cow<'static, str>),
|
||||
#[error("Operation cancelled")]
|
||||
Cancelled,
|
||||
}
|
||||
|
||||
pub(crate) struct OperationHandler {
|
||||
pub(crate) operation: Operation,
|
||||
#[allow(unused)]
|
||||
pub(crate) cancel: CancellationToken,
|
||||
}
|
||||
|
||||
impl Display for Drain {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||
write!(f, "drain {}", self.node_id)
|
||||
}
|
||||
}
|
||||
|
||||
impl Display for Fill {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||
write!(f, "fill {}", self.node_id)
|
||||
}
|
||||
}
|
||||
|
||||
impl Display for Operation {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||
match self {
|
||||
Operation::Drain(op) => write!(f, "{op}"),
|
||||
Operation::Fill(op) => write!(f, "{op}"),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -480,6 +480,39 @@ async fn handle_node_configure(mut req: Request<Body>) -> Result<Response<Body>,
|
||||
)
|
||||
}
|
||||
|
||||
async fn handle_node_status(req: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
check_permissions(&req, Scope::Admin)?;
|
||||
|
||||
let state = get_state(&req);
|
||||
let node_id: NodeId = parse_request_param(&req, "node_id")?;
|
||||
|
||||
let node_status = state.service.get_node(node_id).await?;
|
||||
|
||||
json_response(StatusCode::OK, node_status)
|
||||
}
|
||||
|
||||
async fn handle_node_drain(req: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
check_permissions(&req, Scope::Admin)?;
|
||||
|
||||
let state = get_state(&req);
|
||||
let node_id: NodeId = parse_request_param(&req, "node_id")?;
|
||||
|
||||
state.service.start_node_drain(node_id).await?;
|
||||
|
||||
json_response(StatusCode::ACCEPTED, ())
|
||||
}
|
||||
|
||||
async fn handle_node_fill(req: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
check_permissions(&req, Scope::Admin)?;
|
||||
|
||||
let state = get_state(&req);
|
||||
let node_id: NodeId = parse_request_param(&req, "node_id")?;
|
||||
|
||||
state.service.start_node_fill(node_id).await?;
|
||||
|
||||
json_response(StatusCode::ACCEPTED, ())
|
||||
}
|
||||
|
||||
async fn handle_tenant_shard_split(
|
||||
service: Arc<Service>,
|
||||
mut req: Request<Body>,
|
||||
@@ -832,6 +865,16 @@ pub fn make_router(
|
||||
RequestName("control_v1_node_config"),
|
||||
)
|
||||
})
|
||||
.get("/control/v1/node/:node_id", |r| {
|
||||
named_request_span(r, handle_node_status, RequestName("control_v1_node_status"))
|
||||
})
|
||||
.put("/control/v1/node/:node_id/drain", |r| {
|
||||
named_request_span(r, handle_node_drain, RequestName("control_v1_node_drain"))
|
||||
})
|
||||
.put("/control/v1/node/:node_id/fill", |r| {
|
||||
named_request_span(r, handle_node_fill, RequestName("control_v1_node_fill"))
|
||||
})
|
||||
// TODO(vlad): endpoint for cancelling drain and fill
|
||||
// Tenant Shard operations
|
||||
.put("/control/v1/tenant/:tenant_shard_id/migrate", |r| {
|
||||
tenant_service_handler(
|
||||
|
||||
@@ -2,6 +2,7 @@ use serde::Serialize;
|
||||
use utils::seqwait::MonotonicCounter;
|
||||
|
||||
mod auth;
|
||||
mod background_node_operations;
|
||||
mod compute_hook;
|
||||
mod heartbeater;
|
||||
pub mod http;
|
||||
|
||||
@@ -59,6 +59,10 @@ impl Node {
|
||||
self.id
|
||||
}
|
||||
|
||||
/// Returns the scheduling policy currently stored on this node.
pub(crate) fn get_scheduling(&self) -> NodeSchedulingPolicy {
|
||||
self.scheduling
|
||||
}
|
||||
|
||||
/// Overwrites the scheduling policy stored on this node with `scheduling`.
pub(crate) fn set_scheduling(&mut self, scheduling: NodeSchedulingPolicy) {
|
||||
self.scheduling = scheduling
|
||||
}
|
||||
@@ -141,6 +145,7 @@ impl Node {
|
||||
NodeSchedulingPolicy::Draining => MaySchedule::No,
|
||||
NodeSchedulingPolicy::Filling => MaySchedule::Yes(score),
|
||||
NodeSchedulingPolicy::Pause => MaySchedule::No,
|
||||
NodeSchedulingPolicy::PauseForRestart => MaySchedule::No,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -157,7 +162,7 @@ impl Node {
|
||||
listen_http_port,
|
||||
listen_pg_addr,
|
||||
listen_pg_port,
|
||||
scheduling: NodeSchedulingPolicy::Filling,
|
||||
scheduling: NodeSchedulingPolicy::Active,
|
||||
availability: NodeAvailability::Offline,
|
||||
cancel: CancellationToken::new(),
|
||||
}
|
||||
|
||||
@@ -442,13 +442,15 @@ impl Persistence {
|
||||
#[tracing::instrument(skip_all, fields(node_id))]
|
||||
pub(crate) async fn re_attach(
|
||||
&self,
|
||||
node_id: NodeId,
|
||||
input_node_id: NodeId,
|
||||
) -> DatabaseResult<HashMap<TenantShardId, Generation>> {
|
||||
use crate::schema::nodes::dsl::scheduling_policy;
|
||||
use crate::schema::nodes::dsl::*;
|
||||
use crate::schema::tenant_shards::dsl::*;
|
||||
let updated = self
|
||||
.with_measured_conn(DatabaseOperation::ReAttach, move |conn| {
|
||||
let rows_updated = diesel::update(tenant_shards)
|
||||
.filter(generation_pageserver.eq(node_id.0 as i64))
|
||||
.filter(generation_pageserver.eq(input_node_id.0 as i64))
|
||||
.set(generation.eq(generation + 1))
|
||||
.execute(conn)?;
|
||||
|
||||
@@ -457,9 +459,23 @@ impl Persistence {
|
||||
// TODO: UPDATE+SELECT in one query
|
||||
|
||||
let updated = tenant_shards
|
||||
.filter(generation_pageserver.eq(node_id.0 as i64))
|
||||
.filter(generation_pageserver.eq(input_node_id.0 as i64))
|
||||
.select(TenantShardPersistence::as_select())
|
||||
.load(conn)?;
|
||||
|
||||
// If the node went through a drain and restart phase before re-attaching,
|
||||
// then reset its node scheduling policy to active.
|
||||
diesel::update(nodes)
|
||||
.filter(node_id.eq(input_node_id.0 as i64))
|
||||
.filter(
|
||||
scheduling_policy
|
||||
.eq(String::from(NodeSchedulingPolicy::PauseForRestart))
|
||||
.or(scheduling_policy.eq(String::from(NodeSchedulingPolicy::Draining)))
|
||||
.or(scheduling_policy.eq(String::from(NodeSchedulingPolicy::Filling))),
|
||||
)
|
||||
.set(scheduling_policy.eq(String::from(NodeSchedulingPolicy::Active)))
|
||||
.execute(conn)?;
|
||||
|
||||
Ok(updated)
|
||||
})
|
||||
.await?;
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use crate::{node::Node, tenant_shard::TenantShard};
|
||||
use itertools::Itertools;
|
||||
use pageserver_api::controller_api::UtilizationScore;
|
||||
use serde::Serialize;
|
||||
use std::collections::HashMap;
|
||||
@@ -29,6 +30,8 @@ pub enum MaySchedule {
|
||||
struct SchedulerNode {
|
||||
/// How many shards are currently scheduled on this node, via their [`crate::tenant_shard::IntentState`].
|
||||
shard_count: usize,
|
||||
/// How many shards are currently attached on this node, via their [`crate::tenant_shard::IntentState`].
|
||||
attached_shard_count: usize,
|
||||
|
||||
/// Whether this node is currently eligible to have new shards scheduled (this is derived
|
||||
/// from a node's availability state and scheduling policy).
|
||||
@@ -42,7 +45,9 @@ impl PartialEq for SchedulerNode {
|
||||
(MaySchedule::Yes(_), MaySchedule::Yes(_)) | (MaySchedule::No, MaySchedule::No)
|
||||
);
|
||||
|
||||
may_schedule_matches && self.shard_count == other.shard_count
|
||||
may_schedule_matches
|
||||
&& self.shard_count == other.shard_count
|
||||
&& self.attached_shard_count == other.attached_shard_count
|
||||
}
|
||||
}
|
||||
|
||||
@@ -138,6 +143,15 @@ impl ScheduleContext {
|
||||
}
|
||||
}
|
||||
|
||||
/// Kind of change to apply to a node's `shard_count` / `attached_shard_count` in
/// `Scheduler::update_node_ref_counts`.
pub(crate) enum RefCountUpdate {
|
||||
/// Increments `attached_shard_count` only.
PromoteSecondary,
|
||||
/// Increments both `shard_count` and `attached_shard_count`.
Attach,
|
||||
/// Decrements both `shard_count` and `attached_shard_count`.
Detach,
|
||||
/// Decrements `attached_shard_count` only.
DemoteAttached,
|
||||
/// Increments `shard_count` only.
AddSecondary,
|
||||
/// Decrements `shard_count` only.
RemoveSecondary,
|
||||
}
|
||||
|
||||
impl Scheduler {
|
||||
pub(crate) fn new<'a>(nodes: impl Iterator<Item = &'a Node>) -> Self {
|
||||
let mut scheduler_nodes = HashMap::new();
|
||||
@@ -146,6 +160,7 @@ impl Scheduler {
|
||||
node.get_id(),
|
||||
SchedulerNode {
|
||||
shard_count: 0,
|
||||
attached_shard_count: 0,
|
||||
may_schedule: node.may_schedule(),
|
||||
},
|
||||
);
|
||||
@@ -171,6 +186,7 @@ impl Scheduler {
|
||||
node.get_id(),
|
||||
SchedulerNode {
|
||||
shard_count: 0,
|
||||
attached_shard_count: 0,
|
||||
may_schedule: node.may_schedule(),
|
||||
},
|
||||
);
|
||||
@@ -179,7 +195,10 @@ impl Scheduler {
|
||||
for shard in shards {
|
||||
if let Some(node_id) = shard.intent.get_attached() {
|
||||
match expect_nodes.get_mut(node_id) {
|
||||
Some(node) => node.shard_count += 1,
|
||||
Some(node) => {
|
||||
node.shard_count += 1;
|
||||
node.attached_shard_count += 1;
|
||||
}
|
||||
None => anyhow::bail!(
|
||||
"Tenant {} references nonexistent node {}",
|
||||
shard.tenant_shard_id,
|
||||
@@ -227,31 +246,80 @@ impl Scheduler {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Increment the reference count of a node. This reference count is used to guide scheduling
|
||||
/// decisions, not for memory management: it represents one tenant shard whose IntentState targets
|
||||
/// this node.
|
||||
/// Update the reference counts of a node. These reference counts are used to guide scheduling
|
||||
/// decisions, not for memory management: they represent the number of tenant shard whose IntentState
|
||||
/// targets this node and the number of tenants shars whose IntentState is attached to this
|
||||
/// node.
|
||||
///
|
||||
/// It is an error to call this for a node that is not known to the scheduler (i.e. passed into
|
||||
/// [`Self::new`] or [`Self::node_upsert`])
|
||||
pub(crate) fn node_inc_ref(&mut self, node_id: NodeId) {
|
||||
pub(crate) fn update_node_ref_counts(&mut self, node_id: NodeId, update: RefCountUpdate) {
|
||||
let Some(node) = self.nodes.get_mut(&node_id) else {
|
||||
tracing::error!("Scheduler missing node {node_id}");
|
||||
debug_assert!(false);
|
||||
tracing::error!("Scheduler missing node {node_id}");
|
||||
return;
|
||||
};
|
||||
|
||||
node.shard_count += 1;
|
||||
match update {
|
||||
RefCountUpdate::PromoteSecondary => {
|
||||
node.attached_shard_count += 1;
|
||||
}
|
||||
RefCountUpdate::Attach => {
|
||||
node.shard_count += 1;
|
||||
node.attached_shard_count += 1;
|
||||
}
|
||||
RefCountUpdate::Detach => {
|
||||
node.shard_count -= 1;
|
||||
node.attached_shard_count -= 1;
|
||||
}
|
||||
RefCountUpdate::DemoteAttached => {
|
||||
node.attached_shard_count -= 1;
|
||||
}
|
||||
RefCountUpdate::AddSecondary => {
|
||||
node.shard_count += 1;
|
||||
}
|
||||
RefCountUpdate::RemoveSecondary => {
|
||||
node.shard_count -= 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Decrement a node's reference count. Inverse of [`Self::node_inc_ref`].
|
||||
pub(crate) fn node_dec_ref(&mut self, node_id: NodeId) {
|
||||
let Some(node) = self.nodes.get_mut(&node_id) else {
|
||||
// Check if the number of shards attached to a given node is lagging below
|
||||
// the cluster average. If that's the case, the node should be filled.
|
||||
pub(crate) fn compute_fill_requirement(&self, node_id: NodeId) -> usize {
|
||||
let Some(node) = self.nodes.get(&node_id) else {
|
||||
debug_assert!(false);
|
||||
tracing::error!("Scheduler missing node {node_id}");
|
||||
return;
|
||||
return 0;
|
||||
};
|
||||
assert!(!self.nodes.is_empty());
|
||||
let expected_attached_shards_per_node = self.expected_attached_shard_count();
|
||||
|
||||
node.shard_count -= 1;
|
||||
for (node_id, node) in self.nodes.iter() {
|
||||
tracing::trace!(%node_id, "attached_shard_count={} shard_count={} expected={}", node.attached_shard_count, node.shard_count, expected_attached_shards_per_node);
|
||||
}
|
||||
|
||||
if node.attached_shard_count < expected_attached_shards_per_node {
|
||||
expected_attached_shards_per_node - node.attached_shard_count
|
||||
} else {
|
||||
0
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn expected_attached_shard_count(&self) -> usize {
|
||||
let total_attached_shards: usize =
|
||||
self.nodes.values().map(|n| n.attached_shard_count).sum();
|
||||
|
||||
assert!(!self.nodes.is_empty());
|
||||
total_attached_shards / self.nodes.len()
|
||||
}
|
||||
|
||||
/// Lists every node with its `attached_shard_count`, ordered from the most attached shards
/// to the fewest.
pub(crate) fn nodes_by_attached_shard_count(&self) -> Vec<(NodeId, usize)> {
    let mut counts: Vec<(NodeId, usize)> = self
        .nodes
        .iter()
        .map(|(node_id, stats)| (*node_id, stats.attached_shard_count))
        .collect();
    // Descending by count; the sort is stable, so ties keep their iteration order.
    counts.sort_by(|lhs, rhs| rhs.1.cmp(&lhs.1));
    counts
}
|
||||
|
||||
pub(crate) fn node_upsert(&mut self, node: &Node) {
|
||||
@@ -263,6 +331,7 @@ impl Scheduler {
|
||||
Vacant(entry) => {
|
||||
entry.insert(SchedulerNode {
|
||||
shard_count: 0,
|
||||
attached_shard_count: 0,
|
||||
may_schedule: node.may_schedule(),
|
||||
});
|
||||
}
|
||||
@@ -385,6 +454,11 @@ impl Scheduler {
|
||||
/// Returns the `shard_count` recorded for `node_id`.
///
/// Panics if `node_id` is not present in `self.nodes`.
pub(crate) fn get_node_shard_count(&self, node_id: NodeId) -> usize {
|
||||
self.nodes.get(&node_id).unwrap().shard_count
|
||||
}
|
||||
|
||||
/// Returns the `attached_shard_count` recorded for `node_id`.
///
/// Panics if `node_id` is not present in `self.nodes`.
#[cfg(test)]
|
||||
pub(crate) fn get_node_attached_shard_count(&self, node_id: NodeId) -> usize {
|
||||
self.nodes.get(&node_id).unwrap().attached_shard_count
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -437,18 +511,28 @@ mod tests {
|
||||
let scheduled = scheduler.schedule_shard(&[], &context)?;
|
||||
t2_intent.set_attached(&mut scheduler, Some(scheduled));
|
||||
|
||||
assert_eq!(scheduler.nodes.get(&NodeId(1)).unwrap().shard_count, 1);
|
||||
assert_eq!(scheduler.nodes.get(&NodeId(2)).unwrap().shard_count, 1);
|
||||
assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 1);
|
||||
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 1);
|
||||
|
||||
assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 1);
|
||||
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 1);
|
||||
|
||||
let scheduled = scheduler.schedule_shard(&t1_intent.all_pageservers(), &context)?;
|
||||
t1_intent.push_secondary(&mut scheduler, scheduled);
|
||||
|
||||
assert_eq!(scheduler.nodes.get(&NodeId(1)).unwrap().shard_count, 1);
|
||||
assert_eq!(scheduler.nodes.get(&NodeId(2)).unwrap().shard_count, 2);
|
||||
assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 1);
|
||||
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 1);
|
||||
|
||||
assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 2);
|
||||
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 1);
|
||||
|
||||
t1_intent.clear(&mut scheduler);
|
||||
assert_eq!(scheduler.nodes.get(&NodeId(1)).unwrap().shard_count, 0);
|
||||
assert_eq!(scheduler.nodes.get(&NodeId(2)).unwrap().shard_count, 1);
|
||||
assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 0);
|
||||
assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 1);
|
||||
|
||||
let total_attached = scheduler.get_node_attached_shard_count(NodeId(1))
|
||||
+ scheduler.get_node_attached_shard_count(NodeId(2));
|
||||
assert_eq!(total_attached, 1);
|
||||
|
||||
if cfg!(debug_assertions) {
|
||||
// Dropping an IntentState without clearing it causes a panic in debug mode,
|
||||
@@ -459,8 +543,12 @@ mod tests {
|
||||
assert!(result.is_err());
|
||||
} else {
|
||||
t2_intent.clear(&mut scheduler);
|
||||
assert_eq!(scheduler.nodes.get(&NodeId(1)).unwrap().shard_count, 0);
|
||||
assert_eq!(scheduler.nodes.get(&NodeId(2)).unwrap().shard_count, 0);
|
||||
|
||||
assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 0);
|
||||
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 0);
|
||||
|
||||
assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 0);
|
||||
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 0);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -8,13 +8,17 @@ use std::{
|
||||
};
|
||||
|
||||
use crate::{
|
||||
background_node_operations::{
|
||||
Drain, Fill, Operation, OperationError, OperationHandler, MAX_RECONCILES_PER_OPERATION,
|
||||
},
|
||||
compute_hook::NotifyError,
|
||||
id_lock_map::{trace_exclusive_lock, trace_shared_lock, IdLockMap, WrappedWriteGuard},
|
||||
persistence::{AbortShardSplitStatus, TenantFilter},
|
||||
reconciler::{ReconcileError, ReconcileUnits},
|
||||
scheduler::{ScheduleContext, ScheduleMode},
|
||||
scheduler::{MaySchedule, ScheduleContext, ScheduleMode},
|
||||
tenant_shard::{
|
||||
MigrateAttachment, ReconcileNeeded, ScheduleOptimization, ScheduleOptimizationAction,
|
||||
MigrateAttachment, ReconcileNeeded, ReconcilerStatus, ScheduleOptimization,
|
||||
ScheduleOptimizationAction,
|
||||
},
|
||||
};
|
||||
use anyhow::Context;
|
||||
@@ -134,6 +138,8 @@ struct ServiceState {
|
||||
|
||||
scheduler: Scheduler,
|
||||
|
||||
ongoing_operation: Option<OperationHandler>,
|
||||
|
||||
/// Queue of tenants who are waiting for concurrency limits to permit them to reconcile
|
||||
delayed_reconcile_rx: tokio::sync::mpsc::Receiver<TenantShardId>,
|
||||
}
|
||||
@@ -185,6 +191,7 @@ impl ServiceState {
|
||||
tenants,
|
||||
nodes: Arc::new(nodes),
|
||||
scheduler,
|
||||
ongoing_operation: None,
|
||||
delayed_reconcile_rx,
|
||||
}
|
||||
}
|
||||
@@ -296,6 +303,17 @@ impl From<ReconcileWaitError> for ApiError {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<OperationError> for ApiError {
|
||||
fn from(value: OperationError) -> Self {
|
||||
match value {
|
||||
OperationError::NodeStateChanged(err) => {
|
||||
ApiError::InternalServerError(anyhow::anyhow!(err))
|
||||
}
|
||||
OperationError::Cancelled => ApiError::Conflict("Operation was cancelled".into()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(clippy::large_enum_variant)]
|
||||
enum TenantCreateOrUpdate {
|
||||
Create(TenantCreateRequest),
|
||||
@@ -1562,15 +1580,32 @@ impl Service {
|
||||
// Setting a node active unblocks any Reconcilers that might write to the location config API,
|
||||
// but those requests will not be accepted by the node until it has finished processing
|
||||
// the re-attach response.
|
||||
//
|
||||
// Additionally, reset the nodes scheduling policy to match the conditional update done
|
||||
// in [`Persistence::re_attach`].
|
||||
if let Some(node) = nodes.get(&reattach_req.node_id) {
|
||||
if !node.is_available() {
|
||||
let reset_scheduling = matches!(
|
||||
node.get_scheduling(),
|
||||
NodeSchedulingPolicy::PauseForRestart
|
||||
| NodeSchedulingPolicy::Draining
|
||||
| NodeSchedulingPolicy::Filling
|
||||
);
|
||||
|
||||
if !node.is_available() || reset_scheduling {
|
||||
let mut new_nodes = (**nodes).clone();
|
||||
if let Some(node) = new_nodes.get_mut(&reattach_req.node_id) {
|
||||
node.set_availability(NodeAvailability::Active(UtilizationScore::worst()));
|
||||
if !node.is_available() {
|
||||
node.set_availability(NodeAvailability::Active(UtilizationScore::worst()));
|
||||
}
|
||||
|
||||
if reset_scheduling {
|
||||
node.set_scheduling(NodeSchedulingPolicy::Active);
|
||||
}
|
||||
|
||||
scheduler.node_upsert(node);
|
||||
let new_nodes = Arc::new(new_nodes);
|
||||
*nodes = new_nodes;
|
||||
}
|
||||
let new_nodes = Arc::new(new_nodes);
|
||||
*nodes = new_nodes;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1851,6 +1886,25 @@ impl Service {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Same as [`Service::await_waiters`], but returns the waiters which are still
|
||||
/// in progress
|
||||
async fn await_waiters_remainder(
|
||||
&self,
|
||||
waiters: Vec<ReconcilerWaiter>,
|
||||
timeout: Duration,
|
||||
) -> Vec<ReconcilerWaiter> {
|
||||
let deadline = Instant::now().checked_add(timeout).unwrap();
|
||||
for waiter in waiters.iter() {
|
||||
let timeout = deadline.duration_since(Instant::now());
|
||||
let _ = waiter.wait_timeout(timeout).await;
|
||||
}
|
||||
|
||||
waiters
|
||||
.into_iter()
|
||||
.filter(|waiter| matches!(waiter.get_status(), ReconcilerStatus::InProgress))
|
||||
.collect::<Vec<_>>()
|
||||
}
|
||||
|
||||
/// Part of [`Self::tenant_location_config`]: dissect an incoming location config request,
|
||||
/// and transform it into either a tenant creation or a series of shard updates.
|
||||
///
|
||||
@@ -4128,6 +4182,18 @@ impl Service {
|
||||
Ok(nodes)
|
||||
}
|
||||
|
||||
pub(crate) async fn get_node(&self, node_id: NodeId) -> Result<Node, ApiError> {
|
||||
self.inner
|
||||
.read()
|
||||
.unwrap()
|
||||
.nodes
|
||||
.get(&node_id)
|
||||
.cloned()
|
||||
.ok_or(ApiError::NotFound(
|
||||
format!("Node {node_id} not registered").into(),
|
||||
))
|
||||
}
|
||||
|
||||
pub(crate) async fn node_register(
|
||||
&self,
|
||||
register_req: NodeRegisterRequest,
|
||||
@@ -4282,9 +4348,6 @@ impl Service {
|
||||
|
||||
if let Some(scheduling) = scheduling {
|
||||
node.set_scheduling(scheduling);
|
||||
|
||||
// TODO: once we have a background scheduling ticker for fill/drain, kick it
|
||||
// to wake up and start working.
|
||||
}
|
||||
|
||||
// Update the scheduler, in case the eligibility of the node for new shards has changed
|
||||
@@ -4312,7 +4375,7 @@ impl Service {
|
||||
continue;
|
||||
}
|
||||
|
||||
if tenant_shard.intent.demote_attached(node_id) {
|
||||
if tenant_shard.intent.demote_attached(scheduler, node_id) {
|
||||
tenant_shard.sequence = tenant_shard.sequence.next();
|
||||
|
||||
// TODO: populate a ScheduleContext including all shards in the same tenant_id (only matters
|
||||
@@ -4359,7 +4422,7 @@ impl Service {
|
||||
// TODO: in the background, we should balance work back onto this pageserver
|
||||
}
|
||||
AvailabilityTransition::Unchanged => {
|
||||
tracing::debug!("Node {} no change during config", node_id);
|
||||
tracing::debug!("Node {} no availability change during config", node_id);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4368,6 +4431,176 @@ impl Service {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn start_node_drain(
|
||||
self: &Arc<Self>,
|
||||
node_id: NodeId,
|
||||
) -> Result<(), ApiError> {
|
||||
let (ongoing_op, node_available, node_policy, schedulable_nodes_count) = {
|
||||
let locked = self.inner.read().unwrap();
|
||||
let nodes = &locked.nodes;
|
||||
let node = nodes.get(&node_id).ok_or(ApiError::NotFound(
|
||||
anyhow::anyhow!("Node {} not registered", node_id).into(),
|
||||
))?;
|
||||
let schedulable_nodes_count = nodes
|
||||
.iter()
|
||||
.filter(|(_, n)| matches!(n.may_schedule(), MaySchedule::Yes(_)))
|
||||
.count();
|
||||
|
||||
(
|
||||
locked
|
||||
.ongoing_operation
|
||||
.as_ref()
|
||||
.map(|ongoing| ongoing.operation),
|
||||
node.is_available(),
|
||||
node.get_scheduling(),
|
||||
schedulable_nodes_count,
|
||||
)
|
||||
};
|
||||
|
||||
if let Some(ongoing) = ongoing_op {
|
||||
return Err(ApiError::PreconditionFailed(
|
||||
format!("Background operation already ongoing for node: {}", ongoing).into(),
|
||||
));
|
||||
}
|
||||
|
||||
if !node_available {
|
||||
return Err(ApiError::ResourceUnavailable(
|
||||
format!("Node {node_id} is currently unavailable").into(),
|
||||
));
|
||||
}
|
||||
|
||||
if schedulable_nodes_count == 0 {
|
||||
return Err(ApiError::PreconditionFailed(
|
||||
"No other schedulable nodes to drain to".into(),
|
||||
));
|
||||
}
|
||||
|
||||
match node_policy {
|
||||
NodeSchedulingPolicy::Active | NodeSchedulingPolicy::Pause => {
|
||||
self.node_configure(node_id, None, Some(NodeSchedulingPolicy::Draining))
|
||||
.await?;
|
||||
|
||||
let cancel = CancellationToken::new();
|
||||
|
||||
self.inner.write().unwrap().ongoing_operation = Some(OperationHandler {
|
||||
operation: Operation::Drain(Drain { node_id }),
|
||||
cancel: cancel.clone(),
|
||||
});
|
||||
|
||||
tokio::task::spawn({
|
||||
let service = self.clone();
|
||||
let cancel = cancel.clone();
|
||||
async move {
|
||||
scopeguard::defer! {
|
||||
let prev = service.inner.write().unwrap().ongoing_operation.take();
|
||||
|
||||
if let Some(Operation::Drain(removed_drain)) = prev.map(|h| h.operation) {
|
||||
assert_eq!(removed_drain.node_id, node_id, "We always take the same operation");
|
||||
} else {
|
||||
panic!("We always remove the same operation")
|
||||
}
|
||||
}
|
||||
service.drain_node(node_id, cancel).await
|
||||
}
|
||||
});
|
||||
}
|
||||
NodeSchedulingPolicy::Draining => {
|
||||
return Err(ApiError::Conflict(format!(
|
||||
"Node {node_id} has drain in progress"
|
||||
)));
|
||||
}
|
||||
policy => {
|
||||
return Err(ApiError::PreconditionFailed(
|
||||
format!("Node {node_id} cannot be drained due to {policy:?} policy").into(),
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn start_node_fill(self: &Arc<Self>, node_id: NodeId) -> Result<(), ApiError> {
|
||||
let (ongoing_op, node_available, node_policy, total_nodes_count) = {
|
||||
let locked = self.inner.read().unwrap();
|
||||
let nodes = &locked.nodes;
|
||||
let node = nodes.get(&node_id).ok_or(ApiError::NotFound(
|
||||
anyhow::anyhow!("Node {} not registered", node_id).into(),
|
||||
))?;
|
||||
|
||||
(
|
||||
locked
|
||||
.ongoing_operation
|
||||
.as_ref()
|
||||
.map(|ongoing| ongoing.operation),
|
||||
node.is_available(),
|
||||
node.get_scheduling(),
|
||||
nodes.len(),
|
||||
)
|
||||
};
|
||||
|
||||
if let Some(ongoing) = ongoing_op {
|
||||
return Err(ApiError::PreconditionFailed(
|
||||
format!("Background operation already ongoing for node: {}", ongoing).into(),
|
||||
));
|
||||
}
|
||||
|
||||
if !node_available {
|
||||
return Err(ApiError::ResourceUnavailable(
|
||||
format!("Node {node_id} is currently unavailable").into(),
|
||||
));
|
||||
}
|
||||
|
||||
if total_nodes_count <= 1 {
|
||||
return Err(ApiError::PreconditionFailed(
|
||||
"No other nodes to fill from".into(),
|
||||
));
|
||||
}
|
||||
|
||||
match node_policy {
|
||||
NodeSchedulingPolicy::Active => {
|
||||
self.node_configure(node_id, None, Some(NodeSchedulingPolicy::Filling))
|
||||
.await?;
|
||||
|
||||
let cancel = CancellationToken::new();
|
||||
|
||||
self.inner.write().unwrap().ongoing_operation = Some(OperationHandler {
|
||||
operation: Operation::Fill(Fill { node_id }),
|
||||
cancel: cancel.clone(),
|
||||
});
|
||||
|
||||
tokio::task::spawn({
|
||||
let service = self.clone();
|
||||
let cancel = cancel.clone();
|
||||
async move {
|
||||
scopeguard::defer! {
|
||||
let prev = service.inner.write().unwrap().ongoing_operation.take();
|
||||
|
||||
if let Some(Operation::Fill(removed_fill)) = prev.map(|h| h.operation) {
|
||||
assert_eq!(removed_fill.node_id, node_id, "We always take the same operation");
|
||||
} else {
|
||||
panic!("We always remove the same operation")
|
||||
}
|
||||
}
|
||||
|
||||
service.fill_node(node_id, cancel).await
|
||||
}
|
||||
});
|
||||
}
|
||||
NodeSchedulingPolicy::Filling => {
|
||||
return Err(ApiError::Conflict(format!(
|
||||
"Node {node_id} has fill in progress"
|
||||
)));
|
||||
}
|
||||
policy => {
|
||||
return Err(ApiError::PreconditionFailed(
|
||||
format!("Node {node_id} cannot be filled due to {policy:?} policy").into(),
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Helper for methods that will try and call pageserver APIs for
|
||||
/// a tenant, such as timeline CRUD: they cannot proceed unless the tenant
|
||||
/// is attached somewhere.
|
||||
@@ -4952,4 +5185,268 @@ impl Service {
|
||||
// to complete.
|
||||
self.gate.close().await;
|
||||
}
|
||||
|
||||
/// Drain a node by moving the shards attached to it as primaries.
|
||||
/// This is a long running operation and it should run as a separate Tokio task.
|
||||
pub(crate) async fn drain_node(
|
||||
&self,
|
||||
node_id: NodeId,
|
||||
cancel: CancellationToken,
|
||||
) -> Result<(), OperationError> {
|
||||
tracing::info!(%node_id, "Starting drain background operation");
|
||||
|
||||
let mut last_inspected_shard: Option<TenantShardId> = None;
|
||||
let mut inspected_all_shards = false;
|
||||
let mut waiters = Vec::new();
|
||||
let mut schedule_context = ScheduleContext::default();
|
||||
|
||||
while !inspected_all_shards {
|
||||
if cancel.is_cancelled() {
|
||||
return Err(OperationError::Cancelled);
|
||||
}
|
||||
|
||||
{
|
||||
let mut locked = self.inner.write().unwrap();
|
||||
let (nodes, tenants, scheduler) = locked.parts_mut();
|
||||
|
||||
let node = nodes.get(&node_id).ok_or(OperationError::NodeStateChanged(
|
||||
format!("node {node_id} was removed").into(),
|
||||
))?;
|
||||
|
||||
let current_policy = node.get_scheduling();
|
||||
if !matches!(current_policy, NodeSchedulingPolicy::Draining) {
|
||||
// TODO(vlad): maybe cancel pending reconciles before erroring out. need to think
|
||||
// about it
|
||||
return Err(OperationError::NodeStateChanged(
|
||||
format!("node {node_id} changed state to {current_policy:?}").into(),
|
||||
));
|
||||
}
|
||||
|
||||
let mut cursor = tenants.iter_mut().skip_while({
|
||||
let skip_past = last_inspected_shard;
|
||||
move |(tid, _)| match skip_past {
|
||||
Some(last) => **tid != last,
|
||||
None => false,
|
||||
}
|
||||
});
|
||||
|
||||
while waiters.len() < MAX_RECONCILES_PER_OPERATION {
|
||||
let (tid, tenant_shard) = match cursor.next() {
|
||||
Some(some) => some,
|
||||
None => {
|
||||
inspected_all_shards = true;
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
if tenant_shard.intent.demote_attached(scheduler, node_id) {
|
||||
match tenant_shard.schedule(scheduler, &mut schedule_context) {
|
||||
Err(e) => {
|
||||
tracing::warn!(%tid, "Scheduling error when draining pageserver {} : {e}", node_id);
|
||||
}
|
||||
Ok(()) => {
|
||||
let waiter = self.maybe_reconcile_shard(tenant_shard, nodes);
|
||||
if let Some(some) = waiter {
|
||||
waiters.push(some);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
last_inspected_shard = Some(*tid);
|
||||
}
|
||||
}
|
||||
|
||||
waiters = self
|
||||
.await_waiters_remainder(waiters, SHORT_RECONCILE_TIMEOUT)
|
||||
.await;
|
||||
}
|
||||
|
||||
while !waiters.is_empty() {
|
||||
waiters = self
|
||||
.await_waiters_remainder(waiters, SHORT_RECONCILE_TIMEOUT)
|
||||
.await;
|
||||
}
|
||||
|
||||
// At this point we have done the best we could to drain shards from this node.
|
||||
// Set the node scheduling policy to `[NodeSchedulingPolicy::PauseForRestart]`
|
||||
// to complete the drain.
|
||||
if let Err(err) = self
|
||||
.node_configure(node_id, None, Some(NodeSchedulingPolicy::PauseForRestart))
|
||||
.await
|
||||
{
|
||||
// This is not fatal. Anything that is polling the node scheduling policy to detect
|
||||
// the end of the drain operations will hang, but all such places should enforce an
|
||||
// overall timeout. The scheduling policy will be updated upon node re-attach and/or
|
||||
// by the counterpart fill operation.
|
||||
tracing::warn!(%node_id, "Failed to finalise drain by setting scheduling policy: {err}");
|
||||
}
|
||||
|
||||
tracing::info!(%node_id, "Completed drain background operation");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Create a node fill plan (pick secondaries to promote) that meets the following requirements:
|
||||
/// 1. The node should be filled until it reaches the expected cluster average of
|
||||
/// attached shards. If there are not enough secondaries on the node, the plan stops early.
|
||||
/// 2. Select tenant shards to promote such that the number of attached shards is balanced
|
||||
/// throughout the cluster. We achieve this by picking tenant shards from each node,
|
||||
/// starting from the ones with the largest number of attached shards, until the node
|
||||
/// reaches the expected cluster average.
|
||||
fn fill_node_plan(&self, node_id: NodeId) -> Vec<TenantShardId> {
|
||||
let mut locked = self.inner.write().unwrap();
|
||||
let fill_requirement = locked.scheduler.compute_fill_requirement(node_id);
|
||||
|
||||
let mut tids_by_node = locked
|
||||
.tenants
|
||||
.iter_mut()
|
||||
.filter_map(|(tid, tenant_shard)| {
|
||||
if tenant_shard.intent.get_secondary().contains(&node_id) {
|
||||
if let Some(primary) = tenant_shard.intent.get_attached() {
|
||||
return Some((*primary, *tid));
|
||||
}
|
||||
}
|
||||
|
||||
None
|
||||
})
|
||||
.into_group_map();
|
||||
|
||||
let expected_attached = locked.scheduler.expected_attached_shard_count();
|
||||
let nodes_by_load = locked.scheduler.nodes_by_attached_shard_count();
|
||||
|
||||
let mut plan = Vec::new();
|
||||
for (node_id, attached) in nodes_by_load {
|
||||
if plan.len() >= fill_requirement
|
||||
|| tids_by_node.is_empty()
|
||||
|| attached <= expected_attached
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
let can_take = attached - expected_attached;
|
||||
let mut remove_node = false;
|
||||
for _ in 0..can_take {
|
||||
match tids_by_node.get_mut(&node_id) {
|
||||
Some(tids) => match tids.pop() {
|
||||
Some(tid) => {
|
||||
plan.push(tid);
|
||||
}
|
||||
None => {
|
||||
remove_node = true;
|
||||
break;
|
||||
}
|
||||
},
|
||||
None => {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if remove_node {
|
||||
tids_by_node.remove(&node_id);
|
||||
}
|
||||
}
|
||||
|
||||
plan
|
||||
}
|
||||
|
||||
/// Fill a node by promoting its secondaries until the cluster is balanced
|
||||
/// with regards to attached shard counts. Note that this operation only
|
||||
/// makes sense as a counterpart to the drain implemented in [`Service::drain_node`].
|
||||
/// This is a long running operation and it should run as a separate Tokio task.
|
||||
pub(crate) async fn fill_node(
|
||||
&self,
|
||||
node_id: NodeId,
|
||||
cancel: CancellationToken,
|
||||
) -> Result<(), OperationError> {
|
||||
// TODO(vlad): Currently this operates on the assumption that all
|
||||
// secondaries are warm. This is not always true (e.g. we just migrated the
|
||||
// tenant). Take that into consideration by checking the secondary status.
|
||||
|
||||
tracing::info!(%node_id, "Starting fill background operation");
|
||||
|
||||
let mut tids_to_promote = self.fill_node_plan(node_id);
|
||||
|
||||
let mut waiters = Vec::new();
|
||||
let mut schedule_context = ScheduleContext::default();
|
||||
|
||||
// Execute the plan we've composed above. Before aplying each move from the plan,
|
||||
// we validate to ensure that it has not gone stale in the meantime.
|
||||
while !tids_to_promote.is_empty() {
|
||||
if cancel.is_cancelled() {
|
||||
return Err(OperationError::Cancelled);
|
||||
}
|
||||
|
||||
{
|
||||
let mut locked = self.inner.write().unwrap();
|
||||
let (nodes, tenants, scheduler) = locked.parts_mut();
|
||||
|
||||
let node = nodes.get(&node_id).ok_or(OperationError::NodeStateChanged(
|
||||
format!("node {node_id} was removed").into(),
|
||||
))?;
|
||||
|
||||
let current_policy = node.get_scheduling();
|
||||
if !matches!(current_policy, NodeSchedulingPolicy::Filling) {
|
||||
// TODO(vlad): maybe cancel pending reconciles before erroring out. need to think
|
||||
// about it
|
||||
return Err(OperationError::NodeStateChanged(
|
||||
format!("node {node_id} changed state to {current_policy:?}").into(),
|
||||
));
|
||||
}
|
||||
|
||||
while waiters.len() < MAX_RECONCILES_PER_OPERATION {
|
||||
if let Some(tid) = tids_to_promote.pop() {
|
||||
if let Some(tenant_shard) = tenants.get_mut(&tid) {
|
||||
// If the node being filled is not a secondary anymore,
|
||||
// skip the promotion.
|
||||
if !tenant_shard.intent.get_secondary().contains(&node_id) {
|
||||
continue;
|
||||
}
|
||||
|
||||
tenant_shard.intent.promote_attached(scheduler, node_id);
|
||||
match tenant_shard.schedule(scheduler, &mut schedule_context) {
|
||||
Err(e) => {
|
||||
tracing::warn!(%tid, "Scheduling error when filling pageserver {} : {e}", node_id);
|
||||
}
|
||||
Ok(()) => {
|
||||
if let Some(waiter) =
|
||||
self.maybe_reconcile_shard(tenant_shard, nodes)
|
||||
{
|
||||
waiters.push(waiter);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
waiters = self
|
||||
.await_waiters_remainder(waiters, SHORT_RECONCILE_TIMEOUT)
|
||||
.await;
|
||||
}
|
||||
|
||||
while !waiters.is_empty() {
|
||||
waiters = self
|
||||
.await_waiters_remainder(waiters, SHORT_RECONCILE_TIMEOUT)
|
||||
.await;
|
||||
}
|
||||
|
||||
if let Err(err) = self
|
||||
.node_configure(node_id, None, Some(NodeSchedulingPolicy::Active))
|
||||
.await
|
||||
{
|
||||
// This isn't a huge issue since the filling process starts upon request. However, it
|
||||
// will prevent the next drain from starting. The only case in which this can fail
|
||||
// is database unavailability. Such a case will require manual intervention.
|
||||
tracing::error!(%node_id, "Failed to finalise fill by setting scheduling policy: {err}");
|
||||
}
|
||||
|
||||
tracing::info!(%node_id, "Completed fill background operation");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -8,9 +8,11 @@ use crate::{
|
||||
metrics::{self, ReconcileCompleteLabelGroup, ReconcileOutcome},
|
||||
persistence::TenantShardPersistence,
|
||||
reconciler::ReconcileUnits,
|
||||
scheduler::{AffinityScore, MaySchedule, ScheduleContext},
|
||||
scheduler::{AffinityScore, MaySchedule, RefCountUpdate, ScheduleContext},
|
||||
};
|
||||
use pageserver_api::controller_api::{
|
||||
NodeSchedulingPolicy, PlacementPolicy, ShardSchedulingPolicy,
|
||||
};
|
||||
use pageserver_api::controller_api::{PlacementPolicy, ShardSchedulingPolicy};
|
||||
use pageserver_api::{
|
||||
models::{LocationConfig, LocationConfigMode, TenantConfig},
|
||||
shard::{ShardIdentity, TenantShardId},
|
||||
@@ -153,7 +155,7 @@ impl IntentState {
|
||||
}
|
||||
pub(crate) fn single(scheduler: &mut Scheduler, node_id: Option<NodeId>) -> Self {
|
||||
if let Some(node_id) = node_id {
|
||||
scheduler.node_inc_ref(node_id);
|
||||
scheduler.update_node_ref_counts(node_id, RefCountUpdate::Attach);
|
||||
}
|
||||
Self {
|
||||
attached: node_id,
|
||||
@@ -164,10 +166,10 @@ impl IntentState {
|
||||
pub(crate) fn set_attached(&mut self, scheduler: &mut Scheduler, new_attached: Option<NodeId>) {
|
||||
if self.attached != new_attached {
|
||||
if let Some(old_attached) = self.attached.take() {
|
||||
scheduler.node_dec_ref(old_attached);
|
||||
scheduler.update_node_ref_counts(old_attached, RefCountUpdate::Detach);
|
||||
}
|
||||
if let Some(new_attached) = &new_attached {
|
||||
scheduler.node_inc_ref(*new_attached);
|
||||
scheduler.update_node_ref_counts(*new_attached, RefCountUpdate::Attach);
|
||||
}
|
||||
self.attached = new_attached;
|
||||
}
|
||||
@@ -177,22 +179,27 @@ impl IntentState {
|
||||
/// secondary to attached while maintaining the scheduler's reference counts.
|
||||
pub(crate) fn promote_attached(
|
||||
&mut self,
|
||||
_scheduler: &mut Scheduler,
|
||||
scheduler: &mut Scheduler,
|
||||
promote_secondary: NodeId,
|
||||
) {
|
||||
// If we call this with a node that isn't in secondary, it would cause incorrect
|
||||
// scheduler reference counting, since we assume the node is already referenced as a secondary.
|
||||
debug_assert!(self.secondary.contains(&promote_secondary));
|
||||
|
||||
// TODO: when scheduler starts tracking attached + secondary counts separately, we will
|
||||
// need to call into it here.
|
||||
self.secondary.retain(|n| n != &promote_secondary);
|
||||
|
||||
let demoted = self.attached;
|
||||
self.attached = Some(promote_secondary);
|
||||
|
||||
scheduler.update_node_ref_counts(promote_secondary, RefCountUpdate::PromoteSecondary);
|
||||
if let Some(demoted) = demoted {
|
||||
scheduler.update_node_ref_counts(demoted, RefCountUpdate::DemoteAttached);
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn push_secondary(&mut self, scheduler: &mut Scheduler, new_secondary: NodeId) {
|
||||
debug_assert!(!self.secondary.contains(&new_secondary));
|
||||
scheduler.node_inc_ref(new_secondary);
|
||||
scheduler.update_node_ref_counts(new_secondary, RefCountUpdate::AddSecondary);
|
||||
self.secondary.push(new_secondary);
|
||||
}
|
||||
|
||||
@@ -200,27 +207,27 @@ impl IntentState {
|
||||
pub(crate) fn remove_secondary(&mut self, scheduler: &mut Scheduler, node_id: NodeId) {
|
||||
let index = self.secondary.iter().position(|n| *n == node_id);
|
||||
if let Some(index) = index {
|
||||
scheduler.node_dec_ref(node_id);
|
||||
scheduler.update_node_ref_counts(node_id, RefCountUpdate::RemoveSecondary);
|
||||
self.secondary.remove(index);
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn clear_secondary(&mut self, scheduler: &mut Scheduler) {
|
||||
for secondary in self.secondary.drain(..) {
|
||||
scheduler.node_dec_ref(secondary);
|
||||
scheduler.update_node_ref_counts(secondary, RefCountUpdate::RemoveSecondary);
|
||||
}
|
||||
}
|
||||
|
||||
/// Remove the last secondary node from the list of secondaries
|
||||
pub(crate) fn pop_secondary(&mut self, scheduler: &mut Scheduler) {
|
||||
if let Some(node_id) = self.secondary.pop() {
|
||||
scheduler.node_dec_ref(node_id);
|
||||
scheduler.update_node_ref_counts(node_id, RefCountUpdate::RemoveSecondary);
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn clear(&mut self, scheduler: &mut Scheduler) {
|
||||
if let Some(old_attached) = self.attached.take() {
|
||||
scheduler.node_dec_ref(old_attached);
|
||||
scheduler.update_node_ref_counts(old_attached, RefCountUpdate::Detach);
|
||||
}
|
||||
|
||||
self.clear_secondary(scheduler);
|
||||
@@ -251,12 +258,11 @@ impl IntentState {
|
||||
/// forget the location on the offline node.
|
||||
///
|
||||
/// Returns true if a change was made
|
||||
pub(crate) fn demote_attached(&mut self, node_id: NodeId) -> bool {
|
||||
pub(crate) fn demote_attached(&mut self, scheduler: &mut Scheduler, node_id: NodeId) -> bool {
|
||||
if self.attached == Some(node_id) {
|
||||
// TODO: when scheduler starts tracking attached + secondary counts separately, we will
|
||||
// need to call into it here.
|
||||
self.attached = None;
|
||||
self.secondary.push(node_id);
|
||||
scheduler.update_node_ref_counts(node_id, RefCountUpdate::DemoteAttached);
|
||||
true
|
||||
} else {
|
||||
false
|
||||
@@ -307,6 +313,12 @@ pub(crate) struct ReconcilerWaiter {
|
||||
seq: Sequence,
|
||||
}
|
||||
|
||||
/// Snapshot of the state of a reconciliation, as reported by
/// `ReconcilerWaiter::get_status`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum ReconcilerStatus {
    /// The reconciliation the waiter tracks has completed.
    Done,
    /// The reconciliation the waiter tracks has reported an error.
    Failed,
    /// Neither completion nor an error has been observed yet.
    InProgress,
}
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub(crate) enum ReconcileWaitError {
|
||||
#[error("Timeout waiting for shard {0}")]
|
||||
@@ -369,6 +381,16 @@ impl ReconcilerWaiter {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) fn get_status(&self) -> ReconcilerStatus {
|
||||
if self.seq_wait.would_wait_for(self.seq).is_err() {
|
||||
ReconcilerStatus::Done
|
||||
} else if self.error_seq_wait.would_wait_for(self.seq).is_err() {
|
||||
ReconcilerStatus::Failed
|
||||
} else {
|
||||
ReconcilerStatus::InProgress
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Having spawned a reconciler task, the tenant shard's state will carry enough
|
||||
@@ -593,7 +615,7 @@ impl TenantShard {
|
||||
Secondary => {
|
||||
if let Some(node_id) = self.intent.get_attached() {
|
||||
// Populate secondary by demoting the attached node
|
||||
self.intent.demote_attached(*node_id);
|
||||
self.intent.demote_attached(scheduler, *node_id);
|
||||
modified = true;
|
||||
} else if self.intent.secondary.is_empty() {
|
||||
// Populate secondary by scheduling a fresh node
|
||||
@@ -648,13 +670,17 @@ impl TenantShard {
|
||||
let mut scores = all_pageservers
|
||||
.iter()
|
||||
.flat_map(|node_id| {
|
||||
if matches!(
|
||||
nodes
|
||||
.get(node_id)
|
||||
.map(|n| n.may_schedule())
|
||||
.unwrap_or(MaySchedule::No),
|
||||
MaySchedule::No
|
||||
let node = nodes.get(node_id);
|
||||
if node.is_none() {
|
||||
None
|
||||
} else if matches!(
|
||||
node.unwrap().get_scheduling(),
|
||||
NodeSchedulingPolicy::Filling
|
||||
) {
|
||||
// If the node is currently filling, don't count it as a candidate, to avoid
|
||||
// racing with the background fill.
|
||||
None
|
||||
} else if matches!(node.unwrap().may_schedule(), MaySchedule::No) {
|
||||
None
|
||||
} else {
|
||||
let affinity_score = schedule_context.get_node_affinity(*node_id);
|
||||
@@ -783,7 +809,7 @@ impl TenantShard {
|
||||
old_attached_node_id,
|
||||
new_attached_node_id,
|
||||
}) => {
|
||||
self.intent.demote_attached(old_attached_node_id);
|
||||
self.intent.demote_attached(scheduler, old_attached_node_id);
|
||||
self.intent
|
||||
.promote_attached(scheduler, new_attached_node_id);
|
||||
}
|
||||
@@ -1321,7 +1347,9 @@ pub(crate) mod tests {
|
||||
assert_ne!(attached_node_id, secondary_node_id);
|
||||
|
||||
// Notifying the attached node is offline should demote it to a secondary
|
||||
let changed = tenant_shard.intent.demote_attached(attached_node_id);
|
||||
let changed = tenant_shard
|
||||
.intent
|
||||
.demote_attached(&mut scheduler, attached_node_id);
|
||||
assert!(changed);
|
||||
assert!(tenant_shard.intent.attached.is_none());
|
||||
assert_eq!(tenant_shard.intent.secondary.len(), 2);
|
||||
@@ -1604,7 +1632,14 @@ pub(crate) mod tests {
|
||||
|
||||
// We should see equal number of locations on the two nodes.
|
||||
assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 4);
|
||||
// Scheduling does not consider the number of attachments picking the initial
|
||||
// pageserver to attach to (hence the assertion that all primaries are on the
|
||||
// same node)
|
||||
// TODO: Tweak the scheduling to evenly distribute attachments for new shards.
|
||||
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 4);
|
||||
|
||||
assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 4);
|
||||
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 0);
|
||||
|
||||
// Add another two nodes: we should see the shards spread out when their optimize
|
||||
// methods are called
|
||||
@@ -1613,9 +1648,16 @@ pub(crate) mod tests {
|
||||
optimize_til_idle(&nodes, &mut scheduler, &mut shards);
|
||||
|
||||
assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 2);
|
||||
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 1);
|
||||
|
||||
assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 2);
|
||||
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 1);
|
||||
|
||||
assert_eq!(scheduler.get_node_shard_count(NodeId(3)), 2);
|
||||
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(3)), 1);
|
||||
|
||||
assert_eq!(scheduler.get_node_shard_count(NodeId(4)), 2);
|
||||
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(4)), 1);
|
||||
|
||||
for shard in shards.iter_mut() {
|
||||
shard.intent.clear(&mut scheduler);
|
||||
|
||||
@@ -2213,6 +2213,30 @@ class NeonStorageController(MetricsGetter, LogUtils):
|
||||
headers=self.headers(TokenScope.ADMIN),
|
||||
)
|
||||
|
||||
def node_drain(self, node_id):
    """Send an admin-scoped PUT to the storage controller's drain endpoint for `node_id`."""
    log.info(f"node_drain({node_id})")
    drain_url = f"{self.env.storage_controller_api}/control/v1/node/{node_id}/drain"
    self.request("PUT", drain_url, headers=self.headers(TokenScope.ADMIN))
|
||||
|
||||
def node_fill(self, node_id):
|
||||
log.info(f"node_fill({node_id})")
|
||||
self.request(
|
||||
"PUT",
|
||||
f"{self.env.storage_controller_api}/control/v1/node/{node_id}/fill",
|
||||
headers=self.headers(TokenScope.ADMIN),
|
||||
)
|
||||
|
||||
def node_status(self, node_id):
|
||||
response = self.request(
|
||||
"GET",
|
||||
f"{self.env.storage_controller_api}/control/v1/node/{node_id}",
|
||||
headers=self.headers(TokenScope.ADMIN),
|
||||
)
|
||||
return response.json()
|
||||
|
||||
def node_list(self):
|
||||
response = self.request(
|
||||
"GET",
|
||||
|
||||
@@ -37,7 +37,8 @@ def test_issue_5878(neon_env_builder: NeonEnvBuilder):
|
||||
"""
|
||||
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
env = neon_env_builder.init_configs()
|
||||
env.start()
|
||||
env.pageserver.allowed_errors.extend(
|
||||
[".*Dropped remote consistent LSN updates.*", ".*Dropping stale deletions.*"]
|
||||
)
|
||||
|
||||
@@ -1477,3 +1477,120 @@ def test_tenant_import(neon_env_builder: NeonEnvBuilder, shard_count, remote_sto
|
||||
workload = Workload(env, tenant_id, timeline, branch_name=branch)
|
||||
workload.expect_rows = expect_rows
|
||||
workload.validate()
|
||||
|
||||
|
||||
def test_graceful_cluster_restart(neon_env_builder: NeonEnvBuilder):
|
||||
"""
|
||||
Graceful restart of storage controller clusters uses the drain and
|
||||
fill hooks in order to migrate attachments away from pageservers before
|
||||
restarting. In practice, Ansible will drive this process.
|
||||
"""
|
||||
neon_env_builder.num_pageservers = 2
|
||||
env = neon_env_builder.init_configs()
|
||||
env.start()
|
||||
|
||||
tenant_count = 5
|
||||
shard_count_per_tenant = 8
|
||||
total_shards = tenant_count * shard_count_per_tenant
|
||||
tenant_ids = []
|
||||
|
||||
for _ in range(0, tenant_count):
|
||||
tid = TenantId.generate()
|
||||
tenant_ids.append(tid)
|
||||
env.neon_cli.create_tenant(
|
||||
tid, placement_policy='{"Attached":1}', shard_count=shard_count_per_tenant
|
||||
)
|
||||
|
||||
# Give things a chance to settle.
|
||||
# A call to `reconcile_until_idle` could be used here instead,
|
||||
# however since all attachments are placed on the same node,
|
||||
# we'd have to wait for a long time (2 minutes-ish) for optimizations
|
||||
# to quiesce.
|
||||
# TODO: once the initial attachment selection is fixed, update this
|
||||
# to use `reconcile_until_idle`.
|
||||
time.sleep(2)
|
||||
|
||||
nodes = env.storage_controller.node_list()
|
||||
assert len(nodes) == 2
|
||||
|
||||
def retryable_node_operation(op, ps_id, max_attempts, backoff):
|
||||
while max_attempts > 0:
|
||||
try:
|
||||
op(ps_id)
|
||||
return
|
||||
except StorageControllerApiException as e:
|
||||
max_attempts -= 1
|
||||
log.info(f"Operation failed ({max_attempts} attempts left): {e}")
|
||||
|
||||
if max_attempts == 0:
|
||||
raise e
|
||||
|
||||
time.sleep(backoff)
|
||||
|
||||
def poll_node_status(node_id, desired_scheduling_policy, max_attempts, backoff):
|
||||
log.info(f"Polling {node_id} for {desired_scheduling_policy} scheduling policy")
|
||||
while max_attempts > 0:
|
||||
try:
|
||||
status = env.storage_controller.node_status(node_id)
|
||||
policy = status["scheduling"]
|
||||
if policy == desired_scheduling_policy:
|
||||
return
|
||||
else:
|
||||
max_attempts -= 1
|
||||
log.info(f"Status call returned {policy=} ({max_attempts} attempts left)")
|
||||
|
||||
if max_attempts == 0:
|
||||
raise AssertionError(
|
||||
f"Status for {node_id=} did not reach {desired_scheduling_policy=}"
|
||||
)
|
||||
|
||||
time.sleep(backoff)
|
||||
except StorageControllerApiException as e:
|
||||
max_attempts -= 1
|
||||
log.info(f"Status call failed ({max_attempts} retries left): {e}")
|
||||
|
||||
if max_attempts == 0:
|
||||
raise e
|
||||
|
||||
time.sleep(backoff)
|
||||
|
||||
def assert_shard_counts_balanced(env: NeonEnv, shard_counts, total_shards):
|
||||
# Assert that all nodes have some attached shards
|
||||
assert len(shard_counts) == len(env.pageservers)
|
||||
|
||||
min_shard_count = min(shard_counts.values())
|
||||
max_shard_count = max(shard_counts.values())
|
||||
|
||||
flake_factor = 5 / 100
|
||||
assert max_shard_count - min_shard_count <= int(total_shards * flake_factor)
|
||||
|
||||
# Perform a graceful rolling restart
|
||||
for ps in env.pageservers:
|
||||
retryable_node_operation(
|
||||
lambda ps_id: env.storage_controller.node_drain(ps_id), ps.id, max_attempts=3, backoff=2
|
||||
)
|
||||
poll_node_status(ps.id, "PauseForRestart", max_attempts=6, backoff=5)
|
||||
|
||||
shard_counts = get_node_shard_counts(env, tenant_ids)
|
||||
log.info(f"Shard counts after draining node {ps.id}: {shard_counts}")
|
||||
# Assert that we've drained the node
|
||||
assert shard_counts[str(ps.id)] == 0
|
||||
# Assert that those shards actually went somewhere
|
||||
assert sum(shard_counts.values()) == total_shards
|
||||
|
||||
ps.restart()
|
||||
poll_node_status(ps.id, "Active", max_attempts=10, backoff=1)
|
||||
|
||||
retryable_node_operation(
|
||||
lambda ps_id: env.storage_controller.node_fill(ps_id), ps.id, max_attempts=3, backoff=2
|
||||
)
|
||||
poll_node_status(ps.id, "Active", max_attempts=6, backoff=5)
|
||||
|
||||
shard_counts = get_node_shard_counts(env, tenant_ids)
|
||||
log.info(f"Shard counts after filling node {ps.id}: {shard_counts}")
|
||||
assert_shard_counts_balanced(env, shard_counts, total_shards)
|
||||
|
||||
# Now check that shards are reasonably balanced
|
||||
shard_counts = get_node_shard_counts(env, tenant_ids)
|
||||
log.info(f"Shard counts after rolling restart: {shard_counts}")
|
||||
assert_shard_counts_balanced(env, shard_counts, total_shards)
|
||||
|
||||
Reference in New Issue
Block a user