"""
.. module:: randomdomain
:platform: Unix
.. moduleauthor:: Ammar Najjar <najjar@univention.de>
"""
from __future__ import print_function
import os
import tempfile
import time
from io import BytesIO
import pycurl
import univention.testing.utils as utils
[docs]class SimpleCurl(object):
"""pycurl simple class implementation\n
:param proxy: proxy for the http requests
:type proxy: str
:param username: to use for http requests
:type username: str
:param password: password for the user provided
:type password: str
:param bFollowLocation:
:type bFollowLocation: bool
:param maxReDirs:
:type maxReDirs: int
:param connectTimeout:
:type connectTimeout: int
:param port:
:type port: int
:param auth: authentication type
:type auth: int
"""
def __init__(
self,
proxy,
username=None,
password=None,
bFollowLocation=1,
maxReDirs=5,
connectTimout=10,
timeOut=10,
port=3128,
auth=pycurl.HTTPAUTH_BASIC,
cookie=None,
):
# Perform basic authentication by default
self.curl = pycurl.Curl()
self.curl.setopt(pycurl.FOLLOWLOCATION, bFollowLocation)
self.curl.setopt(pycurl.MAXREDIRS, maxReDirs)
self.curl.setopt(pycurl.CONNECTTIMEOUT, connectTimout)
self.curl.setopt(pycurl.TIMEOUT, timeOut)
self.curl.setopt(pycurl.PROXY, proxy)
self.curl.setopt(pycurl.PROXYPORT, port)
self.curl.setopt(pycurl.PROXYAUTH, auth)
account = utils.UCSTestDomainAdminCredentials()
username = username if username else account.username
password = password if password else account.bindpw
self.curl.setopt(pycurl.PROXYUSERPWD, "%s:%s" % (username, password))
with tempfile.NamedTemporaryFile() as tmp:
self.cookieFilename = tmp.name
self.curl.setopt(pycurl.COOKIEJAR, self.cookieFilename)
self.curl.setopt(pycurl.COOKIEFILE, self.cookieFilename)
[docs] def cookies(self):
return self.curl.getinfo(pycurl.INFO_COOKIELIST)
[docs] def getPage(self, url, bVerbose=False, postData=None, encoding="UTF-8"):
"""Gets a http page
this method keep trying to fetch the page for 60secs then stops
raising and exception if not succeeded.\n
:param url: url
:type url: str
:param bVerbose: if verbose
:type bVerbose: bool
:param postData:
:type postData:
:returns: html page
"""
self.curl.setopt(pycurl.URL, str(url))
self.curl.setopt(pycurl.VERBOSE, bVerbose)
if postData:
self.curl.setopt(pycurl.HTTPPOST, postData)
buf = BytesIO()
self.curl.setopt(pycurl.WRITEFUNCTION, buf.write)
print("getting page:", url)
for i in range(60):
try:
self.curl.perform()
break
except Exception:
time.sleep(1)
print(".")
if i == 59:
print("Requested page could not be fetched")
raise
continue
page = buf.getvalue()
# print page[1:150]
buf.close()
return page.decode(encoding)
[docs] def httpCode(self):
"""HTTP status code\n
:returns: int - http_status_code
"""
return self.curl.getinfo(pycurl.HTTP_CODE)
[docs] def response(self, url):
"""HTTP status code\n
:param url: url
:type url: str
:returns: int - HTTP status code
"""
self.getPage(url)
# print page[0:200]
return self.httpCode()
[docs] def close(self):
"""Close the curl connection"""
self.curl.close()
def __del__(self):
self.curl.close()
if os.path.exists(self.cookieFilename):
os.remove(self.cookieFilename)