Gearman入门


#软件架构与思考#


2014-04-07

gearman是一个支持多语言的任务分发框架,很像远程过程调用。

图片来自网络

上图是来自资料[1],展示了gearman的基本设计思路。worker作为执行任务的节点,在生成时会向job server注册。client节点向job server发起任务,job server将任务分配给worker,job server作为一个中间人会负责client和worker之间的任务参数、执行结果等信息的传递。Gearman的更多内容,请参考资料[2]。

在ubuntu下安装:

sudo apt-get install gearman

启动:

service gearman-job-server start

或者使用gearmand启动。

安装相应的python库:

sudo pip install gearman

关于这个库,具体请参考资料[3]。

从示例开始

创建文件worker.py,内容如下:

import gearman
import pickle

gm_worker = gearman.GearmanWorker(['localhost:4730'])

def sum_worker(gearman_worker, gearman_job):
    print gearman_job
    print gearman_job.data
    print type(gearman_job.data)
    return pickle.dumps(sum(pickle.loads(gearman_job.data)))

def str2upper_worker(gearman_worker, gearman_job):
    print gearman_job
    print gearman_job.data
    print type(gearman_job.data)
    return gearman_job.data.upper()

gm_worker.register_task('sum', sum_worker)
gm_worker.register_task('str2upper', str2upper_worker)

gm_worker.work()

创建文件client.py,内容如下:

import gearman
import pickle

gm_client = gearman.GearmanClient(['localhost:4730'])

request = gm_client.submit_job("str2upper", "hello world")
print request
print request.result

request = gm_client.submit_job("sum", pickle.dumps([1,2,3]))
print request
print pickle.loads(request.result)

打开终端A,运行gearman:

sudo gearmand

打开另外一个终端B,运行worker.py:

python worker.py

再打开一个终端C,运行client.py:

python client.py

此时,终端B输出以下内容:

<GearmanJob connection/handle=(<GearmanConnection localhost:4730 connected=True>, 'H:myhost:5'), task=str2upper, unique=7f3bc96ba032288de7e55570fe5ea9b7, data='hello world'>
hello world
<type 'str'>
<GearmanJob connection/handle=(<GearmanConnection localhost:4730 connected=True>, 'H:myhost:6'), task=sum, unique=9195ef08761f921c322a716aa1db61a5, data='(lp0\nI1\naI2\naI3\na.'>
(lp0
I1
aI2
aI3
a.
<type 'str'>

终端C中输出以下内容:

<GearmanJobRequest task='str2upper', unique='7f3bc96ba032288de7e55570fe5ea9b7', priority=None, background=False, state='COMPLETE', timed_out=False>
HELLO WORLD
<GearmanJobRequest task='sum', unique='9195ef08761f921c322a716aa1db61a5', priority=None, background=False, state='COMPLETE', timed_out=False>
6

如果打开终端D,运行worker.py,此时,会有两个worker处理任务sum和str2upper,job server在收到任务后会进行合理地选择,然后将通过IP和port将任务发给某一个worker。

分析worker.py

gearmand的默认端口为4730。在worker.py中gm_worker.register_task()用来注册worker来处理指定的任务,例如:

gm_worker.register_task('sum', sum_worker)

意味着,任务sum使用函数sum_worker()处理。函数sum_worker()有两个参数,其通过第二个参数gearman_job获取client传递的参数数据,即gearman_job.data,这是一个字符串。参数需要使用str传递,如果参数不是str则需要转换为str。str2upper_worker()便使用了pickle库将list和str互相转换。

gm_worker.work()负责让这个worker工作起来。

分析client.py

client.py中gm_client是类gearman.GearmanClient()的一个实例。函数gm_client.submit_job()用来将任务名和任务参数传递给job server。

如果任务参数不是字符串

上面有讲过,如何任务的参数不是str,则需要设法将其转换为str,例如使用pickle。 另外一种方法,就是给gearman.GearmanClient()和gearman.GearmanWorker()定义一个“编码/解码器”。

为worker.py加上编码/解码器:

import gearman
import pickle

class PickleDataEncoder(gearman.DataEncoder):
    @classmethod
    def encode(cls, encodable_object):
        return pickle.dumps(encodable_object)

    @classmethod
    def decode(cls, decodable_string):
        return pickle.loads(decodable_string)
    
class MyGearmanWorker(gearman.GearmanWorker):
    data_encoder = PickleDataEncoder

gm_worker = MyGearmanWorker(['localhost:4730'])

def sum_worker(gearman_worker, gearman_job):
    print gearman_job
    print gearman_job.data
    print type(gearman_job.data)
    return sum(gearman_job.data)

def str2upper_worker(gearman_worker, gearman_job):
    print gearman_job
    print gearman_job.data
    print type(gearman_job.data)
    return gearman_job.data.upper()

gm_worker.register_task('sum', sum_worker)
gm_worker.register_task('str2upper', str2upper_worker)

gm_worker.work()

为client.py加上编码/解码器:

import gearman
import pickle

class PickleDataEncoder(gearman.DataEncoder):
    @classmethod
    def encode(cls, encodable_object):
        return pickle.dumps(encodable_object)

    @classmethod
    def decode(cls, decodable_string):
        return pickle.loads(decodable_string)

class MyGearmanClient(gearman.GearmanClient):
    data_encoder = PickleDataEncoder

gm_client = MyGearmanClient(['localhost:4730'])

request = gm_client.submit_job("str2upper", "hello world")
print request
print request.result

request = gm_client.submit_job("sum", [1,2,3])
print request
print request.result

资料

[1] 任务调度程序Gearman http://www.oschina.net/p/gearman

[2] gearman官网 http://gearman.org/

[3] python-gearman http://pythonhosted.org//gearman/


( 本文完 )