构建自己的rpc protocol

rpc的的框架写过好几个了,协议部分大都是参照redis mysql mongodb 的协议来实现的,其中有一个已经在生产环境中运行了几个月了,到目前为止运行良好。用现有的一些协议来进行通信有一个好处就是,rpc的client部分只是简单的封装一下他们现有client 的实现就可以。不好的地方就是现有的协议有很多内容不是你真正想要的,比如我在用mongodb的协议在实现一个rpc的时候,我只用到了他的query请求,但是我要在server端实现跟query相关的一系列的协议处理。这并不是我想要的。

当然要完全实现自己的一套协议的话,需要做的也很多:要实现自己的client,rpc消息的打包解包啊之类的工作,和使用redis mysql mongodb 的协议相比我更喜欢一个纯粹的比较干净的rpc协议。

先说一下这个rpc的用途吧,前端用nodejs调用后端的接口然后进行视图的渲染,后端用golang提供数据层的封装。综合考虑之后决定使用bson来进行数据的编码和解码。

一个请求分为两部分:消息头 和 消息体。

消息头:

消息头用四个字节表示,采用大端字节序进行表示,这样的话每个数据包最大可以是2的32次方,如果消息体有20个字节的话。消息头应该就是这样的:

00000000 00000000 00000000 00010100

消息体:

消息体主要分为四种:call reply error info,并全部采用bson编码。

call 消息体的格式为:

{call:{funcname:'function name',arguments:'arguments'}}

reply 消息体的格式为:

{reply:'result'}  or  {reply:noreply}

error 消息体的格式为:

{error:{message:'message',detail:'detail'}}

message: 是一个标示符,用来表示错误类型

detail:是错误的描述 注意这里的错误信息有时候会返回给用户

info 消息体的格式:

暂时先保留

以上这些基本上能满足我的需求了,下一步就是实现了。

发表在 未分类 | 评论关闭

戴帽子

最近彻底爱上了戴帽子,

帽沿下压,

整个世界就找不到你了.

发表在 未分类 | 评论关闭

根据mongo protocol 写了个rpc 框架

最近研究协议上瘾了,从memcache protocol , redis protocol,再到复杂的mysql protocol,还有最近几天看的mongo protcol .最让我感到欢喜的是mongo protocol. 协议很简洁, 外加用bson来序列化数据,bson支持的数据类型很多可以和python进行互相转化.选用mongo protocol 和bson 主要有如下优点:

  1. 数据量很小而且解析很快(具体的可以自行google).
  2. 之前有研究过protocol rpc,给人的感觉就是不太灵活,输入和返回都要定义一个对象.用bson 来传的话全是dict 操作,很灵活.
  3. mongodb 客户端众多,简单的对客户端进行封装一下就是一个rpc 的client,理论上可以支持很多种语言,
  4. 可以很好的mongodb 结合起来

以上四点就足够让我兴奋的了,还有就是前一段时间学了twisted,一直也想用twisted 写点东西,于是这个oocrpc就出来了.
正好最近也在看<<失控>> (Out of Control), 就给它起个oocrpc 的名字,名字听起来有点霸气~~~~~~

写代码的时候同时参考了gashero写的 magicrpctxmongo 的代码.

发表在 python, twisted | 评论关闭

mongodb protocol 分析

Mongo Wire Protocol

前一段时间研究一下mysql protocol 还有 mysql udf,mysql protocol 搞明白了,也用twisted 实现了一下mysql protocol.但结果令我很恼火,mysql protocol 不足够简洁,自己捣鼓的一个项目也做到一半做不下去了.然后就看到了mongodb protocol ,简单的看了一下–清爽,简洁.甚合我意,于是,研究之.

Introduction

mongo protocol 是一个简单的依据socket 的,请求-回应型的协议.用来进行mongo client 和 mongo server 之间的数据交互.
client 可以通过一个正常的 tcp/ip socket 来连接server.默认的client 和server 之间没有handshake(握手).

Messages Types and Formats

下面只讲几个我可能用到的几个消息类型 和格式

Standard Message Header

一般来说,叫做消息头.mongodb protocol 的消息都包含一个消息头.消息头的结构如下:

struct MsgHeader {
    int32   messageLength; // total message size, including this
    int32   requestID;     // identifier for this message
    int32   responseTo;    // requestID from the original request
                           //   (used in reponses from db)
    int32   opCode;        // request type - see table below
}

messageLength: 这个是整个消息的字节长度,包括它自己本身
requestID: 这个是client 生成的这个消息的标识符,server端会把这个requestID 放在responseTo 中传回来,client 就可以把返回的消息关联起来
responseTo:根据上面讲,值跟client 中的requestID 是一样的.
opCode: 下面会讲到

Opcode Name opCode value Comment
OP_REPLY 1 Reply to a client request. responseTo is set
OP_MSG 1000 generic msg command followed by a string
OP_UPDATE 2001 update document
OP_INSERT 2002 insert new document
RESERVED 2003 formerly used for OP_GET_BY_OID
OP_QUERY 2004 query a collection
OP_GET_MORE 2005 Get more data from a query. See Cursors
OP_DELETE 2006 Delete documents
OP_KILL_CURSORS 2007 Tell database client is done with a cursor

每一项占四个字节,MsgHeader 总共16个字节.

OP_QUERY

OP_QUERY 消息用来查询database 中的文档,格式如下:

struct OP_QUERY {
    MsgHeader header;                // standard message header
    int32     flags;                  // bit vector of query options.  See below for details.
    cstring   fullCollectionName;    // "dbname.collectionname"
    int32     numberToSkip;          // number of documents to skip
    int32     numberToReturn;        // number of documents to return
                                     //  in the first OP_REPLY batch
    document  query;                 // query object.  See below for details.
  [ document  returnFieldSelector; ] // Optional. Selector indicating the fields
                                     //  to return.  See below for details.
}

flags: 值如下

bit num name description
0 Reserved Must be set to 0.
1 TailableCursor Tailable means cursor is not closed when the last data is retrieved. Rather, the cursor marks the final object’s position. You can resume using the cursor later, from where it was located, if more data were received. Like any “latent cursor”, the cursor may become invalid at some point (CursorNotFound) – for example if the final object it references were deleted.
2 SlaveOk Allow query of replica slave. Normally these return an error except for namespace “local”.
3 OplogReplay Internal replication use only – driver should not set
4 NoCursorTimeout The server normally times out idle cursors after an inactivity period (10 minutes) to prevent excess memory use. Set this option to prevent that.
5 AwaitData Use with TailableCursor. If we are at the end of the data, block for a while rather than returning no data. After a timeout period, we do return as normal.
6 Exhaust Stream the data down full blast in multiple “more” packages, on the assumption that the client will fully read all data queried. Faster when you are pulling a lot of data and know you want to pull it all down. Note: the client is not allowed to not read all the data unless it closes the connection.
7 Partial Get partial results from a mongos if some shards are down (instead of throwing an error)
8-31 Reserved Must be set to 0.

fullCollectionName: collection(集合)名字.完整的collection 名字应该包括database(数据库)名字和collection(集合名字),中间用一个点连接.例如,database 名字为foo,collection 名字为bar.完成的collection 名字就为”foo.bar”
numberToSkip: 查询结果中忽略的数量,和sql 中的offset(位移)差不多.
numberToReturn: 限制返回结果的数量.如果查询的结果大于numberToReturn,server 端会建立一个标尺,并返回这个cursorID.和sql 中的limit 差不多
query: 一个包含查询信息的bson 格式的文档.这个查询包含一个或多个元素.可能的元素包括$query,$orderby,$hint,$explain,$snapshot.
returnFieldsSelector: 可选的bson 文档,来限制返回结果中的字段.

dadabase 会针对 OP_QUERY 返回一个OP_REPLY 消息.

OP_GETMORE

OP_QUERY 消息用来查询database 中的文档,格式如下:

struct {
    MsgHeader header;        // standard message header
    int32     ZERO;               // 0 - reserved for future use
    cstring   fullCollectionName; // "dbname.collectionname"
    int32     numberToReturn;   // number of documents to return
    int64     cursorID;  // cursorID from the OP_REPLY
}

fullCollectionName : collection(集合)名字.完整的collection 名字应该包括database(数据库)名字和collection(集合名字),中间用一个点连接.例如,database 名字为foo,collection 名字为bar.完成的collection 名字就为”foo.bar”
numberToReturn: 限制返回结果的数量.如果查询的结果大于numberToReturn,server 端会建立一个标尺,并返回这个cursorID.和sql 中的limit 差不多
cursorID: 执行OP_QUERY 时从database 返回的OP_REPLY 消息中的cursorID.

dadabase 会针对OP_GETMORE 返回一个OP_REPLY 消息.

OP_REPLY

OP_REPLY 消息用来回复OP_QUERY 和OP_GET_MORE .OP_REPLY 的格式如下:

struct {
    MsgHeader header;         // standard message header
    int32     responseFlags;  // bit vector - see details below
    int64     cursorID;       // cursor id if client needs to do get more's
    int32     startingFrom;   // where in the cursor this reply is starting
    int32     numberReturned; // number of documents in the reply
    document* documents;      // documents
}

responseFlags :

bit num name description
0 CursorNotFound Set when getMore is called but the cursor id is not valid at the server. Returned with zero results.
1 QueryFailure Set when query failed. Results consist of one document containing an “$err” field describing the failure.
2 ShardConfigStale Drivers should ignore this. Only mongos will ever see this set, in which case, it needs to update config from the server.
3 AwaitCapable Set when the server supports the AwaitData Query option. If it doesn’t, a client should sleep a little between getMore’s of a Tailable cursor. Mongod version 1.6 supports AwaitData and thus always sets AwaitCapable.
4-31 Reserved Ignore

cursorID:如果一个查询结果符合OP_REPLY 包,cursorID 会为0.cursorID 会在OP_GET_MORE 被用到用来获取更多的数据.

发表在 python | 评论关闭

twisted系列教程十九–cancel deferred

Part 19: I Thought I Wanted It But I Changed My Mind

原文:http://krondo.com/blog/?p=2601
作者:dave
译者:notedit
时间:2011.07.03

Introduction

twisted 是一个正在发展的项目,twisted 的开发者们会添加一些新的特色或者扩展旧的.随着twisted 10.1.0 的发布,开发者们增加了一个新的功能–取消,这个就是我们今天要讲的内容.
异步的程序把request 和response 进行了解耦,于是增加了一个新的可能:在发送一个请求和得到返回结果之间你可以决定是否你还要返回结果.细想一下第十四部分的poetry proxy server.下面是这个代理怎么工作的(第一次没有缓存的时候):

  1. 一个请求进来
  2. 这个proxy 连接真正的server去获取诗
  3. 一但获取到完整的诗,发送给client


一切工作的很好,但是假如client在得到诗之前挂起了怎么办?这样的话我们的proxy 就会被卡住.在出现这种情况的时候我们最好的办法就是关掉连接.

回想一下图片十五,一个描述了同步程序中概念上的流程控制的图表.在这个图片中我们可以看到函数调用往下走,异常往上走.假如我们想取消一个同步的调用,程序的流程会和函数调用的方向是一样的,从高层代码到底层代码.图片三十八描述了这个过程:
图片三十八
当然,在一个同步的程序中这个是不可能的因为高层的代码不会恢复运行直到底层的代码运行完,在这时就没有什么可以取消的了.但是在一个异步的程序中,高层的代码在底层的代码运行完之前控制着程序,这就让我们有能力取消底层次的请求.

在一个twisted 程序中,底层的请求是被deferred对象所体现的.在deferred 中信息流是向下的,从底层的代码到高层的代码,这个和同步程序中return 信息流是相同的.从twsited 10.1.0 开始,高层的代码可以向底层的代码传送信息–它可以告诉底层的代码它不再想要返回的结果了.看图片三十九:
图片三十九

Canceling Deferreds

先让我们看一些例子来搞明白取消deferred 是怎样工作的.记住,运行这一部分的代码你需要将twisted 升级到10.1.0 .首先deferred-cancel/defer-cancel-1.py:

from twisted.internet import defer

def callback(res):
    print 'callback got:', res

d = defer.Deferred()
d.addCallback(callback)
d.cancel()
print 'done'

利用这个取消的特性,deferred 类增加了一个cancel 的方法.这个例子程序创建一个deferred,增加一个callback,然后在触发这个deferred之前取消它.下面是输出:

done
Unhandled error in Deferred:
Traceback (most recent call last):
Failure: twisted.internet.defer.CancelledError:

ok,取消一个deferred 触发了errback,我们正常的callback 没有被调用.并且出现的错误是twisted.internet.defer.CancelledError.让我们在deferred-cancel/defer-cancel-2.py增加一个errback:

from twisted.internet import defer

def callback(res):
    print 'callback got:', res

def errback(err):
    print 'errback got:', err

d = defer.Deferred()
d.addCallbacks(callback, errback)
d.cancel()
print 'done'

我们得到如下的输出:

errback got: [Failure instance: Traceback (failure with no frames): :
]
done

我们可以捕捉到这个CancelledError 就像捕捉其他的deferred 的错误一样.
下面让我们先触发deferred 然后再取消它,代码在deferred-cancel/defer-cancel-3.py:

from twisted.internet import defer

def callback(res):
    print 'callback got:', res

def errback(err):
    print 'errback got:', err

d = defer.Deferred()
d.addCallbacks(callback, errback)
d.callback('result')
d.cancel()
print 'done'

下面是输出:

callback got: result
done

我们的callback 被正常的触发了,然后我们的程序正常的结束.就好像cancel 没有被调用一样.看起来取消一个cancel 对一个已经触发的deferred 没有什么作用一样.(继续往下看).
如果我们取消deferred 之后再去触发deferred 会发生什么呢? 例子在deferred-cancel/defer-cancel-4.py

from twisted.internet import defer

def callback(res):
    print 'callback got:', res

def errback(err):
    print 'errback got:', err

d = defer.Deferred()
d.addCallbacks(callback, errback)
d.cancel()
d.callback('result')
print 'done'

在这种情况下我们得到如下的输出:

errback got: [Failure instance: Traceback (failure with no frames): :
]
done

很有意思,输出和第二个例子中的一样,就像根本没有触发deferred一样.所以假如deferred 已经被取消了,再去触发这个deferred 会没有什么用.但是为什么d.callback(‘result’) 会抛出一个异常,因为你不能触发一个已经被触发过的deferred?

再看一下图片三十九.触发一个deferred 是底层代码的任务,而取消一个deferred是高层代码的的动作.触发这个deferred 意味着”这里是你的结果”,而取消一个deferred 意味着”我不在需要它”.

cancel 方法主要做了两件事情:

  1. 告诉deferred 对象如果这个结果还没有来的话,我们就不要了.并忽略后面的callback或者errback
  2. 告诉底层的正在产生结果的代码去做一些取消操作需要做的事情

我们取消deferred 的行为是自由的,在取消之后,我们就不会得到那些还没有返回的结果(即使返回了我们也收不到).但是取消deferred 并不会最终取消异步的操作.取消一个异步的操作需要一系列的操作.你可能需要关闭一个连接,回滚一些事物,杀掉一个自进程,等等. 因为deferred 只是一个callback 的组织者,它怎么知道当你取消它的时候会采取什么操作? 或者,它怎样把取消请求转发给底层的代码? 大声跟着我喊:

用Callback

Canceling Deferreds, Really

好的,让我们看一下deferred-cancel/defer-cancel-5.py:

from twisted.internet import defer

def canceller(d):
    print "I need to cancel this deferred:", d

def callback(res):
    print 'callback got:', res

def errback(err):
    print 'errback got:', err

d = defer.Deferred(canceller)
# created by lower-level code
d.addCallbacks(callback, errback)
# added by higher-level code
d.cancel()
print 'done'

这段代码很像第二个例子,除了一个多了一个canceller callback,这个callback 是deferred创建的时候传入的,不是后面加入的.这个callback 负责执行要取消这个deferred 所要进行的一系列操作(仅仅这个deferred 被取消的时候).这个canceller callback 是底层代码返回deferred 不可缺少的一部分,不是用来接收deferred并添加callback 和errback 的高层代码.

运行这个代码,你会看到如下的输出:

I need to cancel this deferred:
errback got: [Failure instance: Traceback (failure with no frames): :
]
done

你可以看到,这个canceller callback被传入一个我们想取消的deferred.在canceller callback 中我们可以进行一些取消deferred要完成的一些附属操作.canceller callback 是在errback之前触发的.实际上,这时我们可以选择用正常的结果还是错误的结果触发deferred.所有的情况在 deferred-cancel/defer-cancel-6.pydeferred-cancel/defer-cancel-7.py.

在把reactor引入进来之前,让我们再做一个简单的测试.我们创建了一个带有canceller 的deferred,正常的触发,并取消它,你可以在deferred-cancel/defer-cancel-8.py看到代码.通过检查这个例子的输出,你可以看到在取消一个已经触发的deferred之后并没有触发canceller callback.

我们上面看的例子都没有做些实际的异步操作.下面让我们创建一个简单的触发异步操作的程序,然后我们会弄明白怎样让这个异步操作变为可以取消的.看一下代码 deferred-cancel/defer-cancel-9.py:

from twisted.internet.defer import Deferred

def send_poem(d):
    print 'Sending poem'
    d.callback('Once upon a midnight dreary')

def get_poem():
    """Return a poem 5 seconds later."""
    from twisted.internet import reactor
    d = Deferred()
    reactor.callLater(5, send_poem, d)
    return d

def got_poem(poem):
    print 'I got a poem:', poem

def poem_error(err):
    print 'get_poem failed:', err

def main():
    from twisted.internet import reactor
    reactor.callLater(10, reactor.stop)
    # stop the reactor in 10 seconds

    d = get_poem()
    d.addCallbacks(got_poem, poem_error)

    reactor.run()

main()

这个例子包含了一个get_poem 函数,get_poem利用callLater 方法五秒钟之后返回一首诗.main 函数调用get_poem,并添加了一个callback/errback 对,并启动reactor.
运行这个例子会有如下的输出:

Sending poem
I got a poem: Once upon a midnight dreary

让我们试着在这首诗返回之前取消deferred.我们加上一些代码在两秒钟之后取消deferred.:

reactor.callLater(2, d.cancel) # cancel after 2 seconds

完整的程序在deferred-cancel/defer-cancel-10.py,会输出如下的内容:

get_poem failed: [Failure instance: Traceback (failure with no frames): :
]
Sending poem

这个例子清晰的描述了取消一个deferred并不一定会取消底层的异步请求.在两秒之后我们从errback看到了输出了CancelledError,在五秒之后我们仍旧会看到send_poem 的输出(但是接下来的callback没有触发).

这和例子四deferred-cancel/defer-cancel-4.py.取消deferred 导致最后的结果被忽略,但是真正意义上说并没有完全终止操作.根据我们上面讲的,要想彻底取消一个deferred你必须在deferred 创建的时候向其传入一个cancel callback.

这个被传入的callback需要做些什么呢?看一下callLater 的手册.callLater 返回的值是一个实现了IDelayedCall,并带有一个cancel 方法的对象,这个cancel 方法可以用来组织延迟的call 被执行.

这样就比较简单了,更新后的代码在deferred-cancel/defer-cancel-11.py,主要的变化在get_poem 函数:

def get_poem():
    """Return a poem 5 seconds later."""

    def canceler(d):
        # They don't want the poem anymore,
        #  so cancel the delayed call
        delayed_call.cancel()

        # At this point we have three choices:
        #   1. Do nothing, and the deferred will fire the errback
        #      chain with CancelledError.
        #   2. Fire the errback chain with a different error.
        #   3. Fire the callback chain with an alternative result.

    d = Deferred(canceler)

    from twisted.internet import reactor
    delayed_call = reactor.callLater(5, send_poem, d)

    return d

在这个新的版本中,我们保存了从callLater 返回的值从而让我们可以在callback中用到.我们的callback唯一需要做的是触发delayed_call.cancel().根据我们上面讲的,我们也可以选择我们自己去触发这个callback.最新版的例子的输出为:

get_poem failed: [Failure instance: Traceback (failure with no frames): :
]

你可以看到,deferred 被取消了,异步的操作确实被抛弃了.

Poetry Proxy 3.0

根据我们所讲的,poetry proxy server 是一个应用cancel deferred 的很好的场景,因为它可以允许我们放弃诗的下载假如没有人想要诗的话. proxy Version 3.0 代码在 twisted-server-4/poetry-proxy.py,实现了deferred 的取消.第一个变化的在 PoetryProxyProtocol:

class PoetryProxyProtocol(Protocol):

    def connectionMade(self):
        self.deferred = self.factory.service.get_poem()
        self.deferred.addCallback(self.transport.write)
        self.deferred.addBoth(lambda r: self.transport.loseConnection())

    def connectionLost(self, reason):
        if self.deferred is not None:
            deferred, self.deferred = self.deferred, None
            deferred.cancel() # cancel the deferred if it hasn't fired

和老一版本的相比.主要的变化是:

  1. 保存我们从get_poem 得到的deferred,以便我们可以以后用于取消deferred
  2. 当连接关闭的时候取消deferred.需要注意的是在我们实际上获取诗之后也会取消deferred,但是取消一个已经触发的deferred不会有什么影响

现在我们需要确认的是取消deferred 之后确实会放弃下载诗.为了这个我们也需要改变一下 ProxyService:

class ProxyService(object):

    poem = None # the cached poem

    def __init__(self, host, port):
        self.host = host
        self.port = port

    def get_poem(self):
        if self.poem is not None:
            print 'Using cached poem.'
            # return an already-fired deferred
            return succeed(self.poem)

        def canceler(d):
            print 'Canceling poem download.'
            factory.deferred = None
            connector.disconnect()

        print 'Fetching poem from server.'
        deferred = Deferred(canceler)
        deferred.addCallback(self.set_poem)
        factory = PoetryClientFactory(deferred)
        from twisted.internet import reactor
        connector = reactor.connectTCP(self.host, self.port, factory)
        return factory.deferred

    def set_poem(self, poem):
        self.poem = poem
        return poem

老版本的ProxyService,有如下变化:

  1. 我们保存了reactor.connectTCP 的返回值,它是一个IConnector 对象,我们可以用它提供的disconnect方法来关闭连接
  2. 我们创建了一个带有canceler callback 的deferred.这个callback 用connector 来关闭连接.但首先要先设置factory.deferred 属性为None.否则的话,这个factory可能会在这个deferred 被CancelledError触发之前触发这个deferred(发生连接错误的时候)


你可能已经注意到我们现在在ProxyService 创建这个deferred,而不是在PoetryClientFactory中.因为我们的canceler callback 需要操作IConnector 对象,ProxyService 就成了创建deferred 最方便的额地方.

在我们之前的的例子中,我们的canceler callback 都是作为一个闭包来实现的.闭包是非常有用的当我们要取消callback的时候.

让我们试一下我们的proxy.首先开启一个slow server.它需要足够慢让我们有时间来取消:

python blocking-server/slowpoetry.py --port 10001 poetry/fascination.txt

现在我们开启proxy:

python twisted-server-4/poetry-proxy.py --port 10000 10001

现在我们可以从proxy 下载诗了,可以用curl:

curl localhost:10000

几秒之后,按下Ctrl-C停止client,在运行proxy 的终端中你可以看到如下的输出:

Fetching poem from server.
Canceling poem download.

你可以看到我们的server 已经停止输出了,因为我们的proxy已经挂起了.

One More Wrinkle

我们上面已经说过了取消一个已经触发过的deferred 会没有作用.这并不是十分准确.在第十三部分我们了解到被添加到deferred 上的callback 和errback 会返回deferred.在这种情况下,外部的deferred 暂停执行它的cllback 链并等待内部的deferred 触发.

因此,即使一个deferred 触发了发起异步请求的高层的代码也不会接收到结果.因为这个callback 正在等待内部的deferred 实行完毕.如果高层的代码取消了外部的deferred会发生什么? 在这种情况下外部的deffered 不会取消自己,相反的它会取消内部的deferred.

所以当你取消一个deferred 的时候,你不是在取消主要的异步操作,而是在取消其他的被主要的异步操作触发的异步操作.(比较难懂哈 ,建议看英文).

我们可以用一个例子来描述这个过程.代码在deferred-cancel/defer-cancel-12.py:

from twisted.internet import defer

def cancel_outer(d):
    print "outer cancel callback."

def cancel_inner(d):
    print "inner cancel callback."

def first_outer_callback(res):
    print 'first outer callback, returning inner deferred'
    return inner_d

def second_outer_callback(res):
    print 'second outer callback got:', res

def outer_errback(err):
    print 'outer errback got:', err

outer_d = defer.Deferred(cancel_outer)
inner_d = defer.Deferred(cancel_inner)

outer_d.addCallback(first_outer_callback)
outer_d.addCallbacks(second_outer_callback, outer_errback)

outer_d.callback('result')

# at this point the outer deferred has fired, but is paused
# on the inner deferred.

print 'canceling outer deferred.'
outer_d.cancel()

print 'done'

在这个例子中我们创造了两个deferred,一个内部的一个外部的.首先我们触发外部的deferred,然后取消它.这个例子会有如下输出:

first outer callback, returning inner deferred
canceling outer deferred.
inner cancel callback.
outer errback got: [Failure instance: Traceback (failure with no frames): :
]
done

你可以看到,取消外部的deferred 不会引起外部的cancel callback 触发.相反的,它取消内部的deferred,所以内部deferred 的cancel callback会触发,然后外部的errback 会接收到一个CancelledError.

Discussion

取消一个deferred 是一个非常有用的操作,允许我们的程序去避免无谓的工作.但是也有一些取巧.

一个非常重要的要记住的事实是取消一个异步操作并不会一定取消根本的异步操作.实际上,大多数deferred 并不会真正的取消,因为大多数的twisted 代码还没有更新.检查文档或者源代码去看看是否取消deferred 会不会真正取消请求,或者只是忽略它.

第二个重要的事实是仅仅从异步的api 中返回一个deferred 并不会真正的取消deferred.假如你想在你的代码中实现取消deferred,你需要在在源代码中找更多的例子.

发表在 python, twisted | 评论关闭

twisted系列教程十八–异步操作的并行运行

Part 18: Deferreds En Masse

原文:http://krondo.com/blog/?p=2571
作者:dave
译者:notedit
时间:2011.07.02

Introduction

在上一部分我们学习了一种新的用生成器来组织一系列异步callbacks 的方法.加上deferred,我们已经有两种组织异步操作的方法了.

有时候,我们想让一组异步操作并行的运行.因为twisted 是单线程的,它不会真正的并行的运行,但是我们想要异步的I/O 在一组任务上运行的尽可能的快.比如我们的poetry client,从多个server上同时下载诗,而不是一个接一个的.这就是我们为什么用twisted.

我们的opetry client 不得不解决这个问题:你怎么知道你所有的异步操作什么时候能结束?目前为止我们是用把所有的返回结果放进一个list里面,并检查这个list 的长度.我们必须在收集结果的时候非常注意,因为一个错误的结果可能让我们的程序永久的运行下去.

就如你想象的那样,twisted 包含了一个解决这个问题的抽象,我们今天就会学习一下它的用法.

The DeferredList

DeferredList类允许我们把一个deferred 对象的列表当成一个deferred来对待.这样的话我们就可以开启多个异步的操作并在它们全部执行完的时候得到通知.让我们看一些例子.
deferred-list/deferred-list-1.py,你会发现这些代码:

from twisted.internet import defer

def got_results(res):
    print 'We got:', res

print 'Empty List.'
d = defer.DeferredList([])
print 'Adding Callback.'
d.addCallback(got_results)

如果你运行它,你会得到如下的输出:

Empty List.
Adding Callback.
We got: []

需要注意的一些事情:

  1. DeferredList 是从python 的list 创建而来.在这种情况下这个list 是空的,但是我们会看到这个list 里面的对象必须都是Deferred 对象
  2. DeferredList 也是一个deferred 对象,它继承至Deferred.这就意味着你可以向它加入callback 和errback,就像它是一个普通的deferred一样
  3. 在上面的例子中,我们的callback在被我们加入之后立即触发,所以这个DeferredList一定已经立马触发了.我们过一会会继续讨论这个
  4. deferred list 的返回结果是一个空的list

现在让我们看一下 deferred-list/deferred-list-2.py:

from twisted.internet import defer

def got_results(res):
    print 'We got:', res

print 'One Deferred.'
d1 = defer.Deferred()
d = defer.DeferredList([d1])
print 'Adding Callback.'
d.addCallback(got_results)
print 'Firing d1.'
d1.callback('d1 result')

现在我们创建了一个包含一个deferred 对象的DeferredList,下面是我们得到的信息:

One Deferred.
Adding Callback.
Firing d1.
We got: [(True, 'd1 result')]

一些注意的事情:

  1. 这一次DeferredList没有触发它的callback直到我们触发了list中的deferred
  2. 这个结果仍旧是一个list,不过这一次有了一个元素
  3. 这个元素是一个tuple,它的第二个值是它对应的deferred 的结果

让我们向list中添加两个deferred,在deferred-list/deferred-list-3.py:

from twisted.internet import defer

def got_results(res):
    print 'We got:', res

print 'Two Deferreds.'
d1 = defer.Deferred()
d2 = defer.Deferred()
d = defer.DeferredList([d1, d2])
print 'Adding Callback.'
d.addCallback(got_results)
print 'Firing d1.'
d1.callback('d1 result')
print 'Firing d2.'
d2.callback('d2 result')

下面是输出:

Two Deferreds.
Adding Callback.
Firing d1.
Firing d2.
We got: [(True, 'd1 result'), (True, 'd2 result')]

DeferredList 的结果是一个数量和DeferredList 中deferred的数量的相同的list.结果的中的每一个元素包含了和它相对应的deferred 的返回结果,前提是这个deferred运行成功.这就意味着DeferredList不会触发直到list 中的所有中的deferred 都已经触发.一个包含空列表的DeferredList 会立即触发.
DeferredList 中deferred 运行的顺序是怎样的呢? 看一下 deferred-list/deferred-list-4.py:

from twisted.internet import defer

def got_results(res):
    print 'We got:', res

print 'Two Deferreds.'
d1 = defer.Deferred()
d2 = defer.Deferred()
d = defer.DeferredList([d1, d2])
print 'Adding Callback.'
d.addCallback(got_results)
print 'Firing d2.'
d2.callback('d2 result')
print 'Firing d1.'
d1.callback('d1 result')

现在我们先触发d2然后触发d1.下面是输出:

Two Deferreds.
Adding Callback.
Firing d2.
Firing d1.
We got: [(True, 'd1 result'), (True, 'd2 result')]

输出列表有着和原来的list 的一样的顺序,而不是被触发的顺序.这样非常好,因为我们可以很好的把输出结果和deferred很好的关联起来.

好的,如果DeferredList 中的deferred 有一个失败了会发生什么?输出中的那些True 是做什么用的?让我们看 deferred-list/deferred-list-5.py:

from twisted.internet import defer

def got_results(res):
    print 'We got:', res

d1 = defer.Deferred()
d2 = defer.Deferred()
d = defer.DeferredList([d1, d2], consumeErrors=True)
d.addCallback(got_results)
print 'Firing d1.'
d1.callback('d1 result')
print 'Firing d2 with errback.'
d2.errback(Exception('d2 failure'))

现在我们用一个正常的结果触发d1,用一个error来触发d2.咱们先暂时忽略掉 consumeErrors 选项,下面是输出:

Firing d1.
Firing d2 with errback.
We got: [(True, 'd1 result'), (False, >)]

现在和d2 对应的返回结果出现一个错误.到现在我们应该清楚DeferredList 是怎样工作的:

  1. DeferredList是被一个deferred 的列表组成的
  2. DeferredList本身也是一个deferred,它的返回结果是一个长度和DeferredList本身长度的列表
  3. DeferredList在列表中所有deferred都触发之后才被触发
  4. 返回结果列表中的每一个元素对应着DeferredList中的每一个deferred.加入那个deferred成功了 这个元素是(True,result),假如这个deferred失败了,这个元素是(False,failure)
  5. 一个DeferredList不会失败,因为每一个deferred 无论成功与否它的结果都会被搜集到返回的列表中


下面让我们看一下consumeErrors 选项,假如我们不设置consumeErrors选项(deferred-list/deferred-list-6.py),我们会得到如下的输出:

Firing d1.
Firing d2 with errback.
We got: [(True, 'd1 result'), (False, >twisted.python.failure.Failure >type 'exceptions.Exception'<<)]
Unhandled error in Deferred:
Traceback (most recent call last):
Failure: exceptions.Exception: d2 failure

如果你回想一下,在deferred 中未处理的错误信息会在deferred 被垃圾回收的时候被抛出来.这个信息告诉我们我们没有全部的捕捉我们异步程序中的错误.这个错误信息从哪里来的呢?它明显的不是从DeferredList来的,所以这个错误一定来自d2.

DeferredList需要知道它下面的deferreds都在什么时候触发.DeferredList 通过增加一个callback和errback到每一个deferred,这样就可以监测了.默认的,这个callback(errback) 返回正常的结果(错误),因为返回错误会触发下一个errback,这样d2 在触发之后会保持失败状态.
但是假如我们设置consumeErrors 为True,向每一个deferred加入的errback 会返回None.我们也可以向d2加入自己的errback,例子在deferred-list/deferred-list-7.py.

Client 8.0

我们的poetry client 的8.0版本使用DeferredList去监测什么时候所有的poetry全部下载完.你可以在twisted-client-8/get-poetry.py看到代码.唯一的变化是poetry_main.让我们看一下主要的变化:

...
ds = []

for (host, port) in addresses:
    d = get_transformed_poem(host, port)
    d.addCallbacks(got_poem)
    ds.append(d)

dlist = defer.DeferredList(ds, consumeErrors=True)
dlist.addCallback(lambda res : reactor.stop())

在client 8.0 中,我们不需要poem_done callback 或者 results list.相反的,我们把从get_transformed_poem 获得的deferred 全部放入一个list,并创建一个DeferredList.因为DeferredList 会直到所有的deferred触发之后才会被触发.我们向DeferredList增加一个callback来关闭reactor.在这种情况下,我们没有使用DeferredList的返回结果,我们只需要知道什么时候所有的事情能结束.

Discussion
我们可以用图片形象话一个DeferredList 是怎样工作的,图片三十七:
图片三十七
真的很简单,仍旧有几个DeferredList 的参数我们没有覆盖掉.这些参数会改变DeferredList的默认行为,感兴趣的可以自己看.

在下一部分我们还会讲deferred 的一个特色,一个刚被twisted 10.1.0 加进去的.

发表在 python, twisted | 评论关闭

twisted系列教程十七–用inlineCallbacks来管理callbacks

Part 17: Just Another Way to Spell “Callback”

原文:http://krondo.com/blog/?p=2441
作者:dave
译者:notedit
时间:2011.06.30

Introduction

在这一部分我们继续回到callback.我们将介绍用生成器来写callbacks.我们会讲到这个技巧怎么工作的,还有它和Deferred 的比较.最后我们会用这个技巧重写我们的poetry client.首先我们要回顾一下生成器是怎样工作的,人后我们就会明白为什么它是创造callbacks 的一个替代品.

A Brief Review of Generators

你可能已经知道,python 的生成器是一个可以重新启动的函数,你可以用yield来实现重新启动的功能.通过yield,这个函数就变成了一个生成器函数,它会返回一个迭带器,并可以将一个函数分成一系列的步骤来运行.迭带器的每一个循环都会重启这个函数,这个函数会继续运行直到遇到下一个yield.

生成器(和迭带器)经常用来表示生成一系列的返回值.先看一下我们的例子inline-callbacks/gen-1.py:

def my_generator():
    print 'starting up'
    yield 1
    print "workin'"
    yield 2
    print "still workin'"
    yield 3
    print 'done'

for n in my_generator():
    print n

我们有了一个可以生成序列的1,2,3 的生成器.如果你运行这个代码的话,你会发现generator 里面的输出和for循环里面的输出是交替出现的.

我们可以自己实现生成器来让这段代码更明确,代码见inline-callbacks/gen-2.py:

def my_generator():
    print 'starting up'
    yield 1
    print "workin'"
    yield 2
    print "still workin'"
    yield 3
    print 'done'

gen = my_generator()

while True:
    try:
        n = gen.next()
    except StopIteration:
        break
    else:
        print n

把它作为一个序列,生成器只是一个可以生产连续的值的对象.但是我们仍就可以从生成器本身来看事情:

  1. 生成器函数不会开始运行直到被循环调用(用 next 方法)
  2. 一但生成器开始运行,它会一直运行下去直到它返回到这个循环(使用yield)
  3. 当这个循环正在运行其他的代码(比如print 语句),这个生成器不会运行
  4. 当这个生成器在运行的时候,这个循环不会运行(它等待generator 并阻塞)
  5. 一但一个生成器失去对循环的控制, 可能会花任意数量的时间去执行其他的任务直到生成器再一次运行

这有点像callback 在异步程序中的工作方式.我们可以把循环看成reactor,把生成器看成一系列的callbacks.有意思的地方是,所有的callbacks 都共享同一个命名空间,命名空间从一个callback到另一个callback 得到了延续.

此外,我们可以在同一时间让多个生成器处于活跃状态(例子在inline-callbacks/gen-3.py),让它们的”callback”互相的交替运行,就像你在twisted 中又运行了一个独立的异步任务.

还漏了一点,callback不是被reactor调用的,callbacks同时也要接受参数.在一个deferred 链中,一个callback 或者接收一个结果,或者接收一个Failure 对象. 自从python2.5 起,生成器被扩展成在重启的时候也可以接收参数,就像inline-callbacks/gen-4.py中阐明的一样:

class Malfunction(Exception):
    pass

def my_generator():
    print 'starting up'

    val = yield 1
    print 'got:', val

    val = yield 2
    print 'got:', val

    try:
        yield 3
    except Malfunction:
        print 'malfunction!'

    yield 4

    print 'done'

gen = my_generator()

print gen.next() # start the generator
print gen.send(10) # send the value 10
print gen.send(20) # send the value 20
print gen.throw(Malfunction())
 # raise an exception inside the generator

try:
    gen.next()
except StopIteration:
    pass

在python2.5 和以后的版本中,yield 声明是一个求值表达式.重启发生器的代码可以通过send 方法来获得这个值,如果你用next 这个值会是None.你也可以在生成器中抛出一个异常.

Inline Callbacks

根据上面我们讲的在生成器中发送和抛出值和异常的原理,我们可以可以把生成器看成一系列的callbacks,就像deferred 中的那样,或接收一个正常的结果或接收一个failure.这些callbacks 被yield 分开,每一个yield 表达式的值是下一个callback(yield) 的参数.图像三十五描述了这个过程:
图片三十五

现在当一系列的callback在deferred中组成一个链的时候.每一个callback从上一个callback中获取结果.这在生成器中很简单–把你上一次运行生成器的的结果传到下一次运行生成器就可以了.

回想第十三部分我们所讲到的,deferred 中的callback 也可以返回deferred 对象,外部的deferred 会保持暂停直到内部的deferred 的触发,然后外部deferred中的下一个callback(errback)会被调用,并被传入内部deferred 返回的值.

所以想象我们的生成器返回的不是一个普通的python 值,而是一个deferred 对象.当这发生时,外部的deferred 暂停,直到内部的deferred触发.这里相当于生成器暂停,并且是是自动的,生成器在一个yield之后会一直暂停直到它被明确的重启.所以我们可以延迟重启生成器直到deferred 触发,在这时我们可以返回正常的值(deferred 正常)或者抛出异常(deferred 失败).这样我们的生成器就成为一个真正的异步callback 序列了,这也是inlineCallbacks 函数要完成的功能.代码在twisted.internet.defer中.

inlineCallbacks

看一下在inline-callbacks/inline-callbacks-1.py中的例子程序:

from twisted.internet.defer import inlineCallbacks, Deferred

@inlineCallbacks
def my_callbacks():
    from twisted.internet import reactor

    print 'first callback'
    result = yield 1
    # yielded values that aren't deferred come right back

    print 'second callback got', result
    d = Deferred()
    reactor.callLater(5, d.callback, 2)
    result = yield d
    # yielded deferreds will pause the generator

    print 'third callback got', result
    # the result of the deferred

    d = Deferred()
    reactor.callLater(5, d.errback, Exception(3))

    try:
        yield d
    except Exception, e:
        result = e

    print 'fourth callback got', repr(result)
   # the exception from the deferred
    reactor.stop()

from twisted.internet import reactor
reactor.callWhenRunning(my_callbacks)
reactor.run()

运行这个例子你会看到生成器运行到底然后停止reactor.这个例子说明了inlineCallbacks 函数的几个方面,首先,inlineCallbacks 是一个装饰器并用来装饰生成器函数.inlineCallbacks 的主要的目的就是把一个生成器变成一系列的异步的callbacks.

第二,当我们调用一个用inlineCallbacks 修饰的函数的时候,我们不需要调用下一个或者发送或者抛出我们自己.这个装饰器会帮我们完成这些并会确保我们的生成器会一直运行到底(假设它并没有抛出异常).
第三,假如我们在生成器中生成一个不是deferred 的值,生成器会立即重启并带着这个yield 生成的值.
最后,如果我们在生成器中生成一个deferred,它会在这个deferred触发之后才会重启.如果这个deferred 成功了,yield 的结果就是deferred 的结果.如果这个deferred 失败了,yield 会抛出这个异常.注意这里的异常是一个普通的Exception 而不是Failure,我们可以用try/except 来捕捉它.

在这个例子中我们仅仅使用了callLater 在一段时间之后去触发deferred.这是一个很方便的把非阻塞的延迟放入callback 链的方法,一般来说,在我们的生成器中我们会不断的返回一个已经被触发过的deferred.

ok,我们现在已经知道一个被inlineCallbacks修饰的函数是怎样运行的,但这个函数最终会返回什么呢? 你可能已经猜到了,会返回deferred.因为我们不知道生成器什么时候会停止运行,这个被修饰过的函数是一个异步的函数,最适合返回的是deferred.注意这个返回的deferred 不是yield 语句返回的deferred,它是这个生成器全部运行完毕之后才触发的deferred.

如果这个生成器抛出一个异常,返回的deferred 会触发它的errback 链并带有一个封装了异常的Failure.如果我们想让生成器返回一个正常的值,我们必须用defer.returnValue 函数返回它.像平常的return 语句,它会停止这个生成器(实际上是抛出了一个特别的异常).inline-callbacks/inline-callbacks-2.py 例子描述了所有的情况.

Client 7.0

让我们用inlineCallbacks 来写我们的新的poetry client.你可以在twisted-client-7/get-poetry.py看到代码.你可能希望和client 6.0 做比较,代码在这里twisted-client-6/get-poetry.py.他们不一样的地方在poetry_main 方法中:

def poetry_main():
    addresses = parse_args()

    xform_addr = addresses.pop(0)

    proxy = TransformProxy(*xform_addr)

    from twisted.internet import reactor

    results = []

    @defer.inlineCallbacks
    def get_transformed_poem(host, port):
        try:
            poem = yield get_poetry(host, port)
        except Exception, e:
            print >>sys.stderr, 'The poem download failed:', e
            raise

        try:
            poem = yield proxy.xform('cummingsify', poem)
        except Exception:
            print >>sys.stderr, 'Cummingsify failed!'

        defer.returnValue(poem)

    def got_poem(poem):
        print poem

    def poem_done(_):
        results.append(_)
        if len(results) == len(addresses):
            reactor.stop()

    for address in addresses:
        host, port = address
        d = get_transformed_poem(host, port)
        d.addCallbacks(got_poem)
        d.addBoth(poem_done)

    reactor.run()

我们的inlineCallbacks 生成器函数get_transformed_poem 负责获取到诗歌和应用改变.因为两种操作都是异步的,我们每一次yield 一个deferred 并等待结果.和client 6.0 中一样,如果transformation失败我们就返回原来的诗.注意我们可以用try/except 在生成器中处理异步的错误.

我们可以用和以前一样的方法测试新的client.首先开启一个transform server:

python twisted-server-1/tranformedpoetry.py --port 10001

然后开启两个poetry server:

python twisted-server-1/fastpoetry.py --port 10002 poetry/fascination.txt
python twisted-server-1/fastpoetry.py --port 10003 poetry/science.txt

现在你可以运行我们的新的client 了:

python twisted-client-7/get-poetry.py 10001 10002 10003

Discussion

就像deferred 对象,inlineCallbacks 函数给了我们一个组织我们的异步的callback 的方法.和deferred 相比,inlineCallbacks 没有改变游戏规则(不知道怎样翻译贴切).特别的,我们的callback仍是一次只运行一个,而且都是reactor触发的.我们可以通过在代码中输出堆栈信息来确认这一点,例子代码在inline-callbacks/inline-callbacks-tb.py.运行这个代码你会得到一个traceback, reactor.run()在上面,中间有很多帮助函数,我们的callback 在下面.

我们可以适当的改变图片二十九,它解释了当一个deferred 中的callback 返回了令一个deferred会发生什么.图片三十六描述了当一个inlineCallbacks 生成器生成一个deferred 会发生什么:
图片三十六

这个图片和图片二十九很像,因为它们表述的想法是一样的–一个异步的操作等待令一个.

因为inlineCallbacks 和 deferred 可以解决很多相同的问题,它们两个该选哪一个呢?下面是inlineCallbacks 可能存在的一些优势:

  1. 可以让callbacks 共享一个命名空间,不用传过于的参数
  2. callback 的顺序很容易看到,因为它们从头执行到尾
  3. 有的callback 不用具体的函数声明,明确地流程控制,可以让人少写代码
  4. 错误可以被我们熟悉的try/except 来处理

当然也有一些陷阱:

  1. 在生成器中的callback不能个别的触发,这就样重用代码变得困难些.而在deferred,组成deferred 的代码可以任意的加callback
  2. 生成器的形式模糊了一个异步的callbacks是成对(callback/errback)出现的事实.尽管它外表上看起来像一般的序列函数,一个生成器却表现出一种完全不同的行为.inlineCallbacks 函数是学习异步编程模型必不可少的

Summary

在这一部分我们学习了inlineCallbacks 装饰器和怎样用inlineCallbacks来组织多个callbacks.
在第十八部分 我们会学习一个可以管理多个并行的异步操作的方法.

发表在 python, twisted | 评论关闭

twisted系列教程十六–twisted守护进程

Part 16: Twisted Daemonologie

原文:http://krondo.com/blog/?p=2345
作者:dave
译者:notedit
时间:2011.06.28

Introduction

到目前为止我们写的server 还运行在一个终端里面,通过print 语句向外输出内容.开发的时候这样做是很有好处的,但是当你部署一个产品的时候这样就不好了.一个生产环境中的server应该是:

  1. 作为一个守护进程运行,不和任何的终端和会话相连.你不会希望当你登出的时候,你的server 也会退出
  2. 把debug和错误信息输出到一个日志文件中,或者syslog 服务中
  3. 低权限的,比如,用一个低权限的用户运行
  4. 把它的pid 记到一个文件中,管理员可以很容易的向守护进程发送信号

twisted 已经提供了对上面四条的支持.但首先我们首先要先改变一下我们的代码.

The Concepts

要想理解twisted 你需要学习一些新的概念.最重要的一个概念是Service,一般来说,一个新的概念往往和一个或多个接口对应.

IService

IService接口定义了一个可以被停止和开启的service.这个service 做什么呢? 你可以做任何你想做的,这个接口仅仅需要一些很少的参数和方法(它定义的少,你可以实现的就多),而不是定义了特定的函数的一种service.

这个接口需要两个属性:name 和 running.name属性只是一个字符串, 例如”fastpoetry”或者None,running 属性是一个布尔类型,假如这个service被成功的开启,这个值为true.

我们只会接触ISservice 的一些方法.我们会略过那些高级的不常用的方法.IService 的两个基本方法是startServicestopService:

def startService():
    """
    Start the service.
    """

def stopService():
    """
    Stop the service.

    @rtype: L{Deferred}
    @return: a L{Deferred} which is triggered when the service has
        finished shutting down. If shutting down is immediate, a
        value can be returned (usually, C{None}).
    """

这些方法会完成什么功能要看service要完成的功能.例如,startService 可能会做如下的工作:

加载配置文件,或者
初始化数据库,或者
开始监听一个端口,或者
什么也不做

stopService 方法可能做如下的工作:

持久化一些状态,或者
关闭数据库连接,或者
停止监听一个端口,或者
什么也不做

当我们写我们自己的service 的时候我们需要适当的实现这些方法.对于一些常见的操作,比如监听一个端口,twisted 已经提供了一个可以直接使用的service.

注意一下stopService 有可能返回一个deferred,这个deferred 会在这个service 完全关闭的时候触发.这可以让我们在这个应用完全关闭前去清理一些数据.假如你的service是突然关闭的,你可以只返回None 来代替.

services 可以被组织在一起,可以同时的被开启或者同时关闭.我们要看的最后一个IService 的方法是setServiceParent,可以把一个service 加入到一个集合中.

def setServiceParent(parent):
    """
    Set the parent of the service.

    @type parent: L{IServiceCollection}
    @raise RuntimeError: Raised if the service already has a parent
        or if the service has a name and the parent already has a child
        by that name.
    """

任何一个service 都会有一个parent,这意味着services 可以按等级来组织,这就引出了下面的这个接口.

IServiceCollection

IServiceCollection接口定义了一个可以包含IService 对象的对象.一个service 的集合就是包含如下方法的集合类:

通过service name 查找 service (getServiceNamed)
迭带集合中的services(__iter__)
向集合中增加一个service(addService)
从集合中移除一个service(removeService)

注意一下一个IServiceCollection 的实现不会自动是一个IService的实现,但是没有理由一个类不能实现两个接口(后面我们将会看到一个实例).

Application

twisted 的Application 不是被分开的接口定义的,但是一个Application 对象需要实现 IService 和 IServiceCollection,还有一些其他的我们不会讲到的接口.
Application 是最顶层的代表你整个twisted 应用的service.你所有的其他的service 全是Application 的孩子.
去完全的实现你自己的Application 是很少见的.我们今天会用到twisted 已经提供了的实现.

Twisted Logging

twisted 包含了它自己的logging 实现,在模块twisted.python.log 中,logging 中的一些api 非常简单,所以我们只包含了一个很简单的例子在basic-twisted/log.py.如果你感兴趣的话,你可以在twisted 源码中看个究竟.

FastPoetry 2.0

好了,让我们看一些代码.我们已经更新我们的fast poetry server了,原代码在twisted-server-3/fastpoetry.py,首先我们看一些 poetry protocol:

class PoetryProtocol(Protocol):

    def connectionMade(self):
        poem = self.factory.service.poem
        log.msg('sending %d bytes of poetry to %s'
                % (len(poem), self.transport.getPeer()))
        self.transport.write(poem)
        self.transport.loseConnection()

注意我们使用twisted.python.log.msg 函数替代我们原来的输出语句,来记录每一个新的连接.
下面是 factory class:

class PoetryFactory(ServerFactory):

    protocol = PoetryProtocol

    def __init__(self, service):
        self.service = service

你可以看到,诗已经不再存在factory里面,而是存在一个被factory 引用的service 对象里面.注意这个protocol 是怎样从service 中通过factory 获取到诗的.最后是service class:

class PoetryService(service.Service):

    def __init__(self, poetry_file):
        self.poetry_file = poetry_file

    def startService(self):
        service.Service.startService(self)
        self.poem = open(self.poetry_file).read()
        log.msg('loaded a poem from: %s' % (self.poetry_file,))

就像很多其他的接口类,twisted 提供了一个基类,我们用它提供的默认的行为来完成我们的实现.在这里我们使用 twisted.application.service.Service来实现我们的PoetryService.

这个基本的类提供了需要实现的全部的方法的默认实现,所以我们只需实现具有特定行为的方法.在本例中,我们只重新了startService 去加载我们需要的诗歌文件.
另一个值得一提的是PoetryService 不知道PoetryProtocol 的任何事情.这个service 的唯一的工作就是加载诗,并把它提供给那些需要它的对象.换句话说就是,PoetryService 只负责高层次的提供诗歌内容,而不是低层次的向一个tcp 连接中发送数据.所以这个service 也可以被其他的protocol 使用.我们的例子比较小所以你看不出有多大的便利,如果是现实中的一个service 的实现,你可以想象会给我们带来多大的便利.

假如这是一个典型的twisted 程序,上面的程序最终不可能只在一个文件中.而是,它应该在某个模块中.但是根据我们前面的写代码的经验,我们把所有的需要的代码文件都放在一个文件中了.

Twisted tac files

一个tac 文件是一个twisted application configuration 文件,它会告诉twisted 怎样构建一个.作为一个配置文件它会为了让一个application以某种特定的方式运行起来 来选择配置.换句话说,一个tac 文件代表了一种我们服务的部署,而不是一段可以开启我们poetry server 的脚本.

假如我们在一台主机上运行了多个poetry server,我们可以为每一个server 配置一个配置文件.在我们的例子中,这个tac 文件被配置成一个server 在本地10000端口上并提供poetry/ecstasy.txt 文件里的诗.

# configuration parameters
port = 10000
iface = 'localhost'
poetry_file = 'poetry/ecstasy.txt'

twisted 对配置文件里面的变量是一无所知的,我们只是把放在这里.实际上,twisted 只关心整个配置文件中的一个变量,我们在后面会讲到.下面我们开始创建我们的application:

# this will hold the services that combine to
# form the poetry server
top_service = service.MultiService()

我们的poetry server 由两个services 组成,一个是我们上面定义的PoetryService,另一个是twisted 内置的 service–用来创建我们的诗用来服务的socket. 因为这两个services互相之间有关系,我们会把他们放在一起通过一个MultiService,一个实现了IService 和 IServiceCollection 接口的类.

作为一个service 的集合,MultiService 会把我们的两个poetry service 组合起来.作为一个service,MultiService 会在它自己启动的时候同时启动两个子service,当它关闭的时候也同时关闭子service.让我们向集合中加入第一个poetry service:

# the poetry service holds the poem. it will load the poem
# when it is started
poetry_service = PoetryService(poetry_file)
poetry_service.setServiceParent(top_service)

非常简单,我们创建了一个PoetryService 并把它用setServiceParent加入到集合中,下一步我们加入TCP 监听:

# the tcp service connects the factory to a listening socket.
# it will  create the listening socket when it is started
factory = PoetryFactory(poetry_service)
tcp_service = internet.TCPServer(port, factory, interface=iface)
tcp_service.setServiceParent(top_service)

twisted 提供了一个TCPServer service 用来创建一个和任意factory 相连的 监听tcp 连接的socket .我们没有直接用reactor.listenTCP 的原因是tac 文件的工作是让我们的application准备好,而不是运行它.TCPServer 在被twisted 启动的时候TCPServer会创建这个socket.

ok,现在我们的service 都被绑定进集合去了.现在我们可以创建我们的Application,并把我们的集合传给它.

# this variable has to be named 'application'
application = service.Application("fastpoetry")

# this hooks the collection we made to the application
top_service.setServiceParent(application)

在这个脚本中,twisted真正关心的是application 变量.twisted 通过它来找到application(而且必须命名为application),当这个application启动的时候,我们向其添加的service也会被启动.

图片三十四描述了我们上面刚建立的这个application 的结构:
图片三十四

Running the Server

一个tac文件,我们需要用twisted 来运行它.当然,它也是一个python 文件.所以先让我们用python 执行一下它看看会发生什么:

python twisted-server-3/fastpoetry.py

如果你这样做,你会发现什么也没发生.就像我们之前说的,tac 文件的工作就是让一个application 做好准备运行的准备.为了提醒tac 文件的特殊应用,一些把它的后缀名改为.tac.twisted 并不关心后缀名是什么.你可以twisted 来运行这个tac 文件:

twistd --nodaemon --python twisted-server-3/fastpoetry.py

你会看到如下的输出:

2010-06-23 20:57:14-0700 [-] Log opened.
2010-06-23 20:57:14-0700 [-] twistd 10.0.0 (/usr/bin/python 2.6.5) starting up.
2010-06-23 20:57:14-0700 [-] reactor class: twisted.internet.selectreactor.SelectReactor.
2010-06-23 20:57:14-0700 [-] __builtin__.PoetryFactory starting on 10000
2010-06-23 20:57:14-0700 [-] Starting factory <__builtin__.PoetryFactory instance at 0x14ae8c0>
2010-06-23 20:57:14-0700 [-] loaded a poem from: poetry/ecstasy.txt

下面是要注意的一些事情:

  1. 你可以看到Twisted logging 系统的输出,包括PoetryFactory 中的log.msg. twisted 已经为我们安装了一个logger
  2. 你也可以看到我们的两个services,PoetryService 和 TCPServer 开始运行
  3. 你会发现命令行并没有返回,这个说明我们的server 并不是以守护进程运行的.默认的twisted 会以守护进程的方式运行.你可以用 --nodaemon 选项,这样log信息会输出到标准输出.这对调试很有帮助


现在我们可以测试了,你可以用poetry client 或者netcat:

netcat localhost 10000

这个会从server 上获取一首诗,你还会看到下面的日志输出:

2010-06-27 22:17:39-0700 [__builtin__.PoetryFactory] sending 3003 bytes of poetry to IPv4Address(TCP, '127.0.0.1', 58208)

输出的日志是从PoetryProtocol.connectionMade 中的log.msg 输出的.如果你做多次请求server,你会发现其他的日志输出.

现在你按Ctrl-C 输出,你会看到下面的一些输出:

^C2010-06-29 21:32:59-0700 [-] Received SIGINT, shutting down.
2010-06-29 21:32:59-0700 [-] (Port 10000 Closed)
2010-06-29 21:32:59-0700 [-] Stopping factory <__builtin__.PoetryFactory instance at 0x28d38c0>
2010-06-29 21:32:59-0700 [-] Main loop terminated.
2010-06-29 21:32:59-0700 [-] Server Shut Down.

你会看到,twisted 不会简单的崩溃掉,而是干净的关闭自己并用log 信息记录清理过程.

A Real Daemon

现在让我们的server成为一个真正的守护进程:

twistd --python twisted-server-3/fastpoetry.py

这一次我们立即的返回了得到我们的命令行提示符号.如果你看下你当前的目录你会发现除了twisted.pid 文件以外还有一个twisted.log 文件.
当我们开启一个守护进程的时候,twisted 会安装一个log handler ,这样你的log 就会被记录到一个文件中而不是输出到标准输出.默认的log 文件是twisted.log,在你运行twisted 的当前目录.但是你可以通过–logfile 来改变log 的路径.

(略去两段)

The Twisted Plugin System

现在我们可以用twisted 来启动守护进程了.用python文件作为我们的配置文件给了我们配置上的很大的灵活性.但是我们经常用不到这么多的灵活性.对于我们的poetry server 来说,我们只关心三个选项:

1,诗的内容
2,server 需要用的端口
3,需要监听的ip(localhost)

twisted 的插件提供了一个定义带有多个命令行选项的Application 的方法,twisted 就可以动态的运行.twisted 自身带了很多的插件. 如果你不带任何的参数运行twisted 的话,你会发现它带的一些插件.你可以现在试一下twisted命令.在帮助信息之后,你会看到如下的内容:

 ...
    ftp                An FTP server.
    telnet             A simple, telnet-based remote debugging service.
    socks              A SOCKSv4 proxy service.
    ...

每一行输出一个twisted 内置的插件,你可以用twisted 运行他们中的任何一个.

每个插件还会有它自己的输出信息,你可以用–help 来查看具体的信息,例如:

twistd ftp --help

你可以用下面的命令运行一个ftp 服务:

twistd --nodaemon ftp --port 10001

ok,让我们从poetry server 上先转向twisted 插件.同样的,让我们先讲一些新的概念.

IPlugin

任何的twisted plugin 必须实现twisted.plugin.IPlugin接口,如果你看那个接口的声明,你会发现它并没有指定任何的方法.实现IPlugin 最plugin 来说是标明它的身份.当然,如果要实现一些功能,它还要实现一些其他的接口,我们后面会讲到.

但是你怎样知道一个对象是否实现了一个空的接口? zope.interface 包含了一个叫做implements 的方法,你可以用它声明一个特定的类实现了一个特定的接口.我们会在我们的poetry server 的plugin 版本中看到一个例子.

IServiceMaker

除IPlugin 之外,我们的plugin也会实现IServiceMaker接口.一个实现了IServiceMaker 的对象知道怎样去创建一个IService.IServiceMaker 指定了三个属性和一个方法:

  1. tapname:我们plugin 的名字."tap" 代表了Twisted Application Plugin.
  2. description:我们plugin 的描述
  3. options:一个描述我们的plugin 的要接收的命令行参数的对象
  4. makeService:可以创建一个IService对象,接收option传来的参数


我们会在我们的下一版本的poetry server 中讲到.

Fast Poetry 3.0

现在我们准备去看一下插件版的Fast poetry,代码在 twisted/plugins/fastpoetry_plugin.py.
你可能已经注意到我们这次的命名规则和其他的例子不太一样.那是因为twisted 需要插件文件在twisted/plugins 目录中.这个目录不必是一个包,并且你可以有多个twisted/plugins 目录在你的twisted 能找到的路径中.

我们的plugin 的第一部分包含了一些poetry protocol,factory,service implementations.和以前一样,这些文件本应该在一个模块中,但为了方便,我们把他们全部放在一个plugin 中–让这些代码的都是自包含的.
下面是 plugin 的命令行选项:

class Options(usage.Options):

    optParameters = [
        ['port', 'p', 10000, 'The port number to listen on.'],
        ['poem', None, None, 'The file containing the poem.'],
        ['iface', None, 'localhost', 'The interface to listen on.'],
        ]

这些代码指定了这个plugin 的特定的命令行选项.我们下面看我们plugin 最主要的部分–service maker class:

class PoetryServiceMaker(object):

    implements(service.IServiceMaker, IPlugin)

    tapname = "fastpoetry"
    description = "A fast poetry service."
    options = Options

    def makeService(self, options):
        top_service = service.MultiService()

        poetry_service = PoetryService(options['poem'])
        poetry_service.setServiceParent(top_service)

        factory = PoetryFactory(poetry_service)
        tcp_service = internet.TCPServer(int(options['port']), factory,
                                         interface=options['iface'])
        tcp_service.setServiceParent(top_service)

        return top_service

这里你可以看到zope.interface.implements 函数怎样声明我们的类要实现 IServiceMaker 和 IPlugin.
你应该能认出makeService 中的一些代码,和tac 那一版本的实现一样.但这次我们不需要自己创建一个Application对象.我们仅仅创建并返回最上层的service.twisted 会帮助我们完成其他的工作.

在声明这个类之后,还有唯一的一件事情要做:

service_maker = PoetryServiceMaker()

我们的twisted 脚本会发现这个plugin 实例,并用它创建我们的顶层的service.不像tac 文件,这里我们的变量名是可以任意取的.重要的是我们的对象要实现IPlugin 和 IServiceMaker.

既然我们已经完成了我们的plugin,让我们运行一下.确保你在twisted-intro 目录,或者确保twisted-intro目录在你的python 路径中. 试着运行twisted 你会发现”fastpoetry” 已经在插件列表中了.

你也会发现在twisted/plugins 目录中出现了一个dropin.cache 的文件,这个文件是用来加快插件的浏览的.
你可以获得我们插件的帮助信息:

twisted fastpoetry --help

你可以在帮助信息中看到fastpoetry中的选项参数.让我们运行我们的plugin:

twistd fastpoetry --port 10000 --poem poetry/ecstasy.txt

这样就可以开启一个fastpoetry 的守护进程.你可以在当前目录下看到twisted.pid 和twisted.log 文件.你可以用过下面的命令来关闭它:

kill `cat twisted.pid`

Summary
在这一部分,我们学习了怎样把我们的twisted server 变成一个守护进程.我们还讲了twisted 的logging 系统,怎样用twisted运行一个以守护进程方式运行的twisted application–用tac配置 文件或者用twisted 插件的方式.在第十七部分,我们将讲更多的异步编程的原理,还有在twisted 中callback 的另一种组织方式.

发表在 python, twisted | 评论关闭

twisted系列教程十五–测试twisted代码

Part 15: Tested Poetry

原文:http://krondo.com/blog/?p=2273
作者:dave
译者:notedit
时间:2011.06.27

Introduction

在这个系列中我们也已经写了很多twisted 代码了,但目前为止我们忽略了一个很重要的事情—测试.你可能也一直在想我们怎样用一个同步的测试框架unitest来测试我们的异步的程序.简短的回答是不能,我们已经发现,同步的程序和异步的程序不能混合在一起.最起码不会很好的结合在一起.

幸运的是,twisted 已经包含了他自己的异步测试测试框架–trial.我们也可以用它来测试同步的框架.

我们假设你已经对unittest 的测试原理和测试框架比较熟悉了,在unittest 中,我们通过定义一个继承TestCase的类创建测试,并且每个测试方法前面以test为前缀.框架会帮你发现测试,运行测试,然后报告出测试结果.

The Example

你会在tests/test_poetry.py发现一些例子代码.为了保证我们的例子可以自包含,我们已经把全部的有必要的代码拷进test 模块.正常的来说,你只需要导入你想测试的模块就可以了.

这个例子测试了client 和server,通过使用这个client 去从test server 上获取到每一首诗.为了提供一个可供测试的server,我们在我们的测试用例中实现了setUp 方法:

class PoetryTestCase(TestCase):

    def setUp(self):
        factory = PoetryServerFactory(TEST_POEM)
        from twisted.internet import reactor
        self.port = reactor.listenTCP(0, factory, interface="127.0.0.1")
        self.portnum = self.port.getHost().port

这个setUp 方法创建了一个poetry server,并随机监听一个端口.我们保存了这个端口号,让我们的测试用例使用.当我们的测试用例运行完的时候我们用tearDown清理我们的test server:

def tearDown(self):
    port, self.port = self.port, None
    return port.stopListening()

下面我们看一下我们真实的测试代码,test_client,我们使用get_poetry 从test server 上获取诗的内容并验证是不是我们想要的诗:

def test_client(self):
    """The correct poem is returned by get_poetry."""
    d = get_poetry('127.0.0.1', self.portnum)

    def got_poem(poem):
        self.assertEquals(poem, TEST_POEM)

    d.addCallback(got_poem)

    return d

注意一下我们的测试函数返回了一个deferred.在trial中,每一个测试方法被当作一个callback.它意味着reactor 在运行着我们可以执行异步的操作.我们需要让这个测试框架知道我们的测试是异步的,我们通过返回一个deferred来告诉测试框架我们的函数是异步的.

trial 框架会等待所有的deferred 被触发之后才会调用tearDown 方法,如果deferred 失败了测试也会失败.假如我们的deferred 花了太多的时间去触发,我们的测试也会失败,默认的是两分钟.如果我们的测试运行结束了,我们知道我们的deferred 也触发了,我们的callback 也触发了,我们就可以运行assertEquals 了.

我们的第二个测试是,test_failure,可以验证get_poetry 失败的方式,假如我们不能连接上server 的话:

def test_failure(self):
    """The correct failure is returned by get_poetry when
    connecting to a port with no server."""
    d = get_poetry('127.0.0.1', -1)
    return self.assertFailure(d, ConnectionRefusedError)

在这里我们试图去连接一个不正确的端口,并用trial 提供的assertFailure方法.这个方法有点像assertRaises ,但是是用来测试异步的代码的.它会返回一个成功的deferred 假如测试用的deferred 出现我们要的错误.

你可以运行这个测试脚本:

trial tests/test_poetry.py

你会看到每个测试用例的输出,如果测试通过的话会输出OK.

Discussion

因为trial 和unittest 提供的一些api非常相像.用它写单元测试是很简单的.如果你想测试异步的代码你只需要返回一个deferred,trial 会负责其他的事情.你也可以从setUp 和 tearDonw 中返回一个deferred,如果他们也需要异步的话.

任何的日志信息都会被保存到一个叫做_trial_temp的目录下,如果没有的话 trial 会自动帮我们建立.除了被打印到屏幕上的一些错误信息除外,这些日志信息对我们分析出错的原因非常重要.

图片三十三展示了一个假想的测试过程:
图片三十三

如果你曾经用过类似的测试框架,这个会是一个非常熟悉的模型,除了这个测试相关的方法都返回deferred.

trial 框架也是一个怎样在一个程序中用交错任务让这个程序变成异步的程序的很好的说明,为了让一个测试变成异步的,你需要:

  1. 不能是阻塞的
  2. 返回一个deferred

Summary

这个就是我们要讲 单元测试.如果你想要看更多的怎样测试twisted 代码的例子,你可以看一下twisted 的源代码.twisted 代码中包含了大量的单元测试.因为这些测试都是twisted 的专家经过仔细地检查之后才被加入代码库的,他们的代码就是一个非常好的单元测试的例子.

在第十六部分我们会使用twisted 的一个实用的功能让我们的poetry server 变成一个守护进程.

发表在 python, twisted | 评论关闭

twisted系列教程十四— pre-fireed deferred

Part 14: When a Deferred Isn’t

原文:http://krondo.com/blog/?p=2205
作者:dave
译者:notedit
时间:2011.06.27

Introduction

在这一部分我们将要学习deferred 类的另外的一个方面.为了促进讨论,我们要为我们的poetry service增加一个server.假设我们有大量的内部的client 想要连接一个相同的外部的server.假设这个server已经很慢而且已经负载很高了.我们不想再让server上连接更多的client 了.

所以我们会创建一个缓存代理服务器.当一个client 连接到proxy的时候,这个proxy或者从外部的server获取到一首诗或者就返回一个之前已经缓存了的内容.我们可以让我们的client 都连接proxy,我们的外部的server 的负载就会很小.我们可以用图片三十来描述这个过程:
图片三十

思考一下当一个client连接到proxy 之后会发生什么,假如这个proxy 的缓存是不存在的,这个proxy必须异步等待外部的server 返回一个结果然后才能返回client.到目前为止还不错,我们已经知道怎样去处理返回deferred 的异步的函数.另一方面,假如在缓存中已经有了一个一首诗,这个proxy 会把它立即返回,一点也不用等待.所以proxy获取一首诗的内容可以是同步的或者是异步的.

所以 我们能做些什么假如我们有一个有时异步有时同步的函数?twisted 提供了很多选项,并且它们依据deferred 的一个我们没有讲的特色:你可以在你返回deferred之前触发它.

这个是管用的,因为尽管你不能触发一个deferred 两次,但是你可以在deferred 触发之后向deferred 中增加callbacks 和 errbacks.当你这样做的时候,deferred 会继续的触发 callback/errback 链 从上次它离开的地方.一个很重要的一点是一个已经触发的deferred 可以立即触发新假如的callback.

图片三十一展示了一个已经被触发的deferred:
图片三十一
如果我们现在向其中加入一对callback/errback,这个deferred 会立即的触发新加入的callback,就像图片三十二:
图片三十二

我们测试这个deferred 的新特色通过代码twisted-deferred/defer-11.py.试着运行一下并看看deferred 被触发之后又加入callback 之后会发生什么.注意在第一个例子中每个新的callback是怎样被立即触发的.

第二个例子展示了我们怎样pause() 一个deferred不让它立即触发callback的.当我们都准备好的时候,我们可以用unpause().其实暂停deferred 的原理和 当一个callback 返回deferred 导致外部的deferred 暂停的原理是一样的.

Proxy 1.0

现在让我们看一下第一版的poetry proxy –twisted-server-1/poetry-proxy.py,因为这个proxy 同时扮演了client 和 server 的角色,它有两对 Protocol/Factory.一个用来为poetry 服务,另一个用来从外部的server 获取诗歌的内容.我们就不看为client 服务的protocol/factory 了,和以前的版本一样.

让我们来看 ProxyService,proxy中的server-side protocol 利用它来从外部的server获取一首诗:

class ProxyService(object):

    poem = None # the cached poem

    def __init__(self, host, port):
        self.host = host
        self.port = port

    def get_poem(self):
        if self.poem is not None:
            print 'Using cached poem.'
            return self.poem

        print 'Fetching poem from server.'
        factory = PoetryClientFactory()
        factory.deferred.addCallback(self.set_poem)
        from twisted.internet import reactor
        reactor.connectTCP(self.host, self.port, factory)
        return factory.deferred

    def set_poem(self, poem):
        self.poem = poem
        return poem

重要的方法是get_poem.假如在缓存里已经有一首诗存在,直接返回.如果没有的话,我们向外部的server 发起一个连接并返回一个deferred,如果等待的诗来到则触发deferred.get_poem 是一个只有一部分时间是异步的.

怎样来处理那样的一个函数呢?让我们来看一下server-side protocol/factory :

class PoetryProxyProtocol(Protocol):

    def connectionMade(self):
        d = maybeDeferred(self.factory.service.get_poem)
        d.addCallback(self.transport.write)
        d.addBoth(lambda r: self.transport.loseConnection())

class PoetryProxyFactory(ServerFactory):

    protocol = PoetryProxyProtocol

    def __init__(self, service):
        self.service = service

这个factory 是很简单的,它只保存了一个proxy service 的引用,这样可以让protocol 实例可以调用get_poem 方法.protocol 是核心所在.它没有直接的调用get_poem,而是使用了一个twisted.internet.defer 的封装—maybeDeferred.

maybeDeferred 函数拿到一个函数的引用,并加上了一些参数,maybeDeferred 会最终调用这个函数,并且做如下的工作:

  1. 如果这个函数返回了一个deferred,maybeDeferred 也返回这个deferred,或者
  2. 假如这个函数返回了一个Failure,maybeDeferred 返回一个已经被触发的deferred ,并带着failure参数,或者
  3. 假如这个函数返回了一个正常的值,maybeDeferred返回一个已经被触发的deferred,并带着这个正常的值作为参数,或者
  4. 假如这个函数抛出了一个错误,maybeDeferred会返回一个已经被触发的deferred,并带着由这个错误转化来的failure作为参数

换句话说,从maybeDeferred 返回的值一定是一个deferred,即使你传递过去的函数不会返回deferred.这就让我们可以安全的调用一个同步的函数,并把它当作一个返回deferred异步的函数.


注意一:这里仍有一点不一样,被一个同步的函数返回的deferred 是已经被触发过的,所以任何的你加入的callback 和errback 都会被立即调用,而不是在一些reactor loop 的迭带之后.
注意二:也许给一个一定会返回deferred 的函数命名为maybeDeferred 不是一个特别好的选择.

一但这个protocol 有了一个真正的deferred,它可以增加一些callback把诗送到client,并关闭相应的连接.这个就是我们的第一个poetry proxy.

Running the Proxy

要测试我们的代理的话,先开启一个poetry server,像下面这样:

python twisted-server-1/fastpoetry.py --port 10001 poetry/fascination.txt

然后开启一个proxy server:

python twisted-server-1/poetry-proxy.py --port 10000 10001

也就说proxy 运行在10000端口,poetry server 运行在10001端口.
下面你可以运行一个client 连接proxy:

python twisted-client-4/get-poetry.py 10000

我们使用了一个早期的没有poetry transformations 的client 版本.你可以看到一首诗出现在client 的窗口里,还有一些文字说明它正在从server 下载.如果你再运行client 一次,这个proxy 会告诉你它正在使用缓存起来的poem.

Proxy 2.0

我们前面已经说过,还有另外一种方法可以实现我们的需求.在Porxy 2.0 中有说明,代码见twisted-server-2/poetry-proxy.py.既然我们可以在返回deferred之前触发它,我们可以让proxy service 在缓存中已经存在这首诗的时候返回一个已经触发过的deferred.下面是proxy service 中get_poem 的新版本:

def get_poem(self):
    if self.poem is not None:
        print 'Using cached poem.'
        # return an already-fired deferred
        return succeed(self.poem)

    print 'Fetching poem from server.'
    factory = PoetryClientFactory()
    factory.deferred.addCallback(self.set_poem)
    from twisted.internet import reactor
    reactor.connectTCP(self.host, self.port, factory)
    return factory.deferred

这个defer.succeed 函数是创建一个已经触发的deferred并返回一个值的很便捷的方法.查看一下它的实现你会发现它就是创建一个deferred ,并用callback()触发 的封装.如果我们想返回一个已经失败了的deferred 我们可以用defer.fail.

在这个版本中,因为get_poem 已经返回了一个deferred,protocol 类不再需要maybeDeferred:

class PoetryProxyProtocol(Protocol):

    def connectionMade(self):
        d = self.factory.service.get_poem()
        d.addCallback(self.transport.write)
        d.addBoth(lambda r: self.transport.loseConnection())

除了这两个地方的变化之外,其他的没什么变化了.你可以像上面的方法一样来运行它.

Summary

在这一部分我们学到了怎样deferred 在被返回之前被调用,因而我们可以在同步的程序使用它,我们有两种方法去实现它:

  1. 我们可以使用maybeDeferred来处理时而返回deferred 时而返回正常结果的函数
  2. 我们可以用defer.succed 和 defer.fail 提前触发我们的deferred,所以我们的有时同步有时异步的函数可以总是返回deferred

我们可以使用他们中的任何一个方法.前一个强调了我们的函数不是总是异步的,而后一个让代码更简单.没有一个定论非要使用哪个.

两种方法都可以是因为我们可以向deferred中增加callback/errback 在它被触发之后.它也解了我们在第九部分提出的疑问.我们了解到在deferred中,不管是最后一个callback 或者errback 失败,错误会在deferred 被垃圾回收的时候才被报告出来.现在我们知道因为什么了–因为我们可以一直向一个deferred 对象中增加一个callback/errback 对,直到最后一个对deferred 的引用也消失了,twisted 才能认定这个错误没有被处理.

所以,这就是deferred了吗?我们已经知道deferred 的全部了吗? 对于大部分来说,是的.但是twisted 包含了很多我们还没有探寻到的很多种交替使用deferred 的方式.同时,twisted 的开发人员也在不停的增加新的特色.在将来的发布的版本中,deferred 会有更多的能力.我们会在以后的章节中讲到,但首先我们需要从deferred中休息一下,看一些twisted 的其他的方面.

发表在 python, twisted | 评论关闭