Commit dbb80451 authored by Dan Povey

trunk: some changes to online-nnet2 threaded decoding program to fix an issue Nagendra and Tanel noticed where endpointing does not work if simulate-realtime-decoding=false.

git-svn-id: https://svn.code.sf.net/p/kaldi/code/trunk@5016 5e6a8d80-dfce-4ca6-a32a-6e07a63d50c8
parent 38ab1f44
......@@ -119,6 +119,7 @@ SingleUtteranceNnet2DecoderThreaded::SingleUtteranceNnet2DecoderThreaded(
config_(config), am_nnet_(am_nnet), tmodel_(tmodel), sampling_rate_(0.0),
num_samples_received_(0), input_finished_(false),
feature_pipeline_(feature_info),
num_samples_discarded_(0),
silence_weighting_(tmodel, feature_info.silence_weighting_config),
decodable_(tmodel),
num_frames_decoded_(0), decoder_(fst, config_.decoder_opts),
......@@ -176,6 +177,10 @@ SingleUtteranceNnet2DecoderThreaded::~SingleUtteranceNnet2DecoderThreaded() {
delete input_waveform_.front();
input_waveform_.pop_front();
}
while (!processed_waveform_.empty()) {
delete processed_waveform_.front();
processed_waveform_.pop_front();
}
}
void SingleUtteranceNnet2DecoderThreaded::AcceptWaveform(
......@@ -200,6 +205,22 @@ void SingleUtteranceNnet2DecoderThreaded::AcceptWaveform(
waveform_synchronizer_.UnlockSuccess(ThreadSynchronizer::kProducer);
}
int32 SingleUtteranceNnet2DecoderThreaded::NumWaveformPiecesPending() {
// Note RE locking: what we really want here is just to lock the mutex. As a
// side effect, because of the way the synchronizer code works, it will also
// increment the semaphore and might wake up the consumer thread. This will
// possibly make it do a little useless work (go around a loop once), but
// won't really do any harm. Perhaps we should have implemented a version of
// the Lock function that takes no arguments.
if (!waveform_synchronizer_.Lock(ThreadSynchronizer::kProducer)) {
KALDI_ERR << "Failure locking mutex: decoding aborted.";
}
int32 ans = input_waveform_.size();
waveform_synchronizer_.UnlockSuccess(ThreadSynchronizer::kProducer);
return ans;
}
int32 SingleUtteranceNnet2DecoderThreaded::NumFramesReceivedApprox() const {
return num_samples_received_ /
(sampling_rate_ * feature_pipeline_.FrameShiftInSeconds());
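As a worked illustration of the frames-from-samples arithmetic above, here is a self-contained sketch using assumed values (16 kHz sampling rate, 10 ms frame shift; neither value is taken from this commit):

// Sketch only: mirrors NumFramesReceivedApprox() with assumed constants.
#include <cstdint>
#include <iostream>

int main() {
  double sampling_rate = 16000.0;        // assumed sampling rate
  double frame_shift_seconds = 0.01;     // assumed FrameShiftInSeconds() value
  int64_t num_samples_received = 48000;  // e.g. three seconds of audio
  // Each frame shift covers sampling_rate * frame_shift = 160 samples,
  // so 48000 samples correspond to approximately 300 frames.
  int32_t approx_frames = static_cast<int32_t>(
      num_samples_received / (sampling_rate * frame_shift_seconds));
  std::cout << approx_frames << std::endl;  // prints 300
  return 0;
}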
......@@ -237,6 +258,55 @@ void SingleUtteranceNnet2DecoderThreaded::FinalizeDecoding() {
decoder_.FinalizeDecoding();
}
BaseFloat SingleUtteranceNnet2DecoderThreaded::GetRemainingWaveform(
Vector<BaseFloat> *waveform) const {
if (KALDI_PTHREAD_PTR(threads_[0]) != 0) {
KALDI_ERR << "It is an error to call GetRemainingWaveform before Wait().";
}
int64 num_samples_stored = 0; // number of samples we still have.
std::vector< Vector<BaseFloat>* > all_pieces;
std::deque< Vector<BaseFloat>* >::const_iterator iter;
for (iter = input_waveform_.begin(); iter != input_waveform_.end(); ++iter) {
num_samples_stored += (*iter)->Dim();
all_pieces.push_back(*iter);
}
for (iter = processed_waveform_.begin(); iter != processed_waveform_.end();
++iter) {
num_samples_stored += (*iter)->Dim();
all_pieces.push_back(*iter);
}
// put the pieces in chronological order.
std::reverse(all_pieces.begin(), all_pieces.end());
int64 samples_shift_per_frame =
sampling_rate_ * feature_pipeline_.FrameShiftInSeconds();
int64 num_samples_to_discard = samples_shift_per_frame * num_frames_decoded_;
KALDI_ASSERT(num_samples_to_discard >= num_samples_discarded_);
// num_samp_discard is how many samples we must discard from our stored
// samples.
int64 num_samp_discard = num_samples_to_discard - num_samples_discarded_,
num_samp_keep = num_samples_stored - num_samp_discard;
KALDI_ASSERT(num_samp_discard <= num_samples_stored && num_samp_keep >= 0);
waveform->Resize(num_samp_keep, kUndefined);
int32 offset = 0; // offset into the output waveform; we assume the output
// waveform has no more samples than an int32 can index.
for (size_t i = 0; i < all_pieces.size(); i++) {
Vector<BaseFloat> *this_piece = all_pieces[i];
int32 this_dim = this_piece->Dim();
if (num_samp_discard >= this_dim) {
num_samp_discard -= this_dim;
} else {
// normal case is num_samp_discard = 0.
int32 this_dim_keep = this_dim - num_samp_discard;
waveform->Range(offset, this_dim_keep).CopyFromVec(
this_piece->Range(num_samp_discard, this_dim_keep));
offset += this_dim_keep;
num_samp_discard = 0;
}
}
KALDI_ASSERT(offset == num_samp_keep && num_samp_discard == 0);
return sampling_rate_;
}
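To make the discard arithmetic above concrete, here is a small self-contained sketch with assumed numbers (16 kHz audio, 10 ms frame shift, and hypothetical counts; none of these values come from this commit):

// Sketch only: the sample-bookkeeping in GetRemainingWaveform() above.
#include <cassert>
#include <cstdint>

int main() {
  int64_t samples_shift_per_frame = 160;   // assumed: 16000 Hz * 0.01 s
  int64_t num_frames_decoded = 50;         // frames the decoder has consumed
  int64_t num_samples_discarded = 6400;    // samples already freed earlier
  int64_t num_samples_stored = 3200;       // samples still held in the deques

  // Total samples consumed by the decoder since the utterance started.
  int64_t num_samples_to_discard = samples_shift_per_frame * num_frames_decoded;
  assert(num_samples_to_discard >= num_samples_discarded);

  // Of the stored samples, the leading 1600 were already decoded; the
  // remaining 1600 form the waveform returned to the caller.
  int64_t num_samp_discard = num_samples_to_discard - num_samples_discarded;
  int64_t num_samp_keep = num_samples_stored - num_samp_discard;
  assert(num_samp_discard == 1600 && num_samp_keep == 1600);
  return 0;
}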
void SingleUtteranceNnet2DecoderThreaded::GetAdaptationState(
OnlineIvectorExtractorAdaptationState *adaptation_state) {
......@@ -413,11 +483,23 @@ bool SingleUtteranceNnet2DecoderThreaded::FeatureComputation(
while (num_frames_usable < config_.nnet_batch_size &&
!input_waveform_.empty()) {
feature_pipeline_.AcceptWaveform(sampling_rate_, *input_waveform_.front());
delete input_waveform_.front();
processed_waveform_.push_back(input_waveform_.front());
input_waveform_.pop_front();
num_frames_ready = feature_pipeline_.NumFramesReady();
num_frames_usable = num_frames_ready - num_frames_output;
}
// Delete already-processed pieces of waveform if we have already decoded
// those frames. (If not already decoded, we keep them around for the
// sake of GetRemainingWaveform()).
int32 samples_shift_per_frame =
sampling_rate_ * feature_pipeline_.FrameShiftInSeconds();
while (!processed_waveform_.empty() &&
num_samples_discarded_ + processed_waveform_.front()->Dim() <
samples_shift_per_frame * num_frames_decoded_) {
num_samples_discarded_ += processed_waveform_.front()->Dim();
delete processed_waveform_.front();
processed_waveform_.pop_front();
}
return waveform_synchronizer_.UnlockSuccess(ThreadSynchronizer::kConsumer);
}
}
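The discard loop above frees a stored piece only when every one of its samples lies before the decoded frontier; the following self-contained sketch checks that condition with assumed numbers (not taken from this commit):

// Sketch only: a processed piece may be deleted only once all of its
// samples fall strictly before the decoded frontier.
#include <cassert>
#include <cstdint>

int main() {
  int64_t samples_shift_per_frame = 160;  // assumed: 16000 Hz * 0.01 s
  int64_t num_frames_decoded = 100;       // decoded frontier: 16000 samples
  int64_t num_samples_discarded = 15000;  // samples freed so far
  int64_t front_piece_dim = 1024;         // Dim() of the oldest stored piece

  // Mirror of the loop condition: the piece's last sample must precede
  // the frontier samples_shift_per_frame * num_frames_decoded.
  bool can_free = num_samples_discarded + front_piece_dim <
      samples_shift_per_frame * num_frames_decoded;
  assert(!can_free);  // 15000 + 1024 = 16024 >= 16000, so keep the piece
  return 0;
}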
......@@ -605,4 +687,3 @@ bool SingleUtteranceNnet2DecoderThreaded::EndpointDetected(
} // namespace kaldi
......@@ -201,11 +201,18 @@ class SingleUtteranceNnet2DecoderThreaded {
const OnlineNnet2FeaturePipelineInfo &feature_info,
const OnlineIvectorExtractorAdaptationState &adaptation_state);
/// You call this to provide this class with more waveform to decode. This
/// call is, for all practical purposes, non-blocking.
void AcceptWaveform(BaseFloat samp_freq,
const VectorBase<BaseFloat> &wave_part);
/// Returns the number of pieces of waveform that are still waiting to be
/// processed. This may be useful for calling code to judge whether to supply
/// more waveform or to wait.
int32 NumWaveformPiecesPending();
/// You call this to inform the class that no more waveform will be provided;
/// this allows it to flush out the last few frames of features, and is
/// necessary if you want to call Wait() to wait until all decoding is done.
......@@ -286,6 +293,12 @@ class SingleUtteranceNnet2DecoderThreaded {
/// InputFinished, and then Wait(). Otherwise it is an error.
void GetAdaptationState(OnlineIvectorExtractorAdaptationState *adaptation_state);
/// Gets the remaining, un-decoded part of the waveform and returns the sample
/// rate. May only be called after Wait(), and it only makes sense to call
/// this if you called TerminateDecoding() before Wait(). The idea is that
/// you can then provide this un-decoded piece of waveform to another decoder.
BaseFloat GetRemainingWaveform(Vector<BaseFloat> *waveform_out) const;
~SingleUtteranceNnet2DecoderThreaded();
private:
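A hedged caller-side sketch of the hand-off that the GetRemainingWaveform() comment above describes; the helper function below is hypothetical, and both decoder objects are assumed to be fully constructed elsewhere:

// Hypothetical helper, not part of this commit: pass the un-decoded tail
// of one utterance on to a second decoder.
void HandOffRemainder(SingleUtteranceNnet2DecoderThreaded &decoder,
                      SingleUtteranceNnet2DecoderThreaded &next_decoder) {
  decoder.TerminateDecoding();  // stop the decoding threads early
  decoder.Wait();               // must be called before GetRemainingWaveform()
  Vector<BaseFloat> remainder;
  BaseFloat samp_rate = decoder.GetRemainingWaveform(&remainder);
  next_decoder.AcceptWaveform(samp_rate, remainder);
}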
......@@ -349,6 +362,8 @@ class SingleUtteranceNnet2DecoderThreaded {
// sampling_rate_ is only needed for checking that it matches the config.
bool input_finished_;
std::deque< Vector<BaseFloat>* > input_waveform_;
ThreadSynchronizer waveform_synchronizer_;
// feature_pipeline_ is accessed by the nnet-evaluation thread, by the main
......@@ -358,6 +373,15 @@ class SingleUtteranceNnet2DecoderThreaded {
OnlineNnet2FeaturePipeline feature_pipeline_;
Mutex feature_pipeline_mutex_;
// The next two variables are required only for implementation of the function
// GetRemainingWaveform(). After we take pieces of waveform from the
// input_waveform_ queue to be processed into features, we put them onto this
// deque. Then we
// discard from this queue any that we can discard because we have already
// decoded those frames (see num_frames_decoded_), and we increment
// num_samples_discarded_ by the corresponding number of samples.
std::deque< Vector<BaseFloat>* > processed_waveform_;
int64 num_samples_discarded_;
// This object is used to control the (optional) downweighting of silence in iVector estimation,
// which is based on the decoder traceback.
OnlineSilenceWeighting silence_weighting_;
......
......@@ -219,6 +219,15 @@ int main(int argc, char *argv[]) {
: samp_remaining;
SubVector<BaseFloat> wave_part(data, samp_offset, num_samp);
// The endpointing code won't work if we give the decoder the whole
// waveform at once, because we would then exit this while loop, and
// the endpointing check happens inside this while loop. The next
// statement is intended to prevent that from happening.
while (do_endpointing &&
decoder.NumWaveformPiecesPending() * chunk_length_secs > 2.0)
Sleep(0.5);
decoder.AcceptWaveform(samp_freq, wave_part);
samp_offset += num_samp;
......
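Because each queued piece holds chunk_length_secs seconds of audio, the product in the condition above approximates the seconds of waveform still buffered; here is a self-contained sketch of that arithmetic with assumed values (not from this commit):

// Sketch only: the producer sleeps while more than two seconds of audio
// are queued, so the decoding thread can drain the queue and run its
// endpoint checks.
#include <cassert>

int main() {
  double chunk_length_secs = 0.05;  // assumed value of the command-line option
  int num_pending = 41;             // hypothetical NumWaveformPiecesPending()
  bool should_sleep = num_pending * chunk_length_secs > 2.0;  // 2.05 s queued
  assert(should_sleep);
  return 0;
}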
......@@ -166,6 +166,11 @@ fortran_opt = $(shell gcc -v 2>&1 | perl -e '$$x = join(" ", <STDIN>); if($$x =~
# if you want OpenBLAS to use multiple threads, then you could set,
# for example, OPENBLAS_NUM_THREADS=2 in your path.sh so that the
# runtime knows how many threads to use.
# Note: if you ever get the error "Program is Terminated. Because you tried to
# allocate too many memory regions.", this is because OpenBLAS has a fixed
# buffer size for the number of threads and you might have gone beyond that. It
# may possibly help to add e.g. NUM_THREADS=64 to the command line below (after
# $(MAKE)).
openblas_compiled:
-git clone git://github.com/xianyi/OpenBLAS
cd OpenBLAS; sed 's:# FCOMMON_OPT = -frecursive:FCOMMON_OPT = -frecursive:' < Makefile.rule >tmp && mv tmp Makefile.rule
......
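As a hypothetical illustration of the note above (the actual Kaldi build line is elided by the diff): OpenBLAS sizes that fixed buffer from the NUM_THREADS build-time variable, so appending NUM_THREADS=64 to the $(MAKE) invocation in the recipe above, i.e. running the equivalent of make NUM_THREADS=64 inside the OpenBLAS directory, raises the limit that triggers the "too many memory regions" error.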