/*
* Copyright 2015, Axel Dörfler, axeld@pinc-software.de.
* Distributed under the terms of the MIT License.
*/
#include "Worker.h"
// Idle timeout returned by Worker::Timeout(): one second (the value is in
// microseconds) until a worker thread quits without a job.
static const bigtime_t kWorkerTimeout = 1000000;

// Factor applied to the CPU count by MainWorker to compute its maximum
// number of workers; also the fallback maximum if the CPU count is unknown.
static const int32 kWorkerCountPerCPU = 3;

// Global worker counter. Accessed with atomic_add()/atomic_get() only, as it
// is shared between the worker threads.
static int32 sWorkerCount;
/*!	Constructs a worker operating on the job queue \a queue.

	No thread is created here; fThread stays at -1 until Init() spawns one.
*/
Worker::Worker(JobQueue& queue)
	:
	fThread(-1),
	fJobQueue(queue)
{
}
/*!	Destroys the worker. The destructor itself releases nothing, and does
	not touch the thread stored in fThread.
*/
Worker::~Worker()
{
}
/*!	Spawns the worker thread and sets it running.

	The thread is named after Name(), runs at B_NORMAL_PRIORITY, and executes
	_Process() with this object as its argument. Once the thread has been
	resumed successfully, the global worker count is incremented.

	\return B_OK if the thread is running, the negative spawn_thread() result
		if the thread could not be created, or the resume_thread() error. In
		both failure cases no thread is left behind.
*/
status_t
Worker::Init()
{
	fThread = spawn_thread(&Worker::_Process, Name(), B_NORMAL_PRIORITY,
		this);
	if (fThread < 0)
		return fThread;

	status_t status = resume_thread(fThread);
	if (status != B_OK) {
		// The thread has been created, but will never run: remove it, so
		// that it neither lingers around suspended forever, nor keeps a
		// pointer to an object the caller may dispose of after the failure.
		kill_thread(fThread);
		fThread = -1;
		return status;
	}

	atomic_add(&sWorkerCount, 1);
	return B_OK;
}
/*!	The worker's main loop: fetches one job after the other from the job
	queue, and runs each via Run().

	A failing job is only reported via debug_printf(); the loop carries on
	with the next job. The loop ends as soon as JobQueue::Pop() returns
	anything but B_OK (presumably also when no job arrived within Timeout()
	-- TODO confirm against JobQueue).

	\return The error returned by JobQueue::Pop() that ended the loop.
*/
status_t
Worker::Process()
{
	for (;;) {
		BJob* job;
		const status_t popStatus = fJobQueue.Pop(Timeout(), false, &job);
		if (popStatus != B_OK)
			return popStatus;

		const status_t runStatus = Run(job);
		if (runStatus == B_OK)
			continue;

		// TODO: proper error reporting on failed job!
		debug_printf("Launching %s failed: %s\n", job->Title().String(),
			strerror(runStatus));
	}
}
/*!	Returns the timeout Process() passes to JobQueue::Pop() when asking for
	the next job: kWorkerTimeout for an ordinary worker.
*/
bigtime_t
Worker::Timeout() const
{
	return kWorkerTimeout;
}
/*!	Returns the name Init() hands to spawn_thread() for this worker's
	thread.
*/
const char*
Worker::Name() const
{
	return "worker";
}
/*!	Executes \a job by calling its Run() method.

	\param job The job to execute; it is dereferenced unconditionally, and
		therefore must not be NULL.
	\return Whatever the job's Run() returned.
*/
status_t
Worker::Run(BJob* job)
{
	const status_t result = job->Run();
	return result;
}
/*!	Thread entry function, as passed to spawn_thread() by Init().

	Runs the worker's Process() loop, and disposes of the worker once the
	loop has ended -- the worker object is owned by its thread.

	\param _self The Worker this thread belongs to.
	\return The status Process() returned.
*/
/*static*/ status_t
Worker::_Process(void* _self)
{
	Worker* self = static_cast<Worker*>(_self);
	status_t status = self->Process();

	// Init() counted this thread when it started it. Since the thread is
	// about to exit, it must no longer count against the worker limit --
	// otherwise the count would only ever grow, and eventually prevent new
	// workers from being started at all.
	atomic_add(&sWorkerCount, -1);

	delete self;
	return status;
}
// #pragma mark -
/*!	Constructs the main worker for the job queue \a queue, and determines
	the maximum number of workers: kWorkerCountPerCPU for each CPU reported
	by get_system_info(), or just kWorkerCountPerCPU if that call fails.
*/
MainWorker::MainWorker(JobQueue& queue)
	:
	Worker(queue),
	fMaxWorkerCount(kWorkerCountPerCPU)
{
	// TODO: keep track of workers, and quit them on destruction

	system_info systemInfo;
	if (get_system_info(&systemInfo) != B_OK) {
		// Stick with the default set in the initializer list
		return;
	}

	fMaxWorkerCount = systemInfo.cpu_count * kWorkerCountPerCPU;
}
/*!	Returns B_INFINITE_TIMEOUT as the main worker's timeout, in contrast to
	the finite kWorkerTimeout of Worker::Timeout().
*/
bigtime_t
MainWorker::Timeout() const
{
	return B_INFINITE_TIMEOUT;
}
/*!	Returns the name identifying the main worker (as opposed to the plain
	"worker" of Worker::Name()).
*/
const char*
MainWorker::Name() const
{
	return "main worker";
}
/*!	Runs \a job, after having started an additional worker if warranted.

	Another Worker is started when more jobs are waiting in the queue than
	there are workers counted, and the maximum worker count has not been
	reached yet. Failing to start that worker is not an error for the caller:
	\a job is run by this worker either way.

	\param job The job to execute.
	\return The result of Worker::Run() for \a job.
*/
status_t
MainWorker::Run(BJob* job)
{
	int32 count = atomic_get(&sWorkerCount);

	// Clamp the job count, so that the cast below cannot overflow
	size_t jobCount = fJobQueue.CountJobs();
	if (jobCount > INT_MAX)
		jobCount = INT_MAX;

	if ((int32)jobCount > count && count < fMaxWorkerCount) {
		Worker* worker = new Worker(fJobQueue);
		if (worker->Init() != B_OK) {
			// A worker is deleted by its own thread when that quits. If the
			// thread could not be started, this will never happen, and we
			// have to dispose of the worker ourselves.
			delete worker;
		}
	}

	return Worker::Run(job);
}
↑ V773 Visibility scope of the 'worker' pointer was exited without releasing the memory. A memory leak is possible.