Compare commits

..

4 Commits

Author SHA1 Message Date
Konstantin Knizhnik
3821af6a6f Add comment about using mdnblocks to prevent race condition 2025-06-16 18:22:49 +03:00
Konstantin Knizhnik
f432b48cf0 Use mdnblocks to prevent raqce condition during end of unlogged build 2025-05-31 20:44:58 +03:00
Shockingly Good
f05df409bd impr(compute): Remove the deprecated CLI arg alias for remote-ext-config. (#12087)
Also moves it from `String` to `Url`.
2025-05-30 17:45:24 +00:00
Alex Chi Z.
f6c0f6c4ec fix(ci): install build tools with --locked (#12083)
## Problem

Release pipeline failing due to some tools cannot be installed.

## Summary of changes

Install with `--locked`.

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-05-30 17:00:41 +00:00
12 changed files with 99 additions and 99 deletions

View File

@@ -310,13 +310,13 @@ RUN curl -sSO https://static.rust-lang.org/rustup/dist/$(uname -m)-unknown-linux
. "$HOME/.cargo/env" && \
cargo --version && rustup --version && \
rustup component add llvm-tools rustfmt clippy && \
cargo install rustfilt --version ${RUSTFILT_VERSION} && \
cargo install cargo-hakari --version ${CARGO_HAKARI_VERSION} && \
cargo install cargo-deny --locked --version ${CARGO_DENY_VERSION} && \
cargo install cargo-hack --version ${CARGO_HACK_VERSION} && \
cargo install cargo-nextest --version ${CARGO_NEXTEST_VERSION} && \
cargo install cargo-chef --locked --version ${CARGO_CHEF_VERSION} && \
cargo install diesel_cli --version ${CARGO_DIESEL_CLI_VERSION} \
cargo install rustfilt --version ${RUSTFILT_VERSION} --locked && \
cargo install cargo-hakari --version ${CARGO_HAKARI_VERSION} --locked && \
cargo install cargo-deny --version ${CARGO_DENY_VERSION} --locked && \
cargo install cargo-hack --version ${CARGO_HACK_VERSION} --locked && \
cargo install cargo-nextest --version ${CARGO_NEXTEST_VERSION} --locked && \
cargo install cargo-chef --version ${CARGO_CHEF_VERSION} --locked && \
cargo install diesel_cli --version ${CARGO_DIESEL_CLI_VERSION} --locked \
--features postgres-bundled --no-default-features && \
rm -rf /home/nonroot/.cargo/registry && \
rm -rf /home/nonroot/.cargo/git

View File

@@ -64,9 +64,8 @@ struct Cli {
pub pgbin: String,
/// The base URL for the remote extension storage proxy gateway.
/// Should be in the form of `http(s)://<gateway-hostname>[:<port>]`.
#[arg(short = 'r', long, alias = "remote-ext-config")]
pub remote_ext_base_url: Option<String>,
#[arg(short = 'r', long)]
pub remote_ext_base_url: Option<Url>,
/// The port to bind the external listening HTTP server to. Clients running
/// outside the compute will talk to the compute through this port. Keep

View File

@@ -31,6 +31,7 @@ use std::time::{Duration, Instant};
use std::{env, fs};
use tokio::spawn;
use tracing::{Instrument, debug, error, info, instrument, warn};
use url::Url;
use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;
use utils::measured_stream::MeasuredReader;
@@ -96,7 +97,7 @@ pub struct ComputeNodeParams {
pub internal_http_port: u16,
/// the address of extension storage proxy gateway
pub remote_ext_base_url: Option<String>,
pub remote_ext_base_url: Option<Url>,
/// Interval for installed extensions collection
pub installed_extensions_collection_interval: u64,

View File

@@ -83,6 +83,7 @@ use reqwest::StatusCode;
use tar::Archive;
use tracing::info;
use tracing::log::warn;
use url::Url;
use zstd::stream::read::Decoder;
use crate::metrics::{REMOTE_EXT_REQUESTS_TOTAL, UNKNOWN_HTTP_STATUS};
@@ -158,14 +159,14 @@ fn parse_pg_version(human_version: &str) -> PostgresMajorVersion {
pub async fn download_extension(
ext_name: &str,
ext_path: &RemotePath,
remote_ext_base_url: &str,
remote_ext_base_url: &Url,
pgbin: &str,
) -> Result<u64> {
info!("Download extension {:?} from {:?}", ext_name, ext_path);
// TODO add retry logic
let download_buffer =
match download_extension_tar(remote_ext_base_url, &ext_path.to_string()).await {
match download_extension_tar(remote_ext_base_url.as_str(), &ext_path.to_string()).await {
Ok(buffer) => buffer,
Err(error_message) => {
return Err(anyhow::anyhow!(

View File

@@ -64,57 +64,45 @@ impl AsyncAuthorizeRequest<Body> for Authorize {
};
match data.claims.scope {
// If the scope is not [`ComputeClaimsScope::Admin`], then we
// must validate the compute_id
Some(ref scope) => {
// TODO: We should validate audience for every token, but
// instead of this ad-hoc validation, we should turn
// [`Validation::validate_aud`] on. This is merely a stopgap
// while we roll out `aud` deployment. We return a 401
// Unauthorized because when we eventually do use
// [`Validation`], we will hit the above `Err` match arm which
// returns 401 Unauthorized.
if scope.contains(&ComputeClaimsScope::Admin) {
let Some(ref audience) = data.claims.audience else {
return Err(JsonResponse::error(
StatusCode::UNAUTHORIZED,
"missing audience in authorization token claims",
));
};
if !audience.iter().any(|a| a == COMPUTE_AUDIENCE) {
return Err(JsonResponse::error(
StatusCode::UNAUTHORIZED,
"invalid audience in authorization token claims",
));
}
} else if scope.contains(&ComputeClaimsScope::ComputeCtlExternalApi) {
let Some(ref claimed_compute_id) = data.claims.compute_id else {
return Err(JsonResponse::error(
StatusCode::FORBIDDEN,
"missing compute_id in authorization token claims",
));
};
if *claimed_compute_id != compute_id {
return Err(JsonResponse::error(
StatusCode::FORBIDDEN,
"invalid compute ID in authorization token claims",
));
}
} else {
// TODO: We should validate audience for every token, but
// instead of this ad-hoc validation, we should turn
// [`Validation::validate_aud`] on. This is merely a stopgap
// while we roll out `aud` deployment. We return a 401
// Unauthorized because when we eventually do use
// [`Validation`], we will hit the above `Err` match arm which
// returns 401 Unauthorized.
Some(ComputeClaimsScope::Admin) => {
let Some(ref audience) = data.claims.audience else {
return Err(JsonResponse::error(
StatusCode::UNAUTHORIZED,
"compute_ctl:external_api scope not included",
"missing audience in authorization token claims",
));
};
if !audience.iter().any(|a| a == COMPUTE_AUDIENCE) {
return Err(JsonResponse::error(
StatusCode::UNAUTHORIZED,
"invalid audience in authorization token claims",
));
}
}
None => {
return Err(JsonResponse::error(
StatusCode::UNAUTHORIZED,
"compute_ctl:external_api scope not included",
));
// If the scope is not [`ComputeClaimsScope::Admin`], then we
// must validate the compute_id
_ => {
let Some(ref claimed_compute_id) = data.claims.compute_id else {
return Err(JsonResponse::error(
StatusCode::FORBIDDEN,
"missing compute_id in authorization token claims",
));
};
if *claimed_compute_id != compute_id {
return Err(JsonResponse::error(
StatusCode::FORBIDDEN,
"invalid compute ID in authorization token claims",
));
}
}
}

View File

@@ -709,7 +709,7 @@ struct EndpointGenerateJwtCmdArgs {
endpoint_id: String,
#[clap(short = 's', long, help = "Scope to generate the JWT with", value_parser = ComputeClaimsScope::from_str)]
scope: Vec<ComputeClaimsScope>,
scope: Option<ComputeClaimsScope>,
}
#[derive(clap::Subcommand)]
@@ -1580,7 +1580,7 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
.with_context(|| format!("postgres endpoint {endpoint_id} is not found"))?
};
let jwt = endpoint.generate_jwt(Some(args.scope.clone()))?;
let jwt = endpoint.generate_jwt(args.scope)?;
print!("{jwt}");
}

View File

@@ -632,16 +632,14 @@ impl Endpoint {
}
/// Generate a JWT with the correct claims.
pub fn generate_jwt(&self, scope: Option<Vec<ComputeClaimsScope>>) -> Result<String> {
pub fn generate_jwt(&self, scope: Option<ComputeClaimsScope>) -> Result<String> {
self.env.generate_auth_token(&ComputeClaims {
audience: match scope {
Some(ref scope) if scope.contains(&ComputeClaimsScope::Admin) => {
Some(vec![COMPUTE_AUDIENCE.to_owned()])
}
Some(ComputeClaimsScope::Admin) => Some(vec![COMPUTE_AUDIENCE.to_owned()]),
_ => None,
},
compute_id: match scope {
Some(ref scope) if scope.contains(&ComputeClaimsScope::Admin) => None,
Some(ComputeClaimsScope::Admin) => None,
_ => Some(self.endpoint_id.clone()),
},
scope,
@@ -920,7 +918,7 @@ impl Endpoint {
self.external_http_address.port()
),
)
.bearer_auth(self.generate_jwt(Some(vec![ComputeClaimsScope::ComputeCtlExternalApi]))?)
.bearer_auth(self.generate_jwt(None::<ComputeClaimsScope>)?)
.send()
.await?;
@@ -997,7 +995,7 @@ impl Endpoint {
self.external_http_address.port()
))
.header(CONTENT_TYPE.as_str(), "application/json")
.bearer_auth(self.generate_jwt(Some(vec![ComputeClaimsScope::ComputeCtlExternalApi]))?)
.bearer_auth(self.generate_jwt(None::<ComputeClaimsScope>)?)
.body(
serde_json::to_string(&ConfigurationRequest {
spec,

View File

@@ -17,10 +17,6 @@ pub enum ComputeClaimsScope {
/// An admin-scoped token allows access to all of `compute_ctl`'s authorized
/// facilities.
Admin,
/// Scope providing access to the `compute_ctl` external API.
#[serde(rename = "compute_ctl:external_api")]
ComputeCtlExternalApi,
}
impl FromStr for ComputeClaimsScope {
@@ -29,7 +25,6 @@ impl FromStr for ComputeClaimsScope {
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"admin" => Ok(ComputeClaimsScope::Admin),
"compute_ctl:external_api" => Ok(ComputeClaimsScope::ComputeCtlExternalApi),
_ => Err(anyhow::anyhow!("invalid compute claims scope \"{s}\"")),
}
}
@@ -46,8 +41,8 @@ pub struct ComputeClaims {
/// [`ComputeClaimsScope::Admin`].
pub compute_id: Option<String>,
/// The scopes of what the token is authorized for.
pub scope: Option<Vec<ComputeClaimsScope>>,
/// The scope of what the token authorizes.
pub scope: Option<ComputeClaimsScope>,
/// The recipient the token is intended for.
///

View File

@@ -1604,19 +1604,23 @@ neon_write(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, const vo
#endif
{
/* It exists locally. Guess it's unlogged then. */
if (mdnblocks(reln, forknum) >= blocknum)
{
/* prevent race condition with unlogged build end which unlinks local files */
#if PG_MAJORVERSION_NUM >= 17
mdwritev(reln, forknum, blocknum, &buffer, 1, skipFsync);
mdwritev(reln, forknum, blocknum, &buffer, 1, skipFsync);
#else
mdwrite(reln, forknum, blocknum, buffer, skipFsync);
mdwrite(reln, forknum, blocknum, buffer, skipFsync);
#endif
/*
* We could set relpersistence now that we have determined
* that it's local. But we don't dare to do it, because that
* would immediately allow reads as well, which shouldn't
* happen. We could cache it with a different 'relpersistence'
* value, but this isn't performance critical.
*/
return;
/*
* We could set relpersistence now that we have determined
* that it's local. But we don't dare to do it, because that
* would immediately allow reads as well, which shouldn't
* happen. We could cache it with a different 'relpersistence'
* value, but this isn't performance critical.
*/
return;
}
}
break;
@@ -1684,17 +1688,30 @@ neon_writev(SMgrRelation reln, ForkNumber forknum, BlockNumber blkno,
if (mdexists(reln, INIT_FORKNUM))
#endif
{
/* It exists locally. Guess it's unlogged then. */
mdwritev(reln, forknum, blkno, buffers, nblocks, skipFsync);
/*
* We could set relpersistence now that we have determined
* that it's local. But we don't dare to do it, because that
* would immediately allow reads as well, which shouldn't
* happen. We could cache it with a different 'relpersistence'
* value, but this isn't performance critical.
* There can be race condition between backend A performing unlogged index build and some other backend B evicting page of this index.
* After `mdexists` check in backend B is completed and returns true, backend A can complete unlogged build and unlink local files
* of this relation before backend B performs `mdwrite`. It is not a problem if unlogged relation has just one segment:
* it is opened and file descriptor is cached by smgr, preventing removing file on the disk. So subsequent `mdwrite` will succeed.
* But if unlogged relation is large enough and consists of multiple segments, then first segment will be truncated and all other - removed.
* So attempt to write to some segment cause error because file doesn't exists any more and previous segment is empty or not exists as well.
* Calling `mdnblocks` cause SMGR to open and cache descriptors of all relation segments and so prevent there removal which guarantees that
* subsequent `mdwrite` will succeed.
*/
return;
if (mdnblocks(reln, forknum) >= blkno)
{
/* prevent race condition with unlogged build end which unlinks local files */
mdwritev(reln, forknum, blkno, buffers, nblocks, skipFsync);
/*
* We could set relpersistence now that we have determined
* that it's local. But we don't dare to do it, because that
* would immediately allow reads as well, which shouldn't
* happen. We could cache it with a different 'relpersistence'
* value, but this isn't performance critical.
*/
return;
}
}
break;

View File

@@ -25,7 +25,6 @@ The value to place in the `aud` claim.
@final
class ComputeClaimsScope(StrEnum):
ADMIN = "admin"
COMPUTE_CTL_EXTERNAL_API = "compute_ctl:external_api"
@final

View File

@@ -536,14 +536,16 @@ class NeonLocalCli(AbstractNeonCli):
res.check_returncode()
return res
def endpoint_generate_jwt(self, endpoint_id: str, scope: list[ComputeClaimsScope]) -> str:
def endpoint_generate_jwt(
self, endpoint_id: str, scope: ComputeClaimsScope | None = None
) -> str:
"""
Generate a JWT for making requests to the endpoint's external HTTP
server.
"""
args = ["endpoint", "generate-jwt", endpoint_id]
for s in scope:
args += ["--scope", str(s)]
if scope:
args += ["--scope", str(scope)]
cmd = self.raw_cli(args)
cmd.check_returncode()

View File

@@ -4282,7 +4282,7 @@ class Endpoint(PgProtocol, LogUtils):
self.config(config_lines)
self.__jwt = self.generate_jwt([ComputeClaimsScope.COMPUTE_CTL_EXTERNAL_API])
self.__jwt = self.generate_jwt()
return self
@@ -4329,7 +4329,7 @@ class Endpoint(PgProtocol, LogUtils):
return self
def generate_jwt(self, scope: list[ComputeClaimsScope]) -> str:
def generate_jwt(self, scope: ComputeClaimsScope | None = None) -> str:
"""
Generate a JWT for making requests to the endpoint's external HTTP
server.