001 package de.jw.cloud42.core.endpoint;
002
003 import java.io.StringReader;
004 import java.util.List;
005 import java.util.logging.Level;
006 import java.util.logging.Logger;
007
008 import javax.annotation.Resource;
009
010 import javax.xml.bind.JAXBContext;
011
012 import javax.xml.bind.JAXBException;
013 import javax.xml.bind.Unmarshaller;
014
015 import javax.xml.transform.Source;
016
017 import javax.xml.transform.stream.StreamSource;
018
019 import javax.xml.ws.BindingType;
020 import javax.xml.ws.Endpoint;
021 import javax.xml.ws.Provider;
022 import javax.xml.ws.Service;
023 import javax.xml.ws.ServiceMode;
024
025 import javax.xml.ws.WebServiceContext;
026 import javax.xml.ws.WebServiceException;
027 import javax.xml.ws.WebServiceProvider;
028 import javax.xml.ws.handler.MessageContext;
029 import javax.xml.ws.http.HTTPBinding;
030
031
032 import org.hibernate.Query;
033 import org.hibernate.Session;
034
035 import de.jw.cloud42.core.domain.Settings;
036 import de.jw.cloud42.core.eventing.Message;
037 import de.jw.cloud42.core.eventing.notification.NotificationManager;
038 import de.jw.cloud42.core.eventing.storage.DefaultSubscriberStore;
039 import de.jw.cloud42.core.eventing.storage.SubscriberStore;
040 import de.jw.cloud42.core.eventing.subscription.Subscription;
041 import de.jw.cloud42.core.hibernate.HibernateUtil;
042
043 /**
044 *
045 * This is an endpoint with a HTTP binding to receive POST notification messages
046 * sent from an AMI instance to Cloud42.
047 * Per default, it listens to http://localhost:8084/messages, but the address can be changed using the
048 * Web Service interface of Cloud42.
049 *
050 *
051 * It uses JAX-WS for a RESTful web service implementation.
052 *
053 * Incoming messages are parsed into
054 * <code>de.jw.cloud42.core.eventing.Message</code> objects using JAXB.
055 *
056 * <b>Important:</b> Note that the HTTP content type explicitly must be set to text/xml so that a message
057 * can be parsed correctly.
058 *
059 * @author fbitzer
060 */
061 @WebServiceProvider
062 @BindingType(value = HTTPBinding.HTTP_BINDING)
063 @ServiceMode(value = Service.Mode.MESSAGE)
064 public class Cloud42Endpoint implements Provider<Source> {
065
066 /**
067 * The default address of the Cloud42 endpoint if no settings are specified.
068 */
069 private final String DEFAULT_ENDPOINT_ADDRESS = "http://localhost:8084/messages";
070
071
072 //use Singleton pattern
073 private static Cloud42Endpoint theInstance;
074
075 @Resource
076 protected WebServiceContext wsContext;
077
078 private JAXBContext jc;
079
080
081 //the current endpoint
082 Endpoint endpoint = null;
083
084 /**
085 * Private Constructor; initialize JAXB context
086 */
087 private Cloud42Endpoint() {
088 try {
089 jc = JAXBContext.newInstance(Message.class);
090
091 } catch (JAXBException je) {
092 throw new WebServiceException("Cannot create JAXBContext", je);
093 }
094 }
095
096 /**
097 * Singleton method.
098 * @return current instance.
099 */
100 public static Cloud42Endpoint getInstance(){
101 if (theInstance == null){
102 theInstance = new Cloud42Endpoint();
103 }
104
105 return theInstance;
106 }
107
108 /**
109 * Invoke method of web service. Process incoming message.
110 */
111 public Source invoke(Source request) {
112
113 MessageContext mc = wsContext.getMessageContext();
114
115 String httpMethod = (String) mc.get(MessageContext.HTTP_REQUEST_METHOD);
116
117 // Only POST messages are processed
118 if (httpMethod.equals("POST")) {
119
120 return post(request, mc);
121
122 } else {
123
124 mc.put(MessageContext.HTTP_RESPONSE_CODE, 400);
125 return null;
126 }
127
128 }
129
130 /**
131 * Create Cloud42 endpoint with HTTP Binding. Address is read from settings.
132 * If an endpoint is running, it is closed first.
133 *
134 * @param args
135 */
136 public void startEndpoint() {
137 try {
138
139
140 //if there is a running endpoint, close it
141 if (endpoint != null){
142 endpoint.stop();
143
144 Logger.getAnonymousLogger().info("Currently running Cloud42 message endpoint was closed.");
145
146 }
147
148 endpoint = Endpoint.create(HTTPBinding.HTTP_BINDING,
149 new Cloud42Endpoint());
150
151 //read the address for the new endpoint
152 String address = getAddress();
153
154 endpoint.publish(address);
155
156 Logger.getAnonymousLogger().info("------------------------------------------------");
157 Logger.getAnonymousLogger().info("Cloud42 message endpoint published at " + address);
158 Logger.getAnonymousLogger().info("------------------------------------------------");
159
160 } catch (Exception ex) {
161 Logger.getAnonymousLogger().log(Level.SEVERE,
162 "Creating Cloud42 endpoint failed: " + ex.getMessage());
163 ex.printStackTrace();
164 }
165
166 }
167
168 /**
169 * Reads the address of the Cloud42 endpoint from the configuration entry in the database.
170 * If no settings entry exists, a default value is returned and saved to database.
171 *
172 * @return Address of Cloud42 endpoint for incoming notifications.
173 */
174 public String getAddress(){
175
176 String address = Settings.getInstance().getEndpointAddress();
177
178 //no address, so set default
179 if (address == null) {
180 address = DEFAULT_ENDPOINT_ADDRESS;
181 Settings.getInstance().setEndpointAddress(address);
182 Settings.getInstance().save();
183 }
184
185 return address;
186
187
188 }
189 /**
190 * Set address of Cloud42 endpoint.
191 * Causes restart of endpoint with new address.
192 *
193 * @param newAddress the address to set.
194 */
195 public void setAddress(String newAddress){
196
197 Settings.getInstance().setEndpointAddress(newAddress);
198 Settings.getInstance().save();
199
200 //restart endpoint
201 this.startEndpoint();
202 }
203
204
205 /**
206 * Handles HTTP POST and starts notifiying subscribers.
207 */
208 private Source post(Source source, MessageContext mc) {
209
210 String replyElement;
211 try {
212 // parse message
213 Unmarshaller u = jc.createUnmarshaller();
214 Message message = (Message) u.unmarshal(source);
215
216 Logger.getAnonymousLogger().info(
217 "Cloud42 endpoint retrieved a event message. InstanceId is: "
218 + message.instanceId);
219
220 // set Http Status Code 202 for "Accepted"
221 mc.put(MessageContext.HTTP_RESPONSE_CODE, 202);
222 replyElement = "<reply>OK</reply>";
223
224
225 // Notify subscribers
226 NotificationManager.notifySubscribers(message);
227
228 } catch (Exception ex) {
229 // error parsing the request, so set status code 400 (Bad Request)
230 Logger.getAnonymousLogger().log(Level.WARNING,
231 "Error parsing incoming event message: " + ex.getMessage()
232 + ". Maybe HTTP content-type was wrong (must be text/xml) or invalid message format.");
233
234 mc.put(MessageContext.HTTP_RESPONSE_CODE, 400);
235 replyElement = "<reply>" + ex.getMessage() + "</reply>";
236
237 }
238
239 StreamSource reply = new StreamSource(new StringReader(replyElement));
240 return reply;
241
242 }
243
244 }