From 48996b0646d21b13a72ad8655f53fbe2fbf8dfd4 Mon Sep 17 00:00:00 2001 From: LFC Date: Tue, 25 Jul 2023 18:02:26 +0800 Subject: [PATCH] fix: etcd range pagenation in table metadata migration tool (#2035) --- src/cmd/src/cli/upgrade.rs | 57 +++++++++++++++++++++++++++++++------- 1 file changed, 47 insertions(+), 10 deletions(-) diff --git a/src/cmd/src/cli/upgrade.rs b/src/cmd/src/cli/upgrade.rs index 0423f55635..37504ecb8c 100644 --- a/src/cmd/src/cli/upgrade.rs +++ b/src/cmd/src/cli/upgrade.rs @@ -20,7 +20,10 @@ use common_meta::key::table_info::{TableInfoKey, TableInfoValue}; use common_meta::key::table_name::{TableNameKey, TableNameValue}; use common_meta::key::table_region::{RegionDistribution, TableRegionKey, TableRegionValue}; use common_meta::key::TableMetaKey; -use common_meta::rpc::store::{BatchPutRequest, PutRequest, RangeRequest}; +use common_meta::rpc::store::{ + BatchDeleteRequest, BatchPutRequest, PutRequest, RangeRequest, RangeResponse, +}; +use common_meta::util::get_prefix_end_key; use common_telemetry::info; use etcd_client::Client; use meta_srv::service::store::etcd::EtcdStore; @@ -61,24 +64,58 @@ struct MigrateTableMetadata { #[async_trait] impl Tool for MigrateTableMetadata { async fn do_work(&self) -> Result<()> { - let req = RangeRequest::new().with_prefix(b"__tg".to_vec()); - let resp = self.etcd_store.range(req).await.unwrap(); - for kv in resp.kvs { - let key = String::from_utf8_lossy(kv.key()); - let value = TableGlobalValue::from_bytes(kv.value()) - .unwrap_or_else(|e| panic!("table global value is corrupted: {e}, key: {key}")); + let mut key = b"__tg".to_vec(); + let range_end = get_prefix_end_key(&key); - self.create_table_name_key(&value).await; + let mut processed_keys = 0; + loop { + info!("Start scanning key from: {}", String::from_utf8_lossy(&key)); - self.create_datanode_table_keys(&value).await; + let req = RangeRequest::new() + .with_range(key, range_end.clone()) + .with_limit(1000); + let resp = self.etcd_store.range(req).await.unwrap(); + for kv in resp.kvs.iter() { + let key = String::from_utf8_lossy(kv.key()); + let value = TableGlobalValue::from_bytes(kv.value()) + .unwrap_or_else(|e| panic!("table global value is corrupted: {e}, key: {key}")); - self.split_table_global_value(&key, value).await; + self.create_table_name_key(&value).await; + + self.create_datanode_table_keys(&value).await; + + self.split_table_global_value(&key, value).await; + } + + self.delete_migrated_keys(&resp).await; + + processed_keys += resp.kvs.len(); + + if resp.more { + key = get_prefix_end_key(resp.kvs.last().unwrap().key()); + } else { + break; + } } + info!("Total migrated TableGlobalKeys: {processed_keys}"); Ok(()) } } impl MigrateTableMetadata { + async fn delete_migrated_keys(&self, resp: &RangeResponse) { + info!("Deleting {} TableGlobalKeys", resp.kvs.len()); + let req = BatchDeleteRequest { + keys: resp.kvs.iter().map(|kv| kv.key().to_vec()).collect(), + prev_kv: false, + }; + if self.dryrun { + info!("Dryrun: do nothing"); + } else { + self.etcd_store.batch_delete(req).await.unwrap(); + } + } + async fn split_table_global_value(&self, key: &str, value: TableGlobalValue) { let table_id = value.table_id(); let region_distribution: RegionDistribution = value.regions_id_map.into_iter().collect();