The issue concerns headers that go missing when a message is sent to a Kafka topic.
My requirement is to generate a Spring Message with headers that include id, where id holds a value matching the Kafka Producer Record key. While generating the Spring Message, I explicitly set the id header and send the message to the Kafka topic. However, when I consume the message from the topic, the value of the id header is None.
Approach 1:
I have implemented a custom Kafka header mapper, passed it the header patterns id, !timestamp, * and configured the binder to use the bean by setting the property below:
spring.cloud.stream.kafka.binder.headerMapperBeanName=customKafkaHeaderMapper
Still, I receive the id value as 'None'.
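To make the intent of the pattern list id, !timestamp, * concrete, here is a minimal sketch in plain Java of how such a list could be evaluated. It is a simplified model and does not use any Spring class: the names HeaderPatternSketch, isMapped and simpleMatch are hypothetical, and the assumption that patterns are checked in order with the first match deciding should be verified against the Spring Kafka version in use.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class HeaderPatternSketch {

    // Returns true if the header name is allowed by the pattern list.
    // Assumption: patterns are evaluated in order and the first match decides;
    // a leading '!' negates the pattern and '*' is a wildcard.
    static boolean isMapped(String header, List<String> patterns) {
        for (String raw : patterns) {
            boolean negated = raw.startsWith("!");
            String pattern = negated ? raw.substring(1) : raw;
            if (simpleMatch(pattern, header)) {
                return !negated;
            }
        }
        return false; // no pattern matched
    }

    // Translates a '*' wildcard pattern into a regular expression and applies it.
    static boolean simpleMatch(String pattern, String value) {
        String regex = Arrays.stream(pattern.split("\\*", -1))
                .map(Pattern::quote)
                .collect(Collectors.joining(".*"));
        return value.matches(regex);
    }

    public static void main(String[] args) {
        List<String> patterns = List.of("id", "!timestamp", "*");
        System.out.println(isMapped("id", patterns));        // prints true
        System.out.println(isMapped("timestamp", patterns)); // prints false
        System.out.println(isMapped("custom", patterns));    // prints true
    }
}
```

Under this model the patterns do allow id, so the mapper's pattern list alone does not explain the missing header.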
I investigated further and observed that the Spring code explicitly removes certain headers (in this case, the id header), even when the patterns of the custom header mapper allow the id header. Here is the Spring code:
protected MessageHandler createProducerMessageHandler(final ProducerDestination destination, ExtendedProducerProperties<KafkaProducerProperties> producerProperties, MessageChannel channel, MessageChannel errorChannel) throws Exception {
    // (construction of "handler" is omitted from this excerpt)
    KafkaHeaderMapper mapper = null;
    // Bean name configured in the binder properties - spring.cloud.stream.kafka.binder.headerMapperBeanName=customKafkaHeaderMapper
    if (this.configurationProperties.getHeaderMapperBeanName() != null) {
        mapper = applicationContext.getBean(this.configurationProperties.getHeaderMapperBeanName(), KafkaHeaderMapper.class);
    }
    if (mapper == null) {
        try {
            mapper = applicationContext.getBean("kafkaBinderHeaderMapper", KafkaHeaderMapper.class);
        } catch (BeansException ex) {
            // no such bean; fall through to the defaults below
        }
    }
    if (producerProperties.getHeaderMode() != null && !HeaderMode.headers.equals(producerProperties.getHeaderMode())) {
        mapper = null;
    } else if (mapper == null) {
        String[] headerPatterns = producerProperties.getExtension().getHeaderPatterns();
        if (headerPatterns != null && headerPatterns.length > 0) {
            mapper = new BinderHeaderMapper(BinderHeaderMapper.addNeverHeaderPatterns(Arrays.asList(headerPatterns)));
        } else {
            mapper = new BinderHeaderMapper();
        }
    } else {
        // wraps the custom header mapper in a new KafkaHeaderMapper instance
        final KafkaHeaderMapper userHeaderMapper = mapper;
        mapper = new KafkaHeaderMapper() {
            public void toHeaders(Headers source, Map<String, Object> target) {
                userHeaderMapper.toHeaders(source, target);
            }
            public void fromHeaders(MessageHeaders headers, Headers target) {
                userHeaderMapper.fromHeaders(headers, target);
                // Explicitly removes header id
                BinderHeaderMapper.removeNeverHeaders(target);
            }
        };
    }
    handler.setHeaderMapper(mapper);
    return handler;
}
The static method BinderHeaderMapper.removeNeverHeaders(target) removes the id header passed from the Spring Message:
public static void removeNeverHeaders(Headers headers) {
    headers.remove("id");
    headers.remove("timestamp");
    headers.remove("deliveryAttempt");
    headers.remove("scst_nativeHeadersPresent");
}
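The effect of that wrapper can be reproduced with plain JDK types. The sketch below is only a model of the behaviour shown above, not binder code: NeverHeadersSketch, userFromHeaders and wrappedFromHeaders are hypothetical names, maps stand in for the Spring and Kafka header types, and message_id is a made-up header name used to show that only the four listed names are stripped.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class NeverHeadersSketch {

    // Header names the binder strips, as listed in removeNeverHeaders above.
    static final List<String> NEVER_HEADERS =
            List.of("id", "timestamp", "deliveryAttempt", "scst_nativeHeadersPresent");

    // Stand-in for the user-supplied mapper: copies every message header.
    static void userFromHeaders(Map<String, Object> messageHeaders, Map<String, byte[]> target) {
        messageHeaders.forEach((key, value) ->
                target.put(key, String.valueOf(value).getBytes(StandardCharsets.UTF_8)));
    }

    // Stand-in for the binder's wrapper: delegate first, then strip the never headers.
    static Map<String, byte[]> wrappedFromHeaders(Map<String, Object> messageHeaders) {
        Map<String, byte[]> target = new LinkedHashMap<>();
        userFromHeaders(messageHeaders, target);
        NEVER_HEADERS.forEach(target::remove);
        return target;
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new LinkedHashMap<>();
        headers.put("id", "order-42");
        headers.put("message_id", "order-42"); // hypothetical alternative header name
        Map<String, byte[]> record = wrappedFromHeaders(headers);
        System.out.println(record.keySet()); // prints [message_id]
    }
}
```

In this model the id entry is dropped no matter what the delegate mapper copied, while a header under any other name reaches the record.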
Could you please suggest how to configure or implement functionality that populates the id header in the Kafka Producer Record?
Comment From: rstoyanchev
The given Spring Kafka example code doesn't help me to understand if the root cause lies in Spring Kafka or the Spring Framework. I can't run the sample code either. If you believe the issue is in the Spring Framework, please explain further or provide a more isolated snippet, preferably without Spring Kafka, that demonstrates what you think is the issue. Or otherwise, create the issue in https://github.com/spring-projects/spring-kafka.
Comment From: skkadium
Apologies for the uncertainty. Please find the issue in detail below:
Requirement:
The application must generate a Spring Message with id as a header and send the message to a Kafka topic. We use Spring Cloud Stream to integrate the application with Kafka.
Expected Output
The consumer application that consumes messages from the Kafka topic must be able to read the message header id and validate that its value is the same as the Kafka Producer Record key.
Actual Output
The consumer application that consumes messages from the Kafka topic always receives the header id value as "None".
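For reference, the validation the consumer is expected to perform can be sketched with plain JDK types. This is an illustration only: IdHeaderCheckSketch and idMatchesKey are hypothetical names, a map stands in for the record headers, and it assumes both the header value and the record key are UTF-8 strings.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class IdHeaderCheckSketch {

    // Returns true only if the "id" header is present and its UTF-8 value equals the record key.
    static boolean idMatchesKey(Map<String, byte[]> recordHeaders, String recordKey) {
        byte[] raw = recordHeaders.get("id");
        if (raw == null) {
            return false; // header missing: the case reported in this issue
        }
        return new String(raw, StandardCharsets.UTF_8).equals(recordKey);
    }

    public static void main(String[] args) {
        Map<String, byte[]> headers = new HashMap<>();
        System.out.println(idMatchesKey(headers, "order-42")); // prints false
        headers.put("id", "order-42".getBytes(StandardCharsets.UTF_8));
        System.out.println(idMatchesKey(headers, "order-42")); // prints true
    }
}
```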
Customization
A custom header mapper is implemented to avoid unwanted headers such as spring_json_header_types, which the default mapper class (BinderHeaderMapper) generates along with custom headers, and to override the default header patterns so that id is allowed.
@Slf4j
public class CustomKafkaHeaderMapper extends AbstractKafkaHeaderMapper {
private final ObjectMapper objectMapper;
public CustomKafkaHeaderMapper(String... patterns) {
super(patterns);
this.objectMapper = new ObjectMapper();
this.objectMapper.registerModule(
(new SimpleModule())
.addDeserializer(
MimeType.class, new CustomKafkaHeaderMapper.MimeTypeJsonDeserializer()));
}
@Override
public void fromHeaders(MessageHeaders messageHeaders, Headers target) {
messageHeaders.forEach(
(key, value) -> {
if (!KafkaHeaders.DELIVERY_ATTEMPT.equals(key) && this.matches(key, value)) {
Object valueToAdd = this.headerValueToAddOut(key, value);
if (valueToAdd instanceof byte[]) {
target.add(new RecordHeader(key, (byte[]) valueToAdd));
} else if (valueToAdd instanceof String) {
target.add(new RecordHeader(key, ((String) valueToAdd).getBytes(this.getCharset())));
} else {
try {
target.add(new RecordHeader(key, this.objectMapper.writeValueAsBytes(valueToAdd)));
} catch (JsonProcessingException e) {
logger.error(
e,
() -> "Could not map " + key + " with type " + value.getClass().getName()
);
}
}
}
});
}
@Override
public void toHeaders(Headers source, Map<String, Object> target) {
source.forEach(
header -> {
if (KafkaHeaders.DELIVERY_ATTEMPT.equals(header.key())) {
target.put(header.key(), ByteBuffer.wrap(header.value()).getInt());
} else {
target.put(header.key(), new String(header.value(), this.getCharset()));
}
});
}
private class MimeTypeJsonDeserializer extends StdNodeBasedDeserializer<MimeType> {
private static final long serialVersionUID = 1L;
MimeTypeJsonDeserializer() {
super(MimeType.class);
}
public MimeType convert(JsonNode root, DeserializationContext ctxt) throws IOException {
if (root instanceof TextNode) {
return MimeType.valueOf(root.asText());
} else {
JsonNode type = root.get("type");
JsonNode subType = root.get("subtype");
JsonNode parameters = root.get("parameters");
Map<String, String> params =
(Map)
CustomKafkaHeaderMapper.this.objectMapper.readValue(
parameters.traverse(),
TypeFactory.defaultInstance()
.constructMapType(HashMap.class, String.class, String.class));
return new MimeType(type.asText(), subType.asText(), params);
}
}
}
}
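The deliveryAttempt branch of toHeaders above reads the header value as a 4-byte big-endian integer. A minimal round-trip sketch of that encoding in plain Java follows; DeliveryAttemptSketch, encode and decode are hypothetical names, and the sketch assumes the header value is exactly four bytes long (ByteBuffer.getInt throws BufferUnderflowException on shorter input).

```java
import java.nio.ByteBuffer;

public class DeliveryAttemptSketch {

    // Encodes an attempt counter as a 4-byte big-endian integer.
    static byte[] encode(int attempt) {
        return ByteBuffer.allocate(Integer.BYTES).putInt(attempt).array();
    }

    // Decodes the header value the same way toHeaders does above.
    static int decode(byte[] value) {
        return ByteBuffer.wrap(value).getInt();
    }

    public static void main(String[] args) {
        System.out.println(decode(encode(3))); // prints 3
    }
}
```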
The bean name of this custom header mapper is configured in the binder properties as shown below:
spring.cloud.stream.kafka.binder.headerMapperBeanName=customKafkaHeaderMapper
Analysis
I thought the custom header mapper would serve the purpose and set the id header in the Kafka Producer Record. But in my initial analysis, I observed that
- The custom header mapper bean is looked up and configured for use in the code below, inside the createProducerMessageHandler method of the KafkaMessageChannelBinder class.
KafkaHeaderMapper mapper = null;
if (this.configurationProperties.getHeaderMapperBeanName() != null) {
    mapper = applicationContext.getBean(this.configurationProperties.getHeaderMapperBeanName(), KafkaHeaderMapper.class);
}
But at the end of this method, the handler is given a new KafkaHeaderMapper object that delegates to the custom header mapper to map the Spring message headers.
KafkaHeaderMapper mapper = null;
if (this.configurationProperties.getHeaderMapperBeanName() != null) {
    mapper = applicationContext.getBean(this.configurationProperties.getHeaderMapperBeanName(), KafkaHeaderMapper.class);
}
if (mapper == null) {
    try {
        mapper = applicationContext.getBean("kafkaBinderHeaderMapper", KafkaHeaderMapper.class);
    } catch (BeansException ex) {
        // no such bean; fall through to the defaults below
    }
}
if (producerProperties.getHeaderMode() != null && !HeaderMode.headers.equals(producerProperties.getHeaderMode())) {
    mapper = null;
} else if (mapper == null) {
    String[] headerPatterns = producerProperties.getExtension().getHeaderPatterns();
    if (headerPatterns != null && headerPatterns.length > 0) {
        mapper = new BinderHeaderMapper(BinderHeaderMapper.addNeverHeaderPatterns(Arrays.asList(headerPatterns)));
    } else {
        mapper = new BinderHeaderMapper();
    }
} else {
    // the custom header mapper is kept as the delegate of the new wrapper
    final KafkaHeaderMapper userHeaderMapper = mapper;
    mapper = new KafkaHeaderMapper() {
        public void toHeaders(Headers source, Map<String, Object> target) {
            userHeaderMapper.toHeaders(source, target);
        }
        public void fromHeaders(MessageHeaders headers, Headers target) {
            userHeaderMapper.fromHeaders(headers, target);
            BinderHeaderMapper.removeNeverHeaders(target);
        }
    };
}
handler.setHeaderMapper(mapper);
As can be seen inside the fromHeaders method of the code above, the mapper it delegates to is the custom header mapper instance, which maps the Spring message headers to the Apache Kafka Headers. Up to this point, the id header entry is still present in the Apache Kafka Headers. After that, BinderHeaderMapper's removeNeverHeaders is invoked with the Apache Kafka Headers and explicitly removes the id header from the record.
public static void removeNeverHeaders(Headers headers) {
    headers.remove("id");
    headers.remove("timestamp");
    headers.remove("deliveryAttempt");
    headers.remove("scst_nativeHeadersPresent");
}
As per my analysis, even if my application sends a simple Spring Message with an id header, the code above removes the 'id' header from the Apache Kafka Headers before the message is written to the Kafka topic, and the consumer deserializes the id value as 'None'.
I would like to know whether there is a way to avoid the removal of the id header from the Kafka Producer Record headers, as our project requires it.
Please let me know if you need further information.
Comment From: rstoyanchev
This doesn't give me anything new I'm afraid. I still don't know how it relates to spring-messaging. All the code you're showing is Spring Kafka and not in the Spring Framework, and there isn't anything I can run to have a closer look.
Presumably the id is removed in the MessageHeaders constructor if it is called with ID_VALUE_NONE. Is that what you're seeing, and if you're debugging, can you point me to where Spring Kafka calls it?
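To illustrate what is meant here, below is a simplified plain-Java model of how the id argument of the protected MessageHeaders constructor is handled, as far as I recall it. It is a sketch rather than the framework source: MessageIdSketch and buildHeaders are hypothetical names, and the details should be checked against the MessageHeaders class of the Spring Framework version in use.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

public class MessageIdSketch {

    // Sentinel meaning "do not set an id header" (modelled on MessageHeaders.ID_VALUE_NONE).
    static final UUID ID_VALUE_NONE = new UUID(0, 0);

    // Simplified model of how the id argument is interpreted.
    static Map<String, Object> buildHeaders(Map<String, Object> headers, UUID id) {
        Map<String, Object> result = new HashMap<>(headers);
        if (id == null) {
            result.put("id", UUID.randomUUID()); // no id supplied: generate one
        } else if (id == ID_VALUE_NONE) {
            result.remove("id");                 // sentinel: drop the id header
        } else {
            result.put("id", id);                // explicit id: keep it
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> input = new HashMap<>();
        input.put("id", "custom-value");
        System.out.println(buildHeaders(input, ID_VALUE_NONE).containsKey("id")); // prints false
        System.out.println(buildHeaders(input, null).get("id") instanceof UUID);  // prints true
    }
}
```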
Comment From: skkadium
Okay @rstoyanchev, as you said, it looks like I'm in the wrong forum. The issue is not related to the Spring Message, since we are overriding the MessageHeaders constructor to allow the id to be set to a custom value.
I'll post the same in the Spring Kafka forum as suggested by you.
Thanks for your time.
Comment From: rstoyanchev
Alright, closing for now.