Bandwidth throttling in C#

I am developing a program that continually sends a stream of data in the background, and I want to allow the user to set a cap on both upload and download speed.

I have read up on the token bucket and leaky bucket algorithms, and the latter seems to fit better, since the goal is not to maximize network bandwidth but to be as unobtrusive as possible.

I am, however, a bit unsure how to implement this. A natural approach is to extend the abstract Stream class so that existing traffic can simply be wrapped, but would this not require extra threads to send the data while simultaneously receiving (leaky bucket)? Any pointers to other implementations that do the same would be appreciated.

Also, although I can modify how much data the program receives, how well does bandwidth throttling work at the C# level? Will the computer still receive the data and simply save it, effectively canceling the throttling effect or will it wait until I ask to receive more?

EDIT: I am interested in throttling both incoming and outgoing data, where I have no control over the opposite end of the stream.

Biggs answered 16/12, 2008 at 11:21 Comment(0)

Based on @0xDEADBEEF's solution I created the following (testable) solution based on Rx schedulers:

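using System;
using System.IO;
using System.Reactive.Concurrency; // IScheduler, IStopwatch (System.Reactive NuGet package)
using System.Threading;

public class ThrottledStream : Stream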
{
    private readonly Stream parent;
    private readonly int maxBytesPerSecond;
    private readonly IScheduler scheduler;
    private readonly IStopwatch stopwatch;

    private long processed;

    public ThrottledStream(Stream parent, int maxBytesPerSecond, IScheduler scheduler)
    {
        this.maxBytesPerSecond = maxBytesPerSecond;
        this.parent = parent;
        this.scheduler = scheduler;
        stopwatch = scheduler.StartStopwatch();
        processed = 0;
    }

    public ThrottledStream(Stream parent, int maxBytesPerSecond)
        : this (parent, maxBytesPerSecond, Scheduler.Immediate)
    {
    }

    protected void Throttle(int bytes)
    {
        processed += bytes;
        var targetTime = TimeSpan.FromSeconds((double)processed / maxBytesPerSecond);
        var actualTime = stopwatch.Elapsed;
        var sleep = targetTime - actualTime;
        if (sleep > TimeSpan.Zero)
        {
            using (var waitHandle = new AutoResetEvent(initialState: false))
            {
                scheduler.Sleep(sleep).GetAwaiter().OnCompleted(() => waitHandle.Set());
                waitHandle.WaitOne();
            }
        }
    }

    public override bool CanRead
    {
        get { return parent.CanRead; }
    }

    public override bool CanSeek
    {
        get { return parent.CanSeek; }
    }

    public override bool CanWrite
    {
        get { return parent.CanWrite; }
    }

    public override void Flush()
    {
        parent.Flush();
    }

    public override long Length
    {
        get { return parent.Length; }
    }

    public override long Position
    {
        get
        {
            return parent.Position;
        }
        set
        {
            parent.Position = value;
        }
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        var read = parent.Read(buffer, offset, count);
        Throttle(read);
        return read;
    }

    public override long Seek(long offset, SeekOrigin origin)
    {
        return parent.Seek(offset, origin);
    }

    public override void SetLength(long value)
    {
        parent.SetLength(value);
    }

    public override void Write(byte[] buffer, int offset, int count)
    {
        Throttle(count);
        parent.Write(buffer, offset, count);
    }
}

and some tests that take only a few milliseconds to run:

[TestMethod]
public void ShouldThrottleReading()
{
    var content = Enumerable
        .Range(0, 1024 * 1024)
        .Select(_ => (byte)'a')
        .ToArray();
    var scheduler = new TestScheduler();
    var source = new ThrottledStream(new MemoryStream(content), content.Length / 8, scheduler);
    var target = new MemoryStream();

    var t = source.CopyToAsync(target);

    t.Wait(10).Should().BeFalse();
    scheduler.AdvanceTo(TimeSpan.FromSeconds(4).Ticks);
    t.Wait(10).Should().BeFalse();
    scheduler.AdvanceTo(TimeSpan.FromSeconds(8).Ticks - 1);
    t.Wait(10).Should().BeFalse();
    scheduler.AdvanceTo(TimeSpan.FromSeconds(8).Ticks);
    t.Wait(10).Should().BeTrue();
}

[TestMethod]
public void ShouldThrottleWriting()
{
    var content = Enumerable
        .Range(0, 1024 * 1024)
        .Select(_ => (byte)'a')
        .ToArray();
    var scheduler = new TestScheduler();
    var source = new MemoryStream(content);
    var target = new ThrottledStream(new MemoryStream(), content.Length / 8, scheduler);

    var t = source.CopyToAsync(target);

    t.Wait(10).Should().BeFalse();
    scheduler.AdvanceTo(TimeSpan.FromSeconds(4).Ticks);
    t.Wait(10).Should().BeFalse();
    scheduler.AdvanceTo(TimeSpan.FromSeconds(8).Ticks - 1);
    t.Wait(10).Should().BeFalse();
    scheduler.AdvanceTo(TimeSpan.FromSeconds(8).Ticks);
    t.Wait(10).Should().BeTrue();
}
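
The pacing arithmetic inside Throttle above can be looked at in isolation. The following is a minimal sketch, not part of the answer's class: the `Pacing` type and `DelayFor` method are hypothetical names introduced here for illustration, and the elapsed time is passed in as a plain `TimeSpan` instead of being read from an Rx stopwatch.

```csharp
using System;

// Hypothetical helper for illustration only; the names are not from the answer.
public static class Pacing
{
    // Returns how long to wait so that processedBytes, counted since the
    // stopwatch started, average out to at most maxBytesPerSecond.
    public static TimeSpan DelayFor(long processedBytes, int maxBytesPerSecond, TimeSpan elapsed)
    {
        if (maxBytesPerSecond <= 0)
            throw new ArgumentOutOfRangeException(nameof(maxBytesPerSecond));

        // Time at which this many bytes "should" have been reached.
        var target = TimeSpan.FromSeconds((double)processedBytes / maxBytesPerSecond);
        var delay = target - elapsed;

        // Behind schedule or exactly on time: no wait.
        return delay > TimeSpan.Zero ? delay : TimeSpan.Zero;
    }

    public static void Main()
    {
        // 300 bytes at 100 B/s should take 3 s; after 1 s elapsed, 2 s remain.
        Console.WriteLine(DelayFor(300, 100, TimeSpan.FromSeconds(1)));

        // Only 100 bytes after 5 s: already under the cap, so no wait.
        Console.WriteLine(DelayFor(100, 100, TimeSpan.FromSeconds(5)));
    }
}
```

One consequence of measuring from a single start time, as this sketch and the Throttle method above both do: after an idle period the stream is "behind schedule", so a burst can pass unthrottled until the long-run average catches up with the cap. Whether that matters depends on how strictly the limit has to hold over short intervals.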
Vraisemblance answered 9/2, 2017 at 10:2 Comment(2)
Works, thanks! Anyone who wants to use this needs to add Reactive Extensions via NuGet. - Deprive
NuGet detail: the scheduler and stopwatch come from the System.Reactive NuGet package: github.com/dotnet/reactive - Curtin

I came up with a different implementation of the ThrottledStream class mentioned by arul. My version uses a WaitHandle and a Timer with a 1 s interval:

public ThrottledStream(Stream parentStream, int maxBytesPerSecond=int.MaxValue) 
{
    MaxBytesPerSecond = maxBytesPerSecond;
    parent = parentStream;
    processed = 0;
    resettimer = new System.Timers.Timer();
    resettimer.Interval = 1000;
    resettimer.Elapsed += resettimer_Elapsed;
    resettimer.Start();         
}

protected void Throttle(int bytes)
{
    try
    {
        processed += bytes;
        if (processed >= maxBytesPerSecond)
            wh.WaitOne();
    }
    catch
    {
    }
}

private void resettimer_Elapsed(object sender, ElapsedEventArgs e)
{
    processed = 0;
    wh.Set();
}

Whenever the bandwidth limit is exceeded, the calling thread blocks until the next second begins. There is no need to calculate the optimal sleep duration.

Full Implementation:

public class ThrottledStream : Stream
{
    #region Properties

    private int maxBytesPerSecond;
    /// <summary>
    /// Number of Bytes that are allowed per second
    /// </summary>
    public int MaxBytesPerSecond
    {
        get { return maxBytesPerSecond; }
        set 
        {
            if (value < 1)
                throw new ArgumentException("MaxBytesPerSecond has to be >0");

            maxBytesPerSecond = value; 
        }
    }

    #endregion


    #region Private Members

    private int processed;
    System.Timers.Timer resettimer;
    AutoResetEvent wh = new AutoResetEvent(true);
    private Stream parent;

    #endregion

    /// <summary>
    /// Creates a new stream with a data bandwidth cap
    /// </summary>
    /// <param name="parentStream"></param>
    /// <param name="maxBytesPerSecond"></param>
    public ThrottledStream(Stream parentStream, int maxBytesPerSecond=int.MaxValue) 
    {
        MaxBytesPerSecond = maxBytesPerSecond;
        parent = parentStream;
        processed = 0;
        resettimer = new System.Timers.Timer();
        resettimer.Interval = 1000;
        resettimer.Elapsed += resettimer_Elapsed;
        resettimer.Start();         
    }

    protected void Throttle(int bytes)
    {
        try
        {
            processed += bytes;
            if (processed >= maxBytesPerSecond)
                wh.WaitOne();
        }
        catch
        {
        }
    }

    private void resettimer_Elapsed(object sender, ElapsedEventArgs e)
    {
        processed = 0;
        wh.Set();
    }

    #region Stream-Overrides

    public override void Close()
    {
        resettimer.Stop();
        resettimer.Close();
        base.Close();
    }
    protected override void Dispose(bool disposing)
    {
        resettimer.Dispose();
        base.Dispose(disposing);
    }

    public override bool CanRead
    {
        get { return parent.CanRead; }
    }

    public override bool CanSeek
    {
        get { return parent.CanSeek; }
    }

    public override bool CanWrite
    {
        get { return parent.CanWrite; }
    }

    public override void Flush()
    {
        parent.Flush();
    }

    public override long Length
    {
        get { return parent.Length; }
    }

    public override long Position
    {
        get
        {
            return parent.Position;
        }
        set
        {
            parent.Position = value;
        }
    }

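    public override int Read(byte[] buffer, int offset, int count)
    {
        // Throttle on the bytes actually read, not the requested count;
        // a partial read would otherwise be charged for the whole buffer.
        var read = parent.Read(buffer, offset, count);
        Throttle(read);
        return read;
    }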

    public override long Seek(long offset, SeekOrigin origin)
    {
        return parent.Seek(offset, origin);
    }

    public override void SetLength(long value)
    {
        parent.SetLength(value);
    }

    public override void Write(byte[] buffer, int offset, int count)
    {
        Throttle(count);
        parent.Write(buffer, offset, count);
    }

    #endregion


}
Constipate answered 5/4, 2015 at 11:18 Comment(2)
It becomes more accurate if, when the timer ticks, you subtract maxBytesPerSecond from processed instead of setting it to 0. - Vraisemblance
If Read throttles on count rather than on the number of bytes actually read, the stream runs slower than the limit. For example, when downloading from the internet with an 8 KiB buffer where each read returns only 1 KiB and the limit is 1 MiB/s, every read is charged 7 KiB too much, wh.WaitOne() is hit at the 128th read, and the real speed is about 128 KiB/s. The fix is: int read = parent.Read(buffer, offset, count); Throttle(read); return read; - Yhvh
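
The carry-over suggestion in the first comment above can be sketched as follows. This is a minimal illustration under stated assumptions, not the answer's code: `WindowBudget`, `Charge` and `Tick` are hypothetical names, and the lock is an addition made here because the timer's Elapsed handler normally runs on a different thread than the one calling Read or Write.

```csharp
using System;

// Hypothetical helper, not part of the answer's class: tracks bytes charged
// against a per-second budget and carries any overshoot into the next window.
public sealed class WindowBudget
{
    private readonly object gate = new object();
    private readonly int maxBytesPerSecond;
    private long processed;

    public WindowBudget(int maxBytesPerSecond)
    {
        if (maxBytesPerSecond < 1)
            throw new ArgumentOutOfRangeException(nameof(maxBytesPerSecond));
        this.maxBytesPerSecond = maxBytesPerSecond;
    }

    // Charge bytes; returns true when the caller should block until a later tick.
    public bool Charge(int bytes)
    {
        lock (gate)
        {
            processed += bytes;
            return processed >= maxBytesPerSecond;
        }
    }

    // Call once per second (for example from the timer's Elapsed handler).
    // Subtracts one second's allowance instead of resetting to zero and
    // returns true when waiters may be released.
    public bool Tick()
    {
        lock (gate)
        {
            processed = Math.Max(0, processed - maxBytesPerSecond);
            return processed < maxBytesPerSecond;
        }
    }
}

public static class Demo
{
    public static void Main()
    {
        var budget = new WindowBudget(1000);
        Console.WriteLine(budget.Charge(2500)); // over budget: caller should wait
        Console.WriteLine(budget.Tick());       // 1500 bytes still owed: keep waiting
        Console.WriteLine(budget.Tick());       // 500 bytes owed: below the cap, release
    }
}
```

With a reset to zero, the single 2500-byte write in this example would be forgiven after one tick, so the average rate over those seconds would exceed the cap. Carrying the remainder forward makes a large write pay for itself over several ticks.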
