00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011 #ifndef __SYNC_H__
00012 #define __SYNC_H__
00013
00014
00015 BEGIN_GIGABASE_NAMESPACE
00016
00017 #if defined(_WIN32)
00018 class GIGABASE_DLL_ENTRY dbMutex {
00019 CRITICAL_SECTION cs;
00020 bool initialized;
00021 public:
00022 dbMutex() {
00023 InitializeCriticalSection(&cs);
00024 initialized = true;
00025 }
00026 ~dbMutex() {
00027 DeleteCriticalSection(&cs);
00028 initialized = false;
00029 }
00030 bool isInitialized() {
00031 return initialized;
00032 }
00033 void lock() {
00034 if (initialized) {
00035 EnterCriticalSection(&cs);
00036 }
00037 }
00038 void unlock() {
00039 if (initialized) {
00040 LeaveCriticalSection(&cs);
00041 }
00042 }
00043 };
00044
00045 #define thread_proc WINAPI
00046
00047 class GIGABASE_DLL_ENTRY dbThread {
00048 HANDLE h;
00049 public:
00050 enum ThreadPriority {
00051 THR_PRI_LOW,
00052 THR_PRI_HIGH
00053 };
00054
00055 void setPriority(ThreadPriority pri) {
00056 SetThreadPriority(h, pri == THR_PRI_LOW ? THREAD_PRIORITY_IDLE : THREAD_PRIORITY_HIGHEST);
00057 }
00058
00059 typedef void (thread_proc* thread_proc_t)(void*);
00060
00061 void create(thread_proc_t f, void* arg) {
00062 DWORD threadid;
00063 h = CreateThread(NULL, 0, LPTHREAD_START_ROUTINE(f), arg, 0, &threadid);
00064 }
00065 void join() {
00066 WaitForSingleObject(h, INFINITE);
00067 CloseHandle(h);
00068 h = NULL;
00069 }
00070 void detach() {
00071 if (h != NULL) {
00072 CloseHandle(h);
00073 h = NULL;
00074 }
00075 }
00076 dbThread() {
00077 h = NULL;
00078 }
00079 ~dbThread() {
00080 if (h != NULL) {
00081 CloseHandle(h);
00082 }
00083 }
00084 static int numberOfProcessors() {
00085 SYSTEM_INFO sysinfo;
00086 GetSystemInfo(&sysinfo);
00087 return sysinfo.dwNumberOfProcessors;
00088 }
00089 };
00090
00091 const int dbMaxSemValue = 1000000;
00092
00093
00094 class GIGABASE_DLL_ENTRY dbSemaphore {
00095 HANDLE s;
00096 public:
00097 void wait(dbMutex& mutex, time_t timeout = INFINITE) {
00098 mutex.unlock();
00099 int rc = WaitForSingleObject(s, (DWORD)(timeout == (time_t)INFINITE ? timeout : timeout*1000));
00100 assert(rc == WAIT_OBJECT_0 || rc == WAIT_TIMEOUT);
00101 mutex.lock();
00102 }
00103 void signal(unsigned inc = 1) {
00104 if (inc != 0) {
00105 ReleaseSemaphore(s, inc, NULL);
00106 }
00107 }
00108 void open(unsigned initValue = 0) {
00109 s = CreateSemaphore(NULL, initValue, dbMaxSemValue, NULL);
00110 assert(s != NULL);
00111 }
00112 void close() {
00113 CloseHandle(s);
00114 }
00115 dbSemaphore() {
00116 s = NULL;
00117 }
00118 };
00119
00120 class GIGABASE_DLL_ENTRY dbEvent {
00121 HANDLE e;
00122 int nWaitingThreads;
00123 int nPulses;
00124 public:
00125 void wait(dbMutex& mutex, time_t timeout = INFINITE) {
00126 nWaitingThreads += 1;
00127 mutex.unlock();
00128 int rc = WaitForSingleObject(e, (DWORD)(timeout == (time_t)INFINITE ? timeout : timeout*1000));
00129 assert(rc == WAIT_OBJECT_0 || rc == WAIT_TIMEOUT);
00130 mutex.lock();
00131 nWaitingThreads -= 1;
00132 if (nPulses > 0) {
00133 nPulses -= 1;
00134 ResetEvent(e);
00135 }
00136 }
00137 void signal() {
00138 SetEvent(e);
00139 }
00140 void reset() {
00141 ResetEvent(e);
00142 }
00143 void pulse() {
00144 if (nWaitingThreads > 0) {
00145 nPulses += 1;
00146 SetEvent(e);
00147 }
00148 }
00149 void open(bool initValue = false) {
00150 e = CreateEvent(NULL, true, initValue, NULL);
00151 nWaitingThreads = 0;
00152 nPulses = 0;
00153 }
00154 void close() {
00155 CloseHandle(e);
00156 }
00157 dbEvent() {
00158 e = NULL;
00159 }
00160 };
00161
00162 template<class T>
00163 class dbThreadContext {
00164 int index;
00165 public:
00166 T* get() {
00167 return (T*)TlsGetValue(index);
00168 }
00169 void set(T* value) {
00170 TlsSetValue(index, value);
00171 }
00172 dbThreadContext() {
00173 index = TlsAlloc();
00174 assert(index != (int)TLS_OUT_OF_INDEXES);
00175 }
00176 ~dbThreadContext() {
00177 TlsFree(index);
00178 }
00179 };
00180
00181 #else // Unix
00182
00183 #define thread_proc
00184
00185 #ifndef NO_PTHREADS
00186
00187 END_GIGABASE_NAMESPACE
00188
00189 #include <unistd.h>
00190 #include <sys/time.h>
00191 #include <pthread.h>
00192
00193 BEGIN_GIGABASE_NAMESPACE
00194
// Mutex wrapper around a pthread_mutex_t created with default attributes.
// dbEvent and dbSemaphore are friends because they pass the underlying
// pthread mutex to pthread_cond_wait()/pthread_cond_timedwait().
class dbMutex {
    friend class dbEvent;
    friend class dbSemaphore;

    pthread_mutex_t cs;
    bool            initialized;

  public:
    dbMutex() {
        pthread_mutex_init(&cs, NULL);
        initialized = true;
    }

    ~dbMutex() {
        pthread_mutex_destroy(&cs);
        initialized = false;
    }

    // Reports the flag set by the constructor and cleared by the destructor.
    bool isInitialized() {
        return initialized;
    }

    // Acquire the mutex; ignored when the mutex is not initialized.
    void lock() {
        if (!initialized) {
            return;
        }
        pthread_mutex_lock(&cs);
    }

    // Release the mutex; ignored when the mutex is not initialized.
    void unlock() {
        if (!initialized) {
            return;
        }
        pthread_mutex_unlock(&cs);
    }
};
00223
00224
00225 const size_t dbThreadStackSize = 1024*1024;
00226
00227 class dbThread {
00228 pthread_t thread;
00229 public:
00230 typedef void (thread_proc* thread_proc_t)(void*);
00231
00232 void create(thread_proc_t f, void* arg) {
00233 pthread_attr_t attr;
00234 pthread_attr_init(&attr);
00235 #if !defined(__linux__)
00236 pthread_attr_setstacksize(&attr, dbThreadStackSize);
00237 #endif
00238 #if defined(_AIX41)
00239
00240 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_UNDETACHED);
00241 #endif
00242 pthread_create(&thread, &attr, (void*(*)(void*))f, arg);
00243 pthread_attr_destroy(&attr);
00244 }
00245
00246 enum ThreadPriority {
00247 THR_PRI_LOW,
00248 THR_PRI_HIGH
00249 };
00250 #if defined(PRI_OTHER_MIN) && defined(PRI_OTHER_MAX)
00251 void setPriority(ThreadPriority pri) {
00252 struct sched_param sp;
00253 sp.sched_priority = pri == THR_PRI_LOW ? PRI_OTHER_MIN : PRI_OTHER_MAX;
00254 pthread_setschedparam(thread, SCHED_OTHER, &sp);
00255 }
00256 #else
00257 void setPriority(ThreadPriority) {}
00258 #endif
00259
00260
00261
00262 void join() {
00263 void* result;
00264 pthread_join(thread, &result);
00265 }
00266 void detach() {
00267 pthread_detach(thread);
00268 }
00269 static int numberOfProcessors();
00270 };
00271
00272 #if defined(_SC_NPROCESSORS_ONLN)
00273 inline int dbThread::numberOfProcessors() {
00274 return sysconf(_SC_NPROCESSORS_ONLN);
00275 }
00276 #elif defined(__linux__)
00277 END_GIGABASE_NAMESPACE
00278 #include <linux/smp.h>
00279 BEGIN_GIGABASE_NAMESPACE
00280 inline int dbThread::numberOfProcessors() { return smp_num_cpus; }
00281 #elif defined(__FreeBSD__) || defined(__bsdi__) || defined(__OpenBSD__)
00282 #if defined(__bsdi__) || defined(__OpenBSD__)
00283 END_GIGABASE_NAMESPACE
00284 #include <sys/param.h>
00285 BEGIN_GIGABASE_NAMESPACE
00286 #endif
00287 END_GIGABASE_NAMESPACE
00288 #include <sys/sysctl.h>
00289 BEGIN_GIGABASE_NAMESPACE
00290 inline int dbThread::numberOfProcessors() {
00291 int mib[2],ncpus=0;
00292 size_t len=sizeof(ncpus);
00293 mib[0]= CTL_HW;
00294 mib[1]= HW_NCPU;
00295 sysctl(mib,2,&ncpus,&len,NULL,0);
00296 return ncpus;
00297 }
00298 #else
00299 #warning Do not know how to detect number of processors: assuming 1
00300 inline int dbThread::numberOfProcessors() { return 1; }
00301 #endif
00302
00303 class dbEvent {
00304 pthread_cond_t cond;
00305 int signaled;
00306 long n_signals;
00307
00308 public:
00309 void wait(dbMutex& mutex) {
00310 long before_n_signals = n_signals;
00311 while (!signaled && n_signals == before_n_signals) {
00312 pthread_cond_wait(&cond, &mutex.cs);
00313 }
00314 }
00315
00316 bool wait(dbMutex& mutex, time_t timeout) {
00317 if (!signaled) {
00318 struct timespec abs_ts;
00319 #ifdef PTHREAD_GET_EXPIRATION_NP
00320 struct timespec rel_ts;
00321 rel_ts.tv_sec = timeout;
00322 rel_ts.tv_nsec = 0;
00323 pthread_get_expiration_np(&rel_ts, &abs_ts);
00324 #else
00325 struct timeval cur_tv;
00326 gettimeofday(&cur_tv, NULL);
00327 abs_ts.tv_sec = cur_tv.tv_sec + timeout;
00328 abs_ts.tv_nsec = cur_tv.tv_usec*1000;
00329 #endif
00330 long before_n_signals = n_signals;
00331 do {
00332 int rc = pthread_cond_timedwait(&cond, &mutex.cs, &abs_ts);
00333 if (rc != 0) {
00334 return false;
00335 }
00336 } while (!signaled && n_signals == before_n_signals);
00337 }
00338 return true;
00339 }
00340
00341 void signal() {
00342 signaled = true;
00343 n_signals += 1;
00344 pthread_cond_broadcast(&cond);
00345 }
00346
00347 void pulse() {
00348 n_signals += 1;
00349 pthread_cond_broadcast(&cond);
00350 }
00351
00352 void reset() {
00353 signaled = false;
00354 }
00355 void open(bool initValue = false) {
00356 signaled = initValue;
00357 n_signals = 0;
00358 pthread_cond_init(&cond, NULL);
00359 }
00360 void close() {
00361 pthread_cond_destroy(&cond);
00362 }
00363 };
00364
00365 class dbSemaphore {
00366 pthread_cond_t cond;
00367 int count;
00368 public:
00369 void wait(dbMutex& mutex) {
00370 while (count == 0) {
00371 pthread_cond_wait(&cond, &mutex.cs);
00372 }
00373 count -= 1;
00374 }
00375
00376 bool wait(dbMutex& mutex, time_t timeout) {
00377 if (count == 0) {
00378 struct timespec abs_ts;
00379 #ifdef PTHREAD_GET_EXPIRATION_NP
00380 struct timespec rel_ts;
00381 rel_ts.tv_sec = timeout;
00382 rel_ts.tv_nsec = 0;
00383 pthread_get_expiration_np(&rel_ts, &abs_ts);
00384 #else
00385 struct timeval cur_tv;
00386 gettimeofday(&cur_tv, NULL);
00387 abs_ts.tv_sec = cur_tv.tv_sec + timeout;
00388 abs_ts.tv_nsec = cur_tv.tv_usec*1000;
00389 #endif
00390 do {
00391 int rc = pthread_cond_timedwait(&cond, &mutex.cs, &abs_ts);
00392 if (rc != 0) {
00393 return false;
00394 }
00395 } while (count == 0);
00396 }
00397 count -= 1;
00398 }
00399
00400 void signal(unsigned inc = 1) {
00401 count += inc;
00402 if (inc > 1) {
00403 pthread_cond_broadcast(&cond);
00404 } else if (inc == 1) {
00405 pthread_cond_signal(&cond);
00406 }
00407 }
00408 void open(unsigned initValue = 0) {
00409 pthread_cond_init(&cond, NULL);
00410 count = initValue;
00411 }
00412 void close() {
00413 pthread_cond_destroy(&cond);
00414 }
00415 };
00416
// Per-thread pointer slot built on a pthread key.
// The key is created without a destructor callback by the constructor and
// deleted by the destructor.
template<class T>
class dbThreadContext {
    pthread_key_t key;

  public:
    dbThreadContext() {
        pthread_key_create(&key, NULL);
    }

    ~dbThreadContext() {
        pthread_key_delete(key);
    }

    // Store `value` for the calling thread.
    void set(T* value) {
        pthread_setspecific(key, value);
    }

    // Fetch the pointer stored for the calling thread.
    T* get() {
        void* stored = pthread_getspecific(key);
        return (T*)stored;
    }
};
00434
00435 #else
00436
// Mutex stand-in for builds without thread support.
// lock() and unlock() do nothing; only the `initialized` flag is tracked.
class dbMutex {
    bool initialized;

  public:
    dbMutex() : initialized(true) {}

    ~dbMutex() {
        initialized = false;
    }

    // Reports the flag set by the constructor and cleared by the destructor.
    bool isInitialized() {
        return initialized;
    }

    void lock() {}

    void unlock() {}
};
00456
00457 class dbThread {
00458 public:
00459 typedef void (thread_proc* thread_proc_t)(void*);
00460 void create(thread_proc_t f, void* arg) { f(arg); }
00461 void join() {}
00462 void detach() {}
00463
00464 enum ThreadPriority {
00465 THR_PRI_LOW,
00466 THR_PRI_HIGH
00467 };
00468 void setPriority(ThreadPriority) {}
00469
00470 static int numberOfProcessors() { return 1; }
00471 };
00472
00473 class dbSemaphore {
00474 int count;
00475 public:
00476 void wait(dbMutex&, time_t=0) {
00477 assert (count > 0);
00478 count -= 1;
00479 }
00480 void signal(unsigned inc = 1) {
00481 count += inc;
00482 }
00483 void open(unsigned initValue = 0) {
00484 count = initValue;
00485 }
00486 void close() {}
00487 };
00488
00489 class dbEvent {
00490 bool signaled;
00491 public:
00492 void wait(dbMutex&, time_t=0) {
00493 assert(signaled);
00494 }
00495 void signal() {
00496 signaled = true;
00497 }
00498 void reset() {
00499 signaled = false;
00500 }
00501 void open(bool initValue = false) {
00502 signaled = initValue;
00503 }
00504 void pulse() {}
00505 void close() {}
00506 };
00507
// Thread-context stand-in for builds without thread support: with a single
// thread, the "per-thread" pointer is an ordinary member.
template<class T>
class dbThreadContext {
    T* value;

  public:
    dbThreadContext() : value(NULL) {}

    // Remember `value`.
    void set(T* value) {
        this->value = value;
    }

    // Return the pointer last passed to set(), or NULL if there was none.
    T* get() {
        return value;
    }
};
00520
00521
00522 #endif
00523
00524 #endif
00525
00526 class GIGABASE_DLL_ENTRY dbCriticalSection {
00527 private:
00528 dbMutex& mutex;
00529 public:
00530 dbCriticalSection(dbMutex& guard) : mutex(guard) {
00531 mutex.lock();
00532 }
00533 ~dbCriticalSection() {
00534 mutex.unlock();
00535 }
00536 };
00537
// Number of elements held in the in-object array of dbSmallBuffer.
#define SMALL_BUF_SIZE 512

// Scratch buffer of T that uses an in-object array for requests of up to
// SMALL_BUF_SIZE elements and a heap array for larger ones.
// NOTE(review): the class owns a raw pointer but declares no copy
// constructor or assignment operator; copying an instance looks unsafe.
template<class T>
class dbSmallBuffer {
  protected:
    T      smallBuf[SMALL_BUF_SIZE];   // in-object storage
    T*     buf;                        // smallBuf, or a heap array owned by this object
    size_t used;                       // size most recently accepted by the ctor or put()

  public:
    // Create a buffer able to hold `size` elements.
    dbSmallBuffer(size_t size) {
        if (size > SMALL_BUF_SIZE) {
            buf = new T[size];
        } else {
            buf = smallBuf;
        }
        used = size;
    }

    // Create a buffer that starts out on the in-object array.
    dbSmallBuffer() {
        used = 0;
        buf = smallBuf;
    }

    // Make room for `size` elements. A new heap array is allocated only when
    // `size` exceeds both SMALL_BUF_SIZE and the recorded size; previous
    // contents are not copied over.
    void put(size_t size) {
        if (size > SMALL_BUF_SIZE && size > used) {
            // Allocate before releasing the old array: if `new` throws, buf
            // must still point at valid storage, otherwise the destructor
            // would delete the old array a second time.
            T* replacement = new T[size];
            if (buf != smallBuf) {
                delete[] buf;
            }
            buf = replacement;
            used = size;
        }
    }

    // Access to the current storage.
    operator T*() { return buf; }
    T* base() { return buf; }

    // Free the heap array, if one is in use.
    ~dbSmallBuffer() {
        if (buf != smallBuf) {
            delete[] buf;
        }
    }
};
00581
00582 class dbThreadPool;
00583
00584 class GIGABASE_DLL_ENTRY dbPooledThread {
00585 private:
00586 friend class dbThreadPool;
00587
00588 dbThread thread;
00589 dbThreadPool* pool;
00590 dbPooledThread* next;
00591 dbThread::thread_proc_t f;
00592 void* arg;
00593 bool running;
00594 dbSemaphore startSem;
00595 dbSemaphore readySem;
00596
00597 static void thread_proc pooledThreadFunc(void* arg);
00598
00599 void run();
00600 void stop();
00601
00602 dbPooledThread(dbThreadPool* threadPool);
00603 ~dbPooledThread();
00604 };
00605
00606 class GIGABASE_DLL_ENTRY dbThreadPool {
00607 friend class dbPooledThread;
00608 dbPooledThread* freeThreads;
00609 dbMutex mutex;
00610
00611 public:
00612 dbPooledThread* create(dbThread::thread_proc_t f, void* arg);
00613 void join(dbPooledThread* thr);
00614 dbThreadPool();
00615 ~dbThreadPool();
00616 };
00617
00618 END_GIGABASE_NAMESPACE
00619
00620 #endif