GDAL
cpl_worker_thread_pool.h
Go to the documentation of this file.
1 /**********************************************************************
2  * $Id$
3  *
4  * Project: CPL - Common Portability Library
5  * Purpose: CPL worker thread pool
6  * Author: Even Rouault, <even dot rouault at spatialys dot com>
7  *
8  **********************************************************************
9  * Copyright (c) 2015, Even Rouault, <even dot rouault at spatialys dot com>
10  *
11  * Permission is hereby granted, free of charge, to any person obtaining a
12  * copy of this software and associated documentation files (the "Software"),
13  * to deal in the Software without restriction, including without limitation
14  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
15  * and/or sell copies of the Software, and to permit persons to whom the
16  * Software is furnished to do so, subject to the following conditions:
17  *
18  * The above copyright notice and this permission notice shall be included
19  * in all copies or substantial portions of the Software.
20  *
21  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
22  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
23  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
24  * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
25  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
26  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
27  * DEALINGS IN THE SOFTWARE.
28  ****************************************************************************/
29 
30 #ifndef CPL_WORKER_THREAD_POOL_H_INCLUDED_
31 #define CPL_WORKER_THREAD_POOL_H_INCLUDED_
32 
33 #include "cpl_multiproc.h"
34 #include "cpl_list.h"
35 
36 #include <condition_variable>
37 #include <memory>
38 #include <mutex>
39 #include <vector>
40 
48 #ifndef DOXYGEN_SKIP
49 struct CPLWorkerThreadJob;
51 
52 struct CPLWorkerThread
53 {
54  CPL_DISALLOW_COPY_ASSIGN(CPLWorkerThread)
55  CPLWorkerThread() = default;
56 
57  CPLThreadFunc pfnInitFunc = nullptr;
58  void *pInitData = nullptr;
59  CPLWorkerThreadPool *poTP = nullptr;
60  CPLJoinableThread *hThread = nullptr;
61  bool bMarkedAsWaiting = false;
62 
63  std::mutex m_mutex{};
64  std::condition_variable m_cv{};
65 };
66 
67 typedef enum
68 {
69  CPLWTS_OK,
70  CPLWTS_STOP,
71  CPLWTS_ERROR
72 } CPLWorkerThreadState;
73 #endif // ndef DOXYGEN_SKIP
74 
75 class CPLJobQueue;
76 
78 class CPL_DLL CPLWorkerThreadPool
79 {
81 
82  std::vector<std::unique_ptr<CPLWorkerThread>> aWT{};
83  mutable std::mutex m_mutex{};
84  std::condition_variable m_cv{};
85  volatile CPLWorkerThreadState eState = CPLWTS_OK;
86  CPLList *psJobQueue = nullptr;
87  int nPendingJobs = 0;
88 
89  CPLList *psWaitingWorkerThreadsList = nullptr;
90  int nWaitingWorkerThreads = 0;
91 
92  int m_nMaxThreads = 0;
93 
94  static void WorkerThreadFunction(void *user_data);
95 
96  void DeclareJobFinished();
97  CPLWorkerThreadJob *GetNextJob(CPLWorkerThread *psWorkerThread);
98 
99  public:
102 
103  bool Setup(int nThreads, CPLThreadFunc pfnInitFunc, void **pasInitData);
104  bool Setup(int nThreads, CPLThreadFunc pfnInitFunc, void **pasInitData,
105  bool bWaitallStarted);
106 
107  std::unique_ptr<CPLJobQueue> CreateJobQueue();
108 
109  bool SubmitJob(CPLThreadFunc pfnFunc, void *pData);
110  bool SubmitJobs(CPLThreadFunc pfnFunc, const std::vector<void *> &apData);
111  void WaitCompletion(int nMaxRemainingJobs = 0);
112  void WaitEvent();
113 
115  int GetThreadCount() const;
116 };
117 
119 class CPL_DLL CPLJobQueue
120 {
122  CPLWorkerThreadPool *m_poPool = nullptr;
123  std::mutex m_mutex{};
124  std::condition_variable m_cv{};
125  int m_nPendingJobs = 0;
126 
127  static void JobQueueFunction(void *);
128  void DeclareJobFinished();
129 
131  protected:
132  friend class CPLWorkerThreadPool;
133  explicit CPLJobQueue(CPLWorkerThreadPool *poPool);
135 
136  public:
137  ~CPLJobQueue();
138 
141  {
142  return m_poPool;
143  }
144 
145  bool SubmitJob(CPLThreadFunc pfnFunc, void *pData);
146  void WaitCompletion(int nMaxRemainingJobs = 0);
147 };
148 
149 #endif // CPL_WORKER_THREAD_POOL_H_INCLUDED_
Job queue.
Definition: cpl_worker_thread_pool.h:120
CPLWorkerThreadPool * GetPool()
Return the owning worker thread pool.
Definition: cpl_worker_thread_pool.h:140
Pool of worker threads.
Definition: cpl_worker_thread_pool.h:79
bool SubmitJob(CPLThreadFunc pfnFunc, void *pData)
Queue a new job.
Definition: cpl_worker_thread_pool.cpp:141
void WaitCompletion(int nMaxRemainingJobs=0)
Wait for completion of part or whole jobs.
Definition: cpl_worker_thread_pool.cpp:412
Simplest list implementation.
#define CPL_DISALLOW_COPY_ASSIGN(ClassName)
Helper to remove the copy and assignment constructors so that the compiler will not generate the defa...
Definition: cpl_port.h:1042
List element structure.
Definition: cpl_list.h:52