jmessagequeue.cpp 5.29 KB
/***************************************************************************
 *   Copyright (C) 2005 by Jeff Ferr                                       *
 *   root@sat                                                              *
 *                                                                         *
 *   This program is free software; you can redistribute it and/or modify  *
 *   it under the terms of the GNU General Public License as published by  *
 *   the Free Software Foundation; either version 2 of the License, or     *
 *   (at your option) any later version.                                   *
 *                                                                         *
 *   This program is distributed in the hope that it will be useful,       *
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of        *
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the         *
 *   GNU General Public License for more details.                          *
 *                                                                         *
 *   You should have received a copy of the GNU General Public License     *
 *   along with this program; if not, write to the                         *
 *   Free Software Foundation, Inc.,                                       *
 *   59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.             *
 ***************************************************************************/
#include "Stdafx.h"
#include "jmessagequeue.h"
#include "jmessageexception.h"

namespace jshared {

#ifdef _WIN32
MessageQueue::MessageQueue(int key_, int perms_, int maxsize_, int maxmsgs_):
#else 
MessageQueue::MessageQueue(key_t key_, int perms_, int size_, int maxmsgs_):
#endif
	jcommon::Object()
{
	jcommon::Object::SetClassName("jshared::MessageQueue");
	
#ifdef _WIN32
	/* TODO::
	MSGQUEUEOPTIONS_OS desc;

	if (maxsize_ < 4) {
		maxsize_ = 4;
	}

	if (maxmsgs_ < 0) {
		maxmsgs_ = 0;
	}

	desc.dwSize = maxsize_;
	desc.dwFlags = MSGQUEUE_NOPRECOMMIT; // MSGQUEUE_ALLOW_BROKEN
	desc.dwMaxMessages = maxmsgs_;
	desc.cbMaxMessage = maxsize_;
	desc.bReadAccess = ((paerms_ & 0444) != 0)?TRUE:FALSE;

	_id = CreateMsgQueue(key_.c_str(), &desc);
	*/
#else
	_id = msgget(key_, IPC_CREAT | IPC_EXCL | perms_);

	if (_id < 0) {
		_id = msgget(key_, 0);
		
		if (_id < 0) {
			if (errno == EACCES) {
				throw MessageException("No access permissions to the queue");
			} else {
				throw MessageException("Error opening a message queue");
			}
		}
	}
#endif
	
	_is_open = true;
	_blocking = true;
}

MessageQueue::~MessageQueue()
{
}

int MessageQueue::GetMessageIndex()
{
#ifdef _WIN32
	/* TODO::
	MSGQUEUEINFO info;

	GetMsgQueueInfo(_id, &info);

	return info.dwCurrentMessages;
	*/

	return -1;
#else
	struct msqid_ds buf;
	
	int r = msgctl (_id, IPC_STAT, &buf);

	if (r < 0) {
		buf.msg_qnum = 0;
	}
	
	return buf.msg_qnum;
#endif
}

void MessageQueue::Send(const void *msg_, int size_, long type_)
{
    if (_is_open == false) {
		throw MessageException("Message queue is released");
    }

	if (msg_ == NULL) {
		throw MessageException("Null pointer exception");
	}

	if (type_ <= 0) {
		type_ = 1;
	}
	
#ifdef _WIN32
	/* TODO::
	msg_t msg;

	msg.msg.mtype = type_;
	memcpy(&(msg.msg.msg), msg_, size_);
	
	BOOL b = WriteMsgQueue(
		_id,
		msg.buf,
		sizeof(msg_t) - sizeof(long),
		INFINITE,
		MSGQUEUE_MSGALERT 
	);

	if (b == FALSE) {
		throw MessageException("Send message error");
	}
	*/
#else
	msg_t msg;

	msg.msg.mtype = type_;
	memcpy(&(msg.msg.msg), msg_, size_);
	
	int r, 
		msz = sizeof(msg_t) - sizeof(long);

	if (_blocking == true) {
		r = msgsnd(_id, &msg.buf, msz, 0);
	} else {
		r = msgsnd(_id, &msg.buf, msz, IPC_NOWAIT);
	}

	if (r < 0) {
		if (errno == EAGAIN) {
			throw MessageException("Buffer is fully");
		} else {
			throw MessageException("Send message error");
		}
	}
#endif
}

void MessageQueue::Receive(void *msg_, int size_, long type_, int flag_)
{
    if (_is_open == false) {
		 throw MessageException("Message queue is released");
    }

	if (msg_ == NULL) {
		 throw MessageException("Null pointer exception");
	}
	
#ifdef _WIN32
	/* TODO::
	BOOL b;
	int r;
	msg_t msg;

	msg.buf.mtype = type_;

	b= ReadMsgQueue(
		_id,
		&msg.buf,
		sizeof(msg_t) - sizeof(long),
		&r,
		INFINITE,
		MSGQUEUE_MSGALERT
	);

	if (b == FALSE) {
		throw MessageException("Receive message error");
	}
	*/
#else
	msg_t msg;

	msg.buf.mtype = type_;
	
	int r,
		msz = sizeof(msg_t) - sizeof(long);
	
	do {
		if (_blocking == true) {
			r = msgrcv(_id, &msg.buf, msz, type_, 0 | flag_);
		} else {
			r = msgrcv(_id, &msg.buf, msz, type_, IPC_NOWAIT | flag_);
		}
	} while (r == -1 && errno == EINTR);

	if (r < 0) {
		msg_ = 0;
		
		if (errno == ENOMSG) {
			throw MessageException("Receive message error");
		}
	}

	memcpy(msg_, &(msg.msg.msg), size_);
#endif
}

void MessageQueue::SetBlocking(bool b_)
{
	_blocking = b_;
}

void MessageQueue::Release()
{
#ifdef _WIN32
	/* TODO::
	BOOL b = CloseMsgQueue(_id);

	if (b == FALSE) {
		throw MessageException("Release message queue failed");
	}
	*/
#else
	int r = msgctl(_id, IPC_RMID, 0);

	if (r < 0) {
		if (errno == EACCES) {
			throw MessageException("No access permissions to remove the queue");
		} else if (errno == EIDRM) {
			throw MessageException("Message queue was removed");
		} else {
			throw MessageException("Error releasing the message queue");
		}
	}
#endif
}

}