/* * pipe-sem by Davide Libenzi ( Pipe based semaphore implementation ) * Copyright (C) 1998 Davide Libenzi * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA * * Davide Libenzi * */ #include #include #include #include #include #include #include "pipe-sem.h" #define PSEM_FD_ERR (-1) #define PSEM_POLL_ERR (-2) #define PSEM_INTR_ERR (-3) static unsigned long long psem_mstime(void); static int psem_setnoblock(int fd); static int psem_down_poll(struct pipe_sem *ps, int timeout); static int psem_up_poll(struct pipe_sem *ps, int timeout); static unsigned long long psem_mstime(void) { struct timeval tm; gettimeofday(&tm, NULL); return (unsigned long long) tm.tv_sec * 1000ULL + (unsigned long long) tm.tv_usec / 1000ULL; } static int psem_setnoblock(int fd) { int flags; if ((flags = fcntl(fd, F_GETFL)) == -1) return -1; return fcntl(fd, F_SETFL, flags | O_NONBLOCK); } static int psem_down_poll(struct pipe_sem *ps, int timeout) { struct pollfd pfd; pfd.fd = ps->fds[0]; pfd.events = POLLIN; pfd.revents = 0; if (poll(&pfd, 1, timeout) == -1) return errno == EINTR ? PSEM_INTR_ERR: PSEM_POLL_ERR; if (pfd.revents & POLLERR) return PSEM_FD_ERR; return (pfd.revents & POLLIN) ? 1: 0; } static int psem_up_poll(struct pipe_sem *ps, int timeout) { struct pollfd pfd; pfd.fd = ps->fds[1]; pfd.events = POLLOUT; pfd.revents = 0; if (poll(&pfd, 1, timeout) == -1) return errno == EINTR ? PSEM_INTR_ERR: PSEM_POLL_ERR; if (pfd.revents & POLLERR) return PSEM_FD_ERR; return (pfd.revents & POLLOUT) ? 1: 0; } int psem_try_down(struct pipe_sem *ps) { unsigned char b; return read(ps->fds[0], &b, 1) == 1 ? 0: -1; } int psem_down_timeo(struct pipe_sem *ps, int timeout) { int res; unsigned long long emst, cmst; unsigned char b; if (read(ps->fds[0], &b, 1) == 1) return 0; emst = psem_mstime() + timeout; for (;;) { if ((res = psem_down_poll(ps, timeout)) > 0 && read(ps->fds[0], &b, 1) == 1) return 0; if (!res) break; if (res < 0 && res != PSEM_INTR_ERR) break; if (timeout > 0) { cmst = psem_mstime(); if (cmst >= emst) break; timeout = (int) (emst - cmst); } } return -1; } int psem_down(struct pipe_sem *ps) { int res = 1; unsigned char b; for (;;) { if (res > 0 && read(ps->fds[0], &b, 1) == 1) break; if ((res = psem_down_poll(ps, -1)) < 0 && res != PSEM_INTR_ERR) return -1; } return 0; } int psem_up(struct pipe_sem *ps) { int res = 1; for (;;) { if (res > 0 && write(ps->fds[1], "w", 1) == 1) break; if ((res = psem_up_poll(ps, -1)) < 0 && res != PSEM_INTR_ERR) return -1; } return 0; } int psem_down_fd(struct pipe_sem *ps) { return ps->fds[0]; } int psem_init(struct pipe_sem *ps, int icnt) { if (pipe(ps->fds) < 0) return -1; for (; icnt > 0; icnt--) write(ps->fds[1], "w", 1); if (psem_setnoblock(ps->fds[0]) == -1 || psem_setnoblock(ps->fds[1]) == -1) { close(ps->fds[0]); close(ps->fds[1]); return -1; } return 0; } void psem_cleanup(struct pipe_sem *ps) { close(ps->fds[0]); close(ps->fds[1]); } #if defined(PIPE_TEST) /* * gcc -o pipe-sem -DPIPE_TEST pipe-sem.c -lpthread */ #include #include #include static int num_threads, num_iters, num_running, debug; static volatile int counter; static struct pipe_sem ps; static pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER; static unsigned long long smst, emst; static void *thproc(void *data) { int i, thn = (int) data, cnt; pthread_mutex_lock(&mtx); num_running++; if (num_running == num_threads) smst = psem_mstime(); pthread_mutex_unlock(&mtx); for (i = 0; i < num_iters; i++) { sched_yield(); if (psem_down(&ps) == -1) { fprintf(stderr, "psem_down() error: thr=%d\n", thn); exit(1); } if (debug) fprintf(stdout, "got sem: thr=%d\n", thn); cnt = ++counter; sched_yield(); if (cnt != counter) { fprintf(stderr, "aiee! consistency error: thr=%d\n", thn); exit(2); } if (debug) fprintf(stdout, "giving sem: thr=%d\n", thn); if (psem_up(&ps) == -1) { fprintf(stderr, "psem_up() error: thr=%d\n", thn); exit(1); } } pthread_mutex_lock(&mtx); num_running--; if (num_running == 0) emst = psem_mstime(); pthread_mutex_unlock(&mtx); return NULL; } int main(int ac, char **av) { int i; pthread_attr_t thattr; pthread_t thid; num_threads = 8; num_iters = 100; for (i = 1; i < ac; i++) { if (!strcmp(av[i], "-n") || !strcmp(av[i], "--num-threads")) { if (++i < ac) num_threads = atoi(av[i]); } else if (!strcmp(av[i], "-i") || !strcmp(av[i], "--num-iters")) { if (++i < ac) num_iters = atoi(av[i]); } else if (!strcmp(av[i], "-d") || !strcmp(av[i], "--debug")) { if (++i < ac) debug = atoi(av[i]); } } psem_init(&ps, 1); pthread_mutex_lock(&mtx); pthread_attr_init(&thattr); pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_DETACHED); for (i = 0; i < num_threads; i++) { if (pthread_create(&thid, &thattr, thproc, (void *) i) != 0) { perror("pthread_create()"); return 5; } } pthread_attr_destroy(&thattr); pthread_mutex_unlock(&mtx); do { sleep(1); } while (num_running); fprintf(stdout, "time = %llu ms counter = %d rate = %llu du/ms\n", emst - smst, counter, counter / (emst - smst + 1)); psem_cleanup(&ps); return 0; } #endif