选择函数一直返回-1

select function keep returning -1

本文关键字:返回 一直 函数 选择      更新时间:2023-10-16

我想做一个简单的例子,接受客户端使用TCP套接字,并处理他们与winsock的select函数

问题是,当我运行select函数时,它一直返回值-1(错误)和WSAGetLastError返回值10022。

我无法找出我做错了什么,因为fd_set正确地设置了套接字,并且maximum_sd设置为正确的值。

#include "stdafx.h"
#include <stdio.h>
#include <thread>
#include <queue>
#include <string.h>
#include <WinSock2.h>
#include <iostream>
#include <Windows.h>
using namespace std;
void SlaveThread(queue<char*>* tasks);
void MasterThread(queue<char*>* tasks);
fd_set readfds;
int max_sd = 0;
deque<SOCKET> socketsQueue;
int nSocketsAmount = 0;
int _tmain(int argc, _TCHAR* argv[])
{
    queue<char*>* tasksQueue = new queue<char*>();
    FD_ZERO(&readfds);
    thread SecondThread(MasterThread,tasksQueue);
    thread FirstThread(SlaveThread,tasksQueue);
    int nReady;
    struct timeval timeout={0, 0};
    timeout.tv_sec=10;
    timeout.tv_usec=0;
    while (true)
    {
        int i;
        nReady = select(max_sd + 1, &readfds, NULL, NULL, &timeout);
         for (i=0; i < nSocketsAmount && nReady > 0; i++)
         {
             SOCKET temp = socketsQueue[i];
             if (FD_ISSET(temp, &readfds)) {
                char buffer[200];
                memset(buffer, 0, 200);
                recv(temp, buffer, 200, 0);
                tasksQueue->push(buffer);
                nReady--;
            }
         }
    }
    FirstThread.join();
    SecondThread.join();
    return 0;
};
void SlaveThread(queue<char*>* tasks)
{
    while (true)
    {
        if (!tasks->empty())
        {
            cout << tasks->front() << " Queue size : " << tasks->size() << endl;
            tasks->pop();
        }
        Sleep(1000);
    }
};
void MasterThread(queue<char*>* tasks)
{
    WSAData WinSockData;
    WORD Version = MAKEWORD(2, 1);
    WSAStartup(Version, &WinSockData);
    /* Create socket structure */
    SOCKADDR_IN Server;
    Server.sin_addr.s_addr = inet_addr("10.0.0.7");
    Server.sin_family = AF_INET;
    Server.sin_port = htons(27015);
    SOCKET ListenSock = socket(AF_INET, SOCK_STREAM, NULL);
    SOCKET Connect;
    ::bind(ListenSock, (SOCKADDR*)&Server, sizeof(Server));
    int errno0 = WSAGetLastError();
    listen(ListenSock, 1);
    int errno1 = WSAGetLastError();
    cout << "Listening on port 27015" << endl;
    char buffer[200];
    int size = sizeof(Server);
    while (true)
    {
        if (Connect = accept(ListenSock, (SOCKADDR*)&Server, &size))
        {
            cout << "Connection established..." << endl;
            FD_SET(Connect, &readfds);
            socketsQueue.push_front(Connect);
            nSocketsAmount++;
            if (Connect > max_sd)
            {
                max_sd = Connect;
            }
        }
    }
    WSACleanup();
};

主线程添加套接字到fd_set和套接字队列。而主要使用select来获取套接字需要读取的内容。

有什么建议吗?

谢谢。

谁(什么文档页)给你权限改变readfds,而select正在使用它?

当你将一个数据结构传递给一个API函数时,该函数拥有它,直到它返回(或者更长时间,在I/O重叠的情况下)。你不能从另一个线程重写它。

你需要结合你的主select循环和你的"主"线程,幸运的是select是完全能够等待一个监听套接字上的传入连接。

你得到的实际错误是由于违反了文档中的要求:

参数readfds、writefds或exceptfds中的任意两个都可以作为空值。至少有一个描述符必须为非空,并且任何非空描述符集必须至少包含一个套接字句柄。

应用我上面描述的重新设计后,集合将始终包含侦听套接字,也解决了这个问题。

10022是WSAEINVAL。您正在向select()传递一个无效参数。您没有考虑到的一件事是,select()在退出时修改了提供的fd_set结构体,因此每次调用select()时都必须重置它们。您也没有考虑到您正在修改fd_set,而select()可能仍在使用它。你也没有保护你的任务队列免受并发线程访问。

试试这样写:

#include "stdafx.h"
#include <stdio.h>
#include <thread>
#include <queue>
#include <mutex>
#include <string.h>
#include <WinSock2.h>
#include <iostream>
#include <Windows.h>
using namespace std;
struct taskQueue
{
    queue<char*> items;
    mutex itemsLock;
};
void SlaveThread(taskQueue *tasks);
void MasterThread(taskQueue *tasks);
deque<SOCKET> socketsQueue;
mutex socketsQueueLock;
int _tmain(int argc, _TCHAR* argv[])
{
    WSAData WinSockData;
    WORD Version = MAKEWORD(2, 1);
    WSAStartup(Version, &WinSockData);
    taskQueue tasks;
    thread SecondThread(MasterThread, &tasks);
    thread FirstThread(SlaveThread, &tasks);
    char *buffer = NULL;
    while (true)
    {
        fd_set readfds;
        FD_ZERO(&readfds);
        struct timeval timeout = {0};
        timeout.tv_sec = 10;
        timeout.tv_usec = 0;
        int nReady = 0;
        {
            lock_guard<mutex> lock(socketsQueueLock);
            for (deque<SOCKET>::iterator iter = socketsQueue.begin(); iter != socketsQueue.end(); ++iter)
            {
                FD_SET(*iter, &readfds);
                ++nReady;
            }
        }
        if (nReady == 0)
        {
            Sleep(100);
            continue;
        }
        nReady = select(0, &readfds, NULL, NULL, &timeout);
        if (nReady > 0)
        {
            lock_guard<mutex> lock(socketsQueueLock);
            for (deque<SOCKET>::iterator iter = socketsQueue.begin(); iter != socketsQueue.end(); ++iter)
            {
                SOCKET temp = *iter;
                if (FD_ISSET(temp, &readfds))
                {
                    if (!buffer)
                        buffer = new char[201];
                    memset(buffer, 0, 200);
                    nReady = recv(temp, buffer, 200, 0);
                    if (nReady > 0)
                    {
                        buffer[nReady] = 0;
                        lock_guard<mutex> lock2(tasks.itemsLock);
                        tasks.items.push(buffer);
                        buffer = NULL;
                    }
                }
            }
        }
    }
    FirstThread.join();
    SecondThread.join();
    delete[] buffer;
    WSACleanup();
    return 0;
};
void SlaveThread(taskQueue *tasks)
{
    while (true)
    {
        {
            lock_guard<mutex> lock(tasks->itemsLock);
            if (!tasks->items.empty())
            {
                char *buffer = tasks->items.front();
                cout << buffer << " Queue size : " << tasks->items.size() << endl;
                tasks->items.pop();
                delete[] buffer;
            }
        }
        Sleep(1000);
    }
};
void MasterThread(taskQueue *tasks)
{
    /* Create socket structure */
    SOCKADDR_IN Server = {0};
    Server.sin_addr.s_addr = inet_addr("10.0.0.7");
    Server.sin_family = AF_INET;
    Server.sin_port = htons(27015);
    SOCKET ListenSock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
    if (ListenSock == INVALID_SOCKET)
    {
        cout << "Cannot create listening socket. Error: " << WSAGetLastError() << endl;
        return;
    }
    if (::bind(ListenSock, (SOCKADDR*)&Server, sizeof(Server)) == SOCKET_ERROR)
    {
        cout << "Cannot bind listening socket. Error: " << WSAGetLastError() << endl;
        closesocket(ListenSock);
        return;
    }
    if (listen(ListenSock, 1) == SOCKET_ERROR)
    {
        cout << "Cannot listen on port 27015. Error: " << WSAGetLastError() << endl;
        closesocket(ListenSock);
        return;
    }
    cout << "Listening on port 27015" << endl;
    char buffer[200];
    int size;
    SOCKET Connect;
    while (true)
    {
        size = sizeof(Server);
        Connect = accept(ListenSock, (SOCKADDR*)&Server, &size);
        if (Connect != INVALID_SOCKET)
        {
            cout << "Connection established..." << endl;
            lock_guard<mutex> lock(socketsQueueLock);
            socketsQueue.push_front(Connect);
        }
    }
};