real s3 tests

This commit is contained in:
Alek Westover
2023-06-23 13:41:08 -04:00
8 changed files with 285 additions and 104 deletions

View File

@@ -49,7 +49,7 @@ use compute_api::responses::ComputeStatus;
use compute_tools::compute::{ComputeNode, ComputeState, ParsedSpec};
use compute_tools::configurator::launch_configurator;
use compute_tools::extension_server::{download_extension, init_remote_storage, ExtensionType};
use compute_tools::extension_server::{get_availiable_extensions, init_remote_storage};
use compute_tools::http::api::launch_http_server;
use compute_tools::logger::*;
use compute_tools::monitor::launch_monitor;
@@ -76,12 +76,12 @@ fn main() -> Result<()> {
None => None,
};
let rt0 = Runtime::new().unwrap();
rt0.block_on(async {
download_extension(&ext_remote_storage, ExtensionType::Shared, pgbin)
.await
.expect("download shared extensions should work");
});
// let rt0 = Runtime::new().unwrap();
// rt0.block_on(async {
// download_extension(&ext_remote_storage, ExtensionType::Shared, pgbin)
// .await
// .expect("download shared extensions should work");
// });
let http_port = *matches
.get_one::<u16>("http-port")
@@ -200,7 +200,9 @@ fn main() -> Result<()> {
live_config_allowed,
state: Mutex::new(new_state),
state_changed: Condvar::new(),
ext_remote_storage: ext_remote_storage.clone(),
ext_remote_storage,
availiable_extensions: Vec::new(),
availiable_libraries: Vec::new(),
};
let compute = Arc::new(compute_node);
@@ -211,6 +213,19 @@ fn main() -> Result<()> {
let extension_server_port: u16 = http_port;
// even before we have spec, we can get public availiable extensions
// TODO maybe convert get_availiable_extensions into ComputeNode method as well as other functions
let rt = Runtime::new().unwrap();
let copy_remote_storage = compute.ext_remote_storage.clone();
if let Some(remote_storage) = copy_remote_storage {
rt.block_on(async move {
get_availiable_extensions(&remote_storage, pgbin, None)
.await
.unwrap();
});
}
if !spec_set {
// No spec provided, hang waiting for it.
info!("no compute spec provided, waiting");
@@ -259,6 +274,17 @@ fn main() -> Result<()> {
let _configurator_handle =
launch_configurator(&compute).expect("cannot launch configurator thread");
// download private tenant extensions before postgres start
// TODO
// compute_node.availiable_extensions = get_availiable_extensions(ext_remote_storage,
// pg_version, //TODO
// pgbin,
// tenant_id); //TODO get tenant_id from spec
// download preload shared libraries before postgres start (if any)
// TODO
// download_library_file();
// Start Postgres
let mut delay_exit = false;
let mut exit_code = None;

View File

@@ -16,11 +16,11 @@ use utils::lsn::Lsn;
use compute_api::responses::{ComputeMetrics, ComputeStatus};
use compute_api::spec::{ComputeMode, ComputeSpec};
use remote_storage::GenericRemoteStorage;
use remote_storage::{GenericRemoteStorage, RemotePath};
use crate::config;
use crate::pg_helpers::*;
use crate::spec::*;
use crate::{config, extension_server};
/// Compute node info shared across several `compute_ctl` threads.
pub struct ComputeNode {
@@ -49,6 +49,8 @@ pub struct ComputeNode {
pub state_changed: Condvar,
/// S3 extensions configuration variables
pub ext_remote_storage: Option<GenericRemoteStorage>,
pub availiable_extensions: Vec<RemotePath>,
pub availiable_libraries: Vec<RemotePath>,
}
#[derive(Clone, Debug)]
@@ -579,4 +581,28 @@ LIMIT 100",
"{{\"pg_stat_statements\": []}}".to_string()
}
}
pub async fn download_extension_sql_files(&self, filename: String) -> Result<()> {
match &self.ext_remote_storage {
None => anyhow::bail!("No remote extension storage"),
Some(remote_storage) => {
extension_server::download_extension_sql_files(
&filename,
&remote_storage,
&self.pgbin,
)
.await
}
}
}
pub async fn download_library_file(&self, filename: String) -> Result<()> {
match &self.ext_remote_storage {
None => anyhow::bail!("No remote extension storage"),
Some(remote_storage) => {
extension_server::download_library_file(&filename, &remote_storage, &self.pgbin)
.await
}
}
}
}

View File

@@ -1,4 +1,4 @@
use anyhow::{self, bail};
use anyhow::{self, bail, Result};
use remote_storage::*;
use serde_json::{self, Value};
use std::fs::File;
@@ -8,6 +8,7 @@ use std::path::{Path, PathBuf};
use std::str;
use tokio::io::AsyncReadExt;
use tracing::info;
use utils::id::TenantId;
fn get_pg_config(argument: &str, pgbin: &str) -> String {
// gives the result of `pg_config [argument]`
@@ -58,60 +59,117 @@ async fn download_helper(
Ok(())
}
pub enum ExtensionType {
Shared, // we just use the public folder here
Tenant(String), // String is tenant_id
Library(String), // String is name of the extension
}
pub async fn download_extension(
remote_storage: &Option<GenericRemoteStorage>,
ext_type: ExtensionType,
// download extension control files
//
// returns the list of all extension files so it can be reused in future searches
//
// if tenant_id is provided - search in a private per-tenant extension path,
// otherwise - in public extension path
//
pub async fn get_availiable_extensions(
remote_storage: &GenericRemoteStorage,
pgbin: &str,
) -> anyhow::Result<()> {
let remote_storage = match remote_storage {
Some(remote_storage) => remote_storage,
None => return Ok(()),
};
tenant_id: Option<TenantId>,
) -> anyhow::Result<Vec<RemotePath>> {
let local_sharedir = Path::new(&get_pg_config("--sharedir", pgbin)).join("extension");
let pg_version = get_pg_version(pgbin);
match ext_type {
ExtensionType::Shared => {
// 1. Download control files from s3-bucket/public/*.control to SHAREDIR/extension
// We can do this step even before we have spec,
// because public extensions are common for all projects.
let local_sharedir = Path::new(&get_pg_config("--sharedir", pgbin)).join("extension");
let remote_sharedir = Path::new(&pg_version).join("share/postgresql/extension");
let remote_sharedir = RemotePath::new(Path::new(&remote_sharedir))?;
let from_paths = remote_storage.list_files(Some(&remote_sharedir)).await?;
for remote_from_path in from_paths {
if remote_from_path.extension() == Some("control") {
download_helper(remote_storage, &remote_from_path, &local_sharedir).await?;
}
}
}
ExtensionType::Tenant(tenant_id) => {
// 2. After we have spec, before project start
// Download control files from s3-bucket/[tenant-id]/*.control to SHAREDIR/extension
let local_sharedir = Path::new(&get_pg_config("--sharedir", pgbin)).join("extension");
let remote_path = RemotePath::new(Path::new(&tenant_id.to_string()))?;
let from_paths = remote_storage.list_files(Some(&remote_path)).await?;
for remote_from_path in from_paths {
if remote_from_path.extension() == Some("control") {
download_helper(remote_storage, &remote_from_path, &local_sharedir).await?;
}
}
}
ExtensionType::Library(library_name) => {
// 3. After we have spec, before postgres start
// Download preload_shared_libraries from s3-bucket/public/[library-name].control into LIBDIR/
let remote_sharedir = match tenant_id {
None => RemotePath::new(&Path::new(&pg_version).join("share/postgresql/extension"))?,
Some(tenant_id) => RemotePath::new(
&Path::new(&pg_version)
.join(&tenant_id.to_string())
.join("share/postgresql/extension"),
)?,
};
let local_libdir: PathBuf = Path::new(&get_pg_config("--libdir", pgbin)).into();
let remote_path = format!("{library_name}.control");
let remote_from_path = RemotePath::new(Path::new(&remote_path))?;
download_helper(remote_storage, &remote_from_path, &local_libdir).await?;
info!(
"get_availiable_extensions remote_sharedir: {:?}, local_sharedir: {:?}",
remote_sharedir, local_sharedir
);
let from_paths = remote_storage.list_files(Some(&remote_sharedir)).await?;
// download all found control files
for remote_from_path in &from_paths {
if remote_from_path.extension() == Some("control") {
download_helper(remote_storage, &remote_from_path, &local_sharedir).await?;
}
}
Ok(from_paths)
}
// download all sql files for a given extension name
//
// Download all SQL files for the extension `ext_name` from the remote
// extension storage into the local SHAREDIR/extension directory.
//
// Errors if no matching `.sql` files exist in the extension store, or if
// listing/downloading from remote storage fails (instead of panicking on
// path construction as before).
pub async fn download_extension_sql_files(
    ext_name: &str,
    remote_storage: &GenericRemoteStorage,
    pgbin: &str,
) -> Result<()> {
    let local_sharedir = Path::new(&get_pg_config("--sharedir", pgbin)).join("extension");
    let pg_version = get_pg_version(pgbin);
    // Remote layout mirrors the Postgres install tree for this major version.
    let remote_sharedir =
        RemotePath::new(&Path::new(&pg_version).join("share/postgresql/extension"))?;

    // TODO cache availiable_extensions list on the first read to avoid unneeded s3 calls
    let availiable_extensions = remote_storage.list_files(Some(&remote_sharedir)).await?;
    info!(
        "list of availiable_extension files {:?}",
        &availiable_extensions
    );

    // Keep only this extension's SQL files (e.g. `test_ext--1.0.sql`).
    // `object_name()` may be None for directory-like paths; skip those
    // instead of panicking.
    let files_to_download: Vec<&RemotePath> = availiable_extensions
        .iter()
        .filter(|ext| {
            ext.extension() == Some("sql")
                && ext
                    .object_name()
                    .map_or(false, |name| name.starts_with(ext_name))
        })
        .collect();
    if files_to_download.is_empty() {
        bail!("Files for extension {ext_name} are not found in the extension store");
    }
    for remote_from_path in files_to_download {
        download_helper(remote_storage, remote_from_path, &local_sharedir).await?;
    }
    Ok(())
}
// download shared library file
// Download the shared library file `lib_name` from the remote extension
// storage into the local Postgres pkglibdir.
//
// Errors if the library is not present in the extension store, or if
// listing/downloading from remote storage fails (instead of panicking on
// path construction or nameless paths as before).
pub async fn download_library_file(
    lib_name: &str,
    remote_storage: &GenericRemoteStorage,
    pgbin: &str,
) -> Result<()> {
    let local_libdir: PathBuf = Path::new(&get_pg_config("--pkglibdir", pgbin)).into();
    let pg_version = get_pg_version(pgbin);
    // Remote layout mirrors the Postgres install tree for this major version.
    let remote_libdir = RemotePath::new(&Path::new(&pg_version).join("lib/"))?;

    // TODO cache availiable_libraries list on the first read to avoid unneeded s3 calls
    let availiable_libraries = remote_storage.list_files(Some(&remote_libdir)).await?;
    info!("list of library files {:?}", &availiable_libraries);

    // Find the requested library; `object_name()` may be None for
    // directory-like paths, which we simply skip rather than panic on.
    let lib = availiable_libraries
        .iter()
        .find(|lib| lib.object_name() == Some(lib_name));
    match lib {
        None => bail!("Shared library file {lib_name} is not found in the extension store"),
        Some(lib) => {
            download_helper(remote_storage, lib, &local_libdir).await?;
        }
    }
    Ok(())
}

View File

@@ -16,7 +16,7 @@ use tokio::task;
use tracing::{error, info};
use tracing_utils::http::OtelName;
use crate::extension_server::{self, ExtensionType};
use crate::extension_server::{download_extension_sql_files, download_library_file};
fn status_response_from_state(state: &ComputeState) -> ComputeStatusResponse {
ComputeStatusResponse {
@@ -126,25 +126,51 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
// download extension files from S3 on demand
(&Method::POST, route) if route.starts_with("/extension_server/") => {
info!("serving {:?} POST request", route);
info!("req.uri {:?}", req.uri());
let mut is_library = false;
if let Some(params) = req.uri().query() {
info!("serving {:?} POST request with params: {}", route, params);
if params == "is_library=true" {
is_library = true;
} else {
let mut resp = Response::new(Body::from("Wrong request parameters"));
*resp.status_mut() = StatusCode::BAD_REQUEST;
return resp;
}
}
let filename = route.split('/').last().unwrap().to_string();
info!(
"serving /extension_server POST request, filename: {:?}",
&filename
"serving /extension_server POST request, filename: {:?} is_library: {}",
filename, is_library
);
match extension_server::download_extension(
&compute.ext_remote_storage,
ExtensionType::Library(filename),
&compute.pgbin,
)
.await
{
Ok(_) => Response::new(Body::from("OK")),
Err(e) => {
error!("download_extension failed: {}", e);
Response::new(Body::from(e.to_string()))
if is_library {
match compute.download_library_file(filename.to_string()).await {
Ok(_) => Response::new(Body::from("OK")),
Err(e) => {
error!("library download failed: {}", e);
let mut resp = Response::new(Body::from(e.to_string()));
*resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
resp
}
}
} else {
match compute
.download_extension_sql_files(filename.to_string())
.await
{
Ok(_) => Response::new(Body::from("OK")),
Err(e) => {
error!("extension download failed: {}", e);
let mut resp = Response::new(Body::from(e.to_string()));
*resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
resp
}
}
}
}

View File

@@ -29,9 +29,13 @@ static int extension_server_port = 0;
static download_extension_file_hook_type prev_download_extension_file_hook = NULL;
// curl -X POST http://localhost:8080/extension_server/postgis-3.so
// to download all SQL files for an extension:
// curl -X POST http://localhost:8080/extension_server/postgis
//
// to download specific library file:
// curl -X POST http://localhost:8080/extension_server/postgis-3.so?=true
static bool
neon_download_extension_file_http(const char *filename)
neon_download_extension_file_http(const char *filename, bool is_library)
{
CURL *curl;
CURLcode res;
@@ -44,7 +48,14 @@ neon_download_extension_file_http(const char *filename)
elog(ERROR, "Failed to initialize curl handle");
}
compute_ctl_url = psprintf("http://localhost:%d/extension_server/%s", extension_server_port, filename);
if (is_library)
{
elog(LOG, "request library");
}
compute_ctl_url = psprintf("http://localhost:%d/extension_server/%s%s",
extension_server_port, filename, is_library?"?is_library=true":"");
elog(LOG, "curl_easy_perform() url: %s", compute_ctl_url);
@@ -52,6 +63,7 @@ neon_download_extension_file_http(const char *filename)
curl_easy_setopt(curl, CURLOPT_URL, compute_ctl_url);
curl_easy_setopt(curl, CURLOPT_TIMEOUT, 3L /* seconds */);
if (curl)
{
/* Perform the request, res will get the return code */

View File

@@ -11,19 +11,15 @@ from fixtures.neon_fixtures import (
)
"""
TODO:
status:
it appears that list_files misbehaves on a non-existing path for both real and mock s3 storage
1. debug real s3 tests: I think the paths were slightly different than I was expecting
2. Make sure it gracefully is sad when tenant is not found
TODO Alek:
Calling list_files on a non-existing path returns [] (expectedly) but then
causes the program to crash somehow (for both real and mock s3 storage)
stderr: command failed: unexpected compute status: Empty
3. clean up the junk I put in the bucket (one time task)
4. can we simultaneously do MOCK and REAL s3 tests, or are the env vars conflicting/
5. libs/remote_storage/src/s3_bucket.rs TODO // TODO: if bucket prefix is empty,
- real s3 tests: I think the paths were slightly different than I was expecting
- clean up the junk I put in the bucket
- libs/remote_storage/src/s3_bucket.rs TODO // TODO: if bucket prefix is empty,
the folder is prefixed with a "/" I think. Is this desired?
6. test LIBRARY extensions: maybe Anastasia already did this?
"""
@@ -87,12 +83,37 @@ def test_file_download(neon_env_builder: NeonEnvBuilder, remote_storage_kind: Re
private_ext, env.ext_remote_storage.bucket_name, private_remote_name
)
# Rust will then download the control files from the bucket
# our rust code should obtain the same result as the following:
# env.remote_storage_client.get_object(
# Bucket=env.ext_remote_storage.bucket_name,
# Key=os.path.join(BUCKET_PREFIX, PUB_EXT_PATHS[0])
# )["Body"].read()
TEST_EXT_SQL_PATH = "v14/share/postgresql/extension/test_ext--1.0.sql"
test_ext_sql_file = BytesIO(
b"""
CREATE FUNCTION test_ext_add(integer, integer) RETURNS integer
AS 'select $1 + $2;'
LANGUAGE SQL
IMMUTABLE
RETURNS NULL ON NULL INPUT;
"""
)
env.remote_storage_client.upload_fileobj(
test_ext_sql_file,
env.ext_remote_storage.bucket_name,
os.path.join(BUCKET_PREFIX, TEST_EXT_SQL_PATH),
)
# upload some fake library file
TEST_LIB_PATH = "v14/lib/test_ext.so"
test_lib_file = BytesIO(
b"""
111
"""
)
env.remote_storage_client.upload_fileobj(
test_lib_file,
env.ext_remote_storage.bucket_name,
os.path.join(BUCKET_PREFIX, TEST_LIB_PATH),
)
tenant, _ = env.neon_cli.create_tenant()
env.neon_cli.create_timeline("test_file_download", tenant_id=tenant)
region = "us-east-1"
if remote_storage_kind == RemoteStorageKind.REAL_S3:
@@ -108,17 +129,13 @@ def test_file_download(neon_env_builder: NeonEnvBuilder, remote_storage_kind: Re
)
endpoint = env.endpoints.create_start(
"test_file_download", tenant_id=tenant_id, remote_ext_config=remote_ext_config
"test_file_download",
tenant_id=tenant,
remote_ext_config=remote_ext_config,
config_lines=["log_min_messages=debug3"],
)
with closing(endpoint.connect()) as conn:
with conn.cursor() as cur:
# example query: insert some values and select them
cur.execute("CREATE TABLE t(key int primary key, value text)")
for i in range(100):
cur.execute(f"insert into t values({i}, {2*i})")
cur.execute("select * from t")
log.info(cur.fetchall())
# Test query: check that test_ext0 was successfully downloaded
cur.execute("SELECT * FROM pg_available_extensions")
all_extensions = [x[0] for x in cur.fetchall()]
@@ -127,8 +144,24 @@ def test_file_download(neon_env_builder: NeonEnvBuilder, remote_storage_kind: Re
assert f"test_ext{i}" in all_extensions
# assert f"private_ext{i}" in all_extensions
# TODO: can create extension actually install an extension?
# cur.execute("CREATE EXTENSION test_ext0")
cur.execute("CREATE EXTENSION test_ext")
cur.execute("SELECT extname FROM pg_extension")
all_extensions = [x[0] for x in cur.fetchall()]
log.info(all_extensions)
assert "test_ext" in all_extensions
try:
cur.execute("LOAD 'test_ext.so'")
except Exception as e:
# expected to fail with
# could not load library ... test_ext.so: file too short
# because test_ext.so is not real library file
log.info("LOAD test_ext.so failed (expectedly): %s", e)
assert "file too short" in str(e)
# TODO add more test cases:
# - try to load non-existing library
# cleanup downloaded extensions
for file in cleanup_files: