[PECOBLR-1655] Implemented the functionality of pool_pre_ping #53
Changes from all commits
a6a3564
d2a3339
77138f3
1e47cb2
c3ae658
6936842
0145f62
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -336,6 +336,59 @@ def do_rollback(self, dbapi_connection): | |
| # Databricks SQL Does not support transactions | ||
| pass | ||
|
|
||
| def is_disconnect(self, e, connection, cursor): | ||
| """Determine if an exception indicates the connection was lost. | ||
|
|
||
| This method is called by SQLAlchemy after exceptions occur during query | ||
| execution to determine if the error was due to a lost connection. If this | ||
| returns True, SQLAlchemy will invalidate the connection and create a new | ||
| one for the next operation. | ||
|
|
||
| This method is also used by SQLAlchemy's default do_ping() implementation | ||
| when pool_pre_ping=True. If do_ping() encounters an exception, it calls | ||
| is_disconnect() to classify the error and determine whether to invalidate | ||
| the connection. | ||
|
|
||
| Args: | ||
| e: The exception that was raised | ||
| connection: The connection that raised the exception (may be None) | ||
| cursor: The cursor that raised the exception (may be None) | ||
|
|
||
| Returns: | ||
| True if the error indicates a disconnect, False otherwise | ||
| """ | ||
| from databricks.sql.exc import ( | ||
| Error, | ||
| InterfaceError, | ||
| DatabaseError, | ||
| RequestError, | ||
| ) | ||
|
|
||
| error_msg = str(e).lower() | ||
|
|
||
| # InterfaceError: closed connection/cursor errors from client.py | ||
| # All raised when self.open is False: | ||
| if isinstance(e, InterfaceError): | ||
|
InterfaceError can also be raised for programming errors such as invalid parameters. Is that the expected behavior here?
Collaborator
Author
That's not the case with the current connector. All of its InterfaceErrors are raised when the connection is closed. |
||
| return "closed" in error_msg | ||
|
|
||
| # RequestError (subclass of DatabaseError via OperationalError): | ||
| # transport/network-level errors indicating connection is unusable. | ||
| # Check before DatabaseError since RequestError is a subclass. | ||
| if isinstance(e, RequestError): | ||
| return True | ||
|
|
||
| # DatabaseError: server-side errors indicating session/operation gone | ||
| if isinstance(e, DatabaseError): | ||
| return ("invalid" in error_msg and "handle" in error_msg) or ( | ||
| "unexpectedly closed server side" in error_msg | ||
| ) | ||
|
|
||
| # Base Error class: older connector versions raise Error (not InterfaceError) | ||
| if isinstance(e, Error): | ||
| return "closed connection" in error_msg or "closed cursor" in error_msg | ||
|
Comment on lines +374 to +388
Collaborator
Author
The string checks aren't arbitrary; they match exact error messages from specific code paths in the connector. But I agree the approach is fragile, especially for DatabaseError, where the message comes from the server response. The three cases we need to handle for pool_pre_ping are:
For (3), the connector raises the same DatabaseError for both ERROR_STATUS and INVALID_HANDLE_STATUS, so string matching is the only option today. The better fix would be a dedicated exception class for INVALID_HANDLE_STATUS in the connector; then we can replace all string checks with isinstance checks. I can raise that as a follow-up in databricks-sql-python. Also, the base Error class check is needed for some versions of the driver where InterfaceError was not defined, more specifically
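The dedicated-exception follow-up described in this reply could look roughly like the sketch below. It is self-contained and uses stand-in classes instead of the real `databricks.sql.exc` hierarchy. `InvalidHandleError` and `DISCONNECT_TYPES` are hypothetical names introduced for illustration, not existing connector APIs.

```python
# Stand-in hierarchy for illustration only; the real classes live in
# databricks.sql.exc. InvalidHandleError is a hypothetical addition.
class Error(Exception):
    pass


class DatabaseError(Error):
    pass


class OperationalError(DatabaseError):
    pass


class RequestError(OperationalError):
    pass


class InvalidHandleError(DatabaseError):
    """Hypothetical: raised only for INVALID_HANDLE_STATUS responses."""


# Exception types that always mean the session or transport is gone.
DISCONNECT_TYPES = (RequestError, InvalidHandleError)


def is_disconnect(e):
    """Classify by exception type alone, with no message matching."""
    return isinstance(e, DISCONNECT_TYPES)
```

With a split like this, a plain `DatabaseError("Syntax error in SQL")` is no longer at risk of being misclassified because of its wording. The "unexpectedly closed server side" case would presumably need its own class as well before the message checks could be dropped entirely.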
Collaborator
My question is why this needs to be so complex. pool_pre_ping is just a health check. Simply try to execute a query, and if there is an error, return True so that SQLAlchemy uses a new connection.
Collaborator
Author
It's needed for the is_disconnect calls on mid-query failures, where we shouldn't be closing the connection pool for every failure type. Let's park it for now. As a mitigation for now, I'll override do_ping in the PR, and we can look into the complete resolution later with this PR. |
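A minimal sketch of the `do_ping` override mentioned in this reply, under the assumption that any failure of a trivial query should make the pool discard the connection. This is not the code from the PR: `PingOverrideSketch`, `FakeCursor`, and `FakeConnection` are hypothetical names, and the fake classes stand in for a DBAPI connection so the example runs without a warehouse.

```python
class PingOverrideSketch:
    """Illustrative mixin; in a real dialect this method would live on
    the dialect class and override the default do_ping()."""

    def do_ping(self, dbapi_connection):
        # Run a trivial query; report the connection as dead on any error
        # so that the pool replaces it instead of handing it out.
        try:
            cursor = dbapi_connection.cursor()
            try:
                cursor.execute("SELECT 1")
            finally:
                cursor.close()
        except Exception:
            return False
        return True


# --- Test doubles, for demonstration only ---
class FakeCursor:
    def __init__(self, healthy):
        self.healthy = healthy

    def execute(self, sql):
        if not self.healthy:
            raise RuntimeError("Invalid SessionHandle")

    def close(self):
        pass


class FakeConnection:
    def __init__(self, healthy=True):
        self.healthy = healthy

    def cursor(self):
        return FakeCursor(self.healthy)


dialect = PingOverrideSketch()
alive = dialect.do_ping(FakeConnection(healthy=True))
dead = dialect.do_ping(FakeConnection(healthy=False))
```

This keeps the health check independent of error classification, which is the simplification the reviewer asked for. `is_disconnect()` would still be consulted for failures that happen mid-query.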
||
|
|
||
| return False | ||
|
|
||
| @reflection.cache | ||
| def has_table( | ||
| self, connection, table_name, schema=None, catalog=None, **kwargs | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,151 @@ | ||
| """Tests for DatabricksDialect.is_disconnect() method.""" | ||
| import pytest | ||
| from databricks.sqlalchemy import DatabricksDialect | ||
| from databricks.sql.exc import ( | ||
| Error, | ||
| InterfaceError, | ||
| DatabaseError, | ||
| OperationalError, | ||
| RequestError, | ||
| SessionAlreadyClosedError, | ||
| CursorAlreadyClosedError, | ||
| MaxRetryDurationError, | ||
| NonRecoverableNetworkError, | ||
| UnsafeToRetryError, | ||
| ) | ||
|
|
||
|
|
||
| class TestIsDisconnect: | ||
| @pytest.fixture | ||
| def dialect(self): | ||
| return DatabricksDialect() | ||
|
|
||
| # --- InterfaceError: closed connection/cursor (client.py) --- | ||
|
|
||
| def test_interface_error_closed_connection(self, dialect): | ||
| """All InterfaceError messages with 'closed' are disconnects.""" | ||
| test_cases = [ | ||
| InterfaceError("Cannot create cursor from closed connection"), | ||
| InterfaceError("Cannot get autocommit on closed connection"), | ||
| InterfaceError("Cannot set autocommit on closed connection"), | ||
| InterfaceError("Cannot commit on closed connection"), | ||
| InterfaceError("Cannot rollback on closed connection"), | ||
| InterfaceError("Cannot get transaction isolation on closed connection"), | ||
| InterfaceError("Cannot set transaction isolation on closed connection"), | ||
| InterfaceError("Attempting operation on closed cursor"), | ||
| ] | ||
| for error in test_cases: | ||
| assert dialect.is_disconnect(error, None, None) is True | ||
|
|
||
| def test_interface_error_without_closed_not_disconnect(self, dialect): | ||
| """InterfaceError without 'closed' is not a disconnect.""" | ||
| error = InterfaceError("Some other interface error") | ||
| assert dialect.is_disconnect(error, None, None) is False | ||
|
|
||
| # --- RequestError: transport/network-level errors --- | ||
|
|
||
| def test_request_error_is_disconnect(self, dialect): | ||
| """All RequestError instances are disconnects.""" | ||
| test_cases = [ | ||
| RequestError("HTTP client is closing or has been closed"), | ||
| RequestError("Connection pool not initialized"), | ||
| RequestError("HTTP request failed: max retries exceeded"), | ||
| RequestError("HTTP request error: connection reset"), | ||
| ] | ||
| for error in test_cases: | ||
| assert dialect.is_disconnect(error, None, None) is True | ||
|
|
||
| def test_request_error_subclasses_are_disconnect(self, dialect): | ||
| """RequestError subclasses are all disconnects.""" | ||
| test_cases = [ | ||
| SessionAlreadyClosedError("Session already closed"), | ||
| CursorAlreadyClosedError("Cursor already closed"), | ||
| MaxRetryDurationError("Retry duration exceeded"), | ||
| NonRecoverableNetworkError("HTTP 501"), | ||
| UnsafeToRetryError("Unexpected HTTP error"), | ||
| ] | ||
| for error in test_cases: | ||
| assert dialect.is_disconnect(error, None, None) is True | ||
|
|
||
| # --- DatabaseError: server-side session/operation errors --- | ||
|
|
||
| def test_database_error_with_invalid_handle(self, dialect): | ||
| """DatabaseError with 'invalid handle' is a disconnect.""" | ||
| test_cases = [ | ||
| DatabaseError("Invalid SessionHandle"), | ||
| DatabaseError("[Errno INVALID_HANDLE] Session does not exist"), | ||
| DatabaseError("INVALID HANDLE"), | ||
| DatabaseError("invalid handle"), | ||
| ] | ||
| for error in test_cases: | ||
| assert dialect.is_disconnect(error, None, None) is True | ||
|
|
||
| def test_database_error_unexpectedly_closed_server_side(self, dialect): | ||
| """DatabaseError for operations closed server-side is a disconnect.""" | ||
| test_cases = [ | ||
| DatabaseError("Command abc123 unexpectedly closed server side"), | ||
| DatabaseError("Command None unexpectedly closed server side"), | ||
| ] | ||
| for error in test_cases: | ||
| assert dialect.is_disconnect(error, None, None) is True | ||
|
|
||
| def test_database_error_without_disconnect_indicators(self, dialect): | ||
| """DatabaseError without disconnect indicators is not a disconnect.""" | ||
| test_cases = [ | ||
| DatabaseError("Syntax error in SQL"), | ||
| DatabaseError("Table not found"), | ||
| DatabaseError("Permission denied"), | ||
| DatabaseError("Catalog name is required for get_schemas"), | ||
| DatabaseError("Catalog name is required for get_columns"), | ||
| ] | ||
| for error in test_cases: | ||
| assert dialect.is_disconnect(error, None, None) is False | ||
|
|
||
| # --- OperationalError (non-RequestError) --- | ||
|
|
||
| def test_operational_error_not_disconnect(self, dialect): | ||
| """OperationalError without disconnect indicators is not a disconnect.""" | ||
| test_cases = [ | ||
| OperationalError("Timeout waiting for query"), | ||
| OperationalError("Empty TColumn instance"), | ||
| OperationalError("Unsupported TRowSet instance"), | ||
| ] | ||
| for error in test_cases: | ||
| assert dialect.is_disconnect(error, None, None) is False | ||
|
|
||
| # --- Base Error class: older connector versions (client.py:385) --- | ||
|
|
||
| def test_base_error_closed_connection_is_disconnect(self, dialect): | ||
| """Base Error with 'closed connection/cursor' is a disconnect. | ||
|
|
||
| Older released versions of databricks-sql-connector raise Error | ||
| (not InterfaceError) for closed connection messages. | ||
| """ | ||
| test_cases = [ | ||
| Error("Cannot create cursor from closed connection"), | ||
| Error("Cannot get autocommit on closed connection"), | ||
| Error("Attempting operation on closed cursor"), | ||
| ] | ||
| for error in test_cases: | ||
| assert dialect.is_disconnect(error, None, None) is True | ||
|
|
||
| def test_base_error_without_closed_not_disconnect(self, dialect): | ||
| """Base Error without 'closed connection/cursor' is not a disconnect.""" | ||
| test_cases = [ | ||
| Error("Some other error"), | ||
| Error("Connection timeout"), | ||
| ] | ||
| for error in test_cases: | ||
| assert dialect.is_disconnect(error, None, None) is False | ||
|
|
||
| # --- Other exceptions --- | ||
|
|
||
| def test_other_errors_not_disconnect(self, dialect): | ||
| """Non-connector exception types are not disconnects.""" | ||
| test_cases = [ | ||
| Exception("Some random error"), | ||
| ValueError("Bad value"), | ||
| RuntimeError("Runtime failure"), | ||
| ] | ||
| for error in test_cases: | ||
| assert dialect.is_disconnect(error, None, None) is False |
The pyproject.toml uses 2.0.8; why is this using 2.2.1?