IRSOL
C++ code implementing socket server for interacting with Baumer camera.
collector.cpp
Go to the documentation of this file.
2
3#include "irsol/macros.hpp"
4
5#include <algorithm>
6#include <chrono>
7#include <iostream>
8#include <set>
9
10namespace irsol {
11namespace server {
12
13namespace frame_collector {
14
15std::shared_ptr<FrameCollector::frame_queue_t>
17{
18 return std::make_shared<FrameCollector::frame_queue_t>();
19}
20
25
30
31void
33{
34 if(m_stop.load()) {
36 "frame_collector", "Collector was requested to stop already. Ignoring re-start request");
37 return;
38 }
39 m_distributorThread = std::thread(&FrameCollector::run, this);
40}
41
42void
44{
45 m_stop.store(true);
46 m_scheduleCondition.notify_all();
47 if(m_distributorThread.joinable())
49}
50
51bool
53{
54 return !m_clients.empty();
55}
56
57void
60 double fps,
61 std::shared_ptr<FrameCollector::frame_queue_t> queue,
62 int64_t frameCount)
63{
64
65 std::scoped_lock<std::mutex> lock(m_clientsMutex);
66
67 std::chrono::microseconds interval;
68 bool immediate = false;
69 if(frameCount == 1 && fps <= 0.0) {
70 // Client is requesting a single frame immediately
71 interval = std::chrono::microseconds(1);
72 immediate = true;
73 frameCount = 1;
74 fps = 0.0;
75 } else {
76 interval = std::chrono::microseconds(static_cast<uint64_t>(1000000.0 / fps));
78 "frame_collector",
79 "registering client with interval of {}",
81 }
82
83 // Registers the client so that the next due time is in SLACK ms.
84 // This is to allow the collector thread to batch multiple clients, that desire a frame at a
85 // specific timestamp, but all within the SLACK duration and to serve them all with the same frame
86 // image.
87 auto nextDue = irsol::types::clock_t::now() + FrameCollector::SLACK;
88
90 "frame_collector",
91 "Registering client {} with frame rate {} fps (interval {}), "
92 "next frame due at {} #frames {}, immediate {}",
93 clientId,
94 fps,
97 frameCount,
98 immediate);
99
100 m_clients.emplace(
101 clientId, ClientCollectionParams(fps, interval, nextDue, queue, frameCount, immediate));
102 schedule(clientId, nextDue);
103}
104
105void
107{
108 std::scoped_lock<std::mutex> lock(m_clientsMutex);
110}
111
112void
114{
115 std::unique_lock<std::mutex> lock(m_clientsMutex);
116
117 while(!m_stop) {
118 if(m_scheduleMap.empty()) {
119 // Wait until at least one new client is registered in the map, or if a stop request has
120 // arrived.
121 m_scheduleCondition.wait(lock, [this]() {
123 "frame_collector",
124 "Waiting until a client is scheduled (clients size: {}, schedule size: {})",
125 m_clients.size(),
126 m_scheduleMap.size());
127 return m_stop || !m_scheduleMap.empty();
128 });
129 }
130
131 if(m_stop.load()) {
132 IRSOL_NAMED_LOG_INFO("frame_collector", "Requested frame-collection stop");
133 break;
134 }
135
136 // Retrieve the nextDue time from the schedule map.
137 // This map is always sorted from small to high, as it's an ordered container.
138 irsol::types::timepoint_t nextDue = m_scheduleMap.begin()->first;
140 "frame_collector",
141 "Running for frame collection due at {}",
143
144 // Wait at most until the `nextDue` time.
145 // Allow for early break in case of:
146 // - m_stop is set to true due to a stop-request
147 // - a new client is registered with a nextDue time that is smaller than the current nextDue
148 // time
149 m_scheduleCondition.wait_until(lock, nextDue, [this, currentNextDue = nextDue]() {
150 if(m_stop.load()) {
151 // stopped externally, exit the wait
152 return true;
153 }
154 if(m_scheduleMap.empty()) {
155 // Don't wake up early unnecessarily, as there's no schedules in the map
156 return false;
157 }
158
159 // Check if a new schedule has been inserted into the map, which happens
160 // earlier than the due time we captured prior to sleeping.
161 auto newNextDue = m_scheduleMap.begin()->first;
162
164 "frame_collector",
165 "Current first timestamp in the map: {}, current nextDue: {}",
167 irsol::utils::timestampToString(currentNextDue));
168 return newNextDue < currentNextDue;
169 });
170
171 if(m_stop.load()) {
173 "frame_collector", "Frame collection stop request received, breaking loop");
174 return;
175 }
176
177 // Refresh the nextDue, as the above condition might have finished due to a new client being
178 // registered earlier than the 'nextDue' time that was initially selected.
179 nextDue = m_scheduleMap.begin()->first;
180
181 // Clients due now or earlier
182 irsol::types::timepoint_t now = irsol::types::clock_t::now();
184 "frame_collector",
185 "Woken up at timestamp {}, with next due at {}",
188
189 const auto slack = m_clients.size() == 1 ? std::chrono::milliseconds(0) : FrameCollector::SLACK;
190 auto [readyClients, clientsSchedules] = collectReadyClients(now, slack);
192 "frame_collector", "Found {} clients that need an image now!", readyClients.size());
193 {
194 auto uniqueSchedules =
195 std::set<irsol::types::timepoint_t>(clientsSchedules.begin(), clientsSchedules.end());
196 for(auto uniqueSchedule : uniqueSchedules) {
197 auto numClientsWithSchedule = std::count_if(
198 clientsSchedules.begin(),
199 clientsSchedules.end(),
200 [&uniqueSchedule](const auto& schedule) { return schedule == uniqueSchedule; });
201
202 if(uniqueSchedule != nextDue) {
204 "frame_collector_slack",
205 "Actual schedule: {}, also considering {} clients with schedule {} due to allowed "
206 "slack of {}",
208 numClientsWithSchedule,
209 irsol::utils::timestampToString(uniqueSchedule),
211 }
212 }
213 }
214 cleanUpSchedule(clientsSchedules);
216 "frame_collector",
217 "After schedule cleanup, there are still {} schedules",
218 m_scheduleMap.size());
219
220 auto grabResult = grabImageData();
221 if(!grabResult) {
222 IRSOL_NAMED_LOG_WARN("frame_collector", "Image acquisition failed.");
223 continue;
224 }
225 auto& [frameMetadata, imageRawBuffer] = *grabResult;
226
227 // Deliver the frame to clients
228 std::vector<irsol::types::client_id_t> finishedClient;
229 for(auto clientId : readyClients) {
230 IRSOL_NAMED_LOG_DEBUG("frame_collector", "Notifying client {} for new image data", clientId);
231 auto& clientParams = m_clients.at(clientId);
232 // Push a new frame created on the fly to the current client's queue.
233 // This creates a copy of the image data into the queue, and this is wanted, so that if a
234 // consumer modifies the image, it doesn't affect other clients.
235 clientParams.queue->push(std::make_unique<Frame>(
236 frameMetadata,
238 {imageRawBuffer.begin(), imageRawBuffer.end()},
239 {frameMetadata.height, frameMetadata.width},
240 {irsol::protocol::BinaryDataAttribute("imageId", static_cast<int>(frameMetadata.frameId)),
242 "timestamp", irsol::utils::timestampToString(frameMetadata.timestamp))})));
243
244 // Try to schedule the client, if no longer needed, register it in the finishedClients
245 if(!schedule(clientId, clientParams.nextFrameDue + clientParams.interval)) {
246 finishedClient.push_back(clientId);
247 }
248 }
249
250 // Unlock the clients so we can remove finished clients
251 for(const auto& clientId : finishedClient) {
253 "frame_collector",
254 "Deregistering client {}, as it has consumed all the frames it needed.",
255 clientId);
257 }
258
259 IRSOL_NAMED_LOG_DEBUG("frame_collector", "Loop finished, restarting loop");
260 }
261}
262
263void
265{
266 // Mark the client's queue as finished, so the client is notified that
267 // no more data will be pushed to him
268 IRSOL_NAMED_LOG_INFO("frame_collector", "Deregistering client {}", clientId);
269
270 if(m_clients.find(clientId) == m_clients.end()) {
272 "frame_collector", "Client {} was already deregistered, ignoring request.", clientId);
273 return;
274 }
275 auto& clientParams = m_clients.at(clientId);
276 clientParams.queue->producerFinished();
277
278 // Removes the client from the storage
279 auto dueNext = clientParams.nextFrameDue;
280
281 m_clients.erase(clientId);
283 "frame_collector", "Removed client {}, now remaining {} clients", clientId, m_clients.size());
284
285 if(m_scheduleMap.find(dueNext) == m_scheduleMap.end()) {
287 "frame_collector",
288 "Scheduled time {} was already deregistered. Ignoring request.",
290 return;
291 }
292 auto& clientsAtDue = m_scheduleMap.at(dueNext);
293
295 "frame_collector",
296 "Deregistering client {} from next schedule at {}",
297 clientId,
299 clientsAtDue.erase(
300 std::remove(clientsAtDue.begin(), clientsAtDue.end(), clientId), clientsAtDue.end());
301
302 if(clientsAtDue.empty()) {
304 "frame_collector",
305 "No more clients for schedule {}. Removing schedule.",
307 m_scheduleMap.erase(dueNext);
308 }
309
310 m_scheduleCondition.notify_one();
311}
312
313std::pair<std::vector<irsol::types::client_id_t>, std::vector<irsol::types::timepoint_t>>
315 const
316{
318 "frame_collector",
319 "Collecting clients for time {}, with slack of {}",
322 std::vector<irsol::types::client_id_t> readyClients;
323 std::vector<irsol::types::timepoint_t> clientsSchedule;
324
325 for(const auto& [scheduleTime, clients] : m_scheduleMap) {
326 // As soon as the scheduleTime of the client is above now+slack, break the loop.
327 // We can break early, as m_scheduleMap is an ordered container, that is iterated over
328 // in a way that smaller keys (scheduledTimes) are always iterated over at the beginning.
329 if(scheduleTime > now + slack) {
330 break;
331 }
332 for(const auto& client : clients) {
333 readyClients.push_back(client);
334 clientsSchedule.push_back(scheduleTime);
335 }
336 }
337
338 return {readyClients, clientsSchedule};
339}
340
341void
342FrameCollector::cleanUpSchedule(const std::vector<irsol::types::timepoint_t> schedules)
343{
344 // Erase all the schedules in the input vector
345 for(auto schedule : schedules) {
346 m_scheduleMap.erase(schedule);
347 }
348}
349
350std::optional<std::pair<FrameMetadata, std::vector<irsol::types::byte_t>>>
352{
353 // Capture the frame just-in-time
354 IRSOL_MAYBE_UNUSED auto t0 = irsol::types::clock_t::now();
355 auto image = m_cam.captureImage();
356 IRSOL_MAYBE_UNUSED auto t1 = irsol::types::clock_t::now();
358 "frame_collector",
359 "Capture image: start: {}, stop: {}, duration: {}",
363
364 // Extract the image data from the image buffer, and copy it into an
365 // owning structure. In this way, when `image` is destroyed at the end of the
366 // execution of this function, it can return into the pool of NeoAPI::Images
367 // for next frames to be written to the buffer.
368 auto* imageData = image.GetImageData();
369 auto numBytes = image.GetSize();
370
371 if(numBytes == 0) {
372 return std::nullopt;
373 }
374
375 std::vector<irsol::types::byte_t> rawData(numBytes);
376 std::memcpy(rawData.data(), imageData, numBytes);
377
378 return std::make_pair<FrameMetadata, std::vector<irsol::types::byte_t>>(
379 {irsol::types::clock_t::now(), image.GetImageID(), image.GetHeight(), image.GetWidth()},
380 std::move(rawData));
381}
382
383bool
385 const irsol::types::client_id_t& clientId,
386 irsol::types::timepoint_t nextFrameDue)
387{
389 m_clients.find(clientId) != m_clients.end(), "Impossible to schedule an unregistered client.");
390 // Update the parameters of the client.
391 auto& clientParams = m_clients.at(clientId);
392 if(clientParams.remainingFrames-- == 0 && clientParams.remainingFrames < 0) {
393 // Client no longer expects frames.
394 // This handles also clients that are listening forever, as their 'remainingFrames' is negative
395 // so this¨ condition is never fully met.
397 "frame_collector", "Client {} had no longer frames to produce.", clientId);
398 return false;
399 }
400
401 if(clientParams.nextFrameDue == nextFrameDue) {
402 // This has been called the first time for client-registration
404 "frame_collector",
405 "Client {} has been scheduled for timestamp {}.",
406 clientId,
407 irsol::utils::timestampToString(clientParams.nextFrameDue));
408 } else {
409
411 "frame_collector",
412 "(Rescheduling client {} for next frame, previous due: {}, next due {}, # count {}",
413 clientId,
414 irsol::utils::timestampToString(clientParams.nextFrameDue),
416 clientParams.remainingFrames);
417 }
418
419 // Updates the parameters of the client
420 clientParams.nextFrameDue = nextFrameDue;
421
422 // Registers the client for the scheduled timestamp.
423 m_scheduleMap[nextFrameDue].push_back(clientId);
424
425 // Notify the condition variable, that a new client has been scheduled
426 m_scheduleCondition.notify_one();
427 return true;
428}
429
430} // namespace frame_collector
431} // namespace server
432} // namespace irsol
High-level wrapper around the NeoAPI camera for synchronized access.
Definition interface.hpp:61
image_t captureImage(std::optional< irsol::types::duration_t > timeout=std::nullopt)
Capture a single image from the camera.
std::atomic< bool > m_stop
Indicates whether the collector is stopping due to an external request.
void run()
Runs the frame distribution loop in a background thread.
void deregisterClient(irsol::types::client_id_t clientId)
Deregisters a client and stops frame delivery.
void start()
Starts the frame collection and distribution thread.
Definition collector.cpp:32
void deregisterClientNonThreadSafe(irsol::types::client_id_t clientId)
Deregisters a client and stops frame delivery (not thread-safe).
std::thread m_distributorThread
Thread responsible for frame distribution.
std::pair< std::vector< irsol::types::client_id_t >, std::vector< irsol::types::timepoint_t > > collectReadyClients(irsol::types::timepoint_t now, irsol::types::duration_t slack) const
Collects clients who are scheduled to receive a frame at the given time.
std::map< irsol::types::timepoint_t, std::vector< irsol::types::client_id_t > > m_scheduleMap
Maps timestamps to client IDs scheduled at that time.
~FrameCollector()
Destructor. Stops any running threads and cleans up resources.
Definition collector.cpp:26
std::condition_variable m_scheduleCondition
Signals when a new client is scheduled.
void registerClient(irsol::types::client_id_t clientId, double fps, std::shared_ptr< frame_queue_t > queue, int64_t frameCount=-1)
Registers a client to receive frames at a specified frame rate.
Definition collector.cpp:58
irsol::camera::Interface & m_cam
Reference to the camera interface used for capturing.
FrameCollector(irsol::camera::Interface &camera)
Constructs a FrameCollector for the given camera interface.
Definition collector.cpp:21
bool schedule(const irsol::types::client_id_t &clientId, irsol::types::timepoint_t nextFrameDue)
Schedules the next frame delivery for a client.
bool isBusy() const
Checks whether the collector is currently serving any clients.
Definition collector.cpp:52
std::unordered_map< irsol::types::client_id_t, ClientCollectionParams > m_clients
Stores parameters for each registered client.
static constexpr irsol::types::duration_t SLACK
Slack window for batching frame delivery to clients.
Definition collector.hpp:91
std::optional< std::pair< FrameMetadata, std::vector< irsol::types::byte_t > > > grabImageData() const
Captures an image and returns it along with associated metadata.
void stop()
Stops the frame collector and joins worker threads.
Definition collector.cpp:43
std::mutex m_clientsMutex
Protects access to m_clients and m_scheduleMap.
static std::shared_ptr< frame_queue_t > makeQueuePtr()
Utility static function to create a shared pointer to a frame queue.
Definition collector.cpp:16
void cleanUpSchedule(const std::vector< irsol::types::timepoint_t > schedules)
Removes schedules from the schedule map.
#define IRSOL_ASSERT_ERROR
Error-level assertion macro.
Definition assert.hpp:134
#define IRSOL_NAMED_LOG_INFO(name,...)
Logs an info-level message using a named logger.
Definition logging.hpp:176
#define IRSOL_NAMED_LOG_DEBUG(name,...)
Logs a debug-level message using a named logger.
Definition logging.hpp:174
#define IRSOL_NAMED_LOG_WARN(name,...)
Logs a warning-level message using a named logger.
Definition logging.hpp:178
Common portability and diagnostic macros for the irsol library.
#define IRSOL_MAYBE_UNUSED
Suppresses compiler warnings about unused variables or parameters.
Definition macros.hpp:39
clock_t::duration duration_t
Alias for a duration of time as defined by clock_t.
Definition types.hpp:128
std::string client_id_t
Represents a unique client identifier. Typically used to identify connected clients by string IDs.
Definition types.hpp:55
clock_t::time_point timepoint_t
Alias for a point in time as defined by clock_t.
Definition types.hpp:120
std::string timestampToString(irsol::types::timepoint_t tp)
Converts a steady_clock time point to a human-readable string.
Definition utils.cpp:111
std::string durationToString(irsol::types::duration_t dr)
Converts a duration to a human-readable string.
Definition utils.cpp:133
Represents a single binary data attribute within the protocol.
Definition binary.hpp:35
Represents a binary data object within the protocol.
Definition binary.hpp:107