Skip to content

Rust::com Rust Stream API Implementation#480

Open
bharatGoswami8 wants to merge 7 commits into
eclipse-score:mainfrom
bharatGoswami8:rust_Stream_api_impl
Open

Rust::com Rust Stream API Implementation#480
bharatGoswami8 wants to merge 7 commits into
eclipse-score:mainfrom
bharatGoswami8:rust_Stream_api_impl

Conversation

@bharatGoswami8
Copy link
Copy Markdown
Contributor

@bharatGoswami8 bharatGoswami8 commented May 28, 2026

  • Adds a borrow-based, non-'static async streaming API (Subscription::to_stream) with an internal fixed-capacity buffer.
  • Implements the stream for the Lola runtime.

#371

darkwisebear

This comment was marked as low quality.

* Proposal for async stream receive api with basic sequence diagram
* Implemented poll_next function for stream api in consumer crate
* Unpin bound required with impl stream return
* Added Test case for to_stream api usage
* Removed the max sample paramter from API
* Removed the overflow parameter
@bharatGoswami8 bharatGoswami8 force-pushed the rust_Stream_api_impl branch from 9bffdbe to 595751d Compare May 30, 2026 10:43
* Stream taking ProxyEventManagerGuard as value
* Updated field type to Option
@bharatGoswami8 bharatGoswami8 force-pushed the rust_Stream_api_impl branch from 595751d to 95cd73f Compare May 30, 2026 10:59
}

#[tokio::test(flavor = "multi_thread")]
#[ignore = "This test demonstrates async receive with timeout, it can run individually if wanted to test timeout behavior"]
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test cases ignored in example app because of it required separate service instance.
We already have plan to optimize this example app which can run as an app and can select which API want to test using user input - #489

///
/// # Errors
/// Returns an error if a problem occurs during sample reception.
fn to_stream<'a>(&'a self) -> impl Stream<Item = Result<Self::Sample<'a>>> + Unpin + 'a;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be &mut self, so that receive (or a 2nd to_stream call) cannot happen.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes agreed, updated the API signature.

offered_producer: VehicleOfferedProducer<R>,
) -> VehicleOfferedProducer<R> {
for i in 0..10 {
for i in 0..6 {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is 6 better than 10?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was reduced because we have now more test and sending 10 sample is taking time because each test is invoking producer but as already i have mention in above comment , we will optimize this example app from test based to proper API function based testing using user input from CLI , ticket for same #489

println!("[RECEIVER] Async data processor started");
let mut buffer = SampleContainer::new(5);
for _ in 0..5 {
for _ in 0..3 {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is 3 better than 5? Consider turning these numbers into constants and explain why they have the limit they have.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When we are testing developed API using this example app test, it takes some time to run because of that iteration is reduced but we will optimize this example app from test based to proper API function-based testing using user input from CLI , ticket for same #489

struct SampleStream<'a, T: CommData + Debug> {
subscriber: &'a SubscriberImpl<T>,
sample_container: Option<SampleContainer<Sample<T>>>,
event_guard: Option<ProxyEventManagerGuard<'a>>,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does this need to be an Option? I do not see any place where this is set to None.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This issue was originally caused by the borrow checker. When calling fn try_receive_samples, it required mutable access to both values. To address this, we previously wrapped them in Option, took temporary ownership, and then reassigned them.
Now, this workaround has been removed because we are borrowing self mutably only once and using it to call internal fields when passing values to the function call so only one mut borrow happening with this now.


fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// Yield any buffered samples from a previous batch fetch first.
if let Some(mut container) = self.sample_container.take() {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you need to take the container? Wouldn't a simple pop_front be enough? You reassign it in any case.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it was because of Option and borrow checker issue, removed now.


let samples_received = {
// Temporarily take ownership of scratch to avoid borrow issues
if let Some(mut scratch) = self.sample_container.take() {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same. Why take the container? In case of an error, the stream is then broken, since the container "gets lost".

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it was because of Option and borrow checker issue, removed now.


match samples_received {
Ok(_count) => {
if let Some(mut container) = self.sample_container.take() {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it was because of Option and borrow checker issue, removed now.


#[allow(clippy::manual_async_fn)]
fn to_stream<'a>(&'a self) -> impl Stream<Item = Result<Self::Sample<'a>>> + Unpin + 'a {
stream::empty()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a ticket to extend this at some point?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Created ticket for same - #490

* API is taking mut self
* Removed Option for internal field
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants