1#include <QLoggingCategory>
2#include <QStandardPaths>
10 #if defined(HAVE_UNIX_DOMAIN_SOCKET)
19 #define close closesocket
23 #include <sys/eventfd.h>
25 #include <sys/types.h>
26 #include <sys/socket.h>
29 #include <netinet/in.h>
30 #include <netinet/tcp.h>
31 #include <arpa/inet.h>
33 #if defined(HAVE_UNIX_DOMAIN_SOCKET)
36 #define INVALID_SOCKET -1
37 #define SOCKET_ERROR -1
42static Q_LOGGING_CATEGORY(log,
"Channel.Event")
44CEvent::CEvent(QObject *parent)
47 qDebug(log) <<
"CEvent::CEvent";
48 fd[0] = INVALID_SOCKET;
49 fd[1] = INVALID_SOCKET;
55 qDebug(log) <<
"Event::~Event()";
62 qDebug(log) <<
"Use eventfd";
63 fd[0] = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC | EFD_SEMAPHORE);
65 return CreateSocketPair(fd);
72 if(INVALID_SOCKET != fd[0]) {
74 fd[0] = INVALID_SOCKET;
77 if(INVALID_SOCKET != fd[1]) {
79 fd[1] = INVALID_SOCKET;
87 if(INVALID_SOCKET == fd[0])
93 nRet = eventfd_read(fd[0], &value);
96 if(EAGAIN == errno || EWOULDBLOCK == errno)
98 qDebug(log) <<
"eventfd_read fail" << errno <<
"-" << strerror(errno);
103 nRet = recv(fd[0], buf, len, 0);
109 if(EAGAIN == errno || EWOULDBLOCK == errno)
111 qDebug(log) <<
"recv fail. nRet:" << nRet <<
"fd:" << fd[0]
112 <<
"error:" << errno <<
"-" << strerror(errno);
123 if(INVALID_SOCKET == fd[0])
125 nRet = eventfd_write(fd[0], 1);
127 if(EAGAIN == errno || EWOULDBLOCK == errno)
129 qDebug(log) <<
"eventfd_write fail" << errno <<
"-" << strerror(errno);
132 if(INVALID_SOCKET == fd[1])
136 nRet = send(fd[1], buf, len, 0);
142 if(EAGAIN == errno || EWOULDBLOCK == errno)
144 qDebug(log) <<
"send fail" <<
"fd:" << fd[1]
145 <<
"error:" << errno <<
"-" << strerror(errno);
151qintptr CEvent::GetFd()
156int CEvent::CreateSocketPair(qintptr fd[])
158 int family = AF_INET;
159 int type = SOCK_STREAM;
160 SOCKET listener = INVALID_SOCKET;
161 SOCKET connector = INVALID_SOCKET;
162 SOCKET acceptor = INVALID_SOCKET;
166 qCritical(log) <<
"The fd is null";
170#if defined(HAVE_UNIX_DOMAIN_SOCKET)
171 struct sockaddr_un listen_addr;
172 struct sockaddr_un connect_addr;
174 QString szUnixDomainSocket;
175 szPath = QStandardPaths::writableLocation(
176 QStandardPaths::TempLocation)
177 + QDir::separator() +
"RabbitRemoteControl";
178 auto now = std::chrono::system_clock::now();
179 auto tick = std::chrono::duration_cast<std::chrono::nanoseconds>(now.time_since_epoch()).count();
180 szUnixDomainSocket = szPath + QDir::separator() +
"RRC_" + QString::number(tick) +
"_"
181 + QString::number((
unsigned long)QThread::currentThreadId()) +
".socket";
185 QDir f(szUnixDomainSocket);
187 f.remove(szUnixDomainSocket);
188 size =
sizeof(sockaddr_un::sun_path);
189 if(szUnixDomainSocket.size() > size)
191 qCritical(log) <<
"The unix domain socket length greater then" << size;
197 struct sockaddr_in listen_addr;
198 struct sockaddr_in connect_addr;
201 size =
sizeof(listen_addr);
202 memset(&listen_addr, 0,
sizeof(listen_addr));
203 memset(&connect_addr, 0,
sizeof(connect_addr));
205 listener = socket(family, type, 0);
206 if (INVALID_SOCKET == listener) {
207 qCritical(log) <<
"Create server socket fail:"
208 << errno <<
"[" << strerror(errno) <<
"]";
214#if defined(HAVE_UNIX_DOMAIN_SOCKET)
215 listen_addr.sun_family = AF_UNIX;
216 strcpy(listen_addr.sun_path, szUnixDomainSocket.toStdString().c_str());
218 listen_addr.sin_family = AF_INET;
219 listen_addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
220 listen_addr.sin_port = 0;
222 if (-1 == bind(listener, (
struct sockaddr *) &listen_addr,
sizeof(listen_addr)))
224 qCritical(log) <<
"bind fail:" << strerror(errno)
225 << GetAddress(&listen_addr);
228 if (-1 == listen(listener, 1))
230 qCritical(log) <<
"listen fail:" << strerror(errno)
231 << GetAddress(&listen_addr);
235#if !defined(HAVE_UNIX_DOMAIN_SOCKET)
237 if (-1 == getsockname(listener, (
struct sockaddr *) &listen_addr, &size))
239 qCritical(log) <<
"Get listen socket address fail";
242 if (
sizeof(listen_addr) != size) {
243 qCritical(log) <<
"getsockname return size greater then supplied to the call:"
244 <<
"return size:" << size
245 <<
"call size:" <<
sizeof(listen_addr);
249 qDebug(log) <<
"Socket listener:" << GetAddress(&listen_addr);
251 connector = socket(family, type, 0);
252 if (INVALID_SOCKET == connector) {
253 qCritical(log) <<
"Create connect socket fail:"
254 << errno <<
"[" << strerror(errno) <<
"]";
258 if (-1 == ::connect(connector, (
struct sockaddr *) &listen_addr, size)) {
259 qDebug(log) <<
"connect fail:" << strerror(errno)
260 << GetAddress(&listen_addr);
264 acceptor = accept(listener, (
struct sockaddr *) &listen_addr, &size);
265 if (INVALID_SOCKET == acceptor)
267 qDebug(log) <<
"accept fail:" << strerror(errno)
268 << GetAddress(&listen_addr);
271 qDebug(log) <<
"accept:" << GetAddress(&listen_addr)
272 <<
"accept fd:" << acceptor <<
"connect fd:" << connector;
273#if !defined(HAVE_UNIX_DOMAIN_SOCKET)
274 if (
sizeof(listen_addr) != size) {
275 qCritical(log) <<
"accept return size greater then supplied to the call:"
276 <<
"return size:" << size
277 <<
"call size:" <<
sizeof(listen_addr);
282 if (getsockname(connector, (
struct sockaddr *) &connect_addr, &size) == -1) {
283 qCritical(log) <<
"Get connector socket address fail";
286 qDebug(log) <<
"Connector port:" << ntohs(connect_addr.sin_port) <<
"fd:" << connector;
287 if (size !=
sizeof (connect_addr)
288 || listen_addr.sin_family != connect_addr.sin_family
289 || listen_addr.sin_addr.s_addr != connect_addr.sin_addr.s_addr
290 || listen_addr.sin_port != connect_addr.sin_port) {
291 qCritical(log) <<
"Accept address"
292 << ntohs(listen_addr.sin_port)
293 <<
"is not same connect address"
294 << ntohs(connect_addr.sin_port);
302 SetSocketNonBlocking(fd[0]);
303 SetSocketNonBlocking(fd[1]);
304#if !defined(HAVE_UNIX_DOMAIN_SOCKET)
305 EnableNagles(fd[0],
false);
306 EnableNagles(fd[1],
false);
311 if (INVALID_SOCKET != listener)
313 if (INVALID_SOCKET != connector)
315 if (INVALID_SOCKET != acceptor)
321QString CEvent::GetAddress(
void* address)
324#if defined(HAVE_UNIX_DOMAIN_SOCKET)
325 struct sockaddr_un* pAdd = (
struct sockaddr_un*)address;
326 szAdd = pAdd->sun_path;
328 struct sockaddr_in* pAdd = (
struct sockaddr_in*)address;
329 szAdd = inet_ntoa(pAdd->sin_addr) + QString(
":") + QString::number(ntohs(pAdd->sin_port));
335int CEvent::SetSocketNonBlocking(SOCKET fd)
339 unsigned long nonblocking = 1;
340 if (ioctlsocket(fd, FIONBIO, &nonblocking) == SOCKET_ERROR) {
341 qCritical(log,
"ioctlsocket(%d, FIONBIO, &%lu)", (
int)fd, (
unsigned long)nonblocking);
348 if ((flags = fcntl(fd, F_GETFL, NULL)) < 0) {
349 qCritical(log,
"fcntl(%d, F_GETFL)", fd);
352 if (!(flags & O_NONBLOCK)) {
353 if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == SOCKET_ERROR) {
354 qCritical(log,
"fcntl(%d, F_SETFL)", fd);
363int CEvent::SetSocketBlocking(SOCKET fd,
bool block)
366 unsigned long nonblocking = 1;
369 if (ioctlsocket(fd, FIONBIO, &nonblocking) == SOCKET_ERROR) {
370 qCritical(log,
"ioctlsocket(%d, FIONBIO, &%lu)", (
int)fd, (
unsigned long)nonblocking);
375 if ((flags = fcntl(fd, F_GETFL, NULL)) < 0) {
376 qCritical(log,
"fcntl(%d, F_GETFL)", fd);
381 flags &= ~O_NONBLOCK;
385 if (fcntl(fd, F_SETFL, flags) == SOCKET_ERROR) {
386 qCritical(log,
"fcntl(%d, F_SETFL)", fd);
393bool CEvent::EnableNagles(SOCKET fd,
bool enable) {
395 int opt = enable ? 0 : 1;
397 optlen =
sizeof(opt);
410 opt = enable ? 0 : 1;
411 rc = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (
const char*)&opt, optlen);
412 if (SOCKET_ERROR == rc) {
413 qCritical(log) <<
"unable to setsockopt TCP_NODELAY:"
414 << errno <<
"-" << strerror(errno);