[python-sybase] Pulling my hair out about a query... (solved?)
Dave Cole
djc at object-craft.com.au
Tue, 29 Jun 2004 11:28:32 +1000
This is a multi-part message in MIME format.
--------------020703060205030404080808
Content-Type: text/plain; charset=us-ascii; format=flowed
Content-Transfer-Encoding: 7bit
Dave Cole wrote:
> Gregory Bond wrote:
>
>> We still have 11.5 (youch!!), 12.0 and 12.5 servers on Solaris we can
>> test
>> against.
>> And if you ever get the bulkcopy implemented again we'd use it live!
>
>
> Ben keeps telling me to reimplement the bulkcopy stuff. If you are happy
> to test it I should be able to do it over the weekend.
Have been sick for the last two days. Getting bored at home so I had a
crack at implementing bulkcopy again...
Install the attached Sybase.py and try something like the following:
import csv
import Sybase
db = Sybase.connect(server, user, passwd, database,
delay_connect=1, auto_commit=1)
db.set_property(Sybase.CS_BULK_LOGIN, Sybase.CS_TRUE)
db.connect()
blk = db.bulkcopy(table)
for row in csv.reader(file("some.csv")):
blk.rowxfer(row)
blk.done()
- Dave
--
http://www.object-craft.com.au
--------------020703060205030404080808
Content-Type: text/x-python;
name="Sybase.py"
Content-Transfer-Encoding: 7bit
Content-Disposition: inline;
filename="Sybase.py"
#
# Copyright 2001 by Object Craft P/L, Melbourne, Australia.
#
# LICENCE - see LICENCE file distributed with this software for details.
#
try:
import DateTime
use_datetime = 1
except ImportError:
try:
import mx.DateTime
DateTime = mx.DateTime
use_datetime = 1
except ImportError:
use_datetime = 0
import sys
import time
import string
import threading
from sybasect import *
from sybasect import __have_freetds__
# Point the debug output at stderr.  set_debug is not defined in this file;
# presumably it comes from the sybasect star-import above.
# NOTE(review): this runs on every import of the module -- confirm that
# debug output should be left switched on outside of development.
set_debug(sys.stderr)
__version__ = '0.36'
# DB-API values
apilevel = '2.0'                        # DB API level supported
threadsafety = 2                        # Threads may share the module
                                        # and connections.
paramstyle = 'named'                    # Named style,
                                        # e.g. '...WHERE name=@name'
# DB-API exceptions
#
# StandardError
# |__Warning
# |__Error
# |__InterfaceError
# |__DatabaseError
# |__DataError
# |__OperationalError
# |__IntegrityError
# |__InternalError
# |__ProgrammingError
# |__NotSupportedError
class Warning(StandardError):
    '''DB-API Warning exception.
    '''


class Error(StandardError):
    '''DB-API Error exception; base class of all other errors below.
    '''
    def append(self, other):
        '''Append the message of exception other to this exception.

        The first argument of other is concatenated onto the first
        argument of self and stored as the only argument of self.
        An exception without arguments contributes nothing, so calling
        this with an empty self or an empty other no longer raises
        IndexError.
        '''
        if not other.args:
            # Nothing to add.
            return
        if self.args:
            self.args = (self.args[0] + other.args[0],)
        else:
            self.args = (other.args[0],)


class InterfaceError(Error):
    '''DB-API InterfaceError exception.
    '''


class DatabaseError(Error):
    '''DB-API DatabaseError exception.
    '''


class DataError(DatabaseError):
    '''DB-API DataError exception.
    '''


class OperationalError(DatabaseError):
    '''DB-API OperationalError exception.
    '''


class IntegrityError(DatabaseError):
    '''DB-API IntegrityError exception.
    '''


class InternalError(DatabaseError):
    '''DB-API InternalError exception.
    '''


class ProgrammingError(DatabaseError):
    '''DB-API ProgrammingError exception.
    '''


class NotSupportedError(DatabaseError):
    '''DB-API NotSupportedError exception.
    '''
class DBAPITypeObject:
    '''Group of type codes which compares equal to each of its members.
    '''
    def __init__(self, *values):
        # Tuple of the type codes belonging to this group.
        self.values = values

    def __cmp__(self, other):
        '''Return 0 when other is one of the grouped values.

        Non-members are ordered by comparing other with the tuple of
        values: 1 when other is smaller, -1 otherwise.
        '''
        members = self.values
        if other not in members:
            if other < members:
                return 1
            return -1
        return 0
# DB-API type objects.  Each one groups the CS_*_TYPE codes that are
# reported in a cursor description for that kind of column.
STRING = DBAPITypeObject(
    CS_LONGCHAR_TYPE,
    CS_VARCHAR_TYPE,
    CS_TEXT_TYPE,
    CS_CHAR_TYPE)
BINARY = DBAPITypeObject(
    CS_IMAGE_TYPE,
    CS_LONGBINARY_TYPE,
    CS_VARBINARY_TYPE,
    CS_BINARY_TYPE)
NUMBER = DBAPITypeObject(
    CS_BIT_TYPE,
    CS_TINYINT_TYPE,
    CS_SMALLINT_TYPE,
    CS_INT_TYPE,
    CS_MONEY_TYPE,
    CS_REAL_TYPE,
    CS_FLOAT_TYPE,
    CS_DECIMAL_TYPE,
    CS_NUMERIC_TYPE)
DATETIME = DBAPITypeObject(
    CS_DATETIME4_TYPE,
    CS_DATETIME_TYPE)
ROWID = DBAPITypeObject(
    CS_DECIMAL_TYPE,
    CS_NUMERIC_TYPE)
def OUTPUT(value):
    '''Return a DataBuf holding value with its status set to CS_RETURN.
    '''
    out_buf = DataBuf(value)
    out_buf.status = CS_RETURN
    return out_buf
def Date(year, month, day):
    '''DB-API Date(): build a value from the text 'year-month-day'.
    '''
    text = '%s-%s-%s' % (year, month, day)
    return datetime(text)
def Time(hour, minute, second):
    '''DB-API Time(): build a value from the text 'hour:minute:second'.
    '''
    text = '%d:%d:%d' % (hour, minute, second)
    return datetime(text)
def Timestamp(year, month, day, hour, minute, second):
    '''DB-API Timestamp(): build a value from the combined date and
    time text 'year-month-day hour:minute:second'.
    '''
    fields = (year, month, day, hour, minute, second)
    return datetime('%s-%s-%s %d:%d:%d' % fields)
def DateFromTicks(ticks):
    '''DB-API DateFromTicks(): Date() for the local time of ticks.
    '''
    year, month, day = time.localtime(ticks)[:3]
    return Date(year, month, day)
def TimeFromTicks(ticks):
    '''DB-API TimeFromTicks(): Time() for the local time of ticks.
    '''
    hour, minute, second = time.localtime(ticks)[3:6]
    return Time(hour, minute, second)
def TimestampFromTicks(ticks):
    '''DB-API TimestampFromTicks(): Timestamp() for the local time of
    ticks.
    '''
    fields = time.localtime(ticks)[:6]
    return Timestamp(*fields)
def Binary(str):
    '''DB-API Binary(): the argument is handed back without conversion.
    '''
    # The parameter name is part of the public signature, so it stays
    # even though it hides the builtin of the same name.
    return str
def _fmt_server(msg):
    '''Format a server message as text, log it via _ctx.debug_msg()
    and return it.

    The first line lists the message fields that have a true value;
    the message text follows on the next line.
    '''
    fields = (('Msg', 'msgnumber'),
              ('Level', 'severity'),
              ('State', 'state'),
              ('Procedure', 'proc'),
              ('Line', 'line'))
    shown = []
    for label, attr in fields:
        field_value = getattr(msg, attr)
        if not field_value:
            continue
        shown.append('%s %s' % (label, field_value))
    header = ', '.join(shown)
    text = '%s\n%s' % (header, msg.text)
    _ctx.debug_msg(text)
    return text
def _fmt_client(msg):
    '''Format a client message as text, log it via _ctx.debug_msg()
    and return it.
    '''
    number = msg.msgnumber
    details = (CS_LAYER(number), CS_ORIGIN(number), msg.msgstring)
    text = 'Layer: %s, Origin: %s\n%s' % details
    _ctx.debug_msg(text)
    return text
def _cslib_cb(ctx, msg):
    '''Callback registered for CS_MESSAGE_CB: raise the message as an
    Error.
    '''
    text = _fmt_client(msg)
    raise Error(text)
def _clientmsg_cb(ctx, conn, msg):
    '''Callback registered for CS_CLIENTMSG_CB: raise the message as a
    DatabaseError.
    '''
    text = _fmt_client(msg)
    raise DatabaseError(text)
def _servermsg_cb(ctx, conn, msg):
    '''Callback registered for CS_SERVERMSG_CB: raise the message as a
    DatabaseError unless its number is 5701 or 5703.
    '''
    # NOTE(review): 5701 and 5703 are presumably informational
    # "context changed" notices -- confirm against the server docs.
    if msg.msgnumber in (5701, 5703):
        return
    raise DatabaseError(_fmt_server(msg))
def _row_bind(cmd, count=1):
    '''Describe and bind every column of the current result.

    Each column is bound for count rows.  Returns the list of bound
    buffers in column order; raises Error naming the failing call.
    '''
    status, num_cols = cmd.ct_res_info(CS_NUMDATA)
    if status != CS_SUCCEED:
        raise Error('ct_res_info')
    bound = []
    # Column numbers passed to ct_describe/ct_bind start at 1.
    for column in range(1, num_cols + 1):
        status, fmt = cmd.ct_describe(column)
        if status != CS_SUCCEED:
            raise Error('ct_describe')
        fmt.count = count
        # Variable length binary columns are bound as plain binary.
        if fmt.datatype == CS_VARBINARY_TYPE:
            fmt.datatype = CS_BINARY_TYPE
        # Cap the size of the buffer bound for a single column.
        if fmt.maxlength > 65536:
            fmt.maxlength = 65536
        status, buf = cmd.ct_bind(column, fmt)
        if status != CS_SUCCEED:
            raise Error('ct_bind')
        bound.append(buf)
    return bound
def _column_value(val):
    '''Convert a fetched column value for return to the caller.

    When a DateTime module was imported, values of type DateTimeType
    become DateTime.DateTime objects; every other value is returned
    unchanged.
    '''
    if not use_datetime or type(val) is not DateTimeType:
        return val
    # NOTE(review): month is incremented, so val.month is presumably
    # zero based -- confirm against sybasect.
    seconds = val.second + val.msecond / 1000.0
    return DateTime.DateTime(val.year, val.month + 1, val.day,
                             val.hour, val.minute, seconds)
def _extract_row(bufs, n):
    '''Return row n of the bound buffers as a tuple, one converted
    value per buffer.
    '''
    return tuple([_column_value(buf[n]) for buf in bufs])
def _fetch_rows(cmd, bufs, rows):
    '''Perform one ct_fetch() and append the fetched rows to rows.

    Returns the number of rows appended: 0 at the end of the data,
    the number of rows read when the buffers are bound for several
    rows, otherwise 1.  Raises Error when the fetch fails.
    '''
    _ctx.debug_msg('_fetch_rows\n')
    status, rows_read = cmd.ct_fetch()
    if status == CS_END_DATA:
        return 0
    if status != CS_SUCCEED \
       and status in (CS_ROW_FAIL, CS_FAIL, CS_CANCELED):
        raise Error('ct_fetch')
    if bufs[0].count <= 1:
        # Buffers hold a single row.
        rows.append(_extract_row(bufs, 0))
        return 1
    for index in range(rows_read):
        rows.append(_extract_row(bufs, index))
    return rows_read
def _bufs_description(bufs):
    '''Build the cursor description for a list of bound buffers.

    Each entry is a 7-tuple of name, datatype, 0, maxlength,
    precision, scale and the CS_CANBENULL bit of the buffer status.
    '''
    return [(buf.name, buf.datatype, 0,
             buf.maxlength, buf.precision, buf.scale,
             buf.status & CS_CANBENULL)
            for buf in bufs]
# Setup global library context
#
# This runs at import time: a failure in any checked step below makes
# the import of this module raise.
status, _ctx = cs_ctx_alloc()
if status != CS_SUCCEED:
    raise InternalError('cs_ctx_alloc failed')
set_global_ctx(_ctx)
if _ctx.ct_init() != CS_SUCCEED:
    raise Error('ct_init')
# Install the message callbacks defined above; each of them reports a
# message by raising an exception.  The status returned by these three
# calls is not checked.
_ctx.cs_config(CS_SET, CS_MESSAGE_CB, _cslib_cb)
_ctx.ct_callback(CS_SET, CS_CLIENTMSG_CB, _clientmsg_cb)
_ctx.ct_callback(CS_SET, CS_SERVERMSG_CB, _servermsg_cb)
# Select CS_SYNC_IO for the CS_NETIO property.
if _ctx.ct_config(CS_SET, CS_NETIO, CS_SYNC_IO) != CS_SUCCEED:
    raise Error('ct_config')
class _FetchNow:
    '''Fetcher which reads all results as soon as the command is sent.

    start() drains every result set of the command into memory; the
    fetch methods then serve rows from the first stored result set and
    nextset() discards it to expose the following one.
    '''
    def __init__(self, owner):
        '''Allocate a command on the connection of owner.
        '''
        self._owner = owner
        self._conn = owner._conn
        self._result_list = []          # one list of row tuples per result
        self._description_list = []     # matching cursor descriptions
        self._rownum = 0                # next row of the first result set
        status, self._cmd = self._conn.ct_cmd_alloc()
        if status != CS_SUCCEED:
            self._raise_error(Error, 'ct_cmd_alloc')

    def start(self, arraysize):
        '''Send the command and read all of its results.

        Returns the description of the first result set, or None when
        the command produced no result set.  Raises Error when a
        Client-Library call reports failure.
        '''
        self._arraysize = arraysize
        status = self._cmd.ct_send()
        if status != CS_SUCCEED:
            self._raise_error(Error, 'ct_send')
        while 1:
            try:
                status, result = self._cmd.ct_results()
            except Exception:
                # Cancel whatever is pending, then let the exception
                # continue with its original traceback.
                self._conn.ct_cancel(CS_CANCEL_ALL)
                raise
            if status == CS_END_RESULTS:
                if self._description_list:
                    return self._description_list[0]
                return None
            elif status != CS_SUCCEED:
                self._raise_error(Error, 'ct_results')
            if result == CS_ROW_RESULT:
                self._row_result()
            elif result == CS_STATUS_RESULT:
                self._status_result()
            elif result == CS_PARAM_RESULT:
                self._param_result()
            elif result == CS_COMPUTE_RESULT:
                self._compute_result()
            elif result not in (CS_CMD_DONE, CS_CMD_SUCCEED):
                self._raise_error(Error, 'ct_results')

    def _is_idle(self):
        '''Always true: no results stay pending after start().
        '''
        return 1

    def _raise_error(self, exc, text):
        '''Cancel all pending results on the connection and raise
        exc(text).
        '''
        self._conn.ct_cancel(CS_CANCEL_ALL)
        raise exc(text)

    def _read_result(self):
        '''Bind the current result, then store its description and all
        of its rows.
        '''
        bufs = _row_bind(self._cmd, self._arraysize)
        self._description_list.append(_bufs_description(bufs))
        logical_result = []
        while _fetch_rows(self._cmd, bufs, logical_result):
            pass
        self._result_list.append(logical_result)

    # Every kind of fetchable result is read the same way; subclasses
    # may override the individual handlers.
    _row_result = _read_result
    _status_result = _read_result
    _param_result = _read_result
    _compute_result = _read_result

    def result_list(self):
        '''Return the list of stored result sets.
        '''
        return self._result_list

    def fetchone(self):
        '''Return the next row of the first result set, or None when
        there is no further row.
        '''
        rownum = self._rownum
        if self._result_list and rownum < len(self._result_list[0]):
            self._rownum = self._rownum + 1
            return self._result_list[0][rownum]

    def fetchmany(self, num):
        '''Return up to num further rows of the first result set.
        '''
        if self._result_list:
            rownum = self._rownum
            rows = self._result_list[0][rownum: rownum + num]
            self._rownum = rownum + num
            return rows
        return []

    def fetchall(self):
        '''Return all remaining rows of the first result set.
        '''
        if self._result_list:
            rows = self._result_list[0][self._rownum:]
            self._rownum = len(self._result_list[0])
            return rows
        return []

    def nextset(self):
        '''Discard the first result set.

        Returns the description of the result set which is now first,
        or None when no result set is left.
        '''
        if self._result_list:
            del self._result_list[0]
            del self._description_list[0]
        self._rownum = 0
        if self._result_list:
            return self._description_list[0]
        return None
class _FetchNowParams(_FetchNow):
    '''_FetchNow variant which copies returned parameter values back
    into the parameter container handed to start().
    '''
    def start(self, arraysize, params):
        '''Remember params (a dictionary or a list), then send the
        command and read its results as _FetchNow.start() does.
        '''
        self._params = params
        return _FetchNow.start(self, arraysize)

    def _param_result(self):
        '''Fetch a parameter result and store the value of every
        buffer flagged CS_RETURN: by name in a dictionary, appended to
        anything else.
        '''
        bufs = _row_bind(self._cmd, 1)
        params = self._params
        by_name = type(params) is type({})
        while 1:
            status, rows_read = self._cmd.ct_fetch()
            if status == CS_END_DATA:
                break
            if status != CS_SUCCEED \
               and status in (CS_ROW_FAIL, CS_FAIL, CS_CANCELED):
                self._raise_error(Error, 'ct_fetch')
            for buf in bufs:
                if not (buf.status & CS_RETURN):
                    continue
                value = _column_value(buf[0])
                if by_name:
                    params[buf.name] = value
                else:
                    params.append(value)
# States of a _FetchLazy fetcher.
_LAZY_IDLE = 0                          # prepared command
_LAZY_FETCHING = 1                      # fetching rows
_LAZY_END_RESULT = 2                    # rows of current result exhausted
_LAZY_CLOSED = 3                        # cursor closed
# State names for the debug message written by _FetchLazy._set_state().
_state_names = { _LAZY_IDLE: '_LAZY_IDLE',
                 _LAZY_FETCHING: '_LAZY_FETCHING',
                 _LAZY_END_RESULT: '_LAZY_END_RESULT',
                 _LAZY_CLOSED: '_LAZY_CLOSED' }
class _FetchLazy:
    '''Fetcher used by Cursor.execute(): rows are fetched from the
    server when the caller asks for them.

    The fetcher moves between the _LAZY_* states.  Every call to
    _lock()/_unlock() is forwarded to the owning connection and
    counted in _lock_count, so that __del__ can give back whatever is
    still held.
    '''
    # NOTE(review): the indentation of this class was lost in the
    # archived message.  The nesting below is reconstructed from the
    # data flow; spots where more than one nesting would be valid
    # Python are marked.

    def __init__(self, owner):
        '''Create the fetcher for connection owner and allocate its
        command.
        '''
        self._owner = owner
        self._conn = owner._conn
        self._cmd = None
        self._lock_count = 0            # locks taken through _lock()
        self._state = _LAZY_IDLE
        self._open()

    def _set_state(self, state):
        '''Log the new state and store it.
        '''
        _ctx.debug_msg('_set_state: %s\n' % _state_names[state])
        self._state = state

    def _lock(self):
        '''Count one more lock and acquire the owner's lock.
        '''
        self._lock_count = self._lock_count + 1
        _ctx.debug_msg('_lock: count -> %d\n' % self._lock_count)
        self._owner._lock()

    def _unlock(self):
        '''Count one lock less and release the owner's lock.
        '''
        self._lock_count = self._lock_count - 1
        _ctx.debug_msg('_unlock: count -> %d\n' % self._lock_count)
        self._owner._unlock()

    def _open(self):
        '''Allocate the command and enter the idle state.

        The lock is taken twice and released once here, so one lock
        stays held when this method returns normally.
        '''
        self._lock()
        try:
            status, self._cmd = self._owner._conn.ct_cmd_alloc()
            if status != CS_SUCCEED:
                self._raise_error(Error, 'ct_cmd_alloc')
            # This second lock is not released by the finally below.
            self._lock()
            self._set_state(_LAZY_IDLE)
        finally:
            self._unlock()

    def _close(self):
        '''Cancel pending results, drop the command and mark the
        fetcher closed.  Does nothing when already closed.
        '''
        if self._state == _LAZY_CLOSED:
            return
        self._lock()
        try:
            if self._state != _LAZY_IDLE:
                status = self._cmd.ct_cancel(CS_CANCEL_ALL)
                if status == CS_SUCCEED:
                    # Extra unlock in addition to the finally below.
                    self._unlock()
            # NOTE(review): placed at try level so that an idle
            # fetcher is closed too -- confirm against the original.
            self._cmd = None
            self._set_state(_LAZY_CLOSED)
        finally:
            self._unlock()

    def __del__(self):
        '''Cancel pending results and release every lock still counted
        in _lock_count.
        '''
        if self._state not in (_LAZY_IDLE, _LAZY_CLOSED):
            if self._owner._is_connected:
                self._owner._conn.ct_cancel(CS_CANCEL_ALL)
        if self._lock_count:
            # By the time we get called the threading module might
            # have killed the thread the lock was created in ---
            # oops.
            # Re-acquire the lock with the current thread recorded as
            # its owner so that the releases below are accepted.
            # NOTE(review): _connlock only exists when the connection
            # was created with locking enabled.
            count, owner = self._owner._connlock._release_save()
            self._owner._connlock._acquire_restore((count, threading.currentThread()))
            while self._lock_count:
                self._unlock()

    def _raise_error(self, exc, text):
        '''Raise exc(text).

        Outside the idle and closed states the pending results are
        cancelled first; when that succeeds the fetcher becomes idle
        and one lock is released.
        '''
        if self._state not in (_LAZY_IDLE, _LAZY_CLOSED):
            if self._owner._conn.ct_cancel(CS_CANCEL_ALL) == CS_SUCCEED:
                self._set_state(_LAZY_IDLE)
                self._unlock()
        raise exc(text)

    def _is_idle(self):
        '''Return true when the fetcher is in the idle state.
        '''
        return self._state == _LAZY_IDLE

    def start(self, arraysize):
        '''Send the command and advance to its first fetchable result.

        Returns what _start_results() returns: the description of that
        result, or None when there is none.
        '''
        self._arraysize = arraysize
        status = self._cmd.ct_send()
        if status != CS_SUCCEED:
            self._raise_error(Error, 'ct_send')
        return self._start_results()

    def fetchone(self):
        '''Return the next row of the current result, or None once its
        rows are exhausted.

        Raises ProgrammingError when the fetcher is idle or closed.
        '''
        self._lock()
        try:
            if self._state == _LAZY_IDLE:
                self._raise_error(ProgrammingError, 'no result set pending')
            if self._state == _LAZY_CLOSED:
                self._raise_error(ProgrammingError, 'cursor is closed')
            if self._state == _LAZY_FETCHING:
                # Refill the local row array when it has been used up.
                if self._array_pos >= len(self._array):
                    try:
                        self._array_pos = 0
                        self._array = []
                        _fetch_rows(self._cmd, self._bufs, self._array)
                    except Error:
                        status = self._cmd.ct_cancel(CS_CANCEL_ALL)
                        if status == CS_SUCCEED:
                            self._set_state(_LAZY_IDLE)
                            self._unlock()
                        raise
                if self._array_pos < len(self._array):
                    row = self._array[self._array_pos]
                    self._array_pos = self._array_pos + 1
                    return row
                # No more rows in this result.
                # NOTE(review): kept inside the FETCHING branch so
                # that a repeated call in the END_RESULT state does
                # not read further results -- confirm.
                self._fetch_rowcount()
                self._set_state(_LAZY_END_RESULT)
        finally:
            self._unlock()

    def fetchmany(self, num):
        '''Return a list of up to num rows of the current result; the
        list is empty once the rows are exhausted.

        Raises ProgrammingError when the fetcher is idle or closed.
        '''
        self._lock()
        try:
            if self._state == _LAZY_IDLE:
                self._raise_error(ProgrammingError, 'no result set pending')
            if self._state == _LAZY_CLOSED:
                self._raise_error(ProgrammingError, 'cursor is closed')
            if self._state == _LAZY_FETCHING:
                if num == -1:
                    num = self._arraysize
                if num != self._bufs[0].count:
                    # The request does not match the bound array size:
                    # collect the rows one at a time.
                    rows = []
                    for i in xrange(num):
                        row = self.fetchone()
                        if not row:
                            break
                        rows.append(row)
                    return rows
                elif self._array and self._array_pos < len(self._array):
                    # Hand out what is left of the local row array.
                    rows = self._array[self._array_pos:]
                else:
                    try:
                        rows = []
                        _fetch_rows(self._cmd, self._bufs, rows)
                    except Error:
                        status = self._cmd.ct_cancel(CS_CANCEL_ALL)
                        if status == CS_SUCCEED:
                            self._set_state(_LAZY_IDLE)
                            self._unlock()
                        raise
                self._array = []
                self._array_pos = 0
                if rows:
                    return rows
                self._fetch_rowcount()
                self._set_state(_LAZY_END_RESULT)
            # NOTE(review): placed at try level so that a list is
            # returned in the END_RESULT state as well -- confirm.
            return []
        finally:
            self._unlock()

    def fetchall(self):
        '''Return all remaining rows of the current result by calling
        fetchone() until it yields no row.
        '''
        self._lock()
        try:
            rows = []
            while 1:
                row = self.fetchone()
                if not row:
                    break
                rows.append(row)
            return rows
        finally:
            self._unlock()

    def nextset(self):
        '''Skip to the next fetchable result.

        Returns [] when idle, otherwise what _start_results() returns.
        Raises ProgrammingError when the fetcher is closed.
        '''
        # This sets rowcount on the fetcher itself.
        self.rowcount = 0
        self._lock()
        try:
            if self._state == _LAZY_CLOSED:
                self._raise_error(ProgrammingError, 'cursor is closed')
            if self._state == _LAZY_IDLE:
                return []
            if self._state == _LAZY_FETCHING:
                # Throw away the unread rows of the current result.
                status = self._cmd.ct_cancel(CS_CANCEL_CURRENT)
                if status != CS_SUCCEED:
                    self._raise_error(Error, 'ct_cancel')
            return self._start_results()
        finally:
            self._unlock()

    def _start_results(self):
        '''Read results until one with fetchable rows turns up.

        Binds that result, enters the fetching state and returns its
        description.  Returns None and becomes idle when the results
        end first.
        '''
        _ctx.debug_msg('_start_results\n')
        self._array = []
        self._array_pos = 0
        while 1:
            status, result = self._cmd.ct_results()
            if status == CS_END_RESULTS:
                # One lock is released here unless the fetcher was in
                # the END_RESULT state.
                if self._state != _LAZY_END_RESULT:
                    self._unlock()
                self._set_state(_LAZY_IDLE)
                return None
            elif status != CS_SUCCEED:
                self._raise_error(Error, 'ct_results')
            if result in (CS_COMPUTE_RESULT, CS_CURSOR_RESULT,
                          CS_PARAM_RESULT, CS_ROW_RESULT, CS_STATUS_RESULT):
                if self._arraysize > 0:
                    bufs = self._bufs = _row_bind(self._cmd, self._arraysize)
                else:
                    bufs = self._bufs = _row_bind(self._cmd)
                self._set_state(_LAZY_FETCHING)
                return _bufs_description(bufs)
            elif result in (CS_CMD_DONE, CS_CMD_SUCCEED):
                status, self.rowcount = self._cmd.ct_res_info(CS_ROW_COUNT)
                if status != CS_SUCCEED:
                    self._raise_error(Error, 'ct_res_info')
            else:
                self._raise_error(Error, 'ct_results')

    def _fetch_rowcount(self):
        '''Read results after the rows ran out, until the row count is
        available or the results end.

        Parameter results met on the way are fetched and discarded.
        At the end of the results the fetcher becomes idle and one
        lock is released.
        '''
        _ctx.debug_msg('_fetch_rowcount\n')
        while 1:
            status, result = self._cmd.ct_results()
            if status == CS_END_RESULTS:
                self._set_state(_LAZY_IDLE)
                self._unlock()
                return
            elif status != CS_SUCCEED:
                self._raise_error(Error, 'ct_results')
            if result == CS_PARAM_RESULT:
                bufs = _row_bind(self._cmd)
                while 1:
                    rows = []
                    _fetch_rows(self._cmd, bufs, rows)
                    if not rows:
                        break
            elif result in (CS_CMD_DONE, CS_CMD_SUCCEED):
                status, self.rowcount = self._cmd.ct_res_info(CS_ROW_COUNT)
                if status != CS_SUCCEED:
                    self._raise_error(Error, 'ct_res_info')
                return
            else:
                self._raise_error(Error, 'ct_results')
class Cursor:
    '''DB-API cursor on a Connection.

    The work is delegated to a fetcher object: _FetchNowParams for
    callproc() and _FetchLazy for execute().
    '''
    def __init__(self, owner):
        '''Implements DB-API Cursor object
        '''
        self.description = None         # DB-API
        self.rowcount = -1              # DB-API
        self.arraysize = 1              # DB-API
        self._owner = owner
        self._fetcher = None
        self._closed = 0

    def _lock(self):
        '''Acquire the lock of the owning connection.
        '''
        self._owner._lock()

    def _unlock(self):
        '''Release the lock of the owning connection.
        '''
        self._owner._unlock()

    def callproc(self, name, params=()):
        '''DB-API Cursor.callproc()

        params is a dictionary of named parameters or a sequence of
        positional ones; a value may be a DataBuf (see OUTPUT()).
        Returns a copy of params as a dictionary or list, to which the
        fetcher adds the values returned by the procedure.
        '''
        _ctx.debug_msg('Cursor.callproc\n')
        if self._closed:
            raise ProgrammingError('cursor is closed')
        self._lock()
        try:
            # Discard any previous results
            self._fetcher = None
            # Prepare to retrieve new results.
            fetcher = self._fetcher = _FetchNowParams(self._owner)
            cmd = fetcher._cmd
            status = cmd.ct_command(CS_RPC_CMD, name)
            if status != CS_SUCCEED:
                fetcher._raise_error(Error, 'ct_command')
            # Send parameters.
            if type(params) is type({}):
                out_params = {}
                for param_name, value in params.items():
                    out_params[param_name] = value
                    if isinstance(value, DataBufType):
                        buf = value
                    else:
                        buf = DataBuf(value)
                    buf.name = param_name
                    status = cmd.ct_param(buf)
                    if status != CS_SUCCEED:
                        fetcher._raise_error(Error, 'ct_param')
            else:
                out_params = []
                for value in params:
                    out_params.append(value)
                    if isinstance(value, DataBufType):
                        buf = value
                    else:
                        buf = DataBuf(value)
                    status = cmd.ct_param(buf)
                    if status != CS_SUCCEED:
                        fetcher._raise_error(Error, 'ct_param')
            # Start retrieving results.
            self.description = fetcher.start(self.arraysize, out_params)
            return out_params
        finally:
            self._unlock()

    def close(self):
        '''DB-API Cursor.close()

        Raises ProgrammingError when the cursor is already closed.
        '''
        if self._closed:
            raise ProgrammingError('cursor is closed')
        self._fetcher = None
        self._closed = 1

    def execute(self, sql, params={}):
        '''DB-API Cursor.execute()

        params maps parameter names to values.  The description of the
        first fetchable result is stored in self.description.
        '''
        _ctx.debug_msg('Cursor.execute\n')
        if self._closed:
            raise ProgrammingError('cursor is closed')
        self._lock()
        try:
            # Discard any previous results
            self._fetcher = None
            # Prepare to retrieve new results.
            fetcher = self._fetcher = _FetchLazy(self._owner)
            cmd = fetcher._cmd
            # Report a rejected command here, as callproc() does,
            # rather than going on to send it.
            status = cmd.ct_command(CS_LANG_CMD, sql)
            if status != CS_SUCCEED:
                fetcher._raise_error(Error, 'ct_command')
            for param_name, value in params.items():
                buf = DataBuf(value)
                buf.name = param_name
                status = cmd.ct_param(buf)
                if status != CS_SUCCEED:
                    fetcher._raise_error(Error, 'ct_param')
            self.description = fetcher.start(self.arraysize)
        finally:
            self._unlock()

    def executemany(self, sql, params_seq=[]):
        '''DB-API Cursor.executemany()

        Executes sql once per entry of params_seq.  Raises
        ProgrammingError when an execution leaves fetchable results.
        '''
        _ctx.debug_msg('Cursor.executemany\n')
        if self._closed:
            raise ProgrammingError('cursor is closed')
        self._lock()
        try:
            for params in params_seq:
                self.execute(sql, params)
                if not self._fetcher._is_idle():
                    self._fetcher._raise_error(ProgrammingError, 'fetchable results on cursor')
        finally:
            self._unlock()

    def fetchone(self):
        '''DB-API Cursor.fetchone()
        '''
        if self._closed:
            raise ProgrammingError('cursor is closed')
        if not self._fetcher:
            raise ProgrammingError('query has not been executed')
        return self._fetcher.fetchone()

    def fetchmany(self, num=-1):
        '''DB-API Cursor.fetchmany()

        A negative num selects self.arraysize rows.
        '''
        if self._closed:
            raise ProgrammingError('cursor is closed')
        if not self._fetcher:
            raise ProgrammingError('query has not been executed')
        if num < 0:
            num = self.arraysize
        return self._fetcher.fetchmany(num)

    def fetchall(self):
        '''DB-API Cursor.fetchall()
        '''
        if self._closed:
            raise ProgrammingError('cursor is closed')
        if not self._fetcher:
            raise ProgrammingError('query has not been executed')
        return self._fetcher.fetchall()

    def nextset(self):
        '''DB-API Cursor.nextset()

        Returns 1 and updates self.description when the fetcher has a
        further result set, otherwise 0.
        '''
        if self._closed:
            raise ProgrammingError('cursor is closed')
        if not self._fetcher:
            raise ProgrammingError('query has not been executed')
        desc = self._fetcher.nextset()
        if desc:
            self.description = desc
            return 1
        return 0

    def setinputsizes(self, *sizes):
        '''DB-API Cursor.setinputsizes()
        '''
        pass

    def setoutputsize(self, size, column=None):
        '''DB-API Cursor.setoutputsize()
        '''
        pass
class Bulkcopy:
    '''Bulk copy session on one table, created by
    Connection.bulkcopy().
    '''
    def __init__(self, owner, table, direction):
        '''Allocate a bulk descriptor on the connection of owner and
        initialise it for table and direction.

        Raises InternalError when blk_alloc or blk_init fails.
        '''
        self._owner = owner
        conn = owner._conn
        self._conn = conn
        self._table = table
        self._direction = direction
        status, blk = conn.blk_alloc()
        if status != CS_SUCCEED:
            raise InternalError('blk_alloc')
        self._blk = blk
        if blk.blk_init(direction, table) != CS_SUCCEED:
            raise InternalError('blk_init')

    def rowxfer(self, data):
        '''Transfer one row; data is a list or tuple with one value
        per column.

        Raises ProgrammingError for any other type of data and
        InternalError when blk_bind or blk_rowxfer fails.
        '''
        if not (type(data) in (type([]), type(()))):
            raise ProgrammingError('list or tuple expected')
        blk = self._blk
        # The buffers are collected here; presumably this keeps them
        # referenced until the row has been transferred.
        held = []
        column = 0
        for value in data:
            # Columns are numbered from 1.
            column = column + 1
            buf = DataBuf(value)
            held.append(buf)
            if blk.blk_bind(column, buf) != CS_SUCCEED:
                raise InternalError('blk_bind')
        if blk.blk_rowxfer() != CS_SUCCEED:
            raise InternalError('blk_rowxfer')

    def _blk_done(self, option):
        '''Call blk_done(option) and return the row count it reports.
        '''
        status, num_rows = self._blk.blk_done(option)
        if status != CS_SUCCEED:
            raise InternalError('blk_done')
        return num_rows

    def batch(self):
        '''Call blk_done with CS_BLK_BATCH and return its row count.
        '''
        return self._blk_done(CS_BLK_BATCH)

    def done(self):
        '''Call blk_done with CS_BLK_ALL and return its row count.
        '''
        return self._blk_done(CS_BLK_ALL)
class Connection:
    '''DB-API connection to a server.
    '''
    def __init__(self, dsn, user, passwd, database=None,
                 strip=0, auto_commit=0, delay_connect=0, locking=1):
        '''DB-API Sybase.Connect()

        Unless delay_connect is true the connection is opened straight
        away.  With locking true, calls on the connection are
        serialised through a re-entrant lock.
        '''
        self._conn = self._cmd = None
        self.dsn = dsn
        self.user = user
        self.passwd = passwd
        self.database = database
        self.auto_commit = auto_commit
        self._do_locking = locking
        self._is_connected = 0
        self.arraysize = 32
        if locking:
            self._connlock = threading.RLock()
        # Do not lock in sybasect - we take care of locking in Python.
        status, conn = _ctx.ct_con_alloc(0)
        if status != CS_SUCCEED:
            raise Error('ct_con_alloc')
        self._conn = conn
        conn.strip = strip
        for prop, value in ((CS_USERNAME, user), (CS_PASSWORD, passwd)):
            if conn.ct_con_props(CS_SET, prop, value) != CS_SUCCEED:
                self._raise_error(Error, 'ct_con_props')
        if not delay_connect:
            self.connect()

    def _lock(self):
        '''Acquire the connection lock when locking is enabled.
        '''
        if not self._do_locking:
            return
        self._connlock.acquire()

    def _unlock(self):
        '''Release the connection lock when locking is enabled.
        '''
        if not self._do_locking:
            return
        self._connlock.release()

    def _raise_error(self, exc, text):
        '''Raise exc(text), cancelling pending results first when the
        connection is open.
        '''
        if self._is_connected:
            self._conn.ct_cancel(CS_CANCEL_ALL)
        raise exc(text)

    def connect(self):
        '''Open the connection to self.dsn.

        Chained transactions are switched on unless auto_commit is
        set; when a database was given a 'use' statement selects it.
        '''
        conn = self._conn
        self._lock()
        try:
            if conn.ct_connect(self.dsn) != CS_SUCCEED:
                self._raise_error(Error, 'ct_connect')
            self._is_connected = 1
            chained = not self.auto_commit
            if conn.ct_options(CS_SET, CS_OPT_CHAINXACTS,
                               chained) != CS_SUCCEED:
                self._raise_error(Error, 'ct_options')
        finally:
            self._unlock()
        if self.database:
            self.execute('use %s' % self.database)
        self._dyn_num = 0

    def get_property(self, prop):
        '''Return the value of connection property prop.
        '''
        conn = self._conn
        self._lock()
        try:
            status, value = conn.ct_con_props(CS_GET, prop)
            if status != CS_SUCCEED:
                self._raise_error(Error, 'ct_con_props')
        finally:
            self._unlock()
        return value

    def set_property(self, prop, value):
        '''Set connection property prop to value.
        '''
        conn = self._conn
        self._lock()
        try:
            if conn.ct_con_props(CS_SET, prop, value) != CS_SUCCEED:
                self._raise_error(Error, 'ct_con_props')
        finally:
            self._unlock()

    def __del__(self):
        '''Close the connection, ignoring every failure.
        '''
        try:
            self.close()
        except:
            pass

    def close(self):
        '''DBI-API Connection.close()

        Raises ProgrammingError when the connection is not open.
        '''
        conn = self._conn
        self._lock()
        try:
            status, result = conn.ct_con_props(CS_GET, CS_CON_STATUS)
            if status != CS_SUCCEED:
                self._raise_error(Error, 'ct_con_props')
            connected = result & CS_CONSTAT_CONNECTED
            if not connected:
                self._raise_error(ProgrammingError, 'Connection is already closed')
            if self._cmd:
                self._cmd = None
            if conn.ct_close(CS_FORCE_CLOSE) != CS_SUCCEED:
                self._raise_error(Error, 'ct_close')
            self._is_connected = 0
        finally:
            self._unlock()

    def begin(self, name=None):
        '''Not in DB-API, but useful for Sybase
        '''
        if not name:
            self.execute('begin transaction')
            return
        self.execute('begin transaction %s' % name)

    def commit(self, name=None):
        '''DB-API Connection.commit()
        '''
        if not name:
            self.execute('commit transaction')
            return
        self.execute('commit transaction %s' % name)

    def rollback(self, name=None):
        '''DB-API Connection.rollback()
        '''
        if not name:
            self.execute('rollback transaction')
            return
        self.execute('rollback transaction %s' % name)

    def cursor(self):
        '''DB-API Connection.cursor()
        '''
        return Cursor(self)

    def bulkcopy(self, table, copy_in=1):
        '''Return a Bulkcopy for table; the direction is CS_BLK_IN
        when copy_in is true, otherwise CS_BLK_OUT.
        '''
        direction = CS_BLK_OUT
        if copy_in:
            direction = CS_BLK_IN
        return Bulkcopy(self, table, direction)

    def execute(self, sql):
        '''Backwards compatibility

        Runs sql at once and returns the list of result sets, each a
        list of row tuples.
        '''
        self._lock()
        try:
            fetcher = _FetchNow(self)
            if fetcher._cmd.ct_command(CS_LANG_CMD, sql) != CS_SUCCEED:
                self._raise_error(Error, 'ct_command')
            fetcher.start(self.arraysize)
            return fetcher.result_list()
        finally:
            self._unlock()
def connect(dsn, user, passwd, database=None,
            strip=0, auto_commit=0, delay_connect=0, locking=1):
    '''DB-API connect(): create a Connection from the given arguments.
    '''
    return Connection(dsn, user, passwd,
                      database=database,
                      strip=strip,
                      auto_commit=auto_commit,
                      delay_connect=delay_connect,
                      locking=locking)
--------------020703060205030404080808--