/*
* Copyright 2012-2014, Rene Gollent, rene@gollent.com.
* Copyright 2009, Ingo Weinhold, ingo_weinhold@gmx.de.
* Distributed under the terms of the MIT License.
*/
#include "Worker.h"
#include <AutoDeleter.h>
#include <AutoLocker.h>
// #pragma mark - JobKey
// Out-of-line definition of the JobKey destructor. The body is intentionally
// empty; nothing is released here.
JobKey::~JobKey()
{
}
// #pragma mark - SimpleJobKey
// Builds a key from an object pointer and a type code. Both values are
// stored as given; the pointer is only kept, never dereferenced here.
SimpleJobKey::SimpleJobKey(const void* object, uint32 type)
	: object(object), type(type)
{
}
// Copy constructor: duplicates the object pointer and the type code of
// \a other.
SimpleJobKey::SimpleJobKey(const SimpleJobKey& other)
	: object(other.object), type(other.type)
{
}
// Returns the hash of this key: the object pointer's address bits XORed with
// the type code.
size_t
SimpleJobKey::HashValue() const
{
	const size_t objectBits = (size_t)(addr_t)object;
	const size_t typeBits = (size_t)type;

	return objectBits ^ typeBits;
}
// Two keys compare equal only if \a other is also a SimpleJobKey and both
// the object pointer and the type code match.
bool
SimpleJobKey::operator==(const JobKey& other) const
{
	const SimpleJobKey* that = dynamic_cast<const SimpleJobKey*>(&other);

	// a key of a different dynamic type never matches
	if (that == NULL)
		return false;

	if (object != that->object)
		return false;

	return type == that->type;
}
// Copy assignment: takes over the object pointer and type code of \a other
// and returns a reference to this key.
SimpleJobKey&
SimpleJobKey::operator=(const SimpleJobKey& other)
{
	type = other.type;
	object = other.object;

	return *this;
}
// #pragma mark - JobListener
// Out-of-line definition of the JobListener destructor; the body is empty.
JobListener::~JobListener()
{
}
// Default implementation of the "job started" hook; does nothing.
// Job::NotifyListeners() invokes this hook when the job's state is
// JOB_STATE_ACTIVE.
void
JobListener::JobStarted(Job* job)
{
}
// Default implementation of the "job done" hook; does nothing.
// Job::NotifyListeners() invokes this hook when the job's state is
// JOB_STATE_SUCCEEDED.
void
JobListener::JobDone(Job* job)
{
}
// Default implementation of the "waiting for input" hook; does nothing.
// Job::NotifyListeners() invokes this hook when the job's state is
// JOB_STATE_WAITING and its wait status is JOB_USER_INPUT_WAITING.
void
JobListener::JobWaitingForInput(Job* job)
{
}
// Default implementation of the "job failed" hook; does nothing.
// Job::NotifyListeners() invokes this hook when the job's state is
// JOB_STATE_FAILED.
void
JobListener::JobFailed(Job* job)
{
}
// Default implementation of the "job aborted" hook; does nothing.
// Job::NotifyListeners() invokes this hook when the job's state is
// JOB_STATE_ABORTED, and also for any state it does not handle explicitly.
void
JobListener::JobAborted(Job* job)
{
}
// #pragma mark - Job
// Creates a job that is not attached to any worker yet: no worker, state
// JOB_STATE_UNSCHEDULED, no dependency, and wait status
// JOB_DEPENDENCY_NOT_FOUND.
// The listener list is constructed with the argument 10 (presumably an
// initial capacity -- confirm against the list class).
// fNext is the link Worker::ShutDown() follows when walking the jobs removed
// from the job table; it is cleared here so that a job never carries an
// indeterminate link pointer.
Job::Job()
	:
	fWorker(NULL),
	fState(JOB_STATE_UNSCHEDULED),
	fDependency(NULL),
	fWaitStatus(JOB_DEPENDENCY_NOT_FOUND),
	fListeners(10),
	fNext(NULL)
{
}
// Out-of-line definition of the Job destructor; the body is empty.
Job::~Job()
{
}
// Registers this job as waiting for the job identified by \a key by
// forwarding to the owning worker's WaitForJob(), and returns the wait
// status the worker reports.
// The worker pointer is used unchecked, so the job must have been given a
// worker (see SetWorker()) before this is called.
job_wait_status
Job::WaitFor(const JobKey& key)
{
	const job_wait_status result = fWorker->WaitForJob(this, key);
	return result;
}
// Asks the owning worker to park this job until user input arrives by
// forwarding to Worker::WaitForUserInput(); returns that call's result.
// The worker pointer is used unchecked, so the job must have been given a
// worker (see SetWorker()) before this is called.
status_t
Job::WaitForUserInput()
{
	const status_t result = fWorker->WaitForUserInput(this);
	return result;
}
// Sets the job's description from a printf()-style format string and its
// variadic arguments.
//
// \param format The format string; the remaining arguments are passed on to
//        fDescription.SetToFormatVarArgs() together with it.
void
Job::SetDescription(const char* format, ...)
{
	va_list args;
	va_start(args, format);
	fDescription.SetToFormatVarArgs(format, args);
	// every va_start() must be paired with va_end() before returning
	va_end(args);
}
// Stores the worker this job belongs to. Worker::ScheduleJob() calls this
// when the job is scheduled.
void
Job::SetWorker(Worker* worker)
{
fWorker = worker;
}
// Overwrites the job's state. No listeners are notified here; callers that
// want notifications invoke NotifyListeners() separately.
void
Job::SetState(job_state state)
{
fState = state;
}
// Records the job this job is waiting for; NULL clears the dependency.
void
Job::SetDependency(Job* job)
{
fDependency = job;
}
// Stores the new wait status and derives the job state from it:
// JOB_DEPENDENCY_ACTIVE and JOB_USER_INPUT_WAITING put the job into
// JOB_STATE_WAITING, every other status makes it JOB_STATE_ACTIVE.
void
Job::SetWaitStatus(job_wait_status status)
{
	fWaitStatus = status;

	const bool stillWaiting = fWaitStatus == JOB_DEPENDENCY_ACTIVE
		|| fWaitStatus == JOB_USER_INPUT_WAITING;

	if (stillWaiting)
		fState = JOB_STATE_WAITING;
	else
		fState = JOB_STATE_ACTIVE;
}
// Appends \a listener to the job's listener list.
// Returns B_OK on success, or B_NO_MEMORY if the list rejects the item.
status_t
Job::AddListener(JobListener* listener)
{
	if (!fListeners.AddItem(listener))
		return B_NO_MEMORY;

	return B_OK;
}
// Removes \a listener from the job's listener list. The result of the
// list's RemoveItem() is ignored.
void
Job::RemoveListener(JobListener* listener)
{
fListeners.RemoveItem(listener);
}
void
Job::NotifyListeners()
{
int32 count = fListeners.CountItems();
for (int32 i = count - 1; i >= 0; i--) {
JobListener* listener = fListeners.ItemAt(i);
switch (fState) {
case JOB_STATE_ACTIVE:
listener->JobStarted(this);
break;
case JOB_STATE_WAITING:
if (fWaitStatus == JOB_USER_INPUT_WAITING)
listener->JobWaitingForInput(this);
break;
case JOB_STATE_SUCCEEDED:
listener->JobDone(this);
break;
case JOB_STATE_FAILED:
listener->JobFailed(this);
break;
case JOB_STATE_ABORTED:
default:
listener->JobAborted(this);
break;
}
}
}
// #pragma mark - Worker
// Creates an idle worker. No semaphore or thread exists until Init() has
// been called successfully.
// fWorkToDoSem starts out as -1 (an invalid id, the same value ShutDown()
// assigns after deleting the semaphore), so that the ShutDown() call in the
// destructor never hands an indeterminate id to delete_sem() when Init() was
// never called or failed before creating the semaphore.
Worker::Worker()
	:
	fLock("worker"),
	fWorkToDoSem(-1),
	fWorkerThread(-1),
	fTerminating(false)
{
}
// Shuts the worker down and, if a worker thread was spawned, blocks until
// that thread has exited.
Worker::~Worker()
{
	ShutDown();

	// no thread was ever spawned -- nothing to wait for
	if (fWorkerThread < 0)
		return;

	wait_for_thread(fWorkerThread, NULL);
}
// Prepares the worker for use: verifies the lock, initializes the job table,
// creates the "work to do" semaphore, and spawns and resumes the worker
// thread.
// Returns B_OK on success; otherwise the error of the first step that
// failed (for the semaphore and the thread that is the negative id returned
// by the creating call).
status_t
Worker::Init()
{
	// the lock and the job table must both be usable
	status_t status = fLock.InitCheck();
	if (status == B_OK)
		status = fJobs.Init();
	if (status != B_OK)
		return status;

	// semaphore the worker thread blocks on while there is nothing to do
	fWorkToDoSem = create_sem(0, "work to do");
	if (fWorkToDoSem < 0)
		return fWorkToDoSem;

	// the thread is created by spawn_thread() and started by resume_thread()
	fWorkerThread = spawn_thread(_WorkerLoopEntry, "worker", B_NORMAL_PRIORITY,
		this);
	if (fWorkerThread < 0)
		return fWorkerThread;

	resume_thread(fWorkerThread);

	return B_OK;
}
void
Worker::ShutDown()
{
AutoLocker<Worker> locker(this);
if (fTerminating)
return;
fTerminating = true;
// abort all jobs
Job* job = fJobs.Clear(true);
while (job != NULL) {
Job* nextJob = job->fNext;
_AbortJob(job, false);
job = nextJob;
}
// let the work thread terminate
delete_sem(fWorkToDoSem);
fWorkToDoSem = -1;
}
// Queues \a job for execution and optionally registers \a listener on it.
//
// Returns B_NO_MEMORY if \a job is NULL, B_ERROR if the worker is shutting
// down, the error from Job::AddListener() if the listener cannot be added,
// and B_OK once the job has been queued.
// The job is held in a BReference for the duration of the call and only
// detached from it when it is put into the unscheduled queue (so on the
// error paths the BReference is destroyed while still holding the job --
// presumably dropping the reference; confirm against BReference).
status_t
Worker::ScheduleJob(Job* job, JobListener* listener)
{
	if (job == NULL)
		return B_NO_MEMORY;

	BReference<Job> reference(job, true);
	AutoLocker<Worker> locker(this);

	if (fTerminating)
		return B_ERROR;

	if (listener != NULL) {
		const status_t addStatus = job->AddListener(listener);
		if (addStatus != B_OK)
			return addStatus;
	}

	// The semaphore only needs to be released if both queues were empty
	// before this job was added.
	const bool wakeWorker = fUnscheduledJobs.IsEmpty()
		&& fAbortedJobs.IsEmpty();

	job->SetWorker(this);
	job->SetState(JOB_STATE_UNSCHEDULED);
	fJobs.Insert(job);
	fUnscheduledJobs.Add(reference.Detach());

	if (wakeWorker)
		release_sem(fWorkToDoSem);

	return B_OK;
}
void
Worker::AbortJob(const JobKey& key)
{
AutoLocker<Worker> locker(this);
Job* job = fJobs.Lookup(key);
if (job == NULL)
return;
_AbortJob(job, true);
}
// Looks up the job registered under \a key while holding the worker lock.
// Returns whatever the job table's Lookup() yields (AbortJob() treats NULL
// as "no such job").
Job*
Worker::GetJob(const JobKey& key)
{
	AutoLocker<Worker> locker(this);

	Job* found = fJobs.Lookup(key);
	return found;
}
// Moves \a job from the suspended list back into the unscheduled queue and
// releases the "work to do" semaphore.
// Returns B_OK if the job was found in the suspended list, and
// B_ENTRY_NOT_FOUND otherwise.
status_t
Worker::ResumeJob(Job* job)
{
	AutoLocker<Worker> locker(this);

	JobList::Iterator iterator = fSuspendedJobs.GetIterator();
	while (iterator.Next()) {
		if (iterator.Current() != job)
			continue;

		// found it -- requeue the job
		iterator.Remove();
		job->SetState(JOB_STATE_UNSCHEDULED);
		fUnscheduledJobs.Add(job);
		release_sem(fWorkToDoSem);
		return B_OK;
	}

	return B_ENTRY_NOT_FOUND;
}
// Returns whether the job table currently contains any job.
bool
Worker::HasPendingJobs()
{
	AutoLocker<Worker> locker(this);

	const bool tableEmpty = fJobs.IsEmpty();
	return !tableEmpty;
}
// Registers \a listener on the job stored under \a key.
// Returns B_ENTRY_NOT_FOUND if there is no such job, otherwise the result of
// Job::AddListener().
status_t
Worker::AddListener(const JobKey& key, JobListener* listener)
{
	AutoLocker<Worker> locker(this);

	if (Job* found = fJobs.Lookup(key))
		return found->AddListener(listener);

	return B_ENTRY_NOT_FOUND;
}
void
Worker::RemoveListener(const JobKey& key, JobListener* listener)
{
AutoLocker<Worker> locker(this);
if (Job* job = fJobs.Lookup(key))
job->RemoveListener(listener);
}
// Makes \a waitingJob depend on the job registered under \a key.
//
// Returns JOB_DEPENDENCY_ABORTED if the worker is terminating or the waiting
// job has already been aborted, and JOB_DEPENDENCY_NOT_FOUND if no job is
// registered under \a key. Otherwise the waiting job gets the wait status
// JOB_DEPENDENCY_ACTIVE, is added to the dependency's list of dependent
// jobs, and its wait status is returned.
job_wait_status
Worker::WaitForJob(Job* waitingJob, const JobKey& key)
{
	AutoLocker<Worker> locker(this);

	// don't wait when the game is over anyway
	const bool gameOver = fTerminating
		|| waitingJob->State() == JOB_STATE_ABORTED;
	if (gameOver)
		return JOB_DEPENDENCY_ABORTED;

	Job* dependency = fJobs.Lookup(key);
	if (dependency == NULL)
		return JOB_DEPENDENCY_NOT_FOUND;

	// link the two jobs in both directions
	waitingJob->SetWaitStatus(JOB_DEPENDENCY_ACTIVE);
	waitingJob->SetDependency(dependency);
	dependency->DependentJobs().Add(waitingJob);

	return waitingJob->WaitStatus();
}
// Parks \a waitingJob until user input is available: the job gets the wait
// status JOB_USER_INPUT_WAITING, its listeners are notified, and it is added
// to the suspended list (from which ResumeJob() takes it again).
// Returns B_INTERRUPTED if the worker is terminating or the job has already
// been aborted, and B_OK otherwise.
status_t
Worker::WaitForUserInput(Job* waitingJob)
{
	AutoLocker<Worker> locker(this);

	const bool gameOver = fTerminating
		|| waitingJob->State() == JOB_STATE_ABORTED;
	if (gameOver)
		return B_INTERRUPTED;

	waitingJob->SetWaitStatus(JOB_USER_INPUT_WAITING);
	waitingJob->NotifyListeners();
	fSuspendedJobs.Add(waitingJob);

	return B_OK;
}
// Thread entry function passed to spawn_thread() in Init(). \a data is the
// Worker that spawned the thread; the call is forwarded to its
// _WorkerLoop().
/*static*/ status_t
Worker::_WorkerLoopEntry(void* data)
{
	Worker* worker = static_cast<Worker*>(data);
	return worker->_WorkerLoop();
}
// Body of the worker thread: processes jobs until _ProcessJobs() returns,
// then finishes all jobs that are still in the aborted queue. Always returns
// B_OK.
status_t
Worker::_WorkerLoop()
{
	_ProcessJobs();

	// clean up aborted jobs
	AutoLocker<Worker> locker(this);

	for (Job* aborted = fAbortedJobs.RemoveHead(); aborted != NULL;
			aborted = fAbortedJobs.RemoveHead()) {
		_FinishJob(aborted);
	}

	return B_OK;
}
// Main job processing loop of the worker thread.
//
// Each iteration takes the worker lock, blocks on the "work to do" semaphore
// if both the unscheduled and the aborted queue are empty, finishes all
// aborted jobs, and then runs at most one unscheduled job. The loop is left
// only when acquiring the semaphore fails with an error other than
// B_INTERRUPTED (ShutDown() deletes the semaphore -- presumably that is what
// makes the acquisition fail; confirm against acquire_sem()).
void
Worker::_ProcessJobs()
{
while (true) {
// a fresh locker per iteration; the lock is held except around the two
// blocking calls below (acquire_sem() and Job::Do())
AutoLocker<Worker> locker(this);
// wait for next job
if (fUnscheduledJobs.IsEmpty() && fAbortedJobs.IsEmpty()) {
// don't hold the lock while blocking on the semaphore
locker.Unlock();
status_t error = acquire_sem(fWorkToDoSem);
if (error != B_OK) {
if (error == B_INTERRUPTED) {
// interrupted: re-lock and start the iteration over
locker.Lock();
continue;
}
// any other error ends the loop (the lock is not held here)
break;
}
locker.Lock();
}
// clean up aborted jobs
while (Job* job = fAbortedJobs.RemoveHead())
_FinishJob(job);
// process the next job
if (Job* job = fUnscheduledJobs.RemoveHead()) {
job->SetState(JOB_STATE_ACTIVE);
job->NotifyListeners();
// run the job without holding the worker lock
locker.Unlock();
status_t error = job->Do();
locker.Lock();
// Only a job that is still active gets its final state from Do()'s
// result; any other state was set while Do() was running.
if (job->State() == JOB_STATE_ACTIVE) {
job->SetState(
error == B_OK ? JOB_STATE_SUCCEEDED : JOB_STATE_FAILED);
} else if (job->State() == JOB_STATE_WAITING)
// the job waits for a dependency or for user input (see
// WaitForJob()/WaitForUserInput()) and is not finished yet
continue;
_FinishJob(job);
}
}
}
// Marks \a job as aborted. Called with the worker lock held by the visible
// callers.
//
// - A job that is already aborted is left untouched.
// - An unscheduled job is moved from the unscheduled to the aborted queue.
// - A waiting job is unlinked from the job it depends on, if any.
// - In every other state only the state changes.
//
// If \a removeFromTable is \c true, the job is also removed from the job
// table.
void
Worker::_AbortJob(Job* job, bool removeFromTable)
{
	switch (job->State()) {
		case JOB_STATE_ABORTED:
			// already aborted -- nothing to do
			return;

		case JOB_STATE_UNSCHEDULED:
			fUnscheduledJobs.Remove(job);
			fAbortedJobs.Add(job);
			break;

		case JOB_STATE_WAITING:
			if (Job* blocker = job->Dependency())
				blocker->DependentJobs().Remove(job);
			job->SetDependency(NULL);
			break;

		default:
			// active, failed, succeeded: no queue bookkeeping required
			break;
	}

	job->SetState(JOB_STATE_ABORTED);

	if (removeFromTable)
		fJobs.Remove(job);
}
// Completes \a job: requeues the jobs that were waiting for it, removes it
// from the job table unless it was aborted, notifies its listeners, and
// releases one reference to it.
//
// Each dependent job gets a wait status derived from \a job's final state
// (aborted, failed, or succeeded; JOB_DEPENDENCY_NOT_FOUND for any other
// state), loses its dependency link, and is appended to the unscheduled
// queue. The "work to do" semaphore is released once if there were any
// dependent jobs.
void
Worker::_FinishJob(Job* job)
{
	// wake up dependent jobs
	if (!job->DependentJobs().IsEmpty()) {
		// fallback for states that should never be seen here
		job_wait_status outcome = JOB_DEPENDENCY_NOT_FOUND;

		switch (job->State()) {
			case JOB_STATE_ABORTED:
				outcome = JOB_DEPENDENCY_ABORTED;
				break;

			case JOB_STATE_FAILED:
				outcome = JOB_DEPENDENCY_FAILED;
				break;

			case JOB_STATE_SUCCEEDED:
				outcome = JOB_DEPENDENCY_SUCCEEDED;
				break;

			default:
				break;
		}

		for (Job* dependent = job->DependentJobs().RemoveHead();
				dependent != NULL;
				dependent = job->DependentJobs().RemoveHead()) {
			dependent->SetDependency(NULL);
			dependent->SetWaitStatus(outcome);
			fUnscheduledJobs.Add(dependent);
		}

		release_sem(fWorkToDoSem);
	}

	// an aborted job is not removed from the table here
	if (job->State() != JOB_STATE_ABORTED)
		fJobs.Remove(job);

	job->NotifyListeners();
	job->ReleaseReference();
}
↑ V730 Not all members of a class are initialized inside the constructor. Consider inspecting: fNext.
↑ V730 Not all members of a class are initialized inside the constructor. Consider inspecting: fWorkToDoSem.