Skip to content
GitLab
Menu
Projects
Groups
Snippets
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
bob
bob.core
Commits
46c274b3
Commit
46c274b3
authored
Oct 18, 2016
by
Manuel Günther
Browse files
Moved test cases into their own Extension; added test for C++ logging enabling
parent
16b0c1aa
Pipeline
#4950
failed with stages
in 10 minutes and 17 seconds
Changes
4
Pipelines
1
Show whitespace changes
Inline
Side-by-side
bob/core/logging.cpp
View file @
46c274b3
...
...
@@ -15,7 +15,6 @@
#include
<bob.blitz/cleanup.h>
#include
<bob.extension/documentation.h>
#include
<boost/shared_array.hpp>
#include
<boost/make_shared.hpp>
#define PYTHON_LOGGING_DEBUG 0
...
...
@@ -246,188 +245,6 @@ BOB_CATCH_FUNCTION("reset", 0)
* Testing Infrastructure *
**************************/
struct
message_info_t
{
boost
::
iostreams
::
stream
<
bob
::
core
::
AutoOutputDevice
>*
s
;
std
::
string
message
;
bool
exit
;
unsigned
int
ntimes
;
unsigned
int
thread_id
;
};
static
void
*
log_message_inner
(
void
*
cookie
)
{
message_info_t
*
mi
=
(
message_info_t
*
)
cookie
;
# if PYTHON_LOGGING_DEBUG != 0
if
(
PyEval_ThreadsInitialized
())
{
static_log
<<
"(thread "
<<
mi
->
thread_id
<<
") Python threads initialized correctly for this thread"
<<
std
::
endl
;
}
else
{
static_log
<<
"(thread "
<<
mi
->
thread_id
<<
") Python threads NOT INITIALIZED correctly for this thread"
<<
std
::
endl
;
}
# endif
for
(
unsigned
int
i
=
0
;
i
<
(
mi
->
ntimes
);
++
i
)
{
# if PYTHON_LOGGING_DEBUG != 0
static_log
<<
"(thread "
<<
mi
->
thread_id
<<
") Injecting message `"
<<
mi
->
message
<<
" (thread "
<<
mi
->
thread_id
<<
"; iteration "
<<
i
<<
")'"
<<
std
::
endl
;
# endif
*
(
mi
->
s
)
<<
mi
->
message
# if PYTHON_LOGGING_DEBUG != 0
<<
" (thread "
<<
mi
->
thread_id
<<
"; iteration "
<<
i
<<
")"
# endif
<<
std
::
endl
;
mi
->
s
->
flush
();
}
if
(
mi
->
exit
)
{
# if PYTHON_LOGGING_DEBUG != 0
static_log
<<
"(thread "
<<
mi
->
thread_id
<<
") Exiting this thread"
<<
std
::
endl
;
# endif
pthread_exit
(
0
);
}
# if PYTHON_LOGGING_DEBUG != 0
if
(
mi
->
exit
)
{
static_log
<<
"(thread "
<<
mi
->
thread_id
<<
") Returning 0"
<<
std
::
endl
;
}
# endif
return
0
;
}
static
auto
_logmsg_doc
=
bob
::
extension
::
FunctionDoc
(
"_log_message"
,
"Logs a message into Bob's logging system from C++"
,
"This function is bound for testing purposes only and is not part of the Python API for bob.core"
)
.
add_prototype
(
"ntimes, stream, message"
)
.
add_parameter
(
"ntimes"
,
"int"
,
"The number of times to print the given message"
)
.
add_parameter
(
"stream"
,
"str"
,
"The stream to use for logging the message. Choose from ``('debug', 'info', 'warn', 'error')"
)
.
add_parameter
(
"message"
,
"str"
,
"The message to be logged"
)
;
static
PyObject
*
log_message
(
PyObject
*
,
PyObject
*
args
,
PyObject
*
kwds
)
{
BOB_TRY
char
**
kwlist
=
_logmsg_doc
.
kwlist
();
unsigned
int
ntimes
;
const
char
*
stream
;
const
char
*
message
;
if
(
!
PyArg_ParseTupleAndKeywords
(
args
,
kwds
,
"Iss"
,
kwlist
,
&
ntimes
,
&
stream
,
&
message
))
return
0
;
// implements: if stream not in ('debug', 'info', 'warn', 'error')
boost
::
iostreams
::
stream
<
bob
::
core
::
AutoOutputDevice
>*
s
=
0
;
if
(
strncmp
(
stream
,
"debug"
,
5
)
==
0
)
s
=
&
bob
::
core
::
debug
;
else
if
(
strncmp
(
stream
,
"info"
,
4
)
==
0
)
s
=
&
bob
::
core
::
info
;
else
if
(
strncmp
(
stream
,
"warn"
,
4
)
==
0
)
s
=
&
bob
::
core
::
warn
;
else
if
(
strncmp
(
stream
,
"error"
,
5
)
==
0
)
s
=
&
bob
::
core
::
error
;
else
if
(
strncmp
(
stream
,
"fatal"
,
5
)
==
0
)
s
=
&
bob
::
core
::
error
;
else
{
PyErr_Format
(
PyExc_ValueError
,
"parameter `stream' must be one of 'debug', 'info', 'warn', 'error' or 'fatal' (synomym for 'error'), not '%s'"
,
stream
);
return
0
;
}
// do the work for this function
auto
no_gil
=
PyEval_SaveThread
();
message_info_t
mi
=
{
s
,
message
,
false
,
ntimes
,
0
};
log_message_inner
((
void
*
)
&
mi
);
# if PYTHON_LOGGING_DEBUG != 0
static_log
<<
"(thread 0) Returning to caller"
<<
std
::
endl
;
# endif
PyEval_RestoreThread
(
no_gil
);
Py_RETURN_NONE
;
BOB_CATCH_FUNCTION
(
"_log_message"
,
0
)
}
static
auto
_logmsg_mt_doc
=
bob
::
extension
::
FunctionDoc
(
"_log_message_mt"
,
"Logs a message into Bob's logging system from C++, in a separate thread"
,
"This function is bound for testing purposes only and is not part of the Python API for bob.core"
)
.
add_prototype
(
"nthreads, ntimes, stream, message"
)
.
add_parameter
(
"nthreads"
,
"int"
,
"The total number of threads from which to write messages to the logging system using the C++->Python API"
)
.
add_parameter
(
"ntimes"
,
"int"
,
"The number of times to print the given message"
)
.
add_parameter
(
"stream"
,
"str"
,
"The stream to use for logging the message. Choose from ``('debug', 'info', 'warn', 'error')"
)
.
add_parameter
(
"message"
,
"str"
,
"The message to be logged"
)
;
static
PyObject
*
log_message_mt
(
PyObject
*
,
PyObject
*
args
,
PyObject
*
kwds
)
{
BOB_TRY
char
**
kwlist
=
_logmsg_mt_doc
.
kwlist
();
unsigned
int
nthreads
;
unsigned
int
ntimes
;
const
char
*
stream
;
const
char
*
message
;
if
(
!
PyArg_ParseTupleAndKeywords
(
args
,
kwds
,
"IIss"
,
kwlist
,
&
nthreads
,
&
ntimes
,
&
stream
,
&
message
))
return
0
;
// implements: if stream not in ('debug', 'info', 'warn', 'error')
boost
::
iostreams
::
stream
<
bob
::
core
::
AutoOutputDevice
>*
s
=
0
;
if
(
strncmp
(
stream
,
"debug"
,
5
)
==
0
)
s
=
&
bob
::
core
::
debug
;
else
if
(
strncmp
(
stream
,
"info"
,
4
)
==
0
)
s
=
&
bob
::
core
::
info
;
else
if
(
strncmp
(
stream
,
"warn"
,
4
)
==
0
)
s
=
&
bob
::
core
::
warn
;
else
if
(
strncmp
(
stream
,
"error"
,
5
)
==
0
)
s
=
&
bob
::
core
::
error
;
else
{
PyErr_Format
(
PyExc_ValueError
,
"parameter `stream' must be one of 'debug', 'info', 'warn' or 'error', not '%s'"
,
stream
);
return
0
;
}
// do the work for this function
auto
no_gil
=
PyEval_SaveThread
();
boost
::
shared_array
<
pthread_t
>
threads
(
new
pthread_t
[
nthreads
]);
boost
::
shared_array
<
message_info_t
>
infos
(
new
message_info_t
[
nthreads
]);
for
(
unsigned
int
i
=
0
;
i
<
nthreads
;
++
i
)
{
message_info_t
mi
=
{
s
,
message
,
true
,
ntimes
,
i
+
1
};
infos
[
i
]
=
mi
;
}
# if PYTHON_LOGGING_DEBUG != 0
static_log
<<
"(thread 0) Launching "
<<
nthreads
<<
" thread(s)"
<<
std
::
endl
;
# endif
for
(
unsigned
int
i
=
0
;
i
<
nthreads
;
++
i
)
{
# if PYTHON_LOGGING_DEBUG != 0
static_log
<<
"(thread 0) Launch thread "
<<
(
i
+
1
)
<<
": `"
<<
message
<<
"'"
<<
std
::
endl
;
# endif
pthread_create
(
&
threads
[
i
],
NULL
,
&
log_message_inner
,
(
void
*
)
&
infos
[
i
]);
# if PYTHON_LOGGING_DEBUG != 0
static_log
<<
"(thread 0) thread "
<<
(
i
+
1
)
<<
" == "
<<
std
::
hex
<<
threads
[
i
]
<<
std
::
dec
<<
" launched"
<<
std
::
endl
;
# endif
}
void
*
status
;
# if PYTHON_LOGGING_DEBUG != 0
static_log
<<
"(thread 0) Waiting "
<<
nthreads
<<
" thread(s)"
<<
std
::
endl
;
# endif
for
(
unsigned
int
i
=
0
;
i
<
nthreads
;
++
i
)
{
pthread_join
(
threads
[
i
],
&
status
);
# if PYTHON_LOGGING_DEBUG != 0
static_log
<<
"(thread 0) Waiting on thread "
<<
(
i
+
1
)
<<
std
::
endl
;
# endif
}
# if PYTHON_LOGGING_DEBUG != 0
static_log
<<
"(thread 0) Returning to caller"
<<
std
::
endl
;
# endif
PyEval_RestoreThread
(
no_gil
);
Py_RETURN_NONE
;
BOB_CATCH_FUNCTION
(
"_log_message_mt"
,
0
)
}
static
PyMethodDef
module_methods
[]
=
{
...
...
@@ -437,18 +254,6 @@ static PyMethodDef module_methods[] = {
METH_VARARGS
|
METH_KEYWORDS
,
reset_doc
.
doc
()
},
{
_logmsg_doc
.
name
(),
(
PyCFunction
)
log_message
,
METH_VARARGS
|
METH_KEYWORDS
,
_logmsg_doc
.
doc
()
},
{
_logmsg_mt_doc
.
name
(),
(
PyCFunction
)
log_message_mt
,
METH_VARARGS
|
METH_KEYWORDS
,
_logmsg_mt_doc
.
doc
()
},
{
0
}
/* Sentinel */
};
...
...
bob/core/test.cpp
0 → 100644
View file @
46c274b3
/**
* @author Manuel Gunther <siebenkopf@googlemail.com>
* @date Tue Oct 18 11:45:30 MDT 2016
*
* @brief Binds configuration information available from bob
*/
#include
<bob.blitz/config.h>
#include
<bob.blitz/cleanup.h>
#include
<bob.extension/documentation.h>
#include
<bob.core/logging.h>
#include
<boost/shared_array.hpp>
PyDoc_STRVAR
(
module_docstr
,
"Tests the C++ API of bob.core"
);
struct
message_info_t
{
boost
::
iostreams
::
stream
<
bob
::
core
::
AutoOutputDevice
>*
s
;
std
::
string
message
;
bool
exit
;
unsigned
int
ntimes
;
unsigned
int
thread_id
;
};
static
void
*
log_message_inner
(
void
*
cookie
)
{
message_info_t
*
mi
=
(
message_info_t
*
)
cookie
;
# if PYTHON_LOGGING_DEBUG != 0
if
(
PyEval_ThreadsInitialized
())
{
static_log
<<
"(thread "
<<
mi
->
thread_id
<<
") Python threads initialized correctly for this thread"
<<
std
::
endl
;
}
else
{
static_log
<<
"(thread "
<<
mi
->
thread_id
<<
") Python threads NOT INITIALIZED correctly for this thread"
<<
std
::
endl
;
}
# endif
for
(
unsigned
int
i
=
0
;
i
<
(
mi
->
ntimes
);
++
i
)
{
# if PYTHON_LOGGING_DEBUG != 0
static_log
<<
"(thread "
<<
mi
->
thread_id
<<
") Injecting message `"
<<
mi
->
message
<<
" (thread "
<<
mi
->
thread_id
<<
"; iteration "
<<
i
<<
")'"
<<
std
::
endl
;
# endif
*
(
mi
->
s
)
<<
mi
->
message
# if PYTHON_LOGGING_DEBUG != 0
<<
" (thread "
<<
mi
->
thread_id
<<
"; iteration "
<<
i
<<
")"
# endif
<<
std
::
endl
;
mi
->
s
->
flush
();
}
if
(
mi
->
exit
)
{
# if PYTHON_LOGGING_DEBUG != 0
static_log
<<
"(thread "
<<
mi
->
thread_id
<<
") Exiting this thread"
<<
std
::
endl
;
# endif
pthread_exit
(
0
);
}
# if PYTHON_LOGGING_DEBUG != 0
if
(
mi
->
exit
)
{
static_log
<<
"(thread "
<<
mi
->
thread_id
<<
") Returning 0"
<<
std
::
endl
;
}
# endif
return
0
;
}
static
auto
_logmsg_doc
=
bob
::
extension
::
FunctionDoc
(
"_test_log_message"
,
"Logs a message into Bob's logging system from C++"
,
"This function is bound for testing purposes only and is not part of the Python API for bob.core"
)
.
add_prototype
(
"ntimes, stream, message"
)
.
add_parameter
(
"ntimes"
,
"int"
,
"The number of times to print the given message"
)
.
add_parameter
(
"stream"
,
"str"
,
"The stream to use for logging the message. Choose from ``('debug', 'info', 'warn', 'error')"
)
.
add_parameter
(
"message"
,
"str"
,
"The message to be logged"
)
;
static
PyObject
*
log_message
(
PyObject
*
,
PyObject
*
args
,
PyObject
*
kwds
)
{
BOB_TRY
char
**
kwlist
=
_logmsg_doc
.
kwlist
();
unsigned
int
ntimes
;
const
char
*
stream
;
const
char
*
message
;
if
(
!
PyArg_ParseTupleAndKeywords
(
args
,
kwds
,
"Iss"
,
kwlist
,
&
ntimes
,
&
stream
,
&
message
))
return
0
;
// implements: if stream not in ('debug', 'info', 'warn', 'error')
boost
::
iostreams
::
stream
<
bob
::
core
::
AutoOutputDevice
>*
s
=
0
;
if
(
strncmp
(
stream
,
"debug"
,
5
)
==
0
)
s
=
&
bob
::
core
::
debug
;
else
if
(
strncmp
(
stream
,
"info"
,
4
)
==
0
)
s
=
&
bob
::
core
::
info
;
else
if
(
strncmp
(
stream
,
"warn"
,
4
)
==
0
)
s
=
&
bob
::
core
::
warn
;
else
if
(
strncmp
(
stream
,
"error"
,
5
)
==
0
)
s
=
&
bob
::
core
::
error
;
else
if
(
strncmp
(
stream
,
"fatal"
,
5
)
==
0
)
s
=
&
bob
::
core
::
error
;
else
{
PyErr_Format
(
PyExc_ValueError
,
"parameter `stream' must be one of 'debug', 'info', 'warn', 'error' or 'fatal' (synomym for 'error'), not '%s'"
,
stream
);
return
0
;
}
// do the work for this function
auto
no_gil
=
PyEval_SaveThread
();
message_info_t
mi
=
{
s
,
message
,
false
,
ntimes
,
0
};
log_message_inner
((
void
*
)
&
mi
);
# if PYTHON_LOGGING_DEBUG != 0
static_log
<<
"(thread 0) Returning to caller"
<<
std
::
endl
;
# endif
PyEval_RestoreThread
(
no_gil
);
Py_RETURN_NONE
;
BOB_CATCH_FUNCTION
(
"_log_message"
,
0
)
}
static
auto
_logmsg_mt_doc
=
bob
::
extension
::
FunctionDoc
(
"_test_log_message_mt"
,
"Logs a message into Bob's logging system from C++, in a separate thread"
,
"This function is bound for testing purposes only and is not part of the Python API for bob.core"
)
.
add_prototype
(
"nthreads, ntimes, stream, message"
)
.
add_parameter
(
"nthreads"
,
"int"
,
"The total number of threads from which to write messages to the logging system using the C++->Python API"
)
.
add_parameter
(
"ntimes"
,
"int"
,
"The number of times to print the given message"
)
.
add_parameter
(
"stream"
,
"str"
,
"The stream to use for logging the message. Choose from ``('debug', 'info', 'warn', 'error')"
)
.
add_parameter
(
"message"
,
"str"
,
"The message to be logged"
)
;
static
PyObject
*
log_message_mt
(
PyObject
*
,
PyObject
*
args
,
PyObject
*
kwds
)
{
BOB_TRY
char
**
kwlist
=
_logmsg_mt_doc
.
kwlist
();
unsigned
int
nthreads
;
unsigned
int
ntimes
;
const
char
*
stream
;
const
char
*
message
;
if
(
!
PyArg_ParseTupleAndKeywords
(
args
,
kwds
,
"IIss"
,
kwlist
,
&
nthreads
,
&
ntimes
,
&
stream
,
&
message
))
return
0
;
// implements: if stream not in ('debug', 'info', 'warn', 'error')
boost
::
iostreams
::
stream
<
bob
::
core
::
AutoOutputDevice
>*
s
=
0
;
if
(
strncmp
(
stream
,
"debug"
,
5
)
==
0
)
s
=
&
bob
::
core
::
debug
;
else
if
(
strncmp
(
stream
,
"info"
,
4
)
==
0
)
s
=
&
bob
::
core
::
info
;
else
if
(
strncmp
(
stream
,
"warn"
,
4
)
==
0
)
s
=
&
bob
::
core
::
warn
;
else
if
(
strncmp
(
stream
,
"error"
,
5
)
==
0
)
s
=
&
bob
::
core
::
error
;
else
{
PyErr_Format
(
PyExc_ValueError
,
"parameter `stream' must be one of 'debug', 'info', 'warn' or 'error', not '%s'"
,
stream
);
return
0
;
}
// do the work for this function
auto
no_gil
=
PyEval_SaveThread
();
boost
::
shared_array
<
pthread_t
>
threads
(
new
pthread_t
[
nthreads
]);
boost
::
shared_array
<
message_info_t
>
infos
(
new
message_info_t
[
nthreads
]);
for
(
unsigned
int
i
=
0
;
i
<
nthreads
;
++
i
)
{
message_info_t
mi
=
{
s
,
message
,
true
,
ntimes
,
i
+
1
};
infos
[
i
]
=
mi
;
}
# if PYTHON_LOGGING_DEBUG != 0
static_log
<<
"(thread 0) Launching "
<<
nthreads
<<
" thread(s)"
<<
std
::
endl
;
# endif
for
(
unsigned
int
i
=
0
;
i
<
nthreads
;
++
i
)
{
# if PYTHON_LOGGING_DEBUG != 0
static_log
<<
"(thread 0) Launch thread "
<<
(
i
+
1
)
<<
": `"
<<
message
<<
"'"
<<
std
::
endl
;
# endif
pthread_create
(
&
threads
[
i
],
NULL
,
&
log_message_inner
,
(
void
*
)
&
infos
[
i
]);
# if PYTHON_LOGGING_DEBUG != 0
static_log
<<
"(thread 0) thread "
<<
(
i
+
1
)
<<
" == "
<<
std
::
hex
<<
threads
[
i
]
<<
std
::
dec
<<
" launched"
<<
std
::
endl
;
# endif
}
void
*
status
;
# if PYTHON_LOGGING_DEBUG != 0
static_log
<<
"(thread 0) Waiting "
<<
nthreads
<<
" thread(s)"
<<
std
::
endl
;
# endif
for
(
unsigned
int
i
=
0
;
i
<
nthreads
;
++
i
)
{
pthread_join
(
threads
[
i
],
&
status
);
# if PYTHON_LOGGING_DEBUG != 0
static_log
<<
"(thread 0) Waiting on thread "
<<
(
i
+
1
)
<<
std
::
endl
;
# endif
}
# if PYTHON_LOGGING_DEBUG != 0
static_log
<<
"(thread 0) Returning to caller"
<<
std
::
endl
;
# endif
PyEval_RestoreThread
(
no_gil
);
Py_RETURN_NONE
;
BOB_CATCH_FUNCTION
(
"_log_message_mt"
,
0
)
}
static
auto
_output_disable_doc
=
bob
::
extension
::
FunctionDoc
(
"_test_output_disable"
,
"Writes C++ messages with and without being visible"
)
.
add_prototype
(
""
)
;
PyObject
*
output_disable
(
PyObject
*
,
PyObject
*
args
,
PyObject
*
kwds
)
{
BOB_TRY
char
**
kwlist
=
_output_disable_doc
.
kwlist
();
if
(
!
PyArg_ParseTupleAndKeywords
(
args
,
kwds
,
""
,
kwlist
))
return
0
;
bob
::
core
::
log_level
(
bob
::
core
::
DEBUG
);
bob
::
core
::
debug
<<
"This is a debug message"
<<
std
::
endl
;
bob
::
core
::
info
<<
"This is an info message"
<<
std
::
endl
;
bob
::
core
::
warn
<<
"This is a warning message"
<<
std
::
endl
;
bob
::
core
::
error
<<
"This is an error message"
<<
std
::
endl
;
bob
::
core
::
log_level
(
bob
::
core
::
ERROR
);
bob
::
core
::
debug
<<
"This is a debug message"
<<
std
::
endl
;
bob
::
core
::
info
<<
"This is an info message"
<<
std
::
endl
;
bob
::
core
::
warn
<<
"This is a warning message"
<<
std
::
endl
;
bob
::
core
::
error
<<
"This is an error message"
<<
std
::
endl
;
bob
::
core
::
log_level
(
bob
::
core
::
DISABLE
);
bob
::
core
::
debug
<<
"This is a debug message"
<<
std
::
endl
;
bob
::
core
::
info
<<
"This is an info message"
<<
std
::
endl
;
bob
::
core
::
warn
<<
"This is a warning message"
<<
std
::
endl
;
bob
::
core
::
error
<<
"This is an error message"
<<
std
::
endl
;
bob
::
core
::
log_level
(
bob
::
core
::
DEBUG
);
Py_RETURN_NONE
;
BOB_CATCH_FUNCTION
(
"_test_output_disable"
,
0
)
}
static
PyMethodDef
module_methods
[]
=
{
{
_logmsg_doc
.
name
(),
(
PyCFunction
)
log_message
,
METH_VARARGS
|
METH_KEYWORDS
,
_logmsg_doc
.
doc
()
},
{
_logmsg_mt_doc
.
name
(),
(
PyCFunction
)
log_message_mt
,
METH_VARARGS
|
METH_KEYWORDS
,
_logmsg_mt_doc
.
doc
()
},
{
_output_disable_doc
.
name
(),
(
PyCFunction
)
output_disable
,
METH_VARARGS
|
METH_KEYWORDS
,
_output_disable_doc
.
doc
()
},
{
0
}
/* Sentinel */
};
#if PY_VERSION_HEX >= 0x03000000
static
PyModuleDef
module_definition
=
{
PyModuleDef_HEAD_INIT
,
BOB_EXT_MODULE_NAME
,
module_docstr
,
-
1
,
module_methods
,
0
,
0
,
0
,
0
};
#endif
static
PyObject
*
create_module
(
void
)
{
# if PY_VERSION_HEX >= 0x03000000
PyObject
*
m
=
PyModule_Create
(
&
module_definition
);
auto
m_
=
make_xsafe
(
m
);
const
char
*
ret
=
"O"
;
# else
PyObject
*
m
=
Py_InitModule3
(
BOB_EXT_MODULE_NAME
,
module_methods
,
module_docstr
);
const
char
*
ret
=
"N"
;
# endif
if
(
!
m
)
return
0
;
/* register version numbers and constants */
if
(
PyModule_AddStringConstant
(
m
,
"module"
,
BOB_EXT_MODULE_VERSION
)
<
0
)
return
0
;
return
Py_BuildValue
(
ret
,
m
);
}
PyMODINIT_FUNC
BOB_EXT_ENTRY_NAME
(
void
)
{
# if PY_VERSION_HEX >= 0x03000000
return
# endif
create_module
();
}
bob/core/test_logging.py
View file @
46c274b3
...
...
@@ -66,9 +66,113 @@ def test_from_python_output():
def
test_from_cxx
():
bob
.
core
.
_logging
.
_log_message
(
1
,
'error'
,
'this is a test message'
)
from
bob.core._test
import
_test_log_message
_test
_log_message
(
1
,
'error'
,
'this is a test message'
)
def
test_from_cxx_multithreaded
():
bob
.
core
.
_logging
.
_log_message_mt
(
2
,
1
,
'error'
,
'this is a test message'
)
from
bob.core._test
import
_test_log_message_mt
_test_log_message_mt
(
2
,
1
,
'error'
,
'this is a test message'
)
def
test_from_cxx_disable
():
from
bob.core._test
import
_test_output_disable
import
sys
,
os
import
threading
class
OutputGrabber
(
object
):
"""
Class used to grab standard output or another stream.
This is a slight modification of what was proposed here:
http://stackoverflow.com/a/29834357/3301902
"""
escape_char
=
"
\b
"
def
__init__
(
self
,
stream
=
None
,
threaded
=
False
):
self
.
origstream
=
stream
self
.
threaded
=
threaded
if
self
.
origstream
is
None
:
self
.
origstream
=
sys
.
stdout
self
.
origstreamfd
=
self
.
origstream
.
fileno
()
self
.
capturedtext
=
""
# Create a pipe so the stream can be captured:
self
.
pipe_out
,
self
.
pipe_in
=
os
.
pipe
()
def
start
(
self
):
"""
Start capturing the stream data.
"""
self
.
capturedtext
=
""
# Save a copy of the stream:
self
.
streamfd
=
os
.
dup
(
self
.
origstreamfd
)
# Replace the Original stream with our write pipe
os
.
dup2
(
self
.
pipe_in
,
self
.
origstreamfd
)
if
self
.
threaded
:
# Start thread that will read the stream:
self
.
workerThread
=
threading
.
Thread
(
target
=
self
.
readOutput
)
self
.
workerThread
.
start
()
# Make sure that the thread is running and os.read is executed:
time
.
sleep
(
0.01
)
def
stop
(
self
):
"""
Stop capturing the stream data and save the text in `capturedtext`.
"""
# Flush the stream to make sure all our data goes in before
# the escape character.
self
.
origstream
.
flush
()
# Print the escape character to make the readOutput method stop:
self
.
origstream
.
write
(
self
.
escape_char
)
self
.
origstream
.
flush
()
if
self
.
threaded
:
# wait until the thread finishes so we are sure that
# we have until the last character:
self
.
workerThread
.
join
()
else
:
self
.
readOutput
()
# Close the pipe:
os
.
close
(
self
.
pipe_out
)
# Restore the original stream:
os
.
dup2
(
self
.
streamfd
,
self
.
origstreamfd
)
def
readOutput
(
self
):
"""
Read the stream data (one byte at a time)
and save the text in `capturedtext`.
"""
while
True
:
data
=
os
.
read
(
self
.
pipe_out
,
1
)
# Read One Byte Only
if
self
.
escape_char
in
data
:
break
if
not
data
:
break
self
.
capturedtext
+=
data
try
:
# redirect output and error streams
out
=
OutputGrabber
(
sys
.
stdout
)
err
=
OutputGrabber
(
sys
.
stderr
)