mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-17 13:10:38 +00:00
Corrects the postgres extension s3 gateway address to be not just a domain name but a full base URL. To make the code more readable, the option is renamed to "remote_ext_base_url", while keeping the old name also accessible by providing a clap argument alias. Also provides a very simple and, perhaps, even redundant unit test to confirm the logic behind parsing of the corresponding CLI argument. ## Problem As it is clearly stated in https://github.com/neondatabase/cloud/issues/26005, using of the short version of the domain name might work for now, but in the future, we should get rid of using the `default` namespace and this is where it will, most likely, break down. ## Summary of changes The changes adjust the domain name of the extension s3 gateway to use the proper base url format instead of the just domain name assuming the "default" namespace and add a new CLI argument name for to reflect the change and the expectance.
380 lines
13 KiB
Rust
380 lines
13 KiB
Rust
// Download extension files from the extension store
|
|
// and put them in the right place in the postgres directory (share / lib)
|
|
/*
|
|
The layout of the S3 bucket is as follows:
|
|
5615610098 // this is an extension build number
|
|
├── v14
|
|
│ ├── extensions
|
|
│ │ ├── anon.tar.zst
|
|
│ │ └── embedding.tar.zst
|
|
│ └── ext_index.json
|
|
└── v15
|
|
├── extensions
|
|
│ ├── anon.tar.zst
|
|
│ └── embedding.tar.zst
|
|
└── ext_index.json
|
|
5615261079
|
|
├── v14
|
|
│ ├── extensions
|
|
│ │ └── anon.tar.zst
|
|
│ └── ext_index.json
|
|
└── v15
|
|
├── extensions
|
|
│ └── anon.tar.zst
|
|
└── ext_index.json
|
|
5623261088
|
|
├── v14
|
|
│ ├── extensions
|
|
│ │ └── embedding.tar.zst
|
|
│ └── ext_index.json
|
|
└── v15
|
|
├── extensions
|
|
│ └── embedding.tar.zst
|
|
└── ext_index.json
|
|
|
|
Note that build number cannot be part of prefix because we might need extensions
|
|
from other build numbers.
|
|
|
|
ext_index.json stores the control files and location of extension archives
|
|
It also stores a list of public extensions and a library_index
|
|
|
|
We don't need to duplicate extension.tar.zst files.
|
|
We only need to upload a new one if it is updated.
|
|
(Although currently we just upload every time anyways, hopefully will change
|
|
this sometime)
|
|
|
|
*access* is controlled by spec
|
|
|
|
More specifically, here is an example ext_index.json
|
|
{
|
|
"public_extensions": [
|
|
"anon",
|
|
"pg_buffercache"
|
|
],
|
|
"library_index": {
|
|
"anon": "anon",
|
|
"pg_buffercache": "pg_buffercache"
|
|
},
|
|
"extension_data": {
|
|
"pg_buffercache": {
|
|
"control_data": {
|
|
"pg_buffercache.control": "# pg_buffercache extension \ncomment = 'examine the shared buffer cache' \ndefault_version = '1.3' \nmodule_pathname = '$libdir/pg_buffercache' \nrelocatable = true \ntrusted=true"
|
|
},
|
|
"archive_path": "5670669815/v14/extensions/pg_buffercache.tar.zst"
|
|
},
|
|
"anon": {
|
|
"control_data": {
|
|
"anon.control": "# PostgreSQL Anonymizer (anon) extension \ncomment = 'Data anonymization tools' \ndefault_version = '1.1.0' \ndirectory='extension/anon' \nrelocatable = false \nrequires = 'pgcrypto' \nsuperuser = false \nmodule_pathname = '$libdir/anon' \ntrusted = true \n"
|
|
},
|
|
"archive_path": "5670669815/v14/extensions/anon.tar.zst"
|
|
}
|
|
}
|
|
}
|
|
*/
|
|
use std::path::Path;
|
|
use std::str;
|
|
|
|
use anyhow::{Context, Result, bail};
|
|
use bytes::Bytes;
|
|
use compute_api::spec::RemoteExtSpec;
|
|
use regex::Regex;
|
|
use remote_storage::*;
|
|
use reqwest::StatusCode;
|
|
use tar::Archive;
|
|
use tracing::info;
|
|
use tracing::log::warn;
|
|
use zstd::stream::read::Decoder;
|
|
|
|
use crate::metrics::{REMOTE_EXT_REQUESTS_TOTAL, UNKNOWN_HTTP_STATUS};
|
|
|
|
fn get_pg_config(argument: &str, pgbin: &str) -> String {
|
|
// gives the result of `pg_config [argument]`
|
|
// where argument is a flag like `--version` or `--sharedir`
|
|
let pgconfig = pgbin
|
|
.strip_suffix("postgres")
|
|
.expect("bad pgbin")
|
|
.to_owned()
|
|
+ "/pg_config";
|
|
let config_output = std::process::Command::new(pgconfig)
|
|
.arg(argument)
|
|
.output()
|
|
.expect("pg_config error");
|
|
std::str::from_utf8(&config_output.stdout)
|
|
.expect("pg_config error")
|
|
.trim()
|
|
.to_string()
|
|
}
|
|
|
|
pub fn get_pg_version(pgbin: &str) -> PostgresMajorVersion {
|
|
// pg_config --version returns a (platform specific) human readable string
|
|
// such as "PostgreSQL 15.4". We parse this to v14/v15/v16 etc.
|
|
let human_version = get_pg_config("--version", pgbin);
|
|
parse_pg_version(&human_version)
|
|
}
|
|
|
|
pub fn get_pg_version_string(pgbin: &str) -> String {
|
|
match get_pg_version(pgbin) {
|
|
PostgresMajorVersion::V14 => "v14",
|
|
PostgresMajorVersion::V15 => "v15",
|
|
PostgresMajorVersion::V16 => "v16",
|
|
PostgresMajorVersion::V17 => "v17",
|
|
}
|
|
.to_owned()
|
|
}
|
|
|
|
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
|
|
pub enum PostgresMajorVersion {
|
|
V14,
|
|
V15,
|
|
V16,
|
|
V17,
|
|
}
|
|
|
|
fn parse_pg_version(human_version: &str) -> PostgresMajorVersion {
|
|
use PostgresMajorVersion::*;
|
|
// Normal releases have version strings like "PostgreSQL 15.4". But there
|
|
// are also pre-release versions like "PostgreSQL 17devel" or "PostgreSQL
|
|
// 16beta2" or "PostgreSQL 17rc1". And with the --with-extra-version
|
|
// configure option, you can tack any string to the version number,
|
|
// e.g. "PostgreSQL 15.4foobar".
|
|
match Regex::new(r"^PostgreSQL (?<major>\d+).+")
|
|
.unwrap()
|
|
.captures(human_version)
|
|
{
|
|
Some(captures) if captures.len() == 2 => match &captures["major"] {
|
|
"14" => return V14,
|
|
"15" => return V15,
|
|
"16" => return V16,
|
|
"17" => return V17,
|
|
_ => {}
|
|
},
|
|
_ => {}
|
|
}
|
|
panic!("Unsuported postgres version {human_version}");
|
|
}
|
|
|
|
// download the archive for a given extension,
|
|
// unzip it, and place files in the appropriate locations (share/lib)
|
|
pub async fn download_extension(
|
|
ext_name: &str,
|
|
ext_path: &RemotePath,
|
|
remote_ext_base_url: &str,
|
|
pgbin: &str,
|
|
) -> Result<u64> {
|
|
info!("Download extension {:?} from {:?}", ext_name, ext_path);
|
|
|
|
// TODO add retry logic
|
|
let download_buffer =
|
|
match download_extension_tar(remote_ext_base_url, &ext_path.to_string()).await {
|
|
Ok(buffer) => buffer,
|
|
Err(error_message) => {
|
|
return Err(anyhow::anyhow!(
|
|
"error downloading extension {:?}: {:?}",
|
|
ext_name,
|
|
error_message
|
|
));
|
|
}
|
|
};
|
|
|
|
let download_size = download_buffer.len() as u64;
|
|
info!("Download size {:?}", download_size);
|
|
// it's unclear whether it is more performant to decompress into memory or not
|
|
// TODO: decompressing into memory can be avoided
|
|
let decoder = Decoder::new(download_buffer.as_ref())?;
|
|
let mut archive = Archive::new(decoder);
|
|
|
|
let unzip_dest = pgbin
|
|
.strip_suffix("/bin/postgres")
|
|
.expect("bad pgbin")
|
|
.to_string()
|
|
+ "/download_extensions";
|
|
archive.unpack(&unzip_dest)?;
|
|
info!("Download + unzip {:?} completed successfully", &ext_path);
|
|
|
|
let sharedir_paths = (
|
|
unzip_dest.to_string() + "/share/extension",
|
|
Path::new(&get_pg_config("--sharedir", pgbin)).join("extension"),
|
|
);
|
|
let libdir_paths = (
|
|
unzip_dest.to_string() + "/lib",
|
|
Path::new(&get_pg_config("--pkglibdir", pgbin)).to_path_buf(),
|
|
);
|
|
// move contents of the libdir / sharedir in unzipped archive to the correct local paths
|
|
for paths in [sharedir_paths, libdir_paths] {
|
|
let (zip_dir, real_dir) = paths;
|
|
|
|
let dir = match std::fs::read_dir(&zip_dir) {
|
|
Ok(dir) => dir,
|
|
Err(e) => match e.kind() {
|
|
// In the event of a SQL-only extension, there would be nothing
|
|
// to move from the lib/ directory, so note that in the log and
|
|
// move on.
|
|
std::io::ErrorKind::NotFound => {
|
|
info!("nothing to move from {}", zip_dir);
|
|
continue;
|
|
}
|
|
_ => return Err(anyhow::anyhow!(e)),
|
|
},
|
|
};
|
|
|
|
info!("mv {zip_dir:?}/* {real_dir:?}");
|
|
|
|
for file in dir {
|
|
let old_file = file?.path();
|
|
let new_file =
|
|
Path::new(&real_dir).join(old_file.file_name().context("error parsing file")?);
|
|
info!("moving {old_file:?} to {new_file:?}");
|
|
|
|
// extension download failed: Directory not empty (os error 39)
|
|
match std::fs::rename(old_file, new_file) {
|
|
Ok(()) => info!("move succeeded"),
|
|
Err(e) => {
|
|
warn!("move failed, probably because the extension already exists: {e}")
|
|
}
|
|
}
|
|
}
|
|
}
|
|
info!("done moving extension {ext_name}");
|
|
Ok(download_size)
|
|
}
|
|
|
|
// Create extension control files from spec
|
|
pub fn create_control_files(remote_extensions: &RemoteExtSpec, pgbin: &str) {
|
|
let local_sharedir = Path::new(&get_pg_config("--sharedir", pgbin)).join("extension");
|
|
for (ext_name, ext_data) in remote_extensions.extension_data.iter() {
|
|
// Check if extension is present in public or custom.
|
|
// If not, then it is not allowed to be used by this compute.
|
|
if let Some(public_extensions) = &remote_extensions.public_extensions {
|
|
if !public_extensions.contains(ext_name) {
|
|
if let Some(custom_extensions) = &remote_extensions.custom_extensions {
|
|
if !custom_extensions.contains(ext_name) {
|
|
continue; // skip this extension, it is not allowed
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
for (control_name, control_content) in &ext_data.control_data {
|
|
let control_path = local_sharedir.join(control_name);
|
|
if !control_path.exists() {
|
|
info!("writing file {:?}{:?}", control_path, control_content);
|
|
std::fs::write(control_path, control_content).unwrap();
|
|
} else {
|
|
warn!(
|
|
"control file {:?} exists both locally and remotely. ignoring the remote version.",
|
|
control_path
|
|
);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Do request to extension storage proxy, e.g.,
|
|
// curl http://pg-ext-s3-gateway/latest/v15/extensions/anon.tar.zst
|
|
// using HTTP GET and return the response body as bytes.
|
|
async fn download_extension_tar(remote_ext_base_url: &str, ext_path: &str) -> Result<Bytes> {
|
|
let uri = format!("{}/{}", remote_ext_base_url, ext_path);
|
|
let filename = Path::new(ext_path)
|
|
.file_name()
|
|
.unwrap_or_else(|| std::ffi::OsStr::new("unknown"))
|
|
.to_str()
|
|
.unwrap_or("unknown")
|
|
.to_string();
|
|
|
|
info!("Downloading extension file '{}' from uri {}", filename, uri);
|
|
|
|
match do_extension_server_request(&uri).await {
|
|
Ok(resp) => {
|
|
info!("Successfully downloaded remote extension data {}", ext_path);
|
|
REMOTE_EXT_REQUESTS_TOTAL
|
|
.with_label_values(&[&StatusCode::OK.to_string(), &filename])
|
|
.inc();
|
|
Ok(resp)
|
|
}
|
|
Err((msg, status)) => {
|
|
REMOTE_EXT_REQUESTS_TOTAL
|
|
.with_label_values(&[&status, &filename])
|
|
.inc();
|
|
bail!(msg);
|
|
}
|
|
}
|
|
}
|
|
|
|
// Do a single remote extensions server request.
|
|
// Return result or (error message + stringified status code) in case of any failures.
|
|
async fn do_extension_server_request(uri: &str) -> Result<Bytes, (String, String)> {
|
|
let resp = reqwest::get(uri).await.map_err(|e| {
|
|
(
|
|
format!(
|
|
"could not perform remote extensions server request: {:?}",
|
|
e
|
|
),
|
|
UNKNOWN_HTTP_STATUS.to_string(),
|
|
)
|
|
})?;
|
|
let status = resp.status();
|
|
|
|
match status {
|
|
StatusCode::OK => match resp.bytes().await {
|
|
Ok(resp) => Ok(resp),
|
|
Err(e) => Err((
|
|
format!("could not read remote extensions server response: {:?}", e),
|
|
// It's fine to return and report error with status as 200 OK,
|
|
// because we still failed to read the response.
|
|
status.to_string(),
|
|
)),
|
|
},
|
|
StatusCode::SERVICE_UNAVAILABLE => Err((
|
|
"remote extensions server is temporarily unavailable".to_string(),
|
|
status.to_string(),
|
|
)),
|
|
_ => Err((
|
|
format!(
|
|
"unexpected remote extensions server response status code: {}",
|
|
status
|
|
),
|
|
status.to_string(),
|
|
)),
|
|
}
|
|
}
|
|
|
|
#[cfg(test)]
|
|
mod tests {
|
|
use super::parse_pg_version;
|
|
|
|
#[test]
|
|
fn test_parse_pg_version() {
|
|
use super::PostgresMajorVersion::*;
|
|
assert_eq!(parse_pg_version("PostgreSQL 15.4"), V15);
|
|
assert_eq!(parse_pg_version("PostgreSQL 15.14"), V15);
|
|
assert_eq!(
|
|
parse_pg_version("PostgreSQL 15.4 (Ubuntu 15.4-0ubuntu0.23.04.1)"),
|
|
V15
|
|
);
|
|
|
|
assert_eq!(parse_pg_version("PostgreSQL 14.15"), V14);
|
|
assert_eq!(parse_pg_version("PostgreSQL 14.0"), V14);
|
|
assert_eq!(
|
|
parse_pg_version("PostgreSQL 14.9 (Debian 14.9-1.pgdg120+1"),
|
|
V14
|
|
);
|
|
|
|
assert_eq!(parse_pg_version("PostgreSQL 16devel"), V16);
|
|
assert_eq!(parse_pg_version("PostgreSQL 16beta1"), V16);
|
|
assert_eq!(parse_pg_version("PostgreSQL 16rc2"), V16);
|
|
assert_eq!(parse_pg_version("PostgreSQL 16extra"), V16);
|
|
}
|
|
|
|
#[test]
|
|
#[should_panic]
|
|
fn test_parse_pg_unsupported_version() {
|
|
parse_pg_version("PostgreSQL 13.14");
|
|
}
|
|
|
|
#[test]
|
|
#[should_panic]
|
|
fn test_parse_pg_incorrect_version_format() {
|
|
parse_pg_version("PostgreSQL 14");
|
|
}
|
|
}
|