Multi-threading with .Net HttpListener

5

29

I have a listener:

listener = new HttpListener();
listener.Prefixes.Add(@"http://+:8077/");
listener.Start();
listenerThread = new Thread(HandleRequests);
listenerThread.Start();

And I am handling requests:

private void HandleRequests()
{
    while (listener.IsListening)
    {
        var context = listener.BeginGetContext(new AsyncCallback(ListenerCallback), listener);
        context.AsyncWaitHandle.WaitOne();
    }
}

private void ListenerCallback(IAsyncResult ar)
{
    var listener = ar.AsyncState as HttpListener;

    var context = listener.EndGetContext(ar);

    //do some stuff
}

I would like to write void Stop() in such a way that:

  1. It blocks until all requests currently being handled have finished (i.e. it waits for all threads to "do some stuff").
  2. While it waits for the requests that have already started, it does not allow any new requests (i.e. ListenerCallback returns immediately at its beginning).
  3. After that, it calls listener.Stop() (so listener.IsListening becomes false).

How could it be written?

EDIT: What do you think about this solution? Is it safe?

public void Stop() 
{
    lock (this)
    {
        isStopping = true;
    }
    resetEvent.WaitOne(); //initially set to true
    listener.Stop();
}

private void ListenerCallback(IAsyncResult ar)
{
    lock (this)
    {
        if (isStopping)
            return;

        resetEvent.Reset();
        numberOfRequests++;
    }

    var listener = ar.AsyncState as HttpListener;

    var context = listener.EndGetContext(ar);

    //do some stuff

    lock (this)
    {
        if (--numberOfRequests == 0)
            resetEvent.Set();
    }
}
Egwan answered 12/1, 2011 at 17:40 Comment(0)
2

I have discussed the code in the EDIT part of my question and decided to accept it with some modifications:

public void Stop() 
{
    lock (locker)
    {
        isStopping = true;
    }
    resetEvent.WaitOne(); //initially set to true
    listener.Stop();
}

private void ListenerCallback(IAsyncResult ar)
{
    lock (locker) // locking on 'this' is a bad idea, but I had forgotten about that before
    {
        if (isStopping)
            return;

        resetEvent.Reset();
        numberOfRequests++;
    }

    try
    {
        var listener = ar.AsyncState as HttpListener;

        var context = listener.EndGetContext(ar);

        //do some stuff
    }
    finally // to make sure that the code below is always executed
    {
        lock (locker)
        {
            if (--numberOfRequests == 0)
                resetEvent.Set();
        }
    }
}
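
As an aside, the same bookkeeping (a stopping flag, a request counter, and an event) can also be sketched with CountdownEvent, which exists since .NET 4. This is only a minimal sketch, not the code accepted above: RequestGate, TryEnter, Exit, and StopAndWait are hypothetical names introduced for illustration. The counter starts at 1 so that it cannot reach zero before Stop is requested.

```csharp
using System;
using System.Threading;

// Hypothetical helper (not from the original posts): tracks in-flight requests.
public sealed class RequestGate
{
    // Starts at 1; that extra count belongs to the gate itself and is
    // released by StopAndWait(), so the event cannot be set earlier.
    private readonly CountdownEvent _inFlight = new CountdownEvent(1);
    private int _stopping; // 0 = running, 1 = stop requested

    // Call at the top of the callback; returns false once shutdown has begun.
    public bool TryEnter()
    {
        if (Volatile.Read(ref _stopping) != 0)
            return false;
        // TryAddCount returns false if the count has already reached zero.
        return _inFlight.TryAddCount();
    }

    // Call in a finally block when a request that entered is done.
    public void Exit()
    {
        _inFlight.Signal();
    }

    // Rejects new requests, then blocks until every entered request has exited.
    public void StopAndWait()
    {
        // Release the gate's own count only once, even if called repeatedly.
        if (Interlocked.Exchange(ref _stopping, 1) == 0)
            _inFlight.Signal();
        _inFlight.Wait();
    }
}
```

In a callback this would be used as: return early if TryEnter() is false, otherwise do the work in a try block and call Exit() in finally. Stop() would call StopAndWait() and then listener.Stop().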
Egwan answered 14/1, 2011 at 7:11 Comment(0)
71

For completeness, here is what it would look like if you manage your own worker threads:

class HttpServer : IDisposable
{
    private readonly HttpListener _listener;
    private readonly Thread _listenerThread;
    private readonly Thread[] _workers;
    private readonly ManualResetEvent _stop, _ready;
    private Queue<HttpListenerContext> _queue;

    public HttpServer(int maxThreads)
    {
        _workers = new Thread[maxThreads];
        _queue = new Queue<HttpListenerContext>();
        _stop = new ManualResetEvent(false);
        _ready = new ManualResetEvent(false);
        _listener = new HttpListener();
        _listenerThread = new Thread(HandleRequests);
    }

    public void Start(int port)
    {
        _listener.Prefixes.Add(String.Format(@"http://+:{0}/", port));
        _listener.Start();
        _listenerThread.Start();

        for (int i = 0; i < _workers.Length; i++)
        {
            _workers[i] = new Thread(Worker);
            _workers[i].Start();
        }
    }

    public void Dispose()
    { Stop(); }

    public void Stop()
    {
        _stop.Set();
        _listenerThread.Join();
        foreach (Thread worker in _workers)
            worker.Join();
        _listener.Stop();
    }

    private void HandleRequests()
    {
        while (_listener.IsListening)
        {
            var context = _listener.BeginGetContext(ContextReady, null);

            if (0 == WaitHandle.WaitAny(new[] { _stop, context.AsyncWaitHandle }))
                return;
        }
    }

    private void ContextReady(IAsyncResult ar)
    {
        try
        {
            lock (_queue)
            {
                _queue.Enqueue(_listener.EndGetContext(ar));
                _ready.Set();
            }
        }
        catch { return; }
    }

    private void Worker()
    {
        WaitHandle[] wait = new[] { _ready, _stop };
        while (0 == WaitHandle.WaitAny(wait))
        {
            HttpListenerContext context;
            lock (_queue)
            {
                if (_queue.Count > 0)
                    context = _queue.Dequeue();
                else
                {
                    _ready.Reset();
                    continue;
                }
            }

            try { ProcessRequest(context); }
            catch (Exception e) { Console.Error.WriteLine(e); }
        }
    }

    public event Action<HttpListenerContext> ProcessRequest;
}
Torrent answered 12/1, 2011 at 19:37 Comment(6)
This is awesome - it serves as a great candidate to test HttpListener throughput against. - Smell
Thank you lots for that piece of code! There are two small issues: 1. ProcessRequest might be null 2. HttpListenerContext is not threadsafe unless it is static - Fraze
@MartinMeeser thanks for the comment. for 1. instead of wrapping it in try catch block we could use this ProcessRequest?.Invoke(context);. For 2. however if static is not an option what do you recommend? - Oversweet
@Oversweet there is no issue here with respect to #1 or #2 above. Move along. - Torrent
This was helpful. Before I changed _ready from a ManualResetEvent to an AutoResetEvent though, every Worker was getting inside the body of the while() loop when the event was .Set() even though only 1 of them would remove the item from the queue (correctly). After the change, 1 Worker was still able to take the item as desired, but the other threads didn't all try and execute the loop body (until the event was .Set() more times). That seemed better so I wonder if there is a good reason NOT to use it. Any problems, or since this was an older answer, did AutoResetEvent not exist then, or ..? - Achromat
@Achromat certainly the AutoResetEvent is a more common approach when releasing a thread from a waiting pool. It's just a personal preference, I'm just not a fan of them. - Torrent
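
The difference discussed in the last two comments can be seen in isolation. The following is a small sketch, not part of the answer: ResetEventDemo and CountReleased are hypothetical names. It starts several waiting threads, sets the event once, and counts how many threads were released. A ManualResetEvent stays signaled and releases every waiter, while an AutoResetEvent resets itself after releasing a single one.

```csharp
using System;
using System.Threading;

public static class ResetEventDemo
{
    // Starts `waiters` threads that wait on `signal`, sets it once, and
    // returns how many of them were released before their timeout expired.
    public static int CountReleased(EventWaitHandle signal, int waiters)
    {
        int released = 0;
        var threads = new Thread[waiters];
        for (int i = 0; i < waiters; i++)
        {
            threads[i] = new Thread(() =>
            {
                // WaitOne returns true if signaled, false on timeout.
                if (signal.WaitOne(TimeSpan.FromMilliseconds(500)))
                    Interlocked.Increment(ref released);
            });
            threads[i].Start();
        }

        signal.Set(); // a single Set() call
        foreach (Thread t in threads)
            t.Join();
        return released;
    }
}
```

With three waiters, CountReleased(new ManualResetEvent(false), 3) reports all three threads released, whereas CountReleased(new AutoResetEvent(false), 3) reports one. In the Worker loop above this only changes how many threads wake up to find the queue empty, not which thread gets the item.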
4

There are several ways to solve this. Here is a simple example that uses a semaphore to track ongoing work and an event that is signaled when all workers have finished. It should give you a basic idea to work from.

The solution below is not ideal: ideally we would acquire the semaphore before calling BeginGetContext. That makes shutdown more difficult, so I've chosen this simpler approach. If I were doing this for 'real', I'd probably write my own thread management rather than rely on the ThreadPool, which would allow a more dependable shutdown.

Anyway, here is the complete example:

class TestHttp
{
    static void Main()
    {
        using (HttpServer srvr = new HttpServer(5))
        {
            srvr.Start(8085);
            Console.WriteLine("Press [Enter] to quit.");
            Console.ReadLine();
        }
    }
}


class HttpServer : IDisposable
{
    private readonly int _maxThreads;
    private readonly HttpListener _listener;
    private readonly Thread _listenerThread;
    private readonly ManualResetEvent _stop, _idle;
    private readonly Semaphore _busy;

    public HttpServer(int maxThreads)
    {
        _maxThreads = maxThreads;
        _stop = new ManualResetEvent(false);
        _idle = new ManualResetEvent(false);
        _busy = new Semaphore(maxThreads, maxThreads);
        _listener = new HttpListener();
        _listenerThread = new Thread(HandleRequests);
    }

    public void Start(int port)
    {
        _listener.Prefixes.Add(String.Format(@"http://+:{0}/", port));
        _listener.Start();
        _listenerThread.Start();
    }

    public void Dispose()
    { Stop(); }

    public void Stop()
    {
        _stop.Set();
        _listenerThread.Join();
        _idle.Reset();

        // acquire and release the semaphore to see if anyone is running; wait for idle if they are.
        _busy.WaitOne();
        if(_maxThreads != 1 + _busy.Release())
            _idle.WaitOne();

        _listener.Stop();
    }

    private void HandleRequests()
    {
        while (_listener.IsListening)
        {
            var context = _listener.BeginGetContext(ListenerCallback, null);

            if (0 == WaitHandle.WaitAny(new[] { _stop, context.AsyncWaitHandle }))
                return;
        }
    }

    private void ListenerCallback(IAsyncResult ar)
    {
        _busy.WaitOne();
        try
        {
            HttpListenerContext context;
            try
            { context = _listener.EndGetContext(ar); }
            catch (HttpListenerException)
            { return; }

            if (_stop.WaitOne(0, false))
                return;

            Console.WriteLine("{0} {1}", context.Request.HttpMethod, context.Request.RawUrl);
            context.Response.SendChunked = true;
            using (TextWriter tw = new StreamWriter(context.Response.OutputStream))
            {
                tw.WriteLine("<html><body><h1>Hello World</h1>");
                for (int i = 0; i < 5; i++)
                {
                    tw.WriteLine("<p>{0} @ {1}</p>", i, DateTime.Now);
                    tw.Flush();
                    Thread.Sleep(1000);
                }
                tw.WriteLine("</body></html>");
            }
        }
        finally
        {
            if (_maxThreads == 1 + _busy.Release())
                _idle.Set();
        }
    }
}
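
The idle check in Stop() and in the finally block relies on the fact that Semaphore.Release() returns the count the semaphore had before the release. A minimal sketch of just that check, with hypothetical names (SemaphoreIdleCheck, IsIdle) that do not appear in the example above:

```csharp
using System;
using System.Threading;

public static class SemaphoreIdleCheck
{
    // Returns true when nobody else currently holds a slot of `busy`,
    // assuming `busy` was created with a maximum count of `maxThreads`.
    // Note that this blocks while every slot is taken.
    public static bool IsIdle(Semaphore busy, int maxThreads)
    {
        busy.WaitOne();                // take one slot ourselves
        int previous = busy.Release(); // count just before our release
        // If we were the only holder, the previous count was maxThreads - 1.
        return previous + 1 == maxThreads;
    }
}
```

For a semaphore created as new Semaphore(3, 3), IsIdle returns true while no slot is held, and false while another thread holds one.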
Torrent answered 12/1, 2011 at 18:48 Comment(0)
0

Simply calling listener.Stop() should do the trick. This will not terminate any connections that have already been established but will prevent any new connections.

Galatia answered 12/1, 2011 at 18:5 Comment(4)
This is not true. If you call listener.Stop() during execution of ListenerCallback you will get an exception eg. when calling EndGetContext or even later, when using output stream. I can catch the exceptions of course, but I would prefer not to. - Egwan
In my code I use a flag and don't refer to listener anymore after having called stop on it, but closing the listener does not close the already accepted connections, just the listener. - Galatia
I don't know what do you mean by saying "I use a flag". The problem is, that in ListenerCallback I am using listener and if another thread close it, while I am using it, I will end up with exceptions, that I mentioned. - Egwan
How about, in your Stop method, if you acquire your _busy semaphore and then call listener.Stop? That should allow any pending ListenerCallback call to finish before you destroy the listener object. - Galatia
0

This uses a BlockingCollection as a typed queue to service requests. It is usable as is, but to return real content you should derive a class from this one and override Response.

using System;
using System.Collections.Concurrent;
using System.Net;
using System.Text;
using System.Threading;

namespace Service
{
    class HttpServer : IDisposable
    {
        private HttpListener httpListener;
        private Thread listenerLoop;
        private Thread[] requestProcessors;
        private BlockingCollection<HttpListenerContext> messages;

        public HttpServer(int threadCount)
        {
            requestProcessors = new Thread[threadCount];
            messages = new BlockingCollection<HttpListenerContext>();
            httpListener = new HttpListener();
        }

        public virtual int Port { get; set; } = 80;

        public virtual string[] Prefixes
        {
            get { return new string[] {string.Format(@"http://+:{0}/", Port )}; }
        }

        public void Start(int port)
        {
            Port = port; // use the requested port; the argument was previously ignored
            listenerLoop = new Thread(HandleRequests);

            foreach( string prefix in Prefixes ) httpListener.Prefixes.Add( prefix );

            listenerLoop.Start();

            for (int i = 0; i < requestProcessors.Length; i++)
            {
                requestProcessors[i] = StartProcessor(i, messages);
            }
        }

        public void Dispose() { Stop(); }

        public void Stop()
        {
            messages.CompleteAdding();

            foreach (Thread worker in requestProcessors) worker.Join();

            httpListener.Stop();
            listenerLoop.Join();
        }

        private void HandleRequests()
        {
            httpListener.Start();
            try 
            {
                while (httpListener.IsListening)
                {
                    Console.WriteLine("The Listener Is Listening!");
                    HttpListenerContext context = httpListener.GetContext();

                    messages.Add(context);
                    Console.WriteLine("The Listener has added a message!");
                }
            }
            catch(Exception e)
            {
                Console.WriteLine (e.Message);
            }
        }

        private Thread StartProcessor(int number, BlockingCollection<HttpListenerContext> messages)
        {
            Thread thread = new Thread(() => Processor(number, messages));
            thread.Start();
            return thread;
        }

        private void Processor(int number, BlockingCollection<HttpListenerContext> messages)
        {
            Console.WriteLine ("Processor {0} started.", number);
            try
            {
                for (;;)
                {
                    Console.WriteLine ("Processor {0} awoken.", number);
                    HttpListenerContext context = messages.Take();
                    Console.WriteLine ("Processor {0} dequeued message.", number);
                    Response (context);
                }
            } catch { }

            Console.WriteLine ("Processor {0} terminated.", number);
        }

        public virtual void Response(HttpListenerContext context)
        {
            SendReply(context, new StringBuilder("<html><head><title>NULL</title></head><body>This site not yet implemented.</body></html>") );
        }

        public static void SendReply(HttpListenerContext context, StringBuilder responseString )
        {
            byte[] buffer = System.Text.Encoding.UTF8.GetBytes(responseString.ToString());
            context.Response.ContentLength64 = buffer.Length;
            System.IO.Stream output = context.Response.OutputStream;
            output.Write(buffer, 0, buffer.Length);
            output.Close();
        }
    }
}

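This is a sample of how to use it. QuizzServer is not shown here; it is presumably a class derived from HttpServer that overrides Response. There is no need for events or lock blocks, because the BlockingCollection handles the synchronization.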

using System;
using System.Collections.Concurrent;
using System.IO;
using System.Net;
using System.Text;
using System.Threading;

namespace Service
{
  class Server
  {
    public static void Main (string[] args)
    {
        HttpServer Service = new QuizzServer (8);
        Service.Start (80);
        for (bool coninute = true; coninute ;)
        {
            string input = Console.ReadLine ().ToLower();
            switch (input)
            {
                case "stop":
                    Console.WriteLine ("Stop command accepted.");
                    Service.Stop ();
                    coninute = false;
                    break;
                default:
                    Console.WriteLine ("Unknown Command: '{0}'.",input);
                    break;
            }
        }
    }
  }
}
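
The HttpServer class above ends its workers by letting Take() throw after CompleteAdding() and swallowing the exception. An alternative, shown here only as a sketch under assumptions, is to loop over GetConsumingEnumerable(), which ends normally once adding is complete and the queue is drained. WorkerPool, TryEnqueue, and the handler parameter are hypothetical names, and T stands in for HttpListenerContext so the sketch runs without a listener.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical generic worker pool; T stands in for HttpListenerContext.
public sealed class WorkerPool<T>
{
    private readonly BlockingCollection<T> _items = new BlockingCollection<T>();
    private readonly Thread[] _workers;

    public WorkerPool(int threadCount, Action<T> handler)
    {
        _workers = new Thread[threadCount];
        for (int i = 0; i < threadCount; i++)
        {
            _workers[i] = new Thread(() =>
            {
                // Blocks while the queue is empty; the loop ends once
                // CompleteAdding() was called and the queue is drained.
                foreach (T item in _items.GetConsumingEnumerable())
                {
                    // A failing handler is logged and does not end the worker.
                    try { handler(item); }
                    catch (Exception e) { Console.Error.WriteLine(e); }
                }
            });
            _workers[i].Start();
        }
    }

    // Returns false instead of throwing once shutdown has started.
    public bool TryEnqueue(T item)
    {
        try { _items.Add(item); return true; }
        catch (InvalidOperationException) { return false; }
    }

    // Stops accepting items, lets queued items finish, then joins the workers.
    public void Stop()
    {
        _items.CompleteAdding();
        foreach (Thread worker in _workers)
            worker.Join();
    }
}
```

Because the try/catch sits inside the loop, one request that throws is logged without terminating its worker thread, and items already queued when Stop() is called are still processed.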
Woodsman answered 27/5, 2017 at 23:43 Comment(0)
