diff --git a/code/common/datareplay.q b/code/common/datareplay.q index 0a67eb3e8..eba97f46c 100644 --- a/code/common/datareplay.q +++ b/code/common/datareplay.q @@ -1,110 +1,99 @@ \d .datareplay +// params[`tabs] is list of tables to get - Required +// params[`sts] is start of time window to get - Required +// params[`ets] is end of time window to get - Required +// params[`syms] is list of instruments to get - Default all syms +// params[`where] is an additional where clause in functional form - Not Required +// params[`timer] is whether or not to retrieve timer - Default 0b +// params[`h] is handle to hdb - Default 0 (self) +// params[`replayinterval] is the interval to bucket the data messages into - Default no bucketing, messages published as per data timestamps +// params[`timerinterval] is the interval to bucket the timer messages into - Default 10 seconds, only used if timer is true +// prarms[`tc] is the time column of the tables specified - Default `time +// params[`timerfunc] is the timer function to use in timer messages - Default `.z.ts +tablesToDataStream:{[params] + defaults:`timer`h`syms`replayinterval`timerinterval`tc`timerfunc`where!(0b;0;`symbol$();`timespan$0n;`timespan$0n;`time;`.z.ts;()); + params:defaults,params; + + // Check for default parameters `tabs`sts`ets + if[count missing:`tabs`sts`ets except key params;'"missing parameters: "," " sv string missing;]; + params[`tabs]:(),params[`tabs]; + + ds:raze {tableToDataStream x,(enlist `tn)!enlist y}[params] each params[`tabs]; + + $[params[`timer]; + `time xasc ds,getTimers[params,enlist[`timerinterval]! enlist $[null k:params[`timerinterval];0D00:00:10.00;k]]; + `time xasc ds] + }; + // Generate times between two input times in p intervals -getBuckets:{[s;e;p](s+p*til(ceiling 1+e%p)-(ceiling s%p))} +getBuckets:{[s;e;p](s+p*til(ceiling 1+e%p)-(ceiling s%p))}; // params[`t] is table data -// params[`tc] is time column to cut on // params[`tn] is table name -// params[`interval] is the time interval to bucket the messages into. tableDataToDataStream:{[params] // Sort table by time column. params[`t]:params[`tc] xasc delete date from params[`t]; - // get all times from table + // Get all times from table t_times:params[`t][params[`tc]]; - $[not null params[`interval]; - [ // if there is an interval, bucket messages into this interval - // make bukets of ten second intervals - times:getBuckets[params[`sts];params[`ets];params[`interval]]; + $[not null params[`replayinterval]; + [ // If there is an interval, bucket messages into this interval + // Make buckets of ten second intervals + times:getBuckets[params[`sts];params[`ets];params[`replayinterval]]; - // put start time in fornt of t_times + // Put start time in front of t_times t_times:params[`sts],t_times; - //Get places to cut + // Get places to cut cuts:distinct t_times bin times; cuts:cuts where cuts>-1; - // fill first cut + // Fill first cut if[0<>first cuts;cuts:0,cuts]; - //cut table by time interval + // Cut table by time interval msgs:cuts cut params[`t]; - // get times that match data + // Get times that match data time:{first x[y]}[;params[`tc]] each msgs; // Return table of times and message chunks -1_([]time:time;msg:{(`upd;x;y)}[params[`tn]] each msgs) ]; - // if there is no intevral, cut by distinct time. + // If there is no interval, cut by distinct time. ([] time:distinct t_times; - msg:{(`upd;x;$[1