-
Notifications
You must be signed in to change notification settings - Fork 689
Expand file tree
/
Copy pathsql_utils.py
More file actions
554 lines (450 loc) · 19.7 KB
/
sql_utils.py
File metadata and controls
554 lines (450 loc) · 19.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
from __future__ import annotations
import re
from typing import Any, Generator, Literal
import sqlglot
import sqlglot.tokens
import sqlparse
from sqlparse.sql import Function, Identifier, IdentifierList, Token, TokenList
from sqlparse.tokens import DML, Keyword, Punctuation
# Override sqlparse's grouping limits (None presumably means "unlimited") so
# large/deeply nested statements are grouped fully — TODO confirm against
# sqlparse.engine.grouping internals for the pinned sqlparse version.
sqlparse.engine.grouping.MAX_GROUPING_DEPTH = None  # type: ignore[assignment]
sqlparse.engine.grouping.MAX_GROUPING_TOKENS = None  # type: ignore[assignment]
# End-anchored patterns used by last_word() to pull the trailing token out of
# a string; the key names describe which characters may appear in the token.
cleanup_regex: dict[str, re.Pattern] = {
    # This matches only alphanumerics and underscores.
    "alphanum_underscore": re.compile(r"(\w+)$"),
    # This matches everything except spaces, parens, colon, and comma
    "many_punctuations": re.compile(r"([^():,\s]+)$"),
    # This matches everything except spaces, parens, colon, comma, and period
    "most_punctuations": re.compile(r"([^\.():,\s]+)$"),
    # This matches everything except a space.
    "all_punctuations": re.compile(r"([^\s]+)$"),
}


def last_word(
    text: str,
    include: Literal[
        'alphanum_underscore',
        'many_punctuations',
        'most_punctuations',
        'all_punctuations',
    ] = 'alphanum_underscore',
) -> str:
    r"""
    Find the last word in a sentence.

    *include* selects which cleanup_regex pattern defines a "word".

    >>> last_word('abc')
    'abc'
    >>> last_word(' abc')
    'abc'
    >>> last_word('')
    ''
    >>> last_word(' ')
    ''
    >>> last_word('abc ')
    ''
    >>> last_word('abc def')
    'def'
    >>> last_word('abc def ')
    ''
    >>> last_word('abc def;')
    ''
    >>> last_word('bac $def')
    'def'
    >>> last_word('bac $def', include='most_punctuations')
    '$def'
    >>> last_word('bac::def', include='most_punctuations')
    'def'
    """
    # Empty input or trailing whitespace means there is no word in progress.
    if not text or text[-1].isspace():
        return ""
    found = cleanup_regex[include].search(text)
    return found.group(0) if found else ""
# This code is borrowed from sqlparse example script.
# <url>
def is_subselect(parsed: TokenList) -> bool:
    """Return True if *parsed* is a grouped token containing a DML keyword.

    Used to decide whether to recurse into a token while hunting for tables.
    """
    if not parsed.is_group:
        return False
    dml_words = ("SELECT", "INSERT", "UPDATE", "CREATE", "DELETE")
    return any(
        tok.ttype is DML and tok.value.upper() in dml_words
        for tok in parsed.tokens
    )
def get_last_select(parsed: TokenList) -> TokenList:
    """
    Takes a parsed sql statement and returns the last select query where applicable.

    The intended use case is for when giving table suggestions based on columns, where
    we only want to look at the columns from the most recent select. This works for a single
    select query, or one or more sub queries (the useful part).

    The custom logic is necessary because the typical sqlparse logic for things like finding
    sub selects (i.e. is_subselect) only works on complete statements, such as:
      * select c1 from t1;
    However when suggesting tables based on columns, we only have partial select statements, i.e.:
      * select c1
      * select c1 from (select c2)
    So given the above, we must parse them ourselves as they are not viewed as complete statements.

    Returns a TokenList of the last select statement's tokens (empty when no
    SELECT is present).
    """
    last_select_at = None
    # match() is case insensitive, so this catches "select"/"SELECT"/etc.
    for position, token in enumerate(parsed.tokens):
        if token.match(DML, "select"):
            last_select_at = position
    if last_select_at is None:
        return TokenList()
    return TokenList(parsed[last_select_at:])
def extract_from_part(parsed: TokenList, stop_at_punctuation: bool = True) -> Generator[Any, None, None]:
    """Yield the tokens that follow a table-introducing keyword (FROM, JOIN,
    INTO, UPDATE, COPY, TABLE) in *parsed*, recursing into sub-selects.

    :param parsed: sqlparse statement/TokenList to scan.
    :param stop_at_punctuation: when True, stop yielding at the first
        punctuation token after the table prefix (needed so INSERT column
        lists are not mistaken for tables — see extract_tables()).
    """
    # Flag: we have passed a keyword that introduces table names.
    tbl_prefix_seen = False
    for item in parsed.tokens:
        if tbl_prefix_seen:
            if is_subselect(item):
                # Recurse so tables inside a sub-select are yielded too.
                yield from extract_from_part(item, stop_at_punctuation)
            elif stop_at_punctuation and item.ttype is Punctuation:
                return None
            # Multiple JOINs in the same query won't work properly since
            # "ON" is a keyword and will trigger the next elif condition.
            # So instead of stopping the loop when finding an "ON", skip it.
            # eg: 'SELECT * FROM abc JOIN def ON abc.id = def.abc_id JOIN ghi'
            elif item.ttype is Keyword and item.value.upper() == "ON":
                tbl_prefix_seen = False
                continue
            # An incomplete nested select won't be recognized correctly as a
            # sub-select. eg: 'SELECT * FROM (SELECT id FROM user'. This causes
            # the second FROM to trigger this elif condition resulting in a
            # StopIteration. So we need to ignore the keyword if the keyword
            # FROM.
            # Also 'SELECT * FROM abc JOIN def' will trigger this elif
            # condition. So we need to ignore the keyword JOIN and its variants
            # INNER JOIN, FULL OUTER JOIN, etc.
            elif item.ttype is Keyword and (not item.value.upper() == "FROM") and (not item.value.upper().endswith("JOIN")):
                return None
            else:
                yield item
        elif (item.ttype is Keyword or item.ttype is Keyword.DML) and item.value.upper() in (
            "COPY",
            "FROM",
            "INTO",
            "UPDATE",
            "TABLE",
            "JOIN",
        ):
            tbl_prefix_seen = True
        # 'SELECT a, FROM abc' will detect FROM as part of the column list.
        # So this check here is necessary.
        elif isinstance(item, IdentifierList):
            for identifier in item.get_identifiers():
                if identifier.ttype is Keyword and identifier.value.upper() == "FROM":
                    tbl_prefix_seen = True
                    break
def extract_table_identifiers(token_stream: Generator[Any, None, None]) -> Generator[tuple[str | None, str, str], None, None]:
    """yields tuples of (schema_name, table_name, table_alias)"""
    for tok in token_stream:
        if isinstance(tok, IdentifierList):
            for ident in tok.get_identifiers():
                # Sometimes Keywords (such as FROM) are classified as
                # identifiers and lack the get_real_name()/get_parent_name()
                # methods; skip those.
                try:
                    schema = ident.get_parent_name()
                    table = ident.get_real_name()
                except AttributeError:
                    continue
                if table:
                    yield (schema, table, ident.get_alias())
        elif isinstance(tok, Identifier):
            table = tok.get_real_name()
            if table:
                yield (tok.get_parent_name(), table, tok.get_alias())
            else:
                # No real name resolved; fall back to the plain name and
                # reuse it as the alias when no explicit alias exists.
                fallback = tok.get_name()
                yield (None, fallback, tok.get_alias() or fallback)
        elif isinstance(tok, Function):
            yield (None, tok.get_name(), tok.get_name())
# extract_tables is inspired from examples in the sqlparse lib.
def extract_tables(sql: str) -> list[tuple[str | None, str, str]]:
    """Extract the table names from an SQL statement.

    Returns a list of (schema, table, alias) tuples
    """
    statements = sqlparse.parse(sql)
    if not statements:
        return []
    first_statement = statements[0]
    # INSERT statements must stop looking for tables at the sign of first
    # Punctuation. eg: INSERT INTO abc (col1, col2) VALUES (1, 2)
    # abc is the table name, but if we don't stop at the first lparen, then
    # we'll identify abc, col1 and col2 as table names.
    is_insert = first_statement.token_first().value.lower() == "insert"
    token_stream = extract_from_part(first_statement, stop_at_punctuation=is_insert)
    return list(extract_table_identifiers(token_stream))
def extract_columns_from_select(sql: str) -> list[str]:
    """
    Extract the column names from a select SQL statement.

    Returns a list of columns.
    """
    parsed = sqlparse.parse(sql)
    if not parsed:
        return []
    statement = get_last_select(parsed[0])
    # if there is no select, skip checking for columns
    if not statement:
        return []
    columns = []
    # Walk the statement's tokens; once the SELECT keyword is seen, the
    # tokens that follow are the column list. get_real_name() resolves the
    # underlying column name even when an alias is used.
    seen_select = False
    for tok in statement.tokens:
        if tok.ttype is DML and tok.value.upper() == 'SELECT':
            seen_select = True
            continue
        if not seen_select:
            continue
        if isinstance(tok, IdentifierList):
            # multiple columns
            for ident in tok.get_identifiers():
                if isinstance(ident, Identifier):
                    columns.append(ident.get_real_name())
                elif isinstance(ident, Token):
                    columns.append(ident.value)
        elif isinstance(tok, Identifier):
            # single column
            columns.append(tok.get_real_name())
        elif tok.ttype is Keyword:
            # A keyword (e.g. FROM) ends the column list.
            break
        if columns:
            break
    return columns
def extract_tables_from_complete_statements(sql: str) -> list[tuple[str | None, str, str | None]]:
    """Extract the table names from a complete and valid series of SQL
    statements.

    Returns a list of (schema, table, alias) tuples
    """
    # sqlglot chokes entirely on things like "\T" that it doesn't know about,
    # but is much better at extracting table names from complete statements.
    # sqlparse can extract the series of statements, though it also doesn't
    # understand "\T".
    rough_statements = sqlparse.parse(sql)
    if not rough_statements:
        return []
    trees = []
    for rough in rough_statements:
        try:
            trees.append(sqlglot.parse_one(str(rough), read='mysql'))
        except sqlglot.errors.ParseError:
            # Skip statements sqlglot cannot handle.
            continue
    results = []
    for tree in trees:
        for table in tree.find_all(sqlglot.exp.Table):
            # Skip table references that belong to a WITH (CTE) select.
            enclosing = table.parent_select
            if enclosing and enclosing.sql().startswith('WITH'):
                continue
            results.append((
                table.db or None,
                table.name,
                table.alias or None,
            ))
    return results
def find_prev_keyword(sql: str) -> tuple[Token | None, str]:
    """Find the last sql keyword in an SQL statement

    Returns the value of the last keyword, and the text of the query with
    everything after the last keyword stripped
    """
    if not sql.strip():
        return None, ""
    flattened = list(sqlparse.parse(sql)[0].flatten())
    logical_operators = ("AND", "OR", "NOT", "BETWEEN")
    # Scan backwards by index so we already know each token's position in
    # the flattened list (Token objects compare by identity, so an index
    # search would find the same position — this just avoids the re-lookup;
    # note parsed.token_index(t) cannot be used at all, since t may be a
    # child token nested inside a TokenList and token_index would raise).
    for pos in range(len(flattened) - 1, -1, -1):
        tok = flattened[pos]
        is_real_keyword = tok.is_keyword and tok.value.upper() not in logical_operators
        if tok.value == "(" or is_real_keyword:
            # Rebuild the query text up to and including the keyword token,
            # dropping everything after it.
            truncated = "".join(t.value for t in flattened[: pos + 1])
            return tok, truncated
    return None, ""
def query_starts_with(query: str, prefixes: list[str]) -> bool:
    """Check if the query starts with any item from *prefixes*.

    Comparison is case-insensitive and ignores SQL comments.
    """
    lowered_prefixes = {prefix.lower() for prefix in prefixes}
    formatted_sql = sqlparse.format(query.lower(), strip_comments=True)
    # Split before testing: a whitespace-only or comment-only query yields a
    # truthy formatted string but no words, so the old
    # `bool(formatted_sql) and formatted_sql.split()[0]` raised IndexError.
    words = formatted_sql.split()
    return bool(words) and words[0] in lowered_prefixes
def queries_start_with(queries: str, prefixes: list[str]) -> bool:
    """Check if any queries start with any item from *prefixes*."""
    return any(
        query_starts_with(statement, prefixes)
        for statement in sqlparse.split(queries)
        if statement
    )
def query_has_where_clause(query: str) -> bool:
    """Check if the query contains a where-clause."""
    for statement in sqlparse.parse(query):
        for token in statement:
            if isinstance(token, sqlparse.sql.Where):
                return True
    return False
# todo: handle "UPDATE LOW_PRIORITY" and "UPDATE IGNORE"
def query_is_single_table_update(query: str) -> bool:
    """Check if a query is a simple single-table UPDATE.

    Recognizes the shape "UPDATE <table> SET ..." where the table part
    contains no comma (a comma would mean a multi-table update).
    """
    # Normalize for parsing only: drop comments, collapse whitespace.
    normalized = sqlparse.format(query, strip_comments=True)
    normalized = re.sub(r'\s+', ' ', normalized)
    if not normalized:
        return False
    parsed = sqlparse.parse(normalized)
    if not parsed:
        return False
    toks = parsed[0].tokens
    try:
        return bool(
            toks[0].value.lower() == 'update'
            and toks[1].is_whitespace
            and ',' not in toks[2].value  # multiple tables
            and toks[3].is_whitespace
            and toks[4].value.lower() == 'set'
        )
    except IndexError:
        # Fewer tokens than "UPDATE <table> SET" requires.
        return False
def is_destructive(keywords: list[str], queries: str) -> bool:
    """Returns True if any of the queries in *queries* is destructive."""
    for statement in sqlparse.split(queries):
        if not statement:
            continue
        if not query_starts_with(statement, keywords):
            continue
        # subtle: if "UPDATE" is one of our keywords AND the statement starts
        # with "UPDATE", a guarded single-table update with a WHERE clause is
        # considered safe; anything else destructive.
        if query_starts_with(statement, ["update"]):
            safe = query_has_where_clause(statement) and query_is_single_table_update(statement)
            return not safe
        return True
    return False
def is_dropping_database(queries: str, dbname: str | None) -> bool:
    """Determine if the query is dropping a specific database.

    Scans every statement in *queries*; the result reflects the last
    DROP/CREATE DATABASE statement that names *dbname*, so a later
    CREATE DATABASE for the same name overrides an earlier DROP.
    """
    result = False
    if dbname is None:
        return False

    def normalize_db_name(db: str) -> str:
        # Lowercase and strip backtick/double-quote identifier quoting.
        return db.lower().strip('`"')

    dbname = normalize_db_name(dbname)
    for query in sqlparse.parse(queries):
        keywords = [t for t in query.tokens if t.is_keyword]
        if len(keywords) < 2:
            continue
        # Looking for "DROP DATABASE/SCHEMA ..." or "CREATE DATABASE/SCHEMA ...".
        if keywords[0].normalized in ("DROP", "CREATE") and keywords[1].value.lower() in (
            "database",
            "schema",
        ):
            # The first Identifier token in the statement is the database name.
            database_token = next((t for t in query.tokens if isinstance(t, Identifier)), None)
            if database_token is not None and normalize_db_name(database_token.get_name()) == dbname:
                # DROP sets the flag; a matching CREATE resets it.
                result = keywords[0].normalized == "DROP"
    return result
def need_completion_refresh(queries: str) -> bool:
    """Determines if the completion needs a refresh by checking if the sql
    statement is an alter, create, drop or change db.
    """
    refresh_commands = ("alter", "create", "use", "\\r", "\\u", "connect", "drop", "rename")
    for query in sqlparse.split(queries):
        words = query.split()
        # An empty statement (e.g. stray semicolon) has no first word; this
        # was previously papered over by a broad `except Exception` that
        # could also hide real errors.
        if not words:
            continue
        if words[0].lower() in refresh_commands:
            return True
    return False
def need_completion_reset(queries: str) -> bool:
    """Determines if the statement is a database switch such as 'use' or '\\u'.

    When a database is changed the existing completions must be reset before we
    start the completion refresh for the new database.
    """
    for query in sqlparse.split(queries):
        words = query.split()
        # An empty statement has no first word; this was previously hidden
        # by a broad `except Exception` that could also mask real errors.
        if not words:
            continue
        first_word = words[0].lower()
        if first_word in ("use", "\\u"):
            return True
        # \r / connect only switch databases when an argument is given.
        if first_word in ("\\r", "connect") and len(words) > 1:
            return True
    return False
def is_mutating(status_plain: str | None) -> bool:
    """Determines if the statement is mutating based on the status."""
    if not status_plain:
        return False
    mutating_commands = frozenset((
        "insert", "update", "delete", "alter", "create",
        "drop", "replace", "truncate", "load", "rename",
    ))
    leading_word = status_plain.split(None, 1)[0]
    return leading_word.lower() in mutating_commands
def is_select(status_plain: str | None) -> bool:
    """Returns true if the first word in status is 'select'."""
    if not status_plain:
        return False
    leading_word = status_plain.split(None, 1)[0]
    return leading_word.lower() == "select"
def classify_sandbox_statement(text: str) -> tuple[str | None, str | None]:
    """Classify a SQL statement for sandbox mode and extract the new password.

    Returns (statement_type, new_password) where statement_type is one of:
    - 'alter_user' — ALTER USER ... IDENTIFIED BY ...
    - 'set_password' — SET PASSWORD [FOR ...] = ...
    - 'quit' — quit, exit, \\q
    - None — not allowed in sandbox mode
    """
    stripped = text.strip()
    if not stripped:
        return ('quit', None)
    tokens = list(sqlglot.tokenize(stripped, dialect='mysql'))
    if not tokens:
        return ('quit', None)
    tt = sqlglot.tokens.TokenType
    first_type = tokens[0].token_type
    upper_texts = [tok.text.upper() for tok in tokens]
    # Bare "quit" / "exit" commands.
    if len(tokens) == 1 and first_type == tt.VAR and upper_texts[0] in ('QUIT', 'EXIT'):
        return ('quit', None)
    # The \q shortcut tokenizes as a backslash followed by "q".
    if len(tokens) == 2 and first_type == tt.BACKSLASH and upper_texts[1] == 'Q':
        return ('quit', None)
    if len(tokens) >= 2:
        # ALTER USER ... IDENTIFIED BY '<password>'
        if first_type == tt.ALTER and upper_texts[1] == 'USER':
            return ('alter_user', _find_password_after_by(tokens))
        # SET PASSWORD [FOR ...] = '<password>'
        if first_type == tt.SET and upper_texts[1] == 'PASSWORD':
            return ('set_password', _find_password_after_eq(tokens))
    return (None, None)
def _find_password_after_by(tokens: list[sqlglot.tokens.Token]) -> str | None:
    """Find a password literal following a BY token (for ALTER USER ... IDENTIFIED BY 'pw')."""
    tt = sqlglot.tokens.TokenType
    for idx in range(len(tokens) - 1):
        current = tokens[idx]
        if current.token_type == tt.VAR and current.text.upper() == 'BY':
            candidate = tokens[idx + 1]
            if candidate.token_type == tt.STRING:
                return candidate.text
    return None
def _find_password_after_eq(tokens: list[sqlglot.tokens.Token]) -> str | None:
    """Find a password literal following an = token (for SET PASSWORD = 'pw')."""
    tt = sqlglot.tokens.TokenType
    for idx in range(len(tokens) - 1):
        if tokens[idx].token_type == tt.EQ:
            candidate = tokens[idx + 1]
            if candidate.token_type == tt.STRING:
                return candidate.text
    return None
def is_sandbox_allowed(text: str) -> bool:
    """Return True if the command is allowed in expired-password sandbox mode."""
    statement_type, _password = classify_sandbox_statement(text)
    return statement_type is not None
def is_password_change(text: str) -> bool:
    """Return True if the command is a password change statement."""
    statement_type = classify_sandbox_statement(text)[0]
    return statement_type in ('alter_user', 'set_password')
def extract_new_password(text: str) -> str | None:
    """Extract the new password from an ALTER USER or SET PASSWORD statement."""
    _statement_type, password = classify_sandbox_statement(text)
    return password