Initial Commit
This commit is contained in:
@@ -0,0 +1,302 @@
|
||||
# dialects/mysql/mysqlconnector.py
|
||||
# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
|
||||
# <see AUTHORS file>
|
||||
#
|
||||
# This module is part of SQLAlchemy and is released under
|
||||
# the MIT License: https://www.opensource.org/licenses/mit-license.php
|
||||
|
||||
|
||||
r"""
|
||||
.. dialect:: mysql+mysqlconnector
|
||||
:name: MySQL Connector/Python
|
||||
:dbapi: myconnpy
|
||||
:connectstring: mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname>
|
||||
:url: https://pypi.org/project/mysql-connector-python/
|
||||
|
||||
Driver Status
|
||||
-------------
|
||||
|
||||
MySQL Connector/Python is supported as of SQLAlchemy 2.0.39 to the
|
||||
degree which the driver is functional. There are still ongoing issues
|
||||
with features such as server side cursors which remain disabled until
|
||||
upstream issues are repaired.
|
||||
|
||||
.. warning:: The MySQL Connector/Python driver published by Oracle is subject
|
||||
to frequent, major regressions of essential functionality such as being able
|
||||
to correctly persist simple binary strings which indicate it is not well
|
||||
tested. The SQLAlchemy project is not able to maintain this dialect fully as
|
||||
regressions in the driver prevent it from being included in continuous
|
||||
integration.
|
||||
|
||||
.. versionchanged:: 2.0.39
|
||||
|
||||
The MySQL Connector/Python dialect has been updated to support the
|
||||
latest version of this DBAPI. Previously, MySQL Connector/Python
|
||||
was not fully supported. However, support remains limited due to ongoing
|
||||
regressions introduced in this driver.
|
||||
|
||||
Connecting to MariaDB with MySQL Connector/Python
|
||||
--------------------------------------------------
|
||||
|
||||
MySQL Connector/Python may attempt to pass an incompatible collation to the
|
||||
database when connecting to MariaDB. Experimentation has shown that using
|
||||
``?charset=utf8mb4&collation=utfmb4_general_ci`` or similar MariaDB-compatible
|
||||
charset/collation will allow connectivity.
|
||||
|
||||
|
||||
""" # noqa
|
||||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
from typing import Any
|
||||
from typing import cast
|
||||
from typing import Optional
|
||||
from typing import Sequence
|
||||
from typing import Tuple
|
||||
from typing import TYPE_CHECKING
|
||||
from typing import Union
|
||||
|
||||
from .base import MariaDBIdentifierPreparer
|
||||
from .base import MySQLCompiler
|
||||
from .base import MySQLDialect
|
||||
from .base import MySQLExecutionContext
|
||||
from .base import MySQLIdentifierPreparer
|
||||
from .mariadb import MariaDBDialect
|
||||
from .types import BIT
|
||||
from ... import util
|
||||
|
||||
if TYPE_CHECKING:
|
||||
|
||||
from ...engine.base import Connection
|
||||
from ...engine.cursor import CursorResult
|
||||
from ...engine.interfaces import ConnectArgsType
|
||||
from ...engine.interfaces import DBAPIConnection
|
||||
from ...engine.interfaces import DBAPICursor
|
||||
from ...engine.interfaces import DBAPIModule
|
||||
from ...engine.interfaces import IsolationLevel
|
||||
from ...engine.interfaces import PoolProxiedConnection
|
||||
from ...engine.row import Row
|
||||
from ...engine.url import URL
|
||||
from ...sql.elements import BinaryExpression
|
||||
|
||||
|
||||
class MySQLExecutionContext_mysqlconnector(MySQLExecutionContext):
|
||||
def create_server_side_cursor(self) -> DBAPICursor:
|
||||
return self._dbapi_connection.cursor(buffered=False)
|
||||
|
||||
def create_default_cursor(self) -> DBAPICursor:
|
||||
return self._dbapi_connection.cursor(buffered=True)
|
||||
|
||||
|
||||
class MySQLCompiler_mysqlconnector(MySQLCompiler):
|
||||
def visit_mod_binary(
|
||||
self, binary: BinaryExpression[Any], operator: Any, **kw: Any
|
||||
) -> str:
|
||||
return (
|
||||
self.process(binary.left, **kw)
|
||||
+ " % "
|
||||
+ self.process(binary.right, **kw)
|
||||
)
|
||||
|
||||
|
||||
class IdentifierPreparerCommon_mysqlconnector:
|
||||
@property
|
||||
def _double_percents(self) -> bool:
|
||||
return False
|
||||
|
||||
@_double_percents.setter
|
||||
def _double_percents(self, value: Any) -> None:
|
||||
pass
|
||||
|
||||
def _escape_identifier(self, value: str) -> str:
|
||||
value = value.replace(
|
||||
self.escape_quote, # type:ignore[attr-defined]
|
||||
self.escape_to_quote, # type:ignore[attr-defined]
|
||||
)
|
||||
return value
|
||||
|
||||
|
||||
class MySQLIdentifierPreparer_mysqlconnector(
|
||||
IdentifierPreparerCommon_mysqlconnector, MySQLIdentifierPreparer
|
||||
):
|
||||
pass
|
||||
|
||||
|
||||
class MariaDBIdentifierPreparer_mysqlconnector(
|
||||
IdentifierPreparerCommon_mysqlconnector, MariaDBIdentifierPreparer
|
||||
):
|
||||
pass
|
||||
|
||||
|
||||
class _myconnpyBIT(BIT):
|
||||
def result_processor(self, dialect: Any, coltype: Any) -> None:
|
||||
"""MySQL-connector already converts mysql bits, so."""
|
||||
|
||||
return None
|
||||
|
||||
|
||||
class MySQLDialect_mysqlconnector(MySQLDialect):
|
||||
driver = "mysqlconnector"
|
||||
supports_statement_cache = True
|
||||
|
||||
supports_sane_rowcount = True
|
||||
supports_sane_multi_rowcount = True
|
||||
|
||||
supports_native_decimal = True
|
||||
|
||||
supports_native_bit = True
|
||||
|
||||
# not until https://bugs.mysql.com/bug.php?id=117548
|
||||
supports_server_side_cursors = False
|
||||
|
||||
default_paramstyle = "format"
|
||||
statement_compiler = MySQLCompiler_mysqlconnector
|
||||
|
||||
execution_ctx_cls = MySQLExecutionContext_mysqlconnector
|
||||
|
||||
preparer: type[MySQLIdentifierPreparer] = (
|
||||
MySQLIdentifierPreparer_mysqlconnector
|
||||
)
|
||||
|
||||
colspecs = util.update_copy(MySQLDialect.colspecs, {BIT: _myconnpyBIT})
|
||||
|
||||
@classmethod
|
||||
def import_dbapi(cls) -> DBAPIModule:
|
||||
return cast("DBAPIModule", __import__("mysql.connector").connector)
|
||||
|
||||
def do_ping(self, dbapi_connection: DBAPIConnection) -> bool:
|
||||
dbapi_connection.ping(False)
|
||||
return True
|
||||
|
||||
def create_connect_args(self, url: URL) -> ConnectArgsType:
|
||||
opts = url.translate_connect_args(username="user")
|
||||
|
||||
opts.update(url.query)
|
||||
|
||||
util.coerce_kw_type(opts, "allow_local_infile", bool)
|
||||
util.coerce_kw_type(opts, "autocommit", bool)
|
||||
util.coerce_kw_type(opts, "buffered", bool)
|
||||
util.coerce_kw_type(opts, "client_flag", int)
|
||||
util.coerce_kw_type(opts, "compress", bool)
|
||||
util.coerce_kw_type(opts, "connection_timeout", int)
|
||||
util.coerce_kw_type(opts, "connect_timeout", int)
|
||||
util.coerce_kw_type(opts, "consume_results", bool)
|
||||
util.coerce_kw_type(opts, "force_ipv6", bool)
|
||||
util.coerce_kw_type(opts, "get_warnings", bool)
|
||||
util.coerce_kw_type(opts, "pool_reset_session", bool)
|
||||
util.coerce_kw_type(opts, "pool_size", int)
|
||||
util.coerce_kw_type(opts, "raise_on_warnings", bool)
|
||||
util.coerce_kw_type(opts, "raw", bool)
|
||||
util.coerce_kw_type(opts, "ssl_verify_cert", bool)
|
||||
util.coerce_kw_type(opts, "use_pure", bool)
|
||||
util.coerce_kw_type(opts, "use_unicode", bool)
|
||||
|
||||
# note that "buffered" is set to False by default in MySQL/connector
|
||||
# python. If you set it to True, then there is no way to get a server
|
||||
# side cursor because the logic is written to disallow that.
|
||||
|
||||
# leaving this at True until
|
||||
# https://bugs.mysql.com/bug.php?id=117548 can be fixed
|
||||
opts["buffered"] = True
|
||||
|
||||
# FOUND_ROWS must be set in ClientFlag to enable
|
||||
# supports_sane_rowcount.
|
||||
if self.dbapi is not None:
|
||||
try:
|
||||
from mysql.connector import constants # type: ignore
|
||||
|
||||
ClientFlag = constants.ClientFlag
|
||||
|
||||
client_flags = opts.get(
|
||||
"client_flags", ClientFlag.get_default()
|
||||
)
|
||||
client_flags |= ClientFlag.FOUND_ROWS
|
||||
opts["client_flags"] = client_flags
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return [], opts
|
||||
|
||||
@util.memoized_property
|
||||
def _mysqlconnector_version_info(self) -> Optional[Tuple[int, ...]]:
|
||||
if self.dbapi and hasattr(self.dbapi, "__version__"):
|
||||
m = re.match(r"(\d+)\.(\d+)(?:\.(\d+))?", self.dbapi.__version__)
|
||||
if m:
|
||||
return tuple(int(x) for x in m.group(1, 2, 3) if x is not None)
|
||||
return None
|
||||
|
||||
def _detect_charset(self, connection: Connection) -> str:
|
||||
return connection.connection.charset # type: ignore
|
||||
|
||||
def _extract_error_code(self, exception: BaseException) -> int:
|
||||
return exception.errno # type: ignore
|
||||
|
||||
def is_disconnect(
|
||||
self,
|
||||
e: Exception,
|
||||
connection: Optional[Union[PoolProxiedConnection, DBAPIConnection]],
|
||||
cursor: Optional[DBAPICursor],
|
||||
) -> bool:
|
||||
errnos = (2006, 2013, 2014, 2045, 2055, 2048)
|
||||
exceptions = (
|
||||
self.loaded_dbapi.OperationalError, #
|
||||
self.loaded_dbapi.InterfaceError,
|
||||
self.loaded_dbapi.ProgrammingError,
|
||||
)
|
||||
if isinstance(e, exceptions):
|
||||
return (
|
||||
e.errno in errnos
|
||||
or "MySQL Connection not available." in str(e)
|
||||
or "Connection to MySQL is not available" in str(e)
|
||||
)
|
||||
else:
|
||||
return False
|
||||
|
||||
def _compat_fetchall(
|
||||
self,
|
||||
rp: CursorResult[Tuple[Any, ...]],
|
||||
charset: Optional[str] = None,
|
||||
) -> Sequence[Row[Tuple[Any, ...]]]:
|
||||
return rp.fetchall()
|
||||
|
||||
def _compat_fetchone(
|
||||
self,
|
||||
rp: CursorResult[Tuple[Any, ...]],
|
||||
charset: Optional[str] = None,
|
||||
) -> Optional[Row[Tuple[Any, ...]]]:
|
||||
return rp.fetchone()
|
||||
|
||||
def get_isolation_level_values(
|
||||
self, dbapi_conn: DBAPIConnection
|
||||
) -> Sequence[IsolationLevel]:
|
||||
return (
|
||||
"SERIALIZABLE",
|
||||
"READ UNCOMMITTED",
|
||||
"READ COMMITTED",
|
||||
"REPEATABLE READ",
|
||||
"AUTOCOMMIT",
|
||||
)
|
||||
|
||||
def detect_autocommit_setting(self, dbapi_conn: DBAPIConnection) -> bool:
|
||||
return bool(dbapi_conn.autocommit)
|
||||
|
||||
def set_isolation_level(
|
||||
self, dbapi_connection: DBAPIConnection, level: IsolationLevel
|
||||
) -> None:
|
||||
if level == "AUTOCOMMIT":
|
||||
dbapi_connection.autocommit = True
|
||||
else:
|
||||
dbapi_connection.autocommit = False
|
||||
super().set_isolation_level(dbapi_connection, level)
|
||||
|
||||
|
||||
class MariaDBDialect_mysqlconnector(
|
||||
MariaDBDialect, MySQLDialect_mysqlconnector
|
||||
):
|
||||
supports_statement_cache = True
|
||||
_allows_uuid_binds = False
|
||||
preparer = MariaDBIdentifierPreparer_mysqlconnector
|
||||
|
||||
|
||||
dialect = MySQLDialect_mysqlconnector
|
||||
mariadb_dialect = MariaDBDialect_mysqlconnector
|
||||
Reference in New Issue
Block a user