index.js

const { Transform } = require('stream');
const native = require('./native');
const lame = require('./lame');
const opus = require('./opus');
const pcm = require('./pcm');
const sodium = require('./sodium');

/**
 * A C++ function that can be passed into {@link do}
 * @typedef NativeFunction
 */

/**
 * A chain of native functions.
 *
 * A chain can be executed directly with {@link Chain#run}, or used as a Transform stream,
 * in which case every incoming chunk (or every complete frame, when a chunker is set)
 * is passed through {@link Chain#run} and the result is pushed downstream.
 * @extends Transform
 */
class Chain extends Transform {
  /**
   * Creates a chain
   * @hideconstructor
   */
  constructor() {
    super();

    // Functions handed to native.run, in the order they were queued.
    this.functions = [];
    // Optional chunker descriptor (an object with `chunker: true`, `name` and `args`);
    // only consulted when the chain is used as a stream.
    this.chunker = undefined;
    // Bytes received while streaming that do not yet add up to a complete frame.
    this.buffer = Buffer.alloc(0);
  }

  /**
   * Queues a {@link NativeFunction} to be run and returns the resulting chain.
   * The current chain is left untouched; a new Chain is always returned.
   *
   * If `func.chunker === true` and nothing has been queued yet, `func` is stored as the
   * new chain's chunker instead of being queued. Otherwise `func` is appended to the
   * queued functions and the current chunker (if any) is carried over.
   * @param {NativeFunction<T>} func - The function
   * @returns {Chain<T>} - The Chain
   */
  do(func) {
    const chain = new Chain();

    if (func.chunker === true && this.functions.length === 0) {
      chain.chunker = func;
    } else {
      chain.functions = this.functions.concat(func);
      // Carry the chunker forward, otherwise `do(chunker).do(fn)` would lose it.
      chain.chunker = this.chunker;
    }

    return chain;
  }

  /**
   * Runs the chain of functions by handing them to `native.run`.
   *
   * When `async` is omitted, the mode is picked from `module.exports.count`: if exactly
   * one operation is registered the chain runs synchronously and the result is wrapped
   * in a resolved Promise; otherwise it runs asynchronously.
   * @param {boolean} [async] - Whether or not to run asynchronously in a thread
   * @param {*} [data] - Input forwarded unchanged to `native.run` as its third argument
   * @returns {Promise<T>|T} - The unwrapped return value of the last function in the chain
   */
  run(async, data) {
    if (async !== undefined) {
      return native.run(this.functions, async, data);
    }

    if (module.exports.count === 1) {
      return Promise.resolve(native.run(this.functions, false, data));
    }

    return native.run(this.functions, true, data);
  }

  /**
   * Transform stream implementation.
   *
   * Without a chunker, each incoming chunk is run through the chain as-is. With the
   * `Chunker::Opus` chunker, input is buffered and split into frames of
   * `frameSize * channels * 2` bytes; each complete frame is run through the chain and
   * pushed, and any trailing partial frame is kept for the next call.
   *
   * Any failure (a rejected/throwing run, an unknown chunker, an invalid frame size) is
   * reported through `callback` so the stream errors instead of hanging.
   * @param {Buffer} chunk - The incoming data
   * @param {string} encoding - Unused
   * @param {Function} callback - Stream callback, called exactly once
   * @returns {Promise<void>}
   * @private
   */
  async _transform(chunk, encoding, callback) {
    let output;

    try {
      if (!this.chunker) {
        output = await this.run(undefined, chunk);
      } else if (this.chunker.name === 'Chunker::Opus') {
        const { frameSize, channels } = this.chunker.args;
        const required = frameSize * channels * 2;

        // A zero frame would loop forever and NaN would buffer forever, so reject both.
        if (!Number.isInteger(required) || required <= 0) {
          throw new RangeError(`Invalid Opus chunker frame size: ${required} bytes`);
        }

        this.buffer = Buffer.concat([this.buffer, chunk]);

        let offset = 0;
        while (this.buffer.length - offset >= required) {
          this.push(await this.run(undefined, this.buffer.subarray(offset, offset + required)));
          offset += required;
        }

        // Keep only the incomplete tail for the next call.
        if (offset > 0) this.buffer = this.buffer.subarray(offset);
      } else {
        throw new Error(`Unknown chunker: ${this.chunker.name}`);
      }
    } catch (err) {
      callback(err);
      return;
    }

    // Called outside the try block so a throwing callback is never invoked twice.
    callback(null, output);
  }
}

module.exports = {
  lame,
  opus,
  pcm,
  sodium,

  /**
   * The number of operations running in parallel, you must increase and decrease this yourself.
   * @type {Number}
   */
  count: 0,

  chunker: {
    opus(channels, frameSize) {
      return {
        chunker: true,
        name: 'Chunker::Opus',
        args: {
          channels,
          frameSize,
        },
      };
    },
  },

  /**
   * Queues a {@link NativeFunction} to be ran.
   * @function do
   * @param {NativeFunction<T>} func - The function
   * @returns {Chain<T>} - The Chain
   */
  do(func) {
    return new Chain().do(func);
  },
};