mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-22 22:20:02 +00:00
feat: max-txn-ops option (#3458)
* feat: max-txn-ops limit * chore: by comment
This commit is contained in:
@@ -62,7 +62,9 @@ pub struct BenchTableMetadataCommand {
|
||||
|
||||
impl BenchTableMetadataCommand {
|
||||
pub async fn build(&self) -> Result<Instance> {
|
||||
let etcd_store = EtcdStore::with_endpoints([&self.etcd_addr]).await.unwrap();
|
||||
let etcd_store = EtcdStore::with_endpoints([&self.etcd_addr], 128)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let table_metadata_manager = Arc::new(TableMetadataManager::new(etcd_store));
|
||||
|
||||
|
||||
@@ -70,7 +70,7 @@ impl UpgradeCommand {
|
||||
etcd_addr: &self.etcd_addr,
|
||||
})?;
|
||||
let tool = MigrateTableMetadata {
|
||||
etcd_store: EtcdStore::with_etcd_client(client),
|
||||
etcd_store: EtcdStore::with_etcd_client(client, 128),
|
||||
dryrun: self.dryrun,
|
||||
skip_catalog_keys: self.skip_catalog_keys,
|
||||
skip_table_global_keys: self.skip_table_global_keys,
|
||||
|
||||
@@ -117,10 +117,12 @@ struct StartCommand {
|
||||
/// The working home directory of this metasrv instance.
|
||||
#[clap(long)]
|
||||
data_home: Option<String>,
|
||||
|
||||
/// If it's not empty, the metasrv will store all data with this key prefix.
|
||||
#[clap(long, default_value = "")]
|
||||
store_key_prefix: String,
|
||||
/// The max operations per txn
|
||||
#[clap(long)]
|
||||
max_txn_ops: Option<usize>,
|
||||
}
|
||||
|
||||
impl StartCommand {
|
||||
@@ -181,6 +183,10 @@ impl StartCommand {
|
||||
opts.store_key_prefix = self.store_key_prefix.clone()
|
||||
}
|
||||
|
||||
if let Some(max_txn_ops) = self.max_txn_ops {
|
||||
opts.max_txn_ops = max_txn_ops;
|
||||
}
|
||||
|
||||
// Disable dashboard in metasrv.
|
||||
opts.http.disable_dashboard = true;
|
||||
|
||||
|
||||
@@ -67,6 +67,14 @@ pub enum Error {
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to execute {} txn operations via Etcd", max_operations))]
|
||||
EtcdTxnFailed {
|
||||
max_operations: usize,
|
||||
#[snafu(source)]
|
||||
error: etcd_client::Error,
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to get sequence: {}", err_msg))]
|
||||
NextSequence { err_msg: String, location: Location },
|
||||
|
||||
@@ -400,6 +408,7 @@ impl ErrorExt for Error {
|
||||
IllegalServerState { .. }
|
||||
| EtcdTxnOpResponse { .. }
|
||||
| EtcdFailed { .. }
|
||||
| EtcdTxnFailed { .. }
|
||||
| ConnectEtcd { .. } => StatusCode::Internal,
|
||||
|
||||
SerdeJson { .. }
|
||||
|
||||
@@ -464,7 +464,7 @@ impl TableMetadataManager {
|
||||
pub fn max_logical_tables_per_batch(&self) -> usize {
|
||||
// The batch size is max_txn_size / 3 because the size of the `tables_data`
|
||||
// is 3 times the size of the `tables_data`.
|
||||
self.kv_backend.max_txn_size() / 3
|
||||
self.kv_backend.max_txn_ops() / 3
|
||||
}
|
||||
|
||||
/// Creates metadata for multiple logical tables and return an error if different metadata exists.
|
||||
@@ -860,6 +860,7 @@ mod tests {
|
||||
use bytes::Bytes;
|
||||
use common_time::util::current_time_millis;
|
||||
use futures::TryStreamExt;
|
||||
use store_api::storage::RegionId;
|
||||
use table::metadata::{RawTableInfo, TableInfo};
|
||||
|
||||
use super::datanode_table::DatanodeTableKey;
|
||||
@@ -1056,6 +1057,36 @@ mod tests {
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_create_many_logical_tables_metadata() {
|
||||
let kv_backend = Arc::new(MemoryKvBackend::default());
|
||||
let table_metadata_manager = TableMetadataManager::new(kv_backend);
|
||||
|
||||
let mut tables_data = vec![];
|
||||
for i in 0..128 {
|
||||
let table_id = i + 1;
|
||||
let regin_number = table_id * 3;
|
||||
let region_id = RegionId::new(table_id, regin_number);
|
||||
let region_route = new_region_route(region_id.as_u64(), 2);
|
||||
let region_routes = vec![region_route.clone()];
|
||||
let table_info: RawTableInfo = test_utils::new_test_table_info_with_name(
|
||||
table_id,
|
||||
&format!("my_table_{}", table_id),
|
||||
region_routes.iter().map(|r| r.region.id.region_number()),
|
||||
)
|
||||
.into();
|
||||
let table_route_value = TableRouteValue::physical(region_routes.clone());
|
||||
|
||||
tables_data.push((table_info, table_route_value));
|
||||
}
|
||||
|
||||
// creates metadata.
|
||||
table_metadata_manager
|
||||
.create_logical_tables_metadata(tables_data)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_delete_table_metadata() {
|
||||
let mem_kv = Arc::new(MemoryKvBackend::default());
|
||||
|
||||
@@ -19,8 +19,9 @@ use datatypes::schema::{ColumnSchema, SchemaBuilder};
|
||||
use store_api::storage::TableId;
|
||||
use table::metadata::{TableInfo, TableInfoBuilder, TableMetaBuilder};
|
||||
|
||||
pub fn new_test_table_info<I: IntoIterator<Item = u32>>(
|
||||
pub fn new_test_table_info_with_name<I: IntoIterator<Item = u32>>(
|
||||
table_id: TableId,
|
||||
table_name: &str,
|
||||
region_numbers: I,
|
||||
) -> TableInfo {
|
||||
let column_schemas = vec![
|
||||
@@ -50,8 +51,14 @@ pub fn new_test_table_info<I: IntoIterator<Item = u32>>(
|
||||
TableInfoBuilder::default()
|
||||
.table_id(table_id)
|
||||
.table_version(5)
|
||||
.name("mytable")
|
||||
.name(table_name)
|
||||
.meta(meta)
|
||||
.build()
|
||||
.unwrap()
|
||||
}
|
||||
pub fn new_test_table_info<I: IntoIterator<Item = u32>>(
|
||||
table_id: TableId,
|
||||
region_numbers: I,
|
||||
) -> TableInfo {
|
||||
new_test_table_info_with_name(table_id, "mytable", region_numbers)
|
||||
}
|
||||
|
||||
@@ -33,12 +33,6 @@ use crate::rpc::store::{
|
||||
};
|
||||
use crate::rpc::KeyValue;
|
||||
|
||||
// Maximum number of operations permitted in a transaction.
|
||||
// The etcd default configuration's `--max-txn-ops` is 128.
|
||||
//
|
||||
// For more detail, see: https://etcd.io/docs/v3.5/op-guide/configuration/
|
||||
const MAX_TXN_SIZE: usize = 128;
|
||||
|
||||
fn convert_key_value(kv: etcd_client::KeyValue) -> KeyValue {
|
||||
let (key, value) = kv.into_key_value();
|
||||
KeyValue { key, value }
|
||||
@@ -46,10 +40,15 @@ fn convert_key_value(kv: etcd_client::KeyValue) -> KeyValue {
|
||||
|
||||
pub struct EtcdStore {
|
||||
client: Client,
|
||||
// Maximum number of operations permitted in a transaction.
|
||||
// The etcd default configuration's `--max-txn-ops` is 128.
|
||||
//
|
||||
// For more detail, see: https://etcd.io/docs/v3.5/op-guide/configuration/
|
||||
max_txn_ops: usize,
|
||||
}
|
||||
|
||||
impl EtcdStore {
|
||||
pub async fn with_endpoints<E, S>(endpoints: S) -> Result<KvBackendRef>
|
||||
pub async fn with_endpoints<E, S>(endpoints: S, max_txn_ops: usize) -> Result<KvBackendRef>
|
||||
where
|
||||
E: AsRef<str>,
|
||||
S: AsRef<[E]>,
|
||||
@@ -58,16 +57,19 @@ impl EtcdStore {
|
||||
.await
|
||||
.context(error::ConnectEtcdSnafu)?;
|
||||
|
||||
Ok(Self::with_etcd_client(client))
|
||||
Ok(Self::with_etcd_client(client, max_txn_ops))
|
||||
}
|
||||
|
||||
pub fn with_etcd_client(client: Client) -> KvBackendRef {
|
||||
Arc::new(Self { client })
|
||||
pub fn with_etcd_client(client: Client, max_txn_ops: usize) -> KvBackendRef {
|
||||
Arc::new(Self {
|
||||
client,
|
||||
max_txn_ops,
|
||||
})
|
||||
}
|
||||
|
||||
async fn do_multi_txn(&self, txn_ops: Vec<TxnOp>) -> Result<Vec<TxnResponse>> {
|
||||
let max_txn_size = self.max_txn_size();
|
||||
if txn_ops.len() < max_txn_size {
|
||||
let max_txn_ops = self.max_txn_ops();
|
||||
if txn_ops.len() < max_txn_ops {
|
||||
// fast path
|
||||
let _timer = METRIC_META_TXN_REQUEST
|
||||
.with_label_values(&["etcd", "txn"])
|
||||
@@ -83,7 +85,7 @@ impl EtcdStore {
|
||||
}
|
||||
|
||||
let txns = txn_ops
|
||||
.chunks(max_txn_size)
|
||||
.chunks(max_txn_ops)
|
||||
.map(|part| async move {
|
||||
let _timer = METRIC_META_TXN_REQUEST
|
||||
.with_label_values(&["etcd", "txn"])
|
||||
@@ -311,18 +313,20 @@ impl TxnService for EtcdStore {
|
||||
.with_label_values(&["etcd", "txn"])
|
||||
.start_timer();
|
||||
|
||||
let max_operations = txn.max_operations();
|
||||
|
||||
let etcd_txn: Txn = txn.into();
|
||||
let txn_res = self
|
||||
.client
|
||||
.kv_client()
|
||||
.txn(etcd_txn)
|
||||
.await
|
||||
.context(error::EtcdFailedSnafu)?;
|
||||
.context(error::EtcdTxnFailedSnafu { max_operations })?;
|
||||
txn_res.try_into()
|
||||
}
|
||||
|
||||
fn max_txn_size(&self) -> usize {
|
||||
MAX_TXN_SIZE
|
||||
fn max_txn_ops(&self) -> usize {
|
||||
self.max_txn_ops
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -12,6 +12,8 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::cmp::max;
|
||||
|
||||
use common_error::ext::ErrorExt;
|
||||
|
||||
use crate::rpc::store::{DeleteRangeResponse, PutResponse, RangeResponse};
|
||||
@@ -27,7 +29,7 @@ pub trait TxnService: Sync + Send {
|
||||
}
|
||||
|
||||
/// Maximum number of operations permitted in a transaction.
|
||||
fn max_txn_size(&self) -> usize {
|
||||
fn max_txn_ops(&self) -> usize {
|
||||
usize::MAX
|
||||
}
|
||||
}
|
||||
@@ -192,6 +194,12 @@ impl Txn {
|
||||
self.req.failure = operations.into();
|
||||
self
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn max_operations(&self) -> usize {
|
||||
let opc = max(self.req.compare.len(), self.req.success.len());
|
||||
max(opc, self.req.failure.len())
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Txn> for TxnRequest {
|
||||
|
||||
@@ -24,7 +24,9 @@ fn main() {
|
||||
|
||||
#[tokio::main]
|
||||
async fn run() {
|
||||
let kv_backend = EtcdStore::with_endpoints(["127.0.0.1:2380"]).await.unwrap();
|
||||
let kv_backend = EtcdStore::with_endpoints(["127.0.0.1:2380"], 128)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// put
|
||||
let put_req = PutRequest {
|
||||
|
||||
@@ -193,7 +193,8 @@ pub async fn metasrv_builder(
|
||||
(None, false) => {
|
||||
let etcd_client = create_etcd_client(opts).await?;
|
||||
let kv_backend = {
|
||||
let etcd_backend = EtcdStore::with_etcd_client(etcd_client.clone());
|
||||
let etcd_backend =
|
||||
EtcdStore::with_etcd_client(etcd_client.clone(), opts.max_txn_ops);
|
||||
if !opts.store_key_prefix.is_empty() {
|
||||
Arc::new(ChrootKvBackend::new(
|
||||
opts.store_key_prefix.clone().into_bytes(),
|
||||
|
||||
@@ -79,6 +79,17 @@ pub struct MetaSrvOptions {
|
||||
pub wal: MetaSrvWalConfig,
|
||||
pub export_metrics: ExportMetricsOption,
|
||||
pub store_key_prefix: String,
|
||||
/// The max operations per txn
|
||||
///
|
||||
/// This value is usually limited by which store is used for the `KvBackend`.
|
||||
/// For example, if using etcd, this value should ensure that it is less than
|
||||
/// or equal to the `--max-txn-ops` option value of etcd.
|
||||
///
|
||||
/// TODO(jeremy): Currently, this option only affects the etcd store, but it may
|
||||
/// also affect other stores in the future. In other words, each store needs to
|
||||
/// limit the number of operations in a txn because an infinitely large txn could
|
||||
/// potentially block other operations.
|
||||
pub max_txn_ops: usize,
|
||||
}
|
||||
|
||||
impl MetaSrvOptions {
|
||||
@@ -112,6 +123,7 @@ impl Default for MetaSrvOptions {
|
||||
wal: MetaSrvWalConfig::default(),
|
||||
export_metrics: ExportMetricsOption::default(),
|
||||
store_key_prefix: String::new(),
|
||||
max_txn_ops: 128,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -42,7 +42,7 @@ pub async fn mock_with_memstore() -> MockInfo {
|
||||
}
|
||||
|
||||
pub async fn mock_with_etcdstore(addr: &str) -> MockInfo {
|
||||
let kv_backend = EtcdStore::with_endpoints([addr]).await.unwrap();
|
||||
let kv_backend = EtcdStore::with_endpoints([addr], 128).await.unwrap();
|
||||
mock(Default::default(), kv_backend, None, None).await
|
||||
}
|
||||
|
||||
|
||||
@@ -468,8 +468,6 @@ impl Inserter {
|
||||
&req.table_name,
|
||||
);
|
||||
|
||||
info!("Logical table `{table_ref}` does not exist, try creating table");
|
||||
|
||||
let request_schema = req.rows.as_ref().unwrap().schema.as_slice();
|
||||
let mut create_table_expr = build_create_table_expr(&table_ref, request_schema)?;
|
||||
|
||||
|
||||
@@ -94,7 +94,7 @@ impl GreptimeDbClusterBuilder {
|
||||
.split(',')
|
||||
.map(|s| s.to_string())
|
||||
.collect::<Vec<String>>();
|
||||
let backend = EtcdStore::with_endpoints(endpoints)
|
||||
let backend = EtcdStore::with_endpoints(endpoints, 128)
|
||||
.await
|
||||
.expect("malformed endpoints");
|
||||
// Each retry requires a new isolation namespace.
|
||||
|
||||
Reference in New Issue
Block a user