/*
 * Copyright 2012-2013 Haiku, Inc. All rights reserved.
 * Distributed under the terms of the MIT License.
 *
 * Authors:
 *		Paweł Dziepak, pdziepak@quarnos.org
 */
 
 
#include "RPCCallbackServer.h"
 
#include "NFS4Defs.h"
#include "RPCCallback.h"
#include "RPCCallbackReply.h"
#include "RPCCallbackRequest.h"
#include "RPCServer.h"
 
 
using namespace RPC;
 
 
// Global callback server pointers, both initialised to NULL. Nothing in the
// code visible here reads or writes them; Get() and ShutdownAll() operate on
// CallbackServer::fServers instead.
// NOTE(review): presumably one pointer per address family (IPv4 / IPv6) that
// is used, if at all, from other files -- confirm before removing.
CallbackServer* gRPCCallbackServer		= NULL;
CallbackServer* gRPCCallbackServer6		= NULL;
 
 
/*!	Creates a stopped callback server for the given network family.

	No listener and no thread are created here; StartServer() does that the
	first time RegisterCallback() is called.

	\param networkFamily Address family that StartServer() later hands to
		ConnectionListener::Listen(). Get() passes AF_INET or AF_INET6.
*/
CallbackServer::CallbackServer(int networkFamily)
	:
	fConnectionList(NULL),
	fListener(NULL),
	fThreadRunning(false),
	fCallbackArray(NULL),
	fArraySize(0),
	fFreeSlot(-1),
	fNetworkFamily(networkFamily)
{
	// fThread is otherwise only assigned by StartServer(). Give it an invalid
	// id until then so the member never holds an indeterminate value. It is
	// assigned here rather than in the initializer list because the
	// declaration order of the members is not visible from this file.
	fThread = -1;

	mutex_init(&fConnectionLock, NULL);
	mutex_init(&fThreadLock, NULL);
	rw_lock_init(&fArrayLock, NULL);
}
 
 
/*!	Stops the server if it is running, then releases the callback array and
	the locks that the constructor initialised.
*/
CallbackServer::~CallbackServer()
{
	// Must come first: StopServer() acquires fThreadLock and fConnectionLock,
	// both of which are destroyed below.
	StopServer();
 
	free(fCallbackArray);
	rw_lock_destroy(&fArrayLock);
	mutex_destroy(&fThreadLock);
	mutex_destroy(&fConnectionLock);
}
 
 
/*!	Returns the callback server that belongs to the address family of
	\a server, creating it on first use.

	\param server Server whose ID().Family() selects the instance; must not be
		NULL.
	\return The shared instance for AF_INET or AF_INET6, or NULL for any other
		family (only reachable when ASSERT is compiled out).
*/
CallbackServer*
CallbackServer::Get(Server* server)
{
	ASSERT(server != NULL);

	const int family = server->ID().Family();
	ASSERT(family == AF_INET || family == AF_INET6);

	// Map the address family to its slot in fServers.
	int slot;
	if (family == AF_INET)
		slot = 0;
	else if (family == AF_INET6)
		slot = 1;
	else
		return NULL;

	// Creation is serialised so that each slot is populated at most once.
	MutexLocker locker(fServerCreationLock);
	CallbackServer*& instance = fServers[slot];
	if (instance == NULL)
		instance = new CallbackServer(family);
	return instance;
}
 
 
/*!	Destroys every callback server created through Get() and resets the
	corresponding slots to NULL, all while holding fServerCreationLock.
*/
void
CallbackServer::ShutdownAll()
{
	MutexLocker locker(fServerCreationLock);

	const unsigned int count = sizeof(fServers) / sizeof(fServers[0]);
	for (unsigned int index = 0; index < count; index++) {
		// Deleting a slot that was never populated (NULL) is a no-op.
		delete fServers[index];
		fServers[index] = NULL;
	}
}
 
 
// Held by Get() while it lazily creates a server and by ShutdownAll() while
// it destroys them.
mutex			CallbackServer::fServerCreationLock = MUTEX_INITIALIZER(NULL);
// One server per address family: Get() uses index 0 for AF_INET and index 1
// for AF_INET6.
CallbackServer*	CallbackServer::fServers[2] = { NULL, NULL };
 
 
/*!	Registers \a callback with this server and assigns it a slot ID.

	Makes sure the listener is running (StartServer()), then takes a slot from
	the free list, growing the slot array when no free slot is left. On
	success the callback's ID and callback server are set.

	\param callback Callback to register; must not be NULL.
	\return B_OK on success, the error from StartServer() if the listener
		could not be started, or B_NO_MEMORY if the slot array could not be
		grown.
*/
status_t
CallbackServer::RegisterCallback(Callback* callback)
{
	ASSERT(callback != NULL);

	status_t result = StartServer();
	if (result != B_OK)
		return result;

	WriteLocker _(fArrayLock);
	if (fFreeSlot == -1) {
		// No free slot left: double the array (starting with 4 entries).
		uint32 newSize = max_c(fArraySize * 2, 4);
		uint32 size = newSize * sizeof(CallbackSlot);
		CallbackSlot* array = reinterpret_cast<CallbackSlot*>(malloc(size));
		if (array == NULL)
			return B_NO_MEMORY;

		if (fCallbackArray != NULL)
			memcpy(array, fCallbackArray, fArraySize * sizeof(CallbackSlot));

		// Chain the newly added slots into a free list terminated by -1.
		for (uint32 i = fArraySize; i < newSize; i++)
			array[i].fNext = i + 1;

		array[newSize - 1].fNext = -1;

		// Release the old storage; it used to be leaked on every growth.
		// We hold the write lock on fArrayLock, so no other holder of that
		// lock can still be looking at the old array.
		free(fCallbackArray);

		fCallbackArray = array;
		fFreeSlot = fArraySize;
		fArraySize = newSize;
	}

	// Pop the head of the free list and hand it to the callback.
	int32 id = fFreeSlot;
	fFreeSlot = fCallbackArray[id].fNext;

	fCallbackArray[id].fCallback = callback;
	callback->SetID(id);
	callback->SetCBServer(this);

	return B_OK;
}
 
 
/*!	Removes \a callback from this server and returns its slot to the free
	list.

	\param callback Callback previously registered with this server; must not
		be NULL and its CBServer() must be this object.
	\return Always B_OK.
*/
status_t
CallbackServer::UnregisterCallback(Callback* callback)
{
	ASSERT(callback != NULL);
	ASSERT(callback->CBServer() == this);

	const int32 slot = callback->ID();

	WriteLocker locker(fArrayLock);

	// Push the released slot onto the head of the free list.
	CallbackSlot& released = fCallbackArray[slot];
	released.fNext = fFreeSlot;
	fFreeSlot = slot;

	callback->SetCBServer(NULL);
	return B_OK;
}
 
 
/*!	Starts the listener and its thread unless they are already running.

	Creates the connection listener for fNetworkFamily, spawns the listener
	thread and resumes it. If the thread cannot be spawned or resumed, the
	listener that was just created is destroyed again so that a failed start
	leaves nothing behind.

	\return B_OK if the server is (now) running, otherwise the error returned
		by ConnectionListener::Listen(), spawn_kernel_thread() or
		resume_thread().
*/
status_t
CallbackServer::StartServer()
{
	MutexLocker _(fThreadLock);
	if (fThreadRunning)
		return B_OK;

	status_t result = ConnectionListener::Listen(&fListener, fNetworkFamily);
	if (result != B_OK)
		return result;

	fThread = spawn_kernel_thread(&CallbackServer::ListenerThreadLauncher,
		"NFSv4 Callback Listener", B_NORMAL_PRIORITY, this);
	if (fThread < B_OK) {
		// No thread will ever use the listener; do not leak it.
		delete fListener;
		fListener = NULL;
		return fThread;
	}

	// Must be set before the thread runs: ListenerThread() loops on it.
	fThreadRunning = true;

	result = resume_thread(fThread);
	if (result != B_OK) {
		// The thread never ran, so it cannot be using the listener.
		kill_thread(fThread);
		fThreadRunning = false;
		delete fListener;
		fListener = NULL;
		return result;
	}

	return B_OK;
}
 
 
/*!	Stops the listener thread and all connection threads.

	Disconnects the listener and waits for its thread, then disconnects every
	connection in fConnectionList, waits for the thread serving it and deletes
	the connection and its list entry. Finally the listener is deleted.

	\return Always B_OK; also when the server was not running.
*/
status_t
CallbackServer::StopServer()
{
	MutexLocker _(fThreadLock);
	if (!fThreadRunning)
		return B_OK;

	// Disconnecting the listener is what lets the listener thread leave
	// AcceptConnection() and terminate.
	fListener->Disconnect();
	status_t result;
	wait_for_thread(fThread, &result);

	// NOTE(review): fConnectionLock is held while waiting for the connection
	// threads. A connection thread whose Receive() fails with anything other
	// than ECONNABORTED calls ReleaseConnection(), which needs this lock and
	// deletes the entry itself; this loop therefore relies on Disconnect()
	// making Receive() fail with ECONNABORTED -- confirm.
	MutexLocker locker(fConnectionLock);
	while (fConnectionList != NULL) {
		ConnectionEntry* entry = fConnectionList;
		fConnectionList = entry->fNext;
		entry->fConnection->Disconnect();

		wait_for_thread(entry->fThread, &result);

		delete entry->fConnection;
		delete entry;
	}

	// Do not leave a dangling pointer behind.
	delete fListener;
	fListener = NULL;

	fThreadRunning = false;
	return B_OK;
}
 
 
/*!	Starts a thread that serves \a connection and tracks it in
	fConnectionList.

	Ownership: on success the server owns \a connection (it is deleted by
	ReleaseConnection() or StopServer()). On failure \a connection is left
	untouched and stays with the caller; ListenerThread() deletes it in that
	case.

	\param connection Newly accepted connection; must not be NULL.
	\return B_OK on success, B_NO_MEMORY if an allocation failed, or the error
		returned by spawn_kernel_thread() or resume_thread().
*/
status_t
CallbackServer::NewConnection(Connection* connection)
{
	ASSERT(connection != NULL);

	ConnectionEntry* entry = new(std::nothrow) ConnectionEntry;
	if (entry == NULL)
		return B_NO_MEMORY;

	entry->fConnection = connection;
	entry->fPrev = NULL;
	entry->fNext = NULL;

	// Argument block for ConnectionThreadLauncher(), which frees it.
	void** arguments = reinterpret_cast<void**>(malloc(sizeof(void*) * 2));
	if (arguments == NULL) {
		delete entry;
		return B_NO_MEMORY;
	}

	arguments[0] = this;
	arguments[1] = entry;

	// The thread does not run until it is resumed below, so the entry can be
	// completed and published before the thread gets to see it.
	thread_id thread = spawn_kernel_thread(
		&CallbackServer::ConnectionThreadLauncher,
		"NFSv4 Callback Connection", B_NORMAL_PRIORITY, arguments);
	if (thread < B_OK) {
		free(arguments);
		delete entry;
		return thread;
	}

	entry->fThread = thread;

	// Only a fully initialised entry is linked into the list, so
	// StopServer() never sees an entry without a valid thread.
	MutexLocker locker(fConnectionLock);
	entry->fNext = fConnectionList;
	if (fConnectionList != NULL)
		fConnectionList->fPrev = entry;
	fConnectionList = entry;
	locker.Unlock();

	status_t result = resume_thread(thread);
	if (result != B_OK) {
		kill_thread(thread);
		free(arguments);

		// Unlink and delete the entry, but leave the connection to the
		// caller: ReleaseConnection() deletes entry->fConnection, and
		// deleting NULL is a no-op.
		entry->fConnection = NULL;
		ReleaseConnection(entry);
		return result;
	}

	return B_OK;
}
 
 
/*!	Unlinks \a entry from fConnectionList and deletes both the entry and the
	connection it refers to.

	\param entry List entry to remove; must not be NULL.
	\return Always B_OK.
*/
status_t
CallbackServer::ReleaseConnection(ConnectionEntry* entry)
{
	ASSERT(entry != NULL);

	MutexLocker locker(fConnectionLock);

	ConnectionEntry* const previous = entry->fPrev;
	ConnectionEntry* const next = entry->fNext;

	// Bridge the neighbours over the entry; an entry without a predecessor
	// is the list head.
	if (next != NULL)
		next->fPrev = previous;
	if (previous == NULL)
		fConnectionList = next;
	else
		previous->fNext = next;

	delete entry->fConnection;
	delete entry;
	return B_OK;
}
 
 
/*!	Thread entry point for connection threads.

	\param object malloc()ed array of two pointers built by NewConnection():
		the CallbackServer followed by the ConnectionEntry. The array is
		freed here.
	\return The result of ConnectionThread().
*/
status_t
CallbackServer::ConnectionThreadLauncher(void* object)
{
	ASSERT(object != NULL);

	void** arguments = reinterpret_cast<void**>(object);

	CallbackServer* self = reinterpret_cast<CallbackServer*>(arguments[0]);
	ConnectionEntry* connectionEntry
		= reinterpret_cast<ConnectionEntry*>(arguments[1]);

	// The argument block is no longer needed once both pointers are unpacked.
	free(arguments);

	return self->ConnectionThread(connectionEntry);
}
 
 
/*!	Serves one callback connection until it fails or the server stops.

	Receives requests in a loop while fThreadRunning is set. Malformed
	requests are answered with an RPC error reply, CallbackProcNull requests
	with an empty reply, and CallbackProcCompound requests are handed to the
	callback selected by the request's ID. Requests with any other procedure
	are dropped.

	\param entry List entry of the connection to serve; must not be NULL.
	\return The error from Connection::Receive() if receiving failed, B_OK if
		the loop ended because fThreadRunning was cleared.
*/
status_t
CallbackServer::ConnectionThread(ConnectionEntry* entry)
{
	ASSERT(entry != NULL);

	Connection* connection = entry->fConnection;
	CallbackReply* reply;

	while (fThreadRunning) {
		uint32 size;
		void* buffer;
		status_t result = connection->Receive(&buffer, &size);
		if (result != B_OK) {
			// ECONNABORTED is left to StopServer(), which deletes the entry
			// and the connection itself after waiting for this thread. Any
			// other error means this thread has to clean up after itself.
			if (result != ECONNABORTED)
				ReleaseConnection(entry);
			return result;
		}

		CallbackRequest* request
			= new(std::nothrow) CallbackRequest(buffer, size);
		if (request == NULL) {
			// No request object took the buffer, so free it here.
			free(buffer);
			continue;
		}

		if (request->Error() != B_OK) {
			// Tell the peer what was wrong with its request.
			reply = CallbackReply::Create(request->XID(), request->RPCError());
			if (reply != NULL) {
				connection->Send(reply->Stream().Buffer(),
					reply->Stream().Size());
				delete reply;
			}
			delete request;
			continue;
		}

		switch (request->Procedure()) {
			case CallbackProcCompound:
			{
				// The ID comes off the wire, so it may not name a registered
				// callback. NOTE(review): assumes GetCallback() returns NULL
				// for such IDs -- confirm against its definition.
				Callback* callback = GetCallback(request->ID());
				if (callback != NULL)
					callback->EnqueueRequest(request, connection);
				else
					delete request;
				break;
			}

			case CallbackProcNull:
				reply = CallbackReply::Create(request->XID());
				if (reply != NULL) {
					connection->Send(reply->Stream().Buffer(),
						reply->Stream().Size());
					delete reply;
				}
				// fall through: the request is deleted below

			default:
				delete request;
				break;
		}
	}

	return B_OK;
}
 
 
/*!	Thread entry point for the listener thread.

	\param object The CallbackServer passed to spawn_kernel_thread() by
		StartServer(); must not be NULL.
	\return The result of ListenerThread().
*/
status_t
CallbackServer::ListenerThreadLauncher(void* object)
{
	ASSERT(object != NULL);

	return reinterpret_cast<CallbackServer*>(object)->ListenerThread();
}
 
 
/*!	Accepts incoming connections while fThreadRunning is set and passes each
	one to NewConnection().

	\return The error from AcceptConnection() if accepting failed (in which
		case fThreadRunning is cleared), B_OK if the loop ended because
		fThreadRunning was cleared elsewhere.
*/
status_t
CallbackServer::ListenerThread()
{
	while (fThreadRunning) {
		Connection* accepted;

		const status_t acceptStatus = fListener->AcceptConnection(&accepted);
		if (acceptStatus != B_OK) {
			// Accepting failed: mark the server as no longer running.
			fThreadRunning = false;
			return acceptStatus;
		}

		// A connection that NewConnection() reports an error for is deleted
		// here.
		if (NewConnection(accepted) != B_OK)
			delete accepted;
	}

	return B_OK;
}
 

// PVS-Studio V730: Not all members of a class are initialized inside the
// constructor. Consider inspecting: fThread.