본문 바로가기
개발일지/아키텍트

FPC 에서 Leader-Follower 패턴 구현

by 사악신 2012. 3. 9.

FPC 에서 pthread 기반 Leader-Follower 패턴을 구현하였다. Unbound thread set 이며, 사용 용도는 Worker Thread Pool 정도~

예외 처리 등 많이 다듬어야하지만~ 일단 관련 소스 첨부~

unit DLNA_Thread;

{$mode objfpc}{$H+}

interface

uses
  Classes, SysUtils, BaseUnix, pthreads;

const
  NO_CURRENT_LEADER = 0;

type
  TPThreadParameter = record
    Mutex: ppthread_mutex_t;
    Condition: ppthread_cond_t;
    Terminated: Boolean;
    Suspended: Boolean;
    Owner: TObject;
  end;
  PPThreadParmater = ^TPThreadParameter;

{ TPThread }

TPThread = class
private
  FThread: pthread_t;
  FMutex: pthread_mutex_t;
  FCondition: pthread_cond_t;
  FStartRoutine: TStartRoutine;
  FSuspended: Boolean;

  function GetSuspended: Boolean;
  procedure SetSuspended(const AValue: Boolean);
protected
  FPThreadParameter: TPThreadParameter;
  FTerminated: Boolean;
public
  constructor Create(CreateSuspended: Boolean; StartRoutine: TStartRoutine);
  destructor Destroy; override;

  procedure Terminate; virtual;

  property Thread: pthread_t read FThread;
  property Suspended: Boolean read GetSuspended write SetSuspended;
end;

{ TMutex }

TMutex = class
private
  FMutex: pthread_mutex_t;
public
  constructor Create;
  destructor Destroy; override;

  procedure Acquire;
  procedure Release;

  property Handle: pthread_mutex_t read FMutex;
end;

{ TGuard }

TGuard = class
protected
  FMutex: TMutex;
  FOwner: Boolean;
public
  constructor Create(AMutex: TMutex);
  destructor Destroy; override;

  procedure Acquire;
  procedure Release;
end;

{ TThreadCondition }

TThreadCondition = class
protected
  FMutex: TMutex;
  FCondition: pthread_cond_t;
public
  constructor Create(AMutex: TMutex);
  destructor Destroy; override;

  procedure Wait(Timeout: Cardinal);
  procedure Notify;
end;


{ TThreadSet }
// Unbound thread set
TThreadSet = class
protected
  FThreadLeader: Cardinal;
  FThreadCondition: TThreadCondition;
  FMutex: TMutex;
public
  constructor Create;
  destructor Destroy; override;

  function Join(Timeout: Cardinal): Boolean; virtual;
  function PromoteNewLeader: Boolean; virtual;

  property Mutex: TMutex read FMutex;
end;

implementation


{ TPThread }

function TPThread.GetSuspended: Boolean;
begin
  Result := FSuspended;
end;

procedure TPThread.SetSuspended(const AValue: Boolean);
begin
  if FSuspended <> AValue then
  begin
    FSuspended := AValue;
    FPThreadParameter.Suspended := AValue;
    if not AValue then pthread_cond_signal(FCondition);
  end;
end;

constructor TPThread.Create(CreateSuspended: Boolean; StartRoutine: TStartRoutine);
begin
  inherited Create;

  FStartRoutine := StartRoutine;

  pthread_mutex_init(@FMutex, nil);
  pthread_cond_init(@FCondition, nil);

  FPThreadParameter.Mutex := @FMutex;
  FPThreadParameter.Condition := @FCondition;
  FPThreadParameter.Terminated := False;
  FPThreadParameter.Suspended := False;
  if pthread_create(@FThread, nil, FStartRoutine, @FPThreadParameter) < 0 then
  begin
    // failed
  end else
  begin
    SetSuspended(CreateSuspended);
  end;
end;

destructor TPThread.Destroy;
begin
  if not FTerminated then Terminate;

  inherited Destroy;
end;

procedure TPThread.Terminate;
begin
  FTerminated := True;
  FPThreadParameter.Terminated := True;

  pthread_cond_signal(FCondition);
  pthread_cond_destroy(FCondition);
  pthread_mutex_destroy(FMutex);
end;


{ TThreadSet }

constructor TThreadSet.Create;
begin
  inherited;

  FThreadLeader := NO_CURRENT_LEADER;
  FMutex := TMutex.Create;
  FThreadCondition := TThreadCondition.Create(FMutex);
end;

destructor TThreadSet.Destroy;
begin
  FThreadCondition.Free;
  FMutex.Free;

  inherited Destroy;
end;

function TThreadSet.Join(Timeout: Cardinal): Boolean;
var
  Guard: TGuard;
begin
  Result := False;
  Guard := TGuard.Create(FMutex);
  try
    while FThreadLeader <> NO_CURRENT_LEADER do
    begin
      FThreadCondition.Wait(Timeout);
    end;

    FThreadLeader := pthread_self;
    Result := True;
    Guard.Release;
  finally
    Guard.Free;
  end;
end;

function TThreadSet.PromoteNewLeader: Boolean;
begin
  Result := True;
  try
    if FThreadLeader <> Cardinal(pthread_self) then Exit(False);
    FThreadLeader := NO_CURRENT_LEADER;
    FThreadCondition.Notify;
  except
    Result := False;
  end;
end;

{ TMutex }

constructor TMutex.Create;
begin
  inherited;

  pthread_mutex_init(@FMutex, nil);
end;

destructor TMutex.Destroy;
begin
  pthread_mutex_destroy(FMutex);

  inherited Destroy;
end;

procedure TMutex.Acquire;
begin
  pthread_mutex_lock(FMutex);
end;

procedure TMutex.Release;
begin
  pthread_mutex_unlock(FMutex);
end;


{ TThreadCondition }

constructor TThreadCondition.Create(AMutex: TMutex);
begin
  inherited Create;

  if Assigned(AMutex) then FMutex := AMutex;
  pthread_cond_init(@FCondition, nil);
end;

destructor TThreadCondition.Destroy;
begin
  pthread_cond_destroy(@FCondition);

  inherited Destroy;
end;

procedure TThreadCondition.Wait(Timeout: Cardinal);
begin
  pthread_cond_wait(@FCondition, @FMutex.Handle)
end;

procedure TThreadCondition.Notify;
begin
  pthread_cond_signal(@FCondition);
end;

{ TGuard }

constructor TGuard.Create(AMutex: TMutex);
begin
  inherited Create;

  if Assigned(AMutex) then
  begin
    FMutex := AMutex;
    FOwner := True;
    FMutex.Acquire;
  end;
end;

destructor TGuard.Destroy;
begin
  if FOwner then FMutex.Release;

  inherited Destroy;
end;

procedure TGuard.Acquire;
begin
  FOwner := True;
  FMutex.Acquire;
end;

procedure TGuard.Release;
begin
  FMutex.Release;
end;


end.

상기 소스에서 TPThread 는 Delphi 와의 호환성을 위하여 추가하였다.

이때 해당 StartRoutine 은 다음과 같이 작성한다.

// thread function
function ThreadServerFunc(Param: Pointer): cint; cdecl;
var
  ThreadParam: PPThreadParmater;
begin
  ThreadParam := PPThreadParmater(Param);

  while not ThreadParam^.Terminated do
  begin
    pthread_mutex_lock(ThreadParam^.Mutex);
    if ThreadParam^.Suspended then pthread_cond_wait(ThreadParam^.Condition, ThreadParam^.Mutex);
    pthread_mutex_unlock(ThreadParam^.Mutex);


  end;
end;

스레드 풀은 다음과 같이 구현한다.

while True do
begin
  ThreadSet.Join(0);
  // wait 등 처리~

  ThreadSet.PromoteNewLeader;
  // 로직 수행~
end;

끝으로 POSA2 에서 설명하고 있는 Leader-Follower 패턴의 시퀀스 다이어그램~

 

반응형

'개발일지 > 아키텍트' 카테고리의 다른 글

Proactor with EPOLL, SELECT  (2) 2012.03.15
Delphi 에서 Leader-Follower 패턴 구현  (0) 2012.03.14
POSA, 패턴 시스템  (0) 2012.02.14
Proactor with IOCP  (4) 2012.02.13
POSA2, ACT 패턴  (0) 2012.02.08

댓글