VTK  9.3.0
vtkThreadedCallbackQueue.h
Go to the documentation of this file.
1 // SPDX-FileCopyrightText: Copyright (c) Ken Martin, Will Schroeder, Bill Lorensen
2 // SPDX-License-Identifier: BSD-3-Clause
23 #ifndef vtkThreadedCallbackQueue_h
24 #define vtkThreadedCallbackQueue_h
25 
26 #include "vtkObject.h"
27 #include "vtkParallelCoreModule.h" // For export macro
28 #include "vtkSmartPointer.h" // For vtkSmartPointer
29 
30 #include <atomic> // For atomic_bool
31 #include <condition_variable> // For condition variable
32 #include <deque> // For deque
33 #include <functional> // For greater
34 #include <memory> // For unique_ptr
35 #include <mutex> // For mutex
36 #include <thread> // For thread
37 #include <unordered_map> // For unordered_map
38 #include <unordered_set> // For unordered_set
39 #include <vector> // For vector
40 
41 #if !defined(__WRAP__)
42 
43 VTK_ABI_NAMESPACE_BEGIN
44 
45 class VTKPARALLELCORE_EXPORT vtkThreadedCallbackQueue : public vtkObject
46 {
47 private:
51  template <class FT>
52  struct Signature;
53 
58  template <class T, class DummyT = std::nullptr_t>
59  struct Dereference
60  {
61  struct Type;
62  };
63 
67  template <class T>
68  using DereferencedType = typename std::decay<typename Dereference<T>::Type>::type;
69 
73  template <class FT>
74  using InvokeResult = typename Signature<DereferencedType<FT>>::InvokeResult;
75 
80  class ReturnValueWrapper
81  {
82  class ReturnLValueRef;
83  class ReturnConstLValueRef;
84  };
85 
86 public:
// Method invoked by print to output information about this object, including
// its superclasses, to `os`. Overrides the base-class implementation.
89  void PrintSelf(ostream& os, vtkIndent indent) override;
90 
92 
97 
103  {
104  public:
106 
108  : NumberOfPriorSharedFuturesRemaining(0)
109  , Status(CONSTRUCTING)
110  {
111  }
112 
// Blocks the current thread until the task associated with this future has
// terminated, i.e. until Status equals READY. Returns immediately if the
// future is already READY.
116  virtual void Wait() const
117  {
// Fast path: Status is a std::atomic_int, so it is read here without taking Mutex.
118  if (this->Status == READY)
119  {
120  return;
121  }
122  std::unique_lock<std::mutex> lock(this->Mutex);
// Re-check with Mutex held. The predicate handed to wait() repeats the same test
// on every wake-up, so a spurious wake-up does not end the wait early.
123  if (this->Status != READY)
124  {
125  this->ConditionVariable.wait(lock, [this] { return this->Status == READY; });
126  }
127  }
128 
130 
131  private:
135  virtual void operator()() = 0;
136 
140  std::atomic_int NumberOfPriorSharedFuturesRemaining;
141 
147  std::atomic_int Status;
148 
154  vtkIdType InvokerIndex;
155 
160  bool IsHighPriority = false;
161 
166  std::vector<vtkSmartPointer<vtkSharedFutureBase>> Dependents;
167 
168  mutable std::mutex Mutex;
169  mutable std::condition_variable ConditionVariable;
170 
171  vtkSharedFutureBase(const vtkSharedFutureBase& other) = delete;
172  void operator=(const vtkSharedFutureBase& other) = delete;
173  };
174 
178  template <class ReturnT>
180  {
181  public:
183 
184  using ReturnLValueRef = typename ReturnValueWrapper<ReturnT>::ReturnLValueRef;
185  using ReturnConstLValueRef = typename ReturnValueWrapper<ReturnT>::ReturnConstLValueRef;
186 
187  vtkSharedFuture() = default;
188 
194 
200 
202 
203  private:
204  ReturnValueWrapper<ReturnT> ReturnValue;
205 
206  vtkSharedFuture(const vtkSharedFuture<ReturnT>& other) = delete;
207  void operator=(const vtkSharedFuture<ReturnT>& other) = delete;
208  };
209 
211  template <class ReturnT>
213 
// Pushes a function `f` to be passed `args...` as its arguments.
// Returns a SharedFuturePointer typed on InvokeResult<FT>; a vtkSharedFuture is
// the object handed back by Push and PushDependent.
// NOTE(review): when and on which thread `f` actually runs is defined in the
// .txx implementation, which is not visible here - confirm before relying on it.
275  template <class FT, class... ArgsT>
276  SharedFuturePointer<InvokeResult<FT>> Push(FT&& f, ArgsT&&... args);
277 
286  template <class SharedFutureContainerT, class FT, class... ArgsT>
288  SharedFutureContainerT&& priorSharedFutures, FT&& f, ArgsT&&... args);
289 
// Blocks the current thread until all the tasks associated with each shared
// future inside `priorSharedFuture` have terminated.
// NOTE(review): presumably `priorSharedFuture` is an iterable container of
// SharedFuturePointer-like elements - confirm against the .txx implementation.
304  template <class SharedFutureContainerT>
305  void Wait(SharedFutureContainerT&& priorSharedFuture);
306 
308 
316  template <class ReturnT>
318  template <class ReturnT>
320  const SharedFuturePointer<ReturnT>& future);
322 
// Sets the number of threads.
// NOTE(review): how running or queued tasks are affected by a change is not
// visible in this header - confirm in the implementation file.
331  void SetNumberOfThreads(int numberOfThreads);
332 
// Returns the number of allocated threads. NumberOfThreads is a std::atomic_int,
// so the value is obtained through an atomic load and returned as a plain int.
340  int GetNumberOfThreads() const { return this->NumberOfThreads; }
341 
342 private:
344 
348  template <class FT, class... ArgsT>
349  class vtkInvoker;
351 
352  struct InvokerImpl;
353 
354  template <class FT, class... ArgsT>
355  using InvokerPointer = vtkSmartPointer<vtkInvoker<FT, ArgsT...>>;
356 
357  class ThreadWorker;
358 
359  friend class ThreadWorker;
360 
// States held in vtkSharedFutureBase::Status (a std::atomic_int).
// Apart from CONSTRUCTING (zero), every enumerator is a distinct single bit.
366  enum Status
367  {
// Initial state: the vtkSharedFutureBase constructor initializes Status to this value.
374  CONSTRUCTING = 0x00,
375 
// NOTE(review): presumably the task is waiting on prior shared futures - confirm in the .txx.
379  ON_HOLD = 0x01,
380 
// NOTE(review): presumably the task sits in InvokerQueue awaiting a worker - confirm in the .txx.
385  ENQUEUED = 0x02,
386 
// NOTE(review): presumably the task is currently being executed - confirm in the .txx.
390  RUNNING = 0x04,
391 
// Terminal state tested by vtkSharedFutureBase::Wait(): waiting ends once Status equals READY.
395  READY = 0x08
396  };
397 
404  void Sync(int startId = 0);
405 
410  void PopFrontNullptr();
411 
417  void SignalDependentSharedFutures(vtkSharedFutureBase* invoker);
418 
424  template <class SharedFutureContainerT, class InvokerT>
425  void HandleDependentInvoker(SharedFutureContainerT&& priorSharedFutures, InvokerT&& invoker);
426 
431  void Invoke(vtkSharedFutureBase* invoker, std::unique_lock<std::mutex>& lock);
432 
437  bool TryInvoke(vtkSharedFutureBase* invoker);
438 
443  template <class FT, class... ArgsT>
444  void PushControl(FT&& f, ArgsT&&... args);
445 
449  template <class SharedFutureContainerT>
450  static bool MustWait(SharedFutureContainerT&& priorSharedFutures);
451 
455  std::deque<SharedFutureBasePointer> InvokerQueue;
456 
460  std::mutex Mutex;
461 
465  std::mutex ControlMutex;
466 
471  std::mutex DestroyMutex;
472 
476  std::mutex ThreadIdToIndexMutex;
477 
478  std::condition_variable ConditionVariable;
479 
484  bool Destroying = false;
485 
489  std::atomic_int NumberOfThreads;
490 
491  std::vector<std::thread> Threads;
492 
500  std::unordered_map<std::thread::id, std::shared_ptr<std::atomic_int>> ThreadIdToIndex;
501 
506  std::unordered_set<SharedFutureBasePointer> ControlFutures;
507 
509  void operator=(const vtkThreadedCallbackQueue&) = delete;
510 };
511 
512 VTK_ABI_NAMESPACE_END
513 
514 #include "vtkThreadedCallbackQueue.txx"
515 
516 #endif
517 #endif
518 // VTK-HeaderTest-Exclude: vtkThreadedCallbackQueue.h
a simple class to control print indentation
Definition: vtkIndent.h:29
abstract base class for most VTK objects
Definition: vtkObjectBase.h:63
abstract base class for most VTK objects
Definition: vtkObject.h:52
Hold a reference to a vtkObjectBase instance.
vtkSharedFutureBase is the base block to store, run, get the returned value of the tasks that are pushed in the queue.
virtual void Wait() const
Blocks current thread until the task associated with this future has terminated.
vtkBaseTypeMacro(vtkSharedFutureBase, vtkObjectBase)
A vtkSharedFuture is an object returned by the methods Push and PushDependent.
vtkAbstractTypeMacro(vtkSharedFuture< ReturnT >, vtkSharedFutureBase)
ReturnConstLValueRef Get() const
This returns the return value of the pushed function.
ReturnLValueRef Get()
This returns the return value of the pushed function.
typename ReturnValueWrapper< ReturnT >::ReturnLValueRef ReturnLValueRef
typename ReturnValueWrapper< ReturnT >::ReturnConstLValueRef ReturnConstLValueRef
simple threaded callback queue
int GetNumberOfThreads() const
Returns the number of allocated threads.
void PrintSelf(ostream &os, vtkIndent indent) override
Methods invoked by print to print information about the object including superclasses.
vtkSharedFuture< ReturnT >::ReturnConstLValueRef Get(const SharedFuturePointer< ReturnT > &future)
Get the returned value from the task associated with the input future.
SharedFuturePointer< InvokeResult< FT > > Push(FT &&f, ArgsT &&... args)
Pushes a function f to be passed args...
void Wait(SharedFutureContainerT &&priorSharedFuture)
This method blocks the current thread until all the tasks associated with each shared future inside priorSharedFuture have terminated.
vtkSharedFuture< ReturnT >::ReturnLValueRef Get(SharedFuturePointer< ReturnT > &future)
Get the returned value from the task associated with the input future.
static vtkThreadedCallbackQueue * New()
SharedFuturePointer< InvokeResult< FT > > PushDependent(SharedFutureContainerT &&priorSharedFutures, FT &&f, ArgsT &&... args)
This method behaves the same way Push does, with the addition of a container of futures.
void SetNumberOfThreads(int numberOfThreads)
Sets the number of threads.
~vtkThreadedCallbackQueue() override
Any remaining function that was not executed yet will be executed in this destructor.
@ value
Definition: vtkX3D.h:220
@ type
Definition: vtkX3D.h:516
int vtkIdType
Definition: vtkType.h:315