first commit

Gitea committed 2020-05-15 10:33:06 +02:00
parent 4cfa70f385
commit 0732bb0945

gitea_update.py (new file, 439 lines)

@@ -0,0 +1,439 @@
import requests
import os
import json
from urllib3.exceptions import InsecureRequestWarning
import time
import tempfile
import urllib.parse as urlparse
import copy
import subprocess
import hashlib
from packaging import version
import shutil
import sys
import email.utils
# When dryrun is True the upgrade steps are only printed, not executed.
dryrun = True
gitea_path = os.path.join('/usr','local','bin')
if hasattr(sys.stdout,'name') and sys.stdout.name == '<stdout>':
    # running on a real console (not inside the pyscripter debugger)
    try:
        from clint.textui.progress import Bar as ProgressBar
    except ImportError:
        # clint is optional; fall back to plain progress prints
        ProgressBar = None
else:
    ProgressBar = None
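# The functions below rely on three helpers that this file references but does
# not define: _hash_file, _check_hash_for_file and SSLAdapter (apparently
# adapted from WAPT's waptutils, judging by the docstrings). Minimal stand-ins
# under that assumption, so the module is self-contained:
import ssl
from requests.adapters import HTTPAdapter

def _hash_file(fname, block_size=2**20, hash_func=hashlib.md5):
    """Return the hex digest of fname computed with hash_func, reading by blocks."""
    with open(fname,'rb') as f:
        hasher = hash_func()
        while True:
            data = f.read(block_size)
            if not data:
                break
            hasher.update(data)
    return hasher.hexdigest()

def _check_hash_for_file(fname, md5=None, sha1=None, sha256=None):
    """Check fname against the strongest hash supplied; raise if none is given."""
    if sha256 is not None:
        return _hash_file(fname,hash_func=hashlib.sha256) == sha256
    elif sha1 is not None:
        return _hash_file(fname,hash_func=hashlib.sha1) == sha1
    elif md5 is not None:
        return _hash_file(fname,hash_func=hashlib.md5) == md5
    raise Exception('No hash supplied to check file %s' % fname)

class SSLAdapter(HTTPAdapter):
    """Transport adapter presenting a client certificate whose private key may
    be password protected (a sketch, not WAPT's original implementation)."""
    def __init__(self, certfile, keyfile, password=None, **kwargs):
        self._ssl_context = ssl.create_default_context()
        self._ssl_context.load_cert_chain(certfile, keyfile, password=password)
        super().__init__(**kwargs)

    def init_poolmanager(self, *args, **kwargs):
        # hand the prepared context to urllib3's pool manager
        kwargs['ssl_context'] = self._ssl_context
        return super().init_poolmanager(*args, **kwargs)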
def httpdatetime2time(httpdate):
    """Convert a date string as returned in http or mail headers to a Unix timestamp (UTC).
    >>> import requests
    >>> last_modified = requests.head('http://wapt/wapt/Packages',headers={'cache-control':'no-cache','pragma':'no-cache'}).headers['last-modified']
    >>> httpdatetime2time(last_modified) > 0
    True
    """
    if httpdate:
        date_time_tz = email.utils.parsedate_tz(httpdate)
        # mktime_tz handles the timezone offset correctly (time.mktime would
        # interpret the parsed tuple as local time)
        return email.utils.mktime_tz(date_time_tz)
    else:
        return None
def default_http_headers():
return {
'cache-control':'no-cache',
'pragma':'no-cache',
'user-agent':'Mozilla/5.0 (Linux x86_64; rv:75.0) Gecko/20100101 Firefox/75.0',
}
def get_requests_client_cert_session(url=None,cert=None, verify=True, proxies = {'http':None,'https':None},**kwargs):
    """Returns a requests Session which is aware of client cert auth with a password protected key.
    Disables the use of proxy-related environment variables.
    Args:
        url (str): base prefix url for which the session is created
        cert (tuple) : (cert filename, pem encoded key filename, key password)
        verify (bool or str) : verify the server certificate. If str, path to a trusted CA bundle
    Returns:
        Session
    """
result = requests.Session()
# be sure to not use HTTP_PROXY or HTTPS_PROXY environ variable
result.trust_env = False
result.headers = default_http_headers()
result.verify = verify
result.proxies = proxies
if not verify:
requests.packages.urllib3.disable_warnings(InsecureRequestWarning) # pylint: disable=no-member
    if url is not None and cert is not None:
        cert_path = cert[0]
        key_path = cert[1]
        if cert_path is not None and key_path is not None and os.path.isfile(cert_path) and os.path.isfile(key_path):
            # mount an adapter that presents the client cert (decrypting the key if needed)
            if len(cert)<3:
                # append empty password for an unencrypted key
                cert = (cert[0],cert[1],None)
            result.mount(url, SSLAdapter(cert[0],cert[1],cert[2],**kwargs))
return result
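# Usage sketch (host and file paths are hypothetical; a key password of None
# means the key is unencrypted):
#   session = get_requests_client_cert_session(
#       'https://wapt.example.com',
#       cert=('/etc/ssl/client.crt','/etc/ssl/client.pem',None),
#       verify='/etc/ssl/ca-bundle.pem')
#   r = session.get('https://wapt.example.com/ping')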
def get_disk_free_space(filepath):
    """Returns the number of free bytes on the drive that filepath is on."""
    if os.name == 'nt':
        import win32file
        secs_per_cluster, bytes_per_sector, free_clusters, total_clusters = win32file.GetDiskFreeSpace(filepath) #pylint: disable=no-member
        return secs_per_cluster * bytes_per_sector * free_clusters
    else:
        # same computation as shutil.disk_usage
        def disk_usage(path):
            st = os.statvfs(path) # pylint: disable=no-member
            free = st.f_bavail * st.f_frsize
            total = st.f_blocks * st.f_frsize
            used = (st.f_blocks - st.f_bfree) * st.f_frsize
            return (total, used, free)
        total, used, free = disk_usage(filepath)
        return free
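# Example: a quick free-space sanity check before a large download
# (the 500 MB threshold is illustrative):
#   if get_disk_free_space('/tmp') < 500 * 1024 * 1024:
#       raise Exception('less than 500 MB free in /tmp')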
def wget(url,target=None,printhook=None,proxies=None,connect_timeout=10,download_timeout=None,verify_cert=None,referer=None,
        user_agent=None,cert=None,resume=False,md5=None,sha1=None,sha256=None,cache_dir=None,requests_session=None,limit_bandwidth=None):
    r"""Copy the contents of a file from a given URL to a local file.
    Args:
        url (str): URL to document
        target (str) : full file path of downloaded file. If None, put in a temporary dir with supplied url filename (final part of url)
        printhook (callable) : callback(received,total,speed,url) invoked while downloading
        proxies (dict) : proxies to use. eg {'http':'http://wpad:3128','https':'http://wpad:3128'}
        connect_timeout (int) : seconds to wait for the connection before giving up
        download_timeout (int) : maximum overall duration of the download in seconds
        verify_cert (bool or str) : either False, True (verify with embedded CA list), or path to a directory or PEM encoded CA bundle file
                                    to check https certificate signature against.
        cert (list) : tuple/list of (x509certfilename,pemkeyfilename,key password) for authenticating the client. If the key is not encrypted, password must be None
        referer (str): optional referer header
        user_agent (str): optional user-agent header
        resume (bool): try to resume a partial download with a Range request
        md5 (str) : expected md5 hex digest of the downloaded file
        sha1 (str) : expected sha1 hex digest
        sha256 (str) : expected sha256 hex digest
        cache_dir (str) : if the file exists here and its hash matches, copy from here instead of downloading. If not, put a copy of the file here after downloading.
        requests_session (requests.Session) : predefined requests session to use instead of building one from scratch from proxies, cert, verify_cert
        limit_bandwidth (float) : cap the download speed, in MB/s
    Returns:
        str : path to downloaded file
    >>> respath = wget('http://wapt.tranquil.it/wapt/tis-firefox_28.0.0-1_all.wapt','c:\\tmp\\test.wapt',proxies={'http':'http://proxy:3128'})
    ???
    >>> os.stat(respath).st_size>10000
    True
    >>> respath = wget('http://localhost:8088/runstatus','c:\\tmp\\test.json')
    ???
    """
start_time = time.time()
last_time_display = 0.0
last_downloaded = 0
def reporthook(received,total):
if total is not None:
total = float(total)
else:
total = received
if received>1:
# print only every second or at end
if (time.time()-start_time>1) and ((time.time()-last_time_display>=1) or (received>=total)):
speed = received /(1024.0 * (time.time()-start_time))
if printhook:
printhook(received,total,speed,url)
elif sys.stdout is not None:
try:
if received == 0:
print(u"Downloading %s (%.1f Mb)" % (url,int(total)/1024/1024))
elif received>=total:
print(u" -> download finished (%.0f Kb/s)" % (total /(1024.0*(time.time()+.001-start_time))))
else:
print(u'%i / %i (%.0f%%) (%.0f KB/s)\r' % (received,total,100.0*received/total,speed))
                    except Exception:
                        return False
return True
else:
return False
if target is None:
target = tempfile.gettempdir()
if os.path.isdir(target):
target = os.path.join(target,'')
(dir,filename) = os.path.split(target)
if not filename:
url_parts = urlparse.urlparse(url)
filename = url_parts.path.split('/')[-1]
if not dir:
dir = os.getcwd()
if not os.path.isdir(dir):
os.makedirs(dir)
if requests_session is None:
if verify_cert is None:
verify_cert = False
requests_session = get_requests_client_cert_session(url,cert=cert,verify=verify_cert,proxies=proxies)
elif proxies is not None or verify_cert is not None or cert is not None:
raise Exception('wget: requests_session and proxies,verify_cert,cert are mutually exclusive')
with requests_session as session:
target_fn = os.path.join(dir,filename)
# return cached file if md5 matches.
if (md5 is not None or sha1 is not None or sha256 is not None) and cache_dir is not None and os.path.isdir(cache_dir):
cached_filename = os.path.join(cache_dir,filename)
if os.path.isfile(cached_filename):
if _check_hash_for_file(cached_filename,md5=md5,sha1=sha1,sha256=sha256):
resume = False
if cached_filename != target_fn:
shutil.copy2(cached_filename,target_fn)
return target_fn
else:
cached_filename = None
headers = copy.copy(session.headers)
        if referer is not None:
            headers.update({'referer': '%s' % referer})
        if user_agent is not None:
            headers.update({'user-agent': '%s' % user_agent})
if os.path.isfile(target_fn) and resume:
try:
actual_size = os.stat(target_fn).st_size
size_req = session.head(url,
timeout=connect_timeout,
headers=headers,
allow_redirects=True)
target_size = int(size_req.headers['content-length'])
file_date = size_req.headers.get('last-modified',None)
if target_size > actual_size:
headers.update({'Range':'bytes=%s-' % (actual_size,)})
write_mode = 'ab'
elif target_size < actual_size:
target_size = None
write_mode = 'wb'
except Exception:
target_size = None
write_mode = 'wb'
else:
file_date = None
actual_size = 0
target_size = None
write_mode = 'wb'
# check hashes if size equal
if resume and (md5 is not None or sha1 is not None or sha256 is not None) and target_size is not None and (target_size <= actual_size):
if not _check_hash_for_file(target_fn,md5=md5,sha1=sha1,sha256=sha256):
# restart download...
target_size = None
write_mode = 'wb'
        httpreq = None
        if not resume or target_size is None or (target_size - actual_size) > 0:
httpreq = session.get(url,
stream=True,
timeout=connect_timeout,
headers=headers,
allow_redirects=True)
httpreq.raise_for_status()
total_bytes = None
if 'content-length' in httpreq.headers:
total_bytes = int(httpreq.headers['content-length'])
target_free_bytes = get_disk_free_space(os.path.dirname(os.path.abspath(target)))
if total_bytes > target_free_bytes:
raise Exception('wget : not enough free space on target drive to get %s MB. Total size: %s MB. Free space: %s MB' % (url,total_bytes // (1024*1024),target_free_bytes // (1024*1024)))
                # chunk size: 1 MB max, 2 KB min (about 100 progress updates per file)
                chunk_size = min(1024*1024, max(int(total_bytes/100), 2048))
else:
chunk_size = 1024*1024
cnt = 0
if printhook is None and ProgressBar is not None and total_bytes:
progress_bar = ProgressBar(label=filename,expected_size=target_size or total_bytes, filled_char='=')
progress_bar.show(actual_size)
else:
progress_bar = None
with open(target_fn,write_mode) as output_file:
last_time_display = time.time()
last_downloaded = 0
                if httpreq.ok:
                    if limit_bandwidth:
                        # limit_bandwidth is in MB/s: pause after each chunk to stay under it
                        sleep_time = chunk_size/(limit_bandwidth*1024*1024)
                    else:
                        sleep_time = 0
for chunk in httpreq.iter_content(chunk_size=int(chunk_size)):
time.sleep(sleep_time)
output_file.write(chunk)
output_file.flush()
cnt +=1
if download_timeout is not None and (time.time()-start_time>download_timeout):
                            raise requests.Timeout(r'Download of %s took more than the allowed %ss'%(url,download_timeout))
if printhook is None and ProgressBar is not None and progress_bar:
if (time.time()-start_time>0.2) and (time.time()-last_time_display>=0.2):
progress_bar.show(actual_size + cnt*len(chunk))
last_time_display = time.time()
else:
if reporthook(cnt*len(chunk),total_bytes):
last_time_display = time.time()
last_downloaded += len(chunk)
if printhook is None and ProgressBar is not None and progress_bar:
progress_bar.show(total_bytes or last_downloaded)
progress_bar.done()
last_time_display = time.time()
elif reporthook(last_downloaded,total_bytes or last_downloaded):
last_time_display = time.time()
# check hashes
if sha256 is not None:
file_hash = _hash_file(target_fn,hash_func=hashlib.sha256)
if file_hash != sha256:
raise Exception(u'Downloaded file %s sha256 %s does not match expected %s' % (url,file_hash,sha256))
elif sha1 is not None:
file_hash = _hash_file(target_fn,hash_func=hashlib.sha1)
if file_hash != sha1:
raise Exception(u'Downloaded file %s sha1 %s does not match expected %s' % (url,file_hash,sha1))
elif md5 is not None:
file_hash = _hash_file(target_fn,hash_func=hashlib.md5)
if file_hash != md5:
raise Exception(u'Downloaded file %s md5 %s does not match expected %s' % (url,file_hash,md5))
        if httpreq is not None:
            file_date = httpreq.headers.get('last-modified',None)
        if file_date:
            # set the local file's mtime to the server's last-modified date
            file_ts = httpdatetime2time(file_date)
            os.utime(target_fn,(file_ts,file_ts))
# cache result
if cache_dir:
if not os.path.isdir(cache_dir):
os.makedirs(cache_dir)
cached_filename = os.path.join(cache_dir,filename)
if target_fn != cached_filename:
shutil.copy2(target_fn,cached_filename)
return target_fn
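# Usage sketch for wget (URL, paths and digest are hypothetical):
#   respath = wget('https://example.com/gitea-1.11.5-linux-amd64',
#                  target='/tmp',
#                  resume=True,                       # continue a partial download
#                  sha256='<expected hex digest>',    # check integrity after download
#                  cache_dir='/var/cache/downloads')  # reuse/populate a local copy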
def wgets(url,proxies=None,verify_cert=None,referer=None,user_agent=None,timeout=None,cert=None,requests_session=None):
    """Return the content of a remote resource with a http get request.
    Raise an exception if the remote data can't be retrieved.
    Args:
        url (str): http(s) url
        proxies (dict): proxy configuration as requests requires it {'http': url, 'https':url}
        verify_cert (bool or str) : verify the server certificate, or path to a trusted CA bundle
        cert (tuple of 3 str) : (cert_path, key_path, key password) for client side authentication.
        requests_session (requests.Session) : predefined requests session to use instead of building one from scratch
    Returns:
        bytes : content of the remote resource
    >>> data = wgets('https://wapt/ping')
    >>> b"msg" in data
    True
    """
if requests_session is None:
if verify_cert is None:
verify_cert = False
requests_session = get_requests_client_cert_session(url,cert=cert,verify=verify_cert,proxies=proxies)
elif proxies is not None or verify_cert is not None or cert is not None:
raise Exception('wgets: requests_session and proxies,verify_cert,cert are mutually exclusive')
with requests_session as session:
        if referer is not None:
            session.headers.update({'referer': '%s' % referer})
        if user_agent is not None:
            session.headers.update({'user-agent': '%s' % user_agent})
r = session.get(url,timeout=timeout,allow_redirects=True)
if r.ok:
return r.content
else:
r.raise_for_status()
def need_upgrade():
    """Download the latest Gitea release and swap it in.
    Steps are only printed, not executed, while dryrun is True."""
    # pick the linux-amd64 asset of the latest release
    release = json.loads(wgets('https://api.github.com/repos/go-gitea/gitea/releases/latest'))
    filename,url = [(str(p['name']),str(p['browser_download_url'])) for p in release['assets'] if 'linux-amd64' in p['name']][0]
    print('\nStart downloading file %s from %s\n' % (filename,url))
    if not dryrun:
        wget(url,target='/tmp')
    print('Move new file to %s' % gitea_path)
    if not dryrun:
        shutil.move(os.path.join('/tmp',filename),gitea_path)
    print('Remove old sym link')
    if not dryrun:
        os.remove(os.path.join(gitea_path,'gitea'))
    print('Create new sym link')
    if not dryrun:
        os.symlink(os.path.join(gitea_path,filename), os.path.join(gitea_path,'gitea'))
    print('Set chmod')
    if not dryrun:
        os.chmod(os.path.join(gitea_path,'gitea'),0o755)  # octal mode, not decimal 755
    print('Set chown')
    if not dryrun:
        os.chown(os.path.join(gitea_path,'gitea'),106,0)  # uid 106: the git user on this host
    print('Migrate database to new version')
    if not dryrun:
        subprocess.check_output('sudo -u git gitea migrate -c /etc/gitea/app.ini',shell=True)
    print('Restart gitea')
    if not dryrun:
        subprocess.check_output('systemctl restart gitea',shell=True)
if __name__ == '__main__':
    gitea_version = subprocess.check_output('gitea --version',shell=True)
    # output looks like "Gitea version 1.11.5 built with ..."; take the third word
    installed_version = gitea_version.decode('utf-8').split()[2]
    print('Installed version : ', installed_version)
    latest = json.loads(wgets('https://api.github.com/repos/go-gitea/gitea/releases/latest'))
    last_version = str(latest['tag_name'])[1:]  # strip the leading 'v' from e.g. 'v1.11.5'
    print('Last version : ',last_version)
    if version.parse(installed_version) < version.parse(last_version):
        print('Need upgrade')
        need_upgrade()
    else:
        print('Nothing to do')
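# Deployment sketch (assumptions: root privileges are needed to replace the
# binary under /usr/local/bin and to restart the systemd unit, and 'gitea'
# is on PATH for the version check):
#   sudo python3 gitea_update.py
# Set dryrun = False above once the printed plan looks right.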