mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-21 07:00:38 +00:00
Compare commits
14 Commits
vlad/get-v
...
problame/a
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
98d0bca8d1 | ||
|
|
bafa6d37d9 | ||
|
|
ad23c945df | ||
|
|
60e5a56a5a | ||
|
|
afda4420bd | ||
|
|
4536caa253 | ||
|
|
d7920a2a57 | ||
|
|
ce1673a8c4 | ||
|
|
f990e07581 | ||
|
|
1af2f3caf1 | ||
|
|
532b0fa52b | ||
|
|
4de2f0f3e0 | ||
|
|
41464325c7 | ||
|
|
7257ffbf75 |
@@ -1,10 +1,10 @@
|
||||
/compute_tools/ @neondatabase/control-plane @neondatabase/compute
|
||||
/control_plane/ @neondatabase/compute @neondatabase/storage
|
||||
/libs/pageserver_api/ @neondatabase/compute @neondatabase/storage
|
||||
/control_plane/attachment_service @neondatabase/storage
|
||||
/libs/pageserver_api/ @neondatabase/storage
|
||||
/libs/postgres_ffi/ @neondatabase/compute
|
||||
/libs/remote_storage/ @neondatabase/storage
|
||||
/libs/safekeeper_api/ @neondatabase/safekeepers
|
||||
/libs/vm_monitor/ @neondatabase/autoscaling @neondatabase/compute
|
||||
/libs/vm_monitor/ @neondatabase/autoscaling
|
||||
/pageserver/ @neondatabase/storage
|
||||
/pgxn/ @neondatabase/compute
|
||||
/proxy/ @neondatabase/proxy
|
||||
|
||||
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -284,6 +284,7 @@ dependencies = [
|
||||
"diesel_migrations",
|
||||
"futures",
|
||||
"git-version",
|
||||
"humantime",
|
||||
"hyper",
|
||||
"metrics",
|
||||
"once_cell",
|
||||
|
||||
@@ -18,6 +18,7 @@ clap.workspace = true
|
||||
futures.workspace = true
|
||||
git-version.workspace = true
|
||||
hyper.workspace = true
|
||||
humantime.workspace = true
|
||||
once_cell.workspace = true
|
||||
pageserver_api.workspace = true
|
||||
pageserver_client.workspace = true
|
||||
|
||||
@@ -4,7 +4,7 @@ use hyper::{Body, Request, Response};
|
||||
use hyper::{StatusCode, Uri};
|
||||
use pageserver_api::models::{
|
||||
TenantCreateRequest, TenantLocationConfigRequest, TenantShardSplitRequest,
|
||||
TimelineCreateRequest,
|
||||
TenantTimeTravelRequest, TimelineCreateRequest,
|
||||
};
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use pageserver_client::mgmt_api;
|
||||
@@ -12,7 +12,7 @@ use std::sync::Arc;
|
||||
use std::time::{Duration, Instant};
|
||||
use utils::auth::SwappableJwtAuth;
|
||||
use utils::http::endpoint::{auth_middleware, request_span};
|
||||
use utils::http::request::parse_request_param;
|
||||
use utils::http::request::{must_get_query_param, parse_request_param};
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
|
||||
use utils::{
|
||||
@@ -180,6 +180,39 @@ async fn handle_tenant_location_config(
|
||||
)
|
||||
}
|
||||
|
||||
async fn handle_tenant_time_travel_remote_storage(
|
||||
service: Arc<Service>,
|
||||
mut req: Request<Body>,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
|
||||
let time_travel_req = json_request::<TenantTimeTravelRequest>(&mut req).await?;
|
||||
|
||||
let timestamp_raw = must_get_query_param(&req, "travel_to")?;
|
||||
let _timestamp = humantime::parse_rfc3339(×tamp_raw).map_err(|_e| {
|
||||
ApiError::BadRequest(anyhow::anyhow!(
|
||||
"Invalid time for travel_to: {timestamp_raw:?}"
|
||||
))
|
||||
})?;
|
||||
|
||||
let done_if_after_raw = must_get_query_param(&req, "done_if_after")?;
|
||||
let _done_if_after = humantime::parse_rfc3339(&done_if_after_raw).map_err(|_e| {
|
||||
ApiError::BadRequest(anyhow::anyhow!(
|
||||
"Invalid time for done_if_after: {done_if_after_raw:?}"
|
||||
))
|
||||
})?;
|
||||
|
||||
service
|
||||
.tenant_time_travel_remote_storage(
|
||||
&time_travel_req,
|
||||
tenant_id,
|
||||
timestamp_raw,
|
||||
done_if_after_raw,
|
||||
)
|
||||
.await?;
|
||||
|
||||
json_response(StatusCode::OK, ())
|
||||
}
|
||||
|
||||
async fn handle_tenant_delete(
|
||||
service: Arc<Service>,
|
||||
req: Request<Body>,
|
||||
@@ -477,6 +510,9 @@ pub fn make_router(
|
||||
.put("/v1/tenant/:tenant_id/location_config", |r| {
|
||||
tenant_service_handler(r, handle_tenant_location_config)
|
||||
})
|
||||
.put("/v1/tenant/:tenant_id/time_travel_remote_storage", |r| {
|
||||
tenant_service_handler(r, handle_tenant_time_travel_remote_storage)
|
||||
})
|
||||
// Timeline operations
|
||||
.delete("/v1/tenant/:tenant_id/timeline/:timeline_id", |r| {
|
||||
tenant_service_handler(r, handle_tenant_timeline_delete)
|
||||
|
||||
@@ -175,10 +175,7 @@ impl Scheduler {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn schedule_shard(
|
||||
&mut self,
|
||||
hard_exclude: &[NodeId],
|
||||
) -> Result<NodeId, ScheduleError> {
|
||||
pub(crate) fn schedule_shard(&self, hard_exclude: &[NodeId]) -> Result<NodeId, ScheduleError> {
|
||||
if self.nodes.is_empty() {
|
||||
return Err(ScheduleError::NoPageservers);
|
||||
}
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use std::{
|
||||
borrow::Cow,
|
||||
cmp::Ordering,
|
||||
collections::{BTreeMap, HashMap, HashSet},
|
||||
str::FromStr,
|
||||
@@ -25,7 +26,7 @@ use pageserver_api::{
|
||||
self, LocationConfig, LocationConfigListResponse, LocationConfigMode, ShardParameters,
|
||||
TenantConfig, TenantCreateRequest, TenantLocationConfigRequest,
|
||||
TenantLocationConfigResponse, TenantShardLocation, TenantShardSplitRequest,
|
||||
TenantShardSplitResponse, TimelineCreateRequest, TimelineInfo,
|
||||
TenantShardSplitResponse, TenantTimeTravelRequest, TimelineCreateRequest, TimelineInfo,
|
||||
},
|
||||
shard::{ShardCount, ShardIdentity, ShardNumber, ShardStripeSize, TenantShardId},
|
||||
};
|
||||
@@ -1329,6 +1330,95 @@ impl Service {
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
pub(crate) async fn tenant_time_travel_remote_storage(
|
||||
&self,
|
||||
time_travel_req: &TenantTimeTravelRequest,
|
||||
tenant_id: TenantId,
|
||||
timestamp: Cow<'_, str>,
|
||||
done_if_after: Cow<'_, str>,
|
||||
) -> Result<(), ApiError> {
|
||||
let node = {
|
||||
let locked = self.inner.read().unwrap();
|
||||
// Just a sanity check to prevent misuse: the API expects that the tenant is fully
|
||||
// detached everywhere, and nothing writes to S3 storage. Here, we verify that,
|
||||
// but only at the start of the process, so it's really just to prevent operator
|
||||
// mistakes.
|
||||
for (shard_id, shard) in locked.tenants.range(TenantShardId::tenant_range(tenant_id)) {
|
||||
if shard.intent.get_attached().is_some() || !shard.intent.get_secondary().is_empty()
|
||||
{
|
||||
return Err(ApiError::InternalServerError(anyhow::anyhow!(
|
||||
"We want tenant to be attached in shard with tenant_shard_id={shard_id}"
|
||||
)));
|
||||
}
|
||||
let maybe_attached = shard
|
||||
.observed
|
||||
.locations
|
||||
.iter()
|
||||
.filter_map(|(node_id, observed_location)| {
|
||||
observed_location
|
||||
.conf
|
||||
.as_ref()
|
||||
.map(|loc| (node_id, observed_location, loc.mode))
|
||||
})
|
||||
.find(|(_, _, mode)| *mode != LocationConfigMode::Detached);
|
||||
if let Some((node_id, _observed_location, mode)) = maybe_attached {
|
||||
return Err(ApiError::InternalServerError(anyhow::anyhow!("We observed attached={mode:?} tenant in node_id={node_id} shard with tenant_shard_id={shard_id}")));
|
||||
}
|
||||
}
|
||||
let scheduler = &locked.scheduler;
|
||||
// Right now we only perform the operation on a single node without parallelization
|
||||
// TODO fan out the operation to multiple nodes for better performance
|
||||
let node_id = scheduler.schedule_shard(&[])?;
|
||||
let node = locked
|
||||
.nodes
|
||||
.get(&node_id)
|
||||
.expect("Pageservers may not be deleted while lock is active");
|
||||
node.clone()
|
||||
};
|
||||
|
||||
// The shard count is encoded in the remote storage's URL, so we need to handle all historically used shard counts
|
||||
let mut counts = time_travel_req
|
||||
.shard_counts
|
||||
.iter()
|
||||
.copied()
|
||||
.collect::<HashSet<_>>()
|
||||
.into_iter()
|
||||
.collect::<Vec<_>>();
|
||||
counts.sort_unstable();
|
||||
|
||||
for count in counts {
|
||||
let shard_ids = (0..count.count())
|
||||
.map(|i| TenantShardId {
|
||||
tenant_id,
|
||||
shard_number: ShardNumber(i),
|
||||
shard_count: count,
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
for tenant_shard_id in shard_ids {
|
||||
let client =
|
||||
mgmt_api::Client::new(node.base_url(), self.config.jwt_token.as_deref());
|
||||
|
||||
tracing::info!("Doing time travel recovery for shard {tenant_shard_id}",);
|
||||
|
||||
client
|
||||
.tenant_time_travel_remote_storage(
|
||||
tenant_shard_id,
|
||||
×tamp,
|
||||
&done_if_after,
|
||||
)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
ApiError::InternalServerError(anyhow::anyhow!(
|
||||
"Error doing time travel recovery for shard {tenant_shard_id} on node {}: {e}",
|
||||
node.id
|
||||
))
|
||||
})?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn tenant_delete(&self, tenant_id: TenantId) -> Result<StatusCode, ApiError> {
|
||||
self.ensure_attached_wait(tenant_id).await?;
|
||||
|
||||
|
||||
@@ -495,6 +495,13 @@ impl TenantState {
|
||||
}
|
||||
}
|
||||
|
||||
for node_id in self.observed.locations.keys() {
|
||||
if self.intent.attached != Some(*node_id) && !self.intent.secondary.contains(node_id) {
|
||||
// We have observed state that isn't part of our intent: need to clean it up.
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
// Even if there is no pageserver work to be done, if we have a pending notification to computes,
|
||||
// wake up a reconciler to send it.
|
||||
if self.pending_compute_notification {
|
||||
|
||||
@@ -616,7 +616,7 @@ async fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::Local
|
||||
let tenant_id = get_tenant_id(create_match, env)?;
|
||||
let new_branch_name = create_match
|
||||
.get_one::<String>("branch-name")
|
||||
.ok_or_else(|| anyhow!("No branch name provided"))?;
|
||||
.ok_or_else(|| anyhow!("No branch name provided"))?; // TODO
|
||||
|
||||
let pg_version = create_match
|
||||
.get_one::<u32>("pg-version")
|
||||
|
||||
@@ -291,6 +291,7 @@ pub struct TenantConfig {
|
||||
pub enum EvictionPolicy {
|
||||
NoEviction,
|
||||
LayerAccessThreshold(EvictionPolicyLayerAccessThreshold),
|
||||
OnlyImitiate(EvictionPolicyLayerAccessThreshold),
|
||||
}
|
||||
|
||||
impl EvictionPolicy {
|
||||
@@ -298,6 +299,7 @@ impl EvictionPolicy {
|
||||
match self {
|
||||
EvictionPolicy::NoEviction => "NoEviction",
|
||||
EvictionPolicy::LayerAccessThreshold(_) => "LayerAccessThreshold",
|
||||
EvictionPolicy::OnlyImitiate(_) => "OnlyImitiate",
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -342,7 +344,7 @@ impl ThrottleConfig {
|
||||
/// A flattened analog of a `pagesever::tenant::LocationMode`, which
|
||||
/// lists out all possible states (and the virtual "Detached" state)
|
||||
/// in a flat form rather than using rust-style enums.
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, Copy, Eq, PartialEq)]
|
||||
pub enum LocationConfigMode {
|
||||
AttachedSingle,
|
||||
AttachedMulti,
|
||||
@@ -406,6 +408,12 @@ pub struct TenantLocationConfigRequest {
|
||||
pub config: LocationConfig, // as we have a flattened field, we should reject all unknown fields in it
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
#[serde(deny_unknown_fields)]
|
||||
pub struct TenantTimeTravelRequest {
|
||||
pub shard_counts: Vec<ShardCount>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
#[serde(deny_unknown_fields)]
|
||||
pub struct TenantShardLocation {
|
||||
@@ -720,7 +728,6 @@ pub enum PagestreamFeMessage {
|
||||
GetPage(PagestreamGetPageRequest),
|
||||
DbSize(PagestreamDbSizeRequest),
|
||||
GetSlruSegment(PagestreamGetSlruSegmentRequest),
|
||||
GetVectoredPages(PagestreamGetVectoredPagesRequest),
|
||||
}
|
||||
|
||||
// Wrapped in libpq CopyData
|
||||
@@ -732,7 +739,6 @@ pub enum PagestreamBeMessage {
|
||||
Error(PagestreamErrorResponse),
|
||||
DbSize(PagestreamDbSizeResponse),
|
||||
GetSlruSegment(PagestreamGetSlruSegmentResponse),
|
||||
GetVectoredPages(PagestreamGetVectoredPagesResponse),
|
||||
}
|
||||
|
||||
// Keep in sync with `pagestore_client.h`
|
||||
@@ -744,7 +750,6 @@ enum PagestreamBeMessageTag {
|
||||
Error = 103,
|
||||
DbSize = 104,
|
||||
GetSlruSegment = 105,
|
||||
GetVectoredPages = 106,
|
||||
}
|
||||
impl TryFrom<u8> for PagestreamBeMessageTag {
|
||||
type Error = u8;
|
||||
@@ -756,7 +761,6 @@ impl TryFrom<u8> for PagestreamBeMessageTag {
|
||||
103 => Ok(PagestreamBeMessageTag::Error),
|
||||
104 => Ok(PagestreamBeMessageTag::DbSize),
|
||||
105 => Ok(PagestreamBeMessageTag::GetSlruSegment),
|
||||
106 => Ok(PagestreamBeMessageTag::GetVectoredPages),
|
||||
_ => Err(value),
|
||||
}
|
||||
}
|
||||
@@ -799,15 +803,6 @@ pub struct PagestreamGetSlruSegmentRequest {
|
||||
pub segno: u32,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq)]
|
||||
pub struct PagestreamGetVectoredPagesRequest {
|
||||
pub latest: bool,
|
||||
pub lsn: Lsn,
|
||||
pub rel: RelTag,
|
||||
pub blkno: u32,
|
||||
pub count: u8,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct PagestreamExistsResponse {
|
||||
pub exists: bool,
|
||||
@@ -828,12 +823,6 @@ pub struct PagestreamGetSlruSegmentResponse {
|
||||
pub segment: Bytes,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct PagestreamGetVectoredPagesResponse {
|
||||
pub page_count: u8,
|
||||
pub pages: Bytes,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct PagestreamErrorResponse {
|
||||
pub message: String,
|
||||
@@ -905,18 +894,6 @@ impl PagestreamFeMessage {
|
||||
bytes.put_u8(req.kind);
|
||||
bytes.put_u32(req.segno);
|
||||
}
|
||||
|
||||
Self::GetVectoredPages(req) => {
|
||||
bytes.put_u8(5);
|
||||
bytes.put_u8(u8::from(req.latest));
|
||||
bytes.put_u64(req.lsn.0);
|
||||
bytes.put_u32(req.rel.spcnode);
|
||||
bytes.put_u32(req.rel.dbnode);
|
||||
bytes.put_u32(req.rel.relnode);
|
||||
bytes.put_u8(req.rel.forknum);
|
||||
bytes.put_u32(req.blkno);
|
||||
bytes.put_u8(req.count);
|
||||
}
|
||||
}
|
||||
|
||||
bytes.into()
|
||||
@@ -975,20 +952,6 @@ impl PagestreamFeMessage {
|
||||
segno: body.read_u32::<BigEndian>()?,
|
||||
},
|
||||
)),
|
||||
5 => Ok(PagestreamFeMessage::GetVectoredPages(
|
||||
PagestreamGetVectoredPagesRequest {
|
||||
latest: body.read_u8()? != 0,
|
||||
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
|
||||
rel: RelTag {
|
||||
spcnode: body.read_u32::<BigEndian>()?,
|
||||
dbnode: body.read_u32::<BigEndian>()?,
|
||||
relnode: body.read_u32::<BigEndian>()?,
|
||||
forknum: body.read_u8()?,
|
||||
},
|
||||
blkno: body.read_u32::<BigEndian>()?,
|
||||
count: body.read_u8()?,
|
||||
},
|
||||
)),
|
||||
_ => bail!("unknown smgr message tag: {:?}", msg_tag),
|
||||
}
|
||||
}
|
||||
@@ -1030,12 +993,6 @@ impl PagestreamBeMessage {
|
||||
bytes.put_u32((resp.segment.len() / BLCKSZ as usize) as u32);
|
||||
bytes.put(&resp.segment[..]);
|
||||
}
|
||||
|
||||
Self::GetVectoredPages(resp) => {
|
||||
bytes.put_u8(Tag::GetVectoredPages as u8);
|
||||
bytes.put_u8(resp.page_count);
|
||||
bytes.put(&resp.pages[..]);
|
||||
}
|
||||
}
|
||||
|
||||
bytes.into()
|
||||
@@ -1084,15 +1041,6 @@ impl PagestreamBeMessage {
|
||||
segment: segment.into(),
|
||||
})
|
||||
}
|
||||
Tag::GetVectoredPages => {
|
||||
let page_count = buf.read_u8()?;
|
||||
let mut pages = vec![0; page_count as usize * 8192];
|
||||
buf.read_exact(&mut pages)?;
|
||||
Self::GetVectoredPages(PagestreamGetVectoredPagesResponse {
|
||||
page_count,
|
||||
pages: pages.into(),
|
||||
})
|
||||
}
|
||||
};
|
||||
let remaining = buf.into_inner();
|
||||
if !remaining.is_empty() {
|
||||
@@ -1112,7 +1060,6 @@ impl PagestreamBeMessage {
|
||||
Self::Error(_) => "Error",
|
||||
Self::DbSize(_) => "DbSize",
|
||||
Self::GetSlruSegment(_) => "GetSlruSegment",
|
||||
Self::GetVectoredPages(_) => "GetVectoredPages",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -44,6 +44,26 @@ impl DownloadError {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<std::io::Error> for DownloadError {
|
||||
fn from(value: std::io::Error) -> Self {
|
||||
let needs_unwrap = value.kind() == std::io::ErrorKind::Other
|
||||
&& value
|
||||
.get_ref()
|
||||
.and_then(|x| x.downcast_ref::<DownloadError>())
|
||||
.is_some();
|
||||
|
||||
if needs_unwrap {
|
||||
*value
|
||||
.into_inner()
|
||||
.expect("just checked")
|
||||
.downcast::<DownloadError>()
|
||||
.expect("just checked")
|
||||
} else {
|
||||
DownloadError::Other(value.into())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum TimeTravelError {
|
||||
/// Validation or other error happened due to user input.
|
||||
@@ -142,13 +162,12 @@ impl std::fmt::Display for TimeoutOrCancel {
|
||||
impl std::error::Error for TimeoutOrCancel {}
|
||||
|
||||
impl TimeoutOrCancel {
|
||||
pub fn caused(error: &anyhow::Error) -> Option<&Self> {
|
||||
error.root_cause().downcast_ref()
|
||||
}
|
||||
|
||||
/// Returns true if the error was caused by [`TimeoutOrCancel::Cancel`].
|
||||
pub fn caused_by_cancel(error: &anyhow::Error) -> bool {
|
||||
Self::caused(error).is_some_and(Self::is_cancel)
|
||||
error
|
||||
.root_cause()
|
||||
.downcast_ref::<Self>()
|
||||
.is_some_and(Self::is_cancel)
|
||||
}
|
||||
|
||||
pub fn is_cancel(&self) -> bool {
|
||||
|
||||
@@ -73,6 +73,8 @@ where
|
||||
if !*this.hit {
|
||||
if let Poll::Ready(e) = this.cancellation.poll(cx) {
|
||||
*this.hit = true;
|
||||
|
||||
// most likely this will be a std::io::Error wrapping a DownloadError
|
||||
let e = Err(std::io::Error::from(e));
|
||||
return Poll::Ready(Some(e));
|
||||
}
|
||||
@@ -130,6 +132,8 @@ mod tests {
|
||||
.is_some_and(|e| matches!(e, DownloadError::Cancelled)),
|
||||
"{inner:?}"
|
||||
);
|
||||
let e = DownloadError::from(e);
|
||||
assert!(matches!(e, DownloadError::Cancelled), "{e:?}");
|
||||
|
||||
tokio::select! {
|
||||
_ = stream.next() => unreachable!("no timeout ever happens as we were already cancelled"),
|
||||
@@ -146,7 +150,7 @@ mod tests {
|
||||
let stream = DownloadStream::new(cancel_or_timeout(timeout, cancel.clone()), inner);
|
||||
let mut stream = std::pin::pin!(stream);
|
||||
|
||||
// because the stream uses 120s timeout we are paused, we advance to 120s right away.
|
||||
// because the stream uses 120s timeout and we are paused, we advance to 120s right away.
|
||||
let first = stream.next();
|
||||
|
||||
let e = first.await.expect("there must be some").unwrap_err();
|
||||
@@ -158,6 +162,8 @@ mod tests {
|
||||
.is_some_and(|e| matches!(e, DownloadError::Timeout)),
|
||||
"{inner:?}"
|
||||
);
|
||||
let e = DownloadError::from(e);
|
||||
assert!(matches!(e, DownloadError::Timeout), "{e:?}");
|
||||
|
||||
cancel.cancel();
|
||||
|
||||
|
||||
@@ -217,6 +217,20 @@ impl Client {
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn tenant_time_travel_remote_storage(
|
||||
&self,
|
||||
tenant_shard_id: TenantShardId,
|
||||
timestamp: &str,
|
||||
done_if_after: &str,
|
||||
) -> Result<()> {
|
||||
let uri = format!(
|
||||
"{}/v1/tenant/{tenant_shard_id}/time_travel_remote_storage?travel_to={timestamp}&done_if_after={done_if_after}",
|
||||
self.mgmt_api_endpoint
|
||||
);
|
||||
self.request(Method::PUT, &uri, ()).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn tenant_config(&self, req: &TenantConfigRequest) -> Result<()> {
|
||||
let uri = format!("{}/v1/tenant/config", self.mgmt_api_endpoint);
|
||||
self.request(Method::PUT, &uri, req).await?;
|
||||
|
||||
@@ -4,8 +4,7 @@ use futures::SinkExt;
|
||||
use pageserver_api::{
|
||||
models::{
|
||||
PagestreamBeMessage, PagestreamFeMessage, PagestreamGetPageRequest,
|
||||
PagestreamGetPageResponse, PagestreamGetVectoredPagesRequest,
|
||||
PagestreamGetVectoredPagesResponse,
|
||||
PagestreamGetPageResponse,
|
||||
},
|
||||
reltag::RelTag,
|
||||
};
|
||||
@@ -158,39 +157,7 @@ impl PagestreamClient {
|
||||
PagestreamBeMessage::Exists(_)
|
||||
| PagestreamBeMessage::Nblocks(_)
|
||||
| PagestreamBeMessage::DbSize(_)
|
||||
| PagestreamBeMessage::GetSlruSegment(_)
|
||||
| PagestreamBeMessage::GetVectoredPages(_) => {
|
||||
anyhow::bail!(
|
||||
"unexpected be message kind in response to getpage request: {}",
|
||||
msg.kind()
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn getpages(
|
||||
&mut self,
|
||||
req: PagestreamGetVectoredPagesRequest,
|
||||
) -> anyhow::Result<PagestreamGetVectoredPagesResponse> {
|
||||
let req = PagestreamFeMessage::GetVectoredPages(req);
|
||||
let req: bytes::Bytes = req.serialize();
|
||||
// let mut req = tokio_util::io::ReaderStream::new(&req);
|
||||
let mut req = tokio_stream::once(Ok(req));
|
||||
|
||||
self.copy_both.send_all(&mut req).await?;
|
||||
|
||||
let next: Option<Result<bytes::Bytes, _>> = self.copy_both.next().await;
|
||||
let next: bytes::Bytes = next.unwrap()?;
|
||||
|
||||
let msg = PagestreamBeMessage::deserialize(next)?;
|
||||
match msg {
|
||||
PagestreamBeMessage::GetVectoredPages(p) => Ok(p),
|
||||
PagestreamBeMessage::Error(e) => anyhow::bail!("Error: {:?}", e),
|
||||
PagestreamBeMessage::Exists(_)
|
||||
| PagestreamBeMessage::Nblocks(_)
|
||||
| PagestreamBeMessage::DbSize(_)
|
||||
| PagestreamBeMessage::GetSlruSegment(_)
|
||||
| PagestreamBeMessage::GetPage(_) => {
|
||||
| PagestreamBeMessage::GetSlruSegment(_) => {
|
||||
anyhow::bail!(
|
||||
"unexpected be message kind in response to getpage request: {}",
|
||||
msg.kind()
|
||||
|
||||
@@ -8,7 +8,7 @@ use utils::lsn::Lsn;
|
||||
use rand::prelude::*;
|
||||
use tokio::sync::Barrier;
|
||||
use tokio::task::JoinSet;
|
||||
use tracing::{info, instrument};
|
||||
use tracing::{debug, info, instrument};
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::num::NonZeroUsize;
|
||||
@@ -28,8 +28,6 @@ pub(crate) struct Args {
|
||||
#[clap(long, default_value = "localhost:64000")]
|
||||
page_service_host_port: String,
|
||||
#[clap(long)]
|
||||
page_service_connstring: Option<String>,
|
||||
#[clap(long)]
|
||||
pageserver_jwt: Option<String>,
|
||||
#[clap(long, default_value = "1")]
|
||||
num_clients: NonZeroUsize,
|
||||
@@ -232,17 +230,12 @@ async fn client(
|
||||
) {
|
||||
start_work_barrier.wait().await;
|
||||
|
||||
let connstr = match &args.page_service_connstring {
|
||||
Some(connstr) => connstr.clone(),
|
||||
None => crate::util::connstring::connstring(
|
||||
&args.page_service_host_port,
|
||||
args.pageserver_jwt.as_deref(),
|
||||
),
|
||||
};
|
||||
|
||||
let client = pageserver_client::page_service::Client::new(connstr)
|
||||
.await
|
||||
.unwrap();
|
||||
let client = pageserver_client::page_service::Client::new(crate::util::connstring::connstring(
|
||||
&args.page_service_host_port,
|
||||
args.pageserver_jwt.as_deref(),
|
||||
))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
while let Some(Work { lsn, gzip }) = work.recv().await {
|
||||
let start = Instant::now();
|
||||
@@ -270,7 +263,7 @@ async fn client(
|
||||
}
|
||||
})
|
||||
.await;
|
||||
info!("basebackup size is {} bytes", size.load(Ordering::Relaxed));
|
||||
debug!("basebackup size is {} bytes", size.load(Ordering::Relaxed));
|
||||
let elapsed = start.elapsed();
|
||||
live_stats.inc();
|
||||
STATS.with(|stats| {
|
||||
|
||||
@@ -2,7 +2,7 @@ use anyhow::Context;
|
||||
use camino::Utf8PathBuf;
|
||||
use pageserver_api::key::{is_rel_block_key, key_to_rel_block, Key};
|
||||
use pageserver_api::keyspace::KeySpaceAccum;
|
||||
use pageserver_api::models::{PagestreamGetPageRequest, PagestreamGetVectoredPagesRequest};
|
||||
use pageserver_api::models::PagestreamGetPageRequest;
|
||||
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use utils::id::TenantTimelineId;
|
||||
@@ -57,8 +57,6 @@ pub(crate) struct Args {
|
||||
/// [`pageserver_api::models::virtual_file::IoEngineKind`].
|
||||
#[clap(long)]
|
||||
set_io_engine: Option<pageserver_api::models::virtual_file::IoEngineKind>,
|
||||
#[clap(long)]
|
||||
vectored_read_size: Option<u8>,
|
||||
targets: Option<Vec<TenantTimelineId>>,
|
||||
}
|
||||
|
||||
@@ -301,45 +299,22 @@ async fn main_impl(
|
||||
}
|
||||
|
||||
let start = Instant::now();
|
||||
if let Some(size) = args.vectored_read_size {
|
||||
assert!(size > 0);
|
||||
let req = {
|
||||
let mut rng = rand::thread_rng();
|
||||
let r = &ranges[weights.sample(&mut rng)];
|
||||
let key: i128 = rng.gen_range(r.start..r.end);
|
||||
let key = Key::from_i128(key);
|
||||
assert!(is_rel_block_key(&key));
|
||||
let (rel_tag, block_no) =
|
||||
key_to_rel_block(key).expect("we filter non-rel-block keys out above");
|
||||
|
||||
PagestreamGetVectoredPagesRequest {
|
||||
latest: rng.gen_bool(args.req_latest_probability),
|
||||
lsn: r.timeline_lsn,
|
||||
rel: rel_tag,
|
||||
blkno: block_no,
|
||||
count: size
|
||||
}
|
||||
};
|
||||
client.getpages(req).await.unwrap();
|
||||
} else {
|
||||
let req = {
|
||||
let mut rng = rand::thread_rng();
|
||||
let r = &ranges[weights.sample(&mut rng)];
|
||||
let key: i128 = rng.gen_range(r.start..r.end);
|
||||
let key = Key::from_i128(key);
|
||||
assert!(is_rel_block_key(&key));
|
||||
let (rel_tag, block_no) =
|
||||
key_to_rel_block(key).expect("we filter non-rel-block keys out above");
|
||||
|
||||
PagestreamGetPageRequest {
|
||||
latest: rng.gen_bool(args.req_latest_probability),
|
||||
lsn: r.timeline_lsn,
|
||||
rel: rel_tag,
|
||||
blkno: block_no,
|
||||
}
|
||||
};
|
||||
client.getpage(req).await.unwrap();
|
||||
}
|
||||
let req = {
|
||||
let mut rng = rand::thread_rng();
|
||||
let r = &ranges[weights.sample(&mut rng)];
|
||||
let key: i128 = rng.gen_range(r.start..r.end);
|
||||
let key = Key::from_i128(key);
|
||||
assert!(is_rel_block_key(&key));
|
||||
let (rel_tag, block_no) =
|
||||
key_to_rel_block(key).expect("we filter non-rel-block keys out above");
|
||||
PagestreamGetPageRequest {
|
||||
latest: rng.gen_bool(args.req_latest_probability),
|
||||
lsn: r.timeline_lsn,
|
||||
rel: rel_tag,
|
||||
blkno: block_no,
|
||||
}
|
||||
};
|
||||
client.getpage(req).await.unwrap();
|
||||
let end = Instant::now();
|
||||
live_stats.request_done();
|
||||
ticks_processed += 1;
|
||||
|
||||
@@ -143,7 +143,6 @@ where
|
||||
ar: &'a mut Builder<&'b mut W>,
|
||||
buf: Vec<u8>,
|
||||
current_segment: Option<(SlruKind, u32)>,
|
||||
total_blocks: usize,
|
||||
}
|
||||
|
||||
impl<'a, 'b, W> SlruSegmentsBuilder<'a, 'b, W>
|
||||
@@ -155,7 +154,6 @@ where
|
||||
ar,
|
||||
buf: Vec::new(),
|
||||
current_segment: None,
|
||||
total_blocks: 0,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -201,8 +199,7 @@ where
|
||||
let header = new_tar_header(&segname, self.buf.len() as u64)?;
|
||||
self.ar.append(&header, self.buf.as_slice()).await?;
|
||||
|
||||
self.total_blocks += nblocks;
|
||||
debug!("Added to basebackup slru {} relsize {}", segname, nblocks);
|
||||
trace!("Added to basebackup slru {} relsize {}", segname, nblocks);
|
||||
|
||||
self.buf.clear();
|
||||
|
||||
@@ -210,15 +207,11 @@ where
|
||||
}
|
||||
|
||||
async fn finish(mut self) -> anyhow::Result<()> {
|
||||
let res = if self.current_segment.is_none() || self.buf.is_empty() {
|
||||
Ok(())
|
||||
} else {
|
||||
self.flush().await
|
||||
};
|
||||
if self.current_segment.is_none() || self.buf.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
info!("Collected {} SLRU blocks", self.total_blocks);
|
||||
|
||||
res
|
||||
self.flush().await
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -87,10 +87,6 @@ pub mod defaults {
|
||||
|
||||
pub const DEFAULT_GET_VECTORED_IMPL: &str = "sequential";
|
||||
|
||||
pub const DEFAULT_MAX_VECTORED_READ_SIZE: usize = 128 * 1024; // 128 KiB
|
||||
|
||||
pub const DEFAULT_VALIDATE_VECTORED_GET: bool = true;
|
||||
|
||||
///
|
||||
/// Default built-in configuration file.
|
||||
///
|
||||
@@ -130,10 +126,6 @@ pub mod defaults {
|
||||
|
||||
#get_vectored_impl = '{DEFAULT_GET_VECTORED_IMPL}'
|
||||
|
||||
#max_vectored_read_size = '{DEFAULT_MAX_VECTORED_READ_SIZE}'
|
||||
|
||||
#validate_vectored_get = '{DEFAULT_VALIDATE_VECTORED_GET}'
|
||||
|
||||
[tenant_config]
|
||||
#checkpoint_distance = {DEFAULT_CHECKPOINT_DISTANCE} # in bytes
|
||||
#checkpoint_timeout = {DEFAULT_CHECKPOINT_TIMEOUT}
|
||||
@@ -271,10 +263,6 @@ pub struct PageServerConf {
|
||||
pub virtual_file_io_engine: virtual_file::IoEngineKind,
|
||||
|
||||
pub get_vectored_impl: GetVectoredImpl,
|
||||
|
||||
pub max_vectored_read_size: usize,
|
||||
|
||||
pub validate_vectored_get: bool,
|
||||
}
|
||||
|
||||
/// We do not want to store this in a PageServerConf because the latter may be logged
|
||||
@@ -363,10 +351,6 @@ struct PageServerConfigBuilder {
|
||||
virtual_file_io_engine: BuilderValue<virtual_file::IoEngineKind>,
|
||||
|
||||
get_vectored_impl: BuilderValue<GetVectoredImpl>,
|
||||
|
||||
max_vectored_read_size: BuilderValue<usize>,
|
||||
|
||||
validate_vectored_get: BuilderValue<bool>,
|
||||
}
|
||||
|
||||
impl Default for PageServerConfigBuilder {
|
||||
@@ -446,8 +430,6 @@ impl Default for PageServerConfigBuilder {
|
||||
virtual_file_io_engine: Set(DEFAULT_VIRTUAL_FILE_IO_ENGINE.parse().unwrap()),
|
||||
|
||||
get_vectored_impl: Set(DEFAULT_GET_VECTORED_IMPL.parse().unwrap()),
|
||||
max_vectored_read_size: Set(DEFAULT_MAX_VECTORED_READ_SIZE),
|
||||
validate_vectored_get: Set(DEFAULT_VALIDATE_VECTORED_GET),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -612,14 +594,6 @@ impl PageServerConfigBuilder {
|
||||
self.get_vectored_impl = BuilderValue::Set(value);
|
||||
}
|
||||
|
||||
pub fn get_max_vectored_read_size(&mut self, value: usize) {
|
||||
self.max_vectored_read_size = BuilderValue::Set(value);
|
||||
}
|
||||
|
||||
pub fn get_validate_vectored_get(&mut self, value: bool) {
|
||||
self.validate_vectored_get = BuilderValue::Set(value);
|
||||
}
|
||||
|
||||
pub fn build(self) -> anyhow::Result<PageServerConf> {
|
||||
let concurrent_tenant_warmup = self
|
||||
.concurrent_tenant_warmup
|
||||
@@ -733,12 +707,6 @@ impl PageServerConfigBuilder {
|
||||
get_vectored_impl: self
|
||||
.get_vectored_impl
|
||||
.ok_or(anyhow!("missing get_vectored_impl"))?,
|
||||
max_vectored_read_size: self
|
||||
.max_vectored_read_size
|
||||
.ok_or(anyhow!("missing max_vectored_read_size"))?,
|
||||
validate_vectored_get: self
|
||||
.validate_vectored_get
|
||||
.ok_or(anyhow!("missing validate_vectored_get"))?,
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -996,12 +964,6 @@ impl PageServerConf {
|
||||
"get_vectored_impl" => {
|
||||
builder.get_vectored_impl(parse_toml_from_str("get_vectored_impl", item)?)
|
||||
}
|
||||
"max_vectored_read_size" => {
|
||||
builder.get_max_vectored_read_size(parse_toml_u64("max_vectored_read_size", item)? as usize)
|
||||
}
|
||||
"validate_vectored_get" => {
|
||||
builder.get_validate_vectored_get(parse_toml_bool("validate_vectored_get", item)?)
|
||||
}
|
||||
_ => bail!("unrecognized pageserver option '{key}'"),
|
||||
}
|
||||
}
|
||||
@@ -1077,8 +1039,6 @@ impl PageServerConf {
|
||||
ingest_batch_size: defaults::DEFAULT_INGEST_BATCH_SIZE,
|
||||
virtual_file_io_engine: DEFAULT_VIRTUAL_FILE_IO_ENGINE.parse().unwrap(),
|
||||
get_vectored_impl: defaults::DEFAULT_GET_VECTORED_IMPL.parse().unwrap(),
|
||||
max_vectored_read_size: defaults::DEFAULT_MAX_VECTORED_READ_SIZE,
|
||||
validate_vectored_get: defaults::DEFAULT_VALIDATE_VECTORED_GET,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1313,8 +1273,6 @@ background_task_maximum_delay = '334 s'
|
||||
ingest_batch_size: defaults::DEFAULT_INGEST_BATCH_SIZE,
|
||||
virtual_file_io_engine: DEFAULT_VIRTUAL_FILE_IO_ENGINE.parse().unwrap(),
|
||||
get_vectored_impl: defaults::DEFAULT_GET_VECTORED_IMPL.parse().unwrap(),
|
||||
max_vectored_read_size: defaults::DEFAULT_MAX_VECTORED_READ_SIZE,
|
||||
validate_vectored_get: defaults::DEFAULT_VALIDATE_VECTORED_GET,
|
||||
},
|
||||
"Correct defaults should be used when no config values are provided"
|
||||
);
|
||||
@@ -1380,8 +1338,6 @@ background_task_maximum_delay = '334 s'
|
||||
ingest_batch_size: 100,
|
||||
virtual_file_io_engine: DEFAULT_VIRTUAL_FILE_IO_ENGINE.parse().unwrap(),
|
||||
get_vectored_impl: defaults::DEFAULT_GET_VECTORED_IMPL.parse().unwrap(),
|
||||
max_vectored_read_size: defaults::DEFAULT_MAX_VECTORED_READ_SIZE,
|
||||
validate_vectored_get: defaults::DEFAULT_VALIDATE_VECTORED_GET,
|
||||
},
|
||||
"Should be able to parse all basic config values correctly"
|
||||
);
|
||||
@@ -1616,17 +1572,50 @@ threshold = "20m"
|
||||
eviction_order: crate::disk_usage_eviction_task::EvictionOrder::AbsoluteAccessed,
|
||||
})
|
||||
);
|
||||
|
||||
match &conf.default_tenant_conf.eviction_policy {
|
||||
EvictionPolicy::NoEviction => panic!("Unexpected eviction opolicy tenant settings"),
|
||||
EvictionPolicy::LayerAccessThreshold(eviction_thresold) => {
|
||||
assert_eq!(eviction_thresold.period, Duration::from_secs(20 * 60));
|
||||
assert_eq!(eviction_thresold.threshold, Duration::from_secs(20 * 60));
|
||||
EvictionPolicy::LayerAccessThreshold(eviction_threshold) => {
|
||||
assert_eq!(eviction_threshold.period, Duration::from_secs(20 * 60));
|
||||
assert_eq!(eviction_threshold.threshold, Duration::from_secs(20 * 60));
|
||||
}
|
||||
other => unreachable!("Unexpected eviction policy tenant settings: {other:?}"),
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_imitation_only_pageserver_config() {
|
||||
let tempdir = tempdir().unwrap();
|
||||
let (workdir, pg_distrib_dir) = prepare_fs(&tempdir).unwrap();
|
||||
|
||||
let pageserver_conf_toml = format!(
|
||||
r#"pg_distrib_dir = "{pg_distrib_dir}"
|
||||
metric_collection_endpoint = "http://sample.url"
|
||||
metric_collection_interval = "10min"
|
||||
id = 222
|
||||
|
||||
[tenant_config]
|
||||
evictions_low_residence_duration_metric_threshold = "20m"
|
||||
|
||||
[tenant_config.eviction_policy]
|
||||
kind = "OnlyImitiate"
|
||||
period = "20m"
|
||||
threshold = "20m"
|
||||
"#,
|
||||
);
|
||||
let toml: Document = pageserver_conf_toml.parse().unwrap();
|
||||
let conf = PageServerConf::parse_and_validate(&toml, &workdir).unwrap();
|
||||
|
||||
match &conf.default_tenant_conf.eviction_policy {
|
||||
EvictionPolicy::OnlyImitiate(t) => {
|
||||
assert_eq!(t.period, Duration::from_secs(20 * 60));
|
||||
assert_eq!(t.threshold, Duration::from_secs(20 * 60));
|
||||
}
|
||||
other => unreachable!("Unexpected eviction policy tenant settings: {other:?}"),
|
||||
}
|
||||
}
|
||||
|
||||
fn prepare_fs(tempdir: &Utf8TempDir) -> anyhow::Result<(Utf8PathBuf, Utf8PathBuf)> {
|
||||
let tempdir_path = tempdir.path();
|
||||
|
||||
|
||||
@@ -17,8 +17,6 @@ use futures::stream::FuturesUnordered;
|
||||
use futures::Stream;
|
||||
use futures::StreamExt;
|
||||
use pageserver_api::key::Key;
|
||||
use pageserver_api::models::PagestreamGetVectoredPagesRequest;
|
||||
use pageserver_api::models::PagestreamGetVectoredPagesResponse;
|
||||
use pageserver_api::models::TenantState;
|
||||
use pageserver_api::models::{
|
||||
PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
|
||||
@@ -73,7 +71,6 @@ use crate::tenant::mgr;
|
||||
use crate::tenant::mgr::get_active_tenant_with_timeout;
|
||||
use crate::tenant::mgr::GetActiveTenantError;
|
||||
use crate::tenant::mgr::ShardSelector;
|
||||
use crate::tenant::timeline::GetVectoredError;
|
||||
use crate::tenant::timeline::WaitLsnError;
|
||||
use crate::tenant::GetTimelineError;
|
||||
use crate::tenant::PageReconstructError;
|
||||
@@ -337,10 +334,6 @@ enum PageStreamError {
|
||||
#[error("Read error")]
|
||||
Read(#[source] PageReconstructError),
|
||||
|
||||
/// Something went wrong reading a page: this likely indicates a pageserver bug
|
||||
#[error("Vectored read error")]
|
||||
VectoredRead(#[source] GetVectoredError),
|
||||
|
||||
/// Ran out of time waiting for an LSN
|
||||
#[error("LSN timeout: {0}")]
|
||||
LsnTimeout(WaitLsnError),
|
||||
@@ -364,15 +357,6 @@ impl From<PageReconstructError> for PageStreamError {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<GetVectoredError> for PageStreamError {
|
||||
fn from(value: GetVectoredError) -> Self {
|
||||
match value {
|
||||
GetVectoredError::Cancelled => Self::Shutdown,
|
||||
e => Self::VectoredRead(e),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<GetActiveTimelineError> for PageStreamError {
|
||||
fn from(value: GetActiveTimelineError) -> Self {
|
||||
match value {
|
||||
@@ -682,15 +666,6 @@ impl PageServerHandler {
|
||||
span,
|
||||
)
|
||||
}
|
||||
PagestreamFeMessage::GetVectoredPages(req) => {
|
||||
let span = tracing::info_span!("handle_get_vectored_pages_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.lsn, req_count = %req.count);
|
||||
(
|
||||
self.handle_get_pages_at_lsn_request(tenant_id, timeline_id, &req, &ctx)
|
||||
.instrument(span.clone())
|
||||
.await,
|
||||
span,
|
||||
)
|
||||
}
|
||||
};
|
||||
|
||||
match response {
|
||||
@@ -1186,80 +1161,6 @@ impl PageServerHandler {
|
||||
}))
|
||||
}
|
||||
|
||||
#[instrument(skip_all, fields(shard_id))]
|
||||
async fn handle_get_pages_at_lsn_request(
|
||||
&mut self,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
req: &PagestreamGetVectoredPagesRequest,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<PagestreamBeMessage, PageStreamError> {
|
||||
// This is cheeky and relies on not using sharding :)
|
||||
// A real solution has to split the requested key sequence between shards.
|
||||
let get_page_request = PagestreamGetPageRequest {
|
||||
latest: req.latest,
|
||||
lsn: req.lsn,
|
||||
rel: req.rel,
|
||||
blkno: req.blkno,
|
||||
};
|
||||
|
||||
let timeline = match self.get_cached_timeline_for_page(&get_page_request) {
|
||||
Ok(tl) => tl,
|
||||
Err(key) => {
|
||||
match self
|
||||
.load_timeline_for_page(tenant_id, timeline_id, key)
|
||||
.await
|
||||
{
|
||||
Ok(t) => t,
|
||||
Err(GetActiveTimelineError::Tenant(GetActiveTenantError::NotFound(_))) => {
|
||||
// We already know this tenant exists in general, because we resolved it at
|
||||
// start of connection. Getting a NotFound here indicates that the shard containing
|
||||
// the requested page is not present on this node: the client's knowledge of shard->pageserver
|
||||
// mapping is out of date.
|
||||
//
|
||||
// Closing the connection by returning ``::Reconnect` has the side effect of rate-limiting above message, via
|
||||
// client's reconnect backoff, as well as hopefully prompting the client to load its updated configuration
|
||||
// and talk to a different pageserver.
|
||||
return Err(PageStreamError::Reconnect(
|
||||
"getpage@lsn request routed to wrong shard".into(),
|
||||
));
|
||||
}
|
||||
Err(e) => return Err(e.into()),
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// load_timeline_for_page sets shard_id, but get_cached_timeline_for_page doesn't
|
||||
set_tracing_field_shard_id(timeline);
|
||||
|
||||
let _timer = timeline
|
||||
.query_metrics
|
||||
.start_timer(metrics::SmgrQueryType::GetPageAtLsn);
|
||||
|
||||
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
|
||||
let lsn =
|
||||
Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
|
||||
.await?;
|
||||
|
||||
let (page_count, pages_buf) = timeline
|
||||
.get_rel_pages_at_lsn(
|
||||
req.rel,
|
||||
req.blkno,
|
||||
req.count,
|
||||
Version::Lsn(lsn),
|
||||
req.latest,
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(PagestreamBeMessage::GetVectoredPages(
|
||||
PagestreamGetVectoredPagesResponse {
|
||||
page_count,
|
||||
pages: pages_buf,
|
||||
},
|
||||
))
|
||||
}
|
||||
|
||||
#[instrument(skip_all, fields(shard_id))]
|
||||
async fn handle_get_slru_segment_request(
|
||||
&mut self,
|
||||
|
||||
@@ -11,9 +11,8 @@ use crate::context::RequestContext;
|
||||
use crate::keyspace::{KeySpace, KeySpaceAccum};
|
||||
use crate::repository::*;
|
||||
use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id;
|
||||
use crate::tenant::timeline::GetVectoredError;
|
||||
use crate::walrecord::NeonWalRecord;
|
||||
use anyhow::{anyhow, ensure, Context};
|
||||
use anyhow::{ensure, Context};
|
||||
use bytes::{Buf, Bytes, BytesMut};
|
||||
use enum_map::Enum;
|
||||
use itertools::Itertools;
|
||||
@@ -28,7 +27,7 @@ use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
|
||||
use postgres_ffi::BLCKSZ;
|
||||
use postgres_ffi::{Oid, TimestampTz, TransactionId};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::{hash_map, BTreeMap, HashMap, HashSet};
|
||||
use std::collections::{hash_map, HashMap, HashSet};
|
||||
use std::ops::ControlFlow;
|
||||
use std::ops::Range;
|
||||
use strum::IntoEnumIterator;
|
||||
@@ -199,41 +198,6 @@ impl Timeline {
|
||||
version.get(self, key, ctx).await
|
||||
}
|
||||
|
||||
pub(crate) async fn get_rel_pages_at_lsn(
|
||||
&self,
|
||||
tag: RelTag,
|
||||
blknum: BlockNumber,
|
||||
count: u8,
|
||||
version: Version<'_>,
|
||||
latest: bool,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(u8, Bytes), GetVectoredError> {
|
||||
if tag.relnode == 0 {
|
||||
return Err(GetVectoredError::Other(
|
||||
RelationError::InvalidRelnode.into(),
|
||||
));
|
||||
}
|
||||
|
||||
let nblocks = self
|
||||
.get_rel_size(tag, version, latest, ctx)
|
||||
.await
|
||||
.map_err(|e| GetVectoredError::Other(anyhow!(e)))?;
|
||||
if blknum + (count - 1) as u32 >= nblocks {
|
||||
debug!(
|
||||
"read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
|
||||
tag,
|
||||
blknum,
|
||||
version.get_lsn(),
|
||||
nblocks
|
||||
);
|
||||
return Ok((1, ZERO_PAGE.clone()));
|
||||
}
|
||||
|
||||
let start_key = rel_block_to_key(tag, blknum);
|
||||
let end_key = start_key.add(count as u32);
|
||||
version.get_vectored(self, start_key..end_key, ctx).await
|
||||
}
|
||||
|
||||
// Get size of a database in blocks
|
||||
pub(crate) async fn get_db_size(
|
||||
&self,
|
||||
@@ -1645,55 +1609,6 @@ impl<'a> DatadirModification<'a> {
|
||||
self.tline.get(key, lsn, ctx).await
|
||||
}
|
||||
|
||||
async fn get_vectored(
|
||||
&self,
|
||||
key_range: Range<Key>,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
|
||||
// Have we already updated the same key? Read the latest pending updated
|
||||
// version in that case.
|
||||
//
|
||||
// Note: we don't check pending_deletions. It is an error to request a
|
||||
// value that has been removed, deletion only avoids leaking storage.
|
||||
let mut results: BTreeMap<Key, Result<Bytes, PageReconstructError>> = BTreeMap::new();
|
||||
let mut keys_in_modification = KeySpaceAccum::new();
|
||||
|
||||
let key = key_range.start;
|
||||
while key != key_range.end {
|
||||
if let Some(values) = self.pending_updates.get(&key) {
|
||||
if let Some((_, value)) = values.last() {
|
||||
keys_in_modification.add_key(key);
|
||||
|
||||
match value {
|
||||
Value::Image(img) => {
|
||||
results.insert(key, Ok(img.clone()));
|
||||
}
|
||||
_ => {
|
||||
results.insert(
|
||||
key,
|
||||
Err(PageReconstructError::from(anyhow::anyhow!(
|
||||
"unexpected pending WAL record"
|
||||
))),
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let lsn = Lsn::max(self.tline.get_last_record_lsn(), self.lsn);
|
||||
|
||||
let mut keyspace = KeySpace {
|
||||
ranges: vec![key_range],
|
||||
};
|
||||
keyspace.remove_overlapping_with(&keys_in_modification.to_keyspace());
|
||||
|
||||
let pages = self.tline.get_vectored(keyspace, lsn, ctx).await?;
|
||||
results.extend(pages.into_iter());
|
||||
|
||||
Ok(results)
|
||||
}
|
||||
|
||||
fn put(&mut self, key: Key, val: Value) {
|
||||
let values = self.pending_updates.entry(key).or_default();
|
||||
// Replace the previous value if it exists at the same lsn
|
||||
@@ -1737,43 +1652,6 @@ impl<'a> Version<'a> {
|
||||
}
|
||||
}
|
||||
|
||||
async fn get_vectored(
|
||||
&self,
|
||||
timeline: &Timeline,
|
||||
key_range: Range<Key>,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(u8, Bytes), GetVectoredError> {
|
||||
let pages = match self {
|
||||
Version::Lsn(lsn) => {
|
||||
timeline
|
||||
.get_vectored(
|
||||
KeySpace {
|
||||
ranges: vec![key_range],
|
||||
},
|
||||
*lsn,
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
}
|
||||
Version::Modified(modification) => modification.get_vectored(key_range, ctx).await,
|
||||
}?;
|
||||
|
||||
let mut buf = BytesMut::new();
|
||||
let page_count: u8 = pages.len().try_into().expect("too many pages returned");
|
||||
for page in pages {
|
||||
match page {
|
||||
(_key, Ok(bytes)) => {
|
||||
buf.extend_from_slice(&bytes[..]);
|
||||
}
|
||||
(_key, Err(err)) => {
|
||||
return Err(GetVectoredError::Other(anyhow!(err)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok((page_count, buf.freeze()))
|
||||
}
|
||||
|
||||
fn get_lsn(&self) -> Lsn {
|
||||
match self {
|
||||
Version::Lsn(lsn) => *lsn,
|
||||
|
||||
@@ -146,7 +146,6 @@ macro_rules! pausable_failpoint {
|
||||
|
||||
pub mod blob_io;
|
||||
pub mod block_io;
|
||||
pub mod vectored_blob_io;
|
||||
|
||||
pub mod disk_btree;
|
||||
pub(crate) mod ephemeral_file;
|
||||
|
||||
@@ -61,6 +61,8 @@ use utils::lsn::Lsn;
|
||||
use historic_layer_coverage::BufferedHistoricLayerCoverage;
|
||||
pub use historic_layer_coverage::LayerKey;
|
||||
|
||||
pub(crate) use self::historic_layer_coverage::RebuildVersion;
|
||||
|
||||
use super::storage_layer::PersistentLayerDesc;
|
||||
|
||||
///
|
||||
@@ -500,7 +502,7 @@ impl LayerMap {
|
||||
///
|
||||
/// Helper function for BatchedUpdates::remove_historic
|
||||
///
|
||||
pub fn remove_historic_noflush(&mut self, layer_desc: &PersistentLayerDesc) {
|
||||
pub(self) fn remove_historic_noflush(&mut self, layer_desc: &PersistentLayerDesc) {
|
||||
self.historic
|
||||
.remove(historic_layer_coverage::LayerKey::from(layer_desc));
|
||||
let layer_key = layer_desc.key();
|
||||
@@ -525,6 +527,10 @@ impl LayerMap {
|
||||
self.historic.rebuild();
|
||||
}
|
||||
|
||||
pub fn get_rebuild_version(&self) -> RebuildVersion {
|
||||
self.historic.get_rebuild_version()
|
||||
}
|
||||
|
||||
/// Is there a newer image layer for given key- and LSN-range? Or a set
|
||||
/// of image layers within the specified lsn range that cover the entire
|
||||
/// specified key range?
|
||||
|
||||
@@ -413,6 +413,8 @@ fn test_persistent_overlapping() {
|
||||
/// See this for more on persistent and retroactive techniques:
|
||||
/// <https://www.youtube.com/watch?v=WqCWghETNDc&t=581s>
|
||||
pub struct BufferedHistoricLayerCoverage<Value> {
|
||||
rebuild_version: RebuildVersion,
|
||||
|
||||
/// A persistent layer map that we rebuild when we need to retroactively update
|
||||
historic_coverage: HistoricLayerCoverage<Value>,
|
||||
|
||||
@@ -438,9 +440,33 @@ impl<T: Clone> Default for BufferedHistoricLayerCoverage<T> {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
|
||||
pub struct RebuildVersion(u64);
|
||||
|
||||
impl RebuildVersion {
|
||||
fn inc(&mut self) {
|
||||
self.0
|
||||
.checked_add(1)
|
||||
.expect("at current clock cycles, we won't hit this");
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for RebuildVersion {
|
||||
fn default() -> Self {
|
||||
RebuildVersion(1)
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Display for RebuildVersion {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{}", self.0)
|
||||
}
|
||||
}
|
||||
|
||||
impl<Value: Clone> BufferedHistoricLayerCoverage<Value> {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
rebuild_version: RebuildVersion::default(),
|
||||
historic_coverage: HistoricLayerCoverage::<Value>::new(),
|
||||
buffer: BTreeMap::new(),
|
||||
layers: BTreeMap::new(),
|
||||
@@ -462,6 +488,8 @@ impl<Value: Clone> BufferedHistoricLayerCoverage<Value> {
|
||||
None => return, // No need to rebuild if buffer is empty
|
||||
};
|
||||
|
||||
self.rebuild_version.inc();
|
||||
|
||||
// Apply buffered updates to self.layers
|
||||
let num_updates = self.buffer.len();
|
||||
self.buffer.retain(|layer_key, layer| {
|
||||
@@ -493,11 +521,17 @@ impl<Value: Clone> BufferedHistoricLayerCoverage<Value> {
|
||||
|
||||
// TODO maybe only warn if ratio is at least 10
|
||||
info!(
|
||||
version = %self.rebuild_version,
|
||||
"Rebuilt layer map. Did {} insertions to process a batch of {} updates.",
|
||||
num_inserted, num_updates,
|
||||
num_inserted,
|
||||
num_updates,
|
||||
)
|
||||
}
|
||||
|
||||
pub fn get_rebuild_version(&self) -> RebuildVersion {
|
||||
self.rebuild_version
|
||||
}
|
||||
|
||||
/// Iterate all the layers
|
||||
pub fn iter(&self) -> impl '_ + Iterator<Item = Value> {
|
||||
// NOTE we can actually perform this without rebuilding,
|
||||
|
||||
@@ -88,14 +88,7 @@ pub async fn download_layer_file<'a>(
|
||||
|
||||
let mut reader = tokio_util::io::StreamReader::new(download.download_stream);
|
||||
|
||||
let bytes_amount = tokio::io::copy_buf(&mut reader, &mut destination_file)
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"download layer at remote path '{remote_path:?}' into file {temp_file_path:?}"
|
||||
)
|
||||
})
|
||||
.map_err(DownloadError::Other);
|
||||
let bytes_amount = tokio::io::copy_buf(&mut reader, &mut destination_file).await;
|
||||
|
||||
match bytes_amount {
|
||||
Ok(bytes_amount) => {
|
||||
@@ -107,7 +100,7 @@ pub async fn download_layer_file<'a>(
|
||||
on_fatal_io_error(&e, &format!("Removing temporary file {temp_file_path}"));
|
||||
}
|
||||
|
||||
Err(e)
|
||||
Err(e.into())
|
||||
}
|
||||
}
|
||||
},
|
||||
@@ -245,10 +238,7 @@ async fn do_download_index_part(
|
||||
let stream = download.download_stream;
|
||||
let mut stream = StreamReader::new(stream);
|
||||
|
||||
tokio::io::copy_buf(&mut stream, &mut bytes)
|
||||
.await
|
||||
.with_context(|| format!("download index part at {remote_path:?}"))
|
||||
.map_err(DownloadError::Other)?;
|
||||
tokio::io::copy_buf(&mut stream, &mut bytes).await?;
|
||||
|
||||
Ok(bytes)
|
||||
},
|
||||
@@ -428,14 +418,7 @@ pub(crate) async fn download_initdb_tar_zst(
|
||||
let mut download = tokio_util::io::StreamReader::new(download.download_stream);
|
||||
let mut writer = tokio::io::BufWriter::with_capacity(super::BUFFER_SIZE, file);
|
||||
|
||||
// TODO: this consumption of the response body should be subject to timeout + cancellation, but
|
||||
// not without thinking carefully about how to recover safely from cancelling a write to
|
||||
// local storage (e.g. by writing into a temp file as we do in download_layer)
|
||||
// FIXME: flip the weird error wrapping
|
||||
tokio::io::copy_buf(&mut download, &mut writer)
|
||||
.await
|
||||
.with_context(|| format!("download initdb.tar.zst at {remote_path:?}"))
|
||||
.map_err(DownloadError::Other)?;
|
||||
tokio::io::copy_buf(&mut download, &mut writer).await?;
|
||||
|
||||
let mut file = writer.into_inner();
|
||||
|
||||
|
||||
@@ -438,8 +438,14 @@ impl From<std::io::Error> for UpdateError {
|
||||
fn from(value: std::io::Error) -> Self {
|
||||
if let Some(nix::errno::Errno::ENOSPC) = value.raw_os_error().map(nix::errno::from_i32) {
|
||||
UpdateError::NoSpace
|
||||
} else if value
|
||||
.get_ref()
|
||||
.and_then(|x| x.downcast_ref::<DownloadError>())
|
||||
.is_some()
|
||||
{
|
||||
UpdateError::from(DownloadError::from(value))
|
||||
} else {
|
||||
// An I/O error from e.g. tokio::io::copy is most likely a remote storage issue
|
||||
// An I/O error from e.g. tokio::io::copy_buf is most likely a remote storage issue
|
||||
UpdateError::Other(anyhow::anyhow!(value))
|
||||
}
|
||||
}
|
||||
@@ -672,20 +678,17 @@ impl<'a> TenantDownloader<'a> {
|
||||
.await
|
||||
{
|
||||
Ok(bytes) => bytes,
|
||||
Err(e) => {
|
||||
if let DownloadError::NotFound = e {
|
||||
// A heatmap might be out of date and refer to a layer that doesn't exist any more.
|
||||
// This is harmless: continue to download the next layer. It is expected during compaction
|
||||
// GC.
|
||||
tracing::debug!(
|
||||
"Skipped downloading missing layer {}, raced with compaction/gc?",
|
||||
layer.name
|
||||
);
|
||||
continue;
|
||||
} else {
|
||||
return Err(e.into());
|
||||
}
|
||||
Err(DownloadError::NotFound) => {
|
||||
// A heatmap might be out of date and refer to a layer that doesn't exist any more.
|
||||
// This is harmless: continue to download the next layer. It is expected during compaction
|
||||
// GC.
|
||||
tracing::debug!(
|
||||
"Skipped downloading missing layer {}, raced with compaction/gc?",
|
||||
layer.name
|
||||
);
|
||||
continue;
|
||||
}
|
||||
Err(e) => return Err(e.into()),
|
||||
};
|
||||
|
||||
if downloaded_bytes != layer.metadata.file_size {
|
||||
|
||||
@@ -329,20 +329,10 @@ impl ReadableLayerDesc {
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), GetVectoredError> {
|
||||
match self {
|
||||
ReadableLayerDesc::Persistent {
|
||||
desc,
|
||||
lsn_floor,
|
||||
lsn_ceil,
|
||||
} => {
|
||||
ReadableLayerDesc::Persistent { desc, lsn_ceil, .. } => {
|
||||
let layer = layer_manager.get_from_desc(desc);
|
||||
layer
|
||||
.get_values_reconstruct_data(
|
||||
keyspace,
|
||||
*lsn_floor,
|
||||
*lsn_ceil,
|
||||
reconstruct_state,
|
||||
ctx,
|
||||
)
|
||||
.get_values_reconstruct_data(keyspace, *lsn_ceil, reconstruct_state, ctx)
|
||||
.await
|
||||
}
|
||||
ReadableLayerDesc::InMemory { handle, lsn_ceil } => {
|
||||
|
||||
@@ -36,21 +36,18 @@ use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockLease, BlockReader, Fi
|
||||
use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
|
||||
use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState};
|
||||
use crate::tenant::timeline::GetVectoredError;
|
||||
use crate::tenant::vectored_blob_io::{
|
||||
BlobFlag, VectoredBlobReader, VectoredRead, VectoredReadPlanner,
|
||||
};
|
||||
use crate::tenant::{PageReconstructError, Timeline};
|
||||
use crate::virtual_file::{self, VirtualFile};
|
||||
use crate::{walrecord, TEMP_FILE_SUFFIX};
|
||||
use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
|
||||
use anyhow::{anyhow, bail, ensure, Context, Result};
|
||||
use bytes::BytesMut;
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use pageserver_api::keyspace::KeySpace;
|
||||
use pageserver_api::models::LayerAccessKind;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use rand::{distributions::Alphanumeric, Rng};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::BTreeMap;
|
||||
use std::fs::File;
|
||||
use std::io::SeekFrom;
|
||||
use std::ops::Range;
|
||||
@@ -66,7 +63,8 @@ use utils::{
|
||||
};
|
||||
|
||||
use super::{
|
||||
AsLayerDesc, LayerAccessStats, PersistentLayerDesc, ResidentLayer, ValuesReconstructState,
|
||||
AsLayerDesc, LayerAccessStats, PersistentLayerDesc, ResidentLayer, ValueReconstructSituation,
|
||||
ValuesReconstructState,
|
||||
};
|
||||
|
||||
///
|
||||
@@ -215,7 +213,6 @@ pub struct DeltaLayerInner {
|
||||
// values copied from summary
|
||||
index_start_blk: u32,
|
||||
index_root_blk: u32,
|
||||
vectored_blob_reader: VectoredBlobReader,
|
||||
|
||||
/// Reader object for reading blocks from the file.
|
||||
file: FileBlockReader,
|
||||
@@ -251,7 +248,7 @@ impl DeltaLayer {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let inner = self.load(LayerAccessKind::Dump, 0, ctx).await?;
|
||||
let inner = self.load(LayerAccessKind::Dump, ctx).await?;
|
||||
|
||||
inner.dump(ctx).await
|
||||
}
|
||||
@@ -287,25 +284,20 @@ impl DeltaLayer {
|
||||
async fn load(
|
||||
&self,
|
||||
access_kind: LayerAccessKind,
|
||||
max_vectored_read_size: usize,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<&Arc<DeltaLayerInner>> {
|
||||
self.access_stats.record_access(access_kind, ctx);
|
||||
// Quick exit if already loaded
|
||||
self.inner
|
||||
.get_or_try_init(|| self.load_inner(max_vectored_read_size, ctx))
|
||||
.get_or_try_init(|| self.load_inner(ctx))
|
||||
.await
|
||||
.with_context(|| format!("Failed to load delta layer {}", self.path()))
|
||||
}
|
||||
|
||||
async fn load_inner(
|
||||
&self,
|
||||
max_vectored_read_size: usize,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Arc<DeltaLayerInner>> {
|
||||
async fn load_inner(&self, ctx: &RequestContext) -> Result<Arc<DeltaLayerInner>> {
|
||||
let path = self.path();
|
||||
|
||||
let loaded = DeltaLayerInner::load(&path, None, max_vectored_read_size, ctx)
|
||||
let loaded = DeltaLayerInner::load(&path, None, ctx)
|
||||
.await
|
||||
.and_then(|res| res)?;
|
||||
|
||||
@@ -706,16 +698,15 @@ impl DeltaLayerInner {
|
||||
pub(super) async fn load(
|
||||
path: &Utf8Path,
|
||||
summary: Option<Summary>,
|
||||
max_vectored_read_size: usize,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Result<Self, anyhow::Error>, anyhow::Error> {
|
||||
let file = match VirtualFile::open(path).await {
|
||||
Ok(file) => file,
|
||||
Err(e) => return Ok(Err(anyhow::Error::new(e).context("open layer file"))),
|
||||
};
|
||||
let block_reader = FileBlockReader::new(file);
|
||||
let file = FileBlockReader::new(file);
|
||||
|
||||
let summary_blk = match block_reader.read_blk(0, ctx).await {
|
||||
let summary_blk = match file.read_blk(0, ctx).await {
|
||||
Ok(blk) => blk,
|
||||
Err(e) => return Ok(Err(anyhow::Error::new(e).context("read first block"))),
|
||||
};
|
||||
@@ -737,16 +728,8 @@ impl DeltaLayerInner {
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: don't open file twice
|
||||
let file = match VirtualFile::open(path).await {
|
||||
Ok(file) => file,
|
||||
Err(e) => return Ok(Err(anyhow::Error::new(e).context("open layer file"))),
|
||||
};
|
||||
let vectored_blob_reader = VectoredBlobReader::new(file, max_vectored_read_size);
|
||||
|
||||
Ok(Ok(DeltaLayerInner {
|
||||
file: block_reader,
|
||||
vectored_blob_reader,
|
||||
file,
|
||||
index_start_blk: actual_summary.index_start_blk,
|
||||
index_root_blk: actual_summary.index_root_blk,
|
||||
}))
|
||||
@@ -851,31 +834,10 @@ impl DeltaLayerInner {
|
||||
pub(super) async fn get_values_reconstruct_data(
|
||||
&self,
|
||||
keyspace: KeySpace,
|
||||
start_lsn: Lsn,
|
||||
end_lsn: Lsn,
|
||||
reconstruct_state: &mut ValuesReconstructState,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), GetVectoredError> {
|
||||
let reads = self
|
||||
.plan_reads(keyspace, start_lsn..end_lsn, reconstruct_state, ctx)
|
||||
.await
|
||||
.map_err(GetVectoredError::Other)?;
|
||||
|
||||
self.do_reads_and_update_state(reads, reconstruct_state)
|
||||
.await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn plan_reads(
|
||||
&self,
|
||||
keyspace: KeySpace,
|
||||
lsn_range: Range<Lsn>,
|
||||
reconstruct_state: &mut ValuesReconstructState,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Vec<VectoredRead>> {
|
||||
let mut planner = VectoredReadPlanner::new(self.vectored_blob_reader.get_max_read_size());
|
||||
|
||||
let file = &self.file;
|
||||
let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
|
||||
self.index_start_blk,
|
||||
@@ -883,130 +845,110 @@ impl DeltaLayerInner {
|
||||
file,
|
||||
);
|
||||
|
||||
for range in keyspace.ranges.iter() {
|
||||
let mut range_end_handled = false;
|
||||
let mut offsets: BTreeMap<Key, Vec<(Lsn, u64)>> = BTreeMap::new();
|
||||
|
||||
let start_key = DeltaKey::from_key_lsn(&range.start, lsn_range.start);
|
||||
for range in keyspace.ranges.iter() {
|
||||
let mut ignore_key = None;
|
||||
|
||||
// Scan the page versions backwards, starting from the last key in the range.
|
||||
// to collect all the offsets at which need to be read.
|
||||
let end_key = DeltaKey::from_key_lsn(&range.end, Lsn(end_lsn.0 - 1));
|
||||
tree_reader
|
||||
.visit(
|
||||
&start_key.0,
|
||||
VisitDirection::Forwards,
|
||||
&end_key.0,
|
||||
VisitDirection::Backwards,
|
||||
|raw_key, value| {
|
||||
let key = Key::from_slice(&raw_key[..KEY_SIZE]);
|
||||
let lsn = DeltaKey::extract_lsn_from_buf(raw_key);
|
||||
let blob_ref = BlobRef(value);
|
||||
let entry_lsn = DeltaKey::extract_lsn_from_buf(raw_key);
|
||||
|
||||
assert!(key >= range.start && lsn >= lsn_range.start);
|
||||
|
||||
let cached_lsn = reconstruct_state.get_cached_lsn(&key);
|
||||
let flag = {
|
||||
if cached_lsn >= Some(lsn) {
|
||||
BlobFlag::Ignore
|
||||
} else if blob_ref.will_init() {
|
||||
BlobFlag::Replaces
|
||||
} else {
|
||||
BlobFlag::None
|
||||
}
|
||||
};
|
||||
|
||||
if key >= range.end || (key.next() == range.end && lsn >= lsn_range.end) {
|
||||
planner.handle_range_end(blob_ref.pos());
|
||||
range_end_handled = true;
|
||||
false
|
||||
} else {
|
||||
planner.handle(key, lsn, blob_ref.pos(), flag);
|
||||
true
|
||||
if entry_lsn >= end_lsn {
|
||||
return true;
|
||||
}
|
||||
|
||||
if key < range.start {
|
||||
return false;
|
||||
}
|
||||
|
||||
if key >= range.end {
|
||||
return true;
|
||||
}
|
||||
|
||||
if Some(key) == ignore_key {
|
||||
return true;
|
||||
}
|
||||
|
||||
if let Some(cached_lsn) = reconstruct_state.get_cached_lsn(&key) {
|
||||
if entry_lsn <= cached_lsn {
|
||||
return key != range.start;
|
||||
}
|
||||
}
|
||||
|
||||
let blob_ref = BlobRef(value);
|
||||
let lsns_at = offsets.entry(key).or_default();
|
||||
lsns_at.push((entry_lsn, blob_ref.pos()));
|
||||
|
||||
if blob_ref.will_init() {
|
||||
if key == range.start {
|
||||
return false;
|
||||
} else {
|
||||
ignore_key = Some(key);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
true
|
||||
},
|
||||
&RequestContextBuilder::extend(ctx)
|
||||
.page_content_kind(PageContentKind::DeltaLayerBtreeNode)
|
||||
.build(),
|
||||
)
|
||||
.await
|
||||
.map_err(|err| anyhow!(err))?;
|
||||
.map_err(|err| GetVectoredError::Other(anyhow!(err)))?;
|
||||
}
|
||||
|
||||
if !range_end_handled {
|
||||
let payload_end = self.index_start_blk as u64 * PAGE_SZ as u64;
|
||||
tracing::info!("Handling range end fallback at {}", payload_end);
|
||||
planner.handle_range_end(payload_end);
|
||||
let ctx = &RequestContextBuilder::extend(ctx)
|
||||
.page_content_kind(PageContentKind::DeltaLayerValue)
|
||||
.build();
|
||||
|
||||
let cursor = file.block_cursor();
|
||||
let mut buf = Vec::new();
|
||||
for (key, lsns_at) in offsets {
|
||||
for (lsn, block_offset) in lsns_at {
|
||||
let res = cursor.read_blob_into_buf(block_offset, &mut buf, ctx).await;
|
||||
|
||||
if let Err(e) = res {
|
||||
reconstruct_state.on_key_error(
|
||||
key,
|
||||
PageReconstructError::from(anyhow!(e).context(format!(
|
||||
"Failed to read blob from virtual file {}",
|
||||
file.file.path
|
||||
))),
|
||||
);
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
let value = Value::des(&buf);
|
||||
if let Err(e) = value {
|
||||
reconstruct_state.on_key_error(
|
||||
key,
|
||||
PageReconstructError::from(anyhow!(e).context(format!(
|
||||
"Failed to deserialize file blob from virtual file {}",
|
||||
file.file.path
|
||||
))),
|
||||
);
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
let key_situation = reconstruct_state.update_key(&key, lsn, value.unwrap());
|
||||
if key_situation == ValueReconstructSituation::Complete {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(planner.finish())
|
||||
}
|
||||
|
||||
async fn do_reads_and_update_state(
|
||||
&self,
|
||||
reads: Vec<VectoredRead>,
|
||||
reconstruct_state: &mut ValuesReconstructState,
|
||||
) {
|
||||
let mut ignore_key_with_err = None;
|
||||
|
||||
let mut buf = Some(BytesMut::with_capacity(
|
||||
self.vectored_blob_reader.get_max_read_size(),
|
||||
));
|
||||
|
||||
for read in reads.into_iter().rev() {
|
||||
let res = self
|
||||
.vectored_blob_reader
|
||||
.read_blobs(&read, buf.take().expect("Should have a buffer"))
|
||||
.await;
|
||||
|
||||
let blobs_buf = match res {
|
||||
Ok(blobs_buf) => blobs_buf,
|
||||
Err(err) => {
|
||||
let kind = err.kind();
|
||||
for (_, blob_meta) in read.blobs_at.as_slice() {
|
||||
reconstruct_state.on_key_error(
|
||||
blob_meta.key,
|
||||
PageReconstructError::from(anyhow!(
|
||||
"Failed to read blobs from virtual file {}: {}",
|
||||
self.vectored_blob_reader.get_file_ref().path,
|
||||
kind
|
||||
)),
|
||||
);
|
||||
}
|
||||
|
||||
// We have "lost" the buffer since the lower level IO api
|
||||
// doesn't return the buffer on error. Allocate a new one.
|
||||
buf = Some(BytesMut::with_capacity(
|
||||
self.vectored_blob_reader.get_max_read_size(),
|
||||
));
|
||||
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
for meta in blobs_buf.blobs.iter().rev() {
|
||||
if Some(meta.meta.key) == ignore_key_with_err {
|
||||
continue;
|
||||
}
|
||||
|
||||
let value = Value::des(&blobs_buf.buf[meta.start..meta.end]);
|
||||
let value = match value {
|
||||
Ok(v) => v,
|
||||
Err(e) => {
|
||||
reconstruct_state.on_key_error(
|
||||
meta.meta.key,
|
||||
PageReconstructError::from(anyhow!(e).context(format!(
|
||||
"Failed to deserialize blob from virtual file {}",
|
||||
self.vectored_blob_reader.get_file_ref().path,
|
||||
))),
|
||||
);
|
||||
|
||||
ignore_key_with_err = Some(meta.meta.key);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
// Invariant: once a key reaches [`ValueReconstructSituation::Complete`]
|
||||
// state, no further updates shall be made to it. The call below will
|
||||
// panic if the invariant is violated.
|
||||
reconstruct_state.update_key(&meta.meta.key, meta.meta.lsn, value);
|
||||
}
|
||||
|
||||
buf = Some(blobs_buf.buf);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(super) async fn load_keys<'a>(
|
||||
|
||||
@@ -34,14 +34,11 @@ use crate::tenant::storage_layer::{
|
||||
LayerAccessStats, ValueReconstructResult, ValueReconstructState,
|
||||
};
|
||||
use crate::tenant::timeline::GetVectoredError;
|
||||
use crate::tenant::vectored_blob_io::{
|
||||
BlobFlag, VectoredBlobReader, VectoredRead, VectoredReadPlanner,
|
||||
};
|
||||
use crate::tenant::{PageReconstructError, Timeline};
|
||||
use crate::virtual_file::{self, VirtualFile};
|
||||
use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
|
||||
use anyhow::{anyhow, bail, ensure, Context, Result};
|
||||
use bytes::{Bytes, BytesMut};
|
||||
use bytes::Bytes;
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use hex;
|
||||
use pageserver_api::keyspace::KeySpace;
|
||||
@@ -157,7 +154,6 @@ pub struct ImageLayerInner {
|
||||
|
||||
/// Reader object for reading blocks from the file.
|
||||
file: FileBlockReader,
|
||||
vectored_blob_reader: VectoredBlobReader,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for ImageLayerInner {
|
||||
@@ -214,7 +210,7 @@ impl ImageLayer {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let inner = self.load(LayerAccessKind::Dump, 0, ctx).await?;
|
||||
let inner = self.load(LayerAccessKind::Dump, ctx).await?;
|
||||
|
||||
inner.dump(ctx).await?;
|
||||
|
||||
@@ -244,32 +240,21 @@ impl ImageLayer {
|
||||
async fn load(
|
||||
&self,
|
||||
access_kind: LayerAccessKind,
|
||||
max_vectored_read_size: usize,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<&ImageLayerInner> {
|
||||
self.access_stats.record_access(access_kind, ctx);
|
||||
self.inner
|
||||
.get_or_try_init(|| self.load_inner(max_vectored_read_size, ctx))
|
||||
.get_or_try_init(|| self.load_inner(ctx))
|
||||
.await
|
||||
.with_context(|| format!("Failed to load image layer {}", self.path()))
|
||||
}
|
||||
|
||||
async fn load_inner(
|
||||
&self,
|
||||
max_vectored_read_size: usize,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<ImageLayerInner> {
|
||||
async fn load_inner(&self, ctx: &RequestContext) -> Result<ImageLayerInner> {
|
||||
let path = self.path();
|
||||
|
||||
let loaded = ImageLayerInner::load(
|
||||
&path,
|
||||
self.desc.image_layer_lsn(),
|
||||
None,
|
||||
max_vectored_read_size,
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
.and_then(|res| res)?;
|
||||
let loaded = ImageLayerInner::load(&path, self.desc.image_layer_lsn(), None, ctx)
|
||||
.await
|
||||
.and_then(|res| res)?;
|
||||
|
||||
// not production code
|
||||
let actual_filename = path.file_name().unwrap().to_owned();
|
||||
@@ -376,15 +361,14 @@ impl ImageLayerInner {
|
||||
path: &Utf8Path,
|
||||
lsn: Lsn,
|
||||
summary: Option<Summary>,
|
||||
max_vectored_read_size: usize,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Result<Self, anyhow::Error>, anyhow::Error> {
|
||||
let file = match VirtualFile::open(path).await {
|
||||
Ok(file) => file,
|
||||
Err(e) => return Ok(Err(anyhow::Error::new(e).context("open layer file"))),
|
||||
};
|
||||
let block_reader = FileBlockReader::new(file);
|
||||
let summary_blk = match block_reader.read_blk(0, ctx).await {
|
||||
let file = FileBlockReader::new(file);
|
||||
let summary_blk = match file.read_blk(0, ctx).await {
|
||||
Ok(blk) => blk,
|
||||
Err(e) => return Ok(Err(anyhow::Error::new(e).context("read first block"))),
|
||||
};
|
||||
@@ -410,19 +394,11 @@ impl ImageLayerInner {
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: don't open file twice
|
||||
let file = match VirtualFile::open(path).await {
|
||||
Ok(file) => file,
|
||||
Err(e) => return Ok(Err(anyhow::Error::new(e).context("open layer file"))),
|
||||
};
|
||||
let vectored_blob_reader = VectoredBlobReader::new(file, max_vectored_read_size);
|
||||
|
||||
Ok(Ok(ImageLayerInner {
|
||||
index_start_blk: actual_summary.index_start_blk,
|
||||
index_root_blk: actual_summary.index_root_blk,
|
||||
lsn,
|
||||
file: block_reader,
|
||||
vectored_blob_reader,
|
||||
file,
|
||||
}))
|
||||
}
|
||||
|
||||
@@ -473,30 +449,12 @@ impl ImageLayerInner {
|
||||
reconstruct_state: &mut ValuesReconstructState,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), GetVectoredError> {
|
||||
let reads = self
|
||||
.plan_reads(keyspace, ctx)
|
||||
.await
|
||||
.map_err(GetVectoredError::Other)?;
|
||||
|
||||
self.do_reads_and_update_state(reads, reconstruct_state)
|
||||
.await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn plan_reads(
|
||||
&self,
|
||||
keyspace: KeySpace,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Vec<VectoredRead>> {
|
||||
let mut planner = VectoredReadPlanner::new(self.vectored_blob_reader.get_max_read_size());
|
||||
|
||||
let file = &self.file;
|
||||
let tree_reader = DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, file);
|
||||
|
||||
for range in keyspace.ranges.iter() {
|
||||
let mut range_end_handled = false;
|
||||
let mut offsets = Vec::new();
|
||||
|
||||
for range in keyspace.ranges.iter() {
|
||||
let mut search_key: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
|
||||
range.start.write_to_byte_slice(&mut search_key);
|
||||
|
||||
@@ -504,18 +462,17 @@ impl ImageLayerInner {
|
||||
.visit(
|
||||
&search_key,
|
||||
VisitDirection::Forwards,
|
||||
|raw_key, offset| {
|
||||
|raw_key, value| {
|
||||
let key = Key::from_slice(&raw_key[..KEY_SIZE]);
|
||||
assert!(key >= range.start);
|
||||
|
||||
if key >= range.end {
|
||||
planner.handle_range_end(offset);
|
||||
range_end_handled = true;
|
||||
false
|
||||
} else {
|
||||
planner.handle(key, self.lsn, offset, BlobFlag::None);
|
||||
true
|
||||
if !range.contains(&key) {
|
||||
return false;
|
||||
}
|
||||
|
||||
offsets.push((key, value));
|
||||
|
||||
true
|
||||
},
|
||||
&RequestContextBuilder::extend(ctx)
|
||||
.page_content_kind(PageContentKind::ImageLayerBtreeNode)
|
||||
@@ -523,64 +480,33 @@ impl ImageLayerInner {
|
||||
)
|
||||
.await
|
||||
.map_err(|err| GetVectoredError::Other(anyhow!(err)))?;
|
||||
}
|
||||
|
||||
if !range_end_handled {
|
||||
let payload_end = self.index_start_blk as u64 * PAGE_SZ as u64;
|
||||
planner.handle_range_end(payload_end);
|
||||
let ctx = &RequestContextBuilder::extend(ctx)
|
||||
.page_content_kind(PageContentKind::ImageLayerValue)
|
||||
.build();
|
||||
|
||||
let cursor = file.block_cursor();
|
||||
let mut buf = Vec::new();
|
||||
for (key, offset) in offsets {
|
||||
let res = cursor.read_blob_into_buf(offset, &mut buf, ctx).await;
|
||||
if let Err(e) = res {
|
||||
reconstruct_state.on_key_error(
|
||||
key,
|
||||
PageReconstructError::from(anyhow!(e).context(format!(
|
||||
"Failed to read blob from virtual file {}",
|
||||
file.file.path
|
||||
))),
|
||||
);
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
let blob = Bytes::copy_from_slice(buf.as_slice());
|
||||
reconstruct_state.update_key(&key, self.lsn, Value::Image(blob));
|
||||
}
|
||||
|
||||
Ok(planner.finish())
|
||||
}
|
||||
|
||||
async fn do_reads_and_update_state(
|
||||
&self,
|
||||
reads: Vec<VectoredRead>,
|
||||
reconstruct_state: &mut ValuesReconstructState,
|
||||
) {
|
||||
let mut buf = Some(BytesMut::with_capacity(
|
||||
self.vectored_blob_reader.get_max_read_size(),
|
||||
));
|
||||
for read in reads.into_iter().rev() {
|
||||
let res = self
|
||||
.vectored_blob_reader
|
||||
.read_blobs(&read, buf.take().expect("Should have a buffer"))
|
||||
.await;
|
||||
|
||||
match res {
|
||||
Ok(blobs_buf) => {
|
||||
for meta in blobs_buf.blobs.iter().rev() {
|
||||
let img_buf = Bytes::copy_from_slice(&blobs_buf.buf[meta.start..meta.end]);
|
||||
reconstruct_state.update_key(
|
||||
&meta.meta.key,
|
||||
self.lsn,
|
||||
Value::Image(img_buf),
|
||||
);
|
||||
}
|
||||
|
||||
buf = Some(blobs_buf.buf);
|
||||
}
|
||||
Err(err) => {
|
||||
let kind = err.kind();
|
||||
for (_, blob_meta) in read.blobs_at.as_slice() {
|
||||
reconstruct_state.on_key_error(
|
||||
blob_meta.key,
|
||||
PageReconstructError::from(anyhow!(
|
||||
"Failed to read blobs from virtual file {}: {}",
|
||||
self.vectored_blob_reader.get_file_ref().path,
|
||||
kind
|
||||
)),
|
||||
);
|
||||
}
|
||||
|
||||
// We have "lost" the buffer since the lower level IO api
|
||||
// doesn't return the buffer on error. Allocate a new one.
|
||||
buf = Some(BytesMut::with_capacity(
|
||||
self.vectored_blob_reader.get_max_read_size(),
|
||||
));
|
||||
}
|
||||
};
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -267,7 +267,6 @@ impl Layer {
|
||||
pub(crate) async fn get_values_reconstruct_data(
|
||||
&self,
|
||||
keyspace: KeySpace,
|
||||
start_lsn: Lsn,
|
||||
end_lsn: Lsn,
|
||||
reconstruct_data: &mut ValuesReconstructState,
|
||||
ctx: &RequestContext,
|
||||
@@ -283,14 +282,7 @@ impl Layer {
|
||||
.record_access(LayerAccessKind::GetValueReconstructData, ctx);
|
||||
|
||||
layer
|
||||
.get_values_reconstruct_data(
|
||||
keyspace,
|
||||
start_lsn,
|
||||
end_lsn,
|
||||
reconstruct_data,
|
||||
&self.0,
|
||||
ctx,
|
||||
)
|
||||
.get_values_reconstruct_data(keyspace, end_lsn, reconstruct_data, &self.0, ctx)
|
||||
.instrument(tracing::debug_span!("get_values_reconstruct_data", layer=%self))
|
||||
.await
|
||||
}
|
||||
@@ -1307,14 +1299,9 @@ impl DownloadedLayer {
|
||||
owner.desc.key_range.clone(),
|
||||
owner.desc.lsn_range.clone(),
|
||||
));
|
||||
delta_layer::DeltaLayerInner::load(
|
||||
&owner.path,
|
||||
summary,
|
||||
owner.conf.max_vectored_read_size,
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
.map(|res| res.map(LayerKind::Delta))
|
||||
delta_layer::DeltaLayerInner::load(&owner.path, summary, ctx)
|
||||
.await
|
||||
.map(|res| res.map(LayerKind::Delta))
|
||||
} else {
|
||||
let lsn = owner.desc.image_layer_lsn();
|
||||
let summary = Some(image_layer::Summary::expected(
|
||||
@@ -1323,15 +1310,9 @@ impl DownloadedLayer {
|
||||
owner.desc.key_range.clone(),
|
||||
lsn,
|
||||
));
|
||||
image_layer::ImageLayerInner::load(
|
||||
&owner.path,
|
||||
lsn,
|
||||
summary,
|
||||
owner.conf.max_vectored_read_size,
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
.map(|res| res.map(LayerKind::Image))
|
||||
image_layer::ImageLayerInner::load(&owner.path, lsn, summary, ctx)
|
||||
.await
|
||||
.map(|res| res.map(LayerKind::Image))
|
||||
};
|
||||
|
||||
match res {
|
||||
@@ -1384,7 +1365,6 @@ impl DownloadedLayer {
|
||||
async fn get_values_reconstruct_data(
|
||||
&self,
|
||||
keyspace: KeySpace,
|
||||
start_lsn: Lsn,
|
||||
end_lsn: Lsn,
|
||||
reconstruct_data: &mut ValuesReconstructState,
|
||||
owner: &Arc<LayerInner>,
|
||||
@@ -1394,7 +1374,7 @@ impl DownloadedLayer {
|
||||
|
||||
match self.get(owner, ctx).await.map_err(GetVectoredError::from)? {
|
||||
Delta(d) => {
|
||||
d.get_values_reconstruct_data(keyspace, start_lsn, end_lsn, reconstruct_data, ctx)
|
||||
d.get_values_reconstruct_data(keyspace, end_lsn, reconstruct_data, ctx)
|
||||
.await
|
||||
}
|
||||
Image(i) => {
|
||||
|
||||
@@ -111,10 +111,10 @@ use self::layer_manager::LayerManager;
|
||||
use self::logical_size::LogicalSize;
|
||||
use self::walreceiver::{WalReceiver, WalReceiverConf};
|
||||
|
||||
use super::remote_timeline_client::RemoteTimelineClient;
|
||||
use super::secondary::heatmap::{HeatMapLayer, HeatMapTimeline};
|
||||
use super::{config::TenantConf, storage_layer::ReadableLayerDesc};
|
||||
use super::{debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf};
|
||||
use super::{layer_map, remote_timeline_client::RemoteTimelineClient};
|
||||
use super::{remote_timeline_client::index::IndexPart, storage_layer::LayerFringe};
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
|
||||
@@ -210,6 +210,10 @@ pub struct Timeline {
|
||||
/// so that e.g. on-demand-download/eviction, and layer spreading, can operate just on `LayerFileManager`.
|
||||
pub(crate) layers: Arc<tokio::sync::RwLock<LayerManager>>,
|
||||
|
||||
/// State that [`Self::create_image_layers`] keeps across invocations to determine if it can
|
||||
/// skip an invocation becaucse nothing changed.
|
||||
create_image_layers_skipper_state: std::sync::Mutex<Option<CreateImageLayersParams>>,
|
||||
|
||||
/// Set of key ranges which should be covered by image layers to
|
||||
/// allow GC to remove old layers. This set is created by GC and its cutoff LSN is also stored.
|
||||
/// It is used by compaction task when it checks if new image layer should be created.
|
||||
@@ -302,8 +306,9 @@ pub struct Timeline {
|
||||
// though let's keep them both for better error visibility.
|
||||
pub initdb_lsn: Lsn,
|
||||
|
||||
/// When did we last calculate the partitioning?
|
||||
partitioning: Mutex<(KeyPartitioning, Lsn)>,
|
||||
/// Used by [`Timline::repartition`] to avoid re-computing partitioning on every iteration.
|
||||
/// Must bump [`CompactionKeyspacePartitioning::version`] when changing.
|
||||
partitioning: tokio::sync::Mutex<CompactionKeyspacePartitioning>,
|
||||
|
||||
/// Configuration: how often should the partitioning be recalculated.
|
||||
repartition_threshold: u64,
|
||||
@@ -400,6 +405,57 @@ pub struct GcInfo {
|
||||
pub pitr_cutoff: Lsn,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
|
||||
struct CreateImageLayersParams {
|
||||
/// From [`layer_map::LayerMap::get_rebuild_version`].
|
||||
layer_map_version: layer_map::RebuildVersion,
|
||||
/// From [`Timeline::partitioning`].
|
||||
partitioning_version: CompactionKeyspacePartitioningVersion,
|
||||
}
|
||||
|
||||
/// The version of the keyspace partitioning maintained in [`Timeline::partitioning`].
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
struct CompactionKeyspacePartitioningVersion(u64);
|
||||
|
||||
impl CompactionKeyspacePartitioningVersion {
|
||||
fn inc(&mut self) {
|
||||
self.0
|
||||
.checked_add(1)
|
||||
.expect("at current clock cycles, we won't hit this");
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct CompactionKeyspacePartitioning {
|
||||
version: CompactionKeyspacePartitioningVersion,
|
||||
partitioning: KeyPartitioning,
|
||||
lsn: Lsn,
|
||||
}
|
||||
|
||||
struct CompactionKeyspacePartition<'a> {
|
||||
key_space: &'a KeySpace,
|
||||
lsn: Lsn,
|
||||
}
|
||||
|
||||
impl CompactionKeyspacePartitioning {
|
||||
pub fn update(&mut self, partitioning: KeyPartitioning, lsn: Lsn) {
|
||||
assert!(self.lsn <= lsn);
|
||||
self.version.inc();
|
||||
self.partitioning = partitioning;
|
||||
self.lsn = lsn;
|
||||
}
|
||||
|
||||
pub fn iter(&self) -> impl Iterator<Item = CompactionKeyspacePartition<'_>> {
|
||||
self.partitioning
|
||||
.parts
|
||||
.iter()
|
||||
.map(|key_space| CompactionKeyspacePartition {
|
||||
key_space,
|
||||
lsn: self.lsn,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// An error happened in a get() operation.
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub(crate) enum PageReconstructError {
|
||||
@@ -778,10 +834,8 @@ impl Timeline {
|
||||
GetVectoredImpl::Vectored => {
|
||||
let vectored_res = self.get_vectored_impl(keyspace.clone(), lsn, ctx).await;
|
||||
|
||||
if self.conf.validate_vectored_get {
|
||||
self.validate_get_vectored_impl(&vectored_res, keyspace, lsn, ctx)
|
||||
.await;
|
||||
}
|
||||
self.validate_get_vectored_impl(&vectored_res, keyspace, lsn, ctx)
|
||||
.await;
|
||||
|
||||
vectored_res
|
||||
}
|
||||
@@ -1156,7 +1210,7 @@ impl Timeline {
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok((partitioning, lsn)) => {
|
||||
Ok(partitioning) => {
|
||||
// Disables access_stats updates, so that the files we read remain candidates for eviction after we're done with them
|
||||
let image_ctx = RequestContextBuilder::extend(ctx)
|
||||
.access_stats_behavior(AccessStatsBehavior::Skip)
|
||||
@@ -1170,7 +1224,7 @@ impl Timeline {
|
||||
// 3. Create new image layers for partitions that have been modified
|
||||
// "enough".
|
||||
let layers = self
|
||||
.create_image_layers(&partitioning, lsn, false, &image_ctx)
|
||||
.create_image_layers(&partitioning, false, &image_ctx)
|
||||
.await
|
||||
.map_err(anyhow::Error::from)?;
|
||||
if let Some(remote_client) = &self.remote_client {
|
||||
@@ -1661,8 +1715,15 @@ impl Timeline {
|
||||
// initial logical size is 0.
|
||||
LogicalSize::empty_initial()
|
||||
},
|
||||
partitioning: Mutex::new((KeyPartitioning::new(), Lsn(0))),
|
||||
partitioning: tokio::sync::Mutex::new(CompactionKeyspacePartitioning {
|
||||
// The other fields don't matter because this is 0 and hence repartition(),
|
||||
// will calculate a new one
|
||||
lsn: Lsn(0),
|
||||
version: CompactionKeyspacePartitioningVersion(0),
|
||||
partitioning: KeyPartitioning::new(),
|
||||
}),
|
||||
repartition_threshold: 0,
|
||||
create_image_layers_skipper_state: std::sync::Mutex::new(None),
|
||||
|
||||
last_received_wal: Mutex::new(None),
|
||||
rel_size_cache: RwLock::new(HashMap::new()),
|
||||
@@ -3153,7 +3214,7 @@ impl Timeline {
|
||||
}
|
||||
// Note: The 'ctx' in use here has DownloadBehavior::Error. We should not
|
||||
// require downloading anything during initial import.
|
||||
let (partitioning, _lsn) = self
|
||||
let partitioning = self
|
||||
.repartition(
|
||||
self.initdb_lsn,
|
||||
self.get_compaction_target_size(),
|
||||
@@ -3168,8 +3229,7 @@ impl Timeline {
|
||||
|
||||
// For image layers, we add them immediately into the layer map.
|
||||
(
|
||||
self.create_image_layers(&partitioning, self.initdb_lsn, true, ctx)
|
||||
.await?,
|
||||
self.create_image_layers(&partitioning, true, ctx).await?,
|
||||
None,
|
||||
)
|
||||
} else {
|
||||
@@ -3407,36 +3467,46 @@ impl Timeline {
|
||||
partition_size: u64,
|
||||
flags: EnumSet<CompactFlags>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<(KeyPartitioning, Lsn)> {
|
||||
{
|
||||
let partitioning_guard = self.partitioning.lock().unwrap();
|
||||
let distance = lsn.0 - partitioning_guard.1 .0;
|
||||
if partitioning_guard.1 != Lsn(0)
|
||||
&& distance <= self.repartition_threshold
|
||||
&& !flags.contains(CompactFlags::ForceRepartition)
|
||||
{
|
||||
debug!(
|
||||
distance,
|
||||
threshold = self.repartition_threshold,
|
||||
"no repartitioning needed"
|
||||
);
|
||||
return Ok((partitioning_guard.0.clone(), partitioning_guard.1));
|
||||
}
|
||||
) -> anyhow::Result<CompactionKeyspacePartitioning> {
|
||||
let Ok(mut partitioning_guard) = self.partitioning.try_lock() else {
|
||||
// NB: there are two callers, one is the compaction task, of which there is only one per struct Tenant and hence Timelien.
|
||||
// The other is the initdb optimization in flush_frozen_layer, used by `boostrap_timeline`, which runs before `.activate()`
|
||||
// and hence before the compaction task starts.
|
||||
anyhow::bail!("repartition() called concurrently, this should not happen");
|
||||
};
|
||||
if lsn < partitioning_guard.lsn {
|
||||
anyhow::bail!("repartition() called with LSN going backwards, this should not happen");
|
||||
}
|
||||
|
||||
let distance = lsn.0 - partitioning_guard.lsn.0;
|
||||
if partitioning_guard.lsn != Lsn(0)
|
||||
&& distance <= self.repartition_threshold
|
||||
&& !flags.contains(CompactFlags::ForceRepartition)
|
||||
{
|
||||
debug!(
|
||||
distance,
|
||||
threshold = self.repartition_threshold,
|
||||
"no repartitioning needed"
|
||||
);
|
||||
return Ok(partitioning_guard.clone());
|
||||
}
|
||||
|
||||
let keyspace = self.collect_keyspace(lsn, ctx).await?;
|
||||
let partitioning = keyspace.partition(partition_size);
|
||||
|
||||
let mut partitioning_guard = self.partitioning.lock().unwrap();
|
||||
if lsn > partitioning_guard.1 {
|
||||
*partitioning_guard = (partitioning, lsn);
|
||||
} else {
|
||||
warn!("Concurrent repartitioning of keyspace. This unexpected, but probably harmless");
|
||||
}
|
||||
Ok((partitioning_guard.0.clone(), partitioning_guard.1))
|
||||
partitioning_guard.update(partitioning, lsn);
|
||||
|
||||
Ok(partitioning_guard.clone())
|
||||
}
|
||||
|
||||
// Is it time to create a new image layer for the given partition?
|
||||
async fn time_for_new_image_layer(&self, partition: &KeySpace, lsn: Lsn) -> bool {
|
||||
async fn time_for_new_image_layer(
|
||||
&self,
|
||||
&CompactionKeyspacePartition {
|
||||
key_space: partition,
|
||||
lsn,
|
||||
}: &CompactionKeyspacePartition<'_>,
|
||||
) -> bool {
|
||||
let threshold = self.get_image_creation_threshold();
|
||||
|
||||
let guard = self.layers.read().await;
|
||||
@@ -3511,14 +3581,14 @@ impl Timeline {
|
||||
false
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip_all, fields(%lsn, %force))]
|
||||
#[tracing::instrument(skip_all, fields(lsn=%partitioning.lsn, %force))]
|
||||
async fn create_image_layers(
|
||||
self: &Arc<Timeline>,
|
||||
partitioning: &KeyPartitioning,
|
||||
lsn: Lsn,
|
||||
partitioning: &CompactionKeyspacePartitioning,
|
||||
force: bool,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Vec<ResidentLayer>, CreateImageLayersError> {
|
||||
// REVIEW: should we not even bump this timer if we're taking short exit?
|
||||
let timer = self.metrics.create_images_time_histo.start_timer();
|
||||
let mut image_layers = Vec::new();
|
||||
|
||||
@@ -3533,9 +3603,30 @@ impl Timeline {
|
||||
// image layers <100000000..100000099> and <200000000..200000199> are not completely covering it.
|
||||
let mut start = Key::MIN;
|
||||
|
||||
for partition in partitioning.parts.iter() {
|
||||
let img_range = start..partition.ranges.last().unwrap().end;
|
||||
if !force && !self.time_for_new_image_layer(partition, lsn).await {
|
||||
let new_inputs = {
|
||||
let layer_manager = self.layers.read().await;
|
||||
let layer_map = layer_manager.layer_map();
|
||||
CreateImageLayersParams {
|
||||
layer_map_version: layer_map.get_rebuild_version(),
|
||||
partitioning_version: partitioning.version,
|
||||
}
|
||||
};
|
||||
let last_inputs = {
|
||||
// bail out early if nothing changed
|
||||
let last = self.create_image_layers_skipper_state.lock().unwrap();
|
||||
match (force, &*last, &new_inputs) {
|
||||
(false, Some(last), new_inputs) if last == new_inputs => {
|
||||
debug!(?last, ?new_inputs, "inputs to image layer creation algorithm have not changed since last invocation");
|
||||
return Ok(image_layers);
|
||||
}
|
||||
_ => (), // we'll update .last once we finish with success
|
||||
}
|
||||
*last
|
||||
};
|
||||
|
||||
for partition in partitioning.iter() {
|
||||
let img_range = start..partition.key_space.ranges.last().unwrap().end;
|
||||
if !force && !self.time_for_new_image_layer(&partition).await {
|
||||
start = img_range.end;
|
||||
continue;
|
||||
}
|
||||
@@ -3545,7 +3636,7 @@ impl Timeline {
|
||||
self.timeline_id,
|
||||
self.tenant_shard_id,
|
||||
&img_range,
|
||||
lsn,
|
||||
partitioning.lsn,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -3558,7 +3649,7 @@ impl Timeline {
|
||||
let mut wrote_keys = false;
|
||||
|
||||
let mut key_request_accum = KeySpaceAccum::new();
|
||||
for range in &partition.ranges {
|
||||
for range in &partition.key_space.ranges {
|
||||
let mut key = range.start;
|
||||
while key < range.end {
|
||||
// Decide whether to retain this key: usually we do, but sharded tenants may
|
||||
@@ -3582,7 +3673,11 @@ impl Timeline {
|
||||
|| last_key_in_range
|
||||
{
|
||||
let results = self
|
||||
.get_vectored(key_request_accum.consume_keyspace(), lsn, ctx)
|
||||
.get_vectored(
|
||||
key_request_accum.consume_keyspace(),
|
||||
partitioning.lsn,
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
for (img_key, img) in results {
|
||||
@@ -3671,6 +3766,25 @@ impl Timeline {
|
||||
.context("fsync of timeline dir")?;
|
||||
}
|
||||
|
||||
// Remember the inputs so that we take the early exit next compaction iteration
|
||||
// if nothing changed in the meantime.
|
||||
// NB: since we created `new_inputs`, the layer map might have changed and hence
|
||||
// `get_rebuild_version()` might have advanced already, e.g., by a
|
||||
// concurrent garbage collection iteration that removed layers. That's ok, next
|
||||
// `new_inputs` will observe an increased `get_rebuild_version()` and run again.
|
||||
// Same goes for the modifications that we're doing below: if `image_layers` is
|
||||
// not empty, we'll insert them into the layer map in the code below, which
|
||||
// will bump `get_rebuild_version()`, which will make us re-run once more.
|
||||
//
|
||||
{
|
||||
let mut state = self.create_image_layers_skipper_state.lock().unwrap();
|
||||
if *state != last_inputs {
|
||||
warn!(?last_inputs, ?new_inputs, observed=?*state, "unexpected: create_image_layers called concurrently? not caching inputs");
|
||||
} else {
|
||||
*state = Some(new_inputs);
|
||||
}
|
||||
}
|
||||
|
||||
let mut guard = self.layers.write().await;
|
||||
|
||||
// FIXME: we could add the images to be uploaded *before* returning from here, but right
|
||||
|
||||
@@ -85,6 +85,7 @@ impl Timeline {
|
||||
let policy = self.get_eviction_policy();
|
||||
let period = match policy {
|
||||
EvictionPolicy::LayerAccessThreshold(lat) => lat.period,
|
||||
EvictionPolicy::OnlyImitiate(lat) => lat.period,
|
||||
EvictionPolicy::NoEviction => Duration::from_secs(10),
|
||||
};
|
||||
if random_init_delay(period, &cancel).await.is_err() {
|
||||
@@ -119,33 +120,45 @@ impl Timeline {
|
||||
ctx: &RequestContext,
|
||||
) -> ControlFlow<(), Instant> {
|
||||
debug!("eviction iteration: {policy:?}");
|
||||
match policy {
|
||||
let start = Instant::now();
|
||||
let (period, threshold) = match policy {
|
||||
EvictionPolicy::NoEviction => {
|
||||
// check again in 10 seconds; XXX config watch mechanism
|
||||
ControlFlow::Continue(Instant::now() + Duration::from_secs(10))
|
||||
return ControlFlow::Continue(Instant::now() + Duration::from_secs(10));
|
||||
}
|
||||
EvictionPolicy::LayerAccessThreshold(p) => {
|
||||
let start = Instant::now();
|
||||
match self.eviction_iteration_threshold(p, cancel, ctx).await {
|
||||
ControlFlow::Break(()) => return ControlFlow::Break(()),
|
||||
ControlFlow::Continue(()) => (),
|
||||
}
|
||||
let elapsed = start.elapsed();
|
||||
crate::tenant::tasks::warn_when_period_overrun(
|
||||
elapsed,
|
||||
p.period,
|
||||
BackgroundLoopKind::Eviction,
|
||||
);
|
||||
crate::metrics::EVICTION_ITERATION_DURATION
|
||||
.get_metric_with_label_values(&[
|
||||
&format!("{}", p.period.as_secs()),
|
||||
&format!("{}", p.threshold.as_secs()),
|
||||
])
|
||||
.unwrap()
|
||||
.observe(elapsed.as_secs_f64());
|
||||
ControlFlow::Continue(start + p.period)
|
||||
(p.period, p.threshold)
|
||||
}
|
||||
}
|
||||
EvictionPolicy::OnlyImitiate(p) => {
|
||||
if self.imitiate_only(p, cancel, ctx).await.is_break() {
|
||||
return ControlFlow::Break(());
|
||||
}
|
||||
(p.period, p.threshold)
|
||||
}
|
||||
};
|
||||
|
||||
let elapsed = start.elapsed();
|
||||
crate::tenant::tasks::warn_when_period_overrun(
|
||||
elapsed,
|
||||
period,
|
||||
BackgroundLoopKind::Eviction,
|
||||
);
|
||||
// FIXME: if we were to mix policies on a pageserver, we would have no way to sense this. I
|
||||
// don't think that is a relevant fear however, and regardless the imitation should be the
|
||||
// most costly part.
|
||||
crate::metrics::EVICTION_ITERATION_DURATION
|
||||
.get_metric_with_label_values(&[
|
||||
&format!("{}", period.as_secs()),
|
||||
&format!("{}", threshold.as_secs()),
|
||||
])
|
||||
.unwrap()
|
||||
.observe(elapsed.as_secs_f64());
|
||||
|
||||
ControlFlow::Continue(start + period)
|
||||
}
|
||||
|
||||
async fn eviction_iteration_threshold(
|
||||
@@ -167,30 +180,6 @@ impl Timeline {
|
||||
_ = self.cancel.cancelled() => return ControlFlow::Break(()),
|
||||
};
|
||||
|
||||
// If we evict layers but keep cached values derived from those layers, then
|
||||
// we face a storm of on-demand downloads after pageserver restart.
|
||||
// The reason is that the restart empties the caches, and so, the values
|
||||
// need to be re-computed by accessing layers, which we evicted while the
|
||||
// caches were filled.
|
||||
//
|
||||
// Solutions here would be one of the following:
|
||||
// 1. Have a persistent cache.
|
||||
// 2. Count every access to a cached value to the access stats of all layers
|
||||
// that were accessed to compute the value in the first place.
|
||||
// 3. Invalidate the caches at a period of < p.threshold/2, so that the values
|
||||
// get re-computed from layers, thereby counting towards layer access stats.
|
||||
// 4. Make the eviction task imitate the layer accesses that typically hit caches.
|
||||
//
|
||||
// We follow approach (4) here because in Neon prod deployment:
|
||||
// - page cache is quite small => high churn => low hit rate
|
||||
// => eviction gets correct access stats
|
||||
// - value-level caches such as logical size & repatition have a high hit rate,
|
||||
// especially for inactive tenants
|
||||
// => eviction sees zero accesses for these
|
||||
// => they cause the on-demand download storm on pageserver restart
|
||||
//
|
||||
// We should probably move to persistent caches in the future, or avoid
|
||||
// having inactive tenants attached to pageserver in the first place.
|
||||
match self.imitate_layer_accesses(p, cancel, ctx).await {
|
||||
ControlFlow::Break(()) => return ControlFlow::Break(()),
|
||||
ControlFlow::Continue(()) => (),
|
||||
@@ -307,6 +296,52 @@ impl Timeline {
|
||||
ControlFlow::Continue(())
|
||||
}
|
||||
|
||||
/// Like `eviction_iteration_threshold`, but without any eviction. Eviction will be done by
|
||||
/// disk usage based eviction task.
|
||||
async fn imitiate_only(
|
||||
self: &Arc<Self>,
|
||||
p: &EvictionPolicyLayerAccessThreshold,
|
||||
cancel: &CancellationToken,
|
||||
ctx: &RequestContext,
|
||||
) -> ControlFlow<()> {
|
||||
let acquire_permit = crate::tenant::tasks::concurrent_background_tasks_rate_limit_permit(
|
||||
BackgroundLoopKind::Eviction,
|
||||
ctx,
|
||||
);
|
||||
|
||||
let _permit = tokio::select! {
|
||||
permit = acquire_permit => permit,
|
||||
_ = cancel.cancelled() => return ControlFlow::Break(()),
|
||||
_ = self.cancel.cancelled() => return ControlFlow::Break(()),
|
||||
};
|
||||
|
||||
self.imitate_layer_accesses(p, cancel, ctx).await
|
||||
}
|
||||
|
||||
/// If we evict layers but keep cached values derived from those layers, then
|
||||
/// we face a storm of on-demand downloads after pageserver restart.
|
||||
/// The reason is that the restart empties the caches, and so, the values
|
||||
/// need to be re-computed by accessing layers, which we evicted while the
|
||||
/// caches were filled.
|
||||
///
|
||||
/// Solutions here would be one of the following:
|
||||
/// 1. Have a persistent cache.
|
||||
/// 2. Count every access to a cached value to the access stats of all layers
|
||||
/// that were accessed to compute the value in the first place.
|
||||
/// 3. Invalidate the caches at a period of < p.threshold/2, so that the values
|
||||
/// get re-computed from layers, thereby counting towards layer access stats.
|
||||
/// 4. Make the eviction task imitate the layer accesses that typically hit caches.
|
||||
///
|
||||
/// We follow approach (4) here because in Neon prod deployment:
|
||||
/// - page cache is quite small => high churn => low hit rate
|
||||
/// => eviction gets correct access stats
|
||||
/// - value-level caches such as logical size & repatition have a high hit rate,
|
||||
/// especially for inactive tenants
|
||||
/// => eviction sees zero accesses for these
|
||||
/// => they cause the on-demand download storm on pageserver restart
|
||||
///
|
||||
/// We should probably move to persistent caches in the future, or avoid
|
||||
/// having inactive tenants attached to pageserver in the first place.
|
||||
#[instrument(skip_all)]
|
||||
async fn imitate_layer_accesses(
|
||||
&self,
|
||||
|
||||
@@ -1,412 +0,0 @@
|
||||
//!
|
||||
//! Utilities for vectored reading of variable-sized "blobs".
|
||||
//!
|
||||
//! The "blob" api is an abstraction on top of the "block" api,
|
||||
//! with the main difference being that blobs do not have a fixed
|
||||
//! size (each blob is prefixed with 1 or 4 byte length field)
|
||||
//!
|
||||
//! The vectored apis provided in this module allow for planning
|
||||
//! and executing disk IO which covers multiple blobs.
|
||||
//!
|
||||
//! Reads are planned with [`VectoredReadPlanner`] which will coalesce
|
||||
//! adjacent blocks into a single disk IO request and exectuted by
|
||||
//! [`VectoredBlobReader`] which does all the required offset juggling
|
||||
//! and returns a buffer housing all the blobs and a list of offsets.
|
||||
//!
|
||||
//! Note that the vectored blob api does *not* go through the page cache.
|
||||
|
||||
use std::collections::BTreeMap;
|
||||
|
||||
use bytes::BytesMut;
|
||||
use pageserver_api::key::Key;
|
||||
use utils::lsn::Lsn;
|
||||
use utils::vec_map::VecMap;
|
||||
|
||||
use crate::virtual_file::VirtualFile;
|
||||
|
||||
/// Metadata bundled with the start and end offset of a blob.
|
||||
#[derive(Copy, Clone, Debug)]
|
||||
pub struct BlobMeta {
|
||||
pub key: Key,
|
||||
pub lsn: Lsn,
|
||||
}
|
||||
|
||||
/// Blob offsets into [`VectoredBlobsBuf::buf`]
|
||||
pub struct VectoredBlob {
|
||||
pub start: usize,
|
||||
pub end: usize,
|
||||
pub meta: BlobMeta,
|
||||
}
|
||||
|
||||
/// Return type of [`VectoredBlobReader::read_blobs`]
|
||||
pub struct VectoredBlobsBuf {
|
||||
/// Buffer for all blobs in this read
|
||||
pub buf: BytesMut,
|
||||
/// Offsets into the buffer and metadata for all blobs in this read
|
||||
pub blobs: Vec<VectoredBlob>,
|
||||
}
|
||||
|
||||
/// Description of one disk read for multiple blobs.
|
||||
/// Used as the argument form [`VectoredBlobReader::read_blobs`]
|
||||
#[derive(Debug)]
|
||||
pub struct VectoredRead {
|
||||
pub start: u64,
|
||||
pub end: u64,
|
||||
/// Starting offsets and metadata for each blob in this read
|
||||
pub blobs_at: VecMap<u64, BlobMeta>,
|
||||
|
||||
max_read_size: usize,
|
||||
}
|
||||
|
||||
#[derive(Eq, PartialEq)]
|
||||
enum VectoredReadExtended {
|
||||
Yes,
|
||||
No,
|
||||
}
|
||||
|
||||
impl VectoredRead {
|
||||
fn new(start_offset: u64, end_offset: u64, meta: BlobMeta, max_read_size: usize) -> Self {
|
||||
let mut blobs_at = VecMap::default();
|
||||
blobs_at
|
||||
.append(start_offset, meta)
|
||||
.expect("First insertion always succeeds");
|
||||
|
||||
Self {
|
||||
start: start_offset,
|
||||
end: end_offset,
|
||||
blobs_at,
|
||||
max_read_size,
|
||||
}
|
||||
}
|
||||
|
||||
/// Attempt to extend the current read with a new blob if the start
|
||||
/// offset matches with the current end of the vectored read
|
||||
/// and the resuting size is below the max read size
|
||||
fn extend(&mut self, start: u64, end: u64, meta: BlobMeta) -> VectoredReadExtended {
|
||||
let size = (end - start) as usize;
|
||||
if self.end == start && self.size() + size <= self.max_read_size {
|
||||
self.end = end;
|
||||
self.blobs_at
|
||||
.append(start, meta)
|
||||
.expect("LSNs are ordered within vectored reads");
|
||||
|
||||
return VectoredReadExtended::Yes;
|
||||
}
|
||||
|
||||
VectoredReadExtended::No
|
||||
}
|
||||
|
||||
fn size(&self) -> usize {
|
||||
(self.end - self.start) as usize
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone, Debug)]
|
||||
pub enum BlobFlag {
|
||||
None,
|
||||
Ignore,
|
||||
Replaces,
|
||||
}
|
||||
|
||||
/// Planner for vectored blob reads.
|
||||
///
|
||||
/// Blob offsets are received via [`VectoredReadPlanner::handle`]
|
||||
/// and coalesced into disk reads.
|
||||
///
|
||||
/// The implementation is very simple:
|
||||
/// * Collect all blob offsets in an ordered structure
|
||||
/// * Iterate over the collected blobs and coalesce them into reads at the end
|
||||
pub struct VectoredReadPlanner {
|
||||
// Track all the blob offsets. Start offsets must be ordered.
|
||||
blobs: BTreeMap<Key, Vec<(Lsn, u64, u64)>>,
|
||||
// Arguments for previous blob passed into [`VectoredReadPlanner::handle`]
|
||||
prev: Option<(Key, Lsn, u64, BlobFlag)>,
|
||||
|
||||
max_read_size: usize,
|
||||
}
|
||||
|
||||
impl VectoredReadPlanner {
|
||||
pub fn new(max_read_size: usize) -> Self {
|
||||
Self {
|
||||
blobs: BTreeMap::new(),
|
||||
prev: None,
|
||||
max_read_size,
|
||||
}
|
||||
}
|
||||
|
||||
/// Include a new blob in the read plan.
|
||||
///
|
||||
/// Notes:
|
||||
/// * This function should be called for each blob in the desired *inclusive* range.
|
||||
/// See `DeltaLayerInner::plan_reads` and `ImageLayerInner::plan_reads`.
|
||||
/// * Calls to this function should be for monotonically continuous (key, lsn) tuples.
|
||||
///
|
||||
/// The `flag` argument has two interesting values:
|
||||
/// * [`BlobFlag::Replaces`]: The blob for this key should replace all existing blobs.
|
||||
/// This is used for WAL records that `will_init`.
|
||||
/// * [`BlobFlag::Ignore`]: This blob should not be included in the read. This happens
|
||||
/// if the blob is cached.
|
||||
pub fn handle(&mut self, key: Key, lsn: Lsn, offset: u64, flag: BlobFlag) {
|
||||
// Implementation note: internally lag behind by one blob such that
|
||||
// we have a start and end offset when initialising [`VectoredRead`]
|
||||
let (prev_key, prev_lsn, prev_offset, prev_flag) = match self.prev {
|
||||
None => {
|
||||
self.prev = Some((key, lsn, offset, flag));
|
||||
return;
|
||||
}
|
||||
Some(prev) => prev,
|
||||
};
|
||||
|
||||
self.add_blob(prev_key, prev_lsn, prev_offset, offset, prev_flag);
|
||||
|
||||
self.prev = Some((key, lsn, offset, flag));
|
||||
}
|
||||
|
||||
pub fn handle_range_end(&mut self, offset: u64) {
|
||||
if let Some((prev_key, prev_lsn, prev_offset, prev_flag)) = self.prev {
|
||||
self.add_blob(prev_key, prev_lsn, prev_offset, offset, prev_flag);
|
||||
}
|
||||
|
||||
self.prev = None;
|
||||
}
|
||||
|
||||
fn add_blob(&mut self, key: Key, lsn: Lsn, start_offset: u64, end_offset: u64, flag: BlobFlag) {
|
||||
match flag {
|
||||
BlobFlag::None => {
|
||||
let blobs_for_key = self.blobs.entry(key).or_default();
|
||||
blobs_for_key.push((lsn, start_offset, end_offset));
|
||||
}
|
||||
BlobFlag::Replaces => {
|
||||
let blobs_for_key = self.blobs.entry(key).or_default();
|
||||
blobs_for_key.clear();
|
||||
blobs_for_key.push((lsn, start_offset, end_offset));
|
||||
}
|
||||
BlobFlag::Ignore => {}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn finish(self) -> Vec<VectoredRead> {
|
||||
let mut current_read: Option<VectoredRead> = None;
|
||||
let mut reads = Vec::new();
|
||||
|
||||
for (key, blobs_for_key) in self.blobs {
|
||||
for (lsn, start_offset, end_offset) in blobs_for_key {
|
||||
let extended = match &mut current_read {
|
||||
Some(read) => read.extend(start_offset, end_offset, BlobMeta { key, lsn }),
|
||||
None => VectoredReadExtended::No,
|
||||
};
|
||||
|
||||
if extended == VectoredReadExtended::No {
|
||||
let next_read = VectoredRead::new(
|
||||
start_offset,
|
||||
end_offset,
|
||||
BlobMeta { key, lsn },
|
||||
self.max_read_size,
|
||||
);
|
||||
|
||||
let prev_read = current_read.replace(next_read);
|
||||
|
||||
if let Some(read) = prev_read {
|
||||
reads.push(read);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(read) = current_read {
|
||||
reads.push(read);
|
||||
}
|
||||
|
||||
reads
|
||||
}
|
||||
}
|
||||
|
||||
/// Disk reader for vectored blob spans (does not go through the page cache)
|
||||
pub struct VectoredBlobReader {
|
||||
file: VirtualFile,
|
||||
max_vectored_read_size: usize,
|
||||
}
|
||||
|
||||
impl VectoredBlobReader {
|
||||
pub fn new(file: VirtualFile, max_vectored_read_size: usize) -> Self {
|
||||
Self {
|
||||
file,
|
||||
max_vectored_read_size,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_max_read_size(&self) -> usize {
|
||||
self.max_vectored_read_size
|
||||
}
|
||||
|
||||
pub fn get_file_ref(&self) -> &VirtualFile {
|
||||
&self.file
|
||||
}
|
||||
|
||||
/// Read the requested blobs into the buffer.
|
||||
///
|
||||
/// We have to deal with the fact that blobs are not fixed size.
|
||||
/// Each blob is prefixed by a size header.
|
||||
///
|
||||
/// The success return value is a struct which contains the buffer
|
||||
/// filled from disk and a list of offsets at which each blob lies
|
||||
/// in the buffer.
|
||||
pub async fn read_blobs(
|
||||
&self,
|
||||
read: &VectoredRead,
|
||||
buf: BytesMut,
|
||||
) -> Result<VectoredBlobsBuf, std::io::Error> {
|
||||
// tracing::info!("read_blobs(read={read:?}, read_size={})", read.size());
|
||||
|
||||
assert!(read.size() > 0);
|
||||
assert!(
|
||||
read.size() <= buf.capacity(),
|
||||
"{} > {}",
|
||||
read.size(),
|
||||
buf.capacity()
|
||||
);
|
||||
let buf = self
|
||||
.file
|
||||
.read_exact_at_n(buf, read.start, read.size())
|
||||
.await?;
|
||||
|
||||
let blobs_at = read.blobs_at.as_slice();
|
||||
let start_offset = blobs_at.first().expect("VectoredRead is never empty").0;
|
||||
|
||||
let mut metas = Vec::new();
|
||||
let pairs = blobs_at.iter().zip(
|
||||
blobs_at
|
||||
.iter()
|
||||
.map(Some)
|
||||
.skip(1)
|
||||
.chain(std::iter::once(None)),
|
||||
);
|
||||
for ((offset, meta), next) in pairs {
|
||||
let offset_in_buf = offset - start_offset;
|
||||
let first_len_byte = buf[offset_in_buf as usize];
|
||||
|
||||
// Each blob is prefixed by a header containing it's size.
|
||||
// Extract the size and skip that header to find the start of the data.
|
||||
let (size_length, blob_size) = if first_len_byte < 0x80 {
|
||||
(1, first_len_byte as u64)
|
||||
} else {
|
||||
let mut blob_size_buf = [0u8; 4];
|
||||
let offset_in_buf = offset_in_buf as usize;
|
||||
|
||||
blob_size_buf.copy_from_slice(&buf[offset_in_buf..offset_in_buf + 4]);
|
||||
blob_size_buf[0] &= 0x7f;
|
||||
(4, u32::from_be_bytes(blob_size_buf) as u64)
|
||||
};
|
||||
|
||||
let start = offset_in_buf + size_length;
|
||||
let end = match next {
|
||||
Some((next_blob_start_offset, _)) => next_blob_start_offset - start_offset,
|
||||
None => start + blob_size,
|
||||
};
|
||||
|
||||
assert_eq!(end - start, blob_size);
|
||||
|
||||
metas.push(VectoredBlob {
|
||||
start: start as usize,
|
||||
end: end as usize,
|
||||
meta: *meta,
|
||||
})
|
||||
}
|
||||
|
||||
Ok(VectoredBlobsBuf { buf, blobs: metas })
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
fn validate_read(read: &VectoredRead, offset_range: &[(Key, Lsn, u64, BlobFlag)]) {
|
||||
assert_eq!(read.start, offset_range.first().unwrap().2);
|
||||
|
||||
let expected_offsets_in_read: Vec<_> = offset_range.iter().map(|o| o.2).collect();
|
||||
|
||||
let offsets_in_read: Vec<_> = read
|
||||
.blobs_at
|
||||
.as_slice()
|
||||
.iter()
|
||||
.map(|(offset, _)| *offset)
|
||||
.collect();
|
||||
|
||||
assert_eq!(expected_offsets_in_read, offsets_in_read);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn planner_max_read_size_test() {
|
||||
let max_read_size = 128 * 1024;
|
||||
let key = Key::MIN;
|
||||
let lsn = Lsn(0);
|
||||
|
||||
let blob_descriptions = vec![
|
||||
(key, lsn, 0, BlobFlag::None),
|
||||
(key, lsn, 32 * 1024, BlobFlag::None),
|
||||
(key, lsn, 96 * 1024, BlobFlag::None), // Last in read 1
|
||||
(key, lsn, 128 * 1024, BlobFlag::None), // Last in read 2
|
||||
(key, lsn, 198 * 1024, BlobFlag::None), // Last in read 3
|
||||
(key, lsn, 268 * 1024, BlobFlag::None), // Last in read 4
|
||||
(key, lsn, 396 * 1024, BlobFlag::None), // Last in read 5
|
||||
(key, lsn, 652 * 1024, BlobFlag::None), // Last in read 6
|
||||
];
|
||||
|
||||
let ranges = [
|
||||
&blob_descriptions[0..3],
|
||||
&blob_descriptions[3..4],
|
||||
&blob_descriptions[4..5],
|
||||
&blob_descriptions[5..6],
|
||||
&blob_descriptions[6..7],
|
||||
&blob_descriptions[7..],
|
||||
];
|
||||
|
||||
let mut planner = VectoredReadPlanner::new(max_read_size);
|
||||
for (key, lsn, offset, flag) in blob_descriptions.clone() {
|
||||
planner.handle(key, lsn, offset, flag);
|
||||
}
|
||||
|
||||
planner.handle_range_end(652 * 1024);
|
||||
|
||||
let reads = planner.finish();
|
||||
assert_eq!(reads.len(), 6);
|
||||
|
||||
for (idx, read) in reads.iter().enumerate() {
|
||||
validate_read(read, ranges[idx]);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn planner_replacement_test() {
|
||||
let max_read_size = 128 * 1024;
|
||||
let first_key = Key::MIN;
|
||||
let second_key = first_key.next();
|
||||
let lsn = Lsn(0);
|
||||
|
||||
let blob_descriptions = vec![
|
||||
(first_key, lsn, 0, BlobFlag::None), // First in read 1
|
||||
(first_key, lsn, 1024, BlobFlag::None), // Last in read 1
|
||||
(second_key, lsn, 2 * 1024, BlobFlag::Replaces),
|
||||
(second_key, lsn, 3 * 1024, BlobFlag::None),
|
||||
(second_key, lsn, 4 * 1024, BlobFlag::Replaces), // First in read 2
|
||||
(second_key, lsn, 5 * 1024, BlobFlag::None), // Last in read 2
|
||||
];
|
||||
|
||||
let ranges = [&blob_descriptions[0..2], &blob_descriptions[4..]];
|
||||
|
||||
let mut planner = VectoredReadPlanner::new(max_read_size);
|
||||
for (key, lsn, offset, flag) in blob_descriptions.clone() {
|
||||
planner.handle(key, lsn, offset, flag);
|
||||
}
|
||||
|
||||
planner.handle_range_end(6 * 1024);
|
||||
|
||||
let reads = planner.finish();
|
||||
assert_eq!(reads.len(), 2);
|
||||
|
||||
for (idx, read) in reads.iter().enumerate() {
|
||||
validate_read(read, ranges[idx]);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -562,18 +562,7 @@ impl VirtualFile {
|
||||
B: IoBufMut + Send,
|
||||
{
|
||||
let (buf, res) =
|
||||
read_exact_at_impl(buf, offset, None, |buf, offset| self.read_at(buf, offset)).await;
|
||||
res.map(|()| buf)
|
||||
}
|
||||
|
||||
pub async fn read_exact_at_n<B>(&self, buf: B, offset: u64, count: usize) -> Result<B, Error>
|
||||
where
|
||||
B: IoBufMut + Send,
|
||||
{
|
||||
let (buf, res) = read_exact_at_impl(buf, offset, Some(count), |buf, offset| {
|
||||
self.read_at(buf, offset)
|
||||
})
|
||||
.await;
|
||||
read_exact_at_impl(buf, offset, |buf, offset| self.read_at(buf, offset)).await;
|
||||
res.map(|()| buf)
|
||||
}
|
||||
|
||||
@@ -707,7 +696,6 @@ impl VirtualFile {
|
||||
pub async fn read_exact_at_impl<B, F, Fut>(
|
||||
buf: B,
|
||||
mut offset: u64,
|
||||
count: Option<usize>,
|
||||
mut read_at: F,
|
||||
) -> (B, std::io::Result<()>)
|
||||
where
|
||||
@@ -715,15 +703,7 @@ where
|
||||
F: FnMut(tokio_epoll_uring::Slice<B>, u64) -> Fut,
|
||||
Fut: std::future::Future<Output = (tokio_epoll_uring::Slice<B>, std::io::Result<usize>)>,
|
||||
{
|
||||
let mut buf: tokio_epoll_uring::Slice<B> = match count {
|
||||
Some(count) => {
|
||||
assert!(count <= buf.bytes_total());
|
||||
assert!(count > 0);
|
||||
buf.slice(..count) // may include uninitialized memory
|
||||
}
|
||||
None => buf.slice_full(), // includes all the uninitialized memory
|
||||
};
|
||||
|
||||
let mut buf: tokio_epoll_uring::Slice<B> = buf.slice_full(); // includes all the uninitialized memory
|
||||
while buf.bytes_total() != 0 {
|
||||
let res;
|
||||
(buf, res) = read_at(buf, offset).await;
|
||||
@@ -813,7 +793,7 @@ mod test_read_exact_at_impl {
|
||||
result: Ok(vec![b'a', b'b', b'c', b'd', b'e']),
|
||||
}]),
|
||||
}));
|
||||
let (buf, res) = read_exact_at_impl(buf, 0, None, |buf, offset| {
|
||||
let (buf, res) = read_exact_at_impl(buf, 0, |buf, offset| {
|
||||
let mock_read_at = Arc::clone(&mock_read_at);
|
||||
async move { mock_read_at.lock().await.read_at(buf, offset).await }
|
||||
})
|
||||
@@ -822,33 +802,13 @@ mod test_read_exact_at_impl {
|
||||
assert_eq!(buf, vec![b'a', b'b', b'c', b'd', b'e']);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_with_count() {
|
||||
let buf = Vec::with_capacity(5);
|
||||
let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt {
|
||||
expectations: VecDeque::from(vec![Expectation {
|
||||
offset: 0,
|
||||
bytes_total: 3,
|
||||
result: Ok(vec![b'a', b'b', b'c']),
|
||||
}]),
|
||||
}));
|
||||
|
||||
let (buf, res) = read_exact_at_impl(buf, 0, Some(3), |buf, offset| {
|
||||
let mock_read_at = Arc::clone(&mock_read_at);
|
||||
async move { mock_read_at.lock().await.read_at(buf, offset).await }
|
||||
})
|
||||
.await;
|
||||
assert!(res.is_ok());
|
||||
assert_eq!(buf, vec![b'a', b'b', b'c']);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_empty_buf_issues_no_syscall() {
|
||||
let buf = Vec::new();
|
||||
let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt {
|
||||
expectations: VecDeque::new(),
|
||||
}));
|
||||
let (_buf, res) = read_exact_at_impl(buf, 0, None, |buf, offset| {
|
||||
let (_buf, res) = read_exact_at_impl(buf, 0, |buf, offset| {
|
||||
let mock_read_at = Arc::clone(&mock_read_at);
|
||||
async move { mock_read_at.lock().await.read_at(buf, offset).await }
|
||||
})
|
||||
@@ -873,7 +833,7 @@ mod test_read_exact_at_impl {
|
||||
},
|
||||
]),
|
||||
}));
|
||||
let (buf, res) = read_exact_at_impl(buf, 0, None, |buf, offset| {
|
||||
let (buf, res) = read_exact_at_impl(buf, 0, |buf, offset| {
|
||||
let mock_read_at = Arc::clone(&mock_read_at);
|
||||
async move { mock_read_at.lock().await.read_at(buf, offset).await }
|
||||
})
|
||||
@@ -904,7 +864,7 @@ mod test_read_exact_at_impl {
|
||||
},
|
||||
]),
|
||||
}));
|
||||
let (_buf, res) = read_exact_at_impl(buf, 0, None, |buf, offset| {
|
||||
let (_buf, res) = read_exact_at_impl(buf, 0, |buf, offset| {
|
||||
let mock_read_at = Arc::clone(&mock_read_at);
|
||||
async move { mock_read_at.lock().await.read_at(buf, offset).await }
|
||||
})
|
||||
|
||||
@@ -21,7 +21,7 @@ use crate::{
|
||||
console,
|
||||
error::{ReportableError, UserFacingError},
|
||||
};
|
||||
use std::io;
|
||||
use std::{io, net::IpAddr};
|
||||
use thiserror::Error;
|
||||
|
||||
/// Convenience wrapper for the authentication error.
|
||||
@@ -62,10 +62,11 @@ pub enum AuthErrorImpl {
|
||||
Io(#[from] io::Error),
|
||||
|
||||
#[error(
|
||||
"This IP address is not allowed to connect to this endpoint. \
|
||||
Please add it to the allowed list in the Neon console."
|
||||
"This IP address {0} is not allowed to connect to this endpoint. \
|
||||
Please add it to the allowed list in the Neon console. \
|
||||
Make sure to check for IPv4 or IPv6 addresses."
|
||||
)]
|
||||
IpAddressNotAllowed,
|
||||
IpAddressNotAllowed(IpAddr),
|
||||
|
||||
#[error("Too many connections to this endpoint. Please try again later.")]
|
||||
TooManyConnections,
|
||||
@@ -87,8 +88,8 @@ impl AuthError {
|
||||
AuthErrorImpl::AuthFailed(user.into()).into()
|
||||
}
|
||||
|
||||
pub fn ip_address_not_allowed() -> Self {
|
||||
AuthErrorImpl::IpAddressNotAllowed.into()
|
||||
pub fn ip_address_not_allowed(ip: IpAddr) -> Self {
|
||||
AuthErrorImpl::IpAddressNotAllowed(ip).into()
|
||||
}
|
||||
|
||||
pub fn too_many_connections() -> Self {
|
||||
@@ -122,7 +123,7 @@ impl UserFacingError for AuthError {
|
||||
MalformedPassword(_) => self.to_string(),
|
||||
MissingEndpointName => self.to_string(),
|
||||
Io(_) => "Internal error".to_string(),
|
||||
IpAddressNotAllowed => self.to_string(),
|
||||
IpAddressNotAllowed(_) => self.to_string(),
|
||||
TooManyConnections => self.to_string(),
|
||||
UserTimeout(_) => self.to_string(),
|
||||
}
|
||||
@@ -141,7 +142,7 @@ impl ReportableError for AuthError {
|
||||
MalformedPassword(_) => crate::error::ErrorKind::User,
|
||||
MissingEndpointName => crate::error::ErrorKind::User,
|
||||
Io(_) => crate::error::ErrorKind::ClientDisconnect,
|
||||
IpAddressNotAllowed => crate::error::ErrorKind::User,
|
||||
IpAddressNotAllowed(_) => crate::error::ErrorKind::User,
|
||||
TooManyConnections => crate::error::ErrorKind::RateLimit,
|
||||
UserTimeout(_) => crate::error::ErrorKind::User,
|
||||
}
|
||||
|
||||
@@ -209,7 +209,7 @@ async fn auth_quirks(
|
||||
|
||||
// check allowed list
|
||||
if !check_peer_addr_is_in_list(&ctx.peer_addr, &allowed_ips) {
|
||||
return Err(auth::AuthError::ip_address_not_allowed());
|
||||
return Err(auth::AuthError::ip_address_not_allowed(ctx.peer_addr));
|
||||
}
|
||||
let cached_secret = match maybe_secret {
|
||||
Some(secret) => secret,
|
||||
|
||||
@@ -32,7 +32,7 @@ impl PoolingBackend {
|
||||
let backend = self.config.auth_backend.as_ref().map(|_| user_info.clone());
|
||||
let (allowed_ips, maybe_secret) = backend.get_allowed_ips_and_secret(ctx).await?;
|
||||
if !check_peer_addr_is_in_list(&ctx.peer_addr, &allowed_ips) {
|
||||
return Err(AuthError::ip_address_not_allowed());
|
||||
return Err(AuthError::ip_address_not_allowed(ctx.peer_addr));
|
||||
}
|
||||
let cached_secret = match maybe_secret {
|
||||
Some(secret) => secret,
|
||||
|
||||
@@ -512,7 +512,7 @@ class NeonEnvBuilder:
|
||||
|
||||
def init_start(
|
||||
self,
|
||||
initial_tenant_conf: Optional[Dict[str, str]] = None,
|
||||
initial_tenant_conf: Optional[Dict[str, Any]] = None,
|
||||
default_remote_storage_if_missing: bool = True,
|
||||
initial_tenant_shard_count: Optional[int] = None,
|
||||
initial_tenant_shard_stripe_size: Optional[int] = None,
|
||||
@@ -1497,7 +1497,7 @@ class NeonCli(AbstractNeonCli):
|
||||
self,
|
||||
tenant_id: Optional[TenantId] = None,
|
||||
timeline_id: Optional[TimelineId] = None,
|
||||
conf: Optional[Dict[str, str]] = None,
|
||||
conf: Optional[Dict[str, Any]] = None,
|
||||
shard_count: Optional[int] = None,
|
||||
shard_stripe_size: Optional[int] = None,
|
||||
set_default: bool = False,
|
||||
|
||||
@@ -395,12 +395,20 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
|
||||
tenant_id: Union[TenantId, TenantShardId],
|
||||
timestamp: datetime,
|
||||
done_if_after: datetime,
|
||||
shard_counts: Optional[List[int]] = None,
|
||||
):
|
||||
"""
|
||||
Issues a request to perform time travel operations on the remote storage
|
||||
"""
|
||||
|
||||
if shard_counts is None:
|
||||
shard_counts = []
|
||||
body: Dict[str, Any] = {
|
||||
"shard_counts": shard_counts,
|
||||
}
|
||||
res = self.put(
|
||||
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/time_travel_remote_storage?travel_to={timestamp.isoformat()}Z&done_if_after={done_if_after.isoformat()}Z"
|
||||
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/time_travel_remote_storage?travel_to={timestamp.isoformat()}Z&done_if_after={done_if_after.isoformat()}Z",
|
||||
json=body,
|
||||
)
|
||||
self.verbose_error(res)
|
||||
|
||||
|
||||
@@ -219,6 +219,7 @@ def wait_for_last_record_lsn(
|
||||
def wait_for_upload_queue_empty(
|
||||
pageserver_http: PageserverHttpClient, tenant_id: TenantId, timeline_id: TimelineId
|
||||
):
|
||||
wait_period_secs = 0.2
|
||||
while True:
|
||||
all_metrics = pageserver_http.get_metrics()
|
||||
started = all_metrics.query_all(
|
||||
@@ -235,7 +236,7 @@ def wait_for_upload_queue_empty(
|
||||
"timeline_id": str(timeline_id),
|
||||
},
|
||||
)
|
||||
assert len(started) == len(finished)
|
||||
|
||||
# this is `started left join finished`; if match, subtracting start from finished, resulting in queue depth
|
||||
remaining_labels = ["shard_id", "file_kind", "op_kind"]
|
||||
tl: List[Tuple[Any, float]] = []
|
||||
@@ -256,7 +257,7 @@ def wait_for_upload_queue_empty(
|
||||
log.info(f" {labels}: {queue_count}")
|
||||
if all(queue_count == 0 for (_, queue_count) in tl):
|
||||
return
|
||||
time.sleep(0.2)
|
||||
time.sleep(wait_period_secs)
|
||||
|
||||
|
||||
def wait_timeline_detail_404(
|
||||
@@ -482,8 +483,8 @@ def tenant_delete_wait_completed(
|
||||
MANY_SMALL_LAYERS_TENANT_CONFIG = {
|
||||
"gc_period": "0s",
|
||||
"compaction_period": "0s",
|
||||
"checkpoint_distance": f"{1024**2}",
|
||||
"image_creation_threshold": "100",
|
||||
"checkpoint_distance": 1024**2,
|
||||
"image_creation_threshold": 100,
|
||||
}
|
||||
|
||||
|
||||
|
||||
@@ -1,195 +0,0 @@
|
||||
import asyncio
|
||||
import json
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, Tuple
|
||||
|
||||
import pytest
|
||||
from fixtures.benchmark_fixture import MetricReport, NeonBenchmarker
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import Endpoint, NeonEnv, NeonEnvBuilder, PgBin, wait_for_last_flush_lsn
|
||||
from fixtures.utils import get_scale_for_db, humantime_to_ms
|
||||
|
||||
from performance.pageserver.util import (
|
||||
setup_pageserver_with_tenants,
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("duration", [30])
|
||||
@pytest.mark.parametrize("pgbench_scale", [get_scale_for_db(200)])
|
||||
@pytest.mark.parametrize("n_tenants", [10])
|
||||
@pytest.mark.parametrize("get_vectored_impl", ["sequential", "vectored"])
|
||||
@pytest.mark.timeout(1000)
|
||||
def test_basebackup_with_high_slru_count(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
zenbenchmark: NeonBenchmarker,
|
||||
pg_bin: PgBin,
|
||||
get_vectored_impl: str,
|
||||
n_tenants: int,
|
||||
pgbench_scale: int,
|
||||
duration: int,
|
||||
):
|
||||
def record(metric, **kwargs):
|
||||
zenbenchmark.record(metric_name=f"pageserver_basebackup.{metric}", **kwargs)
|
||||
|
||||
params: Dict[str, Tuple[Any, Dict[str, Any]]] = {}
|
||||
|
||||
# params from fixtures
|
||||
params.update(
|
||||
{
|
||||
"n_tenants": (n_tenants, {"unit": ""}),
|
||||
"pgbench_scale": (pgbench_scale, {"unit": ""}),
|
||||
"duration": (duration, {"unit": "s"}),
|
||||
}
|
||||
)
|
||||
|
||||
# configure cache sizes like in prod
|
||||
page_cache_size = 16384
|
||||
max_file_descriptors = 500000
|
||||
neon_env_builder.pageserver_config_override = (
|
||||
f"page_cache_size={page_cache_size}; max_file_descriptors={max_file_descriptors}; "
|
||||
f"get_vectored_impl='{get_vectored_impl}'; validate_vectored_get=false"
|
||||
)
|
||||
params.update(
|
||||
{
|
||||
"pageserver_config_override.page_cache_size": (
|
||||
page_cache_size * 8192,
|
||||
{"unit": "byte"},
|
||||
),
|
||||
"pageserver_config_override.max_file_descriptors": (max_file_descriptors, {"unit": ""}),
|
||||
}
|
||||
)
|
||||
|
||||
for param, (value, kwargs) in params.items():
|
||||
record(param, metric_value=value, report=MetricReport.TEST_PARAM, **kwargs)
|
||||
|
||||
n_txns = 500000
|
||||
|
||||
def setup_wrapper(env: NeonEnv):
|
||||
return setup_tenant_template(env, n_txns)
|
||||
|
||||
env = setup_pageserver_with_tenants(
|
||||
neon_env_builder, f"large_slru_count-{n_tenants}-{n_txns}", n_tenants, setup_wrapper
|
||||
)
|
||||
run_benchmark(env, pg_bin, record, duration)
|
||||
|
||||
|
||||
def setup_tenant_template(env: NeonEnv, n_txns: int):
|
||||
config = {
|
||||
"gc_period": "0s", # disable periodic gc
|
||||
"checkpoint_timeout": "10 years",
|
||||
"compaction_period": "0s", # disable periodic compaction
|
||||
"compaction_threshold": 10,
|
||||
"compaction_target_size": 134217728,
|
||||
"checkpoint_distance": 268435456,
|
||||
"image_creation_threshold": 3,
|
||||
}
|
||||
|
||||
template_tenant, template_timeline = env.neon_cli.create_tenant(set_default=True)
|
||||
env.pageserver.tenant_detach(template_tenant)
|
||||
env.pageserver.allowed_errors.append(
|
||||
# tenant detach causes this because the underlying attach-hook removes the tenant from attachment_service entirely
|
||||
".*Dropped remote consistent LSN updates.*",
|
||||
)
|
||||
env.pageserver.tenant_attach(template_tenant, config)
|
||||
|
||||
ps_http = env.pageserver.http_client()
|
||||
|
||||
with env.endpoints.create_start(
|
||||
"main", tenant_id=template_tenant, config_lines=["shared_buffers=1MB"]
|
||||
) as ep:
|
||||
rels = 10
|
||||
|
||||
asyncio.run(run_updates(ep, n_txns, rels))
|
||||
|
||||
wait_for_last_flush_lsn(env, ep, template_tenant, template_timeline)
|
||||
ps_http.timeline_checkpoint(template_tenant, template_timeline)
|
||||
ps_http.timeline_compact(template_tenant, template_timeline)
|
||||
|
||||
return (template_tenant, template_timeline, config)
|
||||
|
||||
|
||||
# Takes about 5 minutes and produces tenants with around 300 SLRU blocks
|
||||
# of 8 KiB each.
|
||||
async def run_updates(ep: Endpoint, n_txns: int, workers_count: int):
|
||||
workers = []
|
||||
for i in range(workers_count):
|
||||
workers.append(asyncio.create_task(run_update_loop_worker(ep, n_txns, i)))
|
||||
|
||||
await asyncio.gather(*workers)
|
||||
|
||||
|
||||
async def run_update_loop_worker(ep: Endpoint, n_txns: int, idx: int):
|
||||
table = f"t_{idx}"
|
||||
conn = await ep.connect_async()
|
||||
await conn.execute(f"CREATE TABLE {table} (pk integer PRIMARY KEY, x integer)")
|
||||
await conn.execute(f"ALTER TABLE {table} SET (autovacuum_enabled = false)")
|
||||
await conn.execute(f"INSERT INTO {table} VALUES (1, 0)")
|
||||
await conn.execute(
|
||||
"""
|
||||
CREATE PROCEDURE updating{0}() as
|
||||
$$
|
||||
DECLARE
|
||||
i integer;
|
||||
BEGIN
|
||||
FOR i IN 1..{1} LOOP
|
||||
UPDATE {0} SET x = x + 1 WHERE pk=1;
|
||||
COMMIT;
|
||||
END LOOP;
|
||||
END
|
||||
$$ LANGUAGE plpgsql
|
||||
""".format(table, n_txns)
|
||||
)
|
||||
await conn.execute("SET statement_timeout=0")
|
||||
await conn.execute(f"call updating{table}()")
|
||||
|
||||
|
||||
def run_benchmark(env: NeonEnv, pg_bin: PgBin, record, duration_secs: int):
|
||||
ps_http = env.pageserver.http_client()
|
||||
cmd = [
|
||||
str(env.neon_binpath / "pagebench"),
|
||||
"basebackup",
|
||||
"--mgmt-api-endpoint",
|
||||
ps_http.base_url,
|
||||
"--page-service-connstring",
|
||||
env.pageserver.connstr(password=None),
|
||||
"--gzip-probability",
|
||||
"1",
|
||||
"--runtime",
|
||||
f"{duration_secs}s",
|
||||
# don't specify the targets explicitly, let pagebench auto-discover them
|
||||
]
|
||||
|
||||
log.info(f"command: {' '.join(cmd)}")
|
||||
basepath = pg_bin.run_capture(cmd, with_command_header=False)
|
||||
results_path = Path(basepath + ".stdout")
|
||||
log.info(f"Benchmark results at: {results_path}")
|
||||
|
||||
with open(results_path, "r") as f:
|
||||
results = json.load(f)
|
||||
log.info(f"Results:\n{json.dumps(results, sort_keys=True, indent=2)}")
|
||||
|
||||
total = results["total"]
|
||||
metric = "request_count"
|
||||
record(
|
||||
metric,
|
||||
metric_value=total[metric],
|
||||
unit="",
|
||||
report=MetricReport.HIGHER_IS_BETTER,
|
||||
)
|
||||
|
||||
metric = "latency_mean"
|
||||
record(
|
||||
metric,
|
||||
metric_value=humantime_to_ms(total[metric]),
|
||||
unit="ms",
|
||||
report=MetricReport.LOWER_IS_BETTER,
|
||||
)
|
||||
|
||||
metric = "latency_percentiles"
|
||||
for k, v in total[metric].items():
|
||||
record(
|
||||
f"{metric}.{k}",
|
||||
metric_value=humantime_to_ms(v),
|
||||
unit="ms",
|
||||
report=MetricReport.LOWER_IS_BETTER,
|
||||
)
|
||||
@@ -3,6 +3,7 @@ import os
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, Tuple
|
||||
|
||||
import fixtures.pageserver.many_tenants as many_tenants
|
||||
import pytest
|
||||
from fixtures.benchmark_fixture import MetricReport, NeonBenchmarker
|
||||
from fixtures.log_helper import log
|
||||
@@ -14,9 +15,7 @@ from fixtures.neon_fixtures import (
|
||||
)
|
||||
from fixtures.utils import get_scale_for_db, humantime_to_ms
|
||||
|
||||
from performance.pageserver.util import (
|
||||
setup_pageserver_with_tenants,
|
||||
)
|
||||
from performance.pageserver.util import ensure_pageserver_ready_for_benchmarking
|
||||
|
||||
|
||||
# For reference, the space usage of the snapshots:
|
||||
@@ -81,72 +80,10 @@ def test_pageserver_max_throughput_getpage_at_latest_lsn(
|
||||
|
||||
for param, (value, kwargs) in params.items():
|
||||
record(param, metric_value=value, report=MetricReport.TEST_PARAM, **kwargs)
|
||||
|
||||
def setup_wrapper(env: NeonEnv):
|
||||
return setup_tenant_template(env, pg_bin, pgbench_scale)
|
||||
|
||||
env = setup_pageserver_with_tenants(
|
||||
neon_env_builder,
|
||||
f"max_throughput_latest_lsn-{n_tenants}-{pgbench_scale}",
|
||||
n_tenants,
|
||||
setup_wrapper,
|
||||
)
|
||||
env = setup_pageserver_with_pgbench_tenants(neon_env_builder, pg_bin, n_tenants, pgbench_scale)
|
||||
run_benchmark_max_throughput_latest_lsn(env, pg_bin, record, duration)
|
||||
|
||||
|
||||
def setup_tenant_template(env: NeonEnv, pg_bin: PgBin, scale: int):
|
||||
# use a config that makes production of on-disk state timing-insensitive
|
||||
# as we ingest data into the tenant.
|
||||
config = {
|
||||
"gc_period": "0s", # disable periodic gc
|
||||
"checkpoint_timeout": "10 years",
|
||||
"compaction_period": "0s", # disable periodic compaction
|
||||
"compaction_threshold": 10,
|
||||
"compaction_target_size": 134217728,
|
||||
"checkpoint_distance": 268435456,
|
||||
"image_creation_threshold": 3,
|
||||
}
|
||||
template_tenant, template_timeline = env.neon_cli.create_tenant(set_default=True)
|
||||
env.pageserver.tenant_detach(template_tenant)
|
||||
env.pageserver.allowed_errors.append(
|
||||
# tenant detach causes this because the underlying attach-hook removes the tenant from attachment_service entirely
|
||||
".*Dropped remote consistent LSN updates.*",
|
||||
)
|
||||
env.pageserver.tenant_attach(template_tenant, config)
|
||||
ps_http = env.pageserver.http_client()
|
||||
with env.endpoints.create_start("main", tenant_id=template_tenant) as ep:
|
||||
pg_bin.run_capture(["pgbench", "-i", f"-s{scale}", "-I", "dtGvp", ep.connstr()])
|
||||
wait_for_last_flush_lsn(env, ep, template_tenant, template_timeline)
|
||||
ps_http.timeline_checkpoint(template_tenant, template_timeline)
|
||||
ps_http.timeline_compact(template_tenant, template_timeline)
|
||||
for _ in range(
|
||||
0, 17
|
||||
): # some prime number to avoid potential resonances with the "_threshold" variables from the config
|
||||
# the L0s produced by this appear to have size ~5MiB
|
||||
num_txns = 10_000
|
||||
pg_bin.run_capture(
|
||||
["pgbench", "-N", "-c1", "--transactions", f"{num_txns}", ep.connstr()]
|
||||
)
|
||||
wait_for_last_flush_lsn(env, ep, template_tenant, template_timeline)
|
||||
ps_http.timeline_checkpoint(template_tenant, template_timeline)
|
||||
ps_http.timeline_compact(template_tenant, template_timeline)
|
||||
# for reference, the output at scale=6 looked like so (306M total)
|
||||
# ls -sh test_output/shared-snapshots/max_throughput_latest_lsn-2-6/snapshot/pageserver_1/tenants/35c30b88ea16a7a09f82d9c6a115551b/timelines/da902b378eebe83dc8a4e81cd3dc1c59
|
||||
# total 306M
|
||||
# 188M 000000000000000000000000000000000000-030000000000000000000000000000000003__000000000149F060-0000000009E75829
|
||||
# 4.5M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000009E75829-000000000A21E919
|
||||
# 33M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000000A21E919-000000000C20CB71
|
||||
# 36M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000000C20CB71-000000000E470791
|
||||
# 16M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000000E470791-000000000F34AEF1
|
||||
# 8.2M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000000F34AEF1-000000000FABA8A9
|
||||
# 6.0M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000000FABA8A9-000000000FFE0639
|
||||
# 6.1M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000000FFE0639-000000001051D799
|
||||
# 4.7M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000001051D799-0000000010908F19
|
||||
# 4.6M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000010908F19-0000000010CD3021
|
||||
|
||||
return (template_tenant, template_timeline, config)
|
||||
|
||||
|
||||
def run_benchmark_max_throughput_latest_lsn(
|
||||
env: NeonEnv, pg_bin: PgBin, record, duration_secs: int
|
||||
):
|
||||
@@ -201,3 +138,78 @@ def run_benchmark_max_throughput_latest_lsn(
|
||||
unit="ms",
|
||||
report=MetricReport.LOWER_IS_BETTER,
|
||||
)
|
||||
|
||||
|
||||
def setup_pageserver_with_pgbench_tenants(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
pg_bin: PgBin,
|
||||
n_tenants: int,
|
||||
scale: int,
|
||||
) -> NeonEnv:
|
||||
"""
|
||||
Utility function to set up a pageserver with a given number of identical tenants.
|
||||
Each tenant is a pgbench tenant, initialize to a certain scale, and treated afterwards
|
||||
with a repeat application of (pgbench simple-update workload, checkpoint, compact).
|
||||
"""
|
||||
|
||||
def setup_template(env: NeonEnv):
|
||||
# use a config that makes production of on-disk state timing-insensitive
|
||||
# as we ingest data into the tenant.
|
||||
config = {
|
||||
"gc_period": "0s", # disable periodic gc
|
||||
"checkpoint_timeout": "10 years",
|
||||
"compaction_period": "0s", # disable periodic compaction
|
||||
"compaction_threshold": 10,
|
||||
"compaction_target_size": 134217728,
|
||||
"checkpoint_distance": 268435456,
|
||||
"image_creation_threshold": 3,
|
||||
}
|
||||
template_tenant, template_timeline = env.neon_cli.create_tenant(set_default=True)
|
||||
env.pageserver.tenant_detach(template_tenant)
|
||||
env.pageserver.allowed_errors.append(
|
||||
# tenant detach causes this because the underlying attach-hook removes the tenant from attachment_service entirely
|
||||
".*Dropped remote consistent LSN updates.*",
|
||||
)
|
||||
env.pageserver.tenant_attach(template_tenant, config)
|
||||
ps_http = env.pageserver.http_client()
|
||||
with env.endpoints.create_start("main", tenant_id=template_tenant) as ep:
|
||||
pg_bin.run_capture(["pgbench", "-i", f"-s{scale}", "-I", "dtGvp", ep.connstr()])
|
||||
wait_for_last_flush_lsn(env, ep, template_tenant, template_timeline)
|
||||
ps_http.timeline_checkpoint(template_tenant, template_timeline)
|
||||
ps_http.timeline_compact(template_tenant, template_timeline)
|
||||
for _ in range(
|
||||
0, 17
|
||||
): # some prime number to avoid potential resonances with the "_threshold" variables from the config
|
||||
# the L0s produced by this appear to have size ~5MiB
|
||||
num_txns = 10_000
|
||||
pg_bin.run_capture(
|
||||
["pgbench", "-N", "-c1", "--transactions", f"{num_txns}", ep.connstr()]
|
||||
)
|
||||
wait_for_last_flush_lsn(env, ep, template_tenant, template_timeline)
|
||||
ps_http.timeline_checkpoint(template_tenant, template_timeline)
|
||||
ps_http.timeline_compact(template_tenant, template_timeline)
|
||||
# for reference, the output at scale=6 looked like so (306M total)
|
||||
# ls -sh test_output/shared-snapshots/max_throughput_latest_lsn-2-6/snapshot/pageserver_1/tenants/35c30b88ea16a7a09f82d9c6a115551b/timelines/da902b378eebe83dc8a4e81cd3dc1c59
|
||||
# total 306M
|
||||
# 188M 000000000000000000000000000000000000-030000000000000000000000000000000003__000000000149F060-0000000009E75829
|
||||
# 4.5M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000009E75829-000000000A21E919
|
||||
# 33M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000000A21E919-000000000C20CB71
|
||||
# 36M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000000C20CB71-000000000E470791
|
||||
# 16M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000000E470791-000000000F34AEF1
|
||||
# 8.2M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000000F34AEF1-000000000FABA8A9
|
||||
# 6.0M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000000FABA8A9-000000000FFE0639
|
||||
# 6.1M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000000FFE0639-000000001051D799
|
||||
# 4.7M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000001051D799-0000000010908F19
|
||||
# 4.6M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000010908F19-0000000010CD3021
|
||||
|
||||
return (template_tenant, template_timeline, config)
|
||||
|
||||
def doit(neon_env_builder: NeonEnvBuilder) -> NeonEnv:
|
||||
return many_tenants.single_timeline(neon_env_builder, setup_template, n_tenants)
|
||||
|
||||
env = neon_env_builder.build_and_use_snapshot(
|
||||
f"max_throughput_latest_lsn-{n_tenants}-{scale}", doit
|
||||
)
|
||||
env.start()
|
||||
ensure_pageserver_ready_for_benchmarking(env, n_tenants)
|
||||
return env
|
||||
|
||||
@@ -2,16 +2,9 @@
|
||||
Utilities used by all code in this sub-directory
|
||||
"""
|
||||
|
||||
from typing import Any, Callable, Dict, Tuple
|
||||
|
||||
import fixtures.pageserver.many_tenants as many_tenants
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import (
|
||||
NeonEnv,
|
||||
NeonEnvBuilder,
|
||||
)
|
||||
from fixtures.neon_fixtures import NeonEnv
|
||||
from fixtures.pageserver.utils import wait_until_all_tenants_state
|
||||
from fixtures.types import TenantId, TimelineId
|
||||
|
||||
|
||||
def ensure_pageserver_ready_for_benchmarking(env: NeonEnv, n_tenants: int):
|
||||
@@ -34,24 +27,3 @@ def ensure_pageserver_ready_for_benchmarking(env: NeonEnv, n_tenants: int):
|
||||
assert not layer.remote
|
||||
|
||||
log.info("ready")
|
||||
|
||||
|
||||
def setup_pageserver_with_tenants(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
name: str,
|
||||
n_tenants: int,
|
||||
setup: Callable[[NeonEnv], Tuple[TenantId, TimelineId, Dict[str, Any]]],
|
||||
) -> NeonEnv:
|
||||
"""
|
||||
Utility function to set up a pageserver with a given number of identical tenants.
|
||||
Each tenant is a pgbench tenant, initialize to a certain scale, and treated afterwards
|
||||
with a repeat application of (pgbench simple-update workload, checkpoint, compact).
|
||||
"""
|
||||
|
||||
def doit(neon_env_builder: NeonEnvBuilder) -> NeonEnv:
|
||||
return many_tenants.single_timeline(neon_env_builder, setup, n_tenants)
|
||||
|
||||
env = neon_env_builder.build_and_use_snapshot(name, doit)
|
||||
env.start()
|
||||
ensure_pageserver_ready_for_benchmarking(env, n_tenants)
|
||||
return env
|
||||
|
||||
@@ -24,7 +24,7 @@ async def test_proxy_psql_allowed_ips(static_proxy: NeonProxy, vanilla_pg: Vanil
|
||||
with pytest.raises(psycopg2.Error) as exprinfo:
|
||||
static_proxy.safe_psql(**kwargs)
|
||||
text = str(exprinfo.value).strip()
|
||||
assert "This IP address is not allowed to connect" in text
|
||||
assert "not allowed to connect" in text
|
||||
|
||||
# no SNI, deprecated `options=project` syntax (before we had several endpoint in project)
|
||||
check_cannot_connect(query="select 1", sslsni=0, options="project=private-project")
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
import os
|
||||
|
||||
import pytest
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import (
|
||||
NeonEnvBuilder,
|
||||
@@ -286,6 +289,12 @@ def test_sharding_split_smoke(
|
||||
env.attachment_service.consistency_check()
|
||||
|
||||
|
||||
@pytest.mark.skipif(
|
||||
# The quantity of data isn't huge, but debug can be _very_ slow, and the things we're
|
||||
# validating in this test don't benefit much from debug assertions.
|
||||
os.getenv("BUILD_TYPE") == "debug",
|
||||
reason="Avoid running bulkier ingest tests in debug mode",
|
||||
)
|
||||
def test_sharding_ingest(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
):
|
||||
@@ -319,10 +328,10 @@ def test_sharding_ingest(
|
||||
|
||||
workload = Workload(env, tenant_id, timeline_id)
|
||||
workload.init()
|
||||
workload.write_rows(512, upload=False)
|
||||
workload.write_rows(512, upload=False)
|
||||
workload.write_rows(512, upload=False)
|
||||
workload.write_rows(512, upload=False)
|
||||
workload.write_rows(4096, upload=False)
|
||||
workload.write_rows(4096, upload=False)
|
||||
workload.write_rows(4096, upload=False)
|
||||
workload.write_rows(4096, upload=False)
|
||||
workload.validate()
|
||||
|
||||
small_layer_count = 0
|
||||
@@ -361,7 +370,12 @@ def test_sharding_ingest(
|
||||
# - Because we roll layers on checkpoint_distance * shard_count, we expect to obey the target
|
||||
# layer size on average, but it is still possible to write some tiny layers.
|
||||
log.info(f"Totals: {small_layer_count} small layers, {ok_layer_count} ok layers")
|
||||
assert float(small_layer_count) / float(ok_layer_count) < 0.25
|
||||
if small_layer_count <= shard_count:
|
||||
# If each shard has <= 1 small layer
|
||||
pass
|
||||
else:
|
||||
# General case:
|
||||
assert float(small_layer_count) / float(ok_layer_count) < 0.25
|
||||
|
||||
# Each shard may emit up to one huge layer, because initdb ingest doesn't respect checkpoint_distance.
|
||||
assert huge_layer_count <= shard_count
|
||||
|
||||
@@ -1,13 +1,30 @@
|
||||
import time
|
||||
from collections import defaultdict
|
||||
from datetime import datetime, timezone
|
||||
from typing import List
|
||||
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder
|
||||
from fixtures.neon_fixtures import (
|
||||
NeonEnv,
|
||||
NeonEnvBuilder,
|
||||
PgBin,
|
||||
)
|
||||
from fixtures.pageserver.http import PageserverHttpClient
|
||||
from fixtures.pageserver.utils import tenant_delete_wait_completed, timeline_delete_wait_completed
|
||||
from fixtures.pageserver.utils import (
|
||||
MANY_SMALL_LAYERS_TENANT_CONFIG,
|
||||
enable_remote_storage_versioning,
|
||||
list_prefix,
|
||||
remote_storage_delete_key,
|
||||
tenant_delete_wait_completed,
|
||||
timeline_delete_wait_completed,
|
||||
)
|
||||
from fixtures.pg_version import PgVersion
|
||||
from fixtures.remote_storage import RemoteStorageKind, s3_storage
|
||||
from fixtures.types import TenantId, TimelineId
|
||||
from fixtures.utils import wait_until
|
||||
from fixtures.utils import run_pg_bench_small, wait_until
|
||||
from mypy_boto3_s3.type_defs import (
|
||||
ObjectTypeDef,
|
||||
)
|
||||
from pytest_httpserver import HTTPServer
|
||||
from werkzeug.wrappers.request import Request
|
||||
from werkzeug.wrappers.response import Response
|
||||
@@ -457,3 +474,113 @@ def test_sharding_service_debug_apis(neon_env_builder: NeonEnvBuilder):
|
||||
# Check that the 'drop' APIs didn't leave things in a state that would fail a consistency check: they're
|
||||
# meant to be unclean wrt the pageserver state, but not leave a broken storage controller behind.
|
||||
env.attachment_service.consistency_check()
|
||||
|
||||
|
||||
def test_sharding_service_s3_time_travel_recovery(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
pg_bin: PgBin,
|
||||
):
|
||||
"""
|
||||
Test for S3 time travel
|
||||
"""
|
||||
|
||||
remote_storage_kind = s3_storage()
|
||||
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
|
||||
|
||||
# Mock S3 doesn't have versioning enabled by default, enable it
|
||||
# (also do it before there is any writes to the bucket)
|
||||
if remote_storage_kind == RemoteStorageKind.MOCK_S3:
|
||||
remote_storage = neon_env_builder.pageserver_remote_storage
|
||||
assert remote_storage, "remote storage not configured"
|
||||
enable_remote_storage_versioning(remote_storage)
|
||||
|
||||
neon_env_builder.num_pageservers = 1
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
virtual_ps_http = PageserverHttpClient(env.attachment_service_port, lambda: True)
|
||||
|
||||
tenant_id = TenantId.generate()
|
||||
env.attachment_service.tenant_create(
|
||||
tenant_id,
|
||||
shard_count=2,
|
||||
shard_stripe_size=8192,
|
||||
tenant_config=MANY_SMALL_LAYERS_TENANT_CONFIG,
|
||||
)
|
||||
|
||||
# Check that the consistency check passes
|
||||
env.attachment_service.consistency_check()
|
||||
|
||||
branch_name = "main"
|
||||
timeline_id = env.neon_cli.create_timeline(
|
||||
branch_name,
|
||||
tenant_id=tenant_id,
|
||||
)
|
||||
# Write some nontrivial amount of data into the endpoint and wait until it is uploaded
|
||||
with env.endpoints.create_start("main", tenant_id=tenant_id) as endpoint:
|
||||
run_pg_bench_small(pg_bin, endpoint.connstr())
|
||||
endpoint.safe_psql("CREATE TABLE created_foo(id integer);")
|
||||
# last_flush_lsn_upload(env, endpoint, tenant_id, timeline_id)
|
||||
|
||||
# Give the data time to be uploaded
|
||||
time.sleep(4)
|
||||
|
||||
# Detach the tenant
|
||||
virtual_ps_http.tenant_location_conf(
|
||||
tenant_id,
|
||||
{
|
||||
"mode": "Detached",
|
||||
"secondary_conf": None,
|
||||
"tenant_conf": {},
|
||||
"generation": None,
|
||||
},
|
||||
)
|
||||
|
||||
time.sleep(4)
|
||||
ts_before_disaster = datetime.now(tz=timezone.utc).replace(tzinfo=None)
|
||||
time.sleep(4)
|
||||
|
||||
# Simulate a "disaster": delete some random files from remote storage for one of the shards
|
||||
assert env.pageserver_remote_storage
|
||||
shard_id_for_list = "0002"
|
||||
objects: List[ObjectTypeDef] = list_prefix(
|
||||
env.pageserver_remote_storage,
|
||||
f"tenants/{tenant_id}-{shard_id_for_list}/timelines/{timeline_id}/",
|
||||
).get("Contents", [])
|
||||
assert len(objects) > 1
|
||||
log.info(f"Found {len(objects)} objects in remote storage")
|
||||
should_delete = False
|
||||
for obj in objects:
|
||||
obj_key = obj["Key"]
|
||||
should_delete = not should_delete
|
||||
if not should_delete:
|
||||
log.info(f"Keeping key on remote storage: {obj_key}")
|
||||
continue
|
||||
log.info(f"Deleting key from remote storage: {obj_key}")
|
||||
remote_storage_delete_key(env.pageserver_remote_storage, obj_key)
|
||||
pass
|
||||
|
||||
time.sleep(4)
|
||||
ts_after_disaster = datetime.now(tz=timezone.utc).replace(tzinfo=None)
|
||||
time.sleep(4)
|
||||
|
||||
# Do time travel recovery
|
||||
virtual_ps_http.tenant_time_travel_remote_storage(
|
||||
tenant_id, ts_before_disaster, ts_after_disaster, shard_counts=[2]
|
||||
)
|
||||
time.sleep(4)
|
||||
|
||||
# Attach the tenant again
|
||||
virtual_ps_http.tenant_location_conf(
|
||||
tenant_id,
|
||||
{
|
||||
"mode": "AttachedSingle",
|
||||
"secondary_conf": None,
|
||||
"tenant_conf": {},
|
||||
"generation": 100,
|
||||
},
|
||||
)
|
||||
|
||||
with env.endpoints.create_start("main", tenant_id=tenant_id) as endpoint:
|
||||
endpoint.safe_psql("SELECT * FROM created_foo;")
|
||||
|
||||
env.attachment_service.consistency_check()
|
||||
|
||||
@@ -74,7 +74,6 @@ fn analyze_trace<R: std::io::Read>(mut reader: R) {
|
||||
}
|
||||
prev = Some(req);
|
||||
}
|
||||
PagestreamFeMessage::GetVectoredPages(_) => {}
|
||||
PagestreamFeMessage::DbSize(_) => {}
|
||||
};
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user