rework all the logic for the zip file organization scheme

This commit is contained in:
Alek Westover
2023-07-11 11:28:52 -04:00
parent 7bcd06b2f7
commit 4f6edae2ad
5 changed files with 97 additions and 464 deletions

View File

@@ -198,7 +198,7 @@ fn main() -> Result<()> {
state: Mutex::new(new_state),
state_changed: Condvar::new(),
ext_remote_storage,
available_libraries: OnceLock::new(),
// available_libraries: OnceLock::new(),
available_extensions: OnceLock::new(),
};
let compute = Arc::new(compute_node);

View File

@@ -20,7 +20,6 @@ use compute_api::spec::{ComputeMode, ComputeSpec};
use remote_storage::{GenericRemoteStorage, RemotePath};
use crate::extension_server::PathAndFlag;
use crate::pg_helpers::*;
use crate::spec::*;
use crate::{config, extension_server};
@@ -54,8 +53,8 @@ pub struct ComputeNode {
/// the S3 bucket that we search for extensions in
pub ext_remote_storage: Option<GenericRemoteStorage>,
// cached lists of available extensions and libraries
pub available_libraries: OnceLock<HashMap<String, Vec<RemotePath>>>,
pub available_extensions: OnceLock<HashMap<String, Vec<PathAndFlag>>>,
// pub available_libraries: OnceLock<HashMap<String, Vec<RemotePath>>>,
pub available_extensions: OnceLock<HashMap<String, RemotePath>>,
}
#[derive(Clone, Debug)]
@@ -534,16 +533,15 @@ impl ComputeNode {
// This part is sync, because we need to download
// remote shared_preload_libraries before postgres start (if any)
let library_load_start_time = Utc::now();
{
self.prepare_extenal_libraries(&compute_state)?;
let library_load_start_time = Utc::now();
self.prepare_preload_libraries(&compute_state)?;
let library_load_time = Utc::now()
.signed_duration_since(library_load_start_time)
.to_std()
.unwrap()
.as_millis() as u64;
let mut state = self.state.lock().unwrap();
state.metrics.load_libraries_ms = library_load_time;
info!(
@@ -689,86 +687,6 @@ LIMIT 100",
}
}
// If remote extension storage is configured,
// download shared preload libraries.
#[tokio::main]
pub async fn prepare_extenal_libraries(&self, compute_state: &ComputeState) -> Result<()> {
if let Some(ref ext_remote_storage) = self.ext_remote_storage {
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
// download preload shared libraries before postgres start (if any)
let spec = &pspec.spec;
// 1. parse custom extension paths from spec
let custom_ext_prefixes = match &spec.custom_extensions {
Some(custom_extensions) => custom_extensions.clone(),
None => Vec::new(),
};
info!("custom_ext_prefixes: {:?}", &custom_ext_prefixes);
// parse shared_preload_libraries from spec
let mut libs_vec = Vec::new();
if let Some(libs) = spec.cluster.settings.find("shared_preload_libraries") {
libs_vec = libs
.split(&[',', '\'', ' '])
.filter(|s| *s != "neon" && !s.is_empty())
.map(str::to_string)
.collect();
}
info!(
"shared_preload_libraries parsed from spec.cluster.settings: {:?}",
libs_vec
);
// also parse shared_preload_libraries from provided postgresql.conf
// that is used in neon_local and python tests
if let Some(conf) = &spec.cluster.postgresql_conf {
let conf_lines = conf.split('\n').collect::<Vec<&str>>();
let mut shared_preload_libraries_line = "";
for line in conf_lines {
if line.starts_with("shared_preload_libraries") {
shared_preload_libraries_line = line;
}
}
let mut preload_libs_vec = Vec::new();
if let Some(libs) = shared_preload_libraries_line.split("='").nth(1) {
preload_libs_vec = libs
.split(&[',', '\'', ' '])
.filter(|s| *s != "neon" && !s.is_empty())
.map(str::to_string)
.collect();
}
info!(
"shared_preload_libraries parsed from spec.cluster.postgresql_conf: {:?}",
preload_libs_vec
);
libs_vec.extend(preload_libs_vec);
}
info!("Libraries to download: {:?}", &libs_vec);
// download shared_preload_libraries
let available_libraries = extension_server::get_available_libraries(
ext_remote_storage,
&self.pgbin,
&self.pgversion,
&custom_ext_prefixes,
&libs_vec,
)
.await?;
self.available_libraries
.set(available_libraries)
.expect("available_libraries.set error");
}
Ok(())
}
// If remote extension storage is configured,
// download extension control files
#[tokio::main]
@@ -801,37 +719,29 @@ LIMIT 100",
Ok(())
}
pub async fn download_extension_files(&self, filename: String) -> Result<()> {
pub async fn download_extension(&self, ext_name: &str) -> Result<()> {
match &self.ext_remote_storage {
None => anyhow::bail!("No remote extension storage"),
Some(remote_storage) => {
extension_server::download_extension_files(
&filename,
remote_storage,
&self.pgbin,
extension_server::download_extension(
ext_name,
self.available_extensions
.get()
.context("available_extensions broke")?,
.context("extension download error")?
.get(ext_name)
.context("cannot find extension")?,
remote_storage,
&self.pgbin,
)
.await
}
}
}
pub async fn download_library_file(&self, filename: String) -> Result<()> {
match &self.ext_remote_storage {
None => anyhow::bail!("No remote extension storage"),
Some(remote_storage) => {
extension_server::download_library_file(
&filename,
remote_storage,
&self.pgbin,
self.available_libraries
.get()
.context("available_libraries broke")?,
)
.await
}
}
#[tokio::main]
pub async fn prepare_preload_libraries(&self, compute_state: &ComputeState) -> Result<()> {
// TODO: revive some of the old logic for downloading shared preload libraries
info!("ERRRRRORRRR");
Ok(())
}
}

View File

@@ -1,30 +1,20 @@
// Download extension files from the extension store
// and put them in the right place in the postgres directory
use crate::compute::ComputeNode;
use anyhow::{self, bail, Context, Result};
use futures::future::{join_all, Remote};
use anyhow::{self, bail, Result};
use remote_storage::*;
use serde_json::{self, Value};
use std::collections::HashMap;
use std::fs::File;
use std::io::{BufWriter, Write};
use std::io::BufWriter;
use std::io::Write;
use std::num::{NonZeroU32, NonZeroUsize};
use std::path::{Path, PathBuf};
use std::path::Path;
use std::str;
use std::sync::Arc;
use std::thread;
use tokio::io::AsyncReadExt;
use tracing::info;
// remote!
const SHARE_EXT_PATH: &str = "share/extension";
fn pass_any_error(results: Vec<Result<()>>) -> Result<()> {
for result in results {
result?;
}
Ok(())
}
use tracing::{info, warn};
fn get_pg_config(argument: &str, pgbin: &str) -> String {
// gives the result of `pg_config [argument]`
@@ -52,233 +42,93 @@ pub fn get_pg_version(pgbin: &str) -> String {
panic!("Unsuported postgres version {human_version}");
}
async fn download_helper(
remote_storage: &GenericRemoteStorage,
remote_from_path: RemotePath,
sub_directory: Option<&str>,
download_location: &Path,
) -> anyhow::Result<()> {
// downloads file at remote_from_path to
// `download_location/[optional: subdirectory]/[remote_storage.object_name()]`
// Note: the subdirectory option is needed when there is an extension that
// depends on files in a subdirectory.
// For example, v14/share/extension/some_ext.control
// might depend on v14/share/extension/some_ext/some_ext--1.1.0.sql
// and v14/share/extension/some_ext/xxx.csv
// Note: it is the caller's responsibility to create the appropriate subdirectory
let local_path = match sub_directory {
Some(subdir) => download_location
.join(subdir)
.join(remote_from_path.object_name().expect("bad object")),
None => download_location.join(remote_from_path.object_name().expect("bad object")),
};
if local_path.exists() {
info!("File {:?} already exists. Skipping download", &local_path);
return Ok(());
}
info!(
"Downloading {:?} to location {:?}",
&remote_from_path, &local_path
);
let mut download = remote_storage.download(&remote_from_path).await?;
let mut write_data_buffer = Vec::new();
download
.download_stream
.read_to_end(&mut write_data_buffer)
.await?;
let mut output_file = BufWriter::new(File::create(local_path)?);
output_file.write_all(&write_data_buffer)?;
info!("Download {:?} completed successfully", &remote_from_path);
Ok(())
}
// download extension control files
//
// if custom_ext_prefixes is provided - search also in custom extension paths
//
pub async fn get_available_extensions(
remote_storage: &GenericRemoteStorage,
pgbin: &str,
pg_version: &str,
custom_ext_prefixes: &Vec<String>,
) -> anyhow::Result<Vec<RemotePath>> {
) -> Result<HashMap<String, RemotePath>> {
let local_sharedir = Path::new(&get_pg_config("--sharedir", pgbin)).join("extension");
// public path, plus any private paths to download extensions from
let mut paths: Vec<RemotePath> = Vec::new();
paths.push(RemotePath::new(&Path::new(pg_version).join("public"))?);
for custom_prefix in custom_ext_prefixes {
paths.push(RemotePath::new(&Path::new(pg_version).join(custom_prefix))?);
}
let (control_files, zip_files) = organized_extension_files(remote_storage, &paths).await?;
let mut control_file_download_tasks = Vec::new();
// download all control files
for control_file in control_files {
control_file_download_tasks.push(download_helper(
remote_storage,
control_file.clone(),
None,
&local_sharedir,
));
}
pass_any_error(join_all(control_file_download_tasks).await)?;
Ok(zip_files)
}
let index_path = RemotePath::new(Path::new(&format!("{:?}/ext_index.json", pg_version)))
.expect("error forming path");
let mut download = remote_storage.download(&index_path).await?;
let mut write_data_buffer = Vec::new();
download
.download_stream
.read_to_end(&mut write_data_buffer)
.await?;
let ext_index_str =
serde_json::to_string(&write_data_buffer).expect("Failed to convert to JSON");
let ext_index_full = match serde_json::from_str(&ext_index_str) {
Ok(Value::Object(map)) => map,
_ => bail!("error parsing json"),
};
pub async fn download_extensions(
remote_storage: &GenericRemoteStorage,
pgbin: &str,
pg_version: &str,
custom_ext_prefixes: &Vec<String>,
) {
// OK I was just going to download everything, but that seems wrong.
}
let mut prefixes = vec!["public".to_string()];
prefixes.extend(custom_ext_prefixes.clone());
let mut ext_index_limited = HashMap::new();
for prefix in prefixes {
let ext_details_str = ext_index_full.get(&prefix);
if let Some(ext_details_str) = ext_details_str {
let ext_details =
serde_json::to_string(ext_details_str).expect("Failed to convert to JSON");
let ext_details = match serde_json::from_str(&ext_details) {
Ok(Value::Object(map)) => map,
_ => bail!("error parsing json"),
};
let control_contents = match ext_details.get("control").expect("broken json file") {
Value::String(s) => s,
_ => bail!("broken json file"),
};
let path = RemotePath::new(Path::new(&format!(
"{:?}/{:?}",
pg_version,
ext_details.get("path")
)))
.expect("error forming path");
// Download requested shared_preload_libraries
//
// Note that tenant_id is not optional here, because we only download libraries
// after we know the tenant spec and the tenant_id.
//
// return list of all library files to use it in the future searches
pub async fn get_available_libraries(
remote_storage: &GenericRemoteStorage,
pgbin: &str,
pg_version: &str,
custom_ext_prefixes: &Vec<String>,
preload_libraries: &Vec<String>,
) -> anyhow::Result<HashMap<String, Vec<RemotePath>>> {
// Construct a hashmap of all available libraries
// example (key, value) pair: test_lib0: [RemotePath(v14/lib/test_lib0.so), RemotePath(v14/lib/test_lib0.so.3)]
let mut paths: Vec<RemotePath> = Vec::new();
// public libraries
paths.push(
RemotePath::new(&Path::new(&pg_version).join("lib/"))
.expect("The hard coded path here is valid"),
);
// custom libraries
for custom_prefix in custom_ext_prefixes {
paths.push(
RemotePath::new(&Path::new(&pg_version).join(custom_prefix).join("lib"))
.expect("The hard coded path here is valid"),
);
}
let all_available_libraries = organized_library_files(remote_storage, &paths).await?;
let control_path = format!("{:?}/{:?}.control", &local_sharedir, &prefix);
std::fs::write(control_path, &control_contents)?;
info!("list of library files {:?}", &all_available_libraries);
// download all requested libraries
let mut download_tasks = Vec::new();
for lib_name in preload_libraries {
download_tasks.push(download_library_file(
lib_name,
remote_storage,
pgbin,
&all_available_libraries,
));
ext_index_limited.insert(prefix, path);
} else {
warn!("BAD PREFIX {:?}", prefix);
}
}
pass_any_error(join_all(download_tasks).await)?;
Ok(all_available_libraries)
Ok(ext_index_limited)
}
// download all sqlfiles (and possibly data files) for a given extension name
//
pub async fn download_extension_files(
pub async fn download_extension(
ext_name: &str,
ext_path: &RemotePath,
remote_storage: &GenericRemoteStorage,
pgbin: &str,
all_available_files: &HashMap<String, Vec<PathAndFlag>>,
) -> Result<()> {
let local_sharedir = Path::new(&get_pg_config("--sharedir", pgbin)).join("extension");
let mut downloaded_something = false;
let mut made_subdir = false;
let local_libdir = Path::new(&get_pg_config("--libdir", pgbin)).to_owned();
info!("Start downloading extension {:?}", ext_name);
let mut download = remote_storage.download(&ext_path).await?;
let mut write_data_buffer = Vec::new();
download
.download_stream
.read_to_end(&mut write_data_buffer)
.await?;
let zip_name = ext_path.object_name().expect("invalid extension path");
let mut output_file = BufWriter::new(File::create(zip_name)?);
output_file.write_all(&write_data_buffer)?;
info!("Download {:?} completed successfully", &ext_path);
info!("Unzipping extension {:?}", zip_name);
info!("EXTENSION {:?}", ext_name);
info!("{:?}", all_available_files.get(ext_name));
// TODO unzip and place files in appropriate locations
info!("unzip {zip_name:?}");
info!("place extension files in {local_sharedir:?}");
info!("place library files in {local_libdir:?}");
info!("start download");
let mut download_tasks = Vec::new();
if let Some(files) = all_available_files.get(ext_name) {
info!("Downloading files for extension {:?}", &ext_name);
for path_and_flag in files {
let file = &path_and_flag.path;
let subdir_flag = path_and_flag.subdir_flag;
info!(
"--- Downloading {:?} (for {:?} as subdir? = {:?})",
&file, &ext_name, subdir_flag
);
let mut subdir = None;
if subdir_flag {
subdir = Some(ext_name);
if !made_subdir {
made_subdir = true;
std::fs::create_dir_all(local_sharedir.join(ext_name))?;
}
}
download_tasks.push(download_helper(
remote_storage,
file.clone(),
subdir,
&local_sharedir,
));
downloaded_something = true;
}
}
if !downloaded_something {
bail!("Files for extension {ext_name} are not found in the extension store");
}
pass_any_error(join_all(download_tasks).await)?;
info!("finish download");
Ok(())
}
// appends an .so suffix to libname if it does not already have one
fn enforce_so_end(libname: &str) -> String {
if !libname.contains(".so") {
format!("{}.so", libname)
} else {
libname.to_string()
}
}
// download shared library file
pub async fn download_library_file(
lib_name: &str,
remote_storage: &GenericRemoteStorage,
pgbin: &str,
all_available_libraries: &HashMap<String, Vec<RemotePath>>,
) -> Result<()> {
let lib_name = get_library_name(lib_name);
let local_libdir: PathBuf = Path::new(&get_pg_config("--pkglibdir", pgbin)).into();
info!("looking for library {:?}", &lib_name);
match all_available_libraries.get(&*lib_name) {
Some(remote_paths) => {
let mut library_download_tasks = Vec::new();
for remote_path in remote_paths {
let file_path = local_libdir.join(remote_path.object_name().expect("bad object"));
if file_path.exists() {
info!("File {:?} already exists. Skipping download", &file_path);
} else {
library_download_tasks.push(download_helper(
remote_storage,
remote_path.clone(),
None,
&local_libdir,
));
}
}
pass_any_error(join_all(library_download_tasks).await)?;
}
None => {
// minor TODO: this logic seems to be somewhat faulty for .so.3 type files?
let lib_name_with_ext = enforce_so_end(&lib_name);
let file_path = local_libdir.join(lib_name_with_ext);
if file_path.exists() {
info!("File {:?} already exists. Skipping download", &file_path);
} else {
bail!("Library file {lib_name} not found")
}
}
}
Ok(())
}
@@ -307,7 +157,8 @@ pub fn init_remote_storage(
_ => Some(default_prefix.to_string()),
};
// load will not be large, so default parameters are fine
// TODO: is this a valid assumption? some extensions are quite large
// load should not be large, so default parameters are fine
let config = S3Config {
bucket_name: remote_ext_bucket.to_string(),
bucket_region: remote_ext_region.to_string(),
@@ -324,103 +175,6 @@ pub fn init_remote_storage(
GenericRemoteStorage::from_config(&config)
}
fn get_library_name(path: &str) -> String {
let path_suffix: Vec<&str> = path.split('/').collect();
let path_suffix = path_suffix.last().expect("bad ext name").to_string();
if let Some(index) = path_suffix.find(".so") {
return path_suffix[..index].to_string();
}
path_suffix
}
// asynchronously lists files in all necessary directories
// TODO: potential optimization: do a single list files on the entire bucket
// and then filter out the files we don't need
async fn list_all_files(
remote_storage: &GenericRemoteStorage,
paths: &Vec<RemotePath>,
) -> Result<Vec<RemotePath>> {
let mut list_tasks = Vec::new();
let mut all_files = Vec::new();
for path in paths {
list_tasks.push(remote_storage.list_files(Some(path)));
}
for list_result in join_all(list_tasks).await {
all_files.extend(list_result?);
}
Ok(all_files)
}
// helper to collect all libraries, grouped by library name
// Returns a hashmap of (library name: [paths]})
// example entry: {libpgtypes: [libpgtypes.so.3, libpgtypes.so]}
async fn organized_library_files(
remote_storage: &GenericRemoteStorage,
paths: &Vec<RemotePath>,
) -> Result<HashMap<String, Vec<RemotePath>>> {
let mut library_groups = HashMap::new();
for file in list_all_files(remote_storage, paths).await? {
let lib_name = get_library_name(file.get_path().to_str().context("invalid path")?);
let lib_list = library_groups.entry(lib_name).or_insert(Vec::new());
lib_list.push(file.to_owned());
}
Ok(library_groups)
}
// store a path, paired with a flag indicating whether the path is to a file in
// the root or subdirectory
#[derive(Debug)]
pub struct PathAndFlag {
path: RemotePath,
subdir_flag: bool,
}
// get_ext_name extracts the extension name, and returns a flag indicating
// whether this file is in a subdirectory or not.
//
// extension files can be in subdirectories of the extension store.
// examples of layout:
// v14//share//extension/extension_name--1.0.sql,
// v14//share//extension/extension_name/extension_name--1.0.sql,
// v14//share//extension/extension_name/extra_data.csv
// Note: we *assume* that the extension file is in one of these formats.
// If it is not, this code's behavior is *undefined*.
fn get_ext_name(path: &str) -> Result<(&str, bool)> {
let path_suffix: Vec<&str> = path.split(&format!("{SHARE_EXT_PATH}/")).collect();
let ext_name = path_suffix.last().expect("bad ext name");
if let Some(index) = ext_name.find('/') {
return Ok((&ext_name[..index], true));
} else if let Some(index) = ext_name.find("--") {
return Ok((&ext_name[..index], false));
}
Ok((ext_name, false))
}
// list files in all given directories and find the zip / control file in them
async fn organized_extension_files(
remote_storage: &GenericRemoteStorage,
paths: &Vec<RemotePath>,
) -> Result<(Vec<RemotePath>, Vec<RemotePath>)> {
let mut control_files = Vec::new();
let mut zip_files = Vec::new();
let mut list_file_tasks = Vec::new();
for path in paths {
list_file_tasks.push(remote_storage.list_files(Some(path)));
}
for list_result in join_all(list_file_tasks).await {
for file in list_result? {
if file.extension().expect("bad file name") == "control" {
control_files.push(file.to_owned());
} else {
zip_files.push(file.to_owned());
}
}
}
Ok((control_files, zip_files))
}
pub fn launch_download_extensions(
compute: &Arc<ComputeNode>,
) -> Result<thread::JoinHandle<()>, std::io::Error> {

View File

@@ -125,47 +125,19 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
(&Method::POST, route) if route.starts_with("/extension_server/") => {
info!("serving {:?} POST request", route);
info!("req.uri {:?}", req.uri());
let mut is_library = false;
if let Some(params) = req.uri().query() {
info!("serving {:?} POST request with params: {}", route, params);
if params == "is_library=true" {
is_library = true;
} else {
let mut resp = Response::new(Body::from("Wrong request parameters"));
*resp.status_mut() = StatusCode::BAD_REQUEST;
return resp;
}
}
let filename = route.split('/').last().unwrap().to_string();
info!(
"serving /extension_server POST request, filename: {:?} is_library: {}",
filename, is_library
"serving /extension_server POST request, filename: {:?}",
&filename
);
if is_library {
match compute.download_library_file(filename.to_string()).await {
Ok(_) => Response::new(Body::from("OK")),
Err(e) => {
error!("library download failed: {}", e);
let mut resp = Response::new(Body::from(e.to_string()));
*resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
resp
}
}
} else {
match compute.download_extension_files(filename.to_string()).await {
Ok(_) => Response::new(Body::from("OK")),
Err(e) => {
error!("extension download failed: {}", e);
let mut resp = Response::new(Body::from(e.to_string()));
*resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
resp
}
match compute.download_extension(&filename).await {
Ok(_) => Response::new(Body::from("OK")),
Err(e) => {
error!("extension download failed: {}", e);
let mut resp = Response::new(Body::from(e.to_string()));
*resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
resp
}
}
}

View File

@@ -1,17 +1,14 @@
import json
from contextlib import closing
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnvBuilder,
PgBin,
RemoteStorageKind,
available_s3_storages,
)
from fixtures.pg_version import PgVersion
# Generate mock extension files and upload them to the mock bucket.
#
# NOTE: You must have appropriate AWS credentials to run REAL_S3 test.