How to convert an IGroupedObservable to IGrouping?

I have an observable sequence of elements that have a char Key property, with values in the range from 'A' to 'E'. I want to group these elements based on this key. After grouping them I want the result to be an observable of groups, so that I can process each group separately. My problem is that I can't find a nice way to preserve the key of each group in the final observable. Here is an example of what I am trying to do:

var observable = Observable
    .Interval(TimeSpan.FromMilliseconds(100))
    .Take(42)
    .GroupBy(n => (char)(65 + n % 5))
    .Select(grouped => grouped.ToArray())
    .Merge();

observable.Subscribe(group =>
    Console.WriteLine($"Group: {String.Join(", ", group)}"));

Output:

Group: 0, 5, 10, 15, 20, 25, 30, 35, 40
Group: 1, 6, 11, 16, 21, 26, 31, 36, 41
Group: 2, 7, 12, 17, 22, 27, 32, 37
Group: 3, 8, 13, 18, 23, 28, 33, 38
Group: 4, 9, 14, 19, 24, 29, 34, 39

The groups are correct, but the keys ('A' - 'E') are lost. The type of the observable is IObservable<long[]>. What I would like it to be instead, is an IObservable<IGrouping<char, long>>. This way the group.Key would be available inside the final subscription code. But as far as I can see there is no built-in way to convert an IGroupedObservable (the result of the GroupBy operator) to an IGrouping. I can see the operators ToArray, ToList, ToLookup, ToDictionary etc, but not a ToGrouping operator. My question is, how can I implement this operator?

Here is my incomplete attempt to implement it:

public static IObservable<IGrouping<TKey, TSource>> ToGrouping<TKey, TSource>(
    this IGroupedObservable<TKey, TSource> source)
{
    return Observable.Create<IGrouping<TKey, TSource>>(observer =>
    {
        // What to do?
        return source.Subscribe();
    });
}

My intention is to use it in the original example instead of the ToArray, like this:

.Select(grouped => grouped.ToGrouping())
Anglocatholic answered 26/11, 2020 at 4:0 Comment(0)

This does most of what you want:

var observable = Observable
    .Interval(TimeSpan.FromMilliseconds(100))
    .Take(42)
    .GroupBy(n => (char)(65 + n % 5))
    .SelectMany(grouped => grouped.ToArray().Select(a => (key: grouped.Key, results: a)));

That's an IObservable<ValueTuple<TKey, TResult[]>>. If you want the IGrouping interface, you have to write a class that implements it, since I don't think there's one available for you:

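using System.Collections;
using System.Collections.Generic;
using System.Linq;

public static class Grouping
{
    // Factory method: lets the compiler infer the type arguments,
    // so callers don't have to spell them out
    public static Grouping<TKey, TResult> Create<TKey, TResult>(TKey key, IEnumerable<TResult> results)
    {
        return new Grouping<TKey, TResult>(key, results);
    }
}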

public class Grouping<TKey, TResult> : IGrouping<TKey, TResult>
{
    public Grouping(TKey key, IEnumerable<TResult> results)
    {
        this.Key = key;
        this.Results = results;
    }
    
    public TKey Key { get; }
    public IEnumerable<TResult> Results { get; }

    public IEnumerator<TResult> GetEnumerator()
    {
        return Results.GetEnumerator();
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return Results.GetEnumerator();
    }
}

then your observable becomes:

var o2 = Observable.Interval(TimeSpan.FromMilliseconds(100))
    .Take(42)
    .GroupBy(n => (char)(65 + n % 5))
    .SelectMany(grouped => grouped.ToArray().Select(a => Grouping.Create(grouped.Key, a)));
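For completeness, here is a minimal sketch of how the same idea could be packaged as the ToGrouping operator asked for in the question, so that the result is typed as IObservable<IGrouping<TKey, TSource>>. It assumes the System.Reactive package is referenced. The names GroupedObservableExtensions and ArrayGrouping are hypothetical; ArrayGrouping is just a compact stand-in for the Grouping class above, included so that the snippet compiles on its own. ToGrouping itself is not part of Rx.NET.

```csharp
using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;

public static class GroupedObservableExtensions
{
    // Hypothetical operator (name taken from the question, not from Rx.NET).
    // Emits a single IGrouping when the source group completes.
    public static IObservable<IGrouping<TKey, TSource>> ToGrouping<TKey, TSource>(
        this IGroupedObservable<TKey, TSource> source)
    {
        return source
            .ToArray() // one array, emitted when the group completes
            .Select(items => (IGrouping<TKey, TSource>)
                new ArrayGrouping<TKey, TSource>(source.Key, items));
    }

    // Hypothetical helper: an array of elements paired with the group's key.
    private sealed class ArrayGrouping<TKey, TElement> : IGrouping<TKey, TElement>
    {
        private readonly TElement[] _items;

        public ArrayGrouping(TKey key, TElement[] items)
        {
            Key = key;
            _items = items;
        }

        public TKey Key { get; }

        public IEnumerator<TElement> GetEnumerator() =>
            ((IEnumerable<TElement>)_items).GetEnumerator();

        IEnumerator IEnumerable.GetEnumerator() => _items.GetEnumerator();
    }
}

public static class Program
{
    public static void Main()
    {
        Observable
            .Interval(TimeSpan.FromMilliseconds(100))
            .Take(42)
            .GroupBy(n => (char)(65 + n % 5))
            .SelectMany(grouped => grouped.ToGrouping())
            // ForEachAsync returns a Task; Wait keeps the console app alive
            // until the sequence completes.
            .ForEachAsync(group =>
                Console.WriteLine($"Group {group.Key}: {string.Join(", ", group)}"))
            .Wait();
    }
}
```

Because the cast happens inside the operator, the subscriber only sees the IGrouping interface and can read group.Key directly.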
Preparator answered 26/11, 2020 at 15:32 Comment(0)

This seems to be what you want:

IObservable<(char Key, long[] Values)> observable =
    Observable
        .Interval(TimeSpan.FromMilliseconds(100))
        .Take(42)
        .GroupBy(n => (char)(65 + n % 5))
        .Select(grouped => new { Key = grouped.Key, Values = grouped.ToArray() })
        .SelectMany(x => x.Values, (k, v) => (Key: k.Key, Values: v));

observable.Subscribe(group =>
    Console.WriteLine($"Group {group.Key}: {String.Join(", ", group.Values)}"));

I get:

Group A: 0, 5, 10, 15, 20, 25, 30, 35, 40
Group B: 1, 6, 11, 16, 21, 26, 31, 36, 41
Group C: 2, 7, 12, 17, 22, 27, 32, 37
Group D: 3, 8, 13, 18, 23, 28, 33, 38
Group E: 4, 9, 14, 19, 24, 29, 34, 39
Shushubert answered 29/11, 2020 at 21:53 Comment(4)
Yeap, that does the job. But having to work with a generic ValueTuple<TKey, TSource[]> is less satisfying than working with the semantically meaningful IGrouping<TKey, TSource>! 😃 - Anglocatholic
@TheodorZoulias - OK... (insert confused emoji here). - Shushubert
To be honest I didn't ask this question because I had a real problem to solve. I asked it because solving this hypothetical problem in a nice way (the way asked in the title) was a bit challenging. 😃 - Anglocatholic
@TheodorZoulias - Fair enough, I didn't read the title too clearly. It should be an easy fix though. - Shushubert

I found a way to implement the ToGrouping operator without creating a custom class that implements the IGrouping interface. It is more succinct but less efficient than Shlomo's solution.

/// <summary>
/// Creates an observable sequence containing a single 'IGrouping' that has the same
/// key as the source 'IGroupedObservable', and contains all of its elements.
/// </summary>
public static IObservable<IGrouping<TKey, TSource>> ToGrouping<TKey, TSource>(
    this IGroupedObservable<TKey, TSource> source)
{
    return source
        .ToList()
        .Select(list => list.GroupBy(_ => source.Key).Single());
}

This implementation assumes that the TKey type does not implement equality in some crazy way, such as returning different hash codes for the same value or considering a value not equal to itself. If it does, the Single LINQ operator will throw an exception.
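To make this caveat concrete, here is a minimal sketch that uses plain LINQ only (no Rx). BrokenKey is a hypothetical key type, invented for illustration, whose Equals never returns true. The variable key stands in for source.Key, and list stands in for the result of ToList.

```csharp
using System;
using System.Linq;

// Hypothetical key type that considers no value equal to any other,
// not even to itself.
public sealed class BrokenKey
{
    public override bool Equals(object obj) => false;
    public override int GetHashCode() => 0;
}

public static class Program
{
    public static void Main()
    {
        var key = new BrokenKey();           // stands in for source.Key
        var list = new long[] { 0, 5, 10 };  // stands in for the ToList result

        // Same grouping expression as in ToGrouping. Every element ends up
        // in its own group, because the key never equals itself.
        var groups = list.GroupBy(_ => key).ToList();
        Console.WriteLine(groups.Count); // should print 3

        try
        {
            list.GroupBy(_ => key).Single();
        }
        catch (InvalidOperationException)
        {
            // Single requires exactly one group.
            Console.WriteLine("Single() threw InvalidOperationException");
        }
    }
}
```

With ordinary key types such as char, the grouping always yields exactly one group for a non-empty list, so Single succeeds.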

Anglocatholic answered 29/11, 2020 at 22:56 Comment(2)
I actually deliberately didn’t do this. IObservable<IGrouping<TKey, TResult>> implies that you’ll get each grouping once it is ready/complete. This solution gives all groupings only when they’re all ready. - Preparator
@Preparator I am not sure I understand. The return value of this answer's ToGrouping operator is an observable that emits a single IGrouping. It is emitted when the source IGroupedObservable completes. Could you elaborate on the differences between the two solutions? - Anglocatholic
