001 /**
002 *
003 */
004 package de.jw.cloud42.core.eventing.subscription;
005
006 import java.net.URI;
007 import java.util.logging.Logger;
008
009 import javax.persistence.Entity;
010 import javax.persistence.Id;
011
012
013
014
015 import org.apache.axiom.om.OMAbstractFactory;
016 import org.apache.axiom.om.OMElement;
017 import org.apache.axiom.om.OMFactory;
018 import org.apache.axiom.om.OMNamespace;
019 import org.apache.axis2.AxisFault;
020 import org.apache.axis2.Constants;
021 import org.apache.axis2.addressing.EndpointReference;
022 import org.apache.axis2.client.Options;
023 import org.apache.axis2.client.ServiceClient;
024 import org.apache.axis2.context.MessageContext;
025 import org.apache.axis2.description.TransportOutDescription;
026 import org.apache.axis2.transport.http.HTTPConstants;
027
028 import de.jw.cloud42.core.eventing.EventingConstants;
029 import de.jw.cloud42.core.eventing.EventingException;
030 import de.jw.cloud42.core.eventing.Message;
031
032 /**
033 * This is an implementation of a subscription for an endpoint that receives notification messages
034 * as SOAP messages over HTTP.
035 *
036 * @author fbitzer
037 *
038 */
039 @Entity
040 public class SOAPSubscription extends Subscription {
041
042
043 /**
044 * Endpoint where notifications must be send to.
045 */
046 private EndpointReference toEndpoint;
047
048
049
050 /**
051 * Transforms the provided notification message into a SOAP message and sends it.
052 *
053 * @see de.jw.cloud42.core.eventing.subscription.Subscription#sendEventData(Message message)
054 * @param message the message to send as SOAP message.
055 */
056 @Override
057 public void sendEventData(Message message) throws Exception {
058 //send a SOAP message to the subscribing endpoint
059
060 //create an OMElement out of the message
061 OMFactory factory = OMAbstractFactory.getOMFactory();
062
063 OMNamespace ns = factory.createOMNamespace(EventingConstants.NOTIFICATION_NAMESPACE, "msg");
064
065 OMElement messageElement = factory.createOMElement("message", ns);
066
067
068 OMElement topicElement = factory.createOMElement("topic", ns);
069 messageElement.addChild(topicElement);
070 topicElement.setText(message.topic);
071
072 OMElement idElement = factory.createOMElement("instanceId", ns);
073 messageElement.addChild(idElement);
074 idElement.setText(message.instanceId );
075
076
077
078 OMElement timeElement = factory.createOMElement("timestamp", ns);
079 messageElement.addChild(timeElement);
080 timeElement.setText(message.timestamp);
081
082 OMElement textElement = factory.createOMElement("text", ns);
083 messageElement.addChild(textElement);
084 textElement.setText(message.text);
085
086 OMElement infoElement = factory.createOMElement("info", ns);
087 messageElement.addChild(infoElement);
088 infoElement.setText(message.info);
089
090
091 sendThePublication(messageElement);
092
093
094 }
095
096 /**
097 * @return the toEndpoint
098 */
099 public EndpointReference getToEndpoint() {
100 return toEndpoint;
101 }
102
103 /**
104 * @param toEndpoint the toEndpoint to set
105 */
106 public void setToEndpoint(EndpointReference toEndpoint) {
107 this.toEndpoint = toEndpoint;
108 }
109
110
111 /**
112 * Send data to subscriber's endpoint using Axis2 ServiceClient.
113 *
114 * @param eventData OMElement containing the data to put on the wire.
115 * @throws EventingException
116 */
117 private void sendThePublication(OMElement eventData) throws EventingException {
118
119 Logger.getAnonymousLogger().info("Sending notification to " + this.getToEndpoint().getAddress());
120
121 EndpointReference deliveryEPR = this.getToEndpoint();
122 try {
123 ServiceClient sc = new ServiceClient();
124 Options options = new Options();
125
126 options.setTo(deliveryEPR);
127
128 options.setProperty(MessageContext.TRANSPORT_NON_BLOCKING, Boolean.FALSE);
129
130 options.setProperty(Constants.Configuration.CONTENT_TYPE, HTTPConstants.MEDIA_TYPE_APPLICATION_SOAP_XML);
131
132 sc.setOptions(options);
133
134 sc.fireAndForget(eventData);
135
136 } catch (AxisFault e) {
137 throw new EventingException(e);
138
139 } catch (Exception ex) {
140 throw new EventingException(ex);
141 }
142 }
143
144 }