Commit 01f9e739 authored by Rudy BARAGLIA's avatar Rudy BARAGLIA

Added waiting queue for clients when a worker isn't available

parent 3e9f72b6
......@@ -43,9 +43,19 @@ class Application(tornado.web.Application):
(r"/worker/ws/speech", WorkerWebSocketHandler)
]
tornado.web.Application.__init__(self, handlers, **settings)
self.available_workers = set() # Hold available worker
self.active_connexions = dict() # Hold active client connexions
self.available_workers = set()
self.waiting_client = set()
self.num_requests_processed = 0
def check_waiting_clients(self):
if len(self.waiting_client) > 0:
try:
client = self.waiting_client.pop()
except:
pass
else:
client.waitWorker.notify()
# Return le README
......@@ -67,7 +77,8 @@ class DecodeRequestHandler(tornado.web.RequestHandler):
self.filePath = None
self.uuid = str(uuid.uuid4())
self.set_status(200, "Initial statut")
self.condition = Condition()
self.waitResponse = Condition()
self.waitWorker = Condition()
if self.request.method != 'POST' :
logging.debug("Received a non-POST request")
self.set_status(403, "Wrong request, server handles only POST requests")
......@@ -96,32 +107,35 @@ class DecodeRequestHandler(tornado.web.RequestHandler):
@gen.coroutine
def post(self, *args, **kwargs):
logging.debug("Allocating Worker to %s" % self.uuid)
try:
self.worker = self.application.available_workers.pop()
except:
# TODO: Add queue
logging.error("Failed to allocate worker to %s" % self.uuid)
self.set_status(503, "Failed to allocate a worker")
self.finish()
else:
self.worker.client_handler = self
logging.debug("Worker allocate to client %s" % self.uuid)
logging.debug("Available workers: " + str(len(self.application.available_workers)))
self.worker.write_message(json.dumps({'uuid':self.uuid, 'file': self.temp_file.encode('base64')}))
yield self.condition.wait()
yield self.allocate_worker()
self.worker.write_message(json.dumps({'uuid':self.uuid, 'file': self.temp_file.encode('base64')}))
yield self.waitResponse.wait()
self.finish()
@gen.coroutine
def allocate_worker(self):
while self.worker == None:
try:
self.worker = self.application.available_workers.pop()
except:
self.worker = None
self.application.waiting_client.add(self)
logging.debug("Awaiting client: %s" % str(len(self.application.waiting_client)))
yield self.waitWorker.wait()
else:
self.worker.client_handler = self
logging.debug("Worker allocated to client %s" % self.uuid)
logging.debug("Available workers: " + str(len(self.application.available_workers)))
@gen.coroutine
def receive_response(self, message):
logging.debug("Forwarding transcription to client")
self.add_header('result', message)
self.set_status(200, "Transcription succeded")
self.condition.notify()
#self.finish()
def send_error(self, error_msg):
logging.debug("Error received from worker: %s" % error_msg)
#self.set_status(503)
#self.finish()
self.waitResponse.notify()
def on_finish(self):
#CLEANUP
......@@ -135,6 +149,7 @@ class WorkerWebSocketHandler(tornado.websocket.WebSocketHandler):
def open(self):
self.client_handler = None
self.application.available_workers.add(self)
self.application.check_waiting_clients()
logging.debug("Worker connected")
logging.debug("Available workers: " + str(len(self.application.available_workers)))
......@@ -150,9 +165,11 @@ class WorkerWebSocketHandler(tornado.websocket.WebSocketHandler):
self.client_handler = None
self.application.available_workers.add(self)
logging.debug("WORKER Available workers: " + str(len(self.application.available_workers)))
self.application.check_waiting_clients()
elif 'error' in json_msg.keys():
logging.debug("WORKER Received error message worker, forwardind to client")
self.client_handler.send_error(message)
#TODO: Error forwarding to client
self.close()
def on_close(self):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment