feat: limit total rows copied in COPY TABLE FROM with LIMIT segment (#3910)

* feat: limit total rows copied in COPY TABLE FROM with LIMIT segment

* fmt

* disable default limit

* fix: check parse

* fix test, add error case

* fix: forbid LIMIT in database

* fix: only support LIMIT segment

* fix: simplify

* fix

* fix

* fix

* fix

* fix: test

* fix: change error info

* fix clippy

* fix: fix error msg

* fix test

* fix: test error info
irenjj
2024-05-16 21:39:26 +08:00
committed by GitHub
parent 669a6d84e9
commit f93b5b19f0
10 changed files with 73 additions and 23 deletions

View File

@@ -321,6 +321,7 @@ fn to_copy_table_request(stmt: CopyTable, query_ctx: QueryContextRef) -> Result<
connection,
with,
table_name,
+ limit,
..
} = match stmt {
CopyTable::To(arg) => arg,
@@ -347,6 +348,7 @@ fn to_copy_table_request(stmt: CopyTable, query_ctx: QueryContextRef) -> Result<
pattern,
direction,
timestamp_range,
+ limit,
})
}

View File

@@ -90,6 +90,7 @@ impl StatementExecutor {
pattern: None,
direction: CopyDirection::Export,
timestamp_range: req.time_range,
+ limit: None,
},
ctx.clone(),
)
@@ -155,6 +156,7 @@ impl StatementExecutor {
pattern: None,
direction: CopyDirection::Import,
timestamp_range: None,
+ limit: None,
};
debug!("Copy table, arg: {:?}", req);
match self.copy_table_from(req, ctx.clone()).await {

View File

@@ -52,8 +52,6 @@ use crate::statement::StatementExecutor;
const DEFAULT_BATCH_SIZE: usize = 8192;
const DEFAULT_READ_BUFFER: usize = 256 * 1024;
- const MAX_INSERT_ROWS: &str = "max_insert_rows";
- const DEFAULT_MAX_INSERT_ROWS: usize = 1000;
enum FileMetadata {
Parquet {
@@ -379,11 +377,7 @@ impl StatementExecutor {
let mut rows_inserted = 0;
let mut insert_cost = 0;
- let max_insert_rows = req
-     .with
-     .get(MAX_INSERT_ROWS)
-     .and_then(|val| val.parse::<usize>().ok())
-     .unwrap_or(DEFAULT_MAX_INSERT_ROWS);
+ let max_insert_rows = req.limit.map(|n| n as usize);
for (compat_schema, file_schema_projection, projected_table_schema, file_metadata) in files
{
let mut stream = self
@@ -435,8 +429,10 @@ impl StatementExecutor {
insert_cost += cost;
}
- if rows_inserted >= max_insert_rows {
-     return Ok(gen_insert_output(rows_inserted, insert_cost));
+ if let Some(max_insert_rows) = max_insert_rows {
+     if rows_inserted >= max_insert_rows {
+         return Ok(gen_insert_output(rows_inserted, insert_cost));
+     }
}
}
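
As a side note, the capping behavior above can be summarized by a small standalone sketch (illustrative only; `copy_rows` and the batch sizes are made up for this example and are not part of the change):

    // Stop consuming batches once an optional row limit is reached.
    // `limit` stands in for `req.limit`; with no LIMIT segment everything is copied,
    // since the old default MAX_INSERT_ROWS cap is removed by this commit.
    fn copy_rows(batches: &[usize], limit: Option<usize>) -> usize {
        let mut rows_inserted = 0;
        for batch in batches {
            rows_inserted += batch;
            if let Some(max_insert_rows) = limit {
                if rows_inserted >= max_insert_rows {
                    return rows_inserted;
                }
            }
        }
        rows_inserted
    }

    fn main() {
        assert_eq!(copy_rows(&[3, 3, 3], Some(4)), 6); // stops after the second batch
        assert_eq!(copy_rows(&[3, 3, 3], None), 9);    // no limit: every batch is copied
    }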

View File

@@ -56,7 +56,14 @@ impl<'a> ParserContext<'a> {
})?;
let req = if self.parser.parse_keyword(Keyword::TO) {
- let (with, connection, location) = self.parse_copy_parameters()?;
+ let (with, connection, location, limit) = self.parse_copy_parameters()?;
+ if limit.is_some() {
+     return error::InvalidSqlSnafu {
+         msg: "limit is not supported",
+     }
+     .fail();
+ }
let argument = CopyDatabaseArgument {
database_name,
with: with.into(),
@@ -68,7 +75,14 @@ impl<'a> ParserContext<'a> {
self.parser
.expect_keyword(Keyword::FROM)
.context(error::SyntaxSnafu)?;
- let (with, connection, location) = self.parse_copy_parameters()?;
+ let (with, connection, location, limit) = self.parse_copy_parameters()?;
+ if limit.is_some() {
+     return error::InvalidSqlSnafu {
+         msg: "limit is not supported",
+     }
+     .fail();
+ }
let argument = CopyDatabaseArgument {
database_name,
with: with.into(),
@@ -91,28 +105,30 @@ impl<'a> ParserContext<'a> {
let table_name = Self::canonicalize_object_name(raw_table_name);
if self.parser.parse_keyword(Keyword::TO) {
- let (with, connection, location) = self.parse_copy_parameters()?;
+ let (with, connection, location, limit) = self.parse_copy_parameters()?;
Ok(CopyTable::To(CopyTableArgument {
table_name,
with: with.into(),
connection: connection.into(),
location,
+ limit,
}))
} else {
self.parser
.expect_keyword(Keyword::FROM)
.context(error::SyntaxSnafu)?;
- let (with, connection, location) = self.parse_copy_parameters()?;
+ let (with, connection, location, limit) = self.parse_copy_parameters()?;
Ok(CopyTable::From(CopyTableArgument {
table_name,
with: with.into(),
connection: connection.into(),
location,
+ limit,
}))
}
}
- fn parse_copy_parameters(&mut self) -> Result<(With, Connection, String)> {
+ fn parse_copy_parameters(&mut self) -> Result<(With, Connection, String, Option<u64>)> {
let location =
self.parser
.parse_literal_string()
@@ -142,7 +158,21 @@ impl<'a> ParserContext<'a> {
.map(parse_option_string)
.collect::<Result<Connection>>()?;
- Ok((with, connection, location))
+ let limit = if self.parser.parse_keyword(Keyword::LIMIT) {
+     Some(
+         self.parser
+             .parse_literal_uint()
+             .with_context(|_| error::UnexpectedSnafu {
+                 sql: self.sql,
+                 expected: "the number of maximum rows",
+                 actual: self.peek_token_as_string(),
+             })?,
+     )
+ } else {
+     None
+ };
+ Ok((with, connection, location, limit))
}
}
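
For a rough sense of what the new parsing accepts, here is a standalone sketch of the same idea using plain string handling (illustrative only, not the sqlparser-based code above; `parse_optional_limit` is a made-up helper):

    // Accept an optional trailing "LIMIT <n>" segment and reject a non-numeric
    // value, mirroring the error case exercised in the tests below.
    fn parse_optional_limit(rest: &str) -> Result<Option<u64>, String> {
        let rest = rest.trim();
        if rest.is_empty() {
            return Ok(None);
        }
        let value = rest
            .strip_prefix("LIMIT")
            .ok_or_else(|| format!("unexpected trailing tokens: {rest}"))?
            .trim();
        value
            .parse::<u64>()
            .map(Some)
            .map_err(|_| format!("expected the maximum number of rows, found: {value}"))
    }

    fn main() {
        assert_eq!(parse_optional_limit(""), Ok(None));
        assert_eq!(parse_optional_limit("LIMIT 2"), Ok(Some(2)));
        assert!(parse_optional_limit("LIMIT hello").is_err());
    }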

View File

@@ -111,6 +111,7 @@ pub struct CopyTableArgument {
pub connection: OptionMap,
/// Copy tbl [To|From] 'location'.
pub location: String,
+ pub limit: Option<u64>,
}
#[cfg(test)]

View File

@@ -228,6 +228,7 @@ pub struct CopyTableRequest {
pub pattern: Option<String>,
pub direction: CopyDirection,
pub timestamp_range: Option<TimestampRange>,
+ pub limit: Option<u64>,
}
#[derive(Debug, Clone, Default)]

View File

@@ -52,6 +52,14 @@ SELECT * FROM demo ORDER BY ts;
| host1 | 66.6 | 1024.0 | 2022-06-15T07:02:37 |
+-------+------+--------+---------------------+
DELETE FROM demo;
Affected Rows: 1
+ COPY DATABASE public FROM '/tmp/demo/export/parquet_range/' LIMIT 2;
+ Error: 2000(InvalidSyntax), Invalid SQL, error: limit is not supported
DROP TABLE demo;
Affected Rows: 0

View File

@@ -20,4 +20,8 @@ COPY DATABASE public FROM '/tmp/demo/export/parquet_range/';
SELECT * FROM demo ORDER BY ts;
DELETE FROM demo;
+ COPY DATABASE public FROM '/tmp/demo/export/parquet_range/' LIMIT 2;
DROP TABLE demo;

View File

@@ -93,15 +93,15 @@ select count(*) from without_limit_rows;
| 4 |
+----------+
- CREATE TABLE with_limit_rows(host string, cpu double, memory double, ts timestamp time index);
+ CREATE TABLE with_limit_rows_segment(host string, cpu double, memory double, ts timestamp time index);
Affected Rows: 0
- Copy with_limit_rows FROM '/tmp/demo/export/parquet_files/' WITH (MAX_INSERT_ROWS = 2);
+ Copy with_limit_rows_segment FROM '/tmp/demo/export/parquet_files/' LIMIT 2;
Affected Rows: 2
- select count(*) from with_limit_rows;
+ select count(*) from with_limit_rows_segment;
+----------+
| COUNT(*) |
@@ -109,6 +109,10 @@ select count(*) from with_limit_rows;
| 2 |
+----------+
+ Copy with_limit_rows_segment FROM '/tmp/demo/export/parquet_files/' LIMIT hello;
+ Error: 2000(InvalidSyntax), Unexpected token while parsing SQL statement: Copy with_limit_rows_segment FROM '/tmp/demo/export/parquet_files/' LIMIT hello;, expected: 'the number of maximum rows', found: ;: sql parser error: Expected literal int, found: hello at Line: 1, Column 75
drop table demo;
Affected Rows: 0
@@ -133,7 +137,7 @@ drop table without_limit_rows;
Affected Rows: 0
- drop table with_limit_rows;
+ drop table with_limit_rows_segment;
Affected Rows: 0

View File

@@ -34,11 +34,13 @@ Copy without_limit_rows FROM '/tmp/demo/export/parquet_files/';
select count(*) from without_limit_rows;
- CREATE TABLE with_limit_rows(host string, cpu double, memory double, ts timestamp time index);
+ CREATE TABLE with_limit_rows_segment(host string, cpu double, memory double, ts timestamp time index);
- Copy with_limit_rows FROM '/tmp/demo/export/parquet_files/' WITH (MAX_INSERT_ROWS = 2);
+ Copy with_limit_rows_segment FROM '/tmp/demo/export/parquet_files/' LIMIT 2;
- select count(*) from with_limit_rows;
+ select count(*) from with_limit_rows_segment;
+ Copy with_limit_rows_segment FROM '/tmp/demo/export/parquet_files/' LIMIT hello;
drop table demo;
@@ -52,4 +54,4 @@ drop table with_pattern;
drop table without_limit_rows;
- drop table with_limit_rows;
+ drop table with_limit_rows_segment;