diff --git a/pageserver/client_grpc/src/client.rs b/pageserver/client_grpc/src/client.rs index 4f3f606935..8b1376ef4a 100644 --- a/pageserver/client_grpc/src/client.rs +++ b/pageserver/client_grpc/src/client.rs @@ -12,11 +12,12 @@ use crate::pool::{ChannelPool, ClientGuard, ClientPool, StreamGuard, StreamPool} use crate::retry::Retry; use crate::split::GetPageSplitter; use compute_api::spec::PageserverProtocol; -use pageserver_api::shard::ShardStripeSize; use pageserver_page_api as page_api; use utils::id::{TenantId, TimelineId}; use utils::shard::{ShardCount, ShardIndex, ShardNumber}; +pub use pageserver_api::shard::ShardStripeSize; + /// Max number of concurrent clients per channel (i.e. TCP connection). New channels will be spun up /// when full. /// diff --git a/pageserver/client_grpc/src/lib.rs b/pageserver/client_grpc/src/lib.rs index 14fb3fbd5a..a9ace8cf98 100644 --- a/pageserver/client_grpc/src/lib.rs +++ b/pageserver/client_grpc/src/lib.rs @@ -3,4 +3,4 @@ mod pool; mod retry; mod split; -pub use client::{PageserverClient, ShardSpec}; +pub use client::{PageserverClient, ShardSpec, ShardStripeSize}; diff --git a/pgxn/neon/communicator/src/worker_process/main_loop.rs b/pgxn/neon/communicator/src/worker_process/main_loop.rs index aadf9b3a60..2ef82e7746 100644 --- a/pgxn/neon/communicator/src/worker_process/main_loop.rs +++ b/pgxn/neon/communicator/src/worker_process/main_loop.rs @@ -12,7 +12,7 @@ use crate::integrated_cache::{CacheResult, IntegratedCacheWriteAccess}; use crate::neon_request::{CGetPageVRequest, CPrefetchVRequest}; use crate::neon_request::{NeonIORequest, NeonIOResult}; use crate::worker_process::in_progress_ios::{RequestInProgressKey, RequestInProgressTable}; -use pageserver_client_grpc::{PageserverClient, ShardSpec}; +use pageserver_client_grpc::{PageserverClient, ShardSpec, ShardStripeSize}; use pageserver_page_api as page_api; use metrics::{IntCounter, IntCounterVec}; @@ -70,6 +70,7 @@ pub(super) async fn init( timeline_id: String, auth_token: Option, shard_map: HashMap, + stripe_size: Option, initial_file_cache_size: u64, file_cache_path: Option, ) -> CommunicatorWorkerProcessStruct<'static> { @@ -91,10 +92,9 @@ pub(super) async fn init( .integrated_cache_init_struct .worker_process_init(last_lsn, file_cache); - // TODO: plumb through the stripe size. let tenant_id = TenantId::from_str(&tenant_id).expect("invalid tenant ID"); let timeline_id = TimelineId::from_str(&timeline_id).expect("invalid timeline ID"); - let shard_spec = ShardSpec::new(shard_map, None).expect("invalid shard spec"); + let shard_spec = ShardSpec::new(shard_map, stripe_size).expect("invalid shard spec"); let client = PageserverClient::new(tenant_id, timeline_id, shard_spec, auth_token) .expect("could not create client"); diff --git a/pgxn/neon/communicator/src/worker_process/worker_interface.rs b/pgxn/neon/communicator/src/worker_process/worker_interface.rs index 9aaa483c9e..e873555daa 100644 --- a/pgxn/neon/communicator/src/worker_process/worker_interface.rs +++ b/pgxn/neon/communicator/src/worker_process/worker_interface.rs @@ -10,6 +10,8 @@ use crate::init::CommunicatorInitStruct; use crate::worker_process::main_loop; use crate::worker_process::main_loop::CommunicatorWorkerProcessStruct; +use pageserver_client_grpc::ShardStripeSize; + /// Launch the communicator's tokio tasks, which do most of the work. /// /// The caller has initialized the process as a regular PostgreSQL @@ -24,6 +26,7 @@ pub extern "C" fn communicator_worker_process_launch( auth_token: *const c_char, shard_map: *mut *mut c_char, nshards: u32, + stripe_size: u32, file_cache_path: *const c_char, initial_file_cache_size: u64, ) -> &'static CommunicatorWorkerProcessStruct<'static> { @@ -63,6 +66,11 @@ pub extern "C" fn communicator_worker_process_launch( timeline_id.to_string(), auth_token, shard_map, + if stripe_size > 0 { + Some(ShardStripeSize(stripe_size)) + } else { + None + }, initial_file_cache_size, file_cache_path, )); diff --git a/pgxn/neon/communicator_new.c b/pgxn/neon/communicator_new.c index d361ff7274..3f2870621d 100644 --- a/pgxn/neon/communicator_new.c +++ b/pgxn/neon/communicator_new.c @@ -336,6 +336,7 @@ communicator_new_bgworker_main(Datum main_arg) neon_auth_token, connstrs, num_shards, + neon_stripe_size, lfc_path, file_cache_size); cis = NULL; diff --git a/pgxn/neon/libpagestore.c b/pgxn/neon/libpagestore.c index ee17b5d33b..f64e6ee233 100644 --- a/pgxn/neon/libpagestore.c +++ b/pgxn/neon/libpagestore.c @@ -80,7 +80,7 @@ int neon_protocol_version = 3; static int neon_compute_mode = 0; static int max_reconnect_attempts = 60; -static int stripe_size; +int neon_stripe_size; static int max_sockets; static int pageserver_response_log_timeout = 10000; @@ -454,10 +454,10 @@ get_shard_number(BufferTag *tag) #if PG_MAJORVERSION_NUM < 16 hash = murmurhash32(tag->rnode.relNode); - hash = hash_combine(hash, murmurhash32(tag->blockNum / stripe_size)); + hash = hash_combine(hash, murmurhash32(tag->blockNum / neon_stripe_size)); #else hash = murmurhash32(tag->relNumber); - hash = hash_combine(hash, murmurhash32(tag->blockNum / stripe_size)); + hash = hash_combine(hash, murmurhash32(tag->blockNum / neon_stripe_size)); #endif return hash % n_shards; @@ -1510,7 +1510,7 @@ pg_init_libpagestore(void) DefineCustomIntVariable("neon.stripe_size", "sharding stripe size", NULL, - &stripe_size, + &neon_stripe_size, 32768, 1, INT_MAX, PGC_SIGHUP, GUC_UNIT_BLOCKS, diff --git a/pgxn/neon/pagestore_client.h b/pgxn/neon/pagestore_client.h index eb3c80702e..b3c074c9ee 100644 --- a/pgxn/neon/pagestore_client.h +++ b/pgxn/neon/pagestore_client.h @@ -244,6 +244,7 @@ extern char *neon_timeline; extern char *neon_tenant; extern int32 max_cluster_size; extern int neon_protocol_version; +extern int neon_stripe_size; extern void get_shard_map(char ***connstrs_p, shardno_t *num_shards_p); extern shardno_t get_shard_number(BufferTag* tag);