Merge pull request 'menu-api' (#22) from menu-api into master

Reviewed-on: #22
This commit is contained in:
v4l3n71n 2023-08-04 21:22:38 +00:00
commit 02f0c20bd0
2 changed files with 406 additions and 2 deletions

View File

@ -10,6 +10,7 @@ from lib.WPImport import WPimport
from lib.WPExport import WPExport from lib.WPExport import WPExport
from lib.WPRemove import WPRemove from lib.WPRemove import WPRemove
from lib.WPChange import WPChange from lib.WPChange import WPChange
from lib.WPMenu import WPMenu
def errorRevert(logger, revert, tmp): def errorRevert(logger, revert, tmp):
if revert is True: if revert is True:
@ -203,6 +204,12 @@ if __name__ == '__main__':
default="", default="",
help="File") help="File")
menu_parser = subparsers.add_parser("menu")
menu_parser.add_argument("--user", help="wordpress user", required=True)
menu_parser.add_argument("--password", help="password wordpress's user", default="")
menu_parser.add_argument("--file", help="HTML file", default="")
menu_parser.add_argument("--canalblog", help="URL Canalblog", default="")
menu_parser.add_argument("--wordpress", help="URL Wordpress", required=True)
args = parser.parse_args() args = parser.parse_args()
@ -240,7 +247,7 @@ if __name__ == '__main__':
os.makedirs(args.tmp, exist_ok=True) os.makedirs(args.tmp, exist_ok=True)
if args.command == "import" or args.command == "remove": if args.command == "import" or args.command == "remove" or args.command == "menu":
password = args.password password = args.password
if len(args.password) == 0: if len(args.password) == 0:
password = getpass() password = getpass()
@ -256,6 +263,8 @@ if __name__ == '__main__':
for i in wordpress: for i in wordpress:
importWp.setUrl(i) importWp.setUrl(i)
importWp.fromFile(files=args.file.split(",")) importWp.fromFile(files=args.file.split(","))
menuWp = WPMenu(name="Thread-1", basic=basic, wordpress=args.wordpress, logger=logger, parser=args.parser, ssl_canalblog=ssl_canalblog, ssl_wordpress=ssl_wordpress)
menuWp.fromFile("{0}/index.html".format(args.file.split(",")[0]))
if len(args.directory) > 0: if len(args.directory) > 0:
try: try:
with futures.ThreadPoolExecutor(max_workers=int(args.parallel)) as ex: with futures.ThreadPoolExecutor(max_workers=int(args.parallel)) as ex:
@ -266,6 +275,9 @@ if __name__ == '__main__':
ex.submit(importDirectory, i, int(args.parallel), args.directory, logger, args.parser, args.wordpress, basic, args.serial, ssl_wordpress, args.create, args.update, args.image, args.revert) ex.submit(importDirectory, i, int(args.parallel), args.directory, logger, args.parser, args.wordpress, basic, args.serial, ssl_wordpress, args.create, args.update, args.image, args.revert)
for i in range(0, int(args.parallel)) for i in range(0, int(args.parallel))
] ]
wait(wait_for, return_when=ALL_COMPLETED)
menuWp = WPMenu(name="Thread-1", basic=basic, wordpress=args.wordpress, logger=logger, parser=args.parser, ssl_canalblog=ssl_canalblog, ssl_wordpress=ssl_wordpress)
menuWp.fromFile("{0}/index.html".format(args.directory))
except Exception as err: except Exception as err:
logger.error("Threading error : {0}".format(err)) logger.error("Threading error : {0}".format(err))
if len(args.canalblog) > 0: if len(args.canalblog) > 0:
@ -278,6 +290,10 @@ if __name__ == '__main__':
ex.submit(importUrl, i, int(args.parallel), args.canalblog, logger, args.parser, args.wordpress, basic, args.serial, ssl_wordpress, ssl_canalblog, args.create, args.update, args.image, args.revert, args.tmp) ex.submit(importUrl, i, int(args.parallel), args.canalblog, logger, args.parser, args.wordpress, basic, args.serial, ssl_wordpress, ssl_canalblog, args.create, args.update, args.image, args.revert, args.tmp)
for i in range(0, int(args.parallel)) for i in range(0, int(args.parallel))
] ]
wait(wait_for, return_when=ALL_COMPLETED)
menuWp = WPMenu(name="Thread-1", basic=basic, wordpress=args.wordpress, logger=logger, parser=args.parser, ssl_canalblog=ssl_canalblog, ssl_wordpress=ssl_wordpress)
menuWp.fromUrl(args.canalblog)
except Exception as err: except Exception as err:
@ -342,3 +358,11 @@ if __name__ == '__main__':
for filei in args.file.split(","): for filei in args.file.split(","):
changeWp.fromFile(filei) changeWp.fromFile(filei)
exit(0) exit(0)
if args.command == "menu":
menuWp = WPMenu(name="Thread-1", basic=basic, wordpress=args.wordpress, logger=logger, parser=args.parser, ssl_canalblog=ssl_canalblog, ssl_wordpress=ssl_wordpress)
if len(args.file) > 0:
menuWp.fromFile(args.file)
if len(args.canalblog) > 0:
menuWp.fromUrl(args.canalblog)
exit(0)

380
lib/WPMenu.py Normal file
View File

@ -0,0 +1,380 @@
#!/usr/bin/python3
from bs4 import BeautifulSoup
from urllib.parse import urlparse
import requests, os, logging, re, json
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
class WPMenu:
# Constructor
def __init__(self, name="Thread-0", basic=None, canalblog="", wordpress="", logger=None, parser="html.parser", ssl_canalblog=True, ssl_wordpress=True):
self._name = name
self._basic = basic
self._canalblog = canalblog
self._wordpress = wordpress
self._logger = logger
self._parser = parser
self._headers_json = {'Content-Type': 'application/json; charset=utf-8', 'Accept':'application/json'}
self._protocol_wordpress = "https"
self._protocol_canalblog = "https"
self._directory = "backup"
if ssl_wordpress is False:
self._protocol_wordpress = "http"
if ssl_canalblog is False:
self._protocol_canalblog = "http"
self._request_canalblog = requests.Session()
self._request_wordpress = requests.Session()
retries = Retry(connect=10, read=10, redirect=5,
status_forcelist=[429, 500, 502, 503, 504], backoff_factor=2)
self._request_canalblog.mount('{0}://'.format(self._protocol_canalblog), HTTPAdapter(max_retries=retries))
self._request_wordpress.mount('{0}://'.format(self._protocol_wordpress), HTTPAdapter(max_retries=retries))
# Destructor
def __del__(self):
print("{0} : Import finished for {1}".format(self._name, self._wordpress))
# Public method
## From file
def fromFile(self, files):
if os.path.exists(files):
with open(files, 'r') as f:
self._logger.info("{0} : File is being processed : {1}".format(self._name, files))
content = f.read()
self._menu(content)
else:
self._logger.error("{0} : File isn't exist : {1}".format(self._name, files))
## Get from URL
def fromUrl(self, canalblog):
self._canalblog = canalblog
try:
o = urlparse(canalblog)
o = o._replace(scheme=self._protocol_canalblog)
i = o.geturl().replace(":///", "://")
page = self._request_canalblog.get(i)
if page.status_code == 200:
self._logger.info("{0} : Page web is being processed : {1}".format(self._name, i))
self._menu(page.content)
else:
self._logger.error("{0} : index didn't get due status code : {1}".format(self._name, page.status_code))
self._logger.debug("{0} : {1}".format(self._name, page.content))
except ConnectionError as err:
self._logger.error("{0} : Connection error for get url {1} : {2}".format(self._name, canalblog, err))
exit(1)
except Exception as err:
self._logger.error("{0} : Exception error for get url {1} : {2}".format(self._name, canalblog, err))
## replace caracter
def _replaceCaracter(self, title_rendered):
list_replace = {'’': "'", '–': '-', '…': '...', '« ': '"', ' »': '"', '« ': '"', ' »': '"', '’': "'", '"‘': "'"}
for old, new in list_replace.items():
title_rendered = title_rendered.replace(old, new)
return title_rendered
def _getIdfromTitlePost(self, content):
idMenu = {"id":0, "type":"", "link":""}
soup = BeautifulSoup(content, self._parser)
articletitle = soup.find("h2", class_="articletitle").get_text()
if len(articletitle) > 0:
articletitle = soup.find("h2").get_text()
exist = False
for index in range(1,10):
if exist is False:
params = {"search":articletitle, "per_page":100, "page":index}
try:
self._logger.debug("{0} : Get Url for post : {1} {2}".format(self._name, "{1}://{0}/wp-json/wp/v2/posts".format(self._wordpress, self._protocol_wordpress), params))
page = self._request_wordpress.get("{1}://{0}/wp-json/wp/v2/posts".format(self._wordpress, self._protocol_wordpress), auth=self._basic, params=params)
if page.status_code == 200:
result = page.json()
self._logger.info("{0} : Get content post : {1}".format(self._name, len(result)))
if len(result) > 0:
for i in result:
title_rendered = i["title"]["rendered"]
if len(articletitle) != len(title_rendered):
title_rendered = self._replaceCaracter(title_rendered)
self._logger.debug("{0} : comparaison debug {1} {2}".format(self._name, articletitle, title_rendered))
if articletitle == title_rendered:
self._logger.debug("{0} : get post id : {1}".format(self._name, i))
idMenu = {"id":i["id"], "type":"post", "link": i["link"]}
exist = True
else:
self._logger.debug("{0} : {2} {1}".format(self._name, result, len(result)))
break
elif page.status_code == 400:
self._logger.debug("{0} : {2} {1}".format(self._name, page.content, page.status_code))
break
else:
self._logger.error("{0} : Post didn't get due status code : {1}".format(self._name, page.status_code))
self._logger.debug("{0} : {1}".format(self._name, page.content))
except ConnectionError as err:
self._logger.error("{0} : Connection error for get content : {1}".format(self._name, err))
exit(1)
except Exception as err:
self._logger.error("{0} : Exception error for get content : {1} ".format(self._name, err))
return idMenu
def _getIdFromPost(self, href):
idMenu = {"id":0, "type":"", "link":""}
o = urlparse(href)
if len(o.netloc) > 0:
try:
page = self._request_canalblog.get(href)
if page.status_code == 200:
self._logger.info("{0} : Get content : {1}".format(self._name, href))
idMenu = self._getIdfromTitlePost(page.content)
else:
self._logger.error("{0} : {2} didn't get due status code : {1}".format(self._name, page.status_code, href))
self._logger.debug("{0} : {1}".format(self._name, page.content))
except ConnectionError as err:
self._logger.error("{0} : Connection error for get url {1} : {2}".format(self._name, href, err))
exit(1)
except Exception as err:
self._logger.error("{0} : Exception error for get url {1} : {2}".format(self._name, href, err))
else:
if os.path.exists("{0}/..{1}".format(self._directory, o.path)):
try:
content = open("{0}/..{1}".format(self._directory, o.path), "r")
idMenu = self._getIdfromTitlePost(content)
except Exception as err:
self._logger.error("{0} : Exception error for get file content {1} : {2}".format(self._name, href, err))
return idMenu
def _getIdFromReverse(self, title, href):
self._logger.info("{0} : get title {1} from href {2}".format(self._name, title, href))
idMenu = {"id":0, "type":"", "link":""}
if href != "#":
title = href[::-1]
link = title.split("/")[0]
link = link[::-1]
title = title.split("/")[1]
title = title[::-1]
self._logger.info("{0} link {1} title {2}".format(self._name, link, title))
if link == "index.html":
idMenu = self._getId(title)
else:
idMenu = self._getIdFromPost(href)
return idMenu
def _getId(self, title):
idMenu = {"id": 0, "type":"", "link":""}
exist = False
if exist is False:
for i in ["categories", "tags"]:
typeId = "category"
if i == "tags":
typeId = "tag"
for index in range(1,10):
try:
params = {"search":title, "per_page":"100", "page":index}
self._logger.info("{0} Get menu {1} {2} {3}".format(self._name, "{2}://{0}/wp-json/wp/v2/{1}".format(self._wordpress, i, self._protocol_wordpress), index, title))
page = self._request_wordpress.get("{2}://{0}/wp-json/wp/v2/{1}".format(self._wordpress, i, self._protocol_wordpress), auth=self._basic, params=params)
if page.status_code == 200:
result = page.json()
if len(result) > 0:
for j in result:
self._logger.info("{0} info : {1} {2} {3}".format(self._name, j["name"], j["slug"], title))
if j["name"] == title or j["slug"] == title:
self._logger.info("{0} : comparaison ok : {1} {2}".format(self._name, j["id"], i))
idMenu = {"id": j["id"], "type": typeId, "link": j["link"]}
exist = True
else:
break
elif page.status_code == 400:
break
else:
self._logger.error("{0} : {2} didn't get due status code : {1}".format(self._name, page.status_code, i))
self._logger.debug("{0} : {1}".format(self._name, page.content))
except ConnectionError as err:
self._logger.error("{0} : Connection error for get url {1} : {2}".format(self._name, "{2}://{0}/wp-json/wp/v2/{1}".format(self._wordpress, i, self._protocol_wordpress), err))
exit(1)
except Exception as err:
self._logger.error("{0} : Exception error for get url {1} : {2}".format(self._name, "{2}://{0}/wp-json/wp/v2/{1}".format(self._wordpress, i, self._protocol_wordpress), err))
return idMenu
def _menu(self, content):
soup = BeautifulSoup(content, self._parser)
ul = soup.find("ul", id="listsmooth")
menu = list()
children = list()
for anchor in ul.find_all("li"):
parent = anchor.find("a").get_text().replace(" \xa0", "")
href = anchor.find("a").get("href")
if href == "{0}://{1}/".format(self._protocol_canalblog, self._canalblog):
parent = "home"
itemMenu = {"id":"", "type":"", "title": parent, "link":"", "href":href, "children":list()}
if href == "#":
li = anchor.find("ul").find_all("li")
for child in li:
a = child.find("a")
href = a.get("href")
self._logger.info("{0} Parent {1} : Child {2}".format(self._name, parent, a.get_text()))
children.append({"title": a.get_text(), "parent": parent, "href":href, "link":""})
menu.append(itemMenu)
for i in range(0, len(children)):
self._logger.info("{0} : Child {1} {2}".format(self._name, children[i], i))
for j in range(0, len(menu)):
if j < len(menu):
if menu[j]["title"] == children[i]["title"]:
self._logger.info("{0} : Parent {1} {2}".format(self._name, menu[j], j))
del menu[j]
for j in range(0, len(menu)):
self._logger.info("{0} : Children for : {1}".format(self._name, menu[j]["title"]))
if menu[j]["title"] == children[i]["parent"]:
menu[j]["children"].append({"id":"", "type":"", "title":children[i]["title"], "parent": children[i]["parent"], "link":"", "href":children[i]["href"]})
for i in range(0, len(menu)):
self._logger.info("{0} : Menu {1} {2}".format(self._name, menu[i]["title"], len(menu[i]["children"])))
if menu[i]["title"] != "home":
for j in range(0, len(menu[i]["children"])):
idMenu = self._getId(menu[i]["children"][j]["title"])
if idMenu["id"] == 0:
self._logger.debug("{0} : content children {1}".format(self._name, menu[i]["children"][j]))
idMenu = self._getIdFromReverse(menu[i]["children"][j]["title"], menu[i]["children"][j]["href"])
if idMenu["id"] != 0:
menu[i]["children"][j] = {"id":idMenu["id"], "type": idMenu["type"], "link": idMenu["link"], "title": menu[i]["children"][j]["title"], "parent": menu[i]["children"][j]["parent"]}
idMenu = self._getId(menu[i]["title"])
self._logger.debug("{0} : content parent {1}".format(self._name, menu[i]))
self._logger.debug("{0} : content idMenu {1}".format(self._name, idMenu))
if idMenu["id"] == 0:
idMenu = self._getIdFromReverse(menu[i]["title"], menu[i]["href"])
if idMenu["id"] != 0:
menu[i] = {"id":idMenu["id"], "type": idMenu["type"], "title":menu[i]["title"], "link":idMenu["link"], "children": menu[i]["children"]}
self._createMenu(menu)
def _createItemMenu(self, idMenu, itemMenu, parent):
idItemMenu = 0
self._logger.info("{0} : Create item menu from API Wordpress : {1}".format(self._name, self._wordpress))
try:
params = {"search": itemMenu["title"], "menus": idMenu}
page = self._request_wordpress.get("{1}://{0}/wp-json/wp/v2/menu-items".format(self._wordpress, self._protocol_wordpress), auth=self._basic, params=params)
if page.status_code == 200:
result = page.json()
for i in result:
if self._replaceCaracter(i["title"]["rendered"]) == itemMenu["title"]:
idItemMenu = int(i["id"])
self._logger.info("{0} : Length of result for item menus : {1}".format(self._name, len(result)))
url = "{1}://{0}/wp-json/wp/v2/menu-items".format(self._wordpress, self._protocol_wordpress)
if idItemMenu != 0:
url = "{1}://{0}/wp-json/wp/v2/menu-items/{2}".format(self._wordpress, self._protocol_wordpress, idItemMenu)
try:
objectt = itemMenu["type"]
if objectt == "tag":
objectt = "post_tag"
data = {"title": itemMenu["title"], "status": "publish", "parent":parent, "menus":idMenu, "url":"#"}
if itemMenu["title"] == "home":
data = {"title": itemMenu["title"], "status": "publish", "parent":parent, "menus":idMenu, "url":"{0}://{1}".format(self._protocol_wordpress, self._wordpress)}
if type(itemMenu["id"]) is str:
if len(itemMenu["id"]) > 0:
data = {"title": itemMenu["title"], "status": "publish", "parent":parent, "url": itemMenu["link"], "menus":idMenu, "object":objectt, "object_id":int(itemMenu["id"])}
elif type(itemMenu["id"]) is int:
data = {"title": itemMenu["title"], "status": "publish", "parent":parent, "url": itemMenu["link"], "menus":idMenu, "object":objectt, "object_id":itemMenu["id"]}
self._logger.debug("{0} : data for create/update : {1}".format(self._name, data))
page = self._request_wordpress.post(url, auth=self._basic, headers=self._headers_json, data=json.dumps(data))
if page.status_code in [201, 200]:
result = page.json()
idItemMenu = int(result["id"])
self._logger.info("{0} : create/update item menu : {1}".format(self._name, itemMenu["title"]))
else:
self._logger.error("{0} : Create menu items for {2} didn't get due status code : {1}".format(self._name, page.status_code, itemMenu["title"]))
self._logger.debug("{0} : {1} {2}".format(self._name, page.content, itemMenu))
except ConnectionError as err:
self._logger.error("{0} : Connection error for create item menu {1} : {2}".format(self._name, "{1}://{0}/wp-json/wp/v2/menu-items".format(self._wordpress, self._protocol_wordpress), err))
exit(1)
except Exception as err:
self._logger.error("{0} : Exception error for create item menu {1} : {2}".format(self._name, "{1}://{0}/wp-json/wp/v2/menu-items".format(self._wordpress, self._protocol_wordpress), err))
else:
self._logger.error("{0} : Get menu items didn't get due status code : {1}".format(self._name, page.status_code))
self._logger.debug("{0} : {1}".format(self._name, page.content))
except ConnectionError as err:
self._logger.error("{0} : Connection error for get item menus {1} : {2}".format(self._name, "{1}://{0}/wp-json/wp/v2/menu-items".format(self._wordpress, self._protocol_wordpress), err))
exit(1)
except Exception as err:
self._logger.error("{0} : Exception error for get item menus {1} : {2}".format(self._name, "{1}://{0}/wp-json/wp/v2/menu-items".format(self._wordpress, self._protocol_wordpress), err))
return idItemMenu
def _createMenu(self, menu):
title = "Menu {0}".format(self._wordpress)
self._logger.info("{0} : Create menu from API Wordpress : {1}".format(self._name, title))
try:
params = {"search": title}
page = self._request_wordpress.get("{1}://{0}/wp-json/wp/v2/menus".format(self._wordpress, self._protocol_wordpress), auth=self._basic, params=params)
if page.status_code == 200:
result = page.json()
self._logger.info("{0} : Get content menus : {1}".format(self._name, len(result)))
idMenu = 0
if len(result) == 0:
self._logger.info("{0} : Create menu : {1}".format(self._name, title))
data = {"name": title}
try:
page = self._request_wordpress.post("{1}://{0}/wp-json/wp/v2/menus".format(self._wordpress, self._protocol_wordpress), auth=self._basic, headers=self._headers_json, data=json.dumps(data))
if page.status_code == 201:
result = page.json()
self._logger.debug("{0} : Get menus : {1}".format(self._name, result))
if len(result) > 0:
idMenu = result["id"]
else:
self._logger.error("{0} : Post menu didn't get due status code : {1}".format(self._name, page.status_code))
self._logger.debug("{0} : {1}".format(self._name, page.content))
except ConnectionError as err:
self._logger.error("{0} : Connection error for create menu {1} : {2}".format(self._name, "{1}://{0}/wp-json/wp/v2/menus".format(self._wordpress, self._protocol_wordpress), err))
exit(1)
except Exception as err:
self._logger.error("{0} : Exception error for get menu {1} : {2}".format(self._name, "{1}://{0}/wp-json/wp/v2/menus".format(self._wordpress, self._protocol_wordpress), err))
else:
self._logger.debug("{0} : Get menus : {1}".format(self._name, result))
for i in result:
self._logger.debug("{0} : List menus : {1}".format(self._name, i))
if i["name"] == title:
idMenu = i["id"]
self._logger.info("{0} : Get ID menus : {1}".format(self._name, idMenu))
self._addItemMenu(menu, idMenu)
else:
self._logger.error("{0} : Get menu didn't get due status code : {1}".format(self._name, page.status_code))
self._logger.debug("{0} : {1}".format(self._name, page.content))
except ConnectionError as err:
self._logger.error("{0} : Connection error for get menu {1} : {2}".format(self._name, "{1}://{0}/wp-json/wp/v2/menus".format(self._wordpress, self._protocol_wordpress), err))
exit(1)
except Exception as err:
self._logger.error("{0} : Exception error for get menu {1} : {2}".format(self._name, "{1}://{0}/wp-json/wp/v2/menus".format(self._wordpress, self._protocol_wordpress), err))
def _addItemMenu(self, menu, idMenu):
self._logger.info("{0} : add item to menu : {1}".format(self._name, idMenu))
parent = 0
for i in menu:
parent = 0
self._logger.debug("{0} : debug create item menu : {1}".format(self._name, i))
parent = self._createItemMenu(idMenu, i, parent)
for j in i["children"]:
self._createItemMenu(idMenu, j, parent)