1 package net.sf.mbus4j.master;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30 import java.io.BufferedReader;
31 import java.io.Closeable;
32 import java.io.FileOutputStream;
33 import net.sf.mbus4j.dataframes.Frame;
34 import net.sf.mbus4j.dataframes.MBusResponseFramesContainer;
35 import net.sf.mbus4j.dataframes.RequestClassXData;
36 import net.sf.mbus4j.dataframes.SelectionOfSlaves;
37 import net.sf.mbus4j.dataframes.SingleCharFrame;
38 import net.sf.mbus4j.dataframes.UserDataResponse;
39 import net.sf.mbus4j.dataframes.datablocks.DataBlock;
40 import net.sf.mbus4j.decoder.Decoder;
41 import net.sf.mbus4j.devices.DeviceFactory;
42 import net.sf.mbus4j.devices.GenericDevice;
43 import net.sf.mbus4j.devices.Sender;
44 import net.sf.mbus4j.encoder.Encoder;
45
46 import java.io.IOException;
47 import java.io.InputStream;
48 import java.io.InputStreamReader;
49 import java.io.OutputStreamWriter;
50 import java.io.UnsupportedEncodingException;
51 import java.util.ArrayList;
52 import java.util.Arrays;
53 import java.util.HashMap;
54 import java.util.Iterator;
55 import java.util.List;
56 import java.util.Map;
57 import java.util.Queue;
58 import java.util.concurrent.ConcurrentLinkedQueue;
59 import java.util.logging.Level;
60 import java.util.logging.Logger;
61 import net.sf.json.JSONArray;
62 import net.sf.json.JSONObject;
63 import net.sf.mbus4j.Connection;
64 import net.sf.mbus4j.MBusAddressing;
65 import net.sf.mbus4j.MBusUtils;
66 import net.sf.mbus4j.dataframes.GarbageCharFrame;
67 import net.sf.mbus4j.dataframes.MBusMedium;
68 import net.sf.mbus4j.dataframes.SendInitSlave;
69 import net.sf.mbus4j.decoder.DecoderListener;
70 import net.sf.mbus4j.json.JSONSerializable;
71 import net.sf.mbus4j.json.JsonSerializeType;
72 import net.sf.mbus4j.log.LogUtils;
73
74
75
76
77
78
79
80
81
82 public class MBusMaster
83 implements Iterable<GenericDevice>,
84 Sender,
85 JSONSerializable,
86 Closeable {
87
88 public void cancel() {
89
90 }
91
92 public static GenericDevice getDevice(Iterable<GenericDevice> deviceList, ValueRequestPointLocator locator) {
93 for (GenericDevice dev : deviceList) {
94 if ((dev.getAddress() == locator.getAddress())
95 && (dev.getIdentNumber() == locator.getIdentnumber())
96 && dev.getManufacturer().equals(locator.getManufacturer())
97 && dev.getMedium().equals(locator.getMedium())
98 && (dev.getVersion() == locator.getVersion())) {
99 return dev;
100 }
101 }
102
103 return null;
104 }
105
106 public static DataBlock getDataBlock(Frame frame, ValueRequestPointLocator locator) {
107 if (frame instanceof UserDataResponse) {
108 for (DataBlock db : (UserDataResponse) frame) {
109 if (db.getDataFieldCode().equals(locator.getDifCode())
110 && db.getVif().equals(locator.getVif())
111 && db.getFunctionField().equals(locator.getFunctionField())
112 && (db.getStorageNumber() == locator.getStorageNumber())
113 && (db.getSubUnit() == locator.getDeviceUnit())
114 && (db.getTariff() == locator.getTariff())
115 && Arrays.equals(db.getVifes(),
116 locator.getVifes())) {
117 return db;
118 }
119 }
120 } else if (frame == null) {
121 return null;
122 } else {
123 throw new RuntimeException("Response is not a UserDataResponse but: " + frame);
124 }
125
126 throw new RuntimeException("can't find datablock of locator");
127 }
128
129 private DataBlock getTimeStampDB(GenericDevice dev, ValueRequestPointLocator locator) {
130
131 return null;
132 }
133
134 public Connection getConnection() {
135 return conn;
136 }
137
138 public void writeJsonStream(FileOutputStream os) throws UnsupportedEncodingException, IOException {
139 try (OutputStreamWriter osw = new OutputStreamWriter(os, "UTF-8")) {
140 JSONObject json = toJSON(JsonSerializeType.SLAVE_CONFIG);
141 String text = json.toString(1);
142 osw.write(text, 0, text.length());
143 osw.flush();
144 }
145 }
146
147
148
149
150 public long getLastByteSended() {
151 return lastByteSended;
152 }
153
154 private class StreamListener implements Runnable, DecoderListener {
155
156 private final Decoder parser = new Decoder(this);
157
158 @Override
159 public void run() {
160 log.fine("Thread MBus StreamListener Started");
161 parser.reset();
162 try {
163 try {
164 while (!isClosed()) {
165 try {
166 int theData;
167 if ((theData = conn.getInputStream().read()) == -1) {
168 if (log.isLoggable(Level.FINEST)) {
169 log.finest("Thread interrupted or eof on waiting occured");
170 }
171 } else {
172 try {
173
174 parser.addByte((byte) theData);
175 } catch (Exception e) {
176 log.log(Level.SEVERE, "Error during createPackage()", e);
177 parser.reset();
178 }
179 }
180 } catch (NullPointerException npe) {
181 if (!isClosed()) {
182 throw new RuntimeException(npe);
183 }
184 }
185 }
186 log.info("Thread MBus StreamListener Will stop");
187 } catch (IOException e) {
188 if (isClosed()) {
189 log.log(Level.FINE, "Port Closed", e);
190 } else {
191 log.log(Level.SEVERE, "run()", e);
192 }
193 } catch (RuntimeException e) {
194 if (isClosed()) {
195 log.log(Level.FINE, "Port Closed", e);
196 } else {
197 log.log(Level.INFO, "finished waiting for packages", e);
198 }
199 }
200 } catch (Throwable t) {
201 log.log(Level.SEVERE, "END", t);
202 } finally {
203 log.fine("Thread MBus StreamListener Stopped");
204 parser.reset();
205 }
206 }
207
208 private boolean isClosed() {
209 return conn == null ? true : conn.getConnState().equals(Connection.State.CLOSED) || conn.getConnState().equals(Connection.State.CLOSING);
210 }
211
212 @Override
213 public void success(Frame frame) {
214 log.log(Level.FINER, "New frame parsed {0}", frame);
215
216 synchronized (frameQueue) {
217 frameQueue.add(frame);
218 frameQueue.notifyAll();
219 }
220 }
221
222 void resetDecoder() {
223 parser.reset();
224 }
225
226 }
227 private final static Logger log = LogUtils.getMasterLogger();
228 private final List<GenericDevice> devices = new ArrayList<>();
229 private final Encoder encoder = new Encoder();
230 private Thread t;
231 private final StreamListener streamListener = new StreamListener();
232 private final Queue<Frame> frameQueue = new ConcurrentLinkedQueue<>();
233 private Connection conn;
234 private long lastByteSended;
235
236 public MBusMaster() {
237 super();
238 }
239
240
241
242
243
244
245
246
247 public boolean addDevice(GenericDevice device) {
248 for (GenericDevice dev : devices) {
249 if (dev.getIdentNumber() == device.getIdentNumber()) {
250 if (dev.getManufacturer().equals(device.getManufacturer()) && dev.getMedium().equals(device.getMedium())) {
251 return false;
252 }
253 }
254 }
255 return devices.add(device);
256 }
257
258
259 public GenericDevice addDeviceByAddress(byte address)
260 throws InterruptedException, IOException {
261 Frame f = sendRequestUserData(address);
262
263 GenericDevice result = null;
264 if (f instanceof UserDataResponse) {
265 UserDataResponse udr = (UserDataResponse) f;
266
267 result = DeviceFactory.createDevice(udr, new RequestClassXData(Frame.ControlCode.REQ_UD2, (byte) address));
268 addDevice(result);
269 log.info(String.format("added device: address = 0x%02X, id = %08d, man = %s, medium = %s, version = 0x%02X",
270 udr.getAddress(),
271 udr.getIdentNumber(),
272 udr.getManufacturer(),
273 udr.getMedium(),
274 udr.getVersion()));
275
276 } else {
277 log.info(String.format("no device at address = 0x%02X", address));
278 }
279 return result;
280 }
281
282 public int deviceIndexOf(GenericDevice d) {
283 return devices.indexOf(d);
284 }
285
286 public void clearDevices() {
287 devices.clear();
288 }
289
290 private void clearFrameQueue() {
291 synchronized (frameQueue) {
292 frameQueue.clear();
293 }
294 }
295
296 @Override
297 public void close() throws IOException {
298 if (conn != null) {
299 log.fine("TRY CLOSING");
300 conn.close();
301 log.fine("CLOSED");
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316 t = null;
317 }
318 }
319
320
321
322
323 public void deselectBySecondaryAddress() {
324 }
325
326 public int deviceCount() {
327 return devices.size();
328 }
329
330 public GenericDevice getDevice(int i) {
331 return devices.get(i);
332 }
333
334 public GenericDevice[] getDevices() {
335 return devices.toArray(new GenericDevice[deviceCount()]);
336 }
337
338
339
340
341
342
343 public long getIdleTime() {
344 return 33000 / conn.getBitPerSecond();
345 }
346
347
348
349
350
351
352
353 public long getResponseTimeout() {
354 return ((((512 * 11) + 330) * 1000) / conn.getBitPerSecond()) + conn.getResponseTimeOutOffset();
355 }
356
357 public long getShortResponseTimeout() {
358 return getIdleTime() * 10 + conn.getResponseTimeOutOffset();
359 }
360
361 @Override
362 public Iterator<GenericDevice> iterator() {
363 return devices.iterator();
364 }
365
366 private Frame removeFrame() {
367 return frameQueue.remove();
368 }
369
370 public GenericDevice[] searchDevicesBySecondaryAddressing(int maxTries) throws IOException, InterruptedException {
371 return widcardSearch(0xFFFFFFFF, (short) 0xFFFF, (byte) 0xFF, (byte) 0xFF, maxTries);
372 }
373
374
375
376
377
378
379
380
381
382 public GenericDevice[] searchDevicesByPrimaryAddress()
383 throws IOException, InterruptedException {
384 return searchDevicesByPrimaryAddress((byte) 0, MBusUtils.LAST_REGULAR_PRIMARY_ADDRESS);
385 }
386
387 public GenericDevice[] searchDevicesByPrimaryAddress(byte first, byte last)
388 throws IOException, InterruptedException {
389 List<GenericDevice> result = new ArrayList<>();
390 final short fAddr = (short) (first & 0xFF);
391 final short lAddr = (short) (last & 0xFF);
392 for (short i = fAddr; i <= lAddr; i++) {
393 GenericDevice c = addDeviceByAddress((byte) i);
394 if (c != null) {
395 result.add(c);
396 }
397 }
398 return result.toArray(new GenericDevice[result.size()]);
399 }
400
401 @Override
402 public Frame send(Frame frame, int maxTries, long timeout)
403 throws IOException, InterruptedException {
404 for (int tries = 0; tries <= maxTries; tries++) {
405 clearFrameQueue();
406
407 final byte[] b = encoder.encode(frame);
408
409
410 streamListener.resetDecoder();
411
412 conn.getOutputStrteam().write(b);
413 conn.getOutputStrteam().flush();
414 lastByteSended = System.currentTimeMillis();
415
416 if (log.isLoggable(Level.FINE)) {
417 log.log(Level.FINE, "Data Sent (try: {0}): {} ",
418 new Object[]{tries, Decoder.bytes2Ascii(b)});
419 }
420
421 Frame result = pollFrameOrWaitUntil(lastByteSended + timeout);
422 log.log(Level.FINE, "Answer took {0} ms", System.currentTimeMillis() - lastByteSended);
423
424 if (result != null) {
425 return result;
426 } else {
427 Thread.sleep(getIdleTime());
428 }
429 }
430 log.log(Level.INFO, "max tries({0}) reached .. aborting send to: {1}", new Object[]{maxTries, frame});
431
432 return null;
433 }
434
435 public Map<GenericDevice, Frame> sendRequestUserData(MBusAddressing addressing)
436 throws IOException, InterruptedException {
437 Map<GenericDevice, MBusAddressing> devMap = new HashMap<>();
438 for (GenericDevice d : devices) {
439 devMap.put(d, addressing);
440 }
441 return sendRequestUserData(devMap);
442 }
443
444 public Frame sendInitSlave(byte address) throws IOException, InterruptedException {
445 SendInitSlave req = new SendInitSlave(address);
446 return send(req, DEFAULT_SEND_TRIES, getResponseTimeout());
447 }
448
449 public Frame sendRequestUserData(byte address)
450 throws IOException, InterruptedException {
451 RequestClassXData req = new RequestClassXData(Frame.ControlCode.REQ_UD2, address);
452 req.setFcb(true);
453 return send(req, DEFAULT_SEND_TRIES, getResponseTimeout());
454 }
455
456 public Map<GenericDevice, Frame> sendRequestUserData(Map<GenericDevice, MBusAddressing> devices)
457 throws IOException, InterruptedException {
458 Map<GenericDevice, Frame> result = new HashMap<>();
459
460 byte address;
461 for (GenericDevice dev : devices.keySet()) {
462 MBusAddressing addressing = devices.get(dev);
463 if (MBusAddressing.SECONDARY.equals(addressing)) {
464 selectDevice(dev);
465 address = MBusUtils.SLAVE_SELECT_PRIMARY_ADDRESS;
466 } else {
467 address = dev.getAddress();
468 }
469 result.put(dev, sendRequestUserData(address));
470 if (MBusAddressing.SECONDARY.equals(addressing)) {
471
472 }
473 }
474
475 return result;
476 }
477
478 public int sendSlaveSelect(int bcdMaskedId, short maskedMan, byte maskedVersion,
479 byte maskedMedium, int maxTries) throws IOException, InterruptedException {
480
481 if (log.isLoggable(Level.FINE)) {
482 log.fine(String.format("Will select Slave: id=0x%08X, man=0x%04X, ver=0x%02X, medium=0x%02X", bcdMaskedId, maskedMan, maskedVersion, maskedMedium));
483 }
484 SelectionOfSlaves selOfSl = new SelectionOfSlaves((byte) MBusUtils.SLAVE_SELECT_PRIMARY_ADDRESS);
485 selOfSl.setBcdMaskedId(bcdMaskedId);
486 selOfSl.setMaskedMan(maskedMan);
487 selOfSl.setMaskedVersion(maskedVersion);
488 selOfSl.setMaskedMedium(maskedMedium);
489
490 int result;
491 Frame resultFrame = send(selOfSl, maxTries, getShortResponseTimeout());
492 if (resultFrame == null) {
493 return 0;
494 }
495 if (resultFrame instanceof SingleCharFrame) {
496 log.log(Level.FINE, "Slave selected {0}", bcdMaskedId);
497 result = 1;
498 } else if (resultFrame instanceof GarbageCharFrame) {
499 log.log(Level.FINE, "Multiple Slaves selected {0}", bcdMaskedId);
500 result = 2;
501 } else {
502 log.severe(String.format("unexpected Frame received \n \"%s\" \n tried to select Slave: id=0x%08X, man=0x%04X, ver=0x%02X, medium=0x%02X", resultFrame.toString(), bcdMaskedId, maskedMan, maskedVersion, maskedMedium));
503 return 0;
504 }
505 log.fine("Wait for more Answers of slave select");
506 result += waitForSingleCharsOrGarbage(getShortResponseTimeout());
507 return result;
508 }
509
510 public void setConnection(Connection conn) {
511 this.conn = conn;
512 }
513
514 public Closeable open() throws IOException {
515 conn.open();
516 t = new Thread(streamListener);
517 t.setDaemon(true);
518 t.start();
519 return this;
520 }
521
522 private Frame pollFrameOrWaitUntil(long endTime) throws InterruptedException {
523 synchronized (frameQueue) {
524 while (endTime - System.currentTimeMillis() > 0) {
525 if (frameQueue.peek() == null) {
526 log.log(Level.FINE, "Wait max for {0} ms", endTime - System.currentTimeMillis());
527 frameQueue.wait(endTime - System.currentTimeMillis());
528 } else {
529 return frameQueue.poll();
530 }
531 }
532 return frameQueue.poll();
533 }
534 }
535
536 private int waitForSingleCharsOrGarbage(long timeout)
537 throws InterruptedException {
538 int result = 0;
539
540 while ((System.currentTimeMillis() - lastByteSended) <= timeout) {
541 Frame frame = pollFrameOrWaitUntil(lastByteSended + timeout);
542
543 if (frame instanceof SingleCharFrame) {
544 result++;
545 } else if (frame instanceof GarbageCharFrame) {
546 result++;
547 } else {
548 return result;
549 }
550 }
551
552 return result;
553 }
554
555 private int getLeftmostMaskedNibble(int value) {
556 int mask = 0xF0000000;
557 for (int nibblePos = 7; nibblePos >= 0; nibblePos--) {
558 if ((value & mask) == mask) {
559 return nibblePos;
560 } else {
561 mask >>>= 4;
562 }
563 }
564 return -1;
565 }
566
567 private int exchangeNibbleAtPos(int nibblePos, int value, int nibbleValue) {
568 int mask = ~(0x0F << (nibblePos * 4));
569 int nibbleToSet = (nibbleValue & 0x0F) << (nibblePos * 4);
570 return (value & mask) | nibbleToSet;
571 }
572
573
574 public GenericDevice[] widcardSearch(int bcdMaskedId, short bcdMaskedMan, byte bcdMaskedVersion, byte bcdMaskedMedium, int maxTries)
575 throws IOException, InterruptedException {
576 List<GenericDevice> result = new ArrayList<GenericDevice>();
577 log.fine(String.format("widcardSearch bcdMaskedId: 0x%08X", bcdMaskedId));
578 int answers = sendSlaveSelect(bcdMaskedId, bcdMaskedMan, bcdMaskedVersion, bcdMaskedMedium, maxTries);
579 if (answers == 0) {
580 log.fine(String.format("no slave with mask: 0x%08X", bcdMaskedId));
581 } else if (answers == 1) {
582 log.fine(String.format("detect slave with mask: 0x%08X", bcdMaskedId));
583 GenericDevice dev = addDeviceByAddress(MBusUtils.SLAVE_SELECT_PRIMARY_ADDRESS);
584 result.add(dev);
585 } else {
586 log.fine(String.format("multiple slaves (%d) with mask: 0x%08X", answers, bcdMaskedId));
587 int leftmostMaskedNibble = getLeftmostMaskedNibble(bcdMaskedId);
588 if (leftmostMaskedNibble >= 0) {
589 for (int i = 0; i <= 9; i++) {
590 GenericDevice[] devs = widcardSearch(exchangeNibbleAtPos(leftmostMaskedNibble, bcdMaskedId, i), bcdMaskedMan, bcdMaskedVersion, bcdMaskedMedium, maxTries);
591 result.addAll(Arrays.asList(devs));
592 }
593 } else {
594 log.fine(String.format("Can't separate slaves (%d) with id: 0x%08X", answers,
595 bcdMaskedId));
596 }
597 }
598 return result.toArray(new GenericDevice[result.size()]);
599 }
600
601 public UserDataResponse readResponseBySecondary(int bcdId, String man, Byte version, MBusMedium medium, int maxTries) throws IOException, InterruptedException {
602 final short bcdMan = (man == null || man.length() == 0) ? (short) 0xFFFF : MBusUtils.man2Short(man);
603 byte bcdVersion = (version == null) ? (byte) 0xFF : version;
604 byte bcdMedium = (medium == null) ? (byte) 0xFF : (byte) medium.getId();
605
606 if (selectDevice(bcdId, bcdMan, bcdVersion, bcdMedium, maxTries)) {
607 return readResponse(MBusUtils.SLAVE_SELECT_PRIMARY_ADDRESS);
608 } else {
609 return null;
610 }
611 }
612
613 public UserDataResponse readResponse(byte address) throws IOException, InterruptedException {
614 Frame fi = sendInitSlave(address);
615 Frame f = sendRequestUserData(address);
616 if (f instanceof UserDataResponse) {
617 return (UserDataResponse) f;
618 } else {
619
620 return null;
621 }
622 }
623
624 public boolean selectDevice(int bcdMaskedId, short maskedMan, byte maskedVersion, byte maskedMedium, int maxTries) throws IOException, InterruptedException {
625 int answers = sendSlaveSelect(bcdMaskedId, maskedMan, maskedVersion, maskedMedium, maxTries);
626 if (answers > 1) {
627 log.warning(String.format("Can't select select (too many) Slave: id=0x%08X, man=0x%04X, ver=0x%02X, medium=0x%02X", bcdMaskedId, maskedMan, maskedVersion, maskedMedium));
628 } else if (answers == 0) {
629 log.warning(String.format("Can't select select (none) Slave: id=0x%08X, man=0x%04X, ver=0x%02X, medium=0x%02X", bcdMaskedId, maskedMan, maskedVersion, maskedMedium));
630 }
631 return answers == 1;
632 }
633
634 public boolean selectDevice(MBusResponseFramesContainer dev) throws IOException, InterruptedException {
635 return selectDevice(MBusUtils.int2Bcd(dev.getIdentNumber()),
636 MBusUtils.man2Short(dev.getManufacturer()),
637 dev.getVersion(),
638 (byte) dev.getMedium().getId(), DEFAULT_SEND_TRIES);
639 }
640
641 public void readValues(ValueRequest<?> requests)
642 throws IOException, InterruptedException {
643
644 HashMap<GenericDevice, MBusAddressing> result = new HashMap<>();
645
646 for (ValueRequestPointLocator locator : requests) {
647 GenericDevice myDevice = getDevice(devices, locator);
648
649 if (myDevice == null) {
650 myDevice
651 = DeviceFactory.createDevice(locator.getAddress(),
652 locator.getManufacturer(),
653 locator.getMedium(),
654 locator.getVersion(),
655 locator.getIdentnumber());
656 addDevice(myDevice);
657 }
658
659 result.put(myDevice, locator.getAddressing());
660 }
661
662
663
664 Map<GenericDevice, Frame> responses = sendRequestUserData(result);
665
666
667 for (ValueRequestPointLocator locator : requests) {
668 GenericDevice dev = getDevice(result.keySet(), locator);
669 DataBlock db = getDataBlock(responses.get(dev),
670 locator);
671 locator.setDb(db);
672 db = getTimeStampDB(dev, locator);
673 locator.setTimestampDb(db);
674 }
675 }
676
677 public static MBusMaster readJsonStream(InputStream is)
678 throws IOException {
679 BufferedReader br = new BufferedReader(new InputStreamReader(is, "UTF-8"));
680 StringBuilder sb = new StringBuilder();
681 String line;
682
683 while ((line = br.readLine()) != null) {
684 sb.append(line);
685 }
686
687 MBusMaster result = new MBusMaster();
688 result.fromJSON(JSONObject.fromObject(sb.toString()));
689
690 return result;
691 }
692
693 @Override
694 public void fromJSON(JSONObject json) {
695 conn = Connection.createFromJSON(json);
696
697 JSONArray jsonDevices = json.getJSONArray("devices");
698
699 for (int i = 0; i < jsonDevices.size(); i++) {
700 GenericDevice device = new GenericDevice();
701 device.fromJSON(jsonDevices.getJSONObject(i));
702 addDevice(device);
703 }
704 }
705
706 @Override
707 public JSONObject toJSON(JsonSerializeType jsonSerializeType) {
708 JSONObject result = new JSONObject();
709
710 result.accumulate(conn.getJsonFieldName(), conn.toJSON(jsonSerializeType));
711
712 JSONArray jsonDevices = new JSONArray();
713
714 for (GenericDevice device : devices) {
715 jsonDevices.add(device.toJSON(jsonSerializeType));
716 }
717
718 result.accumulate("devices", jsonDevices);
719
720 return result;
721 }
722 }