Quarkus, Kafka, java.lang.IllegalStateException: SRMSG00028 and the Solution
java.lang.IllegalStateException: SRMSG00028: The subscription to ... has been cancelled
at io.smallrye.reactive.messaging.extension.AbstractEmitter.verify(AbstractEmitter.java:165)
at io.smallrye.reactive.messaging.extension.AbstractEmitter.emit(AbstractEmitter.java:144)
at io.smallrye.reactive.messaging.extension.EmitterImpl.send(EmitterImpl.java:29)
at airhacks.kafka.jaxrs.boundary.MessagesResource.send(MessagesResource.java:34)
at airhacks.kafka.jaxrs.boundary.MessagesResource_Subclass.send$$superaccessor1(MessagesResource_Subclass.zig:209)
at airhacks.kafka.jaxrs.boundary.MessagesResource_Subclass$$function$$3.apply(MessagesResource_Subclass$$function$$3.zig:33)
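is caused by an Emitter injected into a RequestScoped managed bean. The likely mechanism: the @PreDestroy method runs at the end of every request and completes the emitter, so subsequent requests find the subscription already cancelled: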
import javax.annotation.PreDestroy;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.context.RequestScoped;
import javax.inject.Inject;
import javax.ws.rs.*;
import javax.ws.rs.core.MediaType;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

@RequestScoped
@Path("messages")
@Consumes(MediaType.TEXT_PLAIN)
public class MessagesResource {

    @Inject
    @Channel("messages")
    Emitter<String> messageEmitter;

    @POST
    public void send(String message) {
        this.messageEmitter.send(message);
    }

    // With @RequestScoped this callback runs after every request
    @PreDestroy
    public void closeStream() {
        this.messageEmitter.complete();
    }
}
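The lifecycle problem can be illustrated without Quarkus. The following is a plain-Java simulation, not SmallRye code: SharedEmitter, RequestScopedResource and handleRequest are hypothetical stand-ins, and the sketch assumes that the injected Emitter is a single channel-wide object shared by all bean instances.

```java
import java.util.ArrayList;
import java.util.List;

public class EmitterLifecycleSketch {

    /** Hypothetical stand-in for a channel-wide emitter; not the SmallRye implementation. */
    static class SharedEmitter {
        private final List<String> sent = new ArrayList<>();
        private boolean completed;

        void send(String message) {
            if (completed) {
                // Mirrors the spirit of SRMSG00028, not its exact message
                throw new IllegalStateException("The subscription has been cancelled");
            }
            sent.add(message);
        }

        void complete() {
            completed = true;
        }

        List<String> sent() {
            return sent;
        }
    }

    /** Mimics the request-scoped resource: a new instance per request, same emitter. */
    static class RequestScopedResource {
        private final SharedEmitter emitter;

        RequestScopedResource(SharedEmitter emitter) {
            this.emitter = emitter;
        }

        void send(String message) {
            emitter.send(message);
        }

        // Plays the role of the @PreDestroy method closeStream()
        void preDestroy() {
            emitter.complete();
        }
    }

    /** Simulates one request: create the bean, call it, destroy it. Returns true if send succeeded. */
    static boolean handleRequest(SharedEmitter emitter, String message) {
        RequestScopedResource resource = new RequestScopedResource(emitter);
        try {
            resource.send(message);
            return true;
        } catch (IllegalStateException e) {
            return false;
        } finally {
            resource.preDestroy();
        }
    }

    public static void main(String[] args) {
        SharedEmitter emitter = new SharedEmitter();
        System.out.println("first request ok: " + handleRequest(emitter, "hello"));  // true
        System.out.println("second request ok: " + handleRequest(emitter, "world")); // false
    }
}
```

In this simulation the first request succeeds and completes the shared emitter on its way out, so the second request fails on send.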
Changing the scope to ApplicationScoped solves the problem:
@ApplicationScoped
@Path("messages")
@Consumes(MediaType.TEXT_PLAIN)
public class MessagesResource {
    //...
}
The example was tested with Quarkus (quarkus.io) and a configured SmallRye Apache Kafka connector (mp.messaging.outgoing.messages.connector=smallrye-kafka).
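For orientation, a minimal application.properties for the outgoing channel might look like the sketch below. Only the connector line comes from the tested setup; the topic name, the serializer and the broker address are assumptions for a local broker and will differ per environment.

```properties
# From the tested setup: bind the "messages" channel to the Kafka connector
mp.messaging.outgoing.messages.connector=smallrye-kafka

# Assumed values, for illustration only
mp.messaging.outgoing.messages.topic=messages
mp.messaging.outgoing.messages.value.serializer=org.apache.kafka.common.serialization.StringSerializer
kafka.bootstrap.servers=localhost:9092
```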