企业🤖AI Agent构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
Twitter最重要的标准库是 [Util](http://github.com/twitter/util) 和 [Finagle](https://github.com/twitter/finagle)。Util 可以理解为Scala和Java的标准库扩展,提供了标准库中没有的功能或已有功能的更合适的实现。Finagle 是我们的RPC系统,核心分布式系统组件。 ### Future Futures已经在并发一节中简单讨论过。它是调异步处理的中心机制,渗透在我们代码库中,也是Finagle的核心。Futures允许组合并发事件,简化了高并发操作。也是JVM上异步并发的一种高效的实现。 Twitter的future是*异步*的,所以基本上任何操作(阻塞操作)——基本上任何可以suspend它的线程的执行;网络IO和磁盘IO是就是例子——必须由系统处理,它为结果提供future。Finagle为网络IO提供了这样一种系统。 Futures清晰简单:它们持有一个尚未完成运算结果的 promise 。它们是一个简单的容器——一个占位符。一次计算当然可能会失败,这种状况必须被编码:一个Future可以是三种状态之一: pending, failed, completed。 ### 闲话: *组合(composition)* 让我们重新审视我们所说的组合:将简单的组件合成一个更复杂的。函数组合的一个权威的例子:给定函数 f 和 g,组合函数 (g∘f)(x) = g(f(x)) ——结果先对 x使用f函数,然后在使用g函数——用Scala来写: ~~~ val f = (i: Int) => i.toString val g = (s: String) => s+s+s val h = g compose f // : Int => String scala> h(123) res0: java.lang.String = 123123123 ~~~ 复合函数h,是个新的函数,由之前定义的f和g函数合成。 Futures是一种集合类型——它是个包含0或1个元素的容器——你可以发现他们有标准的集合方法(eg:map, filter, foreach)。因为Future的值是延迟的,结果应用这些方法中的任何一种必然也延迟;在 ~~~ val result: Future[Int] val resultStr: Future[String] = result map { i => i.toString } ~~~ 函数 { i => i.toString } 不会被调用,直到int值可用;转换集合的resultStr在可用之前也一直是待定状态。 List可以被扁平化(flattened): ~~~ val listOfList: List[List[Int]] = .. val list: List[Int] = listOfList.flatten ~~~ 这对future也是有意义的: ~~~ val futureOfFuture: Future[Future[Int]] = .. val future: Future[Int] = futureOfFuture.flatten ~~~ 因为future是延迟的,flatten的实现——立即返回——不得不返回一个等待外部future (`**Future[**Future[Int]**]**`) 完成的future (`Future[**Future[Int]**]`).如果外部future失败,内部flattened future也将失败。 Future (类似List) 也定义了flatMap;Future[A] 定义方法flatMap的签名 ~~~ flatMap[B](f: A => Future[B]): Future[B] ~~~ 如同组合 map 和 flatten,我们可以这样实现: ~~~ def flatMap[B](f: A => Future[B]): Future[B] = { val mapped: Future[Future[B]] = this map f val flattened: Future[B] = mapped.flatten flattened } ~~~ 这是一种有威力的组合!使用flatMap我们可以定义一个 Future 作为两个Future序列的结果。第二个future 的计算基于第一个的结果。想象我们需要2次RPC调用来验证一个用户身份,我们可以用下面的方式组合操作: ~~~ def getUser(id: Int): Future[User] def authenticate(user: User): Future[Boolean] def isIdAuthed(id: Int): Future[Boolean] = getUser(id) flatMap { user => authenticate(user) } ~~~ 这种组合类型的一个额外的好处是错误处理是内置的:如果getUser(..)或authenticate(..)失败,future 从 isAuthred(..)返回时将会失败。这里我们没有额外的错误处理的代码。 #### 风格 Future回调方法(respond, onSuccess, onFailure, ensure) 返回一个新的Future,并链接到调用者。这个Future被保证只有在它调用者完成后才完成,使用模式如下: ~~~ acquireResource() future onSuccess { value => computeSomething(value) } ensure { freeResource() } ~~~ freeResource() 被保证只有在 computeSomething之后才执行,这样就模拟了try-finally 模式。 使用 onSuccess替代 foreach —— 它与 onFailure 方法对称,命名的意图更明确,并且也允许 chaining。 永远避免直接创建Promise实例: 几乎每一个任务都可以通过使用预定义的组合子完成。这些组合子确保错误和取消是可传播的, 通常鼓励的数据流风格的编程,不再需要同步和volatility声明。 用尾递归风格编写的代码不再导致堆栈空间泄漏,并使得以数据流风格高效的实现循环成为可能: ~~~ case class Node(parent: Option[Node], ...) def getNode(id: Int): Future[Node] = ... def getHierarchy(id: Int, nodes: List[Node] = Nil): Future[Node] = getNode(id) flatMap { case n@Node(Some(parent), ..) => getHierarchy(parent, n :: nodes) case n => Future.value((n :: nodes).reverse) } ~~~ Future定义很多有用的方法: 使用 Future.value() 和 Future.exception() 来创建未满意(pre-satisfied) 的future。Future.collect(), Future.join() 和 Future.select() 提供了组合子将多个future合成一个(例如:scatter-gather操作的gather部分)。 #### Cancellation Future实现了一种弱形式的取消。调用Future#cancel 不会直接终止运算,而是发送某个级别的可被任何处理查询的触发信号,最终满足这个future。Cancellation信号流向相反的方向:一个由消费者设置的cancellation信号,会传播到它的生产者。生产者使用 Promise的onCancellation来监听信号并执行相应的动作。 这意味这cancellation语意上依赖生产者,没有默认的实现。cancellation只是一个提示。 #### Local Util的[Local](https://github.com/twitter/util/blob/master/util-core/src/main/scala/com/twitter/util/Local.scala#L40)提供了一个位于特定的future派发树(dispatch tree)的引用单元(cell)。设定一个local的值,使这个值可以用于被同一个线程的Future 延迟的任何计算。有一些类似于thread locals(注:Java中的线程机制),不同的是它们的范围不是一个Java线程,而是一个 future 线程树。在 ~~~ trait User { def name: String def incrCost(points: Int) } val user = new Local[User] ... user() = currentUser rpc() ensure { user().incrCost(10) } ~~~ 在 ensure块中的 user() 将在回调被添加的时候引用 user local的值。 就thread locals来说,我们的Locals非常的方便,但要尽量避免使用:除非确信通过显式传递数据时问题不能被充分的解决,哪怕解决起来有些繁重。 Locals有效的被核心库使用在非常常见的问题上——线程通过RPC跟踪,传播监视器,为future的回调创建stack traces——任何其他解决方法都使得用户负担过度。Locals在几乎任何其他情况下都不适合。 ### Offer/Broker 并发系统由于需要协调访问数据和资源而变得复杂。[Actor](http://www.scala-lang.org/api/current/scala/actors/Actor.html)提出一种简化的策略:每一个actor是一个顺序的进程(process),保持自己的状态和资源,数据通过消息的方式与其它actor共享。 共享数据需要actor之间通信。 Offer/Broker 建立于Actor之上,以这三种重要的方式表现:1,通信通道(Brokers)是first class——即发送消息需要通过Brokers,而非直接到actor。2, Offer/Broker 是一种同步机制:通信会话是同步的。 这意味我们可以用 Broker做为协调机制:当进程a发送一条信息给进程b;a和b都要对系统状态达成一致。3, 最后,通信可以选择性地执行:一个进程可以提出几个不同的通信,其中的一个将被获取。 为了以一种通用的方式支持选择性通信(以及其他组合),我们需要将通信的描述和执行解耦。这正是Offer做的——它是一个持久数据用于描述一次通信;为了执行这个通信(offer执行),我们通过它的sync()方法同步 ~~~ trait Offer[T] { def sync(): Future[T] } ~~~ 返回 Future[T] 当通信被获取的时候生成交换值。 Broker通过offer协调值的交换——它是通信的通道: ~~~ trait Broker[T] { def send(msg: T): Offer[Unit] val recv: Offer[T] } ~~~ 所以,当创建两个offer ~~~ val b: Broker[Int] val sendOf = b.send(1) val recvOf = b.recv ~~~ sendOf和recvOf都同步 ~~~ // In process 1: sendOf.sync() // In process 2: recvOf.sync() ~~~ 两个offer都获取并且值1被交换。 通过将多个offer和Offer.choose绑定来执行可选择通信。 ~~~ def choose[T](ofs: Offer[T]*): Offer[T] ~~~ 上面的代码生成一个新的offer,当同步时获取一个特定的ofs——第一个可用的。当多个都立即可用时,随机获取一个。 Offer对象有些一次性的Offers用于与来自Broker的Offer构建。 ~~~ Offer.timeout(duration): Offer[Unit] ~~~ offer在给定时间后激活。Offer.never将用于不会有效,Offer.const(value)在给定值后立即有效。这些操作由选择性通信来组合是非常有用的。例如,在一个send操作中使用超时: ~~~ Offer.choose( Offer.timeout(10.seconds), broker.send("my value") ).sync() ~~~ 人们可能会比较 Offer/Broker 与[SynchronousQueue](http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/SynchronousQueue.html),他们有细微但非常重要的区别。Offer可以被组合,而queue不能。例如,考虑一组queues,描述为 Brokers: ~~~ val q0 = new Broker[Int] val q1 = new Broker[Int] val q2 = new Broker[Int] ~~~ 现在让我们为读取创建一个合并的queue ~~~ val anyq: Offer[Int] = Offer.choose(q0.recv, q1.recv, q2.recv) ~~~ anyq是一个将从第一个可用的queue中读取的offer。注意 anyq 仍是同步的——我们仍然拥有底层队列的语义。这类组合是不可能用queue实现的。 #### 例子:一个简单的连接池 连接池在网络应用中很常见,并且它们的实现常常需要技巧——例如,在从池中获取一个连接的时候,通常需要超时机制,因为不同的客户端有不同的延迟需求。池的简单原则:维护一个连接队列,满足那些进入的等待者。使用传统的同步原语,这通常需要两个队列(queues):一个用于等待者(当没有连接可用时),一个用于连接(当没有等待者时)。 使用 Offer/Brokers ,可以表达得非常自然: ~~~ class Pool(conns: Seq[Conn]) { private[this] val waiters = new Broker[Conn] private[this] val returnConn = new Broker[Conn] val get: Offer[Conn] = waiters.recv def put(c: Conn) { returnConn ! c } private[this] def loop(connq: Queue[Conn]) { Offer.choose( if (connq.isEmpty) Offer.never else { val (head, rest) = connq.dequeue waiters.send(head) { _ => loop(rest) } }, returnConn.recv { c => loop(connq enqueue c) } ).sync() } loop(Queue.empty ++ conns) } ~~~ loop总是提供一个归还的连接,但只有queue非空的时候才会send。 使用持久化队列(persistent queue)更进一步简化逻辑。与连接池的接口也是通过Offer实现,所以调用者如果愿意设置timeout,他们可以通过利用组合子(combinators)来做: ~~~ val conn: Future[Option[Conn]] = Offer.choose( pool.get { conn => Some(conn) }, Offer.timeout(1.second) { _ => None } ).sync() ~~~ 实现timeout不需要额外的记账(bookkeeping);这是因为Offer的语义:如果Offer.timeout被选择,不会再有offer从池中获得——连接池和它的调用者在各自waiter的broker上不必同时同意接受和发送。 #### 埃拉托色尼筛子(Sieve of Eratosthenes 译注:一种用于筛选素数的算法) 把并发程序构造为一组顺序的同步通信进程,通常很有用——有时程序被大大地简化了。Offer和Broker提供了一组工具来让它简单并一致。确实,它们的应用超越了我们可能认为是经典并发性问题——并发编程(有Offer/Broker的辅助)是一种有用的构建工具,正如子例程(subroutines),类,和模块都是——来自CSP(译注:Communicating sequential processes的缩写,即通信顺序进程)的重要思想。 这里有一个[埃拉托色尼筛子](http://ja.wikipedia.org/wiki/%E3%82%A8%E3%83%A9%E3%83%88%E3%82%B9%E3%83%86%E3%83%8D%E3%82%B9%E3%81%AE%E7%AF%A9)可以构造为一个针对一个整数流(stream of integers)的连续的应用过滤器 。首先,我们需要一个整数的源(source of integers): ~~~ def integers(from: Int): Offer[Int] = { val b = new Broker[Int] def gen(n: Int): Unit = b.send(n).sync() ensure gen(n + 1) gen(from) b.recv } ~~~ integers(n) 方法简单地提供了从n开始的所有连续的整数。然后我们需要一个过滤器: ~~~ def filter(in: Offer[Int], prime: Int): Offer[Int] = { val b = new Broker[Int] def loop() { in.sync() onSuccess { i => if (i % prime != 0) b.send(i).sync() ensure loop() else loop() } } loop() b.recv } ~~~ filter(in, p) 方法返回的offer删除了in中的所有质数(prime)的倍数。最终我们定义了我们的筛子(sieve): ~~~ def sieve = { val b = new Broker[Int] def loop(of: Offer[Int]) { for (prime <- of.sync(); _ <- b.send(prime).sync()) loop(filter(of, prime)) } loop(integers(2)) b.recv } ~~~ loop() 工作很简单:从of中读取下一个质数,然后对of应用过滤器排除这个质数。loop不断的递归,持续的质数被过滤,于是我们得到了筛选结果。我们现在打印前10000个质数: ~~~ val primes = sieve 0 until 10000 foreach { _ => println(primes.sync()()) } ~~~ 除了构造简单,组件正交,这种做法也给你一种流式筛子(streaming sieve):你不需要事先计算出你感兴趣的质数集合,从而进一步提高了模块化。