13namespace frame_collector {
15std::shared_ptr<FrameCollector::frame_queue_t>
18 return std::make_shared<FrameCollector::frame_queue_t>();
36 "frame_collector",
"Collector was requested to stop already. Ignoring re-start request");
61 std::shared_ptr<FrameCollector::frame_queue_t> queue,
67 std::chrono::microseconds interval;
68 bool immediate =
false;
69 if(frameCount == 1 && fps <= 0.0) {
71 interval = std::chrono::microseconds(1);
76 interval = std::chrono::microseconds(
static_cast<uint64_t
>(1000000.0 / fps));
79 "registering client with interval of {}",
91 "Registering client {} with frame rate {} fps (interval {}), "
92 "next frame due at {} #frames {}, immediate {}",
124 "Waiting until a client is scheduled (clients size: {}, schedule size: {})",
141 "Running for frame collection due at {}",
165 "Current first timestamp in the map: {}, current nextDue: {}",
168 return newNextDue < currentNextDue;
173 "frame_collector",
"Frame collection stop request received, breaking loop");
185 "Woken up at timestamp {}, with next due at {}",
192 "frame_collector",
"Found {} clients that need an image now!", readyClients.size());
194 auto uniqueSchedules =
195 std::set<irsol::types::timepoint_t>(clientsSchedules.begin(), clientsSchedules.end());
196 for(
auto uniqueSchedule : uniqueSchedules) {
197 auto numClientsWithSchedule = std::count_if(
198 clientsSchedules.begin(),
199 clientsSchedules.end(),
200 [&uniqueSchedule](
const auto&
schedule) { return schedule == uniqueSchedule; });
202 if(uniqueSchedule != nextDue) {
204 "frame_collector_slack",
205 "Actual schedule: {}, also considering {} clients with schedule {} due to allowed "
208 numClientsWithSchedule,
217 "After schedule cleanup, there are still {} schedules",
225 auto& [frameMetadata, imageRawBuffer] = *grabResult;
228 std::vector<irsol::types::client_id_t> finishedClient;
229 for(
auto clientId : readyClients) {
231 auto& clientParams =
m_clients.at(clientId);
235 clientParams.queue->push(std::make_unique<Frame>(
238 {imageRawBuffer.begin(), imageRawBuffer.end()},
239 {frameMetadata.height, frameMetadata.width},
245 if(!
schedule(clientId, clientParams.nextFrameDue + clientParams.interval)) {
246 finishedClient.push_back(clientId);
251 for(
const auto& clientId : finishedClient) {
254 "Deregistering client {}, as it has consumed all the frames it needed.",
272 "frame_collector",
"Client {} was already deregistered, ignoring request.", clientId);
275 auto& clientParams =
m_clients.at(clientId);
276 clientParams.queue->producerFinished();
279 auto dueNext = clientParams.nextFrameDue;
283 "frame_collector",
"Removed client {}, now remaining {} clients", clientId,
m_clients.size());
288 "Scheduled time {} was already deregistered. Ignoring request.",
296 "Deregistering client {} from next schedule at {}",
300 std::remove(clientsAtDue.begin(), clientsAtDue.end(), clientId), clientsAtDue.end());
302 if(clientsAtDue.empty()) {
305 "No more clients for schedule {}. Removing schedule.",
313std::pair<std::vector<irsol::types::client_id_t>, std::vector<irsol::types::timepoint_t>>
319 "Collecting clients for time {}, with slack of {}",
322 std::vector<irsol::types::client_id_t> readyClients;
323 std::vector<irsol::types::timepoint_t> clientsSchedule;
329 if(scheduleTime > now + slack) {
332 for(
const auto& client : clients) {
333 readyClients.push_back(client);
334 clientsSchedule.push_back(scheduleTime);
338 return {readyClients, clientsSchedule};
350std::optional<std::pair<FrameMetadata, std::vector<irsol::types::byte_t>>>
359 "Capture image: start: {}, stop: {}, duration: {}",
368 auto* imageData = image.GetImageData();
369 auto numBytes = image.GetSize();
375 std::vector<irsol::types::byte_t> rawData(numBytes);
376 std::memcpy(rawData.data(), imageData, numBytes);
378 return std::make_pair<FrameMetadata, std::vector<irsol::types::byte_t>>(
379 {irsol::types::clock_t::now(), image.GetImageID(), image.GetHeight(), image.GetWidth()},
389 m_clients.find(clientId) !=
m_clients.end(),
"Impossible to schedule an unregistered client.");
391 auto& clientParams =
m_clients.at(clientId);
392 if(clientParams.remainingFrames-- == 0 && clientParams.remainingFrames < 0) {
397 "frame_collector",
"Client {} had no longer frames to produce.", clientId);
401 if(clientParams.nextFrameDue == nextFrameDue) {
405 "Client {} has been scheduled for timestamp {}.",
412 "(Rescheduling client {} for next frame, previous due: {}, next due {}, # count {}",
416 clientParams.remainingFrames);
420 clientParams.nextFrameDue = nextFrameDue;
High-level wrapper around the NeoAPI camera for synchronized access.
image_t captureImage(std::optional< irsol::types::duration_t > timeout=std::nullopt)
Capture a single image from the camera.
std::atomic< bool > m_stop
Indicates whether the collector is stopping due to an external request.
void run()
Runs the frame distribution loop in a background thread.
void deregisterClient(irsol::types::client_id_t clientId)
Deregisters a client and stops frame delivery.
void start()
Starts the frame collection and distribution thread.
void deregisterClientNonThreadSafe(irsol::types::client_id_t clientId)
Deregisters a client and stops frame delivery (not thread-safe).
std::thread m_distributorThread
Thread responsible for frame distribution.
std::pair< std::vector< irsol::types::client_id_t >, std::vector< irsol::types::timepoint_t > > collectReadyClients(irsol::types::timepoint_t now, irsol::types::duration_t slack) const
Collects clients who are scheduled to receive a frame at the given time.
std::map< irsol::types::timepoint_t, std::vector< irsol::types::client_id_t > > m_scheduleMap
Maps timestamps to client IDs scheduled at that time.
~FrameCollector()
Destructor. Stops any running threads and cleans up resources.
std::condition_variable m_scheduleCondition
Signals when a new client is scheduled.
void registerClient(irsol::types::client_id_t clientId, double fps, std::shared_ptr< frame_queue_t > queue, int64_t frameCount=-1)
Registers a client to receive frames at a specified frame rate.
irsol::camera::Interface & m_cam
Reference to the camera interface used for capturing.
FrameCollector(irsol::camera::Interface &camera)
Constructs a FrameCollector for the given camera interface.
bool schedule(const irsol::types::client_id_t &clientId, irsol::types::timepoint_t nextFrameDue)
Schedules the next frame delivery for a client.
bool isBusy() const
Checks whether the collector is currently serving any clients.
std::unordered_map< irsol::types::client_id_t, ClientCollectionParams > m_clients
Stores parameters for each registered client.
static constexpr irsol::types::duration_t SLACK
Slack window for batching frame delivery to clients.
std::optional< std::pair< FrameMetadata, std::vector< irsol::types::byte_t > > > grabImageData() const
Captures an image and returns it along with associated metadata.
void stop()
Stops the frame collector and joins worker threads.
std::mutex m_clientsMutex
Protects access to m_clients and m_scheduleMap.
static std::shared_ptr< frame_queue_t > makeQueuePtr()
Utility static function to create a shared pointer to a frame queue.
void cleanUpSchedule(const std::vector< irsol::types::timepoint_t > schedules)
Removes schedules from the schedule map.
#define IRSOL_ASSERT_ERROR
Error-level assertion macro.
#define IRSOL_NAMED_LOG_INFO(name,...)
Logs an info-level message using a named logger.
#define IRSOL_NAMED_LOG_DEBUG(name,...)
Logs a debug-level message using a named logger.
#define IRSOL_NAMED_LOG_WARN(name,...)
Logs a warning-level message using a named logger.
Common portability and diagnostic macros for the irsol library.
#define IRSOL_MAYBE_UNUSED
Suppresses compiler warnings about unused variables or parameters.
clock_t::duration duration_t
Alias for a duration of time as defined by clock_t.
std::string client_id_t
Represents a unique client identifier. Typically used to identify connected clients by string IDs.
clock_t::time_point timepoint_t
Alias for a point in time as defined by clock_t.
std::string timestampToString(irsol::types::timepoint_t tp)
Converts a steady_clock time point to a human-readable string.
std::string durationToString(irsol::types::duration_t dr)
Converts a duration to a human-readable string.
Represents a single binary data attribute within the protocol.
Represents a binary data object within the protocol.