作者:坚韧稻草 | 来源:互联网 | 2023-02-04 17:51
问题标题可能不太贴切,因为我想实现的功能不止一项:我想根据调用者发送的请求头(headers)对其进行授权,并把授权得到的信息传递给 gRPC 方法处理程序。难点在于授权过程是异步的。我最终写出了下面的代码:
/**
 * A gRPC [[ServerInterceptor]] that runs an asynchronous check on the request headers before
 * the call is handed to the next handler.
 *
 * `f` receives the call's [[Metadata]] and eventually yields either
 *  - `Left(status)`: the call is closed with that status, or
 *  - `Right((key, value))`: the next handler is started through `Contexts.interceptCall`
 *    with a [[Context]] in which `key` is bound to `value`.
 *
 * Listener callbacks that arrive before `f` completes are stashed and replayed, in arrival
 * order, once the next handler has been started.
 *
 * NOTE(review): `io.grpc.Context` is bound to the current thread. A handler that reads the key
 * after hopping to another thread (e.g. inside a `Future` body) must propagate the Context
 * itself — confirm how the service implementations read it.
 *
 * @param f      asynchronous authorisation function applied to the request headers
 * @param system actor system whose dispatcher runs the callbacks attached to `f`'s result
 * @tparam A     type of the value stored under the context key
 */
case class AsyncContextawareInterceptor[A](
    f: Metadata => Future[Either[Status, (Context.Key[A], A)]]
)(implicit val system: ActorSystem)
    extends ServerInterceptor
    with AnyLogging {

  import system.dispatcher

  /** Listener events that may have to be buffered until authorisation finishes. */
  sealed trait Msg
  case object HalfClose extends Msg
  case object Cancel extends Msg
  case object Complete extends Msg
  case object Ready extends Msg
  case class Message[T](msg: T) extends Msg

  override def interceptCall[ReqT, RespT](
      call: ServerCall[ReqT, RespT],
      headers: Metadata,
      next: ServerCallHandler[ReqT, RespT]
  ): ServerCall.Listener[ReqT] = {
    // Capture the Context while still on the thread that entered interceptCall. The Future
    // callback below runs on the dispatcher, where Context.current() is not this call's context.
    val callContext = Context.current()

    new ServerCall.Listener[ReqT] {
      // Guards `stash`, `interceptor` and `rejected`. Listener callbacks and the Future callback
      // run on different threads; without mutual exclusion a callback could observe
      // `interceptor` as empty, lose the race with the drain, and strand its message forever.
      private val lock = new Object
      private val stash = new java.util.ArrayDeque[Msg]()
      private var interceptor: Option[ServerCall.Listener[ReqT]] = None
      private var rejected = false

      /** Forwards `msg` to the real listener if it exists, otherwise buffers it. */
      private def enqueueAndProcess(msg: Msg): Unit = lock.synchronized {
        if (interceptor.isDefined) processMessage(msg)
        else if (rejected) () // the call was closed without starting a handler: nothing to notify
        else {
          stash.add(msg)
          ()
        }
      }

      /** Replays one event on the real listener. Must be called with `lock` held. */
      private def processMessage(msg: Msg): Unit = msg match {
        case HalfClose                     => interceptor.foreach(_.onHalfClose())
        case Cancel                        => interceptor.foreach(_.onCancel())
        case Complete                      => interceptor.foreach(_.onComplete())
        case Ready                         => interceptor.foreach(_.onReady())
        case Message(msg: ReqT @unchecked) => interceptor.foreach(_.onMessage(msg))
      }

      /** Drains the stash in arrival order. Must be called with `lock` held. */
      private def processMessages(): Unit =
        while (!stash.isEmpty) processMessage(stash.poll())

      /** Closes the call with `status` and releases anything buffered for it. */
      private def reject(status: Status): Unit = {
        lock.synchronized {
          rejected = true
          stash.clear()
        }
        call.close(status, new Metadata())
      }

      override def onHalfClose(): Unit = enqueueAndProcess(HalfClose)
      override def onCancel(): Unit = enqueueAndProcess(Cancel)
      override def onComplete(): Unit = enqueueAndProcess(Complete)
      override def onReady(): Unit = enqueueAndProcess(Ready)
      override def onMessage(message: ReqT): Unit = enqueueAndProcess(Message(message))

      f(headers)
        .map {
          case Right((k, v)) =>
            val context = callContext.withValue(k, v)
            // Install the real listener and replay the backlog atomically, so that callbacks
            // arriving meanwhile queue up behind the stashed ones instead of overtaking them.
            lock.synchronized {
              interceptor = Some(Contexts.interceptCall(context, call, headers, next))
              processMessages()
            }
          case Left(status) =>
            reject(status)
        }
        .recover {
          // Deliberately broad: whatever the failure, the call must be closed, otherwise the
          // client would wait until its deadline.
          case t: Throwable =>
            log.error(t, "AsyncContextawareInterceptor future failed")
            reject(Status.fromThrowable(t))
        }
    }
  }
}
/** Token-based authorisation built on [[AsyncContextawareInterceptor]]. */
object AuthInterceptor {

  /** Context key under which the resolved bot id is stored for the method handlers. */
  val BOTID_CONTEXT_KEY: Context.Key[Int] = Context.key[Int]("botId")

  /** Request header that carries the caller's token. */
  val TOKEN_HEADER_KEY: Metadata.Key[String] =
    Metadata.Key.of[String]("token", Metadata.ASCII_STRING_MARSHALLER)

  /**
   * Builds an interceptor that reads the `token` header, resolves it to a bot id with
   * `resolver`, and binds the id to [[BOTID_CONTEXT_KEY]] for the call.
   *
   * The call is rejected with `Status.PERMISSION_DENIED` when the header is absent or when
   * `resolver` yields `None`.
   *
   * @param resolver asynchronous lookup from token to bot id; `None` means the token is unknown
   * @param system   actor system whose dispatcher runs the asynchronous steps
   * @return the interceptor to register on the gRPC server
   */
  def authInterceptor(
      resolver: String => Future[Option[Int]]
  )(implicit system: ActorSystem): ServerInterceptor =
    AsyncContextawareInterceptor[Int] { metadata =>
      import system.dispatcher

      val resolvedBotId = for {
        // Metadata.get returns null for a missing header; Option(...) turns that into None.
        token <- OptionT.fromOption[Future](Option(metadata.get(TOKEN_HEADER_KEY)))
        botId <- OptionT(resolver(token))
      } yield botId

      resolvedBotId.value.map {
        case Some(id) => Right(BOTID_CONTEXT_KEY -> id)
        case None     => Left(Status.PERMISSION_DENIED)
      }
    }
}
这段代码可以运行(我的意思是,运行时没有抛出异常 :)),但当我在方法处理程序中调用 `AuthInterceptor.BOTID_CONTEXT_KEY.get` 时,它返回 `null`。
也许,有一种更好的方法来处理异步的东西?