mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-03 21:40:39 +00:00
Merge pull request #8858 from neondatabase/releases/2024-08-28-compute-only
Compute release 2024-08-28
This commit is contained in:
@@ -113,7 +113,7 @@ md5 = "0.7.0"
|
||||
measured = { version = "0.0.22", features=["lasso"] }
|
||||
measured-process = { version = "0.0.22" }
|
||||
memoffset = "0.8"
|
||||
nix = { version = "0.27", features = ["fs", "process", "socket", "signal", "poll"] }
|
||||
nix = { version = "0.27", features = ["dir", "fs", "process", "socket", "signal", "poll"] }
|
||||
notify = "6.0.0"
|
||||
num_cpus = "1.15"
|
||||
num-traits = "0.2.15"
|
||||
|
||||
@@ -236,6 +236,15 @@ impl Key {
|
||||
field5: u8::MAX,
|
||||
field6: u32::MAX,
|
||||
};
|
||||
/// A key slightly smaller than [`Key::MAX`] for use in layer key ranges to avoid them to be confused with L0 layers
|
||||
pub const NON_L0_MAX: Key = Key {
|
||||
field1: u8::MAX,
|
||||
field2: u32::MAX,
|
||||
field3: u32::MAX,
|
||||
field4: u32::MAX,
|
||||
field5: u8::MAX,
|
||||
field6: u32::MAX - 1,
|
||||
};
|
||||
|
||||
pub fn from_hex(s: &str) -> Result<Self> {
|
||||
if s.len() != 36 {
|
||||
|
||||
@@ -718,6 +718,7 @@ pub struct TimelineInfo {
|
||||
pub pg_version: u32,
|
||||
|
||||
pub state: TimelineState,
|
||||
pub is_archived: bool,
|
||||
|
||||
pub walreceiver_status: String,
|
||||
|
||||
|
||||
@@ -126,10 +126,56 @@ fn main() -> anyhow::Result<()> {
|
||||
info!(?conf.virtual_file_direct_io, "starting with virtual_file Direct IO settings");
|
||||
info!(?conf.compact_level0_phase1_value_access, "starting with setting for compact_level0_phase1_value_access");
|
||||
|
||||
// The tenants directory contains all the pageserver local disk state.
|
||||
// Create if not exists and make sure all the contents are durable before proceeding.
|
||||
// Ensuring durability eliminates a whole bug class where we come up after an unclean shutdown.
|
||||
// After unclea shutdown, we don't know if all the filesystem content we can read via syscalls is actually durable or not.
|
||||
// Examples for that: OOM kill, systemd killing us during shutdown, self abort due to unrecoverable IO error.
|
||||
let tenants_path = conf.tenants_path();
|
||||
if !tenants_path.exists() {
|
||||
utils::crashsafe::create_dir_all(conf.tenants_path())
|
||||
.with_context(|| format!("Failed to create tenants root dir at '{tenants_path}'"))?;
|
||||
{
|
||||
let open = || {
|
||||
nix::dir::Dir::open(
|
||||
tenants_path.as_std_path(),
|
||||
nix::fcntl::OFlag::O_DIRECTORY | nix::fcntl::OFlag::O_RDONLY,
|
||||
nix::sys::stat::Mode::empty(),
|
||||
)
|
||||
};
|
||||
let dirfd = match open() {
|
||||
Ok(dirfd) => dirfd,
|
||||
Err(e) => match e {
|
||||
nix::errno::Errno::ENOENT => {
|
||||
utils::crashsafe::create_dir_all(&tenants_path).with_context(|| {
|
||||
format!("Failed to create tenants root dir at '{tenants_path}'")
|
||||
})?;
|
||||
open().context("open tenants dir after creating it")?
|
||||
}
|
||||
e => anyhow::bail!(e),
|
||||
},
|
||||
};
|
||||
|
||||
let started = Instant::now();
|
||||
// Linux guarantees durability for syncfs.
|
||||
// POSIX doesn't have syncfs, and further does not actually guarantee durability of sync().
|
||||
#[cfg(target_os = "linux")]
|
||||
{
|
||||
use std::os::fd::AsRawFd;
|
||||
nix::unistd::syncfs(dirfd.as_raw_fd()).context("syncfs")?;
|
||||
}
|
||||
#[cfg(target_os = "macos")]
|
||||
{
|
||||
// macOS is not a production platform for Neon, don't even bother.
|
||||
drop(dirfd);
|
||||
}
|
||||
#[cfg(not(any(target_os = "linux", target_os = "macos")))]
|
||||
{
|
||||
compile_error!("Unsupported OS");
|
||||
}
|
||||
|
||||
let elapsed = started.elapsed();
|
||||
info!(
|
||||
elapsed_ms = elapsed.as_millis(),
|
||||
"made tenant directory contents durable"
|
||||
);
|
||||
}
|
||||
|
||||
// Initialize up failpoints support
|
||||
|
||||
@@ -318,6 +318,24 @@ impl From<crate::tenant::DeleteTimelineError> for ApiError {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<crate::tenant::TimelineArchivalError> for ApiError {
|
||||
fn from(value: crate::tenant::TimelineArchivalError) -> Self {
|
||||
use crate::tenant::TimelineArchivalError::*;
|
||||
match value {
|
||||
NotFound => ApiError::NotFound(anyhow::anyhow!("timeline not found").into()),
|
||||
Timeout => ApiError::Timeout("hit pageserver internal timeout".into()),
|
||||
HasUnarchivedChildren(children) => ApiError::PreconditionFailed(
|
||||
format!(
|
||||
"Cannot archive timeline which has non-archived child timelines: {children:?}"
|
||||
)
|
||||
.into_boxed_str(),
|
||||
),
|
||||
a @ AlreadyInProgress => ApiError::Conflict(a.to_string()),
|
||||
Other(e) => ApiError::InternalServerError(e),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<crate::tenant::mgr::DeleteTimelineError> for ApiError {
|
||||
fn from(value: crate::tenant::mgr::DeleteTimelineError) -> Self {
|
||||
use crate::tenant::mgr::DeleteTimelineError::*;
|
||||
@@ -405,6 +423,8 @@ async fn build_timeline_info_common(
|
||||
let current_logical_size = timeline.get_current_logical_size(logical_size_task_priority, ctx);
|
||||
let current_physical_size = Some(timeline.layer_size_sum().await);
|
||||
let state = timeline.current_state();
|
||||
// Report is_archived = false if the timeline is still loading
|
||||
let is_archived = timeline.is_archived().unwrap_or(false);
|
||||
let remote_consistent_lsn_projected = timeline
|
||||
.get_remote_consistent_lsn_projected()
|
||||
.unwrap_or(Lsn(0));
|
||||
@@ -445,6 +465,7 @@ async fn build_timeline_info_common(
|
||||
pg_version: timeline.pg_version,
|
||||
|
||||
state,
|
||||
is_archived,
|
||||
|
||||
walreceiver_status,
|
||||
|
||||
@@ -686,9 +707,7 @@ async fn timeline_archival_config_handler(
|
||||
|
||||
tenant
|
||||
.apply_timeline_archival_config(timeline_id, request_data.state)
|
||||
.await
|
||||
.context("applying archival config")
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
.await?;
|
||||
Ok::<_, ApiError>(())
|
||||
}
|
||||
.instrument(info_span!("timeline_archival_config",
|
||||
|
||||
@@ -501,6 +501,38 @@ impl Debug for DeleteTimelineError {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(thiserror::Error)]
|
||||
pub enum TimelineArchivalError {
|
||||
#[error("NotFound")]
|
||||
NotFound,
|
||||
|
||||
#[error("Timeout")]
|
||||
Timeout,
|
||||
|
||||
#[error("HasUnarchivedChildren")]
|
||||
HasUnarchivedChildren(Vec<TimelineId>),
|
||||
|
||||
#[error("Timeline archival is already in progress")]
|
||||
AlreadyInProgress,
|
||||
|
||||
#[error(transparent)]
|
||||
Other(#[from] anyhow::Error),
|
||||
}
|
||||
|
||||
impl Debug for TimelineArchivalError {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
Self::NotFound => write!(f, "NotFound"),
|
||||
Self::Timeout => write!(f, "Timeout"),
|
||||
Self::HasUnarchivedChildren(c) => {
|
||||
f.debug_tuple("HasUnarchivedChildren").field(c).finish()
|
||||
}
|
||||
Self::AlreadyInProgress => f.debug_tuple("AlreadyInProgress").finish(),
|
||||
Self::Other(e) => f.debug_tuple("Other").field(e).finish(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub enum SetStoppingError {
|
||||
AlreadyStopping(completion::Barrier),
|
||||
Broken,
|
||||
@@ -1326,24 +1358,50 @@ impl Tenant {
|
||||
&self,
|
||||
timeline_id: TimelineId,
|
||||
state: TimelineArchivalState,
|
||||
) -> anyhow::Result<()> {
|
||||
let timeline = self
|
||||
.get_timeline(timeline_id, false)
|
||||
.context("Cannot apply timeline archival config to inexistent timeline")?;
|
||||
) -> Result<(), TimelineArchivalError> {
|
||||
info!("setting timeline archival config");
|
||||
let timeline = {
|
||||
let timelines = self.timelines.lock().unwrap();
|
||||
|
||||
let timeline = match timelines.get(&timeline_id) {
|
||||
Some(t) => t,
|
||||
None => return Err(TimelineArchivalError::NotFound),
|
||||
};
|
||||
|
||||
// Ensure that there are no non-archived child timelines
|
||||
let children: Vec<TimelineId> = timelines
|
||||
.iter()
|
||||
.filter_map(|(id, entry)| {
|
||||
if entry.get_ancestor_timeline_id() != Some(timeline_id) {
|
||||
return None;
|
||||
}
|
||||
if entry.is_archived() == Some(true) {
|
||||
return None;
|
||||
}
|
||||
Some(*id)
|
||||
})
|
||||
.collect();
|
||||
|
||||
if !children.is_empty() && state == TimelineArchivalState::Archived {
|
||||
return Err(TimelineArchivalError::HasUnarchivedChildren(children));
|
||||
}
|
||||
Arc::clone(timeline)
|
||||
};
|
||||
|
||||
let upload_needed = timeline
|
||||
.remote_client
|
||||
.schedule_index_upload_for_timeline_archival_state(state)?;
|
||||
|
||||
if upload_needed {
|
||||
info!("Uploading new state");
|
||||
const MAX_WAIT: Duration = Duration::from_secs(10);
|
||||
let Ok(v) =
|
||||
tokio::time::timeout(MAX_WAIT, timeline.remote_client.wait_completion()).await
|
||||
else {
|
||||
tracing::warn!("reached timeout for waiting on upload queue");
|
||||
bail!("reached timeout for upload queue flush");
|
||||
return Err(TimelineArchivalError::Timeout);
|
||||
};
|
||||
v?;
|
||||
v.map_err(|e| TimelineArchivalError::Other(anyhow::anyhow!(e)))?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
@@ -7013,18 +7071,14 @@ mod tests {
|
||||
vec![
|
||||
// Image layer at GC horizon
|
||||
PersistentLayerKey {
|
||||
key_range: {
|
||||
let mut key = Key::MAX;
|
||||
key.field6 -= 1;
|
||||
Key::MIN..key
|
||||
},
|
||||
key_range: Key::MIN..Key::NON_L0_MAX,
|
||||
lsn_range: Lsn(0x30)..Lsn(0x31),
|
||||
is_delta: false
|
||||
},
|
||||
// The delta layer that is cut in the middle
|
||||
// The delta layer covers the full range (with the layer key hack to avoid being recognized as L0)
|
||||
PersistentLayerKey {
|
||||
key_range: get_key(3)..get_key(4),
|
||||
lsn_range: Lsn(0x30)..Lsn(0x41),
|
||||
key_range: Key::MIN..Key::NON_L0_MAX,
|
||||
lsn_range: Lsn(0x30)..Lsn(0x48),
|
||||
is_delta: true
|
||||
},
|
||||
// The delta3 layer that should not be picked for the compaction
|
||||
@@ -8004,6 +8058,214 @@ mod tests {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_simple_bottom_most_compaction_with_retain_lsns_single_key() -> anyhow::Result<()>
|
||||
{
|
||||
let harness =
|
||||
TenantHarness::create("test_simple_bottom_most_compaction_with_retain_lsns_single_key")
|
||||
.await?;
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
|
||||
fn get_key(id: u32) -> Key {
|
||||
// using aux key here b/c they are guaranteed to be inside `collect_keyspace`.
|
||||
let mut key = Key::from_hex("620000000033333333444444445500000000").unwrap();
|
||||
key.field6 = id;
|
||||
key
|
||||
}
|
||||
|
||||
let img_layer = (0..10)
|
||||
.map(|id| (get_key(id), Bytes::from(format!("value {id}@0x10"))))
|
||||
.collect_vec();
|
||||
|
||||
let delta1 = vec![
|
||||
(
|
||||
get_key(1),
|
||||
Lsn(0x20),
|
||||
Value::WalRecord(NeonWalRecord::wal_append("@0x20")),
|
||||
),
|
||||
(
|
||||
get_key(1),
|
||||
Lsn(0x28),
|
||||
Value::WalRecord(NeonWalRecord::wal_append("@0x28")),
|
||||
),
|
||||
];
|
||||
let delta2 = vec![
|
||||
(
|
||||
get_key(1),
|
||||
Lsn(0x30),
|
||||
Value::WalRecord(NeonWalRecord::wal_append("@0x30")),
|
||||
),
|
||||
(
|
||||
get_key(1),
|
||||
Lsn(0x38),
|
||||
Value::WalRecord(NeonWalRecord::wal_append("@0x38")),
|
||||
),
|
||||
];
|
||||
let delta3 = vec![
|
||||
(
|
||||
get_key(8),
|
||||
Lsn(0x48),
|
||||
Value::WalRecord(NeonWalRecord::wal_append("@0x48")),
|
||||
),
|
||||
(
|
||||
get_key(9),
|
||||
Lsn(0x48),
|
||||
Value::WalRecord(NeonWalRecord::wal_append("@0x48")),
|
||||
),
|
||||
];
|
||||
|
||||
let tline = tenant
|
||||
.create_test_timeline_with_layers(
|
||||
TIMELINE_ID,
|
||||
Lsn(0x10),
|
||||
DEFAULT_PG_VERSION,
|
||||
&ctx,
|
||||
vec![
|
||||
// delta1 and delta 2 only contain a single key but multiple updates
|
||||
DeltaLayerTestDesc::new_with_inferred_key_range(Lsn(0x10)..Lsn(0x30), delta1),
|
||||
DeltaLayerTestDesc::new_with_inferred_key_range(Lsn(0x30)..Lsn(0x50), delta2),
|
||||
DeltaLayerTestDesc::new_with_inferred_key_range(Lsn(0x10)..Lsn(0x50), delta3),
|
||||
], // delta layers
|
||||
vec![(Lsn(0x10), img_layer)], // image layers
|
||||
Lsn(0x50),
|
||||
)
|
||||
.await?;
|
||||
{
|
||||
// Update GC info
|
||||
let mut guard = tline.gc_info.write().unwrap();
|
||||
*guard = GcInfo {
|
||||
retain_lsns: vec![
|
||||
(Lsn(0x10), tline.timeline_id),
|
||||
(Lsn(0x20), tline.timeline_id),
|
||||
],
|
||||
cutoffs: GcCutoffs {
|
||||
time: Lsn(0x30),
|
||||
space: Lsn(0x30),
|
||||
},
|
||||
leases: Default::default(),
|
||||
within_ancestor_pitr: false,
|
||||
};
|
||||
}
|
||||
|
||||
let expected_result = [
|
||||
Bytes::from_static(b"value 0@0x10"),
|
||||
Bytes::from_static(b"value 1@0x10@0x20@0x28@0x30@0x38"),
|
||||
Bytes::from_static(b"value 2@0x10"),
|
||||
Bytes::from_static(b"value 3@0x10"),
|
||||
Bytes::from_static(b"value 4@0x10"),
|
||||
Bytes::from_static(b"value 5@0x10"),
|
||||
Bytes::from_static(b"value 6@0x10"),
|
||||
Bytes::from_static(b"value 7@0x10"),
|
||||
Bytes::from_static(b"value 8@0x10@0x48"),
|
||||
Bytes::from_static(b"value 9@0x10@0x48"),
|
||||
];
|
||||
|
||||
let expected_result_at_gc_horizon = [
|
||||
Bytes::from_static(b"value 0@0x10"),
|
||||
Bytes::from_static(b"value 1@0x10@0x20@0x28@0x30"),
|
||||
Bytes::from_static(b"value 2@0x10"),
|
||||
Bytes::from_static(b"value 3@0x10"),
|
||||
Bytes::from_static(b"value 4@0x10"),
|
||||
Bytes::from_static(b"value 5@0x10"),
|
||||
Bytes::from_static(b"value 6@0x10"),
|
||||
Bytes::from_static(b"value 7@0x10"),
|
||||
Bytes::from_static(b"value 8@0x10"),
|
||||
Bytes::from_static(b"value 9@0x10"),
|
||||
];
|
||||
|
||||
let expected_result_at_lsn_20 = [
|
||||
Bytes::from_static(b"value 0@0x10"),
|
||||
Bytes::from_static(b"value 1@0x10@0x20"),
|
||||
Bytes::from_static(b"value 2@0x10"),
|
||||
Bytes::from_static(b"value 3@0x10"),
|
||||
Bytes::from_static(b"value 4@0x10"),
|
||||
Bytes::from_static(b"value 5@0x10"),
|
||||
Bytes::from_static(b"value 6@0x10"),
|
||||
Bytes::from_static(b"value 7@0x10"),
|
||||
Bytes::from_static(b"value 8@0x10"),
|
||||
Bytes::from_static(b"value 9@0x10"),
|
||||
];
|
||||
|
||||
let expected_result_at_lsn_10 = [
|
||||
Bytes::from_static(b"value 0@0x10"),
|
||||
Bytes::from_static(b"value 1@0x10"),
|
||||
Bytes::from_static(b"value 2@0x10"),
|
||||
Bytes::from_static(b"value 3@0x10"),
|
||||
Bytes::from_static(b"value 4@0x10"),
|
||||
Bytes::from_static(b"value 5@0x10"),
|
||||
Bytes::from_static(b"value 6@0x10"),
|
||||
Bytes::from_static(b"value 7@0x10"),
|
||||
Bytes::from_static(b"value 8@0x10"),
|
||||
Bytes::from_static(b"value 9@0x10"),
|
||||
];
|
||||
|
||||
let verify_result = || async {
|
||||
let gc_horizon = {
|
||||
let gc_info = tline.gc_info.read().unwrap();
|
||||
gc_info.cutoffs.time
|
||||
};
|
||||
for idx in 0..10 {
|
||||
assert_eq!(
|
||||
tline
|
||||
.get(get_key(idx as u32), Lsn(0x50), &ctx)
|
||||
.await
|
||||
.unwrap(),
|
||||
&expected_result[idx]
|
||||
);
|
||||
assert_eq!(
|
||||
tline
|
||||
.get(get_key(idx as u32), gc_horizon, &ctx)
|
||||
.await
|
||||
.unwrap(),
|
||||
&expected_result_at_gc_horizon[idx]
|
||||
);
|
||||
assert_eq!(
|
||||
tline
|
||||
.get(get_key(idx as u32), Lsn(0x20), &ctx)
|
||||
.await
|
||||
.unwrap(),
|
||||
&expected_result_at_lsn_20[idx]
|
||||
);
|
||||
assert_eq!(
|
||||
tline
|
||||
.get(get_key(idx as u32), Lsn(0x10), &ctx)
|
||||
.await
|
||||
.unwrap(),
|
||||
&expected_result_at_lsn_10[idx]
|
||||
);
|
||||
}
|
||||
};
|
||||
|
||||
verify_result().await;
|
||||
|
||||
let cancel = CancellationToken::new();
|
||||
let mut dryrun_flags = EnumSet::new();
|
||||
dryrun_flags.insert(CompactFlags::DryRun);
|
||||
|
||||
tline
|
||||
.compact_with_gc(&cancel, dryrun_flags, &ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
// We expect layer map to be the same b/c the dry run flag, but we don't know whether there will be other background jobs
|
||||
// cleaning things up, and therefore, we don't do sanity checks on the layer map during unit tests.
|
||||
verify_result().await;
|
||||
|
||||
tline
|
||||
.compact_with_gc(&cancel, EnumSet::new(), &ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
verify_result().await;
|
||||
|
||||
// compact again
|
||||
tline
|
||||
.compact_with_gc(&cancel, EnumSet::new(), &ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
verify_result().await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_simple_bottom_most_compaction_on_branch() -> anyhow::Result<()> {
|
||||
let harness = TenantHarness::create("test_simple_bottom_most_compaction_on_branch").await?;
|
||||
|
||||
@@ -8,7 +8,6 @@ mod layer_desc;
|
||||
mod layer_name;
|
||||
pub mod merge_iterator;
|
||||
|
||||
#[cfg(test)]
|
||||
pub mod split_writer;
|
||||
|
||||
use crate::context::{AccessStatsBehavior, RequestContext};
|
||||
|
||||
@@ -36,6 +36,7 @@ use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockLease, BlockReader, Fi
|
||||
use crate::tenant::disk_btree::{
|
||||
DiskBtreeBuilder, DiskBtreeIterator, DiskBtreeReader, VisitDirection,
|
||||
};
|
||||
use crate::tenant::storage_layer::layer::S3_UPLOAD_LIMIT;
|
||||
use crate::tenant::timeline::GetVectoredError;
|
||||
use crate::tenant::vectored_blob_io::{
|
||||
BlobFlag, MaxVectoredReadBytes, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
|
||||
@@ -568,7 +569,6 @@ impl DeltaLayerWriterInner {
|
||||
// 5GB limit for objects without multipart upload (which we don't want to use)
|
||||
// Make it a little bit below to account for differing GB units
|
||||
// https://docs.aws.amazon.com/AmazonS3/latest/userguide/upload-objects.html
|
||||
const S3_UPLOAD_LIMIT: u64 = 4_500_000_000;
|
||||
ensure!(
|
||||
metadata.len() <= S3_UPLOAD_LIMIT,
|
||||
"Created delta layer file at {} of size {} above limit {S3_UPLOAD_LIMIT}!",
|
||||
@@ -702,12 +702,10 @@ impl DeltaLayerWriter {
|
||||
self.inner.take().unwrap().finish(key_end, ctx).await
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) fn num_keys(&self) -> usize {
|
||||
self.inner.as_ref().unwrap().num_keys
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) fn estimated_size(&self) -> u64 {
|
||||
let inner = self.inner.as_ref().unwrap();
|
||||
inner.blob_writer.size() + inner.tree.borrow_writer().size() + PAGE_SZ as u64
|
||||
|
||||
@@ -716,10 +716,6 @@ struct ImageLayerWriterInner {
|
||||
}
|
||||
|
||||
impl ImageLayerWriterInner {
|
||||
fn size(&self) -> u64 {
|
||||
self.tree.borrow_writer().size() + self.blob_writer.size()
|
||||
}
|
||||
|
||||
///
|
||||
/// Start building a new image layer.
|
||||
///
|
||||
@@ -854,13 +850,19 @@ impl ImageLayerWriterInner {
|
||||
res?;
|
||||
}
|
||||
|
||||
let final_key_range = if let Some(end_key) = end_key {
|
||||
self.key_range.start..end_key
|
||||
} else {
|
||||
self.key_range.clone()
|
||||
};
|
||||
|
||||
// Fill in the summary on blk 0
|
||||
let summary = Summary {
|
||||
magic: IMAGE_FILE_MAGIC,
|
||||
format_version: STORAGE_FORMAT_VERSION,
|
||||
tenant_id: self.tenant_shard_id.tenant_id,
|
||||
timeline_id: self.timeline_id,
|
||||
key_range: self.key_range.clone(),
|
||||
key_range: final_key_range.clone(),
|
||||
lsn: self.lsn,
|
||||
index_start_blk,
|
||||
index_root_blk,
|
||||
@@ -881,11 +883,7 @@ impl ImageLayerWriterInner {
|
||||
let desc = PersistentLayerDesc::new_img(
|
||||
self.tenant_shard_id,
|
||||
self.timeline_id,
|
||||
if let Some(end_key) = end_key {
|
||||
self.key_range.start..end_key
|
||||
} else {
|
||||
self.key_range.clone()
|
||||
},
|
||||
final_key_range,
|
||||
self.lsn,
|
||||
metadata.len(),
|
||||
);
|
||||
@@ -974,14 +972,12 @@ impl ImageLayerWriter {
|
||||
self.inner.as_mut().unwrap().put_image(key, img, ctx).await
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
/// Estimated size of the image layer.
|
||||
pub(crate) fn estimated_size(&self) -> u64 {
|
||||
let inner = self.inner.as_ref().unwrap();
|
||||
inner.blob_writer.size() + inner.tree.borrow_writer().size() + PAGE_SZ as u64
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) fn num_keys(&self) -> usize {
|
||||
self.inner.as_ref().unwrap().num_keys
|
||||
}
|
||||
@@ -997,7 +993,6 @@ impl ImageLayerWriter {
|
||||
self.inner.take().unwrap().finish(timeline, ctx, None).await
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
/// Finish writing the image layer with an end key, used in [`super::split_writer::SplitImageLayerWriter`]. The end key determines the end of the image layer's covered range and is exclusive.
|
||||
pub(super) async fn finish_with_end_key(
|
||||
mut self,
|
||||
@@ -1011,10 +1006,6 @@ impl ImageLayerWriter {
|
||||
.finish(timeline, ctx, Some(end_key))
|
||||
.await
|
||||
}
|
||||
|
||||
pub(crate) fn size(&self) -> u64 {
|
||||
self.inner.as_ref().unwrap().size()
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for ImageLayerWriter {
|
||||
|
||||
@@ -35,6 +35,8 @@ mod tests;
|
||||
#[cfg(test)]
|
||||
mod failpoints;
|
||||
|
||||
pub const S3_UPLOAD_LIMIT: u64 = 4_500_000_000;
|
||||
|
||||
/// A Layer contains all data in a "rectangle" consisting of a range of keys and
|
||||
/// range of LSNs.
|
||||
///
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use std::{ops::Range, sync::Arc};
|
||||
use std::{future::Future, ops::Range, sync::Arc};
|
||||
|
||||
use bytes::Bytes;
|
||||
use pageserver_api::key::{Key, KEY_SIZE};
|
||||
@@ -7,7 +7,32 @@ use utils::{id::TimelineId, lsn::Lsn, shard::TenantShardId};
|
||||
use crate::tenant::storage_layer::Layer;
|
||||
use crate::{config::PageServerConf, context::RequestContext, repository::Value, tenant::Timeline};
|
||||
|
||||
use super::{DeltaLayerWriter, ImageLayerWriter, ResidentLayer};
|
||||
use super::layer::S3_UPLOAD_LIMIT;
|
||||
use super::{
|
||||
DeltaLayerWriter, ImageLayerWriter, PersistentLayerDesc, PersistentLayerKey, ResidentLayer,
|
||||
};
|
||||
|
||||
pub(crate) enum SplitWriterResult {
|
||||
Produced(ResidentLayer),
|
||||
Discarded(PersistentLayerKey),
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
impl SplitWriterResult {
|
||||
fn into_resident_layer(self) -> ResidentLayer {
|
||||
match self {
|
||||
SplitWriterResult::Produced(layer) => layer,
|
||||
SplitWriterResult::Discarded(_) => panic!("unexpected discarded layer"),
|
||||
}
|
||||
}
|
||||
|
||||
fn into_discarded_layer(self) -> PersistentLayerKey {
|
||||
match self {
|
||||
SplitWriterResult::Produced(_) => panic!("unexpected produced layer"),
|
||||
SplitWriterResult::Discarded(layer) => layer,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// An image writer that takes images and produces multiple image layers. The interface does not
|
||||
/// guarantee atomicity (i.e., if the image layer generation fails, there might be leftover files
|
||||
@@ -16,11 +41,12 @@ use super::{DeltaLayerWriter, ImageLayerWriter, ResidentLayer};
|
||||
pub struct SplitImageLayerWriter {
|
||||
inner: ImageLayerWriter,
|
||||
target_layer_size: u64,
|
||||
generated_layers: Vec<ResidentLayer>,
|
||||
generated_layers: Vec<SplitWriterResult>,
|
||||
conf: &'static PageServerConf,
|
||||
timeline_id: TimelineId,
|
||||
tenant_shard_id: TenantShardId,
|
||||
lsn: Lsn,
|
||||
start_key: Key,
|
||||
}
|
||||
|
||||
impl SplitImageLayerWriter {
|
||||
@@ -49,16 +75,22 @@ impl SplitImageLayerWriter {
|
||||
timeline_id,
|
||||
tenant_shard_id,
|
||||
lsn,
|
||||
start_key,
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn put_image(
|
||||
pub async fn put_image_with_discard_fn<D, F>(
|
||||
&mut self,
|
||||
key: Key,
|
||||
img: Bytes,
|
||||
tline: &Arc<Timeline>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
discard: D,
|
||||
) -> anyhow::Result<()>
|
||||
where
|
||||
D: FnOnce(&PersistentLayerKey) -> F,
|
||||
F: Future<Output = bool>,
|
||||
{
|
||||
// The current estimation is an upper bound of the space that the key/image could take
|
||||
// because we did not consider compression in this estimation. The resulting image layer
|
||||
// could be smaller than the target size.
|
||||
@@ -76,33 +108,87 @@ impl SplitImageLayerWriter {
|
||||
)
|
||||
.await?;
|
||||
let prev_image_writer = std::mem::replace(&mut self.inner, next_image_writer);
|
||||
self.generated_layers.push(
|
||||
prev_image_writer
|
||||
.finish_with_end_key(tline, key, ctx)
|
||||
.await?,
|
||||
);
|
||||
let layer_key = PersistentLayerKey {
|
||||
key_range: self.start_key..key,
|
||||
lsn_range: PersistentLayerDesc::image_layer_lsn_range(self.lsn),
|
||||
is_delta: false,
|
||||
};
|
||||
self.start_key = key;
|
||||
|
||||
if discard(&layer_key).await {
|
||||
drop(prev_image_writer);
|
||||
self.generated_layers
|
||||
.push(SplitWriterResult::Discarded(layer_key));
|
||||
} else {
|
||||
self.generated_layers.push(SplitWriterResult::Produced(
|
||||
prev_image_writer
|
||||
.finish_with_end_key(tline, key, ctx)
|
||||
.await?,
|
||||
));
|
||||
}
|
||||
}
|
||||
self.inner.put_image(key, img, ctx).await
|
||||
}
|
||||
|
||||
pub(crate) async fn finish(
|
||||
#[cfg(test)]
|
||||
pub async fn put_image(
|
||||
&mut self,
|
||||
key: Key,
|
||||
img: Bytes,
|
||||
tline: &Arc<Timeline>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
self.put_image_with_discard_fn(key, img, tline, ctx, |_| async { false })
|
||||
.await
|
||||
}
|
||||
|
||||
pub(crate) async fn finish_with_discard_fn<D, F>(
|
||||
self,
|
||||
tline: &Arc<Timeline>,
|
||||
ctx: &RequestContext,
|
||||
end_key: Key,
|
||||
) -> anyhow::Result<Vec<ResidentLayer>> {
|
||||
discard: D,
|
||||
) -> anyhow::Result<Vec<SplitWriterResult>>
|
||||
where
|
||||
D: FnOnce(&PersistentLayerKey) -> F,
|
||||
F: Future<Output = bool>,
|
||||
{
|
||||
let Self {
|
||||
mut generated_layers,
|
||||
inner,
|
||||
..
|
||||
} = self;
|
||||
generated_layers.push(inner.finish_with_end_key(tline, end_key, ctx).await?);
|
||||
if inner.num_keys() == 0 {
|
||||
return Ok(generated_layers);
|
||||
}
|
||||
let layer_key = PersistentLayerKey {
|
||||
key_range: self.start_key..end_key,
|
||||
lsn_range: PersistentLayerDesc::image_layer_lsn_range(self.lsn),
|
||||
is_delta: false,
|
||||
};
|
||||
if discard(&layer_key).await {
|
||||
generated_layers.push(SplitWriterResult::Discarded(layer_key));
|
||||
} else {
|
||||
generated_layers.push(SplitWriterResult::Produced(
|
||||
inner.finish_with_end_key(tline, end_key, ctx).await?,
|
||||
));
|
||||
}
|
||||
Ok(generated_layers)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) async fn finish(
|
||||
self,
|
||||
tline: &Arc<Timeline>,
|
||||
ctx: &RequestContext,
|
||||
end_key: Key,
|
||||
) -> anyhow::Result<Vec<SplitWriterResult>> {
|
||||
self.finish_with_discard_fn(tline, ctx, end_key, |_| async { false })
|
||||
.await
|
||||
}
|
||||
|
||||
/// When split writer fails, the caller should call this function and handle partially generated layers.
|
||||
#[allow(dead_code)]
|
||||
pub(crate) async fn take(self) -> anyhow::Result<(Vec<ResidentLayer>, ImageLayerWriter)> {
|
||||
pub(crate) fn take(self) -> anyhow::Result<(Vec<SplitWriterResult>, ImageLayerWriter)> {
|
||||
Ok((self.generated_layers, self.inner))
|
||||
}
|
||||
}
|
||||
@@ -110,15 +196,21 @@ impl SplitImageLayerWriter {
|
||||
/// A delta writer that takes key-lsn-values and produces multiple delta layers. The interface does not
|
||||
/// guarantee atomicity (i.e., if the delta layer generation fails, there might be leftover files
|
||||
/// to be cleaned up).
|
||||
///
|
||||
/// Note that if updates of a single key exceed the target size limit, all of the updates will be batched
|
||||
/// into a single file. This behavior might change in the future. For reference, the legacy compaction algorithm
|
||||
/// will split them into multiple files based on size.
|
||||
#[must_use]
|
||||
pub struct SplitDeltaLayerWriter {
|
||||
inner: DeltaLayerWriter,
|
||||
target_layer_size: u64,
|
||||
generated_layers: Vec<ResidentLayer>,
|
||||
generated_layers: Vec<SplitWriterResult>,
|
||||
conf: &'static PageServerConf,
|
||||
timeline_id: TimelineId,
|
||||
tenant_shard_id: TenantShardId,
|
||||
lsn_range: Range<Lsn>,
|
||||
last_key_written: Key,
|
||||
start_key: Key,
|
||||
}
|
||||
|
||||
impl SplitDeltaLayerWriter {
|
||||
@@ -147,9 +239,74 @@ impl SplitDeltaLayerWriter {
|
||||
timeline_id,
|
||||
tenant_shard_id,
|
||||
lsn_range,
|
||||
last_key_written: Key::MIN,
|
||||
start_key,
|
||||
})
|
||||
}
|
||||
|
||||
/// Put value into the layer writer. In the case the writer decides to produce a layer, and the discard fn returns true, no layer will be written in the end.
|
||||
pub async fn put_value_with_discard_fn<D, F>(
|
||||
&mut self,
|
||||
key: Key,
|
||||
lsn: Lsn,
|
||||
val: Value,
|
||||
tline: &Arc<Timeline>,
|
||||
ctx: &RequestContext,
|
||||
discard: D,
|
||||
) -> anyhow::Result<()>
|
||||
where
|
||||
D: FnOnce(&PersistentLayerKey) -> F,
|
||||
F: Future<Output = bool>,
|
||||
{
|
||||
// The current estimation is key size plus LSN size plus value size estimation. This is not an accurate
|
||||
// number, and therefore the final layer size could be a little bit larger or smaller than the target.
|
||||
//
|
||||
// Also, keep all updates of a single key in a single file. TODO: split them using the legacy compaction
|
||||
// strategy. https://github.com/neondatabase/neon/issues/8837
|
||||
let addition_size_estimation = KEY_SIZE as u64 + 8 /* LSN u64 size */ + 80 /* value size estimation */;
|
||||
if self.inner.num_keys() >= 1
|
||||
&& self.inner.estimated_size() + addition_size_estimation >= self.target_layer_size
|
||||
{
|
||||
if key != self.last_key_written {
|
||||
let next_delta_writer = DeltaLayerWriter::new(
|
||||
self.conf,
|
||||
self.timeline_id,
|
||||
self.tenant_shard_id,
|
||||
key,
|
||||
self.lsn_range.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
let prev_delta_writer = std::mem::replace(&mut self.inner, next_delta_writer);
|
||||
let layer_key = PersistentLayerKey {
|
||||
key_range: self.start_key..key,
|
||||
lsn_range: self.lsn_range.clone(),
|
||||
is_delta: true,
|
||||
};
|
||||
self.start_key = key;
|
||||
if discard(&layer_key).await {
|
||||
drop(prev_delta_writer);
|
||||
self.generated_layers
|
||||
.push(SplitWriterResult::Discarded(layer_key));
|
||||
} else {
|
||||
let (desc, path) = prev_delta_writer.finish(key, ctx).await?;
|
||||
let delta_layer = Layer::finish_creating(self.conf, tline, desc, &path)?;
|
||||
self.generated_layers
|
||||
.push(SplitWriterResult::Produced(delta_layer));
|
||||
}
|
||||
} else if self.inner.estimated_size() >= S3_UPLOAD_LIMIT {
|
||||
// We have to produce a very large file b/c a key is updated too often.
|
||||
anyhow::bail!(
|
||||
"a single key is updated too often: key={}, estimated_size={}, and the layer file cannot be produced",
|
||||
key,
|
||||
self.inner.estimated_size()
|
||||
);
|
||||
}
|
||||
}
|
||||
self.last_key_written = key;
|
||||
self.inner.put_value(key, lsn, val, ctx).await
|
||||
}
|
||||
|
||||
pub async fn put_value(
|
||||
&mut self,
|
||||
key: Key,
|
||||
@@ -158,56 +315,64 @@ impl SplitDeltaLayerWriter {
|
||||
tline: &Arc<Timeline>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
// The current estimation is key size plus LSN size plus value size estimation. This is not an accurate
|
||||
// number, and therefore the final layer size could be a little bit larger or smaller than the target.
|
||||
let addition_size_estimation = KEY_SIZE as u64 + 8 /* LSN u64 size */ + 80 /* value size estimation */;
|
||||
if self.inner.num_keys() >= 1
|
||||
&& self.inner.estimated_size() + addition_size_estimation >= self.target_layer_size
|
||||
{
|
||||
let next_delta_writer = DeltaLayerWriter::new(
|
||||
self.conf,
|
||||
self.timeline_id,
|
||||
self.tenant_shard_id,
|
||||
key,
|
||||
self.lsn_range.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
let prev_delta_writer = std::mem::replace(&mut self.inner, next_delta_writer);
|
||||
let (desc, path) = prev_delta_writer.finish(key, ctx).await?;
|
||||
let delta_layer = Layer::finish_creating(self.conf, tline, desc, &path)?;
|
||||
self.generated_layers.push(delta_layer);
|
||||
}
|
||||
self.inner.put_value(key, lsn, val, ctx).await
|
||||
self.put_value_with_discard_fn(key, lsn, val, tline, ctx, |_| async { false })
|
||||
.await
|
||||
}
|
||||
|
||||
pub(crate) async fn finish(
|
||||
pub(crate) async fn finish_with_discard_fn<D, F>(
|
||||
self,
|
||||
tline: &Arc<Timeline>,
|
||||
ctx: &RequestContext,
|
||||
end_key: Key,
|
||||
) -> anyhow::Result<Vec<ResidentLayer>> {
|
||||
discard: D,
|
||||
) -> anyhow::Result<Vec<SplitWriterResult>>
|
||||
where
|
||||
D: FnOnce(&PersistentLayerKey) -> F,
|
||||
F: Future<Output = bool>,
|
||||
{
|
||||
let Self {
|
||||
mut generated_layers,
|
||||
inner,
|
||||
..
|
||||
} = self;
|
||||
|
||||
let (desc, path) = inner.finish(end_key, ctx).await?;
|
||||
let delta_layer = Layer::finish_creating(self.conf, tline, desc, &path)?;
|
||||
generated_layers.push(delta_layer);
|
||||
if inner.num_keys() == 0 {
|
||||
return Ok(generated_layers);
|
||||
}
|
||||
let layer_key = PersistentLayerKey {
|
||||
key_range: self.start_key..end_key,
|
||||
lsn_range: self.lsn_range.clone(),
|
||||
is_delta: true,
|
||||
};
|
||||
if discard(&layer_key).await {
|
||||
generated_layers.push(SplitWriterResult::Discarded(layer_key));
|
||||
} else {
|
||||
let (desc, path) = inner.finish(end_key, ctx).await?;
|
||||
let delta_layer = Layer::finish_creating(self.conf, tline, desc, &path)?;
|
||||
generated_layers.push(SplitWriterResult::Produced(delta_layer));
|
||||
}
|
||||
Ok(generated_layers)
|
||||
}
|
||||
|
||||
/// When split writer fails, the caller should call this function and handle partially generated layers.
|
||||
#[allow(dead_code)]
|
||||
pub(crate) async fn take(self) -> anyhow::Result<(Vec<ResidentLayer>, DeltaLayerWriter)> {
|
||||
pub(crate) async fn finish(
|
||||
self,
|
||||
tline: &Arc<Timeline>,
|
||||
ctx: &RequestContext,
|
||||
end_key: Key,
|
||||
) -> anyhow::Result<Vec<SplitWriterResult>> {
|
||||
self.finish_with_discard_fn(tline, ctx, end_key, |_| async { false })
|
||||
.await
|
||||
}
|
||||
|
||||
/// When split writer fails, the caller should call this function and handle partially generated layers.
|
||||
pub(crate) fn take(self) -> anyhow::Result<(Vec<SplitWriterResult>, DeltaLayerWriter)> {
|
||||
Ok((self.generated_layers, self.inner))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use itertools::Itertools;
|
||||
use rand::{RngCore, SeedableRng};
|
||||
|
||||
use crate::{
|
||||
@@ -302,9 +467,16 @@ mod tests {
|
||||
|
||||
#[tokio::test]
|
||||
async fn write_split() {
|
||||
let harness = TenantHarness::create("split_writer_write_split")
|
||||
.await
|
||||
.unwrap();
|
||||
write_split_helper("split_writer_write_split", false).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn write_split_discard() {
|
||||
write_split_helper("split_writer_write_split_discard", false).await;
|
||||
}
|
||||
|
||||
async fn write_split_helper(harness_name: &'static str, discard: bool) {
|
||||
let harness = TenantHarness::create(harness_name).await.unwrap();
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
|
||||
let tline = tenant
|
||||
@@ -338,16 +510,19 @@ mod tests {
|
||||
for i in 0..N {
|
||||
let i = i as u32;
|
||||
image_writer
|
||||
.put_image(get_key(i), get_large_img(), &tline, &ctx)
|
||||
.put_image_with_discard_fn(get_key(i), get_large_img(), &tline, &ctx, |_| async {
|
||||
discard
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
delta_writer
|
||||
.put_value(
|
||||
.put_value_with_discard_fn(
|
||||
get_key(i),
|
||||
Lsn(0x20),
|
||||
Value::Image(get_large_img()),
|
||||
&tline,
|
||||
&ctx,
|
||||
|_| async { discard },
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -360,22 +535,39 @@ mod tests {
|
||||
.finish(&tline, &ctx, get_key(N as u32))
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(image_layers.len(), N / 512 + 1);
|
||||
assert_eq!(delta_layers.len(), N / 512 + 1);
|
||||
for idx in 0..image_layers.len() {
|
||||
assert_ne!(image_layers[idx].layer_desc().key_range.start, Key::MIN);
|
||||
assert_ne!(image_layers[idx].layer_desc().key_range.end, Key::MAX);
|
||||
assert_ne!(delta_layers[idx].layer_desc().key_range.start, Key::MIN);
|
||||
assert_ne!(delta_layers[idx].layer_desc().key_range.end, Key::MAX);
|
||||
if idx > 0 {
|
||||
assert_eq!(
|
||||
image_layers[idx - 1].layer_desc().key_range.end,
|
||||
image_layers[idx].layer_desc().key_range.start
|
||||
);
|
||||
assert_eq!(
|
||||
delta_layers[idx - 1].layer_desc().key_range.end,
|
||||
delta_layers[idx].layer_desc().key_range.start
|
||||
);
|
||||
if discard {
|
||||
for layer in image_layers {
|
||||
layer.into_discarded_layer();
|
||||
}
|
||||
for layer in delta_layers {
|
||||
layer.into_discarded_layer();
|
||||
}
|
||||
} else {
|
||||
let image_layers = image_layers
|
||||
.into_iter()
|
||||
.map(|x| x.into_resident_layer())
|
||||
.collect_vec();
|
||||
let delta_layers = delta_layers
|
||||
.into_iter()
|
||||
.map(|x| x.into_resident_layer())
|
||||
.collect_vec();
|
||||
assert_eq!(image_layers.len(), N / 512 + 1);
|
||||
assert_eq!(delta_layers.len(), N / 512 + 1);
|
||||
for idx in 0..image_layers.len() {
|
||||
assert_ne!(image_layers[idx].layer_desc().key_range.start, Key::MIN);
|
||||
assert_ne!(image_layers[idx].layer_desc().key_range.end, Key::MAX);
|
||||
assert_ne!(delta_layers[idx].layer_desc().key_range.start, Key::MIN);
|
||||
assert_ne!(delta_layers[idx].layer_desc().key_range.end, Key::MAX);
|
||||
if idx > 0 {
|
||||
assert_eq!(
|
||||
image_layers[idx - 1].layer_desc().key_range.end,
|
||||
image_layers[idx].layer_desc().key_range.start
|
||||
);
|
||||
assert_eq!(
|
||||
delta_layers[idx - 1].layer_desc().key_range.end,
|
||||
delta_layers[idx].layer_desc().key_range.start
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -456,4 +648,49 @@ mod tests {
|
||||
.unwrap();
|
||||
assert_eq!(layers.len(), 2);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn write_split_single_key() {
|
||||
let harness = TenantHarness::create("split_writer_write_split_single_key")
|
||||
.await
|
||||
.unwrap();
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
|
||||
let tline = tenant
|
||||
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
const N: usize = 2000;
|
||||
let mut delta_writer = SplitDeltaLayerWriter::new(
|
||||
tenant.conf,
|
||||
tline.timeline_id,
|
||||
tenant.tenant_shard_id,
|
||||
get_key(0),
|
||||
Lsn(0x10)..Lsn(N as u64 * 16 + 0x10),
|
||||
4 * 1024 * 1024,
|
||||
&ctx,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
for i in 0..N {
|
||||
let i = i as u32;
|
||||
delta_writer
|
||||
.put_value(
|
||||
get_key(0),
|
||||
Lsn(i as u64 * 16 + 0x10),
|
||||
Value::Image(get_large_img()),
|
||||
&tline,
|
||||
&ctx,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
let delta_layers = delta_writer
|
||||
.finish(&tline, &ctx, get_key(N as u32))
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(delta_layers.len(), 1);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5444,12 +5444,17 @@ impl Timeline {
|
||||
!(a.end <= b.start || b.end <= a.start)
|
||||
}
|
||||
|
||||
let guard = self.layers.read().await;
|
||||
for layer in guard.layer_map()?.iter_historic_layers() {
|
||||
if layer.is_delta()
|
||||
&& overlaps_with(&layer.lsn_range, &deltas.lsn_range)
|
||||
&& layer.lsn_range != deltas.lsn_range
|
||||
{
|
||||
if deltas.key_range.start.next() != deltas.key_range.end {
|
||||
let guard = self.layers.read().await;
|
||||
let mut invalid_layers =
|
||||
guard.layer_map()?.iter_historic_layers().filter(|layer| {
|
||||
layer.is_delta()
|
||||
&& overlaps_with(&layer.lsn_range, &deltas.lsn_range)
|
||||
&& layer.lsn_range != deltas.lsn_range
|
||||
// skip single-key layer files
|
||||
&& layer.key_range.start.next() != layer.key_range.end
|
||||
});
|
||||
if let Some(layer) = invalid_layers.next() {
|
||||
// If a delta layer overlaps with another delta layer AND their LSN range is not the same, panic
|
||||
panic!(
|
||||
"inserted layer violates delta layer LSN invariant: current_lsn_range={}..{}, conflict_lsn_range={}..{}",
|
||||
|
||||
@@ -14,7 +14,7 @@ use super::{
|
||||
RecordedDuration, Timeline,
|
||||
};
|
||||
|
||||
use anyhow::{anyhow, Context};
|
||||
use anyhow::{anyhow, bail, Context};
|
||||
use bytes::Bytes;
|
||||
use enumset::EnumSet;
|
||||
use fail::fail_point;
|
||||
@@ -32,6 +32,9 @@ use crate::page_cache;
|
||||
use crate::tenant::config::defaults::{DEFAULT_CHECKPOINT_DISTANCE, DEFAULT_COMPACTION_THRESHOLD};
|
||||
use crate::tenant::remote_timeline_client::WaitCompletionError;
|
||||
use crate::tenant::storage_layer::merge_iterator::MergeIterator;
|
||||
use crate::tenant::storage_layer::split_writer::{
|
||||
SplitDeltaLayerWriter, SplitImageLayerWriter, SplitWriterResult,
|
||||
};
|
||||
use crate::tenant::storage_layer::{
|
||||
AsLayerDesc, PersistentLayerDesc, PersistentLayerKey, ValueReconstructState,
|
||||
};
|
||||
@@ -71,15 +74,60 @@ pub(crate) struct KeyHistoryRetention {
|
||||
}
|
||||
|
||||
impl KeyHistoryRetention {
|
||||
/// Hack: skip delta layer if we need to produce a layer of a same key-lsn.
|
||||
///
|
||||
/// This can happen if we have removed some deltas in "the middle" of some existing layer's key-lsn-range.
|
||||
/// For example, consider the case where a single delta with range [0x10,0x50) exists.
|
||||
/// And we have branches at LSN 0x10, 0x20, 0x30.
|
||||
/// Then we delete branch @ 0x20.
|
||||
/// Bottom-most compaction may now delete the delta [0x20,0x30).
|
||||
/// And that wouldnt' change the shape of the layer.
|
||||
///
|
||||
/// Note that bottom-most-gc-compaction never _adds_ new data in that case, only removes.
|
||||
///
|
||||
/// `discard_key` will only be called when the writer reaches its target (instead of for every key), so it's fine to grab a lock inside.
|
||||
async fn discard_key(key: &PersistentLayerKey, tline: &Arc<Timeline>, dry_run: bool) -> bool {
|
||||
if dry_run {
|
||||
return true;
|
||||
}
|
||||
let guard = tline.layers.read().await;
|
||||
if !guard.contains_key(key) {
|
||||
return false;
|
||||
}
|
||||
let layer_generation = guard.get_from_key(key).metadata().generation;
|
||||
drop(guard);
|
||||
if layer_generation == tline.generation {
|
||||
info!(
|
||||
key=%key,
|
||||
?layer_generation,
|
||||
"discard layer due to duplicated layer key in the same generation",
|
||||
);
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
/// Pipe a history of a single key to the writers.
|
||||
///
|
||||
/// If `image_writer` is none, the images will be placed into the delta layers.
|
||||
/// The delta writer will contain all images and deltas (below and above the horizon) except the bottom-most images.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn pipe_to(
|
||||
self,
|
||||
key: Key,
|
||||
delta_writer: &mut Vec<(Key, Lsn, Value)>,
|
||||
mut image_writer: Option<&mut ImageLayerWriter>,
|
||||
tline: &Arc<Timeline>,
|
||||
delta_writer: &mut SplitDeltaLayerWriter,
|
||||
mut image_writer: Option<&mut SplitImageLayerWriter>,
|
||||
stat: &mut CompactionStatistics,
|
||||
dry_run: bool,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
let mut first_batch = true;
|
||||
let discard = |key: &PersistentLayerKey| {
|
||||
let key = key.clone();
|
||||
async move { Self::discard_key(&key, tline, dry_run).await }
|
||||
};
|
||||
for (cutoff_lsn, KeyLogAtLsn(logs)) in self.below_horizon {
|
||||
if first_batch {
|
||||
if logs.len() == 1 && logs[0].1.is_image() {
|
||||
@@ -88,28 +136,45 @@ impl KeyHistoryRetention {
|
||||
};
|
||||
stat.produce_image_key(img);
|
||||
if let Some(image_writer) = image_writer.as_mut() {
|
||||
image_writer.put_image(key, img.clone(), ctx).await?;
|
||||
image_writer
|
||||
.put_image_with_discard_fn(key, img.clone(), tline, ctx, discard)
|
||||
.await?;
|
||||
} else {
|
||||
delta_writer.push((key, cutoff_lsn, Value::Image(img.clone())));
|
||||
delta_writer
|
||||
.put_value_with_discard_fn(
|
||||
key,
|
||||
cutoff_lsn,
|
||||
Value::Image(img.clone()),
|
||||
tline,
|
||||
ctx,
|
||||
discard,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
} else {
|
||||
for (lsn, val) in logs {
|
||||
stat.produce_key(&val);
|
||||
delta_writer.push((key, lsn, val));
|
||||
delta_writer
|
||||
.put_value_with_discard_fn(key, lsn, val, tline, ctx, discard)
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
first_batch = false;
|
||||
} else {
|
||||
for (lsn, val) in logs {
|
||||
stat.produce_key(&val);
|
||||
delta_writer.push((key, lsn, val));
|
||||
delta_writer
|
||||
.put_value_with_discard_fn(key, lsn, val, tline, ctx, discard)
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
let KeyLogAtLsn(above_horizon_logs) = self.above_horizon;
|
||||
for (lsn, val) in above_horizon_logs {
|
||||
stat.produce_key(&val);
|
||||
delta_writer.push((key, lsn, val));
|
||||
delta_writer
|
||||
.put_value_with_discard_fn(key, lsn, val, tline, ctx, discard)
|
||||
.await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
@@ -1814,11 +1879,27 @@ impl Timeline {
|
||||
}
|
||||
let mut selected_layers = Vec::new();
|
||||
drop(gc_info);
|
||||
// Pick all the layers intersect or below the gc_cutoff, get the largest LSN in the selected layers.
|
||||
let Some(max_layer_lsn) = layers
|
||||
.iter_historic_layers()
|
||||
.filter(|desc| desc.get_lsn_range().start <= gc_cutoff)
|
||||
.map(|desc| desc.get_lsn_range().end)
|
||||
.max()
|
||||
else {
|
||||
info!("no layers to compact with gc");
|
||||
return Ok(());
|
||||
};
|
||||
// Then, pick all the layers that are below the max_layer_lsn. This is to ensure we can pick all single-key
|
||||
// layers to compact.
|
||||
for desc in layers.iter_historic_layers() {
|
||||
if desc.get_lsn_range().start <= gc_cutoff {
|
||||
if desc.get_lsn_range().end <= max_layer_lsn {
|
||||
selected_layers.push(guard.get_from_desc(&desc));
|
||||
}
|
||||
}
|
||||
if selected_layers.is_empty() {
|
||||
info!("no layers to compact with gc");
|
||||
return Ok(());
|
||||
}
|
||||
retain_lsns_below_horizon.sort();
|
||||
(selected_layers, gc_cutoff, retain_lsns_below_horizon)
|
||||
};
|
||||
@@ -1848,27 +1929,53 @@ impl Timeline {
|
||||
lowest_retain_lsn
|
||||
);
|
||||
// Step 1: (In the future) construct a k-merge iterator over all layers. For now, simply collect all keys + LSNs.
|
||||
// Also, collect the layer information to decide when to split the new delta layers.
|
||||
let mut downloaded_layers = Vec::new();
|
||||
let mut delta_split_points = BTreeSet::new();
|
||||
// Also, verify if the layer map can be split by drawing a horizontal line at every LSN start/end split point.
|
||||
let mut lsn_split_point = BTreeSet::new(); // TODO: use a better data structure (range tree / range set?)
|
||||
for layer in &layer_selection {
|
||||
let resident_layer = layer.download_and_keep_resident().await?;
|
||||
downloaded_layers.push(resident_layer);
|
||||
|
||||
let desc = layer.layer_desc();
|
||||
if desc.is_delta() {
|
||||
// TODO: is it correct to only record split points for deltas intersecting with the GC horizon? (exclude those below/above the horizon)
|
||||
// so that we can avoid having too many small delta layers.
|
||||
let key_range = desc.get_key_range();
|
||||
delta_split_points.insert(key_range.start);
|
||||
delta_split_points.insert(key_range.end);
|
||||
// ignore single-key layer files
|
||||
if desc.key_range.start.next() != desc.key_range.end {
|
||||
let lsn_range = &desc.lsn_range;
|
||||
lsn_split_point.insert(lsn_range.start);
|
||||
lsn_split_point.insert(lsn_range.end);
|
||||
}
|
||||
stat.visit_delta_layer(desc.file_size());
|
||||
} else {
|
||||
stat.visit_image_layer(desc.file_size());
|
||||
}
|
||||
}
|
||||
for layer in &layer_selection {
|
||||
let desc = layer.layer_desc();
|
||||
let key_range = &desc.key_range;
|
||||
if desc.is_delta() && key_range.start.next() != key_range.end {
|
||||
let lsn_range = desc.lsn_range.clone();
|
||||
let intersects = lsn_split_point.range(lsn_range).collect_vec();
|
||||
if intersects.len() > 1 {
|
||||
bail!(
|
||||
"cannot run gc-compaction because it violates the layer map LSN split assumption: layer {} intersects with LSN [{}]",
|
||||
desc.key(),
|
||||
intersects.into_iter().map(|lsn| lsn.to_string()).join(", ")
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
// The maximum LSN we are processing in this compaction loop
|
||||
let end_lsn = layer_selection
|
||||
.iter()
|
||||
.map(|l| l.layer_desc().lsn_range.end)
|
||||
.max()
|
||||
.unwrap();
|
||||
// We don't want any of the produced layers to cover the full key range (i.e., MIN..MAX) b/c it will then be recognized
|
||||
// as an L0 layer.
|
||||
let hack_end_key = Key::NON_L0_MAX;
|
||||
let mut delta_layers = Vec::new();
|
||||
let mut image_layers = Vec::new();
|
||||
let mut downloaded_layers = Vec::new();
|
||||
for layer in &layer_selection {
|
||||
let resident_layer = layer.download_and_keep_resident().await?;
|
||||
downloaded_layers.push(resident_layer);
|
||||
}
|
||||
for resident_layer in &downloaded_layers {
|
||||
if resident_layer.layer_desc().is_delta() {
|
||||
let layer = resident_layer.get_as_delta(ctx).await?;
|
||||
@@ -1884,138 +1991,17 @@ impl Timeline {
|
||||
let mut accumulated_values = Vec::new();
|
||||
let mut last_key: Option<Key> = None;
|
||||
|
||||
enum FlushDeltaResult {
|
||||
/// Create a new resident layer
|
||||
CreateResidentLayer(ResidentLayer),
|
||||
/// Keep an original delta layer
|
||||
KeepLayer(PersistentLayerKey),
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn flush_deltas(
|
||||
deltas: &mut Vec<(Key, Lsn, crate::repository::Value)>,
|
||||
last_key: Key,
|
||||
delta_split_points: &[Key],
|
||||
current_delta_split_point: &mut usize,
|
||||
tline: &Arc<Timeline>,
|
||||
lowest_retain_lsn: Lsn,
|
||||
ctx: &RequestContext,
|
||||
stats: &mut CompactionStatistics,
|
||||
dry_run: bool,
|
||||
last_batch: bool,
|
||||
) -> anyhow::Result<Option<FlushDeltaResult>> {
|
||||
// Check if we need to split the delta layer. We split at the original delta layer boundary to avoid
|
||||
// overlapping layers.
|
||||
//
|
||||
// If we have a structure like this:
|
||||
//
|
||||
// | Delta 1 | | Delta 4 |
|
||||
// |---------| Delta 2 |---------|
|
||||
// | Delta 3 | | Delta 5 |
|
||||
//
|
||||
// And we choose to compact delta 2+3+5. We will get an overlapping delta layer with delta 1+4.
|
||||
// A simple solution here is to split the delta layers using the original boundary, while this
|
||||
// might produce a lot of small layers. This should be improved and fixed in the future.
|
||||
let mut need_split = false;
|
||||
while *current_delta_split_point < delta_split_points.len()
|
||||
&& last_key >= delta_split_points[*current_delta_split_point]
|
||||
{
|
||||
*current_delta_split_point += 1;
|
||||
need_split = true;
|
||||
}
|
||||
if !need_split && !last_batch {
|
||||
return Ok(None);
|
||||
}
|
||||
let deltas: Vec<(Key, Lsn, Value)> = std::mem::take(deltas);
|
||||
if deltas.is_empty() {
|
||||
return Ok(None);
|
||||
}
|
||||
let end_lsn = deltas.iter().map(|(_, lsn, _)| lsn).max().copied().unwrap() + 1;
|
||||
let delta_key = PersistentLayerKey {
|
||||
key_range: {
|
||||
let key_start = deltas.first().unwrap().0;
|
||||
let key_end = deltas.last().unwrap().0.next();
|
||||
key_start..key_end
|
||||
},
|
||||
lsn_range: lowest_retain_lsn..end_lsn,
|
||||
is_delta: true,
|
||||
};
|
||||
{
|
||||
// Hack: skip delta layer if we need to produce a layer of a same key-lsn.
|
||||
//
|
||||
// This can happen if we have removed some deltas in "the middle" of some existing layer's key-lsn-range.
|
||||
// For example, consider the case where a single delta with range [0x10,0x50) exists.
|
||||
// And we have branches at LSN 0x10, 0x20, 0x30.
|
||||
// Then we delete branch @ 0x20.
|
||||
// Bottom-most compaction may now delete the delta [0x20,0x30).
|
||||
// And that wouldnt' change the shape of the layer.
|
||||
//
|
||||
// Note that bottom-most-gc-compaction never _adds_ new data in that case, only removes.
|
||||
// That's why it's safe to skip.
|
||||
let guard = tline.layers.read().await;
|
||||
|
||||
if guard.contains_key(&delta_key) {
|
||||
let layer_generation = guard.get_from_key(&delta_key).metadata().generation;
|
||||
drop(guard);
|
||||
if layer_generation == tline.generation {
|
||||
stats.discard_delta_layer();
|
||||
// TODO: depending on whether we design this compaction process to run along with
|
||||
// other compactions, there could be layer map modifications after we drop the
|
||||
// layer guard, and in case it creates duplicated layer key, we will still error
|
||||
// in the end.
|
||||
info!(
|
||||
key=%delta_key,
|
||||
?layer_generation,
|
||||
"discard delta layer due to duplicated layer in the same generation"
|
||||
);
|
||||
return Ok(Some(FlushDeltaResult::KeepLayer(delta_key)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let mut delta_layer_writer = DeltaLayerWriter::new(
|
||||
tline.conf,
|
||||
tline.timeline_id,
|
||||
tline.tenant_shard_id,
|
||||
delta_key.key_range.start,
|
||||
lowest_retain_lsn..end_lsn,
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
for (key, lsn, val) in deltas {
|
||||
delta_layer_writer.put_value(key, lsn, val, ctx).await?;
|
||||
}
|
||||
|
||||
stats.produce_delta_layer(delta_layer_writer.size());
|
||||
if dry_run {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let (desc, path) = delta_layer_writer
|
||||
.finish(delta_key.key_range.end, ctx)
|
||||
.await?;
|
||||
let delta_layer = Layer::finish_creating(tline.conf, tline, desc, &path)?;
|
||||
Ok(Some(FlushDeltaResult::CreateResidentLayer(delta_layer)))
|
||||
}
|
||||
|
||||
// Hack the key range to be min..(max-1). Otherwise, the image layer will be
|
||||
// interpreted as an L0 delta layer.
|
||||
let hack_image_layer_range = {
|
||||
let mut end_key = Key::MAX;
|
||||
end_key.field6 -= 1;
|
||||
Key::MIN..end_key
|
||||
};
|
||||
|
||||
// Only create image layers when there is no ancestor branches. TODO: create covering image layer
|
||||
// when some condition meet.
|
||||
let mut image_layer_writer = if self.ancestor_timeline.is_none() {
|
||||
Some(
|
||||
ImageLayerWriter::new(
|
||||
SplitImageLayerWriter::new(
|
||||
self.conf,
|
||||
self.timeline_id,
|
||||
self.tenant_shard_id,
|
||||
&hack_image_layer_range, // covers the full key range
|
||||
Key::MIN,
|
||||
lowest_retain_lsn,
|
||||
self.get_compaction_target_size(),
|
||||
ctx,
|
||||
)
|
||||
.await?,
|
||||
@@ -2024,6 +2010,17 @@ impl Timeline {
|
||||
None
|
||||
};
|
||||
|
||||
let mut delta_layer_writer = SplitDeltaLayerWriter::new(
|
||||
self.conf,
|
||||
self.timeline_id,
|
||||
self.tenant_shard_id,
|
||||
Key::MIN,
|
||||
lowest_retain_lsn..end_lsn,
|
||||
self.get_compaction_target_size(),
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
/// Returns None if there is no ancestor branch. Throw an error when the key is not found.
|
||||
///
|
||||
/// Currently, we always get the ancestor image for each key in the child branch no matter whether the image
|
||||
@@ -2044,47 +2041,11 @@ impl Timeline {
|
||||
let img = tline.get(key, tline.ancestor_lsn, ctx).await?;
|
||||
Ok(Some((key, tline.ancestor_lsn, img)))
|
||||
}
|
||||
let image_layer_key = PersistentLayerKey {
|
||||
key_range: hack_image_layer_range,
|
||||
lsn_range: PersistentLayerDesc::image_layer_lsn_range(lowest_retain_lsn),
|
||||
is_delta: false,
|
||||
};
|
||||
|
||||
// Like with delta layers, it can happen that we re-produce an already existing image layer.
|
||||
// This could happen when a user triggers force compaction and image generation. In this case,
|
||||
// it's always safe to rewrite the layer.
|
||||
let discard_image_layer = {
|
||||
let guard = self.layers.read().await;
|
||||
if guard.contains_key(&image_layer_key) {
|
||||
let layer_generation = guard.get_from_key(&image_layer_key).metadata().generation;
|
||||
drop(guard);
|
||||
if layer_generation == self.generation {
|
||||
// TODO: depending on whether we design this compaction process to run along with
|
||||
// other compactions, there could be layer map modifications after we drop the
|
||||
// layer guard, and in case it creates duplicated layer key, we will still error
|
||||
// in the end.
|
||||
info!(
|
||||
key=%image_layer_key,
|
||||
?layer_generation,
|
||||
"discard image layer due to duplicated layer key in the same generation",
|
||||
);
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
} else {
|
||||
false
|
||||
}
|
||||
};
|
||||
|
||||
// Actually, we can decide not to write to the image layer at all at this point because
|
||||
// the key and LSN range are determined. However, to keep things simple here, we still
|
||||
// create this writer, and discard the writer in the end.
|
||||
|
||||
let mut delta_values = Vec::new();
|
||||
let delta_split_points = delta_split_points.into_iter().collect_vec();
|
||||
let mut current_delta_split_point = 0;
|
||||
let mut delta_layers = Vec::new();
|
||||
while let Some((key, lsn, val)) = merge_iter.next().await? {
|
||||
if cancel.is_cancelled() {
|
||||
return Err(anyhow!("cancelled")); // TODO: refactor to CompactionError and pass cancel error
|
||||
@@ -2115,27 +2076,14 @@ impl Timeline {
|
||||
retention
|
||||
.pipe_to(
|
||||
*last_key,
|
||||
&mut delta_values,
|
||||
self,
|
||||
&mut delta_layer_writer,
|
||||
image_layer_writer.as_mut(),
|
||||
&mut stat,
|
||||
dry_run,
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
delta_layers.extend(
|
||||
flush_deltas(
|
||||
&mut delta_values,
|
||||
*last_key,
|
||||
&delta_split_points,
|
||||
&mut current_delta_split_point,
|
||||
self,
|
||||
lowest_retain_lsn,
|
||||
ctx,
|
||||
&mut stat,
|
||||
dry_run,
|
||||
false,
|
||||
)
|
||||
.await?,
|
||||
);
|
||||
accumulated_values.clear();
|
||||
*last_key = key;
|
||||
accumulated_values.push((key, lsn, val));
|
||||
@@ -2159,43 +2107,75 @@ impl Timeline {
|
||||
retention
|
||||
.pipe_to(
|
||||
last_key,
|
||||
&mut delta_values,
|
||||
self,
|
||||
&mut delta_layer_writer,
|
||||
image_layer_writer.as_mut(),
|
||||
&mut stat,
|
||||
dry_run,
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
delta_layers.extend(
|
||||
flush_deltas(
|
||||
&mut delta_values,
|
||||
last_key,
|
||||
&delta_split_points,
|
||||
&mut current_delta_split_point,
|
||||
self,
|
||||
lowest_retain_lsn,
|
||||
ctx,
|
||||
&mut stat,
|
||||
dry_run,
|
||||
true,
|
||||
)
|
||||
.await?,
|
||||
);
|
||||
assert!(delta_values.is_empty(), "unprocessed keys");
|
||||
|
||||
let image_layer = if discard_image_layer {
|
||||
stat.discard_image_layer();
|
||||
None
|
||||
} else if let Some(writer) = image_layer_writer {
|
||||
stat.produce_image_layer(writer.size());
|
||||
let discard = |key: &PersistentLayerKey| {
|
||||
let key = key.clone();
|
||||
async move { KeyHistoryRetention::discard_key(&key, self, dry_run).await }
|
||||
};
|
||||
|
||||
let produced_image_layers = if let Some(writer) = image_layer_writer {
|
||||
if !dry_run {
|
||||
Some(writer.finish(self, ctx).await?)
|
||||
writer
|
||||
.finish_with_discard_fn(self, ctx, hack_end_key, discard)
|
||||
.await?
|
||||
} else {
|
||||
None
|
||||
let (layers, _) = writer.take()?;
|
||||
assert!(layers.is_empty(), "image layers produced in dry run mode?");
|
||||
Vec::new()
|
||||
}
|
||||
} else {
|
||||
None
|
||||
Vec::new()
|
||||
};
|
||||
|
||||
let produced_delta_layers = if !dry_run {
|
||||
delta_layer_writer
|
||||
.finish_with_discard_fn(self, ctx, hack_end_key, discard)
|
||||
.await?
|
||||
} else {
|
||||
let (layers, _) = delta_layer_writer.take()?;
|
||||
assert!(layers.is_empty(), "delta layers produced in dry run mode?");
|
||||
Vec::new()
|
||||
};
|
||||
|
||||
let mut compact_to = Vec::new();
|
||||
let mut keep_layers = HashSet::new();
|
||||
let produced_delta_layers_len = produced_delta_layers.len();
|
||||
let produced_image_layers_len = produced_image_layers.len();
|
||||
for action in produced_delta_layers {
|
||||
match action {
|
||||
SplitWriterResult::Produced(layer) => {
|
||||
stat.produce_delta_layer(layer.layer_desc().file_size());
|
||||
compact_to.push(layer);
|
||||
}
|
||||
SplitWriterResult::Discarded(l) => {
|
||||
keep_layers.insert(l);
|
||||
stat.discard_delta_layer();
|
||||
}
|
||||
}
|
||||
}
|
||||
for action in produced_image_layers {
|
||||
match action {
|
||||
SplitWriterResult::Produced(layer) => {
|
||||
stat.produce_image_layer(layer.layer_desc().file_size());
|
||||
compact_to.push(layer);
|
||||
}
|
||||
SplitWriterResult::Discarded(l) => {
|
||||
keep_layers.insert(l);
|
||||
stat.discard_image_layer();
|
||||
}
|
||||
}
|
||||
}
|
||||
let mut layer_selection = layer_selection;
|
||||
layer_selection.retain(|x| !keep_layers.contains(&x.layer_desc().key()));
|
||||
|
||||
info!(
|
||||
"gc-compaction statistics: {}",
|
||||
serde_json::to_string(&stat)?
|
||||
@@ -2206,28 +2186,11 @@ impl Timeline {
|
||||
}
|
||||
|
||||
info!(
|
||||
"produced {} delta layers and {} image layers",
|
||||
delta_layers.len(),
|
||||
if image_layer.is_some() { 1 } else { 0 }
|
||||
"produced {} delta layers and {} image layers, {} layers are kept",
|
||||
produced_delta_layers_len,
|
||||
produced_image_layers_len,
|
||||
layer_selection.len()
|
||||
);
|
||||
let mut compact_to = Vec::new();
|
||||
let mut keep_layers = HashSet::new();
|
||||
for action in delta_layers {
|
||||
match action {
|
||||
FlushDeltaResult::CreateResidentLayer(layer) => {
|
||||
compact_to.push(layer);
|
||||
}
|
||||
FlushDeltaResult::KeepLayer(l) => {
|
||||
keep_layers.insert(l);
|
||||
}
|
||||
}
|
||||
}
|
||||
if discard_image_layer {
|
||||
keep_layers.insert(image_layer_key);
|
||||
}
|
||||
let mut layer_selection = layer_selection;
|
||||
layer_selection.retain(|x| !keep_layers.contains(&x.layer_desc().key()));
|
||||
compact_to.extend(image_layer);
|
||||
|
||||
// Step 3: Place back to the layer map.
|
||||
{
|
||||
|
||||
@@ -85,7 +85,7 @@ pub trait TestBackend: Send + Sync + 'static {
|
||||
impl std::fmt::Display for BackendType<'_, (), ()> {
|
||||
fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
Self::Console(api, _) => match &**api {
|
||||
Self::Console(api, ()) => match &**api {
|
||||
ConsoleBackend::Console(endpoint) => {
|
||||
fmt.debug_tuple("Console").field(&endpoint.url()).finish()
|
||||
}
|
||||
@@ -96,7 +96,7 @@ impl std::fmt::Display for BackendType<'_, (), ()> {
|
||||
#[cfg(test)]
|
||||
ConsoleBackend::Test(_) => fmt.debug_tuple("Test").finish(),
|
||||
},
|
||||
Self::Link(url, _) => fmt.debug_tuple("Link").field(&url.as_str()).finish(),
|
||||
Self::Link(url, ()) => fmt.debug_tuple("Link").field(&url.as_str()).finish(),
|
||||
Self::Local(_) => fmt.debug_tuple("Local").finish(),
|
||||
}
|
||||
}
|
||||
@@ -324,21 +324,20 @@ async fn auth_quirks(
|
||||
};
|
||||
let (cached_entry, secret) = cached_secret.take_value();
|
||||
|
||||
let secret = match secret {
|
||||
Some(secret) => config.check_rate_limit(
|
||||
let secret = if let Some(secret) = secret {
|
||||
config.check_rate_limit(
|
||||
ctx,
|
||||
config,
|
||||
secret,
|
||||
&info.endpoint,
|
||||
unauthenticated_password.is_some() || allow_cleartext,
|
||||
)?,
|
||||
None => {
|
||||
// If we don't have an authentication secret, we mock one to
|
||||
// prevent malicious probing (possible due to missing protocol steps).
|
||||
// This mocked secret will never lead to successful authentication.
|
||||
info!("authentication info not found, mocking it");
|
||||
AuthSecret::Scram(scram::ServerSecret::mock(rand::random()))
|
||||
}
|
||||
)?
|
||||
} else {
|
||||
// If we don't have an authentication secret, we mock one to
|
||||
// prevent malicious probing (possible due to missing protocol steps).
|
||||
// This mocked secret will never lead to successful authentication.
|
||||
info!("authentication info not found, mocking it");
|
||||
AuthSecret::Scram(scram::ServerSecret::mock(rand::random()))
|
||||
};
|
||||
|
||||
match authenticate_with_secret(
|
||||
@@ -409,7 +408,7 @@ impl<'a> BackendType<'a, ComputeUserInfoMaybeEndpoint, &()> {
|
||||
pub fn get_endpoint(&self) -> Option<EndpointId> {
|
||||
match self {
|
||||
Self::Console(_, user_info) => user_info.endpoint_id.clone(),
|
||||
Self::Link(_, _) => Some("link".into()),
|
||||
Self::Link(_, ()) => Some("link".into()),
|
||||
Self::Local(_) => Some("local".into()),
|
||||
}
|
||||
}
|
||||
@@ -418,7 +417,7 @@ impl<'a> BackendType<'a, ComputeUserInfoMaybeEndpoint, &()> {
|
||||
pub fn get_user(&self) -> &str {
|
||||
match self {
|
||||
Self::Console(_, user_info) => &user_info.user,
|
||||
Self::Link(_, _) => "link",
|
||||
Self::Link(_, ()) => "link",
|
||||
Self::Local(_) => "local",
|
||||
}
|
||||
}
|
||||
@@ -454,7 +453,7 @@ impl<'a> BackendType<'a, ComputeUserInfoMaybeEndpoint, &()> {
|
||||
BackendType::Console(api, credentials)
|
||||
}
|
||||
// NOTE: this auth backend doesn't use client credentials.
|
||||
Self::Link(url, _) => {
|
||||
Self::Link(url, ()) => {
|
||||
info!("performing link authentication");
|
||||
|
||||
let info = link::authenticate(ctx, &url, client).await?;
|
||||
@@ -478,7 +477,7 @@ impl BackendType<'_, ComputeUserInfo, &()> {
|
||||
) -> Result<CachedRoleSecret, GetAuthInfoError> {
|
||||
match self {
|
||||
Self::Console(api, user_info) => api.get_role_secret(ctx, user_info).await,
|
||||
Self::Link(_, _) => Ok(Cached::new_uncached(None)),
|
||||
Self::Link(_, ()) => Ok(Cached::new_uncached(None)),
|
||||
Self::Local(_) => Ok(Cached::new_uncached(None)),
|
||||
}
|
||||
}
|
||||
@@ -489,7 +488,7 @@ impl BackendType<'_, ComputeUserInfo, &()> {
|
||||
) -> Result<(CachedAllowedIps, Option<CachedRoleSecret>), GetAuthInfoError> {
|
||||
match self {
|
||||
Self::Console(api, user_info) => api.get_allowed_ips_and_secret(ctx, user_info).await,
|
||||
Self::Link(_, _) => Ok((Cached::new_uncached(Arc::new(vec![])), None)),
|
||||
Self::Link(_, ()) => Ok((Cached::new_uncached(Arc::new(vec![])), None)),
|
||||
Self::Local(_) => Ok((Cached::new_uncached(Arc::new(vec![])), None)),
|
||||
}
|
||||
}
|
||||
@@ -525,7 +524,7 @@ impl ComputeConnectBackend for BackendType<'_, ComputeCredentials, &()> {
|
||||
) -> Result<CachedNodeInfo, console::errors::WakeComputeError> {
|
||||
match self {
|
||||
Self::Console(api, creds) => api.wake_compute(ctx, &creds.info).await,
|
||||
Self::Link(_, _) => unreachable!("link auth flow doesn't support waking the compute"),
|
||||
Self::Link(_, ()) => unreachable!("link auth flow doesn't support waking the compute"),
|
||||
Self::Local(local) => Ok(Cached::new_uncached(local.node_info.clone())),
|
||||
}
|
||||
}
|
||||
@@ -533,7 +532,7 @@ impl ComputeConnectBackend for BackendType<'_, ComputeCredentials, &()> {
|
||||
fn get_keys(&self) -> &ComputeCredentialKeys {
|
||||
match self {
|
||||
Self::Console(_, creds) => &creds.keys,
|
||||
Self::Link(_, _) => &ComputeCredentialKeys::None,
|
||||
Self::Link(_, ()) => &ComputeCredentialKeys::None,
|
||||
Self::Local(_) => &ComputeCredentialKeys::None,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -224,10 +224,10 @@ impl JwkCacheEntryLock {
|
||||
// where Signature = alg(<B64(Header)> || . || <B64(Payload)>);
|
||||
|
||||
let (header_payload, signature) = jwt
|
||||
.rsplit_once(".")
|
||||
.rsplit_once('.')
|
||||
.context("Provided authentication token is not a valid JWT encoding")?;
|
||||
let (header, payload) = header_payload
|
||||
.split_once(".")
|
||||
.split_once('.')
|
||||
.context("Provided authentication token is not a valid JWT encoding")?;
|
||||
|
||||
let header = base64::decode_config(header, base64::URL_SAFE_NO_PAD)
|
||||
@@ -320,14 +320,11 @@ impl JwkCache {
|
||||
// try with just a read lock first
|
||||
let key = (endpoint, role_name.clone());
|
||||
let entry = self.map.get(&key).as_deref().map(Arc::clone);
|
||||
let entry = match entry {
|
||||
Some(entry) => entry,
|
||||
None => {
|
||||
// acquire a write lock after to insert.
|
||||
let entry = self.map.entry(key).or_default();
|
||||
Arc::clone(&*entry)
|
||||
}
|
||||
};
|
||||
let entry = entry.unwrap_or_else(|| {
|
||||
// acquire a write lock after to insert.
|
||||
let entry = self.map.entry(key).or_default();
|
||||
Arc::clone(&*entry)
|
||||
});
|
||||
|
||||
entry
|
||||
.check_jwt(ctx, jwt, &self.client, role_name, fetch)
|
||||
|
||||
@@ -130,9 +130,12 @@ impl ComputeUserInfoMaybeEndpoint {
|
||||
}))
|
||||
}
|
||||
// Invariant: project name may not contain certain characters.
|
||||
(a, b) => a.or(b).map(|name| match project_name_valid(name.as_ref()) {
|
||||
false => Err(ComputeUserInfoParseError::MalformedProjectName(name)),
|
||||
true => Ok(name),
|
||||
(a, b) => a.or(b).map(|name| {
|
||||
if project_name_valid(name.as_ref()) {
|
||||
Ok(name)
|
||||
} else {
|
||||
Err(ComputeUserInfoParseError::MalformedProjectName(name))
|
||||
}
|
||||
}),
|
||||
}
|
||||
.transpose()?;
|
||||
|
||||
8
proxy/src/cache/project_info.rs
vendored
8
proxy/src/cache/project_info.rs
vendored
@@ -274,13 +274,13 @@ impl ProjectInfoCacheImpl {
|
||||
let ttl_disabled_since_us = self
|
||||
.ttl_disabled_since_us
|
||||
.load(std::sync::atomic::Ordering::Relaxed);
|
||||
let ignore_cache_since = if ttl_disabled_since_us != u64::MAX {
|
||||
let ignore_cache_since = if ttl_disabled_since_us == u64::MAX {
|
||||
None
|
||||
} else {
|
||||
let ignore_cache_since = self.start_time + Duration::from_micros(ttl_disabled_since_us);
|
||||
// We are fine if entry is not older than ttl or was added before we are getting notifications.
|
||||
valid_since = valid_since.min(ignore_cache_since);
|
||||
Some(ignore_cache_since)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
(valid_since, ignore_cache_since)
|
||||
}
|
||||
@@ -306,7 +306,7 @@ impl ProjectInfoCacheImpl {
|
||||
let mut removed = 0;
|
||||
let shard = self.project2ep.shards()[shard].write();
|
||||
for (_, endpoints) in shard.iter() {
|
||||
for endpoint in endpoints.get().iter() {
|
||||
for endpoint in endpoints.get() {
|
||||
self.cache.remove(endpoint);
|
||||
removed += 1;
|
||||
}
|
||||
|
||||
@@ -220,7 +220,8 @@ mod tests {
|
||||
|
||||
#[tokio::test]
|
||||
async fn cancel_session_noop_regression() {
|
||||
let handler = CancellationHandler::<()>::new(Default::default(), CancellationSource::Local);
|
||||
let handler =
|
||||
CancellationHandler::<()>::new(CancelMap::default(), CancellationSource::Local);
|
||||
handler
|
||||
.cancel_session(
|
||||
CancelKeyData {
|
||||
|
||||
@@ -286,7 +286,7 @@ impl ConnCfg {
|
||||
|
||||
let client_config = if allow_self_signed_compute {
|
||||
// Allow all certificates for creating the connection
|
||||
let verifier = Arc::new(AcceptEverythingVerifier) as Arc<dyn ServerCertVerifier>;
|
||||
let verifier = Arc::new(AcceptEverythingVerifier);
|
||||
rustls::ClientConfig::builder()
|
||||
.dangerous()
|
||||
.with_custom_certificate_verifier(verifier)
|
||||
|
||||
@@ -318,7 +318,7 @@ impl CertResolver {
|
||||
// a) Instead of multi-cert approach use single cert with extra
|
||||
// domains listed in Subject Alternative Name (SAN).
|
||||
// b) Deploy separate proxy instances for extra domains.
|
||||
self.default.as_ref().cloned()
|
||||
self.default.clone()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -64,7 +64,7 @@ impl Api {
|
||||
tokio_postgres::connect(self.endpoint.as_str(), tokio_postgres::NoTls).await?;
|
||||
|
||||
tokio::spawn(connection);
|
||||
let secret = match get_execute_postgres_query(
|
||||
let secret = if let Some(entry) = get_execute_postgres_query(
|
||||
&client,
|
||||
"select rolpassword from pg_catalog.pg_authid where rolname = $1",
|
||||
&[&&*user_info.user],
|
||||
@@ -72,15 +72,12 @@ impl Api {
|
||||
)
|
||||
.await?
|
||||
{
|
||||
Some(entry) => {
|
||||
info!("got a secret: {entry}"); // safe since it's not a prod scenario
|
||||
let secret = scram::ServerSecret::parse(&entry).map(AuthSecret::Scram);
|
||||
secret.or_else(|| parse_md5(&entry).map(AuthSecret::Md5))
|
||||
}
|
||||
None => {
|
||||
warn!("user '{}' does not exist", user_info.user);
|
||||
None
|
||||
}
|
||||
info!("got a secret: {entry}"); // safe since it's not a prod scenario
|
||||
let secret = scram::ServerSecret::parse(&entry).map(AuthSecret::Scram);
|
||||
secret.or_else(|| parse_md5(&entry).map(AuthSecret::Md5))
|
||||
} else {
|
||||
warn!("user '{}' does not exist", user_info.user);
|
||||
None
|
||||
};
|
||||
let allowed_ips = match get_execute_postgres_query(
|
||||
&client,
|
||||
@@ -142,12 +139,11 @@ async fn get_execute_postgres_query(
|
||||
let rows = client.query(query, params).await?;
|
||||
|
||||
// We can get at most one row, because `rolname` is unique.
|
||||
let row = match rows.first() {
|
||||
Some(row) => row,
|
||||
let Some(row) = rows.first() else {
|
||||
// This means that the user doesn't exist, so there can be no secret.
|
||||
// However, this is still a *valid* outcome which is very similar
|
||||
// to getting `404 Not found` from the Neon console.
|
||||
None => return Ok(None),
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
let entry = row.try_get(idx).map_err(MockApiError::PasswordNotSet)?;
|
||||
|
||||
@@ -38,9 +38,9 @@ impl Api {
|
||||
locks: &'static ApiLocks<EndpointCacheKey>,
|
||||
wake_compute_endpoint_rate_limiter: Arc<WakeComputeRateLimiter>,
|
||||
) -> Self {
|
||||
let jwt: String = match std::env::var("NEON_PROXY_TO_CONTROLPLANE_TOKEN") {
|
||||
let jwt = match std::env::var("NEON_PROXY_TO_CONTROLPLANE_TOKEN") {
|
||||
Ok(v) => v,
|
||||
Err(_) => "".to_string(),
|
||||
Err(_) => String::new(),
|
||||
};
|
||||
Self {
|
||||
endpoint,
|
||||
@@ -96,10 +96,10 @@ impl Api {
|
||||
// Error 404 is special: it's ok not to have a secret.
|
||||
// TODO(anna): retry
|
||||
Err(e) => {
|
||||
if e.get_reason().is_not_found() {
|
||||
return Ok(AuthInfo::default());
|
||||
return if e.get_reason().is_not_found() {
|
||||
Ok(AuthInfo::default())
|
||||
} else {
|
||||
return Err(e.into());
|
||||
Err(e.into())
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
@@ -12,6 +12,8 @@
|
||||
// https://rust-lang.github.io/rust-clippy/master/index.html#?groups=restriction
|
||||
#![warn(
|
||||
clippy::undocumented_unsafe_blocks,
|
||||
// TODO: Enable once all individual checks are enabled.
|
||||
//clippy::as_conversions,
|
||||
clippy::dbg_macro,
|
||||
clippy::empty_enum_variants_with_brackets,
|
||||
clippy::exit,
|
||||
@@ -31,8 +33,15 @@
|
||||
)]
|
||||
// List of permanently allowed lints.
|
||||
#![allow(
|
||||
// It's ok to cast u8 to bool, etc.
|
||||
// It's ok to cast bool to u8, etc.
|
||||
clippy::cast_lossless,
|
||||
// Seems unavoidable.
|
||||
clippy::multiple_crate_versions,
|
||||
// While #[must_use] is a great feature this check is too noisy.
|
||||
clippy::must_use_candidate,
|
||||
// Inline consts, structs, fns, imports, etc. are ok if they're used by
|
||||
// the following statement(s).
|
||||
clippy::items_after_statements,
|
||||
)]
|
||||
// List of temporarily allowed lints.
|
||||
// TODO: Switch to except() once stable with 1.81.
|
||||
@@ -43,46 +52,26 @@
|
||||
clippy::cast_possible_wrap,
|
||||
clippy::cast_precision_loss,
|
||||
clippy::cast_sign_loss,
|
||||
clippy::default_trait_access,
|
||||
clippy::doc_markdown,
|
||||
clippy::explicit_iter_loop,
|
||||
clippy::float_cmp,
|
||||
clippy::if_not_else,
|
||||
clippy::ignored_unit_patterns,
|
||||
clippy::implicit_hasher,
|
||||
clippy::inconsistent_struct_constructor,
|
||||
clippy::inline_always,
|
||||
clippy::items_after_statements,
|
||||
clippy::manual_assert,
|
||||
clippy::manual_let_else,
|
||||
clippy::manual_string_new,
|
||||
clippy::match_bool,
|
||||
clippy::match_same_arms,
|
||||
clippy::match_wild_err_arm,
|
||||
clippy::missing_errors_doc,
|
||||
clippy::missing_panics_doc,
|
||||
clippy::module_name_repetitions,
|
||||
clippy::multiple_crate_versions,
|
||||
clippy::must_use_candidate,
|
||||
clippy::needless_for_each,
|
||||
clippy::needless_pass_by_value,
|
||||
clippy::needless_raw_string_hashes,
|
||||
clippy::option_as_ref_cloned,
|
||||
clippy::redundant_closure_for_method_calls,
|
||||
clippy::redundant_else,
|
||||
clippy::return_self_not_must_use,
|
||||
clippy::similar_names,
|
||||
clippy::single_char_pattern,
|
||||
clippy::single_match_else,
|
||||
clippy::struct_excessive_bools,
|
||||
clippy::struct_field_names,
|
||||
clippy::too_many_lines,
|
||||
clippy::uninlined_format_args,
|
||||
clippy::unnested_or_patterns,
|
||||
clippy::unreadable_literal,
|
||||
clippy::unused_async,
|
||||
clippy::unused_self,
|
||||
clippy::used_underscore_binding,
|
||||
clippy::wildcard_imports
|
||||
)]
|
||||
// List of temporarily allowed lints to unblock beta/nightly.
|
||||
|
||||
@@ -254,7 +254,7 @@ pub async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
|
||||
let metrics = &Metrics::get().proxy;
|
||||
let proto = ctx.protocol();
|
||||
let _request_gauge = metrics.connection_requests.guard(proto);
|
||||
let request_gauge = metrics.connection_requests.guard(proto);
|
||||
|
||||
let tls = config.tls_config.as_ref();
|
||||
|
||||
@@ -283,7 +283,7 @@ pub async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
let result = config
|
||||
.auth_backend
|
||||
.as_ref()
|
||||
.map(|_| auth::ComputeUserInfoMaybeEndpoint::parse(ctx, ¶ms, hostname, common_names))
|
||||
.map(|()| auth::ComputeUserInfoMaybeEndpoint::parse(ctx, ¶ms, hostname, common_names))
|
||||
.transpose();
|
||||
|
||||
let user_info = match result {
|
||||
@@ -340,7 +340,7 @@ pub async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
client: stream,
|
||||
aux: node.aux.clone(),
|
||||
compute: node,
|
||||
req: _request_gauge,
|
||||
req: request_gauge,
|
||||
conn: conn_gauge,
|
||||
cancel: session,
|
||||
}))
|
||||
|
||||
@@ -30,9 +30,10 @@ pub fn invalidate_cache(node_info: console::CachedNodeInfo) -> NodeInfo {
|
||||
if is_cached {
|
||||
warn!("invalidating stalled compute node info cache entry");
|
||||
}
|
||||
let label = match is_cached {
|
||||
true => ConnectionFailureKind::ComputeCached,
|
||||
false => ConnectionFailureKind::ComputeUncached,
|
||||
let label = if is_cached {
|
||||
ConnectionFailureKind::ComputeCached
|
||||
} else {
|
||||
ConnectionFailureKind::ComputeUncached
|
||||
};
|
||||
Metrics::get().proxy.connection_failures_total.inc(label);
|
||||
|
||||
|
||||
@@ -230,11 +230,10 @@ impl CopyBuffer {
|
||||
io::ErrorKind::WriteZero,
|
||||
"write zero byte into writer",
|
||||
))));
|
||||
} else {
|
||||
self.pos += i;
|
||||
self.amt += i as u64;
|
||||
self.need_flush = true;
|
||||
}
|
||||
self.pos += i;
|
||||
self.amt += i as u64;
|
||||
self.need_flush = true;
|
||||
}
|
||||
|
||||
// If pos larger than cap, this loop will never stop.
|
||||
|
||||
@@ -433,7 +433,7 @@ impl ReportableError for TestConnectError {
|
||||
|
||||
impl std::fmt::Display for TestConnectError {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{:?}", self)
|
||||
write!(f, "{self:?}")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -475,7 +475,7 @@ impl ConnectMechanism for TestConnectMechanism {
|
||||
retryable: false,
|
||||
kind: ErrorKind::Compute,
|
||||
}),
|
||||
x => panic!("expecting action {:?}, connect is called instead", x),
|
||||
x => panic!("expecting action {x:?}, connect is called instead"),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -515,7 +515,7 @@ impl TestBackend for TestConnectMechanism {
|
||||
assert!(err.could_retry());
|
||||
Err(console::errors::WakeComputeError::ApiError(err))
|
||||
}
|
||||
x => panic!("expecting action {:?}, wake_compute is called instead", x),
|
||||
x => panic!("expecting action {x:?}, wake_compute is called instead"),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -115,9 +115,7 @@ where
|
||||
let mut buf = [0];
|
||||
stream.read_exact(&mut buf).await.unwrap();
|
||||
|
||||
if buf[0] != b'S' {
|
||||
panic!("ssl not supported by server");
|
||||
}
|
||||
assert!(buf[0] == b'S', "ssl not supported by server");
|
||||
|
||||
tls.connect(stream).await.unwrap()
|
||||
}
|
||||
|
||||
@@ -119,6 +119,7 @@ impl Default for LeakyBucketState {
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[allow(clippy::float_cmp)]
|
||||
mod tests {
|
||||
use std::time::Duration;
|
||||
|
||||
|
||||
@@ -174,9 +174,8 @@ impl DynamicLimiter {
|
||||
let mut inner = self.inner.lock();
|
||||
if inner.take(&self.ready).is_some() {
|
||||
break Ok(Token::new(self.clone()));
|
||||
} else {
|
||||
notified.set(self.ready.notified());
|
||||
}
|
||||
notified.set(self.ready.notified());
|
||||
}
|
||||
notified.as_mut().await;
|
||||
ready = true;
|
||||
|
||||
@@ -150,7 +150,7 @@ impl<C: ProjectInfoCache + Send + Sync + 'static> MessageHandler<C> {
|
||||
}
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
Notification::AllowedIpsUpdate { .. } | Notification::PasswordUpdate { .. } => {
|
||||
invalidate_cache(self.cache.clone(), msg.clone());
|
||||
if matches!(msg, Notification::AllowedIpsUpdate { .. }) {
|
||||
Metrics::get()
|
||||
|
||||
@@ -89,7 +89,7 @@ impl<'a> ClientFirstMessage<'a> {
|
||||
write!(&mut message, "r={}", self.nonce).unwrap();
|
||||
base64::encode_config_buf(nonce, base64::STANDARD, &mut message);
|
||||
let combined_nonce = 2..message.len();
|
||||
write!(&mut message, ",s={},i={}", salt_base64, iterations).unwrap();
|
||||
write!(&mut message, ",s={salt_base64},i={iterations}").unwrap();
|
||||
|
||||
// This design guarantees that it's impossible to create a
|
||||
// server-first-message without receiving a client-first-message
|
||||
|
||||
@@ -82,13 +82,7 @@ mod tests {
|
||||
let stored_key = "D5h6KTMBlUvDJk2Y8ELfC1Sjtc6k9YHjRyuRZyBNJns=";
|
||||
let server_key = "Pi3QHbcluX//NDfVkKlFl88GGzlJ5LkyPwcdlN/QBvI=";
|
||||
|
||||
let secret = format!(
|
||||
"SCRAM-SHA-256${iterations}:{salt}${stored_key}:{server_key}",
|
||||
iterations = iterations,
|
||||
salt = salt,
|
||||
stored_key = stored_key,
|
||||
server_key = server_key,
|
||||
);
|
||||
let secret = format!("SCRAM-SHA-256${iterations}:{salt}${stored_key}:{server_key}");
|
||||
|
||||
let parsed = ServerSecret::parse(&secret).unwrap();
|
||||
assert_eq!(parsed.iterations, iterations);
|
||||
|
||||
@@ -222,12 +222,11 @@ fn thread_rt(pool: Arc<ThreadPool>, worker: Worker<JobSpec>, index: usize) {
|
||||
}
|
||||
|
||||
for i in 0.. {
|
||||
let mut job = match worker
|
||||
let Some(mut job) = worker
|
||||
.pop()
|
||||
.or_else(|| pool.steal(&mut rng, index, &worker))
|
||||
{
|
||||
Some(job) => job,
|
||||
None => continue 'wait,
|
||||
else {
|
||||
continue 'wait;
|
||||
};
|
||||
|
||||
pool.metrics
|
||||
|
||||
@@ -93,11 +93,11 @@ pub async fn task_main(
|
||||
let mut tls_server_config = rustls::ServerConfig::clone(&config.to_server_config());
|
||||
// prefer http2, but support http/1.1
|
||||
tls_server_config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec()];
|
||||
Arc::new(tls_server_config) as Arc<_>
|
||||
Arc::new(tls_server_config)
|
||||
}
|
||||
None => {
|
||||
warn!("TLS config is missing");
|
||||
Arc::new(NoTls) as Arc<_>
|
||||
Arc::new(NoTls)
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@@ -44,7 +44,11 @@ impl PoolingBackend {
|
||||
password: &[u8],
|
||||
) -> Result<ComputeCredentials, AuthError> {
|
||||
let user_info = user_info.clone();
|
||||
let backend = self.config.auth_backend.as_ref().map(|_| user_info.clone());
|
||||
let backend = self
|
||||
.config
|
||||
.auth_backend
|
||||
.as_ref()
|
||||
.map(|()| user_info.clone());
|
||||
let (allowed_ips, maybe_secret) = backend.get_allowed_ips_and_secret(ctx).await?;
|
||||
if !check_peer_addr_is_in_list(&ctx.peer_addr(), &allowed_ips) {
|
||||
return Err(AuthError::ip_address_not_allowed(ctx.peer_addr()));
|
||||
@@ -101,10 +105,10 @@ impl PoolingBackend {
|
||||
jwt: &str,
|
||||
) -> Result<ComputeCredentials, AuthError> {
|
||||
match &self.config.auth_backend {
|
||||
crate::auth::BackendType::Console(_, _) => {
|
||||
crate::auth::BackendType::Console(_, ()) => {
|
||||
Err(AuthError::auth_failed("JWT login is not yet supported"))
|
||||
}
|
||||
crate::auth::BackendType::Link(_, _) => Err(AuthError::auth_failed(
|
||||
crate::auth::BackendType::Link(_, ()) => Err(AuthError::auth_failed(
|
||||
"JWT login over link proxy is not supported",
|
||||
)),
|
||||
crate::auth::BackendType::Local(cache) => {
|
||||
@@ -138,12 +142,12 @@ impl PoolingBackend {
|
||||
keys: ComputeCredentials,
|
||||
force_new: bool,
|
||||
) -> Result<Client<tokio_postgres::Client>, HttpConnError> {
|
||||
let maybe_client = if !force_new {
|
||||
info!("pool: looking for an existing connection");
|
||||
self.pool.get(ctx, &conn_info)?
|
||||
} else {
|
||||
let maybe_client = if force_new {
|
||||
info!("pool: pool is disabled");
|
||||
None
|
||||
} else {
|
||||
info!("pool: looking for an existing connection");
|
||||
self.pool.get(ctx, &conn_info)?
|
||||
};
|
||||
|
||||
if let Some(client) = maybe_client {
|
||||
@@ -152,7 +156,7 @@ impl PoolingBackend {
|
||||
let conn_id = uuid::Uuid::new_v4();
|
||||
tracing::Span::current().record("conn_id", display(conn_id));
|
||||
info!(%conn_id, "pool: opening a new connection '{conn_info}'");
|
||||
let backend = self.config.auth_backend.as_ref().map(|_| keys);
|
||||
let backend = self.config.auth_backend.as_ref().map(|()| keys);
|
||||
crate::proxy::connect_compute::connect_to_compute(
|
||||
ctx,
|
||||
&TokioMechanism {
|
||||
|
||||
@@ -339,9 +339,9 @@ impl<C: ClientInnerExt> GlobalConnPool<C> {
|
||||
} = pool.get_mut();
|
||||
|
||||
// ensure that closed clients are removed
|
||||
pools.iter_mut().for_each(|(_, db_pool)| {
|
||||
for db_pool in pools.values_mut() {
|
||||
clients_removed += db_pool.clear_closed_clients(total_conns);
|
||||
});
|
||||
}
|
||||
|
||||
// we only remove this pool if it has no active connections
|
||||
if *total_conns == 0 {
|
||||
@@ -405,21 +405,20 @@ impl<C: ClientInnerExt> GlobalConnPool<C> {
|
||||
if client.is_closed() {
|
||||
info!("pool: cached connection '{conn_info}' is closed, opening a new one");
|
||||
return Ok(None);
|
||||
} else {
|
||||
tracing::Span::current().record("conn_id", tracing::field::display(client.conn_id));
|
||||
tracing::Span::current().record(
|
||||
"pid",
|
||||
tracing::field::display(client.inner.get_process_id()),
|
||||
);
|
||||
info!(
|
||||
cold_start_info = ColdStartInfo::HttpPoolHit.as_str(),
|
||||
"pool: reusing connection '{conn_info}'"
|
||||
);
|
||||
client.session.send(ctx.session_id())?;
|
||||
ctx.set_cold_start_info(ColdStartInfo::HttpPoolHit);
|
||||
ctx.success();
|
||||
return Ok(Some(Client::new(client, conn_info.clone(), endpoint_pool)));
|
||||
}
|
||||
tracing::Span::current().record("conn_id", tracing::field::display(client.conn_id));
|
||||
tracing::Span::current().record(
|
||||
"pid",
|
||||
tracing::field::display(client.inner.get_process_id()),
|
||||
);
|
||||
info!(
|
||||
cold_start_info = ColdStartInfo::HttpPoolHit.as_str(),
|
||||
"pool: reusing connection '{conn_info}'"
|
||||
);
|
||||
client.session.send(ctx.session_id())?;
|
||||
ctx.set_cold_start_info(ColdStartInfo::HttpPoolHit);
|
||||
ctx.success();
|
||||
return Ok(Some(Client::new(client, conn_info.clone(), endpoint_pool)));
|
||||
}
|
||||
Ok(None)
|
||||
}
|
||||
@@ -660,7 +659,7 @@ impl<C: ClientInnerExt> Client<C> {
|
||||
span: _,
|
||||
} = self;
|
||||
let inner = inner.as_mut().expect("client inner should not be removed");
|
||||
(&mut inner.inner, Discard { pool, conn_info })
|
||||
(&mut inner.inner, Discard { conn_info, pool })
|
||||
}
|
||||
}
|
||||
|
||||
@@ -722,7 +721,9 @@ impl<C: ClientInnerExt> Drop for Client<C> {
|
||||
mod tests {
|
||||
use std::{mem, sync::atomic::AtomicBool};
|
||||
|
||||
use crate::{serverless::cancel_set::CancelSet, BranchId, EndpointId, ProjectId};
|
||||
use crate::{
|
||||
proxy::NeonOptions, serverless::cancel_set::CancelSet, BranchId, EndpointId, ProjectId,
|
||||
};
|
||||
|
||||
use super::*;
|
||||
|
||||
@@ -781,7 +782,7 @@ mod tests {
|
||||
user_info: ComputeUserInfo {
|
||||
user: "user".into(),
|
||||
endpoint: "endpoint".into(),
|
||||
options: Default::default(),
|
||||
options: NeonOptions::default(),
|
||||
},
|
||||
dbname: "dbname".into(),
|
||||
auth: AuthData::Password("password".as_bytes().into()),
|
||||
@@ -839,7 +840,7 @@ mod tests {
|
||||
user_info: ComputeUserInfo {
|
||||
user: "user".into(),
|
||||
endpoint: "endpoint-2".into(),
|
||||
options: Default::default(),
|
||||
options: NeonOptions::default(),
|
||||
},
|
||||
dbname: "dbname".into(),
|
||||
auth: AuthData::Password("password".as_bytes().into()),
|
||||
|
||||
@@ -55,7 +55,7 @@ fn json_array_to_pg_array(value: &Value) -> Option<String> {
|
||||
.collect::<Vec<_>>()
|
||||
.join(",");
|
||||
|
||||
Some(format!("{{{}}}", vals))
|
||||
Some(format!("{{{vals}}}"))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -207,12 +207,12 @@ fn get_conn_info(
|
||||
.ok_or(ConnInfoError::MalformedEndpoint)?
|
||||
} else {
|
||||
hostname
|
||||
.split_once(".")
|
||||
.split_once('.')
|
||||
.map_or(hostname, |(prefix, _)| prefix)
|
||||
.into()
|
||||
}
|
||||
}
|
||||
Some(url::Host::Ipv4(_)) | Some(url::Host::Ipv6(_)) | None => {
|
||||
Some(url::Host::Ipv4(_) | url::Host::Ipv6(_)) | None => {
|
||||
return Err(ConnInfoError::MissingHostname)
|
||||
}
|
||||
};
|
||||
|
||||
@@ -67,7 +67,7 @@ impl<S: AsyncRead + Unpin> PqStream<S> {
|
||||
FeMessage::PasswordMessage(msg) => Ok(msg),
|
||||
bad => Err(io::Error::new(
|
||||
io::ErrorKind::InvalidData,
|
||||
format!("unexpected message type: {:?}", bad),
|
||||
format!("unexpected message type: {bad:?}"),
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -450,12 +450,9 @@ async fn upload_events_chunk(
|
||||
remote_path: &RemotePath,
|
||||
cancel: &CancellationToken,
|
||||
) -> anyhow::Result<()> {
|
||||
let storage = match storage {
|
||||
Some(storage) => storage,
|
||||
None => {
|
||||
error!("no remote storage configured");
|
||||
return Ok(());
|
||||
}
|
||||
let Some(storage) = storage else {
|
||||
error!("no remote storage configured");
|
||||
return Ok(());
|
||||
};
|
||||
let data = serde_json::to_vec(&chunk).context("serialize metrics")?;
|
||||
let mut encoder = GzipEncoder::new(Vec::new());
|
||||
|
||||
@@ -31,7 +31,7 @@ pub struct Waiters<T>(pub(self) Mutex<HashMap<String, oneshot::Sender<T>>>);
|
||||
|
||||
impl<T> Default for Waiters<T> {
|
||||
fn default() -> Self {
|
||||
Waiters(Default::default())
|
||||
Waiters(Mutex::default())
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import random
|
||||
from dataclasses import dataclass
|
||||
from enum import Enum
|
||||
from functools import total_ordering
|
||||
from typing import Any, Dict, Type, TypeVar, Union
|
||||
|
||||
@@ -213,3 +214,9 @@ class TenantShardId:
|
||||
|
||||
def __hash__(self) -> int:
|
||||
return hash(self._tuple())
|
||||
|
||||
|
||||
# TODO: Replace with `StrEnum` when we upgrade to python 3.11
|
||||
class TimelineArchivalState(str, Enum):
|
||||
ARCHIVED = "Archived"
|
||||
UNARCHIVED = "Unarchived"
|
||||
|
||||
@@ -10,7 +10,7 @@ import requests
|
||||
from requests.adapters import HTTPAdapter
|
||||
from urllib3.util.retry import Retry
|
||||
|
||||
from fixtures.common_types import Lsn, TenantId, TenantShardId, TimelineId
|
||||
from fixtures.common_types import Lsn, TenantId, TenantShardId, TimelineArchivalState, TimelineId
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.metrics import Metrics, MetricsGetter, parse_metrics
|
||||
from fixtures.pg_version import PgVersion
|
||||
@@ -621,6 +621,22 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
|
||||
)
|
||||
self.verbose_error(res)
|
||||
|
||||
def timeline_archival_config(
|
||||
self,
|
||||
tenant_id: Union[TenantId, TenantShardId],
|
||||
timeline_id: TimelineId,
|
||||
state: TimelineArchivalState,
|
||||
):
|
||||
config = {"state": state.value}
|
||||
log.info(
|
||||
f"requesting timeline archival config {config} for tenant {tenant_id} and timeline {timeline_id}"
|
||||
)
|
||||
res = self.post(
|
||||
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/archival_config",
|
||||
json=config,
|
||||
)
|
||||
self.verbose_error(res)
|
||||
|
||||
def timeline_get_lsn_by_timestamp(
|
||||
self,
|
||||
tenant_id: Union[TenantId, TenantShardId],
|
||||
|
||||
96
test_runner/regress/test_timeline_archive.py
Normal file
96
test_runner/regress/test_timeline_archive.py
Normal file
@@ -0,0 +1,96 @@
|
||||
import pytest
|
||||
from fixtures.common_types import TenantId, TimelineArchivalState, TimelineId
|
||||
from fixtures.neon_fixtures import (
|
||||
NeonEnv,
|
||||
)
|
||||
from fixtures.pageserver.http import PageserverApiException
|
||||
|
||||
|
||||
def test_timeline_archive(neon_simple_env: NeonEnv):
|
||||
env = neon_simple_env
|
||||
|
||||
env.pageserver.allowed_errors.extend(
|
||||
[
|
||||
".*Timeline .* was not found.*",
|
||||
".*timeline not found.*",
|
||||
".*Cannot archive timeline which has unarchived child timelines.*",
|
||||
".*Precondition failed: Requested tenant is missing.*",
|
||||
]
|
||||
)
|
||||
|
||||
ps_http = env.pageserver.http_client()
|
||||
|
||||
# first try to archive non existing timeline
|
||||
# for existing tenant:
|
||||
invalid_timeline_id = TimelineId.generate()
|
||||
with pytest.raises(PageserverApiException, match="timeline not found") as exc:
|
||||
ps_http.timeline_archival_config(
|
||||
tenant_id=env.initial_tenant,
|
||||
timeline_id=invalid_timeline_id,
|
||||
state=TimelineArchivalState.ARCHIVED,
|
||||
)
|
||||
|
||||
assert exc.value.status_code == 404
|
||||
|
||||
# for non existing tenant:
|
||||
invalid_tenant_id = TenantId.generate()
|
||||
with pytest.raises(
|
||||
PageserverApiException,
|
||||
match=f"NotFound: tenant {invalid_tenant_id}",
|
||||
) as exc:
|
||||
ps_http.timeline_archival_config(
|
||||
tenant_id=invalid_tenant_id,
|
||||
timeline_id=invalid_timeline_id,
|
||||
state=TimelineArchivalState.ARCHIVED,
|
||||
)
|
||||
|
||||
assert exc.value.status_code == 404
|
||||
|
||||
# construct pair of branches to validate that pageserver prohibits
|
||||
# archival of ancestor timelines when they have non-archived child branches
|
||||
parent_timeline_id = env.neon_cli.create_branch("test_ancestor_branch_archive_parent", "empty")
|
||||
|
||||
leaf_timeline_id = env.neon_cli.create_branch(
|
||||
"test_ancestor_branch_archive_branch1", "test_ancestor_branch_archive_parent"
|
||||
)
|
||||
|
||||
timeline_path = env.pageserver.timeline_dir(env.initial_tenant, parent_timeline_id)
|
||||
|
||||
with pytest.raises(
|
||||
PageserverApiException,
|
||||
match="Cannot archive timeline which has non-archived child timelines",
|
||||
) as exc:
|
||||
assert timeline_path.exists()
|
||||
|
||||
ps_http.timeline_archival_config(
|
||||
tenant_id=env.initial_tenant,
|
||||
timeline_id=parent_timeline_id,
|
||||
state=TimelineArchivalState.ARCHIVED,
|
||||
)
|
||||
|
||||
assert exc.value.status_code == 412
|
||||
|
||||
# Test timeline_detail
|
||||
leaf_detail = ps_http.timeline_detail(
|
||||
tenant_id=env.initial_tenant,
|
||||
timeline_id=leaf_timeline_id,
|
||||
)
|
||||
assert leaf_detail["is_archived"] is False
|
||||
|
||||
# Test that archiving the leaf timeline and then the parent works
|
||||
ps_http.timeline_archival_config(
|
||||
tenant_id=env.initial_tenant,
|
||||
timeline_id=leaf_timeline_id,
|
||||
state=TimelineArchivalState.ARCHIVED,
|
||||
)
|
||||
leaf_detail = ps_http.timeline_detail(
|
||||
tenant_id=env.initial_tenant,
|
||||
timeline_id=leaf_timeline_id,
|
||||
)
|
||||
assert leaf_detail["is_archived"] is True
|
||||
|
||||
ps_http.timeline_archival_config(
|
||||
tenant_id=env.initial_tenant,
|
||||
timeline_id=parent_timeline_id,
|
||||
state=TimelineArchivalState.ARCHIVED,
|
||||
)
|
||||
@@ -326,15 +326,13 @@ files:
|
||||
SELECT checkpoints_timed FROM pg_stat_bgwriter;
|
||||
|
||||
- metric_name: compute_logical_snapshot_files
|
||||
type: guage
|
||||
type: gauge
|
||||
help: 'Number of snapshot files in pg_logical/snapshot'
|
||||
key_labels:
|
||||
- tenant_id
|
||||
- timeline_id
|
||||
values: [num_logical_snapshot_files]
|
||||
query: |
|
||||
SELECT
|
||||
(SELECT setting FROM pg_settings WHERE name = 'neon.tenant_id') AS tenant_id,
|
||||
(SELECT setting FROM pg_settings WHERE name = 'neon.timeline_id') AS timeline_id,
|
||||
-- Postgres creates temporary snapshot files of the form %X-%X.snap.%d.tmp. These
|
||||
-- temporary snapshot files are renamed to the actual snapshot files after they are
|
||||
@@ -356,6 +354,17 @@ files:
|
||||
from pg_replication_slots
|
||||
where slot_type = 'logical';
|
||||
|
||||
- metric_name: compute_subscriptions_count
|
||||
type: gauge
|
||||
help: 'Number of logical replication subscriptions grouped by enabled/disabled'
|
||||
key_labels:
|
||||
- enabled
|
||||
values: [subscriptions_count]
|
||||
query: |
|
||||
select subenabled::text as enabled, count(*) as subscriptions_count
|
||||
from pg_subscription
|
||||
group by subenabled;
|
||||
|
||||
- metric_name: retained_wal
|
||||
type: gauge
|
||||
help: 'Retained WAL in inactive replication slots'
|
||||
|
||||
Reference in New Issue
Block a user