Merge pull request #117 from MightyCreak/pylint
Fix lint messages in src/utils.py
This commit is contained in:
commit
1402e97424
|
@ -23,7 +23,7 @@ jobs:
|
|||
- name: Pylint
|
||||
uses: cclauss/GitHub-Action-for-pylint@master
|
||||
with:
|
||||
args: "pylint src/vcs/"
|
||||
args: "pylint src/utils.py src/vcs/"
|
||||
|
||||
meson-build-test:
|
||||
runs-on: ubuntu-latest
|
||||
|
|
|
@ -94,6 +94,7 @@ disable=raw-checker-failed,
|
|||
missing-function-docstring,
|
||||
import-error,
|
||||
no-self-use,
|
||||
too-many-arguments,
|
||||
too-many-branches,
|
||||
too-many-locals,
|
||||
too-many-statements,
|
||||
|
|
17
src/main.py
17
src/main.py
|
@ -29,25 +29,16 @@ import stat
|
|||
import unicodedata
|
||||
import webbrowser
|
||||
|
||||
# pylint: disable=wrong-import-position
|
||||
import gi
|
||||
|
||||
gi.require_version('GObject', '2.0')
|
||||
from gi.repository import GObject
|
||||
|
||||
gi.require_version('Gtk', '3.0')
|
||||
from gi.repository import Gtk
|
||||
|
||||
gi.require_version('Gdk', '3.0')
|
||||
from gi.repository import Gdk
|
||||
|
||||
gi.require_version('GdkPixbuf', '2.0')
|
||||
from gi.repository import GdkPixbuf
|
||||
|
||||
gi.require_version('Pango', '1.0')
|
||||
from gi.repository import Pango
|
||||
|
||||
gi.require_version('PangoCairo', '1.0')
|
||||
from gi.repository import PangoCairo
|
||||
from gi.repository import GObject, Gtk, Gdk, GdkPixbuf, Pango, PangoCairo
|
||||
# pylint: enable=wrong-import-position
|
||||
|
||||
from urllib.parse import urlparse
|
||||
|
||||
|
@ -6088,7 +6079,7 @@ class Diffuse(Gtk.Window):
|
|||
if os.path.isfile(statepath):
|
||||
try:
|
||||
f = open(statepath, 'r')
|
||||
ss = readlines(f)
|
||||
ss = utils.readlines(f)
|
||||
f.close()
|
||||
for j, s in enumerate(ss):
|
||||
try:
|
||||
|
|
148
src/utils.py
148
src/utils.py
|
@ -23,21 +23,28 @@ import locale
|
|||
import subprocess
|
||||
import traceback
|
||||
|
||||
# pylint: disable=wrong-import-position
|
||||
import gi
|
||||
|
||||
gi.require_version('Gtk', '3.0')
|
||||
from gi.repository import Gtk
|
||||
# pylint: enable=wrong-import-position
|
||||
|
||||
from diffuse import constants
|
||||
|
||||
# convenience class for displaying a message dialogue
|
||||
class MessageDialog(Gtk.MessageDialog):
|
||||
def __init__(self, parent, type, s):
|
||||
if type == Gtk.MessageType.ERROR:
|
||||
def __init__(self, parent, message_type, s):
|
||||
if message_type == Gtk.MessageType.ERROR:
|
||||
buttons = Gtk.ButtonsType.OK
|
||||
else:
|
||||
buttons = Gtk.ButtonsType.OK_CANCEL
|
||||
Gtk.MessageDialog.__init__(self, parent = parent, destroy_with_parent = True, message_type = type, buttons = buttons, text = s)
|
||||
Gtk.MessageDialog.__init__(
|
||||
self,
|
||||
parent=parent,
|
||||
destroy_with_parent=True,
|
||||
message_type=message_type,
|
||||
buttons=buttons,
|
||||
text=s)
|
||||
self.set_title(constants.APP_NAME)
|
||||
|
||||
# platform test
|
||||
|
@ -76,13 +83,17 @@ def make_subdirs(p, ss):
|
|||
pass
|
||||
return p
|
||||
|
||||
def useFlatpak():
|
||||
return constants.use_flatpak
|
||||
# returns the Windows drive or share from a from an absolute path
|
||||
def _drive_from_path(path):
|
||||
d = path.split(os.sep)
|
||||
if len(d) > 3 and d[0] == '' and d[1] == '':
|
||||
return os.path.join(d[:4])
|
||||
return d[0]
|
||||
|
||||
# constructs a relative path from 'a' to 'b', both should be absolute paths
|
||||
def relpath(a, b):
|
||||
if isWindows():
|
||||
if drive_from_path(a) != drive_from_path(b):
|
||||
if _drive_from_path(a) != _drive_from_path(b):
|
||||
return b
|
||||
c1 = [ c for c in a.split(os.sep) if c != '' ]
|
||||
c2 = [ c for c in b.split(os.sep) if c != '' ]
|
||||
|
@ -104,24 +115,25 @@ def safeRelativePath(abspath1, name, prefs, cygwin_pref):
|
|||
s = s.replace('/', '\\')
|
||||
return s
|
||||
|
||||
# returns the Windows drive or share from a from an absolute path
|
||||
def drive_from_path(s):
|
||||
c = s.split(os.sep)
|
||||
if len(c) > 3 and c[0] == '' and c[1] == '':
|
||||
return os.path.join(c[:4])
|
||||
return c[0]
|
||||
|
||||
# escape arguments for use with bash
|
||||
def bashEscape(s):
|
||||
def _bash_escape(s):
|
||||
return "'" + s.replace("'", "'\\''") + "'"
|
||||
|
||||
def _use_flatpak():
|
||||
return constants.use_flatpak
|
||||
|
||||
# use popen to read the output of a command
|
||||
def popenRead(dn, cmd, prefs, bash_pref, success_results=None):
|
||||
if success_results is None:
|
||||
success_results = [ 0 ]
|
||||
if isWindows() and prefs.getBool(bash_pref):
|
||||
# launch the command from a bash shell is requested
|
||||
cmd = [ prefs.convertToNativePath('/bin/bash.exe'), '-l', '-c', 'cd {}; {}'.format(bashEscape(dn), ' '.join([ bashEscape(arg) for arg in cmd ])) ]
|
||||
cmd = [
|
||||
prefs.convertToNativePath('/bin/bash.exe'),
|
||||
'-l',
|
||||
'-c',
|
||||
f"cd {_bash_escape(dn)}; {' '.join([ _bash_escape(arg) for arg in cmd ])}"
|
||||
]
|
||||
dn = None
|
||||
# use subprocess.Popen to retrieve the file contents
|
||||
if isWindows():
|
||||
|
@ -130,57 +142,26 @@ def popenRead(dn, cmd, prefs, bash_pref, success_results=None):
|
|||
info.wShowWindow = subprocess.SW_HIDE
|
||||
else:
|
||||
info = None
|
||||
if useFlatpak():
|
||||
if _use_flatpak():
|
||||
cmd = [ 'flatpak-spawn', '--host' ] + cmd
|
||||
proc = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=dn, startupinfo=info)
|
||||
proc.stdin.close()
|
||||
proc.stderr.close()
|
||||
fd = proc.stdout
|
||||
# read the command's output
|
||||
s = fd.read()
|
||||
fd.close()
|
||||
if proc.wait() not in success_results:
|
||||
raise IOError('Command failed.')
|
||||
return s
|
||||
|
||||
# use popen to read the output of a command
|
||||
def popenReadLines(dn, cmd, prefs, bash_pref, success_results=None):
|
||||
return strip_eols(splitlines(popenRead(dn, cmd, prefs, bash_pref, success_results).decode('utf-8', errors='ignore')))
|
||||
|
||||
# simulate use of popen with xargs to read the output of a command
|
||||
def popenXArgsReadLines(dn, cmd, args, prefs, bash_pref):
|
||||
# os.sysconf() is only available on Unix
|
||||
if hasattr(os, 'sysconf'):
|
||||
maxsize = os.sysconf('SC_ARG_MAX')
|
||||
maxsize -= sum([ len(k) + len(v) + 2 for k, v in os.environ.items() ])
|
||||
else:
|
||||
# assume the Window's limit to CreateProcess()
|
||||
maxsize = 32767
|
||||
maxsize -= sum([ len(k) + 1 for k in cmd ])
|
||||
|
||||
ss = []
|
||||
i, s, a = 0, 0, []
|
||||
while i < len(args):
|
||||
f = (len(a) == 0)
|
||||
if f:
|
||||
# start a new command line
|
||||
a = cmd[:]
|
||||
elif s + len(args[i]) + 1 <= maxsize:
|
||||
f = True
|
||||
if f:
|
||||
# append another argument to the current command line
|
||||
a.append(args[i])
|
||||
s += len(args[i]) + 1
|
||||
i += 1
|
||||
if i == len(args) or not f:
|
||||
ss.extend(popenReadLines(dn, a, prefs, bash_pref))
|
||||
s, a = 0, []
|
||||
return ss
|
||||
|
||||
# escape special glob characters
|
||||
def globEscape(s):
|
||||
m = dict([ (c, f'[{c}]') for c in '[]?*' ])
|
||||
return ''.join([ m.get(c, c) for c in s ])
|
||||
with (
|
||||
subprocess.Popen(
|
||||
cmd,
|
||||
stdin=subprocess.PIPE,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
cwd=dn,
|
||||
startupinfo=info) as proc
|
||||
):
|
||||
proc.stdin.close()
|
||||
proc.stderr.close()
|
||||
fd = proc.stdout
|
||||
# read the command's output
|
||||
s = fd.read()
|
||||
fd.close()
|
||||
if proc.wait() not in success_results:
|
||||
raise IOError('Command failed.')
|
||||
return s
|
||||
|
||||
# returns the number of characters in the string excluding any line ending
|
||||
# characters
|
||||
|
@ -196,20 +177,35 @@ def len_minus_line_ending(s):
|
|||
|
||||
# returns the string without the line ending characters
|
||||
def strip_eol(s):
|
||||
if s is not None:
|
||||
return s[:len_minus_line_ending(s)]
|
||||
if s:
|
||||
s = s[:len_minus_line_ending(s)]
|
||||
return s
|
||||
|
||||
# returns the list of strings without line ending characters
|
||||
def _strip_eols(ss):
|
||||
return [ strip_eol(s) for s in ss ]
|
||||
|
||||
# use popen to read the output of a command
|
||||
def popenReadLines(dn, cmd, prefs, bash_pref, success_results=None):
|
||||
return _strip_eols(splitlines(popenRead(
|
||||
dn, cmd, prefs, bash_pref, success_results).decode('utf-8', errors='ignore')))
|
||||
|
||||
# escape special glob characters
|
||||
def globEscape(s):
|
||||
m = { c: f'[{c}]' for c in '[]?*' }
|
||||
return ''.join([ m.get(c, c) for c in s ])
|
||||
|
||||
# split string into lines based upon DOS, Mac, and Unix line endings
|
||||
def splitlines(s):
|
||||
def splitlines(text: str) -> list[str]:
|
||||
# split on new line characters
|
||||
temp, i, n = [], 0, len(s)
|
||||
temp, i, n = [], 0, len(text)
|
||||
while i < n:
|
||||
j = s.find('\n', i)
|
||||
j = text.find('\n', i)
|
||||
if j < 0:
|
||||
temp.append(s[i:])
|
||||
temp.append(text[i:])
|
||||
break
|
||||
j += 1
|
||||
temp.append(s[i:j])
|
||||
temp.append(text[i:j])
|
||||
i = j
|
||||
# split on carriage return characters
|
||||
ss = []
|
||||
|
@ -229,7 +225,7 @@ def splitlines(s):
|
|||
|
||||
# also recognize old Mac OS line endings
|
||||
def readlines(fd):
|
||||
return [ strip_eol(s) for s in splitlines(fd.read()) ]
|
||||
return _strip_eols(splitlines(fd.read()))
|
||||
|
||||
# use the program's location as a starting place to search for supporting files
|
||||
# such as icon and help documentation
|
||||
|
@ -249,8 +245,8 @@ if isWindows():
|
|||
if v in os.environ:
|
||||
lang = os.environ[v]
|
||||
# remove any additional languages, encodings, or modifications
|
||||
for v in ':.@':
|
||||
lang = lang.split(v)[0]
|
||||
for c in ':.@':
|
||||
lang = lang.split(c)[0]
|
||||
break
|
||||
else:
|
||||
if lang is not None:
|
||||
|
|
|
@ -59,6 +59,36 @@ class Rcs(VcsInterface):
|
|||
utils.logError(_('Error parsing revision %s.') % (rev, ))
|
||||
return result
|
||||
|
||||
# simulate use of popen with xargs to read the output of a command
|
||||
def _popen_xargs_readlines(self, dn, cmd, args, prefs, bash_pref):
|
||||
# os.sysconf() is only available on Unix
|
||||
if hasattr(os, 'sysconf'):
|
||||
maxsize = os.sysconf('SC_ARG_MAX')
|
||||
maxsize -= sum([ len(k) + len(v) + 2 for k, v in os.environ.items() ])
|
||||
else:
|
||||
# assume the Window's limit to CreateProcess()
|
||||
maxsize = 32767
|
||||
maxsize -= sum([ len(k) + 1 for k in cmd ])
|
||||
|
||||
ss = []
|
||||
i, s, a = 0, 0, []
|
||||
while i < len(args):
|
||||
f = (len(a) == 0)
|
||||
if f:
|
||||
# start a new command line
|
||||
a = cmd[:]
|
||||
elif s + len(args[i]) + 1 <= maxsize:
|
||||
f = True
|
||||
if f:
|
||||
# append another argument to the current command line
|
||||
a.append(args[i])
|
||||
s += len(args[i]) + 1
|
||||
i += 1
|
||||
if i == len(args) or not f:
|
||||
ss.extend(utils.popenReadLines(dn, a, prefs, bash_pref))
|
||||
s, a = 0, []
|
||||
return ss
|
||||
|
||||
def getFolderTemplate(self, prefs, names):
|
||||
# build command
|
||||
cmd = [ prefs.getString('rcs_bin_rlog'), '-L', '-h' ]
|
||||
|
@ -108,7 +138,7 @@ class Rcs(VcsInterface):
|
|||
args = [ utils.safeRelativePath(self.root, k, prefs, 'rcs_cygwin') for k in r ]
|
||||
# run command
|
||||
r, k = {}, ''
|
||||
for line in utils.popenXArgsReadLines(self.root, cmd, args, prefs, 'rcs_bash'):
|
||||
for line in self._popen_xargs_readlines(self.root, cmd, args, prefs, 'rcs_bash'):
|
||||
# parse response
|
||||
if line.startswith('Working file: '):
|
||||
k = prefs.convertToNativePath(line[14:])
|
||||
|
|
Loading…
Reference in New Issue