echo_server.cpp
3.45 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
#include <chrono>
#include <future>
#include <iostream>
#include <list>
#include <memory>
#include <sstream>
#include <string.h>
#include <thread>
#include <utility>
#include "socket.h"
#include "config.h"
void threadManager(std::list<std::pair<std::shared_ptr<std::atomic<bool>>, std::unique_ptr<std::thread>>> & threads)
{
std::cout << "iniciei gerenciador de threads" << std::endl;
while(true)
{
std::this_thread::sleep_for(std::chrono::seconds(30));
std::cout << "realizando limpeza das threads (size = " << threads.size() << ")" << std::endl;
int i = 0;
std::list<std::pair<std::shared_ptr<std::atomic<bool>>, std::unique_ptr<std::thread>>>::iterator it;
for(it = threads.begin(); it != threads.end(); ++i)
{
if ((*(it->first)))
{
(it->first).reset();
(it->second)->join();
(it->second).reset();
it = threads.erase(it);
std::cout << "removi thread inutilizada " << (i+1) << std::endl;
}
else
{
++it;
}
}
}
}
void echoServerTask(std::unique_ptr<lemoce::ClientSocket> client_ptr,
std::shared_ptr<std::atomic<bool>> done)
{
lemoce::ClientSocket client = *client_ptr;
std::thread::id threadId = std::this_thread::get_id();
try
{
while(!client.isClosed())
{
std::unique_ptr<std::string> message_ptr = client.read();
if ((*message_ptr).size() == 0)
{
client.close();
break;
}
std::string * message = message_ptr.get();
client.write(*message);
std::cout << "enviada mensagem para filho " << threadId <<
" (" << client.getLocalHost() << ", " <<
client.getLocalPort() << ")" << std::endl;
}
}
catch (lemoce::SocketException & ex)
{
client.close();
}
std::cout << "terminei a thread" << std::endl;
(*done) = true;
}
int main(int argc, char *argv[])
{
int MAX_BUFFER = 8192;
int port = 7;
if (argc == 2)
{
std::istringstream iss{argv[1]};
iss >> port;
}
try
{
lemoce::ServerSocket server{port};
server.listen();
std::list<
std::pair<std::shared_ptr<std::atomic<bool>>,
std::unique_ptr<std::thread>>> threads;
std::thread manager{threadManager, std::ref(threads)};
for(;;)
{
std::unique_ptr<lemoce::ClientSocket> client_ptr{new lemoce::ClientSocket{}};
try
{
server.accept((*client_ptr));
}
catch (lemoce::SocketException & ex)
{
if (ex.getErrno() == EINTR)
{
(*client_ptr).close();
std::cout << "fechei conexao pela excecao de interrupcao" << std::endl;
continue;
}
else
{
throw ex;
}
}
std::shared_ptr<std::atomic<bool>> done{new std::atomic<bool>(false)};
std::unique_ptr<std::thread> thread{new std::thread
{
echoServerTask,
std::move(client_ptr),
done
}};
threads.push_back(std::make_pair(done, std::move(thread)));
}
}
catch (lemoce::SocketException& ex)
{
std::cout << ex.getMessage() << std::endl;
}
return 0;
}