adding new stuff

This commit is contained in:
ViktorBarzin 2017-07-09 00:22:01 +03:00
parent f84d7183aa
commit 9ef8a96f9a
1580 changed files with 0 additions and 0 deletions

View file

@ -0,0 +1,11 @@
"""
Code audit tool for python.
:copyright: 2013 by Kirill Klenov.
:license: BSD, see LICENSE for more details.
"""
__version__ = "7.0.6"
__project__ = "pylama"
__author__ = "Kirill Klenov <horneds@gmail.com>"
__license__ = "GNU LGPL"

View file

@ -0,0 +1,6 @@
"""Support the module execution."""
from .main import shell
if __name__ == '__main__':
shell()

View file

@ -0,0 +1,76 @@
""" Support for asyncronious checking. """
import logging
import threading
from .core import run
try:
import Queue
except ImportError:
import queue as Queue
try:
import multiprocessing
CPU_COUNT = multiprocessing.cpu_count()
except (ImportError, NotImplementedError):
CPU_COUNT = 1
LOGGER = logging.getLogger('pylama')
class Worker(threading.Thread):
""" Get tasks from queue and run. """
def __init__(self, path_queue, result_queue):
""" Init worker. """
threading.Thread.__init__(self)
self.path_queue = path_queue
self.result_queue = result_queue
def run(self):
""" Run tasks from queue. """
while True:
path, params = self.path_queue.get()
errors = run(path, **params)
self.result_queue.put(errors)
self.path_queue.task_done()
def check_async(paths, options, rootdir=None):
""" Check given paths asynchronously.
:return list: list of errors
"""
LOGGER.info('Async code checking is enabled.')
path_queue = Queue.Queue()
result_queue = Queue.Queue()
for num in range(CPU_COUNT):
worker = Worker(path_queue, result_queue)
worker.setDaemon(True)
LOGGER.info('Start worker #%s', (num + 1))
worker.start()
for path in paths:
path_queue.put((path, dict(options=options, rootdir=rootdir)))
path_queue.join()
errors = []
while True:
try:
errors += result_queue.get(False)
except Queue.Empty:
break
return errors
# pylama:ignore=W0212,D210,F0001

View file

@ -0,0 +1,241 @@
""" Parse arguments from command line and configuration files. """
import fnmatch
import os
import sys
import re
import logging
from argparse import ArgumentParser
from . import __version__
from .libs.inirama import Namespace
from .lint.extensions import LINTERS
#: A default checkers
DEFAULT_LINTERS = 'pep8', 'pyflakes', 'mccabe'
CURDIR = os.getcwd()
CONFIG_FILES = 'pylama.ini', 'setup.cfg', 'tox.ini', 'pytest.ini'
#: The skip pattern
SKIP_PATTERN = re.compile(r'# *noqa\b', re.I).search
# Parse a modelines
MODELINE_RE = re.compile(r'^\s*#\s+(?:pylama:)\s*((?:[\w_]*=[^:\n\s]+:?)+)', re.I | re.M)
# Setup a logger
LOGGER = logging.getLogger('pylama')
LOGGER.propagate = False
STREAM = logging.StreamHandler(sys.stdout)
LOGGER.addHandler(STREAM)
class _Default(object):
def __init__(self, value=None):
self.value = value
def __str__(self):
return str(self.value)
def __repr__(self):
return "<_Default [%s]>" % self.value
def split_csp_str(s):
""" Split commaseparated string.
:returns: list of splitted values
"""
if isinstance(s, (list, tuple)):
return s
return list(set(i for i in s.strip().split(',') if i))
def parse_linters(linters):
""" Initialize choosen linters.
:returns: list of inited linters
"""
result = list()
for name in split_csp_str(linters):
linter = LINTERS.get(name)
if linter:
result.append((name, linter))
else:
logging.warn("Linter `%s` not found.", name)
return result
PARSER = ArgumentParser(description="Code audit tool for python.")
PARSER.add_argument(
"paths", nargs='*', default=_Default([CURDIR]),
help="Paths to files or directories for code check.")
PARSER.add_argument(
"--verbose", "-v", action='store_true', help="Verbose mode.")
PARSER.add_argument('--version', action='version',
version='%(prog)s ' + __version__)
PARSER.add_argument(
"--format", "-f", default=_Default('pep8'), choices=['pep8', 'pylint'],
help="Choose errors format (pep8, pylint).")
PARSER.add_argument(
"--select", "-s", default=_Default(''), type=split_csp_str,
help="Select errors and warnings. (comma-separated list)")
PARSER.add_argument(
"--sort", default=_Default(''), type=split_csp_str,
help="Sort result by error types. Ex. E,W,D")
PARSER.add_argument(
"--linters", "-l", default=_Default(','.join(DEFAULT_LINTERS)),
type=parse_linters, help=(
"Select linters. (comma-separated). Choices are %s."
% ','.join(s for s in LINTERS.keys())
))
PARSER.add_argument(
"--ignore", "-i", default=_Default(''), type=split_csp_str,
help="Ignore errors and warnings. (comma-separated)")
PARSER.add_argument(
"--skip", default=_Default(''),
type=lambda s: [re.compile(fnmatch.translate(p)) for p in s.split(',') if p],
help="Skip files by masks (comma-separated, Ex. */messages.py)")
PARSER.add_argument("--report", "-r", help="Send report to file [REPORT]")
PARSER.add_argument(
"--hook", action="store_true", help="Install Git (Mercurial) hook.")
PARSER.add_argument(
"--async", action="store_true",
help="Enable async mode. Usefull for checking a lot of files. "
"Dont supported with pylint.")
PARSER.add_argument(
"--options", "-o", default="",
help="Select configuration file. By default is '<CURDIR>/pylama.ini'")
PARSER.add_argument(
"--force", "-F", action='store_true', default=_Default(False),
help="Force code checking (if linter doesnt allow)")
PARSER.add_argument(
"--abspath", "-a", action='store_true', default=_Default(False),
help="Use absolute paths in output.")
ACTIONS = dict((a.dest, a) for a in PARSER._actions)
def parse_options(args=None, config=True, rootdir=CURDIR, **overrides): # noqa
""" Parse options from command line and configuration files.
:return argparse.Namespace:
"""
if args is None:
args = []
# Parse args from command string
options = PARSER.parse_args(args)
options.file_params = dict()
options.linters_params = dict()
# Override options
for k, v in overrides.items():
passed_value = getattr(options, k, _Default())
if isinstance(passed_value, _Default):
setattr(options, k, _Default(v))
# Compile options from ini
if config:
cfg = get_config(str(options.options), rootdir=rootdir)
for k, v in cfg.default.items():
LOGGER.info('Find option %s (%s)', k, v)
passed_value = getattr(options, k, _Default())
if isinstance(passed_value, _Default):
if k == 'paths':
v = v.split()
setattr(options, k, _Default(v))
# Parse file related options
for name, opts in cfg.sections.items():
if not name.startswith('pylama'):
continue
if name == cfg.default_section:
continue
name = name[7:]
if name in LINTERS:
options.linters_params[name] = dict(opts)
continue
mask = re.compile(fnmatch.translate(name))
options.file_params[mask] = dict(opts)
# Postprocess options
opts = dict(options.__dict__.items())
for name, value in opts.items():
if isinstance(value, _Default):
setattr(options, name, process_value(name, value.value))
if options.async and 'pylint' in options.linters:
LOGGER.warn('Cant parse code asynchronously while pylint is enabled.')
options.async = False
return options
def process_value(name, value):
""" Compile option value. """
action = ACTIONS.get(name)
if not action:
return value
if callable(action.type):
return action.type(value)
if action.const:
return bool(int(value))
return value
def get_config(ini_path=None, rootdir=CURDIR):
""" Load configuration from INI.
:return Namespace:
"""
config = Namespace()
config.default_section = 'pylama'
if not ini_path:
for path in CONFIG_FILES:
path = os.path.join(rootdir, path)
if os.path.isfile(path) and os.access(path, os.R_OK):
config.read(path)
else:
config.read(ini_path)
return config
def setup_logger(options):
""" Setup logger with options. """
LOGGER.setLevel(logging.INFO if options.verbose else logging.WARN)
if options.report:
LOGGER.removeHandler(STREAM)
LOGGER.addHandler(logging.FileHandler(options.report, mode='w'))
LOGGER.info('Try to read configuration from: ' + options.options)
# pylama:ignore=W0212,D210,F0001

View file

@ -0,0 +1,200 @@
""" Pylama's core functionality.
Prepare params, check a modeline and run the checkers.
"""
import logging
import os.path as op
from .config import process_value, LOGGER, MODELINE_RE, SKIP_PATTERN, CURDIR
from .errors import Error, remove_duplicates
from .lint.extensions import LINTERS
def run(path='', code=None, rootdir=CURDIR, options=None):
""" Run code checkers with given params.
:param path: (str) A file's path.
:param code: (str) A code source
:return errors: list of dictionaries with error's information
"""
errors = []
fileconfig = dict()
linters = LINTERS
linters_params = dict()
lname = 'undefined'
params = dict()
path = op.relpath(path, rootdir)
if options:
linters = options.linters
linters_params = options.linters_params
for mask in options.file_params:
if mask.match(path):
fileconfig.update(options.file_params[mask])
if options.skip and any(p.match(path) for p in options.skip):
LOGGER.info('Skip checking for path: %s', path)
return []
try:
with CodeContext(code, path) as ctx:
code = ctx.code
params = prepare_params(parse_modeline(code), fileconfig, options)
LOGGER.debug('Checking params: %s', params)
if params.get('skip'):
return errors
for item in params.get('linters') or linters:
if not isinstance(item, tuple):
item = (item, LINTERS.get(item))
lname, linter = item
if not linter:
continue
lparams = linters_params.get(lname, dict())
LOGGER.info("Run %s %s", lname, lparams)
for er in linter.run(
path, code=code, ignore=params.get("ignore", set()),
select=params.get("select", set()), params=lparams):
errors.append(Error(filename=path, linter=lname, **er))
except IOError as e:
LOGGER.debug("IOError %s", e)
errors.append(Error(text=str(e), filename=path, linter=lname))
except SyntaxError as e:
LOGGER.debug("SyntaxError %s", e)
errors.append(
Error(linter=lname, lnum=e.lineno, col=e.offset, text=e.args[0],
filename=path))
except Exception as e: # noqa
import traceback
LOGGER.info(traceback.format_exc())
errors = filter_errors(errors, **params)
errors = list(remove_duplicates(errors))
if code and errors:
errors = filter_skiplines(code, errors)
key = lambda e: e.lnum
if options and options.sort:
sort = dict((v, n) for n, v in enumerate(options.sort, 1))
key = lambda e: (sort.get(e.type, 999), e.lnum)
return sorted(errors, key=key)
def parse_modeline(code):
""" Parse params from file's modeline.
:return dict: Linter params.
"""
seek = MODELINE_RE.search(code)
if seek:
return dict(v.split('=') for v in seek.group(1).split(':'))
return dict()
def prepare_params(modeline, fileconfig, options):
""" Prepare and merge a params from modelines and configs.
:return dict:
"""
params = dict(skip=False, ignore=[], select=[], linters=[])
if options:
params['ignore'] = list(options.ignore)
params['select'] = list(options.select)
for config in filter(None, [modeline, fileconfig]):
for key in ('ignore', 'select', 'linters'):
params[key] += process_value(key, config.get(key, []))
params['skip'] = bool(int(config.get('skip', False)))
params['ignore'] = set(params['ignore'])
params['select'] = set(params['select'])
return params
def filter_errors(errors, select=None, ignore=None, **params):
""" Filter a erros by select and ignore options.
:return bool:
"""
select = select or []
ignore = ignore or []
for e in errors:
for s in select:
if e.number.startswith(s):
yield e
break
else:
for s in ignore:
if e.number.startswith(s):
break
else:
yield e
def filter_skiplines(code, errors):
""" Filter lines by `noqa`.
:return list: A filtered errors
"""
if not errors:
return errors
enums = set(er.lnum for er in errors)
removed = set([
num for num, l in enumerate(code.split('\n'), 1)
if num in enums and SKIP_PATTERN(l)
])
if removed:
errors = [er for er in errors if er.lnum not in removed]
return errors
class CodeContext(object):
""" Read file if code is None. """
def __init__(self, code, path):
""" Init context. """
self.code = code
self.path = path
self._file = None
def __enter__(self):
""" Open a file and read it. """
if self.code is None:
LOGGER.info("File is reading: %s", self.path)
self._file = open(self.path, 'rU')
self.code = self._file.read()
return self
def __exit__(self, t, value, traceback):
""" Close the file which was opened. """
if self._file is not None:
self._file.close()
if t and LOGGER.level == logging.DEBUG:
LOGGER.debug(traceback)
# pylama:ignore=R0912,D210,F0001

View file

@ -0,0 +1,97 @@
""" Don't duplicate same errors from different linters. """
from collections import defaultdict
DUPLICATES = (
# multiple statements on one line
[('pep8', 'E701'), ('pylint', 'C0321')],
# unused variable
[('pylint', 'W0612'), ('pyflakes', 'W0612')],
# undefined variable
[('pylint', 'E0602'), ('pyflakes', 'E0602')],
# unused import
[('pylint', 'W0611'), ('pyflakes', 'W0611')],
# whitespace before ')'
[('pylint', 'C0326'), ('pep8', 'E202')],
# whitespace before '('
[('pylint', 'C0326'), ('pep8', 'E211')],
# multiple spaces after operator
[('pylint', 'C0326'), ('pep8', 'E222')],
# missing whitespace around operator
[('pylint', 'C0326'), ('pep8', 'E225')],
# unexpected spaces
[('pylint', 'C0326'), ('pep8', 'E251')],
# long lines
[('pylint', 'C0301'), ('pep8', 'E501')],
# statement ends with a semicolon
[('pylint', 'W0301'), ('pep8', 'E703')],
# multiple statements on one line
[('pylint', 'C0321'), ('pep8', 'E702')],
# bad indentation
[('pylint', 'W0311'), ('pep8', 'E111')],
# wildcart import
[('pylint', 'W00401'), ('pyflakes', 'W0401')],
# module docstring
[('pep257', 'D100'), ('pylint', 'C0111')],
)
DUPLICATES = dict((key, values) for values in DUPLICATES for key in values)
def remove_duplicates(errors):
""" Filter duplicates from given error's list. """
passed = defaultdict(list)
for error in errors:
key = error.linter, error.number
if key in DUPLICATES:
if key in passed[error.lnum]:
continue
passed[error.lnum] = DUPLICATES[key]
yield error
class Error(object):
""" Store an error's information. """
def __init__(self, linter="", col=1, lnum=1, type="E",
text="unknown error", filename="", **kwargs):
""" Init error information with default values. """
text = ' '.join(str(text).strip().split('\n'))
if linter:
text = "%s [%s]" % (text, linter)
number = text.split(' ', 1)[0]
self._info = dict(linter=linter, col=col, lnum=lnum, type=type[:1],
text=text, filename=filename, number=number)
def __getattr__(self, name):
return self._info[name]
def __getitem__(self, name):
return self._info[name]
def get(self, name, default=None):
""" Implement dictionary `get` method. """
return self._info.get(name, default)
def __repr__(self):
return "<Error: %s %s>" % (self.number, self.linter)
# pylama:ignore=W0622,D,R0924

View file

@ -0,0 +1,111 @@
""" SCM hooks. Integration with git and mercurial. """
from __future__ import absolute_import
import sys
from os import path as op, chmod
from subprocess import Popen, PIPE
from .main import LOGGER, process_paths
from .config import parse_options, setup_logger
try:
from configparser import ConfigParser # noqa
except ImportError: # Python 2
from ConfigParser import ConfigParser
def run(command):
""" Run a shell command.
:return str: Stdout
"""
p = Popen(command.split(), stdout=PIPE, stderr=PIPE)
(stdout, stderr) = p.communicate()
return (p.returncode, [line.strip() for line in stdout.splitlines()],
[line.strip() for line in stderr.splitlines()])
def git_hook():
""" Run pylama after git commit. """
_, files_modified, _ = run("git diff-index --cached --name-only HEAD")
options = parse_options()
setup_logger(options)
candidates = list(map(str, files_modified))
if candidates:
process_paths(options, candidates=candidates)
def hg_hook(ui, repo, node=None, **kwargs):
""" Run pylama after mercurial commit. """
seen = set()
paths = []
if len(repo):
for rev in range(repo[node], len(repo)):
for file_ in repo[rev].files():
file_ = op.join(repo.root, file_)
if file_ in seen or not op.exists(file_):
continue
seen.add(file_)
paths.append(file_)
options = parse_options()
setup_logger(options)
if paths:
process_paths(options, candidates=paths)
def install_git(path):
""" Install hook in Git repository. """
hook = op.join(path, 'pre-commit')
with open(hook, 'w') as fd:
fd.write("""#!/usr/bin/env python
import sys
from pylama.hook import git_hook
if __name__ == '__main__':
sys.exit(git_hook())
""")
chmod(hook, 484)
def install_hg(path):
""" Install hook in Mercurial repository. """
hook = op.join(path, 'hgrc')
if not op.isfile(hook):
open(hook, 'w+').close()
c = ConfigParser()
c.readfp(open(hook, 'r'))
if not c.has_section('hooks'):
c.add_section('hooks')
if not c.has_option('hooks', 'commit'):
c.set('hooks', 'commit', 'python:pylama.hooks.hg_hook')
if not c.has_option('hooks', 'qrefresh'):
c.set('hooks', 'qrefresh', 'python:pylama.hooks.hg_hook')
c.write(open(hook, 'w+'))
def install_hook(path):
""" Auto definition of SCM and hook installation. """
git = op.join(path, '.git', 'hooks')
hg = op.join(path, '.hg')
if op.exists(git):
install_git(git)
LOGGER.warn('Git hook has been installed.')
elif op.exists(hg):
install_hg(hg)
LOGGER.warn('Mercurial hook has been installed.')
else:
LOGGER.error('VCS has not found. Check your path.')
sys.exit(1)
# pylama:ignore=F0401,E1103,D210,F0001

View file

@ -0,0 +1 @@
""" Support libs. """

View file

@ -0,0 +1,38 @@
"""Backport of importlib.import_module from 3.x."""
# While not critical (and in no way guaranteed!), it would be nice to keep this
# code compatible with Python 2.3.
import sys
def _resolve_name(name, package, level):
"""Return the absolute name of the module to be imported."""
if not hasattr(package, 'rindex'):
raise ValueError("'package' not set to a string")
dot = len(package)
for x in xrange(level, 1, -1):
try:
dot = package.rindex('.', 0, dot)
except ValueError:
raise ValueError("attempted relative import beyond top-level "
"package")
return "%s.%s" % (package[:dot], name)
def import_module(name, package=None):
"""Import a module.
The 'package' argument is required when performing a relative import. It
specifies the package to use as the anchor point from which to resolve the
relative import to an absolute import.
"""
if name.startswith('.'):
if not package:
raise TypeError("relative imports require the 'package' argument")
level = 0
for character in name:
if character != '.':
break
level += 1
name = _resolve_name(name[level:], package, level)
__import__(name)
return sys.modules[name]

View file

@ -0,0 +1,405 @@
"""
Inirama is a python module that parses INI files.
.. _badges:
.. include:: ../README.rst
:start-after: .. _badges:
:end-before: .. _contents:
.. _description:
.. include:: ../README.rst
:start-after: .. _description:
:end-before: .. _badges:
:copyright: 2013 by Kirill Klenov.
:license: BSD, see LICENSE for more details.
"""
from __future__ import unicode_literals, print_function
__version__ = "0.8.0"
__project__ = "Inirama"
__author__ = "Kirill Klenov <horneds@gmail.com>"
__license__ = "BSD"
import io
import re
import logging
try:
from collections import OrderedDict
except ImportError:
from UserDict import DictMixin
class OrderedDict(dict, DictMixin):
null = object()
def __init__(self, *args, **kwargs):
self.clear()
self.update(*args, **kwargs)
def clear(self):
self.__map = dict()
self.__order = list()
dict.clear(self)
def __setitem__(self, key, value):
if key not in self:
self.__map[key] = len(self.__order)
self.__order.append(key)
dict.__setitem__(self, key, value)
def __delitem__(self, key):
dict.__delitem__(self, key)
self.__map.pop(key)
self.__order = self.null
def __iter__(self):
for key in self.__order:
if key is not self.null:
yield key
def keys(self):
return list(self)
setdefault = DictMixin.setdefault
update = DictMixin.update
pop = DictMixin.pop
values = DictMixin.values
items = DictMixin.items
iterkeys = DictMixin.iterkeys
itervalues = DictMixin.itervalues
iteritems = DictMixin.iteritems
NS_LOGGER = logging.getLogger('inirama')
class Scanner(object):
""" Split a code string on tokens. """
def __init__(self, source, ignore=None, patterns=None):
""" Init Scanner instance.
:param patterns: List of token patterns [(token, regexp)]
:param ignore: List of ignored tokens
"""
self.reset(source)
if patterns:
self.patterns = []
for k, r in patterns:
self.patterns.append((k, re.compile(r)))
if ignore:
self.ignore = ignore
def reset(self, source):
""" Reset scanner's state.
:param source: Source for parsing
"""
self.tokens = []
self.source = source
self.pos = 0
def scan(self):
""" Scan source and grab tokens. """
self.pre_scan()
token = None
end = len(self.source)
while self.pos < end:
best_pat = None
best_pat_len = 0
# Check patterns
for p, regexp in self.patterns:
m = regexp.match(self.source, self.pos)
if m:
best_pat = p
best_pat_len = len(m.group(0))
break
if best_pat is None:
raise SyntaxError(
"SyntaxError[@char {0}: {1}]".format(
self.pos, "Bad token."))
# Ignore patterns
if best_pat in self.ignore:
self.pos += best_pat_len
continue
# Create token
token = (
best_pat,
self.source[self.pos:self.pos + best_pat_len],
self.pos,
self.pos + best_pat_len,
)
self.pos = token[-1]
self.tokens.append(token)
def pre_scan(self):
""" Prepare source. """
pass
def __repr__(self):
""" Print the last 5 tokens that have been scanned in.
:return str:
"""
return '<Scanner: ' + ','.join(
"{0}({2}:{3})".format(*t) for t in self.tokens[-5:]) + ">"
class INIScanner(Scanner):
""" Get tokens for INI. """
patterns = [
('SECTION', re.compile(r'\[[^]]+\]')),
('IGNORE', re.compile(r'[ \r\t\n]+')),
('COMMENT', re.compile(r'[;#].*')),
('KEY_VALUE', re.compile(r'[^=\s]+\s*[:=].*')),
('CONTINUATION', re.compile(r'.*'))
]
ignore = ['IGNORE']
def pre_scan(self):
""" Prepare string for scaning. """
escape_re = re.compile(r'\\\n[\t ]+')
self.source = escape_re.sub('', self.source)
undefined = object()
class Section(OrderedDict):
""" Representation of INI section. """
def __init__(self, namespace, *args, **kwargs):
super(Section, self).__init__(*args, **kwargs)
self.namespace = namespace
def __setitem__(self, name, value):
value = str(value)
if value.isdigit():
value = int(value)
super(Section, self).__setitem__(name, value)
class InterpolationSection(Section):
""" INI section with interpolation support. """
var_re = re.compile('{([^}]+)}')
def get(self, name, default=None):
""" Get item by name.
:return object: value or None if name not exists
"""
if name in self:
return self[name]
return default
def __interpolate__(self, math):
try:
key = math.group(1).strip()
return self.namespace.default.get(key) or self[key]
except KeyError:
return ''
def __getitem__(self, name, raw=False):
value = super(InterpolationSection, self).__getitem__(name)
if not raw:
sample = undefined
while sample != value:
try:
sample, value = value, self.var_re.sub(
self.__interpolate__, value)
except RuntimeError:
message = "Interpolation failed: {0}".format(name)
NS_LOGGER.error(message)
raise ValueError(message)
return value
def iteritems(self, raw=False):
""" Iterate self items. """
for key in self:
yield key, self.__getitem__(key, raw=raw)
items = iteritems
class Namespace(object):
""" Default class for parsing INI.
:param **default_items: Default items for default section.
Usage
-----
::
from inirama import Namespace
ns = Namespace()
ns.read('config.ini')
print ns['section']['key']
ns['other']['new'] = 'value'
ns.write('new_config.ini')
"""
#: Name of default section (:attr:`~inirama.Namespace.default`)
default_section = 'DEFAULT'
#: Dont raise any exception on file reading erorrs
silent_read = True
#: Class for generating sections
section_type = Section
def __init__(self, **default_items):
self.sections = OrderedDict()
for k, v in default_items.items():
self[self.default_section][k] = v
@property
def default(self):
""" Return default section or empty dict.
:return :class:`inirama.Section`: section
"""
return self.sections.get(self.default_section, dict())
def read(self, *files, **params):
""" Read and parse INI files.
:param *files: Files for reading
:param **params: Params for parsing
Set `update=False` for prevent values redefinition.
"""
for f in files:
try:
with io.open(f, encoding='utf-8') as ff:
NS_LOGGER.info('Read from `{0}`'.format(ff.name))
self.parse(ff.read(), **params)
except (IOError, TypeError, SyntaxError, io.UnsupportedOperation):
if not self.silent_read:
NS_LOGGER.error('Reading error `{0}`'.format(ff.name))
raise
def write(self, f):
""" Write namespace as INI file.
:param f: File object or path to file.
"""
if isinstance(f, str):
f = io.open(f, 'w', encoding='utf-8')
if not hasattr(f, 'read'):
raise AttributeError("Wrong type of file: {0}".format(type(f)))
NS_LOGGER.info('Write to `{0}`'.format(f.name))
for section in self.sections.keys():
f.write('[{0}]\n'.format(section))
for k, v in self[section].items():
f.write('{0:15}= {1}\n'.format(k, v))
f.write('\n')
f.close()
def parse(self, source, update=True, **params):
""" Parse INI source as string.
:param source: Source of INI
:param update: Replace alredy defined items
"""
scanner = INIScanner(source)
scanner.scan()
section = self.default_section
name = None
for token in scanner.tokens:
if token[0] == 'KEY_VALUE':
name, value = re.split('[=:]', token[1], 1)
name, value = name.strip(), value.strip()
if not update and name in self[section]:
continue
self[section][name] = value
elif token[0] == 'SECTION':
section = token[1].strip('[]')
elif token[0] == 'CONTINUATION':
if not name:
raise SyntaxError(
"SyntaxError[@char {0}: {1}]".format(
token[2], "Bad continuation."))
self[section][name] += '\n' + token[1].strip()
def __getitem__(self, name):
""" Look name in self sections.
:return :class:`inirama.Section`: section
"""
if name not in self.sections:
self.sections[name] = self.section_type(self)
return self.sections[name]
def __contains__(self, name):
return name in self.sections
def __repr__(self):
return "<Namespace: {0}>".format(self.sections)
class InterpolationNamespace(Namespace):
""" That implements the interpolation feature.
::
from inirama import InterpolationNamespace
ns = InterpolationNamespace()
ns.parse('''
[main]
test = value
foo = bar {test}
more_deep = wow {foo}
''')
print ns['main']['more_deep'] # wow bar value
"""
section_type = InterpolationSection
# pylama:ignore=D,W02,E731,W0621

View file

@ -0,0 +1,19 @@
"""Custom module loader."""
class Linter(object):
"""Abstract class for linter plugin."""
@staticmethod
def allow(path):
"""Check path is relevant for linter.
:return bool:
"""
return path.endswith('.py')
@staticmethod
def run(path, **meta):
"""Method 'run' should be defined."""
raise NotImplementedError(__doc__)

View file

@ -0,0 +1,39 @@
"""Load extensions."""
LINTERS = {}
try:
from pylama.lint.pylama_mccabe import Linter
LINTERS['mccabe'] = Linter()
except ImportError:
pass
try:
from pylama.lint.pylama_pep257 import Linter
LINTERS['pep257'] = Linter()
except ImportError:
pass
try:
from pylama.lint.pylama_pep8 import Linter
LINTERS['pep8'] = Linter()
except ImportError:
pass
try:
from pylama.lint.pylama_pyflakes import Linter
LINTERS['pyflakes'] = Linter()
except ImportError:
pass
from pkg_resources import iter_entry_points
for entry in iter_entry_points('pylama.linter'):
if entry.name not in LINTERS:
try:
LINTERS[entry.name] = entry.load()()
except ImportError:
pass
# pylama:ignore=E0611

View file

@ -0,0 +1,29 @@
"""Code complexity checking."""
from mccabe import McCabeChecker
from pylama.lint import Linter as Abstract
import ast
class Linter(Abstract):
"""Run complexity checking."""
@staticmethod
def run(path, code=None, params=None, **meta):
"""MCCabe code checking.
:return list: List of errors.
"""
try:
tree = compile(code, path, "exec", ast.PyCF_ONLY_AST)
except SyntaxError as exc:
return [{'lnum': exc.lineno, 'text': 'Invalid syntax: %s' % exc.text.strip()}]
McCabeChecker.max_complexity = int(params.get('complexity', 10))
return [
{'lnum': lineno, 'offset': offset, 'text': text, 'type': McCabeChecker._code}
for lineno, offset, text, _ in McCabeChecker(tree, path).run()
]
# pylama:ignore=W0212

View file

@ -0,0 +1,21 @@
"""PEP257 support."""
from pep257 import PEP257Checker
from pylama.lint import Linter as Abstract
class Linter(Abstract):
"""Check PEP257 errors."""
@staticmethod
def run(path, code=None, **meta):
"""PEP257 code checking.
:return list: List of errors.
"""
return [
{'lnum': e.line, 'text': e.message, 'type': 'D'}
for e in PEP257Checker().check_source(code, path)
]

View file

@ -0,0 +1,66 @@
"""PEP8 support."""
from pep8 import BaseReport, StyleGuide, get_parser
from pylama.lint import Linter as Abstract
try:
from StringIO import StringIO
except ImportError:
from io import StringIO
class Linter(Abstract):
"""PEP8 runner."""
@staticmethod
def run(path, code=None, params=None, **meta):
"""Check code with PEP8.
:return list: List of errors.
"""
parser = get_parser()
for option in parser.option_list:
if option.dest and option.dest in params:
value = params[option.dest]
if not isinstance(value, str):
continue
params[option.dest] = option.convert_value(option, params[option.dest])
P8Style = StyleGuide(reporter=_PEP8Report, **params)
buf = StringIO(code)
return P8Style.input_file(path, lines=buf.readlines())
class _PEP8Report(BaseReport):
def __init__(self, *args, **kwargs):
super(_PEP8Report, self).__init__(*args, **kwargs)
self.errors = []
def init_file(self, filename, lines, expected, line_offset):
"""Prepare storage for errors."""
super(_PEP8Report, self).init_file(
filename, lines, expected, line_offset)
self.errors = []
def error(self, line_number, offset, text, check):
"""Save errors."""
code = super(_PEP8Report, self).error(
line_number, offset, text, check)
if code:
self.errors.append(dict(
text=text,
type=code.replace('E', 'C'),
col=offset + 1,
lnum=line_number,
))
def get_file_results(self):
"""Get errors.
:return list: List of errors.
"""
return self.errors

View file

@ -0,0 +1,49 @@
"""Pyflakes support."""
from pyflakes import checker
from pylama.lint import Linter as Abstract
checker.messages.UnusedImport.message = "W0611 %r imported but unused"
checker.messages.RedefinedWhileUnused.message = "W0404 redefinition of unused %r from line %r"
checker.messages.RedefinedInListComp.message = "W0621 list comprehension redefines %r from line %r"
checker.messages.ImportShadowedByLoopVar.message = "W0621 import %r from line %r shadowed by loop variable"
checker.messages.ImportStarUsed.message = "W0401 'from %s import *' used; unable to detect undefined names"
checker.messages.UndefinedName.message = "E0602 undefined name %r"
checker.messages.DoctestSyntaxError.message = "W0511 syntax error in doctest"
checker.messages.UndefinedExport.message = "E0603 undefined name %r in __all__"
checker.messages.UndefinedLocal.message = "E0602 local variable %r (defined in enclosing scope on line %r) referenced before assignment"
checker.messages.DuplicateArgument.message = "E1122 duplicate argument %r in function definition"
checker.messages.LateFutureImport.message = "W0410 future import(s) %r after other statements"
checker.messages.UnusedVariable.message = "W0612 local variable %r is assigned to but never used"
checker.messages.ReturnWithArgsInsideGenerator.message = "E0106 'return' with argument inside generator"
checker.messages.ReturnOutsideFunction.message = "E0104 'return' outside function"
class Linter(Abstract):
"""Pyflakes runner."""
@staticmethod
def run(path, code=None, params=None, **meta):
"""Check code with pyflakes.
:return list: List of errors.
"""
import _ast
builtins = params.get("builtins", "")
if builtins:
builtins = builtins.split(",")
tree = compile(code, path, "exec", _ast.PyCF_ONLY_AST)
w = checker.Checker(tree, path, builtins=builtins)
w.messages = sorted(w.messages, key=lambda m: m.lineno)
return [
{'lnum': m.lineno, 'text': m.message % m.message_args}
for m in sorted(w.messages, key=lambda m: m.lineno)
]
# pylama:ignore=E501,C0301

View file

@ -0,0 +1,12 @@
""" Description. """
# Module information
# ==================
__version__ = "2.1.1"
__project__ = "pylama_pylint"
__author__ = "horneds <horneds@gmail.com>"
__license__ = "BSD"
from .main import Linter # noqa

View file

@ -0,0 +1,111 @@
""" Pylint support. """
from os import path as op, environ
import logging
from pylama.lint import Linter as BaseLinter
CURDIR = op.abspath(op.dirname(__file__))
from astroid import MANAGER
from pylint.lint import Run
from pylint.reporters import BaseReporter
HOME_RCFILE = op.abspath(op.join(environ.get('HOME', ''), '.pylintrc'))
LAMA_RCFILE = op.abspath(op.join(CURDIR, 'pylint.rc'))
logger = logging.getLogger('pylama')
class Linter(BaseLinter):
""" Check code with pylint. """
@staticmethod
def run(path, code, params=None, ignore=None, select=None, **meta):
""" Pylint code checking.
:return list: List of errors.
"""
logger.debug('Start pylint')
MANAGER.astroid_cache.clear()
class Reporter(BaseReporter):
def __init__(self):
self.errors = []
super(Reporter, self).__init__()
def _display(self, layout):
pass
def add_message(self, msg_id, location, msg):
_, _, line, col = location[1:]
self.errors.append(dict(
lnum=line,
col=col,
text="%s %s" % (msg_id, msg),
type=msg_id[0]
))
params = _Params(ignore=ignore, select=select, params=params)
logger.debug(params)
runner = Run(
[path] + params.to_attrs(), reporter=Reporter(), exit=False)
return runner.linter.reporter.errors
class _Params(object):
""" Store pylint params. """
def __init__(self, select=None, ignore=None, params=None):
params = dict(params.items())
rcfile = params.get('rcfile', LAMA_RCFILE)
enable = params.get('enable', None)
disable = params.get('disable', None)
if op.exists(HOME_RCFILE):
rcfile = HOME_RCFILE
if select:
enable = select | set(enable.split(",") if enable else [])
if ignore:
disable = ignore | set(disable.split(",") if disable else [])
params.update(dict(
report=params.get('report', False), rcfile=rcfile,
enable=enable, disable=disable))
self.params = dict(
(name.replace('_', '-'), self.prepare_value(value))
for name, value in params.items() if value is not None)
@staticmethod
def prepare_value(value):
""" Prepare value to pylint. """
if isinstance(value, (list, tuple, set)):
return ",".join(value)
if isinstance(value, bool):
return "y" if value else "n"
return str(value)
def to_attrs(self):
""" Convert to argument list. """
return ["--%s=%s" % item for item in self.params.items()]
def __str__(self):
return " ".join(self.to_attrs())
def __repr__(self):
return "<Pylint %s>" % self
# pylama:ignore=W0403

View file

@ -0,0 +1,23 @@
[MESSAGES CONTROL]
# Disable the message(s) with the given id(s).
# http://pylint-messages.wikidot.com/all-codes
#
# C0103: Invalid name "%s" (should match %s)
# E1101: %s %r has no %r member
# R0901: Too many ancestors (%s/%s)
# R0902: Too many instance attributes (%s/%s)
# R0903: Too few public methods (%s/%s)
# R0904: Too many public methods (%s/%s)
# R0913: Too many arguments (%s/%s)
# R0915: Too many statements (%s/%s)
# W0141: Used builtin function %r
# W0142: Used * or ** magic
# W0221: Arguments number differs from %s method
# W0232: Class has no __init__ method
# W0613: Unused argument %r
# W0631: Using possibly undefined loop variable %r
#
disable = C0103,E1101,R0901,R0902,R0903,R0904,R0913,R0915,W0141,W0142,W0221,W0232,W0613,W0631
[TYPECHECK]
generated-members = REQUEST,acl_users,aq_parent,objects,DoesNotExist,_meta,status_code,content,context

View file

@ -0,0 +1,101 @@
"""Pylama's shell support."""
from __future__ import absolute_import, with_statement
import sys
from os import walk, path as op
from .config import parse_options, CURDIR, setup_logger
from .core import LOGGER, run
from .async import check_async
def check_path(options, rootdir=None, candidates=None, code=None):
"""Check path.
:param rootdir: Root directory (for making relative file paths)
:param options: Parsed pylama options (from pylama.config.parse_options)
:returns: (list) Errors list
"""
if not candidates:
candidates = []
for path_ in options.paths:
path = op.abspath(path_)
if op.isdir(path):
for root, _, files in walk(path):
candidates += [op.relpath(op.join(root, f), CURDIR) for f in files]
else:
candidates.append(path)
if rootdir is None:
rootdir = path if op.isdir(path) else op.dirname(path)
paths = []
for path in candidates:
if not options.force and not any(l.allow(path) for _, l in options.linters):
continue
if not op.exists(path):
continue
paths.append(path)
if options.async:
return check_async(paths, options, rootdir)
errors = []
for path in paths:
errors += run(path=path, code=code, rootdir=rootdir, options=options)
return errors
def shell(args=None, error=True):
"""Endpoint for console.
Parse a command arguments, configuration files and run a checkers.
:return list: list of errors
:raise SystemExit:
"""
if args is None:
args = sys.argv[1:]
options = parse_options(args)
setup_logger(options)
LOGGER.info(options)
# Install VSC hook
if options.hook:
from .hook import install_hook
return install_hook(options.path)
return process_paths(options, error=error)
def process_paths(options, candidates=None, error=True):
"""Process files and log errors."""
errors = check_path(options, rootdir=CURDIR, candidates=candidates)
pattern = "%(filename)s:%(lnum)s:%(col)s: %(text)s"
if options.format == 'pylint':
pattern = "%(filename)s:%(lnum)s: [%(type)s] %(text)s"
for er in errors:
if options.abspath:
er._info['filename'] = op.abspath(er.filename)
LOGGER.warning(pattern, er._info)
if error:
sys.exit(int(bool(errors)))
return errors
if __name__ == '__main__':
shell()
# pylama:ignore=F0001

View file

@ -0,0 +1,87 @@
""" py.test plugin for checking files with pylama. """
from __future__ import absolute_import
from os import path as op
import py # noqa
import pytest
HISTKEY = "pylama/mtimes"
def pytest_addoption(parser):
group = parser.getgroup("general")
group.addoption(
'--pylama', action='store_true',
help="perform some pylama code checks on .py files")
def pytest_sessionstart(session):
config = session.config
if config.option.pylama and getattr(config, 'cache', None):
config._pylamamtimes = config.cache.get(HISTKEY, {})
def pytest_sessionfinish(session):
config = session.config
if hasattr(config, "_pylamamtimes"):
config.cache.set(HISTKEY, config._pylamamtimes)
def pytest_collect_file(path, parent):
config = parent.config
if config.option.pylama and path.ext == '.py':
return PylamaItem(path, parent)
class PylamaError(Exception):
""" indicates an error during pylama checks. """
class PylamaItem(pytest.Item, pytest.File):
def __init__(self, path, parent):
super(PylamaItem, self).__init__(path, parent)
self.add_marker("pep8")
self.cache = None
self._pylamamtimes = None
def setup(self):
if not getattr(self.config, 'cache', None):
return False
self.cache = True
self._pylamamtimes = self.fspath.mtime()
pylamamtimes = self.config._pylamamtimes
old = pylamamtimes.get(str(self.fspath), 0)
if old == self._pylamamtimes:
pytest.skip("file(s) previously passed Pylama checks")
def runtest(self):
errors = check_file(self.fspath)
if errors:
pattern = "%(filename)s:%(lnum)s:%(col)s: %(text)s"
out = "\n".join([pattern % e._info for e in errors])
raise PylamaError(out)
# update mtime only if test passed
# otherwise failures would not be re-run next time
if self.cache:
self.config._pylamamtimes[str(self.fspath)] = self._pylamamtimes
def repr_failure(self, excinfo):
if excinfo.errisinstance(PylamaError):
return excinfo.value.args[0]
return super(PylamaItem, self).repr_failure(excinfo)
def check_file(path):
from pylama.main import parse_options, process_paths
from pylama.config import CURDIR
options = parse_options()
path = op.relpath(str(path), CURDIR)
return process_paths(options, candidates=[path], error=False)
# pylama:ignore=D,E1002,W0212,F0001