Skip to content
Snippets Groups Projects
Commit 5efbc8c9 authored by Server (Shared Users)'s avatar Server (Shared Users)
Browse files

delete testfile

parent b0e3479c
Branches main
No related tags found
No related merge requests found
import subprocess
import socket
import sys
import time
import bluetooth._bluetooth as bluez
from time import sleep
bct_BLUETOOTHDEVICE = "hci0"
bct_OGF = "0x08"
bct_OCF_format = "0x0008"
bct_OCF_setting = "0x0006"
bct_OCF_operate = "0x000A"
bct_start = "01"
bct_stop = "00"
bct_IBEACONPROFIX = "1E 02 01 1A 1A FF 4C 00 02 15"
bct_UUID = " 00 00 00 AC E8 B4 E0 C2 7D 20 B6 11 B6 11 C7 74"
bct_MAJOR = "00 01"
bct_MINOR = "00 02"
bct_POWER = "CE 00"
class BeaconEmit():
def __init__(self):
self.beacon_TX_config("up")
self.beacon_TX_cmd_format(bct_OCF_format, bct_IBEACONPROFIX, bct_UUID, bct_MAJOR, bct_MINOR, bct_POWER)
self.beacon_TX_cmd_setting(bct_OCF_setting, 100)
self.beacon_TX_cmd_operate(bct_OCF_operate, bct_start)
def beacon_TX_config(self, _param):
result = subprocess.check_output("sudo hciconfig " + bct_BLUETOOTHDEVICE + " " + _param, shell=True)
def beacon_TX_cmd_format(self, _ocf, _ibeaconprofix, _uuid, _major, _minor, _power):
_bct_ogf = bct_OGF + " "
_ocf = _ocf + " "
_ibeaconprofix = _ibeaconprofix + " "
_uuid = _uuid + " "
_major = _major + " "
_minor = _minor + " "
result = subprocess.check_output("sudo hcitool -i " + bct_BLUETOOTHDEVICE + " cmd " + _bct_ogf + _ocf + _ibeaconprofix + _uuid + _major + _minor + _power, shell=True)
def beacon_TX_cmd_setting(self, _ocf, _interval):
_bct_ogf = bct_OGF + " "
_ocf = _ocf + " "
_intervalHEX = '{:04X}'.format(int(_interval/0.625))
_minInterval = _intervalHEX[2:] + " " + _intervalHEX[:2] + " "
_maxInterval = _intervalHEX[2:] + " " + _intervalHEX[:2] + " "
result = subprocess.check_output("sudo hcitool -i " + bct_BLUETOOTHDEVICE + " cmd " + _bct_ogf + _ocf + _maxInterval + _maxInterval + "00 00 00 00 00 00 00 00 00 07 00", shell=True)
def beacon_TX_cmd_operate(self, _ocf, _param):
_bct_ogf = bct_OGF + " "
_ocf = _ocf + " "
result = subprocess.check_output("sudo hcitool -i " + bct_BLUETOOTHDEVICE + " cmd " + _bct_ogf + _ocf + _param, shell=True)
time.sleep(0.1)
def beacon_TX_DevTrigger(self, _str):
_bct_uuid = _str
self.beacon_TX_cmd_format(bct_OCF_format, bct_IBEACONPROFIX, _bct_uuid, bct_MAJOR, bct_MINOR, bct_POWER)
def beacon_Start(self, _str):
try:
startTime = time.time()
repeatTime = 60
print ("BLE EMIT START")
while True:
if time.time() - startTime > repeatTime:
break
self.beacon_TX_DevTrigger(_str)
self.beacon_TX_cmd_operate(bct_OCF_operate, bct_stop)
print ("BLE EMIT STOP")
finally:
pass
#!/usr/bin/env python
import getopt
import socket
import sys
import re, uuid
import time
import threading
import json
import bluetooth._bluetooth as bluez
import blescan
import _thread
import math
from testBeacon import BeaconEmit
from coapthon.client.helperclient import HelperClient
from coapthon.utils import parse_uri
__author__ = 'Giacomo Tanganelli'
class CoapClient():
def __init__(self): # complete
self.ip = "shmd01.iptime.org"
self.myMAC = ':'.join(re.findall('..', '%012x' % uuid.getnode()))
self.clearMyData()
self.myuuid, self.operate = self.obtainMyID()
if self.operate == 1: # rpNode가 이미 한 개 이상존재하는 경우
self._beacon = BeaconEmit()
self.advertiseMe()
def clearMyData(self): # complete
path = "/rpNodeInfoPath"
path = "coap://" + self.ip + path
host, port, path = parse_uri(path)
try:
tmp = socket.gethostbyname(host)
host = tmp
except socket.gaierror:
pass
client = HelperClient(server=(host, port))
try:
payload = {
'option' : -1, #clear my data from Server
'rpMAC' : self.myMAC
}
response = client.put(path, json.dumps(payload))
print((response.pretty_print()))
except KeyboardInterrupt:
print("obtainMyID Stop")
client.stop()
client.stop()
def obtainMyID(self): # complete
path = "/rpNodeInfoPath"
path = "coap://" + self.ip + path
host, port, path = parse_uri(path)
try:
tmp = socket.gethostbyname(host)
host = tmp
except socket.gaierror:
pass
client = HelperClient(server=(host, port))
try:
payload = {
'option' : 0, #obtain rpuuid from Server
'rpMAC' : self.myMAC
}
response = client.put(path, json.dumps(payload))
print((response.pretty_print()))
string = json.loads(response.payload)['rpuuid']
except KeyboardInterrupt:
print("obtainMyID Stop")
client.stop()
client.stop()
return string, json.loads(response.payload)['operate']
def advertiseMe(self): # complete
seq = self.myuuid
length = 3
string =""
for i in range(0, len(seq), 2):
string += seq[i:i+2]+" "
print(string)
print(map(''.join, zip(*[iter(seq)]*length)))
self._beacon.beacon_Start(string)
def calculateNodePositionAtServer(self): # complete
path = "/rpNodeInfoPath"
path = "coap://" + self.ip + path
host, port, path = parse_uri(path)
try:
tmp = socket.gethostbyname(host)
host = tmp
except socket.gaierror:
pass
client = HelperClient(server=(host, port))
try:
payload = {
'option' : 1, # obtain my position from Server.
'rpuuid' : self.myuuid
}
response = client.put(path, json.dumps(payload))
print((response.pretty_print()))
if json.loads(response.payload)['operate'] == 0: # Postion 계산이 불가능한 경우
print("impossible position calculation")
except KeyboardInterrupt:
print("obtainMyID Stop")
client.stop()
client.stop()
def putBLEInfo(self): # complete
dev_id = 0
try:
self.sock = bluez.hci_open_dev(dev_id)
print("ble thread started")
except:
print("error accessing bluetooth device...")
sys.exit(1)
blescan.hci_le_set_scan_parameters(self.sock)
blescan.hci_enable_le_scan(self.sock)
path = "/bleInfoPath"
payload = []
path = "coap://" + self.ip + path
host, port, path = parse_uri(path)
try:
tmp = socket.gethostbyname(host)
host = tmp
except socket.gaierror:
pass
client = HelperClient(server=(host, port))
try:
while True :
payload = []
returnedList = blescan.parse_events(self.sock, 50)
for beacon in returnedList:
beaconInfo = [str(x) for x in beacon.split(',') if x.strip()]
if beaconInfo[1][:8] == "52528282":
print("scanned : "+ str(beaconInfo))
regiRelThread = threading.Thread(target = self.regiRelWithNewPi, args=([beaconInfo]))
regiRelThread.start()
continue
jsonpayload = self.mkJsonbeaconInfo(beaconInfo)
if jsonpayload['distance'] < 60:
payload.append(jsonpayload)
response = client.put(path, json.dumps(payload))
print((response.pretty_print()))
except KeyboardInterrupt:
print("PutBLEInfo Stop")
client.stop()
client.stop()
def mkJsonbeaconInfo(self, info): # complete
n = 2.05
distance = math.pow(10, (float(info[4]) - float(info[5])) / (10 * n))
payload = {
'rpuuid' : self.myuuid,
'userMAC' : info[0],
'useruuid' : info[1],
'distance' : distance,
'updateTime' : time.time()
}
return payload
def regiRelWithNewPi(self, info): # complete
#새로운 rasbPi를 찾은 경우 새로운 rasbPi를 위치측정기로 사용하기 위해서 server에 관련 정보를 저장하는 method
path = "/rpGraphInfoPath"
path = "coap://" + self.ip + path
host, port, path = parse_uri(path)
try:
tmp = socket.gethostbyname(host)
host = tmp
except socket.gaierror:
pass
client = HelperClient(server=(host, port))
try:
response = client.put(path, json.dumps(self.mkJsonRpgraphInfo(info)))
print((response.pretty_print()))
except KeyboardInterrupt:
print("obtainMyID Stop")
client.stop()
client.stop()
def mkJsonRpgraphInfo(self, info): # complete
n = 2.05
distance = math.pow(10, ((float(info[4]) - float(info[5])) / (10 * n)))
payload = {
'v1' : self.myuuid,
'v2' : info[1],
'rssi' : float(info[5])
}
return payload
def main():
measuringDevice = CoapClient()
measuringDevice.calculateNodePositionAtServer()
putBLEInfoThread = threading.Thread(target = measuringDevice.putBLEInfo)
putBLEInfoThread.start()
putBLEInfoThread.join()
if __name__ == "__main__":
main()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment