diff --git a/concoredocker.hpp b/concoredocker.hpp index bcd2bcb..b4fa69b 100644 --- a/concoredocker.hpp +++ b/concoredocker.hpp @@ -13,6 +13,7 @@ #include #include #include +#include #include "concore_base.hpp" @@ -28,6 +29,9 @@ class Concore { double simtime = 0; double maxtime = 100; std::unordered_map params; +#ifdef CONCORE_USE_ZMQ + std::map zmq_ports; +#endif std::string stripstr(const std::string& str) { return concore_base::stripstr(str); @@ -53,6 +57,14 @@ class Concore { load_params(); } + ~Concore() { +#ifdef CONCORE_USE_ZMQ + for (auto& kv : zmq_ports) + delete kv.second; + zmq_ports.clear(); +#endif + } + Concore(const Concore&) = delete; Concore& operator=(const Concore&) = delete; @@ -63,13 +75,23 @@ class Concore { inpath(std::move(other.inpath)), outpath(std::move(other.outpath)), simtime(other.simtime), maxtime(other.maxtime), params(std::move(other.params)) - {} + { +#ifdef CONCORE_USE_ZMQ + zmq_ports = std::move(other.zmq_ports); +#endif + } Concore& operator=(Concore&& other) noexcept { if (this == &other) return *this; +#ifdef CONCORE_USE_ZMQ + for (auto& kv : zmq_ports) + delete kv.second; + zmq_ports = std::move(other.zmq_ports); +#endif + iport = std::move(other.iport); oport = std::move(other.oport); s = std::move(other.s); @@ -118,7 +140,7 @@ class Concore { return false; } - std::vector read(int port, const std::string& name, const std::string& initstr) { + std::vector read(int port, const std::string& name, const std::string& initstr) { std::this_thread::sleep_for(std::chrono::seconds(delay)); std::string file_path = inpath + "/" + std::to_string(port) + "/" + name; std::ifstream infile(file_path); @@ -126,10 +148,10 @@ class Concore { if (!infile) { std::cerr << "File " << file_path << " not found, using default value.\n"; - return {initstr}; + return concore_base::parselist_double(initstr); } std::getline(infile, ins); - + int attempts = 0, max_retries = 5; while (ins.empty() && attempts < max_retries) { std::this_thread::sleep_for(std::chrono::seconds(delay)); @@ -142,22 +164,21 @@ class Concore { if (ins.empty()) { std::cerr << "Max retries reached for " << file_path << ", using default value.\n"; - return {initstr}; + return concore_base::parselist_double(initstr); } - + s += ins; - try { - std::vector inval = parselist(ins); - if (!inval.empty()) { - double file_simtime = std::stod(inval[0]); - simtime = std::max(simtime, file_simtime); - return std::vector(inval.begin() + 1, inval.end()); - } - } catch (...) {} - return {ins}; + std::vector inval = concore_base::parselist_double(ins); + if (inval.empty()) + inval = concore_base::parselist_double(initstr); + if (inval.empty()) + return inval; + simtime = simtime > inval[0] ? simtime : inval[0]; + inval.erase(inval.begin()); + return inval; } - void write(int port, const std::string& name, const std::vector& val, int delta = 0) { + void write(int port, const std::string& name, const std::vector& val, int delta = 0) { std::string file_path = outpath + "/" + std::to_string(port) + "/" + name; std::ofstream outfile(file_path); if (!outfile) { @@ -174,15 +195,60 @@ class Concore { } } - std::vector initval(const std::string& simtime_val) { - try { - std::vector val = parselist(simtime_val); - if (!val.empty()) { - simtime = std::stod(val[0]); - return std::vector(val.begin() + 1, val.end()); - } - } catch (...) {} - return {}; +#ifdef CONCORE_USE_ZMQ + void init_zmq_port(const std::string& port_name, const std::string& port_type, + const std::string& address, const std::string& socket_type_str) { + if (zmq_ports.count(port_name)) return; + int sock_type = concore_base::zmq_socket_type_from_string(socket_type_str); + if (sock_type == -1) { + std::cerr << "init_zmq_port: unknown socket type '" << socket_type_str << "'\n"; + return; + } + zmq_ports[port_name] = new concore_base::ZeroMQPort(port_type, address, sock_type); + } + + std::vector read_ZMQ(const std::string& port_name, const std::string& name, const std::string& initstr) { + auto it = zmq_ports.find(port_name); + if (it == zmq_ports.end()) { + std::cerr << "read_ZMQ: port '" << port_name << "' not initialized\n"; + return concore_base::parselist_double(initstr); + } + std::vector inval = it->second->recv_with_retry(); + if (inval.empty()) + inval = concore_base::parselist_double(initstr); + if (inval.empty()) return inval; + simtime = simtime > inval[0] ? simtime : inval[0]; + s += port_name; + inval.erase(inval.begin()); + return inval; + } + + void write_ZMQ(const std::string& port_name, const std::string& name, std::vector val, int delta = 0) { + auto it = zmq_ports.find(port_name); + if (it == zmq_ports.end()) { + std::cerr << "write_ZMQ: port '" << port_name << "' not initialized\n"; + return; + } + val.insert(val.begin(), simtime + delta); + it->second->send_with_retry(val); + // simtime must not be mutated here. + } + + std::vector read(const std::string& port_name, const std::string& name, const std::string& initstr) { + return read_ZMQ(port_name, name, initstr); + } + + void write(const std::string& port_name, const std::string& name, std::vector val, int delta = 0) { + return write_ZMQ(port_name, name, val, delta); + } +#endif // CONCORE_USE_ZMQ + + std::vector initval(const std::string& simtime_val) { + std::vector val = concore_base::parselist_double(simtime_val); + if (val.empty()) return val; + simtime = val[0]; + val.erase(val.begin()); + return val; } };