40 #include <sys/socket.h>
45 #include "boost/scoped_array.hpp"
54 #include "activemq/core/ActiveMQConnectionFactory.h"
56 namespace pexExceptions = lsst::pex::exceptions;
57 namespace pexLogging = lsst::pex::logging;
58 namespace dafBase = lsst::daf::base;
62 using std::numeric_limits;
80 const std::string Event::TYPE =
"TYPE";
81 const std::string Event::EVENTTIME =
"EVENTTIME";
82 const std::string Event::HOSTID =
"HOSTID";
83 const std::string Event::RUNID =
"RUNID";
84 const std::string Event::STATUS =
"STATUS";
85 const std::string Event::TOPIC =
"TOPIC";
86 const std::string Event::PUBTIME =
"PUBTIME";
88 const std::string Event::UNINITIALIZED =
"uninitialized";
95 _keywords.insert(TYPE);
96 _keywords.insert(EVENTTIME);
97 _keywords.insert(HOSTID);
98 _keywords.insert(RUNID);
99 _keywords.insert(STATUS);
100 _keywords.insert(TOPIC);
101 _keywords.insert(PUBTIME);
// Builds an Event from a received CMS text message. The property set comes
// from processTextMessage(msg); each header keyword is then copied from the
// message's properties into that property set.
// NOTE(review): source lines 105-106 and 108 are absent from this listing, so
// anything done there is not described here.
104 Event::Event(cms::TextMessage *msg) {
107 _psp = processTextMessage(msg);
// Header fields read from the message as string properties.
109 _psp->set(TYPE, msg->getStringProperty(TYPE));
110 _psp->set(HOSTID, msg->getStringProperty(HOSTID));
111 _psp->set(RUNID, msg->getStringProperty(RUNID));
112 _psp->set(STATUS, msg->getStringProperty(STATUS));
113 _psp->set(TOPIC, msg->getStringProperty(TOPIC));
// Header fields read from the message as long properties.
114 _psp->set(EVENTTIME, msg->getLongProperty(EVENTTIME));
115 _psp->set(PUBTIME, msg->getLongProperty(PUBTIME));
// Copies every entry of _keywords into a local vector of names.
// NOTE(review): the end of the loop and the return statement are not visible
// in this listing; presumably _names is what gets returned.
120 vector<std::string> Event::getFilterablePropertyNames() {
121 vector<std::string> _names;
122 set<std::string>::iterator keyIterator;
123 for (keyIterator = _keywords.begin(); keyIterator != _keywords.end(); keyIterator++) {
124 _names.push_back(*keyIterator);
// Starts from all names in _psp and erases those that are found in _keywords.
// NOTE(review): source lines 138-139 (the branch taken when the name is NOT a
// keyword) and the return statement are not visible in this listing.
129 vector<std::string> Event::getCustomPropertyNames() {
130 vector<std::string> names = _psp->names();
132 vector<std::string>::iterator nameIterator;
133 set<std::string>::iterator keyIterator;
// The for-statement has no increment clause; advancing is left to the body.
135 for (nameIterator = names.begin(); nameIterator != names.end();) {
136 keyIterator = _keywords.find(*nameIterator);
137 if (keyIterator == _keywords.end())
// NOTE(review): std::vector::erase invalidates the erased iterator and returns
// the next valid one; that return value is discarded here. Consider
// `nameIterator = names.erase(nameIterator);`.
140 names.erase(nameIterator);
146 _constructor(runId, *psp);
150 _constructor(runId, ps);
// Fills in the header properties of _psp for a new event: STATUS, EVENTTIME
// and HOSTID are only supplied when the key does not already exist, then
// RUNID, TYPE, TOPIC and PUBTIME are set.
// NOTE(review): many source lines (155, 157-171, 174-175, 177-180, 182, ...)
// are absent from this listing, including how _psp is created from `ps`; the
// remarks here cover only the visible statements.
153 void Event::_constructor(
const std::string& runId,
const PropertySet& ps) {
// Host name length limit, used to size the gethostname() buffer.
// NOTE(review): sysconf() returns -1 on error or when the limit is
// indeterminate; that value would be passed to new[] unchecked.
154 long int host_len = sysconf(_SC_HOST_NAME_MAX);
156 boost::scoped_array<char> hostname(
new char[host_len]);
172 if (!_psp->exists(STATUS)) {
173 _psp->set(STATUS,
"unknown");
176 if (!_psp->exists(EVENTTIME)) {
181 if (!_psp->exists(HOSTID)) {
// NOTE(review): the return value of gethostname() is ignored, and POSIX does
// not guarantee null termination if the name was truncated.
183 gethostname(hostname.get(), host_len);
184 name = hostname.get();
185 _psp->set(HOSTID, name);
// The settings below appear to be unconditional (surrounding lines not shown).
189 _psp->set(RUNID, runId);
192 _psp->set(TYPE, EventTypes::EVENT);
195 _psp->set(TOPIC, Event::UNINITIALIZED);
// PUBTIME starts at zero, stored explicitly as a long long.
198 _psp->set(PUBTIME, (
long long)0);
201 void Event::populateHeader(cms::TextMessage* msg)
const {
202 msg->setStringProperty(TYPE, _psp->get<std::string>(TYPE));
203 msg->setLongProperty(EVENTTIME, _psp->get<
long long>(EVENTTIME));
204 msg->setStringProperty(HOSTID, _psp->get<std::string>(HOSTID));
205 msg->setStringProperty(RUNID, _psp->get<std::string>(RUNID));
206 msg->setStringProperty(STATUS, _psp->get<std::string>(STATUS));
207 msg->setStringProperty(TOPIC, _psp->get<std::string>(TOPIC));
208 msg->setLongProperty(PUBTIME, _psp->get<
long long>(PUBTIME));
211 long long Event::getEventTime() {
212 return _psp->get<
long long>(EVENTTIME);
215 void Event::setEventTime(
long long nsecs) {
216 _psp->set(EVENTTIME, nsecs);
219 void Event::updateEventTime() {
// Returns the event time rendered as text: the struct tm produced by
// dateTime.gmtime() is formatted with asctime().
// NOTE(review): the declaration of dateTime (source lines 228-229) is not
// visible in this listing; presumably it is constructed from eventTime.
226 std::string Event::getEventDate() {
227 long long eventTime = _psp->get<
long long>(EVENTTIME);
230 struct tm gmTime = dateTime.gmtime();
// NOTE(review): asctime() formats into a shared static buffer and the text it
// produces ends with a newline character.
231 return asctime(&gmTime);
238 set<std::string>::iterator keyIterator;
239 for (keyIterator = _keywords.begin(); keyIterator != _keywords.end(); keyIterator++)
240 psp->remove(*keyIterator);
253 void Event::setPubTime(
long long t) {
254 _psp->set(PUBTIME, t);
257 long long Event::getPubTime() {
258 return _psp->get<
long long>(PUBTIME);
// Returns the publication time rendered as text via asctime(), or an empty
// string on the early-return path.
// NOTE(review): source lines 266 and 268-270 are not visible in this listing,
// so the condition guarding the empty-string return and the declaration of
// dateTime cannot be confirmed from here.
264 std::string Event::getPubDate() {
265 long long _pubTime = _psp->get<
long long>(PUBTIME);
267 return std::string();
271 struct tm pubTime = dateTime.
gmtime();
// NOTE(review): asctime() formats into a shared static buffer and the text it
// produces ends with a newline character.
272 return asctime(&pubTime);
276 std::string Event::getHostId() {
277 return _psp->get<std::string>(HOSTID);
280 std::string Event::getRunId() {
281 return _psp->get<std::string>(RUNID);
284 std::string Event::getType() {
285 return _psp->get<std::string>(TYPE);
288 std::string Event::getStatus() {
289 return _psp->get<std::string>(STATUS);
292 void Event::setStatus(std::string
status) {
293 return _psp->set(STATUS, status);
296 void Event::setTopic(std::string topic) {
297 _psp->set(TOPIC, topic);
300 std::string Event::getTopic() {
301 return _psp->get<std::string>(TOPIC);
// Serialises this event's custom properties into the body of a CMS message:
// the property set from getCustomPropertySet() is turned into a string by the
// marshall(PropertySet) overload and stored with msg->setText().
// NOTE(review): source lines 305-307 (including the declaration of psp) are
// not visible in this listing.
304 void Event::marshall(cms::TextMessage *msg) {
308 psp = getCustomPropertySet();
309 std::string payload = marshall(*psp);
310 msg->setText(payload);
315 std::vector<std::string> v = ps.
paramNames(
false);
318 std::ostringstream payload;
320 payload <<
"nodelist||nodelist||" << (v.size()) <<
"~~";
321 for (i = 0; i < v.size(); i++) {
322 std::string name = v[i];
323 if (ps.
typeOf(name) ==
typeid(bool)) {
324 std::vector<bool> vec = ps.
getArray<
bool>(name);
325 std::vector<bool>::iterator
iter;
326 for (iter = vec.begin(); iter != vec.end(); iter++) {
327 payload <<
"bool||"<< name <<
"||" << *iter <<
"~~";
329 }
else if (ps.
typeOf(name) ==
typeid(long)) {
330 std::vector<long> vec = ps.
getArray<
long>(name);
331 std::vector<long>::iterator
iter;
332 for (iter = vec.begin(); iter != vec.end(); iter++) {
333 payload <<
"long||" << name <<
"||"<< *iter <<
"~~";
335 }
else if (ps.
typeOf(name) ==
typeid(int)) {
336 std::vector<int> vec = ps.
getArray<
int>(name);
337 std::vector<int>::iterator
iter;
338 for (iter = vec.begin(); iter != vec.end(); iter++) {
339 payload <<
"int||" << name <<
"||"<< *iter <<
"~~";
341 }
else if (ps.
typeOf(name) ==
typeid(float)) {
342 std::vector<float> vec = ps.
getArray<
float>(name);
343 std::vector<float>::iterator
iter;
344 payload.precision(numeric_limits<float>::digits10+1);
345 for (iter = vec.begin(); iter != vec.end(); iter++) {
346 payload <<
"float||"<< name <<
"||"<< *iter <<
"~~";
348 }
else if (ps.
typeOf(name) ==
typeid(double)) {
349 std::vector<double> vec = ps.
getArray<
double>(name);
350 std::vector<double>::iterator
iter;
351 payload.precision(numeric_limits<double>::digits10+1);
352 for (iter = vec.begin(); iter != vec.end(); iter++) {
353 payload <<
"double||"<< name <<
"||"<< *iter <<
"~~";
355 }
else if (ps.
typeOf(name) ==
typeid(std::string)) {
356 std::vector<std::string> vec = ps.
getArray<std::string>(name);
357 std::vector<std::string>::iterator
iter;
358 for (iter = vec.begin(); iter != vec.end(); iter++) {
359 payload <<
"string||" << name <<
"||"<< *iter <<
"~~";
363 std::vector<lsst::daf::base::DateTime>::iterator
iter;
364 for (iter = vec.begin(); iter != vec.end(); iter++) {
365 payload <<
"datetime||" << name <<
"||"<< (*iter).nsecs() <<
"~~";
368 std::cout <<
"Couldn't marshall "<< name << std::endl;
371 return payload.str();
377 if (textMessage == NULL)
380 std::string text = textMessage->getText();
382 return unmarshall(text);
388 std::vector<string> tuples;
391 splitString(text,
"~~", tuples);
397 for (i = 0; i < tuples.size(); i++) {
398 std::string
line = tuples.at(i);
399 std::vector<string> vec;
400 splitTuple(line,
"||", vec);
402 std::string type = vec.at(0);
403 std::string key = vec.at(1);
404 std::string
val = vec.at(2);
406 std::istringstream iss(val);
413 psp->add(key, int_value);
414 }
else if (type ==
"bool") {
418 psp->add(key, bool_value);
419 }
else if (type ==
"long long") {
420 long long longlong_value;
421 iss >> longlong_value;
423 psp->add(key, longlong_value);
424 }
else if (type ==
"long") {
428 psp->add(key, long_value);
429 }
else if (type ==
"float") {
433 psp->add(key, float_value);
434 }
else if (type ==
"double") {
438 psp->add(key, double_value);
439 }
else if (type ==
"string") {
441 }
else if (type ==
"datetime") {
442 long long longlong_value;
443 iss >> longlong_value;
// Splits str at each occurrence of delim and appends the pieces to results.
// str and delim are taken by value, so the caller's strings are not modified;
// results is appended to, not cleared.
// NOTE(review): source lines 461, 463 and 465 are absent from this listing and
// may hold a guard around the push_back inside the loop; the comments below
// describe only the visible statements.
455 void Event::splitString(std::string str, std::string delim,
456 std::vector<std::string>&results) {
457 std::string::size_type cutAt;
458 std::string::size_type delim_len = delim.length();
// NOTE(review): with an empty delim, find() keeps returning 0 and str never
// shrinks, so this loop would not terminate as written.
460 while( (cutAt = str.find(delim)) != str.npos ) {
// Text before the delimiter becomes the next piece.
462 results.push_back(str.substr(0,cutAt));
// Drop the consumed piece and the delimiter; this re-copies the remainder on
// every iteration.
464 str = str.substr(cutAt+delim_len);
// Whatever remains after the last delimiter is appended only if non-empty.
466 if(str.length() > 0) {
467 results.push_back(str);
474 void Event::splitTuple(std::string str, std::string delim,
475 std::vector<std::string>&results) {
476 std::string::size_type cutAt;
477 std::string::size_type delim_len = delim.length();
479 cutAt = str.find(delim);
480 results.push_back(str.substr(0,cutAt));
481 str = str.substr(cutAt+delim_len);
483 cutAt = str.find(delim);
484 results.push_back(str.substr(0,cutAt));
485 str = str.substr(cutAt+delim_len);
487 cutAt = str.find(delim);
488 results.push_back(str.substr(0,cutAt));
std::vector< std::string > paramNames(bool topLevelOnly=true) const
static DateTime now(void)
Class for handling dates/times, including MJD, UTC, and TAI.
boost::shared_ptr< PropertySet > Ptr
Include files required for standard LSST Exception handling.
defines the EventLibrary class
Interface for DateTime class.
std::type_info const & typeOf(std::string const &name) const
Class for storing generic metadata.
virtual Ptr deepCopy(void) const
struct tm gmtime(void) const
boost::enable_if< typename ExpressionTraits< Scalar >::IsScalar, bool >::type any(Scalar const &scalar)
Interface for PropertySet class.
std::vector< T > getArray(std::string const &name) const