- Timestamp:
- 02/15/08 01:03:22 (8 months ago)
- Files:
-
- vendor/zasync/branches/1.1-nux/bucketqueue.py (modified) (8 diffs)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
vendor/zasync/branches/1.1-nux/bucketqueue.py
r31431 r52565 16 16 17 17 class BucketQueue(Queue): 18 """A queue that only allows a single item from a given bucket to be 19 obtained at a time. Within a bucket, order is guaranteed. Between 18 """A queue that only allows a single item from a given bucket to be 19 obtained at a time. Within a bucket, order is guaranteed. Between 20 20 unblocked buckets, order is also honored. 21 21 """ … … 36 36 finally: 37 37 self.mutex.release() 38 38 39 39 def available(self, id=None): 40 """Return True if there is currently an item available for this 40 """Return True if there is currently an item available for this 41 41 particular thread""" 42 42 if id is None: … … 47 47 finally: 48 48 self.mutex.release() 49 49 50 50 def makeBucket(self, name, silent=False): 51 51 if name is None: … … 69 69 """ 70 70 if bucket is not None and self._bucketsema.get(bucket) is None: 71 raise ValueError("bucket does not exist; create with makeBucket", 71 raise ValueError("bucket does not exist; create with makeBucket", 72 72 bucket) 73 73 if block: … … 124 124 """ 125 125 return self.put(item, False, bucket=bucket) 126 126 127 127 def releaseBucketByThreadId(self, id): 128 128 return self._releaseBucket(id=id) 129 129 130 130 def releaseBucket(self, bucket=None): 131 131 """Release bucket. If bucket is None, release the bucket for this 132 132 thread, if any.""" 133 133 return self._releaseBucket(bucket) 134 134 135 135 def _releaseBucket(self, bucket=None, id=None): 136 136 self.mutex.acquire() … … 221 221 self.mutex.release() 222 222 return item 223 223 224 224 # Override if desired (but others are more useful, below). These are 225 225 # only called with the appropriate locks held. 226 226 227 227 def _findNext(self, current=None): 228 228 """return ix, bucket lock, and item of next open, where ix is the value 229 that _removeIx needs to remove the item from the queue. Raise 229 that _removeIx needs to remove the item from the queue. Raise 230 230 LookupError if no next item is available.""" 231 231 buckets = sets.Set() … … 239 239 buckets.add(bucket) 240 240 raise LookupError("No next value available") 241 241 242 242 def _primed(self, current=None): 243 243 try: … … 253 253 def _enumerateItems(self): 254 254 """enumerate ix, bucket name, and item of each item in the queue in 255 preferred order (defaults to FIFO). ix is whatever value that 255 preferred order (defaults to FIFO). ix is whatever value that 256 256 _removeIx needs to remove the item from the queue.""" 257 257 for ix, (item, bucket) in enumerate(self.queue): 258 258 yield ix, bucket, item 259 259 260 260 def _removeIx(self, ix): 261 261 """Given an ix of an item provided by _enumerateItems or _findNext, 262 262 remove the associated entry from the queue""" 263 263 del self.queue[ix] 264 265 # _get, from the original Queue implementation, is not used: see 264 265 # _get, from the original Queue implementation, is not used: see 266 266 # _findNext, _enumerateItems, and _removeIx for methods that perform 267 267 # elements of the original task of _get.
