1: <?php
2: /**
3: * PHP OpenCloud library.
4: *
5: * @copyright 2013 Rackspace Hosting, Inc. See LICENSE for information.
6: * @license https://www.apache.org/licenses/LICENSE-2.0
7: * @author Jamie Hannaford <jamie.hannaford@rackspace.com>
8: * @author Glen Campbell <glen.campbell@rackspace.com>
9: */
10:
11: namespace OpenCloud\ObjectStore\Upload;
12:
13: use Guzzle\Http\ReadLimitEntityBody;
14: use Guzzle\Http\EntityBody;
15:
16: /**
17: * A transfer type which executes in a concurrent fashion, i.e. with multiple workers uploading at once. Each worker is
18: * charged with uploading a particular chunk of data. The entity body is fragmented into n pieces - calculated by
19: * dividing the total size by the individual part size.
20: *
21: * @codeCoverageIgnore
22: */
23: class ConcurrentTransfer extends AbstractTransfer
24: {
25: public function transfer()
26: {
27: $totalParts = (int) ceil($this->entityBody->getContentLength() / $this->partSize);
28: $workers = min($totalParts, $this->options['concurrency']);
29: $parts = $this->collectParts($workers);
30:
31: while ($this->transferState->count() < $totalParts) {
32:
33: $completedParts = $this->transferState->count();
34: $requests = array();
35:
36: // Iterate over number of workers until total completed parts is what we need it to be
37: for ($i = 0; $i < $workers && ($completedParts + $i) < $totalParts; $i++) {
38:
39: // Offset is the current pointer multiplied by the standard chunk length
40: $offset = ($completedParts + $i) * $this->partSize;
41: $parts[$i]->setOffset($offset);
42:
43: // If this segment is empty (i.e. buffering a half-full chunk), break the iteration
44: if ($parts[$i]->getContentLength() == 0) {
45: break;
46: }
47:
48: // Add this to the request queue for later processing
49: $requests[] = TransferPart::createRequest(
50: $parts[$i],
51: $this->transferState->count() + $i + 1,
52: $this->client,
53: $this->options
54: );
55: }
56:
57: // Iterate over our queued requests and process them
58: foreach ($this->client->send($requests) as $response) {
59: // Add this part to the TransferState
60: $this->transferState->addPart(TransferPart::fromResponse($response));
61: }
62: }
63: }
64:
65: /**
66: * Partitions the entity body into an array - each worker is represented by a key, and the value is a
67: * ReadLimitEntityBody object, whose read limit is fixed based on this object's partSize value. This will always
68: * ensure the chunks are sent correctly.
69: *
70: * @param int The total number of workers
71: * @return array The worker array
72: */
73: private function collectParts($workers)
74: {
75: $uri = $this->entityBody->getUri();
76:
77: $array = array(new ReadLimitEntityBody($this->entityBody, $this->partSize));
78:
79: for ($i = 1; $i < $workers; $i++) {
80: // Need to create a fresh EntityBody, otherwise you'll get weird 408 responses
81: $array[] = new ReadLimitEntityBody(new EntityBody(fopen($uri, 'r')), $this->partSize);
82: }
83:
84: return $array;
85: }
86:
87: }