简介kafka内部发送和接收消息的时候,使用的是byte[]字节数组的方式(RPC底层也是用这种通讯格式)。但是我们在应用层其实可以使用更多的数据类型,比如int,short, long,String等,这归功于kafka的序列化和反序列化机制。
基本原理分析在之前的一篇文章springboot集成kafka示例中,我使用的是kafka原生的StringSerializer序列化方式,
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializerspring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
源码如下:
public class StringSerializer implements Serializer { private String encoding = "UTF8"; public StringSerializer() { } public void configure(Map configs, Boolean isKey) { String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding"; Object encodingValue = configs.get(propertyName); if (encodingValue == null) { encodingValue = configs.get("serializer.encoding"); } if (encodingValue instanceof String) { this.encoding = (String)encodingValue; } } public byte[] serialize(String topic, String data) { try { return data == null ? null : data.getBytes(this.encoding); } catch (UnsupportedEncodingException var4) { throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + this.encoding); } } public void close() { }}
其实很简单,configure方法设置序列化(serialize方法)需要使用的编码,如果没有设置就使用UTF8格式。这个方法是在生成producer实例的时候被调用的。serialize方法使用的就是String的getBytes把String类型的消息转化为byte字节数组。
反序列呢?聪明如你应该能想到,使用new String就可以解决了。源码如下:
@Override public String deserialize(String topic, byte[] data) { try { if (data == null) return null; else return new String(data, encoding); } catch (UnsupportedEncodingException e) { throw new SerializationException("Error when deserializing byte[] to string due to unsupported encoding " + encoding); }}---------------------
是不是简单到爆呢?
其它的内置序列化组件,像Double, Integer,Long这些原理都类似,就不一一分析了。
自定义序列化组件有时候内置的组件不能满足我们的需要。比如我有个自定义的对象要作为kafka的消息进行收发(把对象转化为json字符串通过String的方式也是一种思路),希望能有一个针对我这个对象自定义的序列化和反序列化组件。
我们先定义一个消息对象,
@Data@ToStringpublic class Person { private int id; private String name; private int age;}
然后自定义自己的序列化和反序列化实现类,
@Slf4jpublic class PersonDeserializer implements Deserializer { @Override public void configure(Map map, Boolean b) { } @Override public Person deserialize(String s, byte[] bytes) { log.info("自定义的反序列化-deserialize"); return JSON.parseObject(bytes, Person.class); } @Override public void close() { }}@Slf4jpublic class PersonSerializer implements Serializer { private static Gson gson; static { gson = new GsonBuilder().create(); } @Override public void configure(Map map, Boolean b) { log.info("自定义的序列化组件--configure"); } @Override public byte[] serialize(String s, Person person) { log.info("自定义的序列化组件--serialize"); return JSON.toJSONBytes(person); } @Override public void close() { log.info("自定义的序列化组件--close"); }}
代码一看就明白,其实核心就是利用fastjson的toJSONBytes把对象转化为byte数组。
然后我们在配置里指定使用我们自己的序列化和反序列化实现类,
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializerspring.kafka.producer.value-serializer=com.ponymaggie.github.kafka.serializer.PersonSerializerspring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializerspring.kafka.consumer.value-deserializer=com.ponymaggie.github.kafka.serializer.PersonDeserializer
测试
启动springboot项目,通过日志可以看出消息的收发都是正常的。
2019-08-15 20:06:34.251 INFO 16676 --- [ main] c.p.github.kafka.producer.KafkaSender : +++++++++++++++++++++ message = {"id":1000,"name":"小明