mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-18 05:30:37 +00:00
Compare commits
4 Commits
tristan957
...
unlogged_b
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3821af6a6f | ||
|
|
f432b48cf0 | ||
|
|
f05df409bd | ||
|
|
f6c0f6c4ec |
@@ -310,13 +310,13 @@ RUN curl -sSO https://static.rust-lang.org/rustup/dist/$(uname -m)-unknown-linux
|
||||
. "$HOME/.cargo/env" && \
|
||||
cargo --version && rustup --version && \
|
||||
rustup component add llvm-tools rustfmt clippy && \
|
||||
cargo install rustfilt --version ${RUSTFILT_VERSION} && \
|
||||
cargo install cargo-hakari --version ${CARGO_HAKARI_VERSION} && \
|
||||
cargo install cargo-deny --locked --version ${CARGO_DENY_VERSION} && \
|
||||
cargo install cargo-hack --version ${CARGO_HACK_VERSION} && \
|
||||
cargo install cargo-nextest --version ${CARGO_NEXTEST_VERSION} && \
|
||||
cargo install cargo-chef --locked --version ${CARGO_CHEF_VERSION} && \
|
||||
cargo install diesel_cli --version ${CARGO_DIESEL_CLI_VERSION} \
|
||||
cargo install rustfilt --version ${RUSTFILT_VERSION} --locked && \
|
||||
cargo install cargo-hakari --version ${CARGO_HAKARI_VERSION} --locked && \
|
||||
cargo install cargo-deny --version ${CARGO_DENY_VERSION} --locked && \
|
||||
cargo install cargo-hack --version ${CARGO_HACK_VERSION} --locked && \
|
||||
cargo install cargo-nextest --version ${CARGO_NEXTEST_VERSION} --locked && \
|
||||
cargo install cargo-chef --version ${CARGO_CHEF_VERSION} --locked && \
|
||||
cargo install diesel_cli --version ${CARGO_DIESEL_CLI_VERSION} --locked \
|
||||
--features postgres-bundled --no-default-features && \
|
||||
rm -rf /home/nonroot/.cargo/registry && \
|
||||
rm -rf /home/nonroot/.cargo/git
|
||||
|
||||
@@ -64,9 +64,8 @@ struct Cli {
|
||||
pub pgbin: String,
|
||||
|
||||
/// The base URL for the remote extension storage proxy gateway.
|
||||
/// Should be in the form of `http(s)://<gateway-hostname>[:<port>]`.
|
||||
#[arg(short = 'r', long, alias = "remote-ext-config")]
|
||||
pub remote_ext_base_url: Option<String>,
|
||||
#[arg(short = 'r', long)]
|
||||
pub remote_ext_base_url: Option<Url>,
|
||||
|
||||
/// The port to bind the external listening HTTP server to. Clients running
|
||||
/// outside the compute will talk to the compute through this port. Keep
|
||||
|
||||
@@ -31,6 +31,7 @@ use std::time::{Duration, Instant};
|
||||
use std::{env, fs};
|
||||
use tokio::spawn;
|
||||
use tracing::{Instrument, debug, error, info, instrument, warn};
|
||||
use url::Url;
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
use utils::lsn::Lsn;
|
||||
use utils::measured_stream::MeasuredReader;
|
||||
@@ -96,7 +97,7 @@ pub struct ComputeNodeParams {
|
||||
pub internal_http_port: u16,
|
||||
|
||||
/// the address of extension storage proxy gateway
|
||||
pub remote_ext_base_url: Option<String>,
|
||||
pub remote_ext_base_url: Option<Url>,
|
||||
|
||||
/// Interval for installed extensions collection
|
||||
pub installed_extensions_collection_interval: u64,
|
||||
|
||||
@@ -83,6 +83,7 @@ use reqwest::StatusCode;
|
||||
use tar::Archive;
|
||||
use tracing::info;
|
||||
use tracing::log::warn;
|
||||
use url::Url;
|
||||
use zstd::stream::read::Decoder;
|
||||
|
||||
use crate::metrics::{REMOTE_EXT_REQUESTS_TOTAL, UNKNOWN_HTTP_STATUS};
|
||||
@@ -158,14 +159,14 @@ fn parse_pg_version(human_version: &str) -> PostgresMajorVersion {
|
||||
pub async fn download_extension(
|
||||
ext_name: &str,
|
||||
ext_path: &RemotePath,
|
||||
remote_ext_base_url: &str,
|
||||
remote_ext_base_url: &Url,
|
||||
pgbin: &str,
|
||||
) -> Result<u64> {
|
||||
info!("Download extension {:?} from {:?}", ext_name, ext_path);
|
||||
|
||||
// TODO add retry logic
|
||||
let download_buffer =
|
||||
match download_extension_tar(remote_ext_base_url, &ext_path.to_string()).await {
|
||||
match download_extension_tar(remote_ext_base_url.as_str(), &ext_path.to_string()).await {
|
||||
Ok(buffer) => buffer,
|
||||
Err(error_message) => {
|
||||
return Err(anyhow::anyhow!(
|
||||
|
||||
@@ -64,57 +64,45 @@ impl AsyncAuthorizeRequest<Body> for Authorize {
|
||||
};
|
||||
|
||||
match data.claims.scope {
|
||||
// If the scope is not [`ComputeClaimsScope::Admin`], then we
|
||||
// must validate the compute_id
|
||||
Some(ref scope) => {
|
||||
// TODO: We should validate audience for every token, but
|
||||
// instead of this ad-hoc validation, we should turn
|
||||
// [`Validation::validate_aud`] on. This is merely a stopgap
|
||||
// while we roll out `aud` deployment. We return a 401
|
||||
// Unauthorized because when we eventually do use
|
||||
// [`Validation`], we will hit the above `Err` match arm which
|
||||
// returns 401 Unauthorized.
|
||||
if scope.contains(&ComputeClaimsScope::Admin) {
|
||||
let Some(ref audience) = data.claims.audience else {
|
||||
return Err(JsonResponse::error(
|
||||
StatusCode::UNAUTHORIZED,
|
||||
"missing audience in authorization token claims",
|
||||
));
|
||||
};
|
||||
|
||||
if !audience.iter().any(|a| a == COMPUTE_AUDIENCE) {
|
||||
return Err(JsonResponse::error(
|
||||
StatusCode::UNAUTHORIZED,
|
||||
"invalid audience in authorization token claims",
|
||||
));
|
||||
}
|
||||
} else if scope.contains(&ComputeClaimsScope::ComputeCtlExternalApi) {
|
||||
let Some(ref claimed_compute_id) = data.claims.compute_id else {
|
||||
return Err(JsonResponse::error(
|
||||
StatusCode::FORBIDDEN,
|
||||
"missing compute_id in authorization token claims",
|
||||
));
|
||||
};
|
||||
|
||||
if *claimed_compute_id != compute_id {
|
||||
return Err(JsonResponse::error(
|
||||
StatusCode::FORBIDDEN,
|
||||
"invalid compute ID in authorization token claims",
|
||||
));
|
||||
}
|
||||
} else {
|
||||
// TODO: We should validate audience for every token, but
|
||||
// instead of this ad-hoc validation, we should turn
|
||||
// [`Validation::validate_aud`] on. This is merely a stopgap
|
||||
// while we roll out `aud` deployment. We return a 401
|
||||
// Unauthorized because when we eventually do use
|
||||
// [`Validation`], we will hit the above `Err` match arm which
|
||||
// returns 401 Unauthorized.
|
||||
Some(ComputeClaimsScope::Admin) => {
|
||||
let Some(ref audience) = data.claims.audience else {
|
||||
return Err(JsonResponse::error(
|
||||
StatusCode::UNAUTHORIZED,
|
||||
"compute_ctl:external_api scope not included",
|
||||
"missing audience in authorization token claims",
|
||||
));
|
||||
};
|
||||
|
||||
if !audience.iter().any(|a| a == COMPUTE_AUDIENCE) {
|
||||
return Err(JsonResponse::error(
|
||||
StatusCode::UNAUTHORIZED,
|
||||
"invalid audience in authorization token claims",
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
None => {
|
||||
return Err(JsonResponse::error(
|
||||
StatusCode::UNAUTHORIZED,
|
||||
"compute_ctl:external_api scope not included",
|
||||
));
|
||||
// If the scope is not [`ComputeClaimsScope::Admin`], then we
|
||||
// must validate the compute_id
|
||||
_ => {
|
||||
let Some(ref claimed_compute_id) = data.claims.compute_id else {
|
||||
return Err(JsonResponse::error(
|
||||
StatusCode::FORBIDDEN,
|
||||
"missing compute_id in authorization token claims",
|
||||
));
|
||||
};
|
||||
|
||||
if *claimed_compute_id != compute_id {
|
||||
return Err(JsonResponse::error(
|
||||
StatusCode::FORBIDDEN,
|
||||
"invalid compute ID in authorization token claims",
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -709,7 +709,7 @@ struct EndpointGenerateJwtCmdArgs {
|
||||
endpoint_id: String,
|
||||
|
||||
#[clap(short = 's', long, help = "Scope to generate the JWT with", value_parser = ComputeClaimsScope::from_str)]
|
||||
scope: Vec<ComputeClaimsScope>,
|
||||
scope: Option<ComputeClaimsScope>,
|
||||
}
|
||||
|
||||
#[derive(clap::Subcommand)]
|
||||
@@ -1580,7 +1580,7 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
|
||||
.with_context(|| format!("postgres endpoint {endpoint_id} is not found"))?
|
||||
};
|
||||
|
||||
let jwt = endpoint.generate_jwt(Some(args.scope.clone()))?;
|
||||
let jwt = endpoint.generate_jwt(args.scope)?;
|
||||
|
||||
print!("{jwt}");
|
||||
}
|
||||
|
||||
@@ -632,16 +632,14 @@ impl Endpoint {
|
||||
}
|
||||
|
||||
/// Generate a JWT with the correct claims.
|
||||
pub fn generate_jwt(&self, scope: Option<Vec<ComputeClaimsScope>>) -> Result<String> {
|
||||
pub fn generate_jwt(&self, scope: Option<ComputeClaimsScope>) -> Result<String> {
|
||||
self.env.generate_auth_token(&ComputeClaims {
|
||||
audience: match scope {
|
||||
Some(ref scope) if scope.contains(&ComputeClaimsScope::Admin) => {
|
||||
Some(vec![COMPUTE_AUDIENCE.to_owned()])
|
||||
}
|
||||
Some(ComputeClaimsScope::Admin) => Some(vec![COMPUTE_AUDIENCE.to_owned()]),
|
||||
_ => None,
|
||||
},
|
||||
compute_id: match scope {
|
||||
Some(ref scope) if scope.contains(&ComputeClaimsScope::Admin) => None,
|
||||
Some(ComputeClaimsScope::Admin) => None,
|
||||
_ => Some(self.endpoint_id.clone()),
|
||||
},
|
||||
scope,
|
||||
@@ -920,7 +918,7 @@ impl Endpoint {
|
||||
self.external_http_address.port()
|
||||
),
|
||||
)
|
||||
.bearer_auth(self.generate_jwt(Some(vec![ComputeClaimsScope::ComputeCtlExternalApi]))?)
|
||||
.bearer_auth(self.generate_jwt(None::<ComputeClaimsScope>)?)
|
||||
.send()
|
||||
.await?;
|
||||
|
||||
@@ -997,7 +995,7 @@ impl Endpoint {
|
||||
self.external_http_address.port()
|
||||
))
|
||||
.header(CONTENT_TYPE.as_str(), "application/json")
|
||||
.bearer_auth(self.generate_jwt(Some(vec![ComputeClaimsScope::ComputeCtlExternalApi]))?)
|
||||
.bearer_auth(self.generate_jwt(None::<ComputeClaimsScope>)?)
|
||||
.body(
|
||||
serde_json::to_string(&ConfigurationRequest {
|
||||
spec,
|
||||
|
||||
@@ -17,10 +17,6 @@ pub enum ComputeClaimsScope {
|
||||
/// An admin-scoped token allows access to all of `compute_ctl`'s authorized
|
||||
/// facilities.
|
||||
Admin,
|
||||
|
||||
/// Scope providing access to the `compute_ctl` external API.
|
||||
#[serde(rename = "compute_ctl:external_api")]
|
||||
ComputeCtlExternalApi,
|
||||
}
|
||||
|
||||
impl FromStr for ComputeClaimsScope {
|
||||
@@ -29,7 +25,6 @@ impl FromStr for ComputeClaimsScope {
|
||||
fn from_str(s: &str) -> Result<Self, Self::Err> {
|
||||
match s {
|
||||
"admin" => Ok(ComputeClaimsScope::Admin),
|
||||
"compute_ctl:external_api" => Ok(ComputeClaimsScope::ComputeCtlExternalApi),
|
||||
_ => Err(anyhow::anyhow!("invalid compute claims scope \"{s}\"")),
|
||||
}
|
||||
}
|
||||
@@ -46,8 +41,8 @@ pub struct ComputeClaims {
|
||||
/// [`ComputeClaimsScope::Admin`].
|
||||
pub compute_id: Option<String>,
|
||||
|
||||
/// The scopes of what the token is authorized for.
|
||||
pub scope: Option<Vec<ComputeClaimsScope>>,
|
||||
/// The scope of what the token authorizes.
|
||||
pub scope: Option<ComputeClaimsScope>,
|
||||
|
||||
/// The recipient the token is intended for.
|
||||
///
|
||||
|
||||
@@ -1604,19 +1604,23 @@ neon_write(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, const vo
|
||||
#endif
|
||||
{
|
||||
/* It exists locally. Guess it's unlogged then. */
|
||||
if (mdnblocks(reln, forknum) >= blocknum)
|
||||
{
|
||||
/* prevent race condition with unlogged build end which unlinks local files */
|
||||
#if PG_MAJORVERSION_NUM >= 17
|
||||
mdwritev(reln, forknum, blocknum, &buffer, 1, skipFsync);
|
||||
mdwritev(reln, forknum, blocknum, &buffer, 1, skipFsync);
|
||||
#else
|
||||
mdwrite(reln, forknum, blocknum, buffer, skipFsync);
|
||||
mdwrite(reln, forknum, blocknum, buffer, skipFsync);
|
||||
#endif
|
||||
/*
|
||||
* We could set relpersistence now that we have determined
|
||||
* that it's local. But we don't dare to do it, because that
|
||||
* would immediately allow reads as well, which shouldn't
|
||||
* happen. We could cache it with a different 'relpersistence'
|
||||
* value, but this isn't performance critical.
|
||||
*/
|
||||
return;
|
||||
/*
|
||||
* We could set relpersistence now that we have determined
|
||||
* that it's local. But we don't dare to do it, because that
|
||||
* would immediately allow reads as well, which shouldn't
|
||||
* happen. We could cache it with a different 'relpersistence'
|
||||
* value, but this isn't performance critical.
|
||||
*/
|
||||
return;
|
||||
}
|
||||
}
|
||||
break;
|
||||
|
||||
@@ -1684,17 +1688,30 @@ neon_writev(SMgrRelation reln, ForkNumber forknum, BlockNumber blkno,
|
||||
if (mdexists(reln, INIT_FORKNUM))
|
||||
#endif
|
||||
{
|
||||
/* It exists locally. Guess it's unlogged then. */
|
||||
mdwritev(reln, forknum, blkno, buffers, nblocks, skipFsync);
|
||||
|
||||
/*
|
||||
* We could set relpersistence now that we have determined
|
||||
* that it's local. But we don't dare to do it, because that
|
||||
* would immediately allow reads as well, which shouldn't
|
||||
* happen. We could cache it with a different 'relpersistence'
|
||||
* value, but this isn't performance critical.
|
||||
* There can be race condition between backend A performing unlogged index build and some other backend B evicting page of this index.
|
||||
* After `mdexists` check in backend B is completed and returns true, backend A can complete unlogged build and unlink local files
|
||||
* of this relation before backend B performs `mdwrite`. It is not a problem if unlogged relation has just one segment:
|
||||
* it is opened and file descriptor is cached by smgr, preventing removing file on the disk. So subsequent `mdwrite` will succeed.
|
||||
* But if unlogged relation is large enough and consists of multiple segments, then first segment will be truncated and all other - removed.
|
||||
* So attempt to write to some segment cause error because file doesn't exists any more and previous segment is empty or not exists as well.
|
||||
* Calling `mdnblocks` cause SMGR to open and cache descriptors of all relation segments and so prevent there removal which guarantees that
|
||||
* subsequent `mdwrite` will succeed.
|
||||
*/
|
||||
return;
|
||||
if (mdnblocks(reln, forknum) >= blkno)
|
||||
{
|
||||
/* prevent race condition with unlogged build end which unlinks local files */
|
||||
mdwritev(reln, forknum, blkno, buffers, nblocks, skipFsync);
|
||||
|
||||
/*
|
||||
* We could set relpersistence now that we have determined
|
||||
* that it's local. But we don't dare to do it, because that
|
||||
* would immediately allow reads as well, which shouldn't
|
||||
* happen. We could cache it with a different 'relpersistence'
|
||||
* value, but this isn't performance critical.
|
||||
*/
|
||||
return;
|
||||
}
|
||||
}
|
||||
break;
|
||||
|
||||
|
||||
@@ -25,7 +25,6 @@ The value to place in the `aud` claim.
|
||||
@final
|
||||
class ComputeClaimsScope(StrEnum):
|
||||
ADMIN = "admin"
|
||||
COMPUTE_CTL_EXTERNAL_API = "compute_ctl:external_api"
|
||||
|
||||
|
||||
@final
|
||||
|
||||
@@ -536,14 +536,16 @@ class NeonLocalCli(AbstractNeonCli):
|
||||
res.check_returncode()
|
||||
return res
|
||||
|
||||
def endpoint_generate_jwt(self, endpoint_id: str, scope: list[ComputeClaimsScope]) -> str:
|
||||
def endpoint_generate_jwt(
|
||||
self, endpoint_id: str, scope: ComputeClaimsScope | None = None
|
||||
) -> str:
|
||||
"""
|
||||
Generate a JWT for making requests to the endpoint's external HTTP
|
||||
server.
|
||||
"""
|
||||
args = ["endpoint", "generate-jwt", endpoint_id]
|
||||
for s in scope:
|
||||
args += ["--scope", str(s)]
|
||||
if scope:
|
||||
args += ["--scope", str(scope)]
|
||||
|
||||
cmd = self.raw_cli(args)
|
||||
cmd.check_returncode()
|
||||
|
||||
@@ -4282,7 +4282,7 @@ class Endpoint(PgProtocol, LogUtils):
|
||||
|
||||
self.config(config_lines)
|
||||
|
||||
self.__jwt = self.generate_jwt([ComputeClaimsScope.COMPUTE_CTL_EXTERNAL_API])
|
||||
self.__jwt = self.generate_jwt()
|
||||
|
||||
return self
|
||||
|
||||
@@ -4329,7 +4329,7 @@ class Endpoint(PgProtocol, LogUtils):
|
||||
|
||||
return self
|
||||
|
||||
def generate_jwt(self, scope: list[ComputeClaimsScope]) -> str:
|
||||
def generate_jwt(self, scope: ComputeClaimsScope | None = None) -> str:
|
||||
"""
|
||||
Generate a JWT for making requests to the endpoint's external HTTP
|
||||
server.
|
||||
|
||||
Reference in New Issue
Block a user