from datetime import datetime
import girder_worker
from girder_worker.specs import Workflow
class girderWorkerClass(object):
    """Run a two-step girder_worker workflow: blur an image, then save it.

    The input image is described by its URL (the Lenna PNG on Wikimedia).
    The ``blur`` task applies a PIL Gaussian blur and feeds its output into
    the ``save`` task, which writes a timestamped ``.jpg`` file.
    """

    # NOTE: the task scripts are left-aligned on purpose. The script text is
    # handed to girder_worker as-is, so every statement (including the
    # imports) must start at column 0; do not re-indent these literals to
    # match the surrounding class body.

    # Script of the "save" task: writes ``the_image`` to a timestamped file.
    _SAVE_SCRIPT = '''
from datetime import datetime
fileName = '/home/vagrant/celery/proj/lenna__'+ datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')+'.jpg'
the_image.save(fileName)
print("file saved to %s" % fileName)
'''

    # Script of the "blur" task: Gaussian-blurs ``blur_input`` by
    # ``blur_radius`` and exposes the result as ``blur_output``.
    _BLUR_SCRIPT = '''
from PIL import ImageFilter
blur_output = blur_input.filter(ImageFilter.GaussianBlur(blur_radius))
print("blur task completed")
'''

    def __init__(self):
        # Single-argument print() behaves the same on Python 2 and 3.
        print("!!!!!inside default constructor")

    def runWorkFlow(self):
        """Build the blur -> save workflow and execute it with girder_worker.

        Returns:
            Whatever ``girder_worker.run`` returns for the workflow.
        """
        wf = Workflow()

        # Workflow input: a PNG image referenced by URL.
        lenna = {
            'type': 'image',
            'format': 'png',
            'url': 'https://upload.wikimedia.org/wikipedia/en/2/24/Lenna.png'
        }

        # Task that saves the image it receives as ``the_image``.
        # It can also be run on its own with:
        #   girder_worker.run(save_image, {'the_image': lenna})
        save_image = {
            'inputs': [
                {'name': 'the_image', 'type': 'image', 'format': 'pil'}
            ],
            'outputs': [],
            'script': self._SAVE_SCRIPT
        }

        # Task that blurs the image.
        blur_image = {
            'inputs': [
                {'name': 'blur_input', 'type': 'image', 'format': 'pil'},
                {'name': 'blur_radius', 'type': 'number', 'format': 'number'}
            ],
            'outputs': [
                {'name': 'blur_output', 'type': 'image', 'format': 'pil'}
            ],
            'script': self._BLUR_SCRIPT
        }

        # Add the tasks to the workflow.
        wf.add_task(blur_image, 'blur')
        wf.add_task(save_image, 'save')

        # Connect the tasks in order of execution: the blur task's
        # ``blur_output`` becomes the save task's ``the_image``.
        wf.connect_tasks('blur', 'save', {'blur_output': 'the_image'})

        # Run the workflow.
        output = girder_worker.run(
            wf,
            inputs={
                'blur_input': lenna,
                'blur_radius': {'format': 'number', 'data': 5}
            }
        )
        return output
from __future__ import absolute_import
from celery import Celery
# Celery application for the ``proj`` package. Both the message broker and
# the result backend use AMQP with default connection settings, and the
# task module ``proj.tasks`` is loaded when the worker starts.
app = Celery(
    'proj',
    broker='amqp://',
    backend='amqp://',
    include=['proj.tasks'],
)

# Optional configuration, see the Celery application user guide.
# CELERY_TASK_RESULT_EXPIRES is the result lifetime (in seconds, per the
# Celery documentation).
app.conf.update(CELERY_TASK_RESULT_EXPIRES=3600)

# Allow running this module directly as the Celery entry point.
if __name__ == '__main__':
    app.start()
from __future__ import absolute_import
from proj.celery import app
import time
from datetime import datetime
from proj.girderWorker import girderWorkerClass
@app.task
def add(x, y):
    """Return ``x + y`` as text, tagged with the UTC time of computation.

    The task sleeps for 20 seconds first. The timestamp is trimmed to
    millisecond precision by dropping the last three microsecond digits.
    """
    time.sleep(20)
    full_stamp = datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f')
    timeString = full_stamp[:-3]
    total_text = str(x + y)
    return total_text + " computed at:" + timeString
@app.task
def mul(x, y):
    """Return the product of ``x`` and ``y``."""
    product = x * y
    return product
@app.task
def xsum(numbers):
    """Return the sum of all items in ``numbers``."""
    total = sum(numbers)
    return total
@app.task
def girderTask():
    """Run the girder_worker workflow inside a Celery task.

    Sleeps for 20 seconds, then creates a ``girderWorkerClass`` and runs its
    workflow. Returns the string ``"SUCCESS"`` once the workflow call returns.
    """
    time.sleep(20)
    girderWorkerClass().runWorkFlow()
    return "SUCCESS"
from celery import group
from proj.tasks import girderTask
# Create a group of ten girderTask signatures, dispatch it, and block until
# every task in the group has returned its result.
task_group = group(girderTask.s() for _ in xrange(10))
group_result = task_group()
group_result.get()
celery -A proj worker -l info
(girder_env) vagrant@data-science-toolbox:~/celery$ celery -A proj worker -l info
[2016-06-08 05:11:01,804: WARNING/MainProcess] /home/vagrant/girder_env/local/lib/python2.7/site-packages/celery/apps/worker.py:162: CDeprecationWarning:
Starting from version 3.2 Celery will refuse to accept pickle by default.
The pickle serializer is a security concern as it may give attackers
the ability to execute any command. It's important to secure
your broker from unauthorized access when using pickle, so we think
that enabling pickle should require a deliberate action and not be
the default choice.
If you depend on pickle then you should set a setting to disable this
warning and to be sure that everything will continue working
when you upgrade to Celery 3.2::
CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml']
You must only enable the serializers that you will actually use.
warnings.warn(CDeprecationWarning(W_PICKLE_DEPRECATED))
-------------- celery@data-science-toolbox v3.1.20 (Cipater)
---- **** -----
--- * *** * -- Linux-3.13.0-24-generic-x86_64-with-Ubuntu-14.04-trusty
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: proj:0x7f628dab8250
- ** ---------- .> transport: amqp://guest:**@localhost:5672//
- ** ---------- .> results: amqp:///
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ----
--- ***** ----- [queues]
-------------- .> celery exchange=celery(direct) key=celery
[tasks]
. proj.tasks.add
. proj.tasks.girderTask
. proj.tasks.mul
. proj.tasks.xsum
[2016-06-08 05:11:01,820: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//
[2016-06-08 05:11:01,827: INFO/MainProcess] mingle: searching for neighbors
[2016-06-08 05:11:02,835: INFO/MainProcess] mingle: all alone
[2016-06-08 05:11:02,849: WARNING/MainProcess] celery@data-science-toolbox ready.
python client.py
-rw-rw-r-- 1 vagrant vagrant 17096 Jun 8 05:11 proj/lenna__2016-06-08 05:11:47.569200.jpg
-rw-rw-r-- 1 vagrant vagrant 17096 Jun 8 05:11 proj/lenna__2016-06-08 05:11:47.574676.jpg
-rw-rw-r-- 1 vagrant vagrant 17096 Jun 8 05:11 proj/lenna__2016-06-08 05:11:47.577239.jpg
-rw-rw-r-- 1 vagrant vagrant 17096 Jun 8 05:11 proj/lenna__2016-06-08 05:11:47.577431.jpg
-rw-rw-r-- 1 vagrant vagrant 17096 Jun 8 05:12 proj/lenna__2016-06-08 05:12:08.019034.jpg
-rw-rw-r-- 1 vagrant vagrant 17096 Jun 8 05:12 proj/lenna__2016-06-08 05:12:08.041674.jpg
-rw-rw-r-- 1 vagrant vagrant 17096 Jun 8 05:12 proj/lenna__2016-06-08 05:12:08.043691.jpg
-rw-rw-r-- 1 vagrant vagrant 17096 Jun 8 05:12 proj/lenna__2016-06-08 05:12:08.059164.jpg
-rw-rw-r-- 1 vagrant vagrant 17096 Jun 8 05:12 proj/lenna__2016-06-08 05:12:28.461512.jpg
-rw-rw-r-- 1 vagrant vagrant 17096 Jun 8 05:12 proj/lenna__2016-06-08 05:12:28.515340.jpg
- Be careful when writing the worker task scripts: the code inside each task's 'script' string must be properly aligned — every statement, including the imports, has to start at the leftmost column.
No comments:
Post a Comment