Compare commits

..

2 Commits

Author SHA1 Message Date
Conrad Ludgate
4c78a5067f compress cache key 2024-06-28 09:12:18 +01:00
Conrad Ludgate
108f08f982 proxy: cache a compressed version of the node info 2024-06-28 09:04:54 +01:00
60 changed files with 573 additions and 1959 deletions

View File

@@ -21,8 +21,10 @@ use pageserver_api::config::{
DEFAULT_HTTP_LISTEN_PORT as DEFAULT_PAGESERVER_HTTP_PORT,
DEFAULT_PG_LISTEN_PORT as DEFAULT_PAGESERVER_PG_PORT,
};
use pageserver_api::controller_api::{PlacementPolicy, TenantCreateRequest};
use pageserver_api::models::{ShardParameters, TimelineCreateRequest, TimelineInfo};
use pageserver_api::controller_api::PlacementPolicy;
use pageserver_api::models::{
ShardParameters, TenantCreateRequest, TimelineCreateRequest, TimelineInfo,
};
use pageserver_api::shard::{ShardCount, ShardStripeSize, TenantShardId};
use postgres_backend::AuthType;
use postgres_connection::parse_host_port;

View File

@@ -17,7 +17,8 @@ use anyhow::{bail, Context};
use camino::Utf8PathBuf;
use futures::SinkExt;
use pageserver_api::models::{
self, AuxFilePolicy, LocationConfig, TenantHistorySize, TenantInfo, TimelineInfo,
self, AuxFilePolicy, LocationConfig, ShardParameters, TenantHistorySize, TenantInfo,
TimelineInfo,
};
use pageserver_api::shard::TenantShardId;
use pageserver_client::mgmt_api;
@@ -396,6 +397,28 @@ impl PageServerNode {
}
}
pub async fn tenant_create(
&self,
new_tenant_id: TenantId,
generation: Option<u32>,
settings: HashMap<&str, &str>,
) -> anyhow::Result<TenantId> {
let config = Self::parse_config(settings.clone())?;
let request = models::TenantCreateRequest {
new_tenant_id: TenantShardId::unsharded(new_tenant_id),
generation,
config,
shard_parameters: ShardParameters::default(),
// Placement policy is not meaningful for creations not done via storage controller
placement_policy: None,
};
if !settings.is_empty() {
bail!("Unrecognized tenant settings: {settings:?}")
}
Ok(self.http_client.tenant_create(&request).await?)
}
pub async fn tenant_config(
&self,
tenant_id: TenantId,

View File

@@ -5,11 +5,12 @@ use crate::{
use camino::{Utf8Path, Utf8PathBuf};
use pageserver_api::{
controller_api::{
NodeConfigureRequest, NodeRegisterRequest, TenantCreateRequest, TenantCreateResponse,
TenantLocateResponse, TenantShardMigrateRequest, TenantShardMigrateResponse,
NodeConfigureRequest, NodeRegisterRequest, TenantCreateResponse, TenantLocateResponse,
TenantShardMigrateRequest, TenantShardMigrateResponse,
},
models::{
TenantShardSplitRequest, TenantShardSplitResponse, TimelineCreateRequest, TimelineInfo,
TenantCreateRequest, TenantShardSplitRequest, TenantShardSplitResponse,
TimelineCreateRequest, TimelineInfo,
},
shard::{ShardStripeSize, TenantShardId},
};

View File

@@ -4,13 +4,13 @@ use std::{str::FromStr, time::Duration};
use clap::{Parser, Subcommand};
use pageserver_api::{
controller_api::{
NodeAvailabilityWrapper, NodeDescribeResponse, ShardSchedulingPolicy, TenantCreateRequest,
NodeAvailabilityWrapper, NodeDescribeResponse, ShardSchedulingPolicy,
TenantDescribeResponse, TenantPolicyRequest,
},
models::{
EvictionPolicy, EvictionPolicyLayerAccessThreshold, LocationConfigSecondary,
ShardParameters, TenantConfig, TenantConfigRequest, TenantShardSplitRequest,
TenantShardSplitResponse,
ShardParameters, TenantConfig, TenantConfigRequest, TenantCreateRequest,
TenantShardSplitRequest, TenantShardSplitResponse,
},
shard::{ShardStripeSize, TenantShardId},
};
@@ -336,18 +336,14 @@ async fn main() -> anyhow::Result<()> {
.await?;
}
Command::TenantCreate { tenant_id } => {
storcon_client
.dispatch(
Method::POST,
"v1/tenant".to_string(),
Some(TenantCreateRequest {
new_tenant_id: TenantShardId::unsharded(tenant_id),
generation: None,
shard_parameters: ShardParameters::default(),
placement_policy: Some(PlacementPolicy::Attached(1)),
config: TenantConfig::default(),
}),
)
vps_client
.tenant_create(&TenantCreateRequest {
new_tenant_id: TenantShardId::unsharded(tenant_id),
generation: None,
shard_parameters: ShardParameters::default(),
placement_policy: Some(PlacementPolicy::Attached(1)),
config: TenantConfig::default(),
})
.await?;
}
Command::TenantDelete { tenant_id } => {

View File

@@ -11,27 +11,6 @@ use crate::{
shard::{ShardStripeSize, TenantShardId},
};
#[derive(Serialize, Deserialize, Debug)]
#[serde(deny_unknown_fields)]
pub struct TenantCreateRequest {
pub new_tenant_id: TenantShardId,
#[serde(default)]
#[serde(skip_serializing_if = "Option::is_none")]
pub generation: Option<u32>,
// If omitted, create a single shard with TenantShardId::unsharded()
#[serde(default)]
#[serde(skip_serializing_if = "ShardParameters::is_unsharded")]
pub shard_parameters: ShardParameters,
#[serde(default)]
#[serde(skip_serializing_if = "Option::is_none")]
pub placement_policy: Option<PlacementPolicy>,
#[serde(flatten)]
pub config: TenantConfig, // as we have a flattened field, we should reject all unknown fields in it
}
#[derive(Serialize, Deserialize)]
pub struct TenantCreateResponseShard {
pub shard_id: TenantShardId,
@@ -301,19 +280,4 @@ mod test {
assert_eq!(serde_json::from_str::<PlacementPolicy>(&encoded)?, v);
Ok(())
}
#[test]
fn test_reject_unknown_field() {
let id = TenantId::generate();
let create_request = serde_json::json!({
"new_tenant_id": id.to_string(),
"unknown_field": "unknown_value".to_string(),
});
let err = serde_json::from_value::<TenantCreateRequest>(create_request).unwrap_err();
assert!(
err.to_string().contains("unknown field `unknown_field`"),
"expect unknown field `unknown_field` error, got: {}",
err
);
}
}

View File

@@ -25,6 +25,7 @@ use utils::{
serde_system_time,
};
use crate::controller_api::PlacementPolicy;
use crate::{
reltag::RelTag,
shard::{ShardCount, ShardStripeSize, TenantShardId},
@@ -270,6 +271,28 @@ impl Default for ShardParameters {
}
}
#[derive(Serialize, Deserialize, Debug)]
#[serde(deny_unknown_fields)]
pub struct TenantCreateRequest {
pub new_tenant_id: TenantShardId,
#[serde(default)]
#[serde(skip_serializing_if = "Option::is_none")]
pub generation: Option<u32>,
// If omitted, create a single shard with TenantShardId::unsharded()
#[serde(default)]
#[serde(skip_serializing_if = "ShardParameters::is_unsharded")]
pub shard_parameters: ShardParameters,
// This parameter is only meaningful in requests sent to the storage controller
#[serde(default)]
#[serde(skip_serializing_if = "Option::is_none")]
pub placement_policy: Option<PlacementPolicy>,
#[serde(flatten)]
pub config: TenantConfig, // as we have a flattened field, we should reject all unknown fields in it
}
/// An alternative representation of `pageserver::tenant::TenantConf` with
/// simpler types.
#[derive(Serialize, Deserialize, Debug, Default, Clone, Eq, PartialEq)]
@@ -524,6 +547,10 @@ pub struct LocationConfigListResponse {
pub tenant_shards: Vec<(TenantShardId, Option<LocationConfig>)>,
}
#[derive(Serialize, Deserialize)]
#[serde(transparent)]
pub struct TenantCreateResponse(pub TenantId);
#[derive(Serialize)]
pub struct StatusResponse {
pub id: NodeId,
@@ -1480,6 +1507,18 @@ mod tests {
#[test]
fn test_reject_unknown_field() {
let id = TenantId::generate();
let create_request = json!({
"new_tenant_id": id.to_string(),
"unknown_field": "unknown_value".to_string(),
});
let err = serde_json::from_value::<TenantCreateRequest>(create_request).unwrap_err();
assert!(
err.to_string().contains("unknown field `unknown_field`"),
"expect unknown field `unknown_field` error, got: {}",
err
);
let id = TenantId::generate();
let config_request = json!({
"tenant_id": id.to_string(),

View File

@@ -356,28 +356,6 @@ impl CheckPoint {
}
false
}
/// Advance next multi-XID/offset to those given in arguments.
///
/// It's important that this handles wraparound correctly. This should match the
/// MultiXactAdvanceNextMXact() logic in PostgreSQL's xlog_redo() function.
///
/// Returns 'true' if the Checkpoint was updated.
pub fn update_next_multixid(&mut self, multi_xid: u32, multi_offset: u32) -> bool {
let mut modified = false;
if multi_xid.wrapping_sub(self.nextMulti) as i32 > 0 {
self.nextMulti = multi_xid;
modified = true;
}
if multi_offset.wrapping_sub(self.nextMultiOffset) as i32 > 0 {
self.nextMultiOffset = multi_offset;
modified = true;
}
modified
}
}
/// Generate new, empty WAL segment, with correct block headers at the first

View File

@@ -202,53 +202,6 @@ pub fn test_update_next_xid() {
assert_eq!(checkpoint.nextXid.value, 2048);
}
#[test]
pub fn test_update_next_multixid() {
let checkpoint_buf = [0u8; std::mem::size_of::<CheckPoint>()];
let mut checkpoint = CheckPoint::decode(&checkpoint_buf).unwrap();
// simple case
checkpoint.nextMulti = 20;
checkpoint.nextMultiOffset = 20;
checkpoint.update_next_multixid(1000, 2000);
assert_eq!(checkpoint.nextMulti, 1000);
assert_eq!(checkpoint.nextMultiOffset, 2000);
// No change
checkpoint.update_next_multixid(500, 900);
assert_eq!(checkpoint.nextMulti, 1000);
assert_eq!(checkpoint.nextMultiOffset, 2000);
// Close to wraparound, but not wrapped around yet
checkpoint.nextMulti = 0xffff0000;
checkpoint.nextMultiOffset = 0xfffe0000;
checkpoint.update_next_multixid(0xffff00ff, 0xfffe00ff);
assert_eq!(checkpoint.nextMulti, 0xffff00ff);
assert_eq!(checkpoint.nextMultiOffset, 0xfffe00ff);
// Wraparound
checkpoint.update_next_multixid(1, 900);
assert_eq!(checkpoint.nextMulti, 1);
assert_eq!(checkpoint.nextMultiOffset, 900);
// Wraparound nextMulti to 0.
//
// It's a bit surprising that nextMulti can be 0, because that's a special value
// (InvalidMultiXactId). However, that's how Postgres does it at multi-xid wraparound:
// nextMulti wraps around to 0, but then when the next multi-xid is assigned, it skips
// the 0 and the next multi-xid actually assigned is 1.
checkpoint.nextMulti = 0xffff0000;
checkpoint.nextMultiOffset = 0xfffe0000;
checkpoint.update_next_multixid(0, 0xfffe00ff);
assert_eq!(checkpoint.nextMulti, 0);
assert_eq!(checkpoint.nextMultiOffset, 0xfffe00ff);
// Wraparound nextMultiOffset to 0
checkpoint.update_next_multixid(0, 0);
assert_eq!(checkpoint.nextMulti, 0);
assert_eq!(checkpoint.nextMultiOffset, 0);
}
#[test]
pub fn test_encode_logical_message() {
let expected = [

View File

@@ -205,6 +205,15 @@ impl Client {
Ok(())
}
pub async fn tenant_create(&self, req: &TenantCreateRequest) -> Result<TenantId> {
let uri = format!("{}/v1/tenant", self.mgmt_api_endpoint);
self.request(Method::POST, &uri, req)
.await?
.json()
.await
.map_err(Error::ReceiveBody)
}
/// The tenant deletion API can return 202 if deletion is incomplete, or
/// 404 if it is complete. Callers are responsible for checking the status
/// code and retrying. Error codes other than 404 will return Err().

View File

@@ -53,6 +53,7 @@ use utils::http::request::{get_request_param, must_get_query_param, parse_query_
use crate::context::{DownloadBehavior, RequestContext};
use crate::deletion_queue::DeletionQueueClient;
use crate::metrics::{StorageTimeOperation, STORAGE_TIME_GLOBAL};
use crate::pgdatadir_mapping::LsnForTimestamp;
use crate::task_mgr::TaskKind;
use crate::tenant::config::{LocationConf, TenantConfOpt};
@@ -74,12 +75,13 @@ use crate::tenant::timeline::CompactFlags;
use crate::tenant::timeline::CompactionError;
use crate::tenant::timeline::Timeline;
use crate::tenant::GetTimelineError;
use crate::tenant::SpawnMode;
use crate::tenant::{LogicalSizeCalculationCause, PageReconstructError};
use crate::{config::PageServerConf, tenant::mgr};
use crate::{disk_usage_eviction_task, tenant};
use pageserver_api::models::{
StatusResponse, TenantConfigRequest, TenantInfo, TimelineCreateRequest, TimelineGcRequest,
TimelineInfo,
StatusResponse, TenantConfigRequest, TenantCreateRequest, TenantCreateResponse, TenantInfo,
TimelineCreateRequest, TimelineGcRequest, TimelineInfo,
};
use utils::{
auth::SwappableJwtAuth,
@@ -1235,6 +1237,75 @@ pub fn html_response(status: StatusCode, data: String) -> Result<Response<Body>,
Ok(response)
}
/// Helper for requests that may take a generation, which is mandatory
/// when control_plane_api is set, but otherwise defaults to Generation::none()
fn get_request_generation(state: &State, req_gen: Option<u32>) -> Result<Generation, ApiError> {
if state.conf.control_plane_api.is_some() {
req_gen
.map(Generation::new)
.ok_or(ApiError::BadRequest(anyhow!(
"generation attribute missing"
)))
} else {
// Legacy mode: all tenants operate with no generation
Ok(Generation::none())
}
}
async fn tenant_create_handler(
mut request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
let request_data: TenantCreateRequest = json_request(&mut request).await?;
let target_tenant_id = request_data.new_tenant_id;
check_permission(&request, None)?;
let _timer = STORAGE_TIME_GLOBAL
.get_metric_with_label_values(&[StorageTimeOperation::CreateTenant.into()])
.expect("bug")
.start_timer();
let tenant_conf =
TenantConfOpt::try_from(&request_data.config).map_err(ApiError::BadRequest)?;
let state = get_state(&request);
let generation = get_request_generation(state, request_data.generation)?;
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
let location_conf =
LocationConf::attached_single(tenant_conf, generation, &request_data.shard_parameters);
let new_tenant = state
.tenant_manager
.upsert_location(
target_tenant_id,
location_conf,
None,
SpawnMode::Create,
&ctx,
)
.await?;
let Some(new_tenant) = new_tenant else {
// This should never happen: indicates a bug in upsert_location
return Err(ApiError::InternalServerError(anyhow::anyhow!(
"Upsert succeeded but didn't return tenant!"
)));
};
// We created the tenant. Existing API semantics are that the tenant
// is Active when this function returns.
new_tenant
.wait_to_become_active(ACTIVE_TENANT_TIMEOUT)
.await?;
json_response(
StatusCode::CREATED,
TenantCreateResponse(new_tenant.tenant_shard_id().tenant_id),
)
}
async fn get_tenant_config_handler(
request: Request<Body>,
_cancel: CancellationToken,
@@ -2540,6 +2611,7 @@ pub fn make_router(
api_handler(r, reload_auth_validation_keys_handler)
})
.get("/v1/tenant", |r| api_handler(r, tenant_list_handler))
.post("/v1/tenant", |r| api_handler(r, tenant_create_handler))
.get("/v1/tenant/:tenant_shard_id", |r| {
api_handler(r, tenant_status)
})

View File

@@ -53,6 +53,9 @@ pub(crate) enum StorageTimeOperation {
#[strum(serialize = "find gc cutoffs")]
FindGcCutoffs,
#[strum(serialize = "create tenant")]
CreateTenant,
}
pub(crate) static STORAGE_TIME_SUM_PER_TIMELINE: Lazy<CounterVec> = Lazy::new(|| {

View File

@@ -213,6 +213,8 @@ pub(crate) enum SpawnMode {
Eager,
/// Lazy activation in the background, with the option to skip the queue if the need comes up
Lazy,
/// Tenant has been created during the lifetime of this process
Create,
}
///
@@ -806,6 +808,9 @@ impl Tenant {
};
let preload = match &mode {
SpawnMode::Create => {
None
},
SpawnMode::Eager | SpawnMode::Lazy => {
let _preload_timer = TENANT.preload.start_timer();
let res = tenant_clone
@@ -827,8 +832,11 @@ impl Tenant {
// We will time the duration of the attach phase unless this is a creation (attach will do no work)
let attached = {
let _attach_timer = Some(TENANT.attach.start_timer());
tenant_clone.attach(preload, &ctx).await
let _attach_timer = match mode {
SpawnMode::Create => None,
SpawnMode::Eager | SpawnMode::Lazy => Some(TENANT.attach.start_timer()),
};
tenant_clone.attach(preload, mode, &ctx).await
};
match attached {
@@ -904,14 +912,21 @@ impl Tenant {
async fn attach(
self: &Arc<Tenant>,
preload: Option<TenantPreload>,
mode: SpawnMode,
ctx: &RequestContext,
) -> anyhow::Result<()> {
span::debug_assert_current_span_has_tenant_id();
failpoint_support::sleep_millis_async!("before-attaching-tenant");
let Some(preload) = preload else {
anyhow::bail!("local-only deployment is no longer supported, https://github.com/neondatabase/neon/issues/5624");
let preload = match (preload, mode) {
(Some(p), _) => p,
(None, SpawnMode::Create) => TenantPreload {
timelines: HashMap::new(),
},
(None, _) => {
anyhow::bail!("local-only deployment is no longer supported, https://github.com/neondatabase/neon/issues/5624");
}
};
let mut timelines_to_resume_deletions = vec![];
@@ -3826,7 +3841,7 @@ pub(crate) mod harness {
let preload = tenant
.preload(&self.remote_storage, CancellationToken::new())
.await?;
tenant.attach(Some(preload), ctx).await?;
tenant.attach(Some(preload), SpawnMode::Eager, ctx).await?;
tenant.state.send_replace(TenantState::Active);
for timeline in tenant.timelines.lock().unwrap().values() {
@@ -6264,7 +6279,7 @@ mod tests {
#[tokio::test]
async fn test_vectored_missing_metadata_key_reads() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_vectored_missing_metadata_key_reads")?;
let harness = TenantHarness::create("test_vectored_missing_data_key_reads")?;
let (tenant, ctx) = harness.load().await;
let base_key = Key::from_hex("620000000033333333444444445500000000").unwrap();

View File

@@ -262,7 +262,6 @@ impl scheduler::RunningJob for RunningDownload {
struct CompleteDownload {
secondary_state: Arc<SecondaryTenant>,
completed_at: Instant,
result: Result<(), UpdateError>,
}
impl scheduler::Completion for CompleteDownload {
@@ -287,33 +286,21 @@ impl JobGenerator<PendingDownload, RunningDownload, CompleteDownload, DownloadCo
let CompleteDownload {
secondary_state,
completed_at: _completed_at,
result,
} = completion;
tracing::debug!("Secondary tenant download completed");
let mut detail = secondary_state.detail.lock().unwrap();
match result {
Err(UpdateError::Restart) => {
// Start downloading again as soon as we can. This will involve waiting for the scheduler's
// scheduling interval. This slightly reduces the peak download speed of tenants that hit their
// deadline and keep restarting, but that also helps give other tenants a chance to execute rather
// that letting one big tenant dominate for a long time.
detail.next_download = Some(Instant::now());
}
_ => {
let period = detail
.last_download
.as_ref()
.map(|d| d.upload_period)
.unwrap_or(DEFAULT_DOWNLOAD_INTERVAL);
let period = detail
.last_download
.as_ref()
.map(|d| d.upload_period)
.unwrap_or(DEFAULT_DOWNLOAD_INTERVAL);
// We advance next_download irrespective of errors: we don't want error cases to result in
// expensive busy-polling.
detail.next_download = Some(Instant::now() + period_jitter(period, 5));
}
}
// We advance next_download irrespective of errors: we don't want error cases to result in
// expensive busy-polling.
detail.next_download = Some(Instant::now() + period_jitter(period, 5));
}
async fn schedule(&mut self) -> SchedulingResult<PendingDownload> {
@@ -409,10 +396,9 @@ impl JobGenerator<PendingDownload, RunningDownload, CompleteDownload, DownloadCo
(RunningDownload { barrier }, Box::pin(async move {
let _completion = completion;
let result = TenantDownloader::new(conf, &remote_storage, &secondary_state)
match TenantDownloader::new(conf, &remote_storage, &secondary_state)
.download(&download_ctx)
.await;
match &result
.await
{
Err(UpdateError::NoData) => {
tracing::info!("No heatmap found for tenant. This is fine if it is new.");
@@ -429,9 +415,6 @@ impl JobGenerator<PendingDownload, RunningDownload, CompleteDownload, DownloadCo
Err(e @ (UpdateError::DownloadError(_) | UpdateError::Other(_))) => {
tracing::error!("Error while downloading tenant: {e}");
},
Err(UpdateError::Restart) => {
tracing::info!("Download reached deadline & will restart to update heatmap")
}
Ok(()) => {}
};
@@ -453,7 +436,6 @@ impl JobGenerator<PendingDownload, RunningDownload, CompleteDownload, DownloadCo
CompleteDownload {
secondary_state,
completed_at: Instant::now(),
result
}
}.instrument(info_span!(parent: None, "secondary_download", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))))
}
@@ -470,11 +452,6 @@ struct TenantDownloader<'a> {
/// Errors that may be encountered while updating a tenant
#[derive(thiserror::Error, Debug)]
enum UpdateError {
/// This is not a true failure, but it's how a download indicates that it would like to be restarted by
/// the scheduler, to pick up the latest heatmap
#[error("Reached deadline, restarting downloads")]
Restart,
#[error("No remote data found")]
NoData,
#[error("Insufficient local storage space")]
@@ -626,26 +603,6 @@ impl<'a> TenantDownloader<'a> {
self.prepare_timelines(&heatmap, heatmap_mtime).await?;
}
// Calculate a deadline for downloads: if downloading takes longer than this, it is useful to drop out and start again,
// so that we are always using reasonably a fresh heatmap. Otherwise, if we had really huge content to download, we might
// spend 10s of minutes downloading layers we don't need.
// (see https://github.com/neondatabase/neon/issues/8182)
let deadline = {
let period = self
.secondary_state
.detail
.lock()
.unwrap()
.last_download
.as_ref()
.map(|d| d.upload_period)
.unwrap_or(DEFAULT_DOWNLOAD_INTERVAL);
// Use double the period: we are not promising to complete within the period, this is just a heuristic
// to keep using a "reasonably fresh" heatmap.
Instant::now() + period * 2
};
// Download the layers in the heatmap
for timeline in heatmap.timelines {
let timeline_state = timeline_states
@@ -661,7 +618,7 @@ impl<'a> TenantDownloader<'a> {
}
let timeline_id = timeline.timeline_id;
self.download_timeline(timeline, timeline_state, deadline, ctx)
self.download_timeline(timeline, timeline_state, ctx)
.instrument(tracing::info_span!(
"secondary_download_timeline",
tenant_id=%tenant_shard_id.tenant_id,
@@ -870,28 +827,26 @@ impl<'a> TenantDownloader<'a> {
.and_then(|x| x)
}
/// Download heatmap layers that are not present on local disk, or update their
/// access time if they are already present.
async fn download_timeline_layers(
async fn download_timeline(
&self,
tenant_shard_id: &TenantShardId,
timeline: HeatMapTimeline,
timeline_state: SecondaryDetailTimeline,
deadline: Instant,
ctx: &RequestContext,
) -> (Result<(), UpdateError>, Vec<HeatMapLayer>) {
) -> Result<(), UpdateError> {
debug_assert_current_span_has_tenant_and_timeline_id();
let tenant_shard_id = self.secondary_state.get_tenant_shard_id();
// Accumulate updates to the state
let mut touched = Vec::new();
tracing::debug!(timeline_id=%timeline.timeline_id, "Downloading layers, {} in heatmap", timeline.layers.len());
// Download heatmap layers that are not present on local disk, or update their
// access time if they are already present.
for layer in timeline.layers {
if self.secondary_state.cancel.is_cancelled() {
tracing::debug!("Cancelled -- dropping out of layer loop");
return (Err(UpdateError::Cancelled), touched);
}
if Instant::now() > deadline {
// We've been running downloads for a while, restart to download latest heatmap.
return (Err(UpdateError::Restart), touched);
return Err(UpdateError::Cancelled);
}
// Existing on-disk layers: just update their access time.
@@ -961,43 +916,20 @@ impl<'a> TenantDownloader<'a> {
match self
.download_layer(tenant_shard_id, &timeline.timeline_id, layer, ctx)
.await
.await?
{
Ok(Some(layer)) => touched.push(layer),
Ok(None) => {
Some(layer) => touched.push(layer),
None => {
// Not an error but we didn't download it: remote layer is missing. Don't add it to the list of
// things to consider touched.
}
Err(e) => {
return (Err(e), touched);
}
}
}
(Ok(()), touched)
}
async fn download_timeline(
&self,
timeline: HeatMapTimeline,
timeline_state: SecondaryDetailTimeline,
deadline: Instant,
ctx: &RequestContext,
) -> Result<(), UpdateError> {
debug_assert_current_span_has_tenant_and_timeline_id();
let tenant_shard_id = self.secondary_state.get_tenant_shard_id();
let timeline_id = timeline.timeline_id;
tracing::debug!(timeline_id=%timeline_id, "Downloading layers, {} in heatmap", timeline.layers.len());
let (result, touched) = self
.download_timeline_layers(tenant_shard_id, timeline, timeline_state, deadline, ctx)
.await;
// Write updates to state to record layers we just downloaded or touched, irrespective of whether the overall result was successful
// Write updates to state to record layers we just downloaded or touched.
{
let mut detail = self.secondary_state.detail.lock().unwrap();
let timeline_detail = detail.timelines.entry(timeline_id).or_default();
let timeline_detail = detail.timelines.entry(timeline.timeline_id).or_default();
tracing::info!("Wrote timeline_detail for {} touched layers", touched.len());
@@ -1011,14 +943,14 @@ impl<'a> TenantDownloader<'a> {
let local_path = local_layer_path(
self.conf,
tenant_shard_id,
&timeline_id,
&timeline.timeline_id,
&t.name,
&t.metadata.generation,
);
e.insert(OnDiskState::new(
self.conf,
tenant_shard_id,
&timeline_id,
&timeline.timeline_id,
t.name,
t.metadata.clone(),
t.access_time,
@@ -1029,7 +961,7 @@ impl<'a> TenantDownloader<'a> {
}
}
result
Ok(())
}
/// Call this during timeline download if a layer will _not_ be downloaded, to update progress statistics

View File

@@ -20,7 +20,6 @@ use std::num::NonZeroUsize;
use bytes::BytesMut;
use pageserver_api::key::Key;
use tokio_epoll_uring::BoundedBuf;
use utils::lsn::Lsn;
use utils::vec_map::VecMap;
@@ -317,9 +316,8 @@ impl<'a> VectoredBlobReader<'a> {
);
let buf = self
.file
.read_exact_at(buf.slice(0..read.size()), read.start, ctx)
.await?
.into_inner();
.read_exact_at_n(buf, read.start, read.size(), ctx)
.await?;
let blobs_at = read.blobs_at.as_slice();
let start_offset = blobs_at.first().expect("VectoredRead is never empty").0;

View File

@@ -13,7 +13,7 @@
use crate::context::RequestContext;
use crate::metrics::{StorageIoOperation, STORAGE_IO_SIZE, STORAGE_IO_TIME_METRIC};
use crate::page_cache::{PageWriteGuard, PAGE_SZ};
use crate::page_cache::PageWriteGuard;
use crate::tenant::TENANTS_SEGMENT_NAME;
use camino::{Utf8Path, Utf8PathBuf};
use once_cell::sync::OnceCell;
@@ -48,7 +48,6 @@ pub(crate) mod owned_buffers_io {
//! but for the time being we're proving out the primitives in the neon.git repo
//! for faster iteration.
pub(crate) mod slice;
pub(crate) mod write;
pub(crate) mod util {
pub(crate) mod size_tracking_writer;
@@ -144,17 +143,16 @@ struct SlotInner {
/// Impl of [`tokio_epoll_uring::IoBuf`] and [`tokio_epoll_uring::IoBufMut`] for [`PageWriteGuard`].
struct PageWriteGuardBuf {
page: PageWriteGuard<'static>,
init_up_to: usize,
}
// Safety: the [`PageWriteGuard`] gives us exclusive ownership of the page cache slot,
// and the location remains stable even if [`Self`] or the [`PageWriteGuard`] is moved.
// Page cache pages are zero-initialized, so, wrt uninitialized memory we're good.
// (Page cache tracks separately whether the contents are valid, see `PageWriteGuard::mark_valid`.)
unsafe impl tokio_epoll_uring::IoBuf for PageWriteGuardBuf {
fn stable_ptr(&self) -> *const u8 {
self.page.as_ptr()
}
fn bytes_init(&self) -> usize {
self.page.len()
self.init_up_to
}
fn bytes_total(&self) -> usize {
self.page.len()
@@ -168,8 +166,8 @@ unsafe impl tokio_epoll_uring::IoBufMut for PageWriteGuardBuf {
}
unsafe fn set_init(&mut self, pos: usize) {
// There shouldn't really be any reason to call this API since bytes_init() == bytes_total().
assert!(pos <= self.page.len());
self.init_up_to = pos;
}
}
@@ -587,37 +585,37 @@ impl VirtualFile {
Ok(self.pos)
}
/// Read the file contents in range `offset..(offset + slice.bytes_total())` into `slice[0..slice.bytes_total()]`.
///
/// The returned `Slice<Buf>` is equivalent to the input `slice`, i.e., it's the same view into the same buffer.
pub async fn read_exact_at<Buf>(
pub async fn read_exact_at<B>(
&self,
slice: Slice<Buf>,
buf: B,
offset: u64,
ctx: &RequestContext,
) -> Result<Slice<Buf>, Error>
) -> Result<B, Error>
where
Buf: IoBufMut + Send,
B: IoBufMut + Send,
{
let assert_we_return_original_bounds = if cfg!(debug_assertions) {
Some((slice.stable_ptr() as usize, slice.bytes_total()))
} else {
None
};
let (buf, res) = read_exact_at_impl(buf, offset, None, |buf, offset| {
self.read_at(buf, offset, ctx)
})
.await;
res.map(|()| buf)
}
let original_bounds = slice.bounds();
let (buf, res) =
read_exact_at_impl(slice, offset, |buf, offset| self.read_at(buf, offset, ctx)).await;
let res = res.map(|_| buf.slice(original_bounds));
if let Some(original_bounds) = assert_we_return_original_bounds {
if let Ok(slice) = &res {
let returned_bounds = (slice.stable_ptr() as usize, slice.bytes_total());
assert_eq!(original_bounds, returned_bounds);
}
}
res
pub async fn read_exact_at_n<B>(
&self,
buf: B,
offset: u64,
count: usize,
ctx: &RequestContext,
) -> Result<B, Error>
where
B: IoBufMut + Send,
{
let (buf, res) = read_exact_at_impl(buf, offset, Some(count), |buf, offset| {
self.read_at(buf, offset, ctx)
})
.await;
res.map(|()| buf)
}
/// Like [`Self::read_exact_at`] but for [`PageWriteGuard`].
@@ -627,11 +625,13 @@ impl VirtualFile {
offset: u64,
ctx: &RequestContext,
) -> Result<PageWriteGuard<'static>, Error> {
let buf = PageWriteGuardBuf { page }.slice_full();
debug_assert_eq!(buf.bytes_total(), PAGE_SZ);
self.read_exact_at(buf, offset, ctx)
.await
.map(|slice| slice.into_inner().page)
let buf = PageWriteGuardBuf {
page,
init_up_to: 0,
};
let res = self.read_exact_at(buf, offset, ctx).await;
res.map(|PageWriteGuardBuf { page, .. }| page)
.map_err(|e| Error::new(ErrorKind::Other, e))
}
// Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#219-235
@@ -722,14 +722,14 @@ impl VirtualFile {
(buf, Ok(n))
}
pub(crate) async fn read_at<Buf>(
pub(crate) async fn read_at<B>(
&self,
buf: tokio_epoll_uring::Slice<Buf>,
buf: B,
offset: u64,
_ctx: &RequestContext, /* TODO: use for metrics: https://github.com/neondatabase/neon/issues/6107 */
) -> (tokio_epoll_uring::Slice<Buf>, Result<usize, Error>)
) -> (B, Result<usize, Error>)
where
Buf: tokio_epoll_uring::IoBufMut + Send,
B: tokio_epoll_uring::BoundedBufMut + Send,
{
let file_guard = match self.lock_file().await {
Ok(file_guard) => file_guard,
@@ -781,16 +781,26 @@ impl VirtualFile {
}
// Adapted from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#117-135
pub async fn read_exact_at_impl<Buf, F, Fut>(
mut buf: tokio_epoll_uring::Slice<Buf>,
pub async fn read_exact_at_impl<B, F, Fut>(
buf: B,
mut offset: u64,
count: Option<usize>,
mut read_at: F,
) -> (Buf, std::io::Result<()>)
) -> (B, std::io::Result<()>)
where
Buf: IoBufMut + Send,
F: FnMut(tokio_epoll_uring::Slice<Buf>, u64) -> Fut,
Fut: std::future::Future<Output = (tokio_epoll_uring::Slice<Buf>, std::io::Result<usize>)>,
B: IoBufMut + Send,
F: FnMut(tokio_epoll_uring::Slice<B>, u64) -> Fut,
Fut: std::future::Future<Output = (tokio_epoll_uring::Slice<B>, std::io::Result<usize>)>,
{
let mut buf: tokio_epoll_uring::Slice<B> = match count {
Some(count) => {
assert!(count <= buf.bytes_total());
assert!(count > 0);
buf.slice(..count) // may include uninitialized memory
}
None => buf.slice_full(), // includes all the uninitialized memory
};
while buf.bytes_total() != 0 {
let res;
(buf, res) = read_at(buf, offset).await;
@@ -872,7 +882,7 @@ mod test_read_exact_at_impl {
#[tokio::test]
async fn test_basic() {
let buf = Vec::with_capacity(5).slice_full();
let buf = Vec::with_capacity(5);
let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt {
expectations: VecDeque::from(vec![Expectation {
offset: 0,
@@ -880,7 +890,7 @@ mod test_read_exact_at_impl {
result: Ok(vec![b'a', b'b', b'c', b'd', b'e']),
}]),
}));
let (buf, res) = read_exact_at_impl(buf, 0, |buf, offset| {
let (buf, res) = read_exact_at_impl(buf, 0, None, |buf, offset| {
let mock_read_at = Arc::clone(&mock_read_at);
async move { mock_read_at.lock().await.read_at(buf, offset).await }
})
@@ -889,13 +899,33 @@ mod test_read_exact_at_impl {
assert_eq!(buf, vec![b'a', b'b', b'c', b'd', b'e']);
}
#[tokio::test]
async fn test_with_count() {
let buf = Vec::with_capacity(5);
let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt {
expectations: VecDeque::from(vec![Expectation {
offset: 0,
bytes_total: 3,
result: Ok(vec![b'a', b'b', b'c']),
}]),
}));
let (buf, res) = read_exact_at_impl(buf, 0, Some(3), |buf, offset| {
let mock_read_at = Arc::clone(&mock_read_at);
async move { mock_read_at.lock().await.read_at(buf, offset).await }
})
.await;
assert!(res.is_ok());
assert_eq!(buf, vec![b'a', b'b', b'c']);
}
#[tokio::test]
async fn test_empty_buf_issues_no_syscall() {
let buf = Vec::new().slice_full();
let buf = Vec::new();
let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt {
expectations: VecDeque::new(),
}));
let (_buf, res) = read_exact_at_impl(buf, 0, |buf, offset| {
let (_buf, res) = read_exact_at_impl(buf, 0, None, |buf, offset| {
let mock_read_at = Arc::clone(&mock_read_at);
async move { mock_read_at.lock().await.read_at(buf, offset).await }
})
@@ -905,7 +935,7 @@ mod test_read_exact_at_impl {
#[tokio::test]
async fn test_two_read_at_calls_needed_until_buf_filled() {
let buf = Vec::with_capacity(4).slice_full();
let buf = Vec::with_capacity(4);
let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt {
expectations: VecDeque::from(vec![
Expectation {
@@ -920,7 +950,7 @@ mod test_read_exact_at_impl {
},
]),
}));
let (buf, res) = read_exact_at_impl(buf, 0, |buf, offset| {
let (buf, res) = read_exact_at_impl(buf, 0, None, |buf, offset| {
let mock_read_at = Arc::clone(&mock_read_at);
async move { mock_read_at.lock().await.read_at(buf, offset).await }
})
@@ -931,7 +961,7 @@ mod test_read_exact_at_impl {
#[tokio::test]
async fn test_eof_before_buffer_full() {
let buf = Vec::with_capacity(3).slice_full();
let buf = Vec::with_capacity(3);
let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt {
expectations: VecDeque::from(vec![
Expectation {
@@ -951,7 +981,7 @@ mod test_read_exact_at_impl {
},
]),
}));
let (_buf, res) = read_exact_at_impl(buf, 0, |buf, offset| {
let (_buf, res) = read_exact_at_impl(buf, 0, None, |buf, offset| {
let mock_read_at = Arc::clone(&mock_read_at);
async move { mock_read_at.lock().await.read_at(buf, offset).await }
})
@@ -1021,29 +1051,27 @@ impl VirtualFile {
ctx: &RequestContext,
) -> Result<crate::tenant::block_io::BlockLease<'_>, std::io::Error> {
use crate::page_cache::PAGE_SZ;
let slice = Vec::with_capacity(PAGE_SZ).slice_full();
assert_eq!(slice.bytes_total(), PAGE_SZ);
let slice = self
.read_exact_at(slice, blknum as u64 * (PAGE_SZ as u64), ctx)
let buf = vec![0; PAGE_SZ];
let buf = self
.read_exact_at(buf, blknum as u64 * (PAGE_SZ as u64), ctx)
.await?;
Ok(crate::tenant::block_io::BlockLease::Vec(slice.into_inner()))
Ok(crate::tenant::block_io::BlockLease::Vec(buf))
}
async fn read_to_end(&mut self, buf: &mut Vec<u8>, ctx: &RequestContext) -> Result<(), Error> {
let mut tmp = vec![0; 128];
loop {
let slice = tmp.slice(..128);
let (slice, res) = self.read_at(slice, self.pos, ctx).await;
let res;
(tmp, res) = self.read_at(tmp, self.pos, ctx).await;
match res {
Ok(0) => return Ok(()),
Ok(n) => {
self.pos += n as u64;
buf.extend_from_slice(&slice[..n]);
buf.extend_from_slice(&tmp[..n]);
}
Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
Err(e) => return Err(e),
}
tmp = slice.into_inner();
}
}
}
@@ -1157,7 +1185,6 @@ mod tests {
use crate::task_mgr::TaskKind;
use super::*;
use owned_buffers_io::slice::SliceExt;
use rand::seq::SliceRandom;
use rand::thread_rng;
use rand::Rng;
@@ -1179,16 +1206,13 @@ mod tests {
impl MaybeVirtualFile {
async fn read_exact_at(
&self,
mut slice: tokio_epoll_uring::Slice<Vec<u8>>,
mut buf: Vec<u8>,
offset: u64,
ctx: &RequestContext,
) -> Result<tokio_epoll_uring::Slice<Vec<u8>>, Error> {
) -> Result<Vec<u8>, Error> {
match self {
MaybeVirtualFile::VirtualFile(file) => file.read_exact_at(slice, offset, ctx).await,
MaybeVirtualFile::File(file) => {
let rust_slice: &mut [u8] = slice.as_mut_rust_slice_full_zeroed();
file.read_exact_at(rust_slice, offset).map(|()| slice)
}
MaybeVirtualFile::VirtualFile(file) => file.read_exact_at(buf, offset, ctx).await,
MaybeVirtualFile::File(file) => file.read_exact_at(&mut buf, offset).map(|()| buf),
}
}
async fn write_all_at<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
@@ -1262,12 +1286,9 @@ mod tests {
len: usize,
ctx: &RequestContext,
) -> Result<String, Error> {
let slice = Vec::with_capacity(len).slice_full();
assert_eq!(slice.bytes_total(), len);
let slice = self.read_exact_at(slice, pos, ctx).await?;
let vec = slice.into_inner();
assert_eq!(vec.len(), len);
Ok(String::from_utf8(vec).unwrap())
let buf = vec![0; len];
let buf = self.read_exact_at(buf, pos, ctx).await?;
Ok(String::from_utf8(buf).unwrap())
}
}
@@ -1486,11 +1507,7 @@ mod tests {
let mut rng = rand::rngs::OsRng;
for _ in 1..1000 {
let f = &files[rng.gen_range(0..files.len())];
buf = f
.read_exact_at(buf.slice_full(), 0, &ctx)
.await
.unwrap()
.into_inner();
buf = f.read_exact_at(buf, 0, &ctx).await.unwrap();
assert!(buf == SAMPLE);
}
});

View File

@@ -107,7 +107,7 @@ use std::{
sync::atomic::{AtomicU8, Ordering},
};
use super::{owned_buffers_io::slice::SliceExt, FileGuard, Metadata};
use super::{FileGuard, Metadata};
#[cfg(target_os = "linux")]
fn epoll_uring_error_to_std(e: tokio_epoll_uring::Error<std::io::Error>) -> std::io::Error {
@@ -120,29 +120,38 @@ fn epoll_uring_error_to_std(e: tokio_epoll_uring::Error<std::io::Error>) -> std:
}
impl IoEngine {
pub(super) async fn read_at<Buf>(
pub(super) async fn read_at<B>(
&self,
file_guard: FileGuard,
offset: u64,
mut slice: tokio_epoll_uring::Slice<Buf>,
) -> (
(FileGuard, tokio_epoll_uring::Slice<Buf>),
std::io::Result<usize>,
)
mut buf: B,
) -> ((FileGuard, B), std::io::Result<usize>)
where
Buf: tokio_epoll_uring::IoBufMut + Send,
B: tokio_epoll_uring::BoundedBufMut + Send,
{
match self {
IoEngine::NotSet => panic!("not initialized"),
IoEngine::StdFs => {
let rust_slice = slice.as_mut_rust_slice_full_zeroed();
let res = file_guard.with_std_file(|std_file| std_file.read_at(rust_slice, offset));
((file_guard, slice), res)
// SAFETY: `dst` only lives at most as long as this match arm, during which buf remains valid memory.
let dst = unsafe {
std::slice::from_raw_parts_mut(buf.stable_mut_ptr(), buf.bytes_total())
};
let res = file_guard.with_std_file(|std_file| std_file.read_at(dst, offset));
if let Ok(nbytes) = &res {
assert!(*nbytes <= buf.bytes_total());
// SAFETY: see above assertion
unsafe {
buf.set_init(*nbytes);
}
}
#[allow(dropping_references)]
drop(dst);
((file_guard, buf), res)
}
#[cfg(target_os = "linux")]
IoEngine::TokioEpollUring => {
let system = tokio_epoll_uring_ext::thread_local_system().await;
let (resources, res) = system.read(file_guard, offset, slice).await;
let (resources, res) = system.read(file_guard, offset, buf).await;
(resources, res.map_err(epoll_uring_error_to_std))
}
}

View File

@@ -1,121 +0,0 @@
use tokio_epoll_uring::BoundedBuf;
use tokio_epoll_uring::BoundedBufMut;
use tokio_epoll_uring::IoBufMut;
use tokio_epoll_uring::Slice;
pub(crate) trait SliceExt {
/// Get a `&mut[0..self.bytes_total()`] slice, for when you need to do borrow-based IO.
///
/// See the test case `test_slice_full_zeroed` for the difference to just doing `&slice[..]`
fn as_mut_rust_slice_full_zeroed(&mut self) -> &mut [u8];
}
impl<B> SliceExt for Slice<B>
where
B: IoBufMut,
{
#[inline(always)]
fn as_mut_rust_slice_full_zeroed(&mut self) -> &mut [u8] {
// zero-initialize the uninitialized parts of the buffer so we can create a Rust slice
//
// SAFETY: we own `slice`, don't write outside the bounds
unsafe {
let to_init = self.bytes_total() - self.bytes_init();
self.stable_mut_ptr()
.add(self.bytes_init())
.write_bytes(0, to_init);
self.set_init(self.bytes_total());
};
let bytes_total = self.bytes_total();
&mut self[0..bytes_total]
}
}
#[cfg(test)]
mod tests {
use std::io::Read;
use super::*;
use bytes::Buf;
use tokio_epoll_uring::Slice;
#[test]
fn test_slice_full_zeroed() {
let make_fake_file = || bytes::BytesMut::from(&b"12345"[..]).reader();
// before we start the test, let's make sure we have a shared understanding of what slice_full does
{
let buf = Vec::with_capacity(3);
let slice: Slice<_> = buf.slice_full();
assert_eq!(slice.bytes_init(), 0);
assert_eq!(slice.bytes_total(), 3);
let rust_slice = &slice[..];
assert_eq!(
rust_slice.len(),
0,
"Slice only derefs to a &[u8] of the initialized part"
);
}
// and also let's establish a shared understanding of .slice()
{
let buf = Vec::with_capacity(3);
let slice: Slice<_> = buf.slice(0..2);
assert_eq!(slice.bytes_init(), 0);
assert_eq!(slice.bytes_total(), 2);
let rust_slice = &slice[..];
assert_eq!(
rust_slice.len(),
0,
"Slice only derefs to a &[u8] of the initialized part"
);
}
// the above leads to the easy mistake of using slice[..] for borrow-based IO like so:
{
let buf = Vec::with_capacity(3);
let mut slice: Slice<_> = buf.slice_full();
assert_eq!(slice[..].len(), 0);
let mut file = make_fake_file();
file.read_exact(&mut slice[..]).unwrap(); // one might think this reads 3 bytes but it reads 0
assert_eq!(&slice[..] as &[u8], &[][..] as &[u8]);
}
// With owned buffers IO like with VirtualFilem, you could totally
// pass in a `Slice` with bytes_init()=0 but bytes_total()=5
// and it will read 5 bytes into the slice, and return a slice that has bytes_init()=5.
{
// TODO: demo
}
//
// Ok, now that we have a shared understanding let's demo how to use the extension trait.
//
// slice_full()
{
let buf = Vec::with_capacity(3);
let mut slice: Slice<_> = buf.slice_full();
let rust_slice = slice.as_mut_rust_slice_full_zeroed();
assert_eq!(rust_slice.len(), 3);
assert_eq!(rust_slice, &[0, 0, 0]);
let mut file = make_fake_file();
file.read_exact(rust_slice).unwrap();
assert_eq!(rust_slice, b"123");
assert_eq!(&slice[..], b"123");
}
// .slice(..)
{
let buf = Vec::with_capacity(3);
let mut slice: Slice<_> = buf.slice(0..2);
let rust_slice = slice.as_mut_rust_slice_full_zeroed();
assert_eq!(rust_slice.len(), 2);
assert_eq!(rust_slice, &[0, 0]);
let mut file = make_fake_file();
file.read_exact(rust_slice).unwrap();
assert_eq!(rust_slice, b"12");
assert_eq!(&slice[..], b"12");
}
}
}

View File

@@ -343,33 +343,7 @@ impl WalIngest {
xlog_checkpoint.oldestActiveXid,
self.checkpoint.oldestActiveXid
);
// A shutdown checkpoint has `oldestActiveXid == InvalidTransactionid`,
// because at shutdown, all in-progress transactions will implicitly
// end. Postgres startup code knows that, and allows hot standby to start
// immediately from a shutdown checkpoint.
//
// In Neon, Postgres hot standby startup always behaves as if starting from
// an online checkpoint. It needs a valid `oldestActiveXid` value, so
// instead of overwriting self.checkpoint.oldestActiveXid with
// InvalidTransactionid from the checkpoint WAL record, update it to a
// proper value, knowing that there are no in-progress transactions at this
// point, except for prepared transactions.
//
// See also the neon code changes in the InitWalRecovery() function.
if xlog_checkpoint.oldestActiveXid == pg_constants::INVALID_TRANSACTION_ID
&& info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN
{
let mut oldest_active_xid = self.checkpoint.nextXid.value as u32;
for xid in modification.tline.list_twophase_files(lsn, ctx).await? {
if (xid.wrapping_sub(oldest_active_xid) as i32) < 0 {
oldest_active_xid = xid;
}
}
self.checkpoint.oldestActiveXid = oldest_active_xid;
} else {
self.checkpoint.oldestActiveXid = xlog_checkpoint.oldestActiveXid;
}
self.checkpoint.oldestActiveXid = xlog_checkpoint.oldestActiveXid;
// Write a new checkpoint key-value pair on every checkpoint record, even
// if nothing really changed. Not strictly required, but it seems nice to
@@ -401,7 +375,6 @@ impl WalIngest {
if info == pg_constants::XLOG_RUNNING_XACTS {
let xlrec = crate::walrecord::XlRunningXacts::decode(&mut buf);
self.checkpoint.oldestActiveXid = xlrec.oldest_running_xid;
self.checkpoint_modified = true;
}
}
pg_constants::RM_REPLORIGIN_ID => {
@@ -1304,10 +1277,13 @@ impl WalIngest {
xlrec.pageno, xlrec.oldest_xid, xlrec.oldest_xid_db
);
// In Postgres, oldestXid and oldestXidDB are updated in memory when the CLOG is
// truncated, but a checkpoint record with the updated values isn't written until
// later. In Neon, a server can start at any LSN, not just on a checkpoint record,
// so we keep the oldestXid and oldestXidDB up-to-date.
// Here we treat oldestXid and oldestXidDB
// differently from postgres redo routines.
// In postgres checkpoint.oldestXid lags behind xlrec.oldest_xid
// until checkpoint happens and updates the value.
// Here we can use the most recent value.
// It's just an optimization, though and can be deleted.
// TODO Figure out if there will be any issues with replica.
self.checkpoint.oldestXid = xlrec.oldest_xid;
self.checkpoint.oldestXidDB = xlrec.oldest_xid_db;
self.checkpoint_modified = true;
@@ -1408,31 +1384,14 @@ impl WalIngest {
// Note: The multixact members can wrap around, even within one WAL record.
offset = offset.wrapping_add(n_this_page as u32);
}
let next_offset = offset;
assert!(xlrec.moff.wrapping_add(xlrec.nmembers) == next_offset);
// Update next-multi-xid and next-offset
//
// NB: In PostgreSQL, the next-multi-xid stored in the control file is allowed to
// go to 0, and it's fixed up by skipping to FirstMultiXactId in functions that
// read it, like GetNewMultiXactId(). This is different from how nextXid is
// incremented! nextXid skips over < FirstNormalTransactionId when the the value
// is stored, so it's never 0 in a checkpoint.
//
// I don't know why it's done that way, it seems less error-prone to skip over 0
// when the value is stored rather than when it's read. But let's do it the same
// way here.
let next_multi_xid = xlrec.mid.wrapping_add(1);
if self
.checkpoint
.update_next_multixid(next_multi_xid, next_offset)
{
if xlrec.mid >= self.checkpoint.nextMulti {
self.checkpoint.nextMulti = xlrec.mid + 1;
self.checkpoint_modified = true;
}
if xlrec.moff + xlrec.nmembers > self.checkpoint.nextMultiOffset {
self.checkpoint.nextMultiOffset = xlrec.moff + xlrec.nmembers;
self.checkpoint_modified = true;
}
// Also update the next-xid with the highest member. According to the comments in
// multixact_redo(), this shouldn't be necessary, but let's do the same here.
let max_mbr_xid = xlrec.members.iter().fold(None, |acc, mbr| {
if let Some(max_xid) = acc {
if mbr.xid.wrapping_sub(max_xid) as i32 > 0 {

View File

@@ -12,8 +12,6 @@
#include "fmgr.h"
#include "miscadmin.h"
#include "access/subtrans.h"
#include "access/twophase.h"
#include "access/xact.h"
#include "access/xlog.h"
#include "storage/buf_internals.h"
@@ -24,12 +22,10 @@
#include "replication/logical.h"
#include "replication/slot.h"
#include "replication/walsender.h"
#include "storage/proc.h"
#include "storage/procsignal.h"
#include "tcop/tcopprot.h"
#include "funcapi.h"
#include "access/htup_details.h"
#include "utils/builtins.h"
#include "utils/pg_lsn.h"
#include "utils/guc.h"
#include "utils/wait_event.h"
@@ -270,293 +266,6 @@ LogicalSlotsMonitorMain(Datum main_arg)
}
}
/*
* XXX: These private to procarray.c, but we need them here.
*/
#define PROCARRAY_MAXPROCS (MaxBackends + max_prepared_xacts)
#define TOTAL_MAX_CACHED_SUBXIDS \
((PGPROC_MAX_CACHED_SUBXIDS + 1) * PROCARRAY_MAXPROCS)
/*
* Restore running-xact information by scanning the CLOG at startup.
*
* In PostgreSQL, a standby always has to wait for a running-xacts WAL record
* to arrive before it can start accepting queries. Furthermore, if there are
* transactions with too many subxids (> 64) open to fit in the in-memory
* subxids cache, the running-xacts record will be marked as "suboverflowed",
* and the standby will need to also wait for the currently in-progress
* transactions to finish.
*
* That's not great in PostgreSQL, because a hot standby does not necessary
* open up for queries immediately as you might expect. But it's worse in
* Neon: A standby in Neon doesn't need to start WAL replay from a checkpoint
* record; it can start at any LSN. Postgres arranges things so that there is
* a running-xacts record soon after every checkpoint record, but when you
* start from an arbitrary LSN, that doesn't help. If the primary is idle, or
* not running at all, it might never write a new running-xacts record,
* leaving the replica in a limbo where it can never start accepting queries.
*
* To mitigate that, we have an additional mechanism to find the running-xacts
* information: we scan the CLOG, making note of any XIDs not marked as
* committed or aborted. They are added to the Postgres known-assigned XIDs
* array by calling ProcArrayApplyRecoveryInfo() in the caller of this
* function.
*
* There is one big limitation with that mechanism: The size of the
* known-assigned XIDs is limited, so if there are a lot of in-progress XIDs,
* we have to give up. Furthermore, we don't know how many of the in-progress
* XIDs are subtransactions, and if we use up all the space in the
* known-assigned XIDs array for subtransactions, we might run out of space in
* the array later during WAL replay, causing the replica to shut down with
* "ERROR: too many KnownAssignedXids". The safe # of XIDs that we can add to
* the known-assigned array without risking that error later is very low,
* merely PGPROC_MAX_CACHED_SUBXIDS == 64, so we take our chances and use up
* to half of the known-assigned XIDs array for the subtransactions, even
* though that risks getting the error later.
*
* Note: It's OK if the recovered list of XIDs includes some transactions that
* have crashed in the primary, and hence will never commit. They will be seen
* as in-progress, until we see a new next running-acts record with an
* oldestActiveXid that invalidates them. That's how the known-assigned XIDs
* array always works.
*
* If scraping the CLOG doesn't succeed for some reason, like the subxid
* overflow, Postgres will fall back to waiting for a running-xacts record
* like usual.
*
* Returns true if a complete list of in-progress XIDs was scraped.
*/
static bool
RestoreRunningXactsFromClog(CheckPoint *checkpoint, TransactionId **xids, int *nxids)
{
TransactionId from;
TransactionId till;
int max_xcnt;
TransactionId *prepared_xids = NULL;
int n_prepared_xids;
TransactionId *restored_xids = NULL;
int n_restored_xids;
int next_prepared_idx;
Assert(*xids == NULL);
/*
* If the checkpoint doesn't have a valid oldestActiveXid, bail out. We
* don't know where to start the scan.
*
* This shouldn't happen, because the pageserver always maintains a valid
* oldestActiveXid nowadays. Except when starting at an old point in time
* that was ingested before the pageserver was taught to do that.
*/
if (!TransactionIdIsValid(checkpoint->oldestActiveXid))
{
elog(LOG, "cannot restore running-xacts from CLOG because oldestActiveXid is not set");
goto fail;
}
/*
* We will scan the CLOG starting from the oldest active XID.
*
* In some corner cases, the oldestActiveXid from the last checkpoint
* might already have been truncated from the CLOG. That is,
* oldestActiveXid might be older than oldestXid. That's possible because
* oldestActiveXid is only updated at checkpoints. After the last
* checkpoint, the oldest transaction might have committed, and the CLOG
* might also have been already truncated. So if oldestActiveXid is older
* than oldestXid, start at oldestXid instead. (Otherwise we'd try to
* access CLOG segments that have already been truncated away.)
*/
from = TransactionIdPrecedes(checkpoint->oldestXid, checkpoint->oldestActiveXid)
? checkpoint->oldestActiveXid : checkpoint->oldestXid;
till = XidFromFullTransactionId(checkpoint->nextXid);
/*
* To avoid "too many KnownAssignedXids" error later during replay, we
* limit number of collected transactions. This is a tradeoff: if we are
* willing to consume more of the KnownAssignedXids space for the XIDs
* now, that allows us to start up, but we might run out of space later.
*
* The size of the KnownAssignedXids array is TOTAL_MAX_CACHED_SUBXIDS,
* which is (PGPROC_MAX_CACHED_SUBXIDS + 1) * PROCARRAY_MAXPROCS). In
* PostgreSQL, that's always enough because the primary will always write
* an XLOG_XACT_ASSIGNMENT record if a transaction has more than
* PGPROC_MAX_CACHED_SUBXIDS subtransactions. Seeing that record allows
* the standby to mark the XIDs in pg_subtrans and removing them from the
* KnowingAssignedXids array.
*
* Here, we don't know which XIDs belong to subtransactions that have
* already been WAL-logged with an XLOG_XACT_ASSIGNMENT record. If we
* wanted to be totally safe and avoid the possibility of getting a "too
* many KnownAssignedXids" error later, we would have to limit ourselves
* to PGPROC_MAX_CACHED_SUBXIDS, which is not much. And that includes top
* transaction IDs too, because we cannot distinguish between top
* transaction IDs and subtransactions here.
*
* Somewhat arbitrarily, we use up to half of KnownAssignedXids. That
* strikes a sensible balance between being useful, and risking a "too
* many KnownAssignedXids" error later.
*/
max_xcnt = TOTAL_MAX_CACHED_SUBXIDS / 2;
/*
* Collect XIDs of prepared transactions in an array. This includes only
* their top-level XIDs. We assume that StandbyRecoverPreparedTransactions
* has already been called, so we can find all the sub-transactions in
* pg_subtrans.
*/
PrescanPreparedTransactions(&prepared_xids, &n_prepared_xids);
qsort(prepared_xids, n_prepared_xids, sizeof(TransactionId), xidLogicalComparator);
/*
* Scan the CLOG, collecting in-progress XIDs into 'restored_xids'.
*/
elog(DEBUG1, "scanning CLOG between %u and %u for in-progress XIDs", from, till);
restored_xids = (TransactionId *) palloc(max_xcnt * sizeof(TransactionId));
n_restored_xids = 0;
next_prepared_idx = 0;
for (TransactionId xid = from; xid != till;)
{
XLogRecPtr xidlsn;
XidStatus xidstatus;
xidstatus = TransactionIdGetStatus(xid, &xidlsn);
/*
* "Merge" the prepared transactions into the restored_xids array as
* we go. The prepared transactions array is sorted. This is mostly
* a sanity check to ensure that all the prpeared transactions are
* seen as in-progress. (There is a check after the loop that we didn't
* miss any.)
*/
if (next_prepared_idx < n_prepared_xids && xid == prepared_xids[next_prepared_idx])
{
/*
* This is a top-level transaction ID of a prepared transaction.
* Include it in the array.
*/
/* sanity check */
if (xidstatus != TRANSACTION_STATUS_IN_PROGRESS)
{
elog(LOG, "prepared transaction %u has unexpected status %X, cannot restore running-xacts from CLOG",
xid, xidstatus);
Assert(false);
goto fail;
}
elog(DEBUG1, "XID %u: was next prepared xact (%d / %d)", xid, next_prepared_idx, n_prepared_xids);
next_prepared_idx++;
}
else if (xidstatus == TRANSACTION_STATUS_COMMITTED)
{
elog(DEBUG1, "XID %u: was committed", xid);
goto skip;
}
else if (xidstatus == TRANSACTION_STATUS_ABORTED)
{
elog(DEBUG1, "XID %u: was aborted", xid);
goto skip;
}
else if (xidstatus == TRANSACTION_STATUS_IN_PROGRESS)
{
/*
* In-progress transactions are included in the array.
*
* Except subtransactions of the prepared transactions. They are
* already set in pg_subtrans, and hence don't need to be tracked
* in the known-assigned XIDs array.
*/
if (n_prepared_xids > 0)
{
TransactionId parent = SubTransGetParent(xid);
if (TransactionIdIsValid(parent))
{
/*
* This is a subtransaction belonging to a prepared
* transaction.
*
* Sanity check that it is in the prepared XIDs array. It
* should be, because StandbyRecoverPreparedTransactions
* populated pg_subtrans, and no other XID should be set
* in it yet. (This also relies on the fact that
* StandbyRecoverPreparedTransactions sets the parent of
* each subxid to point directly to the top-level XID,
* rather than restoring the original subtransaction
* hierarchy.)
*/
if (bsearch(&parent, prepared_xids, next_prepared_idx,
sizeof(TransactionId), xidLogicalComparator) == NULL)
{
elog(LOG, "sub-XID %u has unexpected parent %u, cannot restore running-xacts from CLOG",
xid, parent);
Assert(false);
goto fail;
}
elog(DEBUG1, "XID %u: was a subtransaction of prepared xid %u", xid, parent);
goto skip;
}
}
/* include it in the array */
elog(DEBUG1, "XID %u: is in progress", xid);
}
else
{
/*
* SUB_COMMITTED is a transient state used at commit. We don't
* expect to see that here.
*/
elog(LOG, "XID %u has unexpected status %X in pg_xact, cannot restore running-xacts from CLOG",
xid, xidstatus);
Assert(false);
goto fail;
}
if (n_restored_xids >= max_xcnt)
{
/*
* Overflowed. We won't be able to install the RunningTransactions
* snapshot.
*/
elog(LOG, "too many running xacts to restore from the CLOG; oldestXid=%u oldestActiveXid=%u nextXid %u",
checkpoint->oldestXid, checkpoint->oldestActiveXid,
XidFromFullTransactionId(checkpoint->nextXid));
goto fail;
}
restored_xids[n_restored_xids++] = xid;
skip:
TransactionIdAdvance(xid);
continue;
}
/* sanity check */
if (next_prepared_idx != n_prepared_xids)
{
elog(LOG, "prepared transaction ID %u was not visited in the CLOG scan, cannot restore running-xacts from CLOG",
prepared_xids[next_prepared_idx]);
Assert(false);
goto fail;
}
elog(LOG, "restored %d running xacts by scanning the CLOG; oldestXid=%u oldestActiveXid=%u nextXid %u",
n_restored_xids, checkpoint->oldestXid, checkpoint->oldestActiveXid, XidFromFullTransactionId(checkpoint->nextXid));
*nxids = n_restored_xids;
*xids = restored_xids;
return true;
fail:
*nxids = 0;
*xids = NULL;
if (restored_xids)
pfree(restored_xids);
if (prepared_xids)
pfree(prepared_xids);
return false;
}
void
_PG_init(void)
{
@@ -579,8 +288,6 @@ _PG_init(void)
pg_init_extension_server();
restore_running_xacts_callback = RestoreRunningXactsFromClog;
/*
* Important: This must happen after other parts of the extension are
* loaded, otherwise any settings to GUCs that were set before the

View File

@@ -7,7 +7,7 @@ OBJS = \
neontest.o
EXTENSION = neon_test_utils
DATA = neon_test_utils--1.2.sql
DATA = neon_test_utils--1.1.sql
PGFILEDESC = "neon_test_utils - helpers for neon testing and debugging"
PG_CONFIG = pg_config

View File

@@ -41,7 +41,7 @@ RETURNS bytea
AS 'MODULE_PATHNAME', 'get_raw_page_at_lsn_ex'
LANGUAGE C PARALLEL UNSAFE;
CREATE FUNCTION neon_xlogflush(lsn pg_lsn DEFAULT NULL)
CREATE FUNCTION neon_xlogflush(lsn pg_lsn)
RETURNS VOID
AS 'MODULE_PATHNAME', 'neon_xlogflush'
LANGUAGE C PARALLEL UNSAFE;

View File

@@ -1,6 +1,6 @@
# neon_test_utils extension
comment = 'helpers for neon testing and debugging'
default_version = '1.2'
default_version = '1.1'
module_pathname = '$libdir/neon_test_utils'
relocatable = true
trusted = true

View File

@@ -15,7 +15,6 @@
#include "access/relation.h"
#include "access/xact.h"
#include "access/xlog.h"
#include "access/xlog_internal.h"
#include "catalog/namespace.h"
#include "fmgr.h"
#include "funcapi.h"
@@ -445,46 +444,11 @@ get_raw_page_at_lsn_ex(PG_FUNCTION_ARGS)
/*
* Directly calls XLogFlush(lsn) to flush WAL buffers.
*
* If 'lsn' is not specified (is NULL), flush all generated WAL.
*/
Datum
neon_xlogflush(PG_FUNCTION_ARGS)
{
XLogRecPtr lsn;
if (RecoveryInProgress())
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("recovery is in progress"),
errhint("cannot flush WAL during recovery.")));
if (!PG_ARGISNULL(0))
lsn = PG_GETARG_LSN(0);
else
{
lsn = GetXLogInsertRecPtr();
/*---
* The LSN returned by GetXLogInsertRecPtr() is the position where the
* next inserted record would begin. If the last record ended just at
* the page boundary, the next record will begin after the page header
* on the next page, and that's what GetXLogInsertRecPtr().returns,
* but the page header has not been written yet. If we tried to flush
* it, XLogFlush() would throw an error:
*
* ERROR : xlog flush request %X/%X is not satisfied --- flushed only to %X/%X
*
* To avoid that, if the insert position points to just after the page
* header, back off to page boundary.
*/
if (lsn % XLOG_BLCKSZ == SizeOfXLogShortPHD &&
XLogSegmentOffset(lsn, wal_segment_size) > XLOG_BLCKSZ)
lsn -= SizeOfXLogShortPHD;
else if (lsn % XLOG_BLCKSZ == SizeOfXLogLongPHD &&
XLogSegmentOffset(lsn, wal_segment_size) < XLOG_BLCKSZ)
lsn -= SizeOfXLogLongPHD;
}
XLogRecPtr lsn = PG_GETARG_LSN(0);
XLogFlush(lsn);
PG_RETURN_VOID();

View File

@@ -153,7 +153,7 @@ pub struct ComputeUserInfo {
impl ComputeUserInfo {
pub fn endpoint_cache_key(&self) -> EndpointCacheKey {
self.options.get_cache_key(&self.endpoint)
self.options.get_cache_key((&self.endpoint).into())
}
}

View File

@@ -241,6 +241,8 @@ fn project_name_valid(name: &str) -> bool {
#[cfg(test)]
mod tests {
use crate::intern::EndpointIdInt;
use super::*;
use serde_json::json;
use ComputeUserInfoParseError::*;
@@ -284,7 +286,6 @@ mod tests {
ComputeUserInfoMaybeEndpoint::parse(&mut ctx, &options, sni, common_names.as_ref())?;
assert_eq!(user_info.user, "john_doe");
assert_eq!(user_info.endpoint_id.as_deref(), Some("foo"));
assert_eq!(user_info.options.get_cache_key("foo"), "foo");
Ok(())
}
@@ -442,8 +443,9 @@ mod tests {
let user_info =
ComputeUserInfoMaybeEndpoint::parse(&mut ctx, &options, sni, common_names.as_ref())?;
assert_eq!(user_info.endpoint_id.as_deref(), Some("project"));
let project = EndpointIdInt::from(EndpointId::from("project"));
assert_eq!(
user_info.options.get_cache_key("project"),
user_info.options.get_cache_key(project).to_string(),
"project endpoint_type:read_write lsn:0/2"
);

View File

@@ -43,6 +43,15 @@ impl<C: Cache, V> Cached<C, V> {
Self { token: None, value }
}
/// Place any entry into this wrapper; invalidation will be a no-op.
pub fn map<U>(self, f: impl FnOnce(V) -> U) -> Cached<C, U> {
let token = self.token;
Cached {
token,
value: f(self.value),
}
}
pub fn take_value(self) -> (Cached<C, ()>, V) {
(
Cached {

View File

@@ -93,7 +93,7 @@ pub type ScramKeys = tokio_postgres::config::ScramKeys<32>;
/// Eventually, `tokio_postgres` will be replaced with something better.
/// Newtype allows us to implement methods on top of it.
#[derive(Clone, Default)]
pub struct ConnCfg(Box<tokio_postgres::Config>);
pub struct ConnCfg(tokio_postgres::Config);
/// Creation and initialization routines.
impl ConnCfg {

View File

@@ -9,14 +9,14 @@ use crate::{
IpPattern,
},
cache::{endpoints::EndpointsCache, project_info::ProjectInfoCacheImpl, Cached, TimedLru},
compute,
compute::{self, ConnCfg},
config::{CacheOptions, EndpointCacheConfig, ProjectInfoCacheOptions},
context::RequestMonitoring,
error::ReportableError,
intern::ProjectIdInt,
metrics::ApiLockMetrics,
rate_limiter::{DynamicLimiter, Outcome, RateLimiterConfig, Token},
scram, EndpointCacheKey,
scram, EndpointCacheKey, Host,
};
use dashmap::DashMap;
use std::{hash::Hash, sync::Arc, time::Duration};
@@ -289,6 +289,33 @@ pub struct NodeInfo {
pub allow_self_signed_compute: bool,
}
/// Cached info for establishing a connection to a compute node.
#[derive(Clone)]
pub struct NodeCachedInfo {
pub host: Host,
pub port: u16,
/// Labels for proxy's metrics.
pub aux: MetricsAuxInfo,
/// Whether we should accept self-signed certificates (for testing)
pub allow_self_signed_compute: bool,
}
impl NodeCachedInfo {
pub fn into_node_info(self) -> NodeInfo {
let mut config = ConnCfg::default();
config.ssl_mode(tokio_postgres::config::SslMode::Disable);
config.host(&self.host);
config.port(self.port);
NodeInfo {
config,
aux: self.aux,
allow_self_signed_compute: self.allow_self_signed_compute,
}
}
}
impl NodeInfo {
pub async fn connect(
&self,
@@ -317,8 +344,8 @@ impl NodeInfo {
}
}
pub type NodeInfoCache = TimedLru<EndpointCacheKey, NodeInfo>;
pub type CachedNodeInfo = Cached<&'static NodeInfoCache>;
pub type NodeInfoCache = TimedLru<EndpointCacheKey, NodeCachedInfo>;
pub type CachedNodeInfo = Cached<&'static NodeInfoCache, NodeInfo>;
pub type CachedRoleSecret = Cached<&'static ProjectInfoCacheImpl, Option<AuthSecret>>;
pub type CachedAllowedIps = Cached<&'static ProjectInfoCacheImpl, Arc<Vec<IpPattern>>>;

View File

@@ -4,22 +4,20 @@ use super::{
super::messages::{ConsoleError, GetRoleSecret, WakeCompute},
errors::{ApiError, GetAuthInfoError, WakeComputeError},
ApiCaches, ApiLocks, AuthInfo, AuthSecret, CachedAllowedIps, CachedNodeInfo, CachedRoleSecret,
NodeInfo,
NodeCachedInfo,
};
use crate::{
auth::backend::ComputeUserInfo,
compute,
console::messages::ColdStartInfo,
http,
metrics::{CacheOutcome, Metrics},
rate_limiter::EndpointRateLimiter,
scram, EndpointCacheKey,
scram, EndpointCacheKey, Host,
};
use crate::{cache::Cached, context::RequestMonitoring};
use futures::TryFutureExt;
use std::sync::Arc;
use tokio::time::Instant;
use tokio_postgres::config::SslMode;
use tracing::{error, info, info_span, warn, Instrument};
pub struct Api {
@@ -132,7 +130,7 @@ impl Api {
&self,
ctx: &mut RequestMonitoring,
user_info: &ComputeUserInfo,
) -> Result<NodeInfo, WakeComputeError> {
) -> Result<NodeCachedInfo, WakeComputeError> {
let request_id = ctx.session_id.to_string();
let application_name = ctx.console_application_name();
async {
@@ -167,15 +165,11 @@ impl Api {
None => return Err(WakeComputeError::BadComputeAddress(body.address)),
Some(x) => x,
};
let host = Host(host.into());
// Don't set anything but host and port! This config will be cached.
// We'll set username and such later using the startup message.
// TODO: add more type safety (in progress).
let mut config = compute::ConnCfg::new();
config.host(host).port(port).ssl_mode(SslMode::Disable); // TLS is not configured on compute nodes.
let node = NodeInfo {
config,
let node = NodeCachedInfo {
host,
port,
aux: body.aux,
allow_self_signed_compute: false,
};
@@ -278,9 +272,9 @@ impl super::Api for Api {
// The connection info remains the same during that period of time,
// which means that we might cache it to reduce the load and latency.
if let Some(cached) = self.caches.node_info.get(&key) {
info!(key = &*key, "found cached compute node info");
info!(key = display(&key), "found cached compute node info");
ctx.set_project(cached.aux.clone());
return Ok(cached);
return Ok(cached.map(NodeCachedInfo::into_node_info));
}
let permit = self.locks.get_permit(&key).await?;
@@ -289,9 +283,9 @@ impl super::Api for Api {
// double check
if permit.should_check_cache() {
if let Some(cached) = self.caches.node_info.get(&key) {
info!(key = &*key, "found cached compute node info");
info!(key = display(&key), "found cached compute node info");
ctx.set_project(cached.aux.clone());
return Ok(cached);
return Ok(cached.map(NodeCachedInfo::into_node_info));
}
}
@@ -300,7 +294,7 @@ impl super::Api for Api {
.wake_compute_endpoint_rate_limiter
.check(user_info.endpoint.normalize_intern(), 1)
{
info!(key = &*key, "found cached compute node info");
info!(key = display(&key), "found cached compute node info");
return Err(WakeComputeError::TooManyConnections);
}
@@ -314,9 +308,12 @@ impl super::Api for Api {
let (_, mut cached) = self.caches.node_info.insert(key.clone(), node);
cached.aux.cold_start_info = cold_start_info;
info!(key = &*key, "created a cache entry for compute node info");
info!(
key = display(&key),
"created a cache entry for compute node info"
);
Ok(cached)
Ok(cached.map(NodeCachedInfo::into_node_info))
}
}

View File

@@ -157,8 +157,16 @@ smol_str_wrapper!(BranchId);
// 90% of project strings are 23 characters or less.
smol_str_wrapper!(ProjectId);
// will usually equal endpoint ID
smol_str_wrapper!(EndpointCacheKey);
#[derive(PartialEq, Eq, Hash, Debug, Clone)]
pub struct EndpointCacheKey {
pub id: EndpointIdInt,
pub extra: Box<str>,
}
impl std::fmt::Display for EndpointCacheKey {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}{}", &self.id, &self.extra)
}
}
smol_str_wrapper!(DbName);

View File

@@ -10,6 +10,7 @@ pub mod wake_compute;
pub use copy_bidirectional::copy_bidirectional_client_compute;
pub use copy_bidirectional::ErrorSource;
use crate::intern::EndpointIdInt;
use crate::{
auth,
cancellation::{self, CancellationHandlerMain, CancellationHandlerMainInternal},
@@ -404,13 +405,20 @@ impl NeonOptions {
Self(options)
}
pub fn get_cache_key(&self, prefix: &str) -> EndpointCacheKey {
// prefix + format!(" {k}:{v}")
// kinda jank because SmolStr is immutable
std::iter::once(prefix)
.chain(self.0.iter().flat_map(|(k, v)| [" ", &**k, ":", &**v]))
.collect::<SmolStr>()
.into()
pub fn get_cache_key(&self, endpoint: EndpointIdInt) -> EndpointCacheKey {
EndpointCacheKey {
id: endpoint,
extra: self.get_cache_key_extras(),
}
}
pub fn get_cache_key_extras(&self) -> Box<str> {
let mut extras = String::new();
for (k, v) in &self.0 {
use std::fmt::Write;
write!(&mut extras, " {k}:{v}").unwrap();
}
extras.into_boxed_str()
}
/// <https://swagger.io/docs/specification/serialization/> DeepObject format

View File

@@ -47,7 +47,7 @@ pub trait ConnectMechanism {
async fn connect_once(
&self,
ctx: &mut RequestMonitoring,
node_info: &console::CachedNodeInfo,
node_info: &NodeInfo,
timeout: time::Duration,
) -> Result<Self::Connection, Self::ConnectError>;
@@ -82,7 +82,7 @@ impl ConnectMechanism for TcpMechanism<'_> {
async fn connect_once(
&self,
ctx: &mut RequestMonitoring,
node_info: &console::CachedNodeInfo,
node_info: &NodeInfo,
timeout: time::Duration,
) -> Result<PostgresConnection, Self::Error> {
let host = node_info.config.get_host()?;

View File

@@ -13,8 +13,10 @@ use crate::auth::backend::{
use crate::config::{CertResolver, RetryConfig};
use crate::console::caches::NodeInfoCache;
use crate::console::messages::{ConsoleError, Details, MetricsAuxInfo, Status};
use crate::console::provider::{CachedAllowedIps, CachedRoleSecret, ConsoleBackend};
use crate::console::{self, CachedNodeInfo, NodeInfo};
use crate::console::provider::{
CachedAllowedIps, CachedRoleSecret, ConsoleBackend, NodeCachedInfo,
};
use crate::console::{self, CachedNodeInfo};
use crate::error::ErrorKind;
use crate::{http, sasl, scram, BranchId, EndpointId, ProjectId};
use anyhow::{bail, Context};
@@ -458,7 +460,7 @@ impl ConnectMechanism for TestConnectMechanism {
async fn connect_once(
&self,
_ctx: &mut RequestMonitoring,
_node_info: &console::CachedNodeInfo,
_node_info: &console::NodeInfo,
_timeout: std::time::Duration,
) -> Result<Self::Connection, Self::ConnectError> {
let mut counter = self.counter.lock().unwrap();
@@ -530,8 +532,9 @@ impl TestBackend for TestConnectMechanism {
}
fn helper_create_cached_node_info(cache: &'static NodeInfoCache) -> CachedNodeInfo {
let node = NodeInfo {
config: compute::ConnCfg::new(),
let node = NodeCachedInfo {
host: "localhost".into(),
port: 5432,
aux: MetricsAuxInfo {
endpoint_id: (&EndpointId::from("endpoint")).into(),
project_id: (&ProjectId::from("project")).into(),
@@ -540,8 +543,12 @@ fn helper_create_cached_node_info(cache: &'static NodeInfoCache) -> CachedNodeIn
},
allow_self_signed_compute: false,
};
let (_, node) = cache.insert("key".into(), node);
node
let key = EndpointCacheKey {
id: node.aux.endpoint_id,
extra: "".into(),
};
let (_, node) = cache.insert(key, node);
node.map(NodeCachedInfo::into_node_info)
}
fn helper_create_connect_info(

View File

@@ -11,7 +11,7 @@ use crate::{
errors::{GetAuthInfoError, WakeComputeError},
locks::ApiLocks,
provider::ApiLockError,
CachedNodeInfo,
NodeInfo,
},
context::RequestMonitoring,
error::{ErrorKind, ReportableError, UserFacingError},
@@ -223,7 +223,7 @@ impl ConnectMechanism for TokioMechanism {
async fn connect_once(
&self,
ctx: &mut RequestMonitoring,
node_info: &CachedNodeInfo,
node_info: &NodeInfo,
timeout: Duration,
) -> Result<Self::Connection, Self::ConnectError> {
let host = node_info.config.get_host()?;

View File

@@ -61,7 +61,7 @@ impl fmt::Display for ConnInfo {
self.user_info.user,
self.user_info.endpoint,
self.dbname,
self.user_info.options.get_cache_key("")
self.user_info.options.get_cache_key_extras()
)
}
}

View File

@@ -29,8 +29,7 @@ use utils::pid_file;
use metrics::set_build_info_metric;
use safekeeper::defaults::{
DEFAULT_CONTROL_FILE_SAVE_INTERVAL, DEFAULT_HEARTBEAT_TIMEOUT, DEFAULT_HTTP_LISTEN_ADDR,
DEFAULT_MAX_OFFLOADER_LAG_BYTES, DEFAULT_PARTIAL_BACKUP_CONCURRENCY,
DEFAULT_PARTIAL_BACKUP_TIMEOUT, DEFAULT_PG_LISTEN_ADDR,
DEFAULT_MAX_OFFLOADER_LAG_BYTES, DEFAULT_PARTIAL_BACKUP_TIMEOUT, DEFAULT_PG_LISTEN_ADDR,
};
use safekeeper::http;
use safekeeper::wal_service;
@@ -192,9 +191,6 @@ struct Args {
/// Pending updates to control file will be automatically saved after this interval.
#[arg(long, value_parser = humantime::parse_duration, default_value = DEFAULT_CONTROL_FILE_SAVE_INTERVAL)]
control_file_save_interval: Duration,
/// Number of allowed concurrent uploads of partial segments to remote storage.
#[arg(long, default_value = DEFAULT_PARTIAL_BACKUP_CONCURRENCY)]
partial_backup_concurrency: usize,
}
// Like PathBufValueParser, but allows empty string.
@@ -348,7 +344,6 @@ async fn main() -> anyhow::Result<()> {
enable_offload: args.enable_offload,
delete_offloaded_wal: args.delete_offloaded_wal,
control_file_save_interval: args.control_file_save_interval,
partial_backup_concurrency: args.partial_backup_concurrency,
};
// initialize sentry if SENTRY_DSN is provided

View File

@@ -52,7 +52,6 @@ pub mod defaults {
pub const DEFAULT_MAX_OFFLOADER_LAG_BYTES: u64 = 128 * (1 << 20);
pub const DEFAULT_PARTIAL_BACKUP_TIMEOUT: &str = "15m";
pub const DEFAULT_CONTROL_FILE_SAVE_INTERVAL: &str = "300s";
pub const DEFAULT_PARTIAL_BACKUP_CONCURRENCY: &str = "5";
}
#[derive(Debug, Clone)]
@@ -92,7 +91,6 @@ pub struct SafeKeeperConf {
pub enable_offload: bool,
pub delete_offloaded_wal: bool,
pub control_file_save_interval: Duration,
pub partial_backup_concurrency: usize,
}
impl SafeKeeperConf {
@@ -135,7 +133,6 @@ impl SafeKeeperConf {
enable_offload: false,
delete_offloaded_wal: false,
control_file_save_interval: Duration::from_secs(1),
partial_backup_concurrency: 1,
}
}
}

View File

@@ -72,8 +72,7 @@ pub static WAL_STORAGE_OPERATION_SECONDS: Lazy<HistogramVec> = Lazy::new(|| {
register_histogram_vec!(
"safekeeper_wal_storage_operation_seconds",
"Seconds spent on WAL storage operations",
&["operation"],
DISK_FSYNC_SECONDS_BUCKETS.to_vec()
&["operation"]
)
.expect("Failed to register safekeeper_wal_storage_operation_seconds histogram vec")
});
@@ -81,8 +80,7 @@ pub static MISC_OPERATION_SECONDS: Lazy<HistogramVec> = Lazy::new(|| {
register_histogram_vec!(
"safekeeper_misc_operation_seconds",
"Seconds spent on miscellaneous operations",
&["operation"],
DISK_FSYNC_SECONDS_BUCKETS.to_vec()
&["operation"]
)
.expect("Failed to register safekeeper_misc_operation_seconds histogram vec")
});

View File

@@ -36,7 +36,7 @@ use crate::timeline_guard::ResidenceGuard;
use crate::timeline_manager::{AtomicStatus, ManagerCtl};
use crate::timelines_set::TimelinesSet;
use crate::wal_backup::{self};
use crate::wal_backup_partial::{PartialRemoteSegment, RateLimiter};
use crate::wal_backup_partial::PartialRemoteSegment;
use crate::{control_file, safekeeper::UNKNOWN_SERVER_VERSION};
use crate::metrics::{FullTimelineInfo, WalStorageMetrics, MISC_OPERATION_SECONDS};
@@ -587,7 +587,6 @@ impl Timeline {
shared_state: &mut WriteGuardSharedState<'_>,
conf: &SafeKeeperConf,
broker_active_set: Arc<TimelinesSet>,
partial_backup_rate_limiter: RateLimiter,
) -> Result<()> {
match fs::metadata(&self.timeline_dir).await {
Ok(_) => {
@@ -618,7 +617,7 @@ impl Timeline {
return Err(e);
}
self.bootstrap(conf, broker_active_set, partial_backup_rate_limiter);
self.bootstrap(conf, broker_active_set);
Ok(())
}
@@ -627,7 +626,6 @@ impl Timeline {
self: &Arc<Timeline>,
conf: &SafeKeeperConf,
broker_active_set: Arc<TimelinesSet>,
partial_backup_rate_limiter: RateLimiter,
) {
let (tx, rx) = self.manager_ctl.bootstrap_manager();
@@ -639,7 +637,6 @@ impl Timeline {
broker_active_set,
tx,
rx,
partial_backup_rate_limiter,
));
}

View File

@@ -32,7 +32,7 @@ use crate::{
timeline_guard::{AccessService, GuardId, ResidenceGuard},
timelines_set::{TimelineSetGuard, TimelinesSet},
wal_backup::{self, WalBackupTaskHandle},
wal_backup_partial::{self, PartialRemoteSegment, RateLimiter},
wal_backup_partial::{self, PartialRemoteSegment},
SafeKeeperConf,
};
@@ -185,7 +185,6 @@ pub(crate) struct Manager {
// misc
pub(crate) access_service: AccessService,
pub(crate) partial_backup_rate_limiter: RateLimiter,
}
/// This task gets spawned alongside each timeline and is responsible for managing the timeline's
@@ -198,7 +197,6 @@ pub async fn main_task(
broker_active_set: Arc<TimelinesSet>,
manager_tx: tokio::sync::mpsc::UnboundedSender<ManagerCtlMessage>,
mut manager_rx: tokio::sync::mpsc::UnboundedReceiver<ManagerCtlMessage>,
partial_backup_rate_limiter: RateLimiter,
) {
tli.set_status(Status::Started);
@@ -211,14 +209,7 @@ pub async fn main_task(
}
};
let mut mgr = Manager::new(
tli,
conf,
broker_active_set,
manager_tx,
partial_backup_rate_limiter,
)
.await;
let mut mgr = Manager::new(tli, conf, broker_active_set, manager_tx).await;
// Start recovery task which always runs on the timeline.
if !mgr.is_offloaded && mgr.conf.peer_recovery_enabled {
@@ -330,7 +321,6 @@ impl Manager {
conf: SafeKeeperConf,
broker_active_set: Arc<TimelinesSet>,
manager_tx: tokio::sync::mpsc::UnboundedSender<ManagerCtlMessage>,
partial_backup_rate_limiter: RateLimiter,
) -> Manager {
let (is_offloaded, partial_backup_uploaded) = tli.bootstrap_mgr().await;
Manager {
@@ -349,7 +339,6 @@ impl Manager {
partial_backup_uploaded,
access_service: AccessService::new(manager_tx),
tli,
partial_backup_rate_limiter,
}
}
@@ -536,7 +525,6 @@ impl Manager {
self.partial_backup_task = Some(tokio::spawn(wal_backup_partial::main_task(
self.wal_resident_timeline(),
self.conf.clone(),
self.partial_backup_rate_limiter.clone(),
)));
}

View File

@@ -5,7 +5,6 @@
use crate::safekeeper::ServerInfo;
use crate::timeline::{get_tenant_dir, get_timeline_dir, Timeline, TimelineError};
use crate::timelines_set::TimelinesSet;
use crate::wal_backup_partial::RateLimiter;
use crate::SafeKeeperConf;
use anyhow::{bail, Context, Result};
use camino::Utf8PathBuf;
@@ -24,7 +23,6 @@ struct GlobalTimelinesState {
conf: Option<SafeKeeperConf>,
broker_active_set: Arc<TimelinesSet>,
load_lock: Arc<tokio::sync::Mutex<TimelineLoadLock>>,
partial_backup_rate_limiter: RateLimiter,
}
// Used to prevent concurrent timeline loading.
@@ -39,12 +37,8 @@ impl GlobalTimelinesState {
}
/// Get dependencies for a timeline constructor.
fn get_dependencies(&self) -> (SafeKeeperConf, Arc<TimelinesSet>, RateLimiter) {
(
self.get_conf().clone(),
self.broker_active_set.clone(),
self.partial_backup_rate_limiter.clone(),
)
fn get_dependencies(&self) -> (SafeKeeperConf, Arc<TimelinesSet>) {
(self.get_conf().clone(), self.broker_active_set.clone())
}
/// Insert timeline into the map. Returns error if timeline with the same id already exists.
@@ -72,7 +66,6 @@ static TIMELINES_STATE: Lazy<Mutex<GlobalTimelinesState>> = Lazy::new(|| {
conf: None,
broker_active_set: Arc::new(TimelinesSet::default()),
load_lock: Arc::new(tokio::sync::Mutex::new(TimelineLoadLock)),
partial_backup_rate_limiter: RateLimiter::new(1),
})
});
@@ -86,7 +79,6 @@ impl GlobalTimelines {
// lock, so use explicit block
let tenants_dir = {
let mut state = TIMELINES_STATE.lock().unwrap();
state.partial_backup_rate_limiter = RateLimiter::new(conf.partial_backup_concurrency);
state.conf = Some(conf);
// Iterate through all directories and load tenants for all directories
@@ -130,7 +122,7 @@ impl GlobalTimelines {
/// this function is called during init when nothing else is running, so
/// this is fine.
async fn load_tenant_timelines(tenant_id: TenantId) -> Result<()> {
let (conf, broker_active_set, partial_backup_rate_limiter) = {
let (conf, broker_active_set) = {
let state = TIMELINES_STATE.lock().unwrap();
state.get_dependencies()
};
@@ -153,11 +145,7 @@ impl GlobalTimelines {
.unwrap()
.timelines
.insert(ttid, tli.clone());
tli.bootstrap(
&conf,
broker_active_set.clone(),
partial_backup_rate_limiter.clone(),
);
tli.bootstrap(&conf, broker_active_set.clone());
}
// If we can't load a timeline, it's most likely because of a corrupted
// directory. We will log an error and won't allow to delete/recreate
@@ -190,8 +178,7 @@ impl GlobalTimelines {
_guard: &tokio::sync::MutexGuard<'a, TimelineLoadLock>,
ttid: TenantTimelineId,
) -> Result<Arc<Timeline>> {
let (conf, broker_active_set, partial_backup_rate_limiter) =
TIMELINES_STATE.lock().unwrap().get_dependencies();
let (conf, broker_active_set) = TIMELINES_STATE.lock().unwrap().get_dependencies();
match Timeline::load_timeline(&conf, ttid) {
Ok(timeline) => {
@@ -204,7 +191,7 @@ impl GlobalTimelines {
.timelines
.insert(ttid, tli.clone());
tli.bootstrap(&conf, broker_active_set, partial_backup_rate_limiter);
tli.bootstrap(&conf, broker_active_set);
Ok(tli)
}
@@ -235,7 +222,7 @@ impl GlobalTimelines {
commit_lsn: Lsn,
local_start_lsn: Lsn,
) -> Result<Arc<Timeline>> {
let (conf, broker_active_set, partial_backup_rate_limiter) = {
let (conf, broker_active_set) = {
let state = TIMELINES_STATE.lock().unwrap();
if let Ok(timeline) = state.get(&ttid) {
// Timeline already exists, return it.
@@ -270,12 +257,7 @@ impl GlobalTimelines {
// Bootstrap is transactional, so if it fails, the timeline will be deleted,
// and the state on disk should remain unchanged.
if let Err(e) = timeline
.init_new(
&mut shared_state,
&conf,
broker_active_set,
partial_backup_rate_limiter,
)
.init_new(&mut shared_state, &conf, broker_active_set)
.await
{
// Note: the most likely reason for init failure is that the timeline

View File

@@ -18,8 +18,6 @@
//! This way control file stores information about all potentially existing
//! remote partial segments and can clean them up after uploading a newer version.
use std::sync::Arc;
use camino::Utf8PathBuf;
use postgres_ffi::{XLogFileName, XLogSegNo, PG_TLI};
use remote_storage::RemotePath;
@@ -29,7 +27,7 @@ use tracing::{debug, error, info, instrument, warn};
use utils::lsn::Lsn;
use crate::{
metrics::{MISC_OPERATION_SECONDS, PARTIAL_BACKUP_UPLOADED_BYTES, PARTIAL_BACKUP_UPLOADS},
metrics::{PARTIAL_BACKUP_UPLOADED_BYTES, PARTIAL_BACKUP_UPLOADS},
safekeeper::Term,
timeline::WalResidentTimeline,
timeline_manager::StateSnapshot,
@@ -37,30 +35,6 @@ use crate::{
SafeKeeperConf,
};
#[derive(Clone)]
pub struct RateLimiter {
semaphore: Arc<tokio::sync::Semaphore>,
}
impl RateLimiter {
pub fn new(permits: usize) -> Self {
Self {
semaphore: Arc::new(tokio::sync::Semaphore::new(permits)),
}
}
async fn acquire_owned(&self) -> tokio::sync::OwnedSemaphorePermit {
let _timer = MISC_OPERATION_SECONDS
.with_label_values(&["partial_permit_acquire"])
.start_timer();
self.semaphore
.clone()
.acquire_owned()
.await
.expect("semaphore is closed")
}
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum UploadStatus {
/// Upload is in progress. This status should be used only for garbage collection,
@@ -234,9 +208,6 @@ impl PartialBackup {
/// Upload the latest version of the partial segment and garbage collect older versions.
#[instrument(name = "upload", skip_all, fields(name = %prepared.name))]
async fn do_upload(&mut self, prepared: &PartialRemoteSegment) -> anyhow::Result<()> {
let _timer = MISC_OPERATION_SECONDS
.with_label_values(&["partial_do_upload"])
.start_timer();
info!("starting upload {:?}", prepared);
let state_0 = self.state.clone();
@@ -336,7 +307,6 @@ pub(crate) fn needs_uploading(
pub async fn main_task(
tli: WalResidentTimeline,
conf: SafeKeeperConf,
limiter: RateLimiter,
) -> Option<PartialRemoteSegment> {
debug!("started");
let await_duration = conf.partial_backup_timeout;
@@ -441,9 +411,6 @@ pub async fn main_task(
continue 'outer;
}
// limit concurrent uploads
let _upload_permit = limiter.acquire_owned().await;
let prepared = backup.prepare_upload().await;
if let Some(seg) = &uploaded_segment {
if seg.eq_without_status(&prepared) {

View File

@@ -187,7 +187,6 @@ pub fn run_server(os: NodeOs, disk: Arc<SafekeeperDisk>) -> Result<()> {
enable_offload: false,
delete_offloaded_wal: false,
control_file_save_interval: Duration::from_secs(1),
partial_backup_concurrency: 1,
};
let mut global = GlobalMap::new(disk, conf.clone())?;

View File

@@ -10,9 +10,8 @@ use hyper::header::CONTENT_TYPE;
use hyper::{Body, Request, Response};
use hyper::{StatusCode, Uri};
use metrics::{BuildInfo, NeonMetrics};
use pageserver_api::controller_api::TenantCreateRequest;
use pageserver_api::models::{
TenantConfigRequest, TenantLocationConfigRequest, TenantShardSplitRequest,
TenantConfigRequest, TenantCreateRequest, TenantLocationConfigRequest, TenantShardSplitRequest,
TenantTimeTravelRequest, TimelineCreateRequest,
};
use pageserver_api::shard::TenantShardId;

View File

@@ -1,7 +1,6 @@
use crate::pageserver_client::PageserverClient;
use crate::persistence::Persistence;
use crate::service;
use pageserver_api::controller_api::PlacementPolicy;
use pageserver_api::models::{
LocationConfig, LocationConfigMode, LocationConfigSecondary, TenantConfig,
};
@@ -30,7 +29,6 @@ pub(super) struct Reconciler {
/// of a tenant's state from when we spawned a reconcile task.
pub(super) tenant_shard_id: TenantShardId,
pub(crate) shard: ShardIdentity,
pub(crate) placement_policy: PlacementPolicy,
pub(crate) generation: Option<Generation>,
pub(crate) intent: TargetState,
@@ -643,7 +641,7 @@ impl Reconciler {
generation,
&self.shard,
&self.config,
&self.placement_policy,
!self.intent.secondary.is_empty(),
);
match self.observed.locations.get(&node.get_id()) {
Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {
@@ -803,15 +801,8 @@ pub(crate) fn attached_location_conf(
generation: Generation,
shard: &ShardIdentity,
config: &TenantConfig,
policy: &PlacementPolicy,
has_secondaries: bool,
) -> LocationConfig {
let has_secondaries = match policy {
PlacementPolicy::Attached(0) | PlacementPolicy::Detached | PlacementPolicy::Secondary => {
false
}
PlacementPolicy::Attached(_) => true,
};
LocationConfig {
mode: LocationConfigMode::AttachedSingle,
generation: generation.into(),

View File

@@ -32,10 +32,10 @@ use itertools::Itertools;
use pageserver_api::{
controller_api::{
NodeAvailability, NodeRegisterRequest, NodeSchedulingPolicy, PlacementPolicy,
ShardSchedulingPolicy, TenantCreateRequest, TenantCreateResponse,
TenantCreateResponseShard, TenantDescribeResponse, TenantDescribeResponseShard,
TenantLocateResponse, TenantPolicyRequest, TenantShardMigrateRequest,
TenantShardMigrateResponse, UtilizationScore,
ShardSchedulingPolicy, TenantCreateResponse, TenantCreateResponseShard,
TenantDescribeResponse, TenantDescribeResponseShard, TenantLocateResponse,
TenantPolicyRequest, TenantShardMigrateRequest, TenantShardMigrateResponse,
UtilizationScore,
},
models::{SecondaryProgress, TenantConfigRequest, TopTenantShardsRequest},
};
@@ -46,9 +46,10 @@ use crate::pageserver_client::PageserverClient;
use pageserver_api::{
models::{
self, LocationConfig, LocationConfigListResponse, LocationConfigMode,
PageserverUtilization, ShardParameters, TenantConfig, TenantLocationConfigRequest,
TenantLocationConfigResponse, TenantShardLocation, TenantShardSplitRequest,
TenantShardSplitResponse, TenantTimeTravelRequest, TimelineCreateRequest, TimelineInfo,
PageserverUtilization, ShardParameters, TenantConfig, TenantCreateRequest,
TenantLocationConfigRequest, TenantLocationConfigResponse, TenantShardLocation,
TenantShardSplitRequest, TenantShardSplitResponse, TenantTimeTravelRequest,
TimelineCreateRequest, TimelineInfo,
},
shard::{ShardCount, ShardIdentity, ShardNumber, ShardStripeSize, TenantShardId},
upcall_api::{
@@ -1390,7 +1391,7 @@ impl Service {
tenant_shard.generation.unwrap(),
&tenant_shard.shard,
&tenant_shard.config,
&PlacementPolicy::Attached(0),
false,
)),
},
)]);
@@ -3321,7 +3322,7 @@ impl Service {
generation,
&child_shard,
&config,
&policy,
matches!(policy, PlacementPolicy::Attached(n) if n > 0),
)),
},
);

View File

@@ -908,8 +908,12 @@ impl TenantShard {
.generation
.expect("Attempted to enter attached state without a generation");
let wanted_conf =
attached_location_conf(generation, &self.shard, &self.config, &self.policy);
let wanted_conf = attached_location_conf(
generation,
&self.shard,
&self.config,
!self.intent.secondary.is_empty(),
);
match self.observed.locations.get(&node_id) {
Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {}
Some(_) | None => {
@@ -1095,7 +1099,6 @@ impl TenantShard {
let mut reconciler = Reconciler {
tenant_shard_id: self.tenant_shard_id,
shard: self.shard,
placement_policy: self.policy.clone(),
generation: self.generation,
intent: reconciler_intent,
detach,

View File

@@ -2741,19 +2741,7 @@ class NeonPageserver(PgProtocol, LogUtils):
if generation is None:
generation = self.env.storage_controller.attach_hook_issue(tenant_id, self.id)
client = self.http_client(auth_token=auth_token)
conf = conf or {}
client.tenant_location_conf(
tenant_id,
{
"mode": "AttachedSingle",
"generation": generation,
"tenant_conf": conf,
"secondary_conf": None,
},
)
return tenant_id
return client.tenant_create(tenant_id, conf, generation=generation)
def list_layers(
self, tenant_id: Union[TenantId, TenantShardId], timeline_id: TimelineId
@@ -3491,6 +3479,7 @@ class Endpoint(PgProtocol, LogUtils):
):
super().__init__(host="localhost", port=pg_port, user="cloud_admin", dbname="postgres")
self.env = env
self.running = False
self.branch_name: Optional[str] = None # dubious
self.endpoint_id: Optional[str] = None # dubious, see asserts below
self.pgdata_dir: Optional[str] = None # Path to computenode PGDATA
@@ -3856,9 +3845,7 @@ class EndpointFactory:
return self
def new_replica(
self, origin: Endpoint, endpoint_id: str, config_lines: Optional[List[str]] = None
):
def new_replica(self, origin: Endpoint, endpoint_id: str, config_lines: Optional[List[str]]):
branch_name = origin.branch_name
assert origin in self.endpoints
assert branch_name is not None

View File

@@ -220,6 +220,34 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
assert isinstance(res_json, list)
return res_json
def tenant_create(
self,
new_tenant_id: Union[TenantId, TenantShardId],
conf: Optional[Dict[str, Any]] = None,
generation: Optional[int] = None,
) -> TenantId:
if conf is not None:
assert "new_tenant_id" not in conf.keys()
body: Dict[str, Any] = {
"new_tenant_id": str(new_tenant_id),
**(conf or {}),
}
if generation is not None:
body.update({"generation": generation})
res = self.post(
f"http://localhost:{self.port}/v1/tenant",
json=body,
)
self.verbose_error(res)
if res.status_code == 409:
raise Exception(f"could not create tenant: already exists for id {new_tenant_id}")
new_tenant_id = res.json()
assert isinstance(new_tenant_id, str)
return TenantId(new_tenant_id)
def tenant_attach(
self,
tenant_id: Union[TenantId, TenantShardId],

View File

@@ -198,7 +198,7 @@ def wait_for_last_record_lsn(
lsn: Lsn,
) -> Lsn:
"""waits for pageserver to catch up to a certain lsn, returns the last observed lsn."""
for i in range(1000):
for i in range(100):
current_lsn = last_record_lsn(pageserver_http, tenant, timeline)
if current_lsn >= lsn:
return current_lsn

View File

@@ -7,7 +7,6 @@ from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnvBuilder,
PgBin,
VanillaPostgres,
import_timeline_from_vanilla_postgres,
wait_for_wal_insert_lsn,
)
@@ -183,275 +182,3 @@ def test_import_at_2bil(
cur = conn.cursor()
cur.execute("SELECT count(*) from t")
assert cur.fetchone() == (10000 + 1 + 1,)
# Constants and macros copied from PostgreSQL multixact.c and headers. These are needed to
# calculate the SLRU segments that a particular multixid or multixid-offsets falls into.
BLCKSZ = 8192
MULTIXACT_OFFSETS_PER_PAGE = int(BLCKSZ / 4)
SLRU_PAGES_PER_SEGMENT = int(32)
MXACT_MEMBER_BITS_PER_XACT = 8
MXACT_MEMBER_FLAGS_PER_BYTE = 1
MULTIXACT_FLAGBYTES_PER_GROUP = 4
MULTIXACT_MEMBERS_PER_MEMBERGROUP = MULTIXACT_FLAGBYTES_PER_GROUP * MXACT_MEMBER_FLAGS_PER_BYTE
MULTIXACT_MEMBERGROUP_SIZE = 4 * MULTIXACT_MEMBERS_PER_MEMBERGROUP + MULTIXACT_FLAGBYTES_PER_GROUP
MULTIXACT_MEMBERGROUPS_PER_PAGE = int(BLCKSZ / MULTIXACT_MEMBERGROUP_SIZE)
MULTIXACT_MEMBERS_PER_PAGE = MULTIXACT_MEMBERGROUPS_PER_PAGE * MULTIXACT_MEMBERS_PER_MEMBERGROUP
def MultiXactIdToOffsetSegment(xid: int):
return int(xid / (SLRU_PAGES_PER_SEGMENT * MULTIXACT_OFFSETS_PER_PAGE))
def MXOffsetToMemberSegment(off: int):
return int(off / (SLRU_PAGES_PER_SEGMENT * MULTIXACT_MEMBERS_PER_PAGE))
def advance_multixid_to(
pg_bin: PgBin, vanilla_pg: VanillaPostgres, next_multi_xid: int, next_multi_offset: int
):
"""
Use pg_resetwal to advance the nextMulti and nextMultiOffset values in a stand-alone
Postgres cluster. This is useful to get close to wraparound or some other interesting
value, without having to burn a lot of time consuming the (multi-)XIDs one by one.
The new values should be higher than the old ones, in a wraparound-aware sense.
On entry, the server should be running. It will be shut down and restarted.
"""
# Read old values from the last checkpoint. We will pass the old oldestMultiXid value
# back to pg_resetwal, there's no option to leave it alone.
with vanilla_pg.connect() as conn:
with conn.cursor() as cur:
# Make sure the oldest-multi-xid value in the control file is up-to-date
cur.execute("checkpoint")
cur.execute("select oldest_multi_xid, next_multixact_id from pg_control_checkpoint()")
rec = cur.fetchone()
assert rec is not None
(ckpt_oldest_multi_xid, ckpt_next_multi_xid) = rec
log.info(f"oldestMultiXid was {ckpt_oldest_multi_xid}, nextMultiXid was {ckpt_next_multi_xid}")
log.info(f"Resetting to {next_multi_xid}")
# Use pg_resetwal to reset the next multiXid and multiOffset to given values.
vanilla_pg.stop()
pg_resetwal_path = os.path.join(pg_bin.pg_bin_path, "pg_resetwal")
cmd = [
pg_resetwal_path,
f"--multixact-ids={next_multi_xid},{ckpt_oldest_multi_xid}",
f"--multixact-offset={next_multi_offset}",
"-D",
str(vanilla_pg.pgdatadir),
]
pg_bin.run_capture(cmd)
# Because we skip over a lot of values, Postgres hasn't created the SLRU segments for
# the new values yet. Create them manually, to allow Postgres to start up.
#
# This leaves "gaps" in the SLRU where segments between old value and new value are
# missing. That's OK for our purposes. Autovacuum will print some warnings about the
# missing segments, but will clean it up by truncating the SLRUs up to the new value,
# closing the gap.
segname = "%04X" % MultiXactIdToOffsetSegment(next_multi_xid)
log.info(f"Creating dummy segment pg_multixact/offsets/{segname}")
with open(vanilla_pg.pgdatadir / "pg_multixact" / "offsets" / segname, "w") as of:
of.write("\0" * SLRU_PAGES_PER_SEGMENT * BLCKSZ)
of.flush()
segname = "%04X" % MXOffsetToMemberSegment(next_multi_offset)
log.info(f"Creating dummy segment pg_multixact/members/{segname}")
with open(vanilla_pg.pgdatadir / "pg_multixact" / "members" / segname, "w") as of:
of.write("\0" * SLRU_PAGES_PER_SEGMENT * BLCKSZ)
of.flush()
# Start Postgres again and wait until autovacuum has processed all the databases
#
# This allows truncating the SLRUs, fixing the gaps with missing segments.
vanilla_pg.start()
with vanilla_pg.connect().cursor() as cur:
for _ in range(1000):
datminmxid = int(
query_scalar(cur, "select min(datminmxid::text::int8) from pg_database")
)
log.info(f"datminmxid {datminmxid}")
if next_multi_xid - datminmxid < 1_000_000: # not wraparound-aware!
break
time.sleep(0.5)
def test_multixid_wraparound_import(
neon_env_builder: NeonEnvBuilder,
test_output_dir: Path,
pg_bin: PgBin,
vanilla_pg,
):
"""
Test that the wraparound of the "next-multi-xid" counter is handled correctly in
pageserver, And multi-offsets as well
"""
env = neon_env_builder.init_start()
# In order to to test multixid wraparound, we need to first advance the counter to
# within spitting distance of the wraparound, that is 2^32 multi-XIDs. We could simply
# run a workload that consumes a lot of multi-XIDs until we approach that, but that
# takes a very long time. So we cheat.
#
# Our strategy is to create a vanilla Postgres cluster, and use pg_resetwal to
# directly set the multi-xid counter a higher value. However, we cannot directly set
# it to just before 2^32 (~ 4 billion), because that would make the exisitng
# 'relminmxid' values to look like they're in the future. It's not clear how the
# system would behave in that situation. So instead, we bump it up ~ 1 billion
# multi-XIDs at a time, and let autovacuum to process all the relations and update
# 'relminmxid' between each run.
#
# XXX: For the multi-offsets, most of the bump is done in the last call. This is
# because advancing it ~ 1 billion at a time hit a pathological case in the
# MultiXactMemberFreezeThreshold() function, causing autovacuum not trigger multixid
# freezing. See
# https://www.postgresql.org/message-id/85fb354c-f89f-4d47-b3a2-3cbd461c90a3%40iki.fi
# Multi-offsets don't have the same wraparound problems at 2 billion mark as
# multi-xids do, so one big jump is fine.
vanilla_pg.configure(
[
"log_autovacuum_min_duration = 0",
# Perform anti-wraparound vacuuming aggressively
"autovacuum_naptime='1 s'",
"autovacuum_freeze_max_age = 1000000",
"autovacuum_multixact_freeze_max_age = 1000000",
],
)
vanilla_pg.start()
advance_multixid_to(pg_bin, vanilla_pg, 0x40000000, 0x10000000)
advance_multixid_to(pg_bin, vanilla_pg, 0x80000000, 0x20000000)
advance_multixid_to(pg_bin, vanilla_pg, 0xC0000000, 0x30000000)
advance_multixid_to(pg_bin, vanilla_pg, 0xFFFFFF00, 0xFFFFFF00)
vanilla_pg.safe_psql("create user cloud_admin with password 'postgres' superuser")
vanilla_pg.safe_psql("create table tt as select g as id from generate_series(1, 10) g")
vanilla_pg.safe_psql("CHECKPOINT")
# Import the cluster to the pageserver
tenant_id = TenantId.generate()
env.pageserver.tenant_create(tenant_id)
timeline_id = TimelineId.generate()
import_timeline_from_vanilla_postgres(
test_output_dir,
env,
pg_bin,
tenant_id,
timeline_id,
"imported_multixid_wraparound_test",
vanilla_pg.connstr(),
)
vanilla_pg.stop()
endpoint = env.endpoints.create_start(
"imported_multixid_wraparound_test",
tenant_id=tenant_id,
config_lines=[
"log_autovacuum_min_duration = 0",
"autovacuum_naptime='5 s'",
"autovacuum=off",
],
)
conn = endpoint.connect()
cur = conn.cursor()
assert query_scalar(cur, "select count(*) from tt") == 10 # sanity check
# Install extension containing function needed for test
cur.execute("CREATE EXTENSION neon_test_utils")
# Consume a lot of XIDs, just to advance the XIDs to different range than the
# multi-xids. That avoids confusion while debugging
cur.execute("select test_consume_xids(100000)")
cur.execute("select pg_switch_wal()")
cur.execute("checkpoint")
# Use subtransactions so that each row in 'tt' is stamped with different XID. Leave
# the transaction open.
cur.execute("BEGIN")
cur.execute(
"""
do $$
declare
idvar int;
begin
for idvar in select id from tt loop
begin
update tt set id = idvar where id = idvar;
exception when others then
raise 'didn''t expect an error: %', sqlerrm;
end;
end loop;
end;
$$;
"""
)
# In a different transaction, acquire a FOR KEY SHARE lock on each row. This generates
# a new multixid for each row, with the previous xmax and this transaction's XID as the
# members.
#
# Repeat this until the multi-xid counter wraps around.
conn3 = endpoint.connect()
cur3 = conn3.cursor()
next_multixact_id_before_restart = 0
observed_before_wraparound = False
while True:
cur3.execute("BEGIN")
cur3.execute("SELECT * FROM tt FOR KEY SHARE")
# Get the xmax of one of the rows we locked. It should be a multi-xid. It might
# not be the latest one, but close enough.
row_xmax = int(query_scalar(cur3, "SELECT xmax FROM tt LIMIT 1"))
cur3.execute("COMMIT")
log.info(f"observed a row with xmax {row_xmax}")
# High value means not wrapped around yet
if row_xmax >= 0xFFFFFF00:
observed_before_wraparound = True
continue
# xmax should not be a regular XID. (We bumped up the regular XID range earlier
# to around 100000 and above.)
assert row_xmax < 100
# xmax values < FirstNormalTransactionId (== 3) could be special XID values, or
# multixid values after wraparound. We don't know for sure which, so keep going to
# be sure we see value that's unambiguously a wrapped-around multixid
if row_xmax < 3:
continue
next_multixact_id_before_restart = row_xmax
log.info(
f"next_multixact_id is now at {next_multixact_id_before_restart} or a little higher"
)
break
# We should have observed the state before wraparound
assert observed_before_wraparound
cur.execute("COMMIT")
# Wait until pageserver has received all the data, and restart the endpoint
wait_for_wal_insert_lsn(env, endpoint, tenant_id, timeline_id)
endpoint.stop(mode="immediate") # 'immediate' to avoid writing shutdown checkpoint
endpoint.start()
# Check that the next-multixid value wrapped around correctly
conn = endpoint.connect()
cur = conn.cursor()
cur.execute("select next_multixact_id from pg_control_checkpoint()")
next_multixact_id_after_restart = int(
query_scalar(cur, "select next_multixact_id from pg_control_checkpoint()")
)
log.info(f"next_multixact_id after restart: {next_multixact_id_after_restart}")
assert next_multixact_id_after_restart >= next_multixact_id_before_restart
# The multi-offset should wrap around as well
cur.execute("select next_multi_offset from pg_control_checkpoint()")
next_multi_offset_after_restart = int(
query_scalar(cur, "select next_multi_offset from pg_control_checkpoint()")
)
log.info(f"next_multi_offset after restart: {next_multi_offset_after_restart}")
assert next_multi_offset_after_restart < 100000

View File

@@ -85,10 +85,8 @@ def check_client(env: NeonEnv, client: PageserverHttpClient):
# create new tenant and check it is also there
tenant_id = TenantId.generate()
env.pageserver.tenant_create(
tenant_id,
generation=env.storage_controller.attach_hook_issue(tenant_id, env.pageserver.id),
auth_token=client.auth_token,
client.tenant_create(
tenant_id, generation=env.storage_controller.attach_hook_issue(tenant_id, env.pageserver.id)
)
assert tenant_id in {TenantId(t["id"]) for t in client.tenant_list()}

View File

@@ -1,646 +0,0 @@
"""
In PostgreSQL, a standby always has to wait for a running-xacts WAL record to
arrive before it can start accepting queries. Furthermore, if there are
transactions with too many subxids (> 64) open to fit in the in-memory subxids
cache, the running-xacts record will be marked as "suboverflowed", and the
standby will need to also wait for the currently in-progress transactions to
finish.
In Neon, we have an additional mechanism that scans the CLOG at server startup
to determine the list of running transactions, so that the standby can start up
immediately without waiting for the running-xacts record, but that mechanism
only works if the # of active (sub-)transactions is reasonably small. Otherwise
it falls back to waiting. Furthermore, it's somewhat optimistic in using up the
known-assigned XIDs array: if too many transactions with subxids are started in
the primary later, the replay in the replica will crash with "too many
KnownAssignedXids" error.
This module contains tests for those various cases at standby startup: starting
from shutdown checkpoint, using the CLOG scanning mechanism, waiting for
running-xacts record and for in-progress transactions to finish etc.
"""
import threading
from contextlib import closing
import psycopg2
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, wait_for_last_flush_lsn, wait_replica_caughtup
from fixtures.pg_version import PgVersion
from fixtures.utils import query_scalar, wait_until
CREATE_SUBXACTS_FUNC = """
create or replace function create_subxacts(n integer) returns void as $$
declare
i integer;
begin
for i in 1..n loop
begin
insert into t (payload) values (0);
exception
when others then
raise exception 'caught something: %', sqlerrm;
end;
end loop;
end; $$ language plpgsql
"""
def test_replica_start_scan_clog(neon_simple_env: NeonEnv):
"""
Test the CLOG-scanning mechanism at hot standby startup. There is one
transaction active in the primary when the standby is started. The primary
is killed before it has a chance to write a running-xacts record. The
CLOG-scanning at neon startup allows the standby to start up anyway.
See the module docstring for background.
"""
# Initialize the primary, a test table, and a helper function to create lots
# of subtransactions.
env = neon_simple_env
primary = env.endpoints.create_start(branch_name="main", endpoint_id="primary")
primary_conn = primary.connect()
primary_cur = primary_conn.cursor()
primary_cur.execute("CREATE EXTENSION neon_test_utils")
primary_cur.execute("create table t(pk serial primary key, payload integer)")
primary_cur.execute(CREATE_SUBXACTS_FUNC)
primary_cur.execute("select pg_switch_wal()")
# Start a transaction in the primary. Leave the transaction open.
#
# The transaction has some subtransactions, but not too many to cause the
# CLOG-scanning mechanism to give up.
primary_cur.execute("begin")
primary_cur.execute("select create_subxacts(50)")
# Wait for the WAL to be flushed, but then immediately kill the primary,
# before it has a chance to generate a running-xacts record.
primary_cur.execute("select neon_xlogflush()")
wait_for_last_flush_lsn(env, primary, env.initial_tenant, env.initial_timeline)
primary.stop(mode="immediate")
# Create a replica. It should start up normally, thanks to the CLOG-scanning
# mechanism.
secondary = env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary")
# The transaction did not commit, so it should not be visible in the secondary
secondary_conn = secondary.connect()
secondary_cur = secondary_conn.cursor()
secondary_cur.execute("select count(*) from t")
assert secondary_cur.fetchone() == (0,)
def test_replica_start_scan_clog_crashed_xids(neon_simple_env: NeonEnv):
"""
Test the CLOG-scanning mechanism at hot standby startup, after
leaving behind crashed transactions.
See the module docstring for background.
"""
# Initialize the primary, a test table, and a helper function to create lots
# of subtransactions.
env = neon_simple_env
primary = env.endpoints.create_start(branch_name="main", endpoint_id="primary")
primary_conn = primary.connect()
primary_cur = primary_conn.cursor()
primary_cur.execute("create table t(pk serial primary key, payload integer)")
primary_cur.execute(CREATE_SUBXACTS_FUNC)
primary_cur.execute("select pg_switch_wal()")
# Consume a lot of XIDs, then kill Postgres without giving it a
# chance to write abort records for them.
primary_cur.execute("begin")
primary_cur.execute("select create_subxacts(100000)")
primary.stop(mode="immediate")
# Restart the primary. Do some light work, and shut it down cleanly
primary.start()
primary_conn = primary.connect()
primary_cur = primary_conn.cursor()
primary_cur.execute("insert into t (payload) values (0)")
primary.stop(mode="fast")
# Create a replica. It should start up normally, thanks to the CLOG-scanning
# mechanism. (Restarting the primary writes a checkpoint and/or running-xacts
# record, which allows the standby to know that the crashed XIDs are aborted)
secondary = env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary")
secondary_conn = secondary.connect()
secondary_cur = secondary_conn.cursor()
secondary_cur.execute("select count(*) from t")
assert secondary_cur.fetchone() == (1,)
def test_replica_start_at_running_xacts(neon_simple_env: NeonEnv, pg_version):
"""
Test that starting a replica works right after the primary has
created a running-xacts record. This may seem like a trivial case,
but during development, we had a bug that was triggered by having
oldestActiveXid == nextXid. Starting right after a running-xacts
record is one way to test that case.
See the module docstring for background.
"""
env = neon_simple_env
if env.pg_version == PgVersion.V14 or env.pg_version == PgVersion.V15:
pytest.skip("pg_log_standby_snapshot() function is available only in PG16")
primary = env.endpoints.create_start(branch_name="main", endpoint_id="primary")
primary_conn = primary.connect()
primary_cur = primary_conn.cursor()
primary_cur.execute("CREATE EXTENSION neon_test_utils")
primary_cur.execute("select pg_log_standby_snapshot()")
primary_cur.execute("select neon_xlogflush()")
wait_for_last_flush_lsn(env, primary, env.initial_tenant, env.initial_timeline)
secondary = env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary")
secondary_conn = secondary.connect()
secondary_cur = secondary_conn.cursor()
secondary_cur.execute("select 123")
assert secondary_cur.fetchone() == (123,)
def test_replica_start_wait_subxids_finish(neon_simple_env: NeonEnv):
"""
Test replica startup when there are a lot of (sub)transactions active in the
primary. That's too many for the CLOG-scanning mechanism to handle, so the
replica has to wait for the large transaction to finish before it starts to
accept queries.
After replica startup, test MVCC with transactions that were in-progress
when the replica was started.
See the module docstring for background.
"""
# Initialize the primary, a test table, and a helper function to create
# lots of subtransactions.
env = neon_simple_env
primary = env.endpoints.create_start(branch_name="main", endpoint_id="primary")
primary_conn = primary.connect()
primary_cur = primary_conn.cursor()
primary_cur.execute("create table t(pk serial primary key, payload integer)")
primary_cur.execute(CREATE_SUBXACTS_FUNC)
# Start a transaction with 100000 subtransactions, and leave it open. That's
# too many to fit in the "known-assigned XIDs array" in the replica, and
# also too many to fit in the subxid caches so the running-xacts record will
# also overflow.
primary_cur.execute("begin")
primary_cur.execute("select create_subxacts(100000)")
# Start another, smaller transaction in the primary. We'll come back to this
# later.
primary_conn2 = primary.connect()
primary_cur2 = primary_conn2.cursor()
primary_cur2.execute("begin")
primary_cur2.execute("insert into t (payload) values (0)")
# Create a replica. but before that, wait for the wal to be flushed to
# safekeepers, so that the replica is started at a point where the large
# transaction is already active. (The whole transaction might not be flushed
# yet, but that's OK.)
#
# Start it in a separate thread, so that we can do other stuff while it's
# blocked waiting for the startup to finish.
wait_for_last_flush_lsn(env, primary, env.initial_tenant, env.initial_timeline)
secondary = env.endpoints.new_replica(origin=primary, endpoint_id="secondary")
start_secondary_thread = threading.Thread(target=secondary.start)
start_secondary_thread.start()
# Verify that the replica has otherwise started up, but cannot start
# accepting queries yet.
log.info("Waiting 5 s to verify that the secondary does not start")
start_secondary_thread.join(5)
assert secondary.log_contains("consistent recovery state reached")
assert secondary.log_contains("started streaming WAL from primary")
# The "redo starts" message is printed when the first WAL record is
# received. It might or might not be present in the log depending on how
# far exactly the WAL was flushed when the replica was started, and whether
# background activity caused any more WAL records to be flushed on the
# primary afterwards.
#
# assert secondary.log_contains("redo # starts")
# should not be open for connections yet
assert start_secondary_thread.is_alive()
assert not secondary.is_running()
assert not secondary.log_contains("database system is ready to accept read-only connections")
# Commit the large transaction in the primary.
#
# Within the next 15 s, the primary should write a new running-xacts record
# to the WAL which shows the transaction as completed. Once the replica
# replays that record, it will start accepting queries.
primary_cur.execute("commit")
start_secondary_thread.join()
# Verify that the large transaction is correctly visible in the secondary
# (but not the second, small transaction, which is still in-progress!)
secondary_conn = secondary.connect()
secondary_cur = secondary_conn.cursor()
secondary_cur.execute("select count(*) from t")
assert secondary_cur.fetchone() == (100000,)
# Perform some more MVCC testing using the second transaction that was
# started in the primary before the replica was created
primary_cur2.execute("select create_subxacts(10000)")
# The second transaction still hasn't committed
wait_replica_caughtup(primary, secondary)
secondary_cur.execute("BEGIN ISOLATION LEVEL REPEATABLE READ")
secondary_cur.execute("select count(*) from t")
assert secondary_cur.fetchone() == (100000,)
# Commit the second transaction in the primary
primary_cur2.execute("commit")
# Should still be invisible to the old snapshot
wait_replica_caughtup(primary, secondary)
secondary_cur.execute("select count(*) from t")
assert secondary_cur.fetchone() == (100000,)
# Commit the REPEATABLE READ transaction in the replica. Both
# primary transactions should now be visible to a new snapshot.
secondary_cur.execute("commit")
secondary_cur.execute("select count(*) from t")
assert secondary_cur.fetchone() == (110001,)
def test_replica_too_many_known_assigned_xids(neon_simple_env: NeonEnv):
"""
The CLOG-scanning mechanism fills the known-assigned XIDs array
optimistically at standby startup, betting that it can still fit
upcoming transactions replayed later from the WAL in the
array. This test tests what happens when that bet fails and the
known-assigned XID array fills up after the standby has already
been started. The WAL redo will fail with an error:
FATAL: too many KnownAssignedXids
CONTEXT: WAL redo at 0/1895CB0 for neon/INSERT: off: 25, flags: 0x08; blkref #0: rel 1663/5/16385, blk 64
which causes the standby to shut down.
See the module docstring for background.
"""
# Initialize the primary, a test table, and a helper function to create lots
# of subtransactions.
env = neon_simple_env
primary = env.endpoints.create_start(branch_name="main", endpoint_id="primary")
primary_conn = primary.connect()
primary_cur = primary_conn.cursor()
primary_cur.execute("CREATE EXTENSION neon_test_utils")
primary_cur.execute("create table t(pk serial primary key, payload integer)")
primary_cur.execute(CREATE_SUBXACTS_FUNC)
# Determine how many connections we can use
primary_cur.execute("show max_connections")
max_connections = int(primary_cur.fetchall()[0][0])
primary_cur.execute("show superuser_reserved_connections")
superuser_reserved_connections = int(primary_cur.fetchall()[0][0])
n_connections = max_connections - superuser_reserved_connections
n_subxids = 200
# Start one top transaction in primary, with lots of subtransactions. This
# uses up much of the known-assigned XIDs space in the standby, but doesn't
# cause it to overflow.
large_p_conn = primary.connect()
large_p_cur = large_p_conn.cursor()
large_p_cur.execute("begin")
large_p_cur.execute(f"select create_subxacts({max_connections} * 30)")
with closing(primary.connect()) as small_p_conn:
with small_p_conn.cursor() as small_p_cur:
small_p_cur.execute("select create_subxacts(1)")
# Create a replica at this LSN
primary_cur.execute("select neon_xlogflush()")
wait_for_last_flush_lsn(env, primary, env.initial_tenant, env.initial_timeline)
secondary = env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary")
secondary_conn = secondary.connect()
secondary_cur = secondary_conn.cursor()
# The transaction in primary has not committed yet.
wait_replica_caughtup(primary, secondary)
secondary_cur.execute("select count(*) from t")
assert secondary_cur.fetchone() == (1,)
# Start max number of top transactions in primary, with a lot of
# subtransactions each. We add the subtransactions to each top transaction
# in a round-robin fashion, instead of adding a lot of subtransactions to
# one top transaction at a time. This way, we will have the max number of
# subtransactions in the in-memory subxid cache of each top transaction,
# until they all overflow.
#
# Currently, PGPROC_MAX_CACHED_SUBXIDS == 64, so this will overflow the all
# the subxid caches after creating 64 subxids in each top transaction. The
# point just before the caches have overflowed is the most interesting point
# in time, but we'll keep going beyond that, to ensure that this test is
# robust even if PGPROC_MAX_CACHED_SUBXIDS changes.
p_curs = []
for _ in range(0, n_connections):
p_cur = primary.connect().cursor()
p_cur.execute("begin")
p_curs.append(p_cur)
for _subxid in range(0, n_subxids):
for i in range(0, n_connections):
p_curs[i].execute("select create_subxacts(1)")
# Commit all the transactions in the primary
for i in range(0, n_connections):
p_curs[i].execute("commit")
large_p_cur.execute("commit")
# Wait until the replica crashes with "too many KnownAssignedXids" error.
def check_replica_crashed():
try:
secondary.connect()
except psycopg2.Error:
# Once the connection fails, return success
return None
raise RuntimeError("connection succeeded")
wait_until(20, 0.5, check_replica_crashed)
assert secondary.log_contains("too many KnownAssignedXids")
# Replica is crashed, so ignore stop result
secondary.check_stop_result = False
def test_replica_start_repro_visibility_bug(neon_simple_env: NeonEnv):
"""
Before PR #7288, a hot standby in neon incorrectly started up
immediately, before it had received a running-xacts record. That
led to visibility bugs if there were active transactions in the
primary. This test reproduces the incorrect query results and
incorrectly set hint bits, before that was fixed.
"""
env = neon_simple_env
primary = env.endpoints.create_start(branch_name="main", endpoint_id="primary")
p_cur = primary.connect().cursor()
p_cur.execute("begin")
p_cur.execute("create table t(pk integer primary key, payload integer)")
p_cur.execute("insert into t values (generate_series(1,100000), 0)")
secondary = env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary")
wait_replica_caughtup(primary, secondary)
s_cur = secondary.connect().cursor()
# Set hint bits for pg_class tuples. If primary's transaction is
# not marked as in-progress in MVCC snapshot, then XMIN_INVALID
# hint bit will be set for table's 't' tuple, making it invisible
# even after the commit record is replayed later.
s_cur.execute("select * from pg_class")
p_cur.execute("commit")
wait_replica_caughtup(primary, secondary)
s_cur.execute("select * from t where pk = 1")
assert s_cur.fetchone() == (1, 0)
@pytest.mark.parametrize("shutdown", [True, False])
def test_replica_start_with_prepared_xacts(neon_simple_env: NeonEnv, shutdown: bool):
"""
Test the CLOG-scanning mechanism at hot standby startup in the presence of
prepared transactions.
This test is run in two variants: one where the primary server is shut down
before starting the secondary, or not.
"""
# Initialize the primary, a test table, and a helper function to create lots
# of subtransactions.
env = neon_simple_env
primary = env.endpoints.create_start(
branch_name="main", endpoint_id="primary", config_lines=["max_prepared_transactions=5"]
)
primary_conn = primary.connect()
primary_cur = primary_conn.cursor()
primary_cur.execute("CREATE EXTENSION neon_test_utils")
primary_cur.execute("create table t(pk serial primary key, payload integer)")
primary_cur.execute("create table t1(pk integer primary key)")
primary_cur.execute("create table t2(pk integer primary key)")
primary_cur.execute(CREATE_SUBXACTS_FUNC)
# Prepare a transaction for two-phase commit
primary_cur.execute("begin")
primary_cur.execute("insert into t1 values (1)")
primary_cur.execute("prepare transaction 't1'")
# Prepare another transaction for two-phase commit, with a subtransaction
primary_cur.execute("begin")
primary_cur.execute("insert into t2 values (2)")
primary_cur.execute("savepoint sp")
primary_cur.execute("insert into t2 values (3)")
primary_cur.execute("prepare transaction 't2'")
# Start a transaction in the primary. Leave the transaction open.
#
# The transaction has some subtransactions, but not too many to cause the
# CLOG-scanning mechanism to give up.
primary_cur.execute("begin")
primary_cur.execute("select create_subxacts(50)")
# Wait for the WAL to be flushed
primary_cur.execute("select neon_xlogflush()")
wait_for_last_flush_lsn(env, primary, env.initial_tenant, env.initial_timeline)
if shutdown:
primary.stop(mode="fast")
# Create a replica. It should start up normally, thanks to the CLOG-scanning
# mechanism.
secondary = env.endpoints.new_replica_start(
origin=primary, endpoint_id="secondary", config_lines=["max_prepared_transactions=5"]
)
# The transaction did not commit, so it should not be visible in the secondary
secondary_conn = secondary.connect()
secondary_cur = secondary_conn.cursor()
secondary_cur.execute("select count(*) from t")
assert secondary_cur.fetchone() == (0,)
secondary_cur.execute("select count(*) from t1")
assert secondary_cur.fetchone() == (0,)
secondary_cur.execute("select count(*) from t2")
assert secondary_cur.fetchone() == (0,)
if shutdown:
primary.start()
primary_conn = primary.connect()
primary_cur = primary_conn.cursor()
else:
primary_cur.execute("commit")
primary_cur.execute("commit prepared 't1'")
primary_cur.execute("commit prepared 't2'")
wait_replica_caughtup(primary, secondary)
secondary_cur.execute("select count(*) from t")
if shutdown:
assert secondary_cur.fetchone() == (0,)
else:
assert secondary_cur.fetchone() == (50,)
secondary_cur.execute("select * from t1")
assert secondary_cur.fetchall() == [(1,)]
secondary_cur.execute("select * from t2")
assert secondary_cur.fetchall() == [(2,), (3,)]
def test_replica_start_with_prepared_xacts_with_subxacts(neon_simple_env: NeonEnv):
"""
Test the CLOG-scanning mechanism at hot standby startup in the presence of
prepared transactions, with subtransactions.
"""
# Initialize the primary, a test table, and a helper function to create lots
# of subtransactions.
env = neon_simple_env
primary = env.endpoints.create_start(
branch_name="main", endpoint_id="primary", config_lines=["max_prepared_transactions=5"]
)
primary_conn = primary.connect()
primary_cur = primary_conn.cursor()
# Install extension containing function needed for test
primary_cur.execute("CREATE EXTENSION neon_test_utils")
primary_cur.execute("create table t(pk serial primary key, payload integer)")
primary_cur.execute(CREATE_SUBXACTS_FUNC)
# Advance nextXid close to the beginning of the next pg_subtrans segment (2^16 XIDs)
#
# This is interesting, because it tests that pg_subtrans is initialized correctly
# at standby startup. (We had a bug where it didn't at one point during development.)
while True:
xid = int(query_scalar(primary_cur, "SELECT txid_current()"))
log.info(f"xid now {xid}")
# Consume 500 transactions at a time until we get close
if xid < 65535 - 600:
primary_cur.execute("select test_consume_xids(500);")
else:
break
primary_cur.execute("checkpoint")
# Prepare a transaction for two-phase commit
primary_cur.execute("begin")
primary_cur.execute("select create_subxacts(1000)")
primary_cur.execute("prepare transaction 't1'")
# Wait for the WAL to be flushed, and stop the primary
wait_for_last_flush_lsn(env, primary, env.initial_tenant, env.initial_timeline)
primary.stop(mode="fast")
# Create a replica. It should start up normally, thanks to the CLOG-scanning
# mechanism.
secondary = env.endpoints.new_replica_start(
origin=primary, endpoint_id="secondary", config_lines=["max_prepared_transactions=5"]
)
# The transaction did not commit, so it should not be visible in the secondary
secondary_conn = secondary.connect()
secondary_cur = secondary_conn.cursor()
secondary_cur.execute("select count(*) from t")
assert secondary_cur.fetchone() == (0,)
primary.start()
# Open a lot of subtransactions in the primary, causing the subxids cache to overflow
primary_conn = primary.connect()
primary_cur = primary_conn.cursor()
primary_cur.execute("select create_subxacts(100000)")
wait_replica_caughtup(primary, secondary)
secondary_cur.execute("select count(*) from t")
assert secondary_cur.fetchone() == (100000,)
primary_cur.execute("commit prepared 't1'")
wait_replica_caughtup(primary, secondary)
secondary_cur.execute("select count(*) from t")
assert secondary_cur.fetchone() == (101000,)
def test_replica_start_with_prepared_xacts_with_many_subxacts(neon_simple_env: NeonEnv):
"""
Test the CLOG-scanning mechanism at hot standby startup in the presence of
prepared transactions, with lots of subtransactions.
Like test_replica_start_with_prepared_xacts_with_subxacts, but with more
subxacts, to test that the prepared transaction's subxids don't consume
space in the known-assigned XIDs array. (They are set in pg_subtrans
instead)
"""
# Initialize the primary, a test table, and a helper function to create lots
# of subtransactions.
env = neon_simple_env
primary = env.endpoints.create_start(
branch_name="main", endpoint_id="primary", config_lines=["max_prepared_transactions=5"]
)
primary_conn = primary.connect()
primary_cur = primary_conn.cursor()
# Install extension containing function needed for test
primary_cur.execute("CREATE EXTENSION neon_test_utils")
primary_cur.execute("create table t(pk serial primary key, payload integer)")
primary_cur.execute(CREATE_SUBXACTS_FUNC)
# Prepare a transaction for two-phase commit, with lots of subxids
primary_cur.execute("begin")
primary_cur.execute("select create_subxacts(50000)")
# to make things a bit more varied, intersperse a few other XIDs in between
# the prepared transaction's sub-XIDs
with primary.connect().cursor() as primary_cur2:
primary_cur2.execute("insert into t (payload) values (123)")
primary_cur2.execute("begin; insert into t (payload) values (-1); rollback")
primary_cur.execute("select create_subxacts(50000)")
primary_cur.execute("prepare transaction 't1'")
# Wait for the WAL to be flushed
wait_for_last_flush_lsn(env, primary, env.initial_tenant, env.initial_timeline)
primary.stop(mode="fast")
# Create a replica. It should start up normally, thanks to the CLOG-scanning
# mechanism.
secondary = env.endpoints.new_replica_start(
origin=primary, endpoint_id="secondary", config_lines=["max_prepared_transactions=5"]
)
# The transaction did not commit, so it should not be visible in the secondary
secondary_conn = secondary.connect()
secondary_cur = secondary_conn.cursor()
secondary_cur.execute("select count(*) from t")
assert secondary_cur.fetchone() == (1,)
primary.start()
# Open a lot of subtransactions in the primary, causing the subxids cache to overflow
primary_conn = primary.connect()
primary_cur = primary_conn.cursor()
primary_cur.execute("select create_subxacts(100000)")
wait_replica_caughtup(primary, secondary)
secondary_cur.execute("select count(*) from t")
assert secondary_cur.fetchone() == (100001,)
primary_cur.execute("commit prepared 't1'")
wait_replica_caughtup(primary, secondary)
secondary_cur.execute("select count(*) from t")
assert secondary_cur.fetchone() == (200001,)

View File

@@ -0,0 +1,32 @@
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, wait_replica_caughtup
@pytest.mark.xfail
def test_replication_start(neon_simple_env: NeonEnv):
env = neon_simple_env
with env.endpoints.create_start(branch_name="main", endpoint_id="primary") as primary:
with primary.connect() as p_con:
with p_con.cursor() as p_cur:
p_cur.execute("begin")
p_cur.execute("create table t(pk integer primary key, payload integer)")
p_cur.execute("insert into t values (generate_series(1,100000), 0)")
p_cur.execute("select txid_current()")
xid = p_cur.fetchall()[0][0]
log.info(f"Master transaction {xid}")
with env.endpoints.new_replica_start(
origin=primary, endpoint_id="secondary"
) as secondary:
wait_replica_caughtup(primary, secondary)
with secondary.connect() as s_con:
with s_con.cursor() as s_cur:
# Enforce setting hint bits for pg_class tuples.
# If master's transaction is not marked as in-progress in MVCC snapshot,
# then XMIN_INVALID hint bit will be set for table's 't' tuple makeing it invisible.
s_cur.execute("select * from pg_class")
p_cur.execute("commit")
wait_replica_caughtup(primary, secondary)
s_cur.execute("select * from t where pk = 1")
assert s_cur.fetchone() == (1, 0)

View File

@@ -542,13 +542,6 @@ def test_sharding_split_smoke(
for k, v in non_default_tenant_config.items():
assert config.effective_config[k] == v
# Check that heatmap uploads remain enabled after shard split
# (https://github.com/neondatabase/neon/issues/8189)
assert (
config.effective_config["heatmap_period"]
and config.effective_config["heatmap_period"] != "0s"
)
# Validate pageserver state: expect every child shard to have an attached and secondary location
(total, attached) = get_node_shard_counts(env, tenant_ids=[tenant_id])
assert sum(attached.values()) == split_shard_count

View File

@@ -315,7 +315,7 @@ def test_storage_controller_onboarding(neon_env_builder: NeonEnvBuilder, warm_up
# Create a tenant directly via pageserver HTTP API, skipping the storage controller
tenant_id = TenantId.generate()
generation = 123
origin_ps.tenant_create(tenant_id, generation=generation)
origin_ps.http_client().tenant_create(tenant_id, generation=generation)
# As if doing a live migration, first configure origin into stale mode
r = origin_ps.http_client().tenant_location_conf(

View File

@@ -1,5 +1,5 @@
{
"v16": ["16.3", "b810fdfcbb59afea7ea7bbe0cf94eaccb55a2ea2"],
"v15": ["15.7", "4874c8e52ed349a9f8290bbdcd91eb92677a5d24"],
"v14": ["14.12", "ad73770c446ea361f43e4f0404798b7e5e7a62d8"]
"v16": ["16.3", "d55e0aca104af0b611cf5565f1033b2acd2dcc1c"],
"v15": ["15.7", "2ff5ecc67c64e5fe44b7dde598e64e4538e0c373"],
"v14": ["14.12", "7845c122d51d3ebb547a984a640ac0310a2fadce"]
}