mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-01 04:20:39 +00:00
Compare commits
9 Commits
compress-p
...
rc/2024-07
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
30027d94a2 | ||
|
|
bc704917a3 | ||
|
|
b8bbaafc03 | ||
|
|
e1a06b40b7 | ||
|
|
babbe125da | ||
|
|
ca2f7d06b2 | ||
|
|
c22c6a6c9e | ||
|
|
deec3bc578 | ||
|
|
063553a51b |
@@ -21,10 +21,8 @@ use pageserver_api::config::{
|
||||
DEFAULT_HTTP_LISTEN_PORT as DEFAULT_PAGESERVER_HTTP_PORT,
|
||||
DEFAULT_PG_LISTEN_PORT as DEFAULT_PAGESERVER_PG_PORT,
|
||||
};
|
||||
use pageserver_api::controller_api::PlacementPolicy;
|
||||
use pageserver_api::models::{
|
||||
ShardParameters, TenantCreateRequest, TimelineCreateRequest, TimelineInfo,
|
||||
};
|
||||
use pageserver_api::controller_api::{PlacementPolicy, TenantCreateRequest};
|
||||
use pageserver_api::models::{ShardParameters, TimelineCreateRequest, TimelineInfo};
|
||||
use pageserver_api::shard::{ShardCount, ShardStripeSize, TenantShardId};
|
||||
use postgres_backend::AuthType;
|
||||
use postgres_connection::parse_host_port;
|
||||
|
||||
@@ -17,8 +17,7 @@ use anyhow::{bail, Context};
|
||||
use camino::Utf8PathBuf;
|
||||
use futures::SinkExt;
|
||||
use pageserver_api::models::{
|
||||
self, AuxFilePolicy, LocationConfig, ShardParameters, TenantHistorySize, TenantInfo,
|
||||
TimelineInfo,
|
||||
self, AuxFilePolicy, LocationConfig, TenantHistorySize, TenantInfo, TimelineInfo,
|
||||
};
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use pageserver_client::mgmt_api;
|
||||
@@ -397,28 +396,6 @@ impl PageServerNode {
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn tenant_create(
|
||||
&self,
|
||||
new_tenant_id: TenantId,
|
||||
generation: Option<u32>,
|
||||
settings: HashMap<&str, &str>,
|
||||
) -> anyhow::Result<TenantId> {
|
||||
let config = Self::parse_config(settings.clone())?;
|
||||
|
||||
let request = models::TenantCreateRequest {
|
||||
new_tenant_id: TenantShardId::unsharded(new_tenant_id),
|
||||
generation,
|
||||
config,
|
||||
shard_parameters: ShardParameters::default(),
|
||||
// Placement policy is not meaningful for creations not done via storage controller
|
||||
placement_policy: None,
|
||||
};
|
||||
if !settings.is_empty() {
|
||||
bail!("Unrecognized tenant settings: {settings:?}")
|
||||
}
|
||||
Ok(self.http_client.tenant_create(&request).await?)
|
||||
}
|
||||
|
||||
pub async fn tenant_config(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
|
||||
@@ -5,12 +5,11 @@ use crate::{
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use pageserver_api::{
|
||||
controller_api::{
|
||||
NodeConfigureRequest, NodeRegisterRequest, TenantCreateResponse, TenantLocateResponse,
|
||||
TenantShardMigrateRequest, TenantShardMigrateResponse,
|
||||
NodeConfigureRequest, NodeRegisterRequest, TenantCreateRequest, TenantCreateResponse,
|
||||
TenantLocateResponse, TenantShardMigrateRequest, TenantShardMigrateResponse,
|
||||
},
|
||||
models::{
|
||||
TenantCreateRequest, TenantShardSplitRequest, TenantShardSplitResponse,
|
||||
TimelineCreateRequest, TimelineInfo,
|
||||
TenantShardSplitRequest, TenantShardSplitResponse, TimelineCreateRequest, TimelineInfo,
|
||||
},
|
||||
shard::{ShardStripeSize, TenantShardId},
|
||||
};
|
||||
|
||||
@@ -4,13 +4,13 @@ use std::{str::FromStr, time::Duration};
|
||||
use clap::{Parser, Subcommand};
|
||||
use pageserver_api::{
|
||||
controller_api::{
|
||||
NodeAvailabilityWrapper, NodeDescribeResponse, ShardSchedulingPolicy,
|
||||
NodeAvailabilityWrapper, NodeDescribeResponse, ShardSchedulingPolicy, TenantCreateRequest,
|
||||
TenantDescribeResponse, TenantPolicyRequest,
|
||||
},
|
||||
models::{
|
||||
EvictionPolicy, EvictionPolicyLayerAccessThreshold, LocationConfigSecondary,
|
||||
ShardParameters, TenantConfig, TenantConfigRequest, TenantCreateRequest,
|
||||
TenantShardSplitRequest, TenantShardSplitResponse,
|
||||
ShardParameters, TenantConfig, TenantConfigRequest, TenantShardSplitRequest,
|
||||
TenantShardSplitResponse,
|
||||
},
|
||||
shard::{ShardStripeSize, TenantShardId},
|
||||
};
|
||||
@@ -336,14 +336,18 @@ async fn main() -> anyhow::Result<()> {
|
||||
.await?;
|
||||
}
|
||||
Command::TenantCreate { tenant_id } => {
|
||||
vps_client
|
||||
.tenant_create(&TenantCreateRequest {
|
||||
new_tenant_id: TenantShardId::unsharded(tenant_id),
|
||||
generation: None,
|
||||
shard_parameters: ShardParameters::default(),
|
||||
placement_policy: Some(PlacementPolicy::Attached(1)),
|
||||
config: TenantConfig::default(),
|
||||
})
|
||||
storcon_client
|
||||
.dispatch(
|
||||
Method::POST,
|
||||
"v1/tenant".to_string(),
|
||||
Some(TenantCreateRequest {
|
||||
new_tenant_id: TenantShardId::unsharded(tenant_id),
|
||||
generation: None,
|
||||
shard_parameters: ShardParameters::default(),
|
||||
placement_policy: Some(PlacementPolicy::Attached(1)),
|
||||
config: TenantConfig::default(),
|
||||
}),
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
Command::TenantDelete { tenant_id } => {
|
||||
|
||||
@@ -11,6 +11,27 @@ use crate::{
|
||||
shard::{ShardStripeSize, TenantShardId},
|
||||
};
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
#[serde(deny_unknown_fields)]
|
||||
pub struct TenantCreateRequest {
|
||||
pub new_tenant_id: TenantShardId,
|
||||
#[serde(default)]
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub generation: Option<u32>,
|
||||
|
||||
// If omitted, create a single shard with TenantShardId::unsharded()
|
||||
#[serde(default)]
|
||||
#[serde(skip_serializing_if = "ShardParameters::is_unsharded")]
|
||||
pub shard_parameters: ShardParameters,
|
||||
|
||||
#[serde(default)]
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub placement_policy: Option<PlacementPolicy>,
|
||||
|
||||
#[serde(flatten)]
|
||||
pub config: TenantConfig, // as we have a flattened field, we should reject all unknown fields in it
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct TenantCreateResponseShard {
|
||||
pub shard_id: TenantShardId,
|
||||
@@ -280,4 +301,19 @@ mod test {
|
||||
assert_eq!(serde_json::from_str::<PlacementPolicy>(&encoded)?, v);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_reject_unknown_field() {
|
||||
let id = TenantId::generate();
|
||||
let create_request = serde_json::json!({
|
||||
"new_tenant_id": id.to_string(),
|
||||
"unknown_field": "unknown_value".to_string(),
|
||||
});
|
||||
let err = serde_json::from_value::<TenantCreateRequest>(create_request).unwrap_err();
|
||||
assert!(
|
||||
err.to_string().contains("unknown field `unknown_field`"),
|
||||
"expect unknown field `unknown_field` error, got: {}",
|
||||
err
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -25,7 +25,6 @@ use utils::{
|
||||
serde_system_time,
|
||||
};
|
||||
|
||||
use crate::controller_api::PlacementPolicy;
|
||||
use crate::{
|
||||
reltag::RelTag,
|
||||
shard::{ShardCount, ShardStripeSize, TenantShardId},
|
||||
@@ -271,28 +270,6 @@ impl Default for ShardParameters {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
#[serde(deny_unknown_fields)]
|
||||
pub struct TenantCreateRequest {
|
||||
pub new_tenant_id: TenantShardId,
|
||||
#[serde(default)]
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub generation: Option<u32>,
|
||||
|
||||
// If omitted, create a single shard with TenantShardId::unsharded()
|
||||
#[serde(default)]
|
||||
#[serde(skip_serializing_if = "ShardParameters::is_unsharded")]
|
||||
pub shard_parameters: ShardParameters,
|
||||
|
||||
// This parameter is only meaningful in requests sent to the storage controller
|
||||
#[serde(default)]
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub placement_policy: Option<PlacementPolicy>,
|
||||
|
||||
#[serde(flatten)]
|
||||
pub config: TenantConfig, // as we have a flattened field, we should reject all unknown fields in it
|
||||
}
|
||||
|
||||
/// An alternative representation of `pageserver::tenant::TenantConf` with
|
||||
/// simpler types.
|
||||
#[derive(Serialize, Deserialize, Debug, Default, Clone, Eq, PartialEq)]
|
||||
@@ -547,10 +524,6 @@ pub struct LocationConfigListResponse {
|
||||
pub tenant_shards: Vec<(TenantShardId, Option<LocationConfig>)>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
#[serde(transparent)]
|
||||
pub struct TenantCreateResponse(pub TenantId);
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub struct StatusResponse {
|
||||
pub id: NodeId,
|
||||
@@ -1507,18 +1480,6 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn test_reject_unknown_field() {
|
||||
let id = TenantId::generate();
|
||||
let create_request = json!({
|
||||
"new_tenant_id": id.to_string(),
|
||||
"unknown_field": "unknown_value".to_string(),
|
||||
});
|
||||
let err = serde_json::from_value::<TenantCreateRequest>(create_request).unwrap_err();
|
||||
assert!(
|
||||
err.to_string().contains("unknown field `unknown_field`"),
|
||||
"expect unknown field `unknown_field` error, got: {}",
|
||||
err
|
||||
);
|
||||
|
||||
let id = TenantId::generate();
|
||||
let config_request = json!({
|
||||
"tenant_id": id.to_string(),
|
||||
|
||||
@@ -356,6 +356,28 @@ impl CheckPoint {
|
||||
}
|
||||
false
|
||||
}
|
||||
|
||||
/// Advance next multi-XID/offset to those given in arguments.
|
||||
///
|
||||
/// It's important that this handles wraparound correctly. This should match the
|
||||
/// MultiXactAdvanceNextMXact() logic in PostgreSQL's xlog_redo() function.
|
||||
///
|
||||
/// Returns 'true' if the Checkpoint was updated.
|
||||
pub fn update_next_multixid(&mut self, multi_xid: u32, multi_offset: u32) -> bool {
|
||||
let mut modified = false;
|
||||
|
||||
if multi_xid.wrapping_sub(self.nextMulti) as i32 > 0 {
|
||||
self.nextMulti = multi_xid;
|
||||
modified = true;
|
||||
}
|
||||
|
||||
if multi_offset.wrapping_sub(self.nextMultiOffset) as i32 > 0 {
|
||||
self.nextMultiOffset = multi_offset;
|
||||
modified = true;
|
||||
}
|
||||
|
||||
modified
|
||||
}
|
||||
}
|
||||
|
||||
/// Generate new, empty WAL segment, with correct block headers at the first
|
||||
|
||||
@@ -202,6 +202,53 @@ pub fn test_update_next_xid() {
|
||||
assert_eq!(checkpoint.nextXid.value, 2048);
|
||||
}
|
||||
|
||||
#[test]
|
||||
pub fn test_update_next_multixid() {
|
||||
let checkpoint_buf = [0u8; std::mem::size_of::<CheckPoint>()];
|
||||
let mut checkpoint = CheckPoint::decode(&checkpoint_buf).unwrap();
|
||||
|
||||
// simple case
|
||||
checkpoint.nextMulti = 20;
|
||||
checkpoint.nextMultiOffset = 20;
|
||||
checkpoint.update_next_multixid(1000, 2000);
|
||||
assert_eq!(checkpoint.nextMulti, 1000);
|
||||
assert_eq!(checkpoint.nextMultiOffset, 2000);
|
||||
|
||||
// No change
|
||||
checkpoint.update_next_multixid(500, 900);
|
||||
assert_eq!(checkpoint.nextMulti, 1000);
|
||||
assert_eq!(checkpoint.nextMultiOffset, 2000);
|
||||
|
||||
// Close to wraparound, but not wrapped around yet
|
||||
checkpoint.nextMulti = 0xffff0000;
|
||||
checkpoint.nextMultiOffset = 0xfffe0000;
|
||||
checkpoint.update_next_multixid(0xffff00ff, 0xfffe00ff);
|
||||
assert_eq!(checkpoint.nextMulti, 0xffff00ff);
|
||||
assert_eq!(checkpoint.nextMultiOffset, 0xfffe00ff);
|
||||
|
||||
// Wraparound
|
||||
checkpoint.update_next_multixid(1, 900);
|
||||
assert_eq!(checkpoint.nextMulti, 1);
|
||||
assert_eq!(checkpoint.nextMultiOffset, 900);
|
||||
|
||||
// Wraparound nextMulti to 0.
|
||||
//
|
||||
// It's a bit surprising that nextMulti can be 0, because that's a special value
|
||||
// (InvalidMultiXactId). However, that's how Postgres does it at multi-xid wraparound:
|
||||
// nextMulti wraps around to 0, but then when the next multi-xid is assigned, it skips
|
||||
// the 0 and the next multi-xid actually assigned is 1.
|
||||
checkpoint.nextMulti = 0xffff0000;
|
||||
checkpoint.nextMultiOffset = 0xfffe0000;
|
||||
checkpoint.update_next_multixid(0, 0xfffe00ff);
|
||||
assert_eq!(checkpoint.nextMulti, 0);
|
||||
assert_eq!(checkpoint.nextMultiOffset, 0xfffe00ff);
|
||||
|
||||
// Wraparound nextMultiOffset to 0
|
||||
checkpoint.update_next_multixid(0, 0);
|
||||
assert_eq!(checkpoint.nextMulti, 0);
|
||||
assert_eq!(checkpoint.nextMultiOffset, 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
pub fn test_encode_logical_message() {
|
||||
let expected = [
|
||||
|
||||
@@ -205,15 +205,6 @@ impl Client {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn tenant_create(&self, req: &TenantCreateRequest) -> Result<TenantId> {
|
||||
let uri = format!("{}/v1/tenant", self.mgmt_api_endpoint);
|
||||
self.request(Method::POST, &uri, req)
|
||||
.await?
|
||||
.json()
|
||||
.await
|
||||
.map_err(Error::ReceiveBody)
|
||||
}
|
||||
|
||||
/// The tenant deletion API can return 202 if deletion is incomplete, or
|
||||
/// 404 if it is complete. Callers are responsible for checking the status
|
||||
/// code and retrying. Error codes other than 404 will return Err().
|
||||
|
||||
@@ -53,7 +53,6 @@ use utils::http::request::{get_request_param, must_get_query_param, parse_query_
|
||||
|
||||
use crate::context::{DownloadBehavior, RequestContext};
|
||||
use crate::deletion_queue::DeletionQueueClient;
|
||||
use crate::metrics::{StorageTimeOperation, STORAGE_TIME_GLOBAL};
|
||||
use crate::pgdatadir_mapping::LsnForTimestamp;
|
||||
use crate::task_mgr::TaskKind;
|
||||
use crate::tenant::config::{LocationConf, TenantConfOpt};
|
||||
@@ -75,13 +74,12 @@ use crate::tenant::timeline::CompactFlags;
|
||||
use crate::tenant::timeline::CompactionError;
|
||||
use crate::tenant::timeline::Timeline;
|
||||
use crate::tenant::GetTimelineError;
|
||||
use crate::tenant::SpawnMode;
|
||||
use crate::tenant::{LogicalSizeCalculationCause, PageReconstructError};
|
||||
use crate::{config::PageServerConf, tenant::mgr};
|
||||
use crate::{disk_usage_eviction_task, tenant};
|
||||
use pageserver_api::models::{
|
||||
StatusResponse, TenantConfigRequest, TenantCreateRequest, TenantCreateResponse, TenantInfo,
|
||||
TimelineCreateRequest, TimelineGcRequest, TimelineInfo,
|
||||
StatusResponse, TenantConfigRequest, TenantInfo, TimelineCreateRequest, TimelineGcRequest,
|
||||
TimelineInfo,
|
||||
};
|
||||
use utils::{
|
||||
auth::SwappableJwtAuth,
|
||||
@@ -1237,75 +1235,6 @@ pub fn html_response(status: StatusCode, data: String) -> Result<Response<Body>,
|
||||
Ok(response)
|
||||
}
|
||||
|
||||
/// Helper for requests that may take a generation, which is mandatory
|
||||
/// when control_plane_api is set, but otherwise defaults to Generation::none()
|
||||
fn get_request_generation(state: &State, req_gen: Option<u32>) -> Result<Generation, ApiError> {
|
||||
if state.conf.control_plane_api.is_some() {
|
||||
req_gen
|
||||
.map(Generation::new)
|
||||
.ok_or(ApiError::BadRequest(anyhow!(
|
||||
"generation attribute missing"
|
||||
)))
|
||||
} else {
|
||||
// Legacy mode: all tenants operate with no generation
|
||||
Ok(Generation::none())
|
||||
}
|
||||
}
|
||||
|
||||
async fn tenant_create_handler(
|
||||
mut request: Request<Body>,
|
||||
_cancel: CancellationToken,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
let request_data: TenantCreateRequest = json_request(&mut request).await?;
|
||||
let target_tenant_id = request_data.new_tenant_id;
|
||||
check_permission(&request, None)?;
|
||||
|
||||
let _timer = STORAGE_TIME_GLOBAL
|
||||
.get_metric_with_label_values(&[StorageTimeOperation::CreateTenant.into()])
|
||||
.expect("bug")
|
||||
.start_timer();
|
||||
|
||||
let tenant_conf =
|
||||
TenantConfOpt::try_from(&request_data.config).map_err(ApiError::BadRequest)?;
|
||||
|
||||
let state = get_state(&request);
|
||||
|
||||
let generation = get_request_generation(state, request_data.generation)?;
|
||||
|
||||
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
|
||||
|
||||
let location_conf =
|
||||
LocationConf::attached_single(tenant_conf, generation, &request_data.shard_parameters);
|
||||
|
||||
let new_tenant = state
|
||||
.tenant_manager
|
||||
.upsert_location(
|
||||
target_tenant_id,
|
||||
location_conf,
|
||||
None,
|
||||
SpawnMode::Create,
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let Some(new_tenant) = new_tenant else {
|
||||
// This should never happen: indicates a bug in upsert_location
|
||||
return Err(ApiError::InternalServerError(anyhow::anyhow!(
|
||||
"Upsert succeeded but didn't return tenant!"
|
||||
)));
|
||||
};
|
||||
// We created the tenant. Existing API semantics are that the tenant
|
||||
// is Active when this function returns.
|
||||
new_tenant
|
||||
.wait_to_become_active(ACTIVE_TENANT_TIMEOUT)
|
||||
.await?;
|
||||
|
||||
json_response(
|
||||
StatusCode::CREATED,
|
||||
TenantCreateResponse(new_tenant.tenant_shard_id().tenant_id),
|
||||
)
|
||||
}
|
||||
|
||||
async fn get_tenant_config_handler(
|
||||
request: Request<Body>,
|
||||
_cancel: CancellationToken,
|
||||
@@ -2611,7 +2540,6 @@ pub fn make_router(
|
||||
api_handler(r, reload_auth_validation_keys_handler)
|
||||
})
|
||||
.get("/v1/tenant", |r| api_handler(r, tenant_list_handler))
|
||||
.post("/v1/tenant", |r| api_handler(r, tenant_create_handler))
|
||||
.get("/v1/tenant/:tenant_shard_id", |r| {
|
||||
api_handler(r, tenant_status)
|
||||
})
|
||||
|
||||
@@ -53,9 +53,6 @@ pub(crate) enum StorageTimeOperation {
|
||||
|
||||
#[strum(serialize = "find gc cutoffs")]
|
||||
FindGcCutoffs,
|
||||
|
||||
#[strum(serialize = "create tenant")]
|
||||
CreateTenant,
|
||||
}
|
||||
|
||||
pub(crate) static STORAGE_TIME_SUM_PER_TIMELINE: Lazy<CounterVec> = Lazy::new(|| {
|
||||
|
||||
@@ -213,8 +213,6 @@ pub(crate) enum SpawnMode {
|
||||
Eager,
|
||||
/// Lazy activation in the background, with the option to skip the queue if the need comes up
|
||||
Lazy,
|
||||
/// Tenant has been created during the lifetime of this process
|
||||
Create,
|
||||
}
|
||||
|
||||
///
|
||||
@@ -808,9 +806,6 @@ impl Tenant {
|
||||
};
|
||||
|
||||
let preload = match &mode {
|
||||
SpawnMode::Create => {
|
||||
None
|
||||
},
|
||||
SpawnMode::Eager | SpawnMode::Lazy => {
|
||||
let _preload_timer = TENANT.preload.start_timer();
|
||||
let res = tenant_clone
|
||||
@@ -832,11 +827,8 @@ impl Tenant {
|
||||
|
||||
// We will time the duration of the attach phase unless this is a creation (attach will do no work)
|
||||
let attached = {
|
||||
let _attach_timer = match mode {
|
||||
SpawnMode::Create => None,
|
||||
SpawnMode::Eager | SpawnMode::Lazy => Some(TENANT.attach.start_timer()),
|
||||
};
|
||||
tenant_clone.attach(preload, mode, &ctx).await
|
||||
let _attach_timer = Some(TENANT.attach.start_timer());
|
||||
tenant_clone.attach(preload, &ctx).await
|
||||
};
|
||||
|
||||
match attached {
|
||||
@@ -912,21 +904,14 @@ impl Tenant {
|
||||
async fn attach(
|
||||
self: &Arc<Tenant>,
|
||||
preload: Option<TenantPreload>,
|
||||
mode: SpawnMode,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
span::debug_assert_current_span_has_tenant_id();
|
||||
|
||||
failpoint_support::sleep_millis_async!("before-attaching-tenant");
|
||||
|
||||
let preload = match (preload, mode) {
|
||||
(Some(p), _) => p,
|
||||
(None, SpawnMode::Create) => TenantPreload {
|
||||
timelines: HashMap::new(),
|
||||
},
|
||||
(None, _) => {
|
||||
anyhow::bail!("local-only deployment is no longer supported, https://github.com/neondatabase/neon/issues/5624");
|
||||
}
|
||||
let Some(preload) = preload else {
|
||||
anyhow::bail!("local-only deployment is no longer supported, https://github.com/neondatabase/neon/issues/5624");
|
||||
};
|
||||
|
||||
let mut timelines_to_resume_deletions = vec![];
|
||||
@@ -3841,7 +3826,7 @@ pub(crate) mod harness {
|
||||
let preload = tenant
|
||||
.preload(&self.remote_storage, CancellationToken::new())
|
||||
.await?;
|
||||
tenant.attach(Some(preload), SpawnMode::Eager, ctx).await?;
|
||||
tenant.attach(Some(preload), ctx).await?;
|
||||
|
||||
tenant.state.send_replace(TenantState::Active);
|
||||
for timeline in tenant.timelines.lock().unwrap().values() {
|
||||
@@ -6279,7 +6264,7 @@ mod tests {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_vectored_missing_metadata_key_reads() -> anyhow::Result<()> {
|
||||
let harness = TenantHarness::create("test_vectored_missing_data_key_reads")?;
|
||||
let harness = TenantHarness::create("test_vectored_missing_metadata_key_reads")?;
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
|
||||
let base_key = Key::from_hex("620000000033333333444444445500000000").unwrap();
|
||||
|
||||
@@ -262,6 +262,7 @@ impl scheduler::RunningJob for RunningDownload {
|
||||
struct CompleteDownload {
|
||||
secondary_state: Arc<SecondaryTenant>,
|
||||
completed_at: Instant,
|
||||
result: Result<(), UpdateError>,
|
||||
}
|
||||
|
||||
impl scheduler::Completion for CompleteDownload {
|
||||
@@ -286,21 +287,33 @@ impl JobGenerator<PendingDownload, RunningDownload, CompleteDownload, DownloadCo
|
||||
let CompleteDownload {
|
||||
secondary_state,
|
||||
completed_at: _completed_at,
|
||||
result,
|
||||
} = completion;
|
||||
|
||||
tracing::debug!("Secondary tenant download completed");
|
||||
|
||||
let mut detail = secondary_state.detail.lock().unwrap();
|
||||
|
||||
let period = detail
|
||||
.last_download
|
||||
.as_ref()
|
||||
.map(|d| d.upload_period)
|
||||
.unwrap_or(DEFAULT_DOWNLOAD_INTERVAL);
|
||||
match result {
|
||||
Err(UpdateError::Restart) => {
|
||||
// Start downloading again as soon as we can. This will involve waiting for the scheduler's
|
||||
// scheduling interval. This slightly reduces the peak download speed of tenants that hit their
|
||||
// deadline and keep restarting, but that also helps give other tenants a chance to execute rather
|
||||
// that letting one big tenant dominate for a long time.
|
||||
detail.next_download = Some(Instant::now());
|
||||
}
|
||||
_ => {
|
||||
let period = detail
|
||||
.last_download
|
||||
.as_ref()
|
||||
.map(|d| d.upload_period)
|
||||
.unwrap_or(DEFAULT_DOWNLOAD_INTERVAL);
|
||||
|
||||
// We advance next_download irrespective of errors: we don't want error cases to result in
|
||||
// expensive busy-polling.
|
||||
detail.next_download = Some(Instant::now() + period_jitter(period, 5));
|
||||
// We advance next_download irrespective of errors: we don't want error cases to result in
|
||||
// expensive busy-polling.
|
||||
detail.next_download = Some(Instant::now() + period_jitter(period, 5));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn schedule(&mut self) -> SchedulingResult<PendingDownload> {
|
||||
@@ -396,9 +409,10 @@ impl JobGenerator<PendingDownload, RunningDownload, CompleteDownload, DownloadCo
|
||||
(RunningDownload { barrier }, Box::pin(async move {
|
||||
let _completion = completion;
|
||||
|
||||
match TenantDownloader::new(conf, &remote_storage, &secondary_state)
|
||||
let result = TenantDownloader::new(conf, &remote_storage, &secondary_state)
|
||||
.download(&download_ctx)
|
||||
.await
|
||||
.await;
|
||||
match &result
|
||||
{
|
||||
Err(UpdateError::NoData) => {
|
||||
tracing::info!("No heatmap found for tenant. This is fine if it is new.");
|
||||
@@ -415,6 +429,9 @@ impl JobGenerator<PendingDownload, RunningDownload, CompleteDownload, DownloadCo
|
||||
Err(e @ (UpdateError::DownloadError(_) | UpdateError::Other(_))) => {
|
||||
tracing::error!("Error while downloading tenant: {e}");
|
||||
},
|
||||
Err(UpdateError::Restart) => {
|
||||
tracing::info!("Download reached deadline & will restart to update heatmap")
|
||||
}
|
||||
Ok(()) => {}
|
||||
};
|
||||
|
||||
@@ -436,6 +453,7 @@ impl JobGenerator<PendingDownload, RunningDownload, CompleteDownload, DownloadCo
|
||||
CompleteDownload {
|
||||
secondary_state,
|
||||
completed_at: Instant::now(),
|
||||
result
|
||||
}
|
||||
}.instrument(info_span!(parent: None, "secondary_download", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))))
|
||||
}
|
||||
@@ -452,6 +470,11 @@ struct TenantDownloader<'a> {
|
||||
/// Errors that may be encountered while updating a tenant
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
enum UpdateError {
|
||||
/// This is not a true failure, but it's how a download indicates that it would like to be restarted by
|
||||
/// the scheduler, to pick up the latest heatmap
|
||||
#[error("Reached deadline, restarting downloads")]
|
||||
Restart,
|
||||
|
||||
#[error("No remote data found")]
|
||||
NoData,
|
||||
#[error("Insufficient local storage space")]
|
||||
@@ -603,6 +626,26 @@ impl<'a> TenantDownloader<'a> {
|
||||
self.prepare_timelines(&heatmap, heatmap_mtime).await?;
|
||||
}
|
||||
|
||||
// Calculate a deadline for downloads: if downloading takes longer than this, it is useful to drop out and start again,
|
||||
// so that we are always using reasonably a fresh heatmap. Otherwise, if we had really huge content to download, we might
|
||||
// spend 10s of minutes downloading layers we don't need.
|
||||
// (see https://github.com/neondatabase/neon/issues/8182)
|
||||
let deadline = {
|
||||
let period = self
|
||||
.secondary_state
|
||||
.detail
|
||||
.lock()
|
||||
.unwrap()
|
||||
.last_download
|
||||
.as_ref()
|
||||
.map(|d| d.upload_period)
|
||||
.unwrap_or(DEFAULT_DOWNLOAD_INTERVAL);
|
||||
|
||||
// Use double the period: we are not promising to complete within the period, this is just a heuristic
|
||||
// to keep using a "reasonably fresh" heatmap.
|
||||
Instant::now() + period * 2
|
||||
};
|
||||
|
||||
// Download the layers in the heatmap
|
||||
for timeline in heatmap.timelines {
|
||||
let timeline_state = timeline_states
|
||||
@@ -618,7 +661,7 @@ impl<'a> TenantDownloader<'a> {
|
||||
}
|
||||
|
||||
let timeline_id = timeline.timeline_id;
|
||||
self.download_timeline(timeline, timeline_state, ctx)
|
||||
self.download_timeline(timeline, timeline_state, deadline, ctx)
|
||||
.instrument(tracing::info_span!(
|
||||
"secondary_download_timeline",
|
||||
tenant_id=%tenant_shard_id.tenant_id,
|
||||
@@ -827,26 +870,28 @@ impl<'a> TenantDownloader<'a> {
|
||||
.and_then(|x| x)
|
||||
}
|
||||
|
||||
async fn download_timeline(
|
||||
/// Download heatmap layers that are not present on local disk, or update their
|
||||
/// access time if they are already present.
|
||||
async fn download_timeline_layers(
|
||||
&self,
|
||||
tenant_shard_id: &TenantShardId,
|
||||
timeline: HeatMapTimeline,
|
||||
timeline_state: SecondaryDetailTimeline,
|
||||
deadline: Instant,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), UpdateError> {
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
let tenant_shard_id = self.secondary_state.get_tenant_shard_id();
|
||||
|
||||
) -> (Result<(), UpdateError>, Vec<HeatMapLayer>) {
|
||||
// Accumulate updates to the state
|
||||
let mut touched = Vec::new();
|
||||
|
||||
tracing::debug!(timeline_id=%timeline.timeline_id, "Downloading layers, {} in heatmap", timeline.layers.len());
|
||||
|
||||
// Download heatmap layers that are not present on local disk, or update their
|
||||
// access time if they are already present.
|
||||
for layer in timeline.layers {
|
||||
if self.secondary_state.cancel.is_cancelled() {
|
||||
tracing::debug!("Cancelled -- dropping out of layer loop");
|
||||
return Err(UpdateError::Cancelled);
|
||||
return (Err(UpdateError::Cancelled), touched);
|
||||
}
|
||||
|
||||
if Instant::now() > deadline {
|
||||
// We've been running downloads for a while, restart to download latest heatmap.
|
||||
return (Err(UpdateError::Restart), touched);
|
||||
}
|
||||
|
||||
// Existing on-disk layers: just update their access time.
|
||||
@@ -916,20 +961,43 @@ impl<'a> TenantDownloader<'a> {
|
||||
|
||||
match self
|
||||
.download_layer(tenant_shard_id, &timeline.timeline_id, layer, ctx)
|
||||
.await?
|
||||
.await
|
||||
{
|
||||
Some(layer) => touched.push(layer),
|
||||
None => {
|
||||
Ok(Some(layer)) => touched.push(layer),
|
||||
Ok(None) => {
|
||||
// Not an error but we didn't download it: remote layer is missing. Don't add it to the list of
|
||||
// things to consider touched.
|
||||
}
|
||||
Err(e) => {
|
||||
return (Err(e), touched);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Write updates to state to record layers we just downloaded or touched.
|
||||
(Ok(()), touched)
|
||||
}
|
||||
|
||||
async fn download_timeline(
|
||||
&self,
|
||||
timeline: HeatMapTimeline,
|
||||
timeline_state: SecondaryDetailTimeline,
|
||||
deadline: Instant,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), UpdateError> {
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
let tenant_shard_id = self.secondary_state.get_tenant_shard_id();
|
||||
let timeline_id = timeline.timeline_id;
|
||||
|
||||
tracing::debug!(timeline_id=%timeline_id, "Downloading layers, {} in heatmap", timeline.layers.len());
|
||||
|
||||
let (result, touched) = self
|
||||
.download_timeline_layers(tenant_shard_id, timeline, timeline_state, deadline, ctx)
|
||||
.await;
|
||||
|
||||
// Write updates to state to record layers we just downloaded or touched, irrespective of whether the overall result was successful
|
||||
{
|
||||
let mut detail = self.secondary_state.detail.lock().unwrap();
|
||||
let timeline_detail = detail.timelines.entry(timeline.timeline_id).or_default();
|
||||
let timeline_detail = detail.timelines.entry(timeline_id).or_default();
|
||||
|
||||
tracing::info!("Wrote timeline_detail for {} touched layers", touched.len());
|
||||
|
||||
@@ -943,14 +1011,14 @@ impl<'a> TenantDownloader<'a> {
|
||||
let local_path = local_layer_path(
|
||||
self.conf,
|
||||
tenant_shard_id,
|
||||
&timeline.timeline_id,
|
||||
&timeline_id,
|
||||
&t.name,
|
||||
&t.metadata.generation,
|
||||
);
|
||||
e.insert(OnDiskState::new(
|
||||
self.conf,
|
||||
tenant_shard_id,
|
||||
&timeline.timeline_id,
|
||||
&timeline_id,
|
||||
t.name,
|
||||
t.metadata.clone(),
|
||||
t.access_time,
|
||||
@@ -961,7 +1029,7 @@ impl<'a> TenantDownloader<'a> {
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
result
|
||||
}
|
||||
|
||||
/// Call this during timeline download if a layer will _not_ be downloaded, to update progress statistics
|
||||
|
||||
@@ -20,6 +20,7 @@ use std::num::NonZeroUsize;
|
||||
|
||||
use bytes::BytesMut;
|
||||
use pageserver_api::key::Key;
|
||||
use tokio_epoll_uring::BoundedBuf;
|
||||
use utils::lsn::Lsn;
|
||||
use utils::vec_map::VecMap;
|
||||
|
||||
@@ -316,8 +317,9 @@ impl<'a> VectoredBlobReader<'a> {
|
||||
);
|
||||
let buf = self
|
||||
.file
|
||||
.read_exact_at_n(buf, read.start, read.size(), ctx)
|
||||
.await?;
|
||||
.read_exact_at(buf.slice(0..read.size()), read.start, ctx)
|
||||
.await?
|
||||
.into_inner();
|
||||
|
||||
let blobs_at = read.blobs_at.as_slice();
|
||||
let start_offset = blobs_at.first().expect("VectoredRead is never empty").0;
|
||||
|
||||
@@ -13,7 +13,7 @@
|
||||
use crate::context::RequestContext;
|
||||
use crate::metrics::{StorageIoOperation, STORAGE_IO_SIZE, STORAGE_IO_TIME_METRIC};
|
||||
|
||||
use crate::page_cache::PageWriteGuard;
|
||||
use crate::page_cache::{PageWriteGuard, PAGE_SZ};
|
||||
use crate::tenant::TENANTS_SEGMENT_NAME;
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use once_cell::sync::OnceCell;
|
||||
@@ -48,6 +48,7 @@ pub(crate) mod owned_buffers_io {
|
||||
//! but for the time being we're proving out the primitives in the neon.git repo
|
||||
//! for faster iteration.
|
||||
|
||||
pub(crate) mod slice;
|
||||
pub(crate) mod write;
|
||||
pub(crate) mod util {
|
||||
pub(crate) mod size_tracking_writer;
|
||||
@@ -143,16 +144,17 @@ struct SlotInner {
|
||||
/// Impl of [`tokio_epoll_uring::IoBuf`] and [`tokio_epoll_uring::IoBufMut`] for [`PageWriteGuard`].
|
||||
struct PageWriteGuardBuf {
|
||||
page: PageWriteGuard<'static>,
|
||||
init_up_to: usize,
|
||||
}
|
||||
// Safety: the [`PageWriteGuard`] gives us exclusive ownership of the page cache slot,
|
||||
// and the location remains stable even if [`Self`] or the [`PageWriteGuard`] is moved.
|
||||
// Page cache pages are zero-initialized, so, wrt uninitialized memory we're good.
|
||||
// (Page cache tracks separately whether the contents are valid, see `PageWriteGuard::mark_valid`.)
|
||||
unsafe impl tokio_epoll_uring::IoBuf for PageWriteGuardBuf {
|
||||
fn stable_ptr(&self) -> *const u8 {
|
||||
self.page.as_ptr()
|
||||
}
|
||||
fn bytes_init(&self) -> usize {
|
||||
self.init_up_to
|
||||
self.page.len()
|
||||
}
|
||||
fn bytes_total(&self) -> usize {
|
||||
self.page.len()
|
||||
@@ -166,8 +168,8 @@ unsafe impl tokio_epoll_uring::IoBufMut for PageWriteGuardBuf {
|
||||
}
|
||||
|
||||
unsafe fn set_init(&mut self, pos: usize) {
|
||||
// There shouldn't really be any reason to call this API since bytes_init() == bytes_total().
|
||||
assert!(pos <= self.page.len());
|
||||
self.init_up_to = pos;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -585,37 +587,37 @@ impl VirtualFile {
|
||||
Ok(self.pos)
|
||||
}
|
||||
|
||||
pub async fn read_exact_at<B>(
|
||||
/// Read the file contents in range `offset..(offset + slice.bytes_total())` into `slice[0..slice.bytes_total()]`.
|
||||
///
|
||||
/// The returned `Slice<Buf>` is equivalent to the input `slice`, i.e., it's the same view into the same buffer.
|
||||
pub async fn read_exact_at<Buf>(
|
||||
&self,
|
||||
buf: B,
|
||||
slice: Slice<Buf>,
|
||||
offset: u64,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<B, Error>
|
||||
) -> Result<Slice<Buf>, Error>
|
||||
where
|
||||
B: IoBufMut + Send,
|
||||
Buf: IoBufMut + Send,
|
||||
{
|
||||
let (buf, res) = read_exact_at_impl(buf, offset, None, |buf, offset| {
|
||||
self.read_at(buf, offset, ctx)
|
||||
})
|
||||
.await;
|
||||
res.map(|()| buf)
|
||||
}
|
||||
let assert_we_return_original_bounds = if cfg!(debug_assertions) {
|
||||
Some((slice.stable_ptr() as usize, slice.bytes_total()))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
pub async fn read_exact_at_n<B>(
|
||||
&self,
|
||||
buf: B,
|
||||
offset: u64,
|
||||
count: usize,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<B, Error>
|
||||
where
|
||||
B: IoBufMut + Send,
|
||||
{
|
||||
let (buf, res) = read_exact_at_impl(buf, offset, Some(count), |buf, offset| {
|
||||
self.read_at(buf, offset, ctx)
|
||||
})
|
||||
.await;
|
||||
res.map(|()| buf)
|
||||
let original_bounds = slice.bounds();
|
||||
let (buf, res) =
|
||||
read_exact_at_impl(slice, offset, |buf, offset| self.read_at(buf, offset, ctx)).await;
|
||||
let res = res.map(|_| buf.slice(original_bounds));
|
||||
|
||||
if let Some(original_bounds) = assert_we_return_original_bounds {
|
||||
if let Ok(slice) = &res {
|
||||
let returned_bounds = (slice.stable_ptr() as usize, slice.bytes_total());
|
||||
assert_eq!(original_bounds, returned_bounds);
|
||||
}
|
||||
}
|
||||
|
||||
res
|
||||
}
|
||||
|
||||
/// Like [`Self::read_exact_at`] but for [`PageWriteGuard`].
|
||||
@@ -625,13 +627,11 @@ impl VirtualFile {
|
||||
offset: u64,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<PageWriteGuard<'static>, Error> {
|
||||
let buf = PageWriteGuardBuf {
|
||||
page,
|
||||
init_up_to: 0,
|
||||
};
|
||||
let res = self.read_exact_at(buf, offset, ctx).await;
|
||||
res.map(|PageWriteGuardBuf { page, .. }| page)
|
||||
.map_err(|e| Error::new(ErrorKind::Other, e))
|
||||
let buf = PageWriteGuardBuf { page }.slice_full();
|
||||
debug_assert_eq!(buf.bytes_total(), PAGE_SZ);
|
||||
self.read_exact_at(buf, offset, ctx)
|
||||
.await
|
||||
.map(|slice| slice.into_inner().page)
|
||||
}
|
||||
|
||||
// Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#219-235
|
||||
@@ -722,14 +722,14 @@ impl VirtualFile {
|
||||
(buf, Ok(n))
|
||||
}
|
||||
|
||||
pub(crate) async fn read_at<B>(
|
||||
pub(crate) async fn read_at<Buf>(
|
||||
&self,
|
||||
buf: B,
|
||||
buf: tokio_epoll_uring::Slice<Buf>,
|
||||
offset: u64,
|
||||
_ctx: &RequestContext, /* TODO: use for metrics: https://github.com/neondatabase/neon/issues/6107 */
|
||||
) -> (B, Result<usize, Error>)
|
||||
) -> (tokio_epoll_uring::Slice<Buf>, Result<usize, Error>)
|
||||
where
|
||||
B: tokio_epoll_uring::BoundedBufMut + Send,
|
||||
Buf: tokio_epoll_uring::IoBufMut + Send,
|
||||
{
|
||||
let file_guard = match self.lock_file().await {
|
||||
Ok(file_guard) => file_guard,
|
||||
@@ -781,26 +781,16 @@ impl VirtualFile {
|
||||
}
|
||||
|
||||
// Adapted from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#117-135
|
||||
pub async fn read_exact_at_impl<B, F, Fut>(
|
||||
buf: B,
|
||||
pub async fn read_exact_at_impl<Buf, F, Fut>(
|
||||
mut buf: tokio_epoll_uring::Slice<Buf>,
|
||||
mut offset: u64,
|
||||
count: Option<usize>,
|
||||
mut read_at: F,
|
||||
) -> (B, std::io::Result<()>)
|
||||
) -> (Buf, std::io::Result<()>)
|
||||
where
|
||||
B: IoBufMut + Send,
|
||||
F: FnMut(tokio_epoll_uring::Slice<B>, u64) -> Fut,
|
||||
Fut: std::future::Future<Output = (tokio_epoll_uring::Slice<B>, std::io::Result<usize>)>,
|
||||
Buf: IoBufMut + Send,
|
||||
F: FnMut(tokio_epoll_uring::Slice<Buf>, u64) -> Fut,
|
||||
Fut: std::future::Future<Output = (tokio_epoll_uring::Slice<Buf>, std::io::Result<usize>)>,
|
||||
{
|
||||
let mut buf: tokio_epoll_uring::Slice<B> = match count {
|
||||
Some(count) => {
|
||||
assert!(count <= buf.bytes_total());
|
||||
assert!(count > 0);
|
||||
buf.slice(..count) // may include uninitialized memory
|
||||
}
|
||||
None => buf.slice_full(), // includes all the uninitialized memory
|
||||
};
|
||||
|
||||
while buf.bytes_total() != 0 {
|
||||
let res;
|
||||
(buf, res) = read_at(buf, offset).await;
|
||||
@@ -882,7 +872,7 @@ mod test_read_exact_at_impl {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_basic() {
|
||||
let buf = Vec::with_capacity(5);
|
||||
let buf = Vec::with_capacity(5).slice_full();
|
||||
let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt {
|
||||
expectations: VecDeque::from(vec![Expectation {
|
||||
offset: 0,
|
||||
@@ -890,7 +880,7 @@ mod test_read_exact_at_impl {
|
||||
result: Ok(vec![b'a', b'b', b'c', b'd', b'e']),
|
||||
}]),
|
||||
}));
|
||||
let (buf, res) = read_exact_at_impl(buf, 0, None, |buf, offset| {
|
||||
let (buf, res) = read_exact_at_impl(buf, 0, |buf, offset| {
|
||||
let mock_read_at = Arc::clone(&mock_read_at);
|
||||
async move { mock_read_at.lock().await.read_at(buf, offset).await }
|
||||
})
|
||||
@@ -899,33 +889,13 @@ mod test_read_exact_at_impl {
|
||||
assert_eq!(buf, vec![b'a', b'b', b'c', b'd', b'e']);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_with_count() {
|
||||
let buf = Vec::with_capacity(5);
|
||||
let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt {
|
||||
expectations: VecDeque::from(vec![Expectation {
|
||||
offset: 0,
|
||||
bytes_total: 3,
|
||||
result: Ok(vec![b'a', b'b', b'c']),
|
||||
}]),
|
||||
}));
|
||||
|
||||
let (buf, res) = read_exact_at_impl(buf, 0, Some(3), |buf, offset| {
|
||||
let mock_read_at = Arc::clone(&mock_read_at);
|
||||
async move { mock_read_at.lock().await.read_at(buf, offset).await }
|
||||
})
|
||||
.await;
|
||||
assert!(res.is_ok());
|
||||
assert_eq!(buf, vec![b'a', b'b', b'c']);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_empty_buf_issues_no_syscall() {
|
||||
let buf = Vec::new();
|
||||
let buf = Vec::new().slice_full();
|
||||
let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt {
|
||||
expectations: VecDeque::new(),
|
||||
}));
|
||||
let (_buf, res) = read_exact_at_impl(buf, 0, None, |buf, offset| {
|
||||
let (_buf, res) = read_exact_at_impl(buf, 0, |buf, offset| {
|
||||
let mock_read_at = Arc::clone(&mock_read_at);
|
||||
async move { mock_read_at.lock().await.read_at(buf, offset).await }
|
||||
})
|
||||
@@ -935,7 +905,7 @@ mod test_read_exact_at_impl {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_two_read_at_calls_needed_until_buf_filled() {
|
||||
let buf = Vec::with_capacity(4);
|
||||
let buf = Vec::with_capacity(4).slice_full();
|
||||
let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt {
|
||||
expectations: VecDeque::from(vec![
|
||||
Expectation {
|
||||
@@ -950,7 +920,7 @@ mod test_read_exact_at_impl {
|
||||
},
|
||||
]),
|
||||
}));
|
||||
let (buf, res) = read_exact_at_impl(buf, 0, None, |buf, offset| {
|
||||
let (buf, res) = read_exact_at_impl(buf, 0, |buf, offset| {
|
||||
let mock_read_at = Arc::clone(&mock_read_at);
|
||||
async move { mock_read_at.lock().await.read_at(buf, offset).await }
|
||||
})
|
||||
@@ -961,7 +931,7 @@ mod test_read_exact_at_impl {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_eof_before_buffer_full() {
|
||||
let buf = Vec::with_capacity(3);
|
||||
let buf = Vec::with_capacity(3).slice_full();
|
||||
let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt {
|
||||
expectations: VecDeque::from(vec![
|
||||
Expectation {
|
||||
@@ -981,7 +951,7 @@ mod test_read_exact_at_impl {
|
||||
},
|
||||
]),
|
||||
}));
|
||||
let (_buf, res) = read_exact_at_impl(buf, 0, None, |buf, offset| {
|
||||
let (_buf, res) = read_exact_at_impl(buf, 0, |buf, offset| {
|
||||
let mock_read_at = Arc::clone(&mock_read_at);
|
||||
async move { mock_read_at.lock().await.read_at(buf, offset).await }
|
||||
})
|
||||
@@ -1051,27 +1021,29 @@ impl VirtualFile {
|
||||
ctx: &RequestContext,
|
||||
) -> Result<crate::tenant::block_io::BlockLease<'_>, std::io::Error> {
|
||||
use crate::page_cache::PAGE_SZ;
|
||||
let buf = vec![0; PAGE_SZ];
|
||||
let buf = self
|
||||
.read_exact_at(buf, blknum as u64 * (PAGE_SZ as u64), ctx)
|
||||
let slice = Vec::with_capacity(PAGE_SZ).slice_full();
|
||||
assert_eq!(slice.bytes_total(), PAGE_SZ);
|
||||
let slice = self
|
||||
.read_exact_at(slice, blknum as u64 * (PAGE_SZ as u64), ctx)
|
||||
.await?;
|
||||
Ok(crate::tenant::block_io::BlockLease::Vec(buf))
|
||||
Ok(crate::tenant::block_io::BlockLease::Vec(slice.into_inner()))
|
||||
}
|
||||
|
||||
async fn read_to_end(&mut self, buf: &mut Vec<u8>, ctx: &RequestContext) -> Result<(), Error> {
|
||||
let mut tmp = vec![0; 128];
|
||||
loop {
|
||||
let res;
|
||||
(tmp, res) = self.read_at(tmp, self.pos, ctx).await;
|
||||
let slice = tmp.slice(..128);
|
||||
let (slice, res) = self.read_at(slice, self.pos, ctx).await;
|
||||
match res {
|
||||
Ok(0) => return Ok(()),
|
||||
Ok(n) => {
|
||||
self.pos += n as u64;
|
||||
buf.extend_from_slice(&tmp[..n]);
|
||||
buf.extend_from_slice(&slice[..n]);
|
||||
}
|
||||
Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
|
||||
Err(e) => return Err(e),
|
||||
}
|
||||
tmp = slice.into_inner();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1185,6 +1157,7 @@ mod tests {
|
||||
use crate::task_mgr::TaskKind;
|
||||
|
||||
use super::*;
|
||||
use owned_buffers_io::slice::SliceExt;
|
||||
use rand::seq::SliceRandom;
|
||||
use rand::thread_rng;
|
||||
use rand::Rng;
|
||||
@@ -1206,13 +1179,16 @@ mod tests {
|
||||
impl MaybeVirtualFile {
|
||||
async fn read_exact_at(
|
||||
&self,
|
||||
mut buf: Vec<u8>,
|
||||
mut slice: tokio_epoll_uring::Slice<Vec<u8>>,
|
||||
offset: u64,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Vec<u8>, Error> {
|
||||
) -> Result<tokio_epoll_uring::Slice<Vec<u8>>, Error> {
|
||||
match self {
|
||||
MaybeVirtualFile::VirtualFile(file) => file.read_exact_at(buf, offset, ctx).await,
|
||||
MaybeVirtualFile::File(file) => file.read_exact_at(&mut buf, offset).map(|()| buf),
|
||||
MaybeVirtualFile::VirtualFile(file) => file.read_exact_at(slice, offset, ctx).await,
|
||||
MaybeVirtualFile::File(file) => {
|
||||
let rust_slice: &mut [u8] = slice.as_mut_rust_slice_full_zeroed();
|
||||
file.read_exact_at(rust_slice, offset).map(|()| slice)
|
||||
}
|
||||
}
|
||||
}
|
||||
async fn write_all_at<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
|
||||
@@ -1286,9 +1262,12 @@ mod tests {
|
||||
len: usize,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<String, Error> {
|
||||
let buf = vec![0; len];
|
||||
let buf = self.read_exact_at(buf, pos, ctx).await?;
|
||||
Ok(String::from_utf8(buf).unwrap())
|
||||
let slice = Vec::with_capacity(len).slice_full();
|
||||
assert_eq!(slice.bytes_total(), len);
|
||||
let slice = self.read_exact_at(slice, pos, ctx).await?;
|
||||
let vec = slice.into_inner();
|
||||
assert_eq!(vec.len(), len);
|
||||
Ok(String::from_utf8(vec).unwrap())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1507,7 +1486,11 @@ mod tests {
|
||||
let mut rng = rand::rngs::OsRng;
|
||||
for _ in 1..1000 {
|
||||
let f = &files[rng.gen_range(0..files.len())];
|
||||
buf = f.read_exact_at(buf, 0, &ctx).await.unwrap();
|
||||
buf = f
|
||||
.read_exact_at(buf.slice_full(), 0, &ctx)
|
||||
.await
|
||||
.unwrap()
|
||||
.into_inner();
|
||||
assert!(buf == SAMPLE);
|
||||
}
|
||||
});
|
||||
|
||||
@@ -107,7 +107,7 @@ use std::{
|
||||
sync::atomic::{AtomicU8, Ordering},
|
||||
};
|
||||
|
||||
use super::{FileGuard, Metadata};
|
||||
use super::{owned_buffers_io::slice::SliceExt, FileGuard, Metadata};
|
||||
|
||||
#[cfg(target_os = "linux")]
|
||||
fn epoll_uring_error_to_std(e: tokio_epoll_uring::Error<std::io::Error>) -> std::io::Error {
|
||||
@@ -120,38 +120,29 @@ fn epoll_uring_error_to_std(e: tokio_epoll_uring::Error<std::io::Error>) -> std:
|
||||
}
|
||||
|
||||
impl IoEngine {
|
||||
pub(super) async fn read_at<B>(
|
||||
pub(super) async fn read_at<Buf>(
|
||||
&self,
|
||||
file_guard: FileGuard,
|
||||
offset: u64,
|
||||
mut buf: B,
|
||||
) -> ((FileGuard, B), std::io::Result<usize>)
|
||||
mut slice: tokio_epoll_uring::Slice<Buf>,
|
||||
) -> (
|
||||
(FileGuard, tokio_epoll_uring::Slice<Buf>),
|
||||
std::io::Result<usize>,
|
||||
)
|
||||
where
|
||||
B: tokio_epoll_uring::BoundedBufMut + Send,
|
||||
Buf: tokio_epoll_uring::IoBufMut + Send,
|
||||
{
|
||||
match self {
|
||||
IoEngine::NotSet => panic!("not initialized"),
|
||||
IoEngine::StdFs => {
|
||||
// SAFETY: `dst` only lives at most as long as this match arm, during which buf remains valid memory.
|
||||
let dst = unsafe {
|
||||
std::slice::from_raw_parts_mut(buf.stable_mut_ptr(), buf.bytes_total())
|
||||
};
|
||||
let res = file_guard.with_std_file(|std_file| std_file.read_at(dst, offset));
|
||||
if let Ok(nbytes) = &res {
|
||||
assert!(*nbytes <= buf.bytes_total());
|
||||
// SAFETY: see above assertion
|
||||
unsafe {
|
||||
buf.set_init(*nbytes);
|
||||
}
|
||||
}
|
||||
#[allow(dropping_references)]
|
||||
drop(dst);
|
||||
((file_guard, buf), res)
|
||||
let rust_slice = slice.as_mut_rust_slice_full_zeroed();
|
||||
let res = file_guard.with_std_file(|std_file| std_file.read_at(rust_slice, offset));
|
||||
((file_guard, slice), res)
|
||||
}
|
||||
#[cfg(target_os = "linux")]
|
||||
IoEngine::TokioEpollUring => {
|
||||
let system = tokio_epoll_uring_ext::thread_local_system().await;
|
||||
let (resources, res) = system.read(file_guard, offset, buf).await;
|
||||
let (resources, res) = system.read(file_guard, offset, slice).await;
|
||||
(resources, res.map_err(epoll_uring_error_to_std))
|
||||
}
|
||||
}
|
||||
|
||||
121
pageserver/src/virtual_file/owned_buffers_io/slice.rs
Normal file
121
pageserver/src/virtual_file/owned_buffers_io/slice.rs
Normal file
@@ -0,0 +1,121 @@
|
||||
use tokio_epoll_uring::BoundedBuf;
|
||||
use tokio_epoll_uring::BoundedBufMut;
|
||||
use tokio_epoll_uring::IoBufMut;
|
||||
use tokio_epoll_uring::Slice;
|
||||
|
||||
pub(crate) trait SliceExt {
|
||||
/// Get a `&mut[0..self.bytes_total()`] slice, for when you need to do borrow-based IO.
|
||||
///
|
||||
/// See the test case `test_slice_full_zeroed` for the difference to just doing `&slice[..]`
|
||||
fn as_mut_rust_slice_full_zeroed(&mut self) -> &mut [u8];
|
||||
}
|
||||
|
||||
impl<B> SliceExt for Slice<B>
|
||||
where
|
||||
B: IoBufMut,
|
||||
{
|
||||
#[inline(always)]
|
||||
fn as_mut_rust_slice_full_zeroed(&mut self) -> &mut [u8] {
|
||||
// zero-initialize the uninitialized parts of the buffer so we can create a Rust slice
|
||||
//
|
||||
// SAFETY: we own `slice`, don't write outside the bounds
|
||||
unsafe {
|
||||
let to_init = self.bytes_total() - self.bytes_init();
|
||||
self.stable_mut_ptr()
|
||||
.add(self.bytes_init())
|
||||
.write_bytes(0, to_init);
|
||||
self.set_init(self.bytes_total());
|
||||
};
|
||||
let bytes_total = self.bytes_total();
|
||||
&mut self[0..bytes_total]
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::io::Read;
|
||||
|
||||
use super::*;
|
||||
use bytes::Buf;
|
||||
use tokio_epoll_uring::Slice;
|
||||
|
||||
#[test]
|
||||
fn test_slice_full_zeroed() {
|
||||
let make_fake_file = || bytes::BytesMut::from(&b"12345"[..]).reader();
|
||||
|
||||
// before we start the test, let's make sure we have a shared understanding of what slice_full does
|
||||
{
|
||||
let buf = Vec::with_capacity(3);
|
||||
let slice: Slice<_> = buf.slice_full();
|
||||
assert_eq!(slice.bytes_init(), 0);
|
||||
assert_eq!(slice.bytes_total(), 3);
|
||||
let rust_slice = &slice[..];
|
||||
assert_eq!(
|
||||
rust_slice.len(),
|
||||
0,
|
||||
"Slice only derefs to a &[u8] of the initialized part"
|
||||
);
|
||||
}
|
||||
|
||||
// and also let's establish a shared understanding of .slice()
|
||||
{
|
||||
let buf = Vec::with_capacity(3);
|
||||
let slice: Slice<_> = buf.slice(0..2);
|
||||
assert_eq!(slice.bytes_init(), 0);
|
||||
assert_eq!(slice.bytes_total(), 2);
|
||||
let rust_slice = &slice[..];
|
||||
assert_eq!(
|
||||
rust_slice.len(),
|
||||
0,
|
||||
"Slice only derefs to a &[u8] of the initialized part"
|
||||
);
|
||||
}
|
||||
|
||||
// the above leads to the easy mistake of using slice[..] for borrow-based IO like so:
|
||||
{
|
||||
let buf = Vec::with_capacity(3);
|
||||
let mut slice: Slice<_> = buf.slice_full();
|
||||
assert_eq!(slice[..].len(), 0);
|
||||
let mut file = make_fake_file();
|
||||
file.read_exact(&mut slice[..]).unwrap(); // one might think this reads 3 bytes but it reads 0
|
||||
assert_eq!(&slice[..] as &[u8], &[][..] as &[u8]);
|
||||
}
|
||||
|
||||
// With owned buffers IO like with VirtualFilem, you could totally
|
||||
// pass in a `Slice` with bytes_init()=0 but bytes_total()=5
|
||||
// and it will read 5 bytes into the slice, and return a slice that has bytes_init()=5.
|
||||
{
|
||||
// TODO: demo
|
||||
}
|
||||
|
||||
//
|
||||
// Ok, now that we have a shared understanding let's demo how to use the extension trait.
|
||||
//
|
||||
|
||||
// slice_full()
|
||||
{
|
||||
let buf = Vec::with_capacity(3);
|
||||
let mut slice: Slice<_> = buf.slice_full();
|
||||
let rust_slice = slice.as_mut_rust_slice_full_zeroed();
|
||||
assert_eq!(rust_slice.len(), 3);
|
||||
assert_eq!(rust_slice, &[0, 0, 0]);
|
||||
let mut file = make_fake_file();
|
||||
file.read_exact(rust_slice).unwrap();
|
||||
assert_eq!(rust_slice, b"123");
|
||||
assert_eq!(&slice[..], b"123");
|
||||
}
|
||||
|
||||
// .slice(..)
|
||||
{
|
||||
let buf = Vec::with_capacity(3);
|
||||
let mut slice: Slice<_> = buf.slice(0..2);
|
||||
let rust_slice = slice.as_mut_rust_slice_full_zeroed();
|
||||
assert_eq!(rust_slice.len(), 2);
|
||||
assert_eq!(rust_slice, &[0, 0]);
|
||||
let mut file = make_fake_file();
|
||||
file.read_exact(rust_slice).unwrap();
|
||||
assert_eq!(rust_slice, b"12");
|
||||
assert_eq!(&slice[..], b"12");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1384,14 +1384,31 @@ impl WalIngest {
|
||||
// Note: The multixact members can wrap around, even within one WAL record.
|
||||
offset = offset.wrapping_add(n_this_page as u32);
|
||||
}
|
||||
if xlrec.mid >= self.checkpoint.nextMulti {
|
||||
self.checkpoint.nextMulti = xlrec.mid + 1;
|
||||
self.checkpoint_modified = true;
|
||||
}
|
||||
if xlrec.moff + xlrec.nmembers > self.checkpoint.nextMultiOffset {
|
||||
self.checkpoint.nextMultiOffset = xlrec.moff + xlrec.nmembers;
|
||||
let next_offset = offset;
|
||||
assert!(xlrec.moff.wrapping_add(xlrec.nmembers) == next_offset);
|
||||
|
||||
// Update next-multi-xid and next-offset
|
||||
//
|
||||
// NB: In PostgreSQL, the next-multi-xid stored in the control file is allowed to
|
||||
// go to 0, and it's fixed up by skipping to FirstMultiXactId in functions that
|
||||
// read it, like GetNewMultiXactId(). This is different from how nextXid is
|
||||
// incremented! nextXid skips over < FirstNormalTransactionId when the the value
|
||||
// is stored, so it's never 0 in a checkpoint.
|
||||
//
|
||||
// I don't know why it's done that way, it seems less error-prone to skip over 0
|
||||
// when the value is stored rather than when it's read. But let's do it the same
|
||||
// way here.
|
||||
let next_multi_xid = xlrec.mid.wrapping_add(1);
|
||||
|
||||
if self
|
||||
.checkpoint
|
||||
.update_next_multixid(next_multi_xid, next_offset)
|
||||
{
|
||||
self.checkpoint_modified = true;
|
||||
}
|
||||
|
||||
// Also update the next-xid with the highest member. According to the comments in
|
||||
// multixact_redo(), this shouldn't be necessary, but let's do the same here.
|
||||
let max_mbr_xid = xlrec.members.iter().fold(None, |acc, mbr| {
|
||||
if let Some(max_xid) = acc {
|
||||
if mbr.xid.wrapping_sub(max_xid) as i32 > 0 {
|
||||
|
||||
@@ -29,7 +29,8 @@ use utils::pid_file;
|
||||
use metrics::set_build_info_metric;
|
||||
use safekeeper::defaults::{
|
||||
DEFAULT_CONTROL_FILE_SAVE_INTERVAL, DEFAULT_HEARTBEAT_TIMEOUT, DEFAULT_HTTP_LISTEN_ADDR,
|
||||
DEFAULT_MAX_OFFLOADER_LAG_BYTES, DEFAULT_PARTIAL_BACKUP_TIMEOUT, DEFAULT_PG_LISTEN_ADDR,
|
||||
DEFAULT_MAX_OFFLOADER_LAG_BYTES, DEFAULT_PARTIAL_BACKUP_CONCURRENCY,
|
||||
DEFAULT_PARTIAL_BACKUP_TIMEOUT, DEFAULT_PG_LISTEN_ADDR,
|
||||
};
|
||||
use safekeeper::http;
|
||||
use safekeeper::wal_service;
|
||||
@@ -191,6 +192,9 @@ struct Args {
|
||||
/// Pending updates to control file will be automatically saved after this interval.
|
||||
#[arg(long, value_parser = humantime::parse_duration, default_value = DEFAULT_CONTROL_FILE_SAVE_INTERVAL)]
|
||||
control_file_save_interval: Duration,
|
||||
/// Number of allowed concurrent uploads of partial segments to remote storage.
|
||||
#[arg(long, default_value = DEFAULT_PARTIAL_BACKUP_CONCURRENCY)]
|
||||
partial_backup_concurrency: usize,
|
||||
}
|
||||
|
||||
// Like PathBufValueParser, but allows empty string.
|
||||
@@ -344,6 +348,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
enable_offload: args.enable_offload,
|
||||
delete_offloaded_wal: args.delete_offloaded_wal,
|
||||
control_file_save_interval: args.control_file_save_interval,
|
||||
partial_backup_concurrency: args.partial_backup_concurrency,
|
||||
};
|
||||
|
||||
// initialize sentry if SENTRY_DSN is provided
|
||||
|
||||
@@ -52,6 +52,7 @@ pub mod defaults {
|
||||
pub const DEFAULT_MAX_OFFLOADER_LAG_BYTES: u64 = 128 * (1 << 20);
|
||||
pub const DEFAULT_PARTIAL_BACKUP_TIMEOUT: &str = "15m";
|
||||
pub const DEFAULT_CONTROL_FILE_SAVE_INTERVAL: &str = "300s";
|
||||
pub const DEFAULT_PARTIAL_BACKUP_CONCURRENCY: &str = "5";
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
@@ -91,6 +92,7 @@ pub struct SafeKeeperConf {
|
||||
pub enable_offload: bool,
|
||||
pub delete_offloaded_wal: bool,
|
||||
pub control_file_save_interval: Duration,
|
||||
pub partial_backup_concurrency: usize,
|
||||
}
|
||||
|
||||
impl SafeKeeperConf {
|
||||
@@ -133,6 +135,7 @@ impl SafeKeeperConf {
|
||||
enable_offload: false,
|
||||
delete_offloaded_wal: false,
|
||||
control_file_save_interval: Duration::from_secs(1),
|
||||
partial_backup_concurrency: 1,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -72,7 +72,8 @@ pub static WAL_STORAGE_OPERATION_SECONDS: Lazy<HistogramVec> = Lazy::new(|| {
|
||||
register_histogram_vec!(
|
||||
"safekeeper_wal_storage_operation_seconds",
|
||||
"Seconds spent on WAL storage operations",
|
||||
&["operation"]
|
||||
&["operation"],
|
||||
DISK_FSYNC_SECONDS_BUCKETS.to_vec()
|
||||
)
|
||||
.expect("Failed to register safekeeper_wal_storage_operation_seconds histogram vec")
|
||||
});
|
||||
@@ -80,7 +81,8 @@ pub static MISC_OPERATION_SECONDS: Lazy<HistogramVec> = Lazy::new(|| {
|
||||
register_histogram_vec!(
|
||||
"safekeeper_misc_operation_seconds",
|
||||
"Seconds spent on miscellaneous operations",
|
||||
&["operation"]
|
||||
&["operation"],
|
||||
DISK_FSYNC_SECONDS_BUCKETS.to_vec()
|
||||
)
|
||||
.expect("Failed to register safekeeper_misc_operation_seconds histogram vec")
|
||||
});
|
||||
|
||||
@@ -36,7 +36,7 @@ use crate::timeline_guard::ResidenceGuard;
|
||||
use crate::timeline_manager::{AtomicStatus, ManagerCtl};
|
||||
use crate::timelines_set::TimelinesSet;
|
||||
use crate::wal_backup::{self};
|
||||
use crate::wal_backup_partial::PartialRemoteSegment;
|
||||
use crate::wal_backup_partial::{PartialRemoteSegment, RateLimiter};
|
||||
use crate::{control_file, safekeeper::UNKNOWN_SERVER_VERSION};
|
||||
|
||||
use crate::metrics::{FullTimelineInfo, WalStorageMetrics, MISC_OPERATION_SECONDS};
|
||||
@@ -587,6 +587,7 @@ impl Timeline {
|
||||
shared_state: &mut WriteGuardSharedState<'_>,
|
||||
conf: &SafeKeeperConf,
|
||||
broker_active_set: Arc<TimelinesSet>,
|
||||
partial_backup_rate_limiter: RateLimiter,
|
||||
) -> Result<()> {
|
||||
match fs::metadata(&self.timeline_dir).await {
|
||||
Ok(_) => {
|
||||
@@ -617,7 +618,7 @@ impl Timeline {
|
||||
|
||||
return Err(e);
|
||||
}
|
||||
self.bootstrap(conf, broker_active_set);
|
||||
self.bootstrap(conf, broker_active_set, partial_backup_rate_limiter);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -626,6 +627,7 @@ impl Timeline {
|
||||
self: &Arc<Timeline>,
|
||||
conf: &SafeKeeperConf,
|
||||
broker_active_set: Arc<TimelinesSet>,
|
||||
partial_backup_rate_limiter: RateLimiter,
|
||||
) {
|
||||
let (tx, rx) = self.manager_ctl.bootstrap_manager();
|
||||
|
||||
@@ -637,6 +639,7 @@ impl Timeline {
|
||||
broker_active_set,
|
||||
tx,
|
||||
rx,
|
||||
partial_backup_rate_limiter,
|
||||
));
|
||||
}
|
||||
|
||||
|
||||
@@ -32,7 +32,7 @@ use crate::{
|
||||
timeline_guard::{AccessService, GuardId, ResidenceGuard},
|
||||
timelines_set::{TimelineSetGuard, TimelinesSet},
|
||||
wal_backup::{self, WalBackupTaskHandle},
|
||||
wal_backup_partial::{self, PartialRemoteSegment},
|
||||
wal_backup_partial::{self, PartialRemoteSegment, RateLimiter},
|
||||
SafeKeeperConf,
|
||||
};
|
||||
|
||||
@@ -185,6 +185,7 @@ pub(crate) struct Manager {
|
||||
|
||||
// misc
|
||||
pub(crate) access_service: AccessService,
|
||||
pub(crate) partial_backup_rate_limiter: RateLimiter,
|
||||
}
|
||||
|
||||
/// This task gets spawned alongside each timeline and is responsible for managing the timeline's
|
||||
@@ -197,6 +198,7 @@ pub async fn main_task(
|
||||
broker_active_set: Arc<TimelinesSet>,
|
||||
manager_tx: tokio::sync::mpsc::UnboundedSender<ManagerCtlMessage>,
|
||||
mut manager_rx: tokio::sync::mpsc::UnboundedReceiver<ManagerCtlMessage>,
|
||||
partial_backup_rate_limiter: RateLimiter,
|
||||
) {
|
||||
tli.set_status(Status::Started);
|
||||
|
||||
@@ -209,7 +211,14 @@ pub async fn main_task(
|
||||
}
|
||||
};
|
||||
|
||||
let mut mgr = Manager::new(tli, conf, broker_active_set, manager_tx).await;
|
||||
let mut mgr = Manager::new(
|
||||
tli,
|
||||
conf,
|
||||
broker_active_set,
|
||||
manager_tx,
|
||||
partial_backup_rate_limiter,
|
||||
)
|
||||
.await;
|
||||
|
||||
// Start recovery task which always runs on the timeline.
|
||||
if !mgr.is_offloaded && mgr.conf.peer_recovery_enabled {
|
||||
@@ -321,6 +330,7 @@ impl Manager {
|
||||
conf: SafeKeeperConf,
|
||||
broker_active_set: Arc<TimelinesSet>,
|
||||
manager_tx: tokio::sync::mpsc::UnboundedSender<ManagerCtlMessage>,
|
||||
partial_backup_rate_limiter: RateLimiter,
|
||||
) -> Manager {
|
||||
let (is_offloaded, partial_backup_uploaded) = tli.bootstrap_mgr().await;
|
||||
Manager {
|
||||
@@ -339,6 +349,7 @@ impl Manager {
|
||||
partial_backup_uploaded,
|
||||
access_service: AccessService::new(manager_tx),
|
||||
tli,
|
||||
partial_backup_rate_limiter,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -525,6 +536,7 @@ impl Manager {
|
||||
self.partial_backup_task = Some(tokio::spawn(wal_backup_partial::main_task(
|
||||
self.wal_resident_timeline(),
|
||||
self.conf.clone(),
|
||||
self.partial_backup_rate_limiter.clone(),
|
||||
)));
|
||||
}
|
||||
|
||||
|
||||
@@ -5,6 +5,7 @@
|
||||
use crate::safekeeper::ServerInfo;
|
||||
use crate::timeline::{get_tenant_dir, get_timeline_dir, Timeline, TimelineError};
|
||||
use crate::timelines_set::TimelinesSet;
|
||||
use crate::wal_backup_partial::RateLimiter;
|
||||
use crate::SafeKeeperConf;
|
||||
use anyhow::{bail, Context, Result};
|
||||
use camino::Utf8PathBuf;
|
||||
@@ -23,6 +24,7 @@ struct GlobalTimelinesState {
|
||||
conf: Option<SafeKeeperConf>,
|
||||
broker_active_set: Arc<TimelinesSet>,
|
||||
load_lock: Arc<tokio::sync::Mutex<TimelineLoadLock>>,
|
||||
partial_backup_rate_limiter: RateLimiter,
|
||||
}
|
||||
|
||||
// Used to prevent concurrent timeline loading.
|
||||
@@ -37,8 +39,12 @@ impl GlobalTimelinesState {
|
||||
}
|
||||
|
||||
/// Get dependencies for a timeline constructor.
|
||||
fn get_dependencies(&self) -> (SafeKeeperConf, Arc<TimelinesSet>) {
|
||||
(self.get_conf().clone(), self.broker_active_set.clone())
|
||||
fn get_dependencies(&self) -> (SafeKeeperConf, Arc<TimelinesSet>, RateLimiter) {
|
||||
(
|
||||
self.get_conf().clone(),
|
||||
self.broker_active_set.clone(),
|
||||
self.partial_backup_rate_limiter.clone(),
|
||||
)
|
||||
}
|
||||
|
||||
/// Insert timeline into the map. Returns error if timeline with the same id already exists.
|
||||
@@ -66,6 +72,7 @@ static TIMELINES_STATE: Lazy<Mutex<GlobalTimelinesState>> = Lazy::new(|| {
|
||||
conf: None,
|
||||
broker_active_set: Arc::new(TimelinesSet::default()),
|
||||
load_lock: Arc::new(tokio::sync::Mutex::new(TimelineLoadLock)),
|
||||
partial_backup_rate_limiter: RateLimiter::new(1),
|
||||
})
|
||||
});
|
||||
|
||||
@@ -79,6 +86,7 @@ impl GlobalTimelines {
|
||||
// lock, so use explicit block
|
||||
let tenants_dir = {
|
||||
let mut state = TIMELINES_STATE.lock().unwrap();
|
||||
state.partial_backup_rate_limiter = RateLimiter::new(conf.partial_backup_concurrency);
|
||||
state.conf = Some(conf);
|
||||
|
||||
// Iterate through all directories and load tenants for all directories
|
||||
@@ -122,7 +130,7 @@ impl GlobalTimelines {
|
||||
/// this function is called during init when nothing else is running, so
|
||||
/// this is fine.
|
||||
async fn load_tenant_timelines(tenant_id: TenantId) -> Result<()> {
|
||||
let (conf, broker_active_set) = {
|
||||
let (conf, broker_active_set, partial_backup_rate_limiter) = {
|
||||
let state = TIMELINES_STATE.lock().unwrap();
|
||||
state.get_dependencies()
|
||||
};
|
||||
@@ -145,7 +153,11 @@ impl GlobalTimelines {
|
||||
.unwrap()
|
||||
.timelines
|
||||
.insert(ttid, tli.clone());
|
||||
tli.bootstrap(&conf, broker_active_set.clone());
|
||||
tli.bootstrap(
|
||||
&conf,
|
||||
broker_active_set.clone(),
|
||||
partial_backup_rate_limiter.clone(),
|
||||
);
|
||||
}
|
||||
// If we can't load a timeline, it's most likely because of a corrupted
|
||||
// directory. We will log an error and won't allow to delete/recreate
|
||||
@@ -178,7 +190,8 @@ impl GlobalTimelines {
|
||||
_guard: &tokio::sync::MutexGuard<'a, TimelineLoadLock>,
|
||||
ttid: TenantTimelineId,
|
||||
) -> Result<Arc<Timeline>> {
|
||||
let (conf, broker_active_set) = TIMELINES_STATE.lock().unwrap().get_dependencies();
|
||||
let (conf, broker_active_set, partial_backup_rate_limiter) =
|
||||
TIMELINES_STATE.lock().unwrap().get_dependencies();
|
||||
|
||||
match Timeline::load_timeline(&conf, ttid) {
|
||||
Ok(timeline) => {
|
||||
@@ -191,7 +204,7 @@ impl GlobalTimelines {
|
||||
.timelines
|
||||
.insert(ttid, tli.clone());
|
||||
|
||||
tli.bootstrap(&conf, broker_active_set);
|
||||
tli.bootstrap(&conf, broker_active_set, partial_backup_rate_limiter);
|
||||
|
||||
Ok(tli)
|
||||
}
|
||||
@@ -222,7 +235,7 @@ impl GlobalTimelines {
|
||||
commit_lsn: Lsn,
|
||||
local_start_lsn: Lsn,
|
||||
) -> Result<Arc<Timeline>> {
|
||||
let (conf, broker_active_set) = {
|
||||
let (conf, broker_active_set, partial_backup_rate_limiter) = {
|
||||
let state = TIMELINES_STATE.lock().unwrap();
|
||||
if let Ok(timeline) = state.get(&ttid) {
|
||||
// Timeline already exists, return it.
|
||||
@@ -257,7 +270,12 @@ impl GlobalTimelines {
|
||||
// Bootstrap is transactional, so if it fails, the timeline will be deleted,
|
||||
// and the state on disk should remain unchanged.
|
||||
if let Err(e) = timeline
|
||||
.init_new(&mut shared_state, &conf, broker_active_set)
|
||||
.init_new(
|
||||
&mut shared_state,
|
||||
&conf,
|
||||
broker_active_set,
|
||||
partial_backup_rate_limiter,
|
||||
)
|
||||
.await
|
||||
{
|
||||
// Note: the most likely reason for init failure is that the timeline
|
||||
|
||||
@@ -18,6 +18,8 @@
|
||||
//! This way control file stores information about all potentially existing
|
||||
//! remote partial segments and can clean them up after uploading a newer version.
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use camino::Utf8PathBuf;
|
||||
use postgres_ffi::{XLogFileName, XLogSegNo, PG_TLI};
|
||||
use remote_storage::RemotePath;
|
||||
@@ -27,7 +29,7 @@ use tracing::{debug, error, info, instrument, warn};
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
use crate::{
|
||||
metrics::{PARTIAL_BACKUP_UPLOADED_BYTES, PARTIAL_BACKUP_UPLOADS},
|
||||
metrics::{MISC_OPERATION_SECONDS, PARTIAL_BACKUP_UPLOADED_BYTES, PARTIAL_BACKUP_UPLOADS},
|
||||
safekeeper::Term,
|
||||
timeline::WalResidentTimeline,
|
||||
timeline_manager::StateSnapshot,
|
||||
@@ -35,6 +37,30 @@ use crate::{
|
||||
SafeKeeperConf,
|
||||
};
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct RateLimiter {
|
||||
semaphore: Arc<tokio::sync::Semaphore>,
|
||||
}
|
||||
|
||||
impl RateLimiter {
|
||||
pub fn new(permits: usize) -> Self {
|
||||
Self {
|
||||
semaphore: Arc::new(tokio::sync::Semaphore::new(permits)),
|
||||
}
|
||||
}
|
||||
|
||||
async fn acquire_owned(&self) -> tokio::sync::OwnedSemaphorePermit {
|
||||
let _timer = MISC_OPERATION_SECONDS
|
||||
.with_label_values(&["partial_permit_acquire"])
|
||||
.start_timer();
|
||||
self.semaphore
|
||||
.clone()
|
||||
.acquire_owned()
|
||||
.await
|
||||
.expect("semaphore is closed")
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
||||
pub enum UploadStatus {
|
||||
/// Upload is in progress. This status should be used only for garbage collection,
|
||||
@@ -208,6 +234,9 @@ impl PartialBackup {
|
||||
/// Upload the latest version of the partial segment and garbage collect older versions.
|
||||
#[instrument(name = "upload", skip_all, fields(name = %prepared.name))]
|
||||
async fn do_upload(&mut self, prepared: &PartialRemoteSegment) -> anyhow::Result<()> {
|
||||
let _timer = MISC_OPERATION_SECONDS
|
||||
.with_label_values(&["partial_do_upload"])
|
||||
.start_timer();
|
||||
info!("starting upload {:?}", prepared);
|
||||
|
||||
let state_0 = self.state.clone();
|
||||
@@ -307,6 +336,7 @@ pub(crate) fn needs_uploading(
|
||||
pub async fn main_task(
|
||||
tli: WalResidentTimeline,
|
||||
conf: SafeKeeperConf,
|
||||
limiter: RateLimiter,
|
||||
) -> Option<PartialRemoteSegment> {
|
||||
debug!("started");
|
||||
let await_duration = conf.partial_backup_timeout;
|
||||
@@ -411,6 +441,9 @@ pub async fn main_task(
|
||||
continue 'outer;
|
||||
}
|
||||
|
||||
// limit concurrent uploads
|
||||
let _upload_permit = limiter.acquire_owned().await;
|
||||
|
||||
let prepared = backup.prepare_upload().await;
|
||||
if let Some(seg) = &uploaded_segment {
|
||||
if seg.eq_without_status(&prepared) {
|
||||
|
||||
@@ -187,6 +187,7 @@ pub fn run_server(os: NodeOs, disk: Arc<SafekeeperDisk>) -> Result<()> {
|
||||
enable_offload: false,
|
||||
delete_offloaded_wal: false,
|
||||
control_file_save_interval: Duration::from_secs(1),
|
||||
partial_backup_concurrency: 1,
|
||||
};
|
||||
|
||||
let mut global = GlobalMap::new(disk, conf.clone())?;
|
||||
|
||||
@@ -10,8 +10,9 @@ use hyper::header::CONTENT_TYPE;
|
||||
use hyper::{Body, Request, Response};
|
||||
use hyper::{StatusCode, Uri};
|
||||
use metrics::{BuildInfo, NeonMetrics};
|
||||
use pageserver_api::controller_api::TenantCreateRequest;
|
||||
use pageserver_api::models::{
|
||||
TenantConfigRequest, TenantCreateRequest, TenantLocationConfigRequest, TenantShardSplitRequest,
|
||||
TenantConfigRequest, TenantLocationConfigRequest, TenantShardSplitRequest,
|
||||
TenantTimeTravelRequest, TimelineCreateRequest,
|
||||
};
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
use crate::pageserver_client::PageserverClient;
|
||||
use crate::persistence::Persistence;
|
||||
use crate::service;
|
||||
use pageserver_api::controller_api::PlacementPolicy;
|
||||
use pageserver_api::models::{
|
||||
LocationConfig, LocationConfigMode, LocationConfigSecondary, TenantConfig,
|
||||
};
|
||||
@@ -29,6 +30,7 @@ pub(super) struct Reconciler {
|
||||
/// of a tenant's state from when we spawned a reconcile task.
|
||||
pub(super) tenant_shard_id: TenantShardId,
|
||||
pub(crate) shard: ShardIdentity,
|
||||
pub(crate) placement_policy: PlacementPolicy,
|
||||
pub(crate) generation: Option<Generation>,
|
||||
pub(crate) intent: TargetState,
|
||||
|
||||
@@ -641,7 +643,7 @@ impl Reconciler {
|
||||
generation,
|
||||
&self.shard,
|
||||
&self.config,
|
||||
!self.intent.secondary.is_empty(),
|
||||
&self.placement_policy,
|
||||
);
|
||||
match self.observed.locations.get(&node.get_id()) {
|
||||
Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {
|
||||
@@ -801,8 +803,15 @@ pub(crate) fn attached_location_conf(
|
||||
generation: Generation,
|
||||
shard: &ShardIdentity,
|
||||
config: &TenantConfig,
|
||||
has_secondaries: bool,
|
||||
policy: &PlacementPolicy,
|
||||
) -> LocationConfig {
|
||||
let has_secondaries = match policy {
|
||||
PlacementPolicy::Attached(0) | PlacementPolicy::Detached | PlacementPolicy::Secondary => {
|
||||
false
|
||||
}
|
||||
PlacementPolicy::Attached(_) => true,
|
||||
};
|
||||
|
||||
LocationConfig {
|
||||
mode: LocationConfigMode::AttachedSingle,
|
||||
generation: generation.into(),
|
||||
|
||||
@@ -32,10 +32,10 @@ use itertools::Itertools;
|
||||
use pageserver_api::{
|
||||
controller_api::{
|
||||
NodeAvailability, NodeRegisterRequest, NodeSchedulingPolicy, PlacementPolicy,
|
||||
ShardSchedulingPolicy, TenantCreateResponse, TenantCreateResponseShard,
|
||||
TenantDescribeResponse, TenantDescribeResponseShard, TenantLocateResponse,
|
||||
TenantPolicyRequest, TenantShardMigrateRequest, TenantShardMigrateResponse,
|
||||
UtilizationScore,
|
||||
ShardSchedulingPolicy, TenantCreateRequest, TenantCreateResponse,
|
||||
TenantCreateResponseShard, TenantDescribeResponse, TenantDescribeResponseShard,
|
||||
TenantLocateResponse, TenantPolicyRequest, TenantShardMigrateRequest,
|
||||
TenantShardMigrateResponse, UtilizationScore,
|
||||
},
|
||||
models::{SecondaryProgress, TenantConfigRequest, TopTenantShardsRequest},
|
||||
};
|
||||
@@ -46,10 +46,9 @@ use crate::pageserver_client::PageserverClient;
|
||||
use pageserver_api::{
|
||||
models::{
|
||||
self, LocationConfig, LocationConfigListResponse, LocationConfigMode,
|
||||
PageserverUtilization, ShardParameters, TenantConfig, TenantCreateRequest,
|
||||
TenantLocationConfigRequest, TenantLocationConfigResponse, TenantShardLocation,
|
||||
TenantShardSplitRequest, TenantShardSplitResponse, TenantTimeTravelRequest,
|
||||
TimelineCreateRequest, TimelineInfo,
|
||||
PageserverUtilization, ShardParameters, TenantConfig, TenantLocationConfigRequest,
|
||||
TenantLocationConfigResponse, TenantShardLocation, TenantShardSplitRequest,
|
||||
TenantShardSplitResponse, TenantTimeTravelRequest, TimelineCreateRequest, TimelineInfo,
|
||||
},
|
||||
shard::{ShardCount, ShardIdentity, ShardNumber, ShardStripeSize, TenantShardId},
|
||||
upcall_api::{
|
||||
@@ -1391,7 +1390,7 @@ impl Service {
|
||||
tenant_shard.generation.unwrap(),
|
||||
&tenant_shard.shard,
|
||||
&tenant_shard.config,
|
||||
false,
|
||||
&PlacementPolicy::Attached(0),
|
||||
)),
|
||||
},
|
||||
)]);
|
||||
@@ -3322,7 +3321,7 @@ impl Service {
|
||||
generation,
|
||||
&child_shard,
|
||||
&config,
|
||||
matches!(policy, PlacementPolicy::Attached(n) if n > 0),
|
||||
&policy,
|
||||
)),
|
||||
},
|
||||
);
|
||||
|
||||
@@ -908,12 +908,8 @@ impl TenantShard {
|
||||
.generation
|
||||
.expect("Attempted to enter attached state without a generation");
|
||||
|
||||
let wanted_conf = attached_location_conf(
|
||||
generation,
|
||||
&self.shard,
|
||||
&self.config,
|
||||
!self.intent.secondary.is_empty(),
|
||||
);
|
||||
let wanted_conf =
|
||||
attached_location_conf(generation, &self.shard, &self.config, &self.policy);
|
||||
match self.observed.locations.get(&node_id) {
|
||||
Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {}
|
||||
Some(_) | None => {
|
||||
@@ -1099,6 +1095,7 @@ impl TenantShard {
|
||||
let mut reconciler = Reconciler {
|
||||
tenant_shard_id: self.tenant_shard_id,
|
||||
shard: self.shard,
|
||||
placement_policy: self.policy.clone(),
|
||||
generation: self.generation,
|
||||
intent: reconciler_intent,
|
||||
detach,
|
||||
|
||||
@@ -2741,7 +2741,19 @@ class NeonPageserver(PgProtocol, LogUtils):
|
||||
if generation is None:
|
||||
generation = self.env.storage_controller.attach_hook_issue(tenant_id, self.id)
|
||||
client = self.http_client(auth_token=auth_token)
|
||||
return client.tenant_create(tenant_id, conf, generation=generation)
|
||||
|
||||
conf = conf or {}
|
||||
|
||||
client.tenant_location_conf(
|
||||
tenant_id,
|
||||
{
|
||||
"mode": "AttachedSingle",
|
||||
"generation": generation,
|
||||
"tenant_conf": conf,
|
||||
"secondary_conf": None,
|
||||
},
|
||||
)
|
||||
return tenant_id
|
||||
|
||||
def list_layers(
|
||||
self, tenant_id: Union[TenantId, TenantShardId], timeline_id: TimelineId
|
||||
|
||||
@@ -220,34 +220,6 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
|
||||
assert isinstance(res_json, list)
|
||||
return res_json
|
||||
|
||||
def tenant_create(
|
||||
self,
|
||||
new_tenant_id: Union[TenantId, TenantShardId],
|
||||
conf: Optional[Dict[str, Any]] = None,
|
||||
generation: Optional[int] = None,
|
||||
) -> TenantId:
|
||||
if conf is not None:
|
||||
assert "new_tenant_id" not in conf.keys()
|
||||
|
||||
body: Dict[str, Any] = {
|
||||
"new_tenant_id": str(new_tenant_id),
|
||||
**(conf or {}),
|
||||
}
|
||||
|
||||
if generation is not None:
|
||||
body.update({"generation": generation})
|
||||
|
||||
res = self.post(
|
||||
f"http://localhost:{self.port}/v1/tenant",
|
||||
json=body,
|
||||
)
|
||||
self.verbose_error(res)
|
||||
if res.status_code == 409:
|
||||
raise Exception(f"could not create tenant: already exists for id {new_tenant_id}")
|
||||
new_tenant_id = res.json()
|
||||
assert isinstance(new_tenant_id, str)
|
||||
return TenantId(new_tenant_id)
|
||||
|
||||
def tenant_attach(
|
||||
self,
|
||||
tenant_id: Union[TenantId, TenantShardId],
|
||||
|
||||
@@ -7,6 +7,7 @@ from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import (
|
||||
NeonEnvBuilder,
|
||||
PgBin,
|
||||
VanillaPostgres,
|
||||
import_timeline_from_vanilla_postgres,
|
||||
wait_for_wal_insert_lsn,
|
||||
)
|
||||
@@ -182,3 +183,275 @@ def test_import_at_2bil(
|
||||
cur = conn.cursor()
|
||||
cur.execute("SELECT count(*) from t")
|
||||
assert cur.fetchone() == (10000 + 1 + 1,)
|
||||
|
||||
|
||||
# Constants and macros copied from PostgreSQL multixact.c and headers. These are needed to
|
||||
# calculate the SLRU segments that a particular multixid or multixid-offsets falls into.
|
||||
BLCKSZ = 8192
|
||||
MULTIXACT_OFFSETS_PER_PAGE = int(BLCKSZ / 4)
|
||||
SLRU_PAGES_PER_SEGMENT = int(32)
|
||||
MXACT_MEMBER_BITS_PER_XACT = 8
|
||||
MXACT_MEMBER_FLAGS_PER_BYTE = 1
|
||||
MULTIXACT_FLAGBYTES_PER_GROUP = 4
|
||||
MULTIXACT_MEMBERS_PER_MEMBERGROUP = MULTIXACT_FLAGBYTES_PER_GROUP * MXACT_MEMBER_FLAGS_PER_BYTE
|
||||
MULTIXACT_MEMBERGROUP_SIZE = 4 * MULTIXACT_MEMBERS_PER_MEMBERGROUP + MULTIXACT_FLAGBYTES_PER_GROUP
|
||||
MULTIXACT_MEMBERGROUPS_PER_PAGE = int(BLCKSZ / MULTIXACT_MEMBERGROUP_SIZE)
|
||||
MULTIXACT_MEMBERS_PER_PAGE = MULTIXACT_MEMBERGROUPS_PER_PAGE * MULTIXACT_MEMBERS_PER_MEMBERGROUP
|
||||
|
||||
|
||||
def MultiXactIdToOffsetSegment(xid: int):
|
||||
return int(xid / (SLRU_PAGES_PER_SEGMENT * MULTIXACT_OFFSETS_PER_PAGE))
|
||||
|
||||
|
||||
def MXOffsetToMemberSegment(off: int):
|
||||
return int(off / (SLRU_PAGES_PER_SEGMENT * MULTIXACT_MEMBERS_PER_PAGE))
|
||||
|
||||
|
||||
def advance_multixid_to(
|
||||
pg_bin: PgBin, vanilla_pg: VanillaPostgres, next_multi_xid: int, next_multi_offset: int
|
||||
):
|
||||
"""
|
||||
Use pg_resetwal to advance the nextMulti and nextMultiOffset values in a stand-alone
|
||||
Postgres cluster. This is useful to get close to wraparound or some other interesting
|
||||
value, without having to burn a lot of time consuming the (multi-)XIDs one by one.
|
||||
|
||||
The new values should be higher than the old ones, in a wraparound-aware sense.
|
||||
|
||||
On entry, the server should be running. It will be shut down and restarted.
|
||||
"""
|
||||
|
||||
# Read old values from the last checkpoint. We will pass the old oldestMultiXid value
|
||||
# back to pg_resetwal, there's no option to leave it alone.
|
||||
with vanilla_pg.connect() as conn:
|
||||
with conn.cursor() as cur:
|
||||
# Make sure the oldest-multi-xid value in the control file is up-to-date
|
||||
cur.execute("checkpoint")
|
||||
cur.execute("select oldest_multi_xid, next_multixact_id from pg_control_checkpoint()")
|
||||
rec = cur.fetchone()
|
||||
assert rec is not None
|
||||
(ckpt_oldest_multi_xid, ckpt_next_multi_xid) = rec
|
||||
log.info(f"oldestMultiXid was {ckpt_oldest_multi_xid}, nextMultiXid was {ckpt_next_multi_xid}")
|
||||
log.info(f"Resetting to {next_multi_xid}")
|
||||
|
||||
# Use pg_resetwal to reset the next multiXid and multiOffset to given values.
|
||||
vanilla_pg.stop()
|
||||
pg_resetwal_path = os.path.join(pg_bin.pg_bin_path, "pg_resetwal")
|
||||
cmd = [
|
||||
pg_resetwal_path,
|
||||
f"--multixact-ids={next_multi_xid},{ckpt_oldest_multi_xid}",
|
||||
f"--multixact-offset={next_multi_offset}",
|
||||
"-D",
|
||||
str(vanilla_pg.pgdatadir),
|
||||
]
|
||||
pg_bin.run_capture(cmd)
|
||||
|
||||
# Because we skip over a lot of values, Postgres hasn't created the SLRU segments for
|
||||
# the new values yet. Create them manually, to allow Postgres to start up.
|
||||
#
|
||||
# This leaves "gaps" in the SLRU where segments between old value and new value are
|
||||
# missing. That's OK for our purposes. Autovacuum will print some warnings about the
|
||||
# missing segments, but will clean it up by truncating the SLRUs up to the new value,
|
||||
# closing the gap.
|
||||
segname = "%04X" % MultiXactIdToOffsetSegment(next_multi_xid)
|
||||
log.info(f"Creating dummy segment pg_multixact/offsets/{segname}")
|
||||
with open(vanilla_pg.pgdatadir / "pg_multixact" / "offsets" / segname, "w") as of:
|
||||
of.write("\0" * SLRU_PAGES_PER_SEGMENT * BLCKSZ)
|
||||
of.flush()
|
||||
|
||||
segname = "%04X" % MXOffsetToMemberSegment(next_multi_offset)
|
||||
log.info(f"Creating dummy segment pg_multixact/members/{segname}")
|
||||
with open(vanilla_pg.pgdatadir / "pg_multixact" / "members" / segname, "w") as of:
|
||||
of.write("\0" * SLRU_PAGES_PER_SEGMENT * BLCKSZ)
|
||||
of.flush()
|
||||
|
||||
# Start Postgres again and wait until autovacuum has processed all the databases
|
||||
#
|
||||
# This allows truncating the SLRUs, fixing the gaps with missing segments.
|
||||
vanilla_pg.start()
|
||||
with vanilla_pg.connect().cursor() as cur:
|
||||
for _ in range(1000):
|
||||
datminmxid = int(
|
||||
query_scalar(cur, "select min(datminmxid::text::int8) from pg_database")
|
||||
)
|
||||
log.info(f"datminmxid {datminmxid}")
|
||||
if next_multi_xid - datminmxid < 1_000_000: # not wraparound-aware!
|
||||
break
|
||||
time.sleep(0.5)
|
||||
|
||||
|
||||
def test_multixid_wraparound_import(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
test_output_dir: Path,
|
||||
pg_bin: PgBin,
|
||||
vanilla_pg,
|
||||
):
|
||||
"""
|
||||
Test that the wraparound of the "next-multi-xid" counter is handled correctly in
|
||||
pageserver, And multi-offsets as well
|
||||
"""
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
# In order to to test multixid wraparound, we need to first advance the counter to
|
||||
# within spitting distance of the wraparound, that is 2^32 multi-XIDs. We could simply
|
||||
# run a workload that consumes a lot of multi-XIDs until we approach that, but that
|
||||
# takes a very long time. So we cheat.
|
||||
#
|
||||
# Our strategy is to create a vanilla Postgres cluster, and use pg_resetwal to
|
||||
# directly set the multi-xid counter a higher value. However, we cannot directly set
|
||||
# it to just before 2^32 (~ 4 billion), because that would make the exisitng
|
||||
# 'relminmxid' values to look like they're in the future. It's not clear how the
|
||||
# system would behave in that situation. So instead, we bump it up ~ 1 billion
|
||||
# multi-XIDs at a time, and let autovacuum to process all the relations and update
|
||||
# 'relminmxid' between each run.
|
||||
#
|
||||
# XXX: For the multi-offsets, most of the bump is done in the last call. This is
|
||||
# because advancing it ~ 1 billion at a time hit a pathological case in the
|
||||
# MultiXactMemberFreezeThreshold() function, causing autovacuum not trigger multixid
|
||||
# freezing. See
|
||||
# https://www.postgresql.org/message-id/85fb354c-f89f-4d47-b3a2-3cbd461c90a3%40iki.fi
|
||||
# Multi-offsets don't have the same wraparound problems at 2 billion mark as
|
||||
# multi-xids do, so one big jump is fine.
|
||||
vanilla_pg.configure(
|
||||
[
|
||||
"log_autovacuum_min_duration = 0",
|
||||
# Perform anti-wraparound vacuuming aggressively
|
||||
"autovacuum_naptime='1 s'",
|
||||
"autovacuum_freeze_max_age = 1000000",
|
||||
"autovacuum_multixact_freeze_max_age = 1000000",
|
||||
],
|
||||
)
|
||||
vanilla_pg.start()
|
||||
advance_multixid_to(pg_bin, vanilla_pg, 0x40000000, 0x10000000)
|
||||
advance_multixid_to(pg_bin, vanilla_pg, 0x80000000, 0x20000000)
|
||||
advance_multixid_to(pg_bin, vanilla_pg, 0xC0000000, 0x30000000)
|
||||
advance_multixid_to(pg_bin, vanilla_pg, 0xFFFFFF00, 0xFFFFFF00)
|
||||
|
||||
vanilla_pg.safe_psql("create user cloud_admin with password 'postgres' superuser")
|
||||
vanilla_pg.safe_psql("create table tt as select g as id from generate_series(1, 10) g")
|
||||
vanilla_pg.safe_psql("CHECKPOINT")
|
||||
|
||||
# Import the cluster to the pageserver
|
||||
tenant_id = TenantId.generate()
|
||||
env.pageserver.tenant_create(tenant_id)
|
||||
timeline_id = TimelineId.generate()
|
||||
import_timeline_from_vanilla_postgres(
|
||||
test_output_dir,
|
||||
env,
|
||||
pg_bin,
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
"imported_multixid_wraparound_test",
|
||||
vanilla_pg.connstr(),
|
||||
)
|
||||
vanilla_pg.stop()
|
||||
|
||||
endpoint = env.endpoints.create_start(
|
||||
"imported_multixid_wraparound_test",
|
||||
tenant_id=tenant_id,
|
||||
config_lines=[
|
||||
"log_autovacuum_min_duration = 0",
|
||||
"autovacuum_naptime='5 s'",
|
||||
"autovacuum=off",
|
||||
],
|
||||
)
|
||||
conn = endpoint.connect()
|
||||
cur = conn.cursor()
|
||||
assert query_scalar(cur, "select count(*) from tt") == 10 # sanity check
|
||||
|
||||
# Install extension containing function needed for test
|
||||
cur.execute("CREATE EXTENSION neon_test_utils")
|
||||
|
||||
# Consume a lot of XIDs, just to advance the XIDs to different range than the
|
||||
# multi-xids. That avoids confusion while debugging
|
||||
cur.execute("select test_consume_xids(100000)")
|
||||
cur.execute("select pg_switch_wal()")
|
||||
cur.execute("checkpoint")
|
||||
|
||||
# Use subtransactions so that each row in 'tt' is stamped with different XID. Leave
|
||||
# the transaction open.
|
||||
cur.execute("BEGIN")
|
||||
cur.execute(
|
||||
"""
|
||||
do $$
|
||||
declare
|
||||
idvar int;
|
||||
begin
|
||||
for idvar in select id from tt loop
|
||||
begin
|
||||
update tt set id = idvar where id = idvar;
|
||||
exception when others then
|
||||
raise 'didn''t expect an error: %', sqlerrm;
|
||||
end;
|
||||
end loop;
|
||||
end;
|
||||
$$;
|
||||
"""
|
||||
)
|
||||
|
||||
# In a different transaction, acquire a FOR KEY SHARE lock on each row. This generates
|
||||
# a new multixid for each row, with the previous xmax and this transaction's XID as the
|
||||
# members.
|
||||
#
|
||||
# Repeat this until the multi-xid counter wraps around.
|
||||
conn3 = endpoint.connect()
|
||||
cur3 = conn3.cursor()
|
||||
next_multixact_id_before_restart = 0
|
||||
observed_before_wraparound = False
|
||||
while True:
|
||||
cur3.execute("BEGIN")
|
||||
cur3.execute("SELECT * FROM tt FOR KEY SHARE")
|
||||
|
||||
# Get the xmax of one of the rows we locked. It should be a multi-xid. It might
|
||||
# not be the latest one, but close enough.
|
||||
row_xmax = int(query_scalar(cur3, "SELECT xmax FROM tt LIMIT 1"))
|
||||
cur3.execute("COMMIT")
|
||||
log.info(f"observed a row with xmax {row_xmax}")
|
||||
|
||||
# High value means not wrapped around yet
|
||||
if row_xmax >= 0xFFFFFF00:
|
||||
observed_before_wraparound = True
|
||||
continue
|
||||
|
||||
# xmax should not be a regular XID. (We bumped up the regular XID range earlier
|
||||
# to around 100000 and above.)
|
||||
assert row_xmax < 100
|
||||
|
||||
# xmax values < FirstNormalTransactionId (== 3) could be special XID values, or
|
||||
# multixid values after wraparound. We don't know for sure which, so keep going to
|
||||
# be sure we see value that's unambiguously a wrapped-around multixid
|
||||
if row_xmax < 3:
|
||||
continue
|
||||
|
||||
next_multixact_id_before_restart = row_xmax
|
||||
log.info(
|
||||
f"next_multixact_id is now at {next_multixact_id_before_restart} or a little higher"
|
||||
)
|
||||
break
|
||||
|
||||
# We should have observed the state before wraparound
|
||||
assert observed_before_wraparound
|
||||
|
||||
cur.execute("COMMIT")
|
||||
|
||||
# Wait until pageserver has received all the data, and restart the endpoint
|
||||
wait_for_wal_insert_lsn(env, endpoint, tenant_id, timeline_id)
|
||||
endpoint.stop(mode="immediate") # 'immediate' to avoid writing shutdown checkpoint
|
||||
endpoint.start()
|
||||
|
||||
# Check that the next-multixid value wrapped around correctly
|
||||
conn = endpoint.connect()
|
||||
cur = conn.cursor()
|
||||
cur.execute("select next_multixact_id from pg_control_checkpoint()")
|
||||
next_multixact_id_after_restart = int(
|
||||
query_scalar(cur, "select next_multixact_id from pg_control_checkpoint()")
|
||||
)
|
||||
log.info(f"next_multixact_id after restart: {next_multixact_id_after_restart}")
|
||||
assert next_multixact_id_after_restart >= next_multixact_id_before_restart
|
||||
|
||||
# The multi-offset should wrap around as well
|
||||
cur.execute("select next_multi_offset from pg_control_checkpoint()")
|
||||
next_multi_offset_after_restart = int(
|
||||
query_scalar(cur, "select next_multi_offset from pg_control_checkpoint()")
|
||||
)
|
||||
log.info(f"next_multi_offset after restart: {next_multi_offset_after_restart}")
|
||||
assert next_multi_offset_after_restart < 100000
|
||||
|
||||
@@ -85,8 +85,10 @@ def check_client(env: NeonEnv, client: PageserverHttpClient):
|
||||
|
||||
# create new tenant and check it is also there
|
||||
tenant_id = TenantId.generate()
|
||||
client.tenant_create(
|
||||
tenant_id, generation=env.storage_controller.attach_hook_issue(tenant_id, env.pageserver.id)
|
||||
env.pageserver.tenant_create(
|
||||
tenant_id,
|
||||
generation=env.storage_controller.attach_hook_issue(tenant_id, env.pageserver.id),
|
||||
auth_token=client.auth_token,
|
||||
)
|
||||
assert tenant_id in {TenantId(t["id"]) for t in client.tenant_list()}
|
||||
|
||||
|
||||
@@ -542,6 +542,13 @@ def test_sharding_split_smoke(
|
||||
for k, v in non_default_tenant_config.items():
|
||||
assert config.effective_config[k] == v
|
||||
|
||||
# Check that heatmap uploads remain enabled after shard split
|
||||
# (https://github.com/neondatabase/neon/issues/8189)
|
||||
assert (
|
||||
config.effective_config["heatmap_period"]
|
||||
and config.effective_config["heatmap_period"] != "0s"
|
||||
)
|
||||
|
||||
# Validate pageserver state: expect every child shard to have an attached and secondary location
|
||||
(total, attached) = get_node_shard_counts(env, tenant_ids=[tenant_id])
|
||||
assert sum(attached.values()) == split_shard_count
|
||||
|
||||
@@ -315,7 +315,7 @@ def test_storage_controller_onboarding(neon_env_builder: NeonEnvBuilder, warm_up
|
||||
# Create a tenant directly via pageserver HTTP API, skipping the storage controller
|
||||
tenant_id = TenantId.generate()
|
||||
generation = 123
|
||||
origin_ps.http_client().tenant_create(tenant_id, generation=generation)
|
||||
origin_ps.tenant_create(tenant_id, generation=generation)
|
||||
|
||||
# As if doing a live migration, first configure origin into stale mode
|
||||
r = origin_ps.http_client().tenant_location_conf(
|
||||
|
||||
2
vendor/postgres-v14
vendored
2
vendor/postgres-v14
vendored
Submodule vendor/postgres-v14 updated: 7845c122d5...223dd92595
2
vendor/postgres-v15
vendored
2
vendor/postgres-v15
vendored
Submodule vendor/postgres-v15 updated: 2ff5ecc67c...f54d7373eb
2
vendor/postgres-v16
vendored
2
vendor/postgres-v16
vendored
Submodule vendor/postgres-v16 updated: d55e0aca10...e06bebc753
6
vendor/revisions.json
vendored
6
vendor/revisions.json
vendored
@@ -1,5 +1,5 @@
|
||||
{
|
||||
"v16": ["16.3", "d55e0aca104af0b611cf5565f1033b2acd2dcc1c"],
|
||||
"v15": ["15.7", "2ff5ecc67c64e5fe44b7dde598e64e4538e0c373"],
|
||||
"v14": ["14.12", "7845c122d51d3ebb547a984a640ac0310a2fadce"]
|
||||
"v16": ["16.3", "e06bebc75306b583e758b52c95946d41109239b2"],
|
||||
"v15": ["15.7", "f54d7373eb0de5a54bce2becdb1c801026c7edff"],
|
||||
"v14": ["14.12", "223dd925959f8124711dd3d867dc8ba6629d52c0"]
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user