View Javadoc
1   package net.sf.mbus4j.slaves;
2   
3   /*
4    * #%L
5    * mbus4j-slaves
6    * %%
7    * Copyright (C) 2009 - 2014 MBus4J
8    * %%
9    * mbus4j - Drivers for the M-Bus protocol - http://mbus4j.sourceforge.net/
10   * Copyright (C) 2009-2014, mbus4j.sf.net, and individual contributors as indicated
11   * by the @authors tag. See the copyright.txt in the distribution for a
12   * full listing of individual contributors.
13   * 
14   * This is free software; you can redistribute it and/or modify it
15   * under the terms of the GNU General Public License as
16   * published by the Free Software Foundation; either version 3 of
17   * the License, or (at your option) any later version.
18   * 
19   * This software is distributed in the hope that it will be useful,
20   * but WITHOUT ANY WARRANTY; without even the implied warranty of
21   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
22   * Lesser General Public License for more details.
23   * 
24   * You should have received a copy of the GNU Lesser General Public
25   * License along with this software; if not, write to the Free
26   * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
27   * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
28   * #L%
29   */
30  import java.io.UnsupportedEncodingException;
31  import net.sf.json.JSONArray;
32  import net.sf.json.JSONObject;
33  
34  import net.sf.mbus4j.dataframes.ApplicationReset;
35  import net.sf.mbus4j.dataframes.Frame;
36  import net.sf.mbus4j.dataframes.RequestClassXData;
37  import net.sf.mbus4j.dataframes.SelectionOfSlaves;
38  import net.sf.mbus4j.dataframes.SendInitSlave;
39  import net.sf.mbus4j.dataframes.SendUserData;
40  import net.sf.mbus4j.dataframes.SendUserDataManSpec;
41  import net.sf.mbus4j.decoder.Decoder;
42  import net.sf.mbus4j.encoder.Encoder;
43  import net.sf.mbus4j.json.JSONSerializable;
44  import net.sf.mbus4j.json.JsonSerializeType;
45  import net.sf.mbus4j.Connection;
46  import net.sf.mbus4j.SerialPortConnection;
47  
48  import java.io.BufferedReader;
49  import java.io.IOException;
50  import java.io.InputStream;
51  import java.io.InputStreamReader;
52  import java.io.OutputStream;
53  import java.io.OutputStreamWriter;
54  import java.io.Reader;
55  import java.util.ArrayList;
56  import java.util.List;
57  import java.util.concurrent.Callable;
58  import java.util.concurrent.LinkedBlockingQueue;
59  import java.util.concurrent.ThreadPoolExecutor;
60  import java.util.concurrent.TimeUnit;
61  import java.util.logging.Level;
62  import java.util.logging.Logger;
63  
64  import javax.script.Bindings;
65  import javax.script.ScriptEngine;
66  import javax.script.ScriptEngineManager;
67  import net.sf.mbus4j.decoder.DecoderListener;
68  import net.sf.mbus4j.log.LogUtils;
69  
70  /**
71   *
72   * @author arnep@users.sourceforge.net
73   * @version $Id: Slaves.java 110 2014-03-13 19:53:31Z arnep $
74   */
75  public class Slaves implements JSONSerializable {
76  
77      public static Slaves readJsonStream(InputStream is)
78              throws IOException {
79          BufferedReader br = new BufferedReader(new InputStreamReader(is, "UTF-8"));
80          StringBuilder sb = new StringBuilder();
81          String line;
82  
83          while ((line = br.readLine()) != null) {
84              sb.append(line);
85          }
86  
87          Slaves result = new Slaves();
88          result.fromJSON(JSONObject.fromObject(sb.toString()));
89  
90          return result;
91      }
92  
93      @Override
94      public JSONObject toJSON(JsonSerializeType jsonSerializeType) {
95          JSONObject result = new JSONObject();
96  
97          result.accumulate(conn.getJsonFieldName(), conn.toJSON(jsonSerializeType));
98  
99          JSONArray jsonSlaves = new JSONArray();
100 
101         for (Slave s : slaves) {
102             jsonSlaves.add(s.toJSON(jsonSerializeType));
103         }
104 
105         result.accumulate("devices", jsonSlaves);
106 
107         return result;
108     }
109 
110     @Override
111     public void fromJSON(JSONObject json) {
112         conn = Connection.createFromJSON(json);
113 
114         JSONArray jsonSlaves = json.getJSONArray("devices");
115 
116         for (int i = 0; i < jsonSlaves.size(); i++) {
117             Slave s = new Slave();
118             s.fromJSON(jsonSlaves.getJSONObject(i));
119             addSlave(s);
120         }
121     }
122 
123     public int getSalvesSize() {
124         return slaves.size();
125     }
126 
127     public Slave getSlave(int index) {
128         return slaves.get(index);
129     }
130 
131     public void writeJsonStream(OutputStream os) throws UnsupportedEncodingException, IOException {
132         OutputStreamWriter osw = new OutputStreamWriter(os, "UTF-8");
133         JSONObject json = toJSON(JsonSerializeType.SLAVE_CONFIG);
134         String text = json.toString(1);
135         osw.write(text, 0, text.length());
136         osw.flush();
137         osw.close();
138     }
139 
140     public Connection getConnection() {
141         return conn;
142     }
143 
144     public void setConnection(Connection conn) {
145         this.conn = conn;
146     }
147 
148     private class RequestHandler
149             implements Callable<Frame> {
150 
151         private final Frame request;
152         private final Slave slave;
153 
154         RequestHandler(Frame request, Slave slave) {
155             this.request = request;
156             this.slave = slave;
157         }
158 
159         @Override
160         public Frame call()
161                 throws Exception {
162             try {
163                 LOG.log(Level.FINEST, "req dispatching: {0}", request);
164 
165                 Frame result;
166 
167                 switch (request.getControlCode()) {
168                     case CON_ACK:
169                         result = null;
170 
171                         break;
172 
173                     case REQ_UD1:
174                         result = slave.handleReqUd1((RequestClassXData) request);
175 
176                         break;
177 
178                     case REQ_UD2:
179                         result = slave.handleReqUd2((RequestClassXData) request);
180 
181                         break;
182 
183                     case RSP_UD:
184 
185                         if (request instanceof SendUserData) {
186                             result = slave.handleSendUserData((SendUserData) request);
187                         } else if (request instanceof ApplicationReset) {
188                             result = slave.handleApplicationReset((ApplicationReset) request);
189                         } else {
190                             result = null;
191                         }
192 
193                         break;
194 
195                     case SND_NKE:
196 
197                         if (request instanceof SendInitSlave) {
198                             result = slave.handleSendInitSlave((SendInitSlave) request);
199                         } else {
200                             result = null;
201                         }
202 
203                         break;
204 
205                     case SND_UD:
206 
207                         if (request instanceof SendUserData) {
208                             result = slave.handleSendUserData((SendUserData) request);
209                         } else if (request instanceof ApplicationReset) {
210                             result = slave.handleApplicationReset((ApplicationReset) request);
211                         } else if (request instanceof SendUserDataManSpec) {
212                             result = slave.handleSendUserDataManSpec((SendUserDataManSpec) request);
213                         } else if (request instanceof SelectionOfSlaves) {
214                             result = slave.handleSelectionOfSlaves((SelectionOfSlaves) request);
215                         } else {
216                             result = null;
217                         }
218 
219                         break;
220 
221                     default:
222                         result = null;
223                 }
224 
225                 if (result != null) {
226                     send(result);
227 
228                     LOG.log(Level.FINE, "req dispatched: ", result);
229                 } else {
230                     LOG.fine("req dispatched no result");
231                 }
232 
233                 return result;
234             } catch (Exception ex) {
235                 LOG.log(Level.SEVERE, "Call", ex);
236                 throw ex;
237             }
238         }
239     }
240 
241     private class StreamListener
242             implements Runnable, DecoderListener {
243 
244         ThreadPoolExecutor tpe
245                 = new ThreadPoolExecutor(5, 20, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
246 
247 //    List<Future<Frame>> l = new ArrayList<Future<Frame>>();
248         @Override
249         public void run() {
250             try {
251                 int theData;
252                 Decoder parser = new Decoder(this);
253                 LOG.info("Wait for data to process");
254 
255                 try {
256                     while (!isClosed()) {
257                         if ((theData = conn.getInputStream().read()) == -1) {
258                             LOG.finest("Thread interrupted or EOF on waiting occured");
259                         } else {
260                             if (LOG.isLoggable(Level.FINEST)) {
261                                 LOG.finest(String.format("Data received 0x%02x", theData));
262                             }
263 
264                             try {
265                                 parser.addByte((byte) theData);
266                             } catch (Exception e) {
267                                 LOG.log(Level.SEVERE, "Error during createPackage()", e);
268                             }
269                         }
270                     }
271 
272                     LOG.info("closing down - finish waiting for new data");
273                 } catch (IOException e) {
274                     LOG.log(Level.SEVERE, "run()", e);
275                 } catch (Exception e) {
276                     LOG.log(Level.INFO, "finished waiting for packages", e);
277                 }
278             } finally {
279                 tpe.shutdownNow();
280             }
281         }
282 
283         private boolean isClosed() {
284             return conn == null ? true : conn.getConnState().equals(Connection.State.CLOSED) || conn.getConnState().equals(Connection.State.CLOSING);
285         }
286 
287         @Override
288         public void success(Frame frame) {
289             if (frame != null) {
290                 if (LOG.isLoggable(Level.FINEST)) {
291                     LOG.log(Level.FINEST, "Frame parsed ... will process: ", frame);
292                 } else {
293                     LOG.fine("Frame parsed ... will process");
294                 }
295 
296                 for (Slave slave : slaves) {
297                     if (slave.willHandleRequest(frame)) {
298                         LOG.log(Level.FINE, "Frame will be handled by slave {}", slave.slaveIdToString());
299                         tpe.submit(new RequestHandler(frame, slave));
300                     }
301                 }
302             }
303         }
304 
305     }
306     private static Logger LOG = LogUtils.getSlaveLogger();
307 
308     public static void main(String[] args)
309             throws Exception {
310         Slaves app = new Slaves();
311         int timeout = 0;
312         ScriptEngineManager scriptManager = new ScriptEngineManager();
313         ScriptEngine js = scriptManager.getEngineByExtension("js");
314 
315         Bindings bindings = js.createBindings();
316         bindings.put("app", app);
317 
318         InputStream is = Slaves.class.getResourceAsStream("/acw-test-slaves.js");
319         Reader in = new InputStreamReader(is);
320         Boolean result = (Boolean) js.eval(in, bindings);
321 
322         SerialPortConnection sc = new SerialPortConnection(args[0]);
323         app.setConnection(sc);
324         app.open();
325         try {
326             if (args.length > 1) {
327                 timeout = Integer.parseInt(args[1]);
328             }
329 
330             try {
331                 if (timeout > 0) {
332                     Thread.sleep(timeout);
333                 } else {
334                     System.in.read();
335                 }
336             } catch (InterruptedException ex) {
337                 System.err.print("Error sleep " + ex);
338             }
339         } finally {
340             try {
341                 app.close();
342             } catch (InterruptedException ex) {
343                 LOG.log(Level.SEVERE, "main", ex);
344             }
345         }
346     }
347     private Connection conn;
348     private List<Slave> slaves = new ArrayList<Slave>();
349     private Encoder encoder = new Encoder();
350     private Thread t;
351     private StreamListener streamListener = new StreamListener();
352 
353     public Slaves() {
354         super();
355     }
356 
357     public boolean addSlave(Slave slave) {
358         return slaves.add(slave);
359     }
360 
361     public boolean removeSlave(Slave slave) {
362         return slaves.remove(slave);
363     }
364 
365     public void close() throws InterruptedException, IOException {
366         if (conn != null) {
367             conn.close();
368             Thread.sleep(100);
369             t.interrupt();
370         }
371     }
372 
373     private void send(Frame frame)
374             throws IOException {
375         conn.getOutputStrteam().write(encoder.encode(frame));
376         conn.getOutputStrteam().flush();
377     }
378 
379     public void open() throws IOException {
380         conn.open();
381         t = new Thread(streamListener);
382         t.setDaemon(true);
383         t.start();
384     }
385 
386     public int slaveIndexOf(Slave s) {
387         return slaves.indexOf(s);
388     }
389 }