/*
* Licensed to CRATE Technology GmbH ("Crate") under one or more contributor
* license agreements. See the NOTICE file distributed with this work for
* additional information regarding copyright ownership. Crate licenses
* this file to you under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. You may
* obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
* However, if you have executed another commercial license agreement
* with Crate these terms will supersede the license and you may use the
* software solely pursuant to the terms of the relevant commercial agreement.
*/
package io.crate.blob;
import com.google.common.util.concurrent.SettableFuture;
import io.crate.blob.exceptions.DigestMismatchException;
import io.crate.blob.pending_transfer.*;
import io.crate.blob.v2.BlobIndices;
import io.crate.blob.v2.BlobShard;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.*;
public class BlobTransferTarget extends AbstractComponent {

    // Transfers currently in progress on this node (or retained during recovery), keyed by transferId.
    private final ConcurrentMap<UUID, BlobTransferStatus> activeTransfers =
        ConcurrentCollections.newConcurrentMap();

    private final BlobIndices blobIndices;
    private final ThreadPool threadPool;
    private final TransportService transportService;
    private final ClusterService clusterService;

    // Counts down once per GetBlobHeadRequest received from the recovery target node.
    private CountDownLatch getHeadRequestLatch;
    // Resolved once getHeadRequestLatch is created; lets gotAGetBlobHeadRequest() calls that
    // arrive before createActiveTransfersSnapshot() block until the latch exists.
    private final SettableFuture<CountDownLatch> getHeadRequestLatchFuture;
    private final ConcurrentLinkedQueue<UUID> activePutHeadChunkTransfers;
    private CountDownLatch activePutHeadChunkTransfersLatch;
    private volatile boolean recoveryActive = false;
    // Guards the recoveryActive transition together with finishedUploads.
    private final Object lock = new Object();
    private final List<UUID> finishedUploads = new ArrayList<>();
    // Delay before removing finished transfer state; see removeTransferAfterRecovery().
    private final TimeValue STATE_REMOVAL_DELAY;

    /**
     * Receiving side of blob transfers: accepts start/continue chunk requests,
     * restores interrupted transfers during shard recovery and tracks transfer
     * state until it is safe to discard.
     */
    @Inject
    public BlobTransferTarget(Settings settings, BlobIndices blobIndices,
                              ThreadPool threadPool,
                              TransportService transportService,
                              ClusterService clusterService) {
        super(settings);
        // Tests can opt into fast state removal to keep test runs short.
        String property = System.getProperty("tests.short_timeouts");
        if (property == null) {
            STATE_REMOVAL_DELAY = new TimeValue(40, TimeUnit.SECONDS);
        } else {
            STATE_REMOVAL_DELAY = new TimeValue(2, TimeUnit.SECONDS);
        }
        this.blobIndices = blobIndices;
        this.threadPool = threadPool;
        this.transportService = transportService;
        this.clusterService = clusterService;
        this.getHeadRequestLatchFuture = SettableFuture.create();
        this.activePutHeadChunkTransfers = new ConcurrentLinkedQueue<>();
    }

    /**
     * @return the status of the transfer with the given id, or null if unknown on this node.
     */
    public BlobTransferStatus getActiveTransfer(UUID transferId) {
        return activeTransfers.get(transferId);
    }

    /**
     * Handles the first chunk of a blob upload. If the blob already exists the
     * transfer is short-circuited with {@code Status.EXISTS}; a single-chunk
     * upload is committed immediately; otherwise the transfer is registered as
     * active and awaits further chunks.
     */
    public void startTransfer(int shardId, StartBlobRequest request, StartBlobResponse response) {
        logger.debug("startTransfer {} {}", request.transferId(), request.isLast());
        BlobShard blobShard = blobIndices.blobShardSafe(request.index(), shardId);
        File existing = blobShard.blobContainer().getFile(request.id());
        if (existing.exists()) {
            // blob is already complete on this shard - no transfer necessary
            response.status(RemoteDigestBlob.Status.EXISTS);
            response.size(existing.length());
            return;
        }
        DigestBlob digestBlob = blobShard.blobContainer().createBlob(request.id(), request.transferId());
        digestBlob.addContent(request.content(), request.isLast());
        response.size(digestBlob.size());
        if (request.isLast()) {
            try {
                digestBlob.commit();
                response.status(RemoteDigestBlob.Status.FULL);
            } catch (DigestMismatchException e) {
                response.status(RemoteDigestBlob.Status.MISMATCH);
            }
        } else {
            BlobTransferStatus status = new BlobTransferStatus(
                request.index(), request.transferId(), digestBlob
            );
            activeTransfers.put(request.transferId(), status);
            response.status(RemoteDigestBlob.Status.PARTIAL);
        }
        logger.debug("startTransfer finished {} {}", response.status(), response.size());
    }

    /**
     * Handles a replica chunk. If the transfer context is unknown (e.g. this
     * replica was assigned mid-transfer during recovery) it is restored from
     * the source node before the chunk is applied.
     */
    public void continueTransfer(PutChunkReplicaRequest request, PutChunkResponse response, int shardId) {
        BlobTransferStatus status = activeTransfers.get(request.transferId);
        if (status == null) {
            status = restoreTransferStatus(request, shardId);
        }
        addContent(request, response, status);
    }

    /**
     * Handles a primary chunk. Unlike the replica variant there is no way to
     * restore a missing context, so the request is dropped with a PARTIAL status.
     */
    public void continueTransfer(PutChunkRequest request, PutChunkResponse response) {
        BlobTransferStatus status = activeTransfers.get(request.transferId());
        if (status == null) {
            logger.error("No context for transfer: {} Dropping request", request.transferId());
            response.status(RemoteDigestBlob.Status.PARTIAL);
            return;
        }
        addContent(request, response, status);
    }

    /**
     * Rebuilds the transfer context for a chunk whose transfer started before
     * this node held the shard: fetches the transfer info and the already
     * received head bytes from the source node, then resumes the DigestBlob
     * at the current position.
     */
    private BlobTransferStatus restoreTransferStatus(PutChunkReplicaRequest request, int shardId) {
        logger.trace("Restoring transferContext for PutChunkReplicaRequest with transferId {}",
            request.transferId);
        DiscoveryNode recipientNode = clusterService.state().getNodes().get(request.sourceNodeId);
        String senderNodeId = clusterService.localNode().getId();
        BlobTransferInfoResponse transferInfoResponse =
            (BlobTransferInfoResponse)transportService.submitRequest(
                recipientNode,
                BlobHeadRequestHandler.Actions.GET_TRANSFER_INFO,
                new BlobInfoRequest(senderNodeId, request.transferId),
                TransportRequestOptions.options(),
                new FutureTransportResponseHandler<TransportResponse>() {
                    @Override
                    public TransportResponse newInstance() {
                        return new BlobTransferInfoResponse();
                    }
                }
            ).txGet();
        BlobShard blobShard;
        try {
            blobShard = blobIndices.blobShardFuture(transferInfoResponse.index, shardId).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new TransferRestoreException("failure loading blobShard", request.transferId, e);
        }
        DigestBlob digestBlob = DigestBlob.resumeTransfer(
            blobShard.blobContainer(), transferInfoResponse.digest, request.transferId, request.currentPos
        );
        assert digestBlob != null : "DigestBlob couldn't be restored";
        BlobTransferStatus status =
            new BlobTransferStatus(transferInfoResponse.index, request.transferId, digestBlob);
        activeTransfers.put(request.transferId, status);
        logger.trace("Restored transferStatus for digest {} transferId: {}",
            transferInfoResponse.digest, request.transferId
        );
        // Ask the source node to stream the head bytes [0, currentPos) that this node missed.
        transportService.submitRequest(
            recipientNode,
            BlobHeadRequestHandler.Actions.GET_BLOB_HEAD,
            new GetBlobHeadRequest(senderNodeId, request.transferId(), request.currentPos),
            TransportRequestOptions.options(),
            EmptyTransportResponseHandler.INSTANCE_SAME
        ).txGet();
        return status;
    }

    /**
     * Appends a chunk to the transfer's DigestBlob and, on the last chunk,
     * waits for any pending head transfer, commits and schedules state removal.
     */
    private void addContent(IPutChunkRequest request, PutChunkResponse response, BlobTransferStatus status) {
        DigestBlob digestBlob = status.digestBlob();
        try {
            digestBlob.addContent(request.content(), request.isLast());
        } catch (BlobWriteException e) {
            // transfer is unrecoverable after a write failure - drop its state
            activeTransfers.remove(status.transferId());
            throw e;
        }
        response.size(digestBlob.size());
        if (request.isLast()) {
            // a restored transfer may still be receiving its head bytes; commit only when complete
            digestBlob.waitForHead();
            try {
                digestBlob.commit();
                response.status(RemoteDigestBlob.Status.FULL);
            } catch (DigestMismatchException e) {
                response.status(RemoteDigestBlob.Status.MISMATCH);
            } finally {
                removeTransferAfterRecovery(status.transferId());
            }
            logger.debug("transfer finished digest:{} status:{} size:{} chunks:{}",
                status.transferId(), response.status(), response.size(), digestBlob.chunks());
        } else {
            response.status(RemoteDigestBlob.Status.PARTIAL);
        }
    }

    /**
     * Removes the transfer state once it can no longer be needed by a recovery.
     */
    private void removeTransferAfterRecovery(UUID transferId) {
        boolean toSchedule = false;
        synchronized (lock) {
            if (recoveryActive) {
                /*
                 * the recovery target node might request the transfer context. So it is
                 * necessary to keep the state until the recovery is done.
                 */
                finishedUploads.add(transferId);
            } else {
                toSchedule = true;
            }
        }
        if (toSchedule) {
            logger.info("finished transfer {}, removing state", transferId);
            /*
             * there might be a race condition that the recoveryActive flag is still false although a
             * recovery has been started.
             *
             * delay the state removal a bit and re-check the recoveryActive flag in order to not remove
             * states which might still be needed.
             */
            threadPool.schedule(STATE_REMOVAL_DELAY, ThreadPool.Names.GENERIC, new StateRemoval(transferId));
        }
    }

    /**
     * Delayed removal task: re-checks the recoveryActive flag under the lock;
     * if a recovery started in the meantime the state is kept until stopRecovery().
     */
    private class StateRemoval implements Runnable {

        private final UUID transferId;

        private StateRemoval(UUID transferId) {
            this.transferId = transferId;
        }

        @Override
        public void run() {
            synchronized (lock) {
                if (recoveryActive) {
                    finishedUploads.add(transferId);
                } else {
                    activeTransfers.remove(transferId);
                }
            }
        }
    }

    /**
     * creates a countDownLatch from the activeTransfers.
     * This is the number of "GetHeadRequests" that are expected from the target node
     * The actual number of GetHeadRequests that will be received might be lower
     * than the number that is expected.
     */
    public void createActiveTransfersSnapshot() {
        getHeadRequestLatch = new CountDownLatch(activeTransfers.size());
        /*
         * the future is used because {@link #gotAGetBlobHeadRequest(java.util.UUID)}
         * might be called before this method and there is a .get() call that blocks and waits
         */
        getHeadRequestLatchFuture.set(getHeadRequestLatch);
    }

    /**
     * wait until the expected number of GetHeadRequests was received or at most
     * num / timeUnit.
     *
     * The number of GetHeadRequests that are expected is set when
     * {@link #createActiveTransfersSnapshot()} is called
     */
    public void waitForGetHeadRequests(int num, TimeUnit timeUnit) {
        try {
            getHeadRequestLatch.await(num, timeUnit);
        } catch (InterruptedException e) {
            // restore the interrupt status instead of clearing it so callers can react
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Blocks until every PutHeadChunk transfer registered in
     * {@link #createActivePutHeadChunkTransfersSnapshot()} has finished.
     */
    public void waitUntilPutHeadChunksAreFinished() {
        try {
            activePutHeadChunkTransfersLatch.await();
        } catch (InterruptedException e) {
            // restore the interrupt status instead of clearing it so callers can react
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Records that a GetBlobHeadRequest was received for the given transfer and
     * counts down the getHeadRequestLatch.
     */
    public void gotAGetBlobHeadRequest(UUID transferId) {
        /*
         * this method might be called before the getHeadRequestLatch is initialized
         * the future is used here to wait until it is initialized.
         */
        if (getHeadRequestLatch == null) {
            try {
                getHeadRequestLatch = getHeadRequestLatchFuture.get();
            } catch (InterruptedException | ExecutionException e) {
                logger.error("can't retrieve getHeadRequestLatch", e);
                return;
            }
        }
        // must happen for EVERY request, not only the first one, otherwise later
        // head requests are silently dropped and the latch never reaches zero
        activePutHeadChunkTransfers.add(transferId);
        getHeadRequestLatch.countDown();
    }

    /**
     * Snapshots the number of currently active PutHeadChunk transfers so that
     * {@link #waitUntilPutHeadChunksAreFinished()} knows how many to await.
     */
    public void createActivePutHeadChunkTransfersSnapshot() {
        activePutHeadChunkTransfersLatch = new CountDownLatch(activePutHeadChunkTransfers.size());
    }

    /**
     * Marks a PutHeadChunk transfer as finished and counts down the latch if a
     * snapshot has been taken.
     */
    public void putHeadChunkTransferFinished(UUID transferId) {
        activePutHeadChunkTransfers.remove(transferId);
        if (activePutHeadChunkTransfersLatch != null) {
            activePutHeadChunkTransfersLatch.countDown();
        }
    }

    /**
     * Marks a recovery as active: finished transfer states are retained until
     * {@link #stopRecovery()} because the recovery target may still request them.
     */
    public void startRecovery() {
        recoveryActive = true;
    }

    /**
     * Ends the recovery and drops all transfer states that finished while it was running.
     */
    public void stopRecovery() {
        synchronized (lock) {
            recoveryActive = false;
            for (UUID finishedUpload : finishedUploads) {
                logger.info("finished transfer and recovery for {}, removing state", finishedUpload);
                activeTransfers.remove(finishedUpload);
            }
        }
    }
}