00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00044 #include <string.h>
00045
00046 #include "sbthread.h"
00047 #include "ckd_alloc.h"
00048 #include "err.h"
00049
00050
00051
00052
00053 #if (defined(_WIN32) || defined(__CYGWIN__)) && !defined(__SYMBIAN32__)
00054 #define _WIN32_WINNT 0x0400
00055 #include <windows.h>
00056
00057 struct sbthread_s {
00058 cmd_ln_t *config;
00059 sbmsgq_t *msgq;
00060 sbthread_main func;
00061 void *arg;
00062 HANDLE th;
00063 DWORD tid;
00064 };
00065
00066 struct sbmsgq_s {
00067
00068 char *data;
00069 size_t depth;
00070 size_t out;
00071 size_t nbytes;
00072
00073
00074 char *msg;
00075 size_t msglen;
00076 CRITICAL_SECTION mtx;
00077 HANDLE evt;
00078 };
00079
00080 struct sbevent_s {
00081 HANDLE evt;
00082 };
00083
00084 struct sbmtx_s {
00085 CRITICAL_SECTION mtx;
00086 };
00087
00088 DWORD WINAPI
00089 sbthread_internal_main(LPVOID arg)
00090 {
00091 sbthread_t *th = (sbthread_t *)arg;
00092 int rv;
00093
00094 rv = (*th->func)(th);
00095 return (DWORD)rv;
00096 }
00097
00098 sbthread_t *
00099 sbthread_start(cmd_ln_t *config, sbthread_main func, void *arg)
00100 {
00101 sbthread_t *th;
00102
00103 th = ckd_calloc(1, sizeof(*th));
00104 th->config = config;
00105 th->func = func;
00106 th->arg = arg;
00107 th->msgq = sbmsgq_init(256);
00108 th->th = CreateThread(NULL, 0, sbthread_internal_main, th, 0, &th->tid);
00109 if (th->th == NULL) {
00110 sbthread_free(th);
00111 return NULL;
00112 }
00113 return th;
00114 }
00115
00116 int
00117 sbthread_wait(sbthread_t *th)
00118 {
00119 DWORD rv, exit;
00120
00121
00122 if (th->th == NULL)
00123 return -1;
00124
00125 rv = WaitForSingleObject(th->th, INFINITE);
00126 if (rv == WAIT_FAILED) {
00127 E_ERROR("Failed to join thread: WAIT_FAILED\n");
00128 return -1;
00129 }
00130 GetExitCodeThread(th->th, &exit);
00131 CloseHandle(th->th);
00132 th->th = NULL;
00133 return (int)exit;
00134 }
00135
00136 static DWORD
00137 cond_timed_wait(HANDLE cond, int sec, int nsec)
00138 {
00139 DWORD rv;
00140 if (sec == -1) {
00141 rv = WaitForSingleObject(cond, INFINITE);
00142 }
00143 else {
00144 DWORD ms;
00145
00146 ms = sec * 1000 + nsec / (1000*1000);
00147 rv = WaitForSingleObject(cond, ms);
00148 }
00149 return rv;
00150 }
00151
00152
00153 sbevent_t *
00154 sbevent_init(void)
00155 {
00156 sbevent_t *evt;
00157
00158 evt = ckd_calloc(1, sizeof(*evt));
00159 evt->evt = CreateEventW(NULL, FALSE, FALSE, NULL);
00160 if (evt->evt == NULL) {
00161 ckd_free(evt);
00162 return NULL;
00163 }
00164 return evt;
00165 }
00166
00167 void
00168 sbevent_free(sbevent_t *evt)
00169 {
00170 CloseHandle(evt->evt);
00171 ckd_free(evt);
00172 }
00173
00174 int
00175 sbevent_signal(sbevent_t *evt)
00176 {
00177 return SetEvent(evt->evt) ? 0 : -1;
00178 }
00179
00180 int
00181 sbevent_wait(sbevent_t *evt, int sec, int nsec)
00182 {
00183 DWORD rv;
00184
00185 rv = cond_timed_wait(evt->evt, sec, nsec);
00186 return rv;
00187 }
00188
00189 sbmtx_t *
00190 sbmtx_init(void)
00191 {
00192 sbmtx_t *mtx;
00193
00194 mtx = ckd_calloc(1, sizeof(*mtx));
00195 InitializeCriticalSection(&mtx->mtx);
00196 return mtx;
00197 }
00198
00199 int
00200 sbmtx_trylock(sbmtx_t *mtx)
00201 {
00202 return TryEnterCriticalSection(&mtx->mtx) ? 0 : -1;
00203 }
00204
00205 int
00206 sbmtx_lock(sbmtx_t *mtx)
00207 {
00208 EnterCriticalSection(&mtx->mtx);
00209 return 0;
00210 }
00211
00212 int
00213 sbmtx_unlock(sbmtx_t *mtx)
00214 {
00215 LeaveCriticalSection(&mtx->mtx);
00216 return 0;
00217 }
00218
00219 void
00220 sbmtx_free(sbmtx_t *mtx)
00221 {
00222 DeleteCriticalSection(&mtx->mtx);
00223 ckd_free(mtx);
00224 }
00225
00226 sbmsgq_t *
00227 sbmsgq_init(size_t depth)
00228 {
00229 sbmsgq_t *msgq;
00230
00231 msgq = ckd_calloc(1, sizeof(*msgq));
00232 msgq->depth = depth;
00233 msgq->evt = CreateEventW(NULL, FALSE, FALSE, NULL);
00234 if (msgq->evt == NULL) {
00235 ckd_free(msgq);
00236 return NULL;
00237 }
00238 InitializeCriticalSection(&msgq->mtx);
00239 msgq->data = ckd_calloc(depth, 1);
00240 msgq->msg = ckd_calloc(depth, 1);
00241 return msgq;
00242 }
00243
00244 void
00245 sbmsgq_free(sbmsgq_t *msgq)
00246 {
00247 CloseHandle(msgq->evt);
00248 ckd_free(msgq->data);
00249 ckd_free(msgq->msg);
00250 ckd_free(msgq);
00251 }
00252
00253 int
00254 sbmsgq_send(sbmsgq_t *q, size_t len, void const *data)
00255 {
00256 char const *cdata = (char const *)data;
00257 size_t in;
00258
00259
00260 if (len + sizeof(len) > q->depth)
00261 return -1;
00262
00263 if (q->nbytes + len + sizeof(len) > q->depth)
00264 WaitForSingleObject(q->evt, INFINITE);
00265
00266
00267
00268 EnterCriticalSection(&q->mtx);
00269 in = (q->out + q->nbytes) % q->depth;
00270
00271 if (in + sizeof(len) > q->depth) {
00272
00273 size_t len1 = q->depth - in;
00274 memcpy(q->data + in, &len, len1);
00275 memcpy(q->data, ((char *)&len) + len1, sizeof(len) - len1);
00276 q->nbytes += sizeof(len);
00277 in = sizeof(len) - len1;
00278 }
00279 else {
00280 memcpy(q->data + in, &len, sizeof(len));
00281 q->nbytes += sizeof(len);
00282 in += sizeof(len);
00283 }
00284
00285
00286 if (in + len > q->depth) {
00287
00288 size_t len1 = q->depth - in;
00289 memcpy(q->data + in, cdata, len1);
00290 q->nbytes += len1;
00291 cdata += len1;
00292 len -= len1;
00293 in = 0;
00294 }
00295 memcpy(q->data + in, cdata, len);
00296 q->nbytes += len;
00297
00298
00299 SetEvent(q->evt);
00300
00301 LeaveCriticalSection(&q->mtx);
00302
00303 return 0;
00304 }
00305
00306 void *
00307 sbmsgq_wait(sbmsgq_t *q, size_t *out_len, int sec, int nsec)
00308 {
00309 char *outptr;
00310 size_t len;
00311
00312
00313 if (q->nbytes == 0) {
00314 if (cond_timed_wait(q->evt, sec, nsec) == WAIT_FAILED)
00315
00316 return NULL;
00317 }
00318
00319 EnterCriticalSection(&q->mtx);
00320
00321 if (q->out + sizeof(q->msglen) > q->depth) {
00322
00323 size_t len1 = q->depth - q->out;
00324 memcpy(&q->msglen, q->data + q->out, len1);
00325 memcpy(((char *)&q->msglen) + len1, q->data,
00326 sizeof(q->msglen) - len1);
00327 q->out = sizeof(q->msglen) - len1;
00328 }
00329 else {
00330 memcpy(&q->msglen, q->data + q->out, sizeof(q->msglen));
00331 q->out += sizeof(q->msglen);
00332 }
00333 q->nbytes -= sizeof(q->msglen);
00334
00335 outptr = q->msg;
00336 len = q->msglen;
00337 if (q->out + q->msglen > q->depth) {
00338
00339 size_t len1 = q->depth - q->out;
00340 memcpy(outptr, q->data + q->out, len1);
00341 outptr += len1;
00342 len -= len1;
00343 q->nbytes -= len1;
00344 q->out = 0;
00345 }
00346 memcpy(outptr, q->data + q->out, len);
00347 q->nbytes -= len;
00348 q->out += len;
00349
00350
00351 SetEvent(q->evt);
00352
00353 LeaveCriticalSection(&q->mtx);
00354 if (out_len)
00355 *out_len = q->msglen;
00356 return q->msg;
00357 }
00358
00359 #else
00360 #include <pthread.h>
00361 #include <sys/time.h>
00362
00363 struct sbthread_s {
00364 cmd_ln_t *config;
00365 sbmsgq_t *msgq;
00366 sbthread_main func;
00367 void *arg;
00368 pthread_t th;
00369 };
00370
/* Byte-oriented message queue for the pthreads implementation. */
struct sbmsgq_s {
    /* Ring buffer holding queued messages (length header + payload). */
    char *data;            /* Storage of `depth` bytes. */
    size_t depth;          /* Capacity of `data` and `msg` in bytes. */
    size_t out;            /* Offset in `data` of the next byte to read. */
    size_t nbytes;         /* Number of bytes currently queued. */

    /* The most recently dequeued message is copied here. */
    char *msg;             /* Buffer returned by sbmsgq_wait(). */
    size_t msglen;         /* Length of the message held in `msg`. */
    pthread_mutex_t mtx;   /* Guards the ring-buffer state. */
    pthread_cond_t cond;   /* Signalled whenever the queue contents change. */
};
00384
/* Event object built from a mutex, a condition variable and a flag. */
struct sbevent_s {
    pthread_mutex_t mtx;   /* Guards `signalled`. */
    pthread_cond_t cond;   /* Signalled by sbevent_signal(). */
    int signalled;         /* Set by sbevent_signal(), cleared by a successful sbevent_wait(). */
};
00390
/* Mutex object: a thin wrapper around a pthread mutex. */
struct sbmtx_s {
    pthread_mutex_t mtx;   /* Initialised by sbmtx_init(), destroyed by sbmtx_free(). */
};
00394
00395 static void *
00396 sbthread_internal_main(void *arg)
00397 {
00398 sbthread_t *th = (sbthread_t *)arg;
00399 int rv;
00400
00401 rv = (*th->func)(th);
00402 return (void *)(long)rv;
00403 }
00404
00405 sbthread_t *
00406 sbthread_start(cmd_ln_t *config, sbthread_main func, void *arg)
00407 {
00408 sbthread_t *th;
00409 int rv;
00410
00411 th = ckd_calloc(1, sizeof(*th));
00412 th->config = config;
00413 th->func = func;
00414 th->arg = arg;
00415 th->msgq = sbmsgq_init(1024);
00416 if ((rv = pthread_create(&th->th, NULL, &sbthread_internal_main, th)) != 0) {
00417 E_ERROR("Failed to create thread: %d\n", rv);
00418 sbthread_free(th);
00419 return NULL;
00420 }
00421 return th;
00422 }
00423
00424 int
00425 sbthread_wait(sbthread_t *th)
00426 {
00427 void *exit;
00428 int rv;
00429
00430
00431 if (th->th == (pthread_t)-1)
00432 return -1;
00433
00434 rv = pthread_join(th->th, &exit);
00435 if (rv != 0) {
00436 E_ERROR("Failed to join thread: %d\n", rv);
00437 return -1;
00438 }
00439 th->th = (pthread_t)-1;
00440 return (int)(long)exit;
00441 }
00442
00443 sbmsgq_t *
00444 sbmsgq_init(size_t depth)
00445 {
00446 sbmsgq_t *msgq;
00447
00448 msgq = ckd_calloc(1, sizeof(*msgq));
00449 msgq->depth = depth;
00450 if (pthread_cond_init(&msgq->cond, NULL) != 0) {
00451 ckd_free(msgq);
00452 return NULL;
00453 }
00454 if (pthread_mutex_init(&msgq->mtx, NULL) != 0) {
00455 pthread_cond_destroy(&msgq->cond);
00456 ckd_free(msgq);
00457 return NULL;
00458 }
00459 msgq->data = ckd_calloc(depth, 1);
00460 msgq->msg = ckd_calloc(depth, 1);
00461 return msgq;
00462 }
00463
00464 void
00465 sbmsgq_free(sbmsgq_t *msgq)
00466 {
00467 pthread_mutex_destroy(&msgq->mtx);
00468 pthread_cond_destroy(&msgq->cond);
00469 ckd_free(msgq->data);
00470 ckd_free(msgq->msg);
00471 ckd_free(msgq);
00472 }
00473
00474 int
00475 sbmsgq_send(sbmsgq_t *q, size_t len, void const *data)
00476 {
00477 size_t in;
00478
00479
00480 if (len + sizeof(len) > q->depth)
00481 return -1;
00482
00483
00484 pthread_mutex_lock(&q->mtx);
00485 if (q->nbytes + len + sizeof(len) > q->depth) {
00486
00487 if (pthread_cond_wait(&q->cond, &q->mtx) != 0) {
00488
00489 pthread_mutex_unlock(&q->mtx);
00490 return -1;
00491 }
00492
00493 }
00494 in = (q->out + q->nbytes) % q->depth;
00495
00496
00497 if (in + sizeof(len) > q->depth) {
00498
00499 size_t len1 = q->depth - in;
00500 memcpy(q->data + in, &len, len1);
00501 memcpy(q->data, ((char *)&len) + len1, sizeof(len) - len1);
00502 q->nbytes += sizeof(len);
00503 in = sizeof(len) - len1;
00504 }
00505 else {
00506 memcpy(q->data + in, &len, sizeof(len));
00507 q->nbytes += sizeof(len);
00508 in += sizeof(len);
00509 }
00510
00511
00512 if (in + len > q->depth) {
00513
00514 size_t len1 = q->depth - in;
00515 memcpy(q->data + in, data, len1);
00516 q->nbytes += len1;
00517 data = (char const *)data + len1;
00518 len -= len1;
00519 in = 0;
00520 }
00521 memcpy(q->data + in, data, len);
00522 q->nbytes += len;
00523
00524
00525 pthread_cond_signal(&q->cond);
00526
00527 pthread_mutex_unlock(&q->mtx);
00528 return 0;
00529 }
00530
/**
 * Wait on a condition variable with an optional relative timeout.
 *
 * The mutex must be held by the caller; it is released while waiting and
 * re-acquired before returning, as with pthread_cond_wait().
 *
 * @param cond  Condition variable to wait on.
 * @param mtx   Mutex associated with `cond`, locked by the caller.
 * @param sec   Seconds to wait, or -1 to wait forever.
 * @param nsec  Additional nanoseconds to wait (expected to be >= 0).
 * @return 0 when woken, otherwise the pthread error code (for example
 *         ETIMEDOUT when the timeout expires).
 */
static int
cond_timed_wait(pthread_cond_t *cond, pthread_mutex_t *mtx, int sec, int nsec)
{
    enum { NSEC_PER_SEC = 1000*1000*1000 };
    struct timeval now;
    struct timespec end;

    if (sec == -1)
        return pthread_cond_wait(cond, mtx);

    /* Turn the relative timeout into the absolute deadline that
     * pthread_cond_timedwait() expects. */
    gettimeofday(&now, NULL);
    /* Fold whole seconds contained in nsec into tv_sec first, so the
     * nanosecond sum below stays under 2e9 and fits a 32-bit long. */
    end.tv_sec = now.tv_sec + sec + nsec / NSEC_PER_SEC;
    end.tv_nsec = now.tv_usec * 1000 + nsec % NSEC_PER_SEC;
    /* tv_nsec must lie in [0, 1e9); the carry belongs to the deadline's
     * seconds field, otherwise the wait ends about a second early. */
    if (end.tv_nsec >= NSEC_PER_SEC) {
        end.tv_sec += 1;
        end.tv_nsec -= NSEC_PER_SEC;
    }
    return pthread_cond_timedwait(cond, mtx, &end);
}
00553
00554 void *
00555 sbmsgq_wait(sbmsgq_t *q, size_t *out_len, int sec, int nsec)
00556 {
00557 char *outptr;
00558 size_t len;
00559
00560
00561 pthread_mutex_lock(&q->mtx);
00562 if (q->nbytes == 0) {
00563
00564 if (cond_timed_wait(&q->cond, &q->mtx, sec, nsec) != 0) {
00565
00566 pthread_mutex_unlock(&q->mtx);
00567 return NULL;
00568 }
00569
00570 }
00571
00572 if (q->out + sizeof(q->msglen) > q->depth) {
00573
00574 size_t len1 = q->depth - q->out;
00575 memcpy(&q->msglen, q->data + q->out, len1);
00576 memcpy(((char *)&q->msglen) + len1, q->data,
00577 sizeof(q->msglen) - len1);
00578 q->out = sizeof(q->msglen) - len1;
00579 }
00580 else {
00581 memcpy(&q->msglen, q->data + q->out, sizeof(q->msglen));
00582 q->out += sizeof(q->msglen);
00583 }
00584 q->nbytes -= sizeof(q->msglen);
00585
00586 outptr = q->msg;
00587 len = q->msglen;
00588 if (q->out + q->msglen > q->depth) {
00589
00590 size_t len1 = q->depth - q->out;
00591 memcpy(outptr, q->data + q->out, len1);
00592 outptr += len1;
00593 len -= len1;
00594 q->nbytes -= len1;
00595 q->out = 0;
00596 }
00597 memcpy(outptr, q->data + q->out, len);
00598 q->nbytes -= len;
00599 q->out += len;
00600
00601
00602 pthread_cond_signal(&q->cond);
00603
00604 pthread_mutex_unlock(&q->mtx);
00605 if (out_len)
00606 *out_len = q->msglen;
00607 return q->msg;
00608 }
00609
00610 sbevent_t *
00611 sbevent_init(void)
00612 {
00613 sbevent_t *evt;
00614 int rv;
00615
00616 evt = ckd_calloc(1, sizeof(*evt));
00617 if ((rv = pthread_mutex_init(&evt->mtx, NULL)) != 0) {
00618 E_ERROR("Failed to initialize mutex: %d\n", rv);
00619 ckd_free(evt);
00620 return NULL;
00621 }
00622 if ((rv = pthread_cond_init(&evt->cond, NULL)) != 0) {
00623 E_ERROR_SYSTEM("Failed to initialize mutex: %d\n", rv);
00624 pthread_mutex_destroy(&evt->mtx);
00625 ckd_free(evt);
00626 return NULL;
00627 }
00628 return evt;
00629 }
00630
00631 void
00632 sbevent_free(sbevent_t *evt)
00633 {
00634 pthread_mutex_destroy(&evt->mtx);
00635 pthread_cond_destroy(&evt->cond);
00636 ckd_free(evt);
00637 }
00638
00639 int
00640 sbevent_signal(sbevent_t *evt)
00641 {
00642 int rv;
00643
00644 pthread_mutex_lock(&evt->mtx);
00645 evt->signalled = TRUE;
00646 rv = pthread_cond_signal(&evt->cond);
00647 pthread_mutex_unlock(&evt->mtx);
00648 return rv;
00649 }
00650
00651 int
00652 sbevent_wait(sbevent_t *evt, int sec, int nsec)
00653 {
00654 int rv = 0;
00655
00656
00657 pthread_mutex_lock(&evt->mtx);
00658
00659 if (!evt->signalled)
00660 rv = cond_timed_wait(&evt->cond, &evt->mtx, sec, nsec);
00661
00662 if (rv == 0)
00663 evt->signalled = FALSE;
00664
00665 pthread_mutex_unlock(&evt->mtx);
00666
00667 return rv;
00668 }
00669
00670 sbmtx_t *
00671 sbmtx_init(void)
00672 {
00673 sbmtx_t *mtx;
00674
00675 mtx = ckd_calloc(1, sizeof(*mtx));
00676 if (pthread_mutex_init(&mtx->mtx, NULL) != 0) {
00677 ckd_free(mtx);
00678 return NULL;
00679 }
00680 return mtx;
00681 }
00682
00683 int
00684 sbmtx_trylock(sbmtx_t *mtx)
00685 {
00686 return pthread_mutex_trylock(&mtx->mtx);
00687 }
00688
00689 int
00690 sbmtx_lock(sbmtx_t *mtx)
00691 {
00692 return pthread_mutex_lock(&mtx->mtx);
00693 }
00694
00695 int
00696 sbmtx_unlock(sbmtx_t *mtx)
00697 {
00698 return pthread_mutex_unlock(&mtx->mtx);
00699 }
00700
00701 void
00702 sbmtx_free(sbmtx_t *mtx)
00703 {
00704 pthread_mutex_destroy(&mtx->mtx);
00705 ckd_free(mtx);
00706 }
00707 #endif
00708
00709 cmd_ln_t *
00710 sbthread_config(sbthread_t *th)
00711 {
00712 return th->config;
00713 }
00714
00715 void *
00716 sbthread_arg(sbthread_t *th)
00717 {
00718 return th->arg;
00719 }
00720
00721 sbmsgq_t *
00722 sbthread_msgq(sbthread_t *th)
00723 {
00724 return th->msgq;
00725 }
00726
00727 int
00728 sbthread_send(sbthread_t *th, size_t len, void const *data)
00729 {
00730 return sbmsgq_send(th->msgq, len, data);
00731 }
00732
00733 void
00734 sbthread_free(sbthread_t *th)
00735 {
00736 sbthread_wait(th);
00737 sbmsgq_free(th->msgq);
00738 ckd_free(th);
00739 }