Overview

Namespaces

  • OpenCloud
    • Autoscale
      • Resource
    • CloudMonitoring
      • Exception
      • Resource
    • Common
      • Collection
      • Constants
      • Exceptions
      • Http
        • Message
      • Identity
      • Log
      • Service
    • Compute
      • Constants
      • Exception
      • Resource
    • Database
      • Resource
    • DNS
      • Resource
    • LoadBalancer
      • Resource
    • ObjectStore
      • Constants
      • Exception
      • Resource
      • Upload
    • Orchestration
    • Queues
      • Exception
      • Resource
    • Volume
      • Resource
  • PHP

Classes

  • AbstractTransfer
  • ConcurrentTransfer
  • ConsecutiveTransfer
  • TransferBuilder
  • TransferPart
  • TransferState
  • Overview
  • Namespace
  • Class
  • Tree
  • Download
 1: <?php
 2: /**
 3:  * PHP OpenCloud library.
 4:  * 
 5:  * @copyright 2013 Rackspace Hosting, Inc. See LICENSE for information.
 6:  * @license   https://www.apache.org/licenses/LICENSE-2.0
 7:  * @author    Jamie Hannaford <jamie.hannaford@rackspace.com>
 8:  * @author    Glen Campbell <glen.campbell@rackspace.com>
 9:  */
10: 
11: namespace OpenCloud\ObjectStore\Upload;
12: 
13: use Guzzle\Http\ReadLimitEntityBody;
14: use Guzzle\Http\EntityBody;
15: 
16: /**
17:  * A transfer type which executes in a concurrent fashion, i.e. with multiple workers uploading at once. Each worker is
18:  * charged with uploading a particular chunk of data. The entity body is fragmented into n pieces - calculated by
19:  * dividing the total size by the individual part size.
20:  *
21:  * @codeCoverageIgnore
22:  */
23: class ConcurrentTransfer extends AbstractTransfer
24: {
25:     public function transfer()
26:     {
27:         $totalParts = (int) ceil($this->entityBody->getContentLength() / $this->partSize);
28:         $workers = min($totalParts, $this->options['concurrency']);
29:         $parts = $this->collectParts($workers);
30:         
31:         while ($this->transferState->count() < $totalParts) {
32:             
33:             $completedParts = $this->transferState->count();
34:             $requests = array();
35:             
36:             // Iterate over number of workers until total completed parts is what we need it to be
37:             for ($i = 0; $i < $workers && ($completedParts + $i) < $totalParts; $i++) {
38:                 
39:                 // Offset is the current pointer multiplied by the standard chunk length
40:                 $offset = ($completedParts + $i) * $this->partSize;
41:                 $parts[$i]->setOffset($offset);
42:                 
43:                 // If this segment is empty (i.e. buffering a half-full chunk), break the iteration
44:                 if ($parts[$i]->getContentLength() == 0) {
45:                     break;
46:                 }
47: 
48:                 // Add this to the request queue for later processing
49:                 $requests[] = TransferPart::createRequest(
50:                     $parts[$i], 
51:                     $this->transferState->count() + $i + 1, 
52:                     $this->client, 
53:                     $this->options
54:                 );
55:             }
56: 
57:             // Iterate over our queued requests and process them
58:             foreach ($this->client->send($requests) as $response) {
59:                 // Add this part to the TransferState
60:                 $this->transferState->addPart(TransferPart::fromResponse($response));
61:             }
62:         }
63:     }
64: 
65:     /**
66:      * Partitions the entity body into an array - each worker is represented by a key, and the value is a
67:      * ReadLimitEntityBody object, whose read limit is fixed based on this object's partSize value. This will always
68:      * ensure the chunks are sent correctly.
69:      *
70:      * @param int    The total number of workers
71:      * @return array The worker array
72:      */
73:     private function collectParts($workers)
74:     {
75:         $uri = $this->entityBody->getUri();
76:         
77:         $array = array(new ReadLimitEntityBody($this->entityBody, $this->partSize));
78:         
79:         for ($i = 1; $i < $workers; $i++) {
80:             // Need to create a fresh EntityBody, otherwise you'll get weird 408 responses
81:             $array[] = new ReadLimitEntityBody(new EntityBody(fopen($uri, 'r')), $this->partSize);
82:         }
83: 
84:         return $array;
85:     }
86:     
87: }
PHP OpenCloud API API documentation generated by ApiGen 2.8.0