Compare commits

..

14 Commits

Author SHA1 Message Date
Christian Schwarz
98d0bca8d1 Merge branch 'problame/repartition-bail-on-concurrent-call' into problame/avoid-count-deltas-if-no-changes 2024-02-21 18:05:29 +00:00
Christian Schwarz
bafa6d37d9 Merge remote-tracking branch 'origin/main' into problame/repartition-bail-on-concurrent-call 2024-02-21 18:05:28 +00:00
Christian Schwarz
ad23c945df WIP 2024-02-21 18:04:04 +00:00
Conrad Ludgate
60e5a56a5a proxy: include client IP in ip deny message (#6854)
## Problem

Debugging IP deny errors is difficult for our users

## Summary of changes

Include the client IP in the deny message
2024-02-21 18:24:59 +01:00
John Spray
afda4420bd test_sharding_ingress: bigger data, skip in debug mode (#6859)
## Problem

Accidentally merged #6852 without this test stability change. The test
as-written could sometimes fail on debug-pg14.

## Summary of changes

- Write more data so that the test can more reliably assert on the ratio
of total layers to small layers
- Skip the test in debug mode, since writing any more than a tiny bit of
data tends to result in a flaky test in the much slower debug
environment.
2024-02-21 17:03:55 +00:00
Christian Schwarz
4536caa253 LayerMap: keep track of rebuilds 2024-02-21 17:02:39 +00:00
Christian Schwarz
d7920a2a57 Merge remote-tracking branch 'origin/main' into problame/repartition-bail-on-concurrent-call 2024-02-21 16:01:29 +00:00
John Spray
ce1673a8c4 tests: improve stability of tests using wait_for_upload_queue_empty (#6856)
## Problem

PR #6834 introduced an assertion that the sets of metric labels on
finished operations should equal those on started operations, which is
not true if no operations have finished yet for a particular set of
labels.

## Summary of changes

- Instead of asserting out, wait and re-check in the case that finished
metrics don't match started
2024-02-21 16:00:17 +00:00
Christian Schwarz
f990e07581 Merge remote-tracking branch 'origin/main' into problame/repartition-bail-on-concurrent-call 2024-02-21 15:56:07 +00:00
Christian Schwarz
1af2f3caf1 Timeline::repartition: enforce no concurrent callers & lsn to not move backwards
This PR enforces aspects of `Timeline::repartition` that were already
true at runtime:

- it's not called concurrently, so, bail out if it is anyway (see
  comment why it's not called concurrently)
- the `lsn` should never be moving backwards over the lifetime of a
  Timeline object, because last_record_lsn() can only move forwards
  over the lifetime of a Timeline object

part of #6861
2024-02-21 15:52:28 +00:00
John Spray
532b0fa52b Revise CODEOWNERS (#6840)
## Problem

- Current file has ambiguous ownership for some paths
- The /control_plane/attachment_service is storage specific & updates
there don't need to request reviews from other teams.

## Summary of changes

- Define a single owning team per path, so that we can make reviews by
that team mandatory in future.
- Remove the top-level /control_plane as no one specific team owns
neon_local, and we would rarely see a PR that exclusively touches that
path.
- Add an entry for /control_plane/attachment_service, which is newer
storage-specific code.
2024-02-21 15:45:22 +00:00
Arpad Müller
4de2f0f3e0 Implement a sharded time travel recovery endpoint (#6821)
The sharding service didn't have support for S3 disaster recovery.

This PR adds a new endpoint to the attachment service, which is slightly
different from the endpoint on the pageserver, in that it takes the
shard count history of the tenant as json parameters: we need to do
time travel recovery for both the shard count at the target time and the
shard count at the current moment in time, as well as the past shard
counts that either still reference.

Fixes #6604, part of https://github.com/neondatabase/cloud/issues/8233

---------

Co-authored-by: John Spray <john@neon.tech>
2024-02-21 16:35:37 +01:00
Joonas Koivunen
41464325c7 fix: remaining missed cancellations and timeouts (#6843)
As noticed in #6836 some occurances of error conversions were missed in
#6697:
- `std::io::Error` popped up by `tokio::io::copy_buf` containing
`DownloadError` was turned into `DownloadError::Other`
- similarly for secondary downloader errors

These changes come at the loss of pathname context.

Cc: #6096
2024-02-21 15:20:59 +00:00
Joonas Koivunen
7257ffbf75 feat: imitiation_only eviction_task policy (#6598)
mostly reusing the existing and perhaps controversially sharing the
histogram. in practice we don't configure this per-tenant.

Cc: #5331
2024-02-21 16:57:30 +02:00
45 changed files with 978 additions and 1665 deletions

View File

@@ -1,10 +1,10 @@
/compute_tools/ @neondatabase/control-plane @neondatabase/compute
/control_plane/ @neondatabase/compute @neondatabase/storage
/libs/pageserver_api/ @neondatabase/compute @neondatabase/storage
/control_plane/attachment_service @neondatabase/storage
/libs/pageserver_api/ @neondatabase/storage
/libs/postgres_ffi/ @neondatabase/compute
/libs/remote_storage/ @neondatabase/storage
/libs/safekeeper_api/ @neondatabase/safekeepers
/libs/vm_monitor/ @neondatabase/autoscaling @neondatabase/compute
/libs/vm_monitor/ @neondatabase/autoscaling
/pageserver/ @neondatabase/storage
/pgxn/ @neondatabase/compute
/proxy/ @neondatabase/proxy

1
Cargo.lock generated
View File

@@ -284,6 +284,7 @@ dependencies = [
"diesel_migrations",
"futures",
"git-version",
"humantime",
"hyper",
"metrics",
"once_cell",

View File

@@ -18,6 +18,7 @@ clap.workspace = true
futures.workspace = true
git-version.workspace = true
hyper.workspace = true
humantime.workspace = true
once_cell.workspace = true
pageserver_api.workspace = true
pageserver_client.workspace = true

View File

@@ -4,7 +4,7 @@ use hyper::{Body, Request, Response};
use hyper::{StatusCode, Uri};
use pageserver_api::models::{
TenantCreateRequest, TenantLocationConfigRequest, TenantShardSplitRequest,
TimelineCreateRequest,
TenantTimeTravelRequest, TimelineCreateRequest,
};
use pageserver_api::shard::TenantShardId;
use pageserver_client::mgmt_api;
@@ -12,7 +12,7 @@ use std::sync::Arc;
use std::time::{Duration, Instant};
use utils::auth::SwappableJwtAuth;
use utils::http::endpoint::{auth_middleware, request_span};
use utils::http::request::parse_request_param;
use utils::http::request::{must_get_query_param, parse_request_param};
use utils::id::{TenantId, TimelineId};
use utils::{
@@ -180,6 +180,39 @@ async fn handle_tenant_location_config(
)
}
async fn handle_tenant_time_travel_remote_storage(
service: Arc<Service>,
mut req: Request<Body>,
) -> Result<Response<Body>, ApiError> {
let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
let time_travel_req = json_request::<TenantTimeTravelRequest>(&mut req).await?;
let timestamp_raw = must_get_query_param(&req, "travel_to")?;
let _timestamp = humantime::parse_rfc3339(&timestamp_raw).map_err(|_e| {
ApiError::BadRequest(anyhow::anyhow!(
"Invalid time for travel_to: {timestamp_raw:?}"
))
})?;
let done_if_after_raw = must_get_query_param(&req, "done_if_after")?;
let _done_if_after = humantime::parse_rfc3339(&done_if_after_raw).map_err(|_e| {
ApiError::BadRequest(anyhow::anyhow!(
"Invalid time for done_if_after: {done_if_after_raw:?}"
))
})?;
service
.tenant_time_travel_remote_storage(
&time_travel_req,
tenant_id,
timestamp_raw,
done_if_after_raw,
)
.await?;
json_response(StatusCode::OK, ())
}
async fn handle_tenant_delete(
service: Arc<Service>,
req: Request<Body>,
@@ -477,6 +510,9 @@ pub fn make_router(
.put("/v1/tenant/:tenant_id/location_config", |r| {
tenant_service_handler(r, handle_tenant_location_config)
})
.put("/v1/tenant/:tenant_id/time_travel_remote_storage", |r| {
tenant_service_handler(r, handle_tenant_time_travel_remote_storage)
})
// Timeline operations
.delete("/v1/tenant/:tenant_id/timeline/:timeline_id", |r| {
tenant_service_handler(r, handle_tenant_timeline_delete)

View File

@@ -175,10 +175,7 @@ impl Scheduler {
}
}
pub(crate) fn schedule_shard(
&mut self,
hard_exclude: &[NodeId],
) -> Result<NodeId, ScheduleError> {
pub(crate) fn schedule_shard(&self, hard_exclude: &[NodeId]) -> Result<NodeId, ScheduleError> {
if self.nodes.is_empty() {
return Err(ScheduleError::NoPageservers);
}

View File

@@ -1,4 +1,5 @@
use std::{
borrow::Cow,
cmp::Ordering,
collections::{BTreeMap, HashMap, HashSet},
str::FromStr,
@@ -25,7 +26,7 @@ use pageserver_api::{
self, LocationConfig, LocationConfigListResponse, LocationConfigMode, ShardParameters,
TenantConfig, TenantCreateRequest, TenantLocationConfigRequest,
TenantLocationConfigResponse, TenantShardLocation, TenantShardSplitRequest,
TenantShardSplitResponse, TimelineCreateRequest, TimelineInfo,
TenantShardSplitResponse, TenantTimeTravelRequest, TimelineCreateRequest, TimelineInfo,
},
shard::{ShardCount, ShardIdentity, ShardNumber, ShardStripeSize, TenantShardId},
};
@@ -1329,6 +1330,95 @@ impl Service {
Ok(result)
}
pub(crate) async fn tenant_time_travel_remote_storage(
&self,
time_travel_req: &TenantTimeTravelRequest,
tenant_id: TenantId,
timestamp: Cow<'_, str>,
done_if_after: Cow<'_, str>,
) -> Result<(), ApiError> {
let node = {
let locked = self.inner.read().unwrap();
// Just a sanity check to prevent misuse: the API expects that the tenant is fully
// detached everywhere, and nothing writes to S3 storage. Here, we verify that,
// but only at the start of the process, so it's really just to prevent operator
// mistakes.
for (shard_id, shard) in locked.tenants.range(TenantShardId::tenant_range(tenant_id)) {
if shard.intent.get_attached().is_some() || !shard.intent.get_secondary().is_empty()
{
return Err(ApiError::InternalServerError(anyhow::anyhow!(
"We want tenant to be attached in shard with tenant_shard_id={shard_id}"
)));
}
let maybe_attached = shard
.observed
.locations
.iter()
.filter_map(|(node_id, observed_location)| {
observed_location
.conf
.as_ref()
.map(|loc| (node_id, observed_location, loc.mode))
})
.find(|(_, _, mode)| *mode != LocationConfigMode::Detached);
if let Some((node_id, _observed_location, mode)) = maybe_attached {
return Err(ApiError::InternalServerError(anyhow::anyhow!("We observed attached={mode:?} tenant in node_id={node_id} shard with tenant_shard_id={shard_id}")));
}
}
let scheduler = &locked.scheduler;
// Right now we only perform the operation on a single node without parallelization
// TODO fan out the operation to multiple nodes for better performance
let node_id = scheduler.schedule_shard(&[])?;
let node = locked
.nodes
.get(&node_id)
.expect("Pageservers may not be deleted while lock is active");
node.clone()
};
// The shard count is encoded in the remote storage's URL, so we need to handle all historically used shard counts
let mut counts = time_travel_req
.shard_counts
.iter()
.copied()
.collect::<HashSet<_>>()
.into_iter()
.collect::<Vec<_>>();
counts.sort_unstable();
for count in counts {
let shard_ids = (0..count.count())
.map(|i| TenantShardId {
tenant_id,
shard_number: ShardNumber(i),
shard_count: count,
})
.collect::<Vec<_>>();
for tenant_shard_id in shard_ids {
let client =
mgmt_api::Client::new(node.base_url(), self.config.jwt_token.as_deref());
tracing::info!("Doing time travel recovery for shard {tenant_shard_id}",);
client
.tenant_time_travel_remote_storage(
tenant_shard_id,
&timestamp,
&done_if_after,
)
.await
.map_err(|e| {
ApiError::InternalServerError(anyhow::anyhow!(
"Error doing time travel recovery for shard {tenant_shard_id} on node {}: {e}",
node.id
))
})?;
}
}
Ok(())
}
pub(crate) async fn tenant_delete(&self, tenant_id: TenantId) -> Result<StatusCode, ApiError> {
self.ensure_attached_wait(tenant_id).await?;

View File

@@ -495,6 +495,13 @@ impl TenantState {
}
}
for node_id in self.observed.locations.keys() {
if self.intent.attached != Some(*node_id) && !self.intent.secondary.contains(node_id) {
// We have observed state that isn't part of our intent: need to clean it up.
return true;
}
}
// Even if there is no pageserver work to be done, if we have a pending notification to computes,
// wake up a reconciler to send it.
if self.pending_compute_notification {

View File

@@ -616,7 +616,7 @@ async fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::Local
let tenant_id = get_tenant_id(create_match, env)?;
let new_branch_name = create_match
.get_one::<String>("branch-name")
.ok_or_else(|| anyhow!("No branch name provided"))?;
.ok_or_else(|| anyhow!("No branch name provided"))?; // TODO
let pg_version = create_match
.get_one::<u32>("pg-version")

View File

@@ -291,6 +291,7 @@ pub struct TenantConfig {
pub enum EvictionPolicy {
NoEviction,
LayerAccessThreshold(EvictionPolicyLayerAccessThreshold),
OnlyImitiate(EvictionPolicyLayerAccessThreshold),
}
impl EvictionPolicy {
@@ -298,6 +299,7 @@ impl EvictionPolicy {
match self {
EvictionPolicy::NoEviction => "NoEviction",
EvictionPolicy::LayerAccessThreshold(_) => "LayerAccessThreshold",
EvictionPolicy::OnlyImitiate(_) => "OnlyImitiate",
}
}
}
@@ -342,7 +344,7 @@ impl ThrottleConfig {
/// A flattened analog of a `pagesever::tenant::LocationMode`, which
/// lists out all possible states (and the virtual "Detached" state)
/// in a flat form rather than using rust-style enums.
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
#[derive(Serialize, Deserialize, Debug, Clone, Copy, Eq, PartialEq)]
pub enum LocationConfigMode {
AttachedSingle,
AttachedMulti,
@@ -406,6 +408,12 @@ pub struct TenantLocationConfigRequest {
pub config: LocationConfig, // as we have a flattened field, we should reject all unknown fields in it
}
#[derive(Serialize, Deserialize, Debug)]
#[serde(deny_unknown_fields)]
pub struct TenantTimeTravelRequest {
pub shard_counts: Vec<ShardCount>,
}
#[derive(Serialize, Deserialize, Debug)]
#[serde(deny_unknown_fields)]
pub struct TenantShardLocation {
@@ -720,7 +728,6 @@ pub enum PagestreamFeMessage {
GetPage(PagestreamGetPageRequest),
DbSize(PagestreamDbSizeRequest),
GetSlruSegment(PagestreamGetSlruSegmentRequest),
GetVectoredPages(PagestreamGetVectoredPagesRequest),
}
// Wrapped in libpq CopyData
@@ -732,7 +739,6 @@ pub enum PagestreamBeMessage {
Error(PagestreamErrorResponse),
DbSize(PagestreamDbSizeResponse),
GetSlruSegment(PagestreamGetSlruSegmentResponse),
GetVectoredPages(PagestreamGetVectoredPagesResponse),
}
// Keep in sync with `pagestore_client.h`
@@ -744,7 +750,6 @@ enum PagestreamBeMessageTag {
Error = 103,
DbSize = 104,
GetSlruSegment = 105,
GetVectoredPages = 106,
}
impl TryFrom<u8> for PagestreamBeMessageTag {
type Error = u8;
@@ -756,7 +761,6 @@ impl TryFrom<u8> for PagestreamBeMessageTag {
103 => Ok(PagestreamBeMessageTag::Error),
104 => Ok(PagestreamBeMessageTag::DbSize),
105 => Ok(PagestreamBeMessageTag::GetSlruSegment),
106 => Ok(PagestreamBeMessageTag::GetVectoredPages),
_ => Err(value),
}
}
@@ -799,15 +803,6 @@ pub struct PagestreamGetSlruSegmentRequest {
pub segno: u32,
}
#[derive(Debug, PartialEq, Eq)]
pub struct PagestreamGetVectoredPagesRequest {
pub latest: bool,
pub lsn: Lsn,
pub rel: RelTag,
pub blkno: u32,
pub count: u8,
}
#[derive(Debug)]
pub struct PagestreamExistsResponse {
pub exists: bool,
@@ -828,12 +823,6 @@ pub struct PagestreamGetSlruSegmentResponse {
pub segment: Bytes,
}
#[derive(Debug)]
pub struct PagestreamGetVectoredPagesResponse {
pub page_count: u8,
pub pages: Bytes,
}
#[derive(Debug)]
pub struct PagestreamErrorResponse {
pub message: String,
@@ -905,18 +894,6 @@ impl PagestreamFeMessage {
bytes.put_u8(req.kind);
bytes.put_u32(req.segno);
}
Self::GetVectoredPages(req) => {
bytes.put_u8(5);
bytes.put_u8(u8::from(req.latest));
bytes.put_u64(req.lsn.0);
bytes.put_u32(req.rel.spcnode);
bytes.put_u32(req.rel.dbnode);
bytes.put_u32(req.rel.relnode);
bytes.put_u8(req.rel.forknum);
bytes.put_u32(req.blkno);
bytes.put_u8(req.count);
}
}
bytes.into()
@@ -975,20 +952,6 @@ impl PagestreamFeMessage {
segno: body.read_u32::<BigEndian>()?,
},
)),
5 => Ok(PagestreamFeMessage::GetVectoredPages(
PagestreamGetVectoredPagesRequest {
latest: body.read_u8()? != 0,
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
rel: RelTag {
spcnode: body.read_u32::<BigEndian>()?,
dbnode: body.read_u32::<BigEndian>()?,
relnode: body.read_u32::<BigEndian>()?,
forknum: body.read_u8()?,
},
blkno: body.read_u32::<BigEndian>()?,
count: body.read_u8()?,
},
)),
_ => bail!("unknown smgr message tag: {:?}", msg_tag),
}
}
@@ -1030,12 +993,6 @@ impl PagestreamBeMessage {
bytes.put_u32((resp.segment.len() / BLCKSZ as usize) as u32);
bytes.put(&resp.segment[..]);
}
Self::GetVectoredPages(resp) => {
bytes.put_u8(Tag::GetVectoredPages as u8);
bytes.put_u8(resp.page_count);
bytes.put(&resp.pages[..]);
}
}
bytes.into()
@@ -1084,15 +1041,6 @@ impl PagestreamBeMessage {
segment: segment.into(),
})
}
Tag::GetVectoredPages => {
let page_count = buf.read_u8()?;
let mut pages = vec![0; page_count as usize * 8192];
buf.read_exact(&mut pages)?;
Self::GetVectoredPages(PagestreamGetVectoredPagesResponse {
page_count,
pages: pages.into(),
})
}
};
let remaining = buf.into_inner();
if !remaining.is_empty() {
@@ -1112,7 +1060,6 @@ impl PagestreamBeMessage {
Self::Error(_) => "Error",
Self::DbSize(_) => "DbSize",
Self::GetSlruSegment(_) => "GetSlruSegment",
Self::GetVectoredPages(_) => "GetVectoredPages",
}
}
}

View File

@@ -44,6 +44,26 @@ impl DownloadError {
}
}
impl From<std::io::Error> for DownloadError {
fn from(value: std::io::Error) -> Self {
let needs_unwrap = value.kind() == std::io::ErrorKind::Other
&& value
.get_ref()
.and_then(|x| x.downcast_ref::<DownloadError>())
.is_some();
if needs_unwrap {
*value
.into_inner()
.expect("just checked")
.downcast::<DownloadError>()
.expect("just checked")
} else {
DownloadError::Other(value.into())
}
}
}
#[derive(Debug)]
pub enum TimeTravelError {
/// Validation or other error happened due to user input.
@@ -142,13 +162,12 @@ impl std::fmt::Display for TimeoutOrCancel {
impl std::error::Error for TimeoutOrCancel {}
impl TimeoutOrCancel {
pub fn caused(error: &anyhow::Error) -> Option<&Self> {
error.root_cause().downcast_ref()
}
/// Returns true if the error was caused by [`TimeoutOrCancel::Cancel`].
pub fn caused_by_cancel(error: &anyhow::Error) -> bool {
Self::caused(error).is_some_and(Self::is_cancel)
error
.root_cause()
.downcast_ref::<Self>()
.is_some_and(Self::is_cancel)
}
pub fn is_cancel(&self) -> bool {

View File

@@ -73,6 +73,8 @@ where
if !*this.hit {
if let Poll::Ready(e) = this.cancellation.poll(cx) {
*this.hit = true;
// most likely this will be a std::io::Error wrapping a DownloadError
let e = Err(std::io::Error::from(e));
return Poll::Ready(Some(e));
}
@@ -130,6 +132,8 @@ mod tests {
.is_some_and(|e| matches!(e, DownloadError::Cancelled)),
"{inner:?}"
);
let e = DownloadError::from(e);
assert!(matches!(e, DownloadError::Cancelled), "{e:?}");
tokio::select! {
_ = stream.next() => unreachable!("no timeout ever happens as we were already cancelled"),
@@ -146,7 +150,7 @@ mod tests {
let stream = DownloadStream::new(cancel_or_timeout(timeout, cancel.clone()), inner);
let mut stream = std::pin::pin!(stream);
// because the stream uses 120s timeout we are paused, we advance to 120s right away.
// because the stream uses 120s timeout and we are paused, we advance to 120s right away.
let first = stream.next();
let e = first.await.expect("there must be some").unwrap_err();
@@ -158,6 +162,8 @@ mod tests {
.is_some_and(|e| matches!(e, DownloadError::Timeout)),
"{inner:?}"
);
let e = DownloadError::from(e);
assert!(matches!(e, DownloadError::Timeout), "{e:?}");
cancel.cancel();

View File

@@ -217,6 +217,20 @@ impl Client {
}
}
pub async fn tenant_time_travel_remote_storage(
&self,
tenant_shard_id: TenantShardId,
timestamp: &str,
done_if_after: &str,
) -> Result<()> {
let uri = format!(
"{}/v1/tenant/{tenant_shard_id}/time_travel_remote_storage?travel_to={timestamp}&done_if_after={done_if_after}",
self.mgmt_api_endpoint
);
self.request(Method::PUT, &uri, ()).await?;
Ok(())
}
pub async fn tenant_config(&self, req: &TenantConfigRequest) -> Result<()> {
let uri = format!("{}/v1/tenant/config", self.mgmt_api_endpoint);
self.request(Method::PUT, &uri, req).await?;

View File

@@ -4,8 +4,7 @@ use futures::SinkExt;
use pageserver_api::{
models::{
PagestreamBeMessage, PagestreamFeMessage, PagestreamGetPageRequest,
PagestreamGetPageResponse, PagestreamGetVectoredPagesRequest,
PagestreamGetVectoredPagesResponse,
PagestreamGetPageResponse,
},
reltag::RelTag,
};
@@ -158,39 +157,7 @@ impl PagestreamClient {
PagestreamBeMessage::Exists(_)
| PagestreamBeMessage::Nblocks(_)
| PagestreamBeMessage::DbSize(_)
| PagestreamBeMessage::GetSlruSegment(_)
| PagestreamBeMessage::GetVectoredPages(_) => {
anyhow::bail!(
"unexpected be message kind in response to getpage request: {}",
msg.kind()
)
}
}
}
pub async fn getpages(
&mut self,
req: PagestreamGetVectoredPagesRequest,
) -> anyhow::Result<PagestreamGetVectoredPagesResponse> {
let req = PagestreamFeMessage::GetVectoredPages(req);
let req: bytes::Bytes = req.serialize();
// let mut req = tokio_util::io::ReaderStream::new(&req);
let mut req = tokio_stream::once(Ok(req));
self.copy_both.send_all(&mut req).await?;
let next: Option<Result<bytes::Bytes, _>> = self.copy_both.next().await;
let next: bytes::Bytes = next.unwrap()?;
let msg = PagestreamBeMessage::deserialize(next)?;
match msg {
PagestreamBeMessage::GetVectoredPages(p) => Ok(p),
PagestreamBeMessage::Error(e) => anyhow::bail!("Error: {:?}", e),
PagestreamBeMessage::Exists(_)
| PagestreamBeMessage::Nblocks(_)
| PagestreamBeMessage::DbSize(_)
| PagestreamBeMessage::GetSlruSegment(_)
| PagestreamBeMessage::GetPage(_) => {
| PagestreamBeMessage::GetSlruSegment(_) => {
anyhow::bail!(
"unexpected be message kind in response to getpage request: {}",
msg.kind()

View File

@@ -8,7 +8,7 @@ use utils::lsn::Lsn;
use rand::prelude::*;
use tokio::sync::Barrier;
use tokio::task::JoinSet;
use tracing::{info, instrument};
use tracing::{debug, info, instrument};
use std::collections::HashMap;
use std::num::NonZeroUsize;
@@ -28,8 +28,6 @@ pub(crate) struct Args {
#[clap(long, default_value = "localhost:64000")]
page_service_host_port: String,
#[clap(long)]
page_service_connstring: Option<String>,
#[clap(long)]
pageserver_jwt: Option<String>,
#[clap(long, default_value = "1")]
num_clients: NonZeroUsize,
@@ -232,17 +230,12 @@ async fn client(
) {
start_work_barrier.wait().await;
let connstr = match &args.page_service_connstring {
Some(connstr) => connstr.clone(),
None => crate::util::connstring::connstring(
&args.page_service_host_port,
args.pageserver_jwt.as_deref(),
),
};
let client = pageserver_client::page_service::Client::new(connstr)
.await
.unwrap();
let client = pageserver_client::page_service::Client::new(crate::util::connstring::connstring(
&args.page_service_host_port,
args.pageserver_jwt.as_deref(),
))
.await
.unwrap();
while let Some(Work { lsn, gzip }) = work.recv().await {
let start = Instant::now();
@@ -270,7 +263,7 @@ async fn client(
}
})
.await;
info!("basebackup size is {} bytes", size.load(Ordering::Relaxed));
debug!("basebackup size is {} bytes", size.load(Ordering::Relaxed));
let elapsed = start.elapsed();
live_stats.inc();
STATS.with(|stats| {

View File

@@ -2,7 +2,7 @@ use anyhow::Context;
use camino::Utf8PathBuf;
use pageserver_api::key::{is_rel_block_key, key_to_rel_block, Key};
use pageserver_api::keyspace::KeySpaceAccum;
use pageserver_api::models::{PagestreamGetPageRequest, PagestreamGetVectoredPagesRequest};
use pageserver_api::models::PagestreamGetPageRequest;
use tokio_util::sync::CancellationToken;
use utils::id::TenantTimelineId;
@@ -57,8 +57,6 @@ pub(crate) struct Args {
/// [`pageserver_api::models::virtual_file::IoEngineKind`].
#[clap(long)]
set_io_engine: Option<pageserver_api::models::virtual_file::IoEngineKind>,
#[clap(long)]
vectored_read_size: Option<u8>,
targets: Option<Vec<TenantTimelineId>>,
}
@@ -301,45 +299,22 @@ async fn main_impl(
}
let start = Instant::now();
if let Some(size) = args.vectored_read_size {
assert!(size > 0);
let req = {
let mut rng = rand::thread_rng();
let r = &ranges[weights.sample(&mut rng)];
let key: i128 = rng.gen_range(r.start..r.end);
let key = Key::from_i128(key);
assert!(is_rel_block_key(&key));
let (rel_tag, block_no) =
key_to_rel_block(key).expect("we filter non-rel-block keys out above");
PagestreamGetVectoredPagesRequest {
latest: rng.gen_bool(args.req_latest_probability),
lsn: r.timeline_lsn,
rel: rel_tag,
blkno: block_no,
count: size
}
};
client.getpages(req).await.unwrap();
} else {
let req = {
let mut rng = rand::thread_rng();
let r = &ranges[weights.sample(&mut rng)];
let key: i128 = rng.gen_range(r.start..r.end);
let key = Key::from_i128(key);
assert!(is_rel_block_key(&key));
let (rel_tag, block_no) =
key_to_rel_block(key).expect("we filter non-rel-block keys out above");
PagestreamGetPageRequest {
latest: rng.gen_bool(args.req_latest_probability),
lsn: r.timeline_lsn,
rel: rel_tag,
blkno: block_no,
}
};
client.getpage(req).await.unwrap();
}
let req = {
let mut rng = rand::thread_rng();
let r = &ranges[weights.sample(&mut rng)];
let key: i128 = rng.gen_range(r.start..r.end);
let key = Key::from_i128(key);
assert!(is_rel_block_key(&key));
let (rel_tag, block_no) =
key_to_rel_block(key).expect("we filter non-rel-block keys out above");
PagestreamGetPageRequest {
latest: rng.gen_bool(args.req_latest_probability),
lsn: r.timeline_lsn,
rel: rel_tag,
blkno: block_no,
}
};
client.getpage(req).await.unwrap();
let end = Instant::now();
live_stats.request_done();
ticks_processed += 1;

View File

@@ -143,7 +143,6 @@ where
ar: &'a mut Builder<&'b mut W>,
buf: Vec<u8>,
current_segment: Option<(SlruKind, u32)>,
total_blocks: usize,
}
impl<'a, 'b, W> SlruSegmentsBuilder<'a, 'b, W>
@@ -155,7 +154,6 @@ where
ar,
buf: Vec::new(),
current_segment: None,
total_blocks: 0,
}
}
@@ -201,8 +199,7 @@ where
let header = new_tar_header(&segname, self.buf.len() as u64)?;
self.ar.append(&header, self.buf.as_slice()).await?;
self.total_blocks += nblocks;
debug!("Added to basebackup slru {} relsize {}", segname, nblocks);
trace!("Added to basebackup slru {} relsize {}", segname, nblocks);
self.buf.clear();
@@ -210,15 +207,11 @@ where
}
async fn finish(mut self) -> anyhow::Result<()> {
let res = if self.current_segment.is_none() || self.buf.is_empty() {
Ok(())
} else {
self.flush().await
};
if self.current_segment.is_none() || self.buf.is_empty() {
return Ok(());
}
info!("Collected {} SLRU blocks", self.total_blocks);
res
self.flush().await
}
}

View File

@@ -87,10 +87,6 @@ pub mod defaults {
pub const DEFAULT_GET_VECTORED_IMPL: &str = "sequential";
pub const DEFAULT_MAX_VECTORED_READ_SIZE: usize = 128 * 1024; // 128 KiB
pub const DEFAULT_VALIDATE_VECTORED_GET: bool = true;
///
/// Default built-in configuration file.
///
@@ -130,10 +126,6 @@ pub mod defaults {
#get_vectored_impl = '{DEFAULT_GET_VECTORED_IMPL}'
#max_vectored_read_size = '{DEFAULT_MAX_VECTORED_READ_SIZE}'
#validate_vectored_get = '{DEFAULT_VALIDATE_VECTORED_GET}'
[tenant_config]
#checkpoint_distance = {DEFAULT_CHECKPOINT_DISTANCE} # in bytes
#checkpoint_timeout = {DEFAULT_CHECKPOINT_TIMEOUT}
@@ -271,10 +263,6 @@ pub struct PageServerConf {
pub virtual_file_io_engine: virtual_file::IoEngineKind,
pub get_vectored_impl: GetVectoredImpl,
pub max_vectored_read_size: usize,
pub validate_vectored_get: bool,
}
/// We do not want to store this in a PageServerConf because the latter may be logged
@@ -363,10 +351,6 @@ struct PageServerConfigBuilder {
virtual_file_io_engine: BuilderValue<virtual_file::IoEngineKind>,
get_vectored_impl: BuilderValue<GetVectoredImpl>,
max_vectored_read_size: BuilderValue<usize>,
validate_vectored_get: BuilderValue<bool>,
}
impl Default for PageServerConfigBuilder {
@@ -446,8 +430,6 @@ impl Default for PageServerConfigBuilder {
virtual_file_io_engine: Set(DEFAULT_VIRTUAL_FILE_IO_ENGINE.parse().unwrap()),
get_vectored_impl: Set(DEFAULT_GET_VECTORED_IMPL.parse().unwrap()),
max_vectored_read_size: Set(DEFAULT_MAX_VECTORED_READ_SIZE),
validate_vectored_get: Set(DEFAULT_VALIDATE_VECTORED_GET),
}
}
}
@@ -612,14 +594,6 @@ impl PageServerConfigBuilder {
self.get_vectored_impl = BuilderValue::Set(value);
}
pub fn get_max_vectored_read_size(&mut self, value: usize) {
self.max_vectored_read_size = BuilderValue::Set(value);
}
pub fn get_validate_vectored_get(&mut self, value: bool) {
self.validate_vectored_get = BuilderValue::Set(value);
}
pub fn build(self) -> anyhow::Result<PageServerConf> {
let concurrent_tenant_warmup = self
.concurrent_tenant_warmup
@@ -733,12 +707,6 @@ impl PageServerConfigBuilder {
get_vectored_impl: self
.get_vectored_impl
.ok_or(anyhow!("missing get_vectored_impl"))?,
max_vectored_read_size: self
.max_vectored_read_size
.ok_or(anyhow!("missing max_vectored_read_size"))?,
validate_vectored_get: self
.validate_vectored_get
.ok_or(anyhow!("missing validate_vectored_get"))?,
})
}
}
@@ -996,12 +964,6 @@ impl PageServerConf {
"get_vectored_impl" => {
builder.get_vectored_impl(parse_toml_from_str("get_vectored_impl", item)?)
}
"max_vectored_read_size" => {
builder.get_max_vectored_read_size(parse_toml_u64("max_vectored_read_size", item)? as usize)
}
"validate_vectored_get" => {
builder.get_validate_vectored_get(parse_toml_bool("validate_vectored_get", item)?)
}
_ => bail!("unrecognized pageserver option '{key}'"),
}
}
@@ -1077,8 +1039,6 @@ impl PageServerConf {
ingest_batch_size: defaults::DEFAULT_INGEST_BATCH_SIZE,
virtual_file_io_engine: DEFAULT_VIRTUAL_FILE_IO_ENGINE.parse().unwrap(),
get_vectored_impl: defaults::DEFAULT_GET_VECTORED_IMPL.parse().unwrap(),
max_vectored_read_size: defaults::DEFAULT_MAX_VECTORED_READ_SIZE,
validate_vectored_get: defaults::DEFAULT_VALIDATE_VECTORED_GET,
}
}
}
@@ -1313,8 +1273,6 @@ background_task_maximum_delay = '334 s'
ingest_batch_size: defaults::DEFAULT_INGEST_BATCH_SIZE,
virtual_file_io_engine: DEFAULT_VIRTUAL_FILE_IO_ENGINE.parse().unwrap(),
get_vectored_impl: defaults::DEFAULT_GET_VECTORED_IMPL.parse().unwrap(),
max_vectored_read_size: defaults::DEFAULT_MAX_VECTORED_READ_SIZE,
validate_vectored_get: defaults::DEFAULT_VALIDATE_VECTORED_GET,
},
"Correct defaults should be used when no config values are provided"
);
@@ -1380,8 +1338,6 @@ background_task_maximum_delay = '334 s'
ingest_batch_size: 100,
virtual_file_io_engine: DEFAULT_VIRTUAL_FILE_IO_ENGINE.parse().unwrap(),
get_vectored_impl: defaults::DEFAULT_GET_VECTORED_IMPL.parse().unwrap(),
max_vectored_read_size: defaults::DEFAULT_MAX_VECTORED_READ_SIZE,
validate_vectored_get: defaults::DEFAULT_VALIDATE_VECTORED_GET,
},
"Should be able to parse all basic config values correctly"
);
@@ -1616,17 +1572,50 @@ threshold = "20m"
eviction_order: crate::disk_usage_eviction_task::EvictionOrder::AbsoluteAccessed,
})
);
match &conf.default_tenant_conf.eviction_policy {
EvictionPolicy::NoEviction => panic!("Unexpected eviction opolicy tenant settings"),
EvictionPolicy::LayerAccessThreshold(eviction_thresold) => {
assert_eq!(eviction_thresold.period, Duration::from_secs(20 * 60));
assert_eq!(eviction_thresold.threshold, Duration::from_secs(20 * 60));
EvictionPolicy::LayerAccessThreshold(eviction_threshold) => {
assert_eq!(eviction_threshold.period, Duration::from_secs(20 * 60));
assert_eq!(eviction_threshold.threshold, Duration::from_secs(20 * 60));
}
other => unreachable!("Unexpected eviction policy tenant settings: {other:?}"),
}
Ok(())
}
#[test]
fn parse_imitation_only_pageserver_config() {
let tempdir = tempdir().unwrap();
let (workdir, pg_distrib_dir) = prepare_fs(&tempdir).unwrap();
let pageserver_conf_toml = format!(
r#"pg_distrib_dir = "{pg_distrib_dir}"
metric_collection_endpoint = "http://sample.url"
metric_collection_interval = "10min"
id = 222
[tenant_config]
evictions_low_residence_duration_metric_threshold = "20m"
[tenant_config.eviction_policy]
kind = "OnlyImitiate"
period = "20m"
threshold = "20m"
"#,
);
let toml: Document = pageserver_conf_toml.parse().unwrap();
let conf = PageServerConf::parse_and_validate(&toml, &workdir).unwrap();
match &conf.default_tenant_conf.eviction_policy {
EvictionPolicy::OnlyImitiate(t) => {
assert_eq!(t.period, Duration::from_secs(20 * 60));
assert_eq!(t.threshold, Duration::from_secs(20 * 60));
}
other => unreachable!("Unexpected eviction policy tenant settings: {other:?}"),
}
}
fn prepare_fs(tempdir: &Utf8TempDir) -> anyhow::Result<(Utf8PathBuf, Utf8PathBuf)> {
let tempdir_path = tempdir.path();

View File

@@ -17,8 +17,6 @@ use futures::stream::FuturesUnordered;
use futures::Stream;
use futures::StreamExt;
use pageserver_api::key::Key;
use pageserver_api::models::PagestreamGetVectoredPagesRequest;
use pageserver_api::models::PagestreamGetVectoredPagesResponse;
use pageserver_api::models::TenantState;
use pageserver_api::models::{
PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
@@ -73,7 +71,6 @@ use crate::tenant::mgr;
use crate::tenant::mgr::get_active_tenant_with_timeout;
use crate::tenant::mgr::GetActiveTenantError;
use crate::tenant::mgr::ShardSelector;
use crate::tenant::timeline::GetVectoredError;
use crate::tenant::timeline::WaitLsnError;
use crate::tenant::GetTimelineError;
use crate::tenant::PageReconstructError;
@@ -337,10 +334,6 @@ enum PageStreamError {
#[error("Read error")]
Read(#[source] PageReconstructError),
/// Something went wrong reading a page: this likely indicates a pageserver bug
#[error("Vectored read error")]
VectoredRead(#[source] GetVectoredError),
/// Ran out of time waiting for an LSN
#[error("LSN timeout: {0}")]
LsnTimeout(WaitLsnError),
@@ -364,15 +357,6 @@ impl From<PageReconstructError> for PageStreamError {
}
}
impl From<GetVectoredError> for PageStreamError {
fn from(value: GetVectoredError) -> Self {
match value {
GetVectoredError::Cancelled => Self::Shutdown,
e => Self::VectoredRead(e),
}
}
}
impl From<GetActiveTimelineError> for PageStreamError {
fn from(value: GetActiveTimelineError) -> Self {
match value {
@@ -682,15 +666,6 @@ impl PageServerHandler {
span,
)
}
PagestreamFeMessage::GetVectoredPages(req) => {
let span = tracing::info_span!("handle_get_vectored_pages_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.lsn, req_count = %req.count);
(
self.handle_get_pages_at_lsn_request(tenant_id, timeline_id, &req, &ctx)
.instrument(span.clone())
.await,
span,
)
}
};
match response {
@@ -1186,80 +1161,6 @@ impl PageServerHandler {
}))
}
#[instrument(skip_all, fields(shard_id))]
async fn handle_get_pages_at_lsn_request(
&mut self,
tenant_id: TenantId,
timeline_id: TimelineId,
req: &PagestreamGetVectoredPagesRequest,
ctx: &RequestContext,
) -> Result<PagestreamBeMessage, PageStreamError> {
// This is cheeky and relies on not using sharding :)
// A real solution has to split the requested key sequence between shards.
let get_page_request = PagestreamGetPageRequest {
latest: req.latest,
lsn: req.lsn,
rel: req.rel,
blkno: req.blkno,
};
let timeline = match self.get_cached_timeline_for_page(&get_page_request) {
Ok(tl) => tl,
Err(key) => {
match self
.load_timeline_for_page(tenant_id, timeline_id, key)
.await
{
Ok(t) => t,
Err(GetActiveTimelineError::Tenant(GetActiveTenantError::NotFound(_))) => {
// We already know this tenant exists in general, because we resolved it at
// start of connection. Getting a NotFound here indicates that the shard containing
// the requested page is not present on this node: the client's knowledge of shard->pageserver
// mapping is out of date.
//
// Closing the connection by returning ``::Reconnect` has the side effect of rate-limiting above message, via
// client's reconnect backoff, as well as hopefully prompting the client to load its updated configuration
// and talk to a different pageserver.
return Err(PageStreamError::Reconnect(
"getpage@lsn request routed to wrong shard".into(),
));
}
Err(e) => return Err(e.into()),
}
}
};
// load_timeline_for_page sets shard_id, but get_cached_timeline_for_page doesn't
set_tracing_field_shard_id(timeline);
let _timer = timeline
.query_metrics
.start_timer(metrics::SmgrQueryType::GetPageAtLsn);
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let lsn =
Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
.await?;
let (page_count, pages_buf) = timeline
.get_rel_pages_at_lsn(
req.rel,
req.blkno,
req.count,
Version::Lsn(lsn),
req.latest,
ctx,
)
.await?;
Ok(PagestreamBeMessage::GetVectoredPages(
PagestreamGetVectoredPagesResponse {
page_count,
pages: pages_buf,
},
))
}
#[instrument(skip_all, fields(shard_id))]
async fn handle_get_slru_segment_request(
&mut self,

View File

@@ -11,9 +11,8 @@ use crate::context::RequestContext;
use crate::keyspace::{KeySpace, KeySpaceAccum};
use crate::repository::*;
use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id;
use crate::tenant::timeline::GetVectoredError;
use crate::walrecord::NeonWalRecord;
use anyhow::{anyhow, ensure, Context};
use anyhow::{ensure, Context};
use bytes::{Buf, Bytes, BytesMut};
use enum_map::Enum;
use itertools::Itertools;
@@ -28,7 +27,7 @@ use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
use postgres_ffi::BLCKSZ;
use postgres_ffi::{Oid, TimestampTz, TransactionId};
use serde::{Deserialize, Serialize};
use std::collections::{hash_map, BTreeMap, HashMap, HashSet};
use std::collections::{hash_map, HashMap, HashSet};
use std::ops::ControlFlow;
use std::ops::Range;
use strum::IntoEnumIterator;
@@ -199,41 +198,6 @@ impl Timeline {
version.get(self, key, ctx).await
}
pub(crate) async fn get_rel_pages_at_lsn(
&self,
tag: RelTag,
blknum: BlockNumber,
count: u8,
version: Version<'_>,
latest: bool,
ctx: &RequestContext,
) -> Result<(u8, Bytes), GetVectoredError> {
if tag.relnode == 0 {
return Err(GetVectoredError::Other(
RelationError::InvalidRelnode.into(),
));
}
let nblocks = self
.get_rel_size(tag, version, latest, ctx)
.await
.map_err(|e| GetVectoredError::Other(anyhow!(e)))?;
if blknum + (count - 1) as u32 >= nblocks {
debug!(
"read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
tag,
blknum,
version.get_lsn(),
nblocks
);
return Ok((1, ZERO_PAGE.clone()));
}
let start_key = rel_block_to_key(tag, blknum);
let end_key = start_key.add(count as u32);
version.get_vectored(self, start_key..end_key, ctx).await
}
// Get size of a database in blocks
pub(crate) async fn get_db_size(
&self,
@@ -1645,55 +1609,6 @@ impl<'a> DatadirModification<'a> {
self.tline.get(key, lsn, ctx).await
}
async fn get_vectored(
&self,
key_range: Range<Key>,
ctx: &RequestContext,
) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
// Have we already updated the same key? Read the latest pending updated
// version in that case.
//
// Note: we don't check pending_deletions. It is an error to request a
// value that has been removed, deletion only avoids leaking storage.
let mut results: BTreeMap<Key, Result<Bytes, PageReconstructError>> = BTreeMap::new();
let mut keys_in_modification = KeySpaceAccum::new();
let key = key_range.start;
while key != key_range.end {
if let Some(values) = self.pending_updates.get(&key) {
if let Some((_, value)) = values.last() {
keys_in_modification.add_key(key);
match value {
Value::Image(img) => {
results.insert(key, Ok(img.clone()));
}
_ => {
results.insert(
key,
Err(PageReconstructError::from(anyhow::anyhow!(
"unexpected pending WAL record"
))),
);
}
}
}
}
}
let lsn = Lsn::max(self.tline.get_last_record_lsn(), self.lsn);
let mut keyspace = KeySpace {
ranges: vec![key_range],
};
keyspace.remove_overlapping_with(&keys_in_modification.to_keyspace());
let pages = self.tline.get_vectored(keyspace, lsn, ctx).await?;
results.extend(pages.into_iter());
Ok(results)
}
fn put(&mut self, key: Key, val: Value) {
let values = self.pending_updates.entry(key).or_default();
// Replace the previous value if it exists at the same lsn
@@ -1737,43 +1652,6 @@ impl<'a> Version<'a> {
}
}
async fn get_vectored(
&self,
timeline: &Timeline,
key_range: Range<Key>,
ctx: &RequestContext,
) -> Result<(u8, Bytes), GetVectoredError> {
let pages = match self {
Version::Lsn(lsn) => {
timeline
.get_vectored(
KeySpace {
ranges: vec![key_range],
},
*lsn,
ctx,
)
.await
}
Version::Modified(modification) => modification.get_vectored(key_range, ctx).await,
}?;
let mut buf = BytesMut::new();
let page_count: u8 = pages.len().try_into().expect("too many pages returned");
for page in pages {
match page {
(_key, Ok(bytes)) => {
buf.extend_from_slice(&bytes[..]);
}
(_key, Err(err)) => {
return Err(GetVectoredError::Other(anyhow!(err)));
}
}
}
Ok((page_count, buf.freeze()))
}
fn get_lsn(&self) -> Lsn {
match self {
Version::Lsn(lsn) => *lsn,

View File

@@ -146,7 +146,6 @@ macro_rules! pausable_failpoint {
pub mod blob_io;
pub mod block_io;
pub mod vectored_blob_io;
pub mod disk_btree;
pub(crate) mod ephemeral_file;

View File

@@ -61,6 +61,8 @@ use utils::lsn::Lsn;
use historic_layer_coverage::BufferedHistoricLayerCoverage;
pub use historic_layer_coverage::LayerKey;
pub(crate) use self::historic_layer_coverage::RebuildVersion;
use super::storage_layer::PersistentLayerDesc;
///
@@ -500,7 +502,7 @@ impl LayerMap {
///
/// Helper function for BatchedUpdates::remove_historic
///
pub fn remove_historic_noflush(&mut self, layer_desc: &PersistentLayerDesc) {
pub(self) fn remove_historic_noflush(&mut self, layer_desc: &PersistentLayerDesc) {
self.historic
.remove(historic_layer_coverage::LayerKey::from(layer_desc));
let layer_key = layer_desc.key();
@@ -525,6 +527,10 @@ impl LayerMap {
self.historic.rebuild();
}
pub fn get_rebuild_version(&self) -> RebuildVersion {
self.historic.get_rebuild_version()
}
/// Is there a newer image layer for given key- and LSN-range? Or a set
/// of image layers within the specified lsn range that cover the entire
/// specified key range?

View File

@@ -413,6 +413,8 @@ fn test_persistent_overlapping() {
/// See this for more on persistent and retroactive techniques:
/// <https://www.youtube.com/watch?v=WqCWghETNDc&t=581s>
pub struct BufferedHistoricLayerCoverage<Value> {
rebuild_version: RebuildVersion,
/// A persistent layer map that we rebuild when we need to retroactively update
historic_coverage: HistoricLayerCoverage<Value>,
@@ -438,9 +440,33 @@ impl<T: Clone> Default for BufferedHistoricLayerCoverage<T> {
}
}
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct RebuildVersion(u64);
impl RebuildVersion {
fn inc(&mut self) {
self.0
.checked_add(1)
.expect("at current clock cycles, we won't hit this");
}
}
impl Default for RebuildVersion {
fn default() -> Self {
RebuildVersion(1)
}
}
impl std::fmt::Display for RebuildVersion {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}
impl<Value: Clone> BufferedHistoricLayerCoverage<Value> {
pub fn new() -> Self {
Self {
rebuild_version: RebuildVersion::default(),
historic_coverage: HistoricLayerCoverage::<Value>::new(),
buffer: BTreeMap::new(),
layers: BTreeMap::new(),
@@ -462,6 +488,8 @@ impl<Value: Clone> BufferedHistoricLayerCoverage<Value> {
None => return, // No need to rebuild if buffer is empty
};
self.rebuild_version.inc();
// Apply buffered updates to self.layers
let num_updates = self.buffer.len();
self.buffer.retain(|layer_key, layer| {
@@ -493,11 +521,17 @@ impl<Value: Clone> BufferedHistoricLayerCoverage<Value> {
// TODO maybe only warn if ratio is at least 10
info!(
version = %self.rebuild_version,
"Rebuilt layer map. Did {} insertions to process a batch of {} updates.",
num_inserted, num_updates,
num_inserted,
num_updates,
)
}
pub fn get_rebuild_version(&self) -> RebuildVersion {
self.rebuild_version
}
/// Iterate all the layers
pub fn iter(&self) -> impl '_ + Iterator<Item = Value> {
// NOTE we can actually perform this without rebuilding,

View File

@@ -88,14 +88,7 @@ pub async fn download_layer_file<'a>(
let mut reader = tokio_util::io::StreamReader::new(download.download_stream);
let bytes_amount = tokio::io::copy_buf(&mut reader, &mut destination_file)
.await
.with_context(|| {
format!(
"download layer at remote path '{remote_path:?}' into file {temp_file_path:?}"
)
})
.map_err(DownloadError::Other);
let bytes_amount = tokio::io::copy_buf(&mut reader, &mut destination_file).await;
match bytes_amount {
Ok(bytes_amount) => {
@@ -107,7 +100,7 @@ pub async fn download_layer_file<'a>(
on_fatal_io_error(&e, &format!("Removing temporary file {temp_file_path}"));
}
Err(e)
Err(e.into())
}
}
},
@@ -245,10 +238,7 @@ async fn do_download_index_part(
let stream = download.download_stream;
let mut stream = StreamReader::new(stream);
tokio::io::copy_buf(&mut stream, &mut bytes)
.await
.with_context(|| format!("download index part at {remote_path:?}"))
.map_err(DownloadError::Other)?;
tokio::io::copy_buf(&mut stream, &mut bytes).await?;
Ok(bytes)
},
@@ -428,14 +418,7 @@ pub(crate) async fn download_initdb_tar_zst(
let mut download = tokio_util::io::StreamReader::new(download.download_stream);
let mut writer = tokio::io::BufWriter::with_capacity(super::BUFFER_SIZE, file);
// TODO: this consumption of the response body should be subject to timeout + cancellation, but
// not without thinking carefully about how to recover safely from cancelling a write to
// local storage (e.g. by writing into a temp file as we do in download_layer)
// FIXME: flip the weird error wrapping
tokio::io::copy_buf(&mut download, &mut writer)
.await
.with_context(|| format!("download initdb.tar.zst at {remote_path:?}"))
.map_err(DownloadError::Other)?;
tokio::io::copy_buf(&mut download, &mut writer).await?;
let mut file = writer.into_inner();

View File

@@ -438,8 +438,14 @@ impl From<std::io::Error> for UpdateError {
fn from(value: std::io::Error) -> Self {
if let Some(nix::errno::Errno::ENOSPC) = value.raw_os_error().map(nix::errno::from_i32) {
UpdateError::NoSpace
} else if value
.get_ref()
.and_then(|x| x.downcast_ref::<DownloadError>())
.is_some()
{
UpdateError::from(DownloadError::from(value))
} else {
// An I/O error from e.g. tokio::io::copy is most likely a remote storage issue
// An I/O error from e.g. tokio::io::copy_buf is most likely a remote storage issue
UpdateError::Other(anyhow::anyhow!(value))
}
}
@@ -672,20 +678,17 @@ impl<'a> TenantDownloader<'a> {
.await
{
Ok(bytes) => bytes,
Err(e) => {
if let DownloadError::NotFound = e {
// A heatmap might be out of date and refer to a layer that doesn't exist any more.
// This is harmless: continue to download the next layer. It is expected during compaction
// GC.
tracing::debug!(
"Skipped downloading missing layer {}, raced with compaction/gc?",
layer.name
);
continue;
} else {
return Err(e.into());
}
Err(DownloadError::NotFound) => {
// A heatmap might be out of date and refer to a layer that doesn't exist any more.
// This is harmless: continue to download the next layer. It is expected during compaction
// GC.
tracing::debug!(
"Skipped downloading missing layer {}, raced with compaction/gc?",
layer.name
);
continue;
}
Err(e) => return Err(e.into()),
};
if downloaded_bytes != layer.metadata.file_size {

View File

@@ -329,20 +329,10 @@ impl ReadableLayerDesc {
ctx: &RequestContext,
) -> Result<(), GetVectoredError> {
match self {
ReadableLayerDesc::Persistent {
desc,
lsn_floor,
lsn_ceil,
} => {
ReadableLayerDesc::Persistent { desc, lsn_ceil, .. } => {
let layer = layer_manager.get_from_desc(desc);
layer
.get_values_reconstruct_data(
keyspace,
*lsn_floor,
*lsn_ceil,
reconstruct_state,
ctx,
)
.get_values_reconstruct_data(keyspace, *lsn_ceil, reconstruct_state, ctx)
.await
}
ReadableLayerDesc::InMemory { handle, lsn_ceil } => {

View File

@@ -36,21 +36,18 @@ use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockLease, BlockReader, Fi
use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState};
use crate::tenant::timeline::GetVectoredError;
use crate::tenant::vectored_blob_io::{
BlobFlag, VectoredBlobReader, VectoredRead, VectoredReadPlanner,
};
use crate::tenant::{PageReconstructError, Timeline};
use crate::virtual_file::{self, VirtualFile};
use crate::{walrecord, TEMP_FILE_SUFFIX};
use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
use anyhow::{anyhow, bail, ensure, Context, Result};
use bytes::BytesMut;
use camino::{Utf8Path, Utf8PathBuf};
use pageserver_api::keyspace::KeySpace;
use pageserver_api::models::LayerAccessKind;
use pageserver_api::shard::TenantShardId;
use rand::{distributions::Alphanumeric, Rng};
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use std::fs::File;
use std::io::SeekFrom;
use std::ops::Range;
@@ -66,7 +63,8 @@ use utils::{
};
use super::{
AsLayerDesc, LayerAccessStats, PersistentLayerDesc, ResidentLayer, ValuesReconstructState,
AsLayerDesc, LayerAccessStats, PersistentLayerDesc, ResidentLayer, ValueReconstructSituation,
ValuesReconstructState,
};
///
@@ -215,7 +213,6 @@ pub struct DeltaLayerInner {
// values copied from summary
index_start_blk: u32,
index_root_blk: u32,
vectored_blob_reader: VectoredBlobReader,
/// Reader object for reading blocks from the file.
file: FileBlockReader,
@@ -251,7 +248,7 @@ impl DeltaLayer {
return Ok(());
}
let inner = self.load(LayerAccessKind::Dump, 0, ctx).await?;
let inner = self.load(LayerAccessKind::Dump, ctx).await?;
inner.dump(ctx).await
}
@@ -287,25 +284,20 @@ impl DeltaLayer {
async fn load(
&self,
access_kind: LayerAccessKind,
max_vectored_read_size: usize,
ctx: &RequestContext,
) -> Result<&Arc<DeltaLayerInner>> {
self.access_stats.record_access(access_kind, ctx);
// Quick exit if already loaded
self.inner
.get_or_try_init(|| self.load_inner(max_vectored_read_size, ctx))
.get_or_try_init(|| self.load_inner(ctx))
.await
.with_context(|| format!("Failed to load delta layer {}", self.path()))
}
async fn load_inner(
&self,
max_vectored_read_size: usize,
ctx: &RequestContext,
) -> Result<Arc<DeltaLayerInner>> {
async fn load_inner(&self, ctx: &RequestContext) -> Result<Arc<DeltaLayerInner>> {
let path = self.path();
let loaded = DeltaLayerInner::load(&path, None, max_vectored_read_size, ctx)
let loaded = DeltaLayerInner::load(&path, None, ctx)
.await
.and_then(|res| res)?;
@@ -706,16 +698,15 @@ impl DeltaLayerInner {
pub(super) async fn load(
path: &Utf8Path,
summary: Option<Summary>,
max_vectored_read_size: usize,
ctx: &RequestContext,
) -> Result<Result<Self, anyhow::Error>, anyhow::Error> {
let file = match VirtualFile::open(path).await {
Ok(file) => file,
Err(e) => return Ok(Err(anyhow::Error::new(e).context("open layer file"))),
};
let block_reader = FileBlockReader::new(file);
let file = FileBlockReader::new(file);
let summary_blk = match block_reader.read_blk(0, ctx).await {
let summary_blk = match file.read_blk(0, ctx).await {
Ok(blk) => blk,
Err(e) => return Ok(Err(anyhow::Error::new(e).context("read first block"))),
};
@@ -737,16 +728,8 @@ impl DeltaLayerInner {
}
}
// TODO: don't open file twice
let file = match VirtualFile::open(path).await {
Ok(file) => file,
Err(e) => return Ok(Err(anyhow::Error::new(e).context("open layer file"))),
};
let vectored_blob_reader = VectoredBlobReader::new(file, max_vectored_read_size);
Ok(Ok(DeltaLayerInner {
file: block_reader,
vectored_blob_reader,
file,
index_start_blk: actual_summary.index_start_blk,
index_root_blk: actual_summary.index_root_blk,
}))
@@ -851,31 +834,10 @@ impl DeltaLayerInner {
pub(super) async fn get_values_reconstruct_data(
&self,
keyspace: KeySpace,
start_lsn: Lsn,
end_lsn: Lsn,
reconstruct_state: &mut ValuesReconstructState,
ctx: &RequestContext,
) -> Result<(), GetVectoredError> {
let reads = self
.plan_reads(keyspace, start_lsn..end_lsn, reconstruct_state, ctx)
.await
.map_err(GetVectoredError::Other)?;
self.do_reads_and_update_state(reads, reconstruct_state)
.await;
Ok(())
}
async fn plan_reads(
&self,
keyspace: KeySpace,
lsn_range: Range<Lsn>,
reconstruct_state: &mut ValuesReconstructState,
ctx: &RequestContext,
) -> anyhow::Result<Vec<VectoredRead>> {
let mut planner = VectoredReadPlanner::new(self.vectored_blob_reader.get_max_read_size());
let file = &self.file;
let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
self.index_start_blk,
@@ -883,130 +845,110 @@ impl DeltaLayerInner {
file,
);
for range in keyspace.ranges.iter() {
let mut range_end_handled = false;
let mut offsets: BTreeMap<Key, Vec<(Lsn, u64)>> = BTreeMap::new();
let start_key = DeltaKey::from_key_lsn(&range.start, lsn_range.start);
for range in keyspace.ranges.iter() {
let mut ignore_key = None;
// Scan the page versions backwards, starting from the last key in the range.
// to collect all the offsets at which need to be read.
let end_key = DeltaKey::from_key_lsn(&range.end, Lsn(end_lsn.0 - 1));
tree_reader
.visit(
&start_key.0,
VisitDirection::Forwards,
&end_key.0,
VisitDirection::Backwards,
|raw_key, value| {
let key = Key::from_slice(&raw_key[..KEY_SIZE]);
let lsn = DeltaKey::extract_lsn_from_buf(raw_key);
let blob_ref = BlobRef(value);
let entry_lsn = DeltaKey::extract_lsn_from_buf(raw_key);
assert!(key >= range.start && lsn >= lsn_range.start);
let cached_lsn = reconstruct_state.get_cached_lsn(&key);
let flag = {
if cached_lsn >= Some(lsn) {
BlobFlag::Ignore
} else if blob_ref.will_init() {
BlobFlag::Replaces
} else {
BlobFlag::None
}
};
if key >= range.end || (key.next() == range.end && lsn >= lsn_range.end) {
planner.handle_range_end(blob_ref.pos());
range_end_handled = true;
false
} else {
planner.handle(key, lsn, blob_ref.pos(), flag);
true
if entry_lsn >= end_lsn {
return true;
}
if key < range.start {
return false;
}
if key >= range.end {
return true;
}
if Some(key) == ignore_key {
return true;
}
if let Some(cached_lsn) = reconstruct_state.get_cached_lsn(&key) {
if entry_lsn <= cached_lsn {
return key != range.start;
}
}
let blob_ref = BlobRef(value);
let lsns_at = offsets.entry(key).or_default();
lsns_at.push((entry_lsn, blob_ref.pos()));
if blob_ref.will_init() {
if key == range.start {
return false;
} else {
ignore_key = Some(key);
return true;
}
}
true
},
&RequestContextBuilder::extend(ctx)
.page_content_kind(PageContentKind::DeltaLayerBtreeNode)
.build(),
)
.await
.map_err(|err| anyhow!(err))?;
.map_err(|err| GetVectoredError::Other(anyhow!(err)))?;
}
if !range_end_handled {
let payload_end = self.index_start_blk as u64 * PAGE_SZ as u64;
tracing::info!("Handling range end fallback at {}", payload_end);
planner.handle_range_end(payload_end);
let ctx = &RequestContextBuilder::extend(ctx)
.page_content_kind(PageContentKind::DeltaLayerValue)
.build();
let cursor = file.block_cursor();
let mut buf = Vec::new();
for (key, lsns_at) in offsets {
for (lsn, block_offset) in lsns_at {
let res = cursor.read_blob_into_buf(block_offset, &mut buf, ctx).await;
if let Err(e) = res {
reconstruct_state.on_key_error(
key,
PageReconstructError::from(anyhow!(e).context(format!(
"Failed to read blob from virtual file {}",
file.file.path
))),
);
break;
}
let value = Value::des(&buf);
if let Err(e) = value {
reconstruct_state.on_key_error(
key,
PageReconstructError::from(anyhow!(e).context(format!(
"Failed to deserialize file blob from virtual file {}",
file.file.path
))),
);
break;
}
let key_situation = reconstruct_state.update_key(&key, lsn, value.unwrap());
if key_situation == ValueReconstructSituation::Complete {
break;
}
}
}
Ok(planner.finish())
}
async fn do_reads_and_update_state(
&self,
reads: Vec<VectoredRead>,
reconstruct_state: &mut ValuesReconstructState,
) {
let mut ignore_key_with_err = None;
let mut buf = Some(BytesMut::with_capacity(
self.vectored_blob_reader.get_max_read_size(),
));
for read in reads.into_iter().rev() {
let res = self
.vectored_blob_reader
.read_blobs(&read, buf.take().expect("Should have a buffer"))
.await;
let blobs_buf = match res {
Ok(blobs_buf) => blobs_buf,
Err(err) => {
let kind = err.kind();
for (_, blob_meta) in read.blobs_at.as_slice() {
reconstruct_state.on_key_error(
blob_meta.key,
PageReconstructError::from(anyhow!(
"Failed to read blobs from virtual file {}: {}",
self.vectored_blob_reader.get_file_ref().path,
kind
)),
);
}
// We have "lost" the buffer since the lower level IO api
// doesn't return the buffer on error. Allocate a new one.
buf = Some(BytesMut::with_capacity(
self.vectored_blob_reader.get_max_read_size(),
));
continue;
}
};
for meta in blobs_buf.blobs.iter().rev() {
if Some(meta.meta.key) == ignore_key_with_err {
continue;
}
let value = Value::des(&blobs_buf.buf[meta.start..meta.end]);
let value = match value {
Ok(v) => v,
Err(e) => {
reconstruct_state.on_key_error(
meta.meta.key,
PageReconstructError::from(anyhow!(e).context(format!(
"Failed to deserialize blob from virtual file {}",
self.vectored_blob_reader.get_file_ref().path,
))),
);
ignore_key_with_err = Some(meta.meta.key);
continue;
}
};
// Invariant: once a key reaches [`ValueReconstructSituation::Complete`]
// state, no further updates shall be made to it. The call below will
// panic if the invariant is violated.
reconstruct_state.update_key(&meta.meta.key, meta.meta.lsn, value);
}
buf = Some(blobs_buf.buf);
}
Ok(())
}
pub(super) async fn load_keys<'a>(

View File

@@ -34,14 +34,11 @@ use crate::tenant::storage_layer::{
LayerAccessStats, ValueReconstructResult, ValueReconstructState,
};
use crate::tenant::timeline::GetVectoredError;
use crate::tenant::vectored_blob_io::{
BlobFlag, VectoredBlobReader, VectoredRead, VectoredReadPlanner,
};
use crate::tenant::{PageReconstructError, Timeline};
use crate::virtual_file::{self, VirtualFile};
use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
use anyhow::{anyhow, bail, ensure, Context, Result};
use bytes::{Bytes, BytesMut};
use bytes::Bytes;
use camino::{Utf8Path, Utf8PathBuf};
use hex;
use pageserver_api::keyspace::KeySpace;
@@ -157,7 +154,6 @@ pub struct ImageLayerInner {
/// Reader object for reading blocks from the file.
file: FileBlockReader,
vectored_blob_reader: VectoredBlobReader,
}
impl std::fmt::Debug for ImageLayerInner {
@@ -214,7 +210,7 @@ impl ImageLayer {
return Ok(());
}
let inner = self.load(LayerAccessKind::Dump, 0, ctx).await?;
let inner = self.load(LayerAccessKind::Dump, ctx).await?;
inner.dump(ctx).await?;
@@ -244,32 +240,21 @@ impl ImageLayer {
async fn load(
&self,
access_kind: LayerAccessKind,
max_vectored_read_size: usize,
ctx: &RequestContext,
) -> Result<&ImageLayerInner> {
self.access_stats.record_access(access_kind, ctx);
self.inner
.get_or_try_init(|| self.load_inner(max_vectored_read_size, ctx))
.get_or_try_init(|| self.load_inner(ctx))
.await
.with_context(|| format!("Failed to load image layer {}", self.path()))
}
async fn load_inner(
&self,
max_vectored_read_size: usize,
ctx: &RequestContext,
) -> Result<ImageLayerInner> {
async fn load_inner(&self, ctx: &RequestContext) -> Result<ImageLayerInner> {
let path = self.path();
let loaded = ImageLayerInner::load(
&path,
self.desc.image_layer_lsn(),
None,
max_vectored_read_size,
ctx,
)
.await
.and_then(|res| res)?;
let loaded = ImageLayerInner::load(&path, self.desc.image_layer_lsn(), None, ctx)
.await
.and_then(|res| res)?;
// not production code
let actual_filename = path.file_name().unwrap().to_owned();
@@ -376,15 +361,14 @@ impl ImageLayerInner {
path: &Utf8Path,
lsn: Lsn,
summary: Option<Summary>,
max_vectored_read_size: usize,
ctx: &RequestContext,
) -> Result<Result<Self, anyhow::Error>, anyhow::Error> {
let file = match VirtualFile::open(path).await {
Ok(file) => file,
Err(e) => return Ok(Err(anyhow::Error::new(e).context("open layer file"))),
};
let block_reader = FileBlockReader::new(file);
let summary_blk = match block_reader.read_blk(0, ctx).await {
let file = FileBlockReader::new(file);
let summary_blk = match file.read_blk(0, ctx).await {
Ok(blk) => blk,
Err(e) => return Ok(Err(anyhow::Error::new(e).context("read first block"))),
};
@@ -410,19 +394,11 @@ impl ImageLayerInner {
}
}
// TODO: don't open file twice
let file = match VirtualFile::open(path).await {
Ok(file) => file,
Err(e) => return Ok(Err(anyhow::Error::new(e).context("open layer file"))),
};
let vectored_blob_reader = VectoredBlobReader::new(file, max_vectored_read_size);
Ok(Ok(ImageLayerInner {
index_start_blk: actual_summary.index_start_blk,
index_root_blk: actual_summary.index_root_blk,
lsn,
file: block_reader,
vectored_blob_reader,
file,
}))
}
@@ -473,30 +449,12 @@ impl ImageLayerInner {
reconstruct_state: &mut ValuesReconstructState,
ctx: &RequestContext,
) -> Result<(), GetVectoredError> {
let reads = self
.plan_reads(keyspace, ctx)
.await
.map_err(GetVectoredError::Other)?;
self.do_reads_and_update_state(reads, reconstruct_state)
.await;
Ok(())
}
async fn plan_reads(
&self,
keyspace: KeySpace,
ctx: &RequestContext,
) -> anyhow::Result<Vec<VectoredRead>> {
let mut planner = VectoredReadPlanner::new(self.vectored_blob_reader.get_max_read_size());
let file = &self.file;
let tree_reader = DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, file);
for range in keyspace.ranges.iter() {
let mut range_end_handled = false;
let mut offsets = Vec::new();
for range in keyspace.ranges.iter() {
let mut search_key: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
range.start.write_to_byte_slice(&mut search_key);
@@ -504,18 +462,17 @@ impl ImageLayerInner {
.visit(
&search_key,
VisitDirection::Forwards,
|raw_key, offset| {
|raw_key, value| {
let key = Key::from_slice(&raw_key[..KEY_SIZE]);
assert!(key >= range.start);
if key >= range.end {
planner.handle_range_end(offset);
range_end_handled = true;
false
} else {
planner.handle(key, self.lsn, offset, BlobFlag::None);
true
if !range.contains(&key) {
return false;
}
offsets.push((key, value));
true
},
&RequestContextBuilder::extend(ctx)
.page_content_kind(PageContentKind::ImageLayerBtreeNode)
@@ -523,64 +480,33 @@ impl ImageLayerInner {
)
.await
.map_err(|err| GetVectoredError::Other(anyhow!(err)))?;
}
if !range_end_handled {
let payload_end = self.index_start_blk as u64 * PAGE_SZ as u64;
planner.handle_range_end(payload_end);
let ctx = &RequestContextBuilder::extend(ctx)
.page_content_kind(PageContentKind::ImageLayerValue)
.build();
let cursor = file.block_cursor();
let mut buf = Vec::new();
for (key, offset) in offsets {
let res = cursor.read_blob_into_buf(offset, &mut buf, ctx).await;
if let Err(e) = res {
reconstruct_state.on_key_error(
key,
PageReconstructError::from(anyhow!(e).context(format!(
"Failed to read blob from virtual file {}",
file.file.path
))),
);
continue;
}
let blob = Bytes::copy_from_slice(buf.as_slice());
reconstruct_state.update_key(&key, self.lsn, Value::Image(blob));
}
Ok(planner.finish())
}
async fn do_reads_and_update_state(
&self,
reads: Vec<VectoredRead>,
reconstruct_state: &mut ValuesReconstructState,
) {
let mut buf = Some(BytesMut::with_capacity(
self.vectored_blob_reader.get_max_read_size(),
));
for read in reads.into_iter().rev() {
let res = self
.vectored_blob_reader
.read_blobs(&read, buf.take().expect("Should have a buffer"))
.await;
match res {
Ok(blobs_buf) => {
for meta in blobs_buf.blobs.iter().rev() {
let img_buf = Bytes::copy_from_slice(&blobs_buf.buf[meta.start..meta.end]);
reconstruct_state.update_key(
&meta.meta.key,
self.lsn,
Value::Image(img_buf),
);
}
buf = Some(blobs_buf.buf);
}
Err(err) => {
let kind = err.kind();
for (_, blob_meta) in read.blobs_at.as_slice() {
reconstruct_state.on_key_error(
blob_meta.key,
PageReconstructError::from(anyhow!(
"Failed to read blobs from virtual file {}: {}",
self.vectored_blob_reader.get_file_ref().path,
kind
)),
);
}
// We have "lost" the buffer since the lower level IO api
// doesn't return the buffer on error. Allocate a new one.
buf = Some(BytesMut::with_capacity(
self.vectored_blob_reader.get_max_read_size(),
));
}
};
}
Ok(())
}
}

View File

@@ -267,7 +267,6 @@ impl Layer {
pub(crate) async fn get_values_reconstruct_data(
&self,
keyspace: KeySpace,
start_lsn: Lsn,
end_lsn: Lsn,
reconstruct_data: &mut ValuesReconstructState,
ctx: &RequestContext,
@@ -283,14 +282,7 @@ impl Layer {
.record_access(LayerAccessKind::GetValueReconstructData, ctx);
layer
.get_values_reconstruct_data(
keyspace,
start_lsn,
end_lsn,
reconstruct_data,
&self.0,
ctx,
)
.get_values_reconstruct_data(keyspace, end_lsn, reconstruct_data, &self.0, ctx)
.instrument(tracing::debug_span!("get_values_reconstruct_data", layer=%self))
.await
}
@@ -1307,14 +1299,9 @@ impl DownloadedLayer {
owner.desc.key_range.clone(),
owner.desc.lsn_range.clone(),
));
delta_layer::DeltaLayerInner::load(
&owner.path,
summary,
owner.conf.max_vectored_read_size,
ctx,
)
.await
.map(|res| res.map(LayerKind::Delta))
delta_layer::DeltaLayerInner::load(&owner.path, summary, ctx)
.await
.map(|res| res.map(LayerKind::Delta))
} else {
let lsn = owner.desc.image_layer_lsn();
let summary = Some(image_layer::Summary::expected(
@@ -1323,15 +1310,9 @@ impl DownloadedLayer {
owner.desc.key_range.clone(),
lsn,
));
image_layer::ImageLayerInner::load(
&owner.path,
lsn,
summary,
owner.conf.max_vectored_read_size,
ctx,
)
.await
.map(|res| res.map(LayerKind::Image))
image_layer::ImageLayerInner::load(&owner.path, lsn, summary, ctx)
.await
.map(|res| res.map(LayerKind::Image))
};
match res {
@@ -1384,7 +1365,6 @@ impl DownloadedLayer {
async fn get_values_reconstruct_data(
&self,
keyspace: KeySpace,
start_lsn: Lsn,
end_lsn: Lsn,
reconstruct_data: &mut ValuesReconstructState,
owner: &Arc<LayerInner>,
@@ -1394,7 +1374,7 @@ impl DownloadedLayer {
match self.get(owner, ctx).await.map_err(GetVectoredError::from)? {
Delta(d) => {
d.get_values_reconstruct_data(keyspace, start_lsn, end_lsn, reconstruct_data, ctx)
d.get_values_reconstruct_data(keyspace, end_lsn, reconstruct_data, ctx)
.await
}
Image(i) => {

View File

@@ -111,10 +111,10 @@ use self::layer_manager::LayerManager;
use self::logical_size::LogicalSize;
use self::walreceiver::{WalReceiver, WalReceiverConf};
use super::remote_timeline_client::RemoteTimelineClient;
use super::secondary::heatmap::{HeatMapLayer, HeatMapTimeline};
use super::{config::TenantConf, storage_layer::ReadableLayerDesc};
use super::{debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf};
use super::{layer_map, remote_timeline_client::RemoteTimelineClient};
use super::{remote_timeline_client::index::IndexPart, storage_layer::LayerFringe};
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
@@ -210,6 +210,10 @@ pub struct Timeline {
/// so that e.g. on-demand-download/eviction, and layer spreading, can operate just on `LayerFileManager`.
pub(crate) layers: Arc<tokio::sync::RwLock<LayerManager>>,
/// State that [`Self::create_image_layers`] keeps across invocations to determine if it can
/// skip an invocation becaucse nothing changed.
create_image_layers_skipper_state: std::sync::Mutex<Option<CreateImageLayersParams>>,
/// Set of key ranges which should be covered by image layers to
/// allow GC to remove old layers. This set is created by GC and its cutoff LSN is also stored.
/// It is used by compaction task when it checks if new image layer should be created.
@@ -302,8 +306,9 @@ pub struct Timeline {
// though let's keep them both for better error visibility.
pub initdb_lsn: Lsn,
/// When did we last calculate the partitioning?
partitioning: Mutex<(KeyPartitioning, Lsn)>,
/// Used by [`Timline::repartition`] to avoid re-computing partitioning on every iteration.
/// Must bump [`CompactionKeyspacePartitioning::version`] when changing.
partitioning: tokio::sync::Mutex<CompactionKeyspacePartitioning>,
/// Configuration: how often should the partitioning be recalculated.
repartition_threshold: u64,
@@ -400,6 +405,57 @@ pub struct GcInfo {
pub pitr_cutoff: Lsn,
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct CreateImageLayersParams {
/// From [`layer_map::LayerMap::get_rebuild_version`].
layer_map_version: layer_map::RebuildVersion,
/// From [`Timeline::partitioning`].
partitioning_version: CompactionKeyspacePartitioningVersion,
}
/// The version of the keyspace partitioning maintained in [`Timeline::partitioning`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct CompactionKeyspacePartitioningVersion(u64);
impl CompactionKeyspacePartitioningVersion {
fn inc(&mut self) {
self.0
.checked_add(1)
.expect("at current clock cycles, we won't hit this");
}
}
#[derive(Clone)]
struct CompactionKeyspacePartitioning {
version: CompactionKeyspacePartitioningVersion,
partitioning: KeyPartitioning,
lsn: Lsn,
}
struct CompactionKeyspacePartition<'a> {
key_space: &'a KeySpace,
lsn: Lsn,
}
impl CompactionKeyspacePartitioning {
pub fn update(&mut self, partitioning: KeyPartitioning, lsn: Lsn) {
assert!(self.lsn <= lsn);
self.version.inc();
self.partitioning = partitioning;
self.lsn = lsn;
}
pub fn iter(&self) -> impl Iterator<Item = CompactionKeyspacePartition<'_>> {
self.partitioning
.parts
.iter()
.map(|key_space| CompactionKeyspacePartition {
key_space,
lsn: self.lsn,
})
}
}
/// An error happened in a get() operation.
#[derive(thiserror::Error, Debug)]
pub(crate) enum PageReconstructError {
@@ -778,10 +834,8 @@ impl Timeline {
GetVectoredImpl::Vectored => {
let vectored_res = self.get_vectored_impl(keyspace.clone(), lsn, ctx).await;
if self.conf.validate_vectored_get {
self.validate_get_vectored_impl(&vectored_res, keyspace, lsn, ctx)
.await;
}
self.validate_get_vectored_impl(&vectored_res, keyspace, lsn, ctx)
.await;
vectored_res
}
@@ -1156,7 +1210,7 @@ impl Timeline {
)
.await
{
Ok((partitioning, lsn)) => {
Ok(partitioning) => {
// Disables access_stats updates, so that the files we read remain candidates for eviction after we're done with them
let image_ctx = RequestContextBuilder::extend(ctx)
.access_stats_behavior(AccessStatsBehavior::Skip)
@@ -1170,7 +1224,7 @@ impl Timeline {
// 3. Create new image layers for partitions that have been modified
// "enough".
let layers = self
.create_image_layers(&partitioning, lsn, false, &image_ctx)
.create_image_layers(&partitioning, false, &image_ctx)
.await
.map_err(anyhow::Error::from)?;
if let Some(remote_client) = &self.remote_client {
@@ -1661,8 +1715,15 @@ impl Timeline {
// initial logical size is 0.
LogicalSize::empty_initial()
},
partitioning: Mutex::new((KeyPartitioning::new(), Lsn(0))),
partitioning: tokio::sync::Mutex::new(CompactionKeyspacePartitioning {
// The other fields don't matter because this is 0 and hence repartition(),
// will calculate a new one
lsn: Lsn(0),
version: CompactionKeyspacePartitioningVersion(0),
partitioning: KeyPartitioning::new(),
}),
repartition_threshold: 0,
create_image_layers_skipper_state: std::sync::Mutex::new(None),
last_received_wal: Mutex::new(None),
rel_size_cache: RwLock::new(HashMap::new()),
@@ -3153,7 +3214,7 @@ impl Timeline {
}
// Note: The 'ctx' in use here has DownloadBehavior::Error. We should not
// require downloading anything during initial import.
let (partitioning, _lsn) = self
let partitioning = self
.repartition(
self.initdb_lsn,
self.get_compaction_target_size(),
@@ -3168,8 +3229,7 @@ impl Timeline {
// For image layers, we add them immediately into the layer map.
(
self.create_image_layers(&partitioning, self.initdb_lsn, true, ctx)
.await?,
self.create_image_layers(&partitioning, true, ctx).await?,
None,
)
} else {
@@ -3407,36 +3467,46 @@ impl Timeline {
partition_size: u64,
flags: EnumSet<CompactFlags>,
ctx: &RequestContext,
) -> anyhow::Result<(KeyPartitioning, Lsn)> {
{
let partitioning_guard = self.partitioning.lock().unwrap();
let distance = lsn.0 - partitioning_guard.1 .0;
if partitioning_guard.1 != Lsn(0)
&& distance <= self.repartition_threshold
&& !flags.contains(CompactFlags::ForceRepartition)
{
debug!(
distance,
threshold = self.repartition_threshold,
"no repartitioning needed"
);
return Ok((partitioning_guard.0.clone(), partitioning_guard.1));
}
) -> anyhow::Result<CompactionKeyspacePartitioning> {
let Ok(mut partitioning_guard) = self.partitioning.try_lock() else {
// NB: there are two callers, one is the compaction task, of which there is only one per struct Tenant and hence Timelien.
// The other is the initdb optimization in flush_frozen_layer, used by `boostrap_timeline`, which runs before `.activate()`
// and hence before the compaction task starts.
anyhow::bail!("repartition() called concurrently, this should not happen");
};
if lsn < partitioning_guard.lsn {
anyhow::bail!("repartition() called with LSN going backwards, this should not happen");
}
let distance = lsn.0 - partitioning_guard.lsn.0;
if partitioning_guard.lsn != Lsn(0)
&& distance <= self.repartition_threshold
&& !flags.contains(CompactFlags::ForceRepartition)
{
debug!(
distance,
threshold = self.repartition_threshold,
"no repartitioning needed"
);
return Ok(partitioning_guard.clone());
}
let keyspace = self.collect_keyspace(lsn, ctx).await?;
let partitioning = keyspace.partition(partition_size);
let mut partitioning_guard = self.partitioning.lock().unwrap();
if lsn > partitioning_guard.1 {
*partitioning_guard = (partitioning, lsn);
} else {
warn!("Concurrent repartitioning of keyspace. This unexpected, but probably harmless");
}
Ok((partitioning_guard.0.clone(), partitioning_guard.1))
partitioning_guard.update(partitioning, lsn);
Ok(partitioning_guard.clone())
}
// Is it time to create a new image layer for the given partition?
async fn time_for_new_image_layer(&self, partition: &KeySpace, lsn: Lsn) -> bool {
async fn time_for_new_image_layer(
&self,
&CompactionKeyspacePartition {
key_space: partition,
lsn,
}: &CompactionKeyspacePartition<'_>,
) -> bool {
let threshold = self.get_image_creation_threshold();
let guard = self.layers.read().await;
@@ -3511,14 +3581,14 @@ impl Timeline {
false
}
#[tracing::instrument(skip_all, fields(%lsn, %force))]
#[tracing::instrument(skip_all, fields(lsn=%partitioning.lsn, %force))]
async fn create_image_layers(
self: &Arc<Timeline>,
partitioning: &KeyPartitioning,
lsn: Lsn,
partitioning: &CompactionKeyspacePartitioning,
force: bool,
ctx: &RequestContext,
) -> Result<Vec<ResidentLayer>, CreateImageLayersError> {
// REVIEW: should we not even bump this timer if we're taking short exit?
let timer = self.metrics.create_images_time_histo.start_timer();
let mut image_layers = Vec::new();
@@ -3533,9 +3603,30 @@ impl Timeline {
// image layers <100000000..100000099> and <200000000..200000199> are not completely covering it.
let mut start = Key::MIN;
for partition in partitioning.parts.iter() {
let img_range = start..partition.ranges.last().unwrap().end;
if !force && !self.time_for_new_image_layer(partition, lsn).await {
let new_inputs = {
let layer_manager = self.layers.read().await;
let layer_map = layer_manager.layer_map();
CreateImageLayersParams {
layer_map_version: layer_map.get_rebuild_version(),
partitioning_version: partitioning.version,
}
};
let last_inputs = {
// bail out early if nothing changed
let last = self.create_image_layers_skipper_state.lock().unwrap();
match (force, &*last, &new_inputs) {
(false, Some(last), new_inputs) if last == new_inputs => {
debug!(?last, ?new_inputs, "inputs to image layer creation algorithm have not changed since last invocation");
return Ok(image_layers);
}
_ => (), // we'll update .last once we finish with success
}
*last
};
for partition in partitioning.iter() {
let img_range = start..partition.key_space.ranges.last().unwrap().end;
if !force && !self.time_for_new_image_layer(&partition).await {
start = img_range.end;
continue;
}
@@ -3545,7 +3636,7 @@ impl Timeline {
self.timeline_id,
self.tenant_shard_id,
&img_range,
lsn,
partitioning.lsn,
)
.await?;
@@ -3558,7 +3649,7 @@ impl Timeline {
let mut wrote_keys = false;
let mut key_request_accum = KeySpaceAccum::new();
for range in &partition.ranges {
for range in &partition.key_space.ranges {
let mut key = range.start;
while key < range.end {
// Decide whether to retain this key: usually we do, but sharded tenants may
@@ -3582,7 +3673,11 @@ impl Timeline {
|| last_key_in_range
{
let results = self
.get_vectored(key_request_accum.consume_keyspace(), lsn, ctx)
.get_vectored(
key_request_accum.consume_keyspace(),
partitioning.lsn,
ctx,
)
.await?;
for (img_key, img) in results {
@@ -3671,6 +3766,25 @@ impl Timeline {
.context("fsync of timeline dir")?;
}
// Remember the inputs so that we take the early exit next compaction iteration
// if nothing changed in the meantime.
// NB: since we created `new_inputs`, the layer map might have changed and hence
// `get_rebuild_version()` might have advanced already, e.g., by a
// concurrent garbage collection iteration that removed layers. That's ok, next
// `new_inputs` will observe an increased `get_rebuild_version()` and run again.
// Same goes for the modifications that we're doing below: if `image_layers` is
// not empty, we'll insert them into the layer map in the code below, which
// will bump `get_rebuild_version()`, which will make us re-run once more.
//
{
let mut state = self.create_image_layers_skipper_state.lock().unwrap();
if *state != last_inputs {
warn!(?last_inputs, ?new_inputs, observed=?*state, "unexpected: create_image_layers called concurrently? not caching inputs");
} else {
*state = Some(new_inputs);
}
}
let mut guard = self.layers.write().await;
// FIXME: we could add the images to be uploaded *before* returning from here, but right

View File

@@ -85,6 +85,7 @@ impl Timeline {
let policy = self.get_eviction_policy();
let period = match policy {
EvictionPolicy::LayerAccessThreshold(lat) => lat.period,
EvictionPolicy::OnlyImitiate(lat) => lat.period,
EvictionPolicy::NoEviction => Duration::from_secs(10),
};
if random_init_delay(period, &cancel).await.is_err() {
@@ -119,33 +120,45 @@ impl Timeline {
ctx: &RequestContext,
) -> ControlFlow<(), Instant> {
debug!("eviction iteration: {policy:?}");
match policy {
let start = Instant::now();
let (period, threshold) = match policy {
EvictionPolicy::NoEviction => {
// check again in 10 seconds; XXX config watch mechanism
ControlFlow::Continue(Instant::now() + Duration::from_secs(10))
return ControlFlow::Continue(Instant::now() + Duration::from_secs(10));
}
EvictionPolicy::LayerAccessThreshold(p) => {
let start = Instant::now();
match self.eviction_iteration_threshold(p, cancel, ctx).await {
ControlFlow::Break(()) => return ControlFlow::Break(()),
ControlFlow::Continue(()) => (),
}
let elapsed = start.elapsed();
crate::tenant::tasks::warn_when_period_overrun(
elapsed,
p.period,
BackgroundLoopKind::Eviction,
);
crate::metrics::EVICTION_ITERATION_DURATION
.get_metric_with_label_values(&[
&format!("{}", p.period.as_secs()),
&format!("{}", p.threshold.as_secs()),
])
.unwrap()
.observe(elapsed.as_secs_f64());
ControlFlow::Continue(start + p.period)
(p.period, p.threshold)
}
}
EvictionPolicy::OnlyImitiate(p) => {
if self.imitiate_only(p, cancel, ctx).await.is_break() {
return ControlFlow::Break(());
}
(p.period, p.threshold)
}
};
let elapsed = start.elapsed();
crate::tenant::tasks::warn_when_period_overrun(
elapsed,
period,
BackgroundLoopKind::Eviction,
);
// FIXME: if we were to mix policies on a pageserver, we would have no way to sense this. I
// don't think that is a relevant fear however, and regardless the imitation should be the
// most costly part.
crate::metrics::EVICTION_ITERATION_DURATION
.get_metric_with_label_values(&[
&format!("{}", period.as_secs()),
&format!("{}", threshold.as_secs()),
])
.unwrap()
.observe(elapsed.as_secs_f64());
ControlFlow::Continue(start + period)
}
async fn eviction_iteration_threshold(
@@ -167,30 +180,6 @@ impl Timeline {
_ = self.cancel.cancelled() => return ControlFlow::Break(()),
};
// If we evict layers but keep cached values derived from those layers, then
// we face a storm of on-demand downloads after pageserver restart.
// The reason is that the restart empties the caches, and so, the values
// need to be re-computed by accessing layers, which we evicted while the
// caches were filled.
//
// Solutions here would be one of the following:
// 1. Have a persistent cache.
// 2. Count every access to a cached value to the access stats of all layers
// that were accessed to compute the value in the first place.
// 3. Invalidate the caches at a period of < p.threshold/2, so that the values
// get re-computed from layers, thereby counting towards layer access stats.
// 4. Make the eviction task imitate the layer accesses that typically hit caches.
//
// We follow approach (4) here because in Neon prod deployment:
// - page cache is quite small => high churn => low hit rate
// => eviction gets correct access stats
// - value-level caches such as logical size & repatition have a high hit rate,
// especially for inactive tenants
// => eviction sees zero accesses for these
// => they cause the on-demand download storm on pageserver restart
//
// We should probably move to persistent caches in the future, or avoid
// having inactive tenants attached to pageserver in the first place.
match self.imitate_layer_accesses(p, cancel, ctx).await {
ControlFlow::Break(()) => return ControlFlow::Break(()),
ControlFlow::Continue(()) => (),
@@ -307,6 +296,52 @@ impl Timeline {
ControlFlow::Continue(())
}
/// Like `eviction_iteration_threshold`, but without any eviction. Eviction will be done by
/// disk usage based eviction task.
async fn imitiate_only(
self: &Arc<Self>,
p: &EvictionPolicyLayerAccessThreshold,
cancel: &CancellationToken,
ctx: &RequestContext,
) -> ControlFlow<()> {
let acquire_permit = crate::tenant::tasks::concurrent_background_tasks_rate_limit_permit(
BackgroundLoopKind::Eviction,
ctx,
);
let _permit = tokio::select! {
permit = acquire_permit => permit,
_ = cancel.cancelled() => return ControlFlow::Break(()),
_ = self.cancel.cancelled() => return ControlFlow::Break(()),
};
self.imitate_layer_accesses(p, cancel, ctx).await
}
/// If we evict layers but keep cached values derived from those layers, then
/// we face a storm of on-demand downloads after pageserver restart.
/// The reason is that the restart empties the caches, and so, the values
/// need to be re-computed by accessing layers, which we evicted while the
/// caches were filled.
///
/// Solutions here would be one of the following:
/// 1. Have a persistent cache.
/// 2. Count every access to a cached value to the access stats of all layers
/// that were accessed to compute the value in the first place.
/// 3. Invalidate the caches at a period of < p.threshold/2, so that the values
/// get re-computed from layers, thereby counting towards layer access stats.
/// 4. Make the eviction task imitate the layer accesses that typically hit caches.
///
/// We follow approach (4) here because in Neon prod deployment:
/// - page cache is quite small => high churn => low hit rate
/// => eviction gets correct access stats
/// - value-level caches such as logical size & repatition have a high hit rate,
/// especially for inactive tenants
/// => eviction sees zero accesses for these
/// => they cause the on-demand download storm on pageserver restart
///
/// We should probably move to persistent caches in the future, or avoid
/// having inactive tenants attached to pageserver in the first place.
#[instrument(skip_all)]
async fn imitate_layer_accesses(
&self,

View File

@@ -1,412 +0,0 @@
//!
//! Utilities for vectored reading of variable-sized "blobs".
//!
//! The "blob" api is an abstraction on top of the "block" api,
//! with the main difference being that blobs do not have a fixed
//! size (each blob is prefixed with 1 or 4 byte length field)
//!
//! The vectored apis provided in this module allow for planning
//! and executing disk IO which covers multiple blobs.
//!
//! Reads are planned with [`VectoredReadPlanner`] which will coalesce
//! adjacent blocks into a single disk IO request and exectuted by
//! [`VectoredBlobReader`] which does all the required offset juggling
//! and returns a buffer housing all the blobs and a list of offsets.
//!
//! Note that the vectored blob api does *not* go through the page cache.
use std::collections::BTreeMap;
use bytes::BytesMut;
use pageserver_api::key::Key;
use utils::lsn::Lsn;
use utils::vec_map::VecMap;
use crate::virtual_file::VirtualFile;
/// Metadata bundled with the start and end offset of a blob.
#[derive(Copy, Clone, Debug)]
pub struct BlobMeta {
pub key: Key,
pub lsn: Lsn,
}
/// Blob offsets into [`VectoredBlobsBuf::buf`]
pub struct VectoredBlob {
pub start: usize,
pub end: usize,
pub meta: BlobMeta,
}
/// Return type of [`VectoredBlobReader::read_blobs`]
pub struct VectoredBlobsBuf {
/// Buffer for all blobs in this read
pub buf: BytesMut,
/// Offsets into the buffer and metadata for all blobs in this read
pub blobs: Vec<VectoredBlob>,
}
/// Description of one disk read for multiple blobs.
/// Used as the argument form [`VectoredBlobReader::read_blobs`]
#[derive(Debug)]
pub struct VectoredRead {
pub start: u64,
pub end: u64,
/// Starting offsets and metadata for each blob in this read
pub blobs_at: VecMap<u64, BlobMeta>,
max_read_size: usize,
}
#[derive(Eq, PartialEq)]
enum VectoredReadExtended {
Yes,
No,
}
impl VectoredRead {
fn new(start_offset: u64, end_offset: u64, meta: BlobMeta, max_read_size: usize) -> Self {
let mut blobs_at = VecMap::default();
blobs_at
.append(start_offset, meta)
.expect("First insertion always succeeds");
Self {
start: start_offset,
end: end_offset,
blobs_at,
max_read_size,
}
}
/// Attempt to extend the current read with a new blob if the start
/// offset matches with the current end of the vectored read
/// and the resuting size is below the max read size
fn extend(&mut self, start: u64, end: u64, meta: BlobMeta) -> VectoredReadExtended {
let size = (end - start) as usize;
if self.end == start && self.size() + size <= self.max_read_size {
self.end = end;
self.blobs_at
.append(start, meta)
.expect("LSNs are ordered within vectored reads");
return VectoredReadExtended::Yes;
}
VectoredReadExtended::No
}
fn size(&self) -> usize {
(self.end - self.start) as usize
}
}
#[derive(Copy, Clone, Debug)]
pub enum BlobFlag {
None,
Ignore,
Replaces,
}
/// Planner for vectored blob reads.
///
/// Blob offsets are received via [`VectoredReadPlanner::handle`]
/// and coalesced into disk reads.
///
/// The implementation is very simple:
/// * Collect all blob offsets in an ordered structure
/// * Iterate over the collected blobs and coalesce them into reads at the end
pub struct VectoredReadPlanner {
// Track all the blob offsets. Start offsets must be ordered.
blobs: BTreeMap<Key, Vec<(Lsn, u64, u64)>>,
// Arguments for previous blob passed into [`VectoredReadPlanner::handle`]
prev: Option<(Key, Lsn, u64, BlobFlag)>,
max_read_size: usize,
}
impl VectoredReadPlanner {
pub fn new(max_read_size: usize) -> Self {
Self {
blobs: BTreeMap::new(),
prev: None,
max_read_size,
}
}
/// Include a new blob in the read plan.
///
/// Notes:
/// * This function should be called for each blob in the desired *inclusive* range.
/// See `DeltaLayerInner::plan_reads` and `ImageLayerInner::plan_reads`.
/// * Calls to this function should be for monotonically continuous (key, lsn) tuples.
///
/// The `flag` argument has two interesting values:
/// * [`BlobFlag::Replaces`]: The blob for this key should replace all existing blobs.
/// This is used for WAL records that `will_init`.
/// * [`BlobFlag::Ignore`]: This blob should not be included in the read. This happens
/// if the blob is cached.
pub fn handle(&mut self, key: Key, lsn: Lsn, offset: u64, flag: BlobFlag) {
// Implementation note: internally lag behind by one blob such that
// we have a start and end offset when initialising [`VectoredRead`]
let (prev_key, prev_lsn, prev_offset, prev_flag) = match self.prev {
None => {
self.prev = Some((key, lsn, offset, flag));
return;
}
Some(prev) => prev,
};
self.add_blob(prev_key, prev_lsn, prev_offset, offset, prev_flag);
self.prev = Some((key, lsn, offset, flag));
}
pub fn handle_range_end(&mut self, offset: u64) {
if let Some((prev_key, prev_lsn, prev_offset, prev_flag)) = self.prev {
self.add_blob(prev_key, prev_lsn, prev_offset, offset, prev_flag);
}
self.prev = None;
}
fn add_blob(&mut self, key: Key, lsn: Lsn, start_offset: u64, end_offset: u64, flag: BlobFlag) {
match flag {
BlobFlag::None => {
let blobs_for_key = self.blobs.entry(key).or_default();
blobs_for_key.push((lsn, start_offset, end_offset));
}
BlobFlag::Replaces => {
let blobs_for_key = self.blobs.entry(key).or_default();
blobs_for_key.clear();
blobs_for_key.push((lsn, start_offset, end_offset));
}
BlobFlag::Ignore => {}
}
}
pub fn finish(self) -> Vec<VectoredRead> {
let mut current_read: Option<VectoredRead> = None;
let mut reads = Vec::new();
for (key, blobs_for_key) in self.blobs {
for (lsn, start_offset, end_offset) in blobs_for_key {
let extended = match &mut current_read {
Some(read) => read.extend(start_offset, end_offset, BlobMeta { key, lsn }),
None => VectoredReadExtended::No,
};
if extended == VectoredReadExtended::No {
let next_read = VectoredRead::new(
start_offset,
end_offset,
BlobMeta { key, lsn },
self.max_read_size,
);
let prev_read = current_read.replace(next_read);
if let Some(read) = prev_read {
reads.push(read);
}
}
}
}
if let Some(read) = current_read {
reads.push(read);
}
reads
}
}
/// Disk reader for vectored blob spans (does not go through the page cache)
pub struct VectoredBlobReader {
file: VirtualFile,
max_vectored_read_size: usize,
}
impl VectoredBlobReader {
pub fn new(file: VirtualFile, max_vectored_read_size: usize) -> Self {
Self {
file,
max_vectored_read_size,
}
}
pub fn get_max_read_size(&self) -> usize {
self.max_vectored_read_size
}
pub fn get_file_ref(&self) -> &VirtualFile {
&self.file
}
/// Read the requested blobs into the buffer.
///
/// We have to deal with the fact that blobs are not fixed size.
/// Each blob is prefixed by a size header.
///
/// The success return value is a struct which contains the buffer
/// filled from disk and a list of offsets at which each blob lies
/// in the buffer.
pub async fn read_blobs(
&self,
read: &VectoredRead,
buf: BytesMut,
) -> Result<VectoredBlobsBuf, std::io::Error> {
// tracing::info!("read_blobs(read={read:?}, read_size={})", read.size());
assert!(read.size() > 0);
assert!(
read.size() <= buf.capacity(),
"{} > {}",
read.size(),
buf.capacity()
);
let buf = self
.file
.read_exact_at_n(buf, read.start, read.size())
.await?;
let blobs_at = read.blobs_at.as_slice();
let start_offset = blobs_at.first().expect("VectoredRead is never empty").0;
let mut metas = Vec::new();
let pairs = blobs_at.iter().zip(
blobs_at
.iter()
.map(Some)
.skip(1)
.chain(std::iter::once(None)),
);
for ((offset, meta), next) in pairs {
let offset_in_buf = offset - start_offset;
let first_len_byte = buf[offset_in_buf as usize];
// Each blob is prefixed by a header containing it's size.
// Extract the size and skip that header to find the start of the data.
let (size_length, blob_size) = if first_len_byte < 0x80 {
(1, first_len_byte as u64)
} else {
let mut blob_size_buf = [0u8; 4];
let offset_in_buf = offset_in_buf as usize;
blob_size_buf.copy_from_slice(&buf[offset_in_buf..offset_in_buf + 4]);
blob_size_buf[0] &= 0x7f;
(4, u32::from_be_bytes(blob_size_buf) as u64)
};
let start = offset_in_buf + size_length;
let end = match next {
Some((next_blob_start_offset, _)) => next_blob_start_offset - start_offset,
None => start + blob_size,
};
assert_eq!(end - start, blob_size);
metas.push(VectoredBlob {
start: start as usize,
end: end as usize,
meta: *meta,
})
}
Ok(VectoredBlobsBuf { buf, blobs: metas })
}
}
#[cfg(test)]
mod tests {
use super::*;
fn validate_read(read: &VectoredRead, offset_range: &[(Key, Lsn, u64, BlobFlag)]) {
assert_eq!(read.start, offset_range.first().unwrap().2);
let expected_offsets_in_read: Vec<_> = offset_range.iter().map(|o| o.2).collect();
let offsets_in_read: Vec<_> = read
.blobs_at
.as_slice()
.iter()
.map(|(offset, _)| *offset)
.collect();
assert_eq!(expected_offsets_in_read, offsets_in_read);
}
#[test]
fn planner_max_read_size_test() {
let max_read_size = 128 * 1024;
let key = Key::MIN;
let lsn = Lsn(0);
let blob_descriptions = vec![
(key, lsn, 0, BlobFlag::None),
(key, lsn, 32 * 1024, BlobFlag::None),
(key, lsn, 96 * 1024, BlobFlag::None), // Last in read 1
(key, lsn, 128 * 1024, BlobFlag::None), // Last in read 2
(key, lsn, 198 * 1024, BlobFlag::None), // Last in read 3
(key, lsn, 268 * 1024, BlobFlag::None), // Last in read 4
(key, lsn, 396 * 1024, BlobFlag::None), // Last in read 5
(key, lsn, 652 * 1024, BlobFlag::None), // Last in read 6
];
let ranges = [
&blob_descriptions[0..3],
&blob_descriptions[3..4],
&blob_descriptions[4..5],
&blob_descriptions[5..6],
&blob_descriptions[6..7],
&blob_descriptions[7..],
];
let mut planner = VectoredReadPlanner::new(max_read_size);
for (key, lsn, offset, flag) in blob_descriptions.clone() {
planner.handle(key, lsn, offset, flag);
}
planner.handle_range_end(652 * 1024);
let reads = planner.finish();
assert_eq!(reads.len(), 6);
for (idx, read) in reads.iter().enumerate() {
validate_read(read, ranges[idx]);
}
}
#[test]
fn planner_replacement_test() {
let max_read_size = 128 * 1024;
let first_key = Key::MIN;
let second_key = first_key.next();
let lsn = Lsn(0);
let blob_descriptions = vec![
(first_key, lsn, 0, BlobFlag::None), // First in read 1
(first_key, lsn, 1024, BlobFlag::None), // Last in read 1
(second_key, lsn, 2 * 1024, BlobFlag::Replaces),
(second_key, lsn, 3 * 1024, BlobFlag::None),
(second_key, lsn, 4 * 1024, BlobFlag::Replaces), // First in read 2
(second_key, lsn, 5 * 1024, BlobFlag::None), // Last in read 2
];
let ranges = [&blob_descriptions[0..2], &blob_descriptions[4..]];
let mut planner = VectoredReadPlanner::new(max_read_size);
for (key, lsn, offset, flag) in blob_descriptions.clone() {
planner.handle(key, lsn, offset, flag);
}
planner.handle_range_end(6 * 1024);
let reads = planner.finish();
assert_eq!(reads.len(), 2);
for (idx, read) in reads.iter().enumerate() {
validate_read(read, ranges[idx]);
}
}
}

View File

@@ -562,18 +562,7 @@ impl VirtualFile {
B: IoBufMut + Send,
{
let (buf, res) =
read_exact_at_impl(buf, offset, None, |buf, offset| self.read_at(buf, offset)).await;
res.map(|()| buf)
}
pub async fn read_exact_at_n<B>(&self, buf: B, offset: u64, count: usize) -> Result<B, Error>
where
B: IoBufMut + Send,
{
let (buf, res) = read_exact_at_impl(buf, offset, Some(count), |buf, offset| {
self.read_at(buf, offset)
})
.await;
read_exact_at_impl(buf, offset, |buf, offset| self.read_at(buf, offset)).await;
res.map(|()| buf)
}
@@ -707,7 +696,6 @@ impl VirtualFile {
pub async fn read_exact_at_impl<B, F, Fut>(
buf: B,
mut offset: u64,
count: Option<usize>,
mut read_at: F,
) -> (B, std::io::Result<()>)
where
@@ -715,15 +703,7 @@ where
F: FnMut(tokio_epoll_uring::Slice<B>, u64) -> Fut,
Fut: std::future::Future<Output = (tokio_epoll_uring::Slice<B>, std::io::Result<usize>)>,
{
let mut buf: tokio_epoll_uring::Slice<B> = match count {
Some(count) => {
assert!(count <= buf.bytes_total());
assert!(count > 0);
buf.slice(..count) // may include uninitialized memory
}
None => buf.slice_full(), // includes all the uninitialized memory
};
let mut buf: tokio_epoll_uring::Slice<B> = buf.slice_full(); // includes all the uninitialized memory
while buf.bytes_total() != 0 {
let res;
(buf, res) = read_at(buf, offset).await;
@@ -813,7 +793,7 @@ mod test_read_exact_at_impl {
result: Ok(vec![b'a', b'b', b'c', b'd', b'e']),
}]),
}));
let (buf, res) = read_exact_at_impl(buf, 0, None, |buf, offset| {
let (buf, res) = read_exact_at_impl(buf, 0, |buf, offset| {
let mock_read_at = Arc::clone(&mock_read_at);
async move { mock_read_at.lock().await.read_at(buf, offset).await }
})
@@ -822,33 +802,13 @@ mod test_read_exact_at_impl {
assert_eq!(buf, vec![b'a', b'b', b'c', b'd', b'e']);
}
#[tokio::test]
async fn test_with_count() {
let buf = Vec::with_capacity(5);
let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt {
expectations: VecDeque::from(vec![Expectation {
offset: 0,
bytes_total: 3,
result: Ok(vec![b'a', b'b', b'c']),
}]),
}));
let (buf, res) = read_exact_at_impl(buf, 0, Some(3), |buf, offset| {
let mock_read_at = Arc::clone(&mock_read_at);
async move { mock_read_at.lock().await.read_at(buf, offset).await }
})
.await;
assert!(res.is_ok());
assert_eq!(buf, vec![b'a', b'b', b'c']);
}
#[tokio::test]
async fn test_empty_buf_issues_no_syscall() {
let buf = Vec::new();
let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt {
expectations: VecDeque::new(),
}));
let (_buf, res) = read_exact_at_impl(buf, 0, None, |buf, offset| {
let (_buf, res) = read_exact_at_impl(buf, 0, |buf, offset| {
let mock_read_at = Arc::clone(&mock_read_at);
async move { mock_read_at.lock().await.read_at(buf, offset).await }
})
@@ -873,7 +833,7 @@ mod test_read_exact_at_impl {
},
]),
}));
let (buf, res) = read_exact_at_impl(buf, 0, None, |buf, offset| {
let (buf, res) = read_exact_at_impl(buf, 0, |buf, offset| {
let mock_read_at = Arc::clone(&mock_read_at);
async move { mock_read_at.lock().await.read_at(buf, offset).await }
})
@@ -904,7 +864,7 @@ mod test_read_exact_at_impl {
},
]),
}));
let (_buf, res) = read_exact_at_impl(buf, 0, None, |buf, offset| {
let (_buf, res) = read_exact_at_impl(buf, 0, |buf, offset| {
let mock_read_at = Arc::clone(&mock_read_at);
async move { mock_read_at.lock().await.read_at(buf, offset).await }
})

View File

@@ -21,7 +21,7 @@ use crate::{
console,
error::{ReportableError, UserFacingError},
};
use std::io;
use std::{io, net::IpAddr};
use thiserror::Error;
/// Convenience wrapper for the authentication error.
@@ -62,10 +62,11 @@ pub enum AuthErrorImpl {
Io(#[from] io::Error),
#[error(
"This IP address is not allowed to connect to this endpoint. \
Please add it to the allowed list in the Neon console."
"This IP address {0} is not allowed to connect to this endpoint. \
Please add it to the allowed list in the Neon console. \
Make sure to check for IPv4 or IPv6 addresses."
)]
IpAddressNotAllowed,
IpAddressNotAllowed(IpAddr),
#[error("Too many connections to this endpoint. Please try again later.")]
TooManyConnections,
@@ -87,8 +88,8 @@ impl AuthError {
AuthErrorImpl::AuthFailed(user.into()).into()
}
pub fn ip_address_not_allowed() -> Self {
AuthErrorImpl::IpAddressNotAllowed.into()
pub fn ip_address_not_allowed(ip: IpAddr) -> Self {
AuthErrorImpl::IpAddressNotAllowed(ip).into()
}
pub fn too_many_connections() -> Self {
@@ -122,7 +123,7 @@ impl UserFacingError for AuthError {
MalformedPassword(_) => self.to_string(),
MissingEndpointName => self.to_string(),
Io(_) => "Internal error".to_string(),
IpAddressNotAllowed => self.to_string(),
IpAddressNotAllowed(_) => self.to_string(),
TooManyConnections => self.to_string(),
UserTimeout(_) => self.to_string(),
}
@@ -141,7 +142,7 @@ impl ReportableError for AuthError {
MalformedPassword(_) => crate::error::ErrorKind::User,
MissingEndpointName => crate::error::ErrorKind::User,
Io(_) => crate::error::ErrorKind::ClientDisconnect,
IpAddressNotAllowed => crate::error::ErrorKind::User,
IpAddressNotAllowed(_) => crate::error::ErrorKind::User,
TooManyConnections => crate::error::ErrorKind::RateLimit,
UserTimeout(_) => crate::error::ErrorKind::User,
}

View File

@@ -209,7 +209,7 @@ async fn auth_quirks(
// check allowed list
if !check_peer_addr_is_in_list(&ctx.peer_addr, &allowed_ips) {
return Err(auth::AuthError::ip_address_not_allowed());
return Err(auth::AuthError::ip_address_not_allowed(ctx.peer_addr));
}
let cached_secret = match maybe_secret {
Some(secret) => secret,

View File

@@ -32,7 +32,7 @@ impl PoolingBackend {
let backend = self.config.auth_backend.as_ref().map(|_| user_info.clone());
let (allowed_ips, maybe_secret) = backend.get_allowed_ips_and_secret(ctx).await?;
if !check_peer_addr_is_in_list(&ctx.peer_addr, &allowed_ips) {
return Err(AuthError::ip_address_not_allowed());
return Err(AuthError::ip_address_not_allowed(ctx.peer_addr));
}
let cached_secret = match maybe_secret {
Some(secret) => secret,

View File

@@ -512,7 +512,7 @@ class NeonEnvBuilder:
def init_start(
self,
initial_tenant_conf: Optional[Dict[str, str]] = None,
initial_tenant_conf: Optional[Dict[str, Any]] = None,
default_remote_storage_if_missing: bool = True,
initial_tenant_shard_count: Optional[int] = None,
initial_tenant_shard_stripe_size: Optional[int] = None,
@@ -1497,7 +1497,7 @@ class NeonCli(AbstractNeonCli):
self,
tenant_id: Optional[TenantId] = None,
timeline_id: Optional[TimelineId] = None,
conf: Optional[Dict[str, str]] = None,
conf: Optional[Dict[str, Any]] = None,
shard_count: Optional[int] = None,
shard_stripe_size: Optional[int] = None,
set_default: bool = False,

View File

@@ -395,12 +395,20 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
tenant_id: Union[TenantId, TenantShardId],
timestamp: datetime,
done_if_after: datetime,
shard_counts: Optional[List[int]] = None,
):
"""
Issues a request to perform time travel operations on the remote storage
"""
if shard_counts is None:
shard_counts = []
body: Dict[str, Any] = {
"shard_counts": shard_counts,
}
res = self.put(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/time_travel_remote_storage?travel_to={timestamp.isoformat()}Z&done_if_after={done_if_after.isoformat()}Z"
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/time_travel_remote_storage?travel_to={timestamp.isoformat()}Z&done_if_after={done_if_after.isoformat()}Z",
json=body,
)
self.verbose_error(res)

View File

@@ -219,6 +219,7 @@ def wait_for_last_record_lsn(
def wait_for_upload_queue_empty(
pageserver_http: PageserverHttpClient, tenant_id: TenantId, timeline_id: TimelineId
):
wait_period_secs = 0.2
while True:
all_metrics = pageserver_http.get_metrics()
started = all_metrics.query_all(
@@ -235,7 +236,7 @@ def wait_for_upload_queue_empty(
"timeline_id": str(timeline_id),
},
)
assert len(started) == len(finished)
# this is `started left join finished`; if match, subtracting start from finished, resulting in queue depth
remaining_labels = ["shard_id", "file_kind", "op_kind"]
tl: List[Tuple[Any, float]] = []
@@ -256,7 +257,7 @@ def wait_for_upload_queue_empty(
log.info(f" {labels}: {queue_count}")
if all(queue_count == 0 for (_, queue_count) in tl):
return
time.sleep(0.2)
time.sleep(wait_period_secs)
def wait_timeline_detail_404(
@@ -482,8 +483,8 @@ def tenant_delete_wait_completed(
MANY_SMALL_LAYERS_TENANT_CONFIG = {
"gc_period": "0s",
"compaction_period": "0s",
"checkpoint_distance": f"{1024**2}",
"image_creation_threshold": "100",
"checkpoint_distance": 1024**2,
"image_creation_threshold": 100,
}

View File

@@ -1,195 +0,0 @@
import asyncio
import json
from pathlib import Path
from typing import Any, Dict, Tuple
import pytest
from fixtures.benchmark_fixture import MetricReport, NeonBenchmarker
from fixtures.log_helper import log
from fixtures.neon_fixtures import Endpoint, NeonEnv, NeonEnvBuilder, PgBin, wait_for_last_flush_lsn
from fixtures.utils import get_scale_for_db, humantime_to_ms
from performance.pageserver.util import (
setup_pageserver_with_tenants,
)
@pytest.mark.parametrize("duration", [30])
@pytest.mark.parametrize("pgbench_scale", [get_scale_for_db(200)])
@pytest.mark.parametrize("n_tenants", [10])
@pytest.mark.parametrize("get_vectored_impl", ["sequential", "vectored"])
@pytest.mark.timeout(1000)
def test_basebackup_with_high_slru_count(
neon_env_builder: NeonEnvBuilder,
zenbenchmark: NeonBenchmarker,
pg_bin: PgBin,
get_vectored_impl: str,
n_tenants: int,
pgbench_scale: int,
duration: int,
):
def record(metric, **kwargs):
zenbenchmark.record(metric_name=f"pageserver_basebackup.{metric}", **kwargs)
params: Dict[str, Tuple[Any, Dict[str, Any]]] = {}
# params from fixtures
params.update(
{
"n_tenants": (n_tenants, {"unit": ""}),
"pgbench_scale": (pgbench_scale, {"unit": ""}),
"duration": (duration, {"unit": "s"}),
}
)
# configure cache sizes like in prod
page_cache_size = 16384
max_file_descriptors = 500000
neon_env_builder.pageserver_config_override = (
f"page_cache_size={page_cache_size}; max_file_descriptors={max_file_descriptors}; "
f"get_vectored_impl='{get_vectored_impl}'; validate_vectored_get=false"
)
params.update(
{
"pageserver_config_override.page_cache_size": (
page_cache_size * 8192,
{"unit": "byte"},
),
"pageserver_config_override.max_file_descriptors": (max_file_descriptors, {"unit": ""}),
}
)
for param, (value, kwargs) in params.items():
record(param, metric_value=value, report=MetricReport.TEST_PARAM, **kwargs)
n_txns = 500000
def setup_wrapper(env: NeonEnv):
return setup_tenant_template(env, n_txns)
env = setup_pageserver_with_tenants(
neon_env_builder, f"large_slru_count-{n_tenants}-{n_txns}", n_tenants, setup_wrapper
)
run_benchmark(env, pg_bin, record, duration)
def setup_tenant_template(env: NeonEnv, n_txns: int):
config = {
"gc_period": "0s", # disable periodic gc
"checkpoint_timeout": "10 years",
"compaction_period": "0s", # disable periodic compaction
"compaction_threshold": 10,
"compaction_target_size": 134217728,
"checkpoint_distance": 268435456,
"image_creation_threshold": 3,
}
template_tenant, template_timeline = env.neon_cli.create_tenant(set_default=True)
env.pageserver.tenant_detach(template_tenant)
env.pageserver.allowed_errors.append(
# tenant detach causes this because the underlying attach-hook removes the tenant from attachment_service entirely
".*Dropped remote consistent LSN updates.*",
)
env.pageserver.tenant_attach(template_tenant, config)
ps_http = env.pageserver.http_client()
with env.endpoints.create_start(
"main", tenant_id=template_tenant, config_lines=["shared_buffers=1MB"]
) as ep:
rels = 10
asyncio.run(run_updates(ep, n_txns, rels))
wait_for_last_flush_lsn(env, ep, template_tenant, template_timeline)
ps_http.timeline_checkpoint(template_tenant, template_timeline)
ps_http.timeline_compact(template_tenant, template_timeline)
return (template_tenant, template_timeline, config)
# Takes about 5 minutes and produces tenants with around 300 SLRU blocks
# of 8 KiB each.
async def run_updates(ep: Endpoint, n_txns: int, workers_count: int):
workers = []
for i in range(workers_count):
workers.append(asyncio.create_task(run_update_loop_worker(ep, n_txns, i)))
await asyncio.gather(*workers)
async def run_update_loop_worker(ep: Endpoint, n_txns: int, idx: int):
table = f"t_{idx}"
conn = await ep.connect_async()
await conn.execute(f"CREATE TABLE {table} (pk integer PRIMARY KEY, x integer)")
await conn.execute(f"ALTER TABLE {table} SET (autovacuum_enabled = false)")
await conn.execute(f"INSERT INTO {table} VALUES (1, 0)")
await conn.execute(
"""
CREATE PROCEDURE updating{0}() as
$$
DECLARE
i integer;
BEGIN
FOR i IN 1..{1} LOOP
UPDATE {0} SET x = x + 1 WHERE pk=1;
COMMIT;
END LOOP;
END
$$ LANGUAGE plpgsql
""".format(table, n_txns)
)
await conn.execute("SET statement_timeout=0")
await conn.execute(f"call updating{table}()")
def run_benchmark(env: NeonEnv, pg_bin: PgBin, record, duration_secs: int):
ps_http = env.pageserver.http_client()
cmd = [
str(env.neon_binpath / "pagebench"),
"basebackup",
"--mgmt-api-endpoint",
ps_http.base_url,
"--page-service-connstring",
env.pageserver.connstr(password=None),
"--gzip-probability",
"1",
"--runtime",
f"{duration_secs}s",
# don't specify the targets explicitly, let pagebench auto-discover them
]
log.info(f"command: {' '.join(cmd)}")
basepath = pg_bin.run_capture(cmd, with_command_header=False)
results_path = Path(basepath + ".stdout")
log.info(f"Benchmark results at: {results_path}")
with open(results_path, "r") as f:
results = json.load(f)
log.info(f"Results:\n{json.dumps(results, sort_keys=True, indent=2)}")
total = results["total"]
metric = "request_count"
record(
metric,
metric_value=total[metric],
unit="",
report=MetricReport.HIGHER_IS_BETTER,
)
metric = "latency_mean"
record(
metric,
metric_value=humantime_to_ms(total[metric]),
unit="ms",
report=MetricReport.LOWER_IS_BETTER,
)
metric = "latency_percentiles"
for k, v in total[metric].items():
record(
f"{metric}.{k}",
metric_value=humantime_to_ms(v),
unit="ms",
report=MetricReport.LOWER_IS_BETTER,
)

View File

@@ -3,6 +3,7 @@ import os
from pathlib import Path
from typing import Any, Dict, Tuple
import fixtures.pageserver.many_tenants as many_tenants
import pytest
from fixtures.benchmark_fixture import MetricReport, NeonBenchmarker
from fixtures.log_helper import log
@@ -14,9 +15,7 @@ from fixtures.neon_fixtures import (
)
from fixtures.utils import get_scale_for_db, humantime_to_ms
from performance.pageserver.util import (
setup_pageserver_with_tenants,
)
from performance.pageserver.util import ensure_pageserver_ready_for_benchmarking
# For reference, the space usage of the snapshots:
@@ -81,72 +80,10 @@ def test_pageserver_max_throughput_getpage_at_latest_lsn(
for param, (value, kwargs) in params.items():
record(param, metric_value=value, report=MetricReport.TEST_PARAM, **kwargs)
def setup_wrapper(env: NeonEnv):
return setup_tenant_template(env, pg_bin, pgbench_scale)
env = setup_pageserver_with_tenants(
neon_env_builder,
f"max_throughput_latest_lsn-{n_tenants}-{pgbench_scale}",
n_tenants,
setup_wrapper,
)
env = setup_pageserver_with_pgbench_tenants(neon_env_builder, pg_bin, n_tenants, pgbench_scale)
run_benchmark_max_throughput_latest_lsn(env, pg_bin, record, duration)
def setup_tenant_template(env: NeonEnv, pg_bin: PgBin, scale: int):
# use a config that makes production of on-disk state timing-insensitive
# as we ingest data into the tenant.
config = {
"gc_period": "0s", # disable periodic gc
"checkpoint_timeout": "10 years",
"compaction_period": "0s", # disable periodic compaction
"compaction_threshold": 10,
"compaction_target_size": 134217728,
"checkpoint_distance": 268435456,
"image_creation_threshold": 3,
}
template_tenant, template_timeline = env.neon_cli.create_tenant(set_default=True)
env.pageserver.tenant_detach(template_tenant)
env.pageserver.allowed_errors.append(
# tenant detach causes this because the underlying attach-hook removes the tenant from attachment_service entirely
".*Dropped remote consistent LSN updates.*",
)
env.pageserver.tenant_attach(template_tenant, config)
ps_http = env.pageserver.http_client()
with env.endpoints.create_start("main", tenant_id=template_tenant) as ep:
pg_bin.run_capture(["pgbench", "-i", f"-s{scale}", "-I", "dtGvp", ep.connstr()])
wait_for_last_flush_lsn(env, ep, template_tenant, template_timeline)
ps_http.timeline_checkpoint(template_tenant, template_timeline)
ps_http.timeline_compact(template_tenant, template_timeline)
for _ in range(
0, 17
): # some prime number to avoid potential resonances with the "_threshold" variables from the config
# the L0s produced by this appear to have size ~5MiB
num_txns = 10_000
pg_bin.run_capture(
["pgbench", "-N", "-c1", "--transactions", f"{num_txns}", ep.connstr()]
)
wait_for_last_flush_lsn(env, ep, template_tenant, template_timeline)
ps_http.timeline_checkpoint(template_tenant, template_timeline)
ps_http.timeline_compact(template_tenant, template_timeline)
# for reference, the output at scale=6 looked like so (306M total)
# ls -sh test_output/shared-snapshots/max_throughput_latest_lsn-2-6/snapshot/pageserver_1/tenants/35c30b88ea16a7a09f82d9c6a115551b/timelines/da902b378eebe83dc8a4e81cd3dc1c59
# total 306M
# 188M 000000000000000000000000000000000000-030000000000000000000000000000000003__000000000149F060-0000000009E75829
# 4.5M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000009E75829-000000000A21E919
# 33M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000000A21E919-000000000C20CB71
# 36M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000000C20CB71-000000000E470791
# 16M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000000E470791-000000000F34AEF1
# 8.2M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000000F34AEF1-000000000FABA8A9
# 6.0M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000000FABA8A9-000000000FFE0639
# 6.1M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000000FFE0639-000000001051D799
# 4.7M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000001051D799-0000000010908F19
# 4.6M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000010908F19-0000000010CD3021
return (template_tenant, template_timeline, config)
def run_benchmark_max_throughput_latest_lsn(
env: NeonEnv, pg_bin: PgBin, record, duration_secs: int
):
@@ -201,3 +138,78 @@ def run_benchmark_max_throughput_latest_lsn(
unit="ms",
report=MetricReport.LOWER_IS_BETTER,
)
def setup_pageserver_with_pgbench_tenants(
neon_env_builder: NeonEnvBuilder,
pg_bin: PgBin,
n_tenants: int,
scale: int,
) -> NeonEnv:
"""
Utility function to set up a pageserver with a given number of identical tenants.
Each tenant is a pgbench tenant, initialize to a certain scale, and treated afterwards
with a repeat application of (pgbench simple-update workload, checkpoint, compact).
"""
def setup_template(env: NeonEnv):
# use a config that makes production of on-disk state timing-insensitive
# as we ingest data into the tenant.
config = {
"gc_period": "0s", # disable periodic gc
"checkpoint_timeout": "10 years",
"compaction_period": "0s", # disable periodic compaction
"compaction_threshold": 10,
"compaction_target_size": 134217728,
"checkpoint_distance": 268435456,
"image_creation_threshold": 3,
}
template_tenant, template_timeline = env.neon_cli.create_tenant(set_default=True)
env.pageserver.tenant_detach(template_tenant)
env.pageserver.allowed_errors.append(
# tenant detach causes this because the underlying attach-hook removes the tenant from attachment_service entirely
".*Dropped remote consistent LSN updates.*",
)
env.pageserver.tenant_attach(template_tenant, config)
ps_http = env.pageserver.http_client()
with env.endpoints.create_start("main", tenant_id=template_tenant) as ep:
pg_bin.run_capture(["pgbench", "-i", f"-s{scale}", "-I", "dtGvp", ep.connstr()])
wait_for_last_flush_lsn(env, ep, template_tenant, template_timeline)
ps_http.timeline_checkpoint(template_tenant, template_timeline)
ps_http.timeline_compact(template_tenant, template_timeline)
for _ in range(
0, 17
): # some prime number to avoid potential resonances with the "_threshold" variables from the config
# the L0s produced by this appear to have size ~5MiB
num_txns = 10_000
pg_bin.run_capture(
["pgbench", "-N", "-c1", "--transactions", f"{num_txns}", ep.connstr()]
)
wait_for_last_flush_lsn(env, ep, template_tenant, template_timeline)
ps_http.timeline_checkpoint(template_tenant, template_timeline)
ps_http.timeline_compact(template_tenant, template_timeline)
# for reference, the output at scale=6 looked like so (306M total)
# ls -sh test_output/shared-snapshots/max_throughput_latest_lsn-2-6/snapshot/pageserver_1/tenants/35c30b88ea16a7a09f82d9c6a115551b/timelines/da902b378eebe83dc8a4e81cd3dc1c59
# total 306M
# 188M 000000000000000000000000000000000000-030000000000000000000000000000000003__000000000149F060-0000000009E75829
# 4.5M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000009E75829-000000000A21E919
# 33M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000000A21E919-000000000C20CB71
# 36M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000000C20CB71-000000000E470791
# 16M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000000E470791-000000000F34AEF1
# 8.2M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000000F34AEF1-000000000FABA8A9
# 6.0M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000000FABA8A9-000000000FFE0639
# 6.1M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000000FFE0639-000000001051D799
# 4.7M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000001051D799-0000000010908F19
# 4.6M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000010908F19-0000000010CD3021
return (template_tenant, template_timeline, config)
def doit(neon_env_builder: NeonEnvBuilder) -> NeonEnv:
return many_tenants.single_timeline(neon_env_builder, setup_template, n_tenants)
env = neon_env_builder.build_and_use_snapshot(
f"max_throughput_latest_lsn-{n_tenants}-{scale}", doit
)
env.start()
ensure_pageserver_ready_for_benchmarking(env, n_tenants)
return env

View File

@@ -2,16 +2,9 @@
Utilities used by all code in this sub-directory
"""
from typing import Any, Callable, Dict, Tuple
import fixtures.pageserver.many_tenants as many_tenants
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnv,
NeonEnvBuilder,
)
from fixtures.neon_fixtures import NeonEnv
from fixtures.pageserver.utils import wait_until_all_tenants_state
from fixtures.types import TenantId, TimelineId
def ensure_pageserver_ready_for_benchmarking(env: NeonEnv, n_tenants: int):
@@ -34,24 +27,3 @@ def ensure_pageserver_ready_for_benchmarking(env: NeonEnv, n_tenants: int):
assert not layer.remote
log.info("ready")
def setup_pageserver_with_tenants(
neon_env_builder: NeonEnvBuilder,
name: str,
n_tenants: int,
setup: Callable[[NeonEnv], Tuple[TenantId, TimelineId, Dict[str, Any]]],
) -> NeonEnv:
"""
Utility function to set up a pageserver with a given number of identical tenants.
Each tenant is a pgbench tenant, initialize to a certain scale, and treated afterwards
with a repeat application of (pgbench simple-update workload, checkpoint, compact).
"""
def doit(neon_env_builder: NeonEnvBuilder) -> NeonEnv:
return many_tenants.single_timeline(neon_env_builder, setup, n_tenants)
env = neon_env_builder.build_and_use_snapshot(name, doit)
env.start()
ensure_pageserver_ready_for_benchmarking(env, n_tenants)
return env

View File

@@ -24,7 +24,7 @@ async def test_proxy_psql_allowed_ips(static_proxy: NeonProxy, vanilla_pg: Vanil
with pytest.raises(psycopg2.Error) as exprinfo:
static_proxy.safe_psql(**kwargs)
text = str(exprinfo.value).strip()
assert "This IP address is not allowed to connect" in text
assert "not allowed to connect" in text
# no SNI, deprecated `options=project` syntax (before we had several endpoint in project)
check_cannot_connect(query="select 1", sslsni=0, options="project=private-project")

View File

@@ -1,3 +1,6 @@
import os
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnvBuilder,
@@ -286,6 +289,12 @@ def test_sharding_split_smoke(
env.attachment_service.consistency_check()
@pytest.mark.skipif(
# The quantity of data isn't huge, but debug can be _very_ slow, and the things we're
# validating in this test don't benefit much from debug assertions.
os.getenv("BUILD_TYPE") == "debug",
reason="Avoid running bulkier ingest tests in debug mode",
)
def test_sharding_ingest(
neon_env_builder: NeonEnvBuilder,
):
@@ -319,10 +328,10 @@ def test_sharding_ingest(
workload = Workload(env, tenant_id, timeline_id)
workload.init()
workload.write_rows(512, upload=False)
workload.write_rows(512, upload=False)
workload.write_rows(512, upload=False)
workload.write_rows(512, upload=False)
workload.write_rows(4096, upload=False)
workload.write_rows(4096, upload=False)
workload.write_rows(4096, upload=False)
workload.write_rows(4096, upload=False)
workload.validate()
small_layer_count = 0
@@ -361,7 +370,12 @@ def test_sharding_ingest(
# - Because we roll layers on checkpoint_distance * shard_count, we expect to obey the target
# layer size on average, but it is still possible to write some tiny layers.
log.info(f"Totals: {small_layer_count} small layers, {ok_layer_count} ok layers")
assert float(small_layer_count) / float(ok_layer_count) < 0.25
if small_layer_count <= shard_count:
# If each shard has <= 1 small layer
pass
else:
# General case:
assert float(small_layer_count) / float(ok_layer_count) < 0.25
# Each shard may emit up to one huge layer, because initdb ingest doesn't respect checkpoint_distance.
assert huge_layer_count <= shard_count

View File

@@ -1,13 +1,30 @@
import time
from collections import defaultdict
from datetime import datetime, timezone
from typing import List
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder
from fixtures.neon_fixtures import (
NeonEnv,
NeonEnvBuilder,
PgBin,
)
from fixtures.pageserver.http import PageserverHttpClient
from fixtures.pageserver.utils import tenant_delete_wait_completed, timeline_delete_wait_completed
from fixtures.pageserver.utils import (
MANY_SMALL_LAYERS_TENANT_CONFIG,
enable_remote_storage_versioning,
list_prefix,
remote_storage_delete_key,
tenant_delete_wait_completed,
timeline_delete_wait_completed,
)
from fixtures.pg_version import PgVersion
from fixtures.remote_storage import RemoteStorageKind, s3_storage
from fixtures.types import TenantId, TimelineId
from fixtures.utils import wait_until
from fixtures.utils import run_pg_bench_small, wait_until
from mypy_boto3_s3.type_defs import (
ObjectTypeDef,
)
from pytest_httpserver import HTTPServer
from werkzeug.wrappers.request import Request
from werkzeug.wrappers.response import Response
@@ -457,3 +474,113 @@ def test_sharding_service_debug_apis(neon_env_builder: NeonEnvBuilder):
# Check that the 'drop' APIs didn't leave things in a state that would fail a consistency check: they're
# meant to be unclean wrt the pageserver state, but not leave a broken storage controller behind.
env.attachment_service.consistency_check()
def test_sharding_service_s3_time_travel_recovery(
neon_env_builder: NeonEnvBuilder,
pg_bin: PgBin,
):
"""
Test for S3 time travel
"""
remote_storage_kind = s3_storage()
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
# Mock S3 doesn't have versioning enabled by default, enable it
# (also do it before there is any writes to the bucket)
if remote_storage_kind == RemoteStorageKind.MOCK_S3:
remote_storage = neon_env_builder.pageserver_remote_storage
assert remote_storage, "remote storage not configured"
enable_remote_storage_versioning(remote_storage)
neon_env_builder.num_pageservers = 1
env = neon_env_builder.init_start()
virtual_ps_http = PageserverHttpClient(env.attachment_service_port, lambda: True)
tenant_id = TenantId.generate()
env.attachment_service.tenant_create(
tenant_id,
shard_count=2,
shard_stripe_size=8192,
tenant_config=MANY_SMALL_LAYERS_TENANT_CONFIG,
)
# Check that the consistency check passes
env.attachment_service.consistency_check()
branch_name = "main"
timeline_id = env.neon_cli.create_timeline(
branch_name,
tenant_id=tenant_id,
)
# Write some nontrivial amount of data into the endpoint and wait until it is uploaded
with env.endpoints.create_start("main", tenant_id=tenant_id) as endpoint:
run_pg_bench_small(pg_bin, endpoint.connstr())
endpoint.safe_psql("CREATE TABLE created_foo(id integer);")
# last_flush_lsn_upload(env, endpoint, tenant_id, timeline_id)
# Give the data time to be uploaded
time.sleep(4)
# Detach the tenant
virtual_ps_http.tenant_location_conf(
tenant_id,
{
"mode": "Detached",
"secondary_conf": None,
"tenant_conf": {},
"generation": None,
},
)
time.sleep(4)
ts_before_disaster = datetime.now(tz=timezone.utc).replace(tzinfo=None)
time.sleep(4)
# Simulate a "disaster": delete some random files from remote storage for one of the shards
assert env.pageserver_remote_storage
shard_id_for_list = "0002"
objects: List[ObjectTypeDef] = list_prefix(
env.pageserver_remote_storage,
f"tenants/{tenant_id}-{shard_id_for_list}/timelines/{timeline_id}/",
).get("Contents", [])
assert len(objects) > 1
log.info(f"Found {len(objects)} objects in remote storage")
should_delete = False
for obj in objects:
obj_key = obj["Key"]
should_delete = not should_delete
if not should_delete:
log.info(f"Keeping key on remote storage: {obj_key}")
continue
log.info(f"Deleting key from remote storage: {obj_key}")
remote_storage_delete_key(env.pageserver_remote_storage, obj_key)
pass
time.sleep(4)
ts_after_disaster = datetime.now(tz=timezone.utc).replace(tzinfo=None)
time.sleep(4)
# Do time travel recovery
virtual_ps_http.tenant_time_travel_remote_storage(
tenant_id, ts_before_disaster, ts_after_disaster, shard_counts=[2]
)
time.sleep(4)
# Attach the tenant again
virtual_ps_http.tenant_location_conf(
tenant_id,
{
"mode": "AttachedSingle",
"secondary_conf": None,
"tenant_conf": {},
"generation": 100,
},
)
with env.endpoints.create_start("main", tenant_id=tenant_id) as endpoint:
endpoint.safe_psql("SELECT * FROM created_foo;")
env.attachment_service.consistency_check()

View File

@@ -74,7 +74,6 @@ fn analyze_trace<R: std::io::Read>(mut reader: R) {
}
prev = Some(req);
}
PagestreamFeMessage::GetVectoredPages(_) => {}
PagestreamFeMessage::DbSize(_) => {}
};
}