How can I split and pipe multiple NAudio streams?

I have a C# project that works with an input audio Stream from Kinect 1, Kinect 2, a microphone, or any other source.

waveIn.DataAvailable += (object sender, WaveInEventArgs e) => {
  lock(buffer){
    var pos = buffer.Position;
    buffer.Write(e.Buffer, 0, e.BytesRecorded);
    buffer.Position = pos;
  }
};

The buffer variable is a Stream from component A that will be processed by a SpeechRecognition component B working on Streams.

I will add new components C, D, E that also work on Streams, to compute pitch, detect sound, do fingerprinting, and so on.

How can I duplicate that Stream for components C, D, E?

  • Component A sends an event saying "I have a Stream, do what you want with it". I don't want to reverse the logic with a "give me your streams" event.

  • I'm looking for a "MultiStream" that could hand out Stream instances and handle the rest.

Component A

var buffer = new MultiStream();
...
SendMyEventWith(buffer);

Component B, C, D, E

public void HandleMyEvent(MultiStream buffer){
  var stream = buffer.GetNewStream();
  var engine = new EngineComponentB();
  engine.SetStream(stream);
}

  • Must the MultiStream itself be a Stream in order to wrap the Write() method (because Stream has no data-available mechanism)?
  • If Component B calls Dispose() on a Stream, should the MultiStream remove it from its array?
  • The MultiStream must throw an exception on Read() to force the use of GetNewStream().

EDIT: Kinect 1 provides a Stream itself ... :-( Should I use a Thread to pump it into the MultiStream?

Does anybody have that kind of MultiStream class?

Thanks

Orsino answered 3/7, 2015 at 14:6 Comment(0)

I'm not sure if this is the best way to do it or that it's better than the previous answer, and I'm not guaranteeing that this code is perfect, but I coded something that is literally what you asked for because it was fun - a MultiStream class.

You can find the code for the class here: http://pastie.org/10289142

Usage Example:

MultiStream ms = new MultiStream();

Stream copy1 = ms.CloneStream();
ms.Read( ... );

Stream copy2 = ms.CloneStream();
ms.Read( ... );

copy1 and copy2 will contain identical data after the example has run, and they will continue to get updated as the MultiStream is written to. You can read, update the position of, and dispose of the cloned streams individually. A cloned stream that is disposed gets removed from the MultiStream, and disposing of the MultiStream closes all related and cloned streams (you can change this if it's not the behavior you want). Trying to write to a cloned stream throws a NotSupportedException.
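
For readers who cannot open the link, here is a minimal sketch of a class with the behavior described above. It is not the linked code: the nested `Cursor` class, the `CursorCount` property and the `MemoryStream` backing store are assumptions made for illustration, and the parent is write-only here (its `Read()` throws), as the question asked.

```csharp
using System;
using System.Collections.Generic;
using System.IO;

// Hypothetical sketch: a write-only parent whose data is read through
// independent read-only cursors. Not the code behind the pastie link.
public class MultiStream : Stream
{
    private readonly object _sync = new object();
    private readonly MemoryStream _data = new MemoryStream(); // keeps everything written
    private readonly List<Stream> _cursors = new List<Stream>();

    // Each call returns a new reader that starts at position 0.
    public Stream CloneStream()
    {
        var cursor = new Cursor(this);
        lock (_sync) _cursors.Add(cursor);
        return cursor;
    }

    public int CursorCount { get { lock (_sync) return _cursors.Count; } }

    public override void Write(byte[] buffer, int offset, int count) { lock (_sync) _data.Write(buffer, offset, count); }

    // The parent itself is write-only; reading goes through CloneStream().
    public override int Read(byte[] buffer, int offset, int count) { throw new NotSupportedException(); }
    public override long Seek(long offset, SeekOrigin origin) { throw new NotSupportedException(); }
    public override void SetLength(long value) { throw new NotSupportedException(); }
    public override void Flush() { }
    public override bool CanRead { get { return false; } }
    public override bool CanSeek { get { return false; } }
    public override bool CanWrite { get { return true; } }
    public override long Length { get { lock (_sync) return _data.Length; } }
    public override long Position { get { return Length; } set { throw new NotSupportedException(); } }

    // Disposing the parent also closes every cursor that is still open.
    protected override void Dispose(bool disposing)
    {
        if (disposing)
        {
            Stream[] open;
            lock (_sync) open = _cursors.ToArray();
            foreach (var cursor in open) cursor.Dispose();
        }
        base.Dispose(disposing);
    }

    private sealed class Cursor : Stream
    {
        private readonly MultiStream _owner;
        private long _position;
        private bool _closed;

        public Cursor(MultiStream owner) { _owner = owner; }

        public override int Read(byte[] buffer, int offset, int count)
        {
            if (_closed) throw new ObjectDisposedException("Cursor");
            lock (_owner._sync)
            {
                long available = Math.Min(count, _owner._data.Length - _position);
                if (available <= 0) return 0; // caught up with the writer
                Buffer.BlockCopy(_owner._data.GetBuffer(), (int)_position, buffer, offset, (int)available);
                _position += available;
                return (int)available;
            }
        }

        public override long Seek(long offset, SeekOrigin origin)
        {
            long start = origin == SeekOrigin.Begin ? 0 : origin == SeekOrigin.Current ? _position : Length;
            if (start + offset < 0) throw new IOException("Cannot seek before the start.");
            return _position = start + offset;
        }

        public override long Position { get { return _position; } set { Seek(value, SeekOrigin.Begin); } }
        public override long Length { get { return _owner.Length; } }
        public override bool CanRead { get { return !_closed; } }
        public override bool CanSeek { get { return !_closed; } }
        public override bool CanWrite { get { return false; } }
        public override void Flush() { }
        public override void SetLength(long value) { throw new NotSupportedException(); }
        public override void Write(byte[] buffer, int offset, int count) { throw new NotSupportedException(); }

        // Disposing a cursor removes it from the parent's list.
        protected override void Dispose(bool disposing)
        {
            if (disposing && !_closed)
            {
                _closed = true;
                lock (_owner._sync) _owner._cursors.Remove(this);
            }
            base.Dispose(disposing);
        }
    }
}
```

Two caveats with this sketch: the backing store keeps every byte ever written, so memory grows for as long as the program runs, and a cursor's Read() returns 0 once it has caught up with the writer, which many consumers interpret as end of stream.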

Heathen answered 13/7, 2015 at 4:34 Comment(0)

Somehow I don't think streams really fit what you're trying to do. You're setting up a situation where a long run of the program is going to continually expand the data requirements for no apparent reason.

I'd suggest a pub/sub model that publishes the received audio data to subscribers, preferably using a multi-threaded approach to minimize the impact of a bad subscriber. Some ideas can be found here.

I've done this before with a processor class that implements IObserver<byte[]> and uses a Queue<byte[]> to store the sample blocks until the process thread is ready for them. Here are the base classes:

public abstract class BufferedObserver<T> : IObserver<T>, IDisposable
{
    private object _lck = new object();

    private IDisposable _subscription = null;
    public bool Subscribed { get { return _subscription != null; } }

    private bool _completed = false;
    public bool Completed { get { return _completed; } }

    protected readonly Queue<T> _queue = new Queue<T>();

    protected bool DataAvailable { get { lock(_lck) { return _queue.Any(); } } }
    protected int AvailableCount { get { lock (_lck) { return _queue.Count; } } }

    protected BufferedObserver()
    {
    }

    protected BufferedObserver(IObservable<T> observable)
    {
        SubscribeTo(observable);
    }

    public virtual void Dispose()
    {
        if (_subscription != null)
        {
            _subscription.Dispose();
            _subscription = null;
        }
    }

    public void SubscribeTo(IObservable<T> observable)
    {
        if (_subscription != null)
            _subscription.Dispose();
        _subscription = observable.Subscribe(this);
        _completed = false;
    }

    public virtual void OnCompleted()
    {
        _completed = true;
    }

    public virtual void OnError(Exception error)
    { }

    public virtual void OnNext(T value)
    {
        lock (_lck)
            _queue.Enqueue(value);
    }

    protected bool GetNext(ref T buffer)
    {
        lock (_lck)
        {
            if (!_queue.Any())
                return false;
            buffer = _queue.Dequeue();
            return true;
        }
    }

    protected T NextOrDefault()
    {
        T buffer = default(T);
        GetNext(ref buffer);
        return buffer;
    }
}

public abstract class Processor<T> : BufferedObserver<T>
{
    private object _lck = new object();
    private Thread _thread = null;

    private object _cancel_lck = new object();
    private bool _cancel_requested = false;
    private bool CancelRequested
    {
        get { lock(_cancel_lck) return _cancel_requested; }
        set { lock(_cancel_lck) _cancel_requested = value; }
    }

    public bool Running { get { return _thread == null ? false : _thread.IsAlive; } }
    public bool Finished { get { return _thread == null ? false : !_thread.IsAlive; } }

    protected Processor(IObservable<T> observable)
        : base(observable)
    { }

    public override void Dispose()
    {
        if (_thread != null && _thread.IsAlive)
        {
            CancelRequested = true; // ask the worker loop to exit before waiting for it
            _thread.Join(5000);
        }
        base.Dispose();
    }

    public bool Start()
    {
        if (_thread != null)
            return false;

        _thread = new Thread(threadfunc);
        _thread.Start();
        return true;
    }

    private void threadfunc()
    {
        // DataAvailable takes the queue lock; reading _queue directly here would not be thread-safe
        while (!CancelRequested && (!Completed || DataAvailable))
        {
            if (DataAvailable)
            {
                T data = NextOrDefault();
                if (data != null && !data.Equals(default(T)))
                    ProcessData(data);
            }
            else
                Thread.Sleep(10);
        }
    }

    // implement this in a sub-class to process the blocks
    protected abstract void ProcessData(T data);
}

This way you're only keeping the data as long as you need it, and you can attach as many process threads as you need to the same observable data source.
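
If polling with Thread.Sleep(10) is a concern, the same idea can be sketched with BlockingCollection<T>, which blocks the worker until a block arrives. This is a variation, not the classes above: `BlockingProcessor` and `ByteCounter` are hypothetical names, and the worker thread is started in the constructor to keep the example short.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical variant of Processor<T>: the queue itself wakes the worker,
// so no sleep/poll loop is needed.
public abstract class BlockingProcessor<T> : IObserver<T>, IDisposable
{
    private readonly BlockingCollection<T> _queue = new BlockingCollection<T>();
    private readonly Thread _thread;

    protected BlockingProcessor()
    {
        _thread = new Thread(Run) { IsBackground = true };
        _thread.Start();
    }

    public void OnNext(T value)
    {
        try { _queue.Add(value); }
        catch (InvalidOperationException) { /* already completed: drop the block */ }
    }

    public void OnCompleted() { _queue.CompleteAdding(); }
    public void OnError(Exception error) { _queue.CompleteAdding(); }

    private void Run()
    {
        // The enumeration ends once CompleteAdding was called and the queue is empty.
        foreach (T item in _queue.GetConsumingEnumerable())
            ProcessData(item);
    }

    // implement this in a sub-class to process the blocks
    protected abstract void ProcessData(T data);

    public void Dispose()
    {
        _queue.CompleteAdding();
        _thread.Join(5000);
    }
}

// Example subscriber: sums every byte it receives.
public class ByteCounter : BlockingProcessor<byte[]>
{
    public long Total;

    protected override void ProcessData(byte[] data)
    {
        foreach (byte b in data) Total += b;
    }
}
```

Each subscriber still owns its queue and its thread, so a slow subscriber only delays itself.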


And for the sake of completeness, here's a generic class that implements IObservable<T> so you can see how it all fits together. This one even has comments:

/// <summary>Generic IObservable implementation</summary>
/// <typeparam name="T">Type of messages being observed</typeparam>
public class Observable<T> : IObservable<T>
{
    /// <summary>Subscription class to manage unsubscription of observers.</summary>
    private class Subscription : IDisposable
    {
        /// <summary>Observer list that this subscription relates to</summary>
        public readonly ConcurrentBag<IObserver<T>> _observers;

        /// <summary>Observer to manage</summary>
        public readonly IObserver<T> _observer;

        /// <summary>Initialize subscription</summary>
        /// <param name="observers">List of subscribed observers to unsubscribe from</param>
        /// <param name="observer">Observer to manage</param>
        public Subscription(ConcurrentBag<IObserver<T>> observers, IObserver<T> observer)
        {
            _observers = observers;
            _observer = observer;
        }

        /// <summary>On disposal remove the subscriber from the subscription list</summary>
        public void Dispose()
        {
            IObserver<T> observer;
            if (_observers != null && _observers.Contains(_observer))
                _observers.TryTake(out observer);
        }
    }

    // list of subscribed observers
    private readonly ConcurrentBag<IObserver<T>> _observers = new ConcurrentBag<IObserver<T>>();

    /// <summary>Subscribe an observer to this observable</summary>
    /// <param name="observer">Observer instance to subscribe</param>
    /// <returns>A subscription object that unsubscribes on destruction</returns>
    /// <remarks>Always returns a subscription.  Ensure that previous subscriptions are disposed
    /// before re-subscribing.</remarks>
    public IDisposable Subscribe(IObserver<T> observer)
    {
        // only add observer if it doesn't already exist:
        if (!_observers.Contains(observer))
            _observers.Add(observer);

        // ...but always return a new subscription.
        return new Subscription(_observers, observer);
    }

    // delegate type for threaded invocation of IObserver.OnNext method
    private delegate void delNext(T value);

    /// <summary>Send <paramref name="data"/> to the OnNext methods of each subscriber</summary>
    /// <param name="data">Data object to send to subscribers</param>
    /// <remarks>Uses delegate.BeginInvoke to send out notifications asynchronously.</remarks>
    public void Notify(T data)
    {
        foreach (var observer in _observers)
        {
            delNext handler = observer.OnNext;
            handler.BeginInvoke(data, null, null);
        }
    }

    // delegate type for asynchronous invocation of IObserver.OnComplete method
    private delegate void delComplete();

    /// <summary>Notify all subscribers that the observable has completed</summary>
    /// <remarks>Uses delegate.BeginInvoke to send out notifications asynchronously.</remarks>
    public void NotifyComplete()
    {
        foreach (var observer in _observers)
        {
            delComplete handler = observer.OnCompleted;
            handler.BeginInvoke(null, null);
        }
    }
}
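
One caveat about the class above: ConcurrentBag<T>.TryTake removes whichever item the bag hands back, not a particular one, so disposing a subscription can unsubscribe a different observer. Also, delegate BeginInvoke only works on .NET Framework; on .NET Core and .NET 5+ it throws PlatformNotSupportedException. Below is a minimal sketch of a variant that avoids both. `ListObservable`, `Recorder` and `ObserverCount` are names made up for this example, and it notifies synchronously, which should be cheap when OnNext only enqueues (as in BufferedObserver) and keeps the blocks in order.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical variant of Observable<T>: a locked List<T> lets Dispose()
// remove exactly the observer that subscribed.
public class ListObservable<T> : IObservable<T>
{
    private readonly object _sync = new object();
    private readonly List<IObserver<T>> _observers = new List<IObserver<T>>();

    private sealed class Subscription : IDisposable
    {
        private readonly ListObservable<T> _owner;
        private readonly IObserver<T> _observer;

        public Subscription(ListObservable<T> owner, IObserver<T> observer)
        {
            _owner = owner;
            _observer = observer;
        }

        public void Dispose()
        {
            lock (_owner._sync) _owner._observers.Remove(_observer);
        }
    }

    public int ObserverCount { get { lock (_sync) return _observers.Count; } }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        lock (_sync)
        {
            if (!_observers.Contains(observer))
                _observers.Add(observer);
        }
        return new Subscription(this, observer);
    }

    // Calls OnNext on a snapshot of the subscribers, in subscription order.
    // One failing subscriber does not stop delivery to the others.
    public void Notify(T data)
    {
        foreach (var observer in Snapshot())
        {
            try { observer.OnNext(data); }
            catch (Exception) { /* sketch only: real code would log this */ }
        }
    }

    public void NotifyComplete()
    {
        foreach (var observer in Snapshot())
            observer.OnCompleted();
    }

    private IObserver<T>[] Snapshot()
    {
        lock (_sync) return _observers.ToArray();
    }
}

// Tiny observer used only to demonstrate the class.
public class Recorder<T> : IObserver<T>
{
    public readonly List<T> Items = new List<T>();
    public void OnNext(T value) { Items.Add(value); }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}
```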

Now you can create an Observable<byte[]> to use as your transmitter for any Processor<byte[]> instances that are interested. Pull data blocks out of the input stream, audio reader, etc. and pass them to the Notify method. Just make sure that you clone the arrays beforehand, because every subscriber receives the same array reference and the source may reuse its buffer for the next block.
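
As a rough illustration of the "pull blocks and clone them" step, here is a small helper. `AudioPump`, `CopyBlock`, `Pump` and the 4096-byte default are hypothetical, chosen only for this sketch; `publish` stands in for the Notify method.

```csharp
using System;
using System.IO;

public static class AudioPump
{
    // Returns a private copy of the first `count` bytes of `buffer`.
    public static byte[] CopyBlock(byte[] buffer, int count)
    {
        var block = new byte[count];
        Buffer.BlockCopy(buffer, 0, block, 0, count);
        return block;
    }

    // Reads `source` until Read returns 0, handing each block to `publish`.
    public static void Pump(Stream source, Action<byte[]> publish, int blockSize = 4096)
    {
        var scratch = new byte[blockSize]; // reused between reads, hence the copy
        int read;
        while ((read = source.Read(scratch, 0, scratch.Length)) > 0)
            publish(CopyBlock(scratch, read));
    }
}
```

In the NAudio handler from the question this would amount to passing `AudioPump.CopyBlock(e.Buffer, e.BytesRecorded)` to Notify. For a source that only exposes a Stream, such as Kinect 1, Pump could run on its own thread so that a blocking Read does not stall anything else.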

Weathercock answered 6/7, 2015 at 15:14 Comment(3)
In a pub/sub observer, would I write data from one stream to another using a thread? That may fix the Kinect 1 issue, since it provides a Stream that I can pump into a memory stream. - Orsino
But it does not solve the MultiStream issue. On one side I have a Stream in component A, and on the other side I have readers B, C, D, E that each want to read the full stream. So it's not a lock issue but a "read cursor" issue that I'm trying to solve. - Orsino
In the Notify method of the IObservable<T> you post the received information to every subscriber, so every subscriber gets a full copy of the data to work on. If you need to retain a history of the data in each observer then the memory usage can blow out a little... it's a trade-off I guess. You could share a stream between different subscribers if you really wanted to, with a stream cursor object for each subscriber that handles the independent positioning they require. Would be fun to write... but give pub/sub a try. - Weathercock
