/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* vim: set ts=8 sts=2 et sw=2 tw=80: */
/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
Packit f0b94e
Packit f0b94e
#include "mozilla/Mutex.h"
Packit f0b94e
#include "mozilla/Attributes.h"
Packit f0b94e
#include "nsStreamUtils.h"
Packit f0b94e
#include "nsAutoPtr.h"
Packit f0b94e
#include "nsCOMPtr.h"
Packit f0b94e
#include "nsIPipe.h"
Packit f0b94e
#include "nsICloneableInputStream.h"
Packit f0b94e
#include "nsIEventTarget.h"
Packit f0b94e
#include "nsICancelableRunnable.h"
Packit f0b94e
#include "nsISafeOutputStream.h"
Packit f0b94e
#include "nsString.h"
Packit f0b94e
#include "nsIAsyncInputStream.h"
Packit f0b94e
#include "nsIAsyncOutputStream.h"
Packit f0b94e
#include "nsIBufferedStreams.h"
Packit f0b94e
#include "nsNetCID.h"
Packit f0b94e
#include "nsServiceManagerUtils.h"
Packit f0b94e
#include "nsThreadUtils.h"
Packit f0b94e
#include "nsITransport.h"
Packit f0b94e
#include "nsIStreamTransportService.h"
Packit f0b94e
#include "NonBlockingAsyncInputStream.h"
Packit f0b94e
Packit f0b94e
using namespace mozilla;
Packit f0b94e
Packit f0b94e
// File-local class ID constant defined from NS_STREAMTRANSPORTSERVICE_CID.
// NOTE(review): no use of it appears in the part of the file shown here;
// presumably it is consumed further down.
static NS_DEFINE_CID(kStreamTransportServiceCID, NS_STREAMTRANSPORTSERVICE_CID);
Packit f0b94e
Packit f0b94e
//-----------------------------------------------------------------------------
Packit f0b94e
Packit f0b94e
// This is a nsICancelableRunnable because we can dispatch it to Workers and
Packit f0b94e
// those can be shut down at any time, and in these cases, Cancel() is called
Packit f0b94e
// instead of Run().
Packit f0b94e
class nsInputStreamReadyEvent final : public CancelableRunnable,
Packit f0b94e
                                      public nsIInputStreamCallback {
Packit f0b94e
 public:
Packit f0b94e
  NS_DECL_ISUPPORTS_INHERITED
Packit f0b94e
Packit f0b94e
  nsInputStreamReadyEvent(const char* aName, nsIInputStreamCallback* aCallback,
Packit f0b94e
                          nsIEventTarget* aTarget)
Packit f0b94e
      : CancelableRunnable(aName), mCallback(aCallback), mTarget(aTarget) {}
Packit f0b94e
Packit f0b94e
 private:
Packit f0b94e
  ~nsInputStreamReadyEvent() {
Packit f0b94e
    if (!mCallback) {
Packit f0b94e
      return;
Packit f0b94e
    }
Packit f0b94e
    //
Packit f0b94e
    // whoa!!  looks like we never posted this event.  take care to
Packit f0b94e
    // release mCallback on the correct thread.  if mTarget lives on the
Packit f0b94e
    // calling thread, then we are ok.  otherwise, we have to try to
Packit f0b94e
    // proxy the Release over the right thread.  if that thread is dead,
Packit f0b94e
    // then there's nothing we can do... better to leak than crash.
Packit f0b94e
    //
Packit f0b94e
    bool val;
Packit f0b94e
    nsresult rv = mTarget->IsOnCurrentThread(&val;;
Packit f0b94e
    if (NS_FAILED(rv) || !val) {
Packit f0b94e
      nsCOMPtr<nsIInputStreamCallback> event = NS_NewInputStreamReadyEvent(
Packit f0b94e
          "~nsInputStreamReadyEvent", mCallback, mTarget);
Packit f0b94e
      mCallback = nullptr;
Packit f0b94e
      if (event) {
Packit f0b94e
        rv = event->OnInputStreamReady(nullptr);
Packit f0b94e
        if (NS_FAILED(rv)) {
Packit f0b94e
          NS_NOTREACHED("leaking stream event");
Packit f0b94e
          nsISupports* sup = event;
Packit f0b94e
          NS_ADDREF(sup);
Packit f0b94e
        }
Packit f0b94e
      }
Packit f0b94e
    }
Packit f0b94e
  }
Packit f0b94e
Packit f0b94e
 public:
Packit f0b94e
  NS_IMETHOD OnInputStreamReady(nsIAsyncInputStream* aStream) override {
Packit f0b94e
    mStream = aStream;
Packit f0b94e
Packit f0b94e
    nsresult rv = mTarget->Dispatch(this, NS_DISPATCH_NORMAL);
Packit f0b94e
    if (NS_FAILED(rv)) {
Packit f0b94e
      NS_WARNING("Dispatch failed");
Packit f0b94e
      return NS_ERROR_FAILURE;
Packit f0b94e
    }
Packit f0b94e
Packit f0b94e
    return NS_OK;
Packit f0b94e
  }
Packit f0b94e
Packit f0b94e
  NS_IMETHOD Run() override {
Packit f0b94e
    if (mCallback) {
Packit f0b94e
      if (mStream) {
Packit f0b94e
        mCallback->OnInputStreamReady(mStream);
Packit f0b94e
      }
Packit f0b94e
      mCallback = nullptr;
Packit f0b94e
    }
Packit f0b94e
    return NS_OK;
Packit f0b94e
  }
Packit f0b94e
Packit f0b94e
  nsresult Cancel() override {
Packit f0b94e
    mCallback = nullptr;
Packit f0b94e
    return NS_OK;
Packit f0b94e
  }
Packit f0b94e
Packit f0b94e
 private:
Packit f0b94e
  nsCOMPtr<nsIAsyncInputStream> mStream;
Packit f0b94e
  nsCOMPtr<nsIInputStreamCallback> mCallback;
Packit f0b94e
  nsCOMPtr<nsIEventTarget> mTarget;
Packit f0b94e
};
Packit f0b94e
Packit f0b94e
NS_IMPL_ISUPPORTS_INHERITED(nsInputStreamReadyEvent, CancelableRunnable,
Packit f0b94e
                            nsIInputStreamCallback)
Packit f0b94e
Packit f0b94e
//-----------------------------------------------------------------------------
Packit f0b94e
Packit f0b94e
// This is a nsICancelableRunnable because we can dispatch it to Workers and
Packit f0b94e
// those can be shut down at any time, and in these cases, Cancel() is called
Packit f0b94e
// instead of Run().
Packit f0b94e
class nsOutputStreamReadyEvent final : public CancelableRunnable,
Packit f0b94e
                                       public nsIOutputStreamCallback {
Packit f0b94e
 public:
Packit f0b94e
  NS_DECL_ISUPPORTS_INHERITED
Packit f0b94e
Packit f0b94e
  nsOutputStreamReadyEvent(nsIOutputStreamCallback* aCallback,
Packit f0b94e
                           nsIEventTarget* aTarget)
Packit f0b94e
      : CancelableRunnable("nsOutputStreamReadyEvent"),
Packit f0b94e
        mCallback(aCallback),
Packit f0b94e
        mTarget(aTarget) {}
Packit f0b94e
Packit f0b94e
 private:
Packit f0b94e
  ~nsOutputStreamReadyEvent() {
Packit f0b94e
    if (!mCallback) {
Packit f0b94e
      return;
Packit f0b94e
    }
Packit f0b94e
    //
Packit f0b94e
    // whoa!!  looks like we never posted this event.  take care to
Packit f0b94e
    // release mCallback on the correct thread.  if mTarget lives on the
Packit f0b94e
    // calling thread, then we are ok.  otherwise, we have to try to
Packit f0b94e
    // proxy the Release over the right thread.  if that thread is dead,
Packit f0b94e
    // then there's nothing we can do... better to leak than crash.
Packit f0b94e
    //
Packit f0b94e
    bool val;
Packit f0b94e
    nsresult rv = mTarget->IsOnCurrentThread(&val;;
Packit f0b94e
    if (NS_FAILED(rv) || !val) {
Packit f0b94e
      nsCOMPtr<nsIOutputStreamCallback> event =
Packit f0b94e
          NS_NewOutputStreamReadyEvent(mCallback, mTarget);
Packit f0b94e
      mCallback = nullptr;
Packit f0b94e
      if (event) {
Packit f0b94e
        rv = event->OnOutputStreamReady(nullptr);
Packit f0b94e
        if (NS_FAILED(rv)) {
Packit f0b94e
          NS_NOTREACHED("leaking stream event");
Packit f0b94e
          nsISupports* sup = event;
Packit f0b94e
          NS_ADDREF(sup);
Packit f0b94e
        }
Packit f0b94e
      }
Packit f0b94e
    }
Packit f0b94e
  }
Packit f0b94e
Packit f0b94e
 public:
Packit f0b94e
  NS_IMETHOD OnOutputStreamReady(nsIAsyncOutputStream* aStream) override {
Packit f0b94e
    mStream = aStream;
Packit f0b94e
Packit f0b94e
    nsresult rv = mTarget->Dispatch(this, NS_DISPATCH_NORMAL);
Packit f0b94e
    if (NS_FAILED(rv)) {
Packit f0b94e
      NS_WARNING("PostEvent failed");
Packit f0b94e
      return NS_ERROR_FAILURE;
Packit f0b94e
    }
Packit f0b94e
Packit f0b94e
    return NS_OK;
Packit f0b94e
  }
Packit f0b94e
Packit f0b94e
  NS_IMETHOD Run() override {
Packit f0b94e
    if (mCallback) {
Packit f0b94e
      if (mStream) {
Packit f0b94e
        mCallback->OnOutputStreamReady(mStream);
Packit f0b94e
      }
Packit f0b94e
      mCallback = nullptr;
Packit f0b94e
    }
Packit f0b94e
    return NS_OK;
Packit f0b94e
  }
Packit f0b94e
Packit f0b94e
  nsresult Cancel() override {
Packit f0b94e
    mCallback = nullptr;
Packit f0b94e
    return NS_OK;
Packit f0b94e
  }
Packit f0b94e
Packit f0b94e
 private:
Packit f0b94e
  nsCOMPtr<nsIAsyncOutputStream> mStream;
Packit f0b94e
  nsCOMPtr<nsIOutputStreamCallback> mCallback;
Packit f0b94e
  nsCOMPtr<nsIEventTarget> mTarget;
Packit f0b94e
};
Packit f0b94e
Packit f0b94e
NS_IMPL_ISUPPORTS_INHERITED(nsOutputStreamReadyEvent, CancelableRunnable,
Packit f0b94e
                            nsIOutputStreamCallback)
Packit f0b94e
Packit f0b94e
//-----------------------------------------------------------------------------
Packit f0b94e
Packit f0b94e
// Builds an nsInputStreamReadyEvent named aName that forwards to aCallback
// through aTarget, and returns it as an already-addrefed
// nsIInputStreamCallback.  Both aCallback and aTarget are asserted non-null.
already_AddRefed<nsIInputStreamCallback> NS_NewInputStreamReadyEvent(
    const char* aName, nsIInputStreamCallback* aCallback,
    nsIEventTarget* aTarget) {
  NS_ASSERTION(aCallback, "null callback");
  NS_ASSERTION(aTarget, "null target");

  RefPtr<nsInputStreamReadyEvent> readyEvent(
      new nsInputStreamReadyEvent(aName, aCallback, aTarget));
  return readyEvent.forget();
}
Packit f0b94e
Packit f0b94e
// Builds an nsOutputStreamReadyEvent that forwards to aCallback through
// aTarget, and returns it as an already-addrefed nsIOutputStreamCallback.
// Both aCallback and aTarget are asserted non-null.
already_AddRefed<nsIOutputStreamCallback> NS_NewOutputStreamReadyEvent(
    nsIOutputStreamCallback* aCallback, nsIEventTarget* aTarget) {
  NS_ASSERTION(aCallback, "null callback");
  NS_ASSERTION(aTarget, "null target");

  RefPtr<nsOutputStreamReadyEvent> readyEvent(
      new nsOutputStreamReadyEvent(aCallback, aTarget));
  return readyEvent.forget();
}
Packit f0b94e
Packit f0b94e
//-----------------------------------------------------------------------------
Packit f0b94e
// NS_AsyncCopy implementation
Packit f0b94e
Packit f0b94e
// abstract stream copier...
Packit f0b94e
class nsAStreamCopier : public nsIInputStreamCallback,
Packit f0b94e
                        public nsIOutputStreamCallback,
Packit f0b94e
                        public CancelableRunnable {
Packit f0b94e
 public:
Packit f0b94e
  NS_DECL_ISUPPORTS_INHERITED
Packit f0b94e
Packit f0b94e
  nsAStreamCopier()
Packit f0b94e
      : CancelableRunnable("nsAStreamCopier"),
Packit f0b94e
        mLock("nsAStreamCopier.mLock"),
Packit f0b94e
        mCallback(nullptr),
Packit f0b94e
        mProgressCallback(nullptr),
Packit f0b94e
        mClosure(nullptr),
Packit f0b94e
        mChunkSize(0),
Packit f0b94e
        mEventInProcess(false),
Packit f0b94e
        mEventIsPending(false),
Packit f0b94e
        mCloseSource(true),
Packit f0b94e
        mCloseSink(true),
Packit f0b94e
        mCanceled(false),
Packit f0b94e
        mCancelStatus(NS_OK) {}
Packit f0b94e
Packit f0b94e
  // kick off the async copy...
Packit f0b94e
  nsresult Start(nsIInputStream* aSource, nsIOutputStream* aSink,
Packit f0b94e
                 nsIEventTarget* aTarget, nsAsyncCopyCallbackFun aCallback,
Packit f0b94e
                 void* aClosure, uint32_t aChunksize, bool aCloseSource,
Packit f0b94e
                 bool aCloseSink, nsAsyncCopyProgressFun aProgressCallback) {
Packit f0b94e
    mSource = aSource;
Packit f0b94e
    mSink = aSink;
Packit f0b94e
    mTarget = aTarget;
Packit f0b94e
    mCallback = aCallback;
Packit f0b94e
    mClosure = aClosure;
Packit f0b94e
    mChunkSize = aChunksize;
Packit f0b94e
    mCloseSource = aCloseSource;
Packit f0b94e
    mCloseSink = aCloseSink;
Packit f0b94e
    mProgressCallback = aProgressCallback;
Packit f0b94e
Packit f0b94e
    mAsyncSource = do_QueryInterface(mSource);
Packit f0b94e
    mAsyncSink = do_QueryInterface(mSink);
Packit f0b94e
Packit f0b94e
    return PostContinuationEvent();
Packit f0b94e
  }
Packit f0b94e
Packit f0b94e
  // implemented by subclasses, returns number of bytes copied and
Packit f0b94e
  // sets source and sink condition before returning.
Packit f0b94e
  virtual uint32_t DoCopy(nsresult* aSourceCondition,
Packit f0b94e
                          nsresult* aSinkCondition) = 0;
Packit f0b94e
Packit f0b94e
  void Process() {
Packit f0b94e
    if (!mSource || !mSink) {
Packit f0b94e
      return;
Packit f0b94e
    }
Packit f0b94e
Packit f0b94e
    nsresult cancelStatus;
Packit f0b94e
    bool canceled;
Packit f0b94e
    {
Packit f0b94e
      MutexAutoLock lock(mLock);
Packit f0b94e
      canceled = mCanceled;
Packit f0b94e
      cancelStatus = mCancelStatus;
Packit f0b94e
    }
Packit f0b94e
Packit f0b94e
    // If the copy was canceled before Process() was even called, then
Packit f0b94e
    // sourceCondition and sinkCondition should be set to error results to
Packit f0b94e
    // ensure we don't call Finish() on a canceled nsISafeOutputStream.
Packit f0b94e
    MOZ_ASSERT(NS_FAILED(cancelStatus) == canceled, "cancel needs an error");
Packit f0b94e
    nsresult sourceCondition = cancelStatus;
Packit f0b94e
    nsresult sinkCondition = cancelStatus;
Packit f0b94e
Packit f0b94e
    // Copy data from the source to the sink until we hit failure or have
Packit f0b94e
    // copied all the data.
Packit f0b94e
    for (;;) {
Packit f0b94e
      // Note: copyFailed will be true if the source or the sink have
Packit f0b94e
      //       reported an error, or if we failed to write any bytes
Packit f0b94e
      //       because we have consumed all of our data.
Packit f0b94e
      bool copyFailed = false;
Packit f0b94e
      if (!canceled) {
Packit f0b94e
        uint32_t n = DoCopy(&sourceCondition, &sinkCondition);
Packit f0b94e
        if (n > 0 && mProgressCallback) {
Packit f0b94e
          mProgressCallback(mClosure, n);
Packit f0b94e
        }
Packit f0b94e
        copyFailed =
Packit f0b94e
            NS_FAILED(sourceCondition) || NS_FAILED(sinkCondition) || n == 0;
Packit f0b94e
Packit f0b94e
        MutexAutoLock lock(mLock);
Packit f0b94e
        canceled = mCanceled;
Packit f0b94e
        cancelStatus = mCancelStatus;
Packit f0b94e
      }
Packit f0b94e
      if (copyFailed && !canceled) {
Packit f0b94e
        if (sourceCondition == NS_BASE_STREAM_WOULD_BLOCK && mAsyncSource) {
Packit f0b94e
          // need to wait for more data from source.  while waiting for
Packit f0b94e
          // more source data, be sure to observe failures on output end.
Packit f0b94e
          mAsyncSource->AsyncWait(this, 0, 0, nullptr);
Packit f0b94e
Packit f0b94e
          if (mAsyncSink)
Packit f0b94e
            mAsyncSink->AsyncWait(this, nsIAsyncOutputStream::WAIT_CLOSURE_ONLY,
Packit f0b94e
                                  0, nullptr);
Packit f0b94e
          break;
Packit f0b94e
        } else if (sinkCondition == NS_BASE_STREAM_WOULD_BLOCK && mAsyncSink) {
Packit f0b94e
          // need to wait for more room in the sink.  while waiting for
Packit f0b94e
          // more room in the sink, be sure to observer failures on the
Packit f0b94e
          // input end.
Packit f0b94e
          mAsyncSink->AsyncWait(this, 0, 0, nullptr);
Packit f0b94e
Packit f0b94e
          if (mAsyncSource)
Packit f0b94e
            mAsyncSource->AsyncWait(
Packit f0b94e
                this, nsIAsyncInputStream::WAIT_CLOSURE_ONLY, 0, nullptr);
Packit f0b94e
          break;
Packit f0b94e
        }
Packit f0b94e
      }
Packit f0b94e
      if (copyFailed || canceled) {
Packit f0b94e
        if (mCloseSource) {
Packit f0b94e
          // close source
Packit f0b94e
          if (mAsyncSource)
Packit f0b94e
            mAsyncSource->CloseWithStatus(canceled ? cancelStatus
Packit f0b94e
                                                   : sinkCondition);
Packit f0b94e
          else {
Packit f0b94e
            mSource->Close();
Packit f0b94e
          }
Packit f0b94e
        }
Packit f0b94e
        mAsyncSource = nullptr;
Packit f0b94e
        mSource = nullptr;
Packit f0b94e
Packit f0b94e
        if (mCloseSink) {
Packit f0b94e
          // close sink
Packit f0b94e
          if (mAsyncSink)
Packit f0b94e
            mAsyncSink->CloseWithStatus(canceled ? cancelStatus
Packit f0b94e
                                                 : sourceCondition);
Packit f0b94e
          else {
Packit f0b94e
            // If we have an nsISafeOutputStream, and our
Packit f0b94e
            // sourceCondition and sinkCondition are not set to a
Packit f0b94e
            // failure state, finish writing.
Packit f0b94e
            nsCOMPtr<nsISafeOutputStream> sostream = do_QueryInterface(mSink);
Packit f0b94e
            if (sostream && NS_SUCCEEDED(sourceCondition) &&
Packit f0b94e
                NS_SUCCEEDED(sinkCondition)) {
Packit f0b94e
              sostream->Finish();
Packit f0b94e
            } else {
Packit f0b94e
              mSink->Close();
Packit f0b94e
            }
Packit f0b94e
          }
Packit f0b94e
        }
Packit f0b94e
        mAsyncSink = nullptr;
Packit f0b94e
        mSink = nullptr;
Packit f0b94e
Packit f0b94e
        // notify state complete...
Packit f0b94e
        if (mCallback) {
Packit f0b94e
          nsresult status;
Packit f0b94e
          if (!canceled) {
Packit f0b94e
            status = sourceCondition;
Packit f0b94e
            if (NS_SUCCEEDED(status)) {
Packit f0b94e
              status = sinkCondition;
Packit f0b94e
            }
Packit f0b94e
            if (status == NS_BASE_STREAM_CLOSED) {
Packit f0b94e
              status = NS_OK;
Packit f0b94e
            }
Packit f0b94e
          } else {
Packit f0b94e
            status = cancelStatus;
Packit f0b94e
          }
Packit f0b94e
          mCallback(mClosure, status);
Packit f0b94e
        }
Packit f0b94e
        break;
Packit f0b94e
      }
Packit f0b94e
    }
Packit f0b94e
  }
Packit f0b94e
Packit f0b94e
  nsresult Cancel(nsresult aReason) {
Packit f0b94e
    MutexAutoLock lock(mLock);
Packit f0b94e
    if (mCanceled) {
Packit f0b94e
      return NS_ERROR_FAILURE;
Packit f0b94e
    }
Packit f0b94e
Packit f0b94e
    if (NS_SUCCEEDED(aReason)) {
Packit f0b94e
      NS_WARNING("cancel with non-failure status code");
Packit f0b94e
      aReason = NS_BASE_STREAM_CLOSED;
Packit f0b94e
    }
Packit f0b94e
Packit f0b94e
    mCanceled = true;
Packit f0b94e
    mCancelStatus = aReason;
Packit f0b94e
    return NS_OK;
Packit f0b94e
  }
Packit f0b94e
Packit f0b94e
  NS_IMETHOD OnInputStreamReady(nsIAsyncInputStream* aSource) override {
Packit f0b94e
    PostContinuationEvent();
Packit f0b94e
    return NS_OK;
Packit f0b94e
  }
Packit f0b94e
Packit f0b94e
  NS_IMETHOD OnOutputStreamReady(nsIAsyncOutputStream* aSink) override {
Packit f0b94e
    PostContinuationEvent();
Packit f0b94e
    return NS_OK;
Packit f0b94e
  }
Packit f0b94e
Packit f0b94e
  // continuation event handler
Packit f0b94e
  NS_IMETHOD Run() override {
Packit f0b94e
    Process();
Packit f0b94e
Packit f0b94e
    // clear "in process" flag and post any pending continuation event
Packit f0b94e
    MutexAutoLock lock(mLock);
Packit f0b94e
    mEventInProcess = false;
Packit f0b94e
    if (mEventIsPending) {
Packit f0b94e
      mEventIsPending = false;
Packit f0b94e
      PostContinuationEvent_Locked();
Packit f0b94e
    }
Packit f0b94e
Packit f0b94e
    return NS_OK;
Packit f0b94e
  }
Packit f0b94e
Packit f0b94e
  nsresult Cancel() MOZ_MUST_OVERRIDE override = 0;
Packit f0b94e
Packit f0b94e
  nsresult PostContinuationEvent() {
Packit f0b94e
    // we cannot post a continuation event if there is currently
Packit f0b94e
    // an event in process.  doing so could result in Process being
Packit f0b94e
    // run simultaneously on multiple threads, so we mark the event
Packit f0b94e
    // as pending, and if an event is already in process then we
Packit f0b94e
    // just let that existing event take care of posting the real
Packit f0b94e
    // continuation event.
Packit f0b94e
Packit f0b94e
    MutexAutoLock lock(mLock);
Packit f0b94e
    return PostContinuationEvent_Locked();
Packit f0b94e
  }
Packit f0b94e
Packit f0b94e
  nsresult PostContinuationEvent_Locked() {
Packit f0b94e
    nsresult rv = NS_OK;
Packit f0b94e
    if (mEventInProcess) {
Packit f0b94e
      mEventIsPending = true;
Packit f0b94e
    } else {
Packit f0b94e
      rv = mTarget->Dispatch(this, NS_DISPATCH_NORMAL);
Packit f0b94e
      if (NS_SUCCEEDED(rv)) {
Packit f0b94e
        mEventInProcess = true;
Packit f0b94e
      } else {
Packit f0b94e
        NS_WARNING("unable to post continuation event");
Packit f0b94e
      }
Packit f0b94e
    }
Packit f0b94e
    return rv;
Packit f0b94e
  }
Packit f0b94e
Packit f0b94e
 protected:
Packit f0b94e
  nsCOMPtr<nsIInputStream> mSource;
Packit f0b94e
  nsCOMPtr<nsIOutputStream> mSink;
Packit f0b94e
  nsCOMPtr<nsIAsyncInputStream> mAsyncSource;
Packit f0b94e
  nsCOMPtr<nsIAsyncOutputStream> mAsyncSink;
Packit f0b94e
  nsCOMPtr<nsIEventTarget> mTarget;
Packit f0b94e
  Mutex mLock;
Packit f0b94e
  nsAsyncCopyCallbackFun mCallback;
Packit f0b94e
  nsAsyncCopyProgressFun mProgressCallback;
Packit f0b94e
  void* mClosure;
Packit f0b94e
  uint32_t mChunkSize;
Packit f0b94e
  bool mEventInProcess;
Packit f0b94e
  bool mEventIsPending;
Packit f0b94e
  bool mCloseSource;
Packit f0b94e
  bool mCloseSink;
Packit f0b94e
  bool mCanceled;
Packit f0b94e
  nsresult mCancelStatus;
Packit f0b94e
Packit f0b94e
  // virtual since subclasses call superclass Release()
Packit f0b94e
  virtual ~nsAStreamCopier() {}
Packit f0b94e
};
Packit f0b94e
Packit f0b94e
NS_IMPL_ISUPPORTS_INHERITED(nsAStreamCopier, CancelableRunnable,
Packit f0b94e
                            nsIInputStreamCallback, nsIOutputStreamCallback)
Packit f0b94e
Packit f0b94e
class nsStreamCopierIB final : public nsAStreamCopier {
Packit f0b94e
 public:
Packit f0b94e
  nsStreamCopierIB() : nsAStreamCopier() {}
Packit f0b94e
  virtual ~nsStreamCopierIB() {}
Packit f0b94e
Packit f0b94e
  struct MOZ_STACK_CLASS ReadSegmentsState {
Packit f0b94e
    // the nsIOutputStream will outlive the ReadSegmentsState on the stack
Packit f0b94e
    nsIOutputStream* MOZ_NON_OWNING_REF mSink;
Packit f0b94e
    nsresult mSinkCondition;
Packit f0b94e
  };
Packit f0b94e
Packit f0b94e
  static nsresult ConsumeInputBuffer(nsIInputStream* aInStr, void* aClosure,
Packit f0b94e
                                     const char* aBuffer, uint32_t aOffset,
Packit f0b94e
                                     uint32_t aCount, uint32_t* aCountWritten) {
Packit f0b94e
    ReadSegmentsState* state = (ReadSegmentsState*)aClosure;
Packit f0b94e
Packit f0b94e
    nsresult rv = state->mSink->Write(aBuffer, aCount, aCountWritten);
Packit f0b94e
    if (NS_FAILED(rv)) {
Packit f0b94e
      state->mSinkCondition = rv;
Packit f0b94e
    } else if (*aCountWritten == 0) {
Packit f0b94e
      state->mSinkCondition = NS_BASE_STREAM_CLOSED;
Packit f0b94e
    }
Packit f0b94e
Packit f0b94e
    return state->mSinkCondition;
Packit f0b94e
  }
Packit f0b94e
Packit f0b94e
  uint32_t DoCopy(nsresult* aSourceCondition,
Packit f0b94e
                  nsresult* aSinkCondition) override {
Packit f0b94e
    ReadSegmentsState state;
Packit f0b94e
    state.mSink = mSink;
Packit f0b94e
    state.mSinkCondition = NS_OK;
Packit f0b94e
Packit f0b94e
    uint32_t n;
Packit f0b94e
    *aSourceCondition =
Packit f0b94e
        mSource->ReadSegments(ConsumeInputBuffer, &state, mChunkSize, &n);
Packit f0b94e
    *aSinkCondition = state.mSinkCondition;
Packit f0b94e
    return n;
Packit f0b94e
  }
Packit f0b94e
Packit f0b94e
  nsresult Cancel() override { return NS_OK; }
Packit f0b94e
};
Packit f0b94e
Packit f0b94e
class nsStreamCopierOB final : public nsAStreamCopier {
Packit f0b94e
 public:
Packit f0b94e
  nsStreamCopierOB() : nsAStreamCopier() {}
Packit f0b94e
  virtual ~nsStreamCopierOB() {}
Packit f0b94e
Packit f0b94e
  struct MOZ_STACK_CLASS WriteSegmentsState {
Packit f0b94e
    // the nsIInputStream will outlive the WriteSegmentsState on the stack
Packit f0b94e
    nsIInputStream* MOZ_NON_OWNING_REF mSource;
Packit f0b94e
    nsresult mSourceCondition;
Packit f0b94e
  };
Packit f0b94e
Packit f0b94e
  static nsresult FillOutputBuffer(nsIOutputStream* aOutStr, void* aClosure,
Packit f0b94e
                                   char* aBuffer, uint32_t aOffset,
Packit f0b94e
                                   uint32_t aCount, uint32_t* aCountRead) {
Packit f0b94e
    WriteSegmentsState* state = (WriteSegmentsState*)aClosure;
Packit f0b94e
Packit f0b94e
    nsresult rv = state->mSource->Read(aBuffer, aCount, aCountRead);
Packit f0b94e
    if (NS_FAILED(rv)) {
Packit f0b94e
      state->mSourceCondition = rv;
Packit f0b94e
    } else if (*aCountRead == 0) {
Packit f0b94e
      state->mSourceCondition = NS_BASE_STREAM_CLOSED;
Packit f0b94e
    }
Packit f0b94e
Packit f0b94e
    return state->mSourceCondition;
Packit f0b94e
  }
Packit f0b94e
Packit f0b94e
  uint32_t DoCopy(nsresult* aSourceCondition,
Packit f0b94e
                  nsresult* aSinkCondition) override {
Packit f0b94e
    WriteSegmentsState state;
Packit f0b94e
    state.mSource = mSource;
Packit f0b94e
    state.mSourceCondition = NS_OK;
Packit f0b94e
Packit f0b94e
    uint32_t n;
Packit f0b94e
    *aSinkCondition =
Packit f0b94e
        mSink->WriteSegments(FillOutputBuffer, &state, mChunkSize, &n);
Packit f0b94e
    *aSourceCondition = state.mSourceCondition;
Packit f0b94e
    return n;
Packit f0b94e
  }
Packit f0b94e
Packit f0b94e
  nsresult Cancel() override { return NS_OK; }
Packit f0b94e
};
Packit f0b94e
Packit f0b94e
//-----------------------------------------------------------------------------
Packit f0b94e
Packit f0b94e
// Starts an asynchronous copy from aSource to aSink.
//
// aMode selects the copier: NS_ASYNCCOPY_VIA_READSEGMENTS uses
// nsStreamCopierIB, any other value uses nsStreamCopierOB.  The remaining
// arguments are forwarded unchanged to the copier's Start().  When aCopierCtx
// is non-null it receives an addrefed reference to the copier, whether or not
// Start() succeeded.  Returns the result of Start().
nsresult NS_AsyncCopy(nsIInputStream* aSource, nsIOutputStream* aSink,
                      nsIEventTarget* aTarget, nsAsyncCopyMode aMode,
                      uint32_t aChunkSize, nsAsyncCopyCallbackFun aCallback,
                      void* aClosure, bool aCloseSource, bool aCloseSink,
                      nsISupports** aCopierCtx,
                      nsAsyncCopyProgressFun aProgressCallback) {
  NS_ASSERTION(aTarget, "non-null target required");

  // Local owning reference, held for the duration of this call; Start()
  // takes its own reference when it dispatches the copier.
  RefPtr<nsAStreamCopier> copier;
  if (aMode == NS_ASYNCCOPY_VIA_READSEGMENTS) {
    copier = new nsStreamCopierIB();
  } else {
    copier = new nsStreamCopierOB();
  }

  const nsresult rv =
      copier->Start(aSource, aSink, aTarget, aCallback, aClosure, aChunkSize,
                    aCloseSource, aCloseSink, aProgressCallback);

  if (aCopierCtx) {
    // Go through nsIRunnable to pick an unambiguous nsISupports base.
    nsIRunnable* asRunnable = static_cast<nsIRunnable*>(copier.get());
    *aCopierCtx = static_cast<nsISupports*>(asRunnable);
    NS_ADDREF(*aCopierCtx);
  }

  return rv;
}
Packit f0b94e
Packit f0b94e
//-----------------------------------------------------------------------------
Packit f0b94e
Packit f0b94e
// Cancels an async copy with aReason.  aCopierCtx is cast back to the copier
// through nsIRunnable, mirroring the cast used when NS_AsyncCopy produced the
// context, and the result of the copier's Cancel(aReason) is returned.
nsresult NS_CancelAsyncCopy(nsISupports* aCopierCtx, nsresult aReason) {
  nsIRunnable* asRunnable = static_cast<nsIRunnable*>(aCopierCtx);
  return static_cast<nsAStreamCopier*>(asRunnable)->Cancel(aReason);
}
Packit f0b94e
Packit f0b94e
//-----------------------------------------------------------------------------
Packit f0b94e
Packit f0b94e
// Reads up to aMaxCount bytes from aStream into aResult.
//
// aResult is emptied first.  The loop asks the stream how much is available,
// grows aResult by that amount (capped by the remaining budget), and reads
// straight into the new tail.  It stops when the budget is used up, when the
// stream reports nothing available, or when a read returns zero bytes.
//
// Returns NS_OK on normal completion, including when Available() reports
// NS_BASE_STREAM_CLOSED.  Returns NS_ERROR_FILE_TOO_BIG if the string length
// would exceed UINT32_MAX, NS_ERROR_OUT_OF_MEMORY if the string could not be
// grown, or otherwise the failure from Available()/Read().  On a failed
// Read(), aResult holds only the bytes gathered by earlier iterations.
nsresult NS_ConsumeStream(nsIInputStream* aStream, uint32_t aMaxCount,
                          nsACString& aResult) {
  nsresult rv = NS_OK;
  aResult.Truncate();

  while (aMaxCount) {
    uint64_t avail64;
    rv = aStream->Available(&avail64);
    if (NS_FAILED(rv)) {
      if (rv == NS_BASE_STREAM_CLOSED) {
        // A closed stream simply means there is nothing more to read.
        rv = NS_OK;
      }
      break;
    }
    if (avail64 == 0) {
      break;
    }

    uint32_t avail =
        static_cast<uint32_t>(XPCOM_MIN<uint64_t>(avail64, aMaxCount));

    // resize aResult buffer
    uint32_t length = aResult.Length();
    if (avail > UINT32_MAX - length) {
      return NS_ERROR_FILE_TOO_BIG;
    }

    aResult.SetLength(length + avail);
    if (aResult.Length() != (length + avail)) {
      return NS_ERROR_OUT_OF_MEMORY;
    }
    char* buf = aResult.BeginWriting() + length;

    uint32_t n = 0;
    rv = aStream->Read(buf, avail, &n);
    if (NS_FAILED(rv)) {
      // Give back the tail we reserved for this read so the caller never
      // sees bytes that were not filled in by the stream.
      aResult.SetLength(length);
      break;
    }
    if (n != avail) {
      // Short read: shrink to what was actually delivered.
      aResult.SetLength(length + n);
    }
    if (n == 0) {
      break;
    }
    aMaxCount -= n;
  }

  return rv;
}
Packit f0b94e
Packit f0b94e
//-----------------------------------------------------------------------------
Packit f0b94e
Packit f0b94e
// Probe writer for NS_InputStreamIsBuffered: sets the bool that aClosure
// points at to record that it was invoked, consumes nothing, and aborts.
static nsresult TestInputStream(nsIInputStream* aInStr, void* aClosure,
                                const char* aBuffer, uint32_t aOffset,
                                uint32_t aCount, uint32_t* aCountWritten) {
  *static_cast<bool*>(aClosure) = true;
  *aCountWritten = 0;
  return NS_ERROR_ABORT;  // don't call me anymore
}
Packit f0b94e
Packit f0b94e
// Reports whether aStream counts as buffered: true if it implements
// nsIBufferedInputStream; otherwise true if a one-byte ReadSegments() probe
// either invoked the probe writer or returned success.
bool NS_InputStreamIsBuffered(nsIInputStream* aStream) {
  nsCOMPtr<nsIBufferedInputStream> asBuffered = do_QueryInterface(aStream);
  if (!asBuffered) {
    bool writerInvoked = false;
    uint32_t ignoredCount;
    const nsresult probeRv =
        aStream->ReadSegments(TestInputStream, &writerInvoked, 1, &ignoredCount);
    return writerInvoked || NS_SUCCEEDED(probeRv);
  }
  return true;
}
Packit f0b94e
Packit f0b94e
// Probe reader for NS_OutputStreamIsBuffered: sets the bool that aClosure
// points at to record that it was invoked, supplies nothing, and aborts.
static nsresult TestOutputStream(nsIOutputStream* aOutStr, void* aClosure,
                                 char* aBuffer, uint32_t aOffset,
                                 uint32_t aCount, uint32_t* aCountRead) {
  *static_cast<bool*>(aClosure) = true;
  *aCountRead = 0;
  return NS_ERROR_ABORT;  // don't call me anymore
}
Packit f0b94e
Packit f0b94e
// Reports whether aStream counts as buffered: true if it implements
// nsIBufferedOutputStream; otherwise true only if a one-byte WriteSegments()
// probe invoked the probe reader.  The probe's return code is not consulted.
bool NS_OutputStreamIsBuffered(nsIOutputStream* aStream) {
  nsCOMPtr<nsIBufferedOutputStream> asBuffered = do_QueryInterface(aStream);
  if (!asBuffered) {
    bool readerInvoked = false;
    uint32_t ignoredCount;
    aStream->WriteSegments(TestOutputStream, &readerInvoked, 1, &ignoredCount);
    return readerInvoked;
  }
  return true;
}
Packit f0b94e
Packit f0b94e
//-----------------------------------------------------------------------------
Packit f0b94e
Packit f0b94e
// Segment writer that forwards the whole segment to an output stream.
// |aClosure| must be the destination nsIOutputStream*.  Write() is called
// repeatedly until all |aCount| bytes have been accepted or a call fails;
// on failure the error is returned and |*aCountWritten| holds the number of
// bytes accepted so far.
//
// NOTE(review): if Write() ever succeeds while reporting zero bytes written,
// the remaining count never shrinks and this loop does not terminate.
nsresult NS_CopySegmentToStream(nsIInputStream* aInStr, void* aClosure,
                                const char* aBuffer, uint32_t aOffset,
                                uint32_t aCount, uint32_t* aCountWritten) {
  nsIOutputStream* sink = static_cast<nsIOutputStream*>(aClosure);
  *aCountWritten = 0;

  const char* cursor = aBuffer;
  for (uint32_t remaining = aCount; remaining != 0;) {
    uint32_t accepted;
    const nsresult rv = sink->Write(cursor, remaining, &accepted);
    if (NS_FAILED(rv)) {
      return rv;
    }

    // Advance past whatever the sink took and account for it.
    cursor += accepted;
    remaining -= accepted;
    *aCountWritten += accepted;
  }

  return NS_OK;
}
Packit f0b94e
Packit f0b94e
// Segment writer that copies the segment into a caller-owned flat buffer.
// |aClosure| is the start of that buffer (as char*); the segment's |aCount|
// bytes are stored at index |aOffset| within it.  The whole segment is always
// reported as consumed.  No bounds checking is performed here.
nsresult NS_CopySegmentToBuffer(nsIInputStream* aInStr, void* aClosure,
                                const char* aBuffer, uint32_t aOffset,
                                uint32_t aCount, uint32_t* aCountWritten) {
  char* const destination = static_cast<char*>(aClosure) + aOffset;
  memcpy(destination, aBuffer, aCount);

  *aCountWritten = aCount;
  return NS_OK;
}
Packit f0b94e
Packit f0b94e
// Segment reader that fills the stream's segment from a caller-owned flat
// buffer.  |aClosure| is the start of that buffer (as const char*); |aCount|
// bytes beginning at index |aOffset| are copied into |aBuffer|.  The whole
// segment is always reported as filled.  No bounds checking is performed
// here.
nsresult NS_CopySegmentToBuffer(nsIOutputStream* aOutStr, void* aClosure,
                                char* aBuffer, uint32_t aOffset,
                                uint32_t aCount, uint32_t* aCountRead) {
  const char* const origin = static_cast<const char*>(aClosure) + aOffset;
  memcpy(aBuffer, origin, aCount);

  *aCountRead = aCount;
  return NS_OK;
}
Packit f0b94e
Packit f0b94e
// Segment writer that throws the data away: it touches neither the segment
// nor the closure and simply claims that all |aCount| bytes were consumed.
nsresult NS_DiscardSegment(nsIInputStream* aInStr, void* aClosure,
                           const char* aBuffer, uint32_t aOffset,
                           uint32_t aCount, uint32_t* aCountWritten) {
  // Pretend the entire segment was written somewhere.
  *aCountWritten = aCount;

  return NS_OK;
}
Packit f0b94e
Packit f0b94e
//-----------------------------------------------------------------------------
Packit f0b94e
Packit f0b94e
// Segment writer that redirects to another writer described by an
// nsWriteSegmentThunk passed in |aClosure|.  The wrapped function is invoked
// with the thunk's own stream and closure in place of |aInStr| and
// |aClosure|; the segment arguments are passed through unchanged and the
// wrapped function's result is returned as-is.
nsresult NS_WriteSegmentThunk(nsIInputStream* aInStr, void* aClosure,
                              const char* aBuffer, uint32_t aOffset,
                              uint32_t aCount, uint32_t* aCountWritten) {
  nsWriteSegmentThunk* redirect = static_cast<nsWriteSegmentThunk*>(aClosure);

  const nsresult rv = redirect->mFun(redirect->mStream, redirect->mClosure,
                                     aBuffer, aOffset, aCount, aCountWritten);
  return rv;
}
Packit f0b94e
Packit f0b94e
// Refills |aDest| from |aInput| while preserving the last |aKeep| bytes that
// are currently in the array.
//
// The retained tail is moved to the front of the array's storage, then a
// single Read() is issued for the rest of the array's capacity.  On return
// the array's length is |aKeep| plus the number of bytes read, and
// |*aNewBytes| holds that number of bytes (forced to 0 if Read() failed).
// The nsresult from Read() is returned unchanged.
nsresult NS_FillArray(FallibleTArray<char>& aDest, nsIInputStream* aInput,
                      uint32_t aKeep, uint32_t* aNewBytes) {
  MOZ_ASSERT(aInput, "null stream");
  MOZ_ASSERT(aKeep <= aDest.Length(), "illegal keep count");

  char* const storage = aDest.Elements();

  // Index of the first byte of the tail to retain.  When it is 0 the tail
  // already sits at the front and nothing has to move.
  const int64_t tailStart = int64_t(aDest.Length()) - aKeep;
  if (tailStart > 0 && aKeep != 0) {
    memmove(storage, storage + tailStart, aKeep);
  }

  // Read straight into the space following the retained bytes.
  const nsresult rv =
      aInput->Read(storage + aKeep, aDest.Capacity() - aKeep, aNewBytes);
  if (NS_FAILED(rv)) {
    *aNewBytes = 0;
  }

  // NOTE: we rely on the fact that the new slots are NOT initialized by
  // SetLengthAndRetainStorage here, see nsTArrayElementTraits::Construct()
  // in nsTArray.h:
  aDest.SetLengthAndRetainStorage(aKeep + *aNewBytes);

  MOZ_ASSERT(aDest.Length() <= aDest.Capacity(), "buffer overflow");
  return rv;
}
Packit f0b94e
Packit f0b94e
// Returns true only when |aSource| is non-null, implements
// nsICloneableInputStream, and that interface reports itself as cloneable.
bool NS_InputStreamIsCloneable(nsIInputStream* aSource) {
  if (!aSource) {
    return false;
  }

  nsCOMPtr<nsICloneableInputStream> candidate = do_QueryInterface(aSource);
  if (!candidate) {
    // The stream does not expose the cloning interface at all.
    return false;
  }

  return candidate->GetCloneable();
}
Packit f0b94e
Packit f0b94e
// Produces a clone of |aSource| in |*aCloneOut|.
//
// If |aSource| implements nsICloneableInputStream and reports itself as
// cloneable, the clone is made directly; |*aReplacementOut| (when the pointer
// is supplied) is set to null and the result of Clone() is returned.
//
// Otherwise, if |aReplacementOut| is null the call fails with
// NS_ERROR_FAILURE.  If it is non-null, the fallback path is taken: a
// non-blocking pipe is created, |aSource| is asynchronously copied into the
// pipe's writer on the stream transport service, and the pipe's reader plus
// a clone of that reader are handed back.  The reader is returned through
// |*aReplacementOut| and the reader's clone through |*aCloneOut|.
//
// Returns NS_ERROR_FAILURE when |aSource| is null, or the failing nsresult of
// whichever step (pipe creation, clone, service lookup, async copy) failed.
nsresult NS_CloneInputStream(nsIInputStream* aSource,
                             nsIInputStream** aCloneOut,
                             nsIInputStream** aReplacementOut) {
  if (NS_WARN_IF(!aSource)) {
    return NS_ERROR_FAILURE;
  }

  // Attempt to perform the clone directly on the source stream
  nsCOMPtr<nsICloneableInputStream> cloneable = do_QueryInterface(aSource);
  if (cloneable && cloneable->GetCloneable()) {
    if (aReplacementOut) {
      *aReplacementOut = nullptr;
    }
    return cloneable->Clone(aCloneOut);
  }

  // If we failed the clone and the caller does not want to replace their
  // original stream, then we are done.  Return error.
  if (!aReplacementOut) {
    return NS_ERROR_FAILURE;
  }

  // The caller has opted-in to the fallback clone support that replaces
  // the original stream.  Copy the data to a pipe and return two cloned
  // input streams.

  nsCOMPtr<nsIInputStream> reader;
  nsCOMPtr<nsIInputStream> readerClone;
  nsCOMPtr<nsIOutputStream> writer;

  nsresult rv = NS_NewPipe(getter_AddRefs(reader), getter_AddRefs(writer), 0,
                           0,            // default segment size and max size
                           true, true);  // non-blocking
  if (NS_WARN_IF(NS_FAILED(rv))) {
    return rv;
  }

  // The pipe's reader is expected to support cloning.
  cloneable = do_QueryInterface(reader);
  MOZ_ASSERT(cloneable && cloneable->GetCloneable());

  rv = cloneable->Clone(getter_AddRefs(readerClone));
  if (NS_WARN_IF(NS_FAILED(rv))) {
    return rv;
  }

  // The async copy below runs on the stream transport service's event target.
  nsCOMPtr<nsIEventTarget> target =
      do_GetService(NS_STREAMTRANSPORTSERVICE_CONTRACTID, &rv);
  if (NS_WARN_IF(NS_FAILED(rv))) {
    return rv;
  }

  rv = NS_AsyncCopy(aSource, writer, target, NS_ASYNCCOPY_VIA_WRITESEGMENTS);
  if (NS_WARN_IF(NS_FAILED(rv))) {
    return rv;
  }

  // Only publish the out-params once every step has succeeded.
  readerClone.forget(aCloneOut);
  reader.forget(aReplacementOut);

  return NS_OK;
}
Packit f0b94e
Packit f0b94e
// Takes ownership of |aSource| and returns, through |*aAsyncInputStream|, a
// stream that is both non-blocking and implements nsIAsyncInputStream.
//
//  - If the source is already non-blocking and async, it is returned as-is.
//  - If it is non-blocking but not async, it is wrapped with
//    NonBlockingAsyncInputStream::Create().
//  - Otherwise an input transport is created for it on the stream transport
//    service (closing the source when done) and the transport's input stream
//    is returned.
//
// Returns NS_ERROR_FAILURE when |aSource| or |aAsyncInputStream| is null, or
// the failing nsresult of whichever step failed.
nsresult NS_MakeAsyncNonBlockingInputStream(
    already_AddRefed<nsIInputStream> aSource,
    nsIAsyncInputStream** aAsyncInputStream) {
  // Adopt the reference first so it is released on every exit path.
  nsCOMPtr<nsIInputStream> source = Move(aSource);
  if (NS_WARN_IF(!aAsyncInputStream)) {
    return NS_ERROR_FAILURE;
  }

  // Guard against a null source before dereferencing it below.
  if (NS_WARN_IF(!source)) {
    return NS_ERROR_FAILURE;
  }

  bool nonBlocking = false;
  nsresult rv = source->IsNonBlocking(&nonBlocking);
  if (NS_WARN_IF(NS_FAILED(rv))) {
    return rv;
  }

  nsCOMPtr<nsIAsyncInputStream> asyncStream = do_QueryInterface(source);

  if (nonBlocking && asyncStream) {
    // This stream is perfect!
    asyncStream.forget(aAsyncInputStream);
    return NS_OK;
  }

  if (nonBlocking) {
    // If the stream is non-blocking but not async, we wrap it.
    return NonBlockingAsyncInputStream::Create(source.forget(),
                                               aAsyncInputStream);
  }

  // Blocking source: route it through the stream transport service.
  nsCOMPtr<nsIStreamTransportService> sts =
      do_GetService(kStreamTransportServiceCID, &rv);
  if (NS_WARN_IF(NS_FAILED(rv))) {
    return rv;
  }

  nsCOMPtr<nsITransport> transport;
  rv = sts->CreateInputTransport(source,
                                 /* aCloseWhenDone */ true,
                                 getter_AddRefs(transport));
  if (NS_WARN_IF(NS_FAILED(rv))) {
    return rv;
  }

  nsCOMPtr<nsIInputStream> wrapper;
  rv = transport->OpenInputStream(/* aFlags */ 0,
                                  /* aSegmentSize */ 0,
                                  /* aSegmentCount */ 0,
                                  getter_AddRefs(wrapper));
  if (NS_WARN_IF(NS_FAILED(rv))) {
    return rv;
  }

  // The transport's input stream is expected to be async.
  asyncStream = do_QueryInterface(wrapper);
  MOZ_ASSERT(asyncStream);

  asyncStream.forget(aAsyncInputStream);
  return NS_OK;
}