Multi-lists support

Code refactor and clean
This commit is contained in:
Paul 2016-03-14 16:34:11 +01:00
parent d29fb44172
commit 61d9186789
4 changed files with 245 additions and 125 deletions

1
.gitignore vendored
View File

@ -60,3 +60,4 @@ target/
# Config files # Config files
config.py config.py
mailbox.csv

View File

@ -1,12 +1,10 @@
smshost="192.168.1.100" smshost = "192.168.1.100"
smsusername="admin" smsusername = "admin"
smspassword="admin" smspassword = "admin"
smsrecipients=["0660066006","0620022002"] smssize = 160
smssize=160 smstemplate = "Nouveau mail de %s. Object %s. Message ..."
smstemplate="Nouveau mail de %s. Object %s. Message ..." smsformat = "ascii"
mailboxserver="smtp.domain.net" mailboxes = "/home/paul/PycharmProjects/smsgateway/mailbox.csv"
mailboxlogin="charlie@domain.net"
mailboxpassword="charlieroot"
pidfile='/run/lock/smsgateway.pid' pidfile = "/run/lock/smsgateway.pid"

3
mailbox.csv.sample Normal file
View File

@ -0,0 +1,3 @@
# mail, password, number1, number2, number3, ...
mailhost.test.com,test1@test.com,passwordtest1,0606060606,0602020202
mailhost.test.com,test2@test.com,passwordtest2,0604040404,0603030303

View File

@ -8,138 +8,256 @@ import telnetlib
import config import config
import fcntl import fcntl
import sys import sys
import unicodedata
from email.header import decode_header from email.header import decode_header
from messaging.sms import SmsSubmit from messaging.sms import SmsSubmit
def fetch_unread_mails():
"""Fetch unread emails on specific mailbox, and returns some fields"""
mail = imaplib.IMAP4_SSL(config.mailboxserver)
mail.login(config.mailboxlogin,config.mailboxpassword)
mail.list()
status,messages = mail.select("INBOX")
mails=[] def csv_config_parser(mailboxes):
params = []
with open(mailboxes) as f:
for line in f:
if "#" in line:
pass
else:
tmp_str = line.strip("\n").split(",")
params.append(tmp_str)
return params
n=0
retcode, messages = mail.search(None, '(UNSEEN)')
if retcode == 'OK':
for num in messages[0].split():
n=n+1
typ,data = mail.fetch(num,'RFC822')
for response_part in data:
if isinstance(response_part,tuple):
original = email.message_from_string(response_part[1])
mailfrom = original['From']
if "<" in mailfrom:
mailfrom = mailfrom.split('<')[1].rstrip('>')
mailsubject = original['Subject']
mailsubject = decode_header(mailsubject)
default_charset = 'ASCII'
mailsubject=''.join([ unicode(t[0], t[1] or default_charset) for t in mailsubject ])
mails.append([mailfrom,mailsubject])
return mails
def clearallsms(): def fetch_unread_mails(mailboxserver, mailboxlogin, mailboxpassword):
"""Clears all stored SMS on Portech like gateways""" """
try: Fetch unread emails on specific mailbox, and returns some fields
count=0 :param mailboxserver:
tn = telnetlib.Telnet(config.smshost,23) :param mailboxlogin:
tn.read_until("username: ") :param mailboxpassword:
tn.write(config.smsusername + "\r\n") :return str:
tn.write(config.smspassword + "\r\n") """
tn.read_until("user level = admin.") mail = imaplib.IMAP4_SSL(mailboxserver)
tn.write("module1\r\n") mail.login(mailboxlogin, mailboxpassword)
tn.read_until("got!! press 'ctrl-x' to release module 1.") mail.list()
while count<100: mail.select("INBOX")
tn.write("AT+CMGD="+str(count)+"\r\n")
count=count+1
tn.read_until("\r\n")
tn.close()
except:
print("Unexpected error:", sys.exc_info()[0])
raise
def formatsms(message): mails = []
"""Strip SMS if longer than config.smssize"""
if len(message) > config.smssize:
message = message[:config.smssize]
return message
def imap2sms(sender,subject): n = 0
"""Uses a template to make a short message from email fields""" returncode, messages = mail.search(None, '(UNSEEN)')
sms=config.smstemplate % (sender,subject) if returncode == 'OK':
return sms for num in messages[0].split():
n += 1
typ, data = mail.fetch(num, 'RFC822')
for response_part in data:
if isinstance(response_part, tuple):
original = email.message_from_string(response_part[1])
mailfrom = original['From']
if "<" in mailfrom:
mailfrom = mailfrom.split('<')[1].rstrip('>')
mailsubject = original['Subject']
mailsubject = decode_header(mailsubject)
default_charset = 'ASCII'
mailsubject = ''.join([unicode(t[0], t[1] or default_charset) for t in mailsubject])
mails.append([mailfrom, mailsubject])
return mails
def pduformat(phonenumber,message):
"""Formats SMS using pdu encoding"""
sms = SmsSubmit(phonenumber, message)
pdu = sms.to_pdu()[0]
pdustring=pdu.pdu
pdulength=pdu.length
# debug output
#print(phonenumber, message)
#print(pdu.length, pdu.pdu)
return pdustring,pdulength
def sendsms(pdustring,pdulenght): def clear_all_sms():
"""Send SMS using telnetlib, returns exception when issues with telnet communication""" """
try: Clears all stored SMS on Portech like gateways
time.sleep(2) :return: None
tn = telnetlib.Telnet(config.smshost,23) """
tn.read_until("username: ") try:
tn.write(config.smsusername + "\r\n") count = 0
tn.write(config.smspassword + "\r\n") tn = telnetlib.Telnet(config.smshost, 23)
tn.read_until("user level = admin.") tn.read_until("username: ")
tn.write("state1\r\n") tn.write(config.smsusername + "\r\n")
tn.read_until("module 1: free.\r\n]") tn.write(config.smspassword + "\r\n")
tn.write("module1\r\n") tn.read_until("user level = admin.")
tn.read_until("got!! press 'ctrl-x' to release module 1.") tn.write("module1\r\n")
tn.write("AT+CMGF=0\r\n") tn.read_until("got!! press 'ctrl-x' to release module 1.")
tn.read_until("0\r\n") while count < 100:
tn.write('AT+CMGS=%s\r\n' % pdulength) tn.write("AT+CMGD=" + str(count) + "\r\n")
tn.read_until("> ") count += 1
tn.write("%s\x1A" % pdustring) tn.read_until("\r\n")
tn.read_until("+CMGS") tn.close()
tn.close() except:
except: print("Unexpected error:", sys.exc_info()[0])
print("Unexpected error:", sys.exc_info()[0]) raise
raise
def resize_ascii_sms(message):
"""
Strip message if longer than config.smssize
"""
value = config.smssize-3
if len(message) > value:
message = message[:value]
return message
def resize_pdu_sms(message):
"""
Strip SMS if longer than config.smssize
:param message:
:return: str
"""
if len(message) > config.smssize:
message = message[:config.smssize]
return message
def sms_template(sender, subject):
"""
Uses a template to make a short message from email fields
:param sender: str
:param subject: str
:return:
"""
text = config.smstemplate % (sender, subject)
return text
def imap2sms(conf):
for list in conf:
username = list[0]
password = list[1]
mailserver = list[2]
numbers = []
i = 3
while i < len(list):
numbers.append(list[i])
i += 1
mails = fetch_unread_mails(username, password, mailserver)
for number in numbers:
for mail in mails:
sender = mail[0]
subject = mail[1]
if config.smsformat == "pdu":
sms = resize_pdu_sms(sms_template(sender, subject))
pdustring, pdulength = pduformat(number, sms)
send_pdu_sms(pdustring, pdulength)
elif config.smsformat == "ascii":
sms = resize_ascii_sms(sms_template(sender, subject))
send_ascii_sms(number, sms)
def pduformat(phonenumber, message):
"""
Formats SMS using pdu encoding
"""
sms = SmsSubmit(phonenumber, message)
pdu = sms.to_pdu()[0]
pdustring = pdu.pdu
pdulength = pdu.length
# debug output
# print(phonenumber, message)
# print(pdu.length, pdu.pdu)
return pdustring, pdulength
def send_ascii_sms(phonenumber, sms):
"""
Send SMS using telnetlib, returns exception when issues with telnet communication
"""
decoded_sms = sms.encode("ascii", "ignore")
try:
time.sleep(2)
tn = telnetlib.Telnet(config.smshost, 23)
tn.read_until("username: ")
tn.write(config.smsusername + "\r\n")
tn.write(config.smspassword + "\r\n")
tn.read_until("user level = admin.")
tn.write("state1\r\n")
tn.read_until("module 1: free.\r\n]")
tn.write("module1\r\n")
tn.read_until("got!! press 'ctrl-x' to release module 1.")
tn.write("AT+CMGF=1\r\n")
tn.read_until("0\r\n")
tn.write('AT+CMGS=%s\r\n' % phonenumber)
tn.read_until("> ")
tn.write("%s\x1A" % decoded_sms)
tn.read_until("+CMGS")
tn.close()
except:
print("Unexpected error :", sys.exc_info()[0])
raise
def send_pdu_sms(pdustring, pdulenght):
"""
Send SMS using telnetlib, returns exception when issues with telnet communication
:param pdustring: is the converted sms to pdu format
:param pdulenght: is the size of the pdustring
"""
try:
time.sleep(2)
tn = telnetlib.Telnet(config.smshost, 23)
tn.read_until("username: ")
tn.write(config.smsusername + "\r\n")
tn.write(config.smspassword + "\r\n")
tn.read_until("user level = admin.")
tn.write("state1\r\n")
tn.read_until("module 1: free.\r\n]")
tn.write("module1\r\n")
tn.read_until("got!! press 'ctrl-x' to release module 1.")
tn.write("AT+CMGF=0\r\n")
tn.read_until("0\r\n")
tn.write('AT+CMGS=%s\r\n' % pdulength)
tn.read_until("> ")
tn.write("%s\r\n\x1A" % pdustring)
tn.read_until("+CMGS")
tn.close()
except:
print("Unexpected error:", sys.exc_info()[0])
raise
def usage(): def usage():
"""Prints usage""" """
usage="smsgateway.py subcommands : \n\n%s imap2sms\n%s sms <number> <message>\n%s clearallsms\n" % (sys.argv[0],sys.argv[0],sys.argv[0]) Prints usage
return usage """
usagetext = "smsgateway.py subcommands : \n\n%s imap2sms\n%s sms <number> <message>\n%s clearallsms\n" % (
sys.argv[0], sys.argv[0], sys.argv[0])
return usagetext
def debug():
"""
Debug Function
:return:
"""
return True
fh = open(config.pidfile, 'w') fh = open(config.pidfile, 'w')
try: try:
fcntl.lockf(fh, fcntl.LOCK_EX | fcntl.LOCK_NB) fcntl.lockf(fh, fcntl.LOCK_EX | fcntl.LOCK_NB)
except IOError: except IOError:
# another instance is running # another instance is running
print 'Error: Another instance is running...' print 'Error: Another instance is running...'
sys.exit(0) sys.exit(0)
if len(sys.argv) > 1: if len(sys.argv) > 1:
if sys.argv[1] == "clearallsms": if sys.argv[1] == "clearallsms":
clearallsms() clear_all_sms()
elif sys.argv[1] == "sms":
if (len(sys.argv)==4): elif sys.argv[1] == "sms":
phonenumber=sys.argv[2] if len(sys.argv) == 4:
sms=sys.argv[3] phonenumber = sys.argv[2]
pdustring,pdulength=pduformat(phonenumber,sms) sms = sys.argv[3]
sendsms(pdustring,pdulength) pdustring, pdulength = pduformat(phonenumber, sms)
else: if config.smsformat == "pdu":
print(usage()) send_pdu_sms(pdustring, pdulength)
elif sys.argv[1] == "imap2sms": elif config.smsformat == "ascii":
mails = fetch_unread_mails() send_ascii_sms(phonenumber, sms)
for phonenumber in config.smsrecipients: else:
for mail in mails: print(usage())
sender=mail[0]
subject=mail[1] elif sys.argv[1] == "imap2sms":
sms=formatsms(imap2sms(sender,subject)) config_params = csv_config_parser(config.mailboxes)
pdustring,pdulength=pduformat(phonenumber,sms) imap2sms(config_params)
sendsms(pdustring,pdulength)
elif sys.argv[1] == "debug":
print debug()
else: else:
print(usage()) print(usage())
exit(1) exit(1)