The issue is about headers going missing when a message is sent to a Kafka topic. My requirement is to generate a Spring Message whose headers include an id with a value that matches the Kafka producer record key. When building the Spring Message, I explicitly set the id header and send the message to the Kafka topic. But when I consume the message from the topic, the value of the id header is None.

Approach 1:

I have implemented a custom Kafka header mapper, passed it the header patterns id, !timestamp, * and configured the binder to use the bean by setting the property spring.cloud.stream.kafka.binder.headerMapperBeanName=customKafkaHeaderMapper. I still receive the id value as 'None'. Investigating further, I observed that the Spring Framework explicitly removes the headers (in this case, the id header), even though the pattern in the custom header mapper allows the id header. Here is the Spring code:

protected MessageHandler createProducerMessageHandler(final ProducerDestination destination, ExtendedProducerProperties<KafkaProducerProperties> producerProperties, MessageChannel channel, MessageChannel errorChannel) throws Exception {
    final KafkaHeaderMapper mapper = null;
    // Bean name configured in the binder properties - spring.cloud.stream.kafka.binder.headerMapperBeanName=customKafkaHeaderMapper
    if (this.configurationProperties.getHeaderMapperBeanName() != null) {
        mapper = (KafkaHeaderMapper)applicationContext.getBean(this.configurationProperties.getHeaderMapperBeanName(), KafkaHeaderMapper.class);
    }

    if (mapper == null) {
        try {
            mapper = (KafkaHeaderMapper)applicationContext.getBean("kafkaBinderHeaderMapper", KafkaHeaderMapper.class);
        } catch (BeansException var14) {
        }
    }

    Object mapper;
    if (producerProperties.getHeaderMode() != null && !HeaderMode.headers.equals(producerProperties.getHeaderMode())) {
        mapper = null;
    } else if (mapper == null) {
        String[] headerPatterns = ((KafkaProducerProperties)producerProperties.getExtension()).getHeaderPatterns();
        if (headerPatterns != null && headerPatterns.length > 0) {
            mapper = new BinderHeaderMapper(BinderHeaderMapper.addNeverHeaderPatterns(Arrays.asList(headerPatterns)));
        } else {
            mapper = new BinderHeaderMapper();
        }
    } else {
        // creates an instance of KafkaHeaderMapper to use CustomHeaderMapper
        mapper = new KafkaHeaderMapper() {
            public void toHeaders(Headers source, Map<String, Object> target) {
                mapper.toHeaders(source, target);
            }

            public void fromHeaders(MessageHeaders headers, Headers target) {
                mapper.fromHeaders(headers, target);
                // Explicitly removes header id
                BinderHeaderMapper.removeNeverHeaders(target);
            }
        };
    }

    handler.setHeaderMapper((KafkaHeaderMapper)mapper);
    return handler;
}

The BinderHeaderMapper.removeNeverHeaders(target) static method will remove the header id passed from the Spring Message.

public static void removeNeverHeaders(Headers headers) {
  headers.remove("id");
  headers.remove("timestamp");
  headers.remove("deliveryAttempt");
  headers.remove("scst_nativeHeadersPresent");
}
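
To make the effect concrete, here is a minimal stand-alone sketch of that wrap-then-strip behavior. It uses plain Java maps instead of the real Spring and Kafka types. NeverHeaderDemo, HeaderMapper, mapThroughWrapper, and the recordId header name are hypothetical names chosen for illustration; only the four stripped header names come from the snippet above.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified stand-in for the binder's wrapping logic; these are NOT the real Spring classes.
class NeverHeaderDemo {

    // Header names removed by removeNeverHeaders (copied from the snippet above).
    static final List<String> NEVER_HEADERS =
            List.of("id", "timestamp", "deliveryAttempt", "scst_nativeHeadersPresent");

    // Hypothetical minimal mapper: copies message headers into record headers.
    interface HeaderMapper {
        void fromHeaders(Map<String, Object> messageHeaders, Map<String, Object> recordHeaders);
    }

    // Mirrors the anonymous wrapper: delegate to the user mapper, then strip the never-headers.
    static HeaderMapper wrap(HeaderMapper userMapper) {
        return (messageHeaders, recordHeaders) -> {
            userMapper.fromHeaders(messageHeaders, recordHeaders);
            NEVER_HEADERS.forEach(recordHeaders::remove);
        };
    }

    // Runs a copy-everything "custom" mapper through the wrapper and returns the record headers.
    static Map<String, Object> mapThroughWrapper(Map<String, Object> messageHeaders) {
        HeaderMapper copyAll = (source, target) -> target.putAll(source);
        Map<String, Object> recordHeaders = new LinkedHashMap<>();
        wrap(copyAll).fromHeaders(messageHeaders, recordHeaders);
        return recordHeaders;
    }

    public static void main(String[] args) {
        Map<String, Object> messageHeaders = new LinkedHashMap<>();
        messageHeaders.put("id", "order-42");
        messageHeaders.put("recordId", "order-42"); // hypothetical alternative header name
        System.out.println(mapThroughWrapper(messageHeaders)); // prints {recordId=order-42}
    }
}
```

Under these assumptions, a header whose name is outside that list (recordId here) passes through the wrapper, while id is stripped no matter what the custom mapper added.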

Could you please suggest how to configure or implement functionality that populates the id header in the Kafka producer record?

Comment From: rstoyanchev

The given Spring Kafka example code doesn't help me to understand if the root cause lies in Spring Kafka or the Spring Framework. I can't run the sample code either. If you believe the issue is in the Spring Framework, please explain further or provide a more isolated snippet, preferably without Spring Kafka, that demonstrates what you think is the issue. Or otherwise, create the issue in https://github.com/spring-projects/spring-kafka.

Comment From: skkadium

Apologies for the uncertainty. Please find below the issue in detail:

Requirement: The application must generate a Spring Message object that includes id as a header and send the message to a Kafka topic. We use Spring Cloud Stream to integrate the application with Kafka.

Expected Output: The consumer application that consumes messages from the Kafka topic must be able to read the id message header and validate that its value is the same as the Kafka producer record key.

Actual Output: The consumer application that consumes messages from the Kafka topic always receives the id header value as "None".

Customization: A custom header mapper is implemented to avoid unwanted headers, such as spring_json_header_types, that the default mapper class (BinderHeaderMapper) generates alongside custom headers, and to override the default header patterns so that id is allowed.

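import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.deser.std.StdNodeBasedDeserializer;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.fasterxml.jackson.databind.node.TextNode;
import com.fasterxml.jackson.databind.type.TypeFactory;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.springframework.kafka.support.AbstractKafkaHeaderMapper;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.MessageHeaders;
import org.springframework.util.MimeType;

@Slf4j
public class CustomKafkaHeaderMapper extends AbstractKafkaHeaderMapper {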

  private final ObjectMapper objectMapper;

  public CustomKafkaHeaderMapper(String... patterns) {
    super(patterns);
    this.objectMapper = new ObjectMapper();
    this.objectMapper.registerModule(
        (new SimpleModule())
            .addDeserializer(
                MimeType.class, new CustomKafkaHeaderMapper.MimeTypeJsonDeserializer()));
  }

  @Override
  public void fromHeaders(MessageHeaders messageHeaders, Headers target) {
    messageHeaders.forEach(
        (key, value) -> {
          if (!KafkaHeaders.DELIVERY_ATTEMPT.equals(key) && this.matches(key, value)) {
            Object valueToAdd = this.headerValueToAddOut(key, value);
            if (valueToAdd instanceof byte[]) {
              target.add(new RecordHeader(key, (byte[]) valueToAdd));
            } else if (valueToAdd instanceof String) {
              target.add(new RecordHeader(key, ((String) valueToAdd).getBytes(this.getCharset())));
            } else {
              try {
                // Reuse the mapper's ObjectMapper instead of allocating a new one per header.
                target.add(new RecordHeader(key, this.objectMapper.writeValueAsBytes(valueToAdd)));
              } catch (JsonProcessingException e) {
                logger.error(
                    e,
                    () -> "Could not map " + key + " with type " + value.getClass().getName()
                    );
              }
            }
          }
        });
  }

  @Override
  public void toHeaders(Headers source, Map<String, Object> target) {
    source.forEach(
        header -> {
          if (KafkaHeaders.DELIVERY_ATTEMPT.equals(header.key())) {
            target.put(header.key(), ByteBuffer.wrap(header.value()).getInt());
          } else {
            target.put(header.key(), new String(header.value(), this.getCharset()));
          }
        });
  }

  private class MimeTypeJsonDeserializer extends StdNodeBasedDeserializer<MimeType> {
    private static final long serialVersionUID = 1L;

    MimeTypeJsonDeserializer() {
      super(MimeType.class);
    }

    public MimeType convert(JsonNode root, DeserializationContext ctxt) throws IOException {
      if (root instanceof TextNode) {
        return MimeType.valueOf(root.asText());
      } else {
        JsonNode type = root.get("type");
        JsonNode subType = root.get("subtype");
        JsonNode parameters = root.get("parameters");
        Map<String, String> params =
            CustomKafkaHeaderMapper.this.objectMapper.readValue(
                parameters.traverse(),
                TypeFactory.defaultInstance()
                    .constructMapType(HashMap.class, String.class, String.class));
        return new MimeType(type.asText(), subType.asText(), params);
      }
    }
  }
}
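
The patterns passed to this constructor (id, !timestamp, * in my case) are evaluated in order. As far as I understand, the first pattern that matches a header name decides, and a leading ! turns that match into an exclusion. The sketch below is a simplified model of that rule in plain Java, not the Spring Kafka implementation (it ignores case handling, for example). HeaderPatternDemo, shouldMap, and globMatches are hypothetical names.

```java
import java.util.Arrays;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Simplified model of ordered header patterns; not the Spring Kafka implementation.
class HeaderPatternDemo {

    // Returns true if the header should be mapped. The first pattern that matches decides,
    // and a leading '!' turns that match into an exclusion.
    static boolean shouldMap(String header, String... patterns) {
        for (String pattern : patterns) {
            boolean negated = pattern.startsWith("!");
            String glob = negated ? pattern.substring(1) : pattern;
            if (globMatches(glob, header)) {
                return !negated;
            }
        }
        return false; // no pattern matched at all
    }

    // Translates a '*' wildcard pattern into a regex and matches the whole header name.
    static boolean globMatches(String glob, String header) {
        String regex = Arrays.stream(glob.split("\\*", -1))
                .map(Pattern::quote)
                .collect(Collectors.joining(".*"));
        return header.matches(regex);
    }

    public static void main(String[] args) {
        String[] patterns = {"id", "!timestamp", "*"};
        System.out.println(shouldMap("id", patterns));        // true
        System.out.println(shouldMap("timestamp", patterns)); // false
        System.out.println(shouldMap("traceId", patterns));   // true
    }
}
```

With these patterns the custom mapper itself lets id through, which matches what I see while debugging: the id entry is present right after the custom mapper runs.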

The customKafkaHeaderMapper bean name is configured in the binder properties as follows: spring.cloud.stream.kafka.binder.headerMapperBeanName=customKafkaHeaderMapper

Analysis: I expected the custom header mapper to serve the purpose and set the id header in the Kafka producer record. But in my initial analysis, I observed the following:

  • The custom header mapper bean is looked up and configured for use in the code below, inside the createProducerMessageHandler method of the KafkaMessageChannelBinder class.

final KafkaHeaderMapper mapper = null;
    if (this.configurationProperties.getHeaderMapperBeanName() != null) {
      mapper = (KafkaHeaderMapper)applicationContext.getBean(this.configurationProperties.getHeaderMapperBeanName(), KafkaHeaderMapper.class);
    }

But at the end of this method, the handler is given a new KafkaHeaderMapper object that delegates to the custom header mapper to map the Spring message headers.

final KafkaHeaderMapper mapper = null;
    if (this.configurationProperties.getHeaderMapperBeanName() != null) {
      mapper = (KafkaHeaderMapper)applicationContext.getBean(this.configurationProperties.getHeaderMapperBeanName(), KafkaHeaderMapper.class);
    }

    if (mapper == null) {
      try {
        mapper = (KafkaHeaderMapper)applicationContext.getBean("kafkaBinderHeaderMapper", KafkaHeaderMapper.class);
      } catch (BeansException var14) {
      }
    }

    Object mapper;
    if (producerProperties.getHeaderMode() != null && !HeaderMode.headers.equals(producerProperties.getHeaderMode())) {
      mapper = null;
    } else if (mapper == null) {
      String[] headerPatterns = ((KafkaProducerProperties)producerProperties.getExtension()).getHeaderPatterns();
      if (headerPatterns != null && headerPatterns.length > 0) {
        mapper = new BinderHeaderMapper(BinderHeaderMapper.addNeverHeaderPatterns(Arrays.asList(headerPatterns)));
      } else {
        mapper = new BinderHeaderMapper();
      }
    } else {
      mapper = new KafkaHeaderMapper() {
        public void toHeaders(Headers source, Map<String, Object> target) {
          mapper.toHeaders(source, target);
        }

        public void fromHeaders(MessageHeaders headers, Headers target) {
          mapper.fromHeaders(headers, target);
          BinderHeaderMapper.removeNeverHeaders(target);
        }
      };
    }

    handler.setHeaderMapper((KafkaHeaderMapper)mapper);

As the fromHeaders method in the code above shows, mapper is the custom header mapper instance, which maps the Spring message headers to the Apache Kafka headers. Up to this point, I can see the id header entry in the Apache Kafka headers. After that, BinderHeaderMapper's removeNeverHeaders is invoked on the Apache Kafka headers and explicitly removes the id header from the record.

public static void removeNeverHeaders(Headers headers) {
    headers.remove("id");
    headers.remove("timestamp");
    headers.remove("deliveryAttempt");
    headers.remove("scst_nativeHeadersPresent");
  }

As per my analysis, even if my application sends a simple Spring Message with an id header, the above code removes the 'id' header from the Apache Kafka headers before the message is written to the Kafka topic, and the consumer then deserializes the id value as 'None'.

I would like to know whether there is a way to prevent the id header from being removed from the Kafka producer record headers, as our project requires it.

Please let me know if you need further information.

Comment From: rstoyanchev

This doesn't give me anything new I'm afraid. I still don't know how it relates to spring-messaging. All the code you're showing is Spring Kafka and not in the Spring Framework, and there isn't anything I can run to have a closer look.

Presumably the id is removed in the MessageHeaders constructor if that's called with ID_VALUE_NONE. Is that what you're seeing, and if you're debugging, can you point me to where Spring Kafka calls it?
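
For reference, the id handling I have in mind can be sketched roughly as follows. This is a simplified stand-alone model, not the actual spring-messaging source: MessageHeadersIdModel and build are hypothetical names, timestamp handling is omitted, and a plain map stands in for the real header container.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Simplified model of how the MessageHeaders constructor treats its id argument.
// The real class is org.springframework.messaging.MessageHeaders.
class MessageHeadersIdModel {

    // Sentinel meaning "no id header"; models MessageHeaders.ID_VALUE_NONE.
    static final UUID ID_VALUE_NONE = new UUID(0, 0);

    static Map<String, Object> build(Map<String, Object> headers, UUID id) {
        Map<String, Object> result = new HashMap<>(headers);
        if (id == null) {
            result.put("id", UUID.randomUUID()); // default: generate a fresh id
        } else if (id.equals(ID_VALUE_NONE)) {
            result.remove("id");                 // sentinel: drop the id header entirely
        } else {
            result.put("id", id);                // an explicit id is kept as given
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> incoming = Map.of("id", "from-the-wire");
        System.out.println(build(incoming, ID_VALUE_NONE).containsKey("id")); // false
        System.out.println(build(incoming, null).get("id") instanceof UUID);  // true
    }
}
```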

Comment From: skkadium

Okay @rstoyanchev, as you said, it looks like I'm in the wrong forum. The issue is not related to the Spring Message, since we are overriding the MessageHeaders constructor to allow the id to be set with a custom value.

I'll post the same in the Spring Kafka forum as suggested by you.

Thanks for your time.

Comment From: rstoyanchev

Alright, closing for now.