Compare commits

..

1 Commits

Author SHA1 Message Date
Tristan Partin
3b56c3eecd Remove dead spec parameters
Pageserver and safekeeper connection strings are passed as Postgres
GUCs by the production control plane, so remove the dead spec parameters
and teach neon_local to behave like the production control plane.

Signed-off-by: Tristan Partin <tristan@neon.tech>
2025-05-01 13:23:33 -05:00
46 changed files with 428 additions and 851 deletions

View File

@@ -33,14 +33,9 @@ config-variables:
- REMOTE_STORAGE_AZURE_CONTAINER
- REMOTE_STORAGE_AZURE_REGION
- SLACK_CICD_CHANNEL_ID
- SLACK_COMPUTE_CHANNEL_ID
- SLACK_ON_CALL_DEVPROD_STREAM
- SLACK_ON_CALL_QA_STAGING_STREAM
- SLACK_ON_CALL_STORAGE_STAGING_STREAM
- SLACK_ONCALL_COMPUTE_GROUP
- SLACK_ONCALL_PROXY_GROUP
- SLACK_ONCALL_STORAGE_GROUP
- SLACK_PROXY_CHANNEL_ID
- SLACK_RUST_CHANNEL_ID
- SLACK_STORAGE_CHANNEL_ID
- SLACK_UPCOMING_RELEASE_CHANNEL_ID

View File

@@ -53,77 +53,6 @@ concurrency:
cancel-in-progress: true
jobs:
# Job: removes Neon staging projects that earlier workflow runs left behind.
# Projects are listed once from BASE_URL (filtered by SEARCH and ORG_ID) and
# those whose compute has been inactive for more than 5 days are deleted.
cleanup:
runs-on: [ self-hosted, us-east-2, x64 ]
container:
image: ghcr.io/neondatabase/build-tools:pinned-bookworm
credentials:
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
options: --init
env:
ORG_ID: org-solitary-dew-09443886
# LIMIT is sent as the `limit` query parameter of the single list request.
# NOTE(review): no pagination is performed, so at most LIMIT projects are
# examined per run.
LIMIT: 100
SEARCH: "Created by actions/neon-project-create; GITHUB_RUN_ID"
BASE_URL: https://console-stage.neon.build/api/v2
DRY_RUN: "false" # Set to "true" to just test out the workflow
steps:
- name: Harden the runner (Audit all outbound calls)
uses: step-security/harden-runner@4d991eb9b905ef189e4c376166672c3f2f230481 # v2.11.0
with:
egress-policy: audit
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
# The script below:
#   1. computes a cutoff of now minus 5 days (epoch seconds),
#   2. fetches the project list and prints the raw response,
#   3. keeps projects with a non-null compute_last_active_at older than the cutoff,
#   4. deletes each of them only when DRY_RUN is exactly "false".
# NOTE(review): curl runs with -s and without --fail, so an HTTP error status
# from the list or DELETE calls does not change curl's exit code.
- name: Cleanup inactive Neon projects left over from prior runs
env:
API_KEY: ${{ secrets.NEON_STAGING_API_KEY }}
run: |
set -euo pipefail
NOW=$(date -u +%s)
DAYS_AGO=$((NOW - 5 * 86400))
REQUEST_URL="$BASE_URL/projects?limit=$LIMIT&search=$(printf '%s' "$SEARCH" | jq -sRr @uri)&org_id=$ORG_ID"
echo "Requesting project list from:"
echo "$REQUEST_URL"
response=$(curl -s -X GET "$REQUEST_URL" \
--header "Accept: application/json" \
--header "Content-Type: application/json" \
--header "Authorization: Bearer ${API_KEY}" )
echo "Response:"
echo "$response" | jq .
projects_to_delete=$(echo "$response" | jq --argjson cutoff "$DAYS_AGO" '
.projects[]
| select(.compute_last_active_at != null)
| select((.compute_last_active_at | fromdateiso8601) < $cutoff)
| {id, name, compute_last_active_at}
')
if [ -z "$projects_to_delete" ]; then
echo "No projects eligible for deletion."
exit 0
fi
echo "Projects that will be deleted:"
echo "$projects_to_delete" | jq -r '.id'
if [ "$DRY_RUN" = "false" ]; then
echo "$projects_to_delete" | jq -r '.id' | while read -r project_id; do
echo "Deleting project: $project_id"
curl -s -X DELETE "$BASE_URL/projects/$project_id" \
--header "Accept: application/json" \
--header "Content-Type: application/json" \
--header "Authorization: Bearer ${API_KEY}"
done
else
echo "Dry run enabled — no projects were deleted."
fi
bench:
if: ${{ github.event.inputs.run_only_pgvector_tests == 'false' || github.event.inputs.run_only_pgvector_tests == null }}
permissions:

View File

@@ -1434,10 +1434,10 @@ jobs:
;;
esac
notify-release-deploy-failure:
needs: [ meta, deploy ]
notify-storage-release-deploy-failure:
needs: [ deploy ]
# We want this to run even if (transitive) dependencies are skipped, because deploy should really be successful on release branch workflow runs.
if: contains(fromJSON('["storage-release", "compute-release", "proxy-release"]'), needs.meta.outputs.run-kind) && needs.deploy.result != 'success' && always()
if: github.ref_name == 'release' && needs.deploy.result != 'success' && always()
runs-on: ubuntu-22.04
steps:
- name: Harden the runner (Audit all outbound calls)
@@ -1445,40 +1445,15 @@ jobs:
with:
egress-policy: audit
- name: Post release-deploy failure to team slack channel
- name: Post release-deploy failure to team-storage slack channel
uses: slackapi/slack-github-action@485a9d42d3a73031f12ec201c457e2162c45d02d # v2.0.0
env:
TEAM_ONCALL: >-
${{
fromJSON(format('{
"storage-release": "<!subteam^{0}|@oncall-storage>",
"compute-release": "<!subteam^{1}|@oncall-compute>",
"proxy-release": "<!subteam^{2}|@oncall-proxy>"
}',
vars.SLACK_ONCALL_STORAGE_GROUP,
vars.SLACK_ONCALL_COMPUTE_GROUP,
vars.SLACK_ONCALL_PROXY_GROUP
))[needs.meta.outputs.run-kind]
}}
CHANNEL: >-
${{
fromJSON(format('{
"storage-release": "{0}",
"compute-release": "{1}",
"proxy-release": "{2}"
}',
vars.SLACK_STORAGE_CHANNEL_ID,
vars.SLACK_COMPUTE_CHANNEL_ID,
vars.SLACK_PROXY_CHANNEL_ID
))[needs.meta.outputs.run-kind]
}}
with:
method: chat.postMessage
token: ${{ secrets.SLACK_BOT_TOKEN }}
payload: |
channel: ${{ env.CHANNEL }}
channel: ${{ vars.SLACK_STORAGE_CHANNEL_ID }}
text: |
🔴 ${{ env.TEAM_ONCALL }}: deploy job on release branch had unexpected status "${{ needs.deploy.result }}" <${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}|GitHub Run>.
🔴 <!subteam^S06CJ87UMNY|@oncall-storage>: deploy job on release branch had unexpected status "${{ needs.deploy.result }}" <${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}|GitHub Run>.
# The job runs on `release` branch and copies compatibility data and Neon artifact from the last *release PR* to the latest directory
promote-compatibility-data:

View File

@@ -210,24 +210,20 @@ impl TryFrom<ComputeSpec> for ParsedSpec {
// may be empty. In that case, we need to dig them from the GUCs in the
// cluster.settings field.
let pageserver_connstr = spec
.pageserver_connstring
.clone()
.or_else(|| spec.cluster.settings.find("neon.pageserver_connstring"))
.ok_or("pageserver connstr should be provided")?;
let safekeeper_connstrings = if spec.safekeeper_connstrings.is_empty() {
if matches!(spec.mode, ComputeMode::Primary) {
spec.cluster
.settings
.find("neon.safekeepers")
.ok_or("safekeeper connstrings should be provided")?
.split(',')
.map(|str| str.to_string())
.collect()
} else {
vec![]
}
.cluster
.settings
.find("neon.pageserver_connstring")
.expect("pageserver connstr should be provided");
let safekeeper_connstrings = if matches!(spec.mode, ComputeMode::Primary) {
spec.cluster
.settings
.find("neon.safekeepers")
.ok_or("safekeeper connstrings should be provided")?
.split(',')
.map(|str| str.to_string())
.collect()
} else {
spec.safekeeper_connstrings.clone()
vec![]
};
let storage_auth_token = spec.storage_auth_token.clone();
let tenant_id: TenantId = if let Some(tenant_id) = spec.tenant_id {

View File

@@ -1,5 +1,4 @@
use anyhow::Result;
use std::fmt::Write as FmtWrite;
use std::fs::{File, OpenOptions};
use std::io;
use std::io::Write;
@@ -56,29 +55,9 @@ pub fn write_postgres_conf(
// Add options for connecting to storage
writeln!(file, "# Neon storage settings")?;
if let Some(s) = &spec.pageserver_connstring {
writeln!(file, "neon.pageserver_connstring={}", escape_conf_value(s))?;
}
if let Some(stripe_size) = spec.shard_stripe_size {
writeln!(file, "neon.stripe_size={stripe_size}")?;
}
if !spec.safekeeper_connstrings.is_empty() {
let mut neon_safekeepers_value = String::new();
tracing::info!(
"safekeepers_connstrings is not zero, gen: {:?}",
spec.safekeepers_generation
);
// If generation is given, prepend sk list with g#number:
if let Some(generation) = spec.safekeepers_generation {
write!(neon_safekeepers_value, "g#{}:", generation)?;
}
neon_safekeepers_value.push_str(&spec.safekeeper_connstrings.join(","));
writeln!(
file,
"neon.safekeepers={}",
escape_conf_value(&neon_safekeepers_value)
)?;
}
if let Some(s) = &spec.tenant_id {
writeln!(file, "neon.tenant_id={}", escape_conf_value(&s.to_string()))?;
}
@@ -165,6 +144,21 @@ pub fn write_postgres_conf(
if spec.cluster.settings.is_some() {
writeln!(file, "# Managed by compute_ctl: begin")?;
write!(file, "{}", spec.cluster.settings.as_pg_settings())?;
// If a safekeeper generation is given, emit a second neon.safekeepers line
// whose value is prefixed with `g#<generation>:`.
if let Some(generation) = spec.safekeepers_generation {
    // The GUC may be missing from cluster.settings (presumably for
    // non-primary computes -- verify against the spec producer). Skip the
    // override in that case instead of panicking on a `None`.
    if let Some(neon_safekeepers) = spec.cluster.settings.find("neon.safekeepers") {
        // Don't try to add the prefix if it already exists
        if !neon_safekeepers.starts_with("g#") {
            writeln!(
                file,
                "# Overwriting original neon.safekeepers value to include generation"
            )?;
            writeln!(
                file,
                "neon.safekeepers = 'g#{generation}:{neon_safekeepers}'"
            )?;
        }
    }
}
writeln!(file, "# Managed by compute_ctl: end")?;
}

View File

@@ -366,47 +366,24 @@ impl ComputeNode {
// and can thus use all `max_connections` connection slots. However, that's generally not
// very efficient, so we generally still limit it to a smaller number.
if compute_state.status == ComputeStatus::Init {
// If the settings contain 'max_connections', use that as template
if let Some(config) = spec.cluster.settings.find("max_connections") {
config.parse::<usize>().ok()
} else {
// Otherwise, try to find the setting in the postgresql_conf string
spec.cluster
.postgresql_conf
.iter()
.flat_map(|conf| conf.split("\n"))
.filter_map(|line| {
if !line.contains("max_connections") {
return None;
}
// If the settings contain 'max_connections', use that as a
// template. Otherwise, if we didn't find max_connections, default
// to 10 concurrent connections.
spec.cluster
.settings
.find("max_connections")
.map_or(10, |guc| {
let max_connections = guc.parse::<usize>().unwrap();
let (key, value) = line.split_once("=")?;
let key = key
.trim_start_matches(char::is_whitespace)
.trim_end_matches(char::is_whitespace);
let value = value
.trim_start_matches(char::is_whitespace)
.trim_end_matches(char::is_whitespace);
if key != "max_connections" {
return None;
}
value.parse::<usize>().ok()
})
.next()
}
// If max_connections is present, use at most 1/3rd of that.
// When max_connections is lower than 30, try to use at least 10 connections, but
// never more than max_connections.
.map(|limit| match limit {
0..10 => limit,
10..30 => 10,
30.. => limit / 3,
})
// If we didn't find max_connections, default to 10 concurrent connections.
.unwrap_or(10)
// If max_connections is present, use at most 1/3rd of that.
// When max_connections is lower than 30, try to use at least 10 connections, but
// never more than max_connections.
match max_connections {
0..10 => max_connections,
10..30 => 10,
30.. => max_connections / 3,
}
})
} else {
// state == Running
// Because the cluster is already in the Running state, we should assume users are

View File

@@ -50,8 +50,8 @@ use compute_api::responses::{
ComputeConfig, ComputeCtlConfig, ComputeStatus, ComputeStatusResponse, TlsConfig,
};
use compute_api::spec::{
Cluster, ComputeAudit, ComputeFeature, ComputeMode, ComputeSpec, Database, PgIdent,
RemoteExtSpec, Role,
Cluster, ComputeAudit, ComputeFeature, ComputeMode, ComputeSpec, Database, GenericOption,
PgIdent, RemoteExtSpec, Role,
};
use jsonwebtoken::jwk::{
AlgorithmParameters, CommonParameters, EllipticCurve, Jwk, JwkSet, KeyAlgorithm, KeyOperations,
@@ -711,7 +711,18 @@ impl Endpoint {
} else {
Vec::new()
},
settings: None,
settings: Some(vec![
GenericOption {
name: "neon.pageserver_connstring".to_string(),
value: Some(pageserver_connstring),
vartype: "string".to_string(),
},
GenericOption {
name: "neon.safekeepers".to_string(),
value: Some(safekeeper_connstrings.join(",")),
vartype: "string".to_string(),
},
]),
postgresql_conf: Some(postgresql_conf.clone()),
},
delta_operations: None,
@@ -721,9 +732,7 @@ impl Endpoint {
branch_id: None,
endpoint_id: Some(self.endpoint_id.clone()),
mode: self.mode,
pageserver_connstring: Some(pageserver_connstring),
safekeepers_generation: safekeepers_generation.map(|g| g.into_inner()),
safekeeper_connstrings,
storage_auth_token: auth_token.clone(),
remote_extensions,
pgbouncer_settings: None,
@@ -928,6 +937,8 @@ impl Endpoint {
stripe_size: Option<ShardStripeSize>,
safekeepers: Option<Vec<NodeId>>,
) -> Result<()> {
let mut pg_settings: Vec<GenericOption> = vec![];
let (mut spec, compute_ctl_config) = {
let config_path = self.endpoint_path().join("config.json");
let file = std::fs::File::open(config_path)?;
@@ -958,7 +969,12 @@ impl Endpoint {
let pageserver_connstr = Self::build_pageserver_connstr(&pageservers);
assert!(!pageserver_connstr.is_empty());
spec.pageserver_connstring = Some(pageserver_connstr);
pg_settings.push(GenericOption {
name: "neon.pageserver_connstring".to_string(),
value: Some(pageserver_connstr),
vartype: "string".to_string(),
});
if stripe_size.is_some() {
spec.shard_stripe_size = stripe_size.map(|s| s.0 as usize);
}
@@ -966,9 +982,22 @@ impl Endpoint {
// If safekeepers are not specified, don't change them.
if let Some(safekeepers) = safekeepers {
let safekeeper_connstrings = self.build_safekeepers_connstrs(safekeepers)?;
spec.safekeeper_connstrings = safekeeper_connstrings;
pg_settings.push(GenericOption {
name: "neon.safekeepers".to_string(),
value: Some(safekeeper_connstrings.join(",")),
vartype: "string".to_string(),
});
}
// Merge the freshly built connection GUCs into the configured settings.
spec.cluster.settings = match spec.cluster.settings {
    Some(mut settings) => {
        // Remove entries that are about to be replaced. A plain append would
        // leave two options with the same name whenever the loaded spec
        // already carries them, and which one wins then depends on how each
        // consumer looks names up.
        // NOTE(review): confirm whether lookups take the first or last match.
        settings.retain(|existing| !pg_settings.iter().any(|new| new.name == existing.name));
        settings.append(&mut pg_settings);
        Some(settings)
    }
    None => Some(pg_settings),
};
let client = reqwest::Client::builder()
.timeout(Duration::from_secs(120))
.build()

View File

@@ -90,19 +90,17 @@ pub struct ComputeSpec {
// Information needed to connect to the storage layer.
//
// `tenant_id`, `timeline_id` and `pageserver_connstring` are always needed.
// `tenant_id` and `timeline_id` are always needed.
//
// Depending on `mode`, this can be a primary read-write node, a read-only
// replica, or a read-only node pinned at an older LSN.
// The `neon.safekeepers` GUC in `cluster.settings` must be set for a primary.
//
// For backwards compatibility, the control plane may leave out all of
// For backwards compatibility, the control plane may leave out both of
// these, and instead set the "neon.tenant_id", "neon.timeline_id",
// etc. GUCs in cluster.settings. TODO: Once the control plane has been
// updated to fill these fields, we can make these non optional.
pub tenant_id: Option<TenantId>,
pub timeline_id: Option<TimelineId>,
pub pageserver_connstring: Option<String>,
// More neon ids that we expose to the compute_ctl
// and to postgres as neon extension GUCs.
@@ -121,8 +119,6 @@ pub struct ComputeSpec {
/// compute_ctl with postgres_ffi.
#[serde(default)]
pub safekeepers_generation: Option<u32>,
#[serde(default)]
pub safekeeper_connstrings: Vec<String>,
#[serde(default)]
pub mode: ComputeMode,

View File

@@ -16,7 +16,6 @@ pub struct Collector {
const NMETRICS: usize = 2;
static CLK_TCK_F64: Lazy<f64> = Lazy::new(|| {
// SAFETY: libc::sysconf is safe, it merely returns a value.
let long = unsafe { libc::sysconf(libc::_SC_CLK_TCK) };
if long == -1 {
panic!("sysconf(_SC_CLK_TCK) failed");

View File

@@ -10,7 +10,6 @@ use pageserver::tenant::storage_layer::{DeltaLayer, ImageLayer, delta_layer, ima
use pageserver::tenant::{TENANTS_SEGMENT_NAME, TIMELINES_SEGMENT_NAME};
use pageserver::virtual_file::api::IoMode;
use pageserver::{page_cache, virtual_file};
use pageserver_api::key::Key;
use utils::id::{TenantId, TimelineId};
use crate::layer_map_analyzer::parse_filename;
@@ -28,7 +27,6 @@ pub(crate) enum LayerCmd {
path: PathBuf,
tenant: String,
timeline: String,
key: Option<Key>,
},
/// Dump all information of a layer file
DumpLayer {
@@ -102,7 +100,6 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
path,
tenant,
timeline,
key,
} => {
let timeline_path = path
.join(TENANTS_SEGMENT_NAME)
@@ -110,37 +107,21 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
.join(TIMELINES_SEGMENT_NAME)
.join(timeline);
let mut idx = 0;
let mut to_print = Vec::default();
for layer in fs::read_dir(timeline_path)? {
let layer = layer?;
if let Ok(layer_file) = parse_filename(&layer.file_name().into_string().unwrap()) {
if let Some(key) = key {
if layer_file.key_range.start <= *key && *key < layer_file.key_range.end {
to_print.push((idx, layer_file));
}
} else {
to_print.push((idx, layer_file));
}
println!(
"[{:3}] key:{}-{}\n lsn:{}-{}\n delta:{}",
idx,
layer_file.key_range.start,
layer_file.key_range.end,
layer_file.lsn_range.start,
layer_file.lsn_range.end,
layer_file.is_delta,
);
idx += 1;
}
}
if key.is_some() {
to_print
.sort_by_key(|(_idx, layer_file)| std::cmp::Reverse(layer_file.lsn_range.end));
}
for (idx, layer_file) in to_print {
println!(
"[{:3}] key:{}-{}\n lsn:{}-{}\n delta:{}",
idx,
layer_file.key_range.start,
layer_file.key_range.end,
layer_file.lsn_range.start,
layer_file.lsn_range.end,
layer_file.is_delta,
);
}
Ok(())
}
LayerCmd::DumpLayer {

View File

@@ -2469,7 +2469,6 @@ async fn timeline_checkpoint_handler(
.map_err(|e|
match e {
CompactionError::ShuttingDown => ApiError::ShuttingDown,
CompactionError::Cancelled => ApiError::ShuttingDown,
CompactionError::Offload(e) => ApiError::InternalServerError(anyhow::anyhow!(e)),
CompactionError::CollectKeySpaceError(e) => ApiError::InternalServerError(anyhow::anyhow!(e)),
CompactionError::Other(e) => ApiError::InternalServerError(e),

View File

@@ -3198,7 +3198,6 @@ impl TenantShard {
match err {
err if err.is_cancel() => {}
CompactionError::ShuttingDown => (),
CompactionError::Cancelled => (),
// Offload failures don't trip the circuit breaker, since they're cheap to retry and
// shouldn't block compaction.
CompactionError::Offload(_) => {}

View File

@@ -90,18 +90,10 @@ impl Header {
}
}
/// Errors surfaced by the blob writer.
#[derive(Debug, thiserror::Error)]
pub enum BlobWriterError {
/// The operation was cancelled; displayed as "cancelled".
#[error("cancelled")]
Cancelled,
/// Any other failure; `error(transparent)` forwards display and source to
/// the wrapped `anyhow::Error`.
#[error(transparent)]
Other(anyhow::Error),
}
#[derive(Debug, thiserror::Error)]
pub enum WriteBlobError {
#[error(transparent)]
Flush(BlobWriterError),
Flush(FlushTaskError),
#[error("blob too large ({len} bytes)")]
BlobTooLarge { len: usize },
#[error(transparent)]
@@ -246,16 +238,14 @@ where
cancel: CancellationToken,
ctx: &RequestContext,
flush_task_span: tracing::Span,
) -> Result<Self, BlobWriterError> {
let gate_token = gate.enter().map_err(|_| BlobWriterError::Cancelled)?;
) -> anyhow::Result<Self> {
Ok(Self {
io_buf: Some(BytesMut::new()),
writer: BufferedWriter::new(
file,
start_offset,
|| IoBufferMut::with_capacity(Self::CAPACITY),
gate_token,
gate.enter()?,
cancel,
ctx,
flush_task_span,
@@ -275,16 +265,13 @@ where
&mut self,
src_buf: FullSlice<Buf>,
ctx: &RequestContext,
) -> (FullSlice<Buf>, Result<(), BlobWriterError>) {
) -> (FullSlice<Buf>, Result<(), FlushTaskError>) {
let res = self
.writer
// TODO: why are we taking a FullSlice if we're going to pass a borrow downstack?
// Can remove all the complexity around owned buffers upstack
.write_buffered_borrowed(&src_buf, ctx)
.await
.map_err(|e| match e {
FlushTaskError::Cancelled => BlobWriterError::Cancelled,
})
.map(|len| {
self.offset += len as u64;
});
@@ -431,10 +418,8 @@ where
self,
mode: BufferedWriterShutdownMode,
ctx: &RequestContext,
) -> Result<W, BlobWriterError> {
let (_, file) = self.writer.shutdown(mode, ctx).await.map_err(|e| match e {
FlushTaskError::Cancelled => BlobWriterError::Cancelled,
})?;
) -> Result<W, FlushTaskError> {
let (_, file) = self.writer.shutdown(mode, ctx).await?;
Ok(file)
}
}
@@ -482,11 +467,8 @@ pub(crate) mod tests {
.await?,
gate.enter()?,
);
let mut wtr = BlobWriter::new(file, 0, &gate, cancel.clone(), ctx, info_span!("test"))
.map_err(|e| match e {
BlobWriterError::Cancelled => anyhow::anyhow!("flush task cancelled"),
BlobWriterError::Other(err) => err,
})?;
let mut wtr =
BlobWriter::new(file, 0, &gate, cancel.clone(), ctx, info_span!("test")).unwrap();
for blob in blobs.iter() {
let (_, res) = if compression {
let res = wtr
@@ -508,11 +490,7 @@ pub(crate) mod tests {
BufferedWriterShutdownMode::ZeroPadToNextMultiple(PAGE_SZ),
ctx,
)
.await
.map_err(|e| match e {
BlobWriterError::Cancelled => anyhow::anyhow!("flush task cancelled"),
BlobWriterError::Other(err) => err,
})?;
.await?;
file.disarm_into_inner()
};
Ok((temp_dir, pathbuf, offsets))

View File

@@ -19,14 +19,6 @@ use crate::context::RequestContext;
use crate::tenant::Timeline;
use crate::tenant::storage_layer::Layer;
/// Errors surfaced by the batch/split layer writers.
#[derive(Debug, thiserror::Error)]
pub enum BatchSplitWriterError {
/// The operation was cancelled; displayed as "cancelled".
#[error("cancelled")]
Cancelled,
/// Any other failure; `error(transparent)` forwards display and source to
/// the wrapped `anyhow::Error`.
#[error(transparent)]
Other(anyhow::Error),
}
pub(crate) enum BatchWriterResult {
Produced(ResidentLayer),
Discarded(PersistentLayerKey),
@@ -105,7 +97,7 @@ impl BatchLayerWriter {
self,
tline: &Arc<Timeline>,
ctx: &RequestContext,
) -> Result<Vec<ResidentLayer>, BatchSplitWriterError> {
) -> anyhow::Result<Vec<ResidentLayer>> {
let res = self
.finish_with_discard_fn(tline, ctx, |_| async { false })
.await?;
@@ -123,7 +115,7 @@ impl BatchLayerWriter {
tline: &Arc<Timeline>,
ctx: &RequestContext,
discard_fn: D,
) -> Result<Vec<BatchWriterResult>, BatchSplitWriterError>
) -> anyhow::Result<Vec<BatchWriterResult>>
where
D: Fn(&PersistentLayerKey) -> F,
F: Future<Output = bool>,
@@ -147,14 +139,14 @@ impl BatchLayerWriter {
generated_layers.push(BatchWriterResult::Discarded(layer_key));
} else {
let res = match inner {
LayerWriterWrapper::Delta(writer) => writer
.finish(layer_key.key_range.end, ctx)
.await
.map_err(|e| BatchSplitWriterError::Other(anyhow::anyhow!(e))),
LayerWriterWrapper::Image(writer) => writer
.finish_with_end_key(layer_key.key_range.end, ctx)
.await
.map_err(|e| BatchSplitWriterError::Other(anyhow::anyhow!(e))),
LayerWriterWrapper::Delta(writer) => {
writer.finish(layer_key.key_range.end, ctx).await
}
LayerWriterWrapper::Image(writer) => {
writer
.finish_with_end_key(layer_key.key_range.end, ctx)
.await
}
};
let layer = match res {
Ok((desc, path)) => {
@@ -163,7 +155,7 @@ impl BatchLayerWriter {
Err(e) => {
tokio::fs::remove_file(&path).await.ok();
clean_up_layers(generated_layers);
return Err(BatchSplitWriterError::Other(e));
return Err(e);
}
}
}
@@ -243,7 +235,7 @@ impl<'a> SplitImageLayerWriter<'a> {
key: Key,
img: Bytes,
ctx: &RequestContext,
) -> Result<(), BatchSplitWriterError> {
) -> anyhow::Result<()> {
// The current estimation is an upper bound of the space that the key/image could take
// because we did not consider compression in this estimation. The resulting image layer
// could be smaller than the target size.
@@ -261,8 +253,7 @@ impl<'a> SplitImageLayerWriter<'a> {
self.cancel.clone(),
ctx,
)
.await
.map_err(|e| BatchSplitWriterError::Other(anyhow::anyhow!(e)))?;
.await?;
let prev_image_writer = std::mem::replace(&mut self.inner, next_image_writer);
self.batches.add_unfinished_image_writer(
prev_image_writer,
@@ -271,10 +262,7 @@ impl<'a> SplitImageLayerWriter<'a> {
);
self.start_key = key;
}
self.inner
.put_image(key, img, ctx)
.await
.map_err(|e| BatchSplitWriterError::Other(anyhow::anyhow!(e)))
self.inner.put_image(key, img, ctx).await
}
pub(crate) async fn finish_with_discard_fn<D, F>(
@@ -283,7 +271,7 @@ impl<'a> SplitImageLayerWriter<'a> {
ctx: &RequestContext,
end_key: Key,
discard_fn: D,
) -> Result<Vec<BatchWriterResult>, BatchSplitWriterError>
) -> anyhow::Result<Vec<BatchWriterResult>>
where
D: Fn(&PersistentLayerKey) -> F,
F: Future<Output = bool>,
@@ -303,7 +291,7 @@ impl<'a> SplitImageLayerWriter<'a> {
tline: &Arc<Timeline>,
ctx: &RequestContext,
end_key: Key,
) -> Result<Vec<BatchWriterResult>, BatchSplitWriterError> {
) -> anyhow::Result<Vec<BatchWriterResult>> {
self.finish_with_discard_fn(tline, ctx, end_key, |_| async { false })
.await
}
@@ -358,7 +346,7 @@ impl<'a> SplitDeltaLayerWriter<'a> {
lsn: Lsn,
val: Value,
ctx: &RequestContext,
) -> Result<(), BatchSplitWriterError> {
) -> anyhow::Result<()> {
// The current estimation is key size plus LSN size plus value size estimation. This is not an accurate
// number, and therefore the final layer size could be a little bit larger or smaller than the target.
//
@@ -378,8 +366,7 @@ impl<'a> SplitDeltaLayerWriter<'a> {
self.cancel.clone(),
ctx,
)
.await
.map_err(|e| BatchSplitWriterError::Other(anyhow::anyhow!(e)))?,
.await?,
));
}
let (_, inner) = self.inner.as_mut().unwrap();
@@ -399,8 +386,7 @@ impl<'a> SplitDeltaLayerWriter<'a> {
self.cancel.clone(),
ctx,
)
.await
.map_err(|e| BatchSplitWriterError::Other(anyhow::anyhow!(e)))?;
.await?;
let (start_key, prev_delta_writer) =
self.inner.replace((key, next_delta_writer)).unwrap();
self.batches.add_unfinished_delta_writer(
@@ -410,19 +396,16 @@ impl<'a> SplitDeltaLayerWriter<'a> {
);
} else if inner.estimated_size() >= S3_UPLOAD_LIMIT {
// We have to produce a very large file b/c a key is updated too often.
return Err(BatchSplitWriterError::Other(anyhow::anyhow!(
anyhow::bail!(
"a single key is updated too often: key={}, estimated_size={}, and the layer file cannot be produced",
key,
inner.estimated_size()
)));
);
}
}
self.last_key_written = key;
let (_, inner) = self.inner.as_mut().unwrap();
inner
.put_value(key, lsn, val, ctx)
.await
.map_err(|e| BatchSplitWriterError::Other(anyhow::anyhow!(e)))
inner.put_value(key, lsn, val, ctx).await
}
pub(crate) async fn finish_with_discard_fn<D, F>(
@@ -430,7 +413,7 @@ impl<'a> SplitDeltaLayerWriter<'a> {
tline: &Arc<Timeline>,
ctx: &RequestContext,
discard_fn: D,
) -> Result<Vec<BatchWriterResult>, BatchSplitWriterError>
) -> anyhow::Result<Vec<BatchWriterResult>>
where
D: Fn(&PersistentLayerKey) -> F,
F: Future<Output = bool>,
@@ -456,7 +439,7 @@ impl<'a> SplitDeltaLayerWriter<'a> {
self,
tline: &Arc<Timeline>,
ctx: &RequestContext,
) -> Result<Vec<BatchWriterResult>, BatchSplitWriterError> {
) -> anyhow::Result<Vec<BatchWriterResult>> {
self.finish_with_discard_fn(tline, ctx, |_| async { false })
.await
}

View File

@@ -35,9 +35,7 @@ use std::str::FromStr;
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use crate::tenant::blob_io::BlobWriterError;
use anyhow::{Context, Result, bail};
use anyhow::{Context, Result, bail, ensure};
use camino::{Utf8Path, Utf8PathBuf};
use futures::StreamExt;
use itertools::Itertools;
@@ -78,7 +76,7 @@ use crate::tenant::vectored_blob_io::{
use crate::virtual_file::TempVirtualFile;
use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt};
use crate::virtual_file::owned_buffers_io::write::{Buffer, BufferedWriterShutdownMode};
use crate::virtual_file::{self, IoBuffer, IoBufferMut, VirtualFile};
use crate::virtual_file::{self, IoBuffer, IoBufferMut, MaybeFatalIo, VirtualFile};
use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
///
@@ -450,11 +448,7 @@ impl DeltaLayerWriterInner {
cancel,
ctx,
info_span!(parent: None, "delta_layer_writer_flush_task", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), timeline_id=%timeline_id, path = %path),
)
.map_err(|e| match e {
BlobWriterError::Cancelled => anyhow::anyhow!("flush task cancelled"),
BlobWriterError::Other(err) => err,
})?;
)?;
// Initialize the b-tree index builder
let block_buf = BlockBuf::new();
@@ -483,12 +477,15 @@ impl DeltaLayerWriterInner {
lsn: Lsn,
val: Value,
ctx: &RequestContext,
) -> Result<(), DeltaLayerWriterError> {
let val_ser =
Value::ser(&val).map_err(|e| DeltaLayerWriterError::Other(anyhow::Error::new(e)))?;
) -> anyhow::Result<()> {
let (_, res) = self
.put_value_bytes(key, lsn, val_ser.slice_len(), val.will_init(), ctx)
.put_value_bytes(
key,
lsn,
Value::ser(&val)?.slice_len(),
val.will_init(),
ctx,
)
.await;
res
}
@@ -500,46 +497,25 @@ impl DeltaLayerWriterInner {
val: FullSlice<Buf>,
will_init: bool,
ctx: &RequestContext,
) -> (FullSlice<Buf>, Result<(), DeltaLayerWriterError>)
) -> (FullSlice<Buf>, anyhow::Result<()>)
where
Buf: IoBuf + Send,
{
if self.lsn_range.start > lsn {
return (
val,
Err(DeltaLayerWriterError::Other(anyhow::anyhow!(
"lsn_start={}, lsn={}",
self.lsn_range.start,
lsn
))),
);
}
assert!(
self.lsn_range.start <= lsn,
"lsn_start={}, lsn={}",
self.lsn_range.start,
lsn
);
// We don't want to use compression in delta layer creation
let compression = ImageCompressionAlgorithm::Disabled;
let (val, res) = self
.blob_writer
.write_blob_maybe_compressed(val, ctx, compression)
.await;
let off = match res {
Ok((off, _)) => off,
Err(e) => {
return (
val,
Err(match e {
crate::tenant::blob_io::WriteBlobError::Flush(blob_err) => match blob_err {
crate::tenant::blob_io::BlobWriterError::Cancelled => {
DeltaLayerWriterError::Cancelled
}
crate::tenant::blob_io::BlobWriterError::Other(err) => {
DeltaLayerWriterError::Other(err)
}
},
other => DeltaLayerWriterError::Other(anyhow::anyhow!(other)),
}),
);
}
Err(e) => return (val, Err(anyhow::anyhow!(e))),
};
let blob_ref = BlobRef::new(off, will_init);
@@ -549,10 +525,7 @@ impl DeltaLayerWriterInner {
self.num_keys += 1;
(
val,
res.map_err(|e| DeltaLayerWriterError::Other(anyhow::anyhow!(e))),
)
(val, res.map_err(|e| anyhow::anyhow!(e)))
}
fn size(&self) -> u64 {
@@ -566,7 +539,7 @@ impl DeltaLayerWriterInner {
self,
key_end: Key,
ctx: &RequestContext,
) -> Result<(PersistentLayerDesc, Utf8PathBuf), DeltaLayerWriterError> {
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
let index_start_blk = self.blob_writer.size().div_ceil(PAGE_SZ as u64) as u32;
let file = self
@@ -575,24 +548,17 @@ impl DeltaLayerWriterInner {
BufferedWriterShutdownMode::ZeroPadToNextMultiple(PAGE_SZ),
ctx,
)
.await
.map_err(|e| match e {
BlobWriterError::Cancelled => DeltaLayerWriterError::Cancelled,
BlobWriterError::Other(err) => DeltaLayerWriterError::Other(err),
})?;
.await?;
// Write out the index
let (index_root_blk, block_buf) = self
.tree
.finish()
.map_err(|e| DeltaLayerWriterError::Other(anyhow::Error::new(e)))?;
let (index_root_blk, block_buf) = self.tree.finish()?;
let mut offset = index_start_blk as u64 * PAGE_SZ as u64;
// TODO(yuchen): https://github.com/neondatabase/neon/issues/10092
// Should we just replace BlockBuf::blocks with one big buffer
for buf in block_buf.blocks {
let (_buf, res) = file.write_all_at(buf.slice_len(), offset, ctx).await;
res.map_err(|e| DeltaLayerWriterError::Other(anyhow::Error::new(e)))?;
res?;
offset += PAGE_SZ as u64;
}
assert!(self.lsn_range.start < self.lsn_range.end);
@@ -609,27 +575,24 @@ impl DeltaLayerWriterInner {
};
// Writes summary at the first block (offset 0).
let buf = summary
.ser_into_page()
.map_err(|e| DeltaLayerWriterError::Other(anyhow::Error::new(e)))?;
let buf = summary.ser_into_page()?;
let (_buf, res) = file.write_all_at(buf.slice_len(), 0, ctx).await;
res.map_err(|e| DeltaLayerWriterError::Other(anyhow::Error::new(e)))?;
res?;
let metadata = file
.metadata()
.await
.map_err(|e| DeltaLayerWriterError::Other(anyhow::Error::new(e)))?;
.context("get file metadata to determine size")?;
// 5GB limit for objects without multipart upload (which we don't want to use)
// Make it a little bit below to account for differing GB units
// https://docs.aws.amazon.com/AmazonS3/latest/userguide/upload-objects.html
if metadata.len() > S3_UPLOAD_LIMIT {
return Err(DeltaLayerWriterError::Other(anyhow::anyhow!(
"Created delta layer file at {} of size {} above limit {S3_UPLOAD_LIMIT}!",
file.path(),
metadata.len()
)));
}
ensure!(
metadata.len() <= S3_UPLOAD_LIMIT,
"Created delta layer file at {} of size {} above limit {S3_UPLOAD_LIMIT}!",
file.path(),
metadata.len()
);
// Note: Because we opened the file in write-only mode, we cannot
// reuse the same VirtualFile for reading later. That's why we don't
@@ -646,7 +609,7 @@ impl DeltaLayerWriterInner {
// fsync the file
file.sync_all()
.await
.map_err(|e| DeltaLayerWriterError::Other(anyhow::Error::new(e)))?;
.maybe_fatal_err("delta_layer sync_all")?;
trace!("created delta layer {}", self.path);
@@ -731,7 +694,7 @@ impl DeltaLayerWriter {
lsn: Lsn,
val: Value,
ctx: &RequestContext,
) -> Result<(), DeltaLayerWriterError> {
) -> anyhow::Result<()> {
self.inner
.as_mut()
.unwrap()
@@ -746,7 +709,7 @@ impl DeltaLayerWriter {
val: FullSlice<Buf>,
will_init: bool,
ctx: &RequestContext,
) -> (FullSlice<Buf>, Result<(), DeltaLayerWriterError>)
) -> (FullSlice<Buf>, anyhow::Result<()>)
where
Buf: IoBuf + Send,
{
@@ -768,7 +731,7 @@ impl DeltaLayerWriter {
mut self,
key_end: Key,
ctx: &RequestContext,
) -> Result<(PersistentLayerDesc, Utf8PathBuf), DeltaLayerWriterError> {
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
self.inner.take().unwrap().finish(key_end, ctx).await
}
@@ -782,14 +745,6 @@ impl DeltaLayerWriter {
}
}
/// Error returned by the fallible `DeltaLayerWriter` operations that use this
/// type in their `Result` (the value-writing methods and `finish`).
///
/// Cancellation is kept as its own variant so callers can tell it apart from
/// every other failure by matching on the enum.
#[derive(Debug, thiserror::Error)]
pub enum DeltaLayerWriterError {
/// The operation was cancelled; displayed as "cancelled".
#[error("cancelled")]
Cancelled,
/// Any other failure. `#[error(transparent)]` delegates `Display` and
/// `source()` to the wrapped `anyhow::Error`.
#[error(transparent)]
Other(anyhow::Error),
}
#[derive(thiserror::Error, Debug)]
pub enum RewriteSummaryError {
#[error("magic mismatch")]
@@ -800,7 +755,7 @@ pub enum RewriteSummaryError {
impl From<std::io::Error> for RewriteSummaryError {
fn from(e: std::io::Error) -> Self {
Self::Other(anyhow::Error::new(e))
Self::Other(anyhow::anyhow!(e))
}
}

View File

@@ -33,9 +33,7 @@ use std::str::FromStr;
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use crate::tenant::blob_io::BlobWriterError;
use anyhow::{Context, Result, bail};
use anyhow::{Context, Result, bail, ensure};
use bytes::Bytes;
use camino::{Utf8Path, Utf8PathBuf};
use hex;
@@ -76,7 +74,7 @@ use crate::tenant::vectored_blob_io::{
use crate::virtual_file::TempVirtualFile;
use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
use crate::virtual_file::owned_buffers_io::write::{Buffer, BufferedWriterShutdownMode};
use crate::virtual_file::{self, IoBuffer, IoBufferMut, VirtualFile};
use crate::virtual_file::{self, IoBuffer, IoBufferMut, MaybeFatalIo, VirtualFile};
use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
///
@@ -342,14 +340,6 @@ impl ImageLayer {
}
}
/// Error returned by the fallible image layer writer operations that use this
/// type in their `Result` (`put_image`, the raw-image variant, and `finish`).
///
/// Cancellation is kept as its own variant so callers can tell it apart from
/// every other failure by matching on the enum.
#[derive(Debug, thiserror::Error)]
pub enum ImageLayerWriterError {
/// Produced when the underlying blob writer reports
/// `BlobWriterError::Cancelled`; displayed as "flush task cancelled".
#[error("flush task cancelled")]
Cancelled,
/// Any other failure. `#[error(transparent)]` delegates `Display` and
/// `source()` to the wrapped `anyhow::Error`.
#[error(transparent)]
Other(anyhow::Error),
}
#[derive(thiserror::Error, Debug)]
pub enum RewriteSummaryError {
#[error("magic mismatch")]
@@ -824,11 +814,7 @@ impl ImageLayerWriterInner {
cancel,
ctx,
info_span!(parent: None, "image_layer_writer_flush_task", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), timeline_id=%timeline_id, path = %path),
)
.map_err(|e| match e {
BlobWriterError::Cancelled => anyhow::anyhow!("flush task cancelled"),
BlobWriterError::Other(err) => err,
})?;
)?;
// Initialize the b-tree index builder
let block_buf = BlockBuf::new();
@@ -864,13 +850,8 @@ impl ImageLayerWriterInner {
key: Key,
img: Bytes,
ctx: &RequestContext,
) -> Result<(), ImageLayerWriterError> {
if !self.key_range.contains(&key) {
return Err(ImageLayerWriterError::Other(anyhow::anyhow!(
"key not in range"
)));
}
) -> anyhow::Result<()> {
ensure!(self.key_range.contains(&key));
let compression = self.conf.image_compression;
let uncompressed_len = img.len() as u64;
self.uncompressed_bytes += uncompressed_len;
@@ -880,18 +861,7 @@ impl ImageLayerWriterInner {
.write_blob_maybe_compressed(img.slice_len(), ctx, compression)
.await;
// TODO: re-use the buffer for `img` further upstack
let (off, compression_info) = res.map_err(|e| match e {
crate::tenant::blob_io::WriteBlobError::Flush(blob_err) => match blob_err {
crate::tenant::blob_io::BlobWriterError::Cancelled => {
ImageLayerWriterError::Cancelled
}
crate::tenant::blob_io::BlobWriterError::Other(err) => {
ImageLayerWriterError::Other(err)
}
},
other => ImageLayerWriterError::Other(anyhow::anyhow!(other)),
})?;
let (off, compression_info) = res?;
if compression_info.compressed_size.is_some() {
// The image has been considered for compression at least
self.uncompressed_bytes_eligible += uncompressed_len;
@@ -903,9 +873,7 @@ impl ImageLayerWriterInner {
let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
key.write_to_byte_slice(&mut keybuf);
self.tree
.append(&keybuf, off)
.map_err(|e| ImageLayerWriterError::Other(anyhow::anyhow!(e)))?;
self.tree.append(&keybuf, off)?;
#[cfg(feature = "testing")]
{
@@ -925,12 +893,8 @@ impl ImageLayerWriterInner {
key: Key,
raw_with_header: Bytes,
ctx: &RequestContext,
) -> Result<(), ImageLayerWriterError> {
if !self.key_range.contains(&key) {
return Err(ImageLayerWriterError::Other(anyhow::anyhow!(
"key not in range"
)));
}
) -> anyhow::Result<()> {
ensure!(self.key_range.contains(&key));
// NB: we don't update the (un)compressed metrics, since we can't determine them without
// decompressing the image. This seems okay.
@@ -940,23 +904,11 @@ impl ImageLayerWriterInner {
.blob_writer
.write_blob_raw(raw_with_header.slice_len(), ctx)
.await;
let offset = res.map_err(|e| match e {
crate::tenant::blob_io::WriteBlobError::Flush(blob_err) => match blob_err {
crate::tenant::blob_io::BlobWriterError::Cancelled => {
ImageLayerWriterError::Cancelled
}
crate::tenant::blob_io::BlobWriterError::Other(err) => {
ImageLayerWriterError::Other(err)
}
},
other => ImageLayerWriterError::Other(anyhow::anyhow!(other)),
})?;
let offset = res?;
let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
key.write_to_byte_slice(&mut keybuf);
self.tree
.append(&keybuf, offset)
.map_err(|e| ImageLayerWriterError::Other(anyhow::anyhow!(e)))?;
self.tree.append(&keybuf, offset)?;
#[cfg(feature = "testing")]
{
@@ -973,7 +925,7 @@ impl ImageLayerWriterInner {
self,
ctx: &RequestContext,
end_key: Option<Key>,
) -> Result<(PersistentLayerDesc, Utf8PathBuf), ImageLayerWriterError> {
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
let index_start_blk = self.blob_writer.size().div_ceil(PAGE_SZ as u64) as u32;
// Calculate compression ratio
@@ -996,24 +948,17 @@ impl ImageLayerWriterInner {
BufferedWriterShutdownMode::ZeroPadToNextMultiple(PAGE_SZ),
ctx,
)
.await
.map_err(|e| match e {
BlobWriterError::Cancelled => ImageLayerWriterError::Cancelled,
BlobWriterError::Other(err) => ImageLayerWriterError::Other(err),
})?;
.await?;
// Write out the index
let mut offset = index_start_blk as u64 * PAGE_SZ as u64;
let (index_root_blk, block_buf) = self
.tree
.finish()
.map_err(|e| ImageLayerWriterError::Other(anyhow::anyhow!(e)))?;
let (index_root_blk, block_buf) = self.tree.finish()?;
// TODO(yuchen): https://github.com/neondatabase/neon/issues/10092
// Should we just replace BlockBuf::blocks with one big buffer?
for buf in block_buf.blocks {
let (_buf, res) = file.write_all_at(buf.slice_len(), offset, ctx).await;
res.map_err(|e| ImageLayerWriterError::Other(anyhow::anyhow!(e)))?;
res?;
offset += PAGE_SZ as u64;
}
@@ -1036,18 +981,14 @@ impl ImageLayerWriterInner {
};
// Writes summary at the first block (offset 0).
let buf = summary
.ser_into_page()
.map_err(|e| ImageLayerWriterError::Other(anyhow::anyhow!(e)))?;
let buf = summary.ser_into_page()?;
let (_buf, res) = file.write_all_at(buf.slice_len(), 0, ctx).await;
res.map_err(|e| ImageLayerWriterError::Other(anyhow::anyhow!(e)))?;
res?;
let metadata = file.metadata().await.map_err(|e| {
ImageLayerWriterError::Other(anyhow::anyhow!(
"get metadata to determine file size: {}",
e
))
})?;
let metadata = file
.metadata()
.await
.context("get metadata to determine file size")?;
let desc = PersistentLayerDesc::new_img(
self.tenant_shard_id,
@@ -1070,9 +1011,9 @@ impl ImageLayerWriterInner {
// set inner.file here. The first read will have to re-open it.
// fsync the file
file.sync_all().await.map_err(|e| {
ImageLayerWriterError::Other(anyhow::anyhow!("image_layer sync_all: {}", e))
})?;
file.sync_all()
.await
.maybe_fatal_err("image_layer sync_all")?;
trace!("created image layer {}", self.path);
@@ -1152,7 +1093,7 @@ impl ImageLayerWriter {
key: Key,
img: Bytes,
ctx: &RequestContext,
) -> Result<(), ImageLayerWriterError> {
) -> anyhow::Result<()> {
self.inner.as_mut().unwrap().put_image(key, img, ctx).await
}
@@ -1167,7 +1108,7 @@ impl ImageLayerWriter {
key: Key,
raw_with_header: Bytes,
ctx: &RequestContext,
) -> Result<(), ImageLayerWriterError> {
) -> anyhow::Result<()> {
self.inner
.as_mut()
.unwrap()
@@ -1191,7 +1132,7 @@ impl ImageLayerWriter {
pub(crate) async fn finish(
mut self,
ctx: &RequestContext,
) -> Result<(PersistentLayerDesc, Utf8PathBuf), ImageLayerWriterError> {
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
self.inner.take().unwrap().finish(ctx, None).await
}
@@ -1200,7 +1141,7 @@ impl ImageLayerWriter {
mut self,
end_key: Key,
ctx: &RequestContext,
) -> Result<(PersistentLayerDesc, Utf8PathBuf), ImageLayerWriterError> {
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
self.inner.take().unwrap().finish(ctx, Some(end_key)).await
}
}

View File

@@ -26,7 +26,6 @@ use utils::lsn::Lsn;
use utils::vec_map::VecMap;
use wal_decoder::serialized_batch::{SerializedValueBatch, SerializedValueMeta, ValueMeta};
use super::delta_layer::DeltaLayerWriterError;
use super::{DeltaLayerWriter, PersistentLayerDesc, ValuesReconstructState};
use crate::assert_u64_eq_usize::{U64IsUsize, UsizeIsU64, u64_to_usize};
use crate::config::PageServerConf;
@@ -582,17 +581,7 @@ impl InMemoryLayer {
estimated_in_mem_size: AtomicU64::new(0),
})
}
}
/// Error returned by the `InMemoryLayer` write path.
///
/// Cancellation is kept as its own variant so callers can tell it apart from
/// every other failure by matching on the enum.
#[derive(Debug, thiserror::Error)]
pub enum InMemoryLayerError {
/// The operation was cancelled; displayed as "flush task cancelled".
// NOTE(review): no construction site for this variant is visible in this
// part of the file; confirm where it is produced.
#[error("flush task cancelled")]
Cancelled,
/// Any other failure. `#[error(transparent)]` delegates `Display` and
/// `source()` to the wrapped `anyhow::Error`.
#[error(transparent)]
Other(anyhow::Error),
}
impl InMemoryLayer {
/// Write path.
///
/// Errors are not retryable, the [`InMemoryLayer`] must be discarded, and not be read from.
@@ -602,7 +591,7 @@ impl InMemoryLayer {
&self,
serialized_batch: SerializedValueBatch,
ctx: &RequestContext,
) -> Result<(), InMemoryLayerError> {
) -> anyhow::Result<()> {
let mut inner = self.inner.write().await;
self.assert_writable();
@@ -616,11 +605,7 @@ impl InMemoryLayer {
} = serialized_batch;
// Write the batch to the file
inner
.file
.write_raw(&raw, ctx)
.await
.map_err(|e| InMemoryLayerError::Other(anyhow::anyhow!(e)))?;
inner.file.write_raw(&raw, ctx).await?;
let new_size = inner.file.len();
let expected_new_len = base_offset
@@ -652,8 +637,7 @@ impl InMemoryLayer {
batch_offset,
len,
will_init,
})
.map_err(|e| InMemoryLayerError::Other(anyhow::anyhow!(e)))?;
})?;
let vec_map = inner.index.entry(key).or_default();
let old = vec_map.append_or_update_last(lsn, index_entry).unwrap().0;
@@ -810,25 +794,14 @@ impl InMemoryLayer {
ctx,
)
.await;
res.map_err(|e| match e {
DeltaLayerWriterError::Cancelled => {
anyhow::anyhow!("flush task cancelled")
}
DeltaLayerWriterError::Other(err) => err,
})?;
res?;
}
}
}
}
// MAX is used here because we identify L0 layers by full key range
let (desc, path) = delta_layer_writer
.finish(Key::MAX, ctx)
.await
.map_err(|e| match e {
DeltaLayerWriterError::Cancelled => anyhow::anyhow!("flush task cancelled"),
DeltaLayerWriterError::Other(err) => err,
})?;
let (desc, path) = delta_layer_writer.finish(Key::MAX, ctx).await?;
// Hold the permit until all the IO is done, including the fsync in `delta_layer_writer.finish()`.
//

View File

@@ -300,7 +300,6 @@ pub(crate) fn log_compaction_error(
let level = match err {
e if e.is_cancel() => return,
ShuttingDown => return,
Cancelled => return,
Offload(_) => Level::ERROR,
AlreadyRunning(_) => Level::ERROR,
CollectKeySpaceError(_) => Level::ERROR,

View File

@@ -119,8 +119,6 @@ use crate::tenant::gc_result::GcResult;
use crate::tenant::layer_map::LayerMap;
use crate::tenant::metadata::TimelineMetadata;
use crate::tenant::storage_layer::delta_layer::DeltaEntry;
use crate::tenant::storage_layer::image_layer::ImageLayerWriterError;
use crate::tenant::storage_layer::inmemory_layer::InMemoryLayerError;
use crate::tenant::storage_layer::inmemory_layer::IndexEntry;
use crate::tenant::storage_layer::{
AsLayerDesc, BatchLayerWriter, DeltaLayerWriter, EvictionError, ImageLayerName,
@@ -775,21 +773,6 @@ impl From<layer_manager::Shutdown> for CreateImageLayersError {
}
}
impl From<crate::tenant::storage_layer::batch_split_writer::BatchSplitWriterError>
    for CreateImageLayersError
{
    /// Converts a batch-split-writer failure into the image-layer-creation
    /// error, variant for variant: `Cancelled` stays `Cancelled`, and the
    /// `anyhow::Error` carried by `Other` is moved across unchanged.
    fn from(err: crate::tenant::storage_layer::batch_split_writer::BatchSplitWriterError) -> Self {
        // Local alias keeps the match arms readable without touching the
        // file-level imports.
        use crate::tenant::storage_layer::batch_split_writer::BatchSplitWriterError as Source;

        match err {
            Source::Cancelled => Self::Cancelled,
            Source::Other(inner) => Self::Other(inner),
        }
    }
}
#[derive(thiserror::Error, Debug, Clone)]
pub(crate) enum FlushLayerError {
/// Timeline cancellation token was cancelled
@@ -2058,9 +2041,6 @@ impl Timeline {
Err(CompactionError::ShuttingDown) => {
// Covered by the `Err(e) if e.is_cancel()` branch.
}
Err(CompactionError::Cancelled) => {
// Covered by the `Err(e) if e.is_cancel()` branch.
}
Err(CompactionError::AlreadyRunning(_)) => {
// Covered by the `Err(e) if e.is_cancel()` branch.
}
@@ -5252,17 +5232,7 @@ impl Timeline {
};
// Write all the keys we just read into our new image layer.
image_layer_writer
.put_image(img_key, img, ctx)
.await
.map_err(|e| match e {
ImageLayerWriterError::Cancelled => CreateImageLayersError::Other(
anyhow::anyhow!("flush task cancelled"),
),
ImageLayerWriterError::Other(err) => {
CreateImageLayersError::Other(err)
}
})?;
image_layer_writer.put_image(img_key, img, ctx).await?;
wrote_keys = true;
}
}
@@ -5359,15 +5329,7 @@ impl Timeline {
// TODO: split image layers to avoid too large layer files. Too large image files are not handled
// on the normal data path either.
image_layer_writer
.put_image(k, v, ctx)
.await
.map_err(|e| match e {
ImageLayerWriterError::Cancelled => {
CreateImageLayersError::Other(anyhow::anyhow!("flush task cancelled"))
}
ImageLayerWriterError::Other(err) => CreateImageLayersError::Other(err),
})?;
image_layer_writer.put_image(k, v, ctx).await?;
}
if wrote_any_image {
@@ -5881,8 +5843,6 @@ pub(crate) enum CompactionError {
Other(anyhow::Error),
#[error("Compaction already running: {0}")]
AlreadyRunning(&'static str),
#[error("cancelled")]
Cancelled,
}
impl CompactionError {
@@ -5897,7 +5857,6 @@ impl CompactionError {
PageReconstructError::Cancelled
))
| Self::Offload(OffloadError::Cancelled)
| Self::Cancelled
)
}
@@ -6963,22 +6922,9 @@ impl Timeline {
)
.await?;
for (key, img) in images {
image_layer_writer
.put_image(key, img, ctx)
.await
.map_err(|e| match e {
ImageLayerWriterError::Cancelled => {
anyhow::anyhow!("flush task cancelled")
}
ImageLayerWriterError::Other(err) => err,
})?;
image_layer_writer.put_image(key, img, ctx).await?;
}
let (desc, path) = image_layer_writer.finish(ctx).await.map_err(|e| match e {
ImageLayerWriterError::Cancelled => {
anyhow::anyhow!("flush task cancelled")
}
ImageLayerWriterError::Other(err) => err,
})?;
let (desc, path) = image_layer_writer.finish(ctx).await?;
let image_layer = Layer::finish_creating(self.conf, self, desc, &path)?;
info!("force created image layer {}", image_layer.local_path());
{
@@ -7432,10 +7378,7 @@ impl TimelineWriter<'_> {
state.max_lsn = std::cmp::max(state.max_lsn, Some(batch_max_lsn));
}
res.map_err(|e| match e {
InMemoryLayerError::Cancelled => anyhow::anyhow!("flush task cancelled"),
InMemoryLayerError::Other(err) => err,
})
res
}
#[cfg(test)]

View File

@@ -1547,7 +1547,7 @@ impl Timeline {
ctx,
)
.await
.map_err(|e| CompactionError::Other(anyhow::anyhow!(e)))?;
.map_err(CompactionError::Other)?;
// Safety of layer rewrites:
// - We are writing to a different local file path than we are reading from, so the old Layer
@@ -1572,7 +1572,7 @@ impl Timeline {
let (desc, path) = image_layer_writer
.finish(ctx)
.await
.map_err(|e| CompactionError::Other(anyhow::anyhow!(e)))?;
.map_err(CompactionError::Other)?;
let new_layer = Layer::finish_creating(self.conf, self, desc, &path)
.map_err(CompactionError::Other)?;
info!(layer=%new_layer, "rewrote layer, {} -> {} bytes",
@@ -2140,7 +2140,7 @@ impl Timeline {
.unwrap()
.finish(prev_key.unwrap().next(), ctx)
.await
.map_err(|e| CompactionError::Other(anyhow::anyhow!(e)))?;
.map_err(CompactionError::Other)?;
let new_delta = Layer::finish_creating(self.conf, self, desc, &path)
.map_err(CompactionError::Other)?;
@@ -2199,7 +2199,7 @@ impl Timeline {
.unwrap()
.put_value(key, lsn, value, ctx)
.await
.map_err(|e| CompactionError::Other(anyhow::anyhow!(e)))?;
.map_err(CompactionError::Other)?;
} else {
let owner = self.shard_identity.get_shard_number(&key);
@@ -2217,7 +2217,7 @@ impl Timeline {
let (desc, path) = writer
.finish(prev_key.unwrap().next(), ctx)
.await
.map_err(|e| CompactionError::Other(anyhow::anyhow!(e)))?;
.map_err(CompactionError::Other)?;
let new_delta = Layer::finish_creating(self.conf, self, desc, &path)
.map_err(CompactionError::Other)?;
new_layers.push(new_delta);
@@ -3682,7 +3682,8 @@ impl Timeline {
let (desc, path) = delta_writer_before
.finish(job_desc.compaction_key_range.start, ctx)
.await
.map_err(|e| CompactionError::Other(anyhow::anyhow!(e)))?;
.context("failed to finish delta layer writer")
.map_err(CompactionError::Other)?;
let layer = Layer::finish_creating(self.conf, self, desc, &path)
.context("failed to finish creating delta layer")
.map_err(CompactionError::Other)?;
@@ -3692,7 +3693,8 @@ impl Timeline {
let (desc, path) = delta_writer_after
.finish(key.key_range.end, ctx)
.await
.map_err(|e| CompactionError::Other(anyhow::anyhow!(e)))?;
.context("failed to finish delta layer writer")
.map_err(CompactionError::Other)?;
let layer = Layer::finish_creating(self.conf, self, desc, &path)
.context("failed to finish creating delta layer")
.map_err(CompactionError::Other)?;
@@ -3711,7 +3713,8 @@ impl Timeline {
writer
.finish_with_discard_fn(self, ctx, end_key, discard)
.await
.map_err(|e| CompactionError::Other(anyhow::anyhow!(e)))?
.context("failed to finish image layer writer")
.map_err(CompactionError::Other)?
} else {
drop(writer);
Vec::new()
@@ -3724,7 +3727,8 @@ impl Timeline {
delta_layer_writer
.finish_with_discard_fn(self, ctx, discard)
.await
.map_err(|e| CompactionError::Other(anyhow::anyhow!(e)))?
.context("failed to finish delta layer writer")
.map_err(CompactionError::Other)?
} else {
drop(delta_layer_writer);
Vec::new()
@@ -4249,10 +4253,7 @@ impl TimelineAdaptor {
unfinished_image_layer,
} = outcome
{
let (desc, path) = unfinished_image_layer
.finish(ctx)
.await
.map_err(|e| CreateImageLayersError::Other(anyhow::anyhow!(e)))?;
let (desc, path) = unfinished_image_layer.finish(ctx).await?;
let image_layer =
Layer::finish_creating(self.timeline.conf, &self.timeline, desc, &path)?;
self.new_images.push(image_layer);

View File

@@ -1,7 +1,6 @@
use std::collections::HashSet;
use std::sync::Arc;
use crate::tenant::storage_layer::delta_layer::DeltaLayerWriterError;
use anyhow::Context;
use bytes::Bytes;
use http_utils::error::ApiError;
@@ -817,10 +816,7 @@ async fn copy_lsn_prefix(
let (desc, path) = writer
.finish(reused_highest_key, ctx)
.await
.map_err(|e| match e {
DeltaLayerWriterError::Cancelled => Error::ShuttingDown,
DeltaLayerWriterError::Other(err) => Error::Prepare(err),
})?;
.map_err(Error::Prepare)?;
let copied = Layer::finish_creating(target_timeline.conf, target_timeline, desc, &path)
.map_err(Error::Prepare)?;

View File

@@ -14,6 +14,8 @@
use std::fs::File;
use std::io::{Error, ErrorKind};
use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};
#[cfg(target_os = "linux")]
use std::os::unix::fs::OpenOptionsExt;
use std::sync::LazyLock;
use std::sync::atomic::{AtomicBool, AtomicU8, AtomicUsize, Ordering};
@@ -97,7 +99,7 @@ impl VirtualFile {
pub async fn open_with_options_v2<P: AsRef<Utf8Path>>(
path: P,
#[cfg_attr(not(target_os = "linux"), allow(unused_mut))] mut open_options: OpenOptions,
open_options: &OpenOptions,
ctx: &RequestContext,
) -> Result<Self, std::io::Error> {
let mode = get_io_mode();
@@ -110,16 +112,21 @@ impl VirtualFile {
#[cfg(target_os = "linux")]
(IoMode::DirectRw, _) => true,
};
if set_o_direct {
let open_options = open_options.clone();
let open_options = if set_o_direct {
#[cfg(target_os = "linux")]
{
open_options = open_options.custom_flags(nix::libc::O_DIRECT);
let mut open_options = open_options;
open_options.custom_flags(nix::libc::O_DIRECT);
open_options
}
#[cfg(not(target_os = "linux"))]
unreachable!(
"O_DIRECT is not supported on this platform, IoMode's that result in set_o_direct=true shouldn't even be defined"
);
}
} else {
open_options
};
let inner = VirtualFileInner::open_with_options(path, open_options, ctx).await?;
Ok(VirtualFile { inner, _mode: mode })
}
@@ -523,7 +530,7 @@ impl VirtualFileInner {
path: P,
ctx: &RequestContext,
) -> Result<VirtualFileInner, std::io::Error> {
Self::open_with_options(path.as_ref(), OpenOptions::new().read(true), ctx).await
Self::open_with_options(path.as_ref(), OpenOptions::new().read(true).clone(), ctx).await
}
/// Open a file with given options.
@@ -551,11 +558,10 @@ impl VirtualFileInner {
// It would perhaps be nicer to check just for the read and write flags
// explicitly, but OpenOptions doesn't contain any functions to read flags,
// only to set them.
let reopen_options = open_options
.clone()
.create(false)
.create_new(false)
.truncate(false);
let mut reopen_options = open_options.clone();
reopen_options.create(false);
reopen_options.create_new(false);
reopen_options.truncate(false);
let vfile = VirtualFileInner {
handle: RwLock::new(handle),
@@ -1301,7 +1307,7 @@ mod tests {
opts: OpenOptions,
ctx: &RequestContext,
) -> Result<MaybeVirtualFile, anyhow::Error> {
let vf = VirtualFile::open_with_options_v2(&path, opts, ctx).await?;
let vf = VirtualFile::open_with_options_v2(&path, &opts, ctx).await?;
Ok(MaybeVirtualFile::VirtualFile(vf))
}
}
@@ -1368,7 +1374,7 @@ mod tests {
let _ = file_a.read_string_at(0, 1, &ctx).await.unwrap_err();
// Close the file and re-open for reading
let mut file_a = A::open(path_a, OpenOptions::new().read(true), &ctx).await?;
let mut file_a = A::open(path_a, OpenOptions::new().read(true).to_owned(), &ctx).await?;
// cannot write to a file opened in read-only mode
let _ = file_a
@@ -1387,7 +1393,8 @@ mod tests {
.read(true)
.write(true)
.create(true)
.truncate(true),
.truncate(true)
.to_owned(),
&ctx,
)
.await?;
@@ -1405,7 +1412,12 @@ mod tests {
let mut vfiles = Vec::new();
for _ in 0..100 {
let mut vfile = A::open(path_b.clone(), OpenOptions::new().read(true), &ctx).await?;
let mut vfile = A::open(
path_b.clone(),
OpenOptions::new().read(true).to_owned(),
&ctx,
)
.await?;
assert_eq!("FOOBAR", vfile.read_string_at(0, 6, &ctx).await?);
vfiles.push(vfile);
}
@@ -1454,7 +1466,7 @@ mod tests {
for _ in 0..VIRTUAL_FILES {
let f = VirtualFileInner::open_with_options(
&test_file_path,
OpenOptions::new().read(true),
OpenOptions::new().read(true).clone(),
&ctx,
)
.await?;

View File

@@ -1,7 +1,6 @@
//! Enum-dispatch to the `OpenOptions` type of the respective [`super::IoEngineKind`];
use std::os::fd::OwnedFd;
use std::os::unix::fs::OpenOptionsExt;
use std::path::Path;
use super::io_engine::IoEngine;
@@ -44,7 +43,7 @@ impl OpenOptions {
self.write
}
pub fn read(mut self, read: bool) -> Self {
pub fn read(&mut self, read: bool) -> &mut OpenOptions {
match &mut self.inner {
Inner::StdFs(x) => {
let _ = x.read(read);
@@ -57,7 +56,7 @@ impl OpenOptions {
self
}
pub fn write(mut self, write: bool) -> Self {
pub fn write(&mut self, write: bool) -> &mut OpenOptions {
self.write = write;
match &mut self.inner {
Inner::StdFs(x) => {
@@ -71,7 +70,7 @@ impl OpenOptions {
self
}
pub fn create(mut self, create: bool) -> Self {
pub fn create(&mut self, create: bool) -> &mut OpenOptions {
match &mut self.inner {
Inner::StdFs(x) => {
let _ = x.create(create);
@@ -84,7 +83,7 @@ impl OpenOptions {
self
}
pub fn create_new(mut self, create_new: bool) -> Self {
pub fn create_new(&mut self, create_new: bool) -> &mut OpenOptions {
match &mut self.inner {
Inner::StdFs(x) => {
let _ = x.create_new(create_new);
@@ -97,7 +96,7 @@ impl OpenOptions {
self
}
pub fn truncate(mut self, truncate: bool) -> Self {
pub fn truncate(&mut self, truncate: bool) -> &mut OpenOptions {
match &mut self.inner {
Inner::StdFs(x) => {
let _ = x.truncate(truncate);
@@ -125,8 +124,10 @@ impl OpenOptions {
}
}
}
}
pub fn mode(mut self, mode: u32) -> Self {
impl std::os::unix::prelude::OpenOptionsExt for OpenOptions {
fn mode(&mut self, mode: u32) -> &mut OpenOptions {
match &mut self.inner {
Inner::StdFs(x) => {
let _ = x.mode(mode);
@@ -139,7 +140,7 @@ impl OpenOptions {
self
}
pub fn custom_flags(mut self, flags: i32) -> Self {
fn custom_flags(&mut self, flags: i32) -> &mut OpenOptions {
match &mut self.inner {
Inner::StdFs(x) => {
let _ = x.custom_flags(flags);

View File

@@ -26,7 +26,6 @@
#include "portability/instr_time.h"
#include "postmaster/interrupt.h"
#include "storage/buf_internals.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/lwlock.h"
#include "storage/pg_shmem.h"
@@ -80,7 +79,6 @@ int neon_protocol_version = 3;
static int neon_compute_mode = 0;
static int max_reconnect_attempts = 60;
static int stripe_size;
static int max_sockets;
static int pageserver_response_log_timeout = 10000;
/* 2.5 minutes. A bit higher than highest default TCP retransmission timeout */
@@ -338,13 +336,6 @@ load_shard_map(shardno_t shard_no, char *connstr_p, shardno_t *num_shards_p)
pageserver_disconnect(i);
}
pagestore_local_counter = end_update_counter;
/* Reserve file descriptors for sockets */
while (max_sockets < num_shards)
{
max_sockets += 1;
ReserveExternalFD();
}
}
if (num_shards_p)

View File

@@ -4,7 +4,6 @@
#include "miscadmin.h"
#include "access/xlog.h"
#include "access/xlog_internal.h"
#include "storage/ipc.h"
#include "storage/shmem.h"
#include "storage/buf_internals.h"
@@ -397,10 +396,9 @@ SetLastWrittenLSNForBlockRangeInternal(XLogRecPtr lsn,
XLogRecPtr
neon_set_lwlsn_block_range(XLogRecPtr lsn, NRelFileInfo rlocator, ForkNumber forknum, BlockNumber from, BlockNumber n_blocks)
{
if (lsn == InvalidXLogRecPtr || n_blocks == 0 || LwLsnCache->lastWrittenLsnCacheSize == 0)
if (lsn < FirstNormalUnloggedLSN || n_blocks == 0 || LwLsnCache->lastWrittenLsnCacheSize == 0)
return lsn;
Assert(lsn >= WalSegMinSize);
LWLockAcquire(LastWrittenLsnLock, LW_EXCLUSIVE);
lsn = SetLastWrittenLSNForBlockRangeInternal(lsn, rlocator, forknum, from, n_blocks);
LWLockRelease(LastWrittenLsnLock);
@@ -437,6 +435,7 @@ neon_set_lwlsn_block_v(const XLogRecPtr *lsns, NRelFileInfo relfilenode,
NInfoGetRelNumber(relfilenode) == InvalidOid)
return InvalidXLogRecPtr;
BufTagInit(key, relNumber, forknum, blockno, spcOid, dbOid);
LWLockAcquire(LastWrittenLsnLock, LW_EXCLUSIVE);
@@ -445,10 +444,6 @@ neon_set_lwlsn_block_v(const XLogRecPtr *lsns, NRelFileInfo relfilenode,
{
XLogRecPtr lsn = lsns[i];
if (lsn == InvalidXLogRecPtr)
continue;
Assert(lsn >= WalSegMinSize);
key.blockNum = blockno + i;
entry = hash_search(lastWrittenLsnCache, &key, HASH_ENTER, &found);
if (found)

View File

@@ -150,7 +150,7 @@ NeonWALReaderFree(NeonWALReader *state)
* fetched from timeline 'tli'.
*
* Returns NEON_WALREAD_SUCCESS if succeeded, NEON_WALREAD_ERROR if an error
* occurs, in which case 'err' has the description. Error always closes remote
* occurs, in which case 'err' has the description. Error always closes remote
* connection, if there was any, so socket subscription should be removed.
*
* NEON_WALREAD_WOULDBLOCK means caller should obtain socket to wait for with

View File

@@ -1989,14 +1989,8 @@ neon_start_unlogged_build(SMgrRelation reln)
neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
}
#if PG_MAJORVERSION_NUM >= 17
/*
* We have to disable this check for pg14-16 because sorted build of GIST index requires
* to perform unlogged build several times
*/
if (smgrnblocks(reln, MAIN_FORKNUM) != 0)
neon_log(ERROR, "cannot perform unlogged index build, index is not empty ");
#endif
unlogged_build_rel = reln;
unlogged_build_phase = UNLOGGED_BUILD_PHASE_1;

View File

@@ -124,7 +124,6 @@ WalProposerCreate(WalProposerConfig *config, walproposer_api api)
}
else
{
wp->safekeepers_generation = INVALID_GENERATION;
host = wp->config->safekeepers_list;
}
wp_log(LOG, "safekeepers_generation=%u", wp->safekeepers_generation);
@@ -757,7 +756,7 @@ UpdateMemberSafekeeperPtr(WalProposer *wp, Safekeeper *sk)
{
SafekeeperId *sk_id = &wp->mconf.members.m[i];
if (sk_id->node_id == sk->greetResponse.nodeId)
if (wp->mconf.members.m[i].node_id == sk->greetResponse.nodeId)
{
/*
* If mconf or list of safekeepers to connect to changed (the
@@ -782,7 +781,7 @@ UpdateMemberSafekeeperPtr(WalProposer *wp, Safekeeper *sk)
{
SafekeeperId *sk_id = &wp->mconf.new_members.m[i];
if (sk_id->node_id == sk->greetResponse.nodeId)
if (wp->mconf.new_members.m[i].node_id == sk->greetResponse.nodeId)
{
if (wp->new_members_safekeepers[i] != NULL && wp->new_members_safekeepers[i] != sk)
{
@@ -1072,6 +1071,7 @@ RecvVoteResponse(Safekeeper *sk)
/* ready for elected message */
sk->state = SS_WAIT_ELECTED;
wp->n_votes++;
/* Are we already elected? */
if (wp->state == WPS_CAMPAIGN)
{

View File

@@ -845,6 +845,9 @@ typedef struct WalProposer
/* timeline globally starts at this LSN */
XLogRecPtr timelineStartLsn;
/* number of votes collected from safekeepers */
int n_votes;
/* number of successful connections over the lifetime of walproposer */
int n_connected;

View File

@@ -409,22 +409,14 @@ impl JwkCacheEntryLock {
if let Some(exp) = payload.expiration {
if now >= exp + CLOCK_SKEW_LEEWAY {
return Err(JwtError::InvalidClaims(JwtClaimsError::JwtTokenHasExpired(
exp.duration_since(SystemTime::UNIX_EPOCH)
.unwrap_or_default()
.as_secs(),
)));
return Err(JwtError::InvalidClaims(JwtClaimsError::JwtTokenHasExpired));
}
}
if let Some(nbf) = payload.not_before {
if nbf >= now + CLOCK_SKEW_LEEWAY {
return Err(JwtError::InvalidClaims(
JwtClaimsError::JwtTokenNotYetReadyToUse(
nbf.duration_since(SystemTime::UNIX_EPOCH)
.unwrap_or_default()
.as_secs(),
),
JwtClaimsError::JwtTokenNotYetReadyToUse,
));
}
}
@@ -542,10 +534,10 @@ struct JwtPayload<'a> {
#[serde(rename = "aud", default)]
audience: OneOrMany,
/// Expiration - Time after which the JWT expires
#[serde(rename = "exp", deserialize_with = "numeric_date_opt", default)]
#[serde(deserialize_with = "numeric_date_opt", rename = "exp", default)]
expiration: Option<SystemTime>,
/// Not before - Time before which the JWT is not valid
#[serde(rename = "nbf", deserialize_with = "numeric_date_opt", default)]
/// Not before - Time before which the JWT is not valid
#[serde(deserialize_with = "numeric_date_opt", rename = "nbf", default)]
not_before: Option<SystemTime>,
// the following entries are only extracted for the sake of debug logging.
@@ -617,15 +609,8 @@ impl<'de> Deserialize<'de> for OneOrMany {
}
fn numeric_date_opt<'de, D: Deserializer<'de>>(d: D) -> Result<Option<SystemTime>, D::Error> {
<Option<u64>>::deserialize(d)?
.map(|t| {
SystemTime::UNIX_EPOCH
.checked_add(Duration::from_secs(t))
.ok_or_else(|| {
serde::de::Error::custom(format_args!("timestamp out of bounds: {t}"))
})
})
.transpose()
let d = <Option<u64>>::deserialize(d)?;
Ok(d.map(|n| SystemTime::UNIX_EPOCH + Duration::from_secs(n)))
}
struct JwkRenewalPermit<'a> {
@@ -761,11 +746,11 @@ pub enum JwtClaimsError {
#[error("invalid JWT token audience")]
InvalidJwtTokenAudience,
#[error("JWT token has expired (exp={0})")]
JwtTokenHasExpired(u64),
#[error("JWT token has expired")]
JwtTokenHasExpired,
#[error("JWT token is not yet ready to use (nbf={0})")]
JwtTokenNotYetReadyToUse(u64),
#[error("JWT token is not yet ready to use")]
JwtTokenNotYetReadyToUse,
}
#[allow(dead_code, reason = "Debug use only")]
@@ -1248,14 +1233,14 @@ X0n5X2/pBLJzxZc62ccvZYVnctBiFs6HbSnxpuMQCfkt/BcR/ttIepBQQIW86wHL
"nbf": now + 60,
"aud": "neon",
}},
error: JwtClaimsError::JwtTokenNotYetReadyToUse(now + 60),
error: JwtClaimsError::JwtTokenNotYetReadyToUse,
},
Test {
body: json! {{
"exp": now - 60,
"aud": ["neon"],
}},
error: JwtClaimsError::JwtTokenHasExpired(now - 60),
error: JwtClaimsError::JwtTokenHasExpired,
},
Test {
body: json! {{

View File

@@ -32,6 +32,12 @@ pub(crate) enum ComputeUserInfoParseError {
option: EndpointId,
},
#[error(
"Common name inferred from SNI ('{}') is not known",
.cn,
)]
UnknownCommonName { cn: String },
#[error("Project name ('{0}') must contain only alphanumeric characters and hyphen.")]
MalformedProjectName(EndpointId),
}
@@ -60,15 +66,22 @@ impl ComputeUserInfoMaybeEndpoint {
}
}
pub(crate) fn endpoint_sni(sni: &str, common_names: &HashSet<String>) -> Option<EndpointId> {
let (subdomain, common_name) = sni.split_once('.')?;
pub(crate) fn endpoint_sni(
sni: &str,
common_names: &HashSet<String>,
) -> Result<Option<EndpointId>, ComputeUserInfoParseError> {
let Some((subdomain, common_name)) = sni.split_once('.') else {
return Err(ComputeUserInfoParseError::UnknownCommonName { cn: sni.into() });
};
if !common_names.contains(common_name) {
return None;
return Err(ComputeUserInfoParseError::UnknownCommonName {
cn: common_name.into(),
});
}
if subdomain == SERVERLESS_DRIVER_SNI {
return None;
return Ok(None);
}
Some(EndpointId::from(subdomain))
Ok(Some(EndpointId::from(subdomain)))
}
impl ComputeUserInfoMaybeEndpoint {
@@ -100,8 +113,15 @@ impl ComputeUserInfoMaybeEndpoint {
})
.map(|name| name.into());
let endpoint_from_domain =
sni.and_then(|sni_str| common_names.and_then(|cn| endpoint_sni(sni_str, cn)));
let endpoint_from_domain = if let Some(sni_str) = sni {
if let Some(cn) = common_names {
endpoint_sni(sni_str, cn)?
} else {
None
}
} else {
None
};
let endpoint = match (endpoint_option, endpoint_from_domain) {
// Invariant: if we have both project name variants, they should match.
@@ -404,34 +424,21 @@ mod tests {
}
#[test]
fn parse_unknown_sni() {
fn parse_inconsistent_sni() {
let options = StartupMessageParams::new([("user", "john_doe")]);
let sni = Some("project.localhost");
let common_names = Some(["example.com".into()].into());
let ctx = RequestContext::test();
let info = ComputeUserInfoMaybeEndpoint::parse(&ctx, &options, sni, common_names.as_ref())
.unwrap();
assert!(info.endpoint_id.is_none());
}
#[test]
fn parse_unknown_sni_with_options() {
let options = StartupMessageParams::new([
("user", "john_doe"),
("options", "endpoint=foo-bar-baz-1234"),
]);
let sni = Some("project.localhost");
let common_names = Some(["example.com".into()].into());
let ctx = RequestContext::test();
let info = ComputeUserInfoMaybeEndpoint::parse(&ctx, &options, sni, common_names.as_ref())
.unwrap();
assert_eq!(info.endpoint_id.as_deref(), Some("foo-bar-baz-1234"));
let err = ComputeUserInfoMaybeEndpoint::parse(&ctx, &options, sni, common_names.as_ref())
.expect_err("should fail");
match err {
UnknownCommonName { cn } => {
assert_eq!(cn, "localhost");
}
_ => panic!("bad error: {err:?}"),
}
}
#[test]

View File

@@ -24,6 +24,9 @@ pub(crate) enum HandshakeError {
#[error("protocol violation")]
ProtocolViolation,
#[error("missing certificate")]
MissingCertificate,
#[error("{0}")]
StreamUpgradeError(#[from] StreamUpgradeError),
@@ -39,6 +42,10 @@ impl ReportableError for HandshakeError {
match self {
HandshakeError::EarlyData => crate::error::ErrorKind::User,
HandshakeError::ProtocolViolation => crate::error::ErrorKind::User,
// This error should not happen, but will if we have no default certificate and
// the client sends no SNI extension.
// If they provide SNI then we can be sure there is a certificate that matches.
HandshakeError::MissingCertificate => crate::error::ErrorKind::Service,
HandshakeError::StreamUpgradeError(upgrade) => match upgrade {
StreamUpgradeError::AlreadyTls => crate::error::ErrorKind::Service,
StreamUpgradeError::Io(_) => crate::error::ErrorKind::ClientDisconnect,
@@ -139,7 +146,7 @@ pub(crate) async fn handshake<S: AsyncRead + AsyncWrite + Unpin>(
// try parse endpoint
let ep = conn_info
.server_name()
.and_then(|sni| endpoint_sni(sni, &tls.common_names));
.and_then(|sni| endpoint_sni(sni, &tls.common_names).ok().flatten());
if let Some(ep) = ep {
ctx.set_endpoint_id(ep);
}
@@ -154,8 +161,10 @@ pub(crate) async fn handshake<S: AsyncRead + AsyncWrite + Unpin>(
}
}
let (_, tls_server_end_point) =
tls.cert_resolver.resolve(conn_info.server_name());
let (_, tls_server_end_point) = tls
.cert_resolver
.resolve(conn_info.server_name())
.ok_or(HandshakeError::MissingCertificate)?;
stream = PqStream {
framed: Framed {

View File

@@ -98,7 +98,8 @@ fn generate_tls_config<'a>(
.with_no_client_auth()
.with_single_cert(vec![cert.clone()], key.clone_key())?;
let cert_resolver = CertResolver::new(key, vec![cert])?;
let mut cert_resolver = CertResolver::new();
cert_resolver.add_cert(key, vec![cert], true)?;
let common_names = cert_resolver.get_common_names();

View File

@@ -199,7 +199,8 @@ fn get_conn_info(
let endpoint = match connection_url.host() {
Some(url::Host::Domain(hostname)) => {
if let Some(tls) = tls {
endpoint_sni(hostname, &tls.common_names).ok_or(ConnInfoError::MalformedEndpoint)?
endpoint_sni(hostname, &tls.common_names)?
.ok_or(ConnInfoError::MalformedEndpoint)?
} else {
hostname
.split_once('.')

View File

@@ -5,7 +5,6 @@ use anyhow::{Context, bail};
use itertools::Itertools;
use rustls::crypto::ring::{self, sign};
use rustls::pki_types::{CertificateDer, PrivateKeyDer};
use rustls::sign::CertifiedKey;
use x509_cert::der::{Reader, SliceReader};
use super::{PG_ALPN_PROTOCOL, TlsServerEndPoint};
@@ -26,8 +25,10 @@ pub fn configure_tls(
certs_dir: Option<&String>,
allow_tls_keylogfile: bool,
) -> anyhow::Result<TlsConfig> {
let mut cert_resolver = CertResolver::new();
// add default certificate
let mut cert_resolver = CertResolver::parse_new(key_path, cert_path)?;
cert_resolver.add_cert_path(key_path, cert_path, true)?;
// add extra certificates
if let Some(certs_dir) = certs_dir {
@@ -39,8 +40,11 @@ pub fn configure_tls(
let key_path = path.join("tls.key");
let cert_path = path.join("tls.crt");
if key_path.exists() && cert_path.exists() {
cert_resolver
.add_cert_path(&key_path.to_string_lossy(), &cert_path.to_string_lossy())?;
cert_resolver.add_cert_path(
&key_path.to_string_lossy(),
&cert_path.to_string_lossy(),
false,
)?;
}
}
}
@@ -79,42 +83,92 @@ pub fn configure_tls(
})
}
#[derive(Debug)]
#[derive(Default, Debug)]
pub struct CertResolver {
certs: HashMap<String, (Arc<rustls::sign::CertifiedKey>, TlsServerEndPoint)>,
default: (Arc<rustls::sign::CertifiedKey>, TlsServerEndPoint),
default: Option<(Arc<rustls::sign::CertifiedKey>, TlsServerEndPoint)>,
}
impl CertResolver {
fn parse_new(key_path: &str, cert_path: &str) -> anyhow::Result<Self> {
let (priv_key, cert_chain) = parse_key_cert(key_path, cert_path)?;
Self::new(priv_key, cert_chain)
pub fn new() -> Self {
Self::default()
}
pub fn new(
priv_key: PrivateKeyDer<'static>,
cert_chain: Vec<CertificateDer<'static>>,
) -> anyhow::Result<Self> {
let (common_name, cert, tls_server_end_point) = process_key_cert(priv_key, cert_chain)?;
fn add_cert_path(
&mut self,
key_path: &str,
cert_path: &str,
is_default: bool,
) -> anyhow::Result<()> {
let priv_key = {
let key_bytes = std::fs::read(key_path)
.with_context(|| format!("Failed to read TLS keys at '{key_path}'"))?;
rustls_pemfile::private_key(&mut &key_bytes[..])
.with_context(|| format!("Failed to parse TLS keys at '{key_path}'"))?
.with_context(|| format!("Failed to parse TLS keys at '{key_path}'"))?
};
let mut certs = HashMap::new();
let default = (cert.clone(), tls_server_end_point);
certs.insert(common_name, (cert, tls_server_end_point));
Ok(Self { certs, default })
let cert_chain_bytes = std::fs::read(cert_path)
.context(format!("Failed to read TLS cert file at '{cert_path}.'"))?;
let cert_chain = {
rustls_pemfile::certs(&mut &cert_chain_bytes[..])
.try_collect()
.with_context(|| {
format!("Failed to read TLS certificate chain from bytes from file at '{cert_path}'.")
})?
};
self.add_cert(priv_key, cert_chain, is_default)
}
fn add_cert_path(&mut self, key_path: &str, cert_path: &str) -> anyhow::Result<()> {
let (priv_key, cert_chain) = parse_key_cert(key_path, cert_path)?;
self.add_cert(priv_key, cert_chain)
}
fn add_cert(
pub fn add_cert(
&mut self,
priv_key: PrivateKeyDer<'static>,
cert_chain: Vec<CertificateDer<'static>>,
is_default: bool,
) -> anyhow::Result<()> {
let (common_name, cert, tls_server_end_point) = process_key_cert(priv_key, cert_chain)?;
let key = sign::any_supported_type(&priv_key).context("invalid private key")?;
let first_cert = &cert_chain[0];
let tls_server_end_point = TlsServerEndPoint::new(first_cert)?;
let certificate = SliceReader::new(first_cert)
.context("Failed to parse cerficiate")?
.decode::<x509_cert::Certificate>()
.context("Failed to parse cerficiate")?;
let common_name = certificate.tbs_certificate.subject.to_string();
// We need to get the canonical name for this certificate so we can match them against any domain names
// seen within the proxy codebase.
//
// In scram-proxy we use wildcard certificates only, with the database endpoint as the wildcard subdomain, taken from SNI.
// We need to remove the wildcard prefix for the purposes of certificate selection.
//
// auth-broker does not use SNI and instead uses the Neon-Connection-String header.
// Auth broker has the subdomain `apiauth` we need to remove for the purposes of validating the Neon-Connection-String.
//
// Console Redirect proxy does not use any wildcard domains and does not need any certificate selection or conn string
// validation, so we can continue with any common name.
let common_name = if let Some(s) = common_name.strip_prefix("CN=*.") {
s.to_string()
} else if let Some(s) = common_name.strip_prefix("CN=apiauth.") {
s.to_string()
} else if let Some(s) = common_name.strip_prefix("CN=") {
s.to_string()
} else {
bail!("Failed to parse common name from certificate")
};
let cert = Arc::new(rustls::sign::CertifiedKey::new(cert_chain, key));
if is_default {
self.default = Some((cert.clone(), tls_server_end_point));
}
self.certs.insert(common_name, (cert, tls_server_end_point));
Ok(())
}
@@ -123,82 +177,12 @@ impl CertResolver {
}
}
fn parse_key_cert(
key_path: &str,
cert_path: &str,
) -> anyhow::Result<(PrivateKeyDer<'static>, Vec<CertificateDer<'static>>)> {
let priv_key = {
let key_bytes = std::fs::read(key_path)
.with_context(|| format!("Failed to read TLS keys at '{key_path}'"))?;
rustls_pemfile::private_key(&mut &key_bytes[..])
.with_context(|| format!("Failed to parse TLS keys at '{key_path}'"))?
.with_context(|| format!("Failed to parse TLS keys at '{key_path}'"))?
};
let cert_chain_bytes = std::fs::read(cert_path)
.context(format!("Failed to read TLS cert file at '{cert_path}.'"))?;
let cert_chain = {
rustls_pemfile::certs(&mut &cert_chain_bytes[..])
.try_collect()
.with_context(|| {
format!(
"Failed to read TLS certificate chain from bytes from file at '{cert_path}'."
)
})?
};
Ok((priv_key, cert_chain))
}
fn process_key_cert(
priv_key: PrivateKeyDer<'static>,
cert_chain: Vec<CertificateDer<'static>>,
) -> anyhow::Result<(String, Arc<CertifiedKey>, TlsServerEndPoint)> {
let key = sign::any_supported_type(&priv_key).context("invalid private key")?;
let first_cert = &cert_chain[0];
let tls_server_end_point = TlsServerEndPoint::new(first_cert)?;
let certificate = SliceReader::new(first_cert)
.context("Failed to parse cerficiate")?
.decode::<x509_cert::Certificate>()
.context("Failed to parse cerficiate")?;
let common_name = certificate.tbs_certificate.subject.to_string();
// We need to get the canonical name for this certificate so we can match them against any domain names
// seen within the proxy codebase.
//
// In scram-proxy we use wildcard certificates only, with the database endpoint as the wildcard subdomain, taken from SNI.
// We need to remove the wildcard prefix for the purposes of certificate selection.
//
// auth-broker does not use SNI and instead uses the Neon-Connection-String header.
// Auth broker has the subdomain `apiauth` we need to remove for the purposes of validating the Neon-Connection-String.
//
// Console Redirect proxy does not use any wildcard domains and does not need any certificate selection or conn string
// validation, so we can continue with any common name.
let common_name = if let Some(s) = common_name.strip_prefix("CN=*.") {
s.to_string()
} else if let Some(s) = common_name.strip_prefix("CN=apiauth.") {
s.to_string()
} else if let Some(s) = common_name.strip_prefix("CN=") {
s.to_string()
} else {
bail!("Failed to parse common name from certificate")
};
let cert = Arc::new(rustls::sign::CertifiedKey::new(cert_chain, key));
Ok((common_name, cert, tls_server_end_point))
}
impl rustls::server::ResolvesServerCert for CertResolver {
fn resolve(
&self,
client_hello: rustls::server::ClientHello<'_>,
) -> Option<Arc<rustls::sign::CertifiedKey>> {
Some(self.resolve(client_hello.server_name()).0)
self.resolve(client_hello.server_name()).map(|x| x.0)
}
}
@@ -206,7 +190,7 @@ impl CertResolver {
pub fn resolve(
&self,
server_name: Option<&str>,
) -> (Arc<rustls::sign::CertifiedKey>, TlsServerEndPoint) {
) -> Option<(Arc<rustls::sign::CertifiedKey>, TlsServerEndPoint)> {
// loop here and cut off more and more subdomains until we find
// a match to get proper wildcard support. OTOH, we now do not
// use nested domains, so keep this simple for now.
@@ -216,17 +200,12 @@ impl CertResolver {
if let Some(mut sni_name) = server_name {
loop {
if let Some(cert) = self.certs.get(sni_name) {
return cert.clone();
return Some(cert.clone());
}
if let Some((_, rest)) = sni_name.split_once('.') {
sni_name = rest;
} else {
// The customer has some custom DNS mapping - just return
// a default certificate.
//
// This will error if the customer uses anything stronger
// than sslmode=require. That's a choice they can make.
return self.default.clone();
return None;
}
}
} else {

View File

@@ -1279,8 +1279,7 @@ class NeonEnv:
)
tenant_config = ps_cfg.setdefault("tenant_config", {})
# This feature is pending rollout.
# tenant_config["rel_size_v2_enabled"] = True
tenant_config["rel_size_v2_enabled"] = True # Enable relsize_v2 by default in tests
if self.pageserver_remote_storage is not None:
ps_cfg["remote_storage"] = remote_storage_to_toml_dict(

View File

@@ -1,5 +1,4 @@
import math # Add this import
import os
import time
import traceback
from pathlib import Path
@@ -88,10 +87,7 @@ def test_cumulative_statistics_persistence(
- insert additional tuples that by itself are not enough to trigger auto-vacuum but in combination with the previous tuples are
- verify that autovacuum is triggered by the combination of tuples inserted before and after endpoint suspension
"""
project = neon_api.create_project(
pg_version,
f"Test cumulative statistics persistence, GITHUB_RUN_ID={os.getenv('GITHUB_RUN_ID')}",
)
project = neon_api.create_project(pg_version)
project_id = project["project"]["id"]
neon_api.wait_for_operation_to_finish(project_id)
endpoint_id = project["endpoints"][0]["id"]

View File

@@ -62,9 +62,7 @@ def test_ro_replica_lag(
pgbench_duration = f"-T{test_duration_min * 60 * 2}"
project = neon_api.create_project(
pg_version, f"Test readonly replica lag, GITHUB_RUN_ID={os.getenv('GITHUB_RUN_ID')}"
)
project = neon_api.create_project(pg_version)
project_id = project["project"]["id"]
log.info("Project ID: %s", project_id)
log.info("Primary endpoint ID: %s", project["endpoints"][0]["id"])
@@ -197,9 +195,7 @@ def test_replication_start_stop(
pgbench_duration = f"-T{2**num_replicas * configuration_test_time_sec}"
error_occurred = False
project = neon_api.create_project(
pg_version, f"Test replication start stop, GITHUB_RUN_ID={os.getenv('GITHUB_RUN_ID')}"
)
project = neon_api.create_project(pg_version)
project_id = project["project"]["id"]
log.info("Project ID: %s", project_id)
log.info("Primary endpoint ID: %s", project["endpoints"][0]["id"])

View File

@@ -186,7 +186,7 @@ def test_fully_custom_config(positive_env: NeonEnv):
"type": "interpreted",
"args": {"format": "bincode", "compression": {"zstd": {"level": 1}}},
},
"rel_size_v2_enabled": True,
"rel_size_v2_enabled": False, # test suite enables it by default as of https://github.com/neondatabase/neon/issues/11081, so, custom config means disabling it
"gc_compaction_enabled": True,
"gc_compaction_verification": False,
"gc_compaction_initial_threshold_kb": 1024000,

View File

@@ -202,8 +202,6 @@ def test_pageserver_gc_compaction_preempt(
env = neon_env_builder.init_start(initial_tenant_conf=conf)
env.pageserver.allowed_errors.append(".*The timeline or pageserver is shutting down.*")
env.pageserver.allowed_errors.append(".*flush task cancelled.*")
env.pageserver.allowed_errors.append(".*failed to pipe.*")
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
@@ -231,7 +229,7 @@ def test_pageserver_gc_compaction_preempt(
@skip_in_debug_build("only run with release build")
@pytest.mark.timeout(900) # This test is slow with sanitizers enabled, especially on ARM
@pytest.mark.timeout(600) # This test is slow with sanitizers enabled, especially on ARM
@pytest.mark.parametrize(
"with_branches",
["with_branches", "no_branches"],

View File

@@ -1,28 +0,0 @@
from __future__ import annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from fixtures.neon_fixtures import NeonEnv
#
# Test unlogged build for GIST index
#
def test_gist(neon_simple_env: NeonEnv):
env = neon_simple_env
endpoint = env.endpoints.create_start("main")
con = endpoint.connect()
cur = con.cursor()
iterations = 100
for _ in range(iterations):
cur.execute(
"CREATE TABLE pvactst (i INT, a INT[], p POINT) with (autovacuum_enabled = off)"
)
cur.execute(
"INSERT INTO pvactst SELECT i, array[1,2,3], point(i, i+1) FROM generate_series(1,1000) i"
)
cur.execute("CREATE INDEX gist_pvactst ON pvactst USING gist (p)")
cur.execute("VACUUM pvactst")
cur.execute("DROP TABLE pvactst")

View File

@@ -471,7 +471,7 @@ def test_tx_abort_with_many_relations(
try:
# Rollback phase should be fast: this is one WAL record that we should process efficiently
fut = exec.submit(rollback_and_wait)
fut.result(timeout=15 if reldir_type == "v1" else 30)
fut.result(timeout=15)
except:
exec.shutdown(wait=False, cancel_futures=True)
raise

View File

@@ -5,14 +5,14 @@
],
"v16": [
"16.8",
"05ddf212e2e07b788b5c8b88bdcf98630941f6ae"
"37496f87b5324af53c56127e278ee5b1e8435253"
],
"v15": [
"15.12",
"b838c8969b7c63f3e637a769656f5f36793b797c"
"8ecb12f21d862dfa39f7204b8f5e1c00a2a225b3"
],
"v14": [
"14.17",
"c8dab02bfc003ae7bd59096919042d7840f3c194"
"d3c9d61fb7a362a165dac7060819dd9d6ad68c28"
]
}