I'm trying to create an Observable where each item is produced by an asynchronous task. Each subsequent item should be produced by an async call on the previous item (co-recursion). In "Generate" parlance it would look something like the following, except that Generate supports neither async delegates nor a delegate for the initial state.
var ob = Observable.Generate(
    async () => await ProduceFirst(),      // Task<T> ProduceFirst()
    prev => Continue(prev),                // bool Continue(T)
    async prev => await ProduceNext(prev), // Task<T> ProduceNext(T)
    item => item
);
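A minimal sketch of what such an operator could look like, assuming System.Reactive and its Observable.Create overload that accepts an async subscribe delegate. GenerateAsync, ObservableEx and the parameter names are hypothetical and not part of Rx. This is an illustration of the desired shape, not a definitive implementation.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class ObservableEx
{
    // Hypothetical helper (not part of Rx): an async counterpart to Observable.Generate.
    public static IObservable<T> GenerateAsync<T>(
        Func<Task<T>> produceFirst,     // asynchronously produces the initial item
        Func<T, bool> shouldContinue,   // decides whether the current item is emitted
        Func<T, Task<T>> produceNext)   // asynchronously produces the next item from the previous one
    {
        return Observable.Create<T>(async (observer, cancellationToken) =>
        {
            var current = await produceFirst();

            // Stop when the condition fails or the subscription is disposed.
            while (!cancellationToken.IsCancellationRequested && shouldContinue(current))
            {
                observer.OnNext(current);
                current = await produceNext(current);
            }
            // When this delegate returns, Create signals OnCompleted;
            // an exception thrown here is forwarded as OnError.
        });
    }
}
```

With the methods from the question, usage would be along the lines of ObservableEx.GenerateAsync(ProduceFirst, Continue, ProduceNext). Each subscription restarts the sequence from ProduceFirst, since the delegate runs once per subscriber.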
As a more concrete example, to peek all messages from a ServiceBus queue by fetching them 100 messages at a time, implement ProduceFirst, Continue and ProduceNext as follows:
Task<IEnumerable<BrokeredMessage>> ProduceFirst()
{
    const int batchSize = 100;
    return _serviceBusReceiver.PeekBatchAsync(batchSize);
}
bool Continue(IEnumerable<BrokeredMessage> prev)
{
    return prev.Any();
}
async Task<IEnumerable<BrokeredMessage>> ProduceNext(IEnumerable<BrokeredMessage> prev)
{
    const int batchSize = 100;
    // Peek from the last sequence number already seen, then skip that message so it is not returned twice.
    return (await _serviceBusReceiver.PeekBatchAsync(prev.Last().SequenceNumber, batchSize + 1)).Skip(1);
}
Then call .SelectMany(i => i) on the IObservable<IEnumerable<BrokeredMessage>> to turn it into an IObservable<BrokeredMessage>.
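As a small illustration of that flattening step, here is a sketch that uses int batches in place of BrokeredMessage batches. It assumes System.Reactive; FlattenDemo and Run are hypothetical names used only for this example.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class FlattenDemo
{
    public static IList<int> Run()
    {
        // Stand-in for IObservable<IEnumerable<BrokeredMessage>>: two batches of ints.
        IObservable<IEnumerable<int>> batches = new IEnumerable<int>[]
        {
            new[] { 1, 2 },
            new[] { 3 },
        }.ToObservable();

        // SelectMany with an IEnumerable-returning selector flattens each batch into single items.
        IObservable<int> items = batches.SelectMany(batch => batch);

        // Collect everything into a list and block until the sequence completes.
        return items.ToList().Wait();
    }
}
```

The same call shape applies to the message batches: the selector simply returns each batch, and Rx emits its elements one by one.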
Here _serviceBusReceiver is an instance of the following interface:
public interface IServiceBusReceiver
{
    Task<IEnumerable<BrokeredMessage>> PeekBatchAsync(int batchSize);
    Task<IEnumerable<BrokeredMessage>> PeekBatchAsync(long fromSequenceNumber, int batchSize);
}
BrokeredMessage is the class documented at https://msdn.microsoft.com/en-us/library/microsoft.servicebus.messaging.brokeredmessage.aspx
Continue and ProduceNext methods do not compile. Also, you need to provide the signatures at least for your _serviceBusClient class and for BrokeredMessage. – Photosynthesis