cache extensions

This commit is contained in:
Alek Westover
2023-06-28 14:51:55 -04:00
parent 0875c2284c
commit a6c9a4abe7
3 changed files with 71 additions and 74 deletions

View File

@@ -193,6 +193,7 @@ fn main() -> Result<()> {
state_changed: Condvar::new(),
ext_remote_storage,
available_libraries: Mutex::new(HashMap::new()),
available_extensions: Mutex::new(HashMap::new()),
};
let compute = Arc::new(compute_node);

View File

@@ -53,6 +53,7 @@ pub struct ComputeNode {
/// S3 extensions configuration variables
pub ext_remote_storage: Option<GenericRemoteStorage>,
pub available_libraries: Mutex<HashMap<String, RemotePath>>,
pub available_extensions: Mutex<HashMap<String, Vec<RemotePath>>>,
}
#[derive(Clone, Debug)]
@@ -733,7 +734,8 @@ LIMIT 100",
// download extension control files & shared_preload_libraries
extension_server::get_available_extensions(
let mut available_extensions_lock = self.available_extensions.lock().unwrap();
*available_extensions_lock = extension_server::get_available_extensions(
ext_remote_storage,
&self.pgbin,
&self.pgversion,
@@ -769,13 +771,12 @@ LIMIT 100",
};
info!("private_ext_prefixes: {:?}", &private_ext_prefixes);
let available_extensions_lock = self.available_extensions.lock().unwrap().clone();
extension_server::download_extension_sql_files(
&filename,
remote_storage,
&self.pgbin,
&self.pgversion,
&private_ext_prefixes,
&available_extensions_lock,
)
.await
}

View File

@@ -1,6 +1,6 @@
// Download extension files from the extension store
// and put them in the right place in the postgres directory
use anyhow::{self, bail, Result};
use anyhow::{self, bail, Context, Result};
use remote_storage::*;
use serde_json::{self, Value};
use std::collections::HashMap;
@@ -12,6 +12,8 @@ use std::str;
use tokio::io::AsyncReadExt;
use tracing::info;
const SHARE_EXT_PATH: &str = "share/postgresql/extension";
fn get_pg_config(argument: &str, pgbin: &str) -> String {
// gives the result of `pg_config [argument]`
// where argument is a flag like `--version` or `--sharedir`
@@ -81,7 +83,6 @@ async fn download_helper(
.download_stream
.read_to_end(&mut write_data_buffer)
.await?;
//dbg!(str::from_utf8(&write_data_buffer)?);
if remote_from_prefix.is_some() {
if let Some(prefix) = local_path.parent() {
info!(
@@ -106,24 +107,24 @@ pub async fn get_available_extensions(
pgbin: &str,
pg_version: &str,
private_ext_prefixes: &Vec<String>,
) -> anyhow::Result<()> {
) -> anyhow::Result<HashMap<String, Vec<RemotePath>>> {
let local_sharedir = Path::new(&get_pg_config("--sharedir", pgbin)).join("extension");
let mut paths: Vec<RemotePath> = Vec::new();
// public extensions
paths.push(RemotePath::new(
&Path::new(pg_version).join("share/postgresql/extension"),
&Path::new(pg_version).join(SHARE_EXT_PATH),
)?);
// private extensions
for private_prefix in private_ext_prefixes {
paths.push(RemotePath::new(
&Path::new(pg_version)
.join(private_prefix)
.join("share/postgresql/extension"),
.join(SHARE_EXT_PATH),
)?);
}
let all_available_files = list_files_in_prefixes(remote_storage, &paths).await?;
let all_available_files = list_files_in_prefixes_for_extensions(remote_storage, &paths).await?;
info!(
"list of available_extension files {:?}",
@@ -131,13 +132,15 @@ pub async fn get_available_extensions(
);
// download all control files
for (obj_name, obj_path) in &all_available_files {
if obj_name.ends_with("control") {
download_helper(remote_storage, obj_path, None, &local_sharedir).await?;
for (obj_name, obj_paths) in &all_available_files {
for obj_path in obj_paths {
if obj_name.ends_with("control") {
download_helper(remote_storage, obj_path, None, &local_sharedir).await?;
}
}
}
Ok(())
Ok(all_available_files)
}
// Download requested shared_preload_libraries
@@ -201,74 +204,22 @@ pub async fn download_extension_sql_files(
ext_name: &str,
remote_storage: &GenericRemoteStorage,
pgbin: &str,
pg_version: &str,
private_ext_prefixes: &Vec<String>,
all_available_files: &HashMap<String, Vec<RemotePath>>,
) -> Result<()> {
let local_sharedir = Path::new(&get_pg_config("--sharedir", pgbin)).join("extension");
let mut downloaded_something = false;
let mut paths: Vec<RemotePath> = Vec::new();
// public extensions
paths.push(RemotePath::new(
&Path::new(&pg_version).join("share/postgresql/extension"),
)?);
// private extensions
for private_prefix in private_ext_prefixes {
paths.push(RemotePath::new(
&Path::new(&pg_version)
.join(private_prefix)
.join("share/postgresql/extension"),
)?);
}
let all_available_files: HashMap<String, RemotePath> =
list_files_in_prefixes(remote_storage, &paths).await?;
info!(
"list of available_extension files {:?}",
&all_available_files
);
// check if extension files exist
let mut files_to_download: Vec<(&RemotePath, Option<&Path>)> = Vec::new();
for (obj_name, obj_path) in &all_available_files {
// ignore control files
if !obj_name.ends_with("control") {
// We can't use just ext.object_name() here
// because extension files can be in subdirectories of the extension store.
// examples of layout:
//
// share/postgresql/extension/extension_name--1.0.sql
//
// or
//
// share/postgresql/extension/extension_name/extension_name--1.0.sql
// share/postgresql/extension/extension_name/extra_data.csv
//
for prefix in paths.iter() {
if let Ok(full_object_name) = obj_path.get_path().strip_prefix(prefix.get_path()) {
if full_object_name.to_str().unwrap().starts_with(ext_name) {
files_to_download.push((obj_path, Some(prefix.get_path())));
}
}
if let Some(files) = all_available_files.get(ext_name) {
for file in files {
if file.extension().context("bad file name")? != "control" {
downloaded_something = true;
download_helper(remote_storage, file, None, &local_sharedir).await?;
}
}
}
if files_to_download.is_empty() {
if !downloaded_something {
bail!("Files for extension {ext_name} are not found in the extension store");
}
for (remote_from_path, remote_from_prefix) in files_to_download {
download_helper(
remote_storage,
remote_from_path,
remote_from_prefix,
&local_sharedir,
)
.await?;
}
Ok(())
}
@@ -355,3 +306,47 @@ async fn list_files_in_prefixes(
Ok(res)
}
// helper to extract extension name
// extension files can be in subdirectories of the extension store.
// examples of layout:
//
// share/postgresql/extension/extension_name--1.0.sql
//
// or
//
// share/postgresql/extension/extension_name/extension_name--1.0.sql
// share/postgresql/extension/extension_name/extra_data.csv
//
// Note: we **assume** that the extension files is in one of these formats.
// If it is not, this code will not download it.
fn get_ext_name(path: &str) -> Result<&str> {
let path_suffix: Vec<&str> = path.split(&format!("{SHARE_EXT_PATH}/")).collect();
let path_suffix = path_suffix.last().expect("bad ext name");
for index in ["--", "/"] {
if let Some(index) = path_suffix.find(index) {
return Ok(&path_suffix[..index]);
}
}
Ok(path_suffix)
}
// helper to collect files of given prefixes for extensions
// and group them by extension
// returns a hashmap of (extension_name, Vector of remote paths for all files needed for this extension)
async fn list_files_in_prefixes_for_extensions(
remote_storage: &GenericRemoteStorage,
paths: &Vec<RemotePath>,
) -> Result<HashMap<String, Vec<RemotePath>>> {
let mut result = HashMap::new();
for path in paths {
for file in remote_storage.list_files(Some(&path)).await? {
let file_ext_name = get_ext_name(file.get_path().to_str().context("invalid path")?)?;
let ext_file_list = result
.entry(file_ext_name.to_string())
.or_insert(Vec::new());
ext_file_list.push(file.to_owned());
}
}
Ok(result)
}