Commit fc435aea authored by Kelly Davis, committed by GitHub
Browse files

Merge pull request #113 from mozilla/issue111

Fixed #111
parents 93b7a1e1 b65a78b5
......@@ -9,8 +9,11 @@ const fs = require('fs');
const crypto = require('crypto');
const Promise = require('bluebird');
const mkdirp = require('mkdirp');
const MemoryStream = require('memorystream');
const findRemoveSync = require('find-remove');
const AWS = require('./aws');
const PassThrough = require('stream').PassThrough;
const sox = require('sox-stream');
const UPLOAD_PATH = path.resolve(__dirname, '../..', 'upload');
const CONFIG_PATH = path.resolve(__dirname, '../../..', 'config.json');
......@@ -30,7 +33,7 @@ export default class Clip {
/**
 * Set up the S3 client and the Files helper, then schedule a recurring
 * findRemoveSync call on UPLOAD_PATH for '.mp3' files with an age option
 * of 300 seconds (presumably deleting stale local uploads; confirm against
 * the find-remove documentation).
 */
constructor() {
this.s3 = new AWS.S3();
this.files = new Files();
// NOTE(review): the next two lines differ only in the timer period and look
// like the before/after pair of this diff hunk (300 ms vs 300000 ms = 300 s).
// setInterval takes milliseconds; if both lines were live the sweep would be
// registered twice, once of them firing every 300 ms. Confirm only the
// 300000 ms registration is present in the real file.
setInterval(findRemoveSync.bind(this, UPLOAD_PATH, {age: {seconds: 300}, extensions: '.mp3'}), 300);
setInterval(findRemoveSync.bind(this, UPLOAD_PATH, {age: {seconds: 300}, extensions: '.mp3'}), 300000);
}
private hash(str: string): string {
......@@ -68,13 +71,6 @@ export default class Clip {
return folder + '/' + fileName;
}
/**
 * Start loading the Files helper and hand back whatever its init() returns.
 */
init() {
  const pending = this.files.init();
  return pending;
}
/**
* Is this request directed at voice clips?
*/
......@@ -120,35 +116,17 @@ export default class Clip {
let sentence = decodeURI(info.sentence as string);
return new Promise((resolve: Function, reject: Function) => {
// First we need to figure out the file extension.
let extension;
// Obtain contentType
let contentType = info['content-type'] as string;
if (contentType.startsWith('audio/ogg')) {
// Firefox gives us opus in an ogg.
extension = '.ogg';
} else if (contentType.startsWith('audio/webm')) {
// Chrome gives us opus in webm.
extension = '.webm';
} else if (contentType.startsWith('audio/m4a')) {
// iOS gives us mp4a.
// Note: Firefox cannot play m4a's,
// but if we save this clip as mp3 everything just works.
extension = '.mp3';
} else {
// Default to ogg.
console.error('unrecognized audio type!', contentType);
extension = '.ogg';
}
// Where is our audio clip going to be located?
let folder = uid + '/';
let filePrefix = this.hash(sentence);
let file = folder + filePrefix + extension;
let file = folder + filePrefix + '.mp3';
let txtFile = folder + filePrefix + '.txt';
let f = ff(() => {
// if the folder does not exist, we create it
let params = {Bucket: BUCKET_NAME, Key: folder};
this.s3.putObject(params, f.wait());
......@@ -167,12 +145,16 @@ export default class Clip {
// If upload was base64, make sure we decode it first.
if (contentType.includes('base64')) {
let blob = Buffer.from(Buffer.concat(chunks).toString(), 'base64');
let params = {Bucket: BUCKET_NAME, Key: file, Body: blob};
let passThrough = new PassThrough();
passThrough.end(Buffer.from(Buffer.concat(chunks).toString(), 'base64'));
let memStream = new MemoryStream();
memStream = passThrough.pipe(sox({output: { type: 'mp3' } })).pipe(memStream);
let params = {Bucket: BUCKET_NAME, Key: file, Body: memStream};
this.s3.upload(params, f());
} else {
// For non-base64 uploads, we can just stream data.
let params = {Bucket: BUCKET_NAME, Key: file, Body: request};
let memStream = request.pipe(sox({output: { type: 'mp3' } })).pipe(new MemoryStream());
let params = {Bucket: BUCKET_NAME, Key: file, Body: memStream};
this.s3.upload(params, f());
}
......@@ -180,9 +162,6 @@ export default class Clip {
let params = {Bucket: BUCKET_NAME, Key: txtFile, Body: sentence};
this.s3.putObject(params, f());
}, () => {
// Converts audio to mp3 if required
this.files.init().then(f());
}, () => {
// File saving is now complete.
console.log('file written to s3', file);
......
......@@ -16,7 +16,6 @@ const config = require(CONFIG_PATH);
const BUCKET_NAME = config.BUCKET_NAME || 'common-voice-corpus';
export default class Files {
private initialized: boolean;
private s3: any;
private files: {
// fileGlob: [
......@@ -28,11 +27,13 @@ export default class Files {
private mp3s: string[];
/**
 * Create the S3 client, reset the in-memory file index (files, paths, mp3s),
 * run init() once, and after that first run resolves re-run init() every
 * 1000 ms.
 */
constructor() {
this.initialized = false;
this.s3 = new AWS.S3();
this.files = {};
this.paths = [];
this.mp3s = [];
// NOTE(review): setInterval does not wait for the previous init() promise, so
// a run that takes longer than 1000 ms can overlap with the next one; there is
// also no rejection handler on this chain. Confirm that this is intended.
this.init().then(() => {
setInterval(this.init.bind(this), 1000);
});
}
/**
......@@ -67,109 +68,6 @@ export default class Files {
});
}
/**
 * Collect every known path whose extension list has no mp3 entry.
 */
private getMissingMP3s() {
  return this.paths.filter((candidate: string) => {
    const knownExts = this.files[candidate].exts;
    return knownExts.indexOf(MP3_EXT) < 0;
  });
}
/**
 * Convert a batch of S3 audio objects to mp3 and upload the results.
 *
 * Each job is `{glob, ext}`: the object stored under key `glob + ext` is
 * read as a stream, piped through sox configured for mp3 output, buffered in
 * a MemoryStream, and uploaded under key `glob + MP3_EXT`. On a successful
 * upload MP3_EXT is appended to `this.files[glob].exts`.
 *
 * Failures are best-effort: they are logged and the batch still completes
 * without an error argument.
 *
 * @param jobs A single job or an array of jobs.
 * @param cb Called exactly once, with no arguments, after every upload in the
 *   batch has either succeeded or failed.
 */
private convert(jobs: any, cb: Function) {
  if (!Array.isArray(jobs)) {
    jobs = [jobs];
  }

  // Nothing to wait for: report completion instead of never calling back.
  if (jobs.length === 0) {
    cb();
    return;
  }

  // Count settled uploads (success or failure) so that cb fires once per
  // batch, rather than once per failure plus possibly once more on success.
  let pending = jobs.length;
  const settle = () => {
    --pending;
    if (pending === 0) {
      cb();
    }
  };

  jobs.forEach(job => {
    let glob = job.glob;
    let ext = job.ext;

    // Convert s3 audio from ext to mp3
    let sourceParams = {Bucket: BUCKET_NAME, Key: glob + ext};
    let awsRequest = this.s3.getObject(sourceParams);
    let soxStream = awsRequest.createReadStream()
      .on('error', (err) => {
        console.error('could not create aws audio stream', err);
      })
      .pipe(sox({output: { type: 'mp3' } }))
      .on('error', (err) => {
        console.error('could not stream audio into sox', err);
      });

    // Pipe mp3 data into a read/write MemoryStream
    let memStream = new MemoryStream();
    soxStream.pipe(memStream);

    // Write memStream to s3
    let sinkParams = {Bucket: BUCKET_NAME, Key: glob + MP3_EXT, Body: memStream};
    this.s3.upload(sinkParams, (err) => {
      if (err) {
        console.error('Could not write mp3 back to s3', err);
      } else {
        // Record the new mp3 only when the upload actually succeeded.
        this.files[glob].exts.push(MP3_EXT);
      }
      settle();
    });
  });
}
/**
 * Convert any sound clips that are not mp3 format into mp3.
 *
 * Looks up every path without an mp3, pairs it with the first of its
 * extensions listed in CONVERTABLE_EXTS, and feeds those jobs to a batching
 * queue whose worker is convert().
 *
 * @returns A promise that resolves immediately when there is nothing to
 *   convert, otherwise when the queue emits 'drain'; it rejects with the
 *   error if the queue emits 'error'.
 */
private convertMissingToMP3s(): Promise<any> {
  const missing = this.getMissingMP3s();

  // Build the job list up front so we know whether anything will be queued.
  const jobs = [];
  missing.forEach(glob => {
    const convertible = this.files[glob].exts.filter(
      (candidate: string) => CONVERTABLE_EXTS.indexOf(candidate) !== -1
    );
    const ext = convertible[0];

    // Only paths that have a convertable source extension become jobs.
    if (ext) {
      jobs.push({
        glob: glob,
        ext: ext
      });
    }
  });

  if (jobs.length < 1) {
    // Nothing to convert (or nothing we know how to convert), so we are done.
    // Returning here avoids waiting on a 'drain' event from an empty queue.
    return Promise.resolve();
  }

  return new Promise((res: Function, rej: Function) => {
    let batches = new Queue(this.convert.bind(this), { batchSize: BATCH_SIZE });

    batches.on('error', (err: any) => {
      console.error('error process mp3 conversions', err);
      rej(err);
    });

    // Listen for completion before queueing any work.
    batches.on('drain', () => {
      console.log(`Converted ${jobs.length} file(s) to mp3.`);
      res();
    });

    jobs.forEach(job => {
      batches.push(job);
    });
  });
}
/**
* Make a list of mp3s so we can randomly choose one later.
*/
......@@ -183,15 +81,15 @@ export default class Files {
}
/**
* Load a list of files from the filesystem.
* Load a list of files from S3.
*/
init(): Promise<any> {
private init(): Promise<any> {
// Create our batch processor to help us read all sentences
// from the filesystem without overloading the server.
let batches = new Queue(this.processBatch.bind(this), { batchSize: BATCH_SIZE });
return new Promise((res: Function, rej: Function) => {
let searchParam = {Bucket: BUCKET_NAME};
let searchParam = {Bucket: BUCKET_NAME, MaxKeys: 5000};
let awsRequest = this.s3.listObjectsV2(searchParam);
awsRequest.on('success', (response) => {
......@@ -229,7 +127,6 @@ export default class Files {
awsRequest.on('complete', (response) => {
if (response.error) {
console.error('Error while fetching clip list', response.error);
this.initialized = true;
res();
return;
}
......@@ -238,17 +135,13 @@ export default class Files {
if (this.paths.length === 0) {
// No files found, so we are done
console.log('warning, no sound files found');
this.initialized = true;
res();
return;
}
// Convert any files that haven't been converted to mp3 yet.
this.convertMissingToMP3s().then(() => {
this.generateMP3List();
this.initialized = true;
res();
});
this.generateMP3List();
res();
});
awsRequest.send();
......@@ -259,12 +152,6 @@ export default class Files {
* Grab a random sentence and associated sound file path.
*/
getRandomClip(): Promise<string[2]> {
// If we haven't been initialized yet, we cannot get a random clip.
if (!this.initialized) {
console.error('cannot get random clip before files is initialized');
return Promise.reject('Files not init.');
}
// Make sure we have at least 1 file to choose from.
if (this.mp3s.length === 0) {
return Promise.reject('No files.');
......
......@@ -52,14 +52,6 @@ export default class Server {
}).resume();
}
/**
 * Prepare everything the server depends on before it starts.
 */
init(): Promise<any> {
  // Clip needs some initialization to load all the local clips.
  const clipReady = this.clip.init();
  return clipReady;
}
/**
* Start up everything.
*/
......@@ -75,7 +67,5 @@ export default class Server {
// If this file is run directly (rather than required as a module), boot up a
// new server instance.
// NOTE(review): this hunk appears to show both the old start-up path
// (server.init().then(() => server.run())) and the new one (a bare
// server.run()). If both were live, run() would be invoked twice; confirm
// that only the final server.run() remains in the real file.
if (require.main === module) {
let server = new Server();
server.init().then(() => {
server.run();
});
server.run();
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment