1: <?php
2: /**
3: * PHP OpenCloud library.
4: *
5: * @copyright 2013 Rackspace Hosting, Inc. See LICENSE for information.
6: * @license https://www.apache.org/licenses/LICENSE-2.0
7: * @author Jamie Hannaford <jamie.hannaford@rackspace.com>
8: */
9:
10: namespace OpenCloud\Queues;
11:
12: use Guzzle\Common\Event;
13: use Guzzle\Http\Exception\BadResponseException;
14: use OpenCloud\Common\Exceptions\InvalidArgumentError;
15: use OpenCloud\Common\Service\AbstractService;
16: use Symfony\Component\EventDispatcher\EventSubscriberInterface;
17:
18: /**
19: * Cloud Queues is an open source, scalable, and highly available message and
20: * notifications service. Users of this service can create and manage a
21: * "producer-consumer" or a "publisher-subscriber" model from one simple API. It
22: * is made up of a few basic components: queues, messages, claims, and stats.
23: *
24: * In the producer-consumer model, users create queues where producers
25: * can post messages. Workers, or consumers, may then claim those messages and
26: * delete them once complete. A single claim may contain multiple messages, and
27: * administrators are given the ability to query claims for status.
28: *
29: * In the publisher-subscriber model, messages are posted to a queue like above,
30: * but messages are never claimed. Instead, subscribers, or watchers, simply
31: * send GET requests to pull all messages since their last request. In this
32: * model, a message will remain in the queue, unclaimed, until the message's
33: * time to live (TTL) has expired.
34: *
35: * Here is an overview of the Cloud Queues workflow:
36: *
37: * 1. You create a queue to which producers post messages.
38: *
39: * 2. Producers post messages to the queue.
40: *
41: * 3. Workers claim messages from the queue, complete the work in that message,
42: * and delete the message.
43: *
44: * 4. If a worker plans to be offline before its message completes, the worker
45: * can retire the claim TTL, putting the message back into the queue for
46: * another worker to claim.
47: *
48: * 5. Subscribers monitor the claims of these queues to keep track of activity
49: * and help troubleshoot if things go wrong.
50: *
51: * For the majority of use cases, Cloud Queues itself will not be responsible
52: * for the ordering of messages. If there is only a single producer, however,
53: * Cloud Queueing guarantees that messages are handled in a First In, First Out
54: * (FIFO) order.
55: */
56: class Service extends AbstractService implements EventSubscriberInterface
57: {
58: const DEFAULT_TYPE = 'rax:queues';
59: const DEFAULT_NAME = 'cloudQueues';
60:
61: public static function getSubscribedEvents()
62: {
63: return array(
64: 'request.before_send' => 'appendClientIdToRequest'
65: );
66: }
67:
68: /**
69: * Append the Client-ID header to all requests for this service.
70: *
71: * @param Event $event
72: */
73: public function appendClientIdToRequest(Event $event)
74: {
75: $event['request']->addHeader('Client-ID', $this->getClientId());
76: }
77:
78: /**
79: * An arbitrary string used to differentiate your worker/subscriber. This is
80: * needed, for example, when you return back a list of messages and want to
81: * know the ones your worker is processing.
82: *
83: * @var string
84: */
85: private $clientId;
86:
87: /**
88: * @param null $clientId
89: * @return $this
90: */
91: public function setClientId($clientId = null)
92: {
93: if (!$clientId) {
94: $clientId = self::generateUuid();
95: }
96: $this->clientId = $clientId;
97: return $this;
98: }
99:
100: /**
101: * @return string
102: */
103: public function getClientId()
104: {
105: return $this->clientId;
106: }
107:
108: /**
109: * Create a new Queue.
110: *
111: * @param $name Name of the new queue
112: * @return Queue
113: */
114: public function createQueue($name)
115: {
116: if (!is_string($name)) {
117: throw new InvalidArgumentError(
118: 'The only parameter required to create a Queue is a string name. Metadata can be set with '
119: . 'Queue::setMetadata and Queue::saveMetadata'
120: );
121: }
122:
123: $queue = $this->getQueue();
124: $queue->setName($name);
125:
126: // send the request
127: $this->getClient()->put($queue->getUrl())->send();
128:
129: return $queue;
130: }
131:
132: /**
133: * This operation lists queues for the project, sorting the queues
134: * alphabetically by name.
135: *
136: * @param array $params An associative array of optional parameters:
137: *
138: * - marker (string) Specifies the name of the last queue received in a
139: * previous request, or none to get the first page of
140: * results. Optional.
141: *
142: * - limit (integer) Specifies up to 20 (the default, but configurable)
143: * queues to return. Optional.
144: *
145: * - detailed (bool) Determines whether queue metadata is included in the
146: * response. Optional.
147: *
148: * @return Collection
149: */
150: public function listQueues(array $params = array())
151: {
152: return $this->resourceList('Queue', $this->getUrl('queues', $params));
153: }
154:
155: /**
156: * Return an empty Queue.md object.
157: *
158: * @return Queue
159: */
160: public function getQueue($id = null)
161: {
162: return $this->resource('Queue', $id);
163: }
164:
165: /**
166: * This operation checks to see if the specified queue exists.
167: *
168: * @param string $name The queue name that you want to check
169: * @return bool
170: */
171: public function hasQueue($name)
172: {
173: if (!$name || !is_string($name)) {
174: throw new InvalidArgumentError(sprintf(
175: 'You must provide a queue name as a valid string. You provided: %s',
176: print_r($name, true)
177: ));
178: }
179:
180: try {
181: $url = $this->getUrl();
182: $url->addPath('queues')->addPath($name);
183:
184: $this->getClient()->head($url)->send();
185:
186: return true;
187: } catch (BadResponseException $e) {
188: return false;
189: }
190: }
191:
192: }