热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

计算道路平均速度

我去了数据工程师工作面试。面试官问我一个问题。他给了我一些情况,并请我

我去了数据工程师工作面试。面试官问我一个问题。他给了我一些情况,并请我设计该系统的数据流。我解决了,但他不喜欢我的解决方案,但我失败了。我想知道您是否有更好的想法来解决这一难题。

问题是:

我们的系统接收四个数据流。数据包含车辆ID,速度和地理位置坐标。每个车辆每分钟发送一次数据。特定的流与特定的道路,车辆或其他任何东西之间没有任何联系。有一个函数可以接受协调并返回路段名称。我们需要知道每路路段每5分钟的平均速度。最后,我们要将结果写到Kafka。

计算道路平均速度

所以我的解决方法是:

首先将所有数据写入一个Kafka集群中,成为一个主题,并按纬度的5-6个第一位数字与经度的5-6个第一位数字进行分区。然后通过结构化流技术读取数据,并通过协调为每行添加路段名称(为此有一个预定义的udf),然后按路段名称来整理数据。

因为我将Kafka中的数据按协调的5-6位数字进行了划分,所以在将协调转换为部分名称后,无需将大量数据传输到正确的分区,因此我可以利用colesce ()不会触发完全随机播放的操作。

然后计算每个执行者的平均速度。

整个过程每5分钟发生一次,我们将以“追加”模式将数据写入最终的Kafka接收器。

计算道路平均速度

同样,面试官不喜欢我的解决方案。有人可以建议如何改进它,或者是一个完全不同的更好的主意吗?


我发现这个问题非常有趣,并想尝试一下。

正如我进一步评估的那样,您的尝试本身就是好的,除了以下几点:


  

由经纬度的5-6位数字和经度5-6位的数字分隔

如果您已经有了一种基于纬度和经度来获取路段ID /名称的方法,为什么不首先调用该方法并首先使用路段ID /名称来对数据进行分区?

然后,一切都很容易,因此拓扑将是

Merge all four streams ->
Select key as the road section id/name ->
Group the stream by Key ->
Use time windowed aggregation for the given time ->
Materialize it to a store.

(更详细的解释可以在下面的代码注释中找到。请询问是否不清楚)

我在此答案的末尾添加了代码,请注意,我使用sum代替了平均数,因为这更易于演示。通过存储一些额外的数据可以进行平均。

我已在评论中详细说明了答案。以下是根据代码生成的拓扑图(感谢https://zz85.github.io/kafka-streams-viz/)

拓扑:

Topology Diagram

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
public class VehicleStream {
// 5 minutes aggregation window
private static final long AGGREGATION_WINDOW = 5 * 50 * 1000L;
public static void main(String[] args) throws Exception {
Properties properties = new Properties();
// Setting configs,change accordingly
properties.put(StreamsConfig.APPLICATION_ID_CONFIG,"vehicle.stream.app");
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092,kafka2:19092");
properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,Serdes.String().getClass());
properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass());
// initializing a streambuilder for building topology.
final StreamsBuilder builder = new StreamsBuilder();
// Our initial 4 streams.
List streamInputTopics = Arrays.asList(
"vehicle.stream1","vehicle.stream2","vehicle.stream3","vehicle.stream4"
);
/*
* Since there is no connection between a specific stream
* to a specific road or vehicle or anything else,* we can take all four streams as a single stream
*/
KStream source = builder.stream(streamInputTopics);
/*
* The initial key is unimportant (which can be ignored),* Instead,we will be using the section name/id as key.
* Data will contain comma separated values in following format.
* VehicleId,Speed,Latitude,Longitude
*/
WindowBytesStoreSupplier windowSpeedStore = Stores.persistentWindowStore(
"windowSpeedStore",AGGREGATION_WINDOW,2,10,true
);
source
.peek((k,v) -> printValues("Initial",k,v))
// First,we rekey the stream based on the road section.
.selectKey(VehicleStream::selectKeyAsRoadSection)
.peek((k,v) -> printValues("After rekey",v))
.groupByKey()
.windowedBy(TimeWindows.of(AGGREGATION_WINDOW))
.aggregate(
() -> "0.0",// Initialize
/*
* I'm using summing here for the aggregation as that's easier.
* It can be converted to average by storing extra details on number of records,etc..
*/
(k,v,previousSpeed) -> // Aggregator (summing speed)
String.valueOf(
Double.parseDouble(previousSpeed) +
VehicleSpeed.getVehicleSpeed(v).speed
),Materialized.as(windowSpeedStore)
);
// generating the topology
final Topology topology = builder.build();
System.out.print(topology.describe());
// constructing a streams client with the properties and topology
final KafkaStreams streams = new KafkaStreams(topology,properties);
final CountDownLatch latch = new CountDownLatch(1);
// attaching shutdown handler
Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
@Override
public void run() {
streams.close();
latch.countDown();
}
});
try {
streams.start();
latch.await();
} catch (Throwable e) {
System.exit(1);
}
System.exit(0);
}
private static void printValues(String message,String key,Object value) {
System.out.printf("===%s=== key: %s value: %s%n",message,key,value.toString());
}
private static String selectKeyAsRoadSection(String key,String speedValue) {
// Would make more sense when it's the section id,rather than a name.
return coordinateToRoadSection(
VehicleSpeed.getVehicleSpeed(speedValue).latitude,VehicleSpeed.getVehicleSpeed(speedValue).longitude
);
}
private static String coordinateToRoadSection(String latitude,String longitude) {
// Dummy function
return "Area 51";
}
public static class VehicleSpeed {
public String vehicleId;
public double speed;
public String latitude;
public String longitude;
public static VehicleSpeed getVehicleSpeed(String data) {
return new VehicleSpeed(data);
}
public VehicleSpeed(String data) {
String[] dataArray = data.split(",");
this.vehicleId = dataArray[0];
this.speed = Double.parseDouble(dataArray[1]);
this.latitude = dataArray[2];
this.lOngitude= dataArray[3];
}
@Override
public String toString() {
return String.format("veh: %s,speed: %f,latlong : %s,%s",vehicleId,speed,latitude,longitude);
}
}
}

,

这样的问题似乎很简单,提供的解决方案已经很有意义了。我想知道,面试官是否担心您关注的解决方案的设计和性能或结果的准确性。由于其他人都专注于代码,设计和性能,因此我将权衡准确性。

流媒体解决方案

随着数据的流入,我们可以粗略估计道路的平均速度。此估计将有助于检测拥塞,但无法确定速度限制。


  1. 将所有4个数据流组合在一起。

  2. 创建一个5分钟的窗口,以在5分钟内捕获所有4个流的数据。

  3. 在坐标上应用UDF以获取街道名称和城市名称。街道名称通常在城市之间重复,因此我们将使用城市名称+街道名称作为关键字。

  4. 使用-
  5. 之类的语法计算平均速度


vehicle_street_speed
.groupBy($"city_name_street_name")
.agg(
avg($"speed").as("avg_speed")
)
5. write the result to the Kafka Topic

批量解决方案

由于样本量较小,因此此估计将无效。我们将需要对整个月/季度/年的数据进行批处理,以便更准确地确定速度限制。


  1. 从数据湖(或Kafka主题)中读取年份数据


  2. 在坐标上应用UDF以获取街道名称和城市名称。


  3. 使用-

  4. 之类的语法计算平均速度


vehicle_street_speed
.groupBy($"city_name_street_name")
.agg(
avg($"speed").as("avg_speed")
)


  1. 将结果写入数据湖。

基于此更精确的速度限制,我们可以预测流式应用程序中的慢速流量。

,

我发现您的分区策略存在一些问题:


  • 当您说要根据lat long的前5-6位数字对数据进行分区时,您将无法预先确定kafka分区的数量。您将偏向某些路段的数据,因为某些路段的流量会比其他路段高。


  • 而且您的按键组合也不能保证在同一分区中具有相同的路段数据,因此您无法确定不会混洗。



IMO提供的信息不足以设计整个数据管道。因为在设计管道时,如何对数据进行分区起着重要的作用。您应该查询有关正在接收的数据的更多信息,例如车辆数量,输入数据流的大小,流的数量是否固定或将来会增加?您接收的输入数据流是kafka流吗?您在5分钟内收到多少数据?

  • 现在,让我们假设您在kafka或4个分区中有4个流写入了4个主题,并且没有任何特定键,但是您的数据是根据某个数据中心键进行分区的,或者是对哈希进行分区的。如果不是这样,则应在数据端进行此操作,而不是在另一个kafka流中对数据进行重复数据删除和分区。

  • 如果要在不同的数据中心上接收数据,则需要将数据带到一个群集中,为此,您可以使用Kafka镜像制造商或类似的产品。

  • 将所有数据放在一个群集上之后,您可以在其中运行结构化的流作业,并根据需要以5分钟的触发间隔和水印进行操作。

  • 要计算平均值并避免大量改组,可以使用mapValuesreduceByKey的组合来代替groupBy。请参阅this。

  • 您可以在处理后将数据写入kafka sink。


,

此解决方案的主要问题是:


  • 地图的6位正方形边缘上的路段将在多个主题分区中具有数据,并且将具有多个平均速度。

  • Kafka分区的摄取数据大小可能不平衡(城市与沙漠)。 IMO对按汽车ID的前几个数字进行分区可能是个好主意。

  • 不确定我是否遵循了合并部分,但这听起来有问题。

我想说解决方案需要做:从Kafka流中读取-> UDF-> groupby路段->平均->写入Kafka流中。

,

我的设计取决于


  1. 道路数

  2. 车辆数量

  3. 根据坐标计算道路成本

如果我想扩展任意数量的计数,设计将如下所示
enter image description here

对此设计存在共同的担忧-


  1. 保持输入流的持久状态(如果输入是kafka,我们可以使用Kafka或在外部存储偏移量

  2. 定期向外部系统检查点状态(我更喜欢使用async checkpoint barriers in Flink)

此设计可能会进行一些实用的增强-


  1. 如果可能,基于道路缓存路段映射功能

  2. 处理未命中的ping(实际上并非所有ping都可用)

  3. 考虑道路的曲率(考虑轴承和高度)


,

我的尝试是


  1. 一个从源中读取的流应用程序使它们充满了街道,并且所需的分区数可以说是30。这可以通过使用针对街道名称/ id选择的哈希算法来实现。

  2. 使用Kafka分区,以便对我的数据流进行分区并且易于检索。

  3. 使用火花流处理批处理并通过路旁ID减少批处理。然后将结果发送到Kafka。

其他方法


  1. 使用流媒体应用程序(例如Akka流)加载数据,然后将其转换为街道名称。

  2. 维护一个bloom过滤器,以检查是否已经处理了汽车ID。

  3. 此外,保持总记录速度的总和,然后在批处理追加5分钟后将数据以Bloom基数除以,或将其发送给Kafka主题,然后重新开始。

    • 这种方法将无法扩展,并且在仅考虑第一印象时会提供近似结果。




推荐阅读
author-avatar
邓尕恒_789
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有