Python twisted: iterators and yields/inlineCallbacks?

You're right that you can't express what you want to express in `cacheiter`. The `inlineCallbacks` decorator won't let you have a function that returns an iterator. If you decorate a function with it, then the result is a function that always returns a `Deferred`. That's what it is for.

Part of what makes this difficult is that iterators don't work well with asynchronous code. If there's a `Deferred` involved in producing the elements of your iterator, then the elements that come out of your iterator are going to be `Deferred`s first. You might do something like this to account for that:

```python
@inlineCallbacks
def process_work():
    for element_deferred in some_jobs:
        element = yield element_deferred
        work_on(element)
```

This can work, but it looks particularly weird. Since generators can only yield to their caller (not, for example, to their caller's caller), the `some_jobs` iterator can't do anything about this; only code lexically within `process_work` can yield a `Deferred` to the `inlineCallbacks`-provided trampoline to wait on.

If you don't mind this pattern, then we could imagine your code being written something like:

```python
from twisted.internet.task import deferLater
from twisted.internet.defer import inlineCallbacks, returnValue
from twisted.internet import reactor

class cacheiter(object):
    def __init__(self, cached):
        self._cached = iter(cached.items())
        self._remaining = []

    def __iter__(self):
        return self

    @inlineCallbacks
    def next(self):
        # First re-fill the list of synchronously-producable values if it is empty.
        if not self._remaining:
            for name, value in self._cached:
                # Wait on this Deferred to determine if this cache item should be included.
                if (yield check_condition(name, value)):
                    # If so, put all of its values into the value cache so the next one
                    # can be returned immediately next time this method is called.
                    self._remaining.extend((name, k, v) for (k, v) in value.items())
        # Now actually give out a value, if there is one.
        if self._remaining:
            returnValue(self._remaining.pop())
        # Otherwise the entire cache has been visited and the iterator is complete.
        # Sadly we cannot signal completion with StopIteration, because the iterator
        # protocol isn't going to add an errback to this Deferred and check for
        # StopIteration.  So signal completion with a simple None value.
        returnValue(None)

@inlineCallbacks
def process_chunk(myiter, num):
    for i in xrange(num):
        nextval = yield myiter.next()
        if nextval is None:
            # The iterator signaled completion via the special None value.
            # Processing is complete.
            returnValue(True)
        # Otherwise process the value.
        yield some_processing(nextval)
    # Indicate there is more processing to be done.
    returnValue(False)

def sleep(sec):
    # Simple helper to delay asynchronously for some number of seconds.
    return deferLater(reactor, sec, lambda: None)

@inlineCallbacks
def process_loop(cached):
    myiter = cacheiter(cached)
    while True:
        # Loop processing 10 items from myiter at a time, until process_chunk
        # signals there are no values left.
        result = yield process_chunk(myiter, 10)
        if result:
            print 'All done'
            break
        print 'More left'
        # Insert the 5 second delay before starting on the next chunk.
        yield sleep(5)

d = process_loop(cached)
```

Another approach you might be able to take, though, is to use `twisted.internet.task.cooperate`. `cooperate` takes an iterator and consumes it, assuming that consuming it is potentially costly, and splitting up the job over multiple reactor iterations. Taking the definition of `cacheiter` from above:

```python
from twisted.internet.task import cooperate

def process_loop(cached):
    finished = []
    def process_one(value):
        if value is None:
            finished.append(True)
        else:
            return some_processing(value)
    myiter = cacheiter(cached)
    while not finished:
        value_deferred = myiter.next()
        value_deferred.addCallback(process_one)
        yield value_deferred

task = cooperate(process_loop(cached))
d = task.whenDone()
```
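The constraint that a generator can only yield to its immediate caller can be seen without Twisted at all. A minimal Python 3 sketch, where `inner`, `outer_wrong`, and `outer_right` are hypothetical names chosen for illustration:

```python
import types

def inner():
    # Stands in for an iterator whose elements are produced asynchronously.
    yield 1
    yield 2

def outer_wrong():
    # Yielding the sub-iterator itself hands the whole unconsumed iterator
    # to the caller -- analogous to an iterator handing out Deferreds.
    yield inner()

def outer_right():
    # Only code lexically inside this generator can pass each element on to
    # its caller, just as only code inside process_work can yield each
    # Deferred to the inlineCallbacks trampoline.
    for value in inner():
        yield value

print(isinstance(next(outer_wrong()), types.GeneratorType))  # True
print(list(outer_right()))  # [1, 2]
```

Python 3.3's `yield from` (and Twisted's later coroutine support) makes this delegation implicit, but with plain `yield`, as in the inlineCallbacks pattern above, it must be written out element by element.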


Try writing your iterator as a DeferredGenerator.

DeferredGenerator is just an older version of inlineCallbacks; the OP is basically doing this already. – Glyph May 12 at 16:24.

I think you're trying to do this:

```python
@inlineCallbacks
def cacheiter(cached):
    for cachename, cachedvalue in cached.items():
        result = yield some_deferred()  # some deferred you'd like evaluated
        if result is True:
            # here you want to return something, so you have to use returnValue;
            # the generator you want to return can be written as a generator expression
            gen = ((cachename, k, v) for k, v in cachedvalue.items())
            returnValue(gen)
```

When a genexp can't express what you're trying to return, you can write a closure:

```python
@inlineCallbacks
def cacheiter(cached):
    for cachename, cachedvalue in cached.items():
        result = yield some_deferred()
        if result is True:
            # define the generator, saving the current values of the cache
            def gen(cachedvalue=cachedvalue, cachename=cachename):
                for k, v in cachedvalue.items():
                    yield cachename, k, v
            returnValue(gen())  # return it
```

It's not wrong in this case, but you should be careful when advising people to write closures that are embedded in 'for' loops, especially in generators. If the outer 'cacheiter' were to actually proceed an additional iteration, 'cachedvalue' would change in 'gen'. – Glyph May 12 at 16:26

Glyph is right, I missed the outer for-loop, which can lead to problems with closures. It does not matter here because the function will only return a single generator. I fixed it anyway :-) – Jochen Ritzel May 12 at 16:57
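The pitfall discussed in these comments is ordinary Python closure late binding, and it can be demonstrated without Twisted. In this Python 3 sketch, `make_gens_late` and `make_gens_bound` are hypothetical helpers that mimic defining `gen` inside the for loop:

```python
def make_gens_late(cached):
    # Defines gen inside the loop WITHOUT binding the loop variables:
    # every closure shares the same 'name' and 'value' cells.
    gens = []
    for name, value in cached.items():
        def gen():
            for k, v in value.items():
                yield name, k, v
        gens.append(gen)
    return gens

def make_gens_bound(cached):
    # Default arguments capture the values current at definition time --
    # the same fix as gen(cachedvalue=cachedvalue, cachename=cachename).
    gens = []
    for name, value in cached.items():
        def gen(name=name, value=value):
            for k, v in value.items():
                yield name, k, v
        gens.append(gen)
    return gens

cached = {'a': {'x': 1}, 'b': {'y': 2}}
print([list(g()) for g in make_gens_late(cached)])
# [[('b', 'y', 2)], [('b', 'y', 2)]] -- both closures saw the final loop values
print([list(g()) for g in make_gens_bound(cached)])
# [[('a', 'x', 1)], [('b', 'y', 2)]] -- each kept its own iteration's values
```

In the answer above this never bites, because `returnValue` exits on the first matching iteration, so only one `gen` ever escapes the loop.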

