/*
 * Copyright (c) 1999-2000, Eric Moon.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions, and the following disclaimer.
 *
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions, and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * 3. The name of the author may not be used to endorse or promote products
 *    derived from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR "AS IS" AND ANY EXPRESS OR
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF TITLE, NON-INFRINGEMENT, MERCHANTABILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR
 * TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
 
 
// NodeGroup.cpp
 
#include "NodeGroup.h"
//#include "NodeGroup_transport_thread.h"
 
#include "NodeManager.h"
#include "NodeRef.h"
 
#include <MediaRoster.h>
#include <OS.h>
#include <TimeSource.h>
 
#include <algorithm>
#include <functional>
 
#include "array_delete.h"
#include "BasicThread.h"
#include "node_manager_impl.h"
#include "functional_tools.h"
 
using namespace std;
 
__USE_CORTEX_NAMESPACE
#define D_METHOD(x) //PRINT (x)
#define D_ROSTER(x) //PRINT (x)
#define D_LOCK(x) //PRINT (x)
 
 
 
// -------------------------------------------------------- //
// *** ctor/dtor
// -------------------------------------------------------- //
 
// free the group, including all nodes within it
// (this call will result in the eventual deletion of the object.)
// returns B_OK on success; B_NOT_ALLOWED if release() has
// already been called; other error codes if the Media Roster
// call fails.
// * THE MANAGER MUST BE LOCKED
 
status_t NodeGroup::release() {
 
	D_METHOD((
		"NodeGroup::release()\n"));
	
	if(isReleased())
		return B_NOT_ALLOWED;
 
	// clean up		
	lock();
 
	// halt all nodes
	_stop();
	
	// remove & release all nodes
	// +++++ causes triply-nested lock: eww!
	while(m_nodes.size()) {
		NodeRef* last = m_nodes.back();
		removeNode(m_nodes.size()-1);
		last->release();
	}
 
	unlock();
 
// [e.moon 7nov99]
// removing the released group is now NodeManager's responsibility
//
	// remove from NodeManager
	if(!m_manager->lock()) {
		ASSERT(!"* m_manager->lock() failed.\n");
	}
	m_manager->_removeGroup(this);
	m_manager->unlock();
	
	// hand off to IObservable
	return _inherited::release();	
}
 
// call release() rather than deleting NodeGroup objects
NodeGroup::~NodeGroup() {
 
	Autolock _l(this);
	D_METHOD((
		"~NodeGroup()\n"));
 
	ASSERT(!m_nodes.size());	
 
	if(m_timeSourceObj) {
		m_timeSourceObj->Release();
		m_timeSourceObj = 0;
	}
}
 
 
// -------------------------------------------------------- //
// *** accessors
// -------------------------------------------------------- //
 
// [e.moon 13oct99] moved to header
//inline uint32 NodeGroup::id() const { return m_id; }
 
// -------------------------------------------------------- //
// *** operations
// -------------------------------------------------------- //
 
// name access
const char* NodeGroup::name() const {
	Autolock _l(this);
	return m_name.String();
}
 
status_t NodeGroup::setName(const char* name) {
	Autolock _l(this);
	m_name = name;
	return B_OK;
}
 
// content access
uint32 NodeGroup::countNodes() const {
	Autolock _l(this);
	return m_nodes.size();
}
 
NodeRef* NodeGroup::nodeAt(
	uint32											index) const {
	Autolock _l(this);
	return (index < m_nodes.size()) ?
		m_nodes[index] :
		0;
}
 
// add/remove nodes:
// - you may only add a node with no current group.
// - nodes added during playback will be started;
//   nodes removed during playback will be stopped (unless
//   the NO_START_STOP transport restriction flag is set
//   for a given node.)
 
status_t NodeGroup::addNode(
	NodeRef*										node) {
	
	D_METHOD((
		"NodeGroup::addNode()\n"));
		
	// lock the manager first; if the node has no current group,
	// this locks the node.
	m_manager->lock();
	
	Autolock _l(this);
 
	// precondition: GROUP_LOCKED not set
	if(m_flags & GROUP_LOCKED)
		return B_NOT_ALLOWED;
 
	// precondition: no current group
	if(node->m_group) {
		// [e.moon 28sep99] whoops, forgot one
		PRINT((
			"!!! node already in group '%s'\n", node->m_group->name()));
 
		m_manager->unlock(); 
		return B_NOT_ALLOWED;
	}
 
	// add it
	m_nodes.push_back(node);	
	node->_setGroup(this);
 
	// release the manager
	m_manager->unlock();
	
	// first node? the transport is now ready to start
	if(m_nodes.size() == 1) {
		_changeState(TRANSPORT_INVALID, TRANSPORT_STOPPED);
	}
//
//	if(m_syncNode == media_node::null) {
//		// assign as sync node
//		setSyncNode(node->node());
//	}
//	
	// initialize the new node
	status_t err = node->_initTransportState();
	if(err < B_OK)
		return err;
 
	// set time source
	node->_setTimeSource(m_timeSource.node);
 
	// set run mode
	node->_setRunMode(m_runMode);	
 
	// add to cycle set if need be
	// +++++ should I call _cycleAddRef() instead?
	if(node->m_cycle)
		_refCycleChanged(node);
		
	if(m_transportState == TRANSPORT_RUNNING) {
		// +++++ start if necessary!
	}
	// +++++ not started if TRANSPORT_ROLLING: is that proper? [e.moon 11oct99]
	
	// send notification
	if(!LockLooper()) {
		ASSERT(!"LockLooper() failed.");
	}
	BMessage m(M_NODE_ADDED);
	m.AddInt32("groupID", id());
	m.AddInt32("nodeID", node->id());
	notify(&m);
	UnlockLooper();
	
	// success
	return B_OK;
}
		
 
status_t NodeGroup::removeNode(
	NodeRef*										node) {
 
	D_METHOD((
		"NodeGroup::removeNode()\n"));
		
	// lock the manager first; once the node is ungrouped,
	// the manager lock applies to it
	m_manager->lock();
	
	Autolock _l(this);
 
	// precondition: this must be the node's group
	if(node->m_group != this) {
		// [e.moon 28sep99] whoops, forgot one
		PRINT((
			"!!! node not in group '%s'\n", node->m_group->name()));
 
		m_manager->unlock(); 
		return B_NOT_ALLOWED;
	}
 
	// remove from the cycle set
	if(node->m_cycle)
		_cycleRemoveRef(node);
 
	// remove it
	ASSERT(m_nodes.size());
	remove(
		m_nodes.begin(),
		m_nodes.end(),
		node);
 
	// should have removed one and only one entry
	m_nodes.resize(m_nodes.size()-1);
 
//	// 6aug99: the timesource is now the sync node...
//	// is this the sync node? reassign if so
//
//	if(node->node() == m_syncNode) {
//
//		// look for another sync-capable node
//		bool found = false;
//		for(int n = 0; !found && n < m_nodes.size(); ++n)
//			if(setSyncNode(m_nodes[n]->node()) == B_OK)
//				found = true;
//
//		// no luck? admit defeat:		
//		if(!found) {
//			PRINT((
//				"* NodeGroup::removeNode(): no sync-capable nodes left!\n"));
//				
//			// +++++ stop & set to invalid state?
//			
//			setSyncNode(media_node::null);
//		}
//	}
 
	// stop the node if necessary	
	status_t err = node->_stop();
	if(err < B_OK) {
		PRINT((
			"*** NodeGroup::removeNode('%s'): error from node->_stop():\n"
			"    %s\n",
			node->name(),
			strerror(err)));
	}
 
	// clear the node's group pointer
	node->_setGroup(0);
 
	// release the manager lock; the node is now ungrouped and
	// unlocked
	m_manager->unlock();
	
	// was that the last node? stop/disable the transport if so
	if(!m_nodes.size()) {
 
		// +++++ kill sync thread(s)
		
		_changeState(TRANSPORT_INVALID);
	}
		
	// send notification
	if(!LockLooper()) {
		ASSERT(!"LockLooper() failed.");
	}
	BMessage m(M_NODE_REMOVED);
	m.AddInt32("groupID", id());
	m.AddInt32("nodeID", node->id());
	notify(&m);
	UnlockLooper();
		
	// success
	return B_OK;
}
	
status_t NodeGroup::removeNode(
	uint32											index) {
 
	D_METHOD((
		"NodeGroup::removeNode(by index)\n"));
		
	// +++++ icky nested lock
	Autolock _l(this);
 
	ASSERT(m_nodes.size() > index);
	return removeNode(m_nodes[index]);	
}
 
uint32 NodeGroup::groupFlags() const {
	Autolock _l(this);
	return m_flags;
}
 
status_t NodeGroup::setGroupFlags(
	uint32											flags) {
	Autolock _l(this);
	m_flags = flags;
	return B_OK;
}
 
 
// returns true if one or more nodes in the group have cycling
// enabled, and the start- and end-positions are valid
bool NodeGroup::canCycle() const {
	Autolock _l(this);
	
	return
		m_cycleNodes.size() > 0 &&
		m_endPosition - m_startPosition > s_minCyclePeriod;
}
 
// -------------------------------------------------------- //
// *** TRANSPORT POSITIONING (LOCK REQUIRED)
// -------------------------------------------------------- //
 
// Fetch the current transport state
	
NodeGroup::transport_state_t NodeGroup::transportState() const {
	Autolock _l(this);
	return m_transportState;
}
	
// Set the starting media time:
//   This is the point at which playback will begin in any media
//   files/documents being played by the nodes in this group.
//   When cycle mode is enabled, this is the point to which each
//   node will be seek'd at the end of each cycle (loop).
//
//   The starting time can't be changed in the B_OFFLINE run mode
//   (this call will return an error.)
 
status_t NodeGroup::setStartPosition(
	bigtime_t										start) {
	Autolock _l(this);
	
	D_METHOD((
		"NodeGroup::setStartPosition(%Ld)\n", start));
		
	if(
		m_transportState == TRANSPORT_RUNNING ||
		m_transportState == TRANSPORT_ROLLING ||
		m_transportState == TRANSPORT_STARTING) {
	
		if(m_runMode == BMediaNode::B_OFFLINE)
			return B_NOT_ALLOWED;
		
		ASSERT(m_timeSourceObj);
	
		if(_cycleValid()) {
			if(m_timeSourceObj->Now() >= m_cycleDeadline) {
				// too late to change start position; defer
//				PRINT((" - deferred\n"));
				m_newStartPosition = start;
				m_newStart = true;
				return B_OK;
			}
			
			// not at deadline yet; fall through to set start position
		}
	}
	
	m_startPosition = start;
	
	// +++++ notify [e.moon 11oct99]
 
	return B_OK;
}
	
// Fetch the starting position:
 
// +++++ if a previously-set start position was deferred, it won't be
//       returned yet
 
bigtime_t NodeGroup::startPosition() const {
	Autolock _l(this);
	
	return m_startPosition;
}
		
// Set the ending media time:
//   This is the point at which playback will end relative to
//   media documents begin played by the nodes in this group; 
//   in cycle mode, this specifies the loop point.  If the
//   ending time is less than or equal to the starting time,
//   the transport will continue until stopped manually.
//   If the end position is changed while the transport is playing,
//   it must take effect retroactively (if it's before the current
//   position and looping is enabled, all nodes must 'warp' to
//   the proper post-loop position.)
//
//   The ending time can't be changed if run mode is B_OFFLINE and
//   the transport is running (this call will return an error.)
	
status_t NodeGroup::setEndPosition(
	bigtime_t										end) {
	Autolock _l(this);
	
	D_METHOD((
		"NodeGroup::setEndPosition(%Ld)\n", end));
 
	if(
		m_transportState == TRANSPORT_RUNNING ||
		m_transportState == TRANSPORT_ROLLING ||
		m_transportState == TRANSPORT_STARTING) {
	
		if(m_runMode == BMediaNode::B_OFFLINE)
			return B_NOT_ALLOWED;
		
		ASSERT(m_timeSourceObj);
	
		bigtime_t endDelta = end - m_endPosition;
	
		if(_cycleValid()) {
			if(m_timeSourceObj->Now() >= m_cycleDeadline + endDelta) {
				// too late to change end position; defer
//				PRINT((" - deferred\n"));
				m_newEndPosition = end;
				m_newEnd = true;
				return B_OK;
			}
			else {
				// set new end position
				m_endPosition = end;
				
				// inform thread
				ASSERT(m_cyclePort);
				write_port(
					m_cyclePort,
					_CYCLE_END_CHANGED,
					0,
					0);
//			
//				// restart nodes' cycle threads with new end position
//				_cycleInit(m_cycleStart);
//				for(node_set::iterator it = m_cycleNodes.begin();
//					it != m_cycleNodes.end(); ++it) {
//					(*it)->_scheduleCycle(m_cycleBoundary);
//				}
//				return B_OK;
			}
		}
	}
		
	m_endPosition = end;
 
	// +++++ notify [e.moon 11oct99]
 
	return B_OK;
}
 
 
// Fetch the end position:
//   Note that if the end position is less than or equal to the start
//   position, it's ignored.
 
// +++++ if a previously-set end position was deferred, it won't be
//       returned yet
		
bigtime_t NodeGroup::endPosition() const {
	Autolock _l(this);
	return m_endPosition;
}
		
// -------------------------------------------------------- //
// *** TRANSPORT OPERATIONS (LOCK REQUIRED)
// -------------------------------------------------------- //
 
// Preroll the group:
//   Seeks, then prerolls, each node in the group (honoring the
//   NO_SEEK and NO_PREROLL flags.)  This ensures that the group
//   can start as quickly as possible.
//
//   Does not return until all nodes in the group have been
//   prepared.
 
status_t NodeGroup::preroll() {
	D_METHOD((
		"NodeGroup::preroll()\n"));
		
	Autolock _l(this);
	return _preroll();
}
 
// Start all nodes in the group:
//   Nodes with the NO_START_STOP flag aren't molested.
	
status_t NodeGroup::start() {
	D_METHOD((
		"NodeGroup::start()\n"));
		
	Autolock _l(this);
	return _start();
}
 
// Stop all nodes in the group:
//   Nodes with the NO_START_STOP flag aren't molested.
	
status_t NodeGroup::stop() {
	D_METHOD((
		"NodeGroup::stop()\n"));
		
	Autolock _l(this);
	return _stop();
}
 
// Roll all nodes in the group:
//   Queues a start and stop atomically (via BMediaRoster::RollNode()).
//   Returns B_NOT_ALLOWED if endPosition <= startPosition;
 
status_t NodeGroup::roll() {
	D_METHOD((
		"NodeGroup::roll()\n"));
		
	Autolock _l(this);
	return _roll();
}
 
// -------------------------------------------------------- //
// *** TIME SOURCE & RUN-MODE OPERATIONS (LOCK REQUIRED)
// -------------------------------------------------------- //
 
// time source control:
//   getTimeSource():
//   returns B_ERROR if no time source has been set; otherwise,
//   returns the node ID of the current time source for all
//   nodes in the group.
//
//   setTimeSource():
//   Calls SetTimeSourceFor() on every node in the group.
//   The group must be stopped; B_NOT_ALLOWED will be returned
//   if the state is TRANSPORT_RUNNING or TRANSPORT_ROLLING.
	
status_t NodeGroup::getTimeSource(
	media_node*									outTimeSource) const {
	Autolock _l(this);
 
	if(m_timeSource != media_node::null) {
		*outTimeSource = m_timeSource;	
		return B_OK;
	}
	return B_ERROR;
}
			
status_t NodeGroup::setTimeSource(
	const media_node&						timeSource) {
 
	Autolock _l(this);
	
	if(m_transportState == TRANSPORT_RUNNING || m_transportState == TRANSPORT_ROLLING)
		return B_NOT_ALLOWED;
	
	if(m_timeSourceObj)
		m_timeSourceObj->Release();
	
	m_timeSource = timeSource;
	
	// cache a BTimeSource*
	m_timeSourceObj = m_manager->roster->MakeTimeSourceFor(timeSource);
	ASSERT(m_timeSourceObj);
	
	// apply new time source to all nodes
	for_each(
		m_nodes.begin(),
		m_nodes.end(),
		bind2nd(
			mem_fun(&NodeRef::_setTimeSource),
			m_timeSource.node
		)
	);
	
//	// try to set as sync node
//	err = setSyncNode(timeSource);
//	if(err < B_OK) {
//		PRINT((
//			"* NodeGroup::setTimeSource(): setSyncNode() failed: %s\n",
//			strerror(err)));
//	}
	
	// notify
	if(!LockLooper()) {
		ASSERT(!"LockLooper() failed.");
	}
	BMessage m(M_TIME_SOURCE_CHANGED);
	m.AddInt32("groupID", id());
	m.AddInt32("timeSourceID", timeSource.node);
	notify(&m);
	UnlockLooper();
	
	return B_OK;
}
		
// run mode access:
//   Sets the default run mode for the group.  This will be
//   applied to every node with a wildcard (0) run mode.
//
//   Special case: if the run mode is B_OFFLINE, it will be
//   applied to all nodes in the group.
	
BMediaNode::run_mode NodeGroup::runMode() const {
	Autolock _l(this);
	return m_runMode;
}
 
status_t NodeGroup::setRunMode(BMediaNode::run_mode mode) {
	Autolock _l(this);
 
	m_runMode = mode;
	
	// apply to all nodes
	for_each(
		m_nodes.begin(),
		m_nodes.end(),
		bind2nd(
			mem_fun(&NodeRef::_setRunModeAuto),
			m_runMode
		)
//		bound_method(
//			*this,
//			&NodeGroup::setRunModeFor)
	);
	
 
	return B_OK;
}
 
// -------------------------------------------------------- //
// *** BHandler
// -------------------------------------------------------- //
 
void NodeGroup::MessageReceived(
	BMessage*										message) {
 
//	PRINT((
//		"NodeGroup::MessageReceived():\n"));
//	message->PrintToStream();
	status_t err;
	
	switch(message->what) {
		case M_SET_TIME_SOURCE:
			{
				media_node timeSource;
				void* data;
				ssize_t dataSize;
				err = message->FindData(
					"timeSourceNode",
					B_RAW_TYPE,
					(const void**)&data,
					&dataSize);
				if(err < B_OK) {
					PRINT((
						"* NodeGroup::MessageReceived(M_SET_TIME_SOURCE):\n"
						"  no timeSourceNode!\n"));
					break;
				}
				timeSource = *(media_node*)data;
 
				setTimeSource(timeSource);
			}
			break;
			
		case M_SET_RUN_MODE:
			{
				uint32 runMode;
				err = message->FindInt32("runMode", (int32*)&runMode);
				if(err < B_OK) {
					PRINT((
						"* NodeGroup::MessageReceived(M_SET_RUN_MODE):\n"
						"  no runMode!\n"));
					break;
				}
				
				if(runMode < BMediaNode::B_OFFLINE ||
					runMode > BMediaNode::B_RECORDING) {
					PRINT((
						"* NodeGroup::MessageReceived(M_SET_RUN_MODE):\n"
						"  invalid run mode (%" B_PRIu32 ")\n", runMode));
					break;
				}
				
				setRunMode((BMediaNode::run_mode)runMode);
			}
			break;
			
		case M_SET_START_POSITION:
			{
				bigtime_t position;
				err = message->FindInt64("position", (int64*)&position);
				if(err < B_OK) {
					PRINT((	
						"* NodeGroup::MessageReceived(M_SET_START_POSITION):\n"
						"  no position!\n"));
					break;
				}
				setStartPosition(position);
			}
			break;
	
		case M_SET_END_POSITION:
			{
				bigtime_t position;
				err = message->FindInt64("position", (int64*)&position);
				if(err < B_OK) {
					PRINT((	
						"* NodeGroup::MessageReceived(M_SET_END_POSITION):\n"
						"  no position!\n"));
					break;
				}
				setEndPosition(position);
			}
			break;
	
		case M_PREROLL:
			preroll();
			break;
 
		case M_START:
			start();
			break;
	
		case M_STOP:
			stop();
			break;
			
		case M_ROLL:
			roll();
			break;
	
		default:
			_inherited::MessageReceived(message);
			break;
	}
}
		
 
// -------------------------------------------------------- //
// *** IPersistent
// -------------------------------------------------------- //
 
// !
#if CORTEX_XML
// !
 
// +++++
 
// Default constructor
NodeGroup::NodeGroup() :
	m_manager(0) {} // +++++ finish initialization
	
 
// !
#endif /*CORTEX_XML*/
// !
 
// -------------------------------------------------------- //
// *** IObservable:		[19aug99]
// -------------------------------------------------------- //
 
void NodeGroup::observerAdded(
	const BMessenger&				observer) {
 
	BMessage m(M_OBSERVER_ADDED);
	m.AddInt32("groupID", id());
	m.AddMessenger("target", BMessenger(this));
	observer.SendMessage(&m);
}
		
void NodeGroup::observerRemoved(
	const BMessenger&				observer) {
 
	BMessage m(M_OBSERVER_REMOVED);
	m.AddInt32("groupID", id());
	m.AddMessenger("target", BMessenger(this));
	observer.SendMessage(&m);
}
 
void NodeGroup::notifyRelease() {
 
	BMessage m(M_RELEASED);
	m.AddInt32("groupID", id());
	m.AddMessenger("target", BMessenger(this));
	notify(&m);
}
 
void NodeGroup::releaseComplete() {
	// +++++
}
 
// -------------------------------------------------------- //
// *** ILockable: pass lock requests to m_lock
// -------------------------------------------------------- //
 
bool NodeGroup::lock(
	lock_t type,
	bigtime_t timeout) {
	
	D_LOCK(("*** NodeGroup::lock(): %ld\n", find_thread(0)));
	
	ASSERT(type == WRITE);
	status_t err = m_lock.LockWithTimeout(timeout);
	
	D_LOCK(("*** NodeGroup::lock() ACQUIRED: %ld\n", find_thread(0)));
 
	return err == B_OK;
}
	
bool NodeGroup::unlock(
	lock_t type) {
 
	D_LOCK(("*** NodeGroup::unlock(): %ld\n", find_thread(0)));
	
	ASSERT(type == WRITE);
	m_lock.Unlock();
 
	D_LOCK(("*** NodeGroup::unlock() RELEASED: %ld\n", find_thread(0)));
 
	return true;
}
	
bool NodeGroup::isLocked(
	lock_t type) const {
 
	ASSERT(type == WRITE);
	return m_lock.IsLocked();
}
		
// -------------------------------------------------------- //
// *** ctor (accessible to NodeManager)
// -------------------------------------------------------- //
 
NodeGroup::NodeGroup(
	const char*									name,
	NodeManager*								manager,
	BMediaNode::run_mode				runMode) :
	
	ObservableHandler(name),
	m_lock("NodeGroup::m_lock"),
	m_manager(manager),
	m_id(NextID()),
	m_name(name),
	m_flags(0),
	m_transportState(TRANSPORT_INVALID),
	m_runMode(runMode),
	m_timeSourceObj(0),
	m_released(false),
	m_cycleThread(0),
	m_cyclePort(0),
	m_startPosition(0LL),
	m_endPosition(0LL),
	m_newStart(false),
	m_newEnd(false) {
 
	ASSERT(m_manager);
	
	if(!m_manager->Lock()) {
		ASSERT(!"m_manager->Lock() failed");
	}
	m_manager->AddHandler(this);
	m_manager->Unlock();
	
	// set default time source
	media_node ts;
	D_ROSTER(("# roster->GetTimeSource()\n"));
	status_t err = m_manager->roster->GetTimeSource(&ts);
	if(err < B_OK) {
		PRINT((
			"*** NodeGroup(): roster->GetTimeSource() failed:\n"
			"    %s\n", strerror(err)));
	}
	setTimeSource(ts);
}
 
// -------------------------------------------------------- //
// *** internal operations
// -------------------------------------------------------- //
 
uint32 NodeGroup::s_nextID = 1;
uint32 NodeGroup::NextID() {
	return atomic_add((int32*)&s_nextID, 1);
}
 
// -------------------------------------------------------- //
// *** ref->group communication (LOCK REQUIRED)
// -------------------------------------------------------- //
 
// when a NodeRef's cycle state (ie. looping or not looping)
// changes, it must pass that information on via this method
 
void NodeGroup::_refCycleChanged(
	NodeRef*										ref) {
	assert_locked(this);
	D_METHOD((
		"NodeGroup::_refCycleChanged('%s')\n",
		ref->name()));
 
	if(ref->m_cycle) {
		_cycleAddRef(ref);
	}	else {
		_cycleRemoveRef(ref);
	}
 
	// +++++ if running & cycle valid, the node should be properly
	//       seek'd and start'd
}
	
 
// when a cycling node's latency changes, call this method.
 
void NodeGroup::_refLatencyChanged(
	NodeRef*										ref) {
	assert_locked(this);
	D_METHOD((
		"NodeGroup::_refLatencyChanged('%s')\n",
		ref->name()));
	
	if(!_cycleValid())
		return;
 
	// remove & replace ref (positions it properly)	
	_cycleRemoveRef(ref);
	_cycleAddRef(ref);
	
	// slap my thread up
	ASSERT(m_cyclePort);
	write_port(
		m_cyclePort,
		_CYCLE_LATENCY_CHANGED,
		0,
		0);
 
	// +++++ zat it?
}
 
// when a NodeRef receives notification that it has been stopped,
// but is labeled as still running, it must call this method.
// [e.moon 11oct99: roll/B_OFFLINE support]
 
void NodeGroup::_refStopped(
	NodeRef*										ref) {
	assert_locked(this);
	D_METHOD((
		"NodeGroup::_refStopped('%s')\n",
		ref->name()));
	
	// roll/B_OFFLINE support [e.moon 11oct99]
	// (check to see if any other nodes in the group are still running;
	//  mark group stopped if not.)
	if(m_transportState == TRANSPORT_ROLLING) {
		bool nodesRunning = false;
		for(node_set::iterator it = m_nodes.begin();
			it != m_nodes.end(); ++it) {
			if((*it)->isRunning()) {
				nodesRunning = true;
				break;
			}
		}
		if(!nodesRunning)
			// the group has stopped; update transport state
			_changeState(TRANSPORT_STOPPED);	
 
	}
	
}
 
 
// -------------------------------------------------------- //
// *** transport helpers (LOCK REQUIRED)
// -------------------------------------------------------- //
 
 
// Preroll all nodes in the group; this is the implementation
// of preroll().
// *** this method should not be called from the transport thread
// (since preroll operations can block for a relatively long time.)
	
status_t NodeGroup::_preroll() {
	assert_locked(this);
	
	D_METHOD((
		"NodeGroup::_preroll()\n"));
		
	if(
		m_transportState == TRANSPORT_RUNNING ||
		m_transportState == TRANSPORT_ROLLING)
		// too late
		return B_NOT_ALLOWED;
	
	// * preroll all nodes to the start position
 
	// +++++ currently, if an error is encountered it's ignored.
	//       should the whole operation fail if one node couldn't
	//       be prerolled?
	//
	//       My gut response is 'no', since the preroll step is
	//       optional, but the caller should have some inkling that
	//       one of its nodes didn't behave.
 
// [e.moon 13oct99] making PPC compiler happy
//	for_each(
//		m_nodes.begin(),
//		m_nodes.end(),
//		bind2nd(
//			mem_fun(&NodeRef::_preroll),
//			m_startPosition
//		)
//	);
	for(node_set::iterator it = m_nodes.begin();
		it != m_nodes.end(); ++it) {
		(*it)->_preroll(m_startPosition);
	}
 
//    replaces
//		bind2nd(
//			bound_method(*this, &NodeGroup::prerollNode),
//			m_startPosition
//		)
 
	return B_OK;
}
 
 
//// functor: calculates latency of each node it's handed, caching
//// the largest one found; includes initial latency if nodes report it.
//
//class NodeGroup::calcLatencyFn { public:
//	bigtime_t& maxLatency;
//	
//	calcLatencyFn(bigtime_t& _m) : maxLatency(_m) {}
//
//	void operator()(NodeRef* r) {
//		ASSERT(r);
//		
////		PRINT((
////			"# calcLatencyFn(): '%s'\n",
////			r->name()));
//			
//		if(!(r->node().kind & B_BUFFER_PRODUCER)) {
//			// node can't incur latency
////			PRINT((
////				"-   not a producer\n"));
//			return;
//		}
//		
//		bigtime_t latency;
//		status_t err =
//			BMediaRoster::Roster()->GetLatencyFor(
//				r->node(),
//				&latency);
//		if(err < B_OK) {
//			PRINT((
//				"* calcLatencyFn: GetLatencyFor() failed: %s\n",
//				strerror(err)));
//			return;
//		}
////		PRINT(("-   %Ld\n", latency));
//		
//		bigtime_t add;
//		err = BMediaRoster::Roster()->GetInitialLatencyFor(
//			r->node(),
//			&add);
////		PRINT(("-   %Ld\n", add));
//		if(err < B_OK) {
//			PRINT((
//				"* calcLatencyFn: GetInitialLatencyFor() failed: %s\n",
//				strerror(err)));
//		}
//		else
//			latency += add;
//		
//		if(latency > maxLatency)
//			maxLatency = latency;
//
////		PRINT((
////			"-   max latency: %Ld\n",
////			maxLatency));
//	}
//};
 
// Start all nodes in the group; this is the implementation of
// start().  Fails if the run mode is B_OFFLINE; use _roll() instead
// in that case.
//
// (this may be called from the transport thread or from
//  an API-implementation method.)
		
status_t NodeGroup::_start() {
	assert_locked(this);
	
	D_METHOD((
		"NodeGroup::_start()\n"));
	status_t err;
		
	if(m_transportState != TRANSPORT_STOPPED)
		return B_NOT_ALLOWED;
		
	if(m_runMode == BMediaNode::B_OFFLINE)
		return B_NOT_ALLOWED;
 
	ASSERT(m_nodes.size());
 
	_changeState(TRANSPORT_STARTING);
 
	// * Find the highest latency in the group
	
	bigtime_t offset = 0LL;
	calcLatencyFn _f(offset);
	for_each(
		m_nodes.begin(),
		m_nodes.end(),
		_f);
 
	offset += s_rosterLatency;
	PRINT((
		"- offset: %" B_PRIdBIGTIME "\n", offset));
	
	// * Seek all nodes (in case one or more failed to preroll)
 
	for(node_set::iterator it = m_nodes.begin();
		it != m_nodes.end(); ++it) {
		err = (*it)->_seekStopped(m_startPosition);
		if(err < B_OK) {
			PRINT((
				"! NodeGroup('%s')::_start():\n"
				"  ref('%s')->_seekStopped(%" B_PRIdBIGTIME ") failed:\n"
				"  %s\n",
				name(), (*it)->name(), m_startPosition,
				strerror(err)));
 
			// +++++ continue?
		}
	}
	
	// * Start all nodes, allowing for the max latency found
 
	ASSERT(m_timeSourceObj);
	bigtime_t when = m_timeSourceObj->Now() + offset;
 
	// 10aug99: initialize cycle (loop) settings
	if(_cycleValid()) {
		_initCycleThread();
		_cycleInit(when);
	}
	
	// start the nodes
	for(node_set::iterator it = m_nodes.begin();
		it != m_nodes.end(); ++it) {
		err = (*it)->_start(when);
		if(err < B_OK) {
			PRINT((
				"! NodeGroup('%s')::_start():\n"
				"  ref('%s')->_start(%" B_PRIdBIGTIME ") failed:\n"
				"  %s\n",
				name(), (*it)->name(), when,
				strerror(err)));
 
			// +++++ continue?
		}
	}
	
	// notify observers
	_changeState(TRANSPORT_RUNNING);
	return B_OK;	
}
	
// Stop all nodes in the group; this is the implementation of
// stop().
//
// (this may be called from the transport thread or from
//  an API-implementation method.)
		
status_t NodeGroup::_stop() {
 
	D_METHOD((
		"NodeGroup::_stop()\n"));
		
	assert_locked(this);
 
	if(
		m_transportState != TRANSPORT_RUNNING &&
		m_transportState != TRANSPORT_ROLLING)
		return B_NOT_ALLOWED;
 
	_changeState(TRANSPORT_STOPPING);
 
	// * stop the cycle thread if need be
	_destroyCycleThread();
 
	// * stop all nodes
	//   +++++ error reports would be nice
	
	for_each(
		m_nodes.begin(),
		m_nodes.end(),
		mem_fun(&NodeRef::_stop)
	);
 
	// update transport state
	_changeState(TRANSPORT_STOPPED);
		
	return B_OK;
}
 
// Roll all nodes in the group; this is the implementation of
// roll().
//
// (this may be called from the transport thread or from
//  an API-implementation method.)
 
status_t NodeGroup::_roll() {
 
	D_METHOD((
		"NodeGroup::_roll()\n"));		
	assert_locked(this);
	status_t err;
 
	if(m_transportState != TRANSPORT_STOPPED)
		return B_NOT_ALLOWED;
		
	bigtime_t period = m_endPosition - m_startPosition;
	if(period <= 0LL)
		return B_NOT_ALLOWED;
	
	_changeState(TRANSPORT_STARTING);
 
	bigtime_t tpStart = 0LL;
	bigtime_t tpStop = period;
	
	if(m_runMode != BMediaNode::B_OFFLINE) {
	
		// * Find the highest latency in the group	
		bigtime_t offset = 0LL;
		calcLatencyFn _f(offset);
		for_each(
			m_nodes.begin(),
			m_nodes.end(),
			_f);
 
		offset += s_rosterLatency;
		PRINT((
			"- offset: %" B_PRIdBIGTIME "\n", offset));
 
		ASSERT(m_timeSourceObj);
		tpStart = m_timeSourceObj->Now() + offset;
		tpStop += tpStart;
	}
	
	// * Roll all nodes; watch for errors
	bool allFailed = true;
	err = B_OK;
	for(
		node_set::iterator it = m_nodes.begin();
		it != m_nodes.end(); ++it) {
 
		status_t e = (*it)->_roll(
			tpStart,
			tpStop,
			m_startPosition);
		if(e < B_OK)
			err = e;
		else
			allFailed = false;
	}
	
	if(!allFailed)
		// notify observers
		_changeState(TRANSPORT_ROLLING);
 
	return err;	
}
	
 
// State transition; notify listeners
// +++++ [18aug99] DANGER: should notification happen in the middle
//                         of such an operation?
inline void NodeGroup::_changeState(
	transport_state_t			to) {
 
	assert_locked(this);
	
	m_transportState = to;
 
	if(!LockLooper()) {
		ASSERT(!"LockLooper() failed.");
	}
	BMessage m(M_TRANSPORT_STATE_CHANGED);
	m.AddInt32("groupID", id());
	m.AddInt32("transportState", m_transportState);
	notify(&m);
	UnlockLooper();
}
 
// Enforce a state transition, and notify listeners
inline void NodeGroup::_changeState(
	transport_state_t			from,
	transport_state_t			to) {
 
	assert_locked(this);
	ASSERT(m_transportState == from);
	
	_changeState(to);
}
 
 
// -------------------------------------------------------- //
// *** cycle thread & helpers (LOCK REQUIRED)
// -------------------------------------------------------- //
 
// *** cycle port definitions
 
const int32					_portLength			= 32;
const char* const		_portName				= "NodeGroup::m_cyclePort";
const size_t				_portMsgMaxSize	= 256;
 
 
// set up the cycle thread (including its kernel port)
status_t NodeGroup::_initCycleThread() {
	assert_locked(this);
	status_t err;
	D_METHOD((
		"NodeGroup::_initCycleThread()\n"));
 
	if(m_cycleThread) {
		// thread is still alive
		err = _destroyCycleThread();
		if(err < B_OK)
			return err;
	}
 
	// create
	m_cycleThreadDone = false;
	m_cycleThread = spawn_thread(
		&_CycleThread,
		"NodeGroup[cycleThread]",
		B_NORMAL_PRIORITY,
		(void*)this);
	if(m_cycleThread < B_OK) {
		PRINT((
			"* NodeGroup::_initCycleThread(): spawn_thread() failed:\n"
			"  %s\n",
			strerror(m_cycleThread)));
		return m_cycleThread;
	}
 
	// launch
	return resume_thread(m_cycleThread);
}
	
// shut down the cycle thread/port
status_t NodeGroup::_destroyCycleThread() {
	assert_locked(this);
	status_t err;
	D_METHOD((
		"NodeGroup::_destroyCycleThread()\n"));
 
	if(!m_cycleThread)
		return B_OK;
 
	if(!m_cycleThreadDone) {
		// kill the thread		
		ASSERT(m_cyclePort);
		err = write_port_etc(
			m_cyclePort,
			_CYCLE_STOP,
			0,
			0,
			B_TIMEOUT,
			10000LL);
 
		if(err < B_OK) {
			// bad thread.  die, thread, die.
			PRINT((
				"* NodeGroup::_destroyCycleThread(): port write failed; killing.\n"));
			delete_port(m_cyclePort);
			m_cyclePort = 0;
			kill_thread(m_cycleThread);
			m_cycleThread = 0;
			return B_OK;
		}	
	
		// the thread got the message; wait for it to quit
		unlock();
		while(wait_for_thread(m_cycleThread, &err) == B_INTERRUPTED) {
			PRINT((
				"! wait_for_thread(m_cycleThread, &err) == B_INTERRUPTED\n"));
		}		
		lock();
	}
	
	// it's up to the thread to close its port
	ASSERT(!m_cyclePort);
 
	m_cycleThread = 0;
				
	return B_OK;
}
 
 
// 1) do the current positions specify a valid cycle region?
// 2) are any nodes in the group cycle-enabled?
 
bool NodeGroup::_cycleValid() {
	assert_locked(this);
	return
		(m_transportState == TRANSPORT_RUNNING ||
		 m_transportState == TRANSPORT_STARTING) &&
		 canCycle();
}
 
// initialize the cycle members (call when starting)
 
void NodeGroup::_cycleInit(
	bigtime_t										startTime) {
	assert_locked(this);
	ASSERT(m_cycleNodes.size() > 0);
	D_METHOD((
		"NodeGroup::_cycleInit(%Ld)\n",
		startTime));
 
	// +++++ rescan latencies?
 
	// figure new boundary & deadline from region length
	bigtime_t cyclePeriod = m_endPosition - m_startPosition;
 
	if(cyclePeriod <= 0) {
		// cycle region is no longer valid
		m_cycleBoundary = 0LL;
		m_cycleDeadline = 0LL;
 
//		no no no -- deadlocks when the thread calls this method
//		// stop the thread
//		_destroyCycleThread();
		return;
	}
	
	m_cycleStart = startTime;
	m_cycleBoundary = startTime + cyclePeriod;
	m_cycleDeadline = m_cycleBoundary - (m_cycleMaxLatency + s_rosterLatency);
}
 
 
// add a ref to the cycle set (in proper order, based on latency)
void NodeGroup::_cycleAddRef(
	NodeRef*										ref) {
	assert_locked(this);
	
	// make sure it's not already there
	ASSERT(find(
		m_cycleNodes.begin(),
		m_cycleNodes.end(),
		ref) == m_cycleNodes.end());
 
	// [re]calc latency if 0
	if(!ref->m_latency)
		ref->_updateLatency();
 
	node_set::iterator it;
	for(it = m_cycleNodes.begin();
		it != m_cycleNodes.end(); ++it) {
		if(ref->m_latency > (*it)->m_latency) {
			m_cycleNodes.insert(it, ref);
			break;
		}
	}
	
	// not inserted? new ref belongs at the end
	if(it == m_cycleNodes.end())
		m_cycleNodes.insert(it, ref);
}
		
// remove a ref from the cycle set
void NodeGroup::_cycleRemoveRef(
	NodeRef*										ref) {
	assert_locked(this);
	
	node_set::iterator it = find(
		m_cycleNodes.begin(),
		m_cycleNodes.end(),
		ref);
	ASSERT(it != m_cycleNodes.end());
	m_cycleNodes.erase(it);
}
 
bigtime_t NodeGroup::_cycleBoundary() const {
	Autolock _l(this);
	return m_cycleBoundary;
}
 
// cycle thread impl.
/*static*/
status_t NodeGroup::_CycleThread(void* user) {
	((NodeGroup*)user)->_cycleThread();
	return B_OK;
}
 
void NodeGroup::_cycleThread() {
 
	status_t err;
	int32 code;
	int32 errorCount = 0;
 
	// +++++ liability -- if the thread has to be killed, this buffer
	//       won't be reclaimed
	char* msgBuffer = new char[_portMsgMaxSize];
	array_delete<char> _d(msgBuffer);
	
	// create port
	ASSERT(!m_cyclePort);
	m_cyclePort = create_port(
		_portLength,
		_portName);
	ASSERT(m_cyclePort >= B_OK);
	
	// the message-handling loop
	bool done = false;	
	while(!done) {
 
		// *** wait until it's time to queue the next cycle, or until
		// *** a message arrives
 
		lock();				// **** BEGIN LOCKED SECTION ****
		if(!_cycleValid()) {
			unlock();
			break;
		}
 
		ASSERT(m_cycleNodes.size() > 0);
		ASSERT(m_timeSourceObj);
 
		bigtime_t maxLatency = m_cycleNodes.front()->m_latency;
		bigtime_t wakeUpAt = m_timeSourceObj->RealTimeFor(
			m_cycleBoundary, maxLatency + s_rosterLatency);
		bigtime_t timeout = wakeUpAt - m_timeSourceObj->RealTime();
 
		if(timeout <= 0) {
			// +++++ whoops, I'm late.
			// +++++ adjust to compensate !!!
			PRINT((
				"*** NodeGroup::_cycleThread(): LATE\n"
				"    by %" B_PRIdBIGTIME "\n", -timeout));
		}
		
		// +++++ if timeout is very short, spin until the target time arrives
		
		unlock();			// **** END LOCKED SECTION ****
		
		// block until message arrives or it's time to wake up
		err = read_port_etc(
			m_cyclePort,
			&code,
			msgBuffer,
			_portMsgMaxSize,
			B_TIMEOUT,
			timeout);
 
		if(err == B_TIMED_OUT) {
			// the time has come to seek my nodes
			_handleCycleService();
			continue;
		}		
		else if(err < B_OK) {
			// any other error is bad news
			PRINT((
				"* NodeGroup::_cycleThread(): read_port error:\n"
				"  %s\n"
				"  ABORTING\n\n", strerror(err)));
			if(++errorCount > 10) {
				PRINT((
					"*** Too many errors; aborting.\n"));
				break;
			}
			continue;
		}
		
		errorCount = 0;
		
		// process the message
		switch(code) {
			case _CYCLE_STOP:
				// bail
				done = true;
				break;
				
			case _CYCLE_END_CHANGED:
			case _CYCLE_LATENCY_CHANGED:
				// fall through to next loop; for now, these messages
				// serve only to slap me out of my stupor and reassess
				// the timing situation...
				break;
				
			default:
				PRINT((
					"* NodeGroup::_cycleThread(): unknown message code '%"
						B_PRId32 "'\n", code));
				break;
		}
	} // while(!done)
 
	
	// delete port
	delete_port(m_cyclePort);
	m_cyclePort = 0;
 
	// done
	m_cycleThreadDone = true;
}
 
// cycle service: seek all nodes & initiate next cycle
void NodeGroup::_handleCycleService() {
	Autolock _l(this);
//	D_METHOD((
//		"NodeGroup::_handleCycleService()\n"));
	status_t err;
		
	if(!_cycleValid()) {
//		PRINT((
//			"- _handleCycleService(): cycle not valid; quitting.\n"));
		return;
	}
	
	// seek
	for(node_set::iterator it = m_cycleNodes.begin();
		it != m_cycleNodes.end(); ++it) {
		err = (*it)->_seek(
			m_startPosition,
			m_cycleBoundary);
		if(err < B_OK) {
			PRINT((
				"- _handleCycleService(): node('%s')::_seek() failed:\n"
				"  %s\n",
				(*it)->name(), strerror(err)));
		}
	}
 
	// update cycle settings
	if(m_newStart) {
		m_newStart = false;
		m_startPosition = m_newStartPosition;
	}
	if(m_newEnd) {
		m_newEnd = false;
		m_endPosition = m_newEndPosition;
	}
 
	// prepare next cycle
	_cycleInit(m_cycleBoundary);
}
 
// END -- NodeGroup.cpp --

V530 The return value of function 'remove' is required to be utilized.