确认交付的Akka持久性给出了不一致的结果

 幸福欧旭旭_320 发布于 2022-12-04 03:44

我一直在玩Akka Persistence,并编写了以下程序来测试我的理解.问题是每次运行此程序时都会得到不同的结果.正确的答案是49995000,但我并不总是这样.我已经清理了每次运行之间的日志目录,但它没有任何区别.任何人都可以看到出了什么问题?程序简单地将从1到n的所有数字相加(其中n在下面的代码中为9999).

正确的答案是:(n*(n + 1))/ 2.对于n = 9999,这是49995000.

编辑:似乎与JDK 8更一致地工作,而不是JDK 7.我应该只使用JDK 8吗?

package io.github.ourkid.akka.aggregator.guaranteed

import akka.actor.Actor
import akka.actor.ActorPath
import akka.actor.ActorSystem
import akka.actor.Props
import akka.actor.actorRef2Scala
import akka.persistence.AtLeastOnceDelivery
import akka.persistence.PersistentActor

case class ExternalRequest(updateAmount : Int)
case class CountCommand(deliveryId : Long, updateAmount : Int)
case class Confirm(deliveryId : Long)

sealed trait Evt
case class CountEvent(updateAmount : Int) extends Evt
case class ConfirmEvent(deliveryId : Long) extends Evt

class TestGuaranteedDeliveryActor(counter : ActorPath) extends PersistentActor with AtLeastOnceDelivery {

  override def persistenceId = "persistent-actor-ref-1"

  override def receiveCommand : Receive = {
    case ExternalRequest(updateAmount) => persist(CountEvent(updateAmount))(updateState)
    case Confirm(deliveryId) => persist(ConfirmEvent(deliveryId)) (updateState)
  }

  override def receiveRecover : Receive = {
    case evt : Evt => updateState(evt)
  }

  def updateState(evt:Evt) = evt match {
    case CountEvent(updateAmount) => deliver(counter, id => CountCommand(id, updateAmount))
    case ConfirmEvent(deliveryId) => confirmDelivery(deliveryId)
  }
}

class FactorialActor extends Actor {
  var count = 0
  def receive = {
    case CountCommand(deliveryId : Long, updateAmount:Int) => {
      count = count + updateAmount
      sender() ! Confirm(deliveryId)
    }
    case "print" => println(count)
  }
}

object GuaranteedDeliveryTest extends App {
  val system = ActorSystem()

  val factorial = system.actorOf(Props[FactorialActor])

  val delActor = system.actorOf(Props(classOf[TestGuaranteedDeliveryActor], factorial.path))

  import system.dispatcher

  system.scheduler.schedule(0 seconds, 2 seconds) { factorial ! "print" }

  for (i <- 1 to 9999)
    delActor ! ExternalRequest(i) 



}

SBT文件

name := "akka_aggregator"

organization := "io.github.ourkid"

version := "0.0.1-SNAPSHOT"

scalaVersion := "2.11.4"

scalacOptions ++= Seq("-unchecked", "-deprecation")

resolvers ++= Seq(
    "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/"
)

val Akka  = "2.3.7"
val Spray = "1.3.2"

libraryDependencies ++= Seq(
     // Core Akka
    "com.typesafe.akka" %% "akka-actor" % Akka,
    "com.typesafe.akka" %% "akka-cluster" % Akka,
    "com.typesafe.akka" %% "akka-persistence-experimental" % Akka,
    "org.iq80.leveldb" % "leveldb" % "0.7",
    "org.fusesource.leveldbjni" % "leveldbjni-all" % "1.8",

    // For future REST API
    "io.spray" %% "spray-httpx" % Spray,
    "io.spray" %% "spray-can" % Spray,
    "io.spray" %% "spray-routing" % Spray,
    "org.typelevel" %% "scodec-core" % "1.3.0",

    // CSV reader    
    "net.sf.opencsv" % "opencsv" % "2.3",

    // Logging
    "com.typesafe.akka" %% "akka-slf4j" % Akka,
    "ch.qos.logback" % "logback-classic" % "1.0.13",

    // Testing
    "org.scalatest" %% "scalatest" % "2.2.1" % "test",
    "com.typesafe.akka" %% "akka-testkit" % Akka % "test",
    "io.spray" %% "spray-testkit" % Spray % "test",
    "org.scalacheck" %% "scalacheck" % "1.11.6" % "test"
)
fork := true
mainClass in assembly := Some("io.github.ourkid.akka.aggregator.TestGuaranteedDeliveryActor")

application.conf文件

##########################################
# Akka Persistence Reference Config File #
##########################################

akka {

  # Loggers to register at boot time (akka.event.Logging$DefaultLogger logs
  # to STDOUT)
  loggers = ["akka.event.slf4j.Slf4jLogger"]

  # Log level used by the configured loggers (see "loggers") as soon
  # as they have been started; before that, see "stdout-loglevel"
  # Options: OFF, ERROR, WARNING, INFO, DEBUG
  loglevel = "DEBUG"

  # Log level for the very basic logger activated during ActorSystem startup.
  # This logger prints the log messages to stdout (System.out).
  # Options: OFF, ERROR, WARNING, INFO, DEBUG
  stdout-loglevel = "INFO"

  # Filter of log events that is used by the LoggingAdapter before
  # publishing log events to the eventStream.
  logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"

  # Protobuf serialization for persistent messages
  actor {

    serializers {

      akka-persistence-snapshot = "akka.persistence.serialization.SnapshotSerializer"
      akka-persistence-message = "akka.persistence.serialization.MessageSerializer"
    }

    serialization-bindings {

      "akka.persistence.serialization.Snapshot" = akka-persistence-snapshot
      "akka.persistence.serialization.Message" = akka-persistence-message
    }
  }

  persistence {

    journal {

      # Maximum size of a persistent message batch written to the journal.
      max-message-batch-size = 200

      # Maximum size of a deletion batch written to the journal.
      max-deletion-batch-size = 10000

      # Path to the journal plugin to be used
      plugin = "akka.persistence.journal.leveldb"

      # In-memory journal plugin.
      inmem {

        # Class name of the plugin.
        class = "akka.persistence.journal.inmem.InmemJournal"

        # Dispatcher for the plugin actor.
        plugin-dispatcher = "akka.actor.default-dispatcher"
      }

      # LevelDB journal plugin.
      leveldb {

        # Class name of the plugin.
        class = "akka.persistence.journal.leveldb.LeveldbJournal"

        # Dispatcher for the plugin actor.
        plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"

        # Dispatcher for message replay.
        replay-dispatcher = "akka.persistence.dispatchers.default-replay-dispatcher"

        # Storage location of LevelDB files.
        dir = "journal"

        # Use fsync on write
        fsync = on

        # Verify checksum on read.
        checksum = off

        # Native LevelDB (via JNI) or LevelDB Java port
        native = on
        # native = off
      }

      # Shared LevelDB journal plugin (for testing only).
      leveldb-shared {

        # Class name of the plugin.
        class = "akka.persistence.journal.leveldb.SharedLeveldbJournal"

        # Dispatcher for the plugin actor.
        plugin-dispatcher = "akka.actor.default-dispatcher"

        # timeout for async journal operations
        timeout = 10s

        store {

          # Dispatcher for shared store actor.
          store-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"

          # Dispatcher for message replay.
          replay-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"

          # Storage location of LevelDB files.
          dir = "journal"

          # Use fsync on write
          fsync = on

          # Verify checksum on read.
          checksum = off

          # Native LevelDB (via JNI) or LevelDB Java port
          native = on
        }
      }
    }

    snapshot-store {

      # Path to the snapshot store plugin to be used
      plugin = "akka.persistence.snapshot-store.local"

      # Local filesystem snapshot store plugin.
      local {

        # Class name of the plugin.
        class = "akka.persistence.snapshot.local.LocalSnapshotStore"

        # Dispatcher for the plugin actor.
        plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"

        # Dispatcher for streaming snapshot IO.
        stream-dispatcher = "akka.persistence.dispatchers.default-stream-dispatcher"

        # Storage location of snapshot files.
        dir = "snapshots"
      }
    }

    view {

      # Automated incremental view update.
      auto-update = on

      # Interval between incremental updates
      auto-update-interval = 5s

      # Maximum number of messages to replay per incremental view update. Set to
      # -1 for no upper limit.
      auto-update-replay-max = -1
    }

    at-least-once-delivery {
      # Interval between redelivery attempts
      redeliver-interval = 5s

      # Maximum number of unconfirmed messages that will be sent in one redelivery burst
      redelivery-burst-limit = 10000

      # After this number of delivery attempts a `ReliableRedelivery.UnconfirmedWarning`
      # message will be sent to the actor.
      warn-after-number-of-unconfirmed-attempts = 5

      # Maximum number of unconfirmed messages that an actor with AtLeastOnceDelivery is
      # allowed to hold in memory.
      max-unconfirmed-messages = 100000
    }

    dispatchers {
      default-plugin-dispatcher {
        type = PinnedDispatcher
        executor = "thread-pool-executor"
      }
      default-replay-dispatcher {
        type = Dispatcher
        executor = "fork-join-executor"
        fork-join-executor {
          parallelism-min = 2
          parallelism-max = 8
        }
      }
      default-stream-dispatcher {
        type = Dispatcher
        executor = "fork-join-executor"
        fork-join-executor {
          parallelism-min = 2
          parallelism-max = 8
        }
      }
    }
  }
}

正确输出:

18:02:36.684 [default-akka.actor.default-dispatcher-3] INFO  akka.event.slf4j.Slf4jLogger - Slf4jLogger started
18:02:36.684 [default-akka.actor.default-dispatcher-3] DEBUG akka.event.EventStream - logger log1-Slf4jLogger started
18:02:36.684 [default-akka.actor.default-dispatcher-3] DEBUG akka.event.EventStream - Default Loggers started
0
18:02:36.951 [default-akka.actor.default-dispatcher-14] DEBUG a.s.Serialization(akka://default) - Using serializer[akka.persistence.serialization.MessageSerializer] for message [akka.persistence.PersistentImpl]
18:02:36.966 [default-akka.actor.default-dispatcher-3] DEBUG a.s.Serialization(akka://default) - Using serializer[akka.serialization.JavaSerializer] for message [io.github.ourkid.akka.aggregator.guaranteed.CountEvent]
3974790
24064453
18:02:42.313 [default-akka.actor.default-dispatcher-11] DEBUG a.s.Serialization(akka://default) - Using serializer[akka.serialization.JavaSerializer] for message [io.github.ourkid.akka.aggregator.guaranteed.ConfirmEvent]
49995000
49995000
49995000
49995000

运行不正确:

17:56:22.493 [default-akka.actor.default-dispatcher-4] INFO  akka.event.slf4j.Slf4jLogger - Slf4jLogger started
17:56:22.508 [default-akka.actor.default-dispatcher-4] DEBUG akka.event.EventStream - logger log1-Slf4jLogger started
17:56:22.508 [default-akka.actor.default-dispatcher-4] DEBUG akka.event.EventStream - Default Loggers started
0
17:56:22.750 [default-akka.actor.default-dispatcher-2] DEBUG a.s.Serialization(akka://default) - Using serializer[akka.persistence.serialization.MessageSerializer] for message [akka.persistence.PersistentImpl]
17:56:22.765 [default-akka.actor.default-dispatcher-7] DEBUG a.s.Serialization(akka://default) - Using serializer[akka.serialization.JavaSerializer] for message [io.github.ourkid.akka.aggregator.guaranteed.CountEvent]
3727815
22167811
17:56:28.391 [default-akka.actor.default-dispatcher-3] DEBUG a.s.Serialization(akka://default) - Using serializer[akka.serialization.JavaSerializer] for message [io.github.ourkid.akka.aggregator.guaranteed.ConfirmEvent]
49995000
51084018
51084018
52316760
52316760
52316760
52316760
52316760

另一个错误的运行

17:59:12.122 [default-akka.actor.default-dispatcher-3] INFO  akka.event.slf4j.Slf4jLogger - Slf4jLogger started
17:59:12.137 [default-akka.actor.default-dispatcher-3] DEBUG akka.event.EventStream - logger log1-Slf4jLogger started
17:59:12.137 [default-akka.actor.default-dispatcher-3] DEBUG akka.event.EventStream - Default Loggers started
0
17:59:12.387 [default-akka.actor.default-dispatcher-7] DEBUG a.s.Serialization(akka://default) - Using serializer[akka.persistence.serialization.MessageSerializer] for message [akka.persistence.PersistentImpl]
17:59:12.402 [default-akka.actor.default-dispatcher-13] DEBUG a.s.Serialization(akka://default) - Using serializer[akka.serialization.JavaSerializer] for message [io.github.ourkid.akka.aggregator.guaranteed.CountEvent]
2982903
17710176
49347145
17:59:18.204 [default-akka.actor.default-dispatcher-13] DEBUG a.s.Serialization(akka://default) - Using serializer[akka.serialization.JavaSerializer] for message [io.github.ourkid.akka.aggregator.guaranteed.ConfirmEvent]
51704199
51704199
55107844
55107844
55107844
55107844

dk14.. 10

你正在使用AtLeastOnceDelivery语义.正如它在这里所说:

注意至少一次传送意味着并不总是保留原始邮件发送顺序,并且目标可能会收到重复的邮件.这意味着语义与普通的ActorRef发送操作不匹配:

由于崩溃后可能重新发送并且目标消息的重新启动仍然传递给新的演员化身,因此不会保留同一发送者 - 接收者对的最多一次传递消息顺序这些语义类似于ActorPath表示(参见Actor Lifecycle),因此在传递消息时需要提供路径而不是引用.消息通过actor选择发送到路径.

因此可能不止一次收到一些数字.您可以忽略内部的重复数字FactorialActor或不使用此语义.

1 个回答
  • 你正在使用AtLeastOnceDelivery语义.正如它在这里所说:

    注意至少一次传送意味着并不总是保留原始邮件发送顺序,并且目标可能会收到重复的邮件.这意味着语义与普通的ActorRef发送操作不匹配:

    由于崩溃后可能重新发送并且目标消息的重新启动仍然传递给新的演员化身,因此不会保留同一发送者 - 接收者对的最多一次传递消息顺序这些语义类似于ActorPath表示(参见Actor Lifecycle),因此在传递消息时需要提供路径而不是引用.消息通过actor选择发送到路径.

    因此可能不止一次收到一些数字.您可以忽略内部的重复数字FactorialActor或不使用此语义.

    2022-12-11 02:56 回答
撰写答案
今天,你开发时遇到什么问题呢?
立即提问
热门标签
PHP1.CN | 中国最专业的PHP中文社区 | PNG素材下载 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有