Source: index.js

'use strict';

const { spawn } = require('child_process');
const { Writable } = require('stream');
const { EventEmitter } = require('events');

class FfmpegRespawn extends EventEmitter {
  /**
   *
   * @param options {Object}
   * @param options.params {String[]} - Parameters passed to ffmpeg process.
   * @param [options.pipes] {Writable[]|Function[]} - Array of writable pipes and/or functions to receive data output from ffmpeg stdio[n].
   * @param [options.path=ffmpeg] {String} - Specify path to ffmpeg if it is not in PATH.
   * @param [options.logLevel=quiet] {String} - Valid options: quiet, -8, panic, 0, fatal, 8, error, 16, warning, 24, info, 32, verbose, 40, debug, 48, trace, 56.
   * @param [options.killAfterStall=10] {Number} - Valid range: 10 - 60. Number of seconds to wait to kill ffmpeg process if not receiving progress.
   * @param [options.reSpawnDelay=2] {Number} - Valid range: 2 - 60. Number of seconds to wait to re-spawn ffmpeg process after it exits.
   * @param [options.reSpawnLimit=1] {Number} - Valid range: 0 - Infinity. Number of attempts to re-spawn ffmpeg after exiting without progress. Fail event will be emitted after reaching limit.
   * @param [options.debug=false] {Boolean} - If true, will output some debugging data to console.
   * @returns {FfmpegRespawn}
   */
  constructor(options) {
    super();

    //options are required on instantiation
    if (!options) {
      throw new Error('Options error: must pass a configuration object');
    }

    //check that params is array and has a minimum number of items
    if (!options.params || !Array.isArray(options.params) || options.params.length < 3) {
      throw new Error('Params error: must be an array with a minimum of 3 items.');
    }

    //set params property since it met the bare minimum
    this._params = options.params;

    //set defaults that will be passed to spawned ffmpeg, will update when looping through params
    this._stdio = ['ignore', 'ignore', 'ignore', 'ignore'];

    let paramPipeCount = 0;

    //loop through params and configure pipes
    for (let i = 0; i < this._params.length; i++) {
      if (
        this._params[i] === '-i' &&
        (this._params[i + 1] === 'pipe:0' || this._params[i + 1] === '-' || this._params[i + 1] === 'pipe:')
      ) {
        throw new Error('Params error: stdin/stdio[0]/pipe:0 not supported yet.');
      }
      if (this._params[i] === 'pipe:1' || this._params[i] === 'pipe:' || this._params[i] === '-') {
        throw new Error('Params error: stdout/stdio[1]/pipe:1 is reserved for internal progress monitoring.');
      }
      if (this._params[i] === 'pipe:2') {
        throw new Error('Params error: pipe:2 is reserved, set options.logLevel and listen to "stderr" event instead.');
      }
      if (this._params[i] === '-loglevel') {
        throw new Error(
          'Params error: -loglevel is reserved, set options.logLevel and listen to "stderr" event instead.'
        );
      }
      const results = /pipe:(\d+)/.exec(this._params[i]);
      if (results && results.index === 0) {
        this._stdio[results[1]] = 'pipe';
        paramPipeCount++;
      }
    }

    if (
      paramPipeCount > 0 &&
      (!options.pipes || !Array.isArray(options.pipes) || options.pipes.length !== paramPipeCount)
    ) {
      throw new Error(`Pipes error: must be an array with ${paramPipeCount} item${paramPipeCount === 1 ? '' : 's'}).`);
    }

    //create temp variable to hold array
    const pipes = options.pipes;

    //will be used to keep track of pipes passed to spawned ffmpeg process
    this._stdioPipes = [];

    //test if any pipes were skipped in options.params, and process options.pipes
    for (let i = 0; i < this._stdio.length; i++) {
      if (this._stdio[i] === undefined) {
        throw new Error(`Params error: pipe:${i} was skipped.`);
      }
      if (this._stdio[i] === 'pipe') {
        let foundPipe = false;
        for (let j = 0; j < pipes.length; j++) {
          if (pipes[j].stdioIndex === i) {
            const pipe = pipes[j];
            if (typeof pipe.destination === 'function' && pipe.destination.length > 0) {
              this._stdioPipes.push({
                stdioIndex: pipe.stdioIndex,
                destination: FfmpegRespawn._createWritable(pipe.destination)
              });
            } else if (pipe.destination instanceof Writable) {
              this._stdioPipes.push(pipe);
            } else {
              throw new Error(
                `Destination: ${pipe.destination} must be a stream(Writable, Duplex, Transform) or a callback function that can receive a single param.`
              );
            }
            foundPipe = true;
          }
        }
        if (!foundPipe) {
          throw new Error(`Pipes error: pipes array did not have a matching pipe or callback for pipe:${i}.`);
        }
      }
    }

    //create and add the progress pipe to our stdioPipes array
    this._stdioPipes.push({
      stdioIndex: 1,
      destination: FfmpegRespawn._createWritable((data) => {
        this._checkProgress(data);
      })
    });

    //add the progress pipe to front of params array
    this._params.unshift(...['-progress', 'pipe:1']);

    //add 'pipe' to array that will activate stdio[1] from ffmpeg
    this._stdio[1] = 'pipe';

    //check options.logLevel and create pipe to handle output if necessary
    if (FfmpegRespawn._checkLoglevel(options.logLevel)) {
      this._stdioPipes.push({
        stdioIndex: 2,
        destination: FfmpegRespawn._createWritable((data) => {
          this.emit('stderr', data);
        })
      });
      this._params.unshift(...['-loglevel', options.logLevel]);
      this._stdio[2] = 'pipe';
    } else {
      this._params.unshift(...['-loglevel', 'quiet']);
    }

    //optional, path to ffmpeg
    if (options.path) {
      this._path = options.path;
    } else {
      this._path = 'ffmpeg';
    }

    //configure number of seconds that passes without progress before killing ffmpeg process
    const killAfterStall = Math.trunc(options.killAfterStall);
    if (isNaN(killAfterStall) || killAfterStall < 10) {
      this._killAfterStall = 10000;
    } else if (killAfterStall > 60) {
      this._killAfterStall = 60000;
    } else {
      this._killAfterStall = killAfterStall * 1000;
    }

    //configure time to wait before re spawning ffmpeg after exiting
    const reSpawnDelay = Math.trunc(options.reSpawnDelay);
    if (isNaN(reSpawnDelay) || reSpawnDelay < 2) {
      this._reSpawnDelay = 2000;
    } else if (reSpawnDelay > 60) {
      this._reSpawnDelay = 60000;
    } else {
      this._reSpawnDelay = reSpawnDelay * 1000;
    }

    //set number of failed attempts to respawn, number is reset if progress occurs
    const reSpawnLimit = Math.trunc(options.reSpawnLimit);
    if (Number.isNaN(reSpawnLimit)) {
      this._reSpawnLimit = 1;
    } else if (reSpawnLimit < 0) {
      this._reSpawnLimit = 0;
    } else {
      this._reSpawnLimit = reSpawnLimit;
    }

    //output some details if in debug
    if (options.debug) {
      console.dir(this, { showHidden: true, depth: 2, colors: true });
    }

    return this;
  }

  /**
   *
   * @readonly
   * @return {*|null}
   */
  get ffmpeg() {
    return this._ffmpeg || null;
  }

  /**
   *
   * @readonly
   * @return {*}
   */
  get stdio() {
    return this._ffmpeg && this._ffmpeg.stdio ? this._ffmpeg.stdio : null;
  }

  /**
   *
   * @readonly
   * @returns {String | null}
   */
  get params() {
    return this._params.join(' ') || null;
  }

  /**
   *
   * @readonly
   * @returns {Boolean}
   */
  get running() {
    return this._running || false;
  }

  /**
   *
   * @readonly
   * @return {String | null}
   */
  get progress() {
    return this._progress || null;
  }

  /**
   *
   * @returns {FfmpegRespawn}
   */
  start() {
    if (this._running !== true) {
      this._running = true;
      this._reSpawnCounter = 0;
      this._spawn();
    }
    return this;
  }

  /**
   *
   * @returns {FfmpegRespawn}
   */
  stop() {
    if (this._running === true) {
      this._running = false;
      this._stopStallInterval();
      this._stopReSpawnTimer();
      this._kill();
    }
    return this;
  }

  /**
   *
   * Stops process and makes instance unusable
   */
  destroy() {
    this.stop();
    delete this._stdioPipes;
    delete this._progress;
    delete this._reSpawnDelay;
    delete this._params;
    delete this._path;
    delete this._progressTime;
    delete this._reSpawnCounter;
    delete this._reSpawnLimit;
    delete this._running;
    delete this._stdio;
    delete this._killAfterStall;
    this.emit('destroy');
  }

  /**
   *
   * @private
   */
  _spawn() {
    this._ffmpeg = spawn(this._path, this._params, { stdio: this._stdio });
    this._ffmpeg.once('error', (error) => {
      throw error;
    });
    this._ffmpeg.once('exit', (code, signal) => {
      this._onExit(code, signal);
    });
    for (let i = 0; i < this._stdioPipes.length; i++) {
      this._ffmpeg.stdio[this._stdioPipes[i].stdioIndex].pipe(
        this._stdioPipes[i].destination,
        { end: false }
      );
    }
    this._startStallInterval();
  }

  /**
   *
   * @private
   */
  _kill() {
    if (this._ffmpeg) {
      if (this._ffmpeg.kill(0)) {
        for (let i = 0; i < this._stdioPipes.length; i++) {
          const pipe = this._stdioPipes[i];
          this._ffmpeg.stdio[pipe.stdioIndex].unpipe(pipe.destination);
          if (pipe.resetCache && typeof pipe.destination.resetCache === 'function') {
            pipe.destination.resetCache();
          }
        }
        this._ffmpeg.kill(process.platform === 'darwin' || process.platform === 'linux' ? 'SIGHUP' : 'SIGTERM'); //SIGTERM, SIGINT, (SIGHUP fails on Windows, works reliably on macOS)
      }
      delete this._ffmpeg;
      this.emit('kill');
    }
  }

  /**
   *
   * @param code
   * @param signal
   * @private
   */
  _onExit(code, signal) {
    this._kill();
    this._stopStallInterval();
    this.emit('exit', code, signal);
    if (this._running === true) {
      if (this._reSpawnCounter >= this._reSpawnLimit) {
        this._running = false;
        this.emit(
          'fail',
          `Ffmpeg unable to get progress from input source after ${this._reSpawnCounter} attempt${
            this._reSpawnCounter === 1 ? '' : 's'
          } at respawning.`
        );
      } else {
        this._reSpawnCounter++;
        this._startReSpawnTimer();
      }
    }
  }

  /**
   *
   * @param chunk
   * @private
   */
  _checkProgress(chunk) {
    const string = chunk.toString();
    const array = string.split('\n').slice(0, -1);
    const object = {};
    for (let i = 0; i < array.length; i++) {
      const tempArr = array[i].split('=');
      object[tempArr[0]] = tempArr[1].trimLeft();
    }
    if (object.progress === 'continue') {
      this._progressTime = Date.now();
      this._reSpawnCounter = 0;
      this._progress = string;
      this.emit('progress', object, string);
    } else if (object.progress === 'end') {
      this._progress = string;
      console.log('progress end');
      // todo call stop() ??
    }
  }

  /**
   *
   * @private
   */
  _startStallInterval() {
    this._stopStallInterval();
    this._progressTime = Date.now();
    this._stallInterval = setInterval(() => {
      const elapsed = Date.now() - this._progressTime;
      if (elapsed > this._killAfterStall) {
        this._stopStallInterval();
        this._kill();
      }
    }, 2000);
  }

  /**
   *
   * @private
   */
  _stopStallInterval() {
    if (this._stallInterval) {
      clearInterval(this._stallInterval);
      delete this._stallInterval;
    }
  }

  /**
   *
   * @private
   */
  _startReSpawnTimer() {
    this._stopReSpawnTimer();
    this._spawnTimer = setTimeout(() => {
      this._stopReSpawnTimer();
      this._spawn();
    }, this._reSpawnDelay);
  }

  /**
   *
   * @private
   */
  _stopReSpawnTimer() {
    if (this._spawnTimer) {
      clearTimeout(this._spawnTimer);
      delete this._spawnTimer;
    }
  }

  /**
   *
   * @param level
   * @return {boolean}
   * @private
   */
  static _checkLoglevel(level) {
    const levels = [
      'quiet',
      '-8',
      'panic',
      '0',
      'fatal',
      '8',
      'error',
      '16',
      'warning',
      '24',
      'info',
      '32',
      'verbose',
      '40',
      'debug',
      '48',
      'trace',
      '56'
    ];
    return levels.indexOf(level) > 1;
  }

  /**
   *
   * @param destination
   * @return {Writable}
   * @private
   */
  static _createWritable(destination) {
    return new Writable({
      write(chunk, encoding, callback) {
        destination(chunk);
        callback();
      }
    });
  }
}

module.exports = FfmpegRespawn;

//todo add error event listener to pipes and propagate event out of ffmpeg-respawn instance