| 1 |
############################################################################## |
|---|
| 2 |
# |
|---|
| 3 |
# Copyright (c) 2004 Zope Corporation and Contributors. All Rights Reserved. |
|---|
| 4 |
# |
|---|
| 5 |
# This software is subject to the provisions of the Zope Public License, |
|---|
| 6 |
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution. |
|---|
| 7 |
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED |
|---|
| 8 |
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED |
|---|
| 9 |
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS |
|---|
| 10 |
# FOR A PARTICULAR PURPOSE. |
|---|
| 11 |
# |
|---|
| 12 |
############################################################################## |
|---|
| 13 |
"""Asynchronous call manager |
|---|
| 14 |
|
|---|
| 15 |
A place to hang asynchronous calls and get their results |
|---|
| 16 |
|
|---|
| 17 |
$Id$ |
|---|
| 18 |
""" |
|---|
| 19 |
|
|---|
| 20 |
from types import NoneType |
|---|
| 21 |
import datetime, random, sys, os, logging, sets, traceback |
|---|
| 22 |
|
|---|
| 23 |
from twisted.internet import defer |
|---|
| 24 |
from twisted.python import failure |
|---|
| 25 |
|
|---|
| 26 |
from Persistence import Persistent |
|---|
| 27 |
from ZODB.POSException import ConflictError |
|---|
| 28 |
from ZEO.Exceptions import ClientDisconnected |
|---|
| 29 |
from Globals import InitializeClass, Persistent |
|---|
| 30 |
from Acquisition import aq_base, aq_inner, aq_parent |
|---|
| 31 |
from AccessControl.Owned import ownerInfo |
|---|
| 32 |
from AccessControl import ClassSecurityInfo, Unauthorized, \ |
|---|
| 33 |
getSecurityManager |
|---|
| 34 |
from AccessControl.SecurityManagement import newSecurityManager, \ |
|---|
| 35 |
setSecurityManager |
|---|
| 36 |
from Persistence import PersistentMapping |
|---|
| 37 |
from OFS.SimpleItem import SimpleItem |
|---|
| 38 |
from OFS.PropertyManager import PropertyManager |
|---|
| 39 |
from BTrees import OOBTree |
|---|
| 40 |
from Products.PageTemplates.PageTemplateFile import PageTemplateFile |
|---|
| 41 |
from Products.PageTemplates.Expressions import getEngine, SecureModuleImporter |
|---|
| 42 |
from Products.Sessions.BrowserIdManager import BROWSERID_MANAGER_NAME |
|---|
| 43 |
from AccessControl.SecurityInfo import allow_class |
|---|
| 44 |
|
|---|
| 45 |
from nuxeo.persistentqueue.persistentqueue import PersistentQueue |
|---|
| 46 |
|
|---|
| 47 |
import permissions, bforests, interfaces |
|---|
| 48 |
|
|---|
| 49 |
# Raw resolution states stored on a deferred, and the public names that
# getState / getRawState report for them.
UNCALLED, CALLED, FAILURE = 0, 1, -1
state_name_map = {
    UNCALLED: None,
    CALLED: 'success',
    FAILURE: 'failure',
}
|---|
| 53 |
|
|---|
| 54 |
# try to make failure.Failure available to restricted python
# (registered once, at import time)
allow_class(failure.Failure) # in particular, this should allow access to
# 'getErrorMessage', 'getBriefTraceback', 'getTraceback', and 'trap'.
|---|
| 57 |
|
|---|
| 58 |
def iterable(value):
    """Return True when ``iter(value)`` succeeds, False on TypeError."""
    try:
        iter(value)
    except TypeError:
        return False
    return True
|---|
| 65 |
|
|---|
| 66 |
def countable(value):
    """Return True when ``len(value)`` succeeds, False on TypeError."""
    try:
        len(value)
    except TypeError:
        return False
    return True
|---|
| 73 |
|
|---|
| 74 |
class Reference:
    # Lightweight stand-in for an object that can be found again by its
    # physical path.  Only the path and the original repr are kept, not the
    # object itself.

    def __init__(self, obj):
        # Objects that cannot report a physical path get ``_path = None``;
        # such a reference can never be dereferenced.
        try:
            self._path = obj.getPhysicalPath()
        except (AttributeError, KeyError):
            self._path = None
        self._repr = repr(obj)

    def dereference(self, context):
        # Returns the object found by traversing the stored path from
        # ``context``, or None when it cannot be found.
        #
        # Without a stored path there is nothing to traverse.  Handing an
        # empty path to restrictedTraverse would hand back ``context``
        # itself rather than "not found", so answer None explicitly.
        if self._path is None:
            return None
        return context.restrictedTraverse(self._path, None)

    def __repr__(self):
        # repr of the original object, captured at construction time
        return self._repr
|---|
| 88 |
# make Reference usable from restricted python as well
allow_class(Reference)
|---|
| 89 |
|
|---|
| 90 |
class StandIn:
    # Inert placeholder for an arbitrary object: keeps a sanitized copy of
    # the object's __dict__ (when it has one), its class, optional sanitized
    # contents, and its repr.

    # class-level defaults, so the attributes exist even when never assigned
    data = klass = contents = None

    def __init__(self, obj, depth, contents=None):
        attributes = getattr(obj, '__dict__', None)
        if attributes is not None:
            self.data = sanitize(attributes, depth)
        self.klass = obj.__class__
        self.contents = contents
        self._repr = repr(obj)

    def __getattr__(self, name):
        # Only reached for names not found normally: serve them from the
        # sanitized attribute mapping, if there is a usable one.
        data = self.data
        if data is None or data is MAX_DEPTH_MARKER:
            raise AttributeError(name)
        try:
            return data[name]
        except KeyError:
            raise AttributeError(name)

    def __repr__(self):
        return self._repr
|---|
| 112 |
# make StandIn usable from restricted python as well
allow_class(StandIn)
|---|
| 113 |
|
|---|
| 114 |
# default nesting limit used by sanitize()
MAX_DEPTH = 4


class MAX_DEPTH_MARKER:
    # singleton: the class object itself is what sanitize() returns in place
    # of anything nested deeper than the limit
    pass
|---|
| 117 |
|
|---|
| 118 |
def sanitize(value, depth=0, max_depth=MAX_DEPTH):
    """Return a copy of ``value`` with live objects replaced by placeholders.

    The value is walked recursively:

    * acquisition-wrapped or persistent objects become a ``Reference`` when
      they have an acquisition parent and a ``getPhysicalPath``, otherwise a
      ``StandIn``;
    * any other object with a ``__dict__`` becomes a ``StandIn``;
    * lists, tuples, ``sets.Set`` instances and dicts are rebuilt with
      sanitized members (falling back to a ``StandIn`` carrying the
      sanitized contents when the type cannot be rebuilt from them);
    * other non-string iterables become a ``StandIn`` (with sanitized
      contents only when they also support ``len``);
    * everything else is returned unchanged.

    ``depth`` is the current nesting level.  Once it reaches ``max_depth``
    the ``MAX_DEPTH_MARKER`` class is returned instead of the value.

    NOTE: ``StandIn`` sanitizes an object's ``__dict__`` through its own
    call to ``sanitize``, which always applies the default limit; a custom
    ``max_depth`` therefore only governs container recursion done here.
    """
    if depth >= max_depth:
        return MAX_DEPTH_MARKER
    depth += 1
    if aq_base(value) is not value or isinstance(value, Persistent):
        if (aq_parent(value) is not None and
            getattr(aq_base(value), 'getPhysicalPath', None) is not None):
            value = Reference(value)
        else:
            value = StandIn(value, depth) # XXX could theoretically do a better
            # job for BTrees, but don't think I really want to make sending
            # those across the wire all that easy
    elif getattr(value, '__dict__', None) is not None:
        value = StandIn(value, depth) # XXX not ideal for subclasses of built
        # in types, but oh well
    elif isinstance(value, (list, tuple, sets.Set)):
        val_type = type(value)
        # pass max_depth along so a caller-supplied limit is honoured at
        # every level, not just the first
        contents = [sanitize(item, depth, max_depth) for item in value]
        if val_type is list:
            value = contents
        else:
            try:
                value = val_type(contents)
            except TypeError:
                # subclass whose constructor does not take one iterable
                value = StandIn(value, depth, contents)
    elif isinstance(value, dict):
        contents = [
            (sanitize(k, depth, max_depth), sanitize(v, depth, max_depth))
            for k, v in value.items()]
        val_type = type(value)
        try:
            value = val_type(contents)
        except TypeError:
            value = StandIn(value, depth, contents)
    elif not isinstance(value, basestring) and iterable(value):
        contents = None
        if countable(value):
            # only walk iterables that report a length
            contents = [sanitize(item, depth, max_depth) for item in value]
        value = StandIn(value, depth, contents)
    return value
|---|
| 157 |
|
|---|
| 158 |
def cleanFailure(failure):
    """Strip live object references from a Twisted failure, in place.

    Used instead of failure.cleanFailure for Zope-based failures, rather
    than monkey patching Twisted, to stay a good Twisted citizen for other
    Twisted services (particularly in the client).

    Every local and global recorded in ``failure.frames`` (and in
    ``failure.stack`` when it is not None) is replaced by its repr,
    ``tb`` is dropped and ``pickled`` is set to 1.  The same failure object
    is returned with its ``__dict__`` replaced.
    """
    c = failure.__dict__.copy()

    def safe_repr(obj): # this is needed because Zope effectively raises some
        # exceptions in __repr__ (I found one in Shared/DC/Scripts/Bindings.py,
        # UnauthorizedBinding.__getattr__). :-(
        try:
            return repr(obj)
        except:
            # deliberately broad: whatever __repr__ raised, record a
            # one-line description of it in place of the repr
            return traceback.format_exception_only(*sys.exc_info()[:2])[0]

    def clean(frame_list):
        # entries are (v[0], v[1], v[2], locals, globals), where the last
        # two are sequences of (name, value) pairs
        return [
            [
                v[0], v[1], v[2],
                [(j[0], safe_repr(j[1])) for j in v[3]],
                [(j[0], safe_repr(j[1])) for j in v[4]]
            ] for v in frame_list
        ]

    c['frames'] = clean(failure.frames)

    c['tb'] = None

    if failure.stack is not None:
        # the stack holds the same kind of objects as the frames, so it
        # needs the same protection against a raising __repr__
        c['stack'] = clean(failure.stack)

    c['pickled'] = 1
    failure.__dict__ = c
    return failure
|---|
| 195 |
|
|---|
| 196 |
# Expression class copied from CMFCore/Expression.py |
|---|
| 197 |
class Expression (Persistent):
    # Persistent wrapper around one expression string.  The compiled form
    # lives in a ``_v_`` attribute and is rebuilt from ``text`` on demand.
    text = ''
    _v_compiled = None

    security = ClassSecurityInfo()

    def __init__(self, text):
        self.text = text
        self._v_compiled = getEngine().compile(text)

    def __call__(self, econtext):
        # Evaluate against ``econtext``; an exception *instance* handed back
        # as the result is raised rather than returned.
        program = self._v_compiled
        if program is None:
            # no compiled form on this instance yet: build and remember it
            program = getEngine().compile(self.text)
            self._v_compiled = program
        outcome = program(econtext)
        if not isinstance(outcome, Exception):
            return outcome
        raise outcome
|---|
| 215 |
|
|---|
| 216 |
class Deferred(SimpleItem):
    # Record of one asynchronous call: the call signature
    # (plugin, args, kwargs), the resolution state, the current and the
    # original result/failure, and the expression callbacks/errbacks to run
    # once the call is resolved.

    result = failure = original_result = original_failure = None
    key = local_key = resolution_date = None
    timeout = 60 * 60 * 24 # one day; zasync plugins generally constrain
    # timeouts more strictly


    # keep webdav interface off deferreds and tool
    __implements__ = (interfaces.IZopeDeferred,)

    security = ClassSecurityInfo()

    def __init__(self, plugin, args, kwargs):
        self.creation_date = datetime.datetime.now()
        self.__signature = (plugin, args, kwargs)
        self.__callbacks = ()
        self.raw_state = UNCALLED # raw_state is not reliable.
        # Use getState.

    security.declareProtected(permissions.View, 'getSignature')
    def getSignature(self):
        # the (plugin, args, kwargs) triple given at construction
        return self.__signature

    def _isOverdue(self):
        # True once the timeout has been exceeded by more than three poll
        # intervals of the containing tool.  Shared by getState and getValue
        # so both apply exactly the same rule.
        return self.remainingSeconds() < -(self.aq_parent.poll_interval*3)

    security.declareProtected(permissions.View, 'getState')
    def getState(self):
        "returns 'success', 'failure', or None"
        state = self.raw_state
        if state == UNCALLED and self._isOverdue():
            # never resolved and well past its timeout: report as failed
            state = FAILURE
        return state_name_map[state]

    security.declareProtected(permissions.View, 'getRawState')
    def getRawState(self):
        "returns 'success', 'failure', or None"
        # same as getState, but without the timeout adjustment
        return state_name_map[self.raw_state]

    security.declareProtected(permissions.View, 'getValue')
    def getValue(self):
        # Current result (success) or failure object (failure).  A pending
        # deferred raises RuntimeError, unless it is overdue, in which case
        # a fresh TimeoutError failure is returned (not stored).
        state = self.raw_state
        if state == UNCALLED:
            if self._isOverdue():
                return failure.Failure(defer.TimeoutError('Timed out.'))
            raise RuntimeError("Deferred still pending")
        if state == CALLED:
            return self.result
        return self.failure

    security.declareProtected(permissions.View, 'getRawValue')
    def getRawValue(self):
        # like getValue, but a pending deferred always raises, overdue or not
        state = self.raw_state
        if state == UNCALLED:
            raise RuntimeError("Deferred still pending")
        if state == CALLED:
            return self.result
        return self.failure

    security.declareProtected(permissions.View, 'getErrorMessage')
    def getErrorMessage(self):
        # Error message of the current failure; raises RuntimeError when
        # the deferred is not in the failure state.
        state = self.getState()
        if state != 'failure':
            raise RuntimeError("No failure state.", state or 'pending')
        value = self.getValue()
        message = None
        if value:
            message = value.getErrorMessage()
        # also covers a missing failure object and an empty message
        return message or 'Unknown error.'

    def __repr__(self):
        classname = type(self).__name__
        if aq_parent(self) is not None:
            location = " in %s" % '/'.join(self.getPhysicalPath())
        else:
            location = ""
        return "<%s (key %r)%s at %s>" % (
            classname, self.key, location, id(aq_base(self)))

    def __call_all(self):
        # Run every pending (callback, errback) pair against the current
        # state, as the deferred's owner, and return the final value.
        callbacks = self.__callbacks
        assert self.raw_state != UNCALLED
        # explicit branch instead of "and/or", which would silently pick
        # self.result whenever the failure object happened to be false
        if self.raw_state == FAILURE:
            res = self.failure
        else:
            res = self.result
        self.resolution_date = datetime.datetime.now()
        if not callbacks:
            return res
        user = self.getWrappedOwner()
        tool = aq_parent(self)
        root = self.getPhysicalRoot()
        results = {}
        old_sm = getSecurityManager()
        newSecurityManager(None, user)
        # Everything after the switch of security manager sits inside the
        # try, so the previous manager is restored even when building the
        # expression context fails.
        try:
            context_dict = {
                'nothing': None, # provided automatically but here for clarity
                'user': user,
                'userhome': aq_parent(aq_inner(aq_parent(aq_inner(user)))),
                'tool': tool,
                'root': root,
                'deferred': self,
                'here': self,
                'modules': SecureModuleImporter,
                'request': root.REQUEST,
                'result': self.result,
                'failure': self.failure,
                'original': res,
                'results': results,
                }
            callbacks = list(callbacks)
            self.__callbacks = ()
            while callbacks:
                callback, errback = callbacks.pop(0)
                if self.raw_state == CALLED:
                    call = callback
                else:
                    assert self.raw_state == FAILURE
                    call = errback
                name = None
                if call is None:
                    continue # no-op
                elif isinstance(call, tuple):
                    # (result name, expression) pair
                    name, call = call
                context = getEngine().getContext(context_dict) # XXX looks like
                # we need to create a new one of these each time in order to
                # make the change to result or failure stick.  Before tried
                # context.contexts['result'] = res (and similar for failure)
                # to change, but no success.
                try:
                    res = call(context)
                except ((ConflictError, KeyboardInterrupt,
                         SystemExit, ClientDisconnected)):
                    raise
                except:
                    # any other error becomes the new failure value
                    res = failure.Failure()
                else:
                    if name is not None:
                        results[name] = res
                if isinstance(res, failure.Failure):
                    cleanFailure(res) # remove object references
                    self.failure = context_dict["failure"] = res
                    self.raw_state = FAILURE
                else:
                    self.result = context_dict["result"] = res
                    self.raw_state = CALLED
        finally:
            setSecurityManager(old_sm)
        return res
        # manage transactions and retries externally (zasync)

    security.declarePrivate('callback')
    def callback(self, result):
        # resolve successfully with ``result`` and run the callbacks
        self.aq_inner.aq_parent.resolve(self)
        if self.raw_state != UNCALLED:
            # we don't want to abort the transaction because there's a good
            # chance that we *need* to have the manager resolve this deferred.
            # Therefore, we return a failure rather than raise an exception.
            # We do not call the callbacks.
            logging.getLogger('zasync').error(
                "Zope deferred %r has already been called", self.key)
            return failure.Failure(
                RuntimeError("Deferred has already been called"))
        self.raw_state = CALLED
        self.original_result = self.result = result
        return self.__call_all()

    security.declarePrivate('errback')
    def errback(self, fail):
        # resolve as failed with ``fail`` and run the errbacks
        self.aq_inner.aq_parent.resolve(self)
        if self.raw_state != UNCALLED:
            # we don't want to abort the transaction because there's a good
            # chance that we *need* to have the manager resolve this deferred.
            # Therefore, we return a failure rather than raise an exception.
            # We do not call the errbacks.
            logging.getLogger('zasync').error(
                "Zope deferred %r has already been called", self.key)
            return failure.Failure(
                RuntimeError("Deferred has already been called"))
        self.raw_state = FAILURE
        self.original_failure = self.failure = fail
        return self.__call_all()

    security.declareProtected(permissions.ModifyDeferred,
                              'addCallback')
    def addCallback(self, callback):
        return self.addCallbacks(callback)

    security.declareProtected(permissions.ModifyDeferred,
                              'addErrback')
    def addErrback(self, errback):
        return self.addCallbacks(errback=errback)

    def _convertArg(self, arg, default):
        # Normalise one callback/errback argument to None, an Expression,
        # or a (result name, Expression) pair.  An empty value, or the bare
        # ``default`` expression, means "pass the value through": None.
        if not arg or arg == default:
            arg = None
        else:
            if isinstance(arg, basestring):
                arg = Expression(arg)
            elif (isinstance(arg, (list, tuple)) and
                  len(arg) == 2 and
                  isinstance(arg[0], basestring) and
                  isinstance(arg[1], basestring)):
                arg = (arg[0], Expression(arg[1]))
            else:
                raise ValueError(
                    "Must be None, expression string, or pair of (result name, "
                    "expression string)", arg)
        return arg

    security.declareProtected(permissions.ModifyDeferred,
                              'addCallbacks')
    def addCallbacks(self, callback=None, errback=None):
        # Append one (callback, errback) pair.  Returns self while pending;
        # on an already resolved deferred the pair runs at once and the
        # resulting value is returned instead.
        if errback is None and callback is None:
            raise ValueError("No callback or errback added")
        callback = self._convertArg(callback, 'result')
        errback = self._convertArg(errback, 'failure')
        self.__callbacks += ((callback, errback),)
        # call immediately if called
        if self.raw_state != UNCALLED:
            return self.__call_all()
        return self

    # XXX this part of the API may be in flux; Twisted has deprecated it
    security.declareProtected(permissions.ModifyDeferred,
                              'setTimeout')
    def setTimeout(self, seconds):
        # the timeout of a pending deferred can only be shortened
        if self.getState() is not None:
            raise RuntimeError(
                "Cannot set timeout on completed deferred")
        seconds = int(seconds)
        if seconds < 0:
            raise ValueError("Seconds must be >= 0")
        if self.timeout < seconds:
            raise ValueError(
                "Cannot set timeout greater than previous value", self.timeout)
        self.timeout = seconds

    security.declareProtected(permissions.View, 'remainingSeconds')
    def remainingSeconds(self):
        """returns number of seconds until timeout (positive integer), or
        number of seconds since timeout (negative integer)"""
        end = self.creation_date + datetime.timedelta(seconds=self.timeout)
        delta = end - datetime.datetime.now()
        # timedelta.seconds alone wraps around every 24 hours, which made a
        # deferred overdue by more than a day look pending again; the whole
        # days have to be counted too.
        if delta < datetime.timedelta():
            overdue = -delta
            return -(overdue.days * 86400 + overdue.seconds)
        return delta.days * 86400 + delta.seconds
InitializeClass(Deferred)
|---|
| 466 |
|
|---|
| 467 |
def getDeferredInfo(context, info, sort_field=None, reverse=False):
    """Summarise the deferreds held in ``info`` as a list of dicts.

    ``info`` is either a PersistentQueue (iterated directly) or a mapping
    whose ``values()`` are deferreds.  Each deferred is wrapped in
    ``context`` and described by a dict with the keys ``key``, ``user``,
    ``plugin``, ``args``, ``kwargs``, ``state``, ``value``,
    ``creation_date``, ``resolution_date``, ``original_state`` and
    ``original_value``.

    When ``sort_field`` names one of those keys the list is sorted on it,
    in descending order if ``reverse`` is true.  Entries whose sort value
    is None sort before all others, and ties keep a deterministic order.
    Without ``sort_field`` the iteration order is kept and ``reverse`` is
    ignored.
    """
    if isinstance(info, PersistentQueue):
        ditems = info
    else:
        ditems = info.values()
    decorated = []
    for index, d in enumerate(ditems):
        d = d.__of__(context)
        plugin, args, kwargs = d.getSignature()
        state = d.getState()
        value = original_value = original_state = None
        if state is not None:
            value = d.getValue()
            if d.original_failure is None:
                original_value = d.original_result
                original_state = state_name_map[CALLED]
            else:
                original_value = d.original_failure
                original_state = state_name_map[FAILURE]
        record = {
            'key': d.key,
            'user': d.getOwnerTuple(),
            'plugin': plugin,
            'args': args,
            'kwargs': kwargs,
            'state': state,
            'value': value,
            'creation_date': d.creation_date,
            'resolution_date': d.resolution_date,
            'original_state': original_state,
            'original_value': original_value}
        sort_value = None
        if sort_field is not None:
            sort_value = record.get(sort_field)
        # Sort key layout: the leading flag keeps None away from real values
        # (a datetime refuses to be compared with None), and the unique
        # index settles ties so the record dicts are never compared.
        decorated.append((sort_value is not None, sort_value, index, record))
    if sort_field is not None:
        decorated.sort()
        if reverse:
            decorated.reverse()
    return [entry[-1] for entry in decorated]
|---|
| 508 |
|
|---|
| 509 |
class AsynchronousCallManager(PropertyManager, SimpleItem):
    """a tool that holds deferreds for zasync to manipulate"""
    security = ClassSecurityInfo()

    # management tabs: this tool's two own screens, followed by the tabs
    # inherited from PropertyManager and SimpleItem
    manage_options = (
        ({'label':'Overview', 'action':'manage_overview',},
        {'label':'Calls', 'action':'manage_calls',},) +
        PropertyManager.manage_options
        + SimpleItem.manage_options)

    # the two integer settings exposed as writable properties
    _properties = (
        {'id':'rotation_period', 'type': 'int', 'mode':'w',
        'label': 'Resolved cache rotation period in seconds'},
        {'id':'poll_interval', 'type': 'int', 'mode': 'w',
        'label': 'Interval between zasync call polls'},
        )

    id = 'asynchronous_call_manager'
    title = meta_type = 'Asynchronous Call Manager'
    icon = "misc_/zasync/tool.gif"
    rotation_period = 60*60*24 # a day
    poll_interval = 5
    # (name, description) pairs; replaced by setPlugins, read by listPlugins
    __plugins = ()
    # class-level defaults for the bookkeeping timestamps
    _next_rotate = _last_ping = _last_pong = None

    # keep webdav interface off deferreds and tool
    __implements__ = (interfaces.IZasyncAsynchronousCallManager,)
|---|
| 536 |
|
|---|
| 537 |
def _getBrowserId(self):
    # ask the browser id manager, looked up by name on this tool, for the
    # current browser id
    return getattr(self, BROWSERID_MANAGER_NAME).getBrowserId()
|---|
| 540 |
|
|---|
| 541 |
def __init__(self, id=None):
    # an explicit id overrides the class-level default
    if id is not None:
        self.id = id
    # three holding areas, in the order a call moves through them:
    # items to be picked up by zasync
    self._new = PersistentQueue()
    # items collected by zasync from the queue
    self._accepted = OOBTree.OOBTree()
    # long term cache
    self._resolved = bforests.OOBForest()
    # bookkeeping timestamps, all unset on a fresh tool
    self._next_rotate = self._last_ping = self._last_pong = None
|---|
| 553 |
|
|---|
| 554 |
|
|---|
| 555 |
# "Overview" management tab, rendered from a page template file
security.declareProtected(permissions.ViewManagementScreens,
                          'manage_overview')
manage_overview = PageTemplateFile(
    'www/controlAsynchronousCallManagerForm.zpt', globals(),
    __name__='manage_overview')


# "Calls" management tab, rendered from a page template file
security.declareProtected(permissions.ViewManagementScreens,
                          'manage_calls')
manage_calls = PageTemplateFile(
    'www/analyzeCalls.zpt', globals(),
    __name__='manage_calls')
|---|
| 567 |
|
|---|
| 568 |
security.declareProtected(permissions.ViewManagementScreens, |
|---|
| 569 |
'ping') |
|---|
| 570 |
def ping(self, REQUEST=None):
    """record a ping for the zasync client (to be answered in its
    heartbeat method) and clear the previous pong; when called with a
    REQUEST, redirect back to the overview screen."""
    self._last_pong = None
    self._last_ping = datetime.datetime.now()
    if REQUEST is None:
        return
    overview_url = '%s/manage_overview' % self.absolute_url()
    REQUEST.RESPONSE.redirect(overview_url)
|---|
| 578 |
|
|---|
| 579 |
security.declareProtected(permissions.ViewManagementScreens, |
|---|
| 580 |
'getLastPing') |
|---|
| 581 |
def getLastPing(self):
    # time of the most recent ping(), or None if there has been none
    last_ping = self._last_ping
    return last_ping
|---|
| 583 |
|
|---|
| 584 |
security.declareProtected(permissions.ViewManagementScreens, |
|---|
| 585 |
'getLastPong') |
|---|
| 586 |
def getLastPong(self):
    # stored pong marker; ping() resets it to None (the code that records
    # a reply is not in this part of the file)
    last_pong = self._last_pong
    return last_pong
|---|
| 588 |
|
|---|
| 589 |
# property code copied from CMFCore/utils.py SimpleItemWithProperties >>> |
|---|
| 590 |
security.declarePrivate('manage_addProperty') |
|---|
| 591 |
security.declarePrivate('manage_delProperties') |
|---|
| 592 |
security.declarePrivate('manage_changePropertyTypes') |
|---|
| 593 |
|
|---|
| 594 |
def manage_propertiesForm(self, REQUEST, *args, **kw):
    """ An override that makes the schema fixed.
    """
    # work on a copy so the caller's keyword dict is left untouched
    options = dict(kw)
    options['property_extensible_schema__'] = 0
    wrapped_form = PropertyManager.manage_propertiesForm.__of__(self)
    return wrapped_form(self, REQUEST, *args, **options)
|---|
| 601 |
|
|---|
| 602 |
# <<< end copy from CMF |
|---|
| 603 |
|
|---|
| 604 |
security.declarePrivate('resolve') |
|---|
| 605 |
def resolve(self, deferred):
    # Move ``deferred`` from the accepted set to the resolved cache and,
    # if no cache rotation is scheduled yet, schedule one.
    key = deferred.key
    try:
        del self._accepted[key]
    except KeyError:
        return # presumably resolved before
    self._resolved[key] = deferred
    if self._next_rotate is not None or not self.rotation_period:
        return
    period = datetime.timedelta(seconds=self.rotation_period)
    self._next_rotate = datetime.datetime.now() + period
|---|
| 616 |
|
|---|
| 617 |
security.declarePublic('getNextCacheRotation') |
|---|
| 618 |
def getNextCacheRotation(self):
    # scheduled time of the next resolved-cache rotation, or None
    scheduled = self._next_rotate
    return scheduled
|---|
| 620 |
|
|---|
| 621 |
security.declareProtected(permissions.ViewManagementScreens, |
|---|
| 622 |
'resetNextCacheRotation') |
|---|
| 623 |
def resetNextCacheRotation(self, clear=False):
    # Reschedule the next rotation one rotation period from now, or
    # unschedule it when ``clear`` is true.  Returns the stored value.
    if clear:
        scheduled = None
    else:
        period = datetime.timedelta(seconds=self.rotation_period)
        scheduled = datetime.datetime.now() + period
    self._next_rotate = scheduled
    return scheduled
|---|
| 631 |
|
|---|
| 632 |
security.declarePrivate('setPlugins') |
|---|
| 633 |
def setPlugins(self, plugins): # protect plugins by groups or perms?
    "set item list, name: description"
    # XXX it would be good to include a signature validation function
    # XXX it would be good to be able to specify if some plugins are
    # not available to session calls--although that needs to be more
    # general; session call approach may need to be adjusted.  See XXX in
    # getSessionDeferred.
    # stored as an immutable snapshot of the given pairs
    registered = tuple(plugins)
    self.__plugins = registered
|---|
| 641 |
|
|---|
| 642 |
security.declareProtected(permissions.View, 'listPlugins') |
|---|
| 643 |
def listPlugins(self):
    """show plugin names available.  key is name, value is description.
    If no plugins, no engine should be expected (i.e., the tool will
    not work)."""
    available = {}
    for name, description in self.__plugins:
        available[name] = description
    return available
|---|
| 648 |
|
|---|
| 649 |
security.declarePrivate('_putCall') |
|---|
| 650 |
def _putCall(self, _id_prefix, _plugin, *args, **kwargs):
    # Queue a new deferred for ``_plugin`` and return it wrapped in this
    # tool.  The key is a random integer, appended to the ``_id_prefix``
    # tuple when one is given.
    if _plugin not in self.listPlugins(): # keys of dict
        raise ValueError(
            "Requested plugin is not registered")
    # draw random numbers until the resulting key is unused
    while True:
        randomizer = random.randint(-sys.maxint-1, sys.maxint)
        if _id_prefix is None:
            key = randomizer
        else:
            key = _id_prefix + (randomizer,)
        if self.getDeferred(key) is None:
            break
    # the signature is stored in sanitized form
    clean_args = sanitize(args)
    clean_kwargs = sanitize(kwargs)
    d = Deferred(_plugin, clean_args, clean_kwargs)
    d.key = key
    d.id = repr(key)
    d.local_key = randomizer
    self._new.append(d)
    wrapped = d.__of__(self)
    wrapped.manage_fixupOwnershipAfterAdd()
    # give the calling user, if identifiable, the Owner local role
    userid = None
    user = getSecurityManager().getUser()
    if user is not None:
        userid = user.getId()
    if userid is not None:
        wrapped.manage_setLocalRoles(userid, ['Owner'])
    return wrapped
|---|
| 675 |
|
|---|
| 676 |
security.declareProtected( |
|---|
| 677 |
permissions.MakeAsynchronousApplicationCalls, 'putCall') |
|---|
| 678 |
def putCall(self, _plugin, *args, **kwargs):
    # application-level call: the key gets no prefix
    prefix = None
    return self._putCall(prefix, _plugin, *args, **kwargs)
|---|
| 681 |
|
|---|
| 682 |
security.declareProtected( |
|---|
| 683 |
permissions.MakeAsynchronousSessionCalls, 'putSessionCall') |
|---|
| 684 |
def putSessionCall(self, _plugin, *args, **kwargs):
    # session-level call: the key is prefixed with the current browser id
    prefix = (self._getBrowserId(),)
    return self._putCall(prefix, _plugin, *args, **kwargs)
|---|
| 687 |
|
|---|
| 688 |
security.declareProtected( |
|---|
| 689 |
permissions.MakeAsynchronousApplicationCalls, 'getDeferred') |
|---|
| 690 |
def getDeferred(self, d_id, default=None):
    # Look the key up in the new queue, then the accepted set, then the
    # resolved cache; the match is returned wrapped in this tool.
    for queued in self._new:
        if queued.key == d_id:
            return queued.__of__(self)
    found = self._accepted.get(d_id)
    if found is None:
        found = self._resolved.get(d_id)
    if found is None:
        return default
    return found.__of__(self)
|---|
| 699 |
|
|---|
| 700 |
security.declareProtected( |
|---|
| 701 |
permissions.MakeAsynchronousSessionCalls, 'getSessionDeferred') |
|---|
| 702 |
def getSessionDeferred(self, d_id, default=None):
    # XXX this API doesn't support the use case of users who have limited
    # permissions but still should be able to store away keys for later
    # look-up, irrespective of sessions.  We should either have three
    # levels of deferred call or let plugins specify permissions--but then
    # in what context?
    #
    # ``d_id`` is either the local key alone or a full key tuple.  A full
    # key starting with another browser id is never looked up.
    bid = self._getBrowserId()
    local_id = d_id
    if isinstance(d_id, tuple):
        if d_id[0] != bid:
            return default
        local_id = d_id[-1]
    return self.getDeferred((bid, local_id), default)
|---|
| 714 |
|
|---|
| 715 |
def __len__(self): # potentially expensive
    # Total number of calls held, summed over the new, accepted and
    # resolved stores.
    total = 0
    for store in (self._new, self._accepted, self._resolved):
        total += len(store)
    return total
|---|
| 717 |
|
|---|
| 718 |
def __nonzero__(self):
    # A manager is always truthy.  Because ``__len__`` is defined,
    # Python 2 would otherwise derive the truth value from it, making an
    # empty manager falsy and paying for the (potentially expensive)
    # length computation on every truth test.
    return True
|---|
| 720 |
|
|---|
| 721 |
security.declareProtected( |
|---|
| 722 |
permissions.ViewManagementScreens, 'listNewCalls') |
|---|
| 723 |
def listNewCalls(self, sort='creation_date', reverse=True):
    # Describe the calls still waiting in the new queue.  ``sort`` and
    # ``reverse`` are passed straight through to ``getDeferredInfo``.
    pending = self._new
    return getDeferredInfo(self, pending, sort, reverse)
|---|
| 725 |
|
|---|
| 726 |
security.declareProtected( |
|---|
| 727 |
permissions.ViewManagementScreens, 'listAcceptedCalls') |
|---|
| 728 |
def listAcceptedCalls(self, sort='creation_date', reverse=True):
    # Describe the calls held in the accepted store.  ``sort`` and
    # ``reverse`` are passed straight through to ``getDeferredInfo``.
    accepted = self._accepted
    return getDeferredInfo(self, accepted, sort, reverse)
|---|
| 730 |
|
|---|
| 731 |
security.declareProtected( |
|---|
| 732 |
permissions.ViewManagementScreens, 'listResolvedCalls') |
|---|
| 733 |
def listResolvedCalls(self, bucket=None, sort='creation_date',
                      reverse=True):
    # Describe resolved calls.  With ``bucket`` left as None the whole
    # resolved store is listed; otherwise only the bucket found at
    # ``self._resolved.buckets[bucket]``.  ``sort`` and ``reverse`` are
    # passed straight through to ``getDeferredInfo``.
    source = self._resolved
    if bucket is not None:
        source = source.buckets[bucket]
    return getDeferredInfo(self, source, sort, reverse)
|---|
| 739 |
|
|---|
| 740 |
security.declareProtected( |
|---|
| 741 |
permissions.ViewManagementScreens, 'lenNewCalls') |
|---|
| 742 |
def lenNewCalls(self):
    # Number of calls currently waiting in the new queue.
    pending = self._new
    return len(pending)
|---|
| 744 |
|
|---|
| 745 |
security.declareProtected( |
|---|
| 746 |
permissions.ViewManagementScreens, 'lenAcceptedCalls') |
|---|
| 747 |
def lenAcceptedCalls(self):
    # Number of calls currently held in the accepted store.
    accepted = self._accepted
    return len(accepted)
|---|
| 749 |
|
|---|
| 750 |
security.declareProtected( |
|---|
| 751 |
permissions.ViewManagementScreens, 'lenResolvedCalls') |
|---|
| 752 |
def lenResolvedCalls(self, bucket=None):
    # Number of resolved calls: across the whole resolved store when
    # ``bucket`` is None, otherwise only within
    # ``self._resolved.buckets[bucket]``.
    source = self._resolved
    if bucket is not None:
        source = source.buckets[bucket]
    return len(source)
|---|
| 758 |
|
|---|
| 759 |
security.declareProtected( |
|---|
| 760 |
permissions.ViewManagementScreens, 'lenResolvedBuckets') |
|---|
| 761 |
def lenResolvedBuckets(self):
    # Number of buckets in the resolved store.
    buckets = self._resolved.buckets
    return len(buckets)
|---|
| 763 |
|
|---|
| 764 |
security.declareProtected( |
|---|
| 765 |
permissions.ViewManagementScreens, 'nextResolvedBucketRotation') |
|---|
| 766 |
def nextResolvedBucketRotation(self):
    # Expose the stored rotation deadline.  ``heartbeat`` sets it to a
    # datetime, or to None when nothing is left in the resolved store.
    deadline = self._next_rotate
    return deadline
|---|
| 768 |
|
|---|
| 769 |
security.declarePrivate('acceptAll') |
|---|
| 770 |
def acceptAll(self):
    # Move every call from the new queue into the accepted mapping,
    # keyed by each call's ``key``, and return the moved calls in queue
    # order.  Returns an empty list when nothing was waiting.
    new = self._new
    if not new:
        # Nothing to move: leave the queue completely untouched so an
        # idle poll performs no mutation on it.
        return []
    for d in new:
        self._accepted[d.key] = d
    # Snapshot with the queue's own slice operation, as before, so the
    # result keeps whatever sequence type ``_new`` produces.
    res = new[:]
    # Empty the queue in place with one slice deletion; popping index 0
    # once per element shifts the remainder each time (quadratic).
    del new[:]
    return res
|---|
| 780 |
|
|---|
| 781 |
security.declarePrivate('getAcceptedCalls') |
|---|
| 782 |
def getAcceptedCalls(self):
    # Return the values of the accepted store, i.e. the accepted calls
    # without their keys.
    accepted = self._accepted
    return accepted.values()
|---|
| 784 |
|
|---|
| 785 |
security.declarePrivate('heartbeat') |
|---|
| 786 |
def heartbeat(self):
    # Periodic housekeeping.
    #
    # 1. When a rotation deadline is set and has passed, rotate the
    #    resolved store's buckets, then re-arm the deadline to
    #    ``rotation_period`` seconds from now -- or clear it when the
    #    resolved store is empty.
    # 2. When a ping has been recorded but no pong yet, record the pong
    #    as having happened now.
    #
    # NOTE(review): this block was rebuilt from source whose indentation
    # had been flattened.  The re-arm/clear branch is nested under the
    # deadline check and step 2 sits at method level -- confirm against
    # the canonical file.
    deadline = self._next_rotate
    now = datetime.datetime.now()
    rotation_due = deadline is not None and deadline < now
    if rotation_due:
        self._resolved.rotateBucket()
        if self._resolved: # i.e., not empty; len is expensive!
            period = datetime.timedelta(seconds=self.rotation_period)
            self._next_rotate = now + period
        else:
            self._next_rotate = None
    awaiting_pong = self._last_ping is not None and self._last_pong is None
    if awaiting_pong:
        self._last_pong = now
|---|
| 799 |
# Finalise the class once its body is complete; in Zope 2 this is the
# step that applies the class's ``security`` declarations.
InitializeClass(AsynchronousCallManager)
|---|
| 800 |
|
|---|
| 801 |
# Page template providing the form for adding an AsynchronousCallManager.
# NOTE(review): the template is registered under the name
# 'manage_addSchedulerForm', which does not match this form's purpose --
# presumably carried over from another product; confirm before renaming,
# since other code may refer to that name.
constructAsynchronousCallManagerForm = PageTemplateFile(
    'www/constructAsynchronousCallManagerForm.zpt',
    globals(),
    __name__='manage_addSchedulerForm')
|---|
| 804 |
|
|---|
| 805 |
def constructAsynchronousCallManager(
        dispatcher, id="asynchronous_call_manager",
        poll_interval=None, rotation_period=None, RESPONSE=None):
    """Construct an AsynchronousCallManager

    A new manager with the given ``id`` is stored in the container
    returned by ``dispatcher.this()``.  ``poll_interval`` and
    ``rotation_period`` override the manager's own values only when
    supplied.  When ``RESPONSE`` is given, the browser is redirected to
    the container's ``manage_main`` screen.
    """
    manager = AsynchronousCallManager(id)
    if poll_interval is not None:
        # Values below 2 are raised to 2.
        manager.poll_interval = max(poll_interval, 2)
    if rotation_period is not None:
        # A negative period is stored as its absolute value.
        manager.rotation_period = abs(rotation_period)

    container = dispatcher.this()
    container._setObject(id, manager)

    if RESPONSE is None:
        return
    target = container.absolute_url() + '/manage_main'
    RESPONSE.redirect(target)
|---|