yarp-devices
CanRxTxThreads.hpp
1 // -*- mode:C++; tab-width:4; c-basic-offset:4; indent-tabs-mode:nil -*-
2 
3 #ifndef __CAN_RX_TH_THREADS_HPP__
4 #define __CAN_RX_TH_THREADS_HPP__
5 
6 #include <mutex>
7 #include <string>
8 #include <unordered_map>
9 #include <vector>
10 
11 #include <yarp/os/Bottle.h>
12 #include <yarp/os/Contactable.h>
13 #include <yarp/os/PortWriterBuffer.h>
14 #include <yarp/os/Stamp.h>
15 #include <yarp/os/Thread.h>
16 
17 #include <yarp/dev/CanBusInterface.h>
18 
19 #include "ICanBusSharer.hpp"
20 
21 namespace roboticslab
22 {
23 
31 class CanReaderWriterThread : public yarp::os::Thread
32 {
33 public:
35  CanReaderWriterThread(const std::string & type, const std::string & id, double delay, unsigned int bufferSize)
36  : iCanBus(nullptr), iCanBufferFactory(nullptr),
37  dumpPort(nullptr), dumpWriter(nullptr), dumpMutex(nullptr), busLoadMonitor(nullptr),
38  bufferSize(bufferSize), delay(delay), type(type), id(id)
39  { }
40 
42  ~CanReaderWriterThread() override = default;
43 
45  bool threadInit() override
46  { canBuffer = iCanBufferFactory->createBuffer(bufferSize); return true; }
47 
49  void threadRelease() override
50  { iCanBufferFactory->destroyBuffer(canBuffer); }
51 
53  void beforeStart() override;
54 
56  void afterStart(bool success) override;
57 
59  void onStop() override;
60 
62  void setCanHandles(yarp::dev::ICanBus * iCanBus, yarp::dev::ICanBufferFactory * iCanBufferFactory)
63  {
64  this->iCanBus = iCanBus; this->iCanBufferFactory = iCanBufferFactory;
65  }
66 
68  void attachDumpWriter(yarp::os::Contactable * dumpPort,
69  yarp::os::PortWriterBuffer<yarp::os::Bottle> * dumpWriter,
70  std::mutex * dumpMutex)
71  {
72  this->dumpPort = dumpPort; this->dumpWriter = dumpWriter; this->dumpMutex = dumpMutex;
73  }
74 
77  { this->busLoadMonitor = busLoadMonitor; }
78 
79 protected:
81  static void dumpMessage(const can_message & msg, yarp::os::Bottle & b);
82 
83  yarp::dev::ICanBus * iCanBus;
84  yarp::dev::ICanBufferFactory * iCanBufferFactory;
85  yarp::dev::CanBuffer canBuffer;
86 
87  yarp::os::Contactable * dumpPort;
88  yarp::os::PortWriterBuffer<yarp::os::Bottle> * dumpWriter;
89  std::mutex * dumpMutex;
90 
91  yarp::os::Stamp lastStamp;
92 
93  ICanMessageNotifier * busLoadMonitor;
94 
95  unsigned int bufferSize;
96  double delay;
97 
98 private:
99  std::string type;
100  std::string id;
101 };
102 
110 {
111 public:
113  CanReaderThread(const std::string & id, double delay, unsigned int bufferSize);
114 
116  void registerHandle(ICanBusSharer * p);
117 
119  const std::vector<ICanBusSharer *> & getHandles() const
120  { return handles; }
121 
123  void attachCanNotifier(ICanMessageNotifier * canMessageNotifier)
124  { this->canMessageNotifier = canMessageNotifier; }
125 
126  void run() override;
127 
128 private:
129  std::vector<ICanBusSharer *> handles;
130  std::unordered_map<unsigned int, ICanBusSharer *> canIdToHandle;
131  ICanMessageNotifier * canMessageNotifier;
132 };
133 
142 {
143 public:
145  CanWriterThread(const std::string & id, double delay, unsigned int bufferSize);
146 
148  ~CanWriterThread() override;
149 
152  { return sender; }
153 
155  void flush();
156 
157  void run() override;
158 
159 private:
161  void handlePartialWrite(unsigned int sent);
162 
163  unsigned int preparedMessages;
164  ICanSenderDelegate * sender;
165  mutable std::mutex bufferMutex;
166 };
167 
168 } // namespace roboticslab
169 
170 #endif // __CAN_RX_TH_THREADS_HPP__
A thread that deals with CAN reads.
Definition: CanRxTxThreads.hpp:110
void attachCanNotifier(ICanMessageNotifier *canMessageNotifier)
Attach custom CAN message responder handle.
Definition: CanRxTxThreads.hpp:123
CanReaderThread(const std::string &id, double delay, unsigned int bufferSize)
Constructor.
Definition: CanRxTxThreads.cpp:51
void registerHandle(ICanBusSharer *p)
Map CAN node ids with handles.
Definition: CanRxTxThreads.cpp:58
const std::vector< ICanBusSharer * > & getHandles() const
Retrieve collection of handles to wrapped CAN devices.
Definition: CanRxTxThreads.hpp:119
Base class for a thread that attends CAN reads or writes.
Definition: CanRxTxThreads.hpp:32
void setCanHandles(yarp::dev::ICanBus *iCanBus, yarp::dev::ICanBufferFactory *iCanBufferFactory)
Configure CAN interface handles.
Definition: CanRxTxThreads.hpp:62
bool threadInit() override
Invoked by the thread right before it is started.
Definition: CanRxTxThreads.hpp:45
void attachBusLoadMonitor(ICanMessageNotifier *busLoadMonitor)
Attach CAN bus load monitor.
Definition: CanRxTxThreads.hpp:76
void beforeStart() override
Invoked by the caller right before the thread is started.
Definition: CanRxTxThreads.cpp:18
void threadRelease() override
Invoked by the thread right after it is started.
Definition: CanRxTxThreads.hpp:49
~CanReaderWriterThread() override=default
Virtual destructor.
CanReaderWriterThread(const std::string &type, const std::string &id, double delay, unsigned int bufferSize)
Constructor.
Definition: CanRxTxThreads.hpp:35
void attachDumpWriter(yarp::os::Contactable *dumpPort, yarp::os::PortWriterBuffer< yarp::os::Bottle > *dumpWriter, std::mutex *dumpMutex)
Attach YARP port writer for CAN message dumping.
Definition: CanRxTxThreads.hpp:68
static void dumpMessage(const can_message &msg, yarp::os::Bottle &b)
Dump a CAN message to a YARP bottle.
Definition: CanRxTxThreads.cpp:39
void onStop() override
Callback on thread stop.
Definition: CanRxTxThreads.cpp:32
void afterStart(bool success) override
Invoked by the caller right before the thread is joined.
Definition: CanRxTxThreads.cpp:25
A thread that attends CAN writes.
Definition: CanRxTxThreads.hpp:142
ICanSenderDelegate * getDelegate()
Retrieve a handle to the CAN sender delegate.
Definition: CanRxTxThreads.hpp:151
void handlePartialWrite(unsigned int sent)
In case a write did not succeed, rearrange the CAN message buffer.
Definition: CanRxTxThreads.cpp:221
~CanWriterThread() override
Destructor.
Definition: CanRxTxThreads.cpp:141
void flush()
Send awaiting messages and clear the queue.
Definition: CanRxTxThreads.cpp:148
CanWriterThread(const std::string &id, double delay, unsigned int bufferSize)
Constructor.
Definition: CanRxTxThreads.cpp:133
Abstract base for a CAN bus sharer.
Definition: ICanBusSharer.hpp:25
Implementation-agnostic consumer for RX CAN transfers.
Definition: ICanMessageNotifier.hpp:22
Implementation-agnostic consumer for TX CAN transfers.
Definition: ICanSenderDelegate.hpp:22
The main, catch-all namespace for Robotics Lab UC3M.
Definition: groups.dox:6
Proxy CAN message structure.
Definition: CanMessage.hpp:20