Running parallel Girder workers from Celery

In this post, I will share a working example of a Celery task that runs Girder worker workflows in parallel.

Prerequisites:

Girder worker allows us to write independent tasks and orchestrate them into a chain of tasks that forms a workflow. The output of one task (worker) can be fed as an input to the next worker in the chain. The workers can be run independently or as part of the chain (workflow).
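Before bringing Girder into the picture, the chaining idea itself can be sketched in plain Python. This is only an analogy (the function names are made up for illustration), not the Girder worker API:

```python
def run_chain(tasks, value):
    # feed the output of each task as the input of the next
    for task in tasks:
        value = task(value)
    return value

# two toy "workers" standing in for the blur and save tasks below
def blur(image):
    return image + ['blurred']

def save(image):
    return image + ['saved']

result = run_chain([blur, save], ['image'])
print(result)  # ['image', 'blurred', 'saved']
```

Girder worker does essentially this, except that the tasks are declarative dictionaries and the connections between outputs and inputs are declared explicitly.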

Celery allows us to create tasks that execute asynchronously. Its grouping feature allows us to call a group of tasks: the group is fed to the task queue and processed in parallel.
NOTE: The number of tasks executed at a time depends on the worker's concurrency level, which by default equals the number of CPU cores in your machine.
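You can check the CPU count that Celery's default prefork pool will use with the standard library:

```python
import multiprocessing

# Celery's default worker concurrency equals this value
cores = multiprocessing.cpu_count()
print(cores)
```

On my 4-core machine this prints 4, which is why the worker later reports `concurrency: 4 (prefork)`.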

Let's start with a working example borrowed from the Celery and Girder worker manuals.

First, let's create the Girder worker. Be careful with the indentation here: the scripts embedded in the task definitions are sensitive to it. I am using the example that takes an input image and blurs it. I modified the task to read the image from the web, blur it, and then save it. The code follows:

import girder_worker
from girder_worker.specs import Workflow


class girderWorkerClass:

    def __init__(self):
        print "!!!!!inside default constructor"

    def runWorkFlow(self):
        wf = Workflow()

        lenna = {
            'type': 'image',
            'format': 'png',
            'url': 'https://upload.wikimedia.org/wikipedia/en/2/24/Lenna.png'
        }

        # task to save the image
        save_image = {
            'inputs': [
                {'name': 'the_image', 'type': 'image', 'format': 'pil'}
            ],
            'outputs': [],
            'script': '''
from datetime import datetime
fileName = '/home/vagrant/celery/proj/lenna__' + datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f') + '.jpg'
the_image.save(fileName)
print "file saved to %s" % fileName
'''
        }

        # running the single task
        # output = girder_worker.run(save_image, {'the_image': lenna})

        # task to blur the image
        blur_image = {
            'inputs': [
                {'name': 'blur_input', 'type': 'image', 'format': 'pil'},
                {'name': 'blur_radius', 'type': 'number', 'format': 'number'}
            ],
            'outputs': [
                {'name': 'blur_output', 'type': 'image', 'format': 'pil'}
            ],
            'script': '''
from PIL import ImageFilter
blur_output = blur_input.filter(ImageFilter.GaussianBlur(blur_radius))
print "blur task completed"
'''
        }

        # add the tasks to the workflow
        wf.add_task(blur_image, 'blur')
        wf.add_task(save_image, 'save')

        # connect the tasks in the order of execution
        wf.connect_tasks('blur', 'save', {'blur_output': 'the_image'})

        # run the workflow
        output = girder_worker.run(
            wf,
            inputs={
                'blur_input': lenna,
                'blur_radius': {'format': 'number', 'data': 5}
            }
        )

Let's save this file with the name girderWorker.py.

The code is pretty simple. We create one class (girderWorkerClass) with a default constructor that just prints a statement. The main method is runWorkFlow(). This method takes no parameters but defines the worker tasks save_image and blur_image. The two tasks are orchestrated and run with the call girder_worker.run(...). Now we will use this Girder worker from Celery and execute it in parallel.

Let's create a Celery project that takes the form:

proj
     __init__.py
     tasks.py
     girderWorker.py
     celery.py

The proj folder is our project folder. The empty file __init__.py marks the folder as a Python package.
The file girderWorker.py is the one we created above.
The file celery.py is the main Celery module. It contains the following code:

from __future__ import absolute_import

from celery import Celery

app = Celery('proj',
             broker='amqp://',
             backend='amqp://',
             include=['proj.tasks'])

# Optional configuration, see the application user guide.
app.conf.update(
    CELERY_TASK_RESULT_EXPIRES=3600,
)

if __name__ == '__main__':
    app.start()

This just registers our tasks (defined later in tasks.py) and sets the configuration for the broker, the result backend, and other parameters. Don't worry about these configuration parameters for now.

The line app = Celery(...) creates a Celery application object which we will refer to in our tasks.py file later. The broker and backend are set to RabbitMQ (the details will follow in another post).

Now let's look into the tasks.py file:

from __future__ import absolute_import

import time
from datetime import datetime

from proj.celery import app
from proj.girderWorker import girderWorkerClass


@app.task
def add(x, y):
    time.sleep(20)
    timeString = datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
    return str(x + y) + " computed at:" + timeString


@app.task
def mul(x, y):
    return x * y


@app.task
def xsum(numbers):
    return sum(numbers)


@app.task
def girderTask():
    time.sleep(20)
    girderWorkerObject = girderWorkerClass()
    girderWorkerObject.runWorkFlow()
    return "SUCCESS"

The code is pretty straightforward. The methods decorated with @app.task are the ones that can be called through Celery. The add, mul, and xsum methods are taken from the Celery manual. As these are straightforward, I will only explain the girderTask() method, which calls our girderWorker class.
The girderTask() method is also decorated with @app.task, which means it can be called through the Celery application named "app". This "app" is the one we defined in the celery.py file before.

Now, to show the effect of parallel operation, I have the method wait for 20s before executing the runWorkFlow() method of the girderWorker class. The Girder worker tasks will produce the blurred image and save it under a file name stamped with the current timestamp. So files whose timestamps are very close (within microseconds) were produced by tasks executed in the same batch.

As my machine has 4 cores, each batch will produce 4 files with nearly identical names.
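Files from the same batch can be identified programmatically. Here is a small sketch that groups file names of the lenna__&lt;timestamp&gt;.jpg form (using timestamps like those in the listing further down) into batches whenever consecutive files are more than a second apart:

```python
from datetime import datetime

# example file names following the lenna__<timestamp>.jpg pattern
names = [
    'lenna__2016-06-08 05:11:47.569200.jpg',
    'lenna__2016-06-08 05:11:47.574676.jpg',
    'lenna__2016-06-08 05:12:08.019034.jpg',
]

def parse_ts(name):
    # strip the 'lenna__' prefix and '.jpg' suffix, parse the rest
    return datetime.strptime(name[len('lenna__'):-len('.jpg')],
                             '%Y-%m-%d %H:%M:%S.%f')

batches = []
for ts in sorted(parse_ts(n) for n in names):
    # start a new batch when the gap to the previous file exceeds one second
    if not batches or (ts - batches[-1][-1]).total_seconds() > 1:
        batches.append([ts])
    else:
        batches[-1].append(ts)

print(len(batches))  # 2: the first two files are microseconds apart,
                     # the third is ~20s later
```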

Now, let's create a client file that executes girderTask() in parallel. The file client.py takes the form:

from celery import group
from proj.tasks import girderTask 
 
group(girderTask.s() for i in xrange(10))().get() #create the group of tasks
 

The first import statement imports group, which is used to create the group of tasks, and the second imports our girderTask, defined in tasks.py inside the project folder proj.
The group() primitive takes a list of task signatures to be executed in parallel. Here, I create 10 instances of girderTask() using the signature approach (girderTask.s()). The trailing .get() waits for the completion of all 10 tasks (i.e., if there were any statement below the group() call, it would block until all tasks finish).
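A signature such as girderTask.s() is a frozen task invocation that the group can dispatch later. Conceptually it is similar to functools.partial in plain Python (an analogy only, not how Celery is implemented):

```python
from functools import partial

# four frozen calls, none executed yet -- analogous to girderTask.s()
signatures = [partial(pow, 2, i) for i in range(4)]

# "dispatching" the group: invoke each signature and collect the results
results = [s() for s in signatures]
print(results)  # [1, 2, 4, 8]
```

The difference is that Celery serializes the signatures onto the message queue, so the calls run in worker processes instead of locally.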

After we have all the files, we need to first start the celery application.

To run Celery, we need to run it from the folder where the proj folder is located (i.e., at the same level as the proj folder, not from inside it):

celery -A proj worker -l info
 
This asks Celery to run the worker defined in the proj package; the -l info flag logs messages at the INFO level.
 
After this, we should see the Celery welcome screen, which indicates that Celery is listening for incoming message requests. It looks like this:

(girder_env) vagrant@data-science-toolbox:~/celery$ celery -A proj worker -l info
[2016-06-08 05:11:01,804: WARNING/MainProcess] /home/vagrant/girder_env/local/lib/python2.7/site-packages/celery/apps/worker.py:162: CDeprecationWarning:
Starting from version 3.2 Celery will refuse to accept pickle by default.

The pickle serializer is a security concern as it may give attackers
the ability to execute any command.  It's important to secure
your broker from unauthorized access when using pickle, so we think
that enabling pickle should require a deliberate action and not be
the default choice.

If you depend on pickle then you should set a setting to disable this
warning and to be sure that everything will continue working
when you upgrade to Celery 3.2::

    CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml']

You must only enable the serializers that you will actually use.

  warnings.warn(CDeprecationWarning(W_PICKLE_DEPRECATED))

 -------------- celery@data-science-toolbox v3.1.20 (Cipater)
---- **** -----
--- * ***  * -- Linux-3.13.0-24-generic-x86_64-with-Ubuntu-14.04-trusty
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         proj:0x7f628dab8250
- ** ---------- .> transport:   amqp://guest:**@localhost:5672//
- ** ---------- .> results:     amqp:///
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ----
--- ***** ----- [queues]
 -------------- .> celery           exchange=celery(direct) key=celery

[tasks]
  . proj.tasks.add
  . proj.tasks.girderTask
  . proj.tasks.mul
  . proj.tasks.xsum

[2016-06-08 05:11:01,820: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//
[2016-06-08 05:11:01,827: INFO/MainProcess] mingle: searching for neighbors
[2016-06-08 05:11:02,835: INFO/MainProcess] mingle: all alone
[2016-06-08 05:11:02,849: WARNING/MainProcess] celery@data-science-toolbox ready.

Now, in another console tab, we run our client.py file using:

 python client.py 

After this, we should see the results within a short while. In my case, the results are the blurred images saved in a folder. The names of the blurred images looked like this:

 -rw-rw-r-- 1 vagrant vagrant 17096 Jun  8 05:11 proj/lenna__2016-06-08 05:11:47.569200.jpg
-rw-rw-r-- 1 vagrant vagrant 17096 Jun  8 05:11 proj/lenna__2016-06-08 05:11:47.574676.jpg
-rw-rw-r-- 1 vagrant vagrant 17096 Jun  8 05:11 proj/lenna__2016-06-08 05:11:47.577239.jpg
-rw-rw-r-- 1 vagrant vagrant 17096 Jun  8 05:11 proj/lenna__2016-06-08 05:11:47.577431.jpg
-rw-rw-r-- 1 vagrant vagrant 17096 Jun  8 05:12 proj/lenna__2016-06-08 05:12:08.019034.jpg
-rw-rw-r-- 1 vagrant vagrant 17096 Jun  8 05:12 proj/lenna__2016-06-08 05:12:08.041674.jpg
-rw-rw-r-- 1 vagrant vagrant 17096 Jun  8 05:12 proj/lenna__2016-06-08 05:12:08.043691.jpg
-rw-rw-r-- 1 vagrant vagrant 17096 Jun  8 05:12 proj/lenna__2016-06-08 05:12:08.059164.jpg
-rw-rw-r-- 1 vagrant vagrant 17096 Jun  8 05:12 proj/lenna__2016-06-08 05:12:28.461512.jpg
-rw-rw-r-- 1 vagrant vagrant 17096 Jun  8 05:12 proj/lenna__2016-06-08 05:12:28.515340.jpg

You can see that the first 4 files have nearly identical names (differing only by microseconds), and the next 4 files are timestamped 20s after the first 4. This is because, even though Celery accepts all the message requests immediately, the girderTask() method sleeps for 20s before blurring the image, so the tasks complete in batches of 4. The last two files are timestamped 20s after the second set of 4.
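The arithmetic behind this batching is simple: 10 tasks on a worker with concurrency 4, each sleeping 20s, finish in ceil(10/4) = 3 waves of roughly 20s each. A back-of-the-envelope sketch (ignoring the actual blur/save time, which is small compared to the sleep):

```python
import math

tasks, concurrency, sleep_s = 10, 4, 20

waves = int(math.ceil(tasks / float(concurrency)))
batch_sizes = [min(concurrency, tasks - i * concurrency)
               for i in range(waves)]
finish_after = [(i + 1) * sleep_s for i in range(waves)]

print(waves)         # 3
print(batch_sizes)   # [4, 4, 2]
print(finish_after)  # [20, 40, 60] seconds
```

This matches the listing above: batches of 4, 4, and 2 files, roughly 20 seconds apart.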

Lessons learned:
  1. Be careful while writing the worker tasks: the code inside the script part of a worker must be left-aligned (in particular, the imports must start at the leftmost column).

I will share the working example on my GitHub account after a while.
