Subversion Repositories Integrator Subversion

Rev

Details | Last modification | View Log | RSS feed

Rev Author Line No. Line
771 blopes 1
/*
2
 *  Licensed to the Apache Software Foundation (ASF) under one or more
3
 *  contributor license agreements.  See the NOTICE file distributed with
4
 *  this work for additional information regarding copyright ownership.
5
 *  The ASF licenses this file to You under the Apache License, Version 2.0
6
 *  (the "License"); you may not use this file except in compliance with
7
 *  the License.  You may obtain a copy of the License at
8
 *
9
 *      http://www.apache.org/licenses/LICENSE-2.0
10
 *
11
 *  Unless required by applicable law or agreed to in writing, software
12
 *  distributed under the License is distributed on an "AS IS" BASIS,
13
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
 *  See the License for the specific language governing permissions and
15
 *  limitations under the License.
16
 */
17
package async;
18
 
19
import java.io.IOException;
20
import java.io.PrintWriter;
21
import java.util.concurrent.ConcurrentLinkedQueue;
22
import java.util.concurrent.atomic.AtomicInteger;
23
 
24
import javax.servlet.AsyncContext;
25
import javax.servlet.AsyncEvent;
26
import javax.servlet.AsyncListener;
27
import javax.servlet.ServletContext;
28
import javax.servlet.ServletException;
29
import javax.servlet.http.HttpServlet;
30
import javax.servlet.http.HttpServletRequest;
31
import javax.servlet.http.HttpServletResponse;
32
 
33
import org.apache.juli.logging.Log;
34
import org.apache.juli.logging.LogFactory;
35
 
36
import async.Stockticker.Stock;
37
import async.Stockticker.TickListener;
38
 
39
public class AsyncStockServlet extends HttpServlet implements TickListener, AsyncListener{
40
 
41
    private static final long serialVersionUID = 1L;
42
 
43
    private static final Log log = LogFactory.getLog(AsyncStockServlet.class);
44
 
45
    private static final ConcurrentLinkedQueue<AsyncContext> clients =
46
            new ConcurrentLinkedQueue<>();
47
    private static final AtomicInteger clientcount = new AtomicInteger(0);
48
 
49
    public AsyncStockServlet() {
50
        log.info("AsyncStockServlet created");
51
    }
52
 
53
 
54
    @Override
55
    protected void service(HttpServletRequest req, HttpServletResponse resp)
56
            throws ServletException, IOException {
57
        if (req.isAsyncStarted()) {
58
            req.getAsyncContext().complete();
59
        } else if (req.isAsyncSupported()) {
60
            AsyncContext actx = req.startAsync();
61
            actx.addListener(this);
62
            resp.setContentType("text/plain");
63
            clients.add(actx);
64
            if (clientcount.incrementAndGet()==1) {
65
                Stockticker ticker = (Stockticker) req.getServletContext().getAttribute(
66
                        AsyncStockContextListener.STOCK_TICKER_KEY);
67
                ticker.addTickListener(this);
68
            }
69
        } else {
70
            new Exception("Async Not Supported").printStackTrace();
71
            resp.sendError(400,"Async is not supported.");
72
        }
73
    }
74
 
75
 
76
    @Override
77
    public void tick(Stock stock) {
78
        for (AsyncContext actx : clients) {
79
            try {
80
                writeStock(actx, stock);
81
            } catch (Exception e) {
82
                // Ignore. The async error handling will deal with this.
83
            }
84
        }
85
    }
86
 
87
 
88
    public void writeStock(AsyncContext actx, Stock stock) throws IOException {
89
        HttpServletResponse response = (HttpServletResponse)actx.getResponse();
90
        PrintWriter writer = response.getWriter();
91
        writer.write("STOCK#");//make client parsing easier
92
        writer.write(stock.getSymbol());
93
        writer.write("#");
94
        writer.write(stock.getValueAsString());
95
        writer.write("#");
96
        writer.write(stock.getLastChangeAsString());
97
        writer.write("#");
98
        writer.write(String.valueOf(stock.getCnt()));
99
        writer.write("\n");
100
        writer.flush();
101
        response.flushBuffer();
102
    }
103
 
104
 
105
    @Override
106
    public void shutdown() {
107
        // The web application is shutting down. Complete any AsyncContexts
108
        // associated with an active client.
109
        for (AsyncContext actx : clients) {
110
            try {
111
                actx.complete();
112
            } catch (Exception e) {
113
                // Ignore. The async error handling will deal with this.
114
            }
115
        }
116
    }
117
 
118
 
119
    @Override
120
    public void onComplete(AsyncEvent event) throws IOException {
121
        if (clients.remove(event.getAsyncContext()) && clientcount.decrementAndGet()==0) {
122
            ServletContext sc = event.getAsyncContext().getRequest().getServletContext();
123
            Stockticker ticker = (Stockticker) sc.getAttribute(
124
                    AsyncStockContextListener.STOCK_TICKER_KEY);
125
            ticker.removeTickListener(this);
126
        }
127
    }
128
 
129
    @Override
130
    public void onError(AsyncEvent event) throws IOException {
131
        event.getAsyncContext().complete();
132
    }
133
 
134
    @Override
135
    public void onTimeout(AsyncEvent event) throws IOException {
136
        event.getAsyncContext().complete();
137
    }
138
 
139
 
140
    @Override
141
    public void onStartAsync(AsyncEvent event) throws IOException {
142
        // NOOP
143
    }
144
}