热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Scala(java)grpcasync拦截器状态传播

如何解决《Scala(java)grpcasync拦截器状态传播》经验,需要怎么解决?

问题标题可能不那么有用,因为我正在尝试实现各种功能.我想根据他发送的标题授权调用者,并将此信息传播给gRPC方法处理程序.问题在于授权过程的异步性质.我最终得到了这个:

case class AsyncContextawareInterceptor[A](
    f: Metadata ? Future[Either[Status, (Context.Key[A], A)]]
)(implicit val system: ActorSystem)
    extends ServerInterceptor
    with AnyLogging {
  import system.dispatcher

  sealed trait Msg
  case object HalfClose extends Msg
  case object Cancel extends Msg
  case object Complete extends Msg
  case object Ready extends Msg
  case class Message[T](msg: T) extends Msg

  override def interceptCall[ReqT, RespT](call: ServerCall[ReqT, RespT],
                                          headers: Metadata,
                                          next: ServerCallHandler[ReqT, RespT]): ServerCall.Listener[ReqT] =
    new ServerCall.Listener[ReqT] {
      private val stash = new java.util.concurrent.ConcurrentLinkedQueue[Msg]()
      private var interceptor: Option[ServerCall.Listener[ReqT]] = None

      private def enqueueAndProcess(msg: Msg) =
        if (interceptor.isDefined) processMessage(msg) else stash.add(msg)

      private def processMessage(msg: Msg) = msg match {
        case HalfClose ? interceptor.foreach(_.onHalfClose)
        case Cancel ? interceptor.foreach(_.onCancel)
        case Complete ? interceptor.foreach(_.onComplete)
        case Ready ? interceptor.foreach(_.onReady)
        case Message(msg: ReqT @unchecked) ? interceptor.foreach(_.onMessage(msg))
      }

      private def processMessages() = while (!stash.isEmpty) {
        Option(stash.poll).foreach(processMessage)
      }

      override def onHalfClose(): Unit = enqueueAndProcess(HalfClose)

      override def onCancel(): Unit = enqueueAndProcess(Cancel)

      override def onComplete(): Unit = enqueueAndProcess(Complete)

      override def onReady(): Unit = enqueueAndProcess(Ready)

      override def onMessage(message: ReqT): Unit = enqueueAndProcess(Message(message))

      f(headers).map {
        case Right((k, v)) ?
          val cOntext= Context.current.withValue(k, v)
          interceptor = Some(Contexts.interceptCall(context, call, headers, next))
          processMessages()
        case Left(status) ? call.close(status, new Metadata())
      }.recover {
        case t: Throwable ?
          log.error(t, "AsyncContextawareInterceptor future failed")
          call.close(Status.fromThrowable(t), new Metadata())
      }
    }
}

object AuthInterceptor {
  val BOTID_CONTEXT_KEY: Context.Key[Int] = Context.key[Int]("botId")
  val TOKEN_HEADER_KEY: Metadata.Key[String] = Metadata.Key.of[String]("token", Metadata.ASCII_STRING_MARSHALLER)

  def authInterceptor(resolver: String ? Future[Option[Int]])(implicit system: ActorSystem): ServerInterceptor =
    AsyncContextawareInterceptor { metadata ?
      import system.dispatcher
      (for {
        token ? OptionT.fromOption[Future](Option(metadata.get(TOKEN_HEADER_KEY)))
        botId ? OptionT(resolver(token))
      } yield botId).value.map {
        case Some(id) ? Right(BOTID_CONTEXT_KEY ? id)
        case None ? Left(Status.PERMISSION_DENIED)
      }
    }
}

这工作(我的意思是,运行没有例外:)),但当我AuthInterceptor.BOTID_CONTEXT_KEY.get在我的方法处理程序中,它产生null.

也许,有一种更好的方法来处理异步的东西?


推荐阅读
  • dataguard日志传输模式解析_SOFAJRaft 日志复制pipeline 实现剖析 | SOFAJRaft 实现原理
    SOFAStack(ScalableOpenFinancialArchitectureStack)是蚂蚁金服自主研发的金融级分布式架构,包 ... [详细]
  • java线程池的实现原理源码分析
    这篇文章主要介绍“java线程池的实现原理源码分析”,在日常操作中,相信很多人在java线程池的实现原理源码分析问题上存在疑惑,小编查阅了各式资 ... [详细]
  • 本文介绍了解决Netty拆包粘包问题的一种方法——使用特殊结束符。在通讯过程中,客户端和服务器协商定义一个特殊的分隔符号,只要没有发送分隔符号,就代表一条数据没有结束。文章还提供了服务端的示例代码。 ... [详细]
  • Tomcat/Jetty为何选择扩展线程池而不是使用JDK原生线程池?
    本文探讨了Tomcat和Jetty选择扩展线程池而不是使用JDK原生线程池的原因。通过比较IO密集型任务和CPU密集型任务的特点,解释了为何Tomcat和Jetty需要扩展线程池来提高并发度和任务处理速度。同时,介绍了JDK原生线程池的工作流程。 ... [详细]
  • 深入理解Kafka服务端请求队列中请求的处理
    本文深入分析了Kafka服务端请求队列中请求的处理过程,详细介绍了请求的封装和放入请求队列的过程,以及处理请求的线程池的创建和容量设置。通过场景分析、图示说明和源码分析,帮助读者更好地理解Kafka服务端的工作原理。 ... [详细]
  • ConsumerConfiguration在kafka0.9使用JavaConsumer替代了老版本的scalaConsumer。新版的配置如下:bootstrap. ... [详细]
  • rust编程这篇文章是关于我通过解决Twitch上尚未解决的所有CtCI问题来学习Rust的经验。英国科学博物馆集团AdaLovelace的肖像Rust徽标,由Moz ... [详细]
  • 透明木头问世!“木头大王”胡良兵再发顶刊,已成立公司加速落地69
    道翰天琼认知智能机器人平台API接口大脑为您揭秘。木材是人类最古老的建筑材料之一,也是一种绿色节能材料,我们对其外观的认知可谓根深蒂固。如今,随着透明木材的问世,这一观感将被颠覆。 ... [详细]
  • Flink(三)IDEA开发Flink环境搭建与测试
    一.IDEA开发环境1.pom文件设置1.8 ... [详细]
  • 在这分布式系统架构盛行的时代,很多互联网大佬公司开源出自己的分布式RPC系统框架,例如:阿里的dubbo,谷歌的gRPC,apache的Thrift。而在我们公司一直都在推荐使用d ... [详细]
  • 阿里云监控URL的配置笔记
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了阿里云监控URL的配置笔记相关的知识,希望对你有一定的参考价值。有很多细节需要记录 ... [详细]
  • 2018年人工智能大数据的爆发,学Java还是Python?
    本文介绍了2018年人工智能大数据的爆发以及学习Java和Python的相关知识。在人工智能和大数据时代,Java和Python这两门编程语言都很优秀且火爆。选择学习哪门语言要根据个人兴趣爱好来决定。Python是一门拥有简洁语法的高级编程语言,容易上手。其特色之一是强制使用空白符作为语句缩进,使得新手可以快速上手。目前,Python在人工智能领域有着广泛的应用。如果对Java、Python或大数据感兴趣,欢迎加入qq群458345782。 ... [详细]
  • 本文介绍了一道经典的状态压缩题目——关灯问题2,并提供了解决该问题的算法思路。通过使用二进制表示灯的状态,并枚举所有可能的状态,可以求解出最少按按钮的次数,从而将所有灯关掉。本文还对状压和位运算进行了解释,并指出了该方法的适用性和局限性。 ... [详细]
  • Istio是一个用来连接、管理和保护微服务的开放平台。Istio提供一种简单的方式来为已部署的服务建 ... [详细]
  • 协程asyncio_asyncio异步编程,你搞懂了吗?
    同步视频教程:asyncio异步编程-网易云课堂协程&异步编程(asyncio)协程(Coroutine),也可以被称为微线 ... [详细]
author-avatar
坚韧稻草
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有