AGENT++  4.0.3
threads.h
Go to the documentation of this file.
1 /*_############################################################################
2  _##
3  _## AGENT++ 4.0 - threads.h
4  _##
5  _## Copyright (C) 2000-2013 Frank Fock and Jochen Katz (agentpp.com)
6  _##
7  _## Licensed under the Apache License, Version 2.0 (the "License");
8  _## you may not use this file except in compliance with the License.
9  _## You may obtain a copy of the License at
10  _##
11  _## http://www.apache.org/licenses/LICENSE-2.0
12  _##
13  _## Unless required by applicable law or agreed to in writing, software
14  _## distributed under the License is distributed on an "AS IS" BASIS,
15  _## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  _## See the License for the specific language governing permissions and
17  _## limitations under the License.
18  _##
19  _##########################################################################*/
20 
21 
22 #ifndef multi_h_
23 #define multi_h_
24 
25 #include <agent_pp/agent++.h>
26 
27 #ifdef _THREADS
28 #ifdef _WIN32THREADS
29 #include <winbase.h>
30 #else
31 #include <pthread.h>
32 #endif
33 #endif
34 
35 #include <time.h>
36 #include <sys/types.h>
37 
38 #include <agent_pp/List.h>
39 
40 
41 #define MULTI_THREADED TRUE
42 #define SINGLE_THREADED FALSE
43 #define AGENTPP_DEFAULT_STACKSIZE 0x10000
44 
45 #ifdef AGENTPP_NAMESPACE
46 namespace Agentpp {
47 #endif
48 
49 class MibEntry;
50 class Request;
51 class Mib;
52 
53 
54 typedef enum { STANDARD_CB, SHADOW_CB } cb_type;
55 
56 typedef void (Mib::*mib_method_t)(Request*);
57 
58 
59 // The MibMethodCall class stores function pointers of the class Mib
60 
62 
63 public:
65  void (Mib::*m) (Request *),
66  Request* r):
67 
68  called_class(c), method(m), req(r) {
69  }
70 
71  MibMethodCall(const MibMethodCall& other) {
72 
73  called_class = other.called_class;
74  method = other.method;
75  req = other.req;
76  }
77 
81 };
82 
83 
84 void* method_routine_caller(void *);
85 
86 
87 
88 #define TS_SYNCHRONIZED(x) { ThreadSynchronize _ts_synchronize(*this); x }
89 
90 #ifdef _THREADS
91 #ifndef WIN32
92 #ifndef POSIX_THREADS
93 #error "This SYSTEM does not support threads. Undefine _THREADS in agent++.h"
94 #endif
95 #endif
96 
97 
122 
123  public:
124  Runnable() { }
125  virtual ~Runnable() { }
126 
132  virtual void run() = 0;
133  };
134 
143 public:
144  enum TryLockResult { LOCKED = 1, BUSY = 0, OWNED = -1 };
145 
146  Synchronized();
147  ~Synchronized();
148 
154  void wait();
155 
167  bool wait(unsigned long timeout);
168 
173  void notify();
178  void notify_all();
179 
188  bool lock();
189 
200  TryLockResult trylock();
201 
210  bool unlock();
211 
212 
213  private:
214 #ifndef _NO_LOGGING
215  static int next_id;
216  int id;
217 #endif
218 #ifdef POSIX_THREADS
219  int cond_timed_wait(const timespec*);
220  pthread_cond_t cond;
221  pthread_mutex_t monitor;
222 #else
223 #ifdef WIN32
224  char numNotifies;
225  HANDLE semEvent;
226  HANDLE semMutex;
227 #endif
228 #endif
229  bool isLocked;
230  };
231 
243  public:
251  Lock (Synchronized &s):sync(s) { sync.lock(); }
252 
257  ~Lock() { sync.unlock(); }
258 
259 
269  void wait (long timeout)
270  { if (timeout<0) sync.wait(); else sync.wait(timeout);}
271 
276  void notify () { sync.notify (); }
277 
278  private:
279  Synchronized &sync;
280  };
281 
283 
300  class AGENTPP_DECL Thread : public Synchronized, public Runnable {
301 
302  enum ThreadStatus { IDLE, RUNNING, FINISHED };
303 
304  friend class Synchronized;
305 #ifdef WIN32
306  friend DWORD thread_starter(LPDWORD lpdwParam);
307 #else
308  friend void* thread_starter(void*);
309 #endif
310  public:
314  Thread();
315 
322  Thread(Runnable &r);
323 
328  virtual ~Thread();
329 
337  static void sleep(long millis);
338 
349  static void sleep(long millis, int nanos);
350 
358  virtual void run();
359 
368  Runnable& get_runnable();
369 
373  void join();
374 
379  void start();
380 
388  void set_stack_size(long s) { stackSize = s; }
389 
396  bool is_alive() { return (status == RUNNING); }
397 
402  Thread* clone() { return new Thread(get_runnable()); }
403 
404  private:
405  Runnable* runnable;
406  ThreadStatus status;
407  long stackSize;
408 #ifdef POSIX_THREADS
409  pthread_t tid;
410 #else
411 #if WIN32
412  HANDLE threadHandle;
413  DWORD tid;
414  HANDLE threadEndEvent;
415 #endif
416 #endif
417  static ThreadList threadList;
418  static void nsleep(int secs, long nanos);
419  };
420 
421 
429 #if !defined (AGENTPP_DECL_TEMPL_ARRAY_THREAD)
430 #define AGENTPP_DECL_TEMPL_ARRAY_THREAD
432 #endif
433 
435  public:
437  ~ThreadList() { list.clear(); /* do no delete threads */ }
438 
439  void add(Thread* t) { lock(); list.add(t); unlock(); }
440  void remove(Thread* t) { lock(); list.remove(t); unlock(); }
441  int size() const { return list.size(); }
442  Thread* last() { lock(); Thread* t = list.last(); unlock(); return t; }
443 
444  protected:
446  };
447 
448 
449  class TaskManager;
450 
451 #ifdef AGENTPP_USE_THREAD_POOL
452 
453 #if !defined (AGENTPP_DECL_TEMPL_ARRAY_TASKMANAGER)
454 #define AGENTPP_DECL_TEMPL_ARRAY_TASKMANAGER
456 #endif
457 
465  class AGENTPP_DECL ThreadPool : public Synchronized {
466 
467  protected:
468  Array<TaskManager> taskList;
469  int stackSize;
470  public:
478  ThreadPool(int size = 4);
479 
480 
491  ThreadPool(int size, int stackSize);
492 
496  virtual ~ThreadPool();
497 
502  virtual void execute(Runnable*);
503 
511  bool is_idle();
512 
521  bool is_busy();
522 
528  unsigned int size() { return taskList.size(); }
529 
536  int stack_size() { return stackSize; }
537 
541  virtual void idle_notification() { lock(); notify(); unlock(); }
542 
548  void terminate();
549  };
550 
551 
552 
553 #if !defined (AGENTPP_DECL_TEMPL_LIST_RUNNABLE)
554 #define AGENTPP_DECL_TEMPL_LIST_RUNNABLE
556 #endif
557 
571  class AGENTPP_DECL QueuedThreadPool : public ThreadPool, public Thread {
572 
573  List<Runnable> queue;
574  bool go;
575 
576  public:
584  QueuedThreadPool(int size = 4);
585 
586 
597  QueuedThreadPool(int size, int stackSize);
598 
602  virtual ~QueuedThreadPool();
603 
608  void execute(Runnable*);
609 
616  unsigned int queue_length() { return queue.size(); }
617 
618 
622  void run();
623 
627  void stop() { go = FALSE; }
628 
632  virtual void idle_notification();
633 
634  private:
635  void assign(Runnable* task);
636  };
637 
638 
646  class AGENTPP_DECL TaskManager : public Synchronized, public Runnable {
647  public:
657  TaskManager(ThreadPool*,
658  int stackSize = AGENTPP_DEFAULT_STACKSIZE);
659 
663  virtual ~TaskManager();
664 
665 
673  bool is_idle() { return (!task); }
674 
678  void start() { thread.start(); }
679 
683  void stop() { go = FALSE; }
684 
696  bool set_task(Runnable*);
697 
701  TaskManager* clone()
702  { return new TaskManager(
703  new ThreadPool(threadPool->size(), threadPool->stack_size()));}
704 
705  protected:
706  Thread thread;
707  ThreadPool* threadPool;
708  Runnable* task;
709  void run();
710  bool go;
711  };
712 
713  #endif // AGENTPP_USE_THREAD_POOL
714 
722  class AGENTPP_DECL MibTask: public Runnable {
723  public:
724  MibTask(MibMethodCall* call) { task = call; }
725  virtual ~MibTask() { delete task; }
726 
727  virtual void run();
728  protected:
730  };
731 
732 #ifdef NO_FAST_MUTEXES
733 
742  class AGENTPP_DECL LockRequest: public Synchronized {
743  public:
751  LockRequest(Synchronized*);
752  ~LockRequest();
753 
754  Synchronized* target;
755  };
756 
768 #if !defined (AGENTPP_DECL_TEMPL_LIST_LOCKREQUEST)
769 #define AGENTPP_DECL_TEMPL_LIST_LOCKREQUEST
771 #endif
772 
773  class AGENTPP_DECL LockQueue: public Thread {
774  public:
775  LockQueue();
776  virtual ~LockQueue();
777  virtual void run();
778 
787  void acquire(LockRequest*);
788 
797  void release(LockRequest*);
798 
799  protected:
800  List<LockRequest> pendingLock;
801  List<LockRequest> pendingRelease;
802  bool go;
803  };
804 #endif
805 
806 #endif
807 
816 #ifdef _THREADS
818 #else
820 #endif
821 
822 public:
823 
827  ThreadManager();
828 
832  virtual ~ThreadManager();
833 
837  void start_synch();
841  void end_synch();
842 
846  static void start_global_synch();
850  static void end_global_synch();
851 
852 private:
853 #ifdef _THREADS
854  static Synchronized global_lock;
855 #endif
856 };
857 
858 
860  public:
863  protected:
865 };
866 
868 {
869 public:
871  virtual ~SingleThreadObject();
872 };
873 
874 
875 #ifdef AGENTPP_NAMESPACE
876 }
877 #endif
878 
879 #endif