diff --git a/azure-pipelines.yml b/azure-pipelines.yml index 4f7ce02da..bc76f85c9 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -6,6 +6,13 @@ # 3. Choose "Existing Azure Pipelines YAML file" and specify path to this file # 4. "More actions" > "Save" +parameters: +# Allow skipping the entire 'Build' stage +- name: runBuilds + displayName: Run builds? Uncheck to run regression tests only. + type: boolean + default: true + # The pipeline CI trigger is set on the branch master only and PR trigger on a # (non-draft) pull request to any branch trigger: @@ -45,6 +52,7 @@ stages: ###################################################################### - job: BuildWindows + condition: eq(${{ parameters.runBuilds }}, true) displayName: Windows strategy: @@ -180,6 +188,7 @@ stages: ###################################################################### - job: BuildUbuntu + condition: eq(${{ parameters.runBuilds }}, true) displayName: Ubuntu timeoutInMinutes: 90 @@ -237,17 +246,7 @@ stages: examples: true static: true ################################################################ - # Ubuntu 16.04 supports CUDA 8+ - "16.04 CUDA 9.2 gcc-7": - image: ubuntu-16.04 - boost: true - cpu: true - gpu: true - cuda: 9.2 - gcc: 7 - unit_tests: true - examples: true - static: false + # Ubuntu 16.04 is no longer available on Azure-hosted machines pool: vmImage: $(image) @@ -322,18 +321,17 @@ stages: ###################################################################### - job: BuildUbuntuMinimal - displayName: Ubuntu CPU+GPU gcc-5 cmake 3.5 + condition: eq(${{ parameters.runBuilds }}, true) + displayName: Ubuntu CPU+GPU gcc-7 cmake 3.5 pool: - vmImage: ubuntu-16.04 + vmImage: ubuntu-18.04 steps: - checkout: self submodules: true # The script simplifies installation of different versions of CUDA. - # Ubuntu 16.04 on Azure-hosted VMs have GCC 5.5 as gcc-5, which is not compatible with CUDA 9. - # Downgrading to GCC 5.4 (the default gcc on Ubuntu 16.04) would be more work... - bash: ./scripts/ci/install_cuda_ubuntu.sh "10.0" displayName: Install CUDA @@ -346,10 +344,10 @@ stages: # GCC 5 is the minimum version supported - bash: | - /usr/bin/gcc-5 --version + /usr/bin/gcc-7 --version mkdir -p build cd build - CC=/usr/bin/gcc-5 CXX=/usr/bin/g++-5 CUDAHOSTCXX=/usr/bin/g++-5 \ + CC=/usr/bin/gcc-7 CXX=/usr/bin/g++-7 CUDAHOSTCXX=/usr/bin/g++-7 \ ../cmake-3.5.1-Linux-x86_64/bin/cmake .. 
\ -DCOMPILE_CPU=on \ -DCUDA_TOOLKIT_ROOT_DIR=/usr/local/cuda-10.0 @@ -368,10 +366,11 @@ stages: ###################################################################### - job: BuildMacOS + condition: eq(${{ parameters.runBuilds }}, true) displayName: macOS CPU clang pool: - vmImage: macos-latest + vmImage: macos-10.15 steps: - checkout: self @@ -416,6 +415,7 @@ stages: ###################################################################### - job: BuildInstall + condition: eq(${{ parameters.runBuilds }}, true) displayName: Linux CPU library install pool: @@ -580,7 +580,7 @@ stages: # Avoid using $(Build.SourcesDirectory) in bash tasks because on Windows pools it uses '\' # instead of '/', which often breaks the job - - bash: MARIAN=../marian-dev/build bash ./run_mrt.sh '#cpu' '#basics' + - bash: MARIAN=../marian-dev/build TIMEOUT=10m bash ./run_mrt.sh '#cpu' '#basics' '#devops' continueOnError: true displayName: Run tests workingDirectory: marian-prod-tests @@ -677,7 +677,7 @@ stages: AWS_SECRET_SAS_TOKEN: $(blob-sas-token) workingDirectory: marian-prod-tests - - bash: MARIAN=../marian-dev/build bash ./run_mrt.sh '#cpu' '#basics' + - bash: MARIAN=../marian-dev/build bash ./run_mrt.sh '#cpu' '#basics' '#devops' continueOnError: true displayName: Run tests workingDirectory: marian-prod-tests diff --git a/src/common/aliases.cpp b/src/common/aliases.cpp index 36613327e..b38ccc648 100644 --- a/src/common/aliases.cpp +++ b/src/common/aliases.cpp @@ -31,8 +31,8 @@ void ConfigParser::addAliases(cli::CLIWrapper& cli) { cli.alias("fp16", "true", [&](YAML::Node& config) { if(mode_ == cli::mode::training) { config["precision"] = std::vector<std::string>({"float16", "float32"}); // inference type, optimization type, save type - // scaling factor (power of 2), frequency, multiplier at increase, tolerance, range, minium factor - config["cost-scaling"] = std::vector<std::string>({"0", "1000", "2", "0.05", "10", "1e-5"}); + // scaling factor, frequency, multiplier at increase, minimum scaling factor + config["cost-scaling"] = std::vector<std::string>({"256.f", "1000", "2.f", "256.f"}); } else { config["precision"] = std::vector<std::string>({"float16"}); // for inference we do not need the other types } }
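For orientation, the alias above now expands to roughly this training configuration (an illustrative YAML sketch of what the lambda writes; following the comment, the scale starts at 256, may be doubled every 1000 updates, and is never reduced below 256):
  precision: [float16, float32]
  cost-scaling: [256.f, 1000, 2.f, 256.f]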
diff --git a/src/common/config_parser.cpp b/src/common/config_parser.cpp index 333d87a7a..ebbe4a89a 100644 --- a/src/common/config_parser.cpp +++ b/src/common/config_parser.cpp @@ -267,10 +267,16 @@ void ConfigParser::addOptionsModel(cli::CLIWrapper& cli) { "Pool encoder states instead of using cross attention (selects first encoder state, best used with special token)"); cli.add<int>("--transformer-dim-ffn", "Size of position-wise feed-forward network (transformer)", - 2048); + 2048); + cli.add<int>("--transformer-decoder-dim-ffn", + "Size of position-wise feed-forward network in decoder (transformer). Uses --transformer-dim-ffn if 0.", + 0); cli.add<int>("--transformer-ffn-depth", "Depth of filters (transformer)", 2); + cli.add<int>("--transformer-decoder-ffn-depth", + "Depth of filters in decoder (transformer). Uses --transformer-ffn-depth if 0.", + 0); cli.add<std::string>("--transformer-ffn-activation", "Activation between filters: swish or relu (transformer)", "swish"); @@ -528,15 +534,15 @@ void ConfigParser::addOptionsTraining(cli::CLIWrapper& cli) { // mixed precision training cli.add<bool>("--fp16", "Shortcut for mixed precision training with float16 and cost-scaling, " - "corresponds to: --precision float16 float32 --cost-scaling 0 1000 2 0.05 10 1e-5f" + "corresponds to: --precision float16 float32 --cost-scaling 256.f 1000 2.f 256.f"); cli.add<std::vector<std::string>>("--precision", "Mixed precision training for forward/backward pass and optimization. " "Defines types for: forward/backward pass, optimization.", {"float32", "float32"}); cli.add<std::vector<std::string>>("--cost-scaling", "Dynamic cost scaling for mixed precision training: " - "power of 2, scaling window, scaling factor, tolerance, range, minimum factor") - ->implicit_val("0.f 1000 2.f 0.05f 10 1e-5f"); + "scaling factor, frequency, multiplier, minimum factor") + ->implicit_val("256.f 1000 2.f 256.f"); cli.add<size_t>("--gradient-norm-average-window", "Window size over which the exponential average of the gradient norm is recorded (for logging and scaling). " "After this many updates about 90% of the mass of the exponential average comes from these updates", @@ -702,9 +708,10 @@ void ConfigParser::addOptionsTranslation(cli::CLIWrapper& cli) { "Use softmax shortlist: path first best prune"); cli.add<std::vector<float>>("--weights", "Scorer weights"); - cli.add<bool>("--output-sampling", - "Noise output layer with gumbel noise", - false); + cli.add<std::vector<std::string>>("--output-sampling", + "Noise output layer with gumbel noise. Implicit default is 'full' for sampling from full distribution. " + "Also accepts 'topk num' (e.g. topk 100) for top-100 sampling.") + ->implicit_val("full"); cli.add<std::vector<int>>("--output-approx-knn", "Use approximate knn search in output layer (currently only in transformer)") ->implicit_val("100 1024"); @@ -889,6 +896,10 @@ void ConfigParser::addSuboptionsBatching(cli::CLIWrapper& cli) { if(mode_ == cli::mode::training) { cli.add<bool>("--shuffle-in-ram", "Keep shuffled corpus in RAM, do not write to temp file"); + + cli.add<size_t>("--data-threads", + "Number of concurrent threads to use during data reading and processing", 1); + // @TODO: Consider making the next two options options of the vocab instead, to make it more local in scope. cli.add<size_t>("--all-caps-every", "When forming minibatches, preprocess every Nth line on the fly to all-caps. Assumes UTF-8"); @@ -907,6 +918,9 @@ void ConfigParser::addSuboptionsBatching(cli::CLIWrapper& cli) { cli.add<bool>("--mini-batch-round-up", "Round up batch size to next power of 2 for more efficient training, but this can make batch size less stable. Disable with --mini-batch-round-up=false", true); + } else { + cli.add<size_t>("--data-threads", + "Number of concurrent threads to use during data reading and processing", 1); } // clang-format on } diff --git a/src/common/definitions.h b/src/common/definitions.h index 159791d09..e28ea5dcf 100644 --- a/src/common/definitions.h +++ b/src/common/definitions.h @@ -106,24 +106,24 @@ using Weak = std::weak_ptr<T>; /** @brief Creates shared_ptr of any type, passes all arguments to any available * constructor */ template <class T, typename... Args> -Ptr<T> New(Args&&... args) { - return Ptr<T>(new T(std::forward<Args>(args)...)); +inline Ptr<T> New(Args&&... args) { + return std::make_shared<T>(std::forward<Args>(args)...); } template <class T> -Ptr<T> New(Ptr<T> p) { +inline Ptr<T> New(Ptr<T> p) { return Ptr<T>(p); }
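+// Illustrative note: with std::make_shared the object and its control block share
+// one allocation, e.g. Ptr<Options> opts = New<Options>(); // one heap allocation
+// whereas the previous Ptr<T>(new T(...)) form needed two.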
/** @brief Creates IntrusivePtr of any type, passes all arguments to any available * constructor */ template <class T, typename... Args> -IPtr<T> INew(Args&&... args) { +inline IPtr<T> INew(Args&&... args) { return IPtr<T>(new T(std::forward<Args>(args)...)); } template <class T> -IPtr<T> INew(Ptr<T> p) { +inline IPtr<T> INew(Ptr<T> p) { return IPtr<T>(p); } diff --git a/src/common/utils.cpp b/src/common/utils.cpp index 72624041f..99fc790a2 100644 --- a/src/common/utils.cpp +++ b/src/common/utils.cpp @@ -70,22 +70,20 @@ void split(const std::string& line, // the function guarantees that the output has as many elements as requested void splitTsv(const std::string& line, std::vector<std::string>& fields, size_t numFields) { fields.clear(); + fields.resize(numFields); // make sure there are as many elements as requested size_t begin = 0; size_t pos = 0; for(size_t i = 0; i < numFields; ++i) { pos = line.find('\t', begin); if(pos == std::string::npos) { - fields.push_back(line.substr(begin)); + fields[i] = line.substr(begin); break; } - fields.push_back(line.substr(begin, pos - begin)); + fields[i] = line.substr(begin, pos - begin); begin = pos + 1; } - if(fields.size() < numFields) // make sure there is as many elements as requested - fields.resize(numFields); ABORT_IF(pos != std::string::npos, "Excessive field(s) in the tab-separated line: '{}'", line); } diff --git a/src/data/batch_generator.h b/src/data/batch_generator.h index a248db23a..ea9774682 100644 --- a/src/data/batch_generator.h +++ b/src/data/batch_generator.h @@ -2,6 +2,7 @@ #include "common/options.h" #include "common/signal_handling.h" +#include "common/timer.h" #include "data/batch_stats.h" #include "data/rng_engine.h" #include "training/training_state.h" @@ -92,6 +93,8 @@ class BatchGenerator : public RNGEngine { // this runs on a bg thread; sequencing is handled by caller, but locking is done in here std::deque fetchBatches() { + timer::Timer total; + typedef typename Sample::value_type Item; auto itemCmp = [](const Item& sa, const Item& sb) { return sa.size() < sb.size(); }; // sort by element length, not content @@ -135,19 +138,29 @@ if(current_ != data_->end()) ++current_; } - size_t sets = 0; - while(current_ != data_->end() && maxiBatch->size() < maxSize) { // loop over data + + Samples maxiBatchTemp; + while(current_ != data_->end() && maxiBatchTemp.size() < maxSize) { // loop over data if (saveAndExitRequested()) // stop generating batches return std::deque(); - maxiBatch->push(*current_); - sets = current_->size(); + + maxiBatchTemp.push_back(*current_); + // do not consume more than required for the maxi batch as this causes // line-by-line translation to be delayed by one sentence - bool last = maxiBatch->size() == maxSize; + bool last = maxiBatchTemp.size() == maxSize; if(!last) ++current_; // this actually reads the next line and pre-processes it } - size_t numSentencesRead = maxiBatch->size(); + size_t numSentencesRead = maxiBatchTemp.size(); + + size_t sets = 0; + for(auto&& s : maxiBatchTemp) { + if(!s.empty()) { + sets = s.size(); + maxiBatch->push(s); + } + } // construct the actual batches and place them in the queue Samples batchVector; @@ -163,6 +176,7 @@ BatchStats::const_iterator cachedStatsIter; if (stats_) cachedStatsIter = stats_->begin(); + while(!maxiBatch->empty()) { // while there are sentences in the queue if (saveAndExitRequested()) // stop generating batches return std::deque(); @@ -178,12 +192,7 @@ lengths[i] = batchVector.back()[i].size(); // record max lengths so far maxBatchSize = stats_->findBatchSize(lengths, cachedStatsIter); - // this optimization makes no
difference indeed -#if 0 // sanity check: would we find the same entry if searching from the start? - auto it = stats_->lower_bound(lengths); - auto maxBatchSize1 = stats_->findBatchSize(lengths, it); - ABORT_IF(maxBatchSize != maxBatchSize1, "findBatchSize iter caching logic is borked"); -#endif + makeBatch = batchVector.size() >= maxBatchSize; // if last added sentence caused a bump then we likely have bad padding, so rather move it into the next batch if(batchVector.size() > maxBatchSize) { @@ -231,6 +240,8 @@ class BatchGenerator : public RNGEngine { LOG(debug, "[data] fetched {} batches with {} sentences. Per batch: {} sentences, {} labels.", tempBatches.size(), numSentencesRead, (double)totalSent / (double)totalDenom, (double)totalLabels / (double)totalDenom); + LOG(debug, "[data] fetching batches took {:.2f} seconds, {:.2f} sents/s", total.elapsed(), (double)numSentencesRead / total.elapsed()); + return tempBatches; } diff --git a/src/data/corpus.cpp b/src/data/corpus.cpp index d8a364b2e..643a7de93 100644 --- a/src/data/corpus.cpp +++ b/src/data/corpus.cpp @@ -14,18 +14,30 @@ namespace data { Corpus::Corpus(Ptr options, bool translate /*= false*/, size_t seed /*= Config:seed*/) : CorpusBase(options, translate, seed), - shuffleInRAM_(options_->get("shuffle-in-ram", false)), - allCapsEvery_(options_->get("all-caps-every", 0)), - titleCaseEvery_(options_->get("english-title-case-every", 0)) {} + shuffleInRAM_(options_->get("shuffle-in-ram", false)), + allCapsEvery_(options_->get("all-caps-every", 0)), + titleCaseEvery_(options_->get("english-title-case-every", 0)) { + + auto numThreads = options_->get("data-threads", 1); + if(numThreads > 1) + threadPool_.reset(new ThreadPool(numThreads)); + +} Corpus::Corpus(std::vector paths, std::vector> vocabs, Ptr options, size_t seed /*= Config:seed*/) : CorpusBase(paths, vocabs, options, seed), - shuffleInRAM_(options_->get("shuffle-in-ram", false)), - allCapsEvery_(options_->get("all-caps-every", 0)), - titleCaseEvery_(options_->get("english-title-case-every", 0)) {} + shuffleInRAM_(options_->get("shuffle-in-ram", false)), + allCapsEvery_(options_->get("all-caps-every", 0)), + titleCaseEvery_(options_->get("english-title-case-every", 0)) { + + auto numThreads = options_->get("data-threads", 1); + if(numThreads > 1) + threadPool_.reset(new ThreadPool(numThreads)); + +} void Corpus::preprocessLine(std::string& line, size_t streamId, bool& altered) { bool isFactoredVocab = vocabs_.back()->tryAs() != nullptr; @@ -52,16 +64,10 @@ void Corpus::preprocessLine(std::string& line, size_t streamId, bool& altered) { } SentenceTuple Corpus::next() { - // Used for handling TSV inputs - // Determine the total number of fields including alignments or weights - auto tsvNumAllFields = tsvNumInputFields_; - if(alignFileIdx_ > -1) - ++tsvNumAllFields; - if(weightFileIdx_ > -1) - ++tsvNumAllFields; - std::vector fields(tsvNumAllFields); - - for(;;) { // (this is a retry loop for skipping invalid sentences) + size_t numStreams = corpusInRAM_.empty() ? files_.size() : corpusInRAM_.size(); + std::vector fields(numStreams); + + while(true) { // retry loop // get index of the current sentence size_t curId = pos_; // note: at end, pos_ == total size // if corpus has been shuffled, ids_ contains sentence indexes @@ -69,83 +75,91 @@ SentenceTuple Corpus::next() { curId = ids_[pos_]; pos_++; - // fill up the sentence tuple with sentences from all input files - SentenceTuple tup(curId); size_t eofsHit = 0; - size_t numStreams = corpusInRAM_.empty() ? 
files_.size() : corpusInRAM_.size(); - for(size_t i = 0; i < numStreams; ++i) { - std::string line; - + for(size_t i = 0; i < numStreams; ++i) { // looping over all streams // fetch line, from cached copy in RAM or actual file if (!corpusInRAM_.empty()) { if (curId < corpusInRAM_[i].size()) - line = corpusInRAM_[i][curId]; + fields[i] = corpusInRAM_[i][curId]; else { eofsHit++; continue; } } else { - bool gotLine = io::getline(*files_[i], line).good(); + bool gotLine = io::getline(*files_[i], fields[i]).good(); if(!gotLine) { eofsHit++; continue; } } + } - if(i > 0 && i == alignFileIdx_) { - addAlignmentToSentenceTuple(line, tup); - } else if(i > 0 && i == weightFileIdx_) { - addWeightsToSentenceTuple(line, tup); - } else { - if(tsv_) { // split TSV input and add each field into the sentence tuple - utils::splitTsv(line, fields, tsvNumAllFields); - size_t shift = 0; - for(size_t j = 0; j < tsvNumAllFields; ++j) { - // index j needs to be shifted to get the proper vocab index if guided-alignment or - // data-weighting are preceding source or target sequences in TSV input - if(j == alignFileIdx_ || j == weightFileIdx_) { - ++shift; - } else { - size_t vocabId = j - shift; - bool altered; - preprocessLine(fields[j], vocabId, /*out=*/altered); - if (altered) - tup.markAltered(); - addWordsToSentenceTuple(fields[j], vocabId, tup); - } - } - - // weights are added last to the sentence tuple, because this runs a validation that needs - // length of the target sequence - if(alignFileIdx_ > -1) - addAlignmentToSentenceTuple(fields[alignFileIdx_], tup); - if(weightFileIdx_ > -1) - addWeightsToSentenceTuple(fields[weightFileIdx_], tup); + if(eofsHit == numStreams) + return SentenceTuple(); // uninitialized SentenceTuple, which will be invalid when tested + ABORT_IF(eofsHit != 0, "not all input files have the same number of lines"); + + auto makeSentenceTuple = [this](size_t curId, std::vector<std::string> fields) { + if(tsv_) { + // with TSV input data there is only one input stream, hence we only have one field + // which needs to be tokenized into tab-separated fields + ABORT_IF(fields.size() != 1, "Reading TSV file, but we don't have exactly one stream??"); + size_t numAllFields = tsvNumInputFields_; + if(alignFileIdx_ > -1) + ++numAllFields; + if(weightFileIdx_ > -1) + ++numAllFields; + // replace single-element fields array with extracted tsv fields + std::vector<std::string> tmpFields; + utils::splitTsv(fields[0], tmpFields, numAllFields); // this verifies the number of fields + fields.swap(tmpFields); + } + + // fill up the sentence tuple with sentences from all input files + SentenceTupleImpl tup(curId); + size_t shift = 0; + for(size_t i = 0; i < fields.size(); ++i) { + // index i needs to be shifted to get the proper vocab index if guided-alignment or + // data-weighting are preceding source or target sequences in TSV input + if(i == alignFileIdx_ || i == weightFileIdx_) { + ++shift; } else { + size_t vocabId = i - shift; bool altered; - preprocessLine(line, i, /*out=*/altered); + preprocessLine(fields[i], vocabId, /*out=*/altered); if (altered) tup.markAltered(); - addWordsToSentenceTuple(line, i, tup); + addWordsToSentenceTuple(fields[i], vocabId, tup); } + + // weights are added last to the sentence tuple, because this runs a validation that needs + // length of the target sequence + if(alignFileIdx_ > -1) + addAlignmentToSentenceTuple(fields[alignFileIdx_], tup); + if(weightFileIdx_ > -1) + addWeightsToSentenceTuple(fields[weightFileIdx_], tup); } - } - - if (eofsHit == numStreams) - return SentenceTuple(0);
- ABORT_IF(eofsHit != 0, "not all input files have the same number of lines"); - // check if all streams are valid, that is, non-empty and no longer than maximum allowed length - if(std::all_of(tup.begin(), tup.end(), [=](const Words& words) { - return words.size() > 0 && words.size() <= maxLength_; - })) - return tup; + // check if all streams are valid, that is, non-empty and no longer than maximum allowed length + if(std::all_of(tup.begin(), tup.end(), [=](const Words& words) { + return words.size() > 0 && words.size() <= maxLength_; + })) { + return tup; + } else { + return SentenceTupleImpl(); // return an empty tuple if above test does not pass + } + }; + + if(threadPool_) { // use thread pool if available + return SentenceTuple(threadPool_->enqueue(makeSentenceTuple, curId, fields)); + } else { // otherwise launch here and just pass the result into the wrapper + auto tup = makeSentenceTuple(curId, fields); + if(!tup.empty()) + return SentenceTuple(tup); + } - // otherwise skip this sentence and try the next one - // @TODO: tail recursion? - } + } // end of retry loop } // reset and initialize shuffled reading @@ -167,6 +181,8 @@ void Corpus::reset() { pos_ = 0; for (size_t i = 0; i < paths_.size(); ++i) { if(paths_[i] == "stdin" || paths_[i] == "-") { + std::cin.tie(0); + std::ios_base::sync_with_stdio(false); files_[i].reset(new std::istream(std::cin.rdbuf())); // Probably not necessary, unless there are some buffers // that we want flushed. diff --git a/src/data/corpus.h b/src/data/corpus.h index e8e9a9fdb..281d43a22 100644 --- a/src/data/corpus.h +++ b/src/data/corpus.h @@ -4,6 +4,7 @@ #include #include +#include "3rd_party/threadpool.h" #include "common/definitions.h" #include "common/file_stream.h" #include "common/options.h" @@ -20,6 +21,8 @@ class Corpus : public CorpusBase { private: std::vector> tempFiles_; std::vector ids_; + + UPtr threadPool_; // thread pool for parallelized data reading // for shuffle-in-ram bool shuffleInRAM_{false}; diff --git a/src/data/corpus_base.cpp b/src/data/corpus_base.cpp index 9d95a1214..20301103d 100644 --- a/src/data/corpus_base.cpp +++ b/src/data/corpus_base.cpp @@ -12,7 +12,24 @@ typedef std::vector MaskBatch; typedef std::pair WordMask; typedef std::vector SentBatch; -CorpusIterator::CorpusIterator() : pos_(-1), tup_(0) {} +void SentenceTupleImpl::setWeights(const std::vector& weights) { + if(weights.size() != 1) { // this assumes a single sentence-level weight is always fine + ABORT_IF(empty(), "Source and target sequences should be added to a tuple before data weights"); + auto numWeights = weights.size(); + auto numTrgWords = back().size(); + // word-level weights may or may not contain a weight for EOS tokens + if(numWeights != numTrgWords && numWeights != numTrgWords - 1) + LOG(warn, + "[warn] " + "Number of weights ({}) does not match the number of target words ({}) in line #{}", + numWeights, + numTrgWords, + id_); + } + weights_ = weights; +} + +CorpusIterator::CorpusIterator() : pos_(-1) {} CorpusIterator::CorpusIterator(CorpusBase* corpus) : corpus_(corpus), pos_(0), tup_(corpus_->next()) {} @@ -23,7 +40,7 @@ void CorpusIterator::increment() { } bool CorpusIterator::equal(CorpusIterator const& other) const { - return this->pos_ == other.pos_ || (this->tup_.empty() && other.tup_.empty()); + return this->pos_ == other.pos_ || (!this->tup_.valid() && !other.tup_.valid()); } const SentenceTuple& CorpusIterator::dereference() const { @@ -390,7 +407,7 @@ CorpusBase::CorpusBase(Ptr options, bool translate, size_t seed) void 
CorpusBase::addWordsToSentenceTuple(const std::string& line, size_t batchIndex, - SentenceTuple& tup) const { + SentenceTupleImpl& tup) const { // This turns a string into a sequence of numerical word ids. Depending // on the vocabulary type, this can be non-trivial, e.g. when SentencePiece // is used. @@ -411,7 +428,7 @@ void CorpusBase::addWordsToSentenceTuple(const std::string& line, } void CorpusBase::addAlignmentToSentenceTuple(const std::string& line, - SentenceTuple& tup) const { + SentenceTupleImpl& tup) const { ABORT_IF(rightLeft_, "Guided alignment and right-left model cannot be used " "together at the moment"); @@ -420,7 +437,7 @@ void CorpusBase::addAlignmentToSentenceTuple(const std::string& line, tup.setAlignment(align); } -void CorpusBase::addWeightsToSentenceTuple(const std::string& line, SentenceTuple& tup) const { +void CorpusBase::addWeightsToSentenceTuple(const std::string& line, SentenceTupleImpl& tup) const { auto elements = utils::split(line, " "); if(!elements.empty()) { @@ -549,6 +566,7 @@ size_t CorpusBase::getNumberOfTSVInputFields(Ptr<Options> options) { return 0; } -void SentenceTuple::setWeights(const std::vector<float>& weights) { - if(weights.size() != 1) { // this assumes a single sentence-level weight is always fine - ABORT_IF(empty(), "Source and target sequences should be added to a tuple before data weights"); @@ -564,6 +582,55 @@ void SentenceTuple::setWeights(const std::vector<float>& weights) { - id_); - } - weights_ = weights; +// experimental: hide inline-fix source tokens from cross attention +std::vector<float> SubBatch::crossMaskWithInlineFixSourceSuppressed() const +{ + const auto& srcVocab = *vocab(); + + auto factoredVocab = vocab()->tryAs<FactoredVocab>(); + size_t inlineFixGroupIndex = 0, inlineFixSrc = 0; + auto hasInlineFixFactors = factoredVocab && factoredVocab->tryGetFactor(FactoredVocab_INLINE_FIX_WHAT_serialized, /*out*/ inlineFixGroupIndex, /*out*/ inlineFixSrc); + + auto fixSrcId = srcVocab[FactoredVocab_FIX_SRC_ID_TAG]; + auto fixTgtId = srcVocab[FactoredVocab_FIX_TGT_ID_TAG]; + auto fixEndId = srcVocab[FactoredVocab_FIX_END_ID_TAG]; + auto unkId = srcVocab.getUnkId(); + auto hasInlineFixTags = fixSrcId != unkId && fixTgtId != unkId && fixEndId != unkId; + + auto m = mask(); // default return value, which we will modify in-place below in case we need to + if (hasInlineFixFactors || hasInlineFixTags) { + LOG_ONCE(info, "[data] Suppressing cross-attention into inline-fix source tokens"); + + // example: force French translation of name "frank" to always be "franck" + // - hasInlineFixFactors: "frank|is franck|it", "frank|is" cannot be cross-attended to + // - hasInlineFixTags: " frank franck ", "frank" and all tags cannot be cross-attended to + auto dimBatch = batchSize(); // number of sentences in the batch + auto dimWidth = batchWidth(); // number of words in the longest sentence in the batch + const auto& d = data(); + size_t numWords = 0; + for (size_t b = 0; b < dimBatch; b++) { // loop over batch entries + bool inside = false; + for (size_t s = 0; s < dimWidth; s++) { // loop over source positions + auto i = locate(/*batchIdx=*/b, /*wordPos=*/s); + if (!m[i]) + break; + numWords++; + // keep track of entering/exiting the inline-fix source tags + auto w = d[i]; + if (w == fixSrcId) + inside = true; + else if (w == fixTgtId) + inside = false; + bool wHasSrcIdFactor = hasInlineFixFactors && factoredVocab->getFactor(w, inlineFixGroupIndex) == inlineFixSrc; + if (inside || w == fixSrcId || w == fixTgtId || w == fixEndId || wHasSrcIdFactor) + m[i] = 0.0f; //
decoder must not look at embedded source, nor the markup tokens + } + } + ABORT_IF(batchWords() != 0/*n/a*/ && numWords != batchWords(), "batchWords() inconsistency??"); + } + return m; } } // namespace data diff --git a/src/data/corpus_base.h b/src/data/corpus_base.h index d504a7ea3..a54c20f88 100644 --- a/src/data/corpus_base.h +++ b/src/data/corpus_base.h @@ -11,6 +11,8 @@ #include "data/rng_engine.h" #include "data/vocab.h" +#include <future> namespace marian { namespace data { @@ -22,7 +24,7 @@ namespace data { * construction of marian::data::CorpusBatch objects. They are not a part of * marian::data::CorpusBatch. */ -class SentenceTuple { +class SentenceTupleImpl { private: size_t id_; std::vector<Words> tuple_; // [stream index][step index] @@ -33,12 +35,17 @@ public: typedef Words value_type; + /** + * @brief Creates an empty tuple with 0 id (default constructor). + */ + SentenceTupleImpl() : id_(0) {} + /** * @brief Creates an empty tuple with the given Id. */ - SentenceTuple(size_t id) : id_(id) {} + SentenceTupleImpl(size_t id) : id_(id) {} - ~SentenceTuple() { tuple_.clear(); } + ~SentenceTupleImpl() {} /** * @brief Returns the sentence's ID. @@ -114,6 +121,92 @@ class SentenceTuple { void setAlignment(const WordAlignment& alignment) { alignment_ = alignment; } }; +class SentenceTuple { +private: + std::shared_ptr<std::future<SentenceTupleImpl>> fImpl_; + mutable std::shared_ptr<SentenceTupleImpl> impl_; + +public: + typedef Words value_type; + + /** + * @brief Creates an empty tuple with no associated future. + */ + SentenceTuple() {} + + SentenceTuple(const SentenceTupleImpl& tupImpl) + : impl_(std::make_shared<SentenceTupleImpl>(tupImpl)) {} + + SentenceTuple(std::future<SentenceTupleImpl>&& fImpl) + : fImpl_(new std::future<SentenceTupleImpl>(std::move(fImpl))) {} + + SentenceTupleImpl& get() const { + if(!impl_) { + ABORT_IF(!fImpl_ || !fImpl_->valid(), "No future tuple associated with SentenceTuple"); + impl_ = std::make_shared<SentenceTupleImpl>(fImpl_->get()); + } + return *impl_; + } + + /** + * @brief Returns the sentence's ID. + */ + size_t getId() const { return get().getId(); } + + /** + * @brief Returns whether this Tuple was altered or augmented from what + * was provided to Marian in input. + */ + bool isAltered() const { return get().isAltered(); } + + /** + * @brief The size of the tuple, e.g. two for parallel data with a source and + * target sentences. + */ + size_t size() const { return get().size(); } + + /** + * @brief Confirms that the tuple has been populated with data. + */ + bool valid() const { + return fImpl_ || impl_; + } + + /** + * @brief The i-th tuple sentence. + * + * @param i Tuple's index. + */ + Words& operator[](size_t i) { return get()[i]; } + const Words& operator[](size_t i) const { return get()[i]; } + + /** + * @brief The last tuple sentence, i.e. the target sentence. + */ + Words& back() { return get().back(); } + const Words& back() const { return get().back(); } + + /** + * @brief Checks whether the tuple is empty. + */ + bool empty() const { return get().empty(); } + + auto begin() const -> decltype(get().begin()) { return get().begin(); } + auto end() const -> decltype(get().end()) { return get().end(); } + + auto rbegin() const -> decltype(get().rbegin()) { return get().rbegin(); } + auto rend() const -> decltype(get().rend()) { return get().rend(); } + + /** + * @brief Get sentence weights. + * + * For sentence-level weights the vector contains only one element.
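+ * Note: when the tuple is backed by a std::future (see get() above), the first
+ * accessor call forces evaluation and may block until the background worker
+ * has produced the underlying SentenceTupleImpl.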
+ */ + const std::vector& getWeights() const { return get().getWeights(); } + + const WordAlignment& getAlignment() const { return get().getAlignment(); } +}; + /** * @brief Batch of sentences represented as word indices with masking. */ @@ -583,17 +676,17 @@ class CorpusBase : public DatasetBase batch, const std::vector& batchVector); diff --git a/src/data/corpus_nbest.cpp b/src/data/corpus_nbest.cpp index d5a48d8df..8029d3516 100644 --- a/src/data/corpus_nbest.cpp +++ b/src/data/corpus_nbest.cpp @@ -43,7 +43,7 @@ SentenceTuple CorpusNBest::next() { pos_++; // fill up the sentence tuple with sentences from all input files - SentenceTuple tup(curId); + SentenceTupleImpl tup(curId); std::string line; lastLines_.resize(files_.size() - 1); @@ -74,9 +74,10 @@ SentenceTuple CorpusNBest::next() { if(cont && std::all_of(tup.begin(), tup.end(), [=](const Words& words) { return words.size() > 0 && words.size() <= maxLength_; })) - return tup; + return SentenceTuple(tup); } - return SentenceTuple(0); + + return SentenceTuple(); } void CorpusNBest::reset() { diff --git a/src/data/corpus_sqlite.cpp b/src/data/corpus_sqlite.cpp index 297847c04..f7c577f29 100644 --- a/src/data/corpus_sqlite.cpp +++ b/src/data/corpus_sqlite.cpp @@ -109,7 +109,7 @@ SentenceTuple CorpusSQLite::next() { while(select_->executeStep()) { // fill up the sentence tuple with sentences from all input files size_t curId = select_->getColumn(0).getInt(); - SentenceTuple tup(curId); + SentenceTupleImpl tup(curId); for(size_t i = 0; i < files_.size(); ++i) { auto line = select_->getColumn((int)(i + 1)); @@ -126,9 +126,9 @@ SentenceTuple CorpusSQLite::next() { if(std::all_of(tup.begin(), tup.end(), [=](const Words& words) { return words.size() > 0 && words.size() <= maxLength_; })) - return tup; + return SentenceTuple(tup); } - return SentenceTuple(0); + return SentenceTuple(); } void CorpusSQLite::shuffle() { diff --git a/src/data/sentencepiece_vocab.cpp b/src/data/sentencepiece_vocab.cpp index 090d478b2..8f774c2bb 100644 --- a/src/data/sentencepiece_vocab.cpp +++ b/src/data/sentencepiece_vocab.cpp @@ -236,18 +236,20 @@ class SentencePieceVocab : public IVocab { return words; } - std::string decode(const Words& sentence, bool /*ignoreEOS*/) const override { + std::string decode(const Words& sentence, bool ignoreEOS) const override { std::string line; if(keepEncoded_) { // i.e. 
keep the sentence segmented into subword units for(const Word& id : sentence) - line += (*this)[id] + " "; + if(!ignoreEOS || id != getEosId()) + line += (*this)[id] + " "; line.pop_back(); // trim the trailing whitespace } else { // convert vector of Word to vector of int std::vector<int> spmSentence; spmSentence.reserve(sentence.size()); for(auto&& word : sentence) - spmSentence.push_back(word.toWordIndex()); + if(!ignoreEOS || word != getEosId()) + spmSentence.push_back(word.toWordIndex()); spm_->Decode(spmSentence, &line); } return line; } diff --git a/src/data/text_input.cpp b/src/data/text_input.cpp index 958190fce..b1f4cdd47 100644 --- a/src/data/text_input.cpp +++ b/src/data/text_input.cpp @@ -40,7 +40,7 @@ SentenceTuple TextInput::next() { size_t curId = pos_++; // fill up the sentence tuple with source and/or target sentences - SentenceTuple tup(curId); + SentenceTupleImpl tup(curId); for(size_t i = 0; i < files_.size(); ++i) { std::string line; if(io::getline(*files_[i], line)) { @@ -57,9 +57,9 @@ } if(tup.size() == files_.size()) // check if each input file provided an example - return tup; + return SentenceTuple(tup); else if(tup.size() == 0) // if no file provided examples we are done - return SentenceTuple(0); + return SentenceTuple(); else // neither all nor none => we have at least one missing entry ABORT("There are missing entries in the text tuples."); } diff --git a/src/graph/expression_operators.cpp b/src/graph/expression_operators.cpp index 322a29ad0..5294fca3f 100644 --- a/src/graph/expression_operators.cpp +++ b/src/graph/expression_operators.cpp @@ -357,6 +357,13 @@ Expr gather(Expr a, int axis, Expr indices) { return Expression<GatherNodeOp>(a, axis, indices); } +// scatter() -- scatter arbitrary elements along an axis; batched or non-batched +// This is the reverse operation to gather. +Expr scatter(Expr a, int axis, Expr indices, Expr source) { + return Expression<ScatterNodeOp>(a, axis, indices, source); +} + + // index_select() -- gather arbitrary elements along an axis from an unbatched // input 'a'. Indices are specified as a 1D vector. // This is used e.g. for embedding lookup. diff --git a/src/graph/expression_operators.h b/src/graph/expression_operators.h index dc756c7d6..1e98047f9 100644 --- a/src/graph/expression_operators.h +++ b/src/graph/expression_operators.h @@ -687,10 +687,23 @@ Expr stopGradient(Expr a); * @param indices The indices to be gathered * @returns Gathered expression with the same shape as @p indices * @note @p a and @p indices must have the same rank - * @note The non-target axes of @p a and @p indicies must have the same size, or be broadcastable. + * @note The non-target axes of @p a and @p indices must have the same size, or be broadcastable. */ Expr gather(Expr a, int axis, Expr indices); +/** + * Scatter elements from source along an axis into a. Unindexed elements from a remain unchanged. + * This is the reverse operation to gather. + * @param a The input expression + * @param axis The axis along which to index + * @param indices The indices to be scattered + * @param source Expression with values to scatter. + * @returns Scattered expression with the same shape as @p a now containing values from @p source in positions @p indices + * @note @p source and @p indices must have the same rank + * @note In this version @p source and @p indices must have the same shape + */ +Expr scatter(Expr a, int axis, Expr indices, Expr source); +
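+// A small usage sketch mirroring TopkGumbelSoftmaxStep in src/models/costs.cpp
+// (names as used in this patch; shapes per the notes above):
+//   Expr val, idx;
+//   std::tie(val, idx) = topk(logits, /*k=*/8, /*axis=*/-1, /*descending=*/true);
+//   float lo = NumericLimits<float>(logits->value_type()).lowest;
+//   Expr pruned = scatter(constant_like(logits, inits::fromValue(lo)), -1, idx, val);
+//   // 'pruned' equals 'logits' at the top-8 positions and 'lo' everywhere else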
#if 0 // reverse operation to gather. a is the expression into which values from b are inserted at positions indices along axis. // with broadcasting diff --git a/src/graph/node_operators_binary.h b/src/graph/node_operators_binary.h index a180bb5c8..b2a646b1c 100644 --- a/src/graph/node_operators_binary.h +++ b/src/graph/node_operators_binary.h @@ -1033,12 +1033,14 @@ struct GatherNodeOp : public NaryNodeOp { NodeOps forwardOps() override { return {NodeOp( + // @TODO: rename to gather Select(val_, child(0)->val(), child(1)->val(), axis_))}; } NodeOps backwardOps() override { return {NodeOp( - Insert(child(0)->grad(), adj_, child(1)->val(), axis_))}; + // @TODO: rename to scatter + Insert</*add=*/true>(child(0)->grad(), adj_, child(1)->val(), axis_))}; } Shape newShape(Expr a, int axis, Expr indices) { @@ -1046,7 +1048,6 @@ struct GatherNodeOp : public NaryNodeOp { axis = shape.axis(axis); auto rank = shape.size(); ABORT_IF(rank != indices->shape().size(), "Mismatching ranks for input ({}) and indices ({})", std::string(shape), std::string(indices->shape())); - axis = a->shape().axis(axis); shape.set(axis, indices->shape()[axis]); for (size_t i = 0; i < rank; ++i) { if (i != axis) { @@ -1086,6 +1087,62 @@ struct GatherNodeOp : public NaryNodeOp { int axis_; }; +struct ScatterNodeOp : public NaryNodeOp { + ScatterNodeOp(Expr a, int axis, Expr indices, Expr source) + : NaryNodeOp({a, indices, source}, newShape(a, axis, indices, source), a->value_type()), + axis_(a->shape().axis(axis)) { + matchOrAbort<IndexType>(indices->value_type()); + } + + NodeOps forwardOps() override { + return {NodeOp( + CopyCast(val_, child(0)->val()); // @TODO: use normal copy + Insert</*add=*/false>(val_, child(2)->val(), child(1)->val(), axis_) + )}; + } + + NodeOps backwardOps() override { + ABORT("backward for ScatterNodeOp not yet implemented"); + } + + Shape newShape(Expr a, int axis, Expr indices, Expr source) { + ABORT_IF(axis != -1, "only the last dimension is supported for now"); + ABORT_IF(indices->shape() != source->shape(), "Shapes must match"); + + Shape shape = a->shape(); + // @TODO: do proper checking + return shape; + } + + const std::string type() override { return "scatter"; } + + const std::string color() override { return "orange"; } + + virtual size_t hash() override { + if(!hash_) { + size_t seed = NaryNodeOp::hash(); + util::hash_combine(seed, axis_); + hash_ = seed; + } + return hash_; + } + + virtual bool equal(Expr node) override { + if(!NaryNodeOp::equal(node)) + return false; + auto cnode = std::dynamic_pointer_cast<ScatterNodeOp>(node); + if(!cnode) + return false; + if(axis_ != cnode->axis_) + return false; + return true; + } + +private: + friend class SerializationHelpers; + int axis_; +}; + struct ColsNodeOp : public NaryNodeOp { ColsNodeOp(Expr a, Expr indices) : NaryNodeOp({a, indices}, newShape(a, indices), a->value_type()) { diff --git a/src/graph/node_operators_tuple.h b/src/graph/node_operators_tuple.h index c7a9531a1..8acb1bc83 100644 --- a/src/graph/node_operators_tuple.h +++ b/src/graph/node_operators_tuple.h @@ -133,7 +133,7 @@ struct TopKNodeOp : public UnaryNodeOp, } void backward() override { - Insert(/*out*/child(0)->grad(), adj_, val_, axis_); + Insert</*add=*/true>(/*out*/child(0)->grad(), adj_, val_, axis_); } const std::string type() override { return "topk"; } diff --git a/src/layers/output.cpp b/src/layers/output.cpp index 4d6e488a4..efff58df4 100644 --- a/src/layers/output.cpp +++ b/src/layers/output.cpp @@ -309,14 +309,24 @@ Logits Output::applyAsLogits(Expr input) /*override final*/ { } return Logits(std::move(allLogits), factoredVocab_); } else if(shortlist_) { - return
Logits(affineOrDot(input, - shortlist_->getCachedShortWt(), - shortlist_->getCachedShortb(), + const Shape &inputShape = input->shape(); + assert(inputShape[1] == 1); // time dimension always 1 for decoding + input = reshape(input, {inputShape[0], inputShape[2], 1, inputShape[3]}); + + Expr Wt = shortlist_->getCachedShortWt(); + Expr b = shortlist_->getCachedShortb(); + Expr ret = affineShortlist(input, + Wt, + b, false, - /*transB=*/isLegacyUntransposedW ? false : true)); + /*transB=*/isLegacyUntransposedW ? false : true); + const Shape &retShape = ret->shape(); + assert(retShape[2] == 1); // time dimension always 1 for decoding + ret = reshape(ret, {retShape[0], 1, retShape[1], retShape[3]}); + return Logits(ret); } else { - return Logits( - affineOrDot(input, Wt_, b_, false, /*transB=*/isLegacyUntransposedW ? false : true)); + Expr ret = affineOrDot(input, Wt_, b_, false, /*transB=*/isLegacyUntransposedW ? false : true); + return Logits(ret); } } diff --git a/src/models/costs.cpp b/src/models/costs.cpp index c688b2119..4b15bcb36 100644 --- a/src/models/costs.cpp +++ b/src/models/costs.cpp @@ -10,5 +10,40 @@ Ptr<DecoderState> LogSoftmaxStep::apply(Ptr<DecoderState> state) { return state; } +Ptr<DecoderState> GumbelSoftmaxStep::apply(Ptr<DecoderState> state) { + state->setLogProbs(state->getLogProbs().applyUnaryFunctions( + [](Expr logits) { // lemma gets gumbelled + return logsoftmax(logits + constant_like(logits, inits::gumbel())); + }, + logsoftmax)); // factors don't + return state; +} + +TopkGumbelSoftmaxStep::TopkGumbelSoftmaxStep(int k) : k_{k} {} + +Ptr<DecoderState> TopkGumbelSoftmaxStep::apply(Ptr<DecoderState> state) { + state->setLogProbs(state->getLogProbs().applyUnaryFunctions( + [=](Expr logits) { // lemma gets gumbelled + // create logits-sized tensor consisting only of invalid path scores + float invalidPathScore = NumericLimits<float>(logits->value_type()).lowest; + Expr invalidLogits = constant_like(logits, inits::fromValue(invalidPathScore)); + + // select top-k values + Expr val, idx; + std::tie(val, idx) = topk(logits, k_, /*axis=*/-1, /*descending=*/true); + + // uncomment below to display probability mass in top-k selection + // debug(sum(gather(softmax(logits), -1, idx), -1), "sum"); + + // Add Gumbel noise to top-k values only and compute logsoftmax, used for argmax sampling later in beam-search + Expr gumbelVal = logsoftmax(val + constant_like(val, inits::gumbel())); + + // Scatter gumbelled values back into logits to fill with usable values + return scatter(invalidLogits, -1, idx, gumbelVal); + }, + logsoftmax)); // factors don't + return state; +} + } // namespace models } // namespace marian diff --git a/src/models/costs.h b/src/models/costs.h index 982a13c57..9bb2b1039 100644 --- a/src/models/costs.h +++ b/src/models/costs.h @@ -297,20 +297,30 @@ class LogSoftmaxStep : public ILogProbStep { virtual Ptr<DecoderState> apply(Ptr<DecoderState> state) override; }; -// Gumbel-max noising for sampling during beam-search -// Seems to work well enough with beam-size=1. Turn on -// with --output-sampling during translation with marian-decoder +// Gumbel-max noising for sampling during translation. +// Produces accurate sampling with beam=1. Turn on +// with --output-sampling [full] during translation +// with marian-decoder for sampling from the full +// softmax distribution.
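+// Usage sketch (illustrative command lines; the flag forms follow the
+// --output-sampling help text in config_parser.cpp):
+//   marian-decoder ... --output-sampling          # sample from the full distribution
+//   marian-decoder ... --output-sampling topk 10  # sample among the 10 best lemma scores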
class GumbelSoftmaxStep : public ILogProbStep { public: virtual ~GumbelSoftmaxStep() {} - virtual Ptr apply(Ptr state) override { - state->setLogProbs(state->getLogProbs().applyUnaryFunctions( - [](Expr logits) { // lemma gets gumbelled - return logsoftmax(logits + constant_like(logits, inits::gumbel())); - }, - logsoftmax)); // factors don't - return state; - } + virtual Ptr apply(Ptr state) override; +}; + + +// Gumbel-max noising for top-k sampling during translation. +// Produces accurate sampling with beam=1. Turn on +// with --output-sampling topk [10] during translation +// with marian-decoder for top-10 sampling. +class TopkGumbelSoftmaxStep : public ILogProbStep { +private: + int k_{1}; + +public: + TopkGumbelSoftmaxStep(int k); + virtual ~TopkGumbelSoftmaxStep() {} + virtual Ptr apply(Ptr state) override; }; // class to wrap an IEncoderDecoder and a ILogProbStep that are executed in sequence, diff --git a/src/models/encoder_decoder.cpp b/src/models/encoder_decoder.cpp index 5711ea1b8..a6f4dd3dc 100644 --- a/src/models/encoder_decoder.cpp +++ b/src/models/encoder_decoder.cpp @@ -38,7 +38,9 @@ EncoderDecoder::EncoderDecoder(Ptr graph, Ptr options) modelFeatures_.insert("transformer-heads"); modelFeatures_.insert("transformer-no-projection"); modelFeatures_.insert("transformer-dim-ffn"); + modelFeatures_.insert("transformer-decoder-dim-ffn"); modelFeatures_.insert("transformer-ffn-depth"); + modelFeatures_.insert("transformer-decoder-ffn-depth"); modelFeatures_.insert("transformer-ffn-activation"); modelFeatures_.insert("transformer-dim-aan"); modelFeatures_.insert("transformer-aan-depth"); diff --git a/src/models/model_factory.cpp b/src/models/model_factory.cpp index e176e6a4c..52a87e72a 100644 --- a/src/models/model_factory.cpp +++ b/src/models/model_factory.cpp @@ -370,10 +370,25 @@ Ptr createModelFromOptions(Ptr options, usage use) { // add (log)softmax if requested if (use == usage::translation) { if(std::dynamic_pointer_cast(baseModel)) { - if(options->get("output-sampling", false)) - return New(std::dynamic_pointer_cast(baseModel), New()); - else + if(options->hasAndNotEmpty("output-sampling")) { + auto sampling = options->get>("output-sampling", {}); + std::string method = sampling.size() > 0 ? sampling[0] : "full"; + + if(method == "full" || method == "1" /*for backwards-compat when output-sampling: true in yaml file*/) { + LOG(info, "Output sampling from the full softmax distribution"); + return New(std::dynamic_pointer_cast(baseModel), New()); + } else if(method == "topk") { + int k = sampling.size() > 1 ? 
std::stoi(sampling[1]) : 10; + if(k == 1) + LOG(info, "Output sampling with k=1 is equivalent to beam search with beam size 1"); + LOG(info, "Output sampling via top-{} sampling", k); + return New(std::dynamic_pointer_cast(baseModel), New(k)); + } else { + ABORT("Unknown sampling method: {}", method); + } + } else { return New(std::dynamic_pointer_cast(baseModel), New()); + } } #ifdef COMPILE_EXAMPLES // note: 'usage::translation' here means 'inference' diff --git a/src/models/transformer.h b/src/models/transformer.h index ec68b801a..95a55d3aa 100644 --- a/src/models/transformer.h +++ b/src/models/transformer.h @@ -148,8 +148,7 @@ class Transformer : public EncoderOrDecoderBase { int dimDepth = dimModel / dimHeads; - auto output - = reshape(input, {dimBatch * dimBeam, dimSteps, dimHeads, dimDepth}); + auto output = reshape(input, {dimBatch * dimBeam, dimSteps, dimHeads, dimDepth}); return transpose(output, {0, 2, 1, 3}); // [dimBatch*dimBeam, dimHeads, dimSteps, dimDepth] } @@ -364,9 +363,9 @@ class Transformer : public EncoderOrDecoderBase { Expr LayerAttention(std::string prefix, Expr input, // [-4: beam depth, -3: batch size, -2: max length, -1: vector dim] - const Expr& keys, // [-4: beam depth=1, -3: batch size, -2: max length, -1: vector dim] - const Expr& values, // ...? - const Expr& mask, // [-4: batch size, -3: num heads broadcast=1, -2: max length broadcast=1, -1: max length] + Expr keys, // [-4: beam depth=1, -3: batch size, -2: max length, -1: vector dim] + Expr values, // ...? + Expr mask, // [-4: batch size, -3: num heads broadcast=1, -2: max length broadcast=1, -1: max length] int dimHeads, bool cache = false, bool saveAttentionWeights = false) { @@ -376,6 +375,12 @@ class Transformer : public EncoderOrDecoderBase { auto opsPre = opt("transformer-preprocess"); auto output = preProcess(prefix + "_Wo", opsPre, input, dropProb); + // fixes missing norm for keys and values in self-attention with pre-norm + if(input == keys) + keys = output; + if(input == values) + values = output; + // multi-head self-attention over previous input output = MultiHead(prefix, dimModel, dimHeads, output, keys, values, mask, cache, saveAttentionWeights); @@ -403,7 +408,7 @@ class Transformer : public EncoderOrDecoderBase { opt("transformer-heads"), /*cache=*/false); } - Expr LayerFFN(std::string prefix, Expr input) const { + Expr LayerFFN(std::string prefix, Expr input, bool isDecoder=false) const { int dimModel = input->shape()[-1]; float dropProb = inference_ ? 0 : opt("transformer-dropout"); @@ -411,13 +416,22 @@ class Transformer : public EncoderOrDecoderBase { auto output = preProcess(prefix + "_ffn", opsPre, input, dropProb); auto actName = opt("transformer-ffn-activation"); + int dimFfn = opt("transformer-dim-ffn"); int depthFfn = opt("transformer-ffn-depth"); - float ffnDropProb - = inference_ ? 0 : opt("transformer-dropout-ffn"); - + if(isDecoder) { + int decDimFfn = opt("transformer-decoder-dim-ffn", 0); + if(decDimFfn != 0) + dimFfn = decDimFfn; + + int decDepthFfn = opt("transformer-decoder-ffn-depth", 0); + if(decDepthFfn != 0) + depthFfn = decDepthFfn; + } + ABORT_IF(depthFfn < 1, "Filter depth {} is smaller than 1", depthFfn); - + + float ffnDropProb = inference_ ? 0 : opt("transformer-dropout-ffn"); auto initFn = inits::glorotUniform(true, true, depthScaling_ ? 
1.f / sqrtf((float)depth_) : 1.f); // the stack of FF layers @@ -866,7 +880,7 @@ class DecoderTransformer : public Transformer { // remember decoder state decoderStates.push_back(decoderState); - query = LayerFFN(prefix_ + "_l" + layerNo + "_ffn", query); // [-4: beam depth=1, -3: batch size, -2: max length, -1: vector dim] + query = LayerFFN(prefix_ + "_l" + layerNo + "_ffn", query, /*isDecoder=*/true); // [-4: beam depth=1, -3: batch size, -2: max length, -1: vector dim] checkpoint(query); } diff --git a/src/tensors/cpu/tensor_operators.cpp b/src/tensors/cpu/tensor_operators.cpp index 1afb8f648..1e1adc38b 100755 --- a/src/tensors/cpu/tensor_operators.cpp +++ b/src/tensors/cpu/tensor_operators.cpp @@ -24,6 +24,10 @@ void IsNaN(const Tensor /*in*/, Ptr /*allocator*/, bool& /*isNaN*/, b ABORT("Not implemented"); } +bool SanitizeGradient(marian::Tensor /*in*/, Ptr /*allocator*/, bool /*pruneNaN*/, bool /*clipInf*/) { + ABORT("Not implemented"); +} + template void CopyCastTo(To* out, const From* in, int length) { for(int i = 0; i < length; ++i) @@ -735,6 +739,7 @@ void Select(Tensor out, } } +template void Insert(Tensor out, const Tensor in, const Tensor indices, @@ -756,10 +761,16 @@ void Insert(Tensor out, int idxIndex = idxShape.bindex(dims); // broadcast index into indices tensor dims[axisCPU] = (int)indices->data()[idxIndex]; int outIndex = outShape.index(dims); - out->data()[outIndex] += in->data()[index]; + if(add) + out->data()[outIndex] += in->data()[index]; + else + out->data()[outIndex] = in->data()[index]; } } +template void Insert(Tensor out, const Tensor in, const Tensor indices, int axis); +template void Insert(Tensor out, const Tensor in, const Tensor indices, int axis); + void GRUFastForward(Tensor out_, std::vector inputs, bool final) { int rows = out_->shape().elements() / out_->shape().back(); int cols = out_->shape().back(); diff --git a/src/tensors/gpu/element.cu b/src/tensors/gpu/element.cu index 6790efd4b..e9cbe0812 100755 --- a/src/tensors/gpu/element.cu +++ b/src/tensors/gpu/element.cu @@ -29,7 +29,9 @@ __global__ void gElement( indices[i] = tensors[i].shape().bindex(dims); } - tensors[0].data()[index] = functional::apply(functor, tensors, indices); + // This performs the internal application of the functor in float32 regardless of the input type. + // It seems there are no speed penalties but improved precision. + tensors[0].data()[index] = (T)functional::applyWithCast(functor, tensors, indices); } } } @@ -65,13 +67,7 @@ void Element(Functor functor, Tensor out, Tensors... 
tensors) { ElementTyped(functor, out, tensors...); } else if(out->type() == Type::float16) { #if COMPILE_FP16 - std::vector ts({out, tensors...}); - bool div2 = std::all_of(ts.cbegin(), ts.cend(), [](marian::Tensor t){ return t->shape()[-1] % 2 == 0; }); - if(div2) { - ElementTyped(functor, out, tensors...); - } else { - ElementTyped(functor, out, tensors...); - } + ElementTyped(functor, out, tensors...); #else ABORT("FP16 not supported with chosen current hardware or CUDA version"); #endif diff --git a/src/tensors/gpu/prod.cpp b/src/tensors/gpu/prod.cpp index bf0d23957..c72af4db9 100755 --- a/src/tensors/gpu/prod.cpp +++ b/src/tensors/gpu/prod.cpp @@ -562,7 +562,11 @@ void ProdBatchedLegacy(marian::Tensor C, ProdBatchedTypedLegacy(C, allocator, A, B, transA, transB, beta, scalar); #if COMPILE_FP16 } else if(C->type() == Type::float16) { // not a *.cu file - ProdBatchedTypedLegacy(C, allocator, A, B, transA, transB, __float2half(beta), __float2half(scalar)); + // we use computeType=float here for fp16 training as this seems more stable and roughly as fast + ProdBatchedTypedLegacy(C, allocator, A, B, transA, transB, beta, scalar); + + // original for reference: + // ProdBatchedTypedLegacy(C, allocator, A, B, transA, transB, __float2half(beta), __float2half(scalar)); #endif } else { ABORT("ProdBatchedLegacy not implemented for element type {}", C->type()); diff --git a/src/tensors/gpu/tensor_operators.cu b/src/tensors/gpu/tensor_operators.cu index d55214bc7..2103ca9de 100644 --- a/src/tensors/gpu/tensor_operators.cu +++ b/src/tensors/gpu/tensor_operators.cu @@ -16,15 +16,12 @@ namespace gpu { namespace atomics { static inline __device__ void atomicAdd(float *address, float val) { - //*address += val; ::atomicAdd(address, val); } #if COMPILE_FP16 // @TODO: copied from CuTorch, adapt this better, give credit. static inline __device__ void atomicAdd(half *address, half val) { - //*address += val; - #if __CUDA_ARCH__ >= 700 && CUDA_VERSION >= 10000 // compute capability 70 and higher with CUDA 10 ::atomicAdd(address, val); #else // __CUDA_ARCH__ < 700 @@ -50,7 +47,8 @@ static inline __device__ void atomicAdd(half *address, half val) { } while (assumed != old); #endif // __CUDA_ARCH__ } -#endif +#endif // COMPILE_FP16 + } @@ -96,6 +94,81 @@ void IsNaN(const Tensor in, Ptr allocator, bool& isNaN, bool& isInf) cudaStreamSynchronize(0); } +template +__global__ void gSanitizeGradient(T* in, int length, + bool* isNaN, bool* isInf, + bool pruneNaN, bool clipInf, + float forNaN = 0.f, float forInf = 65504.f, float forInfNeg = -65504.f) { + for(int bid = 0; bid < length; bid += blockDim.x * gridDim.x) { + int index = bid + blockDim.x * blockIdx.x + threadIdx.x; + if(index < length) { + float v = (float)in[index]; + // handle NaN + if(isnan(v)) { + if(pruneNaN) { + in[index] = (T)forNaN; + } else { + *isNaN = true; + } + } + // handle +/- Inf + if(isinf(v)) { + if(clipInf) { + in[index] = v > 0 ? (T)forInf : (T)forInfNeg; + } else { + *isInf = true; + } + } + } + } +} + +// This function is meant to clean gradients, i.e. clip infinities and prune NaNs if required. +// If all NaNs and Infs have been removed we return `true` for indicating a sane gradient. +// If `clipInf` is set, infinities are replaced with the maximum/minimum non-inf value for the tensor. +// In that case infinities do not result in a bad gradient, since they get clipped. +// If `pruneNaN` is set, NaNs are replaced with 0. Since NaNs get removed now they do not result +// in a bad gradient. 
+// If NaNs or infinities are detected but not removed (either because of `pruneNaN=false` or `clipInf=false`), +// we return `false` indicating a bad gradient. +bool SanitizeGradient(marian::Tensor in, Ptr allocator, bool pruneNaN, bool clipInf) { + cudaSetDevice(in->getDeviceId().no); + + int length = in->size(); + + int threads = std::min(MAX_THREADS, length); + int blocks = std::min(MAX_BLOCKS, length / threads + (length % threads != 0)); + + auto mem = allocator->alloc(2); + bool* dIsNaN = &mem->data()[0]; + bool* dIsInf = &mem->data()[1]; + fill(in->getBackend(), dIsNaN, dIsNaN + 2, false); + + float forNaN = 0.f; + float forInf = NumericLimits(in->type()).max; + float forInfNeg = NumericLimits(in->type()).lowest; + + if(in->type() == Type::float32) { + gSanitizeGradient<<>>(in->data(), length, dIsNaN, dIsInf, pruneNaN, clipInf, forNaN, forInf, forInfNeg); +#if COMPILE_FP16 + } else if(in->type() == Type::float16) { + gSanitizeGradient<<>>(in->data(), length, dIsNaN, dIsInf, pruneNaN, clipInf, forNaN, forInf, forInfNeg); +#endif + } else { + ABORT("gSanitizeGradient for type {} not implemented", in->type()); + } + + bool isNaN, isInf; + CudaCopy(dIsNaN, dIsNaN + 1, &isNaN); + CudaCopy(dIsInf, dIsInf + 1, &isInf); + + allocator->free(mem); + + cudaStreamSynchronize(0); + + return !isNaN && !isInf; +} + template __global__ void gCopyCastTo(To* out, const From* in, int length) { for(int bid = 0; bid < length; bid += blockDim.x * gridDim.x) { @@ -1090,7 +1163,7 @@ void PasteRows(Tensor out, size_t rowsToCopy = indices->size(); int threads = std::min(MAX_THREADS, (int)cols); -#if 1 // @TODO: make this configurable with a 'deterministic' flag +#if 0 // @TODO: make this configurable with a 'deterministic' flag // If we only use one block, then each core operates on a different column, // hence the summation becomes deterministic. // However, we only use e.g. 512 cores out of possibly 3000+, so this will be @@ -1236,7 +1309,7 @@ __global__ void gSelect(T* out, } } -template +template __global__ void gInsert(T* out, functional::Shape outShape, const T* in, @@ -1254,7 +1327,10 @@ __global__ void gInsert(T* out, int idxIndex = idxShape.bindex(dims); // broadcast index into indices tensor dims[axis] = (int)d_indices[idxIndex]; int outIndex = outShape.index(dims); - out[outIndex] += in[index]; // this is probably wrong, atomicAdd? + if(add) + out[outIndex] += in[index]; // this is probably wrong, atomicAdd? 
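+ // add=true accumulates into the target (as gather's backward pass requires);
+ // add=false overwrites, which is the behavior the new scatter forward relies on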
 template <typename To, typename From>
 __global__ void gCopyCastTo(To* out, const From* in, int length) {
   for(int bid = 0; bid < length; bid += blockDim.x * gridDim.x) {
@@ -1090,7 +1163,7 @@ void PasteRows(Tensor out,
   size_t rowsToCopy = indices->size();
 
   int threads = std::min(MAX_THREADS, (int)cols);
-#if 1 // @TODO: make this configurable with a 'deterministic' flag
+#if 0 // @TODO: make this configurable with a 'deterministic' flag
   // If we only use one block, then each core operates on a different column,
   // hence the summation becomes deterministic.
   // However, we only use e.g. 512 cores out of possibly 3000+, so this will be
@@ -1236,7 +1309,7 @@ __global__ void gSelect(T* out,
   }
 }
 
-template <typename T>
+template <typename T, bool add>
 __global__ void gInsert(T* out,
                         functional::Shape outShape,
                         const T* in,
@@ -1254,7 +1327,10 @@ __global__ void gInsert(T* out,
       int idxIndex = idxShape.bindex(dims); // broadcast index into indices tensor
       dims[axis] = (int)d_indices[idxIndex];
       int outIndex = outShape.index(dims);
-      out[outIndex] += in[index]; // this is probably wrong, atomicAdd?
+      if(add)
+        out[outIndex] += in[index]; // this is probably wrong, atomicAdd?
+      else
+        out[outIndex] = in[index];
     }
   }
 }
@@ -1276,21 +1352,21 @@ void Select(Tensor out,
 
   if(out->type() == Type::float32) {
     gSelect<<<blocks, threads>>>(out->data<float>(),
-        out->shape(),
-        in->data<float>(),
-        in->shape(),
-        axisGPU,
-        indices->data<IndexType>(),
-        indices->shape());
+                                 out->shape(),
+                                 in->data<float>(),
+                                 in->shape(),
+                                 axisGPU,
+                                 indices->data<IndexType>(),
+                                 indices->shape());
 #if COMPILE_FP16
   } else if (out->type() == Type::float16) {
     gSelect<<<blocks, threads>>>(out->data<half>(),
-        out->shape(),
-        in->data<half>(),
-        in->shape(),
-        axisGPU,
-        indices->data<IndexType>(),
-        indices->shape());
+                                 out->shape(),
+                                 in->data<half>(),
+                                 in->shape(),
+                                 axisGPU,
+                                 indices->data<IndexType>(),
+                                 indices->shape());
 #endif
   } else if(out->type() == Type::uint32) {
     gSelect<<<blocks, threads>>>(out->data<IndexType>(),
@@ -1305,6 +1381,7 @@ void Select(Tensor out,
   }
 }
 
+template <bool add>
 void Insert(Tensor out,
             const Tensor in,
             const Tensor indices,
@@ -1320,28 +1397,31 @@ void Insert(Tensor out,
   int axisGPU = axis + functional::Shape::size() - out->shape().size();
 
   if(out->type() == Type::float32) {
-    gInsert<<<blocks, threads>>>(out->data<float>(),
-        out->shape(),
-        in->data<float>(),
-        in->shape(),
-        axisGPU,
-        indices->data<IndexType>(),
-        indices->shape());
+    gInsert<float, add><<<blocks, threads>>>(out->data<float>(),
+                                             out->shape(),
+                                             in->data<float>(),
+                                             in->shape(),
+                                             axisGPU,
+                                             indices->data<IndexType>(),
+                                             indices->shape());
 #if COMPILE_FP16
   } else if (out->type() == Type::float16) {
-    gInsert<<<blocks, threads>>>(out->data<half>(),
-        out->shape(),
-        in->data<half>(),
-        in->shape(),
-        axisGPU,
-        indices->data<IndexType>(),
-        indices->shape());
+    gInsert<half, add><<<blocks, threads>>>(out->data<half>(),
+                                            out->shape(),
+                                            in->data<half>(),
+                                            in->shape(),
+                                            axisGPU,
+                                            indices->data<IndexType>(),
+                                            indices->shape());
 #endif
   } else {
     ABORT("Insert not implemented for type {}", out->type());
   }
 }
 
+template void Insert<true>(Tensor out, const Tensor in, const Tensor indices, int axis);
+template void Insert<false>(Tensor out, const Tensor in, const Tensor indices, int axis);
+
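The new bool template parameter turns Insert into two compile-time variants: Insert<true> accumulates into the output (what the gradient of Select needs), Insert<false> overwrites. Since `add` is a template argument rather than a runtime flag, the untaken branch is eliminated at compilation and the kernel pays no branching cost. The pattern in isolation, as a plain-C++ sketch (hypothetical helper, not the Marian code):

    #include <cstddef>

    template <bool add, typename T>
    void insertRows(T* out, const T* in, const std::size_t* indices,
                    std::size_t rows, std::size_t cols) {
      for(std::size_t r = 0; r < rows; ++r)
        for(std::size_t c = 0; c < cols; ++c) {
          if(add) // compile-time constant: the dead branch disappears
            out[indices[r] * cols + c] += in[r * cols + c];
          else
            out[indices[r] * cols + c]  = in[r * cols + c];
        }
    }

    // insertRows<true>(...)  accumulates, as in the backward pass of a gather;
    // insertRows<false>(...) plainly scatters rows into place.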
 template <typename T>
 __global__ void gGRUFastForward(T* out,
                                 const T* state,
@@ -1355,7 +1435,7 @@ __global__ void gGRUFastForward(T* out,
   for(int bid = 0; bid < rows; bid += gridDim.x) {
     int j = bid + blockIdx.x;
     if(j < rows) {
-      T m = !mask || mask[j];
+      float m = !mask || mask[j];
       T* rowOut = out + j * cols;
       const T* rowState = state + j * cols;
 
@@ -1365,21 +1445,21 @@ __global__ void gGRUFastForward(T* out,
       for(int tid = 0; tid < cols; tid += blockDim.x) {
         int i = tid + threadIdx.x;
         if(i < cols) {
-          T r = functional::Ops<T>::sigmoid(xWrow[i] + sUrow[i] + b[i]);
+          float r = functional::Ops<float>::sigmoid((float)xWrow[i] + (float)sUrow[i] + (float)b[i]);
 
           int k = i + cols;
-          T z = functional::Ops<T>::sigmoid(xWrow[k] + sUrow[k] + b[k]);
+          float z = functional::Ops<float>::sigmoid((float)xWrow[k] + (float)sUrow[k] + (float)b[k]);
 
           int l = i + 2 * cols;
-          T h;
+          float h;
           if(final)
-            h = functional::Ops<T>::tanh(xWrow[l] + (sUrow[l] + b[l]) * r);
+            h = functional::Ops<float>::tanh((float)xWrow[l] + ((float)sUrow[l] + (float)b[l]) * r);
           else
-            h = functional::Ops<T>::tanh(xWrow[l] + sUrow[l] * r + b[l]);
+            h = functional::Ops<float>::tanh((float)xWrow[l] + (float)sUrow[l] * r + (float)b[l]);
 
-          T out = ((T)1.f - z) * h + z * rowState[i];
-          rowOut[i] = m * out + ((T)1.f - m) * rowState[i];
+          float out = (1.f - z) * h + z * (float)rowState[i];
+          rowOut[i] = (T)(m * out + (1.f - m) * (float)rowState[i]);
         }
       }
     }
@@ -1441,7 +1521,7 @@ __global__ void gGRUFastBackward(T* outState,
   for(int bid = 0; bid < rows; bid += gridDim.x) {
     int j = bid + blockIdx.x;
     if(j < rows) {
-      T m = !mask || mask[j];
+      float m = !mask || mask[j];
 
       T* rowOutState = outState + j * cols;
       T* rowOutXW = outXW + j * cols * 3;
@@ -1459,56 +1539,56 @@ __global__ void gGRUFastBackward(T* outState,
           int k = i + cols;
           int l = i + 2 * cols;
 
-          T r = functional::Ops<T>::sigmoid(rowXW[i] + rowSU[i] + b[i]);
-          T z = functional::Ops<T>::sigmoid(rowXW[k] + rowSU[k] + b[k]);
+          float r = functional::Ops<float>::sigmoid((float)rowXW[i] + (float)rowSU[i] + (float)b[i]);
+          float z = functional::Ops<float>::sigmoid((float)rowXW[k] + (float)rowSU[k] + (float)b[k]);
 
-          T h;
+          float h;
           if(final)
-            h = functional::Ops<T>::tanh(rowXW[l] + (rowSU[l] + b[l]) * r);
+            h = functional::Ops<float>::tanh((float)rowXW[l] + ((float)rowSU[l] + (float)b[l]) * r);
           else
-            h = functional::Ops<T>::tanh(rowXW[l] + rowSU[l] * r + b[l]);
+            h = functional::Ops<float>::tanh((float)rowXW[l] + (float)rowSU[l] * r + (float)b[l]);
 
-          T adj = rowAdj[i];
+          float adj = rowAdj[i];
 
-          T t = ((T)1.f - z) * ((T)1.f - h * h);
+          float t = (1.f - z) * (1.f - h * h);
 
           // df/ds
           if(outState)
-            rowOutState[i] += (m * z - m + (T)1.f) * adj;
+            rowOutState[i] += (T)((m * z - m + 1.f) * adj);
 
           // df/d(xW_r) ...
-          T dfdxW_r = m * r * ((T)1.f - r) * t * adj;
+          float dfdxW_r = m * r * (1.f - r) * t * adj;
           if(final)
-            dfdxW_r *= rowSU[l] + b[l];
+            dfdxW_r *= (float)rowSU[l] + (float)b[l];
           else
-            dfdxW_r *= rowSU[l];
+            dfdxW_r *= (float)rowSU[l];
           if(outXW)
-            rowOutXW[i] += dfdxW_r;
+            rowOutXW[i] += (T)dfdxW_r;
           if(outSU)
-            rowOutSU[i] += dfdxW_r;
+            rowOutSU[i] += (T)dfdxW_r;
           if(outB)
-            rowOutB[i] += dfdxW_r;
+            rowOutB[i] += (T)dfdxW_r;
 
           // df/d(xW_z) ...
-          T dfdxW_z = m * ((T)1.f - z) * z * (rowState[i] - h) * adj;
+          float dfdxW_z = m * (1.f - z) * z * ((float)rowState[i] - h) * adj;
           if(outXW)
-            rowOutXW[k] += dfdxW_z;
+            rowOutXW[k] += (T)dfdxW_z;
           if(outSU)
-            rowOutSU[k] += dfdxW_z;
+            rowOutSU[k] += (T)dfdxW_z;
           if(outB)
-            rowOutB[k] += dfdxW_z;
+            rowOutB[k] += (T)dfdxW_z;
 
           // df/d(xW_x) ...
-          T dfdxW_x = m * t * adj;
+          float dfdxW_x = m * t * adj;
           if(outXW)
-            rowOutXW[l] += dfdxW_x;
+            rowOutXW[l] += (T)dfdxW_x;
           if(outSU)
-            rowOutSU[l] += dfdxW_x * r;
+            rowOutSU[l] += (T)(dfdxW_x * r);
           if(outB)
             if(final)
-              rowOutB[l] += dfdxW_x * r;
+              rowOutB[l] += (T)(dfdxW_x * r);
             else
-              rowOutB[l] += dfdxW_x;
+              rowOutB[l] += (T)dfdxW_x;
         }
       }
     }
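Every edit in these two kernels applies the same recipe: tensors stay in T (possibly half), but every intermediate value is computed in float, and only the final store narrows back. A hedged, standalone illustration of the pattern (hypothetical device helper, not part of the diff):

    #include <cuda_fp16.h>

    template <typename T>
    __device__ void maskedBlend(T* rowOut, const T* update, const T* state,
                                float m, int i) {
      float u   = (float)update[i];      // widen once per load
      float s   = (float)state[i];
      float out = m * u + (1.f - m) * s; // intermediates never round to half
      rowOut[i] = (T)out;                // a single rounding step on store
    }

For half tensors this avoids compounding round-off across the gate computations, which is where fp16 RNN training tends to drift.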
diff --git a/src/tensors/tensor_operators.h b/src/tensors/tensor_operators.h
index 6e587953c..1fc4542d8 100644
--- a/src/tensors/tensor_operators.h
+++ b/src/tensors/tensor_operators.h
@@ -41,6 +41,25 @@ DISPATCH2(CopyCast, marian::Tensor, const marian::Tensor);
 DISPATCH2(AddCast, marian::Tensor, const marian::Tensor);
 DISPATCH4(IsNaN, const Tensor, Ptr<Allocator>, bool&, bool&);
 
+#ifdef CUDA_FOUND
+namespace gpu {
+bool SanitizeGradient(marian::Tensor in, Ptr<Allocator> allocator, bool pruneNaN, bool clipInf);
+}
+#endif
+
+namespace cpu {
+bool SanitizeGradient(marian::Tensor in, Ptr<Allocator> allocator, bool pruneNaN, bool clipInf);
+}
+
+static inline bool SanitizeGradient(marian::Tensor in, Ptr<Allocator> allocator, bool pruneNaN, bool clipInf) {
+#ifdef CUDA_FOUND
+  if(in->getBackend()->getDeviceId().type == DeviceType::gpu)
+    return gpu::SanitizeGradient(in, allocator, pruneNaN, clipInf);
+  else
+#endif
+    return cpu::SanitizeGradient(in, allocator, pruneNaN, clipInf);
+}
+
 template <class Functor, class... Tensors>
 void Element(Functor functor, marian::Tensor out, Tensors... tensors) {
 #ifdef CUDA_FOUND
@@ -278,7 +297,28 @@ DISPATCH3(CopyCols, marian::Tensor, const marian::Tensor, const marian::Tensor)
 DISPATCH3(PasteCols, marian::Tensor, const marian::Tensor, const marian::Tensor)
 DISPATCH4(Select, marian::Tensor, const marian::Tensor, const marian::Tensor, int)
-DISPATCH4(Insert, marian::Tensor, const marian::Tensor, const marian::Tensor, int)
+
+#ifdef CUDA_FOUND
+namespace gpu {
+  template <bool add>
+  void Insert(Tensor out, const Tensor in, const Tensor indices, int axis);
+}
+#endif
+
+namespace cpu {
+  template <bool add>
+  void Insert(Tensor out, const Tensor in, const Tensor indices, int axis);
+}
+
+template <bool add>
+static inline void Insert(Tensor out, const Tensor in, const Tensor indices, int axis) {
+#ifdef CUDA_FOUND
+  if(out->getBackend()->getDeviceId().type == DeviceType::gpu)
+    gpu::Insert<add>(out, in, indices, axis);
+  else
+#endif
+    cpu::Insert<add>(out, in, indices, axis);
+}
 
 DISPATCH7(TopK, marian::Tensor, marian::Tensor, Ptr<Allocator>, const marian::Tensor, int, int, bool);
 
diff --git a/src/training/graph_group.cpp b/src/training/graph_group.cpp
index e9c977b9c..59cd4b6d8 100644
--- a/src/training/graph_group.cpp
+++ b/src/training/graph_group.cpp
@@ -10,25 +10,19 @@ GraphGroup::GraphGroup(Ptr<Options> options, Ptr<IMPIWrapper> mpi)
       mbRoundUp_(options_->get<bool>("mini-batch-round-up", true)) {
   if(options_->hasAndNotEmpty("cost-scaling")) {
     auto vcs = options_->get<std::vector<std::string>>("cost-scaling");
-    costScale_ = true;
-    float costExponent = std::stof(vcs[0]);
-    costScaleFactor_ = std::pow(2.0f, costExponent);
-
-    if(vcs.size() > 1) costScaleFreq_ = std::stoul(vcs[1]);
-    if(vcs.size() > 2) costScaleMultiplier_ = std::stof(vcs[2]);
-    if(vcs.size() > 3) costScaleNanTolerance_ = std::stof(vcs[3]);
-    if(vcs.size() > 4) costScaleNanRange_ = std::stoul(vcs[4]);
-    if(vcs.size() > 5) costScaleFactorMinimum_ = std::stof(vcs[5]);
+
+    costScaling_ = true;
+    costScalingFactor_ = std::stof( vcs[0]);
+    if(vcs.size() > 1) costScalingFreq_          = std::stoul(vcs[1]);
+    if(vcs.size() > 2) costScalingMultiplier_    = std::stof( vcs[2]);
+    if(vcs.size() > 3) costScalingFactorMinimum_ = std::stof( vcs[3]);
 
     LOG_ONCE(info,
-             "Training with cost scaling - factor: 2^{} = {}, frequency: {}, multiplier: {}, tolerance: {}, range: {}, minimum: {}",
-             costExponent,
-             costScaleFactor_,
-             costScaleFreq_,
-             costScaleMultiplier_,
-             costScaleNanTolerance_,
-             costScaleNanRange_,
-             costScaleFactorMinimum_);
+             "Training with cost scaling - factor: {}, frequency: {}, multiplier: {}, minimum: {}",
+             costScalingFactor_,
+             costScalingFreq_,
+             costScalingMultiplier_,
+             costScalingFactorMinimum_);
   }
 
   if(options_->hasAndNotEmpty("dynamic-gradient-scaling")) {
@@ -37,11 +31,16 @@ GraphGroup::GraphGroup(Ptr<Options> options, Ptr<IMPIWrapper> mpi)
     if(vgc.size() > 0) dynamicGradientScalingFactor_  = std::stof(vgc[0]);
     if(vgc.size() > 1) dynamicGradientScalingUseLogs_ = vgc[1] == "log";
+    if(vgc.size() > 2) dynamicGradientScalingFadeout_ = std::stoul(vgc[2]);
 
     LOG_ONCE(info,
              "Re-scaling gradient to have average gradient norm if (log={}) gradient norm diverges from average by {} sigmas",
              dynamicGradientScalingUseLogs_,
              dynamicGradientScalingFactor_);
+    if(dynamicGradientScalingFadeout_ > 0)
+      LOG_ONCE(info,
+               "Dynamic gradient re-scaling will fade out linearly after {} updates",
+               dynamicGradientScalingFadeout_);
   }
 
   if(options_->get<bool>("check-gradient-nan")) {
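With this change `--cost-scaling` takes the starting factor directly (e.g. 256) rather than a power-of-two exponent, and the NaN tolerance/range parameters are gone. The positional interpretation, mimicked in isolation (illustrative sketch; the defaults mirror the members declared in graph_group.h further down):

    #include <cstddef>
    #include <string>
    #include <vector>

    struct CostScalingConfig {
      float       factor     = 1.f;   // starting scale factor
      std::size_t freq       = 2000;  // try to increase after this many NaN-free updates
      float       multiplier = 2.f;   // multiply/divide the factor by this amount
      float       minimum    = 1.f;   // never drop below this factor
    };

    CostScalingConfig parseCostScaling(const std::vector<std::string>& vcs) {
      CostScalingConfig c;
      if(vcs.size() > 0) c.factor     = std::stof(vcs[0]); // a factor now, not an exponent
      if(vcs.size() > 1) c.freq       = std::stoul(vcs[1]);
      if(vcs.size() > 2) c.multiplier = std::stof(vcs[2]);
      if(vcs.size() > 3) c.minimum    = std::stof(vcs[3]);
      return c;
    }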
@@ -96,21 +95,17 @@ void GraphGroup::initGraphsAndOpts() {
 // given number of iterations. Usually we increase by 2 which adds
 // one more bit for precision.
 void GraphGroup::increaseCostScaleFactor() {
-  if(!costScale_)
+  if(!costScaling_)
     return;
 
   noNanSeen_++;
 
   size_t total = nanSeen_ + noNanSeen_;
-  float nanPercent = (float)nanSeen_ / (float)total; // total is at least 1 because of noNanSeen_++
 
-  if(noNanSeen_ % costScaleFreq_ == 0) {
-    costScaleFactor_ *= costScaleMultiplier_;
-    LOG(debug,
-        "NaN/Inf percentage {:.2f} after {} gradient updates. Increasing cost-scaling factor to {}",
-        nanPercent,
-        total,
-        costScaleFactor_);
+  if(noNanSeen_ % costScalingFreq_ == 0) {
+    costScalingFactor_ *= costScalingMultiplier_;
+    if(isMainProcess())
+      LOG(debug, "No NaN/Inf after {} gradient updates. Increasing cost-scaling factor to {}", total, costScalingFactor_);
 
     // Resetting counts after cost-scale change
     noNanSeen_ = 0;
@@ -120,48 +115,56 @@ void GraphGroup::increaseCostScaleFactor() {
 
 // call when a NaN was seen to decrease cost-scaling factor
 void GraphGroup::decreaseCostScaleFactor() {
-  if(!costScale_)
+  if(!costScaling_)
     return;
 
   nanSeen_++;
 
   size_t total = nanSeen_ + noNanSeen_;
 
-  float nanPercent = (float)nanSeen_ / (float)total; // total is at least 1 because of nanSeen_++
-  if(total >= costScaleNanRange_ && nanPercent > costScaleNanTolerance_) {
-    if(costScaleFactor_ > costScaleFactorMinimum_) {
-      costScaleFactor_ /= costScaleMultiplier_;
-      LOG(debug,
-          "NaN/Inf percentage {:.2f} in {} gradient updates, reducing cost-scaling factor to {}",
-          nanPercent,
-          total,
-          costScaleFactor_);
-    } else {
-      // @TODO: think if should this rather abort?
-      LOG(warn,
-          "NaN/Inf percentage {:.2f} in {} gradient updates, but cost-scaling factor {} is already at minimum",
-          nanPercent,
-          total,
-          costScaleFactor_);
-    }
-    // Resetting counts after cost-scale change
-    noNanSeen_ = 0;
-    nanSeen_ = 0;
+  // do not reduce cost-scaling factor below minimum
+  if(costScalingFactor_ > costScalingFactorMinimum_)
+    costScalingFactor_ /= costScalingMultiplier_;
+
+  if(isMainProcess()) {
+    if(costScalingFactor_ > costScalingFactorMinimum_)
+      LOG(debug, "Seen NaN/Inf after {} gradient updates. Reduced cost-scaling factor to {}", total, costScalingFactor_);
+    else
+      LOG(debug, "Seen NaN/Inf after {} gradient updates. Reduced cost-scaling factor to minimum {}. Pruning NaNs now.", total, costScalingFactor_);
   }
+
+  // Resetting counts after cost-scale change
+  noNanSeen_ = 0;
+  nanSeen_ = 0;
 }
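The two functions above implement the usual loss-scale schedule: multiply the factor after every `costScalingFreq_` clean updates, divide on NaN/Inf, and floor at the minimum. A small self-contained simulation of that schedule (values chosen purely for illustration):

    #include <cstddef>
    #include <cstdio>

    int main() {
      float factor = 256.f, multiplier = 2.f, minimum = 256.f; // illustrative values
      std::size_t freq = 1000, noNanSeen = 0;

      auto cleanUpdate = [&] { // called when no NaN/Inf was seen
        if(++noNanSeen % freq == 0) { factor *= multiplier; noNanSeen = 0; }
      };
      auto badUpdate = [&] {   // called when the gradient had NaN/Inf
        if(factor > minimum) factor /= multiplier; // at the minimum, NaNs get pruned instead
        noNanSeen = 0;
      };

      for(int i = 0; i < 2000; ++i) cleanUpdate(); // 256 -> 512 -> 1024
      badUpdate();                                 // 1024 -> 512
      std::printf("factor now %.0f\n", factor);    // prints: factor now 512
    }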
 float GraphGroup::checkNanOrNorm(size_t i, size_t begin, size_t end) {
   auto curGrad = graphs_[i]->params()->grads()->subtensor(begin, end-begin);
 
-  if(checkGradientNan_ || costScale_) {
-    bool hasNan = false, hasInf = false;
-    IsNaN(curGrad, graphs_[i]->allocator(), hasNan, hasInf); // @TODO: make safe with different compiler options
-    if(hasNan || hasInf) {
-      LOG(debug, "Found Nan ({}) or Inf ({})", hasNan, hasInf);
+  // If costScaling_ is enabled, check for NaN values while costScalingFactor_ is still larger than
+  // the minimum. If a NaN value is seen, we exit here; the factor will be reduced next and
+  // this update is skipped.
+  // If costScalingFactor_ is already at the minimum, prune the NaN values away instead. This replaces
+  // NaNs with 0 and updates are no longer skipped.
+  // Regardless of NaNs, we clip +/-inf to the largest corresponding values for the gradient value type.
+  // This changes the gradient but seems to be quite stable. In effect, for fp16 this is equivalent
+  // to gradient clipping at (65504.f / costScalingFactor_), which in most cases is still large.
+  if(costScaling_ || checkGradientNan_) {
+    bool pruneNaN = !checkGradientNan_ && costScalingFactor_ == costScalingFactorMinimum_;
+    bool clipInf  = !checkGradientNan_;
+    bool saneGradient = SanitizeGradient(curGrad, graphs_[i]->allocator(), pruneNaN, clipInf);
+
+    // This should never happen; if it does, something is wrong with the kernel above and needs to be fixed.
+    ABORT_IF(pruneNaN && clipInf && !saneGradient, "We are removing NaNs and clipping Infs, but gradient is still not sane??");
+
+    if(!saneGradient) {
+      LOG(debug, "Found NaN");
       return std::numeric_limits<float>::quiet_NaN();
     }
   }
-
+
+  // The optional clipping above will affect the norm here. The norm can still be non-finite despite the
+  // gradient sanitization above, hence check again and propagate a NaN.
   if(dynamicGradientScaling_) {
     auto gNorm = L2Norm(curGrad, graphs_[i]->allocator());
     if(isFinite(gNorm) && gNorm > 0.0)
@@ -197,8 +200,8 @@ float GraphGroup::executeAndCollectNorm(const std::function<float(size_t, size_
 
 float GraphGroup::computeNormalizationFactor(float gNorm, size_t updateTrgWords) {
   float normalizationFactor = 1.f;
 
   if(options_->get<bool>("normalize-gradient"))
     normalizationFactor *= updateTrgWords;
 
@@ -207,9 +210,9 @@ float GraphGroup::computeNormalizationFactor(float gNorm, size_t updateTrgWords)
     return normalizationFactor;
 
   if(dynamicGradientScaling_) {
-    // make gradient norm invariant to changes in costScaleFactor_, luckily norm(c * g) = c * norm(g)
-    if(costScale_)
-      gNorm = gNorm / costScaleFactor_;
+    // make gradient norm invariant to changes in costScalingFactor_, luckily norm(c * g) = c * norm(g)
+    if(costScaling_)
+      gNorm = gNorm / costScalingFactor_;
 
     // Normalize gradient norm w.r.t. number of labels in batch for statistics,
     // there should be no gradient normalization before this point, @TODO: check this
@@ -231,11 +234,17 @@ float GraphGroup::computeNormalizationFactor(float gNorm, size_t updateTrgWords)
     auto deltaTransform    = gNormTransform - gNormAvgTransform; // compute the difference between the current transformed gradient norm and the running average.
     auto gNormStdTransform = std::sqrt(gNormVarTransform);       // compute STD for the running average of (log) gradient norms.
 
+    float fadeoutMultiplier = 1.f;
+    if(dynamicGradientScalingFadeout_ > 0ul) // fade out linearly after that many updates @TODO: allow units other than updates
+      fadeoutMultiplier = (float)std::max(dynamicGradientScalingFadeout_, scheduler_->numberOfBatches()) / (float)dynamicGradientScalingFadeout_;
+
+    float dynamicGradientScalingFactorWithFadeout = dynamicGradientScalingFactor_ * fadeoutMultiplier; // as fadeoutMultiplier grows, dynamic gradient scaling becomes less and less likely to trigger over time.
+
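To sanity-check the fade-out arithmetic just introduced: with an assumed fadeout of 10000 updates, the effective sigma threshold stays at its configured value until update 10000 and then grows linearly, so re-scaling fires ever more rarely (illustrative numbers only):

    #include <algorithm>
    #include <cstddef>
    #include <cstdio>

    int main() {
      std::size_t fadeout = 10000; // assumed --dynamic-gradient-scaling fadeout value
      float factor = 2.f;          // rescale when the norm diverges by more than 2 sigmas

      for(std::size_t batches : {5000ul, 10000ul, 20000ul, 40000ul}) {
        float mult = (float)std::max(fadeout, batches) / (float)fadeout;
        std::printf("update %zu: threshold = %.1f sigmas\n", batches, factor * mult);
      }
      // update 5000:  threshold = 2.0 sigmas
      // update 10000: threshold = 2.0 sigmas
      // update 20000: threshold = 4.0 sigmas
      // update 40000: threshold = 8.0 sigmas
    }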
     // delta of (log) gradient norm vs (log) gradient norm average is larger than N standard deviations
     // hence rescale gradient using the average.
-    if(scheduler_->numberOfBatches() >= window && deltaTransform > dynamicGradientScalingFactor_ * gNormStdTransform) {
-      LOG(debug, "log gradient norms: {} :: {:.4f} - {:.4f} = {:.4f} > {:.4f} * {:.4f}",
-          dynamicGradientScalingUseLogs_, gNormTransform, gNormAvgTransform, deltaTransform, dynamicGradientScalingFactor_, gNormStdTransform);
+    if(scheduler_->numberOfBatches() >= window && deltaTransform > dynamicGradientScalingFactorWithFadeout * gNormStdTransform) {
+      if(isMainProcess())
+        LOG(debug, "log gradient norms: {} :: {:.4f} - {:.4f} = {:.4f} > {:.4f} * {:.4f} - scaling gradient by {:.4f}",
+            dynamicGradientScalingUseLogs_, gNormTransform, gNormAvgTransform, deltaTransform, dynamicGradientScalingFactorWithFadeout, gNormStdTransform, gNormAvg / gNorm);
 
       normalizationFactor *= gNorm / gNormAvg; // since we later do gradient / normalizationFactor this divides by norm and multiplies by the average, rescaling to the average.
     }
 
@@ -288,9 +297,7 @@ void GraphGroup::load(const OptimizerBase::ScatterStateFunc& scatterFn) {
       restoreFromCheckpoint(modelFileName, scatterFn);
     } else if(options_->hasAndNotEmpty("pretrained-model")) {
       std::string nameInit = options_->get<std::string>("pretrained-model");
-      LOG(info,
-          "[training] Initializing model weights with pre-trained model {}",
-          nameInit);
+      LOG(info, "[training] Initializing model weights with pre-trained model {}", nameInit);
 
       size_t i = 0;
       for(auto graph : graphs_)
diff --git a/src/training/graph_group.h b/src/training/graph_group.h
index 0e4a68dcc..9f1362e75 100644
--- a/src/training/graph_group.h
+++ b/src/training/graph_group.h
@@ -60,21 +60,21 @@ class GraphGroup {
   double typicalTrgBatchWords_{0}; // for dynamic batch sizing: typical batch size in words
   bool mbRoundUp_{true}; // round up batches for more efficient training but can make batch size less stable, disable with --mini-batch-round-up=false
 
-  bool costScale_{false};
-  float costScaleFactor_{1.f}; // @TODO, add current costScaleFactor_ to trainingState for serialization
-  size_t costScaleFreq_{2000};
-  float costScaleMultiplier_{2.f};
-  float costScaleNanTolerance_{0.f};
-  size_t costScaleNanRange_{1};
-  float costScaleFactorMinimum_{1.f}; // @TODO make this configureable
+  bool costScaling_{false};
+  float costScalingFactor_{1.f}; // @TODO, add current costScalingFactor_ to trainingState for serialization
+  size_t costScalingFreq_{2000};
+  float costScalingMultiplier_{2.f};
+  float costScalingFactorMinimum_{1.f};
+
   size_t noNanSeen_{0}; // @TODO, add current noNanSeen_ to trainingState for serialization
   size_t nanSeen_{0};
 
+  bool checkGradientNan_{false};
+
   bool dynamicGradientScaling_{false};
   float dynamicGradientScalingFactor_{2.f};
   bool dynamicGradientScalingUseLogs_{false};
-
-  bool checkGradientNan_{false};
+  size_t dynamicGradientScalingFadeout_{0ul};
 
   // determines the number of input streams (i.e. input files or fields in the TSV input) that need
   // to be included in the batch, i.e. without alignments and weights
diff --git a/src/training/graph_group_async.cpp b/src/training/graph_group_async.cpp
index 72b06e489..f85f9cf85 100644
--- a/src/training/graph_group_async.cpp
+++ b/src/training/graph_group_async.cpp
@@ -143,13 +143,13 @@ void AsyncGraphGroup::execute(Ptr<data::Batch> batch) {
   thread_local Tensor accGradients;
   thread_local Ptr<TensorAllocator> accAlloc;
 
-  ABORT_IF(costScale_, "Cost-scaling not implemented for AsyncSGD");
+  ABORT_IF(costScaling_, "Cost-scaling not implemented for AsyncSGD");
 
   auto graph = graphs_[tid];
   Ptr<RationalLoss> dynamicLoss = models_[tid]->build(graph, batch);
-  if(costScaleFactor_ != 1.f) {
+  if(costScalingFactor_ != 1.f) {
     // it's ok to go out of scope, this will still insert the new top node into the graph
-    auto costNode = dynamicLoss->loss() * costScaleFactor_;
+    auto costNode = dynamicLoss->loss() * costScalingFactor_;
   }
 
   if(t % optimizerDelay_ == 0) {
diff --git a/src/training/graph_group_singleton.cpp b/src/training/graph_group_singleton.cpp
index 7dc861375..162610705 100644
--- a/src/training/graph_group_singleton.cpp
+++ b/src/training/graph_group_singleton.cpp
@@ -16,16 +16,16 @@ void SingletonGraph::execute(Ptr<data::Batch> batch) {
   auto opt = optimizerShards_[0];
 
   auto lossNode = model->build(graph, batch);
-  if(costScaleFactor_ != 1.f) {
+  if(costScalingFactor_ != 1.f) {
     // for fp16 training, it's ok to go out of scope, we do not use the scaled version for anything
-    auto scaledLoss = lossNode->loss() * costScaleFactor_;
+    auto scaledLoss = lossNode->loss() * costScalingFactor_;
   }
 
   graph->forward();
   graph->backward();
 
   bool noNanOrInf = true;
-  if(costScale_) {
+  if(costScaling_) {
     // Are there NaNs in the gradient?
     bool hasNan = false, hasInf = false;
     IsNaN(graph->params()->grads(), graph->allocator(), hasNan, hasInf);
@@ -39,7 +39,7 @@ void SingletonGraph::execute(Ptr<data::Batch> batch) {
   opt->update(graph->params()->vals(),
               graph->params()->grads(),
               batch->wordsTrg(),
-              costScaleFactor_);
+              costScalingFactor_);
 
   if(scheduler_) {
     scheduler_->update(*lossNode, batch);
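Singleton, async, and the sync group below all follow the same loss-scaling recipe; reduced to a sketch, one training step looks like this (a duck-typed illustration — none of these member names are Marian's actual API):

    // 1. scale the loss, 2. backprop, 3. sanitize/check the gradient,
    // 4. update with the inverse scale folded in, or skip and shrink the scale.
    template <typename Trainer, typename ScaleSchedule>
    void trainStep(Trainer& t, ScaleSchedule& s, float& costScalingFactor) {
      t.buildLoss(/*scale=*/costScalingFactor); // gradients come out multiplied by the factor
      t.backward();

      if(t.sanitizeGradient()) {                // prune NaN / clip Inf as configured
        t.update(/*divideGradientBy=*/costScalingFactor);
        s.onCleanStep(costScalingFactor);       // may raise the factor every N clean steps
      } else {
        s.onNanStep(costScalingFactor);         // lower the factor; this update is skipped
      }
    }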
diff --git a/src/training/graph_group_sync.cpp b/src/training/graph_group_sync.cpp
index 8c06761e1..c90a384e4 100644
--- a/src/training/graph_group_sync.cpp
+++ b/src/training/graph_group_sync.cpp
@@ -252,8 +252,8 @@ void SyncGraphGroup::update(std::vector<Ptr<data::Batch>> subBatches, size_t num
       { // let loss go out of scope, frees memory
         auto rationalLoss = models_[localDeviceIndex]->build(graph, subBatch);
-        if(costScaleFactor_ != 1.f)
-          rationalLoss->loss() * costScaleFactor_;
+        if(costScalingFactor_ != 1.f)
+          rationalLoss->loss() * costScalingFactor_;
         graph->forward();
 
         localDeviceLosses[localDeviceIndex] += *rationalLoss;
@@ -262,7 +262,7 @@ void SyncGraphGroup::update(std::vector<Ptr<data::Batch>> subBatches, size_t num
         graph->backward(/*zero=*/false); // (gradients are reset before we get here)
       }
 
-#if 1
+#if 0 // @TODO: this can probably be removed now, keep around until confirmed.
       // experimental and should eventually be somewhere else
       // Handle local gradient explosion but only clip to largest possible value
       // given number of GPUs and type. Should clip rarely. Also clips inf
@@ -284,7 +284,7 @@ void SyncGraphGroup::update(std::vector<Ptr<data::Batch>> subBatches, size_t num
   comm_->scatterReduceAndResetGrads(); // reduce gradients across all devices (globally) into shards
 
   float gradNorm = 0.f;
-  if(costScale_ || dynamicGradientScaling_ || checkGradientNan_) {
+  if(costScaling_ || dynamicGradientScaling_ || checkGradientNan_) {
     // Wrapping member function
     auto checkNanOrNorm = [&](size_t i, size_t begin, size_t end) {
       return GraphGroup::checkNanOrNorm(i, begin, end);
diff --git a/src/translator/beam_search.cpp b/src/translator/beam_search.cpp
index 2a0d3947a..580895f2f 100644
--- a/src/translator/beam_search.cpp
+++ b/src/translator/beam_search.cpp
@@ -94,7 +94,7 @@ Beams BeamSearch::toHyps(const std::vector<unsigned int>& nBestKeys, // [current
     // For factored decoding, the word is built over multiple decoding steps,
     // starting with the lemma, then adding factors one by one.
     if (factorGroup == 0) {
-      word = factoredVocab->lemma2Word(shortlist ? shortlist->reverseMap((int) prevBeamHypIdx, (int) currentBatchIdx, wordIdx) : wordIdx); // @BUGBUG: reverseMap is only correct if factoredVocab_->getGroupRange(0).first == 0
+      word = factoredVocab->lemma2Word(shortlist ? shortlist->reverseMap((int) prevBeamHypIdx, (int) currentBatchIdx, wordIdx) : wordIdx);
       std::vector<size_t> factorIndices;
       factoredVocab->word2factors(word, factorIndices);
       //LOG(info, "{} + {} ({}) -> {} -> {}",
       //    factoredVocab->decode(prevHyp->tracebackWords()),
@@ -115,7 +115,7 @@ Beams BeamSearch::toHyps(const std::vector<unsigned int>& nBestKeys, // [current
       }
     }
     else if (shortlist)
-      word = Word::fromWordIndex(shortlist->reverseMap((int) prevBeamHypIdx, (int) origBatchIdx, wordIdx));
+      word = Word::fromWordIndex(shortlist->reverseMap((int) prevBeamHypIdx, (int) currentBatchIdx, wordIdx));
     else
       word = Word::fromWordIndex(wordIdx);
 
@@ -330,6 +330,7 @@ Histories BeamSearch::search(Ptr<ExpressionGraph> graph, Ptr<data::CorpusBatch>
   auto prevBatchIdxMap = batchIdxMap; // [origBatchIdx -> currentBatchIdx] but shifted by one time step
   // main loop over output time steps
   for (size_t t = 0; ; t++) {
+    //std::cerr << "\nstep=" << t << std::endl;
     ABORT_IF(origDimBatch != beams.size(), "Lost a batch entry??");
     // determine beam size for next output time step, as max over still-active sentences
     // E.g. if all batch entries are down from beam 5 to no more than 4 surviving hyps, then
diff --git a/src/translator/nth_element.cpp b/src/translator/nth_element.cpp
index 237d9b9da..dbcceec47 100644
--- a/src/translator/nth_element.cpp
+++ b/src/translator/nth_element.cpp
@@ -3,7 +3,9 @@
  * SPDX-License-Identifier: MIT
  */
 
+#include "common/utils.h"
 #include "translator/nth_element.h"
+
 #include <algorithm>
 #include <iterator>
 #include <limits>
diff --git a/src/translator/translator.h b/src/translator/translator.h
index 4084ced95..0621fc8ce 100644
--- a/src/translator/translator.h
+++ b/src/translator/translator.h
@@ -122,7 +122,7 @@ class Translate : public ModelTask {
         threadPool.enqueue(task, device, id++);
       }
 
-      if(options_->get<bool>("output-sampling", false)) {
+      if(options_->hasAndNotEmpty("output-sampling")) {
         if(options_->get<size_t>("beam-size") > 1)
           LOG(warn,
               "[warning] Output sampling and beam search (beam-size > 1) are contradictory methods "