mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-28 18:40:38 +00:00
Compare commits
8 Commits
jcsp/shard
...
sasha_dont
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6c632f490f | ||
|
|
2a9ffa4686 | ||
|
|
3afe386d86 | ||
|
|
7fab731f65 | ||
|
|
483caa22c6 | ||
|
|
da5e03b0d8 | ||
|
|
be885370f6 | ||
|
|
bc1020f965 |
@@ -274,7 +274,13 @@ fn main() -> Result<()> {
|
||||
let mut state = compute.state.lock().unwrap();
|
||||
state.error = Some(format!("{:?}", err));
|
||||
state.status = ComputeStatus::Failed;
|
||||
drop(state);
|
||||
// Notify others that Postgres failed to start. In case of configuring the
|
||||
// empty compute, it's likely that API handler is still waiting for compute
|
||||
// state change. With this we will notify it that compute is in Failed state,
|
||||
// so control plane will know about it earlier and record proper error instead
|
||||
// of timeout.
|
||||
compute.state_changed.notify_all();
|
||||
drop(state); // unlock
|
||||
delay_exit = true;
|
||||
None
|
||||
}
|
||||
|
||||
@@ -709,6 +709,26 @@ async fn tenant_detach_handler(
|
||||
json_response(StatusCode::OK, ())
|
||||
}
|
||||
|
||||
async fn tenant_reset_handler(
|
||||
request: Request<Body>,
|
||||
_cancel: CancellationToken,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
|
||||
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
|
||||
|
||||
let drop_cache: Option<bool> = parse_query_param(&request, "drop_cache")?;
|
||||
|
||||
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
|
||||
let state = get_state(&request);
|
||||
state
|
||||
.tenant_manager
|
||||
.reset_tenant(tenant_shard_id, drop_cache.unwrap_or(false), ctx)
|
||||
.await
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
|
||||
json_response(StatusCode::OK, ())
|
||||
}
|
||||
|
||||
async fn tenant_load_handler(
|
||||
mut request: Request<Body>,
|
||||
_cancel: CancellationToken,
|
||||
@@ -1828,6 +1848,9 @@ pub fn make_router(
|
||||
.post("/v1/tenant/:tenant_id/detach", |r| {
|
||||
api_handler(r, tenant_detach_handler)
|
||||
})
|
||||
.post("/v1/tenant/:tenant_shard_id/reset", |r| {
|
||||
api_handler(r, tenant_reset_handler)
|
||||
})
|
||||
.post("/v1/tenant/:tenant_id/load", |r| {
|
||||
api_handler(r, tenant_load_handler)
|
||||
})
|
||||
|
||||
@@ -205,7 +205,7 @@ async fn timed<Fut: std::future::Future>(
|
||||
match tokio::time::timeout(warn_at, &mut fut).await {
|
||||
Ok(ret) => {
|
||||
tracing::info!(
|
||||
task = name,
|
||||
stage = name,
|
||||
elapsed_ms = started.elapsed().as_millis(),
|
||||
"completed"
|
||||
);
|
||||
@@ -213,7 +213,7 @@ async fn timed<Fut: std::future::Future>(
|
||||
}
|
||||
Err(_) => {
|
||||
tracing::info!(
|
||||
task = name,
|
||||
stage = name,
|
||||
elapsed_ms = started.elapsed().as_millis(),
|
||||
"still waiting, taking longer than expected..."
|
||||
);
|
||||
@@ -222,7 +222,7 @@ async fn timed<Fut: std::future::Future>(
|
||||
|
||||
// this has a global allowed_errors
|
||||
tracing::warn!(
|
||||
task = name,
|
||||
stage = name,
|
||||
elapsed_ms = started.elapsed().as_millis(),
|
||||
"completed, took longer than expected"
|
||||
);
|
||||
|
||||
@@ -2515,7 +2515,7 @@ impl Tenant {
|
||||
}
|
||||
}
|
||||
|
||||
info!("persisting tenantconf to {config_path}");
|
||||
debug!("persisting tenantconf to {config_path}");
|
||||
|
||||
let mut conf_content = r#"# This file contains a specific per-tenant's config.
|
||||
# It is read in case of pageserver restart.
|
||||
@@ -2550,7 +2550,7 @@ impl Tenant {
|
||||
target_config_path: &Utf8Path,
|
||||
tenant_conf: &TenantConfOpt,
|
||||
) -> anyhow::Result<()> {
|
||||
info!("persisting tenantconf to {target_config_path}");
|
||||
debug!("persisting tenantconf to {target_config_path}");
|
||||
|
||||
let mut conf_content = r#"# This file contains a specific per-tenant's config.
|
||||
# It is read in case of pageserver restart.
|
||||
|
||||
@@ -270,49 +270,6 @@ async fn safe_rename_tenant_dir(path: impl AsRef<Utf8Path>) -> std::io::Result<U
|
||||
static TENANTS: Lazy<std::sync::RwLock<TenantsMap>> =
|
||||
Lazy::new(|| std::sync::RwLock::new(TenantsMap::Initializing));
|
||||
|
||||
/// Create a directory, including parents. This does no fsyncs and makes
|
||||
/// no guarantees about the persistence of the resulting metadata: for
|
||||
/// use when creating dirs for use as cache.
|
||||
async fn unsafe_create_dir_all(path: &Utf8PathBuf) -> std::io::Result<()> {
|
||||
let mut dirs_to_create = Vec::new();
|
||||
let mut path: &Utf8Path = path.as_ref();
|
||||
|
||||
// Figure out which directories we need to create.
|
||||
loop {
|
||||
let meta = tokio::fs::metadata(path).await;
|
||||
match meta {
|
||||
Ok(metadata) if metadata.is_dir() => break,
|
||||
Ok(_) => {
|
||||
return Err(std::io::Error::new(
|
||||
std::io::ErrorKind::AlreadyExists,
|
||||
format!("non-directory found in path: {path}"),
|
||||
));
|
||||
}
|
||||
Err(ref e) if e.kind() == std::io::ErrorKind::NotFound => {}
|
||||
Err(e) => return Err(e),
|
||||
}
|
||||
|
||||
dirs_to_create.push(path);
|
||||
|
||||
match path.parent() {
|
||||
Some(parent) => path = parent,
|
||||
None => {
|
||||
return Err(std::io::Error::new(
|
||||
std::io::ErrorKind::InvalidInput,
|
||||
format!("can't find parent of path '{path}'"),
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Create directories from parent to child.
|
||||
for &path in dirs_to_create.iter().rev() {
|
||||
tokio::fs::create_dir(path).await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// The TenantManager is responsible for storing and mutating the collection of all tenants
|
||||
/// that this pageserver process has state for. Every Tenant and SecondaryTenant instance
|
||||
/// lives inside the TenantManager.
|
||||
@@ -646,7 +603,13 @@ pub(crate) fn tenant_spawn(
|
||||
"Cannot load tenant, ignore mark found at {tenant_ignore_mark:?}"
|
||||
);
|
||||
|
||||
info!("Attaching tenant {tenant_shard_id}");
|
||||
info!(
|
||||
tenant_id = %tenant_shard_id.tenant_id,
|
||||
shard_id = %tenant_shard_id.shard_slug(),
|
||||
generation = ?location_conf.location.generation,
|
||||
attach_mode = ?location_conf.location.attach_mode,
|
||||
"Attaching tenant"
|
||||
);
|
||||
let tenant = match Tenant::spawn(
|
||||
conf,
|
||||
tenant_shard_id,
|
||||
@@ -1035,7 +998,7 @@ impl TenantManager {
|
||||
LocationMode::Secondary(_) => {
|
||||
// Directory doesn't need to be fsync'd because if we crash it can
|
||||
// safely be recreated next time this tenant location is configured.
|
||||
unsafe_create_dir_all(&tenant_path)
|
||||
tokio::fs::create_dir_all(&tenant_path)
|
||||
.await
|
||||
.with_context(|| format!("Creating {tenant_path}"))?;
|
||||
|
||||
@@ -1051,7 +1014,7 @@ impl TenantManager {
|
||||
// Directory doesn't need to be fsync'd because we do not depend on
|
||||
// it to exist after crashes: it may be recreated when tenant is
|
||||
// re-attached, see https://github.com/neondatabase/neon/issues/5550
|
||||
unsafe_create_dir_all(&timelines_path)
|
||||
tokio::fs::create_dir_all(&tenant_path)
|
||||
.await
|
||||
.with_context(|| format!("Creating {timelines_path}"))?;
|
||||
|
||||
@@ -1081,6 +1044,81 @@ impl TenantManager {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Resetting a tenant is equivalent to detaching it, then attaching it again with the same
|
||||
/// LocationConf that was last used to attach it. Optionally, the local file cache may be
|
||||
/// dropped before re-attaching.
|
||||
///
|
||||
/// This is not part of a tenant's normal lifecycle: it is used for debug/support, in situations
|
||||
/// where an issue is identified that would go away with a restart of the tenant.
|
||||
///
|
||||
/// This does not have any special "force" shutdown of a tenant: it relies on the tenant's tasks
|
||||
/// to respect the cancellation tokens used in normal shutdown().
|
||||
#[instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), %drop_cache))]
|
||||
pub(crate) async fn reset_tenant(
|
||||
&self,
|
||||
tenant_shard_id: TenantShardId,
|
||||
drop_cache: bool,
|
||||
ctx: RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
let mut slot_guard = tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?;
|
||||
let Some(old_slot) = slot_guard.get_old_value() else {
|
||||
anyhow::bail!("Tenant not found when trying to reset");
|
||||
};
|
||||
|
||||
let Some(tenant) = old_slot.get_attached() else {
|
||||
slot_guard.revert();
|
||||
anyhow::bail!("Tenant is not in attached state");
|
||||
};
|
||||
|
||||
let (_guard, progress) = utils::completion::channel();
|
||||
match tenant.shutdown(progress, false).await {
|
||||
Ok(()) => {
|
||||
slot_guard.drop_old_value()?;
|
||||
}
|
||||
Err(_barrier) => {
|
||||
slot_guard.revert();
|
||||
anyhow::bail!("Cannot reset Tenant, already shutting down");
|
||||
}
|
||||
}
|
||||
|
||||
let tenant_path = self.conf.tenant_path(&tenant_shard_id);
|
||||
let timelines_path = self.conf.timelines_path(&tenant_shard_id);
|
||||
let config = Tenant::load_tenant_config(self.conf, &tenant_shard_id)?;
|
||||
|
||||
if drop_cache {
|
||||
tracing::info!("Dropping local file cache");
|
||||
|
||||
match tokio::fs::read_dir(&timelines_path).await {
|
||||
Err(e) => {
|
||||
tracing::warn!("Failed to list timelines while dropping cache: {}", e);
|
||||
}
|
||||
Ok(mut entries) => {
|
||||
while let Some(entry) = entries.next_entry().await? {
|
||||
tokio::fs::remove_dir_all(entry.path()).await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let shard_identity = config.shard;
|
||||
let tenant = tenant_spawn(
|
||||
self.conf,
|
||||
tenant_shard_id,
|
||||
&tenant_path,
|
||||
self.resources.clone(),
|
||||
AttachedTenantConf::try_from(config)?,
|
||||
shard_identity,
|
||||
None,
|
||||
self.tenants,
|
||||
SpawnMode::Normal,
|
||||
&ctx,
|
||||
)?;
|
||||
|
||||
slot_guard.upsert(TenantSlot::Attached(tenant))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
@@ -1289,8 +1327,7 @@ pub(crate) async fn delete_tenant(
|
||||
// See https://github.com/neondatabase/neon/issues/5080
|
||||
|
||||
// TODO(sharding): make delete API sharding-aware
|
||||
let mut slot_guard =
|
||||
tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::MustExist)?;
|
||||
let slot_guard = tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::MustExist)?;
|
||||
|
||||
// unwrap is safe because we used MustExist mode when acquiring
|
||||
let tenant = match slot_guard.get_old_value().as_ref().unwrap() {
|
||||
@@ -1617,9 +1654,10 @@ pub enum TenantSlotUpsertError {
|
||||
MapState(#[from] TenantMapError),
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
enum TenantSlotDropError {
|
||||
/// It is only legal to drop a TenantSlot if its contents are fully shut down
|
||||
#[error("Tenant was not shut down")]
|
||||
NotShutdown,
|
||||
}
|
||||
|
||||
@@ -1679,9 +1717,9 @@ impl SlotGuard {
|
||||
}
|
||||
}
|
||||
|
||||
/// Take any value that was present in the slot before we acquired ownership
|
||||
/// Get any value that was present in the slot before we acquired ownership
|
||||
/// of it: in state transitions, this will be the old state.
|
||||
fn get_old_value(&mut self) -> &Option<TenantSlot> {
|
||||
fn get_old_value(&self) -> &Option<TenantSlot> {
|
||||
&self.old_value
|
||||
}
|
||||
|
||||
|
||||
@@ -19,7 +19,6 @@
|
||||
#include "access/xlog.h"
|
||||
#include "access/xlogutils.h"
|
||||
#include "storage/buf_internals.h"
|
||||
#include "storage/lwlock.h"
|
||||
#include "storage/ipc.h"
|
||||
#include "storage/pg_shmem.h"
|
||||
#include "c.h"
|
||||
@@ -37,6 +36,8 @@
|
||||
#include "walproposer.h"
|
||||
#include "neon_utils.h"
|
||||
|
||||
#include <pthread.h>
|
||||
|
||||
#define PageStoreTrace DEBUG5
|
||||
|
||||
#define RECONNECT_INTERVAL_USEC 1000000
|
||||
@@ -69,7 +70,7 @@ int max_reconnect_attempts = 60;
|
||||
|
||||
typedef struct
|
||||
{
|
||||
LWLockId lock;
|
||||
pthread_rwlock_t lock;
|
||||
pg_atomic_uint64 update_counter;
|
||||
char pageserver_connstring[MAX_PAGESERVER_CONNSTRING_SIZE];
|
||||
} PagestoreShmemState;
|
||||
@@ -105,10 +106,10 @@ AssignPageserverConnstring(const char *newval, void *extra)
|
||||
{
|
||||
if(!PagestoreShmemIsValid())
|
||||
return;
|
||||
LWLockAcquire(pagestore_shared->lock, LW_EXCLUSIVE);
|
||||
pthread_rwlock_wrlock(&pagestore_shared->lock);
|
||||
strlcpy(pagestore_shared->pageserver_connstring, newval, MAX_PAGESERVER_CONNSTRING_SIZE);
|
||||
pg_atomic_fetch_add_u64(&pagestore_shared->update_counter, 1);
|
||||
LWLockRelease(pagestore_shared->lock);
|
||||
pthread_rwlock_unlock(&pagestore_shared->lock);
|
||||
}
|
||||
|
||||
static bool
|
||||
@@ -119,15 +120,24 @@ CheckConnstringUpdated()
|
||||
return pagestore_local_counter < pg_atomic_read_u64(&pagestore_shared->update_counter);
|
||||
}
|
||||
|
||||
static void
|
||||
/* Returns true if the connstring has changed and false if not */
|
||||
static bool
|
||||
ReloadConnstring()
|
||||
{
|
||||
if(!PagestoreShmemIsValid())
|
||||
return;
|
||||
LWLockAcquire(pagestore_shared->lock, LW_SHARED);
|
||||
return false;
|
||||
pthread_rwlock_rdlock(&pagestore_shared->lock);
|
||||
|
||||
if(strcmp(local_pageserver_connstring, pagestore_shared->pageserver_connstring) == 0)
|
||||
{
|
||||
pthread_rwlock_unlock(&pagestore_shared->lock);
|
||||
return false;
|
||||
}
|
||||
|
||||
strlcpy(local_pageserver_connstring, pagestore_shared->pageserver_connstring, sizeof(local_pageserver_connstring));
|
||||
pagestore_local_counter = pg_atomic_read_u64(&pagestore_shared->update_counter);
|
||||
LWLockRelease(pagestore_shared->lock);
|
||||
pthread_rwlock_unlock(&pagestore_shared->lock);
|
||||
return true;
|
||||
}
|
||||
|
||||
static bool
|
||||
@@ -290,7 +300,6 @@ pageserver_disconnect(void)
|
||||
*/
|
||||
if (connected)
|
||||
{
|
||||
neon_log(LOG, "dropping connection to page server due to error");
|
||||
PQfinish(pageserver_conn);
|
||||
pageserver_conn = NULL;
|
||||
connected = false;
|
||||
@@ -311,8 +320,12 @@ pageserver_send(NeonRequest * request)
|
||||
|
||||
if(CheckConnstringUpdated())
|
||||
{
|
||||
pageserver_disconnect();
|
||||
ReloadConnstring();
|
||||
bool should_disconnect = ReloadConnstring();
|
||||
if(should_disconnect)
|
||||
{
|
||||
neon_log(LOG, "pageserver_send disconnect because connstring changed");
|
||||
pageserver_disconnect();
|
||||
}
|
||||
}
|
||||
|
||||
/* If the connection was lost for some reason, reconnect */
|
||||
@@ -484,7 +497,12 @@ PagestoreShmemInit(void)
|
||||
&found);
|
||||
if(!found)
|
||||
{
|
||||
pagestore_shared->lock = &(GetNamedLWLockTranche("neon_libpagestore")->lock);
|
||||
pthread_rwlockattr_t attr;
|
||||
pthread_rwlockattr_init(&attr);
|
||||
pthread_rwlockattr_setpshared(&attr, PTHREAD_PROCESS_SHARED);
|
||||
pthread_rwlock_init(&pagestore_shared->lock, &attr);
|
||||
pthread_rwlockattr_destroy(&attr);
|
||||
|
||||
pg_atomic_init_u64(&pagestore_shared->update_counter, 0);
|
||||
AssignPageserverConnstring(page_server_connstring, NULL);
|
||||
}
|
||||
|
||||
@@ -59,6 +59,7 @@
|
||||
#include "replication/walsender.h"
|
||||
#include "storage/bufmgr.h"
|
||||
#include "storage/buf_internals.h"
|
||||
#include "storage/fsm_internals.h"
|
||||
#include "storage/smgr.h"
|
||||
#include "storage/md.h"
|
||||
#include "pgstat.h"
|
||||
@@ -2722,6 +2723,86 @@ smgr_init_neon(void)
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
neon_extend_rel_size(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno, XLogRecPtr end_recptr)
|
||||
{
|
||||
BlockNumber relsize;
|
||||
/* Extend the relation if we know its size */
|
||||
if (get_cached_relsize(rinfo, forknum, &relsize))
|
||||
{
|
||||
if (relsize < blkno + 1)
|
||||
{
|
||||
update_cached_relsize(rinfo, forknum, blkno + 1);
|
||||
SetLastWrittenLSNForRelation(end_recptr, rinfo, forknum);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
/*
|
||||
* Size was not cached. We populate the cache now, with the size of the
|
||||
* relation measured after this WAL record is applied.
|
||||
*
|
||||
* This length is later reused when we open the smgr to read the block,
|
||||
* which is fine and expected.
|
||||
*/
|
||||
|
||||
NeonResponse *response;
|
||||
NeonNblocksResponse *nbresponse;
|
||||
NeonNblocksRequest request = {
|
||||
.req = (NeonRequest) {
|
||||
.lsn = end_recptr,
|
||||
.latest = false,
|
||||
.tag = T_NeonNblocksRequest,
|
||||
},
|
||||
.rinfo = rinfo,
|
||||
.forknum = forknum,
|
||||
};
|
||||
|
||||
response = page_server_request(&request);
|
||||
|
||||
Assert(response->tag == T_NeonNblocksResponse);
|
||||
nbresponse = (NeonNblocksResponse *) response;
|
||||
|
||||
relsize = Max(nbresponse->n_blocks, blkno+1);
|
||||
|
||||
set_cached_relsize(rinfo, forknum, relsize);
|
||||
SetLastWrittenLSNForRelation(end_recptr, rinfo, forknum);
|
||||
|
||||
elog(SmgrTrace, "Set length to %d", relsize);
|
||||
}
|
||||
}
|
||||
|
||||
#define FSM_TREE_DEPTH ((SlotsPerFSMPage >= 1626) ? 3 : 4)
|
||||
|
||||
/*
|
||||
* TODO: May be it is better to make correspondent fgunctio from freespace.c public?
|
||||
*/
|
||||
static BlockNumber
|
||||
get_fsm_physical_block(BlockNumber heapblk)
|
||||
{
|
||||
BlockNumber pages;
|
||||
int leafno;
|
||||
int l;
|
||||
|
||||
/*
|
||||
* Calculate the logical page number of the first leaf page below the
|
||||
* given page.
|
||||
*/
|
||||
leafno = heapblk / SlotsPerFSMPage;
|
||||
|
||||
/* Count upper level nodes required to address the leaf page */
|
||||
pages = 0;
|
||||
for (l = 0; l < FSM_TREE_DEPTH; l++)
|
||||
{
|
||||
pages += leafno + 1;
|
||||
leafno /= SlotsPerFSMPage;
|
||||
}
|
||||
|
||||
/* Turn the page count into 0-based block number */
|
||||
return pages - 1;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Return whether we can skip the redo for this block.
|
||||
*
|
||||
@@ -2769,7 +2850,6 @@ neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id)
|
||||
LWLock *partitionLock;
|
||||
Buffer buffer;
|
||||
bool no_redo_needed;
|
||||
BlockNumber relsize;
|
||||
|
||||
if (old_redo_read_buffer_filter && old_redo_read_buffer_filter(record, block_id))
|
||||
return true;
|
||||
@@ -2819,49 +2899,10 @@ neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id)
|
||||
|
||||
LWLockRelease(partitionLock);
|
||||
|
||||
/* Extend the relation if we know its size */
|
||||
if (get_cached_relsize(rinfo, forknum, &relsize))
|
||||
neon_extend_rel_size(rinfo, forknum, blkno, end_recptr);
|
||||
if (forknum == MAIN_FORKNUM)
|
||||
{
|
||||
if (relsize < blkno + 1)
|
||||
{
|
||||
update_cached_relsize(rinfo, forknum, blkno + 1);
|
||||
SetLastWrittenLSNForRelation(end_recptr, rinfo, forknum);
|
||||
}
|
||||
neon_extend_rel_size(rinfo, FSM_FORKNUM, get_fsm_physical_block(blkno), end_recptr);
|
||||
}
|
||||
else
|
||||
{
|
||||
/*
|
||||
* Size was not cached. We populate the cache now, with the size of the
|
||||
* relation measured after this WAL record is applied.
|
||||
*
|
||||
* This length is later reused when we open the smgr to read the block,
|
||||
* which is fine and expected.
|
||||
*/
|
||||
|
||||
NeonResponse *response;
|
||||
NeonNblocksResponse *nbresponse;
|
||||
NeonNblocksRequest request = {
|
||||
.req = (NeonRequest) {
|
||||
.lsn = end_recptr,
|
||||
.latest = false,
|
||||
.tag = T_NeonNblocksRequest,
|
||||
},
|
||||
.rinfo = rinfo,
|
||||
.forknum = forknum,
|
||||
};
|
||||
|
||||
response = page_server_request(&request);
|
||||
|
||||
Assert(response->tag == T_NeonNblocksResponse);
|
||||
nbresponse = (NeonNblocksResponse *) response;
|
||||
|
||||
Assert(nbresponse->n_blocks > blkno);
|
||||
|
||||
set_cached_relsize(rinfo, forknum, nbresponse->n_blocks);
|
||||
SetLastWrittenLSNForRelation(end_recptr, rinfo, forknum);
|
||||
|
||||
elog(SmgrTrace, "Set length to %d", nbresponse->n_blocks);
|
||||
}
|
||||
|
||||
return no_redo_needed;
|
||||
}
|
||||
|
||||
@@ -260,6 +260,14 @@ class PageserverHttpClient(requests.Session):
|
||||
res = self.post(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/detach", params=params)
|
||||
self.verbose_error(res)
|
||||
|
||||
def tenant_reset(self, tenant_id: TenantId, drop_cache: bool):
|
||||
params = {}
|
||||
if drop_cache:
|
||||
params["drop_cache"] = "true"
|
||||
|
||||
res = self.post(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/reset", params=params)
|
||||
self.verbose_error(res)
|
||||
|
||||
def tenant_delete(self, tenant_id: TenantId):
|
||||
res = self.delete(f"http://localhost:{self.port}/v1/tenant/{tenant_id}")
|
||||
self.verbose_error(res)
|
||||
|
||||
@@ -60,7 +60,10 @@ def test_change_pageserver(neon_env_builder: NeonEnvBuilder):
|
||||
execute("SELECT count(*) FROM foo")
|
||||
assert fetchone() == (100000,)
|
||||
|
||||
endpoint.reconfigure(pageserver_id=alt_pageserver_id)
|
||||
# Reconfigure it using the same connstring just to make sure nothing breaks
|
||||
# as we have special handling for if the connstring doesn't change
|
||||
for _ in range(5):
|
||||
endpoint.reconfigure(pageserver_id=alt_pageserver_id)
|
||||
|
||||
# Verify that the neon.pageserver_connstring GUC is set to the correct thing
|
||||
execute("SELECT setting FROM pg_settings WHERE name='neon.pageserver_connstring'")
|
||||
|
||||
29
test_runner/regress/test_physical_replication.py
Normal file
29
test_runner/regress/test_physical_replication.py
Normal file
@@ -0,0 +1,29 @@
|
||||
import random
|
||||
import time
|
||||
|
||||
from fixtures.neon_fixtures import NeonEnv
|
||||
|
||||
|
||||
def test_physical_replication(neon_simple_env: NeonEnv):
|
||||
env = neon_simple_env
|
||||
n_records = 100000
|
||||
with env.endpoints.create_start(
|
||||
branch_name="main",
|
||||
endpoint_id="primary",
|
||||
) as primary:
|
||||
with primary.connect() as p_con:
|
||||
with p_con.cursor() as p_cur:
|
||||
p_cur.execute(
|
||||
"CREATE TABLE t(pk bigint primary key, payload text default repeat('?',200))"
|
||||
)
|
||||
time.sleep(1)
|
||||
with env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary") as secondary:
|
||||
with primary.connect() as p_con:
|
||||
with p_con.cursor() as p_cur:
|
||||
with secondary.connect() as s_con:
|
||||
with s_con.cursor() as s_cur:
|
||||
for pk in range(n_records):
|
||||
p_cur.execute("insert into t (pk) values (%s)", (pk,))
|
||||
s_cur.execute(
|
||||
"select * from t where pk=%s", (random.randrange(1, n_records),)
|
||||
)
|
||||
@@ -1,4 +1,5 @@
|
||||
import asyncio
|
||||
import enum
|
||||
import random
|
||||
import time
|
||||
from threading import Thread
|
||||
@@ -51,11 +52,20 @@ def do_gc_target(
|
||||
log.info("gc http thread returning")
|
||||
|
||||
|
||||
class ReattachMode(str, enum.Enum):
|
||||
REATTACH_EXPLICIT = "explicit"
|
||||
REATTACH_RESET = "reset"
|
||||
REATTACH_RESET_DROP = "reset"
|
||||
|
||||
|
||||
# Basic detach and re-attach test
|
||||
@pytest.mark.parametrize("remote_storage_kind", available_remote_storages())
|
||||
@pytest.mark.parametrize(
|
||||
"mode",
|
||||
[ReattachMode.REATTACH_EXPLICIT, ReattachMode.REATTACH_RESET, ReattachMode.REATTACH_RESET_DROP],
|
||||
)
|
||||
def test_tenant_reattach(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
remote_storage_kind: RemoteStorageKind,
|
||||
neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind, mode: str
|
||||
):
|
||||
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
|
||||
|
||||
@@ -100,8 +110,15 @@ def test_tenant_reattach(
|
||||
ps_metrics.query_one("pageserver_last_record_lsn", filter=tenant_metric_filter).value
|
||||
)
|
||||
|
||||
pageserver_http.tenant_detach(tenant_id)
|
||||
pageserver_http.tenant_attach(tenant_id)
|
||||
if mode == ReattachMode.REATTACH_EXPLICIT:
|
||||
# Explicitly detach then attach the tenant as two separate API calls
|
||||
pageserver_http.tenant_detach(tenant_id)
|
||||
pageserver_http.tenant_attach(tenant_id)
|
||||
elif mode in (ReattachMode.REATTACH_RESET, ReattachMode.REATTACH_RESET_DROP):
|
||||
# Use the reset API to detach/attach in one shot
|
||||
pageserver_http.tenant_reset(tenant_id, mode == ReattachMode.REATTACH_RESET_DROP)
|
||||
else:
|
||||
raise NotImplementedError(mode)
|
||||
|
||||
time.sleep(1) # for metrics propagation
|
||||
|
||||
|
||||
Reference in New Issue
Block a user