Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 91 additions & 25 deletions concoredocker.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <stdexcept>
#include <regex>
#include <algorithm>
#include <map>

#include "concore_base.hpp"

Expand All @@ -28,6 +29,9 @@ class Concore {
double simtime = 0;
double maxtime = 100;
std::unordered_map<std::string, std::string> params;
#ifdef CONCORE_USE_ZMQ
std::map<std::string, concore_base::ZeroMQPort*> zmq_ports;
#endif

std::string stripstr(const std::string& str) {
return concore_base::stripstr(str);
Expand All @@ -53,6 +57,14 @@ class Concore {
load_params();
}

~Concore() {
#ifdef CONCORE_USE_ZMQ
for (auto& kv : zmq_ports)
delete kv.second;
zmq_ports.clear();
#endif
}

Concore(const Concore&) = delete;
Concore& operator=(const Concore&) = delete;

Expand All @@ -63,13 +75,23 @@ class Concore {
inpath(std::move(other.inpath)), outpath(std::move(other.outpath)),
simtime(other.simtime), maxtime(other.maxtime),
params(std::move(other.params))
{}
{
#ifdef CONCORE_USE_ZMQ
zmq_ports = std::move(other.zmq_ports);
#endif
}

Concore& operator=(Concore&& other) noexcept
{
if (this == &other)
return *this;

#ifdef CONCORE_USE_ZMQ
for (auto& kv : zmq_ports)
delete kv.second;
zmq_ports = std::move(other.zmq_ports);
#endif

iport = std::move(other.iport);
oport = std::move(other.oport);
s = std::move(other.s);
Expand Down Expand Up @@ -118,18 +140,18 @@ class Concore {
return false;
}

std::vector<std::string> read(int port, const std::string& name, const std::string& initstr) {
std::vector<double> read(int port, const std::string& name, const std::string& initstr) {
std::this_thread::sleep_for(std::chrono::seconds(delay));
std::string file_path = inpath + "/" + std::to_string(port) + "/" + name;
std::ifstream infile(file_path);
std::string ins;

if (!infile) {
std::cerr << "File " << file_path << " not found, using default value.\n";
return {initstr};
return concore_base::parselist_double(initstr);
}
std::getline(infile, ins);

int attempts = 0, max_retries = 5;
while (ins.empty() && attempts < max_retries) {
std::this_thread::sleep_for(std::chrono::seconds(delay));
Expand All @@ -142,22 +164,21 @@ class Concore {

if (ins.empty()) {
std::cerr << "Max retries reached for " << file_path << ", using default value.\n";
return {initstr};
return concore_base::parselist_double(initstr);
}

s += ins;
try {
std::vector<std::string> inval = parselist(ins);
if (!inval.empty()) {
double file_simtime = std::stod(inval[0]);
simtime = std::max(simtime, file_simtime);
return std::vector<std::string>(inval.begin() + 1, inval.end());
}
} catch (...) {}
return {ins};
std::vector<double> inval = concore_base::parselist_double(ins);
if (inval.empty())
inval = concore_base::parselist_double(initstr);
if (inval.empty())
return inval;
simtime = simtime > inval[0] ? simtime : inval[0];
inval.erase(inval.begin());
return inval;
}

void write(int port, const std::string& name, const std::vector<std::string>& val, int delta = 0) {
void write(int port, const std::string& name, const std::vector<double>& val, int delta = 0) {
std::string file_path = outpath + "/" + std::to_string(port) + "/" + name;
std::ofstream outfile(file_path);
if (!outfile) {
Expand All @@ -174,15 +195,60 @@ class Concore {
}
}

std::vector<std::string> initval(const std::string& simtime_val) {
try {
std::vector<std::string> val = parselist(simtime_val);
if (!val.empty()) {
simtime = std::stod(val[0]);
return std::vector<std::string>(val.begin() + 1, val.end());
}
} catch (...) {}
return {};
#ifdef CONCORE_USE_ZMQ
void init_zmq_port(const std::string& port_name, const std::string& port_type,
const std::string& address, const std::string& socket_type_str) {
if (zmq_ports.count(port_name)) return;
int sock_type = concore_base::zmq_socket_type_from_string(socket_type_str);
if (sock_type == -1) {
std::cerr << "init_zmq_port: unknown socket type '" << socket_type_str << "'\n";
return;
}
zmq_ports[port_name] = new concore_base::ZeroMQPort(port_type, address, sock_type);
}

std::vector<double> read_ZMQ(const std::string& port_name, const std::string& name, const std::string& initstr) {
auto it = zmq_ports.find(port_name);
if (it == zmq_ports.end()) {
std::cerr << "read_ZMQ: port '" << port_name << "' not initialized\n";
return concore_base::parselist_double(initstr);
}
std::vector<double> inval = it->second->recv_with_retry();
if (inval.empty())
inval = concore_base::parselist_double(initstr);
if (inval.empty()) return inval;
simtime = simtime > inval[0] ? simtime : inval[0];
s += port_name;
inval.erase(inval.begin());
return inval;
}

void write_ZMQ(const std::string& port_name, const std::string& name, std::vector<double> val, int delta = 0) {
auto it = zmq_ports.find(port_name);
if (it == zmq_ports.end()) {
std::cerr << "write_ZMQ: port '" << port_name << "' not initialized\n";
return;
}
val.insert(val.begin(), simtime + delta);
it->second->send_with_retry(val);
// simtime must not be mutated here.
}

std::vector<double> read(const std::string& port_name, const std::string& name, const std::string& initstr) {
return read_ZMQ(port_name, name, initstr);
}

void write(const std::string& port_name, const std::string& name, std::vector<double> val, int delta = 0) {
return write_ZMQ(port_name, name, val, delta);
}
#endif // CONCORE_USE_ZMQ

std::vector<double> initval(const std::string& simtime_val) {
std::vector<double> val = concore_base::parselist_double(simtime_val);
if (val.empty()) return val;
simtime = val[0];
val.erase(val.begin());
return val;
}
};

Expand Down
Loading