Subversion Repositories Integrator Subversion

Rev

Details | Last modification | View Log | RSS feed

Rev Author Line No. Line
771 blopes 1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one or more
3
 * contributor license agreements.  See the NOTICE file distributed with
4
 * this work for additional information regarding copyright ownership.
5
 * The ASF licenses this file to You under the Apache License, Version 2.0
6
 * (the "License"); you may not use this file except in compliance with
7
 * the License.  You may obtain a copy of the License at
8
 *
9
 *     http://www.apache.org/licenses/LICENSE-2.0
10
 *
11
 * Unless required by applicable law or agreed to in writing, software
12
 * distributed under the License is distributed on an "AS IS" BASIS,
13
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
 * See the License for the specific language governing permissions and
15
 * limitations under the License.
16
 */
17
package nonblocking;
18
 
19
import java.io.IOException;
20
import java.nio.charset.StandardCharsets;
21
import java.util.concurrent.atomic.AtomicInteger;
22
 
23
import javax.servlet.AsyncContext;
24
import javax.servlet.ReadListener;
25
import javax.servlet.ServletException;
26
import javax.servlet.ServletInputStream;
27
import javax.servlet.ServletOutputStream;
28
import javax.servlet.WriteListener;
29
import javax.servlet.http.HttpServlet;
30
import javax.servlet.http.HttpServletRequest;
31
import javax.servlet.http.HttpServletResponse;
32
 
33
/**
34
 * This doesn't do anything particularly useful - it just writes a series of
35
 * numbers to the response body while demonstrating how to perform non-blocking
36
 * writes.
37
 */
38
public class NumberWriter extends HttpServlet {
39
 
40
    private static final long serialVersionUID = 1L;
41
 
42
    @Override
43
    protected void doGet(HttpServletRequest req, HttpServletResponse resp)
44
            throws ServletException, IOException {
45
 
46
        resp.setContentType("text/plain");
47
        resp.setCharacterEncoding("UTF-8");
48
 
49
        // Non-blocking IO requires async
50
        AsyncContext ac = req.startAsync();
51
 
52
        // Use a single listener for read and write. Listeners often need to
53
        // share state to coordinate reads and writes and this is much easier as
54
        // a single object.
55
        @SuppressWarnings("unused")
56
        NumberWriterListener listener = new NumberWriterListener(
57
                ac, req.getInputStream(), resp.getOutputStream());
58
 
59
    }
60
 
61
 
62
    /**
63
     * Keep in mind that each call may well be on a different thread to the
64
     * previous call. Ensure that changes in values will be visible across
65
     * threads. There should only ever be one container thread at a time calling
66
     * the listener.
67
     */
68
    private static class NumberWriterListener implements ReadListener,
69
            WriteListener {
70
 
71
        private static final int LIMIT =  10000;
72
 
73
        private final AsyncContext ac;
74
        private final ServletInputStream sis;
75
        private final ServletOutputStream sos;
76
        private final AtomicInteger counter = new AtomicInteger(0);
77
 
78
        private volatile boolean readFinished = false;
79
        private byte[] buffer = new byte[8192];
80
 
81
        private NumberWriterListener(AsyncContext ac, ServletInputStream sis,
82
                ServletOutputStream sos) {
83
            this.ac = ac;
84
            this.sis = sis;
85
            this.sos = sos;
86
 
87
            // In Tomcat, the order the listeners are set controls the order
88
            // that the first calls are made. In this case, the read listener
89
            // will be called before the write listener.
90
            sis.setReadListener(this);
91
            sos.setWriteListener(this);
92
        }
93
 
94
        @Override
95
        public void onDataAvailable() throws IOException {
96
 
97
            // There should be no data to read
98
 
99
            int read = 0;
100
            // Loop as long as there is data to read. If isReady() returns false
101
            // the socket will be added to the poller and onDataAvailable() will
102
            // be called again as soon as there is more data to read.
103
            while (sis.isReady() && read > -1) {
104
                read = sis.read(buffer);
105
                if (read > 0) {
106
                    throw new IOException("Data was present in input stream");
107
                }
108
            }
109
        }
110
 
111
        @Override
112
        public void onAllDataRead() throws IOException {
113
            readFinished = true;
114
 
115
            // If sos is not ready to write data, the call to isReady() will
116
            // register the socket with the poller which will trigger a call to
117
            // onWritePossible() when the socket is ready to have data written
118
            // to it.
119
            if (sos.isReady()) {
120
                onWritePossible();
121
            }
122
        }
123
 
124
        @Override
125
        public void onWritePossible() throws IOException {
126
            if (readFinished) {
127
                int i = counter.get();
128
                boolean ready = true;
129
                while (i < LIMIT && ready) {
130
                    i = counter.incrementAndGet();
131
                    String msg = String.format("%1$020d\n", Integer.valueOf(i));
132
                    sos.write(msg.getBytes(StandardCharsets.UTF_8));
133
                    ready = sos.isReady();
134
                }
135
 
136
                if (i == LIMIT) {
137
                    ac.complete();
138
                }
139
            }
140
        }
141
 
142
        @Override
143
        public void onError(Throwable throwable) {
144
            // Should probably log the throwable
145
            ac.complete();
146
        }
147
    }
148
}