一、载入与安置
径直去官网(http://activemq.apache.org/)载入最新本子即可,因为这是免安置的,只须要解压就行了。安置完之保守入bin目次,双击 activemq.bat文献(linux下在bin目次下实行 activemq start)
二、考察遏制台
在欣赏器输出:http://ip:8161/admin/,展示如次界面表白启用胜利,默许的用户名暗号都是admin
三、窜改端标语
61616为对外效劳端标语
8161为遏制器端标语
当端标语辩论时,不妨窜改这两个端标语。cd conf ,窜改activemq.xml 窜改内里的61616端口。窜改jetty.xml,窜改内里的8161端口。
queue部队形式:
和rabbitmq大略部队形式一律,假如有多个耗费者耗费同一个部队中的动静的话,默许也是轮询体制的耗费
示例代码:
public class Productor { public static final String BORKER_URL = "tcp://127.0.0.1:61616"; public static final String QUEUE_NAME = "queue1"; public static void main(String***;] args) throws JMSException { //创造工场 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(BORKER_URL); //创造tcp贯穿 Connection connection = factory.createConnection(); //创造贯穿 connection.start(); /** * 创造对话,1.能否打开工作,2.签收形式 */ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //创造部队(动静的手段地) Queue queue = session.createQueue(QUEUE_NAME); //创造消费者 MessageProducer producer = session.createProducer(queue); //动静非长久化 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); //动静长久化 默许是长久化的// producer.setDeliveryMode(DeliveryMode.PERSISTENT); //创造动静 TextMessage message = session.createTextMessage("您好吗"); //发送动静 producer.send(message); producer.close(); session.close(); connection.close(); System.out.println("发送胜利!"); }}public class Consumer { public static final String BORKER_URL = "tcp://127.0.0.1:61616"; public static final String QUEUE_NAME = "queue1"; public static void main(String***;] args) throws JMSException { //创造工场 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(BORKER_URL); //创造tcp贯穿 Connection connection = factory.createConnection(); //创造贯穿 connection.start(); /** * 创造对话,1.能否打开工作,2.签收形式 */ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //创造/证明部队(动静的手段地) Queue queue = session.createQueue(QUEUE_NAME); //创造耗费者 MessageConsumer consumer = session.createConsumer(queue); /*while (true) { //receive会阻碍线程 TextMessage message = (TextMessage)consumer.receive(); System.out.println("接受到动静:" + message.getText()); }*/ //监听的办法耗费 consumer.setMessageListener(message -> { TextMessage textMessage = (TextMessage)message; try { System.out.println("1号接受到动静:" + textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } }); }}topic部队形式:
称为颁布订阅形式,消费者把动静发送给订阅给某个topic中心的耗费者,是散发的形式,这种形式默许须要先启用耗费者,否则就算消费者颁布了某个topic中心的动静,耗费者也耗费不了;只有耗费者提早订阅,而且做了动静长久化的处置,如许后启用耗费者本领耗费提早推送的动静。
代码:
public class Productor { public static final String BORKER_URL = "tcp://127.0.0.1:61616"; public static final String TOPIC_NAME = "topic1"; public static void main(String***;] args) throws JMSException { //创造工场 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(BORKER_URL); //异步送达 factory.setUseAsyncSend(true); //创造tcp贯穿 Connection connection = factory.createConnection(); /** * 创造对话,1.能否打开工作,2.签收形式 */ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //创造/证明topic(动静的手段地) Topic topic = session.createTopic(TOPIC_NAME); //创造消费者 ActiveMQMessageProducer producer = (ActiveMQMessageProducer)session.createProducer(topic); //长久化 producer.setDeliveryMode(DeliveryMode.PERSISTENT); //创造贯穿 connection.start(); //创造动静 TextMessage message = session.createTextMessage("您好吗"); //发送动静,异步发送回调因变量 producer.send(message, new AsyncCallback() { @Override public void onSuccess() { System.out.println("success"); } @Override public void onException(JMSException e) { System.out.println("fail"); } }); producer.close(); session.close(); connection.close(); System.out.println("发送胜利!"); }}public class Consumer1 { public static final String BORKER_URL = "tcp://127.0.0.1:61616"; public static final String TOPIC_NAME = "topic1"; public static void main(String***;] args) throws JMSException { //创造工场 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(BORKER_URL); //创造tcp贯穿 Connection connection = factory.createConnection(); //拟订clientId connection.setClientID("my"); /** * 创造对话,1.能否打开工作,2.签收形式 */ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //创造/证明topic(动静的手段地) Topic topic = session.createTopic(TOPIC_NAME); //订阅中心 TopicSubscriber subscriber = session.createDurableSubscriber(topic, "remark"); //创造贯穿 connection.start(); while (true) { //receive会阻碍线程 //接受订阅的动静 TextMessage message = (TextMessage) subscriber.receive(); System.out.println("接受到动静:" + message.getText()); } /*//创造耗费者 MessageConsumer consumer = session.createConsumer(topic); //创造贯穿 connection.start(); *//*while (true) { //receive会阻碍线程 TextMessage message = (TextMessage)consumer.receive(); System.out.println("接受到动静:" + message.getText()); }*//* //监听的办法耗费 consumer.setMessageListener(message -> { TextMessage textMessage = (TextMessage)message; try { System.out.println("1号接受到动静:" + textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } });*/ }}怎样保护动静的真实性
回复这个题目重要从长久化,工作,签收这几个上面动手
动静长久化的中心代码:
//queue形式的动静长久化 默许是长久化的 producer.setDeliveryMode(DeliveryMode.PERSISTENT); /** * topic形式的长久化 */Topic topic = session.createTopic(TOPIC_NAME);ActiveMQMessageProducer producer = (ActiveMQMessageProducer)session.createProducer(topic);producer.setDeliveryMode(DeliveryMode.PERSISTENT);connection.start();工作的中心代码(偏消费者):
//参数树立成trueconnection.createSession(false, Session.AUTO_ACKNOWLEDGE);//工作提交session.commit();签收的中心代码(偏耗费者):
//参数树立成手动提交connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);//动静签收message.acknowledge();提防:假如既打开工作,又打开手动签收,以工作为准,只有工作被提交了也默许动静被签收了
本能提高:
1.运用nio的和议比tcp的本能高,
摆设办法:在conf目次下activemq.xml照着底下摆设<broker> ... <transportConnectors> <transportConnector name="nio" uri="nio://0.0.0.0:61616"/> </<transportConnectors> ...</broker>第二步是代码考察办法由tcp改为nio//创造工场ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("nio://127.0.0.1:61616");2.jdbc+Journaling普及惟有jdbc长久化的本能,它在做长久化入数据库之前,会先将数据生存到Journaling文献中,之后才渐渐同步到数据库中,即是中央加了一层缓冲层。
把数据库mysql的启动包放到lib目次下摆设办法:在conf目次下activemq.xml照着底下摆设,个中有个createTablesOnStartup属性,默许值是true,表白历次启用后去数据库机动建表<persistenceAdapter> <kahaDB directory="${activemq.data}/kahadb"/> </persistenceAdapter> //上头是默许摆设找到改成底下的摆设<persistenceAdapter> <journalPersistenceAdapterFactory journalLogFiles="5" dataDirectory="${basedir}/activemq-data" dataSource="#mysql-ds"/></persistenceAdapter> //底下的代码写在<beans>节点中<bean id="mysql-ds" class="3e63-6b68-037d-ce36 org.apache.commons.dbcp.BasicDataSource" destroy-method="close"> <property name="driverClassName" value="com.mysql.jdbc.Driver"/> <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/> <property name="username" value="activemq"/> <property name="password" value="activemq"/> <property name="poolPreparedStatements" value="true"/></bean>