책정리/열혈 TCP,IP

23장 IOCP(Input Output Completion Port)

GONII 2015. 8. 10. 14:19
  • Overlapped IO를 기반으로 IOCP 이해하기

    논의가 한참인 epoll과 IOCP의 성능비교

    select와 같은 전통적인 IO 모델의 한계극복을 목적으로 운영체제 레벨(커널 레벨)에서 성능을 향상시킨 IO 모델이 운영체제 별로 등장하였다. 그 중 대표적인 것이 리눅스의 epoll, BSD의 kqueue 그리고 윈도우의 IOCP이다. 이들의 공통적인 특성은 운영체제에 의해서 기능이 지원 및 완성된다는 것이다.

    넌블로킹 모드의 소켓 구성하기

    윈도우에서는 다음의 함수호출을 통해서 넌블로킹 모드로 소켓의 속성을 변경한다.

SOCKET hListenSock;
int mode = 11;
....
hListenSock = WSASocket(PF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);

ioctlsocket(hListenSock, FIONBIO, &mode); // for non blocking socket

ioctlsocket함수는 소켓의 IO방식을 컨트롤하는 함수이다. 그리고 위와 같은 형태로의 함수호출이 의미하는 바는 다음과 같다.

"핸들hListenSock이 참조하는 소켓의 입출력 모드(FIONBIO)를 변수 mode에 저장된 값의 형태로 변경한다"

FIONBIO는 소켓의 입출력 모드를 변경하는 옵션이며, 이 함수의 세 번째 인자로 전달된 주소 값의 변수에 0이 저장되어 있으면 블로킹 모드로, 0이 아닌 값이 저장되어 있으면 논블로킹 모드로 소켓의 입출력 속성을 변경한다. 그리고 이렇게 속성이 논블로킹 모드로 변경되면, 논블로킹 모드로 입출력 되는 것 이외에 다음의 특징도 지니게 된다.

  • 클라이언트의 요청이 존재하지 않은 상태에서 accept함수가 호출되면 INVALID_SOCKET이 곧바로 반환된다. 그리고 이어서 WSAGetLastError 함수를 호출하면 WSAEWOULDBLOCK가 반환된다.
  • accept 함수호출을 통해서 새로 생성되는 소켓 역시 논블로킹 속성을 지닌다.

따라서 논블로킹 입출력 소켓을 대상으로 accept함수를 호출해서 INVALID_SOCKET이 반환되면, WSAGetLastError 함수 호출을 통해서 INVALID_SOCKET이 반환된 이유를 확인하고, 그에 적절한 처리를 해야만 한다.

Ovalapped IO만 가지고 에코 서버 구현하기

  • 예제 ComplRouEchoServ_win.c

#include <stdio.h>

#include <stdlib.h>

#include <winsock2.h>

#pragma comment(lib, "ws2_32.lib")

   

#define BUF_SIZE 1024

   

void CALLBACK ReadCompRoutine(DWORD, DWORD, LPWSAOVERLAPPED, DWORD);

void CALLBACK WriteCompRoutine(DWORD, DWORD, LPWSAOVERLAPPED, DWORD);

void errorHandling(char* msg);

   

typedef struct

{

SOCKET hClientSock;

char buf[BUF_SIZE];

WSABUF wsaBuf;

} PER_IO_DATA, *LPPER_IO_DATA;

   

int main(int argc, char* argv[])

{

WSADATA wsaData;

SOCKET hListenSock, hRecvSock;

SOCKADDR_IN listenAddr, recvAddr;

LPWSAOVERLAPPED lpOverlapped;

DWORD recvBytes, flagInfo = 0;

LPPER_IO_DATA hbInfo;

ULONG mode = 1;

int recvAddrSz;

   

   

if( argc != 2 )

{

printf("Usage : %s <port>\n", argv[0]);

exit(1);

}

   

if( WSAStartup(MAKEWORD(2,2), &wsaData) != 0 )

errorHandling("WSAStartup() error");

   

hListenSock = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);

ioctlsocket(hListenSock, FIONBIO, &mode);

   

memset(&listenAddr, 0, sizeof(listenAddr));

listenAddr.sin_family = AF_INET;

listenAddr.sin_addr.s_addr = htonl(INADDR_ANY);

listenAddr.sin_port = htons(atoi(argv[1]));

   

if(bind(hListenSock, (SOCKADDR*)&listenAddr, sizeof(listenAddr)) == SOCKET_ERROR)

errorHandling("bind() error");

if( listen(hListenSock, SOMAXCONN) == SOCKET_ERROR )

errorHandling("listen() error");

   

recvAddrSz = sizeof(recvAddr);

while(1)

{

// alertable wait state

SleepEx(100, TRUE);

hRecvSock = accept(hListenSock, (SOCKADDR*)&recvAddr, &recvAddrSz);

if( hRecvSock == INVALID_SOCKET )

{

if( WSAGetLastError() == WSAEWOULDBLOCK )

continue;

else

errorHandling("accept() error");

}

puts("client connected.....");

lpOverlapped = (LPWSAOVERLAPPED)malloc(sizeof(WSAOVERLAPPED));

memset(lpOverlapped, 0, sizeof(WSAOVERLAPPED));

   

hbInfo = (LPPER_IO_DATA)malloc(sizeof(PER_IO_DATA));

hbInfo->hClientSock = (DWORD)hRecvSock;

(hbInfo->wsaBuf).buf = hbInfo->buf;

(hbInfo->wsaBuf).len = BUF_SIZE;

// lpOverlapped->hEvent = hbInfo

lpOverlapped->hEvent = (HANDLE)hbInfo;

WSARecv(hRecvSock, &(hbInfo->wsaBuf), 1, &recvBytes, &flagInfo, lpOverlapped, ReadCompRoutine);

}

   

closesocket(hRecvSock);

closesocket(hListenSock);

WSACleanup();

return 0;

}

   

void CALLBACK ReadCompRoutine(DWORD dwError, DWORD szRecvBytes, LPWSAOVERLAPPED lpOverlapped, DWORD flags)

{

LPPER_IO_DATA hbInfo = (LPPER_IO_DATA)(lpOverlapped->hEvent);

SOCKET hSock = hbInfo->hClientSock;

LPWSABUF bufInfo = &(hbInfo->wsaBuf);

DWORD sendBytes;

   

if( szRecvBytes == 0 )

{

closesocket(hSock);

free(lpOverlapped->hEvent);

free(lpOverlapped);

puts("client disconnected.....");

}

else

{

bufInfo->len = szRecvBytes;

WSASend(hSock, bufInfo, 1, &sendBytes, 0, lpOverlapped, WriteCompRoutine);

}

}

void CALLBACK WriteCompRoutine(DWORD dwError, DWORD szSendBytes, LPWSAOVERLAPPED lpOverlapped, DWORD flags)

{

LPPER_IO_DATA hbInfo = (LPPER_IO_DATA)(lpOverlapped->hEvent);

SOCKET hSock = hbInfo->hClientSock;

LPWSABUF bufInfo = &(hbInfo->wsaBuf);

DWORD recvbytes;

DWORD flagInfo = 0;

   

WSARecv(hSock, bufInfo, 1, &recvbytes, &flagInfo, lpOverlapped, ReadCompRoutine);

}

void errorHandling(char* msg)

{

fputs(msg, stderr);

fputc('\n', stderr);

exit(1);

}

위 예제의 동작원리를 정리하면 다음과 같다

  • 클라이언트가 연결되면 WSARecv 함수를 호출하면서 논블로킹 모드로 데이터가 수신되게 되고, 수신이 완료되면 ReadCompRoutine 함수가 호출되게 한다.
  • ReadCompRoutine 함수가 호출되면 WSASend 함수를 호출하면서 논블로킹 모드로 데이터가 수신되게 하고, 수신이 완료되면 WriteCompRoutine 함수가 호출되게 한다.
  • 그런데 이렇게 해서 호출된 WriteCompRoutine 함수는 다시 WSARecv 함수를 호출하면서 논블로킹 모드로 데이터의 수신을 기다린다.

즉, ReadCompRoutine 함수와 WriteCompRoutine 함수가 번갈아 호출되면서 데이터의 수신과 송신을 반복하도록 구성하였다. 그리고 클라이언트가 늘 때마다 추가로 생성되는 소켓의 핸들과 버퍼 정보를 ReadCompRoutine 함수와 WriteCompRoutine 함수에 전달하기 위해서 PER_IO_DATA구조체를 정의하였고, 이 구조체 변수의 주소 값은 WSAOVERLAPPED 구조체의 멤버 hEvent에 저장되어 Completion Routine 함수에 전달되게끔 하였다.

클라이언트의 재구현

  • 예제 StableEchoClnt_win.c

#include <stdio.h>

#include <stdlib.h>

#include <winsock2.h>

#pragma comment(lib, "ws2_32.lib")

   

#define BUFSIZE 1024

void ErrorHandling(char* msg);

   

int main(int argc, char* argv[])

{

WSADATA wsaData;

SOCKET hSocket;

SOCKADDR_IN servAddr;

char msg[BUFSIZE];

int strLen, readLen;

   

if( argc != 3 )

{

printf("Usage : %s <IP> <port>\n", argv[0]);

exit(1);

}

if( WSAStartup(MAKEWORD(2,2), &wsaData) != 0 )

{

ErrorHandling("WSAStartup() error");

}

   

hSocket = socket(AF_INET, SOCK_STREAM, 0);

if( hSocket == INVALID_SOCKET )

{

ErrorHandling("socket() error");

}

   

memset(&servAddr, 0, sizeof(servAddr));

servAddr.sin_family = AF_INET;

servAddr.sin_addr.s_addr = inet_addr(argv[1]);

servAddr.sin_port = htons(atoi(argv[2]));

   

if( connect(hSocket, (SOCKADDR*)&servAddr, sizeof(servAddr)) == SOCKET_ERROR )

{

ErrorHandling("connect() error");

}

else

{

puts("connected...");

}

   

while(1)

{

fputs("Input Message(Q to quit): ", stdout);

fgets(msg, BUFSIZE, stdin);

if( !strcmp(msg, "q\n") || !strcmp(msg, "Q\n"))

break;

   

strLen = strlen(msg);

readLen = 0;

while(1)

{

readLen += recv(hSocket, &msg[readLen], BUFSIZE-1, 0);

if( readLen >= strLen )

break;

}

msg[strLen] = 0;

printf("Message from server: %s", msg);

}

   

closesocket(hSocket);

WSACleanup();

return 0;

}

   

void ErrorHandling(char* msg)

{

fputs(msg, stderr);

fputc('\n', stderr);

exit(1);

}

Overlapped IO 모델에서 IOCP 모델로

앞서 확인한 Overlapped IO 모델의 에코 서버가 지니고 있는 단점은 "넌블로킹 모드의 accept함수와 alertable wait상태로의 진입을 위한 SleepEx함수가 번갈아 가며 반복 호출되는 것은 성능에 영향을 미칠수 있다"이다.

연결요청의 처리를 위한 accept함수만 호출할 수 있는 상황이 아니기 때문에, 그리고 Completion Routine 함수의 호출을 위해서 SleepEx 함수만 호출할 수 있는 상황도 아니기 대문에, accept 함수는 넌블로킹모드로 SleepEx함수는 타임아웃을 짧게 지정해서 돌아가며 반복호출하였다. 그리고 이는 실제로 성능에 영향을 주는 코드 구성이다.

이 문제의 해결을 위해서는 다음의 방법을 고려할 수 있다.

"accept 함수의 호출은 main쓰레드가(main 함수 내에서) 처리하도록 하고, 별도의 쓰레드를 추가로 하나 생성해서 클라이언트와의 입출력을 담당하게 한다."

그리고 이것이 IOCP에서 제안하는 서버의 구현 모델이다. 즉, IOCP에서는 IO를 전담하는 쓰레드를 별도로 생성한다. 그리고 이 쓰레드가 모든 클라이언트를 대상으로 IO를 진행하게 된다.

  • IOCP의 단계적 구현

    Completion Port의 생성

    IOCP에서는 완료된 IO의 정보가 Completion Port오브젝트라는 커널 오브젝트에 등록된다.

    IOCP 모델의 서버 구현을 위해서는 다음 두 가지 일을 진행해야 한다.

    • Completion Port 오브젝트의 생성
    • Completion Port 오브젝트와 소켓의 연결

    이 때 소켓은 반드시 Overlapped 속성이 부여된 소켓이여야 하며, 위의 두 가지 일은 다음 하나의 함수를 통해 이뤄진다.

#include <windows.h>

HANDLE CreateIoCompletionPort(

__in HANDLE FileHandle,

__in_opt HANDLE ExistingCompletionPort,

__in ULONG_PTR CompletionKey,

__in DWORD NumberOfConcurrentThreads

);

  • HANDLE FileHandle

    CP오브젝트 생성시에는 INVALID_HANDLE_VALUE 전달

  • HANDLE ExistingCompletionPort

    CP 오브젝트 생성시에는 NULL 전달

  • ULONG_PTR CompletionKey

    CP 오브젝트 생성시에는 0 전달

  • DWORD NumberOfConcurrentThreads

    CP 오브젝트에 할당되어 완료된 IO를 처리할 쓰레드의 수를 전달, 2가 전달되면 CP오브젝트에 할당되어 동시 실행 가능한 쓰레드의 수는 최대 2개로 제한된다. 0이 전달되면 시스템의 CPU 개수가 동시에 실행 가능한 쓰레드의 최대수로 저장된다.

위 함수를 CP오브젝트의 생성을 목적으로 호출할 때에는 마지막 매개변수만이 의미를 갖는다. 즉, CP오브젝트에 할당되어 IO를 처리할 쓰레드의 수를 2개로 지정할 경우 다음의 형태로 문장을 구성하면 된다.

HANDLE hCpObject;
....
hCpObject = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 2);

Completion Port 오브젝트와 소켓의 연결

CP오브젝트가 생성되었다면, 이제 이를 소켓과 연결시켜야 한다. 그래야 완료된 소켓의 IO정보가 CP오브젝트에 등록된다.

#include <windows.h>

HANDLE CreateIoCompletionPort(

__in HANDLE FileHandle,

__in_opt HANDLE ExistingCompletionPort,

__in ULONG_PTR CompletionKey,

__in DWORD NumberOfConcurrentThreads

);

  • HANDLE FileHandle

    CP오브젝트에 연결할 소켓의 핸들 전달

  • HANDLE ExistingCompletionPort

    소켓과 연결할 CP오브젝트의 핸들 전달

  • ULONG_PTR CompletionKey

    완료된 IO관련 정보의 전달을 위한 매개변수

  • DWORD NumberOfConcurrentThreads

    어떠한 값을 전달하건, NULL이 아니면 그냥 무시된다.

즉, 매개변수 FileHandle에 전달된 핸들의 소켓을 매개변수 ExistingCompletionPort에 전달된 핸들의 CP오브젝트에 연결시키는 것이 위 함수의 두 번째 기능이다. 그리고 호출의 형태는 다음과 같다

HANDLE hCpObject;
SOCKET hSock;
...
CreateIoCompletionPort((HANDLE)hSock, hCpObject, (DWORD)ioInfo, 0);

이렇게 CreateIoCompletionPort함수가 호출된 이후부터는 hSock을 대상으로 진행된 IO가 완료되면 이에 대한 정보가 핸들 hCpObject에 해당하는 CP오브젝트에 등록된다.

Completion Port의 완료된 IO확인과 쓰레드의 IO처리

CP에 등록되는 완료된 IO의 확인하기 위해 다음 함수를 사용한다

BOOL GetQueuedCompletionStatus(

__in HANDLE CompletionPort,

__out LPDWORD lpNumberOfBytesTransferred,

__out PULONG_PTR lpCompletionKey,

__out LPOVERLAPPED *lpOverlapped,

__in DWORD dwMilliseconds

);
// 성공 : TRUE
// 실패 : FALSE

  • HANDLE CompletionPort

    완료된 IO정보가 등록되어 있는 CP오브젝트의 핸들 전달

  • LPDWORD lpNumberOfBytesTransferred

    입출력 과정에서 송수신 된 데이터의 크기정보를 저장할 변수의 주소 값 전달

  • PULONG_PTR lpCompletionKey

    CreateIoCompletionPort 함수의 세번째 인자로 전달된 값의 저장을 위한 변수의 주소 값 전달

  • LPOVERLAPPED *lpOverlapped

    WSASend, WSARecv 함수호출 시 전달하는 OVERLAPPED 구조체 변수의 주소 값이 저장될, 변수의 주소 값 전달

  • DWORD dwMilliseconds

    타임아웃 정보 전달, 지정한 시간이 완료되면 FALSE를 반환하면서 함수를 빠져나가며, INFINITE를 전달하면 완료된 IO가 CP오브젝트에 등록될 때까지 블로킹 상태에 있게 된다.

위 함수의 세번째, 네번째 매개변수는 값을 얻기 위해서 추가된 매개변수이다. 이 두 매개변수를 통헤 얻게 되는 정보는 다음과 같다

lpCompletionKey : 소켓과 CP오브젝트의 연결을 목적으로 CreateIoCompletionPort함수가 호출될 때 전달되는 세 번째 인자 값

lpOverlapped : WSASend, WSARecv 함수 호출 시 전달되는 WSAOVERLAPPED 구조체 변수의 주소값

GetQueuedCompletionStatus함수는 IOCP의 완료된 IO 처리를 담당하는 쓰레드가 호출해야 한다.

GetQueuedCompletionStatus 함수는 어떠한 쓰레드라도 호출 가능하지만, 실제 IO의 완료에 대한 응답을 받는 쓰레드의 수는 CreateIoCompletionPort 함수 호출 시 지정한 최대 쓰레드의 수를 넘지 않는다.

IOCP 기반의 에코 서버의 구현

  • IOCPEchoServ_win.c

#include <stdio.h>

#include <stdlib.h>

#include <process.h>

#include <winsock2.h>

#include <windows.h>

#pragma comment(lib, "ws2_32.lib")

   

#define BUFSIZE 100

#define READ 3

#define WRITE 5

   

// socket info

typedef struct

{

SOCKET hClientSock;

SOCKADDR_IN clientAddr;

} PER_HANDLE_DATA, *LPPER_HANDLE_DATA;

   

// buffer info

typedef struct

{

OVERLAPPED overlapped;

WSABUF wsaBuf;

char buffer[BUFSIZE];

int rwMode;                // READ or WRITE

} PER_IO_DATA, *LPPER_IO_DATA;

   

DWORD WINAPI EchoThread(LPVOID hIOCP);

void errorHandling(char* msg);

   

int _tmain(int argc, _TCHAR* argv[])

{

if( argc != 2 )

{

printf("%s <port>", argv[0]);

exit(1);

}

WSADATA wsaData;

HANDLE hIOCP;

SYSTEM_INFO sysInfo;

LPPER_IO_DATA ioInfo;

LPPER_HANDLE_DATA handleInfo;

   

SOCKET hListenSock;

SOCKADDR_IN servAddr;

DWORD recvBytes;

DWORD flags = 0;

if( WSAStartup(MAKEWORD(2,2), &wsaData) != 0 )

errorHandling("WSAStartup() error");

   

// IOCP 생성

hIOCP = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);

GetSystemInfo(&sysInfo);

for( unsigned int i = 0 ; i < sysInfo.dwNumberOfProcessors ; i++ )

{

CreateThread(NULL, 0, EchoThread, (LPVOID)hIOCP, 0, NULL);

}

hListenSock = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);

memset(&servAddr, 0, sizeof(servAddr));

servAddr.sin_family = AF_INET;

servAddr.sin_addr.s_addr = htonl(INADDR_ANY);

servAddr.sin_port = htons(atoi(argv[1]));

   

if( bind(hListenSock, (SOCKADDR*)&servAddr, sizeof(servAddr)) == SOCKET_ERROR )

{

errorHandling("bind() error");

}

   

if( listen(hListenSock, SOMAXCONN) == SOCKET_ERROR )

{

errorHandling("listen() error");

}

   

while(1)

{

SOCKET hClientSock;

SOCKADDR_IN clientAddr;

int addrLen = sizeof(clientAddr);

// accept

hClientSock = accept(hListenSock, (SOCKADDR*)&clientAddr, &addrLen);

if( hClientSock == INVALID_SOCKET )

errorHandling("accept() error");

   

// socketInfo 할당, IOCP와 소켓 연결 시 completionKey로 사용

handleInfo = (LPPER_HANDLE_DATA)malloc(sizeof(PER_HANDLE_DATA));

handleInfo->hClientSock = hClientSock;

memcpy(&(handleInfo->clientAddr), &clientAddr, sizeof(clientAddr));

   

// 소켓과 IOCP 연결

CreateIoCompletionPort((HANDLE)hClientSock, hIOCP, (DWORD)handleInfo, 0);

   

// dataInfo

ioInfo = (LPPER_IO_DATA)malloc(sizeof(PER_IO_DATA));

memset(&(ioInfo->overlapped), 0, sizeof(OVERLAPPED));

ioInfo->wsaBuf.buf = ioInfo->buffer;

ioInfo->wsaBuf.len = BUFSIZE;

ioInfo->rwMode = READ;

WSARecv(

handleInfo->hClientSock,

&(ioInfo->wsaBuf),

1,

&recvBytes,

&flags,

&(ioInfo->overlapped),

NULL);

}

return 0;

}

   

DWORD WINAPI EchoThread(LPVOID hIOCP)

{

HANDLE iocp = (HANDLE)hIOCP;

SOCKET sock;

DWORD bytestrans;

LPPER_HANDLE_DATA handleInfo;

LPPER_IO_DATA ioInfo;

DWORD flags = 0;

   

while(1)

{

GetQueuedCompletionStatus(

iocp,

&bytestrans,

(LPDWORD)&handleInfo,

(LPOVERLAPPED*)&ioInfo,

INFINITE);

sock = handleInfo->hClientSock;

   

if( ioInfo->rwMode == READ )

{

puts("message received!");

// EOF 전송 시

if( bytestrans == 0 )

{

closesocket(sock);

free(handleInfo);

free(ioInfo);

continue;

}

   

memset(&(ioInfo->overlapped), 0, sizeof(OVERLAPPED));

ioInfo->wsaBuf.len = bytestrans;

ioInfo->rwMode = WRITE;

WSASend(

sock,

&(ioInfo->wsaBuf),

1,

NULL,

0,

&(ioInfo->overlapped),

NULL);

   

ioInfo = (LPPER_IO_DATA)malloc(sizeof(PER_IO_DATA));

memset(&(ioInfo->overlapped), 0, sizeof(OVERLAPPED));

ioInfo->wsaBuf.len = BUFSIZE;

ioInfo->wsaBuf.buf = ioInfo->buffer;

ioInfo->rwMode = READ;

WSARecv(

sock,

&(ioInfo->wsaBuf),

1,

NULL,

&flags,

&(ioInfo->overlapped),

NULL);

}

else

{

puts("message sent!");

free(ioInfo);

}

}

   

return 0;

}

void errorHandling(char* msg)

{

fputs(msg, stderr);

fputc('\n', stderr);

exit(1);

}

 

반응형