Commit bbaeccbd4a93c25971050dd25099c516232a8dd9

Authored by Carlos Picanco
1 parent fef87696
Exists in master

finally fully working network

- fully implemented chat (PUSH>POLL>PULL>PUB>SUB)
- fully implemented request and replies (PUSH_REQ>POLL>PULL>REP)
cultural_matrix.lpi
@@ -38,7 +38,7 @@ @@ -38,7 +38,7 @@
38 <PackageName Value="LCL"/> 38 <PackageName Value="LCL"/>
39 </Item3> 39 </Item3>
40 </RequiredPackages> 40 </RequiredPackages>
41 - <Units Count="19"> 41 + <Units Count="17">
42 <Unit0> 42 <Unit0>
43 <Filename Value="cultural_matrix.lpr"/> 43 <Filename Value="cultural_matrix.lpr"/>
44 <IsPartOfProject Value="True"/> 44 <IsPartOfProject Value="True"/>
@@ -113,17 +113,9 @@ @@ -113,17 +113,9 @@
113 <IsPartOfProject Value="True"/> 113 <IsPartOfProject Value="True"/>
114 </Unit15> 114 </Unit15>
115 <Unit16> 115 <Unit16>
116 - <Filename Value="units/zmq_network.pas"/>  
117 - <IsPartOfProject Value="True"/>  
118 - </Unit16>  
119 - <Unit17>  
120 <Filename Value="units/zmq_network3.pas"/> 116 <Filename Value="units/zmq_network3.pas"/>
121 <IsPartOfProject Value="True"/> 117 <IsPartOfProject Value="True"/>
122 - </Unit17>  
123 - <Unit18>  
124 - <Filename Value="units/game_zmq_actors.pas"/>  
125 - <IsPartOfProject Value="True"/>  
126 - </Unit18> 118 + </Unit16>
127 </Units> 119 </Units>
128 </ProjectOptions> 120 </ProjectOptions>
129 <CompilerOptions> 121 <CompilerOptions>
cultural_matrix.lpr
@@ -40,7 +40,7 @@ begin @@ -40,7 +40,7 @@ begin
40 if FileExists(F) then 40 if FileExists(F) then
41 try 41 try
42 ID.LoadFromFile(F); 42 ID.LoadFromFile(F);
43 - F := ID.Text; 43 + F := Copy(ID.Text,0,Length(ID.Text)-2);
44 finally 44 finally
45 ID.Free; 45 ID.Free;
46 end 46 end
@@ -48,7 +48,7 @@ begin @@ -48,7 +48,7 @@ begin
48 try 48 try
49 ID.Text := s_random(32); 49 ID.Text := s_random(32);
50 ID.SaveToFile(F); 50 ID.SaveToFile(F);
51 - F := ID.Text; 51 + F := Copy(ID.Text,0,Length(ID.Text)-2);
52 except 52 except
53 on E: Exception do 53 on E: Exception do
54 begin 54 begin
form_chooseactor.lfm
@@ -3,7 +3,7 @@ object FormChooseActor: TFormChooseActor @@ -3,7 +3,7 @@ object FormChooseActor: TFormChooseActor
3 Height = 240 3 Height = 240
4 Top = 194 4 Top = 194
5 Width = 320 5 Width = 320
6 - BorderStyle = bsNone 6 + BorderStyle = bsDialog
7 Caption = 'FormChooseActor' 7 Caption = 'FormChooseActor'
8 ClientHeight = 240 8 ClientHeight = 240
9 ClientWidth = 320 9 ClientWidth = 320
form_chooseactor.pas
@@ -71,7 +71,7 @@ end; @@ -71,7 +71,7 @@ end;
71 71
72 procedure TFormChooseActor.FormCreate(Sender: TObject); 72 procedure TFormChooseActor.FormCreate(Sender: TObject);
73 begin 73 begin
74 - FCanClose := False 74 + FCanClose := True;
75 end; 75 end;
76 76
77 procedure TFormChooseActor.SetStyle(AValue: string); 77 procedure TFormChooseActor.SetStyle(AValue: string);
form_matrixgame.pas
@@ -212,21 +212,20 @@ procedure TFormMatrixGame.SetGameActor(AValue: TGameActor); @@ -212,21 +212,20 @@ procedure TFormMatrixGame.SetGameActor(AValue: TGameActor);
212 212
213 procedure SetZMQAdmin; 213 procedure SetZMQAdmin;
214 begin 214 begin
215 -  
216 - FGameControl := TGameControl.Create(TZMQAdmin.Create(Self),FID); 215 + FGameControl := TGameControl.Create(TZMQAdmin.Create(Self,FID));
217 GBAdmin.Visible:= True; 216 GBAdmin.Visible:= True;
218 end; 217 end;
219 218
220 procedure SetZMQPlayer; 219 procedure SetZMQPlayer;
221 begin 220 begin
222 - FGameControl := TGameControl.Create(TZMQPlayer.Create(Self),FID); 221 + FGameControl := TGameControl.Create(TZMQPlayer.Create(Self,FID));
223 btnConfirmRow.Visible := True; 222 btnConfirmRow.Visible := True;
224 StringGridMatrix.Enabled := True; 223 StringGridMatrix.Enabled := True;
225 end; 224 end;
226 225
227 procedure SetZMQWatcher; 226 procedure SetZMQWatcher;
228 begin 227 begin
229 - FGameControl := TGameControl.Create(TZMQWatcher.Create(Self),FID); 228 + //FGameControl := TGameControl.Create(TZMQWatcher.Create(Self,FID));
230 end; 229 end;
231 230
232 begin 231 begin
units/game_control.pas
@@ -43,7 +43,7 @@ type @@ -43,7 +43,7 @@ type
43 procedure SetRowBase(AValue: integer); 43 procedure SetRowBase(AValue: integer);
44 procedure SendSystemMessage(AMessage: array of UTF8String); 44 procedure SendSystemMessage(AMessage: array of UTF8String);
45 public 45 public
46 - constructor Create(AZMQActor : TZMQActor;AID : string);overload; 46 + constructor Create(AOwner : TComponent);override;
47 destructor Destroy; override; 47 destructor Destroy; override;
48 procedure SetMatrix; 48 procedure SetMatrix;
49 procedure SendRequest(ARequest : UTF8string); 49 procedure SendRequest(ARequest : UTF8string);
@@ -129,6 +129,8 @@ end; @@ -129,6 +129,8 @@ end;
129 129
130 function TGameControl.MessageHas(const A_CONST: string; AMessage: TStringList): Boolean; 130 function TGameControl.MessageHas(const A_CONST: string; AMessage: TStringList): Boolean;
131 begin 131 begin
  132 + Result:= False;
  133 + if not Assigned(AMessage) then Exit;
132 Result := Pos(A_CONST,AMessage[0])>0; 134 Result := Pos(A_CONST,AMessage[0])>0;
133 end; 135 end;
134 136
@@ -210,13 +212,14 @@ end; @@ -210,13 +212,14 @@ end;
210 212
211 procedure TGameControl.SendSystemMessage(AMessage: array of UTF8String); 213 procedure TGameControl.SendSystemMessage(AMessage: array of UTF8String);
212 begin 214 begin
213 - TZMQAdmin(FZMQActor).SendMessage(AMessage); 215 + //TZMQAdmin(FZMQActor).SendMessage(AMessage);
214 end; 216 end;
215 217
216 -constructor TGameControl.Create(AZMQActor: TZMQActor; AID: string); 218 +constructor TGameControl.Create(AOwner: TComponent);
217 begin 219 begin
218 - inherited Create(AZMQActor.Owner);  
219 - FZMQActor := AZMQActor; 220 + FZMQActor := TZMQActor(AOwner);
  221 + inherited Create(FZMQActor.Owner);
  222 + FID := FZMQActor.ID;
220 FZMQActor.OnMessageReceived:=@ReceiveMessage; 223 FZMQActor.OnMessageReceived:=@ReceiveMessage;
221 FZMQActor.OnRequestReceived:=@ReceiveRequest; 224 FZMQActor.OnRequestReceived:=@ReceiveRequest;
222 FZMQActor.OnReplyReceived:=@ReceiveReply; 225 FZMQActor.OnReplyReceived:=@ReceiveReply;
@@ -233,10 +236,7 @@ begin @@ -233,10 +236,7 @@ begin
233 MustDrawDots:=False; 236 MustDrawDots:=False;
234 MustDrawDotsClear:=False; 237 MustDrawDotsClear:=False;
235 238
236 - FZMQActor.SetID(AID);  
237 - FID := AID;  
238 -  
239 - FExperiment := TExperiment.Create(AZMQActor.Owner); 239 + FExperiment := TExperiment.Create(FZMQActor.Owner);
240 SendRequest(K_LOGIN); 240 SendRequest(K_LOGIN);
241 end; 241 end;
242 242
@@ -251,33 +251,55 @@ begin @@ -251,33 +251,55 @@ begin
251 end; 251 end;
252 252
253 procedure TGameControl.SendRequest(ARequest: UTF8string); 253 procedure TGameControl.SendRequest(ARequest: UTF8string);
  254 +var
  255 + M : array of UTF8String;
  256 +
  257 + procedure SetM(A : array of UTF8String);
  258 + var i : integer;
  259 + begin
  260 + SetLength(M,Length(A));
  261 + for i := 0 to Length(A) -1 do
  262 + M[i] := A[i];
  263 + end;
254 begin 264 begin
  265 + case ARequest of
  266 + K_LOGIN : SetM([
  267 + FZMQActor.ID
  268 + , ' '
  269 + , ARequest
  270 + ]);
  271 + end;
255 272
  273 + case FActor of
  274 + gaAdmin: begin
  275 + M[2] := GA_ADMIN+M[2];
  276 + end;
  277 + gaPlayer:begin
  278 + M[2] := GA_PLAYER+M[2];
  279 + end;
  280 + //gaWatcher:begin // for now cannot SendMessages
  281 + // M[0] := GA_WATCHER+M[0];
  282 + end;
  283 + FZMQActor.Request(M);
256 end; 284 end;
257 285
258 286
259 procedure TGameControl.SendMessage(AMessage: UTF8string); 287 procedure TGameControl.SendMessage(AMessage: UTF8string);
260 var 288 var
261 -{$IFDEF DEBUG}  
262 - i : integer;  
263 -{$ENDIF}  
264 - M : array of UTF8string; 289 + M : array of UTF8String;
265 290
266 - procedure SetM(A: array of UTF8String); 291 + procedure SetM(A : array of UTF8String);
267 var i : integer; 292 var i : integer;
268 begin 293 begin
269 SetLength(M,Length(A)); 294 SetLength(M,Length(A));
270 for i := 0 to Length(A) -1 do 295 for i := 0 to Length(A) -1 do
271 M[i] := A[i]; 296 M[i] := A[i];
272 end; 297 end;
273 -  
274 begin 298 begin
275 case AMessage of 299 case AMessage of
276 K_ARRIVED : SetM([ 300 K_ARRIVED : SetM([
277 AMessage 301 AMessage
278 , FZMQActor.ID 302 , FZMQActor.ID
279 - //, FZMQActor.ClassType.ClassName;  
280 - //,  
281 ]); 303 ]);
282 304
283 K_CHOICE : SetM([ 305 K_CHOICE : SetM([
@@ -318,15 +340,13 @@ begin @@ -318,15 +340,13 @@ begin
318 // M[0] := GA_WATCHER+M[0]; 340 // M[0] := GA_WATCHER+M[0];
319 end; 341 end;
320 FZMQActor.SendMessage(M); 342 FZMQActor.SendMessage(M);
321 -  
322 -{$IFDEF DEBUG}  
323 - for i := 0 to Length(M)-1 do  
324 - WriteLn(M[i]);  
325 -{$ENDIF}  
326 end; 343 end;
327 344
328 procedure TGameControl.ReceiveMessage(AMessage: TStringList); 345 procedure TGameControl.ReceiveMessage(AMessage: TStringList);
329 - 346 +{$IFDEF DEBUG}
  347 +var
  348 + i : integer;
  349 +{$ENDIF}
330 function MHas(const C : string) : Boolean; 350 function MHas(const C : string) : Boolean;
331 begin 351 begin
332 Result := MessageHas(C,AMessage); 352 Result := MessageHas(C,AMessage);
@@ -508,17 +528,51 @@ begin @@ -508,17 +528,51 @@ begin
508 if MHas(K_LEFT) then SayGoodBye; 528 if MHas(K_LEFT) then SayGoodBye;
509 if MHas(K_RESUME) then ResumeActor; 529 if MHas(K_RESUME) then ResumeActor;
510 if MHas(K_STATUS) then ReceiveStatus; 530 if MHas(K_STATUS) then ReceiveStatus;
  531 +
  532 + {$IFDEF DEBUG}
  533 + AMessage.Append('MessageReceived');
  534 + for i:= 0 to AMessage.Count-1 do
  535 + WriteLn(AMessage[i]);
  536 + {$ENDIF}
511 end; 537 end;
512 538
513 procedure TGameControl.ReceiveRequest(var ARequest: TStringList); 539 procedure TGameControl.ReceiveRequest(var ARequest: TStringList);
  540 +{$IFDEF DEBUG}
  541 +var
  542 + i : integer;
  543 +{$ENDIF}
  544 + function MHas(const C : string) : Boolean;
  545 + begin
  546 + Result := MessageHas(C,ARequest);
  547 + end;
  548 +
  549 + procedure ReplyLogin;
  550 + begin
  551 +
  552 + end;
514 begin 553 begin
  554 + if MHas(K_LOGIN) then ReplyLogin;
515 555
  556 + {$IFDEF DEBUG}
  557 + ARequest.Append('RequestReceived');
  558 + for i:= 0 to ARequest.Count-1 do
  559 + WriteLn(ARequest[i]);
  560 + {$ENDIF}
516 end; 561 end;
517 562
  563 +
  564 +// player
518 procedure TGameControl.ReceiveReply(AReply: TStringList); 565 procedure TGameControl.ReceiveReply(AReply: TStringList);
  566 +var i: integer;
519 begin 567 begin
520 - 568 + {$IFDEF DEBUG}
  569 + AReply.Append('ReplyReceived');
  570 + for i:= 0 to AReply.Count-1 do
  571 + WriteLn(AReply[i]);
  572 + {$ENDIF}
521 end; 573 end;
522 574
  575 +
  576 +
523 end. 577 end.
524 578
units/game_zmq_actors.pas
@@ -16,18 +16,17 @@ type @@ -16,18 +16,17 @@ type
16 16
17 TZMQActor = class(TComponent) 17 TZMQActor = class(TComponent)
18 private 18 private
19 - FID: UTF8string;  
20 FOnMessageReceived : TMessRecvProc; 19 FOnMessageReceived : TMessRecvProc;
21 FOnReplyReceived: TMessRecvProc; 20 FOnReplyReceived: TMessRecvProc;
22 FOnRequestReceived: TReqRecvProc; 21 FOnRequestReceived: TReqRecvProc;
23 protected 22 protected
  23 + FID: UTF8string;
24 procedure MessageReceived(AMultipartMessage : TStringList); 24 procedure MessageReceived(AMultipartMessage : TStringList);
25 - procedure ReplyReceived(AMultipartMessage : TStringList); virtual;  
26 - procedure RequestReceived(var AMultipartMessage : TStringList); virtual; 25 + procedure ReplyReceived(AMultipartMessage : TStringList);
  26 + procedure RequestReceived(var AMultipartMessage : TStringList);
27 public 27 public
28 - constructor Create(AOwner : TComponent); override; 28 + constructor Create(AOwner : TComponent; AID : UTF8String); virtual; overload;
29 procedure Start; virtual; 29 procedure Start; virtual;
30 - procedure SetID(S:string); virtual;  
31 procedure SendMessage(AMessage : array of UTF8string);virtual;abstract; 30 procedure SendMessage(AMessage : array of UTF8string);virtual;abstract;
32 procedure Request(ARequest : array of UTF8string);virtual;abstract; 31 procedure Request(ARequest : array of UTF8string);virtual;abstract;
33 property OnMessageReceived : TMessRecvProc read FOnMessageReceived write FOnMessageReceived; 32 property OnMessageReceived : TMessRecvProc read FOnMessageReceived write FOnMessageReceived;
@@ -41,10 +40,8 @@ type @@ -41,10 +40,8 @@ type
41 TZMQPlayer = class(TZMQActor) 40 TZMQPlayer = class(TZMQActor)
42 private 41 private
43 FZMQClient : TZMQClientThread; 42 FZMQClient : TZMQClientThread;
44 - protected  
45 - procedure ReplyReceived(AMultipartMessage: TStringList); override;  
46 public 43 public
47 - constructor Create(AOwner : TComponent); override; 44 + constructor Create(AOwner : TComponent; AID : UTF8String); override;
48 destructor Destroy; override; 45 destructor Destroy; override;
49 procedure Start; override; 46 procedure Start; override;
50 procedure SendMessage(AMessage : array of UTF8string); override; 47 procedure SendMessage(AMessage : array of UTF8string); override;
@@ -56,10 +53,8 @@ type @@ -56,10 +53,8 @@ type
56 TZMQAdmin = class(TZMQActor) 53 TZMQAdmin = class(TZMQActor)
57 private 54 private
58 FZMQServer : TZMQServerThread; 55 FZMQServer : TZMQServerThread;
59 - protected  
60 - procedure RequestReceived(var AMultipartMessage: TStringList); override;  
61 public 56 public
62 - constructor Create(AOwner : TComponent); override; 57 + constructor Create(AOwner : TComponent; AID : UTF8String); override;
63 destructor Destroy; override; 58 destructor Destroy; override;
64 procedure Start; override; 59 procedure Start; override;
65 procedure SendMessage(AMessage: array of UTF8string); override; 60 procedure SendMessage(AMessage: array of UTF8string); override;
@@ -85,12 +80,14 @@ end; @@ -85,12 +80,14 @@ end;
85 80
86 { TZMQAdmin } 81 { TZMQAdmin }
87 82
88 -constructor TZMQAdmin.Create(AOwner: TComponent); 83 +constructor TZMQAdmin.Create(AOwner: TComponent; AID: UTF8String);
89 begin 84 begin
90 inherited Create(AOwner); 85 inherited Create(AOwner);
91 - FZMQServer := TZMQServerThread.Create; 86 + FID:=AID;
  87 + FZMQServer := TZMQServerThread.Create(AID);
92 FZMQServer.OnMessageReceived:=@MessageReceived; 88 FZMQServer.OnMessageReceived:=@MessageReceived;
93 FZMQServer.OnRequestReceived:=@RequestReceived; 89 FZMQServer.OnRequestReceived:=@RequestReceived;
  90 +
94 end; 91 end;
95 92
96 destructor TZMQAdmin.Destroy; 93 destructor TZMQAdmin.Destroy;
@@ -109,13 +106,9 @@ begin @@ -109,13 +106,9 @@ begin
109 // do nothing, you are the server 106 // do nothing, you are the server
110 end; 107 end;
111 108
112 -procedure TZMQAdmin.RequestReceived(var AMultipartMessage: TStringList);  
113 -begin  
114 - if Assigned(FOnRequestReceived) then FOnRequestReceived(AMultipartMessage);  
115 -end;  
116 -  
117 procedure TZMQAdmin.Start; 109 procedure TZMQAdmin.Start;
118 begin 110 begin
  111 + inherited Start;
119 FZMQServer.Start; 112 FZMQServer.Start;
120 WriteLn('TZMQAdmin.Start'); 113 WriteLn('TZMQAdmin.Start');
121 end; 114 end;
@@ -132,15 +125,11 @@ begin @@ -132,15 +125,11 @@ begin
132 FZMQClient.Request(ARequest); 125 FZMQClient.Request(ARequest);
133 end; 126 end;
134 127
135 -procedure TZMQPlayer.ReplyReceived(AMultipartMessage: TStringList);  
136 -begin  
137 - if Assigned(FOnReplyReceived) then FOnReplyReceived(AMultipartMessage);  
138 -end;  
139 -  
140 -constructor TZMQPlayer.Create(AOwner: TComponent); 128 +constructor TZMQPlayer.Create(AOwner: TComponent; AID: UTF8String);
141 begin 129 begin
142 inherited Create(AOwner); 130 inherited Create(AOwner);
143 - FZMQClient := TZMQClientThread.Create; 131 + FID:=AID;
  132 + FZMQClient := TZMQClientThread.Create(AID);
144 FZMQClient.OnMessageReceived:=@MessageReceived; 133 FZMQClient.OnMessageReceived:=@MessageReceived;
145 FZMQClient.OnReplyReceived:=@ReplyReceived; 134 FZMQClient.OnReplyReceived:=@ReplyReceived;
146 end; 135 end;
@@ -160,11 +149,6 @@ end; @@ -160,11 +149,6 @@ end;
160 149
161 { TZMQActor } 150 { TZMQActor }
162 151
163 -procedure TZMQActor.SetID(S: string);  
164 -begin  
165 - FID := S;  
166 -end;  
167 -  
168 procedure TZMQActor.MessageReceived(AMultipartMessage: TStringList); 152 procedure TZMQActor.MessageReceived(AMultipartMessage: TStringList);
169 begin 153 begin
170 if Assigned(FOnMessageReceived) then FOnMessageReceived(AMultipartMessage); 154 if Assigned(FOnMessageReceived) then FOnMessageReceived(AMultipartMessage);
@@ -172,15 +156,15 @@ end; @@ -172,15 +156,15 @@ end;
172 156
173 procedure TZMQActor.ReplyReceived(AMultipartMessage: TStringList); 157 procedure TZMQActor.ReplyReceived(AMultipartMessage: TStringList);
174 begin 158 begin
175 - AbstractError; 159 + if Assigned(FOnReplyReceived) then FOnReplyReceived(AMultipartMessage);
176 end; 160 end;
177 161
178 procedure TZMQActor.RequestReceived(var AMultipartMessage: TStringList); 162 procedure TZMQActor.RequestReceived(var AMultipartMessage: TStringList);
179 begin 163 begin
180 - AbstractError; 164 + if Assigned(FOnRequestReceived) then FOnRequestReceived(AMultipartMessage);
181 end; 165 end;
182 166
183 -constructor TZMQActor.Create(AOwner: TComponent); 167 +constructor TZMQActor.Create(AOwner: TComponent; AID: UTF8String);
184 begin 168 begin
185 inherited Create(AOwner); 169 inherited Create(AOwner);
186 end; 170 end;
units/zmq_network.pas
@@ -31,8 +31,10 @@ type @@ -31,8 +31,10 @@ type
31 TZMQClientThread = class(TThread) 31 TZMQClientThread = class(TThread)
32 private 32 private
33 FContext : TZMQContext; 33 FContext : TZMQContext;
  34 + FID: shortstring;
34 FSubscriber, 35 FSubscriber,
35 - FPusher, 36 + FPusher_PUB,
  37 + FPusher_REQ,
36 FRequester : TZMQSocket; 38 FRequester : TZMQSocket;
37 FPoller : TZMQPoller; 39 FPoller : TZMQPoller;
38 FMessage : TStringList; 40 FMessage : TStringList;
@@ -42,25 +44,28 @@ type @@ -42,25 +44,28 @@ type
42 protected 44 protected
43 procedure Execute; override; 45 procedure Execute; override;
44 public 46 public
45 - constructor Create(CreateSuspended: Boolean = True); 47 + constructor Create(AID : UTF8String; CreateSuspended: Boolean = True); overload;
46 destructor Destroy; override; 48 destructor Destroy; override;
47 procedure Request(AMultipartMessage : array of UTF8String); 49 procedure Request(AMultipartMessage : array of UTF8String);
48 procedure Push(AMultipartMessage : array of UTF8String); 50 procedure Push(AMultipartMessage : array of UTF8String);
49 property OnMessageReceived : TMessRecvProc read FOnMessageReceived write FOnMessageReceived; 51 property OnMessageReceived : TMessRecvProc read FOnMessageReceived write FOnMessageReceived;
50 property OnReplyReceived : TMessRecvProc read FOnReplyReceived write FOnReplyReceived; 52 property OnReplyReceived : TMessRecvProc read FOnReplyReceived write FOnReplyReceived;
  53 + property ID :shortstring read FID;
51 end; 54 end;
52 55
53 { TZMQServerThread } 56 { TZMQServerThread }
54 57
55 TZMQServerThread = class(TThread) 58 TZMQServerThread = class(TThread)
56 private 59 private
  60 + FID: shortstring;
57 FOnMessageReceived: TMessRecvProc; 61 FOnMessageReceived: TMessRecvProc;
58 FOnRequestReceived: TReqRecvProc; 62 FOnRequestReceived: TReqRecvProc;
59 FContext : TZMQContext; 63 FContext : TZMQContext;
60 FPublisher, 64 FPublisher,
61 - FPuller,  
62 - FPusher,  
63 - FRouter, 65 + FPuller_PUB,
  66 + FPusher_PUB,
  67 + FPuller_REP,
  68 + //FRouter,
64 FReplier : TZMQSocket; 69 FReplier : TZMQSocket;
65 FPoller : TZMQPoller; 70 FPoller : TZMQPoller;
66 FMessage : TStringList; 71 FMessage : TStringList;
@@ -70,11 +75,12 @@ type @@ -70,11 +75,12 @@ type
70 protected 75 protected
71 procedure Execute; override; 76 procedure Execute; override;
72 public 77 public
73 - constructor Create(CreateSuspended: Boolean = True); 78 + constructor Create(AID : UTF8String; CreateSuspended: Boolean = True); overload;
74 destructor Destroy; override; 79 destructor Destroy; override;
75 procedure Push(AMultipartMessage: array of UTF8string); 80 procedure Push(AMultipartMessage: array of UTF8string);
76 property OnMessageReceived : TMessRecvProc read FOnMessageReceived write FOnMessageReceived; 81 property OnMessageReceived : TMessRecvProc read FOnMessageReceived write FOnMessageReceived;
77 property OnRequestReceived : TReqRecvProc read FOnRequestReceived write FOnRequestReceived; 82 property OnRequestReceived : TReqRecvProc read FOnRequestReceived write FOnRequestReceived;
  83 + property ID :shortstring read FID;
78 end; 84 end;
79 85
80 implementation 86 implementation
@@ -83,8 +89,10 @@ const @@ -83,8 +89,10 @@ const
83 CHost = 'tcp://*:'; 89 CHost = 'tcp://*:';
84 CLocalHost = 'tcp://localhost:'; 90 CLocalHost = 'tcp://localhost:';
85 CPortPublisher = '5056'; 91 CPortPublisher = '5056';
86 - CPortPuller = '5057';  
87 - CPortRouter = '5058'; 92 + CPortPuller_PUB = '5057';
  93 + CPortPuller_REP = '6057';
  94 + //CPortRouter = '5058';
  95 + CPortReplier = '5059';
88 96
89 97
90 { TZMQClientThread } 98 { TZMQClientThread }
@@ -108,20 +116,22 @@ begin @@ -108,20 +116,22 @@ begin
108 LPollEvent := FPoller.poll(50000); 116 LPollEvent := FPoller.poll(50000);
109 if LPollEvent > 0 then 117 if LPollEvent > 0 then
110 begin 118 begin
111 - WriteLn('Server4:FPoller:',FPoller.PollNumber);  
112 LMessagesCount := FSubscriber.recv(LMultipartMessage); 119 LMessagesCount := FSubscriber.recv(LMultipartMessage);
113 if LMessagesCount > 0 then 120 if LMessagesCount > 0 then
114 begin 121 begin
115 FMessage := LMultipartMessage; 122 FMessage := LMultipartMessage;
116 Synchronize(@MessageReceived); 123 Synchronize(@MessageReceived);
117 end; 124 end;
  125 + {$IFDEF DEBUG}
  126 + WriteLn('Server4:FPoller:',FPoller.PollNumber);
  127 + {$ENDIF}
118 end; 128 end;
119 end; 129 end;
120 LMultipartMessage.Free; 130 LMultipartMessage.Free;
121 end; 131 end;
122 132
123 133
124 -constructor TZMQClientThread.Create(CreateSuspended: Boolean); 134 +constructor TZMQClientThread.Create(AID: UTF8String; CreateSuspended: Boolean);
125 begin 135 begin
126 FreeOnTerminate := True; 136 FreeOnTerminate := True;
127 FContext := TZMQContext.create; 137 FContext := TZMQContext.create;
@@ -130,12 +140,17 @@ begin @@ -130,12 +140,17 @@ begin
130 FSubscriber := FContext.Socket( stSub ); 140 FSubscriber := FContext.Socket( stSub );
131 FSubscriber.connect(CLocalHost+CPortPublisher);FSubscriber.Subscribe(''); 141 FSubscriber.connect(CLocalHost+CPortPublisher);FSubscriber.Subscribe('');
132 // pushes to server 142 // pushes to server
133 - FPusher := FContext.Socket( stPush );  
134 - FPusher.connect(CLocalHost+CPortPuller); 143 + FPusher_PUB := FContext.Socket( stPush );
  144 + FPusher_PUB.connect(CLocalHost+CPortPuller_PUB);
  145 +
  146 + FPusher_REQ := FContext.Socket( stPush );
  147 + FPusher_REQ.connect(CLocalHost+CPortPuller_REP);
135 148
136 // request from server 149 // request from server
137 FRequester := FContext.Socket( stReq ); 150 FRequester := FContext.Socket( stReq );
138 - FRequester.connect(CLocalHost+CPortRouter); 151 + //FRequester.Identity := AID;
  152 + //FRequester.connect(CLocalHost+CPortRouter);
  153 + FRequester.connect(CLocalHost+CPortReplier);
139 154
140 // handle income messages 155 // handle income messages
141 FPoller := TZMQPoller.Create(True, FContext); 156 FPoller := TZMQPoller.Create(True, FContext);
@@ -148,7 +163,8 @@ destructor TZMQClientThread.Destroy; @@ -148,7 +163,8 @@ destructor TZMQClientThread.Destroy;
148 begin 163 begin
149 FPoller.Terminate; 164 FPoller.Terminate;
150 FPoller.Free; 165 FPoller.Free;
151 - FPusher.Free; 166 + FPusher_REQ.Free;
  167 + FPusher_PUB.Free;
152 FSubscriber.Free; 168 FSubscriber.Free;
153 FContext.Free; 169 FContext.Free;
154 inherited Destroy; 170 inherited Destroy;
@@ -158,15 +174,18 @@ procedure TZMQClientThread.Request(AMultipartMessage: array of UTF8String); @@ -158,15 +174,18 @@ procedure TZMQClientThread.Request(AMultipartMessage: array of UTF8String);
158 var AReply : TStringList; 174 var AReply : TStringList;
159 begin 175 begin
160 AReply:=TStringList.Create; 176 AReply:=TStringList.Create;
161 - FRequester.send( AMultipartMessage );  
162 - FRequester.recv( AReply ); 177 +
  178 + FPusher_REQ.send( AMultipartMessage ); // avoid infinite loops inside server pool
  179 + FRequester.send( '' ); // block client until server recv
  180 + FRequester.recv( AReply ); // release client
  181 +
163 if Assigned(FOnReplyReceived) then FOnReplyReceived(AReply); 182 if Assigned(FOnReplyReceived) then FOnReplyReceived(AReply);
164 AReply.Free; 183 AReply.Free;
165 end; 184 end;
166 185
167 procedure TZMQClientThread.Push(AMultipartMessage: array of UTF8String); 186 procedure TZMQClientThread.Push(AMultipartMessage: array of UTF8String);
168 begin 187 begin
169 - FPusher.send(AMultipartMessage); 188 + FPusher_PUB.send(AMultipartMessage);
170 end; 189 end;
171 190
172 191
@@ -189,12 +208,12 @@ end; @@ -189,12 +208,12 @@ end;
189 208
190 procedure TZMQServerThread.RequestReceived; 209 procedure TZMQServerThread.RequestReceived;
191 begin 210 begin
192 - if Assigned(FOnMessageReceived) then FOnMessageReceived(FMessage); 211 + if Assigned(FOnRequestReceived) then FOnRequestReceived(FMessage);
193 end; 212 end;
194 213
195 procedure TZMQServerThread.Execute; 214 procedure TZMQServerThread.Execute;
196 var 215 var
197 - LMultipartMessage : TStringList; 216 + LMultipartMessage, S : TStringList;
198 LPollCount, 217 LPollCount,
199 LMessagesCount : integer; 218 LMessagesCount : integer;
200 begin 219 begin
@@ -204,71 +223,74 @@ begin @@ -204,71 +223,74 @@ begin
204 LMultipartMessage := TStringList.Create; 223 LMultipartMessage := TStringList.Create;
205 while not Terminated do 224 while not Terminated do
206 begin 225 begin
207 - LMultipartMessage.Clear;  
208 - LPollCount := FPoller.poll(50000);  
209 - if LPollCount > 0 then 226 + LPollCount := FPoller.poll;
  227 + if LPollCount = 0 then Continue;
  228 + if pePollIn in FPoller.PollItem[0].revents then
210 begin 229 begin
211 - case FPoller.PollNumber of  
212 - 2 : begin// puller  
213 - {$IFDEF DEBUG}  
214 - WriteLn('Server2:');  
215 - {$ENDIF}  
216 - LMessagesCount := FPuller.recv(LMultipartMessage);  
217 - if LMessagesCount > 0 then  
218 - begin  
219 - FMessage := LMultipartMessage;  
220 - Synchronize(@MessageReceived);  
221 - FPublisher.send(LMultiPartMessage);  
222 - end;  
223 - end;  
224 -  
225 - 1 : begin//router  
226 - {$IFDEF DEBUG}  
227 - WriteLn('Server1:');  
228 - {$ENDIF}  
229 - // Exit;  
230 - if LMessagesCount > 2 then  
231 - begin  
232 - FRouter.recv(LMultipartMessage);  
233 - FMessage := LMultipartMessage;  
234 - Synchronize(@RequestReceived);  
235 - LMultipartMessage := FMessage;  
236 - FRouter.send(LMultipartMessage);  
237 - end;  
238 - end;  
239 - end; 230 + LMultipartMessage.Clear;
  231 + {$IFDEF DEBUG}
  232 + WriteLn('pull':LPollCount);
  233 + {$ENDIF}
  234 + LMessagesCount := FPuller_PUB.recv(LMultipartMessage);
  235 + if LMessagesCount > 0 then
  236 + begin
  237 + FMessage := LMultipartMessage;
  238 + Synchronize(@MessageReceived);
  239 + FPublisher.send(LMultiPartMessage);
  240 + end;
  241 + end;
240 242
  243 + if pePollIn in FPoller.PollItem[1].revents then
  244 + begin
  245 + LMultipartMessage.Clear;
  246 + {$IFDEF DEBUG}
  247 + WriteLn('rep:',LPollCount);
  248 + {$ENDIF}
  249 + LMessagesCount := FPuller_REP.recv(LMultipartMessage);
  250 + if LMessagesCount > 2 then
  251 + begin
  252 + FMessage := LMultipartMessage;
  253 + Synchronize(@RequestReceived); LMultipartMessage := FMessage; S := TStringList.Create;
  254 + FReplier.recv(S); S.Free;
  255 + FReplier.send(LMultipartMessage);
  256 + end;
241 end; 257 end;
242 end; 258 end;
243 end; 259 end;
244 260
245 -constructor TZMQServerThread.Create(CreateSuspended: Boolean); 261 +constructor TZMQServerThread.Create(AID: UTF8String; CreateSuspended: Boolean);
246 begin 262 begin
247 FreeOnTerminate := True; 263 FreeOnTerminate := True;
248 FContext := TZMQContext.create; 264 FContext := TZMQContext.create;
249 265
250 // publisher for subscribers 266 // publisher for subscribers
251 FPublisher := FContext.Socket( stPub ); // server don't need to subscribe to itself 267 FPublisher := FContext.Socket( stPub ); // server don't need to subscribe to itself
  268 + FPublisher.bind(CHost+CPortPublisher);
252 269
253 // pull from inside and outside 270 // pull from inside and outside
254 - FPuller := FContext.Socket( stPull ); 271 + FPuller_PUB := FContext.Socket( stPull );
  272 + FPuller_PUB.bind(CHost+CPortPuller_PUB);
255 273
256 // pushes from inside to outside 274 // pushes from inside to outside
257 - FPusher := FContext.Socket( stPush );  
258 - FPusher.connect(CLocalHost+CPortPuller); 275 + FPusher_PUB := FContext.Socket( stPush );
  276 + FPusher_PUB.connect(CLocalHost+CPortPuller_PUB);
259 277
260 // reply requests from outside 278 // reply requests from outside
261 - FRouter := FContext.Socket( stRouter ); 279 + FPuller_REP := FContext.Socket( stPull );
  280 + FPuller_REP.bind(CHost+CPortPuller_REP);
  281 + //FRouter := FContext.Socket( stRouter );
  282 + //FRouter.Identity:=AID;
  283 + //FRouter.bind(CHost+CPortRouter);
262 284
263 - // local setup  
264 - FPublisher.bind(CHost+CPortPublisher);  
265 - FPuller.bind(CHost+CPortPuller);  
266 - FRouter.bind(CHost+CPortRouter); 285 + // blocking server thread for now
  286 + FReplier := FContext.Socket( stRep );
  287 + FReplier.bind(CHost+CPortReplier);
267 288
268 // handle sockets 289 // handle sockets
269 FPoller := TZMQPoller.Create(True, FContext); 290 FPoller := TZMQPoller.Create(True, FContext);
270 - FPoller.Register(FPuller,[pePollIn],True);  
271 - FPoller.Register(FRouter, [pePollIn], True); 291 + FPoller.Register(FPuller_PUB,[pePollIn],True);
  292 + FPoller.Register(FPuller_REP,[pePollIn],True);
  293 + //FPoller.Register(FRouter, [pePollIn], True);
272 294
273 inherited Create(CreateSuspended); 295 inherited Create(CreateSuspended);
274 end; 296 end;
@@ -277,9 +299,10 @@ destructor TZMQServerThread.Destroy; @@ -277,9 +299,10 @@ destructor TZMQServerThread.Destroy;
277 begin 299 begin
278 FPoller.Terminate; 300 FPoller.Terminate;
279 FPoller.Free; 301 FPoller.Free;
280 - FRouter.Free;  
281 - FPusher.Free;  
282 - FPuller.Free; 302 + //FRouter.Free;
  303 + FPuller_REP.Free;
  304 + FPusher_PUB.Free;
  305 + FPuller_PUB.Free;
283 FPublisher.Free; 306 FPublisher.Free;
284 FContext.Free; 307 FContext.Free;
285 inherited Destroy; 308 inherited Destroy;
@@ -287,7 +310,7 @@ end; @@ -287,7 +310,7 @@ end;
287 310
288 procedure TZMQServerThread.Push(AMultipartMessage: array of UTF8string); 311 procedure TZMQServerThread.Push(AMultipartMessage: array of UTF8string);
289 begin 312 begin
290 - FPusher.send(AMultipartMessage); 313 + FPusher_PUB.send(AMultipartMessage);
291 end; 314 end;
292 315
293 316