热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

序列化与反序列化_深入理解kafka的序列化和反序列化

简介kafka内部发送和接收消息的时候,使用的是byte[]字节数组的方式(RPC底层也是用这种通讯格式)。但是我们在应用层其实可以使用更多的数据类型,
3de2a4a404b8f5314e6203f7e22d16cf.png
简介

kafka内部发送和接收消息的时候,使用的是byte[]字节数组的方式(RPC底层也是用这种通讯格式)。但是我们在应用层其实可以使用更多的数据类型,比如int,short, long,String等,这归功于kafka的序列化和反序列化机制。

915c83e299e50c81c1c62c16729d125a.png
基本原理分析

在之前的一篇文章springboot集成kafka示例中,我使用的是kafka原生的StringSerializer序列化方式,

spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializerspring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

源码如下:

public class StringSerializer implements Serializer { private String encoding = "UTF8"; public StringSerializer() { } public void configure(Map configs, Boolean isKey) { String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding"; Object encodingValue = configs.get(propertyName); if (encodingValue == null) { encodingValue = configs.get("serializer.encoding"); } if (encodingValue instanceof String) { this.encoding = (String)encodingValue; } } public byte[] serialize(String topic, String data) { try { return data == null ? null : data.getBytes(this.encoding); } catch (UnsupportedEncodingException var4) { throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + this.encoding); } } public void close() { }}

其实很简单,configure方法设置序列化(serialize方法)需要使用的编码,如果没有设置就使用UTF8格式。这个方法是在生成producer实例的时候被调用的。serialize方法使用的就是String的getBytes把String类型的消息转化为byte字节数组。

反序列呢?聪明如你应该能想到,使用new String就可以解决了。源码如下:

@Override public String deserialize(String topic, byte[] data) { try { if (data == null) return null; else return new String(data, encoding); } catch (UnsupportedEncodingException e) { throw new SerializationException("Error when deserializing byte[] to string due to unsupported encoding " + encoding); }}---------------------

是不是简单到爆呢?

其它的内置序列化组件,像Double, Integer,Long这些原理都类似,就不一一分析了。

05c514e97a781c32b8ed1f5542639795.png
自定义序列化组件

有时候内置的组件不能满足我们的需要。比如我有个自定义的对象要作为kafka的消息进行收发(把对象转化为json字符串通过String的方式也是一种思路),希望能有一个针对我这个对象自定义的序列化和反序列化组件。

我们先定义一个消息对象,

@Data@ToStringpublic class Person { private int id; private String name; private int age;}

然后自定义自己的序列化和反序列化实现类,

@Slf4jpublic class PersonDeserializer implements Deserializer { @Override public void configure(Map map, Boolean b) { } @Override public Person deserialize(String s, byte[] bytes) { log.info("自定义的反序列化-deserialize"); return JSON.parseObject(bytes, Person.class); } @Override public void close() { }}@Slf4jpublic class PersonSerializer implements Serializer { private static Gson gson; static { gson = new GsonBuilder().create(); } @Override public void configure(Map map, Boolean b) { log.info("自定义的序列化组件--configure"); } @Override public byte[] serialize(String s, Person person) { log.info("自定义的序列化组件--serialize"); return JSON.toJSONBytes(person); } @Override public void close() { log.info("自定义的序列化组件--close"); }}

代码一看就明白,其实核心就是利用fastjson的toJSONBytes把对象转化为byte数组。

然后我们在配置里指定使用我们自己的序列化和反序列化实现类,

spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializerspring.kafka.producer.value-serializer=com.ponymaggie.github.kafka.serializer.PersonSerializerspring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializerspring.kafka.consumer.value-deserializer=com.ponymaggie.github.kafka.serializer.PersonDeserializer测试

启动springboot项目,通过日志可以看出消息的收发都是正常的。

2019-08-15 20:06:34.251 INFO 16676 --- [ main] c.p.github.kafka.producer.KafkaSender : +++++++++++++++++++++ message = {"id":1000,"name":"小明



推荐阅读
  • 本文介绍了Oracle数据库中tnsnames.ora文件的作用和配置方法。tnsnames.ora文件在数据库启动过程中会被读取,用于解析LOCAL_LISTENER,并且与侦听无关。文章还提供了配置LOCAL_LISTENER和1522端口的示例,并展示了listener.ora文件的内容。 ... [详细]
  • 本文讨论了在Spring 3.1中,数据源未能自动连接到@Configuration类的错误原因,并提供了解决方法。作者发现了错误的原因,并在代码中手动定义了PersistenceAnnotationBeanPostProcessor。作者删除了该定义后,问题得到解决。此外,作者还指出了默认的PersistenceAnnotationBeanPostProcessor的注册方式,并提供了自定义该bean定义的方法。 ... [详细]
  • http:my.oschina.netleejun2005blog136820刚看到群里又有同学在说HTTP协议下的Get请求参数长度是有大小限制的,最大不能超过XX ... [详细]
  • 标题: ... [详细]
  • 本文介绍了在Linux下安装和配置Kafka的方法,包括安装JDK、下载和解压Kafka、配置Kafka的参数,以及配置Kafka的日志目录、服务器IP和日志存放路径等。同时还提供了单机配置部署的方法和zookeeper地址和端口的配置。通过实操成功的案例,帮助读者快速完成Kafka的安装和配置。 ... [详细]
  • 如何自行分析定位SAP BSP错误
    The“BSPtag”Imentionedintheblogtitlemeansforexamplethetagchtmlb:configCelleratorbelowwhichi ... [详细]
  • Spring特性实现接口多类的动态调用详解
    本文详细介绍了如何使用Spring特性实现接口多类的动态调用。通过对Spring IoC容器的基础类BeanFactory和ApplicationContext的介绍,以及getBeansOfType方法的应用,解决了在实际工作中遇到的接口及多个实现类的问题。同时,文章还提到了SPI使用的不便之处,并介绍了借助ApplicationContext实现需求的方法。阅读本文,你将了解到Spring特性的实现原理和实际应用方式。 ... [详细]
  • 本文介绍了多因子选股模型在实际中的构建步骤,包括风险源分析、因子筛选和体系构建,并进行了模拟实证回测。在风险源分析中,从宏观、行业、公司和特殊因素四个角度分析了影响资产价格的因素。具体包括宏观经济运行和宏经济政策对证券市场的影响,以及行业类型、行业生命周期和行业政策对股票价格的影响。 ... [详细]
  • 在重复造轮子的情况下用ProxyServlet反向代理来减少工作量
    像不少公司内部不同团队都会自己研发自己工具产品,当各个产品逐渐成熟,到达了一定的发展瓶颈,同时每个产品都有着自己的入口,用户 ... [详细]
  • 本文讨论了在openwrt-17.01版本中,mt7628设备上初始化启动时eth0的mac地址总是随机生成的问题。每次随机生成的eth0的mac地址都会写到/sys/class/net/eth0/address目录下,而openwrt-17.01原版的SDK会根据随机生成的eth0的mac地址再生成eth0.1、eth0.2等,生成后的mac地址会保存在/etc/config/network下。 ... [详细]
  • r2dbc配置多数据源
    R2dbc配置多数据源问题根据官网配置r2dbc连接mysql多数据源所遇到的问题pom配置可以参考官网,不过我这样配置会报错我并没有这样配置将以下内容添加到pom.xml文件d ... [详细]
  • 本文介绍了Python爬虫技术基础篇面向对象高级编程(中)中的多重继承概念。通过继承,子类可以扩展父类的功能。文章以动物类层次的设计为例,讨论了按照不同分类方式设计类层次的复杂性和多重继承的优势。最后给出了哺乳动物和鸟类的设计示例,以及能跑、能飞、宠物类和非宠物类的增加对类数量的影响。 ... [详细]
  • 深入理解Kafka服务端请求队列中请求的处理
    本文深入分析了Kafka服务端请求队列中请求的处理过程,详细介绍了请求的封装和放入请求队列的过程,以及处理请求的线程池的创建和容量设置。通过场景分析、图示说明和源码分析,帮助读者更好地理解Kafka服务端的工作原理。 ... [详细]
  • Spring学习(4):Spring管理对象之间的关联关系
    本文是关于Spring学习的第四篇文章,讲述了Spring框架中管理对象之间的关联关系。文章介绍了MessageService类和MessagePrinter类的实现,并解释了它们之间的关联关系。通过学习本文,读者可以了解Spring框架中对象之间的关联关系的概念和实现方式。 ... [详细]
  • 本文介绍了在处理不规则数据时如何使用Python自动提取文本中的时间日期,包括使用dateutil.parser模块统一日期字符串格式和使用datefinder模块提取日期。同时,还介绍了一段使用正则表达式的代码,可以支持中文日期和一些特殊的时间识别,例如'2012年12月12日'、'3小时前'、'在2012/12/13哈哈'等。 ... [详细]
author-avatar
npa3689305
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有