Mirror of https://github.com/neondatabase/neon.git (synced 2026-03-05 09:20:38 +00:00)

Compare commits: conrad/pro ... jcsp/storc
12 commits in this comparison:

- 41d546df84
- 792c8ced06
- 0415f28734
- fcc2eb88a1
- 3d6ad0c42e
- acf6c05f7e
- 5df352638f
- d8a6942e0e
- 54d683ae07
- 0526603adb
- bb299f0229
- 6d631ae816
```diff
@@ -686,6 +686,13 @@ impl Service {
         // request in flight over the network: TODO handle that by making location_conf API refuse
         // to go backward in generations.
         }
+
+        tracing::info!(
+            "Responding with {} shards to node {}",
+            response.tenants.len(),
+            reattach_req.node_id
+        );
+
         Ok(response)
     }

```
```diff
@@ -870,14 +870,16 @@ async fn tenant_detach_handler(

     let state = get_state(&request);
     let conf = state.conf;
-    mgr::detach_tenant(
-        conf,
-        tenant_shard_id,
-        detach_ignored.unwrap_or(false),
-        &state.deletion_queue_client,
-    )
-    .instrument(info_span!("tenant_detach", %tenant_id, shard_id=%tenant_shard_id.shard_slug()))
-    .await?;
+    state
+        .tenant_manager
+        .detach_tenant(
+            conf,
+            tenant_shard_id,
+            detach_ignored.unwrap_or(false),
+            &state.deletion_queue_client,
+        )
+        .instrument(info_span!("tenant_detach", %tenant_id, shard_id=%tenant_shard_id.shard_slug()))
+        .await?;

     json_response(StatusCode::OK, ())
 }
```
```diff
@@ -1398,13 +1400,14 @@ async fn put_tenant_location_config_handler(
     // The `Detached` state is special, it doesn't upsert a tenant, it removes
     // its local disk content and drops it from memory.
     if let LocationConfigMode::Detached = request_data.config.mode {
-        if let Err(e) =
-            mgr::detach_tenant(conf, tenant_shard_id, true, &state.deletion_queue_client)
-                .instrument(info_span!("tenant_detach",
-                    tenant_id = %tenant_shard_id.tenant_id,
-                    shard_id = %tenant_shard_id.shard_slug()
-                ))
-                .await
+        if let Err(e) = state
+            .tenant_manager
+            .detach_tenant(conf, tenant_shard_id, true, &state.deletion_queue_client)
+            .instrument(info_span!("tenant_detach",
+                tenant_id = %tenant_shard_id.tenant_id,
+                shard_id = %tenant_shard_id.shard_slug()
+            ))
+            .await
         {
             match e {
                 TenantStateError::SlotError(TenantSlotError::NotFound(_)) => {
```
```diff
@@ -2,6 +2,7 @@
 //! page server.

 use camino::{Utf8DirEntry, Utf8Path, Utf8PathBuf};
+use futures::stream::StreamExt;
 use itertools::Itertools;
 use pageserver_api::key::Key;
 use pageserver_api::models::ShardParameters;
```
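The only change in this hunk is the new `use futures::stream::StreamExt;` import, which is what makes `.collect::<Vec<_>>().await` available on the stream that `resident_layers()` returns later in this series. A minimal, standalone sketch of that pattern (illustrative only, using made-up layer names rather than the pageserver types):

```rust
use futures::executor::block_on;
use futures::stream::{self, StreamExt};

fn main() {
    // A stand-in for a stream of resident layers; the real code streams layer handles.
    let layers = stream::iter(vec!["layer-a", "layer-b", "layer-c"]);

    // `collect` is provided by the StreamExt extension trait: it drives the stream to
    // completion and gathers the items into a collection.
    let collected: Vec<&str> = block_on(layers.collect());
    assert_eq!(collected.len(), 3);
}
```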
```diff
@@ -1439,8 +1440,10 @@ impl TenantManager {
             }
         };

-        // TODO: hardlink layers from the parent into the child shard directories so that they don't immediately re-download
-        // TODO: erase the dentries from the parent
+        // Optimization: hardlink layers from the parent into the children, so that they don't have to
+        // re-download & duplicate the data referenced in their initial IndexPart
+        self.shard_split_hardlink(parent, child_shards.clone())
+            .await?;

         // Take a snapshot of where the parent's WAL ingest had got to: we will wait for
         // child shards to reach this point.
```
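The split path now hard-links the parent's layer files into each child shard's directory instead of letting the children re-download them. A hard link adds a directory entry for an existing inode, so no layer bytes are copied and the cost is independent of file size. Below is a minimal standalone sketch of that fan-out pattern (not the pageserver implementation; the paths and function name are invented for illustration):

```rust
use std::fs;
use std::io;
use std::path::{Path, PathBuf};

/// Hard-link one source file into several destination directories, tolerating links
/// that already exist (e.g. from an earlier, interrupted attempt).
fn fan_out_hard_links(src: &Path, dest_dirs: &[PathBuf]) -> io::Result<()> {
    let file_name = src.file_name().expect("source must name a file");
    for dir in dest_dirs {
        fs::create_dir_all(dir)?;
        match fs::hard_link(src, dir.join(file_name)) {
            Ok(()) => {}
            Err(e) if e.kind() == io::ErrorKind::AlreadyExists => {}
            Err(e) => return Err(e),
        }
    }
    Ok(())
}

fn main() -> io::Result<()> {
    let base = std::env::temp_dir().join("hardlink-demo");
    fs::create_dir_all(&base)?;
    let src = base.join("layer.bin");
    fs::write(&src, b"layer bytes")?;

    // Both "children" now reference the same on-disk data as the source file.
    fan_out_hard_links(&src, &[base.join("child-0"), base.join("child-1")])?;
    Ok(())
}
```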
```diff
@@ -1479,10 +1482,11 @@ impl TenantManager {

         // Phase 4: wait for child chards WAL ingest to catch up to target LSN
         for child_shard_id in &child_shards {
+            let child_shard_id = *child_shard_id;
             let child_shard = {
                 let locked = TENANTS.read().unwrap();
                 let peek_slot =
-                    tenant_map_peek_slot(&locked, child_shard_id, TenantSlotPeekMode::Read)?;
+                    tenant_map_peek_slot(&locked, &child_shard_id, TenantSlotPeekMode::Read)?;
                 peek_slot.and_then(|s| s.get_attached()).cloned()
             };
             if let Some(t) = child_shard {
```
```diff
@@ -1517,7 +1521,7 @@ impl TenantManager {
             }
         }

-        // Phase 5: Shut down the parent shard.
+        // Phase 5: Shut down the parent shard, and erase it from disk
         let (_guard, progress) = completion::channel();
         match parent.shutdown(progress, false).await {
             Ok(()) => {}
```
```diff
@@ -1525,6 +1529,12 @@ impl TenantManager {
                 other.wait().await;
             }
         }
+        let local_tenant_directory = self.conf.tenant_path(&tenant_shard_id);
+        let tmp_path = safe_rename_tenant_dir(&local_tenant_directory)
+            .await
+            .with_context(|| format!("local tenant directory {local_tenant_directory:?} rename"))?;
+        self.spawn_background_purge(tmp_path);
+
         parent_slot_guard.drop_old_value()?;

         // Phase 6: Release the InProgress on the parent shard
```
```diff
@@ -1532,6 +1542,222 @@ impl TenantManager {

         Ok(child_shards)
     }

+    /// Part of [`Self::shard_split`]: hard link parent shard layers into child shards, as an optimization
+    /// to avoid the children downloading them again.
+    ///
+    /// For each resident layer in the parent shard, we will hard link it into all of the child shards.
+    async fn shard_split_hardlink(
+        &self,
+        parent_shard: &Tenant,
+        child_shards: Vec<TenantShardId>,
+    ) -> anyhow::Result<()> {
+        debug_assert_current_span_has_tenant_id();
+
+        let parent_path = self.conf.tenant_path(parent_shard.get_tenant_shard_id());
+        let (parent_timelines, parent_layers) = {
+            let mut parent_layers = Vec::new();
+            let timelines = parent_shard.timelines.lock().unwrap().clone();
+            let parent_timelines = timelines.keys().cloned().collect::<Vec<_>>();
+            for timeline in timelines.values() {
+                // let timeline_layers_stream = timeline.layers.read().await.resident_layers();
+                // let timeline_layers = timeline_layers_stream.collect::<Vec<_>>().await;
+                let timeline_layers = timeline
+                    .layers
+                    .read()
+                    .await
+                    .resident_layers()
+                    .collect::<Vec<_>>()
+                    .await;
+                for layer in timeline_layers {
+                    let relative_path = layer
+                        .local_path()
+                        .strip_prefix(&parent_path)
+                        .context("Removing prefix from parent layer path")?;
+                    parent_layers.push(relative_path.to_owned());
+                }
+            }
+            (parent_timelines, parent_layers)
+        };
+
+        let mut child_prefixes = Vec::new();
+        let mut create_dirs = Vec::new();
+
+        for child in child_shards {
+            let child_prefix = self.conf.tenant_path(&child);
+            create_dirs.push(child_prefix.clone());
+            create_dirs.extend(
+                parent_timelines
+                    .iter()
+                    .map(|t| self.conf.timeline_path(&child, t)),
+            );
+
+            child_prefixes.push(child_prefix);
+        }
+
+        // Since we will do a large number of small filesystem metadata operations, batch them into
+        // spawn_blocking calls rather than doing each one as a tokio::fs round-trip.
+        let jh = tokio::task::spawn_blocking(move || -> anyhow::Result<usize> {
+            for dir in &create_dirs {
+                if let Err(e) = std::fs::create_dir_all(dir) {
+                    // Ignore AlreadyExists errors, drop out on all other errors
+                    match e.kind() {
+                        std::io::ErrorKind::AlreadyExists => {}
+                        _ => {
+                            return Err(anyhow::anyhow!(e).context(format!("Creating {dir}")));
+                        }
+                    }
+                }
+            }
+
+            for child_prefix in child_prefixes {
+                for relative_layer in &parent_layers {
+                    let parent_path = parent_path.join(relative_layer);
+                    let child_path = child_prefix.join(relative_layer);
+                    if let Err(e) = std::fs::hard_link(&parent_path, &child_path) {
+                        match e.kind() {
+                            std::io::ErrorKind::AlreadyExists => {}
+                            std::io::ErrorKind::NotFound => {
+                                tracing::info!(
+                                    "Layer {} not found during hard-linking, evicted during split?",
+                                    relative_layer
+                                );
+                            }
+                            _ => {
+                                return Err(anyhow::anyhow!(e).context(format!(
+                                    "Hard linking {relative_layer} into {child_prefix}"
+                                )))
+                            }
+                        }
+                    }
+                }
+            }
+
+            // Durability is not required for correctness, but if we crashed during split and
+            // then came restarted with empty timeline dirs, it would be very inefficient to
+            // re-populate from remote storage.
+            for dir in create_dirs {
+                if let Err(e) = crashsafe::fsync(&dir) {
+                    // Something removed a newly created timeline dir out from underneath us? Extremely
+                    // unexpected, but not worth panic'ing over as this whole function is just an
+                    // optimization.
+                    tracing::warn!("Failed to fsync directory {dir}: {e}")
+                }
+            }
+
+            Ok(parent_layers.len())
+        });
+
+        match jh.await {
+            Ok(Ok(layer_count)) => {
+                tracing::info!(count = layer_count, "Hard-linked layers into child shards");
+            }
+            Ok(Err(e)) => {
+                // This is an optimization, so we tolerate failure.
+                tracing::warn!("Error hard-linking layers, proceeding anyway: {e}")
+            }
+            Err(e) => {
+                // This is something totally unexpected like a panic, so bail out.
+                anyhow::bail!("Error joining hard linking task: {e}");
+            }
+        }
+
+        Ok(())
+    }
+
+    /// When we have moved a tenant's content to a temporary directory, we may delete it lazily in
+    /// the background, and thereby avoid blocking any API requests on this deletion completing.
+    fn spawn_background_purge(&self, tmp_path: Utf8PathBuf) {
+        // Although we are cleaning up the tenant, this task is not meant to be bound by the lifetime of the tenant in memory.
+        // After a tenant is detached, there are no more task_mgr tasks for that tenant_id.
+        let task_tenant_id = None;
+
+        task_mgr::spawn(
+            task_mgr::BACKGROUND_RUNTIME.handle(),
+            TaskKind::MgmtRequest,
+            task_tenant_id,
+            None,
+            "tenant_files_delete",
+            false,
+            async move {
+                fs::remove_dir_all(tmp_path.as_path())
+                    .await
+                    .with_context(|| format!("tenant directory {:?} deletion", tmp_path))
+            },
+        );
+    }
+
+    pub(crate) async fn detach_tenant(
+        &self,
+        conf: &'static PageServerConf,
+        tenant_shard_id: TenantShardId,
+        detach_ignored: bool,
+        deletion_queue_client: &DeletionQueueClient,
+    ) -> Result<(), TenantStateError> {
+        let tmp_path = self
+            .detach_tenant0(
+                conf,
+                &TENANTS,
+                tenant_shard_id,
+                detach_ignored,
+                deletion_queue_client,
+            )
+            .await?;
+        self.spawn_background_purge(tmp_path);
+
+        Ok(())
+    }
+
+    async fn detach_tenant0(
+        &self,
+        conf: &'static PageServerConf,
+        tenants: &std::sync::RwLock<TenantsMap>,
+        tenant_shard_id: TenantShardId,
+        detach_ignored: bool,
+        deletion_queue_client: &DeletionQueueClient,
+    ) -> Result<Utf8PathBuf, TenantStateError> {
+        let tenant_dir_rename_operation = |tenant_id_to_clean: TenantShardId| async move {
+            let local_tenant_directory = conf.tenant_path(&tenant_id_to_clean);
+            safe_rename_tenant_dir(&local_tenant_directory)
+                .await
+                .with_context(|| {
+                    format!("local tenant directory {local_tenant_directory:?} rename")
+                })
+        };
+
+        let removal_result = remove_tenant_from_memory(
+            tenants,
+            tenant_shard_id,
+            tenant_dir_rename_operation(tenant_shard_id),
+        )
+        .await;
+
+        // Flush pending deletions, so that they have a good chance of passing validation
+        // before this tenant is potentially re-attached elsewhere.
+        deletion_queue_client.flush_advisory();
+
+        // Ignored tenants are not present in memory and will bail the removal from memory operation.
+        // Before returning the error, check for ignored tenant removal case — we only need to clean its local files then.
+        if detach_ignored
+            && matches!(
+                removal_result,
+                Err(TenantStateError::SlotError(TenantSlotError::NotFound(_)))
+            )
+        {
+            let tenant_ignore_mark = conf.tenant_ignore_mark_file_path(&tenant_shard_id);
+            if tenant_ignore_mark.exists() {
+                info!("Detaching an ignored tenant");
+                let tmp_path = tenant_dir_rename_operation(tenant_shard_id)
+                    .await
+                    .with_context(|| {
+                        format!("Ignored tenant {tenant_shard_id} local directory rename")
+                    })?;
+                return Ok(tmp_path);
+            }
+        }
+
+        removal_result
+    }
 }

 #[derive(Debug, thiserror::Error)]
```
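One detail worth calling out from the new `shard_split_hardlink`: all of the directory creation, hard-linking, and fsync work happens inside a single `tokio::task::spawn_blocking` closure. Each `tokio::fs` call is itself a hand-off to the blocking thread pool, so for thousands of tiny metadata operations it is cheaper to hand the whole batch over once. A simplified sketch of that batching shape (assumes the `tokio` crate; not the pageserver code):

```rust
use std::io;
use std::path::PathBuf;

/// Create many directories with a single hand-off to the blocking thread pool, instead
/// of paying one round-trip per directory via `tokio::fs`.
async fn create_dirs_batched(dirs: Vec<PathBuf>) -> io::Result<usize> {
    tokio::task::spawn_blocking(move || -> io::Result<usize> {
        for dir in &dirs {
            // Synchronous metadata calls are fine here: we are on a blocking-pool thread,
            // so the async executor threads stay free.
            std::fs::create_dir_all(dir)?;
        }
        Ok(dirs.len())
    })
    .await
    .expect("blocking task panicked")
}

#[tokio::main]
async fn main() -> io::Result<()> {
    let base = std::env::temp_dir().join("batch-demo");
    let dirs: Vec<PathBuf> = (0..8).map(|i| base.join(format!("timeline-{i}"))).collect();
    let created = create_dirs_batched(dirs).await?;
    println!("created {created} directories in one blocking batch");
    Ok(())
}
```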
```diff
@@ -1733,87 +1959,6 @@ pub(crate) enum TenantStateError {
     Other(#[from] anyhow::Error),
 }

-pub(crate) async fn detach_tenant(
-    conf: &'static PageServerConf,
-    tenant_shard_id: TenantShardId,
-    detach_ignored: bool,
-    deletion_queue_client: &DeletionQueueClient,
-) -> Result<(), TenantStateError> {
-    let tmp_path = detach_tenant0(
-        conf,
-        &TENANTS,
-        tenant_shard_id,
-        detach_ignored,
-        deletion_queue_client,
-    )
-    .await?;
-    // Although we are cleaning up the tenant, this task is not meant to be bound by the lifetime of the tenant in memory.
-    // After a tenant is detached, there are no more task_mgr tasks for that tenant_id.
-    let task_tenant_id = None;
-    task_mgr::spawn(
-        task_mgr::BACKGROUND_RUNTIME.handle(),
-        TaskKind::MgmtRequest,
-        task_tenant_id,
-        None,
-        "tenant_files_delete",
-        false,
-        async move {
-            fs::remove_dir_all(tmp_path.as_path())
-                .await
-                .with_context(|| format!("tenant directory {:?} deletion", tmp_path))
-        },
-    );
-    Ok(())
-}
-
-async fn detach_tenant0(
-    conf: &'static PageServerConf,
-    tenants: &std::sync::RwLock<TenantsMap>,
-    tenant_shard_id: TenantShardId,
-    detach_ignored: bool,
-    deletion_queue_client: &DeletionQueueClient,
-) -> Result<Utf8PathBuf, TenantStateError> {
-    let tenant_dir_rename_operation = |tenant_id_to_clean: TenantShardId| async move {
-        let local_tenant_directory = conf.tenant_path(&tenant_id_to_clean);
-        safe_rename_tenant_dir(&local_tenant_directory)
-            .await
-            .with_context(|| format!("local tenant directory {local_tenant_directory:?} rename"))
-    };
-
-    let removal_result = remove_tenant_from_memory(
-        tenants,
-        tenant_shard_id,
-        tenant_dir_rename_operation(tenant_shard_id),
-    )
-    .await;
-
-    // Flush pending deletions, so that they have a good chance of passing validation
-    // before this tenant is potentially re-attached elsewhere.
-    deletion_queue_client.flush_advisory();
-
-    // Ignored tenants are not present in memory and will bail the removal from memory operation.
-    // Before returning the error, check for ignored tenant removal case — we only need to clean its local files then.
-    if detach_ignored
-        && matches!(
-            removal_result,
-            Err(TenantStateError::SlotError(TenantSlotError::NotFound(_)))
-        )
-    {
-        let tenant_ignore_mark = conf.tenant_ignore_mark_file_path(&tenant_shard_id);
-        if tenant_ignore_mark.exists() {
-            info!("Detaching an ignored tenant");
-            let tmp_path = tenant_dir_rename_operation(tenant_shard_id)
-                .await
-                .with_context(|| {
-                    format!("Ignored tenant {tenant_shard_id} local directory rename")
-                })?;
-            return Ok(tmp_path);
-        }
-    }
-
-    removal_result
-}
-
 pub(crate) async fn load_tenant(
     conf: &'static PageServerConf,
     tenant_id: TenantId,
```
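Taken together, the two hunks above move tenant detach from free functions in `mgr` onto `TenantManager`, but the underlying shape is unchanged: rename the tenant directory to a temporary path (a fast metadata operation), return to the caller, and let a background task do the slow recursive delete. A minimal sketch of that rename-then-purge pattern (assumes the `tokio` crate; the names are illustrative, this is not the pageserver API):

```rust
use std::path::{Path, PathBuf};
use std::time::Duration;

/// Detach a directory: move it out of its canonical location, then delete the renamed
/// copy in the background so the caller is not blocked on the removal.
async fn detach_dir(dir: &Path) -> std::io::Result<()> {
    let tmp_path = dir.with_extension("___deleting");
    tokio::fs::rename(dir, &tmp_path).await?;
    spawn_background_purge(tmp_path);
    Ok(())
}

fn spawn_background_purge(tmp_path: PathBuf) {
    // Fire-and-forget: the recursive delete proceeds after detach_dir has returned.
    tokio::spawn(async move {
        if let Err(e) = tokio::fs::remove_dir_all(&tmp_path).await {
            eprintln!("background purge of {} failed: {e}", tmp_path.display());
        }
    });
}

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let dir = std::env::temp_dir().join("tenant-to-detach");
    tokio::fs::create_dir_all(dir.join("timelines")).await?;

    detach_dir(&dir).await?;
    assert!(!dir.exists()); // gone from its canonical path immediately

    // Give the background purge a moment to finish before the process exits.
    tokio::time::sleep(Duration::from_millis(100)).await;
    Ok(())
}
```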
```diff
@@ -302,6 +302,15 @@ class PageserverHttpClient(requests.Session):
         )
         self.verbose_error(res)

+    def tenant_list_locations(self):
+        res = self.get(
+            f"http://localhost:{self.port}/v1/location_config",
+        )
+        self.verbose_error(res)
+        res_json = res.json()
+        assert isinstance(res_json["tenant_shards"], list)
+        return res_json
+
     def tenant_delete(self, tenant_id: Union[TenantId, TenantShardId]):
         res = self.delete(f"http://localhost:{self.port}/v1/tenant/{tenant_id}")
         self.verbose_error(res)
```
```diff
@@ -194,6 +194,18 @@ def test_sharding_split_smoke(

     assert len(pre_split_pageserver_ids) == 4

+    def shards_on_disk(shard_ids):
+        for pageserver in env.pageservers:
+            for shard_id in shard_ids:
+                if pageserver.tenant_dir(shard_id).exists():
+                    return True
+
+        return False
+
+    old_shard_ids = [TenantShardId(tenant_id, i, shard_count) for i in range(0, shard_count)]
+    # Before split, old shards exist
+    assert shards_on_disk(old_shard_ids)
+
     env.attachment_service.tenant_shard_split(tenant_id, shard_count=split_shard_count)

     post_split_pageserver_ids = [loc["node_id"] for loc in env.attachment_service.locate(tenant_id)]
```
```diff
@@ -202,6 +214,9 @@ def test_sharding_split_smoke(
     assert len(set(post_split_pageserver_ids)) == shard_count
     assert set(post_split_pageserver_ids) == set(pre_split_pageserver_ids)

+    # The old parent shards should no longer exist on disk
+    assert not shards_on_disk(old_shard_ids)
+
     workload.validate()

     workload.churn_rows(256)
```
```diff
@@ -213,11 +228,6 @@ def test_sharding_split_smoke(
     all_shards = tenant_get_shards(env, tenant_id)
     for tenant_shard_id, pageserver in all_shards:
         pageserver.http_client().timeline_gc(tenant_shard_id, timeline_id, None)
-
-    # Restart all nodes, to check that the newly created shards are durable
-    for ps in env.pageservers:
-        ps.restart()
-
     workload.validate()

     migrate_to_pageserver_ids = list(
```
```diff
@@ -240,3 +250,27 @@ def test_sharding_split_smoke(
         env.neon_cli.tenant_migrate(migrate_shard, destination, timeout_secs=10)

     workload.validate()
+
+    # Validate pageserver state
+    shards_exist: list[TenantShardId] = []
+    for pageserver in env.pageservers:
+        locations = pageserver.http_client().tenant_list_locations()
+        shards_exist.extend(TenantShardId.parse(s[0]) for s in locations["tenant_shards"])
+
+    log.info("Shards after split: {shards_exist}")
+    assert len(shards_exist) == split_shard_count
+
+    # Ensure post-split pageserver locations survive a restart (i.e. the child shards
+    # correctly wrote config to disk, and the storage controller responds correctly
+    # to /re-attach)
+    for pageserver in env.pageservers:
+        pageserver.stop()
+        pageserver.start()
+
+    shards_exist = []
+    for pageserver in env.pageservers:
+        locations = pageserver.http_client().tenant_list_locations()
+        shards_exist.extend(TenantShardId.parse(s[0]) for s in locations["tenant_shards"])
+
+    log.info("Shards after restart: {shards_exist}")
+    assert len(shards_exist) == split_shard_count
```
```diff
@@ -108,6 +108,20 @@ def test_sharding_service_smoke(
         time.sleep(1)
     assert get_node_shard_counts(env, tenant_ids)[env.pageservers[0].id] == 0

+    # Restarting a pageserver should not detach any tenants (i.e. /re-attach works)
+    before_restart = env.pageservers[1].http_client().tenant_list_locations()
+    env.pageservers[1].stop()
+    env.pageservers[1].start()
+    after_restart = env.pageservers[1].http_client().tenant_list_locations()
+    assert len(after_restart) == len(before_restart)
+
+    # Locations should be the same before & after restart, apart from generations
+    for _shard_id, tenant in after_restart["tenant_shards"]:
+        del tenant["generation"]
+    for _shard_id, tenant in before_restart["tenant_shards"]:
+        del tenant["generation"]
+    assert before_restart == after_restart
+
     # Delete all the tenants
     for tid in tenant_ids:
         tenant_delete_wait_completed(env.attachment_service.pageserver_api(), tid, 10)
```