TCP/IP IOCP received data sometimes corrupt - Visual C++ on Windows

I am writing a simple test IOCP client and server to ensure I am using the API correctly and that the data the client sends is received correctly by the server. I have included all the code for this question.

This is where I ran into some problems: the data within the receive buffers sometimes seems to be corrupted (corrupted in that chunks of data within a buffer can be out of order or missing). To be clear, this is data within individual receive buffers; I don’t mean out of order between multiple buffers because of thread scheduling issues. I previously posted a question related to this here. However, I have done more work in getting a proper code example, so I am posting a new question and will link to this. I am hoping others are able to run this code and experience the same weird behaviour.

The Test Code

The test app can run in two modes, client and server. Run the server and it starts listening; run the client and it connects to the server and, as soon as it has connected, starts throwing data at the server as fast as it will allow. The server then verifies the data within each buffer that is returned from GetQueuedCompletionStatus after calls to WSARecv. Each time a WSASend completes, I zero out the OVERLAPPED section of the structure and call WSASend again with the original buffer of data.

Each buffer of data the client sends is a sequence of bytes that increment one after the other up to a maximum specified. I don’t send the full range 0..255 in case that size fits neatly in multiples into packets and somehow hides the issue so in my example code bytes range from 0..250. For each send buffer that is constructed, I repeat that pattern numberOfGroups times.

This format should mean that I can have multiple WSARecv’s outstanding and then verify the data within the returned buffers completely independently from any other buffer, meaning no synchronization or reconstructing the order should be required. i.e. I can start at the first byte and verify that they increment one after another up to the max and then reset to 0. Once I have this test working with no issues, I can move onto something more sophisticated, ordering the received buffers and verifying more complex data.
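
As a rough, platform-neutral sketch of the pattern and the per-buffer check described above: the helper names `makePattern` and `firstMismatch` are illustrative only and do not appear in the full source, and the code assumes a C++11 compiler. It is meant to show the idea, not to replace `IOCPConnection::onRecv`.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical helper: build `groups` repetitions of the bytes 0..maxByte.
std::vector<std::uint8_t> makePattern(std::uint8_t maxByte, std::size_t groups)
{
    std::vector<std::uint8_t> data;
    data.reserve((static_cast<std::size_t>(maxByte) + 1) * groups);
    for (std::size_t g = 0; g < groups; ++g)
    {
        // An unsigned counter avoids wrap-around if maxByte is ever 255.
        for (unsigned value = 0; value <= maxByte; ++value)
        {
            data.push_back(static_cast<std::uint8_t>(value));
        }
    }
    return data;
}

// Hypothetical helper: returns the index of the first byte that breaks the
// cyclic sequence, or `length` if the whole slice is valid. The slice may
// start anywhere in the cycle, because a receive can begin mid-pattern.
std::size_t firstMismatch(const std::uint8_t *data, std::size_t length, std::uint8_t maxByte)
{
    assert(data != nullptr || length == 0);
    if (length == 0)
    {
        return 0; // an empty slice is trivially valid (0 == length)
    }
    if (data[0] > maxByte)
    {
        return 0; // the very first byte is already out of range
    }
    std::uint8_t expected = (data[0] == maxByte) ? 0 : static_cast<std::uint8_t>(data[0] + 1);
    for (std::size_t i = 1; i < length; ++i)
    {
        if (data[i] != expected)
        {
            return i;
        }
        expected = (expected == maxByte) ? 0 : static_cast<std::uint8_t>(expected + 1);
    }
    return length;
}
```

A buffer passes when `firstMismatch(data, length, 250) == length`. Because the check only compares each byte with its predecessor, it does not matter where in the stream the slice starts.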

You can specify on the command line how many simultaneous outstanding WSASend and WSARecv calls there can be. This problem seems to happen far more often when there are 2 or more outstanding WSARecv calls. With 1, it can run for quite some time before it occasionally detects a problem.

I’ve been testing on Windows 7 and using Visual Studio 2010 C++.

The number of simultaneous calls in both client and server seems to have an effect. Using 2 for both seems to produce corrupt data more often than some other combinations.

Sockets and IOCP seem to require quite a lot of boilerplate code just to get a very basic client and server app up and running. The actual code that does the receiving of buffers is only a few lines and involves calling WSARecv and handling the completed calls from GetQueuedCompletionStatus.

This code calls WSARecv

void IOCPConnection::postRecv(PTestOverlapped overlapped)
{
    DWORD numberOfBytesTransferred = 0;
    DWORD flags = 0;
    if (overlapped == nullptr)
    {
        overlapped = new TestOverlapped(receiveBufferSize);
        overlapped->connection = this;
    }
    else
    {
        overlapped->reset();
    }
    overlapped->operation = soRecv;
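    auto returnCode = WSARecv(socket, &(overlapped->buffer), 1, &numberOfBytesTransferred, &flags, (LPWSAOVERLAPPED) overlapped, nullptr);
    if (returnCode == SOCKET_ERROR)
    {
        int errorCode = WSAGetLastError();
        if (errorCode != WSA_IO_PENDING) // WSA_IO_PENDING just means the receive was queued
        {
            std::cerr << "Error calling WSARecv. Returned errorCode = " << errorCode << std::endl;
        }
    }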
}

When the WSARecv calls complete, they are handled by worker threads - I've removed lines not related to receiving data from this snippet

void IOCPWorker::execute()
{
    bool quit = false;
    DWORD numberOfBytesTransferred = 0;
    ULONG_PTR completionKey = NULL;
    PTestOverlapped overlapped = nullptr;
    while (!quit)
    {
        auto queueResult = GetQueuedCompletionStatus(manager->iocp, &numberOfBytesTransferred, &completionKey, (LPOVERLAPPED *)&overlapped, INFINITE);
        if (queueResult)
        {
            switch (overlapped->operation)
            {
                case soRecv:
                {
                    IOCPConnection *connection = overlapped->connection;
                    connection->onRecv(overlapped, numberOfBytesTransferred); // This method validates the received data

                    connection->postRecv(overlapped);
                    overlapped = nullptr;
                    break;
                }
                default:;
            }
        }
    }
}

The call to connection->onRecv is where I validate the data. Does anything look obviously wrong here?

I've included the complete code for reference, and it should compile if you're feeling adventurous.


Full Source for Reference

Server example, listening on port 3000 with at most 2 outstanding WSARecv calls

> IOCPTest.exe server 3000 2

Client example, connecting to 127.0.0.1 on port 3000 with at most 2 outstanding WSASend calls

> IOCPTest.exe client 127.0.0.1 3000 2

The program consists of a small number of classes

IOCPConnectionManager

This class handles listening for connections and also starts up the worker threads.

IOCPConnection

Just keeps track of the SOCKET and provides a few methods for handling asynchronous calls. IOCPConnection::onRecv is called when a WSARecv completes and then verifies the data within the buffer. It just prints a message and returns if the data is found to be out of sequence.

IOCPWorker

Worker thread. IOCPWorker::execute() is where GetQueuedCompletionStatus is called.

TestOverlapped

The required OVERLAPPED structure.

You'll need to include Ws2_32.lib and Mswsock.lib for the linker as well.

Main cpp file

/************************************************************************
*                                                                       *
*  Test IOCP Client and Server - David Shaw                             *
*                                                                       *
*  There is limited error handling here and it assumes ideal conditions *
*  Some allocated objects are not freed at the end, this is a test only *
*                                                                       *
************************************************************************/

#include "stdafx.h"
#include <iostream>
#include <string>
#include "IOCPTest.h"
#include <Windows.h>

void printUse()
{
    std::cout << "Invalid arguments" << std::endl;
    std::cout << "This test app has very limited error handling or memory management" << std::endl;
    std::cout << "Run as client or server (run the server first) e.g." << std::endl << std::endl;
    std::cout << "To run as server listening on port 3000 with 2 pending receives:" << std::endl;
    std::cout << "> IOCPTester.exe server 3000 2" << std::endl << std::endl;
    std::cout << "To run as client connected to 127.0.0.1 on port 3000 with 2 pending sends:" << std::endl;
    std::cout << "> IOCPTester.exe client 127.0.0.1 3000 2" << std::endl << std::endl;
    std::cout << "Hit enter to exit" << std::endl;
    std::cin.ignore();
}

int main(int argc, char *argv[])
{
    if (argc < 4)
    {
        printUse();
        return 0;
    }
    std::string mode(argv[1]);
    if ((mode.compare("client") != 0) && (mode.compare("server") != 0))
    {
        printUse();
        return 0;
    }

    IOCPTest::IOCPConnectionManager *manager = new IOCPTest::IOCPConnectionManager();

    bool server = mode.compare("server") == 0;
    if (server)
    {
        std::string listenPort(argv[2]);
        std::string postedReceiveCount(argv[3]);

        manager->listenPort = atoi(listenPort.c_str());
        manager->postedReceiveCount = atoi(postedReceiveCount.c_str());
        manager->postedSendCount = 1; // Not really used in this mode
        manager->startListening();
    }
    else
    {
        if (argc < 5)
        {
            printUse();
            return 0;
        }

        std::string host(argv[2]);
        std::string port(argv[3]);
        std::string postedSendCount(argv[4]);

        manager->postedReceiveCount = 1; // Not really used in this mode
        manager->postedSendCount = atoi(postedSendCount.c_str());

        IOCPTest::IOCPConnection *connection = manager->createConnection();

        connection->host = host;
        connection->port = atoi(port.c_str());
        connection->connect();
    }
    std::cout << "Hit enter to exit" << std::endl;
    std::cin.ignore();
}

IOCPTest.h

/************************************************************************
*                                                                       *
*  Test IOCP Client and Server - David Shaw                             *
*                                                                       *
*  There is limited error handling here and it assumes ideal conditions *
*  std::cout might not be the best approach in a multithreaded          *
*  environment but this is just a simple test app.                      *
*  Some allocated objects are not cleaned up at the end either, but     *
*  again this is just a test.                                           *
*                                                                       *
************************************************************************/

#pragma once

#include <WinSock2.h> // Include before as otherwise Windows.h includes and causes issues
#include <Windows.h>
#include <string>

namespace IOCPTest
{

class IOCPConnection;

enum IOCPSocketOperation
{
    soUnknown,
    soAccept,
    soConnect,
    soDisconnect,
    soSend,
    soRecv,
    soQuit
};

struct TestOverlapped
{
    OVERLAPPED overlapped;
    WSABUF buffer;
    IOCPSocketOperation operation;
    IOCPConnection *connection;
    bool resend; // Set this to keep sending the same data over and over

    TestOverlapped(int bufferSize);
    ~TestOverlapped();
    void reset();
};

typedef TestOverlapped *PTestOverlapped;

class IOCPConnectionManager
{
public:
    static const int NUMACCEPTS = 5;

    WSADATA wsaData;
    HANDLE iocp;
    SOCKET listenSocket;
    USHORT listenPort;
    int postedReceiveCount;
    int postedSendCount;

    void startListening();
    void postAcceptEx();

    IOCPConnection *createConnection();

    IOCPConnectionManager();
};

class IOCPConnection
{
public:
    SOCKET socket;
    IOCPConnectionManager *manager;
    std::string host;
    USHORT port;

    void onAcceptEx(PTestOverlapped overlapped, DWORD numberOfBytesTransferred);
    void postRecv(PTestOverlapped overlapped = nullptr);
    void onRecv(PTestOverlapped overlapped, DWORD numberOfBytesTransferred);
    void onConnect(PTestOverlapped overlapped, DWORD numberOfBytesTransferred);
    void send(PTestOverlapped overlapped);
    void onSent(PTestOverlapped overlapped, DWORD numberOfBytesTransferred);

    void connect();
};

class IOCPWorker
{
public:
    HANDLE threadHandle;
    DWORD threadId;
    IOCPConnectionManager *manager;

    IOCPWorker(bool suspended);

    void start();
    void execute();
};

}

IOCPTest.cpp

#include "stdafx.h"
#include "IOCPTest.h"
#include <iostream>
#include <Mswsock.h>
#include <WS2tcpip.h>
#include <sstream>

namespace IOCPTest
{

LPFN_ACCEPTEX fnAcceptEx = nullptr;
LPFN_CONNECTEX fnConnectEx = nullptr;
GUID GuidAcceptEx = WSAID_ACCEPTEX;
GUID GuidConnectEx = WSAID_CONNECTEX;
const byte maxByteExpected = 250;
const int numberOfGroups = 4096;
const int receiveBufferSize = 0x100000;

BOOL AcceptEx
(
    SOCKET sListenSocket,
    SOCKET sAcceptSocket,
    PVOID lpOutputBuffer,
    DWORD dwReceiveDataLength,
    DWORD dwLocalAddressLength,
    DWORD dwRemoteAddressLength,
    LPDWORD lpdwBytesReceived,
    LPOVERLAPPED lpOverlapped
)
{
    if (fnAcceptEx == nullptr)
    {
        DWORD dwBytes;
        int result = WSAIoctl(sListenSocket, SIO_GET_EXTENSION_FUNCTION_POINTER, &GuidAcceptEx, sizeof (GuidAcceptEx), &fnAcceptEx, sizeof(fnAcceptEx), &dwBytes, NULL, NULL);
        if (result != 0)
        {
            std::cerr << "Error calling WSAIoctl for AcceptEx" << std::endl;
            return false;
        }
    }
    return fnAcceptEx(sListenSocket, sAcceptSocket, lpOutputBuffer, dwReceiveDataLength, dwLocalAddressLength, dwRemoteAddressLength, lpdwBytesReceived, lpOverlapped);
}

BOOL ConnectEx(
    SOCKET s,
    const struct sockaddr FAR *name,
    int namelen,
    PVOID lpSendBuffer,
    DWORD dwSendDataLength,
    LPDWORD lpdwBytesSent,
    LPOVERLAPPED lpOverlapped
)
{
    if (fnConnectEx == nullptr)
    {
        DWORD dwBytes;
        int result = WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, &GuidConnectEx, sizeof (GuidConnectEx), &fnConnectEx, sizeof(fnConnectEx), &dwBytes, NULL, NULL);
        if (result != 0)
        {
            std::cerr << "Error calling WSAIoctl for ConnectEx" << std::endl;
            return false;
        }
    }
    return fnConnectEx(s, name, namelen, lpSendBuffer, dwSendDataLength, lpdwBytesSent, lpOverlapped);
}

// TestOverlapped

TestOverlapped::TestOverlapped(int bufferSize):
    overlapped(), 
    operation(soUnknown),
    connection(nullptr),
    buffer(),
    resend(false)
{
    if (bufferSize > 0)
    {
        buffer.len = bufferSize;
        buffer.buf = (CHAR*) malloc(bufferSize);
    }
}

TestOverlapped::~TestOverlapped()
{
    if (buffer.buf != nullptr)
    {
        free(buffer.buf);
    }
}

void TestOverlapped::reset()
{
    overlapped = OVERLAPPED();
}

// IOCPConnectionManager

IOCPConnectionManager::IOCPConnectionManager():
    wsaData(),
    listenSocket(0),
    listenPort(0),
    postedReceiveCount(1)
{
    WSAStartup(WINSOCK_VERSION, &wsaData);
    iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);

    SYSTEM_INFO systemInfo = SYSTEM_INFO();
    GetSystemInfo(&systemInfo);

    for (decltype(systemInfo.dwNumberOfProcessors) i = 0; i < systemInfo.dwNumberOfProcessors; i++)
    {
        IOCPWorker* worker = new IOCPWorker(true);
        worker->manager = this;
        worker->start();
    }
}

void IOCPConnectionManager::startListening()
{
    listenSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
    CreateIoCompletionPort((HANDLE)listenSocket, iocp, ULONG_PTR(this), 0);

    sockaddr_in localAddress = sockaddr_in();
    localAddress.sin_family = AF_INET;
    localAddress.sin_addr.s_addr = INADDR_ANY; // Listen on all addresses
    localAddress.sin_port = htons(listenPort);

    if (bind(listenSocket, (SOCKADDR*) &localAddress, sizeof(localAddress)) == SOCKET_ERROR)
    {
        std::cerr << "Error in binding listening socket" << std::endl;
    }
    if (listen(listenSocket, SOMAXCONN) == 0)
    {
        std::cout << "Listening on port " << listenPort << std::endl;
    }
    for (int i = 0; i < NUMACCEPTS; i++)
    {
        postAcceptEx();
    }
}

void IOCPConnectionManager::postAcceptEx()
{
    SOCKET acceptSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);

    IOCPConnection *connection = new IOCPConnection();
    connection->manager = this;
    connection->socket = acceptSocket;

    CreateIoCompletionPort((HANDLE) acceptSocket, iocp, ULONG_PTR(connection), 0); // The thread count is ignored in this call when just associating the socket

    PTestOverlapped overlapped = new TestOverlapped(2 * (sizeof(sockaddr_in) + 16)); // As specified in documentation
    overlapped->operation = soAccept;
    overlapped->connection = connection;
    DWORD bytesReceived = 0;
    int result = IOCPTest::AcceptEx
    (
        listenSocket,
        acceptSocket,
        overlapped->buffer.buf,
        0, // Size of initial receiving buffer, excluding the space at the end for the two addresses
        sizeof(sockaddr_in) + 16, // Sizes as specified in the Winsock 2.2 API documentation
        sizeof(sockaddr_in) + 16, // Sizes as specified in the Winsock 2.2 API documentation
        &bytesReceived,
        (LPOVERLAPPED) overlapped
    );
    if (!result)
    {
        int errorCode = WSAGetLastError();
        if (errorCode != WSA_IO_PENDING)
        {
            std::cerr << "Error calling AcceptEx. Returned errorCode = " << errorCode << std::endl;
        }
    }
}

IOCPConnection *IOCPConnectionManager::createConnection()
{
    IOCPConnection *connection = new IOCPConnection();
    connection->manager = this;

    return connection;
}

// IOCPConnection

void IOCPConnection::onAcceptEx(PTestOverlapped overlapped, DWORD numberOfBytesTransferred)
{
    manager->postAcceptEx(); // Replace this accept
    auto returnCode = setsockopt(socket, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, (const char *)&manager->listenSocket, sizeof(manager->listenSocket));
    if (returnCode == SOCKET_ERROR)
    {
        std::cerr << "SetSockOpt in OnAcceptEx returned SOCKET_ERROR" << std::endl;
    }
    std::cout << "Connection Accepted" << std::endl;
    for (int i = 0; i < manager->postedReceiveCount; ++i)
    {
        postRecv();
    }
}

void IOCPConnection::postRecv(PTestOverlapped overlapped)
{
    DWORD numberOfBytesTransferred = 0;
    DWORD flags = 0;
    if (overlapped == nullptr)
    {
        overlapped = new TestOverlapped(receiveBufferSize);
        overlapped->connection = this;
    }
    else
    {
        overlapped->reset();
    }
    overlapped->operation = soRecv;
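    auto returnCode = WSARecv(socket, &(overlapped->buffer), 1, &numberOfBytesTransferred, &flags, (LPWSAOVERLAPPED) overlapped, nullptr);
    if (returnCode == SOCKET_ERROR)
    {
        int errorCode = WSAGetLastError();
        if (errorCode != WSA_IO_PENDING) // WSA_IO_PENDING just means the receive was queued
        {
            std::cerr << "Error calling WSARecv. Returned errorCode = " << errorCode << std::endl;
        }
    }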
}

void IOCPConnection::onRecv(PTestOverlapped overlapped, DWORD numberOfBytesTransferred)
{
    if (numberOfBytesTransferred > 0)
    {
        byte *data = (byte *)overlapped->buffer.buf;
        if (data[0] > maxByteExpected)
        {
            std::cerr << "Byte greater than max expected found. Max Expected: " << (int)maxByteExpected << "; Found: " << (int)data[0] << std::endl;
            return;
        }
        byte next = (data[0] == maxByteExpected)?0:data[0] + 1;
        for (decltype(numberOfBytesTransferred) i = 1; i < numberOfBytesTransferred; ++i)
        {
            if (data[i] != next)
            {
                // Not really the best solution for writing data out from multiple threads. Test app only.
                std::cerr << "Invalid data. Expected: " << (int)next << "; Got: " << (int)data[i] << std::endl;
                return;
            }
            else if (next == maxByteExpected)
            {
                next = 0;
            }
            else
            {
                ++next;
            }
        }
        //std::cout << "Valid buffer processed" << std::endl;
    }
}

void IOCPConnection::onConnect(PTestOverlapped overlapped, DWORD numberOfBytesTransferred)
{
    for (int i = 0; i < manager->postedSendCount; ++i)
    {
        // Construct a sequence of incremented byte values 0..maxByteExpected repeated numberOfGroups
        PTestOverlapped sendOverlapped = new TestOverlapped((maxByteExpected + 1) * numberOfGroups);
        sendOverlapped->connection = this;

        for (int j = 0; j < numberOfGroups; ++j)
        {
            for (byte k = 0; k <= maxByteExpected; ++k)
            {
                ((byte *)sendOverlapped->buffer.buf)[(j * (maxByteExpected + 1)) + (int)k] = k;
            }
        }
        sendOverlapped->resend = true; // Repeat sending this data
        send(sendOverlapped);
    }
}

void IOCPConnection::send(PTestOverlapped overlapped)
{
    overlapped->reset();
    overlapped->operation = soSend;

    DWORD bytesSent = 0;
    DWORD flags = 0;

    if (WSASend(socket, &overlapped->buffer, 1, &bytesSent, flags, (LPWSAOVERLAPPED) overlapped, nullptr) == SOCKET_ERROR)
    {
        int errorCode = WSAGetLastError();
        if (errorCode != WSA_IO_PENDING)
        {
            std::cerr << "Error calling WSASend. Returned errorCode = " << errorCode << std::endl;
        }
    }
}

void IOCPConnection::onSent(PTestOverlapped overlapped, DWORD numberOfBytesTransferred)
{
}

void IOCPConnection::connect()
{
    socket = ::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
    if (socket == INVALID_SOCKET)
    {
        std::cerr << "Error calling socket(AF_INET, SOCK_STREAM, IPPROTO_TCP) in IOCPConnection::connect()" << std::endl;
        return;
    }
    CreateIoCompletionPort((HANDLE)socket, manager->iocp, ULONG_PTR(this), 0); // The thread count is ignored in this call when just associating the socket

    sockaddr_in localAddress = sockaddr_in();
    localAddress.sin_family = AF_INET;
    localAddress.sin_addr.s_addr = INADDR_ANY;
    localAddress.sin_port = 0;

    if (bind(socket, (SOCKADDR *) &localAddress, sizeof(localAddress)) == SOCKET_ERROR)
    {
        std::cerr << "Error calling bind(socket, (SOCKADDR *) &localAddress, sizeof(localAddress) in IOCPConnection::connect()" << std::endl;
        return;
    }

    addrinfo hints = addrinfo();
    addrinfo *remoteAddress = nullptr;

    hints.ai_family = AF_INET;
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_protocol = IPPROTO_TCP;
    hints.ai_flags = AI_PASSIVE;

    std::stringstream ss;
    ss << port;
    //std::cout << ss.str() << std::endl;
    if (getaddrinfo(host.c_str(), ss.str().c_str(), &hints, &remoteAddress) != 0)
    {
        std::cerr << "Error calling getaddrinfo(host.c_str(), ss.str().c_str(), &hints, &remoteAddress) in IOCPConnection::connect()" << std::endl;
        return;
    }

    TestOverlapped *overlapped = new TestOverlapped(0);
    overlapped->connection = this;
    overlapped->operation = soConnect;

    BOOL result = IOCPTest::ConnectEx
    (
        socket,
        remoteAddress->ai_addr,
        remoteAddress->ai_addrlen,
        nullptr,
        0,
        nullptr,
        LPOVERLAPPED(overlapped)
    );
    if (result == FALSE)
    {
        int errorCode = WSAGetLastError();
        if (errorCode != WSA_IO_PENDING)
        {
            //std::cerr << "Error calling ConnectEx. You'll need to add some more code if you want to know why :)" << std::endl;
            std::cerr << "Error calling ConnectEx. Returned errorCode = " << errorCode << std::endl;
        }
    }

    freeaddrinfo(remoteAddress);
}

// IOCPWorker

DWORD WINAPI IOCPWorkerThreadProc(LPVOID lpParam)
{
    ((IOCPWorker*)lpParam)->execute();
    return 0;
}

IOCPWorker::IOCPWorker(bool suspended)
{
    threadHandle = CreateThread(NULL, 0, IOCPWorkerThreadProc, this, (suspended)?CREATE_SUSPENDED:0, &threadId);
}

void IOCPWorker::start()
{
    ResumeThread(threadHandle);
}

void IOCPWorker::execute()
{
    //std::cout << "TMVIOCPWorker::execute()" << std::endl;
    bool quit = false;
    DWORD numberOfBytesTransferred = 0;
    ULONG_PTR completionKey = NULL;
    PTestOverlapped overlapped = nullptr;
    while (!quit)
    {
        auto queueResult = GetQueuedCompletionStatus(manager->iocp, &numberOfBytesTransferred, &completionKey, (LPOVERLAPPED *)&overlapped, INFINITE);
        if (queueResult)
        {
            switch (overlapped->operation)
            {
                case soAccept:
                {
                    IOCPConnection *connection = overlapped->connection;
                    connection->onAcceptEx(overlapped, numberOfBytesTransferred);

                    delete overlapped;
                    overlapped = nullptr;
                    break;
                }
                case soConnect:
                {
                    std::cout << "ConnectEx returned" << std::endl;
                    IOCPConnection *connection = overlapped->connection;
                    connection->onConnect(overlapped, numberOfBytesTransferred); // This method builds the send buffers and starts sending
                    delete overlapped;
                    overlapped = nullptr;
                    break;
                }
                case soRecv:
                {
                    //std::cout << "Received Data: " << numberOfBytesTransferred << std::endl;
                    IOCPConnection *connection = overlapped->connection;
                    connection->onRecv(overlapped, numberOfBytesTransferred); // This method validates the received data

                    overlapped->reset();
                    connection->postRecv(overlapped);
                    overlapped = nullptr;
                    break;
                }
                case soSend:
                {
                    IOCPConnection *connection = overlapped->connection;
                    connection->onSent(overlapped, numberOfBytesTransferred);

                    // Send the same data over and over
                    std::cout << "Resending buffer" << std::endl;
                    if (overlapped->resend)
                    {
                        connection->send(overlapped);
                    }
                    else
                    {
                        delete overlapped;
                    }
                    overlapped = nullptr;
                    break;
                }
                default:;
            }
        }
    }
}

}

Most buffers received are correct; however, I still get a lot of output like this when running with 2 receive and 2 send buffers for the socket:

Invalid data. Expected: 169; Got: 123
Invalid data. Expected: 114; Got: 89
Invalid data. Expected: 89; Got: 156
Invalid data. Expected: 206; Got: 227
Invalid data. Expected: 125; Got: 54
Invalid data. Expected: 25; Got: 0
Invalid data. Expected: 58; Got: 146
Invalid data. Expected: 33; Got: 167
Invalid data. Expected: 212; Got: 233
Invalid data. Expected: 111; Got: 86
Invalid data. Expected: 86; Got: 153
Invalid data. Expected: 190; Got: 165
Invalid data. Expected: 175; Got: 150
Invalid data. Expected: 150; Got: 217
Invalid data. Expected: 91; Got: 112
Invalid data. Expected: 95; Got: 162
Invalid data. Expected: 207; Got: 182
Invalid data. Expected: 222; Got: 243
Invalid data. Expected: 126; Got: 101
Invalid data. Expected: 157; Got: 132
Invalid data. Expected: 160; Got: 89
Invalid data. Expected: 205; Got: 180
Invalid data. Expected: 113; Got: 134
Invalid data. Expected: 45; Got: 20
Invalid data. Expected: 113; Got: 201
Invalid data. Expected: 64; Got: 198
Invalid data. Expected: 115; Got: 182
Invalid data. Expected: 140; Got: 115
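
To tell isolated bad bytes apart from whole chunks landing in the wrong place, one option is to report every break in a buffer rather than only the first. The following is a hedged, platform-neutral sketch: `Discontinuity` and `findDiscontinuities` are hypothetical names that are not part of the test app, and the code assumes a C++11 compiler.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical record of one break in the cyclic 0..maxByte sequence.
struct Discontinuity
{
    std::size_t offset;    // index of the byte that broke the sequence
    std::uint8_t expected; // value the sequence called for
    std::uint8_t found;    // value actually present
};

// Scans the whole buffer instead of stopping at the first error. Each byte is
// compared only with its predecessor, so after a break the scan carries on
// from the byte it found. A single misplaced chunk then shows up as one or
// two entries rather than as an error on every following byte.
std::vector<Discontinuity> findDiscontinuities(const std::uint8_t *data, std::size_t length, std::uint8_t maxByte)
{
    assert(data != nullptr || length == 0);
    std::vector<Discontinuity> breaks;
    for (std::size_t i = 1; i < length; ++i)
    {
        const std::uint8_t previous = data[i - 1];
        const std::uint8_t expected = (previous >= maxByte) ? 0 : static_cast<std::uint8_t>(previous + 1);
        if (data[i] != expected)
        {
            breaks.push_back({i, expected, data[i]});
        }
    }
    return breaks;
}
```

If a buffer yields only a handful of entries and the bytes between them are in order, that points at whole chunks being placed out of order rather than at individual bytes being damaged.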

I hope it is just something simple I am doing wrong. I've run the same verification over the data buffer before sending as I do when receiving as well to make sure I hadn't done something silly there but it passes that check. I wrote a server in another language not using IOCP, and that seems to receive the data correctly. I also wrote a client in another language, and the IOCP server seems to detect corruptions in that case too. But that said, there might be issues with both the client and server. I appreciate any time that anyone is willing to spend on this.

Hindermost answered 6/1, 2015 at 16:1 Comment(7)
A "boiled-down" code should be shorter. Did you use a debugger; valgrind, etc.etc.?Sigurd
This question is by far too long. Clone your project and start deleting stuff until you can demo the problem in a few dozen lines. In the process you'll find the problem yourself.Fiann
Sorry, perhaps that was the wrong term to use. The code only includes enough functionality such that it can be compiled and have the client connect to the server and send a very specific block of data over and over. I guess I was thinking it was boiled down from the entire server application - what I have here can't do much useful and it's all the boilerplate code required by IOCP and sockets.Hindermost
I've used the built in debugger in Visual Studio but not valgrind. The main code handling the incoming data is in void IOCPConnection::postRecv(PTestOverlapped overlapped) that calls WSARecv and looking in void IOCPWorker::execute() shows the section waiting on GetQueuedCompletionStatus. I have not noticed any other memory issues other than the data in the returned buffer not always matching what is being sent.Hindermost
I've edited the question and split it into two parts. The first part separates out the code of interest that handles the receives. It is actually already only a few lines of code, the rest is boilerplate required to get IOCP and sockets up and going. I've left the full source for reference at the end if the two snippets don't shed enough light.Hindermost
I don't recommend having more than one WSASend pending at the same time for a single socket. Same for WSARecv.San
@DavidSchwartz Funny, as that's pretty much been my line to others in the past who have been working on IOCP. However, many sources suggest it can be beneficial, so I had been looking into ways to make resequencing incoming data efficient and wanted to test this out. Regardless, I wanted to know what the issue was here in case there was something else I was doing wrong, as it should be possible to have multiple pending sends or receives. I think the issue is resolved now; I'll add some comments to the answer that was added shortly, once I'm sure what I've changed is the solution.Hindermost

Okay, I may have found your problem. If you take a look at the data you receive, all the bytes are in order, but the sequence suddenly jumps, as if it was interrupted by another call. Now, from the MSDN documentation on WSASend and WSARecv:

If you are using I/O completion ports, be aware that the order of calls made to WSASend is also the order in which the buffers are populated. WSASend should not be called on the same socket simultaneously from different threads, because it can result in an unpredictable buffer order.

If you are using I/O completion ports, be aware that the order of calls made to WSARecv is also the order in which the buffers are populated. WSARecv should not be called on the same socket simultaneously from different threads, because it can result in an unpredictable buffer order.

That's it. I don't really know a good way to do what you want, but what you are doing is probably not the way the API is meant to be used.

Did you try this over a real network? The loopback interface is a special circuit and may behave differently, but it's still undefined behavior, so you shouldn't rely on this.
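
One possible reading of the quoted guidance is to make sure that only one thread at a time is inside the posting call for a given socket. The sketch below is an assumption-laden illustration, not a tested Winsock fix. `SerializedPoster` and `postInOrder` are hypothetical names, the callable `post` stands in for the real WSARecv or WSASend call so that the example stays platform-neutral, and `std::mutex` needs a C++11 standard library (a Win32 CRITICAL_SECTION per connection would play the same role).

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <mutex>

// Hypothetical per-socket wrapper: one instance per connection.
class SerializedPoster
{
public:
    // Runs `post` while holding the per-socket lock and hands it the sequence
    // number assigned to this call. Returns that sequence number.
    std::uint64_t postInOrder(const std::function<void(std::uint64_t)> &post)
    {
        assert(static_cast<bool>(post)); // an empty std::function would throw when invoked
        std::lock_guard<std::mutex> guard(lock_);
        const std::uint64_t sequence = nextSequence_++;
        // In a real server this is where the sequence number would be stored
        // alongside the OVERLAPPED data and the receive or send posted.
        post(sequence);
        return sequence;
    }

private:
    std::mutex lock_;                // serializes posting for this socket only
    std::uint64_t nextSequence_ = 0; // order in which calls were issued
};
```

Because the lock is per socket, worker threads servicing different connections do not contend with each other, and several operations can still be pending on the same socket at once. Only the act of posting is serialized.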

Capriole answered 6/1, 2015 at 18:32 Comment(16)
As a comment, your test code doesn't need Mswsock.lib.Capriole
Ah, with Mswsock.lib you are correct. The code I posted was different than it was originally and no longer requires it. I had also tested over a real network and that had issues too. Reading several sources that talk about using multiple pending WSASend and WSARecv calls per socket led me down the path of assuming that it's safe to call them simultaneously from multiple threads for the one socket, with the only issue being that without extra synchronization code, you can't tell what order the buffers were actually accepted in and so what order they were filled.Hindermost
As I just verify the contents of each buffer independently in my test, I thought "Well, I don't care what order they were filled, as long as they are a valid sequential chunk of the stream". I had originally misinterpreted that comment from MSDN you highlighted as it being ok if you were not worried about the order the buffers were filled. I now believe that assumption was incorrect.Hindermost
Your reply led me to reduce the worker count to 1 such that only one thread was making the calls, to test if this was the case. The corruption seemed to stop. I then changed the worker count back to match the CPU count and then created a critical section per connection to ensure that there is only ever one simultaneous call per socket to WSASend or WSARecv. I still have multiple pending WSASend and WSARecv's per socket though.Hindermost
In summary, it's not only the order of buffers that makes it unsafe, it's just not safe to call WSASend or WSARecv for the one socket simultaneously from different threads. The calls need to be serialized per socket. It's still valid to have multiple pending requests for the one socket and multiple threads can issue them, as long as the calls to the functions are synchronized. I assume it's safe to call both WSASend and WSARecv for the one socket simultaneously but that test will have to wait.Hindermost
Thank you very much for taking the time to look at this, this pointed me in the right direction and I am glad only such a minor change was required. I’ve had a client and server running full speed for many hours now with no detected corruption so far.Hindermost
@Hindermost About simultaneous Send/Recv, it should work. The usual pattern is to have a single thread to send (possibly fed from multiple threads by a queue), and another thread to receive (possibly dispatching data to multiple threads).Capriole
If you have multiple outstanding calls to WSARecv on a single socket and more than one thread processing the IOCP then you MUST synchronise the processing of the completions so that they are processed in the order that the WSARecvs were issued. One way to do this is to have a sequence number in the buffer. You can always allow multiple calls to WSASend from multiple threads as long as each send is a complete and distinct message and the order in which they arrive doesn't matter. I wrote about this here: codeproject.com/Articles/2610/…Winnipegosis
@LenHolgate Thank you, I was aware of that reordering approach and was looking at implementing something similar for the real system. The whole problem was not realising that calls to WSARecv need to be synchronized per socket EVEN if the order of buffers is not important and EVEN if there is no concept of a message and all you want is a valid slice from the stream (even where you don't require the start and end of a buffer to match the start and end of buffers sent, as long as it's a valid continuous slice from the stream).Hindermost
@LenHolgate Without a critical section per socket around WSARecv each buffer can sometimes contain random chunks from various locations in the stream all mashed together. I have removed the one around WSASend however, just for my test app and so far no corruptions have been detected by the receiver as you suggest.Hindermost
@LenHolgate Hmm, I'm no longer convinced that it's safe to call WSASend for the one socket simultaneously from multiple threads. Very rarely the receiver detects slight corruption in the received buffers. For example, I am sending bytes in the range 0..250 in sequence. In one buffer I received bytes "..224,225,0,1,2.." rather than the expected "..224,225,226,227,228..", and in the next buffer I received "..249,250,226,227,228.." rather than "..249,250,0,1,2..". It seems to happen 1 to 2 times per hour.Hindermost
@LenHolgate I'll note that this is running continuously at several Gbps for the one socket. A normal server might never experience this situation, or only extremely rarely. When I ensure that only one thread per socket is making an active call to WSASend (only the call itself, I still have multiple pending sends per socket) I haven't detected any problems, even after running overnight transferring many TBs of data at full speed.Hindermost
I just noticed I'd left another test client/server running in the background on my machine during that previous WSASend test. When I killed the other process and made more bandwidth available, the WSASend corruption increased dramatically. Adding the critical section back around WSASend dropped the corruption back to 0 again, however.Hindermost
@LenHolgate Re your last comment, what you say is what I assumed too. However, try it out: if you don't protect the call to WSASend for a particular socket with a critical section, you will receive corrupted data, and you will receive messed up buffers of intermingled data, which you say can't happen. I understand what you are saying in your message, and that is what I thought too.Hindermost
@LenHolgate To test out what I am saying, add a critical section to IOCPConnection and initialize it, then put EnterCriticalSection(&cs);/LeaveCriticalSection(&cs); around both WSARecv and WSASend and run the app: no corruption. Remove it from around the send and you will get corrupted data received. The order in which it sends the buffers passed to WSASend does not matter, as each buffer contains a sequence that can be verified independently. I can chat with you on messenger if you want.Hindermost
@LenHolgate I've emailed you an example of what I'm talking about and why in this example that the sequence of WSARecv buffers is not the problem. I think once you've looked through it you will follow what I am saying and you'll also be able to verify the issue yourself. I understand what you are saying.Hindermost
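The independent per-buffer check described in the comments above can be sketched roughly as follows. This is an illustration in portable C++, not the verification code from the question: `FirstCorruptIndex` and its `max_value` parameter are hypothetical names, and the sketch assumes the 0..250 repeating pattern, with a buffer allowed to start anywhere in that pattern.

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical helper (not from the original test app).
// Returns the index of the first byte that breaks the repeating
// 0..max_value pattern, or -1 if the buffer is a valid continuous slice.
// The first byte may be any value in range, because a receive buffer can
// start at an arbitrary position in the stream.
std::ptrdiff_t FirstCorruptIndex(const std::vector<std::uint8_t>& buf,
                                 std::uint8_t max_value = 250) {
    if (buf.empty()) return -1;
    if (buf[0] > max_value) return 0;
    for (std::size_t i = 1; i < buf.size(); ++i) {
        // After max_value the pattern wraps back to 0.
        const std::uint8_t expected =
            (buf[i - 1] == max_value)
                ? std::uint8_t{0}
                : static_cast<std::uint8_t>(buf[i - 1] + 1);
        if (buf[i] != expected) return static_cast<std::ptrdiff_t>(i);
    }
    return -1;
}
```

With the corrupted example quoted above, a buffer containing 224, 225, 0, 1, 2 would be flagged at index 2, while 249, 250, 0, 1, 2 would pass.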
W
4

Having tested the code in question, it seems that multiple concurrent calls to WSARecv on a single socket can corrupt the data in the buffers that are passed to the completion handler. A lock that ensures each connection issues only a single WSARecv call at a time fixes this.

This is consistent with the current MSDN documentation for WSARecv, which states:

"If you are using I/O completion ports, be aware that the order of calls made to WSARecv is also the order in which the buffers are populated. WSARecv should not be called on the same socket simultaneously from different threads, because it can result in an unpredictable buffer order."

Personally, though, I feel the documentation could be clearer: it implies that the only issue is the 'order that the buffers are filled', which is well known and documented, and it doesn't mention that the inbound data stream can actually be spread rather unpredictably between the buffers.
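A minimal sketch of such a per-connection lock, written in portable C++ rather than against the code from the question: `SerializedConnection` and its `post_recv` callable are hypothetical names, and the callable merely stands in for the real WSARecv call on that connection's socket. Only the posting call is serialized; several receives can still be pending at once.

```cpp
#include <cassert>
#include <functional>
#include <mutex>
#include <utility>

// Hypothetical per-connection wrapper (not the IOCPConnection from the
// question). post_recv is an assumed stand-in for issuing WSARecv.
class SerializedConnection {
public:
    explicit SerializedConnection(std::function<bool()> post_recv)
        : post_recv_(std::move(post_recv)) {
        assert(post_recv_ && "a posting callable is required");
    }

    // Any worker thread may call this. The mutex guarantees that only one
    // thread at a time is inside the posting call for this connection.
    bool PostReceive() {
        std::lock_guard<std::mutex> guard(recv_lock_);
        return post_recv_();
    }

private:
    std::mutex recv_lock_;             // one lock per connection, not global
    std::function<bool()> post_recv_;  // stand-in for the WSARecv call
};
```

Keeping the lock per connection means that threads posting receives for different sockets do not contend with each other.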


This is interesting to me in that I never knew this was an issue, and in 15 years I've never seen it :) However, I rely on sequencing multiple WSARecv completions to avoid the well known and documented issue of thread scheduling affecting the order in which you process read completions, even though they are guaranteed to come out of the IOCP in the order that they went in. My sequencing requires a sequence number in each read buffer, and therefore I have a lock around the sequence number increment and the call to WSARecv.
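The sequencing idea can be sketched like this. It is an illustrative reconstruction in portable C++, not the code from my framework: `ReadSequencer`, `Issue` and `Complete` are hypothetical names, the `issue` callable stands in for the WSARecv call, and `std::string` stands in for a read buffer.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <mutex>
#include <string>
#include <utility>

// Hypothetical helper: numbers reads as they are issued and hands completed
// buffers to the caller strictly in issue order.
class ReadSequencer {
public:
    // The lock covers both the increment and the issue call (the stand-in
    // for WSARecv), so the numbering matches the real issue order.
    template <typename IssueFn>
    std::uint64_t Issue(IssueFn&& issue) {
        std::lock_guard<std::mutex> guard(issue_lock_);
        const std::uint64_t seq = next_issue_++;
        issue(seq);
        return seq;
    }

    // Called from any completion thread. Buffers that arrive early are
    // parked until every earlier read has been delivered. deliver runs
    // under the lock so that processing order is preserved.
    template <typename DeliverFn>
    void Complete(std::uint64_t seq, std::string data, DeliverFn&& deliver) {
        std::lock_guard<std::mutex> guard(complete_lock_);
        assert(seq >= next_deliver_ && "sequence number completed twice");
        pending_.emplace(seq, std::move(data));
        for (auto it = pending_.find(next_deliver_); it != pending_.end();
             it = pending_.find(next_deliver_)) {
            deliver(it->second);
            pending_.erase(it);
            ++next_deliver_;
        }
    }

private:
    std::mutex issue_lock_;
    std::mutex complete_lock_;
    std::uint64_t next_issue_ = 0;
    std::uint64_t next_deliver_ = 0;
    std::map<std::uint64_t, std::string> pending_;
};
```

Running the delivery callback under the lock keeps processing in order at the cost of serializing completion handling per connection; a real server might hand ordered buffers to a per-connection queue instead.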

Given that it's impossible to issue multiple WSARecvs from multiple threads and successfully recreate the inbound data stream unless you can somehow determine, from the completions, the sequence in which the WSARecvs were issued, I can't see how this can actually be a real-world problem with TCP sockets. It could pose a problem with UDP, however, as there's no need to sequence and so no need for locking except to prevent this issue. Though I don't think I've ever knowingly seen it, I think this could be an issue on one system that I've been involved with...

I need to do more testing with the WSASend side but I've no reason to think that's any more likely to be thread safe than the WSARecv call. Ah well, you learn something new every day...

I've blogged about this here.

Winnipegosis answered 8/1, 2015 at 14:33 Comment(2)
Thanks Len. Is there any sample code available that can handle multiple WSARecv calls on the server side without data corruption?Quattrocento
My free IOCP source code from ~2002 deals with it. The articles for this code are here: codeproject.com/Articles/2610/… and the latest code can be found here: serverframework.com/products---the-free-framework.html Winnipegosis
T
0

I think you should NOT post multiple receives at once; keep only one receive buffer outstanding per socket at any time.

After you have processed all the data, call WSARecv again for more data.

Tova answered 15/1, 2015 at 9:45 Comment(0)
