Skip to content
Open
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 53 additions & 64 deletions code/common/datareplay.q
Original file line number Diff line number Diff line change
@@ -1,110 +1,99 @@
\d .datareplay

// params[`tabs] is list of tables to get - Required
// params[`sts] is start of time window to get - Required
// params[`ets] is end of time window to get - Required
// params[`syms] is list of instruments to get - Default all syms
// params[`where] is an additional where clause in functional form - Not Required
// params[`timer] is whether or not to retrieve timer - Default 0b
// params[`h] is handle to hdb - Default 0 (self)
// params[`replayinterval] is the interval to bucket the data messages into - Default no bucketing, messages published as per data timestamps
// params[`timerinterval] is the interval to bucket the timer messages into - Default 10 seconds, only used if timer is true
// prarms[`tc] is the time column of the tables specified - Default `time
// params[`timerfunc] is the timer function to use in timer messages - Default `.z.ts
tablesToDataStream:{[params]
defaults:`timer`h`syms`replayinterval`timerinterval`tc`timerfunc`where!(0b;0;`symbol$();`timespan$0n;`timespan$0n;`time;`.z.ts;());
params:defaults,params;

// Check for default parameters `tabs`sts`ets
if[count missing:`tabs`sts`ets except key params;'"missing parameters: "," " sv string missing;];
params[`tabs]:(),params[`tabs];

ds:raze {tableToDataStream x,(enlist `tn)!enlist y}[params] each params[`tabs];

$[params[`timer];
`time xasc ds,getTimers[params,enlist[`timerinterval]! enlist $[null k:params[`timerinterval];0D00:00:10.00;k]];
`time xasc ds]
};

// Generate times between two input times in p intervals
getBuckets:{[s;e;p](s+p*til(ceiling 1+e%p)-(ceiling s%p))}
getBuckets:{[s;e;p](s+p*til(ceiling 1+e%p)-(ceiling s%p))};

// params[`t] is table data
// params[`tc] is time column to cut on
// params[`tn] is table name
// params[`interval] is the time interval to bucket the messages into.
tableDataToDataStream:{[params]
// Sort table by time column.
params[`t]:params[`tc] xasc delete date from params[`t];

// get all times from table
// Get all times from table
t_times:params[`t][params[`tc]];

$[not null params[`interval];
[ // if there is an interval, bucket messages into this interval
// make bukets of ten second intervals
times:getBuckets[params[`sts];params[`ets];params[`interval]];
$[not null params[`replayinterval];
[ // If there is an interval, bucket messages into this interval
// Make buckets of ten second intervals
times:getBuckets[params[`sts];params[`ets];params[`replayinterval]];

// put start time in fornt of t_times
// Put start time in front of t_times
t_times:params[`sts],t_times;

//Get places to cut
// Get places to cut
cuts:distinct t_times bin times;
cuts:cuts where cuts>-1;

// fill first cut
// Fill first cut
if[0<>first cuts;cuts:0,cuts];

//cut table by time interval
// Cut table by time interval
msgs:cuts cut params[`t];

// get times that match data
// Get times that match data
time:{first x[y]}[;params[`tc]] each msgs;

// Return table of times and message chunks
-1_([]time:time;msg:{(`upd;x;y)}[params[`tn]] each msgs)
];
// if there is no intevral, cut by distinct time.
// If there is no interval, cut by distinct time.
([]
time:distinct t_times;
msg:{(`upd;x;$[1<count y;flip y;first y])}[params[`tn]] each
msg:{(`upd;x;y)}[params[`tn]] each
(where differ t_times) cut params[`t]
)
]

};

]
};

// params[`h] is handle to hdb process
// params[`tn] is table name used to query hdb
// params[`syms] is list of instruments to get
// params[`where] is an additional where clause in functional form - Not Reuqired
// params[`sts] is start of time window to get
// params[`ets] is end of time window to get
tableToDataStream:{[params]
// Evaluate select statement in HDB
t:@[params[`h];
(eval;tableSelectStatement params);
{.lg.e[`dataloader;"Failed to evauluate query on hdb: ",x]}
];

tableDataToDataStream[params,enlist[`t]!enlist t]
};

tableSelectStatement:{[params]
// Build where clause
wherec:(enlist (within;`date;(enlist;`date$params[`sts];`date$params[`ets]))) // date in daterange
,$[count params[`syms];enlist (in;`sym;enlist params[`syms]);()] //if syms is empty, omit sym in syms
,$[count params[`where];params[`where];()] // custom where clause (optional)
,enlist (within;params[`tc];(enlist;params[`sts];params[`ets])); // time within (sts;ets)

// Have hdb evaluate select statement.
t:@[params[`h];
(eval;(?;params[`tn];enlist wherec;0b;()));
{.lg.e[`dataloader;"Failed to evauluate query on hdb: ",x]}
];

tableDataToDataStream[params,enlist[`t]!enlist t]
(?;params[`tn];enlist wherec;0b;())
};

// params[`sts] is start of time window to get
// params[`ets] is end of time window to get
// params[`tp] is the inrement between times
// params[`timerfunc] is the timer function to use
getTimers:{[params]
times:getBuckets[params[`sts];params[`ets];params[`interval]];
([]time:times;msg:params[`timerfunc],'times)
}


// params[`tabs] is list of tables to get - Required
// params[`sts] is start of time window to get - Required
// params[`ets] is end of time window to get - Required
// params[`syms] is list of instruments to get - Default all syms
// params[`where] is an additional where clause in functional form - Not Reuqired
// params[`timer] is whether or not to retrieve timer - Default 0b
// params[`h] is handle to hdb - Default 0 (self)
// params[`interval] is the time interval to bucket the messages into. - Not Required
// prarms[`tc] is the time column of the tables specified - Defualt `time
// params[`timerfunc] is the timer function to use in timer messages - Default `.z.ts
tablesToDataStream:{[params]
defaults:`timer`h`syms`interval`tc`timerfunc`where!(0b;0;`symbol$();`timespan$0n;`time;`.z.ts;());
params:defaults,params;

// check for default parameters `tabs`sts`ets
if[count missing:`tabs`sts`ets except key params;'"mising prameters: "," " sv string missing;];
params[`tabs]:(),params[`tabs];

ds:raze {tableToDataStream x,(enlist `tn)!enlist y}[params] each params[`tabs];

$[params[`timer];
`time xasc ds,getTimers[params,enlist[`interval]! enlist $[null k:params[`interval];0D00:00:10.00;k]];
`time xasc ds]
times:getBuckets[params[`sts];params[`ets];params[`timerinterval]];
([]time:times;msg:params[`timerfunc],'times)
};

\d .