Commit 3e9f72b6 authored by Rudy BARAGLIA's avatar Rudy BARAGLIA
Browse files

Added coroutine and condition to server in order to wait for worker response

parent ed0f1996
......@@ -17,7 +17,9 @@ SERVER_REQUEST_PATH = u"/client/post/speech"
def main():
with open('../linSTT-dispatch/tests/mocanu-Samy.wav', 'rb') as f:
r = requests.post(SERVER_ADRESS+SERVER_PORT+SERVER_REQUEST_PATH, files={'file_to_transcript': f})
print(str(r))
print(type(r))
print(r.headers)
print(r.status_code)
if __name__ == '__main__':
main()
\ No newline at end of file
......@@ -16,6 +16,8 @@ import configparser
import tornado.ioloop
import tornado.web
import tornado.websocket
from tornado import gen
from tornado.locks import Condition
#LOADING CONFIGURATION
server_settings = configparser.ConfigParser()
......@@ -53,15 +55,7 @@ class MainHandler(tornado.web.RequestHandler):
parent_directory = os.path.join(current_directory, os.pardir)
readme = os.path.join(parent_directory, "README.md")
self.render(readme)
def run_async(func):
@functools.wraps(func)
def async_func(*args, **kwargs):
func_hl = threading.Thread(target=func, args=args, kwargs=kwargs)
func_hl.start()
return func_hl
return async_func
#Handler des requêtes de décodage.
class DecodeRequestHandler(tornado.web.RequestHandler):
......@@ -72,16 +66,17 @@ class DecodeRequestHandler(tornado.web.RequestHandler):
self.worker = None
self.filePath = None
self.uuid = str(uuid.uuid4())
self.set_status(200, "Initial statut")
self.condition = Condition()
if self.request.method != 'POST' :
logging.debug("Received a non-POST request")
self.set_statut(403)
self.finish("Wrong Method")
self.set_status(403, "Wrong request, server handles only POST requests")
self.finish()
#File Retrieval
# TODO: Adapt input to existing controller API
if 'file_to_transcript' not in self.request.files.keys():
self.set_statut(403)
self.finish("Wrong request format")
self.set_status(403, "POST request must contain a 'file_to_transcript' field.")
self.finish()
logging.debug("POST request from %s does not contain 'file_to_transcript' field.")
temp_file = self.request.files['file_to_transcript'][0]['body']
self.temp_file = temp_file
......@@ -91,22 +86,14 @@ class DecodeRequestHandler(tornado.web.RequestHandler):
f = open(TEMP_FILE_PATH+self.uuid+'.wav', 'wb')
except IOError:
logging.error("Could not write file.")
self.set_statut(500)
self.finish("Server Error: Could not write input file")
self.set_status(500, "Server error: Counldn't write file on server side.")
self.finish()
else:
f.write(temp_file)
self.filePath = TEMP_FILE_PATH+self.uuid+'.wav'
logging.debug("File correctly received from %s")
logging.debug("File correctly received from client")
#@tornado.gen.coroutine
def allocate_worker(self, *args, **kwargs):
assert self.worker is not None
logging.debug("Aloo ?")
self.application.num_requests_processed += 1
self.worker.set_client_handler(None)
self.worker.close()
self.finish()
@gen.coroutine
def post(self, *args, **kwargs):
logging.debug("Allocating Worker to %s" % self.uuid)
try:
......@@ -114,26 +101,31 @@ class DecodeRequestHandler(tornado.web.RequestHandler):
except:
# TODO: Add queue
logging.error("Failed to allocate worker to %s" % self.uuid)
self.set_status(503)
self.finish("Failed to allocate worker")
self.set_status(503, "Failed to allocate a worker")
self.finish()
else:
self.worker.client_handler = self
logging.debug("Worker allocate to client %s" % self.uuid)
logging.debug("Available workers: " + str(len(self.application.available_workers)))
self.worker.write_message(json.dumps({'uuid':self.uuid, 'file': self.temp_file.encode('base64')}))
def send_response(self, message):
self.set_status(200)
self.finish(message)
yield self.condition.wait()
@gen.coroutine
def receive_response(self, message):
logging.debug("Forwarding transcription to client")
self.add_header('result', message)
self.set_status(200, "Transcription succeded")
self.condition.notify()
#self.finish()
def send_error(self, error_msg):
self.set_status(503)
self.finish("Failed to allocate worker")
logging.debug("Error received from worker: %s" % error_msg)
#self.set_status(503)
#self.finish()
def on_finish(self):
#CLEANUP
logging.debug("on_finish called")
pass
#WebSocket de communication entre le serveur et le worker
class WorkerWebSocketHandler(tornado.websocket.WebSocketHandler):
......@@ -153,9 +145,8 @@ class WorkerWebSocketHandler(tornado.websocket.WebSocketHandler):
logging.debug("Message received from worker:" + message)
else:
if 'transcription' in json_msg.keys(): #Receive the file path to process
logging.debug("Transcription received: %s" % json_msg['transcription'])
self.client_handler.send_response(json.dumps({'transcript':json_msg['transcription']}))
logging.debug("Response send by worker : %s" % json.dumps({'transcript':json_msg['transcription']}))
self.client_handler.receive_response(json.dumps({'transcript':json_msg['transcription']}))
self.client_handler = None
self.application.available_workers.add(self)
logging.debug("WORKER Available workers: " + str(len(self.application.available_workers)))
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment