Blob Blame History Raw
/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* vim: set ts=8 sts=2 et sw=2 tw=80: */
/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */

#include <algorithm>
#include "mozilla/Attributes.h"
#include "mozilla/IntegerPrintfMacros.h"
#include "mozilla/ReentrantMonitor.h"
#include "nsIBufferedStreams.h"
#include "nsICloneableInputStream.h"
#include "nsIPipe.h"
#include "nsIEventTarget.h"
#include "nsISeekableStream.h"
#include "mozilla/RefPtr.h"
#include "nsSegmentedBuffer.h"
#include "nsStreamUtils.h"
#include "nsCOMPtr.h"
#include "nsCRT.h"
#include "mozilla/Logging.h"
#include "nsIClassInfoImpl.h"
#include "nsAlgorithm.h"
#include "nsMemory.h"
#include "nsIAsyncInputStream.h"
#include "nsIAsyncOutputStream.h"

using namespace mozilla;

#ifdef LOG
#undef LOG
#endif
//
// set MOZ_LOG=nsPipe:5
//
static LazyLogModule sPipeLog("nsPipe");
#define LOG(args) MOZ_LOG(sPipeLog, mozilla::LogLevel::Debug, args)

#define DEFAULT_SEGMENT_SIZE 4096
#define DEFAULT_SEGMENT_COUNT 16

class nsPipe;
class nsPipeEvents;
class nsPipeInputStream;
class nsPipeOutputStream;
class AutoReadSegment;

namespace {

enum MonitorAction { DoNotNotifyMonitor, NotifyMonitor };

enum SegmentChangeResult { SegmentNotChanged, SegmentAdvanceBufferRead };

}  // namespace

//-----------------------------------------------------------------------------

// this class is used to delay notifications until the end of a particular
// scope.  it helps avoid the complexity of issuing callbacks while inside
// a critical section.
class nsPipeEvents {
 public:
  nsPipeEvents() {}
  ~nsPipeEvents();

  inline void NotifyInputReady(nsIAsyncInputStream* aStream,
                               nsIInputStreamCallback* aCallback) {
    mInputList.AppendElement(InputEntry(aStream, aCallback));
  }

  inline void NotifyOutputReady(nsIAsyncOutputStream* aStream,
                                nsIOutputStreamCallback* aCallback) {
    MOZ_DIAGNOSTIC_ASSERT(!mOutputCallback);
    mOutputStream = aStream;
    mOutputCallback = aCallback;
  }

 private:
  struct InputEntry {
    InputEntry(nsIAsyncInputStream* aStream, nsIInputStreamCallback* aCallback)
        : mStream(aStream), mCallback(aCallback) {
      MOZ_DIAGNOSTIC_ASSERT(mStream);
      MOZ_DIAGNOSTIC_ASSERT(mCallback);
    }

    nsCOMPtr<nsIAsyncInputStream> mStream;
    nsCOMPtr<nsIInputStreamCallback> mCallback;
  };

  nsTArray<InputEntry> mInputList;

  nsCOMPtr<nsIAsyncOutputStream> mOutputStream;
  nsCOMPtr<nsIOutputStreamCallback> mOutputCallback;
};

//-----------------------------------------------------------------------------

// This class is used to maintain input stream state.  Its broken out from the
// nsPipeInputStream class because generally the nsPipe should be modifying
// this state and not the input stream itself.
struct nsPipeReadState {
  nsPipeReadState()
      : mReadCursor(nullptr),
        mReadLimit(nullptr),
        mSegment(0),
        mAvailable(0),
        mActiveRead(false),
        mNeedDrain(false) {}

  char* mReadCursor;
  char* mReadLimit;
  int32_t mSegment;
  uint32_t mAvailable;

  // This flag is managed using the AutoReadSegment RAII stack class.
  bool mActiveRead;

  // Set to indicate that the input stream has closed and should be drained,
  // but that drain has been delayed due to an active read.  When the read
  // completes, this flag indicate the drain should then be performed.
  bool mNeedDrain;
};

//-----------------------------------------------------------------------------

// an input end of a pipe (maintained as a list of refs within the pipe)
class nsPipeInputStream final : public nsIAsyncInputStream,
                                public nsISeekableStream,
                                public nsISearchableInputStream,
                                public nsICloneableInputStream,
                                public nsIClassInfo,
                                public nsIBufferedInputStream {
 public:
  NS_DECL_THREADSAFE_ISUPPORTS
  NS_DECL_NSIINPUTSTREAM
  NS_DECL_NSIASYNCINPUTSTREAM
  NS_DECL_NSISEEKABLESTREAM
  NS_DECL_NSISEARCHABLEINPUTSTREAM
  NS_DECL_NSICLONEABLEINPUTSTREAM
  NS_DECL_NSICLASSINFO
  NS_DECL_NSIBUFFEREDINPUTSTREAM

  explicit nsPipeInputStream(nsPipe* aPipe)
      : mPipe(aPipe),
        mLogicalOffset(0),
        mInputStatus(NS_OK),
        mBlocking(true),
        mBlocked(false),
        mCallbackFlags(0) {}

  explicit nsPipeInputStream(const nsPipeInputStream& aOther)
      : mPipe(aOther.mPipe),
        mLogicalOffset(aOther.mLogicalOffset),
        mInputStatus(aOther.mInputStatus),
        mBlocking(aOther.mBlocking),
        mBlocked(false),
        mCallbackFlags(0),
        mReadState(aOther.mReadState) {}

  nsresult Fill();
  void SetNonBlocking(bool aNonBlocking) { mBlocking = !aNonBlocking; }

  uint32_t Available();

  // synchronously wait for the pipe to become readable.
  nsresult Wait();

  // These two don't acquire the monitor themselves.  Instead they
  // expect their caller to have done so and to pass the monitor as
  // evidence.
  MonitorAction OnInputReadable(uint32_t aBytesWritten, nsPipeEvents&,
                                const ReentrantMonitorAutoEnter& ev);
  MonitorAction OnInputException(nsresult, nsPipeEvents&,
                                 const ReentrantMonitorAutoEnter& ev);

  nsPipeReadState& ReadState() { return mReadState; }

  const nsPipeReadState& ReadState() const { return mReadState; }

  nsresult Status() const;

  // A version of Status() that doesn't acquire the monitor.
  nsresult Status(const ReentrantMonitorAutoEnter& ev) const;

 private:
  virtual ~nsPipeInputStream();

  RefPtr<nsPipe> mPipe;

  int64_t mLogicalOffset;
  // Individual input streams can be closed without effecting the rest of the
  // pipe.  So track individual input stream status separately.  |mInputStatus|
  // is protected by |mPipe->mReentrantMonitor|.
  nsresult mInputStatus;
  bool mBlocking;

  // these variables can only be accessed while inside the pipe's monitor
  bool mBlocked;
  nsCOMPtr<nsIInputStreamCallback> mCallback;
  uint32_t mCallbackFlags;

  // requires pipe's monitor; usually treat as an opaque token to pass to nsPipe
  nsPipeReadState mReadState;
};

//-----------------------------------------------------------------------------

// the output end of a pipe (allocated as a member of the pipe).
class nsPipeOutputStream : public nsIAsyncOutputStream, public nsIClassInfo {
 public:
  // since this class will be allocated as a member of the pipe, we do not
  // need our own ref count.  instead, we share the lifetime (the ref count)
  // of the entire pipe.  this macro is just convenience since it does not
  // declare a mRefCount variable; however, don't let the name fool you...
  // we are not inheriting from nsPipe ;-)
  NS_DECL_ISUPPORTS_INHERITED

  NS_DECL_NSIOUTPUTSTREAM
  NS_DECL_NSIASYNCOUTPUTSTREAM
  NS_DECL_NSICLASSINFO

  explicit nsPipeOutputStream(nsPipe* aPipe)
      : mPipe(aPipe),
        mWriterRefCnt(0),
        mLogicalOffset(0),
        mBlocking(true),
        mBlocked(false),
        mWritable(true),
        mCallbackFlags(0) {}

  void SetNonBlocking(bool aNonBlocking) { mBlocking = !aNonBlocking; }
  void SetWritable(bool aWritable) { mWritable = aWritable; }

  // synchronously wait for the pipe to become writable.
  nsresult Wait();

  MonitorAction OnOutputWritable(nsPipeEvents&);
  MonitorAction OnOutputException(nsresult, nsPipeEvents&);

 private:
  nsPipe* mPipe;

  // separate refcnt so that we know when to close the producer
  mozilla::ThreadSafeAutoRefCnt mWriterRefCnt;
  int64_t mLogicalOffset;
  bool mBlocking;

  // these variables can only be accessed while inside the pipe's monitor
  bool mBlocked;
  bool mWritable;
  nsCOMPtr<nsIOutputStreamCallback> mCallback;
  uint32_t mCallbackFlags;
};

//-----------------------------------------------------------------------------

class nsPipe final : public nsIPipe {
 public:
  friend class nsPipeInputStream;
  friend class nsPipeOutputStream;
  friend class AutoReadSegment;

  NS_DECL_THREADSAFE_ISUPPORTS
  NS_DECL_NSIPIPE

  // nsPipe methods:
  nsPipe();

 private:
  ~nsPipe();

  //
  // Methods below may only be called while inside the pipe's monitor.  Some
  // of these methods require passing a ReentrantMonitorAutoEnter to prove the
  // monitor is held.
  //

  void PeekSegment(const nsPipeReadState& aReadState, uint32_t aIndex,
                   char*& aCursor, char*& aLimit);
  SegmentChangeResult AdvanceReadSegment(nsPipeReadState& aReadState,
                                         const ReentrantMonitorAutoEnter& ev);
  bool ReadSegmentBeingWritten(nsPipeReadState& aReadState);
  uint32_t CountSegmentReferences(int32_t aSegment);
  void SetAllNullReadCursors();
  bool AllReadCursorsMatchWriteCursor();
  void RollBackAllReadCursors(char* aWriteCursor);
  void UpdateAllReadCursors(char* aWriteCursor);
  void ValidateAllReadCursors();
  uint32_t GetBufferSegmentCount(const nsPipeReadState& aReadState,
                                 const ReentrantMonitorAutoEnter& ev) const;
  bool IsAdvanceBufferFull(const ReentrantMonitorAutoEnter& ev) const;

  //
  // methods below may be called while outside the pipe's monitor
  //

  void DrainInputStream(nsPipeReadState& aReadState, nsPipeEvents& aEvents);
  nsresult GetWriteSegment(char*& aSegment, uint32_t& aSegmentLen);
  void AdvanceWriteCursor(uint32_t aCount);

  void OnInputStreamException(nsPipeInputStream* aStream, nsresult aReason);
  void OnPipeException(nsresult aReason, bool aOutputOnly = false);

  nsresult CloneInputStream(nsPipeInputStream* aOriginal,
                            nsIInputStream** aCloneOut);

  // methods below should only be called by AutoReadSegment
  nsresult GetReadSegment(nsPipeReadState& aReadState, const char*& aSegment,
                          uint32_t& aLength);
  void ReleaseReadSegment(nsPipeReadState& aReadState, nsPipeEvents& aEvents);
  void AdvanceReadCursor(nsPipeReadState& aReadState, uint32_t aCount);

  // We can't inherit from both nsIInputStream and nsIOutputStream
  // because they collide on their Close method. Consequently we nest their
  // implementations to avoid the extra object allocation.
  nsPipeOutputStream mOutput;

  // Since the input stream can be cloned, we may have more than one.  Use
  // a weak reference as the streams will clear their entry here in their
  // destructor.  Using a strong reference would create a reference cycle.
  // Only usable while mReentrantMonitor is locked.
  nsTArray<nsPipeInputStream*> mInputList;

  // But hold a strong ref to our original input stream.  For backward
  // compatibility we need to be able to consistently return this same
  // object from GetInputStream().  Note, mOriginalInput is also stored
  // in mInputList as a weak ref.
  RefPtr<nsPipeInputStream> mOriginalInput;

  ReentrantMonitor mReentrantMonitor;
  nsSegmentedBuffer mBuffer;

  // The maximum number of segments to allow to be buffered in advance
  // of the fastest reader.  This is collection of segments is called
  // the "advance buffer".
  uint32_t mMaxAdvanceBufferSegmentCount;

  int32_t mWriteSegment;
  char* mWriteCursor;
  char* mWriteLimit;

  // |mStatus| is protected by |mReentrantMonitor|.
  nsresult mStatus;
  bool mInited;
};

//-----------------------------------------------------------------------------

// RAII class representing an active read segment.  When it goes out of scope
// it automatically updates the read cursor and releases the read segment.
class MOZ_STACK_CLASS AutoReadSegment final {
 public:
  AutoReadSegment(nsPipe* aPipe, nsPipeReadState& aReadState,
                  uint32_t aMaxLength)
      : mPipe(aPipe),
        mReadState(aReadState),
        mStatus(NS_ERROR_FAILURE),
        mSegment(nullptr),
        mLength(0),
        mOffset(0) {
    MOZ_DIAGNOSTIC_ASSERT(mPipe);
    MOZ_DIAGNOSTIC_ASSERT(!mReadState.mActiveRead);
    mStatus = mPipe->GetReadSegment(mReadState, mSegment, mLength);
    if (NS_SUCCEEDED(mStatus)) {
      MOZ_DIAGNOSTIC_ASSERT(mReadState.mActiveRead);
      MOZ_DIAGNOSTIC_ASSERT(mSegment);
      mLength = std::min(mLength, aMaxLength);
      MOZ_DIAGNOSTIC_ASSERT(mLength);
    }
  }

  ~AutoReadSegment() {
    if (NS_SUCCEEDED(mStatus)) {
      if (mOffset) {
        mPipe->AdvanceReadCursor(mReadState, mOffset);
      } else {
        nsPipeEvents events;
        mPipe->ReleaseReadSegment(mReadState, events);
      }
    }
    MOZ_DIAGNOSTIC_ASSERT(!mReadState.mActiveRead);
  }

  nsresult Status() const { return mStatus; }

  const char* Data() const {
    MOZ_DIAGNOSTIC_ASSERT(NS_SUCCEEDED(mStatus));
    MOZ_DIAGNOSTIC_ASSERT(mSegment);
    return mSegment + mOffset;
  }

  uint32_t Length() const {
    MOZ_DIAGNOSTIC_ASSERT(NS_SUCCEEDED(mStatus));
    MOZ_DIAGNOSTIC_ASSERT(mLength >= mOffset);
    return mLength - mOffset;
  }

  void Advance(uint32_t aCount) {
    MOZ_DIAGNOSTIC_ASSERT(NS_SUCCEEDED(mStatus));
    MOZ_DIAGNOSTIC_ASSERT(aCount <= (mLength - mOffset));
    mOffset += aCount;
  }

  nsPipeReadState& ReadState() const { return mReadState; }

 private:
  // guaranteed to remain alive due to limited stack lifetime of AutoReadSegment
  nsPipe* mPipe;
  nsPipeReadState& mReadState;
  nsresult mStatus;
  const char* mSegment;
  uint32_t mLength;
  uint32_t mOffset;
};

//
// NOTES on buffer architecture:
//
//       +-----------------+ - - mBuffer.GetSegment(0)
//       |                 |
//       + - - - - - - - - + - - nsPipeReadState.mReadCursor
//       |/////////////////|
//       |/////////////////|
//       |/////////////////|
//       |/////////////////|
//       +-----------------+ - - nsPipeReadState.mReadLimit
//                |
//       +-----------------+
//       |/////////////////|
//       |/////////////////|
//       |/////////////////|
//       |/////////////////|
//       |/////////////////|
//       |/////////////////|
//       +-----------------+
//                |
//       +-----------------+ - - mBuffer.GetSegment(mWriteSegment)
//       |/////////////////|
//       |/////////////////|
//       |/////////////////|
//       + - - - - - - - - + - - mWriteCursor
//       |                 |
//       |                 |
//       +-----------------+ - - mWriteLimit
//
// (shaded region contains data)
//
// NOTE: Each input stream produced by the nsPipe contains its own, separate
//       nsPipeReadState.  This means there are multiple mReadCursor and
//       mReadLimit values in play.  The pipe cannot discard old data until
//       all mReadCursors have moved beyond that point in the stream.
//
//       Likewise, each input stream reader will have it's own amount of
//       buffered data.  The pipe size threshold, however, is only applied
//       to the input stream that is being read fastest.  We call this
//       the "advance buffer" in that its in advance of all readers.  We
//       allow slower input streams to buffer more data so that we don't
//       stall processing of the faster input stream.
//
// NOTE: on some systems (notably OS/2), the heap allocator uses an arena for
// small allocations (e.g., 64 byte allocations).  this means that buffers may
// be allocated back-to-back.  in the diagram above, for example, mReadLimit
// would actually be pointing at the beginning of the next segment.  when
// making changes to this file, please keep this fact in mind.
//

//-----------------------------------------------------------------------------
// nsPipe methods:
//-----------------------------------------------------------------------------

nsPipe::nsPipe()
    : mOutput(this),
      mOriginalInput(new nsPipeInputStream(this)),
      mReentrantMonitor("nsPipe.mReentrantMonitor"),
      mMaxAdvanceBufferSegmentCount(0),
      mWriteSegment(-1),
      mWriteCursor(nullptr),
      mWriteLimit(nullptr),
      mStatus(NS_OK),
      mInited(false) {
  mInputList.AppendElement(mOriginalInput);
}

nsPipe::~nsPipe() {}

NS_IMPL_ADDREF(nsPipe)
NS_IMPL_QUERY_INTERFACE(nsPipe, nsIPipe)

NS_IMETHODIMP_(MozExternalRefCountType)
nsPipe::Release() {
  MOZ_DIAGNOSTIC_ASSERT(int32_t(mRefCnt) > 0, "dup release");
  nsrefcnt count = --mRefCnt;
  NS_LOG_RELEASE(this, count, "nsPipe");
  if (count == 0) {
    delete (this);
    return 0;
  }
  // Avoid racing on |mOriginalInput| by only looking at it when
  // the refcount is 1, that is, we are the only pointer (hence only
  // thread) to access it.
  if (count == 1 && mOriginalInput) {
    mOriginalInput = nullptr;
    return 1;
  }
  return count;
}

NS_IMETHODIMP
nsPipe::Init(bool aNonBlockingIn, bool aNonBlockingOut, uint32_t aSegmentSize,
             uint32_t aSegmentCount) {
  mInited = true;

  if (aSegmentSize == 0) {
    aSegmentSize = DEFAULT_SEGMENT_SIZE;
  }
  if (aSegmentCount == 0) {
    aSegmentCount = DEFAULT_SEGMENT_COUNT;
  }

  // protect against overflow
  uint32_t maxCount = uint32_t(-1) / aSegmentSize;
  if (aSegmentCount > maxCount) {
    aSegmentCount = maxCount;
  }

  // The internal buffer is always "infinite" so that we can allow
  // the size to expand when cloned streams are read at different
  // rates.  We enforce a limit on how much data can be buffered
  // ahead of the fastest reader in GetWriteSegment().
  nsresult rv = mBuffer.Init(aSegmentSize, UINT32_MAX);
  if (NS_FAILED(rv)) {
    return rv;
  }

  mMaxAdvanceBufferSegmentCount = aSegmentCount;

  mOutput.SetNonBlocking(aNonBlockingOut);
  mOriginalInput->SetNonBlocking(aNonBlockingIn);

  return NS_OK;
}

NS_IMETHODIMP
nsPipe::GetInputStream(nsIAsyncInputStream** aInputStream) {
  if (NS_WARN_IF(!mInited)) {
    return NS_ERROR_NOT_INITIALIZED;
  }
  RefPtr<nsPipeInputStream> ref = mOriginalInput;
  ref.forget(aInputStream);
  return NS_OK;
}

NS_IMETHODIMP
nsPipe::GetOutputStream(nsIAsyncOutputStream** aOutputStream) {
  if (NS_WARN_IF(!mInited)) {
    return NS_ERROR_NOT_INITIALIZED;
  }
  NS_ADDREF(*aOutputStream = &mOutput);
  return NS_OK;
}

void nsPipe::PeekSegment(const nsPipeReadState& aReadState, uint32_t aIndex,
                         char*& aCursor, char*& aLimit) {
  if (aIndex == 0) {
    MOZ_DIAGNOSTIC_ASSERT(!aReadState.mReadCursor || mBuffer.GetSegmentCount());
    aCursor = aReadState.mReadCursor;
    aLimit = aReadState.mReadLimit;
  } else {
    uint32_t absoluteIndex = aReadState.mSegment + aIndex;
    uint32_t numSegments = mBuffer.GetSegmentCount();
    if (absoluteIndex >= numSegments) {
      aCursor = aLimit = nullptr;
    } else {
      aCursor = mBuffer.GetSegment(absoluteIndex);
      if (mWriteSegment == (int32_t)absoluteIndex) {
        aLimit = mWriteCursor;
      } else {
        aLimit = aCursor + mBuffer.GetSegmentSize();
      }
    }
  }
}

nsresult nsPipe::GetReadSegment(nsPipeReadState& aReadState,
                                const char*& aSegment, uint32_t& aLength) {
  ReentrantMonitorAutoEnter mon(mReentrantMonitor);

  if (aReadState.mReadCursor == aReadState.mReadLimit) {
    return NS_FAILED(mStatus) ? mStatus : NS_BASE_STREAM_WOULD_BLOCK;
  }

  // The input stream locks the pipe while getting the buffer to read from,
  // but then unlocks while actual data copying is taking place.  In
  // order to avoid deleting the buffer out from under this lockless read
  // set a flag to indicate a read is active.  This flag is only modified
  // while the lock is held.
  MOZ_DIAGNOSTIC_ASSERT(!aReadState.mActiveRead);
  aReadState.mActiveRead = true;

  aSegment = aReadState.mReadCursor;
  aLength = aReadState.mReadLimit - aReadState.mReadCursor;
  MOZ_DIAGNOSTIC_ASSERT(aLength <= aReadState.mAvailable);

  return NS_OK;
}

void nsPipe::ReleaseReadSegment(nsPipeReadState& aReadState,
                                nsPipeEvents& aEvents) {
  ReentrantMonitorAutoEnter mon(mReentrantMonitor);

  MOZ_DIAGNOSTIC_ASSERT(aReadState.mActiveRead);
  aReadState.mActiveRead = false;

  // When a read completes and releases the mActiveRead flag, we may have
  // blocked a drain from completing.  This occurs when the input stream is
  // closed during the read.  In these cases, we need to complete the drain as
  // soon as the active read completes.
  if (aReadState.mNeedDrain) {
    aReadState.mNeedDrain = false;
    DrainInputStream(aReadState, aEvents);
  }
}

void nsPipe::AdvanceReadCursor(nsPipeReadState& aReadState,
                               uint32_t aBytesRead) {
  MOZ_DIAGNOSTIC_ASSERT(aBytesRead > 0);

  nsPipeEvents events;
  {
    ReentrantMonitorAutoEnter mon(mReentrantMonitor);

    LOG(("III advancing read cursor by %u\n", aBytesRead));
    MOZ_DIAGNOSTIC_ASSERT(aBytesRead <= mBuffer.GetSegmentSize());

    aReadState.mReadCursor += aBytesRead;
    MOZ_DIAGNOSTIC_ASSERT(aReadState.mReadCursor <= aReadState.mReadLimit);

    MOZ_DIAGNOSTIC_ASSERT(aReadState.mAvailable >= aBytesRead);
    aReadState.mAvailable -= aBytesRead;

    // Check to see if we're at the end of the available read data.  If we
    // are, and this segment is not still being written, then we can possibly
    // free up the segment.
    if (aReadState.mReadCursor == aReadState.mReadLimit &&
        !ReadSegmentBeingWritten(aReadState)) {
      // Advance the segment position.  If we have read any segments from the
      // advance buffer then we can potentially notify blocked writers.
      if (AdvanceReadSegment(aReadState, mon) == SegmentAdvanceBufferRead &&
          mOutput.OnOutputWritable(events) == NotifyMonitor) {
        mon.NotifyAll();
      }
    }

    ReleaseReadSegment(aReadState, events);
  }
}

SegmentChangeResult nsPipe::AdvanceReadSegment(
    nsPipeReadState& aReadState, const ReentrantMonitorAutoEnter& ev) {
  // Calculate how many segments are buffered for this stream to start.
  uint32_t startBufferSegments = GetBufferSegmentCount(aReadState, ev);

  int32_t currentSegment = aReadState.mSegment;

  // Move to the next segment to read
  aReadState.mSegment += 1;

  // If this was the last reference to the first segment, then remove it.
  if (currentSegment == 0 && CountSegmentReferences(currentSegment) == 0) {
    // shift write and read segment index (-1 indicates an empty buffer).
    mWriteSegment -= 1;

    // Directly modify the current read state.  If the associated input
    // stream is closed simultaneous with reading, then it may not be
    // in the mInputList any more.
    aReadState.mSegment -= 1;

    for (uint32_t i = 0; i < mInputList.Length(); ++i) {
      // Skip the current read state structure since we modify it manually
      // before entering this loop.
      if (&mInputList[i]->ReadState() == &aReadState) {
        continue;
      }
      mInputList[i]->ReadState().mSegment -= 1;
    }

    // done with this segment
    mBuffer.DeleteFirstSegment();
    LOG(("III deleting first segment\n"));
  }

  if (mWriteSegment < aReadState.mSegment) {
    // read cursor has hit the end of written data, so reset it
    MOZ_DIAGNOSTIC_ASSERT(mWriteSegment == (aReadState.mSegment - 1));
    aReadState.mReadCursor = nullptr;
    aReadState.mReadLimit = nullptr;
    // also, the buffer is completely empty, so reset the write cursor
    if (mWriteSegment == -1) {
      mWriteCursor = nullptr;
      mWriteLimit = nullptr;
    }
  } else {
    // advance read cursor and limit to next buffer segment
    aReadState.mReadCursor = mBuffer.GetSegment(aReadState.mSegment);
    if (mWriteSegment == aReadState.mSegment) {
      aReadState.mReadLimit = mWriteCursor;
    } else {
      aReadState.mReadLimit = aReadState.mReadCursor + mBuffer.GetSegmentSize();
    }
  }

  // Calculate how many segments are buffered for the stream after
  // reading.
  uint32_t endBufferSegments = GetBufferSegmentCount(aReadState, ev);

  // If the stream has read a segment out of the set of advanced buffer
  // segments, then the writer may advance.
  if (startBufferSegments >= mMaxAdvanceBufferSegmentCount &&
      endBufferSegments < mMaxAdvanceBufferSegmentCount) {
    return SegmentAdvanceBufferRead;
  }

  // Otherwise there are no significant changes to the segment structure.
  return SegmentNotChanged;
}

void nsPipe::DrainInputStream(nsPipeReadState& aReadState,
                              nsPipeEvents& aEvents) {
  ReentrantMonitorAutoEnter mon(mReentrantMonitor);

  // If a segment is actively being read in ReadSegments() for this input
  // stream, then we cannot drain the stream.  This can happen because
  // ReadSegments() does not hold the lock while copying from the buffer.
  // If we detect this condition, simply note that we need a drain once
  // the read completes and return immediately.
  if (aReadState.mActiveRead) {
    MOZ_DIAGNOSTIC_ASSERT(!aReadState.mNeedDrain);
    aReadState.mNeedDrain = true;
    return;
  }

  while (mWriteSegment >= aReadState.mSegment) {
    // If the last segment to free is still being written to, we're done
    // draining.  We can't free any more.
    if (ReadSegmentBeingWritten(aReadState)) {
      break;
    }

    // Don't bother checking if this results in an advance buffer segment
    // read.  Since we are draining the entire stream we will read an
    // advance buffer segment no matter what.
    AdvanceReadSegment(aReadState, mon);
  }

  // Force the stream into an empty state.  Make sure mAvailable, mCursor, and
  // mReadLimit are consistent with one another.
  aReadState.mAvailable = 0;
  aReadState.mReadCursor = nullptr;
  aReadState.mReadLimit = nullptr;

  // Remove the input stream from the pipe's list of streams.  This will
  // prevent the pipe from holding the stream alive or trying to update
  // its read state any further.
  DebugOnly<uint32_t> numRemoved = 0;
  mInputList.RemoveElementsBy([&](nsPipeInputStream* aEntry) {
    bool result = &aReadState == &aEntry->ReadState();
    numRemoved += result ? 1 : 0;
    return result;
  });
  MOZ_ASSERT(numRemoved == 1);

  // If we have read any segments from the advance buffer then we can
  // potentially notify blocked writers.
  if (!IsAdvanceBufferFull(mon) &&
      mOutput.OnOutputWritable(aEvents) == NotifyMonitor) {
    mon.NotifyAll();
  }
}

bool nsPipe::ReadSegmentBeingWritten(nsPipeReadState& aReadState) {
  mReentrantMonitor.AssertCurrentThreadIn();
  bool beingWritten =
      mWriteSegment == aReadState.mSegment && mWriteLimit > mWriteCursor;
  MOZ_DIAGNOSTIC_ASSERT(!beingWritten || aReadState.mReadLimit == mWriteCursor);
  return beingWritten;
}

nsresult nsPipe::GetWriteSegment(char*& aSegment, uint32_t& aSegmentLen) {
  ReentrantMonitorAutoEnter mon(mReentrantMonitor);

  if (NS_FAILED(mStatus)) {
    return mStatus;
  }

  // write cursor and limit may both be null indicating an empty buffer.
  if (mWriteCursor == mWriteLimit) {
    // The pipe is full if we have hit our limit on advance data buffering.
    // This means the fastest reader is still reading slower than data is
    // being written into the pipe.
    if (IsAdvanceBufferFull(mon)) {
      return NS_BASE_STREAM_WOULD_BLOCK;
    }

    // The nsSegmentedBuffer is configured to be "infinite", so this
    // should never return nullptr here.
    char* seg = mBuffer.AppendNewSegment();
    if (!seg) {
      return NS_ERROR_OUT_OF_MEMORY;
    }

    LOG(("OOO appended new segment\n"));
    mWriteCursor = seg;
    mWriteLimit = mWriteCursor + mBuffer.GetSegmentSize();
    ++mWriteSegment;
  }

  // make sure read cursor is initialized
  SetAllNullReadCursors();

  // check to see if we can roll-back our read and write cursors to the
  // beginning of the current/first segment.  this is purely an optimization.
  if (mWriteSegment == 0 && AllReadCursorsMatchWriteCursor()) {
    char* head = mBuffer.GetSegment(0);
    LOG(("OOO rolling back write cursor %" PRId64 " bytes\n",
         static_cast<int64_t>(mWriteCursor - head)));
    RollBackAllReadCursors(head);
    mWriteCursor = head;
  }

  aSegment = mWriteCursor;
  aSegmentLen = mWriteLimit - mWriteCursor;
  return NS_OK;
}

void nsPipe::AdvanceWriteCursor(uint32_t aBytesWritten) {
  MOZ_DIAGNOSTIC_ASSERT(aBytesWritten > 0);

  nsPipeEvents events;
  {
    ReentrantMonitorAutoEnter mon(mReentrantMonitor);

    LOG(("OOO advancing write cursor by %u\n", aBytesWritten));

    char* newWriteCursor = mWriteCursor + aBytesWritten;
    MOZ_DIAGNOSTIC_ASSERT(newWriteCursor <= mWriteLimit);

    // update read limit if reading in the same segment
    UpdateAllReadCursors(newWriteCursor);

    mWriteCursor = newWriteCursor;

    ValidateAllReadCursors();

    // update the writable flag on the output stream
    if (mWriteCursor == mWriteLimit) {
      mOutput.SetWritable(!IsAdvanceBufferFull(mon));
    }

    // notify input stream that pipe now contains additional data
    bool needNotify = false;
    for (uint32_t i = 0; i < mInputList.Length(); ++i) {
      if (mInputList[i]->OnInputReadable(aBytesWritten, events, mon) ==
          NotifyMonitor) {
        needNotify = true;
      }
    }

    if (needNotify) {
      mon.NotifyAll();
    }
  }
}

void nsPipe::OnInputStreamException(nsPipeInputStream* aStream,
                                    nsresult aReason) {
  MOZ_DIAGNOSTIC_ASSERT(NS_FAILED(aReason));

  nsPipeEvents events;
  {
    ReentrantMonitorAutoEnter mon(mReentrantMonitor);

    // Its possible to re-enter this method when we call OnPipeException() or
    // OnInputExection() below.  If there is a caller stuck in our synchronous
    // Wait() method, then they will get woken up with a failure code which
    // re-enters this method.  Therefore, gracefully handle unknown streams
    // here.

    // If we only have one stream open and it is the given stream, then shut
    // down the entire pipe.
    if (mInputList.Length() == 1) {
      if (mInputList[0] == aStream) {
        OnPipeException(aReason);
      }
      return;
    }

    // Otherwise just close the particular stream that hit an exception.
    for (uint32_t i = 0; i < mInputList.Length(); ++i) {
      if (mInputList[i] != aStream) {
        continue;
      }

      MonitorAction action =
          mInputList[i]->OnInputException(aReason, events, mon);

      // Notify after element is removed in case we re-enter as a result.
      if (action == NotifyMonitor) {
        mon.NotifyAll();
      }

      return;
    }
  }
}

void nsPipe::OnPipeException(nsresult aReason, bool aOutputOnly) {
  LOG(("PPP nsPipe::OnPipeException [reason=%" PRIx32 " output-only=%d]\n",
       static_cast<uint32_t>(aReason), aOutputOnly));

  nsPipeEvents events;
  {
    ReentrantMonitorAutoEnter mon(mReentrantMonitor);

    // if we've already hit an exception, then ignore this one.
    if (NS_FAILED(mStatus)) {
      return;
    }

    mStatus = aReason;

    bool needNotify = false;

    // OnInputException() can drain the stream and remove it from
    // mInputList.  So iterate over a temp list instead.
    nsTArray<nsPipeInputStream*> list(mInputList);
    for (uint32_t i = 0; i < list.Length(); ++i) {
      // an output-only exception applies to the input end if the pipe has
      // zero bytes available.
      if (aOutputOnly && list[i]->Available()) {
        continue;
      }

      if (list[i]->OnInputException(aReason, events, mon) == NotifyMonitor) {
        needNotify = true;
      }
    }

    if (mOutput.OnOutputException(aReason, events) == NotifyMonitor) {
      needNotify = true;
    }

    // Notify after we have removed any input streams from mInputList
    if (needNotify) {
      mon.NotifyAll();
    }
  }
}

nsresult nsPipe::CloneInputStream(nsPipeInputStream* aOriginal,
                                  nsIInputStream** aCloneOut) {
  ReentrantMonitorAutoEnter mon(mReentrantMonitor);
  RefPtr<nsPipeInputStream> ref = new nsPipeInputStream(*aOriginal);
  mInputList.AppendElement(ref);
  nsCOMPtr<nsIAsyncInputStream> downcast = ref.forget();
  downcast.forget(aCloneOut);
  return NS_OK;
}

uint32_t nsPipe::CountSegmentReferences(int32_t aSegment) {
  mReentrantMonitor.AssertCurrentThreadIn();
  uint32_t count = 0;
  for (uint32_t i = 0; i < mInputList.Length(); ++i) {
    if (aSegment >= mInputList[i]->ReadState().mSegment) {
      count += 1;
    }
  }
  return count;
}

void nsPipe::SetAllNullReadCursors() {
  mReentrantMonitor.AssertCurrentThreadIn();
  for (uint32_t i = 0; i < mInputList.Length(); ++i) {
    nsPipeReadState& readState = mInputList[i]->ReadState();
    if (!readState.mReadCursor) {
      MOZ_DIAGNOSTIC_ASSERT(mWriteSegment == readState.mSegment);
      readState.mReadCursor = readState.mReadLimit = mWriteCursor;
    }
  }
}

bool nsPipe::AllReadCursorsMatchWriteCursor() {
  mReentrantMonitor.AssertCurrentThreadIn();
  for (uint32_t i = 0; i < mInputList.Length(); ++i) {
    const nsPipeReadState& readState = mInputList[i]->ReadState();
    if (readState.mSegment != mWriteSegment ||
        readState.mReadCursor != mWriteCursor) {
      return false;
    }
  }
  return true;
}

void nsPipe::RollBackAllReadCursors(char* aWriteCursor) {
  mReentrantMonitor.AssertCurrentThreadIn();
  for (uint32_t i = 0; i < mInputList.Length(); ++i) {
    nsPipeReadState& readState = mInputList[i]->ReadState();
    MOZ_DIAGNOSTIC_ASSERT(mWriteSegment == readState.mSegment);
    MOZ_DIAGNOSTIC_ASSERT(mWriteCursor == readState.mReadCursor);
    MOZ_DIAGNOSTIC_ASSERT(mWriteCursor == readState.mReadLimit);
    readState.mReadCursor = aWriteCursor;
    readState.mReadLimit = aWriteCursor;
  }
}

void nsPipe::UpdateAllReadCursors(char* aWriteCursor) {
  mReentrantMonitor.AssertCurrentThreadIn();
  for (uint32_t i = 0; i < mInputList.Length(); ++i) {
    nsPipeReadState& readState = mInputList[i]->ReadState();
    if (mWriteSegment == readState.mSegment &&
        readState.mReadLimit == mWriteCursor) {
      readState.mReadLimit = aWriteCursor;
    }
  }
}

void nsPipe::ValidateAllReadCursors() {
  mReentrantMonitor.AssertCurrentThreadIn();
  // The only way mReadCursor == mWriteCursor is if:
  //
  // - mReadCursor is at the start of a segment (which, based on how
  //   nsSegmentedBuffer works, means that this segment is the "first"
  //   segment)
  // - mWriteCursor points at the location past the end of the current
  //   write segment (so the current write filled the current write
  //   segment, so we've incremented mWriteCursor to point past the end
  //   of it)
  // - the segment to which data has just been written is located
  //   exactly one segment's worth of bytes before the first segment
  //   where mReadCursor is located
  //
  // Consequently, the byte immediately after the end of the current
  // write segment is the first byte of the first segment, so
  // mReadCursor == mWriteCursor.  (Another way to think about this is
  // to consider the buffer architecture diagram above, but consider it
  // with an arena allocator which allocates from the *end* of the
  // arena to the *beginning* of the arena.)
#ifdef DEBUG
  for (uint32_t i = 0; i < mInputList.Length(); ++i) {
    const nsPipeReadState& state = mInputList[i]->ReadState();
    MOZ_ASSERT(state.mReadCursor != mWriteCursor ||
               (mBuffer.GetSegment(state.mSegment) == state.mReadCursor &&
                mWriteCursor == mWriteLimit));
  }
#endif
}

uint32_t nsPipe::GetBufferSegmentCount(
    const nsPipeReadState& aReadState,
    const ReentrantMonitorAutoEnter& ev) const {
  // The write segment can be smaller than the current reader position
  // in some cases.  For example, when the first write segment has not
  // been allocated yet mWriteSegment is negative.  In these cases
  // the stream is effectively using zero segments.
  if (mWriteSegment < aReadState.mSegment) {
    return 0;
  }

  MOZ_DIAGNOSTIC_ASSERT(mWriteSegment >= 0);
  MOZ_DIAGNOSTIC_ASSERT(aReadState.mSegment >= 0);

  // Otherwise at least one segment is being used.  We add one here
  // since a single segment is being used when the write and read
  // segment indices are the same.
  return 1 + mWriteSegment - aReadState.mSegment;
}

bool nsPipe::IsAdvanceBufferFull(const ReentrantMonitorAutoEnter& ev) const {
  // If we have fewer total segments than the limit we can immediately
  // determine we are not full.  Note, we must add one to mWriteSegment
  // to convert from a index to a count.
  MOZ_DIAGNOSTIC_ASSERT(mWriteSegment >= -1);
  MOZ_DIAGNOSTIC_ASSERT(mWriteSegment < INT32_MAX);
  uint32_t totalWriteSegments = mWriteSegment + 1;
  if (totalWriteSegments < mMaxAdvanceBufferSegmentCount) {
    return false;
  }

  // Otherwise we must inspect all of our reader streams.  We need
  // to determine the buffer depth of the fastest reader.
  uint32_t minBufferSegments = UINT32_MAX;
  for (uint32_t i = 0; i < mInputList.Length(); ++i) {
    // Only count buffer segments from input streams that are open.
    if (NS_FAILED(mInputList[i]->Status(ev))) {
      continue;
    }
    const nsPipeReadState& state = mInputList[i]->ReadState();
    uint32_t bufferSegments = GetBufferSegmentCount(state, ev);
    minBufferSegments = std::min(minBufferSegments, bufferSegments);
    // We only care if any reader has fewer segments buffered than
    // our threshold.  We can stop once we hit that threshold.
    if (minBufferSegments < mMaxAdvanceBufferSegmentCount) {
      return false;
    }
  }

  // Note, its possible for minBufferSegments to exceed our
  // mMaxAdvanceBufferSegmentCount here.  This happens when a cloned
  // reader gets far behind, but then the fastest reader stream is
  // closed.  This leaves us with a single stream that is buffered
  // beyond our max.  Naturally we continue to indicate the pipe
  // is full at this point.

  return true;
}

//-----------------------------------------------------------------------------
// nsPipeEvents methods:
//-----------------------------------------------------------------------------

nsPipeEvents::~nsPipeEvents() {
  // dispatch any pending events

  for (uint32_t i = 0; i < mInputList.Length(); ++i) {
    mInputList[i].mCallback->OnInputStreamReady(mInputList[i].mStream);
  }
  mInputList.Clear();

  if (mOutputCallback) {
    mOutputCallback->OnOutputStreamReady(mOutputStream);
    mOutputCallback = nullptr;
    mOutputStream = nullptr;
  }
}

//-----------------------------------------------------------------------------
// nsPipeInputStream methods:
//-----------------------------------------------------------------------------

NS_IMPL_ADDREF(nsPipeInputStream);
NS_IMPL_RELEASE(nsPipeInputStream);

NS_INTERFACE_TABLE_HEAD(nsPipeInputStream)
  NS_INTERFACE_TABLE_BEGIN
    NS_INTERFACE_TABLE_ENTRY(nsPipeInputStream, nsIAsyncInputStream)
    NS_INTERFACE_TABLE_ENTRY(nsPipeInputStream, nsISeekableStream)
    NS_INTERFACE_TABLE_ENTRY(nsPipeInputStream, nsISearchableInputStream)
    NS_INTERFACE_TABLE_ENTRY(nsPipeInputStream, nsICloneableInputStream)
    NS_INTERFACE_TABLE_ENTRY(nsPipeInputStream, nsIBufferedInputStream)
    NS_INTERFACE_TABLE_ENTRY(nsPipeInputStream, nsIClassInfo)
    NS_INTERFACE_TABLE_ENTRY_AMBIGUOUS(nsPipeInputStream, nsIInputStream,
                                       nsIAsyncInputStream)
    NS_INTERFACE_TABLE_ENTRY_AMBIGUOUS(nsPipeInputStream, nsISupports,
                                       nsIAsyncInputStream)
  NS_INTERFACE_TABLE_END
NS_INTERFACE_TABLE_TAIL

NS_IMPL_CI_INTERFACE_GETTER(nsPipeInputStream, nsIInputStream,
                            nsIAsyncInputStream, nsISeekableStream,
                            nsISearchableInputStream, nsICloneableInputStream,
                            nsIBufferedInputStream)

NS_IMPL_THREADSAFE_CI(nsPipeInputStream)

NS_IMETHODIMP
nsPipeInputStream::Init(nsIInputStream*, uint32_t) {
  MOZ_CRASH(
      "nsPipeInputStream should never be initialized with "
      "nsIBufferedInputStream::Init!\n");
}

NS_IMETHODIMP
nsPipeInputStream::GetData(nsIInputStream** aResult) {
  // as this was not created with init() we are not
  // wrapping anything
  return NS_ERROR_NOT_IMPLEMENTED;
}

uint32_t nsPipeInputStream::Available() {
  mPipe->mReentrantMonitor.AssertCurrentThreadIn();
  return mReadState.mAvailable;
}

nsresult nsPipeInputStream::Wait() {
  MOZ_DIAGNOSTIC_ASSERT(mBlocking);

  ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor);

  while (NS_SUCCEEDED(Status(mon)) && (mReadState.mAvailable == 0)) {
    LOG(("III pipe input: waiting for data\n"));

    mBlocked = true;
    mon.Wait();
    mBlocked = false;

    LOG(("III pipe input: woke up [status=%" PRIx32 " available=%u]\n",
         static_cast<uint32_t>(Status(mon)), mReadState.mAvailable));
  }

  return Status(mon) == NS_BASE_STREAM_CLOSED ? NS_OK : Status(mon);
}

MonitorAction nsPipeInputStream::OnInputReadable(
    uint32_t aBytesWritten, nsPipeEvents& aEvents,
    const ReentrantMonitorAutoEnter& ev) {
  MonitorAction result = DoNotNotifyMonitor;

  mPipe->mReentrantMonitor.AssertCurrentThreadIn();
  mReadState.mAvailable += aBytesWritten;

  if (mCallback && !(mCallbackFlags & WAIT_CLOSURE_ONLY)) {
    aEvents.NotifyInputReady(this, mCallback);
    mCallback = nullptr;
    mCallbackFlags = 0;
  } else if (mBlocked) {
    result = NotifyMonitor;
  }

  return result;
}

MonitorAction nsPipeInputStream::OnInputException(
    nsresult aReason, nsPipeEvents& aEvents,
    const ReentrantMonitorAutoEnter& ev) {
  LOG(("nsPipeInputStream::OnInputException [this=%p reason=%" PRIx32 "]\n",
       this, static_cast<uint32_t>(aReason)));

  MonitorAction result = DoNotNotifyMonitor;

  MOZ_DIAGNOSTIC_ASSERT(NS_FAILED(aReason));

  if (NS_SUCCEEDED(mInputStatus)) {
    mInputStatus = aReason;
  }

  // force count of available bytes to zero.
  mPipe->DrainInputStream(mReadState, aEvents);

  if (mCallback) {
    aEvents.NotifyInputReady(this, mCallback);
    mCallback = nullptr;
    mCallbackFlags = 0;
  } else if (mBlocked) {
    result = NotifyMonitor;
  }

  return result;
}

NS_IMETHODIMP
nsPipeInputStream::CloseWithStatus(nsresult aReason) {
  LOG(("III CloseWithStatus [this=%p reason=%" PRIx32 "]\n", this,
       static_cast<uint32_t>(aReason)));

  ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor);

  if (NS_FAILED(mInputStatus)) {
    return NS_OK;
  }

  if (NS_SUCCEEDED(aReason)) {
    aReason = NS_BASE_STREAM_CLOSED;
  }

  mPipe->OnInputStreamException(this, aReason);
  return NS_OK;
}

NS_IMETHODIMP
nsPipeInputStream::Close() { return CloseWithStatus(NS_BASE_STREAM_CLOSED); }

NS_IMETHODIMP
nsPipeInputStream::Available(uint64_t* aResult) {
  // nsPipeInputStream supports under 4GB stream only
  ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor);

  // return error if closed
  if (!mReadState.mAvailable && NS_FAILED(Status(mon))) {
    return Status(mon);
  }

  *aResult = (uint64_t)mReadState.mAvailable;
  return NS_OK;
}

NS_IMETHODIMP
nsPipeInputStream::ReadSegments(nsWriteSegmentFun aWriter, void* aClosure,
                                uint32_t aCount, uint32_t* aReadCount) {
  LOG(("III ReadSegments [this=%p count=%u]\n", this, aCount));

  nsresult rv = NS_OK;

  *aReadCount = 0;
  while (aCount) {
    AutoReadSegment segment(mPipe, mReadState, aCount);
    rv = segment.Status();
    if (NS_FAILED(rv)) {
      // ignore this error if we've already read something.
      if (*aReadCount > 0) {
        rv = NS_OK;
        break;
      }
      if (rv == NS_BASE_STREAM_WOULD_BLOCK) {
        // pipe is empty
        if (!mBlocking) {
          break;
        }
        // wait for some data to be written to the pipe
        rv = Wait();
        if (NS_SUCCEEDED(rv)) {
          continue;
        }
      }
      // ignore this error, just return.
      if (rv == NS_BASE_STREAM_CLOSED) {
        rv = NS_OK;
        break;
      }
      mPipe->OnInputStreamException(this, rv);
      break;
    }

    uint32_t writeCount;
    while (segment.Length()) {
      writeCount = 0;

      rv = aWriter(static_cast<nsIAsyncInputStream*>(this), aClosure,
                   segment.Data(), *aReadCount, segment.Length(), &writeCount);

      if (NS_FAILED(rv) || writeCount == 0) {
        aCount = 0;
        // any errors returned from the writer end here: do not
        // propagate to the caller of ReadSegments.
        rv = NS_OK;
        break;
      }

      MOZ_DIAGNOSTIC_ASSERT(writeCount <= segment.Length());
      segment.Advance(writeCount);
      aCount -= writeCount;
      *aReadCount += writeCount;
      mLogicalOffset += writeCount;
    }
  }

  return rv;
}

NS_IMETHODIMP
nsPipeInputStream::Read(char* aToBuf, uint32_t aBufLen, uint32_t* aReadCount) {
  return ReadSegments(NS_CopySegmentToBuffer, aToBuf, aBufLen, aReadCount);
}

NS_IMETHODIMP
nsPipeInputStream::IsNonBlocking(bool* aNonBlocking) {
  *aNonBlocking = !mBlocking;
  return NS_OK;
}

NS_IMETHODIMP
nsPipeInputStream::AsyncWait(nsIInputStreamCallback* aCallback, uint32_t aFlags,
                             uint32_t aRequestedCount,
                             nsIEventTarget* aTarget) {
  LOG(("III AsyncWait [this=%p]\n", this));

  nsPipeEvents pipeEvents;
  {
    ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor);

    // replace a pending callback
    mCallback = nullptr;
    mCallbackFlags = 0;

    if (!aCallback) {
      return NS_OK;
    }

    nsCOMPtr<nsIInputStreamCallback> proxy;
    if (aTarget) {
      proxy = NS_NewInputStreamReadyEvent("nsPipeInputStream::AsyncWait",
                                          aCallback, aTarget);
      aCallback = proxy;
    }

    if (NS_FAILED(Status(mon)) ||
        (mReadState.mAvailable && !(aFlags & WAIT_CLOSURE_ONLY))) {
      // stream is already closed or readable; post event.
      pipeEvents.NotifyInputReady(this, aCallback);
    } else {
      // queue up callback object to be notified when data becomes available
      mCallback = aCallback;
      mCallbackFlags = aFlags;
    }
  }
  return NS_OK;
}

NS_IMETHODIMP
nsPipeInputStream::Seek(int32_t aWhence, int64_t aOffset) {
  NS_NOTREACHED("nsPipeInputStream::Seek");
  return NS_ERROR_NOT_IMPLEMENTED;
}

NS_IMETHODIMP
nsPipeInputStream::Tell(int64_t* aOffset) {
  ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor);

  // return error if closed
  if (!mReadState.mAvailable && NS_FAILED(Status(mon))) {
    return Status(mon);
  }

  *aOffset = mLogicalOffset;
  return NS_OK;
}

NS_IMETHODIMP
nsPipeInputStream::SetEOF() {
  NS_NOTREACHED("nsPipeInputStream::SetEOF");
  return NS_ERROR_NOT_IMPLEMENTED;
}

static bool strings_equal(bool aIgnoreCase, const char* aS1, const char* aS2,
                          uint32_t aLen) {
  return aIgnoreCase ? !nsCRT::strncasecmp(aS1, aS2, aLen)
                     : !strncmp(aS1, aS2, aLen);
}

NS_IMETHODIMP
nsPipeInputStream::Search(const char* aForString, bool aIgnoreCase,
                          bool* aFound, uint32_t* aOffsetSearchedTo) {
  LOG(("III Search [for=%s ic=%u]\n", aForString, aIgnoreCase));

  ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor);

  char* cursor1;
  char* limit1;
  uint32_t index = 0, offset = 0;
  uint32_t strLen = strlen(aForString);

  mPipe->PeekSegment(mReadState, 0, cursor1, limit1);
  if (cursor1 == limit1) {
    *aFound = false;
    *aOffsetSearchedTo = 0;
    LOG(("  result [aFound=%u offset=%u]\n", *aFound, *aOffsetSearchedTo));
    return NS_OK;
  }

  while (true) {
    uint32_t i, len1 = limit1 - cursor1;

    // check if the string is in the buffer segment
    for (i = 0; i < len1 - strLen + 1; i++) {
      if (strings_equal(aIgnoreCase, &cursor1[i], aForString, strLen)) {
        *aFound = true;
        *aOffsetSearchedTo = offset + i;
        LOG(("  result [aFound=%u offset=%u]\n", *aFound, *aOffsetSearchedTo));
        return NS_OK;
      }
    }

    // get the next segment
    char* cursor2;
    char* limit2;
    uint32_t len2;

    index++;
    offset += len1;

    mPipe->PeekSegment(mReadState, index, cursor2, limit2);
    if (cursor2 == limit2) {
      *aFound = false;
      *aOffsetSearchedTo = offset - strLen + 1;
      LOG(("  result [aFound=%u offset=%u]\n", *aFound, *aOffsetSearchedTo));
      return NS_OK;
    }
    len2 = limit2 - cursor2;

    // check if the string is straddling the next buffer segment
    uint32_t lim = XPCOM_MIN(strLen, len2 + 1);
    for (i = 0; i < lim; ++i) {
      uint32_t strPart1Len = strLen - i - 1;
      uint32_t strPart2Len = strLen - strPart1Len;
      const char* strPart2 = &aForString[strLen - strPart2Len];
      uint32_t bufSeg1Offset = len1 - strPart1Len;
      if (strings_equal(aIgnoreCase, &cursor1[bufSeg1Offset], aForString,
                        strPart1Len) &&
          strings_equal(aIgnoreCase, cursor2, strPart2, strPart2Len)) {
        *aFound = true;
        *aOffsetSearchedTo = offset - strPart1Len;
        LOG(("  result [aFound=%u offset=%u]\n", *aFound, *aOffsetSearchedTo));
        return NS_OK;
      }
    }

    // finally continue with the next buffer
    cursor1 = cursor2;
    limit1 = limit2;
  }

  NS_NOTREACHED("can't get here");
  return NS_ERROR_UNEXPECTED;  // keep compiler happy
}

NS_IMETHODIMP
nsPipeInputStream::GetCloneable(bool* aCloneableOut) {
  *aCloneableOut = true;
  return NS_OK;
}

NS_IMETHODIMP
nsPipeInputStream::Clone(nsIInputStream** aCloneOut) {
  return mPipe->CloneInputStream(this, aCloneOut);
}

nsresult nsPipeInputStream::Status(const ReentrantMonitorAutoEnter& ev) const {
  if (NS_FAILED(mInputStatus)) {
    return mInputStatus;
  }

  if (mReadState.mAvailable) {
    // Still something to read and this input stream state is OK.
    return NS_OK;
  }

  // Nothing to read, just fall through to the pipe's state that
  // may reflect state of its output stream side (already closed).
  return mPipe->mStatus;
}

nsresult nsPipeInputStream::Status() const {
  ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor);
  return Status(mon);
}

nsPipeInputStream::~nsPipeInputStream() { Close(); }

//-----------------------------------------------------------------------------
// nsPipeOutputStream methods:
//-----------------------------------------------------------------------------

NS_IMPL_QUERY_INTERFACE(nsPipeOutputStream, nsIOutputStream,
                        nsIAsyncOutputStream, nsIClassInfo)

NS_IMPL_CI_INTERFACE_GETTER(nsPipeOutputStream, nsIOutputStream,
                            nsIAsyncOutputStream)

NS_IMPL_THREADSAFE_CI(nsPipeOutputStream)

nsresult nsPipeOutputStream::Wait() {
  MOZ_DIAGNOSTIC_ASSERT(mBlocking);

  ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor);

  if (NS_SUCCEEDED(mPipe->mStatus) && !mWritable) {
    LOG(("OOO pipe output: waiting for space\n"));
    mBlocked = true;
    mon.Wait();
    mBlocked = false;
    LOG(("OOO pipe output: woke up [pipe-status=%" PRIx32 " writable=%u]\n",
         static_cast<uint32_t>(mPipe->mStatus), mWritable));
  }

  return mPipe->mStatus == NS_BASE_STREAM_CLOSED ? NS_OK : mPipe->mStatus;
}

MonitorAction nsPipeOutputStream::OnOutputWritable(nsPipeEvents& aEvents) {
  MonitorAction result = DoNotNotifyMonitor;

  mWritable = true;

  if (mCallback && !(mCallbackFlags & WAIT_CLOSURE_ONLY)) {
    aEvents.NotifyOutputReady(this, mCallback);
    mCallback = nullptr;
    mCallbackFlags = 0;
  } else if (mBlocked) {
    result = NotifyMonitor;
  }

  return result;
}

MonitorAction nsPipeOutputStream::OnOutputException(nsresult aReason,
                                                    nsPipeEvents& aEvents) {
  LOG(("nsPipeOutputStream::OnOutputException [this=%p reason=%" PRIx32 "]\n",
       this, static_cast<uint32_t>(aReason)));

  MonitorAction result = DoNotNotifyMonitor;

  MOZ_DIAGNOSTIC_ASSERT(NS_FAILED(aReason));
  mWritable = false;

  if (mCallback) {
    aEvents.NotifyOutputReady(this, mCallback);
    mCallback = nullptr;
    mCallbackFlags = 0;
  } else if (mBlocked) {
    result = NotifyMonitor;
  }

  return result;
}

NS_IMETHODIMP_(MozExternalRefCountType)
nsPipeOutputStream::AddRef() {
  ++mWriterRefCnt;
  return mPipe->AddRef();
}

NS_IMETHODIMP_(MozExternalRefCountType)
nsPipeOutputStream::Release() {
  if (--mWriterRefCnt == 0) {
    Close();
  }
  return mPipe->Release();
}

NS_IMETHODIMP
nsPipeOutputStream::CloseWithStatus(nsresult aReason) {
  LOG(("OOO CloseWithStatus [this=%p reason=%" PRIx32 "]\n", this,
       static_cast<uint32_t>(aReason)));

  if (NS_SUCCEEDED(aReason)) {
    aReason = NS_BASE_STREAM_CLOSED;
  }

  // input stream may remain open
  mPipe->OnPipeException(aReason, true);
  return NS_OK;
}

NS_IMETHODIMP
nsPipeOutputStream::Close() { return CloseWithStatus(NS_BASE_STREAM_CLOSED); }

NS_IMETHODIMP
nsPipeOutputStream::WriteSegments(nsReadSegmentFun aReader, void* aClosure,
                                  uint32_t aCount, uint32_t* aWriteCount) {
  LOG(("OOO WriteSegments [this=%p count=%u]\n", this, aCount));

  nsresult rv = NS_OK;

  char* segment;
  uint32_t segmentLen;

  *aWriteCount = 0;
  while (aCount) {
    rv = mPipe->GetWriteSegment(segment, segmentLen);
    if (NS_FAILED(rv)) {
      if (rv == NS_BASE_STREAM_WOULD_BLOCK) {
        // pipe is full
        if (!mBlocking) {
          // ignore this error if we've already written something
          if (*aWriteCount > 0) {
            rv = NS_OK;
          }
          break;
        }
        // wait for the pipe to have an empty segment.
        rv = Wait();
        if (NS_SUCCEEDED(rv)) {
          continue;
        }
      }
      mPipe->OnPipeException(rv);
      break;
    }

    // write no more than aCount
    if (segmentLen > aCount) {
      segmentLen = aCount;
    }

    uint32_t readCount, originalLen = segmentLen;
    while (segmentLen) {
      readCount = 0;

      rv = aReader(this, aClosure, segment, *aWriteCount, segmentLen,
                   &readCount);

      if (NS_FAILED(rv) || readCount == 0) {
        aCount = 0;
        // any errors returned from the aReader end here: do not
        // propagate to the caller of WriteSegments.
        rv = NS_OK;
        break;
      }

      MOZ_DIAGNOSTIC_ASSERT(readCount <= segmentLen);
      segment += readCount;
      segmentLen -= readCount;
      aCount -= readCount;
      *aWriteCount += readCount;
      mLogicalOffset += readCount;
    }

    if (segmentLen < originalLen) {
      mPipe->AdvanceWriteCursor(originalLen - segmentLen);
    }
  }

  return rv;
}

static nsresult nsReadFromRawBuffer(nsIOutputStream* aOutStr, void* aClosure,
                                    char* aToRawSegment, uint32_t aOffset,
                                    uint32_t aCount, uint32_t* aReadCount) {
  const char* fromBuf = (const char*)aClosure;
  memcpy(aToRawSegment, &fromBuf[aOffset], aCount);
  *aReadCount = aCount;
  return NS_OK;
}

NS_IMETHODIMP
nsPipeOutputStream::Write(const char* aFromBuf, uint32_t aBufLen,
                          uint32_t* aWriteCount) {
  return WriteSegments(nsReadFromRawBuffer, (void*)aFromBuf, aBufLen,
                       aWriteCount);
}

NS_IMETHODIMP
nsPipeOutputStream::Flush(void) {
  // nothing to do
  return NS_OK;
}

static nsresult nsReadFromInputStream(nsIOutputStream* aOutStr, void* aClosure,
                                      char* aToRawSegment, uint32_t aOffset,
                                      uint32_t aCount, uint32_t* aReadCount) {
  nsIInputStream* fromStream = (nsIInputStream*)aClosure;
  return fromStream->Read(aToRawSegment, aCount, aReadCount);
}

NS_IMETHODIMP
nsPipeOutputStream::WriteFrom(nsIInputStream* aFromStream, uint32_t aCount,
                              uint32_t* aWriteCount) {
  return WriteSegments(nsReadFromInputStream, aFromStream, aCount, aWriteCount);
}

NS_IMETHODIMP
nsPipeOutputStream::IsNonBlocking(bool* aNonBlocking) {
  *aNonBlocking = !mBlocking;
  return NS_OK;
}

NS_IMETHODIMP
nsPipeOutputStream::AsyncWait(nsIOutputStreamCallback* aCallback,
                              uint32_t aFlags, uint32_t aRequestedCount,
                              nsIEventTarget* aTarget) {
  LOG(("OOO AsyncWait [this=%p]\n", this));

  nsPipeEvents pipeEvents;
  {
    ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor);

    // replace a pending callback
    mCallback = nullptr;
    mCallbackFlags = 0;

    if (!aCallback) {
      return NS_OK;
    }

    nsCOMPtr<nsIOutputStreamCallback> proxy;
    if (aTarget) {
      proxy = NS_NewOutputStreamReadyEvent(aCallback, aTarget);
      aCallback = proxy;
    }

    if (NS_FAILED(mPipe->mStatus) ||
        (mWritable && !(aFlags & WAIT_CLOSURE_ONLY))) {
      // stream is already closed or writable; post event.
      pipeEvents.NotifyOutputReady(this, aCallback);
    } else {
      // queue up callback object to be notified when data becomes available
      mCallback = aCallback;
      mCallbackFlags = aFlags;
    }
  }
  return NS_OK;
}

////////////////////////////////////////////////////////////////////////////////

nsresult NS_NewPipe(nsIInputStream** aPipeIn, nsIOutputStream** aPipeOut,
                    uint32_t aSegmentSize, uint32_t aMaxSize,
                    bool aNonBlockingInput, bool aNonBlockingOutput) {
  if (aSegmentSize == 0) {
    aSegmentSize = DEFAULT_SEGMENT_SIZE;
  }

  // Handle aMaxSize of UINT32_MAX as a special case
  uint32_t segmentCount;
  if (aMaxSize == UINT32_MAX) {
    segmentCount = UINT32_MAX;
  } else {
    segmentCount = aMaxSize / aSegmentSize;
  }

  nsIAsyncInputStream* in;
  nsIAsyncOutputStream* out;
  nsresult rv = NS_NewPipe2(&in, &out, aNonBlockingInput, aNonBlockingOutput,
                            aSegmentSize, segmentCount);
  if (NS_FAILED(rv)) {
    return rv;
  }

  *aPipeIn = in;
  *aPipeOut = out;
  return NS_OK;
}

nsresult NS_NewPipe2(nsIAsyncInputStream** aPipeIn,
                     nsIAsyncOutputStream** aPipeOut, bool aNonBlockingInput,
                     bool aNonBlockingOutput, uint32_t aSegmentSize,
                     uint32_t aSegmentCount) {
  nsPipe* pipe = new nsPipe();
  nsresult rv = pipe->Init(aNonBlockingInput, aNonBlockingOutput, aSegmentSize,
                           aSegmentCount);
  if (NS_FAILED(rv)) {
    NS_ADDREF(pipe);
    NS_RELEASE(pipe);
    return rv;
  }

  // These always succeed because the pipe is initialized above.
  MOZ_ALWAYS_SUCCEEDS(pipe->GetInputStream(aPipeIn));
  MOZ_ALWAYS_SUCCEEDS(pipe->GetOutputStream(aPipeOut));
  return NS_OK;
}

nsresult nsPipeConstructor(nsISupports* aOuter, REFNSIID aIID, void** aResult) {
  if (aOuter) {
    return NS_ERROR_NO_AGGREGATION;
  }
  nsPipe* pipe = new nsPipe();
  NS_ADDREF(pipe);
  nsresult rv = pipe->QueryInterface(aIID, aResult);
  NS_RELEASE(pipe);
  return rv;
}

////////////////////////////////////////////////////////////////////////////////