Blob Blame History Raw
/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* vim: set ts=8 sts=2 et sw=2 tw=80: */
/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */

#include "SlicedInputStream.h"
#include "mozilla/ipc/InputStreamUtils.h"
#include "nsISeekableStream.h"
#include "nsStreamUtils.h"

namespace mozilla {

using namespace ipc;

NS_IMPL_ADDREF(SlicedInputStream);
NS_IMPL_RELEASE(SlicedInputStream);

NS_INTERFACE_MAP_BEGIN(SlicedInputStream)
  NS_INTERFACE_MAP_ENTRY(nsIInputStream)
  NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsICloneableInputStream,
                                     mWeakCloneableInputStream || !mInputStream)
  NS_INTERFACE_MAP_ENTRY_CONDITIONAL(
      nsIIPCSerializableInputStream,
      mWeakIPCSerializableInputStream || !mInputStream)
  NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsISeekableStream,
                                     mWeakSeekableInputStream || !mInputStream)
  NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIAsyncInputStream,
                                     mWeakAsyncInputStream || !mInputStream)
  NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIInputStreamCallback,
                                     mWeakAsyncInputStream || !mInputStream)
  NS_INTERFACE_MAP_ENTRY_AMBIGUOUS(nsISupports, nsIInputStream)
NS_INTERFACE_MAP_END

SlicedInputStream::SlicedInputStream(
    already_AddRefed<nsIInputStream> aInputStream, uint64_t aStart,
    uint64_t aLength)
    : mWeakCloneableInputStream(nullptr),
      mWeakIPCSerializableInputStream(nullptr),
      mWeakSeekableInputStream(nullptr),
      mWeakAsyncInputStream(nullptr),
      mStart(aStart),
      mLength(aLength),
      mCurPos(0),
      mClosed(false),
      mAsyncWaitFlags(0),
      mAsyncWaitRequestedCount(0) {
  nsCOMPtr<nsIInputStream> inputStream = mozilla::Move(aInputStream);
  SetSourceStream(inputStream.forget());
}

SlicedInputStream::SlicedInputStream()
    : mWeakCloneableInputStream(nullptr),
      mWeakIPCSerializableInputStream(nullptr),
      mWeakSeekableInputStream(nullptr),
      mWeakAsyncInputStream(nullptr),
      mStart(0),
      mLength(0),
      mCurPos(0),
      mClosed(false),
      mAsyncWaitFlags(0),
      mAsyncWaitRequestedCount(0) {}

SlicedInputStream::~SlicedInputStream() {}

void SlicedInputStream::SetSourceStream(
    already_AddRefed<nsIInputStream> aInputStream) {
  MOZ_ASSERT(!mInputStream);

  mInputStream = mozilla::Move(aInputStream);

  nsCOMPtr<nsICloneableInputStream> cloneableStream =
      do_QueryInterface(mInputStream);
  if (cloneableStream && SameCOMIdentity(mInputStream, cloneableStream)) {
    mWeakCloneableInputStream = cloneableStream;
  }

  nsCOMPtr<nsIIPCSerializableInputStream> serializableStream =
      do_QueryInterface(mInputStream);
  if (serializableStream && SameCOMIdentity(mInputStream, serializableStream)) {
    mWeakIPCSerializableInputStream = serializableStream;
  }

  nsCOMPtr<nsISeekableStream> seekableStream = do_QueryInterface(mInputStream);
  if (seekableStream && SameCOMIdentity(mInputStream, seekableStream)) {
    mWeakSeekableInputStream = seekableStream;
  }

  nsCOMPtr<nsIAsyncInputStream> asyncInputStream =
      do_QueryInterface(mInputStream);
  if (asyncInputStream && SameCOMIdentity(mInputStream, asyncInputStream)) {
    mWeakAsyncInputStream = asyncInputStream;
  }
}

NS_IMETHODIMP
SlicedInputStream::Close() {
  NS_ENSURE_STATE(mInputStream);

  mClosed = true;
  return mInputStream->Close();
}

// nsIInputStream interface

NS_IMETHODIMP
SlicedInputStream::Available(uint64_t* aLength) {
  NS_ENSURE_STATE(mInputStream);

  if (mClosed) {
    return NS_BASE_STREAM_CLOSED;
  }

  nsresult rv = mInputStream->Available(aLength);
  if (rv == NS_BASE_STREAM_CLOSED) {
    mClosed = true;
    return rv;
  }

  if (NS_WARN_IF(NS_FAILED(rv))) {
    return rv;
  }

  // Let's remove extra length from the end.
  if (*aLength + mCurPos > mStart + mLength) {
    *aLength -= XPCOM_MIN(*aLength, (*aLength + mCurPos) - (mStart + mLength));
  }

  // Let's remove extra length from the begin.
  if (mCurPos < mStart) {
    *aLength -= XPCOM_MIN(*aLength, mStart - mCurPos);
  }

  return NS_OK;
}

NS_IMETHODIMP
SlicedInputStream::Read(char* aBuffer, uint32_t aCount, uint32_t* aReadCount) {
  *aReadCount = 0;

  if (mClosed) {
    return NS_OK;
  }

  if (mCurPos < mStart) {
    nsCOMPtr<nsISeekableStream> seekableStream =
        do_QueryInterface(mInputStream);
    if (seekableStream) {
      nsresult rv =
          seekableStream->Seek(nsISeekableStream::NS_SEEK_SET, mStart);
      if (NS_WARN_IF(NS_FAILED(rv))) {
        return rv;
      }

      mCurPos = mStart;
    } else {
      char buf[4096];
      while (mCurPos < mStart) {
        uint32_t bytesRead;
        uint64_t bufCount = XPCOM_MIN(mStart - mCurPos, (uint64_t)sizeof(buf));
        nsresult rv = mInputStream->Read(buf, bufCount, &bytesRead);
        if (NS_SUCCEEDED(rv) && bytesRead == 0) {
          mClosed = true;
          return rv;
        }

        if (NS_WARN_IF(NS_FAILED(rv))) {
          return rv;
        }

        mCurPos += bytesRead;
      }
    }
  }

  // Let's reduce aCount in case it's too big.
  if (mCurPos + aCount > mStart + mLength) {
    aCount = mStart + mLength - mCurPos;
  }

  // Nothing else to read.
  if (!aCount) {
    return NS_OK;
  }

  nsresult rv = mInputStream->Read(aBuffer, aCount, aReadCount);
  if (NS_SUCCEEDED(rv) && *aReadCount == 0) {
    mClosed = true;
    return rv;
  }

  if (NS_WARN_IF(NS_FAILED(rv))) {
    return rv;
  }

  mCurPos += *aReadCount;
  return NS_OK;
}

NS_IMETHODIMP
SlicedInputStream::ReadSegments(nsWriteSegmentFun aWriter, void* aClosure,
                                uint32_t aCount, uint32_t* aResult) {
  return NS_ERROR_NOT_IMPLEMENTED;
}

NS_IMETHODIMP
SlicedInputStream::IsNonBlocking(bool* aNonBlocking) {
  NS_ENSURE_STATE(mInputStream);
  return mInputStream->IsNonBlocking(aNonBlocking);
}

// nsICloneableInputStream interface

NS_IMETHODIMP
SlicedInputStream::GetCloneable(bool* aCloneable) {
  NS_ENSURE_STATE(mInputStream);
  NS_ENSURE_STATE(mWeakCloneableInputStream);

  *aCloneable = true;
  return NS_OK;
}

NS_IMETHODIMP
SlicedInputStream::Clone(nsIInputStream** aResult) {
  NS_ENSURE_STATE(mInputStream);
  NS_ENSURE_STATE(mWeakCloneableInputStream);

  nsCOMPtr<nsIInputStream> clonedStream;
  nsresult rv = mWeakCloneableInputStream->Clone(getter_AddRefs(clonedStream));
  if (NS_WARN_IF(NS_FAILED(rv))) {
    return rv;
  }

  nsCOMPtr<nsIInputStream> sis =
      new SlicedInputStream(clonedStream.forget(), mStart, mLength);

  sis.forget(aResult);
  return NS_OK;
}

// nsIAsyncInputStream interface

NS_IMETHODIMP
SlicedInputStream::CloseWithStatus(nsresult aStatus) {
  NS_ENSURE_STATE(mInputStream);
  NS_ENSURE_STATE(mWeakAsyncInputStream);

  mClosed = true;
  return mWeakAsyncInputStream->CloseWithStatus(aStatus);
}

NS_IMETHODIMP
SlicedInputStream::AsyncWait(nsIInputStreamCallback* aCallback, uint32_t aFlags,
                             uint32_t aRequestedCount,
                             nsIEventTarget* aEventTarget) {
  NS_ENSURE_STATE(mInputStream);
  NS_ENSURE_STATE(mWeakAsyncInputStream);

  if (mAsyncWaitCallback && aCallback) {
    return NS_ERROR_FAILURE;
  }

  mAsyncWaitCallback = aCallback;

  if (!mAsyncWaitCallback) {
    return NS_OK;
  }

  // If we haven't started retrieving data, let's see if we can seek.
  // If we cannot seek, we will do consecutive reads.
  if (mCurPos < mStart && mWeakSeekableInputStream) {
    nsresult rv =
        mWeakSeekableInputStream->Seek(nsISeekableStream::NS_SEEK_SET, mStart);
    if (NS_WARN_IF(NS_FAILED(rv))) {
      return rv;
    }

    mCurPos = mStart;
  }

  mAsyncWaitFlags = aFlags;
  mAsyncWaitRequestedCount = aRequestedCount;
  mAsyncWaitEventTarget = aEventTarget;

  // If we are not at the right position, let's do an asyncWait just internal.
  if (mCurPos < mStart) {
    return mWeakAsyncInputStream->AsyncWait(this, 0, mStart - mCurPos,
                                            aEventTarget);
  }

  return mWeakAsyncInputStream->AsyncWait(this, aFlags, aRequestedCount,
                                          aEventTarget);
}

// nsIInputStreamCallback

NS_IMETHODIMP
SlicedInputStream::OnInputStreamReady(nsIAsyncInputStream* aStream) {
  MOZ_ASSERT(mInputStream);
  MOZ_ASSERT(mWeakAsyncInputStream);
  MOZ_ASSERT(mWeakAsyncInputStream == aStream);

  // We have been canceled in the meanwhile.
  if (!mAsyncWaitCallback) {
    return NS_OK;
  }

  if (mCurPos < mStart) {
    char buf[4096];
    while (mCurPos < mStart) {
      uint32_t bytesRead;
      uint64_t bufCount = XPCOM_MIN(mStart - mCurPos, (uint64_t)sizeof(buf));
      nsresult rv = mInputStream->Read(buf, bufCount, &bytesRead);
      if (NS_SUCCEEDED(rv) && bytesRead == 0) {
        mClosed = true;
        return RunAsyncWaitCallback();
      }

      if (rv == NS_BASE_STREAM_WOULD_BLOCK) {
        return mWeakAsyncInputStream->AsyncWait(this, 0, mStart - mCurPos,
                                                mAsyncWaitEventTarget);
      }

      if (NS_WARN_IF(NS_FAILED(rv))) {
        return RunAsyncWaitCallback();
      }

      mCurPos += bytesRead;
    }

    // Now we are ready to do the 'real' asyncWait.
    return mWeakAsyncInputStream->AsyncWait(
        this, mAsyncWaitFlags, mAsyncWaitRequestedCount, mAsyncWaitEventTarget);
  }

  return RunAsyncWaitCallback();
}

nsresult SlicedInputStream::RunAsyncWaitCallback() {
  nsCOMPtr<nsIInputStreamCallback> callback = mAsyncWaitCallback;

  mAsyncWaitCallback = nullptr;
  mAsyncWaitEventTarget = nullptr;

  return callback->OnInputStreamReady(this);
}

// nsIIPCSerializableInputStream

void SlicedInputStream::Serialize(mozilla::ipc::InputStreamParams& aParams,
                                  FileDescriptorArray& aFileDescriptors) {
  MOZ_ASSERT(mInputStream);
  MOZ_ASSERT(mWeakIPCSerializableInputStream);

  SlicedInputStreamParams params;
  InputStreamHelper::SerializeInputStream(mInputStream, params.stream(),
                                          aFileDescriptors);
  params.start() = mStart;
  params.length() = mLength;
  params.curPos() = mCurPos;
  params.closed() = mClosed;

  aParams = params;
}

bool SlicedInputStream::Deserialize(
    const mozilla::ipc::InputStreamParams& aParams,
    const FileDescriptorArray& aFileDescriptors) {
  MOZ_ASSERT(!mInputStream);
  MOZ_ASSERT(!mWeakIPCSerializableInputStream);

  if (aParams.type() != InputStreamParams::TSlicedInputStreamParams) {
    NS_ERROR("Received unknown parameters from the other process!");
    return false;
  }

  const SlicedInputStreamParams& params = aParams.get_SlicedInputStreamParams();

  nsCOMPtr<nsIInputStream> stream = InputStreamHelper::DeserializeInputStream(
      params.stream(), aFileDescriptors);
  if (!stream) {
    NS_WARNING("Deserialize failed!");
    return false;
  }

  SetSourceStream(stream.forget());

  mStart = params.start();
  mLength = params.length();
  mCurPos = params.curPos();
  mClosed = params.closed();

  return true;
}

mozilla::Maybe<uint64_t> SlicedInputStream::ExpectedSerializedLength() {
  if (!mInputStream || !mWeakIPCSerializableInputStream) {
    return mozilla::Nothing();
  }

  return mWeakIPCSerializableInputStream->ExpectedSerializedLength();
}

// nsISeekableStream

NS_IMETHODIMP
SlicedInputStream::Seek(int32_t aWhence, int64_t aOffset) {
  NS_ENSURE_STATE(mInputStream);
  NS_ENSURE_STATE(mWeakSeekableInputStream);

  int64_t offset;
  nsresult rv;

  switch (aWhence) {
    case NS_SEEK_SET:
      offset = mStart + aOffset;
      break;
    case NS_SEEK_CUR:
      // mCurPos could be lower than mStart if the reading has not started yet.
      offset = XPCOM_MAX(mStart, mCurPos) + aOffset;
      break;
    case NS_SEEK_END: {
      uint64_t available;
      rv = mInputStream->Available(&available);
      if (rv == NS_BASE_STREAM_CLOSED) {
        mClosed = true;
        return rv;
      }

      if (NS_WARN_IF(NS_FAILED(rv))) {
        return rv;
      }

      offset = XPCOM_MIN(mStart + mLength, available) + aOffset;
      break;
    }
    default:
      return NS_ERROR_ILLEGAL_VALUE;
  }

  if (offset < (int64_t)mStart || offset > (int64_t)(mStart + mLength)) {
    return NS_ERROR_INVALID_ARG;
  }

  rv = mWeakSeekableInputStream->Seek(NS_SEEK_SET, offset);
  if (NS_WARN_IF(NS_FAILED(rv))) {
    return rv;
  }

  mCurPos = offset;
  return NS_OK;
}

NS_IMETHODIMP
SlicedInputStream::Tell(int64_t* aResult) {
  NS_ENSURE_STATE(mInputStream);
  NS_ENSURE_STATE(mWeakSeekableInputStream);

  int64_t tell = 0;

  nsresult rv = mWeakSeekableInputStream->Tell(&tell);
  if (NS_WARN_IF(NS_FAILED(rv))) {
    return rv;
  }

  if (tell < (int64_t)mStart) {
    *aResult = 0;
    return NS_OK;
  }

  *aResult = tell - mStart;
  if (*aResult > (int64_t)mLength) {
    *aResult = mLength;
  }

  return NS_OK;
}

NS_IMETHODIMP
SlicedInputStream::SetEOF() {
  NS_ENSURE_STATE(mInputStream);
  NS_ENSURE_STATE(mWeakSeekableInputStream);

  mClosed = true;
  return mWeakSeekableInputStream->SetEOF();
}

}  // namespace mozilla