作者:mobiledu2502909493 | 来源:互联网 | 2023-08-22 16:21
简易的分布式文件系统 本来初期打算用Hadoop 2,可是后来有限的服务器部署了Solr Cloud,各种站点,发现资源不够了,近10T的文件,已经几乎把服务器的磁盘全
简易的分布式文件系统
本来初期打算用Hadoop 2,可是后来有限的服务器部署了Solr Cloud,各种站点,发现资源不够了,近10T的文件,已经几乎把服务器的磁盘全部用光。想来想去,由于目前架构基于Scala的,所以还是用Scala Akka实现了一个简单版本的分布式文件系统。
Scala版本是2.10.3:http://www.scala-lang.org,Akka版本是2.2.3:http://akka.io。
所有文件随机放在不同的服务器上,在数据库中记录了文件存放的服务器IP地址、文件路径。在服务端部署基于Akka的简单文件服务,接收文件路径,读取并返回文件内容。调用者根据文件地址,去数据库中查找文件的服务IP地址和文件路径,根据得到的服务器IP地址,传入文件路径,调用该服务器的文件服务。
以下是部分实现代码。
1.文件服务参数
1
case
class
PatentFulltextArgs(
2
val url: String,
3
val start: Int,
4
val size: Int) {
5
6
}
2.文件服务Trait(有点像WCF中的服务契约)
1
trait PatentFulltextService {
2
def find(args: PatentFulltextArgs): Array[Byte]
3
}
3.文件服务实现
1
class
PatentFulltextServiceImpl
extends
PatentFulltextService with Disposable {
2
def find(args: PatentFulltextArgs): Array[Byte] =
{
3
val list =
ListBuffer[Byte]()
4
val file =
FileSystems.getDefault().getPath(args.url)
5
6
using(Files.newInputStream(file)) { in =>
7
{
8
val bytes =
new
Array[Byte](args.size + 1
)
9
in.skip(args.start)
10
in.read(bytes, 0
, bytes.length)
11
12
list ++=
bytes
13
}
14
}
15
16
list.toArray
17
}
18
}
4.用户Akka Deploy发布的类
class
ServiceApplication
extends
Bootable {
val system
= ActorSystem("serivce", ConfigFactory.load.getConfig("service"
))
def startup() {
TypedActor(system).typedActorOf(TypedProps[PatentFulltextServiceImpl],
"patentfulltext"
)
}
def shutdown() {
system.shutdown
}
}
在这里,我使用的Akka的TypeActor,请参考:http://doc.akka.io/docs/akka/2.2.3/scala/typed-actors.html。
以下是部署过程。
把生成的jar包,发布在Akka的deploy目录下,根据需要修改Akka的配置文件目录config下的application.conf。以下是我配置的内容,仅供参考:
actor {
provider = "akka.remote.RemoteActorRefProvider"
typed {
# Default timeout for typed actor methods with non-void return type
timeout = 6000s
}
}
remote {
transport = "akka.remote.netty.NettyRemoteTransport"
netty.tcp {
hostname
= "服务端IP"
port = 2552
}
客户端使用时只需要服务契约Trait和相关实体类,以下是我写的一个客户端调用的类,仅供参考:
1
object RemoteService {
2
val logger = LoggerFactory.getLogger(
this
.getClass())
3
private
var system: ActorSystem =
null
4
5
def apply(configFile: String) =
{
6
system = ActorSystem("RemoteService", ConfigFactory.parseFile(
new
File(configFile)))
7
}
8
9
def findPatentFulltext(serverIp: String, patentFulltextArgs: PatentFulltextArgs) =
{
10
TypedActor(system).typedActorOf(TypedProps[com.cloud.akka.service.model.PatentFulltextService], system.actorFor("akka.tcp://serivce@" + serverIp + ":2552/user/patentfulltext"
)).find(patentFulltextArgs)
11
12
}
13
14
def shutdown =
{
15
if
(
null
!=
system) system.shutdown()
16
}
17
}}
以下问题是我还没找到合适的解决办法:
1.Akka无法传输大文件,即使修改配置,服务器可以返回,但是接收的客户端还会报错。我的解决方案是在客户端分块读取,然后合并。
2.在客户端使用时,TypedActor没有找到使用ActorSelection构建,因为ActorFor是标记为
Deprecated。
简易的分布式文件系统