Open
Conversation
Author
|
Hmmm, not quite sure why the Travis build fails. It's only one and it fails to install |
kirkshoop
requested changes
Nov 17, 2019
Member
kirkshoop
left a comment
There was a problem hiding this comment.
This is greatly appreciated!
| auto localState = state; | ||
|
|
||
| const auto tp = localState->worker.now().time_since_epoch(); | ||
| std::cout << "on_next(" << v << ") at " << tp.count() / 1000000 << " throttled: " << (localState->throttled ? "true" : "false") << std::endl; |
Member
There was a problem hiding this comment.
please remove std::cout usage
| auto work = [v, localState](const rxsc::schedulable&) { | ||
| auto produce_time = localState->worker.now() + localState->period; | ||
|
|
||
| std::cout << "scheduling unthrottle for " << (produce_time.time_since_epoch().count() / 1000000) << std::endl; |
Member
There was a problem hiding this comment.
please remove std::cout usage
| localState->throttled = true; | ||
|
|
||
| state->dest.on_next(v); | ||
|
|
Member
There was a problem hiding this comment.
the call to on_next is not necessarily on the same worker. the following needs to be scheduled on the worker.
if (!localState->throttled) {
localState->throttled = true;
state->dest.on_next(v);
}It would be more efficient to check throttled first and then only schedule state->dest.on_next(v); when throttled == false. To do that throttled would need to be std::atomic<bool>.
|
Any news on this PR? Could |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
I'm used to RxJS and loved the concept, so I wanted to use it in C++ as well. Seeing #501, I thought this would be a good opportunity for me to get a little deeper into understanding the implementation of RxCpp.
This is mostly a copy&paste of the
debounceoperator implementation, edited to implementthrottlesemantics (ref. RxJS).The
throttleoperator re-emits the first value emitted by the source, then swallows all following emissions until the specified timeout has occurred. After that, the next value emitted by the source is emitted again and the cycle continues.The RxJS implementation also supports emitting the last value emitted by the source when the timeout completes. If this is a feature you would like to see, I can add support for that as well.