Misadventures in Concurrent and Parallel programming, plus random comments on software performance and various OSS contributions.
Tuesday, 9 December 2008
More Mono, AMQP and System.Messaging
Progress has gone really well. I have implemented all of the Receive, Send and Peek methods of MessageQueue, including methods that select by Id/CorrelationId and use transactions and timeouts. MessageEnumerators are currently supported, although it is not possible to implement the transactional version of RemoveCurrent because of the way AMQP handles transactions. Well, maybe not impossible, just an enormous, inefficient hack if I managed to get it to work.
The code has now been shifted into the Mono trunk, which suggests I should really make sure I get it complete (or as complete as possible). One nice side effect of the implementation is that I have added an SPI that can be used by other potential implementations.
In similar news, a couple of weeks ago MS announced that they are joining the AMQP working group. It will be interesting to see where they go with it.
Sunday, 21 September 2008
Another GWT and Comet
I've been quite interested in GWT and Comet for some time now, but when looking around for solutions none really grabbed me. The most promising is the rocket-gwt implementation, but that requires an iframe to manage the Comet communication, which seemed a little unnecessary. With the latest version of GWT (1.5.2) the Google guys have included a direct HTTP API in addition to the RPC mechanism provided with the earlier releases. I thought I would have a go at producing my own implementation in combination with Tomcat 6. I set myself a couple of constraints:
- The code must remain functional in the standard GWT dev environment, though not necessarily performing Comet-style HTTP Push.
- When deployed to a proper Tomcat 6 servlet engine it must use the implemented CometProcessor to manage HTTP Push.
- It must be possible to deploy/run/debug in the 2 environments (GWT dev, full Tomcat 6) without code or configuration changes when switching between them.
Client Code
I started by implementing the client portion of my little test application. Starting with the basic premise of a stock watching application (a fairly common use case for Comet), I constructed a small app that would fetch a list of stocks using the standard RPC mechanism and then listen for updates using the custom HTTP/Comet implementation. I created an interface called StockUpdateService and an implementation called StockUpdateServiceImpl. For the main client method I reused the GWT AsyncCallback interface to handle the responses to the service call. The HTTP method would expect a Stock object encoded using JSON.
public interface StockUpdateService {
void waitForUpdate(AsyncCallback<Stock> callback);
}
public class StockUpdateServiceImpl implements StockUpdateService {
private static final String url = GWT.getModuleBaseURL() + "/updateServlet";
private Request currentRequest = null;
public void waitForUpdate(final AsyncCallback<Stock> callback) {
RequestBuilder builder = new RequestBuilder(RequestBuilder.GET, URL.encode(url));
try {
currentRequest = builder.sendRequest(null, new RequestCallback() {
public void onError(Request request, Throwable exception) {
callback.onFailure(exception);
}
public void onResponseReceived(Request request, Response response) {
if (200 == response.getStatusCode()) {
// When receiving a successful response...
JSONValue v = JSONParser.parse(response.getText());
JSONObject o = v.isObject();
if (o != null) {
// Decode the JSON Object...
String code = o.get("code").isString().stringValue();
long price = (long) o.get("price").isNumber().doubleValue();
long change = (long) o.get("change").isNumber().doubleValue();
Stock s = new Stock(code, price, change);
// Pass it back to the caller...
callback.onSuccess(s);
} else {
callback.onFailure(new Exception(
"Invalid JSON response: " + response.getText()));
}
} else {
callback.onFailure(new Exception(response.getStatusText()));
}
}
});
} catch (RequestException e) {
callback.onFailure(e);
}
}
}
All fairly straightforward: the HTTP API supports a RequestCallback interface, within which I simply unmarshal the incoming data and pass it on to the supplied AsyncCallback.
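The post does not show how the client keeps listening after the first update. Since the server closes each connection once it has written one update, one plausible approach is to re-issue the request from inside the success callback. The following is a minimal sketch of that loop in plain Java, with no GWT dependency; UpdateCallback, UpdateSource and LongPollLoop are hypothetical names that only mirror the shape of AsyncCallback and StockUpdateService.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in with the same two methods as GWT's AsyncCallback.
interface UpdateCallback<T> {
    void onSuccess(T result);
    void onFailure(Throwable caught);
}

// Hypothetical stand-in for StockUpdateService: one call yields one update.
interface UpdateSource<T> {
    void waitForUpdate(UpdateCallback<T> callback);
}

// Re-issues the request after every successful update and stops on the
// first failure, recording what it saw along the way.
final class LongPollLoop<T> {
    private final UpdateSource<T> source;
    private final List<T> received = new ArrayList<T>();
    private Throwable lastError;

    LongPollLoop(UpdateSource<T> source) {
        this.source = source;
    }

    void start() {
        source.waitForUpdate(new UpdateCallback<T>() {
            public void onSuccess(T result) {
                received.add(result);
                start(); // ask for the next update straight away
            }

            public void onFailure(Throwable caught) {
                lastError = caught; // assumption: stop polling on any error
            }
        });
    }

    List<T> received() {
        return received;
    }

    Throwable lastError() {
        return lastError;
    }
}
```

In a real client the failure branch would probably retry after a delay rather than give up; stopping is just the simplest behaviour to illustrate.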
Server Code for GWT Dev Environment
Since the Tomcat server that comes with GWT does not support Comet, I had to find a way to provide the push functionality without using Comet. Essentially I just needed something that could be used during testing and debugging and that was functionally correct. It turned out to be ridiculously simple. What is not mentioned clearly in the GWT docs is that you can create standard Java servlets in the GWT environment and they simply work the way you expect. Therefore I created a standalone servlet that would block the running thread before returning.
public class UpdateServlet extends HttpServlet {
Stock[] stocks = {
new Stock("BA", 130, 15),
new Stock("NT", 400, -1),
new Stock("FA", 5000, 2),
new Stock("HZ", 213, -70),
new Stock("CR", 14, 1)
};
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws IOException {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// Continue.
}
Random r = new Random();
int stockNum = r.nextInt(stocks.length);
long change = r.nextInt(100) - 49;
Stock oldStock = stocks[stockNum];
Stock newStock = new Stock(oldStock.getCode(), oldStock.getPrice() + change, change);
stocks[stockNum] = newStock;
String result = Formatter.toJSON(newStock);
resp.getOutputStream().write(result.getBytes());
resp.getOutputStream().flush();
}
}
This class I configured in the GWT module for my project using the XML element: <servlet path='/updateServlet' class='uk.co.middlesoft.trader.server.UpdateServlet'/>
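The Formatter.toJSON helper used by the servlet is not shown in the post. A minimal sketch of what such a helper might look like follows, assuming only what the client code reads back: a string field "code" and numeric fields "price" and "change". StockQuote and StockJson are hypothetical names, not the classes from the project.

```java
// Hypothetical stand-in for the post's Stock class.
final class StockQuote {
    final String code;
    final long price;
    final long change;

    StockQuote(String code, long price, long change) {
        this.code = code;
        this.price = price;
        this.change = change;
    }
}

// Hypothetical stand-in for Formatter.toJSON.
final class StockJson {
    // Builds the JSON object the client decodes: code, price, change.
    static String toJson(StockQuote s) {
        return "{\"code\":\"" + escape(s.code) + "\","
                + "\"price\":" + s.price + ","
                + "\"change\":" + s.change + "}";
    }

    // Escapes only quotes and backslashes, which is enough for simple
    // ticker codes; control characters are not handled in this sketch.
    static String escape(String raw) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < raw.length(); i++) {
            char c = raw.charAt(i);
            if (c == '"' || c == '\\') {
                sb.append('\\');
            }
            sb.append(c);
        }
        return sb.toString();
    }
}
```

For example, StockJson.toJson(new StockQuote("BA", 130, 15)) yields {"code":"BA","price":130,"change":15}, which the client's JSONParser.parse call would accept as an object.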
Server Code for Comet
All of that is well and good, but the important bit is the actual Comet part. I decided to use Tomcat's Comet Processor, so I created a second servlet with the same functionality as the traditional servlet but implementing the CometProcessor interface.
public class CometUpdateServlet extends HttpServlet implements CometProcessor {
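// Connections waiting for the next update; always accessed under synchronized (connections).
// The stocks array from UpdateServlet is assumed to be declared in this class as well.
private final List<CometEvent> connections = new ArrayList<CometEvent>();
public void event(CometEvent event) throws IOException, ServletException {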
switch (event.getEventType()) {
case BEGIN:
synchronized (connections) {
connections.add(event);
}
break;
case READ:
HttpServletRequest req = event.getHttpServletRequest();
InputStream is = req.getInputStream();
byte[] buf = new byte[512];
do {
int n = is.read(buf); //can throw an IOException
if (n < 0) {
return;
}
} while (is.available() > 0);
break;
case END:
synchronized (connections) {
connections.remove(event);
}
event.close();
break;
case ERROR:
synchronized (connections) {
connections.remove(event);
}
event.close();
break;
}
}
private Thread t = null;
public void init(ServletConfig config) throws ServletException {
// Let GenericServlet store the config so getServletConfig() keeps working.
super.init(config);
t = new Thread(new MyRunnable());
t.setDaemon(true);
t.start();
System.out.println("Started thread");
}
private class MyRunnable implements Runnable {
public void run() {
while (!Thread.currentThread().isInterrupted()) {
try {
Thread.sleep(1000);
Random r = new Random();
int stockNum = r.nextInt(stocks.length);
long change = r.nextInt(100) - 49;
Stock oldStock = stocks[stockNum];
Stock newStock = new Stock(oldStock.getCode(),
oldStock.getPrice() + change, change);
stocks[stockNum] = newStock;
synchronized (connections) {
for (Iterator<CometEvent> i = connections.iterator(); i.hasNext();) {
try {
CometEvent e = i.next();
// Remove the current connection so that it can
// be closed.
i.remove();
HttpServletResponse rsp = e.getHttpServletResponse();
String result = Formatter.toJSON(newStock);
rsp.getOutputStream().write(result.getBytes());
rsp.getOutputStream().flush();
// Apparently this is needed for IE.
e.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
public void destroy() {
t.interrupt();
synchronized (connections) {
connections.clear();
}
}
}
The servlet uses a separate thread to generate events for the client. The core of the Runnable implementation works almost identically to the traditional servlet implementation, returning a Stock object marshalled as a JSON string. The key thing to remember is that multiple threads will be accessing an instance of this class, so it is important to ensure that shared data access is thread safe.
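To illustrate the thread-safety point, here is a minimal sketch of the same shared-list idea in plain Java, with no Tomcat dependency. It is a variation on the servlet above rather than a copy of it: it takes a snapshot of the waiting connections under the lock and writes to them outside it, so a slow client cannot hold up threads that are registering or removing connections. PendingConnection and UpdateBroadcaster are hypothetical names.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a parked Comet connection.
interface PendingConnection {
    void send(String payload) throws IOException;
}

final class UpdateBroadcaster {
    // Shared between request threads and the update thread;
    // every access happens inside synchronized (waiting).
    private final List<PendingConnection> waiting = new ArrayList<PendingConnection>();

    void register(PendingConnection c) {
        synchronized (waiting) {
            waiting.add(c);
        }
    }

    void unregister(PendingConnection c) {
        synchronized (waiting) {
            waiting.remove(c);
        }
    }

    // Sends the payload to every waiting connection exactly once and
    // returns how many writes succeeded.
    int broadcast(String payload) {
        List<PendingConnection> snapshot;
        synchronized (waiting) {
            snapshot = new ArrayList<PendingConnection>(waiting);
            waiting.clear(); // each connection receives one update, then is dropped
        }
        int delivered = 0;
        for (PendingConnection c : snapshot) {
            try {
                c.send(payload);
                delivered++;
            } catch (IOException e) {
                // The client has gone away; nothing more to do for it.
            }
        }
        return delivered;
    }
}
```

The trade-off is that a connection can be in the snapshot while an END or ERROR event for it arrives on another thread, so a real implementation would still need to tolerate a write to an already-closed connection.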
The final step in setting this up was putting together an Ant script that would build a war file that could be deployed into Tomcat. The war file would contain a web.xml that configures the CometProcessor servlet with a servlet mapping that matches the same URL as the servlet defined in the GWT module.
<servlet>
<servlet-name>cometUpdateServlet</servlet-name>
<servlet-class>uk.co.middlesoft.trader.server.CometUpdateServlet</servlet-class>
</servlet>
<servlet-mapping>
<servlet-name>cometUpdateServlet</servlet-name>
<url-pattern>/updateServlet</url-pattern>
</servlet-mapping>
Setting up the Ant script was fairly straightforward, with the exception of having to create a target that would compile the GWT code into HTML and JavaScript:
<target name="compile-js" depends="init" description="Build javascript code">
<java classname="com.google.gwt.dev.GWTCompiler" fork="true" failonerror="true">
<classpath>
<path refid="client.classpath"/>
</classpath>
<arg value="-out" />
<arg value="${build.www}" />
<arg value="uk.co.middlesoft.trader.StockWatch" />
</java>
<copy todir="WebContent">
<fileset dir="${build.www}/${module}" includes="**/*"/>
</copy>
</target>
The final result was that I could run and debug the client using the standard GWT hosted browser and deploy a "Cometified" version straight to Tomcat simply by running the Ant script, without having to change any of the code or configuration.
Friday, 9 May 2008
Service Oriented UIs and MS Architecture Insight
Friday, 18 April 2008
System.Messaging for Mono
I have just released on Google Code a 0.0.1 release of a bridge between Mono's System.Messaging and Apache QPid. Apache QPid is an implementation of the AMQP protocol. AMQP is an open protocol designed to provide a standard wire protocol for asynchronous messaging buses. There are a number of other open source implementations of AMQP floating about too. RabbitMQ looks like another interesting implementation, built on Erlang.
I used the QPid client libraries to provide the client-side networking implementation. Unfortunately that requires linking to 8 different assemblies. Forcing users of Mono's System.Messaging to link to these at compile time seemed excessive, so I wanted to decouple the QPid implementation from System.Messaging. Unlike JMS, Microsoft's System.Messaging API was never designed to have multiple implementations. Therefore, in order to decouple the two I had to add a provider layer into the Mono code base. This is available in the form of a patch on the same Google Code site.
The current version only provides basic sending and receiving. XML and Binary message formatting are also supported, but that is implemented inside the Mono code base. I haven't looked into performance yet. It currently runs all requests over a single socket connection (multiplexing using AMQP's channel feature), which is probably not the optimal approach.
I'll see how far I get before I head off on my big trip.