############################################################################## # # Zope Public License (ZPL) Version 1.0 # ------------------------------------- # # Copyright (c) Digital Creations. All rights reserved. # Copyright (c) Nexedi SARL 2004. All rights reserved. # # This license has been certified as Open Source(tm). # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are # met: # # 1. Redistributions in source code must retain the above copyright # notice, this list of conditions, and the following disclaimer. # # 2. Redistributions in binary form must reproduce the above copyright # notice, this list of conditions, and the following disclaimer in # the documentation and/or other materials provided with the # distribution. # # 3. Digital Creations requests that attribution be given to Zope # in any manner possible. Zope includes a "Powered by Zope" # button that is installed by default. While it is not a license # violation to remove this button, it is requested that the # attribution remain. A significant investment has been put # into Zope, and this effort will continue if the Zope community # continues to grow. This is one way to assure that growth. # # 4. All advertising materials and documentation mentioning # features derived from or use of this software must display # the following acknowledgement: # # "This product includes software developed by Digital Creations # for use in the Z Object Publishing Environment # (http://www.zope.org/)." # # In the event that the product being advertised includes an # intact Zope distribution (with copyright and license included) # then this clause is waived. # # 5. Names associated with Zope or Digital Creations must not be used to # endorse or promote products derived from this software without # prior written permission from Digital Creations. # # 6. Modified redistributions of any form whatsoever must retain # the following acknowledgment: # # "This product includes software developed by Digital Creations # for use in the Z Object Publishing Environment # (http://www.zope.org/)." # # Intact (re-)distributions of any official Zope release do not # require an external acknowledgement. # # 7. Modifications are encouraged but must be packaged separately as # patches to official Zope releases. Distributions that do not # clearly separate the patches from the original work must be clearly # labeled as unofficial distributions. Modifications which do not # carry the name Zope may be packaged in any form, as long as they # conform to all of the clauses above. # # # Disclaimer # # THIS SOFTWARE IS PROVIDED BY DIGITAL CREATIONS ``AS IS'' AND ANY # EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL DIGITAL CREATIONS OR ITS # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND # ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT # OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF # SUCH DAMAGE. # # # This software consists of contributions made by Digital Creations and # many individuals on behalf of Digital Creations. Specific # attributions are listed in the accompanying credits file. # ############################################################################## import _mysql import MySQLdb from _mysql_exceptions import OperationalError, NotSupportedError MySQLdb_version_required = (0,9,2) _v = getattr(_mysql, 'version_info', (0,0,0)) if _v < MySQLdb_version_required: raise NotSupportedError, \ "ZMySQLDDA requires at least MySQLdb %s, %s found" % \ (MySQLdb_version_required, _v) from MySQLdb.converters import conversions from MySQLdb.constants import FIELD_TYPE, CR, ER, CLIENT from Shared.DC.ZRDB.TM import TM from DateTime import DateTime from ZODB.POSException import ConflictError from string import strip, split, upper, rfind from thread import get_ident, allocate_lock hosed_connection = ( CR.SERVER_GONE_ERROR, CR.SERVER_LOST ) query_syntax_error = ( ER.BAD_FIELD_ERROR, ) lock_error = ( ER.LOCK_WAIT_TIMEOUT, ER.LOCK_DEADLOCK, ) key_types = { "PRI": "PRIMARY KEY", "MUL": "INDEX", "UNI": "UNIQUE", } field_icons = "bin", "date", "datetime", "float", "int", "text", "time" icon_xlate = { "varchar": "text", "char": "text", "enum": "what", "set": "what", "double": "float", "numeric": "float", "blob": "bin", "mediumblob": "bin", "longblob": "bin", "tinytext": "text", "mediumtext": "text", "longtext": "text", "timestamp": "datetime", "decimal": "float", "smallint": "int", "mediumint": "int", "bigint": "int", } type_xlate = { "double": "float", "numeric": "float", "decimal": "float", "smallint": "int", "mediumint": "int", "bigint": "int", "int": "int", "float": "float", "timestamp": "datetime", "datetime": "datetime", "time": "datetime", } def _mysql_timestamp_converter(s): if len(s) < 14: s = s + "0"*(14-len(s)) parts = map(int, (s[:4],s[4:6],s[6:8], s[8:10],s[10:12],s[12:14])) return DateTime("%04d-%02d-%02d %02d:%02d:%02d" % tuple(parts)) def DateTime_or_None(s): try: return DateTime(s) except: return None def int_or_long(s): try: return int(s) except: return long(s) def ord_or_None(s): if s is not None: return ord(s) class ThreadedDeferredDB: """ An experimental MySQL DA which implements deferred execution of SQL code in order to reduce locks and provide better behaviour with MyISAM non transactional tables """ conv=conversions.copy() conv[FIELD_TYPE.LONG] = int_or_long conv[FIELD_TYPE.DATETIME] = DateTime_or_None conv[FIELD_TYPE.DATE] = DateTime_or_None conv[FIELD_TYPE.DECIMAL] = float conv[FIELD_TYPE.BIT] = ord_or_None del conv[FIELD_TYPE.TIME] def __init__(self,connection): """ Parse the connection string. Initiate a trial connection with the database to check transactionality once instead of once per DeferredDB instance. """ self._connection = connection self._kw_args = self._parse_connection_string(connection) self._db_pool = {} self._db_lock = allocate_lock() connection = MySQLdb.connect(**self._kw_args) transactional = connection.server_capabilities & CLIENT.TRANSACTIONS connection.close() if self._try_transactions == '-': transactional = 0 elif not transactional and self._try_transactions == '+': raise NotSupportedError, "transactions not supported by this server" self._use_TM = self._transactions = transactional if self._mysql_lock: self._use_TM = 1 def _parse_connection_string(self, connection): kwargs = {'conv': self.conv} items = split(connection) self._use_TM = None if not items: return kwargs compress = items[0] if compress == "~": kwargs['compress'] = True items = items[1:] lockreq, items = items[0], items[1:] if lockreq[0] == "*": self._mysql_lock = lockreq[1:] db_host, items = items[0], items[1:] self._use_TM = 1 else: self._mysql_lock = None db_host = lockreq if '@' in db_host: db, host = split(db_host,'@',1) kwargs['db'] = db if host.startswith('['): host, port = split(host[1:], ']', 1) if port.startswith(':'): kwargs['port'] = int(port[1:]) elif ':' in host: host, port = split(host,':',1) kwargs['port'] = int(port) kwargs['host'] = host else: kwargs['db'] = db_host if kwargs['db'] and kwargs['db'][0] in ('+', '-'): self._try_transactions = kwargs['db'][0] kwargs['db'] = kwargs['db'][1:] else: self._try_transactions = None if not kwargs['db']: del kwargs['db'] if not items: return kwargs kwargs['user'], items = items[0], items[1:] if not items: return kwargs kwargs['passwd'], items = items[0], items[1:] if not items: return kwargs kwargs['unix_socket'], items = items[0], items[1:] return kwargs def _pool_set(self, key, value): self._db_lock.acquire() try: self._db_pool[key] = value finally: self._db_lock.release() def _pool_get(self, key): self._db_lock.acquire() try: return self._db_pool.get(key) finally: self._db_lock.release() def _pool_del(self, key): self._db_lock.acquire() try: del self._db_pool[key] finally: self._db_lock.release() def closeConnection(self): ident = get_ident() try: self._pool_del(ident) except KeyError: pass def _access_db(self, method_id, args, kw): """ Generic method to call pooled objects' methods. When the current thread had never issued any call, create a DeferredDB instance. """ ident = get_ident() db = self._pool_get(ident) if db is None: db = DeferredDB(kw_args=self._kw_args, use_TM=self._use_TM, mysql_lock=self._mysql_lock, transactions=self._transactions) self._pool_set(ident, db) return getattr(db, method_id)(*args, **kw) def tables(self, *args, **kw): return self._access_db(method_id='tables', args=args, kw=kw) def columns(self, *args, **kw): return self._access_db(method_id='columns', args=args, kw=kw) def query(self, *args, **kw): return self._access_db(method_id='query', args=args, kw=kw) def string_literal(self, *args, **kw): return self._access_db(method_id='string_literal', args=args, kw=kw) class DeferredDB(TM): """ An experimental MySQL DA which implements deferred execution of SQL code in order to reduce locks and provide better behaviour with MyISAM non transactional tables """ defs={ FIELD_TYPE.CHAR: "i", FIELD_TYPE.DATE: "d", FIELD_TYPE.DATETIME: "d", FIELD_TYPE.DECIMAL: "n", FIELD_TYPE.DOUBLE: "n", FIELD_TYPE.FLOAT: "n", FIELD_TYPE.INT24: "i", FIELD_TYPE.LONG: "i", FIELD_TYPE.LONGLONG: "l", FIELD_TYPE.SHORT: "i", FIELD_TYPE.TIMESTAMP: "d", FIELD_TYPE.TINY: "i", FIELD_TYPE.YEAR: "i", } _p_oid=_p_changed=_registered=None def __init__(self, kw_args, use_TM, mysql_lock, transactions): self._kw_args = kw_args self._mysql_lock = mysql_lock self._use_TM = use_TM self._transactions = transactions self._forceReconnection() self._sql_string_list = [] def __del__(self): self.db.close() def _forceReconnection(self): db = MySQLdb.connect(**self._kw_args) self.db = db def tables(self, rdb=0, _care=('TABLE', 'VIEW')): r=[] a=r.append result = self._query("SHOW TABLES") row = result.fetch_row(1) while row: a({'TABLE_NAME': row[0][0], 'TABLE_TYPE': 'TABLE'}) row = result.fetch_row(1) return r def columns(self, table_name): from string import join try: c = self._query('SHOW COLUMNS FROM %s' % table_name) except: return () r=[] for Field, Type, Null, Key, Default, Extra in c.fetch_row(0): info = {} field_default = Default and "DEFAULT %s"%Default or '' if Default: info['Default'] = Default if '(' in Type: end = rfind(Type,')') short_type, size = split(Type[:end],'(',1) if short_type not in ('set','enum'): if ',' in size: info['Scale'], info['Precision'] = \ map(int, split(size,',',1)) else: info['Scale'] = int(size) else: short_type = Type if short_type in field_icons: info['Icon'] = short_type else: info['Icon'] = icon_xlate.get(short_type, "what") info['Name'] = Field info['Type'] = type_xlate.get(short_type,'string') info['Extra'] = Extra, info['Description'] = join([Type, field_default, Extra or '', key_types.get(Key, Key or ''), Null != 'YES' and 'NOT NULL' or '']), info['Nullable'] = (Null == 'YES') and 1 or 0 if Key: info['Index'] = 1 if Key == 'PRI': info['PrimaryKey'] = 1 info['Unique'] = 1 elif Key == 'UNI': info['Unique'] = 1 r.append(info) return r def _query(self, query, force_reconnect=False): """ Send a to MySQL server. It reconnects automaticaly if needed and the following conditions are met: - It has not just tried to reconnect (ie, this function will not attemp to connect twice per call). - This conection is not transactionnal and has set not MySQL locks, because they are bound to the connection. This check can be overridden by passing force_reconnect with True value. """ try: self.db.query(query) except OperationalError, m: if m[0] in query_syntax_error: raise OperationalError(m[0], '%s: %s' % (m[1], query)) if m[0] in lock_error: raise ConflictError('%s: %s: %s' % (m[0], m[1], query)) if ((not force_reconnect) and \ (self._mysql_lock or self._transactions)) or \ m[0] not in hosed_connection: raise # Hm. maybe the db is hosed. Let's restart it. self._forceReconnection() self.db.query(query) return self.db.store_result() def query(self,query_string, max_rows=1000): self._use_TM and self._register() for qs in filter(None, map(strip,split(query_string, '\0'))): qtype = upper(split(qs, None, 1)[0]) if qtype == "SELECT": raise NotSupportedError, "can not SELECT in deferred connections" self._sql_string_list.append(qs) return (),() def string_literal(self, s): return self.db.string_literal(s) def _begin(self, *ignored): # The Deferred DB instance is sometimes used for several # transactions, so it is required to clear the sql_string_list # each time a transaction starts self._sql_string_list = [] self._transaction_begun = True def _finish(self, *ignored): if not self._transaction_begun: return self._transaction_begun = False # Ping the database to reconnect if connection was lost. self._query("SELECT 1", force_reconnect=True) if self._transactions: self._query("BEGIN") if self._mysql_lock: self._query("SELECT GET_LOCK('%s',0)" % self._mysql_lock) for qs in self._sql_string_list: self._query(qs) if self._mysql_lock: self._query("SELECT RELEASE_LOCK('%s')" % self._mysql_lock) if self._transactions: self._query("COMMIT") def _abort(self, *ignored): self._transaction_begun = False