1: <?php
2: /**
3: * PHP OpenCloud library.
4: *
5: * @copyright 2013 Rackspace Hosting, Inc. See LICENSE for information.
6: * @license https://www.apache.org/licenses/LICENSE-2.0
7: * @author Jamie Hannaford <jamie.hannaford@rackspace.com>
8: */
9:
10: namespace OpenCloud\Queues\Resource;
11:
12: use Guzzle\Http\Url;
13: use OpenCloud\Common\PersistentObject;
14: use OpenCloud\Common\Exceptions\InvalidArgumentError;
15: use OpenCloud\Queues\Exception;
16: use OpenCloud\Common\Metadata;
17: use OpenCloud\Common\Collection\PaginatedIterator;
18: use OpenCloud\Common\Http\Message\Formatter;
19:
20: /**
21: * A queue holds messages. Ideally, a queue is created per work type. For example,
22: * if you want to compress files, you would create a queue dedicated to this job.
23: * Any application that reads from this queue would only compress files.
24: */
class Queue extends PersistentObject
{
    /**
     * Maximum number of messages that can be posted at once.
     */
    const MAX_POST_MESSAGES = 10;

    /**
     * Maximum length, in bytes, of a queue name.
     */
    const MAX_NAME_LENGTH = 64;

    /**
     * The name given to the queue. The name MUST NOT exceed 64 bytes in length,
     * and is limited to US-ASCII letters, digits, underscores, and hyphens.
     *
     * @var string
     */
    private $name;

    /**
     * Miscellaneous, user-defined information about the queue. It is NULL
     * until setMetadata() or retrieveMetadata() has been called.
     *
     * @var array|Metadata
     */
    protected $metadata;

    /**
     * Populated when the service's listQueues() method is called. Provides a
     * convenient link for a particular Queue.
     *
     * @var string
     */
    private $href;

    // Resource configuration read by the parent class (defined outside this file).
    protected static $url_resource = 'queues';
    protected static $json_collection_name = 'queues';
    protected static $json_name = false;

    public $createKeys = array('name');

    /**
     * Set the name (with validation).
     *
     * The name may only contain US-ASCII letters, digits, underscores and
     * hyphens, and must not be longer than MAX_NAME_LENGTH bytes.
     *
     * @param $name string
     * @return $this
     * @throws \OpenCloud\Queues\Exception\QueueException If the name contains
     *      a forbidden character or is too long.
     */
    public function setName($name)
    {
        // Explicit ASCII classes are used instead of \w, because \w can match
        // additional (non-ASCII) characters when locale-specific matching is
        // in effect.
        if (preg_match('#[^A-Za-z0-9_\-]#', $name)) {
            throw new Exception\QueueException(sprintf(
                'Queues names are restricted to alphanumeric characters, '
                . ' hyphens and underscores. You provided: %s',
                print_r($name, true)
            ));
        }

        // strlen() counts bytes, which is the unit the limit is expressed in.
        if (strlen((string) $name) > self::MAX_NAME_LENGTH) {
            throw new Exception\QueueException(sprintf(
                'Queue names must not exceed %d bytes in length. You provided: %s',
                self::MAX_NAME_LENGTH,
                print_r($name, true)
            ));
        }

        $this->name = $name;

        return $this;
    }

    /**
     * @return string
     */
    public function getName()
    {
        return $this->name;
    }

    /**
     * Sets the metadata for this Queue.
     *
     * @param array|object|Metadata $data A Metadata instance is stored as is;
     *      any other array or object is loaded into a new Metadata instance.
     * @return $this
     * @throws \OpenCloud\Common\Exceptions\InvalidArgumentError If $data is
     *      neither an array nor an object.
     */
    public function setMetadata($data)
    {
        if (!is_array($data) && !is_object($data)) {
            throw new InvalidArgumentError(sprintf(
                'You must specify either an array/object of parameters, or an '
                . 'instance of Metadata. You provided: %s',
                print_r($data, true)
            ));
        }

        if ($data instanceof Metadata) {
            $metadata = $data;
        } else {
            $metadata = new Metadata();
            $metadata->setArray($data);
        }

        $this->metadata = $metadata;

        return $this;
    }

    /**
     * Save this metadata both to the local object and the API.
     *
     * @param array $params If not empty, replaces the local metadata before
     *      it is sent. An empty array leaves the local metadata untouched.
     * @return mixed The result of sending the PUT request.
     * @throws \OpenCloud\Queues\Exception\QueueException If no metadata has
     *      been set on this object and $params is empty.
     */
    public function saveMetadata(array $params = array())
    {
        if (!empty($params)) {
            $this->setMetadata($params);
        }

        $metadata = $this->getMetadata();

        // The property starts out as NULL, so guard against dereferencing it
        // before anything has been set.
        if ($metadata instanceof Metadata) {
            $values = $metadata->toArray();
        } elseif (is_array($metadata)) {
            $values = $metadata;
        } else {
            throw new Exception\QueueException(
                'There is no metadata to save. Call setMetadata() first, or '
                . 'pass a non-empty array of metadata.'
            );
        }

        // Cast to object so that an empty set is encoded as {} rather than [].
        $json = json_encode((object) $values);

        return $this->getClient()->put($this->getUrl('metadata'), array(), $json)->send();
    }

    /**
     * Returns the metadata associated with a Queue.
     *
     * @return Metadata|null NULL if no metadata has been set or retrieved yet.
     */
    public function getMetadata()
    {
        return $this->metadata;
    }

    /**
     * Retrieve metadata from the API and set it to the local object.
     *
     * @return Metadata The metadata that was retrieved and stored.
     */
    public function retrieveMetadata()
    {
        $response = $this->getClient()->get($this->getUrl('metadata'))->send();

        $metadata = new Metadata();
        $metadata->setArray(Formatter::decode($response));
        $this->setMetadata($metadata);

        return $metadata;
    }

    /**
     * Creates the queue by delegating to the service's createQueue() method.
     *
     * @param mixed $params Passed through unchanged to createQueue().
     * @return mixed Whatever createQueue() returns.
     */
    public function create($params = array())
    {
        return $this->getService()->createQueue($params);
    }

    /**
     * Builds an object holding this queue's name and metadata.
     *
     * @return object An object with the properties "queue_name" and "metadata".
     */
    public function createJson()
    {
        return (object) array(
            'queue_name' => $this->getName(),
            'metadata'   => $this->getMetadata()
        );
    }

    /**
     * @return string The name of the field that acts as primary key.
     */
    public function primaryKeyField()
    {
        return 'name';
    }

    /**
     * Delegates to noUpdate(), which is defined outside this file
     * (presumably it signals that queues cannot be updated - verify against
     * the parent class).
     *
     * @param array $params Not used.
     * @return mixed Whatever noUpdate() returns.
     */
    public function update($params = array())
    {
        return $this->noUpdate();
    }

    /**
     * This operation returns queue statistics including how many messages are
     * in the queue, broken out by status.
     *
     * @return object|bool The "messages" element of the decoded response, or
     *      FALSE if the response does not contain one.
     */
    public function getStats()
    {
        $response = $this->getClient()
            ->get($this->getUrl('stats'))
            ->send();

        $body = Formatter::decode($response);

        return isset($body->messages) ? $body->messages : false;
    }

    /**
     * Gets a message either by a specific ID, or, if no ID is specified, just
     * an empty Message object.
     *
     * @param string|null $id If a string, then the service will retrieve an
     *      individual message based on its specific ID. If NULL, then an empty
     *      object is returned for further use.
     * @return Message
     */
    public function getMessage($id = null)
    {
        return $this->getService()->resource('Message', $id, $this);
    }

    /**
     * Post an individual message.
     *
     * @param array $params The data of the message to post.
     * @return mixed See createMessages().
     */
    public function createMessage(array $params)
    {
        return $this->createMessages(array($params));
    }

    /**
     * Post multiple messages.
     *
     * NOTE: only the first MAX_POST_MESSAGES entries of $messages are sent in
     * the request; any further entries are ignored. Callers that have more
     * messages must split them up (for example with array_chunk()) and call
     * this method once per batch.
     *
     * @param array $messages A list in which every entry holds the data of
     *      one message.
     * @return mixed If the response carries a Location header, the result of
     *      the service's resourceList() call for that location; otherwise TRUE.
     */
    public function createMessages(array $messages)
    {
        // Cut the input down first, so no JSON is built for entries that
        // would be thrown away afterwards.
        $batch = array_slice($messages, 0, self::MAX_POST_MESSAGES);

        $objects = array();

        foreach ($batch as $dataArray) {
            $objects[] = $this->getMessage($dataArray)->createJson();
        }

        $json = json_encode($objects);
        $this->checkJsonError();

        $response = $this->getClient()
            ->post($this->getUrl('messages'), array(), $json)
            ->send();

        $location = $response->getHeader('Location');

        if (null === $location) {
            return true;
        }

        // Complete the location with the parts of this queue's own URL; the
        // parts found in the header take precedence. The header is cast
        // explicitly because parse_url() needs a string.
        $parts = array_merge($this->getUrl()->getParts(), parse_url((string) $location));
        $url   = Url::buildUrl($parts);

        return $this->getService()->resourceList('Message', $url, $this);
    }

    /**
     * Lists messages according to certain filter options. Results are ordered
     * by age, oldest message first. All of the parameters are optional.
     *
     * @param array $options An associative array of filtering parameters:
     *
     * - ids (array|string) A flat list of message IDs to retrieve. An array is
     *      converted into a comma-delimited string.
     *
     * - claim_id (string) The claim ID to which the message is associated.
     *
     * - marker (string) An opaque string that the client can use to request the
     *      next batch of messages. If not specified, the API will return all
     *      messages at the head of the queue (up to limit).
     *
     * - limit (integer) The maximum number of messages to return. The upper
     *      bound is 20 (the default, but it is configurable). If not specified,
     *      it defaults to 10.
     *
     * - echo (bool) Determines whether the API returns a client's own messages,
     *      as determined by the uuid portion of the User-Agent header. If not
     *      specified, echo defaults to FALSE.
     *
     * - include_claimed (bool) Determines whether the API returns claimed
     *      messages as well as unclaimed messages. If not specified, defaults
     *      to FALSE (i.e. only unclaimed messages are returned).
     *
     * @return Collection
     */
    public function listMessages(array $options = array())
    {
        // Implode array into delimited string if necessary
        if (isset($options['ids']) && is_array($options['ids'])) {
            $options['ids'] = implode(',', $options['ids']);
        }

        $url = $this->getUrl('messages', $options);

        return $this->getService()->resourceList('Message', $url, $this);
    }

    /**
     * This operation immediately deletes the specified messages, providing a
     * means for bulk deletes.
     *
     * @param array $ids A flat list of the IDs to be deleted; it is sent as a
     *      comma-delimited string.
     * @return boolean Always TRUE once the request has been sent.
     */
    public function deleteMessages(array $ids)
    {
        $url = $this->getUrl('messages', array('ids' => implode(',', $ids)));
        $this->getClient()->delete($url)->send();

        return true;
    }

    /**
     * This operation claims a set of messages, up to limit, from oldest to
     * newest, skipping any that are already claimed. If no unclaimed messages
     * are available, FALSE is returned.
     *
     * You should delete the message when you have finished processing it,
     * before the claim expires, to ensure the message is processed only once.
     * Be aware that you must call the delete() method on the Message object and
     * pass in the Claim ID, rather than relying on the service's bulk delete
     * deleteMessages() method. This is so that the server can return an error
     * if the claim just expired, giving you a chance to roll back your processing
     * of the given message, since another worker will likely claim the message
     * and process it.
     *
     * Just as with a message's age, the age given for the claim is relative to
     * the server's clock, and is useful for determining how quickly messages are
     * getting processed, and whether a given message's claim is about to expire.
     *
     * When a claim expires, it is removed, allowing another client worker to
     * claim the message in the case that the original worker fails to process it.
     *
     * @param array $options An associative array of optional settings:
     *
     * - limit Sent as the "limit" query parameter. Defaults to
     *      Claim::LIMIT_DEFAULT.
     *
     * - grace Sent as "grace" in the request body. Defaults to
     *      Claim::GRACE_DEFAULT.
     *
     * - ttl Sent as "ttl" in the request body. Defaults to Claim::TTL_DEFAULT.
     *
     * @return PaginatedIterator|bool An iterator over the claimed messages, or
     *      FALSE if the API answered with status 204.
     */
    public function claimMessages(array $options = array())
    {
        $limit = (isset($options['limit'])) ? $options['limit'] : Claim::LIMIT_DEFAULT;
        $grace = (isset($options['grace'])) ? $options['grace'] : Claim::GRACE_DEFAULT;
        $ttl   = (isset($options['ttl']))   ? $options['ttl']   : Claim::TTL_DEFAULT;

        $json = json_encode((object) array(
            'grace' => $grace,
            'ttl'   => $ttl
        ));

        $url = $this->getUrl('claims', array('limit' => $limit));

        $response = $this->getClient()->post($url, array(), $json)->send();

        if ((int) $response->getStatusCode() === 204) {
            return false;
        }

        // Kept in a separate variable so the $options argument is not overwritten.
        $iteratorOptions = array('resourceClass' => 'Message', 'baseUrl' => $url);

        return PaginatedIterator::factory($this, $iteratorOptions, Formatter::decode($response));
    }

    /**
     * If an ID is supplied, the API is queried for a persistent object; otherwise
     * an empty object is returned.
     *
     * @param mixed $id The ID of the claim, or NULL for an empty object.
     * @return Claim
     */
    public function getClaim($id = null)
    {
        return $this->getService()->resource('Claim', $id, $this);
    }
}