BowlerKernel
BowlerAbstractConnection.java
Go to the documentation of this file.
1 /*******************************************************************************
2  * Copyright 2010 Neuron Robotics, LLC
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  ******************************************************************************/
32 package com.neuronrobotics.sdk.common;
33 
34 import java.io.DataInputStream;
35 import java.io.DataOutputStream;
36 import java.io.IOException;
37 import java.util.ArrayList;
38 import java.util.concurrent.locks.ReentrantLock;
39 
40 import javax.management.RuntimeErrorException;
41 
42 import com.neuronrobotics.sdk.commands.bcs.core.NamespaceCommand;
43 import com.neuronrobotics.sdk.commands.bcs.core.PingCommand;
44 import com.neuronrobotics.sdk.commands.bcs.core.RpcArgumentsCommand;
45 import com.neuronrobotics.sdk.commands.bcs.core.RpcCommand;
46 import com.neuronrobotics.sdk.util.ThreadUtil;
47 
48 
49 
50 
51 // TODO: Auto-generated Javadoc
57 public abstract class BowlerAbstractConnection {
58 
60  //private boolean threadedUpstreamPackets=false;
61  private boolean useThreadedStack=true;
62 
64  private int sleepTime = 1000;
65 
67  private long lastWrite = -1;
68 
70  private long heartBeatTime=1000;
71 
73  private int chunkSize = 64;
74 
75 
77  private BowlerDatagram response = null;
78 
80  private ArrayList<IBowlerDatagramListener> listeners = new ArrayList<IBowlerDatagramListener>();
81 
83  ArrayList<IConnectionEventListener> disconnectListeners = new ArrayList<IConnectionEventListener> ();
84 
87 
89  private QueueManager syncQueue = null;
90 
92  private QueueManager asyncQueue=null;
93 
95  private boolean connected = false;
96 
98  private DataInputStream dataIns;
99 
101  private DataOutputStream dataOuts;
102 
103  //private Updater updater = null;
104 
105 
107  private ArrayList<NamespaceEncapsulation> namespaceList=null;
108 
110  private ArrayList<String> nameSpaceStrings = null;
111 
113  private boolean beater = false;
114  //private ReentrantLock executingLock = new ReentrantLock();
115 
116 
122  abstract public boolean connect();
123 
129  //abstract public boolean reconnect() throws IOException;
130 
136  abstract public boolean waitingForConnection();
137 
143  public void setThreadedUpstreamPackets(boolean up){
144  //threadedUpstreamPackets=up;
145  }
146 
147 
157  return sendSynchronusly(sendable,false);
158  }
159 
169  public synchronized BowlerDatagram sendSynchronusly(BowlerDatagram sendable, boolean switchParser){
170 
171  if(!isConnected()) {
172  Log.error("Can not send message because the engine is not connected.");
173  return null;
174  }
175  //executingLock.lock();
177  try {
178  long send = System.currentTimeMillis();
179  sendable.setUpstream(false);
180  Log.info("\nT>>"+sendable);
181  write(sendable.getBytes());
182  Log.info("Transmit took: "+(System.currentTimeMillis()-send)+" ms");
183  } catch (IOException e1) {
184  //executingLock.unlock();
185  throw new RuntimeException(e1);
186  }
187  long startOfReciveTime = System.currentTimeMillis();
188 
189 
190  do{
191  if(isUseThreadedStack())
192  ThreadUtil.wait(0,10);
193  else{
195  }
196  }while (((System.currentTimeMillis()-startOfReciveTime)<getSleepTime()) && (getLastSyncronousResponse() == null));
197  long rcvTime = (System.currentTimeMillis()-startOfReciveTime);
198 
199  if(rcvTime>(getSleepTime()*getPercentagePrint() /100) ){
200  Log.warning("Receive took: "+rcvTime +" ms. This is greater then "+getPercentagePrint() +"% of the sleep timeout");
201  }else{
202  Log.info("Receive took: "+rcvTime +" ms");
203  }
204 
205  if (getLastSyncronousResponse() == null){
206  Log.error("No response from device, no response in "+(System.currentTimeMillis()-startOfReciveTime)+" ms");
207  //new RuntimeException().printStackTrace();
208  if(switchParser){
210  //If the ping fails to get a response, try the older bowler format
211  Log.error("Switching to legacy parser");
213  }else{
214 // //If the ping fails to get a response, try the older bowler format
215 // Log.error("Switching to legacy parser");
216 // BowlerDatagram.setUseBowlerV4(true);
217  }
218  }
219  }
222 
223  return b;
224  }
225 
234  public void sendAsync(BowlerDatagram sendable) throws IOException{
235  if(!isConnected()) {
236  //Log.error("Can not send message because the engine is not connected.");
237  return;
238  }
239  sendable.setUpstream(true);
240  try {
241  write(sendable.getBytes());
242  } catch (IOException e1) {
243  Log.error("No response from device...");
244  //reconnect();
245  throw e1;
246  }
247  }
248 
252  public void disconnect(){
253  if(!isConnected()) {
254  return;
255  }
256  Log.info("Disconnecting Bowler Connection");
258  t.setStartTime(100);
259  while(!t.isTimedOut()){
260  try {
261  if(dataIns!=null)
262  getDataIns().read();
263  } catch (NullPointerException e) {
264  // TODO Auto-generated catch block
265  //e.printStackTrace();
266  } catch (IOException e) {
267  // TODO Auto-generated catch block
268  //e.printStackTrace();
269  }
270  }
271  Log.info("Shutting down streams");
272  setConnected(false);
273 
274  }
275 
282  this.sleepTime = sleepTime;
283  if(sleepTime*2>BowlerDatagramFactory.getPacketTimeout())
284  BowlerDatagramFactory.setPacketTimeout(sleepTime*2);
285  Log.warning("Setting the synchronus packet timeout to "+sleepTime);
286  }
287 
288 
294  public int getSleepTime() {
295  return sleepTime;
296  }
297 
298 
304  public long msSinceLastSend() {
305  if(getLastWrite()<0){
306  return 0;
307  }
308  return System.currentTimeMillis() - getLastWrite() ;
309  }
310 
311 
317  public synchronized void setConnected(boolean c) {
318  if(connected == c)
319  return;
320  connected = c;
321  Log.info("Setting connection to "+c);
322  if(connected){
323  setSyncQueue(new QueueManager(true));
324  setAsyncQueue(new QueueManager(false));
325 
326 // if(!ping(new MACAddress())){
327 /* if( BowlerDatagram.isUseBowlerV4()){
328  //If the ping fails to get a response, try the older bowler format
329  Log.warning("Switching to legacy parser");
330  BowlerDatagram.setUseBowlerV4(false);
331  }else{
332  Log.warning("Switching to v4 parser");
333  BowlerDatagram.setUseBowlerV4(true);
334  }
335  if(!ping(new MACAddress())){
336  //neither packet format is working, bail out
337  setConnected(false);
338  }
339  }
340  */
342  Runtime.getRuntime().addShutdownHook(new Thread() {
343  @Override
344  public void run() {
345  if(isConnected()){
346  //System.out.println("WARNING: Bowler devices should be shut down before exit");
347  disconnect();
348  }
349  }
350  });
351 
352 
353  }else{
354  try {
355  if(dataIns !=null)
356  getDataIns().close();
357  } catch (Exception e) {
358  //return;
359  }
360  try {
361  if(dataOuts !=null)
362  getDataOuts().close();
363  } catch (Exception e) {
364  //return;
365  }
366  setDataIns(null);
367  setDataOuts(null);
368  if(getSyncQueue() != null) {
369  getSyncQueue().kill();
370  setSyncQueue(null);
371  }
372  if(getAsyncQueue() != null) {
373  getAsyncQueue().kill();
374 
375  setAsyncQueue(null);
376  }
378  }
379  }
380 
386  public boolean isConnected() {
387  return connected;
388  }
389 
394  response = null;
395  }
396 
403  return response;
404  }
405 
413  public void onDataReceived(BowlerDatagram data) {
414  if(data.isSyncronous()) {
415 
416  if(syncListen!=null){
417  // this is a server and the packet needs to processed
418  getSyncQueue().addDatagram(data);
419  Log.info("Added packet to the response queue");
420  }else{
421  response = data;
422  }
423  }else {
424  getAsyncQueue().addDatagram(data);
425  }
426 
427  }
428 
437  if(datagram.isSyncronous()){
438  if (syncListen!=null){
439  return syncListen.onSyncReceive(datagram);
440  }
441  }
442  return null;
443  }
444 
445 
451  protected void fireAsyncOnResponse(BowlerDatagram datagram) {
452  if(!datagram.isSyncronous()){
454  Log.info("\nASYNC to "+listeners.size()+" listeners<<\n"+datagram);
455  for(int i=0;i<listeners.size();i++) {
457  Log.info("\nASYNC listener: "+l.getClass());
458 
459  try{
460  l.onAsyncResponse(datagram);
461  }catch (Exception ex){
462  ex.printStackTrace();
463  }
464  }
465  }else{
466  //Log.warning("\nASYNC Not ready<<");
467  }
468 
469  }
470 
471  }
472 
473 
480  if(listeners.contains(listener)) {
481  return;
482  }
483  //synchronized(listeners){
484  listeners.add(listener);
485  //}
486  }
487 
494  if(!listeners.contains(listener)) {
495  return;
496  }
497 
498  listeners.remove(listener);
499  }
500 
501 
514  public void setDataIns(DataInputStream dataIns) {
515  this.dataIns = dataIns;
516  }
517 
524  public DataInputStream getDataIns() throws NullPointerException{
525  if(dataIns==null)
526  throw new NullPointerException();
527  return dataIns;
528  }
529 
535  public void setDataOuts(DataOutputStream dataOuts) {
536 
537  this.dataOuts = dataOuts;
538  }
539 
546  public DataOutputStream getDataOuts() throws NullPointerException{
547  if(dataOuts==null)
548  throw new NullPointerException();
549  return dataOuts;
550  }
551 
555  protected void waitForConnectioToBeReady(){
556  if(!waitingForConnection()) {
557  return;
558  }
559 
560  Log.info("Waiting for connection...");
561  long start = System.currentTimeMillis() ;
562  while(true){
563  if(System.currentTimeMillis()> (start + 20000)){
564  break;
565  }
566 
567  if(!waitingForConnection()) {
568  break;
569  }
570 
571  ThreadUtil.wait(10);
572  }
573  Log.info("Connection ready");
574  }
575 
581  public void setChunkSize(int chunkSize) {
582  this.chunkSize = chunkSize;
583  }
584 
590  public int getChunkSize() {
591  return chunkSize;
592  }
593 
600  this.asyncQueue = asyncQueue;
601  if(this.asyncQueue != null && isUseThreadedStack()){
602  this.asyncQueue.start();
603  asyncQueue.setName("Bowler Platform Asynchronus Queue");
604  }
605  }
606 
613  this.syncQueue = syncQueue;
614  if(this.syncQueue != null && isUseThreadedStack()){
615  this.syncQueue.start();
616  syncQueue.setName("Bowler Platform Synchronus Queue");
617  }
618 
619  }
620 
627  return asyncQueue;
628  }
629 
636  return syncQueue;
637  }
638 
639 
640 
641 
648  private void pushUp(BowlerDatagram b) throws IOException{
649  if(b==null)
650  return;
651  b.setFree(false);
652  if(b.isSyncronous()){
654  if(ret !=null){
655  // Sending response to server
656  sendAsync(ret);
657  }
658  }else
660 
661  }
662 
663 
664 
671  if(!disconnectListeners.contains(l)) {
672  disconnectListeners.add(l);
673  }
674  }
675 
682  if(disconnectListeners.contains(l)) {
683  disconnectListeners.remove(l);
684  }
685  }
686 
690  private void fireDisconnectEvent() {
691  for(IConnectionEventListener l:disconnectListeners) {
692  l.onDisconnect(this);
693  }
694  }
695 
699  private void fireConnectEvent() {
700  for(IConnectionEventListener l:disconnectListeners) {
701  l.onConnect(this);
702  }
703  }
704 
711  if (syncListen == null){
712  syncListen = l;
713  }else{
714  if(syncListen == l)
715  return;
716  throw new RuntimeException("There is already a listener "+syncListen);
717  }
718  }
719 
726  if(syncListen!= null){
727  if(syncListen!= l){
728  throw new RuntimeException("There is a different listener "+syncListen);
729  }
730  }
731  syncListen=null;
732  }
733 
742  public RpcEncapsulation locateRpc(String namespace,BowlerMethod method, String rpcString){
744  if(ns.getNamespace().toLowerCase().contains(namespace.toLowerCase())){
745  //found the namespace
746  for(RpcEncapsulation rpc:ns.getRpcList()){
747  if( rpc.getRpc().toLowerCase().contains(rpcString.toLowerCase()) &&
748  rpc.getDownstreamMethod() == method){
749  //Found the command in the namespace
750  return rpc;
751  }
752  }
753  }
754  }
755  return null;
756  }
757 
768  public static BowlerAbstractCommand getCommand(String namespace,BowlerMethod method, String rpcString, Object[] arguments,RpcEncapsulation rpc){
769 
770  if(rpc != null)
771  return rpc.getCommand(arguments);
772 
773  return null;
774 
775  }
776 
786  public Object [] parseResponse(String namespace,BowlerMethod method, String rpcString,BowlerDatagram dg){
787  RpcEncapsulation rpc = locateRpc(namespace, method, rpcString);
788  if(rpc != null)
789  return rpc.parseResponse(dg);//parse and return
790 
791  return new Object [0];
792  }
793 
808  public Object [] send(MACAddress addr,String namespace,BowlerMethod method, String rpcString, Object[] arguments, int retry) throws DeviceConnectionException{
809  if(namespaceList == null){
810  getNamespaces(addr);
811  }
812  RpcEncapsulation rpc = locateRpc(namespace, method, rpcString);
813  BowlerAbstractCommand command = getCommand(namespace, method, rpcString,arguments,rpc);
814 
815  if(command != null){
816  BowlerDatagram dg = send(command,addr,retry);
817  if(dg!=null){
818  addr.setValues(dg.getAddress());
819  }else{
820  throw new BowlerRuntimeException("Device failed to respond");
821  }
822  Object [] en =parseResponse(namespace, method, rpcString,dg);//parse and return
823  BowlerDatagramFactory.freePacket(dg);
824  return en;
825  }
826 
827  Log.error("No method found, attempted "+namespace+" RPC: "+rpcString);
829  Log.error("Namespace \n"+ns);
830  }
831  throw new DeviceConnectionException("Device does not contain command NS="+namespace+" Method="+method+" RPC="+rpcString+"'");
832  }
833 
835  private boolean namespacesFinishedInitializing = false;
836 
838  private double percentagePrint =75.0;
839 
845  public boolean isInitializedNamespaces(){
847  }
848 
855  public ArrayList<String> getNamespaces(MACAddress addr){
856  if(namespaceList == null){
857  namespaceList = new ArrayList<NamespaceEncapsulation>();
858  nameSpaceStrings = new ArrayList<String>();
859  int numTry=0;
860  boolean done=false;
861  while(!done){
862  numTry++;
863  try {
864  BowlerDatagram namespacePacket = send(new NamespaceCommand(0),addr,5);
865  int num;
866  String tmpNs =namespacePacket.getData().asString();
867  if(tmpNs.length() == namespacePacket.getData().size()){
868  //Done with the packet
869  BowlerDatagramFactory.freePacket(namespacePacket);
870  //System.out.println("Ns = "+tmpNs+" len = "+tmpNs.length()+" data = "+b.getData().size());
871  namespacePacket = send(new NamespaceCommand(),addr,5);
872 
873  num= namespacePacket.getData().getByte(0);
874  if(num <=0){
875  Log.error("Not enougn namespaces!"+namespacePacket);
876  }
877  //Done with the packet
878  BowlerDatagramFactory.freePacket(namespacePacket);
879  Log.warning("This is an older implementation of core, depricated");
880  }else{
881  num= namespacePacket.getData().getByte(namespacePacket.getData().size()-1);
882  if(num <=0){
883  Log.error("Not enougn namespaces!"+namespacePacket);
884  }
885  //Done with the packet
886  BowlerDatagramFactory.freePacket(namespacePacket);
887  Log.info("This is the new core");
888  }
889 
890  // if(num<1){
891  // Log.error("Namespace request failed:\n"+namespacePacket);
892  // }else{
893  // Log.info("Number of Namespaces="+num);
894  // }
895 
896 
897  for (int i=0;i<num;i++){
898 
899  BowlerDatagram nsStringPacket= send(new NamespaceCommand(i),addr,5);
900  String space = nsStringPacket.getData().asString();
901  //Done with the packet
902  BowlerDatagramFactory.freePacket(nsStringPacket);
903  Log.debug("Adding Namespace: "+space);
904 
905  namespaceList.add(new NamespaceEncapsulation(space));
906  }
907  Log.debug("There are "+num+" namespaces on this device");
908  Log.debug("Attempting to populate RPC lists for all "+namespaceList.size());
910  getRpcList(ns.getNamespace(),addr);
911  }
912  done = true;
913  } catch (InvalidResponseException e) {
914  Log.error("Invalid response from Namespace");
915  if(numTry>3)
916  throw e;
917 
918  } catch (NoConnectionAvailableException e) {
919  Log.error("No connection is available.");
920  if(numTry>3)
921  throw e;
922  }catch (Exception e) {
923  Log.error("Other exception");
924  e.printStackTrace();
925  if(numTry>3)
926  throw new RuntimeException(e);
927  }
928  if(!done){
929  //failed coms, reset list
930  namespaceList = new ArrayList<NamespaceEncapsulation>();
931  }
932  }
933 
934  }
935 
936  if(nameSpaceStrings.size() != namespaceList.size()){
938  nameSpaceStrings.add(ns.getNamespace());
939  getRpcList(ns.getNamespace(), addr);
940  }
941  }
943  return nameSpaceStrings;
944 
945  }
946 
954  public boolean hasNamespace(String string,MACAddress addr) {
955  if(namespaceList == null)
956  getNamespaces(addr);
958  if(ns.getNamespace().contains(string))
959  return true;
960  }
961  return false;
962  }
963 
971  public ArrayList<RpcEncapsulation> getRpcList(String namespace,MACAddress addr) {
972  int namespaceIndex = 0;
973  boolean hasCoreRpcNS = false;
974 
975  for (int i=0;i<namespaceList.size();i++){
976  if(namespaceList.get(i).getNamespace().contains(namespace)){
977  namespaceIndex=i;
978  }
979  if(namespaceList.get(i).getNamespace().contains("bcs.rpc.*")){
980  hasCoreRpcNS=true;
981  }
982  }
983  if(!hasCoreRpcNS){
984  //this device has no RPC namespace, failing out
985  Log.info("Device has no RPC identification namespace");
986  return new ArrayList<RpcEncapsulation>();
987  }
988  if(namespaceList.get(namespaceIndex).getRpcList()!=null){
989  //fast return if list is already populated
990  return namespaceList.get(namespaceIndex).getRpcList();
991  }
992 
993  try{
994  //populate RPC set
995  BowlerDatagram b = send(new RpcCommand(namespaceIndex),addr,5);
996 
997  if(!b.getRPC().contains("_rpc")){
998  System.err.println(b);
999  throw new RuntimeException("This RPC index request has failed");
1000  }
1001  //int ns = b.getData().getByte(0);// gets the index of the namespace
1002  //int rpcIndex = b.getData().getByte(1);// gets the index of the selected RPC
1003  int numRpcs;
1004  try{
1005  numRpcs = b.getData().getByte(2);// gets the number of RPC's
1006  }catch(IndexOutOfBoundsException e){
1007  e.printStackTrace();
1008  throw new RuntimeException(e.getMessage()+"\r\n"+b);
1009  }
1010  if(numRpcs<1){
1011  Log.error("RPC request failed:\n"+b);
1012  }else{
1013  Log.info("Number of RPC's = "+numRpcs);
1014  }
1015  Log.debug("There are "+numRpcs+" RPC's in "+namespace);
1016  namespaceList.get(namespaceIndex).setRpcList(new ArrayList<RpcEncapsulation>());
1017  for (int i=0;i<numRpcs;i++){
1018  b = send(new RpcCommand(namespaceIndex,i),addr,5);
1019  if(!b.getRPC().contains("_rpc")){
1020  System.err.println(b);
1021  throw new RuntimeException("This RPC section failed");
1022  }
1023  String rpcStr = new String(b.getData().getBytes(3, 4));
1024  //Done with the packet
1025  BowlerDatagramFactory.freePacket(b);
1026  b = send(new RpcArgumentsCommand(namespaceIndex,i),addr,5);
1027  if(!b.getRPC().contains("args")){
1028  System.err.println(b);
1029  throw new RuntimeException("This RPC section failed");
1030  }
1031  byte []data = b.getData().getBytes(2);
1032 
1033  BowlerMethod downstreamMethod = BowlerMethod.get(data[0]);
1034  int numDownArgs = data[1];
1035  BowlerMethod upstreamMethod = BowlerMethod.get(data[numDownArgs+2]);
1036  int numUpArgs = data[numDownArgs+3];
1037 
1038  BowlerDataType [] downArgs = new BowlerDataType[numDownArgs];
1039  BowlerDataType [] upArgs = new BowlerDataType[numUpArgs];
1040 
1041  for(int k=0;k<numDownArgs;k++){
1042  downArgs[k] = BowlerDataType.get(data[k+2]);
1043  }
1044  for(int k=0;k<numUpArgs;k++){
1045  upArgs[k] = BowlerDataType.get(data[k+numDownArgs+4]);
1046  }
1047 
1048  RpcEncapsulation tmpRpc;
1049  try{
1050  tmpRpc = new RpcEncapsulation(namespaceIndex,namespace, rpcStr, downstreamMethod,downArgs,upstreamMethod,upArgs);
1051  }catch (RuntimeException e){
1052  Log.error("Argumet parsing failure!\r\n"+b);
1053  throw e;
1054  }
1055  //Done with the packet
1056  BowlerDatagramFactory.freePacket(b);
1057  Log.debug(tmpRpc.toString());
1058  namespaceList.get(namespaceIndex).getRpcList().add(tmpRpc);
1059  }
1060 
1061  }catch(InvalidResponseException ex){
1062  Log.debug("Older version of core, discovery disabled");
1063  }
1064  return namespaceList.get(namespaceIndex).getRpcList();
1065  }
1066 
1078  return send(command,addr,retry,false);
1079  }
1080 
1092  public BowlerDatagram send(BowlerAbstractCommand command,MACAddress addr, int retry, boolean switchParser) throws NoConnectionAvailableException, InvalidResponseException {
1093  for(int i=0;i<retry;i++){
1094  if(i!=0)
1095  Log.error("Re-sending");
1096  BowlerDatagram ret;
1097  try{
1098  ret = send( command,addr,switchParser);
1099  //System.out.println(ret);
1100  if(ret != null){
1101  addr.setValues(ret.getAddress());
1102  //if(!ret.getRPC().contains("_err"))
1103 
1104  return ret;
1105  }
1106  }catch(MalformattedDatagram e){
1107  Log.error("Sending Synchronus packet and there was a failure, will retry "+(retry-i-1)+" more times");
1108  ThreadUtil.wait(150*i);
1109 
1110  } catch (InvalidResponseException e) {
1111  Log.error("Sending Synchronus packet and there was a failure, will retry "+(retry-i-1)+" more times");
1112  ThreadUtil.wait(150*i);
1113  } catch (NullPointerException e) {
1114  Log.error("Sending Synchronus packet and there was a failure, will retry "+(retry-i-1)+" more times");
1115  ThreadUtil.wait(150*i);
1116  }
1117  // Toggle chackeing for different protocol versions while fail checking
1118  //BowlerDatagram.setUseBowlerV4(i%2==0);
1119 
1120  }
1121  return null;
1122  }
1123 
1124 
1125 
1126 
1137  return send(command,addr,false);
1138  }
1139 
1151 // if(!isConnected()) {
1152 // if(!connect())
1153 // throw new NoConnectionAvailableException();
1154 // }
1155  BowlerDatagram cmd= BowlerDatagramFactory.build(addr, command);
1156  BowlerDatagram back = sendSynchronusly(cmd,switchParser);
1157  if(back!=null){
1158  addr.setValues(back.getAddress());
1159  }
1160 
1161  return command.validate(back);
1162 
1163  //BowlerDatagramFactory.freePacket(cmd);
1164 
1165  }
1166 
1170  private class PingCommand extends BowlerAbstractCommand {
1171 
1175  public PingCommand() {
1177  setOpCode("_png");
1178  }
1179  }
1180 
1188  public boolean ping(MACAddress mac) {
1189 
1190  return ping( mac,false);
1191  }
1192 
1201  public boolean ping(MACAddress mac, boolean switchParser) {
1202  try {
1203  //Log.warning("Ping device:");
1204  BowlerDatagram bd = send(new PingCommand(),mac,5,switchParser);
1205  if(bd !=null){
1206  BowlerDatagramFactory.freePacket(bd);
1207  startHeartBeat();
1208  return true;
1209  }
1210  } catch (InvalidResponseException e) {
1211  Log.error("Invalid response from Ping ");
1212  //e.printStackTrace();
1213  } catch (Exception e) {
1214  Log.error("No connection is available.");
1215  //e.printStackTrace();
1216  }
1217 
1218  return false;
1219  }
1220 
1224  public void startHeartBeat(){
1225  setBeater(true);
1226  }
1227 
1233  public void startHeartBeat(long msHeartBeatTime){
1234  if (msHeartBeatTime<10)
1235  msHeartBeatTime = 10;
1236  heartBeatTime= msHeartBeatTime;
1237  startHeartBeat();
1238  }
1239 
1243  public void stopHeartBeat(){
1244  setBeater(false);
1245  }
1246 
1250  private void runHeartBeat(){
1252  //System.out.println("Heartbeat");
1253  try{
1254  if(!ping(new MACAddress())){
1255  Log.debug("Ping failed, disconnecting");
1256  //disconnect();
1257  }
1258  }catch(Exception e){
1259  Log.debug("Ping failed, disconnecting");
1260  disconnect();
1261  }
1262  }
1263  }
1264 
1270  public double getPercentagePrint() {
1271  return percentagePrint;
1272  }
1273 
1279  public void setPercentagePrint(double percentagePrint) {
1280  this.percentagePrint = percentagePrint;
1281  }
1282 
1288  public long getLastWrite() {
1289  return lastWrite;
1290  }
1291 
1297  public void setLastWrite(long lastWrite) {
1298  this.lastWrite = lastWrite;
1299  }
1300 
1306  private class QueueManager extends Thread {
1307  // stack extends vector and gives thread safety
1309  private ArrayList<BowlerDatagram> queueBuffer = new ArrayList<BowlerDatagram>();
1310 
1313 
1315  private boolean isSystemQueue=false;
1316 
1318  private boolean killSwitch=false;
1319 
1325  public QueueManager(boolean b) {
1326  isSystemQueue = b;
1327  }
1328 
1329 
1330  /* (non-Javadoc)
1331  * @see java.lang.Thread#run()
1332  */
1333  public void run() {
1334  Log.info("Starting the Queue Manager as "+isSystemQueue);
1335  ThreadUtil.wait(100);
1336  while(isConnected() && !killSwitch && isUseThreadedStack()) {
1337 
1338  long start = System.currentTimeMillis();
1339  if(isSystemQueue)
1340  runPacketUpdate();
1341  else{
1342  if(isBeater())
1343  runHeartBeat();
1344 
1345  }
1346  long packetUpdate = System.currentTimeMillis();
1347  if(queueBuffer.isEmpty()){
1348  // prevents thread lock
1349  ThreadUtil.wait(1);
1350  }else{
1351  try{
1352  //send(queueBuffer.remove(queueBuffer.size()-1) );
1353 
1354  BowlerDatagram b = queueBuffer.remove(0);
1355  long pulledPacket = System.currentTimeMillis();
1356  pushUp(b);
1357  if(b!=null){
1358  long pushedPacket = System.currentTimeMillis();
1359 
1360  if((System.currentTimeMillis()-getLastWrite())>(getSleepTime()*(getPercentagePrint() /100.0))&& b.isSyncronous()){
1361  Log.error("Packet recive took more then "+getPercentagePrint()+"%. " +
1362  "\nPacket Update\t"+(packetUpdate- start)+"" +
1363  "\nPulled Packet\t"+(pulledPacket-packetUpdate)+"" +
1364  "\nPushed Packet\t"+(pushedPacket-pulledPacket));
1365  }
1366  }
1367  }catch(Exception e){
1368  e.printStackTrace();
1369  }
1370  }
1371 
1372  int index = queueBuffer.size()-1;
1373  int max = 500;
1374  while(queueBuffer.size()>max){
1375  if(queueBuffer.get(index).isFree()){
1376  Log.error("Removing packet because freed "+queueBuffer.remove(index));
1377  }else{
1378  if(!queueBuffer.get(index).isSyncronous()){
1379  int state = Log.getMinimumPrintLevel();
1381  Log.error("Removing packet from overflow: "+queueBuffer.remove(index));
1382  Log.setMinimumPrintLevel(state);
1383  }else{
1384  index--;
1385  }
1386  }
1387  if(index >= max){
1388  break;
1389  }
1390  }
1391 
1392 
1393  }
1394 
1395  Log.error("Queue Manager thread exited! Connected="+isConnected()+" kill switch="+killSwitch);
1396  //throw new RuntimeException();
1397  }
1398 
1404  private boolean runPacketUpdate() {
1405  try {
1407  if(bd!=null){
1408  Log.info("\nR<<"+bd);
1409  onDataReceived(bd);
1411  }
1412  } catch (Exception e) {
1413  //e.printStackTrace();
1414  if(isConnected()){
1415  Log.error("Data read failed "+e.getMessage());
1416  e.printStackTrace();
1417  disconnect();
1418  //connect();
1419  }
1420  }
1421  return false;
1422  }
1423 
1424 
1430  private void addDatagram(BowlerDatagram dg) {
1431  queueBuffer.add(dg);
1432  }
1433 
1437  public void kill() {
1438  killSwitch=true;
1439  //new RuntimeException("Killing the Queue").printStackTrace();
1440  }
1441  }
1442 
1448  public boolean isUseThreadedStack() {
1449  return useThreadedStack;
1450  }
1451 
1457  public void setUseThreadedStack(boolean useThreadedStack) {
1458  this.useThreadedStack = useThreadedStack;
1459  }
1460 
1466  public boolean isBeater() {
1467  return beater;
1468  }
1469 
1475  public void setBeater(boolean beater) {
1476  this.beater = beater;
1477  }
1478 
1487  public BowlerDatagram loadPacketFromPhy(ByteList bytesToPacketBuffer) throws NullPointerException, IOException{
1488  BowlerDatagram bd=BowlerDatagramFactory.build(bytesToPacketBuffer);
1489  if(dataIns!=null){
1490  int have,b,ret =0;
1491  try{
1492  synchronized (dataIns) {
1493  have = getDataIns().available();
1494  }
1495  if(have==0)
1496  return null;
1497  }catch (IOException e){
1498  //Log.enableErrorPrint();
1499  Log.error("IO Error "+e.getMessage());
1500  throw e;
1501  }
1502 
1503  for(b=0;b<have;b++){
1504  if(bd!=null)
1505  Log.error("Adding "+(have-b-1)+" after packet found");
1506  synchronized (dataIns) {
1507  ret = getDataIns().read();
1508  }
1509 
1510  if(ret<0){
1511  Log.error("Stream is broken - unexpected: claimed to have "+have+" bytes, read in "+b);
1512  //reconnect();
1513  //something went wrong
1514  new RuntimeException(" Buffer attempted to read "+have+" got "+b).printStackTrace();
1515  return null;
1516  }else{
1517  bytesToPacketBuffer.add(ret);
1518  if(bd==null)
1519  bd = BowlerDatagramFactory.build(bytesToPacketBuffer);
1520  if(bd!=null)
1521  return bd;
1522 
1523  }
1524 
1525  }
1526 
1527  //ThreadUtil.wait(1);
1528 
1529  }else{
1530  Log.error("Input stream is null");
1531  }
1532  return bd;
1533  }
1534 
1541  //private ByteList outgoing = new ByteList();
1542  public void write(byte[] data) throws IOException {
1544  setLastWrite(System.currentTimeMillis());
1545  if(dataOuts != null){
1546  try{
1547  //Log.info("Writing: "+data.length+" bytes");
1548  ByteList outgoing = new ByteList(data);
1549 
1550  while(outgoing.size()>0){
1551  byte[] b =outgoing.popList(getChunkSize());
1552  //System.out.println("Writing "+new ByteList(data));
1553  getDataOuts().write( b );
1554  getDataOuts().flush();
1555  }
1556  }catch (Exception e){
1557  //e.printStackTrace();
1558  Log.error("Write failed. "+e.getMessage());
1559  //reconnect();
1560  }
1561  }else{
1562  Log.error("No data sent, stream closed");
1563  }
1564 
1565  }
1566 
1567 
1568 
1569 }
BowlerDatagram loadPacketFromPhy(ByteList bytesToPacketBuffer)
boolean ping(MACAddress mac, boolean switchParser)
Object[] send(MACAddress addr, String namespace, BowlerMethod method, String rpcString, Object[] arguments, int retry)
void removeSynchronousDatagramListener(ISynchronousDatagramListener l)
RpcEncapsulation locateRpc(String namespace, BowlerMethod method, String rpcString)
Object[] parseResponse(String namespace, BowlerMethod method, String rpcString, BowlerDatagram dg)
static BowlerAbstractCommand getCommand(String namespace, BowlerMethod method, String rpcString, Object[] arguments, RpcEncapsulation rpc)
BowlerDatagram send(BowlerAbstractCommand command, MACAddress addr, int retry)
void setSynchronousDatagramListener(ISynchronousDatagramListener l)
BowlerDatagram send(BowlerAbstractCommand command, MACAddress addr, int retry, boolean switchParser)
ArrayList< RpcEncapsulation > getRpcList(String namespace, MACAddress addr)
BowlerDatagram send(BowlerAbstractCommand command, MACAddress addr)
synchronized BowlerDatagram sendSynchronusly(BowlerDatagram sendable, boolean switchParser)
BowlerDatagram send(BowlerAbstractCommand command, MACAddress addr, boolean switchParser)
static void setUseBowlerV4(boolean useBowlerV4)
static void enableErrorPrint()
Definition: Log.java:225
static void info(String message)
Definition: Log.java:110
static void error(String message)
Definition: Log.java:92
static void setMinimumPrintLevel(int level)
Definition: Log.java:236
static void warning(String message)
Definition: Log.java:101
static void debug(String message)
Definition: Log.java:128
static int getMinimumPrintLevel()
Definition: Log.java:246
Object[] parseResponse(BowlerDatagram datagram)
BowlerAbstractCommand getCommand(Object[] doswnstreamData)
static BowlerDataType get(byte code)
static BowlerMethod get(byte code)
BowlerDatagram onSyncReceive(BowlerDatagram data)