#!/usr/bin/env python2
# -*- coding: utf-8 -*-
"""
Created on Wed Jan  3 16:53:16 2018

@author: rbaraglia
"""
import os
import json
import functools
import threading
import uuid
import logging
import configparser

import tornado.ioloop
import tornado.web
import tornado.websocket
from tornado import gen
from tornado.locks import Condition

# Load the server configuration. 'server.cfg' is resolved relative to the
# current working directory of the process.
server_settings = configparser.ConfigParser()
server_settings.read('server.cfg')

# Port the HTTP/websocket server listens on (kept as a string, cast in main()).
SERVER_PORT = server_settings.get('server_params', 'listening_port')
# Prefix prepended to "<uuid>.wav" when storing uploaded audio files.
TEMP_FILE_PATH = server_settings.get('machine_params', 'temp_file_location')
# Only the exact string 'true' enables these two flags.
KEEP_TEMP_FILE = server_settings.get('server_params', 'keep_temp_files') == 'true'
LOGGING_LEVEL = logging.DEBUG if server_settings.get('server_params', 'debug') == 'true' else logging.INFO

# The OFFLINE_PORT environment variable, when set, overrides the configured port.
if "OFFLINE_PORT" in os.environ:
    SERVER_PORT = os.environ['OFFLINE_PORT']

class Application(tornado.web.Application):
    """Tornado application holding the state shared by all handlers.

    It registers the HTTP and websocket routes and keeps track of the
    connected workers, the workers that are currently free and the client
    requests that are waiting for a worker.
    """

    def __init__(self):
        # Templates and static files live one directory above this module.
        base_dir = os.path.dirname(os.path.dirname(__file__))
        settings = dict(
            # NOTE(review): the cookie secret is hard-coded in the source;
            # consider loading it from the configuration instead.
            cookie_secret="43oETzKXQAGaYdkL5gEmGeJJFuYh7EQnp2XdTP1o/Vo=",
            template_path=os.path.join(base_dir, "templates"),
            static_path=os.path.join(base_dir, "static"),
            xsrf_cookies=False,
            autoescape=None,
        )

        handlers = [
            (r"/", MainHandler),
            (r"/client/post/speech", DecodeRequestHandler),
            (r"/upload", DecodeRequestHandler),
            (r"/worker/ws/speech", WorkerWebSocketHandler),
        ]
        tornado.web.Application.__init__(self, handlers, **settings)

        # Number of worker websockets currently open.
        self.connected_worker = 0
        # Worker handlers that are not processing a request.
        self.available_workers = set()
        # Request handlers suspended until a worker becomes available.
        self.waiting_client = set()
        # Count of transcriptions forwarded to clients.
        self.num_requests_processed = 0

    # TODO: Abort request when the client is waiting for a determined amount of time
    def check_waiting_clients(self):
        """Wake up one waiting client, if any, so it can try to grab a worker."""
        try:
            client = self.waiting_client.pop()
        except KeyError:
            # Nobody is waiting: nothing to do.
            return
        client.waitWorker.notify()

    def display_server_status(self):
        """Log the worker, waiting-client and processed-request counters."""
        logging.info('#' * 50)
        logging.info("Connected workers: %s (Available: %s)",
                     self.connected_worker, len(self.available_workers))
        logging.info("Waiting clients: %s", len(self.waiting_client))
        logging.info("Requests processed: %s", self.num_requests_processed)
            

# Serves the README
class MainHandler(tornado.web.RequestHandler):
    """Handler for the root URL: renders the project's README.md."""

    def get(self):
        # README.md sits in the parent of the directory containing this module.
        here = os.path.dirname(os.path.abspath(__file__))
        self.render(os.path.join(here, os.pardir, "README.md"))


# Handler for decoding (transcription) requests.
class DecodeRequestHandler(tornado.web.RequestHandler):
    """Receive an audio file over POST and return the worker's transcription.

    The uploaded file is written to the temp directory, sent to an available
    worker over its websocket, and the request stays open until the worker
    answers (see ``receive_response``).
    """

    # NOTE(review): this is the plain string 'POST' (not a tuple) and Tornado's
    # own attribute is named SUPPORTED_METHODS, so nothing in this file reads
    # it. Left untouched to avoid changing how non-POST requests are rejected.
    SUPPORTED_METHOD = ('POST')

    # Called at the beginning of a request before get/post/etc
    def prepare(self):
        """Validate the request and store the uploaded file on disk.

        On any validation or storage failure the response is finished here
        and the method returns immediately, so no verb method is run.
        """
        self.worker = None
        self.filePath = None
        self.uuid = str(uuid.uuid4())
        self.set_status(200, "Initial statut")
        # Notified when the worker's answer has been written to the response.
        self.waitResponse = Condition()
        # Notified when a worker may have become available.
        self.waitWorker = Condition()

        if self.request.method != 'POST':
            logging.debug("Received a non-POST request")
            self.set_status(403, "Wrong request, server handles only POST requests")
            self.finish()
            return

        # File Retrieval
        # TODO: Adapt input to existing controller API
        if 'wavFile' not in self.request.files:
            logging.debug("POST request from %s does not contain 'wavFile' field.",
                          self.request.remote_ip)
            self.set_status(403, "POST request must contain a 'wavFile' field.")
            self.finish()
            return

        temp_file = self.request.files['wavFile'][0]['body']
        self.temp_file = temp_file

        # Writing file
        file_path = TEMP_FILE_PATH + self.uuid + '.wav'
        try:
            # The context manager closes the handle even if the write fails.
            with open(file_path, 'wb') as f:
                f.write(temp_file)
        except IOError:
            logging.error("Could not write file.")
            self.set_status(500, "Server error: Counldn't write file on server side.")
            self.finish()
            return

        self.filePath = file_path
        logging.debug("File correctly received from client")

    @gen.coroutine
    def post(self, *args, **kwargs):
        """Send the audio to a worker and wait for its answer."""
        logging.debug("Allocating Worker to %s" % self.uuid)
        yield self.allocate_worker()
        # NOTE(review): str.encode('base64') only exists on Python 2.
        self.worker.write_message(json.dumps({'uuid': self.uuid, 'file': self.temp_file.encode('base64')}))
        # Suspended until waitResponse is notified (see receive_response).
        yield self.waitResponse.wait()
        self.finish()

    @gen.coroutine
    def allocate_worker(self):
        """Take a free worker, waiting on ``waitWorker`` while none is available."""
        while self.worker is None:
            try:
                self.worker = self.application.available_workers.pop()
            except KeyError:
                # No free worker: register as waiting and sleep until notified.
                self.application.waiting_client.add(self)
                self.application.display_server_status()
                yield self.waitWorker.wait()
            else:
                self.worker.client_handler = self
                logging.debug("Worker allocated to client %s" % self.uuid)
                self.application.display_server_status()

    @gen.coroutine
    def receive_response(self, message):
        """Write the worker's answer to the response and wake up ``post``.

        ``message`` is placed as-is under the 'transcript' key of the JSON body.
        """
        logging.debug("Forwarding transcription to client")
        self.write({'transcript': message})
        if not KEEP_TEMP_FILE:
            try:
                os.remove(TEMP_FILE_PATH + self.uuid + '.wav')
            except OSError:
                # A failed cleanup must not prevent the client from being answered.
                logging.warning("Could not remove temporary file of request %s", self.uuid)
        self.set_status(200, "Transcription succeded")
        self.application.num_requests_processed += 1
        self.waitResponse.notify()

    def on_finish(self):
        """Hook called after the response has been sent; nothing to clean up yet."""
        # CLEANUP
        pass

# Websocket used for communication between the server and a worker.
class WorkerWebSocketHandler(tornado.websocket.WebSocketHandler):
    """Server side of a worker connection.

    ``client_handler`` is the request handler currently served by this
    worker, or None when the worker is idle.
    """

    def check_origin(self, origin):
        """Accept websocket connections from any origin."""
        return True

    def open(self):
        """Register the newly connected worker as available."""
        self.client_handler = None
        self.application.available_workers.add(self)
        self.application.connected_worker += 1
        # A client may already be waiting for a worker.
        self.application.check_waiting_clients()
        logging.debug("Worker connected")
        self.application.display_server_status()

    def on_message(self, message):
        """Handle a worker message: a transcription, an error, or plain text."""
        try:
            json_msg = json.loads(message)
        except (TypeError, ValueError):
            json_msg = None
        if not isinstance(json_msg, dict):
            # Not a JSON object: just trace it.
            logging.debug("Message received from worker:%s", message)
            return

        if 'transcription' in json_msg:
            response = json.dumps({'transcript': json_msg['transcription']})
            logging.debug("Response send by worker : %s", response)
            if self.client_handler is not None:
                self.client_handler.receive_response(response)
                self.client_handler = None
            else:
                logging.warning("Worker sent a transcription but no client is attached to it")
            # The worker is free again; hand it to a waiting client if any.
            self.application.available_workers.add(self)
            self.application.display_server_status()
            self.application.check_waiting_clients()
        elif 'error' in json_msg:
            logging.debug("WORKER Received error message worker, forwardind to client")
            # TODO: Error forwarding to client
            self.close()

    def on_close(self):
        """Unregister the worker and fail the request it was serving, if any."""
        client = self.client_handler
        if client is not None:
            self.client_handler = None
            client.set_status(503, "Worker failed to translate file")
            # Wake up the client's post() coroutine, which is suspended on
            # waitResponse and finishes the response itself once resumed.
            client.waitResponse.notify()
        logging.debug("WORKER WebSocket closed")
        self.application.available_workers.discard(self)
        self.application.connected_worker -= 1
        self.application.display_server_status()

def main():
    """Configure logging, make sure the temp directory exists and run the server."""
    logging.basicConfig(level=LOGGING_LEVEL, format="%(levelname)8s %(asctime)s %(message)s ")
    # Check if the temp_file repository exist; makedirs also creates missing
    # parent directories, which mkdir would fail on.
    if not os.path.isdir(TEMP_FILE_PATH):
        os.makedirs(TEMP_FILE_PATH)
    print('#'*50)
    app = Application()
    app.listen(int(SERVER_PORT))
    logging.info('Starting up server listening on port %s' % SERVER_PORT)
    try:
        # Blocks until the IOLoop is stopped or the user interrupts it.
        tornado.ioloop.IOLoop.instance().start()
    except KeyboardInterrupt:
        logging.info("Server close by user.")
212 213
# Start the server only when this file is executed as a script.
if __name__ == '__main__':
    main()