软件架构与思考

👉 所有文章
高性能 高性能设计模式
数据库 数据库分库分表指南 MySQL 建表参考 MySQL 初始化数据的一些方案 数据版本号
分布式 ID 分布式 ID 生成方案探讨
缓存 缓存 使用数据版本号保证缓存是最新数据 基于redis的二级缓存
微服务 如何实现远程调用 RPC 协议中的数据签名验签和加解密方案探讨 关于服务间调用循环依赖的一些思考 我所理解的负载均衡 一致性哈希 基于Redis的分布式会话管理系统 如何部署服务 灰度发布 如何区分上游和下游 日志级别
算法与协议 一个可扩展的 MQ 消息设计 Dynamo涉及的算法和协议 写时复制
任务分发 Gearman入门 如何使用redis构建异步任务处理程序
安全 关于对账的一些理解 一个简单可靠的 Dubbo 请求/响应数据签名方案
其他 使用卫语句减少 if else 嵌套

Gearman入门


2014-04-07

gearman是一个支持多语言的任务分发框架,很像远程过程调用。

来自网络

上图是来自资料[1],展示了gearman的基本设计思路。worker作为执行任务的节点,在生成时会向job server注册。client节点向job server发起任务,job server将任务分配给worker,job server作为一个中间人会负责client和worker之间的任务参数、执行结果等信息的传递。Gearman的更多内容,请参考资料[2]。

在ubuntu下安装:

sudo apt-get install gearman

启动:

service gearman-job-server start

或者使用gearmand启动。

安装相应的python库:

sudo pip install gearman

关于这个库,具体请参考资料[3]。

从示例开始


创建文件worker.py,内容如下:

import gearman
import pickle

gm_worker = gearman.GearmanWorker(['localhost:4730'])

def sum_worker(gearman_worker, gearman_job):
    print gearman_job
    print gearman_job.data
    print type(gearman_job.data)
    return pickle.dumps(sum(pickle.loads(gearman_job.data)))

def str2upper_worker(gearman_worker, gearman_job):
    print gearman_job
    print gearman_job.data
    print type(gearman_job.data)
    return gearman_job.data.upper()

gm_worker.register_task('sum', sum_worker)
gm_worker.register_task('str2upper', str2upper_worker)

gm_worker.work()

创建文件client.py,内容如下:

import gearman
import pickle

gm_client = gearman.GearmanClient(['localhost:4730'])

request = gm_client.submit_job("str2upper", "hello world")
print request
print request.result

request = gm_client.submit_job("sum", pickle.dumps([1,2,3]))
print request
print pickle.loads(request.result)

打开终端A,运行gearman:

sudo gearmand

打开另外一个终端B,运行worker.py:

python worker.py

再打开一个终端C,运行client.py:

python client.py

此时,终端B输出以下内容:

<GearmanJob connection/handle=(<GearmanConnection localhost:4730 connected=True>, 'H:myhost:5'), task=str2upper, unique=7f3bc96ba032288de7e55570fe5ea9b7, data='hello world'>
hello world
<type 'str'>
<GearmanJob connection/handle=(<GearmanConnection localhost:4730 connected=True>, 'H:myhost:6'), task=sum, unique=9195ef08761f921c322a716aa1db61a5, data='(lp0\nI1\naI2\naI3\na.'>
(lp0
I1
aI2
aI3
a.
<type 'str'>

终端C中输出以下内容:

<GearmanJobRequest task='str2upper', unique='7f3bc96ba032288de7e55570fe5ea9b7', priority=None, background=False, state='COMPLETE', timed_out=False>
HELLO WORLD
<GearmanJobRequest task='sum', unique='9195ef08761f921c322a716aa1db61a5', priority=None, background=False, state='COMPLETE', timed_out=False>
6

如果打开终端D,运行worker.py,此时,会有两个worker处理任务sum和str2upper,job server在收到任务后会进行合理地选择,然后将通过IP和port将任务发给某一个worker。

分析worker.py


gearmand的默认端口为4730。在worker.py中gm_worker.register_task()用来注册worker来处理指定的任务,例如:

gm_worker.register_task('sum', sum_worker)

意味着,任务sum使用函数sum_worker()处理。函数sum_worker()有两个参数,其通过第二个参数gearman_job获取client传递的参数数据,即gearman_job.data,这是一个字符串。参数需要使用str传递,如果参数不是str则需要转换为str。str2upper_worker()便使用了pickle库将list和str互相转换。

gm_worker.work()负责让这个worker工作起来。

分析client.py


client.py中gm_client是类gearman.GearmanClient()的一个实例。函数gm_client.submit_job()用来将任务名和任务参数传递给job server。

如果任务参数不是字符串


上面有讲过,如何任务的参数不是str,则需要设法将其转换为str,例如使用pickle。 另外一种方法,就是给gearman.GearmanClient()和gearman.GearmanWorker()定义一个“编码/解码器”。

为worker.py加上编码/解码器:

import gearman
import pickle

class PickleDataEncoder(gearman.DataEncoder):
    @classmethod
    def encode(cls, encodable_object):
        return pickle.dumps(encodable_object)

    @classmethod
    def decode(cls, decodable_string):
        return pickle.loads(decodable_string)

class MyGearmanWorker(gearman.GearmanWorker):
    data_encoder = PickleDataEncoder

gm_worker = MyGearmanWorker(['localhost:4730'])

def sum_worker(gearman_worker, gearman_job):
    print gearman_job
    print gearman_job.data
    print type(gearman_job.data)
    return sum(gearman_job.data)

def str2upper_worker(gearman_worker, gearman_job):
    print gearman_job
    print gearman_job.data
    print type(gearman_job.data)
    return gearman_job.data.upper()

gm_worker.register_task('sum', sum_worker)
gm_worker.register_task('str2upper', str2upper_worker)

gm_worker.work()

为client.py加上编码/解码器:

import gearman
import pickle

class PickleDataEncoder(gearman.DataEncoder):
    @classmethod
    def encode(cls, encodable_object):
        return pickle.dumps(encodable_object)

    @classmethod
    def decode(cls, decodable_string):
        return pickle.loads(decodable_string)

class MyGearmanClient(gearman.GearmanClient):
    data_encoder = PickleDataEncoder

gm_client = MyGearmanClient(['localhost:4730'])

request = gm_client.submit_job("str2upper", "hello world")
print request
print request.result

request = gm_client.submit_job("sum", [1,2,3])
print request
print request.result

资料


[1] 任务调度程序Gearman http://www.oschina.net/p/gearman

[2] gearman官网 http://gearman.org/

[3] python-gearman http://pythonhosted.org//gearman/


( 本文完 )

文章目录