Compare commits

..

No commits in common. "master" and "225c7ecabb2170214a356bda9f7b32d4b621f728" have entirely different histories.

8 changed files with 367 additions and 2056 deletions

7
.gitignore vendored
View File

@ -1,5 +1,4 @@
backup*/
wp-navigation
*.log
backup/
backup1/
web_scrap.log
__pycache__/
wp-gallery

View File

@ -3,8 +3,8 @@
TAR=/usr/bin/tar
PYTHON=/usr/bin/python3
GZIP=/usr/bin/gzip
SCRIPTDIR=/home/valentin/script/webscrap
WEBSCRAP=${SCRIPTDIR}/import_export_canalblog.py
SCRIPTDIR=/home/valentin/script
WEBSCRAP=${SCRIPTDIR}/web_scrap.py
URL=www.clarissariviere.com
DATE=$(date +%Y%m%d)
DIRECTORY=/home/valentin/backup
@ -24,7 +24,7 @@ else
fi
subject="${subject} ${URL} ${DATE}"
echo > ${BACKUPDIR}/${LOGFILE}
${PYTHON} ${WEBSCRAP} --quiet --logfile ${BACKUPDIR}/${LOGFILE} --parallel 20 export --url ${URL} --directory ${DIRECTORY}
${PYTHON} ${WEBSCRAP} --url ${URL} --dir ${DIRECTORY} --quiet --logfile ${BACKUPDIR}/${LOGFILE}
if [ ${?} -ne 0 ]; then
subject="${subject} echoue : recuperation page"
echo ${subject} | mail -s "${subject}" -A ${BACKUPDIR}/${LOGFILE} ${SENDER}

View File

@ -2,143 +2,9 @@
from requests.auth import HTTPBasicAuth
from getpass import getpass
from urllib.parse import urlparse
from concurrent import futures
from concurrent.futures import as_completed, wait, ALL_COMPLETED
import argparse, logging, threading, os, glob
import argparse, logging
from lib.WPImport import WPimport
from lib.WPExport import WPExport
from lib.WPRemove import WPRemove
from lib.WPChange import WPChange
from lib.WPMenu import WPMenu
def errorRevert(logger, revert, tmp):
if revert is True:
files_tmp = glob.glob("{0}/*.json".format(tmp))
if len(files_tmp) == 0:
logger.error("Error revert, because files not found")
exit(1)
if len(files_tmp) != int(args.parallel):
for file_r in files_tmp:
os.remove(file_r)
logger.error("Error revert, because number files tmp is incompatible with parallel number")
exit(1)
def change(index, number, args, logger, tmp, revert):
changeWp = WPChange(logger=logger, index_name=index, number_thread=number, tmp=tmp)
changeWp.fromDirectory(args.directory, revert)
del changeWp
def remove(index, number, args, basic, logger, ssl_wordpress):
removeWp = WPRemove(basic=basic, wordpress="", logger=logger, ssl_wordpress=ssl_wordpress, index_name=index, number_thread=number)
if args.remove == True:
for i in args.wordpress.split(","):
removeWp.setUrl(i)
removeWp.cleanPosts()
removeWp.cleanTags()
removeWp.cleanCategories()
removeWp.cleanMedia()
else:
for i in args.wordpress.split(","):
removeWp.setUrl(i)
if args.posts == True:
removeWp.cleanPosts()
if args.categories == True:
removeWp.cleanCategories()
if args.tags == True:
removeWp.cleanTags()
if args.media == True:
removeWp.cleanMedia()
del removeWp
def download(name_thread, max_thread, url, logger, parser, directory, html, img, ssl_canalblog, revert, tmp):
exportWp = WPExport(name="Thread-{0}".format(int(name_thread) + 1), url=url, logger=logger, parser=parser, directory=directory, ssl_canalblog=ssl_canalblog)
if revert is False:
exportWp.getUrlPage(name_thread, max_thread)
for i in ["article", "page"]:
for j in ["publications", "principal"]:
if html is False:
exportWp.downloadHTML(j, i)
if img is False:
exportWp.downloadImg(j, i)
del exportWp
def importUrl(name_thread, max_thread, canalblog, logger, parser, wordpress, basic, serial, ssl_wordpress, ssl_canalblog, create, update, image, revert, tmp, author):
canalblog = canalblog.split(",")
wordpress = wordpress.split(",")
name = "Thread-{0}".format(int(name_thread) + 1)
protocol = "https"
if ssl_canalblog is False:
protocol = "http"
if serial is False:
for canal in canalblog:
try:
o = urlparse(canal)
o = o._replace(scheme=protocol)
url = o.geturl().replace(":///", "://")
except Exception as err:
logger.error("{0} : parsing error : {1}".format(name, err))
exit(1)
exportWp = WPExport(name="Thread-{0}".format(int(name_thread) + 1), url=url, logger=logger, parser=parser, ssl_canalblog=ssl_canalblog, tmp=tmp)
if not revert:
exportWp.getUrlPage(name_thread, max_thread)
del exportWp
for j in wordpress:
importWp = WPimport(name=name, basic=basic, wordpress=j, logger=logger, parser=parser, ssl_wordpress=ssl_wordpress, no_create=create, no_update=update, no_image=image, tmp=tmp, author=author)
for k in ["article", "page"]:
for l in ["publications", "principal"]:
importWp.fromUrl(l, k)
del importWp
else:
if len(canalblog) != len(wordpress):
logger.error("{0} : ERREUR : Le nombre de dossier n'est pas equivalent au nombre d'URL wordpress".format(name))
exit(1)
for i in range(0, len(canalblog)-1):
try:
o = urlparse(canalblog[i])
o = o._replace(scheme=protocol)
url = o.geturl().replace(":///", "://")
except Exception as err:
logger.error("parsing error : {0}".format(err))
exit(1)
exportWp = WPExport(name=name, url=url, logger=logger, parser=parser, ssl_canalblog=ssl_canalblog)
if not revert:
exportWp.getUrlPage(name_thread, max_thread)
del exportWp
importWp = WPimport(name=name, basic=basic, wordpress=wordpress[i], logger=logger, parser=parser, ssl_wordpress=ssl_wordpress, no_create=create, no_update=update, no_image=image, tmp=tmp, author=author)
for k in ["article", "page"]:
for l in ["publications", "principal"]:
importWp.fromUrl(webpage[l][k])
del importWp
def importDirectory(name_thread, max_thread, directory, logger, parser, wordpress, basic, serial, ssl_wordpress, create, update, image, revert, author):
name = "Thread-{0}".format(int(name_thread) + 1)
directory = directory.split(",")
wordpress = wordpress.split(",")
if serial is False:
for i in wordpress:
importWp = WPimport(name=name, basic=basic, wordpress=i, logger=logger, parser=parser, ssl_wordpress=ssl_wordpress, no_create=create, no_update=update, no_image=image, author=author)
for j in directory:
importWp.fromDirectory(j, name_thread, max_thread, revert)
del importWp
else:
if len(directory) != len(wordpress):
logger.error("{0} : Error : Number directory is different than wordpress".format(name))
exit(1)
for i in range(0, len(wordpress)-1):
importWp = WPimport(name=name, basic=basic, wordpress=wordpress[i], logger=logger, parser=parser, ssl_wordpress=ssl_wordpress, no_create=create, no_update=update, no_image=image, author=author)
importWp.fromDirectory(directory[i], name_thread, max_thread, revert)
del importWp
if __name__ == '__main__':
@ -147,45 +13,16 @@ if __name__ == '__main__':
parser.add_argument("--logfile", help="Log file", default="")
parser.add_argument("--quiet", help="No console output", action="store_true")
parser.add_argument("--parser", help="Parser content", default="html.parser")
parser.add_argument("--parallel", help="Define number thread (default : 1)", default=1)
parser.add_argument("--no-ssl", help="No ssl for canalblog and/or wordpress (example wordpress,canalblog)", dest="ssl", default="")
parser.add_argument("--revert", help="Restart a work from stopping work", action="store_true")
parser.add_argument("--tmp", help="directory tmp", default="/tmp/import_export_canablog")
subparsers = parser.add_subparsers(dest="command")
import_parser = subparsers.add_parser("import")
import_parser.add_argument("--user", help="wordpress user", required=True)
import_parser.add_argument("--password", help="password wordpress's user", default="")
import_parser.add_argument("--file", help="HTML file", default="")
import_parser.add_argument("--directory", help="HTML directory", default="")
import_parser.add_argument("--canalblog", help="URL Canalblog", default="")
import_parser.add_argument("--wordpress", help="URL Wordpress", required=True)
import_parser.add_argument("--serial", help="Serial execution", action="store_true")
import_parser.add_argument("--remove-all", dest="remove", help="Remove all", action="store_true")
import_parser.add_argument("--remove-posts", help="Remove all posts", dest="posts", action="store_true")
import_parser.add_argument("--remove-categories", help="Remove all categories", dest="categories", action="store_true")
import_parser.add_argument("--remove-tags", help="Remove all tags", dest="tags", action="store_true")
import_parser.add_argument("--remove-media", help="Remove all media", dest="media", action="store_true")
import_parser.add_argument("--no-create", help="No create post", dest="create", default="store_false", action="store_true")
import_parser.add_argument("--no-update", help="No update post", dest="update", default="store_false", action="store_true")
import_parser.add_argument("--no-image", help="No image add or update", dest="image", default="store_false", action="store_true")
import_parser.add_argument("--no-menu", help="No menu add or update", dest="menu", default="store_false", action="store_true")
import_parser.add_argument("--author", dest="author", help="Define author", default="")
remove_parser = subparsers.add_parser("remove")
remove_parser.add_argument("--user", help="wordpress user", required=True)
remove_parser.add_argument("--password", help="password wordpress's user", default="")
remove_parser.add_argument("--wordpress", help="URL Wordpress", required=True)
remove_parser.add_argument("--all", dest="remove", help="Remove all (posts, media, tags, categories)", action="store_true")
remove_parser.add_argument("--posts", help="Remove all posts", action="store_true")
remove_parser.add_argument("--categories", help="Remove all categories", action="store_true")
remove_parser.add_argument("--tags", help="Remove all tags", action="store_true")
remove_parser.add_argument("--media", help="Remove all media", action="store_true")
export_parser = subparsers.add_parser("export")
@ -199,33 +36,12 @@ if __name__ == '__main__':
export_parser.add_argument("--no-img", help="No img", dest="img", action="store_true")
export_parser.add_argument("--no-html", help="No HTML", dest="html", action="store_true")
change_parser = subparsers.add_parser("change")
change_parser.add_argument("--directory",
default="",
help="Directory")
change_parser.add_argument("--file",
default="",
help="File")
menu_parser = subparsers.add_parser("menu")
menu_parser.add_argument("--user", help="wordpress user", required=True)
menu_parser.add_argument("--password", help="password wordpress's user", default="")
menu_parser.add_argument("--file", help="HTML file", default="")
menu_parser.add_argument("--canalblog", help="URL Canalblog", default="")
menu_parser.add_argument("--wordpress", help="URL Wordpress", required=True)
args = parser.parse_args()
logger = logging.getLogger('import export canalblog')
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
ssl_canalblog = True
ssl_wordpress = True
for i in args.ssl.split(","):
if i == "canalblog":
ssl_canalblog = False
if i == "wordpress":
ssl_wordpress = False
if args.quiet is False:
ch = logging.StreamHandler()
@ -248,75 +64,80 @@ if __name__ == '__main__':
fileHandler.setFormatter(formatter)
logger.addHandler(fileHandler)
os.makedirs(args.tmp, exist_ok=True)
if args.command == "import" or args.command == "remove" or args.command == "menu":
password = args.password
if len(args.password) == 0:
password = getpass()
if len(password) == 0:
logger.error("No password error !!! ")
exit(1)
if args.command == "import":
password = getpass()
if len(password) == 0:
logger.error("No password error !!! ")
exit(1)
basic = HTTPBasicAuth(args.user, password)
if args.command == "import":
wordpress = args.wordpress.split(",")
importWp = WPimport(basic=basic, wordpress="", logger=logger, parser=args.parser, ssl_wordpress=ssl_wordpress, author=args.author, ssl_canalblog=ssl_canalblog)
importWp = WPimport(basic, "", logger, args.parser)
if len(args.file) > 0:
for i in wordpress:
importWp.setUrl(i)
importWp.fromFile(files=args.file.split(","))
if args.menu is False:
menuWp = WPMenu(name="Thread-1", basic=basic, wordpress=args.wordpress, logger=logger, parser=args.parser, ssl_canalblog=ssl_canalblog, ssl_wordpress=ssl_wordpress)
menuWp.fromFile("{0}".format(args.file.split(",")[0]))
importWp.fromFile(args.file.split(","))
exit(0)
if len(args.directory) > 0:
try:
with futures.ThreadPoolExecutor(max_workers=int(args.parallel)) as ex:
wait_for = [ ex.submit(remove, i, args.parallel, args, basic, logger, ssl_wordpress) for i in range(0, int(args.parallel)) ]
wait(wait_for, return_when=ALL_COMPLETED)
errorRevert(logger, args.revert, args.tmp)
wait_for = [
ex.submit(importDirectory, i, int(args.parallel), args.directory, logger, args.parser, args.wordpress, basic, args.serial, ssl_wordpress, args.create, args.update, args.image, args.revert, args.author)
for i in range(0, int(args.parallel))
]
if args.menu is False:
wait(wait_for, return_when=ALL_COMPLETED)
menuWp = WPMenu(name="Thread-1", basic=basic, wordpress=args.wordpress, logger=logger, parser=args.parser, ssl_canalblog=ssl_canalblog, ssl_wordpress=ssl_wordpress)
menuWp.fromFile("{0}/index.html".format(args.directory))
except Exception as err:
logger.error("Threading error : {0}".format(err))
directory = args.directory.split(",")
if args.serial is False:
for i in wordpress:
importWp.setUrl(i)
for j in directory:
importWp.fromDirectory(j)
else:
if len(directory) != len(wordpress):
logger.error("ERREUR : Le nombre de dossier n'est pas equivalent au nombre d'URL wordpress")
exit(1)
for i in range(0, len(wordpress)-1):
importWp.setUrl(wordpress[i])
importWp.fromDirectory(directory[i])
exit(0)
if len(args.canalblog) > 0:
try:
with futures.ThreadPoolExecutor(max_workers=int(args.parallel)) as ex:
wait_for = [ ex.submit(remove, i, args.parallel, args, basic, logger, ssl_wordpress) for i in range(0, int(args.parallel)) ]
wait(wait_for, return_when=ALL_COMPLETED)
errorRevert(logger, args.revert, args.tmp)
wait_for = [
ex.submit(importUrl, i, int(args.parallel), args.canalblog, logger, args.parser, args.wordpress, basic, args.serial, ssl_wordpress, ssl_canalblog, args.create, args.update, args.image, args.revert, args.tmp, args.author)
for i in range(0, int(args.parallel))
]
if args.menu is False:
wait(wait_for, return_when=ALL_COMPLETED)
menuWp = WPMenu(name="Thread-1", basic=basic, wordpress=args.wordpress, logger=logger, parser=args.parser, ssl_canalblog=ssl_canalblog, ssl_wordpress=ssl_wordpress)
menuWp.fromUrl(args.canalblog)
exportWp = WPExport("", logger, args.parser, args.directory)
canalblog = args.canalblog.split(",")
wordpress = args.wordpress.split(",")
if args.serial is False:
for canal in canalblog:
try:
o = urlparse(canal)
o = o._replace(scheme="https")
url = o.geturl().replace(":///", "://")
except Exception as err:
logger.error("parsing error : {0}".format(err))
exit(1)
exportWp.setUrl(url)
webpage = exportWp.getUrlPage()
for j in wordpress:
importWp.setUrl(j)
importWp.fromUrl(webpage)
else:
if len(canalblog) != len(wordpress):
logger.error("ERREUR : Le nombre de dossier n'est pas equivalent au nombre d'URL wordpress")
exit(1)
for i in range(0, len(canalblog)-1):
try:
o = urlparse(canalblog[i])
o = o._replace(scheme="https")
url = o.geturl().replace(":///", "://")
except Exception as err:
logger.error("parsing error : {0}".format(err))
exit(1)
exportWp.setUrl(url)
webpage = exportWp.getUrlPage()
importWp.setUrl(wordpress[i])
importWp.fromUrl(webpage)
except Exception as err:
logger.error("Threading error : {0}".format(err))
exit(0)
if args.command == "export":
canalblog = args.url.split(",")
protocol = "https"
if ssl_canalblog is False:
protocol = "http"
exportWp = WPExport(logger=logger, parser=args.parser, directory=args.directory, ssl_canalblog=ssl_canalblog)
exportWp = WPExport("", logger, args.parser, args.directory)
for canal in canalblog:
try:
o = urlparse(canal)
o = o._replace(scheme=protocol)
o = o._replace(scheme="https")
url = o.geturl().replace(":///", "://")
except Exception as err:
logger.error("parsing error : {0}".format(err))
@ -327,48 +148,12 @@ if __name__ == '__main__':
if args.css is False:
exportWp.downloadCss()
del exportWp
if args.html is False or args.img is False:
try:
with futures.ThreadPoolExecutor(max_workers=int(args.parallel)) as ex:
wait_for = [
ex.submit(download, i, int(args.parallel), url, logger, args.parser, args.directory, args.html, args.img, ssl_canalblog, args.revert, args.tmp)
for i in range(0, int(args.parallel))
]
except Exception as err:
logger.error("Threading error : {0}".format(err))
exit(0)
webpage = exportWp.getUrlPage()
if args.html is False:
exportWp.downloadHTML(webpage)
if args.command == "remove":
try:
with futures.ThreadPoolExecutor(max_workers=int(args.parallel)) as ex:
wait_for = [ ex.submit(remove, i, args.parallel, args, basic, logger, ssl_wordpress) for i in range(0, int(args.parallel)) ]
except Exception as err:
logger.error("Thread error for remove : {0}".format(err))
exit(0)
if args.command == "change":
if len(args.directory) > 0:
try:
with futures.ThreadPoolExecutor(max_workers=int(args.parallel)) as ex:
errorRevert(logger, args.revert, args.tmp)
wait_for = [ ex.submit(change, i, args.parallel, args, logger, args.tmp, args.revert) for i in range(0, int(args.parallel)) ]
except Exception as err:
logger.error("Thread error for remove : {0}".format(err))
if len(args.file) > 0:
changeWp = WPChange(logger=logger)
for filei in args.file.split(","):
changeWp.fromFile(filei)
exit(0)
if args.command == "menu":
menuWp = WPMenu(name="Thread-1", basic=basic, wordpress=args.wordpress, logger=logger, parser=args.parser, ssl_canalblog=ssl_canalblog, ssl_wordpress=ssl_wordpress)
if len(args.file) > 0:
menuWp.fromFile(args.file)
if len(args.canalblog) > 0:
menuWp.fromUrl(args.canalblog)
if args.img is False:
exportWp.downloadImg(webpage)
exit(0)

View File

@ -1,173 +0,0 @@
from bs4 import BeautifulSoup
from urllib.parse import urlparse
import requests, os, logging, re, json
class WPChange:
# Constructor
def __init__(self, index_name=1, number_thread=1, logger=None, parser="html.parser", tmp="/tmp/import_export_canablog"):
self._name = "Thread-{0}".format(index_name)
self._logger = logger
self._number_thread = number_thread
self._parser = parser
self._tmp = tmp
self._index_name = index_name
# Destructor
def __del__(self):
print("{0} : Import finished".format(self._name))
# Public method
## from file
def fromFile(self, files=[], number_thread=1, max_thread=1):
divFiles = int(len(files) / max_thread)
currentRangeFiles = int(divFiles * (number_thread))
firstRange = int(currentRangeFiles - divFiles)
self._logger.debug("{0} : index : {1}".format(self._name,number_thread))
self._logger.debug("{0} : first range : {1}".format(self._name,firstRange))
self._logger.debug("{0} : last range : {1}".format(self._name,currentRangeFiles))
for i in range(firstRange, currentRangeFiles):
if os.path.exists(files[i]):
self._logger.info("{0} : ({1}/{2}) File is being processed : {3}".format(self._name, i+1, currentRangeFiles + 1, files[i]))
self._change(files[i])
## From directory
def fromDirectory(self, directory="", revert=False):
self._directory = directory
directory = "{0}/archives".format(directory)
directories = self._getDirectories([], "{0}".format(directory))
if len(directories) > 0:
files = self._getFiles(directories)
if revert is False:
self._tmpFiles(files=files, number_thread=self._index_name, max_thread=self._number_thread)
self._fromFileTmp()
else:
self._logger.error("{0} : No files for {1}".format(self._name, directory))
def fromFile(self, files=[]):
for i in range(0, len(files)):
if os.path.exists(files[i]):
self._logger.info("{0} : ({1}/{2}) File is being processed : {3}".format(self._name, i+1, len(files), files[i]))
self._change(files[i])
# Private method
def _fromFileTmp(self):
try:
with open("{0}/{1}.json".format(self._tmp, self._name)) as file:
files = json.loads(file.read())
self._logger.debug("{0} : size of webpage : {1}".format(self._name, len(files)))
for i in range(0, len(files)):
if os.path.exists(files[i]):
self._logger.info("{0} : ({1}/{2}) File is being processed : {3}".format(self._name, i+1, len(files), files[i]))
self._change(files[i])
except Exception as ex:
self._logger.error("{0} : Read file json from tmp : {1}".format(self._name, ex))
def _tmpFiles(self, files=[], number_thread=1, max_thread=1):
print()
divFiles = int(len(files) / int(max_thread))
currentRangeFiles = int(divFiles * (int(number_thread)+1))
firstRange = int(currentRangeFiles - divFiles)
self._logger.debug("{0} : index : {1}".format(self._name,number_thread))
self._logger.debug("{0} : first range : {1}".format(self._name,firstRange))
self._logger.debug("{0} : last range : {1}".format(self._name,currentRangeFiles))
webpage = []
for i in range(firstRange, currentRangeFiles):
webpage.append(files[i])
try:
string_webpage = json.dumps(webpage)
open("{0}/{1}.json".format(self._tmp, self._name), "wt").write(string_webpage)
except Exception as ex:
self._logger.error("{0} : Error for writing webpage : {1}".format(self._name, ex))
## Get all files
def _getFiles(self, item):
files = []
for i in item:
for j in os.listdir(i):
if os.path.isfile("{0}/{1}".format(i, j)):
files.append("{0}/{1}".format(i, j))
return files
## Get directories
def _getDirectories(self, subdirectory, item):
sub = subdirectory
for i in os.listdir(item):
if os.path.isdir("{0}/{1}".format(item, i)):
sub.append("{0}/{1}".format(item, i))
subdirectory = self._getDirectories(sub, "{0}/{1}".format(item, i))
return subdirectory
## Change path img file
def _change(self, file):
ext_img = ["png", "svg", "gif", "jpg", "jpeg"]
try:
with open(file, 'r') as f:
content = f.read()
soup = BeautifulSoup(content, self._parser)
img = soup.find_all("img")
for i in img:
src = i.get("src")
o = urlparse(src)
if len(o.netloc) > 0:
self._logger.info("{0} : Change source image {1} /img/{2}/{3}".format(self._name, src, o.netloc, o.path))
content = content.replace(src, "/img/{0}/{1}".format(o.netloc, o.path))
script = soup.find_all("script", {"type": "text/javascript"})
for i in script:
src = i.get("src")
if src is not None:
o = urlparse(src)
if len(o.netloc) > 0:
self._logger.info("{0} : Change source js {1} /dists/js/{2}/{3}".format(self._name, src, o.netloc, o.path))
content = content.replace(src, "/dists/js/{0}/{1}".format(o.netloc, o.path))
link = soup.find_all("link", {"rel": "stylesheet"})
for i in link:
href = i.get("href")
if href is not None:
o = urlparse(href)
if len(o.netloc) > 0:
self._logger.info("{0} : Change source css {1} /dists/css/{2}/{3}".format(self._name, href, o.netloc, o.path))
content = content.replace(href, "/dists/css/{0}/{1}".format(o.netloc, o.path))
a = soup.find_all("a", {"target": "_blank"})
for i in a:
href = i.get("href")
if href is not None:
o = urlparse(href)
if len(o.netloc) > 0:
ext = o.path.split(".")[len(o.path.split("."))-1]
if ext in ext_img:
self._logger.info("{0} : Change a img {1} /img/{2}/{3}".format(self._name, href, o.netloc, o.path))
content = content.replace(href, "/img/{0}/{1}".format(o.netloc, o.path))
try:
with open(file, "w") as f:
self._logger.info("{0} : File write : {1}".format(self._name, file))
f.write(content)
except Exception as ex:
self._logger.error("{0} : Error for write file {1} : {2}".format(self._name, file, ex))
except Exception as ex:
self._logger.error("{0} : Error for read file {1} : {2}".format(self._name, file, ex))

View File

@ -1,41 +1,27 @@
#!/usr/bin/python3
from bs4 import BeautifulSoup
from urllib.parse import urlparse
import requests, os, argparse, logging, json
import requests, os, argparse, logging
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
class WPExport:
def __init__(self, name = "Thread-0", url = "", logger = None, parser = "html.parser", directory = "backup", ssl_canalblog=True, tmp="/tmp/import_export_canablog"):
def __init__(self, url, logger, parser, directory):
self._url = url
self._logger = logger
self._parser = parser
self._dir = directory
self._name = name
self._protocol = "https"
if ssl_canalblog is False:
self._protocol = "http"
self._request = requests.Session()
retries = Retry(total=10,
status_forcelist=[429, 500, 502, 503, 504], backoff_factor=2)
self._request.mount('{0}://'.format(self._protocol), HTTPAdapter(max_retries=retries))
self._tmp = tmp
self._request.mount('http://', HTTPAdapter(max_retries=retries))
# Destructor
def __del__(self):
self._logger.info("{0} : Export finished for {1}".format(self._name, self._url))
# Public method
# Set name
def setName(self, name):
self._name = "Thread-{0}".format(int(name) + 1)
# Set URL
def setUrl(self, url):
@ -57,131 +43,77 @@ class WPExport:
# Download HTML
def downloadHTML(self, first, second):
try:
with open("{0}/{1}.json".format(self._tmp, self._name)) as file:
webpage = json.loads(file.read())
self._downloadPage(webpage[first][second], self._dir)
except Exception as ex:
self._logger.error("{0} : Read file json from tmp : {1}".format(self._name, ex))
def downloadHTML(self, webpage):
self._downloadPage(webpage, self._dir)
# Download Image
def downloadImg(self, first, second):
try:
with open("{0}/{1}.json".format(self._tmp, self._name)) as file:
webpage = json.loads(file.read())
page_src = self._getImg(webpage[first][second])
o = urlparse(self._url)
self._downloadPage(page_src, "{0}/{1}/{2}".format(self._dir, o.path, "img"))
except Exception as ex:
self._logger.error("{0} : Read file json from tmp : {1}".format(self._name, ex))
def downloadImg(self, webpage):
page_src = self._getImg(webpage)
o = urlparse(self._url)
self._downloadPage(page_src, "{0}/{1}/{2}".format(self._dir, o.path, "img"))
# Get URL
def getUrlPage(self, index_thread, max_thread):
def getUrlPage(self):
try:
page = self._request.get(self._url)
page_url = []
if page.status_code == 200:
soup = BeautifulSoup(page.text, self._parser)
ul = soup.find_all("ul", id="listsmooth")
for anchor in ul[0].find_all("a"):
href = anchor.get('href', '/')
if href != "#":
page_url.append(href)
else:
self._logger.error("{0} : URL did not get due status code : {1}".format(self._name, page.status_code))
self._logger.debug("{0} : {1}".format(self._name, page.content))
except ConnectionError as err:
self._logger.error("{0} : Connection error : {1}".format(self._name, err))
exit(1)
except Exception as err:
self._logger.error("{0} : Exception error : {1}".format(self._name, err))
self._logger.error("Connection error : {0}".format(err))
exit(1)
page_url = []
if page.status_code == 200:
soup = BeautifulSoup(page.text, self._parser)
ul = soup.find_all("ul", id="listsmooth")
for anchor in ul[0].find_all("a"):
href = anchor.get('href', '/')
if href != "#":
page_url.append(href)
webpage = {"principal": {"page":[], "article":[]}, "publications": {"page":[], "article":[]}}
webpage = []
for i in page_url:
section = "publications"
o = urlparse(i)
o = o._replace(scheme=self._protocol)
i = o.geturl().replace(":///", "://")
if i == "{0}/".format(self._url):
section = "principal"
try:
page = self._request.get(i)
if page.status_code == 200:
self._logger.info("{0} : page : {1}".format(self._name, i))
if i not in webpage[section]["page"]:
webpage[section]["page"].append(i)
soup = BeautifulSoup(page.text, self._parser)
class_div = soup.find_all("div", class_="pagingfirstline")
if len(class_div) > 0:
pagingfirstline = class_div[0].find_all("a")
if len(pagingfirstline) > 1:
lastpage = pagingfirstline[len(pagingfirstline)-1].get("href", "/")
self._logger.debug("{0} : Last page {1}".format(self._name, lastpage))
element_lastpage = lastpage.split("/")[len(lastpage.split("/"))-1]
number_page = element_lastpage.split("-")[0].split("p")[1]
number_lastpage = int(number_page) / 10
setPageDivided = int(number_lastpage) / max_thread
if setPageDivided > int(setPageDivided):
setPageDivided = setPageDivided + 1
setPagePart = setPageDivided * (index_thread + 1)
firstPagePart = (setPagePart - setPageDivided)
self._logger.debug("{0} : Total page : {1}".format(self._name,int(number_lastpage)))
self._logger.debug("{0} : First range : {1}".format(self._name, int(firstPagePart)))
self._logger.debug("{0} : Last range : {1}".format(self._name, int(setPagePart)))
for j in range(int(firstPagePart),int(setPagePart)+1):
paging = j * 10
categorie = urlparse(i).path.split("/")
url_paging = "{0}/archives/p{1}-10.html".format(self._url, paging)
if len(categorie) > 2:
url_paging = "{0}/archives/{1}/p{2}-10.html".format(self._url, categorie[2], paging)
self._logger.info("{0} : {1}".format(self._name, url_paging))
if url_paging not in webpage[section]["page"]:
webpage[section]["page"].append(url_paging)
page = self._request.get(url_paging)
if page.status_code == 200:
soup = BeautifulSoup(page.text, self._parser)
h2 = soup.find_all("h2")
self._logger.debug("{0} : {1} H2 : {2}".format(self._name, url_paging, h2))
for title in h2:
self._logger.debug("{0} : {1} a : {2}".format(self._name, url_paging, title.find_all("a")))
href = title.find_all("a")[0].get("href", "/")
if href not in webpage[section]["article"]:
try:
o = urlparse(href)
o = o._replace(scheme="https").geturl()
webpage[section]["article"].append(o)
except Exception as err:
self._logger.error("parsing error : {0}".format(err))
exit(1)
else:
self._logger.error("{0} : web didn't get due status code : {1}".format(self._name, page.status_code))
self._logger.debug("{0} : {1}".format(self._name, page.content))
except ConnectionError as err:
self._logger.error("{0} : Connection error : {1}".format(self._name, err))
exit(1)
except Exception as err:
self._logger.error("{0} : Exception error : {1}".format(self._name, err))
self._logger.error("Connection error : {0}".format(err))
exit(1)
try:
string_webpage = json.dumps(webpage)
open("{0}/{1}.json".format(self._tmp, self._name), "wt").write(string_webpage)
except Exception as ex:
self._logger.error("{0} : Error for writing webpage : {1}".format(self._name, ex))
if page.status_code == 200:
self._logger.info("page : {0}".format(i))
if i not in webpage:
webpage.append(i)
soup = BeautifulSoup(page.text, self._parser)
class_div = pagingfirstline = soup.find_all("div", class_="pagingfirstline")
if len(class_div) > 0:
pagingfirstline = class_div[0].find_all("a")
if len(pagingfirstline) > 1:
lastpage = pagingfirstline[len(pagingfirstline)-1].get("href", "/")
element_lastpage = lastpage.split("/")[len(lastpage.split("/"))-1]
number_page = element_lastpage.split("-")[0].split("p")[1]
number_lastpage = int(number_page) / 10
for j in range(1,int(number_lastpage)):
paging = j * 10
categorie = urlparse(i).path.split("/")
url_paging = "{0}/archives/p{1}-10.html".format(self._url, paging)
if len(categorie) > 2:
url_paging = "{0}/archives/{1}/p{2}-10.html".format(self._url, categorie[2], paging)
self._logger.info(url_paging)
if url_paging not in webpage:
webpage.append(url_paging)
page = self._request.get(url_paging)
if page.status_code == 200:
soup = BeautifulSoup(page.text, self._parser)
h2 = soup.find_all("h2")
for title in h2:
href = title.find_all("a")[0].get("href", "/")
if href not in webpage:
try:
o = urlparse(href)
o = o._replace(scheme="https").geturl()
except Exception as err:
self._logger.error("parsing error : {0}".format(err))
exit(1)
webpage.append(o)
return webpage
# Private method
@ -195,7 +127,7 @@ class WPExport:
makedir.append(i)
repath = "/".join(makedir)
if not os.path.exists(repath):
self._logger.debug("{0} : Dossier crée : {1}".format(self._name, repath))
self._logger.debug("Dossier crée : {0}".format(repath))
try:
if len(repath) > 0:
os.mkdir(repath)
@ -209,53 +141,44 @@ class WPExport:
def _getScriptCss(self, js, css):
try:
page = self._request.get(self._url)
page_url = []
if page.status_code == 200:
soup = BeautifulSoup(page.text, self._parser)
if js is True:
script = soup.find_all("script")
for anchor in script:
src = anchor.get("src", "/")
if src != "/":
except Exception as err:
self._logger.error("Connection error : {0}".format(err))
exit(1)
page_url = []
if page.status_code == 200:
soup = BeautifulSoup(page.text, self._parser)
if js is True:
script = soup.find_all("script")
for anchor in script:
src = anchor.get("src", "/")
if src != "/":
try:
u = urlparse(self._url)
o = urlparse(src)
except Exception as err:
self._logger.error("parsing error : {0}".format(err))
exit(1)
if o.netloc == "":
o = o._replace(netloc=u.netloc)
o = o._replace(scheme=u.scheme)
page_url.append(o.geturl())
if css is True:
link = soup.find_all("link")
for anchor in link:
rel = anchor.get("rel")
if rel[0] == "stylesheet":
href = anchor.get("href", "/")
if href != "/":
try:
u = urlparse(self._url)
o = urlparse(src)
if o.netloc == "":
o = o._replace(netloc=u.netloc)
o = o._replace(scheme=u.scheme)
page_url.append(o.geturl())
o = urlparse(href)
except Exception as err:
self._logger.error("parsing error : {0}".format(err))
exit(1)
if css is True:
link = soup.find_all("link")
for anchor in link:
rel = anchor.get("rel")
if rel[0] == "stylesheet":
href = anchor.get("href", "/")
if href != "/":
try:
u = urlparse(self._url)
o = urlparse(href)
if o.netloc == "":
o = o._replace(netloc=u.netloc)
o = o._replace(scheme=u.scheme)
page_url.append(o.geturl())
except Exception as err:
self._logger.error("parsing error : {0}".format(err))
exit(1)
else:
self._logger.error("JS or CSS did not get due status code : {0}".format(page.status_code))
self._logger.debug(page.content)
except ConnectionError as err:
self._logger.error("Connection error : {0}".format(err))
exit(1)
except Exception as err:
self._logger.error("Exception error : {0}".format(err))
if o.netloc == "":
o = o._replace(netloc=u.netloc)
o = o._replace(scheme=u.scheme)
page_url.append(o.geturl())
return page_url
# Get image
@ -265,26 +188,19 @@ class WPExport:
for i in webpage:
try:
page = self._request.get(i)
if page.status_code == 200:
soup = BeautifulSoup(page.text, self._parser)
img = soup.find_all("img")
self._logger.info("{0} : image from page: {1} : ".format(self._name,i))
for anchor in img:
src = anchor.get("src", "/")
if src != "/":
if src not in page_img:
self._logger.info("{0} : image: {1} : ".format(self._name, src))
page_img.append(src)
else:
self._logger.error("{0} : Image did not get due status code : {1}".format(self._name, page.status_code))
self._logger.debug("{0} : {1}".format(self._name, page.content))
except ConnectionError as err:
self._logger.error("{0} : Connection error : {1}".format(self._name, err))
exit(1)
except Exception as err:
self._logger.error("{0} : Exception error : {1}".format(self._name, err))
self._logger.error("Connection error : {0}".format(err))
exit(1)
if page.status_code == 200:
soup = BeautifulSoup(page.text, self._parser)
img = soup.find_all("img")
self._logger.info("image from page: {0} : ".format(i))
for anchor in img:
src = anchor.get("src", "/")
if src != "/":
if src not in page_img:
self._logger.info("image: {0} : ".format(src))
page_img.append(src)
return page_img
@ -294,33 +210,26 @@ class WPExport:
for i in range(0, len(webpage)):
try:
o = urlparse(webpage[i])
path_web = o.path.split("/")
filePageWeb = path_web[len(path_web)-1]
path_web.pop(len(path_web)-1)
dir_page_web = "/".join(path_web)
self._mkdirPath("{0}/{1}/{2}".format(backup_dir, o.netloc, dir_page_web))
try:
r = self._request.get(webpage[i])
if r.status_code == 200:
fileDownload = "{0}/{1}/index.html".format(backup_dir, o.netloc)
if len(dir_page_web) > 0 and len(filePageWeb) > 0:
fileDownload = "{0}/{1}{2}/{3}".format(backup_dir, o.netloc, dir_page_web, filePageWeb)
self._logger.info("{0} : {1}/{2} : {3}".format(self._name, i+1, len(webpage), fileDownload))
try:
open(fileDownload, "wb").write(r.content)
except Exception as err:
self._logger.error("file error : {0}".format(err))
exit(1)
else:
self._logger.error("Not download due status code : {0}".format(r.status_code))
self._logger.debug(r.content)
except ConnectionError as err:
self._logger.error("{0} : Connection error : {1}".format(self._name, err))
exit(1)
except Exception as err:
self._logger.error("{0} Exception error : {1}".format(self._name, err))
except Exception as err:
self._logger.error("parsing error : {0}".format(err))
exit(1)
path_web = o.path.split("/")
filePageWeb = path_web[len(path_web)-1]
path_web.pop(len(path_web)-1)
dir_page_web = "/".join(path_web)
self._mkdirPath("{0}/{1}/{2}".format(backup_dir, o.netloc, dir_page_web))
try:
r = self._request.get(webpage[i])
except Exception as err:
self._logger.error("Connection error : {0}".format(err))
exit(1)
if r.status_code == 200:
fileDownload = "{0}/{1}/index.html".format(backup_dir, o.netloc)
if len(dir_page_web) > 0 and len(filePageWeb) > 0:
fileDownload = "{0}/{1}{2}/{3}".format(backup_dir, o.netloc, dir_page_web, filePageWeb)
self._logger.info("{0}/{1} : {2}".format(i+1, len(webpage), fileDownload))
try:
open(fileDownload, "wb").write(r.content)
except Exception as err:
self._logger.error("file error : {0}".format(err))
exit(1)

File diff suppressed because it is too large Load Diff

View File

@ -1,394 +0,0 @@
#!/usr/bin/python3
from bs4 import BeautifulSoup
from urllib.parse import urlparse
import requests, os, logging, re, json
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
class WPMenu:
# Constructor
def __init__(self, name="Thread-0", basic=None, canalblog="", wordpress="", logger=None, parser="html.parser", ssl_canalblog=True, ssl_wordpress=True):
self._name = name
self._basic = basic
self._canalblog = canalblog
self._wordpress = wordpress
self._logger = logger
self._parser = parser
self._headers_json = {'Content-Type': 'application/json; charset=utf-8', 'Accept':'application/json'}
self._protocol_wordpress = "https"
self._protocol_canalblog = "https"
self._directory = "backup"
if ssl_wordpress is False:
self._protocol_wordpress = "http"
if ssl_canalblog is False:
self._protocol_canalblog = "http"
self._request_canalblog = requests.Session()
self._request_wordpress = requests.Session()
retries = Retry(connect=10, read=10, redirect=5,
status_forcelist=[429, 500, 502, 503, 504], backoff_factor=2)
self._request_canalblog.mount('{0}://'.format(self._protocol_canalblog), HTTPAdapter(max_retries=retries))
self._request_wordpress.mount('{0}://'.format(self._protocol_wordpress), HTTPAdapter(max_retries=retries))
# Destructor
def __del__(self):
print("{0} : Import finished for {1}".format(self._name, self._wordpress))
# Public method
## From file
def fromFile(self, files):
if os.path.exists(files):
with open(files, 'r') as f:
self._logger.info("{0} : File is being processed : {1}".format(self._name, files))
content = f.read()
self._menu(content)
else:
self._logger.error("{0} : File isn't exist : {1}".format(self._name, files))
## Get from URL
def fromUrl(self, canalblog):
self._canalblog = canalblog
try:
o = urlparse(canalblog)
o = o._replace(scheme=self._protocol_canalblog)
i = o.geturl().replace(":///", "://")
page = self._request_canalblog.get(i)
if page.status_code == 200:
self._logger.info("{0} : Page web is being processed : {1}".format(self._name, i))
self._menu(page.content)
else:
self._logger.error("{0} : index didn't get due status code : {1}".format(self._name, page.status_code))
self._logger.debug("{0} : {1}".format(self._name, page.content))
except ConnectionError as err:
self._logger.error("{0} : Connection error for get url {1} : {2}".format(self._name, canalblog, err))
exit(1)
except Exception as err:
self._logger.error("{0} : Exception error for get url {1} : {2}".format(self._name, canalblog, err))
## replace caracter
def _replaceCaracter(self, title_rendered):
list_replace = {'’': "'", '–': '-', '…': '...', '« ': '"', ' »': '"', '« ': '"', ' »': '"', '’': "'", '"‘': "'"}
for old, new in list_replace.items():
title_rendered = title_rendered.replace(old, new)
return title_rendered
def _getIdfromTitlePost(self, content):
idMenu = {"id":0, "type":"", "link":""}
soup = BeautifulSoup(content, self._parser)
articletitle = soup.find_all("h2", class_="articletitle")
if len(articletitle) > 0:
articletitle = articletitle[0].get_text()
search = "posts"
post_type = "post"
if len(articletitle) == 0:
articletitle = soup.find_all("div", class_="albumbody")
if len(articletitle) > 0:
articletitle = articletitle[0].find("h2").get_text()
search = "pages"
post_type = "page"
exist = False
for index in range(1,10):
if exist is False:
params = {"search":articletitle, "per_page":100, "page":index}
try:
self._logger.debug("{0} : Get Url for {3} : {1} {2}".format(self._name, "{1}://{0}/wp-json/wp/v2/{2}".format(self._wordpress, self._protocol_wordpress, search), params, search))
page = self._request_wordpress.get("{1}://{0}/wp-json/wp/v2/{2}".format(self._wordpress, self._protocol_wordpress, search), auth=self._basic, params=params)
if page.status_code == 200:
result = page.json()
self._logger.info("{0} : Get content {2} : {1}".format(self._name, len(result), search))
if len(result) > 0:
for i in result:
title_rendered = i["title"]["rendered"]
if len(articletitle) != len(title_rendered):
title_rendered = self._replaceCaracter(title_rendered)
self._logger.debug("{0} : comparaison debug {1} {2}".format(self._name, articletitle, title_rendered))
if articletitle == title_rendered:
self._logger.debug("{0} : get {2} id : {1}".format(self._name, i, search))
idMenu = {"id":i["id"], "type":post_type, "link": i["link"]}
exist = True
else:
self._logger.debug("{0} : {2} {1}".format(self._name, result, len(result)))
break
elif page.status_code == 400:
self._logger.debug("{0} : {2} {1}".format(self._name, page.content, page.status_code))
break
else:
self._logger.error("{0} : Post didn't get due status code : {1}".format(self._name, page.status_code))
self._logger.debug("{0} : {1}".format(self._name, page.content))
except ConnectionError as err:
self._logger.error("{0} : Connection error for get content : {1}".format(self._name, err))
exit(1)
except Exception as err:
self._logger.error("{0} : Exception error for get content : {1} ".format(self._name, err))
return idMenu
def _getIdFromPost(self, href):
idMenu = {"id":0, "type":"", "link":""}
o = urlparse(href)
if len(o.netloc) > 0:
try:
page = self._request_canalblog.get(href)
if page.status_code == 200:
self._logger.info("{0} : Get content : {1}".format(self._name, href))
idMenu = self._getIdfromTitlePost(page.content)
else:
self._logger.error("{0} : {2} didn't get due status code : {1}".format(self._name, page.status_code, href))
self._logger.debug("{0} : {1}".format(self._name, page.content))
except ConnectionError as err:
self._logger.error("{0} : Connection error for get url {1} : {2}".format(self._name, href, err))
exit(1)
except Exception as err:
self._logger.error("{0} : Exception error for get url {1} : {2}".format(self._name, href, err))
else:
if os.path.exists("{0}/..{1}".format(self._directory, o.path)):
try:
content = open("{0}/..{1}".format(self._directory, o.path), "r")
idMenu = self._getIdfromTitlePost(content)
except Exception as err:
self._logger.error("{0} : Exception error for get file content {1} : {2}".format(self._name, href, err))
return idMenu
def _getIdFromReverse(self, title, href):
self._logger.info("{0} : get title {1} from href {2}".format(self._name, title, href))
idMenu = {"id":0, "type":"", "link":""}
if href != "#":
title = href[::-1]
second_title = title.split("/")[2]
second_title = second_title[::-1]
link = title.split("/")[0]
link = link[::-1]
title = title.split("/")[1]
title = title[::-1]
self._logger.info("{0} link {1} title {2}".format(self._name, link, title))
if link == "index.html":
if second_title == "albums":
idMenu = self._getIdFromPost(href)
else:
idMenu = self._getId(title)
else:
idMenu = self._getIdFromPost(href)
return idMenu
def _getId(self, title):
idMenu = {"id": 0, "type":"", "link":""}
exist = False
if exist is False:
for i in ["categories", "tags"]:
typeId = "category"
if i == "tags":
typeId = "tag"
for index in range(1,10):
try:
params = {"search":title, "per_page":"100", "page":index}
self._logger.info("{0} Get menu {1} {2} {3}".format(self._name, "{2}://{0}/wp-json/wp/v2/{1}".format(self._wordpress, i, self._protocol_wordpress), index, title))
page = self._request_wordpress.get("{2}://{0}/wp-json/wp/v2/{1}".format(self._wordpress, i, self._protocol_wordpress), auth=self._basic, params=params)
if page.status_code == 200:
result = page.json()
if len(result) > 0:
for j in result:
self._logger.info("{0} info : {1} {2} {3}".format(self._name, j["name"], j["slug"], title))
if j["name"] == title or j["slug"] == title:
self._logger.info("{0} : comparaison ok : {1} {2}".format(self._name, j["id"], i))
idMenu = {"id": j["id"], "type": typeId, "link": j["link"]}
exist = True
else:
break
elif page.status_code == 400:
break
else:
self._logger.error("{0} : {2} didn't get due status code : {1}".format(self._name, page.status_code, i))
self._logger.debug("{0} : {1}".format(self._name, page.content))
except ConnectionError as err:
self._logger.error("{0} : Connection error for get url {1} : {2}".format(self._name, "{2}://{0}/wp-json/wp/v2/{1}".format(self._wordpress, i, self._protocol_wordpress), err))
exit(1)
except Exception as err:
self._logger.error("{0} : Exception error for get url {1} : {2}".format(self._name, "{2}://{0}/wp-json/wp/v2/{1}".format(self._wordpress, i, self._protocol_wordpress), err))
return idMenu
def _menu(self, content):
soup = BeautifulSoup(content, self._parser)
ul = soup.find("ul", id="listsmooth")
menu = list()
children = list()
for anchor in ul.find_all("li"):
parent = anchor.find("a").get_text().replace(" \xa0", "")
href = anchor.find("a").get("href")
if href == "{0}://{1}/".format(self._protocol_canalblog, self._canalblog):
parent = "home"
itemMenu = {"id":"", "type":"", "title": parent, "link":"", "href":href, "children":list()}
if href == "#":
li = anchor.find("ul").find_all("li")
for child in li:
a = child.find("a")
href = a.get("href")
self._logger.info("{0} Parent {1} : Child {2}".format(self._name, parent, a.get_text()))
children.append({"title": a.get_text(), "parent": parent, "href":href, "link":""})
menu.append(itemMenu)
for i in range(0, len(children)):
self._logger.info("{0} : Child {1} {2}".format(self._name, children[i], i))
for j in range(0, len(menu)):
if j < len(menu):
if menu[j]["title"] == children[i]["title"]:
self._logger.info("{0} : Parent {1} {2}".format(self._name, menu[j], j))
del menu[j]
for j in range(0, len(menu)):
self._logger.info("{0} : Children for : {1}".format(self._name, menu[j]["title"]))
if menu[j]["title"] == children[i]["parent"]:
menu[j]["children"].append({"id":"", "type":"", "title":children[i]["title"], "parent": children[i]["parent"], "link":"", "href":children[i]["href"]})
for i in range(0, len(menu)):
self._logger.info("{0} : Menu {1} {2}".format(self._name, menu[i]["title"], len(menu[i]["children"])))
if menu[i]["title"] != "home":
for j in range(0, len(menu[i]["children"])):
idMenu = self._getId(menu[i]["children"][j]["title"])
if idMenu["id"] == 0:
self._logger.debug("{0} : content children {1}".format(self._name, menu[i]["children"][j]))
idMenu = self._getIdFromReverse(menu[i]["children"][j]["title"], menu[i]["children"][j]["href"])
if idMenu["id"] != 0:
menu[i]["children"][j] = {"id":idMenu["id"], "type": idMenu["type"], "link": idMenu["link"], "title": menu[i]["children"][j]["title"], "parent": menu[i]["children"][j]["parent"]}
idMenu = self._getId(menu[i]["title"])
self._logger.debug("{0} : content parent {1}".format(self._name, menu[i]))
self._logger.debug("{0} : content idMenu {1}".format(self._name, idMenu))
if idMenu["id"] == 0:
idMenu = self._getIdFromReverse(menu[i]["title"], menu[i]["href"])
if idMenu["id"] != 0:
menu[i] = {"id":idMenu["id"], "type": idMenu["type"], "title":menu[i]["title"], "link":idMenu["link"], "children": menu[i]["children"]}
self._createMenu(menu)
def _createItemMenu(self, idMenu, itemMenu, parent):
idItemMenu = 0
self._logger.info("{0} : Create item menu from API Wordpress : {1}".format(self._name, self._wordpress))
try:
params = {"search": itemMenu["title"], "menus": idMenu}
page = self._request_wordpress.get("{1}://{0}/wp-json/wp/v2/menu-items".format(self._wordpress, self._protocol_wordpress), auth=self._basic, params=params)
if page.status_code == 200:
result = page.json()
for i in result:
if self._replaceCaracter(i["title"]["rendered"]) == itemMenu["title"]:
idItemMenu = int(i["id"])
self._logger.info("{0} : Length of result for item menus : {1}".format(self._name, len(result)))
url = "{1}://{0}/wp-json/wp/v2/menu-items".format(self._wordpress, self._protocol_wordpress)
if idItemMenu != 0:
url = "{1}://{0}/wp-json/wp/v2/menu-items/{2}".format(self._wordpress, self._protocol_wordpress, idItemMenu)
try:
objectt = itemMenu["type"]
if objectt == "tag":
objectt = "post_tag"
data = {"title": itemMenu["title"], "status": "publish", "parent":parent, "menus":idMenu, "url":"#"}
if itemMenu["title"] == "home":
data = {"title": itemMenu["title"], "status": "publish", "parent":parent, "menus":idMenu, "url":"{0}://{1}".format(self._protocol_wordpress, self._wordpress)}
if type(itemMenu["id"]) is str:
if len(itemMenu["id"]) > 0:
data = {"title": itemMenu["title"], "status": "publish", "parent":parent, "url": itemMenu["link"], "menus":idMenu, "object":objectt, "object_id":int(itemMenu["id"])}
elif type(itemMenu["id"]) is int:
data = {"title": itemMenu["title"], "status": "publish", "parent":parent, "url": itemMenu["link"], "menus":idMenu, "object":objectt, "object_id":itemMenu["id"]}
self._logger.debug("{0} : data for create/update : {1}".format(self._name, data))
page = self._request_wordpress.post(url, auth=self._basic, headers=self._headers_json, data=json.dumps(data))
if page.status_code in [201, 200]:
result = page.json()
idItemMenu = int(result["id"])
self._logger.info("{0} : create/update item menu : {1}".format(self._name, itemMenu["title"]))
else:
self._logger.error("{0} : Create menu items for {2} didn't get due status code : {1}".format(self._name, page.status_code, itemMenu["title"]))
self._logger.debug("{0} : {1} {2}".format(self._name, page.content, itemMenu))
except ConnectionError as err:
self._logger.error("{0} : Connection error for create item menu {1} : {2}".format(self._name, "{1}://{0}/wp-json/wp/v2/menu-items".format(self._wordpress, self._protocol_wordpress), err))
exit(1)
except Exception as err:
self._logger.error("{0} : Exception error for create item menu {1} : {2}".format(self._name, "{1}://{0}/wp-json/wp/v2/menu-items".format(self._wordpress, self._protocol_wordpress), err))
else:
self._logger.error("{0} : Get menu items didn't get due status code : {1}".format(self._name, page.status_code))
self._logger.debug("{0} : {1}".format(self._name, page.content))
except ConnectionError as err:
self._logger.error("{0} : Connection error for get item menus {1} : {2}".format(self._name, "{1}://{0}/wp-json/wp/v2/menu-items".format(self._wordpress, self._protocol_wordpress), err))
exit(1)
except Exception as err:
self._logger.error("{0} : Exception error for get item menus {1} : {2}".format(self._name, "{1}://{0}/wp-json/wp/v2/menu-items".format(self._wordpress, self._protocol_wordpress), err))
return idItemMenu
def _createMenu(self, menu):
title = "Menu {0}".format(self._wordpress)
self._logger.info("{0} : Create menu from API Wordpress : {1}".format(self._name, title))
try:
params = {"search": title}
page = self._request_wordpress.get("{1}://{0}/wp-json/wp/v2/menus".format(self._wordpress, self._protocol_wordpress), auth=self._basic, params=params)
if page.status_code == 200:
result = page.json()
self._logger.info("{0} : Get content menus : {1}".format(self._name, len(result)))
idMenu = 0
if len(result) == 0:
self._logger.info("{0} : Create menu : {1}".format(self._name, title))
data = {"name": title}
try:
page = self._request_wordpress.post("{1}://{0}/wp-json/wp/v2/menus".format(self._wordpress, self._protocol_wordpress), auth=self._basic, headers=self._headers_json, data=json.dumps(data))
if page.status_code == 201:
result = page.json()
self._logger.debug("{0} : Get menus : {1}".format(self._name, result))
if len(result) > 0:
idMenu = result["id"]
else:
self._logger.error("{0} : Post menu didn't get due status code : {1}".format(self._name, page.status_code))
self._logger.debug("{0} : {1}".format(self._name, page.content))
except ConnectionError as err:
self._logger.error("{0} : Connection error for create menu {1} : {2}".format(self._name, "{1}://{0}/wp-json/wp/v2/menus".format(self._wordpress, self._protocol_wordpress), err))
exit(1)
except Exception as err:
self._logger.error("{0} : Exception error for get menu {1} : {2}".format(self._name, "{1}://{0}/wp-json/wp/v2/menus".format(self._wordpress, self._protocol_wordpress), err))
else:
self._logger.debug("{0} : Get menus : {1}".format(self._name, result))
for i in result:
self._logger.debug("{0} : List menus : {1}".format(self._name, i))
if i["name"] == title:
idMenu = i["id"]
self._logger.info("{0} : Get ID menus : {1}".format(self._name, idMenu))
self._addItemMenu(menu, idMenu)
else:
self._logger.error("{0} : Get menu didn't get due status code : {1}".format(self._name, page.status_code))
self._logger.debug("{0} : {1}".format(self._name, page.content))
except ConnectionError as err:
self._logger.error("{0} : Connection error for get menu {1} : {2}".format(self._name, "{1}://{0}/wp-json/wp/v2/menus".format(self._wordpress, self._protocol_wordpress), err))
exit(1)
except Exception as err:
self._logger.error("{0} : Exception error for get menu {1} : {2}".format(self._name, "{1}://{0}/wp-json/wp/v2/menus".format(self._wordpress, self._protocol_wordpress), err))
def _addItemMenu(self, menu, idMenu):
self._logger.info("{0} : add item to menu : {1}".format(self._name, idMenu))
parent = 0
for i in menu:
parent = 0
self._logger.debug("{0} : debug create item menu : {1}".format(self._name, i))
parent = self._createItemMenu(idMenu, i, parent)
for j in i["children"]:
self._createItemMenu(idMenu, j, parent)

View File

@ -1,128 +0,0 @@
#!/usr/bin/python3
from bs4 import BeautifulSoup
from urllib.parse import urlparse
import requests, os, logging, re, json
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
class WPRemove:
# Constructor
def __init__(self, index_name=1, number_thread=1, basic=None, wordpress="", logger=None, ssl_wordpress=True):
self._basic = basic
self._wordpress = wordpress
self._logger = logger
self._headers_json = {'Content-Type': 'application/json', 'Accept':'application/json'}
self._name = "Thread-{0}".format(index_name)
self._index_thread = index_name
self._protocol = "https"
self._number_thread = number_thread
if ssl_wordpress is False:
self._protocol = "http"
self._request = requests.Session()
retries = Retry(connect=10, read=10, redirect=5,
status_forcelist=[429, 500, 502, 503, 504], backoff_factor=2)
self._request.mount('{0}://'.format(self._protocol), HTTPAdapter(max_retries=retries))
# Destructor
def __del__(self):
print("{0} : Import finished for {1}".format(self._name, self._wordpress))
# Public method
def _getCount(self, composant):
count = 0
try:
params = {"per_page":1}
self._logger.info("{0} : Get count {2} to remove for url : {1}".format(self._name, self._wordpress, composant))
r = self._request.get("{2}://{0}/wp-json/wp/v2/{1}".format(self._wordpress, composant, self._protocol), params=params, auth=self._basic, headers=self._headers_json)
if r.status_code == 200:
count = int(r.headers["X-WP-Total"])
else:
self._logger.error("{0} : Error for list to remove {1} due status code {2}".format(self._name, composant, r.status_code))
self._logger.debug("{0} : Content error for {1} : {2}".format(self._name, composant, r.content))
except ConnectionError as err:
self._logger.error("{0} : Connection error for list {1} to remove : {2}".format(self._name, composant, err))
exit(1)
except Exception as err:
self._logger.error("{0} : Exception error for list {1} to remove : {2}".format(self._name, composant, err))
return count
def setUrl(self, wordpress):
self._wordpress = wordpress
def cleanPosts(self):
self._removeAll("posts")
def cleanTags(self):
self._removeAll("tags")
def cleanCategories(self):
self._removeAll("categories")
def cleanMedia(self):
self._removeAll("media")
# Private method
def _removeAll(self, composant):
count = self._getCount(composant)
self._logger.debug("{0} : Count for {1} : {2}".format(self._name, composant, count))
if count > 0:
self._logger.debug("{0} : Number thread for {1} : {2}".format(self._name, composant, self._number_thread))
page = count / int(self._number_thread)
self._logger.debug("{0} : Page for {1} : {2}".format(self._name, composant, page))
if page > int(page):
page = int(page) + 1
if page > 100:
page = 100
params = {"per_page":page, "page":self._index_thread}
self._logger.info("{0} : Params for {1} : {2}".format(self._name, composant, params))
try:
self._logger.info("{0} : List {2} to remove for url : {1}".format(self._name, self._wordpress, composant))
r = self._request.get("{2}://{0}/wp-json/wp/v2/{1}".format(self._wordpress, composant, self._protocol), auth=self._basic, params=params, headers=self._headers_json)
if r.status_code == 200:
result = r.json()
if len(result) > 0:
for i in result:
is_delete = True
self._logger.info(i["slug"])
if i["slug"] == "non-classe":
is_delete = False
if is_delete is True:
if composant == "tags" or composant == "categories":
title = i["name"]
else:
title = i["title"]["rendered"]
self._logger.info("{0} : Remove {2} for url {1} : {3}".format(self._name, self._wordpress, composant, title))
params = {"force":1}
try:
r = self._request.delete("{3}://{0}/wp-json/wp/v2/{1}/{2}".format(self._wordpress, composant, i["id"], self._protocol), auth=self._basic, headers=self._headers_json , params=params)
if r.status_code == 200:
self._logger.info("{0} : Post removed for URL {1} {2} : {3}".format(self._name, self._wordpress, composant, title))
else:
self._logger.error("{0} : Connection error for post {1} {2} {3} with status code {4}".format(self._name, self._wordpress, composant, title, r.status_code))
except ConnectionError as err:
self._logger.error("{0} : Connection error for {1} remove : {2}".format(self._name, composant, err))
exit(1)
except Exception as err:
self._logger.error("{0} : Exception error for {1} remove : {2}".format(self._name, composant, err))
self._removeAll(composant)
if r.status_code == 400:
self._logger.error("{0} : No content for {1} to remove : {2}".format(self._name, composant, r.status_code))
else:
self._logger.error("{0} : Error for list to remove {1} due status code {2}".format(self._name, composant, r.status_code))
self._logger.debug("{0} : Content error for {1} : {2}".format(self._name, composant, r.content))
except ConnectionError as err:
self._logger.error("{0} : Connection error for list {1} to remove : {2}".format(self._name, composant, err))
exit(1)
except Exception as err:
self._logger.error("{0} : Exception error for list {1} to remove : {2}".format(self._name, composant, err))