Source code for aliyun.log.util
#!/usr/bin/env python
#encoding: utf-8
import re
import hmac
import zlib
import base64
import urllib
import socket
import hashlib
import collections
try:
import json
except ImportError:
import simplejson as json
[docs]class Util:
@staticmethod
[docs] def is_row_ip(ip):
iparray = ip.split('.')
if len(iparray)!=4:
return False
for tmp in iparray:
if not tmp.isdigit() or int(tmp)>=256:
return False
pattern = re.compile(r'^[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}$')
if pattern.match(ip):
return True
return False
@staticmethod
[docs] def get_host_ip(logHost):
""" If it is not match your local ip, you should fill the PutLogsRequest
parameter source by yourself.
"""
try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect((logHost, 80))
ip = s.getsockname()[0]
return ip
except:
return '127.0.0.1'
finally:
s.close()
@staticmethod
[docs] def compress_data(data):
return zlib.compress(data,6)
@staticmethod
[docs] def cal_md5(content):
return hashlib.md5(content).hexdigest().upper()
@staticmethod
[docs] def hmac_sha1(content, key):
if isinstance(content, unicode): # hmac.new accept 8-bit str
content = content.encode('utf-8')
hashed = hmac.new(key, content, hashlib.sha1).digest()
return base64.encodestring(hashed).rstrip()
@staticmethod
@staticmethod
[docs] def url_encode(params):
for key, value in params.items(): # urllib.urlencode accept 8-bit str
if isinstance(value, unicode):
params[key] = value.encode('utf-8')
return urllib.urlencode(params, True)
@staticmethod
[docs] def canonicalized_resource(resource, params):
if params:
urlString = ''
for key in sorted(params.iterkeys()):
value = params[key]
if not isinstance(value, unicode): # int, float, str to unicode
value = unicode(str(value), 'utf-8')
urlString += key+'='+value+'&'
resource = resource+'?'+urlString[:-1] # strip the last &
return resource
@staticmethod
[docs] def get_request_authorization(method, resource, key, params, headers):
if not key:
return ''
content = method+"\n"
if 'Content-MD5' in headers:
content += headers['Content-MD5']
content += '\n'
if 'Content-Type' in headers:
content += headers['Content-Type']
content += "\n"
content += headers['Date']+"\n"
content += Util.canonicalized_log_headers(headers)
content += Util.canonicalized_resource(resource, params)
return Util.hmac_sha1(content, key)
@staticmethod
[docs] def convert_unicode_to_str(data):
if isinstance(data, basestring):
return str(data)
elif isinstance(data, collections.Mapping):
return dict(map(Util.convert_unicode_to_str, data.iteritems()))
elif isinstance(data, collections.Iterable):
return type(data)(map(Util.convert_unicode_to_str, data))
else:
return data
@staticmethod
[docs] def get_json_value(json_map, key, default_value = None) :
if key in json_map :
return json_map[key]
return default_value