1 package net.sf.mbus4j.slaves;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30 import java.io.UnsupportedEncodingException;
31 import net.sf.json.JSONArray;
32 import net.sf.json.JSONObject;
33
34 import net.sf.mbus4j.dataframes.ApplicationReset;
35 import net.sf.mbus4j.dataframes.Frame;
36 import net.sf.mbus4j.dataframes.RequestClassXData;
37 import net.sf.mbus4j.dataframes.SelectionOfSlaves;
38 import net.sf.mbus4j.dataframes.SendInitSlave;
39 import net.sf.mbus4j.dataframes.SendUserData;
40 import net.sf.mbus4j.dataframes.SendUserDataManSpec;
41 import net.sf.mbus4j.decoder.Decoder;
42 import net.sf.mbus4j.encoder.Encoder;
43 import net.sf.mbus4j.json.JSONSerializable;
44 import net.sf.mbus4j.json.JsonSerializeType;
45 import net.sf.mbus4j.Connection;
46 import net.sf.mbus4j.SerialPortConnection;
47
48 import java.io.BufferedReader;
49 import java.io.IOException;
50 import java.io.InputStream;
51 import java.io.InputStreamReader;
52 import java.io.OutputStream;
53 import java.io.OutputStreamWriter;
54 import java.io.Reader;
55 import java.util.ArrayList;
56 import java.util.List;
57 import java.util.concurrent.Callable;
58 import java.util.concurrent.LinkedBlockingQueue;
59 import java.util.concurrent.ThreadPoolExecutor;
60 import java.util.concurrent.TimeUnit;
61 import java.util.logging.Level;
62 import java.util.logging.Logger;
63
64 import javax.script.Bindings;
65 import javax.script.ScriptEngine;
66 import javax.script.ScriptEngineManager;
67 import net.sf.mbus4j.decoder.DecoderListener;
68 import net.sf.mbus4j.log.LogUtils;
69
70
71
72
73
74
75 public class Slaves implements JSONSerializable {
76
77 public static Slaves readJsonStream(InputStream is)
78 throws IOException {
79 BufferedReader br = new BufferedReader(new InputStreamReader(is, "UTF-8"));
80 StringBuilder sb = new StringBuilder();
81 String line;
82
83 while ((line = br.readLine()) != null) {
84 sb.append(line);
85 }
86
87 Slaves result = new Slaves();
88 result.fromJSON(JSONObject.fromObject(sb.toString()));
89
90 return result;
91 }
92
93 @Override
94 public JSONObject toJSON(JsonSerializeType jsonSerializeType) {
95 JSONObject result = new JSONObject();
96
97 result.accumulate(conn.getJsonFieldName(), conn.toJSON(jsonSerializeType));
98
99 JSONArray jsonSlaves = new JSONArray();
100
101 for (Slave s : slaves) {
102 jsonSlaves.add(s.toJSON(jsonSerializeType));
103 }
104
105 result.accumulate("devices", jsonSlaves);
106
107 return result;
108 }
109
110 @Override
111 public void fromJSON(JSONObject json) {
112 conn = Connection.createFromJSON(json);
113
114 JSONArray jsonSlaves = json.getJSONArray("devices");
115
116 for (int i = 0; i < jsonSlaves.size(); i++) {
117 Slave s = new Slave();
118 s.fromJSON(jsonSlaves.getJSONObject(i));
119 addSlave(s);
120 }
121 }
122
123 public int getSalvesSize() {
124 return slaves.size();
125 }
126
127 public Slave getSlave(int index) {
128 return slaves.get(index);
129 }
130
131 public void writeJsonStream(OutputStream os) throws UnsupportedEncodingException, IOException {
132 OutputStreamWriter osw = new OutputStreamWriter(os, "UTF-8");
133 JSONObject json = toJSON(JsonSerializeType.SLAVE_CONFIG);
134 String text = json.toString(1);
135 osw.write(text, 0, text.length());
136 osw.flush();
137 osw.close();
138 }
139
140 public Connection getConnection() {
141 return conn;
142 }
143
144 public void setConnection(Connection conn) {
145 this.conn = conn;
146 }
147
148 private class RequestHandler
149 implements Callable<Frame> {
150
151 private final Frame request;
152 private final Slave slave;
153
154 RequestHandler(Frame request, Slave slave) {
155 this.request = request;
156 this.slave = slave;
157 }
158
159 @Override
160 public Frame call()
161 throws Exception {
162 try {
163 LOG.log(Level.FINEST, "req dispatching: {0}", request);
164
165 Frame result;
166
167 switch (request.getControlCode()) {
168 case CON_ACK:
169 result = null;
170
171 break;
172
173 case REQ_UD1:
174 result = slave.handleReqUd1((RequestClassXData) request);
175
176 break;
177
178 case REQ_UD2:
179 result = slave.handleReqUd2((RequestClassXData) request);
180
181 break;
182
183 case RSP_UD:
184
185 if (request instanceof SendUserData) {
186 result = slave.handleSendUserData((SendUserData) request);
187 } else if (request instanceof ApplicationReset) {
188 result = slave.handleApplicationReset((ApplicationReset) request);
189 } else {
190 result = null;
191 }
192
193 break;
194
195 case SND_NKE:
196
197 if (request instanceof SendInitSlave) {
198 result = slave.handleSendInitSlave((SendInitSlave) request);
199 } else {
200 result = null;
201 }
202
203 break;
204
205 case SND_UD:
206
207 if (request instanceof SendUserData) {
208 result = slave.handleSendUserData((SendUserData) request);
209 } else if (request instanceof ApplicationReset) {
210 result = slave.handleApplicationReset((ApplicationReset) request);
211 } else if (request instanceof SendUserDataManSpec) {
212 result = slave.handleSendUserDataManSpec((SendUserDataManSpec) request);
213 } else if (request instanceof SelectionOfSlaves) {
214 result = slave.handleSelectionOfSlaves((SelectionOfSlaves) request);
215 } else {
216 result = null;
217 }
218
219 break;
220
221 default:
222 result = null;
223 }
224
225 if (result != null) {
226 send(result);
227
228 LOG.log(Level.FINE, "req dispatched: ", result);
229 } else {
230 LOG.fine("req dispatched no result");
231 }
232
233 return result;
234 } catch (Exception ex) {
235 LOG.log(Level.SEVERE, "Call", ex);
236 throw ex;
237 }
238 }
239 }
240
241 private class StreamListener
242 implements Runnable, DecoderListener {
243
244 ThreadPoolExecutor tpe
245 = new ThreadPoolExecutor(5, 20, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
246
247
248 @Override
249 public void run() {
250 try {
251 int theData;
252 Decoder parser = new Decoder(this);
253 LOG.info("Wait for data to process");
254
255 try {
256 while (!isClosed()) {
257 if ((theData = conn.getInputStream().read()) == -1) {
258 LOG.finest("Thread interrupted or EOF on waiting occured");
259 } else {
260 if (LOG.isLoggable(Level.FINEST)) {
261 LOG.finest(String.format("Data received 0x%02x", theData));
262 }
263
264 try {
265 parser.addByte((byte) theData);
266 } catch (Exception e) {
267 LOG.log(Level.SEVERE, "Error during createPackage()", e);
268 }
269 }
270 }
271
272 LOG.info("closing down - finish waiting for new data");
273 } catch (IOException e) {
274 LOG.log(Level.SEVERE, "run()", e);
275 } catch (Exception e) {
276 LOG.log(Level.INFO, "finished waiting for packages", e);
277 }
278 } finally {
279 tpe.shutdownNow();
280 }
281 }
282
283 private boolean isClosed() {
284 return conn == null ? true : conn.getConnState().equals(Connection.State.CLOSED) || conn.getConnState().equals(Connection.State.CLOSING);
285 }
286
287 @Override
288 public void success(Frame frame) {
289 if (frame != null) {
290 if (LOG.isLoggable(Level.FINEST)) {
291 LOG.log(Level.FINEST, "Frame parsed ... will process: ", frame);
292 } else {
293 LOG.fine("Frame parsed ... will process");
294 }
295
296 for (Slave slave : slaves) {
297 if (slave.willHandleRequest(frame)) {
298 LOG.log(Level.FINE, "Frame will be handled by slave {}", slave.slaveIdToString());
299 tpe.submit(new RequestHandler(frame, slave));
300 }
301 }
302 }
303 }
304
305 }
306 private static Logger LOG = LogUtils.getSlaveLogger();
307
308 public static void main(String[] args)
309 throws Exception {
310 Slaves app = new Slaves();
311 int timeout = 0;
312 ScriptEngineManager scriptManager = new ScriptEngineManager();
313 ScriptEngine js = scriptManager.getEngineByExtension("js");
314
315 Bindings bindings = js.createBindings();
316 bindings.put("app", app);
317
318 InputStream is = Slaves.class.getResourceAsStream("/acw-test-slaves.js");
319 Reader in = new InputStreamReader(is);
320 Boolean result = (Boolean) js.eval(in, bindings);
321
322 SerialPortConnection sc = new SerialPortConnection(args[0]);
323 app.setConnection(sc);
324 app.open();
325 try {
326 if (args.length > 1) {
327 timeout = Integer.parseInt(args[1]);
328 }
329
330 try {
331 if (timeout > 0) {
332 Thread.sleep(timeout);
333 } else {
334 System.in.read();
335 }
336 } catch (InterruptedException ex) {
337 System.err.print("Error sleep " + ex);
338 }
339 } finally {
340 try {
341 app.close();
342 } catch (InterruptedException ex) {
343 LOG.log(Level.SEVERE, "main", ex);
344 }
345 }
346 }
347 private Connection conn;
348 private List<Slave> slaves = new ArrayList<Slave>();
349 private Encoder encoder = new Encoder();
350 private Thread t;
351 private StreamListener streamListener = new StreamListener();
352
353 public Slaves() {
354 super();
355 }
356
357 public boolean addSlave(Slave slave) {
358 return slaves.add(slave);
359 }
360
361 public boolean removeSlave(Slave slave) {
362 return slaves.remove(slave);
363 }
364
365 public void close() throws InterruptedException, IOException {
366 if (conn != null) {
367 conn.close();
368 Thread.sleep(100);
369 t.interrupt();
370 }
371 }
372
373 private void send(Frame frame)
374 throws IOException {
375 conn.getOutputStrteam().write(encoder.encode(frame));
376 conn.getOutputStrteam().flush();
377 }
378
379 public void open() throws IOException {
380 conn.open();
381 t = new Thread(streamListener);
382 t.setDaemon(true);
383 t.start();
384 }
385
386 public int slaveIndexOf(Slave s) {
387 return slaves.indexOf(s);
388 }
389 }