
Thread-safe publish/subscribe queue

Thread::Queue::Multiplex Ranking & Summary


  • Rating:
  • License:
  • Academic Free License
  • Price:
  • FREE
  • Publisher Name:
  • Dean Arnold
  • Publisher web site:

Thread::Queue::Multiplex Tags

Thread::Queue::Multiplex Description

Thread-safe publish/subscribe queue Thread::Queue::Multiplex is a subclass of Thread::Queue::Duplex aka TQD which implements a "publish and subscribe" communications model for threads. Subscribers register with the queue, which registers either the provided subscriber ID, or, if no ID is provided, 1 plus the TID of the subscriber's thread, as a subscriber ID. As the publisher publishes messages to the queue, each subscriber receives a copy of the message. If the publication is not simplex, the publisher expects all subscribers to read and respond to the message; otherwise, the publisher simply continues its processing. Thread::Queue::Multiplex provides publish() method counterparts for all the Thread::Queue::Duplex enqueue() methods, e.g., publish_simplex(), publish_urgent(), publish_and_wait(), publish_and_wait_until(), etc.Subscribers receive and reply to messages using the existing TQD dequeue() and respond() methods. In addition, modified versions of the enqueue() methods are provided to publishers to permit directing a message to a single subscriber, or subset of subscribers, by specifying the scalar subscriber ID (for single subscriber messages), or an arrayref of unique subscriber ID's (for multi-subscriber messages).Thread::Queue::Multiplex subclass overrides some of the internal behavior of Thread::Queue::Duplex by * adding a shared hash to hold the list of unique subscriber ID's (provided either explicitly with subscribe(), or derived from 1 + threads->self()->tid() when the subscriber subscribe()s) mapped to a threads::shared array to hold ID's of messages published to the subscriber. (Note: tid() + 1 is used in order to avoid an ID of zero for the root thread). * adding a shared hash to hold the list of message ID's mapped to a threads::shared array to containing , where flags indicates the urgent and/or simplex status of the request, and refcount indicates the number of subscribers assigned to the request. A special refcount value of -1 indicates that only the first subscriber to retrieve/process the request should respond (to mimic the behavior of Thread::Queueu::Duplex), which is specified by the publisher using any of the enqueue methods with a subscriber ID of -1. * adding a shared hash to hold the list of message ID's mapped to a threads::shared hash containing a reference count of subscribers for the message, and a map of subscriber IDs to their responses. This "pending response" hash is used to accumulate all subscriber responses; when the reference count of a message is zero, the hash of responses is posted to the final response message mapping hash. * adding a shared hash to hold the map of thread ID's to subscriber ID's. Note: Each thread can have only a single subscriber. * changing the message mapping hash to map a unique message ID to a hash of unique subscriber ID's, mapped to their response (if any), i.e., $msg_map = { $msgid => { $subID1 => $subID1_response, $subID2 => $subID2_response, etc. } } * when the publisher dequeues the response to a message, it receives a copy of the subscriber mapping hash, and is responsible for iterating over the hash to read each subscriber's resultsA normal processing sequence for Thread::Queue::Multiplex might be: # # Thread A (the client): # ...marshal parameters for a coroutine... my $id = $tqm->publish('function_name', @paramlist); my $results = $tqm->dequeue_response($id); while (($subID, $subresult) = each %$results) { ...process $results... } # # Thread B (a subscriber): # while (1) { my $call = $tqm->dequeue; my ($id, $func, @params) = @$call; $tqm->respond($id, $self->$func(@params)); }SYNOPSIS use Thread::Queue::Multiplex; # # create new queue, limiting the max pending requests # to 20 # my $tqm = Thread::Queue::Multiplex->new(MaxPending => 20); # # register as a subscriber # $tqm->subscribe('myID'); # # unregister as a subscriber # $tqm->unsubscribe(); # # wait for $count subscribers to register # $tqm->wait_for_subscribers($count, $timeout); # # get the list of current subscribers ID's # my @subids = $tqm->get_subscribers(); # # change the max pending limit # $tqm->set_max_pending($limit); # # enqueue elements, returning a unique queue ID # (used in the client) # my $id = $tqm->publish("foo", "bar"); # # publish elements, and wait for a response # (used in the client) # my $resp = $tqm->publish_and_wait("foo", "bar"); # # publish elements, and wait for a response # until $timeout secs (used in the client) # my $resp = $tqm->publish_and_wait_until($timeout, "foo", "bar"); # # publish elements at head of queue, returning a # unique queue ID (used in the client) # my $id = $tqm->publish_urgent("foo", "bar"); # # publish elements at head of queue and wait for response # my $resp = $tqm->publish_urgent_and_wait("foo", "bar"); # # publish elements at head of queue and wait for # response until $timeout secs # my $resp = $tqm->publish_urgent_and_wait_until($timeout, "foo", "bar"); # # publish elements for simplex operation (no response) # returning the queue object # $tqm->publish_simplex("foo", "bar"); $tqm->publish_simplex_urgent("foo", "bar"); # ######################################################### # # subscribers use the existing TQD dequeue() methods # ####################################################### # # modified versions of the TQD base enqueue methods # to support directed messaging to a single subscriber # or group of subscribers # ####################################################### # # enqueue elements to a specific subscriber, returning # a unique queue ID (used in the client) # my $id = $tqm->enqueue($subID, "foo", "bar"); # # enqueue elements to 2 subscribers, and wait for a response # (used in the client) # my $resp = $tqm->enqueue_and_wait(, "foo", "bar"); # # enqueue elements, and wait for a response # until $timeout secs (used in the client) # my $resp = $tqm->enqueue_and_wait_until($subID, $timeout, "foo", "bar"); # # SEE Thread::Queue::Duplex for the various publisher enqueue() # and wait() methods, # and the subscriber dequeue() methods # Requirements: · Perl

Thread::Queue::Multiplex Related Software