I’ve found a need for the channels I created. I plan on having a GUI library in one thread, handling windows, and a REPL in another. So I’m using channels to communicate. The REPL could be in a separate window handled by the GUI library, but that’s no fun. Interfaces should be responsive. There’s no reason to be unable to interact with the window just because the REPL is doing something.
So here is the new version with non-blocking versions of add and get:
#include "channel.h"

#include <errno.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
/*
 * Bounded FIFO channel shared between threads.
 *
 * The storage is a ring of buffer_length slots, each object_size bytes wide.
 * insert_point and remove_point are SLOT indices (not byte offsets) and are
 * only read or written while mutex is held.
 */
struct channel_s {
    sem_t empty_semaphore;   /* counts free slots; writers wait on it */
    sem_t fill_semaphore;    /* counts queued items; readers wait on it */
    pthread_mutex_t mutex;   /* guards buffer, insert_point, remove_point */
    char *buffer;            /* buffer_length * object_size bytes of storage */
    int buffer_length;       /* capacity of the channel, in slots */
    size_t object_size;      /* size of one item, in bytes */
    int insert_point;        /* slot index the next added item is written to */
    int remove_point;        /* slot index the next fetched item is read from */
};
/*
 * Allocate and initialise a channel holding up to buffer_length items of
 * object_size bytes each.
 *
 * On success *c points at the new channel, which must later be released with
 * channel_destroy(). On any failure a message is written to stderr, every
 * partially acquired resource is released, and *c is set to NULL.
 *
 * buffer_length must be positive and object_size non-zero: the ring indices
 * are reduced modulo buffer_length, so a zero length could never work.
 */
void channel_init(channel_t **c, int buffer_length, size_t object_size) {
    channel_t *ch;
    int error;

    *c = NULL;
    if (buffer_length <= 0 || object_size == 0) {
        fprintf(stderr, "channel error: invalid buffer length or object size\n");
        return;
    }

    ch = malloc(sizeof *ch);
    if (ch == NULL) {
        fprintf(stderr, "channel allocation error\n");
        return;
    }
    ch->buffer_length = buffer_length;
    ch->object_size = object_size;
    ch->insert_point = 0;
    ch->remove_point = 0;

    /* calloc refuses a slot-count * slot-size product that overflows size_t. */
    ch->buffer = calloc((size_t) buffer_length, object_size);
    if (ch->buffer == NULL) {
        fprintf(stderr, "channel allocation error\n");
        goto fail_channel;
    }

    error = pthread_mutex_init(&ch->mutex, NULL);
    if (error != 0) {
        fprintf(stderr, "channel error:%d\n", error);
        goto fail_buffer;
    }

    /* Semaphores ensure there are items/space to read/write */
    if (sem_init(&ch->empty_semaphore, 0, (unsigned int) buffer_length) != 0) {
        fprintf(stderr, "channel error:%d\n", errno);
        goto fail_mutex;
    }
    if (sem_init(&ch->fill_semaphore, 0, 0) != 0) {
        fprintf(stderr, "channel error:%d\n", errno);
        goto fail_empty_semaphore;
    }

    *c = ch;
    return;

fail_empty_semaphore:
    sem_destroy(&ch->empty_semaphore);
fail_mutex:
    pthread_mutex_destroy(&ch->mutex);
fail_buffer:
    free(ch->buffer);
fail_channel:
    free(ch);
}
/*
 * Copy one item (object_size bytes read from item) into the channel,
 * blocking while the channel is full.
 *
 * If waiting for a free slot fails for a reason other than being interrupted
 * by a signal, the error is reported on stderr and the item is NOT added.
 */
void channel_add(channel_t *c, void *item) {
    int error;

    /* Reserve an empty space; retry when a signal handler interrupts the wait. */
    while (sem_wait(&c->empty_semaphore) != 0) {
        if (errno != EINTR) {
            fprintf(stderr, "channel error:%d\n", errno);
            return;
        }
    }

    error = pthread_mutex_lock(&c->mutex);
    if (error != 0) {
        fprintf(stderr, "channel error:%d\n", error);
    }

    /* insert_point is a slot index, so scale it to a byte offset. */
    memcpy(c->buffer + (size_t) c->insert_point * c->object_size, item,
           c->object_size);
    c->insert_point = (c->insert_point + 1) % c->buffer_length;

    error = pthread_mutex_unlock(&c->mutex);
    if (error != 0) {
        fprintf(stderr, "channel error:%d\n", error);
    }

    sem_post(&c->fill_semaphore); /* Increase number of items */
}
/* Nonblocking, returns 1 on Success, 0 on Failure*/ | |
int channel_tryadd(channel_t *c, void *item) { | |
int error; | |
if (sem_trywait(&c->empty_semaphore) != 0) // Attempt to reserve an empty space | |
return 0; | |
error = pthread_mutex_lock(&c->mutex); | |
if (error != 0) | |
fprintf(stderr, "channel error:%d\n", error); | |
memcpy(c->buffer + c->insert_point, (char *) item, c->object_size); | |
c->insert_point = (c->insert_point + 1) % c->buffer_length; | |
error = pthread_mutex_unlock(&c->mutex); | |
if (error != 0) | |
fprintf(stderr, "channel error:%d\n", error); | |
sem_post(&c->fill_semaphore); // Increase number of items | |
return 1; | |
} | |
/*
 * Remove the oldest item from the channel and copy it (object_size bytes)
 * into space, blocking while the channel is empty.
 *
 * If waiting for an item fails for a reason other than being interrupted by
 * a signal, the error is reported on stderr and space is left untouched.
 */
void channel_get(channel_t *c, void *space) {
    int error;

    /* Wait for an item; retry when a signal handler interrupts the wait. */
    while (sem_wait(&c->fill_semaphore) != 0) {
        if (errno != EINTR) {
            fprintf(stderr, "channel error:%d\n", errno);
            return;
        }
    }

    error = pthread_mutex_lock(&c->mutex);
    if (error != 0) {
        fprintf(stderr, "channel error:%d\n", error);
    }

    /* remove_point is a slot index, so scale it to a byte offset. */
    memcpy(space, c->buffer + (size_t) c->remove_point * c->object_size,
           c->object_size);
    c->remove_point = (c->remove_point + 1) % c->buffer_length;

    error = pthread_mutex_unlock(&c->mutex);
    if (error != 0) {
        fprintf(stderr, "channel error:%d\n", error);
    }

    sem_post(&c->empty_semaphore); /* Free the space */
}
/* Nonblocking, Returns 1 on Success, 0 on Failure */ | |
int channel_tryget(channel_t *c, void *space) { | |
int error; | |
if (sem_trywait(&c->fill_semaphore) != 0) // Check for an item | |
return 0; | |
error = pthread_mutex_lock(&c->mutex); | |
if (error != 0) | |
fprintf(stderr, "channel error:%d\n", error); | |
memcpy(space, c->buffer + c->remove_point, c->object_size); | |
c->remove_point = (c->remove_point + 1) % c->buffer_length; | |
error = pthread_mutex_unlock(&c->mutex); | |
if (error != 0) | |
fprintf(stderr, "channel error:%d\n", error); | |
sem_post(&c->empty_semaphore); // Free the space | |
return 1; | |
} | |
/* | |
Items still in buffer are forfeit | |
May cause memory to leak if pointers are involved. | |
*/ | |
void channel_destroy(channel_t *c) { | |
int error; | |
free(c->buffer); | |
sem_destroy(&c->empty_semaphore); | |
sem_destroy(&c->fill_semaphore); | |
error = pthread_mutex_destroy(&c->mutex); | |
if (error != 0) | |
fprintf(stderr, "channel error:%d\n", error); | |
free(c); | |
} |
#ifndef CHANNEL_H
#define CHANNEL_H

#include <stddef.h>

/* Opaque handle to a bounded FIFO channel; the layout is private to channel.c. */
typedef struct channel_s channel_t;

/* Create a channel of buffer_length slots, each object_size bytes wide. */
void channel_init(channel_t **, int buffer_length, size_t object_size);

/* Copy one item into the channel, blocking while it is full. */
void channel_add(channel_t *, void *item);

/* Nonblocking add: returns 1 on success, 0 on failure. */
int channel_tryadd(channel_t *, void *item);

/* Copy the oldest item into space, blocking while the channel is empty. */
void channel_get(channel_t *, void *space);

/* Nonblocking get: returns 1 on success, 0 on failure. */
int channel_tryget(channel_t *, void *space);

/* Free the channel; items still in the buffer are forfeit. */
void channel_destroy(channel_t *);

#endif /* CHANNEL_H */
By using sem_trywait instead of sem_wait, we become non-blocking. This means the main loop of the GUI will end up checking for events and processing them, then attending to the channel. Like so:
/*
 * GUI main loop: poll the channel once, then drain every pending GTK event.
 * Neither step blocks, so the window stays responsive while the REPL works.
 * NOTE: because nothing here ever waits, the loop spins continuously even
 * when both the channel and the event queue are empty.
 */
while (TRUE) {
    if (channel_tryget(c, item)) {
        process_item(item);
    }
    while (gtk_events_pending()) {
        gtk_main_iteration();
    }
}