Here is the modified code. My problem is, that when i send a message over a fifo to the runtime_system I can only start one working thread. The other problem is, that he is doing the work/job the hole time. After it finished the job, the worker starts from beginning:
Code:
#include <fcntl.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <stdio.h>
#include <stdlib.h>
#include "pthread.h"
#include "runtime_system.h"
#define THREADS_PER_SET 100
typedef enum {
UNUSED = 0,
PENDING,
WORKING,
SUCCESS,
FAILURE,
CANCELED
} status_t;
struct invade_job_struct {
struct invade_job_struct *next;
volatile int canceled;
pthread_mutex_t mutex;
status_t status;
/* Job parameters. This one iterates n times sleep(1)
*/
unsigned long n;
};
static pthread_mutex_t work_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t work_cond = PTHREAD_COND_INITIALIZER;
static struct invade_job_struct *invade_queue_head = NULL;
static struct invade_job_struct *invade_queue_ptr = NULL;
/*
* job worker function
*/
#define DEBUG 1
status_t invade(struct invade_job_struct *const job)
{
volatile int *const canceld = &job->canceled;
unsigned long n = job->n;
while (n > 0UL) {
if (*canceld) {
return CANCELED;
}
n--;
sleep(1);
if (DEBUG) {
printf("Thread %u : invade = %lu\n", (unsigned int)pthread_self(), n);
fflush(stdout);
}
}
return SUCCESS;
}
/* Invade thread function
*
*/
void *invade_thread_worker(void *payload __attribute__((unused)))
{
struct invade_job_struct *job;
status_t status;
pthread_mutex_lock(&work_mutex);
while (1) {
/* no job in queue */
if (!invade_queue_head) {
pthread_cond_wait(&work_cond, &work_mutex);
continue;
}
/* grep first job element of invade queue */
job = invade_queue_head;
if (invade_queue_head->next) {
invade_queue_ptr = invade_queue_head->next;
invade_queue_head = invade_queue_ptr;
}
else {
invade_queue_ptr = NULL;
invade_queue_head = NULL;
}
pthread_mutex_lock(&job->mutex);
if (job->status == PENDING) {
/* work on it */
job->status = WORKING;
pthread_mutex_unlock(&job->mutex);
status = invade(job);
pthread_mutex_lock(&job->mutex);
if (job->status == WORKING) {
job->status = status;
}
}
pthread_mutex_unlock(&job->mutex);
free(job);
}
}
void cancel_invade_job(struct invade_job_struct *const job)
{
pthread_mutex_lock(&job->mutex);
if (job->status == PENDING || job->status == WORKING) {
job->canceled = ~0;
job->status = CANCELED;
}
pthread_mutex_unlock(&job->mutex);
}
#define DEBUG 1
int main()
{
int i;
int recv_bytes;
int fifo_fd;
char fifo_name[100];
char msg[100];
pthread_attr_t attrs;
pthread_t id[THREADS_PER_SET];
struct invade_job_struct *job;
sprintf(fifo_name, "runtime_system_fifo%i\0", 1);
mkfifo(fifo_name, 0666);
fifo_fd = open(fifo_name, O_RDONLY | O_NONBLOCK);
pthread_attr_init(&attrs);
pthread_attr_setstacksize(&attrs, 65536);
for (i = 0; i < THREADS_PER_SET; i++) {
pthread_create(&id[i], &attrs, invade_thread_worker, NULL);
}
while(1) {
recv_bytes = read(fifo_fd, msg, sizeof(msg));
msg[100] = 0;
if(recv_bytes != 0) {
pthread_mutex_lock(&work_mutex);
/* empty invade job queue */
if(invade_queue_head == NULL) {
invade_queue_head = (struct invade_job_struct *) calloc(1, sizeof(struct invade_job_struct));
invade_queue_head->status = PENDING;
invade_queue_head->n = 10;
invade_queue_ptr = invade_queue_head;
}
else {
invade_queue_ptr = invade_queue_head;
while (invade_queue_ptr->next != NULL) {
invade_queue_ptr = invade_queue_ptr->next;
}
invade_queue_ptr->next = (struct invade_job_struct *) calloc(1, sizeof(struct invade_job_struct));
invade_queue_ptr = invade_queue_ptr->next;
invade_queue_ptr->status = PENDING;
invade_queue_ptr->n = 10;
invade_queue_ptr->next = NULL;
}
pthread_cond_signal(&work_cond);
pthread_mutex_unlock(&work_mutex);
}
}
return EXIT_SUCCESS;
}
There is a failure. I create 100 threads, but i can only append one job to the job queue. All other jobs aren't calculated