본문 바로가기
개발일지/아키텍트

Proactor with EPOLL, SELECT

by 사악신 2012. 3. 15.

IOCP 를 사용한 Proactor 구현(

2012/02/13 - [프로그래밍/OOP] - Proactor with IOCP

)은 비교적 많은 곳에서 소개가 되고있지만, EPoll 과 같은 놈을 Proactor 로 구현하는 경우는 거의 없어 보인다.;; 아무튼 앞으로 사용할 통신 프레임워크에 멀티플랫폼이라는 사치품(?)을 달 계획을 추가하다보니 EPoll 이나 KQueue 혹은 기존 POSIX 류의 Select 디스패치도 같은 구조 속에 녹일 필요가 생겼다. Proactor 냐? Reactor 냐? 잠시 고민했지만~ 아무래도 개인적으로 아직까지 메인 개발은 윈도우즈인지라 IOCP 와 최고의 궁합을 보여주는 Proactor 기반으로 설계해보았으며~ 개발 작업은 기본적인 에코 서버를 구현 & 테스트하는 형태로 진행하였다.

먼저, 서버와 핸들러 부분~

핸들러는 Template Method 를 사용하였다. Proactor 에서 사용할 ACT 의 경우 아직 FPC 가 record 함수를 지원하지 않기 때문에 이를 record helper 로 추가하였다. 작업 환경은 FPC 2.6.0, Lazarus 0.9.30.4 이고 리눅스는 fedora core 16 64bit 버전을 사용하였다.

그리고, 중요한 Proactor 부분~

Proactor 의 구현은 Bridge 패턴을 사용하였고, IOCP 도 마찬가지로 추가할 수 있다. EPoll 을 수행하는 Worker Thread 의 수행코드를 살펴보면~

function ThreadProactorEPollFunc(Param: Pointer): Pointer; cdecl;
var
  Proactor: TProactor;
  Handle, Count: Integer;
  ACT: PACT;
begin
  Result := nil;
  Proactor := TProactor(Param);
  Handle := Proactor.GetHandle;

  while True do
  begin
    Proactor.ThreadSet.Join(0);
    // wait
    repeat
      Count := epoll_wait(Handle, Events, 1, -1);
    until Count = 1;
    //until not ((Count < 0) and (ErrNO = EsockEINTR));

    ACT := PACT(Events[0].Data.ptr);
    if not Assigned(ACT) then
    begin
      // maybe server is accepted
      Proactor.ThreadSet.PromoteNewLeader;

    end else
    begin
      Proactor.ThreadSet.PromoteNewLeader;
      if ((Events[0].Events and EPOLLIN) = EPOLLIN) and (ACT^.ACTType = atAccept) then ACT^.ACTType := atRecv;
      ACT^.Complete;
    end;
  end;
end;

EPoll 의 트리거 방식은 ET(edge trigger) 이다. 쓰레드풀을 위한 Leader-Follower 패턴은 예전 글(

2012/03/09 - [프로그래밍/Lazarus] - FPC 에서 Leader-Follower 패턴 구현

)을 참고하면 된다. 끝으로 디스패치로 Select 를 사용하기 위하여 커널큐를 흉내내는 지극히 간단한 구조의 simple linked list 가 TLocalQueue 라는 형태로 추가되어있으며 이를 이용한 Select 를 수행하는 Worker Thread 의 수행 코드이다.

function ThreadProactorSelectFunc(Param: Pointer): Pointer; cdecl;
var
  Proactor: TProactor;
  Timeout: TTimeVal;
  Events: TFDSet;
  Handle, Ret, fd: Integer;
  ACT: PACT;
begin
  Result := nil;
  Proactor := TProactor(Param);
  Handle := Proactor.GetHandle;

  fpFD_ZERO(Proactor.SelEvents);
  Timeout.tv_sec := 1;
  Timeout.tv_usec := 0;
  while True do
  begin
    Proactor.ThreadSet.Join(0);

    repeat
      Events := Proactor.SelEvents;
      Ret := fpSelect(Proactor.SelMax+1, @Events, nil, nil, @Timeout);
    until Ret > 0;

    Proactor.ThreadSet.PromoteNewLeader;
    for fd := 0 to Proactor.SelMax + 1 do
    begin
      Ret := fpFD_ISSET(fd, Events);
      case Ret of
        0: Proactor.ThreadSet.PromoteNewLeader;
        1: begin
             Proactor.ThreadSet.PromoteNewLeader;
             // read
             ACT := TLocalQueue.GetInstance.Find(fd);
             ACT^.ACTType := atRecv;
             if Assigned(ACT) then ACT^.Complete;
           end;
       -1: Proactor.ThreadSet.PromoteNewLeader;
      end;
    end;
  end;
end;

앞으로 여러가지 스트레스 테스트를 하며 코드의 개선이 필요하지만, 일단 기본적인 설계는 끝난 거 같다. ^^

 

반응형

댓글