FFMQ

Light-weight & fast JMS message queuer

FFMQ Documentation

Important Note : in the following document, whenever you see a package or file named ffmqX, you must replace the X by 3 or 4 depending on the FFMQ version you are using.

Installing

Prerequisites

- FFMQ 4.x requires a JRE version 1.5 or above to run.
- FFMQ 3.x requires a JRE version 1.4 or above to run.

Default JVM parameters are set in the bin/setenv.sh script, which is read on startup.
To change settings such as the default FFMQ heap size, edit this file.

Server side

Unpacking the distribution

Download the latest ffmqX-distribution-<version>-dist.zip archive from the GitHub releases page and unpack it somewhere.
You will get the following directory tree :

What's inside ?

Client side

On the client side, you need ffmqX-core.jar in your classpath, plus the jms-api and commons-logging jars if you don't already have them.
You will find these jars in the lib/ directory of the server package.

Configuring

Server side

The main configuration file is conf/ffmq-server.properties.

This is a standard Java properties file made of key/value pairs.
The various configuration settings are described in comments inside the distributed file.

Destination auto-creation

FFMQ supports destination auto-creation based on templates.
When enabled, on first usage of a non-existing queue (or topic), a matching template will be looked up in the templates.mapping file and used to generate a valid queue (or topic) descriptor. This operation is transparent to clients and is useful to create auto-deployable queuers.

Security

If the default XML-based security module is enabled in the main configuration file, users and permissions must be described in the security.xml file.
See syntax inside the distributed file. Please note that security is *disabled* by default.

Templates

When a queue (or topic) is auto-created, the FFMQ engine looks up a template descriptor and copies the template values into a real descriptor file (in the destinations/ folder).
Here is a sample queue template descriptor (which is actually a properties file) :

#
#  This is the default template for queues
#
name = DEFAULT_QUEUE_TEMPLATE

persistentStore.initialBlockCount = 1000
persistentStore.autoExtendAmount  = 1000
persistentStore.maxBlockCount     = 10000
persistentStore.blockSize         = 4096
persistentStore.dataFolder        = ${FFMQ_BASE}/data
persistentStore.useJournal        = true

memoryStore.maxMessages           = 1000

Here is an exhaustive list of the supported properties and their default values :

Common Properties

Property | Default | Usage
name | No default | The template name (as referenced in the templates.mapping file)
persistentStore.initialBlockCount | 0 | Initial number of available blocks in the persistent store
persistentStore.maxBlockCount | 0 | Maximum number of blocks in the persistent store. The store file may auto-extend up to this size (see persistentStore.autoExtendAmount). 0 means no persistent storage.
persistentStore.autoExtendAmount | 0 | Amount to grow by when auto-extending the store. The store is auto-extended when full.
persistentStore.blockSize | 4096 | Size of a storage block. Minimum size is 1024 (1KB). Using bigger values may improve performance with large messages but will waste more disk space.
persistentStore.dataFolder | ${FFMQ_BASE}/data | Folder where persistent files for this destination are stored. May be relative or absolute. May use system properties as variables.
persistentStore.useJournal | true | Enables the use of a synced transaction log for the persistent store. Setting this to false disables transaction logs and filesystem syncing for this destination, which is a lot faster but no longer fail-safe.
persistentStore.journal.dataFolder | same as persistentStore.dataFolder | Folder where journal files for this destination are stored. May be relative or absolute. May use system properties as variables. This is useful if you want to place journal files on different hardware devices (or on faster devices like SSDs) to improve performance.
persistentStore.journal.maxFileSize | 33554432 | Maximum size of a journal file before a new one is created. Default is 32MB. The actual journal file size may grow slightly beyond this limit if the file ends on a large transaction.
persistentStore.journal.maxWriteBatchSize | 1000 | Maximum number of operations to process per asynchronous task. This setting has various implications on lock contention and commit barriers.
persistentStore.journal.maxUnflushedJournalSize | 4194304 | Maximum size of journal data that may be buffered in memory before starting to write to the actual journal file. Default is 4MB.
persistentStore.journal.maxUncommittedStoreSize | 16777216 | Maximum size of store data that may be written to disk without syncing. Default is 16MB.
persistentStore.journal.outputBufferSize | 16384 | Size of the I/O buffer to be used when writing to a journal file. Default is 16KB.
persistentStore.journal.preAllocateFiles | true | If true, journal files are pre-allocated to improve performance.
persistentStore.syncMethod | 2 | Selects the disk sync method to use :
  • 1 : FileDescriptor.sync()
  • 2 : FileChannel.force(false)
memoryStore.maxMessages | 0 | Maximum number of non-persistent messages to store in memory.
memoryStore.overflowToPersistent | false | Allows non-persistent messages to be stored in the persistent store if the memory store is full. Note that if you use this option, message ordering is no longer guaranteed.
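
To make the interaction between the block settings more concrete, here is a minimal sketch that parses the sample queue template values with java.util.Properties and derives the initial capacity, the maximum capacity and the number of auto-extend steps between the two. This is only an illustration: the class and method names are hypothetical and not part of FFMQ, and the sketch assumes that persistentStore.autoExtendAmount is expressed in blocks (as the sample values suggest), which the property list does not state explicitly.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

public class TemplateSizing {
    // Values copied from the sample DEFAULT_QUEUE_TEMPLATE descriptor
    static final String TEMPLATE =
        "name = DEFAULT_QUEUE_TEMPLATE\n" +
        "persistentStore.initialBlockCount = 1000\n" +
        "persistentStore.autoExtendAmount  = 1000\n" +
        "persistentStore.maxBlockCount     = 10000\n" +
        "persistentStore.blockSize         = 4096\n";

    // Parse a descriptor given as text (a descriptor is a plain properties file)
    static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalStateException("Cannot parse descriptor", e);
        }
        return props;
    }

    // Read a numeric property, falling back to the documented default
    static long longValue(Properties props, String key, long defaultValue) {
        String raw = props.getProperty(key);
        return raw == null ? defaultValue : Long.parseLong(raw.trim());
    }

    // Payload capacity in bytes for a given number of blocks
    static long capacityBytes(long blockCount, long blockSize) {
        return blockCount * blockSize;
    }

    // Number of auto-extend steps needed to grow from initial to max
    // (ASSUMPTION: the extend amount is a number of blocks)
    static long extensionSteps(long initial, long max, long amount) {
        if (amount <= 0 || max <= initial) return 0;
        return (max - initial + amount - 1) / amount; // rounded up
    }

    public static void main(String[] args) {
        Properties p = load(TEMPLATE);
        long initial = longValue(p, "persistentStore.initialBlockCount", 0);
        long max = longValue(p, "persistentStore.maxBlockCount", 0);
        long extend = longValue(p, "persistentStore.autoExtendAmount", 0);
        long blockSize = longValue(p, "persistentStore.blockSize", 4096);

        System.out.println(capacityBytes(initial, blockSize)); // 4096000
        System.out.println(capacityBytes(max, blockSize));     // 40960000
        System.out.println(extensionSteps(initial, max, extend)); // 9
    }
}
```

With the sample values, the store starts at 1000 blocks and can grow nine times by 1000 blocks before reaching the 10000-block limit.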

Topic Properties

Property | Default | Usage
subscriberFailurePolicy | 1 | Defines this topic's behavior when an exception occurs while pushing a message to a subscriber (bit mask) :
  • 0 : Silently ignore errors
  • 1 : Log exception (server-side)
  • 2 : Propagate exception to message producer
  • 3 : Do both (1 + 2)
subscriberOverflowPolicy | 1 | Defines this topic's behavior when a subscriber queue is full (bit mask) :
  • 0 : Silently ignore and skip message
  • 1 : Log exception (server-side)
  • 2 : Propagate exception to message producer.
    Note that enabling this feature in conjunction with the client-side 'producer.retryOnQueueFull' property makes it possible to throttle the producer when a subscriber falls behind.
  • 3 : Do both (1 + 2)
partitionsKeysToIndex | Not set | Specifies the message headers on which to index topic subscribers that use a message selector.

This can improve performance when using a partitioned topic pattern with a large number of subscribers (reducing dispatch complexity from O(n) to O(1)).

Typical use-case : most topic subscribers have a message selector like :

  • (Consumer1) someHeader='key1'
  • (Consumer2) someHeader='key2'
  • ...

Indexing the subscribers on 'someHeader' will improve dispatch performance.

Note: Indexing only works with operators '=' (equals) and 'in'.
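
The idea behind subscriber indexing can be sketched in plain Java as follows. This is not FFMQ's implementation, only a minimal illustration under simplifying assumptions: every subscriber uses a selector of the form someHeader='key', and the Subscriber class and all method names are hypothetical. Without an index, each message is tested against every subscriber; with an index keyed on the header value, the matching subscribers are found with a single hash lookup.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SelectorIndexSketch {
    /** Hypothetical subscriber whose selector is: someHeader='<key>' */
    static final class Subscriber {
        final String name;
        final String key;
        Subscriber(String name, String key) { this.name = name; this.key = key; }
    }

    /** Two subscribers matching the typical use-case described above. */
    static List<Subscriber> sampleSubscribers() {
        List<Subscriber> subscribers = new ArrayList<Subscriber>();
        subscribers.add(new Subscriber("Consumer1", "key1"));
        subscribers.add(new Subscriber("Consumer2", "key2"));
        return subscribers;
    }

    /** Unindexed dispatch: tests every subscriber, O(n) per message. */
    static List<String> dispatchByScan(List<Subscriber> subscribers, String headerValue) {
        List<String> targets = new ArrayList<String>();
        for (Subscriber s : subscribers) {
            if (s.key.equals(headerValue)) targets.add(s.name);
        }
        return targets;
    }

    /** Builds the index once: header value -> names of interested subscribers. */
    static Map<String, List<String>> buildIndex(List<Subscriber> subscribers) {
        Map<String, List<String>> index = new HashMap<String, List<String>>();
        for (Subscriber s : subscribers) {
            List<String> bucket = index.get(s.key);
            if (bucket == null) {
                bucket = new ArrayList<String>();
                index.put(s.key, bucket);
            }
            bucket.add(s.name);
        }
        return index;
    }

    /** Indexed dispatch: one hash lookup, independent of the subscriber count. */
    static List<String> dispatchByIndex(Map<String, List<String>> index, String headerValue) {
        List<String> bucket = index.get(headerValue);
        return bucket == null ? new ArrayList<String>() : bucket;
    }

    public static void main(String[] args) {
        List<Subscriber> subscribers = sampleSubscribers();
        Map<String, List<String>> index = buildIndex(subscribers);
        System.out.println(dispatchByScan(subscribers, "key2")); // [Consumer2]
        System.out.println(dispatchByIndex(index, "key2"));      // [Consumer2]
    }
}
```

Both strategies return the same subscribers; only the cost per message differs. A map keyed on the header value only helps with equality-style matching, which is consistent with the restriction to the '=' and 'in' operators.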

Notes :

Destinations Descriptors

A destination descriptor is much like a template descriptor but it represents an actual existing destination.
Descriptors are automatically created from a template or created on demand from the administration utility (see below). The syntax is the same as for a template, except that the 'name' property refers to the actual destination name.

Notes :

Command-line utility

A command-line utility is provided to administer the server. It must be configured in the conf/ffmq-admin-client.properties file.
See usage below.

Client side

Here is the default implementation behavior regarding synchronous/asynchronous calls :

Client properties

The client-side driver looks for an optional resource file named ffmqX.properties in the classpath root. If you need to override some default settings, you can create this file and add it to your classpath.
You can refer to net/timewalker/ffmqX/ffmqX.properties in ffmqX-core.jar to see the available properties and their default values :

# Time to wait for an answer from the server (seconds)
transport.timeout=120

# Time to wait for a connection attempt (seconds)
transport.tcp.connectTimeout=30

# SSL options
transport.tcp.ssl.protocol=SSLv3
transport.tcp.ssl.ignoreCertificates=false
# If you need to use a custom TrustManager, you can specify a TrustManager class name to use instead of the default JDK/JRE one here
#transport.tcp.ssl.trustManager=<custom trust manager class name>

# Indicate if a consumer should send message acknowledgements synchronously or not
consumer.sendAcksAsync=true

# Indicate if a producer should wait for an acknowledgment when sending a NON_PERSISTENT message on a non-transacted session
#   Default is true for maximum performance at the expense of delivery acknowledgment.
#   In particular, "destination full" errors are not returned to the client in asynchronous mode and are only visible on the server-side 
producer.allowSendAsync=true

# Indicate if a producer should retry if a target queue is full
producer.retryOnQueueFull=true

# Indicate how much time a producer should wait if a target queue is full (milliseconds)
# 0 = unlimited
producer.retryTimeout=30000

Note that in most cases you won't have to change any of these default settings.

Overriding settings using system properties

In FFMQ4, it is also possible to override client-side settings via system properties.
Use the same property name as in ffmq4.properties, but with an extra 'ffmq4.' prefix.

Example : to override 'producer.retryOnQueueFull' you must set the system property 'ffmq4.producer.retryOnQueueFull'

System properties take precedence over values in the properties files.
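
The precedence rule can be pictured with the following minimal sketch. It does not reproduce FFMQ's own lookup code; the class and method names are hypothetical and the sketch only mimics the documented behavior: a system property carrying the 'ffmq4.' prefix wins over the value found in the properties file.

```java
import java.util.Properties;

public class ClientSettingOverride {
    // Prefix documented for FFMQ4 system property overrides
    static final String PREFIX = "ffmq4.";

    // Returns the system property override if present, else the file value
    static String resolve(Properties fileSettings, String name) {
        String override = System.getProperty(PREFIX + name);
        return override != null ? override : fileSettings.getProperty(name);
    }

    public static void main(String[] args) {
        // Stand-in for the values read from ffmq4.properties
        Properties fileSettings = new Properties();
        fileSettings.setProperty("producer.retryOnQueueFull", "true");
        fileSettings.setProperty("producer.retryTimeout", "30000");

        // Equivalent to starting the JVM with -Dffmq4.producer.retryOnQueueFull=false
        System.setProperty("ffmq4.producer.retryOnQueueFull", "false");

        System.out.println(resolve(fileSettings, "producer.retryOnQueueFull")); // false
        System.out.println(resolve(fileSettings, "producer.retryTimeout"));     // 30000
    }
}
```

In a real application the override would typically be passed on the JVM command line with -D, so that no code change is needed.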

Spring integration

The FFMQ provider can be declared in Spring like any other JMS client implementation.

Here is how you would define a JMS connection factory :

<bean id="JMSConnectionFactory" class="org.springframework.jndi.JndiObjectFactoryBean">
  <property name="jndiName" value="factory/ConnectionFactory"/>
  <property name="jndiEnvironment">
    <props>
      <prop key="java.naming.factory.initial">net.timewalker.ffmqX.jndi.FFMQInitialContextFactory</prop>
      <prop key="java.naming.provider.url">tcp://localhost:10002</prop>
    </props>
  </property>
</bean>

And here is how to obtain a JMS destination bean from JNDI :

<bean id="myJMSQueue" class="org.springframework.jndi.JndiObjectFactoryBean">
  <property name="jndiName" value="queue/myJMSQueue"/>
  <property name="jndiEnvironment">
    <props>
      <prop key="java.naming.factory.initial">net.timewalker.ffmqX.jndi.FFMQInitialContextFactory</prop>
    </props>
  </property>
</bean>

You can then declare standard Spring JMS templates and listeners. See the Spring documentation for details.

Tomcat integration

The FFMQ provider and its destinations can be declared as resource references in a Tomcat <Context>.

Here is an example definition :

<Resource name="jms/ConnectionFactory"
  type="net.timewalker.ffmqX.jndi.FFMQConnectionFactory"
  factory="net.timewalker.ffmqX.jndi.JNDIObjectFactory"
  providerURL="tcp://localhost:10002"
  userName="test" password="test" />

<Resource name="jms/myQueue"
  type="net.timewalker.ffmqX.common.destination.QueueRef"
  factory="net.timewalker.ffmqX.jndi.JNDIObjectFactory"
  queueName="MYQUEUE" />

<Resource name="jms/myTopic"
  type="net.timewalker.ffmqX.common.destination.TopicRef"
  factory="net.timewalker.ffmqX.jndi.JNDIObjectFactory"
  queueName="MYTOPIC" />

And here is how to retrieve those resources using the Tomcat default InitialContext :

import javax.jms.ConnectionFactory;
import javax.jms.Queue;
import javax.jms.Topic;
import javax.naming.InitialContext;

// Obtaining the default JNDI context provided by Tomcat
InitialContext init = new InitialContext();

// Retrieving the connection factory
ConnectionFactory connectionFactory = (ConnectionFactory) init.lookup("java:comp/env/jms/ConnectionFactory");

// Retrieving the queue or topic reference
Queue myQueue = (Queue) init.lookup("java:comp/env/jms/myQueue");
Topic myTopic = (Topic) init.lookup("java:comp/env/jms/myTopic");

Running

Server side

Starting and stopping

- To start the server, use the ffmq-server.bat or ffmq-server.sh shell script in the server bin/ directory.
- To stop the server, use the ffmq-shutdown.bat or ffmq-shutdown.sh shell script in the server bin/ directory.
(If you want to remotely stop the server, you need to enable the administrative queues in the server configuration and use the ffmq-remote-shutdown.bat or ffmq-remote-shutdown.sh shell script)

Notes :

Disk space usage

The persistent storage for a given destination is made of two or more files :

The disk space required for a queue is approximately : blockCount*(blockSize+13)+2*maxJournalSize
Note that messages are split over complete blocks, which means a given message store can hold between blockCount / ceil(maxMessageSize / blockSize) messages (worst case) and blockCount messages (best case).
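
As a worked example of the disk space formula, the sketch below evaluates it for a queue using the sample template values (10000 blocks of 4096 bytes). The class and method names are hypothetical, and the sketch assumes that maxJournalSize corresponds to the persistentStore.journal.maxFileSize setting (32MB by default), which this section does not state explicitly.

```java
public class DiskSpaceEstimate {
    // Per-block overhead in bytes, taken from the formula above
    static final long BLOCK_OVERHEAD = 13;

    // blockCount*(blockSize+13)+2*maxJournalSize
    static long requiredBytes(long blockCount, long blockSize, long maxJournalSize) {
        return blockCount * (blockSize + BLOCK_OVERHEAD) + 2 * maxJournalSize;
    }

    public static void main(String[] args) {
        long blockCount = 10000;          // persistentStore.maxBlockCount
        long blockSize = 4096;            // persistentStore.blockSize
        long maxJournalSize = 33554432L;  // ASSUMPTION: persistentStore.journal.maxFileSize

        // 10000 * 4109 + 2 * 33554432
        System.out.println(requiredBytes(blockCount, blockSize, maxJournalSize)); // 108198864
    }
}
```

In this configuration the two journal files account for more than half of the estimate (about 64MB out of roughly 103MB), so the journal size is worth considering when provisioning many small queues.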

Client side

JNDI integration

The client implementation can be accessed through JNDI like any other standard JMS implementation.
Here is the JNDI configuration to use :

Here is some sample source code if you need to do it programmatically :

// Create and initialize a JNDI context
Hashtable env = new Hashtable();
env.put(Context.INITIAL_CONTEXT_FACTORY, FFMQConstants.JNDI_CONTEXT_FACTORY);
env.put(Context.PROVIDER_URL, "tcp://localhost:10002");
Context context = new InitialContext(env);

// Lookup a connection factory in the context
ConnectionFactory connFactory = (ConnectionFactory)context.lookup(FFMQConstants.JNDI_CONNECTION_FACTORY_NAME);

// Obtain a JMS connection from the factory
Connection conn = connFactory.createConnection();
conn.start();
...

Note that to provide JMS 1.0.2 compliance, the following connection factories are also declared in the JNDI context :

Command-line administration

The FFMQ server can be (remotely) administered through a command-line utility.
To execute this utility, use the ffmq-admin-client.bat or ffmq-admin-client.sh shell script in the server bin/ directory.
Executing the utility without command-line parameters will display usage information :

Command-line parameters
-------------------------
  -command       : the command to execute (createQueue,createTopic,deleteQueue,deleteTopic,shutdown)
  -conf   : path to a properties file (optional)

All other variables should be passed as variable=value

When dealing with destinations you can pass all available properties (see 'Templates' above) with <propertyName>=<value>

Examples :

# Create queue FOO with a non-persistent message capacity of 1000
ffmq-admin-client -command createQueue name=FOO memoryStore.maxMessages=1000
	
# Delete topic BAR
ffmq-admin-client -command deleteTopic name=BAR

# Ask the server to shutdown
ffmq-admin-client -command shutdown

Command-line JMX console

The FFMQ server can be (remotely) monitored through JMX using generic tools or the provided command-line utility.
To execute this utility, use the ffmq-jmx-console.bat or ffmq-jmx-console.sh shell script in the server bin/ directory.

Embedding a server instance

Spring integration

You can start one or more engine instances in a given ApplicationContext, using a provided helper bean :

net.timewalker.ffmqX.spring.FFMQServerBean

Here is a typical Spring XML definition for this :

<bean id="FFMQServer" class="net.timewalker.ffmqX.spring.FFMQServerBean" init-method="start" destroy-method="stop">
  <property name="engineName" value="myEngine"/>
  <property name="configLocation" value="../conf/ffmq-server.properties"/>
</bean>

Notes :

Java integration

A server instance can be created directly from Java code if needed.
Here is an example :

package net.timewalker.ffmqX.sample.embedded;

import java.io.FileInputStream;
import java.util.Properties;

import net.timewalker.ffmqX.listeners.ClientListener;
import net.timewalker.ffmqX.listeners.tcp.io.TcpListener;
import net.timewalker.ffmqX.local.FFMQEngine;
import net.timewalker.ffmqX.management.destination.definition.QueueDefinition;
import net.timewalker.ffmqX.utils.Settings;

/**
 * Embedded FFMQ sample
 */
public class EmbeddedFFMQSample implements Runnable
{
    private FFMQEngine engine;
    
    /*
     * (non-Javadoc)
     * @see java.lang.Runnable#run()
     */
    public void run()
    {
        try
        {
            // Create engine settings
            Settings settings = createEngineSettings();
            
            // Create the engine itself
            engine = new FFMQEngine("myLocalEngineName", settings);
            //   -> myLocalEngineName will be the engine name.
            //       - It should be unique in a given JVM
            //       - This is the name to be used by local clients to establish
            //         an internal JVM connection (high performance)
            //         Use the following URL for clients :   vm://myLocalEngineName
            //
            
            // Deploy the engine
            System.out.println("Deploying engine : "+engine.getName());
            engine.deploy();
            //  - The FFMQ engine is not functional until deployed.
            //  - The deploy operation re-activates all persistent queues
            //    and recovers them if the engine was not properly closed.
            //    (May take some time for large queues)

            // Adding a TCP based client listener
            System.out.println("Starting listener ...");
            ClientListener tcpListener = new TcpListener(engine,"0.0.0.0",10002,settings,null);
            tcpListener.start();
            
            // This is how you can programmatically define a new queue
            if (!engine.getDestinationDefinitionProvider().hasQueueDefinition("foo"))
            {
                QueueDefinition queueDef = new QueueDefinition();
                queueDef.setName("foo");
                queueDef.setUseJournal(false);
                queueDef.setMaxNonPersistentMessages(1000);
                queueDef.check();
                engine.createQueue(queueDef);
            }
            
            // You could also define a queue using some java Properties
            if (!engine.getDestinationDefinitionProvider().hasQueueDefinition("foo2"))
            {
                Properties queueProps = new Properties();
                queueProps.put("name", "foo2");
                queueProps.put("persistentStore.useJournal", "false");
                queueProps.put("memoryStore.maxMessages", "1000");
                QueueDefinition queueDef2 = new QueueDefinition(new Settings(queueProps));
                engine.createQueue(queueDef2);
            }
            
            // Run for some time
            System.out.println("Running ...");
            Thread.sleep(30*1000);
            
            // Stopping the listener
            System.out.println("Stopping listener ...");
            tcpListener.stop();
            
            // Undeploy the engine
            System.out.println("Undeploying engine ...");
            engine.undeploy();
            //   - It is important to properly shutdown the engine 
            //     before stopping the JVM to make sure current transactions 
            //     are nicely completed and storages properly closed.
            
            System.out.println("Done.");
        }
        catch (Exception e)
        {
            // Oops
            e.printStackTrace();
        }
    }
    
    private Settings createEngineSettings()
    {
        // Various ways of creating engine settings
        
        // 1 - From a properties file
        Properties externalProperties = new Properties();
        try
        {
            FileInputStream in = new FileInputStream("../conf/ffmq-server.properties");
            try
            {
                externalProperties.load(in);
            }
            finally
            {
                // Always release the file handle, even if loading fails
                in.close();
            }
        }
        catch (Exception e)
        {
            throw new RuntimeException("Cannot load external properties",e);
        }
        Settings settings = new Settings(externalProperties);
        
        // 2 - Explicit Java code
//        Settings settings = new Settings();
//        
//        settings.setStringProperty(FFMQCoreSettings.DESTINATION_DEFINITIONS_DIR, ".");
//        settings.setStringProperty(FFMQCoreSettings.BRIDGE_DEFINITIONS_DIR, ".");
//        settings.setStringProperty(FFMQCoreSettings.TEMPLATES_DIR, ".");
//        settings.setStringProperty(FFMQCoreSettings.DEFAULT_DATA_DIR, ".");
//        ...
        
        return settings;
    }
    
    /**
     * @param args
     */
    public static void main(String[] args)
    {
        System.setProperty("FFMQ_BASE", "..");
        
        new EmbeddedFFMQSample().run();
    }
}
    
