MokaByte 68 - 9mbre 2002 
Java Message Service
III parte Message Selector e le Durable Subscription
di
Stefano Rossini
Prosegue la mini serie di articoli dedicati a JMS. Nei numeri precedenti si parlato in generale dei sistemi MOM,JMS e delle relative API [JMS1] ed stato presentato un esempio completo di applicazione JMS di tipo Publish/Subscribe per realizzare una Chat[JMS2]. Questa terza parte dedicata a parlare dei message selector e delle destinazioni persistenti.

Come già detto nei numeri precedenti, JMS è l'acronimo di Java Message Service ed indica un'insieme di API che permettono di utilizzare i sistemi MOM disponibili sul mercato.
Esistono due diversi modelli di scambio messaggi:

  • il modello di Messaging Point-To-Point è caratterizzato dal fatto che piu' produttori e piu' consumatori possono condividere la stessa destinazione (Queue) e ogni messaggio ha un solo consumatore; questo permette di fatto una comunicazione uno a uno.
  • il modello Publish/Subscribe permette una comunicazione uno a molti dal momento che ogni messaggio inserito nella destinazione (topic) può essere inoltrato a tutti i destinatari che si sono dichiarati (subscription) interessati.

 


Figura 1

L'esempio (vedi [JMS2]) è un'applicazione JMS basata sul modello Publish/Subscribe e sulla modalità di ricezione asincrona: in questo caso si è voluto mostrare come sia possibile permettere lo scambio tra più produttori e consumatori che condividono la medesima destinazione realizzando di fatto un tipico sistema di chat di messaggi.

 

La chat JMS sportiva : i Message Selectors
Partendo dall'esempio presentato, si può pensare di estendere il concetto di chat semplice introducendo la possibilità di gestire gruppi di utenza omogenei: nel caso specifico ogni gruppo sarà rappresentato da utenti tifosi della medesima squadra di football americano.
Da un punto di vista JMS, questo scenario si traduce nella propagazione di messaggi dai produttori solo verso i consumatori interessati al messaggio in questione, o più brevemente nello scambio "filtrato" di messaggi tra i partecipanti della chat.
E' necessario quindi creare dei filtri applicativi in grado di "scremare" i messaggi e permetterne l'inoltro in modo appropriato: a livello applicativo un filtro deve essere specificato sia in fase di creazione del subscriber che in fase di invio del messaggio.
Anche un consumatore quando si connette al server può definire un filtro per poter leggere solo i messaggi ai quali è interessato.
Tale filtro, detto message selector, è un oggetto di classe String che può contenere valori boolean literals (TRUE, true, FALSE and false), operatori logici(NOT, AND, OR),aritmetici(+,-,*,/), di comparazione(=, >, >=, <, <=, <> ), di ricerca(IS,LIKE,IN,NOT IN) e così via, dando vita ad una sintassi condizionale sottoinsieme del linguaggio SQL-92.
Il controllo e quindi la selezione avviene controllando, mediante i message selector, l'header e i property fields (non il body message) di un messaggio JMS; tali campi possono essere esaminati.
E' importante notare che la selezione avviene sul server, in modo da inoltrare ai client lungo la rete solamente i messaggi strettamente necessari e/o utili risparmiando così la banda del canale.
I property field sono valorizzabili mediante i metodi setObjectProperty e getObjectProperty.
Tornando all'esempio della chat sportiva, si può pensare di definire un message selector che selezioni i messaggi in funzione della squadra a cui fa riferimento il messaggio stesso; ad esempio

private String teamName = null;

// Set the Message Selector
String strMessageSelector = "TEAM = '" + this.teamName + "'";

La squadra preferita verrà richiesta all'utente in fase di startup dell'applicazione e memorizzata nella proprietà teamName della classe.
In fase di creazione del subscriber si deve provvedere a specificare il message selector come secondo parametro del metodo createSubscriber().

topicSubscriber = topicSession.createSubscriber(topic, strMessageSelector, true)

mentre prima dell'invio del messaggio si dovrà impostare tramite il metodo setStringProperty() il valore del property field con il nome della squadra favorita.

// Set the Message Selector in TX
message.setStringProperty("TEAM",this.teamName);
topicPublisher.publish(message);

E' il ProviderJMS che gestisce direttamente il recapito appropriato dei messaggi.


Figura 2


I messaggi inviati dall'utente A vengo recapitati al solo utente C essendo tifosi della medesima squadra ma non agli utenti "avversari" B e D.

 

La chat JMS "persistente" : DurableSubscribers
Un'importante caratteristica offerta dai sistemi MOM è il concetto di Guaranteed Message Delivery (GMD) che garantisce la consegna del messaggio, anche in caso di problemi al sistema o peggio ancora di crash, che porterebbe ad una indesiderata perdita di informazioni: è per questo motivo che i messaggi possono essere resi persistenti (ad es: mediante file o mediante JDBC) prima di essere recapitati ai consumatori.
La persistenza dei messaggi da vita al concetto di durable subscriptions le quali consentono inoltre la ricezione dei messaggi anche se il subscriber non era in ascolto al momento della generazione del messaggio.
Il metodo createSubscriber() della TopicSession crea un subscriber non persistente (non durable subscriber), mentre il createDurableSubscriber()permette di creare subscriber persistenti. Nel caso di topic non persistente se la sottoscrizione avviene al tempo t=0 e finisce al tempo t=3, e successivamente viene creata nuovamente al tempo t=5 e termina al tempo t=6, il messaggio M2 arrivato al tempo t=4 non viene ricevuto dall'applicazione.
Nel caso di durable subscription il messaggio M2 viene memorizzato dal Provider e recapitato al tempo t=5 all'applicazione senza essere perso.
La sottoscrizione di tipo persistente deve essere chiusa in modo esplicito invocando il metodo unsubscribe() sull'oggetto TopicSession specificando l'ID della sottoscrizione da rimuovere.
La sottoscrizione persistente dura da quando viene creata

topicSession.createDurableSubscriber(….);

fino a quando viene chiusa

topicSession.unsubscribe(…);

Dalla figura 3 si vede la differente durata della vita dei messaggi.

 


Figura 3

 

Le diverse durate di una sottoscrizione ordinaria e di una persistente
Nel caso in cui le applicazioni JMS non possono tollerare perdite di dati o duplicazioni di messaggi l'utilizzo di destinazioni persistenti diventa praticamente d'obbligo.
Si deve comunque tenere ben presente che la gestione delle durable subscriptions è introduce un overhead, dato che il JMS Provider deve provvedere salvare i messaggi in un qualche sistema di persistenza: su file o su database.
Ad esempio in JBoss, il meccanismo di default di persistenza di JBoss MQ Server è costituito da un file che permette di memorizzare 1000 messaggi prima di iniziare a scartare i più vecchi (tecnicamente tale gestione è detta persistenza con RolloverFile).
Nel caso in cui si voglia utilizzare un sistema DBMS accessibile mediante JDBC, si dovrà creare l'opportuna tabella che permetta la memorizzazione dell'ID del messaggio (che funzionerà da chiave della tabella), il nome della destinazione di provenienza ed il messaggio da memorizzare; lo script SQL ad esempio potrebbe essere qualcosa del tipo

CREATE TABLE JMS_MESSAGE
{
MESSAGEID CHAR(17) NOT NULL,
DESTINATION VARCHAR(30) NOT NULL,
MESSAGEBLOB BLOB,
PRIMARY KEY (MESSAGEDID,DESTINATION)
};


Da un punto di vista applicativo, per creare un durable subscriber bisogna invocare sull'oggetto TopicSession il metodo createSubscriber() specificando il topic (non temporaneo) d'interesse ed un nome che identifichi univocamente il durable subscriber ed inoltre è possibile specificare un message selector e decider se inibire la ricezione dei propri messaggi pubblicati.
L'id univoco del durable subscriber viene determinato da:

  • un client id associato alla connessione
  • un topic ed un nome che rappresenta l'identificativo univoco della subscriber

L'identificativo univoco associato al durable subscriber serve al JMS server per memorizzare i messaggi arrivati mentre il subscriber non è attivo.
Quando il subscriber si riconnette (logicamente con il medesimo identificativo), il JMS server provvede ad inviare tutti i messaggi unexpired accumulati fino a quel momento. Questo meccanismo è comunemente chiamato store-and-forward ed è alla base del Guaranteed Message Delivery (GMD).

 

Il Message Persistence
Le API JMS permettono si specificare due possibili modi di inviare i messaggi (modo persistente e non) mediante i quali si decide se i messaggi verranno perduti in seguito ad un crash del JMS provider.
Queste modalità di invio sono definite tramite i campi dell'interfaccia DeliveryMode. Il delivery mode di default è quello persistente .
Il DeliveryMode.PERSISTENT istruisce il JMS provider ad assicurare che il messaggio non sia perso nel caso in cui verifichi un errore del JMS provider stesso; è compito del JMS provider stesso memorizzare opportunamente il messaggio.
Il DeliveryMode.NON_PERSISTENT non richiede al JMS provider di rendere persistente i messaggi inviati che quindi verranno persi nel caso si verifichi un errore da parte del JMS provider stesso.
In fase di stesura del codice è possibile specificare il delivery mode in due modi: il primo prevede si basa sull'utilizzo del metodo setDeliveryMode() dell'interfaccia MessageProducer (da cui estendono sia l'interfaccia QueueSender e TopicPublisher), per configurare la modalità di delivery dei messaggi prodotti da quel produttore.

topicPublisher.setDeliveryMode(DeliveryMode.PERSISTENT)

Alternativamente si può impostare la modalità delivery sul singolo messaggio mediante parametro nei metodi send() o publish()

topicPublisher.publish(<message>, DeliveryMode.NON_PERSISTENT, <priority level>, <expirtation time>);

Nel metodo publish() il terzo e quarto argomento permettono rispettivamente di gestire il livello di priorità del messaggio (priority level) e il suo tempo ti vita (expiration time).
Livello di priorità dei messaggi

Se non si specifica la priorità del messaggio in fase di trasmissione, il livello di priorità di default è 4.
Per specificare la priorità del messaggio sono possibili due modi.
Il primo consiste nell'invocare il metodo setTimeToLive sull'interfaccia MessageProducer, per specificare il timeToLive valido per tutti i messaggi prodotti da quel produttore.

topicPublisher.setPriority(3);

Alternativamente è possibile indicare la priorità per uno specifico messaggio come terzo argomento del metodo send o publish.

topicPublisher.publish(<message>,<DeliveryMode>,3,<timeToLive>);

 

Expiration time
Per permettere ai messaggi di "spirare" è possibile specificare la proprietà expiration time.
Di default un messaggio ha tempo di vita infinito.
Da un punto di vista applicativo però si può avere necessità che un messaggio, diventato obsoleto, venga rimosso dal Provider.
Per specificare il tempo di vita di un messaggio si può invocare il metodo setPriority() sull'interfaccia MessageProducer, per specificare la priorità valida per tutti i messaggi prodotti da quel produttore, oppure specificare il valore (espresso in millisecondi) come quarto argomento del metodo send() o publish() per lo specifico messaggio
Specificando un tempo di vita pari a zero, i messaggi non verranno mai marcati come obsoleti. Qualsiasi messaggio che non sia stato inviato entro l'expiration date sarà distrutto.
In definitiva il metodo publish() permette di specificare i vari parametri di esecuzione grazie alle tre firme

public void publish(Message message)throws JMSException
public void publish(Topic topic,Message message)throws JMSException
public void publish(Message message,int deliveryMode,int priority,long timeToLive) throws JMSException

Riconsiderando l'esempio della chat, per implementare la versione durable, è necessario innazitutto identificare univocamente una conessione JMS.
Questa operazione può variare a secondo del tipo di JMS provider che si utilizza.
Infatti alcuni JMS provider possono richiedere una configurazione statica di tale id o permettere la configurazione dinamica mediante il metodo

public void setClientID(java.lang.String clientID) throws JMSException

il quale permette di specificare l'identificativo del client da associare alla connessione JMS.
Questa permette di associare al cliente la stato della connessione e dei relativi oggetti .
Il metodo setClient() della classe TopicConnection va utilizzato immediatamente dopo avere creato la connessione e prima di qualsiasi operazione che coinvolga lo stesso oggetto Connection. Se utilizzato altrove verrà sollevata l'eccezione IllegalStateException così come se si cerca di utilizare un client id già in uso viene lanciata una InvalidClientIDException.
Se si utilizza ad esempio il server WebLogic, è possibile utilizzare il metodo setClientID nel seguente modo

this.topicConnection=topicConnectionFactory.createTopicConnection();
System.out.println("ClientID: "+topicConnection.getClientID());
// imposta il clientId utilizzando lo userName
this.topicConnection.setClientID(this.userName);
System.out.println("ClientID :"+topicConnection.getClientID());
topicSession = topicConnection.createTopicSession(false,
                               Session.AUTO_ACKNOWLEDGE);
topicSubscriber = topicSession.createDurableSubscriber(
                               topic,this.userName);

Se si si omette l'invocazione del metodo setClientID() si ottiene un'eccezione che indica che la connessione non ha collegato un identificativo valido:

# JMSException : weblogic.jms.common.JMSException: Connection clientID is null
weblogic.jms.common.JMSException: Connection clientID is null
...

Se si provasse ad utilizzare JBossMQ, il JMS provider integrato in JBoss, allora l'invocazione del metodo setClientId() solleverebbe un'eccezione IllegalStateException visto che già un ID è stato associato alla connessione da parte del Provider stesso: infatti in JBoss l'assegnazione dell'id univoco alla connessione avviene mediante il file di configurazione jbossmq-state.xml, all'interno del quale si deve specificare username, password e client id per lo specifico topic su cui si vuole creare la durable subscription.
Questo meccanismo di configurazione permette meno flessibilità a run-time ma più controllo nella gestione delle risorse.
Il codice funzionante con questa modalità di configurazione è:

Topic topic = (Topic) jndiContext.lookup("MokaTopic");
this.topicConnection = topicConnectionFactory.
                       createTopicConnection("stefano","mokabyte");
System.out.println("ClientID is :"+topicConnection.getClientID());
topicSession = topicConnection.
               createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
topicSubscriber = topicSession.
               
  createDurableSubscriber(topic,this.userName);

Questo implica avere dichiarato nel file <JBOSS_HOME>\conf\default\jboss.jcml gli opportuni grant alla destinazione sulla quale si vogliono creare le durable subscription.
Per il corretto funzionamento dell'esempio sopra riportato si deve avere la seguente configurazione:

<?xml version="1.0" encoding="UTF-8"?>
<StateManager>
. . .
<User>
<Name>stefano</Name>
<Password>mokabyte</Password>
<Id>id123</Id>
<DurableSubscription>
<Name>Moka Durable Example</Name>
<TopicName>MokaTopic</TopicName>
</DurableSubscription>
</User>
. . .
</StateManager>

Si dice che l' utente di nome stefano (tag Name)e password mokabyte (tag Password) avrà un ID della connessione di valore id123 (tag Id) ed è in grado di creare delle durable subscription sul topic MokaDurableTopic.
Se si prova a collegarsi con username=stefano e password=mokabyte, allora topicConnection.getClientID() resituisce il valore id123.

 

L'applicazione MokaJmsSpy
L'applicazione it.mokabyte.jms.spy.MokaJmsSpy è un client JMS che permette di visualizzare alcune utili informazioni: ad esempio si possono ricavare i valori di default del messaggio JMS e della tipologia di invio mediante il pulsante MESSAGE_SETTINGS, a cui è agganciato il seguente listener

buttonMsgSet.addActionListener(new java.awt.event.ActionListener() {
  public void actionPerformed(ActionEvent e) {
  String info = "Message.DEFAULT_PRIORITY:"+
                 Message.DEFAULT_PRIORITY+"\n";
         info += "Message.DEFAULT_TIME_TO_LIVE:"+
                  Message.DEFAULT_TIME_TO_LIVE+"\n";
         info+= "Message.DEFAULT_DELIVERY_MODE:"+
                 Message.DEFAULT_DELIVERY_MODE+"\n";
         info += "DeliveryMode.NON_PERSISTENT:"+
                  DeliveryMode.NON_PERSISTENT+"\n";
         info += "DeliveryMode.PERSISTENT:"+
                  DeliveryMode.PERSISTENT+"\n";
  JOptionPane.showMessageDialog(null, info, "Message settings",                                 JOptionPane.INFORMATION_MESSAGE);
  }
});

Il pulsante MESSAGE_DATA (l'oggetto buttonMetaData) permette di ricavare i dati descrittivi del connessione JMS; in questo caso si ottengono i metadata della connessione (ConnectionMetaData) invocando il metodo getMetaData() sull'oggetto TopicConnection.

buttonMetaData.addActionListener(new ActionListener(){
  public void actionPerformed(ActionEvent e) {
    try{
      ConnectionMetaData
cmd;
      connMetaData = topicConnection.getMetaData();
      String
info;
      info = "JMSProviderName: " + cmd.getJMSProviderName()+"\n";
      info += "JMSProviderVersion: " + cmd.getProviderVersion()+\n";
      info += "JMSVersion: " + cmd.getJMSVersion() + "\n";
      JOptionPane.showMessageDialog(null,
                                    info,
                                    "Topic Connection Meta Data",
                                   JOptionPane.INFORMATION_MESSAGE);
     }
     catch(JMSException jmse) {. . .
     ...
     }
  }
});

 


Figura 4

 

Le proprietà ed il body del messaggio ricevuto presso il topic su cui l'applicazione si è registrata possono essere ottenuti tramite il seguente codice

this.append("Received message class["+
             message.getClass().getName()+"]");
this.append("JMSCorrelationID : " +
             message.getJMSCorrelationID());
this.append("JMSMessageID : " + message.getJMSMessageID());
this.append("JMSDeliveryMode : " + message.getJMSDeliveryMode());
this.append("JMSPriority : " + message.getJMSPriority());
this.append("JMSType : " + message.getJMSType());
this.append("JMSDestination : " + message.getJMSDestination());
this.append("JMSExpiration : " + message.getJMSExpiration());
this.append("JMSTimestamp : " + message.getJMSTimestamp());
this.append("JMSReplyTo : " + message.getJMSReplyTo());
java.util.Enumeration enum = message.getPropertyNames();
while(enum.hasMoreElements()){
  String str = (String)enum.nextElement();
  this.append("proprietà["+str+"] : " +   message.getObjectProperty(str));
}

La lista dei messaggi presenti nella coda, su cui l'applicazione si è registrata come listener, possono essere ricavati mediante la classe QueueBrowser. Tale classe permette di analizzare i messaggi (non ancora consumati) presenti in una certa coda senza rimuoverli.

QueueBrowser.addActionListener(new java.awt.event.ActionListener() {
  public void actionPerformed(ActionEvent e) {
    try{
      textArea.setText("");
      QueueSession queueSession =                   
                   queueConnection.createQueueSession(
                                   false,Session.AUTO_ACKNOWLEDGE);
      QueueBrowser browser = queueSession.createBrowser(queue);
      java.util.Enumeration enum = browser.getEnumeration();
      int cnt = 0;
      while(enum.hasMoreElements()){
        append("QueueBrowser ### Messaggio numero "+
                (++cnt) + " ###");
        Message message = (Message) enum.nextElement();
        dumpMessageHeaderProperties(message);
        TextMessage txtMessage = (TextMessage) message;
        append("TextMessage: " + txtMessage.getText());
      }
      browser.close();
    }
    catch(JMSException jmse) {
    ...
    }
  }
});

 

Conclusioni
In questo numero si è arricchita la versione originaria della Chat applicando l'utilizzo dei message selector per gestire gruppi di utenza omogenei, e delle Durable Subscriptions per la gestione persistente dei messaggi.

 

Bibliografia
[JMS1] S.Rossini - "JMS -La gestione dei messaggi : la teoria", Mokabyte N.60, Febbraio 2002
[JMS2] S.Rossini - "JMS -La gestione dei messaggi: la pratica", Mokabyte N.61, Marzo 2002
[ORJMS] D.A.Chappel,R.M.Haefel - Java Message Service,O'Reilly 2001
[JMSS] JMS Specification (versione 1.0.2b) - http://java.sun.com/products/jms/docs.html
[JMST] JMS tutorial - http://java.sun.com/products/jms/tutorial/index.html
[EEDOC] J2SDKEE API Documentation
[BWLS] P. Gomez,P. Zadrozny -"Java 2 Enterprise Editino with BEA Weblogic Server", Wrox 2001

 

Risorse
Scarica qui i sorgenti presentati nell'articolo

MokaByte® è un marchio registrato da MokaByte s.r.l. 
Java®, Jini® e tutti i nomi derivati sono marchi registrati da Sun Microsystems.
Tutti i diritti riservati. E' vietata la riproduzione anche parziale.
Per comunicazioni inviare una mail a info@mokabyte.it