feat: add Txn for pg kv backend (#5266)

* feat: txn for pg kv backend

* chore: clippy

* fix: txn uses one client

* test: clean up and txn test

* test: clean up

* test: change lock_id to avoid conflict in test

* test: use different prefix in pg election test

* fix(test): just a fix

* test: aggregate multiple test to avoid concurrency problem

* test: use uuid instead of rng

* perf: batch cmp in txn

* perf: batch same op in txn
This commit is contained in:
Yohan Wal
2025-01-06 11:29:09 +08:00
committed by GitHub
parent 69d9a2845f
commit 513569ed5d
12 changed files with 800 additions and 369 deletions

1
Cargo.lock generated
View File

@@ -6600,6 +6600,7 @@ dependencies = [
"tracing-subscriber",
"typetag",
"url",
"uuid",
]
[[package]]

View File

@@ -4,6 +4,9 @@ version.workspace = true
edition.workspace = true
license.workspace = true
[features]
pg_kvbackend = ["common-meta/pg_kvbackend"]
[lints]
workspace = true

View File

@@ -22,6 +22,9 @@ use clap::Parser;
use common_error::ext::BoxedError;
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::etcd::EtcdStore;
use common_meta::kv_backend::memory::MemoryKvBackend;
#[cfg(feature = "pg_kvbackend")]
use common_meta::kv_backend::postgres::PgStore;
use common_meta::peer::Peer;
use common_meta::rpc::router::{Region, RegionRoute};
use common_telemetry::info;
@@ -55,18 +58,32 @@ where
#[derive(Debug, Default, Parser)]
pub struct BenchTableMetadataCommand {
#[clap(long)]
etcd_addr: String,
etcd_addr: Option<String>,
#[cfg(feature = "pg_kvbackend")]
#[clap(long)]
postgres_addr: Option<String>,
#[clap(long)]
count: u32,
}
impl BenchTableMetadataCommand {
pub async fn build(&self) -> std::result::Result<Box<dyn Tool>, BoxedError> {
let etcd_store = EtcdStore::with_endpoints([&self.etcd_addr], 128)
.await
.unwrap();
let kv_backend = if let Some(etcd_addr) = &self.etcd_addr {
info!("Using etcd as kv backend");
EtcdStore::with_endpoints([etcd_addr], 128).await.unwrap()
} else {
Arc::new(MemoryKvBackend::new())
};
let table_metadata_manager = Arc::new(TableMetadataManager::new(etcd_store));
#[cfg(feature = "pg_kvbackend")]
let kv_backend = if let Some(postgres_addr) = &self.postgres_addr {
info!("Using postgres as kv backend");
PgStore::with_url(postgres_addr, 128).await.unwrap()
} else {
kv_backend
};
let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend));
let tool = BenchTableMetadata {
table_metadata_manager,

View File

@@ -683,6 +683,16 @@ pub enum Error {
location: Location,
},
#[cfg(feature = "pg_kvbackend")]
#[snafu(display("Failed to {} Postgres transaction", operation))]
PostgresTransaction {
#[snafu(source)]
error: tokio_postgres::Error,
#[snafu(implicit)]
location: Location,
operation: String,
},
#[snafu(display(
"Datanode table info not found, table id: {}, datanode id: {}",
table_id,
@@ -794,9 +804,10 @@ impl ErrorExt for Error {
| EmptyDdlTasks { .. } => StatusCode::InvalidArguments,
#[cfg(feature = "pg_kvbackend")]
PostgresExecution { .. } | CreatePostgresPool { .. } | GetPostgresConnection { .. } => {
StatusCode::Internal
}
PostgresExecution { .. }
| CreatePostgresPool { .. }
| GetPostgresConnection { .. }
| PostgresTransaction { .. } => StatusCode::Internal,
Error::DatanodeTableInfoNotFound { .. } => StatusCode::Internal,
}
}

View File

@@ -542,6 +542,8 @@ mod tests {
prepare_kv_with_prefix, test_kv_batch_delete_with_prefix, test_kv_batch_get_with_prefix,
test_kv_compare_and_put_with_prefix, test_kv_delete_range_with_prefix,
test_kv_put_with_prefix, test_kv_range_2_with_prefix, test_kv_range_with_prefix,
test_txn_compare_equal, test_txn_compare_greater, test_txn_compare_less,
test_txn_compare_not_equal, test_txn_one_compare_op, text_txn_multi_compare_op,
unprepare_kv,
};
@@ -628,4 +630,17 @@ mod tests {
test_kv_batch_delete_with_prefix(kv_backend, prefix.to_vec()).await;
}
}
#[tokio::test]
async fn test_etcd_txn() {
if let Some(kv_backend) = build_kv_backend().await {
let kv_backend_ref = Arc::new(kv_backend);
test_txn_one_compare_op(kv_backend_ref.clone()).await;
text_txn_multi_compare_op(kv_backend_ref.clone()).await;
test_txn_compare_equal(kv_backend_ref.clone()).await;
test_txn_compare_greater(kv_backend_ref.clone()).await;
test_txn_compare_less(kv_backend_ref.clone()).await;
test_txn_compare_not_equal(kv_backend_ref).await;
}
}
}

View File

@@ -325,7 +325,9 @@ mod tests {
use crate::error::Error;
use crate::kv_backend::test::{
prepare_kv, test_kv_batch_delete, test_kv_batch_get, test_kv_compare_and_put,
test_kv_delete_range, test_kv_put, test_kv_range, test_kv_range_2,
test_kv_delete_range, test_kv_put, test_kv_range, test_kv_range_2, test_txn_compare_equal,
test_txn_compare_greater, test_txn_compare_less, test_txn_compare_not_equal,
test_txn_one_compare_op, text_txn_multi_compare_op,
};
async fn mock_mem_store_with_data() -> MemoryKvBackend<Error> {
@@ -383,4 +385,15 @@ mod tests {
test_kv_batch_delete(kv_backend).await;
}
#[tokio::test]
async fn test_memory_txn() {
let kv_backend = Arc::new(MemoryKvBackend::<Error>::new());
test_txn_one_compare_op(kv_backend.clone()).await;
text_txn_multi_compare_op(kv_backend.clone()).await;
test_txn_compare_equal(kv_backend.clone()).await;
test_txn_compare_greater(kv_backend.clone()).await;
test_txn_compare_less(kv_backend.clone()).await;
test_txn_compare_not_equal(kv_backend).await;
}
}

View File

@@ -22,11 +22,14 @@ use tokio_postgres::types::ToSql;
use tokio_postgres::NoTls;
use crate::error::{
CreatePostgresPoolSnafu, Error, GetPostgresConnectionSnafu, PostgresExecutionSnafu, Result,
StrFromUtf8Snafu,
CreatePostgresPoolSnafu, Error, GetPostgresConnectionSnafu, PostgresExecutionSnafu,
PostgresTransactionSnafu, Result, StrFromUtf8Snafu,
};
use crate::kv_backend::txn::{
Compare, Txn as KvTxn, TxnOp, TxnOpResponse, TxnResponse as KvTxnResponse,
};
use crate::kv_backend::txn::{Txn as KvTxn, TxnResponse as KvTxnResponse};
use crate::kv_backend::{KvBackend, KvBackendRef, TxnService};
use crate::metrics::METRIC_META_TXN_REQUEST;
use crate::rpc::store::{
BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
@@ -34,9 +37,47 @@ use crate::rpc::store::{
};
use crate::rpc::KeyValue;
type PgClient = deadpool::managed::Object<deadpool_postgres::Manager>;
enum PgQueryExecutor<'a> {
Client(PgClient),
Transaction(deadpool_postgres::Transaction<'a>),
}
impl PgQueryExecutor<'_> {
async fn query(
&self,
query: &str,
params: &[&(dyn ToSql + Sync)],
) -> Result<Vec<tokio_postgres::Row>> {
match self {
PgQueryExecutor::Client(client) => client
.query(query, params)
.await
.context(PostgresExecutionSnafu),
PgQueryExecutor::Transaction(txn) => txn
.query(query, params)
.await
.context(PostgresExecutionSnafu),
}
}
async fn commit(self) -> Result<()> {
match self {
PgQueryExecutor::Client(_) => Ok(()),
PgQueryExecutor::Transaction(txn) => {
txn.commit().await.context(PostgresTransactionSnafu {
operation: "commit".to_string(),
})
}
}
}
}
/// Posgres backend store for metasrv
pub struct PgStore {
pool: Pool,
max_txn_ops: usize,
}
const EMPTY: &[u8] = &[0];
@@ -94,17 +135,17 @@ SELECT k, v FROM prev;"#;
impl PgStore {
/// Create pgstore impl of KvBackendRef from url.
pub async fn with_url(url: &str) -> Result<KvBackendRef> {
pub async fn with_url(url: &str, max_txn_ops: usize) -> Result<KvBackendRef> {
let mut cfg = Config::new();
cfg.url = Some(url.to_string());
let pool = cfg
.create_pool(Some(Runtime::Tokio1), NoTls)
.context(CreatePostgresPoolSnafu)?;
Self::with_pg_pool(pool).await
Self::with_pg_pool(pool, max_txn_ops).await
}
/// Create pgstore impl of KvBackendRef from tokio-postgres client.
pub async fn with_pg_pool(pool: Pool) -> Result<KvBackendRef> {
pub async fn with_pg_pool(pool: Pool, max_txn_ops: usize) -> Result<KvBackendRef> {
// This step ensures the postgres metadata backend is ready to use.
// We check if greptime_metakv table exists, and we will create a new table
// if it does not exist.
@@ -121,10 +162,10 @@ impl PgStore {
.execute(METADKV_CREATION, &[])
.await
.context(PostgresExecutionSnafu)?;
Ok(Arc::new(Self { pool }))
Ok(Arc::new(Self { pool, max_txn_ops }))
}
async fn get_client(&self) -> Result<deadpool::managed::Object<deadpool_postgres::Manager>> {
async fn get_client(&self) -> Result<PgClient> {
match self.pool.get().await {
Ok(client) => Ok(client),
Err(e) => GetPostgresConnectionSnafu {
@@ -134,13 +175,30 @@ impl PgStore {
}
}
async fn put_if_not_exists(&self, key: &str, value: &str) -> Result<bool> {
let res = self
.get_client()
.await?
.query(PUT_IF_NOT_EXISTS, &[&key, &value])
async fn get_client_executor(&self) -> Result<PgQueryExecutor<'_>> {
let client = self.get_client().await?;
Ok(PgQueryExecutor::Client(client))
}
async fn get_txn_executor<'a>(&self, client: &'a mut PgClient) -> Result<PgQueryExecutor<'a>> {
let txn = client
.transaction()
.await
.context(PostgresExecutionSnafu)?;
.context(PostgresTransactionSnafu {
operation: "start".to_string(),
})?;
Ok(PgQueryExecutor::Transaction(txn))
}
async fn put_if_not_exists_with_query_executor(
&self,
query_executor: &PgQueryExecutor<'_>,
key: &str,
value: &str,
) -> Result<bool> {
let res = query_executor
.query(PUT_IF_NOT_EXISTS, &[&key, &value])
.await?;
Ok(res.is_empty())
}
}
@@ -247,6 +305,47 @@ impl KvBackend for PgStore {
}
async fn range(&self, req: RangeRequest) -> Result<RangeResponse> {
let client = self.get_client_executor().await?;
self.range_with_query_executor(&client, req).await
}
async fn put(&self, req: PutRequest) -> Result<PutResponse> {
let client = self.get_client_executor().await?;
self.put_with_query_executor(&client, req).await
}
async fn batch_put(&self, req: BatchPutRequest) -> Result<BatchPutResponse> {
let client = self.get_client_executor().await?;
self.batch_put_with_query_executor(&client, req).await
}
async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse> {
let client = self.get_client_executor().await?;
self.batch_get_with_query_executor(&client, req).await
}
async fn delete_range(&self, req: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
let client = self.get_client_executor().await?;
self.delete_range_with_query_executor(&client, req).await
}
async fn batch_delete(&self, req: BatchDeleteRequest) -> Result<BatchDeleteResponse> {
let client = self.get_client_executor().await?;
self.batch_delete_with_query_executor(&client, req).await
}
async fn compare_and_put(&self, req: CompareAndPutRequest) -> Result<CompareAndPutResponse> {
let client = self.get_client_executor().await?;
self.compare_and_put_with_query_executor(&client, req).await
}
}
impl PgStore {
async fn range_with_query_executor(
&self,
query_executor: &PgQueryExecutor<'_>,
req: RangeRequest,
) -> Result<RangeResponse> {
let mut params = vec![];
let template = select_range_template(&req);
if req.key != EMPTY {
@@ -275,12 +374,7 @@ impl KvBackend for PgStore {
Cow::Owned(owned) => owned as &(dyn ToSql + Sync),
})
.collect();
let res = self
.get_client()
.await?
.query(&template, &params)
.await
.context(PostgresExecutionSnafu)?;
let res = query_executor.query(&template, &params).await?;
let kvs: Vec<KeyValue> = res
.into_iter()
.map(|r| {
@@ -308,16 +402,23 @@ impl KvBackend for PgStore {
})
}
async fn put(&self, req: PutRequest) -> Result<PutResponse> {
async fn put_with_query_executor(
&self,
query_executor: &PgQueryExecutor<'_>,
req: PutRequest,
) -> Result<PutResponse> {
let kv = KeyValue {
key: req.key,
value: req.value,
};
let mut res = self
.batch_put(BatchPutRequest {
kvs: vec![kv],
prev_kv: req.prev_kv,
})
.batch_put_with_query_executor(
query_executor,
BatchPutRequest {
kvs: vec![kv],
prev_kv: req.prev_kv,
},
)
.await?;
if !res.prev_kvs.is_empty() {
@@ -328,7 +429,11 @@ impl KvBackend for PgStore {
Ok(PutResponse { prev_kv: None })
}
async fn batch_put(&self, req: BatchPutRequest) -> Result<BatchPutResponse> {
async fn batch_put_with_query_executor(
&self,
query_executor: &PgQueryExecutor<'_>,
req: BatchPutRequest,
) -> Result<BatchPutResponse> {
let mut in_params = Vec::with_capacity(req.kvs.len());
let mut values_params = Vec::with_capacity(req.kvs.len() * 2);
@@ -346,12 +451,7 @@ impl KvBackend for PgStore {
let query = generate_batch_upsert_query(req.kvs.len());
let res = self
.get_client()
.await?
.query(&query, &params)
.await
.context(PostgresExecutionSnafu)?;
let res = query_executor.query(&query, &params).await?;
if req.prev_kv {
let kvs: Vec<KeyValue> = res
.into_iter()
@@ -371,7 +471,12 @@ impl KvBackend for PgStore {
Ok(BatchPutResponse { prev_kvs: vec![] })
}
async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse> {
/// Batch get with certain client. It's needed for a client with transaction.
async fn batch_get_with_query_executor(
&self,
query_executor: &PgQueryExecutor<'_>,
req: BatchGetRequest,
) -> Result<BatchGetResponse> {
if req.keys.is_empty() {
return Ok(BatchGetResponse { kvs: vec![] });
}
@@ -386,12 +491,7 @@ impl KvBackend for PgStore {
.map(|x| x as &(dyn ToSql + Sync))
.collect();
let res = self
.get_client()
.await?
.query(&query, &params)
.await
.context(PostgresExecutionSnafu)?;
let res = query_executor.query(&query, &params).await?;
let kvs: Vec<KeyValue> = res
.into_iter()
.map(|r| {
@@ -406,7 +506,11 @@ impl KvBackend for PgStore {
Ok(BatchGetResponse { kvs })
}
async fn delete_range(&self, req: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
async fn delete_range_with_query_executor(
&self,
query_executor: &PgQueryExecutor<'_>,
req: DeleteRangeRequest,
) -> Result<DeleteRangeResponse> {
let mut params = vec![];
let template = select_range_delete_template(&req);
if req.key != EMPTY {
@@ -430,12 +534,7 @@ impl KvBackend for PgStore {
})
.collect();
let res = self
.get_client()
.await?
.query(template, &params)
.await
.context(PostgresExecutionSnafu)?;
let res = query_executor.query(template, &params).await?;
let deleted = res.len() as i64;
if !req.prev_kv {
return Ok({
@@ -462,7 +561,11 @@ impl KvBackend for PgStore {
})
}
async fn batch_delete(&self, req: BatchDeleteRequest) -> Result<BatchDeleteResponse> {
async fn batch_delete_with_query_executor(
&self,
query_executor: &PgQueryExecutor<'_>,
req: BatchDeleteRequest,
) -> Result<BatchDeleteResponse> {
if req.keys.is_empty() {
return Ok(BatchDeleteResponse { prev_kvs: vec![] });
}
@@ -477,12 +580,7 @@ impl KvBackend for PgStore {
.map(|x| x as &(dyn ToSql + Sync))
.collect();
let res = self
.get_client()
.await?
.query(&query, &params)
.await
.context(PostgresExecutionSnafu)?;
let res = query_executor.query(&query, &params).await?;
if !req.prev_kv {
return Ok(BatchDeleteResponse { prev_kvs: vec![] });
}
@@ -500,11 +598,17 @@ impl KvBackend for PgStore {
Ok(BatchDeleteResponse { prev_kvs: kvs })
}
async fn compare_and_put(&self, req: CompareAndPutRequest) -> Result<CompareAndPutResponse> {
async fn compare_and_put_with_query_executor(
&self,
query_executor: &PgQueryExecutor<'_>,
req: CompareAndPutRequest,
) -> Result<CompareAndPutResponse> {
let key = process_bytes(&req.key, "CASKey")?;
let value = process_bytes(&req.value, "CASValue")?;
if req.expect.is_empty() {
let put_res = self.put_if_not_exists(key, value).await?;
let put_res = self
.put_if_not_exists_with_query_executor(query_executor, key, value)
.await?;
return Ok(CompareAndPutResponse {
success: put_res,
prev_kv: None,
@@ -512,12 +616,7 @@ impl KvBackend for PgStore {
}
let expect = process_bytes(&req.expect, "CASExpect")?;
let res = self
.get_client()
.await?
.query(CAS, &[&key, &value, &expect])
.await
.context(PostgresExecutionSnafu)?;
let res = query_executor.query(CAS, &[&key, &value, &expect]).await?;
match res.is_empty() {
true => Ok(CompareAndPutResponse {
success: false,
@@ -542,19 +641,258 @@ impl KvBackend for PgStore {
}
}
}
async fn execute_txn_cmp(
&self,
query_executor: &PgQueryExecutor<'_>,
cmp: &[Compare],
) -> Result<bool> {
let batch_get_req = BatchGetRequest {
keys: cmp.iter().map(|c| c.key.clone()).collect(),
};
let res = self
.batch_get_with_query_executor(query_executor, batch_get_req)
.await?;
let res_map = res
.kvs
.into_iter()
.map(|kv| (kv.key, kv.value))
.collect::<std::collections::HashMap<Vec<u8>, Vec<u8>>>();
for c in cmp {
let value = res_map.get(&c.key);
if !c.compare_value(value) {
return Ok(false);
}
}
Ok(true)
}
/// Execute a batch of transaction operations. This function is only used for transactions with the same operation type.
async fn try_batch_txn(
&self,
query_executor: &PgQueryExecutor<'_>,
txn_ops: &[TxnOp],
) -> Result<Option<Vec<TxnOpResponse>>> {
if !check_txn_ops(txn_ops)? {
return Ok(None);
}
match txn_ops.first() {
Some(TxnOp::Delete(_)) => {
let mut batch_del_req = BatchDeleteRequest {
keys: vec![],
prev_kv: false,
};
for op in txn_ops {
if let TxnOp::Delete(key) = op {
batch_del_req.keys.push(key.clone());
}
}
let res = self
.batch_delete_with_query_executor(query_executor, batch_del_req)
.await?;
let res_map = res
.prev_kvs
.into_iter()
.map(|kv| (kv.key, kv.value))
.collect::<std::collections::HashMap<Vec<u8>, Vec<u8>>>();
let mut resps = Vec::with_capacity(txn_ops.len());
for op in txn_ops {
if let TxnOp::Delete(key) = op {
let value = res_map.get(key);
resps.push(TxnOpResponse::ResponseDelete(DeleteRangeResponse {
deleted: if value.is_some() { 1 } else { 0 },
prev_kvs: value
.map(|v| {
vec![KeyValue {
key: key.clone(),
value: v.clone(),
}]
})
.unwrap_or_default(),
}));
}
}
Ok(Some(resps))
}
Some(TxnOp::Put(_, _)) => {
let mut batch_put_req = BatchPutRequest {
kvs: vec![],
prev_kv: false,
};
for op in txn_ops {
if let TxnOp::Put(key, value) = op {
batch_put_req.kvs.push(KeyValue {
key: key.clone(),
value: value.clone(),
});
}
}
let res = self
.batch_put_with_query_executor(query_executor, batch_put_req)
.await?;
let res_map = res
.prev_kvs
.into_iter()
.map(|kv| (kv.key, kv.value))
.collect::<std::collections::HashMap<Vec<u8>, Vec<u8>>>();
let mut resps = Vec::with_capacity(txn_ops.len());
for op in txn_ops {
if let TxnOp::Put(key, _) = op {
let prev_kv = res_map.get(key);
match prev_kv {
Some(v) => {
resps.push(TxnOpResponse::ResponsePut(PutResponse {
prev_kv: Some(KeyValue {
key: key.clone(),
value: v.clone(),
}),
}));
}
None => {
resps.push(TxnOpResponse::ResponsePut(PutResponse {
prev_kv: None,
}));
}
}
}
}
Ok(Some(resps))
}
Some(TxnOp::Get(_)) => {
let mut batch_get_req = BatchGetRequest { keys: vec![] };
for op in txn_ops {
if let TxnOp::Get(key) = op {
batch_get_req.keys.push(key.clone());
}
}
let res = self
.batch_get_with_query_executor(query_executor, batch_get_req)
.await?;
let res_map = res
.kvs
.into_iter()
.map(|kv| (kv.key, kv.value))
.collect::<std::collections::HashMap<Vec<u8>, Vec<u8>>>();
let mut resps = Vec::with_capacity(txn_ops.len());
for op in txn_ops {
if let TxnOp::Get(key) = op {
let value = res_map.get(key);
resps.push(TxnOpResponse::ResponseGet(RangeResponse {
kvs: value
.map(|v| {
vec![KeyValue {
key: key.clone(),
value: v.clone(),
}]
})
.unwrap_or_default(),
more: false,
}));
}
}
Ok(Some(resps))
}
None => Ok(Some(vec![])),
}
}
async fn execute_txn_op(
&self,
query_executor: &PgQueryExecutor<'_>,
op: TxnOp,
) -> Result<TxnOpResponse> {
match op {
TxnOp::Put(key, value) => {
let res = self
.put_with_query_executor(
query_executor,
PutRequest {
key,
value,
prev_kv: false,
},
)
.await?;
Ok(TxnOpResponse::ResponsePut(res))
}
TxnOp::Get(key) => {
let res = self
.range_with_query_executor(
query_executor,
RangeRequest {
key,
range_end: vec![],
limit: 1,
keys_only: false,
},
)
.await?;
Ok(TxnOpResponse::ResponseGet(res))
}
TxnOp::Delete(key) => {
let res = self
.delete_range_with_query_executor(
query_executor,
DeleteRangeRequest {
key,
range_end: vec![],
prev_kv: false,
},
)
.await?;
Ok(TxnOpResponse::ResponseDelete(res))
}
}
}
}
#[async_trait::async_trait]
impl TxnService for PgStore {
type Error = Error;
async fn txn(&self, _txn: KvTxn) -> Result<KvTxnResponse> {
// TODO: implement txn for pg kv backend.
unimplemented!()
async fn txn(&self, txn: KvTxn) -> Result<KvTxnResponse> {
let _timer = METRIC_META_TXN_REQUEST
.with_label_values(&["postgres", "txn"])
.start_timer();
let mut client = self.get_client().await?;
let pg_txn = self.get_txn_executor(&mut client).await?;
let mut success = true;
if txn.c_when {
success = self.execute_txn_cmp(&pg_txn, &txn.req.compare).await?;
}
let mut responses = vec![];
if success && txn.c_then {
match self.try_batch_txn(&pg_txn, &txn.req.success).await? {
Some(res) => responses.extend(res),
None => {
for txnop in txn.req.success {
let res = self.execute_txn_op(&pg_txn, txnop).await?;
responses.push(res);
}
}
}
} else if !success && txn.c_else {
match self.try_batch_txn(&pg_txn, &txn.req.failure).await? {
Some(res) => responses.extend(res),
None => {
for txnop in txn.req.failure {
let res = self.execute_txn_op(&pg_txn, txnop).await?;
responses.push(res);
}
}
}
}
pg_txn.commit().await?;
Ok(KvTxnResponse {
responses,
succeeded: success,
})
}
fn max_txn_ops(&self) -> usize {
unreachable!("postgres backend does not support max_txn_ops!")
self.max_txn_ops
}
}
@@ -570,6 +908,25 @@ fn is_prefix_range(start: &[u8], end: &[u8]) -> bool {
false
}
/// Check if the transaction operations are the same type.
fn check_txn_ops(txn_ops: &[TxnOp]) -> Result<bool> {
if txn_ops.is_empty() {
return Ok(false);
}
let first_op = &txn_ops[0];
for op in txn_ops {
match (op, first_op) {
(TxnOp::Put(_, _), TxnOp::Put(_, _)) => {}
(TxnOp::Get(_), TxnOp::Get(_)) => {}
(TxnOp::Delete(_), TxnOp::Delete(_)) => {}
_ => {
return Ok(false);
}
}
}
Ok(true)
}
#[cfg(test)]
mod tests {
use super::*;
@@ -577,6 +934,8 @@ mod tests {
prepare_kv_with_prefix, test_kv_batch_delete_with_prefix, test_kv_batch_get_with_prefix,
test_kv_compare_and_put_with_prefix, test_kv_delete_range_with_prefix,
test_kv_put_with_prefix, test_kv_range_2_with_prefix, test_kv_range_with_prefix,
test_txn_compare_equal, test_txn_compare_greater, test_txn_compare_less,
test_txn_compare_not_equal, test_txn_one_compare_op, text_txn_multi_compare_op,
unprepare_kv,
};
@@ -598,69 +957,66 @@ mod tests {
.await
.context(PostgresExecutionSnafu)
.unwrap();
Some(PgStore { pool })
Some(PgStore {
pool,
max_txn_ops: 128,
})
}
#[tokio::test]
async fn test_put() {
async fn test_pg_crud() {
if let Some(kv_backend) = build_pg_kv_backend().await {
let prefix = b"put/";
prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
test_kv_put_with_prefix(&kv_backend, prefix.to_vec()).await;
unprepare_kv(&kv_backend, prefix).await;
}
}
#[tokio::test]
async fn test_range() {
if let Some(kv_backend) = build_pg_kv_backend().await {
let prefix = b"range/";
prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
test_kv_range_with_prefix(&kv_backend, prefix.to_vec()).await;
unprepare_kv(&kv_backend, prefix).await;
}
}
#[tokio::test]
async fn test_range_2() {
if let Some(kv_backend) = build_pg_kv_backend().await {
test_kv_range_2_with_prefix(kv_backend, b"range2/".to_vec()).await;
}
}
#[tokio::test]
async fn test_batch_get() {
if let Some(kv_backend) = build_pg_kv_backend().await {
let prefix = b"batchGet/";
prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
test_kv_batch_get_with_prefix(&kv_backend, prefix.to_vec()).await;
unprepare_kv(&kv_backend, prefix).await;
}
}
#[tokio::test(flavor = "multi_thread")]
async fn test_compare_and_put() {
if let Some(kv_backend) = build_pg_kv_backend().await {
let kv_backend = Arc::new(kv_backend);
test_kv_compare_and_put_with_prefix(kv_backend, b"compareAndPut/".to_vec()).await;
}
}
#[tokio::test]
async fn test_delete_range() {
if let Some(kv_backend) = build_pg_kv_backend().await {
let prefix = b"deleteRange/";
prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
test_kv_delete_range_with_prefix(kv_backend, prefix.to_vec()).await;
}
}
#[tokio::test]
async fn test_batch_delete() {
if let Some(kv_backend) = build_pg_kv_backend().await {
test_kv_range_2_with_prefix(kv_backend, b"range2/".to_vec()).await;
}
if let Some(kv_backend) = build_pg_kv_backend().await {
let kv_backend = Arc::new(kv_backend);
test_kv_compare_and_put_with_prefix(kv_backend, b"compareAndPut/".to_vec()).await;
}
if let Some(kv_backend) = build_pg_kv_backend().await {
let prefix = b"batchDelete/";
prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
test_kv_batch_delete_with_prefix(kv_backend, prefix.to_vec()).await;
}
if let Some(kv_backend) = build_pg_kv_backend().await {
let kv_backend_ref = Arc::new(kv_backend);
test_txn_one_compare_op(kv_backend_ref.clone()).await;
text_txn_multi_compare_op(kv_backend_ref.clone()).await;
test_txn_compare_equal(kv_backend_ref.clone()).await;
test_txn_compare_greater(kv_backend_ref.clone()).await;
test_txn_compare_less(kv_backend_ref.clone()).await;
test_txn_compare_not_equal(kv_backend_ref.clone()).await;
// Clean up
kv_backend_ref
.get_client()
.await
.unwrap()
.execute("DELETE FROM greptime_metakv", &[])
.await
.unwrap();
}
}
}

View File

@@ -15,6 +15,8 @@
use std::sync::atomic::{AtomicU8, Ordering};
use std::sync::Arc;
use txn::{Compare, CompareOp, TxnOp};
use super::{KvBackend, *};
use crate::error::Error;
use crate::rpc::store::{BatchGetRequest, PutRequest};
@@ -444,3 +446,207 @@ pub async fn test_kv_batch_delete_with_prefix(kv_backend: impl KvBackend, prefix
assert!(kv_backend.get(&key3).await.unwrap().is_none());
assert!(kv_backend.get(&key11).await.unwrap().is_none());
}
pub async fn test_txn_one_compare_op(kv_backend: KvBackendRef) {
let _ = kv_backend
.put(PutRequest {
key: vec![11],
value: vec![3],
..Default::default()
})
.await
.unwrap();
let txn = Txn::new()
.when(vec![Compare::with_value(
vec![11],
CompareOp::Greater,
vec![1],
)])
.and_then(vec![TxnOp::Put(vec![11], vec![1])])
.or_else(vec![TxnOp::Put(vec![11], vec![2])]);
let txn_response = kv_backend.txn(txn).await.unwrap();
assert!(txn_response.succeeded);
assert_eq!(txn_response.responses.len(), 1);
}
pub async fn text_txn_multi_compare_op(kv_backend: KvBackendRef) {
for i in 1..3 {
let _ = kv_backend
.put(PutRequest {
key: vec![i],
value: vec![i],
..Default::default()
})
.await
.unwrap();
}
let when: Vec<_> = (1..3u8)
.map(|i| Compare::with_value(vec![i], CompareOp::Equal, vec![i]))
.collect();
let txn = Txn::new()
.when(when)
.and_then(vec![
TxnOp::Put(vec![1], vec![10]),
TxnOp::Put(vec![2], vec![20]),
])
.or_else(vec![TxnOp::Put(vec![1], vec![11])]);
let txn_response = kv_backend.txn(txn).await.unwrap();
assert!(txn_response.succeeded);
assert_eq!(txn_response.responses.len(), 2);
}
pub async fn test_txn_compare_equal(kv_backend: KvBackendRef) {
let key = vec![101u8];
kv_backend.delete(&key, false).await.unwrap();
let txn = Txn::new()
.when(vec![Compare::with_value_not_exists(
key.clone(),
CompareOp::Equal,
)])
.and_then(vec![TxnOp::Put(key.clone(), vec![1])])
.or_else(vec![TxnOp::Put(key.clone(), vec![2])]);
let txn_response = kv_backend.txn(txn.clone()).await.unwrap();
assert!(txn_response.succeeded);
let txn_response = kv_backend.txn(txn).await.unwrap();
assert!(!txn_response.succeeded);
let txn = Txn::new()
.when(vec![Compare::with_value(
key.clone(),
CompareOp::Equal,
vec![2],
)])
.and_then(vec![TxnOp::Put(key.clone(), vec![3])])
.or_else(vec![TxnOp::Put(key, vec![4])]);
let txn_response = kv_backend.txn(txn).await.unwrap();
assert!(txn_response.succeeded);
}
pub async fn test_txn_compare_greater(kv_backend: KvBackendRef) {
let key = vec![102u8];
kv_backend.delete(&key, false).await.unwrap();
let txn = Txn::new()
.when(vec![Compare::with_value_not_exists(
key.clone(),
CompareOp::Greater,
)])
.and_then(vec![TxnOp::Put(key.clone(), vec![1])])
.or_else(vec![TxnOp::Put(key.clone(), vec![2])]);
let txn_response = kv_backend.txn(txn.clone()).await.unwrap();
assert!(!txn_response.succeeded);
let txn_response = kv_backend.txn(txn).await.unwrap();
assert!(txn_response.succeeded);
let txn = Txn::new()
.when(vec![Compare::with_value(
key.clone(),
CompareOp::Greater,
vec![1],
)])
.and_then(vec![TxnOp::Put(key.clone(), vec![3])])
.or_else(vec![TxnOp::Get(key.clone())]);
let mut txn_response = kv_backend.txn(txn).await.unwrap();
assert!(!txn_response.succeeded);
let res = txn_response.responses.pop().unwrap();
assert_eq!(
res,
TxnOpResponse::ResponseGet(RangeResponse {
kvs: vec![KeyValue {
key,
value: vec![1]
}],
more: false,
})
);
}
pub async fn test_txn_compare_less(kv_backend: KvBackendRef) {
let key = vec![103u8];
kv_backend.delete(&[3], false).await.unwrap();
let txn = Txn::new()
.when(vec![Compare::with_value_not_exists(
key.clone(),
CompareOp::Less,
)])
.and_then(vec![TxnOp::Put(key.clone(), vec![1])])
.or_else(vec![TxnOp::Put(key.clone(), vec![2])]);
let txn_response = kv_backend.txn(txn.clone()).await.unwrap();
assert!(!txn_response.succeeded);
let txn_response = kv_backend.txn(txn).await.unwrap();
assert!(!txn_response.succeeded);
let txn = Txn::new()
.when(vec![Compare::with_value(
key.clone(),
CompareOp::Less,
vec![2],
)])
.and_then(vec![TxnOp::Put(key.clone(), vec![3])])
.or_else(vec![TxnOp::Get(key.clone())]);
let mut txn_response = kv_backend.txn(txn).await.unwrap();
assert!(!txn_response.succeeded);
let res = txn_response.responses.pop().unwrap();
assert_eq!(
res,
TxnOpResponse::ResponseGet(RangeResponse {
kvs: vec![KeyValue {
key,
value: vec![2]
}],
more: false,
})
);
}
pub async fn test_txn_compare_not_equal(kv_backend: KvBackendRef) {
let key = vec![104u8];
kv_backend.delete(&key, false).await.unwrap();
let txn = Txn::new()
.when(vec![Compare::with_value_not_exists(
key.clone(),
CompareOp::NotEqual,
)])
.and_then(vec![TxnOp::Put(key.clone(), vec![1])])
.or_else(vec![TxnOp::Put(key.clone(), vec![2])]);
let txn_response = kv_backend.txn(txn.clone()).await.unwrap();
assert!(!txn_response.succeeded);
let txn_response = kv_backend.txn(txn).await.unwrap();
assert!(txn_response.succeeded);
let txn = Txn::new()
.when(vec![Compare::with_value(
key.clone(),
CompareOp::Equal,
vec![2],
)])
.and_then(vec![TxnOp::Put(key.clone(), vec![3])])
.or_else(vec![TxnOp::Get(key.clone())]);
let mut txn_response = kv_backend.txn(txn).await.unwrap();
assert!(!txn_response.succeeded);
let res = txn_response.responses.pop().unwrap();
assert_eq!(
res,
TxnOpResponse::ResponseGet(RangeResponse {
kvs: vec![KeyValue {
key,
value: vec![1]
}],
more: false,
})
);
}

View File

@@ -131,9 +131,9 @@ pub struct TxnResponse {
pub struct Txn {
// HACK - chroot would modify this field
pub(super) req: TxnRequest,
c_when: bool,
c_then: bool,
c_else: bool,
pub(super) c_when: bool,
pub(super) c_then: bool,
pub(super) c_else: bool,
}
#[cfg(any(test, feature = "testing"))]
@@ -241,14 +241,7 @@ impl From<Txn> for TxnRequest {
#[cfg(test)]
mod tests {
use std::sync::Arc;
use super::*;
use crate::error::Error;
use crate::kv_backend::memory::MemoryKvBackend;
use crate::kv_backend::KvBackendRef;
use crate::rpc::store::PutRequest;
use crate::rpc::KeyValue;
#[test]
fn test_compare() {
@@ -310,232 +303,4 @@ mod tests {
}
);
}
#[tokio::test]
async fn test_txn_one_compare_op() {
let kv_backend = create_kv_backend().await;
let _ = kv_backend
.put(PutRequest {
key: vec![11],
value: vec![3],
..Default::default()
})
.await
.unwrap();
let txn = Txn::new()
.when(vec![Compare::with_value(
vec![11],
CompareOp::Greater,
vec![1],
)])
.and_then(vec![TxnOp::Put(vec![11], vec![1])])
.or_else(vec![TxnOp::Put(vec![11], vec![2])]);
let txn_response = kv_backend.txn(txn).await.unwrap();
assert!(txn_response.succeeded);
assert_eq!(txn_response.responses.len(), 1);
}
#[tokio::test]
async fn test_txn_multi_compare_op() {
let kv_backend = create_kv_backend().await;
for i in 1..3 {
let _ = kv_backend
.put(PutRequest {
key: vec![i],
value: vec![i],
..Default::default()
})
.await
.unwrap();
}
let when: Vec<_> = (1..3u8)
.map(|i| Compare::with_value(vec![i], CompareOp::Equal, vec![i]))
.collect();
let txn = Txn::new()
.when(when)
.and_then(vec![
TxnOp::Put(vec![1], vec![10]),
TxnOp::Put(vec![2], vec![20]),
])
.or_else(vec![TxnOp::Put(vec![1], vec![11])]);
let txn_response = kv_backend.txn(txn).await.unwrap();
assert!(txn_response.succeeded);
assert_eq!(txn_response.responses.len(), 2);
}
#[tokio::test]
async fn test_txn_compare_equal() {
let kv_backend = create_kv_backend().await;
let key = vec![101u8];
kv_backend.delete(&key, false).await.unwrap();
let txn = Txn::new()
.when(vec![Compare::with_value_not_exists(
key.clone(),
CompareOp::Equal,
)])
.and_then(vec![TxnOp::Put(key.clone(), vec![1])])
.or_else(vec![TxnOp::Put(key.clone(), vec![2])]);
let txn_response = kv_backend.txn(txn.clone()).await.unwrap();
assert!(txn_response.succeeded);
let txn_response = kv_backend.txn(txn).await.unwrap();
assert!(!txn_response.succeeded);
let txn = Txn::new()
.when(vec![Compare::with_value(
key.clone(),
CompareOp::Equal,
vec![2],
)])
.and_then(vec![TxnOp::Put(key.clone(), vec![3])])
.or_else(vec![TxnOp::Put(key, vec![4])]);
let txn_response = kv_backend.txn(txn).await.unwrap();
assert!(txn_response.succeeded);
}
#[tokio::test]
async fn test_txn_compare_greater() {
let kv_backend = create_kv_backend().await;
let key = vec![102u8];
kv_backend.delete(&key, false).await.unwrap();
let txn = Txn::new()
.when(vec![Compare::with_value_not_exists(
key.clone(),
CompareOp::Greater,
)])
.and_then(vec![TxnOp::Put(key.clone(), vec![1])])
.or_else(vec![TxnOp::Put(key.clone(), vec![2])]);
let txn_response = kv_backend.txn(txn.clone()).await.unwrap();
assert!(!txn_response.succeeded);
let txn_response = kv_backend.txn(txn).await.unwrap();
assert!(txn_response.succeeded);
let txn = Txn::new()
.when(vec![Compare::with_value(
key.clone(),
CompareOp::Greater,
vec![1],
)])
.and_then(vec![TxnOp::Put(key.clone(), vec![3])])
.or_else(vec![TxnOp::Get(key.clone())]);
let mut txn_response = kv_backend.txn(txn).await.unwrap();
assert!(!txn_response.succeeded);
let res = txn_response.responses.pop().unwrap();
assert_eq!(
res,
TxnOpResponse::ResponseGet(RangeResponse {
kvs: vec![KeyValue {
key,
value: vec![1]
}],
more: false,
})
);
}
#[tokio::test]
async fn test_txn_compare_less() {
let kv_backend = create_kv_backend().await;
let key = vec![103u8];
kv_backend.delete(&[3], false).await.unwrap();
let txn = Txn::new()
.when(vec![Compare::with_value_not_exists(
key.clone(),
CompareOp::Less,
)])
.and_then(vec![TxnOp::Put(key.clone(), vec![1])])
.or_else(vec![TxnOp::Put(key.clone(), vec![2])]);
let txn_response = kv_backend.txn(txn.clone()).await.unwrap();
assert!(!txn_response.succeeded);
let txn_response = kv_backend.txn(txn).await.unwrap();
assert!(!txn_response.succeeded);
let txn = Txn::new()
.when(vec![Compare::with_value(
key.clone(),
CompareOp::Less,
vec![2],
)])
.and_then(vec![TxnOp::Put(key.clone(), vec![3])])
.or_else(vec![TxnOp::Get(key.clone())]);
let mut txn_response = kv_backend.txn(txn).await.unwrap();
assert!(!txn_response.succeeded);
let res = txn_response.responses.pop().unwrap();
assert_eq!(
res,
TxnOpResponse::ResponseGet(RangeResponse {
kvs: vec![KeyValue {
key,
value: vec![2]
}],
more: false,
})
);
}
#[tokio::test]
async fn test_txn_compare_not_equal() {
let kv_backend = create_kv_backend().await;
let key = vec![104u8];
kv_backend.delete(&key, false).await.unwrap();
let txn = Txn::new()
.when(vec![Compare::with_value_not_exists(
key.clone(),
CompareOp::NotEqual,
)])
.and_then(vec![TxnOp::Put(key.clone(), vec![1])])
.or_else(vec![TxnOp::Put(key.clone(), vec![2])]);
let txn_response = kv_backend.txn(txn.clone()).await.unwrap();
assert!(!txn_response.succeeded);
let txn_response = kv_backend.txn(txn).await.unwrap();
assert!(txn_response.succeeded);
let txn = Txn::new()
.when(vec![Compare::with_value(
key.clone(),
CompareOp::Equal,
vec![2],
)])
.and_then(vec![TxnOp::Put(key.clone(), vec![3])])
.or_else(vec![TxnOp::Get(key.clone())]);
let mut txn_response = kv_backend.txn(txn).await.unwrap();
assert!(!txn_response.succeeded);
let res = txn_response.responses.pop().unwrap();
assert_eq!(
res,
TxnOpResponse::ResponseGet(RangeResponse {
kvs: vec![KeyValue {
key,
value: vec![1]
}],
more: false,
})
);
}
async fn create_kv_backend() -> KvBackendRef {
Arc::new(MemoryKvBackend::<Error>::new())
// TODO(jiachun): Add a feature to test against etcd in github CI
//
// The same test can be run against etcd by uncommenting the following line
// crate::service::store::etcd::EtcdStore::with_endpoints(["127.0.0.1:2379"])
// .await
// .unwrap()
}
}

View File

@@ -65,6 +65,7 @@ tonic.workspace = true
tower.workspace = true
typetag.workspace = true
url = "2.3"
uuid.workspace = true
[dev-dependencies]
chrono.workspace = true

View File

@@ -229,7 +229,7 @@ pub async fn metasrv_builder(
#[cfg(feature = "pg_kvbackend")]
(None, BackendImpl::PostgresStore) => {
let pool = create_postgres_pool(opts).await?;
let kv_backend = PgStore::with_pg_pool(pool)
let kv_backend = PgStore::with_pg_pool(pool, opts.max_txn_ops)
.await
.context(error::KvBackendSnafu)?;
// Client for election should be created separately since we need a different session keep-alive idle time.

View File

@@ -35,8 +35,8 @@ use crate::error::{
use crate::metasrv::{ElectionRef, LeaderValue, MetasrvNodeInfo};
// TODO(CookiePie): The lock id should be configurable.
const CAMPAIGN: &str = "SELECT pg_try_advisory_lock(28319)";
const STEP_DOWN: &str = "SELECT pg_advisory_unlock(28319)";
const CAMPAIGN: &str = "SELECT pg_try_advisory_lock({})";
const STEP_DOWN: &str = "SELECT pg_advisory_unlock({})";
const SET_IDLE_SESSION_TIMEOUT: &str = "SET idle_in_transaction_session_timeout = $1";
// Currently the session timeout is longer than the leader lease time, so the leader lease may expire while the session is still alive.
// Either the leader reconnects and step down or the session expires and the lock is released.
@@ -73,6 +73,14 @@ const PREFIX_GET_WITH_CURRENT_TIMESTAMP: &str = r#"SELECT v, TO_CHAR(CURRENT_TIM
const POINT_DELETE: &str = "DELETE FROM greptime_metakv WHERE k = $1 RETURNING k,v;";
fn campaign_sql(lock_id: u64) -> String {
CAMPAIGN.replace("{}", &lock_id.to_string())
}
fn step_down_sql(lock_id: u64) -> String {
STEP_DOWN.replace("{}", &lock_id.to_string())
}
/// Parse the value and expire time from the given string. The value should be in the format "value || LEASE_SEP || expire_time".
fn parse_value_and_expire_time(value: &str) -> Result<(String, Timestamp)> {
let (value, expire_time) = value
@@ -130,6 +138,7 @@ pub struct PgElection {
leader_watcher: broadcast::Sender<LeaderChangeMessage>,
store_key_prefix: String,
candidate_lease_ttl_secs: u64,
lock_id: u64,
}
impl PgElection {
@@ -154,6 +163,8 @@ impl PgElection {
leader_watcher: tx,
store_key_prefix,
candidate_lease_ttl_secs,
// TODO(CookiePie): The lock id should be configurable.
lock_id: 28319,
}))
}
@@ -265,7 +276,7 @@ impl Election for PgElection {
loop {
let res = self
.client
.query(CAMPAIGN, &[])
.query(&campaign_sql(self.lock_id), &[])
.await
.context(PostgresExecutionSnafu)?;
if let Some(row) = res.first() {
@@ -550,7 +561,7 @@ impl PgElection {
{
self.delete_value(&key).await?;
self.client
.query(STEP_DOWN, &[])
.query(&step_down_sql(self.lock_id), &[])
.await
.context(PostgresExecutionSnafu)?;
if let Err(e) = self
@@ -657,8 +668,9 @@ mod tests {
is_leader: AtomicBool::new(false),
leader_infancy: AtomicBool::new(true),
leader_watcher: tx,
store_key_prefix: "test_prefix".to_string(),
store_key_prefix: uuid::Uuid::new_v4().to_string(),
candidate_lease_ttl_secs: 10,
lock_id: 28319,
};
let res = pg_election
@@ -716,7 +728,11 @@ mod tests {
assert!(current == Timestamp::default());
}
async fn candidate(leader_value: String, candidate_lease_ttl_secs: u64) {
async fn candidate(
leader_value: String,
candidate_lease_ttl_secs: u64,
store_key_prefix: String,
) {
let client = create_postgres_client().await.unwrap();
let (tx, _) = broadcast::channel(100);
@@ -726,8 +742,9 @@ mod tests {
is_leader: AtomicBool::new(false),
leader_infancy: AtomicBool::new(true),
leader_watcher: tx,
store_key_prefix: "test_prefix".to_string(),
store_key_prefix,
candidate_lease_ttl_secs,
lock_id: 28319,
};
let node_info = MetasrvNodeInfo {
@@ -743,10 +760,15 @@ mod tests {
async fn test_candidate_registration() {
let leader_value_prefix = "test_leader".to_string();
let candidate_lease_ttl_secs = 5;
let store_key_prefix = uuid::Uuid::new_v4().to_string();
let mut handles = vec![];
for i in 0..10 {
let leader_value = format!("{}{}", leader_value_prefix, i);
let handle = tokio::spawn(candidate(leader_value, candidate_lease_ttl_secs));
let handle = tokio::spawn(candidate(
leader_value,
candidate_lease_ttl_secs,
store_key_prefix.clone(),
));
handles.push(handle);
}
// Wait for candidates to registrate themselves and renew their leases at least once.
@@ -762,8 +784,9 @@ mod tests {
is_leader: AtomicBool::new(false),
leader_infancy: AtomicBool::new(true),
leader_watcher: tx,
store_key_prefix: "test_prefix".to_string(),
store_key_prefix: store_key_prefix.clone(),
candidate_lease_ttl_secs,
lock_id: 28319,
};
let candidates = pg_election.all_candidates().await.unwrap();
@@ -782,7 +805,7 @@ mod tests {
for i in 0..10 {
let key = format!(
"{}{}{}{}",
"test_prefix", CANDIDATES_ROOT, leader_value_prefix, i
store_key_prefix, CANDIDATES_ROOT, leader_value_prefix, i
);
let res = pg_election.delete_value(&key).await.unwrap();
assert!(res);
@@ -802,8 +825,9 @@ mod tests {
is_leader: AtomicBool::new(false),
leader_infancy: AtomicBool::new(true),
leader_watcher: tx,
store_key_prefix: "test_prefix".to_string(),
store_key_prefix: uuid::Uuid::new_v4().to_string(),
candidate_lease_ttl_secs,
lock_id: 28320,
};
leader_pg_election.elected().await.unwrap();
@@ -899,6 +923,7 @@ mod tests {
#[tokio::test]
async fn test_leader_action() {
let leader_value = "test_leader".to_string();
let store_key_prefix = uuid::Uuid::new_v4().to_string();
let candidate_lease_ttl_secs = 5;
let client = create_postgres_client().await.unwrap();
@@ -909,14 +934,15 @@ mod tests {
is_leader: AtomicBool::new(false),
leader_infancy: AtomicBool::new(true),
leader_watcher: tx,
store_key_prefix: "test_prefix".to_string(),
store_key_prefix,
candidate_lease_ttl_secs,
lock_id: 28321,
};
// Step 1: No leader exists, campaign and elected.
let res = leader_pg_election
.client
.query(CAMPAIGN, &[])
.query(&campaign_sql(leader_pg_election.lock_id), &[])
.await
.unwrap();
let res: bool = res[0].get(0);
@@ -947,7 +973,7 @@ mod tests {
// Step 2: As a leader, renew the lease.
let res = leader_pg_election
.client
.query(CAMPAIGN, &[])
.query(&campaign_sql(leader_pg_election.lock_id), &[])
.await
.unwrap();
let res: bool = res[0].get(0);
@@ -967,7 +993,7 @@ mod tests {
let res = leader_pg_election
.client
.query(CAMPAIGN, &[])
.query(&campaign_sql(leader_pg_election.lock_id), &[])
.await
.unwrap();
let res: bool = res[0].get(0);
@@ -995,7 +1021,7 @@ mod tests {
// Step 4: Re-campaign and elected.
let res = leader_pg_election
.client
.query(CAMPAIGN, &[])
.query(&campaign_sql(leader_pg_election.lock_id), &[])
.await
.unwrap();
let res: bool = res[0].get(0);
@@ -1052,7 +1078,7 @@ mod tests {
// Step 6: Re-campaign and elected.
let res = leader_pg_election
.client
.query(CAMPAIGN, &[])
.query(&campaign_sql(leader_pg_election.lock_id), &[])
.await
.unwrap();
let res: bool = res[0].get(0);
@@ -1083,7 +1109,7 @@ mod tests {
// Step 7: Something wrong, the leader key changed by others.
let res = leader_pg_election
.client
.query(CAMPAIGN, &[])
.query(&campaign_sql(leader_pg_election.lock_id), &[])
.await
.unwrap();
let res: bool = res[0].get(0);
@@ -1116,11 +1142,19 @@ mod tests {
}
_ => panic!("Expected LeaderChangeMessage::StepDown"),
}
// Clean up
leader_pg_election
.client
.query(&step_down_sql(leader_pg_election.lock_id), &[])
.await
.unwrap();
}
#[tokio::test]
async fn test_follower_action() {
let candidate_lease_ttl_secs = 5;
let store_key_prefix = uuid::Uuid::new_v4().to_string();
let follower_client = create_postgres_client().await.unwrap();
let (tx, mut rx) = broadcast::channel(100);
@@ -1130,8 +1164,9 @@ mod tests {
is_leader: AtomicBool::new(false),
leader_infancy: AtomicBool::new(true),
leader_watcher: tx,
store_key_prefix: "test_prefix".to_string(),
store_key_prefix: store_key_prefix.clone(),
candidate_lease_ttl_secs,
lock_id: 28322,
};
let leader_client = create_postgres_client().await.unwrap();
@@ -1142,13 +1177,14 @@ mod tests {
is_leader: AtomicBool::new(false),
leader_infancy: AtomicBool::new(true),
leader_watcher: tx,
store_key_prefix: "test_prefix".to_string(),
store_key_prefix,
candidate_lease_ttl_secs,
lock_id: 28322,
};
leader_pg_election
.client
.query(CAMPAIGN, &[])
.query(&campaign_sql(leader_pg_election.lock_id), &[])
.await
.unwrap();
leader_pg_election.elected().await.unwrap();
@@ -1185,5 +1221,12 @@ mod tests {
}
_ => panic!("Expected LeaderChangeMessage::StepDown"),
}
// Clean up
leader_pg_election
.client
.query(&step_down_sql(leader_pg_election.lock_id), &[])
.await
.unwrap();
}
}