Mirror of https://github.com/neondatabase/neon.git

Fix paths to match infra more closely. Make extension_server actually async.
Handle more complex cases of extensions with their dependencies.

Committed by Anastasia Lubennikova · parent 33f1bacfb7 · commit 3ce678b3bb
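The central change in this commit is the move from awaiting each remote-storage call in sequence to building the futures first and driving them concurrently. A minimal, self-contained sketch of that pattern (assuming the tokio and anyhow crates; the fetch_* names are placeholders, not the actual compute_ctl API):

    use anyhow::Result;

    async fn fetch_extensions() -> Result<Vec<String>> {
        // stand-in for extension_server::get_available_extensions
        Ok(vec!["anon".to_string()])
    }

    async fn fetch_libraries() -> Result<Vec<String>> {
        // stand-in for extension_server::get_available_libraries
        Ok(vec!["libpgtypes.so".to_string()])
    }

    #[tokio::main]
    async fn main() -> Result<()> {
        // Calling an async fn only creates a future; tokio::join! polls both
        // to completion concurrently. Errors are propagated afterwards with ?,
        // which is how the diff below handles the two fetch tasks.
        let (exts, libs) = tokio::join!(fetch_extensions(), fetch_libraries());
        println!("{:?} {:?}", exts?, libs?);
        Ok(())
    }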
@@ -72,18 +72,10 @@ fn main() -> Result<()> {
     let pgbin = matches.get_one::<String>("pgbin").unwrap_or(&pgbin_default);
 
     let remote_ext_config = matches.get_one::<String>("remote-ext-config");
-    let ext_remote_storage = match remote_ext_config {
-        Some(x) => match init_remote_storage(x, build_tag) {
-            Ok(y) => Some(y),
-            Err(e) => {
-                panic!(
-                    "cannot initialize remote extension storage from config {}: {}",
-                    x, e
-                );
-            }
-        },
-        None => None,
-    };
+    let ext_remote_storage = remote_ext_config.map(|x| {
+        init_remote_storage(x, build_tag)
+            .expect("cannot initialize remote extension storage from config")
+    });
 
     let http_port = *matches
         .get_one::<u16>("http-port")
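The hunk above is a pure refactor: the nested match over Option and Result collapses into Option::map plus expect. One behavioral nuance: the old panic! formatted the offending config and error into the message, while expect appends only the error's Debug form. A standalone sketch of the equivalence (parse here is a placeholder for init_remote_storage):

    fn parse(cfg: &str) -> Result<u32, std::num::ParseIntError> {
        cfg.parse()
    }

    fn main() {
        let remote_ext_config: Option<&str> = Some("42");

        // Before: nested match with a hand-formatted panic message.
        let a = match remote_ext_config {
            Some(x) => match parse(x) {
                Ok(y) => Some(y),
                Err(e) => panic!("cannot parse {}: {}", x, e),
            },
            None => None,
        };

        // After: map + expect, which panics with the message plus the error's Debug form.
        let b = remote_ext_config.map(|x| parse(x).expect("cannot parse"));
        assert_eq!(a, b);
    }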
@@ -20,6 +20,7 @@ use compute_api::spec::{ComputeMode, ComputeSpec};
 
 use remote_storage::{GenericRemoteStorage, RemotePath};
 
+use crate::extension_server::PathAndFlag;
 use crate::pg_helpers::*;
 use crate::spec::*;
 use crate::{config, extension_server};
@@ -53,8 +54,8 @@ pub struct ComputeNode {
     /// the S3 bucket that we search for extensions in
     pub ext_remote_storage: Option<GenericRemoteStorage>,
     // cached lists of available extensions and libraries
-    pub available_libraries: OnceLock<HashMap<String, RemotePath>>,
-    pub available_extensions: OnceLock<HashMap<String, Vec<RemotePath>>>,
+    pub available_libraries: OnceLock<HashMap<String, Vec<RemotePath>>>,
+    pub available_extensions: OnceLock<HashMap<String, Vec<PathAndFlag>>>,
 }
 
 #[derive(Clone, Debug)]
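Both caches are OnceLock cells: written exactly once after the concurrent fetch, then read without locking. The set/get contract in miniature (a standalone sketch, not the ComputeNode code):

    use std::collections::HashMap;
    use std::sync::OnceLock;

    fn main() {
        let available_libraries: OnceLock<HashMap<String, Vec<String>>> = OnceLock::new();

        let mut libs = HashMap::new();
        libs.insert("libpgtypes".to_string(), vec!["v14/lib/libpgtypes.so".to_string()]);

        // The first set wins; a second set would return Err carrying the rejected
        // value, which is why the diff pairs it with expect("...set error").
        available_libraries.set(libs).expect("available_libraries.set error");
        assert!(available_libraries.get().is_some());
    }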
@@ -531,6 +532,9 @@ impl ComputeNode {
             pspec.timeline_id,
         );
 
+        // TODO FIXME: this should not be blocking here
+        // Maybe we can run it as a child process?
+        // Also, worth measuring how long this step is taking
         self.prepare_external_extensions(&compute_state)?;
 
         self.prepare_pgdata(&compute_state, extension_server_port)?;
@@ -734,33 +738,30 @@ LIMIT 100",
             }
 
             info!("Libraries to download: {:?}", &libs_vec);
 
             // download extension control files & shared_preload_libraries
-            let available_extensions = extension_server::get_available_extensions(
+            let get_extensions_task = extension_server::get_available_extensions(
                 ext_remote_storage,
                 &self.pgbin,
                 &self.pgversion,
                 &custom_ext_prefixes,
-            )
-            .await?;
-            self.available_extensions
-                .set(available_extensions)
-                .expect("available_extensions.set error");
-
-            let available_libraries = extension_server::get_available_libraries(
+            );
+            let get_libraries_task = extension_server::get_available_libraries(
                 ext_remote_storage,
                 &self.pgbin,
                 &self.pgversion,
                 &custom_ext_prefixes,
                 &libs_vec,
-            )
-            .await?;
+            );
 
+            let (available_extensions, available_libraries) =
+                tokio::join!(get_extensions_task, get_libraries_task);
+            self.available_extensions
+                .set(available_extensions?)
+                .expect("available_extensions.set error");
             self.available_libraries
-                .set(available_libraries)
+                .set(available_libraries?)
                 .expect("available_libraries.set error");
         }
 
         Ok(())
     }
@@ -1,6 +1,7 @@
 // Download extension files from the extension store
 // and put them in the right place in the postgres directory
 use anyhow::{self, bail, Context, Result};
+use futures::future::join_all;
 use remote_storage::*;
 use serde_json::{self, Value};
 use std::collections::HashMap;
@@ -12,7 +13,15 @@ use std::str;
 use tokio::io::AsyncReadExt;
 use tracing::info;
 
-const SHARE_EXT_PATH: &str = "share/postgresql/extension";
+// remote!
+const SHARE_EXT_PATH: &str = "share/extension";
 
+fn pass_any_error(results: Vec<Result<()>>) -> Result<()> {
+    for result in results {
+        result?;
+    }
+    Ok(())
+}
+
 fn get_pg_config(argument: &str, pgbin: &str) -> String {
     // gives the result of `pg_config [argument]`
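pass_any_error surfaces the first failure from a batch of already-completed tasks. For what it's worth, the same collapse can be written as a collect, since Result implements FromIterator; a standalone sketch (assuming the anyhow crate):

    use anyhow::{anyhow, Result};

    fn pass_any_error(results: Vec<Result<()>>) -> Result<()> {
        for result in results {
            result?;
        }
        Ok(())
    }

    fn main() {
        let results: Vec<Result<()>> = vec![Ok(()), Err(anyhow!("boom")), Ok(())];
        // Equivalent one-liner: results.into_iter().collect::<Result<()>>()
        assert!(pass_any_error(results).is_err());
    }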
@@ -42,59 +51,42 @@ pub fn get_pg_version(pgbin: &str) -> String {
 
 async fn download_helper(
     remote_storage: &GenericRemoteStorage,
-    remote_from_path: &RemotePath,
-    remote_from_prefix: Option<&Path>,
+    remote_from_path: RemotePath,
+    sub_directory: Option<&str>,
     download_location: &Path,
 ) -> anyhow::Result<()> {
-    // downloads file at remote_from_path to download_location/[file_name]
-
-    // we cannot use remote_from_path.object_name() here
-    // because extension files can be in subdirectories of the extension store.
-    //
-    // To handle this, we use remote_from_prefix to strip the prefix from the path
-    // this gives us the relative path of the file in the extension store,
-    // and we use this relative path to construct the local path.
-    //
-    let local_path = match remote_from_prefix {
-        Some(prefix) => {
-            let p = remote_from_path
-                .get_path()
-                .strip_prefix(prefix)
-                .expect("bad prefix");
-
-            download_location.join(p)
-        }
+    // downloads file at remote_from_path to
+    // `download_location/[optional: subdirectory]/[remote_storage.object_name()]`
+    // Note: the subdirectory argument is needed when there is an extension that
+    // depends on files in a subdirectory.
+    // For example, v14/share/extension/some_ext.control
+    // might depend on v14/share/extension/some_ext/some_ext--1.1.0.sql
+    // and v14/share/extension/some_ext/xxx.csv
+    // Note: it is the caller's responsibility to create the appropriate subdirectory
+    let local_path = match sub_directory {
+        Some(subdir) => download_location
+            .join(subdir)
+            .join(remote_from_path.object_name().expect("bad object")),
         None => download_location.join(remote_from_path.object_name().expect("bad object")),
     };
 
     if local_path.exists() {
         info!("File {:?} already exists. Skipping download", &local_path);
         return Ok(());
     }
 
     info!(
         "Downloading {:?} to location {:?}",
         &remote_from_path, &local_path
     );
-    let mut download = remote_storage.download(remote_from_path).await?;
+    let mut download = remote_storage.download(&remote_from_path).await?;
     let mut write_data_buffer = Vec::new();
     download
         .download_stream
         .read_to_end(&mut write_data_buffer)
         .await?;
-    if remote_from_prefix.is_some() {
-        if let Some(prefix) = local_path.parent() {
-            info!(
-                "Downloading file with prefix. Create directory {:?}",
-                prefix
-            );
-            // if directory already exists, this is a no-op
-            std::fs::create_dir_all(prefix)?;
-        }
-    }
-
     let mut output_file = BufWriter::new(File::create(local_path)?);
     output_file.write_all(&write_data_buffer)?;
     info!("Download {:?} completed successfully", &remote_from_path);
     Ok(())
 }
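The new local-path rule in download_helper is easy to check in isolation: the file lands in download_location, optionally under one extra subdirectory. A quick sketch with plain paths (names are illustrative):

    use std::path::{Path, PathBuf};

    fn local_path(download_location: &Path, sub_directory: Option<&str>, object_name: &str) -> PathBuf {
        match sub_directory {
            // the caller is responsible for creating the subdirectory beforehand
            Some(subdir) => download_location.join(subdir).join(object_name),
            None => download_location.join(object_name),
        }
    }

    fn main() {
        let share = Path::new("share/extension");
        assert_eq!(
            local_path(share, Some("some_ext"), "some_ext--1.1.0.sql"),
            Path::new("share/extension/some_ext/some_ext--1.1.0.sql")
        );
        assert_eq!(
            local_path(share, None, "some_ext.control"),
            Path::new("share/extension/some_ext.control")
        );
    }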
@@ -107,15 +99,14 @@ pub async fn get_available_extensions(
     pgbin: &str,
     pg_version: &str,
     custom_ext_prefixes: &Vec<String>,
-) -> anyhow::Result<HashMap<String, Vec<RemotePath>>> {
+) -> anyhow::Result<HashMap<String, Vec<PathAndFlag>>> {
     let local_sharedir = Path::new(&get_pg_config("--sharedir", pgbin)).join("extension");
 
     // public path, plus any private paths to download extensions from
     let mut paths: Vec<RemotePath> = Vec::new();
     // public extensions
     paths.push(RemotePath::new(
         &Path::new(pg_version).join(SHARE_EXT_PATH),
     )?);
     // custom extensions
     for custom_prefix in custom_ext_prefixes {
         paths.push(RemotePath::new(
             &Path::new(pg_version)
@@ -124,23 +115,21 @@ pub async fn get_available_extensions(
         )?);
     }
 
-    let all_available_files = list_files_in_prefixes_for_extensions(remote_storage, &paths).await?;
-
-    info!(
-        "list of available_extension files {:?}",
-        &all_available_files
-    );
+    let (extension_files, control_files) =
+        organized_extension_files(remote_storage, &paths).await?;
 
+    let mut control_file_download_tasks = Vec::new();
     // download all control files
-    for (obj_name, obj_paths) in &all_available_files {
-        for obj_path in obj_paths {
-            if obj_name.ends_with("control") {
-                download_helper(remote_storage, obj_path, None, &local_sharedir).await?;
-            }
-        }
+    for control_file in control_files {
+        control_file_download_tasks.push(download_helper(
+            remote_storage,
+            control_file.clone(),
+            None,
+            &local_sharedir,
+        ));
     }
 
-    Ok(all_available_files)
+    pass_any_error(join_all(control_file_download_tasks).await)?;
+    Ok(extension_files)
 }
 
 // Download requested shared_preload_libraries
@@ -155,11 +144,9 @@ pub async fn get_available_libraries(
     pg_version: &str,
     custom_ext_prefixes: &Vec<String>,
     preload_libraries: &Vec<String>,
-) -> anyhow::Result<HashMap<String, RemotePath>> {
-    let local_libdir: PathBuf = Path::new(&get_pg_config("--pkglibdir", pgbin)).into();
+) -> anyhow::Result<HashMap<String, Vec<RemotePath>>> {
     // Construct a hashmap of all available libraries
-    // example (key, value) pair: test_lib0.so, v14/lib/test_lib0.so
-
+    // example (key, value) pair: test_lib0: [RemotePath(v14/lib/test_lib0.so), RemotePath(v14/lib/test_lib0.so.3)]
     let mut paths: Vec<RemotePath> = Vec::new();
     // public libraries
     paths.push(
@@ -173,32 +160,20 @@ pub async fn get_available_libraries(
             .expect("The hard coded path here is valid"),
         );
     }
 
-    let all_available_libraries = list_files_in_prefixes(remote_storage, &paths).await?;
+    let all_available_libraries = organized_library_files(remote_storage, &paths).await?;
 
     info!("list of library files {:?}", &all_available_libraries);
 
     // download all requested libraries
+    let mut download_tasks = Vec::new();
     for lib_name in preload_libraries {
-        // add file extension if it isn't in the filename
-        let lib_name_with_ext = enforce_so_end(lib_name);
-        info!("looking for library {:?}", &lib_name_with_ext);
-
-        match all_available_libraries.get(&*lib_name_with_ext) {
-            Some(remote_path) => {
-                download_helper(remote_storage, remote_path, None, &local_libdir).await?
-            }
-            None => {
-                let file_path = local_libdir.join(&lib_name_with_ext);
-                if file_path.exists() {
-                    info!("File {:?} already exists. Skipping download", &file_path);
-                } else {
-                    bail!("Shared library file {lib_name} is not found in the extension store")
-                }
-            }
-        }
+        download_tasks.push(download_library_file(
+            lib_name,
+            remote_storage,
+            pgbin,
+            &all_available_libraries,
+        ));
     }
 
+    pass_any_error(join_all(download_tasks).await)?;
     Ok(all_available_libraries)
 }
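A note on the join_all pattern used here and in the hunks above: join_all runs every future to completion and returns the results in input order, even when an early one fails; errors are inspected only afterwards, via pass_any_error. In miniature (assuming the futures, tokio, and anyhow crates; task is a placeholder):

    use anyhow::{anyhow, Result};
    use futures::future::join_all;

    async fn task(i: u32) -> Result<u32> {
        if i == 1 { Err(anyhow!("task {i} failed")) } else { Ok(i) }
    }

    #[tokio::main]
    async fn main() {
        let results = join_all((0..3).map(task)).await;
        // All three futures ran; the failure shows up in position 1.
        assert_eq!(results.len(), 3);
        assert!(results[1].is_err());
    }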
@@ -208,47 +183,54 @@ pub async fn download_extension_files(
     ext_name: &str,
     remote_storage: &GenericRemoteStorage,
     pgbin: &str,
-    all_available_files: &HashMap<String, Vec<RemotePath>>,
+    all_available_files: &HashMap<String, Vec<PathAndFlag>>,
 ) -> Result<()> {
     let local_sharedir = Path::new(&get_pg_config("--sharedir", pgbin)).join("extension");
     let mut downloaded_something = false;
+    let mut made_subdir = false;
 
-    info!("EXTENSION {:?}", ext_name);
-    info!("{:?}", all_available_files.get(ext_name));
-
-    info!("start download");
+    let mut download_tasks = Vec::new();
     if let Some(files) = all_available_files.get(ext_name) {
-        for file in files {
-            if file.extension().context("bad file name")? != "control" {
-                // find files prefix to handle cases when extension files are stored
-                // in a directory with the same name as the extension
-                // example:
-                // share/postgresql/extension/extension_name/extension_name--1.0.sql
-                let index = file
-                    .get_path()
-                    .to_str()
-                    .context("invalid path")?
-                    .find(ext_name)
-                    .context("invalid path")?;
-
-                let prefix_str =
-                    file.get_path().to_str().context("invalid path")?[..index].to_string();
-                let remote_from_prefix = if prefix_str.is_empty() {
-                    None
-                } else {
-                    Some(Path::new(&prefix_str))
-                };
-
-                download_helper(remote_storage, file, remote_from_prefix, &local_sharedir).await?;
-                downloaded_something = true;
+        info!("Downloading files for extension {:?}", &ext_name);
+        for path_and_flag in files {
+            let file = &path_and_flag.path;
+            let subdir_flag = path_and_flag.subdir_flag;
+            info!(
+                "--- Downloading {:?} (for {:?} as subdir? = {:?})",
+                &file, &ext_name, subdir_flag
+            );
+            let mut subdir = None;
+            if subdir_flag {
+                subdir = Some(ext_name);
+                if !made_subdir {
+                    made_subdir = true;
+                    std::fs::create_dir_all(local_sharedir.join(ext_name))?;
+                }
             }
+            download_tasks.push(download_helper(
+                remote_storage,
+                file.clone(),
+                subdir,
+                &local_sharedir,
+            ));
+            downloaded_something = true;
         }
     }
     if !downloaded_something {
         bail!("Files for extension {ext_name} are not found in the extension store");
     }
+    pass_any_error(join_all(download_tasks).await)?;
     info!("finish download");
     Ok(())
 }
 
 // appends an .so suffix to libname if it does not already have one
 fn enforce_so_end(libname: &str) -> String {
-    if !libname.ends_with(".so") {
+    if !libname.contains(".so") {
         format!("{}.so", libname)
     } else {
         libname.to_string()
@@ -260,20 +242,44 @@ pub async fn download_library_file(
     lib_name: &str,
     remote_storage: &GenericRemoteStorage,
     pgbin: &str,
-    all_available_libraries: &HashMap<String, RemotePath>,
+    all_available_libraries: &HashMap<String, Vec<RemotePath>>,
 ) -> Result<()> {
+    let lib_name = get_library_name(lib_name);
     let local_libdir: PathBuf = Path::new(&get_pg_config("--pkglibdir", pgbin)).into();
-    let lib_name_with_ext = enforce_so_end(lib_name);
-    info!("looking for library {:?}", &lib_name_with_ext);
-    match all_available_libraries.get(&*lib_name_with_ext) {
-        Some(remote_path) => {
-            download_helper(remote_storage, remote_path, None, &local_libdir).await?
+    info!("looking for library {:?}", &lib_name);
+    match all_available_libraries.get(&*lib_name) {
+        Some(remote_paths) => {
+            let mut library_download_tasks = Vec::new();
+            for remote_path in remote_paths {
+                let file_path = local_libdir.join(remote_path.object_name().expect("bad object"));
+                if file_path.exists() {
+                    info!("File {:?} already exists. Skipping download", &file_path);
+                } else {
+                    library_download_tasks.push(download_helper(
+                        remote_storage,
+                        remote_path.clone(),
+                        None,
+                        &local_libdir,
+                    ));
+                }
+            }
+            pass_any_error(join_all(library_download_tasks).await)?;
         }
-        None => bail!("Shared library file {lib_name} is not found in the extension store"),
+        None => {
+            // minor TODO: this logic seems to be somewhat faulty for .so.3 type files?
+            let lib_name_with_ext = enforce_so_end(&lib_name);
+            let file_path = local_libdir.join(lib_name_with_ext);
+            if file_path.exists() {
+                info!("File {:?} already exists. Skipping download", &file_path);
+            } else {
+                bail!("Library file {lib_name} not found")
+            }
+        }
     }
     Ok(())
 }
 
 // This function initializes the necessary structs to use remote storage (should be fairly cheap)
 pub fn init_remote_storage(
     remote_ext_config: &str,
     default_prefix: &str,
@@ -315,70 +321,108 @@ pub fn init_remote_storage(
     GenericRemoteStorage::from_config(&config)
 }
 
-// helper to collect all files in the given prefixes
-// returns hashmap of (file_name, file_remote_path)
-async fn list_files_in_prefixes(
+fn get_library_name(path: &str) -> String {
+    let path_suffix: Vec<&str> = path.split('/').collect();
+    let path_suffix = path_suffix.last().expect("bad ext name").to_string();
+    if let Some(index) = path_suffix.find(".so") {
+        return path_suffix[..index].to_string();
+    }
+    path_suffix
+}
+
+// asynchronously lists files in all necessary directories
+// TODO: potential optimization: do a single list files on the entire bucket
+// and then filter out the files we don't need
+async fn list_all_files(
     remote_storage: &GenericRemoteStorage,
     paths: &Vec<RemotePath>,
-) -> Result<HashMap<String, RemotePath>> {
-    let mut res = HashMap::new();
-
+) -> Result<Vec<RemotePath>> {
+    let mut list_tasks = Vec::new();
+    let mut all_files = Vec::new();
     for path in paths {
-        for file in remote_storage.list_files(Some(path)).await? {
-            res.insert(
-                file.object_name().expect("bad object").to_owned(),
-                file.to_owned(),
-            );
-        }
+        list_tasks.push(remote_storage.list_files(Some(path)));
     }
-
-    Ok(res)
+    for list_result in join_all(list_tasks).await {
+        all_files.extend(list_result?);
+    }
+    Ok(all_files)
 }
 
-// helper to extract extension name
-// extension files can be in subdirectories of the extension store.
-// examples of layout:
-//
-// share/postgresql/extension/extension_name--1.0.sql
-//
-// or
-//
-// share/postgresql/extension/extension_name/extension_name--1.0.sql
-// share/postgresql/extension/extension_name/extra_data.csv
-//
-// Note: we **assume** that the extension files are in one of these formats.
-// If they are not, this code will not download them.
-fn get_ext_name(path: &str) -> Result<&str> {
-    let path_suffix: Vec<&str> = path.split(&format!("{SHARE_EXT_PATH}/")).collect();
-    let path_suffix = path_suffix.last().expect("bad ext name");
-    // the order of these is important
-    // otherwise we'll return incorrect extension name
-    // for path like share/postgresql/extension/extension_name/extension_name--1.0.sql
-    for index in ["/", "--"] {
-        if let Some(index) = path_suffix.find(index) {
-            return Ok(&path_suffix[..index]);
-        }
-    }
-    Ok(path_suffix)
-}
-
-// helper to collect files of given prefixes for extensions
-// and group them by extension
-// returns a hashmap of (extension_name, Vector of remote paths for all files needed for this extension)
-async fn list_files_in_prefixes_for_extensions(
+// helper to collect all libraries, grouped by library name
+// Returns a hashmap of (library name: [paths])
+// example entry: {libpgtypes: [libpgtypes.so.3, libpgtypes.so]}
+async fn organized_library_files(
     remote_storage: &GenericRemoteStorage,
     paths: &Vec<RemotePath>,
 ) -> Result<HashMap<String, Vec<RemotePath>>> {
-    let mut result = HashMap::new();
-    for path in paths {
-        for file in remote_storage.list_files(Some(path)).await? {
-            let file_ext_name = get_ext_name(file.get_path().to_str().context("invalid path")?)?;
-            let ext_file_list = result
-                .entry(file_ext_name.to_string())
-                .or_insert(Vec::new());
-            ext_file_list.push(file.to_owned());
-        }
-    }
-    Ok(result)
+    let mut library_groups = HashMap::new();
+    for file in list_all_files(remote_storage, paths).await? {
+        let lib_name = get_library_name(file.get_path().to_str().context("invalid path")?);
+        let lib_list = library_groups.entry(lib_name).or_insert(Vec::new());
+        lib_list.push(file.to_owned());
+    }
+    Ok(library_groups)
+}
+
+// store a path, paired with a flag indicating whether the path is to a file in
+// the root or a subdirectory
+#[derive(Debug)]
+pub struct PathAndFlag {
+    path: RemotePath,
+    subdir_flag: bool,
+}
+
+// get_ext_name extracts the extension name, and returns a flag indicating
+// whether this file is in a subdirectory or not.
+//
+// extension files can be in subdirectories of the extension store.
+// examples of layout:
+// v14//share//extension/extension_name--1.0.sql,
+// v14//share//extension/extension_name/extension_name--1.0.sql,
+// v14//share//extension/extension_name/extra_data.csv
+// Note: we *assume* that the extension files are in one of these formats.
+// If they are not, this code's behavior is *undefined*.
+fn get_ext_name(path: &str) -> Result<(&str, bool)> {
+    let path_suffix: Vec<&str> = path.split(&format!("{SHARE_EXT_PATH}/")).collect();
+    let ext_name = path_suffix.last().expect("bad ext name");
+
+    if let Some(index) = ext_name.find('/') {
+        return Ok((&ext_name[..index], true));
+    } else if let Some(index) = ext_name.find("--") {
+        return Ok((&ext_name[..index], false));
+    }
+    Ok((ext_name, false))
+}
+
+// helper to collect files of given prefixes for extensions and group them by extension
+// returns a hashmap of (extension_name, Vector of remote paths for all files needed for this extension)
+// and a list of control files
+// For example, an entry in the hashmap could be
+// {"anon": [RemotePath("v14/anon/share/extension/anon/address.csv"),
+//           RemotePath("v14/anon/share/extension/anon/anon--1.1.0.sql")]},
+// with the corresponding list of control files entry being
+// {"anon.control": RemotePath("v14/anon/share/extension/anon.control")}
+async fn organized_extension_files(
+    remote_storage: &GenericRemoteStorage,
+    paths: &Vec<RemotePath>,
+) -> Result<(HashMap<String, Vec<PathAndFlag>>, Vec<RemotePath>)> {
+    let mut grouped_dependencies = HashMap::new();
+    let mut control_files = Vec::new();
+
+    for file in list_all_files(remote_storage, paths).await? {
+        if file.extension().context("bad file name")? == "control" {
+            control_files.push(file.to_owned());
+        } else {
+            let (file_ext_name, subdir_flag) =
+                get_ext_name(file.get_path().to_str().context("invalid path")?)?;
+            let ext_file_list = grouped_dependencies
+                .entry(file_ext_name.to_string())
+                .or_insert(Vec::new());
+            ext_file_list.push(PathAndFlag {
+                path: file.to_owned(),
+                subdir_flag,
+            });
+        }
+    }
+    Ok((grouped_dependencies, control_files))
+}
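The classification rule in the new get_ext_name is worth spelling out: a '/' after the share/extension/ prefix means the file sits in a per-extension subdirectory, while '--' marks a versioned file in the root. A standalone copy of the logic, for illustration only:

    const SHARE_EXT_PATH: &str = "share/extension";

    fn get_ext_name(path: &str) -> (&str, bool) {
        let path_suffix: Vec<&str> = path.split(&format!("{SHARE_EXT_PATH}/")).collect();
        let ext_name: &str = path_suffix.last().expect("bad ext name");
        if let Some(index) = ext_name.find('/') {
            return (&ext_name[..index], true); // file lives in a subdirectory
        } else if let Some(index) = ext_name.find("--") {
            return (&ext_name[..index], false); // versioned file in the root
        }
        (ext_name, false)
    }

    fn main() {
        assert_eq!(get_ext_name("v14/share/extension/anon--1.1.0.sql"), ("anon", false));
        assert_eq!(get_ext_name("v14/share/extension/anon/address.csv"), ("anon", true));
    }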
@@ -491,7 +491,7 @@ impl Endpoint {
             pageserver_connstring: Some(pageserver_connstring),
             safekeeper_connstrings,
             storage_auth_token: auth_token.clone(),
-            // This is a hack to test custom extensions locally.
+            // TODO FIXME: This is a hack to test custom extensions locally.
             // In test_download_extensions, we assume that the custom extension
             // prefix is the tenant ID. So we set it here.
             //
@@ -53,14 +53,14 @@ neon_download_extension_file_http(const char *filename, bool is_library)
 	}
 
 	compute_ctl_url = psprintf("http://localhost:%d/extension_server/%s%s",
-							   extension_server_port, filename, is_library?"?is_library=true":"");
+							   extension_server_port, filename, is_library ? "?is_library=true" : "");
 
 	elog(LOG, "Sending request to compute_ctl: %s", compute_ctl_url);
 
 	curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "POST");
 	curl_easy_setopt(curl, CURLOPT_URL, compute_ctl_url);
-	curl_easy_setopt(curl, CURLOPT_TIMEOUT, 3L /* seconds */);
+	// NOTE: 15L may be insufficient time for large extensions like postgis
+	curl_easy_setopt(curl, CURLOPT_TIMEOUT, 15L /* seconds */);
 
 	if (curl)
 	{
@@ -22,7 +22,7 @@ comment = 'This is a mock extension'
 default_version = '1.0'
 module_pathname = '$libdir/test_ext{i}'
 relocatable = true"""
-    return output
+    return BytesIO(bytes(output, "utf-8"))
 
 
 def sql_file_content():
@@ -33,7 +33,11 @@ def sql_file_content():
     IMMUTABLE
     RETURNS NULL ON NULL INPUT;
     """
-    return output
+    return BytesIO(bytes(output, "utf-8"))
+
+
+def fake_library_content():
+    return BytesIO(bytes("\n111\n", "utf-8"))
 
 
 # Prepare some mock extension files and upload them to the bucket
@@ -47,9 +51,10 @@ def prepare_mock_ext_storage(
 ):
     bucket_prefix = ext_remote_storage.prefix_in_bucket
     custom_prefix = str(tenant_id)
-    PUB_EXT_ROOT = f"v{pg_version}/share/postgresql/extension"
-    PRIVATE_EXT_ROOT = f"v{pg_version}/{custom_prefix}/share/postgresql/extension"
-    LOCAL_EXT_ROOT = f"pg_install/{PUB_EXT_ROOT}"
+
+    PUB_EXT_ROOT = f"v{pg_version}/share/extension"
+    PRIVATE_EXT_ROOT = f"v{pg_version}/{custom_prefix}/share/extension"
+    LOCAL_EXT_ROOT = f"pg_install/{pg_version}/share/postgresql/extension"
 
     PUB_LIB_ROOT = f"v{pg_version}/lib"
     PRIVATE_LIB_ROOT = f"v{pg_version}/{custom_prefix}/lib"
@@ -70,56 +75,53 @@ def prepare_mock_ext_storage(
 
     # Upload several test_ext{i}.control files to the bucket
     for i in range(NUM_EXT):
-        public_ext = BytesIO(bytes(control_file_content("public", i), "utf-8"))
         public_remote_name = f"{bucket_prefix}/{PUB_EXT_ROOT}/test_ext{i}.control"
-        public_local_name = f"{LOCAL_EXT_ROOT}/test_ext{i}.control"
-        custom_ext = BytesIO(bytes(control_file_content(str(tenant_id), i), "utf-8"))
         custom_remote_name = f"{bucket_prefix}/{PRIVATE_EXT_ROOT}/custom_ext{i}.control"
-        custom_local_name = f"{LOCAL_EXT_ROOT}/custom_ext{i}.control"
-        cleanup_files += [public_local_name, custom_local_name]
+        cleanup_files += [
+            f"{LOCAL_EXT_ROOT}/test_ext{i}.control",
+            f"{LOCAL_EXT_ROOT}/custom_ext{i}.control",
+        ]
 
         log.info(f"Uploading control file to {public_remote_name}")
         remote_storage_client.upload_fileobj(
-            public_ext, ext_remote_storage.bucket_name, public_remote_name
+            control_file_content("public", i), ext_remote_storage.bucket_name, public_remote_name
        )
         log.info(f"Uploading control file to {custom_remote_name}")
         remote_storage_client.upload_fileobj(
-            custom_ext, ext_remote_storage.bucket_name, custom_remote_name
+            control_file_content(str(tenant_id), i),
+            ext_remote_storage.bucket_name,
+            custom_remote_name,
         )
 
     # Upload SQL file for the extension we're going to create
     sql_filename = "test_ext0--1.0.sql"
     test_sql_public_remote_path = f"{bucket_prefix}/{PUB_EXT_ROOT}/{sql_filename}"
-    test_sql_local_path = f"{LOCAL_EXT_ROOT}/{sql_filename}"
-    test_ext_sql_file = BytesIO(bytes(sql_file_content(), "utf-8"))
     remote_storage_client.upload_fileobj(
-        test_ext_sql_file,
+        sql_file_content(),
         ext_remote_storage.bucket_name,
         test_sql_public_remote_path,
     )
-    cleanup_files += [test_sql_local_path]
+    cleanup_files += [f"{LOCAL_EXT_ROOT}/{sql_filename}"]
 
     # upload some fake library files
     for i in range(2):
-        public_library = BytesIO(bytes("\n111\n", "utf-8"))
         public_remote_name = f"{bucket_prefix}/{PUB_LIB_ROOT}/test_lib{i}.so"
-        public_local_name = f"{LOCAL_LIB_ROOT}/test_lib{i}.so"
-        custom_library = BytesIO(bytes("\n111\n", "utf-8"))
         custom_remote_name = f"{bucket_prefix}/{PRIVATE_LIB_ROOT}/custom_lib{i}.so"
-        custom_local_name = f"{LOCAL_LIB_ROOT}/custom_lib{i}.so"
 
-        log.info(f"uploading library to {public_remote_name}")
-        log.info(f"uploading library to {custom_remote_name}")
-
+        log.info(f"uploading fake library to {public_remote_name}")
         remote_storage_client.upload_fileobj(
-            public_library,
+            fake_library_content(),
             ext_remote_storage.bucket_name,
             public_remote_name,
         )
+
+        log.info(f"uploading fake library to {custom_remote_name}")
         remote_storage_client.upload_fileobj(
-            custom_library,
+            fake_library_content(),
             ext_remote_storage.bucket_name,
             custom_remote_name,
         )
-        cleanup_files += [public_local_name, custom_local_name]
+        cleanup_files += [f"{LOCAL_LIB_ROOT}/test_lib{i}.so", f"{LOCAL_LIB_ROOT}/custom_lib{i}.so"]
 
     return cleanup_files
@@ -136,7 +138,8 @@ def prepare_mock_ext_storage(
 # export AWS_SECURITY_TOKEN='test'
 # export AWS_SESSION_TOKEN='test'
 # export AWS_DEFAULT_REGION='us-east-1'
+#
 
 
 @pytest.mark.parametrize("remote_storage_kind", available_s3_storages())
 def test_remote_extensions(
     neon_env_builder: NeonEnvBuilder,