// jlocalsocket.cpp
/***************************************************************************
 *   Copyright (C) 2005 by Jeff Ferr                                       *
 *   root@sat                                                              *
 *                                                                         *
 *   This program is free software; you can redistribute it and/or modify  *
 *   it under the terms of the GNU General Public License as published by  *
 *   the Free Software Foundation; either version 2 of the License, or     *
 *   (at your option) any later version.                                   *
 *                                                                         *
 *   This program is distributed in the hope that it will be useful,       *
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of        *
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the         *
 *   GNU General Public License for more details.                          *
 *                                                                         *
 *   You should have received a copy of the GNU General Public License     *
 *   along with this program; if not, write to the                         *
 *   Free Software Foundation, Inc.,                                       *
 *   59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.             *
 ***************************************************************************/
#include "Stdafx.h"
#include "jlocalsocket.h"
#include "jsocketexception.h"
#include "jsockettimeoutexception.h"
#include "jioexception.h"

namespace jsocket {

/**
 * \brief Creates a Unix-domain stream socket, connects it to the socket file
 * and builds the input/output streams.
 *
 * \param file path of the socket file to connect to.
 * \param timeout_ connection timeout in milliseconds; values <= 0 request a
 * plain blocking connect (see ConnectSocket()).
 * \param rbuf_ size handed to the input stream.
 * \param wbuf_ size handed to the output stream.
 *
 * \throws SocketException when the socket cannot be created or connected. On
 * _WIN32 the constructor always throws because named sockets are unsupported.
 *
 * NOTE(review): the class name registered below is "jsocket::Socket", not
 * "jsocket::LocalSocket"; left untouched because callers may compare against
 * this string -- confirm whether it is intentional.
 */
LocalSocket::LocalSocket(std::string file, int timeout_, int rbuf_, int wbuf_):
	jsocket::Connection(JCT_TCP)
{
	jcommon::Object::SetClassName("jsocket::Socket");

	_file = file;

#ifdef _WIN32
	throw jcommon::SocketException("Named socket unsupported.");
#endif

	_is = NULL;
	_os = NULL;
	_is_closed = true;
	_sent_bytes = 0;
	_receive_bytes = 0;
	_timeout = timeout_;

	CreateSocket();

	// The destructor does not run when a constructor throws, so everything
	// acquired so far (descriptor, partially built streams) must be released
	// here before the exception leaves.
	try {
		ConnectSocket();
		InitStreams(rbuf_, wbuf_);
	} catch (...) {
		delete _is;
		_is = NULL;

		delete _os;
		_os = NULL;

#ifndef _WIN32
		close(_fd);
#endif

		throw;
	}

	_is_closed = false;
}

/**
 * \brief Closes the connection (ignoring any failure) and destroys both
 * streams.
 */
LocalSocket::~LocalSocket()
{
	// Close() may throw; nothing is allowed to escape a destructor.
	try {
		Close();
	} catch (...) {
	}

	// Deleting a null pointer is a no-op, so no explicit guard is required.
	delete _is;
	_is = NULL;

	delete _os;
	_os = NULL;
}

/** Private */

/**
 * \brief Wraps an already opened descriptor in a LocalSocket.
 *
 * \param handler_ descriptor to adopt; it is stored as-is, no connect is made.
 * \param file_ path of the socket file associated with the descriptor.
 * \param timeout_ timeout in milliseconds stored for this connection.
 * \param rbuf_ size handed to the input stream.
 * \param wbuf_ size handed to the output stream.
 *
 * NOTE(review): the class name registered below is "jsocket::Socket", not
 * "jsocket::LocalSocket"; left untouched because callers may compare against
 * this string -- confirm whether it is intentional.
 */
LocalSocket::LocalSocket(jsocket_t handler_, std::string file_, int timeout_, int rbuf_, int wbuf_):
	jsocket::Connection(JCT_TCP)
{
	jcommon::Object::SetClassName("jsocket::Socket");

	_fd = handler_;
	_file = file_;

	// Give every member a defined value: the byte counters are incremented and
	// read later (Send/Receive/GetSentBytes/GetReadedBytes), so leaving them
	// uninitialized would yield garbage totals.
	_is = NULL;
	_os = NULL;
	_is_closed = true;
	_sent_bytes = 0;
	_receive_bytes = 0;
	_timeout = timeout_;

	InitStreams(rbuf_, wbuf_);

	_is_closed = false;
}

/**
 * \brief Opens the Unix-domain stream socket and stores its descriptor in _fd.
 *
 * Does nothing when built for _WIN32.
 *
 * \throws SocketException when socket() reports a failure.
 */
void LocalSocket::CreateSocket()
{
#ifndef _WIN32
	_fd = socket(PF_UNIX, SOCK_STREAM, PF_UNSPEC);

	if (_fd < 0) {
		throw SocketException("Socket creation exception");
	}
#endif
}

/**
 * \brief Connects _fd to the Unix-domain socket file named by _file.
 *
 * When _timeout is positive the connect is performed in non-blocking mode and
 * completion is awaited for at most _timeout milliseconds; the descriptor is
 * switched back to blocking mode afterwards. Otherwise a plain blocking
 * connect is issued.
 *
 * Does nothing when built for _WIN32.
 *
 * \throws SocketException on connection failure or timeout.
 */
void LocalSocket::ConnectSocket()
{
#ifdef _WIN32
#else
	// Zero the whole address first: strncpy() does not terminate the path when
	// the source fills the buffer, and the full structure size is passed to
	// connect(), so no byte may be left indeterminate.
	memset(&_address, 0, sizeof(_address));

	_address.sun_family = AF_UNIX;
	strncpy(_address.sun_path, _file.c_str(), sizeof(_address.sun_path)-1);

	int r;

	if (_timeout > 0) {
		int opt = 1;

		if (ioctl(_fd, FIONBIO, &opt) < 0) {
			throw SocketException("Socket connection exception");
		}

		r = connect(_fd, (struct sockaddr *)&_address, sizeof(_address));

		// errno is only meaningful when connect() failed; an immediate success
		// (common for local sockets) must not be judged by a stale errno value.
		if (r != 0 && errno != EINPROGRESS) {
			throw SocketException("Socket connection exception");
		}

		if (r != 0) {
			fd_set wset;
			struct timeval t;

			t.tv_sec = _timeout/1000;
			t.tv_usec = (_timeout%1000)*1000;

			FD_ZERO(&wset);
			FD_SET(_fd, &wset);

			// Completion of a non-blocking connect is signalled by writability;
			// the same set must not be aliased as read/write/except sets.
			r = select(_fd + 1, NULL, &wset, NULL, &t);

			if (r <= 0) {
				opt = 0;

				if (ioctl(_fd, FIONBIO, &opt) < 0) {
					throw SocketException("Socket connection exception");
				}

				shutdown(_fd, SHUT_RDWR);

				if (r == 0) {
					throw SocketException("Socket connection timeout exception");
				} else {
					throw SocketException("Socket connection exception");
				}
			}

			// Writability alone does not prove success: fetch the pending error.
			int error = 0;
			socklen_t optlen = sizeof(error);

			if (getsockopt(_fd, SOL_SOCKET, SO_ERROR, (void *)&error, &optlen) < 0 || error != 0) {
				throw SocketException("Unknown socket exception");
			}

			r = 0;
		}

		// Restore blocking mode for the regular send/receive paths.
		opt = 0;

		if (ioctl(_fd, FIONBIO, &opt) < 0) {
			throw SocketException("Socket connection exception");
		}
	} else {
		r = connect(_fd, (struct sockaddr *)&_address, sizeof(_address));
	}

	if (r < 0) {
		throw SocketException("Socket connection exception");
	}
#endif
}

void LocalSocket::InitStreams(int64_t rbuf_, int64_t wbuf_)
{
	_is = new SocketInputStream((Connection *)this, &_is_closed, rbuf_);
	_os = new SocketOutputStream((Connection *)this, &_is_closed, wbuf_);
}

/** End */

/**
 * \brief Returns the underlying socket descriptor, or -1 when built for
 * _WIN32.
 */
jsocket_t LocalSocket::GetHandler()
{
#ifndef _WIN32
	return _fd;
#else
	return -1;
#endif
}

/**
 * \brief Returns a copy of the socket file path this object was built with.
 */
std::string LocalSocket::GetLocalFile()
{
	std::string path(_file);

	return path;
}

int LocalSocket::Send(const char *data_, int size_, int time_)
{
	if (_is_closed == true) {
		throw SocketException("Connection closed exception");
	}

#ifdef _WIN32
	return -1;
#else
	struct pollfd ufds[1];

	ufds[0].fd = _fd;
	ufds[0].events = POLLOUT | POLLWRBAND;

	int rv = poll(ufds, 1, time_);

	if (rv == -1) {
		throw SocketException("Invalid send parameters exception");
	} else if (rv == 0) {
		throw SocketTimeoutException("Socket send timeout exception");
	} else {
		if ((ufds[0].revents & POLLOUT) || (ufds[0].revents & POLLWRBAND)) {
			return LocalSocket::Send(data_, size_);
		}
	}
#endif

	return -1;
}

/**
 * \brief Sends a buffer through the socket.
 *
 * \param data_ bytes to send.
 * \param size_ number of bytes in data_.
 * \param block_ when false the call is made with MSG_DONTWAIT and returns 0
 * if nothing could be written right away.
 *
 * \return number of bytes sent (0 when a non-blocking send would block);
 * always -1 on _WIN32.
 *
 * \throws SocketException when the connection is closed or the peer is gone
 * (EPIPE/ECONNRESET); in the latter case the socket is closed first.
 * \throws SocketTimeoutException when a blocking send reports EAGAIN or
 * EWOULDBLOCK, and for any other send error.
 */
int LocalSocket::Send(const char *data_, int size_, bool block_)
{
	if (_is_closed == true) {
		throw SocketException("Connection closed exception");
	}

#ifdef _WIN32
	return -1;
#else
	int flags = MSG_NOSIGNAL;

	if (block_ == false) {
		flags |= MSG_DONTWAIT;
	}

	int n = ::send(_fd, data_, size_, flags);

	if (n < 0) {
		// Capture errno immediately: later calls (e.g. Close()) may overwrite it.
		const int error = errno;

		// POSIX allows either value to signal "would block".
		if (error == EAGAIN || error == EWOULDBLOCK) {
			if (block_ == true) {
				throw SocketTimeoutException("Socket send timeout exception");
			} else {
				// INFO:: non-blocking socket, no data written
				n = 0;
			}
		} else if (error == EPIPE || error == ECONNRESET) {
			Close();

			throw SocketException("Broken pipe exception");
		} else {
			// NOTE(review): a generic send failure is reported with the timeout
			// exception type; kept as-is because callers may catch this type.
			throw SocketTimeoutException("Socket send exception");
		}
	}

	_sent_bytes += n;

	return n;
#endif
}

int LocalSocket::Receive(char *data_, int size_, int time_)
{
	if (_is_closed == true) {
		throw SocketException("Connection closed exception");
	}

#ifdef _WIN32
#else
	struct pollfd ufds[1];

	ufds[0].fd = _fd;
	ufds[0].events = POLLIN | POLLRDBAND;

	int rv = poll(ufds, 1, time_);

	if (rv == -1) {
		throw SocketException("Invalid receive parameters exception");
	} else if (rv == 0) {
		throw SocketTimeoutException("Socket read timeout exception");
	} else {
		if ((ufds[0].revents & POLLIN) || (ufds[0].revents & POLLRDBAND)) {
			return LocalSocket::Receive(data_, size_);
		}
	}
#endif
	
	return -1;
}

/**
 * \brief Receives data from the socket.
 *
 * \param data_ destination buffer.
 * \param size_ capacity of data_ in bytes.
 * \param block_ when false the call is made with MSG_DONTWAIT and returns 0
 * if no data is available right away.
 *
 * \return number of bytes received (0 when a non-blocking receive would block
 * or when size_ is 0); always -1 on _WIN32.
 *
 * \throws SocketException when the connection is closed, or when the peer has
 * shut the connection down (the socket is closed first).
 * \throws SocketTimeoutException when a blocking receive reports EAGAIN or
 * EWOULDBLOCK.
 * \throws jio::IOException for any other receive error.
 */
int LocalSocket::Receive(char *data_, int size_, bool block_)
{
	if (_is_closed == true) {
		throw SocketException("Connection closed exception");
	}

#ifdef _WIN32
	// Unsupported platform: return the same sentinel the other I/O methods use
	// instead of falling off the end of a non-void function.
	return -1;
#else
	int flags = 0;

	if (block_ == false) {
		flags = MSG_DONTWAIT;
	}

	int n = ::recv(_fd, data_, size_, flags);

	if (n < 0) {
		const int error = errno;

		// POSIX allows either value to signal "would block".
		if (error == EAGAIN || error == EWOULDBLOCK) {
			if (block_ == true) {
				throw SocketTimeoutException("Socket receive timeout exception");
			} else {
				// INFO:: non-blocking socket, no data read
				n = 0;
			}
		} else {
			throw jio::IOException("Socket read exception");
		}
	} else if (n == 0 && size_ > 0) {
		// Zero bytes for a non-empty request means the peer shut the connection
		// down; a zero-length request returning 0 says nothing about the peer.
		Close();

		throw SocketException("Broken pipe exception");
	}

	_receive_bytes += n;

	return n;
#endif
}

/**
 * \brief Closes the descriptor and removes the socket file.
 *
 * Calling it on an already closed socket is a no-op.
 *
 * \throws SocketException when close() reports a failure; the socket is still
 * considered closed afterwards and the socket file is not removed.
 */
void LocalSocket::Close()
{
	if (_is_closed == true) {
		return;
	}

	// Flag the socket as closed before touching the descriptor: after a failed
	// close() the descriptor must not be closed again (it may already have been
	// released and reused), so a later Close() -- e.g. from the destructor --
	// has to be a no-op.
	_is_closed = true;

#ifdef _WIN32
#else
	if (close(_fd) != 0) {
		throw SocketException("Unknown close exception");
	}

	unlink(_file.c_str());
#endif
}

/**
 * \brief Returns the input stream created by InitStreams(), viewed as a
 * jio::InputStream.
 *
 * The pointer is the _is member itself; it is deleted by the destructor, so
 * callers must not delete it.
 */
jio::InputStream * LocalSocket::GetInputStream()
{
	return (jio::InputStream *)_is;
}

/**
 * \brief Returns the output stream created by InitStreams(), viewed as a
 * jio::OutputStream.
 *
 * The pointer is the _os member itself; it is deleted by the destructor, so
 * callers must not delete it.
 */
jio::OutputStream * LocalSocket::GetOutputStream()
{
	return (jio::OutputStream *)_os;
}

/**
 * \brief Returns the bytes sent directly through Send() plus the bytes
 * reported by the output stream.
 */
int64_t LocalSocket::GetSentBytes()
{
	int64_t total = _sent_bytes;

	total += _os->GetSentBytes();

	return total;
}

/**
 * \brief Returns the bytes received directly through Receive() plus the bytes
 * reported by the input stream.
 */
int64_t LocalSocket::GetReadedBytes()
{
	int64_t total = _receive_bytes;

	total += _is->GetReadedBytes();

	return total;
}

/**
 * \brief Allocates a new SocketOptions object for this socket's descriptor.
 *
 * A fresh object is created with new on every call and nothing in this file
 * deletes it, so the caller is responsible for releasing it.
 */
SocketOptions * LocalSocket::GetSocketOptions()
{
	SocketOptions *options = new SocketOptions(_fd, JCT_TCP);

	return options;
}

}