FPC(Free Pascal)에서 pthread 기반 Leader-Follower 패턴을 구현하였다. Unbound thread set 방식이며, 주 용도는 Worker Thread Pool 정도이다.
예외 처리 등 다듬어야 할 부분이 많지만, 일단 관련 소스를 첨부한다.
unit DLNA_Thread;
{$mode objfpc}{$H+}
interface
uses
Classes, SysUtils, BaseUnix, pthreads;
const
// Sentinel stored in TThreadSet.FThreadLeader while no thread holds the leader role.
NO_CURRENT_LEADER = 0;
type
// State block whose address TPThread.Create hands to pthread_create as the
// start-routine argument.
TPThreadParameter = record
// Address of the owning TPThread's mutex (assigned in TPThread.Create).
Mutex: ppthread_mutex_t;
// Address of the owning TPThread's condition variable (assigned in TPThread.Create).
Condition: ppthread_cond_t;
// Set to True by TPThread.Terminate.
Terminated: Boolean;
// Written by TPThread.SetSuspended together with the object's own suspend flag.
Suspended: Boolean;
// NOTE(review): never assigned anywhere in this unit -- presumably meant to
// reference the owning object; confirm before relying on it.
Owner: TObject;
end;
// Pointer to TPThreadParameter.
// NOTE(review): the identifier is misspelled ("Parmater"); it is left as-is
// because the sample start routine refers to it by this name.
PPThreadParmater = ^TPThreadParameter;
{ TPThread }
// Wraps one pthread together with the mutex/condition pair that the start
// routine is expected to use for suspend, resume and terminate requests.
TPThread = class
private
// Thread handle filled in by pthread_create in the constructor.
FThread: pthread_t;
// Mutex and condition variable; their addresses are published to the thread
// through FPThreadParameter.
FMutex: pthread_mutex_t;
FCondition: pthread_cond_t;
// Routine passed to pthread_create.
FStartRoutine: TStartRoutine;
// Backing field of the Suspended property.
FSuspended: Boolean;
function GetSuspended: Boolean;
procedure SetSuspended(const AValue: Boolean);
protected
// Parameter block passed (by address) to the start routine.
FPThreadParameter: TPThreadParameter;
// True once Terminate has run; checked by the destructor.
FTerminated: Boolean;
public
// Initialises the mutex/condition, starts the thread with StartRoutine and
// applies CreateSuspended as the initial suspend state.
constructor Create(CreateSuspended: Boolean; StartRoutine: TStartRoutine);
// Calls Terminate if it has not been called yet.
destructor Destroy; override;
// Flags the thread as terminated, signals the condition and destroys the
// condition variable and mutex.
procedure Terminate; virtual;
property Thread: pthread_t read FThread;
// Writing False signals the condition variable.
property Suspended: Boolean read GetSuspended write SetSuspended;
end;
{ TMutex }
// Object wrapper around a pthread mutex created with default attributes.
TMutex = class
private
// The wrapped mutex; initialised in Create, destroyed in Destroy.
FMutex: pthread_mutex_t;
public
constructor Create;
destructor Destroy; override;
// Locks the mutex (pthread_mutex_lock).
procedure Acquire;
// Unlocks the mutex (pthread_mutex_unlock).
procedure Release;
// NOTE(review): this property is typed as the record itself, not a pointer;
// confirm how callers that need the mutex address obtain it.
property Handle: pthread_mutex_t read FMutex;
end;
{ TGuard }
// Scoped lock helper: acquires a TMutex on construction and releases it on
// destruction while FOwner is set.
TGuard = class
protected
// Mutex being guarded; stays nil when Create was given nil.
FMutex: TMutex;
// True while this guard considers itself the holder of the lock.
FOwner: Boolean;
public
// Acquires AMutex immediately when it is assigned.
constructor Create(AMutex: TMutex);
// Releases the mutex if FOwner is set.
destructor Destroy; override;
// Locks the guarded mutex and marks the guard as owner.
procedure Acquire;
// Unlocks the guarded mutex.
procedure Release;
end;
{ TThreadCondition }
// Condition variable bound to an externally owned TMutex.
TThreadCondition = class
protected
// Mutex used for waits; supplied by the creator and not freed by this class.
FMutex: TMutex;
// The wrapped condition variable; initialised in Create, destroyed in Destroy.
FCondition: pthread_cond_t;
public
constructor Create(AMutex: TMutex);
destructor Destroy; override;
// Waits on the condition.
// NOTE(review): the implementation in this unit does not use Timeout.
procedure Wait(Timeout: Cardinal);
// Wakes one waiter (pthread_cond_signal).
procedure Notify;
end;
{ TThreadSet }
// Unbound thread set
// Tracks which thread currently holds the leader role of a Leader/Followers
// pool and lets the remaining threads wait for their turn.
TThreadSet = class
protected
// Id of the current leader thread, or NO_CURRENT_LEADER.
FThreadLeader: Cardinal;
// Condition that followers wait on inside Join.
FThreadCondition: TThreadCondition;
// Mutex protecting FThreadLeader; owned (created and freed) by this object.
FMutex: TMutex;
public
constructor Create;
destructor Destroy; override;
// Blocks until no leader is set, then records the calling thread as leader.
function Join(Timeout: Cardinal): Boolean; virtual;
// Clears the leader (if the caller is the leader) and notifies one waiter.
function PromoteNewLeader: Boolean; virtual;
property Mutex: TMutex read FMutex;
end;
implementation
{ TPThread }
// Getter of the Suspended property: reports the cached suspend flag.
function TPThread.GetSuspended: Boolean;
begin
  Exit(FSuspended);
end;
// Setter of the Suspended property.
//
// Updates both the object's flag and the copy inside the parameter block that
// the thread routine reads. When the thread is being resumed (AValue = False)
// the condition variable is signalled so a waiting routine wakes up.
//
// The flag change and the signal happen while the thread's mutex is held.
// Without the lock, the routine could test Suspended (True), then this method
// could clear the flag and signal before the routine reaches
// pthread_cond_wait, and the wakeup would be lost.
procedure TPThread.SetSuspended(const AValue: Boolean);
begin
  if FSuspended = AValue then
    Exit;

  if FTerminated then
  begin
    // Terminate has already destroyed the mutex and condition variable, so
    // only record the value and leave the destroyed primitives alone.
    FSuspended := AValue;
    FPThreadParameter.Suspended := AValue;
    Exit;
  end;

  pthread_mutex_lock(FMutex);
  try
    FSuspended := AValue;
    FPThreadParameter.Suspended := AValue;
    if not AValue then
      pthread_cond_signal(FCondition);
  finally
    pthread_mutex_unlock(FMutex);
  end;
end;
// Creates the synchronisation primitives and starts the thread.
//
// CreateSuspended - initial value of the Suspended flag seen by the routine.
// StartRoutine    - thread entry point; it receives the address of
//                   FPThreadParameter as its argument.
//
// Raises Exception when pthread_create reports an error.
constructor TPThread.Create(CreateSuspended: Boolean; StartRoutine: TStartRoutine);
var
  ErrorCode: LongInt;
begin
  inherited Create;
  FStartRoutine := StartRoutine;
  pthread_mutex_init(@FMutex, nil);
  pthread_cond_init(@FCondition, nil);
  FPThreadParameter.Mutex := @FMutex;
  FPThreadParameter.Condition := @FCondition;
  FPThreadParameter.Terminated := False;
  // Publish the suspend state BEFORE the thread exists; setting it afterwards
  // would let the routine run for a while even though it was requested to
  // start suspended.
  FSuspended := CreateSuspended;
  FPThreadParameter.Suspended := CreateSuspended;

  // pthread_create returns 0 on success and a positive error number on
  // failure, so the result has to be compared against 0 (a "< 0" test never
  // detects a failure).
  ErrorCode := pthread_create(@FThread, nil, FStartRoutine, @FPThreadParameter);
  if ErrorCode <> 0 then
  begin
    // No thread is running: release the primitives here and mark the object
    // as terminated so the destructor (which runs automatically when a
    // constructor raises) does not call Terminate on a half-built object.
    pthread_cond_destroy(FCondition);
    pthread_mutex_destroy(FMutex);
    FTerminated := True;
    FPThreadParameter.Terminated := True;
    raise Exception.CreateFmt('pthread_create failed (error %d)', [ErrorCode]);
  end;
end;
// Makes sure Terminate has run exactly once before the object goes away.
destructor TPThread.Destroy;
begin
  if not FTerminated then
  begin
    Terminate;
  end;
  inherited Destroy;
end;
// Requests termination, waits for the thread to finish and then releases the
// mutex and condition variable.
//
// The call blocks until the start routine returns, so the routine must poll
// the Terminated flag of its parameter block (and must not keep waiting on
// the condition once that flag is set).
//
// Calling Terminate more than once is a no-op after the first call.
procedure TPThread.Terminate;
begin
  if FTerminated then
    Exit;

  // Set the flags and signal under the mutex so a routine that is between
  // its "suspended?" test and pthread_cond_wait cannot miss the wakeup.
  pthread_mutex_lock(FMutex);
  FTerminated := True;
  FPThreadParameter.Terminated := True;
  pthread_cond_signal(FCondition);
  pthread_mutex_unlock(FMutex);

  // Destroying a condition variable or mutex that another thread is still
  // blocked on / holding is undefined behaviour, so wait for the thread to
  // exit first. This also reclaims the thread's resources.
  // A zero handle means no thread id was ever stored in FThread (object
  // fields start zeroed), in which case there is nothing to join.
  if PtrUInt(FThread) <> 0 then
    pthread_join(FThread, nil);

  pthread_cond_destroy(FCondition);
  pthread_mutex_destroy(FMutex);
end;
{ TThreadSet }
// Builds an empty thread set: a fresh mutex, a condition bound to that mutex,
// and no leader recorded.
constructor TThreadSet.Create;
begin
  inherited Create;
  FMutex := TMutex.Create;
  FThreadCondition := TThreadCondition.Create(FMutex);
  FThreadLeader := NO_CURRENT_LEADER;
end;
// Frees the condition first and the mutex it is bound to afterwards.
destructor TThreadSet.Destroy;
begin
  FreeAndNil(FThreadCondition);
  FreeAndNil(FMutex);
  inherited Destroy;
end;
// Blocks the calling thread until no leader is registered, then registers the
// caller as the new leader.
//
// Timeout - forwarded to TThreadCondition.Wait.
// Returns True once the caller has become the leader.
//
// NOTE(review): the thread id is narrowed to Cardinal because that is the
// type of FThreadLeader; on targets where pthread_t is wider than 32 bits
// the stored value is truncated -- confirm this is acceptable.
function TThreadSet.Join(Timeout: Cardinal): Boolean;
var
  Guard: TGuard;
begin
  Result := False;
  Guard := TGuard.Create(FMutex);
  try
    // Re-test after every wakeup: another follower may have taken the
    // leader role first.
    while FThreadLeader <> NO_CURRENT_LEADER do
      FThreadCondition.Wait(Timeout);

    // Same cast as in PromoteNewLeader so both sides compare equal values.
    FThreadLeader := Cardinal(pthread_self);
    Result := True;
  finally
    // The guard's destructor unlocks the mutex. Releasing explicitly before
    // this point as well would unlock the mutex a second time.
    Guard.Free;
  end;
end;
// Gives up the leader role and wakes one waiting follower.
//
// Returns True when the caller was the leader and the role was released;
// False when the caller is not the current leader or an exception occurred.
//
// The caller must NOT already hold Mutex when calling this method: the mutex
// is taken here and is created with default (non-recursive) attributes.
function TThreadSet.PromoteNewLeader: Boolean;
begin
  Result := False;
  try
    // FThreadLeader is read and written by Join under FMutex. Taking the
    // same lock here prevents a follower from testing the leader field and
    // then missing the notification sent between its test and its wait.
    FMutex.Acquire;
    try
      if FThreadLeader = Cardinal(pthread_self) then
      begin
        FThreadLeader := NO_CURRENT_LEADER;
        FThreadCondition.Notify;
        Result := True;
      end;
    finally
      FMutex.Release;
    end;
  except
    Result := False;
  end;
end;
{ TMutex }
// Initialises the wrapped mutex with default attributes (nil attribute pointer).
constructor TMutex.Create;
begin
  inherited Create;
  pthread_mutex_init(@FMutex, nil);
end;
// Destroys the wrapped pthread mutex; the result of pthread_mutex_destroy is
// not checked.
destructor TMutex.Destroy;
begin
pthread_mutex_destroy(FMutex);
inherited Destroy;
end;
// Locks the wrapped mutex via pthread_mutex_lock; the result code is ignored.
procedure TMutex.Acquire;
begin
pthread_mutex_lock(FMutex);
end;
// Unlocks the wrapped mutex via pthread_mutex_unlock; the result code is ignored.
procedure TMutex.Release;
begin
pthread_mutex_unlock(FMutex);
end;
{ TThreadCondition }
// Remembers the mutex to wait with and initialises the condition variable
// with default attributes. AMutex may be nil, in which case FMutex stays nil.
constructor TThreadCondition.Create(AMutex: TMutex);
begin
  inherited Create;
  // Fields of a new object start out as nil, so a plain assignment gives the
  // same result as assigning only when AMutex is set.
  FMutex := AMutex;
  pthread_cond_init(@FCondition, nil);
end;
// Destroys the condition variable. The associated mutex is not freed here.
destructor TThreadCondition.Destroy;
begin
pthread_cond_destroy(@FCondition);
inherited Destroy;
end;
// Blocks the caller on FCondition using pthread_cond_wait, passing the address
// of FMutex.Handle as the mutex argument.
// NOTE(review): Timeout is accepted but not used -- the wait is untimed.
// NOTE(review): FMutex is nil when the constructor received nil; this call
// would then fail -- confirm that callers always supply a mutex.
// NOTE(review): Handle is declared as a by-value property; verify that
// @FMutex.Handle yields the address of the TMutex's own mutex and not a copy.
procedure TThreadCondition.Wait(Timeout: Cardinal);
begin
pthread_cond_wait(@FCondition, @FMutex.Handle)
end;
// Signals the condition variable once (pthread_cond_signal); the result code
// is ignored.
procedure TThreadCondition.Notify;
begin
pthread_cond_signal(@FCondition);
end;
{ TGuard }
// Creates the guard and, when AMutex is assigned, locks it right away and
// marks the guard as owner. With a nil AMutex the guard stays empty.
constructor TGuard.Create(AMutex: TMutex);
begin
  inherited Create;
  if not Assigned(AMutex) then
    Exit;

  FMutex := AMutex;
  FOwner := True;
  FMutex.Acquire;
end;
// Unlocks the guarded mutex when this guard is flagged as its owner.
destructor TGuard.Destroy;
begin
  if FOwner then
  begin
    FMutex.Release;
  end;
  inherited Destroy;
end;
// Locks the guarded mutex and marks this guard as the owner.
//
// A guard constructed with a nil mutex has nothing to lock. The constructor
// accepts nil silently, so this method is a no-op in that case instead of
// dereferencing nil.
procedure TGuard.Acquire;
begin
  if not Assigned(FMutex) then
    Exit;
  FOwner := True;
  FMutex.Acquire;
end;
// Unlocks the guarded mutex and drops ownership.
//
// Ownership is cleared so that the destructor (which unlocks only while
// FOwner is set) does not unlock the mutex a second time after an explicit
// Release. Calling Release on a guard that does not hold the lock, or that
// has no mutex, does nothing.
procedure TGuard.Release;
begin
  if not (FOwner and Assigned(FMutex)) then
    Exit;
  FOwner := False;
  FMutex.Release;
end;
end.
상기 소스에서 TPThread 는 Delphi 와의 호환성을 위하여 추가하였다.
이때 해당 StartRoutine 은 다음과 같이 작성한다.
// thread function
// Sample start routine for TPThread.
//
// Param - the PPThreadParmater that TPThread.Create passes to pthread_create.
// Returns 0 when the loop ends because Terminated was set.
//
// The routine loops until Terminated becomes True and parks itself on the
// condition variable while Suspended is True.
function ThreadServerFunc(Param: Pointer): cint; cdecl;
var
  ThreadParam: PPThreadParmater;
begin
  Result := 0;
  ThreadParam := PPThreadParmater(Param);
  while not ThreadParam^.Terminated do
  begin
    pthread_mutex_lock(ThreadParam^.Mutex);
    // Use a loop rather than a single test: pthread_cond_wait may return
    // without a matching signal, so the predicate has to be re-checked.
    // Terminated is part of the predicate so a suspended thread still leaves
    // the wait when termination is requested.
    while ThreadParam^.Suspended and not ThreadParam^.Terminated do
      pthread_cond_wait(ThreadParam^.Condition, ThreadParam^.Mutex);
    pthread_mutex_unlock(ThreadParam^.Mutex);
  end;
end;
스레드 풀은 다음과 같이 구현한다.
// Worker loop run by each pool thread.
while True do
begin
// Block until this thread becomes the leader.
ThreadSet.Join(0);
// Leader-only section: wait for work, etc.
// Give up leadership so one waiting follower can take over.
ThreadSet.PromoteNewLeader;
// Run the processing logic.
end;
끝으로 POSA2 에서 설명하고 있는 Leader-Follower 패턴의 시퀀스 다이어그램~
반응형
'개발일지 > 아키텍트' 카테고리의 다른 글
Proactor with EPOLL, SELECT (2) | 2012.03.15 |
---|---|
Delphi 에서 Leader-Follower 패턴 구현 (0) | 2012.03.14 |
POSA, 패턴 시스템 (0) | 2012.02.14 |
Proactor with IOCP (4) | 2012.02.13 |
POSA2, ACT 패턴 (0) | 2012.02.08 |
댓글