Class QueueWireTapInput

java.lang.Object
com.swiftmq.impl.streams.comp.io.QueueWireTapInput
All Implemented Interfaces:
DestinationInput, Input

public class QueueWireTapInput extends Object implements DestinationInput
Creates or re-uses a named WireTap on a Queue and adds a subscriber to it.

Messages are sent to the WireTap when the Message is inserted into the Queue. If the WireTap has multiple subscribers (that is, Streams are using the same WireTap name), Messages are distributed evenly over the subscribers in a round-robin fashion.
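The round-robin distribution described above can be pictured with a small stand-alone sketch. The WireTap class below is a hypothetical stand-in written for illustration, not the SwiftMQ implementation; it only shows how each Message goes to exactly one subscriber in turn.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RoundRobinSketch {

    /** Hypothetical stand-in for a named WireTap; not a SwiftMQ class. */
    static final class WireTap {
        private final List<List<String>> subscribers = new ArrayList<>();
        private int next = 0; // index of the subscriber that receives the next message

        /** Registers a subscriber and returns the list that collects its messages. */
        List<String> addSubscriber() {
            List<String> inbox = new ArrayList<>();
            subscribers.add(inbox);
            return inbox;
        }

        /** Hands the message to exactly one subscriber, then advances the turn. */
        void send(String message) {
            if (subscribers.isEmpty()) {
                return; // nobody is subscribed to this WireTap
            }
            subscribers.get(next).add(message);
            next = (next + 1) % subscribers.size();
        }
    }

    /** Sends all messages through one WireTap shared by the given number of subscribers. */
    static List<List<String>> distribute(int subscriberCount, List<String> messages) {
        WireTap tap = new WireTap();
        List<List<String>> inboxes = new ArrayList<>();
        for (int i = 0; i < subscriberCount; i++) {
            inboxes.add(tap.addSubscriber());
        }
        for (String message : messages) {
            tap.send(message);
        }
        return inboxes;
    }

    public static void main(String[] args) {
        List<List<String>> inboxes = distribute(2, Arrays.asList("m1", "m2", "m3", "m4", "m5"));
        System.out.println(inboxes.get(0)); // [m1, m3, m5]
        System.out.println(inboxes.get(1)); // [m2, m4]
    }
}
```

With two subscribers on the same WireTap name, neither one sees the full Message flow. Streams that each need every Message would have to use different WireTap names.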

By default, a deep copy of a Message is made before it is transferred to a subscriber, so it is safe to change it. If no subscriber changes the Message, the deep copy can be disabled. Be careful: without the deep copy, any change to a Message will have side effects, because the subscriber no longer works on its own copy.
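The following sketch illustrates why disabling the deep copy is risky. Msg and deliver are hypothetical names invented for this example and do not mirror SwiftMQ's MessageImpl; the point is only the difference between handing out a copy and handing out the shared instance.

```java
import java.util.HashMap;
import java.util.Map;

public class DeepCopySketch {

    /** Hypothetical mutable message with properties; not a SwiftMQ class. */
    static final class Msg {
        final Map<String, String> properties = new HashMap<>();

        /** Returns an independent copy whose properties can be changed freely. */
        Msg deepCopy() {
            Msg copy = new Msg();
            copy.properties.putAll(properties);
            return copy;
        }
    }

    /** Returns what a subscriber would receive: a copy, or the very same instance. */
    static Msg deliver(Msg original, boolean requiresDeepCopy) {
        return requiresDeepCopy ? original.deepCopy() : original;
    }

    public static void main(String[] args) {
        Msg original = new Msg();
        original.properties.put("state", "new");

        // With the deep copy, the subscriber's change stays local.
        deliver(original, true).properties.put("state", "changed");
        System.out.println(original.properties.get("state")); // new

        // Without it, the subscriber changes the shared instance.
        deliver(original, false).properties.put("state", "changed");
        System.out.println(original.properties.get("state")); // changed
    }
}
```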

Message transfer to a WireTap subscriber is done outside of a transaction. The subscriber uses a BlockingQueue to receive the Messages. The BlockingQueue has a bufferSize, which is 10 Messages by default. If the buffer is full, the Queue waits until there is free space. The maximum time to wait is specified in maxBlockTime, with a default of 500 ms. If the timeout is reached, the Message is not inserted, so a WireTap subscriber may not receive all Messages transferred through the Queue. For that reason, a WireTap is a good solution only if Message loss can be tolerated, e.g. when collecting statistics.
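The buffer and timeout behavior can be reproduced with the JDK's own BlockingQueue. This is a minimal sketch that assumes a timed offer on a bounded queue; the class and method names are invented for illustration, and whether SwiftMQ uses exactly this call internally is an assumption.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class BoundedTapSketch {

    /**
     * Offers count messages to the buffer, waiting at most maxBlockTimeMs for
     * free space on each one. Returns the number of messages that were dropped.
     */
    static int offerAll(BlockingQueue<String> buffer, int count, long maxBlockTimeMs) {
        int dropped = 0;
        for (int i = 0; i < count; i++) {
            try {
                // offer() with a timeout returns false if no space became free in time
                boolean accepted = buffer.offer("msg-" + i, maxBlockTimeMs, TimeUnit.MILLISECONDS);
                if (!accepted) {
                    dropped++;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the interrupt flag
                dropped += count - i;               // count the remaining messages as dropped
                break;
            }
        }
        return dropped;
    }

    public static void main(String[] args) {
        // bufferSize of 10 as in the default; a short block time keeps the demo fast
        BlockingQueue<String> buffer = new ArrayBlockingQueue<>(10);
        int dropped = offerAll(buffer, 12, 20L); // no consumer is draining the buffer
        System.out.println("buffered=" + buffer.size() + " dropped=" + dropped);
        // prints "buffered=10 dropped=2"
    }
}
```

A larger bufferSize absorbs longer bursts, while a larger maxBlockTime trades fewer lost Messages for longer waits on the sending side when the subscriber is slow.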

Author:
IIT Software GmbH, Muenster/Germany, (c) 2017, All Rights Reserved
  • Method Details

    • getMessageProcessor

      public QueueMessageProcessor getMessageProcessor()
      Description copied from interface: DestinationInput
      Internal use.
      Specified by:
      getMessageProcessor in interface DestinationInput
    • destinationName

      public DestinationInput destinationName(String destinationName)
      Description copied from interface: DestinationInput
      Sets the destinationName if different from the name used in stream.create().input(name)
      Specified by:
      destinationName in interface DestinationInput
      Parameters:
      destinationName - destinationName
      Returns:
      DestinationInput
    • destinationName

      public String destinationName()
      Description copied from interface: DestinationInput
      Returns the destinationName
      Specified by:
      destinationName in interface DestinationInput
      Returns:
      destinationName
    • bufferSize

      public QueueWireTapInput bufferSize(int bufferSize)
      Sets the bufferSize of the internal BlockingQueue. Default is 10 Messages.
      Parameters:
      bufferSize - Buffer Size
      Returns:
      this
    • maxBlockTime

      public QueueWireTapInput maxBlockTime(long maxBlockTime)
      Sets the maximum block time when the internal BlockingQueue is full. Default is 500 ms. If the time is reached, the Message will not be inserted.
      Parameters:
      maxBlockTime - Max Block Time
      Returns:
      this
    • requieresDeepCopy

      public boolean requieresDeepCopy()
      Returns whether a deep copy is made when a Message is inserted (default is true).
      Returns:
      deepCopy flag
    • requiresDeepCopy

      public QueueWireTapInput requiresDeepCopy(boolean requiresDeepCopy)
      Sets whether a deep copy is made when a Message is inserted, so that the Message can safely be changed later on. Default is true.
      Parameters:
      requiresDeepCopy - deepCopy flag
      Returns:
      this
    • isSelected

      public boolean isSelected(MessageImpl message)
      Internal use.
    • putMessage

      public void putMessage(MessageImpl message)
      Internal use.
    • next

      public Message next()
      Internal use.
    • setMessageProcessor

      public void setMessageProcessor(QueueMessageProcessor messageProcessor)
      Description copied from interface: DestinationInput
      Internal use.
      Specified by:
      setMessageProcessor in interface DestinationInput
    • getName

      public String getName()
      Description copied from interface: Input
      Returns the name of this Input.
      Specified by:
      getName in interface Input
      Returns:
      Name
    • getSelector

      public String getSelector()
      Internal use.
      Specified by:
      getSelector in interface DestinationInput
      Returns:
      JMS Message Selector
    • selector

      public DestinationInput selector(String selector)
      Description copied from interface: DestinationInput
      Sets the JMS Message Selector
      Specified by:
      selector in interface DestinationInput
      Parameters:
      selector - JMS Message Selector
      Returns:
      DestinationInput
    • current

      public Input current(Message current)
      Description copied from interface: Input
      Sets the current Message on this Input.
      Specified by:
      current in interface Input
      Parameters:
      current - Message
      Returns:
      Input
    • current

      public Message current()
      Description copied from interface: Input
      Returns the current Message of this Input.
      Specified by:
      current in interface Input
      Returns:
      Message
    • onInput

      public DestinationInput onInput(InputCallback callback)
      Description copied from interface: DestinationInput
      Sets the onInput callback.
      Specified by:
      onInput in interface DestinationInput
      Parameters:
      callback - callback
      Returns:
      DestinationInput
    • executeCallback

      public void executeCallback() throws Exception
      Description copied from interface: Input
      Internal use.
      Specified by:
      executeCallback in interface Input
      Throws:
      Exception
    • collect

      public void collect(long interval)
      Description copied from interface: Input
      Internal use.
      Specified by:
      collect in interface Input
    • start

      public void start() throws Exception
      Description copied from interface: Input
      Starts this Input. This method is called automatically if an Input is created outside a callback. If it is created inside, it must be called explicitly.
      Specified by:
      start in interface Input
      Throws:
      Exception
    • close

      public void close()
      Description copied from interface: Input
      Closes this Input.
      Specified by:
      close in interface Input
    • toString

      public String toString()
      Overrides:
      toString in class Object