jdz/gps08.py
2024-07-22 18:25:16 -06:00

128 lines
3.4 KiB
Python

#!/usr/bin/python
# -*- coding:utf-8 -*-
import RPi.GPIO as GPIO
import paho.mqtt.client as mqtt
import serial
import time
import os
import csv
from datetime import datetime
# MQTT Broker settings
mqttBroker = "65.108.199.212"
myhost = os.uname()[1]
client = mqtt.Client(myhost)
ser = serial.Serial('/dev/ttyS0', 115200)
ser.flushInput()
rec_buff = ''
time_count = 0
# GPIO setup
GPIO.setmode(GPIO.BCM)
GPIO.setwarnings(False)
gpio_initialized = False
def setup_gpio(power_key):
global gpio_initialized
if not gpio_initialized:
GPIO.setup(power_key, GPIO.OUT)
gpio_initialized = True
def send_at(command, back, timeout):
rec_buff = ''
ser.write((command+'\r\n').encode())
time.sleep(timeout)
if ser.inWaiting():
time.sleep(0.01)
rec_buff = ser.read(ser.inWaiting())
if rec_buff != '':
if back not in rec_buff.decode():
print(command + ' ERROR')
print(command + ' back:\t' + rec_buff.decode())
return 0
else:
gps_data = rec_buff.decode()[13:-36]
print(gps_data)
client.publish("iiot/" + myhost + "/gps", gps_data)
return gps_data
else:
print('GPS no está listo')
return 0
def parse_gps_data(gps_data):
parts = gps_data.split(',')
if len(parts) >= 4:
lat = float(parts[0][:2]) + float(parts[0][2:]) / 60
lon = float(parts[2][:3]) + float(parts[2][3:]) / 60
if parts[1] == 'S':
lat = -lat
if parts[3] == 'W':
lon = -lon
return lat, lon
return None, None
def write_to_csv(lat, lon):
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
with open('gps_data.csv', 'a', newline='') as csvfile:
writer = csv.writer(csvfile)
writer.writerow([timestamp, lat, lon])
def get_gps_position():
rec_null = True
print('Iniciando GPS')
send_at('AT+CGPS=1,1', 'OK', 1)
time.sleep(2)
while rec_null:
gps_data = send_at('AT+CGPSINFO', '+CGPSINFO: ', 1)
if gps_data and gps_data != 0:
if ',,,,,,' not in gps_data:
lat, lon = parse_gps_data(gps_data)
if lat is not None and lon is not None:
write_to_csv(lat, lon)
print(f"GPS data written to CSV: {lat}, {lon}")
rec_null = False
else:
print('GPS no está listo')
else:
print('error')
send_at('AT+CGPS=0', 'OK', 1)
return False
time.sleep(1.5)
def power_on(power_key):
print('SIM7600X is starting:')
setup_gpio(power_key)
time.sleep(0.1)
GPIO.output(power_key, GPIO.HIGH)
time.sleep(2)
GPIO.output(power_key, GPIO.LOW)
time.sleep(20)
ser.flushInput()
print('SIM7600X is ready')
def power_down(power_key):
if gpio_initialized:
print('SIM7600X is loging off:')
GPIO.output(power_key, GPIO.HIGH)
time.sleep(3)
GPIO.output(power_key, GPIO.LOW)
time.sleep(18)
print('Good bye')
try:
power_key = 6 # Replace with your actual power key GPIO pin number
power_on(power_key)
get_gps_position()
power_down(power_key)
except Exception as e:
print(f"An error occurred: {e}")
finally:
if ser is not None:
ser.close()
if gpio_initialized:
GPIO.cleanup()
print("Script execution completed.")