root/vendor/zasync/branches/z29-nux/manager.py

Revision 31848, 29.1 kB (checked in by rspivak, 4 years ago)

Made Properties tab in ZMI to work for async manager(tested with Zope 2.8.x)

  • Property svn:eol-style set to native
  • Property svn:keywords set to Id
Line 
1 ##############################################################################
2 #
3 # Copyright (c) 2004 Zope Corporation and Contributors. All Rights Reserved.
4 #
5 # This software is subject to the provisions of the Zope Public License,
6 # Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
7 # THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
8 # WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
9 # WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
10 # FOR A PARTICULAR PURPOSE.
11 #
12 ##############################################################################
13 """Asynchronous call manager
14
15 A place to hang asynchronous calls and get their results
16
17 $Id$
18 """
19
20 from types import NoneType
21 import datetime, random, sys, os, logging, sets, traceback
22
23 from twisted.internet import defer
24 from twisted.python import failure
25
26 from Persistence import Persistent
27 from ZODB.POSException import ConflictError
28 from ZEO.Exceptions import ClientDisconnected
29 from Globals import InitializeClass, Persistent
30 from Acquisition import aq_base, aq_inner, aq_parent
31 from AccessControl.Owned import ownerInfo
32 from AccessControl import ClassSecurityInfo, Unauthorized, \
33     getSecurityManager
34 from AccessControl.SecurityManagement import newSecurityManager, \
35     setSecurityManager
36 from Persistence import PersistentMapping
37 from OFS.SimpleItem import SimpleItem
38 from OFS.PropertyManager import PropertyManager
39 from BTrees import OOBTree
40 from Products.PageTemplates.PageTemplateFile import PageTemplateFile
41 from Products.PageTemplates.Expressions import getEngine, SecureModuleImporter
42 from Products.Sessions.BrowserIdManager import BROWSERID_MANAGER_NAME
43 from AccessControl.SecurityInfo import allow_class
44
45 from nuxeo.persistentqueue.persistentqueue import PersistentQueue
46
47 import permissions, bforests, interfaces
48
49 UNCALLED = 0
50 CALLED = 1
51 FAILURE = -1
52 state_name_map = {UNCALLED: None, CALLED: 'success', FAILURE: 'failure'}
53
54 # try to make failure.Failure available to restricted python
55 allow_class(failure.Failure) # in particular, this should allow access to
56 # 'getErrorMessage', 'getBriefTraceback', 'getTraceback', and 'trap'.
57
58 def iterable(value):
59     try:
60         iter(value)
61     except TypeError:
62         return False
63     else:
64         return True
65
66 def countable(value):
67     try:
68         len(value)
69     except TypeError:
70         return False
71     else:
72         return True
73
74 class Reference:
75
76     def __init__(self, obj):
77         try:
78             self._path = obj.getPhysicalPath()
79         except (AttributeError, KeyError):
80             self._path = None
81         self._repr = repr(obj)
82
83     def dereference(self, context):
84         return context.restrictedTraverse(self._path, None)
85
86     def __repr__(self):
87         return self._repr
88 allow_class(Reference)
89
90 class StandIn:
91
92     data = klass = contents = None
93
94     def __init__(self, obj, depth, contents=None):
95         d = getattr(obj, '__dict__', None)
96         if d is not None:
97             self.data = sanitize(d, depth)
98         self.klass = obj.__class__
99         self.contents = contents
100         self._repr = repr(obj)
101
102     def __getattr__(self, name):
103         if self.data is not None and self.data is not MAX_DEPTH_MARKER:
104             try:
105                 return self.data[name]
106             except KeyError:
107                 pass
108         raise AttributeError(name)
109
110     def __repr__(self):
111         return self._repr
112 allow_class(StandIn)
113
114 MAX_DEPTH = 4
115
116 class MAX_DEPTH_MARKER: pass # singleton
117
118 def sanitize(value, depth=0, max_depth=MAX_DEPTH):
119     if depth >= max_depth:
120         return MAX_DEPTH_MARKER
121     depth += 1
122     if aq_base(value) is not value or isinstance(value, Persistent):
123         if (aq_parent(value) is not None and
124             getattr(aq_base(value), 'getPhysicalPath', None) is not None):
125             value = Reference(value)
126         else:
127             value = StandIn(value, depth) # XXX could theoretically do a better
128             # job for BTrees, but don't think I really want to make sending
129             # those across the wire all that easy
130     elif getattr(value, '__dict__', None) is not None:
131         value = StandIn(value, depth) # XXX not ideal for subclasses of built
132         # in types, but oh well
133     elif isinstance(value, (list, tuple, sets.Set)):
134         val_type = type(value)
135         contents = [sanitize(item, depth) for item in value]
136         if val_type is list:
137             value = contents
138         else:
139             try:
140                 value = val_type(contents)
141             except TypeError:
142                 value = StandIn(value, depth, contents)
143     elif isinstance(value, dict):
144         contents = [
145             (sanitize(k, depth), sanitize(v, depth)) for k, v in value.items()]
146         val_type = type(value)
147         try:
148             value = val_type(contents)
149         except TypeError:
150             value = StandIn(value, depth, contents)
151     elif not isinstance(value, basestring) and iterable(value):
152         contents = None
153         if countable(value):
154             contents = [sanitize(item, depth) for item in value]
155         value = StandIn(value, depth, contents)
156     return value
157
158 def cleanFailure(failure):
159     # used instead of failure.cleanFailure for Zope-based failures.  See comment
160     # in safe_repr below.  Could be monkey patched, but want to try and be a
161     # good Twisted citizen for other Twisted services (particularly in the
162     # client).
163     c = failure.__dict__.copy()
164
165     def safe_repr(obj): # this is needed because Zope effectively raises some
166         # exceptions in __repr__ (I found one in Shared/DC/Scripts/Bindings.py,
167         # UnauthorizedBinding.__getattr__). :-(
168         try:
169             return repr(obj)
170         except:
171             return traceback.format_exception_only(*sys.exc_info()[:2])[0]
172
173     c['frames'] = [
174         [
175             v[0], v[1], v[2],
176             [(j[0], safe_repr(j[1])) for j in v[3]],
177             [(j[0], safe_repr(j[1])) for j in v[4]]
178         ] for v in failure.frames
179     ]
180
181     c['tb'] = None
182
183     if failure.stack is not None:
184         c['stack'] = [
185             [
186                 v[0], v[1], v[2],
187                 [(j[0], repr(j[1])) for j in v[3]],
188                 [(j[0], repr(j[1])) for j in v[4]]
189             ] for v in failure.stack
190         ]
191
192     c['pickled'] = 1
193     failure.__dict__ = c
194     return failure
195
196 # Expression class copied from CMFCore/Expression.py
197 class Expression (Persistent):
198     text = ''
199     _v_compiled = None
200
201     security = ClassSecurityInfo()
202
203     def __init__(self, text):
204         self.text = text
205         self._v_compiled = getEngine().compile(text)
206
207     def __call__(self, econtext):
208         compiled = self._v_compiled
209         if compiled is None:
210             compiled = self._v_compiled = getEngine().compile(self.text)
211         res = compiled(econtext)
212         if isinstance(res, Exception):
213             raise res
214         return res
215
216 class Deferred(SimpleItem):
217
218     result = failure = original_result = original_failure = None
219     key = local_key = resolution_date = None
220     timeout = 60 * 60 * 24 # one day; zasync plugins generally constrain
221     # timeouts more strictly
222
223
224     # keep webdav interface off deferreds and tool
225     __implements__ = (interfaces.IZopeDeferred,)
226
227     security = ClassSecurityInfo()
228
229     def __init__(self, plugin, args, kwargs):
230         self.creation_date = datetime.datetime.now()
231         self.__signature = (plugin, args, kwargs)
232         self.__callbacks = ()
233         self.raw_state = UNCALLED # raw_state is not reliable.
234         # Use getState.
235
236     security.declareProtected(permissions.View, 'getSignature')
237     def getSignature(self):
238         return self.__signature
239
240     security.declareProtected(permissions.View, 'getState')
241     def getState(self):
242         "returns 'success', 'failure', or None"
243         state = self.raw_state
244         if state==UNCALLED and (
245             self.remainingSeconds() < -(self.aq_parent.poll_interval*3)):
246             state=FAILURE
247         global state_name_map
248         return state_name_map[state]
249
250     security.declareProtected(permissions.View, 'getRawState')
251     def getRawState(self):
252         "returns 'success', 'failure', or None"
253         global state_name_map
254         return state_name_map[self.raw_state]
255
256     security.declareProtected(permissions.View, 'getValue')
257     def getValue(self):
258         state = self.raw_state
259         if state==UNCALLED:
260             if (self.remainingSeconds() < -(self.aq_parent.poll_interval*3)):
261                 return failure.Failure(defer.TimeoutError('Timed out.'))
262             raise RuntimeError("Deferred still pending")
263         if state==CALLED:
264             return self.result
265         else:
266             return self.failure
267
268     security.declareProtected(permissions.View, 'getRawValue')
269     def getRawValue(self):
270         state = self.raw_state
271         if state==UNCALLED:
272             raise RuntimeError("Deferred still pending")
273         if state==CALLED:
274             return self.result
275         else:
276             return self.failure
277
278     security.declareProtected(permissions.View, 'getErrorMessage')
279     def getErrorMessage(self):
280         state = self.getState()
281         if state != 'failure':
282             raise RuntimeError("No failure state.", state or 'pending')
283         value = self.getValue()
284         return value and value.getErrorMessage() or 'Unknown error.'
285
286     def __repr__(self):
287         classname = type(self).__name__
288         if aq_parent(self) is not None:
289             location = " in %s" % '/'.join(self.getPhysicalPath())
290         else:
291             location = ""
292         return "<%s (key %r)%s at %s>" % (
293             classname, self.key, location, id(aq_base(self)))
294
295     def __call_all(self):
296         callbacks = self.__callbacks
297         assert self.raw_state != UNCALLED
298         res = (self.raw_state == FAILURE and self.failure or self.result)
299         self.resolution_date = datetime.datetime.now()
300         if not callbacks:
301             return res
302         user = self.getWrappedOwner()
303         tool = aq_parent(self)
304         root = self.getPhysicalRoot()
305         results = {}
306         old_sm = getSecurityManager()
307         newSecurityManager(None, user)
308         context_dict = {
309             'nothing': None, # provided automatically but here for clarity
310             'user': user,
311             'userhome': aq_parent(aq_inner(aq_parent(aq_inner(user)))),
312             'tool': tool,
313             'root': root,
314             'deferred': self,
315             'here': self,
316             'modules': SecureModuleImporter,
317             'request': root.REQUEST,
318             'result': self.result,
319             'failure': self.failure,
320             'original': res,
321             'results': results,
322             }
323         try:
324             callbacks = list(callbacks)
325             self.__callbacks = ()
326             while callbacks:
327                 callback, errback = callbacks.pop(0)
328                 if self.raw_state == CALLED:
329                     call = callback
330                 else:
331                     assert self.raw_state == FAILURE
332                     call = errback
333                 name = None
334                 if call is None:
335                     continue # no-op
336                 elif isinstance(call, tuple):
337                     name, call = call
338                 context = getEngine().getContext(context_dict) # XXX looks like
339                 # we need to create a new one of these each time in order to
340                 # make the change to result or failure stick.  Before tried
341                 # context.contexts['result'] = res (and similar for failure)
342                 # to change, but no success.
343                 try:
344                     res = call(context)
345                 except ((ConflictError, KeyboardInterrupt,
346                          SystemExit, ClientDisconnected)):
347                     raise
348                 except:
349                     res = failure.Failure()
350                 else:
351                     if name is not None:
352                         results[name] = res
353                 if isinstance(res, failure.Failure):
354                     cleanFailure(res) # remove object references
355                     self.failure = context_dict["failure"] = res
356                     self.raw_state = FAILURE
357                 else:
358                     self.result = context_dict["result"] = res
359                     self.raw_state = CALLED
360         finally:
361             setSecurityManager(old_sm)
362         return res
363         # manage transactions and retries externally (zasync)
364
365     security.declarePrivate('callback')
366     def callback(self, result):
367         self.aq_inner.aq_parent.resolve(self)
368         if self.raw_state != UNCALLED:
369             # we don't want to abort the transaction because there's a good
370             # chance that we *need* to have the manager resolve this deferred.
371             # Therefore, we return a failure rather than raise a transaction.
372             # We do not call the callbacks.
373             logging.getLogger('zasync').error(
374                 "Zope deferred %r has already been called" % (self.key,))
375             return failure.Failure(
376                 RuntimeError("Deferred has already been called"))
377         self.raw_state = CALLED
378         self.original_result = self.result = result
379         return self.__call_all()
380
381     security.declarePrivate('errback')
382     def errback(self, fail):
383         self.aq_inner.aq_parent.resolve(self)
384         if self.raw_state != UNCALLED:
385             # we don't want to abort the transaction because there's a good
386             # chance that we *need* to have the manager resolve this deferred.
387             # Therefore, we return a failure rather than raise a transaction.
388             # We do not call the errbacks.
389             logging.getLogger('zasync').error(
390                 "Zope deferred %r has already been called" % (self.key,))
391             return failure.Failure(
392                 RuntimeError("Deferred has already been called"))
393         self.raw_state = FAILURE
394         self.original_failure = self.failure = fail
395         return self.__call_all()
396
397     security.declareProtected(permissions.ModifyDeferred,
398                               'addCallback')
399     def addCallback(self, callback):
400         return self.addCallbacks(callback)
401
402     security.declareProtected(permissions.ModifyDeferred,
403                               'addErrback')
404     def addErrback(self, errback):
405         return self.addCallbacks(errback=errback)
406
407     def _convertArg(self, arg, default):
408         if not arg or arg==default:
409             arg = None
410         else:
411             if isinstance(arg, basestring):
412                 arg = Expression(arg)
413             elif (isinstance(arg, (list, tuple)) and
414                   len(arg)==2 and
415                   isinstance(arg[0], basestring) and
416                   isinstance(arg[1], basestring)):
417                 arg = (arg[0], Expression(arg[1]))
418             else:
419                 raise ValueError(
420                     "Must be None, expression string, or pair of (result name, "
421                     "expression string)", arg)
422         return arg
423
424     security.declareProtected(permissions.ModifyDeferred,
425                               'addCallbacks')
426     def addCallbacks(self, callback=None, errback=None):
427         if errback is None and callback is None:
428             raise ValueError("No callback or errback added")
429         callback = self._convertArg(callback, 'result')
430         errback = self._convertArg(errback, 'failure')
431         self.__callbacks += ((callback, errback),)
432         # call immediately if called
433         if self.raw_state != UNCALLED:
434             return self.__call_all()
435         return self
436
437     # XXX this part of the API may be in flux; Twisted has deprecated it
438     security.declareProtected(permissions.ModifyDeferred,
439                               'setTimeout')
440     def setTimeout(self, seconds):
441         if self.getState() is not None:
442             raise RuntimeError(
443                 "Cannot set timeout on completed deferred")
444         seconds = int(seconds)
445         if seconds < 0:
446             raise ValueError("Seconds must be >= 0")
447         if self.timeout < seconds:
448             raise ValueError(
449                 "Cannot set timeout greater than previous value", self.timeout)
450         self.timeout = seconds
451
452     security.declareProtected(permissions.View, 'remainingSeconds')
453     def remainingSeconds(self):
454         """returns number of seconds until timeout (positive integer), or
455         number of seconds since timeout (negative integer)"""
456         timeout = self.timeout
457         end = self.creation_date + datetime.timedelta(seconds = timeout)
458         now = datetime.datetime.now()
459         delta = end - now
460         if delta < datetime.timedelta():
461             timeout = -(-delta).seconds # XXX should include day seconds too
462         else:
463             timeout = delta.seconds
464         return timeout
465 InitializeClass(Deferred)
466
467 def getDeferredInfo(context, info, sort_field=None, reverse=False):
468     res = []
469     if isinstance(info, PersistentQueue):
470         ditems = info
471     else:
472         ditems = info.values()
473     for d in ditems:
474         d = d.__of__(context)
475         plugin, args, kwargs = d.getSignature()
476         state = d.getState()
477         value = original_value = original_state = None
478         if state is not None:
479             value = d.getValue()
480             if d.original_failure is None:
481                 original_value = d.original_result
482                 original_state = state_name_map[CALLED]
483             else:
484                 original_value = d.original_failure
485                 original_state = state_name_map[FAILURE]
486         info ={
487             'key': d.key,
488             'user': d.getOwnerTuple(),
489             'plugin': plugin,
490             'args': args,
491             'kwargs': kwargs,
492             'state': state,
493             'value': value,
494             'creation_date': d.creation_date,
495             'resolution_date': d.resolution_date,
496             'original_state': original_state,
497             'original_value': original_value}
498         if sort_field is None:
499             res.append(info)
500         else:
501             res.append((info.get(sort_field), info))
502     if sort_field is not None:
503         res.sort()
504         if reverse:
505             res.reverse()
506         res = [val for sort, val in res]
507     return res
508
509 class AsynchronousCallManager(PropertyManager, SimpleItem):
510     """a tool that holds deferreds for zasync to manipulate"""
511     security = ClassSecurityInfo()
512
513     manage_options = (
514         ({'label':'Overview', 'action':'manage_overview',},
515          {'label':'Calls', 'action':'manage_calls',},) +
516         PropertyManager.manage_options
517         + SimpleItem.manage_options)
518
519     _properties = (
520         {'id':'rotation_period', 'type': 'int', 'mode':'w',
521          'label': 'Resolved cache rotation period in seconds'},
522         {'id':'poll_interval', 'type': 'int', 'mode': 'w',
523          'label': 'Interval between zasync call polls'},
524         )
525
526     id = 'asynchronous_call_manager'
527     title = meta_type = 'Asynchronous Call Manager'
528     icon = "misc_/zasync/tool.gif"
529     rotation_period = 60*60*24 # a day
530     poll_interval = 5
531     __plugins = ()
532     _next_rotate = _last_ping = _last_pong = None
533
534     # keep webdav interface off deferreds and tool
535     __implements__ = (interfaces.IZasyncAsynchronousCallManager,)
536
537     def _getBrowserId(self):
538         bim = getattr(self, BROWSERID_MANAGER_NAME)
539         return bim.getBrowserId()
540
541     def __init__(self, id=None):
542         if id is not None:
543             self.id = id
544         # items to be picked up by zasync
545         self._new = PersistentQueue()
546         # items collected by zasync from the queue
547         self._accepted = OOBTree.OOBTree()
548         # long term cache
549         self._resolved = bforests.OOBForest()
550         self._next_rotate = None
551         self._last_ping = None
552         self._last_pong = None
553
554
555     security.declareProtected(permissions.ViewManagementScreens,
556                               'manage_overview')
557     manage_overview = PageTemplateFile(
558         'www/controlAsynchronousCallManagerForm.zpt', globals(),
559         __name__='manage_overview')
560
561
562     security.declareProtected(permissions.ViewManagementScreens,
563                               'manage_calls')
564     manage_calls = PageTemplateFile(
565         'www/analyzeCalls.zpt', globals(),
566         __name__='manage_calls')
567
568     security.declareProtected(permissions.ViewManagementScreens,
569                               'ping')
570     def ping(self, REQUEST=None):
571         """make a ping request to the zasync client to see if it replies
572         in the heartbeat method."""
573         self._last_ping = datetime.datetime.now()
574         self._last_pong = None
575         if REQUEST is not None:
576             REQUEST.RESPONSE.redirect(
577                 '%s/manage_overview' % self.absolute_url())
578
579     security.declareProtected(permissions.ViewManagementScreens,
580                               'getLastPing')
581     def getLastPing(self):
582         return self._last_ping
583
584     security.declareProtected(permissions.ViewManagementScreens,
585                               'getLastPong')
586     def getLastPong(self):
587         return self._last_pong
588
589     # property code copied from CMFCore/utils.py SimpleItemWithProperties >>>
590     security.declarePrivate('manage_addProperty')
591     security.declarePrivate('manage_delProperties')
592     security.declarePrivate('manage_changePropertyTypes')
593
594     def manage_propertiesForm(self, REQUEST, *args, **kw):
595         """ An override that makes the schema fixed.
596         """
597         my_kw = kw.copy()
598         my_kw['property_extensible_schema__'] = 0
599         form = PropertyManager.manage_propertiesForm.__of__(self)
600         return form(self, REQUEST, *args, **my_kw)
601
602     # <<< end copy from CMF
603
604     security.declarePrivate('resolve')
605     def resolve(self, deferred):
606         try:
607             del self._accepted[deferred.key]
608         except KeyError:
609             pass # presumably resolved before
610         else:
611             self._resolved[deferred.key] = deferred
612             if self._next_rotate is None and self.rotation_period:
613                 self._next_rotate = (
614                     datetime.datetime.now() +
615                     datetime.timedelta(seconds=self.rotation_period))
616
617     security.declarePublic('getNextCacheRotation')
618     def getNextCacheRotation(self):
619         return self._next_rotate
620
621     security.declareProtected(permissions.ViewManagementScreens,
622                               'resetNextCacheRotation')
623     def resetNextCacheRotation(self, clear=False):
624         if clear:
625             res = self._next_rotate = None
626         else:
627             res = self._next_rotate = (
628                 datetime.datetime.now() +
629                 datetime.timedelta(seconds=self.rotation_period))
630         return res
631
632     security.declarePrivate('setPlugins')
633     def setPlugins(self, plugins): # protect plugins by groups or perms?
634         "set item list, name: description"
635         # XXX it would be good to include a signature validation function
636         # XXX it would be good to be able to specify if some plugins are
637         # not available to session calls--although that needs to be more
638         # general; session call approach may need to be adjusted.  See XXX in
639         # getSessionDeferred.
640         self.__plugins = tuple(plugins)
641
642     security.declareProtected(permissions.View, 'listPlugins')
643     def listPlugins(self):
644         """show plugin names available.  key is name, value is description.
645         If no plugins, no engine should be expected (i.e., the tool will
646         not work)."""
647         return dict(self.__plugins)
648
649     security.declarePrivate('_putCall')
650     def _putCall(self, _id_prefix, _plugin, *args, **kwargs):
651         if _plugin not in self.listPlugins(): # keys of dict
652             raise ValueError(
653                 "Requested plugin is not registered")
654         while 1:
655             randomizer = key = random.randint(-sys.maxint-1, sys.maxint)
656             if _id_prefix is not None:
657                 key = _id_prefix + (randomizer,)
658             if self.getDeferred(key) is None:
659                 break
660         args = sanitize(args)
661         kwargs = sanitize(kwargs)
662         d = Deferred(_plugin, args, kwargs)
663         d.key = key
664         d.id = repr(key)
665         d.local_key = randomizer
666         self._new.append(d)
667         wrapped = d.__of__(self)
668         wrapped.manage_fixupOwnershipAfterAdd()
669         user=getSecurityManager().getUser()
670         if user is not None:
671             userid=user.getId()
672             if userid is not None:
673                 wrapped.manage_setLocalRoles(userid, ['Owner'])
674         return wrapped
675
676     security.declareProtected(
677         permissions.MakeAsynchronousApplicationCalls, 'putCall')
678     def putCall(self, _plugin, *args, **kwargs):
679         return self._putCall(
680             None, _plugin, *args, **kwargs)
681
682     security.declareProtected(
683         permissions.MakeAsynchronousSessionCalls, 'putSessionCall')
684     def putSessionCall(self, _plugin, *args, **kwargs):
685         return self._putCall(
686             (self._getBrowserId(),), _plugin, *args, **kwargs)
687
688     security.declareProtected(
689         permissions.MakeAsynchronousApplicationCalls, 'getDeferred')
690     def getDeferred(self, d_id, default=None):
691         for src in self._new:
692             if src.key == d_id:
693                 return src.__of__(self)
694         for src in (self._accepted, self._resolved):
695             res = src.get(d_id)
696             if res is not None:
697                 return res.__of__(self)
698         return default
699
700     security.declareProtected(
701         permissions.MakeAsynchronousSessionCalls, 'getSessionDeferred')
702     def getSessionDeferred(self, d_id, default=None):
703         # XXX this API doesn't support the use case of users who have limited
704         # permissions but still should be able to store away keys for later
705         # look-up, irrespective of sessions.  We should either have three
706         # levels of deferred call or let plugins specify permissions--but then
707         # in what context?
708         bid = self._getBrowserId()
709         if isinstance(d_id, tuple):
710             if d_id[0] != bid:
711                 return default
712             d_id = d_id[-1]
713         return self.getDeferred((bid, d_id), default)
714
715     def __len__(self): # potentially expensive
716         return len(self._new) + len(self._accepted) + len(self._resolved)
717
718     def __nonzero__(self):
719         return True
720
721     security.declareProtected(
722         permissions.ViewManagementScreens, 'listNewCalls')
723     def listNewCalls(self, sort='creation_date', reverse=True):
724         return getDeferredInfo(self, self._new, sort, reverse)
725
726     security.declareProtected(
727         permissions.ViewManagementScreens, 'listAcceptedCalls')
728     def listAcceptedCalls(self, sort='creation_date', reverse=True):
729         return getDeferredInfo(self, self._accepted, sort, reverse)
730
731     security.declareProtected(
732         permissions.ViewManagementScreens, 'listResolvedCalls')
733     def listResolvedCalls(self, bucket=None, sort='creation_date', reverse=True):
734         if bucket is None:
735             d = self._resolved
736         else:
737             d = self._resolved.buckets[bucket]
738         return getDeferredInfo(self, d, sort, reverse)
739
740     security.declareProtected(
741         permissions.ViewManagementScreens, 'lenNewCalls')
742     def lenNewCalls(self):
743         return len(self._new)
744
745     security.declareProtected(
746         permissions.ViewManagementScreens, 'lenAcceptedCalls')
747     def lenAcceptedCalls(self):
748         return len(self._accepted)
749
750     security.declareProtected(
751         permissions.ViewManagementScreens, 'lenResolvedCalls')
752     def lenResolvedCalls(self, bucket=None):
753         if bucket is None:
754             d = self._resolved
755         else:
756             d = self._resolved.buckets[bucket]
757         return len(d)
758
759     security.declareProtected(
760         permissions.ViewManagementScreens, 'lenResolvedBuckets')
761     def lenResolvedBuckets(self):
762         return len(self._resolved.buckets)
763
764     security.declareProtected(
765         permissions.ViewManagementScreens, 'nextResolvedBucketRotation')
766     def nextResolvedBucketRotation(self):
767         return self._next_rotate
768
769     security.declarePrivate('acceptAll')
770     def acceptAll(self):
771         new = self._new
772         for d in new:
773             self._accepted[d.key] = d
774         res = []
775         if new:
776             res = new[:]
777         while new:
778             new.pop(0)
779         return res
780
781     security.declarePrivate('getAcceptedCalls')
782     def getAcceptedCalls(self):
783         return self._accepted.values()
784
785     security.declarePrivate('heartbeat')
786     def heartbeat(self):
787         next_rotate = self._next_rotate
788         now = datetime.datetime.now()
789         if next_rotate is not None and next_rotate < now:
790             self._resolved.rotateBucket()
791             if self._resolved: # i.e., not empty; len is expensive!
792                 self._next_rotate = (
793                     now +
794                     datetime.timedelta(seconds=self.rotation_period))
795             else:
796                 self._next_rotate = None
797         if self._last_ping is not None and self._last_pong is None:
798             self._last_pong = now
799 InitializeClass(AsynchronousCallManager)
800
801 constructAsynchronousCallManagerForm = PageTemplateFile(
802     'www/constructAsynchronousCallManagerForm.zpt', globals(),
803     __name__='manage_addSchedulerForm')
804
805 def constructAsynchronousCallManager(
806     dispatcher, id="asynchronous_call_manager",
807     poll_interval=None, rotation_period=None, RESPONSE=None):
808     """Construct an AsynchronousCallManager"""
809     acm = AsynchronousCallManager(id)
810     if poll_interval is not None:
811         acm.poll_interval = max(poll_interval, 2)
812     if rotation_period is not None:
813         acm.rotation_period = abs(rotation_period)
814     container = dispatcher.this()
815     container._setObject(id, acm)
816
817     if RESPONSE is not None:
818         RESPONSE.redirect(container.absolute_url() + '/manage_main')
Note: See TracBrowser for help on using the browser.