/*
* Copyright 2009, Colin Günther, coling@gmx.de
* Copyright 2007, Hugo Santos. All Rights Reserved.
* Distributed under the terms of the MIT License.
*
* Authors:
* Hugo Santos, hugosantos@gmail.com
*/
#include "device.h"
#include <stdio.h>
#include <compat/sys/callout.h>
#include <compat/sys/taskqueue.h>
#include <compat/sys/haiku-module.h>
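/* taskqueue state flags (tq_flags) */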
#define TQ_FLAGS_ACTIVE (1 << 0)
#define TQ_FLAGS_BLOCKED (1 << 1)
#define TQ_FLAGS_PENDING (1 << 2)
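/* flags for the 'f' member of struct timeout_task */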
#define DT_CALLOUT_ARMED (1 << 0)
#define DT_DRAIN_IN_PROGRESS (1 << 1)
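/* A taskqueue is protected either by a spinlock ('fast' queues, usable
 * from interrupt context) or by a mutex; tq_fast selects which of the
 * two lock members below is initialized and used. */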
struct taskqueue {
char tq_name[TASKQUEUE_NAMELEN];
struct mtx tq_mutex;
struct list tq_list;
taskqueue_enqueue_fn tq_enqueue;
void *tq_arg;
int tq_fast;
spinlock tq_spinlock;
sem_id tq_sem;
thread_id *tq_threads;
thread_id tq_thread_storage;
int tq_threadcount;
int tq_flags;
int tq_callouts;
};
struct taskqueue *taskqueue_fast = NULL;
struct taskqueue *taskqueue_swi = NULL;
struct taskqueue *taskqueue_thread = NULL;
static struct taskqueue *
_taskqueue_create(const char *name, int mflags, int fast,
taskqueue_enqueue_fn enqueueFunction, void *context)
{
struct taskqueue *tq = malloc(sizeof(struct taskqueue));
if (tq == NULL)
return NULL;
tq->tq_fast = fast;
if (fast) {
B_INITIALIZE_SPINLOCK(&tq->tq_spinlock);
} else {
mtx_init(&tq->tq_mutex, name, NULL, MTX_DEF);
}
strlcpy(tq->tq_name, name, sizeof(tq->tq_name));
list_init_etc(&tq->tq_list, offsetof(struct task, ta_link));
tq->tq_enqueue = enqueueFunction;
tq->tq_arg = context;
tq->tq_sem = -1;
tq->tq_threads = NULL;
tq->tq_threadcount = 0;
tq->tq_flags = TQ_FLAGS_ACTIVE;
tq->tq_callouts = 0;
return tq;
}
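/* Lock/unlock helpers hiding the spinlock vs. mutex distinction. Note
 * that for fast queues interrupts are disabled while the lock is held,
 * so nothing that blocks may be done under it. */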
static void
tq_lock(struct taskqueue *taskQueue, cpu_status *status)
{
if (taskQueue->tq_fast) {
*status = disable_interrupts();
acquire_spinlock(&taskQueue->tq_spinlock);
} else {
mtx_lock(&taskQueue->tq_mutex);
}
}
static void
tq_unlock(struct taskqueue *taskQueue, cpu_status status)
{
if (taskQueue->tq_fast) {
release_spinlock(&taskQueue->tq_spinlock);
restore_interrupts(status);
} else {
mtx_unlock(&taskQueue->tq_mutex);
}
}
struct taskqueue *
taskqueue_create(const char *name, int mflags,
taskqueue_enqueue_fn enqueueFunction, void *context)
{
return _taskqueue_create(name, mflags, 0, enqueueFunction, context);
}
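/* Worker thread loop: every release of tq_sem signals that the task
 * list may be non-empty; pop and run one task per iteration. The thread
 * exits once the semaphore is deleted (see taskqueue_free()). */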
static int32
tq_handle_thread(void *data)
{
struct taskqueue *tq = data;
cpu_status cpu_state;
struct task *t;
int pending;
sem_id sem;
/* just a synchronization point */
tq_lock(tq, &cpu_state);
sem = tq->tq_sem;
tq_unlock(tq, cpu_state);
while (acquire_sem(sem) == B_NO_ERROR) {
tq_lock(tq, &cpu_state);
t = list_remove_head_item(&tq->tq_list);
tq_unlock(tq, cpu_state);
if (t == NULL)
continue;
pending = t->ta_pending;
t->ta_pending = 0;
t->ta_handler(t->ta_argument, pending);
}
return 0;
}
static int
_taskqueue_start_threads(struct taskqueue **taskQueue, int count, int priority,
const char *name)
{
struct taskqueue *tq = (*taskQueue);
int i, j;
if (count == 0)
return -1;
if (tq->tq_threads != NULL)
return -1;
if (count == 1) {
tq->tq_threads = &tq->tq_thread_storage;
} else {
tq->tq_threads = malloc(sizeof(thread_id) * count);
if (tq->tq_threads == NULL)
return B_NO_MEMORY;
}
tq->tq_sem = create_sem(0, tq->tq_name);
if (tq->tq_sem < B_OK) {
if (count > 1)
free(tq->tq_threads);
tq->tq_threads = NULL;
return tq->tq_sem;
}
for (i = 0; i < count; i++) {
tq->tq_threads[i] = spawn_kernel_thread(tq_handle_thread, tq->tq_name,
priority, tq);
if (tq->tq_threads[i] < B_OK) {
status_t status = tq->tq_threads[i];
for (j = 0; j < i; j++)
kill_thread(tq->tq_threads[j]);
if (count > 1)
free(tq->tq_threads);
tq->tq_threads = NULL;
delete_sem(tq->tq_sem);
tq->tq_sem = -1;
return status;
}
}
tq->tq_threadcount = count;
for (i = 0; i < count; i++)
resume_thread(tq->tq_threads[i]);
return 0;
}
int
taskqueue_start_threads(struct taskqueue **taskQueue, int count, int priority,
const char *format, ...)
{
/* We assume that taskqueue_start_threads() is called from a sane
 * context and thus needs no locking. This is mostly because, if
 * the queue is 'fast', taking the lock disables interrupts... and
 * then we can't create semaphores, threads and bananas. */
/* cpu_status state; */
char name[64];
int result;
va_list vl;
va_start(vl, format);
vsnprintf(name, sizeof(name), format, vl);
va_end(vl);
/*tq_lock(*tqp, &state);*/
result = _taskqueue_start_threads(taskQueue, count, priority, name);
/*tq_unlock(*tqp, state);*/
return result;
}
void
taskqueue_free(struct taskqueue *taskQueue)
{
if (taskQueue == NULL) {
printf("taskqueue_free called with NULL taskqueue\n");
return;
}
/* lock and drain list? */
taskQueue->tq_flags &= ~TQ_FLAGS_ACTIVE;
if (taskQueue->tq_sem != -1) {
int i;
/* deleting the semaphore makes acquire_sem() in the worker
 * threads fail, terminating their loops */
delete_sem(taskQueue->tq_sem);
for (i = 0; i < taskQueue->tq_threadcount; i++) {
status_t status;
wait_for_thread(taskQueue->tq_threads[i], &status);
}
if (taskQueue->tq_threadcount > 1)
free(taskQueue->tq_threads);
}
/* destroy the mutex only after the worker threads have exited, as a
 * thread may still be using it while finishing its last task */
if (!taskQueue->tq_fast)
mtx_destroy(&taskQueue->tq_mutex);
free(taskQueue);
}
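/* Busy-waits (via snooze(0)) until the task is no longer marked
 * pending. Note that the worker clears ta_pending before invoking the
 * handler, so this does not guarantee that a running handler has
 * returned. */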
void
taskqueue_drain(struct taskqueue *taskQueue, struct task *task)
{
cpu_status status;
if (taskQueue == NULL) {
printf("taskqueue_drain called with NULL taskqueue\n");
return;
}
tq_lock(taskQueue, &status);
while (task->ta_pending != 0) {
tq_unlock(taskQueue, status);
snooze(0);
tq_lock(taskQueue, &status);
}
tq_unlock(taskQueue, status);
}
void
taskqueue_drain_timeout(struct taskqueue *queue,
struct timeout_task *timeout_task)
{
cpu_status status;
/*
* Set flag to prevent timer from re-starting during drain:
*/
tq_lock(queue, &status);
KASSERT((timeout_task->f & DT_DRAIN_IN_PROGRESS) == 0,
("Drain already in progress"));
timeout_task->f |= DT_DRAIN_IN_PROGRESS;
tq_unlock(queue, status);
callout_drain(&timeout_task->c);
taskqueue_drain(queue, &timeout_task->t);
/*
* Clear flag to allow timer to re-start:
*/
tq_lock(queue, &status);
timeout_task->f &= ~DT_DRAIN_IN_PROGRESS;
tq_unlock(queue, status);
}
static void
taskqueue_task_nop_fn(void* context, int pending)
{
}
void
taskqueue_drain_all(struct taskqueue *taskQueue)
{
struct task t_barrier;
if (taskQueue == NULL) {
printf("taskqueue_drain_all called with NULL taskqueue\n");
return;
}
TASK_INIT(&t_barrier, USHRT_MAX, taskqueue_task_nop_fn, &t_barrier);
taskqueue_enqueue(taskQueue, &t_barrier);
taskqueue_drain(taskQueue, &t_barrier);
}
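/* Enqueue with the queue lock held; the lock is released before
 * returning. A task that is already pending is merely counted again
 * (coalesced) instead of being inserted twice. */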
static void
taskqueue_enqueue_locked(struct taskqueue *taskQueue, struct task *task,
cpu_status status)
{
/* we don't really support priorities */
if (task->ta_pending) {
task->ta_pending++;
} else {
list_add_item(&taskQueue->tq_list, task);
task->ta_pending = 1;
if ((taskQueue->tq_flags & TQ_FLAGS_BLOCKED) == 0)
taskQueue->tq_enqueue(taskQueue->tq_arg);
else
taskQueue->tq_flags |= TQ_FLAGS_PENDING;
}
tq_unlock(taskQueue, status);
}
int
taskqueue_enqueue(struct taskqueue *taskQueue, struct task *task)
{
cpu_status status;
tq_lock(taskQueue, &status);
taskqueue_enqueue_locked(taskQueue, task, status);
/* The lock is released inside. */
return 0;
}
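/* Callout hook for timeout tasks. The callout was initialized with
 * CALLOUT_RETURNUNLOCKED on the queue's mutex (see timeout_task_init()),
 * so we are entered with the queue locked and leave the unlocking to
 * taskqueue_enqueue_locked(). */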
static void
taskqueue_timeout_func(void *arg)
{
struct taskqueue *queue;
struct timeout_task *timeout_task;
cpu_status status = 0;
// dummy value, as we should never get here on a spin taskqueue
timeout_task = arg;
queue = timeout_task->q;
KASSERT((timeout_task->f & DT_CALLOUT_ARMED) != 0, ("Stray timeout"));
timeout_task->f &= ~DT_CALLOUT_ARMED;
queue->tq_callouts--;
taskqueue_enqueue_locked(timeout_task->q, &timeout_task->t, status);
/* The lock is released inside. */
}
int
taskqueue_enqueue_timeout(struct taskqueue *queue,
struct timeout_task *ttask, int _ticks)
{
int res;
cpu_status status;
tq_lock(queue, &status);
KASSERT(ttask->q == NULL || ttask->q == queue,
("Migrated queue"));
ttask->q = queue;
res = ttask->t.ta_pending;
if (ttask->f & DT_DRAIN_IN_PROGRESS) {
/* Do nothing */
tq_unlock(queue, status);
res = -1;
} else if (_ticks == 0) {
tq_unlock(queue, status);
taskqueue_enqueue(queue, &ttask->t);
} else {
if ((ttask->f & DT_CALLOUT_ARMED) != 0) {
res++;
} else {
queue->tq_callouts++;
ttask->f |= DT_CALLOUT_ARMED;
if (_ticks < 0)
_ticks = -_ticks; /* Ignore overflow. */
}
tq_unlock(queue, status);
if (_ticks > 0) {
callout_reset(&ttask->c, _ticks,
taskqueue_timeout_func, ttask);
}
}
return (res);
}
static int
taskqueue_cancel_locked(struct taskqueue *queue, struct task *task,
u_int *pendp)
{
if (task->ta_pending > 0)
list_remove_item(&queue->tq_list, task);
if (pendp != NULL)
*pendp = task->ta_pending;
task->ta_pending = 0;
return 0;
}
int
taskqueue_cancel(struct taskqueue *queue, struct task *task, u_int *pendp)
{
int error;
cpu_status status;
tq_lock(queue, &status);
error = taskqueue_cancel_locked(queue, task, pendp);
tq_unlock(queue, status);
return (error);
}
int
taskqueue_cancel_timeout(struct taskqueue *queue,
struct timeout_task *timeout_task, u_int *pendp)
{
u_int pending, pending1;
int error;
cpu_status status;
tq_lock(queue, &status);
pending = callout_stop(&timeout_task->c) > 0;
error = taskqueue_cancel_locked(queue, &timeout_task->t, &pending1);
if ((timeout_task->f & DT_CALLOUT_ARMED) != 0) {
timeout_task->f &= ~DT_CALLOUT_ARMED;
queue->tq_callouts--;
}
tq_unlock(queue, status);
if (pendp != NULL)
*pendp = pending + pending1;
return (error);
}
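/* Standard enqueue hook: the context is a pointer to the taskqueue
 * pointer, as passed to taskqueue_create(). Wakes one worker thread
 * without forcing a reschedule, so it is safe from interrupt context. */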
void
taskqueue_thread_enqueue(void *context)
{
struct taskqueue **tqp = context;
release_sem_etc((*tqp)->tq_sem, 1, B_DO_NOT_RESCHEDULE);
}
int
taskqueue_enqueue_fast(struct taskqueue *taskQueue, struct task *task)
{
return taskqueue_enqueue(taskQueue, task);
}
struct taskqueue *
taskqueue_create_fast(const char *name, int mflags,
taskqueue_enqueue_fn enqueueFunction, void *context)
{
return _taskqueue_create(name, mflags, 1, enqueueFunction, context);
}
void
task_init(struct task *task, int prio, task_fn_t handler, void *context)
{
task->ta_priority = prio;
task->ta_handler = handler;
task->ta_argument = context;
task->ta_pending = 0;
}
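/* A minimal usage sketch (my_event_handler and sCookie are hypothetical
 * driver-side names):
 *
 *	static struct task sTask;
 *
 *	task_init(&sTask, 0, my_event_handler, sCookie);
 *	taskqueue_enqueue(taskqueue_swi, &sTask);
 *	...
 *	taskqueue_drain(taskqueue_swi, &sTask);
 */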
void
timeout_task_init(struct taskqueue *queue, struct timeout_task *timeout_task,
int priority, task_fn_t func, void *context)
{
TASK_INIT(&timeout_task->t, priority, func, context);
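/* the callout locks tq_mutex, so timeout tasks only work on non-fast
 * (mutex-protected) queues */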
callout_init_mtx(&timeout_task->c, &queue->tq_mutex,
CALLOUT_RETURNUNLOCKED);
timeout_task->q = queue;
timeout_task->f = 0;
}
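/* Timeout task usage sketch (my_timer_handler and sCookie are
 * hypothetical; 'hz' is the tick rate of the FreeBSD compat layer):
 *
 *	static struct timeout_task sTimeoutTask;
 *
 *	timeout_task_init(taskqueue_thread, &sTimeoutTask, 0,
 *		my_timer_handler, sCookie);
 *	taskqueue_enqueue_timeout(taskqueue_thread, &sTimeoutTask, hz);
 *	...
 *	taskqueue_drain_timeout(taskqueue_thread, &sTimeoutTask);
 */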
status_t
init_taskqueues()
{
status_t status = B_NO_MEMORY;
if (HAIKU_DRIVER_REQUIRES(FBSD_FAST_TASKQUEUE)) {
taskqueue_fast = taskqueue_create_fast("fast taskq", 0,
taskqueue_thread_enqueue, &taskqueue_fast);
if (taskqueue_fast == NULL)
return B_NO_MEMORY;
status = taskqueue_start_threads(&taskqueue_fast, 1,
B_REAL_TIME_PRIORITY, "fast taskq thread");
if (status < B_OK)
goto err_1;
}
if (HAIKU_DRIVER_REQUIRES(FBSD_SWI_TASKQUEUE)) {
taskqueue_swi = taskqueue_create_fast("swi taskq", 0,
taskqueue_thread_enqueue, &taskqueue_swi);
if (taskqueue_swi == NULL) {
status = B_NO_MEMORY;
goto err_1;
}
status = taskqueue_start_threads(&taskqueue_swi, 1,
B_REAL_TIME_PRIORITY, "swi taskq");
if (status < B_OK)
goto err_2;
}
if (HAIKU_DRIVER_REQUIRES(FBSD_THREAD_TASKQUEUE)) {
taskqueue_thread = taskqueue_create_fast("thread taskq", 0,
taskqueue_thread_enqueue, &taskqueue_thread);
if (taskqueue_thread == NULL) {
status = B_NO_MEMORY;
goto err_2;
}
status = taskqueue_start_threads(&taskqueue_thread, 1,
B_REAL_TIME_PRIORITY, "swi taskq");
if (status < B_OK)
goto err_3;
}
return B_OK;
err_3:
if (taskqueue_thread)
taskqueue_free(taskqueue_thread);
err_2:
if (taskqueue_swi)
taskqueue_free(taskqueue_swi);
err_1:
if (taskqueue_fast)
taskqueue_free(taskqueue_fast);
return status;
}
void
uninit_taskqueues()
{
if (HAIKU_DRIVER_REQUIRES(FBSD_THREAD_TASKQUEUE))
taskqueue_free(taskqueue_thread);
if (HAIKU_DRIVER_REQUIRES(FBSD_SWI_TASKQUEUE))
taskqueue_free(taskqueue_swi);
if (HAIKU_DRIVER_REQUIRES(FBSD_FAST_TASKQUEUE))
taskqueue_free(taskqueue_fast);
}
void
taskqueue_block(struct taskqueue *taskQueue)
{
cpu_status status;
tq_lock(taskQueue, &status);
taskQueue->tq_flags |= TQ_FLAGS_BLOCKED;
tq_unlock(taskQueue, status);
}
void
taskqueue_unblock(struct taskqueue *taskQueue)
{
cpu_status status;
tq_lock(taskQueue, &status);
taskQueue->tq_flags &= ~TQ_FLAGS_BLOCKED;
if (taskQueue->tq_flags & TQ_FLAGS_PENDING) {
taskQueue->tq_flags &= ~TQ_FLAGS_PENDING;
taskQueue->tq_enqueue(taskQueue->tq_arg);
}
tq_unlock(taskQueue, status);
}