4,075 questions
1
vote
0
answers
69
views
Return type mismatch: expected 'Any', actual 'Authentication?'
I tried to upgrade my code to Spring Boot 4.0.0, and given the following Kotlin code,
@Bean
fun auditorAware(): ReactiveAuditorAware<String> = ReactiveAuditorAware<String> {
...
Advice
0
votes
2
replies
58
views
Why is the Java Stream API single-use but Reactor's Publisher is not?
Why can we subscribe as many times as we want to a Reactor Publisher but consume a Java Stream only once? What is the reason for a Stream being single-use?
1
vote
1
answer
76
views
How to find the root cause / exception stacktrace of signal type CANCEL
In doFinally I have printed the signal type to verify that it was a cancellation (CANCEL). However, this is not captured by doOnError, since technically a cancellation is not an error.
...
Best practices
0
votes
1
replies
56
views
Wrap function result in Mono.defer
In which cases does it make sense to wrap the result of a function that returns a Mono in Mono.defer, when assigning it to a variable and using it later?
For example:
fun getMono(): Mono<Int> // some ...
1
vote
1
answer
81
views
Measure elapsed time on Spring reactor
I am using spring-webflux version 3.2.x
I want to log the total elapsed time of a whole pipeline execution.
I have found a method elapsed(), but I want the total time, not ...
1
vote
1
answer
171
views
Spring MCP Client does not have access to HTTP headers from McpTransportContext in stream mode
I'm trying to implement a Spring MCP Client based on the spring-ai 1.1.0-SNAPSHOT library.
The minimal project source code is available in this public GitHub repository: https://github.com/robynico/...
0
votes
0
answers
44
views
Why use FluxSink API in Project Reactor instead of directly managing subscriptions?
In Project Reactor, why did the designers choose to use the FluxSink API for emitting data instead of exposing a getSubscription() method, where you could directly call the request() and cancel() ...
0
votes
0
answers
65
views
Spring Kafka 3.3 reactive @KafkaListener commits offset immediately on error — DefaultErrorHandler not retrying
I’m using Spring Kafka 3.3.10 with a reactive Kafka listener (returning Mono).
My listener processes a message, calls a downstream WebClient, aggregates results, and returns a Mono.
If a timeout ...
0
votes
1
answer
47
views
What is the equivalent of doOnComplete for Mono?
At some point in our code using RxJava we have
public Maybe<Event> verifyFoo(Event event){
return Maybe.just(event)
.flatMap(this::foo)
.doOnComplete(() -> ...
1
vote
1
answer
46
views
Process events with throttling and timeout
There are different kinds of events in my application. For simplicity, let's call them a-events, b-events, c-events and so on. I need to implement throttling for each kind of event separately:
@...
2
votes
1
answer
102
views
How to retrieve metrics from Couchbase query with async API?
After I understood how to run queries, including how to evaluate the metadata, in this question, I am running into a design flaw:
Instant threshold = Instant.now().minus(retention);
String query = String....
0
votes
1
answer
137
views
Spring Boot Reactive (WebClient) - TraceId not propagated to Reactor?
I'm working on a Spring Boot application using Spring WebFlux with reactive WebClient and Micrometer for tracing. While tracing works correctly in the imperative (servlet) stack, the trace context (...
2
votes
0
answers
201
views
Spring Webflux propagate response information in Reactor Context
I have an HTTP proxy created for my interface with HttpServiceProxyFactory; it uses WebClient under the hood:
interface MyClient {
@GetExchange("/info")
fun getInformation(): Mono<...
0
votes
0
answers
100
views
How to configure HTTP forward proxy (instead of CONNECT tunnel)?
I'm trying to configure an HTTP proxy (without CONNECT tunnelling support) for my Spring WebFlux WebClients (based on Netty HttpClient) but without success (403 HTTP status code).
As explained in the ...
0
votes
1
answer
179
views
S3AsyncClient (AWS SDK v2, Java) uploads 0 bytes despite non-empty content
S3AsyncUploadService.java
package com.util.s3;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import ...
1
vote
0
answers
61
views
Project Reactor, Spring WebFlux and publishing async events
I'm always quite confused when it comes to Flux objects and everything related to them.
In my use case I have a reactive endpoint exposed to WebClient which I want to publish async elements received ...
1
vote
0
answers
87
views
Why does .publishOn() affect the preceding Mono.fromRunnable()?
Running
Mono.fromRunnable(() -> log.info("Thread {}", Thread.currentThread().getName()))
.publishOn(Schedulers.boundedElastic())
.subscribe();
logs
Thread boundedElastic-&...
0
votes
0
answers
89
views
OpenTelemetry and log parentId using Context Reactor
I am trying to inject a field into the MDC context using the Reactor Context with Spring WebFlux. According to the documentation, OpenTelemetry just puts the traceId and spanId in the MDC context but ...
-2
votes
2
answers
178
views
Asserting Mono completion softly
Suppose I need to softly assert against multiple object properties, one of which is a Mono. Typically, you use StepVerifier to assert against that. However, the verifyComplete() call will not be soft. ...
1
vote
0
answers
52
views
Reactor Kafka Consumer with Backpressure
We have the following Kafka consumer code with Reactor Kafka. For large messages the application runs out of memory. Backpressure does not seem to be applied.
When the groupBy, flatMap and publishOn ...
1
vote
1
answer
74
views
Preserving distinct types downstream for different operators
Help me solve this Project Reactor puzzle.
Basically, two operators need different types, and I seemingly can't preserve both downstream.
I could have some data container that stores both the DTO and ...
0
votes
1
answer
67
views
How do you force a Flux to pull or implement backpressure limits?
I'm attempting to open a file and read lines through a Flux. I would then like X workers to process lines concurrently. Those workers may be slow, so I would like to prevent the ENTIRE ...
0
votes
0
answers
137
views
503 Error Http client inbound receiver cancelled, closing channel
We have a POST REST endpoint defined as:
@RequestMapping(value = "/service/",
produces = { "application/json" },
consumes = { "application/json" },
method = ...
1
vote
1
answer
64
views
Why does flatMapMany exist as a standalone method instead of an overload of flatMap?
I was confused by this and it wasn't addressed by the documentation. To me, it looks like the need for flatMapMany can be inferred by the compiler from the types involved (if a flatMap on a Mono ...
0
votes
0
answers
39
views
WebFlux with WebFilter - correctly starting a long running task inside of a WebFilter without blocking response
Given an org.springframework.web.server.WebFilter, what are some good ways to start a "longer running" thread without blocking the REST response going back to the consumer? The long running ...
1
vote
1
answer
271
views
Should a Spring WebFlux controller return a Mono<T> or the unwrapped T?
I'm building a reactive API using Spring WebFlux and Project Reactor. In my controller layer, I currently return a Mono<MyResponseDto> like this:
@GetMapping("/example")
public Mono<...
0
votes
0
answers
242
views
Connection reset by peer: reactor-netty
We have a microservices-based Spring Boot architecture where we use Spring WebClient which internally uses Reactor Netty for inter-service communication (i.e., Service A → Service B) and this ...
0
votes
0
answers
32
views
Reactive Sink - Avoiding the OverflowException with concurrent emissions [duplicate]
I have this sink configured in my app:
createdSink = Sinks.many().multicast().onBackpressureBuffer(4096);
public void activityCreated(ActivityResource createdActivity) {
try {
var ...
0
votes
0
answers
75
views
Short-circuit propagation with Spring Reactive Webclient
I have written an integration test which merges five HTTP requests into a flux, short circuits using any(), and then executes a new HTTP request using the same client:
Map<String, Cookie> ...
0
votes
1
answer
221
views
Reactive Sink - Avoiding the OverflowException
I have this sink configured in my app:
createdSink = Sinks.many().multicast().onBackpressureBuffer(4096);
public void activityCreated(ActivityResource createdActivity) {
try {
var ...
1
vote
0
answers
112
views
Error when using rsocket-js to communicate with Spring RSocket when the transferred data is larger than 65536
I'm testing rsocket-js in a web browser using rsocket-websocket-client.
Client code, running in the web browser:
async function initRequest() {
const transportOpts = {
url: 'ws:' + '127.0.0.1:6565',
...
0
votes
0
answers
54
views
Is there something more flexible than Flux.collectMap(function, function) that skips null values?
There is a useful, but not smart enough, method:
reactor.core.publisher.Flux#collectMap(java.util.function.Function<? super T,? extends K>, java.util.function.Function<? super T,? extends V>)
...
0
votes
0
answers
90
views
App error on Reactor when instrumented with OTEL Java Agent
I am getting the below error when I am running the application with OTEL Java-agent to process the traces. This is affecting the functionality of the application. If anyone knows what is happening, ...
0
votes
0
answers
60
views
ParallelFlux - Parallel Execution with multiple downstream API calls
I have 3 input IDs, which should be processed in parallel. For that I have used ParallelFlux as below.
For every ID, there are 2 downstream API calls (mono1 and mono2 as below) that need to be parallelly ...
1
vote
1
answer
223
views
Why is switchIfEmpty not being executed?
In the code below, switchIfEmpty is not executed.
Per the logic below, I expect lowerCase names when filter(__ -> names.size() > 5)
evaluates to false and upperCase names ...
2
votes
1
answer
167
views
Spring WebFlux SSE connection, update header on retry
I establish a ServerSentEvent connection using the code below. The GET request must have a "Last-Event-ID" HTTP Header set that indicates the last fully processed event. Occasionally the ...
2
votes
2
answers
119
views
How to read a file more efficiently?
I have code that reads a file and converts and returns a flux of a list of hashmaps in groups of 10. Initially I was taking an input of the entire file content as a string, but I felt like it would ...
0
votes
1
answer
80
views
Difference between Mono.error(Throwable throwable) and Mono.error(Supplier<? extends Throwable> errorSupplier)
How can I visually see the difference between Mono.error(Throwable throwable) and Mono.error(Supplier<? extends Throwable> errorSupplier)? I understand that these are eager and lazy, respectively. The ...
0
votes
1
answer
1k
views
Not able to install `insightface==0.7.3` using `python3.10.16`
System Info
python --version
Output Python 3.10.16
I ran the command python -m pip install insightface==0.7.3 and got the following console output
Using cached Cython-3.0.11-cp310-cp310-...
4
votes
0
answers
324
views
Why does disposing of a Flux-based subscription in Micronaut’s Reactor HTTP client hang subsequent requests?
I have a Micronaut application using the declarative Reactor HTTP client, backed by DefaultHttpClient (DefaultHttpClient implements both HttpClient and StreamingHttpClient).
When I subscribe to a ...
0
votes
0
answers
142
views
Automatic Context Propagation fails to work with MDC logging in reactive API Gateway
We have developed our API Gateway using the Spring Cloud Gateway framework, based on the reactive paradigm, with the libraries below:
Spring Boot: 3.2.0
Spring Cloud: 2023.0.0
Spring: 6.1.1
Spring Cloud Gateway: 4....
1
vote
1
answer
61
views
Reactive Spring Boot call an API recursively based on response parameter of same API call
I have an external API which returns a list of Student objects along with count and offset parameters, which represent the number of students remaining in the database.
It is similar to a paginated response but ...
2
votes
0
answers
41
views
Spring WebFlux BlockingIterator.hasNext() blocking forever
I am trying to track down a bug in my app with Spring Webflux hanging under load. I have been debugging this for many hours so I really need some help, even just on direction to investigate.
Problem:
...
0
votes
1
answer
54
views
Spring WebFlux flatMap callback is not executed
I have the following code inside a WebFilter in my Spring WebFlux app
return this.authenticationConverter.convert(exchange)
.onErrorResume(AuthenticationException.class, (ex) -> {
...
1
vote
0
answers
101
views
How to retain new elements in Project Reactor's buffer with Sinks.many().multicast().onBackpressureBuffer
I'm using Sinks.many().multicast().onBackpressureBuffer(int bufferSize) in Reactor. This method buffers up to bufferSize items before the first subscriber is registered. However, if the buffer ...
2
votes
1
answer
233
views
Spring WebClient seems to suppress WebClientResponseException using Retry.withThrowable
Using Spring's WebClient I'd like to implement the following behavior.
When the server returns HTTP 429 (Too Many Requests), then I'd like to extract the Retry-After header value. The request shall ...
0
votes
1
answer
108
views
Process reactive query result concurrently
I need to iterate over a massive query result and perform some business logic on each entity. The database is connected with Spring Data Mongo Reactive so I get a Flux<Entity>.
The simple ...
0
votes
0
answers
105
views
Deadlock when calling Spring WebFlux or WebClient
I need to use the Spring WebClient but block on it. I know this is an anti-pattern, but I have to, since the interface that I am going to implement is not reactive. On the ...
1
vote
0
answers
178
views
Spring WebClient fails on multiple concurrent requests
I have a problem with executing multiple (460) concurrent HTTPS requests using WebClient. I'm trying to perform a specific simulation, so it's important for me to start all threads simultaneously....
1
vote
0
answers
28
views
Project Reactor, using a Flux sink to send a data with delay
I have an interface with default methods to send a value:
import org.springframework.web.reactive.socket.WebSocketMessage;
import reactor.core.publisher.FluxSink;
public interface ISocketClient {
...