FFMQ Documentation
Important Note : in the following document, whenever you see a package or file named ffmqX, you must replace the X by 3 or 4 depending on the FFMQ version you are using.
Installing
Prerequisites
- FFMQ 4.x requires a JRE version 1.5 or above to run.
- FFMQ 3.x requires a JRE version 1.4 or above to run.
Default JVM parameters are set in the bin/setenv.sh script, which is read on startup.
If you want to change settings such as the default FFMQ heap size, this is the file to edit.
Server side
Unpacking the distribution
Download the latest ffmqX-distribution-<version>-dist.zip archive from the GitHub releases page, and unpack it somewhere.
You will get the following directory tree :
- bin/
- bridges/
- conf/
- conf/templates/
- data/
- destinations/
- docs/
- lib/
- logs/
What's inside ?
- The bin/ directory contains shell scripts to start, stop and manage the FFMQ server.
- The bridges/ directory holds JMS bridges descriptors loaded by the server on startup.
- The conf/ directory contains various configuration files used by the server (see Configuring below).
- The conf/templates/ directory contains default templates used to create new destinations (queues or topics).
- The data/ directory (empty upon installation) is the default data folder where queues store/index files will be stored.
- The destinations/ directory (empty upon installation) is the default descriptor folder where queues and topics descriptors will be created.
- The docs/ directory contains these documentation files.
- The lib/ directory contains all the jar files used by the server.
- The logs/ directory will hold the server log files.
Client side
On the client side, you need ffmqX-core.jar in your classpath (plus the jms-api and commons-logging jars if you don't already have them).
You will find those jars in the lib/ directory of the server package.
Configuring
Server side
The main configuration file is conf/ffmq-server.properties.
This is a standard Java properties file made of key/value pairs.
The configuration settings are described in comments inside the distributed file.
Destination auto-creation
FFMQ supports destination auto-creation based on templates.
When enabled, on first usage of a non-existing queue (or topic), a matching template will be looked up in the
templates.mapping file and used to generate a valid queue (or topic) descriptor.
This operation is transparent to clients and is useful to create auto-deployable queues.
Security
If the default XML-based security module is enabled in the main configuration file, users and permissions must
be described in the security.xml file.
See syntax inside the distributed file. Please note that security is *disabled* by default.
Templates
When a queue (or topic) is auto-created, the FFMQ engine looks up a template descriptor and
copies the template values into a real descriptor file (in the destinations/ folder).
Here is a sample queue template descriptor (which is actually a properties file) :

    #
    # This is the default template for queues
    #
    name = DEFAULT_QUEUE_TEMPLATE
    persistentStore.initialBlockCount = 1000
    persistentStore.autoExtendAmount = 1000
    persistentStore.maxBlockCount = 10000
    persistentStore.blockSize = 4096
    persistentStore.dataFolder = ${FFMQ_BASE}/data
    persistentStore.useJournal = true
    memoryStore.maxMessages = 1000

Here is an exhaustive list of the supported properties and their default values :
Common Properties
Property | Default | Usage |
---|---|---|
name | No default | The template name (as referenced in the templates.mapping file) |
persistentStore.initialBlockCount | 0 | Initial number of available blocks in the persistent store |
persistentStore.maxBlockCount | 0 | Maximum number of blocks in the persistent store. The store file may auto-extend up to this size (see persistentStore.autoExtendAmount). 0 means no persistent storage. |
persistentStore.autoExtendAmount | 0 | Amount to grow by when auto-extending the store. The store is auto-extended when full. |
persistentStore.blockSize | 4096 | Size of a storage block. Minimum size is 1024 (1KB). Using bigger values may improve performance with large messages but will waste more disk space. |
persistentStore.dataFolder | ${FFMQ_BASE}/data | Folder where to store persistent files for this destination. May be relative or absolute. May use system properties as variables. |
persistentStore.useJournal | true | Enables the use of a synced transaction log for the persistent store. Setting this to false disables transaction logs and filesystem syncing for this destination, which is a lot faster but no longer fail-safe. |
persistentStore.journal.dataFolder | same as persistentStore.dataFolder | Folder where to store journal files for this destination. May be relative or absolute. May use system properties as variables. This is useful if you want to place journal files on different hardware devices (or on faster devices like SSDs) to improve performance. |
persistentStore.journal.maxFileSize | 33554432 | Maximum size of a journal file before creating a new one. Default is 32MB. Actual journal files size may grow a little larger than this limit if it ends on a large transaction. |
persistentStore.journal.maxWriteBatchSize | 1000 | Maximum number of operations to process per asynchronous task. This setting has various implications on lock contention and commit barriers. |
persistentStore.journal.maxUnflushedJournalSize | 4194304 | Maximum size of journal data that may be buffered in memory before starting to write to the actual journal file. Default is 4MB. |
persistentStore.journal.maxUncommittedStoreSize | 16777216 | Maximum size of store data that may be written to disk without syncing. Default is 16MB. |
persistentStore.journal.outputBufferSize | 16384 | Size of the I/O buffer to be used when writing to a journal file. Default is 16KB. |
persistentStore.journal.preAllocateFiles | true | If true, journal files are pre-allocated to improve performance. |
persistentStore.syncMethod | 2 | Selects the disk sync method to use. |
memoryStore.maxMessages | 0 | Maximum number of non-persistent messages to store in memory. |
memoryStore.overflowToPersistent | false | Allow non-persistent messages to be stored in the persistent store if the memory store is full. Note that if you use this option, message ordering is no longer guaranteed. |
Topic Properties
Property | Default | Usage |
---|---|---|
subscriberFailurePolicy | 1 | Defines the topic's behavior when an exception occurs while pushing a message to a subscriber (bit mask). |
subscriberOverflowPolicy | 1 | Defines the topic's behavior when a subscriber queue is full (bit mask). |
partitionsKeysToIndex | Not set | Specifies the message headers on which to index topic subscribers that use a message selector. This can improve performance when using a partitioned topic pattern with a large number of subscribers (reducing dispatch complexity from O(n) to O(1)). Typical use case : most topic subscribers use a message selector that tests the same header, for example 'someHeader'. Indexing the subscribers on 'someHeader' will improve dispatch performance. Note: Indexing only works with operators '=' (equals) and 'in'. |
Notes :
- The syntax is the same for a queue or topic
- For a topic, the various parameters will be applied to the backing queues that hold the messages for each topic subscriber.
- There will be one temporary queue for each subscriber, so you need to take this into account when sizing your topic.
- The filename of a queue template must match the pattern queueTemplate-*.properties to be picked up.
- The filename of a topic template must match the pattern topicTemplate-*.properties to be picked up.
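The O(n) to O(1) dispatch claim made for partitionsKeysToIndex can be pictured with a small sketch. This is only an illustration of the general indexing idea, not FFMQ's internal dispatch code: the class and method names are hypothetical, and it assumes every subscriber selects on exactly one value of the indexed header.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration only: not FFMQ's internal dispatch code. */
final class SubscriberIndexSketch {

    // Indexed header value -> subscribers whose selector is "<header> = '<value>'"
    private final Map<String, List<String>> byHeaderValue = new HashMap<>();

    /** Registers a subscriber whose selector matches exactly one header value. */
    void subscribe(String subscriberId, String expectedHeaderValue) {
        byHeaderValue.computeIfAbsent(expectedHeaderValue, k -> new ArrayList<>())
                     .add(subscriberId);
    }

    /** Finds the target subscribers with one hash lookup instead of testing every selector. */
    List<String> dispatchTargets(String messageHeaderValue) {
        List<String> targets = byHeaderValue.get(messageHeaderValue);
        return targets == null ? Collections.<String>emptyList() : targets;
    }

    public static void main(String[] args) {
        SubscriberIndexSketch index = new SubscriberIndexSketch();
        index.subscribe("subscriber-A", "EU");
        index.subscribe("subscriber-B", "US");
        index.subscribe("subscriber-C", "EU");
        System.out.println(index.dispatchTargets("EU"));
    }
}
```

Without an index, each published message would have to be tested against every subscriber's selector. With it, the cost of finding the targets no longer depends on the total number of subscribers.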
Destinations Descriptors
A destination descriptor is much like a template descriptor but it represents an actual existing destination.
Descriptors are automatically created from a template or created on demand from the administration utility (see below).
Syntax is the same as for a template, only the 'name' property refers to the actual destination name.
Notes :
- The syntax is the same for a queue or topic
- The filename of a queue definition must start with queue-
- The filename of a topic definition must start with topic-
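The relationship between a template and a descriptor can be sketched in a few lines of Java. This is a conceptual illustration, not the engine's own code: the class and method names are hypothetical, and the queue-<name>.properties file name is an assumption based only on the "must start with queue-" rule above.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;

/** Hypothetical illustration of "copy the template, then set the real name". */
final class DescriptorFromTemplateSketch {

    /** Copies every template property, then replaces 'name' with the destination name. */
    static Properties toDescriptor(Properties template, String destinationName) {
        Properties descriptor = new Properties();
        descriptor.putAll(template);
        descriptor.setProperty("name", destinationName);
        return descriptor;
    }

    /** Writes the descriptor as queue-<name>.properties (assumed file name) in the given folder. */
    static Path writeQueueDescriptor(Path destinationsDir, Properties descriptor) throws IOException {
        Path file = destinationsDir.resolve("queue-" + descriptor.getProperty("name") + ".properties");
        try (OutputStream out = Files.newOutputStream(file)) {
            descriptor.store(out, "Generated from a template (illustrative sketch)");
        }
        return file;
    }

    public static void main(String[] args) throws IOException {
        Properties template = new Properties();
        template.setProperty("name", "DEFAULT_QUEUE_TEMPLATE");
        template.setProperty("persistentStore.blockSize", "4096");
        template.setProperty("memoryStore.maxMessages", "1000");

        // A temporary folder stands in for the server's destinations/ directory
        Path dir = Files.createTempDirectory("destinations");
        Path file = writeQueueDescriptor(dir, toDescriptor(template, "FOO"));
        System.out.println("Wrote " + file);
    }
}
```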
Command-line utility
There is a command-line utility to administer the server. The utility must be configured
in the conf/ffmq-admin-client.properties file.
See usage below.
Client side
Here is the default implementation behavior regarding synchronous/asynchronous calls :
- Producing messages :

Session mode | PERSISTENT | NON-PERSISTENT |
---|---|---|
TRANSACTED | Async* | Async* |
DUPS_OK_ACKNOWLEDGE | Sync | Async |
AUTO_ACKNOWLEDGE | Sync | Async |
CLIENT_ACKNOWLEDGE | Sync | Async |

- Consuming messages :

Session mode | PERSISTENT | NON-PERSISTENT |
---|---|---|
TRANSACTED | Sync | Sync |
DUPS_OK_ACKNOWLEDGE | Async | Async |
AUTO_ACKNOWLEDGE | Sync | Sync |
CLIENT_ACKNOWLEDGE | Sync | Sync |
Client properties
The client-side driver looks for an optional resource file named ffmqX.properties in the classpath root.
If you need to override some default settings, you can create this file and add it to your classpath.
You can refer to net/timewalker/ffmqX/ffmqX.properties in ffmqX-core.jar to
see the available properties and their default values :

    # Time to wait for an answer from the server (seconds)
    transport.timeout=120

    # Time to wait for a connection attempt (seconds)
    transport.tcp.connectTimeout=30

    # SSL options
    transport.tcp.ssl.protocol=SSLv3
    transport.tcp.ssl.ignoreCertificates=false
    # If you need to use a custom TrustManager, you can specify a TrustManager class name to use instead of the default JDK/JRE one here
    #transport.tcp.ssl.trustManager=<custom trust manager class name>

    # Indicate if a consumer should send message acknowledgements synchronously or not
    consumer.sendAcksAsync=true

    # Indicate if a producer should wait for an acknowledgment when sending a NON_PERSISTENT message on a non-transacted session
    # Default is true for maximum performance at the expense of delivery acknowledgment.
    # In particular, "destination full" errors are not returned to the client in asynchronous mode and are only visible on the server-side
    producer.allowSendAsync=true

    # Indicate if a producer should retry if a target queue is full
    producer.retryOnQueueFull=true

    # Indicate how much time a producer should wait if a target queue is full (milliseconds)
    # 0 = unlimited
    producer.retryTimeout=30000

Note that in most cases you won't have to change any of these default settings.
Overriding settings using system properties
In FFMQ4, it is also possible to override client-side settings via system properties.
Use the same property name as in ffmq4.properties, but with an extra 'ffmq4.' prefix.
Example : to override 'producer.retryOnQueueFull' you must set the system property 'ffmq4.producer.retryOnQueueFull'
System properties take precedence over values in the properties files.
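The precedence rule can be pictured with the following sketch. It does not reproduce the driver's actual lookup code: the class and the resolve helper are hypothetical, and only the 'ffmq4.' prefix and the 'system property first' order come from the text above.

```java
import java.util.Properties;

/** Hypothetical illustration of the documented lookup order. */
final class ClientSettingOverrideSketch {

    static final String PREFIX = "ffmq4.";

    /** Resolves a setting: system property first, then the properties file, then the default. */
    static String resolve(String key, Properties fileSettings, String defaultValue) {
        String fromSystem = System.getProperty(PREFIX + key);
        if (fromSystem != null) {
            return fromSystem;
        }
        return fileSettings.getProperty(key, defaultValue);
    }

    public static void main(String[] args) {
        // Stands in for the content of ffmq4.properties
        Properties fileSettings = new Properties();
        fileSettings.setProperty("producer.retryOnQueueFull", "true");

        // Equivalent to passing -Dffmq4.producer.retryOnQueueFull=false on the JVM command line
        System.setProperty("ffmq4.producer.retryOnQueueFull", "false");

        System.out.println(resolve("producer.retryOnQueueFull", fileSettings, "true"));
    }
}
```

In practice the override is usually given on the command line with -D rather than set from code, so that it is in place before the client driver reads its settings.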
Spring integration
The FFMQ provider can be declared in Spring like any other JMS client implementation.
Here is how you would define a JMS connection factory :

    <bean id="JMSConnectionFactory" class="org.springframework.jndi.JndiObjectFactoryBean">
        <property name="jndiName" value="factory/ConnectionFactory"/>
        <property name="jndiEnvironment">
            <props>
                <prop key="java.naming.factory.initial">net.timewalker.ffmqX.jndi.FFMQInitialContextFactory</prop>
                <prop key="java.naming.provider.url">tcp://localhost:10002</prop>
            </props>
        </property>
    </bean>

And here is how to obtain a JMS destination bean from JNDI :

    <bean id="myJMSQueue" class="org.springframework.jndi.JndiObjectFactoryBean">
        <property name="jndiName" value="queue/myJMSQueue"/>
        <property name="jndiEnvironment">
            <props>
                <prop key="java.naming.factory.initial">net.timewalker.ffmqX.jndi.FFMQInitialContextFactory</prop>
            </props>
        </property>
    </bean>

Then, you can declare standard Spring JMS templates and listeners. See the Spring documentation for details.
Tomcat integration
The FFMQ provider and destination can be declared as a resource reference in a Tomcat <Context>.
Here is an example definition :

    <Resource name="jms/ConnectionFactory"
              type="net.timewalker.ffmqX.jndi.FFMQConnectionFactory"
              factory="net.timewalker.ffmqX.jndi.JNDIObjectFactory"
              providerURL="tcp://localhost:10002"
              userName="test"
              password="test" />
    <Resource name="jms/myQueue"
              type="net.timewalker.ffmqX.common.destination.QueueRef"
              factory="net.timewalker.ffmqX.jndi.JNDIObjectFactory"
              queueName="MYQUEUE" />
    <Resource name="jms/myTopic"
              type="net.timewalker.ffmqX.common.destination.TopicRef"
              factory="net.timewalker.ffmqX.jndi.JNDIObjectFactory"
              queueName="MYTOPIC" />

And here is how to retrieve those resources using the Tomcat default InitialContext :

    InitialContext init = new InitialContext();

    // Retrieving the connection factory
    ConnectionFactory connectionFactory = (ConnectionFactory) init.lookup("java:comp/env/jms/ConnectionFactory");

    // Retrieving the queue or topic reference
    Queue myQueue = (Queue)init.lookup("java:comp/env/jms/myQueue");
    Topic myTopic = (Topic)init.lookup("java:comp/env/jms/myTopic");

Running
Server side
Starting and stopping
- To start the server, use the ffmq-server.bat or ffmq-server.sh shell script in the server bin/ directory.
- To stop the server, use the ffmq-shutdown.bat or ffmq-shutdown.sh shell script in the server bin/ directory.
(If you want to remotely stop the server, you need to enable the administrative queues in the server configuration and use the ffmq-remote-shutdown.bat or ffmq-remote-shutdown.sh shell script)
Notes :
- The server will start a clean shutdown sequence if it receives a SIGTERM signal.
- It is not recommended to abruptly kill the server process as it will cause a recovery of all the active persistent storages on next restart.
Diskspace usage
The persistent storage for a given destination is made of two or more files :
- A data file with extension .store (blockCount*blockSize bytes large), containing the messages
- An index file with extension .index (~ blockCount*13 bytes large) acting as an allocation table
- Up to two journal files (2*maxJournalSize bytes) during execution
The required disk space for a queue is close to : blockCount*(blockSize+13)+2*maxJournalSize
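Note that messages are split over whole blocks, so a message always occupies a whole number of blocks. A given message store can therefore hold between :
blockCount / ceil(maxMessageSize / blockSize) messages (worst case, every message has the maximum size) and blockCount messages (best case, every message fits in a single block).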
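As a worked example of the sizing arithmetic, here is a small sketch. The class and method names are hypothetical, and it assumes a message always occupies a whole number of blocks (its size rounded up to the next block boundary).

```java
/** Hypothetical sizing helper based on the formulas in this section. */
final class StoreSizingSketch {

    // Approximate index overhead per block, taken from the .index estimate above
    static final long INDEX_BYTES_PER_BLOCK = 13;

    /** blockCount*(blockSize+13) + 2*maxJournalSize */
    static long requiredDiskBytes(long blockCount, long blockSize, long maxJournalSize) {
        return blockCount * (blockSize + INDEX_BYTES_PER_BLOCK) + 2 * maxJournalSize;
    }

    /** Blocks used by one message: its size rounded up to whole blocks (at least one). */
    static long blocksPerMessage(long messageSize, long blockSize) {
        return Math.max(1, (messageSize + blockSize - 1) / blockSize);
    }

    /** Worst-case capacity, when every message has the maximum size. */
    static long worstCaseMessageCount(long blockCount, long maxMessageSize, long blockSize) {
        return blockCount / blocksPerMessage(maxMessageSize, blockSize);
    }

    public static void main(String[] args) {
        long blockCount = 10000;        // persistentStore.maxBlockCount
        long blockSize = 4096;          // persistentStore.blockSize
        long maxJournalSize = 33554432; // persistentStore.journal.maxFileSize (32MB)

        // 10000*(4096+13) + 2*33554432 = 108198864 bytes (about 103MB)
        System.out.println(requiredDiskBytes(blockCount, blockSize, maxJournalSize));

        // A 10000-byte message needs 3 blocks of 4096 bytes, so at most 3333 such messages fit
        System.out.println(worstCaseMessageCount(blockCount, 10000, blockSize));
    }
}
```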
Client side
JNDI integration
The client implementation can be accessed through JNDI like any other standard JMS implementation.
Here is the JNDI configuration to use :
- Naming Context Factory : net.timewalker.ffmqX.jndi.FFMQInitialContextFactory
- Connection Factory JNDI Name : factory/ConnectionFactory
- Provider URL : tcp://<hostname>:10002
Here is a sample source code if you need to do it programmatically :

    // Create and initialize a JNDI context
    Hashtable env = new Hashtable();
    env.put(Context.INITIAL_CONTEXT_FACTORY, FFMQConstants.JNDI_CONTEXT_FACTORY);
    env.put(Context.PROVIDER_URL, "tcp://localhost:10002");
    Context context = new InitialContext(env);

    // Lookup a connection factory in the context
    ConnectionFactory connFactory = (ConnectionFactory)context.lookup(FFMQConstants.JNDI_CONNECTION_FACTORY_NAME);

    // Obtain a JMS connection from the factory
    Connection conn = connFactory.createConnection();
    conn.start();

    ...

Note that to provide JMS 1.0.2 compliance, the following connection factories are also declared in the JNDI context :
- factory/QueueConnectionFactory
- factory/TopicConnectionFactory
Command-line administration
The FFMQ server can be (remotely) administered through a command-line utility.
To execute this utility, use the ffmq-admin-client.bat or ffmq-admin-client.sh shell script in the server bin/ directory.
Executing the utility without command-line parameters will display usage information :

    Command-line parameters
    -------------------------
    -command: the command to execute (createQueue,createTopic,deleteQueue,deleteTopic,shutdown)
    -conf : path to a properties file (optional)

    All other variables should be passed as variable=value

When dealing with destinations you can pass all available properties (see 'Templates' above) with <propertyName>=<value>
Examples :

    # Create queue FOO with a non-persistent message capacity of 1000
    ffmq-admin-client -command createQueue name=FOO memoryStore.maxMessages=1000

    # Delete topic BAR
    ffmq-admin-client -command deleteTopic name=BAR

    # Ask the server to shutdown
    ffmq-admin-client -command shutdown

Command-line JMX console
The FFMQ server can be (remotely) monitored through JMX using generic tools or the provided command-line utility.
To execute this utility, use the ffmq-jmx-console.bat or ffmq-jmx-console.sh shell script in the server bin/ directory.
Embedding a server instance
Spring integration
You can start one (or more) engine instances in a given ApplicationContext, using a provided helper bean :
net.timewalker.ffmqX.spring.FFMQServerBean
Here is a typical XML spring definition for this :

    <bean id="FFMQServer" class="net.timewalker.ffmqX.spring.FFMQServerBean" init-method="start" destroy-method="stop">
        <property name="engineName" value="myEngine"/>
        <property name="configLocation" value="../conf/ffmq-server.properties"/>
    </bean>

Notes :
- The engineName property value must be unique in a given VM
- The configLocation property supports the 'classpath:' notation and must point to an existing properties file containing the server configuration
- This spring definition automatically calls the server start() and stop() methods on context init and close
Java integration
A server instance can be created directly from Java code if needed.
Here is an example source code :

    package net.timewalker.ffmqX.sample.embedded;

    import java.io.FileInputStream;
    import java.util.Properties;

    import net.timewalker.ffmqX.listeners.ClientListener;
    import net.timewalker.ffmqX.listeners.tcp.io.TcpListener;
    import net.timewalker.ffmqX.local.FFMQEngine;
    import net.timewalker.ffmqX.management.destination.definition.QueueDefinition;
    import net.timewalker.ffmqX.utils.Settings;

    /**
     * Embedded FFMQ sample
     */
    public class EmbeddedFFMQSample implements Runnable {
        private FFMQEngine engine;

        /*
         * (non-Javadoc)
         * @see java.lang.Runnable#run()
         */
        public void run() {
            try {
                // Create engine settings
                Settings settings = createEngineSettings();

                // Create the engine itself
                engine = new FFMQEngine("myLocalEngineName", settings);
                // -> myLocalEngineName will be the engine name.
                //    - It should be unique in a given JVM
                //    - This is the name to be used by local clients to establish
                //      an internal JVM connection (high performance)
                //      Use the following URL for clients : vm://myLocalEngineName
                //

                // Deploy the engine
                System.out.println("Deploying engine : "+engine.getName());
                engine.deploy();
                // - The FFMQ engine is not functional until deployed.
                // - The deploy operation re-activates all persistent queues
                //   and recovers them if the engine was not properly closed.
                //   (May take some time for large queues)

                // Adding a TCP based client listener
                System.out.println("Starting listener ...");
                ClientListener tcpListener = new TcpListener(engine,"0.0.0.0",10002,settings,null);
                tcpListener.start();

                // This is how you can programmatically define a new queue
                if (!engine.getDestinationDefinitionProvider().hasQueueDefinition("foo")) {
                    QueueDefinition queueDef = new QueueDefinition();
                    queueDef.setName("foo");
                    queueDef.setUseJournal(false);
                    queueDef.setMaxNonPersistentMessages(1000);
                    queueDef.check();
                    engine.createQueue(queueDef);
                }

                // You could also define a queue using some java Properties
                if (!engine.getDestinationDefinitionProvider().hasQueueDefinition("foo2")) {
                    Properties queueProps = new Properties();
                    queueProps.put("name", "foo2");
                    queueProps.put("persistentStore.useJournal", "false");
                    queueProps.put("memoryStore.maxMessages", "1000");
                    QueueDefinition queueDef2 = new QueueDefinition(new Settings(queueProps));
                    engine.createQueue(queueDef2);
                }

                // Run for some time
                System.out.println("Running ...");
                Thread.sleep(30*1000);

                // Stopping the listener
                System.out.println("Stopping listener ...");
                tcpListener.stop();

                // Undeploy the engine
                System.out.println("Undeploying engine ...");
                engine.undeploy();
                // - It is important to properly shutdown the engine
                //   before stopping the JVM to make sure current transactions
                //   are nicely completed and storages properly closed.

                System.out.println("Done.");
            } catch (Exception e) {
                // Oops
                e.printStackTrace();
            }
        }

        private Settings createEngineSettings() {
            // Various ways of creating engine settings

            // 1 - From a properties file
            Properties externalProperties = new Properties();
            try {
                FileInputStream in = new FileInputStream("../conf/ffmq-server.properties");
                externalProperties.load(in);
                in.close();
            } catch (Exception e) {
                throw new RuntimeException("Cannot load external properties",e);
            }
            Settings settings = new Settings(externalProperties);

            // 2 - Explicit Java code
            // Settings settings = new Settings();
            //
            // settings.setStringProperty(FFMQCoreSettings.DESTINATION_DEFINITIONS_DIR, ".");
            // settings.setStringProperty(FFMQCoreSettings.BRIDGE_DEFINITIONS_DIR, ".");
            // settings.setStringProperty(FFMQCoreSettings.TEMPLATES_DIR, ".");
            // settings.setStringProperty(FFMQCoreSettings.DEFAULT_DATA_DIR, ".");
            // ...

            return settings;
        }

        /**
         * @param args
         */
        public static void main(String[] args) {
            System.setProperty("FFMQ_BASE", "..");

            new EmbeddedFFMQSample().run();
        }
    }

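The sample above stresses that the engine should be shut down properly before the JVM stops. In an embedding application, one possible way to arrange this is a standard JVM shutdown hook. The sketch below is a suggestion rather than part of the FFMQ API: the class and method names are hypothetical, and the Runnable stands in for the real stop sequence (tcpListener.stop() followed by engine.undeploy()).

```java
/** Hypothetical helper: runs a stop action when the JVM shuts down. */
final class ShutdownHookSketch {

    /** Registers a JVM shutdown hook that runs the given stop action; returns the hook thread. */
    static Thread registerStopHook(Runnable stopAction) {
        Thread hook = new Thread(stopAction, "ffmq-embedded-shutdown");
        Runtime.getRuntime().addShutdownHook(hook);
        return hook;
    }

    public static void main(String[] args) {
        // In a real embedding, the action would stop the listener and then undeploy the engine
        registerStopHook(() -> System.out.println("Stopping listener and undeploying engine ..."));

        System.out.println("Running ...");
        // When main returns, the JVM starts its shutdown sequence and runs the hook
    }
}
```

Shutdown hooks run on normal exit and on SIGTERM, but not when the process is killed abruptly, so the recovery on next restart described under 'Starting and stopping' still applies in that case.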