mirror of https://github.com/sgoudham/Enso-Bot.git
You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
364 lines
13 KiB
Python
364 lines
13 KiB
Python
5 years ago
|
#!/usr/bin/env python3
|
||
|
# -*- coding: utf-8 -*-
|
||
|
|
||
|
# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
|
||
|
# Use of this source code is governed by a BSD-style license that can be
|
||
|
# found in the LICENSE file.
|
||
|
|
||
|
"""
|
||
|
Notes about unicode handling in psutil
|
||
|
======================================
|
||
|
|
||
|
Starting from version 5.3.0 psutil adds unicode support, see:
|
||
|
https://github.com/giampaolo/psutil/issues/1040
|
||
|
The notes below apply to *any* API returning a string such as
|
||
|
process exe(), cwd() or username():
|
||
|
|
||
|
* all strings are encoded by using the OS filesystem encoding
|
||
|
(sys.getfilesystemencoding()) which varies depending on the platform
|
||
|
(e.g. "UTF-8" on macOS, "mbcs" on Win)
|
||
|
* no API call is supposed to crash with UnicodeDecodeError
|
||
|
* instead, in case of badly encoded data returned by the OS, the
|
||
|
following error handlers are used to replace the corrupted characters in
|
||
|
the string:
|
||
|
* Python 3: sys.getfilesystemencodeerrors() (PY 3.6+) or
"surrogateescape" on POSIX and "replace" on Windows
|
||
|
* Python 2: "replace"
|
||
|
* on Python 2 all APIs return bytes (str type), never unicode
|
||
|
* on Python 2, you can go back to unicode by doing:
|
||
|
|
||
|
>>> unicode(p.exe(), sys.getdefaultencoding(), errors="replace")
|
||
|
|
||
|
For a detailed explanation of how psutil handles unicode see #1040.
|
||
|
|
||
|
Tests
|
||
|
=====
|
||
|
|
||
|
List of APIs returning or dealing with a string:
|
||
|
('not tested' means they are not tested to deal with non-ASCII strings):
|
||
|
|
||
|
* Process.cmdline()
|
||
|
* Process.connections('unix')
|
||
|
* Process.cwd()
|
||
|
* Process.environ()
|
||
|
* Process.exe()
|
||
|
* Process.memory_maps()
|
||
|
* Process.name()
|
||
|
* Process.open_files()
|
||
|
* Process.username() (not tested)
|
||
|
|
||
|
* disk_io_counters() (not tested)
|
||
|
* disk_partitions() (not tested)
|
||
|
* disk_usage(str)
|
||
|
* net_connections('unix')
|
||
|
* net_if_addrs() (not tested)
|
||
|
* net_if_stats() (not tested)
|
||
|
* net_io_counters() (not tested)
|
||
|
* sensors_fans() (not tested)
|
||
|
* sensors_temperatures() (not tested)
|
||
|
* users() (not tested)
|
||
|
|
||
|
* WindowsService.binpath() (not tested)
|
||
|
* WindowsService.description() (not tested)
|
||
|
* WindowsService.display_name() (not tested)
|
||
|
* WindowsService.name() (not tested)
|
||
|
* WindowsService.status() (not tested)
|
||
|
* WindowsService.username() (not tested)
|
||
|
|
||
|
In here we create a unicode path with a funky non-ASCII name and (where
|
||
|
possible) make psutil return it back (e.g. on name(), exe(), open_files(),
|
||
|
etc.) and make sure that:
|
||
|
|
||
|
* psutil never crashes with UnicodeDecodeError
|
||
|
* the returned path matches
|
||
|
"""
|
||
|
|
||
|
import os
|
||
|
import shutil
|
||
|
import traceback
|
||
|
import warnings
|
||
|
from contextlib import closing
|
||
|
|
||
|
from psutil import BSD
|
||
|
from psutil import OPENBSD
|
||
|
from psutil import POSIX
|
||
|
from psutil import WINDOWS
|
||
|
from psutil._compat import PY3
|
||
|
from psutil._compat import u
|
||
|
from psutil.tests import APPVEYOR
|
||
|
from psutil.tests import ASCII_FS
|
||
|
from psutil.tests import bind_unix_socket
|
||
|
from psutil.tests import chdir
|
||
|
from psutil.tests import CI_TESTING
|
||
|
from psutil.tests import CIRRUS
|
||
|
from psutil.tests import copyload_shared_lib
|
||
|
from psutil.tests import create_exe
|
||
|
from psutil.tests import get_testfn
|
||
|
from psutil.tests import HAS_CONNECTIONS_UNIX
|
||
|
from psutil.tests import HAS_ENVIRON
|
||
|
from psutil.tests import HAS_MEMORY_MAPS
|
||
|
from psutil.tests import INVALID_UNICODE_SUFFIX
|
||
|
from psutil.tests import PsutilTestCase
|
||
|
from psutil.tests import PYPY
|
||
|
from psutil.tests import safe_mkdir
|
||
|
from psutil.tests import safe_rmpath
|
||
|
from psutil.tests import serialrun
|
||
|
from psutil.tests import skip_on_access_denied
|
||
|
from psutil.tests import spawn_testproc
|
||
|
from psutil.tests import terminate
|
||
|
from psutil.tests import TESTFN_PREFIX
|
||
|
from psutil.tests import UNICODE_SUFFIX
|
||
|
from psutil.tests import unittest
|
||
|
import psutil
|
||
|
|
||
|
|
||
|
if APPVEYOR:
    def safe_rmpath(path):  # NOQA
        # Appveyor-only replacement for psutil.tests.safe_rmpath():
        # delegate to the real helper but, if it fails with
        # WindowsError, print the traceback and carry on (implicitly
        # returning None) instead of propagating the error.
        #
        # TODO - the failure is quite random; it is unclear why it
        # happens and it cannot be reproduced locally:
        # https://ci.appveyor.com/project/giampaolo/psutil/build/job/
        #     jiq2cgd6stsbtn60
        # safe_rmpath() happens after reap_children() so this is weird.
        # Perhaps wait_procs() on Windows is broken? Maybe because
        # of STILL_ACTIVE?
        # https://github.com/giampaolo/psutil/blob/
        #     68c7a70728a31d8b8b58f4be6c4c0baa2f449eda/psutil/arch/
        #     windows/process_info.c#L146

        # Imported here under an alias because this function shadows
        # the module-level name "safe_rmpath".
        from psutil.tests import safe_rmpath as real_rmpath

        try:
            outcome = real_rmpath(path)
        except WindowsError:
            traceback.print_exc()
        else:
            return outcome
|
||
|
|
||
|
|
||
|
def try_unicode(suffix):
    """Tell whether a test file name ending with *suffix* is usable.

    On Python 3 this is always True. Otherwise a test executable is
    created under that name, spawned as a subprocess and copied; if any
    of these steps raises UnicodeEncodeError or IOError the answer is
    False, else True. The spawned process and the test file are cleaned
    up in every case.
    """
    if PY3:
        return True

    proc = None
    path = get_testfn(suffix=suffix)
    try:
        safe_rmpath(path)
        create_exe(path)
        proc = spawn_testproc(cmd=[path])
        shutil.copyfile(path, path + '-2')
        safe_rmpath(path + '-2')
    except (UnicodeEncodeError, IOError):
        supported = False
    else:
        supported = True
    finally:
        # Cleanup runs regardless of the outcome of the probe.
        if proc is not None:
            terminate(proc)
        safe_rmpath(path)
    return supported
|
||
|
|
||
|
|
||
|
# ===================================================================
|
||
|
# FS APIs
|
||
|
# ===================================================================
|
||
|
|
||
|
|
||
|
@serialrun
@unittest.skipIf(ASCII_FS, "ASCII fs")
@unittest.skipIf(PYPY and not PY3, "too much trouble on PYPY2")
class _BaseFSAPIsTests(object):
    """Mixin holding the tests for the filesystem-related APIs.

    Subclasses set ``funky_suffix`` and implement
    ``expect_exact_path_match()``. A test executable whose name ends
    with that suffix is created once per class (``funky_name``) and the
    tests check that psutil returns ``str`` objects for it and, when
    ``expect_exact_path_match()`` is true, that the returned value
    matches the original path.

    NOTE(review): ``self.spawn_testproc`` and ``self.get_testfn`` are
    not defined here; presumably they come from PsutilTestCase, which
    concrete subclasses also inherit from.
    """

    # Suffix appended to the test file name; overridden by subclasses.
    funky_suffix = None

    @classmethod
    def setUpClass(cls):
        cls.funky_name = get_testfn(suffix=cls.funky_suffix)
        create_exe(cls.funky_name)

    @classmethod
    def tearDownClass(cls):
        safe_rmpath(cls.funky_name)

    def expect_exact_path_match(self):
        # Subclasses return True if the paths returned by psutil are
        # expected to be equal to the funky path that was created.
        raise NotImplementedError("must be implemented in subclass")

    # ---

    def test_proc_exe(self):
        subp = self.spawn_testproc(cmd=[self.funky_name])
        p = psutil.Process(subp.pid)
        exe = p.exe()
        self.assertIsInstance(exe, str)
        if self.expect_exact_path_match():
            self.assertEqual(os.path.normcase(exe),
                             os.path.normcase(self.funky_name))

    def test_proc_name(self):
        subp = self.spawn_testproc(cmd=[self.funky_name])
        name = psutil.Process(subp.pid).name()
        self.assertIsInstance(name, str)
        if self.expect_exact_path_match():
            self.assertEqual(name, os.path.basename(self.funky_name))

    def test_proc_cmdline(self):
        subp = self.spawn_testproc(cmd=[self.funky_name])
        p = psutil.Process(subp.pid)
        cmdline = p.cmdline()
        for part in cmdline:
            self.assertIsInstance(part, str)
        if self.expect_exact_path_match():
            self.assertEqual(cmdline, [self.funky_name])

    def test_proc_cwd(self):
        dname = self.funky_name + "2"
        self.addCleanup(safe_rmpath, dname)
        safe_mkdir(dname)
        with chdir(dname):
            p = psutil.Process()
            cwd = p.cwd()
        # Check the type of the value captured while inside the funky
        # directory, i.e. the very same value compared below, instead
        # of issuing a second, unrelated cwd() call.
        self.assertIsInstance(cwd, str)
        if self.expect_exact_path_match():
            self.assertEqual(cwd, dname)

    @unittest.skipIf(PYPY and WINDOWS, "fails on PYPY + WINDOWS")
    def test_proc_open_files(self):
        p = psutil.Process()
        # The file opened below is identified as the difference between
        # the open files seen after and before opening it.
        start = set(p.open_files())
        with open(self.funky_name, 'rb'):
            new = set(p.open_files())
        path = (new - start).pop().path
        self.assertIsInstance(path, str)
        if BSD and not path:
            # XXX - see https://github.com/giampaolo/psutil/issues/595
            return self.skipTest("open_files on BSD is broken")
        if self.expect_exact_path_match():
            self.assertEqual(os.path.normcase(path),
                             os.path.normcase(self.funky_name))

    @unittest.skipIf(not POSIX, "POSIX only")
    def test_proc_connections(self):
        name = self.get_testfn(suffix=self.funky_suffix)
        try:
            sock = bind_unix_socket(name)
        except UnicodeEncodeError:
            # A failure to encode the name is an error on Python 3 and
            # a reason to skip the test otherwise.
            if PY3:
                raise
            else:
                raise unittest.SkipTest("not supported")
        with closing(sock):
            conn = psutil.Process().connections('unix')[0]
            self.assertIsInstance(conn.laddr, str)
            # AF_UNIX addr not set on OpenBSD
            if not OPENBSD and not CIRRUS:  # XXX
                self.assertEqual(conn.laddr, name)

    @unittest.skipIf(not POSIX, "POSIX only")
    @unittest.skipIf(not HAS_CONNECTIONS_UNIX, "can't list UNIX sockets")
    @skip_on_access_denied()
    def test_net_connections(self):
        def find_sock(cons):
            # Return the first connection whose local address basename
            # starts with the test file prefix.
            for conn in cons:
                if os.path.basename(conn.laddr).startswith(TESTFN_PREFIX):
                    return conn
            raise ValueError("connection not found")

        name = self.get_testfn(suffix=self.funky_suffix)
        try:
            sock = bind_unix_socket(name)
        except UnicodeEncodeError:
            # Same policy as in test_proc_connections().
            if PY3:
                raise
            else:
                raise unittest.SkipTest("not supported")
        with closing(sock):
            cons = psutil.net_connections(kind='unix')
            # AF_UNIX addr not set on OpenBSD
            if not OPENBSD:
                conn = find_sock(cons)
                self.assertIsInstance(conn.laddr, str)
                self.assertEqual(conn.laddr, name)

    def test_disk_usage(self):
        # Only checks that disk_usage() does not raise on a funky path.
        dname = self.funky_name + "2"
        self.addCleanup(safe_rmpath, dname)
        safe_mkdir(dname)
        psutil.disk_usage(dname)

    @unittest.skipIf(not HAS_MEMORY_MAPS, "not supported")
    @unittest.skipIf(not PY3, "ctypes does not support unicode on PY2")
    @unittest.skipIf(PYPY and WINDOWS,
                     "copyload_shared_lib() unsupported on PYPY + WINDOWS")
    def test_memory_maps(self):
        # XXX: on Python 2, using ctypes.CDLL with a unicode path
        # opens a message box which blocks the test run.
        with copyload_shared_lib(suffix=self.funky_suffix) as funky_path:
            def normpath(p):
                return os.path.realpath(os.path.normcase(p))
            libpaths = [normpath(x.path)
                        for x in psutil.Process().memory_maps()]
            # ...just to have a clearer msg in case of failure
            libpaths = [x for x in libpaths if TESTFN_PREFIX in x]
            self.assertIn(normpath(funky_path), libpaths)
            for path in libpaths:
                self.assertIsInstance(path, str)
|
||
|
|
||
|
|
||
|
# https://travis-ci.org/giampaolo/psutil/jobs/440073249
|
||
|
# @unittest.skipIf(PYPY and TRAVIS, "unreliable on PYPY + TRAVIS")
|
||
|
# @unittest.skipIf(MACOS and TRAVIS, "unreliable on TRAVIS") # TODO
|
||
|
@unittest.skipIf(not try_unicode(UNICODE_SUFFIX),
                 "can't deal with unicode str")
class TestFSAPIs(_BaseFSAPIsTests, PsutilTestCase):
    """Test FS APIs with a funky, valid, UTF8 path name."""

    funky_suffix = UNICODE_SUFFIX

    def expect_exact_path_match(self):
        # An exact match is expected only if os.listdir() itself is
        # able to return the funky name (on Python 2 it may not be).
        if isinstance(self.funky_name, str):
            here = '.'
        else:
            here = u('.')
        with warnings.catch_warnings():
            # Both the listing and the membership test stay inside
            # this block so that any warning they emit is ignored.
            warnings.simplefilter("ignore")
            entries = os.listdir(here)
            found = self.funky_name in entries
        return found
|
||
|
|
||
|
|
||
|
@unittest.skipIf(CI_TESTING, "unreliable on CI")
@unittest.skipIf(not try_unicode(INVALID_UNICODE_SUFFIX),
                 "can't deal with invalid unicode str")
class TestFSAPIsWithInvalidPath(_BaseFSAPIsTests, PsutilTestCase):
    """Test FS APIs with a funky, invalid path name."""

    funky_suffix = INVALID_UNICODE_SUFFIX

    @classmethod
    def expect_exact_path_match(cls):
        # Always expect an exact match: invalid unicode names are
        # supposed to work on Python 2.
        return True
|
||
|
|
||
|
|
||
|
# ===================================================================
|
||
|
# Non fs APIs
|
||
|
# ===================================================================
|
||
|
|
||
|
|
||
|
class TestNonFSAPIS(PsutilTestCase):
    """Unicode tests for non fs-related APIs."""

    @unittest.skipIf(not HAS_ENVIRON, "not supported")
    @unittest.skipIf(PYPY and WINDOWS, "segfaults on PYPY + WINDOWS")
    def test_proc_environ(self):
        # Note: unlike the other tests, this one does not deal with
        # fs paths. On Python 2 the subprocess module is broken as it
        # is not able to handle non-ASCII env vars, so we use "è",
        # which is part of the extended ASCII table
        # (unicode point <= 255).
        if PY3:
            funky_str = UNICODE_SUFFIX
        else:
            funky_str = 'è'
        child_env = os.environ.copy()
        child_env['FUNNY_ARG'] = funky_str
        child = self.spawn_testproc(env=child_env)
        reported = psutil.Process(child.pid).environ()
        for key, value in reported.items():
            self.assertIsInstance(key, str)
            self.assertIsInstance(value, str)
        self.assertEqual(reported['FUNNY_ARG'], funky_str)
|
||
|
|
||
|
|
||
|
if __name__ == "__main__":
    # Executed as a script: hand this file over to the psutil test
    # runner.
    from psutil.tests.runner import run_from_name

    run_from_name(__file__)
|